added MQTT monitor type

This commit is contained in:
Tarun Singh 2021-11-03 21:46:43 -04:00
parent d4c9431142
commit 670754b697
8 changed files with 261 additions and 14937 deletions

View file

@ -0,0 +1,10 @@
-- You should not modify if this have pushed to Github, unless it does serious wrong with the db.
BEGIN TRANSACTION;
ALTER TABLE monitor
ADD topic VARCHAR(50);
ALTER TABLE monitor
ADD success_message VARCHAR(255);
COMMIT;

15091
package-lock.json generated

File diff suppressed because it is too large Load diff

View file

@ -78,6 +78,7 @@
"jsonwebtoken": "~8.5.1", "jsonwebtoken": "~8.5.1",
"jwt-decode": "^3.1.2", "jwt-decode": "^3.1.2",
"limiter": "^2.1.0", "limiter": "^2.1.0",
"mqtt": "^4.2.8",
"nodemailer": "~6.6.5", "nodemailer": "~6.6.5",
"notp": "~2.0.3", "notp": "~2.0.3",
"password-hash": "~1.2.2", "password-hash": "~1.2.2",

View file

@ -52,6 +52,7 @@ class Database {
"patch-http-monitor-method-body-and-headers.sql": true, "patch-http-monitor-method-body-and-headers.sql": true,
"patch-2fa-invalidate-used-token.sql": true, "patch-2fa-invalidate-used-token.sql": true,
"patch-notification_sent_history.sql": true, "patch-notification_sent_history.sql": true,
"patch-added-mqtt-monitor.sql": true,
} }
/** /**

View file

@ -1,5 +1,6 @@
const https = require("https"); const https = require("https");
const dayjs = require("dayjs"); const dayjs = require("dayjs");
const mqtt = require("mqtt");
const utc = require("dayjs/plugin/utc"); const utc = require("dayjs/plugin/utc");
let timezone = require("dayjs/plugin/timezone"); let timezone = require("dayjs/plugin/timezone");
dayjs.extend(utc); dayjs.extend(utc);
@ -7,7 +8,7 @@ dayjs.extend(timezone);
const axios = require("axios"); const axios = require("axios");
const { Prometheus } = require("../prometheus"); const { Prometheus } = require("../prometheus");
const { debug, UP, DOWN, PENDING, flipStatus, TimeLogger } = require("../../src/util"); const { debug, UP, DOWN, PENDING, flipStatus, TimeLogger } = require("../../src/util");
const { tcping, ping, dnsResolve, checkCertificate, checkStatusCode, getTotalClientInRoom, setting, errorLog } = require("../util-server"); const { tcping, ping, dnsResolve, checkCertificate, checkStatusCode, getTotalClientInRoom, setting, errorLog, mqttAsync } = require("../util-server");
const { R } = require("redbean-node"); const { R } = require("redbean-node");
const { BeanModel } = require("redbean-node/dist/bean-model"); const { BeanModel } = require("redbean-node/dist/bean-model");
const { Notification } = require("../notification"); const { Notification } = require("../notification");
@ -112,7 +113,7 @@ class Monitor extends BeanModel {
// undefined if not https // undefined if not https
let tlsInfo = undefined; let tlsInfo = undefined;
if (! previousBeat) { if (!previousBeat) {
previousBeat = await R.findOne("heartbeat", " monitor_id = ? ORDER BY time DESC", [ previousBeat = await R.findOne("heartbeat", " monitor_id = ? ORDER BY time DESC", [
this.id, this.id,
]); ]);
@ -130,7 +131,7 @@ class Monitor extends BeanModel {
} }
// Duration // Duration
if (! isFirstBeat) { if (!isFirstBeat) {
bean.duration = dayjs(bean.time).diff(dayjs(previousBeat.time), "second"); bean.duration = dayjs(bean.time).diff(dayjs(previousBeat.time), "second");
} else { } else {
bean.duration = 0; bean.duration = 0;
@ -153,7 +154,7 @@ class Monitor extends BeanModel {
}, },
httpsAgent: new https.Agent({ httpsAgent: new https.Agent({
maxCachedSessions: 0, // Use Custom agent to disable session reuse (https://github.com/nodejs/node/issues/3940) maxCachedSessions: 0, // Use Custom agent to disable session reuse (https://github.com/nodejs/node/issues/3940)
rejectUnauthorized: ! this.getIgnoreTls(), rejectUnauthorized: !this.getIgnoreTls(),
}), }),
maxRedirects: this.maxredirects, maxRedirects: this.maxredirects,
validateStatus: (status) => { validateStatus: (status) => {
@ -296,7 +297,7 @@ class Monitor extends BeanModel {
}, },
httpsAgent: new https.Agent({ httpsAgent: new https.Agent({
maxCachedSessions: 0, // Use Custom agent to disable session reuse (https://github.com/nodejs/node/issues/3940) maxCachedSessions: 0, // Use Custom agent to disable session reuse (https://github.com/nodejs/node/issues/3940)
rejectUnauthorized: ! this.getIgnoreTls(), rejectUnauthorized: !this.getIgnoreTls(),
}), }),
maxRedirects: this.maxredirects, maxRedirects: this.maxredirects,
validateStatus: (status) => { validateStatus: (status) => {
@ -319,6 +320,14 @@ class Monitor extends BeanModel {
throw new Error("Server not found on Steam"); throw new Error("Server not found on Steam");
} }
} else if (this.type === "mqtt") {
try {
bean.msg = await mqttAsync(this.url, this.topic, this.successMessage);
bean.status = UP;
} catch (error) {
bean.status = DOWN;
bean.msg = error.message;
}
} else { } else {
bean.msg = "Unknown Monitor Type"; bean.msg = "Unknown Monitor Type";
bean.status = PENDING; bean.status = PENDING;
@ -385,7 +394,7 @@ class Monitor extends BeanModel {
previousBeat = bean; previousBeat = bean;
if (! this.isStop) { if (!this.isStop) {
if (demoMode) { if (demoMode) {
if (beatInterval < 20) { if (beatInterval < 20) {
@ -407,7 +416,7 @@ class Monitor extends BeanModel {
errorLog(e, false); errorLog(e, false);
console.error("Please report to https://github.com/louislam/uptime-kuma/issues"); console.error("Please report to https://github.com/louislam/uptime-kuma/issues");
if (! this.isStop) { if (!this.isStop) {
console.log("Try to restart the monitor"); console.log("Try to restart the monitor");
this.heartbeatInterval = setTimeout(safeBeat, this.interval * 1000); this.heartbeatInterval = setTimeout(safeBeat, this.interval * 1000);
} }
@ -590,7 +599,7 @@ class Monitor extends BeanModel {
} else { } else {
// Handle new monitor with only one beat, because the beat's duration = 0 // Handle new monitor with only one beat, because the beat's duration = 0
let status = parseInt(await R.getCell("SELECT `status` FROM heartbeat WHERE monitor_id = ?", [ monitorID ])); let status = parseInt(await R.getCell("SELECT `status` FROM heartbeat WHERE monitor_id = ?", [monitorID]));
if (status === UP) { if (status === UP) {
uptime = 1; uptime = 1;

View file

@ -10,6 +10,8 @@ const iconv = require("iconv-lite");
const chardet = require("chardet"); const chardet = require("chardet");
const fs = require("fs"); const fs = require("fs");
const nodeJsUtil = require("util"); const nodeJsUtil = require("util");
const mqtt = require("mqtt");
// From ping-lite // From ping-lite
exports.WIN = /^win/.test(process.platform); exports.WIN = /^win/.test(process.platform);
@ -26,7 +28,7 @@ exports.initJWTSecret = async () => {
"jwtSecret", "jwtSecret",
]); ]);
if (! jwtSecretBean) { if (!jwtSecretBean) {
jwtSecretBean = R.dispense("setting"); jwtSecretBean = R.dispense("setting");
jwtSecretBean.key = "jwtSecret"; jwtSecretBean.key = "jwtSecret";
} }
@ -88,6 +90,30 @@ exports.pingAsync = function (hostname, ipv6 = false) {
}); });
}; };
exports.mqttAsync = function (hostname, topic, okMessage) {
return new Promise((resolve, reject) => {
try {
let client = mqtt.connect(hostname);
client.on("connect", () => {
client.subscribe(topic);
}
);
client.on("message", (messageTopic, message) => {
console.log(messageTopic);
if (messageTopic == topic && message.toString() !== okMessage) {
client.end();
reject(new Error(`Error; Topic: ${messageTopic}; Message: ${message.toString()}`));
} else {
client.end();
resolve(`Topic: ${messageTopic}; Message: ${message.toString()}`);
}
});
} catch (error) {
reject(new Error(error));
}
});
};
exports.dnsResolve = function (hostname, resolver_server, rrtype) { exports.dnsResolve = function (hostname, resolver_server, rrtype) {
const resolver = new Resolver(); const resolver = new Resolver();
resolver.setServers([resolver_server]); resolver.setServers([resolver_server]);
@ -269,13 +295,13 @@ exports.getTotalClientInRoom = (io, roomName) => {
const sockets = io.sockets; const sockets = io.sockets;
if (! sockets) { if (!sockets) {
return 0; return 0;
} }
const adapter = sockets.adapter; const adapter = sockets.adapter;
if (! adapter) { if (!adapter) {
return 0; return 0;
} }
@ -300,7 +326,7 @@ exports.allowAllOrigin = (res) => {
}; };
exports.checkLogin = (socket) => { exports.checkLogin = (socket) => {
if (! socket.userID) { if (!socket.userID) {
throw new Error("You are not logged in."); throw new Error("You are not logged in.");
} }
}; };

View file

@ -306,4 +306,9 @@ export default {
"One record": "One record", "One record": "One record",
steamApiKeyDescription: "For monitoring a Steam Game Server you need a Steam Web-API key. You can register your API key here: ", steamApiKeyDescription: "For monitoring a Steam Game Server you need a Steam Web-API key. You can register your API key here: ",
"Current User": "Current User", "Current User": "Current User",
topic: "Topic",
topicExplanation: "MQTT topic to monitor",
successMessage: "Success Message",
successMessageExplanation: "MQTT message that will be considered as success",
url: "Server URL",
}; };

View file

@ -32,6 +32,9 @@
<option value="steam"> <option value="steam">
Steam Game Server Steam Game Server
</option> </option>
<option value="mqtt">
MQTT
</option>
</select> </select>
</div> </div>
@ -115,6 +118,32 @@
</div> </div>
</template> </template>
<!-- MQTT -->
<!-- For MQTT Type -->
<template v-if="monitor.type === 'mqtt'">
<div class="my-3">
<label for="url" class="form-label">{{ $t("url") }}</label>
<input id="url" v-model="monitor.url" type="text" class="form-control" pattern="https?://.+" required>
</div>
<div class="my-3">
<label for="topic" class="form-label">{{ $t("topic") }}</label>
<input id="topic" v-model="monitor.topic" type="text" class="form-control" required>
<div class="form-text">
{{ $t("topicExplanation") }}
</div>
</div>
<div class="my-3">
<label for="successMessage" class="form-label">{{ $t("successMessage") }}</label>
<input id="successMessage" v-model="monitor.successMessage" type="text" class="form-control" required>
<div class="form-text">
{{ $t("successMessageExplanation") }}
</div>
</div>
</template>
<!-- Interval --> <!-- Interval -->
<div class="my-3"> <div class="my-3">
<label for="interval" class="form-label">{{ $t("Heartbeat Interval") }} ({{ $t("checkEverySecond", [ monitor.interval ]) }})</label> <label for="interval" class="form-label">{{ $t("Heartbeat Interval") }} ({{ $t("checkEverySecond", [ monitor.interval ]) }})</label>
@ -426,6 +455,8 @@ export default {
accepted_statuscodes: ["200-299"], accepted_statuscodes: ["200-299"],
dns_resolve_type: "A", dns_resolve_type: "A",
dns_resolve_server: "1.1.1.1", dns_resolve_server: "1.1.1.1",
topic: "",
successMessage: "",
}; };
for (let i = 0; i < this.$root.notificationList.length; i++) { for (let i = 0; i < this.$root.notificationList.length; i++) {