Observer embryo, Maestro done

This commit is contained in:
STEINNI
2026-06-13 13:47:46 +00:00
parent 932b6e4752
commit 26aefd3fe2
45 changed files with 1889 additions and 143 deletions
+25 -1
View File
@@ -1,6 +1,7 @@
export const construct = (redisCnx) => {
const tickMs = redisCnx.gpsSrv?.getGpsSettings().collisionTickMs ?? 100
// Interval always runs; tickArena no-ops until LIVE (see gpsServer.tickArena)
setInterval(() => {
redisCnx.gpsSrv?.tickArena()
}, tickMs)
@@ -8,6 +9,24 @@ export const construct = (redisCnx) => {
export const methods = {
handleLifecycleEvent(msg) {
const srv = this.gpsSrv
if(!srv) return
if(msg.eventType === 'onYourMarks') {
srv.onYourMarks(msg.payload ?? {}).catch(err => {
console.error(`[${this.redisId}] onYourMarks failed:`, err)
srv.publishReadyToStart({ success: false, err: err.message ?? 'onYourMarks failed' })
})
return
}
if(msg.eventType === 'bigBang') {
srv.onBigBang(msg.payload ?? {})
return
}
},
handleAgentEvent(msg) {
const agentId = msg.sender
if(!agentId || typeof(agentId) !== 'string') {
@@ -34,7 +53,12 @@ export const methods = {
dispatchArenaMessage(msg, chan) {
const gps = this.config.gps
if(!gps || !this.gpsSrv) return false
if(!gps || !this.gpsSrv) return(false)
if(this.matchesChan(chan, gps.lifecycle?.arenaChannel ?? 'arena:lifecycle')) {
this.handleLifecycleEvent(msg)
return(true)
}
if(this.matchesChan(chan, gps.agentVectorChangeChannel)) {
this.handleAgentEvent(msg)
+5
View File
@@ -0,0 +1,5 @@
export function dispatchMessage(redisCnx, msg, chan) {
if(!redisCnx.config.gps || typeof(redisCnx.dispatchArenaMessage) !== 'function') return
redisCnx.dispatchArenaMessage(msg, chan)
}
+2
View File
@@ -1,4 +1,5 @@
import { methods as arenaMethods, construct as arenaConstruct } from './arenaHandlers.js'
import { dispatchMessage } from './dispatch.js'
export const afterLoginMethods = [
arenaConstruct,
@@ -7,3 +8,4 @@ export const afterLoginMethods = [
export const meshActions = {
...arenaMethods,
}
export { dispatchMessage }
+28
View File
@@ -0,0 +1,28 @@
export function dispatchMessage(redisCnx, msg, chan) {
const gps = redisCnx.config.gps
if(!gps?.gpsActionsChannel) return
const actionsChan = redisCnx.fullChan(gps.gpsActionsChannel)
if(chan != actionsChan) return
const action = msg.action
if(!action || typeof(action) !== 'string') {
console.warn(`[${redisCnx.redisId}] Ignoring message without action on ${chan}`)
return
}
const handler = redisCnx['action_'+action]
if(typeof(handler) != 'function') {
if(redisCnx.debug) console.warn(`[${redisCnx.redisId}] Unknown action ${action} on ${chan}`)
return
}
const payload = ('payload' in msg) ? msg.payload : null
const reqid = ('reqid' in msg) ? msg.reqid.substr(0, 50) : null
const sender = msg.sender || null
const roles = Array.isArray(msg.roles) ? msg.roles : ['*']
if(redisCnx.debug) console.log(`[${redisCnx.redisId}] Dispatching action ${action} from ${sender}`)
handler.call(redisCnx, action, payload, reqid, sender, roles)
}
+2
View File
@@ -1,5 +1,6 @@
import { methods as utilities, construct as utilitiesConstruct } from './utilities.js'
import { methods as positions } from './positions.js'
import { dispatchMessage } from './dispatch.js'
export const afterLoginMethods = [
utilitiesConstruct,
@@ -8,3 +9,4 @@ export const meshActions = {
...utilities,
...positions,
}
export { dispatchMessage }
+26 -20
View File
@@ -1,4 +1,4 @@
import { publishActionReply, parseAt } from '../../actionsHelper.js'
import { publishActionReply, parseSimTime } from '../../actionsHelper.js'
export const methods = {
@@ -8,7 +8,7 @@ export const methods = {
"reqid": "6az5e4r6a",
"payload": {
"agentId": "agent42",
"at": "2026-06-07T12:00:00.000Z"
"t": 12.5
}
}
Event-Tx:
@@ -21,9 +21,9 @@ export const methods = {
"id": "agent42",
"position": { "x": 1, "y": 2, "z": 3 },
"vector": { "x": 0, "y": 0, "z": 0 },
"since": 1717750800,
"since": 0,
"generation": 2,
"at": "2026-06-07T12:00:00.000Z"
"t": 12.5
}
}
}
@@ -43,6 +43,14 @@ export const methods = {
return
}
if(!this.gpsSrv.isLive()) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Simulation not live',
} })
return
}
const agentId = payload?.agentId
if(!agentId || typeof(agentId) !== 'string') {
publishActionReply(this, { ...replyOpts, reply: {
@@ -52,11 +60,11 @@ export const methods = {
return
}
const at = parseAt(payload, () => this.gpsSrv.now())
const at = parseSimTime(payload, () => this.gpsSrv.now())
if(at === null) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Invalid at timestamp',
err: 'Invalid simulation time',
} })
return
}
@@ -86,17 +94,7 @@ export const methods = {
"yMin": -10, "yMax": 10,
"zMin": 0, "zMax": 5
},
"at": "2026-06-07T12:00:00.000Z"
}
}
Event-Tx:
{
"action": "GETAGENTSINPRISM",
"success": true,
"reqid": "6az5e4r6a",
"payload": {
"agents": [ ... ],
"at": "2026-06-07T12:00:00.000Z"
"t": 0
}
}
*/
@@ -115,6 +113,14 @@ export const methods = {
return
}
if(!this.gpsSrv.isLive()) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Simulation not live',
} })
return
}
const prism = payload?.prism
if(!this.gpsSrv.isValidPrism(prism)) {
publishActionReply(this, { ...replyOpts, reply: {
@@ -124,11 +130,11 @@ export const methods = {
return
}
const at = parseAt(payload, () => this.gpsSrv.now())
const at = parseSimTime(payload, () => this.gpsSrv.now())
if(at === null) {
publishActionReply(this, { ...replyOpts, reply: {
success: false,
err: 'Invalid at timestamp',
err: 'Invalid simulation time',
} })
return
}
@@ -138,7 +144,7 @@ export const methods = {
success: true,
payload: {
agents,
at: new Date(at * 1000).toISOString(),
t: at,
},
} })
},
+4 -5
View File
@@ -15,9 +15,8 @@ export function publishActionReply(redisCnx, options) {
redisCnx.redisPublish(chan, reply)
}
export function parseAt(payload, fallbackFn) {
if(!payload?.at) return(fallbackFn())
const t = Date.parse(payload.at)
if(Number.isNaN(t)) return(null)
return(t / 1000)
export function parseSimTime(payload, fallbackFn) {
if(payload?.t != null && typeof(payload.t) === 'number' && !Number.isNaN(payload.t)) return(payload.t)
if(payload?.at != null && typeof(payload.at) === 'number' && !Number.isNaN(payload.at)) return(payload.at)
return(fallbackFn())
}
+18 -4
View File
@@ -11,7 +11,20 @@ export class AgentStore {
return(this.storage.agentHashKey.replace(/\[UID\]/g, agentId))
}
async exportSegment(agent, eventType) {
async clearAll() {
try {
const ids = await this.cnx.redisSmembers(this.storage.agentsIndexKey)
for(const id of ids) {
await this.cnx.redisDel(this.agentHashKey(id))
}
await this.cnx.redisDel(this.storage.agentsIndexKey)
if(this.debug) console.log(`[GPS] Cleared system agent store (${ids.length} agent(s))`)
} catch(err) {
console.error('[GPS] Failed to clear system agent store:', err)
}
}
async exportSegment(agent, eventType, simT = 0, simulationId = null) {
try {
const record = {
eventType,
@@ -20,7 +33,8 @@ export class AgentStore {
vector: { ...agent.vector },
since: agent.since,
generation: agent.generation ?? 0,
at: new Date().toISOString(),
t: simT,
simulationId,
}
await this.cnx.redisHset(this.agentHashKey(agent.id), 'segment', record)
await this.cnx.redisSadd(this.storage.agentsIndexKey, agent.id)
@@ -35,12 +49,12 @@ export class AgentStore {
}
}
async exportRemove(agentId) {
async exportRemove(agentId, simT = 0) {
try {
const record = {
eventType: 'remove',
id: agentId,
at: new Date().toISOString(),
t: simT,
}
await this.cnx.redisDel(this.agentHashKey(agentId))
await this.cnx.redisSrem(this.storage.agentsIndexKey, agentId)
+82
View File
@@ -0,0 +1,82 @@
export class ArenaAgentLoader {
constructor(arenaCnx, storage, debug = false) {
this.cnx = arenaCnx
this.storage = storage
this.debug = debug
}
async loadAgents(expectedIds = null) {
// TODO: when arena mesh is sharded, iterate all arena Redis connections and merge agent ids
const agentIds = await this.#listAgentIds(expectedIds)
const agents = []
const errors = []
for(const agentId of agentIds) {
const result = await this.#loadAgentFromHash(agentId)
if(!result.ok) {
errors.push(result.err)
continue
}
agents.push(result.agent)
}
if(errors.length) return({ ok: false, err: errors.join('; '), agents: [] })
return({ ok: true, agents })
}
#parseHashField(raw) {
if(raw == null) return(null)
if(typeof(raw) === 'object') return(raw)
try { return(JSON.parse(raw)) }
catch { return(null) }
}
#isVector(v) {
return(
v &&
typeof(v) === 'object' &&
typeof(v.x) === 'number' &&
typeof(v.y) === 'number' &&
typeof(v.z) === 'number'
)
}
async #listAgentIds(expectedIds = null) {
if(Array.isArray(expectedIds) && expectedIds.length) return([...expectedIds])
return(await this.cnx.redisSmembers(this.storage.agentsIndexKey))
}
async #loadAgentFromHash(agentId) {
const key = this.storage.agentHashKey.replace(/\[UID\]/g, agentId)
const positionRaw = await this.cnx.redisHget(key, 'position')
const vectorRaw = await this.cnx.redisHget(key, 'vector')
let position = this.#parseHashField(positionRaw)
let vector = this.#parseHashField(vectorRaw)
if(!this.#isVector(vector)) {
const segmentRaw = await this.cnx.redisHget(key, 'segment')
const segment = this.#parseHashField(segmentRaw)
if(segment) {
if(!this.#isVector(vector) && this.#isVector(segment.vector)) vector = segment.vector
if(!this.#isVector(position) && this.#isVector(segment.position)) position = segment.position
}
}
if(!this.#isVector(vector)) return({ ok: false, err: `Invalid or missing vector for ${agentId}` })
if(!this.#isVector(position)) position = { x: 0, y: 0, z: 0 }
return({
ok: true,
agent: {
id: agentId,
position: { ...position },
vector: { ...vector },
since: 0,
generation: 1,
},
})
}
}
+182 -12
View File
@@ -1,6 +1,8 @@
import { AccesRights } from '../accesRights.js'
import { AgentStore } from './agentStore.js'
import { CollisionRegistry } from './collisionRegistry.js'
import { ArenaAgentLoader } from './arenaAgentLoader.js'
import { SimState, validateAgentSets } from './simulationState.js'
import {
compareWorldlinePair,
positionAt,
@@ -19,8 +21,14 @@ export class gpsServer {
this.agents = new Map()
this.registry = new CollisionRegistry()
this.arenaCnx = null
this.arenaCnxs = []
this.systemCnx = null
this.agentStore = null
this.arenaLoader = null
this.state = SimState.IDLE
this.simulationId = null
this.bigBangEpoch = null
this.ignoredChangeCount = 0
}
getGpsSettings() {
@@ -33,8 +41,40 @@ export class gpsServer {
})
}
getLifecycleSettings() {
const gps = this.gpsConfig.gps ?? {}
return({
arenaChannel: gps.lifecycle?.arenaChannel ?? 'arena:lifecycle',
godsReadyChannel: gps.lifecycle?.godsReadyChannel ?? 'arena:gods:ready',
senderId: gps.senderId ?? 'gps',
})
}
getArenaStorageSettings() {
const gps = this.gpsConfig.gps ?? {}
return({
agentHashKey: gps.arenaStorage?.agentHashKey ?? 'arena:agents:[UID]',
agentsIndexKey: gps.arenaStorage?.agentsIndexKey ?? 'arena:agents',
})
}
isLive() {
return(this.state === SimState.LIVE)
}
isPrepare() {
return(this.state === SimState.PREPARE)
}
simNow() {
if(this.bigBangEpoch === null) return(null)
return((performance.now() - this.bigBangEpoch) / 1000)
}
now() {
return(Date.now() / 1000)
if(this.isLive()) return(this.simNow())
if(this.isPrepare()) return(0)
return(null)
}
wireSystemConnexion(cnx) {
@@ -57,7 +97,11 @@ export class gpsServer {
wireArenaConnexion(cnx) {
cnx.gpsSrv = this
this.arenaCnx = cnx
this.arenaCnxs.push(cnx)
if(!this.arenaCnx || cnx.redisConfig.role === 'primary') {
this.arenaCnx = cnx
this.arenaLoader = new ArenaAgentLoader(cnx, this.getArenaStorageSettings(), this.debug)
}
}
getAgent(agentId) {
@@ -75,14 +119,16 @@ export class gpsServer {
vector: { ...agent.vector },
since: agent.since,
generation: agent.generation ?? 0,
at: new Date(at * 1000).toISOString(),
t: at,
})
}
getAgentPosition(agentId, at = null) {
if(!this.isLive()) return(null)
const agent = this.agents.get(agentId)
if(!agent) return(null)
const t = at ?? this.now()
if(t === null) return(null)
return(this.buildAgentSnapshot(agent, t))
}
@@ -108,7 +154,9 @@ export class gpsServer {
}
getAgentsInPrism(prism, at = null) {
if(!this.isLive()) return([])
const t = at ?? this.now()
if(t === null) return([])
const agents = []
for(const agent of this.agents.values()) {
const position = positionAt(agent, t)
@@ -119,6 +167,117 @@ export class gpsServer {
return(agents)
}
async resetSimulation() {
this.agents.clear()
this.registry = new CollisionRegistry()
this.simulationId = null
this.bigBangEpoch = null
this.ignoredChangeCount = 0
this.state = SimState.IDLE
await this.agentStore?.clearAll()
}
runInitialPairScan() {
const { nearMissDistance, prismTimeHeight } = this.getGpsSettings()
const ids = [...this.agents.keys()]
for(let i = 0; i < ids.length; i++) {
for(let j = i + 1; j < ids.length; j++) {
const hit = compareWorldlinePair(
this.agents.get(ids[i]),
this.agents.get(ids[j]),
prismTimeHeight,
nearMissDistance,
0
)
if(hit) this.registry.add(hit)
}
}
if(this.debug) console.log(`[GPS] Initial pair scan: ${this.registry.entries.length} proximity entries at T=0`)
}
async publishReadyToStart(result) {
if(!this.arenaCnx) return
const { godsReadyChannel, senderId } = this.getLifecycleSettings()
await this.arenaCnx.redisPublish(godsReadyChannel, {
eventType: 'readyToStart',
sender: senderId,
payload: {
success: result.success,
simulationId: this.simulationId,
agentIds: [...this.agents.keys()],
err: result.err ?? null,
},
})
}
async onYourMarks(payload = {}) {
await this.resetSimulation()
this.simulationId = payload.simulationId ?? null
if(!this.simulationId) {
console.error('[GPS] onYourMarks rejected: missing simulationId')
await this.publishReadyToStart({ success: false, err: 'Missing simulationId' })
return
}
if(!this.arenaLoader) {
console.error('[GPS] onYourMarks rejected: no arena loader')
await this.publishReadyToStart({ success: false, err: 'No arena Redis connection' })
return
}
const loadResult = await this.arenaLoader.loadAgents(payload.agentIds ?? null)
if(!loadResult.ok) {
console.error(`[GPS] onYourMarks load failed: ${loadResult.err}`)
await this.publishReadyToStart({ success: false, err: loadResult.err })
return
}
for(const agent of loadResult.agents) {
this.agents.set(agent.id, agent)
}
if(Array.isArray(payload.agentIds) && payload.agentIds.length) {
const mismatch = validateAgentSets(payload.agentIds, [...this.agents.keys()])
if(mismatch) {
console.error(`[GPS] onYourMarks agent mismatch: ${mismatch}`)
await this.resetSimulation()
await this.publishReadyToStart({ success: false, err: mismatch })
return
}
}
this.runInitialPairScan()
this.state = SimState.PREPARE
if(this.debug) console.log(`[GPS] PREPARE: ${this.agents.size} agent(s), simulationId=${this.simulationId}`)
await this.publishReadyToStart({ success: true })
}
async onBigBang(payload = {}) {
if(this.state !== SimState.PREPARE) {
console.error(`[GPS] bigBang rejected: expected PREPARE, got ${this.state}`)
return
}
if(!payload.simulationId || payload.simulationId !== this.simulationId) {
console.error('[GPS] bigBang rejected: simulationId mismatch')
return
}
if(Array.isArray(payload.agentIds) && payload.agentIds.length) {
const mismatch = validateAgentSets(payload.agentIds, [...this.agents.keys()])
if(mismatch) {
console.error(`[GPS] bigBang agent mismatch: ${mismatch}`)
return
}
}
this.bigBangEpoch = performance.now()
this.state = SimState.LIVE
for(const agent of this.agents.values()) {
await this.agentStore?.exportSegment(agent, 'start', 0, this.simulationId)
}
if(this.debug) console.log(`[GPS] LIVE: bigBangEpoch set, simulationId=${this.simulationId}`)
}
upsertAgent(agentId, newVector, newPosition = null) {
const now = this.now()
let agent = this.agents.get(agentId)
@@ -131,7 +290,7 @@ export class gpsServer {
generation: 1,
}
this.agents.set(agentId, agent)
this.agentStore?.exportSegment(agent, 'change')
this.agentStore?.exportSegment(agent, 'change', now, this.simulationId)
return(agent)
}
@@ -140,7 +299,7 @@ export class gpsServer {
agent.since = now
agent.generation = (agent.generation ?? 0) + 1
if(newPosition) agent.position = { ...newPosition }
this.agentStore?.exportSegment(agent, 'change')
this.agentStore?.exportSegment(agent, 'change', now, this.simulationId)
return(agent)
}
@@ -163,7 +322,7 @@ export class gpsServer {
const hits = this.scanAgentPairs(agentId, refreshed)
for(const hit of hits) this.registry.add(hit)
this.agentStore?.exportSegment(agent, 'refresh')
this.agentStore?.exportSegment(agent, 'refresh', now, this.simulationId)
if(this.debug) console.log(`[GPS] Prism refresh: ${agentId}`)
return(true)
}
@@ -196,6 +355,10 @@ export class gpsServer {
}
onVectorChange(agentId, newVector, newPosition = null) {
if(!this.isLive()) {
if(this.isPrepare()) this.ignoredChangeCount++
return([])
}
this.registry.purge(agentId)
this.upsertAgent(agentId, newVector, newPosition)
const hits = this.scanAgentPairs(agentId)
@@ -205,9 +368,10 @@ export class gpsServer {
}
onAgentRemove(agentId) {
if(!this.isLive()) return
this.agents.delete(agentId)
this.registry.purge(agentId)
this.agentStore?.exportRemove(agentId)
this.agentStore?.exportRemove(agentId, this.now())
if(this.debug) console.log(`[GPS] Agent removed: ${agentId}`)
}
@@ -226,9 +390,9 @@ export class gpsServer {
byAgent.get(agentId).push({
otherAgent,
distance: entry.distance,
at: new Date(entry.time * 1000).toISOString(),
t: entry.time,
minDistance: entry.minDistance,
minAt: new Date(entry.minTime * 1000).toISOString(),
minT: entry.minTime,
})
}
}
@@ -240,13 +404,18 @@ export class gpsServer {
const chan = this.arenaCnx.config.gps.collisionsChannel.replace(/\[UID\]/g, targetAgentId)
await this.arenaCnx.redisPublish(chan, {
eventType: 'proximity',
payload: { pairs },
sender: 'gps',
payload: {
pairs,
simulationId: this.simulationId,
},
sender: this.getLifecycleSettings().senderId,
})
}
async tickCollisions() {
const due = this.registry.dueBefore(this.now())
const now = this.now()
if(now === null) return
const due = this.registry.dueBefore(now)
if(!due.length) return
const batches = this.buildProximityBatches(due)
@@ -257,6 +426,7 @@ export class gpsServer {
}
tickArena() {
if(!this.isLive()) return
this.tickPrismRefresh()
this.tickCollisions()
}
+9 -1
View File
@@ -2,9 +2,16 @@
import yargs from 'yargs/yargs'
import { hideBin } from 'yargs/helpers'
import 'node:process'
import {RedisConnexion} from './redisConnexion.js'
import {RedisConnexion} from '../redisConnexion.js'
import {configHelper} from '../configHelper.js'
import {gpsServer} from './gpsServer.js'
import * as systemMesh from './actions/system/index.js'
import * as arenaMesh from './actions/arena/index.js'
const meshModules = {
system: systemMesh,
arena: arenaMesh,
}
///////////////////////////// Little improvement on console.xxx /////////////////////////////////////
@@ -52,6 +59,7 @@ function meshRedisConns(mesh, meshName, debug, rootConfig) {
config: { ...cfg, ...meshConfig, gps: rootConfig.gps },
redisId: cfg.redisId,
meshName,
meshModule: meshModules[meshName],
})
)
}
-295
View File
@@ -1,295 +0,0 @@
import redis from 'redis'
import * as systemMesh from './actions/system/index.js'
import * as arenaMesh from './actions/arena/index.js'
const meshModules = {
system: systemMesh,
arena: arenaMesh,
}
export class RedisConnexion {
constructor(options) {
this.config = options.config
this.debug = options.debug
this.redisId = options.redisId
this.redisConfig = this.config
this.meshName = options.meshName
const mesh = meshModules[this.meshName]
if(mesh?.meshActions) Object.assign(this, mesh.meshActions)
this.afterLoginMethods = mesh?.afterLoginMethods ?? []
if(!mesh) console.warn(`[${this.redisId}] Unknown meshName: ${this.meshName}`)
this.redisClient = redis.createClient({
socket: {
tls: this.redisConfig.tls,
host: this.redisConfig.host,
port: this.redisConfig.port
}
});
this.redisSubscriber = null;
this.redisClient.on('error', (err) => {
console.error('Redis error: ', err);
});
if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis started...`)
}
fullChan(chanName){
if(!chanName.startsWith(this.redisConfig.basePrefix)) chanName = this.redisConfig.basePrefix + chanName
return(chanName)
}
matchesChan(chan, pattern) {
const fullChan = this.fullChan(chan)
const fullPattern = this.fullChan(pattern)
const re = new RegExp('^' + fullPattern.replace(/\*/g, '(.+)') + '$')
return(re.test(fullChan))
}
async redisLogin(){
if(this.debug) console.log(`Connecting to Redis (${this.redisConfig.host}:${this.redisConfig.port}, tls:${this.redisConfig.tls?'yes':'no'})...`);
await this.redisClient.connect();
if(this.debug) console.log(`Connected to Redis ${this.redisConfig.redisId}`);
if(this.redisConfig.user) {
await this.redisClient.sendCommand(['AUTH', this.redisConfig.user, this.redisConfig.pass]);
if(this.debug) console.log(`Logged into Redis ${this.redisConfig.redisId}`);
} else {
if(this.debug) console.log(`Connected (anon) to Redis ${this.redisConfig.redisId}`);
}
if(this.debug) {
var redisTime = await this.redisClient.time();
console.log(`[${this.redisConfig.redisId}] Redis ${this.redisConfig.redisId} time:`, redisTime);
}
for(const method of this.afterLoginMethods){
if(typeof method != 'function') continue
method(this)
}
}
async redisChansStart(){
this.redisSubscriber = this.redisClient.duplicate();
await this.redisSubscriber.connect();
if(this.redisConfig.user) {
await this.redisSubscriber.sendCommand(['AUTH', this.redisConfig.user, this.redisConfig.pass]);
}
const allChans = this.redisConfig.basePrefix + this.redisConfig.chansNamespace+'*'
this.redisSubscriber.pSubscribe(allChans, this.redisReceive.bind(this));
if(this.debug) console.log(`[${this.redisConfig.redisId}] PSubscription OK `, allChans);
}
async redisSubscribe(chanName, callBack){
if(!chanName.startsWith(this.redisConfig.basePrefix)) chanName = this.redisConfig.basePrefix + chanName
if(!chanName.startsWith(this.redisConfig.basePrefix+this.redisConfig.chansNamespace)) {
console.warn(`[${this.redisConfig.redisId}] redisSubscribe : forbidden channel range on this redis !`)
return
}
await this.redisSubscriber.subscribe(chanName, callBack);
}
async redisPublish(chanName, msg){
if(typeof (msg) != 'string') msg = JSON.stringify(msg);
if(!chanName.startsWith(this.redisConfig.basePrefix)) chanName = this.redisConfig.basePrefix + chanName
if(!chanName.startsWith(this.redisConfig.basePrefix+this.redisConfig.chansNamespace)) {
console.warn(`[${this.redisConfig.redisId}] redisPublish : forbidden channel range on this redis ! (${chanName} / ${this.redisConfig.basePrefix+this.redisConfig.chansNamespace}) `)
return
}
await this.redisClient.publish(chanName, msg);
}
async redisSet(k, v, exp = 0, customPrefix=null){
if(typeof(v) != 'string') v = JSON.stringify(v);
if(customPrefix!==null) k = customPrefix + k
else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k
if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis SET `, k);
try { await this.redisClient.set(k, v) }
catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis set: `, k, v) }
if(exp > 0) {
try { await this.redisClient.expire(k, exp) }
catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis expire: `, k, exp) }
}
}
async redisGet(k, customPrefix=null){
if(customPrefix!==null) k = customPrefix + k
else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k
if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis GET`, k)
let v=null
try { v = await this.redisClient.get(k) }
catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis get: `, k) }
return(v);
}
resolveKey(k, customPrefix=null){
if(customPrefix!==null) return(customPrefix + k)
if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k
return(k)
}
async redisDel(k, customPrefix=null){
k = this.resolveKey(k, customPrefix)
if(this.debug) console.log(`[${this.redisConfig.redisId}] Deleting`, k)
await this.redisClient.del(k)
}
async redisHset(k, field, v, customPrefix=null){
if(typeof(v) != 'string') v = JSON.stringify(v)
k = this.resolveKey(k, customPrefix)
if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis HSET`, k, field)
try { await this.redisClient.hSet(k, field, v) }
catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing HSET: `, k, field, err) }
}
async redisHdel(k, field, customPrefix=null){
k = this.resolveKey(k, customPrefix)
if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis HDEL`, k, field)
try { await this.redisClient.hDel(k, field) }
catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing HDEL: `, k, field, err) }
}
async redisSadd(k, member, customPrefix=null){
k = this.resolveKey(k, customPrefix)
if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis SADD`, k, member)
try { await this.redisClient.sAdd(k, member) }
catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing SADD: `, k, member, err) }
}
async redisSrem(k, member, customPrefix=null){
k = this.resolveKey(k, customPrefix)
if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis SREM`, k, member)
try { await this.redisClient.sRem(k, member) }
catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing SREM: `, k, member, err) }
}
async redisGetTtl(k, customPrefix=null){
if(customPrefix!==null) k = customPrefix + k
else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k
if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis Get TTL `, k)
let v=null
try { v = await this.redisClient.ttl(k) }
catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis ttl: `, k) }
return(v);
}
async redisSetTtl(k, ttl, customPrefix=null){
if(customPrefix!==null) k = customPrefix + k
else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k
if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis Set TTL`, k);
try { await this.redisClient.expire(k, ttl) }
catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis expire: `, k, ttl) }
}
async redisXadd(streamName, kvObj, max = ''){
if(!streamName.startsWith(this.redisConfig.basePrefix)) streamName = this.redisConfig.basePrefix + streamName
if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis XADD `, streamName, kvObj);
let arr = ['XADD', streamName]
if(max != '') arr = [...arr, ...['MAXLEN', '~', (1*max).toString()]]
arr.push('*')
let payload = '""'
try{ payload = JSON.stringify(kvObj) }
catch(e) { console.warn(`[${this.redisConfig.redisId}] cannot historize bad json: `,kvObj) }
arr.push('streamData')
arr.push(payload)
let sid = null
try { sid = await this.redisClient.sendCommand(arr); }
catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis command: `, arr, err) }
return(sid);
}
async redisXrange(streamName, start = '-', end = '+', withPayload = true){
if(!streamName.startsWith(this.redisConfig.basePrefix)) streamName = this.redisConfig.basePrefix + streamName
if(this.debug) console.log(`[${this.redisConfig.redisId}]Redis XRANGE `, streamName);
if(typeof(start)!='string') start = start.toString()
if(typeof(end)!='string') end = end.toString()
let arr = ['XRANGE', streamName, start, end];
let res = []
try { res = await this.redisClient.sendCommand(arr) }
catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis command: `, arr, err.msg) }
if(withPayload){
let o = {};
for (let row of res) { // We'll take only the first content of the stream (the value of the key 'streamdata')
let payload = ''
try{ payload = JSON.parse(row[1][1]) }
catch(e) { console.warn(`[${this.redisConfig.redisId}] cannot unhistorize bad json: `,row[1][1]) }
o[row[0]] = payload
}
return(o);
} else {
return(res.map(row => row[0]))
}
}
isHistorizedChan(chan){
if(!chan.startsWith(this.redisConfig.basePrefix)) chan = this.redisConfig.basePrefix + chan
var matches = this.redisConfig.historizeChannels.filter((e) => {
if(!e.startsWith(this.redisConfig.basePrefix)) e = this.redisConfig.basePrefix + e
if(e.indexOf('*') > -1) {
let r = new RegExp('^'+e.replace(/\*/g,'(.+)')+'$','g')
return(chan.match(r) != null);
} else return(chan == e);
});
return(matches.length > 0);
}
async getProcessInfo(){
if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis INFO`);
try {
const rawInfo = await this.redisClient.INFO()
let arr = rawInfo.replace(/\r/g,'').split('\n')
arr = arr.filter(item => (item.trim()!='') && (!item.trim().startsWith('#')))
let infoObject = arr.reduce((acc, val) => { kv = val.split(':',2); if(kv.length>0){ acc[kv[0]] = kv[1] } return(acc) }, {})
}
catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis INFO: `) }
return(infoObject)
}
async redisReceive(msg, chan){
if(this.debug) console.log(`[${this.redisConfig.redisId}] From Redis chan:`, chan, msg)
try {
msg = JSON.parse(msg)
} catch {
console.warn(`[${this.redisConfig.redisId}] Ignoring non-json on channel ${chan} : ${msg}`)
return
}
if(this.meshName === 'arena' && this.config.gps && typeof(this.dispatchArenaMessage) === 'function') {
if(this.dispatchArenaMessage(msg, chan)) return
}
if(this.meshName === 'system' && this.config.gps?.gpsActionsChannel){
const actionsChan = this.fullChan(this.config.gps.gpsActionsChannel)
if(chan != actionsChan) return
const action = msg.action
if(!action || typeof(action) !== 'string') {
console.warn(`[${this.redisConfig.redisId}] Ignoring message without action on ${chan}`)
return
}
const handler = this['action_'+action]
if(typeof(handler) != 'function') {
if(this.debug) console.warn(`[${this.redisConfig.redisId}] Unknown action ${action} on ${chan}`)
return
}
const payload = ('payload' in msg) ? msg.payload : null
const reqid = ('reqid' in msg) ? msg.reqid.substr(0, 50) : null
const sender = msg.sender || null
const roles = Array.isArray(msg.roles) ? msg.roles : ['*']
if(this.debug) console.log(`[${this.redisConfig.redisId}] Dispatching action ${action} from ${sender}`)
handler.call(this, action, payload, reqid, sender, roles)
}
}
}
// redis-cli -h redis.backend.eismea.eu --tls --user default
+23
View File
@@ -0,0 +1,23 @@
export const SimState = {
IDLE: 'idle',
PREPARE: 'prepare',
LIVE: 'live',
}
export function validateAgentSets(expected, found) {
if(!Array.isArray(expected) || !Array.isArray(found)) {
return('agentIds must be arrays')
}
const exp = new Set(expected)
const fnd = new Set(found)
if(exp.size !== fnd.size) {
return(`Agent count mismatch: expected ${exp.size}, found ${fnd.size}`)
}
for(const id of exp) {
if(!fnd.has(id)) return(`Missing agent: ${id}`)
}
for(const id of fnd) {
if(!exp.has(id)) return(`Unexpected agent: ${id}`)
}
return(null)
}
+26 -6
View File
@@ -1,13 +1,33 @@
#!/bin/sh
cd /opt/p42GodDaemons/GPS/
. /etc/p42/secrets.env
daemon=p42Gps
logfile=gps.log
pid=$(pgrep -f "$daemon")
pid=`ps -ef | grep p42Gps |grep -v grep | awk '{print $2}'`
if [ -z "$pid" ]
then
node p42Gps.js --debug > gps.log 2>&1 &
node "${daemon}.js" --debug > "$logfile" 2>&1 &
pid=$!
sleep 1
if kill -0 "$pid" 2>/dev/null
then
echo ""
echo "$daemon is now running with PID=$pid"
echo ""
else
echo ""
echo "Failed to start $daemon. Check gps.log"
echo ""
fi
else
echo ''
echo 'Already running PID='"$pid"' (use stopGps.sh to stop it)'
echo ''
echo ""
echo "$daemon is already running with PID=$pid"
echo ""
fi