refacto & cleanup of toplevel functions, use @p42/p42Modules, test microframework
This commit is contained in:
@@ -0,0 +1,196 @@
|
||||
import { MySQLClient } from '@p42/p42modules'
|
||||
import { SimRepository } from '../../SimMaestro/simRepository.js'
|
||||
|
||||
function agentHashKey(template, agentId) {
|
||||
return(template.replace(/\[UID\]/g, agentId))
|
||||
}
|
||||
|
||||
function parseJsonField(raw) {
|
||||
if(raw == null) return(null)
|
||||
if(typeof(raw) === 'object') return(raw)
|
||||
try { return(JSON.parse(raw)) }
|
||||
catch { return(null) }
|
||||
}
|
||||
|
||||
function vectorsEqual(a, b) {
|
||||
for(const axis of ['x', 'y', 'z']) {
|
||||
if(a[axis] !== b[axis]) return(false)
|
||||
}
|
||||
return(true)
|
||||
}
|
||||
|
||||
async function findSimulationFixture(ctx) {
|
||||
const { guiDatabase, simDatabase } = ctx.databases
|
||||
const rows = await MySQLClient.poolExecute(ctx.db, `
|
||||
SELECT u.usr_uuid AS user_uuid,
|
||||
BIN_TO_UUID(s.sim_uuid) AS simulation_uuid,
|
||||
BIN_TO_UUID(s.sim_root_kf_uuid) AS keyframe_id
|
||||
FROM \`${guiDatabase}\`.users u
|
||||
INNER JOIN \`${guiDatabase}\`.simowners o ON o.own_usr_id = u.usr_id
|
||||
INNER JOIN \`${simDatabase}\`.simulations s ON o.own_sim_uuid = s.sim_uuid
|
||||
INNER JOIN \`${simDatabase}\`.edited_kf_store ekfs ON ekfs.ekfs_ekf_uuid = s.sim_root_kf_uuid
|
||||
GROUP BY u.usr_uuid, s.sim_uuid, s.sim_root_kf_uuid
|
||||
HAVING COUNT(ekfs.ekfs_agent_id) > 0
|
||||
LIMIT 1
|
||||
`)
|
||||
|
||||
if(!rows.length) {
|
||||
throw(new Error(
|
||||
'No simulation fixture found in MySQL (need user-owned sim with root keyframe agents). '
|
||||
+ 'Pass --userUuid, --simulationUuid, --keyframeId explicitly, or use --guiDatabase / --simDatabase for a test DB.'
|
||||
))
|
||||
}
|
||||
|
||||
return({
|
||||
userUuid: rows[0].user_uuid,
|
||||
simulationUuid: rows[0].simulation_uuid,
|
||||
keyframeId: rows[0].keyframe_id,
|
||||
})
|
||||
}
|
||||
|
||||
function waitForLifecycleEvent(ctx, eventType, timeoutMs) {
|
||||
return(new Promise((resolve, reject) => {
|
||||
const lifecyclePattern = ctx.config.maestro.lifecycle.arenaChannel
|
||||
const timer = setTimeout(() => {
|
||||
ctx.offArenaMessage(handler)
|
||||
reject(new Error(`Timeout waiting for ${eventType} on ${lifecyclePattern} (${timeoutMs}ms)`))
|
||||
}, timeoutMs)
|
||||
|
||||
const handler = (msg, chan) => {
|
||||
if(!ctx.arenaCnx.matchesChan(chan, lifecyclePattern)) return
|
||||
if(msg.eventType !== eventType) return
|
||||
clearTimeout(timer)
|
||||
ctx.offArenaMessage(handler)
|
||||
resolve(msg)
|
||||
}
|
||||
|
||||
ctx.onArenaMessage(handler)
|
||||
}))
|
||||
}
|
||||
|
||||
export function configureYargs(yargsBuilder) {
|
||||
return(yargsBuilder.options({
|
||||
userUuid: {
|
||||
describe: 'User UUID to send STARTSIMULATION as (auto-discovered if omitted)',
|
||||
type: 'string',
|
||||
},
|
||||
simulationUuid: {
|
||||
describe: 'Simulation UUID (auto-discovered if omitted)',
|
||||
type: 'string',
|
||||
},
|
||||
keyframeId: {
|
||||
describe: 'Root keyframe UUID (auto-discovered if omitted)',
|
||||
type: 'string',
|
||||
},
|
||||
timeout: {
|
||||
describe: 'Milliseconds to wait for onYourMarks',
|
||||
default: 15000,
|
||||
type: 'number',
|
||||
},
|
||||
}))
|
||||
}
|
||||
|
||||
export async function run(ctx) {
|
||||
const { log, argv, config, systemCnx, arenaCnx } = ctx
|
||||
const arenaStorage = config.gps?.arenaStorage ?? {
|
||||
agentHashKey: 'arena:agents:[UID]',
|
||||
agentsIndexKey: 'arena:agents',
|
||||
}
|
||||
|
||||
log('action', 'Resolving simulation fixture from MySQL...')
|
||||
let userUuid = argv.userUuid
|
||||
let simulationUuid = argv.simulationUuid
|
||||
let keyframeId = argv.keyframeId
|
||||
|
||||
if(!userUuid || !simulationUuid || !keyframeId) {
|
||||
const fixture = await findSimulationFixture(ctx)
|
||||
userUuid = userUuid ?? fixture.userUuid
|
||||
simulationUuid = simulationUuid ?? fixture.simulationUuid
|
||||
keyframeId = keyframeId ?? fixture.keyframeId
|
||||
}
|
||||
|
||||
log('action', `User: ${userUuid}`)
|
||||
log('action', `Simulation: ${simulationUuid}`)
|
||||
log('action', `Keyframe: ${keyframeId}`)
|
||||
|
||||
const simRepo = new SimRepository(ctx.db, ctx.databases, false)
|
||||
const access = await simRepo.validateSimulationAccess(userUuid, simulationUuid, keyframeId)
|
||||
if(!access.ok) throw(new Error(`Simulation access check failed: ${access.err}`))
|
||||
|
||||
const agentsResult = await simRepo.loadKeyframeAgents(keyframeId)
|
||||
if(!agentsResult.ok) throw(new Error(`Failed to load keyframe agents: ${agentsResult.err}`))
|
||||
if(!agentsResult.agents.length) throw(new Error('Keyframe has no agents with valid GPS values'))
|
||||
|
||||
log('success', `Loaded ${agentsResult.agents.length} agent(s) from MySQL`)
|
||||
|
||||
const expectedById = new Map(agentsResult.agents.map(a => [a.id, a]))
|
||||
const lifecycleWait = waitForLifecycleEvent(ctx, 'onYourMarks', argv.timeout)
|
||||
|
||||
const reqid = `maestro1-${Date.now()}`
|
||||
const actionsChan = config.maestro.maestroActionsChannel
|
||||
|
||||
log('action', `Publishing STARTSIMULATION on ${actionsChan} (reqid=${reqid})...`)
|
||||
await systemCnx.redisPublish(actionsChan, {
|
||||
action: 'STARTSIMULATION',
|
||||
reqid,
|
||||
sender: userUuid,
|
||||
roles: ['*'],
|
||||
payload: {
|
||||
simulationUuid,
|
||||
keyframeId,
|
||||
infraId: null,
|
||||
},
|
||||
})
|
||||
|
||||
log('action', `Waiting for onYourMarks on ${config.maestro.lifecycle.arenaChannel}...`)
|
||||
const lifecycleMsg = await lifecycleWait
|
||||
|
||||
const payload = lifecycleMsg.payload ?? {}
|
||||
if(payload.simulationId !== simulationUuid) {
|
||||
throw(new Error(`onYourMarks simulationId mismatch: expected ${simulationUuid}, got ${payload.simulationId}`))
|
||||
}
|
||||
|
||||
log('success', `Received onYourMarks for simulationId=${payload.simulationId}`)
|
||||
|
||||
log('action', 'Reading arena store...')
|
||||
const indexIds = await arenaCnx.redisSmembers(arenaStorage.agentsIndexKey)
|
||||
const expectedIds = [...expectedById.keys()].sort()
|
||||
const actualIds = [...indexIds].sort()
|
||||
|
||||
if(actualIds.length !== expectedIds.length) {
|
||||
throw(new Error(`Agent index count mismatch: expected ${expectedIds.length}, got ${actualIds.length}`))
|
||||
}
|
||||
|
||||
for(let i = 0; i < expectedIds.length; i++) {
|
||||
if(actualIds[i] !== expectedIds[i]) {
|
||||
throw(new Error(`Agent index mismatch at ${i}: expected ${expectedIds[i]}, got ${actualIds[i]}`))
|
||||
}
|
||||
}
|
||||
|
||||
log('success', `Arena agents index contains ${actualIds.length} agent(s)`)
|
||||
|
||||
for(const agentId of expectedIds) {
|
||||
const key = agentHashKey(arenaStorage.agentHashKey, agentId)
|
||||
const hash = await arenaCnx.redisHgetall(key)
|
||||
const expected = expectedById.get(agentId)
|
||||
|
||||
const position = parseJsonField(hash.position)
|
||||
const vector = parseJsonField(hash.vector)
|
||||
|
||||
if(!position || !vector) {
|
||||
throw(new Error(`Agent ${agentId}: missing position or vector in arena hash ${key}`))
|
||||
}
|
||||
|
||||
if(!vectorsEqual(position, expected.position)) {
|
||||
throw(new Error(`Agent ${agentId}: position mismatch (MySQL vs arena store)`))
|
||||
}
|
||||
|
||||
if(!vectorsEqual(vector, expected.vector)) {
|
||||
throw(new Error(`Agent ${agentId}: vector mismatch (MySQL vs arena store)`))
|
||||
}
|
||||
|
||||
log('success', `Agent ${agentId}: position and vector match MySQL`)
|
||||
}
|
||||
|
||||
log('success', 'Arena store seeded correctly from MySQL (Maestro will stall waiting for readyToStart without GPS)')
|
||||
}
|
||||
Reference in New Issue
Block a user