From 350d37e465dd2e2b4c243f8b25c9b32f8448d1e5 Mon Sep 17 00:00:00 2001 From: STEINNI Date: Sun, 5 Oct 2025 21:24:10 +0000 Subject: [PATCH] converting to MP, just config in accessRights left --- actions/chat.js | 167 --------------------- actions/index.js | 7 +- actions/notifications.js | 13 -- actions/pubSub.js | 116 +++++++------- actions/sessions.js | 316 --------------------------------------- actions/store.js | 22 ++- actions/utilities.js | 51 +------ configSchema.json | 86 +++++------ p42wssGateway.js | 69 +++++++-- redisConnexion.js | 82 +++++----- wssConnexion.js | 46 ++---- wssGatewayConfig.json | 79 +++++----- wssServer.js | 25 +--- 13 files changed, 283 insertions(+), 796 deletions(-) delete mode 100644 actions/chat.js delete mode 100644 actions/notifications.js delete mode 100644 actions/sessions.js diff --git a/actions/chat.js b/actions/chat.js deleted file mode 100644 index f65b160..0000000 --- a/actions/chat.js +++ /dev/null @@ -1,167 +0,0 @@ -export const methods = { - - /* Creates (predictable) peer-to-peer chan if necessary, or check for lobby existence, then subscribe to it. - Request: - { - "action": "STARTCHAT", - "payload": "P" // or "C" => "P" = Peer-to-peer, "C" = (chat) Channel - } - reply: - { - "action": "STARTCHAT", - "success": true, - "payload" : "dynamically created chan" - } - */ - action_STARTCHAT(action, payload, reqid){ - if(typeof(payload)!='string'){ - this.sendErr(action, 'Invalid payload', reqid); - return; - }; - let recipientId = payload.substring(2); - let chan; - if(payload[0]=='P') { - chan = (this.userId "P" = Peer-to-peer, "C" = (chat) Channel - } - reply: - { - "action": "SENDCHAT", - "success": true, - "payload" : null - } - */ - action_SENDCHAT(action, payload, reqid){ - //TODO: prevent unauthorized recipient !! - let recipientId = payload.recipient.substring(2); - let chan; - payload.event = 'CHATMSG' - if(payload.recipient[0]=='P') { - chan = (this.userId "P" = Peer-to-peer, "C" = (chat) Channel - } - reply: - { - "action": "ISONLINE", - "success": true, - "payload" : - } - */ - // You can only ask the satus of a list of usernames you know (and have the right to) - action_ISONLINE(action, payload, reqid){ - //TODO: can you really ask about those users ? (but that might cost too much time, because => ML?) - if(!Array.isArray(payload)){ - this.sendErr(action, 'Invalid payload', reqid); - return; - }; - let onlineUsers = Object.keys(this.wssSrv.getOnlineUsers()); - let reply = { - 'action': action, - 'payload': payload.filter((x) => (onlineUsers.indexOf(x)>-1)), - 'success': true, - }; - if(reqid) reply.reqid = reqid; - this.send(JSON.stringify(reply)); - }, - - // Same as ISONLINE, but subscribe to watch changes - action_WATCHUSERS(action, payload, reqid){ - if(!Array.isArray(payload)){ - this.sendErr(action, 'Invalid payload', reqid); - return; - } - //TODO: can you really ask about those users ? (but that might cost too much time, because => ML?) - this.usersWatched = payload; - let reply = { - 'action': action, - 'payload': null, - 'success': true, - }; - if(reqid) reply.reqid = reqid; - this.send(JSON.stringify(reply)); - }, - - /* - Request: - { - "action": "CHANLST", - "payload": { "filter": "ChatRoom*" } - } - reply: - { - "action": "CHANLST", - "success": true, - "payload" : ["ChatRoom1","ChatRoom2","ChatRoom_Experts"] - } - */ - action_CHANLST(action, payload, reqid){ - //TODO : Filter based on user rights!! - // - let reply = { - 'action': action, - 'payload': [ - this.config.redis.basePrefix+"onlineUsers", - this.config.redis.basePrefix+"system:chan1", - this.config.redis.basePrefix+"proposals:updates" - ],//this.config.redis.watchChannels, - 'success': true, - }; - if(reqid) reply.reqid = reqid; - this.send(JSON.stringify(reply)); - }, - -} \ No newline at end of file diff --git a/actions/index.js b/actions/index.js index 2e1956d..faac79c 100644 --- a/actions/index.js +++ b/actions/index.js @@ -1,17 +1,12 @@ import { methods as utilities } from './utilities.js' import { methods as pubSub } from './pubSub.js' import { methods as store } from './store.js' -import { methods as sessions } from './sessions.js' -import { methods as notifications } from './notifications.js' -import { methods as chat } from './chat.js' + export const gatewayActions = { ...utilities, ...pubSub, ...store, - ...sessions, - ...notifications, - ...chat } diff --git a/actions/notifications.js b/actions/notifications.js deleted file mode 100644 index 20cebe2..0000000 --- a/actions/notifications.js +++ /dev/null @@ -1,13 +0,0 @@ -export const methods = { - - async action_NOTIFS(action, payload, reqid){ - let reply = { - 'action': action, - 'payload': await this.getAwaitingNotifs(), - 'success': true, - }; - if(reqid) reply.reqid = reqid; - this.send(JSON.stringify(reply)); - }, - -} \ No newline at end of file diff --git a/actions/pubSub.js b/actions/pubSub.js index ee4bc4c..651d7d1 100644 --- a/actions/pubSub.js +++ b/actions/pubSub.js @@ -15,34 +15,35 @@ export const methods = { if(!Array.isArray(payload)){ this.sendErr(action, 'Invalid payload', reqid); return; - }; - + } + // payload only accepts NON-Bas-Prefixed chans + let subscribed = [] for(var chan of payload){ if((!chan) || (typeof(chan)!='string')) continue if(!this.accessRights.canSubscribe(this.userId, this.roles, chan)) { if(this.debug) console.log('SUB: No rights to this chan!', this.userId, this.roles, chan) continue } - // Chat chans are forbidden here - if((chan.substr(0,8) == 'userchans') || (chan.substr(0,9) == 'lobbychans')) continue; - if(!chan.startsWith(this.config.redis.basePrefix)) chan = this.config.redis.basePrefix + chan - if(this.subscriptions.indexOf(chan)<0) { - this.subscriptions.push(chan); + let coudSubscribe = false + for(const rediscnx of this.allRediscnx){ + if(!chan.startsWith(rediscnx.redisConfig.ChansFilter)) continue + else coudSubscribe = true + let localChan = rediscnx.redisConfig.basePrefix + localChan + if(!(localChan in rediscnx.subscriptions)) rediscnx.subscriptions[localChan] = []; + if(!rediscnx.subscriptions[localChan].includes(this.uuid)) { + rediscnx.subscriptions[localChan].push(this.uuid); + } } - if(!(chan in this.rediscnx.subscriptions)) this.rediscnx.subscriptions[chan] = []; - if(this.rediscnx.subscriptions[chan].indexOf(this.uuid)<0) { - this.rediscnx.subscriptions[chan].push(this.uuid); - } - } - let shortChans = this.subscriptions.map(item => ( - item.startsWith(this.config.redis.basePrefix) ? item.substr(this.config.redis.basePrefix.length) : item - )) + if(coudSubscribe && (!subscribed.includes(chan))) subscribed.push(chan) + if(coudSubscribe && (!this.subscriptions.includes(chan)) ){ this.subscriptions.push(chan) } + + } let reply = { 'action': action, - 'payload': shortChans, + 'payload': subscribed, 'success': true, }; if(reqid) reply.reqid = reqid; @@ -67,29 +68,31 @@ export const methods = { this.sendErr(action, 'Invalid payload', reqid); return; }; - + // payload only accepts NON-Bas-Prefixed chans + let unSubscribed = [] for(var chan of payload){ if((!chan) || (typeof(chan)!='string')) continue if(this.accessRights.isMandatory(this.userId, this.roles, chan)) continue - // Chat chans are forbidden here - if((chan.substr(0,8) == 'userchans') || (chan.substr(0,9) == 'lobbychans')) continue; - - if(!chan.startsWith(this.config.redis.basePrefix)) chan = this.config.redis.basePrefix + chan - if(this.subscriptions.indexOf(chan)>-1) { - this.subscriptions.splice(this.subscriptions.indexOf(chan), 1); + let couldUnsubscribe = false + for(const rediscnx of this.allRediscnx){ + if(!chan.startsWith(rediscnx.redisConfig.ChansFilter)) continue + else couldUnsubscribe = true + let localChan = rediscnx.redisConfig.basePrefix + chan + if((localChan in rediscnx.subscriptions) && (rediscnx.subscriptions[chan].includes(this.uuid))) { + rediscnx.subscriptions[localChan].splice(rediscnx.subscriptions[localChan].indexOf(this.uuid), 1) ; + } } - if((chan in this.rediscnx.subscriptions) && (this.rediscnx.subscriptions[chan].indexOf(this.uuid)>-1)) { - this.rediscnx.subscriptions[chan].splice(this.rediscnx.subscriptions[chan].indexOf(this.uuid), 1) ; + + if(couldUnsubscribe && (!unSubscribed.includes(chan))) unSubscribed.push(chan) + if(couldUnsubscribe && this.subscriptions.includes(chan)) { + this.subscriptions.splice(this.subscriptions.indexOf(chan), 1); } } - let shortChans = this.subscriptions.map(item => ( - item.startsWith(this.config.redis.basePrefix) ? item.substr(this.config.redis.basePrefix.length) : item - )) let reply = { 'action': action, - 'payload': shortChans, + 'payload': unSubscribed, 'success': true, }; if(reqid) reply.reqid = reqid; @@ -109,12 +112,9 @@ export const methods = { } */ action_SUBLST(action, payload, reqid){ - let shortChans = this.subscriptions.map(item => ( - item.startsWith(this.config.redis.basePrefix) ? item.substr(this.config.redis.basePrefix.length) : item - )) let reply = { 'action': action, - 'payload': shortChans, + 'payload': this.subscriptions, 'success': true, }; if(reqid) reply.reqid = reqid; @@ -146,8 +146,7 @@ export const methods = { return; }; - if( (!this.accessRights.canPublish(this.userId, this.roles, payload.chan)) && - (! this.rediscnx.redPillsUuids.includes(this.uuid)) ) { + if( (!this.accessRights.canPublish(this.userId, this.roles, payload.chan)) ) { this.sendErr(action, 'Unauthorized chan !', reqid); if(this.debug) console.log('PUB: Unauthorized chan', payload.chan, this.userId, this.roles) return @@ -155,22 +154,33 @@ export const methods = { let msgO try { msgO = JSON.parse(payload.msg) } catch(err) { msgO = {'err':err} } + msgO.sender = this.userId - let histId = null - if(this.rediscnx.isHistorizedChan(payload.chan)) { // historize first to add the histId - let shortChan = payload.chan.startsWith(this.config.redis.basePrefix) ? payload.chan.substr(this.config.redis.basePrefix.length) : payload.chan - histId = await this.rediscnx.redisXadd(this.config.redis.basePrefix+this.config.redis.historizePrefix + shortChan, payload.msg, this.config.redis.historizeMax); + const chan = payload.chan + + // First find the primary redis for this chan namespace, to do historization first, and get the histId + const primaryRediscnx = this.allRediscnx.find(cnx => ((chan.startsWith(cnx.redisConfig.ChansFilter)) &&(cnx.redisConfig.role=='primary')) ) + if(!primaryRediscnx){ + this.sendErr(action, 'No primary redis for this chan !', reqid); + if(this.debug) console.log('PUB: No primary redis for this chan ', chan) + return + } + if(primaryRediscnx.isHistorizedChan(chan)){ + histId = await rediscnx.redisXadd(rediscnx.redisConfig.basePrefix+rediscnx.redisConfig.historizePrefix + chan, payload.msg, rediscnx.redisConfig.historizeMax); if( !histId) { this.sendErr(action, 'Could not historize, aborted event publish !', reqid); - console.error(`Could not historize for "${shortChan}", aborted event publish !`) + console.error(`Could not historize for "${chan}", aborted event publish !`) return } msgO.histId = histId } - msgO.sender = this.userId - try { payload.msg = JSON.stringify(msgO) } catch(err) {payload.msg = `{"err":"${err}}"` } - this.rediscnx.redisPublish(payload.chan, payload.msg) + // Now publish on every Redis that covers this chan namespace + try { payload.msg = JSON.stringify(msgO) } catch(err) {payload.msg = `{"err":"${err}}"` } + for(const rediscnx of this.allRediscnx){ + if(!chan.startsWith(rediscnx.redisConfig.ChansFilter)) continue + rediscnx.redisPublish(chan, payload.msg) + } let reply = { 'action': action, @@ -216,22 +226,28 @@ export const methods = { return } - if( (!this.accessRights.canSubscribe(this.userId, this.roles, payload.channel)) && - (! this.rediscnx.redPillsUuids.includes(this.uuid)) ) { - this.sendErr(action, 'Unauthorized channel !', reqid) + if( (!this.accessRights.canSubscribe(this.userId, this.roles, payload.channel)) ) { + this.sendErr(action, 'CHANHIST: Unauthorized channel !', reqid) return } - if(!this.rediscnx.isHistorizedChan(payload.channel)){ - this.sendErr(action, 'Not an historized channel !', reqid) + const primaryRediscnx = this.allRediscnx.find(cnx => ((chan.startsWith(cnx.redisConfig.ChansFilter)) &&(cnx.redisConfig.role=='primary')) ) + if(!primaryRediscnx){ + this.sendErr(action, 'No primary redis for this chan !', reqid); + if(this.debug) console.log('CHANHIST: No primary redis for this chan ', chan) + return + } + + if(!primaryRediscnx.isHistorizedChan(payload.channel)){ + this.sendErr(action, 'CHANHIST: Not an historized channel !', reqid) return } let from = (payload.from.indexOf('-')>-1) ? payload.from : 1000*payload.from let to = '+' if(payload.to) to = (payload.to.indexOf('-')>-1) ? payload.to : 1000*payload.to - let streamName = payload.channel.startsWith(this.config.redis.basePrefix) ? this.config.redis.historizePrefix+payload.channel.substr(this.config.redis.basePrefix.length) : this.config.redis.historizePrefix + payload.channel - let respPayload = await this.rediscnx.redisXrange(streamName, from, to); + let streamName = payload.channel.startsWith(primaryRediscnx.redisConfig.basePrefix) ? primaryRediscnx.redisConfig.historizePrefix+payload.channel.substr(primaryRediscnx.redisConfig.basePrefix.length) : primaryRediscnx.redisConfig.historizePrefix + payload.channel + let respPayload = await primaryRediscnx.redisXrange(streamName, from, to); let reply = { 'action': action, diff --git a/actions/sessions.js b/actions/sessions.js deleted file mode 100644 index 3c834b6..0000000 --- a/actions/sessions.js +++ /dev/null @@ -1,316 +0,0 @@ -export const methods = { - - /* Request payload : null - Reply: - { - "action": "GETACTIVEUSERS", - "payload": [ - { - "uid": "steinic", - "email": "Nicolas.STEIN@ext.ec.europa.eu", - "given_name": "Nicolas", - "family_name": "STEIN", - "userRoles": [ - "BP_PO", - "SP_Admin", - "Org_Member", - "Org_Pending", - "EIC_Dev" - ], - "sessionExpire": 3594, - "busConnected": true - } - ], - "success": true, - "reqid": "df58a401-4ed2-4908-a2b1-8bae155e413a" - } - */ - async action_GETACTIVEUSERS(action, payload, reqid){ - if(!this.accessRights.canDo(this.roles, 'getActiveUsers')) { - this.sendErr(action, 'Unauthorized action !', reqid); - return - } - -//TODO: take from new config key instead of hardcded - const iterOptions = { - TYPE: 'string', - MATCH: 'authorizer:sessid_*' - } - - let activeUsers = [] - for await (const key of this.rediscnx.redisClient.scanIterator(iterOptions)) { - let sess = null - try{ sess = JSON.parse(await this.rediscnx.redisGet(key, '')) } - catch(err) { console.log('bad sess info')} - if((!sess) || (!sess.isAuthenticated) || (!sess.sessionID) - || (!sess.userInfo) || (!sess.userInfo.userRoles) || (!sess.userInfo.euLoginId)){ - continue - } - - let ttl = await this.rediscnx.redisGetTtl(key, '') - activeUsers.push({ - uid: sess.userInfo.euLoginId, - email: sess.userInfo.email, - given_name: sess.userInfo.given_name, - family_name: sess.userInfo.family_name, - userRoles: sess.userInfo.userRoles, - sessionExpire: ttl, - busConnected: this.wssSrv.sessionConnected(sess.sessionID), - }) - } - var reply = { - 'action': action, - 'payload': activeUsers, - 'success': true, - }; - if(reqid) reply.reqid = reqid; - this.send(JSON.stringify(reply)); - }, - /* - * payload: { - uids: [ 'fallimi' ], - notRoles : ['EIC_ADMIN', 'EIC_Dev' ], - ttl: 0 - } - => Both conditions must be met (here nothing gets done as fallimi is EIC_Dev) - - Any uid, but not some roles : - { - uids: null, - notRoles : ['EIC_ADMIN', 'EIC_Dev' ], - ttl: 0 - } - - Some uids, don't care their roles in 30 seconds : - { - uids: [ 'infosca', 'nz01234' ], - notRoles : [], - ttl: 30 - } - - */ - - - /* Request payload : { "uid":"steinni" } - Reply: - { - "action": "GETUSERSTATUS", - "payload": - { - "uid": "steinic", - "email": "Nicolas.STEIN@ext.ec.europa.eu", - "given_name": "Nicolas", - "family_name": "STEIN", - "sessionExpire": 3594, - "busConnected": true - }, - "success": true, - "reqid": "df58a401-4ed2-4908-a2b1-8bae155e413a" - } - */ - async action_GETUSERSTATUS(action, payload, reqid){ - if(!this.accessRights.canDo(this.roles, 'getUserStatus')) { - this.sendErr(action, 'Unauthorized action !', reqid); - return - } - - const iterOptions = { - TYPE: 'string', - MATCH: 'authorizer:sessid_*' - } - - let user = { - uid: payload.uid, - email: null, - given_name: null, - family_name: null, - sessionExpire: null, - busConnected: null, - } - - for await (const key of this.rediscnx.redisClient.scanIterator(iterOptions)) { - let sess = null - try{ sess = JSON.parse(await this.rediscnx.redisGet(key, '')) } - catch(err) { console.log('bad sess info')} - - if((!sess) || (!sess.isAuthenticated) || (!sess.sessionID) - || (!sess.userInfo) || (!sess.userInfo.userRoles) || (!sess.userInfo.euLoginId) - || (sess.userInfo.euLoginId != payload.uid) - ) { - continue - } else { - let ttl = await this.rediscnx.redisGetTtl(key, '') - user={ - uid: sess.userInfo.euLoginId, - email: sess.userInfo.email, - given_name: sess.userInfo.given_name, - family_name: sess.userInfo.family_name, - sessionExpire: ttl, - busConnected: this.wssSrv.sessionConnected(sess.sessionID), - } - break - } - } - - var reply = { - 'action': action, - 'payload': user, - 'success': true, - }; - if(reqid) reply.reqid = reqid; - this.send(JSON.stringify(reply)); - }, - - async action_KILLSESSION(action, payload, reqid){ - if(!this.accessRights.canDo(this.roles, 'killSessions')) { - this.sendErr(action, 'Unauthorized action !', reqid); - return - } - if( (!payload.notRoles) || (!Array.isArray(payload.notRoles)) || (payload.uids && (!Array.isArray(payload.uids))) ){ - this.sendErr(action, 'Bad payload !', reqid); - return - } - - - -//TODO: take from new config key instead of hardcded - const iterOptions = { - TYPE: 'string', - MATCH: 'authorizer:sessid_*' - } - - for await (const key of this.rediscnx.redisClient.scanIterator(iterOptions)) { - if(key.endsWith('_cookie')) continue - let sess = null - try{ sess = JSON.parse(await this.rediscnx.redisGet(key, '')) } - catch(err) { console.log('bad sess info')} - if((!sess) || (!sess.isAuthenticated)) continue - - if(payload.uids && (payload.uids.indexOf(sess.userInfo['euLoginId'])<0)) continue - let intersect = payload.notRoles.filter(value => sess.userInfo.userRoles.includes(value)); - if(intersect.length>0) continue - - if((!payload.ttl) || (typeof(payload.ttl)!= number) || (payload.ttl<0) || (payload.ttl>3600)) payload.ttl=0 - let ttl = await this.rediscnx.redisSetTtl(key, payload.ttl, '') - - } - var reply = { - 'action': action, - 'success': true, - }; - if(reqid) reply.reqid = reqid; - this.send(JSON.stringify(reply)); - }, - - /* Request: (curtain down, except for devs & admins) - { - "action": "SETSPARCSTATE", - "payload" : { - blockedUids: [], - allowedRoles : ['EIC_Admin', 'EIC_Dev'], - }, - } - - Request: (curtain up, for everyone) - { - "action": "SETSPARCSTATE", - "payload" : { - blockedUids: [], - allowedRoles : '*', - }, - } - - Request: (curtain up, block some bad-guys) - { - "action": "SETSPARCSTATE", - "payload" : { - blockedUids: ['hacker1', 'hacker2'], - allowedRoles : '*', - }, - } - - Reply: - { - "success": true, - "reqid": "6az5e4r6a", - "payload": { the accessrights } - } - */ - async action_SETPLATFORMMODE(action, payload, reqid){ - if(!this.accessRights.canDo(this.roles, 'setPlatformState')) { - this.sendErr(action, 'Unauthorized action !', reqid); - return - } - if((typeof(payload)!='object') || (!Array.isArray(payload.blockedUUIDs)) || - ( (typeof(payload.platformRestrictions)=='object') && (!Array.isArray(payload.platformRestrictions.allowedRoles)) ) - ){ - this.sendErr(action, 'Invalid payload', reqid) - return - } - - if(typeof(payload.platformRestrictions)=='object'){ // curtain down - if(!payload.platformRestrictions.allowedRoles.includes('EIC_Dev')){ // anti-shoot-your-foot - payload.platformRestrictions.allowedRoles.push('EIC_Dev') - } - } else { // curtain up - //force-in an example - payload.XX_platformRestrictions = { "allowedRoles":["EIC_Admin","EIC_Dev"],"allowedUUIDs":["valentin"] } - } - - - - await this.rediscnx.redisSet(this.config.redis.platformStateKey, - payload, - 0, - '' - ) - - var reply = { - 'action': action, - 'success': true - }; - if(reqid) reply.reqid = reqid; - this.send(JSON.stringify(reply)); - }, - - - /* Request: - { - "action": "GETSPARCMODE" - "payload": { - "key": "keyname" - } - "reqid": "6az5e4r6a" - } - Reply: - { - "action":"STORE", - "success":true, - "payload": { - ...the sparc mode - } - "reqid": reqid - } - */ - async action_GETPLATFORMMODE(action, payload, reqid){ - if(!this.accessRights.canDo(this.roles, 'getPlatformState')) { - this.sendErr(action, 'Unauthorized action !', reqid); - return - } - - let rawVal = await this.rediscnx.redisGet(this.config.redis.platformStateKey, '') - let val = null - try { val = JSON.parse(rawVal)} - catch(err) { console.error('Action GETSPARCMODE: Not a json !? ', rawVal) } - - var reply = { - 'action': action, - 'payload': val, - 'success': true, - }; - if(reqid) reply.reqid = reqid; - this.send(JSON.stringify(reply)); - } - - -} \ No newline at end of file diff --git a/actions/store.js b/actions/store.js index f606565..82bb7e4 100644 --- a/actions/store.js +++ b/actions/store.js @@ -35,6 +35,13 @@ export const methods = { return } + const primaryRediscnx = this.allRediscnx.find(cnx => ((payload.key.startsWith(cnx.redisConfig.ChansFilter)) &&(cnx.redisConfig.role=='primary')) ) + if(!primaryRediscnx){ + this.sendErr(action, 'No primary redis for this key prefix !', reqid); + if(this.debug) console.log('ACTION_SET: No primary redis for this key ', payload.key) + return + } + if(payload.value) { let val = null try { val = JSON.stringify(payload.value)} @@ -42,14 +49,14 @@ export const methods = { this.sendErr(action, 'Cannot stringify value object !', reqid); return } - if(val.length > this.config.redis.storeMaxSize){ + if(val.length > primaryRediscnx.redisConfig.storeMaxSize){ this.sendErr(action, 'value too large !', reqid); return } let exp = ((payload.expire>0) && (payload.expire<63072000)) ? payload.expire : 63072000 - await this.rediscnx.redisSet(payload.key, val, exp, this.config.redis.storePrefix) + await primaryRediscnx.redisSet(payload.key, val, exp, primaryRediscnx.redisConfig.storePrefix) } else { - await this.rediscnx.redisDel(payload.key, this.config.redis.storePrefix) + await primaryRediscnx.redisDel(payload.key, primaryRediscnx.redisConfig.storePrefix) } var reply = { 'action': action, @@ -91,7 +98,14 @@ export const methods = { return } - let rawVal = await this.rediscnx.redisGet(payload.key, this.config.redis.storePrefix) + const primaryRediscnx = this.allRediscnx.find(cnx => ((payload.key.startsWith(cnx.redisConfig.ChansFilter)) &&(cnx.redisConfig.role=='primary')) ) + if(!primaryRediscnx){ + this.sendErr(action, 'No primary redis for this key prefix !', reqid); + if(this.debug) console.log('ACTION_GET: No primary redis for this key ', payload.key) + return + } + + let rawVal = await primaryRediscnx.redisGet(payload.key, primaryRediscnx.redisConfig.storePrefix) let val = null try { val = JSON.parse(rawVal)} catch(err) { console.error('Action GET: Not a json !? ', rawVal) } diff --git a/actions/utilities.js b/actions/utilities.js index 9a40773..07d8ae3 100644 --- a/actions/utilities.js +++ b/actions/utilities.js @@ -37,7 +37,7 @@ export const methods = { 'action': action, 'payload': { wssGatewayTime: tmstp, - redisTime: this.rediscnx.redisClient.time() + redisTime: this.allRediscnx.map(cnx => cnx.redisClient.time()) }, 'success': true, }; @@ -145,59 +145,12 @@ export const methods = { var reply = { 'action': action, 'success': true, - 'payload': this.rediscnx.getProcessInfo + 'payload': this.allRediscnx.map(cnx => cnx.getProcessInfo) }; if(reqid) reply.reqid = reqid; this.send(JSON.stringify(reply)); }, - action_REDPILL(action, payload, reqid){ - if(!this.accessRights.canDo(this.roles, 'REDPILL', this.userId)) { - this.sendErr(action, 'Unauthorized action', reqid); - return; - }; - if(!this.rediscnx.redPillsUuids.includes(this.uuid)) { - this.rediscnx.redPillsUuids.push(this.uuid) - setTimeout(() => { - if(this.rediscnx.redPillsUuids.includes(this.uuid)){ // could have been removed meanwhile & splice(-1) would remove the last !!! - this.rediscnx.redPillsUuids.splice(this.rediscnx.redPillsUuids.indexOf(this.uuid),1) - } - let reply = { - 'action': 'BLUEPILL', - 'payload': {}, - 'success': true, - }; - this.send(JSON.stringify(reply)); - }, 600000) // Back to blupill after 10min - } - - let reply = { - 'action': action, - 'payload': {}, - 'success': true, - }; - if(reqid) reply.reqid = reqid; - this.send(JSON.stringify(reply)); - }, - - action_BLUEPILL(action, payload, reqid){ - if(!this.accessRights.canDo(this.roles, 'BLUEPILL', this.userId)) { - this.sendErr(action, 'Unauthorized action', reqid); - return; - }; - - if(this.rediscnx.redPillsUuids.includes(this.uuid)) { - this.rediscnx.redPillsUuids.splice(this.rediscnx.redPillsUuids.indexOf(this.uuid),1) - } - - let reply = { - 'action': action, - 'payload': {}, - 'success': true, - }; - if(reqid) reply.reqid = reqid; - this.send(JSON.stringify(reply)); - }, } \ No newline at end of file diff --git a/configSchema.json b/configSchema.json index ca9f169..c272461 100644 --- a/configSchema.json +++ b/configSchema.json @@ -66,38 +66,46 @@ ] } }, - "redis": { - "type": "object", - "properties": { - "host": { "type": "string" }, - "tls": { "type": "boolean" }, - "port": { "type": "integer" }, - "user": { "type": "string" }, - "pass": { "type": "string" }, - "basePrefix": { "type": "string" }, - "challengePrefix": { "type": "string" }, - "historizeMax": { "type": "integer" }, - "historizePrefix": { "type": "string" }, - "platformStateKey": { "type": "string" }, - "storeMaxSize": { "type": "integer" }, - "storePrefix": { "type": "string" }, - "historizeChannels": { - "type": "array", - "items": { - "type": "string" - } + "redis":{ + "type": "array", + "items": { + "type": "object", + "properties": { + "redisId": {"type": "string" }, + "role": {"type": "string", "enum": ["primary", "shard"] }, + "host": { "type": "string" }, + "tls": { "type": "boolean" }, + "port": { "type": "integer" }, + "user": { "type": "string" }, + "pass": { "type": "string" }, + "chansFilter": { "type": "string" }, + "basePrefix": { "type": "string" }, + "historizeMax": { "type": "integer" }, + "historizePrefix": { "type": "string" }, + "storeMaxSize": { "type": "integer" }, + "storePrefix": { "type": "string" }, + "historizeChannels": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "required": [ + "redisId", + "chansFilter", + "role", + "basePrefix", + "storeMaxSize", + "storePrefix" + ], + "if": { + "properties": { "role": { "const": "primary" } } + }, + "then": { + "required": ["historizeChannels", "historizeMax", "historizePrefix"] } - }, - "required": [ - "basePrefix", - "challengePrefix", - "historizeChannels", - "historizeMax", - "historizePrefix", - "platformStateKey", - "storeMaxSize", - "storePrefix" - ] + } }, "server": { "type": "object", @@ -109,30 +117,16 @@ "listenPath": { "type": "string" }, "unsecure": { "type": "boolean" }, "challengeExpiration": { "type": "integer" }, - "healthCheckPath": { "type": "string" }, "keepAliveInterval": { "type": "string" }, - "keepAliveTimeout": { "type": "string" }, - "systemChannels": { - "type": "object", - "properties": { - "onlineUsers": { - "type": "string" - } - }, - "required": [ - "onlineUsers" - ] - } + "keepAliveTimeout": { "type": "string" } }, "required": [ "challengeExpiration", - "healthCheckPath", "keepAliveInterval", "keepAliveTimeout", "listenHost", "listenPath", "listenPort", - "systemChannels", "unsecure" ], "if": { diff --git a/p42wssGateway.js b/p42wssGateway.js index bc196ff..d83be22 100644 --- a/p42wssGateway.js +++ b/p42wssGateway.js @@ -75,19 +75,59 @@ const cfgh = new configHelper({ localfile: './wssGatewayConfig.json', }) -async function startRedis(wssGatewayConfig) { - let REDIScnx = new RedisConnexion({ - debug: debug, - config: wssGatewayConfig, - }); - if(debug) console.log('Starting REDIS...'); - await REDIScnx.redisLogin(); - if(debug) console.log('REDIS Login OK'); - await REDIScnx.redisChansStart(); - if(debug) console.log('REDIS ChansStart OK'); - return (REDIScnx); -} +async function startAllRedis(wssGatewayConfig) { + if (debug) console.log('Starting all Redis instances...') + + //1. instantiate all & login all + const redisConns = wssGatewayConfig.redis.map(cfg => + new RedisConnexion({ debug, config: cfg, redisId:cfg.redisId }) + ) + const loginResults = await Promise.allSettled( + redisConns.map(async cnx => { + cnx.redisLogin() + return cnx.redisId + }) + ) + 2. //make sure all connected before going any further + const failedLogin = loginResults.filter(r => r.status !== 'fulfilled') + if (failedLogin.length > 0) { + console.error('Redis login failures:') + failedLogin.forEach((r, i) => { + const id = redisConns[i].redisId + console.error(`chansStart failed for redis:[${id}] → ${r.reason}`) + }) + throw new Error( + `Redis login failed for ${failedLogin.length}/${redisConns.length} instances` + ) + } + + if (debug) console.log('All Redis logins OK') + + // --- Phase 2: start channels for all (since all succeeded) + const chanResults = await Promise.allSettled( + redisConns.map(async cnx => { + cnx.redisChansStart() + return cnx.redisId + }) + ) + + const failedChans = chanResults.filter(r => r.status !== 'fulfilled') + if (failedChans.length > 0) { + console.error('Redis chansStart failures:') + failedChans.forEach((r, i) => { + const id = redisConns[i].redisId + console.error(`chansStart failed for redis:[${id}] → ${r.reason}`) + }) + throw new Error( + `Redis chansStart failed for ${failedChans.length}/${redisConns.length} instances` + ) + } + + if (debug) console.log('All Redis chansStart OK') + + return redisConns +} cfgh.fetchConfig().then( async wssGatewayConfig => { @@ -150,9 +190,8 @@ cfgh.fetchConfig().then( async wssGatewayConfig => { console.log(`WS${wssGatewayConfig.server.unsecure ? '': 'S'} server created for ${wssGatewayConfig.server.listenHost}:${wssGatewayConfig.server.listenPort}`) }) - startRedis(wssGatewayConfig).then((rediscnx) => { - if(debug) console.log('Redis started & logged in !'); - const wssSrv = new wssServer(cfgh, WSSServer, rediscnx, debug); + startRedis(wssGatewayConfig).then((allRediscnx) => { + const wssSrv = new wssServer(cfgh, WSSServer, allRediscnx, debug); }); diff --git a/redisConnexion.js b/redisConnexion.js index 451e4b8..3289ed8 100644 --- a/redisConnexion.js +++ b/redisConnexion.js @@ -4,16 +4,17 @@ export class RedisConnexion { constructor(options) { this.config = options.config; this.debug = options.debug; + this.redisId = options.redisId; + this.redisConfig = this.config.redis[this.redisId] this.subscriptions = {}; // Externally fed this.wssConnections = {}; // Externally fed - this.redPillsUuids = []; // Externally fed this.redisClient = redis.createClient({ socket: { - tls: this.config.redis.tls, - host: this.config.redis.host, - port: this.config.redis.port + tls: this.redisConfig.tls, + host: this.redisConfig.host, + port: this.redisConfig.port } }); @@ -42,11 +43,11 @@ export class RedisConnexion { } async redisLogin(){ - if(this.debug) console.log(`Connecting to Redis (${this.config.redis.host}:${this.config.redis.port}, tls:${this.config.redis.tls?'yes':'no'})...`); + if(this.debug) console.log(`Connecting to Redis (${this.redisConfig.host}:${this.redisConfig.port}, tls:${this.redisConfig.tls?'yes':'no'})...`); await this.redisClient.connect(); if(this.debug) console.log('Connected to Redis !'); - if(this.config.redis.user) { - await this.redisClient.sendCommand(['AUTH', this.config.redis.user, this.config.redis.pass]); + if(this.redisConfig.user) { + await this.redisClient.sendCommand(['AUTH', this.redisConfig.user, this.redisConfig.pass]); if(this.debug) console.log('Logged into Redis !'); } else { if(this.debug) console.log('Connected (anon) to Redis...'); @@ -60,32 +61,38 @@ export class RedisConnexion { async redisChansStart(){ this.redisSubscriber = this.redisClient.duplicate(); await this.redisSubscriber.connect(); - if(this.config.redis.user) { - await this.redisSubscriber.sendCommand(['AUTH', this.config.redis.user, this.config.redis.pass]); + if(this.redisConfig.user) { + await this.redisSubscriber.sendCommand(['AUTH', this.redisConfig.user, this.redisConfig.pass]); } - this.redisSubscriber.pSubscribe(this.config.redis.basePrefix + '*', this.redisReceive.bind(this)); - if(this.debug) console.log('PSubscription OK ', this.config.redis.basePrefix + '*'); + const allChans = this.redisConfig.basePrefix + this.redisConfig.ChansFilter+'*' + this.redisSubscriber.pSubscribe(allChans, this.redisReceive.bind(this)); + if(this.debug) console.log('PSubscription OK ', allChans); } async redisSubscribe(chanName, callBack){ - if(!chanName.startsWith(this.config.redis.basePrefix)) chanName = this.config.redis.basePrefix + chanName + if(!chanName.startsWith(this.redisConfig.basePrefix)) chanName = this.redisConfig.basePrefix + chanName + if(!chanName.startsWith(this.redisConfig.basePrefix+this.ChansFilter)) { + console.warn(`redisSubscribe : forbidden channel range on this redis !`) + return + } await this.redisSubscriber.subscribe(chanName, callBack); } async redisPublish(chanName, msg){ if(typeof (msg) != 'string') msg = JSON.stringify(msg); - if(!chanName.startsWith(this.config.redis.basePrefix)) chanName = this.config.redis.basePrefix + chanName - await this.redisClient.publish(chanName, msg); - } + if(!chanName.startsWith(this.redisConfig.basePrefix)) chanName = this.redisConfig.basePrefix + chanName + if(!chanName.startsWith(this.redisConfig.basePrefix+this.ChansFilter)) { + console.warn(`redisPublish : forbidden channel range on this redis !`) + return + } - async redisRefreshSession(k){ - await this.redisClient.expire(k, this.config.server.sessionExpiration); + await this.redisClient.publish(chanName, msg); } async redisSet(k, v, exp = 0, customPrefix=null){ if(typeof(v) != 'string') v = JSON.stringify(v); if(customPrefix!==null) k = customPrefix + k - else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k + else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k if(this.debug) console.log('Redis SET ', k); try { await this.redisClient.set(k, v) } catch(err) { console.error('Redis crash doing Redis set: ', k, v) } @@ -97,7 +104,7 @@ export class RedisConnexion { async redisGet(k, customPrefix=null){ if(customPrefix!==null) k = customPrefix + k - else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k + else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k if(this.debug) console.log('Redis GET ', k) let v=null try { v = await this.redisClient.get(k) } @@ -107,7 +114,7 @@ export class RedisConnexion { async redisDel(k, customPrefix=null){ if(customPrefix!==null) k = customPrefix + k - else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k + else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k if(this.debug) console.log('Deleting ', k); await this.redisClient.del(k); } @@ -115,7 +122,7 @@ export class RedisConnexion { async redisGetTtl(k, customPrefix=null){ if(customPrefix!==null) k = customPrefix + k - else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k + else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k if(this.debug) console.log('Redis Get TTL ', k) let v=null try { v = await this.redisClient.ttl(k) } @@ -125,14 +132,14 @@ export class RedisConnexion { async redisSetTtl(k, ttl, customPrefix=null){ if(customPrefix!==null) k = customPrefix + k - else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k + else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k if(this.debug) console.log('Redis Set TTL ', k); try { await this.redisClient.expire(k, ttl) } catch(err) { console.error('Redis crash doing Redis expire: ', k, ttl) } } async redisXadd(streamName, kvObj, max = ''){ - if(!streamName.startsWith(this.config.redis.basePrefix)) streamName = this.config.redis.basePrefix + streamName + if(!streamName.startsWith(this.redisConfig.basePrefix)) streamName = this.redisConfig.basePrefix + streamName if(this.debug) console.log('Redis XADD ', streamName, kvObj); let arr = ['XADD', streamName] if(max != '') arr = [...arr, ...['MAXLEN', '~', (1*max).toString()]] @@ -149,7 +156,7 @@ export class RedisConnexion { } async redisXrange(streamName, start = '-', end = '+', withPayload = true){ - if(!streamName.startsWith(this.config.redis.basePrefix)) streamName = this.config.redis.basePrefix + streamName + if(!streamName.startsWith(this.redisConfig.basePrefix)) streamName = this.redisConfig.basePrefix + streamName if(this.debug) console.log('Redis XRANGE ', streamName); if(typeof(start)!='string') start = start.toString() if(typeof(end)!='string') end = end.toString() @@ -173,9 +180,9 @@ export class RedisConnexion { } isHistorizedChan(chan){ - if(!chan.startsWith(this.config.redis.basePrefix)) chan = this.config.redis.basePrefix + chan - var matches = this.config.redis.historizeChannels.filter((e) => { - if(!e.startsWith(this.config.redis.basePrefix)) e = this.config.redis.basePrefix + e + if(!chan.startsWith(this.redisConfig.basePrefix)) chan = this.redisConfig.basePrefix + chan + var matches = this.redisConfig.historizeChannels.filter((e) => { + if(!e.startsWith(this.redisConfig.basePrefix)) e = this.redisConfig.basePrefix + e if(e.indexOf('*') > -1) { let r = new RegExp('^'+e.replace(/\*/g,'(.+)')+'$','g') return(chan.match(r) != null); @@ -206,30 +213,11 @@ export class RedisConnexion { return; } - if(this.redPillsUuids.length>0) { // Any dev bus console in RedPills (promiscuous) mode ? - if(this.debug) console.log(`Will send to ${this.redPillsUuids.length} REDPILLS`); - let shortChan = chan.startsWith(this.config.redis.basePrefix) ? chan.substr(this.config.redis.basePrefix.length) : chan - let payload ={ - 'event': 'REDISMSG', - 'payload': { - 'bmsg':{ // Extra encapsulation to avoid triggering normal listeners on FE - 'msg': msg, - 'chan': shortChan, - } - } - } - for(var uuid of this.redPillsUuids) { - if(uuid in this.wssConnections) { - this.wssConnections[uuid].send(JSON.stringify(payload)); - } - } - } - if(this.debug) console.log('will now fanout...', chan, msg); const uuids = this.getSubscribedUuids(chan) if(uuids.length>0) { // Anyone interested at all about this chan ? if(this.debug) console.log(`Will broadcast to ${uuids.length} web clients`); - let shortChan = chan.startsWith(this.config.redis.basePrefix) ? chan.substr(this.config.redis.basePrefix.length) : chan + let shortChan = chan.startsWith(this.redisConfig.basePrefix) ? chan.substr(this.redisConfig.basePrefix.length) : chan let payload ={ 'event': 'REDISMSG', 'payload': { diff --git a/wssConnexion.js b/wssConnexion.js index 86ba74e..3725496 100644 --- a/wssConnexion.js +++ b/wssConnexion.js @@ -1,5 +1,6 @@ import crypto from 'crypto' import { gatewayActions } from './actions/index.js' +import { RedisSearchLanguages } from 'redis' export class WssConnexion { @@ -11,7 +12,7 @@ export class WssConnexion { this.uuid = options.uuid this.wssSrv = options.wssSrv this.debug = options.debug - this.rediscnx = options.rediscnx + this.allRediscnx = options.allRediscnx this.accessRights = options.accessRights this.userId = options.userId this.roles = options.roles @@ -95,34 +96,17 @@ export class WssConnexion { // Also think about all possibly active bind(this), which -I guess- also make references preventing GC. } - async getAwaitingNotifs(){ - //TODO : AWAIT this from either Redis and/or ML - - // Key: notif destination module, value: either KV with V=nb of notifs, or Array whose length is nb of notifs - let notifs = { // TEST EXAMPLE - "unreadChats": { - "chan001" : 2, - "chan002" : 10, - "chan003" : 7, - }, - "OTS": [ "fallimi", "infosca" ], - "OtherNotifDest" : [] - }; - - return(notifs); - } - subscribeMandatoryChans(){ let mandaChans = this.accessRights.mustSubscribe(this.userId, this.roles) - mandaChans.push('userchans:'+this.userId); // Add user private chan - - mandaChans = mandaChans.map(item=>this.config.redis.basePrefix+item) - - for(var chan of mandaChans){ - if(!(chan in this.rediscnx.subscriptions)) this.rediscnx.subscriptions[chan] = []; - if(this.subscriptions.indexOf(chan)<0) { - this.subscriptions.push(chan); - this.rediscnx.subscriptions[chan].push(this.uuid); + for(let rediscnx of this.allRediscnx){ + mandaChans = mandaChans.filter(chan => chan.startsWith(rediscnx.redisConfig.chansFilter)) + mandaChans = mandaChans.map(item=>rediscnx.redisConfig.basePrefix+item) + for(var chan of mandaChans){ + if(!(chan in rediscnx.subscriptions)) rediscnx.subscriptions[chan] = []; + if(this.subscriptions.indexOf(chan)<0) { + this.subscriptions.push(chan); + rediscnx.subscriptions[chan].push(this.uuid); + } } } this.action_SUBLST('SUBLST', null, ''); @@ -130,11 +114,13 @@ export class WssConnexion { clearAllSubscriptions(){ for(var chan of this.subscriptions){ - if(chan in this.rediscnx.subscriptions) { - this.rediscnx.subscriptions[chan].splice(this.rediscnx.subscriptions[chan].indexOf(this.uuid), 1) ; + for(let rediscnx of this.allRediscnx){ + if(chan in rediscnx.subscriptions) { + rediscnx.subscriptions[chan].splice(rediscnx.subscriptions[chan].indexOf(this.uuid), 1) + } } } - this.subscriptions = []; + this.subscriptions = [] } sendErr(action, msg, reqid){ diff --git a/wssGatewayConfig.json b/wssGatewayConfig.json index 58575ea..ab6746c 100644 --- a/wssGatewayConfig.json +++ b/wssGatewayConfig.json @@ -9,48 +9,59 @@ "XXcertFile": "/etc/letsencrypt/live/42.internike.com/fullchain.pem", "XXcertKeyFile": "/etc/letsencrypt/live/42.internike.com/privkey.pem", "challengeExpiration": 20, - "unsecure": true, - "healthCheckPath": "/status", - "devotpToken": "qhsdfkjhqsgdfkqhs", - "systemChannels": { - "onlineUsers": "onlineUsers" - } + "unsecure": true }, "accessRights":[ { "roles": "*", - "mustSubscribe": [ "system:notifs:[UID]", "onlineUsers", "system:notifs" ], - "canSubscribe": ["gps:*","agents:*"], + "mustSubscribe": [ "system:notifs:[UID]", "system:notifs" ], + "canSubscribe": ["system:gps:*", "arena:gps:*","arena:agents:*"], "canPublish": [ ], - "canSet": [ "[UID]:userPrefs" ], - "canGet": [ "[UID]:userPrefs"] + "canSet": [ ], + "canGet": [ ] }, { "roles": ["admin"], - "mustSubscribe": ["system:adminNotifs"], - "canSubscribe": [ "infraNotifs:*", "gps:*","agents:*"], - "canPublish": ["gps:*", "agents:*", "system:notifs:*", "system:notifs", "infraNotifs:*"], - "canSet": ["*:userPrefs"], - "canGet": ["*:userPrefs"], - "canDo": ["getActiveUsers", "killSessions","reloadAccessRights", "getAccessRights", "getPlatformState", "setPlatformState", "redPill"] + "mustSubscribe": ["system:infraNotifs", "system:replies:[UID]"], + "canSubscribe": [ ], + "canPublish": [ "system:requests:*" ], + "canSet": ["system:*"], + "canGet": ["system:*"], + "canDo": ["getActiveUsers", "reloadAccessRights", "getAccessRights"] } ], - "redis":{ - "host": "127.0.0.1", - "tls":false, - "port": 6379, - "Xuser": "msgbus", - "Xpass": "yj465sqfCTA0bKDw3zEYg8OqYl9Tv", - "user": "", - "pass": "", - "historizeChannels": [ "userchans:*" ], - "historizeMax": 1000, - "authTokenPrefix": "authorizer:message_bus_user_", - "challengePrefix": "msgBusChallenge:", - "basePrefix": "messageBus:", - "storePrefix": "messageBus:Store:", - "storeMaxSize": 51200, - "historizePrefix": "histoChan:", - "platformStateKey": "authorizer:platformDown" - } + "redis":[ + { + "redisId":"SYS_1", + "role": "primary", + "host": "127.0.0.1", + "tls":false, + "port": 6380, + "user": "", + "pass": "", + "historizeChannels": [ ], + "historizeMax": 1000, + "ChansFilter":"system:", + "basePrefix": "messageBus:", + "storePrefix": "messageBus:Store:", + "storeMaxSize": 51200, + "historizePrefix": "histoChan:" + }, + { + "redisId":"ARN_1", + "role": "primary", + "host": "127.0.0.1", + "tls":false, + "port": 6379, + "user": "", + "pass": "", + "historizeChannels": [ ], + "historizeMax": 1000, + "chansFilter":"arena:", + "basePrefix": "messageBus:", + "storePrefix": "messageBus:Store:", + "storeMaxSize": 51200, + "historizePrefix": "histoChan:" + } + ] } diff --git a/wssServer.js b/wssServer.js index a3a166d..bfdabcf 100644 --- a/wssServer.js +++ b/wssServer.js @@ -4,10 +4,10 @@ import {WssConnexion} from './wssConnexion.js' export class wssServer { - constructor(configHelper, WSSServer, REDIScnx, debug) { + constructor(configHelper, WSSServer, allRediscnx, debug) { this.debug = debug if(this.debug) console.log('Starting WSSGateway...') - this.REDIScnx = REDIScnx + this.allRediscnx = allRediscnx this.configHelper = configHelper this.wssGatewayConfig = configHelper.config this.AllWssConnections = {} @@ -38,7 +38,7 @@ export class wssServer { wssSrv: this, debug: this.debug, config: this.wssGatewayConfig, - rediscnx: this.REDIScnx, + allRediscnx: this.allRediscnx, accessRights: this.accessRights, userId: socket.session.userInfos.identity.uuid, roles: socket.session.userInfos.roles, @@ -47,23 +47,10 @@ export class wssServer { if(!(wssCnx.userId in this.Users2uuids)) this.Users2uuids[wssCnx.userId] = new Set(); this.Users2uuids[wssCnx.userId].add(uuid); this.OnlineUsers.add(wssCnx.userId); - this.REDIScnx.wssConnections[uuid] = wssCnx; + this.allRediscnx.forEach(cnx => { cnx.wssConnections[uuid] = wssCnx }) this.postLoginActions(wssCnx) } else socket.close() - - - // wssCnx.doLogin().then(() => { // Things to execute only when successfuly logged-in - // if(!(wssCnx.userId in this.Users2uuids)) this.Users2uuids[wssCnx.userId] = new Set(); - // this.Users2uuids[wssCnx.userId].add(uuid); - // this.OnlineUsers.add(wssCnx.userId); - // this.REDIScnx.wssConnections[uuid] = wssCnx; - // //}).then(() => { - // wssCnx.send(JSON.stringify({ - // 'action': 'LOGIN', - // 'logged': true - // })) - // this.postLoginActions(wssCnx) - // }) + } postLoginActions(wssCnx) { @@ -79,7 +66,7 @@ export class wssServer { this.Users2uuids[userId].delete(uuid); if(this.Users2uuids[userId].size == 0) this.OnlineUsers.delete(userId); } - delete(this.REDIScnx.wssConnections[uuid]); + this.allRediscnx.forEach(cnx => delete cns.wssConnections[uuid]) this.fanoutOnlineUsers(this.getOnlineUsers()); }