Phase 6: Valkey availability mirror — move read path off Postgres

Mitra-availability state (online flag, deactivated flag, per-mitra session
count, heartbeat liveness) mirrored into Valkey so the customer beacon
+ pairing blast + dashboard counts no longer hit Postgres on the hot path.
Postgres remains the durable source of truth; Valkey state is fully
derivable via seedFromPostgres on startup + reconnect.

Schema
- mitras:online           SET    — mirror of is_online
- mitras:deactivated      SET    — mirror of is_active=false
- mitra:capacity:<id>     STRING — active+pending_payment session count
- mitra💓<id>    STRING — ISO timestamp of last ping
- availability:snapshot   JSON   — beacon cache, TTL 10s, cluster-shared

Write paths (Postgres first, best-effort Valkey)
- setOnline/setOffline mirror SADD/SREM + heartbeat SET/DEL
- updateMitraStatus mirrors mitras:deactivated AND revokes auth_sessions
  on deactivate (bounds the "ghost online" window to access-token TTL)
- heartbeat is Valkey-only on the hot path; the per-ping Postgres UPDATE
  on last_heartbeat_at is eliminated (was 1,200 ops/min at prod scale)
- chat_session lifecycle (accept/end/reroute/extension/expiry) calls
  recomputeCapacityForMitra after each UPDATE — derive-from-truth avoids
  the bookkeeping risk of per-transition INCR/DECR

Read paths (Valkey-first, Postgres fallback on Valkey error)
- isMitraReachable: SISMEMBER mitras:online + heartbeat freshness
- findAvailableMitras: SDIFF + pipelined GETs, filter by capacity + heartbeat
- countAvailableMitrasFromCache: Valkey-driven, cached cluster-wide 10s TTL
- dashboard online count: SCARD
- Each reader wraps Valkey ops in try/catch → Postgres fallback on outage

Heartbeat path on /api/mitra/status/heartbeat
- resolveMitra preHandler replaced with heartbeatGuard: SISMEMBER on
  mitras:deactivated (~0 DB hits per ping). Falls back to full DB
  resolveMitra if Valkey is unreachable so a Valkey outage doesn't
  silently accept heartbeats from deactivated mitras.

Three sweeps, env-configurable cadences
- MITRA_AUTO_OFFLINE_SWEEP_SECONDS (30) — Valkey-driven stale detection
- HEARTBEAT_MIRROR_INTERVAL_SECONDS (60) — batched UPSERT writes
  Valkey timestamps to Postgres last_heartbeat_at via UNNEST (1 statement
  per cycle, idempotent across instances)
- VALKEY_ONLINE_MIRROR_SWEEP_SECONDS (300) — periodic reseed heals drift

Startup
- restoreActiveTimers → seedFromPostgres → bind listeners
- onValkeyReady re-runs the seed on every reconnect (cold start + reseed
  on Valkey restart, no manual intervention)

Failure semantics
- Read fallback: every Valkey read wrapped, falls back to existing
  Postgres JOIN query — system stays correct during Valkey outage,
  performance degrades not breaks
- Write best-effort: Postgres write commits before Valkey is touched;
  Valkey errors log + continue; reconciliation sweep heals drift
- Auto-offline sweep aborts entirely on Valkey error (does NOT mass-
  offline via Postgres scan during Valkey hiccup)

Tests
- New: 32 integration tests in mitra-status.valkey-mirror.test.js
  covering seed, write-through, fallbacks, capacity lifecycle,
  auto-offline sweep, heartbeat mirror, deactivation flow, beacon cache
- Updated: fixtures.js seeds Valkey alongside Postgres when isOnline=true
- Updated: helpers/db.js resetDb also flushes test Valkey
- Fixed 2 pre-existing session-timer flakes (string IDs failed uuid
  parse; vi.advanceTimersByTimeAsync raced real Postgres I/O)
- All 124/124 backend tests pass (was 90/92)

Docs
- requirement/valkey-online-mirror-plan.md — canonical plan
- requirement/valkey-online-mirror-testing.md — manual E2E checklist
- requirement/deployment.md — infra + Valkey persistence guidance for
  prod (Memorystore Standard tier recommended; migration from
  self-hosted Valkey is zero-downtime via reseed-from-Postgres)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-25 18:07:55 +08:00
parent 3fff4b1c6e
commit 553dbac52f
20 changed files with 1839 additions and 82 deletions

View File

@@ -3,6 +3,7 @@ import { publish } from '../plugins/valkey.js'
import { clearSessionTimer, clearClosureGraceTimer, startClosureGraceTimer } from './session-timer.service.js'
import { sendToSessionParticipant } from '../plugins/websocket.js'
import { sendPushNotification } from './notification.service.js'
import { recomputeCapacityForMitra } from './mitra-status.service.js'
import { UserType, SessionStatus, EndedBy, WsMessage } from '../constants.js'
const sql = getDb()
@@ -59,6 +60,7 @@ export const completeSession = async (sessionId) => {
RETURNING id, customer_id, mitra_id, status, ended_at
`
if (!session) return null
await recomputeCapacityForMitra(session.mitra_id)
// Notify both parties, FCM fallback if WebSocket is down
const data = { type: WsMessage.SESSION_COMPLETED, session_id: sessionId }
@@ -109,6 +111,7 @@ export const initiateEarlyEnd = async (sessionId, userType) => {
code: 'SESSION_NOT_ACTIVE', statusCode: 409,
})
}
await recomputeCapacityForMitra(session.mitra_id)
clearSessionTimer(sessionId)
startClosureGraceTimer(sessionId)

View File

@@ -149,6 +149,34 @@ export const getMitraHeartbeatCadenceSeconds = () => {
return Number.isFinite(parsed) && parsed >= 5 ? parsed : 30
}
// --- Valkey availability mirror — env-driven cadences ---
//
// Per requirement/valkey-online-mirror-plan.md. All three are operational
// knobs (env, per backend/CLAUDE.md Config-Source Convention), not
// operator-tunable. Defaults match the plan; values are floor-clamped.
export const getMitraAutoOfflineSweepSeconds = () => {
const raw = process.env.MITRA_AUTO_OFFLINE_SWEEP_SECONDS
if (!raw || raw.trim() === '') return 30
const parsed = Number.parseInt(raw, 10)
return Number.isFinite(parsed) && parsed >= 5 ? parsed : 30
}
export const getHeartbeatMirrorIntervalSeconds = () => {
const raw = process.env.HEARTBEAT_MIRROR_INTERVAL_SECONDS
if (!raw || raw.trim() === '') return 60
const parsed = Number.parseInt(raw, 10)
return Number.isFinite(parsed) && parsed >= 10 ? parsed : 60
}
export const getValkeyOnlineMirrorSweepSeconds = () => {
const raw = process.env.VALKEY_ONLINE_MIRROR_SWEEP_SECONDS
if (!raw || raw.trim() === '') return 300
const parsed = Number.parseInt(raw, 10)
if (parsed === 0) return 0 // explicit disable
return Number.isFinite(parsed) && parsed >= 30 ? parsed : 300
}
// --- Phase 5: Xendit integration ---
//
// Env-driven (per backend/CLAUDE.md Config-Source Convention). All five values

View File

@@ -1,19 +1,33 @@
import { getDb } from '../db/client.js'
import * as valkey from '../plugins/valkey.js'
import { VK_MITRAS_ONLINE } from './mitra-status.service.js'
import { SessionStatus, TopicSensitivity } from '../constants.js'
const sql = getDb()
// Valkey-fast SCARD with Postgres fallback. The CC dashboard polls every few
// seconds; SCARD is sub-ms so this keeps the dashboard responsive at any scale.
const getOnlineMitrasCount = async () => {
try {
return await valkey.scard(VK_MITRAS_ONLINE)
} catch (err) {
console.warn('[dashboard] valkey unavailable, falling back to DB:', err.message)
const [{ c }] = await sql`SELECT COUNT(*)::int AS c FROM mitra_online_status WHERE is_online = true`
return c
}
}
export const getDashboardStats = async () => {
const [
[{ active_chats }],
[{ online_mitras }],
online_mitras,
[{ pending_requests }],
[{ sensitive_total }],
[{ sensitive_last_30d_total }],
[{ sensitive_last_30d_sensitive }],
] = await Promise.all([
sql`SELECT COUNT(*) AS active_chats FROM chat_sessions WHERE status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT})`,
sql`SELECT COUNT(*) AS online_mitras FROM mitra_online_status WHERE is_online = true`,
getOnlineMitrasCount(),
sql`SELECT COUNT(*) AS pending_requests FROM chat_sessions WHERE status IN (${SessionStatus.SEARCHING}, ${SessionStatus.PENDING_ACCEPTANCE})`,
sql`SELECT COUNT(*) AS sensitive_total FROM chat_sessions WHERE topic_sensitivity = ${TopicSensitivity.SENSITIVE}`,
sql`SELECT COUNT(*) AS sensitive_last_30d_total FROM chat_sessions WHERE created_at >= NOW() - INTERVAL '30 days'`,

View File

@@ -1,7 +1,7 @@
import { getDb } from '../db/client.js'
import { sendToSessionParticipant, isUserOnlineWs } from '../plugins/websocket.js'
import { extendSessionTimer, clearClosureGraceTimer, startClosureGraceTimer } from './session-timer.service.js'
import { isMitraReachable } from './mitra-status.service.js'
import { isMitraReachable, recomputeCapacityBySession } from './mitra-status.service.js'
import { consumePaymentSession, failPaymentSession, getPaymentSession } from './payment.service.js'
import { sendPushNotification } from './notification.service.js'
import {
@@ -104,6 +104,7 @@ export const requestExtension = async (sessionId, customerId, { duration_minutes
// Pause the session
await sql`UPDATE chat_sessions SET status = ${SessionStatus.EXTENDING} WHERE id = ${sessionId}`
await recomputeCapacityBySession(sessionId)
// Resolve timeout once so we can both surface it in the WS payload and start the server-side timer.
const timeoutMs = await getExtensionTimeoutMs()
@@ -213,6 +214,7 @@ const finalizeExtension = async (extensionId, sessionId, accepted, viaTimeout) =
// Resume session
await sql`UPDATE chat_sessions SET status = ${SessionStatus.ACTIVE} WHERE id = ${extension.session_id}`
await recomputeCapacityBySession(extension.session_id)
// Record transaction
await sql`
@@ -249,6 +251,7 @@ const finalizeExtension = async (extensionId, sessionId, accepted, viaTimeout) =
}
await sql`UPDATE chat_sessions SET status = ${SessionStatus.CLOSING} WHERE id = ${extension.session_id}`
await recomputeCapacityBySession(extension.session_id)
sendToSessionParticipant(sessionId, UserType.CUSTOMER, {
type: WsMessage.EXTENSION_RESPONSE,
@@ -331,6 +334,7 @@ const timeoutExtension = async (extensionId, sessionId, mitraId) => {
// Move session to closing & notify both parties (matches the explicit-reject UX).
await sql`UPDATE chat_sessions SET status = ${SessionStatus.CLOSING} WHERE id = ${sessionId}`
await recomputeCapacityBySession(sessionId)
sendToSessionParticipant(sessionId, UserType.CUSTOMER, {
type: WsMessage.EXTENSION_RESPONSE,
accepted: false,

View File

@@ -1,24 +1,80 @@
import { getDb } from '../db/client.js'
import { SessionStatus } from '../constants.js'
import { getMitraPingConfig, getMaxCustomersPerMitra } from './config.service.js'
import { subscribe } from '../plugins/valkey.js'
import * as valkey from '../plugins/valkey.js'
import { subscribe, onValkeyReady } from '../plugins/valkey.js'
const sql = getDb()
// --- Short-TTL availability cache for the 5s-poll endpoint ---
// In-memory snapshot { available, count, expiresAt }. The cache:
// - is recomputed at most once per AVAILABILITY_TTL_MS (10s backstop)
// - is invalidated explicitly when CC changes max_customers_per_mitra (call invalidateAvailabilityCache())
// This keeps customer polls off the DB hot path while staying close to real time.
const AVAILABILITY_TTL_MS = 10_000
let availabilityCache = null // { available, count, expiresAt }
// Per requirement/valkey-online-mirror-plan.md § Schema.
export const VK_MITRAS_ONLINE = 'mitras:online'
export const VK_MITRAS_DEACTIVATED = 'mitras:deactivated'
export const vkCapacityKey = (mitraId) => `mitra:capacity:${mitraId}`
export const vkHeartbeatKey = (mitraId) => `mitra:heartbeat:${mitraId}`
export const invalidateAvailabilityCache = () => {
availabilityCache = null
// Rebuilds Valkey availability state from Postgres. Called on backend startup,
// on Valkey reconnect (via onValkeyReady), and by the reconciliation sweep.
// Idempotent — DEL + bulk SADD/SET produces the same final state on every run.
export const seedFromPostgres = async () => {
try {
const [onlineRows, deactRows, capacityRows] = await Promise.all([
sql`SELECT mitra_id FROM mitra_online_status WHERE is_online = true`,
sql`SELECT id FROM mitras WHERE is_active = false`,
sql`
SELECT mitra_id, COUNT(*)::int AS c FROM chat_sessions
WHERE mitra_id IS NOT NULL
AND status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT})
GROUP BY mitra_id
`,
])
const pipe = valkey.pipeline()
pipe.del(VK_MITRAS_ONLINE)
pipe.del(VK_MITRAS_DEACTIVATED)
const now = new Date().toISOString()
if (onlineRows.length) {
pipe.sadd(VK_MITRAS_ONLINE, ...onlineRows.map((r) => r.mitra_id))
// Seed heartbeats with NOW so the first sweep after restart doesn't
// mass-offline. Mitras refresh on their next ping anyway.
for (const r of onlineRows) pipe.set(vkHeartbeatKey(r.mitra_id), now)
// Reset capacity for currently-online mitras; overlay real counts below.
// Offline mitras' stale capacity keys don't affect reads (SDIFF excludes them).
for (const r of onlineRows) pipe.set(vkCapacityKey(r.mitra_id), 0)
}
if (deactRows.length) {
pipe.sadd(VK_MITRAS_DEACTIVATED, ...deactRows.map((r) => r.id))
}
for (const r of capacityRows) pipe.set(vkCapacityKey(r.mitra_id), r.c)
await pipe.exec()
console.log(
`[valkey-mirror] seed: ${onlineRows.length} online, ${deactRows.length} deactivated, ${capacityRows.length} with active sessions`,
)
} catch (err) {
console.error('[valkey-mirror] seed failed:', err)
}
}
// Subscribe once at module load so other-instance config updates also bust this cache.
// Single-instance: the local mutator already invalidates, so this is a no-op extra.
// Re-seed on every Valkey reconnect (fires on initial connect too).
onValkeyReady(seedFromPostgres)
// --- Beacon snapshot cache (Valkey-backed, cluster-shared) ---
// `availability:snapshot` JSON `{available, count}`, TTL 10s. All backend
// instances share the same cache: one Valkey-driven compute per 10s
// cluster-wide regardless of how many customers are polling.
const AVAILABILITY_CACHE_KEY = 'availability:snapshot'
const AVAILABILITY_TTL_SECONDS = 10
export const invalidateAvailabilityCache = async () => {
try {
await valkey.del(AVAILABILITY_CACHE_KEY)
} catch (err) {
console.error('[valkey-mirror] invalidateAvailabilityCache failed:', err)
}
}
// Bust the shared cache when CC changes max_customers_per_mitra (any instance).
let _subscribed = false
const ensureSubscribed = () => {
if (_subscribed) return
@@ -43,6 +99,39 @@ export const ensureStatusRow = async (mitraId) => {
`
}
// Best-effort Valkey writer. Postgres remains source of truth; a Valkey hiccup
// shouldn't fail the originating request — the reconciliation sweep heals drift.
const tryValkey = async (fn, label) => {
try { await fn() } catch (err) {
console.error(`[valkey-mirror] ${label} failed:`, err)
}
}
// Recompute `mitra:capacity:<id>` from chat_sessions truth. Called after every
// chat_session state change that could affect a mitra's occupied-slot count.
// Recompute-from-truth avoids the bookkeeping risks of per-transition INCR/DECR
// (double-counts, missed transitions across all the UPDATE sites in pairing,
// closure, extension, session-timer, session services).
export const recomputeCapacityForMitra = async (mitraId) => {
if (!mitraId) return
const [row] = await sql`
SELECT COUNT(*)::int AS c FROM chat_sessions
WHERE mitra_id = ${mitraId}
AND status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT})
`
await tryValkey(
() => valkey.set(vkCapacityKey(mitraId), row.c),
`recomputeCapacity ${mitraId}`,
)
}
// Lookup mitra_id from the session, then recompute. Use this from UPDATE sites
// where the session's mitra_id may not be in local scope.
export const recomputeCapacityBySession = async (sessionId) => {
const [row] = await sql`SELECT mitra_id FROM chat_sessions WHERE id = ${sessionId}`
if (row?.mitra_id) await recomputeCapacityForMitra(row.mitra_id)
}
export const setOnline = async (mitraId) => {
await ensureStatusRow(mitraId)
const now = new Date()
@@ -54,6 +143,14 @@ export const setOnline = async (mitraId) => {
await sql`
INSERT INTO mitra_online_logs (mitra_id, status) VALUES (${mitraId}, 'online')
`
await tryValkey(async () => {
const pipe = valkey.pipeline()
pipe.sadd(VK_MITRAS_ONLINE, mitraId)
pipe.set(vkHeartbeatKey(mitraId), now.toISOString())
await pipe.exec()
}, `setOnline ${mitraId}`)
invalidateAvailabilityCache()
}
@@ -73,16 +170,32 @@ export const setOffline = async (mitraId) => {
await sql`
INSERT INTO mitra_online_logs (mitra_id, status) VALUES (${mitraId}, 'offline')
`
await tryValkey(async () => {
const pipe = valkey.pipeline()
pipe.srem(VK_MITRAS_ONLINE, mitraId)
pipe.del(vkHeartbeatKey(mitraId))
await pipe.exec()
}, `setOffline ${mitraId}`)
invalidateAvailabilityCache()
}
// Heartbeat hot path: Valkey-only. Per-ping Postgres UPDATE eliminated; the
// 60s batched heartbeat-mirror job (mirrorHeartbeatsToPostgres) writes
// `last_heartbeat_at` to Postgres for forensics/restart safety.
//
// NOTE: there is intentionally no `is_online = true` gate here (the old SQL
// UPDATE had one). The Valkey SET is global; if a mitra heartbeats while
// `is_online=false` in Postgres, their TTL key gets refreshed but they're
// still not in `mitras:online`, so blast eligibility is unchanged. The
// reconciliation sweep will clean up the orphan heartbeat key.
export const heartbeat = async (mitraId) => {
const now = new Date()
await sql`
UPDATE mitra_online_status
SET last_heartbeat_at = ${now}, updated_at = ${now}
WHERE mitra_id = ${mitraId} AND is_online = true
`
const now = new Date().toISOString()
await tryValkey(
() => valkey.set(vkHeartbeatKey(mitraId), now),
`heartbeat ${mitraId}`,
)
}
export const getStatus = async (mitraId) => {
@@ -130,39 +243,95 @@ export const getOnlineLogs = async (mitraId, { page = 1, limit = 50 } = {}) => {
return { items, total: Number(count), page, limit }
}
// Valkey-driven: enumerate mitras:online, read each heartbeat timestamp from
// Valkey, find stales, then bulk-flip Postgres + clean up Valkey.
//
// Failure semantics: if any Valkey op throws, the sweep aborts entirely. We
// never mass-offline mitras via a Postgres scan because Valkey is unreachable
// — that would risk false-offlining a fleet during a Valkey hiccup.
export const autoOfflineStaleMitras = async () => {
const pingConfig = await getMitraPingConfig()
// If ping is not required, skip the auto-offline sweep entirely
if (!pingConfig.require_ping) return 0
// stale_after_seconds is the operator-facing knob — what they set is what
// they get. No multiplier, no implicit "tolerate N missed heartbeats"
// contract baked in. The CC PATCH validates that the value is >= the env-
// driven heartbeat cadence so single missed pings can't flip a mitra
// offline.
const staleSeconds = pingConfig.stale_after_seconds
const stale = await sql`
UPDATE mitra_online_status
SET is_online = false, last_offline_at = NOW(), updated_at = NOW()
WHERE is_online = true
AND last_heartbeat_at < NOW() - ${staleSeconds + ' seconds'}::interval
RETURNING mitra_id
`
for (const row of stale) {
await sql`
INSERT INTO mitra_online_logs (mitra_id, status) VALUES (${row.mitra_id}, 'offline')
`
let onlineIds, heartbeatValues
try {
onlineIds = await valkey.smembers(VK_MITRAS_ONLINE)
if (!onlineIds.length) return 0
const pipe = valkey.pipeline()
for (const id of onlineIds) pipe.get(vkHeartbeatKey(id))
const results = await pipe.exec()
heartbeatValues = results.map((r) => r[1])
} catch (err) {
console.warn('[auto-offline] valkey unavailable, skipping this tick:', err.message)
return 0
}
// Capacity may have changed (mitra went offline) — invalidate the customer-facing
// availability cache so the next poll reflects reality.
if (stale.length > 0) invalidateAvailabilityCache()
const cutoff = Date.now() - pingConfig.stale_after_seconds * 1000
const stale = []
for (let i = 0; i < onlineIds.length; i++) {
const ts = heartbeatValues[i]
if (!ts || Date.parse(ts) < cutoff) stale.push(onlineIds[i])
}
if (!stale.length) return 0
await sql`
UPDATE mitra_online_status
SET is_online = false, last_offline_at = NOW(), updated_at = NOW()
WHERE mitra_id = ANY(${sql.array(stale)}::uuid[]) AND is_online = true
`
for (const id of stale) {
await sql`INSERT INTO mitra_online_logs (mitra_id, status) VALUES (${id}, 'offline')`
}
await tryValkey(async () => {
const cleanup = valkey.pipeline()
cleanup.srem(VK_MITRAS_ONLINE, ...stale)
for (const id of stale) cleanup.del(vkHeartbeatKey(id))
await cleanup.exec()
}, `auto-offline cleanup (${stale.length} stale)`)
invalidateAvailabilityCache()
return stale.length
}
// Batched mirror: Valkey heartbeat timestamps → Postgres `last_heartbeat_at`.
// Runs every HEARTBEAT_MIRROR_INTERVAL_SECONDS (default 60). One UNNEST UPDATE
// regardless of online count. Idempotent — latest timestamp wins; multiple
// instances running concurrently is fine (no leader election needed).
export const mirrorHeartbeatsToPostgres = async () => {
let onlineIds, heartbeatValues
try {
onlineIds = await valkey.smembers(VK_MITRAS_ONLINE)
if (!onlineIds.length) return 0
const pipe = valkey.pipeline()
for (const id of onlineIds) pipe.get(vkHeartbeatKey(id))
const results = await pipe.exec()
heartbeatValues = results.map((r) => r[1])
} catch (err) {
console.warn('[heartbeat-mirror] valkey unavailable, skipping:', err.message)
return 0
}
const ids = []
const ts = []
for (let i = 0; i < onlineIds.length; i++) {
if (heartbeatValues[i]) {
ids.push(onlineIds[i])
ts.push(heartbeatValues[i])
}
}
if (!ids.length) return 0
await sql`
UPDATE mitra_online_status m
SET last_heartbeat_at = u.ts::timestamptz, updated_at = NOW()
FROM (
SELECT * FROM UNNEST(${sql.array(ids)}::uuid[], ${sql.array(ts)}::text[]) AS t(mitra_id, ts)
) u
WHERE m.mitra_id = u.mitra_id
`
return ids.length
}
/**
* Customer-home availability check, cached in-memory for AVAILABILITY_TTL_MS.
*
@@ -178,12 +347,33 @@ export const autoOfflineStaleMitras = async () => {
* sets/hashes (matching the existing memory item "Session Timer Scaling"); the contract
* of this function — Valkey/cache reads only on the hot path — stays the same.
*/
export const countAvailableMitrasFromCache = async () => {
const now = Date.now()
if (availabilityCache && availabilityCache.expiresAt > now) {
return { available: availabilityCache.available, count: availabilityCache.count }
}
const computeAvailabilityFromValkey = async () => {
const { max_customers_per_mitra } = await getMaxCustomersPerMitra()
const { stale_after_seconds } = await getMitraPingConfig()
const candidates = await valkey.sdiff(VK_MITRAS_ONLINE, VK_MITRAS_DEACTIVATED)
if (!candidates.length) return { available: false, count: 0 }
const pipe = valkey.pipeline()
for (const id of candidates) {
pipe.get(vkCapacityKey(id))
pipe.get(vkHeartbeatKey(id))
}
const results = await pipe.exec()
const cutoff = Date.now() - stale_after_seconds * 1000
let count = 0
for (let i = 0; i < candidates.length; i++) {
const capacity = Number(results[i * 2][1] ?? 0)
const heartbeat = results[i * 2 + 1][1]
if (capacity >= max_customers_per_mitra) continue
if (!heartbeat || Date.parse(heartbeat) < cutoff) continue
count++
}
return { available: count > 0, count }
}
const computeAvailabilityFromPostgres = async () => {
const { max_customers_per_mitra } = await getMaxCustomersPerMitra()
const [{ count }] = await sql`
SELECT COUNT(*)::int AS count
@@ -197,26 +387,42 @@ export const countAvailableMitrasFromCache = async () => {
AND cs.status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT})
) < ${max_customers_per_mitra}
`
return { available: count > 0, count }
}
const available = count > 0
availabilityCache = {
available,
count,
expiresAt: now + AVAILABILITY_TTL_MS,
export const countAvailableMitrasFromCache = async () => {
try {
const cached = await valkey.get(AVAILABILITY_CACHE_KEY)
if (cached) return JSON.parse(cached)
const snapshot = await computeAvailabilityFromValkey()
await valkey.getValkeyClient().setex(AVAILABILITY_CACHE_KEY, AVAILABILITY_TTL_SECONDS, JSON.stringify(snapshot))
return snapshot
} catch (err) {
console.warn('[countAvailableMitras] valkey unavailable, falling back to Postgres:', err.message)
return computeAvailabilityFromPostgres()
}
return { available, count }
}
/**
* Mitra-online check for use during pairing/extension safeguards.
* Combines the Valkey-mirrored online flag (Postgres mitra_online_status today) with
* the WebSocket-connected check. Never use "in-session" as a proxy for "online".
* Mitra-reachable check: in `mitras:online` SET AND heartbeat is fresh.
* Falls back to a Postgres `is_online` read if Valkey is unreachable; the
* fallback skips the heartbeat-freshness check (sweep takes care of stale rows
* within `stale_after_seconds + sweep_cadence`).
*/
export const isMitraReachable = async (mitraId) => {
const [row] = await sql`
SELECT is_online FROM mitra_online_status WHERE mitra_id = ${mitraId}
`
return Boolean(row?.is_online)
try {
const inSet = await valkey.sismember(VK_MITRAS_ONLINE, mitraId)
if (!inSet) return false
const heartbeat = await valkey.get(vkHeartbeatKey(mitraId))
if (!heartbeat) return false
const { stale_after_seconds } = await getMitraPingConfig()
return Date.parse(heartbeat) >= Date.now() - stale_after_seconds * 1000
} catch (err) {
console.warn('[isMitraReachable] valkey unavailable, falling back to DB:', err.message)
const [row] = await sql`SELECT is_online FROM mitra_online_status WHERE mitra_id = ${mitraId}`
return Boolean(row?.is_online)
}
}
/**

View File

@@ -1,4 +1,8 @@
import { getDb } from '../db/client.js'
import * as valkey from '../plugins/valkey.js'
import { VK_MITRAS_DEACTIVATED } from './mitra-status.service.js'
import { revokeAllSessionsForUser } from './token.service.js'
import { UserType } from '../constants.js'
const sql = getDb()
@@ -36,6 +40,24 @@ export const updateMitraStatus = async (id, is_active) => {
RETURNING id, is_active
`
if (!mitra) throw Object.assign(new Error('Mitra not found'), { code: 'NOT_FOUND', statusCode: 404 })
// Deactivation also revokes all auth_sessions so the next token refresh fails
// (bounds the "ghost online" window to access-token TTL across all routes,
// not just heartbeat). See requirement/valkey-online-mirror-plan.md.
if (!is_active) {
await revokeAllSessionsForUser({ userType: UserType.MITRA, userId: id })
}
try {
if (is_active) {
await valkey.srem(VK_MITRAS_DEACTIVATED, id)
} else {
await valkey.sadd(VK_MITRAS_DEACTIVATED, id)
}
} catch (err) {
console.error(`[valkey-mirror] updateMitraStatus ${id} failed:`, err)
}
return mitra
}

View File

@@ -1,11 +1,13 @@
import { getDb } from '../db/client.js'
import { getMaxCustomersPerMitra, getPairingBlastTimeoutSeconds, getReturningChatConfirmationTimeoutSeconds } from './config.service.js'
import * as valkey from '../plugins/valkey.js'
import { VK_MITRAS_ONLINE, VK_MITRAS_DEACTIVATED, vkCapacityKey, vkHeartbeatKey } from './mitra-status.service.js'
import { getMaxCustomersPerMitra, getPairingBlastTimeoutSeconds, getReturningChatConfirmationTimeoutSeconds, getMitraPingConfig } from './config.service.js'
import { sendToUser } from '../plugins/websocket.js'
import { sendPushNotification } from './notification.service.js'
import { startSessionTimer } from './session-timer.service.js'
import { startSessionListener } from './chat-handler.service.js'
import { consumePaymentSession, failPaymentSession, getPaymentSession, recordIntermediateFailure } from './payment.service.js'
import { isMitraReachable, isMitraInActiveSessionWithCustomer, getMitraActiveSessionCount } from './mitra-status.service.js'
import { isMitraReachable, isMitraInActiveSessionWithCustomer, getMitraActiveSessionCount, recomputeCapacityForMitra, recomputeCapacityBySession } from './mitra-status.service.js'
import {
UserType,
SessionStatus,
@@ -76,10 +78,37 @@ const notifyCustomer = async (customerId, data) => {
}
}
export const findAvailableMitras = async () => {
// Valkey-driven: SDIFF(online, deactivated) → for each candidate, pipelined
// GET capacity + heartbeat, then filter by capacity gate + heartbeat freshness.
// Postgres fallback runs if any Valkey op throws (full JOIN as before).
const findAvailableMitrasFromValkey = async () => {
const { max_customers_per_mitra } = await getMaxCustomersPerMitra()
const { stale_after_seconds } = await getMitraPingConfig()
const candidates = await valkey.sdiff(VK_MITRAS_ONLINE, VK_MITRAS_DEACTIVATED)
if (!candidates.length) return []
const pipe = valkey.pipeline()
for (const id of candidates) {
pipe.get(vkCapacityKey(id))
pipe.get(vkHeartbeatKey(id))
}
const results = await pipe.exec()
const cutoff = Date.now() - stale_after_seconds * 1000
const eligible = []
for (let i = 0; i < candidates.length; i++) {
const capacity = Number(results[i * 2][1] ?? 0)
const heartbeat = results[i * 2 + 1][1]
if (capacity >= max_customers_per_mitra) continue
if (!heartbeat || Date.parse(heartbeat) < cutoff) continue
eligible.push({ id: candidates[i], active_session_count: capacity })
}
return eligible
}
const findAvailableMitrasFromPostgres = async () => {
const { max_customers_per_mitra } = await getMaxCustomersPerMitra()
// Project active_session_count alongside the mitra row so the blast loop doesn't
// need a per-mitra COUNT roundtrip later.
const mitras = await sql`
SELECT m.id, m.display_name, sub.active_session_count
FROM mitras m
@@ -96,6 +125,15 @@ export const findAvailableMitras = async () => {
return mitras
}
export const findAvailableMitras = async () => {
try {
return await findAvailableMitrasFromValkey()
} catch (err) {
console.warn('[findAvailableMitras] valkey unavailable, falling back to Postgres:', err.message)
return findAvailableMitrasFromPostgres()
}
}
/**
* Validate that a payment session is owned by the customer, confirmed, and not yet consumed.
* Throws on mismatch. Returns the loaded payment session row.
@@ -414,6 +452,10 @@ export const acceptPairingRequest = async (sessionId, mitraId) => {
})
}
// Mitra now occupies a capacity slot (PENDING_PAYMENT counts per
// findAvailableMitras predicate). Mirror to Valkey.
await recomputeCapacityForMitra(mitraId)
// Mark this mitra's notification as accepted
await sql`
UPDATE chat_request_notifications

View File

@@ -2,6 +2,7 @@ import { getDb } from '../db/client.js'
import { publish } from '../plugins/valkey.js'
import { sendToSessionParticipant } from '../plugins/websocket.js'
import { sendPushNotification } from './notification.service.js'
import { recomputeCapacityForMitra } from './mitra-status.service.js'
import { UserType, SessionStatus, WsMessage, EndedBy } from '../constants.js'
const sql = getDb()
@@ -152,6 +153,7 @@ const onSessionExpired = async (sessionId) => {
RETURNING id, customer_id, mitra_id
`
if (!session) return
await recomputeCapacityForMitra(session.mitra_id)
// Notify customer — sees extend/close dialog; FCM fallback if WebSocket is down
const expiredData = { type: WsMessage.SESSION_EXPIRED, session_id: sessionId }
@@ -207,9 +209,10 @@ const autoCompleteIfStillClosing = async (sessionId) => {
ended_at = COALESCE(ended_at, NOW()),
ended_by = ${EndedBy.SYSTEM_AUTO_CLOSE}
WHERE id = ${sessionId} AND status = ${SessionStatus.CLOSING}
RETURNING id
RETURNING id, mitra_id
`
if (!updated) return
await recomputeCapacityForMitra(updated.mitra_id)
const data = { type: WsMessage.SESSION_COMPLETED, session_id: sessionId }
sendToSessionParticipant(sessionId, UserType.CUSTOMER, data)

View File

@@ -1,5 +1,6 @@
import { getDb } from '../db/client.js'
import { publish } from '../plugins/valkey.js'
import { recomputeCapacityForMitra } from './mitra-status.service.js'
import { UserType, SessionStatus, MessageStatus, WsMessage } from '../constants.js'
const sql = getDb()
@@ -48,6 +49,7 @@ export const endSession = async (sessionId, endedBy, userId) => {
code: 'SESSION_NOT_ACTIVE', statusCode: 409,
})
}
await recomputeCapacityForMitra(session.mitra_id)
// Notify both parties
await publish(`session:${sessionId}:status`, {
@@ -91,6 +93,11 @@ export const rerouteSession = async (sessionId, newMitraId) => {
WHERE id = ${sessionId}
RETURNING id, customer_id, mitra_id, status
`
// Both mitras' capacity flipped — recompute both.
await Promise.all([
recomputeCapacityForMitra(oldMitraId),
recomputeCapacityForMitra(newMitraId),
])
const [newMitra] = await sql`
SELECT display_name FROM mitras WHERE id = ${newMitraId}