3
3
import com .microsoft .azure .functions .ExecutionContext ;
4
4
import com .microsoft .azure .functions .OutputBinding ;
5
5
import com .microsoft .azure .functions .annotation .*;
6
+ import it .gov .pagopa .nodoverifykotodatastore .exception .AppException ;
6
7
import it .gov .pagopa .nodoverifykotodatastore .util .Constants ;
7
8
import it .gov .pagopa .nodoverifykotodatastore .util .ObjectMapperUtils ;
8
9
import lombok .NonNull ;
21
22
*/
22
23
public class NodoVerifyKOEventToDataStore {
23
24
25
+ private static final Integer MAX_RETRY_COUNT = 5 ;
26
+
24
27
@ FunctionName ("EventHubNodoVerifyKOEventToDSProcessor" )
28
+ @ ExponentialBackoffRetry (maxRetryCount = 5 , maximumInterval = "00:15:00" , minimumInterval = "00:00:10" )
25
29
public void processNodoVerifyKOEvent (
26
30
@ EventHubTrigger (
27
31
name = "NodoVerifyKOEvent" ,
@@ -39,8 +43,16 @@ public void processNodoVerifyKOEvent (
39
43
@ NonNull OutputBinding <List <Object >> documentdb ,
40
44
final ExecutionContext context ) {
41
45
46
+ String errorCause = null ;
47
+ boolean isPersistenceOk = true ;
48
+ int retryIndex = context .getRetryContext () == null ? -1 : context .getRetryContext ().getRetrycount ();
49
+
42
50
Logger logger = context .getLogger ();
43
- logger .log (Level .INFO , () -> String .format ("Persisting [%d] events..." , events .size ()));
51
+ logger .log (Level .FINE , () -> String .format ("Persisting [%d] events..." , events .size ()));
52
+
53
+ if (retryIndex == MAX_RETRY_COUNT ) {
54
+ logger .log (Level .WARNING , () -> String .format ("[ALERT][LAST RETRY][VerifyKOToDS] Performing last retry for event ingestion: InvocationId [%s], Events: %s" , context .getInvocationId (), events ));
55
+ }
44
56
45
57
try {
46
58
if (events .size () == properties .length ) {
@@ -75,20 +87,45 @@ public void processNodoVerifyKOEvent (
75
87
eventsToPersist .add (event );
76
88
}
77
89
90
+ logger .log (Level .INFO , () -> String .format ("Performing event ingestion: InvocationId [%s], Retry Attempt [%d], Events: %s" , context .getInvocationId (), retryIndex , extractTraceForEventsToPersist (eventsToPersist )));
91
+
78
92
// save all events in the retrieved batch in the storage
79
93
persistEventBatch (logger , documentdb , eventsToPersist );
80
94
} else {
81
- logger .log (Level .SEVERE , () -> String .format ("[ALERT][VerifyKOToDS] AppException - Error processing events, lengths do not match: [events: %d - properties: %d]" , events .size (), properties .length ));
95
+ isPersistenceOk = false ;
96
+ errorCause = String .format ("[ALERT][VerifyKOToDS] AppException - Error processing events, lengths do not match: [events: %d - properties: %d]" , events .size (), properties .length );
82
97
}
83
98
} catch (IllegalArgumentException e ) {
84
- logger .log (Level .SEVERE , () -> "[ALERT][VerifyKOToDS] AppException - Illegal argument exception on cosmos nodo-verify-ko-events msg ingestion at " + LocalDateTime .now () + " : " + e );
99
+ isPersistenceOk = false ;
100
+ errorCause = "[ALERT][VerifyKOToDS] AppException - Illegal argument exception on cosmos nodo-verify-ko-events msg ingestion at " + LocalDateTime .now () + " : " + e ;
85
101
} catch (IllegalStateException e ) {
86
- logger .log (Level .SEVERE , () -> "[ALERT][VerifyKOToDS] AppException - Missing argument exception on nodo-verify-ko-events msg ingestion at " + LocalDateTime .now () + " : " + e );
102
+ isPersistenceOk = false ;
103
+ errorCause = "[ALERT][VerifyKOToDS] AppException - Missing argument exception on nodo-verify-ko-events msg ingestion at " + LocalDateTime .now () + " : " + e ;
87
104
} catch (Exception e ) {
88
- logger .log (Level .SEVERE , () -> "[ALERT][VerifyKOToDS] AppException - Generic exception on cosmos nodo-verify-ko-events msg ingestion at " + LocalDateTime .now () + " : " + e .getMessage ());
105
+ isPersistenceOk = false ;
106
+ errorCause = "[ALERT][VerifyKOToDS] AppException - Generic exception on cosmos nodo-verify-ko-events msg ingestion at " + LocalDateTime .now () + " : " + e .getMessage ();
89
107
}
108
+
109
+ if (!isPersistenceOk ) {
110
+ String finalErrorCause = errorCause ;
111
+ logger .log (Level .SEVERE , () -> finalErrorCause );
112
+ throw new AppException (errorCause );
113
+ }
90
114
}
91
115
116
+ @ SuppressWarnings ({"unchecked" })
117
+ private static String extractTraceForEventsToPersist (List <Object > eventsToPersist ) {
118
+ return Arrays .toString (eventsToPersist .stream ()
119
+ .map (event -> {
120
+ Map <String , Object > eventMap = (Map <String , Object >) event ;
121
+ String rowKey = getEventField (eventMap , "id" , String .class , "null" );
122
+ String partitionKey = getEventField (eventMap , Constants .PARTITION_KEY_EVENT_FIELD , String .class , "null" );
123
+ Long eventTimestamp = getEventField (eventMap , "faultBean.timestamp" , Long .class , -1L );
124
+ return String .format ("{PartitionKey: %s, RowKey: %s, EventTimestamp: %d}" , partitionKey , rowKey , eventTimestamp );
125
+ })
126
+ .toArray ());
127
+ }
128
+
92
129
private String fixDateTime (String faultBeanTimestamp ) {
93
130
int dotIndex = faultBeanTimestamp .indexOf ('.' );
94
131
if (dotIndex != -1 ) {
@@ -116,7 +153,7 @@ private String replaceDashWithUppercase(String input) {
116
153
117
154
private void persistEventBatch (Logger logger , OutputBinding <List <Object >> documentdb , List <Object > eventsToPersistCosmos ) {
118
155
documentdb .setValue (eventsToPersistCosmos );
119
- logger .info ( "Done processing events" );
156
+ logger .log ( Level . FINE , () -> "Done processing events" );
120
157
}
121
158
122
159
private String generatePartitionKey (Map <String , Object > event , String insertedDateValue ) {
@@ -127,7 +164,8 @@ private String generatePartitionKey(Map<String, Object> event, String insertedDa
127
164
getEventField (event , Constants .PSP_ID_EVENT_FIELD , String .class , Constants .NA );
128
165
}
129
166
130
- private <T > T getEventField (Map <String , Object > event , String name , Class <T > clazz , T defaultValue ) {
167
+ @ SuppressWarnings ({"rawtypes" })
168
+ private static <T > T getEventField (Map <String , Object > event , String name , Class <T > clazz , T defaultValue ) {
131
169
T field = null ;
132
170
List <String > splitPath = List .of (name .split ("\\ ." ));
133
171
Map eventSubset = event ;
0 commit comments