Some customers using WhatsApp inboxes with account-level webhooks reported
receiving duplicate `message_created` webhook deliveries for every incoming
message. On inspection, we found:

- Both payloads are identical.
- No errors appear in the application logs.
- The webhook URL is configured in only one place.

This meant the system itself was sending the webhook twice. For context,
there is a known related issue: Meta's WhatsApp Business API can deliver
the same webhook notification multiple times for a single message. The
codebase already acknowledges this: a comment in
`IncomingMessageBaseService#process_messages` notes that "multiple
webhook events can be received against the same message due to
misconfigurations in the Meta business manager account." A deduplication
guard exists, but it does not hold under concurrency.
### Rationale
The existing dedup was a three-step sequence: check Redis (`GET`), check
the database, then set a Redis flag (`SETEX`). Two Sidekiq workers
processing duplicate Meta webhooks simultaneously would both complete
the `GET` before either executed the `SETEX`, so both would proceed to
create a message. The `source_id` column has a non-unique index, so the
database wouldn't catch the duplicate either. Each message then
independently fires `after_create_commit`, dispatching two
`message_created` webhook events to the customer.
```
Worker A                            Worker B
   │                                   │
   ▼                                   ▼
Redis GET key ──► nil               Redis GET key ──► nil
   │                                   │
   │ ◄────── both pass guard ──────►   │
   │                                   │
   ▼                                   ▼
Redis SETEX key                     Redis SETEX key
   │                                   │
   ▼                                   ▼
BEGIN transaction                   BEGIN transaction
INSERT message                      INSERT message
DELETE Redis key ◄─┐                   │
COMMIT             │                DELETE Redis key
                   │                COMMIT
                   │                   │
                   └─ key gone before ─┘
                      B's commit lands
   ▼                                   ▼
after_create_commit                 after_create_commit
dispatch MESSAGE_CREATED            dispatch MESSAGE_CREATED
   │                                   │
   ▼                                   ▼
WebhookJob ──► n8n                  WebhookJob ──► n8n
                                    (duplicate!)
```
There was a second, subtler problem visible in the diagram: the Redis
key was cleared *inside* the database transaction, before the
transaction committed. This opened a window where neither the Redis
check nor the database check would see the in-flight message.
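
The check-then-set race can be replayed deterministically without threads. The sketch below is illustrative only: `FakeStore` is a hypothetical in-memory stand-in that models just `GET` and `SETEX`, and the key name and the `racy_guard_passes` helper are invented for the example. It is not the service's actual code. Both simulated workers read `nil` before either writes the flag, so both pass the guard.

```ruby
# Hypothetical in-memory stand-in for Redis; models only GET and SETEX.
class FakeStore
  def initialize
    @data = {}
  end

  def get(key)
    @data[key]
  end

  def setex(key, _ttl_seconds, value)
    @data[key] = value
  end
end

# Replays the interleaving from the diagram and returns how many workers
# passed the guard. The key name is illustrative.
def racy_guard_passes(store = FakeStore.new, key = 'MESSAGE_SOURCE_KEY::wamid.example')
  # Both workers run the GET before either has written the flag.
  seen_by_a = store.get(key)
  seen_by_b = store.get(key)

  # Both now write the flag, too late to stop the other worker.
  store.setex(key, 86_400, true)
  store.setex(key, 86_400, true)

  [seen_by_a, seen_by_b].count(&:nil?)
end

puts "workers that passed the guard: #{racy_guard_passes}"
```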

The fix collapses the check-and-set into a single `SET NX EX` call,
which is atomic in Redis. The key is no longer cleared eagerly; it
expires on its own after 24 hours. The database lookup
(`find_message_by_source_id`) remains as a fallback for duplicates that
arrive after the lock has expired.
```
Worker A                            Worker B
   │                                   │
   ▼                                   ▼
Redis SET NX ──► OK                 Redis SET NX ──► nil
   │                                   │
   ▼                                   ▼
proceeds to create                  returns early
message normally                    (lock already held)
```
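
As a rough illustration of the atomic variant, the sketch below wraps a single `set(..., nx: true, ex: ...)` call, which is the shape redis-rb uses for `SET NX EX`. Everything named here is an assumption made for the example: `FakeRedis` is an in-memory stand-in so the snippet runs without a server, and `DedupLockSketch`, its `acquire!` method, and the key format are hypothetical and may differ from the real `Whatsapp::MessageDedupLock`.

```ruby
# Hypothetical stand-in for a Redis client. It models only SET with the NX
# option and ignores expiry; a Mutex makes check-and-write a single step,
# mirroring the atomicity Redis provides for SET NX.
class FakeRedis
  def initialize
    @data = {}
    @mutex = Mutex.new
  end

  def set(key, value, nx: false, ex: nil)
    @mutex.synchronize do
      next false if nx && @data.key?(key)

      @data[key] = value
      true
    end
  end
end

# Sketch of a dedup lock. The class name, method name, and key format are
# assumptions for illustration only.
class DedupLockSketch
  LOCK_TTL_SECONDS = 24 * 60 * 60 # the 24-hour expiry described in the text

  def initialize(redis)
    @redis = redis
  end

  # Returns true only for the first caller for a given source_id.
  def acquire!(source_id)
    key = "MESSAGE_SOURCE_KEY::#{source_id}"
    @redis.set(key, true, nx: true, ex: LOCK_TTL_SECONDS) ? true : false
  end
end

lock = DedupLockSketch.new(FakeRedis.new)
puts lock.acquire!('wamid.example') # first worker acquires the lock
puts lock.acquire!('wamid.example') # duplicate is rejected
```

Because there is only one Redis round trip, there is no gap between the check and the write for a second worker to slip through.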
### Implementation Notes
The lock logic is extracted into `Whatsapp::MessageDedupLock`, a small
class that wraps a single `Redis SET NX EX` call. This makes the
concurrency guarantee testable in isolation — the spec uses a
`CyclicBarrier` to race two threads against the same key and asserts
exactly one wins, without needing database writes,
`use_transactional_tests = false`, or monkey-patching.
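
The shape of such a race test can be sketched with the standard library alone. This is not the actual spec: `LockStore` is a hypothetical in-memory stand-in for the Redis-backed lock, `OneShotBarrier` is a hand-rolled replacement for the `CyclicBarrier` the spec uses (swapped in so the example runs without gems), and `lock_winners` is an invented helper.

```ruby
# Hypothetical stand-in for Redis SET NX: the Mutex makes the
# check-and-write a single atomic step. Expiry is not modeled.
class LockStore
  def initialize
    @held = {}
    @mutex = Mutex.new
  end

  def set_nx(key)
    @mutex.synchronize do
      next false if @held.key?(key)

      @held[key] = true
    end
  end
end

# Minimal one-shot barrier built from the standard library.
class OneShotBarrier
  def initialize(parties)
    @parties = parties
    @arrived = 0
    @mutex = Mutex.new
    @all_arrived = ConditionVariable.new
  end

  # Blocks until every party has called wait.
  def wait
    @mutex.synchronize do
      @arrived += 1
      @all_arrived.broadcast if @arrived == @parties
      @all_arrived.wait(@mutex) while @arrived < @parties
    end
  end
end

# Releases all threads at once and returns how many acquired the lock.
def lock_winners(thread_count = 2)
  store = LockStore.new
  barrier = OneShotBarrier.new(thread_count)
  workers = Array.new(thread_count) do
    Thread.new do
      barrier.wait
      store.set_nx('MESSAGE_SOURCE_KEY::wamid.race')
    end
  end
  workers.map(&:value).count(true)
end

puts "winners: #{lock_winners}"
```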
Because the Redis lock now persists (instead of being cleared
mid-transaction), existing WhatsApp specs needed an `after` hook to
clean up `MESSAGE_SOURCE_KEY::*` keys between examples. Transactional
fixtures only roll back the database, not Redis.
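
One way such a cleanup could look is sketched below. It is an assumption about the approach, not the hook from the actual specs: `clear_dedup_keys` is a hypothetical helper, and `FakeKeyspace` is an in-memory stand-in that models only `scan_each(match:)` and `del(*keys)`, two calls that redis-rb provides. In a real suite the helper would be called from an RSpec `after` block with the application's Redis client.

```ruby
# Hypothetical stand-in for a Redis client; models only the two calls used
# below and supports only patterns that end in a single trailing '*'.
class FakeKeyspace
  def initialize(keys)
    @data = keys.to_h { |key| [key, true] }
  end

  def scan_each(match:)
    prefix = match.delete_suffix('*')
    @data.keys.select { |key| key.start_with?(prefix) }.each
  end

  # Returns the number of keys that were actually removed.
  def del(*keys)
    keys.count { |key| @data.delete(key) }
  end

  def keys
    @data.keys
  end
end

# Removes every dedup lock key and returns how many were found.
def clear_dedup_keys(redis, pattern = 'MESSAGE_SOURCE_KEY::*')
  keys = redis.scan_each(match: pattern).to_a
  redis.del(*keys) unless keys.empty? # DEL with no arguments is an error in Redis
  keys.size
end

redis = FakeKeyspace.new(%w[MESSAGE_SOURCE_KEY::wamid.1 MESSAGE_SOURCE_KEY::wamid.2 OTHER::key])
puts clear_dedup_keys(redis)
puts redis.keys.inspect
```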
# Mostly modeled after the initial implementation of the service based on 360 Dialog
# https://docs.360dialog.com/whatsapp-api/whatsapp-api/media
# https://developers.facebook.com/docs/whatsapp/api/media/
class Whatsapp::IncomingMessageBaseService
  include ::Whatsapp::IncomingMessageServiceHelpers

  pattr_initialize [:inbox!, :params!, :outgoing_echo]

  def perform
    processed_params

    if processed_params.try(:[], :statuses).present?
      process_statuses
    elsif messages_data.present?
      process_messages
    end
  end

  # Returns messages array for both regular messages and echo events
  def messages_data
    @processed_params&.dig(:messages) || @processed_params&.dig(:message_echoes)
  end

  private

  def process_messages
    # We don't support reactions and ephemeral messages yet, so skip processing
    # if the webhook event is a reaction, an ephemeral message, or an unsupported message.
    return if unprocessable_message_type?(message_type)

    # Multiple webhook events can be received for the same message due to
    # misconfigurations in the Meta business manager account.
    # We use an atomic Redis SET NX to prevent concurrent workers from both
    # processing the same message simultaneously.
    return if find_message_by_source_id(messages_data.first[:id])
    return unless lock_message_source_id!

    set_contact
    return unless @contact

    ActiveRecord::Base.transaction do
      set_conversation
      create_messages
    end
  end

  def process_statuses
    return unless find_message_by_source_id(@processed_params[:statuses].first[:id])

    update_message_with_status(@message, @processed_params[:statuses].first)
  rescue ArgumentError => e
    Rails.logger.error "Error while processing whatsapp status update #{e.message}"
  end

  def update_message_with_status(message, status)
    message.status = status[:status]
    if status[:status] == 'failed' && status[:errors].present?
      error = status[:errors]&.first
      message.external_error = "#{error[:code]}: #{error[:title]}"
    end
    message.save!
  end

  def create_messages
    message = messages_data.first
    log_error(message) && return if error_webhook_event?(message)

    process_in_reply_to(message)

    message_type == 'contacts' ? create_contact_messages(message) : create_regular_message(message)
  end

  def create_contact_messages(message)
    message['contacts'].each do |contact|
      # Pass source_id from parent message since contact objects don't have :id
      create_message(contact, source_id: message[:id])
      attach_contact(contact)
      @message.save!
    end
  end

  def create_regular_message(message)
    create_message(message, source_id: message[:id])
    attach_files
    attach_location if message_type == 'location'
    @message.save!
  end

  def set_contact
    if outgoing_echo
      set_contact_from_echo
    else
      set_contact_from_message
    end
  end

  def set_contact_from_echo
    # For echo messages, contact phone is in the 'to' field
    phone_number = messages_data.first[:to]
    waid = processed_waid(phone_number)

    contact_inbox = ::ContactInboxWithContactBuilder.new(
      source_id: waid,
      inbox: inbox,
      contact_attributes: { name: "+#{phone_number}", phone_number: "+#{phone_number}" }
    ).perform

    @contact_inbox = contact_inbox
    @contact = contact_inbox.contact
  end

  def set_contact_from_message
    contact_params = @processed_params[:contacts]&.first
    return if contact_params.blank?

    waid = processed_waid(contact_params[:wa_id])

    contact_inbox = ::ContactInboxWithContactBuilder.new(
      source_id: waid,
      inbox: inbox,
      contact_attributes: { name: contact_params.dig(:profile, :name), phone_number: "+#{messages_data.first[:from]}" }
    ).perform

    @contact_inbox = contact_inbox
    @contact = contact_inbox.contact

    # Update existing contact name if ProfileName is available and current name is just phone number
    update_contact_with_profile_name(contact_params)
  end

  def set_conversation
    # if lock to single conversation is disabled, we will create a new conversation if previous conversation is resolved
    @conversation = if @inbox.lock_to_single_conversation
                      @contact_inbox.conversations.last
                    else
                      @contact_inbox.conversations
                                    .where.not(status: :resolved).last
                    end
    return if @conversation

    @conversation = ::Conversation.create!(conversation_params)
  end

  def attach_files
    return if %w[text button interactive location contacts].include?(message_type)

    attachment_payload = messages_data.first[message_type.to_sym]
    @message.content ||= attachment_payload[:caption]

    attachment_file = download_attachment_file(attachment_payload)
    return if attachment_file.blank?

    @message.attachments.new(
      account_id: @message.account_id,
      file_type: file_content_type(message_type),
      file: {
        io: attachment_file,
        filename: attachment_file.original_filename,
        content_type: attachment_file.content_type
      }
    )
  end

  def attach_location
    location = messages_data.first['location']
    location_name = location['name'] ? "#{location['name']}, #{location['address']}" : ''
    @message.attachments.new(
      account_id: @message.account_id,
      file_type: file_content_type(message_type),
      coordinates_lat: location['latitude'],
      coordinates_long: location['longitude'],
      fallback_title: location_name,
      external_url: location['url']
    )
  end

  def create_message(message, source_id: nil)
    content_attrs = outgoing_echo ? { external_echo: true } : {}
    content_attrs[:in_reply_to_external_id] = @in_reply_to_external_id if @in_reply_to_external_id.present?

    @message = @conversation.messages.build(
      content: message_content(message),
      account_id: @inbox.account_id,
      inbox_id: @inbox.id,
      message_type: outgoing_echo ? :outgoing : :incoming,
      # Set status to :delivered for echo messages to prevent SendReplyJob from trying to send them
      status: outgoing_echo ? :delivered : :sent,
      sender: outgoing_echo ? nil : @contact,
      source_id: (source_id || message[:id]).to_s,
      content_attributes: content_attrs
    )
  end

  def attach_contact(contact)
    phones = contact[:phones]
    phones = [{ phone: 'Phone number is not available' }] if phones.blank?

    name_info = contact['name'] || {}
    contact_meta = {
      firstName: name_info['first_name'],
      lastName: name_info['last_name']
    }.compact

    phones.each do |phone|
      @message.attachments.new(
        account_id: @message.account_id,
        file_type: file_content_type(message_type),
        fallback_title: phone[:phone].to_s,
        meta: contact_meta
      )
    end
  end

  def update_contact_with_profile_name(contact_params)
    profile_name = contact_params.dig(:profile, :name)
    return if profile_name.blank?
    return if @contact.name == profile_name

    # Only update if current name exactly matches the phone number or formatted phone number
    return unless contact_name_matches_phone_number?

    @contact.update!(name: profile_name)
  end

  def contact_name_matches_phone_number?
    phone_number = "+#{messages_data.first[:from]}"
    formatted_phone_number = TelephoneNumber.parse(phone_number).international_number
    @contact.name == phone_number || @contact.name == formatted_phone_number
  end
end