Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ed42582ce7 | |||
| b551f473b4 | |||
| a868caf8c9 | |||
| 155692f71d | |||
| b82026fe85 | |||
| 1db846daa3 | |||
| 01bf35a238 |
+23
-12
@@ -10,31 +10,42 @@ export class AccesRights {
|
|||||||
this.rights = config.accessRights
|
this.rights = config.accessRights
|
||||||
}
|
}
|
||||||
|
|
||||||
mustSubscribe(uid, roles) {
|
expandPattern(pattern, uid, cnxId=null) {
|
||||||
|
if(/\[CUID\]/.test(pattern) && !cnxId) return(null)
|
||||||
|
let item = pattern.replace(/\[UID\]/g, uid)
|
||||||
|
if(cnxId) item = item.replace(/\[CUID\]/g, cnxId)
|
||||||
|
return(item)
|
||||||
|
}
|
||||||
|
|
||||||
|
expandPatterns(patterns, uid, cnxId=null) {
|
||||||
|
return(patterns.map(item => this.expandPattern(item, uid, cnxId)).filter(item => item != null))
|
||||||
|
}
|
||||||
|
|
||||||
|
mustSubscribe(uid, roles, cnxId=null) {
|
||||||
if(roles.indexOf('*')<0) roles.push('*')
|
if(roles.indexOf('*')<0) roles.push('*')
|
||||||
let chans = []
|
let chans = []
|
||||||
for(let myRole of roles){
|
for(let myRole of roles){
|
||||||
for(let rightBlock of this.rights) {
|
for(let rightBlock of this.rights) {
|
||||||
if(!rightBlock.mustSubscribe) continue
|
if(!rightBlock.mustSubscribe) continue
|
||||||
if((rightBlock.roles=='*') || (rightBlock.roles.indexOf(myRole)>-1)) {
|
if((rightBlock.roles=='*') || (rightBlock.roles.indexOf(myRole)>-1)) {
|
||||||
chans = this.merge(chans, rightBlock.mustSubscribe.map(item=>item.replace(/\[UID\]/g,uid)))
|
chans = this.merge(chans, this.expandPatterns(rightBlock.mustSubscribe, uid, cnxId))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return(chans)
|
return(chans)
|
||||||
}
|
}
|
||||||
|
|
||||||
isMandatory(uid, roles, chan){
|
isMandatory(uid, roles, chan, cnxId=null){
|
||||||
return(this.mustSubscribe(uid, roles).filter(this.chanMatch.bind(this, chan)).length>0)
|
return(this.mustSubscribe(uid, roles, cnxId).filter(this.chanMatch.bind(this, chan)).length>0)
|
||||||
}
|
}
|
||||||
|
|
||||||
canSubscribe(uid, roles, myChan) {
|
canSubscribe(uid, roles, myChan, cnxId=null) {
|
||||||
if(roles.indexOf('*')<0) roles.push('*')
|
if(roles.indexOf('*')<0) roles.push('*')
|
||||||
for(let myRole of roles){
|
for(let myRole of roles){
|
||||||
for(let rightBlock of this.rights) {
|
for(let rightBlock of this.rights) {
|
||||||
if(!rightBlock.canSubscribe) continue
|
if(!rightBlock.canSubscribe) continue
|
||||||
if((rightBlock.roles=='*') || (rightBlock.roles.indexOf(myRole)>-1)) {
|
if((rightBlock.roles=='*') || (rightBlock.roles.indexOf(myRole)>-1)) {
|
||||||
let canSubList = rightBlock.canSubscribe.map(item=>item.replace(/\[UID\]/g, uid))
|
let canSubList = this.expandPatterns(rightBlock.canSubscribe, uid, cnxId)
|
||||||
if(canSubList.find(this.chanMatch.bind(this, myChan))) return(true)
|
if(canSubList.find(this.chanMatch.bind(this, myChan))) return(true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -43,13 +54,13 @@ export class AccesRights {
|
|||||||
return(false)
|
return(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
canPublish(uid, roles, myChan) {
|
canPublish(uid, roles, myChan, cnxId=null) {
|
||||||
if(roles.indexOf('*')<0) roles.push('*')
|
if(roles.indexOf('*')<0) roles.push('*')
|
||||||
for(let myRole of roles){
|
for(let myRole of roles){
|
||||||
for(let rightBlock of this.rights) {
|
for(let rightBlock of this.rights) {
|
||||||
if(!rightBlock.canPublish) continue
|
if(!rightBlock.canPublish) continue
|
||||||
if((rightBlock.roles=='*') || (rightBlock.roles.indexOf(myRole)>-1)) {
|
if((rightBlock.roles=='*') || (rightBlock.roles.indexOf(myRole)>-1)) {
|
||||||
let canPubList = rightBlock.canPublish.map(item=>item.replace(/\[UID\]/g, uid))
|
let canPubList = this.expandPatterns(rightBlock.canPublish, uid, cnxId)
|
||||||
if(canPubList.find(this.chanMatch.bind(this, myChan))) return(true)
|
if(canPubList.find(this.chanMatch.bind(this, myChan))) return(true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -58,13 +69,13 @@ export class AccesRights {
|
|||||||
return(false)
|
return(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
canSet(uid, roles, myKey){
|
canSet(uid, roles, myKey, cnxId=null){
|
||||||
if(roles.indexOf('*')<0) roles.push('*')
|
if(roles.indexOf('*')<0) roles.push('*')
|
||||||
for(let myRole of roles){
|
for(let myRole of roles){
|
||||||
for(let rightBlock of this.rights) {
|
for(let rightBlock of this.rights) {
|
||||||
if(!rightBlock.canSet) continue
|
if(!rightBlock.canSet) continue
|
||||||
if((rightBlock.roles=='*') || (rightBlock.roles.indexOf(myRole)>-1)) {
|
if((rightBlock.roles=='*') || (rightBlock.roles.indexOf(myRole)>-1)) {
|
||||||
let canSetList = rightBlock.canSet.map(item=>item.replace(/\[UID\]/g, uid))
|
let canSetList = this.expandPatterns(rightBlock.canSet, uid, cnxId)
|
||||||
if(canSetList.find(this.chanMatch.bind(this, myKey))) return(true)
|
if(canSetList.find(this.chanMatch.bind(this, myKey))) return(true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -73,13 +84,13 @@ export class AccesRights {
|
|||||||
return(false)
|
return(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
canGet(uid, roles, myKey){
|
canGet(uid, roles, myKey, cnxId=null){
|
||||||
if(roles.indexOf('*')<0) roles.push('*')
|
if(roles.indexOf('*')<0) roles.push('*')
|
||||||
for(let myRole of roles){
|
for(let myRole of roles){
|
||||||
for(let rightBlock of this.rights) {
|
for(let rightBlock of this.rights) {
|
||||||
if(!rightBlock.canGet) continue
|
if(!rightBlock.canGet) continue
|
||||||
if((rightBlock.roles=='*') || (rightBlock.roles.indexOf(myRole)>-1)) {
|
if((rightBlock.roles=='*') || (rightBlock.roles.indexOf(myRole)>-1)) {
|
||||||
let canGetList = rightBlock.canGet.map(item=>item.replace(/\[UID\]/g, uid))
|
let canGetList = this.expandPatterns(rightBlock.canGet, uid, cnxId)
|
||||||
if(canGetList.find(this.chanMatch.bind(this, myKey))) return(true)
|
if(canGetList.find(this.chanMatch.bind(this, myKey))) return(true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+13
-7
@@ -20,7 +20,9 @@ export const methods = {
|
|||||||
let subscribed = []
|
let subscribed = []
|
||||||
for(var chan of payload){
|
for(var chan of payload){
|
||||||
if((!chan) || (typeof(chan)!='string')) continue
|
if((!chan) || (typeof(chan)!='string')) continue
|
||||||
if(!this.accessRights.canSubscribe(this.userId, this.roles, chan)) {
|
chan = chan.replace(/\[UID\]/g, this.userId)
|
||||||
|
chan = chan.replace(/\[CUID\]/g, this.uuid)
|
||||||
|
if(!this.accessRights.canSubscribe(this.userId, this.roles, chan, this.uuid)) {
|
||||||
if(this.debug) console.log('SUB: No rights to this chan!', this.userId, this.roles, chan)
|
if(this.debug) console.log('SUB: No rights to this chan!', this.userId, this.roles, chan)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -29,7 +31,7 @@ export const methods = {
|
|||||||
for(const rediscnx of this.allRediscnx){
|
for(const rediscnx of this.allRediscnx){
|
||||||
if(!chan.startsWith(rediscnx.redisConfig.chansNamespace)) continue
|
if(!chan.startsWith(rediscnx.redisConfig.chansNamespace)) continue
|
||||||
else coudSubscribe = true
|
else coudSubscribe = true
|
||||||
let localChan = rediscnx.redisConfig.basePrefix + localChan
|
let localChan = rediscnx.redisConfig.basePrefix + chan
|
||||||
if(!(localChan in rediscnx.subscriptions)) rediscnx.subscriptions[localChan] = [];
|
if(!(localChan in rediscnx.subscriptions)) rediscnx.subscriptions[localChan] = [];
|
||||||
if(!rediscnx.subscriptions[localChan].includes(this.uuid)) {
|
if(!rediscnx.subscriptions[localChan].includes(this.uuid)) {
|
||||||
rediscnx.subscriptions[localChan].push(this.uuid);
|
rediscnx.subscriptions[localChan].push(this.uuid);
|
||||||
@@ -72,7 +74,9 @@ export const methods = {
|
|||||||
let unSubscribed = []
|
let unSubscribed = []
|
||||||
for(var chan of payload){
|
for(var chan of payload){
|
||||||
if((!chan) || (typeof(chan)!='string')) continue
|
if((!chan) || (typeof(chan)!='string')) continue
|
||||||
if(this.accessRights.isMandatory(this.userId, this.roles, chan)) continue
|
chan = chan.replace(/\[UID\]/g, this.userId)
|
||||||
|
chan = chan.replace(/\[CUID\]/g, this.uuid)
|
||||||
|
if(this.accessRights.isMandatory(this.userId, this.roles, chan, this.uuid)) continue
|
||||||
|
|
||||||
let couldUnsubscribe = false
|
let couldUnsubscribe = false
|
||||||
for(const rediscnx of this.allRediscnx){
|
for(const rediscnx of this.allRediscnx){
|
||||||
@@ -146,7 +150,7 @@ export const methods = {
|
|||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
if( (!this.accessRights.canPublish(this.userId, this.roles, payload.chan)) ) {
|
if( (!this.accessRights.canPublish(this.userId, this.roles, payload.chan, this.uuid)) ) {
|
||||||
this.sendErr(action, 'Unauthorized chan !', reqid);
|
this.sendErr(action, 'Unauthorized chan !', reqid);
|
||||||
if(this.debug) console.log('PUB: Unauthorized chan', payload.chan, this.userId, this.roles)
|
if(this.debug) console.log('PUB: Unauthorized chan', payload.chan, this.userId, this.roles)
|
||||||
return
|
return
|
||||||
@@ -226,15 +230,17 @@ export const methods = {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if( (!this.accessRights.canSubscribe(this.userId, this.roles, payload.channel)) ) {
|
payload.channel = payload.channel.replace(/\[UID\]/g, this.userId)
|
||||||
|
.replace(/\[CUID\]/g, this.uuid)
|
||||||
|
if( (!this.accessRights.canSubscribe(this.userId, this.roles, payload.channel, this.uuid)) ) {
|
||||||
this.sendErr(action, 'CHANHIST: Unauthorized channel !', reqid)
|
this.sendErr(action, 'CHANHIST: Unauthorized channel !', reqid)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
const primaryRediscnx = this.allRediscnx.find(cnx => ((chan.startsWith(cnx.redisConfig.chansNamespace)) &&(cnx.redisConfig.role=='primary')) )
|
const primaryRediscnx = this.allRediscnx.find(cnx => ((payload.channel.startsWith(cnx.redisConfig.chansNamespace)) &&(cnx.redisConfig.role=='primary')) )
|
||||||
if(!primaryRediscnx){
|
if(!primaryRediscnx){
|
||||||
this.sendErr(action, 'No primary redis for this chan !', reqid);
|
this.sendErr(action, 'No primary redis for this chan !', reqid);
|
||||||
if(this.debug) console.log('CHANHIST: No primary redis for this chan ', chan)
|
if(this.debug) console.log('CHANHIST: No primary redis for this chan ', payload.channel)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+2
-2
@@ -30,7 +30,7 @@ export const methods = {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if(!this.accessRights.canSet(this.userId, this.roles, payload.key)){
|
if(!this.accessRights.canSet(this.userId, this.roles, payload.key, this.uuid)){
|
||||||
this.sendErr(action, 'Unauthorized key !', reqid);
|
this.sendErr(action, 'Unauthorized key !', reqid);
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -92,7 +92,7 @@ export const methods = {
|
|||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
if(!this.accessRights.canGet(this.userId, this.roles, payload.key)) {
|
if(!this.accessRights.canGet(this.userId, this.roles, payload.key, this.uuid)) {
|
||||||
console.log('Unauth GET key:',this.userId, this.roles, payload.key)
|
console.log('Unauth GET key:',this.userId, this.roles, payload.key)
|
||||||
this.sendErr(action, 'Unauthorized key !', reqid);
|
this.sendErr(action, 'Unauthorized key !', reqid);
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -9,8 +9,15 @@ export const methods = {
|
|||||||
"action": "PONG",
|
"action": "PONG",
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
action_PONG(action, payload){
|
action_PONG(action, payload, reqid){
|
||||||
clearTimeout(this.keepAliveBomb);
|
|
||||||
|
if(reqid == this.latestPing.id) {
|
||||||
|
console.log('====>PONG', reqid, this.latestPing, Date.now() - this.latestPing.time)
|
||||||
|
this.roundTripTime = (this.roundTripTime + (Date.now() - this.latestPing.time)) / 2 //Exponential smoothing
|
||||||
|
}
|
||||||
|
// First defuse the bomb
|
||||||
|
clearTimeout(this.keepAliveBomb)
|
||||||
|
// Then setup the next one ;-)
|
||||||
this.keepAliveNextTimeout = setTimeout(this.keepAlive.bind(this),this.config.server.keepAliveInterval*1000);
|
this.keepAliveNextTimeout = setTimeout(this.keepAlive.bind(this),this.config.server.keepAliveInterval*1000);
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|||||||
+3
-2
@@ -117,8 +117,9 @@
|
|||||||
"listenPath": { "type": "string" },
|
"listenPath": { "type": "string" },
|
||||||
"unsecure": { "type": "boolean" },
|
"unsecure": { "type": "boolean" },
|
||||||
"challengeExpiration": { "type": "integer" },
|
"challengeExpiration": { "type": "integer" },
|
||||||
"keepAliveInterval": { "type": "string" },
|
"keepAliveInterval": { "type": "number" },
|
||||||
"keepAliveTimeout": { "type": "string" }
|
"keepAliveTimeout": { "type": "number" },
|
||||||
|
"refreshSessionInterval": { "type": "number" }
|
||||||
},
|
},
|
||||||
"required": [
|
"required": [
|
||||||
"challengeExpiration",
|
"challengeExpiration",
|
||||||
|
|||||||
+29
-11
@@ -10,6 +10,18 @@ import express from 'express'
|
|||||||
import session from 'express-session'
|
import session from 'express-session'
|
||||||
import connectMySQL from 'express-mysql-session'
|
import connectMySQL from 'express-mysql-session'
|
||||||
|
|
||||||
|
const originalLog = console.log
|
||||||
|
const originalWarn = console.warn
|
||||||
|
const originalError = console.error
|
||||||
|
function logWithTimestamp(originalFn, level, ...args) {
|
||||||
|
const timestamp = new Date().toISOString()
|
||||||
|
originalFn(`[${timestamp}] [${level}]`, ...args)
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log = (...args) => logWithTimestamp(originalLog, 'LOG', ...args)
|
||||||
|
console.warn = (...args) => logWithTimestamp(originalWarn, 'WARN', ...args)
|
||||||
|
console.error = (...args) => logWithTimestamp(originalError, 'ERROR', ...args)
|
||||||
|
|
||||||
const argv = yargs(hideBin(process.argv)).command('wssGateway', 'Redis <=> Websocket message bus gateway', {})
|
const argv = yargs(hideBin(process.argv)).command('wssGateway', 'Redis <=> Websocket message bus gateway', {})
|
||||||
.options({
|
.options({
|
||||||
'argv.debug': {
|
'argv.debug': {
|
||||||
@@ -31,21 +43,26 @@ const mysqlCreds = {
|
|||||||
// host: '127.0.0.1',
|
// host: '127.0.0.1',
|
||||||
// port: 3306,
|
// port: 3306,
|
||||||
socketPath: '/var/run/mysqld/mysqld.sock',
|
socketPath: '/var/run/mysqld/mysqld.sock',
|
||||||
user: 'p42',
|
user: process.env.mysql_user,
|
||||||
password: 'C3h=V9!r>Mvc>skxPf9?W2P3duJTk',
|
password: process.env.mysql_pass,
|
||||||
database: 'p42',
|
database: 'p42GUI',
|
||||||
waitForConnections: true,
|
waitForConnections: true,
|
||||||
connectionLimit: 10,
|
connectionLimit: 10,
|
||||||
queueLimit: 0
|
queueLimit: 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(!mysqlCreds.user || !mysqlCreds.password) {
|
||||||
|
console.error('Missing MySQL credentials: set mysql_user and mysql_pass in environment')
|
||||||
|
process.exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
const db = await mysql.createConnection(mysqlCreds)
|
const db = await mysql.createConnection(mysqlCreds)
|
||||||
|
|
||||||
const sessionStore = new MySQLStore({
|
const sessionStore = new MySQLStore({
|
||||||
createDatabaseTable: false,
|
createDatabaseTable: false,
|
||||||
clearExpired: true,
|
clearExpired: true,
|
||||||
schema: {
|
schema: {
|
||||||
tableName: 'p42_sessions',
|
tableName: 'sessions',
|
||||||
columnNames: {
|
columnNames: {
|
||||||
session_id: 'session_id',
|
session_id: 'session_id',
|
||||||
expires: 'expires',
|
expires: 'expires',
|
||||||
@@ -84,12 +101,12 @@ async function startAllRedis(wssGatewayConfig) {
|
|||||||
)
|
)
|
||||||
const loginResults = await Promise.allSettled(
|
const loginResults = await Promise.allSettled(
|
||||||
redisConns.map(async cnx => {
|
redisConns.map(async cnx => {
|
||||||
cnx.redisLogin()
|
await cnx.redisLogin()
|
||||||
return cnx.redisId
|
return cnx.redisId
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
2. //make sure all connected before going any further
|
//2. make sure all connected before going any further
|
||||||
const failedLogin = loginResults.filter(r => r.status !== 'fulfilled')
|
const failedLogin = loginResults.filter(r => r.status !== 'fulfilled')
|
||||||
if (failedLogin.length > 0) {
|
if (failedLogin.length > 0) {
|
||||||
console.error('Redis login failures:')
|
console.error('Redis login failures:')
|
||||||
@@ -107,7 +124,7 @@ async function startAllRedis(wssGatewayConfig) {
|
|||||||
// --- Phase 2: start channels for all (since all succeeded)
|
// --- Phase 2: start channels for all (since all succeeded)
|
||||||
const chanResults = await Promise.allSettled(
|
const chanResults = await Promise.allSettled(
|
||||||
redisConns.map(async cnx => {
|
redisConns.map(async cnx => {
|
||||||
cnx.redisChansStart()
|
await cnx.redisChansStart()
|
||||||
return cnx.redisId
|
return cnx.redisId
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
@@ -172,7 +189,7 @@ cfgh.fetchConfig().then( async wssGatewayConfig => {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
WSSServer.handleUpgrade(req, socket, head, (ws) => {
|
WSSServer.handleUpgrade(req, socket, head, (ws) => {
|
||||||
ws.session = req.session // direct access to Express session
|
ws.session = req.session // Caution : that one is turned in stone, not alive !
|
||||||
WSSServer.emit('connection', ws, req)
|
WSSServer.emit('connection', ws, req)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
@@ -186,12 +203,13 @@ cfgh.fetchConfig().then( async wssGatewayConfig => {
|
|||||||
// ws.send(`You visited ${ws.session.views} times`)
|
// ws.send(`You visited ${ws.session.views} times`)
|
||||||
// })
|
// })
|
||||||
|
|
||||||
|
startAllRedis(wssGatewayConfig)
|
||||||
|
.then((allRediscnx) => {
|
||||||
|
console.log('All edis OK')
|
||||||
|
const wssSrv = new wssServer(cfgh, WSSServer, allRediscnx, debug);
|
||||||
HTTPserver.listen(wssGatewayConfig.server.listenPort, () => {
|
HTTPserver.listen(wssGatewayConfig.server.listenPort, () => {
|
||||||
console.log(`WS${wssGatewayConfig.server.unsecure ? '': 'S'} server created for ${wssGatewayConfig.server.listenHost}:${wssGatewayConfig.server.listenPort}`)
|
console.log(`WS${wssGatewayConfig.server.unsecure ? '': 'S'} server created for ${wssGatewayConfig.server.listenHost}:${wssGatewayConfig.server.listenPort}`)
|
||||||
})
|
})
|
||||||
|
|
||||||
startAllRedis(wssGatewayConfig).then((allRediscnx) => {
|
|
||||||
const wssSrv = new wssServer(cfgh, WSSServer, allRediscnx, debug);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
+2
-2
@@ -71,7 +71,7 @@ export class RedisConnexion {
|
|||||||
|
|
||||||
async redisSubscribe(chanName, callBack){
|
async redisSubscribe(chanName, callBack){
|
||||||
if(!chanName.startsWith(this.redisConfig.basePrefix)) chanName = this.redisConfig.basePrefix + chanName
|
if(!chanName.startsWith(this.redisConfig.basePrefix)) chanName = this.redisConfig.basePrefix + chanName
|
||||||
if(!chanName.startsWith(this.redisConfig.basePrefix+this.chansNamespace)) {
|
if(!chanName.startsWith(this.redisConfig.basePrefix+this.redisConfig.chansNamespace)) {
|
||||||
console.warn(`[${this.redisConfig.redisId}] redisSubscribe : forbidden channel range on this redis !`)
|
console.warn(`[${this.redisConfig.redisId}] redisSubscribe : forbidden channel range on this redis !`)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -81,7 +81,7 @@ export class RedisConnexion {
|
|||||||
async redisPublish(chanName, msg){
|
async redisPublish(chanName, msg){
|
||||||
if(typeof (msg) != 'string') msg = JSON.stringify(msg);
|
if(typeof (msg) != 'string') msg = JSON.stringify(msg);
|
||||||
if(!chanName.startsWith(this.redisConfig.basePrefix)) chanName = this.redisConfig.basePrefix + chanName
|
if(!chanName.startsWith(this.redisConfig.basePrefix)) chanName = this.redisConfig.basePrefix + chanName
|
||||||
if(!chanName.startsWith(this.redisConfig.basePrefix+this.chansNamespace)) {
|
if(!chanName.startsWith(this.redisConfig.basePrefix+this.redisConfig.chansNamespace)) {
|
||||||
console.warn(`[${this.redisConfig.redisId}] redisPublish : forbidden channel range on this redis !`)
|
console.warn(`[${this.redisConfig.redisId}] redisPublish : forbidden channel range on this redis !`)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
+5
-1
@@ -1,11 +1,15 @@
|
|||||||
#!/bin/sh
|
#!/bin/sh
|
||||||
|
|
||||||
|
set -a
|
||||||
|
. /etc/p42/secrets.env
|
||||||
|
set +a
|
||||||
|
|
||||||
cd /opt/p42wssGateway/
|
cd /opt/p42wssGateway/
|
||||||
|
|
||||||
pid=`ps -ef | grep p42wssGateway |grep -v grep | awk '{print $2}'`
|
pid=`ps -ef | grep p42wssGateway |grep -v grep | awk '{print $2}'`
|
||||||
if [ -z "$pid" ]
|
if [ -z "$pid" ]
|
||||||
then
|
then
|
||||||
node p42wssGateway.js > wssGateway.log 2>&1 &
|
node p42wssGateway.js --debug > wssGateway.log 2>&1 &
|
||||||
else
|
else
|
||||||
echo ''
|
echo ''
|
||||||
echo 'Already running PID='"$pid"' (use stopWssGw.sh to stop it)'
|
echo 'Already running PID='"$pid"' (use stopWssGw.sh to stop it)'
|
||||||
|
|||||||
+17
-5
@@ -17,6 +17,7 @@ export class WssConnexion {
|
|||||||
this.userId = options.userId
|
this.userId = options.userId
|
||||||
this.roles = options.roles
|
this.roles = options.roles
|
||||||
this.sessionID = null // null until login
|
this.sessionID = null // null until login
|
||||||
|
this.roundTripTime = 0
|
||||||
|
|
||||||
this.subscriptions = [];
|
this.subscriptions = [];
|
||||||
this.usersWatched = [];
|
this.usersWatched = [];
|
||||||
@@ -30,10 +31,15 @@ export class WssConnexion {
|
|||||||
}
|
}
|
||||||
|
|
||||||
welcome(){
|
welcome(){
|
||||||
clearTimeout(this.challengeTimeout)
|
// clearTimeout(this.challengeTimeout)
|
||||||
this.challengeTimeout = null
|
// this.challengeTimeout = null
|
||||||
this.cnxState = 'CONNECTED'
|
this.cnxState = 'CONNECTED'
|
||||||
if(this.debug) console.log(`Welcome to UUID ${this.uuid}`)
|
this.send(JSON.stringify({
|
||||||
|
'action': 'WELCOME',
|
||||||
|
'cnxId': this.uuid,
|
||||||
|
'serverTime': Date.now(),
|
||||||
|
}));
|
||||||
|
if(this.debug) console.log(`Welcome to UUID ${this.uuid}, server time: ${performance.now()}`)
|
||||||
}
|
}
|
||||||
|
|
||||||
startKeepAlive() {
|
startKeepAlive() {
|
||||||
@@ -58,7 +64,7 @@ export class WssConnexion {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if(typeof this['action_'+action] == "function") {
|
if(typeof this['action_'+action] == "function") {
|
||||||
if((this.debug) && (action != 'PONG')) console.warn(`${action} for uuid ${this.uuid}`);
|
if((this.debug) && (action != 'PONG rcv')) console.warn(`${action} for uuid ${this.uuid}`);
|
||||||
this['action_'+action](action, payload, reqid);
|
this['action_'+action](action, payload, reqid);
|
||||||
} else {
|
} else {
|
||||||
if(this.debug) console.warn(`Unknown action ${action} for UUID ${this.uuid}`);
|
if(this.debug) console.warn(`Unknown action ${action} for UUID ${this.uuid}`);
|
||||||
@@ -72,9 +78,15 @@ export class WssConnexion {
|
|||||||
|
|
||||||
|
|
||||||
keepAlive(){
|
keepAlive(){
|
||||||
|
this.latestPing = {
|
||||||
|
id: crypto.randomUUID(),
|
||||||
|
time: Date.now()
|
||||||
|
}
|
||||||
this.send(JSON.stringify({
|
this.send(JSON.stringify({
|
||||||
'action': 'PING',
|
'action': 'PING',
|
||||||
|
'reqid': this.latestPing.id,
|
||||||
}));
|
}));
|
||||||
|
console.log('====>PING sent', this.latestPing)
|
||||||
this.keepAliveBomb = setTimeout(this.MissedKeepAlive.bind(this), (this.config.server.keepAliveTimeout+1)*1000);
|
this.keepAliveBomb = setTimeout(this.MissedKeepAlive.bind(this), (this.config.server.keepAliveTimeout+1)*1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -97,7 +109,7 @@ export class WssConnexion {
|
|||||||
}
|
}
|
||||||
|
|
||||||
subscribeMandatoryChans(){
|
subscribeMandatoryChans(){
|
||||||
let mandaChans = this.accessRights.mustSubscribe(this.userId, this.roles)
|
let mandaChans = this.accessRights.mustSubscribe(this.userId, this.roles, this.uuid)
|
||||||
for(let rediscnx of this.allRediscnx){
|
for(let rediscnx of this.allRediscnx){
|
||||||
mandaChans = mandaChans.filter(chan => chan.startsWith(rediscnx.redisConfig.chansNamespace))
|
mandaChans = mandaChans.filter(chan => chan.startsWith(rediscnx.redisConfig.chansNamespace))
|
||||||
mandaChans = mandaChans.map(item=>rediscnx.redisConfig.basePrefix+item)
|
mandaChans = mandaChans.map(item=>rediscnx.redisConfig.basePrefix+item)
|
||||||
|
|||||||
+8
-14
@@ -4,8 +4,9 @@
|
|||||||
"listenHost": "127.0.0.1",
|
"listenHost": "127.0.0.1",
|
||||||
"listenPort": 3999,
|
"listenPort": 3999,
|
||||||
"listenPath": "/msgbus",
|
"listenPath": "/msgbus",
|
||||||
"keepAliveInterval": "30",
|
"keepAliveInterval": 30,
|
||||||
"keepAliveTimeout": "5",
|
"keepAliveTimeout": 5,
|
||||||
|
"refreshSessionInterval": 600,
|
||||||
"XXcertFile": "/etc/letsencrypt/live/42.internike.com/fullchain.pem",
|
"XXcertFile": "/etc/letsencrypt/live/42.internike.com/fullchain.pem",
|
||||||
"XXcertKeyFile": "/etc/letsencrypt/live/42.internike.com/privkey.pem",
|
"XXcertKeyFile": "/etc/letsencrypt/live/42.internike.com/privkey.pem",
|
||||||
"challengeExpiration": 20,
|
"challengeExpiration": 20,
|
||||||
@@ -14,20 +15,13 @@
|
|||||||
"accessRights":[
|
"accessRights":[
|
||||||
{
|
{
|
||||||
"roles": "*",
|
"roles": "*",
|
||||||
"mustSubscribe": [ "system:notifs:[UID]", "system:notifs" ],
|
"mustSubscribe": [ "system:notifs:[UID]", "system:notifs", "system:replies:[UID]" ],
|
||||||
"canSubscribe": ["system:gps:*", "arena:gps:*","arena:agents:*"],
|
"canSubscribe": [ "system:gps:*", "arena:gps:*","arena:agents:*", "arena:requests:[UID]", "arena:replies:[UID]",
|
||||||
"canPublish": [ ],
|
"system:observer:subscribed[CUID]:agents",
|
||||||
|
"system:maestro:lifecycle:[UID]" ],
|
||||||
|
"canPublish": [ "system:requests:*", "system:observer:requests" ],
|
||||||
"canSet": [ ],
|
"canSet": [ ],
|
||||||
"canGet": [ ]
|
"canGet": [ ]
|
||||||
},
|
|
||||||
{
|
|
||||||
"roles": ["admin"],
|
|
||||||
"mustSubscribe": ["system:infraNotifs", "system:replies:[UID]"],
|
|
||||||
"canSubscribe": [ ],
|
|
||||||
"canPublish": [ "system:requests:*" ],
|
|
||||||
"canSet": ["system:*"],
|
|
||||||
"canGet": ["system:*"],
|
|
||||||
"canDo": ["getActiveUsers", "reloadAccessRights", "getAccessRights"]
|
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"redis":[
|
"redis":[
|
||||||
|
|||||||
+10
-8
@@ -29,6 +29,7 @@ export class wssServer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
newWSSConnexion(socket, req) {
|
newWSSConnexion(socket, req) {
|
||||||
|
console.log('newWSSConnexion')
|
||||||
var uuid = crypto.randomUUID();
|
var uuid = crypto.randomUUID();
|
||||||
if(socket.session && socket.session.authenticated && socket.session.userInfos && socket.session.userInfos.identity && socket.session.userInfos.identity.username){
|
if(socket.session && socket.session.authenticated && socket.session.userInfos && socket.session.userInfos.identity && socket.session.userInfos.identity.username){
|
||||||
var wssCnx = new WssConnexion({
|
var wssCnx = new WssConnexion({
|
||||||
@@ -48,12 +49,13 @@ export class wssServer {
|
|||||||
this.Users2uuids[wssCnx.userId].add(uuid);
|
this.Users2uuids[wssCnx.userId].add(uuid);
|
||||||
this.OnlineUsers.add(wssCnx.userId);
|
this.OnlineUsers.add(wssCnx.userId);
|
||||||
this.allRediscnx.forEach(cnx => { cnx.wssConnections[uuid] = wssCnx })
|
this.allRediscnx.forEach(cnx => { cnx.wssConnections[uuid] = wssCnx })
|
||||||
this.postLoginActions(wssCnx)
|
setImmediate(() => this.postLoginActions(wssCnx))
|
||||||
} else socket.close()
|
} else socket.close()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
postLoginActions(wssCnx) {
|
postLoginActions(wssCnx) {
|
||||||
|
wssCnx.welcome()
|
||||||
wssCnx.startKeepAlive()
|
wssCnx.startKeepAlive()
|
||||||
wssCnx.subscribeMandatoryChans()
|
wssCnx.subscribeMandatoryChans()
|
||||||
wssCnx.action_SUBLST('SUBLST', null, '')
|
wssCnx.action_SUBLST('SUBLST', null, '')
|
||||||
@@ -92,13 +94,13 @@ export class wssServer {
|
|||||||
return(OnlineUsers);
|
return(OnlineUsers);
|
||||||
}
|
}
|
||||||
|
|
||||||
sessionConnected(sessionID){
|
// sessionConnected(sessionID){
|
||||||
if(!sessionID) return(false) // If that cnx is not finished login-in
|
// if(!sessionID) return(false) // If that cnx is not finished login-in
|
||||||
for(let uuid in this.AllWssConnections) {
|
// for(let uuid in this.AllWssConnections) {
|
||||||
if(this.AllWssConnections[uuid].sessionID==sessionID) return(true)
|
// if(this.AllWssConnections[uuid].sessionID==sessionID) return(true)
|
||||||
}
|
// }
|
||||||
return(false)
|
// return(false)
|
||||||
}
|
// }
|
||||||
|
|
||||||
async reloadAccessRights() {
|
async reloadAccessRights() {
|
||||||
await this.configHelper.refreshAccessRights()
|
await this.configHelper.refreshAccessRights()
|
||||||
|
|||||||
Reference in New Issue
Block a user