fix: duplicate message_created webhooks for WhatsApp messages (#13523)
Some customers using WhatsApp inboxes with account-level webhooks were
reporting receiving duplicate `message_created` webhook deliveries for
every incoming message. Upon inspection, here's what we found
- Both payloads are identical.
- No errors appear in the application logs
- Webhook URL is only configured in one place.
This meant, the system was sending the webhooks twice. For some context,
there's a know related issue... Meta's WhatsApp Business API can deliver
the same webhook notification multiple times for a single message. The
codebase already acknowledges this — there's a comment in
`IncomingMessageBaseService#process_messages` noting that "multiple
webhook events can be received against the same message due to
misconfigurations in the Meta business manager account." A deduplication
guard exists, but it doesn't actually work under concurrency.
### Rationale
The existing dedup was a three-step sequence: check Redis (`GET`), check
the database, then set a Redis flag (`SETEX`). Two Sidekiq workers
processing duplicate Meta webhooks simultaneously would both complete
the `GET` before either executed the `SETEX`, so both would proceed to
create a message. The `source_id` column has a non-unique index, so the
database wouldn't catch the duplicate either. Each message then
independently fires `after_create_commit`, dispatching two
`message_created` webhook events to the customer.
```
Worker A Worker B
│ │
▼ ▼
Redis GET key ──► nil Redis GET key ──► nil
│ │
│ ◄── both pass guard ──► │
│ │
▼ ▼
Redis SETEX key Redis SETEX key
│ │
▼ ▼
BEGIN transaction BEGIN transaction
INSERT message INSERT message
DELETE Redis key ◄─┐ │
COMMIT │ DELETE Redis key
│ COMMIT
│ │
└── key gone before ───┘
B's commit lands
▼ ▼
after_create_commit after_create_commit
dispatch MESSAGE_CREATED dispatch MESSAGE_CREATED
│ │
▼ ▼
WebhookJob ──► n8n WebhookJob ──► n8n
(duplicate!)
```
There was a second, subtler problem visible in the diagram: the Redis
key was cleared *inside* the database transaction, before the
transaction committed. This opened a window where neither the Redis
check nor the database check would see the in-flight message.
The fix collapses the check-and-set into a single `SET NX EX` call,
which is atomic in Redis. The key is no longer eagerly cleared — it
expires naturally after 24 hours. The database lookup
(`find_message_by_source_id`) remains as a fallback for messages that
were created before the lock expired.
```
Worker A Worker B
│ │
▼ ▼
Redis SET NX ──► OK Redis SET NX ──► nil
│ │
▼ ▼
proceeds to create returns early
message normally (lock already held)
```
### Implementation Notes
The lock logic is extracted into `Whatsapp::MessageDedupLock`, a small
class that wraps a single `Redis SET NX EX` call. This makes the
concurrency guarantee testable in isolation — the spec uses a
`CyclicBarrier` to race two threads against the same key and asserts
exactly one wins, without needing database writes,
`use_transactional_tests = false`, or monkey-patching.
Because the Redis lock now persists (instead of being cleared
mid-transaction), existing WhatsApp specs needed an `after` hook to
clean up `MESSAGE_SOURCE_KEY::*` keys between examples. Transactional
fixtures only roll back the database, not Redis.
This commit is contained in:
@@ -28,19 +28,19 @@ class Whatsapp::IncomingMessageBaseService
|
||||
# if the webhook event is a reaction or an ephermal message or an unsupported message.
|
||||
return if unprocessable_message_type?(message_type)
|
||||
|
||||
# Multiple webhook event can be received against the same message due to misconfigurations in the Meta
|
||||
# business manager account. While we have not found the core reason yet, the following line ensure that
|
||||
# there are no duplicate messages created.
|
||||
return if find_message_by_source_id(messages_data.first[:id]) || message_under_process?
|
||||
# Multiple webhook events can be received for the same message due to
|
||||
# misconfigurations in the Meta business manager account.
|
||||
# We use an atomic Redis SET NX to prevent concurrent workers from both
|
||||
# processing the same message simultaneously.
|
||||
return if find_message_by_source_id(messages_data.first[:id])
|
||||
return unless lock_message_source_id!
|
||||
|
||||
cache_message_source_id_in_redis
|
||||
set_contact
|
||||
return unless @contact
|
||||
|
||||
ActiveRecord::Base.transaction do
|
||||
set_conversation
|
||||
create_messages
|
||||
clear_message_source_id_from_redis
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
@@ -69,20 +69,9 @@ module Whatsapp::IncomingMessageServiceHelpers
|
||||
@message = Message.find_by(source_id: source_id)
|
||||
end
|
||||
|
||||
def message_under_process?
|
||||
key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: messages_data.first[:id])
|
||||
Redis::Alfred.get(key)
|
||||
end
|
||||
def lock_message_source_id!
|
||||
return false if messages_data.blank?
|
||||
|
||||
def cache_message_source_id_in_redis
|
||||
return if messages_data.blank?
|
||||
|
||||
key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: messages_data.first[:id])
|
||||
::Redis::Alfred.setex(key, true)
|
||||
end
|
||||
|
||||
def clear_message_source_id_from_redis
|
||||
key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: messages_data.first[:id])
|
||||
::Redis::Alfred.delete(key)
|
||||
Whatsapp::MessageDedupLock.new(messages_data.first[:id]).acquire!
|
||||
end
|
||||
end
|
||||
|
||||
19
app/services/whatsapp/message_dedup_lock.rb
Normal file
19
app/services/whatsapp/message_dedup_lock.rb
Normal file
@@ -0,0 +1,19 @@
|
||||
# Atomic dedup lock for WhatsApp incoming messages.
|
||||
#
|
||||
# Meta can deliver the same webhook event multiple times. This lock uses
|
||||
# Redis SET NX EX to ensure only one worker processes a given source_id.
|
||||
class Whatsapp::MessageDedupLock
|
||||
KEY_PREFIX = Redis::RedisKeys::MESSAGE_SOURCE_KEY
|
||||
DEFAULT_TTL = 1.day.to_i
|
||||
|
||||
def initialize(source_id, ttl: DEFAULT_TTL)
|
||||
@key = format(KEY_PREFIX, id: source_id)
|
||||
@ttl = ttl
|
||||
end
|
||||
|
||||
# Returns true when the lock is acquired (caller should proceed).
|
||||
# Returns false when another worker already holds the lock.
|
||||
def acquire!
|
||||
::Redis::Alfred.set(@key, true, nx: true, ex: @ttl)
|
||||
end
|
||||
end
|
||||
@@ -6,6 +6,14 @@ describe Whatsapp::IncomingMessageService do
|
||||
stub_request(:post, 'https://waba.360dialog.io/v1/configs/webhook')
|
||||
end
|
||||
|
||||
after do
|
||||
# The atomic dedup lock lives in Redis and is not rolled back by
|
||||
# transactional fixtures. Clean up any keys created during the test.
|
||||
Redis::Alfred.scan_each(match: 'MESSAGE_SOURCE_KEY::*') do |key|
|
||||
Redis::Alfred.delete(key)
|
||||
end
|
||||
end
|
||||
|
||||
let!(:whatsapp_channel) { create(:channel_whatsapp, sync_templates: false) }
|
||||
let(:wa_id) { '2423423243' }
|
||||
let!(:params) do
|
||||
@@ -393,8 +401,8 @@ describe Whatsapp::IncomingMessageService do
|
||||
end
|
||||
end
|
||||
|
||||
describe 'when message processing is in progress' do
|
||||
it 'ignores the current message creation request' do
|
||||
describe 'when another worker already holds the dedup lock' do
|
||||
it 'skips message creation' do
|
||||
params = { 'contacts' => [{ 'profile' => { 'name' => 'Kedar' }, 'wa_id' => '919746334593' }],
|
||||
'messages' => [{ 'from' => '919446284490',
|
||||
'id' => 'wamid.SDFADSf23sfasdafasdfa',
|
||||
@@ -409,17 +417,14 @@ describe Whatsapp::IncomingMessageService do
|
||||
'phones' => [{ 'phone' => '+1 (415) 341-8386' }] }
|
||||
] }] }.with_indifferent_access
|
||||
|
||||
expect(Message.find_by(source_id: 'wamid.SDFADSf23sfasdafasdfa')).not_to be_present
|
||||
key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: 'wamid.SDFADSf23sfasdafasdfa')
|
||||
|
||||
Redis::Alfred.setex(key, true)
|
||||
expect(Redis::Alfred.get(key)).to be_truthy
|
||||
# Simulate another worker holding the lock
|
||||
lock = Whatsapp::MessageDedupLock.new('wamid.SDFADSf23sfasdafasdfa')
|
||||
expect(lock.acquire!).to be_truthy
|
||||
|
||||
described_class.new(inbox: whatsapp_channel.inbox, params: params).perform
|
||||
expect(whatsapp_channel.inbox.messages.count).to eq(0)
|
||||
expect(Message.find_by(source_id: 'wamid.SDFADSf23sfasdafasdfa')).not_to be_present
|
||||
|
||||
expect(Redis::Alfred.get(key)).to be_truthy
|
||||
ensure
|
||||
key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: 'wamid.SDFADSf23sfasdafasdfa')
|
||||
Redis::Alfred.delete(key)
|
||||
end
|
||||
end
|
||||
|
||||
@@ -2,6 +2,10 @@ require 'rails_helper'
|
||||
|
||||
describe Whatsapp::IncomingMessageWhatsappCloudService do
|
||||
describe '#perform' do
|
||||
after do
|
||||
Redis::Alfred.scan_each(match: 'MESSAGE_SOURCE_KEY::*') { |key| Redis::Alfred.delete(key) }
|
||||
end
|
||||
|
||||
let!(:whatsapp_channel) { create(:channel_whatsapp, provider: 'whatsapp_cloud', sync_templates: false, validate_provider_config: false) }
|
||||
let(:params) do
|
||||
{
|
||||
|
||||
43
spec/services/whatsapp/message_dedup_lock_spec.rb
Normal file
43
spec/services/whatsapp/message_dedup_lock_spec.rb
Normal file
@@ -0,0 +1,43 @@
|
||||
require 'rails_helper'
|
||||
|
||||
describe Whatsapp::MessageDedupLock do
|
||||
let(:source_id) { "wamid.test_#{SecureRandom.hex(8)}" }
|
||||
let(:lock) { described_class.new(source_id) }
|
||||
let(:redis_key) { format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: source_id) }
|
||||
|
||||
after { Redis::Alfred.delete(redis_key) }
|
||||
|
||||
describe '#acquire!' do
|
||||
it 'returns truthy on first acquire' do
|
||||
expect(lock.acquire!).to be_truthy
|
||||
end
|
||||
|
||||
it 'returns falsy on second acquire for the same source_id' do
|
||||
lock.acquire!
|
||||
expect(described_class.new(source_id).acquire!).to be_falsy
|
||||
end
|
||||
|
||||
it 'allows different source_ids to acquire independently' do
|
||||
lock.acquire!
|
||||
other = described_class.new("wamid.other_#{SecureRandom.hex(8)}")
|
||||
expect(other.acquire!).to be_truthy
|
||||
end
|
||||
|
||||
it 'lets exactly one thread win when two race for the same source_id' do
|
||||
results = Concurrent::Array.new
|
||||
barrier = Concurrent::CyclicBarrier.new(2)
|
||||
|
||||
threads = Array.new(2) do
|
||||
Thread.new do
|
||||
barrier.wait
|
||||
results << described_class.new(source_id).acquire!
|
||||
end
|
||||
end
|
||||
|
||||
threads.each(&:join)
|
||||
|
||||
wins = results.count { |r| r }
|
||||
expect(wins).to eq(1), "Expected exactly 1 winner but got #{wins}. Results: #{results.inspect}"
|
||||
end
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user