diff --git a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb index c4fc53794..5b8c15d2d 100644 --- a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb +++ b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb @@ -38,17 +38,37 @@ class Scheduler::AccountsStatusesCleanupScheduler return if under_load? budget = compute_budget - first_policy_id = last_processed_id + + # If the budget allows it, we want to consider all accounts with enabled + # auto cleanup at least once. + # + # We start from `first_policy_id` (the last processed id in the previous + # run) and process each policy until we loop to `first_policy_id`, + # recording into `affected_policies` any policy that caused posts to be + # deleted. + # + # After that, we set `full_iteration` to `false` and continue looping on + # policies from `affected_policies`. + first_policy_id = last_processed_id || 0 + first_iteration = true + full_iteration = true + affected_policies = [] loop do num_processed_accounts = 0 - scope = AccountStatusesCleanupPolicy.where(enabled: true) - scope = scope.where(id: first_policy_id...) if first_policy_id.present? + scope = cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration) scope.find_each(order: :asc) do |policy| num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min) - num_processed_accounts += 1 unless num_deleted.zero? budget -= num_deleted + + unless num_deleted.zero? + num_processed_accounts += 1 + affected_policies << policy.id if full_iteration + end + + full_iteration = false if !first_iteration && policy.id >= first_policy_id + if budget.zero? save_last_processed_id(policy.id) break @@ -57,8 +77,9 @@ class Scheduler::AccountsStatusesCleanupScheduler # The idea here is to loop through all policies at least once until the budget is exhausted # and start back after the last processed account otherwise - break if budget.zero? || (num_processed_accounts.zero? && first_policy_id.nil?) - first_policy_id = nil + break if budget.zero? || (num_processed_accounts.zero? && !full_iteration) + + first_iteration = false end end @@ -75,12 +96,28 @@ class Scheduler::AccountsStatusesCleanupScheduler private + def cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration) + scope = AccountStatusesCleanupPolicy.where(enabled: true) + + if full_iteration + # If we are doing a full iteration, examine all policies we have not examined yet + if first_iteration + scope.where(id: first_policy_id...) + else + scope.where(id: ..first_policy_id).or(scope.where(id: affected_policies)) + end + else + # Otherwise, examine only policies that previously yielded posts to delete + scope.where(id: affected_policies) + end + end + def queue_under_load?(name, max_latency) Sidekiq::Queue.new(name).latency > max_latency end def last_processed_id - redis.get('account_statuses_cleanup_scheduler:last_policy_id') + redis.get('account_statuses_cleanup_scheduler:last_policy_id')&.to_i end def save_last_processed_id(id) diff --git a/spec/workers/scheduler/accounts_statuses_cleanup_scheduler_spec.rb b/spec/workers/scheduler/accounts_statuses_cleanup_scheduler_spec.rb index bd17b2abf..2a7a7ffbb 100644 --- a/spec/workers/scheduler/accounts_statuses_cleanup_scheduler_spec.rb +++ b/spec/workers/scheduler/accounts_statuses_cleanup_scheduler_spec.rb @@ -73,7 +73,7 @@ describe Scheduler::AccountsStatusesCleanupScheduler do end end - describe '#get_budget' do + describe '#compute_budget' do context 'on a single thread' do let(:process_set_stub) { [ { 'concurrency' => 1, 'queues' => ['push', 'default'] } ] } @@ -128,6 +128,19 @@ describe Scheduler::AccountsStatusesCleanupScheduler do .and change { account3.statuses.count } .and change { account5.statuses.count } end + + context 'when given a big budget' do + let(:process_set_stub) { [{ 'concurrency' => 400, 'queues' => %w(push default) }] } + + before do + stub_const 'Scheduler::AccountsStatusesCleanupScheduler::MAX_BUDGET', 400 + end + + it 'correctly handles looping in a single run' do + expect(subject.compute_budget).to eq(400) + expect { subject.perform }.to change { Status.count }.by(-30) + end + end end end end