Skip to content

Minor tweaks, major hopes #578

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 76 additions & 2 deletions components/devices/class.interface.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const { Agent } = require("http");
const https = require("https");
const tls = require("tls");
const mongodb = require("mongodb");
const { Duplex, PassThrough } = require("stream");
const { PassThrough, pipeline, duplexPair } = require("stream");
const { randomUUID } = require("crypto");
//const path = require("path");

Expand Down Expand Up @@ -176,7 +176,7 @@ module.exports = class Interface {

}


/*
bridge() {

let { logger } = Interface.scope;
Expand Down Expand Up @@ -222,11 +222,21 @@ module.exports = class Interface {

}

/*
["close", "end", "error"].forEach((event) => {
stream.on(event, (...args) => {
console.log("Event", event, "on websocket stream received");
socket.emit(event, ...args);
});
});
*

stream.once("end", () => {
console.log("End on websocket stream received, call end on socket stream")
writable.end();
readable.end();
socket.end();
});

stream.once("destroy", () => {
writable.destory();
Expand Down Expand Up @@ -277,6 +287,70 @@ module.exports = class Interface {

return socket;

}
*/

// new approach
bridge() {

let { logger } = Interface.scope;
let { host, port, socket: proto } = this.settings;

// from this.adapter array
// placeholder for adapter pipeline
const encode = new PassThrough(); // outbound
const decode = new PassThrough(); // inbound
const [clientSock, proxySock] = duplexPair();


["ref", "unref", "setKeepAlive", "setTimeout", "setNoDelay"].forEach((fnc) => {
clientSock[fnc] = (...args) => {
logger.trace(`Interface ${this._id} method "${fnc}" called on socket stream, args:`, args);
};
});

Interface.socket(this._id, (err, ws, request) => {

if (err) {

if (process.env.NODE_ENV === "development") {
logger.warn(err, "Could not create socket");
}

return clientSock.destroy(err);

}

if (process.env.NODE_ENV === "development") {

clientSock.once("open", () => {
logger.debug(`Bridge open: iface ${this._id} <-> ${proto}://${host}:${port} (${request.uuid})`);
});

clientSock.once("close", () => {
logger.debug(`Bridge closed: iface ${this._id} <-> ${proto}://${host}:${port} (${request.uuid})`);
});

}

// outbound: client -> encode -> websocket
pipeline(proxySock, encode, ws, (err) => {
clientSock.destroy(err);
});

// inbound: websocket -> decode -> client
pipeline(ws, decode, proxySock, (err) => {
clientSock.destroy(err);
});

process.nextTick(() => {
clientSock.emit("open");
});

});

return clientSock;

}

// bridge methods connects adapter with the underlaying network socket
Expand Down
2 changes: 1 addition & 1 deletion components/endpoints/class.endpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ module.exports = class Endpoint extends Item {

} catch (err) {

logger.warn(err, "Could not update item states after debouncing");
logger.warn(err, `Could not update item states after debouncing (${this.name})`, this.states);

}
}, 100); // TODO: remove time, or set to 1/0
Expand Down
2 changes: 2 additions & 0 deletions components/endpoints/class.state.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ module.exports = class State {
is: "string",
then: Joi.object({
value: Joi.string().default(null).allow(null)
// enum: Joi.array().items(Joi.string()).validate(...)
// if enum are not empty, state value can only be on of the enums
})
}, {
is: "boolean",
Expand Down
6 changes: 5 additions & 1 deletion components/scenes/class.scene.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ module.exports = class Scene extends Item {

}

trigger(inputs) {
trigger(inputs = []) {

let { logger } = Scene.scope;
logger.info(`Trigger scene "${this.name}" (${this._id}), inputs:`, inputs);
Expand All @@ -183,6 +183,10 @@ module.exports = class Scene extends Item {
return input.key === key;
});

if (!input) {
return;
}

input.value = value;

});
Expand Down
1 change: 1 addition & 0 deletions components/ssdp/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const messageHandler = require("./message-handler.js");
* @example
* ```sh
* nc -ulvv 239.255.255.250 1900
* socat UDP4-RECVFROM:1900,ip-add-membership=239.255.255.250:0.0.0.0,fork -
* ```
*
* @example
Expand Down
6 changes: 6 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ if (process.env.NODE_ENV !== "production") {
logger.warn("> OpenHaus runs not in production mode! (%s)", process.env.NODE_ENV);
}

if (process.env.WORKER_THREADS_ENABLED === "true") {
logger.warn("Worker threads are a experimental feature!");
}

// see #471
if (!semver.satisfies(process.versions.node, pkg.engines.node)) {
Expand Down Expand Up @@ -153,6 +156,7 @@ if (process.env.GC_INTERVAL !== null && global.gc) {
const init_db = require("./system/init/init.database.js")(logger);
const init_components = require("./system/init/init.components.js")(logger);
const init_http = require("./system/init/init.http-server.js")(logger);
const { channel } = require("./system/component/class.events.js");


// NOTE: Could/should be removed
Expand Down Expand Up @@ -310,6 +314,8 @@ const starter = new Promise((resolve) => {
logger.debug(`signal=${signal} received`);
logger.warn("Shuting down...");

channel.close();

setTimeout(() => {
process.exit(0);
});
Expand Down
3 changes: 0 additions & 3 deletions routes/router.api.devices.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,8 @@ module.exports = (app, router) => {
// TODO: check code and decide if error or success closing
//stream.emit("close"); // desotroy() emit close event(!|?)
if (code === 1005 || code === 1000) {
console.log("end normaly");
stream.end();
//stream.emit("end");
} else {
//console.log("End destory");
stream.destroy();
}

Expand Down
2 changes: 1 addition & 1 deletion routes/router.api.scenes.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module.exports = (app, router) => {

router.post("/:_id/trigger", (req, res) => {
req.item.trigger();
req.item.trigger(req.body?.inputs || []);
res.status(202).json(req.item);
});

Expand Down