Fix: Streaming server memory leak in HTTP EventSource cleanup (#26228)

This commit is contained in:
Emelia Smith 2023-07-28 12:06:29 +02:00 committed by Claire
parent e655b35d7e
commit 50f4af28b0

View file

@ -227,9 +227,15 @@ const startWorker = async (workerId) => {
callbacks.forEach(callback => callback(json)); callbacks.forEach(callback => callback(json));
}; };
/**
* @callback SubscriptionListener
* @param {ReturnType<parseJSON>} json of the message
* @returns void
*/
/** /**
* @param {string} channel * @param {string} channel
* @param {function(string): void} callback * @param {SubscriptionListener} callback
*/ */
const subscribe = (channel, callback) => { const subscribe = (channel, callback) => {
log.silly(`Adding listener for ${channel}`); log.silly(`Adding listener for ${channel}`);
@ -246,7 +252,7 @@ const startWorker = async (workerId) => {
/** /**
* @param {string} channel * @param {string} channel
* @param {function(Object<string, any>): void} callback * @param {SubscriptionListener} callback
*/ */
const unsubscribe = (channel, callback) => { const unsubscribe = (channel, callback) => {
log.silly(`Removing listener for ${channel}`); log.silly(`Removing listener for ${channel}`);
@ -618,9 +624,9 @@ const startWorker = async (workerId) => {
* @param {string[]} ids * @param {string[]} ids
* @param {any} req * @param {any} req
* @param {function(string, string): void} output * @param {function(string, string): void} output
* @param {function(string[], function(string): void): void} attachCloseHandler * @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler
* @param {boolean=} needsFiltering * @param {boolean=} needsFiltering
* @returns {function(object): void} * @returns {SubscriptionListener}
*/ */
const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => { const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
const accountId = req.accountId || req.remoteAddress; const accountId = req.accountId || req.remoteAddress;
@ -703,7 +709,7 @@ const startWorker = async (workerId) => {
subscribe(`${redisPrefix}${id}`, listener); subscribe(`${redisPrefix}${id}`, listener);
}); });
if (attachCloseHandler) { if (typeof attachCloseHandler === 'function') {
attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener); attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
} }
@ -740,12 +746,13 @@ const startWorker = async (workerId) => {
/** /**
* @param {any} req * @param {any} req
* @param {function(): void} [closeHandler] * @param {function(): void} [closeHandler]
* @return {function(string[]): void} * @returns {function(string[], SubscriptionListener): void}
*/ */
const streamHttpEnd = (req, closeHandler = undefined) => (ids) => {
const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
req.on('close', () => { req.on('close', () => {
ids.forEach(id => { ids.forEach(id => {
unsubscribe(id); unsubscribe(id, listener);
}); });
if (closeHandler) { if (closeHandler) {
@ -956,7 +963,7 @@ const startWorker = async (workerId) => {
* @typedef WebSocketSession * @typedef WebSocketSession
* @property {any} socket * @property {any} socket
* @property {any} request * @property {any} request
* @property {Object.<string, { listener: function(string): void, stopHeartbeat: function(): void }>} subscriptions * @property {Object.<string, { listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
*/ */
/** /**