diff --git a/GPS/actions/system/index.js b/GPS/actions/system/index.js index 4c4e5e4..f968bb8 100644 --- a/GPS/actions/system/index.js +++ b/GPS/actions/system/index.js @@ -1,5 +1,4 @@ import { methods as utilities, construct as utilitiesConstruct } from './utilities.js' -import { methods as positions } from './positions.js' import { dispatchMessage } from './dispatch.js' export const afterLoginMethods = [ @@ -7,6 +6,5 @@ export const afterLoginMethods = [ ] export const meshActions = { ...utilities, - ...positions, } export { dispatchMessage } diff --git a/GPS/actions/system/positions.js b/GPS/actions/system/positions.js deleted file mode 100644 index ae6d361..0000000 --- a/GPS/actions/system/positions.js +++ /dev/null @@ -1,152 +0,0 @@ -import { publishActionReply, parseSimTime } from '../../actionsHelper.js' - -export const methods = { - - /* Event-Rx: - { - "action": "GETAGENTPOSITION", - "reqid": "6az5e4r6a", - "payload": { - "agentId": "agent42", - "t": 12.5 - } - } - Event-Tx: - { - "action": "GETAGENTPOSITION", - "success": true, - "reqid": "6az5e4r6a", - "payload": { - "agent": { - "id": "agent42", - "position": { "x": 1, "y": 2, "z": 3 }, - "vector": { "x": 0, "y": 0, "z": 0 }, - "since": 0, - "generation": 2, - "t": 12.5 - } - } - } - */ - async action_GETAGENTPOSITION(action, payload, reqid, sender, roles) { - const replyOpts = { - action, - reqid, - sender, - replyChannel: this.config.gps.gpsActionsReply, - } - if(!this.accessRights.canDo(roles, action)) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Unauthorized action !', - } }) - return - } - - if(!this.gpsSrv.isLive()) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Simulation not live', - } }) - return - } - - const agentId = payload?.agentId - if(!agentId || typeof(agentId) !== 'string') { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Missing or invalid agentId', - } }) - return - } - - const at = parseSimTime(payload, () => this.gpsSrv.now()) - if(at === null) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Invalid simulation time', - } }) - return - } - - const agent = this.gpsSrv.getAgentPosition(agentId, at) - if(!agent) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: `Unknown agent: ${agentId}`, - } }) - return - } - - publishActionReply(this, { ...replyOpts, reply: { - success: true, - payload: { agent }, - } }) - }, - - /* Event-Rx: - { - "action": "GETAGENTSINPRISM", - "reqid": "6az5e4r6a", - "payload": { - "prism": { - "xMin": -10, "xMax": 10, - "yMin": -10, "yMax": 10, - "zMin": 0, "zMax": 5 - }, - "t": 0 - } - } - */ - async action_GETAGENTSINPRISM(action, payload, reqid, sender, roles) { - const replyOpts = { - action, - reqid, - sender, - replyChannel: this.config.gps.gpsActionsReply, - } - if(!this.accessRights.canDo(roles, action)) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Unauthorized action !', - } }) - return - } - - if(!this.gpsSrv.isLive()) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Simulation not live', - } }) - return - } - - const prism = payload?.prism - if(!this.gpsSrv.isValidPrism(prism)) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Missing or invalid prism bounds', - } }) - return - } - - const at = parseSimTime(payload, () => this.gpsSrv.now()) - if(at === null) { - publishActionReply(this, { ...replyOpts, reply: { - success: false, - err: 'Invalid simulation time', - } }) - return - } - - const agents = this.gpsSrv.getAgentsInPrism(prism, at) - publishActionReply(this, { ...replyOpts, reply: { - success: true, - payload: { - agents, - t: at, - }, - } }) - }, - -} diff --git a/GPS/agentStore.js b/GPS/agentStore.js index fa64363..e9e9350 100644 --- a/GPS/agentStore.js +++ b/GPS/agentStore.js @@ -1,23 +1,23 @@ export class AgentStore { - constructor(systemCnx, storage, debug = false) { + constructor(systemCnx, gpsStorage, debug = false) { this.cnx = systemCnx - this.storage = storage + this.gpsStorage = gpsStorage this.debug = debug } agentHashKey(agentId) { - return(this.storage.agentHashKey.replace(/\[UID\]/g, agentId)) + return(this.gpsStorage.agentHashKey.replace(/\[UID\]/g, agentId)) } async clearAll() { try { - const ids = await this.cnx.redisSmembers(this.storage.agentsIndexKey) + const ids = await this.cnx.redisSmembers(this.gpsStorage.agentsIndexKey) for(const id of ids) { await this.cnx.redisDel(this.agentHashKey(id)) } - await this.cnx.redisDel(this.storage.agentsIndexKey) + await this.cnx.redisDel(this.gpsStorage.agentsIndexKey) if(this.debug) console.log(`[GPS] Cleared system agent store (${ids.length} agent(s))`) } catch(err) { console.error('[GPS] Failed to clear system agent store:', err) @@ -37,11 +37,11 @@ export class AgentStore { simulationId, } await this.cnx.redisHset(this.agentHashKey(agent.id), 'segment', record) - await this.cnx.redisSadd(this.storage.agentsIndexKey, agent.id) + await this.cnx.redisSadd(this.gpsStorage.agentsIndexKey, agent.id) await this.cnx.redisXadd( - this.storage.positionsStream, + this.gpsStorage.positionsStream, record, - this.storage.streamMaxLen ?? '' + this.gpsStorage.streamMaxLen ?? '' ) if(this.debug) console.log(`[GPS] Exported segment ${agent.id} (${eventType})`) } catch(err) { @@ -57,11 +57,11 @@ export class AgentStore { t: simT, } await this.cnx.redisDel(this.agentHashKey(agentId)) - await this.cnx.redisSrem(this.storage.agentsIndexKey, agentId) + await this.cnx.redisSrem(this.gpsStorage.agentsIndexKey, agentId) await this.cnx.redisXadd( - this.storage.positionsStream, + this.gpsStorage.positionsStream, record, - this.storage.streamMaxLen ?? '' + this.gpsStorage.streamMaxLen ?? '' ) if(this.debug) console.log(`[GPS] Exported remove ${agentId}`) } catch(err) { diff --git a/GPS/arenaAgentLoader.js b/GPS/arenaAgentLoader.js index 8c73f9c..828dd76 100644 --- a/GPS/arenaAgentLoader.js +++ b/GPS/arenaAgentLoader.js @@ -1,9 +1,9 @@ export class ArenaAgentLoader { - constructor(arenaCnx, storage, debug = false) { + constructor(arenaCnx, arenaStorage, debug = false) { this.cnx = arenaCnx - this.storage = storage + this.arenaStorage = arenaStorage this.debug = debug } @@ -45,11 +45,11 @@ export class ArenaAgentLoader { async #listAgentIds(expectedIds = null) { if(Array.isArray(expectedIds) && expectedIds.length) return([...expectedIds]) - return(await this.cnx.redisSmembers(this.storage.agentsIndexKey)) + return(await this.cnx.redisSmembers(this.arenaStorage.agentsIndexKey)) } async #loadAgentFromHash(agentId) { - const key = this.storage.agentHashKey.replace(/\[UID\]/g, agentId) + const key = this.arenaStorage.agentHashKey.replace(/\[UID\]/g, agentId) const positionRaw = await this.cnx.redisHget(key, 'position') const vectorRaw = await this.cnx.redisHget(key, 'vector') let position = this.#parseHashField(positionRaw) diff --git a/GPS/gpsServer.js b/GPS/gpsServer.js index cbca56e..5a0c831 100644 --- a/GPS/gpsServer.js +++ b/GPS/gpsServer.js @@ -89,9 +89,9 @@ export class gpsServer { } initAgentStore() { - const storage = this.gpsConfig.gps?.storage - if(storage && this.systemCnx) { - this.agentStore = new AgentStore(this.systemCnx, storage, this.debug) + const gpsStorage = this.gpsConfig.gps?.GPSstorage + if(gpsStorage && this.systemCnx) { + this.agentStore = new AgentStore(this.systemCnx, gpsStorage, this.debug) } } diff --git a/GPS/startGps.sh b/GPS/startGps.sh index c3731d0..20b49df 100755 --- a/GPS/startGps.sh +++ b/GPS/startGps.sh @@ -1,6 +1,8 @@ #!/bin/sh +set -a . /etc/p42/secrets.env +set +a daemon=p42Gps logfile=gps.log diff --git a/Observer/actions/arena/arenaHandlers.js b/Observer/actions/arena/arenaHandlers.js index 7c42d3a..cd415bf 100644 --- a/Observer/actions/arena/arenaHandlers.js +++ b/Observer/actions/arena/arenaHandlers.js @@ -4,8 +4,28 @@ export const construct = (redisCnx) => { export const methods = { + handleLifecycleEvent(msg) { + const srv = this.observerSrv + if(!srv) return + + if(msg.eventType === 'onYourMarks') { + srv.onYourMarks() + return + } + + if(msg.eventType === 'bigBang') { + srv.onBigBang() + } + }, + dispatchArenaMessage(msg, chan) { - if(this.debug) console.log(`[${this.redisId}] Arena message (unhandled):`, msg.eventType, chan) + const observer = this.config.observer + if(!observer || !this.observerSrv) return(false) + + if(this.matchesChan(chan, observer.lifecycle?.arenaChannel ?? 'arena:lifecycle')) { + this.handleLifecycleEvent(msg) + return(true) + } return(false) }, diff --git a/Observer/actions/system/index.js b/Observer/actions/system/index.js index 6b282da..3f95450 100644 --- a/Observer/actions/system/index.js +++ b/Observer/actions/system/index.js @@ -1,4 +1,5 @@ import { methods as utilities, construct as utilitiesConstruct } from './utilities.js' +import { methods as positions } from './positions.js' import { dispatchMessage } from './dispatch.js' export const afterLoginMethods = [ @@ -7,6 +8,7 @@ export const afterLoginMethods = [ export const meshActions = { ...utilities, + ...positions, } export { dispatchMessage } diff --git a/Observer/actions/system/positions.js b/Observer/actions/system/positions.js new file mode 100644 index 0000000..223f820 --- /dev/null +++ b/Observer/actions/system/positions.js @@ -0,0 +1,280 @@ +import { publishActionReply, parseSimTime } from '../../actionsHelper.js' +import { Frustum } from '../../frustum.js' + +export const methods = { + + /* Event-Rx: + { + "action": "GETAGENTPOSITION", + "reqid": "6az5e4r6a", + "payload": { + "agentId": "agent42", + "t": 12.5 + } + } + Event-Tx: + { + "action": "GETAGENTPOSITION", + "success": true, + "reqid": "6az5e4r6a", + "payload": { + "agent": { + "id": "agent42", + "position": { "x": 1, "y": 2, "z": 3 }, + "vector": { "x": 0, "y": 0, "z": 0 }, + "since": 0, + "generation": 2, + "t": 12.5 + } + } + } + */ + async action_GETAGENTPOSITION(action, payload, reqid, sender, roles) { + const replyOpts = { + action, + reqid, + sender, + replyChannel: this.config.observer.observerActionsReply, + } + if(!this.accessRights.canDo(roles, action)) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Unauthorized action !', + } }) + return + } + + const reader = this.observerSrv.gpsStorageReader + if(!reader) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'GPS storage reader not ready', + } }) + return + } + + if(!this.observerSrv.isLive()) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Simulation not live', + } }) + return + } + + const agentId = payload?.agentId + if(!agentId || typeof(agentId) !== 'string') { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Missing or invalid agentId', + } }) + return + } + + const at = parseSimTime(payload, () => this.observerSrv.now()) + if(at === null) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Invalid simulation time', + } }) + return + } + + const agent = await reader.getAgentPosition(agentId, at) + if(!agent) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: `Unknown agent: ${agentId}`, + } }) + return + } + + publishActionReply(this, { ...replyOpts, reply: { + success: true, + payload: { agent }, + } }) + }, + + /* Event-Rx: + { + "action": "GETAGENTSINFRUSTUM", + "reqid": "6az5e4r6a", + "payload": { + "planes": [ + { "nx": 1, "ny": 0, "nz": 0, "d": -10 }, + { "nx": -1, "ny": 0, "nz": 0, "d": 10 }, + { "nx": 0, "ny": 1, "nz": 0, "d": -10 }, + { "nx": 0, "ny": -1, "nz": 0, "d": 10 }, + { "nx": 0, "ny": 0, "nz": 1, "d": 0 }, + { "nx": 0, "ny": 0, "nz": -1, "d": 5 } + ], + "t": 0 + } + } + */ + async action_GETAGENTSINFRUSTUM(action, payload, reqid, sender, roles) { + const replyOpts = { + action, + reqid, + sender, + replyChannel: this.config.observer.observerActionsReply, + } + if(!this.accessRights.canDo(roles, action)) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Unauthorized action !', + } }) + return + } + + const registry = this.observerSrv.requestorRegistry + if(!registry) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Requestor registry not ready', + } }) + return + } + + if(!this.observerSrv.isLive()) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Simulation not live', + } }) + return + } + + const frustum = Frustum.fromPlanes(payload?.planes) + if(!frustum) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Missing or invalid frustum planes (expected 6)', + } }) + return + } + + const at = parseSimTime(payload, () => this.observerSrv.now()) + if(at === null) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Invalid simulation time', + } }) + return + } + + const result = await registry.evaluateOnce({ frustum, t: at }) + if(!result.ok) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: result.err, + } }) + return + } + + publishActionReply(this, { ...replyOpts, reply: { + success: true, + payload: { + agents: result.agents, + t: result.t, + }, + } }) + }, + + /* Event-Rx: + { + "action": "SUBSCRIBEFRUSTUM", + "reqid": "6az5e4r6a", + "sender": "client-uuid", + "payload": { + "planes": [ + { "nx": 1, "ny": 0, "nz": 0, "d": -10 }, + { "nx": -1, "ny": 0, "nz": 0, "d": 10 }, + { "nx": 0, "ny": 1, "nz": 0, "d": -10 }, + { "nx": 0, "ny": -1, "nz": 0, "d": 10 }, + { "nx": 0, "ny": 0, "nz": 1, "d": 0 }, + { "nx": 0, "ny": 0, "nz": -1, "d": 5 } + ], + "frequency": 800 + } + } + Event-Tx: + { + "action": "SUBSCRIBEFRUSTUM", + "success": true, + "reqid": "6az5e4r6a", + "payload": { + "frequency": 900, + "agents": [ ... ], + "t": 12.5 + } + } + Periodic push (no reqid): + { + "action": "GETAGENTSINFRUSTUM", + "success": true, + "sender": "observer", + "payload": { "agents": [ ... ], "t": 12.5 } + } + */ + async action_SUBSCRIBEFRUSTUM(action, payload, reqid, sender, roles) { + const replyOpts = { + action, + reqid, + sender, + replyChannel: this.config.observer.observerActionsReply, + } + if(!this.accessRights.canDo(roles, action)) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Unauthorized action !', + } }) + return + } + + if(!sender || typeof(sender) !== 'string') { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Missing or invalid sender', + } }) + return + } + + const registry = this.observerSrv.requestorRegistry + if(!registry) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Requestor registry not ready', + } }) + return + } + + if(!this.observerSrv.isLive()) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: 'Simulation not live', + } }) + return + } + + const result = await registry.subscribeFrustum(sender, { + planes: payload?.planes, + frequency: payload?.frequency, + }) + if(!result.ok) { + publishActionReply(this, { ...replyOpts, reply: { + success: false, + err: result.err, + } }) + return + } + + publishActionReply(this, { ...replyOpts, reply: { + success: true, + payload: { + frequency: result.frequency, + agents: result.agents, + t: result.t, + }, + } }) + }, + +} diff --git a/Observer/actionsHelper.js b/Observer/actionsHelper.js index 10274e2..7573130 100644 --- a/Observer/actionsHelper.js +++ b/Observer/actionsHelper.js @@ -14,3 +14,9 @@ export function publishActionReply(redisCnx, options) { const chan = replyChannel.replace(/\[UID\]/g, sender) redisCnx.redisPublish(chan, reply) } + +export function parseSimTime(payload, fallbackFn) { + if(payload?.t != null && typeof(payload.t) === 'number' && !Number.isNaN(payload.t)) return(payload.t) + if(payload?.at != null && typeof(payload.at) === 'number' && !Number.isNaN(payload.at)) return(payload.at) + return(fallbackFn()) +} diff --git a/Observer/frustum.js b/Observer/frustum.js new file mode 100644 index 0000000..ed4a2d9 --- /dev/null +++ b/Observer/frustum.js @@ -0,0 +1,33 @@ +export class Frustum { + + constructor(planes) { + if(!Frustum.#validatePlanes(planes)) throw(new Error('Invalid frustum planes')) + this.planes = planes.map(p => ({ ...p })) + } + + static #validatePlanes(planes) { + if(!Array.isArray(planes) || planes.length !== 6) return(false) + for(const p of planes) { + if(!p || typeof(p) !== 'object') return(false) + if(typeof(p.nx) !== 'number' || Number.isNaN(p.nx)) return(false) + if(typeof(p.ny) !== 'number' || Number.isNaN(p.ny)) return(false) + if(typeof(p.nz) !== 'number' || Number.isNaN(p.nz)) return(false) + if(typeof(p.d) !== 'number' || Number.isNaN(p.d)) return(false) + } + return(true) + } + + static fromPlanes(planes) { + if(!Frustum.#validatePlanes(planes)) return(null) + return(new Frustum(planes)) + } + + containsPoint(position) { + for(const p of this.planes) { + const dist = p.nx * position.x + p.ny * position.y + p.nz * position.z + p.d + if(dist < 0) return(false) + } + return(true) + } + +} diff --git a/Observer/gpsStorageReader.js b/Observer/gpsStorageReader.js new file mode 100644 index 0000000..422a805 --- /dev/null +++ b/Observer/gpsStorageReader.js @@ -0,0 +1,82 @@ +import { positionAt } from '../GPS/actions/arena/worldline.js' + +export class GpsStorageReader { + + constructor(systemCnx, gpsStorage, debug = false) { + this.cnx = systemCnx + this.gpsStorage = gpsStorage + this.debug = debug + } + + agentHashKey(agentId) { + return(this.gpsStorage.agentHashKey.replace(/\[UID\]/g, agentId)) + } + + #parseHashField(raw) { + if(raw == null) return(null) + if(typeof(raw) === 'object') return(raw) + try { return(JSON.parse(raw)) } + catch { return(null) } + } + + #segmentToAgent(segment) { + if(!segment?.id) return(null) + if(!this.#isVector(segment.position) || !this.#isVector(segment.vector)) return(null) + return({ + id: segment.id, + position: { ...segment.position }, + vector: { ...segment.vector }, + since: segment.since ?? 0, + generation: segment.generation ?? 0, + }) + } + + #isVector(v) { + return( + v && + typeof(v) === 'object' && + typeof(v.x) === 'number' && + typeof(v.y) === 'number' && + typeof(v.z) === 'number' + ) + } + + buildAgentSnapshot(agent, at) { + return({ + id: agent.id, + position: positionAt(agent, at), + vector: { ...agent.vector }, + since: agent.since, + generation: agent.generation ?? 0, + t: at, + }) + } + + async loadSegment(agentId) { + const raw = await this.cnx.redisHget(this.agentHashKey(agentId), 'segment') + return(this.#parseHashField(raw)) + } + + async listAgentIds() { + return(await this.cnx.redisSmembers(this.gpsStorage.agentsIndexKey)) + } + + async loadAllAgents() { + const ids = await this.listAgentIds() + const agents = new Map() + for(const id of ids) { + const segment = await this.loadSegment(id) + const agent = this.#segmentToAgent(segment) + if(agent) agents.set(id, agent) + } + return(agents) + } + + async getAgentPosition(agentId, at) { + const segment = await this.loadSegment(agentId) + const agent = this.#segmentToAgent(segment) + if(!agent) return(null) + return(this.buildAgentSnapshot(agent, at)) + } + +} diff --git a/Observer/observerServer.js b/Observer/observerServer.js index fc1f480..bb5f4fa 100644 --- a/Observer/observerServer.js +++ b/Observer/observerServer.js @@ -1,4 +1,8 @@ import { AccesRights } from '../accesRights.js' +import { GpsStorageReader } from './gpsStorageReader.js' +import { RequestorRegistry } from './requestorRegistry.js' +import { publishActionReply } from './actionsHelper.js' +import { SimState } from '../GPS/simulationState.js' export class observerServer { @@ -11,12 +15,17 @@ export class observerServer { this.arenaCnx = null this.arenaCnxs = [] this.systemCnx = null + this.gpsStorageReader = null + this.requestorRegistry = null + this.state = SimState.IDLE + this.bigBangEpoch = null } getObserverSettings() { const observer = this.observerConfig.observer ?? {} return({ senderId: observer.senderId ?? 'observer', + scanIntervalMs: observer.scanIntervalMs ?? 300, lifecycle: { arenaChannel: observer.lifecycle?.arenaChannel ?? 'arena:lifecycle', godsReadyChannel: observer.lifecycle?.godsReadyChannel ?? 'arena:gods:ready', @@ -24,6 +33,70 @@ export class observerServer { }) } + getGpsStorageSettings() { + const gps = this.observerConfig.gps ?? {} + return(gps.GPSstorage ?? null) + } + + initGpsStorageReader() { + const gpsStorage = this.getGpsStorageSettings() + if(gpsStorage && this.systemCnx) { + this.gpsStorageReader = new GpsStorageReader(this.systemCnx, gpsStorage, this.debug) + this.initRequestorRegistry() + } + } + + initRequestorRegistry() { + if(!this.gpsStorageReader || this.requestorRegistry) return + const { scanIntervalMs } = this.getObserverSettings() + this.requestorRegistry = new RequestorRegistry( + this.gpsStorageReader, + () => this.now(), + scanIntervalMs, + (sender, payload) => this.publishFrustumUpdate(sender, payload), + this.debug + ) + } + + publishFrustumUpdate(sender, payload) { + if(!this.systemCnx || !sender) return + const observer = this.observerConfig.observer ?? {} + publishActionReply(this.systemCnx, { + action: 'GETAGENTSINFRUSTUM', + sender, + replyChannel: observer.observerActionsReply ?? 'system:replies:[UID]', + reply: { + success: true, + payload, + }, + }) + } + + isLive() { + return(this.state === SimState.LIVE) + } + + simNow() { + if(this.bigBangEpoch === null) return(null) + return((performance.now() - this.bigBangEpoch) / 1000) + } + + now() { + if(this.isLive()) return(this.simNow()) + return(null) + } + + onYourMarks() { + this.state = SimState.PREPARE + this.bigBangEpoch = null + this.requestorRegistry?.clear() + } + + onBigBang() { + this.bigBangEpoch = performance.now() + this.state = SimState.LIVE + } + wireSystemConnexion(cnx) { cnx.observerSrv = this cnx.accessRights = this.accessRights @@ -31,6 +104,7 @@ export class observerServer { cnx.getAccessRights = () => this.getAccessRights() if(!this.systemCnx || cnx.redisConfig.role === 'primary') { this.systemCnx = cnx + this.initGpsStorageReader() } } diff --git a/Observer/requestorRegistry.js b/Observer/requestorRegistry.js new file mode 100644 index 0000000..46d897a --- /dev/null +++ b/Observer/requestorRegistry.js @@ -0,0 +1,187 @@ +import { positionAt } from '../GPS/actions/arena/worldline.js' +import { Frustum } from './frustum.js' + +export class RequestorRegistry { + + constructor(reader, getNow, scanIntervalMs, onPush, debug = false) { + this.reader = reader + this.getNow = getNow + this.scanIntervalMs = scanIntervalMs + this.onPush = onPush + this.debug = debug + this.requestors = new Map() + } + + #tickTimer = null + #scanInFlight = false + + #adjustFrequency(requestedMs) { + if(typeof(requestedMs) !== 'number' || Number.isNaN(requestedMs)) return(null) + if(requestedMs < 300 || requestedMs > 10000) return(null) + const tickMs = this.scanIntervalMs + const ticks = Math.round(requestedMs / tickMs) + const minTicks = Math.ceil(300 / tickMs) + const maxTicks = Math.floor(10000 / tickMs) + const clampedTicks = Math.max(minTicks, Math.min(maxTicks, ticks)) + return(clampedTicks * tickMs) + } + + async subscribeFrustum(id, { planes, frequency }) { + if(!id || typeof(id) !== 'string') return({ ok: false, err: 'Invalid requestor id' }) + const frustum = Frustum.fromPlanes(planes) + if(!frustum) return({ ok: false, err: 'Invalid frustum planes' }) + + const frequencyMs = this.#adjustFrequency(frequency) + if(frequencyMs === null) { + return({ ok: false, err: 'Invalid frequency (expected 300–10000 ms)' }) + } + + const t = this.getNow() + if(t === null) return({ ok: false, err: 'Simulation not live' }) + + const agents = await this.reader.loadAllAgents() + const matching = this.#matchAgents(agents, frustum, t) + const pushEveryNTicks = frequencyMs / this.scanIntervalMs + + this.requestors.set(id, { + id, + frustum, + tMode: 'live', + subscription: true, + frequencyMs, + pushEveryNTicks, + tickCounter: 0, + agents: matching, + t, + updatedAt: Date.now(), + }) + this.#ensureTick() + + return({ + ok: true, + frequency: frequencyMs, + agents: matching, + t, + }) + } + + updateRequestor(id, spec) { + const requestor = this.requestors.get(id) + if(!requestor) return({ ok: false, err: 'Unknown requestor' }) + if(Array.isArray(spec?.planes)) { + const frustum = Frustum.fromPlanes(spec.planes) + if(!frustum) return({ ok: false, err: 'Invalid frustum planes' }) + requestor.frustum = frustum + } + if(typeof(spec?.frequency) === 'number') { + const frequencyMs = this.#adjustFrequency(spec.frequency) + if(frequencyMs === null) return({ ok: false, err: 'Invalid frequency (expected 300–10000 ms)' }) + requestor.frequencyMs = frequencyMs + requestor.pushEveryNTicks = frequencyMs / this.scanIntervalMs + requestor.tickCounter = 0 + } + return({ ok: true, frequency: requestor.frequencyMs }) + } + + unregisterRequestor(id) { + this.requestors.delete(id) + if(this.requestors.size === 0) this.#stopTick() + return({ ok: true }) + } + + getRequestorAgents(id) { + const requestor = this.requestors.get(id) + if(!requestor) return(null) + return({ + agents: [...requestor.agents], + t: requestor.t, + frequency: requestor.frequencyMs ?? null, + updatedAt: requestor.updatedAt, + }) + } + + clear() { + this.requestors.clear() + this.#stopTick() + } + + async evaluateOnce({ frustum, t }) { + if(!frustum || !(frustum instanceof Frustum)) { + return({ ok: false, err: 'Invalid frustum', agents: [] }) + } + if(typeof(t) !== 'number' || Number.isNaN(t)) return({ ok: false, err: 'Invalid simulation time', agents: [] }) + const agents = await this.reader.loadAllAgents() + const matching = this.#matchAgents(agents, frustum, t) + return({ ok: true, agents: matching, t }) + } + + #resolveT(requestor) { + if(requestor.tMode === 'fixed') return(requestor.fixedT) + return(this.getNow()) + } + + #matchAgents(agents, frustum, t) { + const matching = [] + for(const agent of agents.values()) { + const position = positionAt(agent, t) + if(!frustum.containsPoint(position)) continue + matching.push(this.reader.buildAgentSnapshot(agent, t)) + } + return(matching) + } + + #pushUpdate(requestor) { + if(typeof(this.onPush) !== 'function') return + this.onPush(requestor.id, { + agents: [...requestor.agents], + t: requestor.t, + }) + } + + async #scanAll() { + if(this.#scanInFlight || this.requestors.size === 0) return + this.#scanInFlight = true + try { + const agents = await this.reader.loadAllAgents() + for(const requestor of this.requestors.values()) { + const t = this.#resolveT(requestor) + if(t === null) { + requestor.agents = [] + requestor.t = null + requestor.updatedAt = Date.now() + continue + } + requestor.agents = this.#matchAgents(agents, requestor.frustum, t) + requestor.t = t + requestor.updatedAt = Date.now() + + if(!requestor.subscription) continue + + requestor.tickCounter++ + if(requestor.tickCounter >= requestor.pushEveryNTicks) { + requestor.tickCounter = 0 + this.#pushUpdate(requestor) + } + } + if(this.debug) console.log(`[Observer] Scanned ${agents.size} agent(s) for ${this.requestors.size} requestor(s)`) + } catch(err) { + console.error('[Observer] Requestor scan failed:', err) + } finally { + this.#scanInFlight = false + } + } + + #ensureTick() { + if(this.#tickTimer) return + this.#tickTimer = setInterval(() => { + this.#scanAll() + }, this.scanIntervalMs) + } + + #stopTick() { + if(!this.#tickTimer) return + clearInterval(this.#tickTimer) + this.#tickTimer = null + } + +} diff --git a/Observer/startObserver.sh b/Observer/startObserver.sh index 2573af3..cc026a2 100755 --- a/Observer/startObserver.sh +++ b/Observer/startObserver.sh @@ -1,6 +1,8 @@ #!/bin/sh +set -a . /etc/p42/secrets.env +set +a daemon=p42Observer logfile=observer.log diff --git a/SimMaestro/arenaGroom.js b/SimMaestro/arenaGroom.js index 0cd889f..4a69b35 100644 --- a/SimMaestro/arenaGroom.js +++ b/SimMaestro/arenaGroom.js @@ -1,22 +1,22 @@ export class ArenaGroom { - constructor(arenaCnx, storage, debug = false) { + constructor(arenaCnx, arenaStorage, debug = false) { this.cnx = arenaCnx - this.storage = storage + this.arenaStorage = arenaStorage this.debug = debug } agentHashKey(agentId) { - return(this.storage.agentHashKey.replace(/\[UID\]/g, agentId)) + return(this.arenaStorage.agentHashKey.replace(/\[UID\]/g, agentId)) } async clearArena() { - const ids = await this.cnx.redisSmembers(this.storage.agentsIndexKey) + const ids = await this.cnx.redisSmembers(this.arenaStorage.agentsIndexKey) for(const id of ids) { await this.cnx.redisDel(this.agentHashKey(id)) } - await this.cnx.redisDel(this.storage.agentsIndexKey) + await this.cnx.redisDel(this.arenaStorage.agentsIndexKey) if(this.debug) console.log(`[Maestro] Cleared arena store (${ids.length} agent(s))`) } @@ -25,7 +25,7 @@ export class ArenaGroom { const key = this.agentHashKey(agent.id) await this.cnx.redisHset(key, 'position', agent.position) await this.cnx.redisHset(key, 'vector', agent.vector) - await this.cnx.redisSadd(this.storage.agentsIndexKey, agent.id) + await this.cnx.redisSadd(this.arenaStorage.agentsIndexKey, agent.id) } if(this.debug) console.log(`[Maestro] Groomed ${agents.length} agent(s) into arena store`) } diff --git a/SimMaestro/startMaestro.sh b/SimMaestro/startMaestro.sh index 559f4cd..b35032c 100755 --- a/SimMaestro/startMaestro.sh +++ b/SimMaestro/startMaestro.sh @@ -1,6 +1,8 @@ #!/bin/sh +set -a . /etc/p42/secrets.env +set +a daemon=p42SimMaestro logfile=maestro.log diff --git a/config.json b/config.json index 8d9e675..084978c 100644 --- a/config.json +++ b/config.json @@ -12,7 +12,8 @@ { "canDo": [ "GETAGENTPOSITION", - "GETAGENTSINPRISM" + "GETAGENTSINFRUSTUM", + "SUBSCRIBEFRUSTUM" ], "roles": "*" }, @@ -27,7 +28,7 @@ "gps": { "gpsActionsChannel": "system:requests:gps", "gpsActionsReply": "system:replies:[UID]", - "storage": { + "GPSstorage": { "agentHashKey": "system:gps:agent:[UID]", "agentsIndexKey": "system:gps:agents", "positionsStream": "system:gps:positions", @@ -68,6 +69,7 @@ "observerActionsChannel": "system:requests:observer", "observerActionsReply": "system:replies:[UID]", "senderId": "observer", + "scanIntervalMs": 300, "lifecycle": { "arenaChannel": "arena:lifecycle", "godsReadyChannel": "arena:gods:ready" diff --git a/configSchema.json b/configSchema.json index 4c02853..63a5c42 100644 --- a/configSchema.json +++ b/configSchema.json @@ -62,7 +62,7 @@ "properties": { "gpsActionsChannel": { "type": "string" }, "gpsActionsReply": { "type": "string" }, - "storage": { + "GPSstorage": { "type": "object", "properties": { "agentHashKey": { "type": "string" }, @@ -109,7 +109,7 @@ "required": [ "gpsActionsChannel", "gpsActionsReply", - "storage", + "GPSstorage", "agentVectorChangeChannel", "collisionsChannel" ] @@ -159,6 +159,7 @@ "observerActionsChannel": { "type": "string" }, "observerActionsReply": { "type": "string" }, "senderId": { "type": "string" }, + "scanIntervalMs": { "type": "integer", "minimum": 50 }, "lifecycle": { "type": "object", "properties": { diff --git a/mysqlClient.js b/mysqlClient.js index bab229d..6211740 100644 --- a/mysqlClient.js +++ b/mysqlClient.js @@ -1,10 +1,12 @@ import mysql from 'mysql2/promise' +import { loadP42Secrets } from './secretsLoader.js' export function resolveMysqlCredentials(config = {}) { - const user = process.env.user + loadP42Secrets() + const user = process.env.mysql_user const password = process.env.mysql_pass if(!user || !password) { - throw new Error('Missing MySQL credentials: set user and mysql_pass in environment') + throw new Error('Missing MySQL credentials: set mysql_user and mysql_pass in environment') } return({ socketPath: config.socketPath, diff --git a/secretsLoader.js b/secretsLoader.js new file mode 100644 index 0000000..8ec7b11 --- /dev/null +++ b/secretsLoader.js @@ -0,0 +1,34 @@ +import fs from 'fs' + +const DEFAULT_SECRETS_PATH = '/etc/p42/secrets.env' + +function stripQuotes(value) { + if( + (value.startsWith('"') && value.endsWith('"')) || + (value.startsWith("'") && value.endsWith("'")) + ) { + return(value.slice(1, -1)) + } + return(value) +} + +export function loadP42Secrets(filePath = DEFAULT_SECRETS_PATH) { + if(process.env.mysql_user && process.env.mysql_pass) return(true) + + if(!fs.existsSync(filePath)) return(false) + + const text = fs.readFileSync(filePath, 'utf8') + for(const rawLine of text.split('\n')) { + const line = rawLine.trim() + if(!line || line.startsWith('#')) continue + const eq = line.indexOf('=') + if(eq < 1) continue + const key = line.slice(0, eq).trim() + const value = stripQuotes(line.slice(eq + 1).trim()) + if(key === 'mysql_user' || key === 'mysql_pass') { + process.env[key] = value + } + } + + return(Boolean(process.env.mysql_user && process.env.mysql_pass)) +}