# frozen_string_literal: true class Scheduler::AccountsStatusesCleanupScheduler include Sidekiq::Worker include Redisable # This limit is mostly to be nice to the fediverse at large and not # generate too much traffic. # This also helps limiting the running time of the scheduler itself. MAX_BUDGET = 50 # This is an attempt to spread the load across instances, as various # accounts are likely to have various followers. PER_ACCOUNT_BUDGET = 5 # This is an attempt to limit the workload generated by status removal # jobs to something the particular instance can handle. PER_THREAD_BUDGET = 5 # Those avoid loading an instance that is already under load MAX_DEFAULT_SIZE = 2 MAX_DEFAULT_LATENCY = 5 MAX_PUSH_SIZE = 5 MAX_PUSH_LATENCY = 10 # 'pull' queue has lower priority jobs, and it's unlikely that pushing # deletes would cause much issues with this queue if it didn't cause issues # with default and push. Yet, do not enqueue deletes if the instance is # lagging behind too much. MAX_PULL_SIZE = 500 MAX_PULL_LATENCY = 300 # This is less of an issue in general, but deleting old statuses is likely # to cause delivery errors, and thus increase the number of jobs to be retried. # This doesn't directly translate to load, but connection errors and a high # number of dead instances may lead to this spiraling out of control if # unchecked. MAX_RETRY_SIZE = 50_000 sidekiq_options retry: 0, lock: :until_executed def perform return if under_load? budget = compute_budget first_policy_id = last_processed_id loop do num_processed_accounts = 0 scope = AccountStatusesCleanupPolicy.where(enabled: true) scope.where(Account.arel_table[:id].gt(first_policy_id)) if first_policy_id.present? scope.find_each(order: :asc) do |policy| num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min) num_processed_accounts += 1 unless num_deleted.zero? budget -= num_deleted if budget.zero? save_last_processed_id(policy.id) break end end # The idea here is to loop through all policies at least once until the budget is exhausted # and start back after the last processed account otherwise break if budget.zero? || (num_processed_accounts.zero? && first_policy_id.nil?) first_policy_id = nil end end def compute_budget threads = Sidekiq::ProcessSet.new.select { |x| x['queues'].include?('push') }.map { |x| x['concurrency'] }.sum [PER_THREAD_BUDGET * threads, MAX_BUDGET].min end def under_load? return true if Sidekiq::Stats.new.retry_size > MAX_RETRY_SIZE queue_under_load?('default', MAX_DEFAULT_SIZE, MAX_DEFAULT_LATENCY) || queue_under_load?('push', MAX_PUSH_SIZE, MAX_PUSH_LATENCY) || queue_under_load?('pull', MAX_PULL_SIZE, MAX_PULL_LATENCY) end private def queue_under_load?(name, max_size, max_latency) queue = Sidekiq::Queue.new(name) queue.size > max_size || queue.latency > max_latency end def last_processed_id redis.get('account_statuses_cleanup_scheduler:last_account_id') end def save_last_processed_id(id) if id.nil? redis.del('account_statuses_cleanup_scheduler:last_account_id') else redis.set('account_statuses_cleanup_scheduler:last_account_id', id, ex: 1.hour.seconds) end end end