From 3760f206e82dd01db8490923bf89b23f337b5c26 Mon Sep 17 00:00:00 2001 From: Shivam Mishra Date: Wed, 24 Jan 2024 15:48:21 +0530 Subject: [PATCH] fix: mutex timeout and error handling (#8770) Fixes the follow cases - The ensure block released the lock even on LockAcquisitionError - Custom timeout was not allowed This also refactored the with_lock method, now the key has to be constructed in the parent function itself Co-authored-by: Sojan Jose --- app/jobs/inboxes/fetch_imap_emails_job.rb | 3 +- app/jobs/mutex_application_job.rb | 32 +++++++++++++++------ app/jobs/send_on_slack_job.rb | 3 +- app/jobs/webhooks/facebook_events_job.rb | 3 +- app/jobs/webhooks/instagram_events_job.rb | 3 +- spec/jobs/mutex_application_job_spec.rb | 35 ++++++++++++++--------- 6 files changed, 54 insertions(+), 25 deletions(-) diff --git a/app/jobs/inboxes/fetch_imap_emails_job.rb b/app/jobs/inboxes/fetch_imap_emails_job.rb index 8f61e54b5..0f7718701 100644 --- a/app/jobs/inboxes/fetch_imap_emails_job.rb +++ b/app/jobs/inboxes/fetch_imap_emails_job.rb @@ -6,7 +6,8 @@ class Inboxes::FetchImapEmailsJob < MutexApplicationJob def perform(channel) return unless should_fetch_email?(channel) - with_lock(::Redis::Alfred::EMAIL_MESSAGE_MUTEX, inbox_id: channel.inbox.id) do + key = format(::Redis::Alfred::EMAIL_MESSAGE_MUTEX, inbox_id: channel.inbox.id) + with_lock(key, 5.minutes) do process_email_for_channel(channel) end rescue *ExceptionList::IMAP_EXCEPTIONS => e diff --git a/app/jobs/mutex_application_job.rb b/app/jobs/mutex_application_job.rb index 98e6bf9cd..58c7cbf36 100644 --- a/app/jobs/mutex_application_job.rb +++ b/app/jobs/mutex_application_job.rb @@ -14,20 +14,36 @@ class MutexApplicationJob < ApplicationJob class LockAcquisitionError < StandardError; end - def with_lock(key_format, *args) - lock_key = format(key_format, *args) + def with_lock(lock_key, timeout = Redis::LockManager::LOCK_TIMEOUT) lock_manager = Redis::LockManager.new begin - if lock_manager.lock(lock_key) - Rails.logger.info "[#{self.class.name}] Acquired lock for: #{lock_key} on attempt #{executions}" + if lock_manager.lock(lock_key, timeout) + log_attempt(lock_key, executions) yield + # release the lock after the block has been executed + lock_manager.unlock(lock_key) else - Rails.logger.warn "[#{self.class.name}] Failed to acquire lock on attempt #{executions}: #{lock_key}" - raise LockAcquisitionError, "Failed to acquire lock for key: #{lock_key}" + handle_failed_lock_acquisition(lock_key) end - ensure - lock_manager.unlock(lock_key) + rescue StandardError => e + handle_error(e, lock_manager, lock_key) end end + + private + + def log_attempt(lock_key, executions) + Rails.logger.info "[#{self.class.name}] Acquired lock for: #{lock_key} on attempt #{executions}" + end + + def handle_error(err, lock_manager, lock_key) + lock_manager.unlock(lock_key) unless err.is_a?(LockAcquisitionError) + raise err + end + + def handle_failed_lock_acquisition(lock_key) + Rails.logger.warn "[#{self.class.name}] Failed to acquire lock on attempt #{executions}: #{lock_key}" + raise LockAcquisitionError, "Failed to acquire lock for key: #{lock_key}" + end end diff --git a/app/jobs/send_on_slack_job.rb b/app/jobs/send_on_slack_job.rb index eececa409..c8a556ce7 100644 --- a/app/jobs/send_on_slack_job.rb +++ b/app/jobs/send_on_slack_job.rb @@ -3,7 +3,8 @@ class SendOnSlackJob < MutexApplicationJob retry_on LockAcquisitionError, wait: 1.second, attempts: 8 def perform(message, hook) - with_lock(::Redis::Alfred::SLACK_MESSAGE_MUTEX, conversation_id: message.conversation_id, reference_id: hook.reference_id) do + key = format(::Redis::Alfred::SLACK_MESSAGE_MUTEX, conversation_id: message.conversation_id, reference_id: hook.reference_id) + with_lock(key) do Integrations::Slack::SendOnSlackService.new(message: message, hook: hook).perform end end diff --git a/app/jobs/webhooks/facebook_events_job.rb b/app/jobs/webhooks/facebook_events_job.rb index 4240d62b9..0694a2b85 100644 --- a/app/jobs/webhooks/facebook_events_job.rb +++ b/app/jobs/webhooks/facebook_events_job.rb @@ -5,7 +5,8 @@ class Webhooks::FacebookEventsJob < MutexApplicationJob def perform(message) response = ::Integrations::Facebook::MessageParser.new(message) - with_lock(::Redis::Alfred::FACEBOOK_MESSAGE_MUTEX, sender_id: response.sender_id, recipient_id: response.recipient_id) do + key = format(::Redis::Alfred::FACEBOOK_MESSAGE_MUTEX, sender_id: response.sender_id, recipient_id: response.recipient_id) + with_lock(key) do process_message(response) end end diff --git a/app/jobs/webhooks/instagram_events_job.rb b/app/jobs/webhooks/instagram_events_job.rb index 024ba4a23..825f220a7 100644 --- a/app/jobs/webhooks/instagram_events_job.rb +++ b/app/jobs/webhooks/instagram_events_job.rb @@ -12,7 +12,8 @@ class Webhooks::InstagramEventsJob < MutexApplicationJob def perform(entries) @entries = entries - with_lock(::Redis::Alfred::IG_MESSAGE_MUTEX, sender_id: sender_id, ig_account_id: ig_account_id) do + key = format(::Redis::Alfred::IG_MESSAGE_MUTEX, sender_id: sender_id, ig_account_id: ig_account_id) + with_lock(key) do process_entries(entries) end end diff --git a/spec/jobs/mutex_application_job_spec.rb b/spec/jobs/mutex_application_job_spec.rb index 919195c32..b62db00f0 100644 --- a/spec/jobs/mutex_application_job_spec.rb +++ b/spec/jobs/mutex_application_job_spec.rb @@ -4,16 +4,6 @@ RSpec.describe MutexApplicationJob do let(:lock_manager) { instance_double(Redis::LockManager) } let(:lock_key) { 'test_key' } - let(:test_mutex_job_class) do - stub_const('TestMutexJob', Class.new(MutexApplicationJob) do - def perform - with_lock('test_key') do - # Do nothing - end - end - end) - end - before do allow(Redis::LockManager).to receive(:new).and_return(lock_manager) allow(lock_manager).to receive(:lock).and_return(true) @@ -22,24 +12,43 @@ RSpec.describe MutexApplicationJob do describe '#with_lock' do it 'acquires the lock and yields the block if lock is not acquired' do - expect(lock_manager).to receive(:lock).with(lock_key).and_return(true) + expect(lock_manager).to receive(:lock).with(lock_key, Redis::LockManager::LOCK_TIMEOUT).and_return(true) expect(lock_manager).to receive(:unlock).with(lock_key).and_return(true) expect { |b| described_class.new.send(:with_lock, lock_key, &b) }.to yield_control end + it 'acquires the lock with custom timeout' do + expect(lock_manager).to receive(:lock).with(lock_key, 5.seconds).and_return(true) + expect(lock_manager).to receive(:unlock).with(lock_key).and_return(true) + + expect { |b| described_class.new.send(:with_lock, lock_key, 5.seconds, &b) }.to yield_control + end + it 'raises LockAcquisitionError if it cannot acquire the lock' do - allow(lock_manager).to receive(:lock).with(lock_key).and_return(false) + allow(lock_manager).to receive(:lock).with(lock_key, Redis::LockManager::LOCK_TIMEOUT).and_return(false) expect do described_class.new.send(:with_lock, lock_key) do # Do nothing end end.to raise_error(MutexApplicationJob::LockAcquisitionError) + expect(lock_manager).not_to receive(:unlock) + end + + it 'raises StandardError if it execution raises it' do + allow(lock_manager).to receive(:lock).with(lock_key, Redis::LockManager::LOCK_TIMEOUT).and_return(false) + allow(lock_manager).to receive(:unlock).with(lock_key).and_return(true) + + expect do + described_class.new.send(:with_lock, lock_key) do + raise StandardError + end + end.to raise_error(StandardError) end it 'ensures that the lock is released even if there is an error during block execution' do - expect(lock_manager).to receive(:lock).with(lock_key).and_return(true) + expect(lock_manager).to receive(:lock).with(lock_key, Redis::LockManager::LOCK_TIMEOUT).and_return(true) expect(lock_manager).to receive(:unlock).with(lock_key).and_return(true) expect do