When the operator sets require_mitra_ping=false, the auto-offline sweep early-returns (by design — "don't gate online status on heartbeat freshness"). The three Valkey read paths still gated on heartbeat freshness anyway, which trapped the system: the sweep won't remove the mitra from mitras:online, but readers reject them as stale. The customer CTA stayed permanently disabled with no recovery.

Fix all three to skip the heartbeat-freshness check when require_ping is off, matching the sweep's contract:
- computeAvailabilityFromValkey (customer beacon)
- isMitraReachable (extension service)
- findAvailableMitrasFromValkey (pairing candidate finder)

The Postgres fallbacks already did the right thing (is_online only, no heartbeat comparison); this aligns the Valkey hot path.

Also: PATCH /internal/config/mitra-ping now publishes config:invalidate for require_mitra_ping and mitra_stale_after_seconds, and the subscriber in mitra-status.service was widened to listen for both. Flipping the toggle in CC now busts the 10s availability snapshot immediately instead of waiting out the TTL.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
478 lines
18 KiB
JavaScript
478 lines
18 KiB
JavaScript
import { getDb } from '../db/client.js'
|
|
import { SessionStatus } from '../constants.js'
|
|
import { getMitraPingConfig, getMaxCustomersPerMitra } from './config.service.js'
|
|
import * as valkey from '../plugins/valkey.js'
|
|
import { subscribe, onValkeyReady } from '../plugins/valkey.js'
|
|
|
|
// Module-wide database handle from ../db/client.js. Every tagged-template
// query below (`sql\`...\``, plus the `sql.array(...)` helper) goes through it.
const sql = getDb()
|
|
|
|
// Valkey key schema — see requirement/valkey-online-mirror-plan.md § Schema.

// SET of mitra ids that are toggled online.
export const VK_MITRAS_ONLINE = 'mitras:online'
// SET of mitra ids whose account is deactivated; availability SDIFFs it out.
export const VK_MITRAS_DEACTIVATED = 'mitras:deactivated'

// Per-mitra string key holding the occupied-slot count.
export function vkCapacityKey(mitraId) {
  return `mitra:capacity:${mitraId}`
}

// Per-mitra string key holding the last heartbeat as an ISO-8601 timestamp.
export function vkHeartbeatKey(mitraId) {
  return `mitra:heartbeat:${mitraId}`
}
|
|
|
|
// Rebuild the Valkey availability mirror from Postgres, the source of truth.
// Triggered at backend startup, on every Valkey (re)connect via onValkeyReady,
// and by the reconciliation sweep. Safe to repeat: both membership sets are
// DELeted and rebuilt and per-mitra keys are overwritten, so every run
// converges on the same state. Failures are logged, never thrown.
export const seedFromPostgres = async () => {
  try {
    const [onlineRows, deactRows, capacityRows] = await Promise.all([
      sql`SELECT mitra_id FROM mitra_online_status WHERE is_online = true`,
      sql`SELECT id FROM mitras WHERE is_active = false`,
      sql`
        SELECT mitra_id, COUNT(*)::int AS c FROM chat_sessions
        WHERE mitra_id IS NOT NULL
          AND status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT})
        GROUP BY mitra_id
      `,
    ])

    const pipe = valkey.pipeline()
    pipe.del(VK_MITRAS_ONLINE)
    pipe.del(VK_MITRAS_DEACTIVATED)

    const seededAt = new Date().toISOString()
    const onlineIds = onlineRows.map(({ mitra_id }) => mitra_id)
    if (onlineIds.length > 0) {
      pipe.sadd(VK_MITRAS_ONLINE, ...onlineIds)
      // Stamp a fresh heartbeat so the first sweep after a restart doesn't
      // mass-offline everyone; real pings overwrite it soon after.
      onlineIds.forEach((id) => pipe.set(vkHeartbeatKey(id), seededAt))
      // Zero capacity for online mitras; real counts are overlaid below.
      // Leftover capacity keys of offline mitras are never consulted, because
      // availability reads start from an SDIFF of the online set.
      onlineIds.forEach((id) => pipe.set(vkCapacityKey(id), 0))
    }

    const deactivatedIds = deactRows.map(({ id }) => id)
    if (deactivatedIds.length > 0) pipe.sadd(VK_MITRAS_DEACTIVATED, ...deactivatedIds)

    capacityRows.forEach(({ mitra_id, c }) => pipe.set(vkCapacityKey(mitra_id), c))

    await pipe.exec()

    console.log(
      `[valkey-mirror] seed: ${onlineRows.length} online, ${deactRows.length} deactivated, ${capacityRows.length} with active sessions`,
    )
  } catch (err) {
    console.error('[valkey-mirror] seed failed:', err)
  }
}
|
|
|
|
// Re-seed the mirror on every Valkey reconnect (per the original note this
// also fires on the initial connect). seedFromPostgres logs and swallows its
// own errors, so a failed seed cannot throw into the ready handler.
onValkeyReady(seedFromPostgres)
|
|
|
|
// --- Beacon snapshot cache (Valkey-backed, shared by all instances) ---
// Key `availability:snapshot` holds JSON `{available, count}` with a 10s TTL.
// It lives in Valkey rather than process memory, so every backend instance
// reads the same snapshot. The Valkey-driven recompute therefore happens
// roughly once per TTL window cluster-wide, however many customers poll.
const AVAILABILITY_CACHE_KEY = 'availability:snapshot'
const AVAILABILITY_TTL_SECONDS = 10

const logInvalidateFailure = (err) => {
  console.error('[valkey-mirror] invalidateAvailabilityCache failed:', err)
}

/**
 * Drop the shared availability snapshot so the next beacon read recomputes it.
 * Best-effort: a Valkey failure is logged, and the returned promise always
 * resolves to undefined, so callers may fire-and-forget.
 */
export async function invalidateAvailabilityCache() {
  try {
    await valkey.del(AVAILABILITY_CACHE_KEY)
  } catch (err) {
    logInvalidateFailure(err)
  }
}
|
|
|
|
// Config keys whose changes alter what the beacon snapshot would compute:
// - max_customers_per_mitra: the capacity gate
// - require_mitra_ping: whether stale heartbeats exclude candidates
// - mitra_stale_after_seconds: the freshness threshold itself
// A `config:invalidate` message naming any of these busts the shared cache
// immediately instead of waiting out the TTL.
const AVAILABILITY_CACHE_INVALIDATING_KEYS = new Set([
  'max_customers_per_mitra',
  'require_mitra_ping',
  'mitra_stale_after_seconds',
])

// True while a subscribe attempt is in flight or has succeeded. It is reset on
// failure so a later Valkey-ready event can retry.
let _subscribed = false

/**
 * Subscribe (at most once per process) to `config:invalidate` and bust the
 * availability snapshot whenever one of the keys above changes.
 *
 * Failures are logged rather than silently swallowed. They also clear the
 * guard, so the subscription is retried on the next Valkey-ready event instead
 * of being lost for the lifetime of the process. Whether `subscribe` is
 * synchronous or promise-returning isn't visible from here, so both a
 * synchronous throw and an async rejection are handled. The latter would
 * otherwise surface as an unhandled rejection.
 */
const ensureSubscribed = () => {
  if (_subscribed) return
  _subscribed = true

  const onFailure = (err) => {
    _subscribed = false
    // Valkey may be unreachable (e.g. some test contexts); non-fatal.
    console.warn(
      '[valkey-mirror] config:invalidate subscribe failed; will retry on next Valkey ready:',
      err?.message ?? err,
    )
  }

  try {
    const pending = subscribe('config:invalidate', (msg) => {
      if (msg?.key && AVAILABILITY_CACHE_INVALIDATING_KEYS.has(msg.key)) {
        // Fire-and-forget; invalidateAvailabilityCache catches and logs its own errors.
        void invalidateAvailabilityCache()
      }
    })
    Promise.resolve(pending).catch(onFailure)
  } catch (err) {
    onFailure(err)
  }
}

ensureSubscribed()
// Retry on (re)connect if the initial attempt failed; a no-op once subscribed.
onValkeyReady(ensureSubscribed)
|
|
|
|
/**
 * Make sure `mitra_online_status` has a row for `mitraId`. If none exists, one
 * is inserted with only `mitra_id` set, leaving the other columns to their
 * defaults. ON CONFLICT DO NOTHING leaves an existing row untouched, so
 * repeated calls are harmless.
 */
export async function ensureStatusRow(mitraId) {
  await sql`
    INSERT INTO mitra_online_status (mitra_id)
    VALUES (${mitraId})
    ON CONFLICT (mitra_id) DO NOTHING
  `
}
|
|
|
|
// Best-effort wrapper for Valkey writes. Postgres stays the source of truth,
// so a Valkey failure is logged under `label` instead of failing the request
// that triggered it; the reconciliation sweep repairs any resulting drift.
// Always resolves to undefined. `fn` may be sync or async: a synchronous
// throw is handled exactly like a rejection.
const tryValkey = async (fn, label) => {
  const reportFailure = (err) => {
    console.error(`[valkey-mirror] ${label} failed:`, err)
  }

  let attempt
  try {
    attempt = fn()
  } catch (err) {
    reportFailure(err)
    return
  }
  await Promise.resolve(attempt).catch(reportFailure)
}
|
|
|
|
/**
 * Rewrite `mitra:capacity:<id>` from chat_sessions, the source of truth.
 *
 * Call after any chat_session state change that may alter how many slots a
 * mitra occupies (sessions in ACTIVE or PENDING_PAYMENT). Recomputing from the
 * table instead of INCR/DECR per transition avoids double counts and missed
 * transitions across the many UPDATE sites (pairing, closure, extension,
 * session-timer and session services).
 *
 * A falsy `mitraId` is a no-op. The Postgres read can reject; the Valkey write
 * is best-effort via tryValkey.
 */
export const recomputeCapacityForMitra = async (mitraId) => {
  if (!mitraId) return

  const rows = await sql`
    SELECT COUNT(*)::int AS c FROM chat_sessions
    WHERE mitra_id = ${mitraId}
      AND status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT})
  `
  const [row] = rows
  const writeCapacity = () => valkey.set(vkCapacityKey(mitraId), row.c)
  await tryValkey(writeCapacity, `recomputeCapacity ${mitraId}`)
}
|
|
|
|
// Resolve a session's mitra_id and recompute that mitra's capacity. Meant for
// UPDATE sites that only have the session id at hand. A missing session, or
// one with no mitra assigned, is skipped.
export const recomputeCapacityBySession = async (sessionId) => {
  const rows = await sql`SELECT mitra_id FROM chat_sessions WHERE id = ${sessionId}`
  const mitraId = rows[0]?.mitra_id
  if (!mitraId) return
  await recomputeCapacityForMitra(mitraId)
}
|
|
|
|
/**
 * Flip a mitra online.
 *
 * Postgres (source of truth) is written first. The status row, created if
 * missing, gets is_online = true, with last_online_at, last_heartbeat_at and
 * updated_at all stamped with one instant, and an 'online' entry is appended
 * to mitra_online_logs. The Valkey mirror is then updated best-effort: SADD to
 * `mitras:online` plus a heartbeat carrying that same instant, so the readers'
 * freshness check passes right away. Finally the shared availability
 * snapshot is invalidated without awaiting.
 */
export const setOnline = async (mitraId) => {
  await ensureStatusRow(mitraId)

  const onlineAt = new Date()
  await sql`
    UPDATE mitra_online_status
    SET is_online = true, last_online_at = ${onlineAt}, last_heartbeat_at = ${onlineAt}, updated_at = ${onlineAt}
    WHERE mitra_id = ${mitraId}
  `
  await sql`
    INSERT INTO mitra_online_logs (mitra_id, status) VALUES (${mitraId}, 'online')
  `

  const mirrorToValkey = async () => {
    const pipe = valkey.pipeline()
    pipe.sadd(VK_MITRAS_ONLINE, mitraId)
    pipe.set(vkHeartbeatKey(mitraId), onlineAt.toISOString())
    await pipe.exec()
  }
  await tryValkey(mirrorToValkey, `setOnline ${mitraId}`)

  // Not awaited; invalidateAvailabilityCache logs its own failures.
  void invalidateAvailabilityCache()
}
|
|
|
|
/**
 * Flip a mitra offline.
 *
 * The Postgres flip is a single conditional UPDATE (`AND is_online = true …
 * RETURNING`). Only the caller that actually changes the row writes the
 * 'offline' audit log, so a concurrent auto-offline sweep or a duplicate toggle
 * can no longer produce two log rows, as the previous SELECT-then-UPDATE could.
 *
 * The Valkey cleanup (SREM from `mitras:online`, DEL of the heartbeat key)
 * runs even when Postgres already says offline. The cleanup is idempotent, and it
 * heals a mirror left behind by an earlier failed best-effort write. That
 * matters most when require_mitra_ping is off: the auto-offline sweep is then a
 * no-op, and the Valkey readers trust `mitras:online` membership alone.
 */
export const setOffline = async (mitraId) => {
  await ensureStatusRow(mitraId)
  const now = new Date()

  const flipped = await sql`
    UPDATE mitra_online_status
    SET is_online = false, last_offline_at = ${now}, updated_at = ${now}
    WHERE mitra_id = ${mitraId} AND is_online = true
    RETURNING mitra_id
  `
  if (flipped.length > 0) {
    await sql`
      INSERT INTO mitra_online_logs (mitra_id, status) VALUES (${mitraId}, 'offline')
    `
  }

  await tryValkey(async () => {
    const pipe = valkey.pipeline()
    pipe.srem(VK_MITRAS_ONLINE, mitraId)
    pipe.del(vkHeartbeatKey(mitraId))
    await pipe.exec()
  }, `setOffline ${mitraId}`)

  // Fire-and-forget; invalidateAvailabilityCache logs its own failures.
  void invalidateAvailabilityCache()
}
|
|
|
|
// Heartbeat hot path: Valkey only, with no per-ping Postgres write. The batched
// mirrorHeartbeatsToPostgres job later copies these timestamps into
// `last_heartbeat_at` for forensics/restart safety.
//
// Intentionally not gated on `is_online = true` (the old SQL UPDATE was). The
// SET is unconditional, so a mitra pinging while offline in Postgres only
// refreshes their heartbeat key. They are not added to `mitras:online`, so
// blast eligibility is unchanged.
// NOTE(review): the key is written without an expiry, and the earlier note
// that "the reconciliation sweep will clean up the orphan heartbeat key" is
// not borne out by this module: seedFromPostgres and autoOfflineStaleMitras
// only SET/DEL heartbeat keys of mitras they treat as online. Confirm whether
// anything else reaps these orphans.
export async function heartbeat(mitraId) {
  const pingedAt = new Date().toISOString()
  const writeHeartbeat = () => valkey.set(vkHeartbeatKey(mitraId), pingedAt)
  await tryValkey(writeHeartbeat, `heartbeat ${mitraId}`)
}
|
|
|
|
/**
 * Status payload for the mitra app: the stored online-status columns (the row
 * is created first if missing) merged with the ping settings the client needs.
 */
export const getStatus = async (mitraId) => {
  await ensureStatusRow(mitraId)
  const rows = await sql`
    SELECT is_online, last_online_at, last_offline_at, updated_at
    FROM mitra_online_status
    WHERE mitra_id = ${mitraId}
  `
  const { require_ping, heartbeat_cadence_seconds } = await getMitraPingConfig()

  return {
    ...rows[0],
    require_ping,
    // Sets the app's Timer.periodic interval. Fixed by the backend (env),
    // not an operator-tunable setting.
    heartbeat_cadence_seconds,
  }
}
|
|
|
|
/**
 * Active mitras that are online according to Postgres, each with
 * `active_session_count` (their ACTIVE + PENDING_PAYMENT sessions), most
 * recently online first.
 * NOTE(review): unlike the other counts in this file, this COUNT(*) has no
 * `::int` cast, so it is a Postgres bigint. Depending on the driver's bigint
 * handling it may reach callers as a string; confirm before treating it as a
 * number.
 */
export const getOnlineMitras = async () =>
  sql`
    SELECT m.id, m.display_name, m.phone, s.last_online_at, s.updated_at,
      (SELECT COUNT(*) FROM chat_sessions cs
        WHERE cs.mitra_id = m.id AND cs.status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT})) AS active_session_count
    FROM mitras m
    INNER JOIN mitra_online_status s ON s.mitra_id = m.id
    WHERE s.is_online = true AND m.is_active = true
    ORDER BY s.last_online_at DESC
  `
|
|
|
|
/**
 * One page of a mitra's online/offline audit trail (mitra_online_logs),
 * newest first, plus the total row count for pagination.
 *
 * @param {string} mitraId
 * @param {{page?: number|string, limit?: number|string}} [options]
 *   `page` is 1-based (default 1); `limit` is the page size (default 50).
 *   Numeric strings are accepted and values are truncated to integers. A page
 *   below 1 is clamped to 1 and a negative limit to 0. Previously page=0
 *   produced a negative OFFSET, which Postgres rejects. Non-numeric (or
 *   null) values fall back to the defaults.
 * @returns {Promise<{items: object[], total: number, page: number, limit: number}>}
 *   `page` and `limit` echo the normalized values actually used in the query.
 */
export const getOnlineLogs = async (mitraId, { page = 1, limit = 50 } = {}) => {
  // Integer coercion with a fallback for anything that isn't a finite number.
  const toInt = (value, fallback) => {
    const n = Math.trunc(Number(value ?? fallback))
    return Number.isFinite(n) ? n : fallback
  }
  const pageNum = Math.max(1, toInt(page, 1))
  const limitNum = Math.max(0, toInt(limit, 50))
  const offset = (pageNum - 1) * limitNum

  // The page and the total are independent reads; run them concurrently.
  const [items, [{ count }]] = await Promise.all([
    sql`
      SELECT id, status, timestamp
      FROM mitra_online_logs
      WHERE mitra_id = ${mitraId}
      ORDER BY timestamp DESC
      LIMIT ${limitNum} OFFSET ${offset}
    `,
    sql`
      SELECT COUNT(*) FROM mitra_online_logs WHERE mitra_id = ${mitraId}
    `,
  ])

  return { items, total: Number(count), page: pageNum, limit: limitNum }
}
|
|
|
|
// Valkey-driven stale sweep: enumerate `mitras:online`, read each member's
// heartbeat timestamp from Valkey, flip the stale ones offline in Postgres, and
// scrub them from Valkey.
//
// No-op (returns 0) when require_mitra_ping is off. Liveness isn't tracked in
// that mode, so mitras stay online until they toggle offline themselves.
//
// Failure semantics: any Valkey read failure aborts the whole tick. This covers
// a thrown error AND a per-command error carried inside the pipeline reply;
// exec() reports the latter as `[err, value]` pairs instead of rejecting.
// Reading an errored GET as "no heartbeat" would false-offline healthy mitras
// during a Valkey hiccup. For the same reason we never fall back to a Postgres
// scan.
//
// A heartbeat is stale when it is missing, unparsable, or older than
// `stale_after_seconds`. The Postgres flip uses RETURNING, and only rows it
// actually flipped get an 'offline' audit entry. A mitra that was already
// offline in Postgres (drift, or a concurrent setOffline) therefore isn't
// logged twice.
//
// Returns the number of stale ids found in Valkey.
export const autoOfflineStaleMitras = async () => {
  const pingConfig = await getMitraPingConfig()
  if (!pingConfig.require_ping) return 0

  let onlineIds
  let heartbeatValues
  try {
    onlineIds = await valkey.smembers(VK_MITRAS_ONLINE)
    if (!onlineIds.length) return 0
    const pipe = valkey.pipeline()
    for (const id of onlineIds) pipe.get(vkHeartbeatKey(id))
    const results = await pipe.exec()
    heartbeatValues = results.map(([err, value]) => {
      if (err) throw err
      return value
    })
  } catch (err) {
    console.warn('[auto-offline] valkey unavailable, skipping this tick:', err.message)
    return 0
  }

  const cutoff = Date.now() - pingConfig.stale_after_seconds * 1000
  const stale = onlineIds.filter((_, i) => {
    // Date.parse yields NaN for missing/garbage values. NaN >= cutoff is
    // false, so such values count as stale, as in isMitraReachable.
    const beatAt = Date.parse(heartbeatValues[i] ?? '')
    return !(beatAt >= cutoff)
  })
  if (!stale.length) return 0

  const flipped = await sql`
    UPDATE mitra_online_status
    SET is_online = false, last_offline_at = NOW(), updated_at = NOW()
    WHERE mitra_id = ANY(${sql.array(stale)}::uuid[]) AND is_online = true
    RETURNING mitra_id
  `
  for (const { mitra_id: id } of flipped) {
    await sql`INSERT INTO mitra_online_logs (mitra_id, status) VALUES (${id}, 'offline')`
  }

  await tryValkey(async () => {
    const cleanup = valkey.pipeline()
    cleanup.srem(VK_MITRAS_ONLINE, ...stale)
    for (const id of stale) cleanup.del(vkHeartbeatKey(id))
    await cleanup.exec()
  }, `auto-offline cleanup (${stale.length} stale)`)

  // Fire-and-forget; invalidateAvailabilityCache logs its own failures.
  void invalidateAvailabilityCache()
  return stale.length
}
|
|
|
|
// Batched mirror of Valkey heartbeat timestamps into Postgres
// `last_heartbeat_at`. It runs every HEARTBEAT_MIRROR_INTERVAL_SECONDS
// (default 60) and issues one UNNEST UPDATE per tick regardless of how many
// mitras are online.
//
// Safe to run from several instances at once without leader election. The
// UPDATE keeps GREATEST(existing, mirrored), so the latest timestamp really
// does win: an instance holding an older read can no longer move
// `last_heartbeat_at` backwards.
//
// Ids are skipped, rather than aborting the tick, when:
// - the heartbeat is missing,
// - the pipeline reply carried a per-command error, or
// - the value doesn't parse as a date.
// A single bad value would otherwise fail the `::timestamptz` cast for the
// whole batch.
//
// Returns the number of heartbeats submitted to Postgres.
export const mirrorHeartbeatsToPostgres = async () => {
  let onlineIds
  let heartbeatValues
  try {
    onlineIds = await valkey.smembers(VK_MITRAS_ONLINE)
    if (!onlineIds.length) return 0
    const pipe = valkey.pipeline()
    for (const id of onlineIds) pipe.get(vkHeartbeatKey(id))
    const results = await pipe.exec()
    // Replies are [err, value] pairs; an errored GET is treated as "no value".
    heartbeatValues = results.map(([err, value]) => (err ? null : value))
  } catch (err) {
    console.warn('[heartbeat-mirror] valkey unavailable, skipping:', err.message)
    return 0
  }

  const ids = []
  const ts = []
  onlineIds.forEach((id, i) => {
    const value = heartbeatValues[i]
    if (!value || Number.isNaN(Date.parse(value))) return
    ids.push(id)
    ts.push(value)
  })
  if (!ids.length) return 0

  await sql`
    UPDATE mitra_online_status m
    SET last_heartbeat_at = GREATEST(m.last_heartbeat_at, u.ts::timestamptz), updated_at = NOW()
    FROM (
      SELECT * FROM UNNEST(${sql.array(ids)}::uuid[], ${sql.array(ts)}::text[]) AS t(mitra_id, ts)
    ) u
    WHERE m.mitra_id = u.mitra_id
  `
  return ids.length
}
|
|
|
|
/**
 * Compute the customer-home availability snapshot from the Valkey mirror.
 * countAvailableMitrasFromCache caches the result in Valkey for
 * AVAILABILITY_TTL_SECONDS and falls back to Postgres if this function throws.
 *
 * Returns { available, count } where:
 *  - count is the number of mitras in `mitras:online` but not in
 *    `mitras:deactivated` whose `mitra:capacity:<id>` is below
 *    max_customers_per_mitra. Only when require_mitra_ping is on, the mitra's
 *    heartbeat must also be present, parsable, and no older than
 *    stale_after_seconds.
 *  - available is count > 0.
 * count is for CC/debug only; never expose it to the customer UI.
 *
 * Throws on any Valkey failure, including a per-command error inside the
 * pipeline reply. Such an error used to be silently read as "capacity 0" and
 * overcounted availability; now the caller's Postgres fallback takes over.
 */
const computeAvailabilityFromValkey = async () => {
  const [{ max_customers_per_mitra }, { require_ping, stale_after_seconds }] = await Promise.all([
    getMaxCustomersPerMitra(),
    getMitraPingConfig(),
  ])

  const candidates = await valkey.sdiff(VK_MITRAS_ONLINE, VK_MITRAS_DEACTIVATED)
  if (!candidates.length) return { available: false, count: 0 }

  const pipe = valkey.pipeline()
  for (const id of candidates) {
    pipe.get(vkCapacityKey(id))
    if (require_ping) pipe.get(vkHeartbeatKey(id))
  }
  const replies = (await pipe.exec()).map(([err, value]) => {
    if (err) throw err
    return value
  })
  // Each candidate contributes one reply (capacity), or two with heartbeat.
  const stride = require_ping ? 2 : 1
  const cutoff = Date.now() - stale_after_seconds * 1000

  let count = 0
  for (let i = 0; i < candidates.length; i++) {
    const capacity = Number(replies[i * stride] ?? 0)
    if (capacity >= max_customers_per_mitra) continue
    // When `require_mitra_ping` is off, the auto-offline sweep is also a no-op
    // (see autoOfflineStaleMitras' early return). Mitras then stay in
    // `mitras:online` until they explicitly toggle offline, so a stale
    // heartbeat means "we aren't tracking liveness", not "unreachable". The
    // freshness gate is skipped to stay consistent with the sweep and with the
    // Postgres fallback (is_online only).
    if (require_ping) {
      // Missing/unparsable -> NaN -> fails the comparison -> treated as stale,
      // matching isMitraReachable's `>= cutoff` check.
      const heartbeatAt = Date.parse(replies[i * stride + 1] ?? '')
      if (!(heartbeatAt >= cutoff)) continue
    }
    count++
  }

  return { available: count > 0, count }
}
|
|
|
|
// Postgres fallback for the availability snapshot, used by
// countAvailableMitrasFromCache when the Valkey path throws. Counts active,
// online mitras holding fewer than max_customers_per_mitra ACTIVE or
// PENDING_PAYMENT sessions. It gates on `is_online` only, with no heartbeat
// comparison.
const computeAvailabilityFromPostgres = async () => {
  const { max_customers_per_mitra: maxPerMitra } = await getMaxCustomersPerMitra()
  const rows = await sql`
    SELECT COUNT(*)::int AS count
    FROM mitras m
    INNER JOIN mitra_online_status s ON s.mitra_id = m.id
    WHERE m.is_active = true
      AND s.is_online = true
      AND (
        SELECT COUNT(*) FROM chat_sessions cs
        WHERE cs.mitra_id = m.id
          AND cs.status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT})
      ) < ${maxPerMitra}
  `
  const { count } = rows[0]
  return { available: count > 0, count }
}
|
|
|
|
/**
 * Beacon entry point: return the shared `{available, count}` snapshot.
 * On a cache miss, recompute from Valkey and re-cache it with SETEX for
 * AVAILABILITY_TTL_SECONDS. Any failure along the way (cache read, JSON parse,
 * recompute or cache write) falls back to a direct Postgres computation.
 */
export const countAvailableMitrasFromCache = async () => {
  const readThrough = async () => {
    const hit = await valkey.get(AVAILABILITY_CACHE_KEY)
    if (hit) return JSON.parse(hit)

    const computed = await computeAvailabilityFromValkey()
    await valkey
      .getValkeyClient()
      .setex(AVAILABILITY_CACHE_KEY, AVAILABILITY_TTL_SECONDS, JSON.stringify(computed))
    return computed
  }

  try {
    return await readThrough()
  } catch (err) {
    console.warn('[countAvailableMitras] valkey unavailable, falling back to Postgres:', err.message)
    return computeAvailabilityFromPostgres()
  }
}
|
|
|
|
/**
 * Is this mitra reachable right now?
 *
 * Valkey path: the mitra must be a member of `mitras:online`. When
 * require_mitra_ping is on, its heartbeat must also exist and be no older than
 * stale_after_seconds; an unparsable timestamp fails the check. When
 * require_mitra_ping is off, membership alone decides. This mirrors the
 * auto-offline sweep, which is a no-op in that mode, so a mitra stays
 * reachable until they explicitly toggle offline.
 *
 * If anything in the Valkey path throws (the Valkey calls or the config
 * lookup), the function falls back to the Postgres `is_online` flag without a
 * freshness check. The sweep is expected to flip stale rows within roughly
 * stale_after_seconds plus its own cadence.
 */
export const isMitraReachable = async (mitraId) => {
  try {
    if (!(await valkey.sismember(VK_MITRAS_ONLINE, mitraId))) return false

    const { require_ping: requirePing, stale_after_seconds: staleAfterSeconds } = await getMitraPingConfig()
    if (!requirePing) return true

    const lastBeat = await valkey.get(vkHeartbeatKey(mitraId))
    return Boolean(lastBeat) && Date.parse(lastBeat) >= Date.now() - staleAfterSeconds * 1000
  } catch (err) {
    console.warn('[isMitraReachable] valkey unavailable, falling back to DB:', err.message)
    const rows = await sql`SELECT is_online FROM mitra_online_status WHERE mitra_id = ${mitraId}`
    return Boolean(rows[0]?.is_online)
  }
}
|
|
|
|
/**
 * How many of this mitra's sessions occupy a slot under
 * max_customers_per_mitra, i.e. those in ACTIVE or PENDING_PAYMENT status.
 * Reads Postgres directly rather than the Valkey capacity mirror.
 */
export const getMitraActiveSessionCount = async (mitraId) => {
  const rows = await sql`
    SELECT COUNT(*)::int AS count FROM chat_sessions
    WHERE mitra_id = ${mitraId}
      AND status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT})
  `
  return rows[0].count
}
|
|
|
|
/**
 * True iff this mitra currently has a session with this specific customer in
 * ACTIVE, EXTENDING or CLOSING status.
 *
 * Backs the targeted "Curhat lagi" (returning-chat) pre-check. A mitra who is
 * at capacity but still mid-session with the requesting customer may receive
 * the returning-chat card anyway.
 */
export const isMitraInActiveSessionWithCustomer = async (mitraId, customerId) => {
  const matches = await sql`
    SELECT id FROM chat_sessions
    WHERE mitra_id = ${mitraId}
      AND customer_id = ${customerId}
      AND status IN (${SessionStatus.ACTIVE}, ${SessionStatus.EXTENDING}, ${SessionStatus.CLOSING})
    LIMIT 1
  `
  return Boolean(matches[0])
}
|