diff --git a/Cargo.lock b/Cargo.lock index bdf8e4b..4112bb2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1797,6 +1797,7 @@ dependencies = [ "reqwest", "serde", "serde_json", + "tokio", "worker", ] diff --git a/Cargo.toml b/Cargo.toml index 3e92747..87073ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,6 @@ name = "worker-rust" version = "0.1.0" edition = "2021" license = "Apache-2.0" -license-file = "LICENSE" [lib] crate-type = ["cdylib"] @@ -21,6 +20,7 @@ serde_json = "1.0.116" opentelemetry-http = { version="0.11.1" } opentelemetry-otlp = { version="0.15.0", features = ["metrics", "http-proto"], default-features = false } http = { version = "0.2.12"} +tokio = { version = "1.37.0"} [profile.release] opt-level = "s" # optimize for size in release builds diff --git a/features/metrics.json b/features/metrics.json new file mode 100644 index 0000000..b3143a5 --- /dev/null +++ b/features/metrics.json @@ -0,0 +1,107 @@ +{ + "resourceMetrics": [ + { + "resource": { + "attributes": [ + { + "key": "service.name", + "value": { + "stringValue": "my.service" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": { + "name": "my.library", + "version": "1.0.0", + "attributes": [ + { + "key": "my.scope.attribute", + "value": { + "stringValue": "some scope attribute" + } + } + ] + }, + "metrics": [ + { + "name": "my.counter", + "unit": "1", + "description": "I'm a Counter", + "sum": { + "aggregationTemporality": 1, + "isMonotonic": true, + "dataPoints": [ + { + "asDouble": 5, + "startTimeUnixNano": 1544712660300000000, + "timeUnixNano": 1544712660300000000, + "attributes": [ + { + "key": "my.counter.attr", + "value": { + "stringValue": "some value" + } + } + ] + } + ] + } + }, + { + "name": "my.gauge", + "unit": "1", + "description": "I'm a Gauge", + "gauge": { + "dataPoints": [ + { + "asDouble": 10, + "timeUnixNano": 1544712660300000000, + "attributes": [ + { + "key": "my.gauge.attr", + "value": { + "stringValue": "some value" + } + } + ] + } + ] + } + }, + { + "name": 
"my.histogram", + "unit": "1", + "description": "I'm a Histogram", + "histogram": { + "aggregationTemporality": 1, + "dataPoints": [ + { + "startTimeUnixNano": 1544712660300000000, + "timeUnixNano": 1544712660300000000, + "count": 3, + "sum": 3, + "bucketCounts": [1,1,1], + "explicitBounds": [1], + "min": 1, + "max": 1, + "attributes": [ + { + "key": "my.histogram.attr", + "value": { + "stringValue": "some value" + } + } + ] + } + ] + } + } + ] + } + ] + } + ] +} \ No newline at end of file diff --git a/features/step_definitions/mf.ts b/features/step_definitions/mf.ts index 0d010e9..4d94deb 100644 --- a/features/step_definitions/mf.ts +++ b/features/step_definitions/mf.ts @@ -1,9 +1,18 @@ import {Log, LogLevel, Miniflare} from "miniflare"; import { MockAgent } from "undici"; +type MfConfig = { + metricsUrl: string|undefined; + cloudflareApiUrl: string|undefined +}; + export class MiniflareDriver { mockAgent = new MockAgent(); mf: Miniflare | undefined; + config: MfConfig = { + metricsUrl: undefined, + cloudflareApiUrl: undefined, + } start(options?: {metricsUrl?: string, cloudflareApiUrl?: string}): Miniflare { this.mockAgent @@ -29,15 +38,12 @@ export class MiniflareDriver { } ); - let metricsUrl = ""; - let cloudflareApiUrl = ""; - if (options !== undefined) { - if (options.metricsUrl !== undefined) { - metricsUrl = options.metricsUrl; - } - if (options.cloudflareApiUrl !== undefined) { - cloudflareApiUrl = options.cloudflareApiUrl; - } + let self = this; + if(self.config.metricsUrl === undefined) { + throw new Error("metricsUrl is not defined!"); + } + if(self.config.cloudflareApiUrl === undefined) { + throw new Error("cloudflareApiUrl is not defined!"); } this.mf = new Miniflare({ @@ -51,6 +57,11 @@ export class MiniflareDriver { compatibilityDate: "2022-04-05", cache: true, modules: true, + bindings: { + METRICS_URL: self.config.metricsUrl, + CLOUDFLARE_API_URL: self.config.cloudflareApiUrl, + CLOUDFLARE_API_KEY: "fake-key", + }, modulesRules: [ { type: 
"CompiledWasm", include: ["**/*.wasm"], fallthrough: true }, ], @@ -70,7 +81,10 @@ export class MiniflareDriver { async trigger() { this.start({}); - await this.mf?.dispatchFetch("http://localhost:8787/cdn-cgi/mf/scheduled"); + const res = await this.mf?.dispatchFetch("http://localhost:8787/"); + +// await this.mf?.dispatchFetch("http://fake.host/cdn-cgi/mf/scheduled"); + console.log("Triggered worker"); this.dispose(); } } diff --git a/features/step_definitions/o11y_steps.ts b/features/step_definitions/o11y_steps.ts index a7859ad..cd519d2 100644 --- a/features/step_definitions/o11y_steps.ts +++ b/features/step_definitions/o11y_steps.ts @@ -1,22 +1,23 @@ import {After, Given, When, Then} from '@cucumber/cucumber'; -import {cloudflareMockServer, mf, mfConfig, otelServer} from "./state"; +import {cloudflareMockServer, mf, otelServer} from "./state"; import {expect} from "chai"; Given('Worker is configured to point to mock Cloudflare API', function () { cloudflareMockServer.start(); - mfConfig.cloudflareApiUrl = cloudflareMockServer.url(); + mf.config.cloudflareApiUrl = cloudflareMockServer.url(); }); Given('Worker is configured to send metrics to a mock OpenTelemetry collector', function () { otelServer.start(); - mfConfig.metricsUrl = otelServer.metricsUrl(); + mf.config.metricsUrl = otelServer.metricsUrl(); }); When('Worker is triggered', async function () { await mf.trigger(); }); -Then('Worker metrics are published', function () { +Then('Worker metrics are published', async function () { + await new Promise(r => setTimeout(r, 5000)); let metrics = otelServer.getMetrics(); expect(metrics).to.have.length.gte(1); }); diff --git a/features/step_definitions/otel_server.ts b/features/step_definitions/otel_server.ts index d42ad92..08fc1ef 100644 --- a/features/step_definitions/otel_server.ts +++ b/features/step_definitions/otel_server.ts @@ -40,6 +40,7 @@ export class OpenTelemetryServer { indexMetrics() { let self = this; this.metricNames.clear(); + 
console.log("Indexing metrics", this.metrics); for (let metrics of this.metrics) { for (let resourceMetrics of metrics.resourceMetrics) { for (let scopeMetrics of resourceMetrics.scopeMetrics) { diff --git a/features/step_definitions/state.ts b/features/step_definitions/state.ts index 803f975..8c33415 100644 --- a/features/step_definitions/state.ts +++ b/features/step_definitions/state.ts @@ -6,14 +6,4 @@ const mf = new MiniflareDriver(); const otelServer = new OpenTelemetryServer(); const cloudflareMockServer = new CloudflareMockServer(); -type MfConfig = { - metricsUrl: string|undefined; - cloudflareApiUrl: string|undefined -}; - -const mfConfig: MfConfig = { - metricsUrl: undefined, - cloudflareApiUrl: undefined, -} - -export { mf, mfConfig, otelServer, cloudflareMockServer }; +export { mf, otelServer, cloudflareMockServer }; diff --git a/src/gql.rs b/src/gql.rs index f5179bb..4c3406f 100644 --- a/src/gql.rs +++ b/src/gql.rs @@ -1,7 +1,13 @@ +use std::borrow::Cow; use std::error::Error; +use std::sync::{Arc, Mutex}; use graphql_client::GraphQLQuery; -use opentelemetry::{global, KeyValue}; +use opentelemetry::KeyValue; use opentelemetry::metrics::Unit; +use opentelemetry_sdk::AttributeSet; +use opentelemetry_sdk::metrics::data::{DataPoint, Metric}; +use opentelemetry_sdk::metrics::data::Gauge; +use worker::console_log; // The paths are relative to the directory where your `Cargo.toml` is located. 
// Both json and the GraphQL schema language are supported as sources for the schema @@ -25,26 +31,52 @@ type Time = String; #[allow(non_camel_case_types)] type uint64 = u64; -pub async fn perform_my_query(variables: get_workers_analytics_query::Variables) -> Result<(), Box> { +pub async fn perform_my_query(cloudflare_api_url: String, cloudflare_api_key: String, variables: get_workers_analytics_query::Variables) -> Result, Box> { + let metrics = Arc::new(Mutex::new(Vec::new())); + let request_body = GetWorkersAnalyticsQuery::build_query(variables); let client = reqwest::Client::new(); - let res = client.post("/graphql").json(&request_body).send().await?; + let res = client.post(cloudflare_api_url) + .bearer_auth(cloudflare_api_key) + .json(&request_body).send().await?; + if !res.status().is_success() { + return Err(Box::new(res.error_for_status().unwrap_err())); + } + let response: get_workers_analytics_query::ResponseData = res.json().await?; + console_log!("Response: {:?}", response); let _ = response.viewer.unwrap().accounts.iter().map(|account| account.workers_invocations_adaptive.iter().map(|worker| { // See https://github.com/lablabs/cloudflare-exporter/blob/05e80d9cc5034c5a40b08f7630e6ca5a54c66b20/prometheus.go#L44C61-L44C93 let requests = worker.sum.as_ref().unwrap().requests; - let meter = global::meter("cloudflare_worker_requests"); - let gauge = meter - .u64_gauge("count") - .with_description("A gauge of the number of requests to a worker.") - .with_unit(Unit::new("requests")) - .init(); - gauge.record( - requests, - &[ - KeyValue::new("script_name", worker.dimensions.as_ref().unwrap().script_name.clone()) - ], - ); + let metric = create_metric("cloudflare_worker_requests".to_string(), "A gauge of the number of requests to a worker.".to_string(), requests, "requests".to_string()).unwrap(); + metrics.lock().unwrap().push(metric); })); - Ok(()) + + let mut metrics_to_return: Vec = Vec::new(); + let mut vec = metrics.lock().unwrap(); + 
metrics_to_return.extend(vec.drain(..)); + Ok(metrics_to_return) } + +fn create_metric(name: String, description: String, value: uint64, unit:String) -> Result> { + let key_value = KeyValue::new("key", "value"); + let attribute_set: AttributeSet = std::slice::from_ref(&key_value).into(); + let data_point = DataPoint { + attributes: attribute_set, + start_time: None, + time: None, + value, + exemplars: vec![], + }; + let sample: Gauge = Gauge { + data_points: vec![data_point], + }; + Ok(Metric { + name: Cow::from(name), + description: Cow::from(description), + unit: Unit::new(unit), + data: Box::new(sample), + }) +} + + diff --git a/src/http.rs b/src/http.rs deleted file mode 100644 index ddd48ea..0000000 --- a/src/http.rs +++ /dev/null @@ -1,42 +0,0 @@ -use std::fmt::{Debug, Formatter}; -use std::future::Future; -use opentelemetry_http::{Bytes, HttpClient, HttpError, Request, Response}; -use worker::async_trait::async_trait; -use std::error::Error; - -pub struct MyClient { - inner: reqwest::Client, -} - -impl MyClient { - pub fn new() -> MyClient { - let inner = reqwest::Client::new(); - Self { inner } - } - - pub fn execute( - &self, - request: reqwest::Request, - ) -> impl Future> { - self.inner.execute(request) - } -} - -impl Debug for MyClient { - fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result { - todo!() - } -} - -#[async_trait] -impl HttpClient for MyClient { - async fn send(&self, request: Request>) -> Result, HttpError> { - let res = Response::builder() - .status(200) - .body(Bytes::new()) - .unwrap(); - Ok(res) - } - - -} diff --git a/src/lib.rs b/src/lib.rs index ccf9075..cb4dd7a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,91 +1,61 @@ -use opentelemetry::{global, KeyValue}; -use opentelemetry::metrics::{MetricsError, Unit}; -use opentelemetry_otlp::WithExportConfig; -use opentelemetry_sdk::metrics::SdkMeterProvider; +use opentelemetry_sdk::metrics::data::{ResourceMetrics, ScopeMetrics}; +use opentelemetry_sdk::Resource; +use 
opentelemetry_stdout::MetricsData; use worker::*; +use worker::wasm_bindgen::JsValue; +use crate::gql::{get_workers_analytics_query, perform_my_query}; mod gql; -mod http; -fn init_metrics() -> - core::result::Result { - let export_config = opentelemetry_otlp::ExportConfig { - endpoint: "http://localhost:4318/v1/metrics".to_string(), - ..opentelemetry_otlp::ExportConfig::default() - }; - let provider = opentelemetry_otlp::new_pipeline() - .metrics(opentelemetry_sdk::runtime::TokioCurrentThread) - .with_exporter( - opentelemetry_otlp::new_exporter() - .http() - .with_http_client(http::MyClient::new()) - .with_export_config(export_config), - ) - .build(); - provider -} - -fn flush_and_shutdown(provider: SdkMeterProvider) -> core::result::Result<(), MetricsError> { - provider.force_flush()?; - provider.shutdown()?; - Ok(()) +#[worker::send] +pub async fn do_fetch( + url: String, + data: Option, +) -> Result { + let mut init = RequestInit::new(); + init.method = Method::Post; + init.with_body(data); + Fetch::Request(Request::new_with_init(url.as_str(), &init)?) 
+ .send() + .await } #[event(fetch)] -async fn main(_req: Request, _env: Env, _ctx: Context) -> Result { - let maybe_provider = init_metrics(); - let provider = match maybe_provider { - Ok(provider) => provider, - Err(err) => { - console_log!("{}", err.to_string()); - return Response::error("failed to init metrics", 500); +async fn main(_req: Request, env: Env, _ctx: Context) -> Result { + let metrics_url = env.var("METRICS_URL")?.to_string(); + let cloudflare_api_url = env.var("CLOUDFLARE_API_URL")?.to_string(); + let cloudflare_api_key = env.var("CLOUDFLARE_API_KEY")?.to_string(); + + let result = perform_my_query(cloudflare_api_url, cloudflare_api_key, get_workers_analytics_query::Variables { + account_tag: Some("123".to_string()), + datetime_start: Some("2021-01-01T00:00:00Z".to_string()), + datetime_end: Some("2021-01-02T00:00:00Z".to_string()), + script_name: None, + }).await; + let cf_metrics = match result { + Ok(metrics) => metrics, + Err(e) => { + console_log!("Querying Cloudflare API failed: {:?}", e); + return Response::error(format!("Error: {:?}", e), 500); } }; - let variables = gql::get_workers_analytics_query::Variables { - account_tag: Some("".to_string()), - datetime_start: Some("".to_string()), - datetime_end: Some("".to_string()), - script_name: Some("".to_string()), - }; - let _ = gql::perform_my_query(variables); - - // Create a meter from the above MeterProvider. - let meter = global::meter("mylibraryname"); - let counter = meter.u64_counter("my_counter").init(); - counter.add( - 10, - &[ - KeyValue::new("mykey1", "myvalue1"), - KeyValue::new("mykey2", "myvalue2"), - ], + let library = opentelemetry::InstrumentationLibrary::new( + "my-crate", + Some(env!("CARGO_PKG_VERSION")), + Some("https://opentelemetry.io/schemas/1.17.0"), + None, ); - - // Create a Gauge Instrument. 
- { - let gauge = meter - .f64_gauge("my_gauge") - .with_description("A gauge set to 1.0") - .with_unit(Unit::new("myunit")) - .init(); - - gauge.record( - 1.0, - &[ - KeyValue::new("mykey1", "myvalue1"), - KeyValue::new("mykey2", "myvalue2"), - ], - ); - } - - let res = flush_and_shutdown(provider); - match res { - Ok(_) => { - Response::ok("metrics flushed") - } - Err(err) => { - console_log!("{}", err.to_string()); - Response::error("failed to flushed", 500) - } - } + let scope_metrics = ScopeMetrics { + scope: library, + metrics: cf_metrics, + }; + let mut resource_metrics = ResourceMetrics { + resource: Resource::default(), + scope_metrics: vec![scope_metrics], + }; + let metrics = MetricsData::from(&mut resource_metrics); + let metrics_json = serde_json::to_string(&metrics).unwrap(); + let response = do_fetch(metrics_url, Some(JsValue::from_str(&metrics_json)).into()).await?; + return Ok(response); } diff --git a/wrangler.toml b/wrangler.toml index 2a88b3d..636a991 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -12,3 +12,8 @@ command = "cargo install -q worker-build && worker-build --release" [env.dev] build = { command = "cargo install -q worker-build && worker-build --dev" } + +[env.dev.vars] +METRICS_URL = "http://collector:4318/v1/metrics" +CLOUDFLARE_API_URL = "https://api.cloudflare.com/client/v4/graphql" +CLOUDFLARE_API_KEY = "fake"