initial commit

This commit is contained in:
2022-06-28 13:02:50 +02:00
commit a4e6b655b7
16 changed files with 7870 additions and 0 deletions

View File

@@ -0,0 +1,46 @@
export const OPENDATA_URL = 'http://opendata.bratislava.sk/api/mhd';
export default class BratislavaOpendata {
constructor(apiKey) {
this.apiKey = apiKey;
}
async fetchOneStop(stationId) {
return this.request('/stationstop/' + stationId);
}
async fetchAllStops() {
return this.request('/stationstop');
}
async fetchAllVehicles() {
return this.request('/vehicle');
}
async request(uri) {
let url = OPENDATA_URL + '/' + uri;
let resp = await fetch(url, {
cache: 'no-cache',
headers: {
'User-Agent': 'node_mhdmap/1.0',
'Key': this.apiKey,
},
});
if (!resp.ok) {
console.warn('Error while fetching', url, 'status:', resp.status, resp.statusText);
return null;
}
try {
resp = await resp.json();
return resp;
} catch (e) {
console.warn('Incorrect response for fetching', url, e);
return null;
}
}
}

136
server/ImhdClient.js Normal file
View File

@@ -0,0 +1,136 @@
import * as fs from 'fs';
import path from "path";
import https from 'https';
import {fileURLToPath} from "url";
import { nanoid } from 'nanoid'
const imhdData = path.dirname(path.dirname(fileURLToPath(import.meta.url))) + '/dist/imhd';
export default class ImhdClient {
constructor() {
fs.mkdirSync(imhdData + '/icons', { recursive: true });
this.cache = {};
this.loadCache();
}
getVehicleData(vehicleId) {
if (this.cache.hasOwnProperty(vehicleId)) {
return this.cache[vehicleId];
} else {
return null;
}
}
async search(phrase) {
let resp = await fetch('https://imhd.sk/ba/api/sk/vyhladavanie?q=' + encodeURIComponent(phrase));
if (!resp.ok) {
return null;
}
try {
return resp.json();
} catch (e) {
return null;
}
}
async downloadVehicleIcon(vehicleId) {
if (this.cache.hasOwnProperty(vehicleId)) {
if (this.cache[vehicleId].type)
return this.cache[vehicleId].url;
}
const randId = nanoid(5);
const iconPath = imhdData + '/icons/' + randId + '.png';
if (fs.existsSync(iconPath)) {
return null;
}
let results = await this.search(vehicleId);
if (!results) {
console.log('Search for vehicle unsuccessful:', vehicleId);
return null;
}
results = results.results || [];
for (let obj of results) {
if ((typeof obj.img) !== 'string' || obj.img.length < 10)
continue;
const urlParts = obj.url.trim().split('/');
if (urlParts.length < 3)
continue;
if (urlParts[2] !== 'vozidlo')
continue;
if (!obj.name.endsWith('#' + vehicleId))
continue;
let matches = obj.img.match(/.*src="([^"]*)".*/);
if (!matches)
continue;
const url = 'https://imhd.sk' + matches[1];
const existing = Object.values(this.cache).find(el => el.url === url);
if (!existing) {
// TODO: Path traversal vulnerability here!
await this.downloadFile(url, iconPath);
fs.appendFile(
imhdData + '/names.txt',
vehicleId + '|' + obj.name + "\n",
err => { err && console.error(err); }
);
}
this.cache[vehicleId] = {
img: existing ? existing.img : randId,
url: url,
type: obj.class,
info: obj.url,
};
this.saveCache();
return true;
}
}
async downloadFile(url, target) {
return new Promise ((resolve, reject) => {
const file = fs.createWriteStream(target);
https.get(url, resp => {
resp.pipe(file);
file.on('finish', () => {
file.close();
console.log('Downloaded', url);
resolve();
});
});
});
}
loadCache() {
try {
let cacheFile = fs.readFileSync(imhdData + '/iconsdb.json');
if (cacheFile) {
let parsed = JSON.parse(cacheFile);
if (parsed) {
this.cache = parsed;
}
}
} catch (e) {
console.log('Cannot read iconsdb.json for ImhdClient', e);
}
}
saveCache() {
fs.writeFile(imhdData + '/iconsdb.json', JSON.stringify(this.cache, null, 2), err => {
if (err) {
console.log('Cannot save cache for ImhdClient', err);
}
});
}
}

223
server/MhdMapApp.js Normal file
View File

@@ -0,0 +1,223 @@
import BratislavaOpendata from "./BratislavaOpendata.js";
import config from "../config.js";
import {bearing, removeMilis} from "./math.js";
import ImhdClient from "./ImhdClient.js";
import Recorder from "./Recorder.js";
import getDatabase from "./getDatabase.js";
export default class MhdMapApp {
constructor(websockets) {
this.opendata = new BratislavaOpendata(config.openDataKey);
this.websockets = websockets;
this.stops = null;
this.oldVehicles = null;
this.vehicles = null;
this.lastUpdatedStatic = null;
this.lastUpdatedDynamic = null;
this.lastDelta = null;
this.imhd = new ImhdClient();
this.recorder = new Recorder(getDatabase());
}
async start() {
this.websockets.on('join', ws => {
this.sendAction(ws, 'updateStops', this.stops);
this.sendAction(ws, 'updateVehicles', this.vehicles);
});
this.websockets.on('message', this._onWsMessage.bind(this));
await this.recorder.start();
setInterval(() => {
this.updateStatic();
}, 1000 * 60 * 60 * 24);
await this.updateStatic();
await this.updateDynamic();
setInterval(async () => {
await this.updateDynamic();
if (this.lastDelta.length > 0) {
this.broadcastAction('updateVehicles', this.lastDelta);
}
}, 1000 * 5);
}
async updateStatic() {
let tmp = await this.opendata.fetchAllStops();
if (tmp) {
this.stops = tmp;
this.lastUpdatedStatic = new Date();
}
}
async updateDynamic() {
let tmp = await this.opendata.fetchAllVehicles();
if (!tmp)
return;
// TODO: We're in Slovakia.
tmp = tmp.filter(el => el.gpsLongitude > 10);
tmp = this._vehiclesToObj(tmp);
this.oldVehicles = this.vehicles;
this.vehicles = tmp;
this.lastUpdatedDynamic = new Date();
if (this.oldVehicles) {
this.lastDelta = this.calculateVehiclesDelta(this.oldVehicles, this.vehicles);
this.vehicles = this.addRotationToVehicles(this.oldVehicles, this.vehicles);
for (let veh of Object.values(this.lastDelta)) {
await this.recorder.putVehicle(veh);
}
} else {
for (let veh of Object.values(this.vehicles)) {
await this.recorder.putVehicle(veh);
}
}
this.vehicles = this.addImhdDataToVehicles(this.vehicles);
let down = 0;
for (let vehId in this.vehicles) {
if (await this.imhd.downloadVehicleIcon(vehId) === true) {
if (down++ === 100) {
break;
}
}
}
}
calculateVehiclesDelta(beforeObj, afterObj) {
let delta = {};
for (let vehId in beforeObj) {
let beforeVeh = beforeObj[vehId];
if (afterObj.hasOwnProperty(vehId)) {
let afterVeh = afterObj[vehId];
if (!this.isVehicleObjectEqual(beforeVeh, afterVeh)) {
delta[vehId] = afterVeh;
}
} else {
// removed vehicle
beforeVeh.gpsLatitude = -1000;
beforeVeh.gpsLongitude = -1000;
delta[vehId] = beforeVeh;
console.log('vehicle left', beforeVeh);
}
}
for (let vehId in afterObj) {
if (!beforeObj.hasOwnProperty(vehId)) {
// new vehicle
delta[vehId] = afterObj[vehId];
// console.log('new vehicle joined', afterObj[vehId]);
}
}
return delta;
}
_vehiclesToObj(arr) {
let obj = {};
for (let vehicle of arr) {
let id = this._vehicleToId(vehicle);
vehicle.lastModified = removeMilis(new Date(vehicle.lastModified));
if (obj.hasOwnProperty(id)) {
console.log('DUPLICATE vehicle?!', 'old:', obj[id], 'new:', vehicle);
}
obj[id] = vehicle;
}
return obj;
}
_vehicleToId(vehicle) {
return vehicle.vehicleNumber;
}
isVehicleObjectEqual(a, b, checkLastModified = false) {
return (
a.vehicleNumber === b.vehicleNumber
&& a.lineNumber === b.lineNumber
&& a.gpsLatitude === b.gpsLatitude
&& a.gpsLongitude === b.gpsLongitude
&& (!checkLastModified || (a.lastModified === b.lastModified))
);
}
isVehiclePositionEqual(a, b) {
return a.gpsLatitude === b.gpsLatitude && a.gpsLongitude === b.gpsLongitude;
}
addImhdDataToVehicles(vehicles) {
for (let vehId in vehicles) {
const data = this.imhd.getVehicleData(vehId);
if (!data)
continue;
let vehicle = vehicles[vehId];
vehicle.img = data.img;
vehicle.type = data.type;
vehicle.imhdinfo = data.info;
}
return vehicles;
}
addRotationToVehicles(before, after) {
for (let vehId in after) {
if (before.hasOwnProperty(vehId)) {
after[vehId].bearing = bearing(before[vehId], after[vehId]);
}
}
return after;
}
broadcastAction(action, data) {
this.websockets.broadcast(JSON.stringify({ action, data }));
}
sendAction(ws, action, data) {
ws.send(JSON.stringify({ action, data }));
}
async _onWsMessage(ws, data) {
// Note: before sending the response, it should be checked if this ws is still open
try {
data = JSON.parse(data);
} catch (e) {
console.log('Malformed request: ', data, e);
return null;
}
const msgid = data.id;
let res = await this.handleWsRequest(ws, data.action, data.msg);
if (ws.readyState !== 1) { // 1 = WebSocket.OPEN
// closed while handling the request
return;
}
this.sendAction(ws, 'response', {
id: msgid,
msg: res,
});
}
async handleWsRequest(ws, action, data) {
if (action === 'requestVehicleTrace') {
return await this.recorder.getVehicleTrace(data.vehicle, data.count || 20);
} else if (action === 'requestLineTrace') {
return await this.recorder.getSimpleLineTrace(data.line, data.count || 20);
}
return null;
}
}

122
server/Recorder.js Normal file
View File

@@ -0,0 +1,122 @@
export default class Recorder {
constructor(db) {
this.db = db;
this.lines = {};
this.vehModified = {};
}
async start() {
await this._createTables();
await this._loadLast();
}
async _createTables() {
if (!this.db)
return;
if (!await this.db.schema.hasTable('veh_lines')) {
await this.db.schema.createTable('veh_lines', t => {
t.smallint('vehicle').notNullable();
t.string('line', 32).nullable(); // 4, 9....
t.timestamp('date', { useTz: false }).defaultTo(this.db.fn.now());
});
}
if (!await this.db.schema.hasTable('veh_pos')) {
await this.db.schema.createTable('veh_pos', t => {
t.smallint('vehicle').notNullable();
t.smallint('line').nullable();
t.timestamp('date', { useTz: false }).defaultTo(this.db.fn.now());
t.float('lat').nullable();
t.float('long').nullable();
});
}
}
async _loadLast() {
const lineData = await this.db('veh_lines')
.distinctOn('vehicle')
.orderBy([
{ column: 'vehicle', order: 'asc' },
{ column: 'date', order: 'desc' },
]);
for (let row of lineData) {
this.lines[row.vehicle.toString()] = parseInt(row.line) || null;
}
const vehData = await this.db
.select('vehicle', 'date')
.from('veh_pos')
.distinctOn('vehicle')
.orderBy([
{ column: 'vehicle', order: 'asc' },
{ column: 'date', order: 'desc' },
]);
for (let row of vehData) {
this.vehModified[row.vehicle.toString()] = new Date(row.date);
}
}
async putVehicle(veh) {
// TODO: Add filters for data integrity (vehicleNumber === null
let lineNumber = parseInt(veh.lineNumber) || null;
if (this.lines[veh.vehicleNumber.toString()] !== lineNumber) {
await this.db('veh_lines').insert({
vehicle: veh.vehicleNumber,
line: veh.lineNumber,
date: veh.lastModified,
});
this.lines[veh.vehicleNumber.toString()] = lineNumber;
}
if (this.vehModified[veh.vehicleNumber]?.getTime() !== veh.lastModified.getTime()) {
// console.log('for', veh.vehicleNumber, this.vehModified[veh.vehicleNumber], ' vs. from open', veh.lastModified)
await this.db('veh_pos').insert({
vehicle: veh.vehicleNumber,
line: veh.lineNumber,
date: veh.lastModified,
lat: veh.gpsLatitude,
long: veh.gpsLongitude,
});
this.vehModified[veh.vehicleNumber] = veh.lastModified;
}
}
async getVehicleTrace(vehId, count) {
return await this.db
.select('date', 'lat', 'long')
.from('veh_pos')
.where({
vehicle: parseInt(vehId),
})
// .andWhere('date', '<', '2022-06-24T14:07:00')
.orderBy('date', 'desc')
.limit(count);
}
async getLineTrace(line, count) {
return await this.db
.select('date', 'lat', 'long')
.from('veh_pos')
.where({
line: parseInt(line),
})
.orderBy('date', 'desc')
.limit(count);
}
async getSimpleLineTrace(line, count) {
return await this.db
.select('lat', 'long')
.from('veh_pos')
.where({
line: parseInt(line),
})
.limit(count);
}
}

92
server/SocketHandler.js Normal file
View File

@@ -0,0 +1,92 @@
import {tinyws} from "tinyws";
import { randomUUID } from 'crypto';
export default class SocketHandler {
constructor(path, args) {
this.path = path;
this.connections = {};
this.handlers = {
'join': [],
'leave': [],
'message': [],
};
this.pingInterval = null;
}
start(express) {
express.use(tinyws());
express.use(this.path, this._receiveRequest.bind(this));
this.pingInterval = setInterval(() => {
Object.values(this.connections).forEach(client => {
if (++client.failedPings > 3) {
this.terminateClient(client.id);
} else {
client.ws.ping();
}
});
}, 30000);
// this.pingInterval
}
stop() {
clearInterval(this.pingInterval);
}
broadcast(msg) {
Object.values(this.connections).forEach(client => {
client.ws.send(msg);
});
}
async _receiveRequest(req, res) {
if (req.ws) {
const ws = await req.ws()
return this._receiveWebsocket(req, res, ws);
} else {
// not handling HTTP
}
}
async _receiveWebsocket(req, res, ws) {
ws.id = randomUUID();
this.connections[ws.id] = {
ws: ws,
id: ws.id,
joined: new Date(),
failedPings: 0,
};
ws.on('close', () => {
this._fire('leave', ws);
delete this.connections[ws.id];
});
ws.on('message', (data, ...args) => {
this._fire('message', ws, data, ...args);
});
ws.on('pong', () => {
this.failedPings = 0;
});
this._fire('join', ws);
}
terminateClient(uuid) {
this.connections[uuid].ws.terminate();
delete this.connections[uuid];
}
on(event, callback) {
this.handlers[event].push(callback);
}
_fire(event, ...args) {
this.handlers[event].forEach(cb => cb(...args));
}
}

33
server/getDatabase.js Normal file
View File

@@ -0,0 +1,33 @@
import knex from 'knex';
import config from "../config.js";
let database = null;
let lastFailed = null;
/**
* @return {Knex}
*/
export default function getDatabase() {
if (database) {
return database;
}
if (lastFailed && (new Date().getTime() - lastFailed.getTime() < 60000)) {
// attempt to reconnect every 1 minute, not more frequently
return;
}
try {
console.log('connecting db');
database = knex({
...config.database,
});
lastFailed = null;
return database;
} catch (e) {
console.error("Cannot connect to database". e);
database = null;
lastFailed = new Date();
return null;
}
};

63
server/index.js Normal file
View File

@@ -0,0 +1,63 @@
import ParcelCore from "@parcel/core";
const { default: Parcel } = ParcelCore;
import { createProxyMiddleware } from 'http-proxy-middleware';
import express from "express";
import { createRequire } from "module";
import path from 'path';
import { fileURLToPath } from 'url';
import config from "../config.js";
import MhdMapApp from "./MhdMapApp.js";
import SocketHandler from "./SocketHandler.js";
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const devPort = 5000;
const servicePort = 5001;
const staticFilePath = './static'
// Development Server Settings
const frontEndDevServerOptions = {
defaultConfig: createRequire(import.meta.url).resolve(
"@parcel/config-default"
),
entries: path.join(__dirname, '../client/index.html'),
mode: 'development',
logLevel: 4,
serveOptions: {
port: devPort,
},
/* hmrOptions: {
port: devPort,
}, */
publicUrl: config.urlPrefix,
defaultTargetOptions: {
publicUrl: config.urlPrefix,
},
};
const bundler = new Parcel(frontEndDevServerOptions);
(async () => {
await bundler.watch();
})();
const server = express();
const socketHandler = new SocketHandler(config.urlPrefix + '/ws');
socketHandler.start(server);
const app = new MhdMapApp(socketHandler);
app.start();
server.use(config.urlPrefix + '/static', express.static(staticFilePath));
const parcelMiddleware = createProxyMiddleware({
target: `http://localhost:${devPort}/`,
pathRewrite: {'^/mhd' : ''} // TODO: Maybe fix this with Parcel.
});
server.use('/', parcelMiddleware);
// Run your Express server
server.listen(servicePort, () => {
console.log(`Listening to port ${servicePort}...`);
});

18
server/math.js Normal file
View File

@@ -0,0 +1,18 @@
export function bearing (latlng1, latlng2) {
const rad = Math.PI / 180
const lat1 = latlng1.gpsLatitude * rad
const lat2 = latlng2.gpsLatitude * rad
const lon1 = latlng1.gpsLongitude * rad
const lon2 = latlng2.gpsLongitude * rad
const y = Math.sin(lon2 - lon1) * Math.cos(lat2)
const x = Math.cos(lat1) * Math.sin(lat2) -
Math.sin(lat1) * Math.cos(lat2) * Math.cos(lon2 - lon1)
const bearing = ((Math.atan2(y, x) * 180 / Math.PI) + 360) % 360
return bearing >= 180 ? bearing - 360 : bearing
}
export function removeMilis(date) {
date.setSeconds(date.getSeconds(), 0);
return date;
}