~cytrogen/masto-fe

81cdc0f972ce2008f8daab8100cdfd5e7f3906b4 — Emelia Smith 2 years ago 255606d
Fix: Streaming server memory leak in HTTP EventSource cleanup (#26228)

1 files changed, 17 insertions(+), 9 deletions(-)

M streaming/index.js
M streaming/index.js => streaming/index.js +17 -9
@@ 223,8 223,14 @@ const startServer = async () => {
  };

  /**
   * @callback SubscriptionListener
   * @param {ReturnType<parseJSON>} json of the message
   * @returns void
   */

  /**
   * @param {string} channel
   * @param {function(string): void} callback
   * @param {SubscriptionListener} callback
   */
  const subscribe = (channel, callback) => {
    log.silly(`Adding listener for ${channel}`);


@@ 241,7 247,7 @@ const startServer = async () => {

  /**
   * @param {string} channel
   * @param {function(Object<string, any>): void} callback
   * @param {SubscriptionListener} callback
   */
  const unsubscribe = (channel, callback) => {
    log.silly(`Removing listener for ${channel}`);


@@ 613,9 619,9 @@ const startServer = async () => {
   * @param {string[]} ids
   * @param {any} req
   * @param {function(string, string): void} output
   * @param {function(string[], function(string): void): void} attachCloseHandler
   * @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler
   * @param {boolean=} needsFiltering
   * @returns {function(object): void}
   * @returns {SubscriptionListener}
   */
  const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
    const accountId = req.accountId || req.remoteAddress;


@@ 633,6 639,7 @@ const startServer = async () => {
    // The listener used to process each message off the redis subscription,
    // message here is an object with an `event` and `payload` property. Some
    // events also include a queued_at value, but this is being removed shortly.
    /** @type {SubscriptionListener} */
    const listener = message => {
      const { event, payload } = message;



@@ 825,7 832,7 @@ const startServer = async () => {
      subscribe(`${redisPrefix}${id}`, listener);
    });

    if (attachCloseHandler) {
    if (typeof attachCloseHandler === 'function') {
      attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
    }



@@ 862,12 869,13 @@ const startServer = async () => {
  /**
   * @param {any} req
   * @param {function(): void} [closeHandler]
   * @returns {function(string[]): void}
   * @returns {function(string[], SubscriptionListener): void}
   */
  const streamHttpEnd = (req, closeHandler = undefined) => (ids) => {

  const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
    req.on('close', () => {
      ids.forEach(id => {
        unsubscribe(id);
        unsubscribe(id, listener);
      });

      if (closeHandler) {


@@ 1131,7 1139,7 @@ const startServer = async () => {
   * @typedef WebSocketSession
   * @property {any} socket
   * @property {any} request
   * @property {Object.<string, { listener: function(string): void, stopHeartbeat: function(): void }>} subscriptions
   * @property {Object.<string, { listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
   */

  /**