296 lines
13 KiB
JavaScript
296 lines
13 KiB
JavaScript
import redis from 'redis'
|
|
|
|
/**
 * Builds an anchored RegExp from a channel pattern in which `*` is the only
 * wildcard (it stands for one or more characters). Every other character is
 * matched literally, so regex metacharacters such as `.` in a channel name
 * cannot accidentally widen the match.
 *
 * @param {string} pattern - Channel pattern, already prefixed if needed.
 * @returns {RegExp} Regular expression matching the whole channel name.
 */
function chanPatternToRegExp(pattern) {
  const body = pattern
    .split('*')
    .map((literal) => literal.replace(/[.+?^${}()|[\]\\]/g, '\\$&'))
    .join('(.+)')
  return new RegExp('^' + body + '$')
}

/**
 * Wrapper around a `redis` client: connection/login, pub/sub on a configured
 * channel namespace, and prefixed helpers for keys, hashes, sets and streams.
 *
 * Keys and channels are automatically prefixed with `redisConfig.basePrefix`
 * unless they already start with it (or unless a custom prefix is supplied).
 */
export class RedisConnexion {
  /**
   * Stores the options and creates (but does not connect) the main client.
   *
   * @param {object} options
   * @param {object} options.config - Redis settings; the code reads `host`,
   *   `port`, `tls`, `user`, `pass`, `basePrefix`, `chansNamespace`,
   *   `historizeChannels` and `redisId` from it.
   * @param {boolean} [options.debug] - Enables verbose console logging.
   * @param {*} [options.redisId]
   * @param {*} [options.meshName]
   * @param {object|null} [options.meshModule] - Optional module providing
   *   `actionHandlers`, `afterLogin` and `dispatchMessage`.
   * @param {*} [options.senderId]
   * @param {*} [options.actionsReply]
   */
  constructor(options) {
    this.config = options.config
    this.debug = options.debug
    this.redisId = options.redisId
    this.redisConfig = this.config
    this.meshName = options.meshName
    this.meshModule = options.meshModule ?? null
    this.senderId = options.senderId ?? null
    this.actionsReply = options.actionsReply ?? null

    // Action handlers supplied by the mesh module are copied onto the instance.
    if (this.meshModule?.actionHandlers) {
      Object.assign(this, this.meshModule.actionHandlers)
    }
    this.afterLogin = this.meshModule?.afterLogin ?? []

    const { tls, host, port } = this.redisConfig
    this.redisClient = redis.createClient({ socket: { tls, host, port } })

    // Created lazily by redisChansStart().
    this.redisSubscriber = null
    this.redisClient.on('error', (err) => {
      console.error('Redis error: ', err)
    })

    if (this.debug) console.log(`[${this.redisConfig.redisId}] Redis started...`)
  }

  /**
   * Returns the channel name prefixed with `basePrefix` (idempotent).
   *
   * @param {string} chanName
   * @returns {string}
   */
  fullChan(chanName) {
    const { basePrefix } = this.redisConfig
    return chanName.startsWith(basePrefix) ? chanName : basePrefix + chanName
  }

  /**
   * Tests a channel name against a pattern where `*` means "one or more
   * characters". Both arguments are prefixed with `basePrefix` first.
   *
   * @param {string} chan
   * @param {string} pattern
   * @returns {boolean}
   */
  matchesChan(chan, pattern) {
    return chanPatternToRegExp(this.fullChan(pattern)).test(this.fullChan(chan))
  }

  /**
   * Connects the main client, authenticates when `redisConfig.user` is set,
   * then invokes every function found in `afterLogin` with this instance.
   * The hooks are called without awaiting their return value.
   *
   * @returns {Promise<void>}
   */
  async redisLogin() {
    const { host, port, tls, user, pass, redisId } = this.redisConfig
    if (this.debug) console.log(`Connecting to Redis (${host}:${port}, tls:${tls ? 'yes' : 'no'})...`)
    await this.redisClient.connect()
    if (this.debug) console.log(`Connected to Redis ${redisId}`)

    if (user) {
      await this.redisClient.sendCommand(['AUTH', user, pass])
      if (this.debug) console.log(`Logged into Redis ${redisId}`)
    } else if (this.debug) {
      console.log(`Connected (anon) to Redis ${redisId}`)
    }

    if (this.debug) {
      const redisTime = await this.redisClient.time()
      console.log(`[${redisId}] Redis ${redisId} time:`, redisTime)
    }

    for (const method of this.afterLogin) {
      if (typeof method != 'function') continue
      method(this)
    }
  }

  /**
   * Creates the dedicated subscriber client (a duplicate of the main one),
   * connects and authenticates it, then pattern-subscribes to every channel
   * under `basePrefix + chansNamespace`. Incoming messages go to
   * {@link RedisConnexion#redisReceive}.
   *
   * @returns {Promise<void>} Resolves once the pattern subscription is active.
   */
  async redisChansStart() {
    this.redisSubscriber = this.redisClient.duplicate()
    // The duplicate is a separate client: give it its own 'error' listener so
    // a subscriber-side failure is logged instead of being left unhandled.
    this.redisSubscriber.on('error', (err) => {
      console.error('Redis subscriber error: ', err)
    })
    await this.redisSubscriber.connect()
    if (this.redisConfig.user) {
      await this.redisSubscriber.sendCommand(['AUTH', this.redisConfig.user, this.redisConfig.pass])
    }

    const allChans = this.redisConfig.basePrefix + this.redisConfig.chansNamespace + '*'
    // Awaited so that failures surface here and the log below is truthful.
    await this.redisSubscriber.pSubscribe(allChans, this.redisReceive.bind(this))
    if (this.debug) console.log(`[${this.redisConfig.redisId}] PSubscription OK `, allChans)
  }

  /**
   * Subscribes to one channel, provided it lies inside this Redis'
   * `basePrefix + chansNamespace` range; otherwise warns and does nothing.
   * Requires redisChansStart() to have created the subscriber.
   *
   * @param {string} chanName
   * @param {Function} callBack - Listener handed to the subscriber client.
   * @returns {Promise<void>}
   */
  async redisSubscribe(chanName, callBack) {
    chanName = this.fullChan(chanName)
    const allowedRange = this.redisConfig.basePrefix + this.redisConfig.chansNamespace
    if (!chanName.startsWith(allowedRange)) {
      console.warn(`[${this.redisConfig.redisId}] redisSubscribe : forbidden channel range on this redis !`)
      return
    }
    await this.redisSubscriber.subscribe(chanName, callBack)
  }

  /**
   * Publishes a message on a channel inside the allowed namespace.
   * Non-string messages are JSON-serialized first.
   *
   * @param {string} chanName
   * @param {*} msg
   * @returns {Promise<void>}
   */
  async redisPublish(chanName, msg) {
    if (typeof msg != 'string') msg = JSON.stringify(msg)
    chanName = this.fullChan(chanName)
    const allowedRange = this.redisConfig.basePrefix + this.redisConfig.chansNamespace
    if (!chanName.startsWith(allowedRange)) {
      console.warn(`[${this.redisConfig.redisId}] redisPublish : forbidden channel range on this redis ! (${chanName} / ${allowedRange}) `)
      return
    }

    await this.redisClient.publish(chanName, msg)
  }

  /**
   * Stores a value under a key. Non-string values are JSON-serialized.
   * When `exp` is positive the expiry is sent with the SET itself, so the key
   * can never be left behind without its TTL. Errors are logged, not thrown.
   *
   * @param {string} k
   * @param {*} v
   * @param {number} [exp=0] - Time to live in seconds; 0 means no expiry.
   * @param {string|null} [customPrefix=null] - Replaces `basePrefix` when given.
   * @returns {Promise<void>}
   */
  async redisSet(k, v, exp = 0, customPrefix = null) {
    if (typeof v != 'string') v = JSON.stringify(v)
    k = this.resolveKey(k, customPrefix)
    if (this.debug) console.log(`[${this.redisConfig.redisId}] Redis SET `, k)
    try {
      if (exp > 0) await this.redisClient.set(k, v, { EX: exp })
      else await this.redisClient.set(k, v)
    } catch (err) {
      console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis set: `, k, v, err)
    }
  }

  /**
   * Reads the string stored under a key.
   *
   * @param {string} k
   * @param {string|null} [customPrefix=null]
   * @returns {Promise<*>} The client's reply, or `null` if the call failed.
   */
  async redisGet(k, customPrefix = null) {
    k = this.resolveKey(k, customPrefix)
    if (this.debug) console.log(`[${this.redisConfig.redisId}] Redis GET`, k)
    try {
      return await this.redisClient.get(k)
    } catch (err) {
      console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis get: `, k, err)
      return null
    }
  }

  /**
   * Computes the final key: `customPrefix + k` when a custom prefix is given,
   * otherwise `k` prefixed with `basePrefix` unless it already is.
   *
   * @param {string} k
   * @param {string|null} [customPrefix=null]
   * @returns {string}
   */
  resolveKey(k, customPrefix = null) {
    if (customPrefix !== null) return customPrefix + k
    const { basePrefix } = this.redisConfig
    return k.startsWith(basePrefix) ? k : basePrefix + k
  }

  /**
   * Deletes a key. Unlike the other helpers, client errors propagate to the
   * caller.
   *
   * @param {string} k
   * @param {string|null} [customPrefix=null]
   * @returns {Promise<void>}
   */
  async redisDel(k, customPrefix = null) {
    k = this.resolveKey(k, customPrefix)
    if (this.debug) console.log(`[${this.redisConfig.redisId}] Deleting`, k)
    await this.redisClient.del(k)
  }

  /**
   * Reads one field of a hash.
   *
   * @param {string} k
   * @param {string} field
   * @param {string|null} [customPrefix=null]
   * @returns {Promise<*>} The client's reply, or `null` if the call failed.
   */
  async redisHget(k, field, customPrefix = null) {
    k = this.resolveKey(k, customPrefix)
    if (this.debug) console.log(`[${this.redisConfig.redisId}] Redis HGET`, k, field)
    try {
      return await this.redisClient.hGet(k, field)
    } catch (err) {
      console.error(`[${this.redisConfig.redisId}] Redis crash doing HGET: `, k, field, err)
      return null
    }
  }

  /**
   * Reads a whole hash.
   *
   * @param {string} k
   * @param {string|null} [customPrefix=null]
   * @returns {Promise<*>} The client's reply, or `{}` if the call failed.
   */
  async redisHgetall(k, customPrefix = null) {
    k = this.resolveKey(k, customPrefix)
    if (this.debug) console.log(`[${this.redisConfig.redisId}] Redis HGETALL`, k)
    try {
      return await this.redisClient.hGetAll(k)
    } catch (err) {
      console.error(`[${this.redisConfig.redisId}] Redis crash doing HGETALL: `, k, err)
      return {}
    }
  }

  /**
   * Lists the members of a set.
   *
   * @param {string} k
   * @param {string|null} [customPrefix=null]
   * @returns {Promise<*>} The client's reply, or `[]` if the call failed.
   */
  async redisSmembers(k, customPrefix = null) {
    k = this.resolveKey(k, customPrefix)
    if (this.debug) console.log(`[${this.redisConfig.redisId}] Redis SMEMBERS`, k)
    try {
      return await this.redisClient.sMembers(k)
    } catch (err) {
      console.error(`[${this.redisConfig.redisId}] Redis crash doing SMEMBERS: `, k, err)
      return []
    }
  }

  /**
   * Writes one field of a hash. Non-string values are JSON-serialized.
   * Errors are logged, not thrown.
   *
   * @param {string} k
   * @param {string} field
   * @param {*} v
   * @param {string|null} [customPrefix=null]
   * @returns {Promise<void>}
   */
  async redisHset(k, field, v, customPrefix = null) {
    if (typeof v != 'string') v = JSON.stringify(v)
    k = this.resolveKey(k, customPrefix)
    if (this.debug) console.log(`[${this.redisConfig.redisId}] Redis HSET`, k, field)
    try {
      await this.redisClient.hSet(k, field, v)
    } catch (err) {
      console.error(`[${this.redisConfig.redisId}] Redis crash doing HSET: `, k, field, err)
    }
  }

  /**
   * Removes one field from a hash. Errors are logged, not thrown.
   *
   * @param {string} k
   * @param {string} field
   * @param {string|null} [customPrefix=null]
   * @returns {Promise<void>}
   */
  async redisHdel(k, field, customPrefix = null) {
    k = this.resolveKey(k, customPrefix)
    if (this.debug) console.log(`[${this.redisConfig.redisId}] Redis HDEL`, k, field)
    try {
      await this.redisClient.hDel(k, field)
    } catch (err) {
      console.error(`[${this.redisConfig.redisId}] Redis crash doing HDEL: `, k, field, err)
    }
  }

  /**
   * Adds a member to a set. Errors are logged, not thrown.
   *
   * @param {string} k
   * @param {*} member
   * @param {string|null} [customPrefix=null]
   * @returns {Promise<void>}
   */
  async redisSadd(k, member, customPrefix = null) {
    k = this.resolveKey(k, customPrefix)
    if (this.debug) console.log(`[${this.redisConfig.redisId}] Redis SADD`, k, member)
    try {
      await this.redisClient.sAdd(k, member)
    } catch (err) {
      console.error(`[${this.redisConfig.redisId}] Redis crash doing SADD: `, k, member, err)
    }
  }

  /**
   * Removes a member from a set. Errors are logged, not thrown.
   *
   * @param {string} k
   * @param {*} member
   * @param {string|null} [customPrefix=null]
   * @returns {Promise<void>}
   */
  async redisSrem(k, member, customPrefix = null) {
    k = this.resolveKey(k, customPrefix)
    if (this.debug) console.log(`[${this.redisConfig.redisId}] Redis SREM`, k, member)
    try {
      await this.redisClient.sRem(k, member)
    } catch (err) {
      console.error(`[${this.redisConfig.redisId}] Redis crash doing SREM: `, k, member, err)
    }
  }

  /**
   * Reads the remaining time to live of a key.
   *
   * @param {string} k
   * @param {string|null} [customPrefix=null]
   * @returns {Promise<*>} The client's TTL reply, or `null` if the call failed.
   */
  async redisGetTtl(k, customPrefix = null) {
    k = this.resolveKey(k, customPrefix)
    if (this.debug) console.log(`[${this.redisConfig.redisId}] Redis Get TTL `, k)
    try {
      return await this.redisClient.ttl(k)
    } catch (err) {
      console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis ttl: `, k, err)
      return null
    }
  }

  /**
   * Sets the time to live of a key. Errors are logged, not thrown.
   *
   * @param {string} k
   * @param {number} ttl - Value passed to the client's `expire`.
   * @param {string|null} [customPrefix=null]
   * @returns {Promise<void>}
   */
  async redisSetTtl(k, ttl, customPrefix = null) {
    k = this.resolveKey(k, customPrefix)
    if (this.debug) console.log(`[${this.redisConfig.redisId}] Redis Set TTL`, k)
    try {
      await this.redisClient.expire(k, ttl)
    } catch (err) {
      console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis expire: `, k, ttl, err)
    }
  }

  /**
   * Appends an entry to a stream. The object is JSON-serialized into a single
   * `streamData` field; values that cannot be serialized are stored as `""`.
   *
   * @param {string} streamName
   * @param {*} kvObj - Payload to serialize.
   * @param {number|string} [max=''] - Approximate maximum stream length
   *   (`MAXLEN ~`). Ignored unless it is a positive number.
   * @returns {Promise<*>} The client's reply to XADD, or `null` if it failed.
   */
  async redisXadd(streamName, kvObj, max = '') {
    streamName = this.resolveKey(streamName)
    if (this.debug) console.log(`[${this.redisConfig.redisId}] Redis XADD `, streamName, kvObj)

    const arr = ['XADD', streamName]
    // Only a positive, finite cap is forwarded: '' / 0 / NaN mean "no trimming".
    const maxLen = Math.trunc(Number(max))
    if (Number.isFinite(maxLen) && maxLen > 0) arr.push('MAXLEN', '~', String(maxLen))
    arr.push('*')

    let payload = '""'
    try {
      // JSON.stringify yields undefined for undefined/functions: keep the fallback.
      payload = JSON.stringify(kvObj) ?? '""'
    } catch (e) {
      console.warn(`[${this.redisConfig.redisId}] cannot historize bad json: `, kvObj)
    }
    arr.push('streamData', payload)

    try {
      return await this.redisClient.sendCommand(arr)
    } catch (err) {
      console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis command: `, arr, err)
      return null
    }
  }

  /**
   * Reads a range of stream entries.
   *
   * @param {string} streamName
   * @param {string|number} [start='-']
   * @param {string|number} [end='+']
   * @param {boolean} [withPayload=true] - When true, returns an object mapping
   *   entry id to the JSON-parsed second element of the entry's field list
   *   (`''` when it cannot be parsed); when false, returns the array of ids.
   * @returns {Promise<object|string[]>} Empty result if the command failed.
   */
  async redisXrange(streamName, start = '-', end = '+', withPayload = true) {
    streamName = this.resolveKey(streamName)
    if (this.debug) console.log(`[${this.redisConfig.redisId}]Redis XRANGE `, streamName)
    if (typeof start != 'string') start = start.toString()
    if (typeof end != 'string') end = end.toString()

    const arr = ['XRANGE', streamName, start, end]
    let res = []
    try {
      res = await this.redisClient.sendCommand(arr)
    } catch (err) {
      console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis command: `, arr, err)
    }

    if (!withPayload) return res.map((row) => row[0])

    const byId = {}
    for (const row of res) {
      // Read defensively so a malformed row cannot throw from the catch below.
      const raw = row[1]?.[1]
      let payload = ''
      try {
        payload = JSON.parse(raw)
      } catch (e) {
        console.warn(`[${this.redisConfig.redisId}] cannot unhistorize bad json: `, raw)
      }
      byId[row[0]] = payload
    }
    return byId
  }

  /**
   * Tells whether a channel matches one of `redisConfig.historizeChannels`.
   * Entries may contain `*` wildcards (one or more characters); entries
   * without a wildcard must match exactly.
   *
   * @param {string} chan
   * @returns {boolean}
   */
  isHistorizedChan(chan) {
    const target = this.fullChan(chan)
    const historizeChannels = this.redisConfig.historizeChannels ?? []
    return historizeChannels.some((entry) => chanPatternToRegExp(this.fullChan(entry)).test(target))
  }

  /**
   * Runs INFO and parses the reply into a flat `{ key: value }` object.
   * Blank lines and `#` section headers are skipped; each remaining line is
   * split at its first `:` so values that contain colons stay intact.
   *
   * @returns {Promise<object>} Parsed info, or `{}` if the command failed.
   */
  async getProcessInfo() {
    if (this.debug) console.log(`[${this.redisConfig.redisId}] Redis INFO`)
    const infoObject = {}
    try {
      const rawInfo = await this.redisClient.INFO()
      for (const line of rawInfo.replace(/\r/g, '').split('\n')) {
        const trimmed = line.trim()
        if (trimmed === '' || trimmed.startsWith('#')) continue
        const sep = line.indexOf(':')
        if (sep === -1) infoObject[line] = undefined
        else infoObject[line.slice(0, sep)] = line.slice(sep + 1)
      }
    } catch (err) {
      console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis INFO: `, err)
    }
    return infoObject
  }

  /**
   * Listener for the pattern subscription: parses the message as JSON and
   * forwards it to `meshModule.dispatchMessage` when that function exists.
   * Non-JSON messages are ignored with a warning; dispatch errors are logged.
   *
   * @param {string} msg - Raw message.
   * @param {string} chan - Channel the message arrived on.
   * @returns {Promise<void>}
   */
  async redisReceive(msg, chan) {
    if (this.debug) console.log(`[${this.redisConfig.redisId}] From Redis chan:`, chan, msg)
    try {
      msg = JSON.parse(msg)
    } catch {
      console.warn(`[${this.redisConfig.redisId}] Ignoring non-json on channel ${chan} : ${msg}`)
      return
    }

    if (typeof this.meshModule?.dispatchMessage !== 'function') return
    try {
      await this.meshModule.dispatchMessage(this, msg, chan)
    } catch (err) {
      console.error(`[${this.redisConfig.redisId}] dispatchMessage failed on ${chan}:`, err)
    }
  }
}
|