Files
leadchat/enterprise/app/services/captain/assistant/agent_runner_service.rb
Aakash Bakhle 3874383698 feat: insrument captain v2 (#13439)
# Pull Request Template

## Description

Instruments captain v2

## Type of change

- [x] New feature (non-breaking change which adds functionality)

## How Has This Been Tested?

Please describe the tests that you ran to verify your changes. Provide
instructions so we can reproduce. Please also list any relevant details
for your test configuration.

Local testing:
<img width="864" height="510" alt="image"
src="https://github.com/user-attachments/assets/855ebce5-e8b8-4d22-b0bb-0d413769a6ab"
/>



## Checklist:

- [x] My code follows the style guidelines of this project
- [x] I have performed a self-review of my code
- [x] I have commented on my code, particularly in hard-to-understand
areas
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] New and existing unit tests pass locally with my changes
- [x] Any dependent changes have been merged and published in downstream
modules

---------

Co-authored-by: Shivam Mishra <scm.mymail@gmail.com>
2026-02-17 13:28:26 +05:30

228 lines
7.2 KiB
Ruby

require 'agents'
require 'agents/instrumentation'
class Captain::Assistant::AgentRunnerService
include Integrations::LlmInstrumentationConstants
CONVERSATION_STATE_ATTRIBUTES = %i[
id display_id inbox_id contact_id status priority
label_list custom_attributes additional_attributes
].freeze
CONTACT_STATE_ATTRIBUTES = %i[
id name email phone_number identifier contact_type
custom_attributes additional_attributes
].freeze
def initialize(assistant:, conversation: nil, callbacks: {})
@assistant = assistant
@conversation = conversation
@callbacks = callbacks
end
def generate_response(message_history: [])
agents = build_and_wire_agents
context = build_context(message_history)
message_to_process = extract_last_user_message(message_history)
runner = Agents::Runner.with_agents(*agents)
runner = add_usage_metadata_callback(runner)
runner = add_callbacks_to_runner(runner) if @callbacks.any?
install_instrumentation(runner)
result = runner.run(message_to_process, context: context, max_turns: 100)
process_agent_result(result)
rescue StandardError => e
# when running the agent runner service in a rake task, the conversation might not have an account associated
# for regular production usage, it will run just fine
ChatwootExceptionTracker.new(e, account: @conversation&.account).capture_exception
Rails.logger.error "[Captain V2] AgentRunnerService error: #{e.message}"
Rails.logger.error e.backtrace.join("\n")
error_response(e.message)
end
private
def build_context(message_history)
conversation_history = message_history.map do |msg|
content = extract_text_from_content(msg[:content])
{
role: msg[:role].to_sym,
content: content,
agent_name: msg[:agent_name]
}
end
{
session_id: "#{@assistant.account_id}_#{@conversation&.display_id}",
conversation_history: conversation_history,
state: build_state
}
end
def extract_last_user_message(message_history)
last_user_msg = message_history.reverse.find { |msg| msg[:role] == 'user' }
extract_text_from_content(last_user_msg[:content])
end
def extract_text_from_content(content)
# Handle structured output from agents
return content[:response] || content['response'] || content.to_s if content.is_a?(Hash)
return content unless content.is_a?(Array)
text_parts = content.select { |part| part[:type] == 'text' }.pluck(:text)
text_parts.join(' ')
end
# Response formatting methods
def process_agent_result(result)
Rails.logger.info "[Captain V2] Agent result: #{result.inspect}"
response = format_response(result.output)
# Extract agent name from context
response['agent_name'] = result.context&.dig(:current_agent)
response
end
def format_response(output)
return output.with_indifferent_access if output.is_a?(Hash)
# Fallback for backwards compatibility
{
'response' => output.to_s,
'reasoning' => 'Processed by agent'
}
end
def error_response(error_message)
{
'response' => 'conversation_handoff',
'reasoning' => "Error occurred: #{error_message}"
}
end
def build_state
state = {
account_id: @assistant.account_id,
assistant_id: @assistant.id,
assistant_config: @assistant.config
}
if @conversation
state[:conversation] = @conversation.attributes.symbolize_keys.slice(*CONVERSATION_STATE_ATTRIBUTES)
state[:contact] = @conversation.contact.attributes.symbolize_keys.slice(*CONTACT_STATE_ATTRIBUTES) if @conversation.contact
end
state
end
def build_and_wire_agents
assistant_agent = @assistant.agent
scenario_agents = @assistant.scenarios.enabled.map(&:agent)
assistant_agent.register_handoffs(*scenario_agents) if scenario_agents.any?
scenario_agents.each { |scenario_agent| scenario_agent.register_handoffs(assistant_agent) }
[assistant_agent] + scenario_agents
end
def install_instrumentation(runner)
return unless ChatwootApp.otel_enabled?
Agents::Instrumentation.install(
runner,
tracer: OpentelemetryConfig.tracer,
trace_name: 'llm.captain_v2',
span_attributes: {
ATTR_LANGFUSE_TAGS => ['captain_v2'].to_json
},
attribute_provider: ->(context_wrapper) { dynamic_trace_attributes(context_wrapper) }
)
end
def dynamic_trace_attributes(context_wrapper)
state = context_wrapper&.context&.dig(:state) || {}
conversation = state[:conversation] || {}
{
ATTR_LANGFUSE_USER_ID => state[:account_id],
format(ATTR_LANGFUSE_METADATA, 'assistant_id') => state[:assistant_id],
format(ATTR_LANGFUSE_METADATA, 'conversation_id') => conversation[:id],
format(ATTR_LANGFUSE_METADATA, 'conversation_display_id') => conversation[:display_id]
}.compact.transform_values(&:to_s)
end
def add_callbacks_to_runner(runner)
runner = add_agent_thinking_callback(runner) if @callbacks[:on_agent_thinking]
runner = add_tool_start_callback(runner) if @callbacks[:on_tool_start]
runner = add_tool_complete_callback(runner) if @callbacks[:on_tool_complete]
runner = add_agent_handoff_callback(runner) if @callbacks[:on_agent_handoff]
runner
end
def add_usage_metadata_callback(runner)
return runner unless ChatwootApp.otel_enabled?
handoff_tool_name = Captain::Tools::HandoffTool.new(@assistant).name
runner.on_tool_complete do |tool_name, _tool_result, context_wrapper|
track_handoff_usage(tool_name, handoff_tool_name, context_wrapper)
end
runner.on_run_complete do |_agent_name, _result, context_wrapper|
write_credits_used_metadata(context_wrapper)
end
runner
end
def track_handoff_usage(tool_name, handoff_tool_name, context_wrapper)
return unless context_wrapper&.context
return unless tool_name.to_s == handoff_tool_name
context_wrapper.context[:captain_v2_handoff_tool_called] = true
end
def write_credits_used_metadata(context_wrapper)
root_span = context_wrapper&.context&.dig(:__otel_tracing, :root_span)
return unless root_span
credits_used = !context_wrapper.context[:captain_v2_handoff_tool_called]
root_span.set_attribute(format(ATTR_LANGFUSE_METADATA, 'credits_used'), credits_used)
end
def add_agent_thinking_callback(runner)
runner.on_agent_thinking do |*args|
@callbacks[:on_agent_thinking].call(*args)
rescue StandardError => e
Rails.logger.warn "[Captain] Callback error for agent_thinking: #{e.message}"
end
end
def add_tool_start_callback(runner)
runner.on_tool_start do |*args|
@callbacks[:on_tool_start].call(*args)
rescue StandardError => e
Rails.logger.warn "[Captain] Callback error for tool_start: #{e.message}"
end
end
def add_tool_complete_callback(runner)
runner.on_tool_complete do |*args|
@callbacks[:on_tool_complete].call(*args)
rescue StandardError => e
Rails.logger.warn "[Captain] Callback error for tool_complete: #{e.message}"
end
end
def add_agent_handoff_callback(runner)
runner.on_agent_handoff do |*args|
@callbacks[:on_agent_handoff].call(*args)
rescue StandardError => e
Rails.logger.warn "[Captain] Callback error for agent_handoff: #{e.message}"
end
end
end