From e81a5b573beabdc37d3b7b4d1b924cc39a8a2aa3 Mon Sep 17 00:00:00 2001 From: STEINNI Date: Sat, 13 Jun 2026 13:47:46 +0000 Subject: [PATCH] Observer embryo, Maestro done --- GPS/actions/arena/arenaHandlers.js | 26 +- GPS/actions/arena/dispatch.js | 5 + GPS/actions/arena/index.js | 2 + GPS/actions/system/dispatch.js | 28 +++ GPS/actions/system/index.js | 2 + GPS/actions/system/positions.js | 46 ++-- GPS/actionsHelper.js | 9 +- GPS/agentStore.js | 22 +- GPS/arenaAgentLoader.js | 82 +++++++ GPS/gpsServer.js | 194 ++++++++++++++- GPS/p42Gps.js | 10 +- GPS/simulationState.js | 23 ++ GPS/startGps.sh | 32 ++- Observer/actions/arena/arenaHandlers.js | 12 + Observer/actions/arena/dispatch.js | 5 + Observer/actions/arena/index.js | 12 + Observer/actions/system/dispatch.js | 28 +++ Observer/actions/system/index.js | 12 + Observer/actions/system/utilities.js | 48 ++++ Observer/actionsHelper.js | 16 ++ Observer/observerServer.js | 55 +++++ Observer/p42Observer.js | 131 ++++++++++ Observer/package.json | 9 + Observer/startObserver.sh | 31 +++ Observer/stopObserver.sh | 8 + SimMaestro/actions/arena/arenaHandlers.js | 22 ++ SimMaestro/actions/arena/dispatch.js | 5 + SimMaestro/actions/arena/index.js | 12 + SimMaestro/actions/system/dispatch.js | 28 +++ SimMaestro/actions/system/index.js | 14 ++ SimMaestro/actions/system/simulation.js | 138 +++++++++++ SimMaestro/actions/system/utilities.js | 48 ++++ SimMaestro/actionsHelper.js | 16 ++ SimMaestro/arenaGroom.js | 33 +++ SimMaestro/maestroServer.js | 264 +++++++++++++++++++++ SimMaestro/orchestrationState.js | 5 + SimMaestro/p42SimMaestro.js | 136 +++++++++++ SimMaestro/package.json | 9 + SimMaestro/simRepository.js | 116 +++++++++ SimMaestro/startMaestro.sh | 31 +++ SimMaestro/stopMaestro.sh | 8 + config.json | 40 ++++ configSchema.json | 85 +++++++ mysqlClient.js | 29 +++ package.json | 5 +- GPS/redisConnexion.js => redisConnexion.js | 180 +++++++------- 46 files changed, 1929 insertions(+), 143 deletions(-) create mode 100644 GPS/actions/arena/dispatch.js create mode 100644 GPS/actions/system/dispatch.js create mode 100644 GPS/arenaAgentLoader.js create mode 100644 GPS/simulationState.js create mode 100644 Observer/actions/arena/arenaHandlers.js create mode 100644 Observer/actions/arena/dispatch.js create mode 100644 Observer/actions/arena/index.js create mode 100644 Observer/actions/system/dispatch.js create mode 100644 Observer/actions/system/index.js create mode 100644 Observer/actions/system/utilities.js create mode 100644 Observer/actionsHelper.js create mode 100644 Observer/observerServer.js create mode 100644 Observer/p42Observer.js create mode 100644 Observer/package.json create mode 100755 Observer/startObserver.sh create mode 100755 Observer/stopObserver.sh create mode 100644 SimMaestro/actions/arena/arenaHandlers.js create mode 100644 SimMaestro/actions/arena/dispatch.js create mode 100644 SimMaestro/actions/arena/index.js create mode 100644 SimMaestro/actions/system/dispatch.js create mode 100644 SimMaestro/actions/system/index.js create mode 100644 SimMaestro/actions/system/simulation.js create mode 100644 SimMaestro/actions/system/utilities.js create mode 100644 SimMaestro/actionsHelper.js create mode 100644 SimMaestro/arenaGroom.js create mode 100644 SimMaestro/maestroServer.js create mode 100644 SimMaestro/orchestrationState.js create mode 100644 SimMaestro/p42SimMaestro.js create mode 100644 SimMaestro/package.json create mode 100644 SimMaestro/simRepository.js create mode 100755 SimMaestro/startMaestro.sh create mode 100755 SimMaestro/stopMaestro.sh create mode 100644 mysqlClient.js rename GPS/redisConnexion.js => redisConnexion.js (72%) diff --git a/GPS/actions/arena/arenaHandlers.js b/GPS/actions/arena/arenaHandlers.js index 5ba7eac..c336aed 100644 --- a/GPS/actions/arena/arenaHandlers.js +++ b/GPS/actions/arena/arenaHandlers.js @@ -1,6 +1,7 @@ export const construct = (redisCnx) => { const tickMs = redisCnx.gpsSrv?.getGpsSettings().collisionTickMs ?? 100 + // Interval always runs; tickArena no-ops until LIVE (see gpsServer.tickArena) setInterval(() => { redisCnx.gpsSrv?.tickArena() }, tickMs) @@ -8,6 +9,24 @@ export const construct = (redisCnx) => { export const methods = { + handleLifecycleEvent(msg) { + const srv = this.gpsSrv + if(!srv) return + + if(msg.eventType === 'onYourMarks') { + srv.onYourMarks(msg.payload ?? {}).catch(err => { + console.error(`[${this.redisId}] onYourMarks failed:`, err) + srv.publishReadyToStart({ success: false, err: err.message ?? 'onYourMarks failed' }) + }) + return + } + + if(msg.eventType === 'bigBang') { + srv.onBigBang(msg.payload ?? {}) + return + } + }, + handleAgentEvent(msg) { const agentId = msg.sender if(!agentId || typeof(agentId) !== 'string') { @@ -34,7 +53,12 @@ export const methods = { dispatchArenaMessage(msg, chan) { const gps = this.config.gps - if(!gps || !this.gpsSrv) return false + if(!gps || !this.gpsSrv) return(false) + + if(this.matchesChan(chan, gps.lifecycle?.arenaChannel ?? 'arena:lifecycle')) { + this.handleLifecycleEvent(msg) + return(true) + } if(this.matchesChan(chan, gps.agentVectorChangeChannel)) { this.handleAgentEvent(msg) diff --git a/GPS/actions/arena/dispatch.js b/GPS/actions/arena/dispatch.js new file mode 100644 index 0000000..243fcef --- /dev/null +++ b/GPS/actions/arena/dispatch.js @@ -0,0 +1,5 @@ + +export function dispatchMessage(redisCnx, msg, chan) { + if(!redisCnx.config.gps || typeof(redisCnx.dispatchArenaMessage) !== 'function') return + redisCnx.dispatchArenaMessage(msg, chan) +} diff --git a/GPS/actions/arena/index.js b/GPS/actions/arena/index.js index 0f98cb3..607159a 100644 --- a/GPS/actions/arena/index.js +++ b/GPS/actions/arena/index.js @@ -1,4 +1,5 @@ import { methods as arenaMethods, construct as arenaConstruct } from './arenaHandlers.js' +import { dispatchMessage } from './dispatch.js' export const afterLoginMethods = [ arenaConstruct, @@ -7,3 +8,4 @@ export const afterLoginMethods = [ export const meshActions = { ...arenaMethods, } +export { dispatchMessage } diff --git a/GPS/actions/system/dispatch.js b/GPS/actions/system/dispatch.js new file mode 100644 index 0000000..969acf9 --- /dev/null +++ b/GPS/actions/system/dispatch.js @@ -0,0 +1,28 @@ + +export function dispatchMessage(redisCnx, msg, chan) { + const gps = redisCnx.config.gps + if(!gps?.gpsActionsChannel) return + + const actionsChan = redisCnx.fullChan(gps.gpsActionsChannel) + if(chan != actionsChan) return + + const action = msg.action + if(!action || typeof(action) !== 'string') { + console.warn(`[${redisCnx.redisId}] Ignoring message without action on ${chan}`) + return + } + + const handler = redisCnx['action_'+action] + if(typeof(handler) != 'function') { + if(redisCnx.debug) console.warn(`[${redisCnx.redisId}] Unknown action ${action} on ${chan}`) + return + } + + const payload = ('payload' in msg) ? msg.payload : null + const reqid = ('reqid' in msg) ? msg.reqid.substr(0, 50) : null + const sender = msg.sender || null + const roles = Array.isArray(msg.roles) ? msg.roles : ['*'] + + if(redisCnx.debug) console.log(`[${redisCnx.redisId}] Dispatching action ${action} from ${sender}`) + handler.call(redisCnx, action, payload, reqid, sender, roles) +} diff --git a/GPS/actions/system/index.js b/GPS/actions/system/index.js index 481aa58..4c4e5e4 100644 --- a/GPS/actions/system/index.js +++ b/GPS/actions/system/index.js @@ -1,5 +1,6 @@ import { methods as utilities, construct as utilitiesConstruct } from './utilities.js' import { methods as positions } from './positions.js' +import { dispatchMessage } from './dispatch.js' export const afterLoginMethods = [ utilitiesConstruct, @@ -8,3 +9,4 @@ export const meshActions = { ...utilities, ...positions, } +export { dispatchMessage } diff --git a/GPS/actions/system/positions.js b/GPS/actions/system/positions.js index a05ec85..ae6d361 100644 --- a/GPS/actions/system/positions.js +++ b/GPS/actions/system/positions.js @@ -1,4 +1,4 @@ -import { publishActionReply, parseAt } from '../../actionsHelper.js' +import { publishActionReply, parseSimTime } from '../../actionsHelper.js' export const methods = { @@ -8,7 +8,7 @@ export const methods = { "reqid": "6az5e4r6a", "payload": { "agentId": "agent42", - "at": "2026-06-07T12:00:00.000Z" + "t": 12.5 } } Event-Tx: @@ -21,9 +21,9 @@ export const methods = { "id": "agent42", "position": { "x": 1, "y": 2, "z": 3 }, "vector": { "x": 0, "y": 0, "z": 0 }, - "since": 1717750800, + "since": 0, "generation": 2, - "at": "2026-06-07T12:00:00.000Z" + "t": 12.5 } } } @@ -43,6 +43,14 @@ export const methods = { return } + if(!this.gpsSrv.isLive()) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Simulation not live', + } }) + return + } + const agentId = payload?.agentId if(!agentId || typeof(agentId) !== 'string') { publishActionReply(this, { ...replyOpts, reply: { @@ -52,11 +60,11 @@ export const methods = { return } - const at = parseAt(payload, () => this.gpsSrv.now()) + const at = parseSimTime(payload, () => this.gpsSrv.now()) if(at === null) { publishActionReply(this, { ...replyOpts, reply: { success: false, - err: 'Invalid at timestamp', + err: 'Invalid simulation time', } }) return } @@ -86,17 +94,7 @@ export const methods = { "yMin": -10, "yMax": 10, "zMin": 0, "zMax": 5 }, - "at": "2026-06-07T12:00:00.000Z" - } - } - Event-Tx: - { - "action": "GETAGENTSINPRISM", - "success": true, - "reqid": "6az5e4r6a", - "payload": { - "agents": [ ... ], - "at": "2026-06-07T12:00:00.000Z" + "t": 0 } } */ @@ -115,6 +113,14 @@ export const methods = { return } + if(!this.gpsSrv.isLive()) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Simulation not live', + } }) + return + } + const prism = payload?.prism if(!this.gpsSrv.isValidPrism(prism)) { publishActionReply(this, { ...replyOpts, reply: { @@ -124,11 +130,11 @@ export const methods = { return } - const at = parseAt(payload, () => this.gpsSrv.now()) + const at = parseSimTime(payload, () => this.gpsSrv.now()) if(at === null) { publishActionReply(this, { ...replyOpts, reply: { success: false, - err: 'Invalid at timestamp', + err: 'Invalid simulation time', } }) return } @@ -138,7 +144,7 @@ export const methods = { success: true, payload: { agents, - at: new Date(at * 1000).toISOString(), + t: at, }, } }) }, diff --git a/GPS/actionsHelper.js b/GPS/actionsHelper.js index f67eb18..a867894 100644 --- a/GPS/actionsHelper.js +++ b/GPS/actionsHelper.js @@ -15,9 +15,8 @@ export function publishActionReply(redisCnx, options) { redisCnx.redisPublish(chan, reply) } -export function parseAt(payload, fallbackFn) { - if(!payload?.at) return(fallbackFn()) - const t = Date.parse(payload.at) - if(Number.isNaN(t)) return(null) - return(t / 1000) +export function parseSimTime(payload, fallbackFn) { + if(payload?.t != null && typeof(payload.t) === 'number' && !Number.isNaN(payload.t)) return(payload.t) + if(payload?.at != null && typeof(payload.at) === 'number' && !Number.isNaN(payload.at)) return(payload.at) + return(fallbackFn()) } diff --git a/GPS/agentStore.js b/GPS/agentStore.js index 7da7980..fa64363 100644 --- a/GPS/agentStore.js +++ b/GPS/agentStore.js @@ -11,7 +11,20 @@ export class AgentStore { return(this.storage.agentHashKey.replace(/\[UID\]/g, agentId)) } - async exportSegment(agent, eventType) { + async clearAll() { + try { + const ids = await this.cnx.redisSmembers(this.storage.agentsIndexKey) + for(const id of ids) { + await this.cnx.redisDel(this.agentHashKey(id)) + } + await this.cnx.redisDel(this.storage.agentsIndexKey) + if(this.debug) console.log(`[GPS] Cleared system agent store (${ids.length} agent(s))`) + } catch(err) { + console.error('[GPS] Failed to clear system agent store:', err) + } + } + + async exportSegment(agent, eventType, simT = 0, simulationId = null) { try { const record = { eventType, @@ -20,7 +33,8 @@ export class AgentStore { vector: { ...agent.vector }, since: agent.since, generation: agent.generation ?? 0, - at: new Date().toISOString(), + t: simT, + simulationId, } await this.cnx.redisHset(this.agentHashKey(agent.id), 'segment', record) await this.cnx.redisSadd(this.storage.agentsIndexKey, agent.id) @@ -35,12 +49,12 @@ export class AgentStore { } } - async exportRemove(agentId) { + async exportRemove(agentId, simT = 0) { try { const record = { eventType: 'remove', id: agentId, - at: new Date().toISOString(), + t: simT, } await this.cnx.redisDel(this.agentHashKey(agentId)) await this.cnx.redisSrem(this.storage.agentsIndexKey, agentId) diff --git a/GPS/arenaAgentLoader.js b/GPS/arenaAgentLoader.js new file mode 100644 index 0000000..8c73f9c --- /dev/null +++ b/GPS/arenaAgentLoader.js @@ -0,0 +1,82 @@ + +export class ArenaAgentLoader { + + constructor(arenaCnx, storage, debug = false) { + this.cnx = arenaCnx + this.storage = storage + this.debug = debug + } + + async loadAgents(expectedIds = null) { + // TODO: when arena mesh is sharded, iterate all arena Redis connections and merge agent ids + const agentIds = await this.#listAgentIds(expectedIds) + const agents = [] + const errors = [] + + for(const agentId of agentIds) { + const result = await this.#loadAgentFromHash(agentId) + if(!result.ok) { + errors.push(result.err) + continue + } + agents.push(result.agent) + } + + if(errors.length) return({ ok: false, err: errors.join('; '), agents: [] }) + return({ ok: true, agents }) + } + + #parseHashField(raw) { + if(raw == null) return(null) + if(typeof(raw) === 'object') return(raw) + try { return(JSON.parse(raw)) } + catch { return(null) } + } + + #isVector(v) { + return( + v && + typeof(v) === 'object' && + typeof(v.x) === 'number' && + typeof(v.y) === 'number' && + typeof(v.z) === 'number' + ) + } + + async #listAgentIds(expectedIds = null) { + if(Array.isArray(expectedIds) && expectedIds.length) return([...expectedIds]) + return(await this.cnx.redisSmembers(this.storage.agentsIndexKey)) + } + + async #loadAgentFromHash(agentId) { + const key = this.storage.agentHashKey.replace(/\[UID\]/g, agentId) + const positionRaw = await this.cnx.redisHget(key, 'position') + const vectorRaw = await this.cnx.redisHget(key, 'vector') + let position = this.#parseHashField(positionRaw) + let vector = this.#parseHashField(vectorRaw) + + if(!this.#isVector(vector)) { + const segmentRaw = await this.cnx.redisHget(key, 'segment') + const segment = this.#parseHashField(segmentRaw) + if(segment) { + if(!this.#isVector(vector) && this.#isVector(segment.vector)) vector = segment.vector + if(!this.#isVector(position) && this.#isVector(segment.position)) position = segment.position + } + } + + if(!this.#isVector(vector)) return({ ok: false, err: `Invalid or missing vector for ${agentId}` }) + if(!this.#isVector(position)) position = { x: 0, y: 0, z: 0 } + + return({ + ok: true, + agent: { + id: agentId, + position: { ...position }, + vector: { ...vector }, + since: 0, + generation: 1, + }, + }) + } + +} diff --git a/GPS/gpsServer.js b/GPS/gpsServer.js index c6e3b5f..cbca56e 100644 --- a/GPS/gpsServer.js +++ b/GPS/gpsServer.js @@ -1,6 +1,8 @@ import { AccesRights } from '../accesRights.js' import { AgentStore } from './agentStore.js' import { CollisionRegistry } from './collisionRegistry.js' +import { ArenaAgentLoader } from './arenaAgentLoader.js' +import { SimState, validateAgentSets } from './simulationState.js' import { compareWorldlinePair, positionAt, @@ -19,8 +21,14 @@ export class gpsServer { this.agents = new Map() this.registry = new CollisionRegistry() this.arenaCnx = null + this.arenaCnxs = [] this.systemCnx = null this.agentStore = null + this.arenaLoader = null + this.state = SimState.IDLE + this.simulationId = null + this.bigBangEpoch = null + this.ignoredChangeCount = 0 } getGpsSettings() { @@ -33,8 +41,40 @@ export class gpsServer { }) } + getLifecycleSettings() { + const gps = this.gpsConfig.gps ?? {} + return({ + arenaChannel: gps.lifecycle?.arenaChannel ?? 'arena:lifecycle', + godsReadyChannel: gps.lifecycle?.godsReadyChannel ?? 'arena:gods:ready', + senderId: gps.senderId ?? 'gps', + }) + } + + getArenaStorageSettings() { + const gps = this.gpsConfig.gps ?? {} + return({ + agentHashKey: gps.arenaStorage?.agentHashKey ?? 'arena:agents:[UID]', + agentsIndexKey: gps.arenaStorage?.agentsIndexKey ?? 'arena:agents', + }) + } + + isLive() { + return(this.state === SimState.LIVE) + } + + isPrepare() { + return(this.state === SimState.PREPARE) + } + + simNow() { + if(this.bigBangEpoch === null) return(null) + return((performance.now() - this.bigBangEpoch) / 1000) + } + now() { - return(Date.now() / 1000) + if(this.isLive()) return(this.simNow()) + if(this.isPrepare()) return(0) + return(null) } wireSystemConnexion(cnx) { @@ -57,7 +97,11 @@ export class gpsServer { wireArenaConnexion(cnx) { cnx.gpsSrv = this - this.arenaCnx = cnx + this.arenaCnxs.push(cnx) + if(!this.arenaCnx || cnx.redisConfig.role === 'primary') { + this.arenaCnx = cnx + this.arenaLoader = new ArenaAgentLoader(cnx, this.getArenaStorageSettings(), this.debug) + } } getAgent(agentId) { @@ -75,14 +119,16 @@ export class gpsServer { vector: { ...agent.vector }, since: agent.since, generation: agent.generation ?? 0, - at: new Date(at * 1000).toISOString(), + t: at, }) } getAgentPosition(agentId, at = null) { + if(!this.isLive()) return(null) const agent = this.agents.get(agentId) if(!agent) return(null) const t = at ?? this.now() + if(t === null) return(null) return(this.buildAgentSnapshot(agent, t)) } @@ -108,7 +154,9 @@ export class gpsServer { } getAgentsInPrism(prism, at = null) { + if(!this.isLive()) return([]) const t = at ?? this.now() + if(t === null) return([]) const agents = [] for(const agent of this.agents.values()) { const position = positionAt(agent, t) @@ -119,6 +167,117 @@ export class gpsServer { return(agents) } + async resetSimulation() { + this.agents.clear() + this.registry = new CollisionRegistry() + this.simulationId = null + this.bigBangEpoch = null + this.ignoredChangeCount = 0 + this.state = SimState.IDLE + await this.agentStore?.clearAll() + } + + runInitialPairScan() { + const { nearMissDistance, prismTimeHeight } = this.getGpsSettings() + const ids = [...this.agents.keys()] + for(let i = 0; i < ids.length; i++) { + for(let j = i + 1; j < ids.length; j++) { + const hit = compareWorldlinePair( + this.agents.get(ids[i]), + this.agents.get(ids[j]), + prismTimeHeight, + nearMissDistance, + 0 + ) + if(hit) this.registry.add(hit) + } + } + if(this.debug) console.log(`[GPS] Initial pair scan: ${this.registry.entries.length} proximity entries at T=0`) + } + + async publishReadyToStart(result) { + if(!this.arenaCnx) return + const { godsReadyChannel, senderId } = this.getLifecycleSettings() + await this.arenaCnx.redisPublish(godsReadyChannel, { + eventType: 'readyToStart', + sender: senderId, + payload: { + success: result.success, + simulationId: this.simulationId, + agentIds: [...this.agents.keys()], + err: result.err ?? null, + }, + }) + } + + async onYourMarks(payload = {}) { + await this.resetSimulation() + this.simulationId = payload.simulationId ?? null + if(!this.simulationId) { + console.error('[GPS] onYourMarks rejected: missing simulationId') + await this.publishReadyToStart({ success: false, err: 'Missing simulationId' }) + return + } + if(!this.arenaLoader) { + console.error('[GPS] onYourMarks rejected: no arena loader') + await this.publishReadyToStart({ success: false, err: 'No arena Redis connection' }) + return + } + + const loadResult = await this.arenaLoader.loadAgents(payload.agentIds ?? null) + if(!loadResult.ok) { + console.error(`[GPS] onYourMarks load failed: ${loadResult.err}`) + await this.publishReadyToStart({ success: false, err: loadResult.err }) + return + } + + for(const agent of loadResult.agents) { + this.agents.set(agent.id, agent) + } + + if(Array.isArray(payload.agentIds) && payload.agentIds.length) { + const mismatch = validateAgentSets(payload.agentIds, [...this.agents.keys()]) + if(mismatch) { + console.error(`[GPS] onYourMarks agent mismatch: ${mismatch}`) + await this.resetSimulation() + await this.publishReadyToStart({ success: false, err: mismatch }) + return + } + } + + this.runInitialPairScan() + this.state = SimState.PREPARE + if(this.debug) console.log(`[GPS] PREPARE: ${this.agents.size} agent(s), simulationId=${this.simulationId}`) + await this.publishReadyToStart({ success: true }) + } + + async onBigBang(payload = {}) { + if(this.state !== SimState.PREPARE) { + console.error(`[GPS] bigBang rejected: expected PREPARE, got ${this.state}`) + return + } + if(!payload.simulationId || payload.simulationId !== this.simulationId) { + console.error('[GPS] bigBang rejected: simulationId mismatch') + return + } + if(Array.isArray(payload.agentIds) && payload.agentIds.length) { + const mismatch = validateAgentSets(payload.agentIds, [...this.agents.keys()]) + if(mismatch) { + console.error(`[GPS] bigBang agent mismatch: ${mismatch}`) + return + } + } + + this.bigBangEpoch = performance.now() + this.state = SimState.LIVE + + for(const agent of this.agents.values()) { + await this.agentStore?.exportSegment(agent, 'start', 0, this.simulationId) + } + + if(this.debug) console.log(`[GPS] LIVE: bigBangEpoch set, simulationId=${this.simulationId}`) + } + upsertAgent(agentId, newVector, newPosition = null) { const now = this.now() let agent = this.agents.get(agentId) @@ -131,7 +290,7 @@ export class gpsServer { generation: 1, } this.agents.set(agentId, agent) - this.agentStore?.exportSegment(agent, 'change') + this.agentStore?.exportSegment(agent, 'change', now, this.simulationId) return(agent) } @@ -140,7 +299,7 @@ export class gpsServer { agent.since = now agent.generation = (agent.generation ?? 0) + 1 if(newPosition) agent.position = { ...newPosition } - this.agentStore?.exportSegment(agent, 'change') + this.agentStore?.exportSegment(agent, 'change', now, this.simulationId) return(agent) } @@ -163,7 +322,7 @@ export class gpsServer { const hits = this.scanAgentPairs(agentId, refreshed) for(const hit of hits) this.registry.add(hit) - this.agentStore?.exportSegment(agent, 'refresh') + this.agentStore?.exportSegment(agent, 'refresh', now, this.simulationId) if(this.debug) console.log(`[GPS] Prism refresh: ${agentId}`) return(true) } @@ -196,6 +355,10 @@ export class gpsServer { } onVectorChange(agentId, newVector, newPosition = null) { + if(!this.isLive()) { + if(this.isPrepare()) this.ignoredChangeCount++ + return([]) + } this.registry.purge(agentId) this.upsertAgent(agentId, newVector, newPosition) const hits = this.scanAgentPairs(agentId) @@ -205,9 +368,10 @@ export class gpsServer { } onAgentRemove(agentId) { + if(!this.isLive()) return this.agents.delete(agentId) this.registry.purge(agentId) - this.agentStore?.exportRemove(agentId) + this.agentStore?.exportRemove(agentId, this.now()) if(this.debug) console.log(`[GPS] Agent removed: ${agentId}`) } @@ -226,9 +390,9 @@ export class gpsServer { byAgent.get(agentId).push({ otherAgent, distance: entry.distance, - at: new Date(entry.time * 1000).toISOString(), + t: entry.time, minDistance: entry.minDistance, - minAt: new Date(entry.minTime * 1000).toISOString(), + minT: entry.minTime, }) } } @@ -240,13 +404,18 @@ export class gpsServer { const chan = this.arenaCnx.config.gps.collisionsChannel.replace(/\[UID\]/g, targetAgentId) await this.arenaCnx.redisPublish(chan, { eventType: 'proximity', - payload: { pairs }, - sender: 'gps', + payload: { + pairs, + simulationId: this.simulationId, + }, + sender: this.getLifecycleSettings().senderId, }) } async tickCollisions() { - const due = this.registry.dueBefore(this.now()) + const now = this.now() + if(now === null) return + const due = this.registry.dueBefore(now) if(!due.length) return const batches = this.buildProximityBatches(due) @@ -257,6 +426,7 @@ export class gpsServer { } tickArena() { + if(!this.isLive()) return this.tickPrismRefresh() this.tickCollisions() } diff --git a/GPS/p42Gps.js b/GPS/p42Gps.js index 32fb32b..838e9d8 100644 --- a/GPS/p42Gps.js +++ b/GPS/p42Gps.js @@ -2,9 +2,16 @@ import yargs from 'yargs/yargs' import { hideBin } from 'yargs/helpers' import 'node:process' -import {RedisConnexion} from './redisConnexion.js' +import {RedisConnexion} from '../redisConnexion.js' import {configHelper} from '../configHelper.js' import {gpsServer} from './gpsServer.js' +import * as systemMesh from './actions/system/index.js' +import * as arenaMesh from './actions/arena/index.js' + +const meshModules = { + system: systemMesh, + arena: arenaMesh, +} ///////////////////////////// Little improvement on console.xxx ///////////////////////////////////// @@ -52,6 +59,7 @@ function meshRedisConns(mesh, meshName, debug, rootConfig) { config: { ...cfg, ...meshConfig, gps: rootConfig.gps }, redisId: cfg.redisId, meshName, + meshModule: meshModules[meshName], }) ) } diff --git a/GPS/simulationState.js b/GPS/simulationState.js new file mode 100644 index 0000000..35b630a --- /dev/null +++ b/GPS/simulationState.js @@ -0,0 +1,23 @@ +export const SimState = { + IDLE: 'idle', + PREPARE: 'prepare', + LIVE: 'live', +} + +export function validateAgentSets(expected, found) { + if(!Array.isArray(expected) || !Array.isArray(found)) { + return('agentIds must be arrays') + } + const exp = new Set(expected) + const fnd = new Set(found) + if(exp.size !== fnd.size) { + return(`Agent count mismatch: expected ${exp.size}, found ${fnd.size}`) + } + for(const id of exp) { + if(!fnd.has(id)) return(`Missing agent: ${id}`) + } + for(const id of fnd) { + if(!exp.has(id)) return(`Unexpected agent: ${id}`) + } + return(null) +} diff --git a/GPS/startGps.sh b/GPS/startGps.sh index 323d831..c3731d0 100755 --- a/GPS/startGps.sh +++ b/GPS/startGps.sh @@ -1,13 +1,33 @@ #!/bin/sh -cd /opt/p42GodDaemons/GPS/ +. /etc/p42/secrets.env + +daemon=p42Gps +logfile=gps.log + + +pid=$(pgrep -f "$daemon") -pid=`ps -ef | grep p42Gps |grep -v grep | awk '{print $2}'` if [ -z "$pid" ] then - node p42Gps.js --debug > gps.log 2>&1 & + node "${daemon}.js" --debug > "$logfile" 2>&1 & + pid=$! + + sleep 1 + + if kill -0 "$pid" 2>/dev/null + then + echo "" + echo "$daemon is now running with PID=$pid" + echo "" + else + echo "" + echo "Failed to start $daemon. Check gps.log" + echo "" + fi else - echo '' - echo 'Already running PID='"$pid"' (use stopGps.sh to stop it)' - echo '' + echo "" + echo "$daemon is already running with PID=$pid" + echo "" fi + diff --git a/Observer/actions/arena/arenaHandlers.js b/Observer/actions/arena/arenaHandlers.js new file mode 100644 index 0000000..7c42d3a --- /dev/null +++ b/Observer/actions/arena/arenaHandlers.js @@ -0,0 +1,12 @@ + +export const construct = (redisCnx) => { +} + +export const methods = { + + dispatchArenaMessage(msg, chan) { + if(this.debug) console.log(`[${this.redisId}] Arena message (unhandled):`, msg.eventType, chan) + return(false) + }, + +} diff --git a/Observer/actions/arena/dispatch.js b/Observer/actions/arena/dispatch.js new file mode 100644 index 0000000..a70d6d8 --- /dev/null +++ b/Observer/actions/arena/dispatch.js @@ -0,0 +1,5 @@ + +export function dispatchMessage(redisCnx, msg, chan) { + if(typeof(redisCnx.dispatchArenaMessage) !== 'function') return + redisCnx.dispatchArenaMessage(msg, chan) +} diff --git a/Observer/actions/arena/index.js b/Observer/actions/arena/index.js new file mode 100644 index 0000000..c0d66bc --- /dev/null +++ b/Observer/actions/arena/index.js @@ -0,0 +1,12 @@ +import { methods as arenaMethods, construct as arenaConstruct } from './arenaHandlers.js' +import { dispatchMessage } from './dispatch.js' + +export const afterLoginMethods = [ + arenaConstruct, +] + +export const meshActions = { + ...arenaMethods, +} + +export { dispatchMessage } diff --git a/Observer/actions/system/dispatch.js b/Observer/actions/system/dispatch.js new file mode 100644 index 0000000..e5ca12b --- /dev/null +++ b/Observer/actions/system/dispatch.js @@ -0,0 +1,28 @@ + +export function dispatchMessage(redisCnx, msg, chan) { + const observer = redisCnx.config.observer + if(!observer?.observerActionsChannel) return + + const actionsChan = redisCnx.fullChan(observer.observerActionsChannel) + if(chan != actionsChan) return + + const action = msg.action + if(!action || typeof(action) !== 'string') { + console.warn(`[${redisCnx.redisId}] Ignoring message without action on ${chan}`) + return + } + + const handler = redisCnx['action_'+action] + if(typeof(handler) != 'function') { + if(redisCnx.debug) console.warn(`[${redisCnx.redisId}] Unknown action ${action} on ${chan}`) + return + } + + const payload = ('payload' in msg) ? msg.payload : null + const reqid = ('reqid' in msg) ? msg.reqid.substr(0, 50) : null + const sender = msg.sender || null + const roles = Array.isArray(msg.roles) ? msg.roles : ['*'] + + if(redisCnx.debug) console.log(`[${redisCnx.redisId}] Dispatching action ${action} from ${sender}`) + handler.call(redisCnx, action, payload, reqid, sender, roles) +} diff --git a/Observer/actions/system/index.js b/Observer/actions/system/index.js new file mode 100644 index 0000000..6b282da --- /dev/null +++ b/Observer/actions/system/index.js @@ -0,0 +1,12 @@ +import { methods as utilities, construct as utilitiesConstruct } from './utilities.js' +import { dispatchMessage } from './dispatch.js' + +export const afterLoginMethods = [ + utilitiesConstruct, +] + +export const meshActions = { + ...utilities, +} + +export { dispatchMessage } diff --git a/Observer/actions/system/utilities.js b/Observer/actions/system/utilities.js new file mode 100644 index 0000000..1949d7c --- /dev/null +++ b/Observer/actions/system/utilities.js @@ -0,0 +1,48 @@ +import { publishActionReply } from '../../actionsHelper.js' + +export const construct = (redisCnx) => { +} + +export const methods = { + + async action_RELOADCONFIG(action, payload, reqid, sender, roles) { + const replyOpts = { + action, + reqid, + sender, + replyChannel: this.config.observer.observerActionsReply, + } + if(!this.accessRights.canDo(roles, action)) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Unauthorized action !', + } }) + return + } + this.reloadAccessRights() + publishActionReply(this, { ...replyOpts, reply: { + success: true, + } }) + }, + + async action_GETCONFIG(action, payload, reqid, sender, roles) { + const replyOpts = { + action, + reqid, + sender, + replyChannel: this.config.observer.observerActionsReply, + } + if(!this.accessRights.canDo(roles, action)) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Unauthorized action !', + } }) + return + } + publishActionReply(this, { ...replyOpts, reply: { + success: true, + payload: this.getAccessRights(), + } }) + }, + +} diff --git a/Observer/actionsHelper.js b/Observer/actionsHelper.js new file mode 100644 index 0000000..10274e2 --- /dev/null +++ b/Observer/actionsHelper.js @@ -0,0 +1,16 @@ + +export function publishActionReply(redisCnx, options) { + const { + action, + reqid, + sender, + reply, + replyChannel, + senderId = 'observer', + } = options + reply.action = action + reply.sender = senderId + if(reqid) reply.reqid = reqid + const chan = replyChannel.replace(/\[UID\]/g, sender) + redisCnx.redisPublish(chan, reply) +} diff --git a/Observer/observerServer.js b/Observer/observerServer.js new file mode 100644 index 0000000..fc1f480 --- /dev/null +++ b/Observer/observerServer.js @@ -0,0 +1,55 @@ +import { AccesRights } from '../accesRights.js' + +export class observerServer { + + constructor(configHelper, allRediscnx, debug) { + this.configHelper = configHelper + this.observerConfig = configHelper.config + this.allRediscnx = allRediscnx + this.debug = debug + this.accessRights = new AccesRights(this.observerConfig, debug) + this.arenaCnx = null + this.arenaCnxs = [] + this.systemCnx = null + } + + getObserverSettings() { + const observer = this.observerConfig.observer ?? {} + return({ + senderId: observer.senderId ?? 'observer', + lifecycle: { + arenaChannel: observer.lifecycle?.arenaChannel ?? 'arena:lifecycle', + godsReadyChannel: observer.lifecycle?.godsReadyChannel ?? 'arena:gods:ready', + }, + }) + } + + wireSystemConnexion(cnx) { + cnx.observerSrv = this + cnx.accessRights = this.accessRights + cnx.reloadAccessRights = () => this.reloadAccessRights() + cnx.getAccessRights = () => this.getAccessRights() + if(!this.systemCnx || cnx.redisConfig.role === 'primary') { + this.systemCnx = cnx + } + } + + wireArenaConnexion(cnx) { + cnx.observerSrv = this + this.arenaCnxs.push(cnx) + if(!this.arenaCnx || cnx.redisConfig.role === 'primary') { + this.arenaCnx = cnx + } + } + + async reloadAccessRights() { + await this.configHelper.refreshAccessRights() + this.observerConfig.accessRights = this.configHelper.config.accessRights + this.accessRights.refreshAccessRights(this.observerConfig) + } + + getAccessRights() { + return(this.observerConfig.accessRights) + } + +} diff --git a/Observer/p42Observer.js b/Observer/p42Observer.js new file mode 100644 index 0000000..d76ad4e --- /dev/null +++ b/Observer/p42Observer.js @@ -0,0 +1,131 @@ + +import yargs from 'yargs/yargs' +import { hideBin } from 'yargs/helpers' +import 'node:process' +import { RedisConnexion } from '../redisConnexion.js' +import { configHelper } from '../configHelper.js' +import { observerServer } from './observerServer.js' +import * as systemMesh from './actions/system/index.js' +import * as arenaMesh from './actions/arena/index.js' + +const meshModules = { + system: systemMesh, + arena: arenaMesh, +} + +const originalLog = console.log +const originalWarn = console.warn +const originalError = console.error +function logWithTimestamp(originalFn, level, ...args) { + const timestamp = new Date().toISOString() + originalFn(`[${timestamp}] [${level}]`, ...args) +} + +console.log = (...args) => logWithTimestamp(originalLog, 'LOG', ...args) +console.warn = (...args) => logWithTimestamp(originalWarn, 'WARN', ...args) +console.error = (...args) => logWithTimestamp(originalError, 'ERROR', ...args) + +const argv = yargs(hideBin(process.argv)).command('Observer', 'Simulation observer for P42', {}) + .options({ + 'debug': { + description: 'shows debug info', + alias: 'd', + defaut: false, + type: 'boolean' + }, + 'config': { + description: 'Points to config file (default: ../config.json)', + alias: 'c', + default: '../config.json', + type: 'string' + }, + }).help().version('1.0').argv + +const debug = Boolean(process.env.DEBUG) || argv.debug + +let cfgh = new configHelper({ + localfile: argv.config, +}) + +function meshRedisConns(mesh, meshName, debug, rootConfig) { + const { redis, ...meshConfig } = mesh + return redis.map(cfg => + new RedisConnexion({ + debug, + config: { ...cfg, ...meshConfig, observer: rootConfig.observer }, + redisId: cfg.redisId, + meshName, + meshModule: meshModules[meshName], + }) + ) +} + +async function startAllRedis(rootConfig, cfgh) { + if(debug) console.log('Starting all Redis instances...') + + const redisConns = [ + ...meshRedisConns(rootConfig.systemMesh, 'system', debug, rootConfig), + ...meshRedisConns(rootConfig.arenaMesh, 'arena', debug, rootConfig), + ] + + const srv = new observerServer(cfgh, redisConns, debug) + for(const cnx of redisConns) { + if(cnx.meshName === 'system') srv.wireSystemConnexion(cnx) + else if(cnx.meshName === 'arena') srv.wireArenaConnexion(cnx) + } + + const loginResults = await Promise.allSettled( + redisConns.map(async cnx => { + await cnx.redisLogin() + return(cnx.redisId) + }) + ) + + const failedLogin = loginResults.filter(r => r.status !== 'fulfilled') + if(failedLogin.length > 0) { + console.error('Redis login failures:') + failedLogin.forEach((r, i) => { + const id = redisConns[i].redisId + console.error(`login failed for redis:[${id}] → ${r.reason}`) + }) + throw new Error( + `Redis login failed for ${failedLogin.length}/${redisConns.length} instances` + ) + } + + if(debug) console.log('All Redis logins OK') + + const chanResults = await Promise.allSettled( + redisConns.map(async cnx => { + await cnx.redisChansStart() + return(cnx.redisId) + }) + ) + + const failedChans = chanResults.filter(r => r.status !== 'fulfilled') + if(failedChans.length > 0) { + console.error('Redis chansStart failures:') + failedChans.forEach((r, i) => { + const id = redisConns[i].redisId + console.error(`chansStart failed for redis:[${id}] → ${r.reason}`) + }) + throw new Error( + `Redis chansStart failed for ${failedChans.length}/${redisConns.length} instances` + ) + } + + if(debug) console.log('All Redis chansStart OK') + + return({ redisConns, srv }) +} + +cfgh.fetchConfig().then(async rootConfig => { + if(!rootConfig) { + console.error('Cannot get a valid configuration ! Aaarrghhh...') + process.exit() + } + + console.log(`Debug mode : ${debug ? 'ON' : 'OFF'}`) + + await startAllRedis(rootConfig, cfgh) +}) diff --git a/Observer/package.json b/Observer/package.json new file mode 100644 index 0000000..3f9b558 --- /dev/null +++ b/Observer/package.json @@ -0,0 +1,9 @@ +{ + "name": "p42Observer", + "version": "1.0.0", + "description": "Simulation observer God-daemon for P42", + "type": "module", + "dependencies": { + "yargs": "^17.7.2" + } +} diff --git a/Observer/startObserver.sh b/Observer/startObserver.sh new file mode 100755 index 0000000..2573af3 --- /dev/null +++ b/Observer/startObserver.sh @@ -0,0 +1,31 @@ +#!/bin/sh + +. /etc/p42/secrets.env + +daemon=p42Observer +logfile=observer.log + +pid=$(pgrep -f "$daemon") + +if [ -z "$pid" ] +then + node "${daemon}.js" --debug > "$logfile" 2>&1 & + pid=$! + + sleep 1 + + if kill -0 "$pid" 2>/dev/null + then + echo "" + echo "$daemon is now running with PID=$pid" + echo "" + else + echo "" + echo "Failed to start $daemon. Check observer.log" + echo "" + fi +else + echo "" + echo "$daemon is already running with PID=$pid" + echo "" +fi diff --git a/Observer/stopObserver.sh b/Observer/stopObserver.sh new file mode 100755 index 0000000..0fc2704 --- /dev/null +++ b/Observer/stopObserver.sh @@ -0,0 +1,8 @@ +#!/bin/sh + +pid=`ps -ef | grep p42Observer.js |grep -v grep | awk '{print $2}'` +if [ -n "$pid" ] +then + echo "killing pid: $pid" + kill -9 $pid +fi diff --git a/SimMaestro/actions/arena/arenaHandlers.js b/SimMaestro/actions/arena/arenaHandlers.js new file mode 100644 index 0000000..6f2d418 --- /dev/null +++ b/SimMaestro/actions/arena/arenaHandlers.js @@ -0,0 +1,22 @@ + +export const construct = (redisCnx) => { +} + +export const methods = { + + dispatchArenaMessage(msg, chan) { + const maestro = this.config.maestro + if(!maestro || !this.maestroSrv) return(false) + + if(this.matchesChan(chan, maestro.lifecycle?.godsReadyChannel ?? 'arena:gods:ready')) { + if(msg.eventType === 'readyToStart') { + this.maestroSrv.onReadyToStart(msg) + return(true) + } + } + + if(this.debug) console.log(`[${this.redisId}] Arena message (unhandled):`, msg.eventType, chan) + return(false) + }, + +} diff --git a/SimMaestro/actions/arena/dispatch.js b/SimMaestro/actions/arena/dispatch.js new file mode 100644 index 0000000..a70d6d8 --- /dev/null +++ b/SimMaestro/actions/arena/dispatch.js @@ -0,0 +1,5 @@ + +export function dispatchMessage(redisCnx, msg, chan) { + if(typeof(redisCnx.dispatchArenaMessage) !== 'function') return + redisCnx.dispatchArenaMessage(msg, chan) +} diff --git a/SimMaestro/actions/arena/index.js b/SimMaestro/actions/arena/index.js new file mode 100644 index 0000000..c0d66bc --- /dev/null +++ b/SimMaestro/actions/arena/index.js @@ -0,0 +1,12 @@ +import { methods as arenaMethods, construct as arenaConstruct } from './arenaHandlers.js' +import { dispatchMessage } from './dispatch.js' + +export const afterLoginMethods = [ + arenaConstruct, +] + +export const meshActions = { + ...arenaMethods, +} + +export { dispatchMessage } diff --git a/SimMaestro/actions/system/dispatch.js b/SimMaestro/actions/system/dispatch.js new file mode 100644 index 0000000..cc409c2 --- /dev/null +++ b/SimMaestro/actions/system/dispatch.js @@ -0,0 +1,28 @@ + +export function dispatchMessage(redisCnx, msg, chan) { + const maestro = redisCnx.config.maestro + if(!maestro?.maestroActionsChannel) return + + const actionsChan = redisCnx.fullChan(maestro.maestroActionsChannel) + if(chan != actionsChan) return + + const action = msg.action + if(!action || typeof(action) !== 'string') { + console.warn(`[${redisCnx.redisId}] Ignoring message without action on ${chan}`) + return + } + + const handler = redisCnx['action_'+action] + if(typeof(handler) != 'function') { + if(redisCnx.debug) console.warn(`[${redisCnx.redisId}] Unknown action ${action} on ${chan}`) + return + } + + const payload = ('payload' in msg) ? msg.payload : null + const reqid = ('reqid' in msg) ? msg.reqid.substr(0, 50) : null + const sender = msg.sender || null + const roles = Array.isArray(msg.roles) ? msg.roles : ['*'] + + if(redisCnx.debug) console.log(`[${redisCnx.redisId}] Dispatching action ${action} from ${sender}`) + handler.call(redisCnx, action, payload, reqid, sender, roles) +} diff --git a/SimMaestro/actions/system/index.js b/SimMaestro/actions/system/index.js new file mode 100644 index 0000000..e42a00f --- /dev/null +++ b/SimMaestro/actions/system/index.js @@ -0,0 +1,14 @@ +import { methods as utilities, construct as utilitiesConstruct } from './utilities.js' +import { methods as simulation } from './simulation.js' +import { dispatchMessage } from './dispatch.js' + +export const afterLoginMethods = [ + utilitiesConstruct, +] + +export const meshActions = { + ...utilities, + ...simulation, +} + +export { dispatchMessage } diff --git a/SimMaestro/actions/system/simulation.js b/SimMaestro/actions/system/simulation.js new file mode 100644 index 0000000..90e5ed0 --- /dev/null +++ b/SimMaestro/actions/system/simulation.js @@ -0,0 +1,138 @@ +import { publishActionReply } from '../../actionsHelper.js' +import { isValidUuid } from '../../simRepository.js' + +export const methods = { + + /* Event-Rx: + { + "action": "STARTSIMULATION", + "reqid": "6az5e4r6a", + "sender": "", + "roles": ["*"], + "payload": { + "simulationUuid": "...", + "keyframeId": "...", + "infraId": "..." + } + } + */ + async action_STARTSIMULATION(action, payload, reqid, sender, roles) { + const replyOpts = { + action, + reqid, + sender, + replyChannel: this.config.maestro.maestroActionsReply, + } + if(!this.accessRights.canDo(roles, action)) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Unauthorized action !', + } }) + return + } + + if(!sender || !isValidUuid(sender)) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Missing or invalid sender (user UUID)', + } }) + return + } + + if(!payload?.simulationUuid || !payload?.keyframeId) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Missing simulationUuid or keyframeId', + } }) + return + } + + try { + const result = await this.maestroSrv.startSimulation(sender, payload) + if(!result.ok) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: result.err, + } }) + return + } + publishActionReply(this, { ...replyOpts, reply: { + success: true, + payload: { + simulationId: result.simulationId, + keyframeId: result.keyframeId, + infraId: result.infraId, + agentIds: result.agentIds, + }, + } }) + } catch(err) { + console.error(`[${this.redisId}] STARTSIMULATION failed:`, err) + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: err.message ?? 'STARTSIMULATION failed', + } }) + } + }, + + /* Event-Rx: + { + "action": "STOPSIMULATION", + "reqid": "6az5e4r6a", + "sender": "", + "payload": { "simulationUuid": "..." } + } + */ + async action_STOPSIMULATION(action, payload, reqid, sender, roles) { + const replyOpts = { + action, + reqid, + sender, + replyChannel: this.config.maestro.maestroActionsReply, + } + if(!this.accessRights.canDo(roles, action)) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Unauthorized action !', + } }) + return + } + + if(!sender || !isValidUuid(sender)) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Missing or invalid sender (user UUID)', + } }) + return + } + + if(!payload?.simulationUuid) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Missing simulationUuid', + } }) + return + } + + try { + const result = await this.maestroSrv.stopSimulation(sender, payload) + if(!result.ok) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: result.err, + } }) + return + } + publishActionReply(this, { ...replyOpts, reply: { + success: true, + payload: { simulationId: result.simulationId }, + } }) + } catch(err) { + console.error(`[${this.redisId}] STOPSIMULATION failed:`, err) + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: err.message ?? 'STOPSIMULATION failed', + } }) + } + }, + +} diff --git a/SimMaestro/actions/system/utilities.js b/SimMaestro/actions/system/utilities.js new file mode 100644 index 0000000..ee7d2fa --- /dev/null +++ b/SimMaestro/actions/system/utilities.js @@ -0,0 +1,48 @@ +import { publishActionReply } from '../../actionsHelper.js' + +export const construct = (redisCnx) => { +} + +export const methods = { + + async action_RELOADCONFIG(action, payload, reqid, sender, roles) { + const replyOpts = { + action, + reqid, + sender, + replyChannel: this.config.maestro.maestroActionsReply, + } + if(!this.accessRights.canDo(roles, action)) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Unauthorized action !', + } }) + return + } + this.reloadAccessRights() + publishActionReply(this, { ...replyOpts, reply: { + success: true, + } }) + }, + + async action_GETCONFIG(action, payload, reqid, sender, roles) { + const replyOpts = { + action, + reqid, + sender, + replyChannel: this.config.maestro.maestroActionsReply, + } + if(!this.accessRights.canDo(roles, action)) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Unauthorized action !', + } }) + return + } + publishActionReply(this, { ...replyOpts, reply: { + success: true, + payload: this.getAccessRights(), + } }) + }, + +} diff --git a/SimMaestro/actionsHelper.js b/SimMaestro/actionsHelper.js new file mode 100644 index 0000000..e7701fa --- /dev/null +++ b/SimMaestro/actionsHelper.js @@ -0,0 +1,16 @@ + +export function publishActionReply(redisCnx, options) { + const { + action, + reqid, + sender, + reply, + replyChannel, + senderId = 'maestro', + } = options + reply.action = action + reply.sender = senderId + if(reqid) reply.reqid = reqid + const chan = replyChannel.replace(/\[UID\]/g, sender) + redisCnx.redisPublish(chan, reply) +} diff --git a/SimMaestro/arenaGroom.js b/SimMaestro/arenaGroom.js new file mode 100644 index 0000000..0cd889f --- /dev/null +++ b/SimMaestro/arenaGroom.js @@ -0,0 +1,33 @@ + +export class ArenaGroom { + + constructor(arenaCnx, storage, debug = false) { + this.cnx = arenaCnx + this.storage = storage + this.debug = debug + } + + agentHashKey(agentId) { + return(this.storage.agentHashKey.replace(/\[UID\]/g, agentId)) + } + + async clearArena() { + const ids = await this.cnx.redisSmembers(this.storage.agentsIndexKey) + for(const id of ids) { + await this.cnx.redisDel(this.agentHashKey(id)) + } + await this.cnx.redisDel(this.storage.agentsIndexKey) + if(this.debug) console.log(`[Maestro] Cleared arena store (${ids.length} agent(s))`) + } + + async seedAgents(agents) { + for(const agent of agents) { + const key = this.agentHashKey(agent.id) + await this.cnx.redisHset(key, 'position', agent.position) + await this.cnx.redisHset(key, 'vector', agent.vector) + await this.cnx.redisSadd(this.storage.agentsIndexKey, agent.id) + } + if(this.debug) console.log(`[Maestro] Groomed ${agents.length} agent(s) into arena store`) + } + +} diff --git a/SimMaestro/maestroServer.js b/SimMaestro/maestroServer.js new file mode 100644 index 0000000..36601f0 --- /dev/null +++ b/SimMaestro/maestroServer.js @@ -0,0 +1,264 @@ +import { AccesRights } from '../accesRights.js' +import { createMysqlPool } from '../mysqlClient.js' +import { SimRepository } from './simRepository.js' +import { ArenaGroom } from './arenaGroom.js' +import { MaestroState } from './orchestrationState.js' + +export class maestroServer { + + constructor(configHelper, allRediscnx, debug) { + this.configHelper = configHelper + this.maestroConfig = configHelper.config + this.allRediscnx = allRediscnx + this.debug = debug + this.accessRights = new AccesRights(this.maestroConfig, debug) + this.arenaCnx = null + this.arenaCnxs = [] + this.systemCnx = null + this.db = null + this.simRepo = null + this.arenaGroom = null + this.orchestrationState = MaestroState.IDLE + this.simulationId = null + this.agentIds = [] + this.readyGods = new Map() + this.readyQuorumResolve = null + this.readyQuorumTimer = null + } + + getMaestroSettings() { + const maestro = this.maestroConfig.maestro ?? {} + return({ + senderId: maestro.senderId ?? 'maestro', + lifecycle: { + arenaChannel: maestro.lifecycle?.arenaChannel ?? 'arena:lifecycle', + godsReadyChannel: maestro.lifecycle?.godsReadyChannel ?? 'arena:gods:ready', + }, + expectedGods: maestro.expectedGods ?? ['gps'], + readyTimeoutMs: maestro.readyTimeoutMs ?? 30000, + }) + } + + getArenaStorageSettings() { + const gps = this.maestroConfig.gps ?? {} + return({ + agentHashKey: gps.arenaStorage?.agentHashKey ?? 'arena:agents:[UID]', + agentsIndexKey: gps.arenaStorage?.agentsIndexKey ?? 'arena:agents', + }) + } + + isIdle() { + return(this.orchestrationState === MaestroState.IDLE) + } + + isLive() { + return(this.orchestrationState === MaestroState.LIVE) + } + + async init() { + const mysqlCfg = this.maestroConfig.mysql + if(!mysqlCfg) { + console.error('[Maestro] Missing mysql config') + return(false) + } + this.db = await createMysqlPool(mysqlCfg) + this.simRepo = new SimRepository(this.db, this.debug) + if(this.debug) console.log('[Maestro] MySQL pool ready') + return(true) + } + + refreshArenaGroom() { + if(this.arenaCnx) { + this.arenaGroom = new ArenaGroom( + this.arenaCnx, + this.getArenaStorageSettings(), + this.debug + ) + } + } + + wireSystemConnexion(cnx) { + cnx.maestroSrv = this + cnx.accessRights = this.accessRights + cnx.reloadAccessRights = () => this.reloadAccessRights() + cnx.getAccessRights = () => this.getAccessRights() + if(!this.systemCnx || cnx.redisConfig.role === 'primary') { + this.systemCnx = cnx + } + } + + wireArenaConnexion(cnx) { + cnx.maestroSrv = this + this.arenaCnxs.push(cnx) + if(!this.arenaCnx || cnx.redisConfig.role === 'primary') { + this.arenaCnx = cnx + this.refreshArenaGroom() + } + } + + resetOrchestration() { + this.orchestrationState = MaestroState.IDLE + this.simulationId = null + this.agentIds = [] + this.readyGods.clear() + this.clearReadyQuorumWait() + } + + clearReadyQuorumWait() { + if(this.readyQuorumTimer) { + clearTimeout(this.readyQuorumTimer) + this.readyQuorumTimer = null + } + this.readyQuorumResolve = null + } + + completeReadyQuorum(result) { + const resolve = this.readyQuorumResolve + this.clearReadyQuorumWait() + if(typeof(resolve) === 'function') resolve(result) + } + + waitForReadyQuorum() { + const { readyTimeoutMs } = this.getMaestroSettings() + return(new Promise(resolve => { + this.readyQuorumResolve = resolve + this.readyQuorumTimer = setTimeout(() => { + this.completeReadyQuorum({ + ok: false, + err: `Timeout waiting for readyToStart (${readyTimeoutMs}ms)`, + }) + }, readyTimeoutMs) + })) + } + + onReadyToStart(msg) { + if(this.orchestrationState !== MaestroState.PREPARING) return + + const payload = msg.payload ?? {} + if(payload.simulationId !== this.simulationId) return + + const sender = msg.sender + const { expectedGods } = this.getMaestroSettings() + if(!expectedGods.includes(sender)) { + if(this.debug) console.warn(`[Maestro] Ignoring readyToStart from unexpected sender: ${sender}`) + return + } + + if(!payload.success) { + this.completeReadyQuorum({ + ok: false, + err: payload.err ?? `Participant ${sender} failed prepare`, + }) + return + } + + this.readyGods.set(sender, payload) + if(this.debug) console.log(`[Maestro] readyToStart from ${sender} (${this.readyGods.size}/${expectedGods.length})`) + + for(const god of expectedGods) { + if(!this.readyGods.has(god)) return + } + + this.completeReadyQuorum({ ok: true }) + } + + async publishLifecycle(eventType, payload) { + if(!this.arenaCnx) throw new Error('No arena Redis connection') + const { arenaChannel, senderId } = this.getMaestroSettings().lifecycle + await this.arenaCnx.redisPublish(arenaChannel, { + eventType, + sender: senderId, + payload, + }) + if(this.debug) console.log(`[Maestro] Published ${eventType} simulationId=${payload.simulationId}`) + } + + async startSimulation(userUuid, payload) { + if(!this.simRepo) return({ ok: false, err: 'Database not initialized' }) + if(!this.arenaGroom) return({ ok: false, err: 'No arena Redis connection' }) + if(!this.isIdle()) return({ ok: false, err: 'A simulation is already in progress' }) + + const simulationUuid = payload?.simulationUuid + const keyframeId = payload?.keyframeId + const infraId = payload?.infraId ?? null + + const access = await this.simRepo.validateSimulationAccess(userUuid, simulationUuid, keyframeId) + if(!access.ok) return(access) + + const agentsResult = await this.simRepo.loadKeyframeAgents(keyframeId) + if(!agentsResult.ok) return(agentsResult) + + await this.arenaGroom.clearArena() + await this.arenaGroom.seedAgents(agentsResult.agents) + + this.simulationId = simulationUuid + this.agentIds = agentsResult.agents.map(a => a.id) + this.readyGods.clear() + this.orchestrationState = MaestroState.PREPARING + + const lifecyclePayload = { + simulationId: this.simulationId, + agentIds: this.agentIds, + keyframeId, + infraId, + } + + const readyWait = this.waitForReadyQuorum() + + await this.publishLifecycle('onYourMarks', lifecyclePayload) + + const quorum = await readyWait + if(!quorum.ok) { + await this.arenaGroom.clearArena() + this.resetOrchestration() + return(quorum) + } + + await this.publishLifecycle('bigBang', { + simulationId: this.simulationId, + agentIds: this.agentIds, + }) + + this.orchestrationState = MaestroState.LIVE + if(this.debug) console.log(`[Maestro] LIVE simulationId=${this.simulationId}`) + + return({ + ok: true, + simulationId: this.simulationId, + keyframeId, + infraId, + agentIds: this.agentIds, + }) + } + + async stopSimulation(userUuid, payload) { + if(!this.simRepo) return({ ok: false, err: 'Database not initialized' }) + if(!this.arenaGroom) return({ ok: false, err: 'No arena Redis connection' }) + + const simulationUuid = payload?.simulationUuid + const access = await this.simRepo.validateSimulationOwner(userUuid, simulationUuid) + if(!access.ok) return(access) + + if(this.simulationId && this.simulationId !== simulationUuid) { + return({ ok: false, err: 'Another simulation is active' }) + } + + await this.arenaGroom.clearArena() + this.resetOrchestration() + + if(this.debug) console.log(`[Maestro] Stopped simulationId=${simulationUuid}`) + + return({ ok: true, simulationId: simulationUuid }) + } + + async reloadAccessRights() { + await this.configHelper.refreshAccessRights() + this.maestroConfig.accessRights = this.configHelper.config.accessRights + this.accessRights.refreshAccessRights(this.maestroConfig) + } + + getAccessRights() { + return(this.maestroConfig.accessRights) + } + +} diff --git a/SimMaestro/orchestrationState.js b/SimMaestro/orchestrationState.js new file mode 100644 index 0000000..25db8dd --- /dev/null +++ b/SimMaestro/orchestrationState.js @@ -0,0 +1,5 @@ +export const MaestroState = { + IDLE: 'idle', + PREPARING: 'preparing', + LIVE: 'live', +} diff --git a/SimMaestro/p42SimMaestro.js b/SimMaestro/p42SimMaestro.js new file mode 100644 index 0000000..e6d196e --- /dev/null +++ b/SimMaestro/p42SimMaestro.js @@ -0,0 +1,136 @@ + +import yargs from 'yargs/yargs' +import { hideBin } from 'yargs/helpers' +import 'node:process' +import { RedisConnexion } from '../redisConnexion.js' +import { configHelper } from '../configHelper.js' +import { maestroServer } from './maestroServer.js' +import * as systemMesh from './actions/system/index.js' +import * as arenaMesh from './actions/arena/index.js' + +const meshModules = { + system: systemMesh, + arena: arenaMesh, +} + +const originalLog = console.log +const originalWarn = console.warn +const originalError = console.error +function logWithTimestamp(originalFn, level, ...args) { + const timestamp = new Date().toISOString() + originalFn(`[${timestamp}] [${level}]`, ...args) +} + +console.log = (...args) => logWithTimestamp(originalLog, 'LOG', ...args) +console.warn = (...args) => logWithTimestamp(originalWarn, 'WARN', ...args) +console.error = (...args) => logWithTimestamp(originalError, 'ERROR', ...args) + +const argv = yargs(hideBin(process.argv)).command('SimMaestro', 'Simulation orchestrator for P42', {}) + .options({ + 'debug': { + description: 'shows debug info', + alias: 'd', + defaut: false, + type: 'boolean' + }, + 'config': { + description: 'Points to config file (default: ../config.json)', + alias: 'c', + default: '../config.json', + type: 'string' + }, + }).help().version('1.0').argv + +const debug = Boolean(process.env.DEBUG) || argv.debug + +let cfgh = new configHelper({ + localfile: argv.config, +}) + +function meshRedisConns(mesh, meshName, debug, rootConfig) { + const { redis, ...meshConfig } = mesh + return redis.map(cfg => + new RedisConnexion({ + debug, + config: { ...cfg, ...meshConfig, maestro: rootConfig.maestro }, + redisId: cfg.redisId, + meshName, + meshModule: meshModules[meshName], + }) + ) +} + +async function startAllRedis(rootConfig, cfgh) { + if(debug) console.log('Starting all Redis instances...') + + const redisConns = [ + ...meshRedisConns(rootConfig.systemMesh, 'system', debug, rootConfig), + ...meshRedisConns(rootConfig.arenaMesh, 'arena', debug, rootConfig), + ] + + const srv = new maestroServer(cfgh, redisConns, debug) + for(const cnx of redisConns) { + if(cnx.meshName === 'system') srv.wireSystemConnexion(cnx) + else if(cnx.meshName === 'arena') srv.wireArenaConnexion(cnx) + } + + const loginResults = await Promise.allSettled( + redisConns.map(async cnx => { + await cnx.redisLogin() + return(cnx.redisId) + }) + ) + + const failedLogin = loginResults.filter(r => r.status !== 'fulfilled') + if(failedLogin.length > 0) { + console.error('Redis login failures:') + failedLogin.forEach((r, i) => { + const id = redisConns[i].redisId + console.error(`login failed for redis:[${id}] → ${r.reason}`) + }) + throw new Error( + `Redis login failed for ${failedLogin.length}/${redisConns.length} instances` + ) + } + + if(debug) console.log('All Redis logins OK') + + const chanResults = await Promise.allSettled( + redisConns.map(async cnx => { + await cnx.redisChansStart() + return(cnx.redisId) + }) + ) + + const failedChans = chanResults.filter(r => r.status !== 'fulfilled') + if(failedChans.length > 0) { + console.error('Redis chansStart failures:') + failedChans.forEach((r, i) => { + const id = redisConns[i].redisId + console.error(`chansStart failed for redis:[${id}] → ${r.reason}`) + }) + throw new Error( + `Redis chansStart failed for ${failedChans.length}/${redisConns.length} instances` + ) + } + + if(debug) console.log('All Redis chansStart OK') + + return({ redisConns, srv }) +} + +cfgh.fetchConfig().then(async rootConfig => { + if(!rootConfig) { + console.error('Cannot get a valid configuration ! Aaarrghhh...') + process.exit() + } + + console.log(`Debug mode : ${debug ? 'ON' : 'OFF'}`) + + const { srv } = await startAllRedis(rootConfig, cfgh) + const dbOk = await srv.init() + if(!dbOk) { + console.error('Maestro MySQL init failed — exiting') + process.exit(1) + } +}) diff --git a/SimMaestro/package.json b/SimMaestro/package.json new file mode 100644 index 0000000..6f4566c --- /dev/null +++ b/SimMaestro/package.json @@ -0,0 +1,9 @@ +{ + "name": "p42SimMaestro", + "version": "1.0.0", + "description": "Simulation orchestrator God-daemon for P42", + "type": "module", + "dependencies": { + "yargs": "^17.7.2" + } +} diff --git a/SimMaestro/simRepository.js b/SimMaestro/simRepository.js new file mode 100644 index 0000000..6b412d1 --- /dev/null +++ b/SimMaestro/simRepository.js @@ -0,0 +1,116 @@ +import { mysqlExecute } from '../mysqlClient.js' + +const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i + +export function isValidUuid(val) { + return(typeof(val) === 'string' && UUID_RE.test(val)) +} + +function parseGpsValues(raw) { + let v = raw + if(v == null) return(null) + if(typeof(v) === 'string') { + try { v = JSON.parse(v) } + catch { return(null) } + } + if(typeof(v) !== 'object') return(null) + const position = v.position + const speed = v.speed ?? v.vector + if(!position || !speed) return(null) + const axes = ['x', 'y', 'z'] + for(const axis of axes) { + if(typeof(position[axis]) !== 'number' || typeof(speed[axis]) !== 'number') return(null) + } + return({ + position: { x: position.x, y: position.y, z: position.z }, + vector: { x: speed.x, y: speed.y, z: speed.z }, + }) +} + +export class SimRepository { + + constructor(dbPool, debug = false) { + this.db = dbPool + this.debug = debug + } + + async validateSimulationAccess(userUuid, simulationUuid, keyframeId) { + if(!isValidUuid(userUuid)) return({ ok: false, err: 'Invalid user UUID' }) + if(!isValidUuid(simulationUuid)) return({ ok: false, err: 'Invalid simulation UUID' }) + if(!isValidUuid(keyframeId)) return({ ok: false, err: 'Invalid keyframe ID' }) + + const rows = await mysqlExecute(this.db, ` + SELECT s.sim_id, + BIN_TO_UUID(s.sim_uuid) AS sim_uuid, + BIN_TO_UUID(s.sim_root_kf_uuid) AS sim_root_kf_uuid + FROM p42SIM.simulations s + INNER JOIN p42GUI.simowners o ON o.own_sim_uuid = s.sim_uuid + INNER JOIN p42GUI.users u ON o.own_usr_id = u.usr_id + WHERE u.usr_uuid = ? + AND s.sim_uuid = UUID_TO_BIN(?) + `, [userUuid, simulationUuid]) + + if(!rows.length) return({ ok: false, err: 'Simulation not found or access denied' }) + + const sim = rows[0] + if(sim.sim_root_kf_uuid !== keyframeId) { + return({ ok: false, err: 'Keyframe does not match simulation root keyframe' }) + } + + const kfRows = await mysqlExecute(this.db, ` + SELECT ekf_uuid + FROM p42SIM.edited_keyframes + WHERE ekf_uuid = UUID_TO_BIN(?) + `, [keyframeId]) + if(!kfRows.length) return({ ok: false, err: 'Keyframe not found' }) + + return({ ok: true, sim }) + } + + async validateSimulationOwner(userUuid, simulationUuid) { + if(!isValidUuid(userUuid)) return({ ok: false, err: 'Invalid user UUID' }) + if(!isValidUuid(simulationUuid)) return({ ok: false, err: 'Invalid simulation UUID' }) + + const rows = await mysqlExecute(this.db, ` + SELECT s.sim_id, + BIN_TO_UUID(s.sim_uuid) AS sim_uuid, + BIN_TO_UUID(s.sim_root_kf_uuid) AS sim_root_kf_uuid + FROM p42SIM.simulations s + INNER JOIN p42GUI.simowners o ON o.own_sim_uuid = s.sim_uuid + INNER JOIN p42GUI.users u ON o.own_usr_id = u.usr_id + WHERE u.usr_uuid = ? + AND s.sim_uuid = UUID_TO_BIN(?) + `, [userUuid, simulationUuid]) + + if(!rows.length) return({ ok: false, err: 'Simulation not found or access denied' }) + return({ ok: true, sim: rows[0] }) + } + + async loadKeyframeAgents(keyframeId) { + const rows = await mysqlExecute(this.db, ` + SELECT BIN_TO_UUID(ekfs_agent_id) AS agent_id, ekfs_gps_values + FROM p42SIM.edited_kf_store + WHERE ekfs_ekf_uuid = UUID_TO_BIN(?) + `, [keyframeId]) + + const agents = [] + const errors = [] + + for(const row of rows) { + const parsed = parseGpsValues(row.ekfs_gps_values) + if(!parsed) { + errors.push(`Invalid GPS values for agent ${row.agent_id}`) + continue + } + agents.push({ + id: row.agent_id, + position: parsed.position, + vector: parsed.vector, + }) + } + + if(errors.length) return({ ok: false, err: errors.join('; '), agents: [] }) + return({ ok: true, agents }) + } + +} diff --git a/SimMaestro/startMaestro.sh b/SimMaestro/startMaestro.sh new file mode 100755 index 0000000..559f4cd --- /dev/null +++ b/SimMaestro/startMaestro.sh @@ -0,0 +1,31 @@ +#!/bin/sh + +. /etc/p42/secrets.env + +daemon=p42SimMaestro +logfile=maestro.log + +pid=$(pgrep -f "$daemon") + +if [ -z "$pid" ] +then + node "${daemon}.js" --debug > "$logfile" 2>&1 & + pid=$! + + sleep 1 + + if kill -0 "$pid" 2>/dev/null + then + echo "" + echo "$daemon is now running with PID=$pid" + echo "" + else + echo "" + echo "Failed to start $daemon. Check maestro.log" + echo "" + fi +else + echo "" + echo "$daemon is already running with PID=$pid" + echo "" +fi diff --git a/SimMaestro/stopMaestro.sh b/SimMaestro/stopMaestro.sh new file mode 100755 index 0000000..d904890 --- /dev/null +++ b/SimMaestro/stopMaestro.sh @@ -0,0 +1,8 @@ +#!/bin/sh + +pid=`ps -ef | grep p42SimMaestro.js |grep -v grep | awk '{print $2}'` +if [ -n "$pid" ] +then + echo "killing pid: $pid" + kill -9 $pid +fi diff --git a/config.json b/config.json index d241541..8d9e675 100644 --- a/config.json +++ b/config.json @@ -15,6 +15,13 @@ "GETAGENTSINPRISM" ], "roles": "*" + }, + { + "canDo": [ + "STARTSIMULATION", + "STOPSIMULATION" + ], + "roles": "*" } ], "gps": { @@ -28,11 +35,44 @@ }, "agentVectorChangeChannel": "arena:agents:*", "collisionsChannel": "arena:agents:[UID]", + "lifecycle": { + "arenaChannel": "arena:lifecycle", + "godsReadyChannel": "arena:gods:ready" + }, + "arenaStorage": { + "agentHashKey": "arena:agents:[UID]", + "agentsIndexKey": "arena:agents" + }, + "senderId": "gps", "nearMissDistance": 1, "prismTimeHeight": 60, "collisionTickMs": 100, "prismRefreshLeadSeconds": 1 }, + "maestro": { + "maestroActionsChannel": "system:requests:maestro", + "maestroActionsReply": "system:replies:[UID]", + "senderId": "maestro", + "lifecycle": { + "arenaChannel": "arena:lifecycle", + "godsReadyChannel": "arena:gods:ready" + }, + "expectedGods": ["gps"], + "readyTimeoutMs": 30000 + }, + "mysql": { + "socketPath": "/var/run/mysqld/mysqld.sock", + "database": "p42GUI" + }, + "observer": { + "observerActionsChannel": "system:requests:observer", + "observerActionsReply": "system:replies:[UID]", + "senderId": "observer", + "lifecycle": { + "arenaChannel": "arena:lifecycle", + "godsReadyChannel": "arena:gods:ready" + } + }, "systemMesh": { "redis": [ { diff --git a/configSchema.json b/configSchema.json index 8fed15c..4c02853 100644 --- a/configSchema.json +++ b/configSchema.json @@ -78,6 +78,29 @@ }, "agentVectorChangeChannel": { "type": "string" }, "collisionsChannel": { "type": "string" }, + "lifecycle": { + "type": "object", + "properties": { + "arenaChannel": { "type": "string" }, + "godsReadyChannel": { "type": "string" } + }, + "required": [ + "arenaChannel", + "godsReadyChannel" + ] + }, + "arenaStorage": { + "type": "object", + "properties": { + "agentHashKey": { "type": "string" }, + "agentsIndexKey": { "type": "string" } + }, + "required": [ + "agentHashKey", + "agentsIndexKey" + ] + }, + "senderId": { "type": "string" }, "nearMissDistance": { "type": "number", "minimum": 0 }, "prismTimeHeight": { "type": "number", "minimum": 0 }, "collisionTickMs": { "type": "integer", "minimum": 1 }, @@ -91,6 +114,68 @@ "collisionsChannel" ] }, + "maestro": { + "type": "object", + "properties": { + "maestroActionsChannel": { "type": "string" }, + "maestroActionsReply": { "type": "string" }, + "senderId": { "type": "string" }, + "lifecycle": { + "type": "object", + "properties": { + "arenaChannel": { "type": "string" }, + "godsReadyChannel": { "type": "string" } + }, + "required": [ + "arenaChannel", + "godsReadyChannel" + ] + }, + "expectedGods": { + "type": "array", + "items": { "type": "string" } + }, + "readyTimeoutMs": { "type": "integer", "minimum": 1000 } + }, + "required": [ + "maestroActionsChannel", + "maestroActionsReply" + ] + }, + "mysql": { + "type": "object", + "properties": { + "socketPath": { "type": "string" }, + "host": { "type": "string" }, + "port": { "type": "integer" }, + "database": { "type": "string" }, + "connectionLimit": { "type": "integer", "minimum": 1 } + }, + "required": [] + }, + "observer": { + "type": "object", + "properties": { + "observerActionsChannel": { "type": "string" }, + "observerActionsReply": { "type": "string" }, + "senderId": { "type": "string" }, + "lifecycle": { + "type": "object", + "properties": { + "arenaChannel": { "type": "string" }, + "godsReadyChannel": { "type": "string" } + }, + "required": [ + "arenaChannel", + "godsReadyChannel" + ] + } + }, + "required": [ + "observerActionsChannel", + "observerActionsReply" + ] + }, "systemMesh": { "type": "object", "properties": { diff --git a/mysqlClient.js b/mysqlClient.js new file mode 100644 index 0000000..bab229d --- /dev/null +++ b/mysqlClient.js @@ -0,0 +1,29 @@ +import mysql from 'mysql2/promise' + +export function resolveMysqlCredentials(config = {}) { + const user = process.env.user + const password = process.env.mysql_pass + if(!user || !password) { + throw new Error('Missing MySQL credentials: set user and mysql_pass in environment') + } + return({ + socketPath: config.socketPath, + host: config.host, + port: config.port, + user, + password, + database: config.database ?? 'p42GUI', + waitForConnections: true, + connectionLimit: config.connectionLimit ?? 5, + queueLimit: 0, + }) +} + +export async function createMysqlPool(config) { + return(await mysql.createPool(resolveMysqlCredentials(config))) +} + +export async function mysqlExecute(pool, query, values = []) { + const [rows] = await pool.execute(query, values) + return(rows) +} diff --git a/package.json b/package.json index a0c10d6..73b784a 100644 --- a/package.json +++ b/package.json @@ -2,6 +2,9 @@ "name": "p42GodDaemons", "type": "module", "dependencies": { - "ajv": "^8.12.0" + "ajv": "^8.12.0", + "mysql2": "^3.11.0", + "redis": "^4.3.0", + "yargs": "^17.7.2" } } diff --git a/GPS/redisConnexion.js b/redisConnexion.js similarity index 72% rename from GPS/redisConnexion.js rename to redisConnexion.js index 9db90ba..2ed8346 100644 --- a/GPS/redisConnexion.js +++ b/redisConnexion.js @@ -1,25 +1,17 @@ import redis from 'redis' -import * as systemMesh from './actions/system/index.js' -import * as arenaMesh from './actions/arena/index.js' - -const meshModules = { - system: systemMesh, - arena: arenaMesh, -} export class RedisConnexion { - constructor(options) { + constructor(options) { this.config = options.config this.debug = options.debug this.redisId = options.redisId this.redisConfig = this.config this.meshName = options.meshName + this.meshModule = options.meshModule ?? null - const mesh = meshModules[this.meshName] - if(mesh?.meshActions) Object.assign(this, mesh.meshActions) - this.afterLoginMethods = mesh?.afterLoginMethods ?? [] - if(!mesh) console.warn(`[${this.redisId}] Unknown meshName: ${this.meshName}`) + if(this.meshModule?.meshActions) Object.assign(this, this.meshModule.meshActions) + this.afterLoginMethods = this.meshModule?.afterLoginMethods ?? [] this.redisClient = redis.createClient({ socket: { @@ -27,12 +19,12 @@ export class RedisConnexion { host: this.redisConfig.host, port: this.redisConfig.port } - }); + }) - this.redisSubscriber = null; + this.redisSubscriber = null this.redisClient.on('error', (err) => { - console.error('Redis error: ', err); - }); + console.error('Redis error: ', err) + }) if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis started...`) } @@ -50,18 +42,18 @@ export class RedisConnexion { } async redisLogin(){ - if(this.debug) console.log(`Connecting to Redis (${this.redisConfig.host}:${this.redisConfig.port}, tls:${this.redisConfig.tls?'yes':'no'})...`); - await this.redisClient.connect(); - if(this.debug) console.log(`Connected to Redis ${this.redisConfig.redisId}`); + if(this.debug) console.log(`Connecting to Redis (${this.redisConfig.host}:${this.redisConfig.port}, tls:${this.redisConfig.tls?'yes':'no'})...`) + await this.redisClient.connect() + if(this.debug) console.log(`Connected to Redis ${this.redisConfig.redisId}`) if(this.redisConfig.user) { - await this.redisClient.sendCommand(['AUTH', this.redisConfig.user, this.redisConfig.pass]); - if(this.debug) console.log(`Logged into Redis ${this.redisConfig.redisId}`); + await this.redisClient.sendCommand(['AUTH', this.redisConfig.user, this.redisConfig.pass]) + if(this.debug) console.log(`Logged into Redis ${this.redisConfig.redisId}`) } else { - if(this.debug) console.log(`Connected (anon) to Redis ${this.redisConfig.redisId}`); + if(this.debug) console.log(`Connected (anon) to Redis ${this.redisConfig.redisId}`) } if(this.debug) { - var redisTime = await this.redisClient.time(); - console.log(`[${this.redisConfig.redisId}] Redis ${this.redisConfig.redisId} time:`, redisTime); + var redisTime = await this.redisClient.time() + console.log(`[${this.redisConfig.redisId}] Redis ${this.redisConfig.redisId} time:`, redisTime) } for(const method of this.afterLoginMethods){ @@ -71,41 +63,41 @@ export class RedisConnexion { } async redisChansStart(){ - this.redisSubscriber = this.redisClient.duplicate(); - await this.redisSubscriber.connect(); + this.redisSubscriber = this.redisClient.duplicate() + await this.redisSubscriber.connect() if(this.redisConfig.user) { - await this.redisSubscriber.sendCommand(['AUTH', this.redisConfig.user, this.redisConfig.pass]); + await this.redisSubscriber.sendCommand(['AUTH', this.redisConfig.user, this.redisConfig.pass]) } const allChans = this.redisConfig.basePrefix + this.redisConfig.chansNamespace+'*' - this.redisSubscriber.pSubscribe(allChans, this.redisReceive.bind(this)); - if(this.debug) console.log(`[${this.redisConfig.redisId}] PSubscription OK `, allChans); + this.redisSubscriber.pSubscribe(allChans, this.redisReceive.bind(this)) + if(this.debug) console.log(`[${this.redisConfig.redisId}] PSubscription OK `, allChans) } async redisSubscribe(chanName, callBack){ if(!chanName.startsWith(this.redisConfig.basePrefix)) chanName = this.redisConfig.basePrefix + chanName if(!chanName.startsWith(this.redisConfig.basePrefix+this.redisConfig.chansNamespace)) { console.warn(`[${this.redisConfig.redisId}] redisSubscribe : forbidden channel range on this redis !`) - return + return } - await this.redisSubscriber.subscribe(chanName, callBack); + await this.redisSubscriber.subscribe(chanName, callBack) } async redisPublish(chanName, msg){ - if(typeof (msg) != 'string') msg = JSON.stringify(msg); + if(typeof (msg) != 'string') msg = JSON.stringify(msg) if(!chanName.startsWith(this.redisConfig.basePrefix)) chanName = this.redisConfig.basePrefix + chanName if(!chanName.startsWith(this.redisConfig.basePrefix+this.redisConfig.chansNamespace)) { console.warn(`[${this.redisConfig.redisId}] redisPublish : forbidden channel range on this redis ! (${chanName} / ${this.redisConfig.basePrefix+this.redisConfig.chansNamespace}) `) - return + return } - await this.redisClient.publish(chanName, msg); + await this.redisClient.publish(chanName, msg) } async redisSet(k, v, exp = 0, customPrefix=null){ - if(typeof(v) != 'string') v = JSON.stringify(v); + if(typeof(v) != 'string') v = JSON.stringify(v) if(customPrefix!==null) k = customPrefix + k else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k - if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis SET `, k); + if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis SET `, k) try { await this.redisClient.set(k, v) } catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis set: `, k, v) } if(exp > 0) { @@ -120,8 +112,8 @@ export class RedisConnexion { if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis GET`, k) let v=null try { v = await this.redisClient.get(k) } - catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis get: `, k) } - return(v); + catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis get: `, k) } + return(v) } resolveKey(k, customPrefix=null){ @@ -136,6 +128,36 @@ export class RedisConnexion { await this.redisClient.del(k) } + async redisHget(k, field, customPrefix=null){ + k = this.resolveKey(k, customPrefix) + if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis HGET`, k, field) + try { return(await this.redisClient.hGet(k, field)) } + catch(err) { + console.error(`[${this.redisConfig.redisId}] Redis crash doing HGET: `, k, field, err) + return(null) + } + } + + async redisHgetall(k, customPrefix=null){ + k = this.resolveKey(k, customPrefix) + if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis HGETALL`, k) + try { return(await this.redisClient.hGetAll(k)) } + catch(err) { + console.error(`[${this.redisConfig.redisId}] Redis crash doing HGETALL: `, k, err) + return({}) + } + } + + async redisSmembers(k, customPrefix=null){ + k = this.resolveKey(k, customPrefix) + if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis SMEMBERS`, k) + try { return(await this.redisClient.sMembers(k)) } + catch(err) { + console.error(`[${this.redisConfig.redisId}] Redis crash doing SMEMBERS: `, k, err) + return([]) + } + } + async redisHset(k, field, v, customPrefix=null){ if(typeof(v) != 'string') v = JSON.stringify(v) k = this.resolveKey(k, customPrefix) @@ -165,28 +187,27 @@ export class RedisConnexion { catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing SREM: `, k, member, err) } } - async redisGetTtl(k, customPrefix=null){ if(customPrefix!==null) k = customPrefix + k else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis Get TTL `, k) let v=null try { v = await this.redisClient.ttl(k) } - catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis ttl: `, k) } - return(v); + catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis ttl: `, k) } + return(v) } - + async redisSetTtl(k, ttl, customPrefix=null){ if(customPrefix!==null) k = customPrefix + k else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k - if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis Set TTL`, k); + if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis Set TTL`, k) try { await this.redisClient.expire(k, ttl) } catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis expire: `, k, ttl) } - } + } async redisXadd(streamName, kvObj, max = ''){ if(!streamName.startsWith(this.redisConfig.basePrefix)) streamName = this.redisConfig.basePrefix + streamName - if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis XADD `, streamName, kvObj); + if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis XADD `, streamName, kvObj) let arr = ['XADD', streamName] if(max != '') arr = [...arr, ...['MAXLEN', '~', (1*max).toString()]] arr.push('*') @@ -196,30 +217,30 @@ export class RedisConnexion { arr.push('streamData') arr.push(payload) let sid = null - try { sid = await this.redisClient.sendCommand(arr); } + try { sid = await this.redisClient.sendCommand(arr) } catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis command: `, arr, err) } - return(sid); + return(sid) } async redisXrange(streamName, start = '-', end = '+', withPayload = true){ if(!streamName.startsWith(this.redisConfig.basePrefix)) streamName = this.redisConfig.basePrefix + streamName - if(this.debug) console.log(`[${this.redisConfig.redisId}]Redis XRANGE `, streamName); + if(this.debug) console.log(`[${this.redisConfig.redisId}]Redis XRANGE `, streamName) if(typeof(start)!='string') start = start.toString() if(typeof(end)!='string') end = end.toString() - let arr = ['XRANGE', streamName, start, end]; + let arr = ['XRANGE', streamName, start, end] let res = [] try { res = await this.redisClient.sendCommand(arr) } catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis command: `, arr, err.msg) } if(withPayload){ - let o = {}; - for (let row of res) { // We'll take only the first content of the stream (the value of the key 'streamdata') + let o = {} + for (let row of res) { let payload = '' try{ payload = JSON.parse(row[1][1]) } catch(e) { console.warn(`[${this.redisConfig.redisId}] cannot unhistorize bad json: `,row[1][1]) } o[row[0]] = payload } - return(o); + return(o) } else { return(res.map(row => row[0])) } @@ -227,24 +248,26 @@ export class RedisConnexion { isHistorizedChan(chan){ if(!chan.startsWith(this.redisConfig.basePrefix)) chan = this.redisConfig.basePrefix + chan - var matches = this.redisConfig.historizeChannels.filter((e) => { + const historizeChannels = this.redisConfig.historizeChannels ?? [] + var matches = historizeChannels.filter((e) => { if(!e.startsWith(this.redisConfig.basePrefix)) e = this.redisConfig.basePrefix + e if(e.indexOf('*') > -1) { let r = new RegExp('^'+e.replace(/\*/g,'(.+)')+'$','g') - return(chan.match(r) != null); - } else return(chan == e); - }); - return(matches.length > 0); + return(chan.match(r) != null) + } else return(chan == e) + }) + return(matches.length > 0) } - + async getProcessInfo(){ - if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis INFO`); - try { + if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis INFO`) + let infoObject = {} + try { const rawInfo = await this.redisClient.INFO() let arr = rawInfo.replace(/\r/g,'').split('\n') arr = arr.filter(item => (item.trim()!='') && (!item.trim().startsWith('#'))) - let infoObject = arr.reduce((acc, val) => { kv = val.split(':',2); if(kv.length>0){ acc[kv[0]] = kv[1] } return(acc) }, {}) - } + infoObject = arr.reduce((acc, val) => { kv = val.split(':',2); if(kv.length>0){ acc[kv[0]] = kv[1] } return(acc) }, {}) + } catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis INFO: `) } return(infoObject) } @@ -258,38 +281,9 @@ export class RedisConnexion { return } - if(this.meshName === 'arena' && this.config.gps && typeof(this.dispatchArenaMessage) === 'function') { - if(this.dispatchArenaMessage(msg, chan)) return + if(typeof(this.meshModule?.dispatchMessage) === 'function') { + this.meshModule.dispatchMessage(this, msg, chan) } - - if(this.meshName === 'system' && this.config.gps?.gpsActionsChannel){ - const actionsChan = this.fullChan(this.config.gps.gpsActionsChannel) - if(chan != actionsChan) return - - const action = msg.action - if(!action || typeof(action) !== 'string') { - console.warn(`[${this.redisConfig.redisId}] Ignoring message without action on ${chan}`) - return - } - - const handler = this['action_'+action] - if(typeof(handler) != 'function') { - if(this.debug) console.warn(`[${this.redisConfig.redisId}] Unknown action ${action} on ${chan}`) - return - } - - const payload = ('payload' in msg) ? msg.payload : null - const reqid = ('reqid' in msg) ? msg.reqid.substr(0, 50) : null - const sender = msg.sender || null - const roles = Array.isArray(msg.roles) ? msg.roles : ['*'] - - if(this.debug) console.log(`[${this.redisConfig.redisId}] Dispatching action ${action} from ${sender}`) - handler.call(this, action, payload, reqid, sender, roles) - } - } } - - -// redis-cli -h redis.backend.eismea.eu --tls --user default