import redis from 'redis' import * as systemMesh from './actions/system/index.js' import * as arenaMesh from './actions/arena/index.js' const meshModules = { system: systemMesh, arena: arenaMesh, } export class RedisConnexion { constructor(options) { this.config = options.config this.debug = options.debug this.redisId = options.redisId this.redisConfig = this.config this.meshName = options.meshName const mesh = meshModules[this.meshName] if(mesh?.meshActions) Object.assign(this, mesh.meshActions) this.afterLoginMethods = mesh?.afterLoginMethods ?? [] if(!mesh) console.warn(`[${this.redisId}] Unknown meshName: ${this.meshName}`) this.redisClient = redis.createClient({ socket: { tls: this.redisConfig.tls, host: this.redisConfig.host, port: this.redisConfig.port } }); this.redisSubscriber = null; this.redisClient.on('error', (err) => { console.error('Redis error: ', err); }); if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis started...`) } fullChan(chanName){ if(!chanName.startsWith(this.redisConfig.basePrefix)) chanName = this.redisConfig.basePrefix + chanName return(chanName) } matchesChan(chan, pattern) { const fullChan = this.fullChan(chan) const fullPattern = this.fullChan(pattern) const re = new RegExp('^' + fullPattern.replace(/\*/g, '(.+)') + '$') return(re.test(fullChan)) } async redisLogin(){ if(this.debug) console.log(`Connecting to Redis (${this.redisConfig.host}:${this.redisConfig.port}, tls:${this.redisConfig.tls?'yes':'no'})...`); await this.redisClient.connect(); if(this.debug) console.log(`Connected to Redis ${this.redisConfig.redisId}`); if(this.redisConfig.user) { await this.redisClient.sendCommand(['AUTH', this.redisConfig.user, this.redisConfig.pass]); if(this.debug) console.log(`Logged into Redis ${this.redisConfig.redisId}`); } else { if(this.debug) console.log(`Connected (anon) to Redis ${this.redisConfig.redisId}`); } if(this.debug) { var redisTime = await this.redisClient.time(); console.log(`[${this.redisConfig.redisId}] Redis ${this.redisConfig.redisId} time:`, redisTime); } for(const method of this.afterLoginMethods){ if(typeof method != 'function') continue method(this) } } async redisChansStart(){ this.redisSubscriber = this.redisClient.duplicate(); await this.redisSubscriber.connect(); if(this.redisConfig.user) { await this.redisSubscriber.sendCommand(['AUTH', this.redisConfig.user, this.redisConfig.pass]); } const allChans = this.redisConfig.basePrefix + this.redisConfig.chansNamespace+'*' this.redisSubscriber.pSubscribe(allChans, this.redisReceive.bind(this)); if(this.debug) console.log(`[${this.redisConfig.redisId}] PSubscription OK `, allChans); } async redisSubscribe(chanName, callBack){ if(!chanName.startsWith(this.redisConfig.basePrefix)) chanName = this.redisConfig.basePrefix + chanName if(!chanName.startsWith(this.redisConfig.basePrefix+this.redisConfig.chansNamespace)) { console.warn(`[${this.redisConfig.redisId}] redisSubscribe : forbidden channel range on this redis !`) return } await this.redisSubscriber.subscribe(chanName, callBack); } async redisPublish(chanName, msg){ if(typeof (msg) != 'string') msg = JSON.stringify(msg); if(!chanName.startsWith(this.redisConfig.basePrefix)) chanName = this.redisConfig.basePrefix + chanName if(!chanName.startsWith(this.redisConfig.basePrefix+this.redisConfig.chansNamespace)) { console.warn(`[${this.redisConfig.redisId}] redisPublish : forbidden channel range on this redis ! (${chanName} / ${this.redisConfig.basePrefix+this.redisConfig.chansNamespace}) `) return } await this.redisClient.publish(chanName, msg); } async redisSet(k, v, exp = 0, customPrefix=null){ if(typeof(v) != 'string') v = JSON.stringify(v); if(customPrefix!==null) k = customPrefix + k else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis SET `, k); try { await this.redisClient.set(k, v) } catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis set: `, k, v) } if(exp > 0) { try { await this.redisClient.expire(k, exp) } catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis expire: `, k, exp) } } } async redisGet(k, customPrefix=null){ if(customPrefix!==null) k = customPrefix + k else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis GET`, k) let v=null try { v = await this.redisClient.get(k) } catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis get: `, k) } return(v); } resolveKey(k, customPrefix=null){ if(customPrefix!==null) return(customPrefix + k) if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k return(k) } async redisDel(k, customPrefix=null){ k = this.resolveKey(k, customPrefix) if(this.debug) console.log(`[${this.redisConfig.redisId}] Deleting`, k) await this.redisClient.del(k) } async redisHset(k, field, v, customPrefix=null){ if(typeof(v) != 'string') v = JSON.stringify(v) k = this.resolveKey(k, customPrefix) if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis HSET`, k, field) try { await this.redisClient.hSet(k, field, v) } catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing HSET: `, k, field, err) } } async redisHdel(k, field, customPrefix=null){ k = this.resolveKey(k, customPrefix) if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis HDEL`, k, field) try { await this.redisClient.hDel(k, field) } catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing HDEL: `, k, field, err) } } async redisSadd(k, member, customPrefix=null){ k = this.resolveKey(k, customPrefix) if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis SADD`, k, member) try { await this.redisClient.sAdd(k, member) } catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing SADD: `, k, member, err) } } async redisSrem(k, member, customPrefix=null){ k = this.resolveKey(k, customPrefix) if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis SREM`, k, member) try { await this.redisClient.sRem(k, member) } catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing SREM: `, k, member, err) } } async redisGetTtl(k, customPrefix=null){ if(customPrefix!==null) k = customPrefix + k else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis Get TTL `, k) let v=null try { v = await this.redisClient.ttl(k) } catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis ttl: `, k) } return(v); } async redisSetTtl(k, ttl, customPrefix=null){ if(customPrefix!==null) k = customPrefix + k else if(!k.startsWith(this.redisConfig.basePrefix)) k = this.redisConfig.basePrefix + k if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis Set TTL`, k); try { await this.redisClient.expire(k, ttl) } catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis expire: `, k, ttl) } } async redisXadd(streamName, kvObj, max = ''){ if(!streamName.startsWith(this.redisConfig.basePrefix)) streamName = this.redisConfig.basePrefix + streamName if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis XADD `, streamName, kvObj); let arr = ['XADD', streamName] if(max != '') arr = [...arr, ...['MAXLEN', '~', (1*max).toString()]] arr.push('*') let payload = '""' try{ payload = JSON.stringify(kvObj) } catch(e) { console.warn(`[${this.redisConfig.redisId}] cannot historize bad json: `,kvObj) } arr.push('streamData') arr.push(payload) let sid = null try { sid = await this.redisClient.sendCommand(arr); } catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis command: `, arr, err) } return(sid); } async redisXrange(streamName, start = '-', end = '+', withPayload = true){ if(!streamName.startsWith(this.redisConfig.basePrefix)) streamName = this.redisConfig.basePrefix + streamName if(this.debug) console.log(`[${this.redisConfig.redisId}]Redis XRANGE `, streamName); if(typeof(start)!='string') start = start.toString() if(typeof(end)!='string') end = end.toString() let arr = ['XRANGE', streamName, start, end]; let res = [] try { res = await this.redisClient.sendCommand(arr) } catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis command: `, arr, err.msg) } if(withPayload){ let o = {}; for (let row of res) { // We'll take only the first content of the stream (the value of the key 'streamdata') let payload = '' try{ payload = JSON.parse(row[1][1]) } catch(e) { console.warn(`[${this.redisConfig.redisId}] cannot unhistorize bad json: `,row[1][1]) } o[row[0]] = payload } return(o); } else { return(res.map(row => row[0])) } } isHistorizedChan(chan){ if(!chan.startsWith(this.redisConfig.basePrefix)) chan = this.redisConfig.basePrefix + chan var matches = this.redisConfig.historizeChannels.filter((e) => { if(!e.startsWith(this.redisConfig.basePrefix)) e = this.redisConfig.basePrefix + e if(e.indexOf('*') > -1) { let r = new RegExp('^'+e.replace(/\*/g,'(.+)')+'$','g') return(chan.match(r) != null); } else return(chan == e); }); return(matches.length > 0); } async getProcessInfo(){ if(this.debug) console.log(`[${this.redisConfig.redisId}] Redis INFO`); try { const rawInfo = await this.redisClient.INFO() let arr = rawInfo.replace(/\r/g,'').split('\n') arr = arr.filter(item => (item.trim()!='') && (!item.trim().startsWith('#'))) let infoObject = arr.reduce((acc, val) => { kv = val.split(':',2); if(kv.length>0){ acc[kv[0]] = kv[1] } return(acc) }, {}) } catch(err) { console.error(`[${this.redisConfig.redisId}] Redis crash doing Redis INFO: `) } return(infoObject) } async redisReceive(msg, chan){ if(this.debug) console.log(`[${this.redisConfig.redisId}] From Redis chan:`, chan, msg) try { msg = JSON.parse(msg) } catch { console.warn(`[${this.redisConfig.redisId}] Ignoring non-json on channel ${chan} : ${msg}`) return } if(this.meshName === 'arena' && this.config.gps && typeof(this.dispatchArenaMessage) === 'function') { if(this.dispatchArenaMessage(msg, chan)) return } if(this.meshName === 'system' && this.config.gps?.gpsActionsChannel){ const actionsChan = this.fullChan(this.config.gps.gpsActionsChannel) if(chan != actionsChan) return const action = msg.action if(!action || typeof(action) !== 'string') { console.warn(`[${this.redisConfig.redisId}] Ignoring message without action on ${chan}`) return } const handler = this['action_'+action] if(typeof(handler) != 'function') { if(this.debug) console.warn(`[${this.redisConfig.redisId}] Unknown action ${action} on ${chan}`) return } const payload = ('payload' in msg) ? msg.payload : null const reqid = ('reqid' in msg) ? msg.reqid.substr(0, 50) : null const sender = msg.sender || null const roles = Array.isArray(msg.roles) ? msg.roles : ['*'] if(this.debug) console.log(`[${this.redisConfig.redisId}] Dispatching action ${action} from ${sender}`) handler.call(this, action, payload, reqid, sender, roles) } } } // redis-cli -h redis.backend.eismea.eu --tls --user default