feat(rollup): add models and write path [1/3] (#13796)
## PR#1: Reporting events rollup — model and write path

Reporting queries currently hit the `reporting_events` table directly. This works, but the table grows linearly with event volume, and aggregation queries (counts, averages over date ranges) get progressively slower as accounts age.

This PR introduces a pre-aggregated `reporting_events_rollups` table that stores daily per-metric, per-dimension (account/agent/inbox) totals. The write path is intentionally decoupled from the read path — rollup rows are written inline from the event listener via upsert, and a backfill service exists to rebuild historical data from raw events. Nothing reads from this table yet.

The write path activates when an account has a `reporting_timezone` set (new account setting). The `reporting_events_rollup` feature flag controls only the future read path, not writes — so rollup data accumulates silently once a timezone is configured.

A `MetricRegistry` maps raw event names to rollup column semantics in one place, keeping the write and (future) read paths aligned.

### What changed

- Migration for `reporting_events_rollups` with a unique composite index for upsert
- `ReportingEventsRollup` model
- `reporting_timezone` account setting with IANA timezone validation
- `MetricRegistry` — single source of truth for event-to-metric mappings
- `RollupService` — real-time upsert from event listener
- `BackfillService` — rebuilds rollups for a given account + date from raw events
- Rake tasks for interactive backfill and timezone setup
- `reporting_events_rollup` feature flag (disabled by default)

### How to test

1. Set a `reporting_timezone` on an account (`Account.first.update!(reporting_timezone: 'Asia/Kolkata')`)
2. Resolve a conversation or trigger a first response
3. Check `ReportingEventsRollup.where(account_id: ...)` — rows should appear
4. Run backfill: `bundle exec rake reporting_events_rollup:backfill` and verify historical data populates

---------

Co-authored-by: Muhsin Keloth <muhsinkeramam@gmail.com>
This commit is contained in:
142
app/services/reporting_events/backfill_service.rb
Normal file
142
app/services/reporting_events/backfill_service.rb
Normal file
@@ -0,0 +1,142 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
# Rebuilds one account's rollup rows for a single calendar date from raw reporting events.
#
# The date is interpreted in the account's reporting timezone. Aggregates are computed first;
# the existing rollup rows for that account/date are then deleted and the rebuilt rows inserted
# inside a single transaction.
class ReportingEvents::BackfillService
  # Rollup dimensions and the reporting_events column each one is grouped by.
  # The account dimension has no group column; the account id is used as its dimension id.
  DIMENSIONS = [
    { type: 'account', group_column: nil },
    { type: 'agent', group_column: :user_id },
    { type: 'inbox', group_column: :inbox_id }
  ].freeze

  # TODO: Move this to EventMetricRegistry when we expand distinct-counting support.
  # The live path already guards uniqueness in ReportingEventListener#conversation_bot_handoff,
  # but historical duplicates can exist since it's not enforced at the DB level.
  # These events are queried per-dimension (not group-then-sum) because COUNT(DISTINCT) is not additive.
  DISTINCT_COUNT_EVENTS = %w[conversation_bot_handoff].freeze

  DISTINCT_COUNT_SQL = Arel.sql('COUNT(DISTINCT conversation_id)')

  # Convenience entry point: builds the service and runs it.
  #
  # @param account [Object] account record; must respond to id, reporting_timezone and reporting_events
  # @param date [Date, #to_date] calendar day (in the account's reporting timezone) to rebuild
  # @raise [ArgumentError] when the account has no valid reporting timezone
  def self.backfill_date(account, date)
    new(account, date).perform
  end

  def initialize(account, date)
    @account = account
    # Normalise once so the UTC window and the stored `date` column are always derived from a
    # plain Date, even if a caller hands in a String or a Time.
    @date = date.to_date
  end

  # Recomputes the aggregates for the day and swaps them in for the existing rollup rows.
  #
  # NOTE(review): aggregates are read before the transaction opens, so a live upsert landing
  # in between is presumably not reflected in the rebuilt rows — confirm this is acceptable.
  def perform
    start_utc, end_utc = date_boundaries_in_utc
    rollup_rows = build_rollup_rows(start_utc, end_utc)

    ReportingEventsRollup.transaction do
      delete_existing_rollups
      bulk_insert_rollups(rollup_rows) if rollup_rows.any?
    end
  end

  private

  def delete_existing_rollups
    ReportingEventsRollup.where(account_id: @account.id, date: @date).delete_all
  end

  # Returns [start, end) of @date in the account's reporting timezone, expressed in UTC.
  # Both edges are parsed in the zone itself, so DST days yield a 23h/25h window as appropriate.
  def date_boundaries_in_utc
    tz = reporting_time_zone
    [tz.parse(@date.to_s).utc, tz.parse(@date.next_day.to_s).utc]
  end

  # Resolves the account's reporting timezone, failing loudly instead of surfacing a
  # NoMethodError on nil when the setting is missing or unknown.
  def reporting_time_zone
    zone_name = @account.reporting_timezone
    zone = ActiveSupport::TimeZone[zone_name] if zone_name.present?
    return zone if zone

    raise ArgumentError, "Account #{@account.id} has no valid reporting_timezone (got #{zone_name.inspect})"
  end

  # Turns the aggregate hash into attribute hashes ready for insert_all.
  def build_rollup_rows(start_utc, end_utc)
    # Captured once so every row written by this backfill carries the same timestamps.
    now = Time.current

    build_aggregates(start_utc, end_utc).map do |(dimension_type, dimension_id, metric), data|
      {
        account_id: @account.id,
        date: @date,
        dimension_type: dimension_type,
        dimension_id: dimension_id,
        metric: metric,
        count: data[:count],
        sum_value: data[:sum_value],
        sum_value_business_hours: data[:sum_value_business_hours],
        created_at: now,
        updated_at: now
      }
    end
  end

  # Builds { [dimension_type, dimension_id, metric] => totals } for every dimension.
  def build_aggregates(start_utc, end_utc)
    aggregates = Hash.new { |h, k| h[k] = { count: 0, sum_value: 0.0, sum_value_business_hours: 0.0 } }
    standard_names = ReportingEvents::EventMetricRegistry.event_names - DISTINCT_COUNT_EVENTS
    base = @account.reporting_events.where(created_at: start_utc...end_utc)

    # The two scopes do not depend on the dimension, so build them once outside the loop.
    standard_scope = base.where(name: standard_names)
    distinct_scope = base.where(name: DISTINCT_COUNT_EVENTS)

    DIMENSIONS.each do |dimension|
      aggregate_standard_events(aggregates, standard_scope, dimension)
      aggregate_distinct_events(aggregates, distinct_scope, dimension)
    end

    aggregates
  end

  # Additive events: one grouped query per dimension returning count and value sums.
  def aggregate_standard_events(aggregates, scope, dimension)
    group_cols, selects = dimension_groups_and_selects(dimension)

    scope.group(*group_cols).pluck(*selects).each do |row|
      event_name, dimension_id, count, sum_value, sum_value_business_hours = unpack_row(row, dimension)
      # Events with no agent/inbox produce a NULL group; they have no row in that dimension.
      next if dimension_id.nil?

      accumulate_metrics(aggregates, dimension[:type], dimension_id, event_name,
                         { count: count, sum_value: sum_value, sum_value_business_hours: sum_value_business_hours })
    end
  end

  # Expands one raw-event aggregate into its rollup metrics and adds them to the running totals.
  def accumulate_metrics(aggregates, dimension_type, dimension_id, event_name, values)
    ReportingEvents::EventMetricRegistry.metrics_for_aggregate(event_name, **values).each do |metric, metric_data|
      key = [dimension_type, dimension_id, metric]
      aggregates[key][:count] += metric_data[:count]
      aggregates[key][:sum_value] += metric_data[:sum_value].to_f
      aggregates[key][:sum_value_business_hours] += metric_data[:sum_value_business_hours].to_f
    end
  end

  # Distinct-count events: counted per conversation within each dimension, with no value sums.
  def aggregate_distinct_events(aggregates, scope, dimension)
    return if DISTINCT_COUNT_EVENTS.empty?

    group_cols = dimension[:group_column] ? [:name, dimension[:group_column]] : [:name]

    scope.group(*group_cols).pluck(*group_cols, DISTINCT_COUNT_SQL).each do |row|
      # Without a group column the row is [name, count]; inject the account id as dimension id.
      event_name, dimension_id, count = dimension[:group_column] ? row : [row[0], @account.id, row[1]]
      next if dimension_id.nil?

      accumulate_metrics(aggregates, dimension[:type], dimension_id, event_name,
                         { count: count, sum_value: 0, sum_value_business_hours: 0 })
    end
  end

  # Returns [group columns, select list] for the grouped aggregate query of a dimension.
  def dimension_groups_and_selects(dimension)
    agg_selects = [Arel.sql('COUNT(*)'), Arel.sql('COALESCE(SUM(value), 0)'), Arel.sql('COALESCE(SUM(value_in_business_hours), 0)')]

    if dimension[:group_column]
      [[:name, dimension[:group_column]], [:name, dimension[:group_column], *agg_selects]]
    else
      [[:name], [:name, *agg_selects]]
    end
  end

  # Normalises a plucked row to [name, dimension_id, count, sum_value, sum_value_business_hours].
  def unpack_row(row, dimension)
    if dimension[:group_column]
      # [name, dimension_id, count, sum_value, sum_value_business_hours]
      row
    else
      # [name, count, sum_value, sum_value_business_hours] → inject account id
      [row[0], @account.id, row[1], row[2], row[3]]
    end
  end

  # NOTE(review): insert_all skips rows that collide with a unique index rather than raising,
  # so a live upsert that creates the same key concurrently would presumably win over the
  # backfilled row — confirm whether the backfill needs stronger isolation.
  def bulk_insert_rollups(rollup_rows)
    # rubocop:disable Rails/SkipsModelValidations
    ReportingEventsRollup.insert_all(rollup_rows)
    # rubocop:enable Rails/SkipsModelValidations
  end
end
|
||||
79
app/services/reporting_events/event_metric_registry.rb
Normal file
79
app/services/reporting_events/event_metric_registry.rb
Normal file
@@ -0,0 +1,79 @@
|
||||
# Registry translating raw reporting event names into the rollup metric payloads they emit.
module ReportingEvents::EventMetricRegistry
  # One rollup metric produced by a raw reporting event.
  # rollup_metric: metric name stored in reporting_events_rollups.
  # payload_kind: :count keeps only the count; :duration also carries the value sums.
  Metric = Data.define(:rollup_metric, :payload_kind)

  # Raw event name => metrics emitted for it.
  EVENTS = {
    conversation_resolved: [
      Metric.new(rollup_metric: :resolutions_count, payload_kind: :count),
      Metric.new(rollup_metric: :resolution_time, payload_kind: :duration)
    ].freeze,
    first_response: [Metric.new(rollup_metric: :first_response, payload_kind: :duration)].freeze,
    reply_time: [Metric.new(rollup_metric: :reply_time, payload_kind: :duration)].freeze,
    conversation_bot_resolved: [Metric.new(rollup_metric: :bot_resolutions_count, payload_kind: :count)].freeze,
    conversation_bot_handoff: [Metric.new(rollup_metric: :bot_handoffs_count, payload_kind: :count)].freeze
  }.freeze

  module_function

  # Names of all registered raw events, as strings.
  def event_names
    EVENTS.each_key.map(&:to_s)
  end

  # Metric payloads for a single raw event (counted once).
  # Returns {} for a blank event or an unregistered event name.
  def metrics_for(event)
    return {} if event.blank?

    metrics_for_aggregate(event.name, count: 1, sum_value: event.try(:value),
                                      sum_value_business_hours: event.try(:value_in_business_hours))
  end

  # Metric payloads for pre-aggregated totals of one raw event name.
  # Returns { rollup_metric => { count:, sum_value:, sum_value_business_hours: } }.
  def metrics_for_aggregate(event_name, count:, sum_value:, sum_value_business_hours:)
    return {} if event_name.blank?

    totals = { count: count.to_i, sum_value: sum_value.to_f, sum_value_business_hours: sum_value_business_hours.to_f }
    definitions = EVENTS.fetch(event_name.to_sym, [])

    definitions.each_with_object({}) do |definition, payloads|
      payloads[definition.rollup_metric] = metric_values(definition.payload_kind, totals)
    end
  end

  # Picks the payload shape for a metric kind; raises ArgumentError on an unknown kind.
  private_class_method def metric_values(payload_kind, values)
    return count_values(values[:count]) if payload_kind == :count
    return duration_values(values) if payload_kind == :duration

    raise ArgumentError, "Unknown metric payload kind: #{payload_kind.inspect}"
  end

  # Count-only payload: value sums are zeroed.
  private_class_method def count_values(count)
    { count:, sum_value: 0, sum_value_business_hours: 0 }
  end

  # Duration payload: count plus both value sums, copied through.
  private_class_method def duration_values(values)
    %i[count sum_value sum_value_business_hours].to_h { |field| [field, values[field]] }
  end
end
|
||||
107
app/services/reporting_events/metric_registry.rb
Normal file
107
app/services/reporting_events/metric_registry.rb
Normal file
@@ -0,0 +1,107 @@
|
||||
# Raw reporting events and rollup rows do not share a single metric namespace; this registry keeps write and read paths aligned.
|
||||
# TODO: Split this into separate registries for raw event mappings and report metric definitions.
|
||||
module ReportingEvents::MetricRegistry
  # Report metric name => key that metric is published under in the report summary
  # (consumed by summary_metrics below).
  SUMMARY_METRICS = {
    resolutions_count: :resolved_conversations_count,
    avg_resolution_time: :avg_resolution_time,
    avg_first_response_time: :avg_first_response_time,
    reply_time: :avg_reply_time
  }.freeze

  # Raw event name => builder turning aggregate values into the rollup metric payloads for that event.
  EVENT_METRICS = {
    'conversation_resolved' => ->(values) { { resolutions_count: count_metric(values[:count]), resolution_time: duration_metric(values) } },
    'first_response' => ->(values) { { first_response: duration_metric(values) } },
    'reply_time' => ->(values) { { reply_time: duration_metric(values) } },
    'conversation_bot_resolved' => ->(values) { { bot_resolutions_count: count_metric(values[:count]) } },
    'conversation_bot_handoff' => ->(values) { { bot_handoffs_count: count_metric(values[:count]) } }
  }.freeze

  # Supported report metrics: how each is aggregated and, where applicable, which raw event
  # and rollup metric back it.
  REPORT_METRICS = {
    conversations_count: { aggregate: :count }.freeze,
    incoming_messages_count: { aggregate: :count }.freeze,
    outgoing_messages_count: { aggregate: :count }.freeze,
    avg_first_response_time: { raw_event_name: :first_response, rollup_metric: :first_response, aggregate: :average }.freeze,
    avg_resolution_time: { raw_event_name: :conversation_resolved, rollup_metric: :resolution_time, aggregate: :average }.freeze,
    reply_time: { raw_event_name: :reply_time, rollup_metric: :reply_time, aggregate: :average }.freeze,
    resolutions_count: { raw_event_name: :conversation_resolved, rollup_metric: :resolutions_count, aggregate: :count }.freeze,
    bot_resolutions_count: { raw_event_name: :conversation_bot_resolved, rollup_metric: :bot_resolutions_count, aggregate: :count }.freeze,
    bot_handoffs_count: { raw_event_name: :conversation_bot_handoff, rollup_metric: :bot_handoffs_count, aggregate: :count,
                          raw_count_strategy: :distinct_conversation }.freeze
  }.freeze

  module_function

  # Rollup payloads for one raw event, counted once. {} for a blank or unmapped event.
  def event_metrics_for(event)
    return {} unless event.present? && EVENT_METRICS.key?(event.name.to_s)

    event_metrics_for_aggregate(event.name, count: 1, sum_value: event.try(:value),
                                            sum_value_business_hours: event.try(:value_in_business_hours))
  end

  # Rollup payloads for aggregated totals of one raw event name. {} when the name is unmapped.
  def event_metrics_for_aggregate(event_name, count:, sum_value:, sum_value_business_hours:)
    totals = { count: count.to_i, sum_value: sum_value.to_f, sum_value_business_hours: sum_value_business_hours.to_f }
    builder = EVENT_METRICS[event_name.to_s]

    builder&.call(totals) || {}
  end

  # Definition hash for a report metric, or nil when the name is blank or unknown.
  def report_metric(metric)
    REPORT_METRICS[metric.to_sym] if metric.present?
  end

  def supported_metric?(metric)
    report_metric(metric).present?
  end

  def aggregate_for(metric)
    metric_attribute(metric, :aggregate)
  end

  def rollup_supported_metric?(metric)
    rollup_metric_for(metric).present?
  end

  def rollup_metric_for(metric)
    metric_attribute(metric, :rollup_metric)
  end

  def raw_event_name_for(metric)
    metric_attribute(metric, :raw_event_name)
  end

  # Summary metric definitions, each extended with its metric_name and summary_key.
  def summary_metrics
    SUMMARY_METRICS.map do |metric_name, summary_key|
      extras = { metric_name: metric_name, summary_key: summary_key }
      report_metric(metric_name).merge(extras)
    end
  end

  # Reads one attribute of a report metric definition; nil when the metric or attribute is absent.
  private_class_method def metric_attribute(metric, attribute)
    definition = report_metric(metric)
    definition && definition[attribute]
  end

  # Count-only payload: value sums are zeroed.
  private_class_method def count_metric(count)
    { count: count, sum_value: 0, sum_value_business_hours: 0 }
  end

  # Duration payload: count plus both value sums, copied through.
  private_class_method def duration_metric(values)
    %i[count sum_value sum_value_business_hours].to_h { |field| [field, values[field]] }
  end
end
|
||||
81
app/services/reporting_events/rollup_service.rb
Normal file
81
app/services/reporting_events/rollup_service.rb
Normal file
@@ -0,0 +1,81 @@
|
||||
# Folds a single reporting event into the daily rollup rows of its account, agent and inbox
# via an additive upsert.
class ReportingEvents::RollupService
  def self.perform(reporting_event)
    new(reporting_event).perform
  end

  def initialize(reporting_event)
    @reporting_event = reporting_event
    @account = reporting_event.account
  end

  # Writes the rollup rows for the event; no-op when rollups are not enabled for the account
  # or the event maps to no metric.
  def perform
    return unless rollup_enabled?

    rows = build_rollup_rows
    upsert_rollups(rows) unless rows.empty?
  end

  private

  # NOTE: This is intentionally not gated by the reporting_events_rollup feature flag.
  # Rollup data is collected for all accounts with a valid reporting timezone (soft toggle).
  # The feature flag only controls the read path — whether reports query rollups or raw events.
  def rollup_enabled?
    zone_name = @account.reporting_timezone
    zone_name.present? && ActiveSupport::TimeZone[zone_name].present?
  end

  # Calendar date of the event in the account's reporting timezone (memoized).
  def event_date
    @event_date ||= begin
      local_time = @reporting_event.created_at.in_time_zone(@account.reporting_timezone)
      local_time.to_date
    end
  end

  # Dimension type => dimension id for this event; ids may be nil.
  def dimensions
    {
      account: @account.id,
      agent: @reporting_event.user_id,
      inbox: @reporting_event.inbox_id
    }
  end

  # One row per (dimension with a non-nil id) x (metric emitted by the event).
  def build_rollup_rows
    event_metrics = ReportingEvents::EventMetricRegistry.metrics_for(@reporting_event)

    dimensions.compact.flat_map do |dimension_type, dimension_id|
      event_metrics.map do |metric, metric_data|
        rollup_attributes(dimension_type, dimension_id, metric, metric_data)
      end
    end
  end

  def upsert_rollups(rows)
    conflict_target = [:account_id, :date, :dimension_type, :dimension_id, :metric]

    # rubocop:disable Rails/SkipsModelValidations
    ReportingEventsRollup.upsert_all(rows, unique_by: conflict_target, on_duplicate: upsert_on_duplicate_sql)
    # rubocop:enable Rails/SkipsModelValidations
  end

  def rollup_attributes(dimension_type, dimension_id, metric, metric_data)
    {
      account_id: @account.id,
      date: event_date,
      dimension_type: dimension_type,
      dimension_id: dimension_id,
      metric: metric,
      count: metric_data[:count],
      sum_value: metric_data[:sum_value].to_f,
      sum_value_business_hours: metric_data[:sum_value_business_hours].to_f,
      created_at: Time.current,
      updated_at: Time.current
    }
  end

  # SET clause for ON CONFLICT: adds the incoming totals to the stored ones and bumps updated_at.
  def upsert_on_duplicate_sql
    additive = %w[count sum_value sum_value_business_hours].map do |column|
      "#{column} = reporting_events_rollups.#{column} + EXCLUDED.#{column}"
    end

    Arel.sql([*additive, 'updated_at = EXCLUDED.updated_at'].join(', '))
  end
end
|
||||
Reference in New Issue
Block a user