diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java index 01612493eb7..de6c4a7ed47 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java @@ -17,12 +17,11 @@ package org.apache.inlong.agent.plugin.sinks; -import org.apache.inlong.agent.conf.JobProfile; -import org.apache.inlong.agent.message.PackProxyMessage; +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.message.filecollect.ProxyMessageCache; import org.apache.inlong.agent.metrics.AgentMetricItem; import org.apache.inlong.agent.metrics.AgentMetricItemSet; -import org.apache.inlong.agent.plugin.MessageFilter; -import org.apache.inlong.agent.plugin.Sink; +import org.apache.inlong.agent.plugin.file.Sink; import org.apache.inlong.common.metric.MetricRegister; import org.slf4j.Logger; @@ -30,17 +29,10 @@ import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import static org.apache.inlong.agent.constant.AgentConstants.AGENT_MESSAGE_FILTER_CLASSNAME; import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL; -import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID; -import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID; -import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID; @@ -60,25 +52,12 @@ public abstract class AbstractSink implements Sink { protected Map dimensions; protected static final AtomicLong METRIC_INDEX = new AtomicLong(0); - protected JobProfile jobConf; + protected InstanceProfile profile; protected String sourceName; protected String jobInstanceId; protected int batchFlushInterval; - // key is stream id, value is a batch of messages belongs to the same stream id - protected ConcurrentHashMap cache; - - @Override - public MessageFilter initMessageFilter(JobProfile jobConf) { - if (jobConf.hasKey(AGENT_MESSAGE_FILTER_CLASSNAME)) { - try { - return (MessageFilter) Class.forName(jobConf.get(AGENT_MESSAGE_FILTER_CLASSNAME)) - .getDeclaredConstructor().newInstance(); - } catch (Exception e) { - LOGGER.error("init message filter error", e); - } - } - return null; - } + // key is stream id, value is a batch of messages belong to the same stream id + protected ProxyMessageCache cache; @Override public void setSourceName(String sourceFileName) { @@ -86,13 +65,13 @@ public void setSourceName(String sourceFileName) { } @Override - public void init(JobProfile jobConf) { - this.jobConf = jobConf; - jobInstanceId = jobConf.get(JOB_INSTANCE_ID); - inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID); - inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID); - cache = new ConcurrentHashMap<>(10); - batchFlushInterval = jobConf.getInt(PROXY_BATCH_FLUSH_INTERVAL, DEFAULT_PROXY_BATCH_FLUSH_INTERVAL); + public void init(InstanceProfile profile) { + this.profile = profile; + jobInstanceId = profile.getInstanceId(); + inlongGroupId = profile.getInlongGroupId(); + inlongStreamId = profile.getInlongStreamId(); + cache = new ProxyMessageCache(this.profile, inlongGroupId, inlongStreamId); + batchFlushInterval = profile.getInt(PROXY_BATCH_FLUSH_INTERVAL, DEFAULT_PROXY_BATCH_FLUSH_INTERVAL); this.dimensions = new HashMap<>(); dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName()); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ConsoleSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ConsoleSink.java index a24e0a408c2..c1af161e971 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ConsoleSink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ConsoleSink.java @@ -17,11 +17,8 @@ package org.apache.inlong.agent.plugin.sinks; -import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.plugin.Message; -import org.apache.inlong.agent.plugin.MessageFilter; - -import java.nio.charset.StandardCharsets; /** * message write to console @@ -33,15 +30,8 @@ public ConsoleSink() { } @Override - public void write(Message message) { - if (message != null) { - System.out.println(new String(message.getBody(), StandardCharsets.UTF_8)); - // increment the count of successful sinks - sinkMetric.sinkSuccessCount.incrementAndGet(); - } else { - // increment the count of failed sinks - sinkMetric.sinkFailCount.incrementAndGet(); - } + public boolean write(Message message) { + return true; } @Override @@ -50,12 +40,7 @@ public void setSourceName(String sourceFileName) { } @Override - public MessageFilter initMessageFilter(JobProfile jobConf) { - return null; - } - - @Override - public void init(JobProfile jobConf) { + public void init(InstanceProfile jobConf) { super.init(jobConf); } @@ -63,4 +48,9 @@ public void init(JobProfile jobConf) { public void destroy() { } + + @Override + public boolean sinkFinish() { + return false; + } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java index 722ccd6a9e4..1369335f444 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java @@ -19,15 +19,11 @@ import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.AgentConfiguration; -import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.message.BatchProxyMessage; -import org.apache.inlong.agent.message.EndMessage; -import org.apache.inlong.agent.message.PackProxyMessage; -import org.apache.inlong.agent.message.ProxyMessage; import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.utils.AgentUtils; -import org.apache.inlong.agent.utils.ThreadUtils; import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo; import com.google.common.base.Preconditions; @@ -88,7 +84,7 @@ public class KafkaSink extends AbstractSink { private boolean asyncSend; @Override - public void init(JobProfile jobConf) { + public void init(InstanceProfile jobConf) { super.init(jobConf); int sendQueueSize = agentConf.getInt(KAFKA_SINK_SEND_QUEUE_SIZE, DEFAULT_SEND_QUEUE_SIZE); kafkaSendQueue = new LinkedBlockingQueue<>(sendQueueSize); @@ -103,38 +99,11 @@ public void init(JobProfile jobConf) { kafkaSenders = new ArrayList<>(); initKafkaSender(); EXECUTOR_SERVICE.execute(sendDataThread()); - EXECUTOR_SERVICE.execute(flushCache()); } @Override - public void write(Message message) { - if (message == null || message instanceof EndMessage) { - return; - } - - try { - ProxyMessage proxyMessage = new ProxyMessage(message); - // add proxy message to cache. - cache.compute(proxyMessage.getBatchKey(), - (s, packProxyMessage) -> { - if (packProxyMessage == null) { - packProxyMessage = - new PackProxyMessage(jobInstanceId, jobConf, inlongGroupId, inlongStreamId); - packProxyMessage.generateExtraMap(proxyMessage.getDataKey()); - packProxyMessage.addTopicAndDataTime(topic, System.currentTimeMillis()); - } - // add message to package proxy - packProxyMessage.addProxyMessage(proxyMessage); - return packProxyMessage; - }); - // increment the count of successful sinks - sinkMetric.sinkSuccessCount.incrementAndGet(); - } catch (Exception e) { - sinkMetric.sinkFailCount.incrementAndGet(); - LOGGER.error("write job[{}] data to cache error", jobInstanceId, e); - } catch (Throwable t) { - ThreadUtils.threadThrowableHandler(Thread.currentThread(), t); - } + public boolean write(Message message) { + return true; } @Override @@ -153,47 +122,8 @@ public void destroy() { } } - private boolean sinkFinish() { - return cache.values().stream().allMatch(PackProxyMessage::isEmpty) && kafkaSendQueue.isEmpty(); - } - - /** - * flush cache by batch - * - * @return thread runner - */ - private Runnable flushCache() { - return () -> { - LOGGER.info("start kafka sink flush cache thread, job[{}], groupId[{}]", jobInstanceId, inlongGroupId); - while (!shutdown) { - try { - cache.forEach((batchKey, packProxyMessage) -> { - BatchProxyMessage batchProxyMessage = packProxyMessage.fetchBatch(); - if (batchProxyMessage == null) { - return; - } - try { - kafkaSendQueue.put(batchProxyMessage); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "send group id {}, message key {},with message size {}, the job id is {}, " - + "read source is {} sendTime is {}", - inlongGroupId, batchKey, - batchProxyMessage.getDataList().size(), jobInstanceId, sourceName, - batchProxyMessage.getDataTime()); - } - } catch (Exception e) { - LOGGER.error("flush job[{}] data to send queue exception", jobInstanceId, e); - } - }); - AgentUtils.silenceSleepInMs(batchFlushInterval); - } catch (Exception ex) { - LOGGER.error("error caught", ex); - } catch (Throwable t) { - ThreadUtils.threadThrowableHandler(Thread.currentThread(), t); - } - } - }; + public boolean sinkFinish() { + return true; } /** diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java similarity index 98% rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java index 825a2b7a113..f145772c77a 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.agent.plugin.sinks.filecollect; +package org.apache.inlong.agent.plugin.sinks; import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.InstanceProfile; @@ -29,6 +29,7 @@ import org.apache.inlong.agent.message.filecollect.SenderMessage; import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.MessageFilter; +import org.apache.inlong.agent.plugin.sinks.filecollect.SenderManager; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.agent.utils.ThreadUtils; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java index 1326f914a5c..f19ab76dfff 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java @@ -19,15 +19,11 @@ import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.AgentConfiguration; -import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.message.BatchProxyMessage; -import org.apache.inlong.agent.message.EndMessage; -import org.apache.inlong.agent.message.PackProxyMessage; -import org.apache.inlong.agent.message.ProxyMessage; import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.utils.AgentUtils; -import org.apache.inlong.agent.utils.ThreadUtils; import org.apache.inlong.common.msg.InLongMsg; import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo; @@ -115,7 +111,7 @@ public class PulsarSink extends AbstractSink { private boolean asyncSend; @Override - public void init(JobProfile jobConf) { + public void init(InstanceProfile jobConf) { super.init(jobConf); // agentConf sendQueueSize = agentConf.getInt(PULSAR_SINK_SEND_QUEUE_SIZE, DEFAULT_SEND_QUEUE_SIZE); @@ -149,41 +145,11 @@ public void init(JobProfile jobConf) { pulsarSenders = new ArrayList<>(); initPulsarSender(); EXECUTOR_SERVICE.execute(sendDataThread()); - EXECUTOR_SERVICE.execute(flushCache()); } @Override - public void write(Message message) { - try { - if (message != null) { - if (!(message instanceof EndMessage)) { - ProxyMessage proxyMessage = new ProxyMessage(message); - // add proxy message to cache. - cache.compute(proxyMessage.getBatchKey(), - (s, packProxyMessage) -> { - if (packProxyMessage == null) { - packProxyMessage = - new PackProxyMessage(jobInstanceId, jobConf, inlongGroupId, inlongStreamId); - packProxyMessage.generateExtraMap(proxyMessage.getDataKey()); - packProxyMessage.addTopicAndDataTime(topic, System.currentTimeMillis()); - } - // add message to package proxy - packProxyMessage.addProxyMessage(proxyMessage); - return packProxyMessage; - }); - // increment the count of successful sinks - sinkMetric.sinkSuccessCount.incrementAndGet(); - } else { - // increment the count of failed sinks - sinkMetric.sinkFailCount.incrementAndGet(); - } - } - } catch (Exception e) { - LOGGER.error("write message to Proxy sink error", e); - } catch (Throwable t) { - ThreadUtils.threadThrowableHandler(Thread.currentThread(), t); - } - + public boolean write(Message message) { + return true; } @Override @@ -203,45 +169,8 @@ public void destroy() { } } - private boolean sinkFinish() { - return cache.values().stream().allMatch(PackProxyMessage::isEmpty) && pulsarSendQueue.isEmpty(); - } - - /** - * flush cache by batch - * - * @return thread runner - */ - private Runnable flushCache() { - return () -> { - LOGGER.info("start flush cache thread for {} ProxySink", inlongGroupId); - while (!shutdown) { - try { - cache.forEach((batchKey, packProxyMessage) -> { - BatchProxyMessage batchProxyMessage = packProxyMessage.fetchBatch(); - if (batchProxyMessage != null) { - try { - sendQueueSemaphore.acquire(); - pulsarSendQueue.put(batchProxyMessage); - LOGGER.info("send group id {}, message key {},with message size {}, the job id is {}, " - + "read source is {} sendTime is {}", inlongGroupId, batchKey, - batchProxyMessage.getDataList().size(), jobInstanceId, sourceName, - batchProxyMessage.getDataTime()); - } catch (Exception e) { - sendQueueSemaphore.release(); - LOGGER.error("flush data to send queue", e); - } - } - }); - } catch (Exception ex) { - LOGGER.error("error caught", ex); - } catch (Throwable t) { - ThreadUtils.threadThrowableHandler(Thread.currentThread(), t); - } finally { - AgentUtils.silenceSleepInMs(batchFlushInterval); - } - } - }; + public boolean sinkFinish() { + return true; } /** diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java deleted file mode 100644 index 369f2a66d70..00000000000 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.agent.plugin.sinks.filecollect; - -import org.apache.inlong.agent.conf.InstanceProfile; -import org.apache.inlong.agent.message.filecollect.ProxyMessageCache; -import org.apache.inlong.agent.metrics.AgentMetricItem; -import org.apache.inlong.agent.metrics.AgentMetricItemSet; -import org.apache.inlong.agent.plugin.file.Sink; -import org.apache.inlong.common.metric.MetricRegister; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL; -import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID; -import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID; -import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID; - -/** - * abstract sink: sink data to remote data center - */ -public abstract class AbstractSink implements Sink { - - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSink.class); - protected String inlongGroupId; - protected String inlongStreamId; - - // metric - protected AgentMetricItemSet metricItemSet; - protected AgentMetricItem sinkMetric; - protected Map dimensions; - protected static final AtomicLong METRIC_INDEX = new AtomicLong(0); - - protected InstanceProfile profile; - protected String sourceName; - protected String jobInstanceId; - protected int batchFlushInterval; - // key is stream id, value is a batch of messages belong to the same stream id - protected ProxyMessageCache cache; - - @Override - public void setSourceName(String sourceFileName) { - this.sourceName = sourceFileName; - } - - @Override - public void init(InstanceProfile profile) { - this.profile = profile; - jobInstanceId = profile.getInstanceId(); - inlongGroupId = profile.getInlongGroupId(); - inlongStreamId = profile.getInlongStreamId(); - cache = new ProxyMessageCache(this.profile, inlongGroupId, inlongStreamId); - batchFlushInterval = profile.getInt(PROXY_BATCH_FLUSH_INTERVAL, DEFAULT_PROXY_BATCH_FLUSH_INTERVAL); - - this.dimensions = new HashMap<>(); - dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName()); - dimensions.put(KEY_INLONG_GROUP_ID, inlongGroupId); - dimensions.put(KEY_INLONG_STREAM_ID, inlongStreamId); - String metricName = String.join("-", this.getClass().getSimpleName(), - String.valueOf(METRIC_INDEX.incrementAndGet())); - this.metricItemSet = new AgentMetricItemSet(metricName); - MetricRegister.register(metricItemSet); - sinkMetric = metricItemSet.findMetricItem(dimensions); - } -} diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java index 5147dd1a2a6..b07126d9463 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java @@ -17,9 +17,13 @@ package org.apache.inlong.agent.plugin.sinks; -import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.message.ProxyMessage; import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; +import org.apache.inlong.agent.plugin.sinks.filecollect.TestSenderManager; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.common.enums.TaskStateEnum; import org.junit.BeforeClass; import org.junit.Test; @@ -30,24 +34,24 @@ import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_GROUP_ID; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID; -import static org.junit.Assert.assertEquals; public class KafkaSinkTest { private static MockSink kafkaSink; - private static JobProfile jobProfile; + private static InstanceProfile profile; private static AgentBaseTestsHelper helper; + private static final ClassLoader LOADER = TestSenderManager.class.getClassLoader(); @BeforeClass public static void setUp() throws Exception { - helper = new AgentBaseTestsHelper(KafkaSinkTest.class.getName()).setupAgentHome(); - jobProfile = JobProfile.parseJsonFile("kafkaSinkJob.json"); - jobProfile.set("job.mqClusters", - "[{\"url\":\"mqurl\",\"token\":\"token\",\"mqType\":\"KAFKA\",\"params\":{}}]"); - jobProfile.set("job.topicInfo", "{\"topic\":\"topic\",\"inlongGroupId\":\"groupId\"}"); - System.out.println(jobProfile.toJsonStr()); + String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); + helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome(); + String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; + TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D"); + profile = taskProfile.createInstanceProfile("", fileName, + taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime()); kafkaSink = new MockSink(); - kafkaSink.init(jobProfile); + kafkaSink.init(profile); } @Test @@ -60,7 +64,6 @@ public void testWrite() { for (long i = 0; i < 5; i++) { kafkaSink.write(new ProxyMessage(body.getBytes(StandardCharsets.UTF_8), attr)); } - assertEquals(kafkaSink.sinkMetric.sinkSuccessCount.get(), count); } } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java index b9f2ab7f8c1..0ff07d0e42f 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java @@ -17,10 +17,8 @@ package org.apache.inlong.agent.plugin.sinks; -import org.apache.inlong.agent.conf.JobProfile; -import org.apache.inlong.agent.message.BatchProxyMessage; +import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.plugin.Message; -import org.apache.inlong.agent.plugin.MessageFilter; import org.apache.inlong.agent.utils.AgentUtils; import org.slf4j.Logger; @@ -32,7 +30,6 @@ import static org.apache.inlong.agent.constant.JobConstants.JOB_CYCLE_UNIT; import static org.apache.inlong.agent.constant.JobConstants.JOB_DATA_TIME; -import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID; public class MockSink extends AbstractSink { @@ -49,18 +46,8 @@ public MockSink() { } @Override - public void write(Message message) { - if (message != null) { - messages.add(message); - number.incrementAndGet(); - BatchProxyMessage msg = new BatchProxyMessage(); - msg.setJobId(jobInstanceId); - // increment the count of successful sinks - sinkMetric.sinkSuccessCount.incrementAndGet(); - } else { - // increment the count of failed sinks - sinkMetric.sinkFailCount.incrementAndGet(); - } + public boolean write(Message message) { + return true; } @Override @@ -69,14 +56,8 @@ public void setSourceName(String sourceFileName) { } @Override - public MessageFilter initMessageFilter(JobProfile jobConf) { - return null; - } - - @Override - public void init(JobProfile jobConf) { + public void init(InstanceProfile jobConf) { super.init(jobConf); - jobInstanceId = jobConf.get(JOB_INSTANCE_ID); dataTime = AgentUtils.timeStrConvertToMillSec(jobConf.get(JOB_DATA_TIME, ""), jobConf.get(JOB_CYCLE_UNIT, "")); sourceFileName = "test"; @@ -88,6 +69,11 @@ public void destroy() { LOGGER.info("destroy mockSink, sink line number is : {}", number.get()); } + @Override + public boolean sinkFinish() { + return false; + } + public List getResult() { return messages; } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java index 07ce560a047..c12d0db7bb1 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java @@ -17,9 +17,13 @@ package org.apache.inlong.agent.plugin.sinks; -import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.message.ProxyMessage; import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; +import org.apache.inlong.agent.plugin.sinks.filecollect.TestSenderManager; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.common.enums.TaskStateEnum; import org.junit.BeforeClass; import org.junit.Test; @@ -30,24 +34,24 @@ import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_GROUP_ID; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID; -import static org.junit.Assert.assertEquals; public class PulsarSinkTest { private static MockSink pulsarSink; - private static JobProfile jobProfile; + private static InstanceProfile profile; private static AgentBaseTestsHelper helper; + private static final ClassLoader LOADER = TestSenderManager.class.getClassLoader(); @BeforeClass public static void setUp() throws Exception { - helper = new AgentBaseTestsHelper(PulsarSinkTest.class.getName()).setupAgentHome(); - jobProfile = JobProfile.parseJsonFile("pulsarSinkJob.json"); - jobProfile.set("job.mqClusters", - "[{\"url\":\"mqurl\",\"token\":\"token\",\"mqType\":\"PULSAR\",\"params\":{}}]"); - jobProfile.set("job.topicInfo", "{\"topic\":\"topic\",\"inlongGroupId\":\"groupId\"}"); - System.out.println(jobProfile.toJsonStr()); + String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); + helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome(); + String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; + TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D"); + profile = taskProfile.createInstanceProfile("", fileName, + taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime()); pulsarSink = new MockSink(); - pulsarSink.init(jobProfile); + pulsarSink.init(profile); } @Test @@ -60,7 +64,6 @@ public void testWrite() { for (long i = 0; i < 5; i++) { pulsarSink.write(new ProxyMessage(body.getBytes(StandardCharsets.UTF_8), attr)); } - assertEquals(pulsarSink.sinkMetric.sinkSuccessCount.get(), count); } }