diff --git a/Gemfile b/Gemfile index e6c5a5250..0636b7b2b 100644 --- a/Gemfile +++ b/Gemfile @@ -191,7 +191,7 @@ gem 'reverse_markdown' gem 'iso-639' gem 'ruby-openai' -gem 'ai-agents' +gem 'ai-agents', '>= 0.9.1' # TODO: Move this gem as a dependency of ai-agents gem 'ruby_llm', '>= 1.8.2' diff --git a/Gemfile.lock b/Gemfile.lock index aad0438cf..7a7316e3c 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -126,7 +126,7 @@ GEM jbuilder (~> 2) rails (>= 4.2, < 7.2) selectize-rails (~> 0.6) - ai-agents (0.9.0) + ai-agents (0.9.1) ruby_llm (~> 1.9.1) annotaterb (4.20.0) activerecord (>= 6.0.0) @@ -1024,7 +1024,7 @@ DEPENDENCIES administrate (>= 0.20.1) administrate-field-active_storage (>= 1.0.3) administrate-field-belongs_to_search (>= 0.9.0) - ai-agents + ai-agents (>= 0.9.1) annotaterb attr_extras audited (~> 5.4, >= 5.4.1) diff --git a/enterprise/app/services/captain/assistant/agent_runner_service.rb b/enterprise/app/services/captain/assistant/agent_runner_service.rb index 9aeee605f..72c44024f 100644 --- a/enterprise/app/services/captain/assistant/agent_runner_service.rb +++ b/enterprise/app/services/captain/assistant/agent_runner_service.rb @@ -3,6 +3,8 @@ require 'agents/instrumentation' class Captain::Assistant::AgentRunnerService include Integrations::LlmInstrumentationConstants + include Captain::Assistant::RunnerCallbacksHelper + include Captain::Assistant::TracePayloadHelper CONVERSATION_STATE_ATTRIBUTES = %i[ id display_id inbox_id contact_id status priority @@ -21,13 +23,7 @@ class Captain::Assistant::AgentRunnerService end def generate_response(message_history: []) - agents = build_and_wire_agents - context = build_context(message_history) - message_to_process = extract_last_user_message(message_history) - runner = Agents::Runner.with_agents(*agents) - runner = add_usage_metadata_callback(runner) - runner = add_callbacks_to_runner(runner) if @callbacks.any? - install_instrumentation(runner) + message_to_process, context = run_payload(message_history) result = runner.run(message_to_process, context: context, max_turns: 100) process_agent_result(result) @@ -45,7 +41,10 @@ class Captain::Assistant::AgentRunnerService def build_context(message_history) conversation_history = message_history.map do |msg| - content = extract_text_from_content(msg[:content]) + content = msg[:content] + # Preserve multimodal arrays (with image_url entries) as-is for the runner to restore with attachments. + # Only extract text from non-array formats (hashes from agent structured output, plain strings). + content = extract_text_from_content(content) unless content.is_a?(Array) { role: msg[:role].to_sym, @@ -63,8 +62,22 @@ class Captain::Assistant::AgentRunnerService def extract_last_user_message(message_history) last_user_msg = message_history.reverse.find { |msg| msg[:role] == 'user' } + return '' if last_user_msg.blank? - extract_text_from_content(last_user_msg[:content]) + content = last_user_msg[:content] + return extract_text_from_content(content) unless content.is_a?(Array) + + text, attachments = Captain::OpenAiMessageBuilderService.extract_text_and_attachments(content) + return text if attachments.blank? + + RubyLLM::Content.new(text, attachments) + end + + def message_history_without_last_user_message(message_history) + last_user_index = message_history.rindex { |msg| msg[:role] == 'user' } + return message_history if last_user_index.nil? + + message_history.reject.with_index { |_msg, index| index == last_user_index } end def extract_text_from_content(content) @@ -143,28 +156,25 @@ class Captain::Assistant::AgentRunnerService }, attribute_provider: ->(context_wrapper) { dynamic_trace_attributes(context_wrapper) } ) + register_trace_input_callback(runner) end def dynamic_trace_attributes(context_wrapper) state = context_wrapper&.context&.dig(:state) || {} conversation = state[:conversation] || {} + trace_input = context_wrapper&.context&.dig(:captain_v2_trace_input) + { ATTR_LANGFUSE_USER_ID => state[:account_id], format(ATTR_LANGFUSE_METADATA, 'assistant_id') => state[:assistant_id], format(ATTR_LANGFUSE_METADATA, 'conversation_id') => conversation[:id], format(ATTR_LANGFUSE_METADATA, 'conversation_display_id') => conversation[:display_id], - format(ATTR_LANGFUSE_METADATA, 'channel_type') => state[:channel_type] + format(ATTR_LANGFUSE_METADATA, 'channel_type') => state[:channel_type], + ATTR_LANGFUSE_TRACE_INPUT => trace_input, + ATTR_LANGFUSE_OBSERVATION_INPUT => trace_input }.compact.transform_values(&:to_s) end - def add_callbacks_to_runner(runner) - runner = add_agent_thinking_callback(runner) if @callbacks[:on_agent_thinking] - runner = add_tool_start_callback(runner) if @callbacks[:on_tool_start] - runner = add_tool_complete_callback(runner) if @callbacks[:on_tool_complete] - runner = add_agent_handoff_callback(runner) if @callbacks[:on_agent_handoff] - runner - end - def add_usage_metadata_callback(runner) return runner unless ChatwootApp.otel_enabled? @@ -195,35 +205,20 @@ class Captain::Assistant::AgentRunnerService root_span.set_attribute(format(ATTR_LANGFUSE_METADATA, 'credit_used'), credit_used.to_s) end - def add_agent_thinking_callback(runner) - runner.on_agent_thinking do |*args| - @callbacks[:on_agent_thinking].call(*args) - rescue StandardError => e - Rails.logger.warn "[Captain] Callback error for agent_thinking: #{e.message}" + def runner + @runner ||= begin + configured_runner = Agents::Runner.with_agents(*build_and_wire_agents) + configured_runner = add_usage_metadata_callback(configured_runner) + configured_runner = add_callbacks_to_runner(configured_runner) if @callbacks.any? + install_instrumentation(configured_runner) + configured_runner end end - def add_tool_start_callback(runner) - runner.on_tool_start do |*args| - @callbacks[:on_tool_start].call(*args) - rescue StandardError => e - Rails.logger.warn "[Captain] Callback error for tool_start: #{e.message}" - end - end - - def add_tool_complete_callback(runner) - runner.on_tool_complete do |*args| - @callbacks[:on_tool_complete].call(*args) - rescue StandardError => e - Rails.logger.warn "[Captain] Callback error for tool_complete: #{e.message}" - end - end - - def add_agent_handoff_callback(runner) - runner.on_agent_handoff do |*args| - @callbacks[:on_agent_handoff].call(*args) - rescue StandardError => e - Rails.logger.warn "[Captain] Callback error for agent_handoff: #{e.message}" - end + def run_payload(message_history) + message_to_process = extract_last_user_message(message_history) + context = build_context(message_history_without_last_user_message(message_history)) + enrich_context_with_trace_payload!(context, message_history, message_to_process) + [message_to_process, context] end end diff --git a/enterprise/app/services/captain/assistant/runner_callbacks_helper.rb b/enterprise/app/services/captain/assistant/runner_callbacks_helper.rb new file mode 100644 index 000000000..58fa0ca33 --- /dev/null +++ b/enterprise/app/services/captain/assistant/runner_callbacks_helper.rb @@ -0,0 +1,53 @@ +module Captain::Assistant::RunnerCallbacksHelper + private + + def add_callbacks_to_runner(runner) + runner = add_agent_thinking_callback(runner) if @callbacks[:on_agent_thinking] + runner = add_tool_start_callback(runner) if @callbacks[:on_tool_start] + runner = add_tool_complete_callback(runner) if @callbacks[:on_tool_complete] + runner = add_agent_handoff_callback(runner) if @callbacks[:on_agent_handoff] + runner + end + + def register_trace_input_callback(runner) + runner.on_agent_thinking do |_agent_name, _input, context_wrapper| + tracing = context_wrapper&.context&.dig(:__otel_tracing) + next unless tracing + + trace_input = context_wrapper.context[:captain_v2_trace_current_input] + tracing[:pending_llm_input] = trace_input if trace_input.present? + end + end + + def add_agent_thinking_callback(runner) + runner.on_agent_thinking do |*args| + @callbacks[:on_agent_thinking].call(*args) + rescue StandardError => e + Rails.logger.warn "[Captain] Callback error for agent_thinking: #{e.message}" + end + end + + def add_tool_start_callback(runner) + runner.on_tool_start do |*args| + @callbacks[:on_tool_start].call(*args) + rescue StandardError => e + Rails.logger.warn "[Captain] Callback error for tool_start: #{e.message}" + end + end + + def add_tool_complete_callback(runner) + runner.on_tool_complete do |*args| + @callbacks[:on_tool_complete].call(*args) + rescue StandardError => e + Rails.logger.warn "[Captain] Callback error for tool_complete: #{e.message}" + end + end + + def add_agent_handoff_callback(runner) + runner.on_agent_handoff do |*args| + @callbacks[:on_agent_handoff].call(*args) + rescue StandardError => e + Rails.logger.warn "[Captain] Callback error for agent_handoff: #{e.message}" + end + end +end diff --git a/enterprise/app/services/captain/assistant/trace_payload_helper.rb b/enterprise/app/services/captain/assistant/trace_payload_helper.rb new file mode 100644 index 000000000..3b6546c12 --- /dev/null +++ b/enterprise/app/services/captain/assistant/trace_payload_helper.rb @@ -0,0 +1,51 @@ +module Captain::Assistant::TracePayloadHelper + private + + def enrich_context_with_trace_payload!(context, message_history, message_to_process) + context[:captain_v2_trace_input] = serialize_trace_messages(message_history) + context[:captain_v2_trace_current_input] = serialize_trace_content(message_to_process) + end + + def serialize_trace_messages(message_history) + message_history.map do |message| + { + role: message[:role].to_s, + content: trace_content_payload(message[:content]) + } + end.to_json + end + + def serialize_trace_content(content) + payload = trace_content_payload(content) + return '' if payload.blank? + + payload.is_a?(String) ? payload : payload.to_json + end + + def trace_content_payload(content) + case content + when RubyLLM::Content + trace_parts_from_ruby_llm_content(content) + when Array, Hash + content + when NilClass + '' + else + content.to_s + end + end + + def trace_parts_from_ruby_llm_content(content) + parts = [] + parts << { type: 'text', text: content.text } if content.text.present? + + content.attachments.each do |attachment| + parts << { type: 'image_url', image_url: { url: attachment.source.to_s } } + end + + return '' if parts.blank? + return parts.first[:text] if parts.one? && parts.first[:type] == 'text' + + parts + end +end diff --git a/spec/enterprise/services/captain/assistant/agent_runner_service_spec.rb b/spec/enterprise/services/captain/assistant/agent_runner_service_spec.rb index c3019b82c..13e4804ce 100644 --- a/spec/enterprise/services/captain/assistant/agent_runner_service_spec.rb +++ b/spec/enterprise/services/captain/assistant/agent_runner_service_spec.rb @@ -74,12 +74,11 @@ RSpec.describe Captain::Assistant::AgentRunnerService do end it 'runs agent with extracted user message and context' do - expected_context = { + expected_context = hash_including( session_id: "#{account.id}_#{conversation.display_id}", conversation_history: [ { role: :user, content: 'Hello there', agent_name: nil }, - { role: :assistant, content: 'Hi! How can I help you?', agent_name: 'Assistant' }, - { role: :user, content: 'I need help with my account', agent_name: nil } + { role: :assistant, content: 'Hi! How can I help you?', agent_name: 'Assistant' } ], state: hash_including( account_id: account.id, @@ -87,7 +86,7 @@ RSpec.describe Captain::Assistant::AgentRunnerService do conversation: hash_including(id: conversation.id), contact: hash_including(id: contact.id) ) - } + ) expect(mock_runner).to receive(:run).with( 'I need help with my account', @@ -98,6 +97,71 @@ RSpec.describe Captain::Assistant::AgentRunnerService do service.generate_response(message_history: message_history) end + context 'when the latest user message is multimodal' do + let(:multimodal_message_history) do + [ + { role: 'assistant', content: 'Please share a screenshot' }, + { + role: 'user', + content: [ + { type: 'text', text: 'What does this error mean?' }, + { type: 'image_url', image_url: { url: 'https://example.com/error.png' } } + ] + } + ] + end + + it 'passes image attachments to the runner input' do + expect(mock_runner).to receive(:run) do |input, context:, max_turns:| + expect(input).to be_a(RubyLLM::Content) + expect(input.text).to eq('What does this error mean?') + expect(input.attachments.first.source.to_s).to eq('https://example.com/error.png') + expect(context[:conversation_history]).to eq([{ role: :assistant, content: 'Please share a screenshot', agent_name: nil }]) + expect(max_turns).to eq(100) + end + + service.generate_response(message_history: multimodal_message_history) + end + + it 'preserves multimodal content in earlier history messages' do + history_with_prior_image = [ + { + role: 'user', + content: [ + { type: 'text', text: 'Here is my error screenshot' }, + { type: 'image_url', image_url: { url: 'https://example.com/error.png' } } + ] + }, + { role: 'assistant', content: 'I see the error. Try restarting.' }, + { role: 'user', content: 'It still does not work' } + ] + + expect(mock_runner).to receive(:run) do |input, context:, max_turns:| + expect(input).to eq('It still does not work') + # The earlier user message with the image should preserve the multimodal array + first_history_msg = context[:conversation_history].first + expect(first_history_msg[:content]).to be_a(Array) + expect(first_history_msg[:content]).to include( + { type: 'text', text: 'Here is my error screenshot' }, + { type: 'image_url', image_url: { url: 'https://example.com/error.png' } } + ) + expect(max_turns).to eq(100) + end + + service.generate_response(message_history: history_with_prior_image) + end + + it 'stores multimodal trace payloads in runner context' do + expect(mock_runner).to receive(:run) do |_input, context:, max_turns:| + expect(context[:captain_v2_trace_input]).to include('image_url') + expect(context[:captain_v2_trace_current_input]).to include('image_url') + expect(max_turns).to eq(100) + end + + service.generate_response(message_history: multimodal_message_history) + end + end + it 'processes and formats agent result' do result = service.generate_response(message_history: message_history) @@ -197,22 +261,21 @@ RSpec.describe Captain::Assistant::AgentRunnerService do end context 'with multimodal content' do - let(:multimodal_message_history) do + let(:multimodal_content) do [ - { - role: 'user', - content: [ - { type: 'text', text: 'Can you help with this image?' }, - { type: 'image_url', image_url: { url: 'https://example.com/image.jpg' } } - ] - } + { type: 'text', text: 'Can you help with this image?' }, + { type: 'image_url', image_url: { url: 'https://example.com/image.jpg' } } ] end - it 'extracts text content from multimodal messages' do + let(:multimodal_message_history) do + [{ role: 'user', content: multimodal_content }] + end + + it 'preserves multimodal arrays in conversation history for image context retention' do context = service.send(:build_context, multimodal_message_history) - expect(context[:conversation_history].first[:content]).to eq('Can you help with this image?') + expect(context[:conversation_history].first[:content]).to eq(multimodal_content) end end end @@ -225,6 +288,24 @@ RSpec.describe Captain::Assistant::AgentRunnerService do expect(result).to eq('I need help with my account') end + + it 'returns multimodal content with image attachments for the runner input' do + multimodal_message_history = [ + { + role: 'user', + content: [ + { type: 'text', text: 'Can you check this screenshot?' }, + { type: 'image_url', image_url: { url: 'https://example.com/image.jpg' } } + ] + } + ] + + result = service.send(:extract_last_user_message, multimodal_message_history) + + expect(result).to be_a(RubyLLM::Content) + expect(result.text).to eq('Can you check this screenshot?') + expect(result.attachments.first.source.to_s).to eq('https://example.com/image.jpg') + end end describe '#extract_text_from_content' do @@ -256,6 +337,28 @@ RSpec.describe Captain::Assistant::AgentRunnerService do end end + describe '#dynamic_trace_attributes' do + subject(:service) { described_class.new(assistant: assistant, conversation: conversation) } + + it 'adds serialized trace input attributes when present in context' do + context = { + state: { + account_id: account.id, + assistant_id: assistant.id, + conversation: { id: conversation.id, display_id: conversation.display_id } + }, + captain_v2_trace_input: '[{"role":"user","content":[{"type":"image_url","image_url":{"url":"https://example.com/image.jpg"}}]}]' + } + context_wrapper = Struct.new(:context).new(context) + + attributes = service.send(:dynamic_trace_attributes, context_wrapper) + + expect(attributes['langfuse.trace.input']).to include('image_url') + expect(attributes['langfuse.observation.input']).to include('image_url') + expect(attributes['langfuse.user.id']).to eq(account.id.to_s) + end + end + describe '#build_state' do subject(:service) { described_class.new(assistant: assistant, conversation: conversation) }