From cc86b8c7f1dff9b47fcdfce76d4c21fc5e3b4149 Mon Sep 17 00:00:00 2001 From: Sojan Jose Date: Fri, 5 Dec 2025 13:02:53 -0800 Subject: [PATCH] fix: stream attachment handling in workers (#12870) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We’ve been watching Sidekiq workers climb from ~600 MB at boot to 1.4–1.5 GB after an hour whenever attachment-heavy jobs run. This PR is an experiment to curb that growth by streaming attachments instead of loading the whole blob into Ruby: reply-mailer inline attachments, Telegram uploads, and audio transcriptions now read/write in chunks. If this keeps RSS stable in production we’ll keep it; otherwise we’ll roll it back and keep digging --- .rubocop.yml | 7 +++ app/jobs/data_import_job.rb | 51 +++++++++++++------ ...ersation_reply_mailer_attachment_helper.rb | 51 +++++++++++++++++++ .../conversation_reply_mailer_helper.rb | 30 +---------- .../telegram/send_attachments_service.rb | 11 ++-- .../captain/llm/pdf_processing_service.rb | 12 +++-- .../messages/audio_transcription_service.rb | 36 ++++++++----- .../slack/send_on_slack_service.rb | 31 ++++++++--- rubocop/attachment_download.rb | 17 +++++++ .../llm/pdf_processing_service_spec.rb | 7 +-- spec/jobs/data_import_job_spec.rb | 9 +++- .../slack/send_on_slack_service_spec.rb | 15 ++++++ 12 files changed, 203 insertions(+), 74 deletions(-) create mode 100644 app/mailers/conversation_reply_mailer_attachment_helper.rb create mode 100644 rubocop/attachment_download.rb diff --git a/.rubocop.yml b/.rubocop.yml index 22c4220d9..d87f08bfd 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -7,6 +7,7 @@ plugins: require: - ./rubocop/use_from_email.rb - ./rubocop/custom_cop_location.rb + - ./rubocop/attachment_download.rb - ./rubocop/one_class_per_file.rb Layout/LineLength: @@ -41,6 +42,12 @@ Style/SymbolArray: Style/OpenStructUse: Enabled: false +Chatwoot/AttachmentDownload: + Enabled: true + Exclude: + - 'spec/**/*' + - 'test/**/*' + Style/OptionalBooleanParameter: Exclude: - 'app/services/email_templates/db_resolver_service.rb' diff --git a/app/jobs/data_import_job.rb b/app/jobs/data_import_job.rb index 6146336fa..db1411e68 100644 --- a/app/jobs/data_import_job.rb +++ b/app/jobs/data_import_job.rb @@ -30,21 +30,15 @@ class DataImportJob < ApplicationJob def parse_csv_and_build_contacts contacts = [] rejected_contacts = [] - # Ensuring that importing non utf-8 characters will not throw error - data = @data_import.import_file.download - utf8_data = data.force_encoding('UTF-8') - # Ensure that the data is valid UTF-8, preserving valid characters - clean_data = utf8_data.valid_encoding? ? utf8_data : utf8_data.encode('UTF-16le', invalid: :replace, replace: '').encode('UTF-8') - - csv = CSV.parse(clean_data, headers: true) - - csv.each do |row| - current_contact = @contact_manager.build_contact(row.to_h.with_indifferent_access) - if current_contact.valid? - contacts << current_contact - else - append_rejected_contact(row, current_contact, rejected_contacts) + with_import_file do |file| + csv_reader(file).each do |row| + current_contact = @contact_manager.build_contact(row.to_h.with_indifferent_access) + if current_contact.valid? + contacts << current_contact + else + append_rejected_contact(row, current_contact, rejected_contacts) + end end end @@ -75,7 +69,7 @@ class DataImportJob < ApplicationJob end def generate_csv_data(rejected_contacts) - headers = CSV.parse(@data_import.import_file.download, headers: true).headers + headers = csv_headers headers << 'errors' return if rejected_contacts.blank? @@ -99,4 +93,31 @@ class DataImportJob < ApplicationJob def send_import_failed_notification_to_admin AdministratorNotifications::AccountNotificationMailer.with(account: @data_import.account).contact_import_failed.deliver_later end + + def csv_headers + header_row = nil + with_import_file do |file| + header_row = csv_reader(file).first + end + header_row&.headers || [] + end + + def csv_reader(file) + file.rewind + raw_data = file.read + utf8_data = raw_data.force_encoding('UTF-8') + clean_data = utf8_data.valid_encoding? ? utf8_data : utf8_data.encode('UTF-16le', invalid: :replace, replace: '').encode('UTF-8') + + CSV.new(StringIO.new(clean_data), headers: true) + end + + def with_import_file + temp_dir = Rails.root.join('tmp/imports') + FileUtils.mkdir_p(temp_dir) + + @data_import.import_file.open(tmpdir: temp_dir) do |file| + file.binmode + yield file + end + end end diff --git a/app/mailers/conversation_reply_mailer_attachment_helper.rb b/app/mailers/conversation_reply_mailer_attachment_helper.rb new file mode 100644 index 000000000..f4c0e2282 --- /dev/null +++ b/app/mailers/conversation_reply_mailer_attachment_helper.rb @@ -0,0 +1,51 @@ +# Handles attachment processing for ConversationReplyMailer flows. +module ConversationReplyMailerAttachmentHelper + private + + def process_attachments_as_files_for_email_reply + # Attachment processing for direct email replies (when replying to a single message) + # + # How attachments are handled: + # 1. Total file size (<20MB): Added directly to the email as proper attachments + # 2. Total file size (>20MB): Added to @large_attachments to be displayed as links in the email + + @options[:attachments] = [] + @large_attachments = [] + current_total_size = 0 + + @message.attachments.each do |attachment| + current_total_size = handle_attachment_inline(current_total_size, attachment) + end + end + + def read_blob_content(blob) + buffer = +'' + blob.open do |file| + while (chunk = file.read(64.kilobytes)) + buffer << chunk + end + end + buffer + end + + def handle_attachment_inline(current_total_size, attachment) + blob = attachment.file.blob + return current_total_size if blob.blank? + + file_size = blob.byte_size + attachment_name = attachment.file.filename.to_s + + if current_total_size + file_size <= 20.megabytes + content = read_blob_content(blob) + mail.attachments[attachment_name] = { + mime_type: attachment.file.content_type || 'application/octet-stream', + content: content + } + @options[:attachments] << { name: attachment_name } + current_total_size + file_size + else + @large_attachments << attachment + current_total_size + end + end +end diff --git a/app/mailers/conversation_reply_mailer_helper.rb b/app/mailers/conversation_reply_mailer_helper.rb index 2fe3695f9..cce236ec7 100644 --- a/app/mailers/conversation_reply_mailer_helper.rb +++ b/app/mailers/conversation_reply_mailer_helper.rb @@ -1,4 +1,6 @@ module ConversationReplyMailerHelper + include ConversationReplyMailerAttachmentHelper + def prepare_mail(cc_bcc_enabled) @options = { to: to_emails, @@ -27,34 +29,6 @@ module ConversationReplyMailerHelper mail(@options) end - def process_attachments_as_files_for_email_reply - # Attachment processing for direct email replies (when replying to a single message) - # - # How attachments are handled: - # 1. Total file size (<20MB): Added directly to the email as proper attachments - # 2. Total file size (>20MB): Added to @large_attachments to be displayed as links in the email - - @options[:attachments] = [] - @large_attachments = [] - current_total_size = 0 - - @message.attachments.each do |attachment| - raw_data = attachment.file.download - attachment_name = attachment.file.filename.to_s - file_size = raw_data.bytesize - - # Attach files directly until we hit 20MB total - # After reaching 20MB, send remaining files as links - if current_total_size + file_size <= 20.megabytes - mail.attachments[attachment_name] = raw_data - @options[:attachments] << { name: attachment_name } - current_total_size += file_size - else - @large_attachments << attachment - end - end - end - private def oauth_smtp_settings diff --git a/app/services/telegram/send_attachments_service.rb b/app/services/telegram/send_attachments_service.rb index e214fe78f..5ba8fd4ae 100644 --- a/app/services/telegram/send_attachments_service.rb +++ b/app/services/telegram/send_attachments_service.rb @@ -96,11 +96,16 @@ class Telegram::SendAttachmentsService # Telegram picks up the file name from original field name, so we need to save the file with the original name. # Hence not using Tempfile here. def save_attachment_to_tempfile(attachment) - raw_data = attachment.file.download - temp_dir = Rails.root.join('tmp/uploads') + temp_dir = Rails.root.join('tmp/uploads', "telegram-#{attachment.message_id}") FileUtils.mkdir_p(temp_dir) temp_file_path = File.join(temp_dir, attachment.file.filename.to_s) - File.write(temp_file_path, raw_data, mode: 'wb') + + File.open(temp_file_path, 'wb') do |file| + attachment.file.blob.open do |blob_file| + IO.copy_stream(blob_file, file) + end + end + temp_file_path end diff --git a/enterprise/app/services/captain/llm/pdf_processing_service.rb b/enterprise/app/services/captain/llm/pdf_processing_service.rb index 3ab616577..55177b78d 100644 --- a/enterprise/app/services/captain/llm/pdf_processing_service.rb +++ b/enterprise/app/services/captain/llm/pdf_processing_service.rb @@ -29,12 +29,16 @@ class Captain::Llm::PdfProcessingService < Llm::LegacyBaseOpenAiService end end - def with_tempfile(&) + def with_tempfile Tempfile.create(['pdf_upload', '.pdf'], binmode: true) do |temp_file| - temp_file.write(document.pdf_file.download) - temp_file.close + document.pdf_file.blob.open do |blob_file| + IO.copy_stream(blob_file, temp_file) + end - File.open(temp_file.path, 'rb', &) + temp_file.flush + temp_file.rewind + + yield temp_file end end end diff --git a/enterprise/app/services/messages/audio_transcription_service.rb b/enterprise/app/services/messages/audio_transcription_service.rb index a200c1496..d3cc91376 100644 --- a/enterprise/app/services/messages/audio_transcription_service.rb +++ b/enterprise/app/services/messages/audio_transcription_service.rb @@ -27,10 +27,16 @@ class Messages::AudioTranscriptionService < Llm::LegacyBaseOpenAiService end def fetch_audio_file - temp_dir = Rails.root.join('tmp/uploads') + temp_dir = Rails.root.join('tmp/uploads/audio-transcriptions') FileUtils.mkdir_p(temp_dir) - temp_file_path = File.join(temp_dir, attachment.file.filename.to_s) - File.write(temp_file_path, attachment.file.download, mode: 'wb') + temp_file_path = File.join(temp_dir, "#{attachment.file.blob.key}-#{attachment.file.filename}") + + File.open(temp_file_path, 'wb') do |file| + attachment.file.blob.open do |blob_file| + IO.copy_stream(blob_file, file) + end + end + temp_file_path end @@ -40,18 +46,24 @@ class Messages::AudioTranscriptionService < Llm::LegacyBaseOpenAiService temp_file_path = fetch_audio_file - response = @client.audio.transcribe( - parameters: { - model: 'whisper-1', - file: File.open(temp_file_path), - temperature: 0.4 - } - ) + response_text = nil + + File.open(temp_file_path, 'rb') do |file| + response = @client.audio.transcribe( + parameters: { + model: 'whisper-1', + file: file, + temperature: 0.4 + } + ) + + response_text = response['text'] + end FileUtils.rm_f(temp_file_path) - update_transcription(response['text']) - response['text'] + update_transcription(response_text) + response_text end def update_transcription(transcribed_text) diff --git a/lib/integrations/slack/send_on_slack_service.rb b/lib/integrations/slack/send_on_slack_service.rb index 46c111a83..4e959c469 100644 --- a/lib/integrations/slack/send_on_slack_service.rb +++ b/lib/integrations/slack/send_on_slack_service.rb @@ -121,8 +121,6 @@ class Integrations::Slack::SendOnSlackService < Base::SendOnChannelService end def upload_files - return unless message.attachments.any? - files = build_files_array return if files.empty? @@ -136,6 +134,8 @@ class Integrations::Slack::SendOnSlackService < Base::SendOnChannelService Rails.logger.info "slack_upload_result: #{result}" rescue Slack::Web::Api::Errors::SlackError => e Rails.logger.error "Failed to upload files: #{e.message}" + ensure + files.each { |file| file[:content]&.clear } end end @@ -143,14 +143,31 @@ class Integrations::Slack::SendOnSlackService < Base::SendOnChannelService message.attachments.filter_map do |attachment| next unless attachment.with_attached_file? - { - filename: attachment.file.filename.to_s, - content: attachment.file.download, - title: attachment.file.filename.to_s - } + build_file_payload(attachment) end end + def build_file_payload(attachment) + content = download_attachment_content(attachment) + return if content.blank? + + { + filename: attachment.file.filename.to_s, + content: content, + title: attachment.file.filename.to_s + } + end + + def download_attachment_content(attachment) + buffer = +'' + attachment.file.blob.open do |file| + while (chunk = file.read(64.kilobytes)) + buffer << chunk + end + end + buffer + end + def sender_name(sender) sender.try(:name) ? "#{sender.try(:name)} (#{sender_type(sender)})" : sender_type(sender) end diff --git a/rubocop/attachment_download.rb b/rubocop/attachment_download.rb new file mode 100644 index 000000000..14c56203a --- /dev/null +++ b/rubocop/attachment_download.rb @@ -0,0 +1,17 @@ +require 'rubocop' + +module RuboCop::Cop::Chatwoot; end + +class RuboCop::Cop::Chatwoot::AttachmentDownload < RuboCop::Cop::Base + MSG = 'Avoid calling `.file/.blob.download`; use `blob.open` or streaming IO instead.'.freeze + + def_node_matcher :unsafe_download?, <<~PATTERN + (send (send _ {:file :blob}) :download ...) + PATTERN + + def on_send(node) + return unless unsafe_download?(node) + + add_offense(node.loc.selector, message: MSG) + end +end diff --git a/spec/enterprise/services/captain/llm/pdf_processing_service_spec.rb b/spec/enterprise/services/captain/llm/pdf_processing_service_spec.rb index 9dc416685..fd519dcdd 100644 --- a/spec/enterprise/services/captain/llm/pdf_processing_service_spec.rb +++ b/spec/enterprise/services/captain/llm/pdf_processing_service_spec.rb @@ -28,13 +28,14 @@ RSpec.describe Captain::Llm::PdfProcessingService do context 'when uploading PDF to OpenAI' do let(:mock_client) { instance_double(OpenAI::Client) } let(:pdf_content) { 'PDF content' } + let(:blob_double) { instance_double(ActiveStorage::Blob) } + let(:pdf_file) { instance_double(ActiveStorage::Attachment) } before do allow(document).to receive(:openai_file_id).and_return(nil) - - # Use a simple double for ActiveStorage since it's a complex Rails object - pdf_file = double('pdf_file', download: pdf_content) # rubocop:disable RSpec/VerifiedDoubles allow(document).to receive(:pdf_file).and_return(pdf_file) + allow(pdf_file).to receive(:blob).and_return(blob_double) + allow(blob_double).to receive(:open).and_yield(StringIO.new(pdf_content)) allow(OpenAI::Client).to receive(:new).and_return(mock_client) # Use a simple double for OpenAI::Files as it may not be loaded diff --git a/spec/jobs/data_import_job_spec.rb b/spec/jobs/data_import_job_spec.rb index fb1057f4a..19178b177 100644 --- a/spec/jobs/data_import_job_spec.rb +++ b/spec/jobs/data_import_job_spec.rb @@ -15,8 +15,11 @@ RSpec.describe DataImportJob do describe 'retrying the job' do context 'when ActiveStorage::FileNotFoundError is raised' do + let(:import_file_double) { instance_double(ActiveStorage::Blob) } + before do - allow(data_import.import_file).to receive(:download).and_raise(ActiveStorage::FileNotFoundError) + allow(data_import).to receive(:import_file).and_return(import_file_double) + allow(import_file_double).to receive(:open).and_raise(ActiveStorage::FileNotFoundError) end it 'retries the job' do @@ -158,7 +161,9 @@ RSpec.describe DataImportJob do end before do - allow(data_import.import_file).to receive(:download).and_return(invalid_csv_content) + import_file_double = instance_double(ActiveStorage::Blob) + allow(data_import).to receive(:import_file).and_return(import_file_double) + allow(import_file_double).to receive(:open).and_yield(StringIO.new(invalid_csv_content)) end it 'does not import any data and handles the MalformedCSVError' do diff --git a/spec/lib/integrations/slack/send_on_slack_service_spec.rb b/spec/lib/integrations/slack/send_on_slack_service_spec.rb index 87d58b79b..5d75a8c2b 100644 --- a/spec/lib/integrations/slack/send_on_slack_service_spec.rb +++ b/spec/lib/integrations/slack/send_on_slack_service_spec.rb @@ -206,6 +206,21 @@ describe Integrations::Slack::SendOnSlackService do expect(message.attachments.count).to eq 2 end + it 'streams attachment blobs and uploads only once' do + expect(slack_client).to receive(:chat_postMessage).and_return(slack_message) + + attachment = message.attachments.new(account_id: message.account_id, file_type: :image) + attachment.file.attach(io: Rails.root.join('spec/assets/avatar.png').open, filename: 'avatar.png', content_type: 'image/png') + blob = attachment.file.blob + allow(blob).to receive(:open).and_call_original + + expect(blob).to receive(:open).and_call_original + expect(slack_client).to receive(:files_upload_v2).once.and_return(file_attachment) + + message.save! + builder.perform + end + it 'handles file upload errors gracefully' do expect(slack_client).to receive(:chat_postMessage).with( channel: hook.reference_id,