fix: stream attachment handling in workers (#12870)
We’ve been watching Sidekiq workers climb from ~600 MB at boot to 1.4–1.5 GB after an hour whenever attachment-heavy jobs run. This PR is an experiment to curb that growth by streaming attachments instead of loading the whole blob into Ruby: reply-mailer inline attachments, Telegram uploads, and audio transcriptions now read/write in chunks. If this keeps RSS stable in production we’ll keep it; otherwise we’ll roll it back and keep digging
This commit is contained in:
@@ -7,6 +7,7 @@ plugins:
|
|||||||
require:
|
require:
|
||||||
- ./rubocop/use_from_email.rb
|
- ./rubocop/use_from_email.rb
|
||||||
- ./rubocop/custom_cop_location.rb
|
- ./rubocop/custom_cop_location.rb
|
||||||
|
- ./rubocop/attachment_download.rb
|
||||||
- ./rubocop/one_class_per_file.rb
|
- ./rubocop/one_class_per_file.rb
|
||||||
|
|
||||||
Layout/LineLength:
|
Layout/LineLength:
|
||||||
@@ -41,6 +42,12 @@ Style/SymbolArray:
|
|||||||
Style/OpenStructUse:
|
Style/OpenStructUse:
|
||||||
Enabled: false
|
Enabled: false
|
||||||
|
|
||||||
|
Chatwoot/AttachmentDownload:
|
||||||
|
Enabled: true
|
||||||
|
Exclude:
|
||||||
|
- 'spec/**/*'
|
||||||
|
- 'test/**/*'
|
||||||
|
|
||||||
Style/OptionalBooleanParameter:
|
Style/OptionalBooleanParameter:
|
||||||
Exclude:
|
Exclude:
|
||||||
- 'app/services/email_templates/db_resolver_service.rb'
|
- 'app/services/email_templates/db_resolver_service.rb'
|
||||||
|
|||||||
@@ -30,21 +30,15 @@ class DataImportJob < ApplicationJob
|
|||||||
def parse_csv_and_build_contacts
|
def parse_csv_and_build_contacts
|
||||||
contacts = []
|
contacts = []
|
||||||
rejected_contacts = []
|
rejected_contacts = []
|
||||||
# Ensuring that importing non utf-8 characters will not throw error
|
|
||||||
data = @data_import.import_file.download
|
|
||||||
utf8_data = data.force_encoding('UTF-8')
|
|
||||||
|
|
||||||
# Ensure that the data is valid UTF-8, preserving valid characters
|
with_import_file do |file|
|
||||||
clean_data = utf8_data.valid_encoding? ? utf8_data : utf8_data.encode('UTF-16le', invalid: :replace, replace: '').encode('UTF-8')
|
csv_reader(file).each do |row|
|
||||||
|
current_contact = @contact_manager.build_contact(row.to_h.with_indifferent_access)
|
||||||
csv = CSV.parse(clean_data, headers: true)
|
if current_contact.valid?
|
||||||
|
contacts << current_contact
|
||||||
csv.each do |row|
|
else
|
||||||
current_contact = @contact_manager.build_contact(row.to_h.with_indifferent_access)
|
append_rejected_contact(row, current_contact, rejected_contacts)
|
||||||
if current_contact.valid?
|
end
|
||||||
contacts << current_contact
|
|
||||||
else
|
|
||||||
append_rejected_contact(row, current_contact, rejected_contacts)
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -75,7 +69,7 @@ class DataImportJob < ApplicationJob
|
|||||||
end
|
end
|
||||||
|
|
||||||
def generate_csv_data(rejected_contacts)
|
def generate_csv_data(rejected_contacts)
|
||||||
headers = CSV.parse(@data_import.import_file.download, headers: true).headers
|
headers = csv_headers
|
||||||
headers << 'errors'
|
headers << 'errors'
|
||||||
return if rejected_contacts.blank?
|
return if rejected_contacts.blank?
|
||||||
|
|
||||||
@@ -99,4 +93,31 @@ class DataImportJob < ApplicationJob
|
|||||||
def send_import_failed_notification_to_admin
|
def send_import_failed_notification_to_admin
|
||||||
AdministratorNotifications::AccountNotificationMailer.with(account: @data_import.account).contact_import_failed.deliver_later
|
AdministratorNotifications::AccountNotificationMailer.with(account: @data_import.account).contact_import_failed.deliver_later
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def csv_headers
|
||||||
|
header_row = nil
|
||||||
|
with_import_file do |file|
|
||||||
|
header_row = csv_reader(file).first
|
||||||
|
end
|
||||||
|
header_row&.headers || []
|
||||||
|
end
|
||||||
|
|
||||||
|
def csv_reader(file)
|
||||||
|
file.rewind
|
||||||
|
raw_data = file.read
|
||||||
|
utf8_data = raw_data.force_encoding('UTF-8')
|
||||||
|
clean_data = utf8_data.valid_encoding? ? utf8_data : utf8_data.encode('UTF-16le', invalid: :replace, replace: '').encode('UTF-8')
|
||||||
|
|
||||||
|
CSV.new(StringIO.new(clean_data), headers: true)
|
||||||
|
end
|
||||||
|
|
||||||
|
def with_import_file
|
||||||
|
temp_dir = Rails.root.join('tmp/imports')
|
||||||
|
FileUtils.mkdir_p(temp_dir)
|
||||||
|
|
||||||
|
@data_import.import_file.open(tmpdir: temp_dir) do |file|
|
||||||
|
file.binmode
|
||||||
|
yield file
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
51
app/mailers/conversation_reply_mailer_attachment_helper.rb
Normal file
51
app/mailers/conversation_reply_mailer_attachment_helper.rb
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
# Handles attachment processing for ConversationReplyMailer flows.
|
||||||
|
module ConversationReplyMailerAttachmentHelper
|
||||||
|
private
|
||||||
|
|
||||||
|
def process_attachments_as_files_for_email_reply
|
||||||
|
# Attachment processing for direct email replies (when replying to a single message)
|
||||||
|
#
|
||||||
|
# How attachments are handled:
|
||||||
|
# 1. Total file size (<20MB): Added directly to the email as proper attachments
|
||||||
|
# 2. Total file size (>20MB): Added to @large_attachments to be displayed as links in the email
|
||||||
|
|
||||||
|
@options[:attachments] = []
|
||||||
|
@large_attachments = []
|
||||||
|
current_total_size = 0
|
||||||
|
|
||||||
|
@message.attachments.each do |attachment|
|
||||||
|
current_total_size = handle_attachment_inline(current_total_size, attachment)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def read_blob_content(blob)
|
||||||
|
buffer = +''
|
||||||
|
blob.open do |file|
|
||||||
|
while (chunk = file.read(64.kilobytes))
|
||||||
|
buffer << chunk
|
||||||
|
end
|
||||||
|
end
|
||||||
|
buffer
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_attachment_inline(current_total_size, attachment)
|
||||||
|
blob = attachment.file.blob
|
||||||
|
return current_total_size if blob.blank?
|
||||||
|
|
||||||
|
file_size = blob.byte_size
|
||||||
|
attachment_name = attachment.file.filename.to_s
|
||||||
|
|
||||||
|
if current_total_size + file_size <= 20.megabytes
|
||||||
|
content = read_blob_content(blob)
|
||||||
|
mail.attachments[attachment_name] = {
|
||||||
|
mime_type: attachment.file.content_type || 'application/octet-stream',
|
||||||
|
content: content
|
||||||
|
}
|
||||||
|
@options[:attachments] << { name: attachment_name }
|
||||||
|
current_total_size + file_size
|
||||||
|
else
|
||||||
|
@large_attachments << attachment
|
||||||
|
current_total_size
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
@@ -1,4 +1,6 @@
|
|||||||
module ConversationReplyMailerHelper
|
module ConversationReplyMailerHelper
|
||||||
|
include ConversationReplyMailerAttachmentHelper
|
||||||
|
|
||||||
def prepare_mail(cc_bcc_enabled)
|
def prepare_mail(cc_bcc_enabled)
|
||||||
@options = {
|
@options = {
|
||||||
to: to_emails,
|
to: to_emails,
|
||||||
@@ -27,34 +29,6 @@ module ConversationReplyMailerHelper
|
|||||||
mail(@options)
|
mail(@options)
|
||||||
end
|
end
|
||||||
|
|
||||||
def process_attachments_as_files_for_email_reply
|
|
||||||
# Attachment processing for direct email replies (when replying to a single message)
|
|
||||||
#
|
|
||||||
# How attachments are handled:
|
|
||||||
# 1. Total file size (<20MB): Added directly to the email as proper attachments
|
|
||||||
# 2. Total file size (>20MB): Added to @large_attachments to be displayed as links in the email
|
|
||||||
|
|
||||||
@options[:attachments] = []
|
|
||||||
@large_attachments = []
|
|
||||||
current_total_size = 0
|
|
||||||
|
|
||||||
@message.attachments.each do |attachment|
|
|
||||||
raw_data = attachment.file.download
|
|
||||||
attachment_name = attachment.file.filename.to_s
|
|
||||||
file_size = raw_data.bytesize
|
|
||||||
|
|
||||||
# Attach files directly until we hit 20MB total
|
|
||||||
# After reaching 20MB, send remaining files as links
|
|
||||||
if current_total_size + file_size <= 20.megabytes
|
|
||||||
mail.attachments[attachment_name] = raw_data
|
|
||||||
@options[:attachments] << { name: attachment_name }
|
|
||||||
current_total_size += file_size
|
|
||||||
else
|
|
||||||
@large_attachments << attachment
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
def oauth_smtp_settings
|
def oauth_smtp_settings
|
||||||
|
|||||||
@@ -96,11 +96,16 @@ class Telegram::SendAttachmentsService
|
|||||||
# Telegram picks up the file name from original field name, so we need to save the file with the original name.
|
# Telegram picks up the file name from original field name, so we need to save the file with the original name.
|
||||||
# Hence not using Tempfile here.
|
# Hence not using Tempfile here.
|
||||||
def save_attachment_to_tempfile(attachment)
|
def save_attachment_to_tempfile(attachment)
|
||||||
raw_data = attachment.file.download
|
temp_dir = Rails.root.join('tmp/uploads', "telegram-#{attachment.message_id}")
|
||||||
temp_dir = Rails.root.join('tmp/uploads')
|
|
||||||
FileUtils.mkdir_p(temp_dir)
|
FileUtils.mkdir_p(temp_dir)
|
||||||
temp_file_path = File.join(temp_dir, attachment.file.filename.to_s)
|
temp_file_path = File.join(temp_dir, attachment.file.filename.to_s)
|
||||||
File.write(temp_file_path, raw_data, mode: 'wb')
|
|
||||||
|
File.open(temp_file_path, 'wb') do |file|
|
||||||
|
attachment.file.blob.open do |blob_file|
|
||||||
|
IO.copy_stream(blob_file, file)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
temp_file_path
|
temp_file_path
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|||||||
@@ -29,12 +29,16 @@ class Captain::Llm::PdfProcessingService < Llm::LegacyBaseOpenAiService
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def with_tempfile(&)
|
def with_tempfile
|
||||||
Tempfile.create(['pdf_upload', '.pdf'], binmode: true) do |temp_file|
|
Tempfile.create(['pdf_upload', '.pdf'], binmode: true) do |temp_file|
|
||||||
temp_file.write(document.pdf_file.download)
|
document.pdf_file.blob.open do |blob_file|
|
||||||
temp_file.close
|
IO.copy_stream(blob_file, temp_file)
|
||||||
|
end
|
||||||
|
|
||||||
File.open(temp_file.path, 'rb', &)
|
temp_file.flush
|
||||||
|
temp_file.rewind
|
||||||
|
|
||||||
|
yield temp_file
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -27,10 +27,16 @@ class Messages::AudioTranscriptionService < Llm::LegacyBaseOpenAiService
|
|||||||
end
|
end
|
||||||
|
|
||||||
def fetch_audio_file
|
def fetch_audio_file
|
||||||
temp_dir = Rails.root.join('tmp/uploads')
|
temp_dir = Rails.root.join('tmp/uploads/audio-transcriptions')
|
||||||
FileUtils.mkdir_p(temp_dir)
|
FileUtils.mkdir_p(temp_dir)
|
||||||
temp_file_path = File.join(temp_dir, attachment.file.filename.to_s)
|
temp_file_path = File.join(temp_dir, "#{attachment.file.blob.key}-#{attachment.file.filename}")
|
||||||
File.write(temp_file_path, attachment.file.download, mode: 'wb')
|
|
||||||
|
File.open(temp_file_path, 'wb') do |file|
|
||||||
|
attachment.file.blob.open do |blob_file|
|
||||||
|
IO.copy_stream(blob_file, file)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
temp_file_path
|
temp_file_path
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -40,18 +46,24 @@ class Messages::AudioTranscriptionService < Llm::LegacyBaseOpenAiService
|
|||||||
|
|
||||||
temp_file_path = fetch_audio_file
|
temp_file_path = fetch_audio_file
|
||||||
|
|
||||||
response = @client.audio.transcribe(
|
response_text = nil
|
||||||
parameters: {
|
|
||||||
model: 'whisper-1',
|
File.open(temp_file_path, 'rb') do |file|
|
||||||
file: File.open(temp_file_path),
|
response = @client.audio.transcribe(
|
||||||
temperature: 0.4
|
parameters: {
|
||||||
}
|
model: 'whisper-1',
|
||||||
)
|
file: file,
|
||||||
|
temperature: 0.4
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
response_text = response['text']
|
||||||
|
end
|
||||||
|
|
||||||
FileUtils.rm_f(temp_file_path)
|
FileUtils.rm_f(temp_file_path)
|
||||||
|
|
||||||
update_transcription(response['text'])
|
update_transcription(response_text)
|
||||||
response['text']
|
response_text
|
||||||
end
|
end
|
||||||
|
|
||||||
def update_transcription(transcribed_text)
|
def update_transcription(transcribed_text)
|
||||||
|
|||||||
@@ -121,8 +121,6 @@ class Integrations::Slack::SendOnSlackService < Base::SendOnChannelService
|
|||||||
end
|
end
|
||||||
|
|
||||||
def upload_files
|
def upload_files
|
||||||
return unless message.attachments.any?
|
|
||||||
|
|
||||||
files = build_files_array
|
files = build_files_array
|
||||||
return if files.empty?
|
return if files.empty?
|
||||||
|
|
||||||
@@ -136,6 +134,8 @@ class Integrations::Slack::SendOnSlackService < Base::SendOnChannelService
|
|||||||
Rails.logger.info "slack_upload_result: #{result}"
|
Rails.logger.info "slack_upload_result: #{result}"
|
||||||
rescue Slack::Web::Api::Errors::SlackError => e
|
rescue Slack::Web::Api::Errors::SlackError => e
|
||||||
Rails.logger.error "Failed to upload files: #{e.message}"
|
Rails.logger.error "Failed to upload files: #{e.message}"
|
||||||
|
ensure
|
||||||
|
files.each { |file| file[:content]&.clear }
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -143,14 +143,31 @@ class Integrations::Slack::SendOnSlackService < Base::SendOnChannelService
|
|||||||
message.attachments.filter_map do |attachment|
|
message.attachments.filter_map do |attachment|
|
||||||
next unless attachment.with_attached_file?
|
next unless attachment.with_attached_file?
|
||||||
|
|
||||||
{
|
build_file_payload(attachment)
|
||||||
filename: attachment.file.filename.to_s,
|
|
||||||
content: attachment.file.download,
|
|
||||||
title: attachment.file.filename.to_s
|
|
||||||
}
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def build_file_payload(attachment)
|
||||||
|
content = download_attachment_content(attachment)
|
||||||
|
return if content.blank?
|
||||||
|
|
||||||
|
{
|
||||||
|
filename: attachment.file.filename.to_s,
|
||||||
|
content: content,
|
||||||
|
title: attachment.file.filename.to_s
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
def download_attachment_content(attachment)
|
||||||
|
buffer = +''
|
||||||
|
attachment.file.blob.open do |file|
|
||||||
|
while (chunk = file.read(64.kilobytes))
|
||||||
|
buffer << chunk
|
||||||
|
end
|
||||||
|
end
|
||||||
|
buffer
|
||||||
|
end
|
||||||
|
|
||||||
def sender_name(sender)
|
def sender_name(sender)
|
||||||
sender.try(:name) ? "#{sender.try(:name)} (#{sender_type(sender)})" : sender_type(sender)
|
sender.try(:name) ? "#{sender.try(:name)} (#{sender_type(sender)})" : sender_type(sender)
|
||||||
end
|
end
|
||||||
|
|||||||
17
rubocop/attachment_download.rb
Normal file
17
rubocop/attachment_download.rb
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
require 'rubocop'
|
||||||
|
|
||||||
|
module RuboCop::Cop::Chatwoot; end
|
||||||
|
|
||||||
|
class RuboCop::Cop::Chatwoot::AttachmentDownload < RuboCop::Cop::Base
|
||||||
|
MSG = 'Avoid calling `.file/.blob.download`; use `blob.open` or streaming IO instead.'.freeze
|
||||||
|
|
||||||
|
def_node_matcher :unsafe_download?, <<~PATTERN
|
||||||
|
(send (send _ {:file :blob}) :download ...)
|
||||||
|
PATTERN
|
||||||
|
|
||||||
|
def on_send(node)
|
||||||
|
return unless unsafe_download?(node)
|
||||||
|
|
||||||
|
add_offense(node.loc.selector, message: MSG)
|
||||||
|
end
|
||||||
|
end
|
||||||
@@ -28,13 +28,14 @@ RSpec.describe Captain::Llm::PdfProcessingService do
|
|||||||
context 'when uploading PDF to OpenAI' do
|
context 'when uploading PDF to OpenAI' do
|
||||||
let(:mock_client) { instance_double(OpenAI::Client) }
|
let(:mock_client) { instance_double(OpenAI::Client) }
|
||||||
let(:pdf_content) { 'PDF content' }
|
let(:pdf_content) { 'PDF content' }
|
||||||
|
let(:blob_double) { instance_double(ActiveStorage::Blob) }
|
||||||
|
let(:pdf_file) { instance_double(ActiveStorage::Attachment) }
|
||||||
|
|
||||||
before do
|
before do
|
||||||
allow(document).to receive(:openai_file_id).and_return(nil)
|
allow(document).to receive(:openai_file_id).and_return(nil)
|
||||||
|
|
||||||
# Use a simple double for ActiveStorage since it's a complex Rails object
|
|
||||||
pdf_file = double('pdf_file', download: pdf_content) # rubocop:disable RSpec/VerifiedDoubles
|
|
||||||
allow(document).to receive(:pdf_file).and_return(pdf_file)
|
allow(document).to receive(:pdf_file).and_return(pdf_file)
|
||||||
|
allow(pdf_file).to receive(:blob).and_return(blob_double)
|
||||||
|
allow(blob_double).to receive(:open).and_yield(StringIO.new(pdf_content))
|
||||||
|
|
||||||
allow(OpenAI::Client).to receive(:new).and_return(mock_client)
|
allow(OpenAI::Client).to receive(:new).and_return(mock_client)
|
||||||
# Use a simple double for OpenAI::Files as it may not be loaded
|
# Use a simple double for OpenAI::Files as it may not be loaded
|
||||||
|
|||||||
@@ -15,8 +15,11 @@ RSpec.describe DataImportJob do
|
|||||||
|
|
||||||
describe 'retrying the job' do
|
describe 'retrying the job' do
|
||||||
context 'when ActiveStorage::FileNotFoundError is raised' do
|
context 'when ActiveStorage::FileNotFoundError is raised' do
|
||||||
|
let(:import_file_double) { instance_double(ActiveStorage::Blob) }
|
||||||
|
|
||||||
before do
|
before do
|
||||||
allow(data_import.import_file).to receive(:download).and_raise(ActiveStorage::FileNotFoundError)
|
allow(data_import).to receive(:import_file).and_return(import_file_double)
|
||||||
|
allow(import_file_double).to receive(:open).and_raise(ActiveStorage::FileNotFoundError)
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'retries the job' do
|
it 'retries the job' do
|
||||||
@@ -158,7 +161,9 @@ RSpec.describe DataImportJob do
|
|||||||
end
|
end
|
||||||
|
|
||||||
before do
|
before do
|
||||||
allow(data_import.import_file).to receive(:download).and_return(invalid_csv_content)
|
import_file_double = instance_double(ActiveStorage::Blob)
|
||||||
|
allow(data_import).to receive(:import_file).and_return(import_file_double)
|
||||||
|
allow(import_file_double).to receive(:open).and_yield(StringIO.new(invalid_csv_content))
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'does not import any data and handles the MalformedCSVError' do
|
it 'does not import any data and handles the MalformedCSVError' do
|
||||||
|
|||||||
@@ -206,6 +206,21 @@ describe Integrations::Slack::SendOnSlackService do
|
|||||||
expect(message.attachments.count).to eq 2
|
expect(message.attachments.count).to eq 2
|
||||||
end
|
end
|
||||||
|
|
||||||
|
it 'streams attachment blobs and uploads only once' do
|
||||||
|
expect(slack_client).to receive(:chat_postMessage).and_return(slack_message)
|
||||||
|
|
||||||
|
attachment = message.attachments.new(account_id: message.account_id, file_type: :image)
|
||||||
|
attachment.file.attach(io: Rails.root.join('spec/assets/avatar.png').open, filename: 'avatar.png', content_type: 'image/png')
|
||||||
|
blob = attachment.file.blob
|
||||||
|
allow(blob).to receive(:open).and_call_original
|
||||||
|
|
||||||
|
expect(blob).to receive(:open).and_call_original
|
||||||
|
expect(slack_client).to receive(:files_upload_v2).once.and_return(file_attachment)
|
||||||
|
|
||||||
|
message.save!
|
||||||
|
builder.perform
|
||||||
|
end
|
||||||
|
|
||||||
it 'handles file upload errors gracefully' do
|
it 'handles file upload errors gracefully' do
|
||||||
expect(slack_client).to receive(:chat_postMessage).with(
|
expect(slack_client).to receive(:chat_postMessage).with(
|
||||||
channel: hook.reference_id,
|
channel: hook.reference_id,
|
||||||
|
|||||||
Reference in New Issue
Block a user