2nd
This commit is contained in:
@@ -0,0 +1,254 @@
|
||||
const redis = require('redis');
|
||||
|
||||
module.exports = class RedisConnexion {
|
||||
|
||||
constructor(options) {
|
||||
this.config = options.config;
|
||||
this.debug = options.debug;
|
||||
|
||||
this.subscriptions = {}; // Externally fed
|
||||
this.wssConnections = {}; // Externally fed
|
||||
this.redPillsUuids = []; // Externally fed
|
||||
|
||||
this.redisClient = redis.createClient({
|
||||
socket: {
|
||||
tls: this.config.redis.tls,
|
||||
host: this.config.redis.host,
|
||||
port: this.config.redis.port
|
||||
}
|
||||
});
|
||||
|
||||
this.redisSubscriber = null;
|
||||
this.redisClient.on('error', (err) => {
|
||||
console.error('Redis error: ', err);
|
||||
});
|
||||
|
||||
if(this.debug) console.log('Redis started...')
|
||||
}
|
||||
|
||||
getSubscribedUuids(chan){
|
||||
let uuids = []
|
||||
let re
|
||||
for(let subChan in this.subscriptions){
|
||||
if(subChan==chan){
|
||||
uuids = [...uuids, ...this.subscriptions[subChan]]
|
||||
} else {
|
||||
re = new RegExp('^'+subChan.replace(/\*/g,'(.+)')+'$','g')
|
||||
if(chan.match(re)!=null){
|
||||
uuids = [...uuids, ...this.subscriptions[subChan]]
|
||||
}
|
||||
}
|
||||
}
|
||||
return(uuids)
|
||||
}
|
||||
|
||||
async redisLogin(){
|
||||
if(this.debug) console.log(`Connecting to Redis (${this.config.redis.host}:${this.config.redis.port}, tls:${this.config.redis.tls?'yes':'no'})...`);
|
||||
await this.redisClient.connect();
|
||||
if(this.debug) console.log('Connected to Redis !');
|
||||
if(this.config.redis.user) {
|
||||
await this.redisClient.sendCommand(['AUTH', this.config.redis.user, this.config.redis.pass]);
|
||||
if(this.debug) console.log('Logged into Redis !');
|
||||
} else {
|
||||
if(this.debug) console.log('Connected (anon) to Redis...');
|
||||
}
|
||||
if(this.debug) {
|
||||
var redisTime = await this.redisClient.time();
|
||||
console.log('Redis time:', redisTime);
|
||||
}
|
||||
}
|
||||
|
||||
async redisChansStart(){
|
||||
this.redisSubscriber = this.redisClient.duplicate();
|
||||
await this.redisSubscriber.connect();
|
||||
if(this.config.redis.user) {
|
||||
await this.redisSubscriber.sendCommand(['AUTH', this.config.redis.user, this.config.redis.pass]);
|
||||
}
|
||||
this.redisSubscriber.pSubscribe(this.config.redis.basePrefix + '*', this.redisReceive.bind(this));
|
||||
if(this.debug) console.log('PSubscription OK ', this.config.redis.basePrefix + '*');
|
||||
}
|
||||
|
||||
async redisSubscribe(chanName, callBack){
|
||||
if(!chanName.startsWith(this.config.redis.basePrefix)) chanName = this.config.redis.basePrefix + chanName
|
||||
await this.redisSubscriber.subscribe(chanName, callBack);
|
||||
}
|
||||
|
||||
async redisPublish(chanName, msg){
|
||||
if(typeof (msg) != 'string') msg = JSON.stringify(msg);
|
||||
if(!chanName.startsWith(this.config.redis.basePrefix)) chanName = this.config.redis.basePrefix + chanName
|
||||
await this.redisClient.publish(chanName, msg);
|
||||
}
|
||||
|
||||
async redisRefreshSession(k){
|
||||
await this.redisClient.expire(k, this.config.server.sessionExpiration);
|
||||
}
|
||||
|
||||
async redisSet(k, v, exp = 0, customPrefix=null){
|
||||
if(typeof(v) != 'string') v = JSON.stringify(v);
|
||||
if(customPrefix!==null) k = customPrefix + k
|
||||
else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k
|
||||
if(this.debug) console.log('Redis SET ', k);
|
||||
try { await this.redisClient.set(k, v) }
|
||||
catch(err) { console.error('Redis crash doing Redis set: ', k, v) }
|
||||
if(exp > 0) {
|
||||
try { await this.redisClient.expire(k, exp) }
|
||||
catch(err) { console.error('Redis crash doing Redis expire: ', k, exp) }
|
||||
}
|
||||
}
|
||||
|
||||
async redisGet(k, customPrefix=null){
|
||||
if(customPrefix!==null) k = customPrefix + k
|
||||
else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k
|
||||
if(this.debug) console.log('Redis GET ', k)
|
||||
let v=null
|
||||
try { v = await this.redisClient.get(k) }
|
||||
catch(err) { console.error('Redis crash doing Redis get: ', k) }
|
||||
return(v);
|
||||
}
|
||||
|
||||
async redisDel(k, customPrefix=null){
|
||||
if(customPrefix!==null) k = customPrefix + k
|
||||
else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k
|
||||
if(this.debug) console.log('Deleting ', k);
|
||||
await this.redisClient.del(k);
|
||||
}
|
||||
|
||||
|
||||
async redisGetTtl(k, customPrefix=null){
|
||||
if(customPrefix!==null) k = customPrefix + k
|
||||
else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k
|
||||
if(this.debug) console.log('Redis Get TTL ', k)
|
||||
let v=null
|
||||
try { v = await this.redisClient.ttl(k) }
|
||||
catch(err) { console.error('Redis crash doing Redis ttl: ', k) }
|
||||
return(v);
|
||||
}
|
||||
|
||||
async redisSetTtl(k, ttl, customPrefix=null){
|
||||
if(customPrefix!==null) k = customPrefix + k
|
||||
else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k
|
||||
if(this.debug) console.log('Redis Set TTL ', k);
|
||||
try { await this.redisClient.expire(k, ttl) }
|
||||
catch(err) { console.error('Redis crash doing Redis expire: ', k, ttl) }
|
||||
}
|
||||
|
||||
async redisXadd(streamName, kvObj, max = ''){
|
||||
if(!streamName.startsWith(this.config.redis.basePrefix)) streamName = this.config.redis.basePrefix + streamName
|
||||
if(this.debug) console.log('Redis XADD ', streamName, kvObj);
|
||||
let arr = ['XADD', streamName]
|
||||
if(max != '') arr = [...arr, ...['MAXLEN', '~', (1*max).toString()]]
|
||||
arr.push('*')
|
||||
let payload = '""'
|
||||
try{ payload = JSON.stringify(kvObj) }
|
||||
catch(e) { console.warn('cannot historize bad json: ',kvObj) }
|
||||
arr.push('streamData')
|
||||
arr.push(payload)
|
||||
let sid = null
|
||||
try { sid = await this.redisClient.sendCommand(arr); }
|
||||
catch(err) { console.error('Redis crash doing Redis command: ', arr, err) }
|
||||
return(sid);
|
||||
}
|
||||
|
||||
async redisXrange(streamName, start = '-', end = '+', withPayload = true){
|
||||
if(!streamName.startsWith(this.config.redis.basePrefix)) streamName = this.config.redis.basePrefix + streamName
|
||||
if(this.debug) console.log('Redis XRANGE ', streamName);
|
||||
if(typeof(start)!='string') start = start.toString()
|
||||
if(typeof(end)!='string') end = end.toString()
|
||||
let arr = ['XRANGE', streamName, start, end];
|
||||
let res = []
|
||||
try { res = await this.redisClient.sendCommand(arr) }
|
||||
catch(err) { console.error('Redis crash doing Redis command: ', arr, err.msg) }
|
||||
|
||||
if(withPayload){
|
||||
let o = {};
|
||||
for (let row of res) { // We'll take only the first content of the stream (the value of the key 'streamdata')
|
||||
let payload = ''
|
||||
try{ payload = JSON.parse(row[1][1]) }
|
||||
catch(e) { console.warn('cannot unhistorize bad json: ',row[1][1]) }
|
||||
o[row[0]] = payload
|
||||
}
|
||||
return(o);
|
||||
} else {
|
||||
return(res.map(row => row[0]))
|
||||
}
|
||||
}
|
||||
|
||||
isHistorizedChan(chan){
|
||||
if(!chan.startsWith(this.config.redis.basePrefix)) chan = this.config.redis.basePrefix + chan
|
||||
var matches = this.config.redis.historizeChannels.filter((e) => {
|
||||
if(!e.startsWith(this.config.redis.basePrefix)) e = this.config.redis.basePrefix + e
|
||||
if(e.indexOf('*') > -1) {
|
||||
let r = new RegExp('^'+e.replace(/\*/g,'(.+)')+'$','g')
|
||||
return(chan.match(r) != null);
|
||||
} else return(chan == e);
|
||||
});
|
||||
return(matches.length > 0);
|
||||
}
|
||||
|
||||
async getProcessInfo(){
|
||||
if(this.debug) console.log('Redis NIFO ');
|
||||
try {
|
||||
const rawInfo = await this.redisClient.INFO()
|
||||
let arr = rawInfo.replace(/\r/g,'').split('\n')
|
||||
arr = arr.filter(item => (item.trim()!='') && (!item.trim().startsWith('#')))
|
||||
let infoObject = arr.reduce((acc, val) => { kv = val.split(':',2); if(kv.length>0){ acc[kv[0]] = kv[1] } return(acc) }, {})
|
||||
}
|
||||
catch(err) { console.error('Redis crash doing Redis INFO: ') }
|
||||
return(infoObject)
|
||||
}
|
||||
|
||||
async redisReceive(msg, chan){
|
||||
if(this.debug) console.log('From Redis chan:', chan, msg);
|
||||
|
||||
try {
|
||||
msg = JSON.parse(msg);
|
||||
} catch {
|
||||
console.warn(`Ignoring non-json on channel ${chan} : ${msg}`);
|
||||
return;
|
||||
}
|
||||
|
||||
if(this.redPillsUuids.length>0) { // Any dev bus console in RedPills (promiscuous) mode ?
|
||||
if(this.debug) console.log(`Will send to ${this.redPillsUuids.length} REDPILLS`);
|
||||
let shortChan = chan.startsWith(this.config.redis.basePrefix) ? chan.substr(this.config.redis.basePrefix.length) : chan
|
||||
let payload ={
|
||||
'event': 'REDISMSG',
|
||||
'payload': {
|
||||
'bmsg':{ // Extra encapsulation to avoid triggering normal listeners on FE
|
||||
'msg': msg,
|
||||
'chan': shortChan,
|
||||
}
|
||||
}
|
||||
}
|
||||
for(var uuid of this.redPillsUuids) {
|
||||
if(uuid in this.wssConnections) {
|
||||
this.wssConnections[uuid].send(JSON.stringify(payload));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(this.debug) console.log('will now fanout...', chan, msg);
|
||||
const uuids = this.getSubscribedUuids(chan)
|
||||
if(uuids.length>0) { // Anyone interested at all about this chan ?
|
||||
if(this.debug) console.log(`Will broadcast to ${uuids.length} web clients`);
|
||||
let shortChan = chan.startsWith(this.config.redis.basePrefix) ? chan.substr(this.config.redis.basePrefix.length) : chan
|
||||
let payload ={
|
||||
'event': 'REDISMSG',
|
||||
'payload': {
|
||||
'msg': msg,
|
||||
'chan': shortChan,
|
||||
|
||||
}
|
||||
}
|
||||
for(var uuid of uuids) {
|
||||
if(uuid in this.wssConnections) {
|
||||
if(this.debug) console.log('Sending to ', uuid, payload);
|
||||
this.wssConnections[uuid].send(JSON.stringify(payload));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
// redis-cli -h redis.backend.eismea.eu --tls --user default
|
||||
Reference in New Issue
Block a user