feat: Add support for realtime-events in copilot-threads and copilot-messages (#11557)

- Add API support for creating a thread
- Add API support for creating a message
- Remove uuid from thread (no longer required, we will use existing
websocket connection to send messages)
- Update message_type to a column (user, assistant, assistant_thinking)
This commit is contained in:
Pranav
2025-05-22 22:25:05 -07:00
committed by GitHub
parent e92f72b318
commit 8c0885e1d2
29 changed files with 505 additions and 52 deletions

View File

@@ -204,3 +204,5 @@ class ActionCableListener < BaseListener
::ActionCableBroadcastJob.perform_later(tokens.uniq, event_name, payload)
end
end
ActionCableListener.prepend_mod_with('ActionCableListener')

View File

@@ -60,8 +60,8 @@ Rails.application.routes.draw do
end
resources :assistant_responses
resources :bulk_actions, only: [:create]
resources :copilot_threads, only: [:index] do
resources :copilot_messages, only: [:index]
resources :copilot_threads, only: [:index, :create] do
resources :copilot_messages, only: [:index, :create]
end
resources :documents, only: [:index, :show, :create, :destroy]
end

View File

@@ -0,0 +1,8 @@
class RemoveUuidFromCopilotThreads < ActiveRecord::Migration[7.1]
def change
remove_column :copilot_threads, :uuid, :string
add_column :copilot_threads, :assistant_id, :integer
add_index :copilot_threads, :assistant_id
end
end

View File

@@ -0,0 +1,5 @@
class RemoveUserIdFromCopilotMessages < ActiveRecord::Migration[7.1]
def change
remove_reference :copilot_messages, :user, index: true
end
end

View File

@@ -0,0 +1,11 @@
class ChangeMessageTypeToIntegerInCopilotMessages < ActiveRecord::Migration[7.1]
def up
remove_column :copilot_messages, :message_type
add_column :copilot_messages, :message_type, :integer, default: 0
end
def down
remove_column :copilot_messages, :message_type
add_column :copilot_messages, :message_type, :string, default: 'user'
end
end

View File

@@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema[7.0].define(version: 2025_05_14_045638) do
ActiveRecord::Schema[7.1].define(version: 2025_05_23_031839) do
# These extensions should be enabled to support this database
enable_extension "pg_stat_statements"
enable_extension "pg_trgm"
@@ -577,27 +577,25 @@ ActiveRecord::Schema[7.0].define(version: 2025_05_14_045638) do
create_table "copilot_messages", force: :cascade do |t|
t.bigint "copilot_thread_id", null: false
t.bigint "user_id", null: false
t.bigint "account_id", null: false
t.string "message_type", null: false
t.jsonb "message", default: {}, null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.integer "message_type", default: 0
t.index ["account_id"], name: "index_copilot_messages_on_account_id"
t.index ["copilot_thread_id"], name: "index_copilot_messages_on_copilot_thread_id"
t.index ["user_id"], name: "index_copilot_messages_on_user_id"
end
create_table "copilot_threads", force: :cascade do |t|
t.string "title", null: false
t.bigint "user_id", null: false
t.bigint "account_id", null: false
t.uuid "uuid", default: -> { "gen_random_uuid()" }, null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.integer "assistant_id"
t.index ["account_id"], name: "index_copilot_threads_on_account_id"
t.index ["assistant_id"], name: "index_copilot_threads_on_assistant_id"
t.index ["user_id"], name: "index_copilot_threads_on_user_id"
t.index ["uuid"], name: "index_copilot_threads_on_uuid", unique: true
end
create_table "csat_survey_responses", force: :cascade do |t|

View File

@@ -1,21 +1,28 @@
class Api::V1::Accounts::Captain::CopilotMessagesController < Api::V1::Accounts::BaseController
before_action :current_account
before_action -> { check_authorization(Captain::Assistant) }
before_action :set_copilot_thread
def index
@copilot_messages = @copilot_thread
.copilot_messages
.includes(:copilot_thread)
.order(created_at: :asc)
.page(permitted_params[:page] || 1)
.per(1000)
end
def create
@copilot_message = @copilot_thread.copilot_messages.create!(
message: params[:message],
message_type: :user
)
end
private
def set_copilot_thread
@copilot_thread = Current.account.copilot_threads.find_by!(
uuid: params[:copilot_thread_id], user_id: Current.user.id
id: params[:copilot_thread_id],
user: Current.user
)
end

View File

@@ -1,18 +1,41 @@
class Api::V1::Accounts::Captain::CopilotThreadsController < Api::V1::Accounts::BaseController
before_action :current_account
before_action -> { check_authorization(Captain::Assistant) }
before_action :ensure_message, only: :create
def index
@copilot_threads = Current.account.copilot_threads
.where(user_id: Current.user.id)
.includes(:user)
.includes(:user, :assistant)
.order(created_at: :desc)
.page(permitted_params[:page] || 1)
.per(5)
end
def create
ActiveRecord::Base.transaction do
@copilot_thread = Current.account.copilot_threads.create!(
title: copilot_thread_params[:message],
user: Current.user,
assistant: assistant
)
@copilot_thread.copilot_messages.create!(message_type: :user, message: copilot_thread_params[:message])
end
end
private
def ensure_message
return render_could_not_create_error('Message is required') if copilot_thread_params[:message].blank?
end
def assistant
Current.account.captain_assistants.find(copilot_thread_params[:assistant_id])
end
def copilot_thread_params
params.permit(:message, :assistant_id)
end
def permitted_params
params.permit(:page)
end

View File

@@ -1,7 +1,10 @@
class CaptainListener < BaseListener
include ::Events::Types
def conversation_resolved(event)
conversation = extract_conversation_and_account(event)[0]
assistant = conversation.inbox.captain_assistant
return unless conversation.inbox.captain_active?
Captain::Llm::ContactNotesService.new(assistant, conversation).generate_and_update_notes if assistant.config['feature_memory'].present?

View File

@@ -0,0 +1,11 @@
module Enterprise::ActionCableListener
include Events::Types
def copilot_message_created(event)
copilot_message = event.data[:copilot_message]
copilot_thread = copilot_message.copilot_thread
account = copilot_thread.account
user = copilot_thread.user
broadcast(account, [user.pubsub_token], COPILOT_MESSAGE_CREATED, copilot_message.push_event_data)
end
end

View File

@@ -29,6 +29,7 @@ class Captain::Assistant < ApplicationRecord
has_many :inboxes,
through: :captain_inboxes
has_many :messages, as: :sender, dependent: :nullify
has_many :copilot_threads, dependent: :destroy_async
validates :name, presence: true
validates :description, presence: true

View File

@@ -4,24 +4,47 @@
#
# id :bigint not null, primary key
# message :jsonb not null
# message_type :string not null
# message_type :integer default("user")
# created_at :datetime not null
# updated_at :datetime not null
# account_id :bigint not null
# copilot_thread_id :bigint not null
# user_id :bigint not null
#
# Indexes
#
# index_copilot_messages_on_account_id (account_id)
# index_copilot_messages_on_copilot_thread_id (copilot_thread_id)
# index_copilot_messages_on_user_id (user_id)
#
class CopilotMessage < ApplicationRecord
belongs_to :copilot_thread
belongs_to :user
belongs_to :account
validates :message_type, presence: true, inclusion: { in: %w[user assistant assistant_thinking] }
before_validation :ensure_account
enum message_type: { user: 0, assistant: 1, assistant_thinking: 2 }
validates :message_type, presence: true, inclusion: { in: message_types.keys }
validates :message, presence: true
after_create_commit :broadcast_message
def push_event_data
{
id: id,
message: message,
message_type: message_type,
created_at: created_at.to_i,
copilot_thread: copilot_thread.push_event_data
}
end
private
def ensure_account
self.account = copilot_thread.account
end
def broadcast_message
Rails.configuration.dispatcher.dispatch(COPILOT_MESSAGE_CREATED, Time.zone.now, copilot_message: self)
end
end

View File

@@ -4,23 +4,45 @@
#
# id :bigint not null, primary key
# title :string not null
# uuid :uuid not null
# created_at :datetime not null
# updated_at :datetime not null
# account_id :bigint not null
# assistant_id :integer
# user_id :bigint not null
#
# Indexes
#
# index_copilot_threads_on_account_id (account_id)
# index_copilot_threads_on_assistant_id (assistant_id)
# index_copilot_threads_on_user_id (user_id)
# index_copilot_threads_on_uuid (uuid) UNIQUE
#
class CopilotThread < ApplicationRecord
belongs_to :user
belongs_to :account
has_many :copilot_messages, dependent: :destroy
belongs_to :assistant, class_name: 'Captain::Assistant'
has_many :copilot_messages, dependent: :destroy_async
validates :title, presence: true
validates :uuid, presence: true, uniqueness: true
def push_event_data
{
id: id,
title: title,
created_at: created_at.to_i,
user: user.push_event_data,
account_id: account_id
}
end
def previous_history
copilot_messages
.where(message_type: %w[user assistant])
.order(created_at: :asc)
.map do |copilot_message|
{
content: copilot_message.message,
role: copilot_message.message_type
}
end
end
end

View File

@@ -0,0 +1 @@
json.partial! 'api/v1/models/captain/copilot_message', formats: [:json], resource: @copilot_message

View File

@@ -1,8 +1,5 @@
json.payload do
json.array! @copilot_messages do |message|
json.id message.id
json.message message.message
json.message_type message.message_type
json.created_at message.created_at.to_i
json.partial! 'api/v1/models/captain/copilot_message', formats: [:json], resource: message
end
end

View File

@@ -0,0 +1 @@
json.partial! 'api/v1/models/captain/copilot_thread', formats: [:json], resource: @copilot_thread

View File

@@ -1,12 +1,5 @@
json.payload do
json.array! @copilot_threads do |thread|
json.id thread.id
json.title thread.title
json.uuid thread.uuid
json.created_at thread.created_at.to_i
json.user do
json.id thread.user.id
json.name thread.user.name
end
json.partial! 'api/v1/models/captain/copilot_thread', resource: thread
end
end

View File

@@ -0,0 +1,6 @@
json.id resource.id
json.message resource.message
json.message_type resource.message_type
json.created_at resource.created_at.to_i
json.copilot_thread resource.copilot_thread.push_event_data
json.account_id resource.account_id

View File

@@ -0,0 +1,6 @@
json.id resource.id
json.title resource.title
json.created_at resource.created_at.to_i
json.user resource.user.push_event_data
json.assistant resource.assistant.push_event_data
json.account_id resource.account_id

View File

@@ -54,4 +54,7 @@ module Events::Types
# agent events
AGENT_ADDED = 'agent.added'
AGENT_REMOVED = 'agent.removed'
# copilot events
COPILOT_MESSAGE_CREATED = 'copilot.message.created'
end

View File

@@ -4,12 +4,12 @@ RSpec.describe 'Api::V1::Accounts::Captain::CopilotMessagesController', type: :r
let(:account) { create(:account) }
let(:user) { create(:user, account: account, role: :administrator) }
let(:copilot_thread) { create(:captain_copilot_thread, account: account, user: user) }
let!(:copilot_message) { create(:captain_copilot_message, copilot_thread: copilot_thread, user: user, account: account) }
let!(:copilot_message) { create(:captain_copilot_message, copilot_thread: copilot_thread, account: account) }
describe 'GET /api/v1/accounts/{account.id}/captain/copilot_threads/{thread.uuid}/copilot_messages' do
describe 'GET /api/v1/accounts/{account.id}/captain/copilot_threads/{thread.id}/copilot_messages' do
context 'when it is an authenticated user' do
it 'returns all messages' do
get "/api/v1/accounts/#{account.id}/captain/copilot_threads/#{copilot_thread.uuid}/copilot_messages",
get "/api/v1/accounts/#{account.id}/captain/copilot_threads/#{copilot_thread.id}/copilot_messages",
headers: user.create_new_auth_token,
as: :json
@@ -20,9 +20,54 @@ RSpec.describe 'Api::V1::Accounts::Captain::CopilotMessagesController', type: :r
end
end
context 'when thread uuid is invalid' do
context 'when thread id is invalid' do
it 'returns not found error' do
get "/api/v1/accounts/#{account.id}/captain/copilot_threads/invalid-uuid/copilot_messages",
get "/api/v1/accounts/#{account.id}/captain/copilot_threads/999999999/copilot_messages",
headers: user.create_new_auth_token,
as: :json
expect(response).to have_http_status(:not_found)
end
end
end
describe 'POST /api/v1/accounts/{account.id}/captain/copilot_threads/{thread.id}/copilot_messages' do
context 'when it is an authenticated user' do
it 'creates a new message' do
message_content = { 'content' => 'This is a test message' }
expect do
post "/api/v1/accounts/#{account.id}/captain/copilot_threads/#{copilot_thread.id}/copilot_messages",
params: { message: message_content },
headers: user.create_new_auth_token,
as: :json
end.to change(CopilotMessage, :count).by(1)
expect(response).to have_http_status(:success)
expect(CopilotMessage.last.message).to eq(message_content)
expect(CopilotMessage.last.message_type).to eq('user')
expect(CopilotMessage.last.copilot_thread_id).to eq(copilot_thread.id)
end
end
context 'when thread does not exist' do
it 'returns not found error' do
post "/api/v1/accounts/#{account.id}/captain/copilot_threads/999999999/copilot_messages",
params: { message: { text: 'Test message' } },
headers: user.create_new_auth_token,
as: :json
expect(response).to have_http_status(:not_found)
end
end
context 'when thread belongs to another user' do
let(:another_user) { create(:user, account: account) }
let(:another_thread) { create(:captain_copilot_thread, account: account, user: another_user) }
it 'returns not found error' do
post "/api/v1/accounts/#{account.id}/captain/copilot_threads/#{another_thread.id}/copilot_messages",
params: { message: { text: 'Test message' } },
headers: user.create_new_auth_token,
as: :json

View File

@@ -18,7 +18,7 @@ RSpec.describe 'Api::V1::Accounts::Captain::CopilotThreads', type: :request do
end
end
context 'when it is an agent' do
context 'when it is an authenticated user' do
it 'fetches copilot threads for the current user' do
# Create threads for the current agent
create_list(:captain_copilot_thread, 3, account: account, user: agent)
@@ -47,4 +47,65 @@ RSpec.describe 'Api::V1::Accounts::Captain::CopilotThreads', type: :request do
end
end
end
describe 'POST /api/v1/accounts/{account.id}/captain/copilot_threads' do
let(:assistant) { create(:captain_assistant, account: account) }
let(:valid_params) { { message: 'Hello, how can you help me?', assistant_id: assistant.id } }
context 'when it is an un-authenticated user' do
it 'returns unauthorized' do
post "/api/v1/accounts/#{account.id}/captain/copilot_threads",
params: valid_params,
as: :json
expect(response).to have_http_status(:unauthorized)
end
end
context 'when it is an authenticated user' do
context 'with invalid params' do
it 'returns error when message is blank' do
post "/api/v1/accounts/#{account.id}/captain/copilot_threads",
params: { message: '', assistant_id: assistant.id },
headers: agent.create_new_auth_token,
as: :json
expect(response).to have_http_status(:unprocessable_entity)
expect(json_response[:error]).to eq('Message is required')
end
it 'returns error when assistant_id is invalid' do
post "/api/v1/accounts/#{account.id}/captain/copilot_threads",
params: { message: 'Hello', assistant_id: 0 },
headers: agent.create_new_auth_token,
as: :json
expect(response).to have_http_status(:not_found)
end
end
context 'with valid params' do
it 'creates a new copilot thread with initial message' do
expect do
post "/api/v1/accounts/#{account.id}/captain/copilot_threads",
params: valid_params,
headers: agent.create_new_auth_token,
as: :json
end.to change(CopilotThread, :count).by(1)
.and change(CopilotMessage, :count).by(1)
expect(response).to have_http_status(:success)
thread = CopilotThread.last
expect(thread.title).to eq(valid_params[:message])
expect(thread.user_id).to eq(agent.id)
expect(thread.assistant_id).to eq(assistant.id)
message = thread.copilot_messages.last
expect(message.message_type).to eq('user')
expect(message.message).to eq(valid_params[:message])
end
end
end
end
end

View File

@@ -0,0 +1,24 @@
require 'rails_helper'
describe ActionCableListener do
describe '#copilot_message_created' do
let(:event_name) { :copilot_message_created }
let(:account) { create(:account) }
let(:user) { create(:user, account: account) }
let(:assistant) { create(:captain_assistant, account: account) }
let(:copilot_thread) { create(:captain_copilot_thread, account: account, user: user, assistant: assistant) }
let(:copilot_message) { create(:captain_copilot_message, copilot_thread: copilot_thread) }
let(:event) { Events::Base.new(event_name, Time.zone.now, copilot_message: copilot_message) }
let(:listener) { described_class.instance }
it 'broadcasts message to the user' do
expect(ActionCableBroadcastJob).to receive(:perform_later).with(
[user.pubsub_token],
'copilot.message.created',
copilot_message.push_event_data.merge(account_id: account.id)
)
listener.copilot_message_created(event)
end
end
end

View File

@@ -0,0 +1,57 @@
require 'rails_helper'
describe CaptainListener do
let(:listener) { described_class.instance }
let(:account) { create(:account) }
let(:inbox) { create(:inbox, account: account) }
let(:user) { create(:user, account: account) }
let(:assistant) { create(:captain_assistant, account: account, config: { feature_memory: true, feature_faq: true }) }
describe '#conversation_resolved' do
let(:agent) { create(:user, account: account) }
let(:conversation) { create(:conversation, account: account, inbox: inbox, assignee: agent) }
let(:event_name) { :conversation_resolved }
let(:event) { Events::Base.new(event_name, Time.zone.now, conversation: conversation) }
before do
create(:captain_inbox, captain_assistant: assistant, inbox: inbox)
end
context 'when feature_memory is enabled' do
before do
assistant.config['feature_memory'] = true
assistant.config['feature_faq'] = false
assistant.save!
end
it 'generates and updates notes' do
expect(Captain::Llm::ContactNotesService)
.to receive(:new)
.with(assistant, conversation)
.and_return(instance_double(Captain::Llm::ContactNotesService, generate_and_update_notes: nil))
expect(Captain::Llm::ConversationFaqService).not_to receive(:new)
listener.conversation_resolved(event)
end
end
context 'when feature_faq is enabled' do
before do
assistant.config['feature_faq'] = true
assistant.config['feature_memory'] = false
assistant.save!
end
it 'generates and deduplicates FAQs' do
expect(Captain::Llm::ConversationFaqService)
.to receive(:new)
.with(assistant, conversation)
.and_return(instance_double(Captain::Llm::ConversationFaqService, generate_and_deduplicate: false))
expect(Captain::Llm::ContactNotesService).not_to receive(:new)
listener.conversation_resolved(event)
end
end
end
end

View File

@@ -0,0 +1,64 @@
require 'rails_helper'
RSpec.describe CopilotMessage, type: :model do
describe 'associations' do
it { is_expected.to belong_to(:copilot_thread) }
it { is_expected.to belong_to(:account) }
end
describe 'validations' do
it { is_expected.to validate_presence_of(:message_type) }
it { is_expected.to validate_presence_of(:message) }
it { is_expected.to validate_inclusion_of(:message_type).in_array(described_class.message_types.keys) }
end
describe 'callbacks' do
let(:account) { create(:account) }
let(:user) { create(:user, account: account) }
let(:assistant) { create(:captain_assistant, account: account) }
let(:copilot_thread) { create(:captain_copilot_thread, account: account, user: user, assistant: assistant) }
describe '#ensure_account' do
it 'sets the account from the copilot thread before validation' do
message = build(:captain_copilot_message, copilot_thread: copilot_thread, account: nil)
message.valid?
expect(message.account).to eq(copilot_thread.account)
end
end
describe '#broadcast_message' do
it 'dispatches COPILOT_MESSAGE_CREATED event after create' do
message = build(:captain_copilot_message, copilot_thread: copilot_thread)
expect(Rails.configuration.dispatcher).to receive(:dispatch)
.with(COPILOT_MESSAGE_CREATED, anything, copilot_message: message)
message.save!
end
end
end
describe '#push_event_data' do
let(:account) { create(:account) }
let(:user) { create(:user, account: account) }
let(:assistant) { create(:captain_assistant, account: account) }
let(:copilot_thread) { create(:captain_copilot_thread, account: account, user: user, assistant: assistant) }
let(:message_content) { { 'content' => 'Test message' } }
let(:copilot_message) do
create(:captain_copilot_message,
copilot_thread: copilot_thread,
message_type: 'user',
message: message_content)
end
it 'returns the correct event data' do
event_data = copilot_message.push_event_data
expect(event_data[:id]).to eq(copilot_message.id)
expect(event_data[:message]).to eq(message_content)
expect(event_data[:message_type]).to eq('user')
expect(event_data[:created_at]).to eq(copilot_message.created_at.to_i)
expect(event_data[:copilot_thread]).to eq(copilot_thread.push_event_data)
end
end
end

View File

@@ -0,0 +1,62 @@
require 'rails_helper'
RSpec.describe CopilotThread, type: :model do
describe 'associations' do
it { is_expected.to belong_to(:user) }
it { is_expected.to belong_to(:account) }
it { is_expected.to belong_to(:assistant).class_name('Captain::Assistant') }
it { is_expected.to have_many(:copilot_messages).dependent(:destroy_async) }
end
describe 'validations' do
it { is_expected.to validate_presence_of(:title) }
end
describe '#push_event_data' do
let(:account) { create(:account) }
let(:user) { create(:user, account: account) }
let(:assistant) { create(:captain_assistant, account: account) }
let(:copilot_thread) { create(:captain_copilot_thread, account: account, user: user, assistant: assistant, title: 'Test Thread') }
it 'returns the correct event data' do
event_data = copilot_thread.push_event_data
expect(event_data[:id]).to eq(copilot_thread.id)
expect(event_data[:title]).to eq('Test Thread')
expect(event_data[:created_at]).to eq(copilot_thread.created_at.to_i)
expect(event_data[:user]).to eq(user.push_event_data)
expect(event_data[:account_id]).to eq(account.id)
end
end
describe '#previous_history' do
let(:account) { create(:account) }
let(:user) { create(:user, account: account) }
let(:assistant) { create(:captain_assistant, account: account) }
let(:copilot_thread) { create(:captain_copilot_thread, account: account, user: user, assistant: assistant) }
context 'when there are messages in the thread' do
before do
create(:captain_copilot_message, copilot_thread: copilot_thread, message_type: 'user', message: { 'content' => 'User message' })
create(:captain_copilot_message, copilot_thread: copilot_thread, message_type: 'assistant_thinking', message: { 'content' => 'Thinking...' })
create(:captain_copilot_message, copilot_thread: copilot_thread, message_type: 'assistant', message: { 'content' => 'Assistant message' })
end
it 'returns only user and assistant messages in chronological order' do
history = copilot_thread.previous_history
expect(history.length).to eq(2)
expect(history[0][:role]).to eq('user')
expect(history[0][:content]).to eq({ 'content' => 'User message' })
expect(history[1][:role]).to eq('assistant')
expect(history[1][:content]).to eq({ 'content' => 'Assistant message' })
end
end
context 'when there are no messages in the thread' do
it 'returns an empty array' do
expect(copilot_thread.previous_history).to eq([])
end
end
end
end

View File

@@ -1,9 +1,8 @@
FactoryBot.define do
factory :captain_copilot_message, class: 'CopilotMessage' do
account
user
copilot_thread { association :captain_copilot_thread }
message { { content: 'This is a test message' } }
message_type { 'user' }
message_type { 0 }
end
end

View File

@@ -3,6 +3,6 @@ FactoryBot.define do
account
user
title { Faker::Lorem.sentence }
uuid { SecureRandom.uuid }
assistant { create(:captain_assistant, account: account) }
end
end

View File

@@ -203,4 +203,24 @@ describe ActionCableListener do
listener.conversation_updated(event)
end
end
describe '#copilot_message_created' do
let(:event_name) { :copilot_message_created }
let(:account) { create(:account) }
let(:user) { create(:user, account: account) }
let(:assistant) { create(:captain_assistant, account: account) }
let(:copilot_thread) { create(:captain_copilot_thread, account: account, user: user, assistant: assistant) }
let(:copilot_message) { create(:captain_copilot_message, copilot_thread: copilot_thread) }
let(:event) { Events::Base.new(event_name, Time.zone.now, copilot_message: copilot_message) }
it 'broadcasts message to the user' do
expect(ActionCableBroadcastJob).to receive(:perform_later).with(
[user.pubsub_token],
'copilot.message.created',
copilot_message.push_event_data
)
listener.copilot_message_created(event)
end
end
end