fix(refactor): Cleanup the specs and the logic for FetchIMAP job (#8766)
This commit is contained in:
@@ -7,6 +7,7 @@ class Inboxes::FetchImapEmailsJob < MutexApplicationJob
|
||||
return unless should_fetch_email?(channel)
|
||||
|
||||
key = format(::Redis::Alfred::EMAIL_MESSAGE_MUTEX, inbox_id: channel.inbox.id)
|
||||
|
||||
with_lock(key, 5.minutes) do
|
||||
process_email_for_channel(channel)
|
||||
end
|
||||
@@ -28,128 +29,16 @@ class Inboxes::FetchImapEmailsJob < MutexApplicationJob
|
||||
end
|
||||
|
||||
def process_email_for_channel(channel)
|
||||
if channel.microsoft?
|
||||
fetch_mail_for_ms_provider(channel)
|
||||
else
|
||||
fetch_mail_for_channel(channel)
|
||||
end
|
||||
# clearing old failures like timeouts since the mail is now successfully processed
|
||||
channel.reauthorized!
|
||||
end
|
||||
|
||||
def fetch_mail_for_channel(channel)
|
||||
imap_client = build_imap_client(channel, channel.imap_password, 'PLAIN')
|
||||
|
||||
message_ids_with_seq = fetch_message_ids_with_sequence(imap_client, channel)
|
||||
message_ids_with_seq.each do |message_id_with_seq|
|
||||
process_message_id(channel, imap_client, message_id_with_seq)
|
||||
end
|
||||
end
|
||||
|
||||
def process_message_id(channel, imap_client, message_id_with_seq)
|
||||
seq_no, message_id = message_id_with_seq
|
||||
|
||||
return if email_already_present?(channel, message_id)
|
||||
|
||||
if message_id.blank?
|
||||
Rails.logger.info "[IMAP::FETCH_EMAIL_SERVICE] Empty message id for #{channel.email} with seq no. <#{seq_no}>."
|
||||
return
|
||||
end
|
||||
|
||||
# Fetch the original mail content using the sequence no
|
||||
mail_str = imap_client.fetch(seq_no, 'RFC822')[0].attr['RFC822']
|
||||
|
||||
if mail_str.blank?
|
||||
Rails.logger.info "[IMAP::FETCH_EMAIL_SERVICE] Fetch failed for #{channel.email} with message-id <#{message_id}>."
|
||||
return
|
||||
end
|
||||
|
||||
inbound_mail = build_mail_from_string(mail_str)
|
||||
mail_info_logger(channel, inbound_mail, seq_no)
|
||||
process_mail(inbound_mail, channel)
|
||||
end
|
||||
|
||||
# Sends a FETCH command to retrieve data associated with a message in the mailbox.
|
||||
# You can send batches of message sequence number in `.fetch` method.
|
||||
def fetch_message_ids_with_sequence(imap_client, channel)
|
||||
seq_nums = fetch_available_mail_sequence_numbers(imap_client)
|
||||
|
||||
Rails.logger.info "[IMAP::FETCH_EMAIL_SERVICE] Fetching mails from #{channel.email}, found #{seq_nums.length}."
|
||||
|
||||
message_ids_with_seq = []
|
||||
seq_nums.each_slice(10).each do |batch|
|
||||
# Fetch only message-id only without mail body or contents.
|
||||
batch_message_ids = imap_client.fetch(batch, 'BODY.PEEK[HEADER]')
|
||||
|
||||
# .fetch returns an array of Net::IMAP::FetchData or nil
|
||||
# (instead of an empty array) if there is no matching message.
|
||||
# Check
|
||||
if batch_message_ids.blank?
|
||||
Rails.logger.info "[IMAP::FETCH_EMAIL_SERVICE] Fetching the batch failed for #{channel.email}."
|
||||
next
|
||||
end
|
||||
|
||||
batch_message_ids.each do |data|
|
||||
message_id = build_mail_from_string(data.attr['BODY[HEADER]']).message_id
|
||||
message_ids_with_seq.push([data.seqno, message_id])
|
||||
end
|
||||
end
|
||||
|
||||
message_ids_with_seq
|
||||
end
|
||||
|
||||
# Sends a SEARCH command to search the mailbox for messages that were
|
||||
# created between yesterday and today and returns message sequence numbers.
|
||||
# Return <message set>
|
||||
def fetch_available_mail_sequence_numbers(imap_client)
|
||||
imap_client.search(['SINCE', yesterday])
|
||||
end
|
||||
|
||||
def fetch_mail_for_ms_provider(channel)
|
||||
return if channel.provider_config['access_token'].blank?
|
||||
|
||||
access_token = valid_access_token channel
|
||||
|
||||
return unless access_token
|
||||
|
||||
imap_client = build_imap_client(channel, access_token, 'XOAUTH2')
|
||||
process_mails(imap_client, channel)
|
||||
end
|
||||
|
||||
def process_mails(imap_client, channel)
|
||||
fetch_available_mail_sequence_numbers(imap_client).each do |seq_no|
|
||||
inbound_mail = Mail.read_from_string imap_client.fetch(seq_no, 'RFC822')[0].attr['RFC822']
|
||||
|
||||
mail_info_logger(channel, inbound_mail, seq_no)
|
||||
|
||||
next if channel.inbox.messages.find_by(source_id: inbound_mail.message_id).present?
|
||||
|
||||
inbound_emails = if channel.microsoft?
|
||||
Imap::MicrosoftFetchEmailService.new(channel: channel).perform
|
||||
else
|
||||
Imap::FetchEmailService.new(channel: channel).perform
|
||||
end
|
||||
inbound_emails.map do |inbound_mail|
|
||||
process_mail(inbound_mail, channel)
|
||||
end
|
||||
end
|
||||
|
||||
def mail_info_logger(channel, inbound_mail, uid)
|
||||
return if Rails.env.test?
|
||||
|
||||
Rails.logger.info("
|
||||
#{channel.provider} Email id: #{inbound_mail.from} - message_source_id: #{inbound_mail.message_id} - sequence id: #{uid}")
|
||||
end
|
||||
|
||||
def build_imap_client(channel, access_token, auth_method)
|
||||
imap = Net::IMAP.new(channel.imap_address, channel.imap_port, true)
|
||||
imap.authenticate(auth_method, channel.imap_login, access_token)
|
||||
imap.select('INBOX')
|
||||
imap
|
||||
end
|
||||
|
||||
def email_already_present?(channel, message_id)
|
||||
channel.inbox.messages.find_by(source_id: message_id).present?
|
||||
end
|
||||
|
||||
def build_mail_from_string(raw_email_content)
|
||||
Mail.read_from_string(raw_email_content)
|
||||
end
|
||||
|
||||
def process_mail(inbound_mail, channel)
|
||||
Imap::ImapMailbox.new.process(inbound_mail, channel)
|
||||
rescue StandardError => e
|
||||
@@ -157,13 +46,4 @@ class Inboxes::FetchImapEmailsJob < MutexApplicationJob
|
||||
Rails.logger.error("
|
||||
#{channel.provider} Email dropped: #{inbound_mail.from} and message_source_id: #{inbound_mail.message_id}")
|
||||
end
|
||||
|
||||
# Making sure the access token is valid for microsoft provider
|
||||
def valid_access_token(channel)
|
||||
Microsoft::RefreshOauthTokenService.new(channel: channel).access_token
|
||||
end
|
||||
|
||||
def yesterday
|
||||
(Time.zone.today - 1).strftime('%d-%b-%Y')
|
||||
end
|
||||
end
|
||||
|
||||
115
app/services/imap/base_fetch_email_service.rb
Normal file
115
app/services/imap/base_fetch_email_service.rb
Normal file
@@ -0,0 +1,115 @@
|
||||
require 'net/imap'
|
||||
|
||||
class Imap::BaseFetchEmailService
|
||||
pattr_initialize [:channel!]
|
||||
|
||||
def perform
|
||||
# Override this method
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def authentication_type
|
||||
# Override this method
|
||||
end
|
||||
|
||||
def imap_password
|
||||
# Override this method
|
||||
end
|
||||
|
||||
def imap_client
|
||||
@imap_client ||= build_imap_client
|
||||
end
|
||||
|
||||
def mail_info_logger(inbound_mail, seq_no)
|
||||
return if Rails.env.test?
|
||||
|
||||
Rails.logger.info("
|
||||
#{channel.provider} Email id: #{inbound_mail.from} - message_source_id: #{inbound_mail.message_id} - sequence id: #{seq_no}")
|
||||
end
|
||||
|
||||
def email_already_present?(channel, message_id)
|
||||
channel.inbox.messages.find_by(source_id: message_id).present?
|
||||
end
|
||||
|
||||
def fetch_mail_for_channel
|
||||
message_ids_with_seq = fetch_message_ids_with_sequence
|
||||
message_ids_with_seq.filter_map do |message_id_with_seq|
|
||||
process_message_id(message_id_with_seq)
|
||||
end
|
||||
end
|
||||
|
||||
def process_message_id(message_id_with_seq)
|
||||
seq_no, message_id = message_id_with_seq
|
||||
|
||||
if message_id.blank?
|
||||
Rails.logger.info "[IMAP::FETCH_EMAIL_SERVICE] Empty message id for #{channel.email} with seq no. <#{seq_no}>."
|
||||
return
|
||||
end
|
||||
|
||||
return if email_already_present?(channel, message_id)
|
||||
|
||||
# Fetch the original mail content using the sequence no
|
||||
mail_str = imap_client.fetch(seq_no, 'RFC822')[0].attr['RFC822']
|
||||
|
||||
if mail_str.blank?
|
||||
Rails.logger.info "[IMAP::FETCH_EMAIL_SERVICE] Fetch failed for #{channel.email} with message-id <#{message_id}>."
|
||||
return
|
||||
end
|
||||
|
||||
inbound_mail = build_mail_from_string(mail_str)
|
||||
mail_info_logger(inbound_mail, seq_no)
|
||||
inbound_mail
|
||||
end
|
||||
|
||||
# Sends a FETCH command to retrieve data associated with a message in the mailbox.
|
||||
# You can send batches of message sequence number in `.fetch` method.
|
||||
def fetch_message_ids_with_sequence
|
||||
seq_nums = fetch_available_mail_sequence_numbers
|
||||
|
||||
Rails.logger.info "[IMAP::FETCH_EMAIL_SERVICE] Fetching mails from #{channel.email}, found #{seq_nums.length}."
|
||||
|
||||
message_ids_with_seq = []
|
||||
seq_nums.each_slice(10).each do |batch|
|
||||
# Fetch only message-id only without mail body or contents.
|
||||
batch_message_ids = imap_client.fetch(batch, 'BODY.PEEK[HEADER]')
|
||||
|
||||
# .fetch returns an array of Net::IMAP::FetchData or nil
|
||||
# (instead of an empty array) if there is no matching message.
|
||||
# Check
|
||||
if batch_message_ids.blank?
|
||||
Rails.logger.info "[IMAP::FETCH_EMAIL_SERVICE] Fetching the batch failed for #{channel.email}."
|
||||
next
|
||||
end
|
||||
|
||||
batch_message_ids.each do |data|
|
||||
message_id = build_mail_from_string(data.attr['BODY[HEADER]']).message_id
|
||||
message_ids_with_seq.push([data.seqno, message_id])
|
||||
end
|
||||
end
|
||||
|
||||
message_ids_with_seq
|
||||
end
|
||||
|
||||
# Sends a SEARCH command to search the mailbox for messages that were
|
||||
# created between yesterday and today and returns message sequence numbers.
|
||||
# Return <message set>
|
||||
def fetch_available_mail_sequence_numbers
|
||||
imap_client.search(['SINCE', yesterday])
|
||||
end
|
||||
|
||||
def build_imap_client
|
||||
imap = Net::IMAP.new(channel.imap_address, port: channel.imap_port, ssl: true)
|
||||
imap.authenticate(authentication_type, channel.imap_login, imap_password)
|
||||
imap.select('INBOX')
|
||||
imap
|
||||
end
|
||||
|
||||
def build_mail_from_string(raw_email_content)
|
||||
Mail.read_from_string(raw_email_content)
|
||||
end
|
||||
|
||||
def yesterday
|
||||
(Time.zone.today - 1).strftime('%d-%b-%Y')
|
||||
end
|
||||
end
|
||||
15
app/services/imap/fetch_email_service.rb
Normal file
15
app/services/imap/fetch_email_service.rb
Normal file
@@ -0,0 +1,15 @@
|
||||
class Imap::FetchEmailService < Imap::BaseFetchEmailService
|
||||
def perform
|
||||
fetch_mail_for_channel
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def authentication_type
|
||||
'PLAIN'
|
||||
end
|
||||
|
||||
def imap_password
|
||||
channel.imap_password
|
||||
end
|
||||
end
|
||||
17
app/services/imap/microsoft_fetch_email_service.rb
Normal file
17
app/services/imap/microsoft_fetch_email_service.rb
Normal file
@@ -0,0 +1,17 @@
|
||||
class Imap::MicrosoftFetchEmailService < Imap::BaseFetchEmailService
|
||||
def perform
|
||||
return if channel.provider_config['access_token'].blank?
|
||||
|
||||
fetch_mail_for_channel
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def authentication_type
|
||||
'XOAUTH2'
|
||||
end
|
||||
|
||||
def imap_password
|
||||
Microsoft::RefreshOauthTokenService.new(channel: channel).access_token
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user