From 336584c95a55c14b6b1a9ee74cafdb22e39178ca Mon Sep 17 00:00:00 2001 From: Shivam Mishra Date: Mon, 4 Sep 2023 16:32:13 +0700 Subject: [PATCH] feat: mutex for `InstagramEventsJob` [CW-2447] (#7828) --- app/jobs/webhooks/facebook_events_job.rb | 2 +- app/jobs/webhooks/instagram_events_job.rb | 21 ++++++++++++++++++--- lib/redis/redis_keys.rb | 1 + 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/app/jobs/webhooks/facebook_events_job.rb b/app/jobs/webhooks/facebook_events_job.rb index 653f83c11..4240d62b9 100644 --- a/app/jobs/webhooks/facebook_events_job.rb +++ b/app/jobs/webhooks/facebook_events_job.rb @@ -1,6 +1,6 @@ class Webhooks::FacebookEventsJob < MutexApplicationJob queue_as :default - retry_on LockAcquisitionError, wait: 1.second, attempts: 6 + retry_on LockAcquisitionError, wait: 1.second, attempts: 8 def perform(message) response = ::Integrations::Facebook::MessageParser.new(message) diff --git a/app/jobs/webhooks/instagram_events_job.rb b/app/jobs/webhooks/instagram_events_job.rb index ef4dd0247..ad1114036 100644 --- a/app/jobs/webhooks/instagram_events_job.rb +++ b/app/jobs/webhooks/instagram_events_job.rb @@ -1,5 +1,6 @@ -class Webhooks::InstagramEventsJob < ApplicationJob +class Webhooks::InstagramEventsJob < MutexApplicationJob queue_as :default + retry_on LockAcquisitionError, wait: 1.second, attempts: 8 include HTTParty @@ -8,11 +9,17 @@ class Webhooks::InstagramEventsJob < ApplicationJob # @return [Array] We will support further events like reaction or seen in future SUPPORTED_EVENTS = [:message].freeze - # @see https://developers.facebook.com/docs/messenger-platform/instagram/features/webhook def perform(entries) @entries = entries - @entries.each do |entry| + with_lock(::Redis::Alfred::IG_MESSAGE_MUTEX, sender_id: sender_id, ig_account_id: ig_account_id) do + process_entries(entries) + end + end + + # @see https://developers.facebook.com/docs/messenger-platform/instagram/features/webhook + def process_entries(entries) + entries.each do |entry| entry = entry.with_indifferent_access messages(entry).each do |messaging| send(@event_name, messaging) if event_name(messaging) @@ -22,6 +29,14 @@ class Webhooks::InstagramEventsJob < ApplicationJob private + def ig_account_id + @entries&.first&.dig(:id) + end + + def sender_id + @entries&.dig(0, :messaging, 0, :sender, :id) + end + def event_name(messaging) @event_name ||= SUPPORTED_EVENTS.find { |key| messaging.key?(key) } end diff --git a/lib/redis/redis_keys.rb b/lib/redis/redis_keys.rb index dc0b672ef..8c5b92548 100644 --- a/lib/redis/redis_keys.rb +++ b/lib/redis/redis_keys.rb @@ -37,5 +37,6 @@ module Redis::RedisKeys ## Sempahores / Locks # We don't want to process messages from the same sender concurrently to prevent creating double conversations FACEBOOK_MESSAGE_MUTEX = 'FB_MESSAGE_CREATE_LOCK::%s::%s'.freeze + IG_MESSAGE_MUTEX = 'IG_MESSAGE_CREATE_LOCK::%s::%s'.freeze SLACK_MESSAGE_MUTEX = 'SLACK_MESSAGE_LOCK::%s::%s'.freeze end