// Phase 5: Payment service — single owner of the payment_requests table + Xendit integration. // // Public API surface (the future microservice contract): // // requestPayment({ productType, productMetadata, customerId, amount, ttlMinutes, ... }) // → inserts row, optionally creates Xendit invoice, returns row (with invoice_url if Xendit on) // // confirmPayment(paymentRequestId, xenditMeta = {}) // → pending → confirmed; emits 'payment_request.confirmed' // // expirePayment(paymentRequestId) // → pending → expired (webhook-callable, no customer check); emits 'payment_request.expired' // // cancelPayment(paymentRequestId, customerId) // → customer-initiated pending → abandoned; emits 'payment_request.cancelled' // // markDeliveryFailed(paymentRequestId, causeTag) // → confirmed → failed_delivery; writes pairing_failures row; emits 'payment_request.delivery_failed' // // consumePayment(paymentRequestId) // → confirmed → consumed; no event (terminal success) // // getPayment(id) → row or null // getCustomerPendingPayments(customerId) → { items, total } // expireStalePaymentRequests() → background sweeper + reconciliation // // on(eventName, handler) → subscribe to lifecycle events // verifyWebhookToken(headerToken) → constant-time compare for webhook auth (used by route) // // registerPairingSubscriber() → wires pairing.service as a subscriber to payment_request.confirmed // recordIntermediateFailure(...) → audit-only failure for flows with a fallback path // // Internals (NOT exported): // createXenditInvoice() — wraps xendit-node SDK // emit() / emitter — EventEmitter setup // // Events emit AFTER the DB transition commits. Subscribers run on the next tick // (handlers wrapped fire-and-forget) so the publisher is never blocked. // // Durability story: events are in-process EventEmitter; lost on process death. // The reconciliation sweeper (expireStalePaymentRequests) re-derives missed work // from DB state every minute + on startup. Subscribers MUST be idempotent. // See requirement/phase5-xendit-plan.md "Event durability" section. import { EventEmitter } from 'node:events' import { Xendit } from 'xendit-node' import { getDb } from '../db/client.js' import { PaymentRequestStatus, PairingFailureCause, UserType, WsMessage, SessionMode } from '../constants.js' import { recordFailure } from './pairing-failure.service.js' import { sendToUser } from '../plugins/websocket.js' import { sendPushNotification } from './notification.service.js' import { getPaymentRequestTimeoutMinutes as readPaymentRequestTimeoutMinutes, getXenditConfig, } from './config.service.js' const sql = getDb() // --- EventEmitter setup --- const emitter = new EventEmitter() // Bump default 10-listener cap so future product subscribers don't trigger the leak warning emitter.setMaxListeners(50) export const on = (eventName, handler) => { emitter.on(eventName, (payload) => { // Wrap every handler so an unhandled throw doesn't kill the process AND so handlers // run async-fire-and-forget (don't block the publisher's emit() return). Errors are // logged; recovery is the sweeper's job. Promise.resolve() .then(() => handler(payload)) .catch((err) => console.error(`[payment event ${eventName}] handler failed`, err)) }) } const emit = (eventName, payload) => emitter.emit(eventName, payload) // --- Internal Xendit client (lazy + re-creatable for test env stubbing) --- let _xenditClient = null let _xenditKey = null const xenditClient = () => { const { secretKey } = getXenditConfig() if (_xenditClient && _xenditKey === secretKey) return _xenditClient _xenditClient = new Xendit({ secretKey }) _xenditKey = secretKey return _xenditClient } const createXenditInvoice = async ({ paymentRequestId, amount, ttlMinutes, description }) => { const { successRedirectUrl, failureRedirectUrl } = getXenditConfig() const inv = await xenditClient().Invoice.createInvoice({ data: { externalId: paymentRequestId, // D4 — our UUID is the Xendit external_id amount, description, invoiceDuration: Math.floor(ttlMinutes * 60), // D5 — TTL mirrors session timeout currency: 'IDR', successRedirectUrl: successRedirectUrl || undefined, failureRedirectUrl: failureRedirectUrl || undefined, // Stamped so a shared webhook router (no DB access) can route v1 vs v2 traffic // purely from the echoed payload. Keep this string stable — it is a routing key. metadata: { app: 'halobestie_v2' }, // paymentMethods omitted → honor dashboard config (operator picks methods without a deploy) }, }) return { invoiceId: inv.id, invoiceUrl: inv.invoiceUrl } } // Used by the webhook route to authenticate Xendit's x-callback-token header. export const verifyWebhookToken = (headerToken) => { const { webhookToken } = getXenditConfig() if (!headerToken || !webhookToken) return false if (typeof headerToken !== 'string') return false if (headerToken.length !== webhookToken.length) return false let mismatch = 0 for (let i = 0; i < headerToken.length; i++) { mismatch |= headerToken.charCodeAt(i) ^ webhookToken.charCodeAt(i) } return mismatch === 0 } // Test-only — drop cached client so vi.stubEnv changes take effect. export const _resetXenditClientForTest = () => { _xenditClient = null _xenditKey = null } // --- Helpers --- const getPaymentRequestTimeoutMinutes = async () => { const { payment_request_timeout_minutes } = await readPaymentRequestTimeoutMinutes() return payment_request_timeout_minutes } const buildEventPayload = (row) => ({ paymentRequestId: row.id, productType: row.product_type ?? 'chat_session', productMetadata: row.product_metadata ?? {}, customerId: row.customer_id, amount: row.amount, xenditInvoiceId: row.xendit_invoice_id ?? null, xenditPaymentMethod: row.xendit_payment_method ?? null, }) const buildInvoiceDescription = (row) => { if (row.product_type === 'chat_session') { return row.is_extension ? `Perpanjangan sesi ${row.duration_minutes} menit` : `Sesi ${row.duration_minutes} menit` } // Generic fallback — future products can build their own descriptions and pass via productMetadata return row.product_metadata?.description ?? `Pembayaran ${row.product_type}` } // --- requestPayment: insert pending row + (if Xendit on) mint invoice --- /** * Create a new payment request in `pending` status. When XENDIT_ENABLED=true, also * creates a Xendit Invoice and stamps invoice_id + invoice_url on the row. * * Product-agnostic: callers stamp `productType` + `productMetadata`. The legacy * top-level chat-specific args (durationMinutes, mode, isExtension, targetedMitraId, * isFirstSessionDiscount) are accepted for backward compat with existing chat code, * and also written into product_metadata when productType === 'chat_session'. */ export const requestPayment = async ({ productType = 'chat_session', productMetadata = {}, customerId, amount, ttlMinutes, // Chat-specific legacy fields (still written to top-level columns for now) durationMinutes, mode = SessionMode.CHAT, isFirstSessionDiscount = false, isExtension = false, targetedMitraId = null, // Customer's pre-picked payment method from the catalog. Optional; // upper-cased Xendit channel code (e.g. `OVO`). Stamped onto // product_metadata for analytics + future use as a Xendit `paymentMethods` // filter. Not currently passed to Xendit invoice creation — the customer // re-picks on Xendit's checkout page. preferredPaymentCode = null, }) => { if (!customerId) { throw Object.assign(new Error('customerId is required'), { code: 'VALIDATION_ERROR', statusCode: 422 }) } if (typeof amount !== 'number' || amount < 0) { throw Object.assign(new Error('amount must be a non-negative number'), { code: 'VALIDATION_ERROR', statusCode: 422 }) } if (productType === 'chat_session') { if (typeof durationMinutes !== 'number' || durationMinutes <= 0) { throw Object.assign(new Error('durationMinutes must be a positive number for chat_session'), { code: 'VALIDATION_ERROR', statusCode: 422 }) } if (mode !== SessionMode.CHAT && mode !== SessionMode.CALL) { throw Object.assign(new Error('mode must be chat or call'), { code: 'VALIDATION_ERROR', statusCode: 422 }) } } const ttl = ttlMinutes ?? await getPaymentRequestTimeoutMinutes() // For chat_session, fold legacy args into product_metadata so the canonical // location is the JSONB blob. Subscribers read from product_metadata. const meta = productType === 'chat_session' ? { duration_minutes: durationMinutes, mode, is_extension: isExtension, targeted_mitra_id: targetedMitraId, is_first_session_discount: isFirstSessionDiscount, preferred_payment_code: preferredPaymentCode, ...productMetadata, } : { preferred_payment_code: preferredPaymentCode, ...productMetadata } const [row] = await sql` INSERT INTO payment_requests ( customer_id, amount, duration_minutes, is_first_session_discount, is_extension, status, targeted_mitra_id, mode, expires_at, product_type, product_metadata ) VALUES ( ${customerId}, ${amount}, ${durationMinutes ?? 0}, ${isFirstSessionDiscount}, ${isExtension}, ${PaymentRequestStatus.PENDING}, ${targetedMitraId}, ${mode}, NOW() + (${ttl} || ' minutes')::interval, ${productType}, ${sql.json(meta)} ) RETURNING * ` // If Xendit is on, create the invoice + stamp the row. If creation fails, mark // the row as `failed` (NOT `expired` — distinct: "we never got an invoice at all" // vs. "TTL elapsed unpaid") and surface a 502 to the caller. const xc = getXenditConfig() if (xc.enabled) { try { const { invoiceId, invoiceUrl } = await createXenditInvoice({ paymentRequestId: row.id, amount: row.amount, ttlMinutes: ttl, description: buildInvoiceDescription(row), }) await sql` UPDATE payment_requests SET xendit_invoice_id = ${invoiceId}, xendit_invoice_url = ${invoiceUrl} WHERE id = ${row.id} ` row.xendit_invoice_id = invoiceId row.xendit_invoice_url = invoiceUrl } catch (err) { console.error('[xendit] createInvoice failed; marking payment_request failed', { paymentRequestId: row.id, err }) const [failed] = await sql` UPDATE payment_requests SET status = ${PaymentRequestStatus.FAILED} WHERE id = ${row.id} AND status = ${PaymentRequestStatus.PENDING} RETURNING * ` if (failed) emit('payment_request.failed', buildEventPayload(failed)) throw Object.assign(new Error('Payment provider error'), { code: 'PAYMENT_PROVIDER_ERROR', statusCode: 502, cause: err, }) } } return row } // --- confirmPayment --- /** * Transition pending → confirmed. Customer-facing callers (the legacy * /payment-requests/:id/confirm route) verify customer ownership themselves before * calling. Webhook caller does not check ownership (Xendit's authority). * * Optional xenditMeta stamps payment-time data from the webhook body: * { invoiceId, paymentMethod, amount } * * Throws on missing row / wrong status / expiry. Webhook handler swallows * INVALID_STATE (already confirmed) and EXPIRED (raced sweeper) and ACKs. * * Emits 'payment_request.confirmed' after commit. */ export const confirmPayment = async (paymentRequestId, xenditMeta = {}) => { const [existing] = await sql` SELECT id, customer_id, status, expires_at FROM payment_requests WHERE id = ${paymentRequestId} ` if (!existing) { throw Object.assign(new Error('Payment request not found'), { code: 'NOT_FOUND', statusCode: 404 }) } if (existing.status !== PaymentRequestStatus.PENDING) { throw Object.assign(new Error(`Payment request is ${existing.status}, cannot confirm`), { code: 'INVALID_STATE', statusCode: 409, }) } if (new Date(existing.expires_at) <= new Date()) { // Inline expiry check (sweeper hasn't run yet) await sql` UPDATE payment_requests SET status = ${PaymentRequestStatus.EXPIRED} WHERE id = ${paymentRequestId} AND status = ${PaymentRequestStatus.PENDING} ` throw Object.assign(new Error('Payment request has expired'), { code: 'EXPIRED', statusCode: 409 }) } const [updated] = await sql` UPDATE payment_requests SET status = ${PaymentRequestStatus.CONFIRMED}, confirmed_at = NOW(), xendit_invoice_id = COALESCE(${xenditMeta.invoiceId ?? null}, xendit_invoice_id), xendit_payment_method = COALESCE(${xenditMeta.paymentMethod ?? null}, xendit_payment_method), xendit_paid_amount = COALESCE(${xenditMeta.amount ?? null}, xendit_paid_amount) WHERE id = ${paymentRequestId} AND status = ${PaymentRequestStatus.PENDING} RETURNING * ` if (!updated) { throw Object.assign(new Error('Payment request state changed during confirm'), { code: 'CONFLICT', statusCode: 409 }) } emit('payment_request.confirmed', buildEventPayload(updated)) return updated } // Customer-facing wrapper used by the legacy /payment-requests/:id/confirm route // (kept only for dev/Maestro — production gates the route on XENDIT_ENABLED). // Verifies customer ownership before delegating to the internal confirmPayment. export const confirmPaymentForCustomer = async (paymentRequestId, customerId) => { const [existing] = await sql`SELECT customer_id FROM payment_requests WHERE id = ${paymentRequestId}` if (!existing) { throw Object.assign(new Error('Payment request not found'), { code: 'NOT_FOUND', statusCode: 404 }) } if (existing.customer_id !== customerId) { throw Object.assign(new Error('Payment request does not belong to this customer'), { code: 'FORBIDDEN', statusCode: 403 }) } return confirmPayment(paymentRequestId) } // --- expirePayment (webhook-callable; no ownership check) --- /** * Transition pending → expired. Called by the Xendit EXPIRED webhook handler * and by the background sweeper. Idempotent — if already terminal, no-op. * Emits 'payment_request.expired' if a transition occurred. */ export const expirePayment = async (paymentRequestId) => { const [updated] = await sql` UPDATE payment_requests SET status = ${PaymentRequestStatus.EXPIRED} WHERE id = ${paymentRequestId} AND status = ${PaymentRequestStatus.PENDING} RETURNING * ` if (updated) emit('payment_request.expired', buildEventPayload(updated)) return updated ?? null } // --- consumePayment --- /** * Transition confirmed → consumed. Called by product code (pairing service for chat) * after successful delivery. No event — terminal success. */ export const consumePayment = async (paymentRequestId) => { const [updated] = await sql` UPDATE payment_requests SET status = ${PaymentRequestStatus.CONSUMED}, consumed_at = NOW() WHERE id = ${paymentRequestId} AND status = ${PaymentRequestStatus.CONFIRMED} RETURNING id, status, consumed_at ` return updated ?? null } // --- markDeliveryFailed --- /** * TERMINAL: mark a confirmed payment as failed_delivery + write a pairing_failures * audit row. Idempotent: no-op if not currently confirmed. * * Use only when no fallback is possible (no mitra available, all rejected, etc.). * For intermediate failures with a fallback (targeted reject during returning-chat), * use recordIntermediateFailure() which keeps the payment confirmed. * * Emits 'payment_request.delivery_failed' on transition. */ export const markDeliveryFailed = async (paymentRequestId, causeTag) => { if (!Object.values(PairingFailureCause).includes(causeTag)) { throw Object.assign(new Error(`Unknown cause_tag: ${causeTag}`), { code: 'VALIDATION_ERROR', statusCode: 422 }) } const [existing] = await sql` SELECT id, customer_id, targeted_mitra_id, amount, status, product_type, product_metadata, xendit_invoice_id, xendit_payment_method FROM payment_requests WHERE id = ${paymentRequestId} ` if (!existing) return null if (existing.status !== PaymentRequestStatus.CONFIRMED) return existing // idempotent const [updated] = await sql` UPDATE payment_requests SET status = ${PaymentRequestStatus.FAILED_DELIVERY} WHERE id = ${paymentRequestId} AND status = ${PaymentRequestStatus.CONFIRMED} RETURNING * ` if (!updated) return existing await recordFailure({ paymentRequestId, customerId: existing.customer_id, targetedMitraId: existing.targeted_mitra_id, causeTag, amount: existing.amount, }) emit('payment_request.delivery_failed', { ...buildEventPayload(updated), causeTag }) return updated } // Backward-compat alias — pairing.service still calls failPaymentSession by name. // TODO follow-up phase: rename call sites and drop this alias. export const failPaymentSession = markDeliveryFailed // --- recordIntermediateFailure (audit only; doesn't terminate) --- /** * INTERMEDIATE: write a pairing_failures audit row WITHOUT terminating the payment. * Used inside flows with a fallback path (targeted "Curhat lagi" reject can fall back * to general blast on the same payment). One payment_request may have many audit rows. * * Returns the inserted pairing_failures row, or null if the payment is missing. */ export const recordIntermediateFailure = async ({ paymentRequestId, customerId, targetedMitraId = null, causeTag, amount, }) => { if (!Object.values(PairingFailureCause).includes(causeTag)) { throw Object.assign(new Error(`Unknown cause_tag: ${causeTag}`), { code: 'VALIDATION_ERROR', statusCode: 422 }) } const [existing] = await sql`SELECT id FROM payment_requests WHERE id = ${paymentRequestId}` if (!existing) return null return recordFailure({ paymentRequestId, customerId, targetedMitraId, causeTag, amount }) } // --- cancelPayment (customer-initiated abandonment) --- /** * Customer-initiated abandonment of a pending payment (closed payment screen). * No pairing_failures row (no money moved). Idempotent. * * Emits 'payment_request.cancelled' on transition. */ export const cancelPayment = async (paymentRequestId, customerId) => { const [existing] = await sql` SELECT id, customer_id, status FROM payment_requests WHERE id = ${paymentRequestId} ` if (!existing) { throw Object.assign(new Error('Payment request not found'), { code: 'NOT_FOUND', statusCode: 404 }) } if (existing.customer_id !== customerId) { throw Object.assign(new Error('Payment request does not belong to this customer'), { code: 'FORBIDDEN', statusCode: 403 }) } if (existing.status !== PaymentRequestStatus.PENDING) { return existing // idempotent } const [updated] = await sql` UPDATE payment_requests SET status = ${PaymentRequestStatus.ABANDONED} WHERE id = ${paymentRequestId} AND status = ${PaymentRequestStatus.PENDING} RETURNING * ` if (updated) emit('payment_request.cancelled', buildEventPayload(updated)) return updated ?? existing } // --- Background sweeper + reconciliation --- /** * Runs every 60s from server.js + once at startup. * * Three jobs: * 1. Pending past expires_at → expired (no failure row; emits .expired) * 2. Confirmed past expires_at AND not consumed → failed_delivery (writes pairing_failures, emits .delivery_failed) * 3. Confirmed-with-no-chat-session-yet → re-invoke pairing subscriber (lost-event recovery) * * Job 3 is the durability backstop — if a process restart lost the in-process * EventEmitter notification before pairing started, the sweeper re-triggers it * on the next tick. Subscribers MUST be idempotent (the pairing subscriber checks * "chat_sessions WHERE payment_request_id = ?" before doing work). */ export const expireStalePaymentRequests = async () => { // 1) pending → expired (emit event for each) const expired = await sql` UPDATE payment_requests SET status = ${PaymentRequestStatus.EXPIRED} WHERE status = ${PaymentRequestStatus.PENDING} AND expires_at <= NOW() RETURNING * ` for (const row of expired) emit('payment_request.expired', buildEventPayload(row)) // 2) confirmed-but-stale → failed_delivery const flipped = await sql` UPDATE payment_requests SET status = ${PaymentRequestStatus.FAILED_DELIVERY} WHERE status = ${PaymentRequestStatus.CONFIRMED} AND expires_at <= NOW() RETURNING * ` await Promise.all(flipped.map(async (row) => { await recordFailure({ paymentRequestId: row.id, customerId: row.customer_id, targetedMitraId: row.targeted_mitra_id, causeTag: PairingFailureCause.PAYMENT_REQUEST_EXPIRED, amount: row.amount, }) // Customer-facing: push terminal PAIRING_FAILED via WS; FCM fallback if not connected. try { const wsSent = sendToUser(UserType.CUSTOMER, row.customer_id, { type: WsMessage.PAIRING_FAILED, payment_request_id: row.id, cause_tag: PairingFailureCause.PAYMENT_REQUEST_EXPIRED, is_terminal: true, }) if (!wsSent) { await sendPushNotification(UserType.CUSTOMER, row.customer_id, { title: 'Sesi gagal', body: 'Sesi pembayaranmu telah berakhir. Silakan mulai ulang.', data: { type: WsMessage.PAIRING_FAILED, payment_request_id: row.id, cause_tag: PairingFailureCause.PAYMENT_REQUEST_EXPIRED, }, }) } } catch (err) { console.error('expireStalePaymentRequests: failed to notify customer', { paymentRequestId: row.id, customerId: row.customer_id, err, }) } emit('payment_request.delivery_failed', { ...buildEventPayload(row), causeTag: PairingFailureCause.PAYMENT_REQUEST_EXPIRED, }) })) // 3) Reconciliation — confirmed payments that should have started product work but didn't. // For chat_session: no chat_sessions row exists AND no pairing_failures row exists. // The 30-second buffer avoids racing with happy-path subscribers mid-flight. const orphans = await sql` SELECT * FROM payment_requests pr WHERE pr.status = ${PaymentRequestStatus.CONFIRMED} AND pr.confirmed_at < NOW() - INTERVAL '30 seconds' AND pr.product_type = 'chat_session' AND NOT EXISTS (SELECT 1 FROM chat_sessions cs WHERE cs.payment_request_id = pr.id) AND NOT EXISTS (SELECT 1 FROM pairing_failures pf WHERE pf.payment_request_id = pr.id) LIMIT 100 ` let reconciled = 0 for (const row of orphans) { try { // Re-emit the event the subscriber missed. Subscriber's idempotency check // makes this safe even if the subscriber did actually run (just slow). emit('payment_request.confirmed', buildEventPayload(row)) reconciled++ } catch (err) { console.error('[reconciler] re-emit failed', { paymentRequestId: row.id, err }) } } return { expired: expired.length, failed: flipped.length, reconciled } } // Backward-compat alias — server.js previously called expireStalePaymentSessions export const expireStalePaymentSessions = expireStalePaymentRequests // --- getPayment --- export const getPayment = async (id) => { const [row] = await sql`SELECT * FROM payment_requests WHERE id = ${id}` return row ?? null } // Backward-compat aliases (legacy callers — to be migrated in a follow-up pass) export const createPaymentSession = (args) => requestPayment(args) export const confirmPaymentSession = confirmPaymentForCustomer export const consumePaymentSession = consumePayment export const abandonPaymentSession = cancelPayment export const getPaymentSession = getPayment // --- getCustomerPendingPayments (unchanged shape) --- /** * Phase 4 Stage 10 — Chat Tab Pembayaran feed. * Returns the customer's pending payment requests (initial + extension) that * haven't paid AND haven't expired. */ export const getCustomerPendingPayments = async (customerId) => { const items = await sql` SELECT ps.id, ps.is_extension, ps.amount, ps.duration_minutes, ps.mode, ps.created_at, ps.expires_at, COALESCE(ext_m.id, tgt_m.id) AS mitra_id, COALESCE(ext_m.display_name, tgt_m.display_name) AS mitra_display_name FROM payment_requests ps LEFT JOIN session_extensions se ON se.payment_request_id = ps.id LEFT JOIN chat_sessions cs ON cs.id = se.session_id LEFT JOIN mitras ext_m ON ext_m.id = cs.mitra_id LEFT JOIN mitras tgt_m ON tgt_m.id = ps.targeted_mitra_id WHERE ps.customer_id = ${customerId} AND ps.status = ${PaymentRequestStatus.PENDING} AND ps.expires_at > NOW() ORDER BY ps.created_at DESC ` return { items, total: items.length } } // --- Pairing subscriber wiring (called from server.js at startup) --- /** * Wires pairing.service as a subscriber to payment_request.confirmed. * Filters on productType so future product subscribers can coexist. * Subscriber implements its own idempotency (skip if chat_sessions row exists). * * Imported lazily to avoid a circular import: pairing.service imports payment.service * for failPaymentSession + consumePayment + getPayment. */ export const registerPairingSubscriber = () => { on('payment_request.confirmed', async (evt) => { if (evt.productType !== 'chat_session') return const { startPairingFromPaymentRequest } = await import('./pairing.service.js') await startPairingFromPaymentRequest({ paymentRequestId: evt.paymentRequestId, productMetadata: evt.productMetadata, customerId: evt.customerId, }) }) }