76 lines
		
	
	
	
		
			2.3 KiB
		
	
	
	
		
			Ruby
		
	
	
	
	
	
			
		
		
	
	
			76 lines
		
	
	
	
		
			2.3 KiB
		
	
	
	
		
			Ruby
		
	
	
	
	
	
| # frozen_string_literal: true
 | |
| 
 | |
| class Mastodon::SidekiqMiddleware
 | |
|   BACKTRACE_LIMIT = 3
 | |
| 
 | |
|   def call(_worker_class, job, _queue, &block)
 | |
|     setup_query_log_tags(job) do
 | |
|       Chewy.strategy(:mastodon, &block)
 | |
|     end
 | |
|   rescue Mastodon::HostValidationError
 | |
|     # Do not retry
 | |
|   rescue => e
 | |
|     clean_up_elasticsearch_connections!
 | |
|     limit_backtrace_and_raise(e)
 | |
|   ensure
 | |
|     clean_up_sockets!
 | |
|   end
 | |
| 
 | |
|   private
 | |
| 
 | |
|   def limit_backtrace_and_raise(exception)
 | |
|     exception.set_backtrace(exception.backtrace.first(BACKTRACE_LIMIT)) unless ENV['BACKTRACE']
 | |
|     raise exception
 | |
|   end
 | |
| 
 | |
|   def clean_up_sockets!
 | |
|     clean_up_redis_socket!
 | |
|     clean_up_statsd_socket!
 | |
|   end
 | |
| 
 | |
|   # This is a hack to immediately free up unused Elasticsearch connections.
 | |
|   #
 | |
|   # Indeed, Chewy creates one `Elasticsearch::Client` instance per thread,
 | |
|   # and each such client manages its long-lasting connection to
 | |
|   # Elasticsearch.
 | |
|   #
 | |
|   # As far as I know, neither `chewy`,  `elasticsearch-transport` or even
 | |
|   # `faraday` provide a reliable way to immediately close a connection, and
 | |
|   # rely on the underlying object to be garbage-collected instead.
 | |
|   #
 | |
|   # Furthermore, `sidekiq` creates a new thread each time a job throws an
 | |
|   # exception, meaning that each failure will create a new connection, and
 | |
|   # the old one will only be closed on full garbage collection.
 | |
|   def clean_up_elasticsearch_connections!
 | |
|     return unless Chewy.enabled? && Chewy.current[:chewy_client].present?
 | |
| 
 | |
|     Chewy.client.transport.transport.connections.each do |connection|
 | |
|       # NOTE: This bit of code is tailored for the HTTPClient Faraday adapter
 | |
|       connection.connection.app.instance_variable_get(:@client)&.reset_all
 | |
|     end
 | |
| 
 | |
|     Chewy.current.delete(:chewy_client)
 | |
|   rescue
 | |
|     nil
 | |
|   end
 | |
| 
 | |
|   def clean_up_redis_socket!
 | |
|     RedisConnection.pool.checkin if Thread.current[:redis]
 | |
|     Thread.current[:redis] = nil
 | |
|   end
 | |
| 
 | |
|   def clean_up_statsd_socket!
 | |
|     Thread.current[:statsd_socket]&.close
 | |
|     Thread.current[:statsd_socket] = nil
 | |
|   end
 | |
| 
 | |
|   def setup_query_log_tags(job, &block)
 | |
|     if Rails.configuration.active_record.query_log_tags_enabled
 | |
|       # If `wrapped` is set, this is an `ActiveJob` which is already in the execution context
 | |
|       sidekiq_job_class = job['wrapped'].present? ? nil : job['class'].to_s
 | |
|       ActiveSupport::ExecutionContext.set(sidekiq_job_class: sidekiq_job_class, &block)
 | |
|     else
 | |
|       yield
 | |
|     end
 | |
|   end
 | |
| end
 |