export const methods = { /* Request: { "action": "SUB", "payload" : ["chan1","chan2","unauthorized"] } Reply: (returns all active subscriptions after this SUB) { "action": "SUB", "success": true, "payload" : ["chan1","chan2","wasalreadysubscribed"] } */ action_SUB(action, payload, reqid){ if(!Array.isArray(payload)){ this.sendErr(action, 'Invalid payload', reqid); return; } // payload only accepts NON-Bas-Prefixed chans let subscribed = [] for(var chan of payload){ if((!chan) || (typeof(chan)!='string')) continue chan = chan.replace(/\[UID\]/g, this.userId) chan = chan.replace(/\[CUID\]/g, this.uuid) if(!this.accessRights.canSubscribe(this.userId, this.roles, chan, this.uuid)) { if(this.debug) console.log('SUB: No rights to this chan!', this.userId, this.roles, chan) continue } let coudSubscribe = false for(const rediscnx of this.allRediscnx){ if(!chan.startsWith(rediscnx.redisConfig.chansNamespace)) continue else coudSubscribe = true let localChan = rediscnx.redisConfig.basePrefix + chan if(!(localChan in rediscnx.subscriptions)) rediscnx.subscriptions[localChan] = []; if(!rediscnx.subscriptions[localChan].includes(this.uuid)) { rediscnx.subscriptions[localChan].push(this.uuid); } } if(coudSubscribe && (!subscribed.includes(chan))) subscribed.push(chan) if(coudSubscribe && (!this.subscriptions.includes(chan)) ){ this.subscriptions.push(chan) } } let reply = { 'action': action, 'payload': subscribed, 'success': true, }; if(reqid) reply.reqid = reqid; this.send(JSON.stringify(reply)); }, /* Request: { "action":"UNSUB", "payload" : ["chan1","notsubscribed_chan","mandatory_chan"] } reply: { "action":"UNSUB", "success": true, "payload" : ["chan1"] } */ action_UNSUB(action, payload, reqid){ if(!Array.isArray(payload)){ this.sendErr(action, 'Invalid payload', reqid); return; }; // payload only accepts NON-Bas-Prefixed chans let unSubscribed = [] for(var chan of payload){ if((!chan) || (typeof(chan)!='string')) continue chan = chan.replace(/\[UID\]/g, this.userId) chan = chan.replace(/\[CUID\]/g, this.uuid) if(this.accessRights.isMandatory(this.userId, this.roles, chan, this.uuid)) continue let couldUnsubscribe = false for(const rediscnx of this.allRediscnx){ if(!chan.startsWith(rediscnx.redisConfig.chansNamespace)) continue else couldUnsubscribe = true let localChan = rediscnx.redisConfig.basePrefix + chan if((localChan in rediscnx.subscriptions) && (rediscnx.subscriptions[chan].includes(this.uuid))) { rediscnx.subscriptions[localChan].splice(rediscnx.subscriptions[localChan].indexOf(this.uuid), 1) ; } } if(couldUnsubscribe && (!unSubscribed.includes(chan))) unSubscribed.push(chan) if(couldUnsubscribe && this.subscriptions.includes(chan)) { this.subscriptions.splice(this.subscriptions.indexOf(chan), 1); } } let reply = { 'action': action, 'payload': unSubscribed, 'success': true, }; if(reqid) reply.reqid = reqid; this.send(JSON.stringify(reply)); }, /* Request: { "action": "SUBLST", } reply: { "action": "SUBLST", "success": true, "payload" : ["chan1","chan2","mandatory_chan"] } */ action_SUBLST(action, payload, reqid){ let reply = { 'action': action, 'payload': this.subscriptions, 'success': true, }; if(reqid) reply.reqid = reqid; this.send(JSON.stringify(reply)); }, /* Request: { "action": "PUB", "payload" : { 'chan':'chan1', 'msg':'Hello folks !'} } reply: { "action": "PUB", "success": true, } */ async action_PUB(action, payload, reqid){ if((typeof(payload)!='object') || (typeof(payload.chan)!='string') || (typeof(payload.msg)!='string')){ this.sendErr(action, 'Invalid payload', reqid); if(this.debug) console.log('PUB: Invalid payload') return; }; // Chat chans are forbidden here if((payload.chan.substr(0,8) == 'userchans') || (payload.chan.substr(0,9) == 'lobbychans')){ this.sendErr(action, 'Forbidden chan', reqid); if(this.debug) console.log('PUB: Forbidden chan') return; }; if( (!this.accessRights.canPublish(this.userId, this.roles, payload.chan, this.uuid)) ) { this.sendErr(action, 'Unauthorized chan !', reqid); if(this.debug) console.log('PUB: Unauthorized chan', payload.chan, this.userId, this.roles) return } let msgO try { msgO = JSON.parse(payload.msg) } catch(err) { msgO = {'err':err} } msgO.sender = this.userId const chan = payload.chan // First find the primary redis for this chan namespace, to do historization first, and get the histId const primaryRediscnx = this.allRediscnx.find(cnx => ((chan.startsWith(cnx.redisConfig.chansNamespace)) &&(cnx.redisConfig.role=='primary')) ) if(!primaryRediscnx){ this.sendErr(action, 'No primary redis for this chan !', reqid); if(this.debug) console.log('PUB: No primary redis for this chan ', chan) return } if(primaryRediscnx.isHistorizedChan(chan)){ histId = await rediscnx.redisXadd(rediscnx.redisConfig.basePrefix+rediscnx.redisConfig.historizePrefix + chan, payload.msg, rediscnx.redisConfig.historizeMax); if( !histId) { this.sendErr(action, 'Could not historize, aborted event publish !', reqid); console.error(`Could not historize for "${chan}", aborted event publish !`) return } msgO.histId = histId } // Now publish on every Redis that covers this chan namespace try { payload.msg = JSON.stringify(msgO) } catch(err) {payload.msg = `{"err":"${err}}"` } for(const rediscnx of this.allRediscnx){ if(!chan.startsWith(rediscnx.redisConfig.chansNamespace)) continue rediscnx.redisPublish(chan, payload.msg) } let reply = { 'action': action, 'payload': null, 'success': true, }; if(reqid) reply.reqid = reqid; this.send(JSON.stringify(reply)); }, /* Request: { "action": "CHANHIST", "payload": { "chan": "aze", "from": "123456879-0", //Histid or seconds since epoch "to": "987654321-1" // Optional } } reply: { "action": "CHANHIST", "success": true, "payload" : [ "123456879-1": { payload }, "123456885-0": { payload }, "123456890-0": { payload } ] } */ async action_CHANHIST(action, payload, reqid){ if((!payload.channel) || (typeof(payload.channel)!='string') || (!payload.from) || (typeof(payload.from)!='string') || (payload.to && (typeof(payload.to)!='string'))){ this.sendErr(action, 'Invalid payload', reqid) return } if( (!payload.from.match(/^(\d{13,})-(\d+)$/)) && (!payload.from.match(/^(\d{10,})$/)) ){ this.sendErr(action, 'Invalid payload', reqid) return } if(payload.to && (!payload.to.match(/^(\d{13,})-(\d+)$/)) && (!payload.to.match(/^(\d{10,})$/)) ){ this.sendErr(action, 'Invalid payload', reqid) return } payload.channel = payload.channel.replace(/\[UID\]/g, this.userId) .replace(/\[CUID\]/g, this.uuid) if( (!this.accessRights.canSubscribe(this.userId, this.roles, payload.channel, this.uuid)) ) { this.sendErr(action, 'CHANHIST: Unauthorized channel !', reqid) return } const primaryRediscnx = this.allRediscnx.find(cnx => ((payload.channel.startsWith(cnx.redisConfig.chansNamespace)) &&(cnx.redisConfig.role=='primary')) ) if(!primaryRediscnx){ this.sendErr(action, 'No primary redis for this chan !', reqid); if(this.debug) console.log('CHANHIST: No primary redis for this chan ', payload.channel) return } if(!primaryRediscnx.isHistorizedChan(payload.channel)){ this.sendErr(action, 'CHANHIST: Not an historized channel !', reqid) return } let from = (payload.from.indexOf('-')>-1) ? payload.from : 1000*payload.from let to = '+' if(payload.to) to = (payload.to.indexOf('-')>-1) ? payload.to : 1000*payload.to let streamName = payload.channel.startsWith(primaryRediscnx.redisConfig.basePrefix) ? primaryRediscnx.redisConfig.historizePrefix+payload.channel.substr(primaryRediscnx.redisConfig.basePrefix.length) : primaryRediscnx.redisConfig.historizePrefix + payload.channel let respPayload = await primaryRediscnx.redisXrange(streamName, from, to); let reply = { 'action': action, 'payload': respPayload, 'success': true, }; if(reqid) reply.reqid = reqid; this.send(JSON.stringify(reply)); }, }