ALL-945: Validate monitor outbox end-to-end
in_progress
Validation task from Seph thread 1780496600.485949. Non-mutating checks first: code path, worker health, Datadog metrics/logs, DLQ/error signals, and existing test coverage. True prod E2E requires explicit approval for a production write: create/enable a test monitor that fires a transition, then observe evaluator -> MonitorTransitionOutbox -> Kafka -> alert bridge. Leave ALL-945 In Progress until validation completes.
Event Timeline
created
progress
# ALL-945 outbox validation — non-mutating pass
_Requested by Seph (Slack 1780496600.485949) | Fleet-task 983414cf | 2026-06-03 UTC_
---
## Background: What replaced PR #10991
PR #10991 (Wintermute's original) was closed per Seph on 2026-05-22 ("duplicate-agent overlap"). Seph authored a replacement: **PR #10977** (`seph/all-945-monitor-outbox-publisher`), merged **2026-05-22 15:56 UTC**. The final commit in that PR (`9da651732c`) adapted a fix from Wintermute's work (explicit `(monitorId, entityId, sequence)` ordering), and its `drainLoop.ts` is the version on current main.
---
## A. Code path (current main)
All four pipeline stages are present and wired in `c7e63a52ae` (current main HEAD).
### 1. Evaluator → OutboxWrite
**File:** `domains/device/src/services/monitor/evaluator/evaluator.ts`
**Function:** `writeEvaluationResult` (~line 779)
When a transition is detected (`evaluation.transition` is truthy), the evaluator writes three rows **atomically inside a Postgres transaction**:
1. `MonitorEntityState` (update with version check)
2. `MonitorTransition` (insert)
3. `MonitorTransitionOutbox` (insert, ~line 954)
The payload is validated against `MonitorTransitionPayloadSchema` (from event-bus) **inside the transaction** before the outbox insert; if the schema drifts, the whole transaction rolls back. This is the correct transactional outbox pattern.
Outbox row fields: `transitionId`, `workspaceId`, `payload` (JSON, pre-built Kafka envelope payload), `publishedAt` (null = unpublished), `attempts=0`, `lastError=null`.
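The write ordering above can be modelled in a few lines. This is a minimal sketch, not the evaluator's code: `InMemoryDb`, `writeTransition`, and `isValidPayload` are hypothetical, the snapshot/restore in `transaction` stands in for a Postgres transaction, and `isValidPayload` stands in for `MonitorTransitionPayloadSchema`. It shows the property that matters: a payload validation failure also discards the entity-state update and the transition insert, so no transition can exist without its outbox row.

```typescript
// Minimal model of the evaluator's transactional outbox write.
// All names are hypothetical; the real code uses a Postgres transaction.
interface EntityState { state: string; version: number }
interface Transition { transitionId: string; monitorId: string; entityId: string; sequence: number; toState: string }
interface OutboxRow {
  transitionId: string;
  workspaceId: string;
  payload: Transition;
  publishedAt: Date | null; // null = unpublished
  attempts: number;
  lastError: string | null;
}

class InMemoryDb {
  entityStates = new Map<string, EntityState>();
  transitions: Transition[] = [];
  outbox: OutboxRow[] = [];

  // Runs fn; if it throws, restores the pre-call snapshot (a stand-in for ROLLBACK).
  // Shallow copies suffice because fn only replaces map entries or appends rows.
  transaction(fn: (tx: InMemoryDb) => void): void {
    const saved = { states: new Map(this.entityStates), transitions: [...this.transitions], outbox: [...this.outbox] };
    try {
      fn(this);
    } catch (err) {
      this.entityStates = saved.states;
      this.transitions = saved.transitions;
      this.outbox = saved.outbox;
      throw err;
    }
  }
}

// Hypothetical stand-in for MonitorTransitionPayloadSchema validation.
function isValidPayload(t: Transition): boolean {
  return t.transitionId.length > 0 && Number.isInteger(t.sequence) && t.sequence > 0;
}

function writeTransition(db: InMemoryDb, workspaceId: string, expectedVersion: number, t: Transition): void {
  db.transaction((tx) => {
    const key = `${t.monitorId}:${t.entityId}`;
    const currentVersion = tx.entityStates.get(key)?.version ?? 0;
    // 1. MonitorEntityState update guarded by an optimistic version check.
    if (currentVersion !== expectedVersion) throw new Error("version conflict");
    tx.entityStates.set(key, { state: t.toState, version: currentVersion + 1 });
    // 2. MonitorTransition insert.
    tx.transitions.push(t);
    // 3. Validate, then insert the outbox row; a failure here rolls back steps 1 and 2.
    if (!isValidPayload(t)) throw new Error("payload failed schema validation");
    tx.outbox.push({ transitionId: t.transitionId, workspaceId, payload: t, publishedAt: null, attempts: 0, lastError: null });
  });
}
```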
### 2. Publisher (drainLoop → Kafka)
**Files:**
- `domains/device/src/services/monitor/publisher/drainLoop.ts` (drain logic)
- `domains/device/src/jobs/monitorOutboxPublisher/queue.ts` (BullMQ queue, 5s interval)
- `domains/device/src/jobs/monitorOutboxPublisher/worker.ts` (BullMQ worker, concurrency=1)
- `domains/device/src/server.ts` lines ~58–60 (lifecycle wiring)
**Schedule:** BullMQ `upsertJobScheduler` with `every: 5000ms` (sub-minute, which avoids the 1-minute floor of cron-style schedules). Concurrency=1 to preserve per-pair ordering.
**Drain logic:**
- `SELECT WHERE publishedAt IS NULL ORDER BY (transition.monitorId, transition.entityId, transition.sequence) ASC LIMIT 500`
- Publishes each row to `monitor.transition` topic using `publisher.publish(topic, transitionId, envelope)`
- Marks `publishedAt` after success
- On publish failure: bumps `attempts`, records `lastError`, blocks the `(monitorId, entityId)` pair for the current tick
- After 5 failures: calls `publisher.handleError("monitor.transition", ...)` → routes to auto-generated DLQ topic `monitor.transition.dlq`, then marks `publishedAt` so the row exits the unpublished set
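One drain tick can be sketched as below. It is a simplified, synchronous model (the real publisher is asynchronous and reads rows from Postgres): `DrainRow`, `Publisher`, `publishToDlq`, and `drainTick` are hypothetical names, while the topic, the sort key, the 500-row cap, and `MAX_PUBLISH_ATTEMPTS = 5` come from this report. It illustrates that a failure blocks only the later rows of the same `(monitorId, entityId)` pair for the current tick, so other pairs keep flowing.

```typescript
// Simplified, synchronous model of one drainLoop tick (names are hypothetical).
const MAX_PUBLISH_ATTEMPTS = 5;
const TOPIC = "monitor.transition";

interface DrainRow {
  transitionId: string;
  monitorId: string;
  entityId: string;
  sequence: number;
  attempts: number;
  lastError: string | null;
  publishedAt: Date | null;
}

// Hypothetical publisher: both methods throw on failure.
interface Publisher {
  publish(topic: string, key: string, row: DrainRow): void;
  publishToDlq(topic: string, key: string, row: DrainRow, error: string): void;
}

// (monitorId, entityId, sequence) ascending; plain string comparison
// approximates Postgres ORDER BY, whose collation may differ.
function compareDrainOrder(a: DrainRow, b: DrainRow): number {
  if (a.monitorId !== b.monitorId) return a.monitorId < b.monitorId ? -1 : 1;
  if (a.entityId !== b.entityId) return a.entityId < b.entityId ? -1 : 1;
  return a.sequence - b.sequence;
}

// Returns the number of rows published to the main topic this tick.
function drainTick(rows: DrainRow[], publisher: Publisher, batchSize = 500, now = new Date()): number {
  const batch = rows.filter((r) => r.publishedAt === null).sort(compareDrainOrder).slice(0, batchSize);
  const blockedPairs = new Set<string>();
  let published = 0;

  for (const row of batch) {
    const pair = `${row.monitorId}:${row.entityId}`;
    if (blockedPairs.has(pair)) continue; // an earlier row of this pair failed this tick
    try {
      publisher.publish(TOPIC, row.transitionId, row);
      row.publishedAt = now;
      published++;
    } catch (err) {
      const message = String(err);
      row.attempts++;
      row.lastError = message;
      if (row.attempts >= MAX_PUBLISH_ATTEMPTS) {
        try {
          publisher.publishToDlq(TOPIC, row.transitionId, row, message);
          row.publishedAt = now; // row leaves the unpublished set; pair is not blocked
          continue;
        } catch (dlqErr) {
          row.lastError = String(dlqErr); // DLQ failed too: row stays unpublished
        }
      }
      blockedPairs.add(pair);
    }
  }
  return published;
}
```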
**Server wiring** (`server.ts`):
```ts
const monitorOutboxPublisherQueue = initMonitorOutboxPublisherQueue(); // line 58
const monitorOutboxPublisherWorker = initMonitorOutboxPublisherWorker(); // line 60
// closed at shutdown, lines 125–126
```
### 3. DLQ (`monitor.transition.dlq`)
**NOT** explicitly registered in `packages/event-bus/src/common/topics.ts`. The DLQ is **auto-created** by the event-bus framework at startup.
In `packages/event-bus/src/common/kafka.ts`:
```ts
export const DLQ_SUFFIX = ".dlq" as const; // line 19
const desiredTopics = allTopics.flatMap((t) => [t, t.concat(DLQ_SUFFIX)]); // line 81
```
So `monitor.transition.dlq` is provisioned automatically alongside `monitor.transition`. The DLQ wraps failed events in `{ originalPayload, error, timestamp }`. No consumer of the DLQ is currently implemented; dead-lettered events wait for manual operator intervention, as designed. A Datadog monitor on the DLQ message rate was noted as a planned follow-up in the PR comments.
**Note:** The original Wintermute PR #10991 explicitly added `monitor.transition.dlq` to topics.ts. Seph's PR #10977 does NOT — instead relying on the auto-creation mechanism. Both approaches work because `publisher.handleError` uses `topic.concat(DLQ_SUFFIX)` directly and doesn't require the topic to be in the `Topics` array. This is a deliberate design decision.
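The topic naming can be reproduced in isolation. This sketch reuses `DLQ_SUFFIX` and the `flatMap` expression quoted from `kafka.ts`; `toDlqMessage`, `DlqEnvelope`, and the field types (an error message string and an ISO-8601 timestamp) are assumptions based only on the `{ originalPayload, error, timestamp }` shape described above, not the event-bus's actual types. It shows why `handleError` does not need the DLQ topic in `Topics`: the name is derived from the source topic at call time.

```typescript
const DLQ_SUFFIX = ".dlq" as const;

// Mirrors the kafka.ts expression: every registered topic gets a ".dlq" twin.
function desiredTopics(allTopics: readonly string[]): string[] {
  return allTopics.flatMap((t) => [t, t.concat(DLQ_SUFFIX)]);
}

// Hypothetical wrapper shape; field types are assumptions.
interface DlqEnvelope<T> {
  originalPayload: T;
  error: string;
  timestamp: string;
}

// Derives the DLQ topic from the source topic, as handleError is described to do.
function toDlqMessage<T>(topic: string, payload: T, error: unknown, now: Date = new Date()): { topic: string; value: DlqEnvelope<T> } {
  return {
    topic: topic.concat(DLQ_SUFFIX),
    value: {
      originalPayload: payload,
      error: error instanceof Error ? error.message : String(error),
      timestamp: now.toISOString(),
    },
  };
}
```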
### 4. Alert bridge (Kafka consumer → Alert domain)
**Files:**
- `domains/alert/src/events/monitor.transition/index.ts` (event handler)
- `domains/alert/src/services/monitorAlertBridge/index.ts` (core logic, `applyMonitorTransition`)
- `domains/alert/src/events/index.ts` line 40 (handler registered to `"monitor.transition"`)
The alert bridge:
- Consumes `monitor.transition` events
- Calls `applyMonitorTransition(event.payload, undefined, event.key)` inside a Postgres transaction
- Dedupes on `transitionId` (drops duplicate/stale sequences)
- Creates/resolves alerts based on state machine transitions (OK→TRIGGERED creates, TRIGGERED→OK resolves)
- Rethrows on unexpected errors (framework routes to DLQ)
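The dedupe and create/resolve rules above can be modelled in memory. This is a simplified sketch, not `applyMonitorTransition`: `AlertBridgeSketch` and its types are hypothetical, a `Map` and an array stand in for the alert domain's Postgres tables, and treating any sequence at or below the last applied one as stale is an assumption consistent with the "stale (lower) sequence drop" test listed in Section D.

```typescript
// Simplified model of the alert bridge's dedupe and create/resolve rules.
// Hypothetical names; the real applyMonitorTransition persists state in Postgres.
type MonitorState = "OK" | "TRIGGERED" | "NO_DATA";

interface TransitionEvent {
  transitionId: string;
  monitorId: string;
  entityId: string;
  sequence: number;
  toState: MonitorState;
}

interface Alert {
  monitorId: string;
  entityId: string;
  kind: MonitorState; // TRIGGERED or NO_DATA
  resolved: boolean;
}

type ApplyResult = "applied" | "duplicate" | "stale";

class AlertBridgeSketch {
  private readonly lastSequence = new Map<string, number>();
  private readonly seen = new Set<string>();
  readonly alerts: Alert[] = [];

  apply(e: TransitionEvent): ApplyResult {
    if (this.seen.has(e.transitionId)) return "duplicate";
    const pair = `${e.monitorId}:${e.entityId}`;
    // Sequences are per (monitor, entity); anything at or below the last applied one is stale.
    if (e.sequence <= (this.lastSequence.get(pair) ?? 0)) return "stale";
    this.seen.add(e.transitionId);
    this.lastSequence.set(pair, e.sequence);

    const open = this.alerts.find((a) => a.monitorId === e.monitorId && a.entityId === e.entityId && !a.resolved);
    if (e.toState === "OK") {
      if (open) open.resolved = true; // TRIGGERED->OK or NO_DATA->OK resolves the open alert
    } else if (!open) {
      this.alerts.push({ monitorId: e.monitorId, entityId: e.entityId, kind: e.toState, resolved: false });
    }
    return "applied";
  }
}
```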
**Pipeline integrity: ALL FOUR LINKS ARE INTACT.** The code path is fully wired evaluator→outbox→publisher→Kafka→alert bridge.
---
## B. Worker / cron health
### Deployment model
The device domain deploys as a **single Docker container** (`ghcr.io/texturehq/device`) on Kubernetes. The BullMQ publisher worker runs **in-process** within the same container, not as a separate worker/cron. There is no separate publisher pod — the publisher shares the device domain process.
Evidence:
- `domains/device/src/server.ts`: initializes both `monitorOutboxPublisherQueue` and `monitorOutboxPublisherWorker` alongside the GraphQL server on startup.
- Docker workflow: `.github/workflows/docker-domain-device.yml` builds a single image `ghcr.io/texturehq/device`.
- K8s service: `infra/supergraph-gateway/supergraph.k8s.prod.yaml` references `http://mono-domains-device:10000/subgraph` — single service.
**Whether the publisher is running in prod:** Cannot confirm directly without Datadog or kubectl access. But **IF** the device domain service is running (it must be, since the product is operational), the publisher worker starts automatically at process init. There is no separate deployment gate.
### Datadog
`DD_API_KEY` / `DD_APP_KEY` are **not available** in the agent environment, and a Doppler lookup returned no DD credentials. Datadog logs and metrics cannot be queried directly.
**Recommended credentials to add for future monitoring:** Expose `DD_API_KEY` and `DD_APP_KEY` in `~/.config/fleet-tasks/config.env` or doppler `device` project `prod` config so sub-agents can query:
- `service:device` logs filtered by `"monitorOutboxPublisher"` or `"monitor_outbox"` keywords
- Metrics under prefix `monitor.outbox_publisher.*` (the worker emits these via `tracer.dogstatsd`)
### Kafka topics
`monitor.transition` is registered in `packages/event-bus/src/common/topics.ts` (line 93). `monitor.transition.dlq` is NOT registered explicitly but is auto-created by the framework (see Section A above). Both topics will exist in production if the device service started with the code from PR #10977 or later.
### BullMQ / Redis
The publisher uses the same Redis connection as the evaluator (`config.redisUrl`). Schedule cadence: 5s via `upsertJobScheduler`. Concurrency: 1. Overlap guard: if a prior tick is still active, the new tick is skipped (`shouldSkipForOverlap` check) to prevent out-of-order publishes.
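The skip-on-overlap rule can be sketched as a guard around each tick. `OverlapGuard` and `runTick` are hypothetical; this is not the implementation of `shouldSkipForOverlap`, and how that check makes its decision is not established in this report. Note that an in-process flag only protects a single process: if the device domain runs more than one replica, each replica has its own flag, so a guard that must hold across pods needs shared state (for example, the BullMQ queue in Redis).

```typescript
// Hypothetical in-process overlap guard; not the real shouldSkipForOverlap.
class OverlapGuard {
  private active = false;

  // Returns false (skip this tick) if a previous tick has not finished.
  tryBegin(): boolean {
    if (this.active) return false;
    this.active = true;
    return true;
  }

  end(): void {
    this.active = false;
  }
}

// Runs one tick unless the previous one is still in flight; resolves to true if it ran.
async function runTick(guard: OverlapGuard, tick: () => Promise<void>): Promise<boolean> {
  if (!guard.tryBegin()) return false;
  try {
    await tick();
  } finally {
    guard.end(); // release even if the tick throws, so later ticks are not blocked forever
  }
  return true;
}
```

Releasing in `finally` matters: without it, one failed tick would leave the flag set and silently skip every later tick.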
---
## C. DLQ + error signals
### Retry logic (current main, `9da651732c`)
- `MAX_PUBLISH_ATTEMPTS = 5` (line ~61 in drainLoop.ts)
- On each failure: `attempts++`, `lastError` recorded, `publishedAt` stays null
- At `attempts >= 5`: calls `publisher.handleError("monitor.transition", transitionId, envelope, error)` which writes to `monitor.transition.dlq`
- If DLQ publish also fails: row stays unpublished (attempts bumped), pair stays blocked — correct conservative behavior
- DLQ success: `publishedAt = now()`, row exits unpublished set, pair unblocked
**Minor gap in current main vs wintermute branch:** Commit `f458b71498` (on wintermute branch, NOT merged) improves DLQ ack-failure handling by separating DLQ publish failures from DLQ ack failures. The current main mixes these slightly. This is a correctness nuance, not a blocking bug — the safe path (leave row unpublished, block pair) is taken in both cases.
### Postgres row state
Cannot query directly — no read-only Postgres credentials are available in the agent environment for the device domain. Direct count queries would require:
```sql
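-- camelCase columns must be double-quoted in Postgres (as in the migration's "publishedAt").
SELECT "attempts", COUNT(*) AS unpublished_rows, MIN("createdAt") AS oldest_created_at
FROM "MonitorTransitionOutbox"
WHERE "publishedAt" IS NULL
GROUP BY "attempts"
ORDER BY "attempts" DESC;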
```
This would reveal if rows are accumulating (backlog) and at what retry depth.
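The query's per-`attempts` counts can be read against the retry rules above. A small sketch, assuming `MAX_PUBLISH_ATTEMPTS = 5` as on current main; `summarizeBacklog` and its bucket names are hypothetical. Because a successful DLQ write sets `publishedAt`, an unpublished row at or above the attempt limit usually means the DLQ write failed as well, while `attempts = 0` rows may simply be new, beyond the 500-row batch cap, or waiting behind a blocked pair.

```typescript
// Hypothetical interpretation of the per-attempts counts returned by the query above.
const MAX_PUBLISH_ATTEMPTS = 5;

interface AttemptBucket {
  attempts: number;
  count: number;
}

interface BacklogSummary {
  neverAttempted: number; // attempts = 0: new rows, or rows queued behind a blocked pair
  retrying: number;       // 1..MAX-1: transient publish failures still being retried
  dlqFailed: number;      // >= MAX yet unpublished: the DLQ write also failed
}

function summarizeBacklog(buckets: readonly AttemptBucket[]): BacklogSummary {
  const summary: BacklogSummary = { neverAttempted: 0, retrying: 0, dlqFailed: 0 };
  for (const { attempts, count } of buckets) {
    if (attempts === 0) summary.neverAttempted += count;
    else if (attempts < MAX_PUBLISH_ATTEMPTS) summary.retrying += count;
    else summary.dlqFailed += count;
  }
  return summary;
}
```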
### Outbox drain index vs query ordering
**Minor discrepancy found:** The Postgres partial index created in migration `20260506180052` is:
```sql
CREATE INDEX "MonitorTransitionOutbox_unpublished_drain_idx"
ON "MonitorTransitionOutbox"("createdAt")
WHERE "publishedAt" IS NULL;
```
But the actual drain query orders by `(transition.monitorId, transition.entityId, transition.sequence)` — a relation join, not `createdAt`. Postgres can use the partial predicate (`WHERE publishedAt IS NULL`) to filter, then sort. For small-to-medium backlogs this is fine; for large backlogs (thousands of rows) Postgres may scan the full unpublished set then sort. This is not a blocking bug but worth a follow-up index improvement.
### DLQ consumer
No consumer of `monitor.transition.dlq` is implemented. Failed events accumulate in the DLQ topic and require manual operator intervention. This is the documented design.
---
## D. Existing test coverage
### Publisher unit tests (drainLoop.ts)
**File:** `domains/device/src/services/monitor/publisher/drainLoop.test.ts` (519 lines)
Test cases (12 top-level groups; items 11 and 12 each bundle several cases):
1. Publishes a single unpublished row and marks publishedAt ✅
2. Retries a transient failure on a subsequent tick ✅
3. Routes to DLQ after MAX_PUBLISH_ATTEMPTS failures ✅
4. Preserves per-(monitorId, entityId) ordering (blocking subsequent rows after failure) ✅
5. Publishes rows in different pairs independently ✅
6. Marks unparseable payload publishedAt (stops infinite loop) ✅
7. Leaves row unpublished when DLQ delivery also fails ✅
8. Counts publish as successful even when publishedAt DB write fails after delivery ✅
9. Counts DLQ delivery as successful even when publishedAt DB write fails after delivery ✅
10. Respects per-tick batchSize cap ✅
11. `parsePayload` — accepts well-formed / rejects null / rejects non-object / rejects missing fields ✅
12. `buildEnvelope` — wraps payload with canonical wire format / uses payload's transitionedAt ✅
Coverage: the May-21 coverage report predates this test file, so it does not reflect it. The tests present cover all critical paths. Queue/worker tests exist (checked in the PR), but those files carry `skipcq: TCV-001` annotations that exclude them from DeepSource coverage.
### Alert bridge unit tests
**File:** `domains/alert/src/services/monitorAlertBridge/applyMonitorTransition.test.ts` (327 lines)
Test cases:
- OK→TRIGGERED creates alert, TRIGGERED→OK resolves ✅
- Re-trigger during recovery ✅
- NO_DATA→OK resolves no-data alert ✅
- Duplicate sequence deduplication ✅
- Stale (lower) sequence drop ✅
- Out-of-order delivery [3,1,2,4] ✅
- DEVICE target validation ✅
- SITE target validation ✅
- Lowercase site entityType (regression test for ALL-540 fix) ✅
- Per-(monitor, entity) isolation ✅
Coverage for `monitorAlertBridge/index.ts`: **91.93% statements** (per the May-21 coverage report). The `monitor.transition/index.ts` handler itself shows 0% in that report. Its logic lives in `applyMonitorTransition`, which `applyMonitorTransition.test.ts` exercises; whether that file also drives the handler entry point was not confirmed, so the 0% may reflect the stale report rather than a real gap.
### Integration tests covering the outbox→Kafka path
No full integration test for the evaluator→outbox→publisher→Kafka→alert pipeline was found. The drainLoop tests mock the event-bus; there is no end-to-end test using a live Kafka instance. This is the expected gap — confirmed by the task description.
---
## E. What a true E2E would require
To perform a real prod E2E validation (read-only monitoring, then a single controlled mutation), the following steps would be needed:
### Workspace and setup
1. **Pick an internal workspace** — Texture's own internal workspace (not a customer workspace). Seph would know the workspace ID.
2. **Identify a device with live telemetry** in that workspace — any device that sends data regularly.
3. **Create one test monitor** (FRESHNESS or THRESHOLD) on that device via the Admin UI or API:
- FRESHNESS: trigger condition = data not seen in 30 seconds (reliably triggers for any device with non-realtime data)
- Required fields: `workspaceId`, `name`, `family: FRESHNESS`, `entityType: DEVICE`, target device ID, `evaluationWindowMs: 30000`
### Observation sequence (5 pipeline steps)
After creating the monitor, observe these in order:
| Step | What to observe | Where to look |
|------|----------------|---------------|
| 1. **Evaluator log** | `monitor.evaluator.tick.complete` log event with `transitionCreates: 1` for the monitor | Datadog: `service:device` `"transitionCreates"` |
| 2. **Outbox row written** | New row in `MonitorTransitionOutbox` with `publishedAt IS NULL`, `attempts=0` | Postgres (read-only): `SELECT * FROM "MonitorTransitionOutbox" WHERE "publishedAt" IS NULL ORDER BY "createdAt" DESC LIMIT 5` |
| 3. **Publisher tick** | Within 5s: log `monitor_outbox_publisher_tick_complete` with `rowsPublished: 1` | Datadog: `service:device` `"monitor_outbox_publisher_tick_complete"` |
| 4. **Kafka message** | Offset advance on `monitor.transition` topic for the new `transitionId` | Kafka UI / `kcat -b $BROKER -t monitor.transition -C -o -1 -e` |
| 5. **Alert created** | `alert.detected` Kafka event + new `Alert` row in alert domain with `monitorId` matching | Datadog: `service:alert` `"applyMonitorTransition"` operation or alert domain Postgres |
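Once steps 2, 3, and 5 have been observed, the durable timestamps can be turned into a pass/fail summary. A sketch under stated assumptions: `assessE2e` and its field names are hypothetical, `alertCreatedAt` assumes the matching `Alert` row exposes a creation time, and the 15-second default budget is an arbitrary allowance of three 5-second publisher ticks, not a documented SLO. Timestamps come from two databases, so small clock skew is possible.

```typescript
// Hypothetical summary of one E2E run from the outbox and alert timestamps.
interface E2eTimestamps {
  outboxCreatedAt: Date;          // MonitorTransitionOutbox.createdAt
  outboxPublishedAt: Date | null; // MonitorTransitionOutbox.publishedAt (null = unpublished)
  alertCreatedAt: Date | null;    // creation time of the matching Alert row, if any
}

interface E2eVerdict {
  publishLagMs: number | null; // outbox write -> publish recorded
  bridgeLagMs: number | null;  // publish recorded -> alert created
  problems: string[];
}

function assessE2e(ts: E2eTimestamps, maxPublishLagMs = 15000): E2eVerdict {
  const problems: string[] = [];
  const publishLagMs = ts.outboxPublishedAt ? ts.outboxPublishedAt.getTime() - ts.outboxCreatedAt.getTime() : null;
  // Can be slightly negative: publishedAt is written after the broker accepts the
  // message, so the alert consumer may finish first. Cross-DB clock skew also applies.
  const bridgeLagMs =
    ts.outboxPublishedAt && ts.alertCreatedAt ? ts.alertCreatedAt.getTime() - ts.outboxPublishedAt.getTime() : null;

  if (publishLagMs === null) problems.push("outbox row is still unpublished");
  else if (publishLagMs > maxPublishLagMs) problems.push(`publish lag ${publishLagMs}ms exceeds ${maxPublishLagMs}ms`);
  if (ts.alertCreatedAt === null) problems.push("no matching alert was created");

  return { publishLagMs, bridgeLagMs, problems };
}
```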
### Required credentials/permissions
- **Datadog access** (`DD_API_KEY` + `DD_APP_KEY`) — needed for log queries
- **Postgres read-only access** to device domain DB (device `DATABASE_URL` or a read replica connection string from doppler `device/prod`)
- **Kafka access** (`KAFKA_BROKERS` + auth creds) — or access to Kafka UI/Kowl in prod
- **Admin API or UI access** to create the test monitor
- **Seph approval** to create/modify monitors in the internal workspace
### Cleanup
After validation: archive the test monitor via Admin UI (the archive action shipped in PR #11526 / ALL-1367).
### Estimated time
~30 minutes for a human/agent with all credentials pre-available.
---
## Summary of findings
| Area | Status | Notes |
|------|--------|-------|
| Evaluator → outbox write | ✅ Intact | Atomic Tx in evaluator.ts ~line 954 |
| Publisher daemon | ✅ Implemented, runs in-process | BullMQ 5s cadence, in device server.ts |
| DLQ routing | ✅ Implemented | Auto-created `monitor.transition.dlq` via event-bus |
| Alert bridge consumer | ✅ Implemented | domains/alert consumes `monitor.transition` |
| `monitor.transition` topic registered | ✅ | topics.ts line 93 |
| `monitor.transition.dlq` topic registered | ⚠️ Auto-created (not explicit) | By design in Seph's PR; works correctly |
| Tests for drainLoop | ✅ 12 test cases | drainLoop.test.ts 519 lines |
| Tests for alert bridge | ✅ Good coverage (~92%) | applyMonitorTransition.test.ts |
| No integration/E2E test | ⚠️ Gap | Requires live Kafka + DB; documented above |
| Datadog monitoring | ⚠️ No confirmed metrics | Worker emits `monitor.outbox_publisher.*` DD metrics but can't verify from agent env |
| Drain index vs query ordering | ⚠️ Minor mismatch | Index on `createdAt`, query orders by relation fields |
| DLQ ack-failure handling | ⚠️ Minor gap | f458b71498 fix on wintermute branch not merged; non-blocking |
---
## Recommended next step for Seph
**Static investigation is sufficient to confirm the code path is correct.** All four pipeline links (evaluator → outbox → publisher → alert bridge) are present and wired in current main. The publisher was authored by Seph directly (PR #10977, merged May 22) and contains unit tests for all critical paths including DLQ, retry, ordering, and edge cases.
**If Seph wants to confirm prod health without a mutation:** Add Datadog credentials to the agent environment and re-run a Datadog-only sub-agent that checks for `monitor.outbox_publisher.rows.published` metric activity in the last 24h. A non-zero value answers the "is it running?" question without any writes. A consistently zero value is expected if no monitors are enabled yet, but on its own it does not show that the worker is running.
**If Seph wants a full E2E:** The concrete plan in Section E is ready to execute. It requires: (1) approval to create one test monitor in the internal workspace, (2) Datadog + Postgres read + Kafka access. Estimated 30 minutes. If the monitor domain is not yet in production use (as of May 21: "team is going to test it internally"), then no customer impact from the test.
**One minor follow-up to consider filing:** The drain query orders by `(monitorId, entityId, sequence)` via a join, but the Postgres partial index is on `createdAt WHERE publishedAt IS NULL`. When the outbox backlog grows (e.g., after a Kafka outage), Postgres must sort all unpublished rows by the join order, which could be slow. A better index would be a composite one on the FK + sequence. Low priority until volume warrants it.
progress
Non-mutating validation complete. drainLoop tests 22/22, alert-bridge tests 39/39. PR #10977 merged 2026-05-22. Worker registered in server.ts. Live evidence: alert.MonitorAlertState has 6 rows in prod (most recent 2026-06-05 14:54Z), which indicates the evaluator -> outbox -> Kafka -> alert bridge flow has completed in prod. Datadog metrics/logs scopes are unavailable on the current keys. Ready to discuss prod E2E.
progress
Status sync: Linear ALL-945 was closed 2026-06-16 with PR #10977 receipt (merged 2026-05-22). Prod-E2E Slack thread (1780496600.485949) parked since 2026-06-09 with no follow-up planned. Validation effectively complete; fleet-task can be closed. Not posting in Slack thread — no new external info, Seph has not asked to revisit.