Skip to content

Commit a9a1f8d

Browse files
committed
make also producer configuration safe; extract helpers into dedicated ns
1 parent 1dbdba5 commit a9a1f8d

File tree

3 files changed

+104
-77
lines changed

3 files changed

+104
-77
lines changed

src/clj_kafka_x/consumers/simple.clj

Lines changed: 17 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -2,75 +2,19 @@
22
For complete JavaDocs, see:
33
http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/package-summary.html"}
44
clj-kafka-x.consumers.simple
5-
(:require [clj-kafka-x.data :refer :all])
6-
(:import java.util.List
7-
java.util.regex.Pattern
8-
[org.apache.kafka.clients.consumer ConsumerRebalanceListener Consumer KafkaConsumer OffsetAndMetadata OffsetCommitCallback]
5+
(:require [clj-kafka-x.data :refer :all]
6+
[clj-kafka-x.impl.helpers :refer :all])
7+
(:import java.util.regex.Pattern
8+
[org.apache.kafka.clients.consumer ConsumerRebalanceListener Consumer KafkaConsumer OffsetCommitCallback]
99
[org.apache.kafka.common.serialization ByteArrayDeserializer Deserializer StringDeserializer]
1010
org.apache.kafka.common.TopicPartition
11-
(java.util Map)
12-
(clojure.lang IMapEntry MapEntry IRecord)))
11+
(java.util Map Collection)
12+
(java.time Duration)))
1313

1414

1515
(defn string-deserializer [] (StringDeserializer.))
1616
(defn byte-array-deserializer [] (ByteArrayDeserializer.))
1717

18-
(defn- walk
19-
[inner outer form]
20-
(cond
21-
(list? form) (outer (apply list (map inner form)))
22-
(instance? IMapEntry form)
23-
(outer (MapEntry/create (inner (key form)) (inner (val form))))
24-
(seq? form) (outer (doall (map inner form)))
25-
(instance? IRecord form)
26-
(outer (reduce (fn [r x] (conj r (inner x))) form form))
27-
(coll? form) (outer (into (empty form) (map inner form)))
28-
:else (outer form)))
29-
30-
(defn- postwalk
31-
[f form]
32-
(walk (partial postwalk f) f form))
33-
34-
(defn- stringify-keys
35-
[m]
36-
(let [f (fn [[k v]] (if (keyword? k) [(name k) v] [k v]))]
37-
(postwalk (fn [x] (if (map? x) (into {} (map f x)) x)) m)))
38-
39-
(defn update-in-when
40-
[m k f & args]
41-
(if (not= ::not-found (get-in m k ::not-found))
42-
(apply update-in m k f args)
43-
m))
44-
45-
(defn- int! [v]
46-
(if (number? v)
47-
(int v)
48-
(condp instance? v
49-
String (try
50-
(-> v str Integer/parseInt)
51-
(catch NumberFormatException _ nil))
52-
nil)))
53-
54-
(defn- safe-config ^Map [^Map config]
55-
(if (map? config)
56-
(-> config
57-
stringify-keys
58-
(update-in-when ["fetch.min.bytes"] int!)
59-
(update-in-when ["fetch.max.bytes"] int!)
60-
(update-in-when ["fetch.max.wait.ms"] int!)
61-
(update-in-when ["heartbeat.interval.ms"] int!)
62-
(update-in-when ["max.partition.fetch.bytes"] int!)
63-
(update-in-when ["session.timeout.ms"] int!)
64-
(update-in-when ["max.poll.interval.ms"] int!)
65-
(update-in-when ["max.poll.records"] int!)
66-
(update-in-when ["receive.buffer.bytes"] int!)
67-
(update-in-when ["request.timeout.ms"] int!)
68-
(update-in-when ["send.buffer.bytes"] int!)
69-
(update-in-when ["auto.commit.interval.ms"] int!)
70-
(update-in-when ["fetch.max.wait.ms"] int!)
71-
(update-in-when ["metrics.num.samples"] int!))
72-
{}))
73-
7418
(defn consumer
7519
"Takes a map of config options and returns a `KafkaConsumer` for consuming records from Kafka.
7620
@@ -157,14 +101,14 @@
157101
(let [listener (reify ConsumerRebalanceListener
158102
(onPartitionsAssigned [_ partitions] (assigned-callback (mapv to-clojure partitions)))
159103
(onPartitionsRevoked [_ partitions] (revoked-callback (mapv to-clojure partitions))))
160-
topics (cond
161-
(string? topics) (vector topics)
162-
(and (sequential? topics) (string? (first topics))) topics
163-
(= Pattern (type topics)) topics
164-
(and (sequential? topics) (map? (first topics))) topics
165-
:else (throw
166-
(ex-info "Topic should be a string, sequence (of strings or maps) or pattern"
167-
{:topic topics})))]
104+
topics ^Collection (cond
105+
(string? topics) (vector topics)
106+
(and (sequential? topics) (string? (first topics))) topics
107+
(= Pattern (type topics)) topics
108+
(and (sequential? topics) (map? (first topics))) topics
109+
:else (throw
110+
(ex-info "Topic should be a string, sequence (of strings or maps) or pattern"
111+
{:topic topics})))]
168112

169113
(if (and (sequential? topics) (map? (first topics)))
170114
(do
@@ -288,7 +232,8 @@
288232
"
289233
[^Consumer consumer & {:keys [timeout] :or {timeout 1000}}]
290234

291-
(let [consumer-records (.poll consumer timeout)]
235+
(let [duration (Duration/ofMillis timeout)
236+
consumer-records (.poll consumer duration)]
292237
(to-clojure consumer-records)))
293238

294239

@@ -365,7 +310,7 @@
365310
"
366311
([^Consumer consumer] (.commitSync consumer))
367312
([^Consumer consumer topic-partitions-offsets-metadata]
368-
(let [tp-om-map (map->tp-om-map topic-partitions-offsets-metadata)]
313+
(let [tp-om-map ^Map (map->tp-om-map topic-partitions-offsets-metadata)]
369314
(.commitSync consumer tp-om-map))))
370315

371316

src/clj_kafka_x/impl/helpers.clj

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
(ns clj-kafka-x.impl.helpers
2+
(:import (clojure.lang MapEntry IRecord IMapEntry)
3+
(java.util Map)))
4+
5+
(defn- walk
6+
[inner outer form]
7+
(cond
8+
(list? form) (outer (apply list (map inner form)))
9+
(instance? IMapEntry form)
10+
(outer (MapEntry/create (inner (key form)) (inner (val form))))
11+
(seq? form) (outer (doall (map inner form)))
12+
(instance? IRecord form)
13+
(outer (reduce (fn [r x] (conj r (inner x))) form form))
14+
(coll? form) (outer (into (empty form) (map inner form)))
15+
:else (outer form)))
16+
17+
(defn- postwalk
18+
[f form]
19+
(walk (partial postwalk f) f form))
20+
21+
(defn- stringify-keys
22+
[m]
23+
(let [f (fn [[k v]] (if (keyword? k) [(name k) v] [k v]))]
24+
(postwalk (fn [x] (if (map? x) (into {} (map f x)) x)) m)))
25+
26+
(defn update-in-when
27+
[m k f & args]
28+
(if (not= ::not-found (get-in m k ::not-found))
29+
(apply update-in m k f args)
30+
m))
31+
32+
(defn- short! [v]
33+
(if (number? v)
34+
(short v)
35+
(condp instance? v
36+
String (try
37+
(-> v str Short/parseShort)
38+
(catch NumberFormatException _ nil))
39+
nil)))
40+
41+
(defn- int! [v]
42+
(if (number? v)
43+
(int v)
44+
(condp instance? v
45+
String (try
46+
(-> v str Integer/parseInt)
47+
(catch NumberFormatException _ nil))
48+
nil)))
49+
50+
(defn safe-config ^Map [^Map config]
51+
(if (map? config)
52+
(-> config
53+
stringify-keys
54+
;; common
55+
(update-in-when ["request.timeout.ms"] int!)
56+
(update-in-when ["receive.buffer.bytes"] int!)
57+
(update-in-when ["metrics.num.samples"] int!)
58+
;; producer
59+
(update-in-when ["send.buffer.bytes"] int!)
60+
(update-in-when ["retries"] int!)
61+
(update-in-when ["batch.size"] int!)
62+
(update-in-when ["delivery.timeout.ms"] int!)
63+
(update-in-when ["max.request.size"] int!)
64+
(update-in-when ["max.in.flight.requests.per.connection"] int!)
65+
(update-in-when ["sasl.login.connect.timeout.ms"] int!)
66+
(update-in-when ["sasl.login.read.timeout.ms"] int!)
67+
(update-in-when ["sasl.login.refresh.buffer.seconds"] short!)
68+
(update-in-when ["sasl.login.refresh.min.period.seconds"] short!)
69+
(update-in-when ["sasl.oauthbearer.clock.skew.seconds"] int!)
70+
(update-in-when ["transaction.timeout.ms"] int!)
71+
;; consumer
72+
(update-in-when ["fetch.min.bytes"] int!)
73+
(update-in-when ["fetch.max.bytes"] int!)
74+
(update-in-when ["fetch.max.wait.ms"] int!)
75+
(update-in-when ["heartbeat.interval.ms"] int!)
76+
(update-in-when ["max.partition.fetch.bytes"] int!)
77+
(update-in-when ["session.timeout.ms"] int!)
78+
(update-in-when ["max.poll.interval.ms"] int!)
79+
(update-in-when ["max.poll.records"] int!)
80+
(update-in-when ["auto.commit.interval.ms"] int!)
81+
(update-in-when ["fetch.max.wait.ms"] int!))
82+
{}))

src/clj_kafka_x/producer.clj

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/package-summary.html"}
44
clj-kafka-x.producer
55
(:refer-clojure :exclude [send flush])
6-
(:require [clj-kafka-x.data :refer :all])
7-
(:import [java.util.concurrent Future TimeUnit TimeoutException]
8-
[org.apache.kafka.clients.producer Callback Producer KafkaProducer ProducerRecord RecordMetadata]
9-
[org.apache.kafka.common Metric MetricName]
6+
(:require [clj-kafka-x.data :refer :all]
7+
[clj-kafka-x.impl.helpers :refer :all])
8+
(:import [java.util.concurrent Future TimeUnit]
9+
[org.apache.kafka.clients.producer Callback Producer KafkaProducer ProducerRecord]
1010
(org.apache.kafka.common.serialization Serializer ByteArraySerializer StringSerializer)
1111
(java.util Map)))
1212

@@ -57,7 +57,7 @@
5757
"
5858

5959
([^Map config]
60-
(KafkaProducer. config))
60+
(KafkaProducer. (safe-config config)))
6161
([^Map config ^Serializer key-serializer ^Serializer value-serializer]
6262
(KafkaProducer. config key-serializer value-serializer)))
6363

0 commit comments

Comments
 (0)