Fix leaking Elasticsearch connections in Sidekiq processes (#30450)
This commit is contained in:
parent
b4e3a789b1
commit
257f9abd56
1 changed files with 27 additions and 0 deletions
|
@ -8,6 +8,7 @@ class Mastodon::SidekiqMiddleware
|
||||||
rescue Mastodon::HostValidationError
|
rescue Mastodon::HostValidationError
|
||||||
# Do not retry
|
# Do not retry
|
||||||
rescue => e
|
rescue => e
|
||||||
|
clean_up_elasticsearch_connections!
|
||||||
limit_backtrace_and_raise(e)
|
limit_backtrace_and_raise(e)
|
||||||
ensure
|
ensure
|
||||||
clean_up_sockets!
|
clean_up_sockets!
|
||||||
|
@ -25,6 +26,32 @@ class Mastodon::SidekiqMiddleware
|
||||||
clean_up_statsd_socket!
|
clean_up_statsd_socket!
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# This is a hack to immediately free up unused Elasticsearch connections.
|
||||||
|
#
|
||||||
|
# Indeed, Chewy creates one `Elasticsearch::Client` instance per thread,
|
||||||
|
# and each such client manages its long-lasting connection to
|
||||||
|
# Elasticsearch.
|
||||||
|
#
|
||||||
|
# As far as I know, neither `chewy`, `elasticsearch-transport` or even
|
||||||
|
# `faraday` provide a reliable way to immediately close a connection, and
|
||||||
|
# rely on the underlying object to be garbage-collected instead.
|
||||||
|
#
|
||||||
|
# Furthermore, `sidekiq` creates a new thread each time a job throws an
|
||||||
|
# exception, meaning that each failure will create a new connection, and
|
||||||
|
# the old one will only be closed on full garbage collection.
|
||||||
|
def clean_up_elasticsearch_connections!
|
||||||
|
return unless Chewy.enabled? && Chewy.current[:chewy_client].present?
|
||||||
|
|
||||||
|
Chewy.client.transport.connections.each do |connection|
|
||||||
|
# NOTE: This bit of code is tailored for the HTTPClient Faraday adapter
|
||||||
|
connection.connection.app.instance_variable_get(:@client)&.reset_all
|
||||||
|
end
|
||||||
|
|
||||||
|
Chewy.current.delete(:chewy_client)
|
||||||
|
rescue
|
||||||
|
nil
|
||||||
|
end
|
||||||
|
|
||||||
def clean_up_redis_socket!
|
def clean_up_redis_socket!
|
||||||
RedisConfiguration.pool.checkin if Thread.current[:redis]
|
RedisConfiguration.pool.checkin if Thread.current[:redis]
|
||||||
Thread.current[:redis] = nil
|
Thread.current[:redis] = nil
|
||||||
|
|
Loading…
Reference in a new issue