From 44a84c64ecc264567fefd6d7c0e26b3b1b854ba7 Mon Sep 17 00:00:00 2001 From: STEINNI Date: Sat, 20 Jun 2026 18:50:26 +0000 Subject: [PATCH] General Actions to handlers Refacto --- GPS/actions/arena/agentMotion.js | 27 +++ GPS/actions/arena/arenaHandlers.js | 70 ------ GPS/actions/arena/dispatch.js | 5 - GPS/actions/arena/index.js | 25 +- GPS/actions/arena/lifecycle.js | 23 ++ GPS/actions/system/dispatch.js | 28 --- GPS/actions/system/index.js | 24 +- GPS/actions/system/utilities.js | 104 ++------- GPS/actionsHelper.js | 22 -- GPS/p42Gps.js | 4 + Maestro/actions/arena/index.js | 17 ++ Maestro/actions/arena/prepare.js | 9 + Maestro/actions/system/index.js | 17 ++ Maestro/actions/system/simulation.js | 63 +++++ Maestro/actions/system/utilities.js | 20 ++ {SimMaestro => Maestro}/arenaGroom.js | 5 + {SimMaestro => Maestro}/maestroServer.js | 95 +++----- {SimMaestro => Maestro}/orchestrationState.js | 0 .../p42SimMaestro.js => Maestro/p42Maestro.js | 6 +- {SimMaestro => Maestro}/package.json | 2 +- Maestro/prepareQuorum.js | 103 +++++++++ Maestro/primordialDaemons.js | 23 ++ {SimMaestro => Maestro}/simRepository.js | 40 +++- {SimMaestro => Maestro}/startMaestro.sh | 2 +- {SimMaestro => Maestro}/stopMaestro.sh | 2 +- Observer/actions/arena/arenaHandlers.js | 32 --- Observer/actions/arena/dispatch.js | 5 - Observer/actions/arena/index.js | 23 +- Observer/actions/arena/lifecycle.js | 11 + Observer/actions/system/dispatch.js | 28 --- Observer/actions/system/index.js | 25 +- Observer/actions/system/positions.js | 218 +++--------------- Observer/actions/system/utilities.js | 38 +-- Observer/actionsHelper.js | 16 -- Observer/observerServer.js | 11 +- Observer/p42Observer.js | 4 + SimMaestro/actions/arena/arenaHandlers.js | 22 -- SimMaestro/actions/arena/dispatch.js | 5 - SimMaestro/actions/arena/index.js | 12 - SimMaestro/actions/system/dispatch.js | 28 --- SimMaestro/actions/system/index.js | 14 -- SimMaestro/actions/system/simulation.js | 138 ----------- SimMaestro/actions/system/utilities.js | 48 ---- SimMaestro/actionsHelper.js | 16 -- bus/assembleMesh.js | 40 ++++ bus/dispatchActions.js | 78 +++++++ bus/dispatchEvents.js | 33 +++ bus/publishActionReply.js | 69 ++++++ config.json | 3 +- configSchema.json | 6 +- package.json | 1 + redisConnexion.js | 14 +- tests/commands.txt | 4 + tests/modules/maestro1.js | 82 ++++--- tests/test.js | 43 +++- tests/uuid.js | 2 + 56 files changed, 832 insertions(+), 973 deletions(-) create mode 100644 GPS/actions/arena/agentMotion.js delete mode 100644 GPS/actions/arena/arenaHandlers.js delete mode 100644 GPS/actions/arena/dispatch.js create mode 100644 GPS/actions/arena/lifecycle.js delete mode 100644 GPS/actions/system/dispatch.js delete mode 100644 GPS/actionsHelper.js create mode 100644 Maestro/actions/arena/index.js create mode 100644 Maestro/actions/arena/prepare.js create mode 100644 Maestro/actions/system/index.js create mode 100644 Maestro/actions/system/simulation.js create mode 100644 Maestro/actions/system/utilities.js rename {SimMaestro => Maestro}/arenaGroom.js (79%) rename {SimMaestro => Maestro}/maestroServer.js (73%) rename {SimMaestro => Maestro}/orchestrationState.js (100%) rename SimMaestro/p42SimMaestro.js => Maestro/p42Maestro.js (92%) rename {SimMaestro => Maestro}/package.json (85%) create mode 100644 Maestro/prepareQuorum.js create mode 100644 Maestro/primordialDaemons.js rename {SimMaestro => Maestro}/simRepository.js (77%) rename {SimMaestro => Maestro}/startMaestro.sh (96%) rename {SimMaestro => Maestro}/stopMaestro.sh (53%) delete mode 100644 Observer/actions/arena/arenaHandlers.js delete mode 100644 Observer/actions/arena/dispatch.js create mode 100644 Observer/actions/arena/lifecycle.js delete mode 100644 Observer/actions/system/dispatch.js delete mode 100644 SimMaestro/actions/arena/arenaHandlers.js delete mode 100644 SimMaestro/actions/arena/dispatch.js delete mode 100644 SimMaestro/actions/arena/index.js delete mode 100644 SimMaestro/actions/system/dispatch.js delete mode 100644 SimMaestro/actions/system/index.js delete mode 100644 SimMaestro/actions/system/simulation.js delete mode 100644 SimMaestro/actions/system/utilities.js delete mode 100644 SimMaestro/actionsHelper.js create mode 100644 bus/assembleMesh.js create mode 100644 bus/dispatchActions.js create mode 100644 bus/dispatchEvents.js create mode 100644 bus/publishActionReply.js create mode 100644 tests/commands.txt create mode 100644 tests/uuid.js diff --git a/GPS/actions/arena/agentMotion.js b/GPS/actions/arena/agentMotion.js new file mode 100644 index 0000000..8815851 --- /dev/null +++ b/GPS/actions/arena/agentMotion.js @@ -0,0 +1,27 @@ + +export const eventHandlers = { + 'arena:agents:*': { + change(msg, chan) { + const agentId = msg.sender + if(!agentId || typeof(agentId) !== 'string') { + console.warn(`[${this.redisId}] Agent event without sender`) + return + } + const newVector = msg.payload?.newVector + if(!newVector || typeof(newVector.x) !== 'number' || typeof(newVector.y) !== 'number' || typeof(newVector.z) !== 'number') { + console.warn(`[${this.redisId}] Invalid newVector from ${agentId}`) + return + } + const newPosition = msg.payload?.newPosition ?? null + this.gpsSrv?.onVectorChange(agentId, newVector, newPosition) + }, + remove(msg, chan) { + const agentId = msg.sender + if(!agentId || typeof(agentId) !== 'string') { + console.warn(`[${this.redisId}] Agent event without sender`) + return + } + this.gpsSrv?.onAgentRemove(agentId) + }, + }, +} diff --git a/GPS/actions/arena/arenaHandlers.js b/GPS/actions/arena/arenaHandlers.js deleted file mode 100644 index c336aed..0000000 --- a/GPS/actions/arena/arenaHandlers.js +++ /dev/null @@ -1,70 +0,0 @@ - -export const construct = (redisCnx) => { - const tickMs = redisCnx.gpsSrv?.getGpsSettings().collisionTickMs ?? 100 - // Interval always runs; tickArena no-ops until LIVE (see gpsServer.tickArena) - setInterval(() => { - redisCnx.gpsSrv?.tickArena() - }, tickMs) -} - -export const methods = { - - handleLifecycleEvent(msg) { - const srv = this.gpsSrv - if(!srv) return - - if(msg.eventType === 'onYourMarks') { - srv.onYourMarks(msg.payload ?? {}).catch(err => { - console.error(`[${this.redisId}] onYourMarks failed:`, err) - srv.publishReadyToStart({ success: false, err: err.message ?? 'onYourMarks failed' }) - }) - return - } - - if(msg.eventType === 'bigBang') { - srv.onBigBang(msg.payload ?? {}) - return - } - }, - - handleAgentEvent(msg) { - const agentId = msg.sender - if(!agentId || typeof(agentId) !== 'string') { - console.warn(`[${this.redisId}] Agent event without sender`) - return - } - - if(msg.eventType === 'change') { - const newVector = msg.payload?.newVector - if(!newVector || typeof(newVector.x) !== 'number' || typeof(newVector.y) !== 'number' || typeof(newVector.z) !== 'number') { - console.warn(`[${this.redisId}] Invalid newVector from ${agentId}`) - return - } - const newPosition = msg.payload?.newPosition ?? null - this.gpsSrv.onVectorChange(agentId, newVector, newPosition) - return - } - - if(msg.eventType === 'remove') { - this.gpsSrv.onAgentRemove(agentId) - return - } - }, - - dispatchArenaMessage(msg, chan) { - const gps = this.config.gps - if(!gps || !this.gpsSrv) return(false) - - if(this.matchesChan(chan, gps.lifecycle?.arenaChannel ?? 'arena:lifecycle')) { - this.handleLifecycleEvent(msg) - return(true) - } - - if(this.matchesChan(chan, gps.agentVectorChangeChannel)) { - this.handleAgentEvent(msg) - return(true) - } - return(false) - }, - -} diff --git a/GPS/actions/arena/dispatch.js b/GPS/actions/arena/dispatch.js deleted file mode 100644 index 243fcef..0000000 --- a/GPS/actions/arena/dispatch.js +++ /dev/null @@ -1,5 +0,0 @@ - -export function dispatchMessage(redisCnx, msg, chan) { - if(!redisCnx.config.gps || typeof(redisCnx.dispatchArenaMessage) !== 'function') return - redisCnx.dispatchArenaMessage(msg, chan) -} diff --git a/GPS/actions/arena/index.js b/GPS/actions/arena/index.js index 607159a..5bbf1ad 100644 --- a/GPS/actions/arena/index.js +++ b/GPS/actions/arena/index.js @@ -1,11 +1,18 @@ -import { methods as arenaMethods, construct as arenaConstruct } from './arenaHandlers.js' -import { dispatchMessage } from './dispatch.js' +import { assembleHandlers, createDispatchMessage } from '../../../bus/assembleMesh.js' +import * as lifecycle from './lifecycle.js' +import * as agentMotion from './agentMotion.js' -export const afterLoginMethods = [ - arenaConstruct, -] +const { actionHandlers, eventHandlers, afterLogin } = assembleHandlers([lifecycle, agentMotion]) -export const meshActions = { - ...arenaMethods, -} -export { dispatchMessage } +export { actionHandlers, afterLogin } + +export const dispatchMessage = createDispatchMessage({ + eventHandlers, + actionRules(redisCnx) { + const gps = redisCnx.config.gps ?? {} + const arenaChannel = gps.bus?.arena?.actionsChannel + return({ + channels: arenaChannel ? [arenaChannel] : [], + }) + }, +}) diff --git a/GPS/actions/arena/lifecycle.js b/GPS/actions/arena/lifecycle.js new file mode 100644 index 0000000..7cfada0 --- /dev/null +++ b/GPS/actions/arena/lifecycle.js @@ -0,0 +1,23 @@ + +export function construct(redisCnx) { + const tickMs = redisCnx.gpsSrv?.getGpsSettings().collisionTickMs ?? 100 + setInterval(() => { + redisCnx.gpsSrv?.tickArena() + }, tickMs) +} + +export const eventHandlers = { + 'arena:lifecycle': { + onYourMarks(msg, chan) { + const srv = this.gpsSrv + if(!srv) return + srv.onYourMarks(msg.payload ?? {}).catch(err => { + console.error(`[${this.redisId}] onYourMarks failed:`, err) + srv.publishReadyToStart({ success: false, err: err.message ?? 'onYourMarks failed' }) + }) + }, + bigBang(msg, chan) { + this.gpsSrv?.onBigBang(msg.payload ?? {}) + }, + }, +} diff --git a/GPS/actions/system/dispatch.js b/GPS/actions/system/dispatch.js deleted file mode 100644 index 969acf9..0000000 --- a/GPS/actions/system/dispatch.js +++ /dev/null @@ -1,28 +0,0 @@ - -export function dispatchMessage(redisCnx, msg, chan) { - const gps = redisCnx.config.gps - if(!gps?.gpsActionsChannel) return - - const actionsChan = redisCnx.fullChan(gps.gpsActionsChannel) - if(chan != actionsChan) return - - const action = msg.action - if(!action || typeof(action) !== 'string') { - console.warn(`[${redisCnx.redisId}] Ignoring message without action on ${chan}`) - return - } - - const handler = redisCnx['action_'+action] - if(typeof(handler) != 'function') { - if(redisCnx.debug) console.warn(`[${redisCnx.redisId}] Unknown action ${action} on ${chan}`) - return - } - - const payload = ('payload' in msg) ? msg.payload : null - const reqid = ('reqid' in msg) ? msg.reqid.substr(0, 50) : null - const sender = msg.sender || null - const roles = Array.isArray(msg.roles) ? msg.roles : ['*'] - - if(redisCnx.debug) console.log(`[${redisCnx.redisId}] Dispatching action ${action} from ${sender}`) - handler.call(redisCnx, action, payload, reqid, sender, roles) -} diff --git a/GPS/actions/system/index.js b/GPS/actions/system/index.js index f968bb8..0b793b8 100644 --- a/GPS/actions/system/index.js +++ b/GPS/actions/system/index.js @@ -1,10 +1,16 @@ -import { methods as utilities, construct as utilitiesConstruct } from './utilities.js' -import { dispatchMessage } from './dispatch.js' +import { assembleHandlers, createDispatchMessage } from '../../../bus/assembleMesh.js' +import * as utilities from './utilities.js' -export const afterLoginMethods = [ - utilitiesConstruct, -] -export const meshActions = { - ...utilities, -} -export { dispatchMessage } +const { actionHandlers, eventHandlers, afterLogin } = assembleHandlers([utilities]) + +export { actionHandlers, afterLogin } + +export const dispatchMessage = createDispatchMessage({ + eventHandlers, + actionRules(redisCnx) { + const gps = redisCnx.config.gps ?? {} + return({ + channels: [gps.gpsActionsChannel].filter(Boolean), + }) + }, +}) diff --git a/GPS/actions/system/utilities.js b/GPS/actions/system/utilities.js index 07bf1b8..4350e33 100644 --- a/GPS/actions/system/utilities.js +++ b/GPS/actions/system/utilities.js @@ -1,109 +1,33 @@ -import { publishActionReply } from '../../actionsHelper.js' +import { replyToAction } from '../../../bus/publishActionReply.js' -export const construct = (redisCnx) => { - // console.log('Hello after login from utilities...') - // redisCnx.v42=0 - // setInterval(redisCnx.move4243.bind(redisCnx), 200) -} +export const actions = { -export const methods = { - - /* Event-Rx: - { - "action": "TIME" - "reqid": "6az5e4r6a" - } - Event-Tx: - { - "action": "TIME", - "success": true, - "payload" : { - gpsTime: "2022-09-01T14:42:22.603Z", - redisTime: "2022-09-01T14:42:22.603Z" - }, - "reqid": "6az5e4r6a" - } - */ - async action_TIME(action, payload, reqid, sender, roles){ - publishActionReply(this, { + async action_TIME(action, payload, reqid, sender, roles) { + replyToAction(this, { action, reqid, sender, - replyChannel: this.config.gps.gpsActionsReply, - reply: { - success: true, - payload: { - gpsTime: new Date().toISOString(), - redisTime: await this.redisClient.time(), - }, + success: true, + payload: { + gpsTime: new Date().toISOString(), + redisTime: await this.redisClient.time(), }, }) }, - - /* Event-Rx: - { - "action": "RELOADCONFIG" - "reqid": "6az5e4r6a" - } - Event-Tx: - { - "action": "RELOADCONFIG", - "success": true, - "reqid": "6az5e4r6a" - } - */ - async action_RELOADCONFIG(action, payload, reqid, sender, roles){ - const replyOpts = { - action, - reqid, - sender, - replyChannel: this.config.gps.gpsActionsReply, - } - if(!this.accessRights.canDo(roles, action)) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Unauthorized action !', - } }) - return - } + async action_RELOADCONFIG(action, payload, reqid, sender, roles) { this.reloadAccessRights() - publishActionReply(this, { ...replyOpts, reply: { - success: true, - } }) + replyToAction(this, { action, reqid, sender, success: true }) }, - - /* Event-Rx: - { - "action": "GETCONFIG" - "reqid": "6az5e4r6a" - } - Event-Tx: - { - "action": "GETCONFIG", - "success": true, - "reqid": "6az5e4r6a", - payload: { ...the access rights, and roles... } - } - */ - async action_GETCONFIG(action, payload, reqid, sender, roles){ - const replyOpts = { + + async action_GETCONFIG(action, payload, reqid, sender, roles) { + replyToAction(this, { action, reqid, sender, - replyChannel: this.config.gps.gpsActionsReply, - } - if(!this.accessRights.canDo(roles, action)) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Unauthorized action !', - } }) - return - } - publishActionReply(this, { ...replyOpts, reply: { success: true, payload: this.getAccessRights(), - } }) + }) }, } diff --git a/GPS/actionsHelper.js b/GPS/actionsHelper.js deleted file mode 100644 index a867894..0000000 --- a/GPS/actionsHelper.js +++ /dev/null @@ -1,22 +0,0 @@ - -export function publishActionReply(redisCnx, options) { - const { - action, - reqid, - sender, - reply, - replyChannel, - senderId = 'gps', - } = options - reply.action = action - reply.sender = senderId - if(reqid) reply.reqid = reqid - const chan = replyChannel.replace(/\[UID\]/g, sender) - redisCnx.redisPublish(chan, reply) -} - -export function parseSimTime(payload, fallbackFn) { - if(payload?.t != null && typeof(payload.t) === 'number' && !Number.isNaN(payload.t)) return(payload.t) - if(payload?.at != null && typeof(payload.at) === 'number' && !Number.isNaN(payload.at)) return(payload.at) - return(fallbackFn()) -} diff --git a/GPS/p42Gps.js b/GPS/p42Gps.js index 838e9d8..c6e1920 100644 --- a/GPS/p42Gps.js +++ b/GPS/p42Gps.js @@ -3,6 +3,7 @@ import yargs from 'yargs/yargs' import { hideBin } from 'yargs/helpers' import 'node:process' import {RedisConnexion} from '../redisConnexion.js' +import { busReplyRoute } from '../bus/publishActionReply.js' import {configHelper} from '../configHelper.js' import {gpsServer} from './gpsServer.js' import * as systemMesh from './actions/system/index.js' @@ -53,6 +54,7 @@ let cfgh = new configHelper({ function meshRedisConns(mesh, meshName, debug, rootConfig) { const { redis, ...meshConfig } = mesh + const busRoute = busReplyRoute(rootConfig.gps, meshName) return redis.map(cfg => new RedisConnexion({ debug, @@ -60,6 +62,8 @@ function meshRedisConns(mesh, meshName, debug, rootConfig) { redisId: cfg.redisId, meshName, meshModule: meshModules[meshName], + senderId: busRoute?.senderId, + actionsReply: busRoute?.actionsReply, }) ) } diff --git a/Maestro/actions/arena/index.js b/Maestro/actions/arena/index.js new file mode 100644 index 0000000..77e6406 --- /dev/null +++ b/Maestro/actions/arena/index.js @@ -0,0 +1,17 @@ +import { assembleHandlers, createDispatchMessage } from '../../../bus/assembleMesh.js' +import * as prepare from './prepare.js' + +const { actionHandlers, eventHandlers, afterLogin } = assembleHandlers([prepare]) + +export { actionHandlers, afterLogin } + +export const dispatchMessage = createDispatchMessage({ + eventHandlers, + actionRules(redisCnx) { + const maestro = redisCnx.config.maestro ?? {} + const arenaChannel = maestro.bus?.arena?.actionsChannel + return({ + channels: arenaChannel ? [arenaChannel] : [], + }) + }, +}) diff --git a/Maestro/actions/arena/prepare.js b/Maestro/actions/arena/prepare.js new file mode 100644 index 0000000..45aad95 --- /dev/null +++ b/Maestro/actions/arena/prepare.js @@ -0,0 +1,9 @@ + +export const eventHandlers = { + 'arena:gods:ready': { + readyToStart(msg, chan) { + if(!this.maestroSrv) return + this.maestroSrv.handlePrepareAck(msg, chan) + }, + }, +} diff --git a/Maestro/actions/system/index.js b/Maestro/actions/system/index.js new file mode 100644 index 0000000..e308779 --- /dev/null +++ b/Maestro/actions/system/index.js @@ -0,0 +1,17 @@ +import { assembleHandlers, createDispatchMessage } from '../../../bus/assembleMesh.js' +import * as simulation from './simulation.js' +import * as utilities from './utilities.js' + +const { actionHandlers, eventHandlers, afterLogin } = assembleHandlers([simulation, utilities]) + +export { actionHandlers, afterLogin } + +export const dispatchMessage = createDispatchMessage({ + eventHandlers, + actionRules(redisCnx) { + const maestro = redisCnx.config.maestro ?? {} + return({ + channels: [maestro.maestroActionsChannel].filter(Boolean), + }) + }, +}) diff --git a/Maestro/actions/system/simulation.js b/Maestro/actions/system/simulation.js new file mode 100644 index 0000000..64e24af --- /dev/null +++ b/Maestro/actions/system/simulation.js @@ -0,0 +1,63 @@ +import { replyToAction } from '../../../bus/publishActionReply.js' +import { isValidUuid } from '../../simRepository.js' + +export const actions = { + + async action_STARTSIMULATION(action, payload, reqid, sender, roles) { + if(!isValidUuid(sender)) { + replyToAction(this, { action, reqid, sender, success: false, err: 'Missing or invalid sender (user UUID)' }) + return + } + + if(!payload?.simulationUuid) { + replyToAction(this, { action, reqid, sender, success: false, err: 'Missing simulationUuid' }) + return + } + + const result = await this.maestroSrv.startSimulation(sender, payload) + if(!result.ok) { + replyToAction(this, { action, reqid, sender, success: false, err: result.err }) + return + } + + replyToAction(this, { + action, + reqid, + sender, + success: true, + payload: { + simulationId: result.simulationId, + keyframeId: result.keyframeId, + infraId: result.infraId, + agentIds: result.agentIds, + }, + }) + }, + + async action_STOPSIMULATION(action, payload, reqid, sender, roles) { + if(!isValidUuid(sender)) { + replyToAction(this, { action, reqid, sender, success: false, err: 'Missing or invalid sender (user UUID)' }) + return + } + + if(!payload?.simulationUuid) { + replyToAction(this, { action, reqid, sender, success: false, err: 'Missing simulationUuid' }) + return + } + + const result = await this.maestroSrv.stopSimulation(sender, payload) + if(!result.ok) { + replyToAction(this, { action, reqid, sender, success: false, err: result.err }) + return + } + + replyToAction(this, { + action, + reqid, + sender, + success: true, + payload: { simulationId: result.simulationId }, + }) + }, + +} diff --git a/Maestro/actions/system/utilities.js b/Maestro/actions/system/utilities.js new file mode 100644 index 0000000..e51086c --- /dev/null +++ b/Maestro/actions/system/utilities.js @@ -0,0 +1,20 @@ +import { replyToAction } from '../../../bus/publishActionReply.js' + +export const actions = { + + async action_RELOADCONFIG(action, payload, reqid, sender, roles) { + this.reloadAccessRights() + replyToAction(this, { action, reqid, sender, success: true }) + }, + + async action_GETCONFIG(action, payload, reqid, sender, roles) { + replyToAction(this, { + action, + reqid, + sender, + success: true, + payload: this.getAccessRights(), + }) + }, + +} diff --git a/SimMaestro/arenaGroom.js b/Maestro/arenaGroom.js similarity index 79% rename from SimMaestro/arenaGroom.js rename to Maestro/arenaGroom.js index 4a69b35..531a0c9 100644 --- a/SimMaestro/arenaGroom.js +++ b/Maestro/arenaGroom.js @@ -1,3 +1,4 @@ +const RESERVED_HASH_FIELDS = new Set(['position', 'vector', 'speed', 'segment']) export class ArenaGroom { @@ -25,6 +26,10 @@ export class ArenaGroom { const key = this.agentHashKey(agent.id) await this.cnx.redisHset(key, 'position', agent.position) await this.cnx.redisHset(key, 'vector', agent.vector) + for(const [field, value] of Object.entries(agent.store ?? {})) { + if(RESERVED_HASH_FIELDS.has(field)) continue + await this.cnx.redisHset(key, field, value) + } await this.cnx.redisSadd(this.arenaStorage.agentsIndexKey, agent.id) } if(this.debug) console.log(`[Maestro] Groomed ${agents.length} agent(s) into arena store`) diff --git a/SimMaestro/maestroServer.js b/Maestro/maestroServer.js similarity index 73% rename from SimMaestro/maestroServer.js rename to Maestro/maestroServer.js index e4b3488..9f9a28a 100644 --- a/SimMaestro/maestroServer.js +++ b/Maestro/maestroServer.js @@ -3,6 +3,8 @@ import { MySQLClient } from '@p42/p42modules' import { SimRepository } from './simRepository.js' import { ArenaGroom } from './arenaGroom.js' import { MaestroState } from './orchestrationState.js' +import { PrepareQuorum } from './prepareQuorum.js' +import { buildPrepareQuorum } from './primordialDaemons.js' export class maestroServer { @@ -18,12 +20,10 @@ export class maestroServer { this.db = null this.simRepo = null this.arenaGroom = null + this.prepareQuorum = null this.orchestrationState = MaestroState.IDLE this.simulationId = null this.agentIds = [] - this.readyGods = new Map() - this.readyQuorumResolve = null - this.readyQuorumTimer = null } getMaestroSettings() { @@ -32,9 +32,8 @@ export class maestroServer { senderId: maestro.senderId ?? 'maestro', lifecycle: { arenaChannel: maestro.lifecycle?.arenaChannel ?? 'arena:lifecycle', - godsReadyChannel: maestro.lifecycle?.godsReadyChannel ?? 'arena:gods:ready', + prepareAckChannel: maestro.lifecycle?.godsReadyChannel ?? 'arena:gods:ready', }, - expectedGods: maestro.expectedGods ?? ['gps'], readyTimeoutMs: maestro.readyTimeoutMs ?? 30000, }) } @@ -77,6 +76,17 @@ export class maestroServer { } } + refreshPrepareQuorum() { + if(!this.arenaCnx) return + const { prepareAckChannel, readyTimeoutMs } = this.getMaestroSettings() + this.prepareQuorum = new PrepareQuorum({ + ackChannel: prepareAckChannel, + timeoutMs: readyTimeoutMs, + matchesChan: this.arenaCnx.matchesChan.bind(this.arenaCnx), + debug: this.debug, + }) + } + wireSystemConnexion(cnx) { cnx.maestroSrv = this cnx.accessRights = this.accessRights @@ -93,6 +103,7 @@ export class maestroServer { if(!this.arenaCnx || cnx.redisConfig.role === 'primary') { this.arenaCnx = cnx this.refreshArenaGroom() + this.refreshPrepareQuorum() } } @@ -100,70 +111,17 @@ export class maestroServer { this.orchestrationState = MaestroState.IDLE this.simulationId = null this.agentIds = [] - this.readyGods.clear() - this.clearReadyQuorumWait() + this.prepareQuorum?.cancel() } - clearReadyQuorumWait() { - if(this.readyQuorumTimer) { - clearTimeout(this.readyQuorumTimer) - this.readyQuorumTimer = null - } - this.readyQuorumResolve = null - } - - completeReadyQuorum(result) { - const resolve = this.readyQuorumResolve - this.clearReadyQuorumWait() - if(typeof(resolve) === 'function') resolve(result) - } - - waitForReadyQuorum() { - const { readyTimeoutMs } = this.getMaestroSettings() - return(new Promise(resolve => { - this.readyQuorumResolve = resolve - this.readyQuorumTimer = setTimeout(() => { - this.completeReadyQuorum({ - ok: false, - err: `Timeout waiting for readyToStart (${readyTimeoutMs}ms)`, - }) - }, readyTimeoutMs) - })) - } - - onReadyToStart(msg) { - if(this.orchestrationState !== MaestroState.PREPARING) return - - const payload = msg.payload ?? {} - if(payload.simulationId !== this.simulationId) return - - const sender = msg.sender - const { expectedGods } = this.getMaestroSettings() - if(!expectedGods.includes(sender)) { - if(this.debug) console.warn(`[Maestro] Ignoring readyToStart from unexpected sender: ${sender}`) - return - } - - if(!payload.success) { - this.completeReadyQuorum({ - ok: false, - err: payload.err ?? `Participant ${sender} failed prepare`, - }) - return - } - - this.readyGods.set(sender, payload) - if(this.debug) console.log(`[Maestro] readyToStart from ${sender} (${this.readyGods.size}/${expectedGods.length})`) - - for(const god of expectedGods) { - if(!this.readyGods.has(god)) return - } - - this.completeReadyQuorum({ ok: true }) + handlePrepareAck(msg, chan) { + if(this.orchestrationState !== MaestroState.PREPARING) return(false) + if(!this.prepareQuorum) return(false) + return(this.prepareQuorum.handleMessage(msg, chan)) } async publishLifecycle(eventType, payload) { - if(!this.arenaCnx) throw new Error('No arena Redis connection') + if(!this.arenaCnx) throw(new Error('No arena Redis connection')) const { arenaChannel, senderId } = this.getMaestroSettings().lifecycle await this.arenaCnx.redisPublish(arenaChannel, { eventType, @@ -176,15 +134,16 @@ export class maestroServer { async startSimulation(userUuid, payload) { if(!this.simRepo) return({ ok: false, err: 'Database not initialized' }) if(!this.arenaGroom) return({ ok: false, err: 'No arena Redis connection' }) + if(!this.prepareQuorum) return({ ok: false, err: 'No prepare quorum (arena Redis not wired)' }) if(!this.isIdle()) return({ ok: false, err: 'A simulation is already in progress' }) const simulationUuid = payload?.simulationUuid - const keyframeId = payload?.keyframeId const infraId = payload?.infraId ?? null - const access = await this.simRepo.validateSimulationAccess(userUuid, simulationUuid, keyframeId) + const access = await this.simRepo.validateSimulationAccess(userUuid, simulationUuid) if(!access.ok) return(access) + const keyframeId = access.sim.sim_root_kf_uuid const agentsResult = await this.simRepo.loadKeyframeAgents(keyframeId) if(!agentsResult.ok) return(agentsResult) @@ -193,7 +152,6 @@ export class maestroServer { this.simulationId = simulationUuid this.agentIds = agentsResult.agents.map(a => a.id) - this.readyGods.clear() this.orchestrationState = MaestroState.PREPARING const lifecyclePayload = { @@ -203,7 +161,8 @@ export class maestroServer { infraId, } - const readyWait = this.waitForReadyQuorum() + const expectedParticipants = buildPrepareQuorum(this.agentIds, this.maestroConfig) + const readyWait = this.prepareQuorum.begin(expectedParticipants, this.simulationId) await this.publishLifecycle('onYourMarks', lifecyclePayload) diff --git a/SimMaestro/orchestrationState.js b/Maestro/orchestrationState.js similarity index 100% rename from SimMaestro/orchestrationState.js rename to Maestro/orchestrationState.js diff --git a/SimMaestro/p42SimMaestro.js b/Maestro/p42Maestro.js similarity index 92% rename from SimMaestro/p42SimMaestro.js rename to Maestro/p42Maestro.js index e6d196e..dec9d3b 100644 --- a/SimMaestro/p42SimMaestro.js +++ b/Maestro/p42Maestro.js @@ -3,6 +3,7 @@ import yargs from 'yargs/yargs' import { hideBin } from 'yargs/helpers' import 'node:process' import { RedisConnexion } from '../redisConnexion.js' +import { busReplyRoute } from '../bus/publishActionReply.js' import { configHelper } from '../configHelper.js' import { maestroServer } from './maestroServer.js' import * as systemMesh from './actions/system/index.js' @@ -25,7 +26,7 @@ console.log = (...args) => logWithTimestamp(originalLog, 'LOG', ...args) console.warn = (...args) => logWithTimestamp(originalWarn, 'WARN', ...args) console.error = (...args) => logWithTimestamp(originalError, 'ERROR', ...args) -const argv = yargs(hideBin(process.argv)).command('SimMaestro', 'Simulation orchestrator for P42', {}) +const argv = yargs(hideBin(process.argv)).command('Maestro', 'Simulation orchestrator for P42', {}) .options({ 'debug': { description: 'shows debug info', @@ -49,6 +50,7 @@ let cfgh = new configHelper({ function meshRedisConns(mesh, meshName, debug, rootConfig) { const { redis, ...meshConfig } = mesh + const busRoute = busReplyRoute(rootConfig.maestro, meshName) return redis.map(cfg => new RedisConnexion({ debug, @@ -56,6 +58,8 @@ function meshRedisConns(mesh, meshName, debug, rootConfig) { redisId: cfg.redisId, meshName, meshModule: meshModules[meshName], + senderId: busRoute?.senderId, + actionsReply: busRoute?.actionsReply, }) ) } diff --git a/SimMaestro/package.json b/Maestro/package.json similarity index 85% rename from SimMaestro/package.json rename to Maestro/package.json index 6f4566c..74f6786 100644 --- a/SimMaestro/package.json +++ b/Maestro/package.json @@ -1,5 +1,5 @@ { - "name": "p42SimMaestro", + "name": "p42Maestro", "version": "1.0.0", "description": "Simulation orchestrator God-daemon for P42", "type": "module", diff --git a/Maestro/prepareQuorum.js b/Maestro/prepareQuorum.js new file mode 100644 index 0000000..c693100 --- /dev/null +++ b/Maestro/prepareQuorum.js @@ -0,0 +1,103 @@ + +export class PrepareQuorum { + + constructor({ ackChannel, timeoutMs, matchesChan, debug = false }) { + this.ackChannel = ackChannel + this.timeoutMs = timeoutMs + this.matchesChan = matchesChan + this.debug = debug + this.expected = new Set() + this.ready = new Map() + this.simulationId = null + this.active = false + this.resolve = null + this.timer = null + } + + begin(expectedParticipantIds, simulationId) { + this.cancel() + this.expected = new Set(expectedParticipantIds) + this.ready.clear() + this.simulationId = simulationId + this.active = true + + if(this.debug) { + console.log( + `[Maestro] Prepare quorum armed: ${this.expected.size} participant(s) ` + + `(agents + primordial daemons), timeout ${this.timeoutMs}ms` + ) + } + + return(new Promise(resolve => { + this.resolve = resolve + this.timer = setTimeout(() => { + const missing = [...this.expected].filter(id => !this.ready.has(id)) + this.#finish({ + ok: false, + err: `Timeout waiting for readyToStart (${this.timeoutMs}ms); ` + + `missing: ${missing.join(', ') || 'unknown'}`, + }) + }, this.timeoutMs) + })) + } + + handleMessage(msg, chan) { + if(!this.active) return(false) + if(typeof(this.matchesChan) !== 'function') return(false) + if(!this.matchesChan(chan, this.ackChannel)) return(false) + if(msg?.eventType !== 'readyToStart') return(false) + + const payload = msg.payload ?? {} + if(payload.simulationId !== this.simulationId) return(false) + + const sender = msg.sender + if(!sender || !this.expected.has(sender)) { + if(this.debug) { + console.warn(`[Maestro] Ignoring readyToStart from unexpected participant: ${sender}`) + } + return(true) + } + + if(!payload.success) { + this.#finish({ + ok: false, + err: payload.err ?? `Participant ${sender} failed prepare`, + }) + return(true) + } + + this.ready.set(sender, payload) + if(this.debug) { + console.log( + `[Maestro] readyToStart from ${sender} ` + + `(${this.ready.size}/${this.expected.size})` + ) + } + + for(const participantId of this.expected) { + if(!this.ready.has(participantId)) return(true) + } + + this.#finish({ ok: true }) + return(true) + } + + cancel() { + if(this.timer) { + clearTimeout(this.timer) + this.timer = null + } + this.active = false + this.resolve = null + this.expected.clear() + this.ready.clear() + this.simulationId = null + } + + #finish(result) { + const resolve = this.resolve + this.cancel() + if(typeof(resolve) === 'function') resolve(result) + } + +} diff --git a/Maestro/primordialDaemons.js b/Maestro/primordialDaemons.js new file mode 100644 index 0000000..36f5f7f --- /dev/null +++ b/Maestro/primordialDaemons.js @@ -0,0 +1,23 @@ +const SKIP_PRIMORDIAL_SECTIONS = new Set([ + 'maestro', + 'mysql', + 'accessRights', + 'systemMesh', + 'arenaMesh', +]) + +export function getPrimordialDaemonIds(config = {}) { + const ids = [] + for(const [section, block] of Object.entries(config)) { + if(SKIP_PRIMORDIAL_SECTIONS.has(section)) continue + if(!block || typeof(block) !== 'object' || Array.isArray(block)) continue + if(!block.primordialDaemon) continue + ids.push(typeof(block.senderId) === 'string' ? block.senderId : section) + } + return(ids) +} + +export function buildPrepareQuorum(agentIds, config) { + const primordialIds = getPrimordialDaemonIds(config) + return([...agentIds, ...primordialIds]) +} diff --git a/SimMaestro/simRepository.js b/Maestro/simRepository.js similarity index 77% rename from SimMaestro/simRepository.js rename to Maestro/simRepository.js index f7b79d6..5e3992b 100644 --- a/SimMaestro/simRepository.js +++ b/Maestro/simRepository.js @@ -19,20 +19,27 @@ export class SimRepository { return(`\`${db}\`.${table}`) } - #parseGpsValues(raw) { + #parseJsonObject(raw) { let v = raw if(v == null) return(null) if(typeof(v) === 'string') { try { v = JSON.parse(v) } catch { return(null) } } - if(typeof(v) !== 'object') return(null) + if(typeof(v) !== 'object' || Array.isArray(v)) return(null) + return(v) + } + + #parseGpsValues(raw) { + const v = this.#parseJsonObject(raw) + if(!v) return(null) const position = v.position const speed = v.speed ?? v.vector if(!position || !speed) return(null) const axes = ['x', 'y', 'z'] for(const axis of axes) { - if(typeof(position[axis]) !== 'number' || typeof(speed[axis]) !== 'number') return(null) + if(typeof(position[axis]) !== 'number' || !Number.isFinite(position[axis])) return(null) + if(typeof(speed[axis]) !== 'number' || !Number.isFinite(speed[axis])) return(null) } return({ position: { x: position.x, y: position.y, z: position.z }, @@ -40,10 +47,15 @@ export class SimRepository { }) } - async validateSimulationAccess(userUuid, simulationUuid, keyframeId) { + #parseStoreValues(raw) { + const v = this.#parseJsonObject(raw) + if(v == null) return(null) + return({ ...v }) + } + + async validateSimulationAccess(userUuid, simulationUuid) { if(!isValidUuid(userUuid)) return({ ok: false, err: 'Invalid user UUID' }) if(!isValidUuid(simulationUuid)) return({ ok: false, err: 'Invalid simulation UUID' }) - if(!isValidUuid(keyframeId)) return({ ok: false, err: 'Invalid keyframe ID' }) const rows = await MySQLClient.poolExecute(this.db, ` SELECT s.sim_id, @@ -59,16 +71,14 @@ export class SimRepository { if(!rows.length) return({ ok: false, err: 'Simulation not found or access denied' }) const sim = rows[0] - if(sim.sim_root_kf_uuid !== keyframeId) { - return({ ok: false, err: 'Keyframe does not match simulation root keyframe' }) - } + if(!sim.sim_root_kf_uuid) return({ ok: false, err: 'Simulation has no root keyframe' }) const kfRows = await MySQLClient.poolExecute(this.db, ` SELECT ekf_uuid FROM ${this.#qualify(this.simDb, 'edited_keyframes')} WHERE ekf_uuid = UUID_TO_BIN(?) - `, [keyframeId]) - if(!kfRows.length) return({ ok: false, err: 'Keyframe not found' }) + `, [sim.sim_root_kf_uuid]) + if(!kfRows.length) return({ ok: false, err: 'Root keyframe not found' }) return({ ok: true, sim }) } @@ -94,7 +104,9 @@ export class SimRepository { async loadKeyframeAgents(keyframeId) { const rows = await MySQLClient.poolExecute(this.db, ` - SELECT BIN_TO_UUID(ekfs_agent_id) AS agent_id, ekfs_gps_values + SELECT BIN_TO_UUID(ekfs_agent_id) AS agent_id, + ekfs_gps_values, + ekfs_store_values FROM ${this.#qualify(this.simDb, 'edited_kf_store')} WHERE ekfs_ekf_uuid = UUID_TO_BIN(?) `, [keyframeId]) @@ -108,10 +120,16 @@ export class SimRepository { errors.push(`Invalid GPS values for agent ${row.agent_id}`) continue } + const store = this.#parseStoreValues(row.ekfs_store_values) + if(store == null) { + errors.push(`Invalid store values for agent ${row.agent_id}`) + continue + } agents.push({ id: row.agent_id, position: parsed.position, vector: parsed.vector, + store, }) } diff --git a/SimMaestro/startMaestro.sh b/Maestro/startMaestro.sh similarity index 96% rename from SimMaestro/startMaestro.sh rename to Maestro/startMaestro.sh index b35032c..9b52442 100755 --- a/SimMaestro/startMaestro.sh +++ b/Maestro/startMaestro.sh @@ -4,7 +4,7 @@ set -a . /etc/p42/secrets.env set +a -daemon=p42SimMaestro +daemon=p42Maestro logfile=maestro.log pid=$(pgrep -f "$daemon") diff --git a/SimMaestro/stopMaestro.sh b/Maestro/stopMaestro.sh similarity index 53% rename from SimMaestro/stopMaestro.sh rename to Maestro/stopMaestro.sh index d904890..65365a4 100755 --- a/SimMaestro/stopMaestro.sh +++ b/Maestro/stopMaestro.sh @@ -1,6 +1,6 @@ #!/bin/sh -pid=`ps -ef | grep p42SimMaestro.js |grep -v grep | awk '{print $2}'` +pid=`ps -ef | grep p42Maestro.js |grep -v grep | awk '{print $2}'` if [ -n "$pid" ] then echo "killing pid: $pid" diff --git a/Observer/actions/arena/arenaHandlers.js b/Observer/actions/arena/arenaHandlers.js deleted file mode 100644 index cd415bf..0000000 --- a/Observer/actions/arena/arenaHandlers.js +++ /dev/null @@ -1,32 +0,0 @@ - -export const construct = (redisCnx) => { -} - -export const methods = { - - handleLifecycleEvent(msg) { - const srv = this.observerSrv - if(!srv) return - - if(msg.eventType === 'onYourMarks') { - srv.onYourMarks() - return - } - - if(msg.eventType === 'bigBang') { - srv.onBigBang() - } - }, - - dispatchArenaMessage(msg, chan) { - const observer = this.config.observer - if(!observer || !this.observerSrv) return(false) - - if(this.matchesChan(chan, observer.lifecycle?.arenaChannel ?? 'arena:lifecycle')) { - this.handleLifecycleEvent(msg) - return(true) - } - return(false) - }, - -} diff --git a/Observer/actions/arena/dispatch.js b/Observer/actions/arena/dispatch.js deleted file mode 100644 index a70d6d8..0000000 --- a/Observer/actions/arena/dispatch.js +++ /dev/null @@ -1,5 +0,0 @@ - -export function dispatchMessage(redisCnx, msg, chan) { - if(typeof(redisCnx.dispatchArenaMessage) !== 'function') return - redisCnx.dispatchArenaMessage(msg, chan) -} diff --git a/Observer/actions/arena/index.js b/Observer/actions/arena/index.js index c0d66bc..89f4e2a 100644 --- a/Observer/actions/arena/index.js +++ b/Observer/actions/arena/index.js @@ -1,12 +1,17 @@ -import { methods as arenaMethods, construct as arenaConstruct } from './arenaHandlers.js' -import { dispatchMessage } from './dispatch.js' +import { assembleHandlers, createDispatchMessage } from '../../../bus/assembleMesh.js' +import * as lifecycle from './lifecycle.js' -export const afterLoginMethods = [ - arenaConstruct, -] +const { actionHandlers, eventHandlers, afterLogin } = assembleHandlers([lifecycle]) -export const meshActions = { - ...arenaMethods, -} +export { actionHandlers, afterLogin } -export { dispatchMessage } +export const dispatchMessage = createDispatchMessage({ + eventHandlers, + actionRules(redisCnx) { + const observer = redisCnx.config.observer ?? {} + const arenaChannel = observer.bus?.arena?.actionsChannel + return({ + channels: arenaChannel ? [arenaChannel] : [], + }) + }, +}) diff --git a/Observer/actions/arena/lifecycle.js b/Observer/actions/arena/lifecycle.js new file mode 100644 index 0000000..6bc3d14 --- /dev/null +++ b/Observer/actions/arena/lifecycle.js @@ -0,0 +1,11 @@ + +export const eventHandlers = { + 'arena:lifecycle': { + onYourMarks(msg, chan) { + this.observerSrv?.onYourMarks() + }, + bigBang(msg, chan) { + this.observerSrv?.onBigBang() + }, + }, +} diff --git a/Observer/actions/system/dispatch.js b/Observer/actions/system/dispatch.js deleted file mode 100644 index e5ca12b..0000000 --- a/Observer/actions/system/dispatch.js +++ /dev/null @@ -1,28 +0,0 @@ - -export function dispatchMessage(redisCnx, msg, chan) { - const observer = redisCnx.config.observer - if(!observer?.observerActionsChannel) return - - const actionsChan = redisCnx.fullChan(observer.observerActionsChannel) - if(chan != actionsChan) return - - const action = msg.action - if(!action || typeof(action) !== 'string') { - console.warn(`[${redisCnx.redisId}] Ignoring message without action on ${chan}`) - return - } - - const handler = redisCnx['action_'+action] - if(typeof(handler) != 'function') { - if(redisCnx.debug) console.warn(`[${redisCnx.redisId}] Unknown action ${action} on ${chan}`) - return - } - - const payload = ('payload' in msg) ? msg.payload : null - const reqid = ('reqid' in msg) ? msg.reqid.substr(0, 50) : null - const sender = msg.sender || null - const roles = Array.isArray(msg.roles) ? msg.roles : ['*'] - - if(redisCnx.debug) console.log(`[${redisCnx.redisId}] Dispatching action ${action} from ${sender}`) - handler.call(redisCnx, action, payload, reqid, sender, roles) -} diff --git a/Observer/actions/system/index.js b/Observer/actions/system/index.js index 3f95450..63cbbce 100644 --- a/Observer/actions/system/index.js +++ b/Observer/actions/system/index.js @@ -1,14 +1,17 @@ -import { methods as utilities, construct as utilitiesConstruct } from './utilities.js' -import { methods as positions } from './positions.js' -import { dispatchMessage } from './dispatch.js' +import { assembleHandlers, createDispatchMessage } from '../../../bus/assembleMesh.js' +import * as positions from './positions.js' +import * as utilities from './utilities.js' -export const afterLoginMethods = [ - utilitiesConstruct, -] +const { actionHandlers, eventHandlers, afterLogin } = assembleHandlers([positions, utilities]) -export const meshActions = { - ...utilities, - ...positions, -} +export { actionHandlers, afterLogin } -export { dispatchMessage } +export const dispatchMessage = createDispatchMessage({ + eventHandlers, + actionRules(redisCnx) { + const observer = redisCnx.config.observer ?? {} + return({ + channels: [observer.observerActionsChannel].filter(Boolean), + }) + }, +}) diff --git a/Observer/actions/system/positions.js b/Observer/actions/system/positions.js index 223f820..226cb4f 100644 --- a/Observer/actions/system/positions.js +++ b/Observer/actions/system/positions.js @@ -1,257 +1,93 @@ -import { publishActionReply, parseSimTime } from '../../actionsHelper.js' +import { replyToAction } from '../../../bus/publishActionReply.js' +import { parseSimTime } from '../../actionsHelper.js' import { Frustum } from '../../frustum.js' -export const methods = { +export const actions = { - /* Event-Rx: - { - "action": "GETAGENTPOSITION", - "reqid": "6az5e4r6a", - "payload": { - "agentId": "agent42", - "t": 12.5 - } - } - Event-Tx: - { - "action": "GETAGENTPOSITION", - "success": true, - "reqid": "6az5e4r6a", - "payload": { - "agent": { - "id": "agent42", - "position": { "x": 1, "y": 2, "z": 3 }, - "vector": { "x": 0, "y": 0, "z": 0 }, - "since": 0, - "generation": 2, - "t": 12.5 - } - } - } - */ async action_GETAGENTPOSITION(action, payload, reqid, sender, roles) { - const replyOpts = { - action, - reqid, - sender, - replyChannel: this.config.observer.observerActionsReply, - } - if(!this.accessRights.canDo(roles, action)) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Unauthorized action !', - } }) - return - } - const reader = this.observerSrv.gpsStorageReader if(!reader) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'GPS storage reader not ready', - } }) + replyToAction(this, { action, reqid, sender, success: false, err: 'GPS storage reader not ready' }) return } if(!this.observerSrv.isLive()) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Simulation not live', - } }) + replyToAction(this, { action, reqid, sender, success: false, err: 'Simulation not live' }) return } const agentId = payload?.agentId if(!agentId || typeof(agentId) !== 'string') { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Missing or invalid agentId', - } }) + replyToAction(this, { action, reqid, sender, success: false, err: 'Missing or invalid agentId' }) return } const at = parseSimTime(payload, () => this.observerSrv.now()) if(at === null) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Invalid simulation time', - } }) + replyToAction(this, { action, reqid, sender, success: false, err: 'Invalid simulation time' }) return } const agent = await reader.getAgentPosition(agentId, at) if(!agent) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: `Unknown agent: ${agentId}`, - } }) + replyToAction(this, { action, reqid, sender, success: false, err: `Unknown agent: ${agentId}` }) return } - publishActionReply(this, { ...replyOpts, reply: { - success: true, - payload: { agent }, - } }) + replyToAction(this, { action, reqid, sender, success: true, payload: { agent } }) }, - /* Event-Rx: - { - "action": "GETAGENTSINFRUSTUM", - "reqid": "6az5e4r6a", - "payload": { - "planes": [ - { "nx": 1, "ny": 0, "nz": 0, "d": -10 }, - { "nx": -1, "ny": 0, "nz": 0, "d": 10 }, - { "nx": 0, "ny": 1, "nz": 0, "d": -10 }, - { "nx": 0, "ny": -1, "nz": 0, "d": 10 }, - { "nx": 0, "ny": 0, "nz": 1, "d": 0 }, - { "nx": 0, "ny": 0, "nz": -1, "d": 5 } - ], - "t": 0 - } - } - */ async action_GETAGENTSINFRUSTUM(action, payload, reqid, sender, roles) { - const replyOpts = { - action, - reqid, - sender, - replyChannel: this.config.observer.observerActionsReply, - } - if(!this.accessRights.canDo(roles, action)) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Unauthorized action !', - } }) - return - } - const registry = this.observerSrv.requestorRegistry if(!registry) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Requestor registry not ready', - } }) + replyToAction(this, { action, reqid, sender, success: false, err: 'Requestor registry not ready' }) return } if(!this.observerSrv.isLive()) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Simulation not live', - } }) + replyToAction(this, { action, reqid, sender, success: false, err: 'Simulation not live' }) return } const frustum = Frustum.fromPlanes(payload?.planes) if(!frustum) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Missing or invalid frustum planes (expected 6)', - } }) + replyToAction(this, { action, reqid, sender, success: false, err: 'Missing or invalid frustum planes (expected 6)' }) return } const at = parseSimTime(payload, () => this.observerSrv.now()) if(at === null) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Invalid simulation time', - } }) + replyToAction(this, { action, reqid, sender, success: false, err: 'Invalid simulation time' }) return } const result = await registry.evaluateOnce({ frustum, t: at }) if(!result.ok) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: result.err, - } }) + replyToAction(this, { action, reqid, sender, success: false, err: result.err }) return } - publishActionReply(this, { ...replyOpts, reply: { + replyToAction(this, { + action, + reqid, + sender, success: true, payload: { agents: result.agents, t: result.t, }, - } }) + }) }, - /* Event-Rx: - { - "action": "SUBSCRIBEFRUSTUM", - "reqid": "6az5e4r6a", - "sender": "client-uuid", - "payload": { - "planes": [ - { "nx": 1, "ny": 0, "nz": 0, "d": -10 }, - { "nx": -1, "ny": 0, "nz": 0, "d": 10 }, - { "nx": 0, "ny": 1, "nz": 0, "d": -10 }, - { "nx": 0, "ny": -1, "nz": 0, "d": 10 }, - { "nx": 0, "ny": 0, "nz": 1, "d": 0 }, - { "nx": 0, "ny": 0, "nz": -1, "d": 5 } - ], - "frequency": 800 - } - } - Event-Tx: - { - "action": "SUBSCRIBEFRUSTUM", - "success": true, - "reqid": "6az5e4r6a", - "payload": { - "frequency": 900, - "agents": [ ... ], - "t": 12.5 - } - } - Periodic push (no reqid): - { - "action": "GETAGENTSINFRUSTUM", - "success": true, - "sender": "observer", - "payload": { "agents": [ ... ], "t": 12.5 } - } - */ async action_SUBSCRIBEFRUSTUM(action, payload, reqid, sender, roles) { - const replyOpts = { - action, - reqid, - sender, - replyChannel: this.config.observer.observerActionsReply, - } - if(!this.accessRights.canDo(roles, action)) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Unauthorized action !', - } }) - return - } - - if(!sender || typeof(sender) !== 'string') { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Missing or invalid sender', - } }) - return - } - const registry = this.observerSrv.requestorRegistry if(!registry) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Requestor registry not ready', - } }) + replyToAction(this, { action, reqid, sender, success: false, err: 'Requestor registry not ready' }) return } if(!this.observerSrv.isLive()) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Simulation not live', - } }) + replyToAction(this, { action, reqid, sender, success: false, err: 'Simulation not live' }) return } @@ -260,21 +96,21 @@ export const methods = { frequency: payload?.frequency, }) if(!result.ok) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: result.err, - } }) + replyToAction(this, { action, reqid, sender, success: false, err: result.err }) return } - publishActionReply(this, { ...replyOpts, reply: { + replyToAction(this, { + action, + reqid, + sender, success: true, payload: { frequency: result.frequency, agents: result.agents, t: result.t, }, - } }) + }) }, } diff --git a/Observer/actions/system/utilities.js b/Observer/actions/system/utilities.js index 1949d7c..e51086c 100644 --- a/Observer/actions/system/utilities.js +++ b/Observer/actions/system/utilities.js @@ -1,48 +1,20 @@ -import { publishActionReply } from '../../actionsHelper.js' +import { replyToAction } from '../../../bus/publishActionReply.js' -export const construct = (redisCnx) => { -} - -export const methods = { +export const actions = { async action_RELOADCONFIG(action, payload, reqid, sender, roles) { - const replyOpts = { - action, - reqid, - sender, - replyChannel: this.config.observer.observerActionsReply, - } - if(!this.accessRights.canDo(roles, action)) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Unauthorized action !', - } }) - return - } this.reloadAccessRights() - publishActionReply(this, { ...replyOpts, reply: { - success: true, - } }) + replyToAction(this, { action, reqid, sender, success: true }) }, async action_GETCONFIG(action, payload, reqid, sender, roles) { - const replyOpts = { + replyToAction(this, { action, reqid, sender, - replyChannel: this.config.observer.observerActionsReply, - } - if(!this.accessRights.canDo(roles, action)) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Unauthorized action !', - } }) - return - } - publishActionReply(this, { ...replyOpts, reply: { success: true, payload: this.getAccessRights(), - } }) + }) }, } diff --git a/Observer/actionsHelper.js b/Observer/actionsHelper.js index 7573130..ea48492 100644 --- a/Observer/actionsHelper.js +++ b/Observer/actionsHelper.js @@ -1,20 +1,4 @@ -export function publishActionReply(redisCnx, options) { - const { - action, - reqid, - sender, - reply, - replyChannel, - senderId = 'observer', - } = options - reply.action = action - reply.sender = senderId - if(reqid) reply.reqid = reqid - const chan = replyChannel.replace(/\[UID\]/g, sender) - redisCnx.redisPublish(chan, reply) -} - export function parseSimTime(payload, fallbackFn) { if(payload?.t != null && typeof(payload.t) === 'number' && !Number.isNaN(payload.t)) return(payload.t) if(payload?.at != null && typeof(payload.at) === 'number' && !Number.isNaN(payload.at)) return(payload.at) diff --git a/Observer/observerServer.js b/Observer/observerServer.js index bb5f4fa..7371284 100644 --- a/Observer/observerServer.js +++ b/Observer/observerServer.js @@ -1,7 +1,7 @@ import { AccesRights } from '../accesRights.js' import { GpsStorageReader } from './gpsStorageReader.js' import { RequestorRegistry } from './requestorRegistry.js' -import { publishActionReply } from './actionsHelper.js' +import { replyToAction } from '../bus/publishActionReply.js' import { SimState } from '../GPS/simulationState.js' export class observerServer { @@ -61,14 +61,11 @@ export class observerServer { publishFrustumUpdate(sender, payload) { if(!this.systemCnx || !sender) return const observer = this.observerConfig.observer ?? {} - publishActionReply(this.systemCnx, { + replyToAction(this.systemCnx, { action: 'GETAGENTSINFRUSTUM', sender, - replyChannel: observer.observerActionsReply ?? 'system:replies:[UID]', - reply: { - success: true, - payload, - }, + success: true, + payload, }) } diff --git a/Observer/p42Observer.js b/Observer/p42Observer.js index d76ad4e..b723ff5 100644 --- a/Observer/p42Observer.js +++ b/Observer/p42Observer.js @@ -3,6 +3,7 @@ import yargs from 'yargs/yargs' import { hideBin } from 'yargs/helpers' import 'node:process' import { RedisConnexion } from '../redisConnexion.js' +import { busReplyRoute } from '../bus/publishActionReply.js' import { configHelper } from '../configHelper.js' import { observerServer } from './observerServer.js' import * as systemMesh from './actions/system/index.js' @@ -49,6 +50,7 @@ let cfgh = new configHelper({ function meshRedisConns(mesh, meshName, debug, rootConfig) { const { redis, ...meshConfig } = mesh + const busRoute = busReplyRoute(rootConfig.observer, meshName) return redis.map(cfg => new RedisConnexion({ debug, @@ -56,6 +58,8 @@ function meshRedisConns(mesh, meshName, debug, rootConfig) { redisId: cfg.redisId, meshName, meshModule: meshModules[meshName], + senderId: busRoute?.senderId, + actionsReply: busRoute?.actionsReply, }) ) } diff --git a/SimMaestro/actions/arena/arenaHandlers.js b/SimMaestro/actions/arena/arenaHandlers.js deleted file mode 100644 index 6f2d418..0000000 --- a/SimMaestro/actions/arena/arenaHandlers.js +++ /dev/null @@ -1,22 +0,0 @@ - -export const construct = (redisCnx) => { -} - -export const methods = { - - dispatchArenaMessage(msg, chan) { - const maestro = this.config.maestro - if(!maestro || !this.maestroSrv) return(false) - - if(this.matchesChan(chan, maestro.lifecycle?.godsReadyChannel ?? 'arena:gods:ready')) { - if(msg.eventType === 'readyToStart') { - this.maestroSrv.onReadyToStart(msg) - return(true) - } - } - - if(this.debug) console.log(`[${this.redisId}] Arena message (unhandled):`, msg.eventType, chan) - return(false) - }, - -} diff --git a/SimMaestro/actions/arena/dispatch.js b/SimMaestro/actions/arena/dispatch.js deleted file mode 100644 index a70d6d8..0000000 --- a/SimMaestro/actions/arena/dispatch.js +++ /dev/null @@ -1,5 +0,0 @@ - -export function dispatchMessage(redisCnx, msg, chan) { - if(typeof(redisCnx.dispatchArenaMessage) !== 'function') return - redisCnx.dispatchArenaMessage(msg, chan) -} diff --git a/SimMaestro/actions/arena/index.js b/SimMaestro/actions/arena/index.js deleted file mode 100644 index c0d66bc..0000000 --- a/SimMaestro/actions/arena/index.js +++ /dev/null @@ -1,12 +0,0 @@ -import { methods as arenaMethods, construct as arenaConstruct } from './arenaHandlers.js' -import { dispatchMessage } from './dispatch.js' - -export const afterLoginMethods = [ - arenaConstruct, -] - -export const meshActions = { - ...arenaMethods, -} - -export { dispatchMessage } diff --git a/SimMaestro/actions/system/dispatch.js b/SimMaestro/actions/system/dispatch.js deleted file mode 100644 index cc409c2..0000000 --- a/SimMaestro/actions/system/dispatch.js +++ /dev/null @@ -1,28 +0,0 @@ - -export function dispatchMessage(redisCnx, msg, chan) { - const maestro = redisCnx.config.maestro - if(!maestro?.maestroActionsChannel) return - - const actionsChan = redisCnx.fullChan(maestro.maestroActionsChannel) - if(chan != actionsChan) return - - const action = msg.action - if(!action || typeof(action) !== 'string') { - console.warn(`[${redisCnx.redisId}] Ignoring message without action on ${chan}`) - return - } - - const handler = redisCnx['action_'+action] - if(typeof(handler) != 'function') { - if(redisCnx.debug) console.warn(`[${redisCnx.redisId}] Unknown action ${action} on ${chan}`) - return - } - - const payload = ('payload' in msg) ? msg.payload : null - const reqid = ('reqid' in msg) ? msg.reqid.substr(0, 50) : null - const sender = msg.sender || null - const roles = Array.isArray(msg.roles) ? msg.roles : ['*'] - - if(redisCnx.debug) console.log(`[${redisCnx.redisId}] Dispatching action ${action} from ${sender}`) - handler.call(redisCnx, action, payload, reqid, sender, roles) -} diff --git a/SimMaestro/actions/system/index.js b/SimMaestro/actions/system/index.js deleted file mode 100644 index e42a00f..0000000 --- a/SimMaestro/actions/system/index.js +++ /dev/null @@ -1,14 +0,0 @@ -import { methods as utilities, construct as utilitiesConstruct } from './utilities.js' -import { methods as simulation } from './simulation.js' -import { dispatchMessage } from './dispatch.js' - -export const afterLoginMethods = [ - utilitiesConstruct, -] - -export const meshActions = { - ...utilities, - ...simulation, -} - -export { dispatchMessage } diff --git a/SimMaestro/actions/system/simulation.js b/SimMaestro/actions/system/simulation.js deleted file mode 100644 index 90e5ed0..0000000 --- a/SimMaestro/actions/system/simulation.js +++ /dev/null @@ -1,138 +0,0 @@ -import { publishActionReply } from '../../actionsHelper.js' -import { isValidUuid } from '../../simRepository.js' - -export const methods = { - - /* Event-Rx: - { - "action": "STARTSIMULATION", - "reqid": "6az5e4r6a", - "sender": "", - "roles": ["*"], - "payload": { - "simulationUuid": "...", - "keyframeId": "...", - "infraId": "..." - } - } - */ - async action_STARTSIMULATION(action, payload, reqid, sender, roles) { - const replyOpts = { - action, - reqid, - sender, - replyChannel: this.config.maestro.maestroActionsReply, - } - if(!this.accessRights.canDo(roles, action)) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Unauthorized action !', - } }) - return - } - - if(!sender || !isValidUuid(sender)) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Missing or invalid sender (user UUID)', - } }) - return - } - - if(!payload?.simulationUuid || !payload?.keyframeId) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Missing simulationUuid or keyframeId', - } }) - return - } - - try { - const result = await this.maestroSrv.startSimulation(sender, payload) - if(!result.ok) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: result.err, - } }) - return - } - publishActionReply(this, { ...replyOpts, reply: { - success: true, - payload: { - simulationId: result.simulationId, - keyframeId: result.keyframeId, - infraId: result.infraId, - agentIds: result.agentIds, - }, - } }) - } catch(err) { - console.error(`[${this.redisId}] STARTSIMULATION failed:`, err) - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: err.message ?? 'STARTSIMULATION failed', - } }) - } - }, - - /* Event-Rx: - { - "action": "STOPSIMULATION", - "reqid": "6az5e4r6a", - "sender": "", - "payload": { "simulationUuid": "..." } - } - */ - async action_STOPSIMULATION(action, payload, reqid, sender, roles) { - const replyOpts = { - action, - reqid, - sender, - replyChannel: this.config.maestro.maestroActionsReply, - } - if(!this.accessRights.canDo(roles, action)) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Unauthorized action !', - } }) - return - } - - if(!sender || !isValidUuid(sender)) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Missing or invalid sender (user UUID)', - } }) - return - } - - if(!payload?.simulationUuid) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Missing simulationUuid', - } }) - return - } - - try { - const result = await this.maestroSrv.stopSimulation(sender, payload) - if(!result.ok) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: result.err, - } }) - return - } - publishActionReply(this, { ...replyOpts, reply: { - success: true, - payload: { simulationId: result.simulationId }, - } }) - } catch(err) { - console.error(`[${this.redisId}] STOPSIMULATION failed:`, err) - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: err.message ?? 'STOPSIMULATION failed', - } }) - } - }, - -} diff --git a/SimMaestro/actions/system/utilities.js b/SimMaestro/actions/system/utilities.js deleted file mode 100644 index ee7d2fa..0000000 --- a/SimMaestro/actions/system/utilities.js +++ /dev/null @@ -1,48 +0,0 @@ -import { publishActionReply } from '../../actionsHelper.js' - -export const construct = (redisCnx) => { -} - -export const methods = { - - async action_RELOADCONFIG(action, payload, reqid, sender, roles) { - const replyOpts = { - action, - reqid, - sender, - replyChannel: this.config.maestro.maestroActionsReply, - } - if(!this.accessRights.canDo(roles, action)) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Unauthorized action !', - } }) - return - } - this.reloadAccessRights() - publishActionReply(this, { ...replyOpts, reply: { - success: true, - } }) - }, - - async action_GETCONFIG(action, payload, reqid, sender, roles) { - const replyOpts = { - action, - reqid, - sender, - replyChannel: this.config.maestro.maestroActionsReply, - } - if(!this.accessRights.canDo(roles, action)) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Unauthorized action !', - } }) - return - } - publishActionReply(this, { ...replyOpts, reply: { - success: true, - payload: this.getAccessRights(), - } }) - }, - -} diff --git a/SimMaestro/actionsHelper.js b/SimMaestro/actionsHelper.js deleted file mode 100644 index e7701fa..0000000 --- a/SimMaestro/actionsHelper.js +++ /dev/null @@ -1,16 +0,0 @@ - -export function publishActionReply(redisCnx, options) { - const { - action, - reqid, - sender, - reply, - replyChannel, - senderId = 'maestro', - } = options - reply.action = action - reply.sender = senderId - if(reqid) reply.reqid = reqid - const chan = replyChannel.replace(/\[UID\]/g, sender) - redisCnx.redisPublish(chan, reply) -} diff --git a/bus/assembleMesh.js b/bus/assembleMesh.js new file mode 100644 index 0000000..02faece --- /dev/null +++ b/bus/assembleMesh.js @@ -0,0 +1,40 @@ +import { dispatchActions } from './dispatchActions.js' +import { dispatchEvents } from './dispatchEvents.js' + +export function assembleHandlers(modules) { + const actions = {} + const tree = {} + const afterLogin = [] + + for(const mod of modules) { + if(mod.actions) Object.assign(actions, mod.actions) + if(typeof(mod.construct) === 'function') afterLogin.push(mod.construct) + if(!mod.eventHandlers) continue + for(const [channelPattern, byType] of Object.entries(mod.eventHandlers)) { + if(!tree[channelPattern]) tree[channelPattern] = {} + for(const [eventType, handler] of Object.entries(byType)) { + if(!tree[channelPattern][eventType]) tree[channelPattern][eventType] = [] + const list = Array.isArray(handler) ? handler : [handler] + tree[channelPattern][eventType].push(...list) + } + } + } + + return({ + actionHandlers: actions, + eventHandlers: tree, + afterLogin, + }) +} + +export function createDispatchMessage({ eventHandlers, actionRules }) { + return(async function dispatchMessage(redisCnx, msg, chan) { + if(msg.action && msg.eventType) { + console.warn(`[${redisCnx.redisId}] Message has both action and eventType on ${chan}`) + return(false) + } + if(msg.action) return(dispatchActions(redisCnx, msg, chan, actionRules(redisCnx))) + if(msg.eventType) return(dispatchEvents(redisCnx, msg, chan, eventHandlers)) + return(false) + }) +} diff --git a/bus/dispatchActions.js b/bus/dispatchActions.js new file mode 100644 index 0000000..99718c9 --- /dev/null +++ b/bus/dispatchActions.js @@ -0,0 +1,78 @@ +import { replyToAction } from './publishActionReply.js' + +function matchesActionsChannel(redisCnx, chan, channels) { + if(!Array.isArray(channels) || !channels.length) return(false) + for(const configured of channels) { + if(!configured) continue + if(redisCnx.fullChan(configured) === chan) return(true) + } + return(false) +} + +export async function dispatchActions(redisCnx, msg, chan, rules) { + if(!matchesActionsChannel(redisCnx, chan, rules.channels)) return(false) + + const action = msg.action + const sender = msg.sender ?? null + const reqid = ('reqid' in msg) ? msg.reqid.substr(0, 50) : null + const roles = Array.isArray(msg.roles) ? msg.roles : ['*'] + + if(!action || typeof(action) !== 'string') { + if(!sender) return(true) + replyToAction(redisCnx, { + action, + reqid, + sender, + success: false, + err: 'Missing or invalid action', + }) + return(true) + } + + if(!sender) { + console.warn(`[${redisCnx.redisId}] Action ${action} without sender on ${chan}`) + return(true) + } + + if(redisCnx.accessRights && !redisCnx.accessRights.canDo(roles, action, sender)) { + replyToAction(redisCnx, { + action, + reqid, + sender, + success: false, + err: 'Unauthorized action !', + }) + return(true) + } + + const handler = redisCnx['action_'+action] + if(typeof(handler) !== 'function') { + replyToAction(redisCnx, { + action, + reqid, + sender, + success: false, + err: `Unknown action: ${action}`, + }) + return(true) + } + + if(redisCnx.debug) { + console.log(`[${redisCnx.redisId}] Dispatching action ${action} from ${sender}`) + } + + try { + await handler.call(redisCnx, action, ('payload' in msg) ? msg.payload : null, reqid, sender, roles) + } catch(err) { + console.error(`[${redisCnx.redisId}] Action ${action} failed:`, err) + replyToAction(redisCnx, { + action, + reqid, + sender, + success: false, + err: err.message ?? `${action} failed`, + }) + } + + return(true) +} diff --git a/bus/dispatchEvents.js b/bus/dispatchEvents.js new file mode 100644 index 0000000..14b6485 --- /dev/null +++ b/bus/dispatchEvents.js @@ -0,0 +1,33 @@ + +export function dispatchEvents(redisCnx, msg, chan, eventHandlers) { + const eventType = msg.eventType + if(!eventType || typeof(eventType) !== 'string') return(false) + + let handled = false + + for(const [channelPattern, byType] of Object.entries(eventHandlers ?? {})) { + if(!redisCnx.matchesChan(chan, channelPattern)) continue + + const handlers = byType[eventType] + if(!handlers?.length) continue + + for(const handle of handlers) { + try { + handle.call(redisCnx, msg, chan) + } catch(err) { + console.error( + `[${redisCnx.redisId}] Event ${eventType} on ${chan} failed:`, + err + ) + } + } + + handled = true + } + + if(!handled && redisCnx.debug) { + console.log(`[${redisCnx.redisId}] Unhandled event ${eventType} on ${chan}`) + } + + return(handled) +} diff --git a/bus/publishActionReply.js b/bus/publishActionReply.js new file mode 100644 index 0000000..3c13328 --- /dev/null +++ b/bus/publishActionReply.js @@ -0,0 +1,69 @@ + +export function busReplyRoute(daemonBlock, meshName) { + if(!daemonBlock?.senderId) return(null) + + const onArena = meshName === 'arena' + const systemReply = daemonBlock.maestroActionsReply + ?? daemonBlock.gpsActionsReply + ?? daemonBlock.observerActionsReply + const actionsReply = onArena + ? (daemonBlock.bus?.arena?.actionsReply ?? systemReply) + : systemReply + + if(!actionsReply) return(null) + + return({ + senderId: daemonBlock.senderId, + actionsReply, + }) +} + +export function publishActionReply(redisCnx, options) { + const { + action, + reqid, + sender, + reply, + replyChannel, + senderId, + } = options + reply.action = action + reply.sender = senderId + if(reqid) reply.reqid = reqid + const chan = replyChannel.replace(/\[UID\]/g, sender) + redisCnx.redisPublish(chan, reply) +} + +export function replyToAction(redisCnx, options) { + const { + action, + reqid, + sender, + success, + payload, + err, + replyChannel, + senderId, + } = options + + const routeReplyChannel = replyChannel ?? redisCnx.actionsReply + const routeSenderId = senderId ?? redisCnx.senderId + + if(!routeReplyChannel || !routeSenderId) { + console.error(`[${redisCnx.redisId}] Cannot resolve action reply route`) + return + } + + const reply = { success } + if(err != null) reply.err = err + if(payload !== undefined) reply.payload = payload + + publishActionReply(redisCnx, { + action, + reqid, + sender, + replyChannel: routeReplyChannel, + senderId: routeSenderId, + reply, + }) +} diff --git a/config.json b/config.json index f715b78..a4780e4 100644 --- a/config.json +++ b/config.json @@ -26,6 +26,7 @@ } ], "gps": { + "primordialDaemon": true, "gpsActionsChannel": "system:requests:gps", "gpsActionsReply": "system:replies:[UID]", "GPSstorage": { @@ -58,7 +59,6 @@ "arenaChannel": "arena:lifecycle", "godsReadyChannel": "arena:gods:ready" }, - "expectedGods": ["gps"], "readyTimeoutMs": 30000 }, "mysql": { @@ -67,6 +67,7 @@ "simDatabase": "p42SIM" }, "observer": { + "primordialDaemon": false, "observerActionsChannel": "system:requests:observer", "observerActionsReply": "system:replies:[UID]", "senderId": "observer", diff --git a/configSchema.json b/configSchema.json index 0596127..4a0e50b 100644 --- a/configSchema.json +++ b/configSchema.json @@ -60,6 +60,7 @@ "gps": { "type": "object", "properties": { + "primordialDaemon": { "type": "boolean" }, "gpsActionsChannel": { "type": "string" }, "gpsActionsReply": { "type": "string" }, "GPSstorage": { @@ -131,10 +132,6 @@ "godsReadyChannel" ] }, - "expectedGods": { - "type": "array", - "items": { "type": "string" } - }, "readyTimeoutMs": { "type": "integer", "minimum": 1000 } }, "required": [ @@ -157,6 +154,7 @@ "observer": { "type": "object", "properties": { + "primordialDaemon": { "type": "boolean" }, "observerActionsChannel": { "type": "string" }, "observerActionsReply": { "type": "string" }, "senderId": { "type": "string" }, diff --git a/package.json b/package.json index 8ffd89f..d9ee056 100644 --- a/package.json +++ b/package.json @@ -5,6 +5,7 @@ "@p42/p42modules": "^0.1.0", "ajv": "^8.12.0", "redis": "^4.3.0", + "uuid": "^14.0.0", "yargs": "^17.7.2" } } diff --git a/redisConnexion.js b/redisConnexion.js index 2ed8346..fac0d28 100644 --- a/redisConnexion.js +++ b/redisConnexion.js @@ -9,9 +9,11 @@ export class RedisConnexion { this.redisConfig = this.config this.meshName = options.meshName this.meshModule = options.meshModule ?? null + this.senderId = options.senderId ?? null + this.actionsReply = options.actionsReply ?? null - if(this.meshModule?.meshActions) Object.assign(this, this.meshModule.meshActions) - this.afterLoginMethods = this.meshModule?.afterLoginMethods ?? [] + if(this.meshModule?.actionHandlers) Object.assign(this, this.meshModule.actionHandlers) + this.afterLogin = this.meshModule?.afterLogin ?? [] this.redisClient = redis.createClient({ socket: { @@ -56,7 +58,7 @@ export class RedisConnexion { console.log(`[${this.redisConfig.redisId}] Redis ${this.redisConfig.redisId} time:`, redisTime) } - for(const method of this.afterLoginMethods){ + for(const method of this.afterLogin){ if(typeof method != 'function') continue method(this) } @@ -282,7 +284,11 @@ export class RedisConnexion { } if(typeof(this.meshModule?.dispatchMessage) === 'function') { - this.meshModule.dispatchMessage(this, msg, chan) + try { + await this.meshModule.dispatchMessage(this, msg, chan) + } catch(err) { + console.error(`[${this.redisConfig.redisId}] dispatchMessage failed on ${chan}:`, err) + } } } diff --git a/tests/commands.txt b/tests/commands.txt new file mode 100644 index 0000000..88433c0 --- /dev/null +++ b/tests/commands.txt @@ -0,0 +1,4 @@ +clear; node test.js --guiDatabase test_p42GUI --simDatabase test_p42SIM maestro1 --userUuid a4f33373-6adf-4d2d-9a6d-7fa0abf8b01f --simulationUuid 0x019ec742e12175c685a97bf9300b6b49 + + + diff --git a/tests/modules/maestro1.js b/tests/modules/maestro1.js index f51b61e..5095951 100644 --- a/tests/modules/maestro1.js +++ b/tests/modules/maestro1.js @@ -1,17 +1,35 @@ import { MySQLClient } from '@p42/p42modules' -import { SimRepository } from '../../SimMaestro/simRepository.js' +import { SimRepository } from '../../Maestro/simRepository.js' function agentHashKey(template, agentId) { return(template.replace(/\[UID\]/g, agentId)) } -function parseJsonField(raw) { +function hashFieldValue(raw) { if(raw == null) return(null) - if(typeof(raw) === 'object') return(raw) + if(typeof(raw) !== 'string') return(raw) try { return(JSON.parse(raw)) } - catch { return(null) } + catch { return(raw) } } +function valuesEqual(a, b) { + if(a === b) return(true) + if(a == null || b == null) return(false) + if(typeof(a) !== 'object' && typeof(b) !== 'object') return(a == b) + if(typeof(a) !== typeof(b)) return(false) + if(typeof(a) !== 'object') return(false) + if(Array.isArray(a) !== Array.isArray(b)) return(false) + const keysA = Object.keys(a) + const keysB = Object.keys(b) + if(keysA.length !== keysB.length) return(false) + for(const key of keysA) { + if(!valuesEqual(a[key], b[key])) return(false) + } + return(true) +} + +const RESERVED_HASH_FIELDS = new Set(['position', 'vector', 'speed', 'segment']) + function vectorsEqual(a, b) { for(const axis of ['x', 'y', 'z']) { if(a[axis] !== b[axis]) return(false) @@ -23,13 +41,12 @@ async function findSimulationFixture(ctx) { const { guiDatabase, simDatabase } = ctx.databases const rows = await MySQLClient.poolExecute(ctx.db, ` SELECT u.usr_uuid AS user_uuid, - BIN_TO_UUID(s.sim_uuid) AS simulation_uuid, - BIN_TO_UUID(s.sim_root_kf_uuid) AS keyframe_id + BIN_TO_UUID(s.sim_uuid) AS simulation_uuid FROM \`${guiDatabase}\`.users u INNER JOIN \`${guiDatabase}\`.simowners o ON o.own_usr_id = u.usr_id INNER JOIN \`${simDatabase}\`.simulations s ON o.own_sim_uuid = s.sim_uuid INNER JOIN \`${simDatabase}\`.edited_kf_store ekfs ON ekfs.ekfs_ekf_uuid = s.sim_root_kf_uuid - GROUP BY u.usr_uuid, s.sim_uuid, s.sim_root_kf_uuid + GROUP BY u.usr_uuid, s.sim_uuid HAVING COUNT(ekfs.ekfs_agent_id) > 0 LIMIT 1 `) @@ -37,14 +54,13 @@ async function findSimulationFixture(ctx) { if(!rows.length) { throw(new Error( 'No simulation fixture found in MySQL (need user-owned sim with root keyframe agents). ' - + 'Pass --userUuid, --simulationUuid, --keyframeId explicitly, or use --guiDatabase / --simDatabase for a test DB.' + + 'Pass --userUuid, --simulationUuid explicitly, or use --guiDatabase / --simDatabase for a test DB.' )) } return({ userUuid: rows[0].user_uuid, simulationUuid: rows[0].simulation_uuid, - keyframeId: rows[0].keyframe_id, }) } @@ -71,15 +87,11 @@ function waitForLifecycleEvent(ctx, eventType, timeoutMs) { export function configureYargs(yargsBuilder) { return(yargsBuilder.options({ userUuid: { - describe: 'User UUID to send STARTSIMULATION as (auto-discovered if omitted)', + describe: 'User UUID (dashed or 0x-prefixed hex; auto-discovered if omitted)', type: 'string', }, simulationUuid: { - describe: 'Simulation UUID (auto-discovered if omitted)', - type: 'string', - }, - keyframeId: { - describe: 'Root keyframe UUID (auto-discovered if omitted)', + describe: 'Simulation UUID (dashed or 0x-prefixed hex; auto-discovered if omitted)', type: 'string', }, timeout: { @@ -91,32 +103,32 @@ export function configureYargs(yargsBuilder) { } export async function run(ctx) { - const { log, argv, config, systemCnx, arenaCnx } = ctx + const { log, argv, config, systemCnx, arenaCnx, normalizeUuid } = ctx const arenaStorage = config.gps?.arenaStorage ?? { agentHashKey: 'arena:agents:[UID]', agentsIndexKey: 'arena:agents', } log('action', 'Resolving simulation fixture from MySQL...') - let userUuid = argv.userUuid - let simulationUuid = argv.simulationUuid - let keyframeId = argv.keyframeId + let userUuid = argv.userUuid ? normalizeUuid(argv.userUuid) : undefined + let simulationUuid = argv.simulationUuid ? normalizeUuid(argv.simulationUuid) : undefined - if(!userUuid || !simulationUuid || !keyframeId) { + if(!userUuid || !simulationUuid) { const fixture = await findSimulationFixture(ctx) userUuid = userUuid ?? fixture.userUuid simulationUuid = simulationUuid ?? fixture.simulationUuid - keyframeId = keyframeId ?? fixture.keyframeId } log('action', `User: ${userUuid}`) log('action', `Simulation: ${simulationUuid}`) - log('action', `Keyframe: ${keyframeId}`) const simRepo = new SimRepository(ctx.db, ctx.databases, false) - const access = await simRepo.validateSimulationAccess(userUuid, simulationUuid, keyframeId) + const access = await simRepo.validateSimulationAccess(userUuid, simulationUuid) if(!access.ok) throw(new Error(`Simulation access check failed: ${access.err}`)) + const keyframeId = access.sim.sim_root_kf_uuid + log('action', `Root keyframe: ${keyframeId}`) + const agentsResult = await simRepo.loadKeyframeAgents(keyframeId) if(!agentsResult.ok) throw(new Error(`Failed to load keyframe agents: ${agentsResult.err}`)) if(!agentsResult.agents.length) throw(new Error('Keyframe has no agents with valid GPS values')) @@ -137,7 +149,6 @@ export async function run(ctx) { roles: ['*'], payload: { simulationUuid, - keyframeId, infraId: null, }, }) @@ -174,8 +185,8 @@ export async function run(ctx) { const hash = await arenaCnx.redisHgetall(key) const expected = expectedById.get(agentId) - const position = parseJsonField(hash.position) - const vector = parseJsonField(hash.vector) + const position = hashFieldValue(hash.position) + const vector = hashFieldValue(hash.vector) if(!position || !vector) { throw(new Error(`Agent ${agentId}: missing position or vector in arena hash ${key}`)) @@ -189,8 +200,23 @@ export async function run(ctx) { throw(new Error(`Agent ${agentId}: vector mismatch (MySQL vs arena store)`)) } - log('success', `Agent ${agentId}: position and vector match MySQL`) + const storeExpected = expected.store ?? {} + for(const [field, expVal] of Object.entries(storeExpected)) { + const actualVal = hashFieldValue(hash[field]) + if(!valuesEqual(actualVal, expVal)) { + throw(new Error(`Agent ${agentId}: store field "${field}" mismatch (MySQL vs arena store)`)) + } + } + + for(const field of Object.keys(hash)) { + if(RESERVED_HASH_FIELDS.has(field)) continue + if(!(field in storeExpected)) { + throw(new Error(`Agent ${agentId}: unexpected store field "${field}" in arena hash ${key}`)) + } + } + + log('success', `Agent ${agentId}: position, vector, and store values match MySQL`) } - log('success', 'Arena store seeded correctly from MySQL (Maestro will stall waiting for readyToStart without GPS)') + log('success', 'Arena store seeded correctly from MySQL (Maestro will wait for agent + primordial daemon readyToStart until timeout)') } diff --git a/tests/test.js b/tests/test.js index ad84e88..26b37bf 100644 --- a/tests/test.js +++ b/tests/test.js @@ -2,12 +2,40 @@ import yargs from 'yargs/yargs' import { hideBin } from 'yargs/helpers' import { pathToFileURL } from 'url' import { fileURLToPath } from 'url' -import { dirname, join } from 'path' +import { dirname, join, resolve } from 'path' import { RedisConnexion } from '../redisConnexion.js' import { configHelper } from '../configHelper.js' import { MySQLClient } from '@p42/p42modules' const __dirname = dirname(fileURLToPath(import.meta.url)) +const __filename = fileURLToPath(import.meta.url) + +const UUID_DASHED_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i +const UUID_HEX32_RE = /^[0-9a-f]{32}$/i + +export function normalizeUuid(value) { + if(typeof(value) !== 'string') { + throw(new Error('UUID must be a string')) + } + + const trimmed = value.trim() + if(!trimmed) throw(new Error('UUID must be a non-empty string')) + + if(UUID_DASHED_RE.test(trimmed)) { + return(trimmed.toLowerCase()) + } + + let hex = trimmed + if(/^0x/i.test(hex)) hex = hex.slice(2) + if(!UUID_HEX32_RE.test(hex)) { + throw(new Error( + `Invalid UUID: ${value} (expected dashed form or 32-char hex, optionally prefixed with 0x)` + )) + } + + hex = hex.toLowerCase() + return(`${hex.slice(0, 8)}-${hex.slice(8, 12)}-${hex.slice(12, 16)}-${hex.slice(16, 20)}-${hex.slice(20)}`) +} const LOG_COLORS = { action: '\x1b[37m', @@ -145,6 +173,7 @@ async function main() { systemCnx, arenaCnx, log, + normalizeUuid, onArenaMessage(handler) { arenaHandlers.add(handler) }, @@ -173,7 +202,11 @@ async function main() { process.exit(exitCode) } -main().catch(err => { - console.error(`${LOG_COLORS.error}${err.message ?? err}${LOG_COLORS.reset}`) - process.exit(1) -}) +const isMain = process.argv[1] && resolve(__filename) === resolve(process.argv[1]) + +if(isMain) { + main().catch(err => { + console.error(`${LOG_COLORS.error}${err.message ?? err}${LOG_COLORS.reset}`) + process.exit(1) + }) +} diff --git a/tests/uuid.js b/tests/uuid.js new file mode 100644 index 0000000..cdc580e --- /dev/null +++ b/tests/uuid.js @@ -0,0 +1,2 @@ +import { v7 as uuidv7 } from 'uuid' +console.log(uuidv7().replaceAll('-', ''))