444 lines
18 KiB
JavaScript
Executable File
444 lines
18 KiB
JavaScript
Executable File
'use strict'
|
||
/**
|
||
* PROTOCOL
|
||
|
||
|
||
Application-level payloads are always JSON and always either an action, or an event :
|
||
|
||
1. ACTIONS : are made for request-reply.
|
||
They are aimed at the dialogue between the FE (mainly messageBus core modules) and WSSGateway.
|
||
These messages are identified by the fact there is an "action" key, top level.
|
||
|
||
Example : The FE asks WSSGateway to subscribe to Redis chans :
|
||
Request:
|
||
{ "action" : "SUB", // Must be a valid wssGateway action.
|
||
"payload": ["chan1", "chan2"], // Any type required by the action
|
||
"reqid": "987654321-abcdef-123456"
|
||
}
|
||
|
||
Reply:
|
||
{ "action" : "SUB",
|
||
"payload": ["chan1", "chan9"], // probably you were already subscribed to chan9,
|
||
"reqid": "987654321-abcdef-123456" // you don't have the right to chan2, but succeeded subscribing to chan1
|
||
}
|
||
|
||
Newton principle applied to WSSG:
|
||
When there is an action in one direction (request),
|
||
there is the same action in the opposite direction (reply).
|
||
|
||
When doing a request, the FE can optionally include a "reqid", with a uuid.
|
||
It then has the guarantee that the corresponding reply will contain the same reqid.
|
||
As you can receive a reply on a particular action in any number, at any time,
|
||
this allows the FE to match one specific action request with its specific reply.
|
||
This, in turn, allows this module to provide action-promise and action-timeouts.
|
||
|
||
2. EVENTS : are any other events circulating on the bus, thus on a REDIS channel.
|
||
They are triggered by another actor on the bus, and have nothing to do with FE-WssGW dialog .
|
||
These messages are identified by the fact there is an "eventType" key, top level.
|
||
So far, this core-module has no use of bus-events, they are considered as applicative-level-use only.
|
||
Therefore, this module just triggers a corresponding (javascript) event, for any potential listener in the app.
|
||
|
||
{ "eventType" : "PropaSubmitted", // Any applicative thing
|
||
"payload": { // Any type depending on applicative convention for this event
|
||
"propaNumber": "123456",
|
||
"propaAcronym": "Tintin"
|
||
}
|
||
}
|
||
|
||
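Will trigger a "MessageBus.event.PropaSubmitted" javascript event (carrying chan, sender, eventType and payload), followed by a generic "MessageBus.anyMessage" javascript event, with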
|
||
"detail":
|
||
{ msg: {
|
||
eventType: "PropaSubmitted",
|
||
payload: { "propaNumber": "123456",
|
||
"propaAcronym": "Tintin"
|
||
}
|
||
},
|
||
chan: "wssGateway:chan1:subchan2",
|
||
}
|
||
|
||
--------------- Low-level, WEBSOCKET ---------------
|
||
{ "event":"REDISMSG", // low level
|
||
"payload":{ // low level
|
||
"msg":{ // low level
|
||
"eventType":"PropaSubmitted", // APP LEVEL MESSAGE = Redis payload
|
||
"payload":{ // APP LEVEL MESSAGE = Redis payload
|
||
"propaNumber": "123456", // APP LEVEL MESSAGE = Redis payload
|
||
"propaAcronym": "Tintin" // APP LEVEL MESSAGE = Redis payload
|
||
}, // APP LEVEL MESSAGE = Redis payload
|
||
sender: "N007xyz" // APP LEVEL MESSAGE = Redis payload => added by gateways !
|
||
}, // low level
|
||
"chan":"wssGateway:chan1:subchan2" // low level = Redis channel
|
||
},
|
||
}
|
||
*
|
||
* @author Nicolas Stein
|
||
* @category Core
|
||
* @subcategory Libraries
|
||
*/
|
||
class MessageBus {

    // Front-end facade over the Websocket worker: it forwards requests to the worker,
    // matches replies to pending requests through their "reqid", and re-emits bus
    // messages as application events through app.events.

    /**
     * Stores the configuration, initialises the internal registers and starts the worker.
     *
     * @param {object} config    Read here: debug, host, pathToWorker, frontBusPrefix.
     *                           NOTE: the object is mutated (hostname and pathToWorker are normalised)
     *                           and then handed over to the worker.
     * @param {*} userInfo       Opaque value forwarded to the worker with the 'start' action.
     */
    constructor(config, userInfo){
        this.config = config;
        if(this.config.debug) console.log('Lauching Websocket worker...');
        // Fall back on the page hostname when no usable host is configured.
        this.config.hostname = this.config.host ? this.config.host : document.location.hostname;
        this.userInfo = userInfo;
        // State is initialised BEFORE the worker is started, so that no worker message
        // can ever be handled against half-built state.
        this.activeSubscriptions = [];
        this.promisesRegister = { };        // reqid => [resolve, fail, timeOutID]
        this.bus2jsEventsRegister = [];     // items: { eventType, cb, realEventType, realCb }
        this.whenConnectedQ = [];
        this.connected = false;
        this.createWorker();
    }

    /**
     * Creates the Web Worker and sends it the 'start' action with config and userInfo.
     * A random query string is appended to the worker URL on every launch.
     */
    createWorker() {
        if(!this.config.pathToWorker.endsWith('.js')) this.config.pathToWorker+='.js';
        this.MessageBusWorker = new Worker(this.config.pathToWorker+'?'+crypto.randomUUID());
        // Listen before talking, so no reply can be missed.
        this.MessageBusWorker.onmessage = this.receiveFromWorker.bind(this);
        this.MessageBusWorker.postMessage({ 'action':'start', 'config': this.config, 'userInfo': this.userInfo });
        if(this.config.debug) console.log('Websocket worker launched.');
    }

    /**
     * Runs callBack as soon as the bus is connected: immediately if it already is,
     * otherwise once, on the next 'connected' event. Non-functions are ignored.
     * Queued callbacks are dropped when a 'closed' event is received.
     *
     * @param {function} callBack
     */
    whenConnected(callBack){
        if(typeof(callBack) != 'function') return;
        if(this.connected) callBack();
        else this.whenConnectedQ.push(callBack);
    }

    /**
     * Promise flavour of whenConnected().
     *
     * @param {number} timeout   Milliseconds before giving up; 0 (default) waits forever.
     * @returns {Promise}        Resolves (without value) once connected, rejects with an Error on timeout.
     */
    whenConnectedP(timeout=0){
        return new Promise((resolve, reject) => {
            let settled = false;
            let timerId = null;
            const onConnected = () => {
                settled = true;
                if(timerId !== null) clearTimeout(timerId);
                resolve();
            };
            this.whenConnected(onConnected);
            // When already connected, onConnected ran synchronously: do not arm a useless timer.
            if(timeout>0 && !settled){
                timerId = setTimeout(() => {
                    // Do not leave a dead callback behind in the queue.
                    this.whenConnectedQ = this.whenConnectedQ.filter(queued => queued !== onConnected);
                    reject(new Error(`Timeout (>${timeout}ms) waiting for the MessageBus connection`));
                }, timeout);
            }
        });
    }

    /**
     * Runs callBack only if the bus is connected right now; otherwise does nothing.
     *
     * @param {function} callBack
     */
    ifConnected(callBack){
        if(typeof(callBack) != 'function') return;
        if(this.connected) callBack();
    }

    /**
     * Runs, then forgets, every callback queued by whenConnected().
     * The queue is drained first so a later 'connected' event cannot run a callback twice,
     * and a throwing callback does not prevent the following ones from running.
     */
    executewhenConnectedQ() {
        const pending = this.whenConnectedQ;
        this.whenConnectedQ = [];
        for(const callBack of pending){
            try {
                callBack();
            } catch(err) {
                console.error('MessageBus whenConnected callback failed:', err);
            }
        }
    }

    /**
     * Internal helper: returns chan prefixed with config.frontBusPrefix, unless it already is.
     *
     * @param {string} chan
     * @returns {string}
     */
    prefixChan(chan){
        const prefix = this.config.frontBusPrefix;
        return chan.startsWith(prefix) ? chan : prefix+chan;
    }

    /**
     * Internal helper shared by the request* methods: registers a pending request under reqid,
     * arms its timeout, then calls dispatch() to actually emit the request.
     *
     * @param {string} reqid        Key used in promisesRegister.
     * @param {string} action       Only used in the timeout message.
     * @param {number} timeOut      Milliseconds before the promise is rejected.
     * @param {function} dispatch   Emits the request; if it throws, the promise is rejected with that error.
     * @returns {Promise}           Settled by settleRequest() or by the timeout.
     */
    trackRequest(reqid, action, timeOut, dispatch){
        return new Promise((resolve, fail) => {
            const timeOutID = setTimeout(() => {
                delete this.promisesRegister[reqid];
                fail(`Timeout (>${timeOut}ms) for action ${action}`);
            }, timeOut);
            this.promisesRegister[reqid] = [resolve, fail, timeOutID];
            try {
                dispatch();
            } catch(err) {
                // Nothing was sent: release the timer and the register slot right away.
                clearTimeout(timeOutID);
                delete this.promisesRegister[reqid];
                fail(err);
            }
        });
    }

    /**
     * Internal helper: settles and forgets the pending request registered under reqid.
     *
     * @param {string} reqid
     * @param {*} success           Truthy resolves with payload, falsy rejects with failureMessage.
     * @param {*} payload
     * @param {string} failureMessage
     * @returns {boolean}           false when no such request is pending.
     */
    settleRequest(reqid, success, payload, failureMessage){
        // Own-property check: a reqid such as "toString" must not match an inherited key.
        if(!Object.prototype.hasOwnProperty.call(this.promisesRegister, reqid)) return false;
        const [resolve, fail, timeOutID] = this.promisesRegister[reqid];
        clearTimeout(timeOutID);                // Stop timeout timer
        delete this.promisesRegister[reqid];    // A request is settled once only
        if(success) resolve(payload);
        else fail(failureMessage);
        return true;
    }

    /**
     * Request-reply an action from the WSSGateway.
     * This is a pure websocket exchange between client and WssGW.
     * This request does not pass through the (Redis) bus.
     *
     * @param {string} action
     * @param {*} payload
     * @param {number} timeOut    Milliseconds before the promise is rejected.
     * @returns {Promise|undefined}  undefined when action is falsy; otherwise a promise resolved with
     *                               the reply payload, or rejected with a message string.
     */
    requestWssGwAction(action, payload=null, timeOut=5000){
        if(!action) return;
        const request = {'action':action, 'payload':payload, 'reqid':crypto.randomUUID()};
        return this.trackRequest(request.reqid, action, timeOut, () => {
            this.MessageBusWorker.postMessage(request);
        });
    }

    /**
     * Request-reply an action towards an agent on the bus (normally infra, like HttpGw).
     * This request will pass through the (Redis) bus.
     * The reply will come on my own user notification channel.
     *
     * @param {string} chan       Target chan; config.frontBusPrefix is prepended when missing.
     * @param {string} action
     * @param {*} payload
     * @param {number} timeOut    Milliseconds before the promise is rejected.
     * @returns {Promise|undefined}  undefined when action is falsy.
     */
    requestBusAction(chan, action, payload=null, timeOut=5000){
        if(!action) return;
        const request = {'action':action, 'payload':payload, 'reqid':crypto.randomUUID()};
        return this.trackRequest(request.reqid, action, timeOut, () => {
            this.send(this.prefixChan(chan), JSON.stringify(request));
        });
    }

    /**
     * Request-reply an action towards Midas.
     * This request will pass through the (Redis) bus.
     * The reply will come on my own user notification channel.
     * Unlike requestBusAction(), action and data are nested inside "payload".
     *
     * @param {string} chan       Target chan; config.frontBusPrefix is prepended when missing.
     * @param {string} action
     * @param {*} data
     * @param {number} timeOut    Milliseconds before the promise is rejected.
     * @returns {Promise|undefined}  undefined when action is falsy.
     */
    requestMidasAction(chan, action, data=null, timeOut=5000){
        if(!action) return;
        const request = {payload: {'action':action, 'data':data}, reqid: crypto.randomUUID()};
        return this.trackRequest(request.reqid, action, timeOut, () => {
            this.send(this.prefixChan(chan), JSON.stringify(request));
        });
    }

    /**
     * Publishes an applicative event { eventType, payload } on a chan.
     *
     * @param {string} chan          config.frontBusPrefix is prepended when missing.
     * @param {string} eventType
     * @param {*} eventPayload
     */
    sendEvent(chan, eventType, eventPayload){
        this.send(
            this.prefixChan(chan),
            JSON.stringify({ eventType: eventType, payload: eventPayload })
        );
    }

    /**
     * Asks the worker to publish msg on chan ('PUB' action). No reply is tracked.
     *
     * @param {string} chan
     * @param {string} msg
     */
    send(chan, msg){
        // No check against activeSubscriptions on purpose:
        // you can publish to an unsubscribed chan, userchans are the best example !
        this.MessageBusWorker.postMessage({'action':'PUB', 'payload': { 'chan':chan, 'msg': msg}});
    }

    /**
     * Registers a bus event, filtering on allowed incoming chans.
     * => Callback takes arguments (chan, payload, sender)
     *    where chan is the actual chan that carried the event eventType.
     *
     * Filtering is important because you could have eventType = 'update',
     * arriving on chans like 'dataChange:proposal' and 'dataChange:organisation' (thus for different actions).
     * Besides, you don't want to react for example on 'growl' if it's arriving on
     * some chan publishable by another user and misused by him.
     *
     * @param {string} eventType
     * @param {Array|string} filterChans  Allowed chan(s). Globbing with '*' is allowed.
     * @param {function} cb               Called with (chan, payload, sender).
     * @param {string} scope              Appended to 'MessageBus' and passed on to app.events.addEvent.
     */
    addBusListener(eventType, filterChans, cb, scope=''){
        // A single chan given as a string is accepted as a one-item list.
        const allowedChans = (typeof filterChans === 'string') ? [filterChans] : filterChans;
        const realCb = (e => {
            const realChan = e.detail.chan;
            if(!allowedChans.some(filterChan => this.chanMatch(realChan, filterChan))) return;
            cb(realChan, e.detail.payload, e.detail.sender);
        });
        const realEventType = 'MessageBus.event.'+eventType;
        app.events.addEvent(realEventType, realCb, 'MessageBus'+scope);
        this.bus2jsEventsRegister.push({
            eventType: eventType,
            cb: cb,
            realEventType : realEventType,
            realCb: realCb
        });
    }

    /**
     * De-registers bus event(s).
     * If several listeners have the same type and the same callback, they are all wiped,
     * and they are forgotten by the internal register too.
     * NOTE(review): the register does not record the scope used at registration, so every match
     * is removed with the scope given here — confirm how app.events.removeEvent treats a different scope.
     *
     * @param {string} eventType
     * @param {function} cb
     * @param {string} scope
     */
    removeBusListener(eventType, cb, scope=''){
        const kept = [];
        for(const item of this.bus2jsEventsRegister){
            if((item.eventType === eventType) && (item.cb === cb)){
                app.events.removeEvent(item.realEventType, item.realCb, 'MessageBus'+scope);
            } else {
                kept.push(item);
            }
        }
        this.bus2jsEventsRegister = kept;
    }

    /**
     * Subscribe to channels ('SUB' gateway action).
     *
     * @param {object} channels
     * @returns {Promise}
     */
    subscribe(channels){
        return this.requestWssGwAction('SUB', channels);
    }

    /**
     * Unsubscribe from channels ('UNSUB' gateway action).
     * NOTE(review): activeSubscriptions is not updated from the UNSUB reply.
     *
     * @param {*} channels
     * @returns {Promise}
     */
    unSubscribe(channels) {
        return this.requestWssGwAction('UNSUB', channels);
    }

    /**
     * Get current subscriptions list ('SUBLST' gateway action).
     *
     * @returns {Promise}
     */
    subscriptionsList() {
        return this.requestWssGwAction('SUBLST');
    }

    /**
     * Get channel history ('CHANHIST' gateway action).
     *
     * @param {*} channel
     * @param {*} from
     * @param {*} [to]     Only sent when truthy.
     * @returns {Promise}
     */
    chanHistory(channel, from, to){
        const payload = { channel: channel, from: from };
        if(to) payload['to'] = to;
        return this.requestWssGwAction('CHANHIST', payload);
    }

    /**
     * Helper method to match a chan with globbing.
     * Each '*' of targetChan stands for one or more characters; every other character is
     * matched literally (regex metacharacters are escaped).
     *
     * @param {string} myChan      (no glob)
     * @param {string} targetChan  (possible glob)
     * @returns {boolean}
     */
    chanMatch(myChan, targetChan) {
        const pattern = targetChan
            .split('*')
            .map(literal => literal.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'))
            .join('(.+)');
        return new RegExp('^'+pattern+'$').test(myChan);
    }

    /**
     * Handler for messages posted by the worker.
     * 'ReceiveFromServer' carries a JSON string coming from the server; the other events
     * ('connected', 'closed') are generated by the worker itself, about the connection.
     * Messages without an "event" key are ignored.
     *
     * @param {MessageEvent} e
     */
    receiveFromWorker(e) {
        const workermsg = e.data;
        if(workermsg === null || typeof workermsg !== 'object' || !('event' in workermsg)) return;
        switch(workermsg.event){
            case 'ReceiveFromServer': {
                let srvdata;
                try {
                    srvdata = JSON.parse(workermsg.data);
                } catch(err) {
                    // A malformed frame must not kill the handler.
                    console.warn('Undecodable Websocket Worker message:', workermsg, err);
                    break;
                }
                this.receiveFromServer(srvdata);
                break;
            }
            case 'connected':
                this.connected = true;
                if(this.config.debug) console.log('received connected event from worker !');
                this.executewhenConnectedQ();
                app.events.trigger('MessageBus.Connected');
                break;
            case 'closed':
                if(this.config.debug) console.log('received closed event from worker!');
                this.activeSubscriptions = [];
                // NOTE(review): nothing else in this class reads callBacksRegister; pending requests
                // stay in promisesRegister and end through their own timeout. Confirm the intent.
                this.callBacksRegister = { };
                this.whenConnectedQ = [];
                this.connected = false;
                app.events.trigger('MessageBus.Closed');
                break;
            default:
                if(this.config.debug) console.warn('Unknown Websocket Worker message:', workermsg);
        }
    }

    /**
     * Internal helper: adds chans to activeSubscriptions, skipping the ones already known.
     *
     * @param {Array|string|null} chans
     */
    mergeSubscriptions(chans){
        if(chans === null || chans === undefined) return;
        const incoming = Array.isArray(chans) ? chans : [chans];
        const fresh = [];
        for(const chan of incoming){
            if(!this.activeSubscriptions.includes(chan) && !fresh.includes(chan)) fresh.push(chan);
        }
        if(fresh.length > 0) this.activeSubscriptions = this.activeSubscriptions.concat(fresh);
    }

    /**
     * Internal helper: handles a low-level 'REDISMSG' event, i.e. a message that travelled on the bus.
     *
     * @param {object} srvdata
     */
    receiveRedisMessage(srvdata){
        const payload = ('payload' in srvdata) ? srvdata.payload : null;
        const msg = payload ? payload.msg : null;
        if(msg && (msg.eventType || msg.action)) {
            if(msg.eventType){
                // Applicative event: re-emit it for the listeners added with addBusListener()
                app.events.trigger('MessageBus.event.'+msg.eventType, {
                    chan: payload.chan,
                    sender: msg.sender,
                    eventType: msg.eventType,
                    payload: msg.payload,
                });
            } else if(msg.reqid) {
                // Reply to a requestBusAction() / requestMidasAction()
                const actionPayload = ('payload' in msg) ? msg.payload : null;
                const err = ('err' in msg) ? msg.err : null;
                this.settleRequest(msg.reqid, msg.success, actionPayload,
                    `MsgBus action "${msg.action}" failed.\nError: ${err}`);
            }
            app.events.trigger('MessageBus.anyMessage', {
                chan: payload.chan,
                msg : msg,
            });
        } else if(payload && payload.bmsg){
            // Repill msg : decapsulate & use specific event
            app.events.trigger('MessageBus.promiscuousMessage', {
                chan: payload.bmsg.chan,
                msg : payload.bmsg.msg,
            });
        } else {
            console.warn('Weird bus message (discarted) :', srvdata);
        }
    }

    /**
     * Handles a decoded message coming from the server.
     * See the protocol reminder comment at the top of this file.
     *
     * @param {object} srvdata
     * @param {string} [srvdata.action]   Present on gateway replies. Possible values: 'SUB', 'SUBLST', ...
     * @param {string} [srvdata.event]    'REDISMSG' for a message that travelled on the bus.
     * @param {string} [srvdata.reqid]
     * @param {object} [srvdata.payload]  response payload
     * @param {object} [srvdata.err]      response error
     * @param {boolean} [srvdata.success]
     */
    receiveFromServer(srvdata) {
        if(srvdata === null || typeof srvdata !== 'object'){
            console.warn('Weird bus message (discarted) :', srvdata);
            return;
        }
        const payload = ('payload' in srvdata) ? srvdata.payload : null;

        if('action' in srvdata){ // Reply to a request
            // Piggyback on the results of some actions for this module internal use
            switch(srvdata.action){
                case 'SUB':
                case 'SUBLST':
                    this.mergeSubscriptions(payload);
                    break;
            }
            app.events.trigger('MessageBus.anyAction', srvdata);
        } else if(('event' in srvdata) && (srvdata.event == 'REDISMSG')){
            // Low-level event : Redis Event, contrary to requ/reply with wssGateway
            this.receiveRedisMessage(srvdata);
        }

        // For request-reply, settle promise
        if(srvdata.reqid){
            const err = ('err' in srvdata) ? srvdata.err : null;
            this.settleRequest(srvdata.reqid, srvdata.success, payload, `MsgBus action failed.\nError: ${err}`);
        }
    }
}
|
||
|
||
// Make the class available to the application under the name 'MessageBus'.
app.registerClass('MessageBus', MessageBus);