Skip to content

Commit 0c9bc79

Browse files
Callum-AJarema
authored andcommitted
Feat : Get or create Key Value store
1 parent 23bb978 commit 0c9bc79

File tree

2 files changed

+91
-0
lines changed

2 files changed

+91
-0
lines changed

async-nats/src/jetstream/context.rs

+40
Original file line numberDiff line numberDiff line change
@@ -821,6 +821,46 @@ impl Context {
821821
Ok(store)
822822
}
823823

824+
/// Tries to get an existing key-value bucket, if one cannot be found it will create a new key-value bucket.
825+
///
826+
/// Note: This does not validate if the key-value on the server is compatible with the configuration passed in.
827+
///
828+
/// # Examples
829+
///
830+
/// ```no_run
831+
/// # #[tokio::main]
832+
/// # async fn main() -> Result<(), async_nats::Error> {
833+
/// let client = async_nats::connect("demo.nats.io:4222").await?;
834+
/// let jetstream = async_nats::jetstream::new(client);
835+
/// let kv = jetstream
836+
/// .get_or_create_key_value("kv".to_string(), async_nats::jetstream::kv::Config {
837+
/// bucket: "kv".to_string(),
838+
/// history: 10,
839+
/// ..Default::default()
840+
/// })
841+
/// .await?;
842+
/// # Ok(())
843+
/// # }
844+
/// ```
845+
pub async fn get_or_create_key_value<T: Into<String>>(
846+
&self,
847+
bucket: T,
848+
config: crate::jetstream::kv::Config,
849+
) -> Result<Store, CreateKeyValueError> {
850+
match self.get_key_value(bucket).await {
851+
Ok(kv) => Ok(kv),
852+
Err(e) => match e.kind() {
853+
KeyValueErrorKind::GetBucket => self.create_key_value(config).await,
854+
KeyValueErrorKind::InvalidStoreName => Err(CreateKeyValueError::new(
855+
CreateKeyValueErrorKind::InvalidStoreName,
856+
)),
857+
KeyValueErrorKind::JetStream => {
858+
Err(CreateKeyValueError::new(CreateKeyValueErrorKind::JetStream))
859+
}
860+
},
861+
}
862+
}
863+
824864
/// Deletes given key-value bucket.
825865
///
826866
/// # Examples

async-nats/tests/kv_tests.rs

+51
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,57 @@ mod kv {
5252
assert!(info.config.allow_direct);
5353
}
5454

55+
#[tokio::test]
56+
async fn get_or_create_bucket() {
57+
let server = nats_server::run_server("tests/configs/jetstream.conf");
58+
let client = ConnectOptions::new()
59+
.event_callback(|event| async move { println!("event: {event:?}") })
60+
.connect(server.client_url())
61+
.await
62+
.unwrap();
63+
64+
let context = async_nats::jetstream::new(client);
65+
66+
let mut kv = context
67+
.get_or_create_key_value(
68+
"test",
69+
async_nats::jetstream::kv::Config {
70+
bucket: "test".into(),
71+
description: "test_description".into(),
72+
history: 10,
73+
storage: StorageType::File,
74+
num_replicas: 1,
75+
..Default::default()
76+
},
77+
)
78+
.await
79+
.unwrap();
80+
let info = kv.stream.info().await.unwrap();
81+
assert_eq!("KV_test", kv.stream_name);
82+
assert_eq!(info.config.discard, DiscardPolicy::New);
83+
assert!(info.config.allow_direct);
84+
85+
let mut kv2 = context
86+
.get_or_create_key_value(
87+
"test",
88+
async_nats::jetstream::kv::Config {
89+
bucket: "test".into(),
90+
description: "test_description".into(),
91+
history: 10,
92+
storage: StorageType::File,
93+
num_replicas: 1,
94+
..Default::default()
95+
},
96+
)
97+
.await
98+
.unwrap();
99+
100+
let info2 = kv2.stream.info().await.unwrap();
101+
assert_eq!(kv2.stream_name, kv.stream_name);
102+
assert_eq!(info2.config.discard, info.config.discard);
103+
assert_eq!(info2.config.allow_direct, info.config.allow_direct);
104+
}
105+
55106
#[tokio::test]
56107
async fn create() {
57108
let server = nats_server::run_server("tests/configs/jetstream.conf");

0 commit comments

Comments
 (0)