From 8018d478abccae540e4fa172d6e2e30e88fbd202 Mon Sep 17 00:00:00 2001 From: Emelia Smith Date: Fri, 28 Jul 2023 12:06:29 +0200 Subject: [PATCH] Fix: Streaming server memory leak in HTTP EventSource cleanup (#26228) --- streaming/index.js | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/streaming/index.js b/streaming/index.js index 4d0937bf8..816a1e379 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -228,9 +228,15 @@ const startWorker = async (workerId) => { callbacks.forEach(callback => callback(json)); }; + /** + * @callback SubscriptionListener + * @param {ReturnType} json of the message + * @returns void + */ + /** * @param {string} channel - * @param {function(string): void} callback + * @param {SubscriptionListener} callback */ const subscribe = (channel, callback) => { log.silly(`Adding listener for ${channel}`); @@ -247,7 +253,7 @@ const startWorker = async (workerId) => { /** * @param {string} channel - * @param {function(Object): void} callback + * @param {SubscriptionListener} callback */ const unsubscribe = (channel, callback) => { log.silly(`Removing listener for ${channel}`); @@ -625,9 +631,9 @@ const startWorker = async (workerId) => { * @param {string[]} ids * @param {any} req * @param {function(string, string): void} output - * @param {function(string[], function(string): void): void} attachCloseHandler + * @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler * @param {boolean=} needsFiltering - * @returns {function(object): void} + * @returns {SubscriptionListener} */ const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => { const accountId = req.accountId || req.remoteAddress; @@ -645,6 +651,7 @@ const startWorker = async (workerId) => { // The listener used to process each message off the redis subscription, // message here is an object with an `event` and `payload` property. Some // events also include a queued_at value, but this is being removed shortly. + /** @type {SubscriptionListener} */ const listener = message => { const { event, payload } = message; @@ -837,7 +844,7 @@ const startWorker = async (workerId) => { subscribe(`${redisPrefix}${id}`, listener); }); - if (attachCloseHandler) { + if (typeof attachCloseHandler === 'function') { attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener); } @@ -874,12 +881,13 @@ const startWorker = async (workerId) => { /** * @param {any} req * @param {function(): void} [closeHandler] - * @return {function(string[]): void} + * @returns {function(string[], SubscriptionListener): void} */ - const streamHttpEnd = (req, closeHandler = undefined) => (ids) => { + + const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => { req.on('close', () => { ids.forEach(id => { - unsubscribe(id); + unsubscribe(id, listener); }); if (closeHandler) { @@ -1118,7 +1126,7 @@ const startWorker = async (workerId) => { * @typedef WebSocketSession * @property {any} socket * @property {any} request - * @property {Object.} subscriptions + * @property {Object.} subscriptions */ /**