Skip to content

Commit 4cc14be

Browse files
authored
Merge pull request #3 from onoga/master
coerce config values with config definition
2 parents b7eaa72 + 8357db0 commit 4cc14be

File tree

3 files changed

+34
-88
lines changed

3 files changed

+34
-88
lines changed

src/clj_kafka_x/consumers/simple.clj

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@
55
(:require [clj-kafka-x.data :refer :all]
66
[clj-kafka-x.impl.helpers :refer :all])
77
(:import java.util.regex.Pattern
8-
[org.apache.kafka.clients.consumer ConsumerRebalanceListener Consumer KafkaConsumer OffsetCommitCallback]
8+
[org.apache.kafka.clients.consumer ConsumerRebalanceListener Consumer KafkaConsumer OffsetCommitCallback ConsumerConfig]
99
[org.apache.kafka.common.serialization ByteArrayDeserializer Deserializer StringDeserializer]
1010
org.apache.kafka.common.TopicPartition
1111
(java.util Map Collection)
1212
(java.time Duration)))
1313

14+
(def ^:private config-def (ConsumerConfig/configDef))
1415

1516
(defn string-deserializer [] (StringDeserializer.))
1617
(defn byte-array-deserializer [] (ByteArrayDeserializer.))
@@ -48,9 +49,9 @@
4849
4950
"
5051
([^Map config]
51-
(KafkaConsumer. (safe-config config)))
52+
(KafkaConsumer. ^Map (coerce-config config-def config)))
5253
([^Map config ^Deserializer key-deserializer ^Deserializer value-deserializer]
53-
(KafkaConsumer. (safe-config config) key-deserializer value-deserializer)))
54+
(KafkaConsumer. ^Map (coerce-config config-def config) key-deserializer value-deserializer)))
5455

5556

5657
(defn subscribe

src/clj_kafka_x/impl/helpers.clj

Lines changed: 25 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,82 +1,30 @@
11
(ns clj-kafka-x.impl.helpers
2-
(:import (clojure.lang MapEntry IRecord IMapEntry)
3-
(java.util Map)))
2+
(:import (java.util Arrays)
3+
(org.apache.kafka.common.config ConfigDef ConfigDef$ConfigKey)))
44

5-
(defn- walk
6-
[inner outer form]
7-
(cond
8-
(list? form) (outer (apply list (map inner form)))
9-
(instance? IMapEntry form)
10-
(outer (MapEntry/create (inner (key form)) (inner (val form))))
11-
(seq? form) (outer (doall (map inner form)))
12-
(instance? IRecord form)
13-
(outer (reduce (fn [r x] (conj r (inner x))) form form))
14-
(coll? form) (outer (into (empty form) (map inner form)))
15-
:else (outer form)))
165

17-
(defn- postwalk
18-
[f form]
19-
(walk (partial postwalk f) f form))
6+
(defn- coerce-value [t v]
7+
(case t
8+
:SHORT (when (number? v) (short v))
9+
:INT (when (number? v) (int v))
10+
:LONG (when (number? v) (long v))
11+
:DOUBLE (when (number? v) (double v))
12+
:LIST (when (coll? v)
13+
(->> v
14+
(into-array String)
15+
Arrays/asList))
16+
nil))
2017

21-
(defn- stringify-keys
22-
[m]
23-
(let [f (fn [[k v]] (if (keyword? k) [(name k) v] [k v]))]
24-
(postwalk (fn [x] (if (map? x) (into {} (map f x)) x)) m)))
18+
(defn- coerce-config-entry [config-def [k v]]
19+
(let [k (cond-> k keyword? name)]
20+
(or
21+
(when-let [^ConfigDef$ConfigKey key (get config-def k)]
22+
(let [t (-> (.type key) str keyword)]
23+
(when-let [v (coerce-value t v)]
24+
[k v])))
25+
[k v])))
2526

26-
(defn update-in-when
27-
[m k f & args]
28-
(if (not= ::not-found (get-in m k ::not-found))
29-
(apply update-in m k f args)
30-
m))
31-
32-
(defn- short! [v]
33-
(if (number? v)
34-
(short v)
35-
(condp instance? v
36-
String (try
37-
(-> v str Short/parseShort)
38-
(catch NumberFormatException _ nil))
39-
nil)))
40-
41-
(defn- int! [v]
42-
(if (number? v)
43-
(int v)
44-
(condp instance? v
45-
String (try
46-
(-> v str Integer/parseInt)
47-
(catch NumberFormatException _ nil))
48-
nil)))
49-
50-
(defn safe-config ^Map [^Map config]
51-
(if (map? config)
52-
(-> config
53-
stringify-keys
54-
;; common
55-
(update-in-when ["request.timeout.ms"] int!)
56-
(update-in-when ["receive.buffer.bytes"] int!)
57-
(update-in-when ["metrics.num.samples"] int!)
58-
;; producer
59-
(update-in-when ["send.buffer.bytes"] int!)
60-
(update-in-when ["retries"] int!)
61-
(update-in-when ["batch.size"] int!)
62-
(update-in-when ["delivery.timeout.ms"] int!)
63-
(update-in-when ["max.request.size"] int!)
64-
(update-in-when ["max.in.flight.requests.per.connection"] int!)
65-
(update-in-when ["sasl.login.connect.timeout.ms"] int!)
66-
(update-in-when ["sasl.login.read.timeout.ms"] int!)
67-
(update-in-when ["sasl.login.refresh.buffer.seconds"] short!)
68-
(update-in-when ["sasl.login.refresh.min.period.seconds"] short!)
69-
(update-in-when ["sasl.oauthbearer.clock.skew.seconds"] int!)
70-
(update-in-when ["transaction.timeout.ms"] int!)
71-
;; consumer
72-
(update-in-when ["fetch.min.bytes"] int!)
73-
(update-in-when ["fetch.max.bytes"] int!)
74-
(update-in-when ["fetch.max.wait.ms"] int!)
75-
(update-in-when ["heartbeat.interval.ms"] int!)
76-
(update-in-when ["max.partition.fetch.bytes"] int!)
77-
(update-in-when ["session.timeout.ms"] int!)
78-
(update-in-when ["max.poll.interval.ms"] int!)
79-
(update-in-when ["max.poll.records"] int!)
80-
(update-in-when ["auto.commit.interval.ms"] int!)
81-
(update-in-when ["fetch.max.wait.ms"] int!))
82-
{}))
27+
(defn coerce-config [^ConfigDef def conf]
28+
(->> conf
29+
(map (partial coerce-config-entry (into {} (.configKeys def))))
30+
(into {})))

src/clj_kafka_x/producer.clj

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55
(:refer-clojure :exclude [send flush])
66
(:require [clj-kafka-x.data :refer :all]
77
[clj-kafka-x.impl.helpers :refer :all])
8-
(:import [java.util.concurrent Future TimeUnit]
9-
[org.apache.kafka.clients.producer Callback Producer KafkaProducer ProducerRecord]
8+
(:import [java.util.concurrent Future]
9+
[org.apache.kafka.clients.producer Callback Producer KafkaProducer ProducerRecord ProducerConfig]
1010
(org.apache.kafka.common.serialization Serializer ByteArraySerializer StringSerializer)
1111
(java.util Map)
1212
(java.time Duration)))
1313

14-
14+
(def ^:private config-def (ProducerConfig/configDef))
1515

1616
(defn- map-future-val
1717
[^Future fut f]
@@ -23,8 +23,6 @@
2323
(isCancelled [_] (.isCancelled fut))
2424
(isDone [_] (.isDone fut))))
2525

26-
27-
2826
(defn string-serializer [] (StringSerializer.))
2927
(defn byte-array-serializer [] (ByteArraySerializer.))
3028

@@ -56,11 +54,10 @@
5654
(-> (send p (record \"topic-a\" \"Hello World\"))
5755
(.get)))
5856
"
59-
6057
([^Map config]
61-
(KafkaProducer. (safe-config config)))
58+
(KafkaProducer. ^Map (coerce-config config-def config)))
6259
([^Map config ^Serializer key-serializer ^Serializer value-serializer]
63-
(KafkaProducer. (safe-config config) key-serializer value-serializer)))
60+
(KafkaProducer. ^Map (coerce-config config-def config) key-serializer value-serializer)))
6461

6562
(defn record
6663
"Return a record that can be published to Kafka using [[send]]."

0 commit comments

Comments
 (0)