leadchat/app/services/reporting_events/event_metric_registry.rb
Shivam Mishra 9967101b48 feat(rollup): add models and write path [1/3] (#13796)
## PR#1: Reporting events rollup — model and write path

Reporting queries currently hit the `reporting_events` table directly.
This works, but the table grows linearly with event volume, and
aggregation queries (counts, averages over date ranges) get
progressively slower as accounts age.

This PR introduces a pre-aggregated `reporting_events_rollups` table
that stores daily per-metric, per-dimension (account/agent/inbox)
totals. The write path is intentionally decoupled from the read path —
rollup rows are written inline from the event listener via upsert, and a
backfill service exists to rebuild historical data from raw events.
Nothing reads from this table yet.
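A minimal in-memory sketch of the additive upsert described above. The `RollupStore` class and its key fields (`account_id`, `date`, `dimension_type`, `dimension_id`, `metric`) are assumptions made for illustration. The real columns and the unique index come from the migration in this PR, and the real write goes through the database rather than a Ruby hash.

```ruby
# Hypothetical stand-in for the rollups table. The composite key mimics a
# unique index; the field names are assumed, not taken from the migration.
class RollupStore
  Key = Struct.new(:account_id, :date, :dimension_type, :dimension_id, :metric)

  def initialize
    @rows = Hash.new do |hash, key|
      hash[key] = { count: 0, sum_value: 0.0, sum_value_business_hours: 0.0 }
    end
  end

  # Insert a row for the key, or add to the existing totals if one exists.
  def upsert(key, count:, sum_value:, sum_value_business_hours:)
    row = @rows[key]
    row[:count] += count
    row[:sum_value] += sum_value
    row[:sum_value_business_hours] += sum_value_business_hours
    row
  end

  # Look up a row without creating it.
  def fetch(key)
    @rows.fetch(key, nil)
  end

  def size
    @rows.size
  end
end

store = RollupStore.new
key = RollupStore::Key.new(1, '2026-03-19', 'agent', 42, :resolution_time)

# Two events on the same day for the same agent land in the same row.
store.upsert(key, count: 1, sum_value: 120.0, sum_value_business_hours: 60.0)
store.upsert(key, count: 1, sum_value: 30.0, sum_value_business_hours: 30.0)
```

Because the stored values are a count and sums, an average over any date range can later be derived as total sum divided by total count without touching the raw events.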

The write path activates when an account has a `reporting_timezone` set
(a new account setting). The `reporting_events_rollup` feature flag
controls only the future read path, not writes, so rollup data
accumulates silently once a timezone is configured. A `MetricRegistry`
maps raw event names to rollup column semantics in one place, keeping
the write and (future) read paths aligned.
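The gating described above can be sketched in plain Ruby. Everything here is hypothetical: the `Account` struct, the method names, and the `local_day` helper are illustrative and not part of this PR. The sketch also assumes the timezone decides which calendar day an event is bucketed into, and it uses a fixed UTC offset in place of real IANA zone handling to stay dependency-free.

```ruby
# Hypothetical stand-in for an account; only the two attributes mentioned
# in the description are modelled.
Account = Struct.new(:reporting_timezone, :rollup_flag_enabled, keyword_init: true)

# Writes depend only on a timezone being present, not on the feature flag.
def write_rollups?(account)
  !account.reporting_timezone.to_s.strip.empty?
end

# Assumption: a future read path would consult the feature flag.
def read_from_rollups?(account)
  account.rollup_flag_enabled == true
end

# Assumption: events are bucketed by the account's local calendar day.
# A fixed offset such as '+05:30' stands in for an IANA zone like Asia/Kolkata.
def local_day(utc_time, utc_offset)
  utc_time.getlocal(utc_offset).strftime('%Y-%m-%d')
end

configured = Account.new(reporting_timezone: 'Asia/Kolkata', rollup_flag_enabled: false)
unconfigured = Account.new(reporting_timezone: nil, rollup_flag_enabled: false)

write_rollups?(configured)     # true: data accumulates even with the flag off
write_rollups?(unconfigured)   # false: no timezone, no rollup rows
read_from_rollups?(configured) # false: flag is disabled by default
local_day(Time.utc(2026, 3, 18, 20, 0, 0), '+05:30') # "2026-03-19"
```

An event at 20:00 UTC on 18 March falls on 19 March in a +05:30 zone, which is why a per-account timezone matters for daily totals.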

### What changed

- Migration for `reporting_events_rollups` with a unique composite index
for upsert
- `ReportingEventsRollup` model
- `reporting_timezone` account setting with IANA timezone validation
- `MetricRegistry` — single source of truth for event-to-metric mappings
- `RollupService` — real-time upsert from event listener
- `BackfillService` — rebuilds rollups for a given account + date from
raw events
- Rake tasks for interactive backfill and timezone setup
- `reporting_events_rollup` feature flag (disabled by default)
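As a rough illustration of what rebuilding rollups from raw events involves, the sketch below aggregates one day's events in memory. The `RawEvent` struct, its field names, and `rebuild_rollups` are assumptions made for this example. The actual `BackfillService` works against the database and may group and write differently.

```ruby
# Hypothetical raw event; field names are assumptions for this sketch.
RawEvent = Struct.new(:name, :inbox_id, :value, :value_in_business_hours, keyword_init: true)

# Rebuild per-inbox totals from one day's worth of raw events.
# Returns { [inbox_id, event_name] => { count:, sum_value:, sum_value_business_hours: } }.
def rebuild_rollups(events)
  events.group_by { |event| [event.inbox_id, event.name] }.transform_values do |group|
    {
      count: group.size,
      sum_value: group.sum { |event| event.value.to_f },
      sum_value_business_hours: group.sum { |event| event.value_in_business_hours.to_f }
    }
  end
end

events = [
  RawEvent.new(name: 'first_response', inbox_id: 7, value: 30, value_in_business_hours: 20),
  RawEvent.new(name: 'first_response', inbox_id: 7, value: 90, value_in_business_hours: 40),
  RawEvent.new(name: 'conversation_resolved', inbox_id: 7, value: 600, value_in_business_hours: 300)
]

rollups = rebuild_rollups(events)
rollups[[7, 'first_response']]
# => { count: 2, sum_value: 120.0, sum_value_business_hours: 60.0 }
```

Since the totals are recomputed from the raw events rather than incremented, running a rebuild like this twice for the same day yields the same result.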

### How to test

1. Set a `reporting_timezone` on an account
(`Account.first.update!(reporting_timezone: 'Asia/Kolkata')`)
2. Resolve a conversation or trigger a first response
3. Check `ReportingEventsRollup.where(account_id: ...)` — rows should
appear
4. Run backfill: `bundle exec rake reporting_events_rollup:backfill` and
verify historical data populates

---------

Co-authored-by: Muhsin Keloth <muhsinkeramam@gmail.com>
2026-03-19 13:12:36 +05:30


module ReportingEvents::EventMetricRegistry
  # Describes one rollup metric emitted by a raw reporting event.
  # rollup_metric: metric name stored in reporting_events_rollups.
  # payload_kind: whether the emitted row carries only a count or a duration payload.
  Metric = Data.define(:rollup_metric, :payload_kind)

  EVENTS = {
    conversation_resolved: [
      Metric.new(rollup_metric: :resolutions_count, payload_kind: :count),
      Metric.new(rollup_metric: :resolution_time, payload_kind: :duration)
    ].freeze,
    first_response: [
      Metric.new(rollup_metric: :first_response, payload_kind: :duration)
    ].freeze,
    reply_time: [
      Metric.new(rollup_metric: :reply_time, payload_kind: :duration)
    ].freeze,
    conversation_bot_resolved: [
      Metric.new(rollup_metric: :bot_resolutions_count, payload_kind: :count)
    ].freeze,
    conversation_bot_handoff: [
      Metric.new(rollup_metric: :bot_handoffs_count, payload_kind: :count)
    ].freeze
  }.freeze

  module_function

  def event_names
    EVENTS.keys.map(&:to_s)
  end

  def metrics_for(event)
    return {} if event.blank?

    metrics_for_aggregate(
      event.name,
      count: 1,
      sum_value: event.try(:value),
      sum_value_business_hours: event.try(:value_in_business_hours)
    )
  end

  def metrics_for_aggregate(event_name, count:, sum_value:, sum_value_business_hours:)
    return {} if event_name.blank?

    values = {
      count: count.to_i,
      sum_value: sum_value.to_f,
      sum_value_business_hours: sum_value_business_hours.to_f
    }

    EVENTS.fetch(event_name.to_sym, []).to_h do |metric|
      [metric.rollup_metric, metric_values(metric.payload_kind, values)]
    end
  end

  private_class_method def metric_values(payload_kind, values)
    case payload_kind
    when :count
      count_values(values[:count])
    when :duration
      duration_values(values)
    else
      raise ArgumentError, "Unknown metric payload kind: #{payload_kind.inspect}"
    end
  end

  private_class_method def count_values(count)
    { count: count, sum_value: 0, sum_value_business_hours: 0 }
  end

  private_class_method def duration_values(values)
    {
      count: values[:count],
      sum_value: values[:sum_value],
      sum_value_business_hours: values[:sum_value_business_hours]
    }
  end
end