From 39243b9e71c70b26a2e274bcfa64934c1c32b5bb Mon Sep 17 00:00:00 2001 From: Shivam Mishra Date: Tue, 17 Feb 2026 14:01:10 +0530 Subject: [PATCH] fix: duplicate `message_created` webhooks for WhatsApp messages (#13523) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Some customers using WhatsApp inboxes with account-level webhooks were reporting receiving duplicate `message_created` webhook deliveries for every incoming message. Upon inspection, here's what we found - Both payloads are identical. - No errors appear in the application logs - Webhook URL is only configured in one place. This meant, the system was sending the webhooks twice. For some context, there's a know related issue... Meta's WhatsApp Business API can deliver the same webhook notification multiple times for a single message. The codebase already acknowledges this — there's a comment in `IncomingMessageBaseService#process_messages` noting that "multiple webhook events can be received against the same message due to misconfigurations in the Meta business manager account." A deduplication guard exists, but it doesn't actually work under concurrency. ### Rationale The existing dedup was a three-step sequence: check Redis (`GET`), check the database, then set a Redis flag (`SETEX`). Two Sidekiq workers processing duplicate Meta webhooks simultaneously would both complete the `GET` before either executed the `SETEX`, so both would proceed to create a message. The `source_id` column has a non-unique index, so the database wouldn't catch the duplicate either. Each message then independently fires `after_create_commit`, dispatching two `message_created` webhook events to the customer. ``` Worker A Worker B │ │ ▼ ▼ Redis GET key ──► nil Redis GET key ──► nil │ │ │ ◄── both pass guard ──► │ │ │ ▼ ▼ Redis SETEX key Redis SETEX key │ │ ▼ ▼ BEGIN transaction BEGIN transaction INSERT message INSERT message DELETE Redis key ◄─┐ │ COMMIT │ DELETE Redis key │ COMMIT │ │ └── key gone before ───┘ B's commit lands ▼ ▼ after_create_commit after_create_commit dispatch MESSAGE_CREATED dispatch MESSAGE_CREATED │ │ ▼ ▼ WebhookJob ──► n8n WebhookJob ──► n8n (duplicate!) ``` There was a second, subtler problem visible in the diagram: the Redis key was cleared *inside* the database transaction, before the transaction committed. This opened a window where neither the Redis check nor the database check would see the in-flight message. The fix collapses the check-and-set into a single `SET NX EX` call, which is atomic in Redis. The key is no longer eagerly cleared — it expires naturally after 24 hours. The database lookup (`find_message_by_source_id`) remains as a fallback for messages that were created before the lock expired. ``` Worker A Worker B │ │ ▼ ▼ Redis SET NX ──► OK Redis SET NX ──► nil │ │ ▼ ▼ proceeds to create returns early message normally (lock already held) ``` ### Implementation Notes The lock logic is extracted into `Whatsapp::MessageDedupLock`, a small class that wraps a single `Redis SET NX EX` call. This makes the concurrency guarantee testable in isolation — the spec uses a `CyclicBarrier` to race two threads against the same key and asserts exactly one wins, without needing database writes, `use_transactional_tests = false`, or monkey-patching. Because the Redis lock now persists (instead of being cleared mid-transaction), existing WhatsApp specs needed an `after` hook to clean up `MESSAGE_SOURCE_KEY::*` keys between examples. Transactional fixtures only roll back the database, not Redis. --- .../whatsapp/incoming_message_base_service.rb | 12 +++--- .../incoming_message_service_helpers.rb | 17 ++------ app/services/whatsapp/message_dedup_lock.rb | 19 ++++++++ .../whatsapp/incoming_message_service_spec.rb | 25 ++++++----- ...ing_message_whatsapp_cloud_service_spec.rb | 4 ++ .../whatsapp/message_dedup_lock_spec.rb | 43 +++++++++++++++++++ 6 files changed, 90 insertions(+), 30 deletions(-) create mode 100644 app/services/whatsapp/message_dedup_lock.rb create mode 100644 spec/services/whatsapp/message_dedup_lock_spec.rb diff --git a/app/services/whatsapp/incoming_message_base_service.rb b/app/services/whatsapp/incoming_message_base_service.rb index a90814bdc..a8ad176b6 100644 --- a/app/services/whatsapp/incoming_message_base_service.rb +++ b/app/services/whatsapp/incoming_message_base_service.rb @@ -28,19 +28,19 @@ class Whatsapp::IncomingMessageBaseService # if the webhook event is a reaction or an ephermal message or an unsupported message. return if unprocessable_message_type?(message_type) - # Multiple webhook event can be received against the same message due to misconfigurations in the Meta - # business manager account. While we have not found the core reason yet, the following line ensure that - # there are no duplicate messages created. - return if find_message_by_source_id(messages_data.first[:id]) || message_under_process? + # Multiple webhook events can be received for the same message due to + # misconfigurations in the Meta business manager account. + # We use an atomic Redis SET NX to prevent concurrent workers from both + # processing the same message simultaneously. + return if find_message_by_source_id(messages_data.first[:id]) + return unless lock_message_source_id! - cache_message_source_id_in_redis set_contact return unless @contact ActiveRecord::Base.transaction do set_conversation create_messages - clear_message_source_id_from_redis end end diff --git a/app/services/whatsapp/incoming_message_service_helpers.rb b/app/services/whatsapp/incoming_message_service_helpers.rb index c803d61bd..bac6f6222 100644 --- a/app/services/whatsapp/incoming_message_service_helpers.rb +++ b/app/services/whatsapp/incoming_message_service_helpers.rb @@ -69,20 +69,9 @@ module Whatsapp::IncomingMessageServiceHelpers @message = Message.find_by(source_id: source_id) end - def message_under_process? - key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: messages_data.first[:id]) - Redis::Alfred.get(key) - end + def lock_message_source_id! + return false if messages_data.blank? - def cache_message_source_id_in_redis - return if messages_data.blank? - - key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: messages_data.first[:id]) - ::Redis::Alfred.setex(key, true) - end - - def clear_message_source_id_from_redis - key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: messages_data.first[:id]) - ::Redis::Alfred.delete(key) + Whatsapp::MessageDedupLock.new(messages_data.first[:id]).acquire! end end diff --git a/app/services/whatsapp/message_dedup_lock.rb b/app/services/whatsapp/message_dedup_lock.rb new file mode 100644 index 000000000..4a863a4a4 --- /dev/null +++ b/app/services/whatsapp/message_dedup_lock.rb @@ -0,0 +1,19 @@ +# Atomic dedup lock for WhatsApp incoming messages. +# +# Meta can deliver the same webhook event multiple times. This lock uses +# Redis SET NX EX to ensure only one worker processes a given source_id. +class Whatsapp::MessageDedupLock + KEY_PREFIX = Redis::RedisKeys::MESSAGE_SOURCE_KEY + DEFAULT_TTL = 1.day.to_i + + def initialize(source_id, ttl: DEFAULT_TTL) + @key = format(KEY_PREFIX, id: source_id) + @ttl = ttl + end + + # Returns true when the lock is acquired (caller should proceed). + # Returns false when another worker already holds the lock. + def acquire! + ::Redis::Alfred.set(@key, true, nx: true, ex: @ttl) + end +end diff --git a/spec/services/whatsapp/incoming_message_service_spec.rb b/spec/services/whatsapp/incoming_message_service_spec.rb index 6c23e9b71..2ecf60acb 100644 --- a/spec/services/whatsapp/incoming_message_service_spec.rb +++ b/spec/services/whatsapp/incoming_message_service_spec.rb @@ -6,6 +6,14 @@ describe Whatsapp::IncomingMessageService do stub_request(:post, 'https://waba.360dialog.io/v1/configs/webhook') end + after do + # The atomic dedup lock lives in Redis and is not rolled back by + # transactional fixtures. Clean up any keys created during the test. + Redis::Alfred.scan_each(match: 'MESSAGE_SOURCE_KEY::*') do |key| + Redis::Alfred.delete(key) + end + end + let!(:whatsapp_channel) { create(:channel_whatsapp, sync_templates: false) } let(:wa_id) { '2423423243' } let!(:params) do @@ -393,8 +401,8 @@ describe Whatsapp::IncomingMessageService do end end - describe 'when message processing is in progress' do - it 'ignores the current message creation request' do + describe 'when another worker already holds the dedup lock' do + it 'skips message creation' do params = { 'contacts' => [{ 'profile' => { 'name' => 'Kedar' }, 'wa_id' => '919746334593' }], 'messages' => [{ 'from' => '919446284490', 'id' => 'wamid.SDFADSf23sfasdafasdfa', @@ -409,17 +417,14 @@ describe Whatsapp::IncomingMessageService do 'phones' => [{ 'phone' => '+1 (415) 341-8386' }] } ] }] }.with_indifferent_access - expect(Message.find_by(source_id: 'wamid.SDFADSf23sfasdafasdfa')).not_to be_present - key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: 'wamid.SDFADSf23sfasdafasdfa') - - Redis::Alfred.setex(key, true) - expect(Redis::Alfred.get(key)).to be_truthy + # Simulate another worker holding the lock + lock = Whatsapp::MessageDedupLock.new('wamid.SDFADSf23sfasdafasdfa') + expect(lock.acquire!).to be_truthy described_class.new(inbox: whatsapp_channel.inbox, params: params).perform expect(whatsapp_channel.inbox.messages.count).to eq(0) - expect(Message.find_by(source_id: 'wamid.SDFADSf23sfasdafasdfa')).not_to be_present - - expect(Redis::Alfred.get(key)).to be_truthy + ensure + key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: 'wamid.SDFADSf23sfasdafasdfa') Redis::Alfred.delete(key) end end diff --git a/spec/services/whatsapp/incoming_message_whatsapp_cloud_service_spec.rb b/spec/services/whatsapp/incoming_message_whatsapp_cloud_service_spec.rb index 6112ddc7b..4b6841811 100644 --- a/spec/services/whatsapp/incoming_message_whatsapp_cloud_service_spec.rb +++ b/spec/services/whatsapp/incoming_message_whatsapp_cloud_service_spec.rb @@ -2,6 +2,10 @@ require 'rails_helper' describe Whatsapp::IncomingMessageWhatsappCloudService do describe '#perform' do + after do + Redis::Alfred.scan_each(match: 'MESSAGE_SOURCE_KEY::*') { |key| Redis::Alfred.delete(key) } + end + let!(:whatsapp_channel) { create(:channel_whatsapp, provider: 'whatsapp_cloud', sync_templates: false, validate_provider_config: false) } let(:params) do { diff --git a/spec/services/whatsapp/message_dedup_lock_spec.rb b/spec/services/whatsapp/message_dedup_lock_spec.rb new file mode 100644 index 000000000..f3009b2ff --- /dev/null +++ b/spec/services/whatsapp/message_dedup_lock_spec.rb @@ -0,0 +1,43 @@ +require 'rails_helper' + +describe Whatsapp::MessageDedupLock do + let(:source_id) { "wamid.test_#{SecureRandom.hex(8)}" } + let(:lock) { described_class.new(source_id) } + let(:redis_key) { format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: source_id) } + + after { Redis::Alfred.delete(redis_key) } + + describe '#acquire!' do + it 'returns truthy on first acquire' do + expect(lock.acquire!).to be_truthy + end + + it 'returns falsy on second acquire for the same source_id' do + lock.acquire! + expect(described_class.new(source_id).acquire!).to be_falsy + end + + it 'allows different source_ids to acquire independently' do + lock.acquire! + other = described_class.new("wamid.other_#{SecureRandom.hex(8)}") + expect(other.acquire!).to be_truthy + end + + it 'lets exactly one thread win when two race for the same source_id' do + results = Concurrent::Array.new + barrier = Concurrent::CyclicBarrier.new(2) + + threads = Array.new(2) do + Thread.new do + barrier.wait + results << described_class.new(source_id).acquire! + end + end + + threads.each(&:join) + + wins = results.count { |r| r } + expect(wins).to eq(1), "Expected exactly 1 winner but got #{wins}. Results: #{results.inspect}" + end + end +end