## PR#1: Reporting events rollup - model and write path

Reporting queries currently hit the `reporting_events` table directly. This works, but the table grows linearly with event volume, and aggregation queries (counts, averages over date ranges) get progressively slower as accounts age.

This PR introduces a pre-aggregated `reporting_events_rollups` table that stores daily per-metric, per-dimension (account/agent/inbox) totals. The write path is intentionally decoupled from the read path: rollup rows are written inline from the event listener via upsert, and a backfill service exists to rebuild historical data from raw events. Nothing reads from this table yet.

The write path activates when an account has a `reporting_timezone` set (new account setting). The `reporting_events_rollup` feature flag controls only the future read path, not writes, so rollup data accumulates silently once timezone is configured.

A `MetricRegistry` maps raw event names to rollup column semantics in one place, keeping the write and (future) read paths aligned.

### What changed

- Migration for `reporting_events_rollups` with a unique composite index for upsert
- `ReportingEventsRollup` model
- `reporting_timezone` account setting with IANA timezone validation
- `MetricRegistry`: single source of truth for event-to-metric mappings
- `RollupService`: real-time upsert from event listener
- `BackfillService`: rebuilds rollups for a given account + date from raw events
- Rake tasks for interactive backfill and timezone setup
- `reporting_events_rollup` feature flag (disabled by default)

### How to test

1. Set a `reporting_timezone` on an account (`Account.first.update!(reporting_timezone: 'Asia/Kolkata')`)
2. Resolve a conversation or trigger a first response
3. Check `ReportingEventsRollup.where(account_id: ...)`; rows should appear
4. Run backfill: `bundle exec rake reporting_events_rollup:backfill` and verify historical data populates

---------

Co-authored-by: Muhsin Keloth <muhsinkeramam@gmail.com>
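
To make step 3 of the test plan concrete, the sketch below estimates which rollup rows a single event should touch: one row per metric for each non-nil dimension, bucketed under the account-local calendar day. It is plain Ruby with no Rails. The helper `expected_rollup_keys`, the `first_response` metric name, and the fixed `+05:30` offset (standing in for `Asia/Kolkata`) are assumptions made for illustration, not code from this PR.

```ruby
require 'date'

# Hypothetical helper (not part of the PR): lists the
# [date, dimension_type, dimension_id, metric] keys one event should touch.
def expected_rollup_keys(created_at_utc:, utc_offset:, dimensions:, metrics:)
  # Bucket by the account-local calendar day, not the UTC day.
  local_date = created_at_utc.getlocal(utc_offset).to_date

  dimensions.each_with_object([]) do |(type, id), keys|
    next if id.nil? # e.g. an event with no assigned agent

    metrics.each { |metric| keys << [local_date.iso8601, type, id, metric] }
  end
end

keys = expected_rollup_keys(
  created_at_utc: Time.utc(2024, 1, 1, 20, 0, 0), # 01:30 on Jan 2 at +05:30
  utc_offset: '+05:30',
  dimensions: { account: 1, agent: nil, inbox: 7 },
  metrics: [:first_response] # assumed metric name
)
keys.each { |key| puts key.inspect }
```

In this example an event created late on January 1 in UTC lands in the January 2 bucket, and the missing agent produces no row, so only the account and inbox dimensions appear.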
82 lines
2.5 KiB
Ruby
class ReportingEvents::RollupService
  def self.perform(reporting_event)
    new(reporting_event).perform
  end

  def initialize(reporting_event)
    @reporting_event = reporting_event
    @account = reporting_event.account
  end

  def perform
    return unless rollup_enabled?

    rows = build_rollup_rows
    return if rows.empty?

    upsert_rollups(rows)
  end

  private

  # NOTE: This is intentionally not gated by the reporting_events_rollup feature flag.
  # Rollup data is collected for all accounts with a valid reporting timezone (soft toggle).
  # The feature flag only controls the read path: whether reports query rollups or raw events.
  def rollup_enabled?
    @account.reporting_timezone.present? && ActiveSupport::TimeZone[@account.reporting_timezone].present?
  end

  def event_date
    @event_date ||= @reporting_event.created_at.in_time_zone(@account.reporting_timezone).to_date
  end

  def dimensions
    {
      account: @account.id,
      agent: @reporting_event.user_id,
      inbox: @reporting_event.inbox_id
    }
  end

  def build_rollup_rows
    event_metrics = ReportingEvents::EventMetricRegistry.metrics_for(@reporting_event)

    dimensions.each_with_object([]) do |(dimension_type, dimension_id), rows|
      next if dimension_id.nil?

      event_metrics.each do |metric, metric_data|
        rows << rollup_attributes(dimension_type, dimension_id, metric, metric_data)
      end
    end
  end

  def upsert_rollups(rows)
    # rubocop:disable Rails/SkipsModelValidations
    ReportingEventsRollup.upsert_all(
      rows,
      unique_by: [:account_id, :date, :dimension_type, :dimension_id, :metric],
      on_duplicate: upsert_on_duplicate_sql
    )
    # rubocop:enable Rails/SkipsModelValidations
  end

  def rollup_attributes(dimension_type, dimension_id, metric, metric_data)
    {
      account_id: @account.id, date: event_date,
      dimension_type: dimension_type, dimension_id: dimension_id, metric: metric,
      count: metric_data[:count], sum_value: metric_data[:sum_value].to_f,
      sum_value_business_hours: metric_data[:sum_value_business_hours].to_f,
      created_at: Time.current, updated_at: Time.current
    }
  end

  def upsert_on_duplicate_sql
    Arel.sql(
      'count = reporting_events_rollups.count + EXCLUDED.count, ' \
      'sum_value = reporting_events_rollups.sum_value + EXCLUDED.sum_value, ' \
      'sum_value_business_hours = reporting_events_rollups.sum_value_business_hours + EXCLUDED.sum_value_business_hours, ' \
      'updated_at = EXCLUDED.updated_at'
    )
  end
end
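
The `on_duplicate` clause makes the upsert additive: when a row with the same `(account_id, date, dimension_type, dimension_id, metric)` key already exists, the incoming `count` and sums are added to it instead of replacing it. A minimal in-memory sketch of that merge follows, assuming plain Ruby with a Hash standing in for the table. `InMemoryRollups`, `sample_row`, and the `first_response` metric name are hypothetical, and the average at the end is only one way a future read path might derive it from the stored totals.

```ruby
# Hypothetical stand-in for the reporting_events_rollups table.
# The Hash key plays the role of the unique composite index.
class InMemoryRollups
  KEY_COLUMNS = %i[account_id date dimension_type dimension_id metric].freeze
  SUMMED_COLUMNS = %i[count sum_value sum_value_business_hours].freeze

  attr_reader :rows

  def initialize
    @rows = {}
  end

  # Mirrors the ON CONFLICT behaviour: insert a new row, or add the
  # incoming values onto the existing row that shares the same key.
  def upsert_all(incoming)
    incoming.each do |row|
      key = row.values_at(*KEY_COLUMNS)
      existing = @rows[key]

      if existing.nil?
        @rows[key] = row.dup
      else
        SUMMED_COLUMNS.each { |column| existing[column] += row[column] }
      end
    end
  end
end

# Builds one illustrative rollup row for a single event lasting `seconds`.
def sample_row(seconds)
  {
    account_id: 1, date: '2024-01-02', dimension_type: :inbox, dimension_id: 7,
    metric: :first_response, # assumed metric name
    count: 1, sum_value: seconds.to_f, sum_value_business_hours: 0.0
  }
end

table = InMemoryRollups.new
table.upsert_all([sample_row(30)])
table.upsert_all([sample_row(90)])

row = table.rows.values.first
average = row[:sum_value] / row[:count]
puts "rows=#{table.rows.size} count=#{row[:count]} sum=#{row[:sum_value]} avg=#{average}"
```

Two events for the same day, inbox, and metric collapse into a single row with `count` 2 and `sum_value` 120.0. Because the merge adds rather than overwrites, it is not idempotent: processing the same event twice would presumably count it twice, which is worth keeping in mind when combining the real-time path with a backfill.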