When the operator sets require_mitra_ping=false, the auto-offline sweep early-returns (by design — "don't gate online status on heartbeat freshness"). The three Valkey read paths still gated on heartbeat freshness anyway, which trapped the system: sweep won't remove the mitra from mitras:online, but readers reject them as stale. The customer CTA stayed permanently disabled with no recovery. Fix all three to skip the heartbeat-freshness check when require_ping is off, matching the sweep's contract: - computeAvailabilityFromValkey (customer beacon) - isMitraReachable (extension service) - findAvailableMitrasFromValkey (pairing candidate finder) The Postgres fallbacks already did the right thing (is_online only, no heartbeat compare); this aligns the Valkey hot path. Also: PATCH /internal/config/mitra-ping now publishes config:invalidate for require_mitra_ping and mitra_stale_after_seconds, and the subscriber in mitra-status.service was widened to listen for both. Flipping the toggle in CC now busts the 10s availability snapshot immediately instead of waiting out the TTL. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
946 lines
38 KiB
JavaScript
946 lines
38 KiB
JavaScript
import { getDb } from '../db/client.js'
|
|
import * as valkey from '../plugins/valkey.js'
|
|
import { VK_MITRAS_ONLINE, VK_MITRAS_DEACTIVATED, vkCapacityKey, vkHeartbeatKey } from './mitra-status.service.js'
|
|
import { getMaxCustomersPerMitra, getPairingBlastTimeoutSeconds, getReturningChatConfirmationTimeoutSeconds, getMitraPingConfig } from './config.service.js'
|
|
import { sendToUser } from '../plugins/websocket.js'
|
|
import { sendPushNotification } from './notification.service.js'
|
|
import { startSessionTimer } from './session-timer.service.js'
|
|
import { startSessionListener } from './chat-handler.service.js'
|
|
import { consumePaymentSession, failPaymentSession, getPaymentSession, recordIntermediateFailure } from './payment.service.js'
|
|
import { isMitraReachable, isMitraInActiveSessionWithCustomer, getMitraActiveSessionCount, recomputeCapacityForMitra, recomputeCapacityBySession } from './mitra-status.service.js'
|
|
import {
|
|
UserType,
|
|
SessionStatus,
|
|
NotificationResponse,
|
|
TransactionType,
|
|
WsMessage,
|
|
TopicSensitivity,
|
|
PaymentRequestStatus,
|
|
PairingFailureCause,
|
|
PairingRequestType,
|
|
} from '../constants.js'
|
|
|
|
// Module-wide database handle; used below as a tagged-template query function (sql`...`).
const sql = getDb()
|
|
|
|
// Timeout map for active pairing requests (sessionId → timeoutId)
|
|
const pairingTimeouts = new Map()
|
|
|
|
/**
 * Deliver `data` to a mitra over WebSocket. When the socket send reports
 * failure and the message is a chat request, send an FCM push instead so the
 * mitra can still open the accept screen. Other message types get no fallback.
 *
 * @param mitraId - recipient mitra id
 * @param {object} data - WebSocket payload; `type`, `session_id` and `request_type` are read here
 */
const notifyMitra = async (mitraId, data) => {
  const deliveredOverSocket = sendToUser(UserType.MITRA, mitraId, data)
  if (deliveredOverSocket) return
  if (data.type !== WsMessage.CHAT_REQUEST) return

  // Mitra is not reachable over WebSocket: fall back to a push notification.
  const pushData = {
    type: WsMessage.CHAT_REQUEST,
    session_id: data.session_id,
    request_type: data.request_type || PairingRequestType.GENERAL,
    action: 'open_accept',
  }
  await sendPushNotification(UserType.MITRA, mitraId, {
    title: 'Permintaan Chat Baru',
    body: 'Ada pelanggan yang ingin curhat! Ketuk untuk menerima.',
    data: pushData,
  })
}
|
|
|
|
/**
 * Deliver `data` to a customer over WebSocket and log the outcome. When the
 * socket send reports failure, PAIRED, SESSION_EXPIRED and PAIRING_FAILED
 * messages are re-sent as an FCM push; every other type gets no fallback.
 *
 * @param customerId - recipient customer id
 * @param {object} data - WebSocket payload; `type` selects the push template
 */
const notifyCustomer = async (customerId, data) => {
  const deliveredOverSocket = sendToUser(UserType.CUSTOMER, customerId, data)
  console.log(`[notifyCustomer] customerId=${customerId} type=${data.type} ws_sent=${deliveredOverSocket}`)
  if (deliveredOverSocket) return

  let push
  switch (data.type) {
    case WsMessage.PAIRED:
      push = {
        title: 'Bestie Ditemukan!',
        body: `${data.mitra_display_name} siap menemanimu curhat`,
        data: { type: WsMessage.PAIRED, session_id: data.session_id },
      }
      break
    case WsMessage.SESSION_EXPIRED:
      push = {
        title: 'Tidak Ada Bestie',
        body: 'Maaf, tidak ada bestie yang tersedia saat ini.',
        data: { type: WsMessage.SESSION_EXPIRED, session_id: data.session_id },
      }
      break
    case WsMessage.PAIRING_FAILED:
      // Pairing failure on a confirmed payment: push so the customer can return
      // to the app and see the failed-pairing screen / contact support.
      push = {
        title: 'Sesi gagal',
        body: 'Maaf, kami tidak bisa menemukan bestie untuk sesimu. Tim kami akan menghubungimu segera.',
        data: {
          type: WsMessage.PAIRING_FAILED,
          payment_request_id: data.payment_request_id || '',
          cause_tag: data.cause_tag || '',
        },
      }
      break
    default:
      return
  }

  await sendPushNotification(UserType.CUSTOMER, customerId, push)
}
|
|
|
|
/**
 * Valkey-driven candidate finder.
 *
 * SDIFF(online, deactivated) yields the candidate ids; one pipeline then reads
 * each candidate's capacity counter (and heartbeat, only when `require_ping`
 * is on). A candidate is eligible when its capacity is below
 * `max_customers_per_mitra` and, if pings are required, its heartbeat is
 * present and not older than `stale_after_seconds`.
 *
 * When the ping requirement is off the heartbeat is neither fetched nor
 * checked, so candidate selection is not gated on heartbeat freshness.
 *
 * @returns {Promise<Array<{ id: *, active_session_count: number }>>} eligible candidates
 * @throws if any Valkey call fails, including an individual pipelined command.
 *   Pipeline replies are read as `[err, value]` pairs; a per-command error is
 *   rethrown instead of being read as "capacity 0", so the caller's Postgres
 *   fallback engages rather than blasting to a mitra whose state is unknown.
 */
const findAvailableMitrasFromValkey = async () => {
  const { max_customers_per_mitra } = await getMaxCustomersPerMitra()
  const { require_ping, stale_after_seconds } = await getMitraPingConfig()

  const candidates = await valkey.sdiff(VK_MITRAS_ONLINE, VK_MITRAS_DEACTIVATED)
  if (!candidates.length) return []

  const pipe = valkey.pipeline()
  for (const id of candidates) {
    pipe.get(vkCapacityKey(id))
    if (require_ping) pipe.get(vkHeartbeatKey(id))
  }
  const results = await pipe.exec()
  // Each candidate owns `stride` consecutive replies: capacity, then (optionally) heartbeat.
  const stride = require_ping ? 2 : 1

  // Unwrap one pipelined reply, surfacing command-level failures.
  const readReply = (index) => {
    const [commandError, value] = results[index]
    if (commandError) throw commandError
    return value
  }

  // Only meaningful when heartbeats are enforced.
  const cutoff = require_ping ? Date.now() - stale_after_seconds * 1000 : null

  const eligible = []
  for (let i = 0; i < candidates.length; i++) {
    // A missing capacity key counts as zero active sessions.
    const capacity = Number(readReply(i * stride) ?? 0)
    if (capacity >= max_customers_per_mitra) continue

    if (require_ping) {
      const heartbeat = readReply(i * stride + 1)
      if (!heartbeat || Date.parse(heartbeat) < cutoff) continue
    }
    eligible.push({ id: candidates[i], active_session_count: capacity })
  }
  return eligible
}
|
|
|
|
/**
 * Postgres-backed candidate finder. Selects active mitras whose online-status
 * row is flagged online and whose count of ACTIVE + PENDING_PAYMENT sessions is
 * below `max_customers_per_mitra`. No heartbeat comparison is made here.
 *
 * @returns rows of `{ id, display_name, active_session_count }`
 */
const findAvailableMitrasFromPostgres = async () => {
  const capacityConfig = await getMaxCustomersPerMitra()
  const capacityLimit = capacityConfig.max_customers_per_mitra

  const rows = await sql`
    SELECT m.id, m.display_name, sub.active_session_count
    FROM mitras m
    INNER JOIN mitra_online_status s ON s.mitra_id = m.id
    INNER JOIN LATERAL (
      SELECT COUNT(*)::int AS active_session_count
      FROM chat_sessions cs
      WHERE cs.mitra_id = m.id AND cs.status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT})
    ) sub ON true
    WHERE m.is_active = true
      AND s.is_online = true
      AND sub.active_session_count < ${capacityLimit}
  `
  return rows
}
|
|
|
|
/**
 * List mitras that can currently take a new customer. Tries the Valkey path
 * first; if it throws, logs a warning and answers from Postgres instead.
 */
export const findAvailableMitras = async () => {
  let mitras
  try {
    mitras = await findAvailableMitrasFromValkey()
  } catch (valkeyError) {
    console.warn('[findAvailableMitras] valkey unavailable, falling back to Postgres:', valkeyError.message)
    mitras = await findAvailableMitrasFromPostgres()
  }
  return mitras
}
|
|
|
|
/**
 * Load a payment session and verify it may be used to start pairing.
 *
 * Checks, in order: id present (422), row exists (404), owned by `customerId`
 * (403), status is CONFIRMED (409), not an extension payment unless
 * `allowExtension` (409), not past `expires_at` (409 — the payment is also
 * marked failed with PAYMENT_REQUEST_EXPIRED before throwing).
 *
 * @param paymentRequestId - payment session id
 * @param customerId - id of the customer making the request
 * @param {{ allowExtension?: boolean }} [options]
 * @returns the loaded payment session row
 * @throws Error carrying `code` and `statusCode` on any failed check
 */
const requireConfirmedPaymentRequest = async (paymentRequestId, customerId, { allowExtension = false } = {}) => {
  // Builds the error shape used by every check below.
  const httpError = (message, code, statusCode) => Object.assign(new Error(message), { code, statusCode })

  if (!paymentRequestId) {
    throw httpError('payment_request_id is required', 'VALIDATION_ERROR', 422)
  }

  const payRequest = await getPaymentSession(paymentRequestId)
  if (!payRequest) {
    throw httpError('Payment session not found', 'NOT_FOUND', 404)
  }
  if (payRequest.customer_id !== customerId) {
    throw httpError('Payment session does not belong to this customer', 'FORBIDDEN', 403)
  }
  if (payRequest.status !== PaymentRequestStatus.CONFIRMED) {
    throw httpError(`Payment session is ${payRequest.status}, must be confirmed`, 'INVALID_STATE', 409)
  }
  if (payRequest.is_extension && !allowExtension) {
    throw httpError('Extension payment session cannot be used to start a new chat', 'INVALID_STATE', 409)
  }

  // Expiry is re-checked inline at every state transition (defense in depth
  // alongside the background sweeper).
  const expiresAt = new Date(payRequest.expires_at)
  const now = new Date()
  if (expiresAt <= now) {
    await failPaymentSession(paymentRequestId, PairingFailureCause.PAYMENT_REQUEST_EXPIRED)
    throw httpError('Payment session has expired', 'EXPIRED', 409)
  }

  return payRequest
}
|
|
|
|
/**
 * General-blast pairing request. Requires a confirmed payment_request_id.
 *
 * The duration_minutes / price / is_first_session_discount values for the
 * chat_session row are sourced from the payment session — the client does not
 * dictate pricing here.
 *
 * `allowTargetedPayment` is set true by the fallback-to-blast path: the original
 * payment was created with a `targeted_mitra_id` for "Curhat lagi" but the
 * customer chose to fall back to general blast on the same payment. The flag
 * bypasses the "use returning-chat endpoint" guard in that exact case.
 *
 * Once the session row exists the blast timer is always armed, even if the
 * notification fan-out throws. Otherwise a failed insert or push would leave the
 * session in pending_acceptance with nothing scheduled to expire it, and the
 * customer would keep hitting ALREADY_ACTIVE. The fan-out error still propagates.
 *
 * @param customerId - requesting customer
 * @param {{ paymentRequestId?: *, topic_sensitivity?: *, allowTargetedPayment?: boolean }} [options]
 * @returns the created chat_sessions row
 * @throws 409 INVALID_STATE (targeted payment without the flag), 409 ALREADY_ACTIVE,
 *   404 NO_MITRA_AVAILABLE (payment is failed first), plus anything
 *   requireConfirmedPaymentRequest throws
 */
export const createPairingRequest = async (customerId, { paymentRequestId, topic_sensitivity, allowTargetedPayment = false } = {}) => {
  const payRequest = await requireConfirmedPaymentRequest(paymentRequestId, customerId)

  // Targeted payment session must use createTargetedPairingRequest unless we're
  // explicitly invoked by the fallback-to-blast path.
  if (payRequest.targeted_mitra_id && !allowTargetedPayment) {
    throw Object.assign(new Error('Payment session is targeted to a specific mitra; use returning-chat endpoint'), {
      code: 'INVALID_STATE', statusCode: 409,
    })
  }

  // Check for existing active session or request
  const [existing] = await sql`
    SELECT id, status FROM chat_sessions
    WHERE customer_id = ${customerId}
      AND status IN (${SessionStatus.SEARCHING}, ${SessionStatus.PENDING_ACCEPTANCE}, ${SessionStatus.PENDING_PAYMENT}, ${SessionStatus.ACTIVE})
  `
  if (existing) {
    throw Object.assign(new Error('Customer already has an active session or request'), {
      code: 'ALREADY_ACTIVE', statusCode: 409,
    })
  }

  const availableMitras = await findAvailableMitras()
  if (availableMitras.length === 0) {
    // No mitras to blast to — fail the payment immediately.
    await failPaymentSession(paymentRequestId, PairingFailureCause.NO_MITRA_AVAILABLE)
    throw Object.assign(new Error('No bestie available'), {
      code: 'NO_MITRA_AVAILABLE', statusCode: 404,
    })
  }

  const resolvedTopic = topic_sensitivity === TopicSensitivity.SENSITIVE
    ? TopicSensitivity.SENSITIVE
    : TopicSensitivity.REGULAR

  // Read the blast window (configurable via app_config) BEFORE creating the
  // session, so a config read failure cannot leave a timer-less session behind.
  const { pairing_blast_timeout_seconds } = await getPairingBlastTimeoutSeconds()

  // Create session sourced from the payment session.
  const [session] = await sql`
    INSERT INTO chat_sessions (
      customer_id, status, duration_minutes, price, is_first_session_discount, topic_sensitivity, payment_request_id
    )
    VALUES (
      ${customerId}, ${SessionStatus.PENDING_ACCEPTANCE},
      ${payRequest.duration_minutes}, ${payRequest.amount}, ${payRequest.is_first_session_discount},
      ${resolvedTopic}, ${paymentRequestId}
    )
    RETURNING id, customer_id, status, duration_minutes, price, is_first_session_discount, topic_sensitivity, payment_request_id, created_at
  `

  try {
    // Fan out to all available mitras in parallel — DB inserts and notifications are
    // independent per mitra. active_session_count was already projected by findAvailableMitras.
    await Promise.all(availableMitras.map(async (mitra) => {
      await sql`
        INSERT INTO chat_request_notifications (session_id, mitra_id, active_session_count)
        VALUES (${session.id}, ${mitra.id}, ${mitra.active_session_count})
      `
      await notifyMitra(mitra.id, {
        type: WsMessage.CHAT_REQUEST,
        session_id: session.id,
        request_type: PairingRequestType.GENERAL,
        created_at: session.created_at,
        duration_minutes: session.duration_minutes,
        is_first_session_discount: session.is_first_session_discount,
        topic_sensitivity: session.topic_sensitivity,
      })
    }))
  } finally {
    // Arm the blast timeout whether or not the fan-out succeeded.
    const timeoutId = setTimeout(async () => {
      try {
        await expirePairingRequest(session.id, PairingFailureCause.NO_MITRA_AVAILABLE)
      } catch (err) {
        console.error('expirePairingRequest failed', { sessionId: session.id, err })
      }
    }, pairing_blast_timeout_seconds * 1000)
    pairingTimeouts.set(session.id, timeoutId)
  }

  return session
}
|
|
|
|
/**
 * Phase 5: server-driven pairing entry point — called by the payment service's
 * `payment_request.confirmed` event subscriber. Replaces the client-driven
 * POST /chat/request path for new payments.
 *
 * **Idempotent.** If a chat_session already exists for this payment_request_id
 * it is returned untouched, so the call is safe to repeat (webhook retries,
 * reconciliation sweeper re-emits, a legacy client POST racing the subscriber).
 *
 * Routes to the targeted flow when `productMetadata.targeted_mitra_id` is set,
 * otherwise to the general blast.
 *
 * Error handling: ALREADY_ACTIVE and NO_MITRA_AVAILABLE are logged and resolve
 * to `null`; any other error is logged with context and rethrown to the caller.
 *
 * @param {{ paymentRequestId: *, productMetadata?: object, customerId: * }} params
 * @returns the existing or newly created session, or `null` for the benign cases
 */
export const startPairingFromPaymentRequest = async ({ paymentRequestId, productMetadata, customerId }) => {
  const [alreadyStarted] = await sql`
    SELECT id FROM chat_sessions WHERE payment_request_id = ${paymentRequestId}
  `
  if (alreadyStarted) return alreadyStarted

  const targetedMitraId = productMetadata?.targeted_mitra_id ?? null
  const topic_sensitivity = productMetadata?.topic_sensitivity ?? TopicSensitivity.REGULAR

  // Already-active covers the race where the legacy /chat/request beat the
  // subscriber. NO_MITRA_AVAILABLE has already failed the payment inside
  // createPairingRequest, so nothing further is needed for either.
  const benignCodes = ['ALREADY_ACTIVE', 'NO_MITRA_AVAILABLE']

  try {
    const session = targetedMitraId
      ? await createTargetedPairingRequest(customerId, { paymentRequestId, targetedMitraId, topic_sensitivity })
      : await createPairingRequest(customerId, { paymentRequestId, topic_sensitivity })
    return session
  } catch (err) {
    if (benignCodes.includes(err.code)) {
      console.log(`[pairing subscriber] ${err.code} for payment ${paymentRequestId} — already handled`)
      return null
    }
    console.error('[pairing subscriber] startPairing failed', { paymentRequestId, err })
    throw err
  }
}
|
|
|
|
/**
 * Targeted pairing request for "Curhat lagi" (returning chat).
 *
 * - Pre-check targeted mitra reachability + capacity. If unreachable, or at capacity and
 *   not mid-session with this customer → write an audit-only intermediate failure (payment
 *   stays `confirmed`) and throw 409 with `reason: 'targeted_mitra_offline'`.
 * - Fire ONE notification to the targeted mitra.
 * - Start a server-side timer of `returning_chat_confirmation_timeout_seconds`; on expiry
 *   `expireTargetedPairingRequest` runs.
 * - Decline and accept are handled by declinePairingRequest / acceptPairingRequest.
 *
 * Once the session row exists the confirmation timer is always armed, even if recording or
 * sending the notification throws. Otherwise the session would sit in pending_acceptance
 * with nothing scheduled to expire it. The error still propagates to the caller.
 *
 * @param customerId - requesting customer
 * @param {{ paymentRequestId?: *, targetedMitraId?: *, topic_sensitivity?: * }} [options]
 * @returns the created session row plus `confirmation_timeout_seconds`
 * @throws 422 VALIDATION_ERROR, 409 INVALID_STATE, 409 ALREADY_ACTIVE,
 *   409 TARGETED_MITRA_OFFLINE, plus anything requireConfirmedPaymentRequest throws
 */
export const createTargetedPairingRequest = async (customerId, { paymentRequestId, targetedMitraId, topic_sensitivity } = {}) => {
  const payRequest = await requireConfirmedPaymentRequest(paymentRequestId, customerId)

  if (!targetedMitraId) {
    throw Object.assign(new Error('targetedMitraId is required'), { code: 'VALIDATION_ERROR', statusCode: 422 })
  }
  // Cross-check: payment_request.targeted_mitra_id should match (if set).
  if (payRequest.targeted_mitra_id && payRequest.targeted_mitra_id !== targetedMitraId) {
    throw Object.assign(new Error('targetedMitraId does not match payment session'), {
      code: 'INVALID_STATE', statusCode: 409,
    })
  }

  // Check for existing active session or request
  const [existing] = await sql`
    SELECT id, status FROM chat_sessions
    WHERE customer_id = ${customerId}
      AND status IN (${SessionStatus.SEARCHING}, ${SessionStatus.PENDING_ACCEPTANCE}, ${SessionStatus.PENDING_PAYMENT}, ${SessionStatus.ACTIVE})
  `
  if (existing) {
    throw Object.assign(new Error('Customer already has an active session or request'), {
      code: 'ALREADY_ACTIVE', statusCode: 409,
    })
  }

  // Intermediate failure: audit row written, payment stays `confirmed` so the customer
  // can choose to fall back to general blast (or cancel, which terminates).
  const rejectAsUnavailable = async (message) => {
    await recordIntermediateFailure({
      paymentRequestId,
      customerId,
      targetedMitraId,
      causeTag: PairingFailureCause.TARGETED_MITRA_OFFLINE,
      amount: payRequest.amount,
    })
    throw Object.assign(new Error(message), {
      code: 'TARGETED_MITRA_OFFLINE', statusCode: 409, reason: 'targeted_mitra_offline',
    })
  }

  // Pre-check: mitra reachable?
  const reachable = await isMitraReachable(targetedMitraId)
  if (!reachable) {
    await rejectAsUnavailable('Targeted mitra is offline')
  }

  // Pre-check: mitra at capacity AND not mid-session with this customer?
  const { max_customers_per_mitra } = await getMaxCustomersPerMitra()
  const activeCount = await getMitraActiveSessionCount(targetedMitraId)
  if (activeCount >= max_customers_per_mitra) {
    const midSessionWithCustomer = await isMitraInActiveSessionWithCustomer(targetedMitraId, customerId)
    if (!midSessionWithCustomer) {
      await rejectAsUnavailable('Targeted mitra is at capacity')
    }
    // Else: at-capacity but mid-session with the requesting customer — request allowed through.
  }

  const resolvedTopic = topic_sensitivity === TopicSensitivity.SENSITIVE
    ? TopicSensitivity.SENSITIVE
    : TopicSensitivity.REGULAR

  // Server-side timer length (configurable) — also surfaced in the WS payload so the mitra
  // app countdown matches what the server enforces. Read BEFORE creating the session so a
  // config read failure cannot leave a timer-less session behind.
  const { returning_chat_confirmation_timeout_seconds } = await getReturningChatConfirmationTimeoutSeconds()

  // Create session sourced from the payment session, status = pending_acceptance.
  const [session] = await sql`
    INSERT INTO chat_sessions (
      customer_id, status, duration_minutes, price, is_first_session_discount, topic_sensitivity, payment_request_id
    )
    VALUES (
      ${customerId}, ${SessionStatus.PENDING_ACCEPTANCE},
      ${payRequest.duration_minutes}, ${payRequest.amount}, ${payRequest.is_first_session_discount},
      ${resolvedTopic}, ${paymentRequestId}
    )
    RETURNING id, customer_id, status, duration_minutes, price, is_first_session_discount, topic_sensitivity, payment_request_id, created_at
  `

  try {
    // Single notification to the targeted mitra
    await sql`
      INSERT INTO chat_request_notifications (session_id, mitra_id, active_session_count)
      VALUES (${session.id}, ${targetedMitraId}, ${activeCount})
    `
    await notifyMitra(targetedMitraId, {
      type: WsMessage.CHAT_REQUEST,
      session_id: session.id,
      request_type: PairingRequestType.RETURNING,
      created_at: session.created_at,
      duration_minutes: session.duration_minutes,
      is_first_session_discount: session.is_first_session_discount,
      topic_sensitivity: session.topic_sensitivity,
      confirmation_timeout_seconds: returning_chat_confirmation_timeout_seconds,
    })
  } finally {
    // Arm the confirmation timeout whether or not the notification went out.
    const timeoutId = setTimeout(async () => {
      try {
        await expireTargetedPairingRequest(session.id)
      } catch (err) {
        console.error('expireTargetedPairingRequest failed', { sessionId: session.id, err })
      }
    }, returning_chat_confirmation_timeout_seconds * 1000)
    pairingTimeouts.set(session.id, timeoutId)
  }

  // Surface the timeout to the customer so the targeted-waiting overlay countdown
  // matches the server-side timer.
  return { ...session, confirmation_timeout_seconds: returning_chat_confirmation_timeout_seconds }
}
|
|
|
|
/**
 * A mitra accepts a pending pairing request.
 *
 * The first conditional UPDATE is the claim: it only matches while the session
 * is still pending_acceptance with no mitra assigned, so a request that was
 * already taken or expired matches nothing. After a successful claim this
 * mirrors capacity, settles the notification rows, clears the pairing timer,
 * consumes the payment session, activates the chat session, records the
 * transaction, starts the session timer and message listener, and notifies the
 * customer and the other notified mitras.
 *
 * @param sessionId - chat session being accepted
 * @param mitraId - accepting mitra
 * @returns the activated chat_sessions row
 * @throws 409 REQUEST_UNAVAILABLE when the session was already accepted or expired
 */
export const acceptPairingRequest = async (sessionId, mitraId) => {
  const [claimed] = await sql`
    UPDATE chat_sessions
    SET mitra_id = ${mitraId}, status = ${SessionStatus.PENDING_PAYMENT}, paired_at = NOW()
    WHERE id = ${sessionId} AND status = ${SessionStatus.PENDING_ACCEPTANCE} AND mitra_id IS NULL
    RETURNING id, customer_id, mitra_id, status, paired_at, payment_request_id
  `
  if (!claimed) {
    const unavailable = new Error('Request already accepted or expired')
    throw Object.assign(unavailable, { code: 'REQUEST_UNAVAILABLE', statusCode: 409 })
  }

  // The mitra now occupies a capacity slot (PENDING_PAYMENT counts toward the
  // availability predicate) — mirror that to Valkey.
  await recomputeCapacityForMitra(mitraId)

  // This mitra's notification row → accepted.
  await sql`
    UPDATE chat_request_notifications
    SET response = ${NotificationResponse.ACCEPTED}, responded_at = NOW()
    WHERE session_id = ${sessionId} AND mitra_id = ${mitraId}
  `

  // Every other still-unanswered notification → missed (someone else accepted).
  await sql`
    UPDATE chat_request_notifications
    SET response = ${NotificationResponse.MISSED}, responded_at = NOW()
    WHERE session_id = ${sessionId} AND mitra_id != ${mitraId} AND response IS NULL
  `

  // Stop the pending pairing timer, if one is registered.
  const pendingTimer = pairingTimeouts.get(sessionId)
  if (pendingTimer) {
    clearTimeout(pendingTimer)
    pairingTimeouts.delete(sessionId)
  }

  // The payment session is consumed at the moment of acceptance.
  if (claimed.payment_request_id) {
    await consumePaymentSession(claimed.payment_request_id)
  }

  // Activate the session; expires_at is derived from duration_minutes when present.
  const [activeSession] = await sql`
    UPDATE chat_sessions
    SET status = ${SessionStatus.ACTIVE},
        expires_at = CASE
          WHEN duration_minutes IS NOT NULL THEN NOW() + (duration_minutes || ' minutes')::interval
          ELSE NULL
        END
    WHERE id = ${sessionId}
    RETURNING id, customer_id, mitra_id, status, paired_at, duration_minutes, price, is_first_session_discount, expires_at, payment_request_id
  `

  // Record the customer transaction for timed sessions.
  if (activeSession.duration_minutes) {
    let transactionType = TransactionType.PAID
    if (activeSession.is_first_session_discount) {
      transactionType = TransactionType.FIRST_SESSION_DISCOUNT
    }
    await sql`
      INSERT INTO customer_transactions (customer_id, session_id, type, amount)
      VALUES (${activeSession.customer_id}, ${sessionId}, ${transactionType}, ${activeSession.price || 0})
    `
  }

  // Session countdown only applies when an expiry was set.
  if (activeSession.expires_at) {
    startSessionTimer(sessionId, activeSession.expires_at)
  }

  // Chat message listener for this session.
  startSessionListener(sessionId)

  // Display name is needed for the customer-facing notification.
  const [acceptingMitra] = await sql`
    SELECT display_name FROM mitras WHERE id = ${mitraId}
  `

  // Tell the customer (WebSocket with FCM fallback).
  await notifyCustomer(activeSession.customer_id, {
    type: WsMessage.PAIRED,
    session_id: sessionId,
    mitra_display_name: acceptingMitra.display_name,
    status: SessionStatus.ACTIVE,
  })

  // Tell every other notified mitra to dismiss the request — independent sends, run in parallel.
  const otherRecipients = await sql`
    SELECT mitra_id FROM chat_request_notifications
    WHERE session_id = ${sessionId} AND mitra_id != ${mitraId}
  `
  const dismissals = otherRecipients.map((recipient) => notifyMitra(recipient.mitra_id, {
    type: WsMessage.CHAT_REQUEST_CLOSED,
    session_id: sessionId,
    reason: 'accepted_by_other',
  }))
  await Promise.all(dismissals)

  return activeSession
}
|
|
|
|
/**
 * A mitra declines a pairing request.
 *
 * Always records the decline on this mitra's notification row (only if it has
 * not responded yet). What happens next depends on the kind of request:
 *
 * - Targeted ("Curhat lagi") attempt: the session is expired, an audit-only
 *   TARGETED_MITRA_REJECTED failure is recorded (payment stays `confirmed`) and
 *   the customer receives RETURNING_CHAT_REJECTED.
 * - General blast: nothing more happens until every notified mitra has
 *   declined; then the session is expired, an audit-only ALL_MITRAS_REJECTED
 *   failure is recorded and the customer receives a non-terminal PAIRING_FAILED.
 *
 * A session is treated as targeted only when the payment carries a
 * `targeted_mitra_id`, the declining mitra IS that mitra, and that mitra is the
 * sole recipient of the request. The payment flag alone is not enough: the
 * fallback-to-blast path reuses the same targeted payment for a general blast,
 * so a single decline there must not end the whole blast. Notification count
 * alone is not enough either, because a general blast with one online mitra
 * also has one row.
 *
 * @param sessionId - chat session being declined
 * @param mitraId - declining mitra
 * @returns {Promise<void>}
 */
export const declinePairingRequest = async (sessionId, mitraId) => {
  await sql`
    UPDATE chat_request_notifications
    SET response = ${NotificationResponse.DECLINED}, responded_at = NOW()
    WHERE session_id = ${sessionId} AND mitra_id = ${mitraId} AND response IS NULL
  `

  const [targetCheck] = await sql`
    SELECT ps.targeted_mitra_id
    FROM chat_sessions cs
    LEFT JOIN payment_requests ps ON ps.id = cs.payment_request_id
    WHERE cs.id = ${sessionId}
  `
  const notifications = await sql`
    SELECT mitra_id, response FROM chat_request_notifications WHERE session_id = ${sessionId}
  `

  const targetedMitraId = targetCheck?.targeted_mitra_id
  const isTargeted = Boolean(targetedMitraId)
    && String(targetedMitraId) === String(mitraId)
    && notifications.length === 1
    && String(notifications[0].mitra_id) === String(targetedMitraId)

  // Expire the session if it is still pending and stop its timer.
  // Returns the expired row, or null when the session had already moved on.
  const expirePendingSession = async () => {
    const [expired] = await sql`
      UPDATE chat_sessions
      SET status = ${SessionStatus.EXPIRED}
      WHERE id = ${sessionId} AND status = ${SessionStatus.PENDING_ACCEPTANCE}
      RETURNING id, customer_id, payment_request_id
    `
    if (!expired) return null

    const timeoutId = pairingTimeouts.get(sessionId)
    if (timeoutId) {
      clearTimeout(timeoutId)
      pairingTimeouts.delete(sessionId)
    }
    return expired
  }

  // Audit row only; the payment session stays `confirmed` so the customer can
  // retry / fall back on the same payment.
  const auditIntermediateFailure = async (session, failure) => {
    if (!session.payment_request_id) return
    const payRequest = await getPaymentSession(session.payment_request_id)
    if (!payRequest) return
    await recordIntermediateFailure({
      paymentRequestId: session.payment_request_id,
      customerId: session.customer_id,
      ...failure,
      amount: payRequest.amount,
    })
  }

  if (isTargeted) {
    const session = await expirePendingSession()
    if (!session) return

    await auditIntermediateFailure(session, {
      targetedMitraId: mitraId,
      causeTag: PairingFailureCause.TARGETED_MITRA_REJECTED,
    })

    // Customer falls through to the fallback flow.
    await notifyCustomer(session.customer_id, {
      type: WsMessage.RETURNING_CHAT_REJECTED,
      session_id: sessionId,
      payment_request_id: session.payment_request_id,
    })
    return
  }

  // General blast: only act once every notified mitra has declined. The
  // length guard prevents a misfire when no notification rows exist.
  const allDeclined = notifications.length > 0
    && notifications.every((n) => n.response === NotificationResponse.DECLINED)
  if (!allDeclined) return

  const session = await expirePendingSession()
  if (!session) return

  await auditIntermediateFailure(session, {
    causeTag: PairingFailureCause.ALL_MITRAS_REJECTED,
  })

  await notifyCustomer(session.customer_id, {
    type: WsMessage.PAIRING_FAILED,
    session_id: sessionId,
    payment_request_id: session.payment_request_id,
    cause_tag: PairingFailureCause.ALL_MITRAS_REJECTED,
    is_terminal: false,
  })
}
|
|
|
|
/**
 * Customer cancels their own pairing request while it is still searching or
 * pending acceptance.
 *
 * Marks the session cancelled, stops its pairing timer, flags unanswered
 * notifications as ignored, tells every notified mitra to dismiss the request,
 * and fails the linked payment session with CUSTOMER_CANCELLED.
 *
 * No PAIRING_FAILED event is pushed to the customer: they initiated the cancel
 * and the calling client already navigates home, so surfacing it as a
 * "failure" (especially via FCM when backgrounded) would misframe their action.
 *
 * @param sessionId - chat session to cancel
 * @param customerId - must own the session
 * @returns the cancelled session row (`id, customer_id, status, payment_request_id`)
 * @throws 409 CANNOT_CANCEL when no matching cancellable session exists
 */
export const cancelPairingRequest = async (sessionId, customerId) => {
  const [cancelled] = await sql`
    UPDATE chat_sessions
    SET status = ${SessionStatus.CANCELLED}
    WHERE id = ${sessionId} AND customer_id = ${customerId}
      AND status IN (${SessionStatus.SEARCHING}, ${SessionStatus.PENDING_ACCEPTANCE})
    RETURNING id, customer_id, status, payment_request_id
  `
  if (!cancelled) {
    const notCancellable = new Error('Cannot cancel this request')
    throw Object.assign(notCancellable, { code: 'CANNOT_CANCEL', statusCode: 409 })
  }

  // Stop the pending pairing timer, if one is registered.
  const pendingTimer = pairingTimeouts.get(sessionId)
  if (pendingTimer) {
    clearTimeout(pendingTimer)
    pairingTimeouts.delete(sessionId)
  }

  // Unanswered notifications → ignored.
  await sql`
    UPDATE chat_request_notifications
    SET response = ${NotificationResponse.IGNORED}, responded_at = NOW()
    WHERE session_id = ${sessionId} AND response IS NULL
  `

  // Ask every notified mitra to dismiss the request — independent sends, run in parallel.
  const recipients = await sql`
    SELECT mitra_id FROM chat_request_notifications WHERE session_id = ${sessionId}
  `
  const dismissals = recipients.map((recipient) => notifyMitra(recipient.mitra_id, {
    type: WsMessage.CHAT_REQUEST_CLOSED,
    session_id: sessionId,
    reason: 'cancelled_by_customer',
  }))
  await Promise.all(dismissals)

  if (cancelled.payment_request_id) {
    await failPaymentSession(cancelled.payment_request_id, PairingFailureCause.CUSTOMER_CANCELLED)
  }

  return cancelled
}
|
|
|
|
/**
 * Customer-initiated cancel during a payment-search.
 *
 * Use this when the customer is on the searching/waiting screen with a
 * confirmed payment but no chat-session row exists yet, OR while they are in a
 * returning-chat wait. If a chat_session linked to the payment is still
 * searching / pending acceptance, that session is cancelled through
 * cancelPairingRequest (which also fails the payment). Otherwise the payment is
 * failed directly, but only while it is still `confirmed`.
 *
 * @param paymentRequestId - payment session being abandoned
 * @param customerId - must own the payment session
 * @returns the cancelled session row when one was linked, otherwise
 *   `{ id, payment_request_id }` both set to `paymentRequestId`
 * @throws 404 NOT_FOUND, 403 FORBIDDEN, or whatever cancelPairingRequest throws
 */
export const cancelPaymentSearch = async (paymentRequestId, customerId) => {
  const payRequest = await getPaymentSession(paymentRequestId)
  if (!payRequest) {
    const notFound = new Error('Payment session not found')
    throw Object.assign(notFound, { code: 'NOT_FOUND', statusCode: 404 })
  }
  if (payRequest.customer_id !== customerId) {
    const forbidden = new Error('Payment session does not belong to this customer')
    throw Object.assign(forbidden, { code: 'FORBIDDEN', statusCode: 403 })
  }

  // A still-pending session for this payment is cancelled through the regular path.
  const [pendingSession] = await sql`
    SELECT id FROM chat_sessions
    WHERE payment_request_id = ${paymentRequestId}
      AND status IN (${SessionStatus.SEARCHING}, ${SessionStatus.PENDING_ACCEPTANCE})
  `
  if (pendingSession) {
    // cancelPairingRequest also fails the payment session — return early to avoid double work.
    return cancelPairingRequest(pendingSession.id, customerId)
  }

  // No pending session: e.g. the targeted attempt already expired or was
  // rejected while the payment is still `confirmed`. Fail the payment directly;
  // no customer-side WS push, same reasoning as in cancelPairingRequest.
  const stillConfirmed = payRequest.status === PaymentRequestStatus.CONFIRMED
  if (stillConfirmed) {
    await failPaymentSession(paymentRequestId, PairingFailureCause.CUSTOMER_CANCELLED)
  }

  return { id: paymentRequestId, payment_request_id: paymentRequestId }
}
|
|
|
|
/**
 * Start a general blast after a returning-chat attempt failed and the customer tapped
 * "Chat dengan bestie lain".
 *
 * The original payment_request remains `confirmed` throughout the returning-chat flow:
 * a targeted reject/timeout only writes an audit `pairing_failures` row and does NOT
 * terminate the payment. The fallback therefore reuses the same `payment_request_id`.
 * Several `pairing_failures` rows may FK to one payment_request — one per failed attempt,
 * which is the desired CC UX. The payment terminates only at the true end of the flow
 * (chat starts → consumed; cancel/blast-exhaust → failed_delivery).
 *
 * The targeted_mitra_id on the original row is not touched (it records the customer's
 * original intent); the general blast runs regardless.
 *
 * @param {*} paymentRequestId - Id of the confirmed payment session to reuse.
 * @param {*} customerId - Id of the requesting customer; must own the payment session.
 * @param {object} [options]
 * @param {*} [options.topic_sensitivity] - Forwarded unchanged to `createPairingRequest`.
 * @returns {Promise<*>} The result of `createPairingRequest`.
 * @throws {Error} `NOT_FOUND` (404) when the payment session is missing, `FORBIDDEN` (403)
 *   when it belongs to another customer, `INVALID_STATE` (409) when it is not `confirmed`.
 */
export const fallbackToGeneralBlast = async (paymentRequestId, customerId, { topic_sensitivity } = {}) => {
  const paymentSession = await getPaymentSession(paymentRequestId)

  if (!paymentSession) {
    const notFound = new Error('Payment session not found')
    throw Object.assign(notFound, { code: 'NOT_FOUND', statusCode: 404 })
  }

  if (paymentSession.customer_id !== customerId) {
    const forbidden = new Error('Payment session does not belong to this customer')
    throw Object.assign(forbidden, { code: 'FORBIDDEN', statusCode: 403 })
  }

  const isConfirmed = paymentSession.status === PaymentRequestStatus.CONFIRMED
  if (!isConfirmed) {
    const invalidState = new Error(`Cannot fallback from payment in status ${paymentSession.status}`)
    throw Object.assign(invalidState, { code: 'INVALID_STATE', statusCode: 409 })
  }

  // Blast against the SAME payment session. `allowTargetedPayment` keeps the
  // targeted_mitra_id stored on that payment from tripping the general-blast guard.
  const blastOptions = {
    paymentRequestId,
    topic_sensitivity,
    allowTargetedPayment: true,
  }
  return createPairingRequest(customerId, blastOptions)
}
|
|
|
|
/**
 * Expire a pairing request that is still pending acceptance.
 *
 * Flips the chat_session to expired, drops its entry from `pairingTimeouts`, marks every
 * unanswered notification as ignored, records an intermediate failure against the payment
 * session (when one is linked and can be loaded), then tells the customer and every
 * notified mitra.
 *
 * @param {*} sessionId - Id of the chat_session to expire.
 * @param {*} [causeTag=PairingFailureCause.NO_MITRA_AVAILABLE] - Cause recorded on the audit
 *   row and sent to the customer.
 * @returns {Promise<object|null>} The updated row (`id`, `customer_id`, `status`,
 *   `payment_request_id`), or `null` when no session with that id was pending acceptance.
 */
export const expirePairingRequest = async (sessionId, causeTag = PairingFailureCause.NO_MITRA_AVAILABLE) => {
  // The status guard in the WHERE clause means only a pending-acceptance session is updated.
  const [expired] = await sql`
    UPDATE chat_sessions
    SET status = ${SessionStatus.EXPIRED}
    WHERE id = ${sessionId} AND status = ${SessionStatus.PENDING_ACCEPTANCE}
    RETURNING id, customer_id, status, payment_request_id
  `
  if (!expired) {
    return null
  }

  pairingTimeouts.delete(sessionId)

  // Every notification that never got an answer is recorded as ignored.
  await sql`
    UPDATE chat_request_notifications
    SET response = ${NotificationResponse.IGNORED}, responded_at = NOW()
    WHERE session_id = ${sessionId} AND response IS NULL
  `

  // Intermediate failure: the payment session stays `confirmed` so the customer can
  // re-blast on the same payment from the S7 timeout CTA. The audit row is still written
  // so the failed-pairing CC view captures every attempt.
  const paymentSession = expired.payment_request_id
    ? await getPaymentSession(expired.payment_request_id)
    : null
  if (paymentSession) {
    await recordIntermediateFailure({
      paymentRequestId: expired.payment_request_id,
      customerId: expired.customer_id,
      causeTag,
      amount: paymentSession.amount,
    })
  }

  const customerMessage = {
    type: WsMessage.PAIRING_FAILED,
    session_id: sessionId,
    payment_request_id: expired.payment_request_id,
    cause_tag: causeTag,
    is_terminal: false,
  }
  await notifyCustomer(expired.customer_id, customerMessage)

  // Tell the mitras to dismiss the request (it expired). The sends are independent,
  // so they are started together and awaited as a group.
  const notified = await sql`
    SELECT mitra_id FROM chat_request_notifications WHERE session_id = ${sessionId}
  `
  const dismissals = notified.map(({ mitra_id }) => notifyMitra(mitra_id, {
    type: WsMessage.CHAT_REQUEST_CLOSED,
    session_id: sessionId,
    reason: 'expired',
  }))
  await Promise.all(dismissals)

  return expired
}
|
|
|
|
/**
 * Expire a targeted pairing request whose timer fired with no mitra response.
 *
 * This is an INTERMEDIATE failure: the chat_session is marked expired (the targeted attempt
 * is over) but the payment_request stays `confirmed`, so the customer can fall back to a
 * general blast on the same payment, or cancel (which then terminates).
 *
 * - The audit row is recorded with cause_tag targeted_mitra_timeout.
 * - The customer receives RETURNING_CHAT_TIMEOUT (not PAIRING_FAILED).
 * - Every notified mitra receives CHAT_REQUEST_CLOSED with reason 'timeout'.
 *
 * @param {*} sessionId - Id of the chat_session to expire.
 * @returns {Promise<object|null>} The updated row (`id`, `customer_id`, `status`,
 *   `payment_request_id`), or `null` when no session with that id was pending acceptance.
 */
export const expireTargetedPairingRequest = async (sessionId) => {
  // The status guard in the WHERE clause means only a pending-acceptance session is updated.
  const [session] = await sql`
    UPDATE chat_sessions
    SET status = ${SessionStatus.EXPIRED}
    WHERE id = ${sessionId} AND status = ${SessionStatus.PENDING_ACCEPTANCE}
    RETURNING id, customer_id, status, payment_request_id
  `
  if (!session) return null

  pairingTimeouts.delete(sessionId)

  // Read the notified mitras ONCE. The first row identifies the targeted mitra for the
  // audit record and the full list drives the CHAT_REQUEST_CLOSED fan-out at the end.
  // The UPDATE below only touches response/responded_at, so the mitra_id values read
  // here are exactly what a second query would return.
  const notifications = await sql`
    SELECT mitra_id FROM chat_request_notifications WHERE session_id = ${sessionId}
  `
  const [targetedNotification] = notifications

  // Any notification still unanswered is recorded as ignored.
  await sql`
    UPDATE chat_request_notifications
    SET response = ${NotificationResponse.IGNORED}, responded_at = NOW()
    WHERE session_id = ${sessionId} AND response IS NULL
  `

  // Audit row only; the payment session itself is left untouched here.
  if (session.payment_request_id) {
    const payRequest = await getPaymentSession(session.payment_request_id)
    if (payRequest) {
      await recordIntermediateFailure({
        paymentRequestId: session.payment_request_id,
        customerId: session.customer_id,
        targetedMitraId: targetedNotification?.mitra_id ?? null,
        causeTag: PairingFailureCause.TARGETED_MITRA_TIMEOUT,
        amount: payRequest.amount,
      })
    }
  }

  await notifyCustomer(session.customer_id, {
    type: WsMessage.RETURNING_CHAT_TIMEOUT,
    session_id: sessionId,
    payment_request_id: session.payment_request_id,
  })

  // Notify the targeted mitra that the card is no longer actionable — fan-out in parallel
  // (single recipient today, but cheap to future-proof).
  await Promise.all(notifications.map((n) => notifyMitra(n.mitra_id, {
    type: WsMessage.CHAT_REQUEST_CLOSED,
    session_id: sessionId,
    reason: 'timeout',
  })))

  return session
}
|
|
|
|
/**
 * List the chat requests a mitra has been notified about and has not yet answered,
 * limited to sessions still pending acceptance, oldest first.
 *
 * Each row carries a `request_type`: returning ("Curhat lagi") when the linked
 * payment_request has a `targeted_mitra_id`, general otherwise. Returning rows also get
 * `confirmation_timeout_seconds` from config so the cold-start (FCM-tap) path can render
 * the countdown overlay — the same field the WS payload provides for the live path.
 *
 * @param {*} mitraId - Id of the mitra whose pending requests are listed.
 * @returns {Promise<Array<object>>} The query rows; returning-type rows are copies with
 *   `confirmation_timeout_seconds` added.
 */
export const getPendingRequestsForMitra = async (mitraId) => {
  const pending = await sql`
    SELECT
      cs.id AS session_id,
      cs.duration_minutes,
      cs.is_first_session_discount,
      cs.topic_sensitivity,
      cs.created_at,
      CASE
        WHEN ps.targeted_mitra_id IS NOT NULL THEN ${PairingRequestType.RETURNING}
        ELSE ${PairingRequestType.GENERAL}
      END AS request_type
    FROM chat_request_notifications crn
    JOIN chat_sessions cs ON cs.id = crn.session_id
    LEFT JOIN payment_requests ps ON ps.id = cs.payment_request_id
    WHERE crn.mitra_id = ${mitraId}
      AND crn.response IS NULL
      AND cs.status = ${SessionStatus.PENDING_ACCEPTANCE}
    ORDER BY cs.created_at ASC
  `

  // Without any returning request there is nothing to enrich, so skip the config lookup.
  const hasReturning = pending.some((row) => row.request_type === PairingRequestType.RETURNING)
  if (!hasReturning) {
    return pending
  }

  // Fetch the timeout config a single time and attach it to each returning row.
  const {
    returning_chat_confirmation_timeout_seconds: timeoutSeconds,
  } = await getReturningChatConfirmationTimeoutSeconds()

  return pending.map((row) => {
    if (row.request_type !== PairingRequestType.RETURNING) {
      return row
    }
    return { ...row, confirmation_timeout_seconds: timeoutSeconds }
  })
}
|
|
|
|
/**
 * Load one chat session's status fields together with the display name of its mitra.
 *
 * The mitra is LEFT JOINed, so a session without a matching mitra row still comes back
 * with `mitra_display_name` null.
 *
 * @param {*} sessionId - Id of the chat_session to look up.
 * @returns {Promise<object|undefined>} The first row of the query, or `undefined` when
 *   the query yields no row.
 */
export const getSessionStatus = async (sessionId) => {
  const matches = await sql`
    SELECT cs.id, cs.customer_id, cs.mitra_id, cs.status, cs.topic_sensitivity,
           cs.created_at, cs.paired_at, cs.ended_at, cs.ended_by,
           m.display_name AS mitra_display_name
    FROM chat_sessions cs
    LEFT JOIN mitras m ON m.id = cs.mitra_id
    WHERE cs.id = ${sessionId}
  `
  const [found] = matches
  return found
}
|