converting to MP, just config in accessRights left
This commit is contained in:
-167
@@ -1,167 +0,0 @@
|
||||
export const methods = {
|
||||
|
||||
/* Creates (predictable) peer-to-peer chan if necessary, or check for lobby existence, then subscribe to it.
|
||||
Request:
|
||||
{
|
||||
"action": "STARTCHAT",
|
||||
"payload": "P" // or "C" => "P" = Peer-to-peer, "C" = (chat) Channel
|
||||
}
|
||||
reply:
|
||||
{
|
||||
"action": "STARTCHAT",
|
||||
"success": true,
|
||||
"payload" : "dynamically created chan"
|
||||
}
|
||||
*/
|
||||
action_STARTCHAT(action, payload, reqid){
|
||||
if(typeof(payload)!='string'){
|
||||
this.sendErr(action, 'Invalid payload', reqid);
|
||||
return;
|
||||
};
|
||||
let recipientId = payload.substring(2);
|
||||
let chan;
|
||||
if(payload[0]=='P') {
|
||||
chan = (this.userId<recipientId) ? 'userchans:'+this.userId+'|'+recipientId : 'userchans:'+recipientId+'|'+this.userId;
|
||||
} else if(payload[0]=='C') {
|
||||
chan = 'lobbies:'+recipientId;
|
||||
} else {
|
||||
this.sendErr(action, 'Bad chat destination', reqid);
|
||||
return;
|
||||
}
|
||||
// subscribe this connexion
|
||||
if(!(chan in this.rediscnx.subscriptions)) this.rediscnx.subscriptions[chan] = [];
|
||||
if(this.rediscnx.subscriptions[chan].indexOf(this.uuid)<0) {
|
||||
this.rediscnx.subscriptions[chan].push(this.uuid);
|
||||
if(this.debug) console.log('Subscribed to chat chan: ',chan);
|
||||
}
|
||||
// other connexion of this user might be on different chat, so don't subscribe them
|
||||
let reply = {
|
||||
'action': action,
|
||||
'payload': chan,
|
||||
'success': true,
|
||||
};
|
||||
if(reqid) reply.reqid = reqid;
|
||||
this.send(JSON.stringify(reply));
|
||||
},
|
||||
|
||||
/* Send human-message in chat chan.
|
||||
Request:
|
||||
{
|
||||
"action": "SENDCHAT",
|
||||
"payload": "P" // or "C" => "P" = Peer-to-peer, "C" = (chat) Channel
|
||||
}
|
||||
reply:
|
||||
{
|
||||
"action": "SENDCHAT",
|
||||
"success": true,
|
||||
"payload" : null
|
||||
}
|
||||
*/
|
||||
action_SENDCHAT(action, payload, reqid){
|
||||
//TODO: prevent unauthorized recipient !!
|
||||
let recipientId = payload.recipient.substring(2);
|
||||
let chan;
|
||||
payload.event = 'CHATMSG'
|
||||
if(payload.recipient[0]=='P') {
|
||||
chan = (this.userId<recipientId) ? 'userchans:'+this.userId+'|'+recipientId : 'userchans:'+recipientId+'|'+this.userId;
|
||||
} else if(payload.recipient[0]=='C') {
|
||||
chan = 'lobbies:'+recipientId;
|
||||
} else {
|
||||
this.sendErr(action, 'Bad chat destination', reqid);
|
||||
return;
|
||||
}
|
||||
if(this.debug) console.log('Publishing to chat chan: ',chan, payload.msg);
|
||||
|
||||
//NIKE TODO: be coherent, 'sender' is app-level, therefore should be inside payload, not outside !!!
|
||||
// (see remark in Protocol description in FE MessageBus class)
|
||||
|
||||
this.rediscnx.redisPublish(chan, {
|
||||
'sender': this.userId,
|
||||
'msg' : payload.msg
|
||||
});
|
||||
let reply = {
|
||||
'action': action,
|
||||
'payload': null,
|
||||
'success': true,
|
||||
};
|
||||
if(reqid) reply.reqid = reqid;
|
||||
this.send(JSON.stringify(reply));
|
||||
},
|
||||
|
||||
/* Send human-message in chat chan.
|
||||
Request:
|
||||
{
|
||||
"action": "ISONLINE",
|
||||
"payload": "P" // or "C" => "P" = Peer-to-peer, "C" = (chat) Channel
|
||||
}
|
||||
reply:
|
||||
{
|
||||
"action": "ISONLINE",
|
||||
"success": true,
|
||||
"payload" :
|
||||
}
|
||||
*/
|
||||
// You can only ask the satus of a list of usernames you know (and have the right to)
|
||||
action_ISONLINE(action, payload, reqid){
|
||||
//TODO: can you really ask about those users ? (but that might cost too much time, because => ML?)
|
||||
if(!Array.isArray(payload)){
|
||||
this.sendErr(action, 'Invalid payload', reqid);
|
||||
return;
|
||||
};
|
||||
let onlineUsers = Object.keys(this.wssSrv.getOnlineUsers());
|
||||
let reply = {
|
||||
'action': action,
|
||||
'payload': payload.filter((x) => (onlineUsers.indexOf(x)>-1)),
|
||||
'success': true,
|
||||
};
|
||||
if(reqid) reply.reqid = reqid;
|
||||
this.send(JSON.stringify(reply));
|
||||
},
|
||||
|
||||
// Same as ISONLINE, but subscribe to watch changes
|
||||
action_WATCHUSERS(action, payload, reqid){
|
||||
if(!Array.isArray(payload)){
|
||||
this.sendErr(action, 'Invalid payload', reqid);
|
||||
return;
|
||||
}
|
||||
//TODO: can you really ask about those users ? (but that might cost too much time, because => ML?)
|
||||
this.usersWatched = payload;
|
||||
let reply = {
|
||||
'action': action,
|
||||
'payload': null,
|
||||
'success': true,
|
||||
};
|
||||
if(reqid) reply.reqid = reqid;
|
||||
this.send(JSON.stringify(reply));
|
||||
},
|
||||
|
||||
/*
|
||||
Request:
|
||||
{
|
||||
"action": "CHANLST",
|
||||
"payload": { "filter": "ChatRoom*" }
|
||||
}
|
||||
reply:
|
||||
{
|
||||
"action": "CHANLST",
|
||||
"success": true,
|
||||
"payload" : ["ChatRoom1","ChatRoom2","ChatRoom_Experts"]
|
||||
}
|
||||
*/
|
||||
action_CHANLST(action, payload, reqid){
|
||||
//TODO : Filter based on user rights!!
|
||||
//
|
||||
let reply = {
|
||||
'action': action,
|
||||
'payload': [
|
||||
this.config.redis.basePrefix+"onlineUsers",
|
||||
this.config.redis.basePrefix+"system:chan1",
|
||||
this.config.redis.basePrefix+"proposals:updates"
|
||||
],//this.config.redis.watchChannels,
|
||||
'success': true,
|
||||
};
|
||||
if(reqid) reply.reqid = reqid;
|
||||
this.send(JSON.stringify(reply));
|
||||
},
|
||||
|
||||
}
|
||||
+1
-6
@@ -1,17 +1,12 @@
|
||||
import { methods as utilities } from './utilities.js'
|
||||
import { methods as pubSub } from './pubSub.js'
|
||||
import { methods as store } from './store.js'
|
||||
import { methods as sessions } from './sessions.js'
|
||||
import { methods as notifications } from './notifications.js'
|
||||
import { methods as chat } from './chat.js'
|
||||
|
||||
|
||||
export const gatewayActions = {
|
||||
...utilities,
|
||||
...pubSub,
|
||||
...store,
|
||||
...sessions,
|
||||
...notifications,
|
||||
...chat
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -1,13 +0,0 @@
|
||||
export const methods = {
|
||||
|
||||
async action_NOTIFS(action, payload, reqid){
|
||||
let reply = {
|
||||
'action': action,
|
||||
'payload': await this.getAwaitingNotifs(),
|
||||
'success': true,
|
||||
};
|
||||
if(reqid) reply.reqid = reqid;
|
||||
this.send(JSON.stringify(reply));
|
||||
},
|
||||
|
||||
}
|
||||
+66
-50
@@ -15,34 +15,35 @@ export const methods = {
|
||||
if(!Array.isArray(payload)){
|
||||
this.sendErr(action, 'Invalid payload', reqid);
|
||||
return;
|
||||
};
|
||||
|
||||
}
|
||||
// payload only accepts NON-Bas-Prefixed chans
|
||||
let subscribed = []
|
||||
for(var chan of payload){
|
||||
if((!chan) || (typeof(chan)!='string')) continue
|
||||
if(!this.accessRights.canSubscribe(this.userId, this.roles, chan)) {
|
||||
if(this.debug) console.log('SUB: No rights to this chan!', this.userId, this.roles, chan)
|
||||
continue
|
||||
}
|
||||
// Chat chans are forbidden here
|
||||
if((chan.substr(0,8) == 'userchans') || (chan.substr(0,9) == 'lobbychans')) continue;
|
||||
|
||||
if(!chan.startsWith(this.config.redis.basePrefix)) chan = this.config.redis.basePrefix + chan
|
||||
if(this.subscriptions.indexOf(chan)<0) {
|
||||
this.subscriptions.push(chan);
|
||||
let coudSubscribe = false
|
||||
for(const rediscnx of this.allRediscnx){
|
||||
if(!chan.startsWith(rediscnx.redisConfig.ChansFilter)) continue
|
||||
else coudSubscribe = true
|
||||
let localChan = rediscnx.redisConfig.basePrefix + localChan
|
||||
if(!(localChan in rediscnx.subscriptions)) rediscnx.subscriptions[localChan] = [];
|
||||
if(!rediscnx.subscriptions[localChan].includes(this.uuid)) {
|
||||
rediscnx.subscriptions[localChan].push(this.uuid);
|
||||
}
|
||||
}
|
||||
if(!(chan in this.rediscnx.subscriptions)) this.rediscnx.subscriptions[chan] = [];
|
||||
if(this.rediscnx.subscriptions[chan].indexOf(this.uuid)<0) {
|
||||
this.rediscnx.subscriptions[chan].push(this.uuid);
|
||||
}
|
||||
}
|
||||
|
||||
let shortChans = this.subscriptions.map(item => (
|
||||
item.startsWith(this.config.redis.basePrefix) ? item.substr(this.config.redis.basePrefix.length) : item
|
||||
))
|
||||
if(coudSubscribe && (!subscribed.includes(chan))) subscribed.push(chan)
|
||||
if(coudSubscribe && (!this.subscriptions.includes(chan)) ){ this.subscriptions.push(chan) }
|
||||
|
||||
}
|
||||
|
||||
let reply = {
|
||||
'action': action,
|
||||
'payload': shortChans,
|
||||
'payload': subscribed,
|
||||
'success': true,
|
||||
};
|
||||
if(reqid) reply.reqid = reqid;
|
||||
@@ -67,29 +68,31 @@ export const methods = {
|
||||
this.sendErr(action, 'Invalid payload', reqid);
|
||||
return;
|
||||
};
|
||||
|
||||
// payload only accepts NON-Bas-Prefixed chans
|
||||
let unSubscribed = []
|
||||
for(var chan of payload){
|
||||
if((!chan) || (typeof(chan)!='string')) continue
|
||||
if(this.accessRights.isMandatory(this.userId, this.roles, chan)) continue
|
||||
|
||||
// Chat chans are forbidden here
|
||||
if((chan.substr(0,8) == 'userchans') || (chan.substr(0,9) == 'lobbychans')) continue;
|
||||
|
||||
if(!chan.startsWith(this.config.redis.basePrefix)) chan = this.config.redis.basePrefix + chan
|
||||
if(this.subscriptions.indexOf(chan)>-1) {
|
||||
this.subscriptions.splice(this.subscriptions.indexOf(chan), 1);
|
||||
let couldUnsubscribe = false
|
||||
for(const rediscnx of this.allRediscnx){
|
||||
if(!chan.startsWith(rediscnx.redisConfig.ChansFilter)) continue
|
||||
else couldUnsubscribe = true
|
||||
let localChan = rediscnx.redisConfig.basePrefix + chan
|
||||
if((localChan in rediscnx.subscriptions) && (rediscnx.subscriptions[chan].includes(this.uuid))) {
|
||||
rediscnx.subscriptions[localChan].splice(rediscnx.subscriptions[localChan].indexOf(this.uuid), 1) ;
|
||||
}
|
||||
}
|
||||
if((chan in this.rediscnx.subscriptions) && (this.rediscnx.subscriptions[chan].indexOf(this.uuid)>-1)) {
|
||||
this.rediscnx.subscriptions[chan].splice(this.rediscnx.subscriptions[chan].indexOf(this.uuid), 1) ;
|
||||
|
||||
if(couldUnsubscribe && (!unSubscribed.includes(chan))) unSubscribed.push(chan)
|
||||
if(couldUnsubscribe && this.subscriptions.includes(chan)) {
|
||||
this.subscriptions.splice(this.subscriptions.indexOf(chan), 1);
|
||||
}
|
||||
}
|
||||
|
||||
let shortChans = this.subscriptions.map(item => (
|
||||
item.startsWith(this.config.redis.basePrefix) ? item.substr(this.config.redis.basePrefix.length) : item
|
||||
))
|
||||
let reply = {
|
||||
'action': action,
|
||||
'payload': shortChans,
|
||||
'payload': unSubscribed,
|
||||
'success': true,
|
||||
};
|
||||
if(reqid) reply.reqid = reqid;
|
||||
@@ -109,12 +112,9 @@ export const methods = {
|
||||
}
|
||||
*/
|
||||
action_SUBLST(action, payload, reqid){
|
||||
let shortChans = this.subscriptions.map(item => (
|
||||
item.startsWith(this.config.redis.basePrefix) ? item.substr(this.config.redis.basePrefix.length) : item
|
||||
))
|
||||
let reply = {
|
||||
'action': action,
|
||||
'payload': shortChans,
|
||||
'payload': this.subscriptions,
|
||||
'success': true,
|
||||
};
|
||||
if(reqid) reply.reqid = reqid;
|
||||
@@ -146,8 +146,7 @@ export const methods = {
|
||||
return;
|
||||
};
|
||||
|
||||
if( (!this.accessRights.canPublish(this.userId, this.roles, payload.chan)) &&
|
||||
(! this.rediscnx.redPillsUuids.includes(this.uuid)) ) {
|
||||
if( (!this.accessRights.canPublish(this.userId, this.roles, payload.chan)) ) {
|
||||
this.sendErr(action, 'Unauthorized chan !', reqid);
|
||||
if(this.debug) console.log('PUB: Unauthorized chan', payload.chan, this.userId, this.roles)
|
||||
return
|
||||
@@ -155,22 +154,33 @@ export const methods = {
|
||||
|
||||
let msgO
|
||||
try { msgO = JSON.parse(payload.msg) } catch(err) { msgO = {'err':err} }
|
||||
msgO.sender = this.userId
|
||||
|
||||
let histId = null
|
||||
if(this.rediscnx.isHistorizedChan(payload.chan)) { // historize first to add the histId
|
||||
let shortChan = payload.chan.startsWith(this.config.redis.basePrefix) ? payload.chan.substr(this.config.redis.basePrefix.length) : payload.chan
|
||||
histId = await this.rediscnx.redisXadd(this.config.redis.basePrefix+this.config.redis.historizePrefix + shortChan, payload.msg, this.config.redis.historizeMax);
|
||||
const chan = payload.chan
|
||||
|
||||
// First find the primary redis for this chan namespace, to do historization first, and get the histId
|
||||
const primaryRediscnx = this.allRediscnx.find(cnx => ((chan.startsWith(cnx.redisConfig.ChansFilter)) &&(cnx.redisConfig.role=='primary')) )
|
||||
if(!primaryRediscnx){
|
||||
this.sendErr(action, 'No primary redis for this chan !', reqid);
|
||||
if(this.debug) console.log('PUB: No primary redis for this chan ', chan)
|
||||
return
|
||||
}
|
||||
if(primaryRediscnx.isHistorizedChan(chan)){
|
||||
histId = await rediscnx.redisXadd(rediscnx.redisConfig.basePrefix+rediscnx.redisConfig.historizePrefix + chan, payload.msg, rediscnx.redisConfig.historizeMax);
|
||||
if( !histId) {
|
||||
this.sendErr(action, 'Could not historize, aborted event publish !', reqid);
|
||||
console.error(`Could not historize for "${shortChan}", aborted event publish !`)
|
||||
console.error(`Could not historize for "${chan}", aborted event publish !`)
|
||||
return
|
||||
}
|
||||
msgO.histId = histId
|
||||
}
|
||||
|
||||
msgO.sender = this.userId
|
||||
try { payload.msg = JSON.stringify(msgO) } catch(err) {payload.msg = `{"err":"${err}}"` }
|
||||
this.rediscnx.redisPublish(payload.chan, payload.msg)
|
||||
// Now publish on every Redis that covers this chan namespace
|
||||
try { payload.msg = JSON.stringify(msgO) } catch(err) {payload.msg = `{"err":"${err}}"` }
|
||||
for(const rediscnx of this.allRediscnx){
|
||||
if(!chan.startsWith(rediscnx.redisConfig.ChansFilter)) continue
|
||||
rediscnx.redisPublish(chan, payload.msg)
|
||||
}
|
||||
|
||||
let reply = {
|
||||
'action': action,
|
||||
@@ -216,22 +226,28 @@ export const methods = {
|
||||
return
|
||||
}
|
||||
|
||||
if( (!this.accessRights.canSubscribe(this.userId, this.roles, payload.channel)) &&
|
||||
(! this.rediscnx.redPillsUuids.includes(this.uuid)) ) {
|
||||
this.sendErr(action, 'Unauthorized channel !', reqid)
|
||||
if( (!this.accessRights.canSubscribe(this.userId, this.roles, payload.channel)) ) {
|
||||
this.sendErr(action, 'CHANHIST: Unauthorized channel !', reqid)
|
||||
return
|
||||
}
|
||||
|
||||
if(!this.rediscnx.isHistorizedChan(payload.channel)){
|
||||
this.sendErr(action, 'Not an historized channel !', reqid)
|
||||
const primaryRediscnx = this.allRediscnx.find(cnx => ((chan.startsWith(cnx.redisConfig.ChansFilter)) &&(cnx.redisConfig.role=='primary')) )
|
||||
if(!primaryRediscnx){
|
||||
this.sendErr(action, 'No primary redis for this chan !', reqid);
|
||||
if(this.debug) console.log('CHANHIST: No primary redis for this chan ', chan)
|
||||
return
|
||||
}
|
||||
|
||||
if(!primaryRediscnx.isHistorizedChan(payload.channel)){
|
||||
this.sendErr(action, 'CHANHIST: Not an historized channel !', reqid)
|
||||
return
|
||||
}
|
||||
|
||||
let from = (payload.from.indexOf('-')>-1) ? payload.from : 1000*payload.from
|
||||
let to = '+'
|
||||
if(payload.to) to = (payload.to.indexOf('-')>-1) ? payload.to : 1000*payload.to
|
||||
let streamName = payload.channel.startsWith(this.config.redis.basePrefix) ? this.config.redis.historizePrefix+payload.channel.substr(this.config.redis.basePrefix.length) : this.config.redis.historizePrefix + payload.channel
|
||||
let respPayload = await this.rediscnx.redisXrange(streamName, from, to);
|
||||
let streamName = payload.channel.startsWith(primaryRediscnx.redisConfig.basePrefix) ? primaryRediscnx.redisConfig.historizePrefix+payload.channel.substr(primaryRediscnx.redisConfig.basePrefix.length) : primaryRediscnx.redisConfig.historizePrefix + payload.channel
|
||||
let respPayload = await primaryRediscnx.redisXrange(streamName, from, to);
|
||||
|
||||
let reply = {
|
||||
'action': action,
|
||||
|
||||
@@ -1,316 +0,0 @@
|
||||
export const methods = {
|
||||
|
||||
/* Request payload : null
|
||||
Reply:
|
||||
{
|
||||
"action": "GETACTIVEUSERS",
|
||||
"payload": [
|
||||
{
|
||||
"uid": "steinic",
|
||||
"email": "Nicolas.STEIN@ext.ec.europa.eu",
|
||||
"given_name": "Nicolas",
|
||||
"family_name": "STEIN",
|
||||
"userRoles": [
|
||||
"BP_PO",
|
||||
"SP_Admin",
|
||||
"Org_Member",
|
||||
"Org_Pending",
|
||||
"EIC_Dev"
|
||||
],
|
||||
"sessionExpire": 3594,
|
||||
"busConnected": true
|
||||
}
|
||||
],
|
||||
"success": true,
|
||||
"reqid": "df58a401-4ed2-4908-a2b1-8bae155e413a"
|
||||
}
|
||||
*/
|
||||
async action_GETACTIVEUSERS(action, payload, reqid){
|
||||
if(!this.accessRights.canDo(this.roles, 'getActiveUsers')) {
|
||||
this.sendErr(action, 'Unauthorized action !', reqid);
|
||||
return
|
||||
}
|
||||
|
||||
//TODO: take from new config key instead of hardcded
|
||||
const iterOptions = {
|
||||
TYPE: 'string',
|
||||
MATCH: 'authorizer:sessid_*'
|
||||
}
|
||||
|
||||
let activeUsers = []
|
||||
for await (const key of this.rediscnx.redisClient.scanIterator(iterOptions)) {
|
||||
let sess = null
|
||||
try{ sess = JSON.parse(await this.rediscnx.redisGet(key, '')) }
|
||||
catch(err) { console.log('bad sess info')}
|
||||
if((!sess) || (!sess.isAuthenticated) || (!sess.sessionID)
|
||||
|| (!sess.userInfo) || (!sess.userInfo.userRoles) || (!sess.userInfo.euLoginId)){
|
||||
continue
|
||||
}
|
||||
|
||||
let ttl = await this.rediscnx.redisGetTtl(key, '')
|
||||
activeUsers.push({
|
||||
uid: sess.userInfo.euLoginId,
|
||||
email: sess.userInfo.email,
|
||||
given_name: sess.userInfo.given_name,
|
||||
family_name: sess.userInfo.family_name,
|
||||
userRoles: sess.userInfo.userRoles,
|
||||
sessionExpire: ttl,
|
||||
busConnected: this.wssSrv.sessionConnected(sess.sessionID),
|
||||
})
|
||||
}
|
||||
var reply = {
|
||||
'action': action,
|
||||
'payload': activeUsers,
|
||||
'success': true,
|
||||
};
|
||||
if(reqid) reply.reqid = reqid;
|
||||
this.send(JSON.stringify(reply));
|
||||
},
|
||||
/*
|
||||
* payload: {
|
||||
uids: [ 'fallimi' ],
|
||||
notRoles : ['EIC_ADMIN', 'EIC_Dev' ],
|
||||
ttl: 0
|
||||
}
|
||||
=> Both conditions must be met (here nothing gets done as fallimi is EIC_Dev)
|
||||
|
||||
Any uid, but not some roles :
|
||||
{
|
||||
uids: null,
|
||||
notRoles : ['EIC_ADMIN', 'EIC_Dev' ],
|
||||
ttl: 0
|
||||
}
|
||||
|
||||
Some uids, don't care their roles in 30 seconds :
|
||||
{
|
||||
uids: [ 'infosca', 'nz01234' ],
|
||||
notRoles : [],
|
||||
ttl: 30
|
||||
}
|
||||
|
||||
*/
|
||||
|
||||
|
||||
/* Request payload : { "uid":"steinni" }
|
||||
Reply:
|
||||
{
|
||||
"action": "GETUSERSTATUS",
|
||||
"payload":
|
||||
{
|
||||
"uid": "steinic",
|
||||
"email": "Nicolas.STEIN@ext.ec.europa.eu",
|
||||
"given_name": "Nicolas",
|
||||
"family_name": "STEIN",
|
||||
"sessionExpire": 3594,
|
||||
"busConnected": true
|
||||
},
|
||||
"success": true,
|
||||
"reqid": "df58a401-4ed2-4908-a2b1-8bae155e413a"
|
||||
}
|
||||
*/
|
||||
async action_GETUSERSTATUS(action, payload, reqid){
|
||||
if(!this.accessRights.canDo(this.roles, 'getUserStatus')) {
|
||||
this.sendErr(action, 'Unauthorized action !', reqid);
|
||||
return
|
||||
}
|
||||
|
||||
const iterOptions = {
|
||||
TYPE: 'string',
|
||||
MATCH: 'authorizer:sessid_*'
|
||||
}
|
||||
|
||||
let user = {
|
||||
uid: payload.uid,
|
||||
email: null,
|
||||
given_name: null,
|
||||
family_name: null,
|
||||
sessionExpire: null,
|
||||
busConnected: null,
|
||||
}
|
||||
|
||||
for await (const key of this.rediscnx.redisClient.scanIterator(iterOptions)) {
|
||||
let sess = null
|
||||
try{ sess = JSON.parse(await this.rediscnx.redisGet(key, '')) }
|
||||
catch(err) { console.log('bad sess info')}
|
||||
|
||||
if((!sess) || (!sess.isAuthenticated) || (!sess.sessionID)
|
||||
|| (!sess.userInfo) || (!sess.userInfo.userRoles) || (!sess.userInfo.euLoginId)
|
||||
|| (sess.userInfo.euLoginId != payload.uid)
|
||||
) {
|
||||
continue
|
||||
} else {
|
||||
let ttl = await this.rediscnx.redisGetTtl(key, '')
|
||||
user={
|
||||
uid: sess.userInfo.euLoginId,
|
||||
email: sess.userInfo.email,
|
||||
given_name: sess.userInfo.given_name,
|
||||
family_name: sess.userInfo.family_name,
|
||||
sessionExpire: ttl,
|
||||
busConnected: this.wssSrv.sessionConnected(sess.sessionID),
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
var reply = {
|
||||
'action': action,
|
||||
'payload': user,
|
||||
'success': true,
|
||||
};
|
||||
if(reqid) reply.reqid = reqid;
|
||||
this.send(JSON.stringify(reply));
|
||||
},
|
||||
|
||||
async action_KILLSESSION(action, payload, reqid){
|
||||
if(!this.accessRights.canDo(this.roles, 'killSessions')) {
|
||||
this.sendErr(action, 'Unauthorized action !', reqid);
|
||||
return
|
||||
}
|
||||
if( (!payload.notRoles) || (!Array.isArray(payload.notRoles)) || (payload.uids && (!Array.isArray(payload.uids))) ){
|
||||
this.sendErr(action, 'Bad payload !', reqid);
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
|
||||
//TODO: take from new config key instead of hardcded
|
||||
const iterOptions = {
|
||||
TYPE: 'string',
|
||||
MATCH: 'authorizer:sessid_*'
|
||||
}
|
||||
|
||||
for await (const key of this.rediscnx.redisClient.scanIterator(iterOptions)) {
|
||||
if(key.endsWith('_cookie')) continue
|
||||
let sess = null
|
||||
try{ sess = JSON.parse(await this.rediscnx.redisGet(key, '')) }
|
||||
catch(err) { console.log('bad sess info')}
|
||||
if((!sess) || (!sess.isAuthenticated)) continue
|
||||
|
||||
if(payload.uids && (payload.uids.indexOf(sess.userInfo['euLoginId'])<0)) continue
|
||||
let intersect = payload.notRoles.filter(value => sess.userInfo.userRoles.includes(value));
|
||||
if(intersect.length>0) continue
|
||||
|
||||
if((!payload.ttl) || (typeof(payload.ttl)!= number) || (payload.ttl<0) || (payload.ttl>3600)) payload.ttl=0
|
||||
let ttl = await this.rediscnx.redisSetTtl(key, payload.ttl, '')
|
||||
|
||||
}
|
||||
var reply = {
|
||||
'action': action,
|
||||
'success': true,
|
||||
};
|
||||
if(reqid) reply.reqid = reqid;
|
||||
this.send(JSON.stringify(reply));
|
||||
},
|
||||
|
||||
/* Request: (curtain down, except for devs & admins)
|
||||
{
|
||||
"action": "SETSPARCSTATE",
|
||||
"payload" : {
|
||||
blockedUids: [],
|
||||
allowedRoles : ['EIC_Admin', 'EIC_Dev'],
|
||||
},
|
||||
}
|
||||
|
||||
Request: (curtain up, for everyone)
|
||||
{
|
||||
"action": "SETSPARCSTATE",
|
||||
"payload" : {
|
||||
blockedUids: [],
|
||||
allowedRoles : '*',
|
||||
},
|
||||
}
|
||||
|
||||
Request: (curtain up, block some bad-guys)
|
||||
{
|
||||
"action": "SETSPARCSTATE",
|
||||
"payload" : {
|
||||
blockedUids: ['hacker1', 'hacker2'],
|
||||
allowedRoles : '*',
|
||||
},
|
||||
}
|
||||
|
||||
Reply:
|
||||
{
|
||||
"success": true,
|
||||
"reqid": "6az5e4r6a",
|
||||
"payload": { the accessrights }
|
||||
}
|
||||
*/
|
||||
async action_SETPLATFORMMODE(action, payload, reqid){
|
||||
if(!this.accessRights.canDo(this.roles, 'setPlatformState')) {
|
||||
this.sendErr(action, 'Unauthorized action !', reqid);
|
||||
return
|
||||
}
|
||||
if((typeof(payload)!='object') || (!Array.isArray(payload.blockedUUIDs)) ||
|
||||
( (typeof(payload.platformRestrictions)=='object') && (!Array.isArray(payload.platformRestrictions.allowedRoles)) )
|
||||
){
|
||||
this.sendErr(action, 'Invalid payload', reqid)
|
||||
return
|
||||
}
|
||||
|
||||
if(typeof(payload.platformRestrictions)=='object'){ // curtain down
|
||||
if(!payload.platformRestrictions.allowedRoles.includes('EIC_Dev')){ // anti-shoot-your-foot
|
||||
payload.platformRestrictions.allowedRoles.push('EIC_Dev')
|
||||
}
|
||||
} else { // curtain up
|
||||
//force-in an example
|
||||
payload.XX_platformRestrictions = { "allowedRoles":["EIC_Admin","EIC_Dev"],"allowedUUIDs":["valentin"] }
|
||||
}
|
||||
|
||||
|
||||
|
||||
await this.rediscnx.redisSet(this.config.redis.platformStateKey,
|
||||
payload,
|
||||
0,
|
||||
''
|
||||
)
|
||||
|
||||
var reply = {
|
||||
'action': action,
|
||||
'success': true
|
||||
};
|
||||
if(reqid) reply.reqid = reqid;
|
||||
this.send(JSON.stringify(reply));
|
||||
},
|
||||
|
||||
|
||||
/* Request:
|
||||
{
|
||||
"action": "GETSPARCMODE"
|
||||
"payload": {
|
||||
"key": "keyname"
|
||||
}
|
||||
"reqid": "6az5e4r6a"
|
||||
}
|
||||
Reply:
|
||||
{
|
||||
"action":"STORE",
|
||||
"success":true,
|
||||
"payload": {
|
||||
...the sparc mode
|
||||
}
|
||||
"reqid": reqid
|
||||
}
|
||||
*/
|
||||
async action_GETPLATFORMMODE(action, payload, reqid){
|
||||
if(!this.accessRights.canDo(this.roles, 'getPlatformState')) {
|
||||
this.sendErr(action, 'Unauthorized action !', reqid);
|
||||
return
|
||||
}
|
||||
|
||||
let rawVal = await this.rediscnx.redisGet(this.config.redis.platformStateKey, '')
|
||||
let val = null
|
||||
try { val = JSON.parse(rawVal)}
|
||||
catch(err) { console.error('Action GETSPARCMODE: Not a json !? ', rawVal) }
|
||||
|
||||
var reply = {
|
||||
'action': action,
|
||||
'payload': val,
|
||||
'success': true,
|
||||
};
|
||||
if(reqid) reply.reqid = reqid;
|
||||
this.send(JSON.stringify(reply));
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
+18
-4
@@ -35,6 +35,13 @@ export const methods = {
|
||||
return
|
||||
}
|
||||
|
||||
const primaryRediscnx = this.allRediscnx.find(cnx => ((payload.key.startsWith(cnx.redisConfig.ChansFilter)) &&(cnx.redisConfig.role=='primary')) )
|
||||
if(!primaryRediscnx){
|
||||
this.sendErr(action, 'No primary redis for this key prefix !', reqid);
|
||||
if(this.debug) console.log('ACTION_SET: No primary redis for this key ', payload.key)
|
||||
return
|
||||
}
|
||||
|
||||
if(payload.value) {
|
||||
let val = null
|
||||
try { val = JSON.stringify(payload.value)}
|
||||
@@ -42,14 +49,14 @@ export const methods = {
|
||||
this.sendErr(action, 'Cannot stringify value object !', reqid);
|
||||
return
|
||||
}
|
||||
if(val.length > this.config.redis.storeMaxSize){
|
||||
if(val.length > primaryRediscnx.redisConfig.storeMaxSize){
|
||||
this.sendErr(action, 'value too large !', reqid);
|
||||
return
|
||||
}
|
||||
let exp = ((payload.expire>0) && (payload.expire<63072000)) ? payload.expire : 63072000
|
||||
await this.rediscnx.redisSet(payload.key, val, exp, this.config.redis.storePrefix)
|
||||
await primaryRediscnx.redisSet(payload.key, val, exp, primaryRediscnx.redisConfig.storePrefix)
|
||||
} else {
|
||||
await this.rediscnx.redisDel(payload.key, this.config.redis.storePrefix)
|
||||
await primaryRediscnx.redisDel(payload.key, primaryRediscnx.redisConfig.storePrefix)
|
||||
}
|
||||
var reply = {
|
||||
'action': action,
|
||||
@@ -91,7 +98,14 @@ export const methods = {
|
||||
return
|
||||
}
|
||||
|
||||
let rawVal = await this.rediscnx.redisGet(payload.key, this.config.redis.storePrefix)
|
||||
const primaryRediscnx = this.allRediscnx.find(cnx => ((payload.key.startsWith(cnx.redisConfig.ChansFilter)) &&(cnx.redisConfig.role=='primary')) )
|
||||
if(!primaryRediscnx){
|
||||
this.sendErr(action, 'No primary redis for this key prefix !', reqid);
|
||||
if(this.debug) console.log('ACTION_GET: No primary redis for this key ', payload.key)
|
||||
return
|
||||
}
|
||||
|
||||
let rawVal = await primaryRediscnx.redisGet(payload.key, primaryRediscnx.redisConfig.storePrefix)
|
||||
let val = null
|
||||
try { val = JSON.parse(rawVal)}
|
||||
catch(err) { console.error('Action GET: Not a json !? ', rawVal) }
|
||||
|
||||
+2
-49
@@ -37,7 +37,7 @@ export const methods = {
|
||||
'action': action,
|
||||
'payload': {
|
||||
wssGatewayTime: tmstp,
|
||||
redisTime: this.rediscnx.redisClient.time()
|
||||
redisTime: this.allRediscnx.map(cnx => cnx.redisClient.time())
|
||||
},
|
||||
'success': true,
|
||||
};
|
||||
@@ -145,59 +145,12 @@ export const methods = {
|
||||
var reply = {
|
||||
'action': action,
|
||||
'success': true,
|
||||
'payload': this.rediscnx.getProcessInfo
|
||||
'payload': this.allRediscnx.map(cnx => cnx.getProcessInfo)
|
||||
};
|
||||
if(reqid) reply.reqid = reqid;
|
||||
this.send(JSON.stringify(reply));
|
||||
},
|
||||
|
||||
action_REDPILL(action, payload, reqid){
|
||||
if(!this.accessRights.canDo(this.roles, 'REDPILL', this.userId)) {
|
||||
this.sendErr(action, 'Unauthorized action', reqid);
|
||||
return;
|
||||
};
|
||||
|
||||
if(!this.rediscnx.redPillsUuids.includes(this.uuid)) {
|
||||
this.rediscnx.redPillsUuids.push(this.uuid)
|
||||
setTimeout(() => {
|
||||
if(this.rediscnx.redPillsUuids.includes(this.uuid)){ // could have been removed meanwhile & splice(-1) would remove the last !!!
|
||||
this.rediscnx.redPillsUuids.splice(this.rediscnx.redPillsUuids.indexOf(this.uuid),1)
|
||||
}
|
||||
let reply = {
|
||||
'action': 'BLUEPILL',
|
||||
'payload': {},
|
||||
'success': true,
|
||||
};
|
||||
this.send(JSON.stringify(reply));
|
||||
}, 600000) // Back to blupill after 10min
|
||||
}
|
||||
|
||||
let reply = {
|
||||
'action': action,
|
||||
'payload': {},
|
||||
'success': true,
|
||||
};
|
||||
if(reqid) reply.reqid = reqid;
|
||||
this.send(JSON.stringify(reply));
|
||||
},
|
||||
|
||||
action_BLUEPILL(action, payload, reqid){
|
||||
if(!this.accessRights.canDo(this.roles, 'BLUEPILL', this.userId)) {
|
||||
this.sendErr(action, 'Unauthorized action', reqid);
|
||||
return;
|
||||
};
|
||||
|
||||
if(this.rediscnx.redPillsUuids.includes(this.uuid)) {
|
||||
this.rediscnx.redPillsUuids.splice(this.rediscnx.redPillsUuids.indexOf(this.uuid),1)
|
||||
}
|
||||
|
||||
let reply = {
|
||||
'action': action,
|
||||
'payload': {},
|
||||
'success': true,
|
||||
};
|
||||
if(reqid) reply.reqid = reqid;
|
||||
this.send(JSON.stringify(reply));
|
||||
},
|
||||
|
||||
}
|
||||
+40
-46
@@ -66,38 +66,46 @@
|
||||
]
|
||||
}
|
||||
},
|
||||
"redis": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"host": { "type": "string" },
|
||||
"tls": { "type": "boolean" },
|
||||
"port": { "type": "integer" },
|
||||
"user": { "type": "string" },
|
||||
"pass": { "type": "string" },
|
||||
"basePrefix": { "type": "string" },
|
||||
"challengePrefix": { "type": "string" },
|
||||
"historizeMax": { "type": "integer" },
|
||||
"historizePrefix": { "type": "string" },
|
||||
"platformStateKey": { "type": "string" },
|
||||
"storeMaxSize": { "type": "integer" },
|
||||
"storePrefix": { "type": "string" },
|
||||
"historizeChannels": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
"redis":{
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"redisId": {"type": "string" },
|
||||
"role": {"type": "string", "enum": ["primary", "shard"] },
|
||||
"host": { "type": "string" },
|
||||
"tls": { "type": "boolean" },
|
||||
"port": { "type": "integer" },
|
||||
"user": { "type": "string" },
|
||||
"pass": { "type": "string" },
|
||||
"chansFilter": { "type": "string" },
|
||||
"basePrefix": { "type": "string" },
|
||||
"historizeMax": { "type": "integer" },
|
||||
"historizePrefix": { "type": "string" },
|
||||
"storeMaxSize": { "type": "integer" },
|
||||
"storePrefix": { "type": "string" },
|
||||
"historizeChannels": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"redisId",
|
||||
"chansFilter",
|
||||
"role",
|
||||
"basePrefix",
|
||||
"storeMaxSize",
|
||||
"storePrefix"
|
||||
],
|
||||
"if": {
|
||||
"properties": { "role": { "const": "primary" } }
|
||||
},
|
||||
"then": {
|
||||
"required": ["historizeChannels", "historizeMax", "historizePrefix"]
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"basePrefix",
|
||||
"challengePrefix",
|
||||
"historizeChannels",
|
||||
"historizeMax",
|
||||
"historizePrefix",
|
||||
"platformStateKey",
|
||||
"storeMaxSize",
|
||||
"storePrefix"
|
||||
]
|
||||
}
|
||||
},
|
||||
"server": {
|
||||
"type": "object",
|
||||
@@ -109,30 +117,16 @@
|
||||
"listenPath": { "type": "string" },
|
||||
"unsecure": { "type": "boolean" },
|
||||
"challengeExpiration": { "type": "integer" },
|
||||
"healthCheckPath": { "type": "string" },
|
||||
"keepAliveInterval": { "type": "string" },
|
||||
"keepAliveTimeout": { "type": "string" },
|
||||
"systemChannels": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"onlineUsers": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"onlineUsers"
|
||||
]
|
||||
}
|
||||
"keepAliveTimeout": { "type": "string" }
|
||||
},
|
||||
"required": [
|
||||
"challengeExpiration",
|
||||
"healthCheckPath",
|
||||
"keepAliveInterval",
|
||||
"keepAliveTimeout",
|
||||
"listenHost",
|
||||
"listenPath",
|
||||
"listenPort",
|
||||
"systemChannels",
|
||||
"unsecure"
|
||||
],
|
||||
"if": {
|
||||
|
||||
+54
-15
@@ -75,19 +75,59 @@ const cfgh = new configHelper({
|
||||
localfile: './wssGatewayConfig.json',
|
||||
})
|
||||
|
||||
async function startRedis(wssGatewayConfig) {
|
||||
let REDIScnx = new RedisConnexion({
|
||||
debug: debug,
|
||||
config: wssGatewayConfig,
|
||||
});
|
||||
if(debug) console.log('Starting REDIS...');
|
||||
await REDIScnx.redisLogin();
|
||||
if(debug) console.log('REDIS Login OK');
|
||||
await REDIScnx.redisChansStart();
|
||||
if(debug) console.log('REDIS ChansStart OK');
|
||||
return (REDIScnx);
|
||||
}
|
||||
async function startAllRedis(wssGatewayConfig) {
|
||||
if (debug) console.log('Starting all Redis instances...')
|
||||
|
||||
//1. instantiate all & login all
|
||||
const redisConns = wssGatewayConfig.redis.map(cfg =>
|
||||
new RedisConnexion({ debug, config: cfg, redisId:cfg.redisId })
|
||||
)
|
||||
const loginResults = await Promise.allSettled(
|
||||
redisConns.map(async cnx => {
|
||||
cnx.redisLogin()
|
||||
return cnx.redisId
|
||||
})
|
||||
)
|
||||
|
||||
2. //make sure all connected before going any further
|
||||
const failedLogin = loginResults.filter(r => r.status !== 'fulfilled')
|
||||
if (failedLogin.length > 0) {
|
||||
console.error('Redis login failures:')
|
||||
failedLogin.forEach((r, i) => {
|
||||
const id = redisConns[i].redisId
|
||||
console.error(`chansStart failed for redis:[${id}] → ${r.reason}`)
|
||||
})
|
||||
throw new Error(
|
||||
`Redis login failed for ${failedLogin.length}/${redisConns.length} instances`
|
||||
)
|
||||
}
|
||||
|
||||
if (debug) console.log('All Redis logins OK')
|
||||
|
||||
// --- Phase 2: start channels for all (since all succeeded)
|
||||
const chanResults = await Promise.allSettled(
|
||||
redisConns.map(async cnx => {
|
||||
cnx.redisChansStart()
|
||||
return cnx.redisId
|
||||
})
|
||||
)
|
||||
|
||||
const failedChans = chanResults.filter(r => r.status !== 'fulfilled')
|
||||
if (failedChans.length > 0) {
|
||||
console.error('Redis chansStart failures:')
|
||||
failedChans.forEach((r, i) => {
|
||||
const id = redisConns[i].redisId
|
||||
console.error(`chansStart failed for redis:[${id}] → ${r.reason}`)
|
||||
})
|
||||
throw new Error(
|
||||
`Redis chansStart failed for ${failedChans.length}/${redisConns.length} instances`
|
||||
)
|
||||
}
|
||||
|
||||
if (debug) console.log('All Redis chansStart OK')
|
||||
|
||||
return redisConns
|
||||
}
|
||||
|
||||
cfgh.fetchConfig().then( async wssGatewayConfig => {
|
||||
|
||||
@@ -150,9 +190,8 @@ cfgh.fetchConfig().then( async wssGatewayConfig => {
|
||||
console.log(`WS${wssGatewayConfig.server.unsecure ? '': 'S'} server created for ${wssGatewayConfig.server.listenHost}:${wssGatewayConfig.server.listenPort}`)
|
||||
})
|
||||
|
||||
startRedis(wssGatewayConfig).then((rediscnx) => {
|
||||
if(debug) console.log('Redis started & logged in !');
|
||||
const wssSrv = new wssServer(cfgh, WSSServer, rediscnx, debug);
|
||||
startRedis(wssGatewayConfig).then((allRediscnx) => {
|
||||
const wssSrv = new wssServer(cfgh, WSSServer, allRediscnx, debug);
|
||||
});
|
||||
|
||||
|
||||
|
||||
+35
-47
@@ -4,16 +4,17 @@ export class RedisConnexion {
|
||||
constructor(options) {
|
||||
this.config = options.config;
|
||||
this.debug = options.debug;
|
||||
this.redisId = options.redisId;
|
||||
this.redisConfig = this.config.redis[this.redisId]
|
||||
|
||||
this.subscriptions = {}; // Externally fed
|
||||
this.wssConnections = {}; // Externally fed
|
||||
this.redPillsUuids = []; // Externally fed
|
||||
|
||||
this.redisClient = redis.createClient({
|
||||
socket: {
|
||||
tls: this.config.redis.tls,
|
||||
host: this.config.redis.host,
|
||||
port: this.config.redis.port
|
||||
tls: this.redisConfig.tls,
|
||||
host: this.redisConfig.host,
|
||||
port: this.redisConfig.port
|
||||
}
|
||||
});
|
||||
|
||||
@@ -42,11 +43,11 @@ export class RedisConnexion {
|
||||
}
|
||||
|
||||
async redisLogin(){
|
||||
if(this.debug) console.log(`Connecting to Redis (${this.config.redis.host}:${this.config.redis.port}, tls:${this.config.redis.tls?'yes':'no'})...`);
|
||||
if(this.debug) console.log(`Connecting to Redis (${this.redisConfig.host}:${this.redisConfig.port}, tls:${this.redisConfig.tls?'yes':'no'})...`);
|
||||
await this.redisClient.connect();
|
||||
if(this.debug) console.log('Connected to Redis !');
|
||||
if(this.config.redis.user) {
|
||||
await this.redisClient.sendCommand(['AUTH', this.config.redis.user, this.config.redis.pass]);
|
||||
if(this.redisConfig.user) {
|
||||
await this.redisClient.sendCommand(['AUTH', this.redisConfig.user, this.redisConfig.pass]);
|
||||
if(this.debug) console.log('Logged into Redis !');
|
||||
} else {
|
||||
if(this.debug) console.log('Connected (anon) to Redis...');
|
||||
@@ -60,32 +61,38 @@ export class RedisConnexion {
|
||||
async redisChansStart(){
|
||||
this.redisSubscriber = this.redisClient.duplicate();
|
||||
await this.redisSubscriber.connect();
|
||||
if(this.config.redis.user) {
|
||||
await this.redisSubscriber.sendCommand(['AUTH', this.config.redis.user, this.config.redis.pass]);
|
||||
if(this.redisConfig.user) {
|
||||
await this.redisSubscriber.sendCommand(['AUTH', this.redisConfig.user, this.redisConfig.pass]);
|
||||
}
|
||||
this.redisSubscriber.pSubscribe(this.config.redis.basePrefix + '*', this.redisReceive.bind(this));
|
||||
if(this.debug) console.log('PSubscription OK ', this.config.redis.basePrefix + '*');
|
||||
const allChans = this.redisConfig.basePrefix + this.redisConfig.ChansFilter+'*'
|
||||
this.redisSubscriber.pSubscribe(allChans, this.redisReceive.bind(this));
|
||||
if(this.debug) console.log('PSubscription OK ', allChans);
|
||||
}
|
||||
|
||||
async redisSubscribe(chanName, callBack){
|
||||
if(!chanName.startsWith(this.config.redis.basePrefix)) chanName = this.config.redis.basePrefix + chanName
|
||||
if(!chanName.startsWith(this.redisConfig.basePrefix)) chanName = this.redisConfig.basePrefix + chanName
|
||||
if(!chanName.startsWith(this.redisConfig.basePrefix+this.ChansFilter)) {
|
||||
console.warn(`redisSubscribe : forbidden channel range on this redis !`)
|
||||
return
|
||||
}
|
||||
await this.redisSubscriber.subscribe(chanName, callBack);
|
||||
}
|
||||
|
||||
async redisPublish(chanName, msg){
|
||||
if(typeof (msg) != 'string') msg = JSON.stringify(msg);
|
||||
if(!chanName.startsWith(this.config.redis.basePrefix)) chanName = this.config.redis.basePrefix + chanName
|
||||
await this.redisClient.publish(chanName, msg);
|
||||
}
|
||||
if(!chanName.startsWith(this.redisConfig.basePrefix)) chanName = this.redisConfig.basePrefix + chanName
|
||||
if(!chanName.startsWith(this.redisConfig.basePrefix+this.ChansFilter)) {
|
||||
console.warn(`redisPublish : forbidden channel range on this redis !`)
|
||||
return
|
||||
}
|
||||
|
||||
async redisRefreshSession(k){
|
||||
await this.redisClient.expire(k, this.config.server.sessionExpiration);
|
||||
await this.redisClient.publish(chanName, msg);
|
||||
}
|
||||
|
||||
async redisSet(k, v, exp = 0, customPrefix=null){
|
||||
if(typeof(v) != 'string') v = JSON.stringify(v);
|
||||
if(customPrefix!==null) k = customPrefix + k
|
||||
else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k
|
||||
else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k
|
||||
if(this.debug) console.log('Redis SET ', k);
|
||||
try { await this.redisClient.set(k, v) }
|
||||
catch(err) { console.error('Redis crash doing Redis set: ', k, v) }
|
||||
@@ -97,7 +104,7 @@ export class RedisConnexion {
|
||||
|
||||
async redisGet(k, customPrefix=null){
|
||||
if(customPrefix!==null) k = customPrefix + k
|
||||
else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k
|
||||
else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k
|
||||
if(this.debug) console.log('Redis GET ', k)
|
||||
let v=null
|
||||
try { v = await this.redisClient.get(k) }
|
||||
@@ -107,7 +114,7 @@ export class RedisConnexion {
|
||||
|
||||
async redisDel(k, customPrefix=null){
|
||||
if(customPrefix!==null) k = customPrefix + k
|
||||
else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k
|
||||
else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k
|
||||
if(this.debug) console.log('Deleting ', k);
|
||||
await this.redisClient.del(k);
|
||||
}
|
||||
@@ -115,7 +122,7 @@ export class RedisConnexion {
|
||||
|
||||
async redisGetTtl(k, customPrefix=null){
|
||||
if(customPrefix!==null) k = customPrefix + k
|
||||
else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k
|
||||
else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k
|
||||
if(this.debug) console.log('Redis Get TTL ', k)
|
||||
let v=null
|
||||
try { v = await this.redisClient.ttl(k) }
|
||||
@@ -125,14 +132,14 @@ export class RedisConnexion {
|
||||
|
||||
async redisSetTtl(k, ttl, customPrefix=null){
|
||||
if(customPrefix!==null) k = customPrefix + k
|
||||
else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k
|
||||
else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k
|
||||
if(this.debug) console.log('Redis Set TTL ', k);
|
||||
try { await this.redisClient.expire(k, ttl) }
|
||||
catch(err) { console.error('Redis crash doing Redis expire: ', k, ttl) }
|
||||
}
|
||||
|
||||
async redisXadd(streamName, kvObj, max = ''){
|
||||
if(!streamName.startsWith(this.config.redis.basePrefix)) streamName = this.config.redis.basePrefix + streamName
|
||||
if(!streamName.startsWith(this.redisConfig.basePrefix)) streamName = this.redisConfig.basePrefix + streamName
|
||||
if(this.debug) console.log('Redis XADD ', streamName, kvObj);
|
||||
let arr = ['XADD', streamName]
|
||||
if(max != '') arr = [...arr, ...['MAXLEN', '~', (1*max).toString()]]
|
||||
@@ -149,7 +156,7 @@ export class RedisConnexion {
|
||||
}
|
||||
|
||||
async redisXrange(streamName, start = '-', end = '+', withPayload = true){
|
||||
if(!streamName.startsWith(this.config.redis.basePrefix)) streamName = this.config.redis.basePrefix + streamName
|
||||
if(!streamName.startsWith(this.redisConfig.basePrefix)) streamName = this.redisConfig.basePrefix + streamName
|
||||
if(this.debug) console.log('Redis XRANGE ', streamName);
|
||||
if(typeof(start)!='string') start = start.toString()
|
||||
if(typeof(end)!='string') end = end.toString()
|
||||
@@ -173,9 +180,9 @@ export class RedisConnexion {
|
||||
}
|
||||
|
||||
isHistorizedChan(chan){
|
||||
if(!chan.startsWith(this.config.redis.basePrefix)) chan = this.config.redis.basePrefix + chan
|
||||
var matches = this.config.redis.historizeChannels.filter((e) => {
|
||||
if(!e.startsWith(this.config.redis.basePrefix)) e = this.config.redis.basePrefix + e
|
||||
if(!chan.startsWith(this.redisConfig.basePrefix)) chan = this.redisConfig.basePrefix + chan
|
||||
var matches = this.redisConfig.historizeChannels.filter((e) => {
|
||||
if(!e.startsWith(this.redisConfig.basePrefix)) e = this.redisConfig.basePrefix + e
|
||||
if(e.indexOf('*') > -1) {
|
||||
let r = new RegExp('^'+e.replace(/\*/g,'(.+)')+'$','g')
|
||||
return(chan.match(r) != null);
|
||||
@@ -206,30 +213,11 @@ export class RedisConnexion {
|
||||
return;
|
||||
}
|
||||
|
||||
if(this.redPillsUuids.length>0) { // Any dev bus console in RedPills (promiscuous) mode ?
|
||||
if(this.debug) console.log(`Will send to ${this.redPillsUuids.length} REDPILLS`);
|
||||
let shortChan = chan.startsWith(this.config.redis.basePrefix) ? chan.substr(this.config.redis.basePrefix.length) : chan
|
||||
let payload ={
|
||||
'event': 'REDISMSG',
|
||||
'payload': {
|
||||
'bmsg':{ // Extra encapsulation to avoid triggering normal listeners on FE
|
||||
'msg': msg,
|
||||
'chan': shortChan,
|
||||
}
|
||||
}
|
||||
}
|
||||
for(var uuid of this.redPillsUuids) {
|
||||
if(uuid in this.wssConnections) {
|
||||
this.wssConnections[uuid].send(JSON.stringify(payload));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(this.debug) console.log('will now fanout...', chan, msg);
|
||||
const uuids = this.getSubscribedUuids(chan)
|
||||
if(uuids.length>0) { // Anyone interested at all about this chan ?
|
||||
if(this.debug) console.log(`Will broadcast to ${uuids.length} web clients`);
|
||||
let shortChan = chan.startsWith(this.config.redis.basePrefix) ? chan.substr(this.config.redis.basePrefix.length) : chan
|
||||
let shortChan = chan.startsWith(this.redisConfig.basePrefix) ? chan.substr(this.redisConfig.basePrefix.length) : chan
|
||||
let payload ={
|
||||
'event': 'REDISMSG',
|
||||
'payload': {
|
||||
|
||||
+16
-30
@@ -1,5 +1,6 @@
|
||||
import crypto from 'crypto'
|
||||
import { gatewayActions } from './actions/index.js'
|
||||
import { RedisSearchLanguages } from 'redis'
|
||||
|
||||
export class WssConnexion {
|
||||
|
||||
@@ -11,7 +12,7 @@ export class WssConnexion {
|
||||
this.uuid = options.uuid
|
||||
this.wssSrv = options.wssSrv
|
||||
this.debug = options.debug
|
||||
this.rediscnx = options.rediscnx
|
||||
this.allRediscnx = options.allRediscnx
|
||||
this.accessRights = options.accessRights
|
||||
this.userId = options.userId
|
||||
this.roles = options.roles
|
||||
@@ -95,34 +96,17 @@ export class WssConnexion {
|
||||
// Also think about all possibly active bind(this), which -I guess- also make references preventing GC.
|
||||
}
|
||||
|
||||
async getAwaitingNotifs(){
|
||||
//TODO : AWAIT this from either Redis and/or ML
|
||||
|
||||
// Key: notif destination module, value: either KV with V=nb of notifs, or Array whose length is nb of notifs
|
||||
let notifs = { // TEST EXAMPLE
|
||||
"unreadChats": {
|
||||
"chan001" : 2,
|
||||
"chan002" : 10,
|
||||
"chan003" : 7,
|
||||
},
|
||||
"OTS": [ "fallimi", "infosca" ],
|
||||
"OtherNotifDest" : []
|
||||
};
|
||||
|
||||
return(notifs);
|
||||
}
|
||||
|
||||
subscribeMandatoryChans(){
|
||||
let mandaChans = this.accessRights.mustSubscribe(this.userId, this.roles)
|
||||
mandaChans.push('userchans:'+this.userId); // Add user private chan
|
||||
|
||||
mandaChans = mandaChans.map(item=>this.config.redis.basePrefix+item)
|
||||
|
||||
for(var chan of mandaChans){
|
||||
if(!(chan in this.rediscnx.subscriptions)) this.rediscnx.subscriptions[chan] = [];
|
||||
if(this.subscriptions.indexOf(chan)<0) {
|
||||
this.subscriptions.push(chan);
|
||||
this.rediscnx.subscriptions[chan].push(this.uuid);
|
||||
for(let rediscnx of this.allRediscnx){
|
||||
mandaChans = mandaChans.filter(chan => chan.startsWith(rediscnx.redisConfig.chansFilter))
|
||||
mandaChans = mandaChans.map(item=>rediscnx.redisConfig.basePrefix+item)
|
||||
for(var chan of mandaChans){
|
||||
if(!(chan in rediscnx.subscriptions)) rediscnx.subscriptions[chan] = [];
|
||||
if(this.subscriptions.indexOf(chan)<0) {
|
||||
this.subscriptions.push(chan);
|
||||
rediscnx.subscriptions[chan].push(this.uuid);
|
||||
}
|
||||
}
|
||||
}
|
||||
this.action_SUBLST('SUBLST', null, '');
|
||||
@@ -130,11 +114,13 @@ export class WssConnexion {
|
||||
|
||||
clearAllSubscriptions(){
|
||||
for(var chan of this.subscriptions){
|
||||
if(chan in this.rediscnx.subscriptions) {
|
||||
this.rediscnx.subscriptions[chan].splice(this.rediscnx.subscriptions[chan].indexOf(this.uuid), 1) ;
|
||||
for(let rediscnx of this.allRediscnx){
|
||||
if(chan in rediscnx.subscriptions) {
|
||||
rediscnx.subscriptions[chan].splice(rediscnx.subscriptions[chan].indexOf(this.uuid), 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
this.subscriptions = [];
|
||||
this.subscriptions = []
|
||||
}
|
||||
|
||||
sendErr(action, msg, reqid){
|
||||
|
||||
+45
-34
@@ -9,48 +9,59 @@
|
||||
"XXcertFile": "/etc/letsencrypt/live/42.internike.com/fullchain.pem",
|
||||
"XXcertKeyFile": "/etc/letsencrypt/live/42.internike.com/privkey.pem",
|
||||
"challengeExpiration": 20,
|
||||
"unsecure": true,
|
||||
"healthCheckPath": "/status",
|
||||
"devotpToken": "qhsdfkjhqsgdfkqhs",
|
||||
"systemChannels": {
|
||||
"onlineUsers": "onlineUsers"
|
||||
}
|
||||
"unsecure": true
|
||||
},
|
||||
"accessRights":[
|
||||
{
|
||||
"roles": "*",
|
||||
"mustSubscribe": [ "system:notifs:[UID]", "onlineUsers", "system:notifs" ],
|
||||
"canSubscribe": ["gps:*","agents:*"],
|
||||
"mustSubscribe": [ "system:notifs:[UID]", "system:notifs" ],
|
||||
"canSubscribe": ["system:gps:*", "arena:gps:*","arena:agents:*"],
|
||||
"canPublish": [ ],
|
||||
"canSet": [ "[UID]:userPrefs" ],
|
||||
"canGet": [ "[UID]:userPrefs"]
|
||||
"canSet": [ ],
|
||||
"canGet": [ ]
|
||||
},
|
||||
{
|
||||
"roles": ["admin"],
|
||||
"mustSubscribe": ["system:adminNotifs"],
|
||||
"canSubscribe": [ "infraNotifs:*", "gps:*","agents:*"],
|
||||
"canPublish": ["gps:*", "agents:*", "system:notifs:*", "system:notifs", "infraNotifs:*"],
|
||||
"canSet": ["*:userPrefs"],
|
||||
"canGet": ["*:userPrefs"],
|
||||
"canDo": ["getActiveUsers", "killSessions","reloadAccessRights", "getAccessRights", "getPlatformState", "setPlatformState", "redPill"]
|
||||
"mustSubscribe": ["system:infraNotifs", "system:replies:[UID]"],
|
||||
"canSubscribe": [ ],
|
||||
"canPublish": [ "system:requests:*" ],
|
||||
"canSet": ["system:*"],
|
||||
"canGet": ["system:*"],
|
||||
"canDo": ["getActiveUsers", "reloadAccessRights", "getAccessRights"]
|
||||
}
|
||||
],
|
||||
"redis":{
|
||||
"host": "127.0.0.1",
|
||||
"tls":false,
|
||||
"port": 6379,
|
||||
"Xuser": "msgbus",
|
||||
"Xpass": "yj465sqfCTA0bKDw3zEYg8OqYl9Tv",
|
||||
"user": "",
|
||||
"pass": "",
|
||||
"historizeChannels": [ "userchans:*" ],
|
||||
"historizeMax": 1000,
|
||||
"authTokenPrefix": "authorizer:message_bus_user_",
|
||||
"challengePrefix": "msgBusChallenge:",
|
||||
"basePrefix": "messageBus:",
|
||||
"storePrefix": "messageBus:Store:",
|
||||
"storeMaxSize": 51200,
|
||||
"historizePrefix": "histoChan:",
|
||||
"platformStateKey": "authorizer:platformDown"
|
||||
}
|
||||
"redis":[
|
||||
{
|
||||
"redisId":"SYS_1",
|
||||
"role": "primary",
|
||||
"host": "127.0.0.1",
|
||||
"tls":false,
|
||||
"port": 6380,
|
||||
"user": "",
|
||||
"pass": "",
|
||||
"historizeChannels": [ ],
|
||||
"historizeMax": 1000,
|
||||
"ChansFilter":"system:",
|
||||
"basePrefix": "messageBus:",
|
||||
"storePrefix": "messageBus:Store:",
|
||||
"storeMaxSize": 51200,
|
||||
"historizePrefix": "histoChan:"
|
||||
},
|
||||
{
|
||||
"redisId":"ARN_1",
|
||||
"role": "primary",
|
||||
"host": "127.0.0.1",
|
||||
"tls":false,
|
||||
"port": 6379,
|
||||
"user": "",
|
||||
"pass": "",
|
||||
"historizeChannels": [ ],
|
||||
"historizeMax": 1000,
|
||||
"chansFilter":"arena:",
|
||||
"basePrefix": "messageBus:",
|
||||
"storePrefix": "messageBus:Store:",
|
||||
"storeMaxSize": 51200,
|
||||
"historizePrefix": "histoChan:"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
+6
-19
@@ -4,10 +4,10 @@ import {WssConnexion} from './wssConnexion.js'
|
||||
|
||||
export class wssServer {
|
||||
|
||||
constructor(configHelper, WSSServer, REDIScnx, debug) {
|
||||
constructor(configHelper, WSSServer, allRediscnx, debug) {
|
||||
this.debug = debug
|
||||
if(this.debug) console.log('Starting WSSGateway...')
|
||||
this.REDIScnx = REDIScnx
|
||||
this.allRediscnx = allRediscnx
|
||||
this.configHelper = configHelper
|
||||
this.wssGatewayConfig = configHelper.config
|
||||
this.AllWssConnections = {}
|
||||
@@ -38,7 +38,7 @@ export class wssServer {
|
||||
wssSrv: this,
|
||||
debug: this.debug,
|
||||
config: this.wssGatewayConfig,
|
||||
rediscnx: this.REDIScnx,
|
||||
allRediscnx: this.allRediscnx,
|
||||
accessRights: this.accessRights,
|
||||
userId: socket.session.userInfos.identity.uuid,
|
||||
roles: socket.session.userInfos.roles,
|
||||
@@ -47,23 +47,10 @@ export class wssServer {
|
||||
if(!(wssCnx.userId in this.Users2uuids)) this.Users2uuids[wssCnx.userId] = new Set();
|
||||
this.Users2uuids[wssCnx.userId].add(uuid);
|
||||
this.OnlineUsers.add(wssCnx.userId);
|
||||
this.REDIScnx.wssConnections[uuid] = wssCnx;
|
||||
this.allRediscnx.forEach(cnx => { cnx.wssConnections[uuid] = wssCnx })
|
||||
this.postLoginActions(wssCnx)
|
||||
} else socket.close()
|
||||
|
||||
|
||||
// wssCnx.doLogin().then(() => { // Things to execute only when successfuly logged-in
|
||||
// if(!(wssCnx.userId in this.Users2uuids)) this.Users2uuids[wssCnx.userId] = new Set();
|
||||
// this.Users2uuids[wssCnx.userId].add(uuid);
|
||||
// this.OnlineUsers.add(wssCnx.userId);
|
||||
// this.REDIScnx.wssConnections[uuid] = wssCnx;
|
||||
// //}).then(() => {
|
||||
// wssCnx.send(JSON.stringify({
|
||||
// 'action': 'LOGIN',
|
||||
// 'logged': true
|
||||
// }))
|
||||
// this.postLoginActions(wssCnx)
|
||||
// })
|
||||
|
||||
}
|
||||
|
||||
postLoginActions(wssCnx) {
|
||||
@@ -79,7 +66,7 @@ export class wssServer {
|
||||
this.Users2uuids[userId].delete(uuid);
|
||||
if(this.Users2uuids[userId].size == 0) this.OnlineUsers.delete(userId);
|
||||
}
|
||||
delete(this.REDIScnx.wssConnections[uuid]);
|
||||
this.allRediscnx.forEach(cnx => delete cns.wssConnections[uuid])
|
||||
this.fanoutOnlineUsers(this.getOnlineUsers());
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user