diff --git a/Cargo.lock b/Cargo.lock index 7cb17c4a..d26b999d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2033,6 +2033,7 @@ dependencies = [ "aws-sdk-s3", "aws-sdk-sqs", "aws-smithy-runtime-api", + "bytes", "clap", "dotenvy", "envy", @@ -4126,6 +4127,7 @@ version = "0.1.0" dependencies = [ "alloy", "async-trait", + "bytes", "chrono", "hex", "hypersync-client", @@ -5680,9 +5682,9 @@ dependencies = [ [[package]] name = "schnellru" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9a8ef13a93c54d20580de1e5c413e624e53121d42fc7e2c11d10ef7f8b02367" +checksum = "356285bbf17bea63d9e52e96bd18f039672ac92b55b8cb997d6162a2a37d1649" dependencies = [ "ahash 0.8.11", "cfg-if", @@ -5921,6 +5923,7 @@ name = "shared-utils" version = "0.1.0" dependencies = [ "bytes", + "http 1.2.0", "log", "macon", "models", @@ -5930,6 +5933,7 @@ dependencies = [ "sqlx", "thiserror 2.0.11", "tokio", + "tracing", "utoipa", ] diff --git a/Cargo.toml b/Cargo.toml index 87e4d12d..fa1235cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ async-trait = "0.1.83" aws-config = { version = "1.0.1", features = ["behavior-version-latest"] } aws-sdk-sqs = { version = "1.3.0" } aws-smithy-runtime-api = "1.7.0" +bytes = "1.8.0" chrono = { version = "0.4.35", features = ["serde"] } clap = { version = "4.5.20", features = ["derive"] } dotenvy = "0.15.7" diff --git a/consumer/Cargo.toml b/consumer/Cargo.toml index 16535a07..109333a1 100644 --- a/consumer/Cargo.toml +++ b/consumer/Cargo.toml @@ -20,6 +20,7 @@ aws-config.workspace = true aws-sdk-s3 = "1.20.0" aws-sdk-sqs.workspace = true aws-smithy-runtime-api.workspace = true +bytes.workspace = true dotenvy.workspace = true envy.workspace = true futures = "0.3.31" diff --git a/consumer/src/error.rs b/consumer/src/error.rs index 86e3b48c..702b485b 100644 --- a/consumer/src/error.rs +++ b/consumer/src/error.rs @@ -69,6 +69,8 @@ pub enum ConsumerError { aws_smithy_runtime_api::http::Response, >, ), + #[error("ByteObject error")] + ByteObjectError(String), #[error("Deposited error")] Deposited(String), #[error("Failed to delete claim: {0}")] @@ -81,6 +83,8 @@ pub enum ConsumerError { Envy(#[from] envy::Error), #[error("Failed to resolve ENS data: {0}")] Ens(String), + #[error("Failed to get bytes from IPFS response")] + FailedToGetBytes, #[error(transparent)] Hex(#[from] hex::FromHexError), #[error(transparent)] diff --git a/consumer/src/mode/decoded/atom/atom_creation.rs b/consumer/src/mode/decoded/atom/atom_creation.rs index 3c829b00..9ca02eef 100644 --- a/consumer/src/mode/decoded/atom/atom_creation.rs +++ b/consumer/src/mode/decoded/atom/atom_creation.rs @@ -197,9 +197,15 @@ impl AtomCreated { .decode_atom_data_and_update_atom(&mut atom, decoded_consumer_context) .await?; - // get the supported atom metadata + // get the supported atom metadata and update the atom metadata let supported_atom_metadata = get_supported_atom_metadata(&mut atom, &decoded_atom_data, decoded_consumer_context) + .await? 
+            .update_atom_metadata(
+                &mut atom,
+                &decoded_consumer_context.pg_pool,
+                &decoded_consumer_context.backend_schema,
+            )
             .await?;
 
         // Handle the account or caip10 type
@@ -208,14 +214,6 @@ impl AtomCreated {
             .handle_account_or_caip10_type(&resolved_atom, decoded_consumer_context)
             .await?;
 
-        // Update the atom metadata to reflect the supported atom type
-        supported_atom_metadata
-            .update_atom_metadata(
-                &mut atom,
-                &decoded_consumer_context.pg_pool,
-                &decoded_consumer_context.backend_schema,
-            )
-            .await?;
         // Create the event
         self.create_event(decoded_message, decoded_consumer_context)
             .await?;
diff --git a/consumer/src/mode/decoded/atom/atom_supported_types.rs b/consumer/src/mode/decoded/atom/atom_supported_types.rs
index 536c26ea..1551e055 100644
--- a/consumer/src/mode/decoded/atom/atom_supported_types.rs
+++ b/consumer/src/mode/decoded/atom/atom_supported_types.rs
@@ -3,7 +3,7 @@ use crate::{
     mode::{
         decoded::utils::{short_id, update_account_with_atom_id},
         resolver::{
-            atom_resolver::{try_to_parse_json, try_to_resolve_schema_org_url},
+            atom_resolver::{try_to_parse_json_or_text, try_to_resolve_schema_org_url},
             types::{ResolveAtom, ResolverConsumerMessage},
         },
         types::DecodedConsumerContext,
@@ -51,6 +51,16 @@ impl AtomMetadata {
         }
     }
 
+    /// Creates a new atom metadata for a byte object
+    pub fn byte_object(image: Option<String>) -> Self {
+        Self {
+            label: "byte object".to_string(),
+            emoji: "🔢".to_string(),
+            atom_type: "ByteObject".to_string(),
+            image,
+        }
+    }
+
     /// Creates a new atom metadata for a caip10
     pub fn caip10(caip10: String) -> Self {
         Self {
@@ -137,6 +147,17 @@ impl AtomMetadata {
         }
     }
+
+    /// Creates a new atom metadata for a json object
+    pub fn json_object(image: Option<String>) -> Self {
+        Self {
+            label: "json object".to_string(),
+            emoji: "📦".to_string(),
+            atom_type: "JsonObject".to_string(),
+            image,
+        }
+    }
+
     /// Creates a new atom metadata for a keywords predicate
     pub fn keywords_predicate(image: Option<String>) -> Self {
         Self {
@@ -197,6 +218,16 @@
         }
     }
 
+    /// Creates a new atom metadata for a text object
+    pub fn text_object(image: Option<String>) -> Self {
+        Self {
+            label: "text object".to_string(),
+            emoji: "📝".to_string(),
+            atom_type: "TextObject".to_string(),
+            image,
+        }
+    }
+
     /// Creates a new atom metadata for a thing
    pub fn thing(name: String, image: Option<String>) -> Self {
         Self {
@@ -281,13 +312,18 @@
         atom: &mut Atom,
         pg_pool: &PgPool,
         backend_schema: &str,
-    ) -> Result<(), ConsumerError> {
+    ) -> Result<AtomMetadata, ConsumerError> {
         atom.emoji = Some(self.emoji.clone());
         atom.atom_type = AtomType::from_str(&self.atom_type)?;
         atom.label = Some(self.label.clone());
         atom.image = self.image.clone();
         atom.upsert(pg_pool, backend_schema).await?;
-        Ok(())
+        Ok(AtomMetadata {
+            label: self.label.clone(),
+            emoji: self.emoji.clone(),
+            atom_type: self.atom_type.clone(),
+            image: self.image.clone(),
+        })
     }
 }
 
@@ -393,7 +429,8 @@
     // 5. Now we try to parse the JSON and return the metadata. At this point
     // the resolver will handle the rest of the cases.
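+    // For example, with the new fallback:
+    //   {"@context": "https://schema.org", ...}  -> schema.org property resolver
+    //   any other valid JSON                     -> stored as a JsonObject
+    //   plain text that is not JSON              -> stored as a TextObject
+    // An "ipfs://" URI is left Unknown here so the resolver can fetch it, and
+    // binary payloads are only handled on the resolver side via handle_binary_data.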
-    let metadata = try_to_parse_json(decoded_atom_data, atom, decoded_consumer_context).await?;
+    let metadata =
+        try_to_parse_json_or_text(decoded_atom_data, atom, decoded_consumer_context).await?;
     Ok(metadata)
 }
diff --git a/consumer/src/mode/decoded/deposited.rs b/consumer/src/mode/decoded/deposited.rs
index e7f3cf71..c4f86568 100644
--- a/consumer/src/mode/decoded/deposited.rs
+++ b/consumer/src/mode/decoded/deposited.rs
@@ -128,12 +128,14 @@ impl Deposited {
         &self,
         event: &DecodedMessage,
         decoded_consumer_context: &DecodedConsumerContext,
+        deposit_id: String,
     ) -> Result<Event, ConsumerError> {
         // Create the event
         let event = if self.isTriple {
             Event::builder()
                 .id(DecodedMessage::event_id(event))
                 .event_type(EventType::Deposited)
+                .deposit_id(deposit_id)
                 .block_number(U256Wrapper::try_from(event.block_number)?)
                 .block_timestamp(event.block_timestamp)
                 .transaction_hash(event.transaction_hash.clone())
@@ -143,6 +145,7 @@
             Event::builder()
                 .id(DecodedMessage::event_id(event))
                 .event_type(EventType::Deposited)
+                .deposit_id(deposit_id)
                 .block_number(U256Wrapper::try_from(event.block_number)?)
                 .block_timestamp(event.block_timestamp)
                 .transaction_hash(event.transaction_hash.clone())
@@ -326,14 +329,15 @@
         )?;
 
         // Create deposit record
-        self.create_deposit(event, decoded_consumer_context).await?;
+        let deposit = self.create_deposit(event, decoded_consumer_context).await?;
 
         // Handle position and related entities
         self.handle_position_and_claims(decoded_consumer_context, &vault)
             .await?;
 
         // Create event
-        self.create_event(event, decoded_consumer_context).await?;
+        self.create_event(event, decoded_consumer_context, deposit.id)
+            .await?;
 
         // Create signal
         self.create_signal(decoded_consumer_context, event, &vault)
diff --git a/consumer/src/mode/resolver/atom_resolver.rs b/consumer/src/mode/resolver/atom_resolver.rs
index 5741bd7f..621967e7 100644
--- a/consumer/src/mode/resolver/atom_resolver.rs
+++ b/consumer/src/mode/resolver/atom_resolver.rs
@@ -5,18 +5,23 @@ use crate::{
         types::{AtomUpdater, ResolverConsumerContext},
     },
 };
+use bytes::Bytes;
 use models::{
     atom::{Atom, AtomType},
     atom_value::AtomValue,
     book::Book,
+    byte_object::ByteObject,
+    json_object::JsonObject,
     organization::Organization,
     person::Person,
+    text_object::TextObject,
     thing::Thing,
     traits::SimpleCrud,
 };
+use reqwest::Response;
 use serde_json::Value;
 use std::str::FromStr;
-use tracing::warn;
+use tracing::{info, warn};
 
 /// Supported schema.org contexts
 pub const SCHEMA_ORG_CONTEXTS: [&str; 4] = [
@@ -30,22 +35,24 @@
 pub async fn try_to_resolve_ipfs_uri(
     atom_data: &str,
     resolver_consumer_context: &ResolverConsumerContext,
-) -> Result<Option<String>, ConsumerError> {
+) -> Result<Option<Response>, ConsumerError> {
     // Handle IPFS URIs
+    warn!("Trying to resolve IPFS URI: {}", atom_data);
     if let Some(ipfs_hash) = atom_data.strip_prefix("ipfs://") {
         if let Ok(ipfs_data) = resolver_consumer_context
             .ipfs_resolver
             .fetch_from_ipfs(ipfs_hash)
             .await
         {
-            // Remove UTF-8 BOM if present
-            let data = ipfs_data.replace('\u{feff}', "");
-            Ok(Some(data))
+            // At this point we don't know what type of data is contained in the response,
+            // we just know that the response is valid
+            Ok(Some(ipfs_data))
         } else {
             warn!("Failed to fetch IPFS data, atom data: {}", atom_data);
             Ok(None)
         }
     } else {
+        warn!("Atom data is not an IPFS URI: {}", atom_data);
         Ok(None)
     }
 }
@@ -66,97 +73,6 @@ pub async fn try_to_resolve_schema_org_url(
     }
 }
 
-/// Tries to parse JSON and handle schema.org data
-pub async fn try_to_parse_json(
-    atom_data: &str,
-    atom: &Atom,
-    consumer_context: &impl AtomUpdater,
-) -> Result<AtomMetadata, ConsumerError> {
-    // TODO: What if the JSON is not valid? Should we return an error?
-    // Currently, we just return unknown metadata
-    if let Ok(json) = serde_json::from_str::<Value>(atom_data) {
-        match json.get("@context").and_then(|c| c.as_str()) {
-            Some(ctx_str) if SCHEMA_ORG_CONTEXTS.contains(&ctx_str) => {
-                let metadata =
-                    try_to_resolve_schema_org_properties(consumer_context, atom, &json).await?;
-                Ok(metadata)
-            }
-            // TODO: Handle unsuported schemas
-            Some(ctx_str) if !SCHEMA_ORG_CONTEXTS.contains(&ctx_str) => {
-                warn!("Unsupported schema.org context: {}", ctx_str);
-                Ok(AtomMetadata::unknown())
-            }
-            _ => {
-                // TODO: Handle unknown contexts
-                warn!("No @context found in JSON: {:?}", json);
-                Ok(AtomMetadata::unknown())
-            }
-        }
-    } else {
-        Ok(AtomMetadata::unknown())
-    }
-}
-
-/// Creates an atom value for a thing
-pub async fn create_thing_atom_value(
-    atom: &Atom,
-    thing: &Thing,
-    consumer_context: &impl AtomUpdater,
-) -> Result<(), ConsumerError> {
-    AtomValue::builder()
-        .id(atom.id.clone())
-        .thing_id(thing.id.clone())
-        .build()
-        .upsert(consumer_context.pool(), consumer_context.backend_schema())
-        .await?;
-    Ok(())
-}
-
-/// Creates an atom value for a person
-pub async fn create_person_atom_value(
-    atom: &Atom,
-    person: &Person,
-    consumer_context: &impl AtomUpdater,
-) -> Result<(), ConsumerError> {
-    AtomValue::builder()
-        .id(atom.id.clone())
-        .person_id(person.id.clone())
-        .build()
-        .upsert(consumer_context.pool(), consumer_context.backend_schema())
-        .await?;
-    Ok(())
-}
-
-/// Creates an atom value for an organization
-pub async fn create_organization_atom_value(
-    atom: &Atom,
-    organization: &Organization,
-    consumer_context: &impl AtomUpdater,
-) -> Result<(), ConsumerError> {
-    AtomValue::builder()
-        .id(atom.id.clone())
-        .organization_id(organization.id.clone())
-        .build()
-        .upsert(consumer_context.pool(), consumer_context.backend_schema())
-        .await?;
-    Ok(())
-}
-
-/// Creates an atom value for a book
-pub async fn create_book_atom_value(
-    atom: &Atom,
-    book: &Book,
-    consumer_context: &impl AtomUpdater,
-) -> Result<(), ConsumerError> {
-    AtomValue::builder()
-        .id(atom.id.clone())
-        .book_id(book.id.clone())
-        .build()
-        .upsert(consumer_context.pool(), consumer_context.backend_schema())
-        .await?;
-    Ok(())
-}
-
 /// Resolves schema.org properties
 async fn try_to_resolve_schema_org_properties(
     consumer_context: &impl AtomUpdater,
@@ -219,6 +135,31 @@
     }
 }
 
+/// Creates a ByteObject from raw bytes (e.g. a binary IPFS payload)
+pub fn create_byte_object_from_obj(atom: &Atom, obj: Vec<u8>) -> Result<ByteObject, ConsumerError> {
+    let byte_object = ByteObject::builder().id(atom.id.clone()).data(obj).build();
+    if !byte_object.data.is_empty() && byte_object.data.len() <= 1_000_000 {
+        Ok(byte_object)
+    } else {
+        Err(ConsumerError::ByteObjectError(
+            "ByteObject data must be non-empty and at most 1,000,000 bytes".to_string(),
+        ))
+    }
+}
+
+/// Creates a JsonObject from an arbitrary JSON value
+pub fn create_json_object_from_obj(atom: &Atom, obj: &Value) -> JsonObject {
+    JsonObject::builder()
+        .id(atom.id.clone())
+        .data(obj.clone())
+        .build()
+}
+
+/// Creates a TextObject from plain text
+pub fn create_text_object_from_obj(atom: &Atom, obj: &str) -> TextObject {
+    TextObject::builder().id(atom.id.clone()).data(obj.to_string()).build()
+}
+
 /// Creates a Thing from a schema.org object
 pub fn create_thing_from_obj(atom: &Atom, obj: &Value) -> Thing {
     Thing::builder()
@@ -354,3 +295,196 @@ pub fn create_book_from_obj(atom: &Atom, obj: &Value) -> Book {
     )
     .build()
 }
+
+/// Handles schema.org JSON
+async fn handle_schema_org_json(
+    consumer_context: &impl AtomUpdater,
+    atom: &Atom,
+    json: &Value,
+) -> Result<AtomMetadata, ConsumerError> {
+    let metadata = try_to_resolve_schema_org_properties(consumer_context, atom, json).await?;
+    Ok(metadata)
+}
+
+/// Handles regular JSON
+async fn handle_regular_json(
+    consumer_context: &impl AtomUpdater,
+    atom: &Atom,
+    json: &Value,
+) -> Result<AtomMetadata, ConsumerError> {
+    info!(
+        "No @context found in JSON: {:?}, returning it as JsonObject",
+        json
+    );
+    let json_object = create_json_object_from_obj(atom, json)
+        .upsert(consumer_context.pool(), consumer_context.backend_schema())
+        .await?;
+    create_json_object_atom_value(atom, &json_object, consumer_context).await?;
+    Ok(AtomMetadata::json_object(None))
+}
+
+/// Handles binary data
+pub async fn handle_binary_data(
+    consumer_context: &impl AtomUpdater,
+    atom: &Atom,
+    atom_data: Bytes,
+) -> Result<AtomMetadata, ConsumerError> {
+    info!("Data is likely binary, returning it as ByteObject");
+    let byte_object = create_byte_object_from_obj(atom, atom_data.to_vec());
+    match byte_object {
+        Ok(byte_object) => {
+            byte_object
+                .upsert(consumer_context.pool(), consumer_context.backend_schema())
+                .await?;
+            create_byte_object_atom_value(atom, &byte_object, consumer_context).await?;
+            Ok(AtomMetadata::byte_object(None))
+        }
+        Err(e) => {
+            warn!("Failed to create ByteObject: {}", e);
+            Ok(AtomMetadata::unknown())
+        }
+    }
+}
+
+/// Handles text data
+async fn handle_text_data(
+    consumer_context: &impl AtomUpdater,
+    atom: &Atom,
+    atom_data: &str,
+) -> Result<AtomMetadata, ConsumerError> {
+    // We need to ignore the case where the atom data is an IPFS URI, as we already handled it
+    // in the previous step
+    if atom_data.starts_with("ipfs://") {
+        return Ok(AtomMetadata::unknown());
+    }
+
+    info!("Data is likely text, returning it as TextObject");
+    let text_object = create_text_object_from_obj(atom, atom_data)
+        .upsert(consumer_context.pool(), consumer_context.backend_schema())
+        .await?;
+    create_text_object_atom_value(atom, &text_object, consumer_context).await?;
+    Ok(AtomMetadata::text_object(None))
+}
+
+/// Tries to parse the atom data as JSON, falling back to plain text
+pub async fn try_to_parse_json_or_text(
+    atom_data: &str,
+    atom: &Atom,
+    consumer_context: &impl AtomUpdater,
+) -> Result<AtomMetadata, ConsumerError> {
+    if let Ok(json) = serde_json::from_str::<Value>(atom_data) {
+        match json.get("@context").and_then(|c| c.as_str()) {
+            Some(ctx_str) if SCHEMA_ORG_CONTEXTS.contains(&ctx_str) => {
+                handle_schema_org_json(consumer_context, atom, &json).await
+            }
+            _ => handle_regular_json(consumer_context, atom, &json).await,
+        }
+    } else {
+        handle_text_data(consumer_context, atom, atom_data).await
+    }
+}
+
+/// Creates an atom value for a byte object
+pub async fn create_byte_object_atom_value(
+    atom: &Atom,
+    byte_object: &ByteObject,
+    consumer_context: &impl AtomUpdater,
+) -> Result<(), ConsumerError> {
+    AtomValue::builder()
+        .id(atom.id.clone())
+        .byte_object_id(byte_object.id.clone())
+        .build()
+        .upsert(consumer_context.pool(), consumer_context.backend_schema())
+        .await?;
+    Ok(())
+}
+
+/// Creates an atom value for a text object
+pub async fn create_text_object_atom_value(
+    atom: &Atom,
+    text_object: &TextObject,
+    consumer_context: &impl AtomUpdater,
+) -> Result<(), ConsumerError> {
+    AtomValue::builder()
+        .id(atom.id.clone())
+        .text_object_id(text_object.id.clone())
+        .build()
+        .upsert(consumer_context.pool(), consumer_context.backend_schema())
+        .await?;
+    Ok(())
+}
+
+/// Creates an atom value for a json object
+pub async fn
create_json_object_atom_value( + atom: &Atom, + json_object: &JsonObject, + consumer_context: &impl AtomUpdater, +) -> Result<(), ConsumerError> { + AtomValue::builder() + .id(atom.id.clone()) + .json_object_id(json_object.id.clone()) + .build() + .upsert(consumer_context.pool(), consumer_context.backend_schema()) + .await?; + Ok(()) +} + +/// Creates an atom value for a thing +pub async fn create_thing_atom_value( + atom: &Atom, + thing: &Thing, + consumer_context: &impl AtomUpdater, +) -> Result<(), ConsumerError> { + AtomValue::builder() + .id(atom.id.clone()) + .thing_id(thing.id.clone()) + .build() + .upsert(consumer_context.pool(), consumer_context.backend_schema()) + .await?; + Ok(()) +} + +/// Creates an atom value for a person +pub async fn create_person_atom_value( + atom: &Atom, + person: &Person, + consumer_context: &impl AtomUpdater, +) -> Result<(), ConsumerError> { + AtomValue::builder() + .id(atom.id.clone()) + .person_id(person.id.clone()) + .build() + .upsert(consumer_context.pool(), consumer_context.backend_schema()) + .await?; + Ok(()) +} + +/// Creates an atom value for an organization +pub async fn create_organization_atom_value( + atom: &Atom, + organization: &Organization, + consumer_context: &impl AtomUpdater, +) -> Result<(), ConsumerError> { + AtomValue::builder() + .id(atom.id.clone()) + .organization_id(organization.id.clone()) + .build() + .upsert(consumer_context.pool(), consumer_context.backend_schema()) + .await?; + Ok(()) +} + +/// Creates an atom value for a book +pub async fn create_book_atom_value( + atom: &Atom, + book: &Book, + consumer_context: &impl AtomUpdater, +) -> Result<(), ConsumerError> { + AtomValue::builder() + .id(atom.id.clone()) + .book_id(book.id.clone()) + .build() + .upsert(consumer_context.pool(), consumer_context.backend_schema()) + .await?; + Ok(()) +} diff --git a/consumer/src/mode/resolver/types.rs b/consumer/src/mode/resolver/types.rs index 5ae1d724..92b08190 100644 --- a/consumer/src/mode/resolver/types.rs +++ b/consumer/src/mode/resolver/types.rs @@ -1,9 +1,12 @@ use crate::{ error::ConsumerError, mode::{ + decoded::atom::atom_supported_types::AtomMetadata, ipfs_upload::types::IpfsUploadMessage, resolver::{ - atom_resolver::{try_to_parse_json, try_to_resolve_ipfs_uri}, + atom_resolver::{ + handle_binary_data, try_to_parse_json_or_text, try_to_resolve_ipfs_uri, + }, ens_resolver::Ens, }, types::ResolverConsumerContext, @@ -118,43 +121,106 @@ impl ResolverMessageType { resolver_consumer_context: &ResolverConsumerContext, resolver_message: &ResolveAtom, ) -> Result<(), ConsumerError> { - let data = try_to_resolve_ipfs_uri( - &resolver_message - .atom - .data - .clone() - .ok_or(ConsumerError::AtomDataNotFound)?, - resolver_consumer_context, - ) - .await?; - // If we resolved an IPFS URI, we need to try to parse the JSON - let metadata = if let Some(data) = data { - info!("Atom data is an IPFS URI: {data}"); - try_to_parse_json(&data, &resolver_message.atom, resolver_consumer_context).await? + let metadata = self + .resolve_and_parse_atom_data(resolver_consumer_context, resolver_message) + .await?; + + // If the atom type is not unknown, we handle the new atom type that was resolved + if AtomType::from_str(&metadata.atom_type)? 
!= AtomType::Unknown {
+            self.handle_known_atom_type(resolver_consumer_context, resolver_message, metadata)
+                .await?;
+        } else {
+            self.mark_atom_as_failed(resolver_consumer_context, resolver_message)
+                .await?;
+        }
+        Ok(())
+    }
+
+    /// This function resolves and parses the atom data
+    async fn resolve_and_parse_atom_data(
+        &self,
+        resolver_consumer_context: &ResolverConsumerContext,
+        resolver_message: &ResolveAtom,
+    ) -> Result<AtomMetadata, ConsumerError> {
+        let atom_data = resolver_message
+            .atom
+            .data
+            .clone()
+            .ok_or(ConsumerError::AtomDataNotFound)?;
+
+        // We check if the atom data is an IPFS URI and if it is, we fetch the data from the IPFS node
+        let data = try_to_resolve_ipfs_uri(&atom_data, resolver_consumer_context).await?;
+
+        // This is the case where we receive a response from the IPFS node, but we don't know yet
+        // if the response is a JSON or a binary file.
+        if let Some(data) = data {
+            info!("Atom data is an IPFS URI and we have a response from the IPFS node");
+            // First we try to decode the response as bytes
+            match data.bytes().await {
+                Ok(bytes) => {
+                    // Try to convert bytes to text
+                    match String::from_utf8(bytes.to_vec()) {
+                        Ok(text) => {
+                            info!("Decoded IPFS response as text: {}", text);
+                            let data = text.replace('\u{feff}', "");
+                            try_to_parse_json_or_text(
+                                &data,
+                                &resolver_message.atom,
+                                resolver_consumer_context,
+                            )
+                            .await
+                        }
+                        Err(_) => {
+                            info!("Response is not valid UTF-8, handling atom data as binary");
+                            handle_binary_data(
+                                resolver_consumer_context,
+                                &resolver_message.atom,
+                                bytes,
+                            )
+                            .await
+                        }
+                    }
+                }
+                Err(e) => {
+                    info!("Failed to get bytes from IPFS response: {e}");
+                    Err(ConsumerError::FailedToGetBytes)
+                }
+            }
+        // This is the case where the atom data is not an IPFS URI, so we try to parse it as JSON
+        } else {
             info!(
-                "No IPFS URI found or IPFS URI is not valid, trying to parse atom data as JSON..."
+                "No IPFS URI found or IPFS URI is not valid, trying to parse atom data as JSON or text..."
             );
-            // This is the fallback case, where we try to parse the atom data as JSON
-            // even if it's not a valid IPFS URI. This is useful for cases where the
-            // atom data is a JSON object that is not a schema.org URL.
-            try_to_parse_json(
-                &resolver_message
-                    .atom
-                    .data
-                    .clone()
-                    .ok_or(ConsumerError::AtomDataNotFound)?,
+            try_to_parse_json_or_text(
+                &atom_data,
                 &resolver_message.atom,
                 resolver_consumer_context,
             )
-            .await?
-        };
+            .await
+        }
+    }
 
-        // If at this point we have an atom type that is not unknown (it means it changes it state),
-        // we need to update the atom metadata
-        if AtomType::from_str(&metadata.atom_type)? != AtomType::Unknown {
-            let atom = Atom::find_by_id(
-                resolver_message.atom.id.clone(),
+    /// This function handles the known atom type
+    async fn handle_known_atom_type(
+        &self,
+        resolver_consumer_context: &ResolverConsumerContext,
+        resolver_message: &ResolveAtom,
+        metadata: AtomMetadata,
+    ) -> Result<(), ConsumerError> {
+        let atom = self
+            .find_and_update_atom(resolver_consumer_context, resolver_message, metadata)
+            .await?;
+
+        if let Some(image) = atom.image.clone() {
+            self.handle_atom_image(resolver_consumer_context, image)
+                .await?;
+        }
+
+        resolver_message
+            .atom
+            .mark_as_resolved(
                 &resolver_consumer_context.pg_pool,
                 &resolver_consumer_context
                     .server_initialize
                     .env
                     .backend_schema,
             )
             .await?;
@@ -163,56 +229,74 @@
 
-        if let Some(mut atom) = atom {
-            metadata
-                .update_atom_metadata(
-                    &mut atom,
-                    &resolver_consumer_context.pg_pool,
-                    &resolver_consumer_context
-                        .server_initialize
-                        .env
-                        .backend_schema,
-                )
-                .await?;
+        info!("Updated atom metadata: {atom:?}");
+        Ok(())
+    }
 
-            // If the atom has an image, we need to download it and classify it
-            if let Some(image) = metadata.image {
-                // If we receive an image, we send it to the IPFS upload consumer
-                // to be classified and stored
-                info!("Sending image to IPFS upload consumer: {}", image);
-                resolver_consumer_context
-                    .client
-                    .send_message(serde_json::to_string(&IpfsUploadMessage { image })?, None)
-                    .await?;
-            }
+    /// This function finds the atom and updates its metadata
+    async fn find_and_update_atom(
+        &self,
+        resolver_consumer_context: &ResolverConsumerContext,
+        resolver_message: &ResolveAtom,
+        metadata: AtomMetadata,
+    ) -> Result<Atom, ConsumerError> {
+        let atom = Atom::find_by_id(
+            resolver_message.atom.id.clone(),
+            &resolver_consumer_context.pg_pool,
+            &resolver_consumer_context
+                .server_initialize
+                .env
+                .backend_schema,
+        )
+        .await?
+ .ok_or(ConsumerError::AtomNotFound)?; - // Mark the atom as resolved - resolver_message - .atom - .mark_as_resolved( - &resolver_consumer_context.pg_pool, - &resolver_consumer_context - .server_initialize - .env - .backend_schema, - ) - .await?; - info!("Updated atom metadata: {atom:?}"); - } - } else { - // Mark the atom as failed - resolver_message - .atom - .mark_as_failed( - &resolver_consumer_context.pg_pool, - &resolver_consumer_context - .server_initialize - .env - .backend_schema, - ) - .await?; - } - Ok(()) + let mut atom = atom; + metadata + .update_atom_metadata( + &mut atom, + &resolver_consumer_context.pg_pool, + &resolver_consumer_context + .server_initialize + .env + .backend_schema, + ) + .await?; + + Ok(atom) + } + + /// This function handles the atom image + async fn handle_atom_image( + &self, + resolver_consumer_context: &ResolverConsumerContext, + image: String, + ) -> Result<(), ConsumerError> { + info!("Sending image to IPFS upload consumer: {}", image); + resolver_consumer_context + .client + .send_message(serde_json::to_string(&IpfsUploadMessage { image })?, None) + .await + .map_err(ConsumerError::from) + } + + /// This function marks the atom as failed + async fn mark_atom_as_failed( + &self, + resolver_consumer_context: &ResolverConsumerContext, + resolver_message: &ResolveAtom, + ) -> Result<(), ConsumerError> { + resolver_message + .atom + .mark_as_failed( + &resolver_consumer_context.pg_pool, + &resolver_consumer_context + .server_initialize + .env + .backend_schema, + ) + .await + .map_err(ConsumerError::from) } /// This function updates the account metadata diff --git a/docker-compose.yml b/docker-compose.yml index 8c18c36b..876dc05d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -49,7 +49,7 @@ services: - ~/.aws/:/root/.aws:ro - ./logs:/app/logs deploy: - replicas: 2 + replicas: 5 ipfs_upload_consumer: image: ghcr.io/0xintuition/consumer:latest @@ -237,6 +237,8 @@ services: environment: IPFS_PATH: /data/ipfs PINATA_API_JWT: $PINATA_API_JWT + QUIC_GO_DISABLE_RECEIVE_BUFFER_WARNING: "true" + QUIC_GO_SET_RECEIVE_BUFFER: "false" volumes: - ipfs_data:/data/ipfs entrypoint: /bin/sh @@ -251,24 +253,23 @@ services: ipfs config Addresses.API /ip4/0.0.0.0/tcp/5001; ipfs config Addresses.Gateway /ip4/0.0.0.0/tcp/8080; - # Add Pinata as a remote pinning service - ipfs pin remote service add Pinata https://api.pinata.cloud/psa $PINATA_API_JWT; - - # Configure Pinata gateway and use subdomains - ipfs config --json Gateway.PublicGateways '{\"localhost\": {\"UseSubdomains\": false,\"Paths\": [\"/ipfs\", \"/ipns\"]},\"ipfs\": {\"UseSubdomains\": false,\"Paths\": [\"/ipfs\", \"/ipns\"]}}'; - - # Set Pinata as the default remote pinning service - ipfs config --json Pinning.RemoteServices '[\"Pinata\"]'; - - # Configure policy to automatically pin all files to Pinata (MFS policy) - ipfs config --json Pinning.RemoteServices.Pinata.Policies '{\"MFS\":{\"Enable\": false,\"PinName\": \"\",\"RepinInterval\": \"1m\"}}'; - - # Add Peering to Pinata (optional) - ipfs config --json Peering.Peers '[{\"ID\": \"Qma8ddFEQWEU8ijWvdxXm3nxU7oHsRtCykAaVz8WUYhiKn\", \"Addrs\": [\"/dnsaddr/bitswap.pinata.cloud\"]}]'; + # Add Pinata as a remote pinning service with proper JSON format + ipfs config --json Services.RemotePinning.Pinata '{ + \"api\": { + \"endpoint\": \"https://api.pinata.cloud/psa\", + \"key\": \"'\"$PINATA_API_JWT\"'\" + } + }'; + ipfs config --json Peering.Peers '[{\"ID\": \"Qma8ddFEQWEU8ijWvdxXm3nxU7oHsRtCykAaVz8WUYhiKn\", \"Addrs\": 
[\"/dnsaddr/bitswap.pinata.cloud\"]}]'; + # Start the IPFS daemon exec ipfs daemon --migrate=true --agent-version-suffix=docker " + ulimits: + nofile: + soft: 65536 + hard: 65536 # substreams-sink: # container_name: substreams-sink diff --git a/models/Cargo.toml b/models/Cargo.toml index 225dddd7..28113de0 100644 --- a/models/Cargo.toml +++ b/models/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] alloy.workspace = true async-trait.workspace = true +bytes = "1.9.0" chrono.workspace = true hex.workspace = true hypersync-client.workspace = true @@ -21,4 +22,4 @@ tokio.workspace = true utoipa.workspace = true [package.metadata.cargo-machete] -ignored = ["strum"] \ No newline at end of file +ignored = ["strum"] diff --git a/models/src/atom.rs b/models/src/atom.rs index 0eed0929..1435c15f 100644 --- a/models/src/atom.rs +++ b/models/src/atom.rs @@ -42,15 +42,18 @@ pub enum AtomResolvingStatus { #[sqlx(type_name = "atom_type")] pub enum AtomType { Account, + ByteObject, Book, Caip10, FollowAction, + JsonObject, Keywords, LikeAction, Organization, OrganizationPredicate, Person, PersonPredicate, + TextObject, Thing, ThingPredicate, Unknown, diff --git a/models/src/atom_value.rs b/models/src/atom_value.rs index 07aef0e1..227ebef3 100644 --- a/models/src/atom_value.rs +++ b/models/src/atom_value.rs @@ -15,6 +15,9 @@ pub struct AtomValue { pub person_id: Option, pub organization_id: Option, pub book_id: Option, + pub json_object_id: Option, + pub text_object_id: Option, + pub byte_object_id: Option, } /// This is the implementation of the `Model` trait for the `AtomValue` struct. @@ -26,21 +29,27 @@ impl SimpleCrud for AtomValue { async fn upsert(&self, pool: &PgPool, schema: &str) -> Result { let query = format!( r#" - INSERT INTO {}.atom_value (id, account_id, thing_id, person_id, organization_id, book_id) - VALUES ($1, $2, $3, $4, $5, $6) + INSERT INTO {}.atom_value (id, account_id, thing_id, person_id, organization_id, book_id, json_object_id, text_object_id, byte_object_id) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (id) DO UPDATE SET account_id = EXCLUDED.account_id, thing_id = EXCLUDED.thing_id, person_id = EXCLUDED.person_id, organization_id = EXCLUDED.organization_id, - book_id = EXCLUDED.book_id + book_id = EXCLUDED.book_id, + json_object_id = EXCLUDED.json_object_id, + text_object_id = EXCLUDED.text_object_id, + byte_object_id = EXCLUDED.byte_object_id RETURNING id, account_id, thing_id, person_id, organization_id, - book_id + book_id, + json_object_id, + text_object_id, + byte_object_id "#, schema ); @@ -60,6 +69,21 @@ impl SimpleCrud for AtomValue { .and_then(|w| w.to_big_decimal().ok()), ) .bind(self.book_id.as_ref().and_then(|w| w.to_big_decimal().ok())) + .bind( + self.json_object_id + .as_ref() + .and_then(|w| w.to_big_decimal().ok()), + ) + .bind( + self.text_object_id + .as_ref() + .and_then(|w| w.to_big_decimal().ok()), + ) + .bind( + self.byte_object_id + .as_ref() + .and_then(|w| w.to_big_decimal().ok()), + ) .fetch_one(pool) .await .map_err(|e| ModelError::InsertError(e.to_string())) diff --git a/models/src/byte_object.rs b/models/src/byte_object.rs new file mode 100644 index 00000000..a07e22ec --- /dev/null +++ b/models/src/byte_object.rs @@ -0,0 +1,65 @@ +use crate::{ + error::ModelError, + traits::{Model, SimpleCrud}, + types::U256Wrapper, +}; +use async_trait::async_trait; +use sqlx::PgPool; + +/// ByteObject is a struct that represents a byte object in the database. 
+#[derive(Debug, sqlx::FromRow, Builder)]
+#[sqlx(type_name = "byte_object")]
+pub struct ByteObject {
+    pub id: U256Wrapper,
+    pub data: Vec<u8>,
+}
+
+/// This is a trait that all models must implement.
+impl Model for ByteObject {}
+
+#[async_trait]
+impl SimpleCrud for ByteObject {
+    /// Upserts a byte object into the database.
+    async fn upsert(&self, pool: &PgPool, schema: &str) -> Result<Self, ModelError> {
+        let query = format!(
+            r#"
+            INSERT INTO {}.byte_object (id, data)
+            VALUES ($1, $2)
+            ON CONFLICT (id) DO UPDATE SET
+                data = EXCLUDED.data
+            RETURNING id, data
+            "#,
+            schema,
+        );
+
+        sqlx::query_as::<_, ByteObject>(&query)
+            .bind(self.id.to_big_decimal()?)
+            .bind(&self.data[..])
+            .fetch_one(pool)
+            .await
+            .map_err(|e| ModelError::InsertError(e.to_string()))
+    }
+
+    /// Finds a byte object by its id.
+    async fn find_by_id(
+        id: U256Wrapper,
+        pool: &PgPool,
+        schema: &str,
+    ) -> Result<Option<Self>, ModelError> {
+        let query = format!(
+            r#"
+            SELECT id,
+                   data
+            FROM {}.byte_object
+            WHERE id = $1
+            "#,
+            schema
+        );
+
+        sqlx::query_as::<_, ByteObject>(&query)
+            .bind(id.to_big_decimal()?)
+            .fetch_optional(pool)
+            .await
+            .map_err(|e| ModelError::QueryError(e.to_string()))
+    }
+}
diff --git a/models/src/json_object.rs b/models/src/json_object.rs
new file mode 100644
index 00000000..a84bd275
--- /dev/null
+++ b/models/src/json_object.rs
@@ -0,0 +1,67 @@
+use crate::{
+    error::ModelError,
+    traits::{Model, SimpleCrud},
+    types::U256Wrapper,
+};
+use async_trait::async_trait;
+use serde_json::Value;
+use sqlx::PgPool;
+
+/// JsonObject is a struct that represents a JSON object in the database.
+#[derive(Debug, sqlx::FromRow, Builder)]
+#[sqlx(type_name = "json_object")]
+pub struct JsonObject {
+    pub id: U256Wrapper,
+    pub data: Value,
+}
+
+/// This is a trait that all models must implement.
+impl Model for JsonObject {}
+
+#[async_trait]
+impl SimpleCrud for JsonObject {
+    /// Upserts a JSON object into the database.
+    async fn upsert(&self, pool: &PgPool, schema: &str) -> Result<Self, ModelError> {
+        let query = format!(
+            r#"
+            INSERT INTO {}.json_object (id, data)
+            VALUES ($1, $2)
+            ON CONFLICT (id) DO UPDATE SET
+                data = EXCLUDED.data
+            RETURNING id,
+                      data
+            "#,
+            schema,
+        );
+
+        sqlx::query_as::<_, JsonObject>(&query)
+            .bind(self.id.to_big_decimal()?)
+            .bind(self.data.clone())
+            .fetch_one(pool)
+            .await
+            .map_err(|e| ModelError::InsertError(e.to_string()))
+    }
+
+    /// Finds a JSON object by its id.
+    async fn find_by_id(
+        id: U256Wrapper,
+        pool: &PgPool,
+        schema: &str,
+    ) -> Result<Option<Self>, ModelError> {
+        let query = format!(
+            r#"
+            SELECT id,
+                   data
+            FROM {}.json_object
+            WHERE id = $1
+            "#,
+            schema
+        );
+
+        sqlx::query_as::<_, JsonObject>(&query)
+            .bind(id.to_big_decimal()?)
+            .fetch_optional(pool)
+            .await
+            .map_err(|e| ModelError::QueryError(e.to_string()))
+    }
+}
diff --git a/models/src/lib.rs b/models/src/lib.rs
index 751c7dfc..022a145a 100644
--- a/models/src/lib.rs
+++ b/models/src/lib.rs
@@ -2,6 +2,7 @@ pub mod account;
 pub mod atom;
 pub mod atom_value;
 pub mod book;
+pub mod byte_object;
 pub mod cached_image;
 pub mod caip10;
 pub mod claim;
@@ -9,6 +10,7 @@ pub mod deposit;
 pub mod error;
 pub mod event;
 pub mod fee_transfer;
+pub mod json_object;
 pub mod organization;
 pub mod person;
 pub mod position;
@@ -20,11 +22,11 @@ pub mod stats;
 pub mod stats_hour;
 pub mod substreams_cursor;
 pub mod test_helpers;
+pub mod text_object;
 pub mod thing;
 pub mod traits;
 pub mod triple;
 pub mod types;
 pub mod vault;
-
 #[macro_use]
 extern crate macon;
diff --git a/models/src/text_object.rs b/models/src/text_object.rs
new file mode 100644
index 00000000..c7d3dc05
--- /dev/null
+++ b/models/src/text_object.rs
@@ -0,0 +1,66 @@
+use crate::{
+    error::ModelError,
+    traits::{Model, SimpleCrud},
+    types::U256Wrapper,
+};
+use async_trait::async_trait;
+use sqlx::PgPool;
+
+/// TextObject is a struct that represents a text object in the database.
+#[derive(Debug, sqlx::FromRow, Builder)]
+#[sqlx(type_name = "text_object")]
+pub struct TextObject {
+    pub id: U256Wrapper,
+    pub data: String,
+}
+
+/// This is a trait that all models must implement.
+impl Model for TextObject {}
+
+#[async_trait]
+impl SimpleCrud for TextObject {
+    /// Upserts a text object into the database.
+    async fn upsert(&self, pool: &PgPool, schema: &str) -> Result<Self, ModelError> {
+        let query = format!(
+            r#"
+            INSERT INTO {}.text_object (id, data)
+            VALUES ($1, $2)
+            ON CONFLICT (id) DO UPDATE SET
+                data = EXCLUDED.data
+            RETURNING id,
+                      data
+            "#,
+            schema,
+        );
+
+        sqlx::query_as::<_, TextObject>(&query)
+            .bind(self.id.to_big_decimal()?)
+            .bind(self.data.clone())
+            .fetch_one(pool)
+            .await
+            .map_err(|e| ModelError::InsertError(e.to_string()))
+    }
+
+    /// Finds a text object by its id.
+    async fn find_by_id(
+        id: U256Wrapper,
+        pool: &PgPool,
+        schema: &str,
+    ) -> Result<Option<Self>, ModelError> {
+        let query = format!(
+            r#"
+            SELECT id,
+                   data
+            FROM {}.text_object
+            WHERE id = $1
+            "#,
+            schema
+        );
+
+        sqlx::query_as::<_, TextObject>(&query)
+            .bind(id.to_big_decimal()?)
+            .fetch_optional(pool)
+            .await
+            .map_err(|e| ModelError::QueryError(e.to_string()))
+    }
+}
diff --git a/models/tests/test_byte_object_crud.rs b/models/tests/test_byte_object_crud.rs
new file mode 100644
index 00000000..65242833
--- /dev/null
+++ b/models/tests/test_byte_object_crud.rs
@@ -0,0 +1,61 @@
+// This test is going to be removed once we have a proper testing crate
+#[cfg(test)]
+mod tests {
+    use bytes::Bytes;
+    use models::{
+        byte_object::ByteObject,
+        test_helpers::{create_random_u256wrapper, setup_test_db, TEST_SCHEMA},
+        traits::SimpleCrud,
+    };
+
+    #[tokio::test]
+    async fn test_byte_object_upsert_and_find_by_id() {
+        let pool = setup_test_db().await;
+        let id = create_random_u256wrapper();
+
+        // Simulate reqwest::Response::bytes() output
+        let data = Bytes::from(vec![1, 2, 3, 4, 5]);
+        let byte_object = ByteObject {
+            id: id.clone(),
+            data: data.to_vec(), // Convert Bytes to Vec<u8> here
+        };
+
+        // Upsert the object
+        byte_object.upsert(&pool, TEST_SCHEMA).await.unwrap();
+
+        // Debug: query the row directly to verify the raw bytes round-trip
+        let raw_data: (Vec<u8>,) = sqlx::query_as(&format!(
+            "SELECT data FROM {}.byte_object WHERE id = $1 AND data = $2",
+            TEST_SCHEMA
+        ))
+        .bind(id.to_big_decimal().unwrap())
+        .bind(&data[..])
+        .fetch_one(&pool)
+        .await
+        .unwrap();
+
+        assert_eq!(raw_data.0, data);
+
+        // Fetch and verify initial state
+        let fetched = ByteObject::find_by_id(id.clone(), &pool, TEST_SCHEMA)
+            .await
+            .unwrap()
+            .unwrap();
+        assert_eq!(fetched.data, data);
+
+        // Modify and upsert again
+        let modified_data = Bytes::from(&[6, 7, 8, 9, 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0][..]);
+        let modified_object = ByteObject {
+            id: id.clone(),
+            data: modified_data.to_vec(),
+        };
+        modified_object.upsert(&pool, TEST_SCHEMA).await.unwrap();
+
+        // Fetch and verify modified state
+        let fetched_modified = ByteObject::find_by_id(id.clone(), &pool, TEST_SCHEMA)
+            .await
+            .unwrap()
+            .unwrap();
+        assert_eq!(fetched_modified.data, modified_data);
+    }
+}
diff --git a/models/tests/test_json_object_crud.rs b/models/tests/test_json_object_crud.rs
new file mode 100644
index 00000000..b95f5061
--- /dev/null
+++ b/models/tests/test_json_object_crud.rs
@@ -0,0 +1,45 @@
+// This test is going to be removed once we have a proper testing crate
+#[cfg(test)]
+mod tests {
+    use models::{
+        json_object::JsonObject,
+        test_helpers::{create_random_u256wrapper, setup_test_db, TEST_SCHEMA},
+        traits::SimpleCrud,
+    };
+
+    #[tokio::test]
+    async fn test_json_object_upsert_and_find_by_id() {
+        let pool = setup_test_db().await;
+        let id = create_random_u256wrapper();
+        let data = serde_json::json!({ "test": "test" });
+        let json_object = JsonObject {
+            id: id.clone(),
+            data,
+        };
+
+        // Upsert the object
+        json_object.upsert(&pool, TEST_SCHEMA).await.unwrap();
+
+        // Fetch and verify initial state
+        let fetched = JsonObject::find_by_id(id.clone(), &pool, TEST_SCHEMA)
+            .await
+            .unwrap()
+            .unwrap();
+        assert_eq!(fetched.data, json_object.data);
+
+        // Modify and upsert again
+        let modified_data = serde_json::json!({ "test": "modified" });
+        let modified_object = JsonObject {
+            id: id.clone(),
+            data: modified_data.clone(),
+        };
+        modified_object.upsert(&pool, TEST_SCHEMA).await.unwrap();
+
+        // Fetch and verify modified state
+        let fetched_modified = JsonObject::find_by_id(id.clone(), &pool, TEST_SCHEMA)
+            .await
+            .unwrap()
+            .unwrap();
+        assert_eq!(fetched_modified.data, modified_data);
+    }
+}
diff --git a/models/tests/test_text_object_crud.rs b/models/tests/test_text_object_crud.rs
new file mode 100644
index 00000000..b94ad3d9
--- /dev/null
+++ b/models/tests/test_text_object_crud.rs
@@ -0,0 +1,45 @@
+// This test is going to be removed once we have a proper testing crate
+#[cfg(test)]
+mod tests {
+    use models::{
+        test_helpers::{create_random_u256wrapper, setup_test_db, TEST_SCHEMA},
+        text_object::TextObject,
+        traits::SimpleCrud,
+    };
+
+    #[tokio::test]
+    async fn test_text_object_upsert_and_find_by_id() {
+        let pool = setup_test_db().await;
+        let id = create_random_u256wrapper();
+        let data = "test".to_string();
+        let text_object = TextObject {
+            id: id.clone(),
+            data,
+        };
+
+        // Upsert the object
+        text_object.upsert(&pool, TEST_SCHEMA).await.unwrap();
+
+        // Fetch and verify initial state
+        let fetched = TextObject::find_by_id(id.clone(), &pool, TEST_SCHEMA)
+            .await
+            .unwrap()
+            .unwrap();
+        assert_eq!(fetched.data, text_object.data);
+
+        // Modify and upsert again
+        let modified_data = "modified".to_string();
+        let modified_object = TextObject {
+            id: id.clone(),
+            data: modified_data.clone(),
+        };
+        modified_object.upsert(&pool, TEST_SCHEMA).await.unwrap();
+
+        // Fetch and verify modified state
+        let fetched_modified = TextObject::find_by_id(id.clone(), &pool, TEST_SCHEMA)
+            .await
+            .unwrap()
+            .unwrap();
+        assert_eq!(fetched_modified.data, modified_data);
+    }
+}
diff --git a/shared-utils/Cargo.toml b/shared-utils/Cargo.toml
index e81aea5c..425f1b17 100644
--- a/shared-utils/Cargo.toml
+++ b/shared-utils/Cargo.toml
@@ -4,7 +4,8 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
-bytes = "1.8.0"
+bytes.workspace = true
+http = "1.2.0"
 log.workspace = true
 macon.workspace = true
 models = { path = "../models" }
@@ -14,5 +15,6 @@ serde_json.workspace = true
 sqlx.workspace = true
 thiserror.workspace = true
 tokio.workspace = true
+tracing = "0.1"
 utoipa.workspace = true
 
diff --git a/shared-utils/src/ipfs.rs b/shared-utils/src/ipfs.rs
index 38130de8..fa7a0f09 100644
--- a/shared-utils/src/ipfs.rs
+++ b/shared-utils/src/ipfs.rs
@@ -1,5 +1,4 @@
 use crate::{error::LibError, types::MultiPartHandler};
-use log::warn;
 use macon::Builder;
 use reqwest::{
     multipart::{Form, Part},
@@ -8,6 +7,7 @@
 use serde::Deserialize;
 use std::time::Duration;
 use tokio::time::sleep;
+use tracing::warn;
 
 /// The base delays for the retry mechanism and timeouts
 pub const BASE_DELAY: Duration = Duration::from_secs(1);
@@ -58,7 +58,7 @@ impl IPFSResolver {
     }
-    /// Fetches a file and returns its content as a string from IPFS
-    /// using the configured gateway.
-    pub async fn fetch_from_ipfs(&self, cid: &str) -> Result<String, LibError> {
+    /// Fetches a file from IPFS using the configured gateway and
+    /// returns the raw HTTP response.
+    pub async fn fetch_from_ipfs(&self, cid: &str) -> Result<Response, LibError> {
         let mut attempts = 0;
 
         let response = loop {
@@ -94,17 +94,12 @@
     }
 
     /// Fetches a file from IPFS using the configured gateway.
-    async fn fetch_from_ipfs_request(&self, cid: &str) -> Result<String, LibError> {
+    async fn fetch_from_ipfs_request(&self, cid: &str) -> Result<Response, LibError> {
         let nodes = self.get_ipfs_nodes();
 
         for (node, pinata_token) in nodes {
             match self.try_fetch_from_node(cid, node, &pinata_token).await {
                 Ok(resp) => {
-                    let body = resp.text().await.unwrap_or_default();
-                    if body.contains("resource does not exist") {
-                        warn!("Resource not found in {}", node);
-                        continue;
-                    }
-                    return Ok(body);
+                    return Ok(resp);
                 }
                 Err(e) => warn!("IPFS fetch from {} failed: {}", node, e),
             }
@@ -121,12 +116,30 @@
         node: &str,
         token: &Option<String>,
     ) -> Result<Response, LibError> {
+        let url = self.format_ipfs_fetch_url(cid, node, token);
+
+        // Debug log the URL being called
+        tracing::debug!("Attempting to fetch from URL: {}", url);
+
         self.http_client
-            .get(self.format_ipfs_fetch_url(cid, node, token))
+            .get(&url)
             .timeout(self.fetch_timeout.unwrap_or(FETCH_TIMEOUT))
             .send()
             .await
-            .map_err(|e| e.into())
+            .map_err(|e| {
+                // Log detailed error info
+                tracing::error!("Request failed: {:#?}", e);
+                if e.is_timeout() {
+                    tracing::error!("Request timed out");
+                }
+                if e.is_connect() {
+                    tracing::error!("Connection error");
+                }
+                if e.is_request() {
+                    tracing::error!("Invalid request");
+                }
+                e.into()
+            })
    }
 
     /// Formats the URL to add a remote pin to Pinata
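For reference, a minimal, self-contained sketch of the fallback order the resolver now applies to a fetched IPFS payload (UTF-8 text first, then JSON, otherwise raw bytes). The function name `classify` and the returned labels are illustrative only; the real flow lives in `resolve_and_parse_atom_data` and `try_to_parse_json_or_text`:

```rust
use bytes::Bytes;
use serde_json::Value;

/// Mirrors the consumer's fallback order for a raw IPFS payload.
fn classify(payload: &Bytes) -> &'static str {
    match std::str::from_utf8(payload) {
        Ok(text) => {
            // Strip a UTF-8 BOM, as resolve_and_parse_atom_data does.
            let text = text.replace('\u{feff}', "");
            match serde_json::from_str::<Value>(&text) {
                // Schema.org handling additionally checks the @context value
                // against SCHEMA_ORG_CONTEXTS.
                Ok(json) if json.get("@context").is_some() => "schema.org candidate",
                Ok(_) => "JsonObject",
                Err(_) => "TextObject",
            }
        }
        // Not valid UTF-8: treated as binary (ByteObject).
        Err(_) => "ByteObject",
    }
}

fn main() {
    assert_eq!(
        classify(&Bytes::from_static(br#"{"@context":"https://schema.org"}"#)),
        "schema.org candidate"
    );
    assert_eq!(classify(&Bytes::from_static(b"{\"any\":1}")), "JsonObject");
    assert_eq!(classify(&Bytes::from_static(b"hello world")), "TextObject");
    assert_eq!(classify(&Bytes::from_static(&[0xF0, 0x28, 0x8C, 0x28])), "ByteObject");
}
```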