209 lines
8.5 KiB
JavaScript
209 lines
8.5 KiB
JavaScript
const redis = require('redis')
|
|
const sha256 = require('sha256')
|
|
const midasActions = require('../actions')
|
|
const AccesRights = require('../accesRights')
|
|
const crypto = require('crypto')
|
|
|
|
module.exports = class RedisConnexion {
|
|
|
|
constructor(options) {
|
|
Object.assign(this,midasActions.methods)
|
|
this.config = options.config
|
|
this.debug = options.debug
|
|
this.cfgh = options.cfgh
|
|
this.midasSubscriptions = []
|
|
this.accessRights = new AccesRights(this.config, options.debug)
|
|
this.AWScodeCommitRole = 'arn:aws:iam::959160311435:role/EIC-CodeCommitReader'
|
|
|
|
// Plugins related
|
|
this.plugins = {}
|
|
this.gitRepoName = 'myeic-midas-service'
|
|
this.gitRepoPath = `/tmp/${this.gitRepoName}`
|
|
this.repoPluginsPath = `${this.gitRepoPath}/plugins`
|
|
this.newPluginsPath = __dirname+'/newPlugins'
|
|
this.currentPluginsPath = __dirname+'/plugins'
|
|
this.pluginsReloadLock = false
|
|
|
|
this.redisClient = redis.createClient({
|
|
socket: {
|
|
tls: this.config.redis.tls,
|
|
host: this.config.redis.host,
|
|
port: this.config.redis.port
|
|
},
|
|
scripts:{
|
|
// Atomically deletes key only if correct value provided.
|
|
// Used by mutex as per https://redis.io/commands/set/#patterns
|
|
delKeyVal: redis.defineScript({
|
|
NUMBER_OF_KEYS: 1,
|
|
SCRIPT:`
|
|
if redis.call("get",KEYS[1]) == ARGV[1]
|
|
then
|
|
return redis.call("del",KEYS[1])
|
|
else
|
|
return 0
|
|
end
|
|
`,
|
|
transformArguments(key, toAdd) {
|
|
return [key, toAdd.toString()];
|
|
},
|
|
transformReply(reply) {
|
|
return reply;
|
|
}
|
|
})
|
|
}
|
|
});
|
|
|
|
this.redisSubscriber = null;
|
|
this.redisClient.on('error', (err) => {
|
|
console.error('Redis error: ', err);
|
|
});
|
|
|
|
if(this.debug) console.log('Redis started...')
|
|
}
|
|
|
|
async redisLogin(){
|
|
if(this.debug) console.log(`Connecting to Redis (${this.config.redis.host}:${this.config.redis.port}, tls:${this.config.redis.tls?'yes':'no'})...`);
|
|
await this.redisClient.connect();
|
|
if(this.debug) console.log('Connected to Redis !');
|
|
if(this.config.redis.user) {
|
|
await this.redisClient.sendCommand(['AUTH', this.config.redis.user, this.config.redis.pass]);
|
|
if(this.debug) console.log('Logged into Redis !');
|
|
} else {
|
|
if(this.debug) console.log('Connected (anon) to Redis...');
|
|
}
|
|
if(this.debug) {
|
|
var redisTime = await this.redisClient.time()
|
|
console.log('Redis time:', redisTime)
|
|
}
|
|
}
|
|
|
|
async redisChansStart(){ // Second subscriber connexion
|
|
this.redisSubscriber = this.redisClient.duplicate();
|
|
await this.redisSubscriber.connect();
|
|
if(this.config.redis.user) {
|
|
await this.redisSubscriber.sendCommand(['AUTH', this.config.redis.user, this.config.redis.pass]);
|
|
}
|
|
|
|
// Gather my chans of interest
|
|
this.refreshPluginsChans()
|
|
|
|
if(this.debug) console.log('PSubscription OK ', this.config.redis.basePrefix + '*');
|
|
}
|
|
|
|
async redisSubscribe(chanName, callBack){
|
|
if(!chanName.startsWith(this.config.redis.basePrefix)) chanName = this.config.redis.basePrefix + chanName
|
|
await this.redisSubscriber.subscribe(chanName, callBack);
|
|
}
|
|
|
|
async redisPublish(chanName, msg){
|
|
if(typeof (msg) != 'string') msg = JSON.stringify(msg);
|
|
if(!chanName.startsWith(this.config.redis.basePrefix)) chanName = this.config.redis.basePrefix + chanName
|
|
await this.redisClient.publish(chanName, msg);
|
|
}
|
|
|
|
|
|
async redisSet(k, v, exp = 0, customPrefix=null){
|
|
if(typeof(v) != 'string') v = JSON.stringify(v);
|
|
if(customPrefix!==null) k = customPrefix + k
|
|
else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k
|
|
if(this.debug) console.log('Redis SET ', k)
|
|
try { await this.redisClient.set(k, v) }
|
|
catch(err) { console.error('Redis crash doing Redis set: ', k, v) }
|
|
if(exp > 0) {
|
|
try { await this.redisClient.expire(k, exp) }
|
|
catch(err) { console.error('Redis crash doing Redis expire: ', k, exp) }
|
|
}
|
|
}
|
|
|
|
async redisGet(k, customPrefix=null){
|
|
if(customPrefix!==null) k = customPrefix + k
|
|
else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k
|
|
if(this.debug) console.log('Redis GET ', k)
|
|
let v=null
|
|
try { v = await this.redisClient.get(k) }
|
|
catch(err) { console.error('Redis crash doing Redis get: ', k) }
|
|
return(v);
|
|
}
|
|
|
|
async redisDel(k, customPrefix=null){
|
|
if(customPrefix!==null) k = customPrefix + k
|
|
else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k
|
|
if(this.debug) console.log('Deleting ', k);
|
|
await this.redisClient.del(k);
|
|
}
|
|
|
|
|
|
async redisGetTtl(k, customPrefix=null){
|
|
if(customPrefix!==null) k = customPrefix + k
|
|
else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k
|
|
if(this.debug) console.log('Redis Get TTL ', k)
|
|
let v=null
|
|
try { v = await this.redisClient.ttl(k) }
|
|
catch(err) { console.error('Redis crash doing Redis ttl: ', k) }
|
|
return(v);
|
|
}
|
|
|
|
async redisSetTtl(k, ttl, customPrefix=null){
|
|
if(customPrefix!==null) k = customPrefix + k
|
|
else if(!k.startsWith(this.config.redis.basePrefix)) k = this.config.redis.basePrefix + k
|
|
if(this.debug) console.log('Redis Set TTL ', k);
|
|
try { await this.redisClient.expire(k, ttl) }
|
|
catch(err) { console.error('Redis crash doing Redis expire: ', k, ttl) }
|
|
}
|
|
|
|
async redisXadd(streamName, kvObj, max = ''){
|
|
if(!streamName.startsWith(this.config.redis.basePrefix)) streamName = this.config.redis.basePrefix + streamName
|
|
if(this.debug) console.log('Redis XADD ', streamName, kvObj);
|
|
let arr = ['XADD', streamName]
|
|
if(max != '') arr = [...arr, ...['MAXLEN', '~', (1*max).toString()]]
|
|
arr.push('*')
|
|
let payload = '""'
|
|
try{ payload = JSON.stringify(kvObj) }
|
|
catch(e) { console.warn('cannot historize bad json: ',kvObj) }
|
|
arr.push('streamData')
|
|
arr.push(payload)
|
|
let sid = null
|
|
try { sid = await this.redisClient.sendCommand(arr); }
|
|
catch(err) { console.error('Redis crash doing Redis command: ', arr, err) }
|
|
return(sid);
|
|
}
|
|
|
|
async redisXrange(streamName, start = '-', end = '+'){
|
|
if(!streamName.startsWith(this.config.redis.basePrefix)) streamName = this.config.redis.basePrefix + streamName
|
|
if(this.debug) console.log('Redis XRANGE ', streamName);
|
|
if(typeof(start)!='string') start = start.toString()
|
|
if(typeof(end)!='string') end = end.toString()
|
|
let arr = ['XRANGE', streamName, start, end];
|
|
let res = []
|
|
try { res = await this.redisClient.sendCommand(arr) }
|
|
catch(err) { console.error('Redis crash doing Redis command: ', arr, err.msg) }
|
|
let o = {};
|
|
for (let row of res) { // We'll take only the first content of the stream (the value of the key 'streamdata')
|
|
let payload = ''
|
|
try{ payload = JSON.parse(row[1][1]) }
|
|
catch(e) { console.warn('cannot unhistorize bad json: ',row[1][1]) }
|
|
o[row[0]] = payload
|
|
}
|
|
return(o);
|
|
}
|
|
|
|
isHistorizedChan(chan){
|
|
if(!chan.startsWith(this.config.redis.basePrefix)) chan = this.config.redis.basePrefix + chan
|
|
var matches = this.config.historize.historizeChannels.filter((e) => {
|
|
if(!e.startsWith(this.config.redis.basePrefix)) e = this.config.redis.basePrefix + e
|
|
if(e.indexOf('*') > -1) {
|
|
let r = new RegExp('^'+e.replace(/\*/g,'(.+)')+'$','g')
|
|
return(chan.match(r) != null);
|
|
} else return(chan == e);
|
|
});
|
|
return(matches.length > 0);
|
|
}
|
|
|
|
|
|
}
|
|
|
|
/**
|
|
* TODOs:
|
|
* - Subscribe only to chans of interest (action chan + each plugin chan), not all-then-filter, we're not a gateway !
|
|
*
|
|
*/ |