diff --git a/app/jobs/internal/process_stale_contacts_job.rb b/app/jobs/internal/process_stale_contacts_job.rb index 4c9990415..28143cd6a 100644 --- a/app/jobs/internal/process_stale_contacts_job.rb +++ b/app/jobs/internal/process_stale_contacts_job.rb @@ -1,5 +1,5 @@ # housekeeping -# remove stale contacts for all accounts +# remove stale contacts for subset of accounts each day # - have no identification (email, phone_number, and identifier are NULL) # - have no conversations # - are older than 30 days @@ -7,14 +7,33 @@ class Internal::ProcessStaleContactsJob < ApplicationJob queue_as :housekeeping + # Number of day-based groups to split accounts into + DISTRIBUTION_GROUPS = 5 + # Max accounts to process in one batch + MAX_ACCOUNTS_PER_BATCH = 20 + + # Process only a subset of accounts per day to avoid flooding the queue def perform return unless ChatwootApp.chatwoot_cloud? - Account.find_in_batches(batch_size: 100) do |accounts| - accounts.each do |account| - Rails.logger.info "Enqueuing RemoveStaleContactsJob for account #{account.id}" - Internal::RemoveStaleContactsJob.perform_later(account) - end + # Use the day of the month to determine which accounts to process + day_of_month = Date.current.day + remainder = day_of_month % DISTRIBUTION_GROUPS + + # Count total accounts for logging + total_accounts = Account.count + log_message = "ProcessStaleContactsJob: Processing accounts with ID % #{DISTRIBUTION_GROUPS} = " + log_message += "#{remainder} (out of #{total_accounts} total accounts)" + Rails.logger.info log_message + + # Process only accounts where ID % 5 = remainder for today + # This ensures each account is processed approximately once every 5 days + Account.where("id % #{DISTRIBUTION_GROUPS} = ?", remainder).find_each(batch_size: MAX_ACCOUNTS_PER_BATCH) do |account| + Rails.logger.info "Enqueuing RemoveStaleContactsJob for account #{account.id}" + + # Add a small delay between jobs to further reduce queue pressure + delay = rand(1..10).minutes + Internal::RemoveStaleContactsJob.set(wait: delay).perform_later(account) end end end diff --git a/spec/jobs/internal/process_stale_contacts_job_spec.rb b/spec/jobs/internal/process_stale_contacts_job_spec.rb index f7f17bf78..ca873756c 100644 --- a/spec/jobs/internal/process_stale_contacts_job_spec.rb +++ b/spec/jobs/internal/process_stale_contacts_job_spec.rb @@ -3,44 +3,62 @@ require 'rails_helper' RSpec.describe Internal::ProcessStaleContactsJob do subject(:job) { described_class.perform_later } - it 'enqueues the job' do - allow(ChatwootApp).to receive(:chatwoot_cloud?).and_return(true) - expect { job }.to have_enqueued_job(described_class) - .on_queue('housekeeping') + context 'when in cloud environment' do + before do + allow(ChatwootApp).to receive(:chatwoot_cloud?).and_return(true) + end + + it 'processes accounts based on the day of month' do + # Set a fixed day for testing + day_of_month = 16 + remainder = day_of_month % described_class::DISTRIBUTION_GROUPS + allow(Date).to receive(:current).and_return(Date.new(2025, 5, day_of_month)) + + # Create an account and set its ID to match today's pattern + account = create(:account) + allow(account).to receive(:id).and_return(remainder) + + # Mock the Account.where to return our filtered accounts + account_relation = double + allow(Account).to receive(:where).with("id % #{described_class::DISTRIBUTION_GROUPS} = ?", remainder).and_return(account_relation) + allow(account_relation).to receive(:find_each).and_yield(account) + + # Mock the delay setting + allow(Internal::RemoveStaleContactsJob).to receive(:set).and_return(Internal::RemoveStaleContactsJob) + expect(Internal::RemoveStaleContactsJob).to receive(:perform_later).with(account) + + described_class.perform_now + end + + it 'adds a delay between jobs' do + day_of_month = 15 + remainder = day_of_month % described_class::DISTRIBUTION_GROUPS + allow(Date).to receive(:current).and_return(Date.new(2025, 5, day_of_month)) + + account = create(:account) + + account_relation = double + allow(Account).to receive(:where).with("id % #{described_class::DISTRIBUTION_GROUPS} = ?", remainder).and_return(account_relation) + allow(account_relation).to receive(:find_each).and_yield(account) + + expect(Internal::RemoveStaleContactsJob).to receive(:set) do |args| + expect(args[:wait]).to be_between(1.minute, 10.minutes) + Internal::RemoveStaleContactsJob + end + expect(Internal::RemoveStaleContactsJob).to receive(:perform_later).with(account) + + described_class.perform_now + end end - it 'enqueues RemoveStaleContactsJob for each account' do - allow(ChatwootApp).to receive(:chatwoot_cloud?).and_return(true) - account1 = create(:account) - account2 = create(:account) - account3 = create(:account) + context 'when not in cloud environment' do + it 'does not process any accounts' do + allow(ChatwootApp).to receive(:chatwoot_cloud?).and_return(false) - expect { described_class.perform_now }.to have_enqueued_job(Internal::RemoveStaleContactsJob) - .with(account1) - .on_queue('housekeeping') - expect { described_class.perform_now }.to have_enqueued_job(Internal::RemoveStaleContactsJob) - .with(account2) - .on_queue('housekeeping') - expect { described_class.perform_now }.to have_enqueued_job(Internal::RemoveStaleContactsJob) - .with(account3) - .on_queue('housekeeping') - end + expect(Account).not_to receive(:where) + expect(Internal::RemoveStaleContactsJob).not_to receive(:perform_later) - it 'processes accounts in batches' do - allow(ChatwootApp).to receive(:chatwoot_cloud?).and_return(true) - account = create(:account) - allow(Account).to receive(:find_in_batches).with(batch_size: 100).and_yield([account]) - - expect(Internal::RemoveStaleContactsJob).to receive(:perform_later).with(account) - described_class.perform_now - end - - it 'does not process accounts when not in cloud environment' do - allow(ChatwootApp).to receive(:chatwoot_cloud?).and_return(false) - create(:account) - - expect(Account).not_to receive(:find_in_batches) - expect(Internal::RemoveStaleContactsJob).not_to receive(:perform_later) - described_class.perform_now + described_class.perform_now + end end end