From de69b3d4de0eec36ecd815790814204a0e370bf1 Mon Sep 17 00:00:00 2001 From: STEINNI Date: Thu, 11 Sep 2025 20:50:34 +0000 Subject: [PATCH] 2nd --- accesRights.js | 134 ++++++++++++++++ actions/chat.js | 167 ++++++++++++++++++++ actions/index.js | 9 ++ actions/notifications.js | 13 ++ actions/pubSub.js | 242 ++++++++++++++++++++++++++++ actions/sessions.js | 316 +++++++++++++++++++++++++++++++++++++ actions/store.js | 110 +++++++++++++ actions/utilities.js | 203 ++++++++++++++++++++++++ configHelper.js | 97 ++++++++++++ configSchema.json | 159 +++++++++++++++++++ package.json | 27 ++++ redisConnexion.js | 254 +++++++++++++++++++++++++++++ startWssGw.sh | 13 ++ stopWssGw.sh | 11 ++ tests/accessRights.test.js | 164 +++++++++++++++++++ wssConnexion.js | 290 ++++++++++++++++++++++++++++++++++ wssGateway.js | 108 +++++++++++++ wssServer.js | 110 +++++++++++++ 18 files changed, 2427 insertions(+) create mode 100644 accesRights.js create mode 100644 actions/chat.js create mode 100644 actions/index.js create mode 100644 actions/notifications.js create mode 100644 actions/pubSub.js create mode 100644 actions/sessions.js create mode 100644 actions/store.js create mode 100644 actions/utilities.js create mode 100644 configHelper.js create mode 100644 configSchema.json create mode 100644 package.json create mode 100644 redisConnexion.js create mode 100755 startWssGw.sh create mode 100755 stopWssGw.sh create mode 100644 tests/accessRights.test.js create mode 100644 wssConnexion.js create mode 100644 wssGateway.js create mode 100644 wssServer.js diff --git a/accesRights.js b/accesRights.js new file mode 100644 index 0000000..1d6c974 --- /dev/null +++ b/accesRights.js @@ -0,0 +1,134 @@ +module.exports = class AccesRights { + + constructor(config, debug){ + this.debug = debug + this.config = config + this.rights = config.accessRights + } + + refreshAccessRights(config){ + this.rights = config.accessRights + } + + mustSubscribe(uid, roles) { + if(roles.indexOf('*')<0) roles.push('*') + let chans = [] + for(let myRole of roles){ + for(let rightBlock of this.rights) { + if(!rightBlock.mustSubscribe) continue + if((rightBlock.roles=='*') || (rightBlock.roles.indexOf(myRole)>-1)) { + chans = this.merge(chans, rightBlock.mustSubscribe.map(item=>item.replace(/\[UID\]/g,uid))) + } + } + } + return(chans) + } + + isMandatory(uid, roles, chan){ + return(this.mustSubscribe(uid, roles).filter(this.chanMatch.bind(this, chan)).length>0) + } + + canSubscribe(uid, roles, myChan) { + if(roles.indexOf('*')<0) roles.push('*') + for(let myRole of roles){ + for(let rightBlock of this.rights) { + if(!rightBlock.canSubscribe) continue + if((rightBlock.roles=='*') || (rightBlock.roles.indexOf(myRole)>-1)) { + let canSubList = rightBlock.canSubscribe.map(item=>item.replace(/\[UID\]/g, uid)) + if(canSubList.find(this.chanMatch.bind(this, myChan))) return(true) + } + } + } + //if(this.debug) console.log(`Roles : ${roles} cannot subscribe on ${myChan}`) + return(false) + } + + canPublish(uid, roles, myChan) { + myChan = myChan.startsWith(this.config.redis.basePrefix) ? myChan.substr(this.config.redis.basePrefix.length) : myChan + if(roles.indexOf('*')<0) roles.push('*') + for(let myRole of roles){ + for(let rightBlock of this.rights) { + if(!rightBlock.canPublish) continue + if((rightBlock.roles=='*') || (rightBlock.roles.indexOf(myRole)>-1)) { + let canPubList = rightBlock.canPublish.map(item=>item.replace(/\[UID\]/g, uid)) + if(canPubList.find(this.chanMatch.bind(this, myChan))) return(true) + } + } + } + //if(this.debug) console.log(`Roles : ${roles} cannot publish on ${myChan}`) + return(false) + } + + canSet(uid, roles, myKey){ + myKey = myKey.startsWith(this.config.redis.storePrefix) ? myKey.substr(this.config.redis.storePrefix.length) : myKey + if(roles.indexOf('*')<0) roles.push('*') + for(let myRole of roles){ + for(let rightBlock of this.rights) { + if(!rightBlock.canSet) continue + if((rightBlock.roles=='*') || (rightBlock.roles.indexOf(myRole)>-1)) { + let canSetList = rightBlock.canSet.map(item=>item.replace(/\[UID\]/g, uid)) + if(canSetList.find(this.chanMatch.bind(this, myKey))) return(true) + } + } + } + //if(this.debug) console.log(`Roles : ${roles} cannot set ${myKey}`) + return(false) + } + + canGet(uid, roles, myKey){ + myKey = myKey.startsWith(this.config.redis.storePrefix) ? myKey.substr(this.config.redis.storePrefix.length) : myKey + if(roles.indexOf('*')<0) roles.push('*') + for(let myRole of roles){ + for(let rightBlock of this.rights) { + if(!rightBlock.canGet) continue + if((rightBlock.roles=='*') || (rightBlock.roles.indexOf(myRole)>-1)) { + let canGetList = rightBlock.canGet.map(item=>item.replace(/\[UID\]/g, uid)) + if(canGetList.find(this.chanMatch.bind(this, myKey))) return(true) + } + } + } + //if(this.debug) console.log(`Roles : ${roles} cannot get ${myKey}`) + return(false) + } + + canDo(roles, action, uid=null){ + if(roles.indexOf('*')<0) roles.push('*') + for(let myRole of roles){ + for(let rightBlock of this.rights) { + if(!rightBlock.canDo) continue + if((rightBlock.roles=='*') || (rightBlock.roles.indexOf(myRole)>-1)) { + if(rightBlock.canDo.indexOf(action)>-1) { + if((rightBlock.uuids) && Array.isArray((rightBlock.uuids))) { + // !!! Separate condition so if rightBlock.uuids but not uid in it, we don't give access ! + if(rightBlock.uuids.includes(uid)) return(true) // null should never be in config uuids => old style calls are safe + } else { // no uuid block => role is sufficient to give access + return(true) + } + } + } + } + } + + //Anti-shoot-your-foot + if((action=='reloadAccessRights') && (roles.includes('EIC_Dev'))) { + console.error('Prevented you from shooting your foot ! \nPlease keep least EIC_Dev in reload access rights !') + return(true) + } + + //if(this.debug) console.log(`Roles : ${roles} cannot do ${action}`) + return(false) + } + + chanMatch(myChan, configChan) { + if((!myChan) || (typeof(myChan)!='string')) return(false) + let re = new RegExp('^'+configChan.replace(/\*/g,'(.+)')+'$','g') + return(myChan.match(re)!=null) + } + + merge(x, y) { + let tmp = x + for(let yitem of y) if(tmp.indexOf(yitem)<0) tmp.push(yitem) + return(tmp) + } + +} \ No newline at end of file diff --git a/actions/chat.js b/actions/chat.js new file mode 100644 index 0000000..012ee53 --- /dev/null +++ b/actions/chat.js @@ -0,0 +1,167 @@ +module.exports.methods = { + + /* Creates (predictable) peer-to-peer chan if necessary, or check for lobby existence, then subscribe to it. + Request: + { + "action": "STARTCHAT", + "payload": "P" // or "C" => "P" = Peer-to-peer, "C" = (chat) Channel + } + reply: + { + "action": "STARTCHAT", + "success": true, + "payload" : "dynamically created chan" + } + */ + action_STARTCHAT(action, payload, reqid){ + if(typeof(payload)!='string'){ + this.sendErr(action, 'Invalid payload', reqid); + return; + }; + let recipientId = payload.substring(2); + let chan; + if(payload[0]=='P') { + chan = (this.userId "P" = Peer-to-peer, "C" = (chat) Channel + } + reply: + { + "action": "SENDCHAT", + "success": true, + "payload" : null + } + */ + action_SENDCHAT(action, payload, reqid){ + //TODO: prevent unauthorized recipient !! + let recipientId = payload.recipient.substring(2); + let chan; + payload.event = 'CHATMSG' + if(payload.recipient[0]=='P') { + chan = (this.userId "P" = Peer-to-peer, "C" = (chat) Channel + } + reply: + { + "action": "ISONLINE", + "success": true, + "payload" : + } + */ + // You can only ask the satus of a list of usernames you know (and have the right to) + action_ISONLINE(action, payload, reqid){ + //TODO: can you really ask about those users ? (but that might cost too much time, because => ML?) + if(!Array.isArray(payload)){ + this.sendErr(action, 'Invalid payload', reqid); + return; + }; + let onlineUsers = Object.keys(this.wssSrv.getOnlineUsers()); + let reply = { + 'action': action, + 'payload': payload.filter((x) => (onlineUsers.indexOf(x)>-1)), + 'success': true, + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + + // Same as ISONLINE, but subscribe to watch changes + action_WATCHUSERS(action, payload, reqid){ + if(!Array.isArray(payload)){ + this.sendErr(action, 'Invalid payload', reqid); + return; + } + //TODO: can you really ask about those users ? (but that might cost too much time, because => ML?) + this.usersWatched = payload; + let reply = { + 'action': action, + 'payload': null, + 'success': true, + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + + /* + Request: + { + "action": "CHANLST", + "payload": { "filter": "ChatRoom*" } + } + reply: + { + "action": "CHANLST", + "success": true, + "payload" : ["ChatRoom1","ChatRoom2","ChatRoom_Experts"] + } + */ + action_CHANLST(action, payload, reqid){ + //TODO : Filter based on user rights!! + // + let reply = { + 'action': action, + 'payload': [ + this.config.redis.basePrefix+"onlineUsers", + this.config.redis.basePrefix+"system:chan1", + this.config.redis.basePrefix+"proposals:updates" + ],//this.config.redis.watchChannels, + 'success': true, + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + +} \ No newline at end of file diff --git a/actions/index.js b/actions/index.js new file mode 100644 index 0000000..63a83b5 --- /dev/null +++ b/actions/index.js @@ -0,0 +1,9 @@ +module.exports.methods = {} +Object.assign(module.exports.methods, require('./utilities').methods ) +Object.assign(module.exports.methods, require('./pubSub').methods ) +Object.assign(module.exports.methods, require('./store').methods ) +Object.assign(module.exports.methods, require('./sessions').methods ) +Object.assign(module.exports.methods, require('./notifications').methods ) +Object.assign(module.exports.methods, require('./chat').methods ) + + diff --git a/actions/notifications.js b/actions/notifications.js new file mode 100644 index 0000000..879e740 --- /dev/null +++ b/actions/notifications.js @@ -0,0 +1,13 @@ +module.exports.methods = { + + async action_NOTIFS(action, payload, reqid){ + let reply = { + 'action': action, + 'payload': await this.getAwaitingNotifs(), + 'success': true, + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + +} \ No newline at end of file diff --git a/actions/pubSub.js b/actions/pubSub.js new file mode 100644 index 0000000..163755d --- /dev/null +++ b/actions/pubSub.js @@ -0,0 +1,242 @@ +module.exports.methods = { + /* Request: + { + "action": "SUB", + "payload" : ["chan1","chan2","unauthorized"] + } + Reply: (returns all active subscriptions after this SUB) + { + "action": "SUB", + "success": true, + "payload" : ["chan1","chan2","wasalreadysubscribed"] + } + */ + action_SUB(action, payload, reqid){ + if(!Array.isArray(payload)){ + this.sendErr(action, 'Invalid payload', reqid); + return; + }; + + for(var chan of payload){ + if((!chan) || (typeof(chan)!='string')) continue + if(!this.accessRights.canSubscribe(this.userId, this.roles, chan)) continue + // Chat chans are forbidden here + if((chan.substr(0,8) == 'userchans') || (chan.substr(0,9) == 'lobbychans')) continue; + + if(!chan.startsWith(this.config.redis.basePrefix)) chan = this.config.redis.basePrefix + chan + if(this.subscriptions.indexOf(chan)<0) { + this.subscriptions.push(chan); + } + if(!(chan in this.rediscnx.subscriptions)) this.rediscnx.subscriptions[chan] = []; + if(this.rediscnx.subscriptions[chan].indexOf(this.uuid)<0) { + this.rediscnx.subscriptions[chan].push(this.uuid); + } + } + + let shortChans = this.subscriptions.map(item => ( + item.startsWith(this.config.redis.basePrefix) ? item.substr(this.config.redis.basePrefix.length) : item + )) + + let reply = { + 'action': action, + 'payload': shortChans, + 'success': true, + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + + /* + Request: + { + "action":"UNSUB", + "payload" : ["chan1","notsubscribed_chan","mandatory_chan"] + } + reply: + { + "action":"UNSUB", + "success": true, + "payload" : ["chan1"] + } + */ + action_UNSUB(action, payload, reqid){ + if(!Array.isArray(payload)){ + this.sendErr(action, 'Invalid payload', reqid); + return; + }; + + for(var chan of payload){ + if((!chan) || (typeof(chan)!='string')) continue + if(this.accessRights.isMandatory(this.userId, this.roles, chan)) continue + + // Chat chans are forbidden here + if((chan.substr(0,8) == 'userchans') || (chan.substr(0,9) == 'lobbychans')) continue; + + if(!chan.startsWith(this.config.redis.basePrefix)) chan = this.config.redis.basePrefix + chan + if(this.subscriptions.indexOf(chan)>-1) { + this.subscriptions.splice(this.subscriptions.indexOf(chan), 1); + } + if((chan in this.rediscnx.subscriptions) && (this.rediscnx.subscriptions[chan].indexOf(this.uuid)>-1)) { + this.rediscnx.subscriptions[chan].splice(this.rediscnx.subscriptions[chan].indexOf(this.uuid), 1) ; + } + } + + let shortChans = this.subscriptions.map(item => ( + item.startsWith(this.config.redis.basePrefix) ? item.substr(this.config.redis.basePrefix.length) : item + )) + let reply = { + 'action': action, + 'payload': shortChans, + 'success': true, + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + + /* + Request: + { + "action": "SUBLST", + } + reply: + { + "action": "SUBLST", + "success": true, + "payload" : ["chan1","chan2","mandatory_chan"] + } + */ + action_SUBLST(action, payload, reqid){ + let shortChans = this.subscriptions.map(item => ( + item.startsWith(this.config.redis.basePrefix) ? item.substr(this.config.redis.basePrefix.length) : item + )) + let reply = { + 'action': action, + 'payload': shortChans, + 'success': true, + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + + /* + Request: + { + "action": "PUB", + "payload" : { 'chan':'chan1', 'msg':'Hello folks !'} + } + reply: + { + "action": "PUB", + "success": true, + } + */ + async action_PUB(action, payload, reqid){ + if((typeof(payload)!='object') || (typeof(payload.chan)!='string') || (typeof(payload.msg)!='string')){ + this.sendErr(action, 'Invalid payload', reqid); + if(this.debug) console.log('PUB: Invalid payload') + return; + }; + // Chat chans are forbidden here + if((payload.chan.substr(0,8) == 'userchans') || (payload.chan.substr(0,9) == 'lobbychans')){ + this.sendErr(action, 'Forbidden chan', reqid); + if(this.debug) console.log('PUB: Forbidden chan') + return; + }; + + if( (!this.accessRights.canPublish(this.userId, this.roles, payload.chan)) && + (! this.rediscnx.redPillsUuids.includes(this.uuid)) ) { + this.sendErr(action, 'Unauthorized chan !', reqid); + if(this.debug) console.log('PUB: Unauthorized chan', payload.chan) + return + } + + let msgO + try { msgO = JSON.parse(payload.msg) } catch(err) { msgO = {'err':err} } + + let histId = null + if(this.rediscnx.isHistorizedChan(payload.chan)) { // historize first to add the histId + let shortChan = payload.chan.startsWith(this.config.redis.basePrefix) ? payload.chan.substr(this.config.redis.basePrefix.length) : payload.chan + histId = await this.rediscnx.redisXadd(this.config.redis.basePrefix+this.config.redis.historizePrefix + shortChan, payload.msg, this.config.redis.historizeMax); + if( !histId) { + this.sendErr(action, 'Could not historize, aborted event publish !', reqid); + console.error(`Could not historize for "${shortChan}", aborted event publish !`) + return + } + msgO.histId = histId + } + + msgO.sender = this.userId + try { payload.msg = JSON.stringify(msgO) } catch(err) {payload.msg = `{"err":"${err}}"` } + this.rediscnx.redisPublish(payload.chan, payload.msg) + + let reply = { + 'action': action, + 'payload': null, + 'success': true, + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + + /* + Request: + { + "action": "CHANHIST", + "payload": { + "chan": "aze", + "from": "123456879-0", //Histid or seconds since epoch + "to": "987654321-1" // Optional + } + } + reply: + { + "action": "CHANHIST", + "success": true, + "payload" : [ + "123456879-1": { payload }, + "123456885-0": { payload }, + "123456890-0": { payload } + ] + } + */ + async action_CHANHIST(action, payload, reqid){ + if((!payload.channel) || (typeof(payload.channel)!='string') || (!payload.from) || (typeof(payload.from)!='string') || (payload.to && (typeof(payload.to)!='string'))){ + this.sendErr(action, 'Invalid payload', reqid) + return + } + if( (!payload.from.match(/^(\d{13,})-(\d+)$/)) && (!payload.from.match(/^(\d{10,})$/)) ){ + this.sendErr(action, 'Invalid payload', reqid) + return + } + if(payload.to && (!payload.to.match(/^(\d{13,})-(\d+)$/)) && (!payload.to.match(/^(\d{10,})$/)) ){ + this.sendErr(action, 'Invalid payload', reqid) + return + } + + if( (!this.accessRights.canSubscribe(this.userId, this.roles, payload.channel)) && + (! this.rediscnx.redPillsUuids.includes(this.uuid)) ) { + this.sendErr(action, 'Unauthorized channel !', reqid) + return + } + + if(!this.rediscnx.isHistorizedChan(payload.channel)){ + this.sendErr(action, 'Not an historized channel !', reqid) + return + } + + let from = (payload.from.indexOf('-')>-1) ? payload.from : 1000*payload.from + let to = '+' + if(payload.to) to = (payload.to.indexOf('-')>-1) ? payload.to : 1000*payload.to + let streamName = payload.channel.startsWith(this.config.redis.basePrefix) ? this.config.redis.historizePrefix+payload.channel.substr(this.config.redis.basePrefix.length) : this.config.redis.historizePrefix + payload.channel + let respPayload = await this.rediscnx.redisXrange(streamName, from, to); + + let reply = { + 'action': action, + 'payload': respPayload, + 'success': true, + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + +} diff --git a/actions/sessions.js b/actions/sessions.js new file mode 100644 index 0000000..7fabdae --- /dev/null +++ b/actions/sessions.js @@ -0,0 +1,316 @@ +module.exports.methods = { + + /* Request payload : null + Reply: + { + "action": "GETACTIVEUSERS", + "payload": [ + { + "uid": "steinic", + "email": "Nicolas.STEIN@ext.ec.europa.eu", + "given_name": "Nicolas", + "family_name": "STEIN", + "userRoles": [ + "BP_PO", + "SP_Admin", + "Org_Member", + "Org_Pending", + "EIC_Dev" + ], + "sessionExpire": 3594, + "busConnected": true + } + ], + "success": true, + "reqid": "df58a401-4ed2-4908-a2b1-8bae155e413a" + } + */ + async action_GETACTIVEUSERS(action, payload, reqid){ + if(!this.accessRights.canDo(this.roles, 'getActiveUsers')) { + this.sendErr(action, 'Unauthorized action !', reqid); + return + } + +//TODO: take from new config key instead of hardcded + const iterOptions = { + TYPE: 'string', + MATCH: 'authorizer:sessid_*' + } + + let activeUsers = [] + for await (const key of this.rediscnx.redisClient.scanIterator(iterOptions)) { + let sess = null + try{ sess = JSON.parse(await this.rediscnx.redisGet(key, '')) } + catch(err) { console.log('bad sess info')} + if((!sess) || (!sess.isAuthenticated) || (!sess.sessionID) + || (!sess.userInfo) || (!sess.userInfo.userRoles) || (!sess.userInfo.euLoginId)){ + continue + } + + let ttl = await this.rediscnx.redisGetTtl(key, '') + activeUsers.push({ + uid: sess.userInfo.euLoginId, + email: sess.userInfo.email, + given_name: sess.userInfo.given_name, + family_name: sess.userInfo.family_name, + userRoles: sess.userInfo.userRoles, + sessionExpire: ttl, + busConnected: this.wssSrv.sessionConnected(sess.sessionID), + }) + } + var reply = { + 'action': action, + 'payload': activeUsers, + 'success': true, + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + /* + * payload: { + uids: [ 'fallimi' ], + notRoles : ['EIC_ADMIN', 'EIC_Dev' ], + ttl: 0 + } + => Both conditions must be met (here nothing gets done as fallimi is EIC_Dev) + + Any uid, but not some roles : + { + uids: null, + notRoles : ['EIC_ADMIN', 'EIC_Dev' ], + ttl: 0 + } + + Some uids, don't care their roles in 30 seconds : + { + uids: [ 'infosca', 'nz01234' ], + notRoles : [], + ttl: 30 + } + + */ + + + /* Request payload : { "uid":"steinni" } + Reply: + { + "action": "GETUSERSTATUS", + "payload": + { + "uid": "steinic", + "email": "Nicolas.STEIN@ext.ec.europa.eu", + "given_name": "Nicolas", + "family_name": "STEIN", + "sessionExpire": 3594, + "busConnected": true + }, + "success": true, + "reqid": "df58a401-4ed2-4908-a2b1-8bae155e413a" + } + */ + async action_GETUSERSTATUS(action, payload, reqid){ + if(!this.accessRights.canDo(this.roles, 'getUserStatus')) { + this.sendErr(action, 'Unauthorized action !', reqid); + return + } + + const iterOptions = { + TYPE: 'string', + MATCH: 'authorizer:sessid_*' + } + + let user = { + uid: payload.uid, + email: null, + given_name: null, + family_name: null, + sessionExpire: null, + busConnected: null, + } + + for await (const key of this.rediscnx.redisClient.scanIterator(iterOptions)) { + let sess = null + try{ sess = JSON.parse(await this.rediscnx.redisGet(key, '')) } + catch(err) { console.log('bad sess info')} + + if((!sess) || (!sess.isAuthenticated) || (!sess.sessionID) + || (!sess.userInfo) || (!sess.userInfo.userRoles) || (!sess.userInfo.euLoginId) + || (sess.userInfo.euLoginId != payload.uid) + ) { + continue + } else { + let ttl = await this.rediscnx.redisGetTtl(key, '') + user={ + uid: sess.userInfo.euLoginId, + email: sess.userInfo.email, + given_name: sess.userInfo.given_name, + family_name: sess.userInfo.family_name, + sessionExpire: ttl, + busConnected: this.wssSrv.sessionConnected(sess.sessionID), + } + break + } + } + + var reply = { + 'action': action, + 'payload': user, + 'success': true, + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + + async action_KILLSESSION(action, payload, reqid){ + if(!this.accessRights.canDo(this.roles, 'killSessions')) { + this.sendErr(action, 'Unauthorized action !', reqid); + return + } + if( (!payload.notRoles) || (!Array.isArray(payload.notRoles)) || (payload.uids && (!Array.isArray(payload.uids))) ){ + this.sendErr(action, 'Bad payload !', reqid); + return + } + + + +//TODO: take from new config key instead of hardcded + const iterOptions = { + TYPE: 'string', + MATCH: 'authorizer:sessid_*' + } + + for await (const key of this.rediscnx.redisClient.scanIterator(iterOptions)) { + if(key.endsWith('_cookie')) continue + let sess = null + try{ sess = JSON.parse(await this.rediscnx.redisGet(key, '')) } + catch(err) { console.log('bad sess info')} + if((!sess) || (!sess.isAuthenticated)) continue + + if(payload.uids && (payload.uids.indexOf(sess.userInfo['euLoginId'])<0)) continue + let intersect = payload.notRoles.filter(value => sess.userInfo.userRoles.includes(value)); + if(intersect.length>0) continue + + if((!payload.ttl) || (typeof(payload.ttl)!= number) || (payload.ttl<0) || (payload.ttl>3600)) payload.ttl=0 + let ttl = await this.rediscnx.redisSetTtl(key, payload.ttl, '') + + } + var reply = { + 'action': action, + 'success': true, + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + + /* Request: (curtain down, except for devs & admins) + { + "action": "SETSPARCSTATE", + "payload" : { + blockedUids: [], + allowedRoles : ['EIC_Admin', 'EIC_Dev'], + }, + } + + Request: (curtain up, for everyone) + { + "action": "SETSPARCSTATE", + "payload" : { + blockedUids: [], + allowedRoles : '*', + }, + } + + Request: (curtain up, block some bad-guys) + { + "action": "SETSPARCSTATE", + "payload" : { + blockedUids: ['hacker1', 'hacker2'], + allowedRoles : '*', + }, + } + + Reply: + { + "success": true, + "reqid": "6az5e4r6a", + "payload": { the accessrights } + } + */ + async action_SETPLATFORMMODE(action, payload, reqid){ + if(!this.accessRights.canDo(this.roles, 'setPlatformState')) { + this.sendErr(action, 'Unauthorized action !', reqid); + return + } + if((typeof(payload)!='object') || (!Array.isArray(payload.blockedUUIDs)) || + ( (typeof(payload.platformRestrictions)=='object') && (!Array.isArray(payload.platformRestrictions.allowedRoles)) ) + ){ + this.sendErr(action, 'Invalid payload', reqid) + return + } + + if(typeof(payload.platformRestrictions)=='object'){ // curtain down + if(!payload.platformRestrictions.allowedRoles.includes('EIC_Dev')){ // anti-shoot-your-foot + payload.platformRestrictions.allowedRoles.push('EIC_Dev') + } + } else { // curtain up + //force-in an example + payload.XX_platformRestrictions = { "allowedRoles":["EIC_Admin","EIC_Dev"],"allowedUUIDs":["valentin"] } + } + + + + await this.rediscnx.redisSet(this.config.redis.platformStateKey, + payload, + 0, + '' + ) + + var reply = { + 'action': action, + 'success': true + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + + + /* Request: + { + "action": "GETSPARCMODE" + "payload": { + "key": "keyname" + } + "reqid": "6az5e4r6a" + } + Reply: + { + "action":"STORE", + "success":true, + "payload": { + ...the sparc mode + } + "reqid": reqid + } + */ + async action_GETPLATFORMMODE(action, payload, reqid){ + if(!this.accessRights.canDo(this.roles, 'getPlatformState')) { + this.sendErr(action, 'Unauthorized action !', reqid); + return + } + + let rawVal = await this.rediscnx.redisGet(this.config.redis.platformStateKey, '') + let val = null + try { val = JSON.parse(rawVal)} + catch(err) { console.error('Action GETSPARCMODE: Not a json !? ', rawVal) } + + var reply = { + 'action': action, + 'payload': val, + 'success': true, + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + } + + +} \ No newline at end of file diff --git a/actions/store.js b/actions/store.js new file mode 100644 index 0000000..0016722 --- /dev/null +++ b/actions/store.js @@ -0,0 +1,110 @@ +module.exports.methods = { + + /* Request: + { + "action": "SET" + "payload": { + "key": "keyname", + "value": "stringifiedvalue", // If null : deletes the key + "expire": 60 // if present & non zero, expires the key in x seconds. + // if absent or zero, we force a TTL of 2 years = 63072000 sec. + "observe": true // if true: make it an observable key & subscribe to it + } + "reqid": "6az5e4r6a" + } + Reply: + { + "action":"SET", + "success":true, + "reqid": reqid + } + */ + async action_SET(action, payload, reqid){ + //TODO : observable... + if((typeof(payload)!='object') || (typeof(payload.key)!='string') || + ((typeof(payload.value)!='object') && (payload.value!==null)) || + (payload.expire && (typeof(payload.expire)!='number')) || + (payload.observe && (typeof(payload.observe)!='boolean')) + ){ + this.sendErr(action, 'Invalid payload', reqid) + return + } + + if(!this.accessRights.canSet(this.userId, this.roles, payload.key)){ + this.sendErr(action, 'Unauthorized key !', reqid); + return + } + + if(payload.value) { + let val = null + try { val = JSON.stringify(payload.value)} + catch(err) { + this.sendErr(action, 'Cannot stringify value object !', reqid); + return + } + if(val.length > this.config.redis.storeMaxSize){ + this.sendErr(action, 'value too large !', reqid); + return + } + let exp = ((payload.expire>0) && (payload.expire<63072000)) ? payload.expire : 63072000 + await this.rediscnx.redisSet(payload.key, val, exp, this.config.redis.storePrefix) + } else { + await this.rediscnx.redisDel(payload.key, this.config.redis.storePrefix) + } + var reply = { + 'action': action, + 'success': true, + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + + /* Request: + { + "action": "GET" + "payload": { + "key": "keyname" + } + "reqid": "6az5e4r6a" + } + Reply: + { + "action":"GET", + "success":true, + "payload": { + "key" : key, + "value": value + } + "reqid": reqid + } + */ + async action_GET(action, payload, reqid){ + //TODO : observable... + if((typeof(payload)!='object') || (typeof(payload.key)!='string')){ + this.sendErr(action, 'Invalid payload', reqid); + return; + }; + + if(!this.accessRights.canGet(this.userId, this.roles, payload.key)) { + this.sendErr(action, 'Unauthorized key !', reqid); + return + } + + let rawVal = await this.rediscnx.redisGet(payload.key, this.config.redis.storePrefix) + let val = null + try { val = JSON.parse(rawVal)} + catch(err) { console.error('Action GET: Not a json !? ', rawVal) } + var reply = { + 'action': action, + 'payload': { + 'key': payload.key, + 'value': val + }, + 'success': true, + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + } + + +} \ No newline at end of file diff --git a/actions/utilities.js b/actions/utilities.js new file mode 100644 index 0000000..f6911a7 --- /dev/null +++ b/actions/utilities.js @@ -0,0 +1,203 @@ +module.exports.methods = { + + /* Request: + { + "action": "PING" + } + Reply: + { + "action": "PONG", + } + */ + action_PONG(action, payload){ + clearTimeout(this.keepAliveBomb); + this.keepAliveNextTimeout = setTimeout(this.keepAlive.bind(this),this.config.server.keepAliveInterval*1000); + }, + + + /* Request: + { + "action": "TIME" + "reqid": "6az5e4r6a" + } + Reply: + { + "action": "TIME", + "success": true, + "payload" : { + wssGatewayTime: "2022-09-01T14:42:22.603Z", + redisTime: "2022-09-01T14:42:22.603Z" + }, + "reqid": "6az5e4r6a" + } + */ + action_TIME(action, payload, reqid){ + var tmstp =new Date().toISOString(); + var reply = { + 'action': action, + 'payload': { + wssGatewayTime: tmstp, + redisTime: this.rediscnx.redisClient.time() + }, + 'success': true, + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + + + /* Request: + { + "action": "RELOADACCESSRIGHTS" + } + Reply: + { + "success": true, + "reqid": "6az5e4r6a" + } + */ + action_RELOADACCESSRIGHTS(action, payload, reqid){ + if(!this.accessRights.canDo(this.roles, 'reloadAccessRights')) { + this.sendErr(action, 'Unauthorized action !', reqid); + return + } + this.wssSrv.reloadAccessRights() + var reply = { + 'action': action, + 'success': true, + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + + /* Request: + { + "action": "GETACCESSRIGHTS" + } + Reply: + { + "success": true, + "reqid": "6az5e4r6a", + "payload": { the accessrights } + } + + Kept for backward compatibility : GETCONFIG gets everything ! + */ + action_GETACCESSRIGHTS(action, payload, reqid){ + if(!this.accessRights.canDo(this.roles, 'reloadAccessRights')) { + this.sendErr(action, 'Unauthorized action !', reqid); + return + } + + var reply = { + 'action': action, + 'success': true, + 'payload': this.wssSrv.wssGatewayConfig.accessRights + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + + /* Request: + { + "action": "GETCONFIG" + } + Reply: + { + "success": true, + "reqid": "6az5e4r6a", + "payload": { the config } + } + */ + action_GETCONFIG(action, payload, reqid){ + if(!this.accessRights.canDo(this.roles, 'reloadAccessRights')) { + this.sendErr(action, 'Unauthorized action !', reqid); + return + } + + var reply = { + 'action': action, + 'success': true, + 'payload': this.wssSrv.wssGatewayConfig + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + + + /* Request: + { + "action": "GETPROCESSINFO" + } + Reply: + { + "success": true, + "reqid": "6az5e4r6a", + "payload": { the result of redis INFO command } + } + */ + action_GETPROCESSINFO(action, payload, reqid){ + if(!this.accessRights.canDo(this.roles, 'getProcessInfo')) { + this.sendErr(action, 'Unauthorized action !', reqid); + return + } + + var reply = { + 'action': action, + 'success': true, + 'payload': this.rediscnx.getProcessInfo + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + + action_REDPILL(action, payload, reqid){ + if(!this.accessRights.canDo(this.roles, 'REDPILL', this.userId)) { + this.sendErr(action, 'Unauthorized action', reqid); + return; + }; + + if(!this.rediscnx.redPillsUuids.includes(this.uuid)) { + this.rediscnx.redPillsUuids.push(this.uuid) + setTimeout(() => { + if(this.rediscnx.redPillsUuids.includes(this.uuid)){ // could have been removed meanwhile & splice(-1) would remove the last !!! + this.rediscnx.redPillsUuids.splice(this.rediscnx.redPillsUuids.indexOf(this.uuid),1) + } + let reply = { + 'action': 'BLUEPILL', + 'payload': {}, + 'success': true, + }; + this.send(JSON.stringify(reply)); + }, 600000) // Back to blupill after 10min + } + + let reply = { + 'action': action, + 'payload': {}, + 'success': true, + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + + action_BLUEPILL(action, payload, reqid){ + if(!this.accessRights.canDo(this.roles, 'BLUEPILL', this.userId)) { + this.sendErr(action, 'Unauthorized action', reqid); + return; + }; + + if(this.rediscnx.redPillsUuids.includes(this.uuid)) { + this.rediscnx.redPillsUuids.splice(this.rediscnx.redPillsUuids.indexOf(this.uuid),1) + } + + let reply = { + 'action': action, + 'payload': {}, + 'success': true, + }; + if(reqid) reply.reqid = reqid; + this.send(JSON.stringify(reply)); + }, + +} \ No newline at end of file diff --git a/configHelper.js b/configHelper.js new file mode 100644 index 0000000..a993140 --- /dev/null +++ b/configHelper.js @@ -0,0 +1,97 @@ +const Ajv = require("ajv") +const confSchema = require('./configSchema.json') + +let DynamoDBClient, GetItemCommand, marshall, unmarshall + +module.exports = class configHelper { + + constructor(options){ + this.config = {} + if(options.onAws) { + DynamoDBClient = require("@aws-sdk/client-dynamodb").DynamoDBClient + GetItemCommand = require("@aws-sdk/client-dynamodb").GetItemCommand + marshall = require("@aws-sdk/util-dynamodb").marshall + unmarshall = require("@aws-sdk/util-dynamodb").unmarshall + this. dynamoClient = new DynamoDBClient({ region: options.awsRegion }) + + this.awsTable = options.awsTable + this.awsServiceName = options.awsServiceName + this.fetchConfig = this.fetchConfigDynamo + this.refreshAccessRights = this.refreshAccessRightsDynamo + } else { + this.localfile = options.localfile + this.fetchConfig = this.fetchConfigFile + this.refreshAccessRights = this.refreshAccessRightsFile + } + const ajv = new Ajv({ + allowUnionTypes: true + }) + this.configValidator = ajv.compile(confSchema) + } + + isValidConfig(conf){ + if(this.configValidator(conf)) return(true) + console.error('Invalid configuration: ', this.configValidator.errors) + return(false) + } + + async dynamoGet(subKey = null){ + let params = new GetItemCommand({ + TableName: this.awsTable, + Key: { + serviceName: marshall(this.awsServiceName), + }, + }) + + let result = await this.dynamoClient.send(params) + let obj = unmarshall(result.Item) + if((typeof(subKey)=='string') && (subKey in obj)) return(obj[subKey]) + return(obj) + } + + async fetchConfigDynamo(){ + let curConfig = this.config + this.config = await this.dynamoGet() + + if(process.env.UNSECURE) this.config.server.unsecure = Boolean(process.env.UNSECURE); + if(process.env.REDIS_HOST) this.config.redis.host = process.env.REDIS_HOST; + if(process.env.REDIS_TLS) this.config.redis.tls = Boolean(process.env.REDIS_TLS); + if(process.env.REDIS_PORT) this.config.redis.port = Number(process.env.REDIS_PORT); + if(process.env.REDIS_USER) this.config.redis.user = process.env.REDIS_USER; + if(process.env.REDIS_PASSWORD) this.config.redis.pass = process.env.REDIS_PASSWORD; + + if(this.isValidConfig(this.config)) return(this.config) + console.error(this.isValidConfig.errors) + //revert if invalid conf + this.config = curConfig + } + + async fetchConfigFile(){ + let curConfig = this.config + this.config = await require(this.localfile) + if(this.isValidConfig(this.config)) return(this.config) + console.error(this.isValidConfig.errors) + //revert if invalid conf + this.config = curConfig + } + + async refreshAccessRightsDynamo(){ + let ar = await this.dynamoGet('accessRights') + this.config.accessRights = ar + } + + async refreshAccessRightsFile(){ + delete require.cache[require.resolve(this.localfile)] + let tmp + try { tmp = require(this.localfile) } + catch(err) { + console.error('Error Reloading config !! (bad json?) => Keeping current accessRights !') + return + } + if(!tmp.accessRights) { + console.error('Error Reloading config !! (no accessRights !) => Keeping current accessRights !') + return + } + this.config.accessRights = tmp.accessRights + } +} diff --git a/configSchema.json b/configSchema.json new file mode 100644 index 0000000..c8d866e --- /dev/null +++ b/configSchema.json @@ -0,0 +1,159 @@ +{ + "$id": "https://nicsys.eu/wssGW-config.schema.json", + "title": "wssGW Configuration", + "description": "wssGateway is one of the components od Myeic's bus.", + "type": "object", + "properties": { + "serviceName": { + "type": "string" + }, + "accessRights": { + "type": "array", + "items": { + "type": "object", + "properties": { + "roles": { + "type": [ + "string", + "array" + ], + "items": { + "type": "string" + } + }, + "canPublish": { + "type": "array", + "items": { + "type": "string" + } + }, + "canSubscribe": { + "type": "array", + "items": { + "type": "string" + } + }, + "mustSubscribe": { + "type": "array", + "items": { + "type": "string" + } + }, + "canGet": { + "type": "array", + "items": { + "type": "string" + } + }, + "canSet": { + "type": "array", + "items": { + "type": "string" + } + }, + "canDo": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "required": [ + "roles", + "canPublish", + "canSubscribe", + "mustSubscribe" + ] + } + }, + "redis": { + "type": "object", + "properties": { + "host": { "type": "string" }, + "tls": { "type": "boolean" }, + "port": { "type": "integer" }, + "user": { "type": "string" }, + "pass": { "type": "string" }, + "authTokenPrefix": { "type": "string" }, + "basePrefix": { "type": "string" }, + "challengePrefix": { "type": "string" }, + "historizeMax": { "type": "integer" }, + "historizePrefix": { "type": "string" }, + "platformStateKey": { "type": "string" }, + "storeMaxSize": { "type": "integer" }, + "storePrefix": { "type": "string" }, + "historizeChannels": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "required": [ + "authTokenPrefix", + "basePrefix", + "challengePrefix", + "historizeChannels", + "historizeMax", + "historizePrefix", + "platformStateKey", + "storeMaxSize", + "storePrefix" + ] + }, + "server": { + "type": "object", + "properties": { + "certFile": { "type": "string" }, + "certKeyFile": { "type": "string" }, + "listenHost": { "type": "string" }, + "listenPort": { "type": ["integer","null"] }, + "listenPath": { "type": "string" }, + "unsecure": { "type": "boolean" }, + "challengeExpiration": { "type": "integer" }, + "healthCheckPath": { "type": "string" }, + "keepAliveInterval": { "type": "string" }, + "keepAliveTimeout": { "type": "string" }, + "systemChannels": { + "type": "object", + "properties": { + "onlineUsers": { + "type": "string" + } + }, + "required": [ + "onlineUsers" + ] + } + }, + "required": [ + "challengeExpiration", + "healthCheckPath", + "keepAliveInterval", + "keepAliveTimeout", + "listenHost", + "listenPath", + "listenPort", + "systemChannels", + "unsecure" + ], + "if": { + "properties": { + "unsecure": { "const": "false" } + } + }, + "then": { + "required": [ + "certFile", + "certKeyFile" + ] + } + } + }, + "required": [ + "serviceName", + "accessRights", + "redis", + "server" + ] +} \ No newline at end of file diff --git a/package.json b/package.json new file mode 100644 index 0000000..4dd59c8 --- /dev/null +++ b/package.json @@ -0,0 +1,27 @@ +{ + "name": "msgbus", + "version": "3.4.6", + "description": "Websocket-Redis Message Bus Gateway", + "main": "wssGateway.js", + "scripts": { + "test": "jest --verbose" + }, + "repository": { + "type": "git", + "url": "wssGateway" + }, + "author": "Nike", + "license": "ISC", + "dependencies": { + "ajv": "^8.12.0", + "cookie-parser": "^1.4.6", + "pm2": "^5.2.0", + "redis": "^4.3.0", + "urldecode": "^1.0.1", + "ws": "^8.8.1", + "yargs": "^17.5.1" + }, + "devDependencies": { + "jest": "^29.7.0" + } +} diff --git a/redisConnexion.js b/redisConnexion.js new file mode 100644 index 0000000..8dafbe5 --- /dev/null +++ b/redisConnexion.js @@ -0,0 +1,254 @@ +const redis = require('redis'); + +module.exports = class RedisConnexion { + + constructor(options) { + this.config = options.config; + this.debug = options.debug; + + this.subscriptions = {}; // Externally fed + this.wssConnections = {}; // Externally fed + this.redPillsUuids = []; // Externally fed + + this.redisClient = redis.createClient({ + socket: { + tls: this.config.redis.tls, + host: this.config.redis.host, + port: this.config.redis.port + } + }); + + this.redisSubscriber = null; + this.redisClient.on('error', (err) => { + console.error('Redis error: ', err); + }); + + if(this.debug) console.log('Redis started...') + } + + getSubscribedUuids(chan){ + let uuids = [] + let re + for(let subChan in this.subscriptions){ + if(subChan==chan){ + uuids = [...uuids, ...this.subscriptions[subChan]] + } else { + re = new RegExp('^'+subChan.replace(/\*/g,'(.+)')+'$','g') + if(chan.match(re)!=null){ + uuids = [...uuids, ...this.subscriptions[subChan]] + } + } + } + return(uuids) + } + + async redisLogin(){ + if(this.debug) console.log(`Connecting to Redis (${this.config.redis.host}:${this.config.redis.port}, tls:${this.config.redis.tls?'yes':'no'})...`); + await this.redisClient.connect(); + if(this.debug) console.log('Connected to Redis !'); + if(this.config.redis.user) { + await this.redisClient.sendCommand(['AUTH', this.config.redis.user, this.config.redis.pass]); + if(this.debug) console.log('Logged into Redis !'); + } else { + if(this.debug) console.log('Connected (anon) to Redis...'); + } + if(this.debug) { + var redisTime = await this.redisClient.time(); + console.log('Redis time:', redisTime); + } + } + + async redisChansStart(){ + this.redisSubscriber = this.redisClient.duplicate(); + await this.redisSubscriber.connect(); + if(this.config.redis.user) { + await this.redisSubscriber.sendCommand(['AUTH', this.config.redis.user, this.config.redis.pass]); + } + this.redisSubscriber.pSubscribe(this.config.redis.basePrefix + '*', this.redisReceive.bind(this)); + if(this.debug) console.log('PSubscription OK ', this.config.redis.basePrefix + '*'); + } + + async redisSubscribe(chanName, callBack){ + if(!chanName.startsWith(this.config.redis.basePrefix)) chanName = this.config.redis.basePrefix + chanName + await this.redisSubscriber.subscribe(chanName, callBack); + } + + async redisPublish(chanName, msg){ + if(typeof (msg) != 'string') msg = JSON.stringify(msg); + if(!chanName.startsWith(this.config.redis.basePrefix)) chanName = this.config.redis.basePrefix + chanName + await this.redisClient.publish(chanName, msg); + } + + async redisRefreshSession(k){ + await this.redisClient.expire(k, this.config.server.sessionExpiration); + } + + async redisSet(k, v, exp = 0, customPrefix=null){ + if(typeof(v) != 'string') v = JSON.stringify(v); + if(customPrefix!==null) k = customPrefix + k + else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k + if(this.debug) console.log('Redis SET ', k); + try { await this.redisClient.set(k, v) } + catch(err) { console.error('Redis crash doing Redis set: ', k, v) } + if(exp > 0) { + try { await this.redisClient.expire(k, exp) } + catch(err) { console.error('Redis crash doing Redis expire: ', k, exp) } + } + } + + async redisGet(k, customPrefix=null){ + if(customPrefix!==null) k = customPrefix + k + else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k + if(this.debug) console.log('Redis GET ', k) + let v=null + try { v = await this.redisClient.get(k) } + catch(err) { console.error('Redis crash doing Redis get: ', k) } + return(v); + } + + async redisDel(k, customPrefix=null){ + if(customPrefix!==null) k = customPrefix + k + else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k + if(this.debug) console.log('Deleting ', k); + await this.redisClient.del(k); + } + + + async redisGetTtl(k, customPrefix=null){ + if(customPrefix!==null) k = customPrefix + k + else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k + if(this.debug) console.log('Redis Get TTL ', k) + let v=null + try { v = await this.redisClient.ttl(k) } + catch(err) { console.error('Redis crash doing Redis ttl: ', k) } + return(v); + } + + async redisSetTtl(k, ttl, customPrefix=null){ + if(customPrefix!==null) k = customPrefix + k + else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k + if(this.debug) console.log('Redis Set TTL ', k); + try { await this.redisClient.expire(k, ttl) } + catch(err) { console.error('Redis crash doing Redis expire: ', k, ttl) } + } + + async redisXadd(streamName, kvObj, max = ''){ + if(!streamName.startsWith(this.config.redis.basePrefix)) streamName = this.config.redis.basePrefix + streamName + if(this.debug) console.log('Redis XADD ', streamName, kvObj); + let arr = ['XADD', streamName] + if(max != '') arr = [...arr, ...['MAXLEN', '~', (1*max).toString()]] + arr.push('*') + let payload = '""' + try{ payload = JSON.stringify(kvObj) } + catch(e) { console.warn('cannot historize bad json: ',kvObj) } + arr.push('streamData') + arr.push(payload) + let sid = null + try { sid = await this.redisClient.sendCommand(arr); } + catch(err) { console.error('Redis crash doing Redis command: ', arr, err) } + return(sid); + } + + async redisXrange(streamName, start = '-', end = '+', withPayload = true){ + if(!streamName.startsWith(this.config.redis.basePrefix)) streamName = this.config.redis.basePrefix + streamName + if(this.debug) console.log('Redis XRANGE ', streamName); + if(typeof(start)!='string') start = start.toString() + if(typeof(end)!='string') end = end.toString() + let arr = ['XRANGE', streamName, start, end]; + let res = [] + try { res = await this.redisClient.sendCommand(arr) } + catch(err) { console.error('Redis crash doing Redis command: ', arr, err.msg) } + + if(withPayload){ + let o = {}; + for (let row of res) { // We'll take only the first content of the stream (the value of the key 'streamdata') + let payload = '' + try{ payload = JSON.parse(row[1][1]) } + catch(e) { console.warn('cannot unhistorize bad json: ',row[1][1]) } + o[row[0]] = payload + } + return(o); + } else { + return(res.map(row => row[0])) + } + } + + isHistorizedChan(chan){ + if(!chan.startsWith(this.config.redis.basePrefix)) chan = this.config.redis.basePrefix + chan + var matches = this.config.redis.historizeChannels.filter((e) => { + if(!e.startsWith(this.config.redis.basePrefix)) e = this.config.redis.basePrefix + e + if(e.indexOf('*') > -1) { + let r = new RegExp('^'+e.replace(/\*/g,'(.+)')+'$','g') + return(chan.match(r) != null); + } else return(chan == e); + }); + return(matches.length > 0); + } + + async getProcessInfo(){ + if(this.debug) console.log('Redis NIFO '); + try { + const rawInfo = await this.redisClient.INFO() + let arr = rawInfo.replace(/\r/g,'').split('\n') + arr = arr.filter(item => (item.trim()!='') && (!item.trim().startsWith('#'))) + let infoObject = arr.reduce((acc, val) => { kv = val.split(':',2); if(kv.length>0){ acc[kv[0]] = kv[1] } return(acc) }, {}) + } + catch(err) { console.error('Redis crash doing Redis INFO: ') } + return(infoObject) + } + + async redisReceive(msg, chan){ + if(this.debug) console.log('From Redis chan:', chan, msg); + + try { + msg = JSON.parse(msg); + } catch { + console.warn(`Ignoring non-json on channel ${chan} : ${msg}`); + return; + } + + if(this.redPillsUuids.length>0) { // Any dev bus console in RedPills (promiscuous) mode ? + if(this.debug) console.log(`Will send to ${this.redPillsUuids.length} REDPILLS`); + let shortChan = chan.startsWith(this.config.redis.basePrefix) ? chan.substr(this.config.redis.basePrefix.length) : chan + let payload ={ + 'event': 'REDISMSG', + 'payload': { + 'bmsg':{ // Extra encapsulation to avoid triggering normal listeners on FE + 'msg': msg, + 'chan': shortChan, + } + } + } + for(var uuid of this.redPillsUuids) { + if(uuid in this.wssConnections) { + this.wssConnections[uuid].send(JSON.stringify(payload)); + } + } + } + + if(this.debug) console.log('will now fanout...', chan, msg); + const uuids = this.getSubscribedUuids(chan) + if(uuids.length>0) { // Anyone interested at all about this chan ? + if(this.debug) console.log(`Will broadcast to ${uuids.length} web clients`); + let shortChan = chan.startsWith(this.config.redis.basePrefix) ? chan.substr(this.config.redis.basePrefix.length) : chan + let payload ={ + 'event': 'REDISMSG', + 'payload': { + 'msg': msg, + 'chan': shortChan, + + } + } + for(var uuid of uuids) { + if(uuid in this.wssConnections) { + if(this.debug) console.log('Sending to ', uuid, payload); + this.wssConnections[uuid].send(JSON.stringify(payload)); + } + } + } + } + +} + + +// redis-cli -h redis.backend.eismea.eu --tls --user default diff --git a/startWssGw.sh b/startWssGw.sh new file mode 100755 index 0000000..1d92a25 --- /dev/null +++ b/startWssGw.sh @@ -0,0 +1,13 @@ +#!/bin/sh + +cd /opt/WssGateway/ + +pid=`ps -ef | grep wssGateway |grep -v grep | awk '{print $2}'` +if [ -z "$pid" ] +then + node wssGateway.js > wssGateway.log 2>&1 & +else + echo '' + echo 'Already running PID='"$pid"' (use stopWssGw.sh to stop it)' + echo '' +fi diff --git a/stopWssGw.sh b/stopWssGw.sh new file mode 100755 index 0000000..25c9622 --- /dev/null +++ b/stopWssGw.sh @@ -0,0 +1,11 @@ +#!/bin/sh + +pid=`ps -ef | grep wssGateway.js |grep -v grep | awk '{print $2}'` +if [ -n "$pid" ] +then + echo "killing pid: $pid" + kill -9 $pid +fi + + + diff --git a/tests/accessRights.test.js b/tests/accessRights.test.js new file mode 100644 index 0000000..666a548 --- /dev/null +++ b/tests/accessRights.test.js @@ -0,0 +1,164 @@ +const AccesRights = require('../accesRights') +let accessRights = new AccesRights({ + "accessRights": [ + { + "canDo": [ + "getUserStatus" + ], + "canGet": [ + "[UID]:userPrefs", + "chat:friends", + "collaborative:chalkboardUsers" + ], + "canPublish": [ + "chat:*_*", + "collaborative:*", + "peer2peer:*", + "rendezVous:*" + ], + "canSet": [ + "[UID]:userPrefs", + "chat:friends", + "collaborative:chalkboardUsers" + ], + "canSubscribe": [ + "chat:*_*", + "collaborative:*", + "peer2peer:*" + ], + "mustSubscribe": [ + "rendezVous:[UID]", + "system:notifs", + "system:notifs:[UID]" + ], + "roles": "*" + }, + { + "canDo": [ + "getActiveUsers", + "killSessions", + "reloadAccessRights", + "setPlatformState", + "getPlatformState" + ], + "canPublish": [ + "system:notifs", + "system:notifs:*", + "infraNotifs:httpGateway", + "dataSync:testNike", + "infraNotifs:midas", + "services:*" + ], + "canSubscribe": [ + "busconsole:redpill" + ], + "mustSubscribe": [ + "system:adminNotifs", + "system:notifs:[UID]" + ], + "roles": [ + "EIC_Admin", + "EIC_Dev" + ] + }, + { + "canPublish": [ + "system:notifs" + ], + "canSubscribe": [ + ], + "mustSubscribe": [ + ], + "roles": [ + "BP_PO", + "BP_SPOC_*", + "BP_ADMIN" + ] + }, + { + "canPublish": [ + "collaborative:shortprop_*" + ], + "canSubscribe": [ + "collaborative:shortprop_*" + ], + "mustSubscribe": [ + ], + "roles": [ + "ORG_MEMBER" + ] + }, + { + "canDo": [ + "REDPILL", + "BLUEPILL" + ], + "canPublish": [ + ], + "canSubscribe": [ + ], + "mustSubscribe": [ + ], + "roles": [ + "EIC_Dev" + ], + "uuids": [ + "steinic" + ] + } + ], + "historize":{ + "historizeChannels": [ "dataSync:organisationUpdates" ], + "historizeMax": 1000, + "historizePrefix": "histoChan:" + }, + "actions":{ + "gatewayActionsChannel": "infraNotifs:httpGateway", + "gatewayActionsReply": "system:notifs:[UID]" + }, + "redis":{ + "basePrefix": "messageBus:", + "authTokenPrefix": "authorizer:message_bus_user_" + } +}, false) + + + +test('mustSubscribe', () => { + expect(accessRights.mustSubscribe('anon1', ['ANON1'])).toEqual(["rendezVous:anon1", "system:notifs", "system:notifs:anon1"]); +}); + +test('canSubscribe', () => { + expect(accessRights.canSubscribe('anon1', ['ANON1'], 'toto')).toBe(false); + expect(accessRights.canSubscribe('anon1', ['ANON1'], 'collaborative:642')).toBe(true); +}); + +test('isMandatory', () => { + expect(accessRights.isMandatory('anon1', ['ANON1'], 'toto')).toBe(false); + expect(accessRights.isMandatory('anon1', ['ANON1'], 'system:notifs')).toBe(true); +}); + +test('canPublish', () => { + expect(accessRights.canPublish('anon1', ['ANON1'], 'toto')).toBe(false); + expect(accessRights.canPublish('', ['ANON1'], 'collaborative:642')).toBe(true); + expect(accessRights.canPublish('', ['ANON1'], 'collaborative:642')).toBe(true); + expect(accessRights.canPublish('', ['ANON1'], 'collaborative:642')).toBe(true); +}); + +test('canDo', () => { + expect(accessRights.canDo(['TEST1', 'TEST2'], 'getActiveUsers')).toBe(false); + expect(accessRights.canDo(['EIC_Dev', 'TEST2'], 'killSessions')).toBe(true); + expect(accessRights.canDo(['EIC_Dev', 'TEST2'], 'crashplatform')).toBe(false); + expect(accessRights.canDo(['EIC_Dev'], 'reloadAccessRights')).toBe(true); + //expect(accessRights.canDo(['marklogic', 'smed'], 'TIME')).toBe(true); + + expect(accessRights.canDo(['PROJECT_Hack'], 'REDPILL')).toBe(false); + expect(accessRights.canDo(['PROJECT_Hack'], 'REDPILL', 'steinic')).toBe(false); + expect(accessRights.canDo(['EIC_Dev'], 'REDPILL')).toBe(false); + expect(accessRights.canDo(['EIC_Dev'], 'REDPILL', 'pirate')).toBe(false); + expect(accessRights.canDo(['EIC_Dev'], 'REDPILL', 'steinic')).toBe(true); + expect(accessRights.canDo(['EIC_Dev'], 'BLUEPILL', 'pirate')).toBe(false); + expect(accessRights.canDo(['EIC_Dev'], 'BLUEPILL', 'steinic')).toBe(true); +}); + + diff --git a/wssConnexion.js b/wssConnexion.js new file mode 100644 index 0000000..2cc1836 --- /dev/null +++ b/wssConnexion.js @@ -0,0 +1,290 @@ +const crypto = require('crypto') +const gatewayActions = require('./actions') + +module.exports = class WssConnexion { + + constructor(options){ + Object.assign(this, gatewayActions.methods) + + this.config = options.config; + this.socket = options.socket; + this.req = options.req; + this.uuid = options.uuid; + this.wssSrv = options.wssSrv; + this.debug = options.debug; + this.rediscnx = options.rediscnx; + this.roles = [] + this.accessRights = options.accessRights; + this.userId = ''; + this.sessionID = null // null until login + + this.subscriptions = []; + this.usersWatched = []; + + this.socket.on('close', this.onClose.bind(this)); + this.socket.on('message', this.receive.bind(this)); + + if(this.debug) console.log(`Spawned new WSS connection to client: ${this.req.socket.remoteAddress}:${this.req.socket.remotePort}`) + + } + + + doLogin(){ + this.cnxState = 'LOGIN' // then CONNECTED + this.challenge = crypto.randomUUID() + this.challengeTimeout = setTimeout(() => { + if(this.debug) console.warn(`Timeout waiting for login response for UUID ${this.uuid}, closing connection !`); + this.close() + }, this.config.server.challengeExpiration*1000) + this.send(JSON.stringify({ + 'action': 'LOGIN', + 'challenge': this.challenge + })); + if(this.debug) console.log(`Sent LOGIN for UUID ${this.uuid} ==> challenge=${this.challenge}`) + return(new Promise((resolve, reject) => { + this.resolveLogin = resolve + })) + } + + welcome(){ + clearTimeout(this.challengeTimeout) + this.challengeTimeout = null + this.cnxState = 'CONNECTED' + if(this.debug) console.log(`Welcome to UUID ${this.uuid}`) + this.resolveLogin() + } + + async checkLogin(userInfo, otp){ + if(!this.config.server.devotpToken){ + let rawPayload = await this.rediscnx.redisGet(userInfo, this.config.redis.authTokenPrefix) + let payload = JSON.parse(rawPayload) + if(this.debug) console.log(`Got a token from Redis for ${userInfo} => ${JSON.stringify(payload)}`) + if((!payload) || (!payload.token) || (!payload.roles)) return(false) // Redis/sessions issues : don't crash the daemon ! + this.token = payload.token + this.roles = payload.roles + this.sessionID = payload.sessionID || '!!no sessionID in the token key in Redis!!' + } else { + this.token = this.config.server.devotpToken + this.roles = ['EIC_Dev'] + this.sessionID = this.config.server.devotpToken + } + + let data = new TextEncoder().encode(this.token+this.challenge) + let bytesBuf = await crypto.subtle.digest("SHA-512", data) + let arrayBuf = Array.from(new Uint8Array(bytesBuf)) + let goodOTP = arrayBuf.map((b) => b.toString(16).padStart(2, "0")).join("") + if(this.debug) console.log(`Checking challenge-response (token=${this.token}): ${otp} ?? ${goodOTP}`) + return(otp == goodOTP) + } + + startKeepAlive() { + if(this.config.server.keepAliveInterval>0) { + if(this.config.server.keepAliveInterval >= (1.5*this.config.server.keepAliveTimeout)) { + this.keepAliveNextTimeout = setTimeout(this.keepAlive.bind(this),this.config.server.keepAliveInterval*1000); + } else { + console.warn('keepAliveTimeout should be at least 1.5 x keepAliveInterval ! Ignoring Keepalive!'); + } + } + } + + async receive(data) { //receive from Websocket + try{ + var pdata = JSON.parse(data); + var action = pdata.action; + var payload = ('payload' in pdata) ? pdata.payload : null; + var reqid = ('reqid' in pdata) ? pdata.reqid.substr(0,50) : null; + } catch (err){ + if(this.debug) console.warn(`Malformed json for uuid ${this.uuid}\n${data}`); + return; + } + + if(this.cnxState == 'LOGIN'){ + if((action=='LOGIN') && pdata.userInfo && pdata.otp) { + if(this.debug) console.log(`received login response : user=${pdata.userInfo} otp=${pdata.otp}`) + if(await this.checkLogin(pdata.userInfo, pdata.otp)) { + this.userId = pdata.userInfo + this.welcome() + } else { + if(this.debug) console.warn(`Bad OTP response to login request for uuid ${this.uuid}`); + this.send(JSON.stringify({ + 'action': 'LOGIN', + 'logged': false + })); + this.close() + } + } else { + if(this.debug) console.warn(`Invalid response to login request for uuid ${this.uuid}`,pdata); + } + } else { + if(typeof this['action_'+action] == "function") { + if((this.debug) && (action != 'PONG')) console.warn(`${action} for uuid ${this.uuid}`); + this['action_'+action](action, payload, reqid); + } else { + if(this.debug) console.warn(`Unknown action ${action} for UUID ${this.uuid}`); + } + } + } + + send(data) { // Send to Websocket + this.socket.send(data); + } + + + keepAlive(){ + this.send(JSON.stringify({ + 'action': 'PING', + })); + this.keepAliveBomb = setTimeout(this.MissedKeepAlive.bind(this), (this.config.server.keepAliveTimeout+1)*1000); + } + + MissedKeepAlive(){ + if(this.debug) console.warn(`Keep-alive timeout for UUID ${this.uuid}, closing connection !`); + this.socket.close(); + } + + close() { + if(this.socket) this.socket.close() + } + + onClose() { + if(this.debug) console.log(`UUID ${this.uuid} disconnected !`); + clearTimeout(this.keepAliveNextTimeout); + this.clearAllSubscriptions(); + this.wssSrv.cleanupConnexion(this.uuid, this.userId); + // Suicide and leave my body to GC (Carefull if you added other references to me from outside !!) + // Also think about all possibly active bind(this), which -I guess- also make references preventing GC. + } + + async getAwaitingNotifs(){ + //TODO : AWAIT this from either Redis and/or ML + + // Key: notif destination module, value: either KV with V=nb of notifs, or Array whose length is nb of notifs + let notifs = { // TEST EXAMPLE + "unreadChats": { + "chan001" : 2, + "chan002" : 10, + "chan003" : 7, + }, + "OTS": [ "fallimi", "infosca" ], + "OtherNotifDest" : [] + }; + + return(notifs); + } + + subscribeMandatoryChans(){ + let mandaChans = this.accessRights.mustSubscribe(this.userId, this.roles) + mandaChans.push('userchans:'+this.userId); // Add user private chan + + mandaChans = mandaChans.map(item=>this.config.redis.basePrefix+item) + + for(var chan of mandaChans){ + if(!(chan in this.rediscnx.subscriptions)) this.rediscnx.subscriptions[chan] = []; + if(this.subscriptions.indexOf(chan)<0) { + this.subscriptions.push(chan); + this.rediscnx.subscriptions[chan].push(this.uuid); + } + } + this.action_SUBLST('SUBLST', null, ''); + } + + clearAllSubscriptions(){ + for(var chan of this.subscriptions){ + if(chan in this.rediscnx.subscriptions) { + this.rediscnx.subscriptions[chan].splice(this.rediscnx.subscriptions[chan].indexOf(this.uuid), 1) ; + } + } + this.subscriptions = []; + } + + sendErr(action, msg, reqid){ + this.send(JSON.stringify({ + 'action': action, + 'success': false, + 'err': msg, + 'reqid': reqid, + })); + } + + updateOnlineUsers(onlineUsers){ //Helper for our parent, triggered from above on new/lost connexion + onlineUsers = Object.keys(onlineUsers); + let reply = { + 'action': 'ISONLINE', + 'payload': onlineUsers.filter((x) => (this.usersWatched.indexOf(x)>-1)), + 'success': true, + }; + this.send(JSON.stringify(reply)); + } +} + +/* PROTOCOL + Application-level payloads are always JSON and always either an action, or an event : + + 1. ACTIONS : are made for request-reply. + They are aimed at the dialogue between the FE (mainly messageBus core modules) and WSSGateway. + These messages are identified by the fact there is an "action" key, top level. + + Example : The FE asks WSSGateway to subscribe to Redis chans : + Request: + { "action" : "SUB", // Must be a valid wssGateway action. + "payload": ["chan1", "chan2"], // Any type required by the action + "reqid": "987654321-abcdef-123456" + } + + Reply: + { "action" : "SUB", + "payload": ["chan1", "chan9"], // probably you were already subscribed to chan9, + "reqid": "987654321-abcdef-123456" // don't have to right to chan2, but succeeded subscribing to chan1 + } + + Newton principle applied to WSSG: + When there is an action in one direction (request), + there is the same action in the opposite direction (reply). + + When doing a request, the FE can optionally include a "reqid", with a uuid. + It then has the guarnatee that the corresponding reply will contain the same reqid. + As you can receive a reply on a particular action in any number, at any time, + this allows the FE to match one specific action request with its specific reply. + This, in turn, allows this module to provide action-promise and action-timeouts. + + 2. EVENTS : are any other events circulating on the bus, thus on a REDIS channel. + They are triggered by another actor on the bus, and have nothing to do with FE-WssGW dialog . + These messages are identified by the fact there is an "event" key, top level. + So far, this core-module has no use of bus-events, they are considered as applicative-level-use only. + Therefore, this module just triggers a corresponding (javascript) event, for any potential listener in the app. + + Example : + { "event" : "PropaSubmitted", // Any applicative thing + "payload": { // Any type depending on applicative convention for this event + "propaNumber": "123456", + "propaAcronym": "Tintin" + } + } + + Will trigger a "MessageBus.PropaSubmitted" javascript event, with + "detail": + { msg: { + event: "PropaSubmitted", + payload: { "propaNumber": "123456", + "propaAcronym": "Tintin" + } + }, + chan: "wssGateway:chan1:subchan2", + }​​​ + + --------------- Low-level, WEBSOCKET --------------- + { "event":"REDISMSG", // low level + "payload":{ // low level + "msg":{ // low level + "eventType":"PropaSubmitted", // APP LEVEL MESSAGE = Redis payload + "payload":{ // APP LEVEL MESSAGE = Redis payload + "propaNumber": "123456", // APP LEVEL MESSAGE = Redis payload + "propaAcronym": "Tintin" // APP LEVEL MESSAGE = Redis payload + }, // APP LEVEL MESSAGE = Redis payload + sender: "N007xyz" // APP LEVEL MESSAGE = Redis payload => added by gateways ! + }, // low level + "chan":"wssGateway:chan1:subchan2" // low level = Redis channel + }, + } + +*/ diff --git a/wssGateway.js b/wssGateway.js new file mode 100644 index 0000000..d09b6e6 --- /dev/null +++ b/wssGateway.js @@ -0,0 +1,108 @@ +const yargs = require('yargs') +const ws = require('ws') +const fs = require('fs') +const RedisConnexion = require('./redisConnexion') +const urlparser = require('url'); +const wssServer = require('./wssServer') +const configHelper = require('./configHelper') + + +const argv = yargs.command('wssGateway', 'Redis <=> Websocket message bus gateway', {}) + .options({ + 'argv.debug': { + description: 'shows debug info', + alias: 'd', + type: 'boolean' + }, + }).help().version('1.1').argv + +const debug = Boolean(process.env.DEBUG) || argv.debug + +const cfgh = new configHelper({ + localfile: './wssGatewayConfig.json', + onAws: typeof(process.env.AWS_EXECUTION_ENV)=='string', + awsRegion: 'eu-west-1', + awsTable: 'bus-config', + awsServiceName: 'wssGateway', +}) + +async function startRedis(wssGatewayConfig) { + let REDIScnx = new RedisConnexion({ + debug: debug, + config: wssGatewayConfig, + }); + if(debug) console.log('Starting REDIS...'); + await REDIScnx.redisLogin(); + if(debug) console.log('REDIS Login OK'); + await REDIScnx.redisChansStart(); + if(debug) console.log('REDIS ChansStart OK'); + return (REDIScnx); +} + + +cfgh.fetchConfig().then( wssGatewayConfig => { + + if((!wssGatewayConfig) || (Object.keys(wssGatewayConfig).length<4)) { + console.error('Cannot get a valid configuration ! Aaarrghhh...') + process.exit() + } + let httpLib + if(wssGatewayConfig.server.unsecure) httpLib = require('http') + else httpLib = require('https') + + + /////////////////////// Create & Start servers \\\\\\\\\\\\\\\\\\\\\\ + console.log(`Debug mode : ${debug ? 'ON' : 'OFF'}`) + let options + if(!wssGatewayConfig.server.unsecure) { + options = { + key: fs.readFileSync(wssGatewayConfig.server.certKeyFile), + cert: fs.readFileSync(wssGatewayConfig.server.certFile), + }; + } else options = {} + + const httpRequestsHandler = function(request, res) { + let parsedUrl + try{ + parsedUrl = new urlparser.URL(request.url, `http${wssGatewayConfig.server.unsecure ? '': 's'}://${request.headers.host}`); + } catch(e) { + res.end() + return + } + if(parsedUrl.pathname === wssGatewayConfig.server.healthCheckPath) { + //if(debug) console.log('Got a Health-Check Request') + res.end(JSON.stringify({ + status: 'Healthy!', + })) + } + } + + const HTTPserver = httpLib.createServer(options, httpRequestsHandler) + .listen(Number(wssGatewayConfig.server.listenPort), + wssGatewayConfig.server.listenHost ? wssGatewayConfig.server.listenHost : undefined, + function (req, res) { + console.log(`HTTP${wssGatewayConfig.server.unsecure ? '': 'S'} now listening on ${wssGatewayConfig.server.listenHost}:${wssGatewayConfig.server.listenPort}\n`+ + `Websocket served at ${wssGatewayConfig.server.listenPath}\n`+ + `Healthcheck served at ${wssGatewayConfig.server.healthCheckPath}`) + }); + + // Start serving WSS + const wssServerOptions = { + server: HTTPserver, + path: wssGatewayConfig.server.listenPath, + } + if(!wssGatewayConfig.server.unsecure) { + wssServerOptions['key'] = fs.readFileSync(wssGatewayConfig.server.certKeyFile) + wssServerOptions['cert'] = fs.readFileSync(wssGatewayConfig.server.certFile) + } + const WSSServer = new ws.WebSocketServer(wssServerOptions); + console.log(`WS${wssGatewayConfig.server.unsecure ? '': 'S'} server created for ${wssGatewayConfig.server.listenHost}:${wssGatewayConfig.server.listenPort}`) + + startRedis(wssGatewayConfig).then((rediscnx) => { + if(debug) console.log('Redis started & logged in !'); + const wssSrv = new wssServer(cfgh, WSSServer, rediscnx, debug); + }); + + +}) + diff --git a/wssServer.js b/wssServer.js new file mode 100644 index 0000000..cc04af9 --- /dev/null +++ b/wssServer.js @@ -0,0 +1,110 @@ +const AccesRights = require('./accesRights') +const crypto = require('crypto') +const WssConnexion = require('./wssConnexion') + +module.exports = class wssServer { + + constructor(configHelper, WSSServer, REDIScnx, debug) { + this.debug = debug + if(this.debug) console.log('Starting WSSGateway...') + this.REDIScnx = REDIScnx + this.configHelper = configHelper + this.wssGatewayConfig = configHelper.config + this.AllWssConnections = {} + this.Users2uuids = {} + this.OnlineUsers = new Set() + this.accessRights = new AccesRights(configHelper.config, this.debug) + + WSSServer.on('error', (err) => { + console.warn('wssGateway websocket error:', error) + }); + + WSSServer.on('listening', (e) => { + console.log('wssGateway listening for websockets on ' + configHelper.config.server.listenPort) + }); + + WSSServer.on('connection', this.newWSSConnexion.bind(this)) + + if(this.debug) console.log('WSS Gateway ready...') + } + + newWSSConnexion(socket, req) { + var uuid = crypto.randomUUID(); + var wssCnx = new WssConnexion({ + socket: socket, + req, req, + uuid: uuid, + wssSrv: this, + debug: this.debug, + config: this.wssGatewayConfig, + rediscnx: this.REDIScnx, + accessRights: this.accessRights, + }); + this.AllWssConnections[uuid] = wssCnx; + wssCnx.doLogin().then(() => { // Things to execute only when successfuly logged-in + if(!(wssCnx.userId in this.Users2uuids)) this.Users2uuids[wssCnx.userId] = new Set(); + this.Users2uuids[wssCnx.userId].add(uuid); + this.OnlineUsers.add(wssCnx.userId); + this.REDIScnx.wssConnections[uuid] = wssCnx; + //}).then(() => { + wssCnx.send(JSON.stringify({ + 'action': 'LOGIN', + 'logged': true + })) + this.postLoginActions(wssCnx) + }) + } + + postLoginActions(wssCnx) { + wssCnx.startKeepAlive() + wssCnx.subscribeMandatoryChans() + wssCnx.action_SUBLST('SUBLST', null, '') + this.fanoutOnlineUsers(this.getOnlineUsers()); + } + + cleanupConnexion(uuid, userId) { + delete(this.AllWssConnections[uuid]); + if(userId in this.Users2uuids) { + this.Users2uuids[userId].delete(uuid); + if(this.Users2uuids[userId].size == 0) this.OnlineUsers.delete(userId); + } + delete(this.REDIScnx.wssConnections[uuid]); + this.fanoutOnlineUsers(this.getOnlineUsers()); + } + + fanoutOnlineUsers(onlineUsers) { + for(let uuid in this.AllWssConnections) { + // Normally should not happen as you're added only after login.(newWSSConnexion) + if(this.AllWssConnections.cnxState!='CONNECTED') continue + this.AllWssConnections[uuid].updateOnlineUsers(onlineUsers); + } + } + + userConnected(uid){ + return( this.OnlineUsers.has(uid)) + } + + getOnlineUsers() { + var OnlineUsers = {}; + for(var usr of this.OnlineUsers.values()) { + if(usr in this.Users2uuids) { + OnlineUsers[usr] = this.Users2uuids[usr].size; + } + } + return(OnlineUsers); + } + + sessionConnected(sessionID){ + if(!sessionID) return(false) // If that cnx is not finished login-in + for(let uuid in this.AllWssConnections) { + if(this.AllWssConnections[uuid].sessionID==sessionID) return(true) + } + return(false) + } + + async reloadAccessRights() { + await this.configHelper.refreshAccessRights() + this.wssGatewayConfig.accessRights = this.configHelper.config.accessRights + this.accessRights.refreshAccessRights(this.wssGatewayConfig) + } +} \ No newline at end of file