Mitra availability: read paths respect require_mitra_ping=false
When the operator sets require_mitra_ping=false, the auto-offline sweep early-returns (by design — "don't gate online status on heartbeat freshness"). The three Valkey read paths nevertheless still gated on heartbeat freshness, which trapped the system: the sweep won't remove the mitra from mitras:online, but readers reject it as stale. The customer CTA stayed permanently disabled with no recovery path.

Fix all three to skip the heartbeat-freshness check when require_mitra_ping is off, matching the sweep's contract:

- computeAvailabilityFromValkey (customer beacon)
- isMitraReachable (extension service)
- findAvailableMitrasFromValkey (pairing candidate finder)

The Postgres fallbacks already did the right thing (is_online only, no heartbeat comparison); this aligns the Valkey hot path.

Also: PATCH /internal/config/mitra-ping now publishes config:invalidate for require_mitra_ping and mitra_stale_after_seconds, and the subscriber in mitra-status.service was widened to listen for both keys. Flipping the toggle in CC now busts the 10s availability snapshot immediately instead of waiting out the TTL.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -190,6 +190,14 @@ export const internalConfigRoutes = async (app) => {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
const config = await setMitraPingConfig({ require_ping, stale_after_seconds })
|
const config = await setMitraPingConfig({ require_ping, stale_after_seconds })
|
||||||
|
// Bust the customer availability cache on any instance — subscribers in
|
||||||
|
// mitra-status.service.js listen for these keys and call invalidate.
|
||||||
|
if (require_ping !== undefined) {
|
||||||
|
await publishConfigInvalidate('require_mitra_ping')
|
||||||
|
}
|
||||||
|
if (stale_after_seconds !== undefined) {
|
||||||
|
await publishConfigInvalidate('mitra_stale_after_seconds')
|
||||||
|
}
|
||||||
return reply.send({ success: true, data: config })
|
return reply.send({ success: true, data: config })
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
@@ -74,14 +74,23 @@ export const invalidateAvailabilityCache = async () => {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Bust the shared cache when CC changes max_customers_per_mitra (any instance).
|
// Bust the shared cache when CC changes any config that the beacon snapshots
|
||||||
|
// over: max_customers_per_mitra (capacity gate), require_mitra_ping (whether
|
||||||
|
// stale heartbeats exclude candidates), mitra_stale_after_seconds (the gate's
|
||||||
|
// threshold itself).
|
||||||
|
const AVAILABILITY_CACHE_INVALIDATING_KEYS = new Set([
|
||||||
|
'max_customers_per_mitra',
|
||||||
|
'require_mitra_ping',
|
||||||
|
'mitra_stale_after_seconds',
|
||||||
|
])
|
||||||
|
|
||||||
let _subscribed = false
|
let _subscribed = false
|
||||||
const ensureSubscribed = () => {
|
const ensureSubscribed = () => {
|
||||||
if (_subscribed) return
|
if (_subscribed) return
|
||||||
_subscribed = true
|
_subscribed = true
|
||||||
try {
|
try {
|
||||||
subscribe('config:invalidate', (msg) => {
|
subscribe('config:invalidate', (msg) => {
|
||||||
if (msg?.key === 'max_customers_per_mitra') {
|
if (msg?.key && AVAILABILITY_CACHE_INVALIDATING_KEYS.has(msg.key)) {
|
||||||
invalidateAvailabilityCache()
|
invalidateAvailabilityCache()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@@ -349,7 +358,7 @@ export const mirrorHeartbeatsToPostgres = async () => {
|
|||||||
*/
|
*/
|
||||||
const computeAvailabilityFromValkey = async () => {
|
const computeAvailabilityFromValkey = async () => {
|
||||||
const { max_customers_per_mitra } = await getMaxCustomersPerMitra()
|
const { max_customers_per_mitra } = await getMaxCustomersPerMitra()
|
||||||
const { stale_after_seconds } = await getMitraPingConfig()
|
const { require_ping, stale_after_seconds } = await getMitraPingConfig()
|
||||||
|
|
||||||
const candidates = await valkey.sdiff(VK_MITRAS_ONLINE, VK_MITRAS_DEACTIVATED)
|
const candidates = await valkey.sdiff(VK_MITRAS_ONLINE, VK_MITRAS_DEACTIVATED)
|
||||||
if (!candidates.length) return { available: false, count: 0 }
|
if (!candidates.length) return { available: false, count: 0 }
|
||||||
@@ -357,17 +366,26 @@ const computeAvailabilityFromValkey = async () => {
|
|||||||
const pipe = valkey.pipeline()
|
const pipe = valkey.pipeline()
|
||||||
for (const id of candidates) {
|
for (const id of candidates) {
|
||||||
pipe.get(vkCapacityKey(id))
|
pipe.get(vkCapacityKey(id))
|
||||||
pipe.get(vkHeartbeatKey(id))
|
if (require_ping) pipe.get(vkHeartbeatKey(id))
|
||||||
}
|
}
|
||||||
const results = await pipe.exec()
|
const results = await pipe.exec()
|
||||||
|
const stride = require_ping ? 2 : 1
|
||||||
|
|
||||||
const cutoff = Date.now() - stale_after_seconds * 1000
|
const cutoff = Date.now() - stale_after_seconds * 1000
|
||||||
let count = 0
|
let count = 0
|
||||||
for (let i = 0; i < candidates.length; i++) {
|
for (let i = 0; i < candidates.length; i++) {
|
||||||
const capacity = Number(results[i * 2][1] ?? 0)
|
const capacity = Number(results[i * stride][1] ?? 0)
|
||||||
const heartbeat = results[i * 2 + 1][1]
|
|
||||||
if (capacity >= max_customers_per_mitra) continue
|
if (capacity >= max_customers_per_mitra) continue
|
||||||
if (!heartbeat || Date.parse(heartbeat) < cutoff) continue
|
// When the operator has turned `require_mitra_ping` off, the auto-offline
|
||||||
|
// sweep is also a no-op (see autoOfflineStaleMitras early-return). Mitras
|
||||||
|
// stay in `mitras:online` until they explicitly toggle offline, so reading
|
||||||
|
// a stale heartbeat here doesn't mean "unreachable" — it means "we aren't
|
||||||
|
// tracking liveness." Skip the freshness gate to stay consistent with the
|
||||||
|
// sweep, and to match what the Postgres fallback returns (is_online only).
|
||||||
|
if (require_ping) {
|
||||||
|
const heartbeat = results[i * stride + 1][1]
|
||||||
|
if (!heartbeat || Date.parse(heartbeat) < cutoff) continue
|
||||||
|
}
|
||||||
count++
|
count++
|
||||||
}
|
}
|
||||||
return { available: count > 0, count }
|
return { available: count > 0, count }
|
||||||
@@ -409,14 +427,19 @@ export const countAvailableMitrasFromCache = async () => {
|
|||||||
* Falls back to a Postgres `is_online` read if Valkey is unreachable; the
|
* Falls back to a Postgres `is_online` read if Valkey is unreachable; the
|
||||||
* fallback skips the heartbeat-freshness check (sweep takes care of stale rows
|
* fallback skips the heartbeat-freshness check (sweep takes care of stale rows
|
||||||
* within `stale_after_seconds + sweep_cadence`).
|
* within `stale_after_seconds + sweep_cadence`).
|
||||||
|
*
|
||||||
|
* When `require_mitra_ping=false`, both the auto-offline sweep AND this check
|
||||||
|
* skip the heartbeat gate so the read path matches the sweep's contract: a
|
||||||
|
* mitra stays "reachable" until they explicitly toggle offline.
|
||||||
*/
|
*/
|
||||||
export const isMitraReachable = async (mitraId) => {
|
export const isMitraReachable = async (mitraId) => {
|
||||||
try {
|
try {
|
||||||
const inSet = await valkey.sismember(VK_MITRAS_ONLINE, mitraId)
|
const inSet = await valkey.sismember(VK_MITRAS_ONLINE, mitraId)
|
||||||
if (!inSet) return false
|
if (!inSet) return false
|
||||||
|
const { require_ping, stale_after_seconds } = await getMitraPingConfig()
|
||||||
|
if (!require_ping) return true
|
||||||
const heartbeat = await valkey.get(vkHeartbeatKey(mitraId))
|
const heartbeat = await valkey.get(vkHeartbeatKey(mitraId))
|
||||||
if (!heartbeat) return false
|
if (!heartbeat) return false
|
||||||
const { stale_after_seconds } = await getMitraPingConfig()
|
|
||||||
return Date.parse(heartbeat) >= Date.now() - stale_after_seconds * 1000
|
return Date.parse(heartbeat) >= Date.now() - stale_after_seconds * 1000
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.warn('[isMitraReachable] valkey unavailable, falling back to DB:', err.message)
|
console.warn('[isMitraReachable] valkey unavailable, falling back to DB:', err.message)
|
||||||
|
|||||||
@@ -83,7 +83,7 @@ const notifyCustomer = async (customerId, data) => {
|
|||||||
// Postgres fallback runs if any Valkey op throws (full JOIN as before).
|
// Postgres fallback runs if any Valkey op throws (full JOIN as before).
|
||||||
const findAvailableMitrasFromValkey = async () => {
|
const findAvailableMitrasFromValkey = async () => {
|
||||||
const { max_customers_per_mitra } = await getMaxCustomersPerMitra()
|
const { max_customers_per_mitra } = await getMaxCustomersPerMitra()
|
||||||
const { stale_after_seconds } = await getMitraPingConfig()
|
const { require_ping, stale_after_seconds } = await getMitraPingConfig()
|
||||||
|
|
||||||
const candidates = await valkey.sdiff(VK_MITRAS_ONLINE, VK_MITRAS_DEACTIVATED)
|
const candidates = await valkey.sdiff(VK_MITRAS_ONLINE, VK_MITRAS_DEACTIVATED)
|
||||||
if (!candidates.length) return []
|
if (!candidates.length) return []
|
||||||
@@ -91,17 +91,23 @@ const findAvailableMitrasFromValkey = async () => {
|
|||||||
const pipe = valkey.pipeline()
|
const pipe = valkey.pipeline()
|
||||||
for (const id of candidates) {
|
for (const id of candidates) {
|
||||||
pipe.get(vkCapacityKey(id))
|
pipe.get(vkCapacityKey(id))
|
||||||
pipe.get(vkHeartbeatKey(id))
|
if (require_ping) pipe.get(vkHeartbeatKey(id))
|
||||||
}
|
}
|
||||||
const results = await pipe.exec()
|
const results = await pipe.exec()
|
||||||
|
const stride = require_ping ? 2 : 1
|
||||||
|
|
||||||
const cutoff = Date.now() - stale_after_seconds * 1000
|
const cutoff = Date.now() - stale_after_seconds * 1000
|
||||||
const eligible = []
|
const eligible = []
|
||||||
for (let i = 0; i < candidates.length; i++) {
|
for (let i = 0; i < candidates.length; i++) {
|
||||||
const capacity = Number(results[i * 2][1] ?? 0)
|
const capacity = Number(results[i * stride][1] ?? 0)
|
||||||
const heartbeat = results[i * 2 + 1][1]
|
|
||||||
if (capacity >= max_customers_per_mitra) continue
|
if (capacity >= max_customers_per_mitra) continue
|
||||||
if (!heartbeat || Date.parse(heartbeat) < cutoff) continue
|
// See computeAvailabilityFromValkey in mitra-status.service.js: when the
|
||||||
|
// ping requirement is off, the sweep is off too, so we don't gate
|
||||||
|
// candidate selection on heartbeat freshness here either.
|
||||||
|
if (require_ping) {
|
||||||
|
const heartbeat = results[i * stride + 1][1]
|
||||||
|
if (!heartbeat || Date.parse(heartbeat) < cutoff) continue
|
||||||
|
}
|
||||||
eligible.push({ id: candidates[i], active_session_count: capacity })
|
eligible.push({ id: candidates[i], active_session_count: capacity })
|
||||||
}
|
}
|
||||||
return eligible
|
return eligible
|
||||||
|
|||||||
@@ -224,6 +224,27 @@ describe('mitra-status valkey mirror', () => {
|
|||||||
|
|
||||||
expect(await isMitraReachable(m.id)).toBe(false)
|
expect(await isMitraReachable(m.id)).toBe(false)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Mirrors the autoOfflineStaleMitras "no-op when require_ping=false"
|
||||||
|
// contract: read paths must not gate on heartbeat when sweep doesn't.
|
||||||
|
it('returns true with stale heartbeat when require_ping=false', async () => {
|
||||||
|
const sql = db()
|
||||||
|
try {
|
||||||
|
await sql`
|
||||||
|
UPDATE app_config SET value=${sql.json({ value: false })}
|
||||||
|
WHERE key='require_mitra_ping'
|
||||||
|
`
|
||||||
|
const m = await createMitra({ callName: 'NoPing', isOnline: true })
|
||||||
|
await v().set(vkHeartbeatKey(m.id), new Date(Date.now() - 3_600_000).toISOString())
|
||||||
|
|
||||||
|
expect(await isMitraReachable(m.id)).toBe(true)
|
||||||
|
} finally {
|
||||||
|
await sql`
|
||||||
|
UPDATE app_config SET value=${sql.json({ value: true })}
|
||||||
|
WHERE key='require_mitra_ping'
|
||||||
|
`
|
||||||
|
}
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
// ---------- recomputeCapacity ----------
|
// ---------- recomputeCapacity ----------
|
||||||
@@ -331,6 +352,35 @@ describe('mitra-status valkey mirror', () => {
|
|||||||
await invalidateAvailabilityCache()
|
await invalidateAvailabilityCache()
|
||||||
expect(await v().get('availability:snapshot')).toBeNull()
|
expect(await v().get('availability:snapshot')).toBeNull()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Regression: when an operator turns off the ping requirement, the
|
||||||
|
// auto-offline sweep is also disabled, so heartbeats may legitimately
|
||||||
|
// become arbitrarily old. The beacon must NOT filter those out — that
|
||||||
|
// would put the CTA in a permanently-disabled state with no recovery
|
||||||
|
// path (sweep won't remove the mitra; cache always re-computes false).
|
||||||
|
it('includes mitras with stale heartbeats when require_ping=false', async () => {
|
||||||
|
const sql = db()
|
||||||
|
try {
|
||||||
|
await sql`
|
||||||
|
UPDATE app_config SET value=${sql.json({ value: false })}
|
||||||
|
WHERE key='require_mitra_ping'
|
||||||
|
`
|
||||||
|
const m = await createMitra({ callName: 'NoPingRequired', isOnline: true })
|
||||||
|
// Heartbeat 1 hour old — well past any reasonable stale_after_seconds.
|
||||||
|
await v().set(vkHeartbeatKey(m.id), new Date(Date.now() - 3_600_000).toISOString())
|
||||||
|
await v().del('availability:snapshot')
|
||||||
|
|
||||||
|
const result = await countAvailableMitrasFromCache()
|
||||||
|
expect(result.available).toBe(true)
|
||||||
|
expect(result.count).toBe(1)
|
||||||
|
} finally {
|
||||||
|
await sql`
|
||||||
|
UPDATE app_config SET value=${sql.json({ value: true })}
|
||||||
|
WHERE key='require_mitra_ping'
|
||||||
|
`
|
||||||
|
await v().del('availability:snapshot')
|
||||||
|
}
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
// ---------- autoOfflineStaleMitras ----------
|
// ---------- autoOfflineStaleMitras ----------
|
||||||
|
|||||||
Reference in New Issue
Block a user