268 lines
10 KiB
JavaScript
268 lines
10 KiB
JavaScript
// Accepted formats for the CHANHIST `from` / `to` bounds:
// a history id ("<13+ digits>-<digits>") or seconds since epoch ("<10+ digits>").
const HIST_ID_RE = /^\d{13,}-\d+$/;
const EPOCH_SECONDS_RE = /^\d{10,}$/;

/**
 * Replaces the `[UID]` and `[CNXID]` placeholders of a channel name with the
 * connection's `userId` and `uuid`.
 *
 * @param {object} cnx - The connection object (`this` of the action methods).
 * @param {string} chan - Channel name as sent by the client.
 * @returns {string} The channel name with placeholders substituted.
 */
function expandChanPlaceholders(cnx, chan) {
  return chan
    .replace(/\[UID\]/g, cnx.userId)
    .replace(/\[CNXID\]/g, cnx.uuid);
}

/**
 * Sends a `success: true` reply for `action` on the connection.
 * `reqid` is echoed back only when it is truthy.
 *
 * @param {object} cnx - The connection object (`this` of the action methods).
 * @param {string} action - Action name echoed in the reply.
 * @param {*} payload - Reply payload.
 * @param {*} reqid - Optional request id supplied by the client.
 */
function sendSuccess(cnx, action, payload, reqid) {
  const reply = {
    action,
    payload,
    success: true,
  };
  if (reqid) reply.reqid = reqid;
  cnx.send(JSON.stringify(reply));
}

/**
 * Action handlers. Every method is invoked with `this` bound to a connection
 * object exposing (as used below): `userId`, `uuid`, `roles`, `debug`,
 * `subscriptions`, `allRediscnx`, `accessRights`, `send()` and `sendErr()`.
 */
export const methods = {
  /**
   * Subscribes the connection to a list of channels.
   *
   * Request:
   *   { "action": "SUB", "payload": ["chan1", "chan2", "unauthorized"] }
   * Reply:
   *   { "action": "SUB", "success": true, "payload": ["chan1", "chan2"] }
   *
   * The reply payload lists the channels of THIS request that are covered by
   * at least one Redis connection namespace (channels that were already
   * subscribed are listed too). Entries that are not non-empty strings,
   * channels refused by `accessRights.canSubscribe`, and channels matching no
   * namespace are skipped silently.
   *
   * @param {string} action - Action name, echoed in the reply.
   * @param {string[]} payload - Channel names WITHOUT the base prefix; may
   *   contain the `[UID]` / `[CNXID]` placeholders.
   * @param {*} reqid - Optional request id echoed in the reply.
   */
  action_SUB(action, payload, reqid) {
    if (!Array.isArray(payload)) {
      this.sendErr(action, 'Invalid payload', reqid);
      return;
    }

    const subscribed = [];
    for (const rawChan of payload) {
      if (!rawChan || typeof rawChan !== 'string') continue;
      const chan = expandChanPlaceholders(this, rawChan);

      if (!this.accessRights.canSubscribe(this.userId, this.roles, chan, this.uuid)) {
        if (this.debug) console.log('SUB: No rights to this chan!', this.userId, this.roles, chan);
        continue;
      }

      // Register this connection on every Redis connection whose namespace covers the channel.
      let couldSubscribe = false;
      for (const rediscnx of this.allRediscnx) {
        if (!chan.startsWith(rediscnx.redisConfig.chansNamespace)) continue;
        couldSubscribe = true;
        // Per-Redis subscription tables are keyed by the base-prefixed channel name.
        const localChan = rediscnx.redisConfig.basePrefix + chan;
        if (!(localChan in rediscnx.subscriptions)) rediscnx.subscriptions[localChan] = [];
        if (!rediscnx.subscriptions[localChan].includes(this.uuid)) {
          rediscnx.subscriptions[localChan].push(this.uuid);
        }
      }
      if (!couldSubscribe) continue;

      if (!subscribed.includes(chan)) subscribed.push(chan);
      if (!this.subscriptions.includes(chan)) this.subscriptions.push(chan);
    }

    sendSuccess(this, action, subscribed, reqid);
  },

  /**
   * Unsubscribes the connection from a list of channels.
   *
   * Request:
   *   { "action": "UNSUB", "payload": ["chan1", "mandatory_chan"] }
   * Reply:
   *   { "action": "UNSUB", "success": true, "payload": ["chan1"] }
   *
   * Channels reported as mandatory by `accessRights.isMandatory` are skipped.
   * The reply payload lists every remaining requested channel that is covered
   * by at least one Redis connection namespace, whether or not the connection
   * was actually subscribed to it.
   *
   * @param {string} action - Action name, echoed in the reply.
   * @param {string[]} payload - Channel names WITHOUT the base prefix; may
   *   contain the `[UID]` / `[CNXID]` placeholders.
   * @param {*} reqid - Optional request id echoed in the reply.
   */
  action_UNSUB(action, payload, reqid) {
    if (!Array.isArray(payload)) {
      this.sendErr(action, 'Invalid payload', reqid);
      return;
    }

    const unSubscribed = [];
    for (const rawChan of payload) {
      if (!rawChan || typeof rawChan !== 'string') continue;
      const chan = expandChanPlaceholders(this, rawChan);
      if (this.accessRights.isMandatory(this.userId, this.roles, chan, this.uuid)) continue;

      let couldUnsubscribe = false;
      for (const rediscnx of this.allRediscnx) {
        if (!chan.startsWith(rediscnx.redisConfig.chansNamespace)) continue;
        couldUnsubscribe = true;
        // The table is keyed by the base-prefixed name, so look up `localChan`
        // (looking up the bare `chan` hits a missing key and throws).
        const localChan = rediscnx.redisConfig.basePrefix + chan;
        if (!(localChan in rediscnx.subscriptions)) continue;
        const subscribers = rediscnx.subscriptions[localChan];
        const position = subscribers.indexOf(this.uuid);
        if (position !== -1) subscribers.splice(position, 1);
      }
      if (!couldUnsubscribe) continue;

      if (!unSubscribed.includes(chan)) unSubscribed.push(chan);
      const ownPosition = this.subscriptions.indexOf(chan);
      if (ownPosition !== -1) this.subscriptions.splice(ownPosition, 1);
    }

    sendSuccess(this, action, unSubscribed, reqid);
  },

  /**
   * Lists the channels the connection is currently subscribed to.
   *
   * Request:
   *   { "action": "SUBLST" }
   * Reply:
   *   { "action": "SUBLST", "success": true, "payload": ["chan1", "chan2"] }
   *
   * @param {string} action - Action name, echoed in the reply.
   * @param {*} payload - Unused.
   * @param {*} reqid - Optional request id echoed in the reply.
   */
  action_SUBLST(action, payload, reqid) {
    sendSuccess(this, action, this.subscriptions, reqid);
  },

  /**
   * Publishes a message on a channel.
   *
   * Request:
   *   { "action": "PUB", "payload": { "chan": "chan1", "msg": "{\"text\":\"Hello folks !\"}" } }
   * Reply:
   *   { "action": "PUB", "success": true, "payload": null }
   *
   * `msg` is parsed as JSON; `sender` (the connection's `userId`) and, for
   * historized channels, `histId` are added before publishing. A `msg` that is
   * not valid JSON is replaced by an `{ err }` object; a `msg` that parses to
   * a JSON primitive or `null` is rejected because it cannot carry `sender`.
   *
   * Error replies (via `sendErr`): 'Invalid payload', 'Forbidden chan',
   * 'Unauthorized chan !', 'No primary redis for this chan !',
   * 'Could not historize, aborted event publish !'.
   *
   * @param {string} action - Action name, echoed in the reply.
   * @param {{chan: string, msg: string}} payload - Target channel and JSON-encoded message.
   * @param {*} reqid - Optional request id echoed in the reply.
   * @returns {Promise<void>}
   */
  async action_PUB(action, payload, reqid) {
    // `typeof null` is 'object', so null must be excluded explicitly.
    if (!payload || typeof payload !== 'object' || typeof payload.chan !== 'string' || typeof payload.msg !== 'string') {
      this.sendErr(action, 'Invalid payload', reqid);
      if (this.debug) console.log('PUB: Invalid payload');
      return;
    }
    const chan = payload.chan;

    // Chat chans are forbidden here.
    // NOTE(review): the previous substr(0,8)/substr(0,9) comparisons could never
    // match these 9/10-character literals; confirm the intended prefixes.
    if (chan.startsWith('userchans') || chan.startsWith('lobbychans')) {
      this.sendErr(action, 'Forbidden chan', reqid);
      if (this.debug) console.log('PUB: Forbidden chan');
      return;
    }

    if (!this.accessRights.canPublish(this.userId, this.roles, chan, this.uuid)) {
      this.sendErr(action, 'Unauthorized chan !', reqid);
      if (this.debug) console.log('PUB: Unauthorized chan', chan, this.userId, this.roles);
      return;
    }

    let msgO;
    try {
      msgO = JSON.parse(payload.msg);
    } catch (err) {
      msgO = { err };
    }
    if (msgO === null || typeof msgO !== 'object') {
      // A JSON primitive cannot receive the `sender` / `histId` properties.
      this.sendErr(action, 'Invalid payload', reqid);
      if (this.debug) console.log('PUB: Invalid payload');
      return;
    }
    msgO.sender = this.userId;

    // First find the primary redis for this chan namespace, to do historization first, and get the histId
    const primaryRediscnx = this.allRediscnx.find(
      (cnx) => chan.startsWith(cnx.redisConfig.chansNamespace) && cnx.redisConfig.role === 'primary',
    );
    if (!primaryRediscnx) {
      this.sendErr(action, 'No primary redis for this chan !', reqid);
      if (this.debug) console.log('PUB: No primary redis for this chan ', chan);
      return;
    }

    if (primaryRediscnx.isHistorizedChan(chan)) {
      const { basePrefix, historizePrefix, historizeMax } = primaryRediscnx.redisConfig;
      // The message is historized as received (before `sender` / `histId` are added).
      const histId = await primaryRediscnx.redisXadd(basePrefix + historizePrefix + chan, payload.msg, historizeMax);
      if (!histId) {
        this.sendErr(action, 'Could not historize, aborted event publish !', reqid);
        console.error(`Could not historize for "${chan}", aborted event publish !`);
        return;
      }
      msgO.histId = histId;
    }

    // Now publish on every Redis that covers this chan namespace
    let outgoingMsg;
    try {
      outgoingMsg = JSON.stringify(msgO);
    } catch (err) {
      outgoingMsg = JSON.stringify({ err: String(err) });
    }
    for (const rediscnx of this.allRediscnx) {
      if (!chan.startsWith(rediscnx.redisConfig.chansNamespace)) continue;
      rediscnx.redisPublish(chan, outgoingMsg);
    }

    sendSuccess(this, action, null, reqid);
  },

  /**
   * Returns the history of a historized channel.
   *
   * Request:
   *   {
   *     "action": "CHANHIST",
   *     "payload": {
   *       "chan": "aze",              // "channel" is accepted as well
   *       "from": "123456879-0",      // history id, or seconds since epoch
   *       "to": "987654321-1"         // optional, same formats as "from"
   *     }
   *   }
   * Reply:
   *   { "action": "CHANHIST", "success": true, "payload": <result of redisXrange> }
   *
   * Bounds given in seconds are multiplied by 1000 before being passed to
   * `redisXrange`; a missing `to` becomes '+'.
   *
   * Error replies (via `sendErr`): 'Invalid payload',
   * 'CHANHIST: Unauthorized channel !', 'No primary redis for this chan !',
   * 'CHANHIST: Not an historized channel !'.
   *
   * @param {string} action - Action name, echoed in the reply.
   * @param {{channel?: string, chan?: string, from: string, to?: string}} payload
   * @param {*} reqid - Optional request id echoed in the reply.
   * @returns {Promise<void>}
   */
  async action_CHANHIST(action, payload, reqid) {
    if (!payload || typeof payload !== 'object') {
      this.sendErr(action, 'Invalid payload', reqid);
      return;
    }
    // The code historically read `channel` while the documented request uses `chan`: accept both.
    const requestedChan = payload.channel || payload.chan;
    const isHistoryBound = (value) => HIST_ID_RE.test(value) || EPOCH_SECONDS_RE.test(value);

    if (
      !requestedChan || typeof requestedChan !== 'string' ||
      !payload.from || typeof payload.from !== 'string' ||
      (payload.to && typeof payload.to !== 'string')
    ) {
      this.sendErr(action, 'Invalid payload', reqid);
      return;
    }
    if (!isHistoryBound(payload.from)) {
      this.sendErr(action, 'Invalid payload', reqid);
      return;
    }
    if (payload.to && !isHistoryBound(payload.to)) {
      this.sendErr(action, 'Invalid payload', reqid);
      return;
    }

    const channel = expandChanPlaceholders(this, requestedChan);
    // Reading history requires the same right as subscribing.
    if (!this.accessRights.canSubscribe(this.userId, this.roles, channel, this.uuid)) {
      this.sendErr(action, 'CHANHIST: Unauthorized channel !', reqid);
      return;
    }

    const primaryRediscnx = this.allRediscnx.find(
      (cnx) => channel.startsWith(cnx.redisConfig.chansNamespace) && cnx.redisConfig.role === 'primary',
    );
    if (!primaryRediscnx) {
      this.sendErr(action, 'No primary redis for this chan !', reqid);
      if (this.debug) console.log('CHANHIST: No primary redis for this chan ', channel);
      return;
    }

    if (!primaryRediscnx.isHistorizedChan(channel)) {
      this.sendErr(action, 'CHANHIST: Not an historized channel !', reqid);
      return;
    }

    // History ids are passed through; second-resolution timestamps become milliseconds.
    const from = payload.from.includes('-') ? payload.from : 1000 * payload.from;
    let to = '+';
    if (payload.to) to = payload.to.includes('-') ? payload.to : 1000 * payload.to;

    // NOTE(review): unlike action_PUB, the stream name built here carries no
    // basePrefix - presumably redisXrange adds it itself; confirm.
    const { basePrefix, historizePrefix } = primaryRediscnx.redisConfig;
    const streamName = channel.startsWith(basePrefix)
      ? historizePrefix + channel.substr(basePrefix.length)
      : historizePrefix + channel;
    const respPayload = await primaryRediscnx.redisXrange(streamName, from, to);

    sendSuccess(this, action, respPayload, reqid);
  },
};