Mitra-availability state (online flag, deactivated flag, per-mitra session count, heartbeat liveness) mirrored into Valkey so the customer beacon + pairing blast + dashboard counts no longer hit Postgres on the hot path. Postgres remains the durable source of truth; Valkey state is fully derivable via seedFromPostgres on startup + reconnect.

Schema
- mitras:online SET — mirror of is_online
- mitras:deactivated SET — mirror of is_active=false
- mitra:capacity:<id> STRING — active+pending_payment session count
- mitra:heartbeat:<id> STRING — ISO timestamp of last ping
- availability:snapshot JSON — beacon cache, TTL 10s, cluster-shared

Write paths (Postgres first, best-effort Valkey)
- setOnline/setOffline mirror SADD/SREM + heartbeat SET/DEL
- updateMitraStatus mirrors mitras:deactivated AND revokes auth_sessions on deactivate (bounds the "ghost online" window to access-token TTL)
- heartbeat is Valkey-only on the hot path; the per-ping Postgres UPDATE on last_heartbeat_at is eliminated (was 1,200 ops/min at prod scale)
- chat_session lifecycle (accept/end/reroute/extension/expiry) calls recomputeCapacityForMitra after each UPDATE — derive-from-truth avoids the bookkeeping risk of per-transition INCR/DECR

Read paths (Valkey-first, Postgres fallback on Valkey error)
- isMitraReachable: SISMEMBER mitras:online + heartbeat freshness
- findAvailableMitras: SDIFF + pipelined GETs, filter by capacity + heartbeat
- countAvailableMitrasFromCache: Valkey-driven, cached cluster-wide 10s TTL
- dashboard online count: SCARD
- Each reader wraps Valkey ops in try/catch → Postgres fallback on outage

Heartbeat path on /api/mitra/status/heartbeat
- resolveMitra preHandler replaced with heartbeatGuard: SISMEMBER on mitras:deactivated (~0 DB hits per ping). Falls back to full DB resolveMitra if Valkey is unreachable so a Valkey outage doesn't silently accept heartbeats from deactivated mitras.

Three sweeps, env-configurable cadences
- MITRA_AUTO_OFFLINE_SWEEP_SECONDS (30) — Valkey-driven stale detection
- HEARTBEAT_MIRROR_INTERVAL_SECONDS (60) — batched UPSERT writes Valkey timestamps to Postgres last_heartbeat_at via UNNEST (1 statement per cycle, idempotent across instances)
- VALKEY_ONLINE_MIRROR_SWEEP_SECONDS (300) — periodic reseed heals drift

Startup
- restoreActiveTimers → seedFromPostgres → bind listeners
- onValkeyReady re-runs the seed on every reconnect (cold start + reseed on Valkey restart, no manual intervention)

Failure semantics
- Read fallback: every Valkey read wrapped, falls back to existing Postgres JOIN query — system stays correct during Valkey outage, performance degrades not breaks
- Write best-effort: Postgres write commits before Valkey is touched; Valkey errors log + continue; reconciliation sweep heals drift
- Auto-offline sweep aborts entirely on Valkey error (does NOT mass-offline via Postgres scan during Valkey hiccup)

Tests
- New: 32 integration tests in mitra-status.valkey-mirror.test.js covering seed, write-through, fallbacks, capacity lifecycle, auto-offline sweep, heartbeat mirror, deactivation flow, beacon cache
- Updated: fixtures.js seeds Valkey alongside Postgres when isOnline=true
- Updated: helpers/db.js resetDb also flushes test Valkey
- Fixed 2 pre-existing session-timer flakes (string IDs failed uuid parse; vi.advanceTimersByTimeAsync raced real Postgres I/O)
- All 124/124 backend tests pass (was 90/92)

Docs
- requirement/valkey-online-mirror-plan.md — canonical plan
- requirement/valkey-online-mirror-testing.md — manual E2E checklist
- requirement/deployment.md — infra + Valkey persistence guidance for prod (Memorystore Standard tier recommended; migration from self-hosted Valkey is zero-downtime via reseed-from-Postgres)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Valkey mirror for mitra availability — plan
Status: approved (2026-05-25). Open issue: heartbeat auth preHandler (see § Open issues).
Created: 2026-05-25
Owner: Ramadhan
Goal
Move the read path for "is this mitra available to blast?" entirely into Valkey at production scale (hundreds of online mitras, thousands of customers polling). Eliminate per-heartbeat Postgres writes. Keep Postgres as the durable source of truth via either real-time mirroring (writes that already exist) or periodic batched mirroring (heartbeats).
North star
- Postgres = durable source of truth. Every fact lives there.
- Valkey = read path + ephemeral-write target. Mirrors Postgres state; reads compute availability from primitive signals.
- Valkey unreachable on a read: fall back to the existing Postgres JOIN query. Outage degrades performance, never breaks pairing.
- Valkey unreachable on a write: log + continue. Reconciliation sweep heals drift.
- Valkey restart / cold start: reseed from Postgres before serving traffic.
Schema
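Four Valkey structures. None has a TTL: heartbeat freshness is computed from the stored timestamp by readers and the sweep, not by key expiry. (The beacon snapshot cache described later is a separate key with a 10s TTL.)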
| Key | Type | Value / members | Updated by |
|---|---|---|---|
| `mitras:online` | SET | mitra UUIDs | `setOnline` SADD; `setOffline` SREM; sweep SREM; bulk-SREM on deactivate |
| `mitras:deactivated` | SET | mitra UUIDs | `updateMitraStatus(is_active=false)` SADD; `updateMitraStatus(is_active=true)` SREM |
| `mitra:capacity:<id>` | STRING (integer) | count of active + pending_payment sessions assigned to this mitra | INCR on session accept; DECR on session end/expire/cancel; DECR/INCR pair on reroute |
| `mitra:heartbeat:<id>` | STRING | ISO 8601 timestamp of last heartbeat | heartbeat handler SET; `setOnline` SET (seed); `setOffline` DEL |
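Writers, readers, the sweeps and the seed all spell these key names, so one way to keep them from drifting apart is a single module that builds them. A minimal sketch; the `keys` object and its property names are hypothetical, not existing code:

```js
// Hypothetical helper: the single place that spells every key in the schema table.
const keys = Object.freeze({
  online: 'mitras:online',
  deactivated: 'mitras:deactivated',
  capacity: (mitraId) => `mitra:capacity:${mitraId}`,
  heartbeat: (mitraId) => `mitra:heartbeat:${mitraId}`,
})
```

Callers would then write `valkey.get(keys.heartbeat(id))` instead of repeating the template string.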
Postgres mirror columns (durable):
- `mitra_online_status.is_online` — written by `setOnline` / `setOffline` / sweep. Already in schema.
- `mitra_online_status.last_heartbeat_at` — written by the batched heartbeat mirror every 60s (NOT per-ping). Already in schema.
- `mitras.is_active` — written by `updateMitraStatus`. Already in schema.
- `chat_sessions.status` + `mitra_id` — already source-of-truth for session counts.
Read path (computing "available for blast")
All reads compute on the fly from Valkey primitives. No memoized mitras:available set.
```js
// Assumes module-scope `valkey` (ioredis client), `getMaxCustomersPerMitra` and `getMitraPingConfig`.
const findAvailableMitras = async () => {
  // 1. online minus deactivated
  const candidates = await valkey.sdiff('mitras:online', 'mitras:deactivated')
  if (!candidates.length) return []
  // 2. fetch capacity + heartbeat for each candidate (pipelined: 1 roundtrip)
  const pipe = valkey.pipeline()
  for (const id of candidates) {
    pipe.get(`mitra:capacity:${id}`)
    pipe.get(`mitra:heartbeat:${id}`)
  }
  const results = await pipe.exec()
  // ioredis reports per-command failures as [err, value] entries instead of
  // rejecting; rethrow so the caller's Postgres fallback runs.
  const failed = results.find(([err]) => err)
  if (failed) throw failed[0]
  // 3. filter by capacity + heartbeat freshness
  const { max_customers_per_mitra } = await getMaxCustomersPerMitra()
  const { stale_after_seconds } = await getMitraPingConfig()
  const cutoff = Date.now() - stale_after_seconds * 1000
  const eligible = []
  for (let i = 0; i < candidates.length; i++) {
    const count = Number(results[i * 2][1] ?? 0)
    const heartbeatAt = results[i * 2 + 1][1]
    if (count >= max_customers_per_mitra) continue
    if (!heartbeatAt || Date.parse(heartbeatAt) < cutoff) continue
    eligible.push({ id: candidates[i], active_session_count: count })
  }
  return eligible
}
```
Cost at prod scale (300 online): 1 SDIFF + 600 GET (pipelined) = ~1ms. Negligible.
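Once the pipeline has returned, step 3 does no I/O, so it can be pulled out as a pure function and unit-tested without Valkey. A sketch; `filterEligible` and its options object are hypothetical names, and `results` is assumed to be the `[err, value]` array that an ioredis `pipeline().exec()` resolves to:

```js
// Hypothetical pure version of step 3. `results` holds two [err, value] entries
// per candidate, in queue order: capacity GET, then heartbeat GET.
const filterEligible = (candidates, results, { maxPerMitra, staleAfterSeconds, nowMs }) => {
  const cutoff = nowMs - staleAfterSeconds * 1000
  const eligible = []
  candidates.forEach((id, i) => {
    const count = Number(results[i * 2][1] ?? 0) // missing counter => no sessions
    const heartbeatAt = results[i * 2 + 1][1]
    if (count >= maxPerMitra) return // at capacity
    if (!heartbeatAt || Date.parse(heartbeatAt) < cutoff) return // stale or never pinged
    eligible.push({ id, active_session_count: count })
  })
  return eligible
}

// Example: 'a' is eligible, 'b' is full, 'c' has a stale heartbeat.
const sample = filterEligible(
  ['a', 'b', 'c'],
  [
    [null, '1'], [null, '2026-05-25T09:59:50Z'],
    [null, '3'], [null, '2026-05-25T09:59:55Z'],
    [null, null], [null, '2026-05-25T09:50:00Z'],
  ],
  { maxPerMitra: 3, staleAfterSeconds: 60, nowMs: Date.parse('2026-05-25T10:00:00Z') },
)
// sample → [{ id: 'a', active_session_count: 1 }]
```

Passing `nowMs` in, rather than calling `Date.now()` inside, is what makes the staleness cutoff deterministic in tests.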
Read sites — what changes
| Caller | Today | After |
|---|---|---|
| `isMitraReachable(mitraId)` (mitra-status.service.js:215) | `SELECT is_online ...` | `SISMEMBER mitras:online` + check `mitra:heartbeat:<id>` freshness |
| `findAvailableMitras` (pairing.service.js:79) | full JOIN with chat_sessions | Valkey-driven (above) |
| `countAvailableMitrasFromCache` (mitra-status.service.js:181) | full JOIN, cached in-process 10s | Valkey-driven, cached in Valkey 10s (shared cluster-wide) |
| Dashboard online count (dashboard.service.js:16) | `COUNT(*) WHERE is_online=true` | `SCARD mitras:online` |
| `getStatus(mitraId)` (mitra's own status poll) | full SELECT | Hybrid: SISMEMBER for is_online, Postgres for timestamps |
| `getOnlineMitras` (CC dashboard) | full JOIN with display_name + active_session_count | unchanged — low volume, joins make sense in SQL |
Reader fallback
Every Valkey read is wrapped:
```js
try {
  // ioredis returns 1/0 for SISMEMBER; normalize to a boolean like the fallback
  return (await valkey.sismember('mitras:online', mitraId)) === 1
} catch (err) {
  log.warn({ err }, '[mitras:online] valkey unreachable, falling back to DB')
  const [row] = await sql`SELECT is_online FROM mitra_online_status WHERE mitra_id = ${mitraId}`
  return Boolean(row?.is_online)
}
```
For findAvailableMitras, the fallback is the existing Postgres JOIN query.
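Every reader repeats the same try/catch shape, so it can be factored into one helper. A sketch only; `withValkeyFallback` and its `label` argument are hypothetical, and `log` is assumed to be a pino-style logger with `warn(obj, msg)`, as in the snippet above:

```js
// Hypothetical helper: run the Valkey read; on any error, log and run the Postgres read.
const withValkeyFallback = async (label, fromValkey, fromPostgres, log) => {
  try {
    return await fromValkey()
  } catch (err) {
    log.warn({ err }, `[${label}] valkey unreachable, falling back to DB`)
    return fromPostgres()
  }
}
```

The `await` inside `try` matters: without it a rejected Valkey promise would escape the `catch` and the fallback would never run. A Postgres error in the fallback still propagates to the caller.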
Write paths
Each Postgres write is followed by a Valkey write. Postgres always commits first. Valkey failures log + continue.
Online / offline (mitra app toggle)
| Action | Postgres | Valkey |
|---|---|---|
| `setOnline(mitraId)` | `UPDATE mitra_online_status SET is_online=true, last_online_at=NOW(), last_heartbeat_at=NOW()` | `SADD mitras:online <id>` + `SET mitra:heartbeat:<id> <now-iso>` |
| `setOffline(mitraId)` | `UPDATE mitra_online_status SET is_online=false, last_offline_at=NOW()` | `SREM mitras:online <id>` + `DEL mitra:heartbeat:<id>` |
The SET heartbeat on setOnline is critical: without it, the very next sweep would mark the freshly-online mitra stale (their first real heartbeat is up to heartbeat_cadence_seconds away).
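The ordering rule (Postgres commits, then a best-effort Valkey write) can be sketched as below. Assumptions: the injected `{ db, valkey, log }` shape, the `setOnlineSketch` name and `db.markOnline` (standing in for the UPDATE above) are all hypothetical, and wrapping SADD + SET in a MULTI so they land together is a choice made here, not something the plan specifies:

```js
// Hypothetical sketch of setOnline's write-through order.
const setOnlineSketch = async ({ db, valkey, log }, mitraId, now = new Date()) => {
  // 1. Durable write first; if it throws, Valkey is never touched.
  await db.markOnline(mitraId)
  // 2. Best-effort mirror: membership + seeded heartbeat, applied together.
  try {
    await valkey
      .multi()
      .sadd('mitras:online', mitraId)
      .set(`mitra:heartbeat:${mitraId}`, now.toISOString())
      .exec()
  } catch (err) {
    // Postgres already committed; the reconciliation sweep heals the gap.
    log.warn({ err, mitraId }, '[setOnline] valkey mirror failed')
  }
}
```

`setOffline` would follow the same shape with SREM + DEL.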
Admin activate / deactivate (CC)
updateMitraStatus (mitra.service.js:32):
| Action | Postgres | Valkey |
|---|---|---|
| activate (`is_active=true`) | `UPDATE mitras SET is_active=true` | `SREM mitras:deactivated <id>` |
| deactivate (`is_active=false`) | `UPDATE mitras SET is_active=false` | `SADD mitras:deactivated <id>` |
Mitra heartbeat (hot path)
Heartbeat handler (mitra-status.service.js:79) is rewritten to operate entirely against Valkey:
1. `authenticate` plugin verifies JWT signature + expiry + `userType === 'mitra'` (no DB).
2. `SISMEMBER mitras:deactivated <userId>` → if true, return `403`.
3. `SET mitra:heartbeat:<userId> <now-iso>`.

Steps 2 + 3 pipelined into one Valkey roundtrip.
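A sketch of steps 2 and 3 as one pipelined roundtrip, assuming an ioredis-style client whose `exec()` resolves to `[[err, value], ...]`. `handleHeartbeat` and its `'ok'` / `'forbidden'` return values are hypothetical. Note that a pipeline does not make the SET conditional on the SISMEMBER result, so a deactivated mitra's ping still writes its heartbeat key; that alone does not make the mitra eligible, because `findAvailableMitras` works from `mitras:online` minus `mitras:deactivated`:

```js
// Hypothetical sketch of the heartbeat hot path: one Valkey roundtrip, no DB.
const handleHeartbeat = async (valkey, mitraId, now = new Date()) => {
  const results = await valkey
    .pipeline()
    .sismember('mitras:deactivated', mitraId)
    .set(`mitra:heartbeat:${mitraId}`, now.toISOString())
    .exec()
  // Surface Valkey errors so the route can fall back to the DB-backed check.
  const failed = results.find(([err]) => err)
  if (failed) throw failed[0]
  const isDeactivated = results[0][1] === 1
  return isDeactivated ? 'forbidden' : 'ok' // route maps 'forbidden' to 403
}
```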
| | Today | After |
|---|---|---|
| Auth check | `getMitraById` SELECT + `is_active` check (1 DB read) | `SISMEMBER mitras:deactivated` (Valkey only) |
| Body | `UPDATE mitra_online_status SET last_heartbeat_at=NOW() WHERE id=? AND is_online=true` (1 DB write) | `SET mitra:heartbeat:<id>` (Valkey only) |
| Postgres ops per ping | 2 | 0 |
| Valkey ops per ping | 0 | 2 (one pipelined roundtrip) |
| At prod scale (300 online × 2 pings/min) | 1,200 DB ops/min | 1,200 Valkey ops/min, 0 DB ops/min |
mitras:deactivated is already maintained on every CC updateMitraStatus call (see § Admin activate / deactivate) so deactivation propagates to the heartbeat path within one Valkey write window (~ms).
Heartbeat → Postgres batched mirror
A background job runs every HEARTBEAT_MIRROR_INTERVAL_SECONDS (env, default 60). One SQL statement, touches all online mitras at once:
```js
const mirrorHeartbeatsToPostgres = async () => {
  const onlineIds = await valkey.smembers('mitras:online')
  if (!onlineIds.length) return
  const pipe = valkey.pipeline()
  for (const id of onlineIds) pipe.get(`mitra:heartbeat:${id}`)
  const results = await pipe.exec()
  const pairs = []
  for (let i = 0; i < onlineIds.length; i++) {
    const [err, ts] = results[i]
    if (err) throw err // abort this cycle; the next one writes the latest value
    if (ts) pairs.push({ mitra_id: onlineIds[i], ts })
  }
  if (!pairs.length) return
  // Only move last_heartbeat_at forward, so an instance that read Valkey earlier
  // cannot overwrite a newer timestamp written by another instance.
  await sql`
    UPDATE mitra_online_status m
    SET last_heartbeat_at = u.ts::timestamptz, updated_at = NOW()
    FROM (SELECT * FROM UNNEST(
      ${sql.array(pairs.map(p => p.mitra_id))}::uuid[],
      ${sql.array(pairs.map(p => p.ts))}::text[]
    ) AS t(mitra_id, ts)) u
    WHERE m.mitra_id = u.mitra_id
      AND (m.last_heartbeat_at IS NULL OR m.last_heartbeat_at < u.ts::timestamptz)
  `
}
```
Cost at prod scale (300 online): 1 batched UPDATE per minute per instance. At 3 instances that is 3 statements/min cluster-wide (redundant but idempotent; latest timestamp wins), versus ~600 per-ping heartbeat UPDATEs/min today (300 online × 2 pings/min): roughly 200× fewer Postgres write statements.
No leader election initially; the redundant statements are cheap at this rate. If WAL pressure becomes visible, add a Valkey-NX lease leader-elect (~15 LOC); deferred.
Session capacity counter
Touch all four services that mutate session state:
| Trigger | File | Postgres write (existing) | Valkey write |
|---|---|---|---|
| Mitra accepts a chat (status → active, mitra_id set) | pairing.service.js accept handler | `UPDATE chat_sessions SET status='active', mitra_id=?` | `INCR mitra:capacity:<mitra_id>` |
| Session ends (status → ended / expired / cancelled) | closure.service.js, expiry sweepers | `UPDATE chat_sessions SET status=...` | `DECR mitra:capacity:<mitra_id>` |
| Session reroute (mitra A → mitra B) | session.service.js | `UPDATE chat_sessions SET mitra_id=B` | `DECR mitra:capacity:A` + `INCR mitra:capacity:B` in one MULTI (a plain pipeline is not atomic) |
What counts as occupying capacity: sessions in ACTIVE or PENDING_PAYMENT status with a non-null mitra_id. This matches the existing findAvailableMitras predicate. Extension flow (active → pending_payment → active) does NOT change capacity — mitra stays occupied throughout.
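Negative-counter guard: floor the counter at zero (the equivalent of Math.max(0, count - 1)) inside a Lua script, which Valkey executes atomically. A client-side GET + SET alternative is racy when several instances decrement the same key. The guard prevents drift if a DECR fires without a prior INCR (e.g. legacy session without Valkey mirror). The reconciliation sweep recomputes from Postgres anyway.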
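A sketch of the server-side guard, assuming ioredis's `eval(script, numKeys, ...keys)`. Because the script runs atomically on Valkey, concurrent decrements from several instances cannot race past zero. `decrementCapacity` matches the name listed under Files touched, but this body is illustrative, not the implementation:

```js
// Lua: read the counter (missing key => 0); DECR only when positive, otherwise pin at 0.
// redis.call('GET') returns false for a missing key, hence the `or '0'`.
const DECR_FLOOR_ZERO = `
local current = tonumber(redis.call('GET', KEYS[1]) or '0')
if current <= 0 then
  redis.call('SET', KEYS[1], '0')
  return 0
end
return redis.call('DECR', KEYS[1])
`

// Resolves to the counter value after the guarded decrement.
const decrementCapacity = (valkey, mitraId) =>
  valkey.eval(DECR_FLOOR_ZERO, 1, `mitra:capacity:${mitraId}`)
```

ioredis's `defineCommand` could cache the script by SHA instead of resending it on every call; that is an optimization, not a correctness requirement.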
Auto-offline sweep — Valkey-driven
Replaces the current Postgres seq-scan with a Valkey computation. Runs every MITRA_AUTO_OFFLINE_SWEEP_SECONDS (env, default 30):
```js
const autoOfflineStaleMitras = async () => {
  const { stale_after_seconds, require_ping } = await getMitraPingConfig()
  if (!require_ping) return 0
  const onlineIds = await valkey.smembers('mitras:online')
  if (!onlineIds.length) return 0
  const pipe = valkey.pipeline()
  for (const id of onlineIds) pipe.get(`mitra:heartbeat:${id}`)
  const results = await pipe.exec()
  const cutoff = Date.now() - stale_after_seconds * 1000
  const stale = []
  for (let i = 0; i < onlineIds.length; i++) {
    const [err, ts] = results[i]
    // A failed GET must abort the tick, not read as "no heartbeat"
    // (which would mass-offline mitras during a Valkey hiccup).
    if (err) throw err
    if (!ts || Date.parse(ts) < cutoff) stale.push(onlineIds[i])
  }
  if (!stale.length) return 0
  // Postgres: bulk flip is_online=false; RETURNING limits logging to rows this
  // instance actually flipped (other instances may run the same sweep).
  const flipped = await sql`
    UPDATE mitra_online_status
    SET is_online = false, last_offline_at = NOW(), updated_at = NOW()
    WHERE mitra_id = ANY(${sql.array(stale)}::uuid[]) AND is_online = true
    RETURNING mitra_id
  `
  // Log rows: one multi-row INSERT instead of one statement per mitra
  if (flipped.length) {
    const logRows = flipped.map(r => ({ mitra_id: r.mitra_id, status: 'offline' }))
    await sql`INSERT INTO mitra_online_logs ${sql(logRows, 'mitra_id', 'status')}`
  }
  // Valkey: bulk SREM + DEL heartbeat keys
  const cleanup = valkey.pipeline()
  cleanup.srem('mitras:online', ...stale)
  for (const id of stale) cleanup.del(`mitra:heartbeat:${id}`)
  await cleanup.exec()
  invalidateAvailabilityCache()
  return stale.length
}
```
Stale threshold: stale_after_seconds is read from getMitraPingConfig() (config.service.js) — the existing CC-tunable knob. Not a new env.
Sweep cadence: MITRA_AUTO_OFFLINE_SWEEP_SECONDS env, default 30 (matches current hardcoded setInterval).
Failure semantics: if any Valkey op throws, the entire sweep aborts for this tick. The next tick retries. We never want to mass-offline mitras due to a transient Valkey hiccup.
Shared beacon snapshot cache
Replace the in-process availabilityCache (mitra-status.service.js:14) with a Valkey GET/SETEX key. Even though reads are now sub-ms Valkey ops, this caps total Valkey query load at high beacon-poll rates (5,000 customers × 12/min = 60,000 polls/min → cache → 6 computations/min cluster-wide).
KEY: availability:snapshot
TYPE: string (JSON: {"available": bool, "count": number})
TTL: 10 seconds
config:invalidate pub/sub subscriber extended to DEL availability:snapshot on max_customers_per_mitra bust.
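A read-through sketch of the shared snapshot, assuming an ioredis-style client (`get`, `del`, and `set` with the `'EX'` option, equivalent to SETEX). `getAvailabilitySnapshot`, `invalidateSnapshot` and the injected `compute` function (standing in for the Valkey-driven count) are hypothetical, as is deriving `available` as `count > 0`:

```js
const SNAPSHOT_KEY = 'availability:snapshot'
const SNAPSHOT_TTL_SECONDS = 10

// Hypothetical read-through cache: roughly one compute per TTL window cluster-wide;
// instances that miss at the same moment each compute once.
const getAvailabilitySnapshot = async (valkey, compute) => {
  const cached = await valkey.get(SNAPSHOT_KEY)
  if (cached) return JSON.parse(cached)
  const count = await compute()
  const snapshot = { available: count > 0, count }
  await valkey.set(SNAPSHOT_KEY, JSON.stringify(snapshot), 'EX', SNAPSHOT_TTL_SECONDS)
  return snapshot
}

// Invalidation (e.g. from the config:invalidate subscriber) is a plain DEL.
const invalidateSnapshot = (valkey) => valkey.del(SNAPSHOT_KEY)
```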
Startup / reconnect / reseed
Three triggers reseed Valkey state from Postgres. All idempotent.
Backend startup
In server.js, after Valkey emits 'ready' and before the public listener binds:
```js
// seedFromPostgres (mitra-status.service.js), called from server.js
const seedFromPostgres = async () => {
  const onlineRows = await sql`SELECT mitra_id FROM mitra_online_status WHERE is_online = true`
  const deactRows = await sql`SELECT id FROM mitras WHERE is_active = false`
  const sessionCountRows = await sql`
    SELECT mitra_id, COUNT(*)::int AS c FROM chat_sessions
    WHERE mitra_id IS NOT NULL AND status IN ('active', 'pending_payment')
    GROUP BY mitra_id
  `
  const tx = valkey.multi()
  tx.del('mitras:online', 'mitras:deactivated')
  if (onlineRows.length) {
    tx.sadd('mitras:online', ...onlineRows.map(r => r.mitra_id))
    // Seed heartbeat timestamps to NOW so the first sweep doesn't mass-offline
    // currently-online mitras. They'll refresh on their next ping anyway.
    const now = new Date().toISOString()
    for (const r of onlineRows) tx.set(`mitra:heartbeat:${r.mitra_id}`, now)
  }
  if (deactRows.length) tx.sadd('mitras:deactivated', ...deactRows.map(r => r.id))
  // GROUP BY omits mitras with no occupying sessions; give online ones an explicit 0
  // so a drifted counter is reset rather than left in place.
  const counts = new Map(onlineRows.map(r => [r.mitra_id, 0]))
  for (const r of sessionCountRows) counts.set(r.mitra_id, r.c)
  for (const [id, c] of counts) tx.set(`mitra:capacity:${id}`, c)
  await tx.exec()
}
```
ioredis reconnect
Listen to the ioredis 'ready' event (fires on initial connect AND each reconnect). Re-run the seed.
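A sketch of the reconnect hook, assuming ioredis emits `'ready'` on the initial connect and after each reconnect. `onValkeyReady` matches the name in the commit message, but this body is illustrative; the single-flight guard, which stops a reconnect that lands mid-seed from starting a second overlapping seed, is an addition not specified in the plan:

```js
// Hypothetical sketch: re-run the seed each time the client reports 'ready'.
// Errors are logged, not rethrown; the periodic reconciliation sweep retries later.
const onValkeyReady = (valkey, seedFromPostgres, log) => {
  let inFlight = null
  const runSeed = () => {
    if (inFlight) return inFlight // coalesce 'ready' events that arrive mid-seed
    inFlight = seedFromPostgres()
      .catch((err) => log.error({ err }, '[valkey] reseed failed'))
      .finally(() => {
        inFlight = null
      })
    return inFlight
  }
  valkey.on('ready', runSeed)
  return runSeed
}
```

For the startup path, where the seed must finish before the listener binds, calling `seedFromPostgres` directly and letting a failure abort boot may be preferable to this log-and-continue handler.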
Periodic reconciliation sweeper
VALKEY_ONLINE_MIRROR_SWEEP_SECONDS env, default 300. Runs the seed (idempotent — DEL + SADD + SET lands the same state). Belt-and-braces against drift from failed best-effort writes, out-of-band Postgres mutations, Valkey eviction.
Three sweeps, three cadences (summary)
| Sweep | Purpose | Cadence env | Default (s) | Reads from | Writes to |
|---|---|---|---|---|---|
| Auto-offline | Detect stale-heartbeat mitras → flip offline | `MITRA_AUTO_OFFLINE_SWEEP_SECONDS` | 30 | Valkey | Postgres + Valkey |
| Heartbeat mirror | Persist Valkey heartbeats to Postgres for forensics/backup | `HEARTBEAT_MIRROR_INTERVAL_SECONDS` | 60 | Valkey | Postgres |
| Reconciliation | Heal Valkey/Postgres drift | `VALKEY_ONLINE_MIRROR_SWEEP_SECONDS` | 300 | Postgres | Valkey |
All three run on every backend instance independently. All idempotent. No leader election required.
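All three sweeps share one shape: read a cadence in seconds from the environment, then run an async job on an interval. A sketch under assumptions: `readSecondsEnv` and `startSweep` are hypothetical names (the plan's real getters live in config.service.js), and the overlap guard, which skips a tick while the previous run of the same job is still in flight on this instance, is an addition not specified above:

```js
// Parse a positive integer number of seconds from env, else use the default.
const readSecondsEnv = (name, fallback, env = process.env) => {
  const n = Number.parseInt(env[name] ?? '', 10)
  return Number.isFinite(n) && n > 0 ? n : fallback
}

// Run `job` every `seconds`; never overlap runs; log and continue on failure.
const startSweep = (name, job, seconds, log) => {
  let running = false
  const timer = setInterval(async () => {
    if (running) return // previous tick still in flight on this instance
    running = true
    try {
      await job()
    } catch (err) {
      log.warn({ err }, `[${name}] tick failed; retrying next tick`)
    } finally {
      running = false
    }
  }, seconds * 1000)
  timer.unref() // don't keep the process alive just for the sweep
  return () => clearInterval(timer)
}
```

Usage would look like `startSweep('auto-offline', autoOfflineStaleMitras, readSecondsEnv('MITRA_AUTO_OFFLINE_SWEEP_SECONDS', 30), log)`. Catching inside the tick matters for the auto-offline sweep: an aborted tick just logs, and the next interval retries.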
Multi-instance safety
Cloud Run runs N instances. Each instance:
- Writes both stores on its own mutations. Atomic Valkey ops (SADD/SREM/INCR/DECR/SET) — no cross-instance coordination needed.
- Runs all three sweeps independently. Redundant but idempotent.
- Recompute-on-read for blast eligibility — no stale aggregate to invalidate.
The mitra:capacity:<id> counter is the most race-sensitive: INCR / DECR are atomic but a session-state change must consistently fire exactly one increment and one decrement over its lifetime. The reconciliation sweep recomputes from chat_sessions and resets the counter, healing any drift.
Failure mode summary
| Failure | Behavior |
|---|---|
| Valkey unreachable on read | Fall back to Postgres JOIN query |
| Valkey unreachable on write | Log + continue. Reconciliation sweep heals later. |
| Postgres unreachable on heartbeat mirror | Skip this cycle. Next cycle writes the latest. |
| Auto-offline sweep can't reach Valkey | Skip this tick. Mitras stay "online" until Valkey comes back + heartbeat ages out. |
| Valkey crash (catastrophic) | Backend reconnects → reseed from Postgres. Worst case: ≤60s of last_heartbeat_at forensics lost. |
| Backend crash | Other instances keep running. New instances reseed on startup. |
Files touched
| File | Change |
|---|---|
| `backend/src/plugins/valkey.js` | Add wrappers: sadd, srem, sismember, smembers, sdiff, scard, set, get, del, incr, decr, pipeline/multi + `'ready'` reconnect hook |
| `backend/src/services/config.service.js` | Add `getMitraAutoOfflineSweepSeconds`, `getHeartbeatMirrorIntervalSeconds`, `getValkeyOnlineMirrorSweepSeconds` getters |
| `backend/src/services/mitra-status.service.js` | Major rewrite (see § Read path and § Write paths). Add `incrementCapacity`, `decrementCapacity`, `mirrorHeartbeatsToPostgres`, `seedFromPostgres` |
| `backend/src/services/mitra.service.js` | Wrap `updateMitraStatus` with SADD/SREM `mitras:deactivated` |
| `backend/src/services/pairing.service.js` | Rewrite `findAvailableMitras` as Valkey-driven (Postgres fallback). INCR `mitra:capacity` on session accept. |
| `backend/src/services/closure.service.js` | DECR `mitra:capacity` on session end/expire/cancel |
| `backend/src/services/session.service.js` | DECR + INCR pair on reroute |
| `backend/src/services/dashboard.service.js` | `SCARD mitras:online` for online count |
| `backend/src/routes/public/mitra.status.routes.js` | Replace `resolveMitra` on `POST /heartbeat` with a Valkey `SISMEMBER mitras:deactivated` check (keep `resolveMitra` on `/online`, `/offline`, `GET /`) |
| `backend/src/server.js` | Call `seedFromPostgres` on startup (before listener binds); replace hardcoded `30_000` `setInterval` with env-driven cadence; register heartbeat mirror + reconciliation sweep intervals |
| `backend/.env.example` | Document `MITRA_AUTO_OFFLINE_SWEEP_SECONDS`, `HEARTBEAT_MIRROR_INTERVAL_SECONDS`, `VALKEY_ONLINE_MIRROR_SWEEP_SECONDS` |
| `backend/test/services/mitra-status.service.test.js` | Add tests (see § Test plan) |
| `backend/test/services/pairing.service.test.js` | Update for Valkey-driven `findAvailableMitras` |
| `backend/test/helpers/valkey.js` (new if absent) | Test helper for clean-slate Valkey state per test |
Estimated touch: ~400 LOC + ~200 LOC tests. ~2 days focused work.
Test plan
Unit
- Mock Valkey; verify each writer calls Postgres → Valkey in correct order, with the seed-heartbeat-on-setOnline and DEL-on-setOffline.
- Verify reader fallback path runs when Valkey ops throw.
- Verify auto-offline sweep aborts entirely when Valkey ops throw (does NOT mass-offline via Postgres-only path).
- Verify capacity counter never goes negative (Math.max guard).
Integration (real Valkey + Postgres)
- Seed correctness: insert N online rows + M deactivated + session counts in DB; run startup seed; verify all four Valkey structures match.
- Heartbeat refresh: `setHeartbeat()` → verify Valkey value updates; check that the periodic mirror writes Postgres `last_heartbeat_at` within one mirror cycle.
- Auto-offline: insert online mitra, manually expire heartbeat by setting timestamp in past, run sweep, verify Postgres `is_online=false` + Valkey `SREM` + `DEL` heartbeat key.
- Capacity lifecycle: simulate session accept → end across multiple mitras; verify counter increments/decrements; verify reroute moves count from A to B atomically.
- Restart resilience: seed state, simulate Valkey restart (FLUSHALL), trigger reconnect handler, verify all four structures reseed correctly.
- Reconciliation: corrupt Valkey (random SADD of non-existent mitra, wrong capacity counter, missing entries); run reconciliation sweep; verify convergence to Postgres state.
- Fallback: disable Valkey mid-test; verify `findAvailableMitras` falls back to Postgres JOIN query and returns sensible results.
Regression
- All existing backend tests should still pass (baseline at time of writing: 90/92, the 2 failures being pre-existing).
- Maestro flows for pairing (ts-customer-*) should pass unchanged.
Decisions (locked 2026-05-25)
- `revokeAllAuthSessions(mitraId)` added to `updateMitraStatus` in the same PR. Bounds the deactivation gap to access-token TTL across all mitra routes (not just heartbeat).
- Prod Valkey: Memorystore for Valkey, Standard tier (HA with replica, smallest available capacity ~1GB). Built-in replication keeps heartbeat timestamps live across failover. Staging/dev can run Basic tier — the reseed-from-Postgres flow handles cold-cache restarts correctly either way.
- Keep `last_heartbeat_at` column. Written by the 60s batched mirror; remains available for operator forensics ("when was X last seen?"). Drop only if a future audit confirms no consumer reads it.
Future phases (deferred)
- Heartbeat → Valkey TTL with keyspace notifications. Replace timestamp-comparison sweep with `notify-keyspace-events Ex` → instant detection of expired heartbeats. Requires Memorystore config change + a subscriber. Defer until 30s detection lag is the visible bottleneck.
- Leader-elected mirror/sweep. Use a Valkey-NX lease so only one instance runs each background job. ~15 LOC each. Defer until the redundant work shows up in metrics.
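If leader election is ever needed, the lease could look like the sketch below. Assumptions: an ioredis-style `set(key, value, 'EX', seconds, 'NX')` that resolves to `'OK'` when the key was set and `null` otherwise; the `lease:<job>` key name and the `runIfLeader` helper are hypothetical. Each instance tries to take the lease before a tick, and only the winner runs the job:

```js
// Hypothetical lease-guarded runner. The lease expires on its own (EX), so a crashed
// leader blocks the job for at most `ttlSeconds`. The lease is not renewed, so a job
// that outlives the TTL could overlap with the next leader's run.
const runIfLeader = async (valkey, job, jobName, instanceId, ttlSeconds) => {
  const acquired = await valkey.set(`lease:${jobName}`, instanceId, 'EX', ttlSeconds, 'NX')
  if (acquired !== 'OK') return false // another instance holds the lease this window
  await job()
  return true
}
```

Setting `ttlSeconds` close to the job's interval yields roughly one run per window cluster-wide, which is the property the deferred item is after.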