feat: Add configurable interval for IMAP sync (#9302)
This commit is contained in:
@@ -3,13 +3,13 @@ require 'net/imap'
|
|||||||
class Inboxes::FetchImapEmailsJob < MutexApplicationJob
|
class Inboxes::FetchImapEmailsJob < MutexApplicationJob
|
||||||
queue_as :scheduled_jobs
|
queue_as :scheduled_jobs
|
||||||
|
|
||||||
def perform(channel)
|
def perform(channel, interval = 1)
|
||||||
return unless should_fetch_email?(channel)
|
return unless should_fetch_email?(channel)
|
||||||
|
|
||||||
key = format(::Redis::Alfred::EMAIL_MESSAGE_MUTEX, inbox_id: channel.inbox.id)
|
key = format(::Redis::Alfred::EMAIL_MESSAGE_MUTEX, inbox_id: channel.inbox.id)
|
||||||
|
|
||||||
with_lock(key, 5.minutes) do
|
with_lock(key, 5.minutes) do
|
||||||
process_email_for_channel(channel)
|
process_email_for_channel(channel, interval)
|
||||||
end
|
end
|
||||||
rescue *ExceptionList::IMAP_EXCEPTIONS => e
|
rescue *ExceptionList::IMAP_EXCEPTIONS => e
|
||||||
Rails.logger.error "Authorization error for email channel - #{channel.inbox.id} : #{e.message}"
|
Rails.logger.error "Authorization error for email channel - #{channel.inbox.id} : #{e.message}"
|
||||||
@@ -28,11 +28,11 @@ class Inboxes::FetchImapEmailsJob < MutexApplicationJob
|
|||||||
channel.imap_enabled? && !channel.reauthorization_required?
|
channel.imap_enabled? && !channel.reauthorization_required?
|
||||||
end
|
end
|
||||||
|
|
||||||
def process_email_for_channel(channel)
|
def process_email_for_channel(channel, interval)
|
||||||
inbound_emails = if channel.microsoft?
|
inbound_emails = if channel.microsoft?
|
||||||
Imap::MicrosoftFetchEmailService.new(channel: channel).perform
|
Imap::MicrosoftFetchEmailService.new(channel: channel, interval: interval).perform
|
||||||
else
|
else
|
||||||
Imap::FetchEmailService.new(channel: channel).perform
|
Imap::FetchEmailService.new(channel: channel, interval: interval).perform
|
||||||
end
|
end
|
||||||
inbound_emails.map do |inbound_mail|
|
inbound_emails.map do |inbound_mail|
|
||||||
process_mail(inbound_mail, channel)
|
process_mail(inbound_mail, channel)
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
require 'net/imap'
|
require 'net/imap'
|
||||||
|
|
||||||
class Imap::BaseFetchEmailService
|
class Imap::BaseFetchEmailService
|
||||||
pattr_initialize [:channel!]
|
pattr_initialize [:channel!, :interval]
|
||||||
|
|
||||||
def fetch_emails
|
def fetch_emails
|
||||||
# Override this method
|
# Override this method
|
||||||
@@ -99,10 +99,10 @@ class Imap::BaseFetchEmailService
|
|||||||
end
|
end
|
||||||
|
|
||||||
# Sends a SEARCH command to search the mailbox for messages that were
|
# Sends a SEARCH command to search the mailbox for messages that were
|
||||||
# created between yesterday and today and returns message sequence numbers.
|
# created between yesterday (or given date) and today and returns message sequence numbers.
|
||||||
# Return <message set>
|
# Return <message set>
|
||||||
def fetch_available_mail_sequence_numbers
|
def fetch_available_mail_sequence_numbers
|
||||||
imap_client.search(['SINCE', yesterday])
|
imap_client.search(['SINCE', since])
|
||||||
end
|
end
|
||||||
|
|
||||||
def build_imap_client
|
def build_imap_client
|
||||||
@@ -123,7 +123,8 @@ class Imap::BaseFetchEmailService
|
|||||||
Mail.read_from_string(raw_email_content)
|
Mail.read_from_string(raw_email_content)
|
||||||
end
|
end
|
||||||
|
|
||||||
def yesterday
|
def since
|
||||||
(Time.zone.today - 1).strftime('%d-%b-%Y')
|
previous_day = Time.zone.today - (interval || 1).to_i
|
||||||
|
previous_day.strftime('%d-%b-%Y')
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -41,18 +41,27 @@ RSpec.describe Inboxes::FetchImapEmailsJob do
|
|||||||
context 'when the channel is regular imap' do
|
context 'when the channel is regular imap' do
|
||||||
it 'calls the imap fetch service' do
|
it 'calls the imap fetch service' do
|
||||||
fetch_service = double
|
fetch_service = double
|
||||||
allow(Imap::FetchEmailService).to receive(:new).with(channel: imap_email_channel).and_return(fetch_service)
|
allow(Imap::FetchEmailService).to receive(:new).with(channel: imap_email_channel, interval: 1).and_return(fetch_service)
|
||||||
allow(fetch_service).to receive(:perform).and_return([])
|
allow(fetch_service).to receive(:perform).and_return([])
|
||||||
|
|
||||||
described_class.perform_now(imap_email_channel)
|
described_class.perform_now(imap_email_channel)
|
||||||
expect(fetch_service).to have_received(:perform)
|
expect(fetch_service).to have_received(:perform)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
it 'calls the imap fetch service with the correct interval' do
|
||||||
|
fetch_service = double
|
||||||
|
allow(Imap::FetchEmailService).to receive(:new).with(channel: imap_email_channel, interval: 4).and_return(fetch_service)
|
||||||
|
allow(fetch_service).to receive(:perform).and_return([])
|
||||||
|
|
||||||
|
described_class.perform_now(imap_email_channel, 4)
|
||||||
|
expect(fetch_service).to have_received(:perform)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
context 'when the channel is Microsoft' do
|
context 'when the channel is Microsoft' do
|
||||||
it 'calls the Microsoft fetch service' do
|
it 'calls the Microsoft fetch service' do
|
||||||
fetch_service = double
|
fetch_service = double
|
||||||
allow(Imap::MicrosoftFetchEmailService).to receive(:new).with(channel: microsoft_imap_email_channel).and_return(fetch_service)
|
allow(Imap::MicrosoftFetchEmailService).to receive(:new).with(channel: microsoft_imap_email_channel, interval: 1).and_return(fetch_service)
|
||||||
allow(fetch_service).to receive(:perform).and_return([])
|
allow(fetch_service).to receive(:perform).and_return([])
|
||||||
|
|
||||||
described_class.perform_now(microsoft_imap_email_channel)
|
described_class.perform_now(microsoft_imap_email_channel)
|
||||||
@@ -62,7 +71,7 @@ RSpec.describe Inboxes::FetchImapEmailsJob do
|
|||||||
|
|
||||||
context 'when IMAP connection errors out' do
|
context 'when IMAP connection errors out' do
|
||||||
it 'mark the connection for authorization required' do
|
it 'mark the connection for authorization required' do
|
||||||
allow(Imap::FetchEmailService).to receive(:new).with(channel: imap_email_channel).and_raise(Errno::ECONNREFUSED)
|
allow(Imap::FetchEmailService).to receive(:new).with(channel: imap_email_channel, interval: 1).and_raise(Errno::ECONNREFUSED)
|
||||||
allow(Redis::Alfred).to receive(:incr)
|
allow(Redis::Alfred).to receive(:incr)
|
||||||
|
|
||||||
expect(Redis::Alfred).to receive(:incr).with("AUTHORIZATION_ERROR_COUNT:channel_email:#{imap_email_channel.id}")
|
expect(Redis::Alfred).to receive(:incr).with("AUTHORIZATION_ERROR_COUNT:channel_email:#{imap_email_channel.id}")
|
||||||
@@ -80,14 +89,14 @@ RSpec.describe Inboxes::FetchImapEmailsJob do
|
|||||||
allow(Imap::ImapMailbox).to receive(:new).and_return(mailbox)
|
allow(Imap::ImapMailbox).to receive(:new).and_return(mailbox)
|
||||||
allow(ChatwootExceptionTracker).to receive(:new).and_return(exception_tracker)
|
allow(ChatwootExceptionTracker).to receive(:new).and_return(exception_tracker)
|
||||||
|
|
||||||
allow(Imap::FetchEmailService).to receive(:new).with(channel: imap_email_channel).and_return(fetch_service)
|
allow(Imap::FetchEmailService).to receive(:new).with(channel: imap_email_channel, interval: 1).and_return(fetch_service)
|
||||||
allow(fetch_service).to receive(:perform).and_return([inbound_mail])
|
allow(fetch_service).to receive(:perform).and_return([inbound_mail])
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'calls the mailbox to create emails' do
|
it 'calls the mailbox to create emails' do
|
||||||
allow(mailbox).to receive(:process)
|
allow(mailbox).to receive(:process)
|
||||||
|
|
||||||
expect(Imap::FetchEmailService).to receive(:new).with(channel: imap_email_channel).and_return(fetch_service)
|
expect(Imap::FetchEmailService).to receive(:new).with(channel: imap_email_channel, interval: 1).and_return(fetch_service)
|
||||||
expect(fetch_service).to receive(:perform).and_return([inbound_mail])
|
expect(fetch_service).to receive(:perform).and_return([inbound_mail])
|
||||||
expect(mailbox).to receive(:process).with(inbound_mail, imap_email_channel)
|
expect(mailbox).to receive(:process).with(inbound_mail, imap_email_channel)
|
||||||
|
|
||||||
|
|||||||
@@ -50,5 +50,31 @@ RSpec.describe Imap::MicrosoftFetchEmailService do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
context 'when the interval is passed during an IMAP Sync' do
|
||||||
|
it 'fetches the emails based on the interval specified in the job' do
|
||||||
|
travel_to '26.10.2020 10:00'.to_datetime do
|
||||||
|
email_object = create_inbound_email_from_fixture('only_text.eml')
|
||||||
|
email_header = Net::IMAP::FetchData.new(1, 'BODY[HEADER]' => eml_content_with_message_id)
|
||||||
|
imap_fetch_mail = Net::IMAP::FetchData.new(1, 'RFC822' => eml_content_with_message_id)
|
||||||
|
|
||||||
|
allow(imap).to receive(:search).with(%w[SINCE 18-Oct-2020]).and_return([1])
|
||||||
|
allow(imap).to receive(:fetch).with([1], 'BODY.PEEK[HEADER]').and_return([email_header])
|
||||||
|
allow(imap).to receive(:fetch).with(1, 'RFC822').and_return([imap_fetch_mail])
|
||||||
|
allow(imap).to receive(:logout)
|
||||||
|
|
||||||
|
result = described_class.new(channel: microsoft_channel, interval: 8).perform
|
||||||
|
|
||||||
|
expect(refresh_token_service).to have_received(:access_token)
|
||||||
|
|
||||||
|
expect(result.length).to eq 1
|
||||||
|
expect(result[0].message_id).to eq email_object.message_id
|
||||||
|
expect(imap).to have_received(:search).with(%w[SINCE 18-Oct-2020])
|
||||||
|
expect(imap).to have_received(:fetch).with([1], 'BODY.PEEK[HEADER]')
|
||||||
|
expect(imap).to have_received(:fetch).with(1, 'RFC822')
|
||||||
|
expect(logger).to have_received(:info).with("[IMAP::FETCH_EMAIL_SERVICE] Fetching mails from #{microsoft_channel.email}, found 1.")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
Reference in New Issue
Block a user