Some customers using WhatsApp inboxes with account-level webhooks were
reporting receiving duplicate `message_created` webhook deliveries for
every incoming message. Upon inspection, here's what we found
- Both payloads are identical.
- No errors appear in the application logs
- Webhook URL is only configured in one place.
This meant, the system was sending the webhooks twice. For some context,
there's a know related issue... Meta's WhatsApp Business API can deliver
the same webhook notification multiple times for a single message. The
codebase already acknowledges this — there's a comment in
`IncomingMessageBaseService#process_messages` noting that "multiple
webhook events can be received against the same message due to
misconfigurations in the Meta business manager account." A deduplication
guard exists, but it doesn't actually work under concurrency.
### Rationale
The existing dedup was a three-step sequence: check Redis (`GET`), check
the database, then set a Redis flag (`SETEX`). Two Sidekiq workers
processing duplicate Meta webhooks simultaneously would both complete
the `GET` before either executed the `SETEX`, so both would proceed to
create a message. The `source_id` column has a non-unique index, so the
database wouldn't catch the duplicate either. Each message then
independently fires `after_create_commit`, dispatching two
`message_created` webhook events to the customer.
```
Worker A Worker B
│ │
▼ ▼
Redis GET key ──► nil Redis GET key ──► nil
│ │
│ ◄── both pass guard ──► │
│ │
▼ ▼
Redis SETEX key Redis SETEX key
│ │
▼ ▼
BEGIN transaction BEGIN transaction
INSERT message INSERT message
DELETE Redis key ◄─┐ │
COMMIT │ DELETE Redis key
│ COMMIT
│ │
└── key gone before ───┘
B's commit lands
▼ ▼
after_create_commit after_create_commit
dispatch MESSAGE_CREATED dispatch MESSAGE_CREATED
│ │
▼ ▼
WebhookJob ──► n8n WebhookJob ──► n8n
(duplicate!)
```
There was a second, subtler problem visible in the diagram: the Redis
key was cleared *inside* the database transaction, before the
transaction committed. This opened a window where neither the Redis
check nor the database check would see the in-flight message.
The fix collapses the check-and-set into a single `SET NX EX` call,
which is atomic in Redis. The key is no longer eagerly cleared — it
expires naturally after 24 hours. The database lookup
(`find_message_by_source_id`) remains as a fallback for messages that
were created before the lock expired.
```
Worker A Worker B
│ │
▼ ▼
Redis SET NX ──► OK Redis SET NX ──► nil
│ │
▼ ▼
proceeds to create returns early
message normally (lock already held)
```
### Implementation Notes
The lock logic is extracted into `Whatsapp::MessageDedupLock`, a small
class that wraps a single `Redis SET NX EX` call. This makes the
concurrency guarantee testable in isolation — the spec uses a
`CyclicBarrier` to race two threads against the same key and asserts
exactly one wins, without needing database writes,
`use_transactional_tests = false`, or monkey-patching.
Because the Redis lock now persists (instead of being cleared
mid-transaction), existing WhatsApp specs needed an `after` hook to
clean up `MESSAGE_SOURCE_KEY::*` keys between examples. Transactional
fixtures only roll back the database, not Redis.
44 lines
1.3 KiB
Ruby
44 lines
1.3 KiB
Ruby
require 'rails_helper'
|
|
|
|
describe Whatsapp::MessageDedupLock do
|
|
let(:source_id) { "wamid.test_#{SecureRandom.hex(8)}" }
|
|
let(:lock) { described_class.new(source_id) }
|
|
let(:redis_key) { format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: source_id) }
|
|
|
|
after { Redis::Alfred.delete(redis_key) }
|
|
|
|
describe '#acquire!' do
|
|
it 'returns truthy on first acquire' do
|
|
expect(lock.acquire!).to be_truthy
|
|
end
|
|
|
|
it 'returns falsy on second acquire for the same source_id' do
|
|
lock.acquire!
|
|
expect(described_class.new(source_id).acquire!).to be_falsy
|
|
end
|
|
|
|
it 'allows different source_ids to acquire independently' do
|
|
lock.acquire!
|
|
other = described_class.new("wamid.other_#{SecureRandom.hex(8)}")
|
|
expect(other.acquire!).to be_truthy
|
|
end
|
|
|
|
it 'lets exactly one thread win when two race for the same source_id' do
|
|
results = Concurrent::Array.new
|
|
barrier = Concurrent::CyclicBarrier.new(2)
|
|
|
|
threads = Array.new(2) do
|
|
Thread.new do
|
|
barrier.wait
|
|
results << described_class.new(source_id).acquire!
|
|
end
|
|
end
|
|
|
|
threads.each(&:join)
|
|
|
|
wins = results.count { |r| r }
|
|
expect(wins).to eq(1), "Expected exactly 1 winner but got #{wins}. Results: #{results.inspect}"
|
|
end
|
|
end
|
|
end
|