import crypto from 'crypto' import { gatewayActions } from './actions/index.js' import { RedisSearchLanguages } from 'redis' export class WssConnexion { constructor(options){ Object.assign(this, gatewayActions) this.config = options.config this.socket = options.socket this.req = options.req this.uuid = options.uuid this.wssSrv = options.wssSrv this.debug = options.debug this.allRediscnx = options.allRediscnx this.accessRights = options.accessRights this.userId = options.userId this.roles = options.roles this.sessionID = null // null until login this.roundTripTime = 0 this.subscriptions = []; this.usersWatched = []; this.socket.on('close', this.onClose.bind(this)); this.socket.on('message', this.receive.bind(this)); if(this.debug) console.log(`Spawned new WSS connection to client: ${this.req.socket.remoteAddress}:${this.req.socket.remotePort}`) console.log('Session infos:',this.socket.session.authenticated, this.socket.session.userInfos) } welcome(){ clearTimeout(this.challengeTimeout) this.challengeTimeout = null this.cnxState = 'CONNECTED' if(this.debug) console.log(`Welcome to UUID ${this.uuid}`) } startKeepAlive() { if(this.config.server.keepAliveInterval>0) { if(this.config.server.keepAliveInterval >= (1.5*this.config.server.keepAliveTimeout)) { this.keepAliveNextTimeout = setTimeout(this.keepAlive.bind(this),this.config.server.keepAliveInterval*1000); } else { console.warn('keepAliveTimeout should be at least 1.5 x keepAliveInterval ! Ignoring Keepalive!'); } } } async receive(data) { //receive from Websocket try{ var pdata = JSON.parse(data); var action = pdata.action; var payload = ('payload' in pdata) ? pdata.payload : null; var reqid = ('reqid' in pdata) ? 
pdata.reqid.substr(0,50) : null; } catch (err){ if(this.debug) console.warn(`Malformed json for uuid ${this.uuid}\n${data}`); return; } if(typeof this['action_'+action] == "function") { if((this.debug) && (action != 'PONG rcv')) console.warn(`${action} for uuid ${this.uuid}`); this['action_'+action](action, payload, reqid); } else { if(this.debug) console.warn(`Unknown action ${action} for UUID ${this.uuid}`); } } send(data) { // Send to Websocket this.socket.send(data); } keepAlive(){ this.latestPing = { id: crypto.randomUUID(), time: Date.now() } this.send(JSON.stringify({ 'action': 'PING', 'reqid': this.latestPing.id, })); console.log('====>PING sent', this.latestPing) this.keepAliveBomb = setTimeout(this.MissedKeepAlive.bind(this), (this.config.server.keepAliveTimeout+1)*1000); } MissedKeepAlive(){ if(this.debug) console.warn(`Keep-alive timeout for UUID ${this.uuid}, closing connection !`); this.socket.close(); } close() { if(this.socket) this.socket.close() } onClose() { if(this.debug) console.log(`UUID ${this.uuid} disconnected !`); clearTimeout(this.keepAliveNextTimeout); this.clearAllSubscriptions(); this.wssSrv.cleanupConnexion(this.uuid, this.userId); // Suicide and leave my body to GC (Carefull if you added other references to me from outside !!) // Also think about all possibly active bind(this), which -I guess- also make references preventing GC. 
} subscribeMandatoryChans(){ let mandaChans = this.accessRights.mustSubscribe(this.userId, this.roles) for(let rediscnx of this.allRediscnx){ mandaChans = mandaChans.filter(chan => chan.startsWith(rediscnx.redisConfig.chansNamespace)) mandaChans = mandaChans.map(item=>rediscnx.redisConfig.basePrefix+item) for(var chan of mandaChans){ if(!(chan in rediscnx.subscriptions)) rediscnx.subscriptions[chan] = []; if(this.subscriptions.indexOf(chan)<0) { this.subscriptions.push(chan); rediscnx.subscriptions[chan].push(this.uuid); } } } this.action_SUBLST('SUBLST', null, ''); } clearAllSubscriptions(){ for(var chan of this.subscriptions){ for(let rediscnx of this.allRediscnx){ if(chan in rediscnx.subscriptions) { rediscnx.subscriptions[chan].splice(rediscnx.subscriptions[chan].indexOf(this.uuid), 1) } } } this.subscriptions = [] } sendErr(action, msg, reqid){ this.send(JSON.stringify({ 'action': action, 'success': false, 'err': msg, 'reqid': reqid, })); } updateOnlineUsers(onlineUsers){ //Helper for our parent, triggered from above on new/lost connexion onlineUsers = Object.keys(onlineUsers); let reply = { 'action': 'ISONLINE', 'payload': onlineUsers.filter((x) => (this.usersWatched.indexOf(x)>-1)), 'success': true, }; this.send(JSON.stringify(reply)); } } /* PROTOCOL Application-level payloads are always JSON and always either an action, or an event : 1. ACTIONS : are made for request-reply. They are aimed at the dialogue between the FE (mainly messageBus core modules) and WSSGateway. These messages are identified by the fact there is an "action" key, top level. Example : The FE asks WSSGateway to subscribe to Redis chans : Request: { "action" : "SUB", // Must be a valid wssGateway action. 
    "payload": ["chan1", "chan2"], // Any type required by the action
    "reqid": "987654321-abcdef-123456"
}
Reply: {
    "action" : "SUB",
    "payload": ["chan1", "chan9"], // probably you were already subscribed to chan9,
    "reqid": "987654321-abcdef-123456" // don't have the right to chan2, but succeeded subscribing to chan1
}
Newton principle applied to WSSG: When there is an action in one direction (request), there is the same action in the opposite direction (reply).
When doing a request, the FE can optionally include a "reqid", with a uuid. It then has the guarantee that the corresponding reply will contain the same reqid.
As you can receive a reply on a particular action in any number, at any time, this allows the FE to match one specific action request with its specific reply.
This, in turn, allows this module to provide action-promise and action-timeouts.
2. EVENTS : are any other events circulating on the bus, thus on a REDIS channel. They are triggered by another actor on the bus, and have nothing to do with FE-WssGW dialog.
These messages are identified by the fact there is an "event" key, top level.
So far, this core-module has no use of bus-events, they are considered as applicative-level-use only.
Therefore, this module just triggers a corresponding (javascript) event, for any potential listener in the app.
Example : {
    "event" : "PropaSubmitted", // Any applicative thing
    "payload": { // Any type depending on applicative convention for this event
        "propaNumber": "123456",
        "propaAcronym": "Tintin"
    }
}
Will trigger a "MessageBus.PropaSubmitted" javascript event, with "detail": {
    msg: {
        event: "PropaSubmitted",
        payload: {
            "propaNumber": "123456",
            "propaAcronym": "Tintin"
        }
    },
    chan: "wssGateway:chan1:subchan2",
}
--------------- Low-level, WEBSOCKET ---------------
{
    "event":"REDISMSG", // low level
    "payload":{ // low level
        "msg":{ // low level
            "eventType":"PropaSubmitted", // APP LEVEL MESSAGE = Redis payload
            "payload":{ // APP LEVEL MESSAGE = Redis payload
                "propaNumber": "123456", // APP LEVEL MESSAGE = Redis payload
                "propaAcronym": "Tintin" // APP LEVEL MESSAGE = Redis payload
            }, // APP LEVEL MESSAGE = Redis payload
            sender: "N007xyz" // APP LEVEL MESSAGE = Redis payload => added by gateways !
        }, // low level
        "chan":"wssGateway:chan1:subchan2" // low level = Redis channel
    },
}
*/