M app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb => app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb +43 -7
@@ 38,17 38,37 @@ class Scheduler::AccountsStatusesCleanupScheduler
return if under_load?
budget = compute_budget
- first_policy_id = last_processed_id
+
+ # If the budget allows it, we want to consider all accounts with enabled
+ # auto cleanup at least once.
+ #
+ # We start from `first_policy_id` (the last processed id in the previous
+ # run) and process each policy until we loop to `first_policy_id`,
+ # recording into `affected_policies` any policy that caused posts to be
+ # deleted.
+ #
+ # After that, we set `full_iteration` to `false` and continue looping on
+ # policies from `affected_policies`.
+ first_policy_id = last_processed_id || 0
+ first_iteration = true
+ full_iteration = true
+ affected_policies = []
loop do
num_processed_accounts = 0
- scope = AccountStatusesCleanupPolicy.where(enabled: true)
- scope = scope.where(id: first_policy_id...) if first_policy_id.present?
+ scope = cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration)
scope.find_each(order: :asc) do |policy|
num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min)
- num_processed_accounts += 1 unless num_deleted.zero?
budget -= num_deleted
+
+ unless num_deleted.zero?
+ num_processed_accounts += 1
+ affected_policies << policy.id if full_iteration
+ end
+
+ full_iteration = false if !first_iteration && policy.id >= first_policy_id
+
if budget.zero?
save_last_processed_id(policy.id)
break
@@ 57,9 77,9 @@ class Scheduler::AccountsStatusesCleanupScheduler
# The idea here is to loop through all policies at least once until the budget is exhausted
# and start back after the last processed account otherwise
- break if budget.zero? || (num_processed_accounts.zero? && first_policy_id.nil?)
+ break if budget.zero? || (num_processed_accounts.zero? && !full_iteration)
- first_policy_id = nil
+ first_iteration = false
end
end
@@ 76,12 96,28 @@ class Scheduler::AccountsStatusesCleanupScheduler
private
+ def cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration)
+ scope = AccountStatusesCleanupPolicy.where(enabled: true)
+
+ if full_iteration
+ # If we are doing a full iteration, examine all policies we have not examined yet
+ if first_iteration
+ scope.where(id: first_policy_id...)
+ else
+ scope.where(id: ..first_policy_id).or(scope.where(id: affected_policies))
+ end
+ else
+ # Otherwise, examine only policies that previously yielded posts to delete
+ scope.where(id: affected_policies)
+ end
+ end
+
def queue_under_load?(name, max_latency)
Sidekiq::Queue.new(name).latency > max_latency
end
def last_processed_id
- redis.get('account_statuses_cleanup_scheduler:last_policy_id')
+ redis.get('account_statuses_cleanup_scheduler:last_policy_id')&.to_i
end
def save_last_processed_id(id)
M spec/workers/scheduler/accounts_statuses_cleanup_scheduler_spec.rb => spec/workers/scheduler/accounts_statuses_cleanup_scheduler_spec.rb +14 -1
@@ 75,7 75,7 @@ describe Scheduler::AccountsStatusesCleanupScheduler do
end
end
- describe '#get_budget' do
+ describe '#compute_budget' do
context 'on a single thread' do
let(:process_set_stub) { [{ 'concurrency' => 1, 'queues' => %w(push default) }] }
@@ 130,6 130,19 @@ describe Scheduler::AccountsStatusesCleanupScheduler do
.and change { account3.statuses.count }
.and change { account5.statuses.count }
end
+
+ context 'when given a big budget' do
+ let(:process_set_stub) { [{ 'concurrency' => 400, 'queues' => %w(push default) }] }
+
+ before do
+ stub_const 'Scheduler::AccountsStatusesCleanupScheduler::MAX_BUDGET', 400
+ end
+
+ it 'correctly handles looping in a single run' do
+ expect(subject.compute_budget).to eq(400)
+ expect { subject.perform }.to change { Status.count }.by(-30)
+ end
+ end
end
end
end