halobestie-clone/requirement/valkey-online-mirror-plan.md
Ramadhan Sjamsani 553dbac52f Phase 6: Valkey availability mirror — move read path off Postgres
Mitra-availability state (online flag, deactivated flag, per-mitra session
count, heartbeat liveness) mirrored into Valkey so the customer beacon
+ pairing blast + dashboard counts no longer hit Postgres on the hot path.
Postgres remains the durable source of truth; Valkey state is fully
derivable via seedFromPostgres on startup + reconnect.

Schema
- mitras:online           SET    — mirror of is_online
- mitras:deactivated      SET    — mirror of is_active=false
- mitra:capacity:<id>     STRING — active+pending_payment session count
- mitra:heartbeat:<id>    STRING — ISO timestamp of last ping
- availability:snapshot   JSON   — beacon cache, TTL 10s, cluster-shared

Write paths (Postgres first, best-effort Valkey)
- setOnline/setOffline mirror SADD/SREM + heartbeat SET/DEL
- updateMitraStatus mirrors mitras:deactivated AND revokes auth_sessions
  on deactivate (bounds the "ghost online" window to access-token TTL)
- heartbeat is Valkey-only on the hot path; the per-ping Postgres UPDATE
  on last_heartbeat_at is eliminated (was 1,200 ops/min at prod scale)
- chat_session lifecycle (accept/end/reroute/extension/expiry) calls
  recomputeCapacityForMitra after each UPDATE — derive-from-truth avoids
  the bookkeeping risk of per-transition INCR/DECR

Read paths (Valkey-first, Postgres fallback on Valkey error)
- isMitraReachable: SISMEMBER mitras:online + heartbeat freshness
- findAvailableMitras: SDIFF + pipelined GETs, filter by capacity + heartbeat
- countAvailableMitrasFromCache: Valkey-driven, cached cluster-wide 10s TTL
- dashboard online count: SCARD
- Each reader wraps Valkey ops in try/catch → Postgres fallback on outage

Heartbeat path on /api/mitra/status/heartbeat
- resolveMitra preHandler replaced with heartbeatGuard: SISMEMBER on
  mitras:deactivated (~0 DB hits per ping). Falls back to full DB
  resolveMitra if Valkey is unreachable so a Valkey outage doesn't
  silently accept heartbeats from deactivated mitras.

Three sweeps, env-configurable cadences
- MITRA_AUTO_OFFLINE_SWEEP_SECONDS (30) — Valkey-driven stale detection
- HEARTBEAT_MIRROR_INTERVAL_SECONDS (60) — batched UPSERT writes
  Valkey timestamps to Postgres last_heartbeat_at via UNNEST (1 statement
  per cycle, idempotent across instances)
- VALKEY_ONLINE_MIRROR_SWEEP_SECONDS (300) — periodic reseed heals drift

Startup
- restoreActiveTimers → seedFromPostgres → bind listeners
- onValkeyReady re-runs the seed on every reconnect (cold start + reseed
  on Valkey restart, no manual intervention)

Failure semantics
- Read fallback: every Valkey read wrapped, falls back to existing
  Postgres JOIN query — system stays correct during Valkey outage,
  performance degrades not breaks
- Write best-effort: Postgres write commits before Valkey is touched;
  Valkey errors log + continue; reconciliation sweep heals drift
- Auto-offline sweep aborts entirely on Valkey error (does NOT mass-
  offline via Postgres scan during Valkey hiccup)

Tests
- New: 32 integration tests in mitra-status.valkey-mirror.test.js
  covering seed, write-through, fallbacks, capacity lifecycle,
  auto-offline sweep, heartbeat mirror, deactivation flow, beacon cache
- Updated: fixtures.js seeds Valkey alongside Postgres when isOnline=true
- Updated: helpers/db.js resetDb also flushes test Valkey
- Fixed 2 pre-existing session-timer flakes (string IDs failed uuid
  parse; vi.advanceTimersByTimeAsync raced real Postgres I/O)
- All 124/124 backend tests pass (was 90/92)

Docs
- requirement/valkey-online-mirror-plan.md — canonical plan
- requirement/valkey-online-mirror-testing.md — manual E2E checklist
- requirement/deployment.md — infra + Valkey persistence guidance for
  prod (Memorystore Standard tier recommended; migration from
  self-hosted Valkey is zero-downtime via reseed-from-Postgres)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 18:07:55 +08:00

# Valkey mirror for mitra availability — plan
**Status:** approved (2026-05-25). Open issue: heartbeat auth preHandler (see [§ Decisions](#decisions-locked-2026-05-25)).
**Created:** 2026-05-25
**Owner:** Ramadhan
## Goal
Move the **read path** for "is this mitra available to blast?" entirely into Valkey at production scale (hundreds of online mitras, thousands of customers polling). Eliminate per-heartbeat Postgres writes. Keep Postgres as the durable source of truth via either real-time mirroring (writes that already exist) or periodic batched mirroring (heartbeats).
## North star
- **Postgres = durable source of truth.** Every fact lives there.
- **Valkey = read path + ephemeral-write target.** Mirrors Postgres state; reads compute availability from primitive signals.
- **Valkey unreachable on a read:** fall back to the existing Postgres JOIN query. Outage degrades performance, never breaks pairing.
- **Valkey unreachable on a write:** log + continue. Reconciliation sweep heals drift.
- **Valkey restart / cold start:** reseed from Postgres before serving traffic.
## Schema
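Four Valkey structures mirror the availability state. None has a TTL; heartbeat freshness is computed by the sweep, not by key expiry. The `availability:snapshot` cache key (see [§ Shared beacon snapshot cache](#shared-beacon-snapshot-cache)) is separate and does expire after 10 seconds.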
| Key | Type | Value / members | Updated by |
|---|---|---|---|
| `mitras:online` | SET | mitra UUIDs | `setOnline` SADD; `setOffline` SREM; sweep SREM; bulk-SREM on deactivate |
| `mitras:deactivated` | SET | mitra UUIDs | `updateMitraStatus(is_active=false)` SADD; `updateMitraStatus(is_active=true)` SREM |
| `mitra:capacity:<id>` | STRING (integer) | count of active+pending_payment sessions assigned to this mitra | INCR on session accept; DECR on session end/expire/cancel; DECR/INCR pair on reroute |
| `mitra:heartbeat:<id>` | STRING | ISO 8601 timestamp of last heartbeat | heartbeat handler `SET`; `setOnline` `SET` (seed); `setOffline` `DEL` |
Postgres mirror columns (durable):
- `mitra_online_status.is_online` — written by `setOnline` / `setOffline` / sweep. Already in schema.
- `mitra_online_status.last_heartbeat_at` — written by the **batched heartbeat mirror** every 60s (NOT per-ping). Already in schema.
- `mitras.is_active` — written by `updateMitraStatus`. Already in schema.
- `chat_sessions.status` + `mitra_id` — already source-of-truth for session counts.
## Read path (computing "available for blast")
All reads compute on the fly from Valkey primitives. No memoized `mitras:available` set.
```js
const findAvailableMitras = async () => {
  // 1. online minus deactivated
  const candidates = await valkey.sdiff('mitras:online', 'mitras:deactivated')
  if (!candidates.length) return []

  // 2. fetch capacity + heartbeat for each candidate (pipelined: 1 roundtrip)
  const pipe = valkey.pipeline()
  for (const id of candidates) {
    pipe.get(`mitra:capacity:${id}`)
    pipe.get(`mitra:heartbeat:${id}`)
  }
  // ioredis returns one [err, value] pair per queued command, in order
  const results = await pipe.exec()

  // 3. filter by capacity + heartbeat freshness
  const { max_customers_per_mitra } = await getMaxCustomersPerMitra()
  const { stale_after_seconds } = await getMitraPingConfig()
  const cutoff = Date.now() - stale_after_seconds * 1000
  const eligible = []
  for (let i = 0; i < candidates.length; i++) {
    const count = Number(results[i * 2][1] ?? 0)
    const heartbeatAt = results[i * 2 + 1][1]
    if (count >= max_customers_per_mitra) continue
    if (!heartbeatAt || Date.parse(heartbeatAt) < cutoff) continue
    eligible.push({ id: candidates[i], active_session_count: count })
  }
  return eligible
}
```
**Cost at prod scale (300 online):** 1 `SDIFF` + 600 `GET` (pipelined) = ~1ms. Negligible.
### Read sites — what changes
| Caller | Today | After |
|---|---|---|
| `isMitraReachable(mitraId)` ([mitra-status.service.js:215](../backend/src/services/mitra-status.service.js#L215)) | `SELECT is_online ...` | `SISMEMBER mitras:online` + check `mitra:heartbeat:<id>` freshness |
| `findAvailableMitras` ([pairing.service.js:79](../backend/src/services/pairing.service.js#L79)) | full JOIN with chat_sessions | Valkey-driven (above) |
| `countAvailableMitrasFromCache` ([mitra-status.service.js:181](../backend/src/services/mitra-status.service.js#L181)) | full JOIN, cached in-process 10s | Valkey-driven, cached in **Valkey** 10s (shared cluster-wide) |
| Dashboard online count ([dashboard.service.js:16](../backend/src/services/dashboard.service.js#L16)) | `COUNT(*) WHERE is_online=true` | `SCARD mitras:online` |
| `getStatus(mitraId)` (mitra's own status poll) | full SELECT | Hybrid: `SISMEMBER` for `is_online`, Postgres for timestamps |
| `getOnlineMitras` (CC dashboard) | full JOIN with display_name + active_session_count | **unchanged** — low volume, joins make sense in SQL |
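The `isMitraReachable` row combines a set-membership check with a heartbeat freshness check. A minimal sketch of the Valkey side, assuming an ioredis-style client is passed in and `staleAfterSeconds` comes from `getMitraPingConfig()`. The names `isHeartbeatFresh` and `isMitraReachableFromValkey` and the injected `nowMs` parameter are illustrative, not taken from the codebase:

```js
// Hypothetical helper: true when the ISO timestamp is at or after the cutoff.
const isHeartbeatFresh = (iso, staleAfterSeconds, nowMs = Date.now()) => {
  if (!iso) return false
  const ts = Date.parse(iso)
  if (Number.isNaN(ts)) return false
  return ts >= nowMs - staleAfterSeconds * 1000
}

// Sketch of the Valkey-side check only; the Postgres fallback is omitted.
const isMitraReachableFromValkey = async (valkey, mitraId, staleAfterSeconds, nowMs = Date.now()) => {
  const pipe = valkey.pipeline()
  pipe.sismember('mitras:online', mitraId)
  pipe.get(`mitra:heartbeat:${mitraId}`)
  const [[onlineErr, online], [hbErr, heartbeatAt]] = await pipe.exec()
  // Surface per-command errors so the caller can take the fallback path.
  if (onlineErr || hbErr) throw onlineErr || hbErr
  return online === 1 && isHeartbeatFresh(heartbeatAt, staleAfterSeconds, nowMs)
}
```

Passing `nowMs` explicitly keeps the freshness rule testable without fake timers.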
### Reader fallback
Every Valkey read is wrapped:
```js
try {
  // ioredis resolves SISMEMBER to 1 or 0; normalize to a boolean like the fallback
  return (await valkey.sismember('mitras:online', mitraId)) === 1
} catch (err) {
  log.warn({ err }, '[mitras:online] valkey unreachable, falling back to DB')
  const [row] = await sql`SELECT is_online FROM mitra_online_status WHERE mitra_id = ${mitraId}`
  return Boolean(row?.is_online)
}
```
For `findAvailableMitras`, the fallback is the existing Postgres JOIN query.
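Since every reader repeats the same try/catch shape, it could be factored into one helper. The sketch below is one possible way to do that; `withValkeyFallback` is a hypothetical name, and the plan itself does not prescribe a shared wrapper:

```js
// Hypothetical helper (not in the codebase): run the Valkey read, and on any
// error log a warning and run the Postgres fallback instead.
const withValkeyFallback = async (label, readFromValkey, readFromPostgres, log = console) => {
  try {
    return await readFromValkey()
  } catch (err) {
    log.warn({ err }, `[${label}] valkey unreachable, falling back to DB`)
    return readFromPostgres()
  }
}

// Usage, assuming `valkey`, `sql` and `mitraId` are in scope:
// const online = await withValkeyFallback(
//   'mitras:online',
//   async () => (await valkey.sismember('mitras:online', mitraId)) === 1,
//   async () => {
//     const [row] = await sql`SELECT is_online FROM mitra_online_status WHERE mitra_id = ${mitraId}`
//     return Boolean(row?.is_online)
//   }
// )
```

Errors thrown by the Postgres fallback are deliberately not caught, so a double outage still surfaces to the caller.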
## Write paths
Each Postgres write is followed by a Valkey write. Postgres always commits first. Valkey failures log + continue.
### Online / offline (mitra app toggle)
| Action | Postgres | Valkey |
|---|---|---|
| `setOnline(mitraId)` | `UPDATE mitra_online_status SET is_online=true, last_online_at=NOW(), last_heartbeat_at=NOW()` | `SADD mitras:online <id>` + `SET mitra:heartbeat:<id> <now-iso>` |
| `setOffline(mitraId)` | `UPDATE mitra_online_status SET is_online=false, last_offline_at=NOW()` | `SREM mitras:online <id>` + `DEL mitra:heartbeat:<id>` |
The `SET heartbeat` on `setOnline` is critical: without it, the very next sweep would mark the freshly-online mitra stale (their first real heartbeat is up to `heartbeat_cadence_seconds` away).
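The ordering rule (Postgres commits first, Valkey is best-effort) can be sketched as follows. `mirrorBestEffort`, `setOnlineSketch` and `updateOnlineRow` are hypothetical names standing in for the real service code, and the client is assumed to be ioredis-style:

```js
// Hypothetical wrapper: run a Valkey mirror write after Postgres has committed.
// Errors are logged and swallowed so the request still succeeds.
const mirrorBestEffort = async (label, valkeyWrite, log = console) => {
  try {
    await valkeyWrite()
    return true
  } catch (err) {
    log.warn({ err }, `[${label}] valkey mirror write failed, continuing`)
    return false
  }
}

// Sketch of setOnline's ordering. `updateOnlineRow` stands in for the existing
// Postgres UPDATE and is an assumption, not a real function name.
const setOnlineSketch = async ({ valkey, updateOnlineRow, log }, mitraId) => {
  await updateOnlineRow(mitraId) // Postgres first; a throw here skips Valkey entirely
  await mirrorBestEffort('setOnline', async () => {
    const pipe = valkey.pipeline()
    pipe.sadd('mitras:online', mitraId)
    // Seed the heartbeat so the next sweep does not see a missing timestamp.
    pipe.set(`mitra:heartbeat:${mitraId}`, new Date().toISOString())
    const results = await pipe.exec()
    // Pipelines report per-command failures in the result pairs, not by throwing.
    for (const [err] of results) if (err) throw err
  }, log)
}
```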
### Admin activate / deactivate (CC)
[`updateMitraStatus`](../backend/src/services/mitra.service.js) ([mitra.service.js:32](../backend/src/services/mitra.service.js#L32)):
| Action | Postgres | Valkey |
|---|---|---|
| activate (`is_active=true`) | `UPDATE mitras SET is_active=true` | `SREM mitras:deactivated <id>` |
| deactivate (`is_active=false`) | `UPDATE mitras SET is_active=false` | `SADD mitras:deactivated <id>` |
### Mitra heartbeat (hot path)
Heartbeat handler ([mitra-status.service.js:79](../backend/src/services/mitra-status.service.js#L79)) is rewritten to operate entirely against Valkey:
1. `authenticate` plugin verifies JWT signature + expiry + `userType === 'mitra'` (no DB).
2. `SISMEMBER mitras:deactivated <userId>` → if true, return `403`.
3. `SET mitra:heartbeat:<userId> <now-iso>`.
Steps 2 + 3 pipelined into one Valkey roundtrip.
| | Today | After |
|---|---|---|
| Auth check | `getMitraById` SELECT + `is_active` check (1 DB read) | `SISMEMBER mitras:deactivated` (Valkey only) |
| Body | `UPDATE mitra_online_status SET last_heartbeat_at=NOW() WHERE id=? AND is_online=true` (1 DB write) | `SET mitra:heartbeat:<id>` (Valkey only) |
| Postgres ops per ping | 2 | **0** |
| Valkey ops per ping | 0 | 2 (one pipelined roundtrip) |
| At prod scale (300 online × 2 pings/min) | 1,200 DB ops/min | 1,200 Valkey ops/min, **0 DB ops/min** |
`mitras:deactivated` is already maintained on every CC `updateMitraStatus` call (see [§ Admin activate / deactivate](#admin-activate--deactivate-cc)) so deactivation propagates to the heartbeat path within one Valkey write window (~ms).
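A rough sketch of the pipelined guard-plus-write, assuming an ioredis-style client. The function name and the `200` success status are assumptions for illustration; only the `403` comes from the steps above:

```js
// Sketch of the heartbeat hot path: one pipelined roundtrip, no Postgres.
// Returns the HTTP status the handler would send.
const handleHeartbeatSketch = async (valkey, userId, now = new Date()) => {
  const pipe = valkey.pipeline()
  pipe.sismember('mitras:deactivated', userId)
  pipe.set(`mitra:heartbeat:${userId}`, now.toISOString())
  const [[deactErr, deactivated], [setErr]] = await pipe.exec()
  // Throw on per-command errors so the caller can fall back to the DB check.
  if (deactErr || setErr) throw deactErr || setErr
  return deactivated === 1 ? 403 : 200
}
```

Because both commands travel in one pipeline, the `SET` also runs for a deactivated mitra. That stray timestamp is ignored by `findAvailableMitras`, which excludes deactivated mitras through `SDIFF`.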
### Heartbeat → Postgres batched mirror
A background job runs every `HEARTBEAT_MIRROR_INTERVAL_SECONDS` (env, default 60). One SQL statement, touches all online mitras at once:
```js
const mirrorHeartbeatsToPostgres = async () => {
  const onlineIds = await valkey.smembers('mitras:online')
  if (!onlineIds.length) return

  // One pipelined roundtrip: a GET per online mitra
  const pipe = valkey.pipeline()
  for (const id of onlineIds) pipe.get(`mitra:heartbeat:${id}`)
  const results = await pipe.exec()

  // Keep only mitras that actually have a heartbeat timestamp
  const pairs = []
  for (let i = 0; i < onlineIds.length; i++) {
    const ts = results[i][1]
    if (ts) pairs.push({ mitra_id: onlineIds[i], ts })
  }
  if (!pairs.length) return

  // Single batched UPDATE: zip the two arrays with UNNEST and join on mitra_id
  await sql`
    UPDATE mitra_online_status m
    SET last_heartbeat_at = u.ts::timestamptz, updated_at = NOW()
    FROM (SELECT * FROM UNNEST(
      ${sql.array(pairs.map(p => p.mitra_id))}::uuid[],
      ${sql.array(pairs.map(p => p.ts))}::text[]
    ) AS t(mitra_id, ts)) u
    WHERE m.mitra_id = u.mitra_id
  `
}
```
**Cost at prod scale (300 online):** 1 batched UPDATE per minute per instance. At 3 instances that is 3 statements/min cluster-wide (redundant but idempotent: the latest timestamp wins). Compared with today's 600 per-ping heartbeat UPDATEs/min (300 online × 2 pings/min), that is roughly a 200× reduction in Postgres write statements.
**No leader election initially.** 3 redundant idempotent UPDATEs/min is still 200× cheaper than today. If WAL pressure becomes visible, add a Valkey-NX lease leader-elect (~15 LOC); deferred.
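For reference, the deferred lease could look roughly like this. It is a sketch only: the `lease:<job>` key name, the TTL value and both function names are assumptions, and the client is assumed to accept ioredis-style `SET ... EX ... NX` arguments:

```js
// Sketch of a lease-based leader check. Key name and TTL are assumptions.
const tryAcquireLease = async (valkey, jobName, instanceId, ttlSeconds) => {
  // SET key value EX ttl NX succeeds only if no other instance holds the lease.
  const res = await valkey.set(`lease:${jobName}`, instanceId, 'EX', ttlSeconds, 'NX')
  return res === 'OK'
}

// Run the job only on the instance that won the lease for this cycle.
const runIfLeader = async (valkey, jobName, instanceId, ttlSeconds, job) => {
  if (!(await tryAcquireLease(valkey, jobName, instanceId, ttlSeconds))) return false
  await job()
  return true
}
```

The lease is never released explicitly; it simply expires, so a crashed leader cannot block the job for longer than `ttlSeconds`. A TTL slightly shorter than the job interval would let a different instance win the next cycle.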
### Session capacity counter
Touch all four services that mutate session state:
| Trigger | File | Postgres write (existing) | Valkey write |
|---|---|---|---|
| Mitra accepts a chat (status → `active`, mitra_id set) | `pairing.service.js` accept handler | `UPDATE chat_sessions SET status='active', mitra_id=?` | `INCR mitra:capacity:<mitra_id>` |
| Session ends (status → `ended` / `expired` / `cancelled`) | `closure.service.js`, expiry sweepers | `UPDATE chat_sessions SET status=...` | `DECR mitra:capacity:<mitra_id>` |
| Session reroute (mitra A → mitra B) | `session.service.js` | `UPDATE chat_sessions SET mitra_id=B` | `DECR mitra:capacity:A` + `INCR mitra:capacity:B` (pipelined) |
**What counts as occupying capacity:** sessions in `ACTIVE` or `PENDING_PAYMENT` status with a non-null `mitra_id`. This matches the existing `findAvailableMitras` predicate. Extension flow (active → pending_payment → active) does NOT change capacity — mitra stays occupied throughout.
**Negative-counter guard:** clamp the counter at zero (the `Math.max(0, ...)` idea), either atomically in a Lua script or non-atomically with a `GET`/`SET` pair that keeps the value at zero. This prevents drift if a `DECR` fires without a prior `INCR` (e.g. a legacy session created before the Valkey mirror existed). The reconciliation sweep recomputes from Postgres anyway.
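Two ways the guard might be written, as a sketch under assumptions. The Lua variant relies on the client exposing `eval(script, numKeys, ...keys)` as ioredis does; the second variant resets the key after a negative `DECR`, which is a simplification chosen for this example rather than something the plan specifies. Both function names are hypothetical:

```js
// Atomic variant: decrement and clamp at zero inside one Lua script.
const DECR_CLAMP_LUA = `
local n = redis.call('DECR', KEYS[1])
if n < 0 then
  redis.call('SET', KEYS[1], 0)
  return 0
end
return n
`

const decrementCapacityAtomic = (valkey, mitraId) =>
  valkey.eval(DECR_CLAMP_LUA, 1, `mitra:capacity:${mitraId}`)

// Non-atomic variant: DECR, then reset to 0 if the counter went negative.
const decrementCapacityClamped = async (valkey, mitraId) => {
  const key = `mitra:capacity:${mitraId}`
  const n = await valkey.decr(key)
  if (n >= 0) return n
  await valkey.set(key, 0)
  return 0
}
```

The non-atomic variant can lose a concurrent `INCR` that lands between the `DECR` and the `SET`; the reconciliation sweep would correct that on its next run.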
## Auto-offline sweep — Valkey-driven
Replaces the current Postgres seq-scan with a Valkey computation. Runs every `MITRA_AUTO_OFFLINE_SWEEP_SECONDS` (env, default 30):
```js
const autoOfflineStaleMitras = async () => {
  const { stale_after_seconds, require_ping } = await getMitraPingConfig()
  if (!require_ping) return 0

  const onlineIds = await valkey.smembers('mitras:online')
  if (!onlineIds.length) return 0

  const pipe = valkey.pipeline()
  for (const id of onlineIds) pipe.get(`mitra:heartbeat:${id}`)
  const results = await pipe.exec()

  const cutoff = Date.now() - stale_after_seconds * 1000
  const stale = []
  for (let i = 0; i < onlineIds.length; i++) {
    const [err, ts] = results[i]
    // A failed GET must abort the tick; treating it as "no heartbeat" would
    // mass-offline mitras during a Valkey hiccup.
    if (err) throw err
    if (!ts || Date.parse(ts) < cutoff) stale.push(onlineIds[i])
  }
  if (!stale.length) return 0

  // Postgres: bulk flip is_online=false
  await sql`
    UPDATE mitra_online_status
    SET is_online = false, last_offline_at = NOW(), updated_at = NOW()
    WHERE mitra_id = ANY(${sql.array(stale)}::uuid[]) AND is_online = true
  `

  // Log rows
  for (const id of stale) {
    await sql`INSERT INTO mitra_online_logs (mitra_id, status) VALUES (${id}, 'offline')`
  }

  // Valkey: bulk SREM + DEL heartbeat keys
  const cleanup = valkey.pipeline()
  cleanup.srem('mitras:online', ...stale)
  for (const id of stale) cleanup.del(`mitra:heartbeat:${id}`)
  await cleanup.exec()

  invalidateAvailabilityCache()
  return stale.length
}
```
**Stale threshold:** `stale_after_seconds` is read from `getMitraPingConfig()` ([config.service.js](../backend/src/services/config.service.js)) — the existing CC-tunable knob. Not a new env.
**Sweep cadence:** `MITRA_AUTO_OFFLINE_SWEEP_SECONDS` env, default 30 (matches current hardcoded setInterval).
**Failure semantics:** if any Valkey op throws, the entire sweep aborts for this tick. The next tick retries. We never want to mass-offline mitras due to a transient Valkey hiccup.
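One way the env-driven cadence and the abort-this-tick rule might be wired together. This is a sketch: `readSeconds` and `startSweep` are hypothetical helpers, and the in-flight guard is an addition for illustration, not something the plan requires:

```js
// Hypothetical helper: read a positive number of seconds from env, else default.
const readSeconds = (env, name, fallback) => {
  const n = Number(env[name])
  return Number.isFinite(n) && n > 0 ? n : fallback
}

// Hypothetical scheduler: each tick is isolated, so a throw aborts only that tick.
const startSweep = (name, seconds, sweep, log = console) => {
  let running = false
  const timer = setInterval(async () => {
    if (running) return // skip if the previous tick is still in flight
    running = true
    try {
      await sweep()
    } catch (err) {
      log.warn({ err }, `[${name}] sweep aborted for this tick`)
    } finally {
      running = false
    }
  }, seconds * 1000)
  return () => clearInterval(timer) // call to stop the sweep
}

// Usage, assuming autoOfflineStaleMitras is in scope:
// const stop = startSweep(
//   'auto-offline',
//   readSeconds(process.env, 'MITRA_AUTO_OFFLINE_SWEEP_SECONDS', 30),
//   autoOfflineStaleMitras
// )
```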
## Shared beacon snapshot cache
Replace the in-process `availabilityCache` ([mitra-status.service.js:14](../backend/src/services/mitra-status.service.js#L14)) with a Valkey GET/SETEX key. Even though reads are now sub-ms Valkey ops, this caps total Valkey query load at high beacon-poll rates (5,000 customers × 12/min = 60,000 polls/min → cache → 6 computations/min cluster-wide).
```
KEY: availability:snapshot
TYPE: string (JSON: {"available": bool, "count": number})
TTL: 10 seconds
```
`config:invalidate` pub/sub subscriber extended to `DEL availability:snapshot` on `max_customers_per_mitra` bust.
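A minimal read-through sketch for the snapshot key, assuming an ioredis-style client with `get` and `setex`. `getAvailabilitySnapshot` and the `computeSnapshot` callback are illustrative names:

```js
const SNAPSHOT_KEY = 'availability:snapshot'
const SNAPSHOT_TTL_SECONDS = 10

// Sketch: serve the cached snapshot if present, else compute and cache it.
// `computeSnapshot` is an assumed callback returning { available, count }.
const getAvailabilitySnapshot = async (valkey, computeSnapshot) => {
  const cached = await valkey.get(SNAPSHOT_KEY)
  if (cached) return JSON.parse(cached)
  const snapshot = await computeSnapshot()
  await valkey.setex(SNAPSHOT_KEY, SNAPSHOT_TTL_SECONDS, JSON.stringify(snapshot))
  return snapshot
}
```

Two instances that miss at the same moment will both compute and both write. The result is the same value written twice, so no lock is needed at this TTL.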
## Startup / reconnect / reseed
Three triggers reseed Valkey state from Postgres. All idempotent.
### Backend startup
In [`server.js`](../backend/src/server.js), after Valkey emits `'ready'` and before the public listener binds:
```js
const onlineRows = await sql`SELECT mitra_id FROM mitra_online_status WHERE is_online = true`
const deactRows = await sql`SELECT id FROM mitras WHERE is_active = false`
const sessionCountRows = await sql`
  SELECT mitra_id, COUNT(*)::int AS c FROM chat_sessions
  WHERE mitra_id IS NOT NULL AND status IN ('active', 'pending_payment')
  GROUP BY mitra_id
`

// MULTI/EXEC so readers never observe the sets half-rebuilt
const pipe = valkey.multi()
pipe.del('mitras:online', 'mitras:deactivated')
if (onlineRows.length) {
  pipe.sadd('mitras:online', ...onlineRows.map(r => r.mitra_id))
  // Seed heartbeat timestamps to NOW so the first sweep doesn't mass-offline
  // currently-online mitras. They'll refresh on their next ping anyway.
  const now = new Date().toISOString()
  for (const r of onlineRows) pipe.set(`mitra:heartbeat:${r.mitra_id}`, now)
}
if (deactRows.length) pipe.sadd('mitras:deactivated', ...deactRows.map(r => r.id))
for (const r of sessionCountRows) pipe.set(`mitra:capacity:${r.mitra_id}`, r.c)
await pipe.exec()
```
### ioredis reconnect
Listen to the ioredis `'ready'` event (fires on initial connect AND each reconnect). Re-run the seed.
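The reconnect hook could be registered along these lines. This is a sketch: `registerReseedOnReady` is a hypothetical name, `seedFromPostgres` is assumed to be an async function, and the in-flight guard against overlapping seeds is an addition for illustration:

```js
// Sketch: re-run the seed on every 'ready' event, without overlapping runs.
const registerReseedOnReady = (valkey, seedFromPostgres, log = console) => {
  let inFlight = null
  valkey.on('ready', () => {
    if (inFlight) return // a seed is already running; skip this event
    inFlight = seedFromPostgres()
      .catch((err) => log.warn({ err }, '[valkey] reseed failed'))
      .finally(() => { inFlight = null })
  })
}
```

A failed reseed is only logged here, on the assumption that the periodic reconciliation sweep will retry it.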
### Periodic reconciliation sweeper
`VALKEY_ONLINE_MIRROR_SWEEP_SECONDS` env, default 300. Runs the seed (idempotent — `DEL` + `SADD` + `SET` lands the same state). Belt-and-braces against drift from failed best-effort writes, out-of-band Postgres mutations, Valkey eviction.
## Three sweeps, three cadences (summary)
| Sweep | Purpose | Cadence env | Default | Reads from | Writes to |
|---|---|---|---|---|---|
| Auto-offline | Detect stale-heartbeat mitras → flip offline | `MITRA_AUTO_OFFLINE_SWEEP_SECONDS` | 30 | Valkey | Postgres + Valkey |
| Heartbeat mirror | Persist Valkey heartbeats to Postgres for forensics/backup | `HEARTBEAT_MIRROR_INTERVAL_SECONDS` | 60 | Valkey | Postgres |
| Reconciliation | Heal Valkey/Postgres drift | `VALKEY_ONLINE_MIRROR_SWEEP_SECONDS` | 300 | Postgres | Valkey |
All three run on every backend instance independently. All idempotent. No leader election required.
## Multi-instance safety
Cloud Run runs N instances. Each instance:
- Writes both stores on its own mutations. Atomic Valkey ops (`SADD` / `SREM` / `INCR` / `DECR` / `SET`) — no cross-instance coordination needed.
- Runs all three sweeps independently. Redundant but idempotent.
- Recompute-on-read for blast eligibility — no stale aggregate to invalidate.
The `mitra:capacity:<id>` counter is the most race-sensitive: `INCR` / `DECR` are atomic but a session-state change must consistently fire exactly one increment and one decrement over its lifetime. The reconciliation sweep recomputes from `chat_sessions` and resets the counter, healing any drift.
## Failure mode summary
| | Behavior |
|---|---|
| Valkey unreachable on read | Fall back to Postgres JOIN query |
| Valkey unreachable on write | Log + continue. Reconciliation sweep heals later. |
| Postgres unreachable on heartbeat mirror | Skip this cycle. Next cycle writes the latest. |
| Auto-offline sweep can't reach Valkey | Skip this tick. Mitras stay "online" until Valkey comes back + heartbeat ages out. |
| Valkey crash (catastrophic) | Backend reconnects → reseed from Postgres. Worst case: ≤60s of `last_heartbeat_at` forensics lost. |
| Backend crash | Other instances keep running. A new instance reseeds on startup. |
## Files touched
| File | Change |
|---|---|
| `backend/src/plugins/valkey.js` | Add wrappers: `sadd`, `srem`, `sismember`, `smembers`, `sdiff`, `scard`, `set`, `get`, `del`, `incr`, `decr`, `pipeline`/`multi` + `'ready'` reconnect hook |
| `backend/src/services/config.service.js` | Add `getMitraAutoOfflineSweepSeconds`, `getHeartbeatMirrorIntervalSeconds`, `getValkeyOnlineMirrorSweepSeconds` getters |
| `backend/src/services/mitra-status.service.js` | Major rewrite (see [§ Read path](#read-path-computing-available-for-blast) and [§ Write paths](#write-paths)). Add `incrementCapacity`, `decrementCapacity`, `mirrorHeartbeatsToPostgres`, `seedFromPostgres` |
| `backend/src/services/mitra.service.js` | Wrap `updateMitraStatus` with `SADD`/`SREM mitras:deactivated` |
| `backend/src/services/pairing.service.js` | Rewrite `findAvailableMitras` as Valkey-driven (Postgres fallback). `INCR mitra:capacity` on session accept. |
| `backend/src/services/closure.service.js` | `DECR mitra:capacity` on session end/expire/cancel |
| `backend/src/services/session.service.js` | `DECR` + `INCR` pair on reroute |
| `backend/src/services/dashboard.service.js` | `SCARD mitras:online` for online count |
| `backend/src/routes/public/mitra.status.routes.js` | Replace `resolveMitra` on `POST /heartbeat` with a Valkey `SISMEMBER mitras:deactivated` check (keep `resolveMitra` on `/online`, `/offline`, `GET /`) |
| `backend/src/server.js` | Call `seedFromPostgres` on startup (before listener binds); replace hardcoded 30_000 setInterval with env-driven cadence; register heartbeat mirror + reconciliation sweep intervals |
| `backend/.env.example` | Document `MITRA_AUTO_OFFLINE_SWEEP_SECONDS`, `HEARTBEAT_MIRROR_INTERVAL_SECONDS`, `VALKEY_ONLINE_MIRROR_SWEEP_SECONDS` |
| `backend/test/services/mitra-status.service.test.js` | Add tests (see [§ Test plan](#test-plan)) |
| `backend/test/services/pairing.service.test.js` | Update for Valkey-driven `findAvailableMitras` |
| `backend/test/helpers/valkey.js` (new if absent) | Test helper for clean-slate Valkey state per test |
**Estimated touch:** ~400 LOC + ~200 LOC tests. ~2 days focused work.
## Test plan
### Unit
1. Mock Valkey; verify each writer calls Postgres → Valkey in correct order, with the seed-heartbeat-on-setOnline and DEL-on-setOffline.
2. Verify reader fallback path runs when Valkey ops throw.
3. Verify auto-offline sweep aborts entirely when Valkey ops throw (does NOT mass-offline via Postgres-only path).
4. Verify capacity counter never goes negative (Math.max guard).
### Integration (real Valkey + Postgres)
1. **Seed correctness:** insert N online rows + M deactivated + session counts in DB; run startup seed; verify all four Valkey structures match.
2. **Heartbeat refresh:** `setHeartbeat()` → verify Valkey value updates; check that the periodic mirror writes Postgres `last_heartbeat_at` within one mirror cycle.
3. **Auto-offline:** insert online mitra, manually expire heartbeat by setting timestamp in past, run sweep, verify Postgres `is_online=false` + Valkey `SREM` + `DEL` heartbeat key.
4. **Capacity lifecycle:** simulate session accept → end across multiple mitras; verify counter increments/decrements; verify reroute moves count from A to B atomically.
5. **Restart resilience:** seed state, simulate Valkey restart (FLUSHALL), trigger reconnect handler, verify all four structures reseed correctly.
6. **Reconciliation:** corrupt Valkey (random SADD of non-existent mitra, wrong capacity counter, missing entries); run reconciliation sweep; verify convergence to Postgres state.
7. **Fallback:** disable Valkey mid-test; verify `findAvailableMitras` falls back to Postgres JOIN query and returns sensible results.
### Regression
- The 90 of 92 existing backend tests that pass today should still pass.
- Maestro flows for pairing (ts-customer-*) should pass unchanged.
## Decisions (locked 2026-05-25)
1. **`revokeAllAuthSessions(mitraId)` added to `updateMitraStatus`** in the same PR. Bounds the deactivation gap to access-token TTL across all mitra routes (not just heartbeat).
2. **Prod Valkey: Memorystore for Valkey, Standard tier** (HA with replica, smallest available capacity ~1GB). Built-in replication keeps heartbeat timestamps live across failover. Staging/dev can run Basic tier — the reseed-from-Postgres flow handles cold-cache restarts correctly either way.
3. **Keep `last_heartbeat_at` column.** Written by the 60s batched mirror; remains available for operator forensics ("when was X last seen?"). Drop only if a future audit confirms no consumer reads it.
## Future phases (deferred)
- **Heartbeat → Valkey TTL with keyspace notifications.** Replace timestamp-comparison sweep with `notify-keyspace-events Ex` → instant detection of expired heartbeats. Requires Memorystore config change + a subscriber. Defer until 30s detection lag is the visible bottleneck.
- **Leader-elected mirror/sweep.** Use a Valkey-NX lease so only one instance runs each background job. ~15 LOC each. Defer until the redundant work shows up in metrics.