chore: Contact import improvements (CW-1362) (#6747)
Fixes: https://linear.app/chatwoot/issue/CW-1362/csv-imports-are-not-working-properly Fixes: #3462 --------- Co-authored-by: Sojan <sojan@pepalo.com>
This commit is contained in:
@@ -5,53 +5,123 @@ class DataImportJob < ApplicationJob
|
||||
queue_as :low
|
||||
|
||||
def perform(data_import)
|
||||
contacts = []
|
||||
data_import.update!(status: :processing)
|
||||
csv = CSV.parse(data_import.import_file.download, headers: true)
|
||||
csv.each { |row| contacts << build_contact(row.to_h.with_indifferent_access, data_import.account) }
|
||||
result = Contact.import contacts, on_duplicate_key_update: :all, batch_size: 1000
|
||||
data_import.update!(status: :completed, processed_records: csv.length - result.failed_instances.length, total_records: csv.length)
|
||||
@data_import = data_import
|
||||
process_import_file
|
||||
send_failed_records_to_admin
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def build_contact(params, account)
|
||||
# TODO: rather than doing the find or initialize individually lets fetch objects in bulk and update them in memory
|
||||
contact = init_contact(params, account)
|
||||
def process_import_file
|
||||
@data_import.update!(status: :processing)
|
||||
|
||||
contacts, rejected_contacts = parse_csv_and_build_contacts
|
||||
|
||||
import_contacts(contacts)
|
||||
update_data_import_status(contacts.length, rejected_contacts.length)
|
||||
save_failed_records_csv(rejected_contacts)
|
||||
end
|
||||
|
||||
def parse_csv_and_build_contacts
|
||||
contacts = []
|
||||
rejected_contacts = []
|
||||
csv = CSV.parse(@data_import.import_file.download, headers: true)
|
||||
|
||||
csv.each do |row|
|
||||
current_contact = build_contact(row.to_h.with_indifferent_access, @data_import.account)
|
||||
if current_contact.valid?
|
||||
contacts << current_contact
|
||||
else
|
||||
row['errors'] = current_contact.errors.full_messages.join(', ')
|
||||
rejected_contacts << row
|
||||
end
|
||||
end
|
||||
|
||||
[contacts, rejected_contacts]
|
||||
end
|
||||
|
||||
def import_contacts(contacts)
|
||||
# <struct ActiveRecord::Import::Result failed_instances=[], num_inserts=1, ids=[444, 445], results=[]>
|
||||
Contact.import(contacts, synchronize: contacts, on_duplicate_key_ignore: true, track_validation_failures: true, validate: true, batch_size: 1000)
|
||||
end
|
||||
|
||||
def update_data_import_status(processed_records, rejected_records)
|
||||
@data_import.update!(status: :completed, processed_records: processed_records, total_records: processed_records + rejected_records)
|
||||
end
|
||||
|
||||
def build_contact(params, account)
|
||||
contact = find_or_initialize_contact(params, account)
|
||||
contact.name = params[:name] if params[:name].present?
|
||||
contact.additional_attributes ||= {}
|
||||
contact.additional_attributes[:company] = params[:company] if params[:company].present?
|
||||
contact.additional_attributes[:city] = params[:city] if params[:city].present?
|
||||
contact.assign_attributes(custom_attributes: contact.custom_attributes.merge(params.except(:identifier, :email, :name, :phone_number)))
|
||||
contact
|
||||
end
|
||||
|
||||
def get_identified_contacts(params, account)
|
||||
identifier_contact = account.contacts.find_by(identifier: params[:identifier]) if params[:identifier]
|
||||
email_contact = account.contacts.find_by(email: params[:email]) if params[:email]
|
||||
phone_number_contact = account.contacts.find_by(phone_number: params[:phone_number]) if params[:phone_number]
|
||||
contact = merge_identified_contact_attributes(params, [identifier_contact, email_contact, phone_number_contact])
|
||||
# intiating the new contact / contact attributes only by ensuring the identifier, email or phone_number duplication errors won't occur
|
||||
contact ||= merge_contact(email_contact, phone_number_contact)
|
||||
contact
|
||||
end
|
||||
|
||||
def merge_contact(email_contact, phone_number_contact)
|
||||
contact ||= email_contact
|
||||
contact ||= phone_number_contact
|
||||
contact
|
||||
end
|
||||
|
||||
def merge_identified_contact_attributes(params, available_contacts)
|
||||
identifier_contact, email_contact, phone_number_contact = available_contacts
|
||||
|
||||
contact = identifier_contact
|
||||
contact&.email = params[:email] if params[:email].present? && email_contact.blank?
|
||||
contact&.phone_number = params[:phone_number] if params[:phone_number].present? && phone_number_contact.blank?
|
||||
contact
|
||||
end
|
||||
|
||||
def init_contact(params, account)
|
||||
contact = get_identified_contacts(params, account)
|
||||
def find_or_initialize_contact(params, account)
|
||||
contact = find_existing_contact(params, account)
|
||||
contact ||= account.contacts.new(params.slice(:email, :identifier, :phone_number))
|
||||
contact
|
||||
end
|
||||
|
||||
def find_existing_contact(params, account)
|
||||
contact = find_contact_by_identifier(params, account)
|
||||
contact ||= find_contact_by_email(params, account)
|
||||
contact ||= find_contact_by_phone_number(params, account)
|
||||
|
||||
update_contact_with_merged_attributes(params, contact) if contact.present? && contact.valid?
|
||||
contact
|
||||
end
|
||||
|
||||
def find_contact_by_identifier(params, account)
|
||||
return unless params[:identifier]
|
||||
|
||||
account.contacts.find_by(identifier: params[:identifier])
|
||||
end
|
||||
|
||||
def find_contact_by_email(params, account)
|
||||
return unless params[:email]
|
||||
|
||||
account.contacts.find_by(email: params[:email])
|
||||
end
|
||||
|
||||
def find_contact_by_phone_number(params, account)
|
||||
return unless params[:phone_number]
|
||||
|
||||
account.contacts.find_by(phone_number: params[:phone_number])
|
||||
end
|
||||
|
||||
def update_contact_with_merged_attributes(params, contact)
|
||||
contact.email = params[:email] if params[:email].present?
|
||||
contact.phone_number = params[:phone_number] if params[:phone_number].present?
|
||||
contact.save
|
||||
end
|
||||
|
||||
def save_failed_records_csv(rejected_contacts)
|
||||
csv_data = generate_csv_data(rejected_contacts)
|
||||
|
||||
return if csv_data.blank?
|
||||
|
||||
@data_import.failed_records.attach(io: StringIO.new(csv_data), filename: "#{Time.zone.today.strftime('%Y%m%d')}_contacts.csv",
|
||||
content_type: 'text/csv')
|
||||
send_failed_records_to_admin
|
||||
end
|
||||
|
||||
def generate_csv_data(rejected_contacts)
|
||||
headers = CSV.parse(@data_import.import_file.download, headers: true).headers
|
||||
headers << 'errors'
|
||||
return if rejected_contacts.blank?
|
||||
|
||||
CSV.generate do |csv|
|
||||
csv << headers
|
||||
rejected_contacts.each do |record|
|
||||
csv << record
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def send_failed_records_to_admin
|
||||
AdministratorNotifications::ChannelNotificationsMailer.with(account: @data_import.account).failed_records(@data_import).deliver_later
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,4 +1,16 @@
|
||||
class AdministratorNotifications::ChannelNotificationsMailer < ApplicationMailer
|
||||
def failed_records(resource)
|
||||
return unless smtp_config_set_or_development?
|
||||
|
||||
subject = 'Contact Import Completed'
|
||||
|
||||
@attachment_url = Rails.application.routes.url_helpers.rails_blob_url(resource.failed_records) if resource.failed_records.attached?
|
||||
@action_url = "#{ENV.fetch('FRONTEND_URL', nil)}/app/accounts/#{resource.account.id}/contacts"
|
||||
@failed_contacts = resource.total_records - resource.processed_records
|
||||
@imported_contacts = resource.processed_records
|
||||
send_mail_with_liquid(to: admin_emails, subject: subject) and return
|
||||
end
|
||||
|
||||
def slack_disconnect
|
||||
return unless smtp_config_set_or_development?
|
||||
|
||||
|
||||
@@ -52,10 +52,14 @@ class ApplicationMailer < ActionMailer::Base
|
||||
|
||||
def liquid_locals
|
||||
# expose variables you want to be exposed in liquid
|
||||
{
|
||||
locals = {
|
||||
global_config: GlobalConfig.get('BRAND_NAME', 'BRAND_URL'),
|
||||
action_url: @action_url
|
||||
}
|
||||
|
||||
locals.merge({ attachment_url: @attachment_url }) if @attachment_url
|
||||
locals.merge({ failed_contacts: @failed_contacts, imported_contacts: @imported_contacts })
|
||||
locals
|
||||
end
|
||||
|
||||
def locale_from_account(account)
|
||||
|
||||
@@ -22,6 +22,7 @@ class DataImport < ApplicationRecord
|
||||
enum status: { pending: 0, processing: 1, completed: 2, failed: 3 }
|
||||
|
||||
has_one_attached :import_file
|
||||
has_one_attached :failed_records
|
||||
|
||||
after_create_commit :process_data_import
|
||||
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
<p>Hello,</p>
|
||||
|
||||
<p>Your contact import has been completed. Please check the contacts tab to view the imported contacts.</p>
|
||||
|
||||
<p>Number of records imported. : {{imported_contacts}}</p>
|
||||
|
||||
<p>Number of records failed. : {{failed_contacts}}</p>
|
||||
|
||||
<p>
|
||||
Attachment [<a href="{{ attachment_url }}" _target="blank">Click here to view</a>]
|
||||
</p>
|
||||
<p>
|
||||
Click <a href="{{action_url}}">here</a>
|
||||
</p>
|
||||
@@ -1,5 +1,5 @@
|
||||
id,first_name,last_name,email,gender,ip_address,identifier,phone_number
|
||||
1,Clarice,Uzzell,cuzzell0@mozilla.org,Genderfluid,70.61.11.201,bb4e11cd-0f23-49da-a123-dcc1fec6852c,+918080808080
|
||||
id,first_name,last_name,email,gender,ip_address,identifier,phone_number,company
|
||||
1,Clarice,Uzzell,cuzzell0@mozilla.org,Genderfluid,70.61.11.201,bb4e11cd-0f23-49da-a123-dcc1fec6852c,+918080808080,My Company Name
|
||||
2,Marieann,Creegan,mcreegan1@cornell.edu,Genderfluid,168.186.4.241,e60bab4c-9fbb-47eb-8f75-42025b789c47,+918080808081
|
||||
3,Nancey,Windibank,nwindibank2@bluehost.com,Agender,73.44.41.59,f793e813-4210-4bf3-a812-711418de25d2,+918080808082
|
||||
4,Sibel,Stennine,sstennine3@yellowbook.com,Genderqueer,115.249.27.155,d6e35a2d-d093-4437-a577-7df76316b937,+918080808083
|
||||
|
||||
|
@@ -5,18 +5,104 @@ RSpec.describe DataImportJob, type: :job do
|
||||
|
||||
let!(:data_import) { create(:data_import) }
|
||||
|
||||
it 'queues the job' do
|
||||
expect { job }.to have_enqueued_job(described_class)
|
||||
.with(data_import)
|
||||
.on_queue('low')
|
||||
describe 'enqueueing the job' do
|
||||
it 'queues the job on the low priority queue' do
|
||||
expect { job }.to have_enqueued_job(described_class)
|
||||
.with(data_import)
|
||||
.on_queue('low')
|
||||
end
|
||||
end
|
||||
|
||||
it 'imports data into the account' do
|
||||
csv_length = CSV.parse(data_import.import_file.download, headers: true).length
|
||||
described_class.perform_now(data_import)
|
||||
expect(data_import.account.contacts.count).to eq(csv_length)
|
||||
expect(data_import.reload.total_records).to eq(csv_length)
|
||||
expect(data_import.reload.processed_records).to eq(csv_length)
|
||||
expect(Contact.find_by(phone_number: '+918080808080')).to be_truthy
|
||||
describe 'importing data' do
|
||||
context 'when the data is valid' do
|
||||
it 'imports data into the account' do
|
||||
csv_length = CSV.parse(data_import.import_file.download, headers: true).length
|
||||
|
||||
described_class.perform_now(data_import)
|
||||
expect(data_import.account.contacts.count).to eq(csv_length)
|
||||
expect(data_import.reload.total_records).to eq(csv_length)
|
||||
expect(data_import.reload.processed_records).to eq(csv_length)
|
||||
contact = Contact.find_by(phone_number: '+918080808080')
|
||||
expect(contact).to be_truthy
|
||||
expect(contact['additional_attributes']['company']).to eq('My Company Name')
|
||||
end
|
||||
end
|
||||
|
||||
context 'when the data contains errors' do
|
||||
it 'imports erroneous data into the account, skipping invalid records' do
|
||||
# Last record is invalid because of duplicate email
|
||||
invalid_data = [
|
||||
%w[id first_name last_name email phone_number],
|
||||
['1', 'Clarice', 'Uzzell', 'cuzzell0@mozilla.org', '+918484848484'],
|
||||
['2', 'Marieann', 'Creegan', 'mcreegan1@cornell.edu', '+918484848485'],
|
||||
['3', 'Nancey', 'Windibank', 'cuzzell0@mozilla.org', '+91848484848']
|
||||
]
|
||||
|
||||
invalid_data_import = create(:data_import, import_file: generate_csv_file(invalid_data))
|
||||
csv_data = CSV.parse(invalid_data_import.import_file.download, headers: true)
|
||||
csv_length = csv_data.length
|
||||
|
||||
described_class.perform_now(invalid_data_import)
|
||||
expect(invalid_data_import.account.contacts.count).to eq(csv_length - 1)
|
||||
expect(invalid_data_import.reload.total_records).to eq(csv_length)
|
||||
expect(invalid_data_import.reload.processed_records).to eq(csv_length)
|
||||
end
|
||||
end
|
||||
|
||||
context 'when the data contains existing records' do
|
||||
let(:existing_data) do
|
||||
[
|
||||
%w[id first_name last_name email phone_number],
|
||||
['1', 'Clarice', 'Uzzell', 'cuzzell0@mozilla.org', '+918080808080'],
|
||||
['2', 'Marieann', 'Creegan', 'mcreegan1@cornell.edu', '+918080808081'],
|
||||
['3', 'Nancey', 'Windibank', 'nwindibank2@bluehost.com', '+918080808082']
|
||||
]
|
||||
end
|
||||
let(:existing_data_import) { create(:data_import, import_file: generate_csv_file(existing_data)) }
|
||||
let(:csv_data) { CSV.parse(existing_data_import.import_file.download, headers: true) }
|
||||
|
||||
context 'when the existing record has an email in import data' do
|
||||
it 'updates the existing record with new data' do
|
||||
contact = Contact.create!(email: csv_data[0]['email'], account_id: existing_data_import.account_id)
|
||||
expect(contact.reload.phone_number).to be_nil
|
||||
|
||||
csv_length = csv_data.length
|
||||
|
||||
described_class.perform_now(existing_data_import)
|
||||
expect(existing_data_import.account.contacts.count).to eq(csv_length)
|
||||
expect(Contact.find_by(email: csv_data[0]['email']).phone_number).to eq(csv_data[0]['phone_number'])
|
||||
expect(Contact.where(email: csv_data[0]['email']).count).to eq(1)
|
||||
end
|
||||
end
|
||||
|
||||
context 'when the existing record has a phone_number in import data' do
|
||||
it 'updates the existing record with new data' do
|
||||
contact = Contact.create!(account_id: existing_data_import.account_id, phone_number: csv_data[1]['phone_number'])
|
||||
expect(contact.reload.email).to be_nil
|
||||
csv_length = csv_data.length
|
||||
|
||||
described_class.perform_now(existing_data_import)
|
||||
expect(existing_data_import.account.contacts.count).to eq(csv_length)
|
||||
|
||||
expect(Contact.find_by(phone_number: csv_data[1]['phone_number']).email).to eq(csv_data[1]['email'])
|
||||
expect(Contact.where(phone_number: csv_data[1]['phone_number']).count).to eq(1)
|
||||
end
|
||||
end
|
||||
|
||||
context 'when the existing record has both email and phone_number in import data' do
|
||||
it 'skips importing the records' do
|
||||
phone_contact = Contact.create!(account_id: existing_data_import.account_id, phone_number: csv_data[1]['phone_number'])
|
||||
email_contact = Contact.create!(account_id: existing_data_import.account_id, email: csv_data[1]['email'])
|
||||
|
||||
csv_length = csv_data.length
|
||||
|
||||
described_class.perform_now(existing_data_import)
|
||||
expect(phone_contact.reload.email).to be_nil
|
||||
expect(email_contact.reload.phone_number).to be_nil
|
||||
expect(existing_data_import.total_records).to eq(csv_length)
|
||||
expect(existing_data_import.processed_records).to eq(csv_length - 1)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -69,6 +69,7 @@ RSpec.configure do |config|
|
||||
# config.filter_gems_from_backtrace("gem name")
|
||||
config.include SlackStubs
|
||||
config.include FileUploadHelpers
|
||||
config.include CsvSpecHelpers
|
||||
config.include Devise::Test::IntegrationHelpers, type: :request
|
||||
config.include ActiveSupport::Testing::TimeHelpers
|
||||
config.include ActionCable::TestHelper
|
||||
|
||||
18
spec/support/csv_spec_helpers.rb
Normal file
18
spec/support/csv_spec_helpers.rb
Normal file
@@ -0,0 +1,18 @@
|
||||
module CsvSpecHelpers
|
||||
# Generates a Rack::Test::UploadedFile object from an array of arrays
|
||||
# data: Accepts an array of arrays as the only argument
|
||||
def generate_csv_file(data)
|
||||
# Create a temporary file
|
||||
temp_file = Tempfile.new(['data', '.csv'])
|
||||
|
||||
# Write the array of arrays to the temporary file as CSV
|
||||
CSV.open(temp_file.path, 'wb') do |csv|
|
||||
data.each do |row|
|
||||
csv << row
|
||||
end
|
||||
end
|
||||
|
||||
# Create and return a Rack::Test::UploadedFile object
|
||||
Rack::Test::UploadedFile.new(temp_file.path, 'text/csv')
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user