diff --git a/backend/src/routes/internal/config.routes.js b/backend/src/routes/internal/config.routes.js index 4191566..5cb581b 100644 --- a/backend/src/routes/internal/config.routes.js +++ b/backend/src/routes/internal/config.routes.js @@ -190,6 +190,14 @@ export const internalConfigRoutes = async (app) => { } } const config = await setMitraPingConfig({ require_ping, stale_after_seconds }) + // Bust the customer availability cache on any instance — subscribers in + // mitra-status.service.js listen for these keys and call invalidate. + if (require_ping !== undefined) { + await publishConfigInvalidate('require_mitra_ping') + } + if (stale_after_seconds !== undefined) { + await publishConfigInvalidate('mitra_stale_after_seconds') + } return reply.send({ success: true, data: config }) }) diff --git a/backend/src/services/mitra-status.service.js b/backend/src/services/mitra-status.service.js index 5ea6eac..6016bdb 100644 --- a/backend/src/services/mitra-status.service.js +++ b/backend/src/services/mitra-status.service.js @@ -74,14 +74,23 @@ export const invalidateAvailabilityCache = async () => { } } -// Bust the shared cache when CC changes max_customers_per_mitra (any instance). +// Bust the shared cache when CC changes any config that the beacon snapshots +// over: max_customers_per_mitra (capacity gate), require_mitra_ping (whether +// stale heartbeats exclude candidates), mitra_stale_after_seconds (the gate's +// threshold itself). +const AVAILABILITY_CACHE_INVALIDATING_KEYS = new Set([ + 'max_customers_per_mitra', + 'require_mitra_ping', + 'mitra_stale_after_seconds', +]) + let _subscribed = false const ensureSubscribed = () => { if (_subscribed) return _subscribed = true try { subscribe('config:invalidate', (msg) => { - if (msg?.key === 'max_customers_per_mitra') { + if (msg?.key && AVAILABILITY_CACHE_INVALIDATING_KEYS.has(msg.key)) { invalidateAvailabilityCache() } }) @@ -349,7 +358,7 @@ export const mirrorHeartbeatsToPostgres = async () => { */ const computeAvailabilityFromValkey = async () => { const { max_customers_per_mitra } = await getMaxCustomersPerMitra() - const { stale_after_seconds } = await getMitraPingConfig() + const { require_ping, stale_after_seconds } = await getMitraPingConfig() const candidates = await valkey.sdiff(VK_MITRAS_ONLINE, VK_MITRAS_DEACTIVATED) if (!candidates.length) return { available: false, count: 0 } @@ -357,17 +366,26 @@ const computeAvailabilityFromValkey = async () => { const pipe = valkey.pipeline() for (const id of candidates) { pipe.get(vkCapacityKey(id)) - pipe.get(vkHeartbeatKey(id)) + if (require_ping) pipe.get(vkHeartbeatKey(id)) } const results = await pipe.exec() + const stride = require_ping ? 2 : 1 const cutoff = Date.now() - stale_after_seconds * 1000 let count = 0 for (let i = 0; i < candidates.length; i++) { - const capacity = Number(results[i * 2][1] ?? 0) - const heartbeat = results[i * 2 + 1][1] + const capacity = Number(results[i * stride][1] ?? 0) if (capacity >= max_customers_per_mitra) continue - if (!heartbeat || Date.parse(heartbeat) < cutoff) continue + // When the operator has turned `require_mitra_ping` off, the auto-offline + // sweep is also a no-op (see autoOfflineStaleMitras early-return). Mitras + // stay in `mitras:online` until they explicitly toggle offline, so reading + // a stale heartbeat here doesn't mean "unreachable" — it means "we aren't + // tracking liveness." Skip the freshness gate to stay consistent with the + // sweep, and to match what the Postgres fallback returns (is_online only). + if (require_ping) { + const heartbeat = results[i * stride + 1][1] + if (!heartbeat || Date.parse(heartbeat) < cutoff) continue + } count++ } return { available: count > 0, count } @@ -409,14 +427,19 @@ export const countAvailableMitrasFromCache = async () => { * Falls back to a Postgres `is_online` read if Valkey is unreachable; the * fallback skips the heartbeat-freshness check (sweep takes care of stale rows * within `stale_after_seconds + sweep_cadence`). + * + * When `require_mitra_ping=false`, both the auto-offline sweep AND this check + * skip the heartbeat gate so the read path matches the sweep's contract: a + * mitra stays "reachable" until they explicitly toggle offline. */ export const isMitraReachable = async (mitraId) => { try { const inSet = await valkey.sismember(VK_MITRAS_ONLINE, mitraId) if (!inSet) return false + const { require_ping, stale_after_seconds } = await getMitraPingConfig() + if (!require_ping) return true const heartbeat = await valkey.get(vkHeartbeatKey(mitraId)) if (!heartbeat) return false - const { stale_after_seconds } = await getMitraPingConfig() return Date.parse(heartbeat) >= Date.now() - stale_after_seconds * 1000 } catch (err) { console.warn('[isMitraReachable] valkey unavailable, falling back to DB:', err.message) diff --git a/backend/src/services/pairing.service.js b/backend/src/services/pairing.service.js index d4e1215..0ce721e 100644 --- a/backend/src/services/pairing.service.js +++ b/backend/src/services/pairing.service.js @@ -83,7 +83,7 @@ const notifyCustomer = async (customerId, data) => { // Postgres fallback runs if any Valkey op throws (full JOIN as before). const findAvailableMitrasFromValkey = async () => { const { max_customers_per_mitra } = await getMaxCustomersPerMitra() - const { stale_after_seconds } = await getMitraPingConfig() + const { require_ping, stale_after_seconds } = await getMitraPingConfig() const candidates = await valkey.sdiff(VK_MITRAS_ONLINE, VK_MITRAS_DEACTIVATED) if (!candidates.length) return [] @@ -91,17 +91,23 @@ const findAvailableMitrasFromValkey = async () => { const pipe = valkey.pipeline() for (const id of candidates) { pipe.get(vkCapacityKey(id)) - pipe.get(vkHeartbeatKey(id)) + if (require_ping) pipe.get(vkHeartbeatKey(id)) } const results = await pipe.exec() + const stride = require_ping ? 2 : 1 const cutoff = Date.now() - stale_after_seconds * 1000 const eligible = [] for (let i = 0; i < candidates.length; i++) { - const capacity = Number(results[i * 2][1] ?? 0) - const heartbeat = results[i * 2 + 1][1] + const capacity = Number(results[i * stride][1] ?? 0) if (capacity >= max_customers_per_mitra) continue - if (!heartbeat || Date.parse(heartbeat) < cutoff) continue + // See computeAvailabilityFromValkey in mitra-status.service.js: when the + // ping requirement is off, the sweep is off too, so we don't gate + // candidate selection on heartbeat freshness here either. + if (require_ping) { + const heartbeat = results[i * stride + 1][1] + if (!heartbeat || Date.parse(heartbeat) < cutoff) continue + } eligible.push({ id: candidates[i], active_session_count: capacity }) } return eligible diff --git a/backend/test/services/mitra-status.valkey-mirror.test.js b/backend/test/services/mitra-status.valkey-mirror.test.js index e99b1e3..b85e72c 100644 --- a/backend/test/services/mitra-status.valkey-mirror.test.js +++ b/backend/test/services/mitra-status.valkey-mirror.test.js @@ -224,6 +224,27 @@ describe('mitra-status valkey mirror', () => { expect(await isMitraReachable(m.id)).toBe(false) }) + + // Mirrors the autoOfflineStaleMitras "no-op when require_ping=false" + // contract: read paths must not gate on heartbeat when sweep doesn't. + it('returns true with stale heartbeat when require_ping=false', async () => { + const sql = db() + try { + await sql` + UPDATE app_config SET value=${sql.json({ value: false })} + WHERE key='require_mitra_ping' + ` + const m = await createMitra({ callName: 'NoPing', isOnline: true }) + await v().set(vkHeartbeatKey(m.id), new Date(Date.now() - 3_600_000).toISOString()) + + expect(await isMitraReachable(m.id)).toBe(true) + } finally { + await sql` + UPDATE app_config SET value=${sql.json({ value: true })} + WHERE key='require_mitra_ping' + ` + } + }) }) // ---------- recomputeCapacity ---------- @@ -331,6 +352,35 @@ describe('mitra-status valkey mirror', () => { await invalidateAvailabilityCache() expect(await v().get('availability:snapshot')).toBeNull() }) + + // Regression: when an operator turns off the ping requirement, the + // auto-offline sweep is also disabled, so heartbeats may legitimately + // become arbitrarily old. The beacon must NOT filter those out — that + // would put the CTA in a permanently-disabled state with no recovery + // path (sweep won't remove the mitra; cache always re-computes false). + it('includes mitras with stale heartbeats when require_ping=false', async () => { + const sql = db() + try { + await sql` + UPDATE app_config SET value=${sql.json({ value: false })} + WHERE key='require_mitra_ping' + ` + const m = await createMitra({ callName: 'NoPingRequired', isOnline: true }) + // Heartbeat 1 hour old — well past any reasonable stale_after_seconds. + await v().set(vkHeartbeatKey(m.id), new Date(Date.now() - 3_600_000).toISOString()) + await v().del('availability:snapshot') + + const result = await countAvailableMitrasFromCache() + expect(result.available).toBe(true) + expect(result.count).toBe(1) + } finally { + await sql` + UPDATE app_config SET value=${sql.json({ value: true })} + WHERE key='require_mitra_ping' + ` + await v().del('availability:snapshot') + } + }) }) // ---------- autoOfflineStaleMitras ----------