From a1dba5060accec798c862824fd5d5e7810ec6069 Mon Sep 17 00:00:00 2001 From: STEINNI Date: Sat, 27 Jun 2026 17:24:41 +0000 Subject: [PATCH] tons of cursor-shit cleaning, finished implementing cnxId in observer --- GPS/gpsServer.js | 60 ++++++++------------------- GPS/handlers/arena/agentMotion.js | 4 +- GPS/handlers/arena/index.js | 2 +- GPS/handlers/arena/lifecycle.js | 12 +++--- GPS/handlers/system/index.js | 2 +- GPS/handlers/system/utilities.js | 10 +++-- Maestro/handlers/arena/index.js | 2 +- Maestro/handlers/arena/prepare.js | 2 +- Maestro/handlers/system/index.js | 2 +- Maestro/handlers/system/simulation.js | 34 ++++++++------- Maestro/handlers/system/utilities.js | 7 ++-- Maestro/maestroServer.js | 56 ++++++++----------------- Observer/handlers/arena/index.js | 2 +- Observer/handlers/arena/lifecycle.js | 6 +-- Observer/handlers/system/index.js | 2 +- Observer/handlers/system/positions.js | 43 +++++++++++-------- Observer/handlers/system/utilities.js | 7 ++-- Observer/observerServer.js | 42 ++++++------------- Untitled | 1 + bus/assembleMesh.js | 7 +++- bus/dispatchActions.js | 9 +++- bus/dispatchEvents.js | 4 +- bus/publishActionReply.js | 17 +++----- config.json | 30 ++++++++------ configSchema.json | 39 +++++++++++------ config_test.json | 30 ++++++++------ redisConnexion.js | 3 ++ tests/modules/maestro1.js | 2 +- 28 files changed, 213 insertions(+), 224 deletions(-) create mode 100644 Untitled diff --git a/GPS/gpsServer.js b/GPS/gpsServer.js index b4539d5..1adfea6 100644 --- a/GPS/gpsServer.js +++ b/GPS/gpsServer.js @@ -14,10 +14,11 @@ export class gpsServer { constructor(configHelper, allRediscnx, debug) { this.configHelper = configHelper - this.gpsConfig = configHelper.config + this.rootConfig = configHelper.config + this.gpsConfig = configHelper.config.gps ?? {} this.allRediscnx = allRediscnx this.debug = debug - this.accessRights = new AccesRights(this.gpsConfig, debug) + this.accessRights = new AccesRights(this.rootConfig, debug) this.agents = new Map() this.registry = new CollisionRegistry() this.arenaCnx = null @@ -33,33 +34,6 @@ export class gpsServer { this.ignoredChangeCount = 0 } - getGpsSettings() { - const gps = this.gpsConfig.gps ?? {} - return({ - nearMissDistance: gps.nearMissDistance ?? 1, - prismTimeHeight: gps.prismTimeHeight ?? 60, - collisionTickMs: gps.collisionTickMs ?? 100, - prismRefreshLeadSeconds: gps.prismRefreshLeadSeconds ?? 1, - }) - } - - getLifecycleSettings() { - const gps = this.gpsConfig.gps ?? {} - return({ - arenaChannel: gps.lifecycle?.arenaChannel ?? 'arena:lifecycle', - godsReadyChannel: gps.lifecycle?.godsReadyChannel ?? 'arena:gods:ready', - senderId: gps.senderId ?? 'gps', - }) - } - - getArenaStorageSettings() { - const gps = this.gpsConfig.gps ?? {} - return({ - agentHashKey: gps.arenaStorage?.agentHashKey ?? 'arena:agents:[UID]', - agentsIndexKey: gps.arenaStorage?.agentsIndexKey ?? 'arena:agents', - }) - } - isLive() { return(this.state === SimState.LIVE) } @@ -103,7 +77,7 @@ export class gpsServer { } initAgentStore() { - const gpsStorage = this.gpsConfig.gps?.GPSstorage + const gpsStorage = this.gpsConfig.GPSstorage if(gpsStorage && this.systemCnx) { this.agentStore = new AgentStore(this.systemCnx, gpsStorage, this.debug) } @@ -114,7 +88,10 @@ export class gpsServer { this.arenaCnxs.push(cnx) if(!this.arenaCnx || cnx.redisConfig.role === 'primary') { this.arenaCnx = cnx - this.arenaLoader = new ArenaAgentLoader(cnx, this.getArenaStorageSettings(), this.debug) + const arenaStorage = this.gpsConfig.arenaStorage + if(arenaStorage) { + this.arenaLoader = new ArenaAgentLoader(cnx, arenaStorage, this.debug) + } } } @@ -194,7 +171,7 @@ export class gpsServer { } runInitialPairScan() { - const { nearMissDistance, prismTimeHeight } = this.getGpsSettings() + const { nearMissDistance, prismTimeHeight } = this.gpsConfig const ids = [...this.agents.keys()] for(let i = 0; i < ids.length; i++) { for(let j = i + 1; j < ids.length; j++) { @@ -213,10 +190,9 @@ export class gpsServer { async publishReadyToStart(result) { if(!this.arenaCnx) return - const { godsReadyChannel, senderId } = this.getLifecycleSettings() - await this.arenaCnx.redisPublish(godsReadyChannel, { + await this.arenaCnx.redisPublish(this.gpsConfig.lifecycle.godsReadyChannel, { eventType: 'readyToStart', - sender: senderId, + sender: this.gpsConfig.senderId, payload: { success: result.success, simulationId: this.simulationId, @@ -370,7 +346,7 @@ export class gpsServer { const agent = this.agents.get(agentId) if(!agent) return(false) - const { prismTimeHeight, prismRefreshLeadSeconds } = this.getGpsSettings() + const { prismTimeHeight, prismRefreshLeadSeconds } = this.gpsConfig const now = this.now() if(!needsPrismRefresh(agent, now, prismTimeHeight, prismRefreshLeadSeconds)) return(false) @@ -390,7 +366,7 @@ export class gpsServer { const changed = this.agents.get(changedAgentId) if(!changed) return([]) - const { nearMissDistance, prismTimeHeight } = this.getGpsSettings() + const { nearMissDistance, prismTimeHeight } = this.gpsConfig const now = this.now() const hits = [] for(const [otherId, other] of this.agents) { @@ -459,14 +435,14 @@ export class gpsServer { async publishProximityBatch(targetAgentId, pairs) { if(!this.arenaCnx || !pairs.length) return - const chan = this.arenaCnx.config.gps.collisionsChannel.replace(/\[UID\]/g, targetAgentId) + const chan = this.gpsConfig.collisionsChannel.replace(/\[UID\]/g, targetAgentId) await this.arenaCnx.redisPublish(chan, { eventType: 'proximity', payload: { pairs, simulationId: this.simulationId, }, - sender: this.getLifecycleSettings().senderId, + sender: this.gpsConfig.senderId, }) } @@ -491,12 +467,12 @@ export class gpsServer { async reloadAccessRights() { await this.configHelper.refreshAccessRights() - this.gpsConfig.accessRights = this.configHelper.config.accessRights - this.accessRights.refreshAccessRights(this.gpsConfig) + this.rootConfig.accessRights = this.configHelper.config.accessRights + this.accessRights.refreshAccessRights(this.rootConfig) } getAccessRights() { - return(this.gpsConfig.accessRights) + return(this.rootConfig.accessRights) } } diff --git a/GPS/handlers/arena/agentMotion.js b/GPS/handlers/arena/agentMotion.js index 8815851..442a5de 100644 --- a/GPS/handlers/arena/agentMotion.js +++ b/GPS/handlers/arena/agentMotion.js @@ -1,7 +1,7 @@ export const eventHandlers = { 'arena:agents:*': { - change(msg, chan) { + change(msg, chan, sender, cnxId) { const agentId = msg.sender if(!agentId || typeof(agentId) !== 'string') { console.warn(`[${this.redisId}] Agent event without sender`) @@ -15,7 +15,7 @@ export const eventHandlers = { const newPosition = msg.payload?.newPosition ?? null this.gpsSrv?.onVectorChange(agentId, newVector, newPosition) }, - remove(msg, chan) { + remove(msg, chan, sender, cnxId) { const agentId = msg.sender if(!agentId || typeof(agentId) !== 'string') { console.warn(`[${this.redisId}] Agent event without sender`) diff --git a/GPS/handlers/arena/index.js b/GPS/handlers/arena/index.js index 5bbf1ad..a3b8fff 100644 --- a/GPS/handlers/arena/index.js +++ b/GPS/handlers/arena/index.js @@ -10,7 +10,7 @@ export const dispatchMessage = createDispatchMessage({ eventHandlers, actionRules(redisCnx) { const gps = redisCnx.config.gps ?? {} - const arenaChannel = gps.bus?.arena?.actionsChannel + const arenaChannel = gps.arenaActionsChannel return({ channels: arenaChannel ? [arenaChannel] : [], }) diff --git a/GPS/handlers/arena/lifecycle.js b/GPS/handlers/arena/lifecycle.js index 2a40885..e55bc50 100644 --- a/GPS/handlers/arena/lifecycle.js +++ b/GPS/handlers/arena/lifecycle.js @@ -1,6 +1,6 @@ export function construct(redisCnx) { - const tickMs = redisCnx.gpsSrv?.getGpsSettings().collisionTickMs ?? 100 + const tickMs = redisCnx.gpsSrv?.gpsConfig.collisionTickMs ?? 100 setInterval(() => { redisCnx.gpsSrv?.tickArena() }, tickMs) @@ -8,7 +8,7 @@ export function construct(redisCnx) { export const eventHandlers = { 'arena:lifecycle': { - onYourMarks(msg, chan) { + onYourMarks(msg, chan, sender, cnxId) { const srv = this.gpsSrv if(!srv) return srv.onYourMarks(msg.payload ?? {}).catch(err => { @@ -16,16 +16,16 @@ export const eventHandlers = { srv.publishReadyToStart({ success: false, err: err.message ?? 'onYourMarks failed' }) }) }, - bigBang(msg, chan) { + bigBang(msg, chan, sender, cnxId) { this.gpsSrv?.onBigBang(msg.payload ?? {}) }, - simulationPaused(msg, chan) { + simulationPaused(msg, chan, sender, cnxId) { this.gpsSrv?.onSimulationPaused(msg.payload ?? {}) }, - simulationResumed(msg, chan) { + simulationResumed(msg, chan, sender, cnxId) { this.gpsSrv?.onSimulationResumed(msg.payload ?? {}) }, - simulationStopped(msg, chan) { + simulationStopped(msg, chan, sender, cnxId) { const srv = this.gpsSrv if(!srv) return srv.onSimulationStopped(msg.payload ?? {}).catch(err => { diff --git a/GPS/handlers/system/index.js b/GPS/handlers/system/index.js index 0b793b8..36bfd3f 100644 --- a/GPS/handlers/system/index.js +++ b/GPS/handlers/system/index.js @@ -10,7 +10,7 @@ export const dispatchMessage = createDispatchMessage({ actionRules(redisCnx) { const gps = redisCnx.config.gps ?? {} return({ - channels: [gps.gpsActionsChannel].filter(Boolean), + channels: [gps.ActionsChannel].filter(Boolean), }) }, }) diff --git a/GPS/handlers/system/utilities.js b/GPS/handlers/system/utilities.js index 4350e33..252f273 100644 --- a/GPS/handlers/system/utilities.js +++ b/GPS/handlers/system/utilities.js @@ -2,11 +2,12 @@ import { replyToAction } from '../../../bus/publishActionReply.js' export const actions = { - async action_TIME(action, payload, reqid, sender, roles) { + async action_TIME(action, payload, reqid, sender, cnxId, roles) { replyToAction(this, { action, reqid, sender, + cnxId, success: true, payload: { gpsTime: new Date().toISOString(), @@ -15,16 +16,17 @@ export const actions = { }) }, - async action_RELOADCONFIG(action, payload, reqid, sender, roles) { + async action_RELOADCONFIG(action, payload, reqid, sender, cnxId, roles) { this.reloadAccessRights() - replyToAction(this, { action, reqid, sender, success: true }) + replyToAction(this, { action, reqid, sender, cnxId, success: true }) }, - async action_GETCONFIG(action, payload, reqid, sender, roles) { + async action_GETCONFIG(action, payload, reqid, sender, cnxId, roles) { replyToAction(this, { action, reqid, sender, + cnxId, success: true, payload: this.getAccessRights(), }) diff --git a/Maestro/handlers/arena/index.js b/Maestro/handlers/arena/index.js index 77e6406..383250b 100644 --- a/Maestro/handlers/arena/index.js +++ b/Maestro/handlers/arena/index.js @@ -9,7 +9,7 @@ export const dispatchMessage = createDispatchMessage({ eventHandlers, actionRules(redisCnx) { const maestro = redisCnx.config.maestro ?? {} - const arenaChannel = maestro.bus?.arena?.actionsChannel + const arenaChannel = maestro.arenaActionsChannel return({ channels: arenaChannel ? [arenaChannel] : [], }) diff --git a/Maestro/handlers/arena/prepare.js b/Maestro/handlers/arena/prepare.js index 45aad95..9af71e4 100644 --- a/Maestro/handlers/arena/prepare.js +++ b/Maestro/handlers/arena/prepare.js @@ -1,7 +1,7 @@ export const eventHandlers = { 'arena:gods:ready': { - readyToStart(msg, chan) { + readyToStart(msg, chan, sender, cnxId) { if(!this.maestroSrv) return this.maestroSrv.handlePrepareAck(msg, chan) }, diff --git a/Maestro/handlers/system/index.js b/Maestro/handlers/system/index.js index e308779..3cb396e 100644 --- a/Maestro/handlers/system/index.js +++ b/Maestro/handlers/system/index.js @@ -11,7 +11,7 @@ export const dispatchMessage = createDispatchMessage({ actionRules(redisCnx) { const maestro = redisCnx.config.maestro ?? {} return({ - channels: [maestro.maestroActionsChannel].filter(Boolean), + channels: [maestro.ActionsChannel].filter(Boolean), }) }, }) diff --git a/Maestro/handlers/system/simulation.js b/Maestro/handlers/system/simulation.js index 99057dd..a4c83cb 100644 --- a/Maestro/handlers/system/simulation.js +++ b/Maestro/handlers/system/simulation.js @@ -3,20 +3,20 @@ import { isValidUuid } from '../../simRepository.js' export const actions = { - async action_STARTSIMULATION(action, payload, reqid, sender, roles) { + async action_STARTSIMULATION(action, payload, reqid, sender, cnxId, roles) { if(!isValidUuid(sender)) { - replyToAction(this, { action, reqid, sender, success: false, err: 'Missing or invalid sender (user UUID)' }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: 'Missing or invalid sender (user UUID)' }) return } if(!payload?.simulationUuid) { - replyToAction(this, { action, reqid, sender, success: false, err: 'Missing simulationUuid' }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: 'Missing simulationUuid' }) return } const result = await this.maestroSrv.startSimulation(sender, payload) if(!result.ok) { - replyToAction(this, { action, reqid, sender, success: false, err: result.err }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: result.err }) return } @@ -24,6 +24,7 @@ export const actions = { action, reqid, sender, + cnxId, success: true, payload: { simulationId: result.simulationId, @@ -36,20 +37,20 @@ export const actions = { }) }, - async action_PAUSESIMULATION(action, payload, reqid, sender, roles) { + async action_PAUSESIMULATION(action, payload, reqid, sender, cnxId, roles) { if(!isValidUuid(sender)) { - replyToAction(this, { action, reqid, sender, success: false, err: 'Missing or invalid sender (user UUID)' }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: 'Missing or invalid sender (user UUID)' }) return } if(!payload?.simulationUuid) { - replyToAction(this, { action, reqid, sender, success: false, err: 'Missing simulationUuid' }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: 'Missing simulationUuid' }) return } const result = await this.maestroSrv.pauseSimulation(sender, payload) if(!result.ok) { - replyToAction(this, { action, reqid, sender, success: false, err: result.err }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: result.err }) return } @@ -57,6 +58,7 @@ export const actions = { action, reqid, sender, + cnxId, success: true, payload: { simulationId: result.simulationId, @@ -65,20 +67,20 @@ export const actions = { }) }, - async action_STOPSIMULATION(action, payload, reqid, sender, roles) { + async action_STOPSIMULATION(action, payload, reqid, sender, cnxId, roles) { if(!isValidUuid(sender)) { - replyToAction(this, { action, reqid, sender, success: false, err: 'Missing or invalid sender (user UUID)' }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: 'Missing or invalid sender (user UUID)' }) return } if(!payload?.simulationUuid) { - replyToAction(this, { action, reqid, sender, success: false, err: 'Missing simulationUuid' }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: 'Missing simulationUuid' }) return } const result = await this.maestroSrv.stopSimulation(sender, payload) if(!result.ok) { - replyToAction(this, { action, reqid, sender, success: false, err: result.err }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: result.err }) return } @@ -86,20 +88,21 @@ export const actions = { action, reqid, sender, + cnxId, success: true, payload: { simulationId: result.simulationId }, }) }, - async action_GETSIMULATIONSSTATUS(action, payload, reqid, sender, roles) { + async action_GETSIMULATIONSSTATUS(action, payload, reqid, sender, cnxId, roles) { if(!isValidUuid(sender)) { - replyToAction(this, { action, reqid, sender, success: false, err: 'Missing or invalid sender (user UUID)' }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: 'Missing or invalid sender (user UUID)' }) return } const result = await this.maestroSrv.getSimulationsStatus(sender) if(!result.ok) { - replyToAction(this, { action, reqid, sender, success: false, err: result.err }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: result.err }) return } @@ -107,6 +110,7 @@ export const actions = { action, reqid, sender, + cnxId, success: true, payload: result.simulations, }) diff --git a/Maestro/handlers/system/utilities.js b/Maestro/handlers/system/utilities.js index e51086c..33baba7 100644 --- a/Maestro/handlers/system/utilities.js +++ b/Maestro/handlers/system/utilities.js @@ -2,16 +2,17 @@ import { replyToAction } from '../../../bus/publishActionReply.js' export const actions = { - async action_RELOADCONFIG(action, payload, reqid, sender, roles) { + async action_RELOADCONFIG(action, payload, reqid, sender, cnxId, roles) { this.reloadAccessRights() - replyToAction(this, { action, reqid, sender, success: true }) + replyToAction(this, { action, reqid, sender, cnxId, success: true }) }, - async action_GETCONFIG(action, payload, reqid, sender, roles) { + async action_GETCONFIG(action, payload, reqid, sender, cnxId, roles) { replyToAction(this, { action, reqid, sender, + cnxId, success: true, payload: this.getAccessRights(), }) diff --git a/Maestro/maestroServer.js b/Maestro/maestroServer.js index 7e74e1a..423b757 100644 --- a/Maestro/maestroServer.js +++ b/Maestro/maestroServer.js @@ -10,10 +10,12 @@ export class maestroServer { constructor(configHelper, allRediscnx, debug) { this.configHelper = configHelper - this.maestroConfig = configHelper.config + this.rootConfig = configHelper.config + this.maestroConfig = configHelper.config.maestro ?? {} + this.gpsConfig = configHelper.config.gps ?? {} this.allRediscnx = allRediscnx this.debug = debug - this.accessRights = new AccesRights(this.maestroConfig, debug) + this.accessRights = new AccesRights(this.rootConfig, debug) this.arenaCnx = null this.arenaCnxs = [] this.systemCnx = null @@ -31,26 +33,6 @@ export class maestroServer { this.pausedAt = null } - getMaestroSettings() { - const maestro = this.maestroConfig.maestro ?? {} - return({ - senderId: maestro.senderId ?? 'maestro', - lifecycle: { - arenaChannel: maestro.lifecycle?.arenaChannel ?? 'arena:lifecycle', - prepareAckChannel: maestro.lifecycle?.godsReadyChannel ?? 'arena:gods:ready', - }, - readyTimeoutMs: maestro.readyTimeoutMs ?? 30000, - }) - } - - getArenaStorageSettings() { - const gps = this.maestroConfig.gps ?? {} - return({ - agentHashKey: gps.arenaStorage?.agentHashKey ?? 'arena:agents:[UID]', - agentsIndexKey: gps.arenaStorage?.agentsIndexKey ?? 'arena:agents', - }) - } - isIdle() { return(this.orchestrationState === MaestroState.IDLE) } @@ -72,7 +54,7 @@ export class maestroServer { } async init() { - const mysqlCfg = this.maestroConfig.mysql + const mysqlCfg = this.rootConfig.mysql if(!mysqlCfg) { console.error('[Maestro] Missing mysql config') return(false) @@ -84,10 +66,11 @@ export class maestroServer { } refreshArenaGroom() { - if(this.arenaCnx) { + const arenaStorage = this.gpsConfig.arenaStorage + if(this.arenaCnx && arenaStorage) { this.arenaGroom = new ArenaGroom( this.arenaCnx, - this.getArenaStorageSettings(), + arenaStorage, this.debug ) } @@ -95,10 +78,9 @@ export class maestroServer { refreshPrepareQuorum() { if(!this.arenaCnx) return - const { lifecycle, readyTimeoutMs } = this.getMaestroSettings() this.prepareQuorum = new PrepareQuorum({ - ackChannel: lifecycle.prepareAckChannel, - timeoutMs: readyTimeoutMs, + ackChannel: this.maestroConfig.lifecycle.godsReadyChannel, + timeoutMs: this.maestroConfig.readyTimeoutMs, matchesChan: this.arenaCnx.matchesChan.bind(this.arenaCnx), debug: this.debug, }) @@ -144,11 +126,10 @@ export class maestroServer { async publishLifecycle(eventType, payload, state = null) { if(!this.arenaCnx) throw(new Error('No arena Redis connection')) - const { arenaChannel, senderId } = this.getMaestroSettings().lifecycle const resolvedState = state ?? this.orchestrationStateFor(payload?.simulationId ?? this.simulationId) - await this.arenaCnx.redisPublish(arenaChannel, { + await this.arenaCnx.redisPublish(this.maestroConfig.lifecycle.arenaChannel, { eventType, - sender: senderId, + sender: this.maestroConfig.senderId, payload, }) await this.publishSystemLifecycle(eventType, payload, resolvedState) @@ -164,9 +145,8 @@ export class maestroServer { const ownersResult = await this.simRepo.listSimulationOwnerUuids(simulationId) if(!ownersResult.ok || !ownersResult.ownerUuids.length) return - const maestro = this.maestroConfig.maestro ?? {} - const channelTemplate = maestro.systemLifecycleChannel ?? 'system:maestro:lifecycle:[UID]' - const senderId = maestro.senderId ?? 'maestro' + const channelTemplate = this.maestroConfig.systemLifecycleChannel + const senderId = this.maestroConfig.senderId const msg = { eventType, sender: senderId, @@ -219,7 +199,7 @@ export class maestroServer { infraId, } - const expectedParticipants = buildPrepareQuorum(this.agentIds, this.maestroConfig) + const expectedParticipants = buildPrepareQuorum(this.agentIds, this.rootConfig) const readyWait = this.prepareQuorum.begin(expectedParticipants, this.simulationId) await this.publishLifecycle('onYourMarks', lifecyclePayload) @@ -410,12 +390,12 @@ export class maestroServer { async reloadAccessRights() { await this.configHelper.refreshAccessRights() - this.maestroConfig.accessRights = this.configHelper.config.accessRights - this.accessRights.refreshAccessRights(this.maestroConfig) + this.rootConfig.accessRights = this.configHelper.config.accessRights + this.accessRights.refreshAccessRights(this.rootConfig) } getAccessRights() { - return(this.maestroConfig.accessRights) + return(this.rootConfig.accessRights) } } diff --git a/Observer/handlers/arena/index.js b/Observer/handlers/arena/index.js index 89f4e2a..f59ecf4 100644 --- a/Observer/handlers/arena/index.js +++ b/Observer/handlers/arena/index.js @@ -9,7 +9,7 @@ export const dispatchMessage = createDispatchMessage({ eventHandlers, actionRules(redisCnx) { const observer = redisCnx.config.observer ?? {} - const arenaChannel = observer.bus?.arena?.actionsChannel + const arenaChannel = observer.arenaActionsChannel return({ channels: arenaChannel ? [arenaChannel] : [], }) diff --git a/Observer/handlers/arena/lifecycle.js b/Observer/handlers/arena/lifecycle.js index 76da949..25ef779 100644 --- a/Observer/handlers/arena/lifecycle.js +++ b/Observer/handlers/arena/lifecycle.js @@ -1,13 +1,13 @@ export const eventHandlers = { 'arena:lifecycle': { - onYourMarks(msg, chan) { + onYourMarks(msg, chan, sender, cnxId) { this.observerSrv?.onYourMarks() }, - bigBang(msg, chan) { + bigBang(msg, chan, sender, cnxId) { this.observerSrv?.onBigBang() }, - simulationStopped(msg, chan) { + simulationStopped(msg, chan, sender, cnxId) { this.observerSrv?.onSimulationStopped(msg.payload ?? {}) }, }, diff --git a/Observer/handlers/system/index.js b/Observer/handlers/system/index.js index 63cbbce..665058a 100644 --- a/Observer/handlers/system/index.js +++ b/Observer/handlers/system/index.js @@ -11,7 +11,7 @@ export const dispatchMessage = createDispatchMessage({ actionRules(redisCnx) { const observer = redisCnx.config.observer ?? {} return({ - channels: [observer.observerActionsChannel].filter(Boolean), + channels: [observer.ActionsChannel].filter(Boolean), }) }, }) diff --git a/Observer/handlers/system/positions.js b/Observer/handlers/system/positions.js index 67cfd69..1dba5a1 100644 --- a/Observer/handlers/system/positions.js +++ b/Observer/handlers/system/positions.js @@ -4,66 +4,66 @@ import { Frustum } from '../../frustum.js' export const actions = { - async action_GETAGENTPOSITION(action, payload, reqid, sender, roles) { + async action_GETAGENTPOSITION(action, payload, reqid, sender, cnxId, roles) { const reader = this.observerSrv.gpsStorageReader if(!reader) { - replyToAction(this, { action, reqid, sender, success: false, err: 'GPS storage reader not ready' }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: 'GPS storage reader not ready' }) return } if(!this.observerSrv.isLive()) { - replyToAction(this, { action, reqid, sender, success: false, err: 'Simulation not live' }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: 'Simulation not live' }) return } const agentId = payload?.agentId if(!agentId || typeof(agentId) !== 'string') { - replyToAction(this, { action, reqid, sender, success: false, err: 'Missing or invalid agentId' }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: 'Missing or invalid agentId' }) return } const at = parseSimTime(payload, () => this.observerSrv.now()) if(at === null) { - replyToAction(this, { action, reqid, sender, success: false, err: 'Invalid simulation time' }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: 'Invalid simulation time' }) return } const agent = await reader.getAgentPosition(agentId, at) if(!agent) { - replyToAction(this, { action, reqid, sender, success: false, err: `Unknown agent: ${agentId}` }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: `Unknown agent: ${agentId}` }) return } - replyToAction(this, { action, reqid, sender, success: true, payload: { agent } }) + replyToAction(this, { action, reqid, sender, cnxId, success: true, payload: { agent } }) }, - async action_GETAGENTSINFRUSTUM(action, payload, reqid, sender, roles) { + async action_GETAGENTSINFRUSTUM(action, payload, reqid, sender, cnxId, roles) { const registry = this.observerSrv.requestorRegistry if(!registry) { - replyToAction(this, { action, reqid, sender, success: false, err: 'Requestor registry not ready' }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: 'Requestor registry not ready' }) return } if(!this.observerSrv.isLive()) { - replyToAction(this, { action, reqid, sender, success: false, err: 'Simulation not live' }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: 'Simulation not live' }) return } const frustum = Frustum.fromPlanes(payload?.planes) if(!frustum) { - replyToAction(this, { action, reqid, sender, success: false, err: 'Missing or invalid frustum planes (expected 6)' }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: 'Missing or invalid frustum planes (expected 6)' }) return } const at = parseSimTime(payload, () => this.observerSrv.now()) if(at === null) { - replyToAction(this, { action, reqid, sender, success: false, err: 'Invalid simulation time' }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: 'Invalid simulation time' }) return } const result = await registry.evaluateOnce({ frustum, t: at }) if(!result.ok) { - replyToAction(this, { action, reqid, sender, success: false, err: result.err }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: result.err }) return } @@ -71,6 +71,7 @@ export const actions = { action, reqid, sender, + cnxId, success: true, payload: { agents: result.agents, @@ -79,24 +80,29 @@ export const actions = { }) }, - async action_SUBSCRIBEFRUSTUM(action, payload, reqid, sender, roles) { + async action_SUBSCRIBEFRUSTUM(action, payload, reqid, sender, cnxId, roles) { const registry = this.observerSrv.requestorRegistry if(!registry) { - replyToAction(this, { action, reqid, sender, success: false, err: 'Requestor registry not ready' }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: 'Requestor registry not ready' }) return } if(!this.observerSrv.isLive()) { - replyToAction(this, { action, reqid, sender, success: false, err: 'Simulation not live' }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: 'Simulation not live' }) return } - const result = await registry.subscribeFrustum(sender, { + if(!cnxId || typeof(cnxId) !== 'string') { + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: 'Missing or invalid cnxId' }) + return + } + + const result = await registry.subscribeFrustum(cnxId, { planes: payload?.planes, frequency: payload?.frequency, }) if(!result.ok) { - replyToAction(this, { action, reqid, sender, success: false, err: result.err }) + replyToAction(this, { action, reqid, sender, cnxId, success: false, err: result.err }) return } @@ -104,6 +110,7 @@ export const actions = { action, reqid, sender, + cnxId, success: true, payload: { frequency: result.frequency, diff --git a/Observer/handlers/system/utilities.js b/Observer/handlers/system/utilities.js index e51086c..33baba7 100644 --- a/Observer/handlers/system/utilities.js +++ b/Observer/handlers/system/utilities.js @@ -2,16 +2,17 @@ import { replyToAction } from '../../../bus/publishActionReply.js' export const actions = { - async action_RELOADCONFIG(action, payload, reqid, sender, roles) { + async action_RELOADCONFIG(action, payload, reqid, sender, cnxId, roles) { this.reloadAccessRights() - replyToAction(this, { action, reqid, sender, success: true }) + replyToAction(this, { action, reqid, sender, cnxId, success: true }) }, - async action_GETCONFIG(action, payload, reqid, sender, roles) { + async action_GETCONFIG(action, payload, reqid, sender, cnxId, roles) { replyToAction(this, { action, reqid, sender, + cnxId, success: true, payload: this.getAccessRights(), }) diff --git a/Observer/observerServer.js b/Observer/observerServer.js index 5bba4cb..9e30564 100644 --- a/Observer/observerServer.js +++ b/Observer/observerServer.js @@ -7,10 +7,12 @@ export class observerServer { constructor(configHelper, allRediscnx, debug) { this.configHelper = configHelper - this.observerConfig = configHelper.config + this.rootConfig = configHelper.config + this.observerConfig = configHelper.config.observer ?? {} + this.gpsConfig = configHelper.config.gps ?? {} this.allRediscnx = allRediscnx this.debug = debug - this.accessRights = new AccesRights(this.observerConfig, debug) + this.accessRights = new AccesRights(this.rootConfig, debug) this.arenaCnx = null this.arenaCnxs = [] this.systemCnx = null @@ -20,27 +22,8 @@ export class observerServer { this.bigBangEpoch = null } - getObserverSettings() { - const observer = this.observerConfig.observer ?? {} - return({ - senderId: observer.senderId ?? 'observer', - scanIntervalMs: observer.scanIntervalMs ?? 300, - frustumEventsChannel: observer.observerFrustumEventsChannel - ?? 'system:observer:subscribed[UID]:agents', - lifecycle: { - arenaChannel: observer.lifecycle?.arenaChannel ?? 'arena:lifecycle', - godsReadyChannel: observer.lifecycle?.godsReadyChannel ?? 'arena:gods:ready', - }, - }) - } - - getGpsStorageSettings() { - const gps = this.observerConfig.gps ?? {} - return(gps.GPSstorage ?? null) - } - initGpsStorageReader() { - const gpsStorage = this.getGpsStorageSettings() + const gpsStorage = this.gpsConfig.GPSstorage if(gpsStorage && this.systemCnx) { this.gpsStorageReader = new GpsStorageReader(this.systemCnx, gpsStorage, this.debug) this.initRequestorRegistry() @@ -49,7 +32,7 @@ export class observerServer { initRequestorRegistry() { if(!this.gpsStorageReader || this.requestorRegistry) return - const { scanIntervalMs } = this.getObserverSettings() + const { scanIntervalMs } = this.observerConfig this.requestorRegistry = new RequestorRegistry( this.gpsStorageReader, () => this.now(), @@ -67,9 +50,9 @@ export class observerServer { if(!this.systemCnx || !subscriberId) return if(!Array.isArray(agents) || !agents.length) return - const { frustumEventsChannel } = this.getObserverSettings() - const chan = frustumEventsChannel.replace(/\[UID\]/g, subscriberId) - const senderId = this.getObserverSettings().senderId + const chan = this.observerConfig.FrustumEventsChannel + .replace(/\[CUID\]/g, subscriberId) + const senderId = this.observerConfig.senderId for(const agent of agents) { if(!agent?.id || !agent?.position) continue @@ -77,6 +60,7 @@ export class observerServer { await this.systemCnx.redisPublish(chan, { eventType: 'move', sender: senderId, + cnxId: this.systemCnx.cnxId, payload: { aid: agent.id, coords: { @@ -147,12 +131,12 @@ export class observerServer { async reloadAccessRights() { await this.configHelper.refreshAccessRights() - this.observerConfig.accessRights = this.configHelper.config.accessRights - this.accessRights.refreshAccessRights(this.observerConfig) + this.rootConfig.accessRights = this.configHelper.config.accessRights + this.accessRights.refreshAccessRights(this.rootConfig) } getAccessRights() { - return(this.observerConfig.accessRights) + return(this.rootConfig.accessRights) } } diff --git a/Untitled b/Untitled new file mode 100644 index 0000000..32f64f4 --- /dev/null +++ b/Untitled @@ -0,0 +1 @@ +t \ No newline at end of file diff --git a/bus/assembleMesh.js b/bus/assembleMesh.js index 02faece..ab09fc8 100644 --- a/bus/assembleMesh.js +++ b/bus/assembleMesh.js @@ -27,14 +27,17 @@ export function assembleHandlers(modules) { }) } -export function createDispatchMessage({ eventHandlers, actionRules }) { +export function createDispatchMessage({ eventHandlers, eventRules, actionRules }) { return(async function dispatchMessage(redisCnx, msg, chan) { if(msg.action && msg.eventType) { console.warn(`[${redisCnx.redisId}] Message has both action and eventType on ${chan}`) return(false) } if(msg.action) return(dispatchActions(redisCnx, msg, chan, actionRules(redisCnx))) - if(msg.eventType) return(dispatchEvents(redisCnx, msg, chan, eventHandlers)) + if(msg.eventType) { + const handlers = eventRules ? eventRules(redisCnx) : eventHandlers + return(dispatchEvents(redisCnx, msg, chan, handlers)) + } return(false) }) } diff --git a/bus/dispatchActions.js b/bus/dispatchActions.js index 99718c9..43693ae 100644 --- a/bus/dispatchActions.js +++ b/bus/dispatchActions.js @@ -14,6 +14,7 @@ export async function dispatchActions(redisCnx, msg, chan, rules) { const action = msg.action const sender = msg.sender ?? null + const cnxId = msg.cnxId ?? null const reqid = ('reqid' in msg) ? msg.reqid.substr(0, 50) : null const roles = Array.isArray(msg.roles) ? msg.roles : ['*'] @@ -21,8 +22,9 @@ export async function dispatchActions(redisCnx, msg, chan, rules) { if(!sender) return(true) replyToAction(redisCnx, { action, - reqid, sender, + reqid, + cnxId, success: false, err: 'Missing or invalid action', }) @@ -39,6 +41,7 @@ export async function dispatchActions(redisCnx, msg, chan, rules) { action, reqid, sender, + cnxId, success: false, err: 'Unauthorized action !', }) @@ -51,6 +54,7 @@ export async function dispatchActions(redisCnx, msg, chan, rules) { action, reqid, sender, + cnxId, success: false, err: `Unknown action: ${action}`, }) @@ -62,13 +66,14 @@ export async function dispatchActions(redisCnx, msg, chan, rules) { } try { - await handler.call(redisCnx, action, ('payload' in msg) ? msg.payload : null, reqid, sender, roles) + await handler.call(redisCnx, action, ('payload' in msg) ? msg.payload : null, reqid, sender, cnxId, roles) } catch(err) { console.error(`[${redisCnx.redisId}] Action ${action} failed:`, err) replyToAction(redisCnx, { action, reqid, sender, + cnxId, success: false, err: err.message ?? `${action} failed`, }) diff --git a/bus/dispatchEvents.js b/bus/dispatchEvents.js index 14b6485..c692689 100644 --- a/bus/dispatchEvents.js +++ b/bus/dispatchEvents.js @@ -1,6 +1,8 @@ export function dispatchEvents(redisCnx, msg, chan, eventHandlers) { const eventType = msg.eventType + const sender = msg.sender ?? null + const cnxId = msg.cnxId ?? null if(!eventType || typeof(eventType) !== 'string') return(false) let handled = false @@ -13,7 +15,7 @@ export function dispatchEvents(redisCnx, msg, chan, eventHandlers) { for(const handle of handlers) { try { - handle.call(redisCnx, msg, chan) + handle.call(redisCnx, msg, chan, sender, cnxId) } catch(err) { console.error( `[${redisCnx.redisId}] Event ${eventType} on ${chan} failed:`, diff --git a/bus/publishActionReply.js b/bus/publishActionReply.js index 3c13328..6258c39 100644 --- a/bus/publishActionReply.js +++ b/bus/publishActionReply.js @@ -3,12 +3,9 @@ export function busReplyRoute(daemonBlock, meshName) { if(!daemonBlock?.senderId) return(null) const onArena = meshName === 'arena' - const systemReply = daemonBlock.maestroActionsReply - ?? daemonBlock.gpsActionsReply - ?? daemonBlock.observerActionsReply const actionsReply = onArena - ? (daemonBlock.bus?.arena?.actionsReply ?? systemReply) - : systemReply + ? (daemonBlock.bus?.arena?.actionsReply ?? daemonBlock.ActionsReply) + : daemonBlock.ActionsReply if(!actionsReply) return(null) @@ -25,12 +22,13 @@ export function publishActionReply(redisCnx, options) { sender, reply, replyChannel, - senderId, } = options reply.action = action - reply.sender = senderId + reply.sender = redisCnx.senderId + reply.cnxId = redisCnx.cnxId if(reqid) reply.reqid = reqid const chan = replyChannel.replace(/\[UID\]/g, sender) + .replace(/\[CUID\]/g, redisCnx.cnxId) redisCnx.redisPublish(chan, reply) } @@ -43,13 +41,11 @@ export function replyToAction(redisCnx, options) { payload, err, replyChannel, - senderId, } = options const routeReplyChannel = replyChannel ?? redisCnx.actionsReply - const routeSenderId = senderId ?? redisCnx.senderId - if(!routeReplyChannel || !routeSenderId) { + if(!routeReplyChannel) { console.error(`[${redisCnx.redisId}] Cannot resolve action reply route`) return } @@ -63,7 +59,6 @@ export function replyToAction(redisCnx, options) { reqid, sender, replyChannel: routeReplyChannel, - senderId: routeSenderId, reply, }) } diff --git a/config.json b/config.json index 69bfdaf..b75ac18 100644 --- a/config.json +++ b/config.json @@ -29,8 +29,10 @@ ], "gps": { "primordialDaemon": true, - "gpsActionsChannel": "system:requests:gps", - "gpsActionsReply": "system:replies:[UID]", + "ActionsChannel": "system:requests:gps", + "ActionsReply": "system:replies:[UID]", + "arenaActionsChannel": "arena:requests:[UID]", + "arenaActionsReply": "arena:replies:[UID]", "GPSstorage": { "agentHashKey": "system:gps:agent:[UID]", "agentsIndexKey": "system:gps:agents", @@ -54,8 +56,10 @@ "prismRefreshLeadSeconds": 1 }, "maestro": { - "maestroActionsChannel": "system:requests:maestro", - "maestroActionsReply": "system:replies:[UID]", + "ActionsChannel": "system:requests:maestro", + "ActionsReply": "system:replies:[UID]", + "arenaActionsChannel": "arena:requests:[UID]", + "arenaActionsReply": "arena:replies:[UID]", "senderId": "maestro", "lifecycle": { "arenaChannel": "arena:lifecycle", @@ -64,16 +68,13 @@ "systemLifecycleChannel": "system:maestro:lifecycle:[UID]", "readyTimeoutMs": 30000 }, - "mysql": { - "socketPath": "/var/run/mysqld/mysqld.sock", - "guiDatabase": "p42GUI", - "simDatabase": "p42SIM" - }, "observer": { "primordialDaemon": false, - "observerActionsChannel": "system:requests:observer", - "observerActionsReply": "system:replies:[UID]", - "observerFrustumEventsChannel": "system:observer:subscribed[UID]:agents", + "ActionsChannel": "system:requests:observer", + "ActionsReply": "system:replies:[UID]", + "arenaActionsChannel": "arena:requests:[UID]", + "arenaActionsReply": "arena:replies:[UID]", + "FrustumEventsChannel": "system:observer:subscribed[CUID]:agents", "senderId": "observer", "scanIntervalMs": 300, "lifecycle": { @@ -110,5 +111,10 @@ "basePrefix": "messageBus:" } ] + }, + "mysql": { + "socketPath": "/var/run/mysqld/mysqld.sock", + "guiDatabase": "p42GUI", + "simDatabase": "p42SIM" } } diff --git a/configSchema.json b/configSchema.json index 8e3a24f..442bf37 100644 --- a/configSchema.json +++ b/configSchema.json @@ -61,8 +61,10 @@ "type": "object", "properties": { "primordialDaemon": { "type": "boolean" }, - "gpsActionsChannel": { "type": "string" }, - "gpsActionsReply": { "type": "string" }, + "ActionsChannel": { "type": "string" }, + "ActionsReply": { "type": "string" }, + "arenaActionsChannel": { "type": "string" }, + "arenaActionsReply": { "type": "string" }, "GPSstorage": { "type": "object", "properties": { @@ -108,8 +110,10 @@ "prismRefreshLeadSeconds": { "type": "number", "minimum": 0 } }, "required": [ - "gpsActionsChannel", - "gpsActionsReply", + "ActionsChannel", + "ActionsReply", + "arenaActionsChannel", + "arenaActionsReply", "GPSstorage", "agentVectorChangeChannel", "collisionsChannel" @@ -118,8 +122,10 @@ "maestro": { "type": "object", "properties": { - "maestroActionsChannel": { "type": "string" }, - "maestroActionsReply": { "type": "string" }, + "ActionsChannel": { "type": "string" }, + "ActionsReply": { "type": "string" }, + "arenaActionsChannel": { "type": "string" }, + "arenaActionsReply": { "type": "string" }, "senderId": { "type": "string" }, "lifecycle": { "type": "object", @@ -135,8 +141,10 @@ "readyTimeoutMs": { "type": "integer", "minimum": 1000 } }, "required": [ - "maestroActionsChannel", - "maestroActionsReply" + "ActionsChannel", + "ActionsReply", + "arenaActionsChannel", + "arenaActionsReply" ] }, "mysql": { @@ -155,9 +163,11 @@ "type": "object", "properties": { "primordialDaemon": { "type": "boolean" }, - "observerActionsChannel": { "type": "string" }, - "observerActionsReply": { "type": "string" }, - "observerFrustumEventsChannel": { "type": "string" }, + "ActionsChannel": { "type": "string" }, + "ActionsReply": { "type": "string" }, + "arenaActionsChannel": { "type": "string" }, + "arenaActionsReply": { "type": "string" }, + "FrustumEventsChannel": { "type": "string" }, "senderId": { "type": "string" }, "scanIntervalMs": { "type": "integer", "minimum": 50 }, "lifecycle": { @@ -173,8 +183,11 @@ } }, "required": [ - "observerActionsChannel", - "observerActionsReply" + "ActionsChannel", + "ActionsReply", + "FrustumEventsChannel", + "arenaActionsChannel", + "arenaActionsReply" ] }, "systemMesh": { diff --git a/config_test.json b/config_test.json index 46a5d16..854f4f2 100644 --- a/config_test.json +++ b/config_test.json @@ -29,8 +29,10 @@ ], "gps": { "primordialDaemon": true, - "gpsActionsChannel": "system:requests:gps", - "gpsActionsReply": "system:replies:[UID]", + "ActionsChannel": "system:requests:gps", + "ActionsReply": "system:replies:[UID]", + "arenaActionsChannel": "arena:requests:[UID]", + "arenaActionsReply": "arena:replies:[UID]", "GPSstorage": { "agentHashKey": "system:gps:agent:[UID]", "agentsIndexKey": "system:gps:agents", @@ -54,8 +56,10 @@ "prismRefreshLeadSeconds": 1 }, "maestro": { - "maestroActionsChannel": "system:requests:maestro", - "maestroActionsReply": "system:replies:[UID]", + "ActionsChannel": "system:requests:maestro", + "ActionsReply": "system:replies:[UID]", + "arenaActionsChannel": "arena:requests:[UID]", + "arenaActionsReply": "arena:replies:[UID]", "senderId": "maestro", "lifecycle": { "arenaChannel": "arena:lifecycle", @@ -64,16 +68,13 @@ "systemLifecycleChannel": "system:maestro:lifecycle:[UID]", "readyTimeoutMs": 30000 }, - "mysql": { - "socketPath": "/var/run/mysqld/mysqld.sock", - "guiDatabase": "test_p42GUI", - "simDatabase": "test_p42SIM" - }, "observer": { "primordialDaemon": false, - "observerActionsChannel": "system:requests:observer", - "observerActionsReply": "system:replies:[UID]", - "observerFrustumEventsChannel": "system:observer:subscribed[UID]:agents", + "ActionsChannel": "system:requests:observer", + "ActionsReply": "system:replies:[UID]", + "FrustumEventsChannel": "system:observer:subscribed[CUID]:agents", + "arenaActionsChannel": "arena:requests:[UID]", + "arenaActionsReply": "arena:replies:[UID]", "senderId": "observer", "scanIntervalMs": 300, "lifecycle": { @@ -110,5 +111,10 @@ "basePrefix": "messageBus:" } ] + }, + "mysql": { + "socketPath": "/var/run/mysqld/mysqld.sock", + "guiDatabase": "test_p42GUI", + "simDatabase": "test_p42SIM" } } diff --git a/redisConnexion.js b/redisConnexion.js index fac0d28..d14d026 100644 --- a/redisConnexion.js +++ b/redisConnexion.js @@ -1,4 +1,5 @@ import redis from 'redis' +import os from 'node:os' export class RedisConnexion { @@ -12,6 +13,8 @@ export class RedisConnexion { this.senderId = options.senderId ?? null this.actionsReply = options.actionsReply ?? null + this.cnxId = os.hostname() + ':' + process.pid + ':' + Date.now() + ':' + Math.random().toString(36).substring(2, 15) + if(this.meshModule?.actionHandlers) Object.assign(this, this.meshModule.actionHandlers) this.afterLogin = this.meshModule?.afterLogin ?? [] diff --git a/tests/modules/maestro1.js b/tests/modules/maestro1.js index 52b6a41..c7b20a8 100644 --- a/tests/modules/maestro1.js +++ b/tests/modules/maestro1.js @@ -197,7 +197,7 @@ export async function run(ctx) { const lifecycleWait = waitForLifecycleEvent(ctx, 'onYourMarks', argv.timeout) const reqid = `maestro1-${Date.now()}` - const actionsChan = config.maestro.maestroActionsChannel + const actionsChan = config.maestro.ActionsChannel log('action', `Publishing STARTSIMULATION on ${actionsChan} (reqid=${reqid})...`) await systemCnx.redisPublish(actionsChan, {