import { getDb } from '../db/client.js'
import { SessionStatus } from '../constants.js'
import { getMitraPingConfig, getMaxCustomersPerMitra } from './config.service.js'
import * as valkey from '../plugins/valkey.js'
import { subscribe, onValkeyReady } from '../plugins/valkey.js'

const sql = getDb()

// Per requirement/valkey-online-mirror-plan.md § Schema.
export const VK_MITRAS_ONLINE = 'mitras:online'
export const VK_MITRAS_DEACTIVATED = 'mitras:deactivated'
export const vkCapacityKey = (mitraId) => `mitra:capacity:${mitraId}`
export const vkHeartbeatKey = (mitraId) => `mitra:heartbeat:${mitraId}`

// --- Beacon snapshot cache (Valkey-backed, cluster-shared) ---
// `availability:snapshot` JSON `{available, count}`, TTL 10s. All backend
// instances share the same cache: one Valkey-driven compute per 10s
// cluster-wide regardless of how many customers are polling.
// Declared ahead of seedFromPostgres, which invalidates the snapshot.
const AVAILABILITY_CACHE_KEY = 'availability:snapshot'
const AVAILABILITY_TTL_SECONDS = 10

// Deletes the shared availability snapshot so the next read recomputes it.
// Best-effort: errors are logged, never thrown, so callers may fire-and-forget.
export const invalidateAvailabilityCache = async () => {
  try {
    await valkey.del(AVAILABILITY_CACHE_KEY)
  } catch (err) {
    console.error('[valkey-mirror] invalidateAvailabilityCache failed:', err)
  }
}

// Rebuilds Valkey availability state from Postgres. Called on backend startup,
// on Valkey reconnect (via onValkeyReady), and by the reconciliation sweep.
// Idempotent — DEL + bulk SADD/SET produces the same final state on every run.
//
// Assumes ioredis-style pipelines: exec() resolves with one [err, result] pair
// per command instead of rejecting when a single command fails. Those
// per-command errors are checked explicitly so a partial write is reported as
// a failed seed rather than logged as a success. Never throws; failures are
// logged and left for the next seed to heal.
export const seedFromPostgres = async () => {
  try {
    const [onlineRows, deactRows, capacityRows] = await Promise.all([
      sql`SELECT mitra_id FROM mitra_online_status WHERE is_online = true`,
      sql`SELECT id FROM mitras WHERE is_active = false`,
      sql`
        SELECT mitra_id, COUNT(*)::int AS c
        FROM chat_sessions
        WHERE mitra_id IS NOT NULL
          AND status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT})
        GROUP BY mitra_id
      `,
    ])
    const onlineIds = onlineRows.map((r) => r.mitra_id)
    const deactivatedIds = deactRows.map((r) => r.id)
    const now = new Date().toISOString()

    const pipe = valkey.pipeline()
    pipe.del(VK_MITRAS_ONLINE)
    pipe.del(VK_MITRAS_DEACTIVATED)
    if (onlineIds.length) {
      pipe.sadd(VK_MITRAS_ONLINE, ...onlineIds)
      for (const id of onlineIds) {
        // Seed heartbeats with NOW so the first sweep after restart doesn't
        // mass-offline. Mitras refresh on their next ping anyway.
        pipe.set(vkHeartbeatKey(id), now)
        // Reset capacity for currently-online mitras; real counts are overlaid
        // below. Offline mitras' stale capacity keys don't affect reads (SDIFF
        // excludes them).
        pipe.set(vkCapacityKey(id), 0)
      }
    }
    if (deactivatedIds.length) pipe.sadd(VK_MITRAS_DEACTIVATED, ...deactivatedIds)
    // Queued after the resets above so the real counts win.
    for (const r of capacityRows) pipe.set(vkCapacityKey(r.mitra_id), r.c)

    const results = (await pipe.exec()) ?? []
    const failed = results.find((entry) => entry?.[0])
    if (failed) throw failed[0]

    console.log(
      `[valkey-mirror] seed: ${onlineRows.length} online, ${deactRows.length} deactivated, ${capacityRows.length} with active sessions`,
    )
    // Membership may have changed wholesale; don't keep serving a snapshot
    // computed from the previous state for the rest of its TTL.
    await invalidateAvailabilityCache()
  } catch (err) {
    console.error('[valkey-mirror] seed failed:', err)
  }
}

// Re-seed on every Valkey reconnect (fires on initial connect too).
onValkeyReady(seedFromPostgres)

// Bust the shared cache when CC changes any config that the beacon snapshots
// over: max_customers_per_mitra (capacity gate), require_mitra_ping (whether
// stale heartbeats exclude candidates), mitra_stale_after_seconds (the gate's
// threshold itself).
const AVAILABILITY_CACHE_INVALIDATING_KEYS = new Set([
  'max_customers_per_mitra',
  'require_mitra_ping',
  'mitra_stale_after_seconds',
])

let _subscribed = false

// Subscribes (at most once per process) to `config:invalidate` and drops the
// availability snapshot when one of the keys above changes.
const ensureSubscribed = () => {
  if (_subscribed) return
  _subscribed = true
  try {
    subscribe('config:invalidate', (msg) => {
      if (msg?.key && AVAILABILITY_CACHE_INVALIDATING_KEYS.has(msg.key)) {
        invalidateAvailabilityCache()
      }
    })
  } catch (_) {
    // Valkey may not be reachable in some test contexts; non-fatal.
  }
}
ensureSubscribed()

// Creates the mitra_online_status row if missing; no-op when it already exists.
export const ensureStatusRow = async (mitraId) => {
  await sql`
    INSERT INTO mitra_online_status (mitra_id)
    VALUES (${mitraId})
    ON CONFLICT (mitra_id) DO NOTHING
  `
}

// Best-effort Valkey writer. Postgres remains source of truth; a Valkey hiccup
// shouldn't fail the originating request — the reconciliation sweep heals drift.
const tryValkey = async (fn, label) => {
  try {
    await fn()
  } catch (err) {
    console.error(`[valkey-mirror] ${label} failed:`, err)
  }
}

// Recompute `mitra:capacity:` from chat_sessions truth. Called after every
// chat_session state change that could affect a mitra's occupied-slot count.
// Recompute-from-truth avoids the bookkeeping risks of per-transition INCR/DECR
// (double-counts, missed transitions across all the UPDATE sites in pairing,
// closure, extension, session-timer, session services).
// The Valkey write is best-effort; the Postgres count query may still throw.
export const recomputeCapacityForMitra = async (mitraId) => {
  if (!mitraId) return
  const [row] = await sql`
    SELECT COUNT(*)::int AS c
    FROM chat_sessions
    WHERE mitra_id = ${mitraId}
      AND status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT})
  `
  await tryValkey(
    () => valkey.set(vkCapacityKey(mitraId), row.c),
    `recomputeCapacity ${mitraId}`,
  )
}

// Lookup mitra_id from the session, then recompute. Use this from UPDATE sites
// where the session's mitra_id may not be in local scope.
export const recomputeCapacityBySession = async (sessionId) => {
  const [row] = await sql`SELECT mitra_id FROM chat_sessions WHERE id = ${sessionId}`
  if (row?.mitra_id) await recomputeCapacityForMitra(row.mitra_id)
}

// Marks a mitra online. Postgres (source of truth) is updated and an 'online'
// log row appended first; Valkey membership, heartbeat and capacity are then
// mirrored best-effort, and the shared availability snapshot is dropped.
export const setOnline = async (mitraId) => {
  await ensureStatusRow(mitraId)
  const now = new Date()
  await sql`
    UPDATE mitra_online_status
    SET is_online = true, last_online_at = ${now}, last_heartbeat_at = ${now}, updated_at = ${now}
    WHERE mitra_id = ${mitraId}
  `
  await sql`
    INSERT INTO mitra_online_logs (mitra_id, status)
    VALUES (${mitraId}, 'online')
  `
  await tryValkey(async () => {
    const pipe = valkey.pipeline()
    pipe.sadd(VK_MITRAS_ONLINE, mitraId)
    pipe.set(vkHeartbeatKey(mitraId), now.toISOString())
    await pipe.exec()
  }, `setOnline ${mitraId}`)
  // The capacity key may have drifted while this mitra was offline (e.g. a
  // recompute lost to a Valkey hiccup), and seedFromPostgres only corrects it
  // for mitras that are online or have active sessions at seed time. Refresh
  // it now that the mitra becomes a candidate again; best-effort so a failure
  // here doesn't fail the toggle itself.
  await tryValkey(() => recomputeCapacityForMitra(mitraId), `setOnline capacity ${mitraId}`)
  invalidateAvailabilityCache()
}

// Marks a mitra offline if Postgres still has them online; otherwise a no-op
// (no log row, no Valkey writes).
export const setOffline = async (mitraId) => {
  await ensureStatusRow(mitraId)
  const now = new Date()
  // Conditional UPDATE ... RETURNING instead of SELECT-then-UPDATE: two
  // concurrent calls can no longer both observe is_online = true and each
  // write an 'offline' log row.
  const [flipped] = await sql`
    UPDATE mitra_online_status
    SET is_online = false, last_offline_at = ${now}, updated_at = ${now}
    WHERE mitra_id = ${mitraId} AND is_online = true
    RETURNING mitra_id
  `
  if (!flipped) return
  await sql`
    INSERT INTO mitra_online_logs (mitra_id, status)
    VALUES (${mitraId}, 'offline')
  `
  await tryValkey(async () => {
    const pipe = valkey.pipeline()
    pipe.srem(VK_MITRAS_ONLINE, mitraId)
    pipe.del(vkHeartbeatKey(mitraId))
    await pipe.exec()
  }, `setOffline ${mitraId}`)
  invalidateAvailabilityCache()
}

// Heartbeat hot path: Valkey-only. Per-ping Postgres UPDATE eliminated; the
// 60s batched heartbeat-mirror job (mirrorHeartbeatsToPostgres) writes
// `last_heartbeat_at` to Postgres for forensics/restart safety.
//
// NOTE: there is intentionally no `is_online = true` gate here (the old SQL
// UPDATE had one). The Valkey SET is global; if a mitra heartbeats while
// `is_online=false` in Postgres, their TTL key gets refreshed but they're
// still not in `mitras:online`, so blast eligibility is unchanged.
// The reconciliation sweep will clean up the orphan heartbeat key.
// NOTE(review): no TTL is passed to valkey.set below, and the sweeps in this
// module only write or delete heartbeat keys for mitras that are online —
// confirm what actually expires or deletes the orphan key.
export const heartbeat = async (mitraId) => {
  const now = new Date().toISOString()
  await tryValkey(
    () => valkey.set(vkHeartbeatKey(mitraId), now),
    `heartbeat ${mitraId}`,
  )
}

// Returns the mitra's Postgres online-status row plus the ping settings the
// app needs (require_ping, heartbeat_cadence_seconds).
export const getStatus = async (mitraId) => {
  await ensureStatusRow(mitraId)
  // The status read and the config lookup are independent; run them together.
  const [[status], pingConfig] = await Promise.all([
    sql`
      SELECT is_online, last_online_at, last_offline_at, updated_at
      FROM mitra_online_status
      WHERE mitra_id = ${mitraId}
    `,
    getMitraPingConfig(),
  ])
  return {
    ...status,
    require_ping: pingConfig.require_ping,
    // The app reads this to set its Timer.periodic interval. Backend-fixed
    // (via env), not operator-tunable.
    heartbeat_cadence_seconds: pingConfig.heartbeat_cadence_seconds,
  }
}

// Lists active mitras that Postgres marks online, most recently onlined first,
// with their count of ACTIVE + PENDING_PAYMENT sessions.
// NOTE(review): active_session_count is an uncast COUNT(*) (bigint), which the
// driver presumably returns as a string; left as-is to keep the response shape.
export const getOnlineMitras = async () => {
  const mitras = await sql`
    SELECT m.id, m.display_name, m.phone,
           s.last_online_at, s.updated_at,
           (SELECT COUNT(*) FROM chat_sessions cs
            WHERE cs.mitra_id = m.id
              AND cs.status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT})) AS active_session_count
    FROM mitras m
    INNER JOIN mitra_online_status s ON s.mitra_id = m.id
    WHERE s.is_online = true AND m.is_active = true
    ORDER BY s.last_online_at DESC
  `
  return mitras
}

// Paginated online/offline log for one mitra, newest first.
// Returns { items, total, page, limit } with the normalized page/limit used.
export const getOnlineLogs = async (mitraId, { page = 1, limit = 50 } = {}) => {
  // Callers may forward raw query-string values. Coerce and clamp so a page
  // below 1 or a negative limit can't produce a negative OFFSET/LIMIT, which
  // Postgres rejects with an error.
  const pageNum = Math.trunc(Number(page))
  const limitNum = Math.trunc(Number(limit))
  const safePage = Number.isFinite(pageNum) && pageNum >= 1 ? pageNum : 1
  const safeLimit = Number.isFinite(limitNum) && limitNum >= 0 ? limitNum : 50
  const offset = (safePage - 1) * safeLimit

  const [items, [{ count }]] = await Promise.all([
    sql`
      SELECT id, status, timestamp
      FROM mitra_online_logs
      WHERE mitra_id = ${mitraId}
      ORDER BY timestamp DESC
      LIMIT ${safeLimit} OFFSET ${offset}
    `,
    sql`
      SELECT COUNT(*) FROM mitra_online_logs WHERE mitra_id = ${mitraId}
    `,
  ])
  return { items, total: Number(count), page: safePage, limit: safeLimit }
}

// Valkey-driven: enumerate mitras:online, read each heartbeat timestamp from
// Valkey, find stales, then bulk-flip Postgres + clean up Valkey.
//
// Failure semantics: if any Valkey op throws, the sweep aborts entirely. We
We // never mass-offline mitras via a Postgres scan because Valkey is unreachable // — that would risk false-offlining a fleet during a Valkey hiccup. export const autoOfflineStaleMitras = async () => { const pingConfig = await getMitraPingConfig() if (!pingConfig.require_ping) return 0 let onlineIds, heartbeatValues try { onlineIds = await valkey.smembers(VK_MITRAS_ONLINE) if (!onlineIds.length) return 0 const pipe = valkey.pipeline() for (const id of onlineIds) pipe.get(vkHeartbeatKey(id)) const results = await pipe.exec() heartbeatValues = results.map((r) => r[1]) } catch (err) { console.warn('[auto-offline] valkey unavailable, skipping this tick:', err.message) return 0 } const cutoff = Date.now() - pingConfig.stale_after_seconds * 1000 const stale = [] for (let i = 0; i < onlineIds.length; i++) { const ts = heartbeatValues[i] if (!ts || Date.parse(ts) < cutoff) stale.push(onlineIds[i]) } if (!stale.length) return 0 await sql` UPDATE mitra_online_status SET is_online = false, last_offline_at = NOW(), updated_at = NOW() WHERE mitra_id = ANY(${sql.array(stale)}::uuid[]) AND is_online = true ` for (const id of stale) { await sql`INSERT INTO mitra_online_logs (mitra_id, status) VALUES (${id}, 'offline')` } await tryValkey(async () => { const cleanup = valkey.pipeline() cleanup.srem(VK_MITRAS_ONLINE, ...stale) for (const id of stale) cleanup.del(vkHeartbeatKey(id)) await cleanup.exec() }, `auto-offline cleanup (${stale.length} stale)`) invalidateAvailabilityCache() return stale.length } // Batched mirror: Valkey heartbeat timestamps → Postgres `last_heartbeat_at`. // Runs every HEARTBEAT_MIRROR_INTERVAL_SECONDS (default 60). One UNNEST UPDATE // regardless of online count. Idempotent — latest timestamp wins; multiple // instances running concurrently is fine (no leader election needed). 
export const mirrorHeartbeatsToPostgres = async () => {
  let onlineIds
  let heartbeatValues
  try {
    onlineIds = await valkey.smembers(VK_MITRAS_ONLINE)
    if (!onlineIds.length) return 0
    const pipe = valkey.pipeline()
    for (const id of onlineIds) pipe.get(vkHeartbeatKey(id))
    const results = await pipe.exec()
    // A failed GET (error in slot 0 of the pair) is skipped like a missing key.
    heartbeatValues = results.map(([err, value]) => (err ? null : value))
  } catch (err) {
    console.warn('[heartbeat-mirror] valkey unavailable, skipping:', err.message)
    return 0
  }

  const ids = []
  const ts = []
  for (let i = 0; i < onlineIds.length; i++) {
    if (heartbeatValues[i]) {
      ids.push(onlineIds[i])
      ts.push(heartbeatValues[i])
    }
  }
  if (!ids.length) return 0

  // Only ever move `last_heartbeat_at` forward: two instances can read Valkey
  // at different moments and commit out of order, and an older snapshot must
  // not overwrite a newer one (latest timestamp wins).
  await sql`
    UPDATE mitra_online_status m
    SET last_heartbeat_at = u.ts::timestamptz, updated_at = NOW()
    FROM (
      SELECT * FROM UNNEST(${sql.array(ids)}::uuid[], ${sql.array(ts)}::text[]) AS t(mitra_id, ts)
    ) u
    WHERE m.mitra_id = u.mitra_id
      AND (m.last_heartbeat_at IS NULL OR m.last_heartbeat_at < u.ts::timestamptz)
  `
  return ids.length
}

// Computes the availability snapshot from Valkey state: candidates are
// `mitras:online` minus `mitras:deactivated`, kept if their capacity key is
// below max_customers_per_mitra and (when require_mitra_ping is on) their
// heartbeat is fresh. Throws when Valkey fails — including a per-command error
// or short result inside the pipeline — so the caller falls back to Postgres
// instead of trusting a partial read.
const computeAvailabilityFromValkey = async () => {
  const [{ max_customers_per_mitra }, { require_ping, stale_after_seconds }] = await Promise.all([
    getMaxCustomersPerMitra(),
    getMitraPingConfig(),
  ])
  const candidates = await valkey.sdiff(VK_MITRAS_ONLINE, VK_MITRAS_DEACTIVATED)
  if (!candidates.length) return { available: false, count: 0 }

  const pipe = valkey.pipeline()
  for (const id of candidates) {
    pipe.get(vkCapacityKey(id))
    if (require_ping) pipe.get(vkHeartbeatKey(id))
  }
  const results = await pipe.exec()
  const stride = require_ping ? 2 : 1
  if (!results || results.length !== candidates.length * stride) {
    throw new Error('availability pipeline returned an incomplete result')
  }
  // Assumes ioredis-style [err, value] pairs per pipelined command.
  const failed = results.find((entry) => entry?.[0])
  if (failed) throw failed[0]

  const cutoff = Date.now() - stale_after_seconds * 1000
  let count = 0
  for (let i = 0; i < candidates.length; i++) {
    const capacity = Number(results[i * stride][1] ?? 0)
    if (capacity >= max_customers_per_mitra) continue
    // When the operator has turned `require_mitra_ping` off, the auto-offline
    // sweep is also a no-op (see autoOfflineStaleMitras early-return). Mitras
    // stay in `mitras:online` until they explicitly toggle offline, so reading
    // a stale heartbeat here doesn't mean "unreachable" — it means "we aren't
    // tracking liveness." Skip the freshness gate to stay consistent with the
    // sweep, and to match what the Postgres fallback returns (is_online only).
    if (require_ping) {
      // Missing or unparseable timestamps (Date.parse → NaN) count as stale,
      // matching isMitraReachable.
      const seenAt = Date.parse(results[i * stride + 1][1])
      if (!Number.isFinite(seenAt) || seenAt < cutoff) continue
    }
    count++
  }
  return { available: count > 0, count }
}

// Postgres fallback for availability: active, is_online mitras below
// max_customers_per_mitra. No heartbeat-freshness gate.
const computeAvailabilityFromPostgres = async () => {
  const { max_customers_per_mitra } = await getMaxCustomersPerMitra()
  const [{ count }] = await sql`
    SELECT COUNT(*)::int AS count
    FROM mitras m
    INNER JOIN mitra_online_status s ON s.mitra_id = m.id
    WHERE m.is_active = true
      AND s.is_online = true
      AND (
        SELECT COUNT(*) FROM chat_sessions cs
        WHERE cs.mitra_id = m.id
          AND cs.status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT})
      ) < ${max_customers_per_mitra}
  `
  return { available: count > 0, count }
}

/**
 * Customer-home availability check backed by the cluster-shared Valkey
 * snapshot (`AVAILABILITY_CACHE_KEY`, TTL `AVAILABILITY_TTL_SECONDS`).
 *
 * Returns { available, count } where:
 *  - available = true iff at least one mitra is online, not deactivated, below
 *    max_customers_per_mitra and (when require_mitra_ping is on) has a fresh
 *    heartbeat
 *  - count is the number of qualifying mitras (CC/debug only — never expose to
 *    customer UI)
 *
 * The 5s-poll endpoint backed by this function MUST NOT issue per-poll DB
 * queries: a cache hit is one Valkey GET; a miss recomputes from Valkey state
 * (plus the config-service lookups) and shares the result for the TTL.
 *
 * If the cache read or the Valkey compute throws, falls back to a Postgres
 * count that checks is_active / is_online / capacity but not heartbeat
 * freshness. A failed cache write does not discard a good snapshot.
 */
export const countAvailableMitrasFromCache = async () => {
  let snapshot
  try {
    const cached = await valkey.get(AVAILABILITY_CACHE_KEY)
    if (cached) return JSON.parse(cached)
    snapshot = await computeAvailabilityFromValkey()
  } catch (err) {
    console.warn('[countAvailableMitras] valkey unavailable, falling back to Postgres:', err.message)
    return computeAvailabilityFromPostgres()
  }
  try {
    await valkey.getValkeyClient().setex(AVAILABILITY_CACHE_KEY, AVAILABILITY_TTL_SECONDS, JSON.stringify(snapshot))
  } catch (err) {
    console.warn('[countAvailableMitras] failed to cache snapshot:', err.message)
  }
  return snapshot
}

/**
 * Mitra-reachable check: in `mitras:online` SET AND heartbeat is fresh.
 * Falls back to a Postgres `is_online` read if Valkey is unreachable; the
 * fallback skips the heartbeat-freshness check (sweep takes care of stale rows
 * within `stale_after_seconds + sweep_cadence`).
 *
 * When `require_mitra_ping=false`, both the auto-offline sweep AND this check
 * skip the heartbeat gate so the read path matches the sweep's contract: a
 * mitra stays "reachable" until they explicitly toggle offline.
 */
export const isMitraReachable = async (mitraId) => {
  try {
    const inSet = await valkey.sismember(VK_MITRAS_ONLINE, mitraId)
    if (!inSet) return false
    const { require_ping, stale_after_seconds } = await getMitraPingConfig()
    if (!require_ping) return true
    const heartbeat = await valkey.get(vkHeartbeatKey(mitraId))
    if (!heartbeat) return false
    // An unparseable timestamp yields NaN, which compares false → unreachable.
    return Date.parse(heartbeat) >= Date.now() - stale_after_seconds * 1000
  } catch (err) {
    console.warn('[isMitraReachable] valkey unavailable, falling back to DB:', err.message)
    const [row] = await sql`SELECT is_online FROM mitra_online_status WHERE mitra_id = ${mitraId}`
    return Boolean(row?.is_online)
  }
}

/**
 * Returns active session count for a mitra (sessions that count toward
 * max_customers_per_mitra: ACTIVE and PENDING_PAYMENT).
 */
export const getMitraActiveSessionCount = async (mitraId) => {
  const [{ count }] = await sql`
    SELECT COUNT(*)::int AS count
    FROM chat_sessions
    WHERE mitra_id = ${mitraId}
      AND status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT})
  `
  return count
}

/**
 * True iff this mitra has a chat_sessions row with this specific customer in
 * ACTIVE, EXTENDING or CLOSING status.
 * Used by targeted "Curhat lagi" pre-check: a mitra at-capacity but mid-session
 * with the requesting customer is still allowed to receive a returning-chat card.
 */
export const isMitraInActiveSessionWithCustomer = async (mitraId, customerId) => {
  const [row] = await sql`
    SELECT id FROM chat_sessions
    WHERE mitra_id = ${mitraId}
      AND customer_id = ${customerId}
      AND status IN (${SessionStatus.ACTIVE}, ${SessionStatus.EXTENDING}, ${SessionStatus.CLOSING})
    LIMIT 1
  `
  return Boolean(row)
}