General Actions to handlers Refacto

This commit is contained in:
STEINNI
2026-06-20 18:50:26 +00:00
parent 7435d96135
commit 44a84c64ec
56 changed files with 832 additions and 973 deletions
+27
View File
@@ -0,0 +1,27 @@
export const eventHandlers = {
'arena:agents:*': {
change(msg, chan) {
const agentId = msg.sender
if(!agentId || typeof(agentId) !== 'string') {
console.warn(`[${this.redisId}] Agent event without sender`)
return
}
const newVector = msg.payload?.newVector
if(!newVector || typeof(newVector.x) !== 'number' || typeof(newVector.y) !== 'number' || typeof(newVector.z) !== 'number') {
console.warn(`[${this.redisId}] Invalid newVector from ${agentId}`)
return
}
const newPosition = msg.payload?.newPosition ?? null
this.gpsSrv?.onVectorChange(agentId, newVector, newPosition)
},
remove(msg, chan) {
const agentId = msg.sender
if(!agentId || typeof(agentId) !== 'string') {
console.warn(`[${this.redisId}] Agent event without sender`)
return
}
this.gpsSrv?.onAgentRemove(agentId)
},
},
}
-70
View File
@@ -1,70 +0,0 @@
export const construct = (redisCnx) => {
const tickMs = redisCnx.gpsSrv?.getGpsSettings().collisionTickMs ?? 100
// Interval always runs; tickArena no-ops until LIVE (see gpsServer.tickArena)
setInterval(() => {
redisCnx.gpsSrv?.tickArena()
}, tickMs)
}
export const methods = {
handleLifecycleEvent(msg) {
const srv = this.gpsSrv
if(!srv) return
if(msg.eventType === 'onYourMarks') {
srv.onYourMarks(msg.payload ?? {}).catch(err => {
console.error(`[${this.redisId}] onYourMarks failed:`, err)
srv.publishReadyToStart({ success: false, err: err.message ?? 'onYourMarks failed' })
})
return
}
if(msg.eventType === 'bigBang') {
srv.onBigBang(msg.payload ?? {})
return
}
},
handleAgentEvent(msg) {
const agentId = msg.sender
if(!agentId || typeof(agentId) !== 'string') {
console.warn(`[${this.redisId}] Agent event without sender`)
return
}
if(msg.eventType === 'change') {
const newVector = msg.payload?.newVector
if(!newVector || typeof(newVector.x) !== 'number' || typeof(newVector.y) !== 'number' || typeof(newVector.z) !== 'number') {
console.warn(`[${this.redisId}] Invalid newVector from ${agentId}`)
return
}
const newPosition = msg.payload?.newPosition ?? null
this.gpsSrv.onVectorChange(agentId, newVector, newPosition)
return
}
if(msg.eventType === 'remove') {
this.gpsSrv.onAgentRemove(agentId)
return
}
},
dispatchArenaMessage(msg, chan) {
const gps = this.config.gps
if(!gps || !this.gpsSrv) return(false)
if(this.matchesChan(chan, gps.lifecycle?.arenaChannel ?? 'arena:lifecycle')) {
this.handleLifecycleEvent(msg)
return(true)
}
if(this.matchesChan(chan, gps.agentVectorChangeChannel)) {
this.handleAgentEvent(msg)
return(true)
}
return(false)
},
}
-5
View File
@@ -1,5 +0,0 @@
export function dispatchMessage(redisCnx, msg, chan) {
if(!redisCnx.config.gps || typeof(redisCnx.dispatchArenaMessage) !== 'function') return
redisCnx.dispatchArenaMessage(msg, chan)
}
+16 -9
View File
@@ -1,11 +1,18 @@
import { methods as arenaMethods, construct as arenaConstruct } from './arenaHandlers.js' import { assembleHandlers, createDispatchMessage } from '../../../bus/assembleMesh.js'
import { dispatchMessage } from './dispatch.js' import * as lifecycle from './lifecycle.js'
import * as agentMotion from './agentMotion.js'
export const afterLoginMethods = [ const { actionHandlers, eventHandlers, afterLogin } = assembleHandlers([lifecycle, agentMotion])
arenaConstruct,
]
export const meshActions = { export { actionHandlers, afterLogin }
...arenaMethods,
} export const dispatchMessage = createDispatchMessage({
export { dispatchMessage } eventHandlers,
actionRules(redisCnx) {
const gps = redisCnx.config.gps ?? {}
const arenaChannel = gps.bus?.arena?.actionsChannel
return({
channels: arenaChannel ? [arenaChannel] : [],
})
},
})
+23
View File
@@ -0,0 +1,23 @@
export function construct(redisCnx) {
const tickMs = redisCnx.gpsSrv?.getGpsSettings().collisionTickMs ?? 100
setInterval(() => {
redisCnx.gpsSrv?.tickArena()
}, tickMs)
}
export const eventHandlers = {
'arena:lifecycle': {
onYourMarks(msg, chan) {
const srv = this.gpsSrv
if(!srv) return
srv.onYourMarks(msg.payload ?? {}).catch(err => {
console.error(`[${this.redisId}] onYourMarks failed:`, err)
srv.publishReadyToStart({ success: false, err: err.message ?? 'onYourMarks failed' })
})
},
bigBang(msg, chan) {
this.gpsSrv?.onBigBang(msg.payload ?? {})
},
},
}
-28
View File
@@ -1,28 +0,0 @@
export function dispatchMessage(redisCnx, msg, chan) {
const gps = redisCnx.config.gps
if(!gps?.gpsActionsChannel) return
const actionsChan = redisCnx.fullChan(gps.gpsActionsChannel)
if(chan != actionsChan) return
const action = msg.action
if(!action || typeof(action) !== 'string') {
console.warn(`[${redisCnx.redisId}] Ignoring message without action on ${chan}`)
return
}
const handler = redisCnx['action_'+action]
if(typeof(handler) != 'function') {
if(redisCnx.debug) console.warn(`[${redisCnx.redisId}] Unknown action ${action} on ${chan}`)
return
}
const payload = ('payload' in msg) ? msg.payload : null
const reqid = ('reqid' in msg) ? msg.reqid.substr(0, 50) : null
const sender = msg.sender || null
const roles = Array.isArray(msg.roles) ? msg.roles : ['*']
if(redisCnx.debug) console.log(`[${redisCnx.redisId}] Dispatching action ${action} from ${sender}`)
handler.call(redisCnx, action, payload, reqid, sender, roles)
}
+15 -9
View File
@@ -1,10 +1,16 @@
import { methods as utilities, construct as utilitiesConstruct } from './utilities.js' import { assembleHandlers, createDispatchMessage } from '../../../bus/assembleMesh.js'
import { dispatchMessage } from './dispatch.js' import * as utilities from './utilities.js'
export const afterLoginMethods = [ const { actionHandlers, eventHandlers, afterLogin } = assembleHandlers([utilities])
utilitiesConstruct,
] export { actionHandlers, afterLogin }
export const meshActions = {
...utilities, export const dispatchMessage = createDispatchMessage({
} eventHandlers,
export { dispatchMessage } actionRules(redisCnx) {
const gps = redisCnx.config.gps ?? {}
return({
channels: [gps.gpsActionsChannel].filter(Boolean),
})
},
})
+6 -82
View File
@@ -1,109 +1,33 @@
import { publishActionReply } from '../../actionsHelper.js' import { replyToAction } from '../../../bus/publishActionReply.js'
export const construct = (redisCnx) => { export const actions = {
// console.log('Hello after login from utilities...')
// redisCnx.v42=0
// setInterval(redisCnx.move4243.bind(redisCnx), 200)
}
export const methods = {
/* Event-Rx:
{
"action": "TIME"
"reqid": "6az5e4r6a"
}
Event-Tx:
{
"action": "TIME",
"success": true,
"payload" : {
gpsTime: "2022-09-01T14:42:22.603Z",
redisTime: "2022-09-01T14:42:22.603Z"
},
"reqid": "6az5e4r6a"
}
*/
async action_TIME(action, payload, reqid, sender, roles) { async action_TIME(action, payload, reqid, sender, roles) {
publishActionReply(this, { replyToAction(this, {
action, action,
reqid, reqid,
sender, sender,
replyChannel: this.config.gps.gpsActionsReply,
reply: {
success: true, success: true,
payload: { payload: {
gpsTime: new Date().toISOString(), gpsTime: new Date().toISOString(),
redisTime: await this.redisClient.time(), redisTime: await this.redisClient.time(),
}, },
},
}) })
}, },
/* Event-Rx:
{
"action": "RELOADCONFIG"
"reqid": "6az5e4r6a"
}
Event-Tx:
{
"action": "RELOADCONFIG",
"success": true,
"reqid": "6az5e4r6a"
}
*/
async action_RELOADCONFIG(action, payload, reqid, sender, roles) { async action_RELOADCONFIG(action, payload, reqid, sender, roles) {
const replyOpts = {
action,
reqid,
sender,
replyChannel: this.config.gps.gpsActionsReply,
}
if(!this.accessRights.canDo(roles, action)) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Unauthorized action !',
} })
return
}
this.reloadAccessRights() this.reloadAccessRights()
publishActionReply(this, { ...replyOpts, reply: { replyToAction(this, { action, reqid, sender, success: true })
success: true,
} })
}, },
/* Event-Rx:
{
"action": "GETCONFIG"
"reqid": "6az5e4r6a"
}
Event-Tx:
{
"action": "GETCONFIG",
"success": true,
"reqid": "6az5e4r6a",
payload: { ...the access rights, and roles... }
}
*/
async action_GETCONFIG(action, payload, reqid, sender, roles) { async action_GETCONFIG(action, payload, reqid, sender, roles) {
const replyOpts = { replyToAction(this, {
action, action,
reqid, reqid,
sender, sender,
replyChannel: this.config.gps.gpsActionsReply,
}
if(!this.accessRights.canDo(roles, action)) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Unauthorized action !',
} })
return
}
publishActionReply(this, { ...replyOpts, reply: {
success: true, success: true,
payload: this.getAccessRights(), payload: this.getAccessRights(),
} }) })
}, },
} }
-22
View File
@@ -1,22 +0,0 @@
export function publishActionReply(redisCnx, options) {
const {
action,
reqid,
sender,
reply,
replyChannel,
senderId = 'gps',
} = options
reply.action = action
reply.sender = senderId
if(reqid) reply.reqid = reqid
const chan = replyChannel.replace(/\[UID\]/g, sender)
redisCnx.redisPublish(chan, reply)
}
export function parseSimTime(payload, fallbackFn) {
if(payload?.t != null && typeof(payload.t) === 'number' && !Number.isNaN(payload.t)) return(payload.t)
if(payload?.at != null && typeof(payload.at) === 'number' && !Number.isNaN(payload.at)) return(payload.at)
return(fallbackFn())
}
+4
View File
@@ -3,6 +3,7 @@ import yargs from 'yargs/yargs'
import { hideBin } from 'yargs/helpers' import { hideBin } from 'yargs/helpers'
import 'node:process' import 'node:process'
import {RedisConnexion} from '../redisConnexion.js' import {RedisConnexion} from '../redisConnexion.js'
import { busReplyRoute } from '../bus/publishActionReply.js'
import {configHelper} from '../configHelper.js' import {configHelper} from '../configHelper.js'
import {gpsServer} from './gpsServer.js' import {gpsServer} from './gpsServer.js'
import * as systemMesh from './actions/system/index.js' import * as systemMesh from './actions/system/index.js'
@@ -53,6 +54,7 @@ let cfgh = new configHelper({
function meshRedisConns(mesh, meshName, debug, rootConfig) { function meshRedisConns(mesh, meshName, debug, rootConfig) {
const { redis, ...meshConfig } = mesh const { redis, ...meshConfig } = mesh
const busRoute = busReplyRoute(rootConfig.gps, meshName)
return redis.map(cfg => return redis.map(cfg =>
new RedisConnexion({ new RedisConnexion({
debug, debug,
@@ -60,6 +62,8 @@ function meshRedisConns(mesh, meshName, debug, rootConfig) {
redisId: cfg.redisId, redisId: cfg.redisId,
meshName, meshName,
meshModule: meshModules[meshName], meshModule: meshModules[meshName],
senderId: busRoute?.senderId,
actionsReply: busRoute?.actionsReply,
}) })
) )
} }
+17
View File
@@ -0,0 +1,17 @@
import { assembleHandlers, createDispatchMessage } from '../../../bus/assembleMesh.js'
import * as prepare from './prepare.js'
const { actionHandlers, eventHandlers, afterLogin } = assembleHandlers([prepare])
export { actionHandlers, afterLogin }
export const dispatchMessage = createDispatchMessage({
eventHandlers,
actionRules(redisCnx) {
const maestro = redisCnx.config.maestro ?? {}
const arenaChannel = maestro.bus?.arena?.actionsChannel
return({
channels: arenaChannel ? [arenaChannel] : [],
})
},
})
+9
View File
@@ -0,0 +1,9 @@
export const eventHandlers = {
'arena:gods:ready': {
readyToStart(msg, chan) {
if(!this.maestroSrv) return
this.maestroSrv.handlePrepareAck(msg, chan)
},
},
}
+17
View File
@@ -0,0 +1,17 @@
import { assembleHandlers, createDispatchMessage } from '../../../bus/assembleMesh.js'
import * as simulation from './simulation.js'
import * as utilities from './utilities.js'
const { actionHandlers, eventHandlers, afterLogin } = assembleHandlers([simulation, utilities])
export { actionHandlers, afterLogin }
export const dispatchMessage = createDispatchMessage({
eventHandlers,
actionRules(redisCnx) {
const maestro = redisCnx.config.maestro ?? {}
return({
channels: [maestro.maestroActionsChannel].filter(Boolean),
})
},
})
+63
View File
@@ -0,0 +1,63 @@
import { replyToAction } from '../../../bus/publishActionReply.js'
import { isValidUuid } from '../../simRepository.js'
export const actions = {
async action_STARTSIMULATION(action, payload, reqid, sender, roles) {
if(!isValidUuid(sender)) {
replyToAction(this, { action, reqid, sender, success: false, err: 'Missing or invalid sender (user UUID)' })
return
}
if(!payload?.simulationUuid) {
replyToAction(this, { action, reqid, sender, success: false, err: 'Missing simulationUuid' })
return
}
const result = await this.maestroSrv.startSimulation(sender, payload)
if(!result.ok) {
replyToAction(this, { action, reqid, sender, success: false, err: result.err })
return
}
replyToAction(this, {
action,
reqid,
sender,
success: true,
payload: {
simulationId: result.simulationId,
keyframeId: result.keyframeId,
infraId: result.infraId,
agentIds: result.agentIds,
},
})
},
async action_STOPSIMULATION(action, payload, reqid, sender, roles) {
if(!isValidUuid(sender)) {
replyToAction(this, { action, reqid, sender, success: false, err: 'Missing or invalid sender (user UUID)' })
return
}
if(!payload?.simulationUuid) {
replyToAction(this, { action, reqid, sender, success: false, err: 'Missing simulationUuid' })
return
}
const result = await this.maestroSrv.stopSimulation(sender, payload)
if(!result.ok) {
replyToAction(this, { action, reqid, sender, success: false, err: result.err })
return
}
replyToAction(this, {
action,
reqid,
sender,
success: true,
payload: { simulationId: result.simulationId },
})
},
}
+20
View File
@@ -0,0 +1,20 @@
import { replyToAction } from '../../../bus/publishActionReply.js'
export const actions = {
async action_RELOADCONFIG(action, payload, reqid, sender, roles) {
this.reloadAccessRights()
replyToAction(this, { action, reqid, sender, success: true })
},
async action_GETCONFIG(action, payload, reqid, sender, roles) {
replyToAction(this, {
action,
reqid,
sender,
success: true,
payload: this.getAccessRights(),
})
},
}
@@ -1,3 +1,4 @@
const RESERVED_HASH_FIELDS = new Set(['position', 'vector', 'speed', 'segment'])
export class ArenaGroom { export class ArenaGroom {
@@ -25,6 +26,10 @@ export class ArenaGroom {
const key = this.agentHashKey(agent.id) const key = this.agentHashKey(agent.id)
await this.cnx.redisHset(key, 'position', agent.position) await this.cnx.redisHset(key, 'position', agent.position)
await this.cnx.redisHset(key, 'vector', agent.vector) await this.cnx.redisHset(key, 'vector', agent.vector)
for(const [field, value] of Object.entries(agent.store ?? {})) {
if(RESERVED_HASH_FIELDS.has(field)) continue
await this.cnx.redisHset(key, field, value)
}
await this.cnx.redisSadd(this.arenaStorage.agentsIndexKey, agent.id) await this.cnx.redisSadd(this.arenaStorage.agentsIndexKey, agent.id)
} }
if(this.debug) console.log(`[Maestro] Groomed ${agents.length} agent(s) into arena store`) if(this.debug) console.log(`[Maestro] Groomed ${agents.length} agent(s) into arena store`)
@@ -3,6 +3,8 @@ import { MySQLClient } from '@p42/p42modules'
import { SimRepository } from './simRepository.js' import { SimRepository } from './simRepository.js'
import { ArenaGroom } from './arenaGroom.js' import { ArenaGroom } from './arenaGroom.js'
import { MaestroState } from './orchestrationState.js' import { MaestroState } from './orchestrationState.js'
import { PrepareQuorum } from './prepareQuorum.js'
import { buildPrepareQuorum } from './primordialDaemons.js'
export class maestroServer { export class maestroServer {
@@ -18,12 +20,10 @@ export class maestroServer {
this.db = null this.db = null
this.simRepo = null this.simRepo = null
this.arenaGroom = null this.arenaGroom = null
this.prepareQuorum = null
this.orchestrationState = MaestroState.IDLE this.orchestrationState = MaestroState.IDLE
this.simulationId = null this.simulationId = null
this.agentIds = [] this.agentIds = []
this.readyGods = new Map()
this.readyQuorumResolve = null
this.readyQuorumTimer = null
} }
getMaestroSettings() { getMaestroSettings() {
@@ -32,9 +32,8 @@ export class maestroServer {
senderId: maestro.senderId ?? 'maestro', senderId: maestro.senderId ?? 'maestro',
lifecycle: { lifecycle: {
arenaChannel: maestro.lifecycle?.arenaChannel ?? 'arena:lifecycle', arenaChannel: maestro.lifecycle?.arenaChannel ?? 'arena:lifecycle',
godsReadyChannel: maestro.lifecycle?.godsReadyChannel ?? 'arena:gods:ready', prepareAckChannel: maestro.lifecycle?.godsReadyChannel ?? 'arena:gods:ready',
}, },
expectedGods: maestro.expectedGods ?? ['gps'],
readyTimeoutMs: maestro.readyTimeoutMs ?? 30000, readyTimeoutMs: maestro.readyTimeoutMs ?? 30000,
}) })
} }
@@ -77,6 +76,17 @@ export class maestroServer {
} }
} }
refreshPrepareQuorum() {
if(!this.arenaCnx) return
const { prepareAckChannel, readyTimeoutMs } = this.getMaestroSettings()
this.prepareQuorum = new PrepareQuorum({
ackChannel: prepareAckChannel,
timeoutMs: readyTimeoutMs,
matchesChan: this.arenaCnx.matchesChan.bind(this.arenaCnx),
debug: this.debug,
})
}
wireSystemConnexion(cnx) { wireSystemConnexion(cnx) {
cnx.maestroSrv = this cnx.maestroSrv = this
cnx.accessRights = this.accessRights cnx.accessRights = this.accessRights
@@ -93,6 +103,7 @@ export class maestroServer {
if(!this.arenaCnx || cnx.redisConfig.role === 'primary') { if(!this.arenaCnx || cnx.redisConfig.role === 'primary') {
this.arenaCnx = cnx this.arenaCnx = cnx
this.refreshArenaGroom() this.refreshArenaGroom()
this.refreshPrepareQuorum()
} }
} }
@@ -100,70 +111,17 @@ export class maestroServer {
this.orchestrationState = MaestroState.IDLE this.orchestrationState = MaestroState.IDLE
this.simulationId = null this.simulationId = null
this.agentIds = [] this.agentIds = []
this.readyGods.clear() this.prepareQuorum?.cancel()
this.clearReadyQuorumWait()
} }
clearReadyQuorumWait() { handlePrepareAck(msg, chan) {
if(this.readyQuorumTimer) { if(this.orchestrationState !== MaestroState.PREPARING) return(false)
clearTimeout(this.readyQuorumTimer) if(!this.prepareQuorum) return(false)
this.readyQuorumTimer = null return(this.prepareQuorum.handleMessage(msg, chan))
}
this.readyQuorumResolve = null
}
completeReadyQuorum(result) {
const resolve = this.readyQuorumResolve
this.clearReadyQuorumWait()
if(typeof(resolve) === 'function') resolve(result)
}
waitForReadyQuorum() {
const { readyTimeoutMs } = this.getMaestroSettings()
return(new Promise(resolve => {
this.readyQuorumResolve = resolve
this.readyQuorumTimer = setTimeout(() => {
this.completeReadyQuorum({
ok: false,
err: `Timeout waiting for readyToStart (${readyTimeoutMs}ms)`,
})
}, readyTimeoutMs)
}))
}
onReadyToStart(msg) {
if(this.orchestrationState !== MaestroState.PREPARING) return
const payload = msg.payload ?? {}
if(payload.simulationId !== this.simulationId) return
const sender = msg.sender
const { expectedGods } = this.getMaestroSettings()
if(!expectedGods.includes(sender)) {
if(this.debug) console.warn(`[Maestro] Ignoring readyToStart from unexpected sender: ${sender}`)
return
}
if(!payload.success) {
this.completeReadyQuorum({
ok: false,
err: payload.err ?? `Participant ${sender} failed prepare`,
})
return
}
this.readyGods.set(sender, payload)
if(this.debug) console.log(`[Maestro] readyToStart from ${sender} (${this.readyGods.size}/${expectedGods.length})`)
for(const god of expectedGods) {
if(!this.readyGods.has(god)) return
}
this.completeReadyQuorum({ ok: true })
} }
async publishLifecycle(eventType, payload) { async publishLifecycle(eventType, payload) {
if(!this.arenaCnx) throw new Error('No arena Redis connection') if(!this.arenaCnx) throw(new Error('No arena Redis connection'))
const { arenaChannel, senderId } = this.getMaestroSettings().lifecycle const { arenaChannel, senderId } = this.getMaestroSettings().lifecycle
await this.arenaCnx.redisPublish(arenaChannel, { await this.arenaCnx.redisPublish(arenaChannel, {
eventType, eventType,
@@ -176,15 +134,16 @@ export class maestroServer {
async startSimulation(userUuid, payload) { async startSimulation(userUuid, payload) {
if(!this.simRepo) return({ ok: false, err: 'Database not initialized' }) if(!this.simRepo) return({ ok: false, err: 'Database not initialized' })
if(!this.arenaGroom) return({ ok: false, err: 'No arena Redis connection' }) if(!this.arenaGroom) return({ ok: false, err: 'No arena Redis connection' })
if(!this.prepareQuorum) return({ ok: false, err: 'No prepare quorum (arena Redis not wired)' })
if(!this.isIdle()) return({ ok: false, err: 'A simulation is already in progress' }) if(!this.isIdle()) return({ ok: false, err: 'A simulation is already in progress' })
const simulationUuid = payload?.simulationUuid const simulationUuid = payload?.simulationUuid
const keyframeId = payload?.keyframeId
const infraId = payload?.infraId ?? null const infraId = payload?.infraId ?? null
const access = await this.simRepo.validateSimulationAccess(userUuid, simulationUuid, keyframeId) const access = await this.simRepo.validateSimulationAccess(userUuid, simulationUuid)
if(!access.ok) return(access) if(!access.ok) return(access)
const keyframeId = access.sim.sim_root_kf_uuid
const agentsResult = await this.simRepo.loadKeyframeAgents(keyframeId) const agentsResult = await this.simRepo.loadKeyframeAgents(keyframeId)
if(!agentsResult.ok) return(agentsResult) if(!agentsResult.ok) return(agentsResult)
@@ -193,7 +152,6 @@ export class maestroServer {
this.simulationId = simulationUuid this.simulationId = simulationUuid
this.agentIds = agentsResult.agents.map(a => a.id) this.agentIds = agentsResult.agents.map(a => a.id)
this.readyGods.clear()
this.orchestrationState = MaestroState.PREPARING this.orchestrationState = MaestroState.PREPARING
const lifecyclePayload = { const lifecyclePayload = {
@@ -203,7 +161,8 @@ export class maestroServer {
infraId, infraId,
} }
const readyWait = this.waitForReadyQuorum() const expectedParticipants = buildPrepareQuorum(this.agentIds, this.maestroConfig)
const readyWait = this.prepareQuorum.begin(expectedParticipants, this.simulationId)
await this.publishLifecycle('onYourMarks', lifecyclePayload) await this.publishLifecycle('onYourMarks', lifecyclePayload)
@@ -3,6 +3,7 @@ import yargs from 'yargs/yargs'
import { hideBin } from 'yargs/helpers' import { hideBin } from 'yargs/helpers'
import 'node:process' import 'node:process'
import { RedisConnexion } from '../redisConnexion.js' import { RedisConnexion } from '../redisConnexion.js'
import { busReplyRoute } from '../bus/publishActionReply.js'
import { configHelper } from '../configHelper.js' import { configHelper } from '../configHelper.js'
import { maestroServer } from './maestroServer.js' import { maestroServer } from './maestroServer.js'
import * as systemMesh from './actions/system/index.js' import * as systemMesh from './actions/system/index.js'
@@ -25,7 +26,7 @@ console.log = (...args) => logWithTimestamp(originalLog, 'LOG', ...args)
console.warn = (...args) => logWithTimestamp(originalWarn, 'WARN', ...args) console.warn = (...args) => logWithTimestamp(originalWarn, 'WARN', ...args)
console.error = (...args) => logWithTimestamp(originalError, 'ERROR', ...args) console.error = (...args) => logWithTimestamp(originalError, 'ERROR', ...args)
const argv = yargs(hideBin(process.argv)).command('SimMaestro', 'Simulation orchestrator for P42', {}) const argv = yargs(hideBin(process.argv)).command('Maestro', 'Simulation orchestrator for P42', {})
.options({ .options({
'debug': { 'debug': {
description: 'shows debug info', description: 'shows debug info',
@@ -49,6 +50,7 @@ let cfgh = new configHelper({
function meshRedisConns(mesh, meshName, debug, rootConfig) { function meshRedisConns(mesh, meshName, debug, rootConfig) {
const { redis, ...meshConfig } = mesh const { redis, ...meshConfig } = mesh
const busRoute = busReplyRoute(rootConfig.maestro, meshName)
return redis.map(cfg => return redis.map(cfg =>
new RedisConnexion({ new RedisConnexion({
debug, debug,
@@ -56,6 +58,8 @@ function meshRedisConns(mesh, meshName, debug, rootConfig) {
redisId: cfg.redisId, redisId: cfg.redisId,
meshName, meshName,
meshModule: meshModules[meshName], meshModule: meshModules[meshName],
senderId: busRoute?.senderId,
actionsReply: busRoute?.actionsReply,
}) })
) )
} }
@@ -1,5 +1,5 @@
{ {
"name": "p42SimMaestro", "name": "p42Maestro",
"version": "1.0.0", "version": "1.0.0",
"description": "Simulation orchestrator God-daemon for P42", "description": "Simulation orchestrator God-daemon for P42",
"type": "module", "type": "module",
+103
View File
@@ -0,0 +1,103 @@
export class PrepareQuorum {
constructor({ ackChannel, timeoutMs, matchesChan, debug = false }) {
this.ackChannel = ackChannel
this.timeoutMs = timeoutMs
this.matchesChan = matchesChan
this.debug = debug
this.expected = new Set()
this.ready = new Map()
this.simulationId = null
this.active = false
this.resolve = null
this.timer = null
}
begin(expectedParticipantIds, simulationId) {
this.cancel()
this.expected = new Set(expectedParticipantIds)
this.ready.clear()
this.simulationId = simulationId
this.active = true
if(this.debug) {
console.log(
`[Maestro] Prepare quorum armed: ${this.expected.size} participant(s) ` +
`(agents + primordial daemons), timeout ${this.timeoutMs}ms`
)
}
return(new Promise(resolve => {
this.resolve = resolve
this.timer = setTimeout(() => {
const missing = [...this.expected].filter(id => !this.ready.has(id))
this.#finish({
ok: false,
err: `Timeout waiting for readyToStart (${this.timeoutMs}ms); ` +
`missing: ${missing.join(', ') || 'unknown'}`,
})
}, this.timeoutMs)
}))
}
handleMessage(msg, chan) {
if(!this.active) return(false)
if(typeof(this.matchesChan) !== 'function') return(false)
if(!this.matchesChan(chan, this.ackChannel)) return(false)
if(msg?.eventType !== 'readyToStart') return(false)
const payload = msg.payload ?? {}
if(payload.simulationId !== this.simulationId) return(false)
const sender = msg.sender
if(!sender || !this.expected.has(sender)) {
if(this.debug) {
console.warn(`[Maestro] Ignoring readyToStart from unexpected participant: ${sender}`)
}
return(true)
}
if(!payload.success) {
this.#finish({
ok: false,
err: payload.err ?? `Participant ${sender} failed prepare`,
})
return(true)
}
this.ready.set(sender, payload)
if(this.debug) {
console.log(
`[Maestro] readyToStart from ${sender} ` +
`(${this.ready.size}/${this.expected.size})`
)
}
for(const participantId of this.expected) {
if(!this.ready.has(participantId)) return(true)
}
this.#finish({ ok: true })
return(true)
}
cancel() {
if(this.timer) {
clearTimeout(this.timer)
this.timer = null
}
this.active = false
this.resolve = null
this.expected.clear()
this.ready.clear()
this.simulationId = null
}
#finish(result) {
const resolve = this.resolve
this.cancel()
if(typeof(resolve) === 'function') resolve(result)
}
}
+23
View File
@@ -0,0 +1,23 @@
const SKIP_PRIMORDIAL_SECTIONS = new Set([
'maestro',
'mysql',
'accessRights',
'systemMesh',
'arenaMesh',
])
export function getPrimordialDaemonIds(config = {}) {
const ids = []
for(const [section, block] of Object.entries(config)) {
if(SKIP_PRIMORDIAL_SECTIONS.has(section)) continue
if(!block || typeof(block) !== 'object' || Array.isArray(block)) continue
if(!block.primordialDaemon) continue
ids.push(typeof(block.senderId) === 'string' ? block.senderId : section)
}
return(ids)
}
export function buildPrepareQuorum(agentIds, config) {
const primordialIds = getPrimordialDaemonIds(config)
return([...agentIds, ...primordialIds])
}
@@ -19,20 +19,27 @@ export class SimRepository {
return(`\`${db}\`.${table}`) return(`\`${db}\`.${table}`)
} }
#parseGpsValues(raw) { #parseJsonObject(raw) {
let v = raw let v = raw
if(v == null) return(null) if(v == null) return(null)
if(typeof(v) === 'string') { if(typeof(v) === 'string') {
try { v = JSON.parse(v) } try { v = JSON.parse(v) }
catch { return(null) } catch { return(null) }
} }
if(typeof(v) !== 'object') return(null) if(typeof(v) !== 'object' || Array.isArray(v)) return(null)
return(v)
}
#parseGpsValues(raw) {
const v = this.#parseJsonObject(raw)
if(!v) return(null)
const position = v.position const position = v.position
const speed = v.speed ?? v.vector const speed = v.speed ?? v.vector
if(!position || !speed) return(null) if(!position || !speed) return(null)
const axes = ['x', 'y', 'z'] const axes = ['x', 'y', 'z']
for(const axis of axes) { for(const axis of axes) {
if(typeof(position[axis]) !== 'number' || typeof(speed[axis]) !== 'number') return(null) if(typeof(position[axis]) !== 'number' || !Number.isFinite(position[axis])) return(null)
if(typeof(speed[axis]) !== 'number' || !Number.isFinite(speed[axis])) return(null)
} }
return({ return({
position: { x: position.x, y: position.y, z: position.z }, position: { x: position.x, y: position.y, z: position.z },
@@ -40,10 +47,15 @@ export class SimRepository {
}) })
} }
async validateSimulationAccess(userUuid, simulationUuid, keyframeId) { #parseStoreValues(raw) {
const v = this.#parseJsonObject(raw)
if(v == null) return(null)
return({ ...v })
}
async validateSimulationAccess(userUuid, simulationUuid) {
if(!isValidUuid(userUuid)) return({ ok: false, err: 'Invalid user UUID' }) if(!isValidUuid(userUuid)) return({ ok: false, err: 'Invalid user UUID' })
if(!isValidUuid(simulationUuid)) return({ ok: false, err: 'Invalid simulation UUID' }) if(!isValidUuid(simulationUuid)) return({ ok: false, err: 'Invalid simulation UUID' })
if(!isValidUuid(keyframeId)) return({ ok: false, err: 'Invalid keyframe ID' })
const rows = await MySQLClient.poolExecute(this.db, ` const rows = await MySQLClient.poolExecute(this.db, `
SELECT s.sim_id, SELECT s.sim_id,
@@ -59,16 +71,14 @@ export class SimRepository {
if(!rows.length) return({ ok: false, err: 'Simulation not found or access denied' }) if(!rows.length) return({ ok: false, err: 'Simulation not found or access denied' })
const sim = rows[0] const sim = rows[0]
if(sim.sim_root_kf_uuid !== keyframeId) { if(!sim.sim_root_kf_uuid) return({ ok: false, err: 'Simulation has no root keyframe' })
return({ ok: false, err: 'Keyframe does not match simulation root keyframe' })
}
const kfRows = await MySQLClient.poolExecute(this.db, ` const kfRows = await MySQLClient.poolExecute(this.db, `
SELECT ekf_uuid SELECT ekf_uuid
FROM ${this.#qualify(this.simDb, 'edited_keyframes')} FROM ${this.#qualify(this.simDb, 'edited_keyframes')}
WHERE ekf_uuid = UUID_TO_BIN(?) WHERE ekf_uuid = UUID_TO_BIN(?)
`, [keyframeId]) `, [sim.sim_root_kf_uuid])
if(!kfRows.length) return({ ok: false, err: 'Keyframe not found' }) if(!kfRows.length) return({ ok: false, err: 'Root keyframe not found' })
return({ ok: true, sim }) return({ ok: true, sim })
} }
@@ -94,7 +104,9 @@ export class SimRepository {
async loadKeyframeAgents(keyframeId) { async loadKeyframeAgents(keyframeId) {
const rows = await MySQLClient.poolExecute(this.db, ` const rows = await MySQLClient.poolExecute(this.db, `
SELECT BIN_TO_UUID(ekfs_agent_id) AS agent_id, ekfs_gps_values SELECT BIN_TO_UUID(ekfs_agent_id) AS agent_id,
ekfs_gps_values,
ekfs_store_values
FROM ${this.#qualify(this.simDb, 'edited_kf_store')} FROM ${this.#qualify(this.simDb, 'edited_kf_store')}
WHERE ekfs_ekf_uuid = UUID_TO_BIN(?) WHERE ekfs_ekf_uuid = UUID_TO_BIN(?)
`, [keyframeId]) `, [keyframeId])
@@ -108,10 +120,16 @@ export class SimRepository {
errors.push(`Invalid GPS values for agent ${row.agent_id}`) errors.push(`Invalid GPS values for agent ${row.agent_id}`)
continue continue
} }
const store = this.#parseStoreValues(row.ekfs_store_values)
if(store == null) {
errors.push(`Invalid store values for agent ${row.agent_id}`)
continue
}
agents.push({ agents.push({
id: row.agent_id, id: row.agent_id,
position: parsed.position, position: parsed.position,
vector: parsed.vector, vector: parsed.vector,
store,
}) })
} }
@@ -4,7 +4,7 @@ set -a
. /etc/p42/secrets.env . /etc/p42/secrets.env
set +a set +a
daemon=p42SimMaestro daemon=p42Maestro
logfile=maestro.log logfile=maestro.log
pid=$(pgrep -f "$daemon") pid=$(pgrep -f "$daemon")
@@ -1,6 +1,6 @@
#!/bin/sh #!/bin/sh
pid=`ps -ef | grep p42SimMaestro.js |grep -v grep | awk '{print $2}'` pid=`ps -ef | grep p42Maestro.js |grep -v grep | awk '{print $2}'`
if [ -n "$pid" ] if [ -n "$pid" ]
then then
echo "killing pid: $pid" echo "killing pid: $pid"
-32
View File
@@ -1,32 +0,0 @@
export const construct = (redisCnx) => {
}
export const methods = {
handleLifecycleEvent(msg) {
const srv = this.observerSrv
if(!srv) return
if(msg.eventType === 'onYourMarks') {
srv.onYourMarks()
return
}
if(msg.eventType === 'bigBang') {
srv.onBigBang()
}
},
dispatchArenaMessage(msg, chan) {
const observer = this.config.observer
if(!observer || !this.observerSrv) return(false)
if(this.matchesChan(chan, observer.lifecycle?.arenaChannel ?? 'arena:lifecycle')) {
this.handleLifecycleEvent(msg)
return(true)
}
return(false)
},
}
-5
View File
@@ -1,5 +0,0 @@
export function dispatchMessage(redisCnx, msg, chan) {
if(typeof(redisCnx.dispatchArenaMessage) !== 'function') return
redisCnx.dispatchArenaMessage(msg, chan)
}
+14 -9
View File
@@ -1,12 +1,17 @@
import { methods as arenaMethods, construct as arenaConstruct } from './arenaHandlers.js' import { assembleHandlers, createDispatchMessage } from '../../../bus/assembleMesh.js'
import { dispatchMessage } from './dispatch.js' import * as lifecycle from './lifecycle.js'
export const afterLoginMethods = [ const { actionHandlers, eventHandlers, afterLogin } = assembleHandlers([lifecycle])
arenaConstruct,
]
export const meshActions = { export { actionHandlers, afterLogin }
...arenaMethods,
}
export { dispatchMessage } export const dispatchMessage = createDispatchMessage({
eventHandlers,
actionRules(redisCnx) {
const observer = redisCnx.config.observer ?? {}
const arenaChannel = observer.bus?.arena?.actionsChannel
return({
channels: arenaChannel ? [arenaChannel] : [],
})
},
})
+11
View File
@@ -0,0 +1,11 @@
export const eventHandlers = {
'arena:lifecycle': {
onYourMarks(msg, chan) {
this.observerSrv?.onYourMarks()
},
bigBang(msg, chan) {
this.observerSrv?.onBigBang()
},
},
}
-28
View File
@@ -1,28 +0,0 @@
export function dispatchMessage(redisCnx, msg, chan) {
const observer = redisCnx.config.observer
if(!observer?.observerActionsChannel) return
const actionsChan = redisCnx.fullChan(observer.observerActionsChannel)
if(chan != actionsChan) return
const action = msg.action
if(!action || typeof(action) !== 'string') {
console.warn(`[${redisCnx.redisId}] Ignoring message without action on ${chan}`)
return
}
const handler = redisCnx['action_'+action]
if(typeof(handler) != 'function') {
if(redisCnx.debug) console.warn(`[${redisCnx.redisId}] Unknown action ${action} on ${chan}`)
return
}
const payload = ('payload' in msg) ? msg.payload : null
const reqid = ('reqid' in msg) ? msg.reqid.substr(0, 50) : null
const sender = msg.sender || null
const roles = Array.isArray(msg.roles) ? msg.roles : ['*']
if(redisCnx.debug) console.log(`[${redisCnx.redisId}] Dispatching action ${action} from ${sender}`)
handler.call(redisCnx, action, payload, reqid, sender, roles)
}
+14 -11
View File
@@ -1,14 +1,17 @@
import { methods as utilities, construct as utilitiesConstruct } from './utilities.js' import { assembleHandlers, createDispatchMessage } from '../../../bus/assembleMesh.js'
import { methods as positions } from './positions.js' import * as positions from './positions.js'
import { dispatchMessage } from './dispatch.js' import * as utilities from './utilities.js'
export const afterLoginMethods = [ const { actionHandlers, eventHandlers, afterLogin } = assembleHandlers([positions, utilities])
utilitiesConstruct,
]
export const meshActions = { export { actionHandlers, afterLogin }
...utilities,
...positions,
}
export { dispatchMessage } export const dispatchMessage = createDispatchMessage({
eventHandlers,
actionRules(redisCnx) {
const observer = redisCnx.config.observer ?? {}
return({
channels: [observer.observerActionsChannel].filter(Boolean),
})
},
})
+27 -191
View File
@@ -1,257 +1,93 @@
import { publishActionReply, parseSimTime } from '../../actionsHelper.js' import { replyToAction } from '../../../bus/publishActionReply.js'
import { parseSimTime } from '../../actionsHelper.js'
import { Frustum } from '../../frustum.js' import { Frustum } from '../../frustum.js'
export const methods = { export const actions = {
/* Event-Rx:
{
"action": "GETAGENTPOSITION",
"reqid": "6az5e4r6a",
"payload": {
"agentId": "agent42",
"t": 12.5
}
}
Event-Tx:
{
"action": "GETAGENTPOSITION",
"success": true,
"reqid": "6az5e4r6a",
"payload": {
"agent": {
"id": "agent42",
"position": { "x": 1, "y": 2, "z": 3 },
"vector": { "x": 0, "y": 0, "z": 0 },
"since": 0,
"generation": 2,
"t": 12.5
}
}
}
*/
async action_GETAGENTPOSITION(action, payload, reqid, sender, roles) { async action_GETAGENTPOSITION(action, payload, reqid, sender, roles) {
const replyOpts = {
action,
reqid,
sender,
replyChannel: this.config.observer.observerActionsReply,
}
if(!this.accessRights.canDo(roles, action)) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Unauthorized action !',
} })
return
}
const reader = this.observerSrv.gpsStorageReader const reader = this.observerSrv.gpsStorageReader
if(!reader) { if(!reader) {
publishActionReply(this, { ...replyOpts, reply: { replyToAction(this, { action, reqid, sender, success: false, err: 'GPS storage reader not ready' })
success: false,
err: 'GPS storage reader not ready',
} })
return return
} }
if(!this.observerSrv.isLive()) { if(!this.observerSrv.isLive()) {
publishActionReply(this, { ...replyOpts, reply: { replyToAction(this, { action, reqid, sender, success: false, err: 'Simulation not live' })
success: false,
err: 'Simulation not live',
} })
return return
} }
const agentId = payload?.agentId const agentId = payload?.agentId
if(!agentId || typeof(agentId) !== 'string') { if(!agentId || typeof(agentId) !== 'string') {
publishActionReply(this, { ...replyOpts, reply: { replyToAction(this, { action, reqid, sender, success: false, err: 'Missing or invalid agentId' })
success: false,
err: 'Missing or invalid agentId',
} })
return return
} }
const at = parseSimTime(payload, () => this.observerSrv.now()) const at = parseSimTime(payload, () => this.observerSrv.now())
if(at === null) { if(at === null) {
publishActionReply(this, { ...replyOpts, reply: { replyToAction(this, { action, reqid, sender, success: false, err: 'Invalid simulation time' })
success: false,
err: 'Invalid simulation time',
} })
return return
} }
const agent = await reader.getAgentPosition(agentId, at) const agent = await reader.getAgentPosition(agentId, at)
if(!agent) { if(!agent) {
publishActionReply(this, { ...replyOpts, reply: { replyToAction(this, { action, reqid, sender, success: false, err: `Unknown agent: ${agentId}` })
success: false,
err: `Unknown agent: ${agentId}`,
} })
return return
} }
publishActionReply(this, { ...replyOpts, reply: { replyToAction(this, { action, reqid, sender, success: true, payload: { agent } })
success: true,
payload: { agent },
} })
}, },
/* Event-Rx:
{
"action": "GETAGENTSINFRUSTUM",
"reqid": "6az5e4r6a",
"payload": {
"planes": [
{ "nx": 1, "ny": 0, "nz": 0, "d": -10 },
{ "nx": -1, "ny": 0, "nz": 0, "d": 10 },
{ "nx": 0, "ny": 1, "nz": 0, "d": -10 },
{ "nx": 0, "ny": -1, "nz": 0, "d": 10 },
{ "nx": 0, "ny": 0, "nz": 1, "d": 0 },
{ "nx": 0, "ny": 0, "nz": -1, "d": 5 }
],
"t": 0
}
}
*/
async action_GETAGENTSINFRUSTUM(action, payload, reqid, sender, roles) { async action_GETAGENTSINFRUSTUM(action, payload, reqid, sender, roles) {
const replyOpts = {
action,
reqid,
sender,
replyChannel: this.config.observer.observerActionsReply,
}
if(!this.accessRights.canDo(roles, action)) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Unauthorized action !',
} })
return
}
const registry = this.observerSrv.requestorRegistry const registry = this.observerSrv.requestorRegistry
if(!registry) { if(!registry) {
publishActionReply(this, { ...replyOpts, reply: { replyToAction(this, { action, reqid, sender, success: false, err: 'Requestor registry not ready' })
success: false,
err: 'Requestor registry not ready',
} })
return return
} }
if(!this.observerSrv.isLive()) { if(!this.observerSrv.isLive()) {
publishActionReply(this, { ...replyOpts, reply: { replyToAction(this, { action, reqid, sender, success: false, err: 'Simulation not live' })
success: false,
err: 'Simulation not live',
} })
return return
} }
const frustum = Frustum.fromPlanes(payload?.planes) const frustum = Frustum.fromPlanes(payload?.planes)
if(!frustum) { if(!frustum) {
publishActionReply(this, { ...replyOpts, reply: { replyToAction(this, { action, reqid, sender, success: false, err: 'Missing or invalid frustum planes (expected 6)' })
success: false,
err: 'Missing or invalid frustum planes (expected 6)',
} })
return return
} }
const at = parseSimTime(payload, () => this.observerSrv.now()) const at = parseSimTime(payload, () => this.observerSrv.now())
if(at === null) { if(at === null) {
publishActionReply(this, { ...replyOpts, reply: { replyToAction(this, { action, reqid, sender, success: false, err: 'Invalid simulation time' })
success: false,
err: 'Invalid simulation time',
} })
return return
} }
const result = await registry.evaluateOnce({ frustum, t: at }) const result = await registry.evaluateOnce({ frustum, t: at })
if(!result.ok) { if(!result.ok) {
publishActionReply(this, { ...replyOpts, reply: { replyToAction(this, { action, reqid, sender, success: false, err: result.err })
success: false,
err: result.err,
} })
return return
} }
publishActionReply(this, { ...replyOpts, reply: { replyToAction(this, {
action,
reqid,
sender,
success: true, success: true,
payload: { payload: {
agents: result.agents, agents: result.agents,
t: result.t, t: result.t,
}, },
} }) })
}, },
/* Event-Rx:
{
"action": "SUBSCRIBEFRUSTUM",
"reqid": "6az5e4r6a",
"sender": "client-uuid",
"payload": {
"planes": [
{ "nx": 1, "ny": 0, "nz": 0, "d": -10 },
{ "nx": -1, "ny": 0, "nz": 0, "d": 10 },
{ "nx": 0, "ny": 1, "nz": 0, "d": -10 },
{ "nx": 0, "ny": -1, "nz": 0, "d": 10 },
{ "nx": 0, "ny": 0, "nz": 1, "d": 0 },
{ "nx": 0, "ny": 0, "nz": -1, "d": 5 }
],
"frequency": 800
}
}
Event-Tx:
{
"action": "SUBSCRIBEFRUSTUM",
"success": true,
"reqid": "6az5e4r6a",
"payload": {
"frequency": 900,
"agents": [ ... ],
"t": 12.5
}
}
Periodic push (no reqid):
{
"action": "GETAGENTSINFRUSTUM",
"success": true,
"sender": "observer",
"payload": { "agents": [ ... ], "t": 12.5 }
}
*/
async action_SUBSCRIBEFRUSTUM(action, payload, reqid, sender, roles) { async action_SUBSCRIBEFRUSTUM(action, payload, reqid, sender, roles) {
const replyOpts = {
action,
reqid,
sender,
replyChannel: this.config.observer.observerActionsReply,
}
if(!this.accessRights.canDo(roles, action)) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Unauthorized action !',
} })
return
}
if(!sender || typeof(sender) !== 'string') {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Missing or invalid sender',
} })
return
}
const registry = this.observerSrv.requestorRegistry const registry = this.observerSrv.requestorRegistry
if(!registry) { if(!registry) {
publishActionReply(this, { ...replyOpts, reply: { replyToAction(this, { action, reqid, sender, success: false, err: 'Requestor registry not ready' })
success: false,
err: 'Requestor registry not ready',
} })
return return
} }
if(!this.observerSrv.isLive()) { if(!this.observerSrv.isLive()) {
publishActionReply(this, { ...replyOpts, reply: { replyToAction(this, { action, reqid, sender, success: false, err: 'Simulation not live' })
success: false,
err: 'Simulation not live',
} })
return return
} }
@@ -260,21 +96,21 @@ export const methods = {
frequency: payload?.frequency, frequency: payload?.frequency,
}) })
if(!result.ok) { if(!result.ok) {
publishActionReply(this, { ...replyOpts, reply: { replyToAction(this, { action, reqid, sender, success: false, err: result.err })
success: false,
err: result.err,
} })
return return
} }
publishActionReply(this, { ...replyOpts, reply: { replyToAction(this, {
action,
reqid,
sender,
success: true, success: true,
payload: { payload: {
frequency: result.frequency, frequency: result.frequency,
agents: result.agents, agents: result.agents,
t: result.t, t: result.t,
}, },
} }) })
}, },
} }
+5 -33
View File
@@ -1,48 +1,20 @@
import { publishActionReply } from '../../actionsHelper.js' import { replyToAction } from '../../../bus/publishActionReply.js'
export const construct = (redisCnx) => { export const actions = {
}
export const methods = {
async action_RELOADCONFIG(action, payload, reqid, sender, roles) { async action_RELOADCONFIG(action, payload, reqid, sender, roles) {
const replyOpts = {
action,
reqid,
sender,
replyChannel: this.config.observer.observerActionsReply,
}
if(!this.accessRights.canDo(roles, action)) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Unauthorized action !',
} })
return
}
this.reloadAccessRights() this.reloadAccessRights()
publishActionReply(this, { ...replyOpts, reply: { replyToAction(this, { action, reqid, sender, success: true })
success: true,
} })
}, },
async action_GETCONFIG(action, payload, reqid, sender, roles) { async action_GETCONFIG(action, payload, reqid, sender, roles) {
const replyOpts = { replyToAction(this, {
action, action,
reqid, reqid,
sender, sender,
replyChannel: this.config.observer.observerActionsReply,
}
if(!this.accessRights.canDo(roles, action)) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Unauthorized action !',
} })
return
}
publishActionReply(this, { ...replyOpts, reply: {
success: true, success: true,
payload: this.getAccessRights(), payload: this.getAccessRights(),
} }) })
}, },
} }
-16
View File
@@ -1,20 +1,4 @@
export function publishActionReply(redisCnx, options) {
const {
action,
reqid,
sender,
reply,
replyChannel,
senderId = 'observer',
} = options
reply.action = action
reply.sender = senderId
if(reqid) reply.reqid = reqid
const chan = replyChannel.replace(/\[UID\]/g, sender)
redisCnx.redisPublish(chan, reply)
}
export function parseSimTime(payload, fallbackFn) { export function parseSimTime(payload, fallbackFn) {
if(payload?.t != null && typeof(payload.t) === 'number' && !Number.isNaN(payload.t)) return(payload.t) if(payload?.t != null && typeof(payload.t) === 'number' && !Number.isNaN(payload.t)) return(payload.t)
if(payload?.at != null && typeof(payload.at) === 'number' && !Number.isNaN(payload.at)) return(payload.at) if(payload?.at != null && typeof(payload.at) === 'number' && !Number.isNaN(payload.at)) return(payload.at)
+2 -5
View File
@@ -1,7 +1,7 @@
import { AccesRights } from '../accesRights.js' import { AccesRights } from '../accesRights.js'
import { GpsStorageReader } from './gpsStorageReader.js' import { GpsStorageReader } from './gpsStorageReader.js'
import { RequestorRegistry } from './requestorRegistry.js' import { RequestorRegistry } from './requestorRegistry.js'
import { publishActionReply } from './actionsHelper.js' import { replyToAction } from '../bus/publishActionReply.js'
import { SimState } from '../GPS/simulationState.js' import { SimState } from '../GPS/simulationState.js'
export class observerServer { export class observerServer {
@@ -61,14 +61,11 @@ export class observerServer {
publishFrustumUpdate(sender, payload) { publishFrustumUpdate(sender, payload) {
if(!this.systemCnx || !sender) return if(!this.systemCnx || !sender) return
const observer = this.observerConfig.observer ?? {} const observer = this.observerConfig.observer ?? {}
publishActionReply(this.systemCnx, { replyToAction(this.systemCnx, {
action: 'GETAGENTSINFRUSTUM', action: 'GETAGENTSINFRUSTUM',
sender, sender,
replyChannel: observer.observerActionsReply ?? 'system:replies:[UID]',
reply: {
success: true, success: true,
payload, payload,
},
}) })
} }
+4
View File
@@ -3,6 +3,7 @@ import yargs from 'yargs/yargs'
import { hideBin } from 'yargs/helpers' import { hideBin } from 'yargs/helpers'
import 'node:process' import 'node:process'
import { RedisConnexion } from '../redisConnexion.js' import { RedisConnexion } from '../redisConnexion.js'
import { busReplyRoute } from '../bus/publishActionReply.js'
import { configHelper } from '../configHelper.js' import { configHelper } from '../configHelper.js'
import { observerServer } from './observerServer.js' import { observerServer } from './observerServer.js'
import * as systemMesh from './actions/system/index.js' import * as systemMesh from './actions/system/index.js'
@@ -49,6 +50,7 @@ let cfgh = new configHelper({
function meshRedisConns(mesh, meshName, debug, rootConfig) { function meshRedisConns(mesh, meshName, debug, rootConfig) {
const { redis, ...meshConfig } = mesh const { redis, ...meshConfig } = mesh
const busRoute = busReplyRoute(rootConfig.observer, meshName)
return redis.map(cfg => return redis.map(cfg =>
new RedisConnexion({ new RedisConnexion({
debug, debug,
@@ -56,6 +58,8 @@ function meshRedisConns(mesh, meshName, debug, rootConfig) {
redisId: cfg.redisId, redisId: cfg.redisId,
meshName, meshName,
meshModule: meshModules[meshName], meshModule: meshModules[meshName],
senderId: busRoute?.senderId,
actionsReply: busRoute?.actionsReply,
}) })
) )
} }
-22
View File
@@ -1,22 +0,0 @@
export const construct = (redisCnx) => {
}
export const methods = {
dispatchArenaMessage(msg, chan) {
const maestro = this.config.maestro
if(!maestro || !this.maestroSrv) return(false)
if(this.matchesChan(chan, maestro.lifecycle?.godsReadyChannel ?? 'arena:gods:ready')) {
if(msg.eventType === 'readyToStart') {
this.maestroSrv.onReadyToStart(msg)
return(true)
}
}
if(this.debug) console.log(`[${this.redisId}] Arena message (unhandled):`, msg.eventType, chan)
return(false)
},
}
-5
View File
@@ -1,5 +0,0 @@
export function dispatchMessage(redisCnx, msg, chan) {
if(typeof(redisCnx.dispatchArenaMessage) !== 'function') return
redisCnx.dispatchArenaMessage(msg, chan)
}
-12
View File
@@ -1,12 +0,0 @@
import { methods as arenaMethods, construct as arenaConstruct } from './arenaHandlers.js'
import { dispatchMessage } from './dispatch.js'
export const afterLoginMethods = [
arenaConstruct,
]
export const meshActions = {
...arenaMethods,
}
export { dispatchMessage }
-28
View File
@@ -1,28 +0,0 @@
export function dispatchMessage(redisCnx, msg, chan) {
const maestro = redisCnx.config.maestro
if(!maestro?.maestroActionsChannel) return
const actionsChan = redisCnx.fullChan(maestro.maestroActionsChannel)
if(chan != actionsChan) return
const action = msg.action
if(!action || typeof(action) !== 'string') {
console.warn(`[${redisCnx.redisId}] Ignoring message without action on ${chan}`)
return
}
const handler = redisCnx['action_'+action]
if(typeof(handler) != 'function') {
if(redisCnx.debug) console.warn(`[${redisCnx.redisId}] Unknown action ${action} on ${chan}`)
return
}
const payload = ('payload' in msg) ? msg.payload : null
const reqid = ('reqid' in msg) ? msg.reqid.substr(0, 50) : null
const sender = msg.sender || null
const roles = Array.isArray(msg.roles) ? msg.roles : ['*']
if(redisCnx.debug) console.log(`[${redisCnx.redisId}] Dispatching action ${action} from ${sender}`)
handler.call(redisCnx, action, payload, reqid, sender, roles)
}
-14
View File
@@ -1,14 +0,0 @@
import { methods as utilities, construct as utilitiesConstruct } from './utilities.js'
import { methods as simulation } from './simulation.js'
import { dispatchMessage } from './dispatch.js'
export const afterLoginMethods = [
utilitiesConstruct,
]
export const meshActions = {
...utilities,
...simulation,
}
export { dispatchMessage }
-138
View File
@@ -1,138 +0,0 @@
import { publishActionReply } from '../../actionsHelper.js'
import { isValidUuid } from '../../simRepository.js'
export const methods = {
/* Event-Rx:
{
"action": "STARTSIMULATION",
"reqid": "6az5e4r6a",
"sender": "<user-uuid>",
"roles": ["*"],
"payload": {
"simulationUuid": "...",
"keyframeId": "...",
"infraId": "..."
}
}
*/
async action_STARTSIMULATION(action, payload, reqid, sender, roles) {
const replyOpts = {
action,
reqid,
sender,
replyChannel: this.config.maestro.maestroActionsReply,
}
if(!this.accessRights.canDo(roles, action)) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Unauthorized action !',
} })
return
}
if(!sender || !isValidUuid(sender)) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Missing or invalid sender (user UUID)',
} })
return
}
if(!payload?.simulationUuid || !payload?.keyframeId) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Missing simulationUuid or keyframeId',
} })
return
}
try {
const result = await this.maestroSrv.startSimulation(sender, payload)
if(!result.ok) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: result.err,
} })
return
}
publishActionReply(this, { ...replyOpts, reply: {
success: true,
payload: {
simulationId: result.simulationId,
keyframeId: result.keyframeId,
infraId: result.infraId,
agentIds: result.agentIds,
},
} })
} catch(err) {
console.error(`[${this.redisId}] STARTSIMULATION failed:`, err)
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: err.message ?? 'STARTSIMULATION failed',
} })
}
},
/* Event-Rx:
{
"action": "STOPSIMULATION",
"reqid": "6az5e4r6a",
"sender": "<user-uuid>",
"payload": { "simulationUuid": "..." }
}
*/
async action_STOPSIMULATION(action, payload, reqid, sender, roles) {
const replyOpts = {
action,
reqid,
sender,
replyChannel: this.config.maestro.maestroActionsReply,
}
if(!this.accessRights.canDo(roles, action)) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Unauthorized action !',
} })
return
}
if(!sender || !isValidUuid(sender)) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Missing or invalid sender (user UUID)',
} })
return
}
if(!payload?.simulationUuid) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Missing simulationUuid',
} })
return
}
try {
const result = await this.maestroSrv.stopSimulation(sender, payload)
if(!result.ok) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: result.err,
} })
return
}
publishActionReply(this, { ...replyOpts, reply: {
success: true,
payload: { simulationId: result.simulationId },
} })
} catch(err) {
console.error(`[${this.redisId}] STOPSIMULATION failed:`, err)
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: err.message ?? 'STOPSIMULATION failed',
} })
}
},
}
-48
View File
@@ -1,48 +0,0 @@
import { publishActionReply } from '../../actionsHelper.js'
export const construct = (redisCnx) => {
}
export const methods = {
async action_RELOADCONFIG(action, payload, reqid, sender, roles) {
const replyOpts = {
action,
reqid,
sender,
replyChannel: this.config.maestro.maestroActionsReply,
}
if(!this.accessRights.canDo(roles, action)) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Unauthorized action !',
} })
return
}
this.reloadAccessRights()
publishActionReply(this, { ...replyOpts, reply: {
success: true,
} })
},
async action_GETCONFIG(action, payload, reqid, sender, roles) {
const replyOpts = {
action,
reqid,
sender,
replyChannel: this.config.maestro.maestroActionsReply,
}
if(!this.accessRights.canDo(roles, action)) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Unauthorized action !',
} })
return
}
publishActionReply(this, { ...replyOpts, reply: {
success: true,
payload: this.getAccessRights(),
} })
},
}
-16
View File
@@ -1,16 +0,0 @@
export function publishActionReply(redisCnx, options) {
const {
action,
reqid,
sender,
reply,
replyChannel,
senderId = 'maestro',
} = options
reply.action = action
reply.sender = senderId
if(reqid) reply.reqid = reqid
const chan = replyChannel.replace(/\[UID\]/g, sender)
redisCnx.redisPublish(chan, reply)
}
+40
View File
@@ -0,0 +1,40 @@
import { dispatchActions } from './dispatchActions.js'
import { dispatchEvents } from './dispatchEvents.js'
export function assembleHandlers(modules) {
const actions = {}
const tree = {}
const afterLogin = []
for(const mod of modules) {
if(mod.actions) Object.assign(actions, mod.actions)
if(typeof(mod.construct) === 'function') afterLogin.push(mod.construct)
if(!mod.eventHandlers) continue
for(const [channelPattern, byType] of Object.entries(mod.eventHandlers)) {
if(!tree[channelPattern]) tree[channelPattern] = {}
for(const [eventType, handler] of Object.entries(byType)) {
if(!tree[channelPattern][eventType]) tree[channelPattern][eventType] = []
const list = Array.isArray(handler) ? handler : [handler]
tree[channelPattern][eventType].push(...list)
}
}
}
return({
actionHandlers: actions,
eventHandlers: tree,
afterLogin,
})
}
export function createDispatchMessage({ eventHandlers, actionRules }) {
return(async function dispatchMessage(redisCnx, msg, chan) {
if(msg.action && msg.eventType) {
console.warn(`[${redisCnx.redisId}] Message has both action and eventType on ${chan}`)
return(false)
}
if(msg.action) return(dispatchActions(redisCnx, msg, chan, actionRules(redisCnx)))
if(msg.eventType) return(dispatchEvents(redisCnx, msg, chan, eventHandlers))
return(false)
})
}
+78
View File
@@ -0,0 +1,78 @@
import { replyToAction } from './publishActionReply.js'
function matchesActionsChannel(redisCnx, chan, channels) {
if(!Array.isArray(channels) || !channels.length) return(false)
for(const configured of channels) {
if(!configured) continue
if(redisCnx.fullChan(configured) === chan) return(true)
}
return(false)
}
export async function dispatchActions(redisCnx, msg, chan, rules) {
if(!matchesActionsChannel(redisCnx, chan, rules.channels)) return(false)
const action = msg.action
const sender = msg.sender ?? null
const reqid = ('reqid' in msg) ? msg.reqid.substr(0, 50) : null
const roles = Array.isArray(msg.roles) ? msg.roles : ['*']
if(!action || typeof(action) !== 'string') {
if(!sender) return(true)
replyToAction(redisCnx, {
action,
reqid,
sender,
success: false,
err: 'Missing or invalid action',
})
return(true)
}
if(!sender) {
console.warn(`[${redisCnx.redisId}] Action ${action} without sender on ${chan}`)
return(true)
}
if(redisCnx.accessRights && !redisCnx.accessRights.canDo(roles, action, sender)) {
replyToAction(redisCnx, {
action,
reqid,
sender,
success: false,
err: 'Unauthorized action !',
})
return(true)
}
const handler = redisCnx['action_'+action]
if(typeof(handler) !== 'function') {
replyToAction(redisCnx, {
action,
reqid,
sender,
success: false,
err: `Unknown action: ${action}`,
})
return(true)
}
if(redisCnx.debug) {
console.log(`[${redisCnx.redisId}] Dispatching action ${action} from ${sender}`)
}
try {
await handler.call(redisCnx, action, ('payload' in msg) ? msg.payload : null, reqid, sender, roles)
} catch(err) {
console.error(`[${redisCnx.redisId}] Action ${action} failed:`, err)
replyToAction(redisCnx, {
action,
reqid,
sender,
success: false,
err: err.message ?? `${action} failed`,
})
}
return(true)
}
+33
View File
@@ -0,0 +1,33 @@
export function dispatchEvents(redisCnx, msg, chan, eventHandlers) {
const eventType = msg.eventType
if(!eventType || typeof(eventType) !== 'string') return(false)
let handled = false
for(const [channelPattern, byType] of Object.entries(eventHandlers ?? {})) {
if(!redisCnx.matchesChan(chan, channelPattern)) continue
const handlers = byType[eventType]
if(!handlers?.length) continue
for(const handle of handlers) {
try {
handle.call(redisCnx, msg, chan)
} catch(err) {
console.error(
`[${redisCnx.redisId}] Event ${eventType} on ${chan} failed:`,
err
)
}
}
handled = true
}
if(!handled && redisCnx.debug) {
console.log(`[${redisCnx.redisId}] Unhandled event ${eventType} on ${chan}`)
}
return(handled)
}
+69
View File
@@ -0,0 +1,69 @@
export function busReplyRoute(daemonBlock, meshName) {
if(!daemonBlock?.senderId) return(null)
const onArena = meshName === 'arena'
const systemReply = daemonBlock.maestroActionsReply
?? daemonBlock.gpsActionsReply
?? daemonBlock.observerActionsReply
const actionsReply = onArena
? (daemonBlock.bus?.arena?.actionsReply ?? systemReply)
: systemReply
if(!actionsReply) return(null)
return({
senderId: daemonBlock.senderId,
actionsReply,
})
}
export function publishActionReply(redisCnx, options) {
const {
action,
reqid,
sender,
reply,
replyChannel,
senderId,
} = options
reply.action = action
reply.sender = senderId
if(reqid) reply.reqid = reqid
const chan = replyChannel.replace(/\[UID\]/g, sender)
redisCnx.redisPublish(chan, reply)
}
export function replyToAction(redisCnx, options) {
const {
action,
reqid,
sender,
success,
payload,
err,
replyChannel,
senderId,
} = options
const routeReplyChannel = replyChannel ?? redisCnx.actionsReply
const routeSenderId = senderId ?? redisCnx.senderId
if(!routeReplyChannel || !routeSenderId) {
console.error(`[${redisCnx.redisId}] Cannot resolve action reply route`)
return
}
const reply = { success }
if(err != null) reply.err = err
if(payload !== undefined) reply.payload = payload
publishActionReply(redisCnx, {
action,
reqid,
sender,
replyChannel: routeReplyChannel,
senderId: routeSenderId,
reply,
})
}
+2 -1
View File
@@ -26,6 +26,7 @@
} }
], ],
"gps": { "gps": {
"primordialDaemon": true,
"gpsActionsChannel": "system:requests:gps", "gpsActionsChannel": "system:requests:gps",
"gpsActionsReply": "system:replies:[UID]", "gpsActionsReply": "system:replies:[UID]",
"GPSstorage": { "GPSstorage": {
@@ -58,7 +59,6 @@
"arenaChannel": "arena:lifecycle", "arenaChannel": "arena:lifecycle",
"godsReadyChannel": "arena:gods:ready" "godsReadyChannel": "arena:gods:ready"
}, },
"expectedGods": ["gps"],
"readyTimeoutMs": 30000 "readyTimeoutMs": 30000
}, },
"mysql": { "mysql": {
@@ -67,6 +67,7 @@
"simDatabase": "p42SIM" "simDatabase": "p42SIM"
}, },
"observer": { "observer": {
"primordialDaemon": false,
"observerActionsChannel": "system:requests:observer", "observerActionsChannel": "system:requests:observer",
"observerActionsReply": "system:replies:[UID]", "observerActionsReply": "system:replies:[UID]",
"senderId": "observer", "senderId": "observer",
+2 -4
View File
@@ -60,6 +60,7 @@
"gps": { "gps": {
"type": "object", "type": "object",
"properties": { "properties": {
"primordialDaemon": { "type": "boolean" },
"gpsActionsChannel": { "type": "string" }, "gpsActionsChannel": { "type": "string" },
"gpsActionsReply": { "type": "string" }, "gpsActionsReply": { "type": "string" },
"GPSstorage": { "GPSstorage": {
@@ -131,10 +132,6 @@
"godsReadyChannel" "godsReadyChannel"
] ]
}, },
"expectedGods": {
"type": "array",
"items": { "type": "string" }
},
"readyTimeoutMs": { "type": "integer", "minimum": 1000 } "readyTimeoutMs": { "type": "integer", "minimum": 1000 }
}, },
"required": [ "required": [
@@ -157,6 +154,7 @@
"observer": { "observer": {
"type": "object", "type": "object",
"properties": { "properties": {
"primordialDaemon": { "type": "boolean" },
"observerActionsChannel": { "type": "string" }, "observerActionsChannel": { "type": "string" },
"observerActionsReply": { "type": "string" }, "observerActionsReply": { "type": "string" },
"senderId": { "type": "string" }, "senderId": { "type": "string" },
+1
View File
@@ -5,6 +5,7 @@
"@p42/p42modules": "^0.1.0", "@p42/p42modules": "^0.1.0",
"ajv": "^8.12.0", "ajv": "^8.12.0",
"redis": "^4.3.0", "redis": "^4.3.0",
"uuid": "^14.0.0",
"yargs": "^17.7.2" "yargs": "^17.7.2"
} }
} }
+10 -4
View File
@@ -9,9 +9,11 @@ export class RedisConnexion {
this.redisConfig = this.config this.redisConfig = this.config
this.meshName = options.meshName this.meshName = options.meshName
this.meshModule = options.meshModule ?? null this.meshModule = options.meshModule ?? null
this.senderId = options.senderId ?? null
this.actionsReply = options.actionsReply ?? null
if(this.meshModule?.meshActions) Object.assign(this, this.meshModule.meshActions) if(this.meshModule?.actionHandlers) Object.assign(this, this.meshModule.actionHandlers)
this.afterLoginMethods = this.meshModule?.afterLoginMethods ?? [] this.afterLogin = this.meshModule?.afterLogin ?? []
this.redisClient = redis.createClient({ this.redisClient = redis.createClient({
socket: { socket: {
@@ -56,7 +58,7 @@ export class RedisConnexion {
console.log(`[${this.redisConfig.redisId}] Redis ${this.redisConfig.redisId} time:`, redisTime) console.log(`[${this.redisConfig.redisId}] Redis ${this.redisConfig.redisId} time:`, redisTime)
} }
for(const method of this.afterLoginMethods){ for(const method of this.afterLogin){
if(typeof method != 'function') continue if(typeof method != 'function') continue
method(this) method(this)
} }
@@ -282,7 +284,11 @@ export class RedisConnexion {
} }
if(typeof(this.meshModule?.dispatchMessage) === 'function') { if(typeof(this.meshModule?.dispatchMessage) === 'function') {
this.meshModule.dispatchMessage(this, msg, chan) try {
await this.meshModule.dispatchMessage(this, msg, chan)
} catch(err) {
console.error(`[${this.redisConfig.redisId}] dispatchMessage failed on ${chan}:`, err)
}
} }
} }
+4
View File
@@ -0,0 +1,4 @@
clear; node test.js --guiDatabase test_p42GUI --simDatabase test_p42SIM maestro1 --userUuid a4f33373-6adf-4d2d-9a6d-7fa0abf8b01f --simulationUuid 0x019ec742e12175c685a97bf9300b6b49
+54 -28
View File
@@ -1,17 +1,35 @@
import { MySQLClient } from '@p42/p42modules' import { MySQLClient } from '@p42/p42modules'
import { SimRepository } from '../../SimMaestro/simRepository.js' import { SimRepository } from '../../Maestro/simRepository.js'
function agentHashKey(template, agentId) { function agentHashKey(template, agentId) {
return(template.replace(/\[UID\]/g, agentId)) return(template.replace(/\[UID\]/g, agentId))
} }
function parseJsonField(raw) { function hashFieldValue(raw) {
if(raw == null) return(null) if(raw == null) return(null)
if(typeof(raw) === 'object') return(raw) if(typeof(raw) !== 'string') return(raw)
try { return(JSON.parse(raw)) } try { return(JSON.parse(raw)) }
catch { return(null) } catch { return(raw) }
} }
function valuesEqual(a, b) {
if(a === b) return(true)
if(a == null || b == null) return(false)
if(typeof(a) !== 'object' && typeof(b) !== 'object') return(a == b)
if(typeof(a) !== typeof(b)) return(false)
if(typeof(a) !== 'object') return(false)
if(Array.isArray(a) !== Array.isArray(b)) return(false)
const keysA = Object.keys(a)
const keysB = Object.keys(b)
if(keysA.length !== keysB.length) return(false)
for(const key of keysA) {
if(!valuesEqual(a[key], b[key])) return(false)
}
return(true)
}
const RESERVED_HASH_FIELDS = new Set(['position', 'vector', 'speed', 'segment'])
function vectorsEqual(a, b) { function vectorsEqual(a, b) {
for(const axis of ['x', 'y', 'z']) { for(const axis of ['x', 'y', 'z']) {
if(a[axis] !== b[axis]) return(false) if(a[axis] !== b[axis]) return(false)
@@ -23,13 +41,12 @@ async function findSimulationFixture(ctx) {
const { guiDatabase, simDatabase } = ctx.databases const { guiDatabase, simDatabase } = ctx.databases
const rows = await MySQLClient.poolExecute(ctx.db, ` const rows = await MySQLClient.poolExecute(ctx.db, `
SELECT u.usr_uuid AS user_uuid, SELECT u.usr_uuid AS user_uuid,
BIN_TO_UUID(s.sim_uuid) AS simulation_uuid, BIN_TO_UUID(s.sim_uuid) AS simulation_uuid
BIN_TO_UUID(s.sim_root_kf_uuid) AS keyframe_id
FROM \`${guiDatabase}\`.users u FROM \`${guiDatabase}\`.users u
INNER JOIN \`${guiDatabase}\`.simowners o ON o.own_usr_id = u.usr_id INNER JOIN \`${guiDatabase}\`.simowners o ON o.own_usr_id = u.usr_id
INNER JOIN \`${simDatabase}\`.simulations s ON o.own_sim_uuid = s.sim_uuid INNER JOIN \`${simDatabase}\`.simulations s ON o.own_sim_uuid = s.sim_uuid
INNER JOIN \`${simDatabase}\`.edited_kf_store ekfs ON ekfs.ekfs_ekf_uuid = s.sim_root_kf_uuid INNER JOIN \`${simDatabase}\`.edited_kf_store ekfs ON ekfs.ekfs_ekf_uuid = s.sim_root_kf_uuid
GROUP BY u.usr_uuid, s.sim_uuid, s.sim_root_kf_uuid GROUP BY u.usr_uuid, s.sim_uuid
HAVING COUNT(ekfs.ekfs_agent_id) > 0 HAVING COUNT(ekfs.ekfs_agent_id) > 0
LIMIT 1 LIMIT 1
`) `)
@@ -37,14 +54,13 @@ async function findSimulationFixture(ctx) {
if(!rows.length) { if(!rows.length) {
throw(new Error( throw(new Error(
'No simulation fixture found in MySQL (need user-owned sim with root keyframe agents). ' 'No simulation fixture found in MySQL (need user-owned sim with root keyframe agents). '
+ 'Pass --userUuid, --simulationUuid, --keyframeId explicitly, or use --guiDatabase / --simDatabase for a test DB.' + 'Pass --userUuid, --simulationUuid explicitly, or use --guiDatabase / --simDatabase for a test DB.'
)) ))
} }
return({ return({
userUuid: rows[0].user_uuid, userUuid: rows[0].user_uuid,
simulationUuid: rows[0].simulation_uuid, simulationUuid: rows[0].simulation_uuid,
keyframeId: rows[0].keyframe_id,
}) })
} }
@@ -71,15 +87,11 @@ function waitForLifecycleEvent(ctx, eventType, timeoutMs) {
export function configureYargs(yargsBuilder) { export function configureYargs(yargsBuilder) {
return(yargsBuilder.options({ return(yargsBuilder.options({
userUuid: { userUuid: {
describe: 'User UUID to send STARTSIMULATION as (auto-discovered if omitted)', describe: 'User UUID (dashed or 0x-prefixed hex; auto-discovered if omitted)',
type: 'string', type: 'string',
}, },
simulationUuid: { simulationUuid: {
describe: 'Simulation UUID (auto-discovered if omitted)', describe: 'Simulation UUID (dashed or 0x-prefixed hex; auto-discovered if omitted)',
type: 'string',
},
keyframeId: {
describe: 'Root keyframe UUID (auto-discovered if omitted)',
type: 'string', type: 'string',
}, },
timeout: { timeout: {
@@ -91,32 +103,32 @@ export function configureYargs(yargsBuilder) {
} }
export async function run(ctx) { export async function run(ctx) {
const { log, argv, config, systemCnx, arenaCnx } = ctx const { log, argv, config, systemCnx, arenaCnx, normalizeUuid } = ctx
const arenaStorage = config.gps?.arenaStorage ?? { const arenaStorage = config.gps?.arenaStorage ?? {
agentHashKey: 'arena:agents:[UID]', agentHashKey: 'arena:agents:[UID]',
agentsIndexKey: 'arena:agents', agentsIndexKey: 'arena:agents',
} }
log('action', 'Resolving simulation fixture from MySQL...') log('action', 'Resolving simulation fixture from MySQL...')
let userUuid = argv.userUuid let userUuid = argv.userUuid ? normalizeUuid(argv.userUuid) : undefined
let simulationUuid = argv.simulationUuid let simulationUuid = argv.simulationUuid ? normalizeUuid(argv.simulationUuid) : undefined
let keyframeId = argv.keyframeId
if(!userUuid || !simulationUuid || !keyframeId) { if(!userUuid || !simulationUuid) {
const fixture = await findSimulationFixture(ctx) const fixture = await findSimulationFixture(ctx)
userUuid = userUuid ?? fixture.userUuid userUuid = userUuid ?? fixture.userUuid
simulationUuid = simulationUuid ?? fixture.simulationUuid simulationUuid = simulationUuid ?? fixture.simulationUuid
keyframeId = keyframeId ?? fixture.keyframeId
} }
log('action', `User: ${userUuid}`) log('action', `User: ${userUuid}`)
log('action', `Simulation: ${simulationUuid}`) log('action', `Simulation: ${simulationUuid}`)
log('action', `Keyframe: ${keyframeId}`)
const simRepo = new SimRepository(ctx.db, ctx.databases, false) const simRepo = new SimRepository(ctx.db, ctx.databases, false)
const access = await simRepo.validateSimulationAccess(userUuid, simulationUuid, keyframeId) const access = await simRepo.validateSimulationAccess(userUuid, simulationUuid)
if(!access.ok) throw(new Error(`Simulation access check failed: ${access.err}`)) if(!access.ok) throw(new Error(`Simulation access check failed: ${access.err}`))
const keyframeId = access.sim.sim_root_kf_uuid
log('action', `Root keyframe: ${keyframeId}`)
const agentsResult = await simRepo.loadKeyframeAgents(keyframeId) const agentsResult = await simRepo.loadKeyframeAgents(keyframeId)
if(!agentsResult.ok) throw(new Error(`Failed to load keyframe agents: ${agentsResult.err}`)) if(!agentsResult.ok) throw(new Error(`Failed to load keyframe agents: ${agentsResult.err}`))
if(!agentsResult.agents.length) throw(new Error('Keyframe has no agents with valid GPS values')) if(!agentsResult.agents.length) throw(new Error('Keyframe has no agents with valid GPS values'))
@@ -137,7 +149,6 @@ export async function run(ctx) {
roles: ['*'], roles: ['*'],
payload: { payload: {
simulationUuid, simulationUuid,
keyframeId,
infraId: null, infraId: null,
}, },
}) })
@@ -174,8 +185,8 @@ export async function run(ctx) {
const hash = await arenaCnx.redisHgetall(key) const hash = await arenaCnx.redisHgetall(key)
const expected = expectedById.get(agentId) const expected = expectedById.get(agentId)
const position = parseJsonField(hash.position) const position = hashFieldValue(hash.position)
const vector = parseJsonField(hash.vector) const vector = hashFieldValue(hash.vector)
if(!position || !vector) { if(!position || !vector) {
throw(new Error(`Agent ${agentId}: missing position or vector in arena hash ${key}`)) throw(new Error(`Agent ${agentId}: missing position or vector in arena hash ${key}`))
@@ -189,8 +200,23 @@ export async function run(ctx) {
throw(new Error(`Agent ${agentId}: vector mismatch (MySQL vs arena store)`)) throw(new Error(`Agent ${agentId}: vector mismatch (MySQL vs arena store)`))
} }
log('success', `Agent ${agentId}: position and vector match MySQL`) const storeExpected = expected.store ?? {}
for(const [field, expVal] of Object.entries(storeExpected)) {
const actualVal = hashFieldValue(hash[field])
if(!valuesEqual(actualVal, expVal)) {
throw(new Error(`Agent ${agentId}: store field "${field}" mismatch (MySQL vs arena store)`))
}
} }
log('success', 'Arena store seeded correctly from MySQL (Maestro will stall waiting for readyToStart without GPS)') for(const field of Object.keys(hash)) {
if(RESERVED_HASH_FIELDS.has(field)) continue
if(!(field in storeExpected)) {
throw(new Error(`Agent ${agentId}: unexpected store field "${field}" in arena hash ${key}`))
}
}
log('success', `Agent ${agentId}: position, vector, and store values match MySQL`)
}
log('success', 'Arena store seeded correctly from MySQL (Maestro will wait for agent + primordial daemon readyToStart until timeout)')
} }
+34 -1
View File
@@ -2,12 +2,40 @@ import yargs from 'yargs/yargs'
import { hideBin } from 'yargs/helpers' import { hideBin } from 'yargs/helpers'
import { pathToFileURL } from 'url' import { pathToFileURL } from 'url'
import { fileURLToPath } from 'url' import { fileURLToPath } from 'url'
import { dirname, join } from 'path' import { dirname, join, resolve } from 'path'
import { RedisConnexion } from '../redisConnexion.js' import { RedisConnexion } from '../redisConnexion.js'
import { configHelper } from '../configHelper.js' import { configHelper } from '../configHelper.js'
import { MySQLClient } from '@p42/p42modules' import { MySQLClient } from '@p42/p42modules'
const __dirname = dirname(fileURLToPath(import.meta.url)) const __dirname = dirname(fileURLToPath(import.meta.url))
const __filename = fileURLToPath(import.meta.url)
const UUID_DASHED_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i
const UUID_HEX32_RE = /^[0-9a-f]{32}$/i
export function normalizeUuid(value) {
if(typeof(value) !== 'string') {
throw(new Error('UUID must be a string'))
}
const trimmed = value.trim()
if(!trimmed) throw(new Error('UUID must be a non-empty string'))
if(UUID_DASHED_RE.test(trimmed)) {
return(trimmed.toLowerCase())
}
let hex = trimmed
if(/^0x/i.test(hex)) hex = hex.slice(2)
if(!UUID_HEX32_RE.test(hex)) {
throw(new Error(
`Invalid UUID: ${value} (expected dashed form or 32-char hex, optionally prefixed with 0x)`
))
}
hex = hex.toLowerCase()
return(`${hex.slice(0, 8)}-${hex.slice(8, 12)}-${hex.slice(12, 16)}-${hex.slice(16, 20)}-${hex.slice(20)}`)
}
const LOG_COLORS = { const LOG_COLORS = {
action: '\x1b[37m', action: '\x1b[37m',
@@ -145,6 +173,7 @@ async function main() {
systemCnx, systemCnx,
arenaCnx, arenaCnx,
log, log,
normalizeUuid,
onArenaMessage(handler) { onArenaMessage(handler) {
arenaHandlers.add(handler) arenaHandlers.add(handler)
}, },
@@ -173,7 +202,11 @@ async function main() {
process.exit(exitCode) process.exit(exitCode)
} }
const isMain = process.argv[1] && resolve(__filename) === resolve(process.argv[1])
if(isMain) {
main().catch(err => { main().catch(err => {
console.error(`${LOG_COLORS.error}${err.message ?? err}${LOG_COLORS.reset}`) console.error(`${LOG_COLORS.error}${err.message ?? err}${LOG_COLORS.reset}`)
process.exit(1) process.exit(1)
}) })
}
+2
View File
@@ -0,0 +1,2 @@
import { v7 as uuidv7 } from 'uuid'
console.log(uuidv7().replaceAll('-', ''))