Phase 3 scaffold: chat engine (WebSocket, FCM, pricing, timer, extension, history)
- Backend: WebSocket plugin, chat/pricing/timer/extension/closure/notification services - Client app: ChatBloc, pricing dialog, chat screen with message status, extension/goodbye flow, history - Mitra app: MitraChatBloc, ExtensionBloc, chat screen, extension accept/reject, history - Control center: free trial, extension timeout, early end config toggles - DB migration: chat_messages, session_closures, session_extensions, customer_transactions tables Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
151
backend/src/plugins/websocket.js
Normal file
151
backend/src/plugins/websocket.js
Normal file
@@ -0,0 +1,151 @@
|
||||
import websocket from '@fastify/websocket'
|
||||
import { verifyFirebaseToken } from './firebase.js'
|
||||
import { getCustomerByFirebaseUid } from '../services/customer.service.js'
|
||||
import { getMitraByFirebaseUid } from '../services/mitra.service.js'
|
||||
import { subscribe, publish } from './valkey.js'
|
||||
|
||||
// Track active WebSocket connections: sessionId → { customer, mitra }
|
||||
const sessionConnections = new Map()
|
||||
|
||||
// Track user → socket mapping for FCM fallback detection
|
||||
const userSockets = new Map() // `customer:${id}` or `mitra:${id}` → socket
|
||||
|
||||
export const registerWebSocketPlugin = async (app) => {
|
||||
await app.register(websocket)
|
||||
}
|
||||
|
||||
export const isUserOnlineWs = (userType, userId) => {
|
||||
const key = `${userType}:${userId}`
|
||||
const socket = userSockets.get(key)
|
||||
return socket && socket.readyState === 1 // WebSocket.OPEN
|
||||
}
|
||||
|
||||
export const getSessionConnections = (sessionId) => {
|
||||
return sessionConnections.get(sessionId) || {}
|
||||
}
|
||||
|
||||
const sendToSocket = (socket, data) => {
|
||||
if (socket && socket.readyState === 1) {
|
||||
socket.send(JSON.stringify(data))
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
export const sendToSessionParticipant = (sessionId, userType, data) => {
|
||||
const conns = sessionConnections.get(sessionId)
|
||||
if (!conns) return false
|
||||
return sendToSocket(conns[userType], data)
|
||||
}
|
||||
|
||||
export const sendToUser = (userType, userId, data) => {
|
||||
const key = `${userType}:${userId}`
|
||||
const socket = userSockets.get(key)
|
||||
return sendToSocket(socket, data)
|
||||
}
|
||||
|
||||
export const registerWebSocketRoute = (app) => {
|
||||
app.get('/api/shared/ws', { websocket: true }, (socket, request) => {
|
||||
let authenticatedUser = null // { type: 'customer'|'mitra', id, sessionId }
|
||||
let valkeyUnsubscribes = []
|
||||
|
||||
const send = (data) => sendToSocket(socket, data)
|
||||
|
||||
socket.on('message', async (raw) => {
|
||||
let msg
|
||||
try {
|
||||
msg = JSON.parse(raw.toString())
|
||||
} catch {
|
||||
send({ type: 'error', message: 'Invalid JSON' })
|
||||
return
|
||||
}
|
||||
|
||||
// Handle auth message
|
||||
if (msg.type === 'auth') {
|
||||
try {
|
||||
const decoded = await verifyFirebaseToken(msg.token)
|
||||
const customer = await getCustomerByFirebaseUid(decoded.uid)
|
||||
const mitra = customer ? null : await getMitraByFirebaseUid(decoded.uid)
|
||||
|
||||
if (!customer && !mitra) {
|
||||
send({ type: 'error', message: 'Account not found' })
|
||||
socket.close()
|
||||
return
|
||||
}
|
||||
|
||||
const userType = customer ? 'customer' : 'mitra'
|
||||
const userId = customer ? customer.id : mitra.id
|
||||
const sessionId = msg.session_id
|
||||
|
||||
authenticatedUser = { type: userType, id: userId, sessionId }
|
||||
|
||||
// Register in connection maps
|
||||
const userKey = `${userType}:${userId}`
|
||||
userSockets.set(userKey, socket)
|
||||
|
||||
if (sessionId) {
|
||||
if (!sessionConnections.has(sessionId)) {
|
||||
sessionConnections.set(sessionId, {})
|
||||
}
|
||||
sessionConnections.get(sessionId)[userType] = socket
|
||||
}
|
||||
|
||||
// Subscribe to session channel for events from other services
|
||||
if (sessionId) {
|
||||
const unsub = subscribe(`session:${sessionId}:chat`, (data) => {
|
||||
// Don't echo messages back to sender
|
||||
if (data._sender_type === userType && data._sender_id === userId) return
|
||||
const { _sender_type, _sender_id, ...payload } = data
|
||||
send(payload)
|
||||
})
|
||||
valkeyUnsubscribes.push(unsub)
|
||||
}
|
||||
|
||||
send({ type: 'auth_ok', user_type: userType, user_id: userId })
|
||||
} catch (err) {
|
||||
send({ type: 'error', message: 'Authentication failed' })
|
||||
socket.close()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// All other messages require authentication
|
||||
if (!authenticatedUser) {
|
||||
send({ type: 'error', message: 'Not authenticated. Send auth message first.' })
|
||||
return
|
||||
}
|
||||
|
||||
// Route message types to handlers via Valkey pub/sub
|
||||
const { type, ...payload } = msg
|
||||
await publish(`session:${authenticatedUser.sessionId}:incoming`, {
|
||||
type,
|
||||
...payload,
|
||||
_sender_type: authenticatedUser.type,
|
||||
_sender_id: authenticatedUser.id,
|
||||
_session_id: authenticatedUser.sessionId,
|
||||
})
|
||||
})
|
||||
|
||||
socket.on('close', () => {
|
||||
if (authenticatedUser) {
|
||||
const userKey = `${authenticatedUser.type}:${authenticatedUser.id}`
|
||||
userSockets.delete(userKey)
|
||||
|
||||
if (authenticatedUser.sessionId) {
|
||||
const conns = sessionConnections.get(authenticatedUser.sessionId)
|
||||
if (conns) {
|
||||
delete conns[authenticatedUser.type]
|
||||
if (!conns.customer && !conns.mitra) {
|
||||
sessionConnections.delete(authenticatedUser.sessionId)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up Valkey subscriptions
|
||||
for (const unsub of valkeyUnsubscribes) {
|
||||
unsub()
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user