Skip to content

Commit

Permalink
Mysql support
Browse files Browse the repository at this point in the history
  • Loading branch information
Quantumplation committed Feb 28, 2025
1 parent 556749e commit d56a15b
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 53 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ sqlx = { version = "0.8.3", features = [
"runtime-tokio",
"tls-rustls",
"time",
"chrono",
] }
time = { version = "0.3", features = ["serde"] }
serde-aux = "4.6.0"
chrono = { version = "0.4.40", features = ["serde", "now"] }

Expand Down
13 changes: 13 additions & 0 deletions migrations/20250228020456_payloads.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- Add migration script here
CREATE TABLE payloads (
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
source VARCHAR(255),
synthetic VARCHAR(255),
synthetic_numerator VARCHAR(1024),
synthetic_denominator VARCHAR(1024),
validity_lower BIGINT UNSIGNED,
validity_upper BIGINT UNSIGNED,
payload BLOB,
error_text VARCHAR(8192)
);
113 changes: 65 additions & 48 deletions src/bin/pyromaniac.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use std::{
};

use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use clap::Parser;
use reqwest::header::USER_AGENT;
use serde::{Deserialize, Serialize};
use serde_aux::prelude::*;
use sqlx::mysql::MySqlPoolOptions;
use sqlx::{mysql::MySqlPoolOptions, MySql, Pool};
use time::{format_description, OffsetDateTime};

#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
Expand Down Expand Up @@ -48,7 +48,7 @@ struct OraclePayload {
#[derive(Debug, Serialize)]
struct Payload {
id: Option<u64>,
timestamp: DateTime<Utc>,
timestamp: OffsetDateTime,
source: Option<String>,
synthetic: Option<String>,
synthetic_numerator: Option<String>,
Expand All @@ -68,8 +68,11 @@ async fn main() -> Result<()> {
println!("No directory or MySQL specified");
return Ok(());
}
let mut connection = None;
if let Some(directory) = &args.directory {
fs::create_dir_all(directory)?;
} else if let Some(mysql) = &args.mysql {
connection = Some(MySqlPoolOptions::new().connect(mysql).await?);
}

loop {
Expand Down Expand Up @@ -100,62 +103,28 @@ async fn main() -> Result<()> {
}
};
if let Some(directory) = &args.directory {
let payloads: Result<Vec<_>> = payloads
.iter()
.map(|p| to_payload("butane".to_string(), p).context("failed to convert payload"))
.collect();
let payloads = match payloads {
Ok(payloads) => payloads,
match save_file(payloads, directory).await {
Ok(()) => {}
Err(err) => {
eprintln!("Failed to convert payloads: {}", err);
continue;
eprintln!("Failed to save file: {}", err);
}
};
let path = Path::join(
directory,
format!("{}.json", Utc::now().format("%Y-%m-%d_%H-%M-%S.%f")),
);
println!("Saving to {}", path.display());
match save_file(payloads, &path).await {
}
} else if let Some(connection) = &connection {
println!("Saving to {:?}", args.mysql.clone().unwrap());
match save_db(payloads, connection).await {
Ok(()) => {}
Err(err) => {
eprintln!("Failed to save file: {}", err);
eprintln!("Failed to save database: {}", err);
}
}
}
}
/*
sqlx::query!(
"
INSERT INTO payloads
(source, synthetic, synthetic_numerator, synthetic_denominator, validity_lower, validity_upper, payload, error_text)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
",
"example_source",
"example_synthetic",
vec![1, 2, 3],
vec![4, 5, 6],
100,
200,
vec![7, 8, 9],
"example_error_text"
)
.execute(&pool)
.await
.context("failed to insert payload")?;
let rows = sqlx::query_as!(Payload, "SELECT * FROM payloads")
.fetch_all(&pool)
.await
.context("failed to fetch payloads")?;
*/
// println!("{:?}", rows);
}

fn to_payload(source: String, payload: &OraclePayload) -> Result<Payload> {
let payload = Payload {
id: None,
timestamp: Utc::now(),
timestamp: OffsetDateTime::now_utc(),
source: Some(source),
synthetic: Some(payload.synthetic.clone()),
synthetic_numerator: Some(payload.synthetic_price.numerator.clone()),
Expand All @@ -168,8 +137,56 @@ fn to_payload(source: String, payload: &OraclePayload) -> Result<Payload> {
Ok(payload)
}

async fn save_file(payload: Vec<Payload>, path: &impl AsRef<Path>) -> Result<()> {
async fn save_file(payloads: Vec<OraclePayload>, directory: impl AsRef<Path>) -> Result<()> {
let payloads: Result<Vec<_>> = payloads
.iter()
.map(|p| to_payload("butane".to_string(), p).context("failed to convert payload"))
.collect();
let payloads = match payloads {
Ok(payloads) => payloads,
Err(err) => {
eprintln!("Failed to convert payloads: {}", err);
return Ok(());
}
};
let format = format_description::parse(
"[year]-[month]-[day] [hour]:[minute]:[second] [offset_hour \
sign:mandatory]:[offset_minute]:[offset_second]",
)?;
let path = Path::join(
directory.as_ref(),
format!("{}.json", OffsetDateTime::now_utc().format(&format)?),
);
println!("Saving to {}", path.display());
let file = File::create(path)?;
let writer = BufWriter::new(file);
serde_json::to_writer(writer, &payload).context("failed to save file")
serde_json::to_writer(writer, &payloads).context("failed to save file")
}

async fn save_db(payloads: Vec<OraclePayload>, connection: &Pool<MySql>) -> Result<()> {
for payload in payloads
.iter()
.map(|p| to_payload("butane".to_string(), &p))
{
let payload = payload?;
sqlx::query!(
"
INSERT INTO payloads
(source, synthetic, synthetic_numerator, synthetic_denominator, validity_lower, validity_upper, payload, error_text)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
",
payload.source,
payload.synthetic,
payload.synthetic_numerator,
payload.synthetic_denominator,
payload.validity_lower,
payload.validity_upper,
payload.payload,
payload.error_text,
)
.execute(connection)
.await
.context("failed to save to database")?;
}
Ok(())
}

0 comments on commit d56a15b

Please sign in to comment.