unclean SPARC
This commit is contained in:
Executable
+444
@@ -0,0 +1,444 @@
|
||||
'use strict'
|
||||
/**
|
||||
* PROTOCOL
|
||||
|
||||
|
||||
Application-level payloads are always JSON and always either an action, or an event :
|
||||
|
||||
1. ACTIONS : are made for request-reply.
|
||||
They are aimed at the dialogue between the FE (mainly messageBus core modules) and WSSGateway.
|
||||
These messages are identified by the fact there is an "action" key, top level.
|
||||
|
||||
Example : The FE asks WSSGateway to subscribe to Redis chans :
|
||||
Request:
|
||||
{ "action" : "SUB", // Must be a valid wssGateway action.
|
||||
"payload": ["chan1", "chan2"], // Any type required by the action
|
||||
"reqid": "987654321-abcdef-123456"
|
||||
}
|
||||
|
||||
Reply:
|
||||
{ "action" : "SUB",
|
||||
"payload": ["chan1", "chan9"], // probably you were already subscribed to chan9,
|
||||
"reqid": "987654321-abcdef-123456" // don't have to right to chan2, but succeeded subscribing to chan1
|
||||
}
|
||||
|
||||
Newton principle applied to WSSG:
|
||||
When there is an action in one direction (request),
|
||||
there is the same action in the opposite direction (reply).
|
||||
|
||||
When doing a request, the FE can optionally include a "reqid", with a uuid.
|
||||
It then has the guarnatee that the corresponding reply will contain the same reqid.
|
||||
As you can receive a reply on a particular action in any number, at any time,
|
||||
this allows the FE to match one specific action request with its specific reply.
|
||||
This, in turn, allows this module to provide action-promise and action-timeouts.
|
||||
|
||||
2. EVENTS : are any other events circulating on the bus, thus on a REDIS channel.
|
||||
They are triggered by another actor on the bus, and have nothing to do with FE-WssGW dialog .
|
||||
These messages are identified by the fact there is an "event" key, top level.
|
||||
So far, this core-module has no use of bus-events, they are considered as applicative-level-use only.
|
||||
Therefore, this module just triggers a corresponding (javascript) event, for any potential listener in the app.
|
||||
|
||||
{ "eventType" : "PropaSubmitted", // Any applicative thing
|
||||
"payload": { // Any type depending on applicative convention for this event
|
||||
"propaNumber": "123456",
|
||||
"propaAcronym": "Tintin"
|
||||
}
|
||||
}
|
||||
|
||||
Will trigger a "MessageBus.PropaSubmitted" javascript event, with
|
||||
"detail":
|
||||
{ msg: {
|
||||
eventType: "PropaSubmitted",
|
||||
payload: { "propaNumber": "123456",
|
||||
"propaAcronym": "Tintin"
|
||||
}
|
||||
},
|
||||
chan: "wssGateway:chan1:subchan2",
|
||||
}
|
||||
|
||||
--------------- Low-level, WEBSOCKET ---------------
|
||||
{ "event":"REDISMSG", // low level
|
||||
"payload":{ // low level
|
||||
"msg":{ // low level
|
||||
"eventType":"PropaSubmitted", // APP LEVEL MESSAGE = Redis payload
|
||||
"payload":{ // APP LEVEL MESSAGE = Redis payload
|
||||
"propaNumber": "123456", // APP LEVEL MESSAGE = Redis payload
|
||||
"propaAcronym": "Tintin" // APP LEVEL MESSAGE = Redis payload
|
||||
}, // APP LEVEL MESSAGE = Redis payload
|
||||
sender: "N007xyz" // APP LEVEL MESSAGE = Redis payload => added by gateways !
|
||||
}, // low level
|
||||
"chan":"wssGateway:chan1:subchan2" // low level = Redis channel
|
||||
},
|
||||
}
|
||||
*
|
||||
* @author Nicolas Stein
|
||||
* @category Core
|
||||
* @subcategory Libraries
|
||||
*/
|
||||
class MessageBus {
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {*} config
|
||||
* @param {*} userInfo
|
||||
*/
|
||||
constructor(config, userInfo){
|
||||
this.config = config
|
||||
if(this.config.debug) console.log('Lauching Websocket worker...');
|
||||
this.config.hostname = (('host' in this.config) && ( this.config.host!='')) ? this.config.host : document.location.hostname
|
||||
this.userInfo = userInfo
|
||||
this.createWorker();
|
||||
this.activeSubscriptions = [];
|
||||
this.promisesRegister = { };
|
||||
this.bus2jsEventsRegister = []; // items: { eventType:'string', RegisteredCb: function, realCb: function }
|
||||
this.whenConnectedQ = [];
|
||||
this.connected = false;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
createWorker() {
|
||||
if(!this.config.pathToWorker.endsWith('.js')) this.config.pathToWorker+='.js';
|
||||
this.MessageBusWorker = new Worker(this.config.pathToWorker+'?'+crypto.randomUUID());
|
||||
this.MessageBusWorker.postMessage({ 'action':'start', 'config': this.config, 'userInfo': this.userInfo });
|
||||
this.MessageBusWorker.onmessage = this.receiveFromWorker.bind(this);
|
||||
if(this.config.debug) console.log('Websocket worker launched.');
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {*} callBack
|
||||
*/
|
||||
whenConnected(callBack){
|
||||
if(typeof(callBack) != 'function') return;
|
||||
if(this.connected) callBack();
|
||||
else this.whenConnectedQ.push(callBack);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {*} timeout
|
||||
* @returns {Promise}
|
||||
*/
|
||||
whenConnectedP(timeout=0){
|
||||
return(
|
||||
new Promise((resolve,reject) => {
|
||||
this.whenConnected(resolve)
|
||||
if(timeout>0) setTimeout(reject, timeout)
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {*} callBack
|
||||
*/
|
||||
ifConnected(callBack){
|
||||
if(typeof(callBack) != 'function') return;
|
||||
if(this.connected) callBack();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
executewhenConnectedQ() { for(var callBack of this.whenConnectedQ) callBack(); }
|
||||
|
||||
|
||||
/**
|
||||
* Request-reply an action from the WSSGateway
|
||||
* This is a pure websocket exchange between client and WssGW.
|
||||
* This request does not pass through the (Redis) bus.
|
||||
* This method gives (and resolves) a promise, taking care of all lower-level details
|
||||
*/
|
||||
requestWssGwAction(action, payload=null, timeOut=5000){
|
||||
if(!action) return;
|
||||
let request = {'action':action, 'payload':payload};
|
||||
request.reqid = crypto.randomUUID();
|
||||
return(new Promise((resolve, fail) => {
|
||||
let timeOutID = setTimeout(() => {
|
||||
fail(`Timeout (>${timeOut}ms) for action ${action}`);
|
||||
if(this.promisesRegister[request.reqid]) delete(this.promisesRegister[request.reqid])
|
||||
}, timeOut);
|
||||
this.promisesRegister[request.reqid] = [resolve, fail, timeOutID];
|
||||
this.MessageBusWorker.postMessage(request);
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* Request-reply an action towards an agent on the bus (normally infra, like HttpGw)
|
||||
* This request will pass through the (Redis) bus.
|
||||
* The reply will come on my own user notification channel.
|
||||
* This method gives (and resolves) a promise, taking care of all lower-level details
|
||||
*/
|
||||
requestBusAction(chan, action, payload=null, timeOut=5000){
|
||||
if(!action) return;
|
||||
let request = {'action':action, 'payload':payload};
|
||||
request.reqid = crypto.randomUUID();
|
||||
return(new Promise((resolve, fail) => {
|
||||
let timeOutID = setTimeout(() => {
|
||||
fail(`Timeout (>${timeOut}ms) for action ${action}`);
|
||||
if(this.promisesRegister[request.reqid]) delete(this.promisesRegister[request.reqid])
|
||||
}, timeOut);
|
||||
this.promisesRegister[request.reqid] = [resolve, fail, timeOutID];
|
||||
if(!chan.startsWith(this.config.frontBusPrefix)) chan = this.config.frontBusPrefix+chan
|
||||
this.send(chan, JSON.stringify(request))
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* Request-reply an action towards Midas
|
||||
* This request will pass through the (Redis) bus.
|
||||
* The reply will come on my own user notification channel.
|
||||
* This method gives (and resolves) a promise, taking care of all lower-level details
|
||||
*/
|
||||
requestMidasAction(chan, action, data=null, timeOut=5000){
|
||||
if(!action) return;
|
||||
let request = {payload: {'action':action, 'data':data}}
|
||||
request.reqid = crypto.randomUUID();
|
||||
return(new Promise((resolve, fail) => {
|
||||
let timeOutID = setTimeout(() => {
|
||||
fail(`Timeout (>${timeOut}ms) for action ${action}`);
|
||||
if(this.promisesRegister[request.reqid]) delete(this.promisesRegister[request.reqid])
|
||||
}, timeOut);
|
||||
this.promisesRegister[request.reqid] = [resolve, fail, timeOutID];
|
||||
if(!chan.startsWith(this.config.frontBusPrefix)) chan = this.config.frontBusPrefix+chan
|
||||
this.send(chan, JSON.stringify(request))
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {*} chan
|
||||
* @param {*} eventType
|
||||
* @param {*} eventPayload
|
||||
*/
|
||||
sendEvent(chan, eventType, eventPayload){
|
||||
if(!chan.startsWith(this.config.frontBusPrefix)) chan = this.config.frontBusPrefix+chan
|
||||
this.send(
|
||||
chan,
|
||||
JSON.stringify({ eventType: eventType,
|
||||
payload: eventPayload
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {*} chan
|
||||
* @param {*} msg
|
||||
*/
|
||||
send(chan, msg){
|
||||
// You can publish to an unsubscribed chan, userchans are the best example !
|
||||
// if(this.activeSubscriptions.indexOf(chan)<0) return;
|
||||
var request = {'action':'PUB', 'payload': { 'chan':chan, 'msg': msg}};
|
||||
this.MessageBusWorker.postMessage(request);
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a bus event, filtering on allowed incoming chans.
|
||||
* => Callback takes arguments (chan, eventType, payload)
|
||||
* where chan is the actual chan that carried the event eventType
|
||||
*
|
||||
* Filtering is important because you could have evenType = 'update',
|
||||
* arriving on chans like 'dataChange:proposal' and 'dataChange:organisation' (thus for different actions).
|
||||
* Besides, you don't want to react for example on 'growl' if it's arriving on
|
||||
* some chan publishable by another user and misused by him.
|
||||
*
|
||||
* @param {string} eventType
|
||||
* @param {Array} filterChans Array of allowed chans (string). Globbing with '*' is allowed.
|
||||
* @param {*} callback
|
||||
*/
|
||||
addBusListener(eventType, filterChans, cb, scope=''){
|
||||
let realCb = (e => {
|
||||
let realChan = e.detail.chan
|
||||
if(filterChans.every(filterChan => (!this.chanMatch(realChan, filterChan)))) return
|
||||
cb(realChan, e.detail.payload, e.detail.sender)
|
||||
})
|
||||
let realEventType = 'MessageBus.event.'+eventType
|
||||
app.events.addEvent(realEventType, realCb, 'MessageBus'+scope)
|
||||
this.bus2jsEventsRegister.push({
|
||||
eventType: eventType,
|
||||
cb: cb,
|
||||
realEventType : realEventType,
|
||||
realCb: realCb
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* De-registers bus event(s)
|
||||
* If several events of the same type, same calback, then they are all whiped
|
||||
*/
|
||||
removeBusListener(eventType, cb, scope=''){
|
||||
let toKick = this.bus2jsEventsRegister.filter(
|
||||
item => ((item.eventType==eventType) && (item.cb==cb))
|
||||
)
|
||||
for(let kickItem of toKick){
|
||||
app.events.removeEvent(kickItem.realEventType, kickItem.realCb, 'MessageBus'+scope)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Subscribe to channels
|
||||
*
|
||||
* @param {object} channels
|
||||
* @returns {object}
|
||||
*/
|
||||
subscribe(channels){
|
||||
return(this.requestWssGwAction('SUB', channels))
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsubscribe from channels
|
||||
*
|
||||
* @param {*} channels
|
||||
* @returns {object}
|
||||
*/
|
||||
unSubscribe(channels) { return(this.requestWssGwAction('UNSUB', channels)) }
|
||||
|
||||
/**
|
||||
* Get current subscriptions list
|
||||
* @returns {object}
|
||||
*/
|
||||
subscriptionsList() { return(this.requestWssGwAction('SUBLST')) }
|
||||
|
||||
/**
|
||||
* Get channel history
|
||||
*
|
||||
* @param {*} channels
|
||||
* @returns {object}
|
||||
*/
|
||||
chanHistory(channel, from, to){
|
||||
let payload = {
|
||||
channel: channel,
|
||||
from: from
|
||||
}
|
||||
if(to) payload['to'] = to
|
||||
return(this.requestWssGwAction('CHANHIST', payload))
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to match a chan with globbing
|
||||
*
|
||||
* @param {string} myChan (no glob)
|
||||
* @param {string} targetChan (possible glob)
|
||||
* @returns {boolean}
|
||||
*/
|
||||
chanMatch(myChan, targetChan) {
|
||||
let re = new RegExp('^'+targetChan.replace(/\*/g,'(.+)')+'$','g')
|
||||
return(myChan.match(re)!=null)
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {Event} event
|
||||
*/
|
||||
receiveFromWorker(e) {
|
||||
var workermsg = e.data;
|
||||
if('event' in workermsg){
|
||||
// event "ReceiveFromServer" is the general case of a message from server, found in data, with its own struct.
|
||||
// other type og event are generated by the worker, about the connection
|
||||
switch(workermsg.event){
|
||||
case 'ReceiveFromServer':
|
||||
this.receiveFromServer(JSON.parse(workermsg.data));
|
||||
break;
|
||||
case 'connected':
|
||||
this.connected = true;
|
||||
if(this.config.debug) console.log('received connected event from worker !');
|
||||
this.executewhenConnectedQ();
|
||||
app.events.trigger('MessageBus.Connected');
|
||||
break;
|
||||
|
||||
case 'closed':
|
||||
if(this.config.debug) console.log('received closed event from worker!');
|
||||
this.activeSubscriptions = [];
|
||||
this.callBacksRegister = { };
|
||||
this.whenConnectedQ = [];
|
||||
this.connected = false;
|
||||
app.events.trigger('MessageBus.Closed');
|
||||
break;
|
||||
default:
|
||||
if(this.config.debug) console.warn('Unknown Websocket Worker message:', workermsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {object} data
|
||||
* @param {string} data.action Possible values: 'SUB', 'SUBLST', ...
|
||||
* @param {string} [data.reqid]
|
||||
* @param {object} [data.payload] response payload
|
||||
* @param {object} [data.err] response error
|
||||
* @param {boolean} [data.success]
|
||||
*/
|
||||
receiveFromServer(srvdata) {
|
||||
// See protocol reminder comment at the bottom
|
||||
if('action' in srvdata){ // Reply to a request
|
||||
let action = srvdata.action;
|
||||
let payload = ('payload' in srvdata) ? srvdata.payload : null;
|
||||
// Piggyback on the results of some actions for this module internal use
|
||||
switch(action){
|
||||
case 'SUB':
|
||||
if(this.activeSubscriptions.indexOf(payload)<0) this.activeSubscriptions = this.activeSubscriptions.concat(payload);
|
||||
break;
|
||||
case 'SUBLST':
|
||||
if(this.activeSubscriptions.indexOf(payload)<0) this.activeSubscriptions = this.activeSubscriptions.concat(payload);
|
||||
break;
|
||||
}
|
||||
app.events.trigger('MessageBus.anyAction', srvdata);
|
||||
} else { // Low-level event : Redis Event, contrary to requ/reply with wssGateway, or other later
|
||||
if(('event' in srvdata) && (srvdata.event == 'REDISMSG')){
|
||||
var payload = ('payload' in srvdata) ? srvdata.payload : null;
|
||||
if(payload && payload.msg && (payload.msg.eventType || payload.msg.action)) {
|
||||
if(payload.msg.eventType){
|
||||
var eventType = payload.msg.eventType;
|
||||
app.events.trigger('MessageBus.event.'+eventType, {
|
||||
chan: payload.chan,
|
||||
sender: payload.msg.sender,
|
||||
eventType: payload.msg.eventType,
|
||||
payload: payload.msg.payload,
|
||||
});
|
||||
} else if(payload.msg.action && payload.msg.reqid) {
|
||||
let reqid = payload.msg.reqid;
|
||||
let action = payload.msg.action;
|
||||
let actionPayload = ('payload' in payload.msg) ? payload.msg.payload : null;
|
||||
let err = ('err' in payload.msg) ? payload.msg.err : null;
|
||||
let success = payload.msg.success;
|
||||
if(reqid in this.promisesRegister) {
|
||||
clearTimeout(this.promisesRegister[reqid][2]); // Stop timeout timer
|
||||
if(success) this.promisesRegister[reqid][0](actionPayload); // resolve
|
||||
else this.promisesRegister[reqid][1](`MsgBus action "${action}" failed.\nError: ${err}`); // Fail
|
||||
}
|
||||
}
|
||||
app.events.trigger('MessageBus.anyMessage', {
|
||||
chan: payload.chan,
|
||||
msg : payload.msg,
|
||||
});
|
||||
} else if(payload && payload.bmsg){
|
||||
app.events.trigger('MessageBus.promiscuousMessage', { // Repill msg : decapsulate & use spcific event
|
||||
chan: payload.bmsg.chan,
|
||||
msg : payload.bmsg.msg,
|
||||
});
|
||||
}
|
||||
else {
|
||||
console.warn('Weird bus message (discarted) :', srvdata)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// For request-reply, settle promise
|
||||
if(srvdata.reqid && (srvdata.reqid in this.promisesRegister)) {
|
||||
let payload = ('payload' in srvdata) ? srvdata.payload : null;
|
||||
let err = ('err' in srvdata) ? srvdata.err : null;
|
||||
let success = srvdata.success;
|
||||
clearTimeout(this.promisesRegister[srvdata.reqid][2]); // Stop timeout timer
|
||||
if(success) this.promisesRegister[srvdata.reqid][0](payload); // resolve
|
||||
else this.promisesRegister[srvdata.reqid][1](`MsgBus action failed.\nError: ${err}`); // Fail
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
app.registerClass('MessageBus', MessageBus);
|
||||
Reference in New Issue
Block a user