maestro refacto to grrom, getpositions actions transfered to Observer, changed to frustum, with subscription

This commit is contained in:
STEINNI
2026-06-14 10:16:56 +00:00
parent f3102d5fbc
commit c399f9ddb4
21 changed files with 760 additions and 185 deletions
+21 -1
View File
@@ -4,8 +4,28 @@ export const construct = (redisCnx) => {
export const methods = {
handleLifecycleEvent(msg) {
const srv = this.observerSrv
if(!srv) return
if(msg.eventType === 'onYourMarks') {
srv.onYourMarks()
return
}
if(msg.eventType === 'bigBang') {
srv.onBigBang()
}
},
dispatchArenaMessage(msg, chan) {
if(this.debug) console.log(`[${this.redisId}] Arena message (unhandled):`, msg.eventType, chan)
const observer = this.config.observer
if(!observer || !this.observerSrv) return(false)
if(this.matchesChan(chan, observer.lifecycle?.arenaChannel ?? 'arena:lifecycle')) {
this.handleLifecycleEvent(msg)
return(true)
}
return(false)
},
+2
View File
@@ -1,4 +1,5 @@
import { methods as utilities, construct as utilitiesConstruct } from './utilities.js'
import { methods as positions } from './positions.js'
import { dispatchMessage } from './dispatch.js'
export const afterLoginMethods = [
@@ -7,6 +8,7 @@ export const afterLoginMethods = [
export const meshActions = {
...utilities,
...positions,
}
export { dispatchMessage }
+280
View File
@@ -0,0 +1,280 @@
import { publishActionReply, parseSimTime } from '../../actionsHelper.js'
import { Frustum } from '../../frustum.js'
export const methods = {
/* Event-Rx:
{
"action": "GETAGENTPOSITION",
"reqid": "6az5e4r6a",
"payload": {
"agentId": "agent42",
"t": 12.5
}
}
Event-Tx:
{
"action": "GETAGENTPOSITION",
"success": true,
"reqid": "6az5e4r6a",
"payload": {
"agent": {
"id": "agent42",
"position": { "x": 1, "y": 2, "z": 3 },
"vector": { "x": 0, "y": 0, "z": 0 },
"since": 0,
"generation": 2,
"t": 12.5
}
}
}
*/
async action_GETAGENTPOSITION(action, payload, reqid, sender, roles) {
const replyOpts = {
action,
reqid,
sender,
replyChannel: this.config.observer.observerActionsReply,
}
if(!this.accessRights.canDo(roles, action)) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Unauthorized action !',
} })
return
}
const reader = this.observerSrv.gpsStorageReader
if(!reader) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'GPS storage reader not ready',
} })
return
}
if(!this.observerSrv.isLive()) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Simulation not live',
} })
return
}
const agentId = payload?.agentId
if(!agentId || typeof(agentId) !== 'string') {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Missing or invalid agentId',
} })
return
}
const at = parseSimTime(payload, () => this.observerSrv.now())
if(at === null) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Invalid simulation time',
} })
return
}
const agent = await reader.getAgentPosition(agentId, at)
if(!agent) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: `Unknown agent: ${agentId}`,
} })
return
}
publishActionReply(this, { ...replyOpts, reply: {
success: true,
payload: { agent },
} })
},
/* Event-Rx:
{
"action": "GETAGENTSINFRUSTUM",
"reqid": "6az5e4r6a",
"payload": {
"planes": [
{ "nx": 1, "ny": 0, "nz": 0, "d": -10 },
{ "nx": -1, "ny": 0, "nz": 0, "d": 10 },
{ "nx": 0, "ny": 1, "nz": 0, "d": -10 },
{ "nx": 0, "ny": -1, "nz": 0, "d": 10 },
{ "nx": 0, "ny": 0, "nz": 1, "d": 0 },
{ "nx": 0, "ny": 0, "nz": -1, "d": 5 }
],
"t": 0
}
}
*/
async action_GETAGENTSINFRUSTUM(action, payload, reqid, sender, roles) {
const replyOpts = {
action,
reqid,
sender,
replyChannel: this.config.observer.observerActionsReply,
}
if(!this.accessRights.canDo(roles, action)) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Unauthorized action !',
} })
return
}
const registry = this.observerSrv.requestorRegistry
if(!registry) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Requestor registry not ready',
} })
return
}
if(!this.observerSrv.isLive()) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Simulation not live',
} })
return
}
const frustum = Frustum.fromPlanes(payload?.planes)
if(!frustum) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Missing or invalid frustum planes (expected 6)',
} })
return
}
const at = parseSimTime(payload, () => this.observerSrv.now())
if(at === null) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Invalid simulation time',
} })
return
}
const result = await registry.evaluateOnce({ frustum, t: at })
if(!result.ok) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: result.err,
} })
return
}
publishActionReply(this, { ...replyOpts, reply: {
success: true,
payload: {
agents: result.agents,
t: result.t,
},
} })
},
/* Event-Rx:
{
"action": "SUBSCRIBEFRUSTUM",
"reqid": "6az5e4r6a",
"sender": "client-uuid",
"payload": {
"planes": [
{ "nx": 1, "ny": 0, "nz": 0, "d": -10 },
{ "nx": -1, "ny": 0, "nz": 0, "d": 10 },
{ "nx": 0, "ny": 1, "nz": 0, "d": -10 },
{ "nx": 0, "ny": -1, "nz": 0, "d": 10 },
{ "nx": 0, "ny": 0, "nz": 1, "d": 0 },
{ "nx": 0, "ny": 0, "nz": -1, "d": 5 }
],
"frequency": 800
}
}
Event-Tx:
{
"action": "SUBSCRIBEFRUSTUM",
"success": true,
"reqid": "6az5e4r6a",
"payload": {
"frequency": 900,
"agents": [ ... ],
"t": 12.5
}
}
Periodic push (no reqid):
{
"action": "GETAGENTSINFRUSTUM",
"success": true,
"sender": "observer",
"payload": { "agents": [ ... ], "t": 12.5 }
}
*/
async action_SUBSCRIBEFRUSTUM(action, payload, reqid, sender, roles) {
const replyOpts = {
action,
reqid,
sender,
replyChannel: this.config.observer.observerActionsReply,
}
if(!this.accessRights.canDo(roles, action)) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Unauthorized action !',
} })
return
}
if(!sender || typeof(sender) !== 'string') {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Missing or invalid sender',
} })
return
}
const registry = this.observerSrv.requestorRegistry
if(!registry) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Requestor registry not ready',
} })
return
}
if(!this.observerSrv.isLive()) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Simulation not live',
} })
return
}
const result = await registry.subscribeFrustum(sender, {
planes: payload?.planes,
frequency: payload?.frequency,
})
if(!result.ok) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: result.err,
} })
return
}
publishActionReply(this, { ...replyOpts, reply: {
success: true,
payload: {
frequency: result.frequency,
agents: result.agents,
t: result.t,
},
} })
},
}
+6
View File
@@ -14,3 +14,9 @@ export function publishActionReply(redisCnx, options) {
const chan = replyChannel.replace(/\[UID\]/g, sender)
redisCnx.redisPublish(chan, reply)
}
export function parseSimTime(payload, fallbackFn) {
if(payload?.t != null && typeof(payload.t) === 'number' && !Number.isNaN(payload.t)) return(payload.t)
if(payload?.at != null && typeof(payload.at) === 'number' && !Number.isNaN(payload.at)) return(payload.at)
return(fallbackFn())
}
+33
View File
@@ -0,0 +1,33 @@
export class Frustum {
constructor(planes) {
if(!Frustum.#validatePlanes(planes)) throw(new Error('Invalid frustum planes'))
this.planes = planes.map(p => ({ ...p }))
}
static #validatePlanes(planes) {
if(!Array.isArray(planes) || planes.length !== 6) return(false)
for(const p of planes) {
if(!p || typeof(p) !== 'object') return(false)
if(typeof(p.nx) !== 'number' || Number.isNaN(p.nx)) return(false)
if(typeof(p.ny) !== 'number' || Number.isNaN(p.ny)) return(false)
if(typeof(p.nz) !== 'number' || Number.isNaN(p.nz)) return(false)
if(typeof(p.d) !== 'number' || Number.isNaN(p.d)) return(false)
}
return(true)
}
static fromPlanes(planes) {
if(!Frustum.#validatePlanes(planes)) return(null)
return(new Frustum(planes))
}
containsPoint(position) {
for(const p of this.planes) {
const dist = p.nx * position.x + p.ny * position.y + p.nz * position.z + p.d
if(dist < 0) return(false)
}
return(true)
}
}
+82
View File
@@ -0,0 +1,82 @@
import { positionAt } from '../GPS/actions/arena/worldline.js'
export class GpsStorageReader {
constructor(systemCnx, gpsStorage, debug = false) {
this.cnx = systemCnx
this.gpsStorage = gpsStorage
this.debug = debug
}
agentHashKey(agentId) {
return(this.gpsStorage.agentHashKey.replace(/\[UID\]/g, agentId))
}
#parseHashField(raw) {
if(raw == null) return(null)
if(typeof(raw) === 'object') return(raw)
try { return(JSON.parse(raw)) }
catch { return(null) }
}
#segmentToAgent(segment) {
if(!segment?.id) return(null)
if(!this.#isVector(segment.position) || !this.#isVector(segment.vector)) return(null)
return({
id: segment.id,
position: { ...segment.position },
vector: { ...segment.vector },
since: segment.since ?? 0,
generation: segment.generation ?? 0,
})
}
#isVector(v) {
return(
v &&
typeof(v) === 'object' &&
typeof(v.x) === 'number' &&
typeof(v.y) === 'number' &&
typeof(v.z) === 'number'
)
}
buildAgentSnapshot(agent, at) {
return({
id: agent.id,
position: positionAt(agent, at),
vector: { ...agent.vector },
since: agent.since,
generation: agent.generation ?? 0,
t: at,
})
}
async loadSegment(agentId) {
const raw = await this.cnx.redisHget(this.agentHashKey(agentId), 'segment')
return(this.#parseHashField(raw))
}
async listAgentIds() {
return(await this.cnx.redisSmembers(this.gpsStorage.agentsIndexKey))
}
async loadAllAgents() {
const ids = await this.listAgentIds()
const agents = new Map()
for(const id of ids) {
const segment = await this.loadSegment(id)
const agent = this.#segmentToAgent(segment)
if(agent) agents.set(id, agent)
}
return(agents)
}
async getAgentPosition(agentId, at) {
const segment = await this.loadSegment(agentId)
const agent = this.#segmentToAgent(segment)
if(!agent) return(null)
return(this.buildAgentSnapshot(agent, at))
}
}
+74
View File
@@ -1,4 +1,8 @@
import { AccesRights } from '../accesRights.js'
import { GpsStorageReader } from './gpsStorageReader.js'
import { RequestorRegistry } from './requestorRegistry.js'
import { publishActionReply } from './actionsHelper.js'
import { SimState } from '../GPS/simulationState.js'
export class observerServer {
@@ -11,12 +15,17 @@ export class observerServer {
this.arenaCnx = null
this.arenaCnxs = []
this.systemCnx = null
this.gpsStorageReader = null
this.requestorRegistry = null
this.state = SimState.IDLE
this.bigBangEpoch = null
}
getObserverSettings() {
const observer = this.observerConfig.observer ?? {}
return({
senderId: observer.senderId ?? 'observer',
scanIntervalMs: observer.scanIntervalMs ?? 300,
lifecycle: {
arenaChannel: observer.lifecycle?.arenaChannel ?? 'arena:lifecycle',
godsReadyChannel: observer.lifecycle?.godsReadyChannel ?? 'arena:gods:ready',
@@ -24,6 +33,70 @@ export class observerServer {
})
}
getGpsStorageSettings() {
const gps = this.observerConfig.gps ?? {}
return(gps.GPSstorage ?? null)
}
initGpsStorageReader() {
const gpsStorage = this.getGpsStorageSettings()
if(gpsStorage && this.systemCnx) {
this.gpsStorageReader = new GpsStorageReader(this.systemCnx, gpsStorage, this.debug)
this.initRequestorRegistry()
}
}
initRequestorRegistry() {
if(!this.gpsStorageReader || this.requestorRegistry) return
const { scanIntervalMs } = this.getObserverSettings()
this.requestorRegistry = new RequestorRegistry(
this.gpsStorageReader,
() => this.now(),
scanIntervalMs,
(sender, payload) => this.publishFrustumUpdate(sender, payload),
this.debug
)
}
publishFrustumUpdate(sender, payload) {
if(!this.systemCnx || !sender) return
const observer = this.observerConfig.observer ?? {}
publishActionReply(this.systemCnx, {
action: 'GETAGENTSINFRUSTUM',
sender,
replyChannel: observer.observerActionsReply ?? 'system:replies:[UID]',
reply: {
success: true,
payload,
},
})
}
isLive() {
return(this.state === SimState.LIVE)
}
simNow() {
if(this.bigBangEpoch === null) return(null)
return((performance.now() - this.bigBangEpoch) / 1000)
}
now() {
if(this.isLive()) return(this.simNow())
return(null)
}
onYourMarks() {
this.state = SimState.PREPARE
this.bigBangEpoch = null
this.requestorRegistry?.clear()
}
onBigBang() {
this.bigBangEpoch = performance.now()
this.state = SimState.LIVE
}
wireSystemConnexion(cnx) {
cnx.observerSrv = this
cnx.accessRights = this.accessRights
@@ -31,6 +104,7 @@ export class observerServer {
cnx.getAccessRights = () => this.getAccessRights()
if(!this.systemCnx || cnx.redisConfig.role === 'primary') {
this.systemCnx = cnx
this.initGpsStorageReader()
}
}
+187
View File
@@ -0,0 +1,187 @@
import { positionAt } from '../GPS/actions/arena/worldline.js'
import { Frustum } from './frustum.js'
export class RequestorRegistry {
constructor(reader, getNow, scanIntervalMs, onPush, debug = false) {
this.reader = reader
this.getNow = getNow
this.scanIntervalMs = scanIntervalMs
this.onPush = onPush
this.debug = debug
this.requestors = new Map()
}
#tickTimer = null
#scanInFlight = false
#adjustFrequency(requestedMs) {
if(typeof(requestedMs) !== 'number' || Number.isNaN(requestedMs)) return(null)
if(requestedMs < 300 || requestedMs > 10000) return(null)
const tickMs = this.scanIntervalMs
const ticks = Math.round(requestedMs / tickMs)
const minTicks = Math.ceil(300 / tickMs)
const maxTicks = Math.floor(10000 / tickMs)
const clampedTicks = Math.max(minTicks, Math.min(maxTicks, ticks))
return(clampedTicks * tickMs)
}
async subscribeFrustum(id, { planes, frequency }) {
if(!id || typeof(id) !== 'string') return({ ok: false, err: 'Invalid requestor id' })
const frustum = Frustum.fromPlanes(planes)
if(!frustum) return({ ok: false, err: 'Invalid frustum planes' })
const frequencyMs = this.#adjustFrequency(frequency)
if(frequencyMs === null) {
return({ ok: false, err: 'Invalid frequency (expected 30010000 ms)' })
}
const t = this.getNow()
if(t === null) return({ ok: false, err: 'Simulation not live' })
const agents = await this.reader.loadAllAgents()
const matching = this.#matchAgents(agents, frustum, t)
const pushEveryNTicks = frequencyMs / this.scanIntervalMs
this.requestors.set(id, {
id,
frustum,
tMode: 'live',
subscription: true,
frequencyMs,
pushEveryNTicks,
tickCounter: 0,
agents: matching,
t,
updatedAt: Date.now(),
})
this.#ensureTick()
return({
ok: true,
frequency: frequencyMs,
agents: matching,
t,
})
}
updateRequestor(id, spec) {
const requestor = this.requestors.get(id)
if(!requestor) return({ ok: false, err: 'Unknown requestor' })
if(Array.isArray(spec?.planes)) {
const frustum = Frustum.fromPlanes(spec.planes)
if(!frustum) return({ ok: false, err: 'Invalid frustum planes' })
requestor.frustum = frustum
}
if(typeof(spec?.frequency) === 'number') {
const frequencyMs = this.#adjustFrequency(spec.frequency)
if(frequencyMs === null) return({ ok: false, err: 'Invalid frequency (expected 30010000 ms)' })
requestor.frequencyMs = frequencyMs
requestor.pushEveryNTicks = frequencyMs / this.scanIntervalMs
requestor.tickCounter = 0
}
return({ ok: true, frequency: requestor.frequencyMs })
}
unregisterRequestor(id) {
this.requestors.delete(id)
if(this.requestors.size === 0) this.#stopTick()
return({ ok: true })
}
getRequestorAgents(id) {
const requestor = this.requestors.get(id)
if(!requestor) return(null)
return({
agents: [...requestor.agents],
t: requestor.t,
frequency: requestor.frequencyMs ?? null,
updatedAt: requestor.updatedAt,
})
}
clear() {
this.requestors.clear()
this.#stopTick()
}
async evaluateOnce({ frustum, t }) {
if(!frustum || !(frustum instanceof Frustum)) {
return({ ok: false, err: 'Invalid frustum', agents: [] })
}
if(typeof(t) !== 'number' || Number.isNaN(t)) return({ ok: false, err: 'Invalid simulation time', agents: [] })
const agents = await this.reader.loadAllAgents()
const matching = this.#matchAgents(agents, frustum, t)
return({ ok: true, agents: matching, t })
}
#resolveT(requestor) {
if(requestor.tMode === 'fixed') return(requestor.fixedT)
return(this.getNow())
}
#matchAgents(agents, frustum, t) {
const matching = []
for(const agent of agents.values()) {
const position = positionAt(agent, t)
if(!frustum.containsPoint(position)) continue
matching.push(this.reader.buildAgentSnapshot(agent, t))
}
return(matching)
}
#pushUpdate(requestor) {
if(typeof(this.onPush) !== 'function') return
this.onPush(requestor.id, {
agents: [...requestor.agents],
t: requestor.t,
})
}
async #scanAll() {
if(this.#scanInFlight || this.requestors.size === 0) return
this.#scanInFlight = true
try {
const agents = await this.reader.loadAllAgents()
for(const requestor of this.requestors.values()) {
const t = this.#resolveT(requestor)
if(t === null) {
requestor.agents = []
requestor.t = null
requestor.updatedAt = Date.now()
continue
}
requestor.agents = this.#matchAgents(agents, requestor.frustum, t)
requestor.t = t
requestor.updatedAt = Date.now()
if(!requestor.subscription) continue
requestor.tickCounter++
if(requestor.tickCounter >= requestor.pushEveryNTicks) {
requestor.tickCounter = 0
this.#pushUpdate(requestor)
}
}
if(this.debug) console.log(`[Observer] Scanned ${agents.size} agent(s) for ${this.requestors.size} requestor(s)`)
} catch(err) {
console.error('[Observer] Requestor scan failed:', err)
} finally {
this.#scanInFlight = false
}
}
#ensureTick() {
if(this.#tickTimer) return
this.#tickTimer = setInterval(() => {
this.#scanAll()
}, this.scanIntervalMs)
}
#stopTick() {
if(!this.#tickTimer) return
clearInterval(this.#tickTimer)
this.#tickTimer = null
}
}
+2
View File
@@ -1,6 +1,8 @@
#!/bin/sh
set -a
. /etc/p42/secrets.env
set +a
daemon=p42Observer
logfile=observer.log