// Source: P42_godDaemons/SimMaestro/maestroServer.js
// Snapshot: 2026-06-13 13:47:46 +00:00 (265 lines, 8.6 KiB, JavaScript)

import { AccesRights } from '../accesRights.js'
import { createMysqlPool } from '../mysqlClient.js'
import { SimRepository } from './simRepository.js'
import { ArenaGroom } from './arenaGroom.js'
import { MaestroState } from './orchestrationState.js'
/**
 * Simulation orchestrator ("maestro").
 *
 * Drives one simulation at a time through the states IDLE -> PREPARING -> LIVE:
 *   1. validates the request through SimRepository,
 *   2. clears and seeds the arena through ArenaGroom,
 *   3. publishes `onYourMarks` on the arena lifecycle channel,
 *   4. waits until every expected participant ("god") reported `readyToStart`,
 *   5. publishes `bigBang` and switches to LIVE.
 *
 * Result objects returned by the public async methods have the shape
 * `{ ok: true, ... }` or `{ ok: false, err: string }`.
 */
export class maestroServer {
  /**
   * @param {object} configHelper - must expose `config` and `refreshAccessRights()`.
   * @param {*} allRediscnx - kept on the instance; not used by this class itself.
   * @param {boolean} debug - enables verbose console logging.
   */
  constructor(configHelper, allRediscnx, debug) {
    this.configHelper = configHelper
    this.maestroConfig = configHelper.config
    this.allRediscnx = allRediscnx
    this.debug = debug
    this.accessRights = new AccesRights(this.maestroConfig, debug)

    // Redis connections, filled in by wireArenaConnexion / wireSystemConnexion.
    this.arenaCnx = null
    this.arenaCnxs = []
    this.systemCnx = null

    // Persistence helpers, filled in by init() / refreshArenaGroom().
    this.db = null
    this.simRepo = null
    this.arenaGroom = null

    // Orchestration state of the (single) current simulation.
    this.orchestrationState = MaestroState.IDLE
    this.simulationId = null
    this.agentIds = []
    this.readyGods = new Map()

    // Pending ready-quorum wait: resolver of the promise and its timeout handle.
    this.readyQuorumResolve = null
    this.readyQuorumTimer = null

    // True while a startSimulation() call is running. The state only becomes
    // PREPARING after several awaits, so isIdle() alone cannot reject a second
    // start request that arrives in between.
    this.startPending = false
  }

  /**
   * Maestro settings from `config.maestro`, with defaults applied.
   * @returns {{senderId: string, lifecycle: {arenaChannel: string, godsReadyChannel: string}, expectedGods: string[], readyTimeoutMs: number}}
   */
  getMaestroSettings() {
    const maestro = this.maestroConfig.maestro ?? {}
    return {
      senderId: maestro.senderId ?? 'maestro',
      lifecycle: {
        arenaChannel: maestro.lifecycle?.arenaChannel ?? 'arena:lifecycle',
        godsReadyChannel: maestro.lifecycle?.godsReadyChannel ?? 'arena:gods:ready',
      },
      expectedGods: maestro.expectedGods ?? ['gps'],
      readyTimeoutMs: maestro.readyTimeoutMs ?? 30000,
    }
  }

  /**
   * Arena storage key settings from `config.gps.arenaStorage`, with defaults applied.
   * @returns {{agentHashKey: string, agentsIndexKey: string}}
   */
  getArenaStorageSettings() {
    const gps = this.maestroConfig.gps ?? {}
    return {
      agentHashKey: gps.arenaStorage?.agentHashKey ?? 'arena:agents:[UID]',
      agentsIndexKey: gps.arenaStorage?.agentsIndexKey ?? 'arena:agents',
    }
  }

  /** @returns {boolean} true when no simulation is being prepared or running. */
  isIdle() {
    return this.orchestrationState === MaestroState.IDLE
  }

  /** @returns {boolean} true once `bigBang` has been published for the current simulation. */
  isLive() {
    return this.orchestrationState === MaestroState.LIVE
  }

  /**
   * Creates the MySQL pool and the simulation repository.
   * @returns {Promise<boolean>} false when `config.mysql` is missing, true otherwise.
   */
  async init() {
    const mysqlCfg = this.maestroConfig.mysql
    if(!mysqlCfg) {
      console.error('[Maestro] Missing mysql config')
      return false
    }
    this.db = await createMysqlPool(mysqlCfg)
    this.simRepo = new SimRepository(this.db, this.debug)
    if(this.debug) console.log('[Maestro] MySQL pool ready')
    return true
  }

  /** Rebuilds the ArenaGroom helper on top of the current arena connection, if any. */
  refreshArenaGroom() {
    if(!this.arenaCnx) return
    this.arenaGroom = new ArenaGroom(
      this.arenaCnx,
      this.getArenaStorageSettings(),
      this.debug
    )
  }

  /**
   * Attaches this server and its access-rights helpers to a system connection.
   * The first wired connection, or any connection whose `redisConfig.role`
   * is 'primary', becomes `systemCnx`.
   * @param {object} cnx
   */
  wireSystemConnexion(cnx) {
    cnx.maestroSrv = this
    cnx.accessRights = this.accessRights
    cnx.reloadAccessRights = () => this.reloadAccessRights()
    cnx.getAccessRights = () => this.getAccessRights()
    if(!this.systemCnx || cnx.redisConfig.role === 'primary') {
      this.systemCnx = cnx
    }
  }

  /**
   * Registers an arena connection. The first wired connection, or any
   * connection whose `redisConfig.role` is 'primary', becomes `arenaCnx`
   * and the ArenaGroom helper is rebuilt on top of it.
   * @param {object} cnx
   */
  wireArenaConnexion(cnx) {
    cnx.maestroSrv = this
    this.arenaCnxs.push(cnx)
    if(!this.arenaCnx || cnx.redisConfig.role === 'primary') {
      this.arenaCnx = cnx
      this.refreshArenaGroom()
    }
  }

  /**
   * Returns the orchestrator to IDLE and forgets the current simulation.
   * A pending ready-quorum wait is settled with a failure result rather than
   * silently dropped, so a startSimulation() call awaiting it can finish.
   */
  resetOrchestration() {
    this.orchestrationState = MaestroState.IDLE
    this.simulationId = null
    this.agentIds = []
    this.readyGods.clear()
    this.completeReadyQuorum({
      ok: false,
      err: 'Simulation start cancelled before all participants were ready',
    })
  }

  /**
   * Cancels the quorum timeout and forgets the resolver WITHOUT calling it.
   * Use completeReadyQuorum() when the waiting promise must be settled.
   */
  clearReadyQuorumWait() {
    if(this.readyQuorumTimer) {
      clearTimeout(this.readyQuorumTimer)
      this.readyQuorumTimer = null
    }
    this.readyQuorumResolve = null
  }

  /**
   * Settles the pending ready-quorum wait (if any) with `result`.
   * @param {{ok: boolean, err?: string}} result
   */
  completeReadyQuorum(result) {
    // Grab the resolver first: clearReadyQuorumWait() nulls it.
    const settle = this.readyQuorumResolve
    this.clearReadyQuorumWait()
    if(typeof settle === 'function') settle(result)
  }

  /**
   * Starts waiting for the ready quorum.
   * @returns {Promise<{ok: boolean, err?: string}>} resolves (never rejects) when
   *   the quorum completes, a participant fails, the wait is cancelled, or
   *   `readyTimeoutMs` elapses.
   */
  waitForReadyQuorum() {
    const { readyTimeoutMs } = this.getMaestroSettings()
    // Only one wait can be tracked; settle a previous one instead of leaking
    // its timer and leaving its promise pending forever.
    this.completeReadyQuorum({
      ok: false,
      err: 'Superseded by a newer ready-quorum wait',
    })
    return new Promise(resolve => {
      this.readyQuorumResolve = resolve
      this.readyQuorumTimer = setTimeout(() => {
        this.completeReadyQuorum({
          ok: false,
          err: `Timeout waiting for readyToStart (${readyTimeoutMs}ms)`,
        })
      }, readyTimeoutMs)
    })
  }

  /**
   * Handles a `readyToStart` message from a participant.
   * Ignored unless the orchestrator is PREPARING, the message targets the
   * current simulation and the sender is one of the expected gods.
   * A message with a falsy `payload.success` fails the quorum immediately.
   * @param {{sender: string, payload?: {simulationId: *, success: boolean, err?: string}}} msg
   */
  onReadyToStart(msg) {
    if(this.orchestrationState !== MaestroState.PREPARING) return
    const payload = msg.payload ?? {}
    if(payload.simulationId !== this.simulationId) return

    const sender = msg.sender
    const { expectedGods } = this.getMaestroSettings()
    if(!expectedGods.includes(sender)) {
      if(this.debug) console.warn(`[Maestro] Ignoring readyToStart from unexpected sender: ${sender}`)
      return
    }

    if(!payload.success) {
      this.completeReadyQuorum({
        ok: false,
        err: payload.err ?? `Participant ${sender} failed prepare`,
      })
      return
    }

    this.readyGods.set(sender, payload)
    if(this.debug) console.log(`[Maestro] readyToStart from ${sender} (${this.readyGods.size}/${expectedGods.length})`)

    const everyoneReady = expectedGods.every(god => this.readyGods.has(god))
    if(everyoneReady) this.completeReadyQuorum({ ok: true })
  }

  /**
   * Publishes a lifecycle event on the arena channel.
   * @param {string} eventType
   * @param {object} payload
   * @throws {Error} when no arena connection has been wired.
   */
  async publishLifecycle(eventType, payload) {
    if(!this.arenaCnx) throw new Error('No arena Redis connection')
    // senderId is a top-level maestro setting; only the channel names live
    // under `lifecycle`.
    const { senderId, lifecycle } = this.getMaestroSettings()
    await this.arenaCnx.redisPublish(lifecycle.arenaChannel, {
      eventType,
      sender: senderId,
      payload,
    })
    if(this.debug) console.log(`[Maestro] Published ${eventType} simulationId=${payload?.simulationId}`)
  }

  /**
   * Starts a simulation: validate, seed the arena, wait for the ready quorum,
   * then go LIVE.
   * @param {string} userUuid
   * @param {{simulationUuid: *, keyframeId: *, infraId?: *}} payload
   * @returns {Promise<object>} `{ ok: true, simulationId, keyframeId, infraId, agentIds }`
   *   on success, otherwise `{ ok: false, err }` (or the failing repository result).
   *   Rejects when a repository, arena or publish call throws; in that case the
   *   orchestrator is returned to IDLE first.
   */
  async startSimulation(userUuid, payload) {
    if(!this.simRepo) return { ok: false, err: 'Database not initialized' }
    if(!this.arenaGroom) return { ok: false, err: 'No arena Redis connection' }
    if(!this.isIdle() || this.startPending) {
      return { ok: false, err: 'A simulation is already in progress' }
    }

    this.startPending = true
    try {
      return await this.runStartSequence(userUuid, payload)
    } finally {
      this.startPending = false
    }
  }

  /**
   * Internal: body of startSimulation() once the idle/pending guards passed.
   * Not meant to be called directly.
   */
  async runStartSequence(userUuid, payload) {
    const simulationUuid = payload?.simulationUuid
    const keyframeId = payload?.keyframeId
    const infraId = payload?.infraId ?? null

    const access = await this.simRepo.validateSimulationAccess(userUuid, simulationUuid, keyframeId)
    if(!access.ok) return access
    const agentsResult = await this.simRepo.loadKeyframeAgents(keyframeId)
    if(!agentsResult.ok) return agentsResult

    await this.arenaGroom.clearArena()
    await this.arenaGroom.seedAgents(agentsResult.agents)

    this.simulationId = simulationUuid
    this.agentIds = agentsResult.agents.map(agent => agent.id)
    this.readyGods.clear()
    this.orchestrationState = MaestroState.PREPARING

    let quorum
    try {
      // Arm the wait BEFORE publishing so a fast participant cannot answer
      // before anyone is listening.
      const readyWait = this.waitForReadyQuorum()
      await this.publishLifecycle('onYourMarks', {
        simulationId: this.simulationId,
        agentIds: this.agentIds,
        keyframeId,
        infraId,
      })
      quorum = await readyWait
      if(quorum.ok) {
        await this.publishLifecycle('bigBang', {
          simulationId: this.simulationId,
          agentIds: this.agentIds,
        })
      }
    } catch(err) {
      // A failed publish must not leave the orchestrator stuck in PREPARING
      // with a seeded arena and an armed timer.
      try {
        await this.arenaGroom.clearArena()
      } catch(cleanupErr) {
        console.error('[Maestro] Failed to clear arena after aborted start', cleanupErr)
      }
      this.resetOrchestration()
      throw err
    }

    if(!quorum.ok) {
      try {
        await this.arenaGroom.clearArena()
      } finally {
        // Return to IDLE even if clearing the arena throws.
        this.resetOrchestration()
      }
      return quorum
    }

    this.orchestrationState = MaestroState.LIVE
    if(this.debug) console.log(`[Maestro] LIVE simulationId=${this.simulationId}`)
    return {
      ok: true,
      simulationId: this.simulationId,
      keyframeId,
      infraId,
      agentIds: this.agentIds,
    }
  }

  /**
   * Stops the given simulation: clears the arena and returns to IDLE.
   * Also cancels a start that is still waiting for its ready quorum.
   * @param {string} userUuid
   * @param {{simulationUuid: *}} payload
   * @returns {Promise<object>} `{ ok: true, simulationId }` or `{ ok: false, err }`
   *   (or the failing repository result).
   */
  async stopSimulation(userUuid, payload) {
    if(!this.simRepo) return { ok: false, err: 'Database not initialized' }
    if(!this.arenaGroom) return { ok: false, err: 'No arena Redis connection' }

    const simulationUuid = payload?.simulationUuid
    const access = await this.simRepo.validateSimulationOwner(userUuid, simulationUuid)
    if(!access.ok) return access

    const anotherIsActive = Boolean(this.simulationId) && this.simulationId !== simulationUuid
    if(anotherIsActive) {
      return { ok: false, err: 'Another simulation is active' }
    }

    await this.arenaGroom.clearArena()
    this.resetOrchestration()
    if(this.debug) console.log(`[Maestro] Stopped simulationId=${simulationUuid}`)
    return { ok: true, simulationId: simulationUuid }
  }

  /**
   * Asks the config helper to refresh access rights, then propagates the
   * refreshed value to the local config and to the AccesRights instance.
   */
  async reloadAccessRights() {
    await this.configHelper.refreshAccessRights()
    this.maestroConfig.accessRights = this.configHelper.config.accessRights
    this.accessRights.refreshAccessRights(this.maestroConfig)
  }

  /** @returns {*} the `accessRights` entry of the current configuration. */
  getAccessRights() {
    return this.maestroConfig.accessRights
  }
}