Skip to content

Commit 1283c5d

Browse files
authored
improve python examples (#328)
1 parent c522df3 commit 1283c5d

File tree

6 files changed

+375
-321
lines changed

6 files changed

+375
-321
lines changed

docs/fluvio/apis/python/example.mdx

Lines changed: 57 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -5,141 +5,91 @@ sidebar_position: 20
55

66
# Python SDK Examples
77

8-
* The Python client [wraps the rust client](https://www.infinyon.com/blog/2021/03/python-client/).
9-
* It currently does not support the administrator features that the rust client does.
10-
* The [PartitionConsumer.stream](https://infinyon.github.io/fluvio-client-python/fluvio.html#PartitionConsumer.stream) returns an object which implements the [python iterator convention](https://wiki.python.org/moin/Iterator) to allow for iterating over the stream in a for-loop.
8+
The Fluvio Python module provides an extension for working with the Fluvio streaming platform.
119

12-
To see the full docs, visit our [pdoc page](https://infinyon.github.io/fluvio-client-python/fluvio.html).
10+
This module builds on top of the Fluvio Client Rust Crate and provides a pythonic access to the API.
1311

14-
## Examples
1512

16-
### Producer
13+
Creating a topic with default settings is as simple as:
1714

18-
Create the topic used to produce and consume records:
19-
20-
```bash copy="cmd"
21-
$ fluvio topic create python-data
15+
```python
16+
fluvio_admin = FluvioAdmin.connect()
17+
fluvio_admin.create_topic("a_topic")
2218
```
2319

24-
Create a file called `python-produce.py`:
20+
Or just create a topic with custom settings:
2521

2622
```python
27-
#!/usr/bin/env python
28-
from datetime import datetime
29-
from fluvio import Fluvio
30-
31-
TOPIC_NAME = "python-data"
32-
PARTITION = 0
23+
import fluvio
24+
25+
fluvio_admin = FluvioAdmin.connect()
26+
topic_spec = (
27+
TopicSpec.create()
28+
.with_retention_time("1h")
29+
.with_segment_size("10M")
30+
.build()
31+
)
32+
fluvio_admin.create_topic("a_topic", topic_spec)
33+
```
3334

34-
if __name__ == "__main__":
35-
# Connect to cluster
36-
fluvio = Fluvio.connect()
35+
Producing data to a topic in a Fluvio cluster is as simple as:
3736

38-
# Produce 10 records to topic
39-
producer = fluvio.topic_producer(TOPIC_NAME)
40-
for x in range(10):
41-
producer.send_string("{}: timestamp: {}".format(x, datetime.now()))
37+
```python
38+
import fluvio
4239

43-
# Flush the last entry
44-
producer.flush()
45-
```
40+
fluvio = Fluvio.connect()
4641

47-
Let's run the file:
42+
topic = "a_topic"
43+
producer = fluvio.topic_producer(topic)
4844

49-
```bash copy="cmd"
50-
$ python python-produce.py
45+
for i in range(10):
46+
producer.send_string("Hello %s " % i)
5147
```
5248

53-
### Consumer
54-
55-
Create a file called `python-consume.py`:
49+
Consuming is also simple:
5650

5751
```python
58-
#!/usr/bin/env python
59-
from fluvio import Fluvio, Offset
60-
61-
TOPIC_NAME = "python-data"
62-
PARTITION = 0
52+
import fluvio
6353

64-
if __name__ == "__main__":
65-
# Connect to cluster
66-
fluvio = Fluvio.connect()
54+
fluvio = Fluvio.connect()
6755

68-
# Consume last 10 records from topic
69-
consumer = fluvio.partition_consumer(TOPIC_NAME, PARTITION)
70-
for idx, record in enumerate( consumer.stream(Offset.from_end(10)) ):
71-
print("{}".format(record.value_string()))
56+
topic = "a_topic"
57+
builder = ConsumerConfigExtBuilder(topic)
58+
config = builder.build()
59+
stream = fluvio.consumer_with_config(config)
7260

73-
if idx >= 9:
74-
break
61+
num_items = 2
62+
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]
7563
```
7664

77-
Let's run the file:
65+
Also you can consume usign offset management:
7866

79-
```bash copy="cmd"
80-
$ python python-consume.py
81-
```
82-
83-
## Example with a SmartModule
84-
85-
This is a 3 part example:
67+
```python
68+
import fluvio
8669

87-
* [Build a SmartModule that converts records to uppercase]
88-
* [Add a Python script that uses the SmartModule](#python-script-that-uses-a-smartmodule)
89-
* [Test the Python script](#test-python-script)
70+
fluvio = Fluvio.connect()
9071

91-
### Python Script that uses a SmartModule
72+
topic = "a_topic"
73+
builder = ConsumerConfigExtBuilder(topic)
74+
builder.offset_start(Offset.beginning())
75+
builder.offset_strategy(OffsetManagementStrategy.MANUAL)
76+
builder.offset_consumer("a-consumer")
77+
config = builder.build()
78+
stream = fluvio.consumer_with_config(config)
9279

93-
Create a file called `smartmodule-consumer.py`:
80+
num_items = 2
81+
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]
9482

95-
```python
96-
#!/usr/bin/env python
97-
import os
98-
from datetime import datetime
99-
from fluvio import Fluvio, Offset, ConsumerConfig
100-
101-
TOPIC_NAME = "hello-python-smartmodule"
102-
PARTITION = 0
103-
104-
# This is an example of a basic Fluvio workflow in Python
105-
#
106-
# 1. Create a topic to store data in via CLI
107-
# 2. Establish a connection to the Fluvio cluster
108-
# 3. Create a producer and send some bytes
109-
# 4. Create a consumer, and stream the data back
110-
if __name__ == "__main__":
111-
# Currently the Python client does not support creating topics
112-
# Using the Fluvio CLI
113-
os.popen("fluvio topic create {}".format(TOPIC_NAME))
114-
115-
# Connect to cluster
116-
fluvio = Fluvio.connect()
117-
118-
# Produce to topic
119-
producer = fluvio.topic_producer(TOPIC_NAME)
120-
producer.send_string("Hello World! - Time is: {}".format(datetime.now()))
121-
122-
# Consume from topic
123-
# We're just going to get the last record
124-
consumer = fluvio.partition_consumer(TOPIC_NAME, PARTITION)
125-
126-
# Create a ConsumerConfig using your "uppercase" smartmodule
127-
config = ConsumerConfig()
128-
config.smartmodule(name="uppercase")
129-
130-
for record in consumer.stream_with_config(Offset.from_end(0), config):
131-
print("{}".format(record.value_string()))
132-
break
83+
stream.offset_commit()
84+
stream.offset_flush()
13385
```
13486

135-
### Test Python Script
136-
137-
Read to test the script:
138-
139-
```shell copy="cmd"
140-
$ python smartmodule-consumer.py
141-
HELLO WORLD! - TIME IS: 2024-08-25 21:04:51.172045
142-
```
87+
For more examples see the integration tests in the fluvio-python repository.
14388

89+
* [Fluvio producer tests]
90+
* [Fluvio consumer tests]
91+
* [Example with a SmartModule]
14492

145-
[Build a SmartModule that converts records to uppercase]: /docs/smartmodules/tutorials/make-uppercase.mdx
93+
[Fluvio producer tests]: https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_produce.py
94+
[Fluvio consumer tests]: https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_consume.py
95+
[Example with a SmartModule]: fluvio/apis/python/smartmodule.mdx
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
---
2+
title: Smart Modules
3+
sidebar_position: 30
4+
---
5+
6+
# Example with a SmartModule
7+
8+
This is a 3 part example:
9+
10+
* [Build a SmartModule that converts records to uppercase]
11+
* [Add a Python script that uses the SmartModule](#python-script-that-uses-a-smartmodule)
12+
* [Test the Python script](#test-python-script)
13+
14+
### Python Script that uses a SmartModule
15+
16+
Create a file called `smartmodule-consumer.py`:
17+
18+
```python
19+
#!/usr/bin/env python
20+
import os
21+
from datetime import datetime
22+
from fluvio import Fluvio, Offset, ConsumerConfig
23+
24+
TOPIC_NAME = "hello-python-smartmodule"
25+
PARTITION = 0
26+
27+
# This is an example of a basic Fluvio workflow in Python
28+
#
29+
# 1. Create a topic to store data in via CLI
30+
# 2. Establish a connection to the Fluvio cluster
31+
# 3. Create a producer and send some bytes
32+
# 4. Create a consumer, and stream the data back
33+
if __name__ == "__main__":
34+
# Currently the Python client does not support creating topics
35+
# Using the Fluvio CLI
36+
os.popen("fluvio topic create {}".format(TOPIC_NAME))
37+
38+
# Connect to cluster
39+
fluvio = Fluvio.connect()
40+
41+
# Produce to topic
42+
producer = fluvio.topic_producer(TOPIC_NAME)
43+
producer.send_string("Hello World! - Time is: {}".format(datetime.now()))
44+
45+
# Consume from topic
46+
# We're just going to get the last record
47+
consumer = fluvio.partition_consumer(TOPIC_NAME, PARTITION)
48+
49+
# Create a ConsumerConfig using your "uppercase" smartmodule
50+
config = ConsumerConfig()
51+
config.smartmodule(name="uppercase")
52+
53+
for record in consumer.stream_with_config(Offset.from_end(0), config):
54+
print("{}".format(record.value_string()))
55+
break
56+
```
57+
58+
### Test Python Script
59+
60+
Read to test the script:
61+
62+
```shell copy="cmd"
63+
$ python smartmodule-consumer.py
64+
HELLO WORLD! - TIME IS: 2024-08-25 21:04:51.172045
65+
```
66+
67+
68+
[Build a SmartModule that converts records to uppercase]: /docs/smartmodules/tutorials/make-uppercase.mdx

0 commit comments

Comments
 (0)