From eabdfc81684b467c478bbf6ee238f5fa4ee8f7ec Mon Sep 17 00:00:00 2001 From: Sojan Jose Date: Wed, 22 Oct 2025 20:20:37 -0700 Subject: [PATCH] chore(sidekiq): log ActiveJob class and job_id on dequeue (#12704) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Context Sidekiq logs only showed the Sidekiq wrapper class and JID, which wasn’t helpful when debugging ActiveJobs. ## Changes - Updated `ChatwootDequeuedLogger` to log the actual `ActiveJob class` and `job_id` instead of the generic Sidekiq wrapper and JID. > Example > ``` > Dequeued ActionMailer::MailDeliveryJob 123e4567-e89b-12d3-a456-426614174000 from default > ``` - Remove sidekiq worker and unify everything to `ActiveJob` --- app/jobs/conversation_reply_email_job.rb | 15 ++++++ .../send_email_notification_service.rb | 2 +- .../conversation_reply_email_worker.rb | 29 ------------ config/initializers/sidekiq.rb | 3 +- ...nding_conversations_resolution_job_spec.rb | 27 ++++++----- .../jobs/conversation_reply_email_job_spec.rb | 33 +++++++++++++ spec/models/message_spec.rb | 33 ++++--------- .../send_email_notification_service_spec.rb | 46 +++++++------------ .../conversation_reply_email_worker_spec.rb | 46 ------------------- 9 files changed, 92 insertions(+), 142 deletions(-) create mode 100644 app/jobs/conversation_reply_email_job.rb delete mode 100644 app/workers/conversation_reply_email_worker.rb create mode 100644 spec/jobs/conversation_reply_email_job_spec.rb delete mode 100644 spec/workers/conversation_reply_email_worker_spec.rb diff --git a/app/jobs/conversation_reply_email_job.rb b/app/jobs/conversation_reply_email_job.rb new file mode 100644 index 000000000..5d186bf29 --- /dev/null +++ b/app/jobs/conversation_reply_email_job.rb @@ -0,0 +1,15 @@ +class ConversationReplyEmailJob < ApplicationJob + queue_as :mailers + + def perform(conversation_id, last_queued_id) + conversation = Conversation.find(conversation_id) + + if conversation.messages.incoming&.last&.content_type == 'incoming_email' + ConversationReplyMailer.with(account: conversation.account).reply_without_summary(conversation, last_queued_id).deliver_later + else + ConversationReplyMailer.with(account: conversation.account).reply_with_summary(conversation, last_queued_id).deliver_later + end + + Redis::Alfred.delete(format(::Redis::Alfred::CONVERSATION_MAILER_KEY, conversation_id: conversation.id)) + end +end diff --git a/app/services/messages/send_email_notification_service.rb b/app/services/messages/send_email_notification_service.rb index 87ba1a797..25a77b0d5 100644 --- a/app/services/messages/send_email_notification_service.rb +++ b/app/services/messages/send_email_notification_service.rb @@ -12,7 +12,7 @@ class Messages::SendEmailNotificationService # the worker never manages to clean up. return unless Redis::Alfred.set(conversation_mail_key, message.id, nx: true, ex: 1.hour.to_i) - ConversationReplyEmailWorker.perform_in(2.minutes, conversation.id, message.id) + ConversationReplyEmailJob.set(wait: 2.minutes).perform_later(conversation.id, message.id) end private diff --git a/app/workers/conversation_reply_email_worker.rb b/app/workers/conversation_reply_email_worker.rb deleted file mode 100644 index 0eddecaf7..000000000 --- a/app/workers/conversation_reply_email_worker.rb +++ /dev/null @@ -1,29 +0,0 @@ -# TODO: lets move this to active job, since thats what we use over all -class ConversationReplyEmailWorker - include Sidekiq::Worker - sidekiq_options queue: :mailers - - def perform(conversation_id, last_queued_id) - @conversation = Conversation.find(conversation_id) - - # send the email - if @conversation.messages.incoming&.last&.content_type == 'incoming_email' - ConversationReplyMailer.with(account: @conversation.account).reply_without_summary(@conversation, last_queued_id).deliver_later - else - ConversationReplyMailer.with(account: @conversation.account).reply_with_summary(@conversation, last_queued_id).deliver_later - end - - # delete the redis set from the first new message on the conversation - Redis::Alfred.delete(conversation_mail_key) - end - - private - - def email_inbox? - @conversation.inbox&.inbox_type == 'Email' - end - - def conversation_mail_key - format(::Redis::Alfred::CONVERSATION_MAILER_KEY, conversation_id: @conversation.id) - end -end diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb index 1ada67cc1..04d605c2e 100644 --- a/config/initializers/sidekiq.rb +++ b/config/initializers/sidekiq.rb @@ -9,7 +9,8 @@ end # Logs whenever a job is pulled off Redis for execution. class ChatwootDequeuedLogger def call(_worker, job, queue) - Sidekiq.logger.info("Dequeued #{job['class']} #{job['jid']} from #{queue}") + payload = job['args'].first + Sidekiq.logger.info("Dequeued #{job['wrapped']} #{payload['job_id']} from #{queue}") yield end end diff --git a/spec/enterprise/jobs/captain/inbox_pending_conversations_resolution_job_spec.rb b/spec/enterprise/jobs/captain/inbox_pending_conversations_resolution_job_spec.rb index 40f1ea294..1a8a5a342 100644 --- a/spec/enterprise/jobs/captain/inbox_pending_conversations_resolution_job_spec.rb +++ b/spec/enterprise/jobs/captain/inbox_pending_conversations_resolution_job_spec.rb @@ -1,8 +1,6 @@ require 'rails_helper' RSpec.describe Captain::InboxPendingConversationsResolutionJob, type: :job do - include ActiveJob::TestHelper - let!(:inbox) { create(:inbox) } let!(:resolvable_pending_conversation) { create(:conversation, inbox: inbox, last_activity_at: 2.hours.ago, status: :pending) } @@ -14,6 +12,7 @@ RSpec.describe Captain::InboxPendingConversationsResolutionJob, type: :job do before do create(:captain_inbox, inbox: inbox, captain_assistant: captain_assistant) stub_const('Limits::BULK_ACTIONS_LIMIT', 2) + inbox.reload end it 'queues the job' do @@ -22,7 +21,7 @@ RSpec.describe Captain::InboxPendingConversationsResolutionJob, type: :job do end it 'resolves only the eligible pending conversations' do - perform_enqueued_jobs { described_class.perform_later(inbox) } + described_class.perform_now(inbox) expect(resolvable_pending_conversation.reload.status).to eq('resolved') expect(recent_pending_conversation.reload.status).to eq('pending') @@ -34,7 +33,7 @@ RSpec.describe Captain::InboxPendingConversationsResolutionJob, type: :job do captain_assistant.update!(config: { 'resolution_message' => custom_message }) expect do - perform_enqueued_jobs { described_class.perform_later(inbox) } + described_class.perform_now(inbox) end.to change { resolvable_pending_conversation.messages.outgoing.reload.count }.by(1) outgoing_message = resolvable_pending_conversation.messages.outgoing.last @@ -44,7 +43,7 @@ RSpec.describe Captain::InboxPendingConversationsResolutionJob, type: :job do it 'creates an outgoing message with default auto resolution message if not configured' do captain_assistant.update!(config: {}) - perform_enqueued_jobs { described_class.perform_later(inbox) } + described_class.perform_now(inbox) outgoing_message = resolvable_pending_conversation.messages.outgoing.last expect(outgoing_message.content).to eq( I18n.t('conversations.activity.auto_resolution_message') @@ -52,11 +51,17 @@ RSpec.describe Captain::InboxPendingConversationsResolutionJob, type: :job do end it 'adds the correct activity message after resolution by Captain' do - perform_enqueued_jobs { described_class.perform_later(inbox) } - activity_message = resolvable_pending_conversation.messages.activity.last - expect(activity_message).not_to be_nil - expect(activity_message.content).to eq( - I18n.t('conversations.activity.captain.resolved', user_name: captain_assistant.name) - ) + described_class.perform_now(inbox) + expected_content = I18n.t('conversations.activity.captain.resolved', user_name: captain_assistant.name) + expect(Conversations::ActivityMessageJob) + .to have_been_enqueued.with( + resolvable_pending_conversation, + { + account_id: resolvable_pending_conversation.account_id, + inbox_id: resolvable_pending_conversation.inbox_id, + message_type: :activity, + content: expected_content + } + ) end end diff --git a/spec/jobs/conversation_reply_email_job_spec.rb b/spec/jobs/conversation_reply_email_job_spec.rb new file mode 100644 index 000000000..32758c48e --- /dev/null +++ b/spec/jobs/conversation_reply_email_job_spec.rb @@ -0,0 +1,33 @@ +require 'rails_helper' + +RSpec.describe ConversationReplyEmailJob, type: :job do + let(:conversation) { create(:conversation) } + let(:mailer) { double } + let(:mailer_action) { double } + + before do + allow(Conversation).to receive(:find).and_return(conversation) + allow(ConversationReplyMailer).to receive(:with).and_return(mailer) + allow(mailer).to receive(:reply_with_summary).and_return(mailer_action) + allow(mailer).to receive(:reply_without_summary).and_return(mailer_action) + allow(mailer_action).to receive(:deliver_later).and_return(true) + end + + it 'enqueues on mailers queue' do + ActiveJob::Base.queue_adapter = :test + expect do + described_class.perform_later(conversation.id, 123) + end.to have_enqueued_job(described_class).on_queue('mailers') + end + + it 'calls reply_with_summary when last incoming message was not email' do + described_class.perform_now(conversation.id, 123) + expect(mailer).to have_received(:reply_with_summary) + end + + it 'calls reply_without_summary when last incoming message was email' do + create(:message, conversation: conversation, message_type: :incoming, content_type: 'incoming_email') + described_class.perform_now(conversation.id, 123) + expect(mailer).to have_received(:reply_without_summary) + end +end diff --git a/spec/models/message_spec.rb b/spec/models/message_spec.rb index 49a1d1e51..9cc43d0fd 100644 --- a/spec/models/message_spec.rb +++ b/spec/models/message_spec.rb @@ -327,14 +327,11 @@ RSpec.describe Message do message.conversation.contact.update!(email: 'test@example.com') message.message_type = 'outgoing' - # Perform jobs inline to test full integration - perform_enqueued_jobs do - message.save! + ActiveJob::Base.queue_adapter = :test + allow(Redis::Alfred).to receive(:set).and_return(true) + perform_enqueued_jobs(only: SendReplyJob) do + expect { message.save! }.to have_enqueued_job(ConversationReplyEmailJob).with(message.conversation.id, kind_of(Integer)).on_queue('mailers') end - - # Verify the email worker is eventually scheduled through the service - jobs_for_conversation_count = ConversationReplyEmailWorker.jobs.count { |job| job['args'].first == message.conversation.id } - expect(jobs_for_conversation_count).to eq(1) end it 'does not schedule email for website channel if continuity is disabled' do @@ -345,15 +342,8 @@ RSpec.describe Message do message.conversation.contact.update!(email: 'test@example.com') message.message_type = 'outgoing' - initial_job_count = ConversationReplyEmailWorker.jobs.count { |job| job['args'].first == message.conversation.id } - - perform_enqueued_jobs do - message.save! - end - - # No new jobs should be scheduled for this conversation - jobs_for_conversation_count = ConversationReplyEmailWorker.jobs.count { |job| job['args'].first == message.conversation.id } - expect(jobs_for_conversation_count).to eq(initial_job_count) + ActiveJob::Base.queue_adapter = :test + expect { message.save! }.not_to have_enqueued_job(ConversationReplyEmailJob) end it 'does not schedule email for private notes' do @@ -363,15 +353,8 @@ RSpec.describe Message do message.private = true message.message_type = 'outgoing' - initial_job_count = ConversationReplyEmailWorker.jobs.count { |job| job['args'].first == message.conversation.id } - - perform_enqueued_jobs do - message.save! - end - - # No new jobs should be scheduled for this conversation - jobs_for_conversation_count = ConversationReplyEmailWorker.jobs.count { |job| job['args'].first == message.conversation.id } - expect(jobs_for_conversation_count).to eq(initial_job_count) + ActiveJob::Base.queue_adapter = :test + expect { message.save! }.not_to have_enqueued_job(ConversationReplyEmailJob) end it 'calls SendReplyJob for all channels' do diff --git a/spec/services/messages/send_email_notification_service_spec.rb b/spec/services/messages/send_email_notification_service_spec.rb index cda728f21..7c0970fe1 100644 --- a/spec/services/messages/send_email_notification_service_spec.rb +++ b/spec/services/messages/send_email_notification_service_spec.rb @@ -14,17 +14,11 @@ describe Messages::SendEmailNotificationService do before do conversation.contact.update!(email: 'test@example.com') allow(Redis::Alfred).to receive(:set).and_return(true) - allow(ConversationReplyEmailWorker).to receive(:perform_in) + ActiveJob::Base.queue_adapter = :test end - it 'schedules ConversationReplyEmailWorker' do - service.perform - - expect(ConversationReplyEmailWorker).to have_received(:perform_in).with( - 2.minutes, - conversation.id, - message.id - ) + it 'enqueues ConversationReplyEmailJob' do + expect { service.perform }.to have_enqueued_job(ConversationReplyEmailJob).with(conversation.id, message.id).on_queue('mailers') end it 'atomically sets redis key to prevent duplicate emails' do @@ -40,10 +34,8 @@ describe Messages::SendEmailNotificationService do allow(Redis::Alfred).to receive(:set).and_return(false) end - it 'does not schedule worker' do - service.perform - - expect(ConversationReplyEmailWorker).not_to have_received(:perform_in) + it 'does not enqueue job' do + expect { service.perform }.not_to have_enqueued_job(ConversationReplyEmailJob) end it 'attempts atomic set once' do @@ -62,7 +54,7 @@ describe Messages::SendEmailNotificationService do conversation.contact.update!(email: 'test@example.com') end - it 'prevents duplicate workers under race conditions' do + it 'prevents duplicate jobs under race conditions' do # Create 5 threads that simultaneously try to enqueue workers for the same conversation threads = Array.new(5) do Thread.new do @@ -73,24 +65,24 @@ describe Messages::SendEmailNotificationService do threads.each(&:join) - # Only ONE worker should be scheduled despite 5 concurrent attempts - jobs_for_conversation = ConversationReplyEmailWorker.jobs.select { |job| job['args'].first == conversation.id } + # Only ONE job should be scheduled despite 5 concurrent attempts + jobs_for_conversation = ActiveJob::Base.queue_adapter.enqueued_jobs.select do |job| + job[:job] == ConversationReplyEmailJob && job[:args].first == conversation.id + end expect(jobs_for_conversation.size).to eq(1) end end context 'when email notification should not be sent' do before do - allow(ConversationReplyEmailWorker).to receive(:perform_in) + ActiveJob::Base.queue_adapter = :test end context 'when message is not email notifiable' do let(:message) { create(:message, conversation: conversation, message_type: 'incoming') } - it 'does not schedule worker' do - service.perform - - expect(ConversationReplyEmailWorker).not_to have_received(:perform_in) + it 'does not enqueue job' do + expect { service.perform }.not_to have_enqueued_job(ConversationReplyEmailJob) end end @@ -102,10 +94,8 @@ describe Messages::SendEmailNotificationService do conversation.contact.update!(email: nil) end - it 'does not schedule worker' do - service.perform - - expect(ConversationReplyEmailWorker).not_to have_received(:perform_in) + it 'does not enqueue job' do + expect { service.perform }.not_to have_enqueued_job(ConversationReplyEmailJob) end end @@ -117,10 +107,8 @@ describe Messages::SendEmailNotificationService do conversation.contact.update!(email: 'test@example.com') end - it 'does not schedule worker' do - service.perform - - expect(ConversationReplyEmailWorker).not_to have_received(:perform_in) + it 'does not enqueue job' do + expect { service.perform }.not_to have_enqueued_job(ConversationReplyEmailJob) end end end diff --git a/spec/workers/conversation_reply_email_worker_spec.rb b/spec/workers/conversation_reply_email_worker_spec.rb deleted file mode 100644 index 7b28eedde..000000000 --- a/spec/workers/conversation_reply_email_worker_spec.rb +++ /dev/null @@ -1,46 +0,0 @@ -require 'rails_helper' - -Sidekiq::Testing.fake! -RSpec.describe ConversationReplyEmailWorker, type: :worker do - let(:conversation) { build(:conversation, display_id: nil) } - let(:message) { build(:message, conversation: conversation, content_type: 'incoming_email', inbox: conversation.inbox) } - let(:mailer) { double } - let(:mailer_action) { double } - - describe 'testing ConversationSummaryEmailWorker' do - before do - conversation.save! - allow(Conversation).to receive(:find).and_return(conversation) - allow(ConversationReplyMailer).to receive(:with).and_return(mailer) - allow(ConversationReplyMailer).to receive(:with).and_return(mailer) - allow(mailer).to receive(:reply_with_summary).and_return(mailer_action) - allow(mailer).to receive(:reply_without_summary).and_return(mailer_action) - allow(mailer_action).to receive(:deliver_later).and_return(true) - end - - it 'worker jobs are enqueued in the mailers queue' do - described_class.perform_async - expect(described_class.queue).to eq(:mailers) - end - - it 'goes into the jobs array for testing environment' do - expect do - described_class.perform_async - end.to change(described_class.jobs, :size).by(1) - described_class.new.perform(1, message.id) - end - - context 'with actions performed by the worker' do - it 'calls ConversationSummaryMailer#reply_with_summary when last incoming message was not email' do - described_class.new.perform(1, message.id) - expect(mailer).to have_received(:reply_with_summary) - end - - it 'calls ConversationSummaryMailer#reply_without_summary when last incoming message was from email' do - message.save! - described_class.new.perform(1, message.id) - expect(mailer).to have_received(:reply_without_summary) - end - end - end -end