Review notes. Free text: patch tools skip everything before the first "diff --git" header.

What this patch does
  * Whatsapp::IncomingMessageBaseService: the duplicate-webhook guard no longer
    calls message_under_process? (a Redis get) and then
    cache_message_source_id_in_redis (a Redis setex). It now returns early when a
    Message with the source id already exists, and otherwise calls
    lock_message_source_id! once, returning when that call is falsy.
  * Whatsapp::IncomingMessageServiceHelpers: lock_message_source_id! returns false
    when messages_data is blank, otherwise delegates to
    Whatsapp::MessageDedupLock#acquire! for messages_data.first[:id].
  * Whatsapp::MessageDedupLock (new): builds the key from
    Redis::RedisKeys::MESSAGE_SOURCE_KEY and calls
    Redis::Alfred.set(key, true, nx: true, ex: ttl); ttl defaults to 1.day.to_i.
  * Specs: `after` hooks delete keys matching 'MESSAGE_SOURCE_KEY::*'; a new spec
    covers first acquire, second acquire, independent ids and a two-thread race.

NOTE(review): clear_message_source_id_from_redis is removed and no application
code in this patch deletes the key, so it stays until the TTL expires. When
set_contact leaves @contact unset, or the transaction raises, a redelivery of
the same webhook inside the TTL would presumably be skipped by
lock_message_source_id! - confirm that this is intended.
NOTE(review): acquire! carries a bang with no non-bang counterpart, and
KEY_PREFIX actually holds a format template; both are naming nits only.
NOTE(review): the context line containing "ephermal" is unchanged by the patch
and keeps the spelling used in the target file; do not correct it here.
NOTE(review): leading whitespace inside the hunks was reconstructed from the
hunk line counts and two-space Ruby indentation. Verify the context lines
(especially the data literal at the top of the -409 hunk) against the target
files before applying.

diff --git a/app/services/whatsapp/incoming_message_base_service.rb b/app/services/whatsapp/incoming_message_base_service.rb
index a90814bdc..a8ad176b6 100644
--- a/app/services/whatsapp/incoming_message_base_service.rb
+++ b/app/services/whatsapp/incoming_message_base_service.rb
@@ -28,19 +28,19 @@ class Whatsapp::IncomingMessageBaseService
     # if the webhook event is a reaction or an ephermal message or an unsupported message.
     return if unprocessable_message_type?(message_type)
 
-    # Multiple webhook event can be received against the same message due to misconfigurations in the Meta
-    # business manager account. While we have not found the core reason yet, the following line ensure that
-    # there are no duplicate messages created.
-    return if find_message_by_source_id(messages_data.first[:id]) || message_under_process?
+    # Multiple webhook events can be received for the same message due to
+    # misconfigurations in the Meta business manager account.
+    # We use an atomic Redis SET NX to prevent concurrent workers from both
+    # processing the same message simultaneously.
+    return if find_message_by_source_id(messages_data.first[:id])
+    return unless lock_message_source_id!
 
-    cache_message_source_id_in_redis
     set_contact
     return unless @contact
 
     ActiveRecord::Base.transaction do
       set_conversation
       create_messages
-      clear_message_source_id_from_redis
     end
   end
 
diff --git a/app/services/whatsapp/incoming_message_service_helpers.rb b/app/services/whatsapp/incoming_message_service_helpers.rb
index c803d61bd..bac6f6222 100644
--- a/app/services/whatsapp/incoming_message_service_helpers.rb
+++ b/app/services/whatsapp/incoming_message_service_helpers.rb
@@ -69,20 +69,9 @@ module Whatsapp::IncomingMessageServiceHelpers
     @message = Message.find_by(source_id: source_id)
   end
 
-  def message_under_process?
-    key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: messages_data.first[:id])
-    Redis::Alfred.get(key)
-  end
+  def lock_message_source_id!
+    return false if messages_data.blank?
 
-  def cache_message_source_id_in_redis
-    return if messages_data.blank?
-
-    key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: messages_data.first[:id])
-    ::Redis::Alfred.setex(key, true)
-  end
-
-  def clear_message_source_id_from_redis
-    key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: messages_data.first[:id])
-    ::Redis::Alfred.delete(key)
+    Whatsapp::MessageDedupLock.new(messages_data.first[:id]).acquire!
   end
 end
diff --git a/app/services/whatsapp/message_dedup_lock.rb b/app/services/whatsapp/message_dedup_lock.rb
new file mode 100644
index 000000000..4a863a4a4
--- /dev/null
+++ b/app/services/whatsapp/message_dedup_lock.rb
@@ -0,0 +1,19 @@
+# Atomic dedup lock for WhatsApp incoming messages.
+#
+# Meta can deliver the same webhook event multiple times. This lock uses
+# Redis SET NX EX to ensure only one worker processes a given source_id.
+class Whatsapp::MessageDedupLock
+  KEY_PREFIX = Redis::RedisKeys::MESSAGE_SOURCE_KEY
+  DEFAULT_TTL = 1.day.to_i
+
+  def initialize(source_id, ttl: DEFAULT_TTL)
+    @key = format(KEY_PREFIX, id: source_id)
+    @ttl = ttl
+  end
+
+  # Returns true when the lock is acquired (caller should proceed).
+  # Returns false when another worker already holds the lock.
+  def acquire!
+    ::Redis::Alfred.set(@key, true, nx: true, ex: @ttl)
+  end
+end
diff --git a/spec/services/whatsapp/incoming_message_service_spec.rb b/spec/services/whatsapp/incoming_message_service_spec.rb
index 6c23e9b71..2ecf60acb 100644
--- a/spec/services/whatsapp/incoming_message_service_spec.rb
+++ b/spec/services/whatsapp/incoming_message_service_spec.rb
@@ -6,6 +6,14 @@ describe Whatsapp::IncomingMessageService do
       stub_request(:post, 'https://waba.360dialog.io/v1/configs/webhook')
     end
 
+    after do
+      # The atomic dedup lock lives in Redis and is not rolled back by
+      # transactional fixtures. Clean up any keys created during the test.
+      Redis::Alfred.scan_each(match: 'MESSAGE_SOURCE_KEY::*') do |key|
+        Redis::Alfred.delete(key)
+      end
+    end
+
     let!(:whatsapp_channel) { create(:channel_whatsapp, sync_templates: false) }
     let(:wa_id) { '2423423243' }
     let!(:params) do
@@ -393,8 +401,8 @@ describe Whatsapp::IncomingMessageService do
       end
     end
 
-    describe 'when message processing is in progress' do
-      it 'ignores the current message creation request' do
+    describe 'when another worker already holds the dedup lock' do
+      it 'skips message creation' do
         params = {
           'contacts' => [{ 'profile' => { 'name' => 'Kedar' }, 'wa_id' => '919746334593' }],
           'messages' => [{ 'from' => '919446284490', 'id' => 'wamid.SDFADSf23sfasdafasdfa',
@@ -409,17 +417,14 @@ describe Whatsapp::IncomingMessageService do
                                'phones' => [{ 'phone' => '+1 (415) 341-8386' }] }
                            ] }]
         }.with_indifferent_access
-        expect(Message.find_by(source_id: 'wamid.SDFADSf23sfasdafasdfa')).not_to be_present
-        key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: 'wamid.SDFADSf23sfasdafasdfa')
-
-        Redis::Alfred.setex(key, true)
-        expect(Redis::Alfred.get(key)).to be_truthy
+        # Simulate another worker holding the lock
+        lock = Whatsapp::MessageDedupLock.new('wamid.SDFADSf23sfasdafasdfa')
+        expect(lock.acquire!).to be_truthy
 
         described_class.new(inbox: whatsapp_channel.inbox, params: params).perform
         expect(whatsapp_channel.inbox.messages.count).to eq(0)
-        expect(Message.find_by(source_id: 'wamid.SDFADSf23sfasdafasdfa')).not_to be_present
-
-        expect(Redis::Alfred.get(key)).to be_truthy
+      ensure
+        key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: 'wamid.SDFADSf23sfasdafasdfa')
         Redis::Alfred.delete(key)
       end
     end
diff --git a/spec/services/whatsapp/incoming_message_whatsapp_cloud_service_spec.rb b/spec/services/whatsapp/incoming_message_whatsapp_cloud_service_spec.rb
index 6112ddc7b..4b6841811 100644
--- a/spec/services/whatsapp/incoming_message_whatsapp_cloud_service_spec.rb
+++ b/spec/services/whatsapp/incoming_message_whatsapp_cloud_service_spec.rb
@@ -2,6 +2,10 @@ require 'rails_helper'
 
 describe Whatsapp::IncomingMessageWhatsappCloudService do
   describe '#perform' do
+    after do
+      Redis::Alfred.scan_each(match: 'MESSAGE_SOURCE_KEY::*') { |key| Redis::Alfred.delete(key) }
+    end
+
     let!(:whatsapp_channel) { create(:channel_whatsapp, provider: 'whatsapp_cloud', sync_templates: false, validate_provider_config: false) }
     let(:params) do
       {
diff --git a/spec/services/whatsapp/message_dedup_lock_spec.rb b/spec/services/whatsapp/message_dedup_lock_spec.rb
new file mode 100644
index 000000000..f3009b2ff
--- /dev/null
+++ b/spec/services/whatsapp/message_dedup_lock_spec.rb
@@ -0,0 +1,43 @@
+require 'rails_helper'
+
+describe Whatsapp::MessageDedupLock do
+  let(:source_id) { "wamid.test_#{SecureRandom.hex(8)}" }
+  let(:lock) { described_class.new(source_id) }
+  let(:redis_key) { format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: source_id) }
+
+  after { Redis::Alfred.delete(redis_key) }
+
+  describe '#acquire!' do
+    it 'returns truthy on first acquire' do
+      expect(lock.acquire!).to be_truthy
+    end
+
+    it 'returns falsy on second acquire for the same source_id' do
+      lock.acquire!
+      expect(described_class.new(source_id).acquire!).to be_falsy
+    end
+
+    it 'allows different source_ids to acquire independently' do
+      lock.acquire!
+      other = described_class.new("wamid.other_#{SecureRandom.hex(8)}")
+      expect(other.acquire!).to be_truthy
+    end
+
+    it 'lets exactly one thread win when two race for the same source_id' do
+      results = Concurrent::Array.new
+      barrier = Concurrent::CyclicBarrier.new(2)
+
+      threads = Array.new(2) do
+        Thread.new do
+          barrier.wait
+          results << described_class.new(source_id).acquire!
+        end
+      end
+
+      threads.each(&:join)
+
+      wins = results.count { |r| r }
+      expect(wins).to eq(1), "Expected exactly 1 winner but got #{wins}. Results: #{results.inspect}"
+    end
+  end
+end