@@ -137,3 +137,103 @@ impl TaskTransform<Event> for Dedupe {
137
137
Box :: pin ( task. filter_map ( move |v| ready ( inner. transform_one ( v) ) ) )
138
138
}
139
139
}
140
+
141
+ #[ cfg( test) ]
142
+ mod tests {
143
+ use crate :: event:: LogEvent ;
144
+ use crate :: test_util:: components:: assert_transform_compliance;
145
+ use crate :: transforms:: dedupe:: common:: { default_cache_config, FieldMatchConfig } ;
146
+ use crate :: transforms:: dedupe:: config:: DedupeConfig ;
147
+ use crate :: transforms:: test:: create_topology;
148
+ use tokio:: sync:: mpsc;
149
+ use tokio_stream:: wrappers:: ReceiverStream ;
150
+ use vector_lib:: lookup:: lookup_v2:: ConfigTargetPath ;
151
+ use vrl:: value:: Value ;
152
+
153
+ pub fn assert_eq_values ( left : LogEvent , right : LogEvent ) {
154
+ let inner_left = left. into_parts ( ) . 0 ;
155
+ let inner_right = right. into_parts ( ) . 0 ;
156
+ assert_eq ! ( inner_left, inner_right) ;
157
+ }
158
+
159
+ #[ tokio:: test]
160
+ async fn default_match ( ) {
161
+ let config = DedupeConfig {
162
+ cache : default_cache_config ( ) ,
163
+ fields : None ,
164
+ } ;
165
+
166
+ assert_transform_compliance ( async move {
167
+ let ( tx, rx) = mpsc:: channel ( 1 ) ;
168
+
169
+ let ( topology, mut out) = create_topology ( ReceiverStream :: new ( rx) , config) . await ;
170
+
171
+ let event1 = LogEvent :: from ( btreemap ! {
172
+ "message" => "foo" ,
173
+ "host" => "bar" ,
174
+ "timestamp" => "t1" ,
175
+ } ) ;
176
+ tx. send ( event1. clone ( ) . into ( ) ) . await . unwrap ( ) ;
177
+ let output = out. recv ( ) . await . unwrap ( ) . into_log ( ) ;
178
+ assert_eq_values ( event1. clone ( ) , output) ;
179
+
180
+ let event2 = event1. clone ( ) ;
181
+ tx. send ( event2. into ( ) ) . await . unwrap ( ) ;
182
+
183
+ let mut event3 = event1. clone ( ) ;
184
+ event3. insert ( "message" , Value :: from ( "another" ) ) ;
185
+ tx. send ( event3. clone ( ) . into ( ) ) . await . unwrap ( ) ;
186
+
187
+ let output = out. recv ( ) . await . unwrap ( ) . into_log ( ) ;
188
+ assert_eq_values ( event3, output) ;
189
+
190
+ drop ( tx) ;
191
+ topology. stop ( ) . await ;
192
+ assert_eq ! ( out. recv( ) . await , None ) ;
193
+ } )
194
+ . await
195
+ }
196
+
197
+ #[ tokio:: test]
198
+ async fn custom_match ( ) {
199
+ let config = DedupeConfig {
200
+ cache : default_cache_config ( ) ,
201
+ fields : Some ( FieldMatchConfig :: MatchFields ( vec ! [
202
+ ConfigTargetPath :: from( "a" ) ,
203
+ ConfigTargetPath :: from( "b" ) ,
204
+ ] ) ) ,
205
+ } ;
206
+
207
+ assert_transform_compliance ( async move {
208
+ let ( tx, rx) = mpsc:: channel ( 1 ) ;
209
+
210
+ let ( topology, mut out) = create_topology ( ReceiverStream :: new ( rx) , config) . await ;
211
+
212
+ let event1 = LogEvent :: from ( btreemap ! {
213
+ "message" => "foo" ,
214
+ "a" => 1 ,
215
+ "b" => 2 ,
216
+ } ) ;
217
+ tx. send ( event1. clone ( ) . into ( ) ) . await . unwrap ( ) ;
218
+ let output = out. recv ( ) . await . unwrap ( ) . into_log ( ) ;
219
+ assert_eq_values ( event1. clone ( ) , output) ;
220
+
221
+ let event2 = event1. clone ( ) ;
222
+ tx. send ( event2. into ( ) ) . await . unwrap ( ) ;
223
+
224
+ let event3 = LogEvent :: from ( btreemap ! {
225
+ "message" => "bar" ,
226
+ "a" => 3 ,
227
+ "b" => 2 ,
228
+ } ) ;
229
+ tx. send ( event3. clone ( ) . into ( ) ) . await . unwrap ( ) ;
230
+ let output = out. recv ( ) . await . unwrap ( ) . into_log ( ) ;
231
+ assert_eq_values ( event3, output) ;
232
+
233
+ drop ( tx) ;
234
+ topology. stop ( ) . await ;
235
+ assert_eq ! ( out. recv( ) . await , None ) ;
236
+ } )
237
+ . await
238
+ }
239
+ }
0 commit comments