'use strict';

/**
 * PROTOCOL
 *
 * Application-level payloads are always JSON and always either an action or an event:
 *
 * 1. ACTIONS are made for request-reply.
 *    They are aimed at the dialogue between the FE (mainly messageBus core modules) and WSSGateway.
 *    These messages are identified by the presence of a top-level "action" key.
 *
 *    Example: the FE asks WSSGateway to subscribe to Redis chans.
 *
 *    Request:
 *      {
 *        "action" : "SUB",                     // Must be a valid wssGateway action.
 *        "payload": ["chan1", "chan2"],        // Any type required by the action
 *        "reqid"  : "987654321-abcdef-123456"
 *      }
 *
 *    Reply:
 *      {
 *        "action" : "SUB",
 *        "payload": ["chan1", "chan9"],        // you were probably already subscribed to chan9, you don't
 *        "reqid"  : "987654321-abcdef-123456"  // have the right to chan2, but subscribing to chan1 succeeded
 *      }
 *
 *    Newton principle applied to WSSG: when there is an action in one direction (request),
 *    there is the same action in the opposite direction (reply).
 *
 *    When doing a request, the FE can optionally include a "reqid" holding a uuid.
 *    It then has the guarantee that the corresponding reply will contain the same reqid.
 *    As replies for a given action can be received in any number, at any time, this allows the FE
 *    to match one specific action request with its specific reply.
 *    This, in turn, allows this module to provide action-promises and action-timeouts.
 *
 * 2. EVENTS are any other messages circulating on the bus, thus on a REDIS channel.
 *    They are triggered by another actor on the bus, and have nothing to do with the FE-WssGW dialog.
 *    At application level these messages are identified by a top-level "eventType" key; on the
 *    websocket they are wrapped in an envelope whose top-level "event" key is "REDISMSG" (see below).
 *
 *    So far, this core module has no use for bus events, they are considered applicative-level only.
 *    Therefore, this module just triggers a corresponding (javascript) event, for any potential
 *    listener in the app.
 *
 *      {
 *        "eventType": "PropaSubmitted",   // Any applicative thing
 *        "payload": {                     // Any type depending on applicative convention for this event
 *          "propaNumber": "123456",
 *          "propaAcronym": "Tintin"
 *        }
 *      }
 *
 *    will trigger a "MessageBus.event.PropaSubmitted" javascript event, with the data:
 *
 *      {
 *        chan: "wssGateway:chan1:subchan2",
 *        sender: "N007xyz",
 *        eventType: "PropaSubmitted",
 *        payload: { "propaNumber": "123456", "propaAcronym": "Tintin" }
 *      }
 *
 *    followed by a generic "MessageBus.anyMessage" event carrying { chan, msg }.
 *
 * --------------- Low-level, WEBSOCKET ---------------
 *
 *      {
 *        "event": "REDISMSG",                    // low level
 *        "payload": {                            // low level
 *          "msg": {                              // low level
 *            "eventType": "PropaSubmitted",      // APP LEVEL MESSAGE = Redis payload
 *            "payload": {                        // APP LEVEL MESSAGE = Redis payload
 *              "propaNumber": "123456",          // APP LEVEL MESSAGE = Redis payload
 *              "propaAcronym": "Tintin"          // APP LEVEL MESSAGE = Redis payload
 *            },                                  // APP LEVEL MESSAGE = Redis payload
 *            "sender": "N007xyz"                 // APP LEVEL MESSAGE = Redis payload => added by gateways !
 *          },                                    // low level
 *          "chan": "wssGateway:chan1:subchan2"   // low level = Redis channel
 *        }
 *      }
 *
 * @author Nicolas Stein
 * @category Core
 * @subcategory Libraries
 */
class MessageBus {
  /**
   * Stores the configuration, initialises the internal registers and starts the worker.
   *
   * @param {object} config Keys read by this class: `debug`, `host`, `pathToWorker`, `frontBusPrefix`.
   *   The object is mutated: `hostname` is set, and `pathToWorker` may get a `.js` suffix.
   * @param {object} userInfo Forwarded to the worker; `userInfo.uuid` is used to expand `[UID]` in chanMatch().
   */
  constructor(config, userInfo) {
    this.config = config;
    if (this.config.debug) console.log('Lauching Websocket worker...');
    // Any falsy host (missing key, '', null, undefined) falls back to the page hostname.
    this.config.hostname = this.config.host ? this.config.host : document.location.hostname;
    this.userInfo = userInfo;
    this.activeSubscriptions = [];
    this.promisesRegister = {};     // reqid => [resolve, fail, timeOutID]
    this.bus2jsEventsRegister = []; // items: { eventType, cb, realEventType, realCb, scope }
    this.whenConnectedQ = [];
    this.connected = false;
    this.createWorker();
  }

  /**
   * Creates the worker, sends it the 'start' message and routes its messages to receiveFromWorker().
   */
  createWorker() {
    if (!this.config.pathToWorker.endsWith('.js')) this.config.pathToWorker += '.js';
    // A random query string makes the worker URL unique (presumably to defeat caching).
    this.MessageBusWorker = new Worker(this.config.pathToWorker + '?' + crypto.randomUUID());
    this.MessageBusWorker.postMessage({
      action: 'start',
      config: this.config,
      userInfo: this.userInfo,
    });
    this.MessageBusWorker.onmessage = this.receiveFromWorker.bind(this);
    if (this.config.debug) console.log('Websocket worker launched.');
  }

  /**
   * Runs the callback now if connected, otherwise queues it until the next 'connected' worker event.
   * Anything that is not a function is ignored.
   *
   * @param {Function} callBack
   */
  whenConnected(callBack) {
    if (typeof callBack !== 'function') return;
    if (this.connected) callBack();
    else this.whenConnectedQ.push(callBack);
  }

  /**
   * Promise flavour of whenConnected().
   *
   * @param {number} [timeout=0] Milliseconds before giving up; 0 (or less) waits forever.
   * @returns {Promise<void>} Resolves once connected, rejects with an Error when the timeout elapses first.
   */
  whenConnectedP(timeout = 0) {
    return new Promise((resolve, reject) => {
      let settled = false;
      let timer;
      this.whenConnected(() => {
        settled = true;
        clearTimeout(timer); // do not leave a useless timer behind
        resolve();
      });
      // When already connected the callback above ran synchronously: no timer needed.
      if (timeout > 0 && !settled) {
        timer = setTimeout(
          () => reject(new Error(`MessageBus not connected after ${timeout}ms`)),
          timeout
        );
      }
    });
  }

  /**
   * Runs the callback only if currently connected; otherwise does nothing (no queuing).
   *
   * @param {Function} callBack
   */
  ifConnected(callBack) {
    if (typeof callBack !== 'function') return;
    if (this.connected) callBack();
  }

  /**
   * Runs every queued whenConnected() callback once, then forgets them.
   * The queue is swapped out first so a repeated 'connected' event cannot run them again.
   */
  executewhenConnectedQ() {
    const queued = this.whenConnectedQ;
    this.whenConnectedQ = [];
    for (const callBack of queued) callBack();
  }

  /**
   * Request-reply an action from the WSSGateway.
   * This is a pure websocket exchange between client and WssGW.
   * This request does not pass through the (Redis) bus.
   * This method gives (and resolves) a promise, taking care of all lower-level details.
   *
   * @param {string} action
   * @param {*} [payload=null]
   * @param {number} [timeOut=5000] Milliseconds before the promise is rejected.
   * @returns {Promise<*>|undefined} undefined when `action` is falsy; otherwise a promise resolved with
   *   the reply payload, or rejected with a message string (failure, timeout or closed connection).
   */
  requestWssGwAction(action, payload = null, timeOut = 5000) {
    if (!action) return;
    const request = { action: action, payload: payload, reqid: crypto.randomUUID() };
    return this._registerRequest(request.reqid, action, timeOut, () => {
      this.MessageBusWorker.postMessage(request);
    });
  }

  /**
   * Request-reply an action towards an agent on the bus (normally infra, like HttpGw).
   * This request will pass through the (Redis) bus.
   * The reply will come on my own user notification channel.
   * This method gives (and resolves) a promise, taking care of all lower-level details.
   *
   * @param {string} chan Prefixed with config.frontBusPrefix when it does not already start with it.
   * @param {string} action
   * @param {*} [payload=null]
   * @param {number} [timeOut=5000] Milliseconds before the promise is rejected.
   * @returns {Promise<*>|undefined} Same contract as requestWssGwAction().
   */
  requestBusAction(chan, action, payload = null, timeOut = 5000) {
    if (!action) return;
    const request = { action: action, payload: payload, reqid: crypto.randomUUID() };
    return this._registerRequest(request.reqid, action, timeOut, () => {
      this.send(this._prefixChan(chan), JSON.stringify(request));
    });
  }

  /**
   * Request-reply an action towards Midas.
   * This request will pass through the (Redis) bus.
   * The reply will come on my own user notification channel.
   * This method gives (and resolves) a promise, taking care of all lower-level details.
   * Unlike requestBusAction(), action and data are nested under "payload".
   *
   * @param {string} chan Prefixed with config.frontBusPrefix when it does not already start with it.
   * @param {string} action
   * @param {*} [data=null]
   * @param {number} [timeOut=5000] Milliseconds before the promise is rejected.
   * @returns {Promise<*>|undefined} Same contract as requestWssGwAction().
   */
  requestMidasAction(chan, action, data = null, timeOut = 5000) {
    if (!action) return;
    const request = { payload: { action: action, data: data }, reqid: crypto.randomUUID() };
    return this._registerRequest(request.reqid, action, timeOut, () => {
      this.send(this._prefixChan(chan), JSON.stringify(request));
    });
  }

  /**
   * Publishes an application event ({ eventType, payload }) on a chan.
   *
   * @param {string} chan Prefixed with config.frontBusPrefix when it does not already start with it.
   * @param {string} eventType
   * @param {*} eventPayload
   */
  sendEvent(chan, eventType, eventPayload) {
    this.send(
      this._prefixChan(chan),
      JSON.stringify({ eventType: eventType, payload: eventPayload })
    );
  }

  /**
   * Asks the worker to publish ('PUB') a message on a chan.
   * No subscription check is done: you can publish to an unsubscribed chan,
   * userchans are the best example.
   *
   * @param {string} chan Used as given (no prefixing here).
   * @param {string} msg Already serialized message.
   */
  send(chan, msg) {
    this.MessageBusWorker.postMessage({ action: 'PUB', payload: { chan: chan, msg: msg } });
  }

  /**
   * Registers a bus event, filtering on allowed incoming chans.
   * => Callback takes arguments (chan, payload, sender)
   *    where chan is the actual chan that carried the event eventType.
   *
   * Filtering is important because you could have eventType = 'update'
   * arriving on chans like 'dataChange:proposal' and 'dataChange:organisation' (thus for different actions).
   * Besides, you don't want to react for example on 'growl' if it's arriving on
   * some chan publishable by another user and misused by him.
   *
   * @param {string} eventType
   * @param {Array} filterChans Array of allowed chans (string). Globbing with '*' is allowed.
   * @param {Function} cb
   * @param {string} [scope=''] Appended to 'MessageBus' and handed to app.events.addEvent as third argument.
   */
  addBusListener(eventType, filterChans, cb, scope = '') {
    const realCb = (e) => {
      const realChan = e.detail.chan;
      const allowed = filterChans.some((filterChan) => this.chanMatch(realChan, filterChan));
      if (!allowed) return;
      cb(realChan, e.detail.payload, e.detail.sender);
    };
    const realEventType = 'MessageBus.event.' + eventType;
    app.events.addEvent(realEventType, realCb, 'MessageBus' + scope);
    this.bus2jsEventsRegister.push({
      eventType: eventType,
      cb: cb,
      realEventType: realEventType,
      realCb: realCb,
      scope: scope,
    });
  }

  /**
   * De-registers bus event(s).
   * If there are several events of the same type with the same callback, they are all wiped.
   *
   * @param {string} eventType
   * @param {Function} cb The callback given to addBusListener().
   * @param {string} [scope=''] Appended to 'MessageBus' and handed to app.events.removeEvent as third argument.
   */
  removeBusListener(eventType, cb, scope = '') {
    const toKick = this.bus2jsEventsRegister.filter(
      (item) => (item.eventType == eventType) && (item.cb == cb)
    );
    for (const kickItem of toKick) {
      app.events.removeEvent(kickItem.realEventType, kickItem.realCb, 'MessageBus' + scope);
    }
    // Forget what was just removed, so the register does not grow forever and a repeated call
    // does not remove the same listener twice. Entries registered under another scope are kept,
    // as the removal above was issued with the scope given here.
    this.bus2jsEventsRegister = this.bus2jsEventsRegister.filter(
      (item) => !(toKick.includes(item) && item.scope === scope)
    );
  }

  /**
   * Subscribe to channels ('SUB' gateway action).
   *
   * @param {object} channels
   * @returns {Promise<*>} See requestWssGwAction().
   */
  subscribe(channels) {
    return this.requestWssGwAction('SUB', channels);
  }

  /**
   * Unsubscribe from channels ('UNSUB' gateway action).
   *
   * @param {*} channels
   * @returns {Promise<*>} See requestWssGwAction().
   */
  unSubscribe(channels) {
    return this.requestWssGwAction('UNSUB', channels);
  }

  /**
   * Get current subscriptions list ('SUBLST' gateway action).
   *
   * @returns {Promise<*>} See requestWssGwAction().
   */
  subscriptionsList() {
    return this.requestWssGwAction('SUBLST');
  }

  /**
   * Get channel history ('CHANHIST' gateway action).
   *
   * @param {*} channel
   * @param {*} from
   * @param {*} [to] Only sent when truthy.
   * @returns {Promise<*>} See requestWssGwAction().
   */
  chanHistory(channel, from, to) {
    const payload = { channel: channel, from: from };
    if (to) payload.to = to;
    return this.requestWssGwAction('CHANHIST', payload);
  }

  /**
   * Helper method to match a chan with globbing.
   * '[UID]' and '[CNXID]' in the pattern are replaced by userInfo.uuid and the connection id,
   * '*' matches one or more characters, everything else is matched literally.
   *
   * @param {string} myChan (no glob, no user expansion)
   * @param {string} targetChan PATTERN (possible glob and user expansion)
   * @returns {boolean}
   */
  chanMatch(myChan, targetChan) {
    const expanded = targetChan
      .replace(/\[UID\]/g, () => String(this.userInfo.uuid))
      .replace(/\[CNXID\]/g, () => String(this.cnxId));
    // Escape every literal part so that '.', '(', '+', ... cannot act as regex syntax;
    // only '*' keeps a special meaning.
    const source = expanded
      .split('*')
      .map((literal) => literal.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'))
      .join('(.+)');
    return myChan.match(new RegExp('^' + source + '$')) !== null;
  }

  /**
   * Handles a message posted by the worker.
   * Event "ReceiveFromServer" is the general case of a message from server, found (as JSON text)
   * in `data`, with its own struct. Other types of event are generated by the worker, about the connection.
   * Messages without an "event" key are ignored.
   *
   * @param {MessageEvent} e
   */
  receiveFromWorker(e) {
    const workermsg = e.data;
    if (!('event' in workermsg)) return;

    switch (workermsg.event) {
      case 'ReceiveFromServer': {
        let srvdata;
        try {
          srvdata = JSON.parse(workermsg.data);
        } catch (err) {
          console.error('MessageBus: unparsable message from server (discarded):', err);
          break;
        }
        if (srvdata === null || typeof srvdata !== 'object') {
          console.warn('MessageBus: unexpected message from server (discarded):', srvdata);
          break;
        }
        this.receiveFromServer(srvdata);
        break;
      }

      case 'connected':
        this.connected = true;
        if (this.config.debug) console.log('received connected event from worker !');
        this.executewhenConnectedQ();
        app.events.trigger('MessageBus.Connected');
        break;

      case 'closed':
        if (this.config.debug) console.log('received closed event from worker!');
        this.activeSubscriptions = [];
        // Requests still waiting for a reply are rejected right away instead of being left to time out.
        this._rejectPendingRequests('MsgBus connection closed before the reply was received.');
        this.whenConnectedQ = [];
        this.connected = false;
        app.events.trigger('MessageBus.Closed');
        break;

      default:
        if (this.config.debug) console.warn('Unknown Websocket Worker message:', workermsg);
    }
  }

  /**
   * Handles a decoded message from the server (see the protocol reminder at the top of this file).
   *
   * @param {object} srvdata
   * @param {string} [srvdata.action] Possible values: 'SUB', 'SUBLST', ... (reply to a request)
   * @param {string} [srvdata.event] 'REDISMSG' for a message coming from the bus
   * @param {string} [srvdata.reqid]
   * @param {object} [srvdata.payload] response payload
   * @param {object} [srvdata.err] response error
   * @param {boolean} [srvdata.success]
   */
  receiveFromServer(srvdata) {
    if ('action' in srvdata) {
      this._handleGatewayReply(srvdata);
    } else if (('event' in srvdata) && (srvdata.event == 'REDISMSG')) {
      this._handleRedisMessage(srvdata);
    }

    // For request-reply, settle promise
    if (srvdata.reqid) {
      const payload = ('payload' in srvdata) ? srvdata.payload : null;
      const err = ('err' in srvdata) ? srvdata.err : null;
      this._settleRequest(srvdata.reqid, srvdata.success, payload, `MsgBus action failed.\nError: ${err}`);
    }
  }

  /** Piggybacks on some gateway replies for internal bookkeeping, then triggers 'MessageBus.anyAction'. */
  _handleGatewayReply(srvdata) {
    const payload = ('payload' in srvdata) ? srvdata.payload : null;
    switch (srvdata.action) {
      case 'WELCOME':
        console.log('Received WSS welcome', srvdata);
        this.cnxId = srvdata.cnxId;
        this.serverTimeDelta = Date.now() - srvdata.serverTime;
        break;
      case 'SUB':
      case 'SUBLST':
        this._mergeSubscriptions(payload);
        break;
    }
    app.events.trigger('MessageBus.anyAction', srvdata);
  }

  /** Dispatches a low-level REDISMSG envelope: app event, bus action reply, or promiscuous message. */
  _handleRedisMessage(srvdata) {
    const payload = ('payload' in srvdata) ? srvdata.payload : null;

    if (payload && payload.msg && (payload.msg.eventType || payload.msg.action)) {
      const msg = payload.msg;
      if (msg.eventType) {
        app.events.trigger('MessageBus.event.' + msg.eventType, {
          chan: payload.chan,
          sender: msg.sender,
          eventType: msg.eventType,
          payload: msg.payload,
        });
      } else if (msg.action && msg.reqid) {
        // Reply to requestBusAction() / requestMidasAction()
        const actionPayload = ('payload' in msg) ? msg.payload : null;
        const err = ('err' in msg) ? msg.err : null;
        this._settleRequest(
          msg.reqid,
          msg.success,
          actionPayload,
          `MsgBus action "${msg.action}" failed.\nError: ${err}`
        );
      }
      app.events.trigger('MessageBus.anyMessage', {
        chan: payload.chan,
        msg: msg,
      });
    } else if (payload && payload.bmsg) {
      // Decapsulate the inner message and use a specific event
      app.events.trigger('MessageBus.promiscuousMessage', {
        chan: payload.bmsg.chan,
        msg: payload.bmsg.msg,
      });
    } else {
      console.warn('Weird bus message (discarted) :', srvdata);
    }
  }

  /** Returns chan, prefixed with config.frontBusPrefix unless it already starts with it. */
  _prefixChan(chan) {
    return chan.startsWith(this.config.frontBusPrefix) ? chan : this.config.frontBusPrefix + chan;
  }

  /** Creates the promise of a request: arms its timeout, records it under reqid, then calls dispatch(). */
  _registerRequest(reqid, action, timeOut, dispatch) {
    return new Promise((resolve, fail) => {
      const timeOutID = setTimeout(() => {
        delete this.promisesRegister[reqid];
        fail(`Timeout (>${timeOut}ms) for action ${action}`);
      }, timeOut);
      this.promisesRegister[reqid] = [resolve, fail, timeOutID];
      dispatch();
    });
  }

  /** Settles and forgets the pending request reqid, if any: resolve(value) on success, else fail(failureMessage). */
  _settleRequest(reqid, success, value, failureMessage) {
    // Own-property check: a reqid such as "constructor" must not hit Object.prototype.
    if (!Object.hasOwn(this.promisesRegister, reqid)) return;
    const [resolve, fail, timeOutID] = this.promisesRegister[reqid];
    clearTimeout(timeOutID); // Stop timeout timer
    delete this.promisesRegister[reqid];
    if (success) resolve(value);
    else fail(failureMessage);
  }

  /** Rejects every pending request with reason, clears their timers and empties the register. */
  _rejectPendingRequests(reason) {
    for (const reqid of Object.keys(this.promisesRegister)) {
      const [, fail, timeOutID] = this.promisesRegister[reqid];
      clearTimeout(timeOutID);
      fail(reason);
    }
    this.promisesRegister = {};
  }

  /** Adds chans (array, single value, or null/undefined for none) to activeSubscriptions without duplicates. */
  _mergeSubscriptions(chans) {
    let incoming;
    if (Array.isArray(chans)) incoming = chans;
    else if (chans == null) incoming = [];
    else incoming = [chans];
    this.activeSubscriptions = [...new Set([...this.activeSubscriptions, ...incoming])];
  }
}

// Register the class with the host application. When the `app` global is missing the file still
// loads, with a visible warning, instead of dying on a ReferenceError.
if (typeof app !== 'undefined') {
  app.registerClass('MessageBus', MessageBus);
} else {
  console.warn('MessageBus: global "app" is not defined, class not registered.');
}