Skip to content

Commit 1673cb7

Browse files
committed
add consumer filter but the tests are not passing with prefixes.
1 parent 60a496d commit 1673cb7

File tree

4 files changed

+612
-0
lines changed

4 files changed

+612
-0
lines changed
Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
package io.kurrent.dbclient.v2;
2+
3+
import io.kurrent.dbclient.SubscriptionFilter;
4+
import io.kurrent.dbclient.SubscriptionFilterBuilder;
5+
6+
import javax.validation.constraints.NotNull;
7+
import java.util.Objects;
8+
import java.util.regex.Pattern;
9+
import java.util.regex.Matcher;
10+
11+
/**
12+
* Represents a filter for consuming events.
13+
*/
14+
public class ConsumeFilter {
15+
/**
16+
* Represents an empty filter.
17+
*/
18+
public static final ConsumeFilter NONE = new ConsumeFilter();
19+
20+
private final ConsumeFilterScope scope;
21+
private final ConsumeFilterType type;
22+
private final String expression;
23+
private final Pattern regex;
24+
25+
/**
26+
* Creates a new empty consume filter.
27+
*/
28+
public ConsumeFilter() {
29+
this.scope = ConsumeFilterScope.UNSPECIFIED;
30+
this.type = ConsumeFilterType.UNSPECIFIED;
31+
this.expression = "";
32+
this.regex = Pattern.compile("");
33+
}
34+
35+
/**
36+
* Creates a new consume filter with the specified parameters.
37+
*
38+
* @param scope The scope of the filter.
39+
* @param type The type of the filter.
40+
* @param expression The filter expression.
41+
* @param regex The compiled regex pattern.
42+
*/
43+
private ConsumeFilter(ConsumeFilterScope scope, ConsumeFilterType type, String expression, Pattern regex) {
44+
this.scope = scope;
45+
this.type = type;
46+
this.expression = expression;
47+
this.regex = regex;
48+
}
49+
50+
/**
51+
* Gets the scope of the filter.
52+
*
53+
* @return The filter scope.
54+
*/
55+
public ConsumeFilterScope getScope() {
56+
return scope;
57+
}
58+
59+
/**
60+
* Gets the type of the filter.
61+
*
62+
* @return The filter type.
63+
*/
64+
public ConsumeFilterType getType() {
65+
return type;
66+
}
67+
68+
/**
69+
* Gets the filter expression.
70+
*
71+
* @return The filter expression.
72+
*/
73+
public String getExpression() {
74+
return expression;
75+
}
76+
77+
/**
78+
* Checks if this is a literal filter.
79+
*
80+
* @return True if this is a literal filter, false otherwise.
81+
*/
82+
public boolean isLiteralFilter() {
83+
return type == ConsumeFilterType.LITERAL;
84+
}
85+
86+
/**
87+
* Checks if this is a regex filter.
88+
*
89+
* @return True if this is a regex filter, false otherwise.
90+
*/
91+
public boolean isRegexFilter() {
92+
return type == ConsumeFilterType.REGEX;
93+
}
94+
95+
/**
96+
* Checks if this is a stream filter.
97+
*
98+
* @return True if this is a stream filter, false otherwise.
99+
*/
100+
public boolean isStreamFilter() {
101+
return scope == ConsumeFilterScope.STREAM;
102+
}
103+
104+
/**
105+
* Checks if this is a record filter.
106+
*
107+
* @return True if this is a record filter, false otherwise.
108+
*/
109+
public boolean isRecordFilter() {
110+
return scope == ConsumeFilterScope.RECORD;
111+
}
112+
113+
/**
114+
* Checks if this is a stream name filter.
115+
*
116+
* @return True if this is a stream name filter, false otherwise.
117+
*/
118+
public boolean isStreamNameFilter() {
119+
return type == ConsumeFilterType.LITERAL && scope == ConsumeFilterScope.STREAM;
120+
}
121+
122+
/**
123+
* Checks if this is an empty filter.
124+
*
125+
* @return True if this is an empty filter, false otherwise.
126+
*/
127+
public boolean isEmptyFilter() {
128+
return type == ConsumeFilterType.UNSPECIFIED && scope == ConsumeFilterScope.UNSPECIFIED;
129+
}
130+
131+
/**
132+
* Checks if the input matches this filter.
133+
*
134+
* @param input The input to check.
135+
* @return True if the input matches this filter, false otherwise.
136+
*/
137+
public boolean isMatch(CharSequence input) {
138+
return regex.matcher(input).matches();
139+
}
140+
141+
/**
142+
* Creates a stream filter from a stream name.
143+
*
144+
* @param stream The stream name.
145+
* @return A new consume filter for the specified stream.
146+
* @throws IllegalArgumentException If the stream name is invalid.
147+
*/
148+
public static ConsumeFilter fromStream(String stream) {
149+
if (stream == null || stream.trim().isEmpty())
150+
throw new IllegalArgumentException("Stream name cannot be null or whitespace.");
151+
152+
if (stream.startsWith("~"))
153+
throw new IllegalArgumentException("Stream name cannot start with '~'.");
154+
155+
if (stream.length() < 2)
156+
throw new IllegalArgumentException("Stream name must be at least 2 characters long.");
157+
158+
return new ConsumeFilter(
159+
ConsumeFilterScope.STREAM,
160+
ConsumeFilterType.LITERAL,
161+
stream,
162+
Pattern.compile(Pattern.quote(stream))
163+
);
164+
}
165+
166+
/**
167+
* Creates a filter from prefixes.
168+
*
169+
* @param scope The scope of the filter.
170+
* @param prefixes The prefixes to filter by.
171+
* @return A new consume filter for the specified prefixes.
172+
* @throws IllegalArgumentException If the prefixes are invalid.
173+
*/
174+
public static ConsumeFilter fromPrefixes(ConsumeFilterScope scope, String... prefixes) {
175+
if (prefixes.length == 0)
176+
throw new IllegalArgumentException("Prefixes cannot be empty.");
177+
178+
StringBuilder patternBuilder = new StringBuilder("^(");
179+
boolean first = true;
180+
181+
for (String prefix : prefixes) {
182+
if (prefix == null || prefix.trim().isEmpty())
183+
throw new IllegalArgumentException("Prefix cannot be empty.");
184+
185+
if (!first)
186+
patternBuilder.append("|");
187+
188+
patternBuilder.append(Pattern.quote(prefix));
189+
first = false;
190+
}
191+
patternBuilder.append(")");
192+
193+
String pattern = patternBuilder.toString();
194+
return new ConsumeFilter(
195+
scope,
196+
ConsumeFilterType.REGEX,
197+
pattern,
198+
Pattern.compile(pattern)
199+
);
200+
}
201+
202+
/**
203+
* Creates a filter from a comma-separated list of prefixes.
204+
*
205+
* @param scope The scope of the filter.
206+
* @param expression The comma-separated list of prefixes.
207+
* @return A new consume filter for the specified prefixes.
208+
* @throws IllegalArgumentException If the expression is invalid.
209+
*/
210+
public static ConsumeFilter fromPrefixes(ConsumeFilterScope scope, String expression) {
211+
if (expression == null || expression.trim().isEmpty())
212+
throw new IllegalArgumentException("Prefix expression cannot be empty.");
213+
214+
return fromPrefixes(scope, expression.split(","));
215+
}
216+
217+
/**
218+
* Creates a filter from a regex pattern.
219+
*
220+
* @param scope The scope of the filter.
221+
* @param pattern The regex pattern.
222+
* @return A new consume filter for the specified regex pattern.
223+
* @throws IllegalArgumentException If the pattern is invalid.
224+
*/
225+
public static ConsumeFilter fromRegex(ConsumeFilterScope scope, String pattern) {
226+
String expression = pattern.startsWith("~") ? pattern.substring(1) : pattern;
227+
228+
try {
229+
return new ConsumeFilter(
230+
scope,
231+
ConsumeFilterType.REGEX,
232+
expression,
233+
Pattern.compile(expression)
234+
);
235+
} catch (Exception ex) {
236+
throw new IllegalArgumentException("Invalid regex pattern: " + pattern, ex);
237+
}
238+
}
239+
240+
/**
241+
* Creates a filter from an expression.
242+
*
243+
* @param scope The scope of the filter.
244+
* @param expression The filter expression.
245+
* @return A new consume filter for the specified expression.
246+
* @throws IllegalArgumentException If the expression is invalid.
247+
*/
248+
public static ConsumeFilter create(ConsumeFilterScope scope, String expression) {
249+
if (expression == null || expression.trim().isEmpty())
250+
throw new IllegalArgumentException("Expression cannot be null or empty.");
251+
252+
if (expression.startsWith("~")) {
253+
return fromRegex(scope, expression);
254+
} else {
255+
return new ConsumeFilter(
256+
scope,
257+
ConsumeFilterType.LITERAL,
258+
expression,
259+
Pattern.compile(Pattern.quote(expression))
260+
);
261+
}
262+
}
263+
264+
/**
265+
* Converts a ConsumeFilter to a SubscriptionFilter.
266+
*
267+
* @param checkpointInterval The checkpoint interval to use.
268+
* @return The converted subscription filter, or null if the filter is empty.
269+
* @throws IllegalArgumentException If the filter is invalid.
270+
*/
271+
public SubscriptionFilter toSubscriptionFilter(int checkpointInterval) {
272+
if (isEmptyFilter())
273+
return null;
274+
275+
SubscriptionFilterBuilder builder = SubscriptionFilter.newBuilder();
276+
277+
// Set the checkpoint interval
278+
builder.withMaxWindow(checkpointInterval);
279+
280+
// Configure the filter based on its scope and type
281+
if (isStreamFilter()) {
282+
if (isRegexFilter()) {
283+
builder.withStreamNameRegularExpression(getExpression());
284+
} else if (isLiteralFilter()) {
285+
builder.addStreamNamePrefix(getExpression());
286+
}
287+
} else if (isRecordFilter()) {
288+
if (isRegexFilter()) {
289+
builder.withEventTypeRegularExpression(getExpression());
290+
} else if (isLiteralFilter()) {
291+
builder.addEventTypePrefix(getExpression());
292+
}
293+
} else {
294+
throw new IllegalArgumentException("Invalid consume filter.");
295+
}
296+
297+
return builder.build();
298+
}
299+
300+
@Override
301+
public String toString() {
302+
return "[" + scope + "|" + type + "] " + expression;
303+
}
304+
305+
@Override
306+
public boolean equals(Object o) {
307+
if (this == o) return true;
308+
if (o == null || getClass() != o.getClass()) return false;
309+
ConsumeFilter that = (ConsumeFilter) o;
310+
return scope == that.scope &&
311+
type == that.type &&
312+
Objects.equals(expression, that.expression);
313+
}
314+
315+
@Override
316+
public int hashCode() {
317+
return Objects.hash(scope, type, expression);
318+
}
319+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.kurrent.dbclient.v2;
2+
3+
/**
4+
* Defines the scope of a consume filter.
5+
*/
6+
public enum ConsumeFilterScope {
7+
/**
8+
* Unspecified filter scope.
9+
*/
10+
UNSPECIFIED(0),
11+
12+
/**
13+
* Filter applies to stream names.
14+
*/
15+
STREAM(1),
16+
17+
/**
18+
* Filter applies to record types.
19+
*/
20+
RECORD(2);
21+
22+
private final int value;
23+
24+
ConsumeFilterScope(int value) {
25+
this.value = value;
26+
}
27+
28+
/**
29+
* Gets the integer value of the enum.
30+
* @return The integer value.
31+
*/
32+
public int getValue() {
33+
return value;
34+
}
35+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.kurrent.dbclient.v2;
2+
3+
/**
4+
* Defines the type of a consume filter.
5+
*/
6+
public enum ConsumeFilterType {
7+
/**
8+
* Unspecified filter type.
9+
*/
10+
UNSPECIFIED(0),
11+
12+
/**
13+
* Literal string filter.
14+
*/
15+
LITERAL(1),
16+
17+
/**
18+
* Regular expression filter.
19+
*/
20+
REGEX(2);
21+
22+
private final int value;
23+
24+
ConsumeFilterType(int value) {
25+
this.value = value;
26+
}
27+
28+
/**
29+
* Gets the integer value of the enum.
30+
* @return The integer value.
31+
*/
32+
public int getValue() {
33+
return value;
34+
}
35+
}

0 commit comments

Comments
 (0)