import { AccesRights } from '../accesRights.js' import { createMysqlPool } from '../mysqlClient.js' import { SimRepository } from './simRepository.js' import { ArenaGroom } from './arenaGroom.js' import { MaestroState } from './orchestrationState.js' export class maestroServer { constructor(configHelper, allRediscnx, debug) { this.configHelper = configHelper this.maestroConfig = configHelper.config this.allRediscnx = allRediscnx this.debug = debug this.accessRights = new AccesRights(this.maestroConfig, debug) this.arenaCnx = null this.arenaCnxs = [] this.systemCnx = null this.db = null this.simRepo = null this.arenaGroom = null this.orchestrationState = MaestroState.IDLE this.simulationId = null this.agentIds = [] this.readyGods = new Map() this.readyQuorumResolve = null this.readyQuorumTimer = null } getMaestroSettings() { const maestro = this.maestroConfig.maestro ?? {} return({ senderId: maestro.senderId ?? 'maestro', lifecycle: { arenaChannel: maestro.lifecycle?.arenaChannel ?? 'arena:lifecycle', godsReadyChannel: maestro.lifecycle?.godsReadyChannel ?? 'arena:gods:ready', }, expectedGods: maestro.expectedGods ?? ['gps'], readyTimeoutMs: maestro.readyTimeoutMs ?? 30000, }) } getArenaStorageSettings() { const gps = this.maestroConfig.gps ?? {} return({ agentHashKey: gps.arenaStorage?.agentHashKey ?? 'arena:agents:[UID]', agentsIndexKey: gps.arenaStorage?.agentsIndexKey ?? 'arena:agents', }) } isIdle() { return(this.orchestrationState === MaestroState.IDLE) } isLive() { return(this.orchestrationState === MaestroState.LIVE) } async init() { const mysqlCfg = this.maestroConfig.mysql if(!mysqlCfg) { console.error('[Maestro] Missing mysql config') return(false) } this.db = await createMysqlPool(mysqlCfg) this.simRepo = new SimRepository(this.db, this.debug) if(this.debug) console.log('[Maestro] MySQL pool ready') return(true) } refreshArenaGroom() { if(this.arenaCnx) { this.arenaGroom = new ArenaGroom( this.arenaCnx, this.getArenaStorageSettings(), this.debug ) } } wireSystemConnexion(cnx) { cnx.maestroSrv = this cnx.accessRights = this.accessRights cnx.reloadAccessRights = () => this.reloadAccessRights() cnx.getAccessRights = () => this.getAccessRights() if(!this.systemCnx || cnx.redisConfig.role === 'primary') { this.systemCnx = cnx } } wireArenaConnexion(cnx) { cnx.maestroSrv = this this.arenaCnxs.push(cnx) if(!this.arenaCnx || cnx.redisConfig.role === 'primary') { this.arenaCnx = cnx this.refreshArenaGroom() } } resetOrchestration() { this.orchestrationState = MaestroState.IDLE this.simulationId = null this.agentIds = [] this.readyGods.clear() this.clearReadyQuorumWait() } clearReadyQuorumWait() { if(this.readyQuorumTimer) { clearTimeout(this.readyQuorumTimer) this.readyQuorumTimer = null } this.readyQuorumResolve = null } completeReadyQuorum(result) { const resolve = this.readyQuorumResolve this.clearReadyQuorumWait() if(typeof(resolve) === 'function') resolve(result) } waitForReadyQuorum() { const { readyTimeoutMs } = this.getMaestroSettings() return(new Promise(resolve => { this.readyQuorumResolve = resolve this.readyQuorumTimer = setTimeout(() => { this.completeReadyQuorum({ ok: false, err: `Timeout waiting for readyToStart (${readyTimeoutMs}ms)`, }) }, readyTimeoutMs) })) } onReadyToStart(msg) { if(this.orchestrationState !== MaestroState.PREPARING) return const payload = msg.payload ?? {} if(payload.simulationId !== this.simulationId) return const sender = msg.sender const { expectedGods } = this.getMaestroSettings() if(!expectedGods.includes(sender)) { if(this.debug) console.warn(`[Maestro] Ignoring readyToStart from unexpected sender: ${sender}`) return } if(!payload.success) { this.completeReadyQuorum({ ok: false, err: payload.err ?? `Participant ${sender} failed prepare`, }) return } this.readyGods.set(sender, payload) if(this.debug) console.log(`[Maestro] readyToStart from ${sender} (${this.readyGods.size}/${expectedGods.length})`) for(const god of expectedGods) { if(!this.readyGods.has(god)) return } this.completeReadyQuorum({ ok: true }) } async publishLifecycle(eventType, payload) { if(!this.arenaCnx) throw new Error('No arena Redis connection') const { arenaChannel, senderId } = this.getMaestroSettings().lifecycle await this.arenaCnx.redisPublish(arenaChannel, { eventType, sender: senderId, payload, }) if(this.debug) console.log(`[Maestro] Published ${eventType} simulationId=${payload.simulationId}`) } async startSimulation(userUuid, payload) { if(!this.simRepo) return({ ok: false, err: 'Database not initialized' }) if(!this.arenaGroom) return({ ok: false, err: 'No arena Redis connection' }) if(!this.isIdle()) return({ ok: false, err: 'A simulation is already in progress' }) const simulationUuid = payload?.simulationUuid const keyframeId = payload?.keyframeId const infraId = payload?.infraId ?? null const access = await this.simRepo.validateSimulationAccess(userUuid, simulationUuid, keyframeId) if(!access.ok) return(access) const agentsResult = await this.simRepo.loadKeyframeAgents(keyframeId) if(!agentsResult.ok) return(agentsResult) await this.arenaGroom.clearArena() await this.arenaGroom.seedAgents(agentsResult.agents) this.simulationId = simulationUuid this.agentIds = agentsResult.agents.map(a => a.id) this.readyGods.clear() this.orchestrationState = MaestroState.PREPARING const lifecyclePayload = { simulationId: this.simulationId, agentIds: this.agentIds, keyframeId, infraId, } const readyWait = this.waitForReadyQuorum() await this.publishLifecycle('onYourMarks', lifecyclePayload) const quorum = await readyWait if(!quorum.ok) { await this.arenaGroom.clearArena() this.resetOrchestration() return(quorum) } await this.publishLifecycle('bigBang', { simulationId: this.simulationId, agentIds: this.agentIds, }) this.orchestrationState = MaestroState.LIVE if(this.debug) console.log(`[Maestro] LIVE simulationId=${this.simulationId}`) return({ ok: true, simulationId: this.simulationId, keyframeId, infraId, agentIds: this.agentIds, }) } async stopSimulation(userUuid, payload) { if(!this.simRepo) return({ ok: false, err: 'Database not initialized' }) if(!this.arenaGroom) return({ ok: false, err: 'No arena Redis connection' }) const simulationUuid = payload?.simulationUuid const access = await this.simRepo.validateSimulationOwner(userUuid, simulationUuid) if(!access.ok) return(access) if(this.simulationId && this.simulationId !== simulationUuid) { return({ ok: false, err: 'Another simulation is active' }) } await this.arenaGroom.clearArena() this.resetOrchestration() if(this.debug) console.log(`[Maestro] Stopped simulationId=${simulationUuid}`) return({ ok: true, simulationId: simulationUuid }) } async reloadAccessRights() { await this.configHelper.refreshAccessRights() this.maestroConfig.accessRights = this.configHelper.config.accessRights this.accessRights.refreshAccessRights(this.maestroConfig) } getAccessRights() { return(this.maestroConfig.accessRights) } }