Compare commits
2 Commits
553dbac52f
...
d60c048776
| Author | SHA1 | Date | |
|---|---|---|---|
| d60c048776 | |||
| 3052f7b799 |
@@ -71,3 +71,17 @@ XENDIT_SECRET_KEY=
|
||||
XENDIT_WEBHOOK_TOKEN=
|
||||
XENDIT_SUCCESS_REDIRECT_URL=
|
||||
XENDIT_FAILURE_REDIRECT_URL=
|
||||
|
||||
# --- Xendit webhook survival sink (optional file-based fallback) ---
|
||||
#
|
||||
# When the primary DB log (`webhook_logs` table) is unreachable, the route can
|
||||
# spill each inbound webhook to a daily-rolling JSONL file instead. Disabled
|
||||
# by default — production opts in by mounting a persistent volume / GCS sync
|
||||
# and flipping XENDIT_WEBHOOK_FALLBACK_ENABLED=true. The handler will NOT
|
||||
# fall back to stdout; if both DB and this sink are unavailable the event is
|
||||
# dropped.
|
||||
#
|
||||
# Filename pattern: "<NAME>-YYYY-MM-DD.jsonl" (UTC day boundary).
|
||||
XENDIT_WEBHOOK_FALLBACK_ENABLED=false
|
||||
XENDIT_WEBHOOK_FALLBACK_DIR=./logs
|
||||
XENDIT_WEBHOOK_FALLBACK_NAME=xendit-webhook-fallback
|
||||
|
||||
@@ -1086,6 +1086,65 @@ const migrate = async () => {
|
||||
AND product_type = 'chat_session'
|
||||
`
|
||||
|
||||
// 10. webhook_logs — survival/audit table for every inbound payment-provider
|
||||
// webhook. The route handler inserts a row BEFORE auth checks or business
|
||||
// logic so a forensic record exists even when the request is rejected,
|
||||
// the body is malformed, or processing throws.
|
||||
//
|
||||
// Primary fields are extracted as columns (queryable in CC, indexed where
|
||||
// useful); the full body is kept in `raw_body` JSONB so we can replay or
|
||||
// diff later. `provider` is a string column (not enum) so adding a new
|
||||
// payment provider doesn't require a migration.
|
||||
//
|
||||
// No FK to payment_requests — logs must survive even if the matching
|
||||
// payment row was wiped, never existed, or arrives for a product/event
|
||||
// type we don't yet model.
|
||||
await sql`
|
||||
CREATE TABLE IF NOT EXISTS webhook_logs (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
provider TEXT NOT NULL DEFAULT 'xendit',
|
||||
received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
|
||||
-- Parsed primary fields (Xendit Invoice callback shape — NULL for other event types)
|
||||
xendit_event_id TEXT,
|
||||
external_id TEXT,
|
||||
payment_request_id UUID,
|
||||
status TEXT,
|
||||
amount BIGINT,
|
||||
currency TEXT,
|
||||
payment_method TEXT,
|
||||
paid_at TIMESTAMPTZ,
|
||||
metadata_app TEXT,
|
||||
|
||||
-- Integrity + verbatim record
|
||||
callback_token_valid BOOLEAN NOT NULL,
|
||||
headers JSONB NOT NULL,
|
||||
raw_body JSONB NOT NULL,
|
||||
|
||||
-- Outcome (filled by the handler after processing). Leaving these NULL
|
||||
-- is itself a useful signal — it means the handler crashed before the
|
||||
-- finally block could stamp the result.
|
||||
http_status SMALLINT,
|
||||
processing_result TEXT,
|
||||
processing_error TEXT,
|
||||
processed_at TIMESTAMPTZ
|
||||
)
|
||||
`
|
||||
await sql`
|
||||
CREATE INDEX IF NOT EXISTS idx_webhook_logs_received_at
|
||||
ON webhook_logs (received_at DESC)
|
||||
`
|
||||
await sql`
|
||||
CREATE INDEX IF NOT EXISTS idx_webhook_logs_external_id
|
||||
ON webhook_logs (external_id)
|
||||
WHERE external_id IS NOT NULL
|
||||
`
|
||||
await sql`
|
||||
CREATE INDEX IF NOT EXISTS idx_webhook_logs_payment_request
|
||||
ON webhook_logs (payment_request_id)
|
||||
WHERE payment_request_id IS NOT NULL
|
||||
`
|
||||
|
||||
console.log('Migration complete.')
|
||||
await sql.end()
|
||||
}
|
||||
|
||||
@@ -190,6 +190,14 @@ export const internalConfigRoutes = async (app) => {
|
||||
}
|
||||
}
|
||||
const config = await setMitraPingConfig({ require_ping, stale_after_seconds })
|
||||
// Bust the customer availability cache on any instance — subscribers in
|
||||
// mitra-status.service.js listen for these keys and call invalidate.
|
||||
if (require_ping !== undefined) {
|
||||
await publishConfigInvalidate('require_mitra_ping')
|
||||
}
|
||||
if (stale_after_seconds !== undefined) {
|
||||
await publishConfigInvalidate('mitra_stale_after_seconds')
|
||||
}
|
||||
return reply.send({ success: true, data: config })
|
||||
})
|
||||
|
||||
|
||||
@@ -6,87 +6,205 @@
|
||||
// `x-callback-token` header verified against env XENDIT_WEBHOOK_TOKEN.
|
||||
//
|
||||
// Body shape from Xendit Invoice callback (relevant fields only):
|
||||
// { id, external_id, status, amount, payment_method, paid_at, ... }
|
||||
// { id, external_id, status, amount, payment_method, paid_at, metadata, ... }
|
||||
//
|
||||
// Handled statuses: PAID (→ confirmPayment), EXPIRED (→ expirePayment).
|
||||
// Anything else ACKs with `{ ok: true, ignored: <status> }` for forward-compat.
|
||||
//
|
||||
// All state transitions go through payment.service.js — this handler is just
|
||||
// the entry point. Events emit from inside the service, not from here.
|
||||
// SURVIVAL LOGGING — every inbound webhook lands in `webhook_logs` BEFORE the
|
||||
// auth check or any business logic, so we keep a forensic record even when:
|
||||
// • the token is wrong (401)
|
||||
// • the body is malformed / missing external_id
|
||||
// • the referenced payment_request doesn't exist
|
||||
// • amount mismatches and we reject (409)
|
||||
// • downstream processing throws (500)
|
||||
//
|
||||
// If the DB insert itself fails, an optional rolling-file sink can absorb the
|
||||
// event (see WEBHOOK_FALLBACK_* env vars in webhook-log.service.js). The sink
|
||||
// is disabled by default — production opts in by mounting a persistent volume
|
||||
// and flipping WEBHOOK_FALLBACK_ENABLED=true. We deliberately do NOT fall
|
||||
// back to stdout; operators decide if/where survival writes happen.
|
||||
//
|
||||
// CONTROL FLOW NOTE — branches set `result` + `httpStatus` + `responseBody`
|
||||
// into closure vars rather than calling `reply.send()` directly. Fastify
|
||||
// flushes the response on `reply.send()` and `app.inject` resolves before any
|
||||
// post-send finally would run, which would lose the outcome columns. We
|
||||
// instead update the log once at the end, then send the reply once.
|
||||
|
||||
import { confirmPayment, expirePayment, getPayment, verifyWebhookToken } from '../../services/payment.service.js'
|
||||
import {
|
||||
insertWebhookLog,
|
||||
sanitizeHeaders,
|
||||
updateWebhookLog,
|
||||
writeWebhookFallback,
|
||||
} from '../../services/webhook-log.service.js'
|
||||
|
||||
// Our payment_requests.id is a UUID. A non-UUID external_id is either a legacy
|
||||
// old-app invoice (should have been filtered by the router) or a stray Xendit
|
||||
// event for a product we don't model. Skip the DB lookup (it would throw on
|
||||
// the cast) and ACK so Xendit stops retrying.
|
||||
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i
|
||||
|
||||
export const paymentWebhookRoutes = async (app) => {
|
||||
app.post('/webhooks/xendit', async (request, reply) => {
|
||||
const headerToken = request.headers['x-callback-token']
|
||||
if (!verifyWebhookToken(headerToken)) {
|
||||
request.log.warn('xendit webhook: bad token')
|
||||
return reply.code(401).send({ error: 'invalid_token' })
|
||||
}
|
||||
|
||||
const body = request.body ?? {}
|
||||
const invoiceId = body.id
|
||||
const paymentRequestId = body.external_id
|
||||
const status = body.status
|
||||
const amount = typeof body.amount === 'number' ? body.amount : null
|
||||
const paymentMethod = body.payment_method ?? null
|
||||
|
||||
request.log.info(
|
||||
{ paymentRequestId, invoiceId, status, amount, paymentMethod },
|
||||
'xendit webhook received',
|
||||
)
|
||||
|
||||
if (!paymentRequestId) {
|
||||
// Forward-compat: future Xendit event types may not carry external_id
|
||||
return reply.send({ ok: true, ignored: 'no_external_id' })
|
||||
}
|
||||
|
||||
const existing = await getPayment(paymentRequestId)
|
||||
if (!existing) {
|
||||
// Unknown payment — could be stale orphan from a wiped dev DB. ACK so Xendit
|
||||
// stops retrying; warn so we notice if this becomes common in prod.
|
||||
request.log.warn({ paymentRequestId, invoiceId }, 'unknown payment_request — ACKing')
|
||||
return reply.send({ ok: true, ignored: 'unknown_payment_request' })
|
||||
}
|
||||
|
||||
if (status === 'PAID') {
|
||||
// Defensive: amount mismatch = either tampering or config drift. Refuse to confirm.
|
||||
if (amount !== null && amount !== existing.amount) {
|
||||
request.log.error(
|
||||
{ paymentRequestId, expected: existing.amount, got: amount },
|
||||
'xendit webhook: amount mismatch',
|
||||
)
|
||||
return reply.code(409).send({ error: 'amount_mismatch' })
|
||||
}
|
||||
const headerToken = request.headers['x-callback-token']
|
||||
const tokenValid = verifyWebhookToken(headerToken)
|
||||
const sanitizedHeaders = sanitizeHeaders(request.headers)
|
||||
|
||||
// --- STEP 1: survival log ----------------------------------------------
|
||||
// Insert before doing anything else. If this throws, attempt the optional
|
||||
// rolling-file sink (env-gated, off by default). If that ALSO fails or is
|
||||
// disabled, we silently continue — no stdout fallback by design, the
|
||||
// operator's env config decides where survival writes go.
|
||||
let logId = null
|
||||
try {
|
||||
logId = await insertWebhookLog({
|
||||
provider: 'xendit',
|
||||
headers: sanitizedHeaders,
|
||||
rawBody: body,
|
||||
callbackTokenValid: tokenValid,
|
||||
})
|
||||
} catch (dbErr) {
|
||||
try {
|
||||
await confirmPayment(paymentRequestId, { invoiceId, paymentMethod, amount })
|
||||
} catch (err) {
|
||||
// INVALID_STATE = already confirmed/consumed (Xendit retry); CONFLICT = race lost. ACK.
|
||||
// EXPIRED = customer paid AFTER our sweeper expired the row — painful, manual recovery
|
||||
// needed. Log loud so we notice. (D5 alignment should keep this rare.)
|
||||
if (err.code === 'INVALID_STATE' || err.code === 'CONFLICT') {
|
||||
request.log.info(
|
||||
{ paymentRequestId, code: err.code, prevStatus: existing.status },
|
||||
'xendit webhook: already in terminal state, ACKing',
|
||||
)
|
||||
} else if (err.code === 'EXPIRED') {
|
||||
request.log.error(
|
||||
{ paymentRequestId, expiredAt: existing.expires_at },
|
||||
'xendit webhook: PAID after expiry — manual recovery needed',
|
||||
)
|
||||
await writeWebhookFallback({
|
||||
provider: 'xendit',
|
||||
headers: sanitizedHeaders,
|
||||
rawBody: body,
|
||||
callbackTokenValid: tokenValid,
|
||||
dbErrorMessage: dbErr?.message,
|
||||
})
|
||||
} catch {
|
||||
// Both sinks down. Nothing left to try — proceed without an audit
|
||||
// trail. The operator chose this configuration; failure here is
|
||||
// their disk/permission/config issue to monitor externally.
|
||||
}
|
||||
}
|
||||
|
||||
// --- STEP 2: process ---------------------------------------------------
|
||||
// Defaults match the "uncaught exception in processing" case — finally
|
||||
// (well, the post-try outcome stamp below) will stamp these into the log.
|
||||
let result = 'error'
|
||||
let httpStatus = 500
|
||||
let responseBody = { error: 'internal_error' }
|
||||
let errorMsg = null
|
||||
|
||||
try {
|
||||
if (!tokenValid) {
|
||||
request.log.warn('xendit webhook: bad token')
|
||||
result = 'rejected_invalid_token'
|
||||
httpStatus = 401
|
||||
responseBody = { error: 'invalid_token' }
|
||||
} else {
|
||||
const invoiceId = body.id
|
||||
const paymentRequestId = body.external_id
|
||||
const status = body.status
|
||||
const amount = typeof body.amount === 'number' ? body.amount : null
|
||||
const paymentMethod = body.payment_method ?? null
|
||||
|
||||
request.log.info(
|
||||
{ paymentRequestId, invoiceId, status, amount, paymentMethod },
|
||||
'xendit webhook received',
|
||||
)
|
||||
|
||||
if (!paymentRequestId) {
|
||||
// Forward-compat: future Xendit event types may not carry external_id
|
||||
result = 'ignored_no_external_id'
|
||||
httpStatus = 200
|
||||
responseBody = { ok: true, ignored: 'no_external_id' }
|
||||
} else if (!UUID_RE.test(paymentRequestId)) {
|
||||
// Not one of ours — see UUID_RE comment above.
|
||||
request.log.warn({ paymentRequestId, invoiceId }, 'non-UUID external_id — ACKing')
|
||||
result = 'ignored_non_uuid_external_id'
|
||||
httpStatus = 200
|
||||
responseBody = { ok: true, ignored: 'non_uuid_external_id' }
|
||||
} else {
|
||||
throw err
|
||||
const existing = await getPayment(paymentRequestId)
|
||||
if (!existing) {
|
||||
// Unknown payment — could be stale orphan from a wiped dev DB. ACK so Xendit
|
||||
// stops retrying; warn so we notice if this becomes common in prod.
|
||||
request.log.warn({ paymentRequestId, invoiceId }, 'unknown payment_request — ACKing')
|
||||
result = 'ignored_unknown_payment_request'
|
||||
httpStatus = 200
|
||||
responseBody = { ok: true, ignored: 'unknown_payment_request' }
|
||||
} else if (status === 'PAID') {
|
||||
// Defensive: amount mismatch = either tampering or config drift. Refuse to confirm.
|
||||
if (amount !== null && amount !== existing.amount) {
|
||||
request.log.error(
|
||||
{ paymentRequestId, expected: existing.amount, got: amount },
|
||||
'xendit webhook: amount mismatch',
|
||||
)
|
||||
result = 'rejected_amount_mismatch'
|
||||
httpStatus = 409
|
||||
responseBody = { error: 'amount_mismatch' }
|
||||
} else {
|
||||
try {
|
||||
await confirmPayment(paymentRequestId, { invoiceId, paymentMethod, amount })
|
||||
result = 'confirmed'
|
||||
} catch (err) {
|
||||
// INVALID_STATE = already confirmed/consumed (Xendit retry); CONFLICT = race lost. ACK.
|
||||
// EXPIRED = customer paid AFTER our sweeper expired the row — painful, manual
|
||||
// recovery needed. Log loud so we notice. (D5 alignment should keep
|
||||
// this rare.)
|
||||
if (err.code === 'INVALID_STATE' || err.code === 'CONFLICT') {
|
||||
request.log.info(
|
||||
{ paymentRequestId, code: err.code, prevStatus: existing.status },
|
||||
'xendit webhook: already in terminal state, ACKing',
|
||||
)
|
||||
result = `idempotent_${err.code.toLowerCase()}`
|
||||
} else if (err.code === 'EXPIRED') {
|
||||
request.log.error(
|
||||
{ paymentRequestId, expiredAt: existing.expires_at },
|
||||
'xendit webhook: PAID after expiry — manual recovery needed',
|
||||
)
|
||||
result = 'paid_after_expiry'
|
||||
} else {
|
||||
throw err
|
||||
}
|
||||
}
|
||||
httpStatus = 200
|
||||
responseBody = { ok: true }
|
||||
}
|
||||
} else if (status === 'EXPIRED') {
|
||||
await expirePayment(paymentRequestId)
|
||||
result = 'expired'
|
||||
httpStatus = 200
|
||||
responseBody = { ok: true }
|
||||
} else {
|
||||
result = `ignored_${status}`
|
||||
httpStatus = 200
|
||||
responseBody = { ok: true, ignored: status }
|
||||
}
|
||||
}
|
||||
}
|
||||
return reply.send({ ok: true })
|
||||
} catch (err) {
|
||||
// Anything that bubbles out of the processing block lands here. We do NOT
|
||||
// re-throw — instead we ACK to ourselves (logged loudly) and return 500
|
||||
// so Xendit retries on its own schedule. The forensic row records why.
|
||||
errorMsg = err?.message ?? String(err)
|
||||
request.log.error(
|
||||
{ err: errorMsg, paymentRequestId: body.external_id },
|
||||
'xendit webhook: unhandled exception during processing',
|
||||
)
|
||||
// Defaults already at result='error', httpStatus=500, responseBody=error
|
||||
}
|
||||
|
||||
if (status === 'EXPIRED') {
|
||||
await expirePayment(paymentRequestId)
|
||||
return reply.send({ ok: true })
|
||||
// --- STEP 3: outcome stamp + reply -------------------------------------
|
||||
// Awaited so the log row is consistent before we hand control back to the
|
||||
// framework. Wrapped in its own try/catch: a failed update must never
|
||||
// prevent us from responding to Xendit.
|
||||
if (logId) {
|
||||
try {
|
||||
await updateWebhookLog(logId, {
|
||||
httpStatus,
|
||||
processingResult: result,
|
||||
processingError: errorMsg,
|
||||
})
|
||||
} catch (updErr) {
|
||||
request.log.error({ err: updErr?.message, logId }, 'webhook_log update failed')
|
||||
}
|
||||
}
|
||||
|
||||
return reply.send({ ok: true, ignored: status })
|
||||
return reply.code(httpStatus).send(responseBody)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -74,14 +74,23 @@ export const invalidateAvailabilityCache = async () => {
|
||||
}
|
||||
}
|
||||
|
||||
// Bust the shared cache when CC changes max_customers_per_mitra (any instance).
|
||||
// Bust the shared cache when CC changes any config that the beacon snapshots
|
||||
// over: max_customers_per_mitra (capacity gate), require_mitra_ping (whether
|
||||
// stale heartbeats exclude candidates), mitra_stale_after_seconds (the gate's
|
||||
// threshold itself).
|
||||
const AVAILABILITY_CACHE_INVALIDATING_KEYS = new Set([
|
||||
'max_customers_per_mitra',
|
||||
'require_mitra_ping',
|
||||
'mitra_stale_after_seconds',
|
||||
])
|
||||
|
||||
let _subscribed = false
|
||||
const ensureSubscribed = () => {
|
||||
if (_subscribed) return
|
||||
_subscribed = true
|
||||
try {
|
||||
subscribe('config:invalidate', (msg) => {
|
||||
if (msg?.key === 'max_customers_per_mitra') {
|
||||
if (msg?.key && AVAILABILITY_CACHE_INVALIDATING_KEYS.has(msg.key)) {
|
||||
invalidateAvailabilityCache()
|
||||
}
|
||||
})
|
||||
@@ -349,7 +358,7 @@ export const mirrorHeartbeatsToPostgres = async () => {
|
||||
*/
|
||||
const computeAvailabilityFromValkey = async () => {
|
||||
const { max_customers_per_mitra } = await getMaxCustomersPerMitra()
|
||||
const { stale_after_seconds } = await getMitraPingConfig()
|
||||
const { require_ping, stale_after_seconds } = await getMitraPingConfig()
|
||||
|
||||
const candidates = await valkey.sdiff(VK_MITRAS_ONLINE, VK_MITRAS_DEACTIVATED)
|
||||
if (!candidates.length) return { available: false, count: 0 }
|
||||
@@ -357,17 +366,26 @@ const computeAvailabilityFromValkey = async () => {
|
||||
const pipe = valkey.pipeline()
|
||||
for (const id of candidates) {
|
||||
pipe.get(vkCapacityKey(id))
|
||||
pipe.get(vkHeartbeatKey(id))
|
||||
if (require_ping) pipe.get(vkHeartbeatKey(id))
|
||||
}
|
||||
const results = await pipe.exec()
|
||||
const stride = require_ping ? 2 : 1
|
||||
|
||||
const cutoff = Date.now() - stale_after_seconds * 1000
|
||||
let count = 0
|
||||
for (let i = 0; i < candidates.length; i++) {
|
||||
const capacity = Number(results[i * 2][1] ?? 0)
|
||||
const heartbeat = results[i * 2 + 1][1]
|
||||
const capacity = Number(results[i * stride][1] ?? 0)
|
||||
if (capacity >= max_customers_per_mitra) continue
|
||||
if (!heartbeat || Date.parse(heartbeat) < cutoff) continue
|
||||
// When the operator has turned `require_mitra_ping` off, the auto-offline
|
||||
// sweep is also a no-op (see autoOfflineStaleMitras early-return). Mitras
|
||||
// stay in `mitras:online` until they explicitly toggle offline, so reading
|
||||
// a stale heartbeat here doesn't mean "unreachable" — it means "we aren't
|
||||
// tracking liveness." Skip the freshness gate to stay consistent with the
|
||||
// sweep, and to match what the Postgres fallback returns (is_online only).
|
||||
if (require_ping) {
|
||||
const heartbeat = results[i * stride + 1][1]
|
||||
if (!heartbeat || Date.parse(heartbeat) < cutoff) continue
|
||||
}
|
||||
count++
|
||||
}
|
||||
return { available: count > 0, count }
|
||||
@@ -409,14 +427,19 @@ export const countAvailableMitrasFromCache = async () => {
|
||||
* Falls back to a Postgres `is_online` read if Valkey is unreachable; the
|
||||
* fallback skips the heartbeat-freshness check (sweep takes care of stale rows
|
||||
* within `stale_after_seconds + sweep_cadence`).
|
||||
*
|
||||
* When `require_mitra_ping=false`, both the auto-offline sweep AND this check
|
||||
* skip the heartbeat gate so the read path matches the sweep's contract: a
|
||||
* mitra stays "reachable" until they explicitly toggle offline.
|
||||
*/
|
||||
export const isMitraReachable = async (mitraId) => {
|
||||
try {
|
||||
const inSet = await valkey.sismember(VK_MITRAS_ONLINE, mitraId)
|
||||
if (!inSet) return false
|
||||
const { require_ping, stale_after_seconds } = await getMitraPingConfig()
|
||||
if (!require_ping) return true
|
||||
const heartbeat = await valkey.get(vkHeartbeatKey(mitraId))
|
||||
if (!heartbeat) return false
|
||||
const { stale_after_seconds } = await getMitraPingConfig()
|
||||
return Date.parse(heartbeat) >= Date.now() - stale_after_seconds * 1000
|
||||
} catch (err) {
|
||||
console.warn('[isMitraReachable] valkey unavailable, falling back to DB:', err.message)
|
||||
|
||||
@@ -83,7 +83,7 @@ const notifyCustomer = async (customerId, data) => {
|
||||
// Postgres fallback runs if any Valkey op throws (full JOIN as before).
|
||||
const findAvailableMitrasFromValkey = async () => {
|
||||
const { max_customers_per_mitra } = await getMaxCustomersPerMitra()
|
||||
const { stale_after_seconds } = await getMitraPingConfig()
|
||||
const { require_ping, stale_after_seconds } = await getMitraPingConfig()
|
||||
|
||||
const candidates = await valkey.sdiff(VK_MITRAS_ONLINE, VK_MITRAS_DEACTIVATED)
|
||||
if (!candidates.length) return []
|
||||
@@ -91,17 +91,23 @@ const findAvailableMitrasFromValkey = async () => {
|
||||
const pipe = valkey.pipeline()
|
||||
for (const id of candidates) {
|
||||
pipe.get(vkCapacityKey(id))
|
||||
pipe.get(vkHeartbeatKey(id))
|
||||
if (require_ping) pipe.get(vkHeartbeatKey(id))
|
||||
}
|
||||
const results = await pipe.exec()
|
||||
const stride = require_ping ? 2 : 1
|
||||
|
||||
const cutoff = Date.now() - stale_after_seconds * 1000
|
||||
const eligible = []
|
||||
for (let i = 0; i < candidates.length; i++) {
|
||||
const capacity = Number(results[i * 2][1] ?? 0)
|
||||
const heartbeat = results[i * 2 + 1][1]
|
||||
const capacity = Number(results[i * stride][1] ?? 0)
|
||||
if (capacity >= max_customers_per_mitra) continue
|
||||
if (!heartbeat || Date.parse(heartbeat) < cutoff) continue
|
||||
// See computeAvailabilityFromValkey in mitra-status.service.js: when the
|
||||
// ping requirement is off, the sweep is off too, so we don't gate
|
||||
// candidate selection on heartbeat freshness here either.
|
||||
if (require_ping) {
|
||||
const heartbeat = results[i * stride + 1][1]
|
||||
if (!heartbeat || Date.parse(heartbeat) < cutoff) continue
|
||||
}
|
||||
eligible.push({ id: candidates[i], active_session_count: capacity })
|
||||
}
|
||||
return eligible
|
||||
|
||||
@@ -98,6 +98,9 @@ const createXenditInvoice = async ({ paymentRequestId, amount, ttlMinutes, descr
|
||||
currency: 'IDR',
|
||||
successRedirectUrl: successRedirectUrl || undefined,
|
||||
failureRedirectUrl: failureRedirectUrl || undefined,
|
||||
// Stamped so a shared webhook router (no DB access) can route v1 vs v2 traffic
|
||||
// purely from the echoed payload. Keep this string stable — it is a routing key.
|
||||
metadata: { app: 'halobestie_v2' },
|
||||
// paymentMethods omitted → honor dashboard config (operator picks methods without a deploy)
|
||||
},
|
||||
})
|
||||
|
||||
166
backend/src/services/webhook-log.service.js
Normal file
166
backend/src/services/webhook-log.service.js
Normal file
@@ -0,0 +1,166 @@
|
||||
// Provider-agnostic webhook audit log. Every inbound provider webhook (Xendit
|
||||
// today; others later) gets a row in `webhook_logs` BEFORE we run any auth or
|
||||
// business logic, so a forensic record survives rejection, malformed bodies,
|
||||
// and processing exceptions.
|
||||
//
|
||||
// Primary fields are parsed as columns (queryable / indexed); the entire body
|
||||
// stays in `raw_body` JSONB verbatim so we can replay or diff later. Outcome
|
||||
// columns are stamped by the route after processing.
|
||||
//
|
||||
// SECONDARY SINK — when the DB insert itself fails (DB outage, schema drift),
|
||||
// an optional file-based sink writes one JSONL line per event to a daily
|
||||
// rolling file. Disabled by default; opt in via env. We deliberately do NOT
|
||||
// fall back to stdout — operators who want survival pick the file sink.
|
||||
//
|
||||
// This module exposes only what the route needs:
|
||||
// sanitizeHeaders(headers) → header dict with secrets redacted
|
||||
// insertWebhookLog({...}) → returns inserted row id
|
||||
// updateWebhookLog(id, {...}) → stamps outcome columns
|
||||
// writeWebhookFallback({...}) → optional file sink on DB failure
|
||||
// getWebhookLog(id) [test/CC use] → row or null
|
||||
|
||||
import { appendFile, mkdir } from 'node:fs/promises'
|
||||
import path from 'node:path'
|
||||
import { getDb } from '../db/client.js'
|
||||
|
||||
const sql = getDb()
|
||||
|
||||
// Header values we never persist. We still keep the *names* so callers can see
|
||||
// which auth scheme the request used — only the secret material is dropped.
|
||||
const SECRET_HEADER_NAMES = new Set([
|
||||
'x-callback-token',
|
||||
'authorization',
|
||||
'cookie',
|
||||
'set-cookie',
|
||||
'proxy-authorization',
|
||||
])
|
||||
|
||||
export const sanitizeHeaders = (headers) => {
|
||||
const out = {}
|
||||
for (const [k, v] of Object.entries(headers ?? {})) {
|
||||
const key = k.toLowerCase()
|
||||
out[key] = SECRET_HEADER_NAMES.has(key) ? '[REDACTED]' : v
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i
|
||||
const isUuid = (s) => typeof s === 'string' && UUID_RE.test(s)
|
||||
const str = (v) => (typeof v === 'string' ? v : null)
|
||||
const num = (v) => (typeof v === 'number' && Number.isFinite(v) ? v : null)
|
||||
|
||||
const parseTimestamp = (v) => {
|
||||
if (typeof v !== 'string') return null
|
||||
const d = new Date(v)
|
||||
return Number.isFinite(d.getTime()) ? d.toISOString() : null
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert the survival row. Throws only if the DB itself is unreachable — the
|
||||
* caller wraps this in try/catch and falls through to a structured-log
|
||||
* fallback so a misbehaving DB never causes Xendit to retry forever.
|
||||
*/
|
||||
export const insertWebhookLog = async ({ provider, headers, rawBody, callbackTokenValid }) => {
|
||||
const body = rawBody ?? {}
|
||||
const externalId = str(body.external_id)
|
||||
const paymentRequestId = isUuid(externalId) ? externalId : null
|
||||
const [row] = await sql`
|
||||
INSERT INTO webhook_logs (
|
||||
provider, headers, raw_body, callback_token_valid,
|
||||
xendit_event_id, external_id, payment_request_id,
|
||||
status, amount, currency, payment_method, paid_at, metadata_app
|
||||
) VALUES (
|
||||
${provider},
|
||||
${sql.json(headers ?? {})},
|
||||
${sql.json(body)},
|
||||
${callbackTokenValid},
|
||||
${str(body.id)},
|
||||
${externalId},
|
||||
${paymentRequestId},
|
||||
${str(body.status)},
|
||||
${num(body.amount)},
|
||||
${str(body.currency)},
|
||||
${str(body.payment_method)},
|
||||
${parseTimestamp(body.paid_at)},
|
||||
${str(body?.metadata?.app)}
|
||||
)
|
||||
RETURNING id
|
||||
`
|
||||
return row.id
|
||||
}
|
||||
|
||||
/**
|
||||
* Stamp outcome columns once the handler has decided a response. Wrapped by
|
||||
* the caller in its own try/catch so a failed update never replaces the
|
||||
* original processing error.
|
||||
*/
|
||||
export const updateWebhookLog = async (logId, { httpStatus, processingResult, processingError }) => {
|
||||
await sql`
|
||||
UPDATE webhook_logs
|
||||
SET http_status = ${httpStatus ?? null},
|
||||
processing_result = ${processingResult ?? null},
|
||||
processing_error = ${processingError ?? null},
|
||||
processed_at = NOW()
|
||||
WHERE id = ${logId}
|
||||
`
|
||||
}
|
||||
|
||||
export const getWebhookLog = async (logId) => {
|
||||
const rows = await sql`SELECT * FROM webhook_logs WHERE id = ${logId}`
|
||||
return rows[0] ?? null
|
||||
}
|
||||
|
||||
// --- Secondary sink: rolling JSONL file --------------------------------------
|
||||
//
|
||||
// Only used when the DB insert above fails. Env-controlled so production can
|
||||
// opt in (e.g. mount a persistent volume + tail with logrotate) without
|
||||
// affecting dev/test machines. By default the sink is OFF — the handler will
|
||||
// silently skip it and rely on the DB as the sole audit trail.
|
||||
//
|
||||
// Config (xendit_ prefix groups these with the other XENDIT_* vars):
|
||||
// XENDIT_WEBHOOK_FALLBACK_ENABLED — "true" to enable; anything else = off (default off)
|
||||
// XENDIT_WEBHOOK_FALLBACK_DIR — directory the JSONL files live in (default "./logs")
|
||||
// XENDIT_WEBHOOK_FALLBACK_NAME — basename for rolling files (default "xendit-webhook-fallback")
|
||||
//
|
||||
// Rolling pattern: one file per UTC day, named "<NAME>-YYYY-MM-DD.jsonl".
|
||||
// Old files accumulate — pair with logrotate / GCS sync if you need retention
|
||||
// limits. We don't size-rotate inside Node because the fallback only fires on
|
||||
// DB outage, which should be rare enough that daily granularity is plenty.
|
||||
|
||||
export const getWebhookFallbackConfig = () => ({
|
||||
enabled: process.env.XENDIT_WEBHOOK_FALLBACK_ENABLED === 'true',
|
||||
dir: process.env.XENDIT_WEBHOOK_FALLBACK_DIR || './logs',
|
||||
name: process.env.XENDIT_WEBHOOK_FALLBACK_NAME || 'xendit-webhook-fallback',
|
||||
})
|
||||
|
||||
export const rollingFallbackFilename = (name, now = new Date()) => {
|
||||
const yyyy = now.getUTCFullYear()
|
||||
const mm = String(now.getUTCMonth() + 1).padStart(2, '0')
|
||||
const dd = String(now.getUTCDate()).padStart(2, '0')
|
||||
return `${name}-${yyyy}-${mm}-${dd}.jsonl`
|
||||
}
|
||||
|
||||
/**
|
||||
* Append one JSONL line to the rolling fallback file. Returns true if the
|
||||
* write happened, false if the sink is disabled. Throws on real I/O errors —
|
||||
* the caller decides whether to swallow.
|
||||
*/
|
||||
export const writeWebhookFallback = async ({ provider, headers, rawBody, callbackTokenValid, dbErrorMessage }) => {
|
||||
const cfg = getWebhookFallbackConfig()
|
||||
if (!cfg.enabled) return false
|
||||
|
||||
const filename = rollingFallbackFilename(cfg.name)
|
||||
const filepath = path.join(cfg.dir, filename)
|
||||
const line = JSON.stringify({
|
||||
received_at: new Date().toISOString(),
|
||||
provider,
|
||||
callback_token_valid: callbackTokenValid,
|
||||
headers,
|
||||
raw_body: rawBody,
|
||||
db_error: dbErrorMessage ?? null,
|
||||
}) + '\n'
|
||||
|
||||
await mkdir(cfg.dir, { recursive: true })
|
||||
await appendFile(filepath, line, 'utf8')
|
||||
return true
|
||||
}
|
||||
@@ -19,6 +19,7 @@ export const db = () => getDb()
|
||||
* force every test to re-create users (slow + noisy).
|
||||
*/
|
||||
const TRUNCATE_TABLES = [
|
||||
'webhook_logs',
|
||||
'pairing_failures',
|
||||
'payment_requests',
|
||||
'chat_request_notifications',
|
||||
|
||||
@@ -187,4 +187,179 @@ describe('POST /api/shared/payment/webhooks/xendit', () => {
|
||||
const [row] = await db()`SELECT status FROM payment_requests WHERE id = ${session.id}`
|
||||
expect(row.status).toBe(PaymentRequestStatus.PENDING)
|
||||
})
|
||||
|
||||
// --- Survival logging --------------------------------------------------
|
||||
// Every inbound webhook lands in `webhook_logs` BEFORE auth/validation, so
|
||||
// we keep a forensic record on rejects, unknown payments, and exceptions.
|
||||
describe('webhook_logs survival audit', () => {
|
||||
const latestLog = async () => {
|
||||
const rows = await db()`SELECT * FROM webhook_logs ORDER BY received_at DESC LIMIT 1`
|
||||
return rows[0]
|
||||
}
|
||||
|
||||
it('PAID: logs row with parsed primary fields + outcome stamped', async () => {
|
||||
const session = await requestPayment({
|
||||
productType: 'chat_session',
|
||||
customerId: customer.id, durationMinutes: 12, amount: 50_000,
|
||||
})
|
||||
|
||||
await fireWebhook(app, {
|
||||
id: 'inv_log_paid',
|
||||
external_id: session.id,
|
||||
status: 'PAID',
|
||||
amount: 50_000,
|
||||
currency: 'IDR',
|
||||
payment_method: 'BCA',
|
||||
paid_at: '2026-05-25T10:00:00.000Z',
|
||||
metadata: { app: 'halobestie_v2' },
|
||||
})
|
||||
|
||||
const log = await latestLog()
|
||||
expect(log.provider).toBe('xendit')
|
||||
expect(log.callback_token_valid).toBe(true)
|
||||
expect(log.xendit_event_id).toBe('inv_log_paid')
|
||||
expect(log.external_id).toBe(session.id)
|
||||
expect(log.payment_request_id).toBe(session.id)
|
||||
expect(log.status).toBe('PAID')
|
||||
expect(Number(log.amount)).toBe(50_000)
|
||||
expect(log.currency).toBe('IDR')
|
||||
expect(log.payment_method).toBe('BCA')
|
||||
expect(log.paid_at).not.toBeNull()
|
||||
expect(log.metadata_app).toBe('halobestie_v2')
|
||||
expect(log.http_status).toBe(200)
|
||||
expect(log.processing_result).toBe('confirmed')
|
||||
expect(log.processing_error).toBeNull()
|
||||
expect(log.processed_at).not.toBeNull()
|
||||
// raw_body is verbatim — unknown future fields would land here
|
||||
expect(log.raw_body.metadata.app).toBe('halobestie_v2')
|
||||
})
|
||||
|
||||
it('401 invalid token still logs a row with callback_token_valid=false', async () => {
|
||||
const res = await fireWebhook(
|
||||
app,
|
||||
{ id: 'inv_log_401', external_id: 'whatever', status: 'PAID', amount: 1 },
|
||||
'wrong-token-but-same-length-padding',
|
||||
)
|
||||
expect(res.statusCode).toBe(401)
|
||||
|
||||
const log = await latestLog()
|
||||
expect(log.callback_token_valid).toBe(false)
|
||||
expect(log.http_status).toBe(401)
|
||||
expect(log.processing_result).toBe('rejected_invalid_token')
|
||||
// Raw body still captured even though the request was rejected
|
||||
expect(log.raw_body.id).toBe('inv_log_401')
|
||||
})
|
||||
|
||||
it('redacts x-callback-token in stored headers', async () => {
|
||||
await fireWebhook(app, { id: 'inv_redact', external_id: 'x', status: 'PAID', amount: 1 })
|
||||
const log = await latestLog()
|
||||
expect(log.headers['x-callback-token']).toBe('[REDACTED]')
|
||||
// Non-secret headers are kept (e.g. content-type)
|
||||
expect(log.headers['content-type']).toMatch(/application\/json/)
|
||||
})
|
||||
|
||||
it('amount mismatch (409) logs rejected_amount_mismatch', async () => {
|
||||
const session = await requestPayment({
|
||||
productType: 'chat_session',
|
||||
customerId: customer.id, durationMinutes: 12, amount: 50_000,
|
||||
})
|
||||
const res = await fireWebhook(app, {
|
||||
id: 'inv_log_mismatch', external_id: session.id, status: 'PAID', amount: 999,
|
||||
})
|
||||
expect(res.statusCode).toBe(409)
|
||||
|
||||
const log = await latestLog()
|
||||
expect(log.http_status).toBe(409)
|
||||
expect(log.processing_result).toBe('rejected_amount_mismatch')
|
||||
expect(Number(log.amount)).toBe(999)
|
||||
})
|
||||
|
||||
it('unknown payment_request still logs ignored_unknown_payment_request', async () => {
|
||||
await fireWebhook(app, {
|
||||
id: 'inv_log_orphan',
|
||||
external_id: '00000000-0000-0000-0000-000000000000',
|
||||
status: 'PAID',
|
||||
amount: 50_000,
|
||||
})
|
||||
const log = await latestLog()
|
||||
expect(log.processing_result).toBe('ignored_unknown_payment_request')
|
||||
expect(log.payment_request_id).toBe('00000000-0000-0000-0000-000000000000')
|
||||
})
|
||||
|
||||
it('missing external_id logs ignored_no_external_id with payment_request_id NULL', async () => {
|
||||
await fireWebhook(app, { id: 'evt_nox', status: 'SOMETHING' })
|
||||
const log = await latestLog()
|
||||
expect(log.processing_result).toBe('ignored_no_external_id')
|
||||
expect(log.external_id).toBeNull()
|
||||
expect(log.payment_request_id).toBeNull()
|
||||
})
|
||||
|
||||
it('non-UUID external_id ACKs 200 and logs ignored_non_uuid_external_id', async () => {
|
||||
const res = await fireWebhook(app, {
|
||||
id: 'inv_log_legacy',
|
||||
external_id: 'legacy-old-app-id-42',
|
||||
status: 'PAID',
|
||||
amount: 1,
|
||||
})
|
||||
expect(res.statusCode).toBe(200)
|
||||
expect(res.json().ignored).toBe('non_uuid_external_id')
|
||||
|
||||
const log = await latestLog()
|
||||
expect(log.external_id).toBe('legacy-old-app-id-42')
|
||||
expect(log.payment_request_id).toBeNull()
|
||||
expect(log.processing_result).toBe('ignored_non_uuid_external_id')
|
||||
expect(log.http_status).toBe(200)
|
||||
})
|
||||
|
||||
it('logs raw_body verbatim including unknown fields', async () => {
|
||||
const session = await requestPayment({
|
||||
productType: 'chat_session',
|
||||
customerId: customer.id, durationMinutes: 12, amount: 50_000,
|
||||
})
|
||||
await fireWebhook(app, {
|
||||
id: 'inv_log_extra',
|
||||
external_id: session.id,
|
||||
status: 'PAID',
|
||||
amount: 50_000,
|
||||
future_field_we_dont_know_yet: { nested: ['anything', 42] },
|
||||
metadata: { app: 'halobestie_v2', extra: 'preserved' },
|
||||
})
|
||||
const log = await latestLog()
|
||||
expect(log.raw_body.future_field_we_dont_know_yet.nested).toEqual(['anything', 42])
|
||||
expect(log.raw_body.metadata.extra).toBe('preserved')
|
||||
expect(log.metadata_app).toBe('halobestie_v2')
|
||||
})
|
||||
|
||||
it('survives processing exceptions: logs processing_error + http_status=500', async () => {
|
||||
// Force an exception by stubbing getPayment to throw. The survival
|
||||
// contract is that even when the handler errors, the forensic row is
|
||||
// intact and stamped — Xendit will retry on its own, but we have the
|
||||
// record either way.
|
||||
const paymentSvc = await import('../../src/services/payment.service.js')
|
||||
const orig = paymentSvc.getPayment
|
||||
const stub = vi.spyOn(paymentSvc, 'getPayment').mockImplementation(async () => {
|
||||
throw new Error('synthetic_db_outage')
|
||||
})
|
||||
try {
|
||||
const res = await fireWebhook(app, {
|
||||
id: 'inv_boom',
|
||||
external_id: '11111111-1111-1111-1111-111111111111',
|
||||
status: 'PAID',
|
||||
amount: 50_000,
|
||||
})
|
||||
expect(res.statusCode).toBe(500)
|
||||
|
||||
const log = await latestLog()
|
||||
expect(log.http_status).toBe(500)
|
||||
expect(log.processing_result).toBe('error')
|
||||
expect(log.processing_error).toBe('synthetic_db_outage')
|
||||
expect(log.xendit_event_id).toBe('inv_boom')
|
||||
// Raw body is still there even though processing blew up
|
||||
expect(log.raw_body.id).toBe('inv_boom')
|
||||
} finally {
|
||||
stub.mockRestore()
|
||||
void orig
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -224,6 +224,27 @@ describe('mitra-status valkey mirror', () => {
|
||||
|
||||
expect(await isMitraReachable(m.id)).toBe(false)
|
||||
})
|
||||
|
||||
// Mirrors the autoOfflineStaleMitras "no-op when require_ping=false"
|
||||
// contract: read paths must not gate on heartbeat when sweep doesn't.
|
||||
it('returns true with stale heartbeat when require_ping=false', async () => {
|
||||
const sql = db()
|
||||
try {
|
||||
await sql`
|
||||
UPDATE app_config SET value=${sql.json({ value: false })}
|
||||
WHERE key='require_mitra_ping'
|
||||
`
|
||||
const m = await createMitra({ callName: 'NoPing', isOnline: true })
|
||||
await v().set(vkHeartbeatKey(m.id), new Date(Date.now() - 3_600_000).toISOString())
|
||||
|
||||
expect(await isMitraReachable(m.id)).toBe(true)
|
||||
} finally {
|
||||
await sql`
|
||||
UPDATE app_config SET value=${sql.json({ value: true })}
|
||||
WHERE key='require_mitra_ping'
|
||||
`
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
// ---------- recomputeCapacity ----------
|
||||
@@ -331,6 +352,35 @@ describe('mitra-status valkey mirror', () => {
|
||||
await invalidateAvailabilityCache()
|
||||
expect(await v().get('availability:snapshot')).toBeNull()
|
||||
})
|
||||
|
||||
// Regression: when an operator turns off the ping requirement, the
|
||||
// auto-offline sweep is also disabled, so heartbeats may legitimately
|
||||
// become arbitrarily old. The beacon must NOT filter those out — that
|
||||
// would put the CTA in a permanently-disabled state with no recovery
|
||||
// path (sweep won't remove the mitra; cache always re-computes false).
|
||||
it('includes mitras with stale heartbeats when require_ping=false', async () => {
|
||||
const sql = db()
|
||||
try {
|
||||
await sql`
|
||||
UPDATE app_config SET value=${sql.json({ value: false })}
|
||||
WHERE key='require_mitra_ping'
|
||||
`
|
||||
const m = await createMitra({ callName: 'NoPingRequired', isOnline: true })
|
||||
// Heartbeat 1 hour old — well past any reasonable stale_after_seconds.
|
||||
await v().set(vkHeartbeatKey(m.id), new Date(Date.now() - 3_600_000).toISOString())
|
||||
await v().del('availability:snapshot')
|
||||
|
||||
const result = await countAvailableMitrasFromCache()
|
||||
expect(result.available).toBe(true)
|
||||
expect(result.count).toBe(1)
|
||||
} finally {
|
||||
await sql`
|
||||
UPDATE app_config SET value=${sql.json({ value: true })}
|
||||
WHERE key='require_mitra_ping'
|
||||
`
|
||||
await v().del('availability:snapshot')
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
// ---------- autoOfflineStaleMitras ----------
|
||||
|
||||
184
backend/test/services/webhook-log.service.test.js
Normal file
184
backend/test/services/webhook-log.service.test.js
Normal file
@@ -0,0 +1,184 @@
|
||||
import { describe, it, expect, afterEach, beforeEach } from 'vitest'
|
||||
import { mkdtemp, readFile, rm, readdir } from 'node:fs/promises'
|
||||
import { tmpdir } from 'node:os'
|
||||
import path from 'node:path'
|
||||
|
||||
const {
|
||||
rollingFallbackFilename,
|
||||
getWebhookFallbackConfig,
|
||||
writeWebhookFallback,
|
||||
sanitizeHeaders,
|
||||
} = await import('../../src/services/webhook-log.service.js')
|
||||
|
||||
describe('webhook-log.service helpers', () => {
|
||||
describe('sanitizeHeaders', () => {
|
||||
it('redacts secret-bearing headers but keeps their names', () => {
|
||||
const out = sanitizeHeaders({
|
||||
'x-callback-token': 'shhhh',
|
||||
'Authorization': 'Bearer abc',
|
||||
'cookie': 'sid=xyz',
|
||||
'content-type': 'application/json',
|
||||
'x-request-id': 'req-1',
|
||||
})
|
||||
expect(out['x-callback-token']).toBe('[REDACTED]')
|
||||
expect(out['authorization']).toBe('[REDACTED]')
|
||||
expect(out['cookie']).toBe('[REDACTED]')
|
||||
expect(out['content-type']).toBe('application/json')
|
||||
expect(out['x-request-id']).toBe('req-1')
|
||||
})
|
||||
|
||||
it('lowercases keys so lookups are consistent', () => {
|
||||
const out = sanitizeHeaders({ 'X-Callback-Token': 't', 'Content-Type': 'app/json' })
|
||||
expect(out['x-callback-token']).toBe('[REDACTED]')
|
||||
expect(out['content-type']).toBe('app/json')
|
||||
expect(out['X-Callback-Token']).toBeUndefined()
|
||||
})
|
||||
})
|
||||
|
||||
describe('rollingFallbackFilename', () => {
|
||||
it('produces "<name>-YYYY-MM-DD.jsonl" using UTC day boundary', () => {
|
||||
const d = new Date(Date.UTC(2026, 4, 25, 10, 30))
|
||||
expect(rollingFallbackFilename('xendit', d)).toBe('xendit-2026-05-25.jsonl')
|
||||
})
|
||||
|
||||
it('zero-pads single-digit months and days', () => {
|
||||
const d = new Date(Date.UTC(2026, 0, 3, 0, 0))
|
||||
expect(rollingFallbackFilename('wh', d)).toBe('wh-2026-01-03.jsonl')
|
||||
})
|
||||
|
||||
it('a date right before UTC midnight stays on the current day', () => {
|
||||
const d = new Date(Date.UTC(2026, 4, 25, 23, 59, 59))
|
||||
expect(rollingFallbackFilename('x', d)).toBe('x-2026-05-25.jsonl')
|
||||
})
|
||||
})
|
||||
|
||||
describe('getWebhookFallbackConfig', () => {
|
||||
const originalEnv = { ...process.env }
|
||||
afterEach(() => {
|
||||
// Restore only the vars this suite touches — leaving the rest alone.
|
||||
for (const k of ['XENDIT_WEBHOOK_FALLBACK_ENABLED', 'XENDIT_WEBHOOK_FALLBACK_DIR', 'XENDIT_WEBHOOK_FALLBACK_NAME']) {
|
||||
if (originalEnv[k] === undefined) delete process.env[k]
|
||||
else process.env[k] = originalEnv[k]
|
||||
}
|
||||
})
|
||||
|
||||
it('defaults to disabled with sensible dir/name', () => {
|
||||
delete process.env.XENDIT_WEBHOOK_FALLBACK_ENABLED
|
||||
delete process.env.XENDIT_WEBHOOK_FALLBACK_DIR
|
||||
delete process.env.XENDIT_WEBHOOK_FALLBACK_NAME
|
||||
expect(getWebhookFallbackConfig()).toEqual({
|
||||
enabled: false,
|
||||
dir: './logs',
|
||||
name: 'xendit-webhook-fallback',
|
||||
})
|
||||
})
|
||||
|
||||
it('only the literal string "true" enables the sink', () => {
|
||||
process.env.XENDIT_WEBHOOK_FALLBACK_ENABLED = '1'
|
||||
expect(getWebhookFallbackConfig().enabled).toBe(false)
|
||||
process.env.XENDIT_WEBHOOK_FALLBACK_ENABLED = 'yes'
|
||||
expect(getWebhookFallbackConfig().enabled).toBe(false)
|
||||
process.env.XENDIT_WEBHOOK_FALLBACK_ENABLED = 'true'
|
||||
expect(getWebhookFallbackConfig().enabled).toBe(true)
|
||||
})
|
||||
|
||||
it('honors custom dir/name', () => {
|
||||
process.env.XENDIT_WEBHOOK_FALLBACK_ENABLED = 'true'
|
||||
process.env.XENDIT_WEBHOOK_FALLBACK_DIR = '/var/log/webhooks'
|
||||
process.env.XENDIT_WEBHOOK_FALLBACK_NAME = 'xendit-prod'
|
||||
expect(getWebhookFallbackConfig()).toEqual({
|
||||
enabled: true,
|
||||
dir: '/var/log/webhooks',
|
||||
name: 'xendit-prod',
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('writeWebhookFallback', () => {
|
||||
let workDir
|
||||
const originalEnv = { ...process.env }
|
||||
|
||||
beforeEach(async () => {
|
||||
workDir = await mkdtemp(path.join(tmpdir(), 'webhook-fallback-'))
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
for (const k of ['XENDIT_WEBHOOK_FALLBACK_ENABLED', 'XENDIT_WEBHOOK_FALLBACK_DIR', 'XENDIT_WEBHOOK_FALLBACK_NAME']) {
|
||||
if (originalEnv[k] === undefined) delete process.env[k]
|
||||
else process.env[k] = originalEnv[k]
|
||||
}
|
||||
await rm(workDir, { recursive: true, force: true })
|
||||
})
|
||||
|
||||
it('returns false and writes nothing when disabled', async () => {
|
||||
delete process.env.XENDIT_WEBHOOK_FALLBACK_ENABLED
|
||||
const ok = await writeWebhookFallback({
|
||||
provider: 'xendit',
|
||||
headers: { 'content-type': 'application/json' },
|
||||
rawBody: { id: 'inv_x' },
|
||||
callbackTokenValid: true,
|
||||
})
|
||||
expect(ok).toBe(false)
|
||||
expect(await readdir(workDir)).toEqual([])
|
||||
})
|
||||
|
||||
it('appends one JSONL line per call to the rolling daily file', async () => {
|
||||
process.env.XENDIT_WEBHOOK_FALLBACK_ENABLED = 'true'
|
||||
process.env.XENDIT_WEBHOOK_FALLBACK_DIR = workDir
|
||||
process.env.XENDIT_WEBHOOK_FALLBACK_NAME = 'test-fallback'
|
||||
|
||||
await writeWebhookFallback({
|
||||
provider: 'xendit',
|
||||
headers: { 'content-type': 'application/json', 'x-callback-token': '[REDACTED]' },
|
||||
rawBody: { id: 'inv_a', external_id: 'pr-1', status: 'PAID' },
|
||||
callbackTokenValid: true,
|
||||
dbErrorMessage: 'connection refused',
|
||||
})
|
||||
await writeWebhookFallback({
|
||||
provider: 'xendit',
|
||||
headers: {},
|
||||
rawBody: { id: 'inv_b', status: 'EXPIRED' },
|
||||
callbackTokenValid: false,
|
||||
})
|
||||
|
||||
const files = await readdir(workDir)
|
||||
expect(files).toHaveLength(1)
|
||||
expect(files[0]).toMatch(/^test-fallback-\d{4}-\d{2}-\d{2}\.jsonl$/)
|
||||
|
||||
const contents = await readFile(path.join(workDir, files[0]), 'utf8')
|
||||
const lines = contents.trim().split('\n')
|
||||
expect(lines).toHaveLength(2)
|
||||
|
||||
const first = JSON.parse(lines[0])
|
||||
expect(first.provider).toBe('xendit')
|
||||
expect(first.callback_token_valid).toBe(true)
|
||||
expect(first.raw_body.id).toBe('inv_a')
|
||||
expect(first.raw_body.external_id).toBe('pr-1')
|
||||
expect(first.db_error).toBe('connection refused')
|
||||
expect(first.headers['x-callback-token']).toBe('[REDACTED]')
|
||||
expect(first.received_at).toMatch(/^\d{4}-\d{2}-\d{2}T/)
|
||||
|
||||
const second = JSON.parse(lines[1])
|
||||
expect(second.raw_body.id).toBe('inv_b')
|
||||
expect(second.callback_token_valid).toBe(false)
|
||||
expect(second.db_error).toBeNull()
|
||||
})
|
||||
|
||||
it('creates the destination directory if it does not exist', async () => {
|
||||
process.env.XENDIT_WEBHOOK_FALLBACK_ENABLED = 'true'
|
||||
const nested = path.join(workDir, 'does', 'not', 'exist', 'yet')
|
||||
process.env.XENDIT_WEBHOOK_FALLBACK_DIR = nested
|
||||
process.env.XENDIT_WEBHOOK_FALLBACK_NAME = 'nested'
|
||||
|
||||
const ok = await writeWebhookFallback({
|
||||
provider: 'xendit',
|
||||
headers: {},
|
||||
rawBody: { id: 'inv_n' },
|
||||
callbackTokenValid: true,
|
||||
})
|
||||
expect(ok).toBe(true)
|
||||
const files = await readdir(nested)
|
||||
expect(files).toHaveLength(1)
|
||||
})
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user