Skip to content

Commit feeeb74

Browse files
roginjonbartels
authored andcommitted
Add msg tracking info in VMRouter
Signed-off-by: Richard Ogin <rogin@users.noreply.github.com>
1 parent 47e75ee commit feeeb74

File tree

2 files changed

+351
-51
lines changed

2 files changed

+351
-51
lines changed

server/src/com/mirth/connect/server/userutil/VMRouter.java

Lines changed: 297 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,14 @@
99

1010
package com.mirth.connect.server.userutil;
1111

12+
import java.util.ArrayList;
13+
import java.util.Arrays;
14+
import java.util.Collection;
15+
import java.util.Collections;
16+
import java.util.HashMap;
17+
import java.util.List;
18+
import java.util.Map;
19+
1220
import org.apache.logging.log4j.LogManager;
1321
import org.apache.logging.log4j.Logger;
1422

@@ -28,77 +36,132 @@ public class VMRouter {
2836
private ChannelController channelController = ControllerFactory.getFactory().createChannelController();
2937
private EngineController engineController = ControllerFactory.getFactory().createEngineController();
3038

39+
private TrackingEnhancer trackingEnhancer;
40+
3141
/**
3242
* Instantiates a VMRouter object.
3343
*/
3444
public VMRouter() {}
3545

3646
/**
37-
* Dispatches a message to a channel, specified by the deployed channel name. If the dispatch
38-
* fails for any reason (for example, if the target channel is not started), a Response object
39-
* with the ERROR status and the error message will be returned.
40-
*
41-
* @param channelName
42-
* The name of the deployed channel to dispatch the message to.
43-
* @param message
44-
* The message to dispatch to the channel.
45-
* @return The Response object returned by the channel, if its source connector is configured to
46-
* return one.
47+
* Instantiates a VMRouter object with additional message tracking enhancements.
48+
*
49+
* @param channelId channel ID or "NONE" if null
50+
* @param messageId message ID or -1L if null
51+
* @param sourceMap the message's source map
52+
*/
53+
public VMRouter(String channelId, Long messageId, SourceMap sourceMap) {
54+
this.trackingEnhancer = new TrackingEnhancer(channelId, messageId, sourceMap);
55+
}
56+
57+
/**
58+
* Dispatches a message to a channel, specified by the deployed channel name. If
59+
* the dispatch fails for any reason (for example, if the target channel is not
60+
* started), a {@link Response} object with the {@link Status#ERROR} status and
61+
* the error message will be returned.
62+
*
63+
* @param channelName The name of the deployed channel to dispatch the message
64+
* to.
65+
* @param message The message to dispatch to the channel.
66+
* @return The {@link Response} object returned by the channel, if its source
67+
* connector is configured to return one.
4768
*/
4869
public Response routeMessage(String channelName, String message) {
49-
return routeMessage(channelName, new RawMessage(message));
70+
return routeMessage(channelName, createRawMessage(message, null, null));
5071
}
5172

5273
/**
53-
* Dispatches a message to a channel, specified by the deployed channel name. If the dispatch
54-
* fails for any reason (for example, if the target channel is not started), a Response object
55-
* with the ERROR status and the error message will be returned.
56-
*
57-
* @param channelName
58-
* The name of the deployed channel to dispatch the message to.
59-
* @param rawMessage
60-
* A RawMessage object to dispatch to the channel.
61-
* @return The Response object returned by the channel, if its source connector is configured to
62-
* return one.
74+
* Dispatches a message to a channel, specified by the deployed channel name. If
75+
* the dispatch fails for any reason (for example, if the target channel is not
76+
* started), a {@link Response} object with the {@link Status#ERROR} status and
77+
* the error message will be returned.
78+
*
79+
* @param channelName The name of the deployed channel to dispatch the message
80+
* to.
81+
* @param rawMessage A {@link RawMessage} object to dispatch to the channel.
82+
* @return The {@link Response} object returned by the channel, if its source
83+
* connector is configured to return one.
6384
*/
6485
public Response routeMessage(String channelName, RawMessage rawMessage) {
6586
com.mirth.connect.model.Channel channel = channelController.getDeployedChannelByName(channelName);
6687

6788
if (channel == null) {
68-
logger.error("Could not find channel to route to for channel name: " + channelName);
69-
return new Response(Status.ERROR, "Could not find channel to route to for channel name: " + channelName);
89+
String message = "Could not find channel to route to for channel name: " + channelName;
90+
logger.error(message);
91+
return new Response(Status.ERROR, message);
7092
}
7193

7294
return routeMessageByChannelId(channel.getId(), rawMessage);
7395
}
7496

7597
/**
76-
* Dispatches a message to a channel, specified by the deployed channel ID. If the dispatch
77-
* fails for any reason (for example, if the target channel is not started), a Response object
78-
* with the ERROR status and the error message will be returned.
79-
*
80-
* @param channelId
81-
* The ID of the deployed channel to dispatch the message to.
82-
* @param message
83-
* The message to dispatch to the channel.
84-
* @return The Response object returned by the channel, if its source connector is configured to
85-
* return one.
98+
* Route a message to the specified channelName. Information about the chain of
99+
* source channel Ids and source message Ids will be included in the sourceMap
100+
* of the downstream message automatically in a similar manner as if a Channel
101+
* Writer was being used.
102+
*
103+
* @param channelName The name of the channel to which to route the message.
104+
* @param message The content of the message to be sent, textual or binary.
105+
* As String or byte[].
106+
* @param sourceMap A map containing entries to include in the sourceMap of
107+
* the sent message.
108+
* @return The {@link Response} object returned by the channel.
109+
*
110+
* @see #routeMessage(String, Object, Map, Collection)
111+
*/
112+
public Response routeMessage(String channelName, Object message, Map<String, Object> sourceMap) {
113+
return routeMessage(channelName, message, sourceMap, null);
114+
}
115+
116+
/**
117+
* Route a message to the specified channelName. Information about the chain of
118+
* source channel Ids and source message Ids will be included in the sourceMap
119+
* of the downstream message automatically in a similar manner as if a Channel
120+
* Writer was being used.
121+
*
122+
* @param channelName The name of the channel to which to route the message.
123+
* @param message The content of the message to be sent, textual or
124+
* binary. As String or byte[].
125+
* @param sourceMap A map containing entries to include in the sourceMap of
126+
* the sent message.
127+
* @param destinationSet A collection of integers (metadata IDs) representing
128+
* which destinations to dispatch the message to. Null may
129+
* be passed to indicate all destinations. If unspecified,
130+
* all destinations is the default.
131+
* @return The {@link Response} object returned by the channel.
132+
*
133+
* @see #routeMessage(String, RawMessage)
134+
*/
135+
public Response routeMessage(String channelName, Object message, Map<String, Object> sourceMap,
136+
Collection<Number> destinationSet) {
137+
return routeMessage(channelName, createRawMessage(message, sourceMap, destinationSet));
138+
}
139+
140+
/**
141+
* Dispatches a message to a channel, specified by the deployed channel ID. If
142+
* the dispatch fails for any reason (for example, if the target channel is not
143+
* started), a {@link Response} object with the {@link Status#ERROR} status and
144+
* the error message will be returned.
145+
*
146+
* @param channelId The ID of the deployed channel to dispatch the message to.
147+
* @param message The message to dispatch to the channel.
148+
* @return The {@link Response} object returned by the channel, if its source
149+
* connector is configured to return one.
86150
*/
87151
public Response routeMessageByChannelId(String channelId, String message) {
88-
return routeMessageByChannelId(channelId, new RawMessage(message));
152+
return routeMessageByChannelId(channelId, createRawMessage(message, null, null));
89153
}
90154

91155
/**
92-
* Dispatches a message to a channel, specified by the deployed channel ID. If the dispatch
93-
* fails for any reason (for example, if the target channel is not started), a Response object
94-
* with the ERROR status and the error message will be returned.
95-
*
96-
* @param channelId
97-
* The ID of the deployed channel to dispatch the message to.
98-
* @param rawMessage
99-
* A RawMessage object to dispatch to the channel.
100-
* @return The Response object returned by the channel, if its source connector is configured to
101-
* return one.
156+
* Dispatches a message to a channel, specified by the deployed channel ID. If
157+
* the dispatch fails for any reason (for example, if the target channel is not
158+
* started), a {@link Response} object with the {@link Status#ERROR} status and
159+
* the error message will be returned.
160+
*
161+
* @param channelId The ID of the deployed channel to dispatch the message to.
162+
* @param rawMessage A {@link RawMessage} object to dispatch to the channel.
163+
* @return The {@link Response} object returned by the channel, if its source
164+
* connector is configured to return one.
102165
*/
103166
public Response routeMessageByChannelId(String channelId, RawMessage rawMessage) {
104167
try {
@@ -119,11 +182,198 @@ public Response routeMessageByChannelId(String channelId, RawMessage rawMessage)
119182
}
120183
}
121184

185+
/**
186+
* Route a message to the specified channelId. Information about the chain of
187+
* source channel Ids and source message Ids will be included in the sourceMap
188+
* of the downstream message automatically in a similar manner as if a Channel
189+
* Writer was being used.
190+
*
191+
* @param channelId The unique identifier of the channel to which to route the
192+
* message.
193+
* @param message The content of the message to be sent, textual or binary. As
194+
* String or byte[].
195+
* @return The {@link Response} object returned by the channel.
196+
*
197+
* @see #routeMessageByChannelId(String, Object, Map, Collection)
198+
*/
199+
public Response routeMessageByChannelId(String channelId, Object message) {
200+
return routeMessageByChannelId(channelId, message, null, null);
201+
}
202+
203+
/**
204+
* Route a message to the specified channelId. Information about the chain of
205+
* source channel Ids and source message Ids will be included in the sourceMap
206+
* of the downstream message automatically in a similar manner as if a Channel
207+
* Writer was being used.
208+
*
209+
* @param channelId The unique identifier of the channel to which to route the
210+
* message.
211+
* @param message The content of the message to be sent, textual or binary. As
212+
* String or byte[].
213+
* @param sourceMap A map containing entries to include in the sourceMap of the
214+
* sent message.
215+
* @return The {@link Response} object returned by the channel.
216+
*
217+
* @see #routeMessageByChannelId(String, Object, Map, Collection)
218+
*/
219+
public Response routeMessageByChannelId(String channelId, Object message, Map<String, Object> sourceMap) {
220+
return routeMessageByChannelId(channelId, message, sourceMap, null);
221+
}
222+
223+
/**
224+
* Route a message to the specified channelId. Information about the chain of
225+
* source channel Ids and source message Ids will be included in the sourceMap
226+
* of the downstream message automatically in a similar manner as if a Channel
227+
* Writer was being used.
228+
*
229+
* @param channelId The unique identifier of the channel to which to route
230+
* the message.
231+
* @param message The content of the message to be sent, textual or
232+
* binary. As String or byte[].
233+
* @param sourceMap A map containing entries to include in the sourceMap of
234+
* the sent message.
235+
* @param destinationSet A collection of integers (metadata IDs) representing
236+
* which destinations to dispatch the message to. Null may
237+
* be passed to indicate all destinations. If unspecified,
238+
* all destinations is the default.
239+
* @return The {@link Response} object returned by the channel.
240+
*
241+
* @see #routeMessageByChannelId(String, RawMessage)
242+
*/
243+
public Response routeMessageByChannelId(String channelId, Object message, Map<String, Object> sourceMap,
244+
Collection<Number> destinationSet) {
245+
return routeMessageByChannelId(channelId, createRawMessage(message, sourceMap, destinationSet));
246+
}
247+
122248
private com.mirth.connect.donkey.model.message.RawMessage convertRawMessage(RawMessage message) {
123249
if (message.isBinary()) {
124-
return new com.mirth.connect.donkey.model.message.RawMessage(message.getRawBytes(), message.getDestinationMetaDataIds(), message.getSourceMap());
250+
return new com.mirth.connect.donkey.model.message.RawMessage(message.getRawBytes(),
251+
message.getDestinationMetaDataIds(), message.getSourceMap());
252+
} else {
253+
return new com.mirth.connect.donkey.model.message.RawMessage(message.getRawData(),
254+
message.getDestinationMetaDataIds(), message.getSourceMap());
255+
}
256+
}
257+
258+
/**
259+
* Create a {@link RawMessage} with the specified content, sourceMap, and
260+
* destinationSet.
261+
*
262+
* @param message The content of the message to be sent, textual or
263+
* binary. As String or byte[].
264+
* @param sourceMap A map containing entries to include in the sourceMap of
265+
* the {@link RawMessage} (optional).
266+
* @param destinationSet A collection of integers (metadata IDs) representing
267+
* which destinations to dispatch the message to. Null may
268+
* be passed to indicate all destinations. If unspecified,
269+
* all destinations is the default (optional).
270+
* @return A {@link RawMessage} object containing the message, source, and
271+
* destination information.
272+
*/
273+
public RawMessage createRawMessage(Object message, Map<String, Object> sourceMap,
274+
Collection<Number> destinationSet) {
275+
if (trackingEnhancer != null) {
276+
sourceMap = trackingEnhancer.enrich(sourceMap);
277+
}
278+
279+
if (message instanceof byte[]) {
280+
return new RawMessage((byte[]) message, destinationSet, sourceMap);
125281
} else {
126-
return new com.mirth.connect.donkey.model.message.RawMessage(message.getRawData(), message.getDestinationMetaDataIds(), message.getSourceMap());
282+
return new RawMessage(message.toString(), destinationSet, sourceMap);
283+
}
284+
}
285+
286+
/**
287+
* Adds additional message tracking data.
288+
*
289+
* TrackingEnhancer
290+
*/
291+
private class TrackingEnhancer {
292+
private String channelId;
293+
private Long messageId;
294+
private SourceMap envSourceMap;
295+
296+
/**
297+
* Create a new enhancer with the given parameters.
298+
*
299+
* @param channelId channel ID; null defaults to "NONE"
300+
* @param messageId message ID; null defaults to -1L
301+
* @param sourceMap the message's source map
302+
*/
303+
private TrackingEnhancer(String channelId, Long messageId, SourceMap sourceMap) {
304+
this.channelId = channelId != null ? channelId : "NONE";
305+
this.messageId = messageId != null ? messageId : -1L;
306+
this.envSourceMap = sourceMap;
307+
}
308+
309+
/**
310+
* Enrich the given source map with additional message tracking data.
311+
*
312+
* @param messageSourceMap
313+
* @return a new Map
314+
*/
315+
private Map<String, Object> enrich(Map<String, Object> messageSourceMap) {
316+
if (messageSourceMap == null) {
317+
messageSourceMap = Collections.emptyMap();
318+
}
319+
320+
List<String> sourceChannelIds = lookupAsList("sourceChannelIds", "sourceChannelId");
321+
List<String> sourceMessageIds = lookupAsList("sourceMessageIds", "sourceMessageId");
322+
323+
HashMap<String, Object> newSourceMap = new HashMap<String, Object>(messageSourceMap);
324+
String channelId = this.channelId;
325+
Long messageId = this.messageId;
326+
327+
sourceChannelIds.add(channelId);
328+
sourceMessageIds.add(messageId.toString());
329+
330+
newSourceMap.put("sourceChannelIds", sourceChannelIds);
331+
newSourceMap.put("sourceChannelId", channelId);
332+
newSourceMap.put("sourceMessageIds", sourceMessageIds);
333+
newSourceMap.put("sourceMessageId", messageId);
334+
335+
return newSourceMap;
336+
}
337+
338+
/**
339+
* Given the specified lookup keys, return the first non-null value as a List.
340+
* The expectation is the first lookup will return a List, while the second
341+
* returns an Object.
342+
*
343+
* @param primary primary lookup key to return a List
344+
* @param secondary secondary lookup key to return an Object
345+
* @return a List containing the first non-null lookup value, else an empty List
346+
*/
347+
private List<String> lookupAsList(String primary, String secondary) {
348+
List<String> result = new ArrayList<String>();
349+
350+
Object primaryValue = lookupInEnvSourceMap(primary);
351+
352+
if (primaryValue != null) {
353+
// all of this to not assume the result is a List<String>
354+
if (primaryValue instanceof Collection) {
355+
((Collection<?>) primaryValue).stream().map(i -> i.toString()).forEach(result::add);
356+
} else if (primaryValue instanceof Object[]) {
357+
Arrays.stream((Object[]) primaryValue).map(i -> i.toString()).forEach(result::add);
358+
}
359+
} else {
360+
Object secondaryValue = lookupInEnvSourceMap(secondary);
361+
if (secondaryValue != null) {
362+
result.add(secondaryValue.toString());
363+
}
364+
}
365+
366+
return result;
367+
}
368+
369+
/**
370+
* Look up a value from the environment's {@link SourceMap}
371+
*
372+
* @param key
373+
* @return its mapped value, can be null
374+
*/
375+
private Object lookupInEnvSourceMap(String key) {
376+
return this.envSourceMap.get(key);
127377
}
128378
}
129-
}
379+
}

0 commit comments

Comments
 (0)