|
22 | 22 | import java.util.Set;
|
23 | 23 | import java.util.function.Supplier;
|
24 | 24 | import org.apache.kafka.common.serialization.Serdes;
|
25 |
| -import org.apache.kafka.streams.KeyValue; |
26 | 25 | import org.apache.kafka.streams.StreamsBuilder;
|
27 | 26 | import org.apache.kafka.streams.Topology;
|
28 | 27 | import org.apache.kafka.streams.kstream.Consumed;
|
@@ -79,99 +78,104 @@ public TraceStoreTopologySupplier(String spansTopicName, List<String> autoComple
|
79 | 78 | @Override public Topology get() {
|
80 | 79 | StreamsBuilder builder = new StreamsBuilder();
|
81 | 80 | builder
|
| 81 | + // Logging disabled to avoid long starting times |
82 | 82 | .addStateStore(Stores.keyValueStoreBuilder(
|
83 | 83 | Stores.persistentKeyValueStore(TRACES_STORE_NAME),
|
84 | 84 | Serdes.String(),
|
85 |
| - spansSerde)) |
| 85 | + spansSerde).withLoggingDisabled()) |
| 86 | + // Disabling logging to avoid long starting times |
86 | 87 | .addStateStore(Stores.keyValueStoreBuilder(
|
87 | 88 | Stores.persistentKeyValueStore(SPAN_IDS_BY_TS_STORE_NAME),
|
88 | 89 | Serdes.Long(),
|
89 |
| - spanIdsSerde)) |
| 90 | + spanIdsSerde).withLoggingDisabled()) |
| 91 | + // In-memory as service names are bounded |
90 | 92 | .addStateStore(Stores.keyValueStoreBuilder(
|
91 |
| - Stores.persistentKeyValueStore(SERVICE_NAMES_STORE_NAME), |
| 93 | + Stores.inMemoryKeyValueStore(SERVICE_NAMES_STORE_NAME), |
92 | 94 | Serdes.String(),
|
93 | 95 | Serdes.String()))
|
| 96 | + // In-memory as span names are bounded |
94 | 97 | .addStateStore(Stores.keyValueStoreBuilder(
|
95 |
| - Stores.persistentKeyValueStore(SPAN_NAMES_STORE_NAME), |
| 98 | + Stores.inMemoryKeyValueStore(SPAN_NAMES_STORE_NAME), |
96 | 99 | Serdes.String(),
|
97 | 100 | namesSerde))
|
| 101 | + // In-memory as remote-service names are bounded |
98 | 102 | .addStateStore(Stores.keyValueStoreBuilder(
|
99 |
| - Stores.persistentKeyValueStore(REMOTE_SERVICE_NAMES_STORE_NAME), |
| 103 | + Stores.inMemoryKeyValueStore(REMOTE_SERVICE_NAMES_STORE_NAME), |
100 | 104 | Serdes.String(),
|
101 | 105 | namesSerde))
|
| 106 | + // Persistent as values could be unbounded |
102 | 107 | .addStateStore(Stores.keyValueStoreBuilder(
|
103 | 108 | Stores.persistentKeyValueStore(AUTOCOMPLETE_TAGS_STORE_NAME),
|
104 | 109 | Serdes.String(),
|
105 |
| - namesSerde)); |
| 110 | + namesSerde).withLoggingDisabled()); |
106 | 111 | // Traces stream
|
107 | 112 | KStream<String, List<Span>> spansStream = builder
|
108 |
| - .stream(spansTopicName, Consumed.with(Serdes.String(), spansSerde)); |
| 113 | + .stream( |
| 114 | + spansTopicName, |
| 115 | + Consumed.with(Serdes.String(), spansSerde) |
| 116 | + .withOffsetResetPolicy(Topology.AutoOffsetReset.LATEST)); |
109 | 117 | // Store traces
|
110 | 118 | spansStream.process(() -> new Processor<String, List<Span>>() {
|
111 |
| - ProcessorContext context; |
112 |
| - // Actual traces store |
113 |
| - KeyValueStore<String, List<Span>> tracesStore; |
114 |
| - // timestamp index for trace IDs |
115 |
| - KeyValueStore<Long, Set<String>> spanIdsByTsStore; |
| 119 | + ProcessorContext context; |
| 120 | + // Actual traces store |
| 121 | + KeyValueStore<String, List<Span>> tracesStore; |
| 122 | + // timestamp index for trace IDs |
| 123 | + KeyValueStore<Long, Set<String>> spanIdsByTsStore; |
116 | 124 |
|
117 |
| - @Override public void init(ProcessorContext context) { |
118 |
| - this.context = context; |
119 |
| - tracesStore = |
120 |
| - (KeyValueStore<String, List<Span>>) context.getStateStore(TRACES_STORE_NAME); |
121 |
| - spanIdsByTsStore = |
122 |
| - (KeyValueStore<Long, Set<String>>) context.getStateStore(SPAN_IDS_BY_TS_STORE_NAME); |
123 |
| - // Retention scheduling |
124 |
| - context.schedule( |
125 |
| - traceTtlCheckInterval, |
126 |
| - PunctuationType.STREAM_TIME, |
127 |
| - timestamp -> { |
128 |
| - if (traceTtl.toMillis() > 0 && |
129 |
| - tracesStore.approximateNumEntries() > minTracesStored) { |
130 |
| - // preparing range filtering |
131 |
| - long from = 0L; |
132 |
| - long to = timestamp - traceTtl.toMillis(); |
133 |
| - long toMicro = to * 1000; |
134 |
| - // query traceIds active during period |
135 |
| - try (final KeyValueIterator<Long, Set<String>> all = |
136 |
| - spanIdsByTsStore.range(from, toMicro)) { |
137 |
| - int deletions = 0; // logging purpose |
138 |
| - while (all.hasNext()) { |
139 |
| - final KeyValue<Long, Set<String>> record = all.next(); |
140 |
| - spanIdsByTsStore.delete(record.key); // clean timestamp index |
141 |
| - for (String traceId : record.value) { |
142 |
| - tracesStore.delete(traceId); // clean traces store |
143 |
| - deletions++; |
144 |
| - } |
145 |
| - } |
146 |
| - if (deletions > 0) { |
147 |
| - LOG.info("Traces deletion emitted: {}, older than {}", |
148 |
| - deletions, |
149 |
| - Instant.ofEpochMilli(to).atZone(ZoneId.systemDefault())); |
150 |
| - } |
| 125 | + @Override public void init(ProcessorContext context) { |
| 126 | + this.context = context; |
| 127 | + tracesStore = |
| 128 | + (KeyValueStore<String, List<Span>>) context.getStateStore(TRACES_STORE_NAME); |
| 129 | + spanIdsByTsStore = |
| 130 | + (KeyValueStore<Long, Set<String>>) context.getStateStore(SPAN_IDS_BY_TS_STORE_NAME); |
| 131 | + // Retention scheduling |
| 132 | + context.schedule( |
| 133 | + traceTtlCheckInterval, |
| 134 | + PunctuationType.STREAM_TIME, |
| 135 | + timestamp -> { |
| 136 | + if (traceTtl.toMillis() > 0 && |
| 137 | + tracesStore.approximateNumEntries() > minTracesStored) { |
| 138 | + // preparing range filtering |
| 139 | + long from = 0L; |
| 140 | + long to = timestamp - traceTtl.toMillis(); |
| 141 | + long toMicro = to * 1000; |
| 142 | + // query traceIds active during period |
| 143 | + try (final KeyValueIterator<Long, Set<String>> range = |
| 144 | + spanIdsByTsStore.range(from, toMicro)) { |
| 145 | + range.forEachRemaining(record -> { |
| 146 | + spanIdsByTsStore.delete(record.key); // clean timestamp index |
| 147 | + for (String traceId : record.value) { |
| 148 | + tracesStore.delete(traceId); // clean traces store |
151 | 149 | }
|
152 |
| - } |
153 |
| - }); |
154 |
| - } |
| 150 | + }); |
| 151 | + LOG.info("Traces deletion emitted at {}, approx. number of traces stored {} - partition: {}", |
| 152 | + Instant.ofEpochMilli(to).atZone(ZoneId.systemDefault()), |
| 153 | + tracesStore.approximateNumEntries(), |
| 154 | + context.partition()); |
| 155 | + } |
| 156 | + } |
| 157 | + }); |
| 158 | + } |
155 | 159 |
|
156 |
| - @Override public void process(String traceId, List<Span> spans) { |
157 |
| - if (!spans.isEmpty()) { |
158 |
| - // Persist traces |
159 |
| - List<Span> currentSpans = tracesStore.get(traceId); |
160 |
| - if (currentSpans == null) currentSpans = new ArrayList<>(); |
161 |
| - currentSpans.addAll(spans); |
162 |
| - tracesStore.put(traceId, currentSpans); |
163 |
| - // Persist timestamp indexed span ids |
164 |
| - long timestamp = spans.get(0).timestamp(); |
165 |
| - Set<String> currentSpanIds = spanIdsByTsStore.get(timestamp); |
166 |
| - if (currentSpanIds == null) currentSpanIds = new HashSet<>(); |
167 |
| - currentSpanIds.add(traceId); |
168 |
| - spanIdsByTsStore.put(timestamp, currentSpanIds); |
169 |
| - } |
170 |
| - } |
| 160 | + @Override public void process(String traceId, List<Span> spans) { |
| 161 | + if (!spans.isEmpty()) { |
| 162 | + // Persist traces |
| 163 | + List<Span> currentSpans = tracesStore.get(traceId); |
| 164 | + if (currentSpans == null) currentSpans = new ArrayList<>(); |
| 165 | + currentSpans.addAll(spans); |
| 166 | + tracesStore.put(traceId, currentSpans); |
| 167 | + // Persist timestamp indexed span ids |
| 168 | + long timestamp = spans.get(0).timestamp(); |
| 169 | + Set<String> currentSpanIds = spanIdsByTsStore.get(timestamp); |
| 170 | + if (currentSpanIds == null) currentSpanIds = new HashSet<>(); |
| 171 | + currentSpanIds.add(traceId); |
| 172 | + spanIdsByTsStore.put(timestamp, currentSpanIds); |
| 173 | + } |
| 174 | + } |
171 | 175 |
|
172 |
| - @Override public void close() { |
173 |
| - } |
174 |
| - }, TRACES_STORE_NAME, SPAN_IDS_BY_TS_STORE_NAME); |
| 176 | + @Override public void close() { |
| 177 | + } |
| 178 | + }, TRACES_STORE_NAME, SPAN_IDS_BY_TS_STORE_NAME); |
175 | 179 | // Store service, span and remote service names
|
176 | 180 | spansStream.process(() -> new Processor<String, List<Span>>() {
|
177 | 181 | KeyValueStore<String, String> serviceNameStore;
|
|
0 commit comments