halobestie-clone/backend/src/server.js
Ramadhan Sjamsani 553dbac52f Phase 6: Valkey availability mirror — move read path off Postgres
Mitra-availability state (online flag, deactivated flag, per-mitra session
count, heartbeat liveness) mirrored into Valkey so the customer beacon
+ pairing blast + dashboard counts no longer hit Postgres on the hot path.
Postgres remains the durable source of truth; Valkey state is fully
derivable via seedFromPostgres on startup + reconnect.

Schema
- mitras:online           SET    — mirror of is_online
- mitras:deactivated      SET    — mirror of is_active=false
- mitra:capacity:<id>     STRING — active+pending_payment session count
- mitra:heartbeat:<id>    STRING — ISO timestamp of last ping
- availability:snapshot   JSON   — beacon cache, TTL 10s, cluster-shared

Write paths (Postgres first, best-effort Valkey)
- setOnline/setOffline mirror SADD/SREM + heartbeat SET/DEL
- updateMitraStatus mirrors mitras:deactivated AND revokes auth_sessions
  on deactivate (bounds the "ghost online" window to access-token TTL)
- heartbeat is Valkey-only on the hot path; the per-ping Postgres UPDATE
  on last_heartbeat_at is eliminated (was 1,200 ops/min at prod scale)
- chat_session lifecycle (accept/end/reroute/extension/expiry) calls
  recomputeCapacityForMitra after each UPDATE — derive-from-truth avoids
  the bookkeeping risk of per-transition INCR/DECR
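
The write ordering above can be sketched roughly as follows. This is an illustration under stated assumptions, not the service's actual code: `db` and `valkey` are hypothetical injected clients, their method names (`query`, `sadd`, `set`) are assumed, and the table, column and status names are guessed from the schema notes.

```javascript
// Sketch only: `db` and `valkey` are hypothetical injected clients whose method
// names (query, sadd, set) are assumptions, as are the table and column names.
async function setOnlineSketch({ db, valkey, log = console }, mitraId, now = new Date()) {
  // 1. Durable write first. If this throws, Valkey is never touched.
  await db.query('UPDATE mitras SET is_online = true WHERE id = $1', [mitraId])

  // 2. Best-effort mirror. Errors are logged and swallowed; the periodic
  //    reseed from Postgres is what heals any resulting drift.
  try {
    await valkey.sadd('mitras:online', mitraId)
    await valkey.set(`mitra:heartbeat:${mitraId}`, now.toISOString())
    return { mirrored: true }
  } catch (err) {
    log.error('Valkey mirror failed, continuing:', err.message)
    return { mirrored: false }
  }
}

// Derive-from-truth capacity: recount from Postgres after every session
// UPDATE instead of trusting per-transition INCR/DECR bookkeeping.
async function recomputeCapacitySketch({ db, valkey, log = console }, mitraId) {
  const { rows } = await db.query(
    `SELECT COUNT(*)::int AS n FROM chat_sessions
     WHERE mitra_id = $1 AND status IN ('active', 'pending_payment')`,
    [mitraId]
  )
  const count = rows[0].n
  try {
    await valkey.set(`mitra:capacity:${mitraId}`, String(count))
  } catch (err) {
    log.error('Capacity mirror failed, continuing:', err.message)
  }
  return count
}
```

Because the counter is always recomputed from the rows themselves, a missed or repeated call cannot leave it permanently wrong; the next recompute or reseed overwrites it.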

Read paths (Valkey-first, Postgres fallback on Valkey error)
- isMitraReachable: SISMEMBER mitras:online + heartbeat freshness
- findAvailableMitras: SDIFF + pipelined GETs, filter by capacity + heartbeat
- countAvailableMitrasFromCache: Valkey-driven, cached cluster-wide 10s TTL
- dashboard online count: SCARD
- Each reader wraps Valkey ops in try/catch → Postgres fallback on outage
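
A Valkey-first reader with a Postgres fallback might look like the sketch below. The staleness threshold, the `valkey` method names and the `queryPostgres` callback are all assumptions made for illustration; the real `isMitraReachable` may differ.

```javascript
// Assumed threshold for illustration; the real value is not given in this commit.
const HEARTBEAT_STALE_MS = 90_000

// Sketch only: `valkey` is a hypothetical client (sismember/get assumed), and
// `queryPostgres` stands in for the existing Postgres-backed check.
async function isMitraReachableSketch({ valkey, queryPostgres }, mitraId, nowMs = Date.now()) {
  try {
    const online = await valkey.sismember('mitras:online', mitraId)
    if (!online) return false

    const lastPing = await valkey.get(`mitra:heartbeat:${mitraId}`)
    if (!lastPing) return false

    // Reachable only if the last ping is recent enough.
    return nowMs - Date.parse(lastPing) <= HEARTBEAT_STALE_MS
  } catch (err) {
    // Valkey outage: answer from the source of truth instead of failing.
    return queryPostgres(mitraId)
  }
}
```

Only a thrown Valkey error triggers the fallback. A clean "not in the set" answer is trusted as-is, which is what keeps Postgres off the hot path.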

Heartbeat path on /api/mitra/status/heartbeat
- resolveMitra preHandler replaced with heartbeatGuard: SISMEMBER on
  mitras:deactivated (~0 DB hits per ping). Falls back to full DB
  resolveMitra if Valkey is unreachable so a Valkey outage doesn't
  silently accept heartbeats from deactivated mitras.
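
The guard described above could be shaped like this Fastify-style preHandler. It is a sketch under assumptions: `request.mitraId` is a hypothetical field set by an earlier auth hook, the 403 response is an illustrative choice, and `valkey` and `resolveMitra` are injected stand-ins for the real dependencies.

```javascript
// Sketch only: `request.mitraId` is a hypothetical field assumed to be set by
// an earlier auth hook; `valkey` and `resolveMitra` are injected stand-ins.
function makeHeartbeatGuardSketch({ valkey, resolveMitra }) {
  return async function heartbeatGuard(request, reply) {
    let deactivated
    try {
      deactivated = await valkey.sismember('mitras:deactivated', request.mitraId)
    } catch (err) {
      // Valkey unreachable: run the full DB-backed check rather than
      // letting the ping through unverified.
      return resolveMitra(request, reply)
    }
    if (deactivated) {
      // Status code and body are assumptions for this sketch.
      return reply.code(403).send({ error: 'mitra_deactivated' })
    }
    // Not deactivated: return nothing so the route handler runs.
  }
}
```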

Three sweeps, env-configurable cadences
- MITRA_AUTO_OFFLINE_SWEEP_SECONDS (30) — Valkey-driven stale detection
- HEARTBEAT_MIRROR_INTERVAL_SECONDS (60) — batched UPSERT writes
  Valkey timestamps to Postgres last_heartbeat_at via UNNEST (1 statement
  per cycle, idempotent across instances)
- VALKEY_ONLINE_MIRROR_SWEEP_SECONDS (300) — periodic reseed heals drift
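
As an illustration of the one-statement-per-cycle heartbeat mirror, the sketch below builds a single `UPDATE ... FROM UNNEST` query from a batch of timestamps. The table and column types (`uuid`, `timestamptz`), the entry shape and the function name are assumptions, and the query object follows the node-postgres `{ text, values }` convention; the real statement in `mirrorHeartbeatsToPostgres` may differ.

```javascript
// Sketch only: entry shape, SQL types and table name are assumptions.
// Returns null for an empty batch so the caller can skip the round trip.
function buildHeartbeatMirrorQuerySketch(entries) {
  if (entries.length === 0) return null
  return {
    text: `UPDATE mitras AS m
           SET last_heartbeat_at = v.ts
           FROM UNNEST($1::uuid[], $2::timestamptz[]) AS v(id, ts)
           WHERE m.id = v.id
             AND (m.last_heartbeat_at IS NULL OR m.last_heartbeat_at < v.ts)`,
    // Two parallel arrays: UNNEST zips them back into (id, ts) rows.
    values: [entries.map((e) => e.mitraId), entries.map((e) => e.lastPingIso)],
  }
}
```

The extra `last_heartbeat_at < v.ts` condition is one way to make the write safe when several instances run the same tick: an older timestamp can never overwrite a newer one.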

Startup
- restoreActiveTimers → seedFromPostgres → bind listeners
- onValkeyReady re-runs the seed on every reconnect (cold start + reseed
  on Valkey restart, no manual intervention)
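
The reseed-on-reconnect hook could be wired roughly as below. This assumes the Valkey client emits a `ready` event on each successful (re)connection, as ioredis-style clients do; `client` and `seed` are hypothetical parameters and the real `onValkeyReady` may be structured differently.

```javascript
// Sketch only: assumes an ioredis-style client that emits 'ready' after every
// successful connect or reconnect. `seed` stands in for seedFromPostgres.
function onValkeyReadySketch(client, seed, log = console) {
  let running = false
  client.on('ready', async () => {
    // Skip if a previous seed is still in flight (connection flapping).
    if (running) return
    running = true
    try {
      await seed()
    } catch (err) {
      log.error('Reseed after Valkey reconnect failed:', err.message)
    } finally {
      running = false
    }
  })
}
```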

Failure semantics
- Read fallback: every Valkey read is wrapped and falls back to the existing
  Postgres JOIN query, so the system stays correct during a Valkey outage;
  performance degrades but nothing breaks
- Write best-effort: Postgres write commits before Valkey is touched;
  Valkey errors log + continue; reconciliation sweep heals drift
- Auto-offline sweep aborts entirely on Valkey error (does NOT mass-
  offline via Postgres scan during Valkey hiccup)
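
The abort-on-error behaviour of the auto-offline sweep can be illustrated with the sketch below. It assumes hypothetical `valkey.smembers` and `valkey.mget` methods and a `setOffline` callback, treats a missing heartbeat key as stale, and returns 0 when it aborts; the real `autoOfflineStaleMitras` may signal the abort differently.

```javascript
// Sketch only: `valkey` and `setOffline` are injected stand-ins. Treating a
// missing heartbeat key as stale is an assumption of this sketch.
async function autoOfflineSweepSketch({ valkey, setOffline, log = console }, staleMs, nowMs = Date.now()) {
  let ids
  let heartbeats
  try {
    ids = await valkey.smembers('mitras:online')
    if (ids.length === 0) return 0
    heartbeats = await valkey.mget(ids.map((id) => `mitra:heartbeat:${id}`))
  } catch (err) {
    // Abort the whole sweep: without heartbeat data nobody can be judged
    // stale, and a Postgres scan here could mass-offline healthy mitras.
    log.error('Auto-offline sweep skipped, Valkey unavailable:', err.message)
    return 0
  }

  let count = 0
  for (let i = 0; i < ids.length; i++) {
    const lastPing = heartbeats[i]
    const stale = !lastPing || nowMs - Date.parse(lastPing) > staleMs
    if (stale) {
      await setOffline(ids[i])
      count++
    }
  }
  return count
}
```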

Tests
- New: 32 integration tests in mitra-status.valkey-mirror.test.js
  covering seed, write-through, fallbacks, capacity lifecycle,
  auto-offline sweep, heartbeat mirror, deactivation flow, beacon cache
- Updated: fixtures.js seeds Valkey alongside Postgres when isOnline=true
- Updated: helpers/db.js resetDb also flushes test Valkey
- Fixed 2 pre-existing session-timer flakes (string IDs failed uuid
  parse; vi.advanceTimersByTimeAsync raced real Postgres I/O)
- All 124/124 backend tests pass (was 90/92)

Docs
- requirement/valkey-online-mirror-plan.md — canonical plan
- requirement/valkey-online-mirror-testing.md — manual E2E checklist
- requirement/deployment.md — infra + Valkey persistence guidance for
  prod (Memorystore Standard tier recommended; migration from
  self-hosted Valkey is zero-downtime via reseed-from-Postgres)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 18:07:55 +08:00


import 'dotenv/config'
import { buildPublicApp } from './app.public.js'
import { buildInternalApp } from './app.internal.js'
import { autoOfflineStaleMitras, seedFromPostgres, mirrorHeartbeatsToPostgres } from './services/mitra-status.service.js'
import {
  getMitraAutoOfflineSweepSeconds,
  getHeartbeatMirrorIntervalSeconds,
  getValkeyOnlineMirrorSweepSeconds,
  getXenditConfig,
} from './services/config.service.js'
import { initFirebase } from './plugins/firebase.js'
import { restoreActiveTimers } from './services/session-timer.service.js'
import { expireStalePaymentRequests, registerPairingSubscriber } from './services/payment.service.js'

const PUBLIC_PORT = process.env.PUBLIC_PORT || 3000
const INTERNAL_PORT = process.env.INTERNAL_PORT || 3001
const INTERNAL_HOST = process.env.INTERNAL_HOST || '127.0.0.1'

const start = async () => {
  // Phase 5: fail fast if XENDIT_ENABLED=true without the required credentials.
  // Bad config explodes at startup rather than at the first /payment-requests POST.
  const xc = getXenditConfig()
  if (xc.enabled) {
    if (!xc.secretKey) throw new Error('XENDIT_ENABLED=true requires XENDIT_SECRET_KEY')
    if (!xc.webhookToken || xc.webhookToken.length < 16) {
      throw new Error('XENDIT_ENABLED=true requires XENDIT_WEBHOOK_TOKEN (>= 16 chars)')
    }
  }

  initFirebase()
  const publicApp = await buildPublicApp()
  const internalApp = await buildInternalApp()

  // restoreActiveTimers runs bulk UPDATEs on chat_sessions to clean up stale
  // ACTIVE/CLOSING rows from before the restart. Run it BEFORE seedFromPostgres
  // so the seed sees the post-cleanup state and capacity counters are accurate.
  await restoreActiveTimers()
  await seedFromPostgres()

  await publicApp.listen({ port: PUBLIC_PORT, host: '0.0.0.0' })
  console.log(`Public API listening on port ${PUBLIC_PORT}`)
  await internalApp.listen({ port: INTERNAL_PORT, host: INTERNAL_HOST })
  console.log(`Internal API listening on ${INTERNAL_HOST}:${INTERNAL_PORT}`)

  // Phase 5: wire pairing service as a subscriber to payment_request.confirmed events.
  // Must happen AFTER all services are loaded so the subscriber registration sees
  // the EventEmitter set up by payment.service.js at module-load time.
  registerPairingSubscriber()

  // Phase 5: catch any payment_request.confirmed events that were lost across a restart
  // by running the reconciliation sweeper immediately on boot. Without this, a customer
  // whose payment confirmed during shutdown could be stranded for up to 60s waiting on
  // the next sweeper tick.
  try {
    const result = await expireStalePaymentRequests()
    if (result.expired > 0 || result.failed > 0 || result.reconciled > 0) {
      console.log(`Startup reconciliation: ${result.expired} expired, ${result.failed} failed_delivery, ${result.reconciled} re-triggered`)
    }
  } catch (err) {
    console.error('Startup reconciliation failed:', err)
  }

  // Auto-offline mitras with stale heartbeat (env-driven cadence, default 30s).
  // Valkey-driven per requirement/valkey-online-mirror-plan.md.
  setInterval(async () => {
    try {
      const count = await autoOfflineStaleMitras()
      if (count > 0) console.log(`Auto-offlined ${count} stale mitra(s)`)
    } catch (err) {
      console.error('Auto-offline check failed:', err)
    }
  }, getMitraAutoOfflineSweepSeconds() * 1000)

  // Batched heartbeat mirror: Valkey heartbeat timestamps → Postgres
  // last_heartbeat_at (default 60s). Keeps forensic column current without
  // per-ping DB writes. One UNNEST UPDATE per tick; idempotent across instances.
  setInterval(async () => {
    try {
      await mirrorHeartbeatsToPostgres()
    } catch (err) {
      console.error('Heartbeat mirror failed:', err)
    }
  }, getHeartbeatMirrorIntervalSeconds() * 1000)

  // Reconciliation sweep: heal Valkey/Postgres drift (default 300s; 0 disables).
  // Belt-and-braces against failed best-effort Valkey writes, out-of-band
  // Postgres mutations, evictions. Idempotent: it just runs the seed.
  const reconciliationSeconds = getValkeyOnlineMirrorSweepSeconds()
  if (reconciliationSeconds > 0) {
    setInterval(async () => {
      try {
        await seedFromPostgres()
      } catch (err) {
        console.error('Valkey reconciliation sweep failed:', err)
      }
    }, reconciliationSeconds * 1000)
  }

  // Expire stale payment_requests + reconcile lost subscriber work (every 60s).
  // Pending past expires_at → expired (no failure row).
  // Confirmed-but-stale → failed_delivery (writes a pairing_failures row).
  // Confirmed-with-no-chat-session-yet → re-trigger the pairing subscriber (recovery from
  // lost EventEmitter notifications across restart). See payment.service.js for details.
  setInterval(async () => {
    try {
      const result = await expireStalePaymentRequests()
      if (result.expired > 0 || result.failed > 0 || result.reconciled > 0) {
        console.log(`Payment sweeper: ${result.expired} expired, ${result.failed} failed_delivery, ${result.reconciled} re-triggered`)
      }
    } catch (err) {
      console.error('Payment request sweeper failed:', err)
    }
  }, 60_000)
}

// SIGTERM/SIGINT trap. Cloud Run gives ~10s grace before SIGKILL. Use it to drain
// in-flight EventEmitter handlers (Stage 5 of phase5-xendit-plan.md).
// Note: this handler does not call app.close(). The app handles are scoped inside
// start(), so the listeners stay open until process.exit ends the process; the 8s
// wait only gives subscribers their last chance to finish.
const shutdown = async (signal) => {
  console.log(`${signal} received, draining handlers for 8s before exit`)
  await new Promise(r => setTimeout(r, 8_000))
  process.exit(0)
}
process.on('SIGTERM', shutdown)
process.on('SIGINT', shutdown)

start().catch((err) => {
  console.error(err)
  process.exit(1)
})