feat: support multimodal user messages in captain v2 (#13581)
Extract and pass image attachments from the latest user message to the runner, excluding the last user message from the context for processing. Fixes #13588 # Pull Request Template ## Description Adds image support to captain v2 ## Type of change Please delete options that are not relevant. - [x] Bug fix (non-breaking change which fixes an issue) ## How Has This Been Tested? Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce. Please also list any relevant details for your test configuration. specs and local testing <img width="754" height="1008" alt="image" src="https://github.com/user-attachments/assets/914cbc2c-9d30-42d0-87d4-9e5430845c87" /> langfuse also shows media correctly with the instrumentation code: <img width="1800" height="1260" alt="image" src="https://github.com/user-attachments/assets/ce0f5fa6-b1a5-42ec-a213-9a82b1751037" /> ## Checklist: - [x] My code follows the style guidelines of this project - [x] I have performed a self-review of my code - [x] I have commented on my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [x] My changes generate no new warnings - [x] I have added tests that prove my fix is effective or that my feature works - [x] New and existing unit tests pass locally with my changes - [x] Any dependent changes have been merged and published in downstream modules --------- Co-authored-by: Shivam Mishra <scm.mymail@gmail.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -3,6 +3,8 @@ require 'agents/instrumentation'
|
||||
|
||||
class Captain::Assistant::AgentRunnerService
|
||||
include Integrations::LlmInstrumentationConstants
|
||||
include Captain::Assistant::RunnerCallbacksHelper
|
||||
include Captain::Assistant::TracePayloadHelper
|
||||
|
||||
CONVERSATION_STATE_ATTRIBUTES = %i[
|
||||
id display_id inbox_id contact_id status priority
|
||||
@@ -21,13 +23,7 @@ class Captain::Assistant::AgentRunnerService
|
||||
end
|
||||
|
||||
def generate_response(message_history: [])
|
||||
agents = build_and_wire_agents
|
||||
context = build_context(message_history)
|
||||
message_to_process = extract_last_user_message(message_history)
|
||||
runner = Agents::Runner.with_agents(*agents)
|
||||
runner = add_usage_metadata_callback(runner)
|
||||
runner = add_callbacks_to_runner(runner) if @callbacks.any?
|
||||
install_instrumentation(runner)
|
||||
message_to_process, context = run_payload(message_history)
|
||||
result = runner.run(message_to_process, context: context, max_turns: 100)
|
||||
|
||||
process_agent_result(result)
|
||||
@@ -45,7 +41,10 @@ class Captain::Assistant::AgentRunnerService
|
||||
|
||||
def build_context(message_history)
|
||||
conversation_history = message_history.map do |msg|
|
||||
content = extract_text_from_content(msg[:content])
|
||||
content = msg[:content]
|
||||
# Preserve multimodal arrays (with image_url entries) as-is for the runner to restore with attachments.
|
||||
# Only extract text from non-array formats (hashes from agent structured output, plain strings).
|
||||
content = extract_text_from_content(content) unless content.is_a?(Array)
|
||||
|
||||
{
|
||||
role: msg[:role].to_sym,
|
||||
@@ -63,8 +62,22 @@ class Captain::Assistant::AgentRunnerService
|
||||
|
||||
def extract_last_user_message(message_history)
|
||||
last_user_msg = message_history.reverse.find { |msg| msg[:role] == 'user' }
|
||||
return '' if last_user_msg.blank?
|
||||
|
||||
extract_text_from_content(last_user_msg[:content])
|
||||
content = last_user_msg[:content]
|
||||
return extract_text_from_content(content) unless content.is_a?(Array)
|
||||
|
||||
text, attachments = Captain::OpenAiMessageBuilderService.extract_text_and_attachments(content)
|
||||
return text if attachments.blank?
|
||||
|
||||
RubyLLM::Content.new(text, attachments)
|
||||
end
|
||||
|
||||
def message_history_without_last_user_message(message_history)
|
||||
last_user_index = message_history.rindex { |msg| msg[:role] == 'user' }
|
||||
return message_history if last_user_index.nil?
|
||||
|
||||
message_history.reject.with_index { |_msg, index| index == last_user_index }
|
||||
end
|
||||
|
||||
def extract_text_from_content(content)
|
||||
@@ -143,28 +156,25 @@ class Captain::Assistant::AgentRunnerService
|
||||
},
|
||||
attribute_provider: ->(context_wrapper) { dynamic_trace_attributes(context_wrapper) }
|
||||
)
|
||||
register_trace_input_callback(runner)
|
||||
end
|
||||
|
||||
def dynamic_trace_attributes(context_wrapper)
|
||||
state = context_wrapper&.context&.dig(:state) || {}
|
||||
conversation = state[:conversation] || {}
|
||||
trace_input = context_wrapper&.context&.dig(:captain_v2_trace_input)
|
||||
|
||||
{
|
||||
ATTR_LANGFUSE_USER_ID => state[:account_id],
|
||||
format(ATTR_LANGFUSE_METADATA, 'assistant_id') => state[:assistant_id],
|
||||
format(ATTR_LANGFUSE_METADATA, 'conversation_id') => conversation[:id],
|
||||
format(ATTR_LANGFUSE_METADATA, 'conversation_display_id') => conversation[:display_id],
|
||||
format(ATTR_LANGFUSE_METADATA, 'channel_type') => state[:channel_type]
|
||||
format(ATTR_LANGFUSE_METADATA, 'channel_type') => state[:channel_type],
|
||||
ATTR_LANGFUSE_TRACE_INPUT => trace_input,
|
||||
ATTR_LANGFUSE_OBSERVATION_INPUT => trace_input
|
||||
}.compact.transform_values(&:to_s)
|
||||
end
|
||||
|
||||
def add_callbacks_to_runner(runner)
|
||||
runner = add_agent_thinking_callback(runner) if @callbacks[:on_agent_thinking]
|
||||
runner = add_tool_start_callback(runner) if @callbacks[:on_tool_start]
|
||||
runner = add_tool_complete_callback(runner) if @callbacks[:on_tool_complete]
|
||||
runner = add_agent_handoff_callback(runner) if @callbacks[:on_agent_handoff]
|
||||
runner
|
||||
end
|
||||
|
||||
def add_usage_metadata_callback(runner)
|
||||
return runner unless ChatwootApp.otel_enabled?
|
||||
|
||||
@@ -195,35 +205,20 @@ class Captain::Assistant::AgentRunnerService
|
||||
root_span.set_attribute(format(ATTR_LANGFUSE_METADATA, 'credit_used'), credit_used.to_s)
|
||||
end
|
||||
|
||||
def add_agent_thinking_callback(runner)
|
||||
runner.on_agent_thinking do |*args|
|
||||
@callbacks[:on_agent_thinking].call(*args)
|
||||
rescue StandardError => e
|
||||
Rails.logger.warn "[Captain] Callback error for agent_thinking: #{e.message}"
|
||||
def runner
|
||||
@runner ||= begin
|
||||
configured_runner = Agents::Runner.with_agents(*build_and_wire_agents)
|
||||
configured_runner = add_usage_metadata_callback(configured_runner)
|
||||
configured_runner = add_callbacks_to_runner(configured_runner) if @callbacks.any?
|
||||
install_instrumentation(configured_runner)
|
||||
configured_runner
|
||||
end
|
||||
end
|
||||
|
||||
def add_tool_start_callback(runner)
|
||||
runner.on_tool_start do |*args|
|
||||
@callbacks[:on_tool_start].call(*args)
|
||||
rescue StandardError => e
|
||||
Rails.logger.warn "[Captain] Callback error for tool_start: #{e.message}"
|
||||
end
|
||||
end
|
||||
|
||||
def add_tool_complete_callback(runner)
|
||||
runner.on_tool_complete do |*args|
|
||||
@callbacks[:on_tool_complete].call(*args)
|
||||
rescue StandardError => e
|
||||
Rails.logger.warn "[Captain] Callback error for tool_complete: #{e.message}"
|
||||
end
|
||||
end
|
||||
|
||||
def add_agent_handoff_callback(runner)
|
||||
runner.on_agent_handoff do |*args|
|
||||
@callbacks[:on_agent_handoff].call(*args)
|
||||
rescue StandardError => e
|
||||
Rails.logger.warn "[Captain] Callback error for agent_handoff: #{e.message}"
|
||||
end
|
||||
def run_payload(message_history)
|
||||
message_to_process = extract_last_user_message(message_history)
|
||||
context = build_context(message_history_without_last_user_message(message_history))
|
||||
enrich_context_with_trace_payload!(context, message_history, message_to_process)
|
||||
[message_to_process, context]
|
||||
end
|
||||
end
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
module Captain::Assistant::RunnerCallbacksHelper
|
||||
private
|
||||
|
||||
def add_callbacks_to_runner(runner)
|
||||
runner = add_agent_thinking_callback(runner) if @callbacks[:on_agent_thinking]
|
||||
runner = add_tool_start_callback(runner) if @callbacks[:on_tool_start]
|
||||
runner = add_tool_complete_callback(runner) if @callbacks[:on_tool_complete]
|
||||
runner = add_agent_handoff_callback(runner) if @callbacks[:on_agent_handoff]
|
||||
runner
|
||||
end
|
||||
|
||||
def register_trace_input_callback(runner)
|
||||
runner.on_agent_thinking do |_agent_name, _input, context_wrapper|
|
||||
tracing = context_wrapper&.context&.dig(:__otel_tracing)
|
||||
next unless tracing
|
||||
|
||||
trace_input = context_wrapper.context[:captain_v2_trace_current_input]
|
||||
tracing[:pending_llm_input] = trace_input if trace_input.present?
|
||||
end
|
||||
end
|
||||
|
||||
def add_agent_thinking_callback(runner)
|
||||
runner.on_agent_thinking do |*args|
|
||||
@callbacks[:on_agent_thinking].call(*args)
|
||||
rescue StandardError => e
|
||||
Rails.logger.warn "[Captain] Callback error for agent_thinking: #{e.message}"
|
||||
end
|
||||
end
|
||||
|
||||
def add_tool_start_callback(runner)
|
||||
runner.on_tool_start do |*args|
|
||||
@callbacks[:on_tool_start].call(*args)
|
||||
rescue StandardError => e
|
||||
Rails.logger.warn "[Captain] Callback error for tool_start: #{e.message}"
|
||||
end
|
||||
end
|
||||
|
||||
def add_tool_complete_callback(runner)
|
||||
runner.on_tool_complete do |*args|
|
||||
@callbacks[:on_tool_complete].call(*args)
|
||||
rescue StandardError => e
|
||||
Rails.logger.warn "[Captain] Callback error for tool_complete: #{e.message}"
|
||||
end
|
||||
end
|
||||
|
||||
def add_agent_handoff_callback(runner)
|
||||
runner.on_agent_handoff do |*args|
|
||||
@callbacks[:on_agent_handoff].call(*args)
|
||||
rescue StandardError => e
|
||||
Rails.logger.warn "[Captain] Callback error for agent_handoff: #{e.message}"
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -0,0 +1,51 @@
|
||||
module Captain::Assistant::TracePayloadHelper
|
||||
private
|
||||
|
||||
def enrich_context_with_trace_payload!(context, message_history, message_to_process)
|
||||
context[:captain_v2_trace_input] = serialize_trace_messages(message_history)
|
||||
context[:captain_v2_trace_current_input] = serialize_trace_content(message_to_process)
|
||||
end
|
||||
|
||||
def serialize_trace_messages(message_history)
|
||||
message_history.map do |message|
|
||||
{
|
||||
role: message[:role].to_s,
|
||||
content: trace_content_payload(message[:content])
|
||||
}
|
||||
end.to_json
|
||||
end
|
||||
|
||||
def serialize_trace_content(content)
|
||||
payload = trace_content_payload(content)
|
||||
return '' if payload.blank?
|
||||
|
||||
payload.is_a?(String) ? payload : payload.to_json
|
||||
end
|
||||
|
||||
def trace_content_payload(content)
|
||||
case content
|
||||
when RubyLLM::Content
|
||||
trace_parts_from_ruby_llm_content(content)
|
||||
when Array, Hash
|
||||
content
|
||||
when NilClass
|
||||
''
|
||||
else
|
||||
content.to_s
|
||||
end
|
||||
end
|
||||
|
||||
def trace_parts_from_ruby_llm_content(content)
|
||||
parts = []
|
||||
parts << { type: 'text', text: content.text } if content.text.present?
|
||||
|
||||
content.attachments.each do |attachment|
|
||||
parts << { type: 'image_url', image_url: { url: attachment.source.to_s } }
|
||||
end
|
||||
|
||||
return '' if parts.blank?
|
||||
return parts.first[:text] if parts.one? && parts.first[:type] == 'text'
|
||||
|
||||
parts
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user