From cf292031fdc4e1c75917dd169ae5f04c73783a23 Mon Sep 17 00:00:00 2001 From: STEINNI Date: Fri, 26 Jun 2026 21:56:19 +0000 Subject: [PATCH] threetobus now uses [UID] template, MessageBus upgraded to cnxId --- app/thirdparty/Snaptobus/snaptobus.js | 2 +- .../Threetobus/threetobus.module.js | 21 +-- core/libs/MessageBus.js | 145 +++++++++--------- doc/prompts_blobs.txt | 2 + 4 files changed, 81 insertions(+), 89 deletions(-) diff --git a/app/thirdparty/Snaptobus/snaptobus.js b/app/thirdparty/Snaptobus/snaptobus.js index d998723..cb43f40 100644 --- a/app/thirdparty/Snaptobus/snaptobus.js +++ b/app/thirdparty/Snaptobus/snaptobus.js @@ -99,7 +99,7 @@ class Snaptobus{ processBusEvent(eventType, chan, payload, userId, x){ - const chanObj = this._curBusConfig.find(item => item.chan==chan) + const chanObj = this._curBusConfig.find(item => app.MessageBus.chanMatch(chan, item.chan)) if(!chanObj) return const eventObj = chanObj.events.find(item => item.eventName==eventType) if(!eventObj) return diff --git a/app/thirdparty/Threetobus/threetobus.module.js b/app/thirdparty/Threetobus/threetobus.module.js index f9bbdb2..d91674a 100644 --- a/app/thirdparty/Threetobus/threetobus.module.js +++ b/app/thirdparty/Threetobus/threetobus.module.js @@ -33,31 +33,14 @@ export class Threetobus{ if(this._frustumWatching) this._scheduleFrustumResubscribe() } - resolveSubscriberChan(chanTemplate) { - if(typeof(chanTemplate) !== 'string') return(null) - const uid = app.User?.identity?.uuid - if(!uid) return(null) - return(chanTemplate.replace(/\[UID\]/g, uid).replace(/\{uid\}/g, uid)) - } - - _resolvedEventsMapping() { - if(!Array.isArray(this._stagedEventsMapping)) return([]) - return(this._stagedEventsMapping.map(chanObj => { - const chan = this.resolveSubscriberChan(chanObj.chan) - if(!chan) return(null) - return({ ...chanObj, chan }) - }).filter(Boolean)) - } - get EventsMapping() { return this._stagedEventsMapping } get liveEventsMapping() { return this._curEventsMapping } set EventsMapping(newConfig) { this._stagedEventsMapping = newConfig } async commitConfig(){ - const resolvedMapping = this._resolvedEventsMapping() const chansToAdd = [] const chansToKeep = [] - for(const chanObj of resolvedMapping){ + for(const chanObj of this._stagedEventsMapping){ if(this._curEventsMapping.map(item => item.chan).includes(chanObj.chan)) chansToKeep.push(chanObj) else chansToAdd.push(chanObj) } @@ -91,7 +74,7 @@ export class Threetobus{ app.MessageBus.removeBusListener(eventToDel.eventName, this.processBusEvent.bind(this, eventToDel.eventName), 'threetobus') } - this._curEventsMapping = this.deepClone(resolvedMapping) + this._curEventsMapping = this.deepClone(this._stagedEventsMapping) } deepClone(obj) { // Needed because structuredClone doesn't take functions (and we have transformers) diff --git a/core/libs/MessageBus.js b/core/libs/MessageBus.js index abe1891..ccbae2e 100755 --- a/core/libs/MessageBus.js +++ b/core/libs/MessageBus.js @@ -84,26 +84,26 @@ class MessageBus { */ constructor(config, userInfo){ this.config = config - if(this.config.debug) console.log('Lauching Websocket worker...'); + if(this.config.debug) console.log('Lauching Websocket worker...') this.config.hostname = (('host' in this.config) && ( this.config.host!='')) ? this.config.host : document.location.hostname this.userInfo = userInfo - this.createWorker(); + this.createWorker() this.activeSubscriptions = []; - this.promisesRegister = { }; + this.promisesRegister = { } this.bus2jsEventsRegister = []; // items: { eventType:'string', RegisteredCb: function, realCb: function } - this.whenConnectedQ = []; - this.connected = false; + this.whenConnectedQ = [] + this.connected = false } /** * */ createWorker() { - if(!this.config.pathToWorker.endsWith('.js')) this.config.pathToWorker+='.js'; - this.MessageBusWorker = new Worker(this.config.pathToWorker+'?'+crypto.randomUUID()); + if(!this.config.pathToWorker.endsWith('.js')) this.config.pathToWorker+='.js' + this.MessageBusWorker = new Worker(this.config.pathToWorker+'?'+crypto.randomUUID()) this.MessageBusWorker.postMessage({ 'action':'start', 'config': this.config, 'userInfo': this.userInfo }); - this.MessageBusWorker.onmessage = this.receiveFromWorker.bind(this); - if(this.config.debug) console.log('Websocket worker launched.'); + this.MessageBusWorker.onmessage = this.receiveFromWorker.bind(this) + if(this.config.debug) console.log('Websocket worker launched.') } /** @@ -111,9 +111,9 @@ class MessageBus { * @param {*} callBack */ whenConnected(callBack){ - if(typeof(callBack) != 'function') return; - if(this.connected) callBack(); - else this.whenConnectedQ.push(callBack); + if(typeof(callBack) != 'function') return + if(this.connected) callBack() + else this.whenConnectedQ.push(callBack) } /** @@ -135,8 +135,8 @@ class MessageBus { * @param {*} callBack */ ifConnected(callBack){ - if(typeof(callBack) != 'function') return; - if(this.connected) callBack(); + if(typeof(callBack) != 'function') return + if(this.connected) callBack() } /** @@ -152,17 +152,17 @@ class MessageBus { * This method gives (and resolves) a promise, taking care of all lower-level details */ requestWssGwAction(action, payload=null, timeOut=5000){ - if(!action) return; - let request = {'action':action, 'payload':payload}; - request.reqid = crypto.randomUUID(); + if(!action) return + let request = {'action':action, 'payload':payload} + request.reqid = crypto.randomUUID() return(new Promise((resolve, fail) => { let timeOutID = setTimeout(() => { fail(`Timeout (>${timeOut}ms) for action ${action}`); if(this.promisesRegister[request.reqid]) delete(this.promisesRegister[request.reqid]) - }, timeOut); - this.promisesRegister[request.reqid] = [resolve, fail, timeOutID]; - this.MessageBusWorker.postMessage(request); - })); + }, timeOut) + this.promisesRegister[request.reqid] = [resolve, fail, timeOutID] + this.MessageBusWorker.postMessage(request) + })) } /** @@ -172,18 +172,18 @@ class MessageBus { * This method gives (and resolves) a promise, taking care of all lower-level details */ requestBusAction(chan, action, payload=null, timeOut=5000){ - if(!action) return; - let request = {'action':action, 'payload':payload}; - request.reqid = crypto.randomUUID(); + if(!action) return + let request = {'action':action, 'payload':payload} + request.reqid = crypto.randomUUID() return(new Promise((resolve, fail) => { let timeOutID = setTimeout(() => { fail(`Timeout (>${timeOut}ms) for action ${action}`); if(this.promisesRegister[request.reqid]) delete(this.promisesRegister[request.reqid]) - }, timeOut); - this.promisesRegister[request.reqid] = [resolve, fail, timeOutID]; + }, timeOut) + this.promisesRegister[request.reqid] = [resolve, fail, timeOutID] if(!chan.startsWith(this.config.frontBusPrefix)) chan = this.config.frontBusPrefix+chan this.send(chan, JSON.stringify(request)) - })); + })) } /** @@ -193,18 +193,18 @@ class MessageBus { * This method gives (and resolves) a promise, taking care of all lower-level details */ requestMidasAction(chan, action, data=null, timeOut=5000){ - if(!action) return; + if(!action) return let request = {payload: {'action':action, 'data':data}} - request.reqid = crypto.randomUUID(); + request.reqid = crypto.randomUUID() return(new Promise((resolve, fail) => { let timeOutID = setTimeout(() => { fail(`Timeout (>${timeOut}ms) for action ${action}`); if(this.promisesRegister[request.reqid]) delete(this.promisesRegister[request.reqid]) - }, timeOut); - this.promisesRegister[request.reqid] = [resolve, fail, timeOutID]; + }, timeOut) + this.promisesRegister[request.reqid] = [resolve, fail, timeOutID] if(!chan.startsWith(this.config.frontBusPrefix)) chan = this.config.frontBusPrefix+chan this.send(chan, JSON.stringify(request)) - })); + })) } /** @@ -230,9 +230,9 @@ class MessageBus { */ send(chan, msg){ // You can publish to an unsubscribed chan, userchans are the best example ! - // if(this.activeSubscriptions.indexOf(chan)<0) return; - var request = {'action':'PUB', 'payload': { 'chan':chan, 'msg': msg}}; - this.MessageBusWorker.postMessage(request); + // if(this.activeSubscriptions.indexOf(chan)<0) return + var request = {'action':'PUB', 'payload': { 'chan':chan, 'msg': msg}} + this.MessageBusWorker.postMessage(request) } /** @@ -322,11 +322,13 @@ class MessageBus { /** * Helper method to match a chan with globbing * - * @param {string} myChan (no glob) - * @param {string} targetChan PATTERN (possible glob) + * @param {string} myChan (no glob, no user expansion) + * @param {string} targetChan PATTERN (possible glob and user expansion) * @returns {boolean} */ chanMatch(myChan, targetChan) { + targetChan = targetChan.replace(/\[UID\]/g, this.userInfo.uuid) + targetChan = targetChan.replace(/\[CNXID\]/g, this.cnxId) let re = new RegExp('^'+targetChan.replace(/\*/g,'(.+)')+'$','g') return(myChan.match(re)!=null) } @@ -339,28 +341,28 @@ class MessageBus { var workermsg = e.data; if('event' in workermsg){ // event "ReceiveFromServer" is the general case of a message from server, found in data, with its own struct. - // other type og event are generated by the worker, about the connection + // other type of event are generated by the worker, about the connection switch(workermsg.event){ case 'ReceiveFromServer': - this.receiveFromServer(JSON.parse(workermsg.data)); - break; + this.receiveFromServer(JSON.parse(workermsg.data)) + break case 'connected': - this.connected = true; - if(this.config.debug) console.log('received connected event from worker !'); - this.executewhenConnectedQ(); - app.events.trigger('MessageBus.Connected'); - break; + this.connected = true + if(this.config.debug) console.log('received connected event from worker !') + this.executewhenConnectedQ() + app.events.trigger('MessageBus.Connected') + break case 'closed': - if(this.config.debug) console.log('received closed event from worker!'); - this.activeSubscriptions = []; - this.callBacksRegister = { }; - this.whenConnectedQ = []; + if(this.config.debug) console.log('received closed event from worker!') + this.activeSubscriptions = [] + this.callBacksRegister = { } + this.whenConnectedQ = [] this.connected = false; app.events.trigger('MessageBus.Closed'); - break; + break default: - if(this.config.debug) console.warn('Unknown Websocket Worker message:', workermsg); + if(this.config.debug) console.warn('Unknown Websocket Worker message:', workermsg) } } } @@ -377,35 +379,40 @@ class MessageBus { receiveFromServer(srvdata) { // See protocol reminder comment at the bottom if('action' in srvdata){ // Reply to a request - let action = srvdata.action; - let payload = ('payload' in srvdata) ? srvdata.payload : null; + let action = srvdata.action + let payload = ('payload' in srvdata) ? srvdata.payload : null // Piggyback on the results of some actions for this module internal use switch(action){ + case 'WELCOME': + console.log('Received WSS welcome', srvdata) + this.cnxId = srvdata.cnxId + this.serverTimeDelta = Date.now() - srvdata.serverTime + break case 'SUB': - if(this.activeSubscriptions.indexOf(payload)<0) this.activeSubscriptions = this.activeSubscriptions.concat(payload); - break; + if(this.activeSubscriptions.indexOf(payload)<0) this.activeSubscriptions = this.activeSubscriptions.concat(payload) + break case 'SUBLST': - if(this.activeSubscriptions.indexOf(payload)<0) this.activeSubscriptions = this.activeSubscriptions.concat(payload); - break; + if(this.activeSubscriptions.indexOf(payload)<0) this.activeSubscriptions = this.activeSubscriptions.concat(payload) + break } app.events.trigger('MessageBus.anyAction', srvdata); } else { // Low-level event : Redis Event, contrary to requ/reply with wssGateway, or other later if(('event' in srvdata) && (srvdata.event == 'REDISMSG')){ - var payload = ('payload' in srvdata) ? srvdata.payload : null; + var payload = ('payload' in srvdata) ? srvdata.payload : null if(payload && payload.msg && (payload.msg.eventType || payload.msg.action)) { if(payload.msg.eventType){ - var eventType = payload.msg.eventType; + var eventType = payload.msg.eventType app.events.trigger('MessageBus.event.'+eventType, { chan: payload.chan, sender: payload.msg.sender, eventType: payload.msg.eventType, payload: payload.msg.payload, - }); + }) } else if(payload.msg.action && payload.msg.reqid) { - let reqid = payload.msg.reqid; - let action = payload.msg.action; - let actionPayload = ('payload' in payload.msg) ? payload.msg.payload : null; - let err = ('err' in payload.msg) ? payload.msg.err : null; + let reqid = payload.msg.reqid + let action = payload.msg.action + let actionPayload = ('payload' in payload.msg) ? payload.msg.payload : null + let err = ('err' in payload.msg) ? payload.msg.err : null let success = payload.msg.success; if(reqid in this.promisesRegister) { clearTimeout(this.promisesRegister[reqid][2]); // Stop timeout timer @@ -416,12 +423,12 @@ class MessageBus { app.events.trigger('MessageBus.anyMessage', { chan: payload.chan, msg : payload.msg, - }); + }) } else if(payload && payload.bmsg){ app.events.trigger('MessageBus.promiscuousMessage', { // Repill msg : decapsulate & use spcific event chan: payload.bmsg.chan, msg : payload.bmsg.msg, - }); + }) } else { console.warn('Weird bus message (discarted) :', srvdata) @@ -431,9 +438,9 @@ class MessageBus { // For request-reply, settle promise if(srvdata.reqid && (srvdata.reqid in this.promisesRegister)) { - let payload = ('payload' in srvdata) ? srvdata.payload : null; - let err = ('err' in srvdata) ? srvdata.err : null; - let success = srvdata.success; + let payload = ('payload' in srvdata) ? srvdata.payload : null + let err = ('err' in srvdata) ? srvdata.err : null + let success = srvdata.success clearTimeout(this.promisesRegister[srvdata.reqid][2]); // Stop timeout timer if(success) this.promisesRegister[srvdata.reqid][0](payload); // resolve else this.promisesRegister[srvdata.reqid][1](`MsgBus action failed.\nError: ${err}`); // Fail diff --git a/doc/prompts_blobs.txt b/doc/prompts_blobs.txt index 3616b61..3540d7e 100644 --- a/doc/prompts_blobs.txt +++ b/doc/prompts_blobs.txt @@ -103,3 +103,5 @@ Maybe in the second browser he's viewing another angle, or meybe he's not viewin Back to business. I slightly changed the listSims query, so we can display sim name, primordial frame name and owner name. + +Now that this mecanism is in place in the gateway, Let's change Observer so that it uses a combination of sender and cnxId as key for its frustum register, instead of just sender \ No newline at end of file