Commit 552bc65

[FSTORE-1454] Unify "platform" APIs (logicalclocks#220)

* Move
* Adapt the moved files
* Fix version in pyproject
* Fix mocking
* Prepare to move kafka_api from hopsworks
* Move kafka_api from hopsworks
* Adapt the moved kafka-related files
* Fix KafkaApi.__init__
1 parent 7f17aeb commit 552bc65
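
The kafka_api change below illustrates the pattern applied throughout this commit: the implementation moves into `hopsworks_common`, and the old `hopsworks` module becomes a thin re-export so existing import paths keep resolving. A minimal sketch of what that means for calling code (the module paths are taken from the diff; the aliases are only for illustration):

```python
# Both import paths now resolve to the same class: hopsworks.core.kafka_api
# simply re-exports the implementation that moved to hopsworks_common.
from hopsworks.core.kafka_api import KafkaApi as wrapper_kafka_api
from hopsworks_common.core.kafka_api import KafkaApi as common_kafka_api

assert wrapper_kafka_api is common_kafka_api
```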

34 files changed (+1863 −1459 lines)

python/hopsworks/core/kafka_api.py (+7 −368 lines)
````diff
@@ -1,5 +1,5 @@
 #
-# Copyright 2022 Logical Clocks AB
+# Copyright 2024 Hopsworks AB
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -14,372 +14,11 @@
 # limitations under the License.
 #
 
-import json
-import socket
+from hopsworks_common.core.kafka_api import (
+    KafkaApi,
+)
 
-from hopsworks import client, constants, kafka_schema, kafka_topic
-from hopsworks.client.exceptions import KafkaException
-from hopsworks.client.external import Client
 
-
-class KafkaApi:
-    def __init__(
-        self,
-        project_id,
-        project_name,
-    ):
-        self._project_id = project_id
-        self._project_name = project_name
-
-    def create_topic(
-        self,
-        name: str,
-        schema: str,
-        schema_version: int,
-        replicas: int = 1,
-        partitions: int = 1,
-    ):
-        """Create a new kafka topic.
-
-        ```python
-
-        import hopsworks
-
-        project = hopsworks.login()
-
-        kafka_api = project.get_kafka_api()
-
-        kafka_topic = kafka_api.create_topic("my_topic", "my_schema", 1)
-
-        ```
-        # Arguments
-            name: name of the topic
-            schema: subject name of the schema
-            schema_version: version of the schema
-            replicas: replication factor for the topic
-            partitions: partitions for the topic
-        # Returns
-            `KafkaTopic`: The KafkaTopic object
-        # Raises
-            `RestAPIError`: If unable to create the topic
-        """
-        _client = client.get_instance()
-
-        path_params = ["project", self._project_id, "kafka", "topics"]
-        data = {
-            "name": name,
-            "schemaName": schema,
-            "schemaVersion": schema_version,
-            "numOfReplicas": replicas,
-            "numOfPartitions": partitions,
-        }
-
-        headers = {"content-type": "application/json"}
-        return kafka_topic.KafkaTopic.from_response_json(
-            _client._send_request(
-                "POST", path_params, headers=headers, data=json.dumps(data)
-            ),
-            self._project_id,
-            self._project_name,
-        )
-
-    def create_schema(self, subject: str, schema: dict):
-        """Create a new kafka schema.
-
-        ```python
-
-        import hopsworks
-
-        project = hopsworks.login()
-
-        kafka_api = project.get_kafka_api()
-
-        avro_schema = {
-            "type": "record",
-            "name": "tutorial",
-            "fields": [
-                {
-                    "name": "id",
-                    "type": "int"
-                },
-                {
-                    "name": "data",
-                    "type": "string"
-                }
-            ]
-        }
-
-        kafka_topic = kafka_api.create_schema("my_schema", avro_schema)
-
-        ```
-        # Arguments
-            subject: subject name of the schema
-            schema: avro schema definition
-        # Returns
-            `KafkaSchema`: The KafkaSchema object
-        # Raises
-            `RestAPIError`: If unable to create the schema
-        """
-        _client = client.get_instance()
-
-        path_params = [
-            "project",
-            self._project_id,
-            "kafka",
-            "subjects",
-            subject,
-            "versions",
-        ]
-
-        headers = {"content-type": "application/json"}
-        schema = kafka_schema.KafkaSchema.from_response_json(
-            _client._send_request(
-                "POST",
-                path_params,
-                headers=headers,
-                data=json.dumps({"schema": json.dumps(schema)}),
-            ),
-            self._project_id,
-            self._project_name,
-        )
-        # TODO: Fix backend, GET request required as POST does not set schema field in the returned payload
-        return self.get_schema(schema.subject, schema.version)
-
-    def get_topic(self, name: str):
-        """Get kafka topic by name.
-
-        # Arguments
-            name: name of the topic
-        # Returns
-            `KafkaTopic`: The KafkaTopic object
-        # Raises
-            `RestAPIError`: If unable to get the topic
-        """
-        topics = self.get_topics()
-
-        for topic in topics:
-            if topic.name == name:
-                return topic
-
-        raise KafkaException("No topic named {} could be found".format(name))
-
-    def get_topics(self):
-        """Get all kafka topics.
-
-        # Returns
-            `List[KafkaTopic]`: List of KafkaTopic objects
-        # Raises
-            `RestAPIError`: If unable to get the topics
-        """
-        _client = client.get_instance()
-        path_params = ["project", self._project_id, "kafka", "topics"]
-
-        return kafka_topic.KafkaTopic.from_response_json(
-            _client._send_request("GET", path_params),
-            self._project_id,
-            self._project_name,
-        )
-
-    def _delete_topic(self, name: str):
-        """Delete the topic.
-        :param name: name of the topic
-        :type name: str
-        """
-        _client = client.get_instance()
-        path_params = [
-            "project",
-            self._project_id,
-            "kafka",
-            "topics",
-            name,
-        ]
-        _client._send_request("DELETE", path_params)
-
-    def _delete_subject_version(self, subject: str, version: int):
-        """Delete the schema.
-        :param subject: subject name of the schema
-        :type subject: str
-        :param version: version of the subject
-        :type version: int
-        """
-        _client = client.get_instance()
-        path_params = [
-            "project",
-            self._project_id,
-            "kafka",
-            "subjects",
-            subject,
-            "versions",
-            str(version),
-        ]
-        _client._send_request("DELETE", path_params)
-
-    def get_subjects(self):
-        """Get all subjects.
-
-        # Returns
-            `List[str]`: List of registered subjects
-        # Raises
-            `RestAPIError`: If unable to get the subjects
-        """
-        topics = self.get_topics()
-
-        subjects = set()
-
-        for topic in topics:
-            subjects.add(topic.schema.subject)
-
-        return list(subjects)
-
-    def get_schemas(self, subject: str):
-        """Get all schema versions for the subject.
-
-        # Arguments
-            subject: subject name
-        # Returns
-            `List[KafkaSchema]`: List of KafkaSchema objects
-        # Raises
-            `RestAPIError`: If unable to get the schemas
-        """
-        _client = client.get_instance()
-        path_params = [
-            "project",
-            self._project_id,
-            "kafka",
-            "subjects",
-            subject,
-            "versions",
-        ]
-
-        versions = _client._send_request("GET", path_params)
-
-        schemas = []
-        for version in versions:
-            schemas.append(self._get_schema_details(subject, version))
-
-        return schemas
-
-    def get_schema(self, subject: str, version: int):
-        """Get schema given subject name and version.
-
-        # Arguments
-            subject: subject name
-            version: version number
-        # Returns
-            `KafkaSchema`: KafkaSchema object
-        # Raises
-            `RestAPIError`: If unable to get the schema
-        """
-        schemas = self.get_schemas(subject)
-        for schema in schemas:
-            if schema.version == version:
-                return schema
-
-        raise KafkaException(
-            "No schema for subject {} and version {} could be found".format(
-                subject, version
-            )
-        )
-
-    def _get_schema_details(self, subject: str, version: int):
-        """Get the schema details.
-        :param subject: subject name of the schema
-        :type subject: str
-        :param version: version of the subject
-        :type version: int
-        """
-        _client = client.get_instance()
-        path_params = [
-            "project",
-            self._project_id,
-            "kafka",
-            "subjects",
-            subject,
-            "versions",
-            str(version),
-        ]
-
-        return kafka_schema.KafkaSchema.from_response_json(
-            _client._send_request("GET", path_params),
-            self._project_id,
-            self._project_name,
-        )
-
-    def _get_broker_endpoints(self, externalListeners: bool = False):
-        _client = client.get_instance()
-        path_params = [
-            "project",
-            self._project_id,
-            "kafka",
-            "clusterinfo",
-        ]
-        query_params = {"external": externalListeners}
-        headers = {"content-type": "application/json"}
-        return _client._send_request(
-            "GET", path_params, query_params=query_params, headers=headers
-        )["brokers"]
-
-    def _get_security_protocol(self):
-        """
-        Gets the security protocol used for communicating with Kafka brokers in a Hopsworks cluster
-        Returns:
-            the security protocol for communicating with Kafka brokers in a Hopsworks cluster
-        """
-        return constants.KAFKA_SSL_CONFIG.SSL
-
-    def get_default_config(self):
-        """Get the configuration to set up a Producer or Consumer for a Kafka broker using confluent-kafka.
-
-        ```python
-
-        import hopsworks
-
-        project = hopsworks.login()
-
-        kafka_api = project.get_kafka_api()
-
-        kafka_conf = kafka_api.get_default_config()
-
-        from confluent_kafka import Producer
-
-        producer = Producer(kafka_conf)
-
-        ```
-        # Returns
-            `dict`: The kafka configuration
-        # Raises
-            `RestAPIError`: If unable to get the kafka configuration.
-        """
-
-        config = {
-            constants.KAFKA_SSL_CONFIG.SECURITY_PROTOCOL_CONFIG: self._get_security_protocol(),
-            constants.KAFKA_SSL_CONFIG.SSL_CA_LOCATION_CONFIG: client.get_instance()._get_ca_chain_path(
-                self._project_name
-            ),
-            constants.KAFKA_SSL_CONFIG.SSL_CERTIFICATE_LOCATION_CONFIG: client.get_instance()._get_client_cert_path(
-                self._project_name
-            ),
-            constants.KAFKA_SSL_CONFIG.SSL_PRIVATE_KEY_LOCATION_CONFIG: client.get_instance()._get_client_key_path(
-                self._project_name
-            ),
-            constants.KAFKA_CONSUMER_CONFIG.CLIENT_ID_CONFIG: socket.gethostname(),
-            constants.KAFKA_CONSUMER_CONFIG.GROUP_ID_CONFIG: "my-group-id",
-            constants.KAFKA_SSL_CONFIG.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG: "none",
-        }
-        _client = client.get_instance()
-        if type(_client) is Client:
-            config[constants.KAFKA_PRODUCER_CONFIG.BOOTSTRAP_SERVERS_CONFIG] = ",".join(
-                [
-                    endpoint.replace("EXTERNAL://", "")
-                    for endpoint in self._get_broker_endpoints(externalListeners=True)
-                ]
-            )
-        else:
-            config[constants.KAFKA_PRODUCER_CONFIG.BOOTSTRAP_SERVERS_CONFIG] = ",".join(
-                [
-                    endpoint.replace("INTERNAL://", "")
-                    for endpoint in self._get_broker_endpoints(externalListeners=False)
-                ]
-            )
-
-        return config
+__all__ = [
+    KafkaApi,
+]
````
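
For context, the user-facing workflow documented in the moved docstrings is unchanged. A usage sketch, assuming a reachable Hopsworks cluster and the confluent-kafka package installed:

```python
import hopsworks
from confluent_kafka import Producer

project = hopsworks.login()

# get_kafka_api() now hands back the KafkaApi re-exported from hopsworks_common.
kafka_api = project.get_kafka_api()

# Same calls as in the docstrings above: fetch a ready-made client config
# and use it to construct a confluent-kafka Producer.
kafka_conf = kafka_api.get_default_config()
producer = Producer(kafka_conf)
```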
