Skip to content

Commit

Permalink
GRPC entrypoint to download a blob (#2044)
Browse files Browse the repository at this point in the history
## Motivation

We need to be able to download blobs directly from the Proxy as part of the user blobs work. Fixes #2028

## Proposal

This PR does a few things:
* Adds a storage connection to the Proxy
* Adds an entrypoint for downloading a blob given a blob ID

## Test Plan

WIP
  • Loading branch information
Andre da Silva authored May 28, 2024
1 parent 40d0dad commit 7eaa917
Show file tree
Hide file tree
Showing 22 changed files with 466 additions and 92 deletions.
1 change: 1 addition & 0 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,5 @@ COPY --from=binaries \
COPY --chmod=755 \
docker/server-entrypoint.sh \
docker/server-init.sh \
docker/proxy-init.sh \
./
19 changes: 19 additions & 0 deletions docker/proxy-init.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/sh

while true; do
./linera-db check_existence --storage "scylladb:tcp:scylla-client.scylla.svc.cluster.local:9042"
status=$?

if [ $status -eq 0 ]; then
echo "Database already exists, no need to initialize."
exit 0
else
# We rely on the shards to initialize the database, so just wait here
if [ $status -eq 1 ]; then
echo "Database does not exist, retrying in 5 seconds..."
else
echo "An unexpected error occurred (status: $status), retrying in 5 seconds..."
fi
sleep 5
fi
done
23 changes: 20 additions & 3 deletions kubernetes/linera-validator/templates/proxy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ spec:
type: NodePort

---

apiVersion: v1
kind: Service
metadata:
Expand All @@ -36,7 +35,6 @@ spec:
type: NodePort

---

apiVersion: apps/v1
kind: Deployment
metadata:
Expand All @@ -53,6 +51,16 @@ spec:
spec:
serviceAccountName: linera-admin
terminationGracePeriodSeconds: 10
initContainers:
- name: linera-proxy-initializer
image: {{ .Values.lineraImage }}
imagePullPolicy: {{ .Values.lineraImagePullPolicy }}
command: ["./proxy-init.sh"]
env:
- name: RUST_LOG
value: {{ .Values.logLevel }}
- name: RUST_BACKTRACE
value: "1"
containers:
- name: linera-proxy
imagePullPolicy: {{ .Values.lineraImagePullPolicy }}
Expand All @@ -62,7 +70,14 @@ spec:
name: linera-port
- containerPort: 20100
name: linera-port-int
command: ["./linera-proxy"]
command:
[
"./linera-proxy",
"--storage",
"scylladb:tcp:scylla-client.scylla.svc.cluster.local:9042",
"--genesis",
"/config/genesis.json",
]
args: ["/config/server.json"]
env:
- name: RUST_LOG
Expand Down Expand Up @@ -90,3 +105,5 @@ spec:
items:
- key: serverConfig
path: server.json
- key: genesisConfig
path: genesis.json
13 changes: 12 additions & 1 deletion linera-base/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -737,8 +737,9 @@ impl std::str::FromStr for OracleResponse {
/// A blob of binary data.
#[derive(Eq, PartialEq, Debug, Hash, Clone, Serialize, Deserialize)]
pub struct Blob {
/// Bytes of the binary blob.
#[serde(with = "serde_bytes")]
bytes: Vec<u8>,
pub bytes: Vec<u8>,
}

impl Blob {
Expand Down Expand Up @@ -794,6 +795,16 @@ impl HashedBlob {
};
blob.into_hashed()
}

/// Returns a reference to the inner `Blob`, without the hash.
pub fn blob(&self) -> &Blob {
&self.blob
}

/// Moves ownership of the blob of binary data
pub fn into_inner(self) -> Blob {
self.blob
}
}

impl Serialize for HashedBlob {
Expand Down
4 changes: 3 additions & 1 deletion linera-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use futures::stream::{BoxStream, LocalBoxStream, Stream};
use linera_base::{
crypto::CryptoError,
data_types::{ArithmeticError, BlockHeight, HashedBlob},
data_types::{ArithmeticError, Blob, BlockHeight, HashedBlob},
identifiers::{BlobId, ChainId},
};
use linera_chain::{
Expand Down Expand Up @@ -80,6 +80,8 @@ pub trait LocalValidatorNode {
&mut self,
chains: Vec<ChainId>,
) -> Result<Self::NotificationStream, NodeError>;

async fn download_blob(&mut self, blob_id: BlobId) -> Result<Blob, NodeError>;
}

/// Turn an address into a validator node.
Expand Down
22 changes: 21 additions & 1 deletion linera-core/src/unit_tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use futures::{lock::Mutex, Future};
use linera_base::{
crypto::*,
data_types::*,
identifiers::{ChainDescription, ChainId},
identifiers::{BlobId, ChainDescription, ChainId},
};
use linera_chain::data_types::{
BlockProposal, Certificate, HashedCertificateValue, LiteCertificate,
Expand Down Expand Up @@ -158,6 +158,11 @@ where
async fn get_version_info(&mut self) -> Result<VersionInfo, NodeError> {
Ok(Default::default())
}

async fn download_blob(&mut self, blob_id: BlobId) -> Result<Blob, NodeError> {
self.spawn_and_receive(move |validator, sender| validator.do_download_blob(blob_id, sender))
.await
}
}

impl<S> LocalValidatorClient<S>
Expand Down Expand Up @@ -324,6 +329,21 @@ where
let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
sender.send(Ok(stream))
}

async fn do_download_blob(
self,
blob_id: BlobId,
sender: oneshot::Sender<Result<Blob, NodeError>>,
) -> Result<(), Result<Blob, NodeError>> {
let validator = self.client.lock().await;
let hashed_blob = validator
.state
.storage_client()
.read_hashed_blob(blob_id)
.await
.map_err(Into::into);
sender.send(hashed_blob.map(|hashed_blob| hashed_blob.blob().clone()))
}
}

#[derive(Clone)]
Expand Down
19 changes: 16 additions & 3 deletions linera-rpc/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ service ValidatorNode {

// Request the node's version info.
rpc GetVersionInfo(google.protobuf.Empty) returns (VersionInfo);

// Downloads a blob.
rpc DownloadBlob(BlobId) returns (Blob);
}

// Information about the Linera crate version the validator is running
Expand Down Expand Up @@ -147,7 +150,7 @@ message ChainInfoQuery {

// Request a signed vote for fallback mode.
bool request_fallback = 11;

// Query a value that contains a binary blob (e.g. bytecode) required by this chain.
optional bytes request_blob = 12;
}
Expand All @@ -171,7 +174,7 @@ message BlockProposal {

// A certificate for a validated block that justifies the proposal in this round.
optional bytes validated = 6;

// Required blob
bytes blobs = 7;
}
Expand Down Expand Up @@ -216,7 +219,7 @@ message Certificate {
// Wait until all outgoing cross-chain messages from this certificate have
// been received by the target chains.
bool wait_for_outgoing_messages = 6;

// Blobs required by this certificate
bytes blobs = 7;
}
Expand All @@ -237,6 +240,16 @@ message Signature {
bytes bytes = 1;
}

// A content-addressed blob ID i.e. the hash of the Blob.
message BlobId {
bytes bytes = 1;
}

// A blob of binary data.
message Blob {
bytes bytes = 1;
}

// Response to `ChainInfoQuery`
message ChainInfoResponse {
// bincode-encoded chain info
Expand Down
14 changes: 13 additions & 1 deletion linera-rpc/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use linera_base::{data_types::HashedBlob, identifiers::ChainId};
use linera_base::{
data_types::{Blob, HashedBlob},
identifiers::{BlobId, ChainId},
};
use linera_chain::data_types::{
BlockProposal, Certificate, HashedCertificateValue, LiteCertificate,
};
Expand Down Expand Up @@ -141,4 +144,13 @@ impl ValidatorNode for Client {
Client::Simple(simple_client) => simple_client.get_version_info().await?,
})
}

async fn download_blob(&mut self, blob_id: BlobId) -> Result<Blob, NodeError> {
Ok(match self {
Client::Grpc(grpc_client) => grpc_client.download_blob(blob_id).await?,

#[cfg(with_simple_network)]
Client::Simple(simple_client) => simple_client.download_blob(blob_id).await?,
})
}
}
18 changes: 16 additions & 2 deletions linera-rpc/src/grpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
use std::{iter, time::Duration};

use futures::{future, stream, StreamExt};
use linera_base::{data_types::HashedBlob, identifiers::ChainId};
use linera_base::{
data_types::{Blob, HashedBlob},
identifiers::{BlobId, ChainId},
};
use linera_chain::data_types;
#[cfg(web)]
use linera_core::node::{
Expand All @@ -26,7 +29,8 @@ use {

use super::{
api::{
chain_info_result::Inner, validator_node_client::ValidatorNodeClient, SubscriptionRequest,
self, chain_info_result::Inner, validator_node_client::ValidatorNodeClient,
SubscriptionRequest,
},
transport, GrpcError, GRPC_MAX_MESSAGE_SIZE,
};
Expand Down Expand Up @@ -260,6 +264,16 @@ impl ValidatorNode for GrpcClient {
async fn get_version_info(&mut self) -> Result<VersionInfo, NodeError> {
Ok(self.client.get_version_info(()).await?.into_inner().into())
}

#[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
async fn download_blob(&mut self, blob_id: BlobId) -> Result<Blob, NodeError> {
Ok(self
.client
.download_blob(<BlobId as Into<api::BlobId>>::into(blob_id))
.await?
.into_inner()
.into())
}
}

#[cfg(not(web))]
Expand Down
32 changes: 30 additions & 2 deletions linera-rpc/src/grpc/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

use linera_base::{
crypto::{CryptoError, CryptoHash, PublicKey, Signature},
data_types::BlockHeight,
data_types::{Blob, BlockHeight},
ensure,
identifiers::{ChainId, Owner},
identifiers::{BlobId, ChainId, Owner},
};
use linera_chain::data_types::{
BlockAndRound, BlockProposal, Certificate, HashedCertificateValue, LiteCertificate, LiteValue,
Expand Down Expand Up @@ -530,6 +530,34 @@ impl TryFrom<api::Owner> for Owner {
}
}

impl TryFrom<api::BlobId> for BlobId {
type Error = GrpcProtoConversionError;

fn try_from(blob_id: api::BlobId) -> Result<Self, Self::Error> {
Ok(Self(CryptoHash::try_from(blob_id.bytes.as_slice())?))
}
}

impl From<BlobId> for api::BlobId {
fn from(blob_id: BlobId) -> Self {
Self {
bytes: blob_id.0.as_bytes().to_vec(),
}
}
}

impl From<Blob> for api::Blob {
fn from(blob: Blob) -> Self {
Self { bytes: blob.bytes }
}
}

impl From<api::Blob> for Blob {
fn from(blob: api::Blob) -> Self {
Self { bytes: blob.bytes }
}
}

#[cfg(test)]
pub mod tests {
use std::{borrow::Cow, fmt::Debug};
Expand Down
Loading

0 comments on commit 7eaa917

Please sign in to comment.