chore(sidekiq): log ActiveJob class and job_id on dequeue (#12704)

## Context

Sidekiq logs only showed the Sidekiq wrapper class and JID, which wasn’t
helpful when debugging ActiveJobs.

## Changes

- Updated `ChatwootDequeuedLogger` to log the actual `ActiveJob class`
and `job_id` instead of the generic Sidekiq wrapper and JID.

> Example
> ```
> Dequeued ActionMailer::MailDeliveryJob
123e4567-e89b-12d3-a456-426614174000 from default
> ```

- Remove sidekiq worker and unify everything to `ActiveJob`
This commit is contained in:
Sojan Jose
2025-10-22 20:20:37 -07:00
committed by GitHub
parent 9bd5f15450
commit eabdfc8168
9 changed files with 92 additions and 142 deletions

View File

@@ -0,0 +1,15 @@
class ConversationReplyEmailJob < ApplicationJob
queue_as :mailers
def perform(conversation_id, last_queued_id)
conversation = Conversation.find(conversation_id)
if conversation.messages.incoming&.last&.content_type == 'incoming_email'
ConversationReplyMailer.with(account: conversation.account).reply_without_summary(conversation, last_queued_id).deliver_later
else
ConversationReplyMailer.with(account: conversation.account).reply_with_summary(conversation, last_queued_id).deliver_later
end
Redis::Alfred.delete(format(::Redis::Alfred::CONVERSATION_MAILER_KEY, conversation_id: conversation.id))
end
end

View File

@@ -12,7 +12,7 @@ class Messages::SendEmailNotificationService
# the worker never manages to clean up.
return unless Redis::Alfred.set(conversation_mail_key, message.id, nx: true, ex: 1.hour.to_i)
ConversationReplyEmailWorker.perform_in(2.minutes, conversation.id, message.id)
ConversationReplyEmailJob.set(wait: 2.minutes).perform_later(conversation.id, message.id)
end
private

View File

@@ -1,29 +0,0 @@
# TODO: lets move this to active job, since thats what we use over all
class ConversationReplyEmailWorker
include Sidekiq::Worker
sidekiq_options queue: :mailers
def perform(conversation_id, last_queued_id)
@conversation = Conversation.find(conversation_id)
# send the email
if @conversation.messages.incoming&.last&.content_type == 'incoming_email'
ConversationReplyMailer.with(account: @conversation.account).reply_without_summary(@conversation, last_queued_id).deliver_later
else
ConversationReplyMailer.with(account: @conversation.account).reply_with_summary(@conversation, last_queued_id).deliver_later
end
# delete the redis set from the first new message on the conversation
Redis::Alfred.delete(conversation_mail_key)
end
private
def email_inbox?
@conversation.inbox&.inbox_type == 'Email'
end
def conversation_mail_key
format(::Redis::Alfred::CONVERSATION_MAILER_KEY, conversation_id: @conversation.id)
end
end

View File

@@ -9,7 +9,8 @@ end
# Logs whenever a job is pulled off Redis for execution.
class ChatwootDequeuedLogger
def call(_worker, job, queue)
Sidekiq.logger.info("Dequeued #{job['class']} #{job['jid']} from #{queue}")
payload = job['args'].first
Sidekiq.logger.info("Dequeued #{job['wrapped']} #{payload['job_id']} from #{queue}")
yield
end
end

View File

@@ -1,8 +1,6 @@
require 'rails_helper'
RSpec.describe Captain::InboxPendingConversationsResolutionJob, type: :job do
include ActiveJob::TestHelper
let!(:inbox) { create(:inbox) }
let!(:resolvable_pending_conversation) { create(:conversation, inbox: inbox, last_activity_at: 2.hours.ago, status: :pending) }
@@ -14,6 +12,7 @@ RSpec.describe Captain::InboxPendingConversationsResolutionJob, type: :job do
before do
create(:captain_inbox, inbox: inbox, captain_assistant: captain_assistant)
stub_const('Limits::BULK_ACTIONS_LIMIT', 2)
inbox.reload
end
it 'queues the job' do
@@ -22,7 +21,7 @@ RSpec.describe Captain::InboxPendingConversationsResolutionJob, type: :job do
end
it 'resolves only the eligible pending conversations' do
perform_enqueued_jobs { described_class.perform_later(inbox) }
described_class.perform_now(inbox)
expect(resolvable_pending_conversation.reload.status).to eq('resolved')
expect(recent_pending_conversation.reload.status).to eq('pending')
@@ -34,7 +33,7 @@ RSpec.describe Captain::InboxPendingConversationsResolutionJob, type: :job do
captain_assistant.update!(config: { 'resolution_message' => custom_message })
expect do
perform_enqueued_jobs { described_class.perform_later(inbox) }
described_class.perform_now(inbox)
end.to change { resolvable_pending_conversation.messages.outgoing.reload.count }.by(1)
outgoing_message = resolvable_pending_conversation.messages.outgoing.last
@@ -44,7 +43,7 @@ RSpec.describe Captain::InboxPendingConversationsResolutionJob, type: :job do
it 'creates an outgoing message with default auto resolution message if not configured' do
captain_assistant.update!(config: {})
perform_enqueued_jobs { described_class.perform_later(inbox) }
described_class.perform_now(inbox)
outgoing_message = resolvable_pending_conversation.messages.outgoing.last
expect(outgoing_message.content).to eq(
I18n.t('conversations.activity.auto_resolution_message')
@@ -52,11 +51,17 @@ RSpec.describe Captain::InboxPendingConversationsResolutionJob, type: :job do
end
it 'adds the correct activity message after resolution by Captain' do
perform_enqueued_jobs { described_class.perform_later(inbox) }
activity_message = resolvable_pending_conversation.messages.activity.last
expect(activity_message).not_to be_nil
expect(activity_message.content).to eq(
I18n.t('conversations.activity.captain.resolved', user_name: captain_assistant.name)
)
described_class.perform_now(inbox)
expected_content = I18n.t('conversations.activity.captain.resolved', user_name: captain_assistant.name)
expect(Conversations::ActivityMessageJob)
.to have_been_enqueued.with(
resolvable_pending_conversation,
{
account_id: resolvable_pending_conversation.account_id,
inbox_id: resolvable_pending_conversation.inbox_id,
message_type: :activity,
content: expected_content
}
)
end
end

View File

@@ -0,0 +1,33 @@
require 'rails_helper'
RSpec.describe ConversationReplyEmailJob, type: :job do
let(:conversation) { create(:conversation) }
let(:mailer) { double }
let(:mailer_action) { double }
before do
allow(Conversation).to receive(:find).and_return(conversation)
allow(ConversationReplyMailer).to receive(:with).and_return(mailer)
allow(mailer).to receive(:reply_with_summary).and_return(mailer_action)
allow(mailer).to receive(:reply_without_summary).and_return(mailer_action)
allow(mailer_action).to receive(:deliver_later).and_return(true)
end
it 'enqueues on mailers queue' do
ActiveJob::Base.queue_adapter = :test
expect do
described_class.perform_later(conversation.id, 123)
end.to have_enqueued_job(described_class).on_queue('mailers')
end
it 'calls reply_with_summary when last incoming message was not email' do
described_class.perform_now(conversation.id, 123)
expect(mailer).to have_received(:reply_with_summary)
end
it 'calls reply_without_summary when last incoming message was email' do
create(:message, conversation: conversation, message_type: :incoming, content_type: 'incoming_email')
described_class.perform_now(conversation.id, 123)
expect(mailer).to have_received(:reply_without_summary)
end
end

View File

@@ -327,14 +327,11 @@ RSpec.describe Message do
message.conversation.contact.update!(email: 'test@example.com')
message.message_type = 'outgoing'
# Perform jobs inline to test full integration
perform_enqueued_jobs do
message.save!
ActiveJob::Base.queue_adapter = :test
allow(Redis::Alfred).to receive(:set).and_return(true)
perform_enqueued_jobs(only: SendReplyJob) do
expect { message.save! }.to have_enqueued_job(ConversationReplyEmailJob).with(message.conversation.id, kind_of(Integer)).on_queue('mailers')
end
# Verify the email worker is eventually scheduled through the service
jobs_for_conversation_count = ConversationReplyEmailWorker.jobs.count { |job| job['args'].first == message.conversation.id }
expect(jobs_for_conversation_count).to eq(1)
end
it 'does not schedule email for website channel if continuity is disabled' do
@@ -345,15 +342,8 @@ RSpec.describe Message do
message.conversation.contact.update!(email: 'test@example.com')
message.message_type = 'outgoing'
initial_job_count = ConversationReplyEmailWorker.jobs.count { |job| job['args'].first == message.conversation.id }
perform_enqueued_jobs do
message.save!
end
# No new jobs should be scheduled for this conversation
jobs_for_conversation_count = ConversationReplyEmailWorker.jobs.count { |job| job['args'].first == message.conversation.id }
expect(jobs_for_conversation_count).to eq(initial_job_count)
ActiveJob::Base.queue_adapter = :test
expect { message.save! }.not_to have_enqueued_job(ConversationReplyEmailJob)
end
it 'does not schedule email for private notes' do
@@ -363,15 +353,8 @@ RSpec.describe Message do
message.private = true
message.message_type = 'outgoing'
initial_job_count = ConversationReplyEmailWorker.jobs.count { |job| job['args'].first == message.conversation.id }
perform_enqueued_jobs do
message.save!
end
# No new jobs should be scheduled for this conversation
jobs_for_conversation_count = ConversationReplyEmailWorker.jobs.count { |job| job['args'].first == message.conversation.id }
expect(jobs_for_conversation_count).to eq(initial_job_count)
ActiveJob::Base.queue_adapter = :test
expect { message.save! }.not_to have_enqueued_job(ConversationReplyEmailJob)
end
it 'calls SendReplyJob for all channels' do

View File

@@ -14,17 +14,11 @@ describe Messages::SendEmailNotificationService do
before do
conversation.contact.update!(email: 'test@example.com')
allow(Redis::Alfred).to receive(:set).and_return(true)
allow(ConversationReplyEmailWorker).to receive(:perform_in)
ActiveJob::Base.queue_adapter = :test
end
it 'schedules ConversationReplyEmailWorker' do
service.perform
expect(ConversationReplyEmailWorker).to have_received(:perform_in).with(
2.minutes,
conversation.id,
message.id
)
it 'enqueues ConversationReplyEmailJob' do
expect { service.perform }.to have_enqueued_job(ConversationReplyEmailJob).with(conversation.id, message.id).on_queue('mailers')
end
it 'atomically sets redis key to prevent duplicate emails' do
@@ -40,10 +34,8 @@ describe Messages::SendEmailNotificationService do
allow(Redis::Alfred).to receive(:set).and_return(false)
end
it 'does not schedule worker' do
service.perform
expect(ConversationReplyEmailWorker).not_to have_received(:perform_in)
it 'does not enqueue job' do
expect { service.perform }.not_to have_enqueued_job(ConversationReplyEmailJob)
end
it 'attempts atomic set once' do
@@ -62,7 +54,7 @@ describe Messages::SendEmailNotificationService do
conversation.contact.update!(email: 'test@example.com')
end
it 'prevents duplicate workers under race conditions' do
it 'prevents duplicate jobs under race conditions' do
# Create 5 threads that simultaneously try to enqueue workers for the same conversation
threads = Array.new(5) do
Thread.new do
@@ -73,24 +65,24 @@ describe Messages::SendEmailNotificationService do
threads.each(&:join)
# Only ONE worker should be scheduled despite 5 concurrent attempts
jobs_for_conversation = ConversationReplyEmailWorker.jobs.select { |job| job['args'].first == conversation.id }
# Only ONE job should be scheduled despite 5 concurrent attempts
jobs_for_conversation = ActiveJob::Base.queue_adapter.enqueued_jobs.select do |job|
job[:job] == ConversationReplyEmailJob && job[:args].first == conversation.id
end
expect(jobs_for_conversation.size).to eq(1)
end
end
context 'when email notification should not be sent' do
before do
allow(ConversationReplyEmailWorker).to receive(:perform_in)
ActiveJob::Base.queue_adapter = :test
end
context 'when message is not email notifiable' do
let(:message) { create(:message, conversation: conversation, message_type: 'incoming') }
it 'does not schedule worker' do
service.perform
expect(ConversationReplyEmailWorker).not_to have_received(:perform_in)
it 'does not enqueue job' do
expect { service.perform }.not_to have_enqueued_job(ConversationReplyEmailJob)
end
end
@@ -102,10 +94,8 @@ describe Messages::SendEmailNotificationService do
conversation.contact.update!(email: nil)
end
it 'does not schedule worker' do
service.perform
expect(ConversationReplyEmailWorker).not_to have_received(:perform_in)
it 'does not enqueue job' do
expect { service.perform }.not_to have_enqueued_job(ConversationReplyEmailJob)
end
end
@@ -117,10 +107,8 @@ describe Messages::SendEmailNotificationService do
conversation.contact.update!(email: 'test@example.com')
end
it 'does not schedule worker' do
service.perform
expect(ConversationReplyEmailWorker).not_to have_received(:perform_in)
it 'does not enqueue job' do
expect { service.perform }.not_to have_enqueued_job(ConversationReplyEmailJob)
end
end
end

View File

@@ -1,46 +0,0 @@
require 'rails_helper'
Sidekiq::Testing.fake!
RSpec.describe ConversationReplyEmailWorker, type: :worker do
let(:conversation) { build(:conversation, display_id: nil) }
let(:message) { build(:message, conversation: conversation, content_type: 'incoming_email', inbox: conversation.inbox) }
let(:mailer) { double }
let(:mailer_action) { double }
describe 'testing ConversationSummaryEmailWorker' do
before do
conversation.save!
allow(Conversation).to receive(:find).and_return(conversation)
allow(ConversationReplyMailer).to receive(:with).and_return(mailer)
allow(ConversationReplyMailer).to receive(:with).and_return(mailer)
allow(mailer).to receive(:reply_with_summary).and_return(mailer_action)
allow(mailer).to receive(:reply_without_summary).and_return(mailer_action)
allow(mailer_action).to receive(:deliver_later).and_return(true)
end
it 'worker jobs are enqueued in the mailers queue' do
described_class.perform_async
expect(described_class.queue).to eq(:mailers)
end
it 'goes into the jobs array for testing environment' do
expect do
described_class.perform_async
end.to change(described_class.jobs, :size).by(1)
described_class.new.perform(1, message.id)
end
context 'with actions performed by the worker' do
it 'calls ConversationSummaryMailer#reply_with_summary when last incoming message was not email' do
described_class.new.perform(1, message.id)
expect(mailer).to have_received(:reply_with_summary)
end
it 'calls ConversationSummaryMailer#reply_without_summary when last incoming message was from email' do
message.save!
described_class.new.perform(1, message.id)
expect(mailer).to have_received(:reply_without_summary)
end
end
end
end