From feeeb74934c0215b200f842e64d8be789b5e30da Mon Sep 17 00:00:00 2001 From: Richard Ogin Date: Fri, 18 Apr 2025 22:25:07 -0500 Subject: [PATCH] Add msg tracking info in VMRouter Signed-off-by: Richard Ogin --- .../connect/server/userutil/VMRouter.java | 344 +++++++++++++++--- .../util/javascript/JavaScriptScopeUtil.java | 58 ++- 2 files changed, 351 insertions(+), 51 deletions(-) diff --git a/server/src/com/mirth/connect/server/userutil/VMRouter.java b/server/src/com/mirth/connect/server/userutil/VMRouter.java index f1c8ca342a..879ee0d667 100644 --- a/server/src/com/mirth/connect/server/userutil/VMRouter.java +++ b/server/src/com/mirth/connect/server/userutil/VMRouter.java @@ -9,6 +9,14 @@ package com.mirth.connect.server.userutil; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -28,77 +36,132 @@ public class VMRouter { private ChannelController channelController = ControllerFactory.getFactory().createChannelController(); private EngineController engineController = ControllerFactory.getFactory().createEngineController(); + private TrackingEnhancer trackingEnhancer; + /** * Instantiates a VMRouter object. */ public VMRouter() {} /** - * Dispatches a message to a channel, specified by the deployed channel name. If the dispatch - * fails for any reason (for example, if the target channel is not started), a Response object - * with the ERROR status and the error message will be returned. - * - * @param channelName - * The name of the deployed channel to dispatch the message to. - * @param message - * The message to dispatch to the channel. - * @return The Response object returned by the channel, if its source connector is configured to - * return one. + * Instantiates a VMRouter object with additional message tracking enhancements. + * + * @param channelId channel ID or "NONE" if null + * @param messageId message ID or -1L if null + * @param sourceMap the message's source map + */ + public VMRouter(String channelId, Long messageId, SourceMap sourceMap) { + this.trackingEnhancer = new TrackingEnhancer(channelId, messageId, sourceMap); + } + + /** + * Dispatches a message to a channel, specified by the deployed channel name. If + * the dispatch fails for any reason (for example, if the target channel is not + * started), a {@link Response} object with the {@link Status#ERROR} status and + * the error message will be returned. + * + * @param channelName The name of the deployed channel to dispatch the message + * to. + * @param message The message to dispatch to the channel. + * @return The {@link Response} object returned by the channel, if its source + * connector is configured to return one. */ public Response routeMessage(String channelName, String message) { - return routeMessage(channelName, new RawMessage(message)); + return routeMessage(channelName, createRawMessage(message, null, null)); } /** - * Dispatches a message to a channel, specified by the deployed channel name. If the dispatch - * fails for any reason (for example, if the target channel is not started), a Response object - * with the ERROR status and the error message will be returned. - * - * @param channelName - * The name of the deployed channel to dispatch the message to. - * @param rawMessage - * A RawMessage object to dispatch to the channel. - * @return The Response object returned by the channel, if its source connector is configured to - * return one. + * Dispatches a message to a channel, specified by the deployed channel name. If + * the dispatch fails for any reason (for example, if the target channel is not + * started), a {@link Response} object with the {@link Status#ERROR} status and + * the error message will be returned. + * + * @param channelName The name of the deployed channel to dispatch the message + * to. + * @param rawMessage A {@link RawMessage} object to dispatch to the channel. + * @return The {@link Response} object returned by the channel, if its source + * connector is configured to return one. */ public Response routeMessage(String channelName, RawMessage rawMessage) { com.mirth.connect.model.Channel channel = channelController.getDeployedChannelByName(channelName); if (channel == null) { - logger.error("Could not find channel to route to for channel name: " + channelName); - return new Response(Status.ERROR, "Could not find channel to route to for channel name: " + channelName); + String message = "Could not find channel to route to for channel name: " + channelName; + logger.error(message); + return new Response(Status.ERROR, message); } return routeMessageByChannelId(channel.getId(), rawMessage); } /** - * Dispatches a message to a channel, specified by the deployed channel ID. If the dispatch - * fails for any reason (for example, if the target channel is not started), a Response object - * with the ERROR status and the error message will be returned. - * - * @param channelId - * The ID of the deployed channel to dispatch the message to. - * @param message - * The message to dispatch to the channel. - * @return The Response object returned by the channel, if its source connector is configured to - * return one. + * Route a message to the specified channelName. Information about the chain of + * source channel Ids and source message Ids will be included in the sourceMap + * of the downstream message automatically in a similar manner as if a Channel + * Writer was being used. + * + * @param channelName The name of the channel to which to route the message. + * @param message The content of the message to be sent, textual or binary. + * As String or byte[]. + * @param sourceMap A map containing entries to include in the sourceMap of + * the sent message. + * @return The {@link Response} object returned by the channel. + * + * @see #routeMessage(String, Object, Map, Collection) + */ + public Response routeMessage(String channelName, Object message, Map sourceMap) { + return routeMessage(channelName, message, sourceMap, null); + } + + /** + * Route a message to the specified channelName. Information about the chain of + * source channel Ids and source message Ids will be included in the sourceMap + * of the downstream message automatically in a similar manner as if a Channel + * Writer was being used. + * + * @param channelName The name of the channel to which to route the message. + * @param message The content of the message to be sent, textual or + * binary. As String or byte[]. + * @param sourceMap A map containing entries to include in the sourceMap of + * the sent message. + * @param destinationSet A collection of integers (metadata IDs) representing + * which destinations to dispatch the message to. Null may + * be passed to indicate all destinations. If unspecified, + * all destinations is the default. + * @return The {@link Response} object returned by the channel. + * + * @see #routeMessage(String, RawMessage) + */ + public Response routeMessage(String channelName, Object message, Map sourceMap, + Collection destinationSet) { + return routeMessage(channelName, createRawMessage(message, sourceMap, destinationSet)); + } + + /** + * Dispatches a message to a channel, specified by the deployed channel ID. If + * the dispatch fails for any reason (for example, if the target channel is not + * started), a {@link Response} object with the {@link Status#ERROR} status and + * the error message will be returned. + * + * @param channelId The ID of the deployed channel to dispatch the message to. + * @param message The message to dispatch to the channel. + * @return The {@link Response} object returned by the channel, if its source + * connector is configured to return one. */ public Response routeMessageByChannelId(String channelId, String message) { - return routeMessageByChannelId(channelId, new RawMessage(message)); + return routeMessageByChannelId(channelId, createRawMessage(message, null, null)); } /** - * Dispatches a message to a channel, specified by the deployed channel ID. If the dispatch - * fails for any reason (for example, if the target channel is not started), a Response object - * with the ERROR status and the error message will be returned. - * - * @param channelId - * The ID of the deployed channel to dispatch the message to. - * @param rawMessage - * A RawMessage object to dispatch to the channel. - * @return The Response object returned by the channel, if its source connector is configured to - * return one. + * Dispatches a message to a channel, specified by the deployed channel ID. If + * the dispatch fails for any reason (for example, if the target channel is not + * started), a {@link Response} object with the {@link Status#ERROR} status and + * the error message will be returned. + * + * @param channelId The ID of the deployed channel to dispatch the message to. + * @param rawMessage A {@link RawMessage} object to dispatch to the channel. + * @return The {@link Response} object returned by the channel, if its source + * connector is configured to return one. */ public Response routeMessageByChannelId(String channelId, RawMessage rawMessage) { try { @@ -119,11 +182,198 @@ public Response routeMessageByChannelId(String channelId, RawMessage rawMessage) } } + /** + * Route a message to the specified channelId. Information about the chain of + * source channel Ids and source message Ids will be included in the sourceMap + * of the downstream message automatically in a similar manner as if a Channel + * Writer was being used. + * + * @param channelId The unique identifier of the channel to which to route the + * message. + * @param message The content of the message to be sent, textual or binary. As + * String or byte[]. + * @return The {@link Response} object returned by the channel. + * + * @see #routeMessageByChannelId(String, Object, Map, Collection) + */ + public Response routeMessageByChannelId(String channelId, Object message) { + return routeMessageByChannelId(channelId, message, null, null); + } + + /** + * Route a message to the specified channelId. Information about the chain of + * source channel Ids and source message Ids will be included in the sourceMap + * of the downstream message automatically in a similar manner as if a Channel + * Writer was being used. + * + * @param channelId The unique identifier of the channel to which to route the + * message. + * @param message The content of the message to be sent, textual or binary. As + * String or byte[]. + * @param sourceMap A map containing entries to include in the sourceMap of the + * sent message. + * @return The {@link Response} object returned by the channel. + * + * @see #routeMessageByChannelId(String, Object, Map, Collection) + */ + public Response routeMessageByChannelId(String channelId, Object message, Map sourceMap) { + return routeMessageByChannelId(channelId, message, sourceMap, null); + } + + /** + * Route a message to the specified channelId. Information about the chain of + * source channel Ids and source message Ids will be included in the sourceMap + * of the downstream message automatically in a similar manner as if a Channel + * Writer was being used. + * + * @param channelId The unique identifier of the channel to which to route + * the message. + * @param message The content of the message to be sent, textual or + * binary. As String or byte[]. + * @param sourceMap A map containing entries to include in the sourceMap of + * the sent message. + * @param destinationSet A collection of integers (metadata IDs) representing + * which destinations to dispatch the message to. Null may + * be passed to indicate all destinations. If unspecified, + * all destinations is the default. + * @return The {@link Response} object returned by the channel. + * + * @see #routeMessageByChannelId(String, RawMessage) + */ + public Response routeMessageByChannelId(String channelId, Object message, Map sourceMap, + Collection destinationSet) { + return routeMessageByChannelId(channelId, createRawMessage(message, sourceMap, destinationSet)); + } + private com.mirth.connect.donkey.model.message.RawMessage convertRawMessage(RawMessage message) { if (message.isBinary()) { - return new com.mirth.connect.donkey.model.message.RawMessage(message.getRawBytes(), message.getDestinationMetaDataIds(), message.getSourceMap()); + return new com.mirth.connect.donkey.model.message.RawMessage(message.getRawBytes(), + message.getDestinationMetaDataIds(), message.getSourceMap()); + } else { + return new com.mirth.connect.donkey.model.message.RawMessage(message.getRawData(), + message.getDestinationMetaDataIds(), message.getSourceMap()); + } + } + + /** + * Create a {@link RawMessage} with the specified content, sourceMap, and + * destinationSet. + * + * @param message The content of the message to be sent, textual or + * binary. As String or byte[]. + * @param sourceMap A map containing entries to include in the sourceMap of + * the {@link RawMessage} (optional). + * @param destinationSet A collection of integers (metadata IDs) representing + * which destinations to dispatch the message to. Null may + * be passed to indicate all destinations. If unspecified, + * all destinations is the default (optional). + * @return A {@link RawMessage} object containing the message, source, and + * destination information. + */ + public RawMessage createRawMessage(Object message, Map sourceMap, + Collection destinationSet) { + if (trackingEnhancer != null) { + sourceMap = trackingEnhancer.enrich(sourceMap); + } + + if (message instanceof byte[]) { + return new RawMessage((byte[]) message, destinationSet, sourceMap); } else { - return new com.mirth.connect.donkey.model.message.RawMessage(message.getRawData(), message.getDestinationMetaDataIds(), message.getSourceMap()); + return new RawMessage(message.toString(), destinationSet, sourceMap); + } + } + + /** + * Adds additional message tracking data. + * + * TrackingEnhancer + */ + private class TrackingEnhancer { + private String channelId; + private Long messageId; + private SourceMap envSourceMap; + + /** + * Create a new enhancer with the given parameters. + * + * @param channelId channel ID; null defaults to "NONE" + * @param messageId message ID; null defaults to -1L + * @param sourceMap the message's source map + */ + private TrackingEnhancer(String channelId, Long messageId, SourceMap sourceMap) { + this.channelId = channelId != null ? channelId : "NONE"; + this.messageId = messageId != null ? messageId : -1L; + this.envSourceMap = sourceMap; + } + + /** + * Enrich the given source map with additional message tracking data. + * + * @param messageSourceMap + * @return a new Map + */ + private Map enrich(Map messageSourceMap) { + if (messageSourceMap == null) { + messageSourceMap = Collections.emptyMap(); + } + + List sourceChannelIds = lookupAsList("sourceChannelIds", "sourceChannelId"); + List sourceMessageIds = lookupAsList("sourceMessageIds", "sourceMessageId"); + + HashMap newSourceMap = new HashMap(messageSourceMap); + String channelId = this.channelId; + Long messageId = this.messageId; + + sourceChannelIds.add(channelId); + sourceMessageIds.add(messageId.toString()); + + newSourceMap.put("sourceChannelIds", sourceChannelIds); + newSourceMap.put("sourceChannelId", channelId); + newSourceMap.put("sourceMessageIds", sourceMessageIds); + newSourceMap.put("sourceMessageId", messageId); + + return newSourceMap; + } + + /** + * Given the specified lookup keys, return the first non-null value as a List. + * The expectation is the first lookup will return a List, while the second + * returns an Object. + * + * @param primary primary lookup key to return a List + * @param secondary secondary lookup key to return an Object + * @return a List containing the first non-null lookup value, else an empty List + */ + private List lookupAsList(String primary, String secondary) { + List result = new ArrayList(); + + Object primaryValue = lookupInEnvSourceMap(primary); + + if (primaryValue != null) { + // all of this to not assume the result is a List + if (primaryValue instanceof Collection) { + ((Collection) primaryValue).stream().map(i -> i.toString()).forEach(result::add); + } else if (primaryValue instanceof Object[]) { + Arrays.stream((Object[]) primaryValue).map(i -> i.toString()).forEach(result::add); + } + } else { + Object secondaryValue = lookupInEnvSourceMap(secondary); + if (secondaryValue != null) { + result.add(secondaryValue.toString()); + } + } + + return result; + } + + /** + * Look up a value from the environment's {@link SourceMap} + * + * @param key + * @return its mapped value, can be null + */ + private Object lookupInEnvSourceMap(String key) { + return this.envSourceMap.get(key); } } -} +} \ No newline at end of file diff --git a/server/src/com/mirth/connect/server/util/javascript/JavaScriptScopeUtil.java b/server/src/com/mirth/connect/server/util/javascript/JavaScriptScopeUtil.java index 3eb537eb9d..b9bb00ec66 100644 --- a/server/src/com/mirth/connect/server/util/javascript/JavaScriptScopeUtil.java +++ b/server/src/com/mirth/connect/server/util/javascript/JavaScriptScopeUtil.java @@ -82,6 +82,10 @@ protected static Context getContext(ContextFactory contextFactory) { return context; } + public Context doGetContext(ContextFactory contextFactory) { + return JavaScriptScopeUtil.getContext(contextFactory); + } + protected static ScriptableObject createSealedSharedScope(ContextFactory contextFactory) { Context context = contextFactory.enterContext(); @@ -159,6 +163,14 @@ private static void addRouter(Scriptable scope) { add("router", scope, new VMRouter()); } + private static void addRouterEnhancement(Scriptable scope, String channelId, Long messageId, Map sourceMap) { + if (sourceMap == null) { + sourceMap = Collections.emptyMap(); + } + + add("router", scope, new VMRouter(channelId, messageId, new SourceMap(Collections.unmodifiableMap(sourceMap)))); + } + // Replacer private static void addReplacer(Scriptable scope) { add("replacer", scope, new TemplateValueReplacer()); @@ -248,6 +260,9 @@ public static Scriptable getAttachmentScope(ContextFactory contextFactory, Objec add("sourceMap", scope, new SourceMap(Collections.unmodifiableMap(message.getSourceMap()))); add("mirth_attachments", scope, attachments); add("binary", scope, isBinary); + + addRouterEnhancement(scope, channelId, message.getOriginalMessageId(), message.getSourceMap()); + return scope; } @@ -260,6 +275,8 @@ public static Scriptable getPreprocessorScope(ContextFactory contextFactory, Obj addRawMessage(scope, message); addConnectorMessage(scope, connectorMessage); + addRouterEnhancement(scope, channelId, connectorMessage.getMessageId(), connectorMessage.getSourceMap()); + return scope; } @@ -271,6 +288,9 @@ public static Scriptable getPostprocessorScope(ContextFactory contextFactory, Ob Scriptable scope = getBasicScope(getContext(contextFactory), logger, channelId, message.getMergedConnectorMessage().getChannelName()); addStatusValues(scope); addMessage(scope, message); + + addRouterEnhancement(scope, channelId, message.getMessageId(), message.getMergedConnectorMessage().getSourceMap()); + return scope; } @@ -283,6 +303,9 @@ public static Scriptable getPostprocessorScope(ContextFactory contextFactory, Ob addMessage(scope, message); addStatusValues(scope); add("response", scope, response); + + addRouterEnhancement(scope, channelId, message.getMessageId(), message.getMergedConnectorMessage().getSourceMap()); + return scope; } @@ -295,6 +318,9 @@ public static Scriptable getFilterTransformerScope(ContextFactory contextFactory addConnectorMessage(scope, message); add("template", scope, template); add("phase", scope, phase); + + addRouterEnhancement(scope, null, message.getMessageId(), message.getSourceMap()); + return scope; } @@ -308,6 +334,9 @@ public static Scriptable getResponseTransformerScope(ContextFactory contextFacto addResponse(scope, response); addStatusValues(scope); add("template", scope, template); + + addRouterEnhancement(scope, null, message.getMessageId(), message.getSourceMap()); + return scope; } @@ -316,7 +345,11 @@ public static Scriptable getResponseTransformerScope(ContextFactory contextFacto * try-finally with Context.exit() in the finally block. */ public static Scriptable getDeployScope(ContextFactory contextFactory, Object logger, String channelId, String channelName) { - return getBasicScope(getContext(contextFactory), logger, channelId, channelName); + Scriptable scope = getBasicScope(getContext(contextFactory), logger, channelId, channelName); + + addRouterEnhancement(scope, channelId, null, null); + + return scope; } /** @@ -331,8 +364,13 @@ public static Scriptable getDeployScope(ContextFactory contextFactory, Object lo * Since this method calls getContext(), anything calling it should wrap this method in a * try-finally with Context.exit() in the finally block. */ - public static Scriptable getUndeployScope(ContextFactory contextFactory, Object logger, String channelId, String channelName) { - return getBasicScope(getContext(contextFactory), logger, channelId, channelName); + public static Scriptable getUndeployScope(ContextFactory contextFactory, Object logger, String channelId, + String channelName) { + Scriptable scope = getBasicScope(getContext(contextFactory), logger, channelId, channelName); + + addRouterEnhancement(scope, channelId, null, null); + + return scope; } /** @@ -348,7 +386,11 @@ public static Scriptable getUndeployScope(ContextFactory contextFactory, Object * try-finally with Context.exit() in the finally block. */ public static Scriptable getMessageReceiverScope(ContextFactory contextFactory, Object logger, String channelId, String channelName) { - return getBasicScope(getContext(contextFactory), logger, channelId, channelName); + Scriptable scope = getBasicScope(getContext(contextFactory), logger, channelId, channelName); + + addRouterEnhancement(scope, channelId, null, null); + + return scope; } /** @@ -358,6 +400,9 @@ public static Scriptable getMessageReceiverScope(ContextFactory contextFactory, public static Scriptable getMessageReceiverScope(ContextFactory contextFactory, Object logger, String channelId, ImmutableConnectorMessage message) { Scriptable scope = getBasicScope(getContext(contextFactory), logger, channelId, message.getChannelName()); addConnectorMessage(scope, message); + + addRouterEnhancement(scope, channelId, message.getMessageId(), message.getSourceMap()); + return scope; } @@ -369,6 +414,9 @@ public static Scriptable getMessageDispatcherScope(ContextFactory contextFactory Scriptable scope = getBasicScope(getContext(contextFactory), logger, channelId, message.getChannelName()); addConnectorMessage(scope, message); addStatusValues(scope); + + addRouterEnhancement(scope, channelId, message.getMessageId(), message.getSourceMap()); + return scope; } @@ -387,6 +435,8 @@ public static Scriptable getBatchProcessorScope(ContextFactory contextFactory, O addChannel(scope, channelId, channelName); } + addRouterEnhancement(scope, channelId, null, null); + return scope; }