Fix websocket connections being incorrectly decremented twice on errors (#27238)
This commit is contained in:
		
					parent
					
						
							
								5f2d494f0d
							
						
					
				
			
			
				commit
				
					
						f68d540271
					
				
			
		
					 1 changed files with 21 additions and 12 deletions
				
			
		|  | @ -1386,19 +1386,21 @@ const startServer = async () => { | |||
|   }; | ||||
| 
 | ||||
|   wss.on('connection', (ws, req) => { | ||||
|     const location = url.parse(req.url, true); | ||||
|     // Note: url.parse could throw, which would terminate the connection, so we
 | ||||
|     // increment the connected clients metric straight away when we establish
 | ||||
|     // the connection, without waiting:
 | ||||
|     connectedClients.labels({ type: 'websocket' }).inc(); | ||||
| 
 | ||||
|     // Setup request properties:
 | ||||
|     req.requestId = uuid.v4(); | ||||
|     req.remoteAddress = ws._socket.remoteAddress; | ||||
| 
 | ||||
|     // Setup connection keep-alive state:
 | ||||
|     ws.isAlive = true; | ||||
| 
 | ||||
|     ws.on('pong', () => { | ||||
|       ws.isAlive = true; | ||||
|     }); | ||||
| 
 | ||||
|     connectedClients.labels({ type: 'websocket' }).inc(); | ||||
| 
 | ||||
|     /** | ||||
|      * @type {WebSocketSession} | ||||
|      */ | ||||
|  | @ -1408,27 +1410,31 @@ const startServer = async () => { | |||
|       subscriptions: {}, | ||||
|     }; | ||||
| 
 | ||||
|     const onEnd = () => { | ||||
|     ws.on('close', function onWebsocketClose() { | ||||
|       const subscriptions = Object.keys(session.subscriptions); | ||||
| 
 | ||||
|       subscriptions.forEach(channelIds => { | ||||
|         removeSubscription(session.subscriptions, channelIds.split(';'), req) | ||||
|       }); | ||||
| 
 | ||||
|       // Decrement the metrics for connected clients:
 | ||||
|       connectedClients.labels({ type: 'websocket' }).dec(); | ||||
| 
 | ||||
|       // ensure garbage collection:
 | ||||
|       session.socket = null; | ||||
|       session.request = null; | ||||
|       session.subscriptions = {}; | ||||
|     }); | ||||
| 
 | ||||
|       connectedClients.labels({ type: 'websocket' }).dec(); | ||||
|     }; | ||||
| 
 | ||||
|     ws.on('close', onEnd); | ||||
|     ws.on('error', onEnd); | ||||
|     // Note: immediately after the `error` event is emitted, the `close` event
 | ||||
|     // is emitted. As such, all we need to do is log the error here.
 | ||||
|     ws.on('error', (err) => { | ||||
|       log.error('websocket', err.toString()); | ||||
|     }); | ||||
| 
 | ||||
|     ws.on('message', (data, isBinary) => { | ||||
|       if (isBinary) { | ||||
|         log.warn('socket', 'Received binary data, closing connection'); | ||||
|         log.warn('websocket', 'Received binary data, closing connection'); | ||||
|         ws.close(1003, 'The mastodon streaming server does not support binary messages'); | ||||
|         return; | ||||
|       } | ||||
|  | @ -1451,7 +1457,10 @@ const startServer = async () => { | |||
| 
 | ||||
|     subscribeWebsocketToSystemChannel(session); | ||||
| 
 | ||||
|     if (location.query.stream) { | ||||
|     // Parse the URL for the connection arguments (if supplied), url.parse can throw:
 | ||||
|     const location = req.url && url.parse(req.url, true); | ||||
| 
 | ||||
|     if (location && location.query.stream) { | ||||
|       subscribeWebsocketToChannel(session, firstParam(location.query.stream), location.query); | ||||
|     } | ||||
|   }); | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue