diff --git a/components/devices/class.interface.js b/components/devices/class.interface.js index 7a24a97..eba21f4 100644 --- a/components/devices/class.interface.js +++ b/components/devices/class.interface.js @@ -3,7 +3,7 @@ const { Agent } = require("http"); const https = require("https"); const tls = require("tls"); const mongodb = require("mongodb"); -const { Duplex, PassThrough } = require("stream"); +const { PassThrough, pipeline, duplexPair } = require("stream"); const { randomUUID } = require("crypto"); //const path = require("path"); @@ -176,7 +176,7 @@ module.exports = class Interface { } - + /* bridge() { let { logger } = Interface.scope; @@ -222,11 +222,21 @@ module.exports = class Interface { } + /* ["close", "end", "error"].forEach((event) => { stream.on(event, (...args) => { + console.log("Event", event, "on websocket stream received"); socket.emit(event, ...args); }); }); + * + + stream.once("end", () => { + console.log("End on websocket stream received, call end on socket stream") + writable.end(); + readable.end(); + socket.end(); + }); stream.once("destroy", () => { writable.destory(); @@ -277,6 +287,70 @@ module.exports = class Interface { return socket; + } + */ + + // new approach + bridge() { + + let { logger } = Interface.scope; + let { host, port, socket: proto } = this.settings; + + // from this.adapter array + // placeholder for adapter pipeline + const encode = new PassThrough(); // outbound + const decode = new PassThrough(); // inbound + const [clientSock, proxySock] = duplexPair(); + + + ["ref", "unref", "setKeepAlive", "setTimeout", "setNoDelay"].forEach((fnc) => { + clientSock[fnc] = (...args) => { + logger.trace(`Interface ${this._id} method "${fnc}" called on socket stream, args:`, args); + }; + }); + + Interface.socket(this._id, (err, ws, request) => { + + if (err) { + + if (process.env.NODE_ENV === "development") { + logger.warn(err, "Could not create socket"); + } + + return clientSock.destroy(err); + + } + + if (process.env.NODE_ENV === "development") { + + clientSock.once("open", () => { + logger.debug(`Bridge open: iface ${this._id} <-> ${proto}://${host}:${port} (${request.uuid})`); + }); + + clientSock.once("close", () => { + logger.debug(`Bridge closed: iface ${this._id} <-> ${proto}://${host}:${port} (${request.uuid})`); + }); + + } + + // outbound: client -> encode -> websocket + pipeline(proxySock, encode, ws, (err) => { + clientSock.destroy(err); + }); + + // inbound: websocket -> decode -> client + pipeline(ws, decode, proxySock, (err) => { + clientSock.destroy(err); + }); + + process.nextTick(() => { + clientSock.emit("open"); + }); + + }); + + return clientSock; + } // bridge methods connects adapter with the underlaying network socket diff --git a/components/endpoints/class.endpoint.js b/components/endpoints/class.endpoint.js index f174e9b..fd0b881 100644 --- a/components/endpoints/class.endpoint.js +++ b/components/endpoints/class.endpoint.js @@ -58,7 +58,7 @@ module.exports = class Endpoint extends Item { } catch (err) { - logger.warn(err, "Could not update item states after debouncing"); + logger.warn(err, `Could not update item states after debouncing (${this.name})`, this.states); } }, 100); // TODO: remove time, or set to 1/0 diff --git a/components/endpoints/class.state.js b/components/endpoints/class.state.js index 8fe1970..dfbdee7 100644 --- a/components/endpoints/class.state.js +++ b/components/endpoints/class.state.js @@ -117,6 +117,8 @@ module.exports = class State { is: "string", then: Joi.object({ value: Joi.string().default(null).allow(null) + // enum: Joi.array().items(Joi.string()).validate(...) + // if enum are not empty, state value can only be on of the enums }) }, { is: "boolean", diff --git a/components/scenes/class.scene.js b/components/scenes/class.scene.js index f114d5b..6e9d72e 100644 --- a/components/scenes/class.scene.js +++ b/components/scenes/class.scene.js @@ -162,7 +162,7 @@ module.exports = class Scene extends Item { } - trigger(inputs) { + trigger(inputs = []) { let { logger } = Scene.scope; logger.info(`Trigger scene "${this.name}" (${this._id}), inputs:`, inputs); @@ -183,6 +183,10 @@ module.exports = class Scene extends Item { return input.key === key; }); + if (!input) { + return; + } + input.value = value; }); diff --git a/components/ssdp/index.js b/components/ssdp/index.js index 46bb878..4143ab0 100644 --- a/components/ssdp/index.js +++ b/components/ssdp/index.js @@ -29,6 +29,7 @@ const messageHandler = require("./message-handler.js"); * @example * ```sh * nc -ulvv 239.255.255.250 1900 + * socat UDP4-RECVFROM:1900,ip-add-membership=239.255.255.250:0.0.0.0,fork - * ``` * * @example diff --git a/index.js b/index.js index c141e5f..eb8a548 100644 --- a/index.js +++ b/index.js @@ -126,6 +126,9 @@ if (process.env.NODE_ENV !== "production") { logger.warn("> OpenHaus runs not in production mode! (%s)", process.env.NODE_ENV); } +if (process.env.WORKER_THREADS_ENABLED === "true") { + logger.warn("Worker threads are a experimental feature!"); +} // see #471 if (!semver.satisfies(process.versions.node, pkg.engines.node)) { @@ -153,6 +156,7 @@ if (process.env.GC_INTERVAL !== null && global.gc) { const init_db = require("./system/init/init.database.js")(logger); const init_components = require("./system/init/init.components.js")(logger); const init_http = require("./system/init/init.http-server.js")(logger); +const { channel } = require("./system/component/class.events.js"); // NOTE: Could/should be removed @@ -310,6 +314,8 @@ const starter = new Promise((resolve) => { logger.debug(`signal=${signal} received`); logger.warn("Shuting down..."); + channel.close(); + setTimeout(() => { process.exit(0); }); diff --git a/routes/router.api.devices.js b/routes/router.api.devices.js index 9fe44d7..97c4248 100644 --- a/routes/router.api.devices.js +++ b/routes/router.api.devices.js @@ -147,11 +147,8 @@ module.exports = (app, router) => { // TODO: check code and decide if error or success closing //stream.emit("close"); // desotroy() emit close event(!|?) if (code === 1005 || code === 1000) { - console.log("end normaly"); stream.end(); - //stream.emit("end"); } else { - //console.log("End destory"); stream.destroy(); } diff --git a/routes/router.api.scenes.js b/routes/router.api.scenes.js index aa97a4e..74383c6 100644 --- a/routes/router.api.scenes.js +++ b/routes/router.api.scenes.js @@ -1,7 +1,7 @@ module.exports = (app, router) => { router.post("/:_id/trigger", (req, res) => { - req.item.trigger(); + req.item.trigger(req.body?.inputs || []); res.status(202).json(req.item); });