feat: implement mutex for SlackSendJob (#7783)
This commit is contained in:
35
app/jobs/mutex_application_job.rb
Normal file
35
app/jobs/mutex_application_job.rb
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
# MutexApplicationJob serves as a base class for jobs that require distributed locking mechanisms.
|
||||||
|
# It abstracts the locking logic using Redis and ensures that a block of code can be executed with
|
||||||
|
# mutual exclusion.
|
||||||
|
#
|
||||||
|
# The primary mechanism provided is the `with_lock` method, which accepts a key format and associated
|
||||||
|
# arguments. This method attempts to acquire a lock using the generated key, and if successful, it
|
||||||
|
# executes the provided block of code. If the lock cannot be acquired, it raises a LockAcquisitionError.
|
||||||
|
#
|
||||||
|
# To use this class, inherit from MutexApplicationJob and make use of the `with_lock` method in the
|
||||||
|
# `perform` method of the derived job class.
|
||||||
|
#
|
||||||
|
# Also see, retry mechanism here: https://edgeapi.rubyonrails.org/classes/ActiveJob/Exceptions/ClassMethods.html#method-i-retry_on
|
||||||
|
#
|
||||||
|
class MutexApplicationJob < ApplicationJob
|
||||||
|
class LockAcquisitionError < StandardError; end
|
||||||
|
|
||||||
|
def with_lock(key_format, *args)
|
||||||
|
lock_key = format(key_format, *args)
|
||||||
|
lock_manager = Redis::LockManager.new
|
||||||
|
|
||||||
|
if lock_manager.locked?(lock_key)
|
||||||
|
Rails.logger.warn "[#{self.class.name}] Failed to acquire lock on attempt #{executions}: #{lock_key}"
|
||||||
|
raise LockAcquisitionError, "Failed to acquire lock for key: #{lock_key}"
|
||||||
|
end
|
||||||
|
|
||||||
|
begin
|
||||||
|
lock_manager.lock(lock_key)
|
||||||
|
Rails.logger.info "[#{self.class.name}] Acquired lock for: #{lock_key} on attempt #{executions}"
|
||||||
|
yield
|
||||||
|
ensure
|
||||||
|
# Ensure that the lock is released even if there's an error in processing
|
||||||
|
lock_manager.unlock(lock_key)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
@@ -1,7 +1,10 @@
|
|||||||
class SendOnSlackJob < ApplicationJob
|
class SendOnSlackJob < MutexApplicationJob
|
||||||
queue_as :medium
|
queue_as :medium
|
||||||
|
retry_on LockAcquisitionError, wait: 1.second, attempts: 6
|
||||||
|
|
||||||
def perform(message, hook)
|
def perform(message, hook)
|
||||||
|
with_lock(::Redis::Alfred::SLACK_MESSAGE_MUTEX, sender_id: message.sender_id, reference_id: hook.reference_id) do
|
||||||
Integrations::Slack::SendOnSlackService.new(message: message, hook: hook).perform
|
Integrations::Slack::SendOnSlackService.new(message: message, hook: hook).perform
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
end
|
||||||
|
|||||||
@@ -1,28 +1,16 @@
|
|||||||
class Webhooks::FacebookEventsJob < ApplicationJob
|
class Webhooks::FacebookEventsJob < MutexApplicationJob
|
||||||
class LockAcquisitionError < StandardError; end
|
|
||||||
|
|
||||||
queue_as :default
|
queue_as :default
|
||||||
# https://edgeapi.rubyonrails.org/classes/ActiveJob/Exceptions/ClassMethods.html#method-i-retry_on
|
|
||||||
retry_on LockAcquisitionError, wait: 1.second, attempts: 6
|
retry_on LockAcquisitionError, wait: 1.second, attempts: 6
|
||||||
|
|
||||||
def perform(message)
|
def perform(message)
|
||||||
response = ::Integrations::Facebook::MessageParser.new(message)
|
response = ::Integrations::Facebook::MessageParser.new(message)
|
||||||
|
|
||||||
lock_key = format(::Redis::Alfred::FACEBOOK_MESSAGE_MUTEX, sender_id: response.sender_id, recipient_id: response.recipient_id)
|
with_lock(::Redis::Alfred::FACEBOOK_MESSAGE_MUTEX, sender_id: response.sender_id, recipient_id: response.recipient_id) do
|
||||||
lock_manager = Redis::LockManager.new
|
process_message(response)
|
||||||
|
end
|
||||||
if lock_manager.locked?(lock_key)
|
|
||||||
Rails.logger.error "[Facebook::MessageCreator] Failed to acquire lock on attempt #{executions}: #{lock_key}"
|
|
||||||
raise LockAcquisitionError, "Failed to acquire lock for key: #{lock_key}"
|
|
||||||
end
|
end
|
||||||
|
|
||||||
begin
|
def process_message(response)
|
||||||
lock_manager.lock(lock_key)
|
|
||||||
Rails.logger.info "[Facebook::MessageCreator] Acquired lock for: #{lock_key}"
|
|
||||||
::Integrations::Facebook::MessageCreator.new(response).perform
|
::Integrations::Facebook::MessageCreator.new(response).perform
|
||||||
ensure
|
|
||||||
# Ensure that the lock is released even if there's an error in processing
|
|
||||||
lock_manager.unlock(lock_key)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -37,4 +37,5 @@ module Redis::RedisKeys
|
|||||||
## Sempahores / Locks
|
## Sempahores / Locks
|
||||||
# We don't want to process messages from the same sender concurrently to prevent creating double conversations
|
# We don't want to process messages from the same sender concurrently to prevent creating double conversations
|
||||||
FACEBOOK_MESSAGE_MUTEX = 'FB_MESSAGE_CREATE_LOCK::%<sender_id>s::%<recipient_id>s'.freeze
|
FACEBOOK_MESSAGE_MUTEX = 'FB_MESSAGE_CREATE_LOCK::%<sender_id>s::%<recipient_id>s'.freeze
|
||||||
|
SLACK_MESSAGE_MUTEX = 'SLACK_MESSAGE_LOCK::%<sender_id>s::%<reference_id>s'.freeze
|
||||||
end
|
end
|
||||||
|
|||||||
53
spec/jobs/mutex_application_job_spec.rb
Normal file
53
spec/jobs/mutex_application_job_spec.rb
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
require 'rails_helper'
|
||||||
|
|
||||||
|
RSpec.describe MutexApplicationJob do
|
||||||
|
let(:lock_manager) { instance_double(Redis::LockManager) }
|
||||||
|
let(:lock_key) { 'test_key' }
|
||||||
|
|
||||||
|
let(:test_mutex_job_class) do
|
||||||
|
stub_const('TestMutexJob', Class.new(MutexApplicationJob) do
|
||||||
|
def perform
|
||||||
|
with_lock('test_key') do
|
||||||
|
# Do nothing
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
before do
|
||||||
|
allow(Redis::LockManager).to receive(:new).and_return(lock_manager)
|
||||||
|
allow(lock_manager).to receive(:locked?).and_return(false)
|
||||||
|
allow(lock_manager).to receive(:lock).and_return(true)
|
||||||
|
allow(lock_manager).to receive(:unlock).and_return(true)
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#with_lock' do
|
||||||
|
it 'acquires the lock and yields the block if lock is not acquired' do
|
||||||
|
expect(lock_manager).to receive(:locked?).with(lock_key).and_return(false)
|
||||||
|
expect(lock_manager).to receive(:lock).with(lock_key).and_return(true)
|
||||||
|
expect(lock_manager).to receive(:unlock).with(lock_key).and_return(true)
|
||||||
|
|
||||||
|
expect { |b| described_class.new.send(:with_lock, lock_key, &b) }.to yield_control
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'raises LockAcquisitionError if it cannot acquire the lock' do
|
||||||
|
allow(lock_manager).to receive(:locked?).with(lock_key).and_return(true)
|
||||||
|
|
||||||
|
expect do
|
||||||
|
described_class.new.send(:with_lock, lock_key) do
|
||||||
|
# Do nothing
|
||||||
|
end
|
||||||
|
end.to raise_error(MutexApplicationJob::LockAcquisitionError)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'ensures that the lock is released even if there is an error during block execution' do
|
||||||
|
expect(lock_manager).to receive(:locked?).with(lock_key).and_return(false)
|
||||||
|
expect(lock_manager).to receive(:lock).with(lock_key).and_return(true)
|
||||||
|
expect(lock_manager).to receive(:unlock).with(lock_key).and_return(true)
|
||||||
|
|
||||||
|
expect do
|
||||||
|
described_class.new.send(:with_lock, lock_key) { raise StandardError }
|
||||||
|
end.to raise_error(StandardError)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
@@ -5,14 +5,13 @@ RSpec.describe Webhooks::FacebookEventsJob do
|
|||||||
|
|
||||||
let(:params) { { test: 'test' } }
|
let(:params) { { test: 'test' } }
|
||||||
let(:parsed_response) { instance_double(Integrations::Facebook::MessageParser) }
|
let(:parsed_response) { instance_double(Integrations::Facebook::MessageParser) }
|
||||||
let(:lock_key) { 'FB_MESSAGE_CREATE_LOCK::sender_id::recipient_id' } # Use a real format if needed
|
let(:lock_key_format) { Redis::Alfred::FACEBOOK_MESSAGE_MUTEX }
|
||||||
let(:lock_manager) { instance_double(Redis::LockManager) }
|
let(:lock_key) { format(lock_key_format, sender_id: 'sender_id', recipient_id: 'recipient_id') } # Use real format if different
|
||||||
|
|
||||||
before do
|
before do
|
||||||
allow(Integrations::Facebook::MessageParser).to receive(:new).and_return(parsed_response)
|
allow(Integrations::Facebook::MessageParser).to receive(:new).and_return(parsed_response)
|
||||||
allow(parsed_response).to receive(:sender_id).and_return('sender_id')
|
allow(parsed_response).to receive(:sender_id).and_return('sender_id')
|
||||||
allow(parsed_response).to receive(:recipient_id).and_return('recipient_id')
|
allow(parsed_response).to receive(:recipient_id).and_return('recipient_id')
|
||||||
allow(Redis::LockManager).to receive(:new).and_return(lock_manager)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'enqueues the job' do
|
it 'enqueues the job' do
|
||||||
@@ -22,29 +21,15 @@ RSpec.describe Webhooks::FacebookEventsJob do
|
|||||||
end
|
end
|
||||||
|
|
||||||
describe 'job execution' do
|
describe 'job execution' do
|
||||||
context 'when the lock is already acquired' do
|
|
||||||
before do
|
|
||||||
allow(lock_manager).to receive(:locked?).and_return(true)
|
|
||||||
end
|
|
||||||
|
|
||||||
it 'raises a LockAcquisitionError' do
|
|
||||||
perform_enqueued_jobs do
|
|
||||||
expect { described_class.perform_now(params) }.to raise_error(Webhooks::FacebookEventsJob::LockAcquisitionError)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
context 'when the lock is not acquired' do
|
|
||||||
let(:message_creator) { instance_double(Integrations::Facebook::MessageCreator) }
|
let(:message_creator) { instance_double(Integrations::Facebook::MessageCreator) }
|
||||||
|
|
||||||
before do
|
before do
|
||||||
allow(lock_manager).to receive(:locked?).and_return(false)
|
allow(Integrations::Facebook::MessageParser).to receive(:new).and_return(parsed_response)
|
||||||
allow(lock_manager).to receive(:unlock)
|
|
||||||
allow(lock_manager).to receive(:lock)
|
|
||||||
allow(Integrations::Facebook::MessageCreator).to receive(:new).with(parsed_response).and_return(message_creator)
|
allow(Integrations::Facebook::MessageCreator).to receive(:new).with(parsed_response).and_return(message_creator)
|
||||||
allow(message_creator).to receive(:perform)
|
allow(message_creator).to receive(:perform)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# ensures that the response is built
|
||||||
it 'invokes the message parser and creator' do
|
it 'invokes the message parser and creator' do
|
||||||
expect(Integrations::Facebook::MessageParser).to receive(:new).with(params)
|
expect(Integrations::Facebook::MessageParser).to receive(:new).with(params)
|
||||||
expect(Integrations::Facebook::MessageCreator).to receive(:new).with(parsed_response)
|
expect(Integrations::Facebook::MessageCreator).to receive(:new).with(parsed_response)
|
||||||
@@ -53,12 +38,14 @@ RSpec.describe Webhooks::FacebookEventsJob do
|
|||||||
described_class.perform_now(params)
|
described_class.perform_now(params)
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'acquires and releases the lock' do
|
# this test ensures that the process message function is indeed called
|
||||||
expect(lock_manager).to receive(:lock).with(lock_key)
|
it 'attempts to acquire a lock and then processes the message' do
|
||||||
expect(lock_manager).to receive(:unlock).with(lock_key)
|
job_instance = described_class.new
|
||||||
|
allow(job_instance).to receive(:process_message).with(parsed_response)
|
||||||
|
|
||||||
described_class.perform_now(params)
|
job_instance.perform(params)
|
||||||
end
|
|
||||||
|
expect(job_instance).to have_received(:process_message).with(parsed_response)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
Reference in New Issue
Block a user