diff --git a/GPS/gpsServer.js b/GPS/gpsServer.js index d0d070a..b4539d5 100644 --- a/GPS/gpsServer.js +++ b/GPS/gpsServer.js @@ -28,6 +28,8 @@ export class gpsServer { this.state = SimState.IDLE this.simulationId = null this.bigBangEpoch = null + this.resumeEpoch = null + this.pausedAt = null this.ignoredChangeCount = 0 } @@ -62,17 +64,29 @@ export class gpsServer { return(this.state === SimState.LIVE) } + isPaused() { + return(this.state === SimState.PAUSED) + } + + canQuerySim() { + return(this.isLive() || this.isPaused()) + } + isPrepare() { return(this.state === SimState.PREPARE) } simNow() { if(this.bigBangEpoch === null) return(null) + if(this.resumeEpoch !== null) { + return(this.pausedAt + (performance.now() - this.resumeEpoch) / 1000) + } return((performance.now() - this.bigBangEpoch) / 1000) } now() { if(this.isLive()) return(this.simNow()) + if(this.isPaused()) return(this.pausedAt) if(this.isPrepare()) return(0) return(null) } @@ -124,7 +138,7 @@ export class gpsServer { } getAgentPosition(agentId, at = null) { - if(!this.isLive()) return(null) + if(!this.canQuerySim()) return(null) const agent = this.agents.get(agentId) if(!agent) return(null) const t = at ?? this.now() @@ -154,7 +168,7 @@ export class gpsServer { } getAgentsInPrism(prism, at = null) { - if(!this.isLive()) return([]) + if(!this.canQuerySim()) return([]) const t = at ?? this.now() if(t === null) return([]) const agents = [] @@ -172,6 +186,8 @@ export class gpsServer { this.registry = new CollisionRegistry() this.simulationId = null this.bigBangEpoch = null + this.resumeEpoch = null + this.pausedAt = null this.ignoredChangeCount = 0 this.state = SimState.IDLE await this.agentStore?.clearAll() @@ -269,6 +285,8 @@ export class gpsServer { } this.bigBangEpoch = performance.now() + this.resumeEpoch = null + this.pausedAt = null this.state = SimState.LIVE for(const agent of this.agents.values()) { @@ -278,6 +296,46 @@ export class gpsServer { if(this.debug) console.log(`[GPS] LIVE: bigBangEpoch set, simulationId=${this.simulationId}`) } + onSimulationPaused(payload = {}) { + if(this.state === SimState.IDLE || this.state === SimState.PAUSED) return + if(payload.simulationId && this.simulationId && payload.simulationId !== this.simulationId) { + console.error('[GPS] simulationPaused rejected: simulationId mismatch') + return + } + + const t = (typeof(payload.t) === 'number' && !Number.isNaN(payload.t)) + ? payload.t + : this.now() + this.pausedAt = t + this.state = SimState.PAUSED + if(this.debug) console.log(`[GPS] PAUSED at t=${t}, simulationId=${this.simulationId}`) + } + + async onSimulationStopped(payload = {}) { + if(this.state === SimState.IDLE) return + if(payload.simulationId && this.simulationId && payload.simulationId !== this.simulationId) { + console.error('[GPS] simulationStopped rejected: simulationId mismatch') + return + } + if(this.debug) console.log(`[GPS] STOPPED (reset), simulationId=${this.simulationId}`) + await this.resetSimulation() + } + + onSimulationResumed(payload = {}) { + if(this.state !== SimState.PAUSED) { + console.error(`[GPS] simulationResumed rejected: expected PAUSED, got ${this.state}`) + return + } + if(payload.simulationId && this.simulationId && payload.simulationId !== this.simulationId) { + console.error('[GPS] simulationResumed rejected: simulationId mismatch') + return + } + + this.resumeEpoch = performance.now() + this.state = SimState.LIVE + if(this.debug) console.log(`[GPS] RESUMED at t=${this.pausedAt}, simulationId=${this.simulationId}`) + } + upsertAgent(agentId, newVector, newPosition = null) { const now = this.now() let agent = this.agents.get(agentId) diff --git a/GPS/handlers/arena/lifecycle.js b/GPS/handlers/arena/lifecycle.js index 7cfada0..2a40885 100644 --- a/GPS/handlers/arena/lifecycle.js +++ b/GPS/handlers/arena/lifecycle.js @@ -19,5 +19,18 @@ export const eventHandlers = { bigBang(msg, chan) { this.gpsSrv?.onBigBang(msg.payload ?? {}) }, + simulationPaused(msg, chan) { + this.gpsSrv?.onSimulationPaused(msg.payload ?? {}) + }, + simulationResumed(msg, chan) { + this.gpsSrv?.onSimulationResumed(msg.payload ?? {}) + }, + simulationStopped(msg, chan) { + const srv = this.gpsSrv + if(!srv) return + srv.onSimulationStopped(msg.payload ?? {}).catch(err => { + console.error(`[${this.redisId}] simulationStopped failed:`, err) + }) + }, }, } diff --git a/GPS/simulationState.js b/GPS/simulationState.js index 35b630a..dbe1925 100644 --- a/GPS/simulationState.js +++ b/GPS/simulationState.js @@ -2,6 +2,7 @@ export const SimState = { IDLE: 'idle', PREPARE: 'prepare', LIVE: 'live', + PAUSED: 'paused', } export function validateAgentSets(expected, found) { diff --git a/Maestro/handlers/system/simulation.js b/Maestro/handlers/system/simulation.js index 64e24af..99057dd 100644 --- a/Maestro/handlers/system/simulation.js +++ b/Maestro/handlers/system/simulation.js @@ -30,6 +30,37 @@ export const actions = { keyframeId: result.keyframeId, infraId: result.infraId, agentIds: result.agentIds, + resumed: result.resumed ?? false, + state: result.state ?? null, + }, + }) + }, + + async action_PAUSESIMULATION(action, payload, reqid, sender, roles) { + if(!isValidUuid(sender)) { + replyToAction(this, { action, reqid, sender, success: false, err: 'Missing or invalid sender (user UUID)' }) + return + } + + if(!payload?.simulationUuid) { + replyToAction(this, { action, reqid, sender, success: false, err: 'Missing simulationUuid' }) + return + } + + const result = await this.maestroSrv.pauseSimulation(sender, payload) + if(!result.ok) { + replyToAction(this, { action, reqid, sender, success: false, err: result.err }) + return + } + + replyToAction(this, { + action, + reqid, + sender, + success: true, + payload: { + simulationId: result.simulationId, + t: result.t, }, }) }, @@ -60,4 +91,25 @@ export const actions = { }) }, + async action_GETSIMULATIONSSTATUS(action, payload, reqid, sender, roles) { + if(!isValidUuid(sender)) { + replyToAction(this, { action, reqid, sender, success: false, err: 'Missing or invalid sender (user UUID)' }) + return + } + + const result = await this.maestroSrv.getSimulationsStatus(sender) + if(!result.ok) { + replyToAction(this, { action, reqid, sender, success: false, err: result.err }) + return + } + + replyToAction(this, { + action, + reqid, + sender, + success: true, + payload: result.simulations, + }) + }, + } diff --git a/Maestro/maestroServer.js b/Maestro/maestroServer.js index daed119..7e74e1a 100644 --- a/Maestro/maestroServer.js +++ b/Maestro/maestroServer.js @@ -24,6 +24,11 @@ export class maestroServer { this.orchestrationState = MaestroState.IDLE this.simulationId = null this.agentIds = [] + this.keyframeId = null + this.infraId = null + this.bigBangEpoch = null + this.resumeEpoch = null + this.pausedAt = null } getMaestroSettings() { @@ -54,6 +59,18 @@ export class maestroServer { return(this.orchestrationState === MaestroState.LIVE) } + isPaused() { + return(this.orchestrationState === MaestroState.PAUSED) + } + + simNow() { + if(this.bigBangEpoch === null) return(null) + if(this.resumeEpoch !== null) { + return(this.pausedAt + (performance.now() - this.resumeEpoch) / 1000) + } + return((performance.now() - this.bigBangEpoch) / 1000) + } + async init() { const mysqlCfg = this.maestroConfig.mysql if(!mysqlCfg) { @@ -111,6 +128,11 @@ export class maestroServer { this.orchestrationState = MaestroState.IDLE this.simulationId = null this.agentIds = [] + this.keyframeId = null + this.infraId = null + this.bigBangEpoch = null + this.resumeEpoch = null + this.pausedAt = null this.prepareQuorum?.cancel() } @@ -120,24 +142,58 @@ export class maestroServer { return(this.prepareQuorum.handleMessage(msg, chan)) } - async publishLifecycle(eventType, payload) { + async publishLifecycle(eventType, payload, state = null) { if(!this.arenaCnx) throw(new Error('No arena Redis connection')) const { arenaChannel, senderId } = this.getMaestroSettings().lifecycle + const resolvedState = state ?? this.orchestrationStateFor(payload?.simulationId ?? this.simulationId) await this.arenaCnx.redisPublish(arenaChannel, { eventType, sender: senderId, payload, }) + await this.publishSystemLifecycle(eventType, payload, resolvedState) if(this.debug) console.log(`[Maestro] Published ${eventType} simulationId=${payload.simulationId}`) } + async publishSystemLifecycle(eventType, payload, state) { + if(!this.systemCnx || !this.simRepo) return + + const simulationId = payload?.simulationId + if(!simulationId) return + + const ownersResult = await this.simRepo.listSimulationOwnerUuids(simulationId) + if(!ownersResult.ok || !ownersResult.ownerUuids.length) return + + const maestro = this.maestroConfig.maestro ?? {} + const channelTemplate = maestro.systemLifecycleChannel ?? 'system:maestro:lifecycle:[UID]' + const senderId = maestro.senderId ?? 'maestro' + const msg = { + eventType, + sender: senderId, + payload: { ...payload, state }, + } + + for(const uid of ownersResult.ownerUuids) { + const chan = channelTemplate.replace(/\[UID\]/g, uid) + await this.systemCnx.redisPublish(chan, msg) + } + if(this.debug) { + console.log(`[Maestro] Published system ${eventType} state=${state} simulationId=${simulationId}`) + } + } + async startSimulation(userUuid, payload) { if(!this.simRepo) return({ ok: false, err: 'Database not initialized' }) if(!this.arenaGroom) return({ ok: false, err: 'No arena Redis connection' }) + + const simulationUuid = payload?.simulationUuid + if(this.isPaused()) { + return(this.resumeSimulation(userUuid, simulationUuid)) + } + if(!this.prepareQuorum) return({ ok: false, err: 'No prepare quorum (arena Redis not wired)' }) if(!this.isIdle()) return({ ok: false, err: 'A simulation is already in progress' }) - const simulationUuid = payload?.simulationUuid const infraId = payload?.infraId ?? null const access = await this.simRepo.validateSimulationAccess(userUuid, simulationUuid) @@ -152,6 +208,8 @@ export class maestroServer { this.simulationId = simulationUuid this.agentIds = agentsResult.agents.map(a => a.id) + this.keyframeId = keyframeId + this.infraId = infraId this.orchestrationState = MaestroState.PREPARING const lifecyclePayload = { @@ -166,30 +224,131 @@ export class maestroServer { await this.publishLifecycle('onYourMarks', lifecyclePayload) - const quorum = await readyWait - if(!quorum.ok) { - await this.arenaGroom.clearArena() - this.resetOrchestration() - return(quorum) - } - - await this.publishLifecycle('bigBang', { - simulationId: this.simulationId, - agentIds: this.agentIds, + this.continueStartSimulation(simulationUuid, readyWait).catch(err => { + console.error('[Maestro] continueStartSimulation failed:', err) }) - this.orchestrationState = MaestroState.LIVE - if(this.debug) console.log(`[Maestro] LIVE simulationId=${this.simulationId}`) - return({ ok: true, simulationId: this.simulationId, keyframeId, infraId, agentIds: this.agentIds, + resumed: false, + state: MaestroState.PREPARING, }) } + async continueStartSimulation(simulationUuid, readyWait) { + try { + const quorum = await readyWait + if(!quorum.ok) { + if(this.orchestrationState === MaestroState.PREPARING && this.simulationId === simulationUuid) { + await this.publishSystemLifecycle('simulationPrepareFailed', { + simulationId: simulationUuid, + err: quorum.err, + }, MaestroState.IDLE) + await this.arenaGroom.clearArena() + this.resetOrchestration() + } + if(this.debug) { + console.warn(`[Maestro] Prepare failed simulationId=${simulationUuid}: ${quorum.err}`) + } + return + } + + if(this.orchestrationState !== MaestroState.PREPARING || this.simulationId !== simulationUuid) { + return + } + + this.bigBangEpoch = performance.now() + this.resumeEpoch = null + this.pausedAt = null + this.orchestrationState = MaestroState.LIVE + + await this.publishLifecycle('bigBang', { + simulationId: this.simulationId, + agentIds: this.agentIds, + }) + + if(this.debug) console.log(`[Maestro] LIVE simulationId=${this.simulationId}`) + } catch(err) { + console.error(`[Maestro] continueStartSimulation error simulationId=${simulationUuid}:`, err) + if(this.simulationId === simulationUuid && this.orchestrationState === MaestroState.PREPARING) { + await this.publishSystemLifecycle('simulationPrepareFailed', { + simulationId: simulationUuid, + err: err.message ?? 'Prepare failed', + }, MaestroState.IDLE) + await this.arenaGroom?.clearArena() + this.resetOrchestration() + } + } + } + + async resumeSimulation(userUuid, simulationUuid) { + if(this.simulationId !== simulationUuid) { + return({ ok: false, err: 'Another simulation is paused' }) + } + + const access = await this.simRepo.validateSimulationAccess(userUuid, simulationUuid) + if(!access.ok) return(access) + + await this.publishLifecycle('simulationResumed', { + simulationId: simulationUuid, + agentIds: [...this.agentIds], + t: this.pausedAt, + }) + + this.resumeEpoch = performance.now() + this.orchestrationState = MaestroState.LIVE + if(this.debug) console.log(`[Maestro] RESUMED at t=${this.pausedAt}, simulationId=${simulationUuid}`) + + return({ + ok: true, + simulationId: this.simulationId, + keyframeId: this.keyframeId, + infraId: this.infraId, + agentIds: [...this.agentIds], + resumed: true, + state: MaestroState.LIVE, + }) + } + + async pauseSimulation(userUuid, payload) { + if(!this.simRepo) return({ ok: false, err: 'Database not initialized' }) + + const simulationUuid = payload?.simulationUuid + const access = await this.simRepo.validateSimulationAccess(userUuid, simulationUuid) + if(!access.ok) return(access) + + if(this.simulationId !== simulationUuid) { + if(this.simulationId) return({ ok: false, err: 'Another simulation is active' }) + return({ ok: false, err: 'Simulation is not running' }) + } + + if(this.orchestrationState === MaestroState.PAUSED) { + return({ ok: true, simulationId: simulationUuid, t: this.pausedAt }) + } + + if(this.orchestrationState !== MaestroState.LIVE && this.orchestrationState !== MaestroState.PREPARING) { + return({ ok: false, err: 'Simulation is not running' }) + } + + const t = this.orchestrationState === MaestroState.LIVE ? this.simNow() : 0 + + await this.publishLifecycle('simulationPaused', { + simulationId: simulationUuid, + agentIds: [...this.agentIds], + t, + }) + + this.pausedAt = t + this.orchestrationState = MaestroState.PAUSED + if(this.debug) console.log(`[Maestro] PAUSED at t=${t}, simulationId=${simulationUuid}`) + + return({ ok: true, simulationId: simulationUuid, t }) + } + async stopSimulation(userUuid, payload) { if(!this.simRepo) return({ ok: false, err: 'Database not initialized' }) if(!this.arenaGroom) return({ ok: false, err: 'No arena Redis connection' }) @@ -202,6 +361,26 @@ export class maestroServer { return({ ok: false, err: 'Another simulation is active' }) } + if(this.simulationId === simulationUuid) { + let t = null + if(this.orchestrationState === MaestroState.LIVE) { + t = this.simNow() + } else if(this.orchestrationState === MaestroState.PREPARING) { + t = 0 + } else if(this.orchestrationState === MaestroState.PAUSED) { + t = this.pausedAt + } + await this.publishLifecycle('simulationStopped', { + simulationId: simulationUuid, + agentIds: [...this.agentIds], + t, + }, MaestroState.IDLE) + } + + if(this.simulationId === simulationUuid && this.orchestrationState === MaestroState.PREPARING) { + this.prepareQuorum?.abortPending({ ok: false, err: 'Simulation stopped' }) + } + await this.arenaGroom.clearArena() this.resetOrchestration() @@ -210,6 +389,25 @@ export class maestroServer { return({ ok: true, simulationId: simulationUuid }) } + orchestrationStateFor(simulationId) { + if(this.simulationId !== simulationId) return(MaestroState.IDLE) + return(this.orchestrationState) + } + + async getSimulationsStatus(userUuid) { + if(!this.simRepo) return({ ok: false, err: 'Database not initialized' }) + + const listResult = await this.simRepo.listOwnerSimulations(userUuid) + if(!listResult.ok) return(listResult) + + const simulations = listResult.simulationIds.map(simulationId => ({ + simulationId, + state: this.orchestrationStateFor(simulationId), + })) + + return({ ok: true, simulations }) + } + async reloadAccessRights() { await this.configHelper.refreshAccessRights() this.maestroConfig.accessRights = this.configHelper.config.accessRights diff --git a/Maestro/orchestrationState.js b/Maestro/orchestrationState.js index 25db8dd..c48b60e 100644 --- a/Maestro/orchestrationState.js +++ b/Maestro/orchestrationState.js @@ -2,4 +2,5 @@ export const MaestroState = { IDLE: 'idle', PREPARING: 'preparing', LIVE: 'live', + PAUSED: 'paused', } diff --git a/Maestro/prepareQuorum.js b/Maestro/prepareQuorum.js index c693100..f08251b 100644 --- a/Maestro/prepareQuorum.js +++ b/Maestro/prepareQuorum.js @@ -83,6 +83,18 @@ export class PrepareQuorum { } cancel() { + this.#cleanup() + } + + abortPending(result) { + const resolve = this.resolve + this.#cleanup() + if(typeof(resolve) !== 'function') return(false) + resolve(result) + return(true) + } + + #cleanup() { if(this.timer) { clearTimeout(this.timer) this.timer = null @@ -96,7 +108,7 @@ export class PrepareQuorum { #finish(result) { const resolve = this.resolve - this.cancel() + this.#cleanup() if(typeof(resolve) === 'function') resolve(result) } diff --git a/Maestro/simRepository.js b/Maestro/simRepository.js index 5e3992b..e9af162 100644 --- a/Maestro/simRepository.js +++ b/Maestro/simRepository.js @@ -58,14 +58,14 @@ export class SimRepository { if(!isValidUuid(simulationUuid)) return({ ok: false, err: 'Invalid simulation UUID' }) const rows = await MySQLClient.poolExecute(this.db, ` - SELECT s.sim_id, - BIN_TO_UUID(s.sim_uuid) AS sim_uuid, - BIN_TO_UUID(s.sim_root_kf_uuid) AS sim_root_kf_uuid - FROM ${this.#qualify(this.simDb, 'simulations')} s - INNER JOIN ${this.#qualify(this.guiDb, 'simowners')} o ON o.own_sim_uuid = s.sim_uuid - INNER JOIN ${this.#qualify(this.guiDb, 'users')} u ON o.own_usr_id = u.usr_id - WHERE u.usr_uuid = ? - AND s.sim_uuid = UUID_TO_BIN(?) + SELECT sim_id, + BIN_TO_UUID(sim_uuid) AS sim_uuid, + BIN_TO_UUID(sim_root_kf_uuid) AS sim_root_kf_uuid + FROM ${this.#qualify(this.simDb, 'simulations')} + INNER JOIN ${this.#qualify(this.guiDb, 'simowners')} ON own_sim_uuid = sim_uuid + INNER JOIN ${this.#qualify(this.guiDb, 'users')} ON own_usr_id = usr_id + WHERE usr_uuid = ? + AND sim_uuid = UUID_TO_BIN(?) `, [userUuid, simulationUuid]) if(!rows.length) return({ ok: false, err: 'Simulation not found or access denied' }) @@ -88,20 +88,54 @@ export class SimRepository { if(!isValidUuid(simulationUuid)) return({ ok: false, err: 'Invalid simulation UUID' }) const rows = await MySQLClient.poolExecute(this.db, ` - SELECT s.sim_id, - BIN_TO_UUID(s.sim_uuid) AS sim_uuid, - BIN_TO_UUID(s.sim_root_kf_uuid) AS sim_root_kf_uuid - FROM ${this.#qualify(this.simDb, 'simulations')} s - INNER JOIN ${this.#qualify(this.guiDb, 'simowners')} o ON o.own_sim_uuid = s.sim_uuid - INNER JOIN ${this.#qualify(this.guiDb, 'users')} u ON o.own_usr_id = u.usr_id - WHERE u.usr_uuid = ? - AND s.sim_uuid = UUID_TO_BIN(?) + SELECT sim_id, + BIN_TO_UUID(sim_uuid) AS sim_uuid, + BIN_TO_UUID(sim_root_kf_uuid) AS sim_root_kf_uuid + FROM ${this.#qualify(this.simDb, 'simulations')} + INNER JOIN ${this.#qualify(this.guiDb, 'simowners')} ON own_sim_uuid = sim_uuid + INNER JOIN ${this.#qualify(this.guiDb, 'users')} ON own_usr_id = usr_id + WHERE usr_uuid = ? + AND sim_uuid = UUID_TO_BIN(?) `, [userUuid, simulationUuid]) if(!rows.length) return({ ok: false, err: 'Simulation not found or access denied' }) return({ ok: true, sim: rows[0] }) } + async listSimulationOwnerUuids(simulationUuid) { + if(!isValidUuid(simulationUuid)) return({ ok: false, err: 'Invalid simulation UUID' }) + + const rows = await MySQLClient.poolExecute(this.db, ` + SELECT BIN_TO_UUID(usr_uuid) AS owner_uuid + FROM ${this.#qualify(this.guiDb, 'simowners')} + INNER JOIN ${this.#qualify(this.guiDb, 'users')} ON own_usr_id = usr_id + WHERE own_sim_uuid = UUID_TO_BIN(?) + `, [simulationUuid]) + + return({ + ok: true, + ownerUuids: rows.map(row => row.owner_uuid), + }) + } + + async listOwnerSimulations(userUuid) { + if(!isValidUuid(userUuid)) return({ ok: false, err: 'Invalid user UUID' }) + + const rows = await MySQLClient.poolExecute(this.db, ` + SELECT BIN_TO_UUID(sim_uuid) AS simulation_id + FROM ${this.#qualify(this.simDb, 'simulations')} + INNER JOIN ${this.#qualify(this.guiDb, 'simowners')} ON own_sim_uuid = sim_uuid + INNER JOIN ${this.#qualify(this.guiDb, 'users')} ON own_usr_id = usr_id + WHERE usr_uuid = ? + ORDER BY sim_id + `, [userUuid]) + + return({ + ok: true, + simulationIds: rows.map(row => row.simulation_id), + }) + } + async loadKeyframeAgents(keyframeId) { const rows = await MySQLClient.poolExecute(this.db, ` SELECT BIN_TO_UUID(ekfs_agent_id) AS agent_id, diff --git a/Observer/handlers/arena/lifecycle.js b/Observer/handlers/arena/lifecycle.js index 6bc3d14..76da949 100644 --- a/Observer/handlers/arena/lifecycle.js +++ b/Observer/handlers/arena/lifecycle.js @@ -7,5 +7,8 @@ export const eventHandlers = { bigBang(msg, chan) { this.observerSrv?.onBigBang() }, + simulationStopped(msg, chan) { + this.observerSrv?.onSimulationStopped(msg.payload ?? {}) + }, }, } diff --git a/Observer/handlers/system/positions.js b/Observer/handlers/system/positions.js index 226cb4f..67cfd69 100644 --- a/Observer/handlers/system/positions.js +++ b/Observer/handlers/system/positions.js @@ -107,7 +107,6 @@ export const actions = { success: true, payload: { frequency: result.frequency, - agents: result.agents, t: result.t, }, }) diff --git a/Observer/observerServer.js b/Observer/observerServer.js index 7371284..5bba4cb 100644 --- a/Observer/observerServer.js +++ b/Observer/observerServer.js @@ -1,7 +1,6 @@ import { AccesRights } from '../accesRights.js' import { GpsStorageReader } from './gpsStorageReader.js' import { RequestorRegistry } from './requestorRegistry.js' -import { replyToAction } from '../bus/publishActionReply.js' import { SimState } from '../GPS/simulationState.js' export class observerServer { @@ -26,6 +25,8 @@ export class observerServer { return({ senderId: observer.senderId ?? 'observer', scanIntervalMs: observer.scanIntervalMs ?? 300, + frustumEventsChannel: observer.observerFrustumEventsChannel + ?? 'system:observer:subscribed[UID]:agents', lifecycle: { arenaChannel: observer.lifecycle?.arenaChannel ?? 'arena:lifecycle', godsReadyChannel: observer.lifecycle?.godsReadyChannel ?? 'arena:gods:ready', @@ -53,20 +54,44 @@ export class observerServer { this.gpsStorageReader, () => this.now(), scanIntervalMs, - (sender, payload) => this.publishFrustumUpdate(sender, payload), + (subscriberId, payload) => this.publishFrustumAgentEvents( + subscriberId, + payload.agents, + payload.t + ), this.debug ) } - publishFrustumUpdate(sender, payload) { - if(!this.systemCnx || !sender) return - const observer = this.observerConfig.observer ?? {} - replyToAction(this.systemCnx, { - action: 'GETAGENTSINFRUSTUM', - sender, - success: true, - payload, - }) + async publishFrustumAgentEvents(subscriberId, agents, t) { + if(!this.systemCnx || !subscriberId) return + if(!Array.isArray(agents) || !agents.length) return + + const { frustumEventsChannel } = this.getObserverSettings() + const chan = frustumEventsChannel.replace(/\[UID\]/g, subscriberId) + const senderId = this.getObserverSettings().senderId + + for(const agent of agents) { + if(!agent?.id || !agent?.position) continue + + await this.systemCnx.redisPublish(chan, { + eventType: 'move', + sender: senderId, + payload: { + aid: agent.id, + coords: { + x: agent.position.x, + y: agent.position.y, + z: agent.position.z, + }, + t, + }, + }) + } + + if(this.debug) { + console.log(`[Observer] Frustum events: ${agents.length} agent(s) on ${chan} at t=${t}`) + } } isLive() { @@ -94,6 +119,13 @@ export class observerServer { this.state = SimState.LIVE } + onSimulationStopped(payload = {}) { + this.requestorRegistry?.clear() + this.state = SimState.IDLE + this.bigBangEpoch = null + if(this.debug) console.log(`[Observer] simulationStopped at t=${payload.t}`) + } + wireSystemConnexion(cnx) { cnx.observerSrv = this cnx.accessRights = this.accessRights diff --git a/Observer/requestorRegistry.js b/Observer/requestorRegistry.js index 807dd00..08f3923 100644 --- a/Observer/requestorRegistry.js +++ b/Observer/requestorRegistry.js @@ -56,11 +56,11 @@ export class RequestorRegistry { updatedAt: Date.now(), }) this.#ensureTick() + this.#pushAgentEvents(this.requestors.get(id)) return({ ok: true, frequency: frequencyMs, - agents: matching, t, }) } @@ -130,9 +130,9 @@ export class RequestorRegistry { return(matching) } - #pushUpdate(requestor) { + #pushAgentEvents(requestor) { if(typeof(this.onPush) !== 'function') return - this.onPush(requestor.id, { + void this.onPush(requestor.id, { agents: [...requestor.agents], t: requestor.t, }) @@ -160,7 +160,7 @@ export class RequestorRegistry { requestor.tickCounter++ if(requestor.tickCounter >= requestor.pushEveryNTicks) { requestor.tickCounter = 0 - this.#pushUpdate(requestor) + this.#pushAgentEvents(requestor) } } if(this.debug) console.log(`[Observer] Scanned ${agents.size} agent(s) for ${this.requestors.size} requestor(s)`) diff --git a/config.json b/config.json index a4780e4..69bfdaf 100644 --- a/config.json +++ b/config.json @@ -20,7 +20,9 @@ { "canDo": [ "STARTSIMULATION", - "STOPSIMULATION" + "PAUSESIMULATION", + "STOPSIMULATION", + "GETSIMULATIONSSTATUS" ], "roles": "*" } @@ -59,6 +61,7 @@ "arenaChannel": "arena:lifecycle", "godsReadyChannel": "arena:gods:ready" }, + "systemLifecycleChannel": "system:maestro:lifecycle:[UID]", "readyTimeoutMs": 30000 }, "mysql": { @@ -70,6 +73,7 @@ "primordialDaemon": false, "observerActionsChannel": "system:requests:observer", "observerActionsReply": "system:replies:[UID]", + "observerFrustumEventsChannel": "system:observer:subscribed[UID]:agents", "senderId": "observer", "scanIntervalMs": 300, "lifecycle": { diff --git a/configSchema.json b/configSchema.json index 4a0e50b..8e3a24f 100644 --- a/configSchema.json +++ b/configSchema.json @@ -157,6 +157,7 @@ "primordialDaemon": { "type": "boolean" }, "observerActionsChannel": { "type": "string" }, "observerActionsReply": { "type": "string" }, + "observerFrustumEventsChannel": { "type": "string" }, "senderId": { "type": "string" }, "scanIntervalMs": { "type": "integer", "minimum": 50 }, "lifecycle": { diff --git a/config_test.json b/config_test.json index 0e5f444..46a5d16 100644 --- a/config_test.json +++ b/config_test.json @@ -20,7 +20,9 @@ { "canDo": [ "STARTSIMULATION", - "STOPSIMULATION" + "PAUSESIMULATION", + "STOPSIMULATION", + "GETSIMULATIONSSTATUS" ], "roles": "*" } @@ -59,6 +61,7 @@ "arenaChannel": "arena:lifecycle", "godsReadyChannel": "arena:gods:ready" }, + "systemLifecycleChannel": "system:maestro:lifecycle:[UID]", "readyTimeoutMs": 30000 }, "mysql": { @@ -70,6 +73,7 @@ "primordialDaemon": false, "observerActionsChannel": "system:requests:observer", "observerActionsReply": "system:replies:[UID]", + "observerFrustumEventsChannel": "system:observer:subscribed[UID]:agents", "senderId": "observer", "scanIntervalMs": 300, "lifecycle": { diff --git a/tests/modules/maestro1.js b/tests/modules/maestro1.js index 9424b00..52b6a41 100644 --- a/tests/modules/maestro1.js +++ b/tests/modules/maestro1.js @@ -40,14 +40,14 @@ function vectorsEqual(a, b) { async function findSimulationFixture(ctx) { const { guiDatabase, simDatabase } = ctx.databases const rows = await MySQLClient.poolExecute(ctx.db, ` - SELECT u.usr_uuid AS user_uuid, - BIN_TO_UUID(s.sim_uuid) AS simulation_uuid - FROM \`${guiDatabase}\`.users u - INNER JOIN \`${guiDatabase}\`.simowners o ON o.own_usr_id = u.usr_id - INNER JOIN \`${simDatabase}\`.simulations s ON o.own_sim_uuid = s.sim_uuid - INNER JOIN \`${simDatabase}\`.edited_kf_store ekfs ON ekfs.ekfs_ekf_uuid = s.sim_root_kf_uuid - GROUP BY u.usr_uuid, s.sim_uuid - HAVING COUNT(ekfs.ekfs_agent_id) > 0 + SELECT usr_uuid AS user_uuid, + BIN_TO_UUID(sim_uuid) AS simulation_uuid + FROM \`${guiDatabase}\`.users + INNER JOIN \`${guiDatabase}\`.simowners ON own_usr_id = usr_id + INNER JOIN \`${simDatabase}\`.simulations ON own_sim_uuid = sim_uuid + INNER JOIN \`${simDatabase}\`.edited_kf_store ON ekfs_ekf_uuid = sim_root_kf_uuid + GROUP BY usr_uuid, sim_uuid + HAVING COUNT(ekfs_agent_id) > 0 LIMIT 1 `)