Files
P42_godDaemons/Maestro/maestroServer.js
T

402 lines
14 KiB
JavaScript

import { AccesRights } from '../accesRights.js'
import { MySQLClient } from '@p42/p42modules'
import { SimRepository } from './simRepository.js'
import { ArenaGroom } from './arenaGroom.js'
import { MaestroState } from './orchestrationState.js'
import { PrepareQuorum } from './prepareQuorum.js'
import { buildPrepareQuorum } from './primordialDaemons.js'
/**
 * Orchestrates the lifecycle of one simulation at a time.
 *
 * The server moves through the `MaestroState` values referenced below
 * (IDLE, PREPARING, LIVE, PAUSED), grooms the arena Redis storage, waits for
 * a prepare quorum before going live, and publishes lifecycle events on the
 * arena channel and on the per-owner system channels.
 */
export class maestroServer {
  /**
   * @param {object} configHelper - Exposes `config` and `refreshAccessRights()`.
   * @param {*} allRediscnx - Stored as-is; not read by this class.
   * @param {boolean} debug - Enables verbose console logging.
   */
  constructor(configHelper, allRediscnx, debug) {
    this.configHelper = configHelper
    this.rootConfig = configHelper.config
    this.maestroConfig = configHelper.config.maestro ?? {}
    this.gpsConfig = configHelper.config.gps ?? {}
    this.allRediscnx = allRediscnx
    this.debug = debug
    this.accessRights = new AccesRights(this.rootConfig, debug)
    this.arenaCnx = null
    this.arenaCnxs = []
    this.systemCnx = null
    this.db = null
    this.simRepo = null
    this.arenaGroom = null
    this.prepareQuorum = null
    this.orchestrationState = MaestroState.IDLE
    this.simulationId = null
    this.agentIds = []
    this.keyframeId = null
    this.infraId = null
    this.bigBangEpoch = null
    this.resumeEpoch = null
    this.pausedAt = null
    // True while startSimulation() is between its idle check and the moment
    // it claims the PREPARING state; prevents overlapping starts.
    this.startInFlight = false
  }

  /** @returns {boolean} True when no simulation is being orchestrated. */
  isIdle() {
    return(this.orchestrationState === MaestroState.IDLE)
  }

  /** @returns {boolean} True when the simulation clock is running. */
  isLive() {
    return(this.orchestrationState === MaestroState.LIVE)
  }

  /** @returns {boolean} True when the simulation is paused. */
  isPaused() {
    return(this.orchestrationState === MaestroState.PAUSED)
  }

  /**
   * Current simulation time in seconds.
   *
   * @returns {number|null} Seconds on the simulation clock, or null when no
   *   clock has been started.
   */
  simNow() {
    // While paused the clock is frozen at the instant of the pause.
    if(this.isPaused()) return(this.pausedAt)
    // After a resume, time continues from the paused value. Checked before
    // bigBangEpoch because a simulation paused while PREPARING and then
    // resumed never records a bigBangEpoch.
    if(this.resumeEpoch !== null) {
      return(this.pausedAt + (performance.now() - this.resumeEpoch) / 1000)
    }
    if(this.bigBangEpoch === null) return(null)
    return((performance.now() - this.bigBangEpoch) / 1000)
  }

  /**
   * Creates the MySQL pool and the simulation repository.
   *
   * @returns {Promise<boolean>} False when the `mysql` config section is missing.
   */
  async init() {
    const mysqlCfg = this.rootConfig.mysql
    if(!mysqlCfg) {
      console.error('[Maestro] Missing mysql config')
      return(false)
    }
    this.db = await MySQLClient.createPool(mysqlCfg)
    this.simRepo = new SimRepository(this.db, MySQLClient.resolveDatabases(mysqlCfg), this.debug)
    if(this.debug) console.log('[Maestro] MySQL pool ready')
    return(true)
  }

  /** Rebuilds the ArenaGroom helper when both an arena connection and `gps.arenaStorage` exist. */
  refreshArenaGroom() {
    const arenaStorage = this.gpsConfig.arenaStorage
    if(this.arenaCnx && arenaStorage) {
      this.arenaGroom = new ArenaGroom(
        this.arenaCnx,
        arenaStorage,
        this.debug
      )
    }
  }

  /** Rebuilds the PrepareQuorum helper against the current arena connection. */
  refreshPrepareQuorum() {
    if(!this.arenaCnx) return
    this.prepareQuorum = new PrepareQuorum({
      ackChannel: this.maestroConfig.lifecycle.godsReadyChannel,
      timeoutMs: this.maestroConfig.readyTimeoutMs,
      matchesChan: this.arenaCnx.matchesChan.bind(this.arenaCnx),
      debug: this.debug,
    })
  }

  /**
   * Attaches this server to a system Redis connection.
   * The first wired connection is used until one with role 'primary' is wired.
   *
   * @param {object} cnx - Connection exposing `redisConfig.role`.
   */
  wireSystemConnexion(cnx) {
    cnx.maestroSrv = this
    cnx.accessRights = this.accessRights
    cnx.reloadAccessRights = () => this.reloadAccessRights()
    cnx.getAccessRights = () => this.getAccessRights()
    if(!this.systemCnx || cnx.redisConfig.role === 'primary') {
      this.systemCnx = cnx
    }
  }

  /**
   * Attaches this server to an arena Redis connection and rebuilds the
   * helpers that depend on it when the connection becomes the active one.
   *
   * @param {object} cnx - Connection exposing `redisConfig.role`.
   */
  wireArenaConnexion(cnx) {
    cnx.maestroSrv = this
    this.arenaCnxs.push(cnx)
    if(!this.arenaCnx || cnx.redisConfig.role === 'primary') {
      this.arenaCnx = cnx
      this.refreshArenaGroom()
      this.refreshPrepareQuorum()
    }
  }

  /** Returns to IDLE, clears all per-simulation fields and cancels the quorum. */
  resetOrchestration() {
    this.orchestrationState = MaestroState.IDLE
    this.simulationId = null
    this.agentIds = []
    this.keyframeId = null
    this.infraId = null
    this.bigBangEpoch = null
    this.resumeEpoch = null
    this.pausedAt = null
    this.prepareQuorum?.cancel()
  }

  /**
   * Forwards a readiness acknowledgement to the quorum while PREPARING.
   *
   * @returns {*} False when not preparing or no quorum exists; otherwise the
   *   quorum's `handleMessage` result.
   */
  handlePrepareAck(msg, chan) {
    if(this.orchestrationState !== MaestroState.PREPARING) return(false)
    if(!this.prepareQuorum) return(false)
    return(this.prepareQuorum.handleMessage(msg, chan))
  }

  /**
   * Publishes a lifecycle event on the arena channel, then mirrors it to the
   * simulation owners' system channels.
   *
   * @param {string} eventType
   * @param {object} payload - Expected to carry `simulationId`.
   * @param {*} [state=null] - State reported to owners; when null, the
   *   current state for the payload's simulation is used.
   * @throws {Error} When no arena connection is wired.
   */
  async publishLifecycle(eventType, payload, state = null) {
    if(!this.arenaCnx) throw(new Error('No arena Redis connection'))
    const resolvedState = state ?? this.orchestrationStateFor(payload?.simulationId ?? this.simulationId)
    await this.arenaCnx.redisPublish(this.maestroConfig.lifecycle.arenaChannel, {
      eventType,
      sender: this.maestroConfig.senderId,
      payload,
    })
    await this.publishSystemLifecycle(eventType, payload, resolvedState)
    if(this.debug) console.log(`[Maestro] Published ${eventType} simulationId=${payload?.simulationId}`)
  }

  /**
   * Publishes a lifecycle event to every owner of the simulation, on the
   * channel obtained by substituting `[UID]` in `systemLifecycleChannel`.
   * Silently does nothing when there is no system connection, repository,
   * simulation id or owner.
   */
  async publishSystemLifecycle(eventType, payload, state) {
    if(!this.systemCnx || !this.simRepo) return
    const simulationId = payload?.simulationId
    if(!simulationId) return
    const ownersResult = await this.simRepo.listSimulationOwnerUuids(simulationId)
    if(!ownersResult.ok || !ownersResult.ownerUuids.length) return
    const channelTemplate = this.maestroConfig.systemLifecycleChannel
    const senderId = this.maestroConfig.senderId
    const msg = {
      eventType,
      sender: senderId,
      payload: { ...payload, state },
    }
    for(const uid of ownersResult.ownerUuids) {
      const chan = channelTemplate.replace(/\[UID\]/g, uid)
      await this.systemCnx.redisPublish(chan, msg)
    }
    if(this.debug) {
      console.log(`[Maestro] Published system ${eventType} state=${state} simulationId=${simulationId}`)
    }
  }

  /**
   * Starts a simulation, or resumes it when the server is currently paused.
   *
   * Returns as soon as 'onYourMarks' has been published; the transition to
   * LIVE happens later in continueStartSimulation() once the quorum settles.
   *
   * @param {string} userUuid
   * @param {{simulationUuid?: string, infraId?: *}} payload
   * @returns {Promise<object>} `{ ok: true, ... }` on success or
   *   `{ ok: false, err }` on a refused request.
   * @throws Propagates errors from arena grooming or lifecycle publishing.
   */
  async startSimulation(userUuid, payload) {
    if(!this.simRepo) return({ ok: false, err: 'Database not initialized' })
    if(!this.arenaGroom) return({ ok: false, err: 'No arena Redis connection' })
    const simulationUuid = payload?.simulationUuid
    if(this.isPaused()) {
      return(this.resumeSimulation(userUuid, simulationUuid))
    }
    if(!this.prepareQuorum) return({ ok: false, err: 'No prepare quorum (arena Redis not wired)' })
    // The state stays IDLE across the awaits below, so a second overlapping
    // call must be rejected explicitly or both would groom the arena.
    if(!this.isIdle() || this.startInFlight) {
      return({ ok: false, err: 'A simulation is already in progress' })
    }
    this.startInFlight = true
    try {
      const infraId = payload?.infraId ?? null
      const access = await this.simRepo.validateSimulationAccess(userUuid, simulationUuid)
      if(!access.ok) return(access)
      const keyframeId = access.sim.sim_root_kf_uuid
      const agentsResult = await this.simRepo.loadKeyframeAgents(keyframeId)
      if(!agentsResult.ok) return(agentsResult)
      await this.arenaGroom.clearArena()
      await this.arenaGroom.seedAgents(agentsResult.agents)
      this.simulationId = simulationUuid
      this.agentIds = agentsResult.agents.map(a => a.id)
      this.keyframeId = keyframeId
      this.infraId = infraId
      this.orchestrationState = MaestroState.PREPARING
      const lifecyclePayload = {
        simulationId: this.simulationId,
        agentIds: this.agentIds,
        keyframeId,
        infraId,
      }
      const expectedParticipants = buildPrepareQuorum(this.agentIds, this.rootConfig)
      // Begin listening before announcing so no acknowledgement is missed.
      const readyWait = this.prepareQuorum.begin(expectedParticipants, this.simulationId)
      try {
        await this.publishLifecycle('onYourMarks', lifecyclePayload)
      } catch(err) {
        // Without this rollback the server would stay in PREPARING forever,
        // because continueStartSimulation() is never scheduled.
        if(this.orchestrationState === MaestroState.PREPARING && this.simulationId === simulationUuid) {
          this.prepareQuorum?.abortPending({ ok: false, err: 'Simulation start failed' })
          this.resetOrchestration()
        }
        throw(err)
      }
      this.continueStartSimulation(simulationUuid, readyWait).catch(err => {
        console.error('[Maestro] continueStartSimulation failed:', err)
      })
      return({
        ok: true,
        simulationId: this.simulationId,
        keyframeId,
        infraId,
        agentIds: this.agentIds,
        resumed: false,
        state: MaestroState.PREPARING,
      })
    } finally {
      this.startInFlight = false
    }
  }

  /**
   * Second half of a start: waits for the prepare quorum, then either goes
   * LIVE and publishes 'bigBang' or reports 'simulationPrepareFailed'.
   * Does nothing if the simulation is no longer the one being prepared.
   *
   * @param {string} simulationUuid
   * @param {Promise<{ok: boolean, err?: string}>} readyWait
   */
  async continueStartSimulation(simulationUuid, readyWait) {
    try {
      const quorum = await readyWait
      if(!quorum.ok) {
        if(this.orchestrationState === MaestroState.PREPARING && this.simulationId === simulationUuid) {
          await this.failPrepare(simulationUuid, quorum.err)
        }
        if(this.debug) {
          console.warn(`[Maestro] Prepare failed simulationId=${simulationUuid}: ${quorum.err}`)
        }
        return
      }
      if(this.orchestrationState !== MaestroState.PREPARING || this.simulationId !== simulationUuid) {
        return
      }
      this.bigBangEpoch = performance.now()
      this.resumeEpoch = null
      this.pausedAt = null
      this.orchestrationState = MaestroState.LIVE
      await this.publishLifecycle('bigBang', {
        simulationId: this.simulationId,
        agentIds: this.agentIds,
      })
      if(this.debug) console.log(`[Maestro] LIVE simulationId=${this.simulationId}`)
    } catch(err) {
      console.error(`[Maestro] continueStartSimulation error simulationId=${simulationUuid}:`, err)
      if(this.simulationId === simulationUuid && this.orchestrationState === MaestroState.PREPARING) {
        await this.failPrepare(simulationUuid, err.message ?? 'Prepare failed')
      }
    }
  }

  /**
   * Reports a failed prepare to the owners, clears the arena and returns to
   * IDLE. Each cleanup step is best-effort so that a failing step can never
   * prevent the reset.
   *
   * @param {string} simulationUuid
   * @param {string} errMsg - Reason forwarded in the event payload.
   */
  async failPrepare(simulationUuid, errMsg) {
    try {
      await this.publishSystemLifecycle('simulationPrepareFailed', {
        simulationId: simulationUuid,
        err: errMsg,
      }, MaestroState.IDLE)
    } catch(err) {
      console.error(`[Maestro] Could not publish simulationPrepareFailed simulationId=${simulationUuid}:`, err)
    }
    try {
      await this.arenaGroom?.clearArena()
    } catch(err) {
      console.error(`[Maestro] Could not clear arena simulationId=${simulationUuid}:`, err)
    }
    // The awaits above may have let a stop/start through; only reset what is
    // still the failed prepare.
    if(this.orchestrationState === MaestroState.PREPARING && this.simulationId === simulationUuid) {
      this.resetOrchestration()
    }
  }

  /**
   * Resumes the paused simulation and restarts the clock from `pausedAt`.
   *
   * @param {string} userUuid
   * @param {string} simulationUuid
   * @returns {Promise<object>} `{ ok: true, resumed: true, ... }` or `{ ok: false, err }`.
   */
  async resumeSimulation(userUuid, simulationUuid) {
    if(!this.simRepo) return({ ok: false, err: 'Database not initialized' })
    if(!this.isPaused()) return({ ok: false, err: 'Simulation is not paused' })
    if(this.simulationId !== simulationUuid) {
      return({ ok: false, err: 'Another simulation is paused' })
    }
    const access = await this.simRepo.validateSimulationAccess(userUuid, simulationUuid)
    if(!access.ok) return(access)
    // The state is still PAUSED at this point, so the target state is passed
    // explicitly; otherwise owners would be told the simulation is paused.
    await this.publishLifecycle('simulationResumed', {
      simulationId: simulationUuid,
      agentIds: [...this.agentIds],
      t: this.pausedAt,
    }, MaestroState.LIVE)
    this.resumeEpoch = performance.now()
    this.orchestrationState = MaestroState.LIVE
    if(this.debug) console.log(`[Maestro] RESUMED at t=${this.pausedAt}, simulationId=${simulationUuid}`)
    return({
      ok: true,
      simulationId: this.simulationId,
      keyframeId: this.keyframeId,
      infraId: this.infraId,
      agentIds: [...this.agentIds],
      resumed: true,
      state: MaestroState.LIVE,
    })
  }

  /**
   * Pauses the running (or preparing) simulation. Pausing an already paused
   * simulation succeeds and returns the stored pause time.
   *
   * @param {string} userUuid
   * @param {{simulationUuid?: string}} payload
   * @returns {Promise<object>} `{ ok: true, simulationId, t }` or `{ ok: false, err }`.
   */
  async pauseSimulation(userUuid, payload) {
    if(!this.simRepo) return({ ok: false, err: 'Database not initialized' })
    const simulationUuid = payload?.simulationUuid
    const access = await this.simRepo.validateSimulationAccess(userUuid, simulationUuid)
    if(!access.ok) return(access)
    if(this.simulationId !== simulationUuid) {
      if(this.simulationId) return({ ok: false, err: 'Another simulation is active' })
      return({ ok: false, err: 'Simulation is not running' })
    }
    if(this.orchestrationState === MaestroState.PAUSED) {
      return({ ok: true, simulationId: simulationUuid, t: this.pausedAt })
    }
    if(this.orchestrationState !== MaestroState.LIVE && this.orchestrationState !== MaestroState.PREPARING) {
      return({ ok: false, err: 'Simulation is not running' })
    }
    // NOTE(review): pausing while PREPARING makes continueStartSimulation()
    // return without publishing 'bigBang' once the quorum settles — confirm
    // this is the intended behavior.
    const t = this.orchestrationState === MaestroState.LIVE ? this.simNow() : 0
    // The state is still LIVE/PREPARING at this point, so the target state
    // is passed explicitly; otherwise owners would receive the old state.
    await this.publishLifecycle('simulationPaused', {
      simulationId: simulationUuid,
      agentIds: [...this.agentIds],
      t,
    }, MaestroState.PAUSED)
    this.pausedAt = t
    this.orchestrationState = MaestroState.PAUSED
    if(this.debug) console.log(`[Maestro] PAUSED at t=${t}, simulationId=${simulationUuid}`)
    return({ ok: true, simulationId: simulationUuid, t })
  }

  /**
   * Stops the given simulation (owner only), clears the arena and returns
   * to IDLE. Also succeeds when nothing is running.
   *
   * @param {string} userUuid
   * @param {{simulationUuid?: string}} payload
   * @returns {Promise<object>} `{ ok: true, simulationId }` or `{ ok: false, err }`.
   */
  async stopSimulation(userUuid, payload) {
    if(!this.simRepo) return({ ok: false, err: 'Database not initialized' })
    if(!this.arenaGroom) return({ ok: false, err: 'No arena Redis connection' })
    const simulationUuid = payload?.simulationUuid
    const access = await this.simRepo.validateSimulationOwner(userUuid, simulationUuid)
    if(!access.ok) return(access)
    if(this.simulationId && this.simulationId !== simulationUuid) {
      return({ ok: false, err: 'Another simulation is active' })
    }
    if(this.simulationId === simulationUuid) {
      let t = null
      if(this.orchestrationState === MaestroState.LIVE) {
        t = this.simNow()
      } else if(this.orchestrationState === MaestroState.PREPARING) {
        t = 0
      } else if(this.orchestrationState === MaestroState.PAUSED) {
        t = this.pausedAt
      }
      await this.publishLifecycle('simulationStopped', {
        simulationId: simulationUuid,
        agentIds: [...this.agentIds],
        t,
      }, MaestroState.IDLE)
    }
    if(this.simulationId === simulationUuid && this.orchestrationState === MaestroState.PREPARING) {
      this.prepareQuorum?.abortPending({ ok: false, err: 'Simulation stopped' })
    }
    await this.arenaGroom.clearArena()
    this.resetOrchestration()
    if(this.debug) console.log(`[Maestro] Stopped simulationId=${simulationUuid}`)
    return({ ok: true, simulationId: simulationUuid })
  }

  /**
   * @param {string} simulationId
   * @returns {*} The current state when `simulationId` is the orchestrated
   *   simulation, IDLE otherwise.
   */
  orchestrationStateFor(simulationId) {
    if(this.simulationId !== simulationId) return(MaestroState.IDLE)
    return(this.orchestrationState)
  }

  /**
   * Lists the simulations owned by the user together with their state.
   *
   * @param {string} userUuid
   * @returns {Promise<object>} `{ ok: true, simulations: [{ simulationId, state }] }`
   *   or the repository's failure result.
   */
  async getSimulationsStatus(userUuid) {
    if(!this.simRepo) return({ ok: false, err: 'Database not initialized' })
    const listResult = await this.simRepo.listOwnerSimulations(userUuid)
    if(!listResult.ok) return(listResult)
    const simulations = listResult.simulationIds.map(simulationId => ({
      simulationId,
      state: this.orchestrationStateFor(simulationId),
    }))
    return({ ok: true, simulations })
  }

  /** Reloads access rights through the config helper and refreshes the checker. */
  async reloadAccessRights() {
    await this.configHelper.refreshAccessRights()
    this.rootConfig.accessRights = this.configHelper.config.accessRights
    this.accessRights.refreshAccessRights(this.rootConfig)
  }

  /** @returns {*} The `accessRights` section of the root config. */
  getAccessRights() {
    return(this.rootConfig.accessRights)
  }
}