From 77c8ad958a8834f6329c76663c35c0465acf5a0a Mon Sep 17 00:00:00 2001 From: STEINNI Date: Mon, 6 Oct 2025 17:58:12 +0000 Subject: [PATCH] MP starts again... --- actions/pubSub.js | 10 +++--- actions/store.js | 4 +-- configSchema.json | 4 +-- p42wssGateway.js | 2 +- redisConnexion.js | 72 +++++++++++++++++++++---------------------- wssConnexion.js | 2 +- wssGatewayConfig.json | 4 +-- 7 files changed, 49 insertions(+), 49 deletions(-) diff --git a/actions/pubSub.js b/actions/pubSub.js index 651d7d1..db0273a 100644 --- a/actions/pubSub.js +++ b/actions/pubSub.js @@ -27,7 +27,7 @@ export const methods = { let coudSubscribe = false for(const rediscnx of this.allRediscnx){ - if(!chan.startsWith(rediscnx.redisConfig.ChansFilter)) continue + if(!chan.startsWith(rediscnx.redisConfig.chansNamespace)) continue else coudSubscribe = true let localChan = rediscnx.redisConfig.basePrefix + localChan if(!(localChan in rediscnx.subscriptions)) rediscnx.subscriptions[localChan] = []; @@ -76,7 +76,7 @@ export const methods = { let couldUnsubscribe = false for(const rediscnx of this.allRediscnx){ - if(!chan.startsWith(rediscnx.redisConfig.ChansFilter)) continue + if(!chan.startsWith(rediscnx.redisConfig.chansNamespace)) continue else couldUnsubscribe = true let localChan = rediscnx.redisConfig.basePrefix + chan if((localChan in rediscnx.subscriptions) && (rediscnx.subscriptions[chan].includes(this.uuid))) { @@ -159,7 +159,7 @@ export const methods = { const chan = payload.chan // First find the primary redis for this chan namespace, to do historization first, and get the histId - const primaryRediscnx = this.allRediscnx.find(cnx => ((chan.startsWith(cnx.redisConfig.ChansFilter)) &&(cnx.redisConfig.role=='primary')) ) + const primaryRediscnx = this.allRediscnx.find(cnx => ((chan.startsWith(cnx.redisConfig.chansNamespace)) &&(cnx.redisConfig.role=='primary')) ) if(!primaryRediscnx){ this.sendErr(action, 'No primary redis for this chan !', reqid); if(this.debug) console.log('PUB: No primary redis for this chan ', chan) @@ -178,7 +178,7 @@ export const methods = { // Now publish on every Redis that covers this chan namespace try { payload.msg = JSON.stringify(msgO) } catch(err) {payload.msg = `{"err":"${err}}"` } for(const rediscnx of this.allRediscnx){ - if(!chan.startsWith(rediscnx.redisConfig.ChansFilter)) continue + if(!chan.startsWith(rediscnx.redisConfig.chansNamespace)) continue rediscnx.redisPublish(chan, payload.msg) } @@ -231,7 +231,7 @@ export const methods = { return } - const primaryRediscnx = this.allRediscnx.find(cnx => ((chan.startsWith(cnx.redisConfig.ChansFilter)) &&(cnx.redisConfig.role=='primary')) ) + const primaryRediscnx = this.allRediscnx.find(cnx => ((chan.startsWith(cnx.redisConfig.chansNamespace)) &&(cnx.redisConfig.role=='primary')) ) if(!primaryRediscnx){ this.sendErr(action, 'No primary redis for this chan !', reqid); if(this.debug) console.log('CHANHIST: No primary redis for this chan ', chan) diff --git a/actions/store.js b/actions/store.js index 82bb7e4..e533206 100644 --- a/actions/store.js +++ b/actions/store.js @@ -35,7 +35,7 @@ export const methods = { return } - const primaryRediscnx = this.allRediscnx.find(cnx => ((payload.key.startsWith(cnx.redisConfig.ChansFilter)) &&(cnx.redisConfig.role=='primary')) ) + const primaryRediscnx = this.allRediscnx.find(cnx => ((payload.key.startsWith(cnx.redisConfig.chansNamespace)) &&(cnx.redisConfig.role=='primary')) ) if(!primaryRediscnx){ this.sendErr(action, 'No primary redis for this key prefix !', reqid); if(this.debug) console.log('ACTION_SET: No primary redis for this key ', payload.key) @@ -98,7 +98,7 @@ export const methods = { return } - const primaryRediscnx = this.allRediscnx.find(cnx => ((payload.key.startsWith(cnx.redisConfig.ChansFilter)) &&(cnx.redisConfig.role=='primary')) ) + const primaryRediscnx = this.allRediscnx.find(cnx => ((payload.key.startsWith(cnx.redisConfig.chansNamespace)) &&(cnx.redisConfig.role=='primary')) ) if(!primaryRediscnx){ this.sendErr(action, 'No primary redis for this key prefix !', reqid); if(this.debug) console.log('ACTION_GET: No primary redis for this key ', payload.key) diff --git a/configSchema.json b/configSchema.json index c272461..1b1fa68 100644 --- a/configSchema.json +++ b/configSchema.json @@ -78,7 +78,7 @@ "port": { "type": "integer" }, "user": { "type": "string" }, "pass": { "type": "string" }, - "chansFilter": { "type": "string" }, + "chansNamespace": { "type": "string" }, "basePrefix": { "type": "string" }, "historizeMax": { "type": "integer" }, "historizePrefix": { "type": "string" }, @@ -93,7 +93,7 @@ }, "required": [ "redisId", - "chansFilter", + "chansNamespace", "role", "basePrefix", "storeMaxSize", diff --git a/p42wssGateway.js b/p42wssGateway.js index d83be22..a9a474f 100644 --- a/p42wssGateway.js +++ b/p42wssGateway.js @@ -190,7 +190,7 @@ cfgh.fetchConfig().then( async wssGatewayConfig => { console.log(`WS${wssGatewayConfig.server.unsecure ? '': 'S'} server created for ${wssGatewayConfig.server.listenHost}:${wssGatewayConfig.server.listenPort}`) }) - startRedis(wssGatewayConfig).then((allRediscnx) => { + startAllRedis(wssGatewayConfig).then((allRediscnx) => { const wssSrv = new wssServer(cfgh, WSSServer, allRediscnx, debug); }); diff --git a/redisConnexion.js b/redisConnexion.js index 3289ed8..f79c987 100644 --- a/redisConnexion.js +++ b/redisConnexion.js @@ -5,7 +5,7 @@ export class RedisConnexion { this.config = options.config; this.debug = options.debug; this.redisId = options.redisId; - this.redisConfig = this.config.redis[this.redisId] + this.redisConfig = this.config this.subscriptions = {}; // Externally fed this.wssConnections = {}; // Externally fed @@ -23,7 +23,7 @@ export class RedisConnexion { console.error('Redis error: ', err); }); - if(this.debug) console.log('Redis started...') + if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis started...`) } getSubscribedUuids(chan){ @@ -45,16 +45,16 @@ export class RedisConnexion { async redisLogin(){ if(this.debug) console.log(`Connecting to Redis (${this.redisConfig.host}:${this.redisConfig.port}, tls:${this.redisConfig.tls?'yes':'no'})...`); await this.redisClient.connect(); - if(this.debug) console.log('Connected to Redis !'); + if(this.debug) console.log(`Connected to Redis ${this.redisConfig.redisId}`); if(this.redisConfig.user) { await this.redisClient.sendCommand(['AUTH', this.redisConfig.user, this.redisConfig.pass]); - if(this.debug) console.log('Logged into Redis !'); + if(this.debug) console.log(`Logged into Redis ${this.redisConfig.redisId}`); } else { - if(this.debug) console.log('Connected (anon) to Redis...'); + if(this.debug) console.log(`Connected (anon) to Redis ${this.redisConfig.redisId}`); } if(this.debug) { var redisTime = await this.redisClient.time(); - console.log('Redis time:', redisTime); + console.log(`[${this.redisConfig.redisId}] Redis ${this.redisConfig.redisId} time:`, redisTime); } } @@ -64,15 +64,15 @@ export class RedisConnexion { if(this.redisConfig.user) { await this.redisSubscriber.sendCommand(['AUTH', this.redisConfig.user, this.redisConfig.pass]); } - const allChans = this.redisConfig.basePrefix + this.redisConfig.ChansFilter+'*' + const allChans = this.redisConfig.basePrefix + this.redisConfig.chansNamespace+'*' this.redisSubscriber.pSubscribe(allChans, this.redisReceive.bind(this)); - if(this.debug) console.log('PSubscription OK ', allChans); + if(this.debug) console.log(`[${this.redisConfig.redisId}] PSubscription OK `, allChans); } async redisSubscribe(chanName, callBack){ if(!chanName.startsWith(this.redisConfig.basePrefix)) chanName = this.redisConfig.basePrefix + chanName - if(!chanName.startsWith(this.redisConfig.basePrefix+this.ChansFilter)) { - console.warn(`redisSubscribe : forbidden channel range on this redis !`) + if(!chanName.startsWith(this.redisConfig.basePrefix+this.chansNamespace)) { + console.warn(`[${this.redisConfig.redisId}] redisSubscribe : forbidden channel range on this redis !`) return } await this.redisSubscriber.subscribe(chanName, callBack); @@ -81,8 +81,8 @@ export class RedisConnexion { async redisPublish(chanName, msg){ if(typeof (msg) != 'string') msg = JSON.stringify(msg); if(!chanName.startsWith(this.redisConfig.basePrefix)) chanName = this.redisConfig.basePrefix + chanName - if(!chanName.startsWith(this.redisConfig.basePrefix+this.ChansFilter)) { - console.warn(`redisPublish : forbidden channel range on this redis !`) + if(!chanName.startsWith(this.redisConfig.basePrefix+this.chansNamespace)) { + console.warn(`[${this.redisConfig.redisId}] redisPublish : forbidden channel range on this redis !`) return } @@ -93,29 +93,29 @@ export class RedisConnexion { if(typeof(v) != 'string') v = JSON.stringify(v); if(customPrefix!==null) k = customPrefix + k else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k - if(this.debug) console.log('Redis SET ', k); + if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis SET `, k); try { await this.redisClient.set(k, v) } - catch(err) { console.error('Redis crash doing Redis set: ', k, v) } + catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis set: `, k, v) } if(exp > 0) { try { await this.redisClient.expire(k, exp) } - catch(err) { console.error('Redis crash doing Redis expire: ', k, exp) } + catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis expire: `, k, exp) } } } async redisGet(k, customPrefix=null){ if(customPrefix!==null) k = customPrefix + k else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k - if(this.debug) console.log('Redis GET ', k) + if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis GET`, k) let v=null try { v = await this.redisClient.get(k) } - catch(err) { console.error('Redis crash doing Redis get: ', k) } + catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis get: `, k) } return(v); } async redisDel(k, customPrefix=null){ if(customPrefix!==null) k = customPrefix + k else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k - if(this.debug) console.log('Deleting ', k); + if(this.debug) console.log(`[${this.redisConfig.redisId}] Deleting`, k); await this.redisClient.del(k); } @@ -123,54 +123,54 @@ export class RedisConnexion { async redisGetTtl(k, customPrefix=null){ if(customPrefix!==null) k = customPrefix + k else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k - if(this.debug) console.log('Redis Get TTL ', k) + if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis Get TTL `, k) let v=null try { v = await this.redisClient.ttl(k) } - catch(err) { console.error('Redis crash doing Redis ttl: ', k) } + catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis ttl: `, k) } return(v); } async redisSetTtl(k, ttl, customPrefix=null){ if(customPrefix!==null) k = customPrefix + k else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k - if(this.debug) console.log('Redis Set TTL ', k); + if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis Set TTL`, k); try { await this.redisClient.expire(k, ttl) } - catch(err) { console.error('Redis crash doing Redis expire: ', k, ttl) } + catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis expire: `, k, ttl) } } async redisXadd(streamName, kvObj, max = ''){ if(!streamName.startsWith(this.redisConfig.basePrefix)) streamName = this.redisConfig.basePrefix + streamName - if(this.debug) console.log('Redis XADD ', streamName, kvObj); + if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis XADD `, streamName, kvObj); let arr = ['XADD', streamName] if(max != '') arr = [...arr, ...['MAXLEN', '~', (1*max).toString()]] arr.push('*') let payload = '""' try{ payload = JSON.stringify(kvObj) } - catch(e) { console.warn('cannot historize bad json: ',kvObj) } + catch(e) { console.warn(`[${this.redisConfig.redisId}] cannot historize bad json: `,kvObj) } arr.push('streamData') arr.push(payload) let sid = null try { sid = await this.redisClient.sendCommand(arr); } - catch(err) { console.error('Redis crash doing Redis command: ', arr, err) } + catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis command: `, arr, err) } return(sid); } async redisXrange(streamName, start = '-', end = '+', withPayload = true){ if(!streamName.startsWith(this.redisConfig.basePrefix)) streamName = this.redisConfig.basePrefix + streamName - if(this.debug) console.log('Redis XRANGE ', streamName); + if(this.debug) console.log(`[${this.redisConfig.redisId}]Redis XRANGE `, streamName); if(typeof(start)!='string') start = start.toString() if(typeof(end)!='string') end = end.toString() let arr = ['XRANGE', streamName, start, end]; let res = [] try { res = await this.redisClient.sendCommand(arr) } - catch(err) { console.error('Redis crash doing Redis command: ', arr, err.msg) } + catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis command: `, arr, err.msg) } if(withPayload){ let o = {}; for (let row of res) { // We'll take only the first content of the stream (the value of the key 'streamdata') let payload = '' try{ payload = JSON.parse(row[1][1]) } - catch(e) { console.warn('cannot unhistorize bad json: ',row[1][1]) } + catch(e) { console.warn(`[${this.redisConfig.redisId}] cannot unhistorize bad json: `,row[1][1]) } o[row[0]] = payload } return(o); @@ -192,31 +192,31 @@ export class RedisConnexion { } async getProcessInfo(){ - if(this.debug) console.log('Redis NIFO '); + if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis INFO`); try { const rawInfo = await this.redisClient.INFO() let arr = rawInfo.replace(/\r/g,'').split('\n') arr = arr.filter(item => (item.trim()!='') && (!item.trim().startsWith('#'))) let infoObject = arr.reduce((acc, val) => { kv = val.split(':',2); if(kv.length>0){ acc[kv[0]] = kv[1] } return(acc) }, {}) } - catch(err) { console.error('Redis crash doing Redis INFO: ') } + catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis INFO: `) } return(infoObject) } async redisReceive(msg, chan){ - if(this.debug) console.log('From Redis chan:', chan, msg); + if(this.debug) console.log(`[${this.redisConfig.redisId}] From Redis chan:`, chan, msg); try { msg = JSON.parse(msg); } catch { - console.warn(`Ignoring non-json on channel ${chan} : ${msg}`); + console.warn(`[${this.redisConfig.redisId}] Ignoring non-json on channel ${chan} : ${msg}`); return; } - if(this.debug) console.log('will now fanout...', chan, msg); + if(this.debug) console.log(`[${this.redisConfig.redisId}] will now fanout...`, chan, msg); const uuids = this.getSubscribedUuids(chan) if(uuids.length>0) { // Anyone interested at all about this chan ? - if(this.debug) console.log(`Will broadcast to ${uuids.length} web clients`); + if(this.debug) console.log(`[${this.redisConfig.redisId}] Will broadcast to ${uuids.length} web clients`); let shortChan = chan.startsWith(this.redisConfig.basePrefix) ? chan.substr(this.redisConfig.basePrefix.length) : chan let payload ={ 'event': 'REDISMSG', @@ -226,10 +226,10 @@ export class RedisConnexion { } } - console.log('=====>',uuids) + for(var uuid of uuids) { if(uuid in this.wssConnections) { - if(this.debug) console.log('Sending to ', uuid, payload); + if(this.debug) console.log(`[${this.redisConfig.redisId}] Sending to `, uuid, payload); this.wssConnections[uuid].send(JSON.stringify(payload)); } } diff --git a/wssConnexion.js b/wssConnexion.js index 3725496..f546e0d 100644 --- a/wssConnexion.js +++ b/wssConnexion.js @@ -99,7 +99,7 @@ export class WssConnexion { subscribeMandatoryChans(){ let mandaChans = this.accessRights.mustSubscribe(this.userId, this.roles) for(let rediscnx of this.allRediscnx){ - mandaChans = mandaChans.filter(chan => chan.startsWith(rediscnx.redisConfig.chansFilter)) + mandaChans = mandaChans.filter(chan => chan.startsWith(rediscnx.redisConfig.chansNamespace)) mandaChans = mandaChans.map(item=>rediscnx.redisConfig.basePrefix+item) for(var chan of mandaChans){ if(!(chan in rediscnx.subscriptions)) rediscnx.subscriptions[chan] = []; diff --git a/wssGatewayConfig.json b/wssGatewayConfig.json index ab6746c..772579a 100644 --- a/wssGatewayConfig.json +++ b/wssGatewayConfig.json @@ -41,7 +41,7 @@ "pass": "", "historizeChannels": [ ], "historizeMax": 1000, - "ChansFilter":"system:", + "chansNamespace":"system:", "basePrefix": "messageBus:", "storePrefix": "messageBus:Store:", "storeMaxSize": 51200, @@ -57,7 +57,7 @@ "pass": "", "historizeChannels": [ ], "historizeMax": 1000, - "chansFilter":"arena:", + "chansNamespace":"arena:", "basePrefix": "messageBus:", "storePrefix": "messageBus:Store:", "storeMaxSize": 51200,