diff --git a/app/jobs/inboxes/fetch_imap_emails_job.rb b/app/jobs/inboxes/fetch_imap_emails_job.rb index 8d3582440..b4cd640e9 100644 --- a/app/jobs/inboxes/fetch_imap_emails_job.rb +++ b/app/jobs/inboxes/fetch_imap_emails_job.rb @@ -1,17 +1,21 @@ require 'net/imap' -class Inboxes::FetchImapEmailsJob < ApplicationJob +class Inboxes::FetchImapEmailsJob < MutexApplicationJob queue_as :scheduled_jobs def perform(channel) return unless should_fetch_email?(channel) - process_email_for_channel(channel) + with_lock(::Redis::Alfred::EMAIL_MESSAGE_MUTEX, inbox_id: channel.inbox.id) do + process_email_for_channel(channel) + end rescue *ExceptionList::IMAP_EXCEPTIONS => e Rails.logger.error e channel.authorization_error! rescue EOFError, OpenSSL::SSL::SSLError, Net::IMAP::NoResponseError, Net::IMAP::BadResponseError => e Rails.logger.error e + rescue LockAcquisitionError + Rails.logger.error "Lock failed for #{inbox.id}" rescue StandardError => e ChatwootExceptionTracker.new(e, account: channel.account).capture_exception end @@ -35,33 +39,43 @@ class Inboxes::FetchImapEmailsJob < ApplicationJob def fetch_mail_for_channel(channel) imap_inbox = authenticated_imap_inbox(channel, channel.imap_password, 'PLAIN') - last_email_time = DateTime.parse(Net::IMAP.format_datetime(last_email_time(channel))) - received_mails(imap_inbox).each do |message_id| - inbound_mail = Mail.read_from_string imap_inbox.fetch(message_id, 'RFC822')[0].attr['RFC822'] + fetch_message_ids(imap_inbox, channel).each do |message_id_uid_pair| + uid, message_id = message_id_uid_pair + next if email_already_present?(channel, message_id) - mail_info_logger(channel, inbound_mail, message_id) - - next if email_already_present?(channel, inbound_mail, last_email_time) + inbound_mail = Mail.read_from_string(imap_inbox.fetch(uid, 'RFC822')[0].attr['RFC822']) + mail_info_logger(channel, inbound_mail, uid) process_mail(inbound_mail, channel) end end - def email_already_present?(channel, inbound_mail, _last_email_time) - channel.inbox.messages.find_by(source_id: inbound_mail.message_id).present? + def fetch_message_ids(imap_inbox, channel) + uids = fetch_uids(imap_inbox) + Rails.logger.info "FETCH_EMAILS_FROM: #{channel.email} - Found #{uids.length} emails \n\n\n\n" + + message_ids = [] + uids.each_slice(10).each do |batch| + uid_fetched = imap_inbox.fetch(batch, 'BODY.PEEK[HEADER.FIELDS (MESSAGE-ID)]') + # print uid_fetched + uid_fetched.each do |data| + message_id = data.attr['BODY[HEADER.FIELDS (MESSAGE-ID)]'].to_s.scan(/<(.+?)>/).flatten.first + message_ids.push([data.seqno, message_id]) + end + end + + message_ids end - def received_mails(imap_inbox) + def email_already_present?(channel, message_id) + channel.inbox.messages.find_by(source_id: message_id).present? + end + + def fetch_uids(imap_inbox) imap_inbox.search(['BEFORE', tomorrow, 'SINCE', yesterday]) end - def processed_email?(current_email, last_email_time) - return current_email.date < last_email_time if current_email.date.present? - - false - end - def fetch_mail_for_ms_provider(channel) return if channel.provider_config['access_token'].blank? @@ -75,7 +89,7 @@ class Inboxes::FetchImapEmailsJob < ApplicationJob end def process_mails(imap_inbox, channel) - received_mails(imap_inbox).each do |message_id| + fetch_uids(imap_inbox).each do |message_id| inbound_mail = Mail.read_from_string imap_inbox.fetch(message_id, 'RFC822')[0].attr['RFC822'] mail_info_logger(channel, inbound_mail, message_id) @@ -86,11 +100,11 @@ class Inboxes::FetchImapEmailsJob < ApplicationJob end end - def mail_info_logger(channel, inbound_mail, message_id) + def mail_info_logger(channel, inbound_mail, uid) return if Rails.env.test? Rails.logger.info(" - #{channel.provider} Email id: #{inbound_mail.from} and message_source_id: #{inbound_mail.message_id}, message_id: #{message_id}") + #{channel.provider} Email id: #{inbound_mail.from} - message_source_id: #{inbound_mail.message_id} - sequence id: #{uid}") end def authenticated_imap_inbox(channel, access_token, auth_method) @@ -100,18 +114,6 @@ class Inboxes::FetchImapEmailsJob < ApplicationJob imap end - def last_email_time(channel) - # we are only checking for emails in last 2 day - last_email_incoming_message = channel.inbox.messages.incoming.where('messages.created_at >= ?', 2.days.ago).last - if last_email_incoming_message.present? - time = last_email_incoming_message.content_attributes['email']['date'] - time ||= last_email_incoming_message.created_at.to_s - end - time ||= 1.hour.ago.to_s - - DateTime.parse(time) - end - def yesterday (Time.zone.today - 1).strftime('%d-%b-%Y') end diff --git a/lib/redis/redis_keys.rb b/lib/redis/redis_keys.rb index 01c1fc0b5..535d3774d 100644 --- a/lib/redis/redis_keys.rb +++ b/lib/redis/redis_keys.rb @@ -38,4 +38,5 @@ module Redis::RedisKeys FACEBOOK_MESSAGE_MUTEX = 'FB_MESSAGE_CREATE_LOCK::%s::%s'.freeze IG_MESSAGE_MUTEX = 'IG_MESSAGE_CREATE_LOCK::%s::%s'.freeze SLACK_MESSAGE_MUTEX = 'SLACK_MESSAGE_LOCK::%s::%s'.freeze + EMAIL_MESSAGE_MUTEX = 'EMAIL_CHANNEL_LOCK::%s'.freeze end