import { getDb } from '../db/client.js'
import { publish } from '../plugins/valkey.js'
import { UserType, SessionStatus, MessageStatus, WsMessage } from '../constants.js'

const sql = getDb()

/**
 * Build an Error carrying the `code` / `statusCode` properties that this
 * module attaches to its domain errors.
 *
 * @param {string} message - Human-readable message.
 * @param {string} code - Machine-readable error code.
 * @param {number} statusCode - HTTP status associated with the error.
 * @returns {Error} The decorated error (not thrown here).
 */
const serviceError = (message, code, statusCode) =>
  Object.assign(new Error(message), { code, statusCode })

/**
 * Fetch the customer's most recently created session whose status is
 * ACTIVE, PENDING_PAYMENT, EXTENDING or CLOSING.
 *
 * @param {*} customerId - Value matched against `chat_sessions.customer_id`.
 * @returns {Promise<object|undefined>} The session row (with
 *   `mitra_display_name`), or `undefined` when no row matches.
 */
export const getActiveSessionByCustomer = async (customerId) => {
  const [session] = await sql`
    SELECT cs.id, cs.customer_id, cs.mitra_id, cs.status, cs.topic_sensitivity,
           cs.created_at, cs.paired_at,
           cs.duration_minutes, cs.price, cs.is_first_session_discount,
           cs.expires_at, cs.extended_minutes,
           m.display_name AS mitra_display_name
    FROM chat_sessions cs
    LEFT JOIN mitras m ON m.id = cs.mitra_id
    WHERE cs.customer_id = ${customerId}
      AND cs.status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT}, ${SessionStatus.EXTENDING}, ${SessionStatus.CLOSING})
    ORDER BY cs.created_at DESC
    LIMIT 1
  `
  return session
}

/**
 * Fetch every session assigned to a mitra whose status is ACTIVE,
 * PENDING_PAYMENT, EXTENDING or CLOSING, newest first.
 *
 * @param {*} mitraId - Value matched against `chat_sessions.mitra_id`.
 * @returns {Promise<object[]>} Session rows including `customer_display_name`.
 */
export const getActiveSessionsByMitra = async (mitraId) => {
  const sessions = await sql`
    SELECT cs.id, cs.customer_id, cs.status, cs.topic_sensitivity,
           cs.created_at, cs.paired_at,
           cs.duration_minutes, cs.expires_at, cs.extended_minutes,
           c.display_name AS customer_display_name
    FROM chat_sessions cs
    INNER JOIN customers c ON c.id = cs.customer_id
    WHERE cs.mitra_id = ${mitraId}
      AND cs.status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT}, ${SessionStatus.EXTENDING}, ${SessionStatus.CLOSING})
    ORDER BY cs.created_at DESC
  `
  return sessions
}

/**
 * Mark a session COMPLETED on behalf of one of its participants and publish
 * a SESSION_ENDED message on the session's status channel.
 *
 * The ownership check and the status transition happen in a single UPDATE,
 * so only a session that is ACTIVE or PENDING_PAYMENT and owned by `userId`
 * is affected.
 *
 * @param {*} sessionId - Session to end.
 * @param {*} endedBy - User type of the caller; `UserType.CUSTOMER` checks
 *   `customer_id`, any other value checks `mitra_id`.
 * @param {*} userId - Id that must match the owner column.
 * @returns {Promise<object>} The updated session row.
 * @throws {Error} `SESSION_NOT_ACTIVE` (409) when no row was updated.
 */
export const endSession = async (sessionId, endedBy, userId) => {
  // Validate session belongs to this user
  const ownerCol = endedBy === UserType.CUSTOMER ? 'customer_id' : 'mitra_id'

  const [session] = await sql`
    UPDATE chat_sessions
    SET status = ${SessionStatus.COMPLETED},
        ended_at = NOW(),
        ended_by = ${endedBy}
    WHERE id = ${sessionId}
      AND status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT})
      AND ${sql(ownerCol)} = ${userId}
    RETURNING id, customer_id, mitra_id, status, ended_at, ended_by
  `

  if (!session) {
    throw serviceError('Session not found or already ended', 'SESSION_NOT_ACTIVE', 409)
  }

  // Notify both parties
  await publish(`session:${sessionId}:status`, {
    type: WsMessage.SESSION_ENDED,
    session_id: sessionId,
    ended_by: endedBy,
  })

  return session
}

/**
 * Reassign an ACTIVE / PENDING_PAYMENT session to another mitra and notify
 * the customer, the previous mitra (if any) and the new mitra.
 *
 * All validation (session state, target mitra exists and is online) runs
 * before the session row is modified, and the UPDATE repeats the status
 * guard so a session that ended in the meantime is not reassigned.
 *
 * @param {*} sessionId - Session to reroute.
 * @param {*} newMitraId - Mitra that takes over the session.
 * @returns {Promise<object>} The updated session row.
 * @throws {Error} `SESSION_NOT_ACTIVE` (409) when the session is missing or
 *   no longer ACTIVE / PENDING_PAYMENT.
 * @throws {Error} `MITRA_NOT_ONLINE` (422) when the target mitra is unknown
 *   or not flagged online.
 */
export const rerouteSession = async (sessionId, newMitraId) => {
  // Get current session
  const [current] = await sql`
    SELECT id, customer_id, mitra_id, status
    FROM chat_sessions
    WHERE id = ${sessionId}
      AND status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT})
  `

  if (!current) {
    throw serviceError('Session not found or not active', 'SESSION_NOT_ACTIVE', 409)
  }

  // Verify new mitra exists and is online. The display name is fetched here,
  // before the session is touched, so a missing mitra row cannot cause a
  // failure after the reassignment has already been written.
  const [target] = await sql`
    SELECT m.display_name, COALESCE(mos.is_online, false) AS is_online
    FROM mitras m
    LEFT JOIN mitra_online_status mos ON mos.mitra_id = m.id
    WHERE m.id = ${newMitraId}
  `

  if (!target?.is_online) {
    throw serviceError('Target mitra is not online', 'MITRA_NOT_ONLINE', 422)
  }

  const oldMitraId = current.mitra_id

  // Update session with new mitra (forced assignment). The status guard is
  // repeated so a session that ended since the SELECT above is left alone.
  const [session] = await sql`
    UPDATE chat_sessions
    SET mitra_id = ${newMitraId}
    WHERE id = ${sessionId}
      AND status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT})
    RETURNING id, customer_id, mitra_id, status
  `

  if (!session) {
    throw serviceError('Session not found or not active', 'SESSION_NOT_ACTIVE', 409)
  }

  // Notify customer about reroute
  await publish(`session:${sessionId}:status`, {
    type: WsMessage.REROUTED,
    session_id: sessionId,
    mitra_display_name: target.display_name,
  })

  // Notify old mitra session removed
  if (oldMitraId) {
    await publish(`mitra:${oldMitraId}:requests`, {
      type: WsMessage.SESSION_REROUTED,
      session_id: sessionId,
    })
  }

  // Notify new mitra about new session
  await publish(`mitra:${newMitraId}:requests`, {
    type: WsMessage.SESSION_ASSIGNED,
    session_id: sessionId,
  })

  return session
}

/**
 * List sessions with offset pagination and optional equality filters.
 *
 * @param {object} [options]
 * @param {number} [options.page=1] - 1-based page number.
 * @param {number} [options.limit=20] - Page size.
 * @param {*} [options.status] - When truthy, restricts to this status.
 * @param {*} [options.topic_sensitivity] - When truthy, restricts to this
 *   topic sensitivity.
 * @returns {Promise<{items: object[], total: number, page: number, limit: number}>}
 *   One page of rows plus the total number of rows matching the filters.
 */
export const listSessions = async ({ page = 1, limit = 20, status, topic_sensitivity } = {}) => {
  const offset = (page - 1) * limit

  let conditions = sql``
  if (status && topic_sensitivity) {
    conditions = sql`WHERE cs.status = ${status} AND cs.topic_sensitivity = ${topic_sensitivity}`
  } else if (status) {
    conditions = sql`WHERE cs.status = ${status}`
  } else if (topic_sensitivity) {
    conditions = sql`WHERE cs.topic_sensitivity = ${topic_sensitivity}`
  }

  // `cs.id` breaks ties between rows sharing a created_at so that OFFSET
  // pagination yields a stable order from one page to the next.
  const items = await sql`
    SELECT cs.id, cs.customer_id, cs.mitra_id, cs.status, cs.topic_sensitivity,
           cs.created_at, cs.paired_at, cs.ended_at, cs.ended_by,
           c.display_name AS customer_display_name,
           m.display_name AS mitra_display_name
    FROM chat_sessions cs
    INNER JOIN customers c ON c.id = cs.customer_id
    LEFT JOIN mitras m ON m.id = cs.mitra_id
    ${conditions}
    ORDER BY cs.created_at DESC, cs.id DESC
    LIMIT ${limit} OFFSET ${offset}
  `

  const [{ count }] = await sql`SELECT COUNT(*) FROM chat_sessions cs ${conditions}`

  return { items, total: Number(count), page, limit }
}

/**
 * Fetch a single session by id, including both display names and the
 * payment mode.
 *
 * @param {*} sessionId - Session id.
 * @returns {Promise<object|undefined>} The session row, or `undefined` when
 *   no row matches.
 */
export const getSessionById = async (sessionId) => {
  // `mode` lives on payment_sessions (chat | call), introduced in Phase 4.1.
  // The chat header pill needs it, so surface it on every session.info read.
  // Falls back to 'chat' for pre-3.7 rows where payment_session_id is null.
  const [session] = await sql`
    SELECT cs.id, cs.customer_id, cs.mitra_id, cs.status, cs.topic_sensitivity, cs.topics,
           cs.created_at, cs.paired_at, cs.ended_at, cs.ended_by,
           cs.duration_minutes, cs.price, cs.is_first_session_discount,
           cs.expires_at, cs.extended_minutes,
           COALESCE(ps.mode, 'chat') AS mode,
           c.display_name AS customer_display_name,
           m.display_name AS mitra_display_name
    FROM chat_sessions cs
    INNER JOIN customers c ON c.id = cs.customer_id
    LEFT JOIN mitras m ON m.id = cs.mitra_id
    LEFT JOIN payment_sessions ps ON ps.id = cs.payment_session_id
    WHERE cs.id = ${sessionId}
  `
  return session
}

// --- Phase 3.1: Unread counts ---

/**
 * Same selection as `getActiveSessionByCustomer`, plus `unread_count`: the
 * number of messages in the session sent by the mitra whose status is SENT
 * or DELIVERED.
 *
 * @param {*} customerId - Value matched against `chat_sessions.customer_id`.
 * @returns {Promise<object|undefined>} The session row, or `undefined` when
 *   no row matches.
 */
export const getActiveSessionByCustomerWithUnread = async (customerId) => {
  const [session] = await sql`
    SELECT cs.id, cs.customer_id, cs.mitra_id, cs.status, cs.topic_sensitivity,
           cs.created_at, cs.paired_at,
           cs.duration_minutes, cs.price, cs.is_first_session_discount,
           cs.expires_at, cs.extended_minutes,
           m.display_name AS mitra_display_name,
           (SELECT COUNT(*) FROM chat_messages cm
            WHERE cm.session_id = cs.id
              AND cm.sender_type = ${UserType.MITRA}
              AND cm.status IN (${MessageStatus.SENT}, ${MessageStatus.DELIVERED}))::int AS unread_count
    FROM chat_sessions cs
    LEFT JOIN mitras m ON m.id = cs.mitra_id
    WHERE cs.customer_id = ${customerId}
      AND cs.status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT}, ${SessionStatus.EXTENDING}, ${SessionStatus.CLOSING})
    ORDER BY cs.created_at DESC
    LIMIT 1
  `
  return session
}

/**
 * Same selection as `getActiveSessionsByMitra`, plus `unread_count`: the
 * number of messages in each session sent by the customer whose status is
 * SENT or DELIVERED.
 *
 * @param {*} mitraId - Value matched against `chat_sessions.mitra_id`.
 * @returns {Promise<object[]>} Session rows, newest first.
 */
export const getActiveSessionsByMitraWithUnread = async (mitraId) => {
  const sessions = await sql`
    SELECT cs.id, cs.customer_id, cs.status, cs.topic_sensitivity,
           cs.created_at, cs.paired_at,
           cs.duration_minutes, cs.expires_at, cs.extended_minutes,
           c.display_name AS customer_display_name,
           (SELECT COUNT(*) FROM chat_messages cm
            WHERE cm.session_id = cs.id
              AND cm.sender_type = ${UserType.CUSTOMER}
              AND cm.status IN (${MessageStatus.SENT}, ${MessageStatus.DELIVERED}))::int AS unread_count
    FROM chat_sessions cs
    INNER JOIN customers c ON c.id = cs.customer_id
    WHERE cs.mitra_id = ${mitraId}
      AND cs.status IN (${SessionStatus.ACTIVE}, ${SessionStatus.PENDING_PAYMENT}, ${SessionStatus.EXTENDING}, ${SessionStatus.CLOSING})
    ORDER BY cs.created_at DESC
  `
  return sessions
}

/**
 * Phase 4 Stage 10 — Selesai sub-tab uses cursor pagination on this endpoint.
 *
 * Cursor is a base64url-encoded `<timestamp>|<id>` built from the last row's
 * `COALESCE(ended_at, created_at)` and `id`. The next page reads strictly
 * older rows, breaking ties on `id` so adjacent rows with the same timestamp
 * don't duplicate or skip across pages.
 *
 * @param {{ended_at?: Date|string|null, created_at: Date|string, id: *}} row -
 *   Last row of the current page.
 * @returns {string} Opaque cursor string.
 */
const encodeHistoryCursor = (row) => {
  const value = row.ended_at ?? row.created_at
  // Rows may carry either Date objects or date strings; normalise to ISO-8601.
  const ts = (value instanceof Date ? value : new Date(value)).toISOString()
  return Buffer.from(`${ts}|${row.id}`, 'utf8').toString('base64url')
}

/**
 * Decode a cursor produced by `encodeHistoryCursor`.
 *
 * The cursor is client-supplied, so the timestamp part is parsed and
 * re-serialised here: a value that is not a valid date yields `null`
 * (treated by the caller as "no cursor") instead of reaching the
 * `::timestamptz` cast in SQL and failing the query.
 *
 * @param {string|null|undefined} cursor - Cursor from the client.
 * @returns {{ts: string, id: string}|null} Decoded parts, or `null` when the
 *   cursor is absent or malformed.
 */
const decodeHistoryCursor = (cursor) => {
  if (!cursor) return null
  try {
    const decoded = Buffer.from(cursor, 'base64url').toString('utf8')
    const [ts, id] = decoded.split('|')
    if (!ts || !id) return null
    const parsed = new Date(ts)
    if (Number.isNaN(parsed.getTime())) return null
    return { ts: parsed.toISOString(), id }
  } catch {
    // Deliberate best-effort: an undecodable cursor means "start from the top".
    return null
  }
}

/**
 * Cursor-paginated list of a customer's COMPLETED / CLOSING sessions,
 * most recently ended first.
 *
 * @param {*} customerId - Value matched against `chat_sessions.customer_id`.
 * @param {object} [options]
 * @param {string|null} [options.cursor=null] - `next_cursor` of the previous
 *   page; absent or malformed cursors return the first page.
 * @param {number|string} [options.limit=20] - Page size, clamped to 1..50.
 * @returns {Promise<{items: object[], next_cursor: string|null, has_more: boolean}>}
 */
export const getCustomerHistory = async (customerId, { cursor = null, limit = 20 } = {}) => {
  const cap = Math.min(Math.max(Number.parseInt(limit, 10) || 20, 1), 50)
  const decoded = decodeHistoryCursor(cursor)

  // Fetch one extra to determine has_more without a second query
  const fetchCount = cap + 1

  // NOTE(review): cursor timestamps have millisecond precision (ISO string).
  // If ended_at / created_at store finer precision, rows falling inside the
  // same millisecond as the cursor row could be skipped — confirm.
  const cursorFilter = decoded
    ? sql`
      AND (
        COALESCE(cs.ended_at, cs.created_at) < ${decoded.ts}::timestamptz
        OR (COALESCE(cs.ended_at, cs.created_at) = ${decoded.ts}::timestamptz AND cs.id < ${decoded.id})
      )`
    : sql``

  const items = await sql`
    SELECT cs.id, cs.mitra_id, cs.status, cs.topic_sensitivity, cs.topics,
           cs.created_at, cs.paired_at, cs.ended_at,
           cs.duration_minutes, cs.price, cs.is_first_session_discount,
           cs.extended_minutes,
           ps.mode AS mode,
           m.display_name AS mitra_display_name,
           COALESCE(mos.is_online, false) AS mitra_is_online,
           (SELECT message FROM session_closures
            WHERE session_id = cs.id AND user_type = ${UserType.MITRA}
            LIMIT 1) AS mitra_closure_message,
           (SELECT message FROM session_closures
            WHERE session_id = cs.id AND user_type = ${UserType.CUSTOMER}
            LIMIT 1) AS customer_closure_message,
           (SELECT COUNT(*)::int FROM chat_sessions x
            WHERE x.customer_id = ${customerId}
              AND x.mitra_id = cs.mitra_id
              AND x.status IN (${SessionStatus.COMPLETED}, ${SessionStatus.CLOSING})) AS sessions_count
    FROM chat_sessions cs
    LEFT JOIN mitras m ON m.id = cs.mitra_id
    LEFT JOIN mitra_online_status mos ON mos.mitra_id = cs.mitra_id
    LEFT JOIN payment_sessions ps ON ps.id = cs.payment_session_id
    WHERE cs.customer_id = ${customerId}
      AND cs.status IN (${SessionStatus.COMPLETED}, ${SessionStatus.CLOSING})
      ${cursorFilter}
    ORDER BY COALESCE(cs.ended_at, cs.created_at) DESC, cs.id DESC
    LIMIT ${fetchCount}
  `

  const hasMore = items.length > cap
  const pageItems = hasMore ? items.slice(0, cap) : items
  const nextCursor = hasMore ? encodeHistoryCursor(pageItems[pageItems.length - 1]) : null

  return { items: pageItems, next_cursor: nextCursor, has_more: hasMore }
}

/**
 * Offset-paginated list of a mitra's COMPLETED / CLOSING sessions, most
 * recently ended first.
 *
 * @param {*} mitraId - Value matched against `chat_sessions.mitra_id`.
 * @param {object} [options]
 * @param {number} [options.page=1] - 1-based page number.
 * @param {number} [options.limit=20] - Page size.
 * @returns {Promise<{items: object[], total: number, page: number, limit: number}>}
 */
export const getMitraHistory = async (mitraId, { page = 1, limit = 20 } = {}) => {
  const offset = (page - 1) * limit

  // `cs.id` breaks ties between rows sharing a timestamp so that OFFSET
  // pagination yields a stable order from one page to the next.
  const items = await sql`
    SELECT cs.id, cs.customer_id, cs.status, cs.topic_sensitivity,
           cs.created_at, cs.paired_at, cs.ended_at,
           cs.duration_minutes, cs.price, cs.is_first_session_discount,
           cs.extended_minutes,
           c.display_name AS customer_display_name,
           (SELECT message FROM session_closures
            WHERE session_id = cs.id AND user_type = ${UserType.MITRA}
            LIMIT 1) AS mitra_closure_message,
           (SELECT message FROM session_closures
            WHERE session_id = cs.id AND user_type = ${UserType.CUSTOMER}
            LIMIT 1) AS customer_closure_message
    FROM chat_sessions cs
    INNER JOIN customers c ON c.id = cs.customer_id
    WHERE cs.mitra_id = ${mitraId}
      AND cs.status IN (${SessionStatus.COMPLETED}, ${SessionStatus.CLOSING})
    ORDER BY COALESCE(cs.ended_at, cs.created_at) DESC, cs.id DESC
    LIMIT ${limit} OFFSET ${offset}
  `

  const [{ count }] = await sql`
    SELECT COUNT(*) FROM chat_sessions
    WHERE mitra_id = ${mitraId}
      AND status IN (${SessionStatus.COMPLETED}, ${SessionStatus.CLOSING})
  `

  return { items, total: Number(count), page, limit }
}