Skip to content

Commit

Permalink
Luis/eng 5685 adapt ipfs data fetching to encompass new types (#55)
Browse files Browse the repository at this point in the history
* RPC hardening for histocrawler and rpc-proxy

* Last fixes

* Last updates on deployment scripts

* Hardening consumers

* Latest fixes on consumers and RPC

* Adding Pinata fallback to IPFS fetching

* Adding aquamarine IPFS pin option

* Fixing aquamarine URL

* latest fixes

* Adapting IPFS fetching

* Fixing atomValue and creating models and tests for new types

* Fixing issue that classify ipfs atom data as text

* Fixing deposit and refactoring

* Latest fixes

* Updating emojis

* Reverting changes on ipfs docker compose

* Fixing IPFS docker compose
  • Loading branch information
leboiko authored Jan 31, 2025
1 parent 3e3e1e5 commit 3371989
Show file tree
Hide file tree
Showing 22 changed files with 887 additions and 225 deletions.
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ async-trait = "0.1.83"
aws-config = { version = "1.0.1", features = ["behavior-version-latest"] }
aws-sdk-sqs = { version = "1.3.0" }
aws-smithy-runtime-api = "1.7.0"
bytes = "1.8.0"
chrono = { version = "0.4.35", features = ["serde"] }
clap = { version = "4.5.20", features = ["derive"] }
dotenvy = "0.15.7"
Expand Down
1 change: 1 addition & 0 deletions consumer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ aws-config.workspace = true
aws-sdk-s3 = "1.20.0"
aws-sdk-sqs.workspace = true
aws-smithy-runtime-api.workspace = true
bytes.workspace = true
dotenvy.workspace = true
envy.workspace = true
futures = "0.3.31"
Expand Down
4 changes: 4 additions & 0 deletions consumer/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ pub enum ConsumerError {
aws_smithy_runtime_api::http::Response,
>,
),
#[error("ByteObject error")]
ByteObjectError(String),
#[error("Deposited error")]
Deposited(String),
#[error("Failed to delete claim: {0}")]
Expand All @@ -81,6 +83,8 @@ pub enum ConsumerError {
Envy(#[from] envy::Error),
#[error("Failed to resolve ENS data: {0}")]
Ens(String),
#[error("Failed to get bytes from IPFS response")]
FailedToGetBytes,
#[error(transparent)]
Hex(#[from] hex::FromHexError),
#[error(transparent)]
Expand Down
16 changes: 7 additions & 9 deletions consumer/src/mode/decoded/atom/atom_creation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,15 @@ impl AtomCreated {
.decode_atom_data_and_update_atom(&mut atom, decoded_consumer_context)
.await?;

// get the supported atom metadata
// get the supported atom metadata and update the atom metadata
let supported_atom_metadata =
get_supported_atom_metadata(&mut atom, &decoded_atom_data, decoded_consumer_context)
.await?
.update_atom_metadata(
&mut atom,
&decoded_consumer_context.pg_pool,
&decoded_consumer_context.backend_schema,
)
.await?;

// Handle the account or caip10 type
Expand All @@ -208,14 +214,6 @@ impl AtomCreated {
.handle_account_or_caip10_type(&resolved_atom, decoded_consumer_context)
.await?;

// Update the atom metadata to reflect the supported atom type
supported_atom_metadata
.update_atom_metadata(
&mut atom,
&decoded_consumer_context.pg_pool,
&decoded_consumer_context.backend_schema,
)
.await?;
// Create the event
self.create_event(decoded_message, decoded_consumer_context)
.await?;
Expand Down
45 changes: 41 additions & 4 deletions consumer/src/mode/decoded/atom/atom_supported_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
mode::{
decoded::utils::{short_id, update_account_with_atom_id},
resolver::{
atom_resolver::{try_to_parse_json, try_to_resolve_schema_org_url},
atom_resolver::{try_to_parse_json_or_text, try_to_resolve_schema_org_url},
types::{ResolveAtom, ResolverConsumerMessage},
},
types::DecodedConsumerContext,
Expand Down Expand Up @@ -51,6 +51,16 @@ impl AtomMetadata {
}
}

/// Creates a new atom metadata for a byte object
pub fn byte_object(image: Option<String>) -> Self {
Self {
label: "byte object".to_string(),
emoji: "🔢".to_string(),
atom_type: "ByteObject".to_string(),
image,
}
}

/// Creates a new atom metadata for a caip10
pub fn caip10(caip10: String) -> Self {
Self {
Expand Down Expand Up @@ -137,6 +147,17 @@ impl AtomMetadata {
}
}
}

/// Creates a new atom metadata for a json object
pub fn json_object(image: Option<String>) -> Self {
Self {
label: "json object".to_string(),
emoji: "📦".to_string(),
atom_type: "JsonObject".to_string(),
image,
}
}

/// Creates a new atom metadata for a keywords predicate
pub fn keywords_predicate(image: Option<String>) -> Self {
Self {
Expand Down Expand Up @@ -197,6 +218,16 @@ impl AtomMetadata {
}
}

/// Creates a new atom metadata for a text object
pub fn text_object(image: Option<String>) -> Self {
Self {
label: "text object".to_string(),
emoji: "📝".to_string(),
atom_type: "TextObject".to_string(),
image,
}
}

/// Creates a new atom metadata for a thing
pub fn thing(name: String, image: Option<String>) -> Self {
Self {
Expand Down Expand Up @@ -281,13 +312,18 @@ impl AtomMetadata {
atom: &mut Atom,
pg_pool: &PgPool,
backend_schema: &str,
) -> Result<(), ConsumerError> {
) -> Result<AtomMetadata, ConsumerError> {
atom.emoji = Some(self.emoji.clone());
atom.atom_type = AtomType::from_str(&self.atom_type)?;
atom.label = Some(self.label.clone());
atom.image = self.image.clone();
atom.upsert(pg_pool, backend_schema).await?;
Ok(())
Ok(AtomMetadata {
label: self.label.clone(),
emoji: self.emoji.clone(),
atom_type: self.atom_type.clone(),
image: self.image.clone(),
})
}
}

Expand Down Expand Up @@ -393,7 +429,8 @@ pub async fn get_supported_atom_metadata(

// 5. Now we try to parse the JSON and return the metadata. At this point
// the resolver will handle the rest of the cases.
let metadata = try_to_parse_json(decoded_atom_data, atom, decoded_consumer_context).await?;
let metadata =
try_to_parse_json_or_text(decoded_atom_data, atom, decoded_consumer_context).await?;

Ok(metadata)
}
Expand Down
8 changes: 6 additions & 2 deletions consumer/src/mode/decoded/deposited.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,14 @@ impl Deposited {
&self,
event: &DecodedMessage,
decoded_consumer_context: &DecodedConsumerContext,
deposit_id: String,
) -> Result<Event, ConsumerError> {
// Create the event
let event = if self.isTriple {
Event::builder()
.id(DecodedMessage::event_id(event))
.event_type(EventType::Deposited)
.deposit_id(deposit_id)
.block_number(U256Wrapper::try_from(event.block_number)?)
.block_timestamp(event.block_timestamp)
.transaction_hash(event.transaction_hash.clone())
Expand All @@ -143,6 +145,7 @@ impl Deposited {
Event::builder()
.id(DecodedMessage::event_id(event))
.event_type(EventType::Deposited)
.deposit_id(deposit_id)
.block_number(U256Wrapper::try_from(event.block_number)?)
.block_timestamp(event.block_timestamp)
.transaction_hash(event.transaction_hash.clone())
Expand Down Expand Up @@ -326,14 +329,15 @@ impl Deposited {
)?;

// Create deposit record
self.create_deposit(event, decoded_consumer_context).await?;
let deposit = self.create_deposit(event, decoded_consumer_context).await?;

// Handle position and related entities
self.handle_position_and_claims(decoded_consumer_context, &vault)
.await?;

// Create event
self.create_event(event, decoded_consumer_context).await?;
self.create_event(event, decoded_consumer_context, deposit.id)
.await?;

// Create signal
self.create_signal(decoded_consumer_context, event, &vault)
Expand Down
Loading

0 comments on commit 3371989

Please sign in to comment.