-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: connect to InfluxDB and store telemetry + car data
- Loading branch information
1 parent
d51ef0f
commit f313cf0
Showing
13 changed files
with
179 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
import { FastifyPluginAsync } from 'fastify'; | ||
import fp from 'fastify-plugin'; | ||
import { Plugins } from './plugins'; | ||
import { CarsService } from '../services/cars.service'; | ||
|
||
declare module 'fastify' { | ||
interface FastifyInstance { | ||
carsService: CarsService; | ||
} | ||
} | ||
|
||
const carsServicePlugin: FastifyPluginAsync = async (fastify) => { | ||
const carsService = new CarsService(fastify.influxClient, fastify.influxOrg); | ||
|
||
await carsService.init(); | ||
|
||
fastify.decorate('carsService', carsService); | ||
fastify.addHook('onClose', async () => { | ||
await carsService.deinit(); | ||
}); | ||
}; | ||
|
||
export default fp(carsServicePlugin, { | ||
name: Plugins.CARS_SERVICE, | ||
dependencies: [Plugins.INFLUXDB], | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
export enum Plugins { | ||
INFLUXDB = 'influxdb', | ||
SONDEHUB = 'sondehub', | ||
MQTT = 'mqtt', | ||
CARS_SERVICE = 'carsService', | ||
TELEMETRY_SERVICE = 'telemetryService', | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
import { FastifyPluginAsync } from 'fastify'; | ||
import fp from 'fastify-plugin'; | ||
import { Plugins } from './plugins'; | ||
import { TelemetryService } from '../services/telemetry.service'; | ||
|
||
declare module 'fastify' { | ||
interface FastifyInstance { | ||
telemetryService: TelemetryService; | ||
} | ||
} | ||
|
||
const telemetryServicePlugin: FastifyPluginAsync = async (fastify) => { | ||
const telemetryService = new TelemetryService(fastify.influxClient, fastify.influxOrg); | ||
|
||
await telemetryService.init(); | ||
|
||
fastify.decorate('telemetryService', telemetryService); | ||
fastify.addHook('onClose', async () => { | ||
await telemetryService.deinit(); | ||
}); | ||
}; | ||
|
||
export default fp(telemetryServicePlugin, { | ||
name: Plugins.TELEMETRY_SERVICE, | ||
dependencies: [Plugins.INFLUXDB], | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
import { InfluxDB, Point, WriteApi } from '@influxdata/influxdb-client'; | ||
import { InfluxDbServiceBase } from '../utils/influxdb-service-base'; | ||
import { Organization } from '@influxdata/influxdb-client-apis'; | ||
import { CarStatus } from '../schemas'; | ||
|
||
export class CarsService extends InfluxDbServiceBase { | ||
private writeAPi: WriteApi; | ||
|
||
constructor(private client: InfluxDB, private org: Organization) { | ||
super(client, org.id); | ||
} | ||
|
||
public async init() { | ||
await this.ensureBucket('cars'); | ||
this.writeAPi = this.client.getWriteApi(this.org.id, 'cars', 'ms'); | ||
} | ||
|
||
public async deinit() { | ||
await this.writeAPi.close(); | ||
} | ||
|
||
public writeCarStatus(callsign: string, status: CarStatus) { | ||
const point = new Point('car_status') | ||
.timestamp(Date.now()) | ||
.tag('callsign', callsign) | ||
.floatField('latitude', status.latitude) | ||
.floatField('longitude', status.longitude) | ||
.floatField('altitude', status.altitude); | ||
|
||
this.writeAPi.writePoint(point); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
import { InfluxDB, Point, WriteApi } from '@influxdata/influxdb-client'; | ||
import { Organization } from '@influxdata/influxdb-client-apis'; | ||
import { InfluxDbServiceBase } from '../utils/influxdb-service-base'; | ||
import { TelemetryPacket } from '@gapp/sondehub'; | ||
|
||
export class TelemetryService extends InfluxDbServiceBase { | ||
private writeAPi: WriteApi; | ||
|
||
constructor(private client: InfluxDB, private org: Organization) { | ||
super(client, org.id); | ||
} | ||
|
||
public async init() { | ||
await this.ensureBucket('telemetry'); | ||
this.writeAPi = this.client.getWriteApi(this.org.id, 'telemetry', 'ms'); | ||
} | ||
|
||
public async deinit() { | ||
await this.writeAPi.close(); | ||
} | ||
|
||
public writeTelemetry(telemetry: TelemetryPacket) { | ||
const point = new Point('telemetry_packet') | ||
.timestamp(new Date(telemetry.time_received)) | ||
.tag('callsign', telemetry.payload_callsign) | ||
.floatField('latitude', telemetry.lat) | ||
.floatField('longitude', telemetry.lon) | ||
.floatField('altitude', telemetry.alt); | ||
|
||
this.writeAPi.writePoint(point); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
import { InfluxDB } from '@influxdata/influxdb-client'; | ||
import { Bucket, BucketsAPI } from '@influxdata/influxdb-client-apis'; | ||
|
||
export abstract class InfluxDbServiceBase { | ||
constructor(protected readonly influxdbClient: InfluxDB, protected readonly orgID: string) {} | ||
|
||
public abstract init(): Promise<void>; | ||
public abstract deinit(): Promise<void>; | ||
|
||
protected async ensureBucket(name: string): Promise<Bucket> { | ||
const bucketsApi = new BucketsAPI(this.influxdbClient); | ||
|
||
const buckets = await bucketsApi.getBuckets(); | ||
let bucket = buckets.buckets.find((bucket) => bucket.name === name); | ||
if (!bucket) { | ||
bucket = await bucketsApi.postBuckets({ | ||
body: { | ||
orgID: this.orgID, | ||
name, | ||
}, | ||
}); | ||
} | ||
|
||
return bucket; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.