const redis = require('redis') const sha256 = require('sha256') const midasActions = require('../actions') const AccesRights = require('../accesRights') const crypto = require('crypto') module.exports = class RedisConnexion { constructor(options) { Object.assign(this,midasActions.methods) this.config = options.config this.debug = options.debug this.cfgh = options.cfgh this.midasSubscriptions = [] this.accessRights = new AccesRights(this.config, options.debug) // Plugins related this.plugins = {} this.gitRepoName = 'myeic-midas-service' this.gitRepoPath = `/tmp/${this.gitRepoName}` this.repoPluginsPath = `${this.gitRepoPath}/plugins` this.newPluginsPath = __dirname+'/newPlugins' this.currentPluginsPath = __dirname+'/plugins' this.pluginsReloadLock = false this.redisClient = redis.createClient({ socket: { tls: this.config.redis.tls, host: this.config.redis.host, port: this.config.redis.port }, scripts:{ // Atomically deletes key only if correct value provided. // Used by mutex as per https://redis.io/commands/set/#patterns delKeyVal: redis.defineScript({ NUMBER_OF_KEYS: 1, SCRIPT:` if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end `, transformArguments(key, toAdd) { return [key, toAdd.toString()]; }, transformReply(reply) { return reply; } }) } }); this.redisSubscriber = null; this.redisClient.on('error', (err) => { console.error('Redis error: ', err); }); if(this.debug) console.log('Redis started...') } async redisLogin(){ if(this.debug) console.log(`Connecting to Redis (${this.config.redis.host}:${this.config.redis.port}, tls:${this.config.redis.tls?'yes':'no'})...`); await this.redisClient.connect(); if(this.debug) console.log('Connected to Redis !'); if(this.config.redis.user) { await this.redisClient.sendCommand(['AUTH', this.config.redis.user, this.config.redis.pass]); if(this.debug) console.log('Logged into Redis !'); } else { if(this.debug) console.log('Connected (anon) to Redis...'); } if(this.debug) { var redisTime = await this.redisClient.time() console.log('Redis time:', redisTime) } } async redisChansStart(){ // Second subscriber connexion this.redisSubscriber = this.redisClient.duplicate(); await this.redisSubscriber.connect(); if(this.config.redis.user) { await this.redisSubscriber.sendCommand(['AUTH', this.config.redis.user, this.config.redis.pass]); } // Gather my chans of interest this.refreshPluginsChans() if(this.debug) console.log('PSubscription OK ', this.config.redis.basePrefix + '*'); } async redisSubscribe(chanName, callBack){ if(!chanName.startsWith(this.config.redis.basePrefix)) chanName = this.config.redis.basePrefix + chanName await this.redisSubscriber.subscribe(chanName, callBack); } async redisPublish(chanName, msg){ if(typeof (msg) != 'string') msg = JSON.stringify(msg); if(!chanName.startsWith(this.config.redis.basePrefix)) chanName = this.config.redis.basePrefix + chanName await this.redisClient.publish(chanName, msg); } async redisSet(k, v, exp = 0, customPrefix=null){ if(typeof(v) != 'string') v = JSON.stringify(v); if(customPrefix!==null) k = customPrefix + k else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k if(this.debug) console.log('Redis SET ', k) try { await this.redisClient.set(k, v) } catch(err) { console.error('Redis crash doing Redis set: ', k, v) } if(exp > 0) { try { await this.redisClient.expire(k, exp) } catch(err) { console.error('Redis crash doing Redis expire: ', k, exp) } } } async redisGet(k, customPrefix=null){ if(customPrefix!==null) k = customPrefix + k else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k if(this.debug) console.log('Redis GET ', k) let v=null try { v = await this.redisClient.get(k) } catch(err) { console.error('Redis crash doing Redis get: ', k) } return(v); } async redisDel(k, customPrefix=null){ if(customPrefix!==null) k = customPrefix + k else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k if(this.debug) console.log('Deleting ', k); await this.redisClient.del(k); } async redisGetTtl(k, customPrefix=null){ if(customPrefix!==null) k = customPrefix + k else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k if(this.debug) console.log('Redis Get TTL ', k) let v=null try { v = await this.redisClient.ttl(k) } catch(err) { console.error('Redis crash doing Redis ttl: ', k) } return(v); } async redisSetTtl(k, ttl, customPrefix=null){ if(customPrefix!==null) k = customPrefix + k else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k if(this.debug) console.log('Redis Set TTL ', k); try { await this.redisClient.expire(k, ttl) } catch(err) { console.error('Redis crash doing Redis expire: ', k, ttl) } } async redisXadd(streamName, kvObj, max = ''){ if(!streamName.startsWith(this.config.redis.basePrefix)) streamName = this.config.redis.basePrefix + streamName if(this.debug) console.log('Redis XADD ', streamName, kvObj); let arr = ['XADD', streamName] if(max != '') arr = [...arr, ...['MAXLEN', '~', (1*max).toString()]] arr.push('*') let payload = '""' try{ payload = JSON.stringify(kvObj) } catch(e) { console.warn('cannot historize bad json: ',kvObj) } arr.push('streamData') arr.push(payload) let sid = null try { sid = await this.redisClient.sendCommand(arr); } catch(err) { console.error('Redis crash doing Redis command: ', arr, err) } return(sid); } async redisXrange(streamName, start = '-', end = '+'){ if(!streamName.startsWith(this.config.redis.basePrefix)) streamName = this.config.redis.basePrefix + streamName if(this.debug) console.log('Redis XRANGE ', streamName); if(typeof(start)!='string') start = start.toString() if(typeof(end)!='string') end = end.toString() let arr = ['XRANGE', streamName, start, end]; let res = [] try { res = await this.redisClient.sendCommand(arr) } catch(err) { console.error('Redis crash doing Redis command: ', arr, err.msg) } let o = {}; for (let row of res) { // We'll take only the first content of the stream (the value of the key 'streamdata') let payload = '' try{ payload = JSON.parse(row[1][1]) } catch(e) { console.warn('cannot unhistorize bad json: ',row[1][1]) } o[row[0]] = payload } return(o); } isHistorizedChan(chan){ if(!chan.startsWith(this.config.redis.basePrefix)) chan = this.config.redis.basePrefix + chan var matches = this.config.historize.historizeChannels.filter((e) => { if(!e.startsWith(this.config.redis.basePrefix)) e = this.config.redis.basePrefix + e if(e.indexOf('*') > -1) { let r = new RegExp('^'+e.replace(/\*/g,'(.+)')+'$','g') return(chan.match(r) != null); } else return(chan == e); }); return(matches.length > 0); } } /** * TODOs: * - Subscribe only to chans of interest (action chan + each plugin chan), not all-then-filter, we're not a gaeway ! * */