Belt-and-suspenders, not a bug fix: storage (timestamptz) and timer math are already tz-independent. Add SERVER_TZ env (default UTC) via getServerTimezone(); db/client.js pins the DB session timezone (reads env directly to avoid an import cycle); server.js pins process.env.TZ and asserts at boot that the DB session matches (logs [tz] or a loud warning). Keeps any future date_trunc/::date reporting deterministic and surfaces a misconfigured server early. Documented in backend/CLAUDE.md + .env.example. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
156 lines
6.8 KiB
JavaScript
156 lines
6.8 KiB
JavaScript
import 'dotenv/config'
|
|
|
|
import { buildPublicApp } from './app.public.js'
|
|
import { buildInternalApp } from './app.internal.js'
|
|
import { autoOfflineStaleMitras, seedFromPostgres, mirrorHeartbeatsToPostgres } from './services/mitra-status.service.js'
|
|
import {
|
|
getMitraAutoOfflineSweepSeconds,
|
|
getHeartbeatMirrorIntervalSeconds,
|
|
getValkeyOnlineMirrorSweepSeconds,
|
|
} from './services/config.service.js'
|
|
import { initFirebase } from './plugins/firebase.js'
|
|
import { restoreActiveTimers } from './services/session-timer.service.js'
|
|
import { expireStalePaymentRequests, registerPairingSubscriber } from './services/payment.service.js'
|
|
import { getXenditConfig, getServerTimezone } from './services/config.service.js'
|
|
import { getDb } from './db/client.js'
|
|
|
|
// Listener configuration. Because the lookup uses `||`, an env var that is
// unset OR set to the empty string falls back to the default; values that do
// come from the environment are kept as strings.
const envOr = (name, fallback) => process.env[name] || fallback

const PUBLIC_PORT = envOr('PUBLIC_PORT', 3000)
const INTERNAL_PORT = envOr('INTERNAL_PORT', 3001)
const INTERNAL_HOST = envOr('INTERNAL_HOST', '127.0.0.1')
|
|
|
|
// Handles created by start() that shutdown() must stop. They live at module
// scope because the signal handlers are registered outside start(); without
// them shutdown() has no way to reach the listeners or the sweep timers.
const runningApps = []
const sweepTimers = []
let shuttingDown = false

// Drain window before process.exit. Cloud Run gives ~10s grace before SIGKILL,
// so 8s leaves headroom for the exit itself.
const SHUTDOWN_DRAIN_MS = 8_000

// Timezone assurance. Storage (timestamptz) and our instant-based timer math
// are timezone-independent, so this is belt-and-suspenders, not a fix for a
// live bug: it pins the Node process timezone for any Date formatting and
// then asserts the DB session timezone matches, so a misconfigured server/DB
// surfaces loudly at boot instead of silently skewing future date_trunc /
// ::date style queries. Defaults to UTC; override via SERVER_TZ.
// Only warns on mismatch; a failing `SHOW timezone` query rejects and aborts boot.
const pinAndCheckTimezone = async () => {
  const serverTz = getServerTimezone()
  process.env.TZ = serverTz
  const [dbTz] = await getDb()`SHOW timezone`
  if (dbTz?.TimeZone !== serverTz) {
    console.warn(
      `[tz] WARNING: DB session timezone is "${dbTz?.TimeZone}" but SERVER_TZ="${serverTz}". ` +
        'timestamptz storage is unaffected, but session-tz-dependent SQL may skew. ' +
        'Check the DB server default / connection options.'
    )
  } else {
    console.log(`[tz] process + DB session pinned to ${serverTz}`)
  }
}

// Phase 5: fail fast if XENDIT_ENABLED=true without the required credentials.
// Bad config explodes at startup rather than at the first /payment-requests POST.
// @throws {Error} when Xendit is enabled but the secret key is missing or the
//   webhook token is missing / shorter than 16 characters.
const assertXenditConfig = () => {
  const xc = getXenditConfig()
  if (!xc.enabled) return
  if (!xc.secretKey) throw new Error('XENDIT_ENABLED=true requires XENDIT_SECRET_KEY')
  if (!xc.webhookToken || xc.webhookToken.length < 16) {
    throw new Error('XENDIT_ENABLED=true requires XENDIT_WEBHOOK_TOKEN (>= 16 chars)')
  }
}

// Run one payment_requests sweep and log a one-line summary when it changed
// anything. Errors propagate so each caller can label its own failure log.
// @param {string} summaryPrefix - label for the summary line.
const logPaymentSweep = async (summaryPrefix) => {
  const result = await expireStalePaymentRequests()
  if (result.expired > 0 || result.failed > 0 || result.reconciled > 0) {
    console.log(`${summaryPrefix}: ${result.expired} expired, ${result.failed} failed_delivery, ${result.reconciled} re-triggered`)
  }
}

// Start a repeating background task. A failed tick is logged (never an
// unhandled rejection) and the next tick still runs. The timer is recorded so
// shutdown() can stop it; nothing is scheduled once shutdown has begun.
// @param {number} intervalMs - setInterval period in milliseconds.
// @param {string} failureMessage - console.error label for a failed tick.
// @param {() => Promise<unknown>} task - work to run on every tick.
const scheduleSweep = (intervalMs, failureMessage, task) => {
  if (shuttingDown) return
  const timer = setInterval(async () => {
    try {
      await task()
    } catch (err) {
      console.error(failureMessage, err)
    }
  }, intervalMs)
  sweepTimers.push(timer)
}

// Boot sequence: validate environment, build and start both HTTP apps, run
// startup recovery, then schedule the background sweepers.
const start = async () => {
  await pinAndCheckTimezone()
  assertXenditConfig()

  initFirebase()

  const publicApp = await buildPublicApp()
  const internalApp = await buildInternalApp()

  // restoreActiveTimers runs bulk UPDATEs on chat_sessions to clean up stale
  // ACTIVE/CLOSING rows from before the restart. Run it BEFORE seedFromPostgres
  // so the seed sees the post-cleanup state and capacity counters are accurate.
  await restoreActiveTimers()
  await seedFromPostgres()

  // Each app is registered for shutdown only once it is actually listening.
  await publicApp.listen({ port: PUBLIC_PORT, host: '0.0.0.0' })
  runningApps.push(publicApp)
  console.log(`Public API listening on port ${PUBLIC_PORT}`)

  await internalApp.listen({ port: INTERNAL_PORT, host: INTERNAL_HOST })
  runningApps.push(internalApp)
  console.log(`Internal API listening on ${INTERNAL_HOST}:${INTERNAL_PORT}`)

  // Phase 5: wire pairing service as a subscriber to payment_request.confirmed events.
  // Must happen AFTER all services are loaded so the subscriber registration sees
  // the EventEmitter set up by payment.service.js at module-load time.
  registerPairingSubscriber()

  // Phase 5: catch any payment_request.confirmed events that were lost across a restart
  // by running the reconciliation sweeper immediately on boot. Without this, a customer
  // whose payment confirmed during shutdown could be stranded for up to 60s waiting on
  // the next sweeper tick. Best-effort: a failure here is logged and boot continues.
  try {
    await logPaymentSweep('Startup reconciliation')
  } catch (err) {
    console.error('Startup reconciliation failed:', err)
  }

  // Auto-offline mitras with stale heartbeat (env-driven cadence, default 30s).
  // Valkey-driven per requirement/valkey-online-mirror-plan.md.
  scheduleSweep(getMitraAutoOfflineSweepSeconds() * 1000, 'Auto-offline check failed:', async () => {
    const count = await autoOfflineStaleMitras()
    if (count > 0) console.log(`Auto-offlined ${count} stale mitra(s)`)
  })

  // Batched heartbeat mirror: Valkey heartbeat timestamps → Postgres
  // last_heartbeat_at (default 60s). Keeps forensic column current without
  // per-ping DB writes. One UNNEST UPDATE per tick; idempotent across instances.
  scheduleSweep(getHeartbeatMirrorIntervalSeconds() * 1000, 'Heartbeat mirror failed:', () => mirrorHeartbeatsToPostgres())

  // Reconciliation sweep: heal Valkey/Postgres drift (default 300s; 0 disables).
  // Belt-and-braces against failed best-effort Valkey writes, out-of-band
  // Postgres mutations, evictions. Idempotent — just runs the seed.
  const reconciliationSeconds = getValkeyOnlineMirrorSweepSeconds()
  if (reconciliationSeconds > 0) {
    scheduleSweep(reconciliationSeconds * 1000, 'Valkey reconciliation sweep failed:', () => seedFromPostgres())
  }

  // Expire stale payment_requests + reconcile lost subscriber work (every 60s).
  // Pending past expires_at → expired (no failure row).
  // Confirmed-but-stale → failed_delivery (writes a pairing_failures row).
  // Confirmed-with-no-chat-session-yet → re-trigger the pairing subscriber (recovery from
  // lost EventEmitter notifications across restart). See payment.service.js for details.
  scheduleSweep(60_000, 'Payment request sweeper failed:', () => logPaymentSweep('Payment sweeper'))
}

// Signal trap. Cloud Run gives ~10s grace before SIGKILL; use it to drain in-flight
// EventEmitter handlers (Stage 5 of phase5-xendit-plan.md). close() stops accepting
// new requests and the fixed drain gives subscribers their last chance to finish.
// @param {string} [signal] - signal name; Node passes it as the listener's first argument.
const shutdown = async (signal = 'SIGTERM') => {
  // A second signal (Ctrl-C twice, SIGINT then SIGTERM) must not start another drain.
  if (shuttingDown) return
  shuttingDown = true
  console.log(`${signal} received — closing servers, draining handlers`)

  // Stop background sweeps so no new DB/Valkey work starts only to be cut off
  // by process.exit below.
  for (const timer of sweepTimers) clearInterval(timer)

  // Fire-and-forget close: it may wait on in-flight requests, but the fixed
  // drain below bounds total shutdown time and process.exit truncates anything
  // still pending. Promise.resolve().then() also tolerates a synchronous close().
  for (const app of runningApps) {
    Promise.resolve()
      .then(() => app.close())
      .catch((err) => console.error('Server close failed:', err))
  }

  await new Promise((resolve) => setTimeout(resolve, SHUTDOWN_DRAIN_MS))
  process.exit(0)
}

process.on('SIGTERM', shutdown)
process.on('SIGINT', shutdown)
|
|
|
|
// Boot entry point. Any rejection from start() (failed timezone query, invalid
// Xendit config, listen failure, ...) is printed and the process exits with
// status 1 instead of lingering half-initialised.
void (async () => {
  try {
    await start()
  } catch (bootError) {
    console.error(bootError)
    process.exit(1)
  }
})()
|