From 7cec4ebaaefe0b2bda4efe72aa1f8b36ced33364 Mon Sep 17 00:00:00 2001 From: Aakash Bakhle <48802744+aakashb95@users.noreply.github.com> Date: Tue, 24 Feb 2026 19:37:41 +0530 Subject: [PATCH] feat: support multimodal user messages in captain v2 (#13581) Extract and pass image attachments from the latest user message to the runner, excluding the last user message from the context for processing. Fixes #13588 # Pull Request Template ## Description Adds image support to captain v2 ## Type of change Please delete options that are not relevant. - [x] Bug fix (non-breaking change which fixes an issue) ## How Has This Been Tested? Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce. Please also list any relevant details for your test configuration. specs and local testing image langfuse also shows media correctly with the instrumentation code: image ## Checklist: - [x] My code follows the style guidelines of this project - [x] I have performed a self-review of my code - [x] I have commented on my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [x] My changes generate no new warnings - [x] I have added tests that prove my fix is effective or that my feature works - [x] New and existing unit tests pass locally with my changes - [x] Any dependent changes have been merged and published in downstream modules --------- Co-authored-by: Shivam Mishra Co-authored-by: Claude Opus 4.6 --- Gemfile | 2 +- Gemfile.lock | 4 +- .../captain/assistant/agent_runner_service.rb | 85 ++++++------ .../assistant/runner_callbacks_helper.rb | 53 +++++++ .../captain/assistant/trace_payload_helper.rb | 51 +++++++ .../assistant/agent_runner_service_spec.rb | 131 ++++++++++++++++-- 6 files changed, 264 insertions(+), 62 deletions(-) create mode 100644 enterprise/app/services/captain/assistant/runner_callbacks_helper.rb create mode 100644 enterprise/app/services/captain/assistant/trace_payload_helper.rb diff --git a/Gemfile b/Gemfile index e6c5a5250..0636b7b2b 100644 --- a/Gemfile +++ b/Gemfile @@ -191,7 +191,7 @@ gem 'reverse_markdown' gem 'iso-639' gem 'ruby-openai' -gem 'ai-agents' +gem 'ai-agents', '>= 0.9.1' # TODO: Move this gem as a dependency of ai-agents gem 'ruby_llm', '>= 1.8.2' diff --git a/Gemfile.lock b/Gemfile.lock index aad0438cf..7a7316e3c 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -126,7 +126,7 @@ GEM jbuilder (~> 2) rails (>= 4.2, < 7.2) selectize-rails (~> 0.6) - ai-agents (0.9.0) + ai-agents (0.9.1) ruby_llm (~> 1.9.1) annotaterb (4.20.0) activerecord (>= 6.0.0) @@ -1024,7 +1024,7 @@ DEPENDENCIES administrate (>= 0.20.1) administrate-field-active_storage (>= 1.0.3) administrate-field-belongs_to_search (>= 0.9.0) - ai-agents + ai-agents (>= 0.9.1) annotaterb attr_extras audited (~> 5.4, >= 5.4.1) diff --git a/enterprise/app/services/captain/assistant/agent_runner_service.rb b/enterprise/app/services/captain/assistant/agent_runner_service.rb index 9aeee605f..72c44024f 100644 --- a/enterprise/app/services/captain/assistant/agent_runner_service.rb +++ b/enterprise/app/services/captain/assistant/agent_runner_service.rb @@ -3,6 +3,8 @@ require 'agents/instrumentation' class Captain::Assistant::AgentRunnerService include Integrations::LlmInstrumentationConstants + include Captain::Assistant::RunnerCallbacksHelper + include Captain::Assistant::TracePayloadHelper CONVERSATION_STATE_ATTRIBUTES = %i[ id display_id inbox_id contact_id status priority @@ -21,13 +23,7 @@ class Captain::Assistant::AgentRunnerService end def generate_response(message_history: []) - agents = build_and_wire_agents - context = build_context(message_history) - message_to_process = extract_last_user_message(message_history) - runner = Agents::Runner.with_agents(*agents) - runner = add_usage_metadata_callback(runner) - runner = add_callbacks_to_runner(runner) if @callbacks.any? - install_instrumentation(runner) + message_to_process, context = run_payload(message_history) result = runner.run(message_to_process, context: context, max_turns: 100) process_agent_result(result) @@ -45,7 +41,10 @@ class Captain::Assistant::AgentRunnerService def build_context(message_history) conversation_history = message_history.map do |msg| - content = extract_text_from_content(msg[:content]) + content = msg[:content] + # Preserve multimodal arrays (with image_url entries) as-is for the runner to restore with attachments. + # Only extract text from non-array formats (hashes from agent structured output, plain strings). + content = extract_text_from_content(content) unless content.is_a?(Array) { role: msg[:role].to_sym, @@ -63,8 +62,22 @@ class Captain::Assistant::AgentRunnerService def extract_last_user_message(message_history) last_user_msg = message_history.reverse.find { |msg| msg[:role] == 'user' } + return '' if last_user_msg.blank? - extract_text_from_content(last_user_msg[:content]) + content = last_user_msg[:content] + return extract_text_from_content(content) unless content.is_a?(Array) + + text, attachments = Captain::OpenAiMessageBuilderService.extract_text_and_attachments(content) + return text if attachments.blank? + + RubyLLM::Content.new(text, attachments) + end + + def message_history_without_last_user_message(message_history) + last_user_index = message_history.rindex { |msg| msg[:role] == 'user' } + return message_history if last_user_index.nil? + + message_history.reject.with_index { |_msg, index| index == last_user_index } end def extract_text_from_content(content) @@ -143,28 +156,25 @@ class Captain::Assistant::AgentRunnerService }, attribute_provider: ->(context_wrapper) { dynamic_trace_attributes(context_wrapper) } ) + register_trace_input_callback(runner) end def dynamic_trace_attributes(context_wrapper) state = context_wrapper&.context&.dig(:state) || {} conversation = state[:conversation] || {} + trace_input = context_wrapper&.context&.dig(:captain_v2_trace_input) + { ATTR_LANGFUSE_USER_ID => state[:account_id], format(ATTR_LANGFUSE_METADATA, 'assistant_id') => state[:assistant_id], format(ATTR_LANGFUSE_METADATA, 'conversation_id') => conversation[:id], format(ATTR_LANGFUSE_METADATA, 'conversation_display_id') => conversation[:display_id], - format(ATTR_LANGFUSE_METADATA, 'channel_type') => state[:channel_type] + format(ATTR_LANGFUSE_METADATA, 'channel_type') => state[:channel_type], + ATTR_LANGFUSE_TRACE_INPUT => trace_input, + ATTR_LANGFUSE_OBSERVATION_INPUT => trace_input }.compact.transform_values(&:to_s) end - def add_callbacks_to_runner(runner) - runner = add_agent_thinking_callback(runner) if @callbacks[:on_agent_thinking] - runner = add_tool_start_callback(runner) if @callbacks[:on_tool_start] - runner = add_tool_complete_callback(runner) if @callbacks[:on_tool_complete] - runner = add_agent_handoff_callback(runner) if @callbacks[:on_agent_handoff] - runner - end - def add_usage_metadata_callback(runner) return runner unless ChatwootApp.otel_enabled? @@ -195,35 +205,20 @@ class Captain::Assistant::AgentRunnerService root_span.set_attribute(format(ATTR_LANGFUSE_METADATA, 'credit_used'), credit_used.to_s) end - def add_agent_thinking_callback(runner) - runner.on_agent_thinking do |*args| - @callbacks[:on_agent_thinking].call(*args) - rescue StandardError => e - Rails.logger.warn "[Captain] Callback error for agent_thinking: #{e.message}" + def runner + @runner ||= begin + configured_runner = Agents::Runner.with_agents(*build_and_wire_agents) + configured_runner = add_usage_metadata_callback(configured_runner) + configured_runner = add_callbacks_to_runner(configured_runner) if @callbacks.any? + install_instrumentation(configured_runner) + configured_runner end end - def add_tool_start_callback(runner) - runner.on_tool_start do |*args| - @callbacks[:on_tool_start].call(*args) - rescue StandardError => e - Rails.logger.warn "[Captain] Callback error for tool_start: #{e.message}" - end - end - - def add_tool_complete_callback(runner) - runner.on_tool_complete do |*args| - @callbacks[:on_tool_complete].call(*args) - rescue StandardError => e - Rails.logger.warn "[Captain] Callback error for tool_complete: #{e.message}" - end - end - - def add_agent_handoff_callback(runner) - runner.on_agent_handoff do |*args| - @callbacks[:on_agent_handoff].call(*args) - rescue StandardError => e - Rails.logger.warn "[Captain] Callback error for agent_handoff: #{e.message}" - end + def run_payload(message_history) + message_to_process = extract_last_user_message(message_history) + context = build_context(message_history_without_last_user_message(message_history)) + enrich_context_with_trace_payload!(context, message_history, message_to_process) + [message_to_process, context] end end diff --git a/enterprise/app/services/captain/assistant/runner_callbacks_helper.rb b/enterprise/app/services/captain/assistant/runner_callbacks_helper.rb new file mode 100644 index 000000000..58fa0ca33 --- /dev/null +++ b/enterprise/app/services/captain/assistant/runner_callbacks_helper.rb @@ -0,0 +1,53 @@ +module Captain::Assistant::RunnerCallbacksHelper + private + + def add_callbacks_to_runner(runner) + runner = add_agent_thinking_callback(runner) if @callbacks[:on_agent_thinking] + runner = add_tool_start_callback(runner) if @callbacks[:on_tool_start] + runner = add_tool_complete_callback(runner) if @callbacks[:on_tool_complete] + runner = add_agent_handoff_callback(runner) if @callbacks[:on_agent_handoff] + runner + end + + def register_trace_input_callback(runner) + runner.on_agent_thinking do |_agent_name, _input, context_wrapper| + tracing = context_wrapper&.context&.dig(:__otel_tracing) + next unless tracing + + trace_input = context_wrapper.context[:captain_v2_trace_current_input] + tracing[:pending_llm_input] = trace_input if trace_input.present? + end + end + + def add_agent_thinking_callback(runner) + runner.on_agent_thinking do |*args| + @callbacks[:on_agent_thinking].call(*args) + rescue StandardError => e + Rails.logger.warn "[Captain] Callback error for agent_thinking: #{e.message}" + end + end + + def add_tool_start_callback(runner) + runner.on_tool_start do |*args| + @callbacks[:on_tool_start].call(*args) + rescue StandardError => e + Rails.logger.warn "[Captain] Callback error for tool_start: #{e.message}" + end + end + + def add_tool_complete_callback(runner) + runner.on_tool_complete do |*args| + @callbacks[:on_tool_complete].call(*args) + rescue StandardError => e + Rails.logger.warn "[Captain] Callback error for tool_complete: #{e.message}" + end + end + + def add_agent_handoff_callback(runner) + runner.on_agent_handoff do |*args| + @callbacks[:on_agent_handoff].call(*args) + rescue StandardError => e + Rails.logger.warn "[Captain] Callback error for agent_handoff: #{e.message}" + end + end +end diff --git a/enterprise/app/services/captain/assistant/trace_payload_helper.rb b/enterprise/app/services/captain/assistant/trace_payload_helper.rb new file mode 100644 index 000000000..3b6546c12 --- /dev/null +++ b/enterprise/app/services/captain/assistant/trace_payload_helper.rb @@ -0,0 +1,51 @@ +module Captain::Assistant::TracePayloadHelper + private + + def enrich_context_with_trace_payload!(context, message_history, message_to_process) + context[:captain_v2_trace_input] = serialize_trace_messages(message_history) + context[:captain_v2_trace_current_input] = serialize_trace_content(message_to_process) + end + + def serialize_trace_messages(message_history) + message_history.map do |message| + { + role: message[:role].to_s, + content: trace_content_payload(message[:content]) + } + end.to_json + end + + def serialize_trace_content(content) + payload = trace_content_payload(content) + return '' if payload.blank? + + payload.is_a?(String) ? payload : payload.to_json + end + + def trace_content_payload(content) + case content + when RubyLLM::Content + trace_parts_from_ruby_llm_content(content) + when Array, Hash + content + when NilClass + '' + else + content.to_s + end + end + + def trace_parts_from_ruby_llm_content(content) + parts = [] + parts << { type: 'text', text: content.text } if content.text.present? + + content.attachments.each do |attachment| + parts << { type: 'image_url', image_url: { url: attachment.source.to_s } } + end + + return '' if parts.blank? + return parts.first[:text] if parts.one? && parts.first[:type] == 'text' + + parts + end +end diff --git a/spec/enterprise/services/captain/assistant/agent_runner_service_spec.rb b/spec/enterprise/services/captain/assistant/agent_runner_service_spec.rb index c3019b82c..13e4804ce 100644 --- a/spec/enterprise/services/captain/assistant/agent_runner_service_spec.rb +++ b/spec/enterprise/services/captain/assistant/agent_runner_service_spec.rb @@ -74,12 +74,11 @@ RSpec.describe Captain::Assistant::AgentRunnerService do end it 'runs agent with extracted user message and context' do - expected_context = { + expected_context = hash_including( session_id: "#{account.id}_#{conversation.display_id}", conversation_history: [ { role: :user, content: 'Hello there', agent_name: nil }, - { role: :assistant, content: 'Hi! How can I help you?', agent_name: 'Assistant' }, - { role: :user, content: 'I need help with my account', agent_name: nil } + { role: :assistant, content: 'Hi! How can I help you?', agent_name: 'Assistant' } ], state: hash_including( account_id: account.id, @@ -87,7 +86,7 @@ RSpec.describe Captain::Assistant::AgentRunnerService do conversation: hash_including(id: conversation.id), contact: hash_including(id: contact.id) ) - } + ) expect(mock_runner).to receive(:run).with( 'I need help with my account', @@ -98,6 +97,71 @@ RSpec.describe Captain::Assistant::AgentRunnerService do service.generate_response(message_history: message_history) end + context 'when the latest user message is multimodal' do + let(:multimodal_message_history) do + [ + { role: 'assistant', content: 'Please share a screenshot' }, + { + role: 'user', + content: [ + { type: 'text', text: 'What does this error mean?' }, + { type: 'image_url', image_url: { url: 'https://example.com/error.png' } } + ] + } + ] + end + + it 'passes image attachments to the runner input' do + expect(mock_runner).to receive(:run) do |input, context:, max_turns:| + expect(input).to be_a(RubyLLM::Content) + expect(input.text).to eq('What does this error mean?') + expect(input.attachments.first.source.to_s).to eq('https://example.com/error.png') + expect(context[:conversation_history]).to eq([{ role: :assistant, content: 'Please share a screenshot', agent_name: nil }]) + expect(max_turns).to eq(100) + end + + service.generate_response(message_history: multimodal_message_history) + end + + it 'preserves multimodal content in earlier history messages' do + history_with_prior_image = [ + { + role: 'user', + content: [ + { type: 'text', text: 'Here is my error screenshot' }, + { type: 'image_url', image_url: { url: 'https://example.com/error.png' } } + ] + }, + { role: 'assistant', content: 'I see the error. Try restarting.' }, + { role: 'user', content: 'It still does not work' } + ] + + expect(mock_runner).to receive(:run) do |input, context:, max_turns:| + expect(input).to eq('It still does not work') + # The earlier user message with the image should preserve the multimodal array + first_history_msg = context[:conversation_history].first + expect(first_history_msg[:content]).to be_a(Array) + expect(first_history_msg[:content]).to include( + { type: 'text', text: 'Here is my error screenshot' }, + { type: 'image_url', image_url: { url: 'https://example.com/error.png' } } + ) + expect(max_turns).to eq(100) + end + + service.generate_response(message_history: history_with_prior_image) + end + + it 'stores multimodal trace payloads in runner context' do + expect(mock_runner).to receive(:run) do |_input, context:, max_turns:| + expect(context[:captain_v2_trace_input]).to include('image_url') + expect(context[:captain_v2_trace_current_input]).to include('image_url') + expect(max_turns).to eq(100) + end + + service.generate_response(message_history: multimodal_message_history) + end + end + it 'processes and formats agent result' do result = service.generate_response(message_history: message_history) @@ -197,22 +261,21 @@ RSpec.describe Captain::Assistant::AgentRunnerService do end context 'with multimodal content' do - let(:multimodal_message_history) do + let(:multimodal_content) do [ - { - role: 'user', - content: [ - { type: 'text', text: 'Can you help with this image?' }, - { type: 'image_url', image_url: { url: 'https://example.com/image.jpg' } } - ] - } + { type: 'text', text: 'Can you help with this image?' }, + { type: 'image_url', image_url: { url: 'https://example.com/image.jpg' } } ] end - it 'extracts text content from multimodal messages' do + let(:multimodal_message_history) do + [{ role: 'user', content: multimodal_content }] + end + + it 'preserves multimodal arrays in conversation history for image context retention' do context = service.send(:build_context, multimodal_message_history) - expect(context[:conversation_history].first[:content]).to eq('Can you help with this image?') + expect(context[:conversation_history].first[:content]).to eq(multimodal_content) end end end @@ -225,6 +288,24 @@ RSpec.describe Captain::Assistant::AgentRunnerService do expect(result).to eq('I need help with my account') end + + it 'returns multimodal content with image attachments for the runner input' do + multimodal_message_history = [ + { + role: 'user', + content: [ + { type: 'text', text: 'Can you check this screenshot?' }, + { type: 'image_url', image_url: { url: 'https://example.com/image.jpg' } } + ] + } + ] + + result = service.send(:extract_last_user_message, multimodal_message_history) + + expect(result).to be_a(RubyLLM::Content) + expect(result.text).to eq('Can you check this screenshot?') + expect(result.attachments.first.source.to_s).to eq('https://example.com/image.jpg') + end end describe '#extract_text_from_content' do @@ -256,6 +337,28 @@ RSpec.describe Captain::Assistant::AgentRunnerService do end end + describe '#dynamic_trace_attributes' do + subject(:service) { described_class.new(assistant: assistant, conversation: conversation) } + + it 'adds serialized trace input attributes when present in context' do + context = { + state: { + account_id: account.id, + assistant_id: assistant.id, + conversation: { id: conversation.id, display_id: conversation.display_id } + }, + captain_v2_trace_input: '[{"role":"user","content":[{"type":"image_url","image_url":{"url":"https://example.com/image.jpg"}}]}]' + } + context_wrapper = Struct.new(:context).new(context) + + attributes = service.send(:dynamic_trace_attributes, context_wrapper) + + expect(attributes['langfuse.trace.input']).to include('image_url') + expect(attributes['langfuse.observation.input']).to include('image_url') + expect(attributes['langfuse.user.id']).to eq(account.id.to_s) + end + end + describe '#build_state' do subject(:service) { described_class.new(assistant: assistant, conversation: conversation) }