diff --git a/backend/.env.example b/backend/.env.example index ff03217..612ddb9 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -41,6 +41,25 @@ ADMIN_PASSWORD=ChangeMe123! # Path to Firebase service-account JSON (falls back to backend/firebase-service-account.json) FIREBASE_SERVICE_ACCOUNT_PATH= +# --- Valkey availability mirror cadences --- +# +# All env-driven per backend/CLAUDE.md Config-Source Convention. Defaults match +# requirement/valkey-online-mirror-plan.md. Values that are unparsable or below a getter's minimum fall back to the default. + +# How often the auto-offline sweep checks heartbeat freshness (seconds). +# The staleness threshold itself (`stale_after_seconds`) is CC-tunable via app_config. +MITRA_AUTO_OFFLINE_SWEEP_SECONDS=30 + +# How often heartbeat timestamps are batched from Valkey → Postgres (seconds). +# Per-ping heartbeat writes go to Valkey only; this preserves forensic +# `last_heartbeat_at` in Postgres with up to this many seconds of lag. +HEARTBEAT_MIRROR_INTERVAL_SECONDS=60 + +# How often Valkey state is re-derived from Postgres to heal drift (seconds). +# Belt-and-braces against failed best-effort Valkey writes, out-of-band Postgres +# mutations, or evictions. Set to 0 to disable (not recommended). +VALKEY_ONLINE_MIRROR_SWEEP_SECONDS=300 + # --- Phase 5: Xendit (dev-safe defaults: integration disabled) --- # # Flip XENDIT_ENABLED=true in staging/prod once secret + webhook token are populated. diff --git a/backend/src/plugins/valkey.js b/backend/src/plugins/valkey.js index f4885e2..9cb9cc1 100644 --- a/backend/src/plugins/valkey.js +++ b/backend/src/plugins/valkey.js @@ -4,10 +4,31 @@ let pub let sub let client +// 'ready' listeners (registered before connect; fire on initial connect AND each +// reconnect). Used by services that need to reseed Valkey state from Postgres. 
+const readyListeners = new Set() + +const attachReadyHandler = (instance) => { + instance.on('ready', () => { + for (const fn of readyListeners) { + // Fire-and-forget; each listener owns its own error handling. + Promise.resolve() + .then(() => fn()) + .catch((err) => console.error('[valkey] ready listener failed:', err)) + } + }) +} + +export const onValkeyReady = (fn) => { + readyListeners.add(fn) + return () => readyListeners.delete(fn) +} + export const getValkeyClient = () => { if (!client) { const url = process.env.VALKEY_URL || 'redis://localhost:6379' client = new Redis(url) + attachReadyHandler(client) } return client } @@ -51,3 +72,25 @@ export const subscribe = (channel, callback) => { console.log(`[valkey] unsubscribed from ${channel}`) } } + +// --- Thin wrappers used by the mitra-availability mirror --- +// +// Each wrapper uses the shared `client` (separate ioredis instance from pub/sub +// to keep subscribe state isolated). Callers in services/* wrap these in +// try/catch and fall back to Postgres on error — see the plan doc. 
+ +export const sadd = (key, ...members) => getValkeyClient().sadd(key, ...members) +export const srem = (key, ...members) => getValkeyClient().srem(key, ...members) +export const sismember = async (key, member) => + (await getValkeyClient().sismember(key, member)) === 1 +export const smembers = (key) => getValkeyClient().smembers(key) +export const sdiff = (...keys) => getValkeyClient().sdiff(...keys) +export const scard = (key) => getValkeyClient().scard(key) +export const set = (key, value) => getValkeyClient().set(key, value) +export const get = (key) => getValkeyClient().get(key) +export const del = (...keys) => getValkeyClient().del(...keys) +export const incr = (key) => getValkeyClient().incr(key) +export const decr = (key) => getValkeyClient().decr(key) +export const exists = (key) => getValkeyClient().exists(key) +export const pipeline = () => getValkeyClient().pipeline() +export const multi = () => getValkeyClient().multi() diff --git a/backend/src/routes/public/mitra.status.routes.js b/backend/src/routes/public/mitra.status.routes.js index 22038b3..25be5cc 100644 --- a/backend/src/routes/public/mitra.status.routes.js +++ b/backend/src/routes/public/mitra.status.routes.js @@ -1,6 +1,8 @@ import { authenticate } from '../../plugins/auth.js' import { getMitraById } from '../../services/mitra.service.js' import * as mitraStatusService from '../../services/mitra-status.service.js' +import * as valkey from '../../plugins/valkey.js' +import { VK_MITRAS_DEACTIVATED } from '../../services/mitra-status.service.js' import { UserType } from '../../constants.js' export const mitraStatusRoutes = async (app) => { @@ -27,6 +29,32 @@ export const mitraStatusRoutes = async (app) => { request.mitra = mitra } + // Lightweight heartbeat guard: no Postgres SELECT in the hot path. 
Checks + // `mitras:deactivated` in Valkey (maintained on every updateMitraStatus) and + // falls back to the full resolveMitra/DB check if Valkey is unreachable so a + // Valkey outage doesn't accept heartbeats from deactivated mitras silently. + const heartbeatGuard = async (request, reply) => { + if (request.auth?.userType !== UserType.MITRA) { + return reply.code(403).send({ + success: false, + error: { code: 'FORBIDDEN', message: 'Mitra account required' }, + }) + } + try { + const deactivated = await valkey.sismember(VK_MITRAS_DEACTIVATED, request.auth.userId) + if (deactivated) { + return reply.code(403).send({ + success: false, + error: { code: 'ACCOUNT_INACTIVE', message: 'Account is inactive' }, + }) + } + return + } catch (err) { + console.warn('[heartbeat] valkey check failed, falling back to DB:', err.message) + return resolveMitra(request, reply) + } + } + app.post('/online', { preHandler: [authenticate, resolveMitra] }, async (request, reply) => { await mitraStatusService.setOnline(request.mitra.id) return reply.send({ success: true, data: { is_online: true } }) @@ -37,8 +65,8 @@ export const mitraStatusRoutes = async (app) => { return reply.send({ success: true, data: { is_online: false } }) }) - app.post('/heartbeat', { preHandler: [authenticate, resolveMitra] }, async (request, reply) => { - await mitraStatusService.heartbeat(request.mitra.id) + app.post('/heartbeat', { preHandler: [authenticate, heartbeatGuard] }, async (request, reply) => { + await mitraStatusService.heartbeat(request.auth.userId) return reply.send({ success: true }) }) diff --git a/backend/src/server.js b/backend/src/server.js index 40d2707..46f583c 100644 --- a/backend/src/server.js +++ b/backend/src/server.js @@ -1,7 +1,13 @@ import 'dotenv/config' + import { buildPublicApp } from './app.public.js' import { buildInternalApp } from './app.internal.js' -import { autoOfflineStaleMitras } from './services/mitra-status.service.js' +import { autoOfflineStaleMitras, 
seedFromPostgres, mirrorHeartbeatsToPostgres } from './services/mitra-status.service.js' +import { + getMitraAutoOfflineSweepSeconds, + getHeartbeatMirrorIntervalSeconds, + getValkeyOnlineMirrorSweepSeconds, +} from './services/config.service.js' import { initFirebase } from './plugins/firebase.js' import { restoreActiveTimers } from './services/session-timer.service.js' import { expireStalePaymentRequests, registerPairingSubscriber } from './services/payment.service.js' @@ -23,18 +29,22 @@ const start = async () => { } initFirebase() + const publicApp = await buildPublicApp() const internalApp = await buildInternalApp() + // restoreActiveTimers runs bulk UPDATEs on chat_sessions to clean up stale + // ACTIVE/CLOSING rows from before the restart. Run it BEFORE seedFromPostgres + // so the seed sees the post-cleanup state and capacity counters are accurate. + await restoreActiveTimers() + await seedFromPostgres() + await publicApp.listen({ port: PUBLIC_PORT, host: '0.0.0.0' }) console.log(`Public API listening on port ${PUBLIC_PORT}`) await internalApp.listen({ port: INTERNAL_PORT, host: INTERNAL_HOST }) console.log(`Internal API listening on ${INTERNAL_HOST}:${INTERNAL_PORT}`) - // Restore session timers for active sessions (on server restart) - await restoreActiveTimers() - // Phase 5: wire pairing service as a subscriber to payment_request.confirmed events. // Must happen AFTER all services are loaded so the subscriber registration sees // the EventEmitter set up by payment.service.js at module-load time. @@ -53,7 +63,8 @@ const start = async () => { console.error('Startup reconciliation failed:', err) } - // Auto-offline mitras with stale heartbeat (every 30s) + // Auto-offline mitras with stale heartbeat (env-driven cadence, default 30s). + // Valkey-driven per requirement/valkey-online-mirror-plan.md. 
setInterval(async () => { try { const count = await autoOfflineStaleMitras() @@ -61,7 +72,32 @@ const start = async () => { } catch (err) { console.error('Auto-offline check failed:', err) } - }, 30_000) + }, getMitraAutoOfflineSweepSeconds() * 1000) + + // Batched heartbeat mirror: Valkey heartbeat timestamps → Postgres + // last_heartbeat_at (default 60s). Keeps forensic column current without + // per-ping DB writes. One UNNEST UPDATE per tick; idempotent across instances. + setInterval(async () => { + try { + await mirrorHeartbeatsToPostgres() + } catch (err) { + console.error('Heartbeat mirror failed:', err) + } + }, getHeartbeatMirrorIntervalSeconds() * 1000) + + // Reconciliation sweep: heal Valkey/Postgres drift (default 300s; 0 disables). + // Belt-and-braces against failed best-effort Valkey writes, out-of-band + // Postgres mutations, evictions. Idempotent — just runs the seed. + const reconciliationSeconds = getValkeyOnlineMirrorSweepSeconds() + if (reconciliationSeconds > 0) { + setInterval(async () => { + try { + await seedFromPostgres() + } catch (err) { + console.error('Valkey reconciliation sweep failed:', err) + } + }, reconciliationSeconds * 1000) + } // Expire stale payment_requests + reconcile lost subscriber work (every 60s). // Pending past expires_at → expired (no failure row). 
diff --git a/backend/src/services/closure.service.js b/backend/src/services/closure.service.js index 8ac574f..55d10b0 100644 --- a/backend/src/services/closure.service.js +++ b/backend/src/services/closure.service.js @@ -3,6 +3,7 @@ import { publish } from '../plugins/valkey.js' import { clearSessionTimer, clearClosureGraceTimer, startClosureGraceTimer } from './session-timer.service.js' import { sendToSessionParticipant } from '../plugins/websocket.js' import { sendPushNotification } from './notification.service.js' +import { recomputeCapacityForMitra } from './mitra-status.service.js' import { UserType, SessionStatus, EndedBy, WsMessage } from '../constants.js' const sql = getDb() @@ -59,6 +60,7 @@ export const completeSession = async (sessionId) => { RETURNING id, customer_id, mitra_id, status, ended_at ` if (!session) return null + await recomputeCapacityForMitra(session.mitra_id) // Notify both parties, FCM fallback if WebSocket is down const data = { type: WsMessage.SESSION_COMPLETED, session_id: sessionId } @@ -109,6 +111,7 @@ export const initiateEarlyEnd = async (sessionId, userType) => { code: 'SESSION_NOT_ACTIVE', statusCode: 409, }) } + await recomputeCapacityForMitra(session.mitra_id) clearSessionTimer(sessionId) startClosureGraceTimer(sessionId) diff --git a/backend/src/services/config.service.js b/backend/src/services/config.service.js index ae1fcbe..fa6e7a3 100644 --- a/backend/src/services/config.service.js +++ b/backend/src/services/config.service.js @@ -149,6 +149,34 @@ export const getMitraHeartbeatCadenceSeconds = () => { return Number.isFinite(parsed) && parsed >= 5 ? parsed : 30 } +// --- Valkey availability mirror — env-driven cadences --- +// +// Per requirement/valkey-online-mirror-plan.md. All three are operational +// knobs (env, per backend/CLAUDE.md Config-Source Convention), not +// operator-tunable. Defaults match the plan; unparsable or below-minimum values fall back to the default. 
+ +export const getMitraAutoOfflineSweepSeconds = () => { + const raw = process.env.MITRA_AUTO_OFFLINE_SWEEP_SECONDS + if (!raw || raw.trim() === '') return 30 + const parsed = Number.parseInt(raw, 10) + return Number.isFinite(parsed) && parsed >= 5 ? parsed : 30 +} + +export const getHeartbeatMirrorIntervalSeconds = () => { + const raw = process.env.HEARTBEAT_MIRROR_INTERVAL_SECONDS + if (!raw || raw.trim() === '') return 60 + const parsed = Number.parseInt(raw, 10) + return Number.isFinite(parsed) && parsed >= 10 ? parsed : 60 +} + +export const getValkeyOnlineMirrorSweepSeconds = () => { + const raw = process.env.VALKEY_ONLINE_MIRROR_SWEEP_SECONDS + if (!raw || raw.trim() === '') return 300 + const parsed = Number.parseInt(raw, 10) + if (parsed === 0) return 0 // explicit disable + return Number.isFinite(parsed) && parsed >= 30 ? parsed : 300 +} + // --- Phase 5: Xendit integration --- // // Env-driven (per backend/CLAUDE.md Config-Source Convention). All five values diff --git a/backend/src/services/dashboard.service.js b/backend/src/services/dashboard.service.js index a4e1f44..5147ab0 100644 --- a/backend/src/services/dashboard.service.js +++ b/backend/src/services/dashboard.service.js @@ -1,19 +1,33 @@ import { getDb } from '../db/client.js' +import * as valkey from '../plugins/valkey.js' +import { VK_MITRAS_ONLINE } from './mitra-status.service.js' import { SessionStatus, TopicSensitivity } from '../constants.js' const sql = getDb() +// Valkey-fast SCARD with Postgres fallback. The CC dashboard polls every few +// seconds; SCARD is sub-ms so this keeps the dashboard responsive at any scale. 
+const getOnlineMitrasCount = async () => { + try { + return await valkey.scard(VK_MITRAS_ONLINE) + } catch (err) { + console.warn('[dashboard] valkey unavailable, falling back to DB:', err.message) + const [{ c }] = await sql`SELECT COUNT(*)::int AS c FROM mitra_online_status WHERE is_online = true` + return c + } +} + export const getDashboardStats = async () => { const [ [{ active_chats }], - [{ online_mitras }], + online_mitras, [{ pending_requests }], [{ sensitive_total }], [{ sensitive_last_30d_total }], [{ sensitive_last_30d_sensitive }], ] = await Promise.all([ sql`SELECT COUNT(*) AS active_chats FROM chat_sessions WHERE status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT})`, - sql`SELECT COUNT(*) AS online_mitras FROM mitra_online_status WHERE is_online = true`, + getOnlineMitrasCount(), sql`SELECT COUNT(*) AS pending_requests FROM chat_sessions WHERE status IN (${SessionStatus.SEARCHING}, ${SessionStatus.PENDING_ACCEPTANCE})`, sql`SELECT COUNT(*) AS sensitive_total FROM chat_sessions WHERE topic_sensitivity = ${TopicSensitivity.SENSITIVE}`, sql`SELECT COUNT(*) AS sensitive_last_30d_total FROM chat_sessions WHERE created_at >= NOW() - INTERVAL '30 days'`, diff --git a/backend/src/services/extension.service.js b/backend/src/services/extension.service.js index b02c299..76bba70 100644 --- a/backend/src/services/extension.service.js +++ b/backend/src/services/extension.service.js @@ -1,7 +1,7 @@ import { getDb } from '../db/client.js' import { sendToSessionParticipant, isUserOnlineWs } from '../plugins/websocket.js' import { extendSessionTimer, clearClosureGraceTimer, startClosureGraceTimer } from './session-timer.service.js' -import { isMitraReachable } from './mitra-status.service.js' +import { isMitraReachable, recomputeCapacityBySession } from './mitra-status.service.js' import { consumePaymentSession, failPaymentSession, getPaymentSession } from './payment.service.js' import { sendPushNotification } from './notification.service.js' import 
{ @@ -104,6 +104,7 @@ export const requestExtension = async (sessionId, customerId, { duration_minutes // Pause the session await sql`UPDATE chat_sessions SET status = ${SessionStatus.EXTENDING} WHERE id = ${sessionId}` + await recomputeCapacityBySession(sessionId) // Resolve timeout once so we can both surface it in the WS payload and start the server-side timer. const timeoutMs = await getExtensionTimeoutMs() @@ -213,6 +214,7 @@ const finalizeExtension = async (extensionId, sessionId, accepted, viaTimeout) = // Resume session await sql`UPDATE chat_sessions SET status = ${SessionStatus.ACTIVE} WHERE id = ${extension.session_id}` + await recomputeCapacityBySession(extension.session_id) // Record transaction await sql` @@ -249,6 +251,7 @@ const finalizeExtension = async (extensionId, sessionId, accepted, viaTimeout) = } await sql`UPDATE chat_sessions SET status = ${SessionStatus.CLOSING} WHERE id = ${extension.session_id}` + await recomputeCapacityBySession(extension.session_id) sendToSessionParticipant(sessionId, UserType.CUSTOMER, { type: WsMessage.EXTENSION_RESPONSE, @@ -331,6 +334,7 @@ const timeoutExtension = async (extensionId, sessionId, mitraId) => { // Move session to closing & notify both parties (matches the explicit-reject UX). 
await sql`UPDATE chat_sessions SET status = ${SessionStatus.CLOSING} WHERE id = ${sessionId}` + await recomputeCapacityBySession(sessionId) sendToSessionParticipant(sessionId, UserType.CUSTOMER, { type: WsMessage.EXTENSION_RESPONSE, accepted: false, diff --git a/backend/src/services/mitra-status.service.js b/backend/src/services/mitra-status.service.js index 558deda..5ea6eac 100644 --- a/backend/src/services/mitra-status.service.js +++ b/backend/src/services/mitra-status.service.js @@ -1,24 +1,80 @@ import { getDb } from '../db/client.js' import { SessionStatus } from '../constants.js' import { getMitraPingConfig, getMaxCustomersPerMitra } from './config.service.js' -import { subscribe } from '../plugins/valkey.js' +import * as valkey from '../plugins/valkey.js' +import { subscribe, onValkeyReady } from '../plugins/valkey.js' const sql = getDb() -// --- Short-TTL availability cache for the 5s-poll endpoint --- -// In-memory snapshot { available, count, expiresAt }. The cache: -// - is recomputed at most once per AVAILABILITY_TTL_MS (10s backstop) -// - is invalidated explicitly when CC changes max_customers_per_mitra (call invalidateAvailabilityCache()) -// This keeps customer polls off the DB hot path while staying close to real time. -const AVAILABILITY_TTL_MS = 10_000 -let availabilityCache = null // { available, count, expiresAt } +// Per requirement/valkey-online-mirror-plan.md § Schema. +export const VK_MITRAS_ONLINE = 'mitras:online' +export const VK_MITRAS_DEACTIVATED = 'mitras:deactivated' +export const vkCapacityKey = (mitraId) => `mitra:capacity:${mitraId}` +export const vkHeartbeatKey = (mitraId) => `mitra:heartbeat:${mitraId}` -export const invalidateAvailabilityCache = () => { - availabilityCache = null +// Rebuilds Valkey availability state from Postgres. Called on backend startup, +// on Valkey reconnect (via onValkeyReady), and by the reconciliation sweep. +// Idempotent — DEL + bulk SADD/SET produces the same final state on every run. 
+export const seedFromPostgres = async () => { + try { + const [onlineRows, deactRows, capacityRows] = await Promise.all([ + sql`SELECT mitra_id FROM mitra_online_status WHERE is_online = true`, + sql`SELECT id FROM mitras WHERE is_active = false`, + sql` + SELECT mitra_id, COUNT(*)::int AS c FROM chat_sessions + WHERE mitra_id IS NOT NULL + AND status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT}) + GROUP BY mitra_id + `, + ]) + + const pipe = valkey.pipeline() + pipe.del(VK_MITRAS_ONLINE) + pipe.del(VK_MITRAS_DEACTIVATED) + + const now = new Date().toISOString() + if (onlineRows.length) { + pipe.sadd(VK_MITRAS_ONLINE, ...onlineRows.map((r) => r.mitra_id)) + // Seed heartbeats with NOW so the first sweep after restart doesn't + // mass-offline. Mitras refresh on their next ping anyway. + for (const r of onlineRows) pipe.set(vkHeartbeatKey(r.mitra_id), now) + // Reset capacity for currently-online mitras; overlay real counts below. + // Offline mitras' stale capacity keys don't affect reads (SDIFF excludes them). + for (const r of onlineRows) pipe.set(vkCapacityKey(r.mitra_id), 0) + } + if (deactRows.length) { + pipe.sadd(VK_MITRAS_DEACTIVATED, ...deactRows.map((r) => r.id)) + } + for (const r of capacityRows) pipe.set(vkCapacityKey(r.mitra_id), r.c) + await pipe.exec() + + console.log( + `[valkey-mirror] seed: ${onlineRows.length} online, ${deactRows.length} deactivated, ${capacityRows.length} with active sessions`, + ) + } catch (err) { + console.error('[valkey-mirror] seed failed:', err) + } } -// Subscribe once at module load so other-instance config updates also bust this cache. -// Single-instance: the local mutator already invalidates, so this is a no-op extra. +// Re-seed on every Valkey reconnect (fires on initial connect too). +onValkeyReady(seedFromPostgres) + +// --- Beacon snapshot cache (Valkey-backed, cluster-shared) --- +// `availability:snapshot` JSON `{available, count}`, TTL 10s. 
All backend +// instances share the same cache: one Valkey-driven compute per 10s +// cluster-wide regardless of how many customers are polling. +const AVAILABILITY_CACHE_KEY = 'availability:snapshot' +const AVAILABILITY_TTL_SECONDS = 10 + +export const invalidateAvailabilityCache = async () => { + try { + await valkey.del(AVAILABILITY_CACHE_KEY) + } catch (err) { + console.error('[valkey-mirror] invalidateAvailabilityCache failed:', err) + } +} + +// Bust the shared cache when CC changes max_customers_per_mitra (any instance). let _subscribed = false const ensureSubscribed = () => { if (_subscribed) return @@ -43,6 +99,39 @@ export const ensureStatusRow = async (mitraId) => { ` } +// Best-effort Valkey writer. Postgres remains source of truth; a Valkey hiccup +// shouldn't fail the originating request — the reconciliation sweep heals drift. +const tryValkey = async (fn, label) => { + try { await fn() } catch (err) { + console.error(`[valkey-mirror] ${label} failed:`, err) + } +} + +// Recompute `mitra:capacity:{mitraId}` from chat_sessions truth. Called after every +// chat_session state change that could affect a mitra's occupied-slot count. +// Recompute-from-truth avoids the bookkeeping risks of per-transition INCR/DECR +// (double-counts, missed transitions across all the UPDATE sites in pairing, +// closure, extension, session-timer, session services). +export const recomputeCapacityForMitra = async (mitraId) => { + if (!mitraId) return + const [row] = await sql` + SELECT COUNT(*)::int AS c FROM chat_sessions + WHERE mitra_id = ${mitraId} + AND status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT}) + ` + await tryValkey( + () => valkey.set(vkCapacityKey(mitraId), row.c), + `recomputeCapacity ${mitraId}`, + ) +} + +// Lookup mitra_id from the session, then recompute. Use this from UPDATE sites +// where the session's mitra_id may not be in local scope. 
+export const recomputeCapacityBySession = async (sessionId) => { + const [row] = await sql`SELECT mitra_id FROM chat_sessions WHERE id = ${sessionId}` + if (row?.mitra_id) await recomputeCapacityForMitra(row.mitra_id) +} + export const setOnline = async (mitraId) => { await ensureStatusRow(mitraId) const now = new Date() @@ -54,6 +143,14 @@ export const setOnline = async (mitraId) => { await sql` INSERT INTO mitra_online_logs (mitra_id, status) VALUES (${mitraId}, 'online') ` + + await tryValkey(async () => { + const pipe = valkey.pipeline() + pipe.sadd(VK_MITRAS_ONLINE, mitraId) + pipe.set(vkHeartbeatKey(mitraId), now.toISOString()) + await pipe.exec() + }, `setOnline ${mitraId}`) + invalidateAvailabilityCache() } @@ -73,16 +170,32 @@ export const setOffline = async (mitraId) => { await sql` INSERT INTO mitra_online_logs (mitra_id, status) VALUES (${mitraId}, 'offline') ` + + await tryValkey(async () => { + const pipe = valkey.pipeline() + pipe.srem(VK_MITRAS_ONLINE, mitraId) + pipe.del(vkHeartbeatKey(mitraId)) + await pipe.exec() + }, `setOffline ${mitraId}`) + invalidateAvailabilityCache() } +// Heartbeat hot path: Valkey-only. Per-ping Postgres UPDATE eliminated; the +// 60s batched heartbeat-mirror job (mirrorHeartbeatsToPostgres) writes +// `last_heartbeat_at` to Postgres for forensics/restart safety. +// +// NOTE: there is intentionally no `is_online = true` gate here (the old SQL +// UPDATE had one). The Valkey SET is global; if a mitra heartbeats while +// `is_online=false` in Postgres, their heartbeat key gets refreshed but they're +// still not in `mitras:online`, so blast eligibility is unchanged. +// NOTE(review): the key is SET without a TTL and seedFromPostgres never DELs heartbeat keys, so the orphan lingers until a later setOffline removes it. 
export const heartbeat = async (mitraId) => { - const now = new Date() - await sql` - UPDATE mitra_online_status - SET last_heartbeat_at = ${now}, updated_at = ${now} - WHERE mitra_id = ${mitraId} AND is_online = true - ` + const now = new Date().toISOString() + await tryValkey( + () => valkey.set(vkHeartbeatKey(mitraId), now), + `heartbeat ${mitraId}`, + ) } export const getStatus = async (mitraId) => { @@ -130,39 +243,95 @@ export const getOnlineLogs = async (mitraId, { page = 1, limit = 50 } = {}) => { return { items, total: Number(count), page, limit } } +// Valkey-driven: enumerate mitras:online, read each heartbeat timestamp from +// Valkey, find stales, then bulk-flip Postgres + clean up Valkey. Only rows the UPDATE actually flipped (RETURNING) get an offline log and are counted in the return value. +// +// Failure semantics: if any Valkey op fails — a rejected call or a per-command error inside the pipeline's [err, result] pairs — the sweep aborts entirely. We +// never mass-offline mitras via a Postgres scan because Valkey is unreachable +// — that would risk false-offlining a fleet during a Valkey hiccup. export const autoOfflineStaleMitras = async () => { const pingConfig = await getMitraPingConfig() - - // If ping is not required, skip the auto-offline sweep entirely if (!pingConfig.require_ping) return 0 - // stale_after_seconds is the operator-facing knob — what they set is what - // they get. No multiplier, no implicit "tolerate N missed heartbeats" - // contract baked in. The CC PATCH validates that the value is >= the env- - // driven heartbeat cadence so single missed pings can't flip a mitra - // offline. 
- const staleSeconds = pingConfig.stale_after_seconds - const stale = await sql` - UPDATE mitra_online_status - SET is_online = false, last_offline_at = NOW(), updated_at = NOW() - WHERE is_online = true - AND last_heartbeat_at < NOW() - ${staleSeconds + ' seconds'}::interval - RETURNING mitra_id - ` - - for (const row of stale) { - await sql` - INSERT INTO mitra_online_logs (mitra_id, status) VALUES (${row.mitra_id}, 'offline') - ` + let onlineIds, heartbeatValues + try { + onlineIds = await valkey.smembers(VK_MITRAS_ONLINE) + if (!onlineIds.length) return 0 + const pipe = valkey.pipeline() + for (const id of onlineIds) pipe.get(vkHeartbeatKey(id)) + const results = await pipe.exec() + heartbeatValues = results.map(([cmdErr, value]) => { if (cmdErr) throw cmdErr; return value }) + } catch (err) { + console.warn('[auto-offline] valkey unavailable, skipping this tick:', err.message) + return 0 } - // Capacity may have changed (mitra went offline) — invalidate the customer-facing - // availability cache so the next poll reflects reality. - if (stale.length > 0) invalidateAvailabilityCache() + const cutoff = Date.now() - pingConfig.stale_after_seconds * 1000 + const stale = [] + for (let i = 0; i < onlineIds.length; i++) { + const ts = heartbeatValues[i] + if (!ts || Date.parse(ts) < cutoff) stale.push(onlineIds[i]) + } + if (!stale.length) return 0 + const flipped = await sql` + UPDATE mitra_online_status + SET is_online = false, last_offline_at = NOW(), updated_at = NOW() + WHERE mitra_id = ANY(${sql.array(stale)}::uuid[]) AND is_online = true RETURNING mitra_id + ` + for (const { mitra_id } of flipped) { + await sql`INSERT INTO mitra_online_logs (mitra_id, status) VALUES (${mitra_id}, 'offline')` + } + await tryValkey(async () => { + const cleanup = valkey.pipeline() + cleanup.srem(VK_MITRAS_ONLINE, ...stale) + for (const id of stale) cleanup.del(vkHeartbeatKey(id)) + await cleanup.exec() + }, `auto-offline cleanup (${stale.length} stale)`) + + invalidateAvailabilityCache() - return stale.length + return flipped.length } +// Batched mirror: Valkey heartbeat timestamps → Postgres 
`last_heartbeat_at`. +// Runs every HEARTBEAT_MIRROR_INTERVAL_SECONDS (default 60). One UNNEST UPDATE +// regardless of online count. Idempotent — latest timestamp wins; multiple +// instances running concurrently is fine (no leader election needed). +export const mirrorHeartbeatsToPostgres = async () => { + let onlineIds, heartbeatValues + try { + onlineIds = await valkey.smembers(VK_MITRAS_ONLINE) + if (!onlineIds.length) return 0 + const pipe = valkey.pipeline() + for (const id of onlineIds) pipe.get(vkHeartbeatKey(id)) + const results = await pipe.exec() + heartbeatValues = results.map((r) => r[1]) + } catch (err) { + console.warn('[heartbeat-mirror] valkey unavailable, skipping:', err.message) + return 0 + } + + const ids = [] + const ts = [] + for (let i = 0; i < onlineIds.length; i++) { + if (heartbeatValues[i]) { + ids.push(onlineIds[i]) + ts.push(heartbeatValues[i]) + } + } + if (!ids.length) return 0 + + await sql` + UPDATE mitra_online_status m + SET last_heartbeat_at = u.ts::timestamptz, updated_at = NOW() + FROM ( + SELECT * FROM UNNEST(${sql.array(ids)}::uuid[], ${sql.array(ts)}::text[]) AS t(mitra_id, ts) + ) u + WHERE m.mitra_id = u.mitra_id + ` + return ids.length +} + /** * Customer-home availability check, cached in-memory for AVAILABILITY_TTL_MS. * @@ -178,12 +347,33 @@ export const autoOfflineStaleMitras = async () => { * sets/hashes (matching the existing memory item "Session Timer Scaling"); the contract * of this function — Valkey/cache reads only on the hot path — stays the same. 
*/ -export const countAvailableMitrasFromCache = async () => { - const now = Date.now() - if (availabilityCache && availabilityCache.expiresAt > now) { - return { available: availabilityCache.available, count: availabilityCache.count } - } +const computeAvailabilityFromValkey = async () => { + const { max_customers_per_mitra } = await getMaxCustomersPerMitra() + const { stale_after_seconds } = await getMitraPingConfig() + const candidates = await valkey.sdiff(VK_MITRAS_ONLINE, VK_MITRAS_DEACTIVATED) + if (!candidates.length) return { available: false, count: 0 } + + const pipe = valkey.pipeline() + for (const id of candidates) { + pipe.get(vkCapacityKey(id)) + pipe.get(vkHeartbeatKey(id)) + } + const results = await pipe.exec() + + const cutoff = Date.now() - stale_after_seconds * 1000 + let count = 0 + for (let i = 0; i < candidates.length; i++) { + const capacity = Number(results[i * 2][1] ?? 0) + const heartbeat = results[i * 2 + 1][1] + if (capacity >= max_customers_per_mitra) continue + if (!heartbeat || Date.parse(heartbeat) < cutoff) continue + count++ + } + return { available: count > 0, count } +} + +const computeAvailabilityFromPostgres = async () => { const { max_customers_per_mitra } = await getMaxCustomersPerMitra() const [{ count }] = await sql` SELECT COUNT(*)::int AS count @@ -197,26 +387,42 @@ export const countAvailableMitrasFromCache = async () => { AND cs.status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT}) ) < ${max_customers_per_mitra} ` + return { available: count > 0, count } +} - const available = count > 0 - availabilityCache = { - available, - count, - expiresAt: now + AVAILABILITY_TTL_MS, +export const countAvailableMitrasFromCache = async () => { + try { + const cached = await valkey.get(AVAILABILITY_CACHE_KEY) + if (cached) return JSON.parse(cached) + + const snapshot = await computeAvailabilityFromValkey() + await valkey.getValkeyClient().setex(AVAILABILITY_CACHE_KEY, AVAILABILITY_TTL_SECONDS, 
JSON.stringify(snapshot)) + return snapshot + } catch (err) { + console.warn('[countAvailableMitras] valkey unavailable, falling back to Postgres:', err.message) + return computeAvailabilityFromPostgres() } - return { available, count } } /** - * Mitra-online check for use during pairing/extension safeguards. - * Combines the Valkey-mirrored online flag (Postgres mitra_online_status today) with - * the WebSocket-connected check. Never use "in-session" as a proxy for "online". + * Mitra-reachable check: in `mitras:online` SET AND heartbeat is fresh. + * Falls back to a Postgres `is_online` read if Valkey is unreachable; the + * fallback skips the heartbeat-freshness check (sweep takes care of stale rows + * within `stale_after_seconds + sweep_cadence`). */ export const isMitraReachable = async (mitraId) => { - const [row] = await sql` - SELECT is_online FROM mitra_online_status WHERE mitra_id = ${mitraId} - ` - return Boolean(row?.is_online) + try { + const inSet = await valkey.sismember(VK_MITRAS_ONLINE, mitraId) + if (!inSet) return false + const heartbeat = await valkey.get(vkHeartbeatKey(mitraId)) + if (!heartbeat) return false + const { stale_after_seconds } = await getMitraPingConfig() + return Date.parse(heartbeat) >= Date.now() - stale_after_seconds * 1000 + } catch (err) { + console.warn('[isMitraReachable] valkey unavailable, falling back to DB:', err.message) + const [row] = await sql`SELECT is_online FROM mitra_online_status WHERE mitra_id = ${mitraId}` + return Boolean(row?.is_online) + } } /** diff --git a/backend/src/services/mitra.service.js b/backend/src/services/mitra.service.js index 2daf15c..089675c 100644 --- a/backend/src/services/mitra.service.js +++ b/backend/src/services/mitra.service.js @@ -1,4 +1,8 @@ import { getDb } from '../db/client.js' +import * as valkey from '../plugins/valkey.js' +import { VK_MITRAS_DEACTIVATED } from './mitra-status.service.js' +import { revokeAllSessionsForUser } from './token.service.js' +import { UserType } 
from '../constants.js' const sql = getDb() @@ -36,6 +40,24 @@ export const updateMitraStatus = async (id, is_active) => { RETURNING id, is_active ` if (!mitra) throw Object.assign(new Error('Mitra not found'), { code: 'NOT_FOUND', statusCode: 404 }) + + // Deactivation also revokes all auth_sessions so the next token refresh fails + // (bounds the "ghost online" window to access-token TTL across all routes, + // not just heartbeat). See requirement/valkey-online-mirror-plan.md. + if (!is_active) { + await revokeAllSessionsForUser({ userType: UserType.MITRA, userId: id }) + } + + try { + if (is_active) { + await valkey.srem(VK_MITRAS_DEACTIVATED, id) + } else { + await valkey.sadd(VK_MITRAS_DEACTIVATED, id) + } + } catch (err) { + console.error(`[valkey-mirror] updateMitraStatus ${id} failed:`, err) + } + return mitra } diff --git a/backend/src/services/pairing.service.js b/backend/src/services/pairing.service.js index 1816ca1..d4e1215 100644 --- a/backend/src/services/pairing.service.js +++ b/backend/src/services/pairing.service.js @@ -1,11 +1,13 @@ import { getDb } from '../db/client.js' -import { getMaxCustomersPerMitra, getPairingBlastTimeoutSeconds, getReturningChatConfirmationTimeoutSeconds } from './config.service.js' +import * as valkey from '../plugins/valkey.js' +import { VK_MITRAS_ONLINE, VK_MITRAS_DEACTIVATED, vkCapacityKey, vkHeartbeatKey } from './mitra-status.service.js' +import { getMaxCustomersPerMitra, getPairingBlastTimeoutSeconds, getReturningChatConfirmationTimeoutSeconds, getMitraPingConfig } from './config.service.js' import { sendToUser } from '../plugins/websocket.js' import { sendPushNotification } from './notification.service.js' import { startSessionTimer } from './session-timer.service.js' import { startSessionListener } from './chat-handler.service.js' import { consumePaymentSession, failPaymentSession, getPaymentSession, recordIntermediateFailure } from './payment.service.js' -import { isMitraReachable, 
isMitraInActiveSessionWithCustomer, getMitraActiveSessionCount } from './mitra-status.service.js' +import { isMitraReachable, isMitraInActiveSessionWithCustomer, getMitraActiveSessionCount, recomputeCapacityForMitra, recomputeCapacityBySession } from './mitra-status.service.js' import { UserType, SessionStatus, @@ -76,10 +78,37 @@ const notifyCustomer = async (customerId, data) => { } } -export const findAvailableMitras = async () => { +// Valkey-driven: SDIFF(online, deactivated) → for each candidate, pipelined +// GET capacity + heartbeat, then filter by capacity gate + heartbeat freshness. +// Postgres fallback runs if any Valkey op throws (full JOIN as before). +const findAvailableMitrasFromValkey = async () => { + const { max_customers_per_mitra } = await getMaxCustomersPerMitra() + const { stale_after_seconds } = await getMitraPingConfig() + + const candidates = await valkey.sdiff(VK_MITRAS_ONLINE, VK_MITRAS_DEACTIVATED) + if (!candidates.length) return [] + + const pipe = valkey.pipeline() + for (const id of candidates) { + pipe.get(vkCapacityKey(id)) + pipe.get(vkHeartbeatKey(id)) + } + const results = await pipe.exec() + + const cutoff = Date.now() - stale_after_seconds * 1000 + const eligible = [] + for (let i = 0; i < candidates.length; i++) { + const capacity = Number(results[i * 2][1] ?? 0) + const heartbeat = results[i * 2 + 1][1] + if (capacity >= max_customers_per_mitra) continue + if (!heartbeat || Date.parse(heartbeat) < cutoff) continue + eligible.push({ id: candidates[i], active_session_count: capacity }) + } + return eligible +} + +const findAvailableMitrasFromPostgres = async () => { const { max_customers_per_mitra } = await getMaxCustomersPerMitra() - // Project active_session_count alongside the mitra row so the blast loop doesn't - // need a per-mitra COUNT roundtrip later. 
const mitras = await sql` SELECT m.id, m.display_name, sub.active_session_count FROM mitras m @@ -96,6 +125,15 @@ export const findAvailableMitras = async () => { return mitras } +export const findAvailableMitras = async () => { + try { + return await findAvailableMitrasFromValkey() + } catch (err) { + console.warn('[findAvailableMitras] valkey unavailable, falling back to Postgres:', err.message) + return findAvailableMitrasFromPostgres() + } +} + /** * Validate that a payment session is owned by the customer, confirmed, and not yet consumed. * Throws on mismatch. Returns the loaded payment session row. @@ -414,6 +452,10 @@ export const acceptPairingRequest = async (sessionId, mitraId) => { }) } + // Mitra now occupies a capacity slot (PENDING_PAYMENT counts per + // findAvailableMitras predicate). Mirror to Valkey. + await recomputeCapacityForMitra(mitraId) + // Mark this mitra's notification as accepted await sql` UPDATE chat_request_notifications diff --git a/backend/src/services/session-timer.service.js b/backend/src/services/session-timer.service.js index 1d45581..d2153cb 100644 --- a/backend/src/services/session-timer.service.js +++ b/backend/src/services/session-timer.service.js @@ -2,6 +2,7 @@ import { getDb } from '../db/client.js' import { publish } from '../plugins/valkey.js' import { sendToSessionParticipant } from '../plugins/websocket.js' import { sendPushNotification } from './notification.service.js' +import { recomputeCapacityForMitra } from './mitra-status.service.js' import { UserType, SessionStatus, WsMessage, EndedBy } from '../constants.js' const sql = getDb() @@ -152,6 +153,7 @@ const onSessionExpired = async (sessionId) => { RETURNING id, customer_id, mitra_id ` if (!session) return + await recomputeCapacityForMitra(session.mitra_id) // Notify customer — sees extend/close dialog; FCM fallback if WebSocket is down const expiredData = { type: WsMessage.SESSION_EXPIRED, session_id: sessionId } @@ -207,9 +209,10 @@ const 
autoCompleteIfStillClosing = async (sessionId) => { ended_at = COALESCE(ended_at, NOW()), ended_by = ${EndedBy.SYSTEM_AUTO_CLOSE} WHERE id = ${sessionId} AND status = ${SessionStatus.CLOSING} - RETURNING id + RETURNING id, mitra_id ` if (!updated) return + await recomputeCapacityForMitra(updated.mitra_id) const data = { type: WsMessage.SESSION_COMPLETED, session_id: sessionId } sendToSessionParticipant(sessionId, UserType.CUSTOMER, data) diff --git a/backend/src/services/session.service.js b/backend/src/services/session.service.js index 5514463..81b4dfc 100644 --- a/backend/src/services/session.service.js +++ b/backend/src/services/session.service.js @@ -1,5 +1,6 @@ import { getDb } from '../db/client.js' import { publish } from '../plugins/valkey.js' +import { recomputeCapacityForMitra } from './mitra-status.service.js' import { UserType, SessionStatus, MessageStatus, WsMessage } from '../constants.js' const sql = getDb() @@ -48,6 +49,7 @@ export const endSession = async (sessionId, endedBy, userId) => { code: 'SESSION_NOT_ACTIVE', statusCode: 409, }) } + await recomputeCapacityForMitra(session.mitra_id) // Notify both parties await publish(`session:${sessionId}:status`, { @@ -91,6 +93,11 @@ export const rerouteSession = async (sessionId, newMitraId) => { WHERE id = ${sessionId} RETURNING id, customer_id, mitra_id, status ` + // Both mitras' capacity flipped — recompute both. + await Promise.all([ + recomputeCapacityForMitra(oldMitraId), + recomputeCapacityForMitra(newMitraId), + ]) const [newMitra] = await sql` SELECT display_name FROM mitras WHERE id = ${newMitraId} diff --git a/backend/test/helpers/db.js b/backend/test/helpers/db.js index 41e31fa..39717d5 100644 --- a/backend/test/helpers/db.js +++ b/backend/test/helpers/db.js @@ -1,4 +1,5 @@ import { getDb } from '../../src/db/client.js' +import { flushTestDb } from './valkey.js' /** * Single shared sql client used by tests. 
Same singleton the services use, since @@ -37,6 +38,9 @@ export const resetDb = async () => { const sql = db() // RESTART IDENTITY is a no-op for UUID PKs but cheap; CASCADE handles any future FK additions. await sql.unsafe(`TRUNCATE TABLE ${TRUNCATE_TABLES.join(', ')} RESTART IDENTITY CASCADE`) + // Flush Valkey availability state so each test starts hermetic. Fixtures + // (createMitra etc.) re-seed Valkey alongside their Postgres writes. + await flushTestDb() } /** diff --git a/backend/test/helpers/fixtures.js b/backend/test/helpers/fixtures.js index 2388935..a200901 100644 --- a/backend/test/helpers/fixtures.js +++ b/backend/test/helpers/fixtures.js @@ -1,5 +1,6 @@ import { randomUUID } from 'node:crypto' import { db, resetAppConfig } from './db.js' +import { getTestValkey } from './valkey.js' /** * Insert a customer row. Defaults to the schema after the Phase 3.4 auth rewrite @@ -47,6 +48,19 @@ export const createMitra = async ({ ON CONFLICT (mitra_id) DO UPDATE SET is_online = true, last_online_at = ${now}, last_heartbeat_at = ${now}, updated_at = ${now} ` + // Mirror to Valkey so findAvailableMitras (Valkey-driven) sees this mitra. + // resetDb already FLUSHDBs Valkey, so seeding here per-mitra keeps tests + // hermetic without depending on production's startup seed. 
+ const v = getTestValkey() + await v.multi() + .sadd('mitras:online', id) + .set(`mitra:heartbeat:${id}`, now.toISOString()) + .set(`mitra:capacity:${id}`, 0) + .exec() + } + if (!isActive) { + const v = getTestValkey() + await v.sadd('mitras:deactivated', id) } return row } diff --git a/backend/test/services/mitra-status.valkey-mirror.test.js b/backend/test/services/mitra-status.valkey-mirror.test.js new file mode 100644 index 0000000..e99b1e3 --- /dev/null +++ b/backend/test/services/mitra-status.valkey-mirror.test.js @@ -0,0 +1,494 @@ +import { describe, it, expect, beforeAll, beforeEach, afterAll, vi } from 'vitest' + +/** + * Integration tests for the Valkey availability mirror + * (requirement/valkey-online-mirror-plan.md). + * + * Real Postgres + real Valkey via test/setup.js — no mocks. We assert on both + * stores' state after each operation to catch missed mirrors or order bugs. + */ + +vi.mock('../../src/plugins/websocket.js', () => ({ + sendToUser: vi.fn(() => false), + sendToSessionParticipant: vi.fn(() => false), + registerWebSocketPlugin: vi.fn(), + registerWebSocketRoute: vi.fn(), + isUserOnlineWs: vi.fn(() => false), + getSessionConnections: vi.fn(() => ({})), +})) + +vi.mock('../../src/services/notification.service.js', () => ({ + sendPushNotification: vi.fn(async () => true), + registerDeviceToken: vi.fn(async () => {}), +})) + +const valkey = await import('../../src/plugins/valkey.js') +const { + setOnline, + setOffline, + heartbeat, + isMitraReachable, + recomputeCapacityForMitra, + recomputeCapacityBySession, + seedFromPostgres, + autoOfflineStaleMitras, + mirrorHeartbeatsToPostgres, + countAvailableMitrasFromCache, + invalidateAvailabilityCache, + VK_MITRAS_ONLINE, + VK_MITRAS_DEACTIVATED, + vkCapacityKey, + vkHeartbeatKey, +} = await import('../../src/services/mitra-status.service.js') +const { updateMitraStatus } = await import('../../src/services/mitra.service.js') +const { findAvailableMitras, acceptPairingRequest, createPairingRequest } 
= await import('../../src/services/pairing.service.js') +const { createPaymentSession, confirmPaymentSession } = await import('../../src/services/payment.service.js') +const { db, resetDb, resetAppConfig } = await import('../helpers/db.js') +const { getTestValkey } = await import('../helpers/valkey.js') +const { createCustomer, createMitra } = await import('../helpers/fixtures.js') +const { SessionStatus, UserType } = await import('../../src/constants.js') + +const v = () => getTestValkey() + +describe('mitra-status valkey mirror', () => { + beforeAll(async () => { + await resetAppConfig() + }) + + beforeEach(async () => { + await resetDb() + }) + + // ---------- Seed ---------- + + describe('seedFromPostgres', () => { + it('populates mitras:online from is_online=true rows', async () => { + const m1 = await createMitra({ callName: 'M1', isOnline: true }) + const m2 = await createMitra({ callName: 'M2', isOnline: true }) + await createMitra({ callName: 'M3', isOnline: false }) + + await v().flushdb() + await seedFromPostgres() + + const members = await v().smembers(VK_MITRAS_ONLINE) + expect(members.sort()).toEqual([m1.id, m2.id].sort()) + }) + + it('seeds mitras:deactivated from is_active=false', async () => { + const m = await createMitra({ callName: 'Dead', isActive: false }) + await createMitra({ callName: 'Alive', isActive: true }) + + await v().flushdb() + await seedFromPostgres() + + const members = await v().smembers(VK_MITRAS_DEACTIVATED) + expect(members).toEqual([m.id]) + }) + + it('seeds heartbeat keys for online mitras with current timestamp', async () => { + const m = await createMitra({ callName: 'Live', isOnline: true }) + await v().flushdb() + + const before = Date.now() + await seedFromPostgres() + const after = Date.now() + + const ts = await v().get(vkHeartbeatKey(m.id)) + expect(ts).toBeTruthy() + const seeded = Date.parse(ts) + expect(seeded).toBeGreaterThanOrEqual(before) + expect(seeded).toBeLessThanOrEqual(after) + }) + + it('seeds capacity 
counters from chat_sessions', async () => { + const c = await createCustomer({ callName: 'C' }) + const m = await createMitra({ callName: 'M', isOnline: true }) + const sql = db() + await sql` + INSERT INTO chat_sessions (customer_id, mitra_id, status) + VALUES (${c.id}, ${m.id}, ${SessionStatus.ACTIVE}) + ` + + await v().flushdb() + await seedFromPostgres() + + expect(await v().get(vkCapacityKey(m.id))).toBe('1') + }) + + it('is idempotent — running twice yields the same state', async () => { + const m = await createMitra({ callName: 'Idem', isOnline: true }) + await seedFromPostgres() + const first = { + online: (await v().smembers(VK_MITRAS_ONLINE)).sort(), + heartbeat: await v().get(vkHeartbeatKey(m.id)), + } + await seedFromPostgres() + const second = { + online: (await v().smembers(VK_MITRAS_ONLINE)).sort(), + heartbeat: await v().get(vkHeartbeatKey(m.id)), + } + expect(second.online).toEqual(first.online) + // Heartbeat is reseeded with NOW each call — must be >= first + expect(Date.parse(second.heartbeat)).toBeGreaterThanOrEqual(Date.parse(first.heartbeat)) + }) + }) + + // ---------- setOnline / setOffline ---------- + + describe('setOnline / setOffline write-through', () => { + it('setOnline adds to mitras:online + writes heartbeat key', async () => { + const m = await createMitra({ callName: 'Toggle', isOnline: false }) + await v().flushdb() + + await setOnline(m.id) + + expect(await v().sismember(VK_MITRAS_ONLINE, m.id)).toBe(1) + expect(await v().get(vkHeartbeatKey(m.id))).toBeTruthy() + }) + + it('setOffline removes from mitras:online + deletes heartbeat key', async () => { + const m = await createMitra({ callName: 'Toggle', isOnline: true }) + await setOnline(m.id) // ensure heartbeat key exists + + await setOffline(m.id) + + expect(await v().sismember(VK_MITRAS_ONLINE, m.id)).toBe(0) + expect(await v().get(vkHeartbeatKey(m.id))).toBeNull() + }) + + it('setOffline is no-op when mitra was already offline', async () => { + const m = await createMitra({ 
callName: 'OffAlready', isOnline: false }) + const sql = db() + const beforeLogs = await sql`SELECT COUNT(*)::int AS c FROM mitra_online_logs WHERE mitra_id=${m.id}` + + await setOffline(m.id) + + const afterLogs = await sql`SELECT COUNT(*)::int AS c FROM mitra_online_logs WHERE mitra_id=${m.id}` + expect(afterLogs[0].c).toBe(beforeLogs[0].c) + }) + }) + + // ---------- heartbeat ---------- + + describe('heartbeat (Valkey-only)', () => { + it('writes Valkey timestamp without touching Postgres last_heartbeat_at', async () => { + const m = await createMitra({ callName: 'Pinger', isOnline: true }) + const sql = db() + const [before] = await sql`SELECT last_heartbeat_at FROM mitra_online_status WHERE mitra_id=${m.id}` + const pgBefore = before.last_heartbeat_at + + // Make sure subsequent NOW() would differ + await new Promise(r => setTimeout(r, 50)) + await heartbeat(m.id) + + const [after] = await sql`SELECT last_heartbeat_at FROM mitra_online_status WHERE mitra_id=${m.id}` + // Postgres untouched + expect(after.last_heartbeat_at).toEqual(pgBefore) + // Valkey updated + const ts = await v().get(vkHeartbeatKey(m.id)) + expect(ts).toBeTruthy() + expect(Date.parse(ts)).toBeGreaterThan(pgBefore.getTime()) + }) + + it('advances the heartbeat timestamp on each call', async () => { + const m = await createMitra({ callName: 'P', isOnline: true }) + await heartbeat(m.id) + const t1 = await v().get(vkHeartbeatKey(m.id)) + + await new Promise(r => setTimeout(r, 20)) + await heartbeat(m.id) + const t2 = await v().get(vkHeartbeatKey(m.id)) + + expect(Date.parse(t2)).toBeGreaterThan(Date.parse(t1)) + }) + }) + + // ---------- isMitraReachable ---------- + + describe('isMitraReachable', () => { + it('returns true for online mitra with fresh heartbeat', async () => { + const m = await createMitra({ callName: 'Reach', isOnline: true }) + expect(await isMitraReachable(m.id)).toBe(true) + }) + + it('returns false when mitra is not in mitras:online', async () => { + const m = await 
createMitra({ callName: 'NoReach', isOnline: false }) + expect(await isMitraReachable(m.id)).toBe(false) + }) + + it('returns false when heartbeat is stale', async () => { + const m = await createMitra({ callName: 'Stale', isOnline: true }) + // Force stale heartbeat (one hour ago) + const ancient = new Date(Date.now() - 3_600_000).toISOString() + await v().set(vkHeartbeatKey(m.id), ancient) + + expect(await isMitraReachable(m.id)).toBe(false) + }) + }) + + // ---------- recomputeCapacity ---------- + + describe('recomputeCapacityForMitra', () => { + it('counts ACTIVE + PENDING_PAYMENT sessions', async () => { + const c = await createCustomer({ callName: 'C' }) + const c2 = await createCustomer({ callName: 'C2' }) + const m = await createMitra({ callName: 'Cap', isOnline: true }) + const sql = db() + await sql` + INSERT INTO chat_sessions (customer_id, mitra_id, status) + VALUES (${c.id}, ${m.id}, ${SessionStatus.ACTIVE}), + (${c2.id}, ${m.id}, ${SessionStatus.PENDING_PAYMENT}) + ` + await recomputeCapacityForMitra(m.id) + expect(await v().get(vkCapacityKey(m.id))).toBe('2') + }) + + it('excludes ended/closing/extending sessions', async () => { + const c = await createCustomer({ callName: 'C' }) + const m = await createMitra({ callName: 'Cap', isOnline: true }) + const sql = db() + await sql` + INSERT INTO chat_sessions (customer_id, mitra_id, status) + VALUES (${c.id}, ${m.id}, ${SessionStatus.COMPLETED}) + ` + await recomputeCapacityForMitra(m.id) + expect(await v().get(vkCapacityKey(m.id))).toBe('0') + }) + + it('no-op when mitraId is null/undefined', async () => { + await recomputeCapacityForMitra(null) // should not throw + await recomputeCapacityForMitra(undefined) + }) + }) + + // ---------- findAvailableMitras ---------- + + describe('findAvailableMitras (Valkey-driven)', () => { + it('returns online + not-deactivated + under-capacity + fresh-heartbeat mitras', async () => { + const ok = await createMitra({ callName: 'OK', isOnline: true }) + const deact = 
await createMitra({ callName: 'Deact', isOnline: true, isActive: false }) + const offline = await createMitra({ callName: 'Off', isOnline: false }) + const stale = await createMitra({ callName: 'Stale', isOnline: true }) + await v().set(vkHeartbeatKey(stale.id), new Date(Date.now() - 3_600_000).toISOString()) + + const result = await findAvailableMitras() + const ids = result.map(r => r.id).sort() + expect(ids).toEqual([ok.id].sort()) + expect(result.find(r => r.id === ok.id).active_session_count).toBe(0) + }) + + it('excludes a mitra whose capacity is at max', async () => { + const m = await createMitra({ callName: 'AtCap', isOnline: true }) + // max_customers_per_mitra default is 3 + await v().set(vkCapacityKey(m.id), 3) + + const result = await findAvailableMitras() + expect(result.find(r => r.id === m.id)).toBeUndefined() + }) + + it('returns capacity in the result for the blast loop', async () => { + const m = await createMitra({ callName: 'WithCap', isOnline: true }) + await v().set(vkCapacityKey(m.id), 2) + const result = await findAvailableMitras() + expect(result.find(r => r.id === m.id).active_session_count).toBe(2) + }) + }) + + // ---------- countAvailableMitrasFromCache ---------- + + describe('countAvailableMitrasFromCache (beacon)', () => { + it('caches the snapshot in Valkey with TTL', async () => { + await createMitra({ callName: 'On', isOnline: true }) + await v().del('availability:snapshot') + + const first = await countAvailableMitrasFromCache() + expect(first.available).toBe(true) + expect(first.count).toBe(1) + + const cached = await v().get('availability:snapshot') + expect(cached).toBeTruthy() + expect(JSON.parse(cached)).toEqual(first) + + const ttl = await v().ttl('availability:snapshot') + expect(ttl).toBeGreaterThan(0) + expect(ttl).toBeLessThanOrEqual(10) + }) + + it('returns cached snapshot on subsequent calls without recompute', async () => { + await createMitra({ callName: 'On', isOnline: true }) + await 
countAvailableMitrasFromCache() // primes cache + + // Manually corrupt SET to prove subsequent call reads cache, not Valkey state + await v().flushdb() + await v().set('availability:snapshot', JSON.stringify({ available: true, count: 42 }), 'EX', 10) + + const result = await countAvailableMitrasFromCache() + expect(result.count).toBe(42) + }) + + it('invalidateAvailabilityCache deletes the snapshot', async () => { + await v().set('availability:snapshot', JSON.stringify({ available: true, count: 1 }), 'EX', 10) + await invalidateAvailabilityCache() + expect(await v().get('availability:snapshot')).toBeNull() + }) + }) + + // ---------- autoOfflineStaleMitras ---------- + + describe('autoOfflineStaleMitras', () => { + it('flips Postgres + cleans Valkey for mitras with stale heartbeat', async () => { + const m = await createMitra({ callName: 'WillStale', isOnline: true }) + const sql = db() + + // Force stale heartbeat + await v().set(vkHeartbeatKey(m.id), new Date(Date.now() - 3_600_000).toISOString()) + + const count = await autoOfflineStaleMitras() + expect(count).toBe(1) + + const [row] = await sql`SELECT is_online FROM mitra_online_status WHERE mitra_id=${m.id}` + expect(row.is_online).toBe(false) + expect(await v().sismember(VK_MITRAS_ONLINE, m.id)).toBe(0) + expect(await v().get(vkHeartbeatKey(m.id))).toBeNull() + + const [log] = await sql` + SELECT status FROM mitra_online_logs + WHERE mitra_id=${m.id} ORDER BY timestamp DESC LIMIT 1 + ` + expect(log.status).toBe('offline') + }) + + it('no-op when no mitras are stale', async () => { + await createMitra({ callName: 'Fresh', isOnline: true }) + const count = await autoOfflineStaleMitras() + expect(count).toBe(0) + }) + + it('no-op when require_ping=false', async () => { + const sql = db() + await sql` + UPDATE app_config SET value=${sql.json({ value: false })} + WHERE key='require_mitra_ping' + ` + const m = await createMitra({ callName: 'WouldBeStale', isOnline: true }) + await v().set(vkHeartbeatKey(m.id), new 
Date(Date.now() - 3_600_000).toISOString()) + + const count = await autoOfflineStaleMitras() + expect(count).toBe(0) + // Restore for other tests + await sql` + UPDATE app_config SET value=${sql.json({ value: true })} + WHERE key='require_mitra_ping' + ` + }) + }) + + // ---------- mirrorHeartbeatsToPostgres ---------- + + describe('mirrorHeartbeatsToPostgres', () => { + it('writes Valkey heartbeat timestamps to Postgres last_heartbeat_at in one batch', async () => { + const m1 = await createMitra({ callName: 'P1', isOnline: true }) + const m2 = await createMitra({ callName: 'P2', isOnline: true }) + const sql = db() + + const ts = new Date(Date.now() - 2_000).toISOString() + await v().set(vkHeartbeatKey(m1.id), ts) + await v().set(vkHeartbeatKey(m2.id), ts) + + const count = await mirrorHeartbeatsToPostgres() + expect(count).toBe(2) + + const rows = await sql` + SELECT mitra_id, last_heartbeat_at FROM mitra_online_status + WHERE mitra_id IN (${m1.id}, ${m2.id}) + ` + for (const row of rows) { + expect(row.last_heartbeat_at.toISOString()).toBe(ts) + } + }) + + it('no-op when no mitras are online', async () => { + await v().del(VK_MITRAS_ONLINE) + const count = await mirrorHeartbeatsToPostgres() + expect(count).toBe(0) + }) + }) + + // ---------- updateMitraStatus / revokeAllSessions ---------- + + describe('updateMitraStatus + auth_session revocation', () => { + it('deactivation adds to mitras:deactivated AND revokes all auth_sessions', async () => { + const m = await createMitra({ callName: 'Banned', isActive: true }) + const sql = db() + const tokenHash = '$2b$10$abcdefghijklmnopqrstuv' + await sql` + INSERT INTO auth_sessions (user_type, user_id, refresh_token_hash, expires_at) + VALUES (${UserType.MITRA}, ${m.id}, ${tokenHash}, NOW() + INTERVAL '30 days') + ` + + await updateMitraStatus(m.id, false) + + expect(await v().sismember(VK_MITRAS_DEACTIVATED, m.id)).toBe(1) + const [auth] = await sql`SELECT revoked_at FROM auth_sessions WHERE user_id=${m.id}` + 
expect(auth.revoked_at).not.toBeNull() + }) + + it('reactivation removes from mitras:deactivated', async () => { + const m = await createMitra({ callName: 'Pardoned', isActive: false }) + await v().sadd(VK_MITRAS_DEACTIVATED, m.id) + + await updateMitraStatus(m.id, true) + + expect(await v().sismember(VK_MITRAS_DEACTIVATED, m.id)).toBe(0) + }) + }) + + // ---------- E2E: blast lifecycle ---------- + + describe('end-to-end: blast lifecycle drives capacity counter', () => { + it('mitra accept → capacity++; session end is covered separately', async () => { + const c = await createCustomer({ callName: 'BlastC' }) + const m = await createMitra({ callName: 'BlastM', isOnline: true }) + + const pay = await createPaymentSession({ + customerId: c.id, + durationMinutes: 15, + amount: 30000, + }) + await confirmPaymentSession(pay.id, c.id) + const session = await createPairingRequest(c.id, { paymentRequestId: pay.id }) + expect(await v().get(vkCapacityKey(m.id))).toBe('0') // no accept yet + + await acceptPairingRequest(session.id, m.id) + expect(await v().get(vkCapacityKey(m.id))).toBe('1') + }) + }) + + // ---------- Reader fallback when Valkey is unavailable ---------- + + describe('reader fallback', () => { + it('isMitraReachable falls back to Postgres on Valkey error', async () => { + const m = await createMitra({ callName: 'Fallback', isOnline: true }) + // Stub sismember to throw + const spy = vi.spyOn(valkey, 'sismember').mockRejectedValue(new Error('valkey down')) + try { + // Postgres has is_online=true → fallback returns true + const result = await isMitraReachable(m.id) + expect(result).toBe(true) + } finally { + spy.mockRestore() + } + }) + + it('findAvailableMitras falls back to Postgres JOIN when Valkey sdiff throws', async () => { + const m = await createMitra({ callName: 'FallbackBlast', isOnline: true }) + const spy = vi.spyOn(valkey, 'sdiff').mockRejectedValue(new Error('valkey down')) + try { + const result = await findAvailableMitras() + 
expect(result.find(r => r.id === m.id)).toBeDefined() + } finally { + spy.mockRestore() + } + }) + }) +}) diff --git a/backend/test/services/session-timer.service.test.js b/backend/test/services/session-timer.service.test.js index 46f199b..6489848 100644 --- a/backend/test/services/session-timer.service.test.js +++ b/backend/test/services/session-timer.service.test.js @@ -1,4 +1,5 @@ import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest' +import { randomUUID } from 'node:crypto' // Capture calls to sendToSessionParticipant so we can assert the 3-min warning event. vi.mock('../../src/plugins/websocket.js', () => ({ @@ -15,10 +16,42 @@ vi.mock('../../src/services/notification.service.js', () => ({ registerDeviceToken: vi.fn(async () => {}), })) -vi.mock('../../src/plugins/valkey.js', () => ({ - publish: vi.fn(async () => {}), - subscribe: vi.fn(() => () => {}), -})) +// Real DB queries don't settle under fake timers (they're real socket I/O, not +// microtasks). Stub getDb() with a tagged-template-compatible mock so onThreeMinuteWarning's +// `SELECT expires_at FROM chat_sessions WHERE id = ${sessionId}` resolves synchronously. 
+vi.mock('../../src/db/client.js', () => { + const fakeSql = () => Promise.resolve([{ expires_at: null }]) + fakeSql.unsafe = () => Promise.resolve([]) + fakeSql.array = (arr) => arr + fakeSql.json = (v) => v + return { getDb: () => fakeSql } +}) + +vi.mock('../../src/plugins/valkey.js', () => { + const noopPipeline = { sadd: () => noopPipeline, srem: () => noopPipeline, set: () => noopPipeline, get: () => noopPipeline, del: () => noopPipeline, exec: async () => [] } + return { + publish: vi.fn(async () => {}), + subscribe: vi.fn(() => () => {}), + onValkeyReady: vi.fn(), + getValkeyClient: vi.fn(() => ({ setex: vi.fn(async () => 'OK') })), + getValkeyPub: vi.fn(), + getValkeySub: vi.fn(), + sadd: vi.fn(async () => 1), + srem: vi.fn(async () => 1), + sismember: vi.fn(async () => false), + smembers: vi.fn(async () => []), + sdiff: vi.fn(async () => []), + scard: vi.fn(async () => 0), + set: vi.fn(async () => 'OK'), + get: vi.fn(async () => null), + del: vi.fn(async () => 1), + incr: vi.fn(async () => 1), + decr: vi.fn(async () => 0), + exists: vi.fn(async () => 0), + pipeline: vi.fn(() => noopPipeline), + multi: vi.fn(() => noopPipeline), + } +}) const { sendToSessionParticipant } = await import('../../src/plugins/websocket.js') const { startSessionTimer, clearSessionTimer } = await import('../../src/services/session-timer.service.js') @@ -35,7 +68,9 @@ describe('session-timer 3-minute warning (Phase 4)', () => { }) it('emits session_warning kind:three_minutes_left exactly once at the 3-min mark', async () => { - const sessionId = 'sess-3min-test' + // Real UUID — onThreeMinuteWarning runs a Postgres SELECT against chat_sessions.id + // which is uuid-typed; string ids throw a parse error before we hit the row check. 
+ const sessionId = randomUUID() const expiresAt = new Date(Date.now() + 5 * 60_000) // 5 minutes from now startSessionTimer(sessionId, expiresAt) @@ -65,7 +100,7 @@ describe('session-timer 3-minute warning (Phase 4)', () => { }) it('does NOT re-fire the 3-min warning when the timer is rescheduled (e.g. extension)', async () => { - const sessionId = 'sess-rescheduled' + const sessionId = randomUUID() const initial = new Date(Date.now() + 5 * 60_000) startSessionTimer(sessionId, initial) diff --git a/requirement/deployment.md b/requirement/deployment.md new file mode 100644 index 0000000..df18cb9 --- /dev/null +++ b/requirement/deployment.md @@ -0,0 +1,107 @@ +# Deployment notes + +Operational decisions and dependency configuration for staging/production. Keep this updated as we make infra choices; cross-link from feature plans when a deploy-time setting matters. + +## Infrastructure summary + +| Component | Service | Tier / Notes | +|---|---|---| +| Backend (public + internal) | GCP Cloud Run | Horizontal scaling; SIGTERM trapped for graceful drain ([server.js](../backend/src/server.js)) | +| Database | GCP Cloud SQL (PostgreSQL) | Source of truth for all durable state | +| Pub/sub + cache | Valkey | Self-hosted on VM today; Memorystore Standard (HA) recommended for prod (see [§ Valkey](#valkey)) | +| Networking | GCP VPC | Internal listener (port 3001) never exposed; CC reaches it via VPN | +| Payment | Xendit | See [phase5-xendit-plan.md](phase5-xendit-plan.md) for keys / webhook URL setup | +| Auth | Self-managed JWT + FCM-only Firebase | See [backend/CLAUDE.md](../backend/CLAUDE.md) | + +## Valkey + +Valkey is used for two distinct purposes: + +1. **Pub/sub** — cross-instance event fan-out (chat messages, session lifecycle, config invalidation). See [backend/src/plugins/valkey.js](../backend/src/plugins/valkey.js). +2. 
**Availability mirror** — `mitras:online`, `mitras:deactivated`, `mitra:capacity:<mitra_id>`, `mitra:heartbeat:<mitra_id>`, and `availability:snapshot` per [valkey-online-mirror-plan.md](valkey-online-mirror-plan.md). Postgres remains the durable source of truth; Valkey is the hot read path. + +### Persistence — required or optional? + +**Not required.** All durable state lives in Postgres; Valkey is a cache + ephemeral liveness layer that fully rebuilds via `seedFromPostgres()` on backend reconnect. + +What's actually in Valkey, and what happens if it's wiped: + +| Key | Derivable from Postgres? | Cost of loss | +|---|---|---| +| `mitras:online` | yes | reseeded on reconnect | +| `mitras:deactivated` | yes | reseeded on reconnect | +| `mitra:capacity:<mitra_id>` | yes (`COUNT(*) FROM chat_sessions`) | reseeded on reconnect | +| `mitra:heartbeat:<mitra_id>` | **no** — pure transient liveness | seed writes `NOW`; ≤ a few seconds of fuzz on `last_heartbeat_at` forensics | +| `availability:snapshot` | recomputable | next beacon poll repopulates | + +Reader code in services/* has explicit Postgres fallbacks for every Valkey op, so the cold-cache window during a restart degrades performance, not correctness. + +### Persistence recommendation by environment + +| Environment | Setting | Reason | +|---|---|---| +| **Dev / local** | No persistence (`--save "" --appendonly no` or just default) | Restarts wipe state; reseed handles it cleanly; zero disk overhead | +| **Staging** | AOF on (`--appendonly yes`) | Verifies prod-like behavior; tiny disk cost | +| **Production** | AOF on, optionally RDB too (`--appendonly yes --save 60 1000`) | Eliminates cold-cache window after restart; trivial disk footprint (few MB) | + +The application code is identical across all three — persistence is a deploy-time knob, not a code-level concern. + +### Self-hosted Valkey (current state, dev/staging) + +Docker container on the existing VM.
Reference config: + +```yaml +valkey: + image: valkey/valkey:7-alpine + command: valkey-server --appendonly yes --save 60 1000 + volumes: + - valkey-data:/data + ports: + - "6379:6379" + restart: unless-stopped +``` + +Backend reaches it via `VALKEY_URL=redis://:6379` in `backend/.env` (or Cloud Run env var). + +### Memorystore migration (when going to prod) + +The reseed-from-Postgres flow makes migration trivial — Valkey state is never load-bearing: + +1. Provision **Memorystore for Valkey, Standard tier** (HA with replica) in the same VPC + region as Cloud Run. + - Smallest available size (~1 GB) is plenty; actual data footprint is well under 1 MB. + - Cost: ~$50/month at minimum sizing in asia-southeast2. +2. Update Cloud Run env: `VALKEY_URL=redis://:6379`. +3. Deploy new revision. Cloud Run rolling deploy → new instances seed Memorystore from Postgres; old instances drain on old Valkey. +4. Shut down old Valkey once traffic has migrated. + +**Zero downtime.** No data migration needed (state is derivable). The cold-cache window on new instances is handled by the existing Postgres-fallback reader paths. + +### Tier choice rationale + +| Tier | When to use | Failover behavior | +|---|---|---| +| Self-hosted Docker | Dev, staging | Manual restart; backend reseeds when Valkey comes back | +| Memorystore Basic | Cost-sensitive single-AZ staging | ~1–5 min outage per maintenance event; backend handles via Postgres fallback | +| Memorystore Standard (HA) | **Production** | ~30s automatic failover; replica keeps data live | + +The system is correct on any tier — HA reduces customer-visible latency spikes during Valkey events from minutes to seconds. + +## Cloud Run + +(Placeholder — fill in as we make decisions about region, min/max instances, concurrency, secrets manager wiring.) + +## Cloud SQL + +(Placeholder — pool size, machine type, HA flag, backup retention.) 
+ +## Xendit + +See [phase5-xendit-plan.md](phase5-xendit-plan.md) for credential setup and webhook URL configuration. Stage 8 (live E2E) is currently blocked on test-mode keys. + +## Open ops decisions + +- [ ] Confirm Memorystore Standard tier for prod deploy (recommended in [§ Valkey](#valkey)). +- [ ] Pin GCP region for backend + Cloud SQL + Memorystore (all must match for sub-ms internal latency). +- [ ] Secrets manager (GCP Secret Manager vs Cloud Run env vars) for `AUTH_JWT_SECRET`, `XENDIT_SECRET_KEY`, etc. +- [ ] Backup retention policy for Cloud SQL. +- [ ] CI/CD pipeline for Cloud Run deploys. diff --git a/requirement/valkey-online-mirror-plan.md b/requirement/valkey-online-mirror-plan.md new file mode 100644 index 0000000..1d1040c --- /dev/null +++ b/requirement/valkey-online-mirror-plan.md @@ -0,0 +1,376 @@ +# Valkey mirror for mitra availability — plan + +**Status:** approved (2026-05-25). Open issue: heartbeat auth preHandler (see [§ Mitra heartbeat (hot path)](#mitra-heartbeat-hot-path)). +**Created:** 2026-05-25 +**Owner:** Ramadhan + +## Goal + +Move the **read path** for "is this mitra available to blast?" entirely into Valkey at production scale (hundreds of online mitras, thousands of customers polling). Eliminate per-heartbeat Postgres writes. Keep Postgres as the durable source of truth via either real-time mirroring (writes that already exist) or periodic batched mirroring (heartbeats). + +## North star + +- **Postgres = durable source of truth.** Every fact lives there. +- **Valkey = read path + ephemeral-write target.** Mirrors Postgres state; reads compute availability from primitive signals. +- **Valkey unreachable on a read:** fall back to the existing Postgres JOIN query. Outage degrades performance, never breaks pairing. +- **Valkey unreachable on a write:** log + continue. Reconciliation sweep heals drift. +- **Valkey restart / cold start:** reseed from Postgres before serving traffic. + +## Schema + +Four Valkey structures.
None has a TTL (heartbeat freshness is computed by sweep, not by Redis expiry): + +| Key | Type | Value / members | Updated by | +|---|---|---|---| +| `mitras:online` | SET | mitra UUIDs | `setOnline` SADD; `setOffline` SREM; sweep SREM; bulk-SREM on deactivate | +| `mitras:deactivated` | SET | mitra UUIDs | `updateMitraStatus(is_active=false)` SADD; `updateMitraStatus(is_active=true)` SREM | +| `mitra:capacity:<id>` | STRING (integer) | count of active+pending_payment sessions assigned to this mitra | INCR on session accept; DECR on session end/expire/cancel; DECR/INCR pair on reroute | +| `mitra:heartbeat:<id>` | STRING | ISO 8601 timestamp of last heartbeat | heartbeat handler `SET`; `setOnline` `SET` (seed); `setOffline` `DEL` | + +Postgres mirror columns (durable): +- `mitra_online_status.is_online` — written by `setOnline` / `setOffline` / sweep. Already in schema. +- `mitra_online_status.last_heartbeat_at` — written by the **batched heartbeat mirror** every 60s (NOT per-ping). Already in schema. +- `mitras.is_active` — written by `updateMitraStatus`. Already in schema. +- `chat_sessions.status` + `mitra_id` — already source-of-truth for session counts. + +## Read path (computing "available for blast") + +All reads compute on the fly from Valkey primitives. No memoized `mitras:available` set. + +```js +const findAvailableMitras = async () => { + // 1. online minus deactivated + const candidates = await valkey.sdiff('mitras:online', 'mitras:deactivated') + if (!candidates.length) return [] + + // 2. fetch capacity + heartbeat for each candidate (pipelined: 1 roundtrip) + const pipe = valkey.pipeline() + for (const id of candidates) { + pipe.get(`mitra:capacity:${id}`) + pipe.get(`mitra:heartbeat:${id}`) + } + const results = await pipe.exec() + + // 3.
filter by capacity + heartbeat freshness + const { max_customers_per_mitra } = await getMaxCustomersPerMitra() + const { stale_after_seconds } = await getMitraPingConfig() + const cutoff = Date.now() - stale_after_seconds * 1000 + + const eligible = [] + for (let i = 0; i < candidates.length; i++) { + const count = Number(results[i * 2][1] ?? 0) + const heartbeatAt = results[i * 2 + 1][1] + if (count >= max_customers_per_mitra) continue + if (!heartbeatAt || Date.parse(heartbeatAt) < cutoff) continue + eligible.push({ id: candidates[i], active_session_count: count }) + } + return eligible +} +``` + +**Cost at prod scale (300 online):** 1 `SDIFF` + 600 `GET` (pipelined) = ~1ms. Negligible. + +### Read sites — what changes + +| Caller | Today | After | +|---|---|---| +| `isMitraReachable(mitraId)` ([mitra-status.service.js:215](../backend/src/services/mitra-status.service.js#L215)) | `SELECT is_online ...` | `SISMEMBER mitras:online` + check `mitra:heartbeat:` freshness | +| `findAvailableMitras` ([pairing.service.js:79](../backend/src/services/pairing.service.js#L79)) | full JOIN with chat_sessions | Valkey-driven (above) | +| `countAvailableMitrasFromCache` ([mitra-status.service.js:181](../backend/src/services/mitra-status.service.js#L181)) | full JOIN, cached in-process 10s | Valkey-driven, cached in **Valkey** 10s (shared cluster-wide) | +| Dashboard online count ([dashboard.service.js:16](../backend/src/services/dashboard.service.js#L16)) | `COUNT(*) WHERE is_online=true` | `SCARD mitras:online` | +| `getStatus(mitraId)` (mitra's own status poll) | full SELECT | Hybrid: `SISMEMBER` for `is_online`, Postgres for timestamps | +| `getOnlineMitras` (CC dashboard) | full JOIN with display_name + active_session_count | **unchanged** — low volume, joins make sense in SQL | + +### Reader fallback + +Every Valkey read is wrapped: + +```js +try { + return await valkey.sismember('mitras:online', mitraId) +} catch (err) { + log.warn({ err }, '[mitras:online] valkey 
unreachable, falling back to DB') + const [row] = await sql`SELECT is_online FROM mitra_online_status WHERE mitra_id = ${mitraId}` + return Boolean(row?.is_online) +} +``` + +For `findAvailableMitras`, the fallback is the existing Postgres JOIN query. + +## Write paths + +Each Postgres write is followed by a Valkey write. Postgres always commits first. Valkey failures log + continue. + +### Online / offline (mitra app toggle) + +| Action | Postgres | Valkey | +|---|---|---| +| `setOnline(mitraId)` | `UPDATE mitra_online_status SET is_online=true, last_online_at=NOW(), last_heartbeat_at=NOW()` | `SADD mitras:online ` + `SET mitra:heartbeat: ` | +| `setOffline(mitraId)` | `UPDATE mitra_online_status SET is_online=false, last_offline_at=NOW()` | `SREM mitras:online ` + `DEL mitra:heartbeat:` | + +The `SET heartbeat` on `setOnline` is critical: without it, the very next sweep would mark the freshly-online mitra stale (their first real heartbeat is up to `heartbeat_cadence_seconds` away). + +### Admin activate / deactivate (CC) + +[`updateMitraStatus`](../backend/src/services/mitra.service.js) ([mitra.service.js:32](../backend/src/services/mitra.service.js#L32)): + +| Action | Postgres | Valkey | +|---|---|---| +| activate (`is_active=true`) | `UPDATE mitras SET is_active=true` | `SREM mitras:deactivated ` | +| deactivate (`is_active=false`) | `UPDATE mitras SET is_active=false` | `SADD mitras:deactivated ` | + +### Mitra heartbeat (hot path) + +Heartbeat handler ([mitra-status.service.js:79](../backend/src/services/mitra-status.service.js#L79)) is rewritten to operate entirely against Valkey: + +1. `authenticate` plugin verifies JWT signature + expiry + `userType === 'mitra'` (no DB). +2. `SISMEMBER mitras:deactivated ` → if true, return `403`. +3. `SET mitra:heartbeat: `. + +Steps 2 + 3 pipelined into one Valkey roundtrip. 
+ +| | Today | After | +|---|---|---| +| Auth check | `getMitraById` SELECT + `is_active` check (1 DB read) | `SISMEMBER mitras:deactivated` (Valkey only) | +| Body | `UPDATE mitra_online_status SET last_heartbeat_at=NOW() WHERE id=? AND is_online=true` (1 DB write) | `SET mitra:heartbeat:` (Valkey only) | +| Postgres ops per ping | 2 | **0** | +| Valkey ops per ping | 0 | 2 (one pipelined roundtrip) | +| At prod scale (300 online × 2 pings/min) | 1,200 DB ops/min | 1,200 Valkey ops/min, **0 DB ops/min** | + +`mitras:deactivated` is already maintained on every CC `updateMitraStatus` call (see [§ Admin activate / deactivate](#admin-activate--deactivate-cc)) so deactivation propagates to the heartbeat path within one Valkey write window (~ms). + +### Heartbeat → Postgres batched mirror + +A background job runs every `HEARTBEAT_MIRROR_INTERVAL_SECONDS` (env, default 60). One SQL statement, touches all online mitras at once: + +```js +const mirrorHeartbeatsToPostgres = async () => { + const onlineIds = await valkey.smembers('mitras:online') + if (!onlineIds.length) return + + const pipe = valkey.pipeline() + for (const id of onlineIds) pipe.get(`mitra:heartbeat:${id}`) + const results = await pipe.exec() + + const pairs = [] + for (let i = 0; i < onlineIds.length; i++) { + const ts = results[i][1] + if (ts) pairs.push({ mitra_id: onlineIds[i], ts }) + } + if (!pairs.length) return + + await sql` + UPDATE mitra_online_status m + SET last_heartbeat_at = u.ts::timestamptz, updated_at = NOW() + FROM (SELECT * FROM UNNEST( + ${sql.array(pairs.map(p => p.mitra_id))}::uuid[], + ${sql.array(pairs.map(p => p.ts))}::text[] + ) AS t(mitra_id, ts)) u + WHERE m.mitra_id = u.mitra_id + ` +} +``` + +**Cost at prod scale (300 online):** 1 batched UPDATE per minute per instance. At 3 instances = 3 statements/min cluster-wide (redundant but idempotent — latest timestamp wins). ~20–60× reduction in Postgres write load vs today. 
+ +**No leader election initially.** 3 redundant idempotent UPDATEs/min is still 200× cheaper than today. If WAL pressure becomes visible, add a Valkey-NX lease leader-elect (~15 LOC); deferred. + +### Session capacity counter + +Touch all four services that mutate session state: + +| Trigger | File | Postgres write (existing) | Valkey write | +|---|---|---|---| +| Mitra accepts a chat (status → `active`, mitra_id set) | `pairing.service.js` accept handler | `UPDATE chat_sessions SET status='active', mitra_id=?` | `INCR mitra:capacity:` | +| Session ends (status → `ended` / `expired` / `cancelled`) | `closure.service.js`, expiry sweepers | `UPDATE chat_sessions SET status=...` | `DECR mitra:capacity:` | +| Session reroute (mitra A → mitra B) | `session.service.js` | `UPDATE chat_sessions SET mitra_id=B` | `DECR mitra:capacity:A` + `INCR mitra:capacity:B` (pipelined) | + +**What counts as occupying capacity:** sessions in `ACTIVE` or `PENDING_PAYMENT` status with a non-null `mitra_id`. This matches the existing `findAvailableMitras` predicate. Extension flow (active → pending_payment → active) does NOT change capacity — mitra stays occupied throughout. + +**Negative-counter guard:** clamp the counter at zero on every decrement (i.e. `Math.max(0, count - 1)`), either atomically in a Lua script or non-atomically with a `GET` check plus a `SET` back to `0` — to prevent drift if a DECR fires without a prior INCR (e.g. legacy session without Valkey mirror). The reconciliation sweep recomputes from Postgres anyway. + +## Auto-offline sweep — Valkey-driven + +Replaces the current Postgres seq-scan with a Valkey computation.
Runs every `MITRA_AUTO_OFFLINE_SWEEP_SECONDS` (env, default 30): + +```js +const autoOfflineStaleMitras = async () => { + const { stale_after_seconds, require_ping } = await getMitraPingConfig() + if (!require_ping) return 0 + + const onlineIds = await valkey.smembers('mitras:online') + if (!onlineIds.length) return 0 + + const pipe = valkey.pipeline() + for (const id of onlineIds) pipe.get(`mitra:heartbeat:${id}`) + const results = await pipe.exec() + + const cutoff = Date.now() - stale_after_seconds * 1000 + const stale = [] + for (let i = 0; i < onlineIds.length; i++) { + const ts = results[i][1] + if (!ts || Date.parse(ts) < cutoff) stale.push(onlineIds[i]) + } + if (!stale.length) return 0 + + // Postgres: bulk flip is_online=false + await sql` + UPDATE mitra_online_status + SET is_online = false, last_offline_at = NOW(), updated_at = NOW() + WHERE mitra_id = ANY(${sql.array(stale)}::uuid[]) AND is_online = true + ` + // Log rows + for (const id of stale) { + await sql`INSERT INTO mitra_online_logs (mitra_id, status) VALUES (${id}, 'offline')` + } + // Valkey: bulk SREM + DEL heartbeat keys + const cleanup = valkey.pipeline() + cleanup.srem('mitras:online', ...stale) + for (const id of stale) cleanup.del(`mitra:heartbeat:${id}`) + await cleanup.exec() + + invalidateAvailabilityCache() + return stale.length +} +``` + +**Stale threshold:** `stale_after_seconds` is read from `getMitraPingConfig()` ([config.service.js](../backend/src/services/config.service.js)) — the existing CC-tunable knob. Not a new env. + +**Sweep cadence:** `MITRA_AUTO_OFFLINE_SWEEP_SECONDS` env, default 30 (matches current hardcoded setInterval). + +**Failure semantics:** if any Valkey op throws, the entire sweep aborts for this tick. The next tick retries. We never want to mass-offline mitras due to a transient Valkey hiccup. 
+ +## Shared beacon snapshot cache + +Replace the in-process `availabilityCache` ([mitra-status.service.js:14](../backend/src/services/mitra-status.service.js#L14)) with a Valkey GET/SETEX key. Even though reads are now sub-ms Valkey ops, this caps total Valkey query load at high beacon-poll rates (5,000 customers × 12/min = 60,000 polls/min → cache → 6 computations/min cluster-wide). + +``` +KEY: availability:snapshot +TYPE: string (JSON: {"available": bool, "count": number}) +TTL: 10 seconds +``` + +`config:invalidate` pub/sub subscriber extended to `DEL availability:snapshot` on `max_customers_per_mitra` bust. + +## Startup / reconnect / reseed + +Three triggers reseed Valkey state from Postgres. All idempotent. + +### Backend startup + +In [`server.js`](../backend/src/server.js), after Valkey emits `'ready'` and before the public listener binds: + +```js +const onlineRows = await sql`SELECT mitra_id FROM mitra_online_status WHERE is_online = true` +const deactRows = await sql`SELECT id FROM mitras WHERE is_active = false` +const sessionCountRows = await sql` + SELECT mitra_id, COUNT(*)::int AS c FROM chat_sessions + WHERE mitra_id IS NOT NULL AND status IN ('active', 'pending_payment') + GROUP BY mitra_id +` + +const pipe = valkey.multi() +pipe.del('mitras:online', 'mitras:deactivated') +if (onlineRows.length) { + pipe.sadd('mitras:online', ...onlineRows.map(r => r.mitra_id)) + // Seed heartbeat timestamps to NOW so the first sweep doesn't mass-offline + // currently-online mitras. They'll refresh on their next ping anyway. + const now = new Date().toISOString() + for (const r of onlineRows) pipe.set(`mitra:heartbeat:${r.mitra_id}`, now) +} +if (deactRows.length) pipe.sadd('mitras:deactivated', ...deactRows.map(r => r.id)) +for (const r of sessionCountRows) pipe.set(`mitra:capacity:${r.mitra_id}`, r.c) +await pipe.exec() +``` + +### ioredis reconnect + +Listen to the ioredis `'ready'` event (fires on initial connect AND each reconnect). Re-run the seed. 
+ +### Periodic reconciliation sweeper + +`VALKEY_ONLINE_MIRROR_SWEEP_SECONDS` env, default 300. Runs the seed (idempotent — `DEL` + `SADD` + `SET` lands the same state). Belt-and-braces against drift from failed best-effort writes, out-of-band Postgres mutations, Valkey eviction. + +## Three sweeps, three cadences (summary) + +| Sweep | Purpose | Cadence env | Default | Reads from | Writes to | +|---|---|---|---|---|---| +| Auto-offline | Detect stale-heartbeat mitras → flip offline | `MITRA_AUTO_OFFLINE_SWEEP_SECONDS` | 30 | Valkey | Postgres + Valkey | +| Heartbeat mirror | Persist Valkey heartbeats to Postgres for forensics/backup | `HEARTBEAT_MIRROR_INTERVAL_SECONDS` | 60 | Valkey | Postgres | +| Reconciliation | Heal Valkey/Postgres drift | `VALKEY_ONLINE_MIRROR_SWEEP_SECONDS` | 300 | Postgres | Valkey | + +All three run on every backend instance independently. All idempotent. No leader election required. + +## Multi-instance safety + +Cloud Run runs N instances. Each instance: + +- Writes both stores on its own mutations. Atomic Valkey ops (`SADD` / `SREM` / `INCR` / `DECR` / `SET`) — no cross-instance coordination needed. +- Runs all three sweeps independently. Redundant but idempotent. +- Recompute-on-read for blast eligibility — no stale aggregate to invalidate. + +The `mitra:capacity:` counter is the most race-sensitive: `INCR` / `DECR` are atomic but a session-state change must consistently fire exactly one increment and one decrement over its lifetime. The reconciliation sweep recomputes from `chat_sessions` and resets the counter, healing any drift. + +## Failure mode summary + +| | Behavior | +|---|---| +| Valkey unreachable on read | Fall back to Postgres JOIN query | +| Valkey unreachable on write | Log + continue. Reconciliation sweep heals later. | +| Postgres unreachable on heartbeat mirror | Skip this cycle. Next cycle writes the latest. | +| Auto-offline sweep can't reach Valkey | Skip this tick.
Mitras stay "online" until Valkey comes back + heartbeat ages out. | +| Valkey crash (catastrophic) | Backend reconnects → reseed from Postgres. Worst case: ≤60s of `last_heartbeat_at` forensics lost. | +| Backend crash | Other instances keep running. New instance reseed on startup. | + +## Files touched + +| File | Change | +|---|---| +| `backend/src/plugins/valkey.js` | Add wrappers: `sadd`, `srem`, `sismember`, `smembers`, `sdiff`, `scard`, `set`, `get`, `del`, `incr`, `decr`, `pipeline`/`multi` + `'ready'` reconnect hook | +| `backend/src/services/config.service.js` | Add `getMitraAutoOfflineSweepSeconds`, `getHeartbeatMirrorIntervalSeconds`, `getValkeyOnlineMirrorSweepSeconds` getters | +| `backend/src/services/mitra-status.service.js` | Major rewrite (see [§ Read path](#read-path-computing-available-for-blast) and [§ Write paths](#write-paths)). Add `incrementCapacity`, `decrementCapacity`, `mirrorHeartbeatsToPostgres`, `seedFromPostgres` | +| `backend/src/services/mitra.service.js` | Wrap `updateMitraStatus` with `SADD`/`SREM mitras:deactivated` | +| `backend/src/services/pairing.service.js` | Rewrite `findAvailableMitras` as Valkey-driven (Postgres fallback). `INCR mitra:capacity` on session accept. 
| +| `backend/src/services/closure.service.js` | `DECR mitra:capacity` on session end/expire/cancel | +| `backend/src/services/session.service.js` | `DECR` + `INCR` pair on reroute | +| `backend/src/services/dashboard.service.js` | `SCARD mitras:online` for online count | +| `backend/src/routes/public/mitra.status.routes.js` | Replace `resolveMitra` on `POST /heartbeat` with a Valkey `SISMEMBER mitras:deactivated` check (keep `resolveMitra` on `/online`, `/offline`, `GET /`) | +| `backend/src/server.js` | Call `seedFromPostgres` on startup (before listener binds); replace hardcoded 30_000 setInterval with env-driven cadence; register heartbeat mirror + reconciliation sweep intervals | +| `backend/.env.example` | Document `MITRA_AUTO_OFFLINE_SWEEP_SECONDS`, `HEARTBEAT_MIRROR_INTERVAL_SECONDS`, `VALKEY_ONLINE_MIRROR_SWEEP_SECONDS` | +| `backend/test/services/mitra-status.service.test.js` | Add tests (see [§ Test plan](#test-plan)) | +| `backend/test/services/pairing.service.test.js` | Update for Valkey-driven `findAvailableMitras` | +| `backend/test/helpers/valkey.js` (new if absent) | Test helper for clean-slate Valkey state per test | + +**Estimated touch:** ~400 LOC + ~200 LOC tests. ~2 days focused work. + +## Test plan + +### Unit +1. Mock Valkey; verify each writer calls Postgres → Valkey in correct order, with the seed-heartbeat-on-setOnline and DEL-on-setOffline. +2. Verify reader fallback path runs when Valkey ops throw. +3. Verify auto-offline sweep aborts entirely when Valkey ops throw (does NOT mass-offline via Postgres-only path). +4. Verify capacity counter never goes negative (Math.max guard). + +### Integration (real Valkey + Postgres) +1. **Seed correctness:** insert N online rows + M deactivated + session counts in DB; run startup seed; verify all four Valkey structures match. +2. **Heartbeat refresh:** `setHeartbeat()` → verify Valkey value updates; check that the periodic mirror writes Postgres `last_heartbeat_at` within one mirror cycle. +3. 
**Auto-offline:** insert online mitra, manually expire heartbeat by setting timestamp in past, run sweep, verify Postgres `is_online=false` + Valkey `SREM` + `DEL` heartbeat key. +4. **Capacity lifecycle:** simulate session accept → end across multiple mitras; verify counter increments/decrements; verify reroute moves count from A to B atomically. +5. **Restart resilience:** seed state, simulate Valkey restart (FLUSHALL), trigger reconnect handler, verify all four structures reseed correctly. +6. **Reconciliation:** corrupt Valkey (random SADD of non-existent mitra, wrong capacity counter, missing entries); run reconciliation sweep; verify convergence to Postgres state. +7. **Fallback:** disable Valkey mid-test; verify `findAvailableMitras` falls back to Postgres JOIN query and returns sensible results. + +### Regression +- All 90/92 existing backend tests should still pass. +- Maestro flows for pairing (ts-customer-*) should pass unchanged. + +## Decisions (locked 2026-05-25) + +1. **`revokeAllAuthSessions(mitraId)` added to `updateMitraStatus`** in the same PR. Bounds the deactivation gap to access-token TTL across all mitra routes (not just heartbeat). +2. **Prod Valkey: Memorystore for Valkey, Standard tier** (HA with replica, smallest available capacity ~1GB). Built-in replication keeps heartbeat timestamps live across failover. Staging/dev can run Basic tier — the reseed-from-Postgres flow handles cold-cache restarts correctly either way. +3. **Keep `last_heartbeat_at` column.** Written by the 60s batched mirror; remains available for operator forensics ("when was X last seen?"). Drop only if a future audit confirms no consumer reads it. + +## Future phases (deferred) + +- **Heartbeat → Valkey TTL with keyspace notifications.** Replace timestamp-comparison sweep with `notify-keyspace-events Ex` → instant detection of expired heartbeats. Requires Memorystore config change + a subscriber. Defer until 30s detection lag is the visible bottleneck. 
+- **Leader-elected mirror/sweep.** Use a Valkey-NX lease so only one instance runs each background job. ~15 LOC each. Defer until the redundant work shows up in metrics. diff --git a/requirement/valkey-online-mirror-testing.md b/requirement/valkey-online-mirror-testing.md new file mode 100644 index 0000000..a108458 --- /dev/null +++ b/requirement/valkey-online-mirror-testing.md @@ -0,0 +1,272 @@ +# Valkey Availability Mirror — Testing Checklist + +End-to-end verification for the Valkey-mirror refactor described in [valkey-online-mirror-plan.md](valkey-online-mirror-plan.md). + +Cluster labels: **[BE]** backend / curl / SQL / Valkey-cli, **[CC]** control_center, **[M]** mitra_app, **[C]** client_app. + +> **Run order:** Section A first (seed + startup) — every subsequent section assumes a fresh seed. Sections B–J are otherwise independent. + +--- + +## Setup + +- [ ] Backend running on `192.168.88.247:3000` (public) + `:3001` (internal) — `curl http://192.168.88.247:3000/api/shared/auth-providers` returns 200 +- [ ] Valkey reachable from backend (`VALKEY_URL` matches running instance; `[valkey] subscribed to config:invalidate` appears in backend boot log) +- [ ] Postgres reachable; backend boot log shows `[valkey-mirror] seed: X online, Y deactivated, Z with active sessions` +- [ ] At least 3 mitra accounts exist in DB (we need to flip them online/offline/deactivated across tests) +- [ ] One customer account ready for the blast scenarios +- [ ] Helpful aliases for verification (run from `backend/`): + ```bash + alias vk='node --env-file=.env -e' + # Then in tests: vk "(async()=>{const v=(await import('./src/plugins/valkey.js')).getValkeyClient(); + # console.log(await v.smembers('mitras:online'));process.exit(0)})()" + ``` + +--- + +## Section A — Seed + Startup + +Verifies `seedFromPostgres()` populates Valkey correctly from Postgres truth. 
+ +- [ ] **[BE]** Restart backend; log shows one `[valkey-mirror] seed: N online, M deactivated, K with active sessions` line on startup +- [ ] **[BE]** Counts in the log match Postgres truth: + ```sql + SELECT + (SELECT COUNT(*) FROM mitra_online_status WHERE is_online=true) AS online, + (SELECT COUNT(*) FROM mitras WHERE is_active=false) AS deactivated, + (SELECT COUNT(DISTINCT mitra_id) FROM chat_sessions + WHERE mitra_id IS NOT NULL AND status IN ('active','pending_payment')) AS with_sessions; + ``` +- [ ] **[BE]** Valkey contents match — `SMEMBERS mitras:online` returns the same IDs as `SELECT mitra_id FROM mitra_online_status WHERE is_online=true` +- [ ] **[BE]** Valkey `mitra:heartbeat:` exists for every currently-online mitra, value is a recent ISO timestamp (within seed time) +- [ ] **[BE]** Valkey `mitra:capacity:` matches `SELECT COUNT(*) FROM chat_sessions WHERE mitra_id= AND status IN ('active','pending_payment')` for every online mitra +- [ ] **[BE]** `SMEMBERS mitras:deactivated` matches `SELECT id FROM mitras WHERE is_active=false` + +### Reconnect re-seed + +- [ ] **[BE]** With backend running, `FLUSHDB` on Valkey, wait ~5s for ioredis reconnect, verify a second `[valkey-mirror] seed:` log entry appears +- [ ] **[BE]** All four Valkey structures are rebuilt and match Postgres again + +--- + +## Section B — Online / Offline toggle (write-through) + +Verifies `setOnline` / `setOffline` writes both stores in the right order. 
+ +- [ ] **[M]** Mitra taps "online" → backend updates Postgres `mitra_online_status.is_online=true` +- [ ] **[BE]** `SISMEMBER mitras:online ` returns `1` within 1s of the toggle +- [ ] **[BE]** `GET mitra:heartbeat:` returns a fresh ISO timestamp (within seconds of the toggle) +- [ ] **[M]** Mitra taps "offline" +- [ ] **[BE]** Postgres `is_online=false`, Valkey `SISMEMBER mitras:online ` returns `0`, `GET mitra:heartbeat:` returns `nil` +- [ ] **[BE]** `mitra_online_logs` has paired `online` / `offline` audit rows + +### Valkey-failure best-effort write + +- [ ] **[BE]** Stop Valkey, then toggle a mitra online → request succeeds (200), backend log shows `[valkey-mirror] setOnline failed:` but Postgres is updated correctly +- [ ] **[BE]** Restart Valkey → reconciliation sweep (≤ 300s default) eventually rebuilds the SET to include this mitra + +--- + +## Section C — Heartbeat path + +Verifies the rewrite: per-ping = 1 Valkey write, 0 DB writes. + +- [ ] **[M]** Mitra online for at least one heartbeat cycle (~30s) +- [ ] **[BE]** Watch Postgres query log during heartbeat — **no `UPDATE mitra_online_status SET last_heartbeat_at` rows fire on every ping** (only the batched mirror, default every 60s) +- [ ] **[BE]** `GET mitra:heartbeat:` value advances on each ping (re-check ~30s later, timestamp moves forward) +- [ ] **[BE]** After 60s+ wait, `SELECT last_heartbeat_at FROM mitra_online_status WHERE mitra_id=` advances (heartbeat mirror sweep ran) + +### Deactivation guard via Valkey + +- [ ] **[CC]** Admin deactivates the mitra (Phase 5 path: `is_active=false`) +- [ ] **[BE]** `SISMEMBER mitras:deactivated ` immediately returns `1` +- [ ] **[M]** Mitra app's next heartbeat → backend returns `403 ACCOUNT_INACTIVE` +- [ ] **[BE]** No Postgres SELECT on `mitras` table for that heartbeat (verify with query log) — guard is pure Valkey + +### Fallback when Valkey is down + +- [ ] **[BE]** Stop Valkey +- [ ] **[M]** Mitra app heartbeats → backend logs `[heartbeat] valkey 
check failed, falling back to DB`, request still succeeds for active mitra, returns `403` for deactivated mitra (full DB-backed resolveMitra path) +- [ ] **[BE]** Restart Valkey → next heartbeat uses Valkey path again (no fallback log line) + +--- + +## Section D — Capacity counter + +Verifies INCR/DECR across session lifecycle via `recomputeCapacityForMitra`. + +- [ ] **[BE]** Reset: pick a mitra with `mitra:capacity: = 0` +- [ ] **[C]** Customer pays → mitra accepts the blast → chat starts +- [ ] **[BE]** `GET mitra:capacity:` returns `1` within 1s of mitra accepting +- [ ] **[C]** Second customer pays → same mitra accepts (assuming `max_customers_per_mitra >= 2`) +- [ ] **[BE]** `GET mitra:capacity:` returns `2` +- [ ] **[C]** First session ends naturally (timer expires + goodbye flow completes) +- [ ] **[BE]** `GET mitra:capacity:` returns `1` within 1s +- [ ] **[C]** Second session ends +- [ ] **[BE]** `GET mitra:capacity:` returns `0` + +### Reroute + +- [ ] **[CC]** Reroute an active session from mitra A → mitra B +- [ ] **[BE]** `mitra:capacity:A` decrements, `mitra:capacity:B` increments — both atomic with the chat_sessions UPDATE + +### Capacity gates blast + +- [ ] **[BE]** Set `mitra:capacity:` to `max_customers_per_mitra` directly (`SET mitra:capacity: 3`) +- [ ] **[C]** Customer pays → blast → **this mitra is excluded** (verify with `chat_request_notifications` — no row for this mitra) + +--- + +## Section E — Deactivation flow (full propagation) + +Verifies `updateMitraStatus` + `revokeAllSessionsForUser` close the deactivation gap. 
+ +- [ ] **[BE]** Mitra has an active access token (capture from `/api/mitra/auth/otp/verify` or use existing logged-in session) +- [ ] **[BE]** Confirm mitra can call protected routes (`curl -H "Authorization: Bearer " /api/mitra/...`) +- [ ] **[CC]** Admin deactivates the mitra +- [ ] **[BE]** Postgres: `mitras.is_active=false`, `auth_sessions.revoked_at IS NOT NULL` for this mitra +- [ ] **[BE]** Valkey: `SISMEMBER mitras:deactivated ` = 1 +- [ ] **[BE]** Mitra's current access token still works on routes that don't re-check active state (stateless JWT) — bounded by access-token TTL +- [ ] **[BE]** Mitra's heartbeat returns `403 ACCOUNT_INACTIVE` immediately (Valkey check on hot path) +- [ ] **[BE]** Mitra's next refresh token attempt fails (because `auth_sessions.revoked_at` was set) → app effectively logs them out + +### Re-activation + +- [ ] **[CC]** Re-activate the mitra +- [ ] **[BE]** Postgres `is_active=true`, Valkey `SISMEMBER mitras:deactivated ` = 0 +- [ ] **[M]** Mitra re-logs in, heartbeats again successfully + +--- + +## Section F — Read paths (Valkey-first) + +Verifies all reads use Valkey with Postgres fallback. + +### `isMitraReachable` + +- [ ] **[BE]** Online mitra with fresh heartbeat → `isMitraReachable` returns `true` (call via `node -e` or any route that uses it, e.g. extension flow) +- [ ] **[BE]** Manually set `mitra:heartbeat:` to a timestamp older than `stale_after_seconds` → `isMitraReachable` returns `false` (even though `is_online=true` in Postgres) +- [ ] **[BE]** Stop Valkey → `isMitraReachable` logs `[isMitraReachable] valkey unavailable, falling back to DB` and returns based on Postgres `is_online` + +### `findAvailableMitras` (blast) + +- [ ] **[BE]** Run `findAvailableMitras` (e.g. 
trigger a customer blast) — log shows Valkey path used (no warning about fallback) +- [ ] **[BE]** Result IDs match `SDIFF mitras:online mitras:deactivated` filtered by capacity + heartbeat freshness +- [ ] **[BE]** Stop Valkey → next blast logs `[findAvailableMitras] valkey unavailable, falling back to Postgres` and still returns correct results + +### `countAvailableMitrasFromCache` (customer beacon) + +- [ ] **[BE]** `curl /public/bestie-availability` returns `{available: bool, count: N}` matching reality +- [ ] **[BE]** `GET availability:snapshot` in Valkey shows cached JSON within 10s of last poll +- [ ] **[BE]** Multiple rapid polls (5+ per second from 3 different IPs) → only one Valkey-driven recompute per 10s; Postgres query log shows **zero** mitra-availability JOINs in steady state (only the once-per-10s cache miss) +- [ ] **[CC]** Operator changes `max_customers_per_mitra` → `availability:snapshot` is `DEL`d (cache bust), next poll recomputes + +### Dashboard online count + +- [ ] **[CC]** Dashboard "Online Mitras" stat matches `SCARD mitras:online` in Valkey +- [ ] **[BE]** Verify the dashboard query no longer hits `SELECT COUNT(*) FROM mitra_online_status WHERE is_online=true` (check query log) + +--- + +## Section G — Auto-offline sweep (Valkey-driven) + +Verifies stale heartbeats → flipped offline via Valkey diff. 
+ +- [ ] **[BE]** Set `MITRA_AUTO_OFFLINE_SWEEP_SECONDS=10` in env for faster test cycle, restart backend +- [ ] **[CC]** Set `stale_after_seconds=15` in CC settings (or directly in `app_config`) +- [ ] **[M]** Mitra goes online, sends one heartbeat +- [ ] **[BE]** Manually delete the heartbeat key: `DEL mitra:heartbeat:<mitra_id>` +- [ ] **[BE]** Wait up to 25s (15s stale + 10s sweep cadence) +- [ ] **[BE]** Postgres: `is_online=false` for this mitra, audit row in `mitra_online_logs` with status='offline' +- [ ] **[BE]** Valkey: `SISMEMBER mitras:online <mitra_id>` = 0, no `mitra:heartbeat:<mitra_id>` key + +### Sweep skips on Valkey error + +- [ ] **[BE]** Stop Valkey +- [ ] **[BE]** Backend log shows `[auto-offline] valkey unavailable, skipping this tick:` each sweep cadence +- [ ] **[BE]** **No Postgres UPDATE fires** during the outage (verify with query log) — confirms we don't mass-offline on Valkey hiccup + +--- + +## Section H — Heartbeat → Postgres batched mirror + +Verifies the 60s UNNEST UPDATE. + +- [ ] **[BE]** Multiple mitras online, all heartbeating +- [ ] **[BE]** Set `HEARTBEAT_MIRROR_INTERVAL_SECONDS=15` for faster cycles, restart +- [ ] **[BE]** Wait one mirror cycle — Postgres log shows **one** UPDATE statement (with `UNNEST(...)` containing all online mitra IDs) +- [ ] **[BE]** `SELECT last_heartbeat_at FROM mitra_online_status WHERE is_online=true` returns timestamps within last cycle +- [ ] **[BE]** Compare with Valkey `GET mitra:heartbeat:<mitra_id>` — Postgres lags Valkey by ≤ mirror-cadence seconds (forensic-grade, not real-time) + +### Mirror skips on Valkey error + +- [ ] **[BE]** Stop Valkey +- [ ] **[BE]** Backend log shows `[heartbeat-mirror] valkey unavailable, skipping:` each cycle +- [ ] **[BE]** Postgres `last_heartbeat_at` does NOT advance during the outage (correct — Valkey is the source of "when was last ping?") + +--- + +## Section I — Reconciliation sweep + +Verifies drift heals every `VALKEY_ONLINE_MIRROR_SWEEP_SECONDS`.
+ +- [ ] **[BE]** Set `VALKEY_ONLINE_MIRROR_SWEEP_SECONDS=30` for faster test, restart +- [ ] **[BE]** Manually corrupt Valkey: + ``` + SADD mitras:online 00000000-0000-0000-0000-000000000999 # bogus ID + SREM mitras:online <real_mitra_id> # remove a real one + SET mitra:capacity:<real_mitra_id> 99 # bogus capacity + ``` +- [ ] **[BE]** Wait one sweep cycle (~30s) → log shows `[valkey-mirror] seed:` again +- [ ] **[BE]** After sweep: Valkey state matches Postgres exactly (bogus ID gone, real ID present, capacity reset) + +### Sweep disabled when env=0 + +- [ ] **[BE]** Set `VALKEY_ONLINE_MIRROR_SWEEP_SECONDS=0`, restart +- [ ] **[BE]** Confirm no periodic seed log appears after the initial startup seed + +--- + +## Section J — Failure modes (Valkey degradation) + +End-to-end behavior when Valkey is down for an extended period. + +- [ ] **[BE]** Stop Valkey +- [ ] **[C]** Customer beacon poll → `availability:snapshot` GET fails → falls back to Postgres JOIN; UX unchanged but DB query rate spikes +- [ ] **[C]** Customer triggers blast → `findAvailableMitras` Valkey path fails → falls back to Postgres JOIN; blast still works +- [ ] **[M]** Mitra heartbeat → Valkey write fails (logged), but heartbeat returns 200; the missed write is irrelevant (auto-offline sweep is also skipping) +- [ ] **[M]** Mitra toggle online → Postgres update succeeds, Valkey SADD fails (logged); on next reconciliation sweep after Valkey returns, mitra is back in `mitras:online` SET +- [ ] **[BE]** Restart Valkey → reconnect listener fires → `seedFromPostgres()` runs → state restored; degraded period ends + +--- + +## Section K — Multi-instance (defer until Cloud Run) + +Run only when ≥ 2 backend instances are active.
+ +- [ ] **[BE]** Two instances both run their own seed on startup — final Valkey state is consistent (idempotent `DEL + SADD`) +- [ ] **[BE]** Concurrent setOnline calls on the same mitra from different instances → final SET state correct (atomic SADD) +- [ ] **[BE]** `availability:snapshot` cache miss on instance A fills the snapshot; instance B's next poll reads the cached value (cluster-shared cache works) +- [ ] **[BE]** Operator changes `max_customers_per_mitra` on one instance → `config:invalidate` pub/sub fires → other instance also DELs `availability:snapshot` +- [ ] **[BE]** Heartbeat mirror UPDATEs from multiple instances are idempotent (last writer wins on timestamp, no errors) + +--- + +## Smoke tests (quick happy path) + +5-minute sanity check after any deploy: + +- [ ] **[BE]** Backend log shows successful seed on startup +- [ ] **[M]** Mitra toggles online → appears in `SMEMBERS mitras:online` +- [ ] **[C]** Customer sees "Mulai Curhat" enabled +- [ ] **[C]** Customer pays → mitra accepts → chat starts → `mitra:capacity:<mitra_id>` increments +- [ ] **[C]** Chat ends → counter decrements +- [ ] **[M]** Mitra toggles offline → removed from SET + +--- + +## Known limitations / what this checklist does NOT cover + +- **Load testing** — sustained heartbeat volume at prod scale (300+ mitras × 2 pings/min). Plan: separate load-test stage when prod is provisioned. +- **Memorystore-specific behavior** — failover, RDB+AOF interaction. Plan: re-run Sections A, G, I, J against Memorystore Standard tier before prod cutover. +- **Long-running drift** — overnight runs where eviction or memory-pressure could affect Valkey state. Plan: monitor `INFO memory` in prod for the first week.