Phase 2 scaffold: mitra online status & pairing logic

Add mitra online/offline status with heartbeat-based auto-offline,
customer-mitra pairing via Valkey pub/sub blast, session management,
and control center dashboard with real-time stats.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-05 23:17:49 +08:00
parent a7a2a32d27
commit d668112edd
44 changed files with 2800 additions and 80 deletions

View File

@@ -6,6 +6,9 @@ INTERNAL_HOST=127.0.0.1
# Database
DATABASE_URL=postgresql://user:password@localhost:5432/halobestie
# Valkey / Redis
VALKEY_URL=redis://localhost:6379
# Firebase
FIREBASE_PROJECT_ID=your-firebase-project-id
FIREBASE_CLIENT_EMAIL=your-service-account@project.iam.gserviceaccount.com

View File

@@ -14,6 +14,7 @@
"fastify": "^4.28.1",
"@fastify/sensible": "^5.6.0",
"firebase-admin": "^12.2.0",
"ioredis": "^5.4.1",
"pg": "^8.12.0",
"postgres": "^3.4.4",
"zod": "^3.23.8",

View File

@@ -5,6 +5,7 @@ import { ccUserRoutes } from './routes/internal/cc-user.routes.js'
import { rolesRoutes } from './routes/internal/roles.routes.js'
import { internalAuthRoutes } from './routes/internal/auth.routes.js'
import { internalConfigRoutes } from './routes/internal/config.routes.js'
import { sessionManagementRoutes } from './routes/internal/session.routes.js'
import { errorHandler } from './plugins/error-handler.js'
export const buildInternalApp = async () => {
@@ -18,6 +19,7 @@ export const buildInternalApp = async () => {
app.register(ccUserRoutes, { prefix: '/internal/control-center-users' })
app.register(rolesRoutes, { prefix: '/internal/roles' })
app.register(internalConfigRoutes, { prefix: '/internal/config' })
app.register(sessionManagementRoutes, { prefix: '/internal/sessions' })
return app
}

View File

@@ -4,6 +4,9 @@ import { customerRoutes } from './routes/public/customer.routes.js'
import { clientAuthRoutes } from './routes/public/client.auth.routes.js'
import { mitraAuthRoutes } from './routes/public/mitra.auth.routes.js'
import { sharedConfigRoutes } from './routes/public/shared.config.routes.js'
import { mitraStatusRoutes } from './routes/public/mitra.status.routes.js'
import { mitraChatRoutes } from './routes/public/mitra.chat.routes.js'
import { clientChatRoutes } from './routes/public/client.chat.routes.js'
import { errorHandler } from './plugins/error-handler.js'
export const buildPublicApp = async () => {
@@ -16,6 +19,9 @@ export const buildPublicApp = async () => {
app.register(sharedConfigRoutes, { prefix: '/api/shared/config' })
app.register(clientAuthRoutes, { prefix: '/api/client/auth' })
app.register(mitraAuthRoutes, { prefix: '/api/mitra/auth' })
app.register(mitraStatusRoutes, { prefix: '/api/mitra/status' })
app.register(mitraChatRoutes, { prefix: '/api/mitra/chat-requests' })
app.register(clientChatRoutes, { prefix: '/api/client/chat' })
return app
}

View File

@@ -64,6 +64,84 @@ const migrate = async () => {
ON CONFLICT (key) DO NOTHING
`
// --- Phase 2: Mitra Online Status & Pairing ---
await sql`
CREATE TABLE IF NOT EXISTS mitra_online_status (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
mitra_id UUID NOT NULL UNIQUE REFERENCES mitras(id),
is_online BOOLEAN NOT NULL DEFAULT FALSE,
last_online_at TIMESTAMPTZ,
last_offline_at TIMESTAMPTZ,
last_heartbeat_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
`
await sql`
CREATE TABLE IF NOT EXISTS mitra_online_logs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
mitra_id UUID NOT NULL REFERENCES mitras(id),
status VARCHAR(10) NOT NULL,
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
`
await sql`
CREATE INDEX IF NOT EXISTS idx_mitra_online_logs_mitra_id
ON mitra_online_logs (mitra_id)
`
await sql`
CREATE TABLE IF NOT EXISTS chat_sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
customer_id UUID NOT NULL REFERENCES customers(id),
mitra_id UUID REFERENCES mitras(id),
status VARCHAR(30) NOT NULL DEFAULT 'searching',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
paired_at TIMESTAMPTZ,
ended_at TIMESTAMPTZ,
ended_by VARCHAR(20)
)
`
await sql`
CREATE INDEX IF NOT EXISTS idx_chat_sessions_customer_id
ON chat_sessions (customer_id)
`
await sql`
CREATE INDEX IF NOT EXISTS idx_chat_sessions_mitra_id
ON chat_sessions (mitra_id)
`
await sql`
CREATE INDEX IF NOT EXISTS idx_chat_sessions_status
ON chat_sessions (status)
`
await sql`
CREATE TABLE IF NOT EXISTS chat_request_notifications (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES chat_sessions(id),
mitra_id UUID NOT NULL REFERENCES mitras(id),
notified_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
response VARCHAR(20),
responded_at TIMESTAMPTZ
)
`
await sql`
CREATE INDEX IF NOT EXISTS idx_chat_request_notifications_session_id
ON chat_request_notifications (session_id)
`
await sql`
INSERT INTO app_config (key, value)
VALUES ('max_customers_per_mitra', '{"value": 3}')
ON CONFLICT (key) DO NOTHING
`
console.log('Migration complete.')
await sql.end()
}

View File

@@ -0,0 +1,47 @@
import Redis from 'ioredis'
let pub
let sub
let client
export const getValkeyClient = () => {
if (!client) {
const url = process.env.VALKEY_URL || 'redis://localhost:6379'
client = new Redis(url)
}
return client
}
export const getValkeyPub = () => {
if (!pub) {
const url = process.env.VALKEY_URL || 'redis://localhost:6379'
pub = new Redis(url)
}
return pub
}
export const getValkeySub = () => {
if (!sub) {
const url = process.env.VALKEY_URL || 'redis://localhost:6379'
sub = new Redis(url)
}
return sub
}
export const publish = async (channel, data) => {
const pubClient = getValkeyPub()
await pubClient.publish(channel, JSON.stringify(data))
}
export const subscribe = (channel, callback) => {
const subClient = getValkeySub()
subClient.subscribe(channel)
subClient.on('message', (ch, message) => {
if (ch === channel) {
callback(JSON.parse(message))
}
})
return () => {
subClient.unsubscribe(channel)
}
}

View File

@@ -1,6 +1,6 @@
import { authenticate, requirePermission } from '../../plugins/auth.js'
import { getCcUserByFirebaseUid } from '../../services/cc-user.service.js'
import { getAnonymityConfig, setAnonymityConfig } from '../../services/config.service.js'
import { getAnonymityConfig, setAnonymityConfig, getMaxCustomersPerMitra, setMaxCustomersPerMitra } from '../../services/config.service.js'
const attachCcUser = async (request, reply) => {
const user = await getCcUserByFirebaseUid(request.firebaseUser.uid)
@@ -26,4 +26,22 @@ export const internalConfigRoutes = async (app) => {
const config = await setAnonymityConfig(anonymity_enabled)
return reply.send({ success: true, data: config })
})
app.get('/max-customers-per-mitra', {
preHandler: [authenticate, attachCcUser, requirePermission('config', 'read')],
}, async (request, reply) => {
const config = await getMaxCustomersPerMitra()
return reply.send({ success: true, data: config })
})
app.patch('/max-customers-per-mitra', {
preHandler: [authenticate, attachCcUser, requirePermission('config', 'update')],
}, async (request, reply) => {
const { max_customers_per_mitra } = request.body ?? {}
if (typeof max_customers_per_mitra !== 'number' || max_customers_per_mitra < 1) {
return reply.code(422).send({ success: false, error: { code: 'VALIDATION_ERROR', message: 'max_customers_per_mitra must be a positive number' } })
}
const config = await setMaxCustomersPerMitra(max_customers_per_mitra)
return reply.send({ success: true, data: config })
})
}

View File

@@ -1,6 +1,7 @@
import { authenticate, requirePermission } from '../../plugins/auth.js'
import { getCcUserByFirebaseUid } from '../../services/cc-user.service.js'
import { createMitra, listMitras, updateMitraStatus } from '../../services/mitra.service.js'
import { getOnlineMitras, getOnlineLogs } from '../../services/mitra-status.service.js'
const attachCcUser = async (request, reply) => {
const user = await getCcUserByFirebaseUid(request.firebaseUser.uid)
@@ -42,4 +43,19 @@ export const mitraManagementRoutes = async (app) => {
const mitra = await updateMitraStatus(request.params.id, is_active)
return reply.send({ success: true, data: mitra })
})
app.get('/online', {
preHandler: [authenticate, attachCcUser, requirePermission('mitra', 'read')],
}, async (request, reply) => {
const mitras = await getOnlineMitras()
return reply.send({ success: true, data: mitras })
})
app.get('/:id/online-logs', {
preHandler: [authenticate, attachCcUser, requirePermission('mitra', 'read')],
}, async (request, reply) => {
const { page = 1, limit = 50 } = request.query
const result = await getOnlineLogs(request.params.id, { page: Number(page), limit: Number(limit) })
return reply.send({ success: true, data: result })
})
}

View File

@@ -0,0 +1,48 @@
import { authenticate, requirePermission } from '../../plugins/auth.js'
import { getCcUserByFirebaseUid } from '../../services/cc-user.service.js'
import { listSessions, getSessionById, rerouteSession } from '../../services/session.service.js'
import { getDashboardStats } from '../../services/dashboard.service.js'
const attachCcUser = async (request, reply) => {
const user = await getCcUserByFirebaseUid(request.firebaseUser.uid)
if (!user) return reply.code(403).send({ success: false, error: { code: 'FORBIDDEN', message: 'Not a control center user' } })
request.ccUser = user
}
export const sessionManagementRoutes = async (app) => {
app.get('/dashboard/stats', {
preHandler: [authenticate, attachCcUser, requirePermission('config', 'read')],
}, async (request, reply) => {
const stats = await getDashboardStats()
return reply.send({ success: true, data: stats })
})
app.get('/', {
preHandler: [authenticate, attachCcUser, requirePermission('config', 'read')],
}, async (request, reply) => {
const { page = 1, limit = 20, status } = request.query
const result = await listSessions({ page: Number(page), limit: Number(limit), status })
return reply.send({ success: true, data: result })
})
app.get('/:sessionId', {
preHandler: [authenticate, attachCcUser, requirePermission('config', 'read')],
}, async (request, reply) => {
const session = await getSessionById(request.params.sessionId)
if (!session) {
return reply.code(404).send({ success: false, error: { code: 'NOT_FOUND', message: 'Session not found' } })
}
return reply.send({ success: true, data: session })
})
app.post('/:sessionId/reroute', {
preHandler: [authenticate, attachCcUser, requirePermission('config', 'update')],
}, async (request, reply) => {
const { new_mitra_id } = request.body ?? {}
if (!new_mitra_id) {
return reply.code(422).send({ success: false, error: { code: 'VALIDATION_ERROR', message: 'new_mitra_id is required' } })
}
const session = await rerouteSession(request.params.sessionId, new_mitra_id)
return reply.send({ success: true, data: session })
})
}

View File

@@ -0,0 +1,75 @@
import { authenticate } from '../../plugins/auth.js'
import { getCustomerByFirebaseUid } from '../../services/customer.service.js'
import { createPairingRequest, cancelPairingRequest, getSessionStatus } from '../../services/pairing.service.js'
import { getActiveSessionByCustomer, endSession } from '../../services/session.service.js'
import { subscribe } from '../../plugins/valkey.js'
const resolveCustomer = async (request, reply) => {
const customer = await getCustomerByFirebaseUid(request.firebaseUser.uid)
if (!customer) {
return reply.code(404).send({
success: false,
error: { code: 'ACCOUNT_NOT_FOUND', message: 'Customer account not found' },
})
}
request.customer = customer
}
export const clientChatRoutes = async (app) => {
app.post('/request', { preHandler: [authenticate, resolveCustomer] }, async (request, reply) => {
const session = await createPairingRequest(request.customer.id)
return reply.code(201).send({ success: true, data: session })
})
app.get('/request/:sessionId/status', { preHandler: [authenticate, resolveCustomer] }, async (request, reply) => {
const { sessionId } = request.params
// SSE stream for real-time status updates
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
})
// Send current status immediately
const current = await getSessionStatus(sessionId)
if (current) {
reply.raw.write(`data: ${JSON.stringify(current)}\n\n`)
}
// If already in a terminal state, close
if (current && ['active', 'completed', 'cancelled', 'expired'].includes(current.status)) {
reply.raw.end()
return
}
// Subscribe to status updates
const unsubscribe = subscribe(`session:${sessionId}:status`, (data) => {
reply.raw.write(`data: ${JSON.stringify(data)}\n\n`)
if (['paired', 'expired', 'session_ended'].includes(data.type)) {
reply.raw.end()
unsubscribe()
}
})
// Clean up on client disconnect
request.raw.on('close', () => {
unsubscribe()
})
})
app.post('/request/:sessionId/cancel', { preHandler: [authenticate, resolveCustomer] }, async (request, reply) => {
const session = await cancelPairingRequest(request.params.sessionId, request.customer.id)
return reply.send({ success: true, data: session })
})
app.get('/session/active', { preHandler: [authenticate, resolveCustomer] }, async (request, reply) => {
const session = await getActiveSessionByCustomer(request.customer.id)
return reply.send({ success: true, data: session ?? null })
})
app.post('/session/:sessionId/end', { preHandler: [authenticate, resolveCustomer] }, async (request, reply) => {
const session = await endSession(request.params.sessionId, 'customer')
return reply.send({ success: true, data: session })
})
}

View File

@@ -0,0 +1,69 @@
import { authenticate } from '../../plugins/auth.js'
import { getMitraByFirebaseUid } from '../../services/mitra.service.js'
import { acceptPairingRequest, declinePairingRequest } from '../../services/pairing.service.js'
import { getActiveSessionsByMitra, endSession } from '../../services/session.service.js'
import { subscribe } from '../../plugins/valkey.js'
const resolveMitra = async (request, reply) => {
const mitra = await getMitraByFirebaseUid(request.firebaseUser.uid)
if (!mitra) {
return reply.code(404).send({
success: false,
error: { code: 'ACCOUNT_NOT_FOUND', message: 'Mitra account not found' },
})
}
if (!mitra.is_active) {
return reply.code(403).send({
success: false,
error: { code: 'ACCOUNT_INACTIVE', message: 'Account is inactive' },
})
}
request.mitra = mitra
}
export const mitraChatRoutes = async (app) => {
app.get('/incoming', { preHandler: [authenticate, resolveMitra] }, async (request, reply) => {
const mitraId = request.mitra.id
// SSE stream for incoming chat requests
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
})
// Keep-alive ping
const pingInterval = setInterval(() => {
reply.raw.write(': ping\n\n')
}, 15_000)
const unsubscribe = subscribe(`mitra:${mitraId}:requests`, (data) => {
reply.raw.write(`data: ${JSON.stringify(data)}\n\n`)
})
request.raw.on('close', () => {
clearInterval(pingInterval)
unsubscribe()
})
})
app.post('/:sessionId/accept', { preHandler: [authenticate, resolveMitra] }, async (request, reply) => {
const session = await acceptPairingRequest(request.params.sessionId, request.mitra.id)
return reply.send({ success: true, data: session })
})
app.post('/:sessionId/decline', { preHandler: [authenticate, resolveMitra] }, async (request, reply) => {
await declinePairingRequest(request.params.sessionId, request.mitra.id)
return reply.send({ success: true })
})
app.get('/sessions/active', { preHandler: [authenticate, resolveMitra] }, async (request, reply) => {
const sessions = await getActiveSessionsByMitra(request.mitra.id)
return reply.send({ success: true, data: sessions })
})
app.post('/sessions/:sessionId/end', { preHandler: [authenticate, resolveMitra] }, async (request, reply) => {
const session = await endSession(request.params.sessionId, 'mitra')
return reply.send({ success: true, data: session })
})
}

View File

@@ -0,0 +1,43 @@
import { authenticate } from '../../plugins/auth.js'
import { getMitraByFirebaseUid } from '../../services/mitra.service.js'
import * as mitraStatusService from '../../services/mitra-status.service.js'
export const mitraStatusRoutes = async (app) => {
// Resolve mitra from Firebase token
const resolveMitra = async (request, reply) => {
const mitra = await getMitraByFirebaseUid(request.firebaseUser.uid)
if (!mitra) {
return reply.code(404).send({
success: false,
error: { code: 'ACCOUNT_NOT_FOUND', message: 'Mitra account not found' },
})
}
if (!mitra.is_active) {
return reply.code(403).send({
success: false,
error: { code: 'ACCOUNT_INACTIVE', message: 'Account is inactive' },
})
}
request.mitra = mitra
}
app.post('/online', { preHandler: [authenticate, resolveMitra] }, async (request, reply) => {
await mitraStatusService.setOnline(request.mitra.id)
return reply.send({ success: true, data: { is_online: true } })
})
app.post('/offline', { preHandler: [authenticate, resolveMitra] }, async (request, reply) => {
await mitraStatusService.setOffline(request.mitra.id)
return reply.send({ success: true, data: { is_online: false } })
})
app.post('/heartbeat', { preHandler: [authenticate, resolveMitra] }, async (request, reply) => {
await mitraStatusService.heartbeat(request.mitra.id)
return reply.send({ success: true })
})
app.get('/', { preHandler: [authenticate, resolveMitra] }, async (request, reply) => {
const status = await mitraStatusService.getStatus(request.mitra.id)
return reply.send({ success: true, data: status })
})
}

View File

@@ -1,6 +1,7 @@
import 'dotenv/config'
import { buildPublicApp } from './app.public.js'
import { buildInternalApp } from './app.internal.js'
import { autoOfflineStaleMitras } from './services/mitra-status.service.js'
const PUBLIC_PORT = process.env.PUBLIC_PORT || 3000
const INTERNAL_PORT = process.env.INTERNAL_PORT || 3001
@@ -15,6 +16,16 @@ const start = async () => {
await internalApp.listen({ port: INTERNAL_PORT, host: INTERNAL_HOST })
console.log(`Internal API listening on ${INTERNAL_HOST}:${INTERNAL_PORT}`)
// Auto-offline mitras with stale heartbeat (every 30s)
setInterval(async () => {
try {
const count = await autoOfflineStaleMitras(45)
if (count > 0) console.log(`Auto-offlined ${count} stale mitra(s)`)
} catch (err) {
console.error('Auto-offline check failed:', err)
}
}, 30_000)
}
start().catch((err) => {

View File

@@ -15,3 +15,17 @@ export const setAnonymityConfig = async (enabled) => {
`
return { anonymity_enabled: enabled }
}
export const getMaxCustomersPerMitra = async () => {
const [row] = await sql`SELECT value FROM app_config WHERE key = 'max_customers_per_mitra'`
return { max_customers_per_mitra: row?.value?.value ?? 3 }
}
export const setMaxCustomersPerMitra = async (value) => {
await sql`
INSERT INTO app_config (key, value, updated_at)
VALUES ('max_customers_per_mitra', ${sql.json({ value })}, NOW())
ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()
`
return { max_customers_per_mitra: value }
}

View File

@@ -0,0 +1,28 @@
import { getDb } from '../db/client.js'
const sql = getDb()
export const getDashboardStats = async () => {
const [[{ active_chats }], [{ online_mitras }], [{ pending_requests }]] = await Promise.all([
sql`SELECT COUNT(*) AS active_chats FROM chat_sessions WHERE status IN ('active', 'pending_payment')`,
sql`SELECT COUNT(*) AS online_mitras FROM mitra_online_status WHERE is_online = true`,
sql`SELECT COUNT(*) AS pending_requests FROM chat_sessions WHERE status IN ('searching', 'pending_acceptance')`,
])
const customersPerMitra = await sql`
SELECT m.id, m.display_name,
(SELECT COUNT(*) FROM chat_sessions cs
WHERE cs.mitra_id = m.id AND cs.status IN ('active', 'pending_payment')) AS active_session_count
FROM mitras m
INNER JOIN mitra_online_status s ON s.mitra_id = m.id
WHERE s.is_online = true
ORDER BY active_session_count DESC
`
return {
active_chats: Number(active_chats),
online_mitras: Number(online_mitras),
pending_requests: Number(pending_requests),
customers_per_mitra: customersPerMitra,
}
}

View File

@@ -0,0 +1,107 @@
import { getDb } from '../db/client.js'
const sql = getDb()
export const ensureStatusRow = async (mitraId) => {
await sql`
INSERT INTO mitra_online_status (mitra_id)
VALUES (${mitraId})
ON CONFLICT (mitra_id) DO NOTHING
`
}
export const setOnline = async (mitraId) => {
await ensureStatusRow(mitraId)
const now = new Date()
await sql`
UPDATE mitra_online_status
SET is_online = true, last_online_at = ${now}, last_heartbeat_at = ${now}, updated_at = ${now}
WHERE mitra_id = ${mitraId}
`
await sql`
INSERT INTO mitra_online_logs (mitra_id, status) VALUES (${mitraId}, 'online')
`
}
export const setOffline = async (mitraId) => {
await ensureStatusRow(mitraId)
const now = new Date()
const [status] = await sql`
SELECT is_online FROM mitra_online_status WHERE mitra_id = ${mitraId}
`
if (!status?.is_online) return
await sql`
UPDATE mitra_online_status
SET is_online = false, last_offline_at = ${now}, updated_at = ${now}
WHERE mitra_id = ${mitraId}
`
await sql`
INSERT INTO mitra_online_logs (mitra_id, status) VALUES (${mitraId}, 'offline')
`
}
export const heartbeat = async (mitraId) => {
const now = new Date()
await sql`
UPDATE mitra_online_status
SET last_heartbeat_at = ${now}, updated_at = ${now}
WHERE mitra_id = ${mitraId} AND is_online = true
`
}
export const getStatus = async (mitraId) => {
await ensureStatusRow(mitraId)
const [status] = await sql`
SELECT is_online, last_online_at, last_offline_at, updated_at
FROM mitra_online_status
WHERE mitra_id = ${mitraId}
`
return status
}
export const getOnlineMitras = async () => {
const mitras = await sql`
SELECT m.id, m.display_name, m.phone, s.last_online_at, s.updated_at,
(SELECT COUNT(*) FROM chat_sessions cs
WHERE cs.mitra_id = m.id AND cs.status IN ('active', 'pending_payment')) AS active_session_count
FROM mitras m
INNER JOIN mitra_online_status s ON s.mitra_id = m.id
WHERE s.is_online = true AND m.is_active = true
ORDER BY s.last_online_at DESC
`
return mitras
}
export const getOnlineLogs = async (mitraId, { page = 1, limit = 50 } = {}) => {
const offset = (page - 1) * limit
const items = await sql`
SELECT id, status, timestamp
FROM mitra_online_logs
WHERE mitra_id = ${mitraId}
ORDER BY timestamp DESC
LIMIT ${limit} OFFSET ${offset}
`
const [{ count }] = await sql`
SELECT COUNT(*) FROM mitra_online_logs WHERE mitra_id = ${mitraId}
`
return { items, total: Number(count), page, limit }
}
export const autoOfflineStaleMitras = async (staleSeconds = 45) => {
const stale = await sql`
UPDATE mitra_online_status
SET is_online = false, last_offline_at = NOW(), updated_at = NOW()
WHERE is_online = true
AND last_heartbeat_at < NOW() - ${staleSeconds + ' seconds'}::interval
RETURNING mitra_id
`
for (const row of stale) {
await sql`
INSERT INTO mitra_online_logs (mitra_id, status) VALUES (${row.mitra_id}, 'offline')
`
}
return stale.length
}

View File

@@ -0,0 +1,247 @@
import { getDb } from '../db/client.js'
import { getMaxCustomersPerMitra } from './config.service.js'
import { publish } from '../plugins/valkey.js'
const sql = getDb()
// Timeout map for active pairing requests (sessionId → timeoutId)
const pairingTimeouts = new Map()
export const findAvailableMitras = async () => {
const { max_customers_per_mitra } = await getMaxCustomersPerMitra()
const mitras = await sql`
SELECT m.id, m.display_name
FROM mitras m
INNER JOIN mitra_online_status s ON s.mitra_id = m.id
WHERE m.is_active = true
AND s.is_online = true
AND (
SELECT COUNT(*) FROM chat_sessions cs
WHERE cs.mitra_id = m.id AND cs.status IN ('active', 'pending_payment')
) < ${max_customers_per_mitra}
`
return mitras
}
export const createPairingRequest = async (customerId) => {
// Check for existing active session or request
const [existing] = await sql`
SELECT id, status FROM chat_sessions
WHERE customer_id = ${customerId}
AND status IN ('searching', 'pending_acceptance', 'pending_payment', 'active')
`
if (existing) {
throw Object.assign(new Error('Customer already has an active session or request'), {
code: 'ALREADY_ACTIVE', statusCode: 409,
})
}
const availableMitras = await findAvailableMitras()
if (availableMitras.length === 0) {
throw Object.assign(new Error('No bestie available'), {
code: 'NO_MITRA_AVAILABLE', statusCode: 404,
})
}
// Create session
const [session] = await sql`
INSERT INTO chat_sessions (customer_id, status)
VALUES (${customerId}, 'pending_acceptance')
RETURNING id, customer_id, status, created_at
`
// Create notifications for all available mitras
for (const mitra of availableMitras) {
await sql`
INSERT INTO chat_request_notifications (session_id, mitra_id)
VALUES (${session.id}, ${mitra.id})
`
// Publish to mitra's channel
await publish(`mitra:${mitra.id}:requests`, {
type: 'chat_request',
session_id: session.id,
created_at: session.created_at,
})
}
// Start 60s timeout
const timeoutId = setTimeout(async () => {
try {
await expirePairingRequest(session.id)
} catch (_) {}
}, 60_000)
pairingTimeouts.set(session.id, timeoutId)
return session
}
export const acceptPairingRequest = async (sessionId, mitraId) => {
// Use a transaction-like approach: update only if status is still pending_acceptance
const [session] = await sql`
UPDATE chat_sessions
SET mitra_id = ${mitraId}, status = 'pending_payment', paired_at = NOW()
WHERE id = ${sessionId} AND status = 'pending_acceptance' AND mitra_id IS NULL
RETURNING id, customer_id, mitra_id, status, paired_at
`
if (!session) {
throw Object.assign(new Error('Request already accepted or expired'), {
code: 'REQUEST_UNAVAILABLE', statusCode: 409,
})
}
// Mark this mitra's notification as accepted
await sql`
UPDATE chat_request_notifications
SET response = 'accepted', responded_at = NOW()
WHERE session_id = ${sessionId} AND mitra_id = ${mitraId}
`
// Mark other mitras' notifications as ignored
await sql`
UPDATE chat_request_notifications
SET response = 'ignored', responded_at = NOW()
WHERE session_id = ${sessionId} AND mitra_id != ${mitraId} AND response IS NULL
`
// Clear timeout
const timeoutId = pairingTimeouts.get(sessionId)
if (timeoutId) {
clearTimeout(timeoutId)
pairingTimeouts.delete(sessionId)
}
// Auto-skip payment for now: move to active
const [activeSession] = await sql`
UPDATE chat_sessions SET status = 'active'
WHERE id = ${sessionId}
RETURNING id, customer_id, mitra_id, status, paired_at
`
// Get mitra display name for customer notification
const [mitra] = await sql`
SELECT display_name FROM mitras WHERE id = ${mitraId}
`
// Notify customer
await publish(`session:${sessionId}:status`, {
type: 'paired',
session_id: sessionId,
mitra_display_name: mitra.display_name,
status: 'active',
})
// Notify other mitras to dismiss the request
const notifications = await sql`
SELECT mitra_id FROM chat_request_notifications
WHERE session_id = ${sessionId} AND mitra_id != ${mitraId}
`
for (const n of notifications) {
await publish(`mitra:${n.mitra_id}:requests`, {
type: 'chat_request_closed',
session_id: sessionId,
})
}
return activeSession
}
export const declinePairingRequest = async (sessionId, mitraId) => {
await sql`
UPDATE chat_request_notifications
SET response = 'declined', responded_at = NOW()
WHERE session_id = ${sessionId} AND mitra_id = ${mitraId} AND response IS NULL
`
}
export const cancelPairingRequest = async (sessionId, customerId) => {
const [session] = await sql`
UPDATE chat_sessions
SET status = 'cancelled'
WHERE id = ${sessionId} AND customer_id = ${customerId}
AND status IN ('searching', 'pending_acceptance')
RETURNING id, status
`
if (!session) {
throw Object.assign(new Error('Cannot cancel this request'), {
code: 'CANNOT_CANCEL', statusCode: 409,
})
}
// Clear timeout
const timeoutId = pairingTimeouts.get(sessionId)
if (timeoutId) {
clearTimeout(timeoutId)
pairingTimeouts.delete(sessionId)
}
// Mark all notifications as ignored
await sql`
UPDATE chat_request_notifications
SET response = 'ignored', responded_at = NOW()
WHERE session_id = ${sessionId} AND response IS NULL
`
// Notify mitras to dismiss
const notifications = await sql`
SELECT mitra_id FROM chat_request_notifications WHERE session_id = ${sessionId}
`
for (const n of notifications) {
await publish(`mitra:${n.mitra_id}:requests`, {
type: 'chat_request_closed',
session_id: sessionId,
})
}
return session
}
export const expirePairingRequest = async (sessionId) => {
const [session] = await sql`
UPDATE chat_sessions
SET status = 'expired'
WHERE id = ${sessionId} AND status = 'pending_acceptance'
RETURNING id, customer_id, status
`
if (!session) return null
pairingTimeouts.delete(sessionId)
// Mark all pending notifications as ignored
await sql`
UPDATE chat_request_notifications
SET response = 'ignored', responded_at = NOW()
WHERE session_id = ${sessionId} AND response IS NULL
`
// Notify customer
await publish(`session:${sessionId}:status`, {
type: 'expired',
session_id: sessionId,
})
// Notify mitras to dismiss
const notifications = await sql`
SELECT mitra_id FROM chat_request_notifications WHERE session_id = ${sessionId}
`
for (const n of notifications) {
await publish(`mitra:${n.mitra_id}:requests`, {
type: 'chat_request_closed',
session_id: sessionId,
})
}
return session
}
export const getSessionStatus = async (sessionId) => {
const [session] = await sql`
SELECT cs.id, cs.customer_id, cs.mitra_id, cs.status, cs.created_at, cs.paired_at, cs.ended_at, cs.ended_by,
m.display_name AS mitra_display_name
FROM chat_sessions cs
LEFT JOIN mitras m ON m.id = cs.mitra_id
WHERE cs.id = ${sessionId}
`
return session
}

View File

@@ -0,0 +1,149 @@
import { getDb } from '../db/client.js'
import { publish } from '../plugins/valkey.js'
const sql = getDb()
export const getActiveSessionByCustomer = async (customerId) => {
const [session] = await sql`
SELECT cs.id, cs.customer_id, cs.mitra_id, cs.status, cs.created_at, cs.paired_at,
m.display_name AS mitra_display_name
FROM chat_sessions cs
LEFT JOIN mitras m ON m.id = cs.mitra_id
WHERE cs.customer_id = ${customerId}
AND cs.status IN ('active', 'pending_payment')
ORDER BY cs.created_at DESC LIMIT 1
`
return session
}
export const getActiveSessionsByMitra = async (mitraId) => {
const sessions = await sql`
SELECT cs.id, cs.customer_id, cs.status, cs.created_at, cs.paired_at,
c.display_name AS customer_display_name
FROM chat_sessions cs
INNER JOIN customers c ON c.id = cs.customer_id
WHERE cs.mitra_id = ${mitraId}
AND cs.status IN ('active', 'pending_payment')
ORDER BY cs.created_at DESC
`
return sessions
}
export const endSession = async (sessionId, endedBy) => {
const [session] = await sql`
UPDATE chat_sessions
SET status = 'completed', ended_at = NOW(), ended_by = ${endedBy}
WHERE id = ${sessionId} AND status IN ('active', 'pending_payment')
RETURNING id, customer_id, mitra_id, status, ended_at, ended_by
`
if (!session) {
throw Object.assign(new Error('Session not found or already ended'), {
code: 'SESSION_NOT_ACTIVE', statusCode: 409,
})
}
// Notify both parties
await publish(`session:${sessionId}:status`, {
type: 'session_ended',
session_id: sessionId,
ended_by: endedBy,
})
return session
}
export const rerouteSession = async (sessionId, newMitraId) => {
// Get current session
const [current] = await sql`
SELECT id, customer_id, mitra_id, status FROM chat_sessions
WHERE id = ${sessionId} AND status IN ('active', 'pending_payment')
`
if (!current) {
throw Object.assign(new Error('Session not found or not active'), {
code: 'SESSION_NOT_ACTIVE', statusCode: 409,
})
}
// Verify new mitra is online
const [newMitraStatus] = await sql`
SELECT is_online FROM mitra_online_status WHERE mitra_id = ${newMitraId}
`
if (!newMitraStatus?.is_online) {
throw Object.assign(new Error('Target mitra is not online'), {
code: 'MITRA_NOT_ONLINE', statusCode: 422,
})
}
const oldMitraId = current.mitra_id
// Update session with new mitra (forced assignment)
const [session] = await sql`
UPDATE chat_sessions
SET mitra_id = ${newMitraId}
WHERE id = ${sessionId}
RETURNING id, customer_id, mitra_id, status
`
const [newMitra] = await sql`
SELECT display_name FROM mitras WHERE id = ${newMitraId}
`
// Notify customer about reroute
await publish(`session:${sessionId}:status`, {
type: 'rerouted',
session_id: sessionId,
mitra_display_name: newMitra.display_name,
})
// Notify old mitra session removed
if (oldMitraId) {
await publish(`mitra:${oldMitraId}:requests`, {
type: 'session_rerouted',
session_id: sessionId,
})
}
// Notify new mitra about new session
await publish(`mitra:${newMitraId}:requests`, {
type: 'session_assigned',
session_id: sessionId,
})
return session
}
export const listSessions = async ({ page = 1, limit = 20, status } = {}) => {
const offset = (page - 1) * limit
const conditions = status
? sql`WHERE cs.status = ${status}`
: sql``
const items = await sql`
SELECT cs.id, cs.customer_id, cs.mitra_id, cs.status, cs.created_at, cs.paired_at, cs.ended_at, cs.ended_by,
c.display_name AS customer_display_name,
m.display_name AS mitra_display_name
FROM chat_sessions cs
INNER JOIN customers c ON c.id = cs.customer_id
LEFT JOIN mitras m ON m.id = cs.mitra_id
${conditions}
ORDER BY cs.created_at DESC
LIMIT ${limit} OFFSET ${offset}
`
const [{ count }] = await sql`SELECT COUNT(*) FROM chat_sessions cs ${conditions}`
return { items, total: Number(count), page, limit }
}
export const getSessionById = async (sessionId) => {
const [session] = await sql`
SELECT cs.id, cs.customer_id, cs.mitra_id, cs.status, cs.created_at, cs.paired_at, cs.ended_at, cs.ended_by,
c.display_name AS customer_display_name,
m.display_name AS mitra_display_name
FROM chat_sessions cs
INNER JOIN customers c ON c.id = cs.customer_id
LEFT JOIN mitras m ON m.id = cs.mitra_id
WHERE cs.id = ${sessionId}
`
return session
}