Files
halobestie-clone/requirement/valkey-online-mirror-plan.md
Ramadhan Sjamsani 553dbac52f Phase 6: Valkey availability mirror — move read path off Postgres
Mitra-availability state (online flag, deactivated flag, per-mitra session
count, heartbeat liveness) mirrored into Valkey so the customer beacon
+ pairing blast + dashboard counts no longer hit Postgres on the hot path.
Postgres remains the durable source of truth; Valkey state is fully
derivable via seedFromPostgres on startup + reconnect.

Schema
- mitras:online           SET    — mirror of is_online
- mitras:deactivated      SET    — mirror of is_active=false
- mitra:capacity:<id>     STRING — active+pending_payment session count
- mitra💓<id>    STRING — ISO timestamp of last ping
- availability:snapshot   JSON   — beacon cache, TTL 10s, cluster-shared

Write paths (Postgres first, best-effort Valkey)
- setOnline/setOffline mirror SADD/SREM + heartbeat SET/DEL
- updateMitraStatus mirrors mitras:deactivated AND revokes auth_sessions
  on deactivate (bounds the "ghost online" window to access-token TTL)
- heartbeat is Valkey-only on the hot path; the per-ping Postgres UPDATE
  on last_heartbeat_at is eliminated (was 1,200 ops/min at prod scale)
- chat_session lifecycle (accept/end/reroute/extension/expiry) calls
  recomputeCapacityForMitra after each UPDATE — derive-from-truth avoids
  the bookkeeping risk of per-transition INCR/DECR

Read paths (Valkey-first, Postgres fallback on Valkey error)
- isMitraReachable: SISMEMBER mitras:online + heartbeat freshness
- findAvailableMitras: SDIFF + pipelined GETs, filter by capacity + heartbeat
- countAvailableMitrasFromCache: Valkey-driven, cached cluster-wide 10s TTL
- dashboard online count: SCARD
- Each reader wraps Valkey ops in try/catch → Postgres fallback on outage

Heartbeat path on /api/mitra/status/heartbeat
- resolveMitra preHandler replaced with heartbeatGuard: SISMEMBER on
  mitras:deactivated (~0 DB hits per ping). Falls back to full DB
  resolveMitra if Valkey is unreachable so a Valkey outage doesn't
  silently accept heartbeats from deactivated mitras.

Three sweeps, env-configurable cadences
- MITRA_AUTO_OFFLINE_SWEEP_SECONDS (30) — Valkey-driven stale detection
- HEARTBEAT_MIRROR_INTERVAL_SECONDS (60) — batched UPSERT writes
  Valkey timestamps to Postgres last_heartbeat_at via UNNEST (1 statement
  per cycle, idempotent across instances)
- VALKEY_ONLINE_MIRROR_SWEEP_SECONDS (300) — periodic reseed heals drift

Startup
- restoreActiveTimers → seedFromPostgres → bind listeners
- onValkeyReady re-runs the seed on every reconnect (cold start + reseed
  on Valkey restart, no manual intervention)

Failure semantics
- Read fallback: every Valkey read wrapped, falls back to existing
  Postgres JOIN query — system stays correct during Valkey outage,
  performance degrades not breaks
- Write best-effort: Postgres write commits before Valkey is touched;
  Valkey errors log + continue; reconciliation sweep heals drift
- Auto-offline sweep aborts entirely on Valkey error (does NOT mass-
  offline via Postgres scan during Valkey hiccup)

Tests
- New: 32 integration tests in mitra-status.valkey-mirror.test.js
  covering seed, write-through, fallbacks, capacity lifecycle,
  auto-offline sweep, heartbeat mirror, deactivation flow, beacon cache
- Updated: fixtures.js seeds Valkey alongside Postgres when isOnline=true
- Updated: helpers/db.js resetDb also flushes test Valkey
- Fixed 2 pre-existing session-timer flakes (string IDs failed uuid
  parse; vi.advanceTimersByTimeAsync raced real Postgres I/O)
- All 124/124 backend tests pass (was 90/92)

Docs
- requirement/valkey-online-mirror-plan.md — canonical plan
- requirement/valkey-online-mirror-testing.md — manual E2E checklist
- requirement/deployment.md — infra + Valkey persistence guidance for
  prod (Memorystore Standard tier recommended; migration from
  self-hosted Valkey is zero-downtime via reseed-from-Postgres)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 18:07:55 +08:00

21 KiB
Raw Permalink Blame History

Valkey mirror for mitra availability — plan

Status: approved (2026-05-25). Open issue: heartbeat auth preHandler (see § Open issues). Created: 2026-05-25 Owner: Ramadhan

Goal

Move the read path for "is this mitra available to blast?" entirely into Valkey at production scale (hundreds of online mitras, thousands of customers polling). Eliminate per-heartbeat Postgres writes. Keep Postgres as the durable source of truth via either real-time mirroring (writes that already exist) or periodic batched mirroring (heartbeats).

North star

  • Postgres = durable source of truth. Every fact lives there.
  • Valkey = read path + ephemeral-write target. Mirrors Postgres state; reads compute availability from primitive signals.
  • Valkey unreachable on a read: fall back to the existing Postgres JOIN query. Outage degrades performance, never breaks pairing.
  • Valkey unreachable on a write: log + continue. Reconciliation sweep heals drift.
  • Valkey restart / cold start: reseed from Postgres before serving traffic.

Schema

Four Valkey structures. None has a TTL (heartbeat freshness is computed by sweep, not by Redis expiry):

Key Type Value / members Updated by
mitras:online SET mitra UUIDs setOnline SADD; setOffline SREM; sweep SREM; bulk-SREM on deactivate
mitras:deactivated SET mitra UUIDs updateMitraStatus(is_active=false) SADD; updateMitraStatus(is_active=true) SREM
mitra:capacity:<id> STRING (integer) count of active+pending_payment sessions assigned to this mitra INCR on session accept; DECR on session end/expire/cancel; DECR/INCR pair on reroute
mitra:heartbeat:<id> STRING ISO 8601 timestamp of last heartbeat heartbeat handler SET; setOnline SET (seed); setOffline DEL

Postgres mirror columns (durable):

  • mitra_online_status.is_online — written by setOnline / setOffline / sweep. Already in schema.
  • mitra_online_status.last_heartbeat_at — written by the batched heartbeat mirror every 60s (NOT per-ping). Already in schema.
  • mitras.is_active — written by updateMitraStatus. Already in schema.
  • chat_sessions.status + mitra_id — already source-of-truth for session counts.

Read path (computing "available for blast")

All reads compute on the fly from Valkey primitives. No memoized mitras:available set.

const findAvailableMitras = async () => {
  // 1. online minus deactivated
  const candidates = await valkey.sdiff('mitras:online', 'mitras:deactivated')
  if (!candidates.length) return []

  // 2. fetch capacity + heartbeat for each candidate (pipelined: 1 roundtrip)
  const pipe = valkey.pipeline()
  for (const id of candidates) {
    pipe.get(`mitra:capacity:${id}`)
    pipe.get(`mitra:heartbeat:${id}`)
  }
  const results = await pipe.exec()

  // 3. filter by capacity + heartbeat freshness
  const { max_customers_per_mitra } = await getMaxCustomersPerMitra()
  const { stale_after_seconds } = await getMitraPingConfig()
  const cutoff = Date.now() - stale_after_seconds * 1000

  const eligible = []
  for (let i = 0; i < candidates.length; i++) {
    const count = Number(results[i * 2][1] ?? 0)
    const heartbeatAt = results[i * 2 + 1][1]
    if (count >= max_customers_per_mitra) continue
    if (!heartbeatAt || Date.parse(heartbeatAt) < cutoff) continue
    eligible.push({ id: candidates[i], active_session_count: count })
  }
  return eligible
}

Cost at prod scale (300 online): 1 SDIFF + 600 GET (pipelined) = ~1ms. Negligible.

Read sites — what changes

Caller Today After
isMitraReachable(mitraId) (mitra-status.service.js:215) SELECT is_online ... SISMEMBER mitras:online + check mitra:heartbeat:<id> freshness
findAvailableMitras (pairing.service.js:79) full JOIN with chat_sessions Valkey-driven (above)
countAvailableMitrasFromCache (mitra-status.service.js:181) full JOIN, cached in-process 10s Valkey-driven, cached in Valkey 10s (shared cluster-wide)
Dashboard online count (dashboard.service.js:16) COUNT(*) WHERE is_online=true SCARD mitras:online
getStatus(mitraId) (mitra's own status poll) full SELECT Hybrid: SISMEMBER for is_online, Postgres for timestamps
getOnlineMitras (CC dashboard) full JOIN with display_name + active_session_count unchanged — low volume, joins make sense in SQL

Reader fallback

Every Valkey read is wrapped:

try {
  return await valkey.sismember('mitras:online', mitraId)
} catch (err) {
  log.warn({ err }, '[mitras:online] valkey unreachable, falling back to DB')
  const [row] = await sql`SELECT is_online FROM mitra_online_status WHERE mitra_id = ${mitraId}`
  return Boolean(row?.is_online)
}

For findAvailableMitras, the fallback is the existing Postgres JOIN query.

Write paths

Each Postgres write is followed by a Valkey write. Postgres always commits first. Valkey failures log + continue.

Online / offline (mitra app toggle)

Action Postgres Valkey
setOnline(mitraId) UPDATE mitra_online_status SET is_online=true, last_online_at=NOW(), last_heartbeat_at=NOW() SADD mitras:online <id> + SET mitra:heartbeat:<id> <now-iso>
setOffline(mitraId) UPDATE mitra_online_status SET is_online=false, last_offline_at=NOW() SREM mitras:online <id> + DEL mitra:heartbeat:<id>

The SET heartbeat on setOnline is critical: without it, the very next sweep would mark the freshly-online mitra stale (their first real heartbeat is up to heartbeat_cadence_seconds away).

Admin activate / deactivate (CC)

updateMitraStatus (mitra.service.js:32):

Action Postgres Valkey
activate (is_active=true) UPDATE mitras SET is_active=true SREM mitras:deactivated <id>
deactivate (is_active=false) UPDATE mitras SET is_active=false SADD mitras:deactivated <id>

Mitra heartbeat (hot path)

Heartbeat handler (mitra-status.service.js:79) is rewritten to operate entirely against Valkey:

  1. authenticate plugin verifies JWT signature + expiry + userType === 'mitra' (no DB).
  2. SISMEMBER mitras:deactivated <userId> → if true, return 403.
  3. SET mitra:heartbeat:<userId> <now-iso>.

Steps 2 + 3 pipelined into one Valkey roundtrip.

Today After
Auth check getMitraById SELECT + is_active check (1 DB read) SISMEMBER mitras:deactivated (Valkey only)
Body UPDATE mitra_online_status SET last_heartbeat_at=NOW() WHERE id=? AND is_online=true (1 DB write) SET mitra:heartbeat:<id> (Valkey only)
Postgres ops per ping 2 0
Valkey ops per ping 0 2 (one pipelined roundtrip)
At prod scale (300 online × 2 pings/min) 1,200 DB ops/min 1,200 Valkey ops/min, 0 DB ops/min

mitras:deactivated is already maintained on every CC updateMitraStatus call (see § Admin activate / deactivate) so deactivation propagates to the heartbeat path within one Valkey write window (~ms).

Heartbeat → Postgres batched mirror

A background job runs every HEARTBEAT_MIRROR_INTERVAL_SECONDS (env, default 60). One SQL statement, touches all online mitras at once:

const mirrorHeartbeatsToPostgres = async () => {
  const onlineIds = await valkey.smembers('mitras:online')
  if (!onlineIds.length) return

  const pipe = valkey.pipeline()
  for (const id of onlineIds) pipe.get(`mitra:heartbeat:${id}`)
  const results = await pipe.exec()

  const pairs = []
  for (let i = 0; i < onlineIds.length; i++) {
    const ts = results[i][1]
    if (ts) pairs.push({ mitra_id: onlineIds[i], ts })
  }
  if (!pairs.length) return

  await sql`
    UPDATE mitra_online_status m
    SET last_heartbeat_at = u.ts::timestamptz, updated_at = NOW()
    FROM (SELECT * FROM UNNEST(
      ${sql.array(pairs.map(p => p.mitra_id))}::uuid[],
      ${sql.array(pairs.map(p => p.ts))}::text[]
    ) AS t(mitra_id, ts)) u
    WHERE m.mitra_id = u.mitra_id
  `
}

Cost at prod scale (300 online): 1 batched UPDATE per minute per instance. At 3 instances = 3 statements/min cluster-wide (redundant but idempotent — latest timestamp wins). ~2060× reduction in Postgres write load vs today.

No leader election initially. 3 redundant idempotent UPDATEs/min is still 200× cheaper than today. If WAL pressure becomes visible, add a Valkey-NX lease leader-elect (~15 LOC); deferred.

Session capacity counter

Touch all four services that mutate session state:

Trigger File Postgres write (existing) Valkey write
Mitra accepts a chat (status → active, mitra_id set) pairing.service.js accept handler UPDATE chat_sessions SET status='active', mitra_id=? INCR mitra:capacity:<mitra_id>
Session ends (status → ended / expired / cancelled) closure.service.js, expiry sweepers UPDATE chat_sessions SET status=... DECR mitra:capacity:<mitra_id>
Session reroute (mitra A → mitra B) session.service.js UPDATE chat_sessions SET mitra_id=B DECR mitra:capacity:A + INCR mitra:capacity:B (pipelined)

What counts as occupying capacity: sessions in ACTIVE or PENDING_PAYMENT status with a non-null mitra_id. This matches the existing findAvailableMitras predicate. Extension flow (active → pending_payment → active) does NOT change capacity — mitra stays occupied throughout.

Negative-counter guard: wrap DECR in a Math.max(0, ...) check via Lua or via a GET + SET if zero — to prevent drift if a DECR fires without a prior INCR (e.g. legacy session without Valkey mirror). The reconciliation sweep recomputes from Postgres anyway.

Auto-offline sweep — Valkey-driven

Replaces the current Postgres seq-scan with a Valkey computation. Runs every MITRA_AUTO_OFFLINE_SWEEP_SECONDS (env, default 30):

const autoOfflineStaleMitras = async () => {
  const { stale_after_seconds, require_ping } = await getMitraPingConfig()
  if (!require_ping) return 0

  const onlineIds = await valkey.smembers('mitras:online')
  if (!onlineIds.length) return 0

  const pipe = valkey.pipeline()
  for (const id of onlineIds) pipe.get(`mitra:heartbeat:${id}`)
  const results = await pipe.exec()

  const cutoff = Date.now() - stale_after_seconds * 1000
  const stale = []
  for (let i = 0; i < onlineIds.length; i++) {
    const ts = results[i][1]
    if (!ts || Date.parse(ts) < cutoff) stale.push(onlineIds[i])
  }
  if (!stale.length) return 0

  // Postgres: bulk flip is_online=false
  await sql`
    UPDATE mitra_online_status
    SET is_online = false, last_offline_at = NOW(), updated_at = NOW()
    WHERE mitra_id = ANY(${sql.array(stale)}::uuid[]) AND is_online = true
  `
  // Log rows
  for (const id of stale) {
    await sql`INSERT INTO mitra_online_logs (mitra_id, status) VALUES (${id}, 'offline')`
  }
  // Valkey: bulk SREM + DEL heartbeat keys
  const cleanup = valkey.pipeline()
  cleanup.srem('mitras:online', ...stale)
  for (const id of stale) cleanup.del(`mitra:heartbeat:${id}`)
  await cleanup.exec()

  invalidateAvailabilityCache()
  return stale.length
}

Stale threshold: stale_after_seconds is read from getMitraPingConfig() (config.service.js) — the existing CC-tunable knob. Not a new env.

Sweep cadence: MITRA_AUTO_OFFLINE_SWEEP_SECONDS env, default 30 (matches current hardcoded setInterval).

Failure semantics: if any Valkey op throws, the entire sweep aborts for this tick. The next tick retries. We never want to mass-offline mitras due to a transient Valkey hiccup.

Shared beacon snapshot cache

Replace the in-process availabilityCache (mitra-status.service.js:14) with a Valkey GET/SETEX key. Even though reads are now sub-ms Valkey ops, this caps total Valkey query load at high beacon-poll rates (5,000 customers × 12/min = 60,000 polls/min → cache → 6 computations/min cluster-wide).

KEY:     availability:snapshot
TYPE:    string (JSON: {"available": bool, "count": number})
TTL:     10 seconds

config:invalidate pub/sub subscriber extended to DEL availability:snapshot on max_customers_per_mitra bust.

Startup / reconnect / reseed

Three triggers reseed Valkey state from Postgres. All idempotent.

Backend startup

In server.js, after Valkey emits 'ready' and before the public listener binds:

const onlineRows = await sql`SELECT mitra_id FROM mitra_online_status WHERE is_online = true`
const deactRows = await sql`SELECT id FROM mitras WHERE is_active = false`
const sessionCountRows = await sql`
  SELECT mitra_id, COUNT(*)::int AS c FROM chat_sessions
  WHERE mitra_id IS NOT NULL AND status IN ('active', 'pending_payment')
  GROUP BY mitra_id
`

const pipe = valkey.multi()
pipe.del('mitras:online', 'mitras:deactivated')
if (onlineRows.length) {
  pipe.sadd('mitras:online', ...onlineRows.map(r => r.mitra_id))
  // Seed heartbeat timestamps to NOW so the first sweep doesn't mass-offline
  // currently-online mitras. They'll refresh on their next ping anyway.
  const now = new Date().toISOString()
  for (const r of onlineRows) pipe.set(`mitra:heartbeat:${r.mitra_id}`, now)
}
if (deactRows.length) pipe.sadd('mitras:deactivated', ...deactRows.map(r => r.id))
for (const r of sessionCountRows) pipe.set(`mitra:capacity:${r.mitra_id}`, r.c)
await pipe.exec()

ioredis reconnect

Listen to the ioredis 'ready' event (fires on initial connect AND each reconnect). Re-run the seed.

Periodic reconciliation sweeper

VALKEY_ONLINE_MIRROR_SWEEP_SECONDS env, default 300. Runs the seed (idempotent — DEL + SADD + SET lands the same state). Belt-and-braces against drift from failed best-effort writes, out-of-band Postgres mutations, Valkey eviction.

Two sweeps, two cadences (summary)

Sweep Purpose Cadence env Default Reads from Writes to
Auto-offline Detect stale-heartbeat mitras → flip offline MITRA_AUTO_OFFLINE_SWEEP_SECONDS 30 Valkey Postgres + Valkey
Heartbeat mirror Persist Valkey heartbeats to Postgres for forensics/backup HEARTBEAT_MIRROR_INTERVAL_SECONDS 60 Valkey Postgres
Reconciliation Heal Valkey/Postgres drift VALKEY_ONLINE_MIRROR_SWEEP_SECONDS 300 Postgres Valkey

All three run on every backend instance independently. All idempotent. No leader election required.

Multi-instance safety

Cloud Run runs N instances. Each instance:

  • Writes both stores on its own mutations. Atomic Valkey ops (SADD / SREM / INCR / DECR / SET) — no cross-instance coordination needed.
  • Runs all three sweeps independently. Redundant but idempotent.
  • Recompute-on-read for blast eligibility — no stale aggregate to invalidate.

The mitra:capacity:<id> counter is the most race-sensitive: INCR / DECR are atomic but a session-state change must consistently fire exactly one increment and one decrement over its lifetime. The reconciliation sweep recomputes from chat_sessions and resets the counter, healing any drift.

Failure mode summary

Behavior
Valkey unreachable on read Fall back to Postgres JOIN query
Valkey unreachable on write Log + continue. Reconciliation sweep heals later.
Postgres unreachable on heartbeat mirror Skip this cycle. Next cycle writes the latest.
Auto-offline sweep can't reach Valkey Skip this tick. Mitras stay "online" until Valkey comes back + heartbeat ages out.
Valkey crash (catastrophic) Backend reconnects → reseed from Postgres. Worst case: ≤60s of last_heartbeat_at forensics lost.
Backend crash Other instances keep running. New instance reseed on startup.

Files touched

File Change
backend/src/plugins/valkey.js Add wrappers: sadd, srem, sismember, smembers, sdiff, scard, set, get, del, incr, decr, pipeline/multi + 'ready' reconnect hook
backend/src/services/config.service.js Add getMitraAutoOfflineSweepSeconds, getHeartbeatMirrorIntervalSeconds, getValkeyOnlineMirrorSweepSeconds getters
backend/src/services/mitra-status.service.js Major rewrite (see § Read path and § Write paths). Add incrementCapacity, decrementCapacity, mirrorHeartbeatsToPostgres, seedFromPostgres
backend/src/services/mitra.service.js Wrap updateMitraStatus with SADD/SREM mitras:deactivated
backend/src/services/pairing.service.js Rewrite findAvailableMitras as Valkey-driven (Postgres fallback). INCR mitra:capacity on session accept.
backend/src/services/closure.service.js DECR mitra:capacity on session end/expire/cancel
backend/src/services/session.service.js DECR + INCR pair on reroute
backend/src/services/dashboard.service.js SCARD mitras:online for online count
backend/src/routes/public/mitra.status.routes.js Replace resolveMitra on POST /heartbeat with a Valkey SISMEMBER mitras:deactivated check (keep resolveMitra on /online, /offline, GET /)
backend/src/server.js Call seedFromPostgres on startup (before listener binds); replace hardcoded 30_000 setInterval with env-driven cadence; register heartbeat mirror + reconciliation sweep intervals
backend/.env.example Document MITRA_AUTO_OFFLINE_SWEEP_SECONDS, HEARTBEAT_MIRROR_INTERVAL_SECONDS, VALKEY_ONLINE_MIRROR_SWEEP_SECONDS
backend/test/services/mitra-status.service.test.js Add tests (see § Test plan)
backend/test/services/pairing.service.test.js Update for Valkey-driven findAvailableMitras
backend/test/helpers/valkey.js (new if absent) Test helper for clean-slate Valkey state per test

Estimated touch: ~400 LOC + ~200 LOC tests. ~2 days focused work.

Test plan

Unit

  1. Mock Valkey; verify each writer calls Postgres → Valkey in correct order, with the seed-heartbeat-on-setOnline and DEL-on-setOffline.
  2. Verify reader fallback path runs when Valkey ops throw.
  3. Verify auto-offline sweep aborts entirely when Valkey ops throw (does NOT mass-offline via Postgres-only path).
  4. Verify capacity counter never goes negative (Math.max guard).

Integration (real Valkey + Postgres)

  1. Seed correctness: insert N online rows + M deactivated + session counts in DB; run startup seed; verify all four Valkey structures match.
  2. Heartbeat refresh: setHeartbeat() → verify Valkey value updates; check that the periodic mirror writes Postgres last_heartbeat_at within one mirror cycle.
  3. Auto-offline: insert online mitra, manually expire heartbeat by setting timestamp in past, run sweep, verify Postgres is_online=false + Valkey SREM + DEL heartbeat key.
  4. Capacity lifecycle: simulate session accept → end across multiple mitras; verify counter increments/decrements; verify reroute moves count from A to B atomically.
  5. Restart resilience: seed state, simulate Valkey restart (FLUSHALL), trigger reconnect handler, verify all four structures reseed correctly.
  6. Reconciliation: corrupt Valkey (random SADD of non-existent mitra, wrong capacity counter, missing entries); run reconciliation sweep; verify convergence to Postgres state.
  7. Fallback: disable Valkey mid-test; verify findAvailableMitras falls back to Postgres JOIN query and returns sensible results.

Regression

  • All 90/92 existing backend tests should still pass.
  • Maestro flows for pairing (ts-customer-*) should pass unchanged.

Decisions (locked 2026-05-25)

  1. revokeAllAuthSessions(mitraId) added to updateMitraStatus in the same PR. Bounds the deactivation gap to access-token TTL across all mitra routes (not just heartbeat).
  2. Prod Valkey: Memorystore for Valkey, Standard tier (HA with replica, smallest available capacity ~1GB). Built-in replication keeps heartbeat timestamps live across failover. Staging/dev can run Basic tier — the reseed-from-Postgres flow handles cold-cache restarts correctly either way.
  3. Keep last_heartbeat_at column. Written by the 60s batched mirror; remains available for operator forensics ("when was X last seen?"). Drop only if a future audit confirms no consumer reads it.

Future phases (deferred)

  • Heartbeat → Valkey TTL with keyspace notifications. Replace timestamp-comparison sweep with notify-keyspace-events Ex → instant detection of expired heartbeats. Requires Memorystore config change + a subscriber. Defer until 30s detection lag is the visible bottleneck.
  • Leader-elected mirror/sweep. Use a Valkey-NX lease so only one instance runs each background job. ~15 LOC each. Defer until the redundant work shows up in metrics.