# frozen_string_literal: true

class Scheduler::AccountsStatusesCleanupScheduler
  include Sidekiq::Worker
  include Redisable

  # This limit is mostly to be nice to the fediverse at large and not
  # generate too much traffic.
  # It also helps limit the running time of the scheduler itself.
  MAX_BUDGET = 300

  # This is an attempt to spread the load across remote servers, as
  # spreading deletions across diverse accounts is likely to spread
  # the deletions across diverse followers. It also helps each individual
  # user see some effect sooner.
  PER_ACCOUNT_BUDGET = 5

  # This is an attempt to limit the workload generated by status removal
  # jobs to something the particular server can handle.
  PER_THREAD_BUDGET = 5

  # These are latency limits on various queues above which a server is
  # considered to be under load, causing the auto-deletion to be entirely
  # skipped for that run.
  LOAD_LATENCY_THRESHOLDS = {
    default: 5,
    push: 10,
    # The `pull` queue has lower-priority jobs, and pushing deletes is
    # unlikely to cause issues with this queue if it didn't cause issues
    # with `default` and `push`. Still, do not enqueue deletes if the
    # instance is lagging behind too much.
    pull: 5.minutes.to_i,
  }.freeze

  sidekiq_options retry: 0, lock: :until_executed, lock_ttl: 1.day.to_i

  def perform
    return if under_load?

    budget = compute_budget

    # If the budget allows it, we want to consider all accounts with enabled
    # auto cleanup at least once.
    #
    # We start from `first_policy_id` (the last processed id in the previous
    # run) and process each policy until we loop back to `first_policy_id`,
    # recording into `affected_policies` any policy that caused posts to be
    # deleted.
    #
    # After that, we set `full_iteration` to `false` and keep looping on
    # policies from `affected_policies` only.
    first_policy_id   = last_processed_id || 0
    first_iteration   = true
    full_iteration    = true
    affected_policies = []

    loop do
      num_processed_accounts = 0

      scope = cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration)
      scope.find_each(order: :asc) do |policy|
        num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min)
        budget -= num_deleted

        unless num_deleted.zero?
          num_processed_accounts += 1
          affected_policies << policy.id if full_iteration
        end

        # Once a wrap-around pass reaches the id we started from, the full
        # iteration over all policies is complete.
        full_iteration = false if !first_iteration && policy.id >= first_policy_id

        if budget.zero?
          save_last_processed_id(policy.id)
          break
        end
      end

      # Loop through all policies at least once until the budget is
      # exhausted; resume after the last processed account otherwise.
      break if budget.zero? || (num_processed_accounts.zero? && !full_iteration)

      full_iteration = false unless first_iteration
      first_iteration = false
    end
  end

  def compute_budget
    # Each post deletion is a `RemovalWorker` job (on the `default` queue),
    # each potentially spawning many `ActivityPub::DeliveryWorker` jobs
    # (on the `push` queue).
    threads = Sidekiq::ProcessSet.new.select { |x| x['queues'].include?('push') }.pluck('concurrency').sum
    [PER_THREAD_BUDGET * threads, MAX_BUDGET].min
  end
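
  # As an illustration of the budget maths above (hypothetical numbers, not
  # part of the scheduler's logic): with two Sidekiq processes handling the
  # `push` queue at a concurrency of 25 each, `threads` is 50, so a run's
  # budget is [5 * 50, 300].min = 250 status deletions.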

  def under_load?
    LOAD_LATENCY_THRESHOLDS.any? { |queue, max_latency| queue_under_load?(queue, max_latency) }
  end

  private

  def cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration)
    scope = AccountStatusesCleanupPolicy.where(enabled: true)

    if full_iteration
      # If we are doing a full iteration, examine all policies we have not examined yet
      if first_iteration
        scope.where(id: first_policy_id...)
      else
        scope.where(id: ..first_policy_id).or(scope.where(id: affected_policies))
      end
    else
      # Otherwise, examine only policies that previously yielded posts to delete
      scope.where(id: affected_policies)
    end
  end

  # A queue is considered under load when its oldest enqueued job has been
  # waiting longer than the allowed latency.
  def queue_under_load?(name, max_latency)
    Sidekiq::Queue.new(name).latency > max_latency
  end

  # The id of the last policy processed in the previous run, persisted in
  # Redis so the rotation can resume where it stopped.
  def last_processed_id
    redis.get('account_statuses_cleanup_scheduler:last_policy_id')&.to_i
  end

  def save_last_processed_id(id)
    if id.nil?
      redis.del('account_statuses_cleanup_scheduler:last_policy_id')
    else
      redis.set('account_statuses_cleanup_scheduler:last_policy_id', id, ex: 1.hour.seconds)
    end
  end
end
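
# A minimal way to exercise this worker outside of the Sidekiq schedule, for
# example from a Rails console, is to instantiate it directly (purely
# illustrative; Sidekiq workers are plain Ruby classes):
#
#   Scheduler::AccountsStatusesCleanupScheduler.new.perform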