Files
halobestie-clone/backend/test/services/session-timer.service.test.js
Ramadhan Sjamsani 553dbac52f Phase 6: Valkey availability mirror — move read path off Postgres
Mitra-availability state (online flag, deactivated flag, per-mitra session
count, heartbeat liveness) mirrored into Valkey so the customer beacon
+ pairing blast + dashboard counts no longer hit Postgres on the hot path.
Postgres remains the durable source of truth; Valkey state is fully
derivable via seedFromPostgres on startup + reconnect.

Schema
- mitras:online           SET    — mirror of is_online
- mitras:deactivated      SET    — mirror of is_active=false
- mitra:capacity:<id>     STRING — active+pending_payment session count
- mitra:heartbeat:<id>    STRING — ISO timestamp of last ping
- availability:snapshot   JSON   — beacon cache, TTL 10s, cluster-shared

Write paths (Postgres first, best-effort Valkey)
- setOnline/setOffline mirror SADD/SREM + heartbeat SET/DEL
- updateMitraStatus mirrors mitras:deactivated AND revokes auth_sessions
  on deactivate (bounds the "ghost online" window to access-token TTL)
- heartbeat is Valkey-only on the hot path; the per-ping Postgres UPDATE
  on last_heartbeat_at is eliminated (was 1,200 ops/min at prod scale)
- chat_session lifecycle (accept/end/reroute/extension/expiry) calls
  recomputeCapacityForMitra after each UPDATE — derive-from-truth avoids
  the bookkeeping risk of per-transition INCR/DECR

Read paths (Valkey-first, Postgres fallback on Valkey error)
- isMitraReachable: SISMEMBER mitras:online + heartbeat freshness
- findAvailableMitras: SDIFF + pipelined GETs, filter by capacity + heartbeat
- countAvailableMitrasFromCache: Valkey-driven, cached cluster-wide 10s TTL
- dashboard online count: SCARD
- Each reader wraps Valkey ops in try/catch → Postgres fallback on outage

Heartbeat path on /api/mitra/status/heartbeat
- resolveMitra preHandler replaced with heartbeatGuard: SISMEMBER on
  mitras:deactivated (~0 DB hits per ping). Falls back to full DB
  resolveMitra if Valkey is unreachable so a Valkey outage doesn't
  silently accept heartbeats from deactivated mitras.

Three sweeps, env-configurable cadences
- MITRA_AUTO_OFFLINE_SWEEP_SECONDS (30) — Valkey-driven stale detection
- HEARTBEAT_MIRROR_INTERVAL_SECONDS (60) — batched UPSERT writes
  Valkey timestamps to Postgres last_heartbeat_at via UNNEST (1 statement
  per cycle, idempotent across instances)
- VALKEY_ONLINE_MIRROR_SWEEP_SECONDS (300) — periodic reseed heals drift

Startup
- restoreActiveTimers → seedFromPostgres → bind listeners
- onValkeyReady re-runs the seed on every reconnect (cold start + reseed
  on Valkey restart, no manual intervention)

Failure semantics
- Read fallback: every Valkey read wrapped, falls back to existing
  Postgres JOIN query — system stays correct during Valkey outage,
  performance degrades not breaks
- Write best-effort: Postgres write commits before Valkey is touched;
  Valkey errors log + continue; reconciliation sweep heals drift
- Auto-offline sweep aborts entirely on Valkey error (does NOT mass-
  offline via Postgres scan during Valkey hiccup)

Tests
- New: 32 integration tests in mitra-status.valkey-mirror.test.js
  covering seed, write-through, fallbacks, capacity lifecycle,
  auto-offline sweep, heartbeat mirror, deactivation flow, beacon cache
- Updated: fixtures.js seeds Valkey alongside Postgres when isOnline=true
- Updated: helpers/db.js resetDb also flushes test Valkey
- Fixed 2 pre-existing session-timer flakes (string IDs failed uuid
  parse; vi.advanceTimersByTimeAsync raced real Postgres I/O)
- All 124/124 backend tests pass (was 90/92)

Docs
- requirement/valkey-online-mirror-plan.md — canonical plan
- requirement/valkey-online-mirror-testing.md — manual E2E checklist
- requirement/deployment.md — infra + Valkey persistence guidance for
  prod (Memorystore Standard tier recommended; migration from
  self-hosted Valkey is zero-downtime via reseed-from-Postgres)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 18:07:55 +08:00

127 lines
4.9 KiB
JavaScript

import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'
import { randomUUID } from 'node:crypto'
// Stub the websocket plugin. Every send/online check reports true, and the
// sendToSessionParticipant mock records its calls so the tests can inspect the
// 3-minute warning frame.
vi.mock('../../src/plugins/websocket.js', () => {
  const alwaysTrue = () => true
  return {
    sendToUser: vi.fn(alwaysTrue),
    sendToSessionParticipant: vi.fn(alwaysTrue),
    registerWebSocketPlugin: vi.fn(),
    registerWebSocketRoute: vi.fn(),
    isUserOnlineWs: vi.fn(alwaysTrue),
    getSessionConnections: vi.fn(() => ({})),
  }
})
// Stub push notifications: sending resolves to true and token registration is a no-op.
vi.mock('../../src/services/notification.service.js', () => {
  const sendPushNotification = vi.fn(async () => true)
  const registerDeviceToken = vi.fn(async () => {})
  return { sendPushNotification, registerDeviceToken }
})
// Real DB queries don't settle under fake timers: they are real socket I/O, not
// microtasks. Replace getDb() with a tagged-template-compatible stub so that
// onThreeMinuteWarning's `SELECT expires_at FROM chat_sessions WHERE id = ${sessionId}`
// resolves on a microtask with no I/O. Every tagged query resolves to
// [{ expires_at: null }]. sql.unsafe() resolves to [], and sql.array / sql.json
// pass their argument through unchanged.
vi.mock('../../src/db/client.js', () => {
  const fakeSql = () => Promise.resolve([{ expires_at: null }])
  Object.assign(fakeSql, {
    unsafe: () => Promise.resolve([]),
    array: (arr) => arr,
    json: (v) => v,
  })
  return { getDb: () => fakeSql }
})
// Stub the Valkey plugin so the timer service never touches a real server.
// Scalar commands resolve to fixed replies. pipeline()/multi() hand back a
// chainable no-op.
vi.mock('../../src/plugins/valkey.js', () => {
  // Chainable stand-in for pipeline()/multi(). Each queued command returns the
  // pipeline itself, and exec() resolves to an empty reply list.
  const noopPipeline = {}
  for (const command of ['sadd', 'srem', 'set', 'get', 'del']) {
    noopPipeline[command] = () => noopPipeline
  }
  noopPipeline.exec = async () => []

  // A vi.fn whose async implementation resolves to `value` on every call.
  const resolvesTo = (value) => vi.fn(async () => value)

  return {
    publish: resolvesTo(undefined),
    subscribe: vi.fn(() => () => {}),
    onValkeyReady: vi.fn(),
    getValkeyClient: vi.fn(() => ({ setex: resolvesTo('OK') })),
    getValkeyPub: vi.fn(),
    getValkeySub: vi.fn(),
    sadd: resolvesTo(1),
    srem: resolvesTo(1),
    sismember: resolvesTo(false),
    // Fresh arrays per call, so a caller mutating one reply can't affect the next.
    smembers: vi.fn(async () => []),
    sdiff: vi.fn(async () => []),
    scard: resolvesTo(0),
    set: resolvesTo('OK'),
    get: resolvesTo(null),
    del: resolvesTo(1),
    incr: resolvesTo(1),
    decr: resolvesTo(0),
    exists: resolvesTo(0),
    pipeline: vi.fn(() => noopPipeline),
    multi: vi.fn(() => noopPipeline),
  }
})
const { sendToSessionParticipant } = await import('../../src/plugins/websocket.js')
const { startSessionTimer, clearSessionTimer } = await import('../../src/services/session-timer.service.js')
const { WsMessage, UserType } = await import('../../src/constants.js')
describe('session-timer 3-minute warning (Phase 4)', () => {
  // Session ids whose timers were started by the current test. They are cleared
  // in afterEach, so a failed assertion can't leave a timer registered in the
  // service for later tests.
  const startedSessionIds = new Set()

  // Start a session timer and remember its id for afterEach cleanup.
  const start = (sessionId, expiresAt) => {
    startedSessionIds.add(sessionId)
    startSessionTimer(sessionId, expiresAt)
  }

  // All SESSION_WARNING frames sent so far.
  // sendToSessionParticipant signature: (sessionId, userType, data)
  const warningCalls = () =>
    sendToSessionParticipant.mock.calls.filter(
      ([, , data]) => data?.type === WsMessage.SESSION_WARNING,
    )

  beforeEach(() => {
    vi.useFakeTimers()
    sendToSessionParticipant.mockClear()
  })

  afterEach(() => {
    // Clear timers while the fake clock is still installed, before any expiry can fire.
    for (const sessionId of startedSessionIds) clearSessionTimer(sessionId)
    startedSessionIds.clear()
    vi.useRealTimers()
  })

  it('emits session_warning kind:three_minutes_left exactly once at the 3-min mark', async () => {
    // getDb() is stubbed in this file, so the id never reaches Postgres. A real
    // UUID still matches the uuid-typed chat_sessions.id shape and is unique per test.
    const sessionId = randomUUID()
    const expiresAt = new Date(Date.now() + 5 * 60_000) // 5 minutes from now
    start(sessionId, expiresAt)

    // 5 min total - 3 min left = warning due at t=2:00. Stop at t=1:59, one second short.
    await vi.advanceTimersByTimeAsync(60_000 + 59_000)
    expect(warningCalls()).toHaveLength(0)

    // Step to t=2:01, crossing the 3-min-left threshold.
    await vi.advanceTimersByTimeAsync(2_000)
    const warnCalls = warningCalls()
    expect(warnCalls).toHaveLength(1)

    const [calledSessionId, userType, data] = warnCalls[0]
    expect(calledSessionId).toBe(sessionId)
    expect(userType).toBe(UserType.CUSTOMER)
    expect(data.kind).toBe('three_minutes_left')
    expect(data.session_id).toBe(sessionId)
  })

  it('does NOT re-fire the 3-min warning when the timer is rescheduled (e.g. extension)', async () => {
    const sessionId = randomUUID()
    const initial = new Date(Date.now() + 5 * 60_000)
    start(sessionId, initial)

    // Cross the 3-min mark on the original schedule (t=2:01).
    await vi.advanceTimersByTimeAsync(2 * 60_000 + 1_000)
    expect(warningCalls()).toHaveLength(1)

    // Extension reschedules with a new 5-min window. Advancing another 2:01 passes
    // the rescheduled 3-min mark, and the warning must NOT fire again.
    const extended = new Date(Date.now() + 5 * 60_000)
    start(sessionId, extended)
    await vi.advanceTimersByTimeAsync(2 * 60_000 + 1_000)
    expect(warningCalls()).toHaveLength(1) // still 1, no double-fire
  })
})