Skip to content

Commit

Permalink
[INLONG-9600][Agent] Adjust the sinks directory for code consistency (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang authored Jan 23, 2024
1 parent ec1b01a commit deef029
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 338 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,22 @@

package org.apache.inlong.agent.plugin.sinks;

import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.message.PackProxyMessage;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.message.filecollect.ProxyMessageCache;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.plugin.Sink;
import org.apache.inlong.agent.plugin.file.Sink;
import org.apache.inlong.common.metric.MetricRegister;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.inlong.agent.constant.AgentConstants.AGENT_MESSAGE_FILTER_CLASSNAME;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
Expand All @@ -60,39 +52,26 @@ public abstract class AbstractSink implements Sink {
protected Map<String, String> dimensions;
protected static final AtomicLong METRIC_INDEX = new AtomicLong(0);

protected JobProfile jobConf;
protected InstanceProfile profile;
protected String sourceName;
protected String jobInstanceId;
protected int batchFlushInterval;
// key is stream id, value is a batch of messages belongs to the same stream id
protected ConcurrentHashMap<String, PackProxyMessage> cache;

@Override
public MessageFilter initMessageFilter(JobProfile jobConf) {
if (jobConf.hasKey(AGENT_MESSAGE_FILTER_CLASSNAME)) {
try {
return (MessageFilter) Class.forName(jobConf.get(AGENT_MESSAGE_FILTER_CLASSNAME))
.getDeclaredConstructor().newInstance();
} catch (Exception e) {
LOGGER.error("init message filter error", e);
}
}
return null;
}
// key is stream id, value is a batch of messages belong to the same stream id
protected ProxyMessageCache cache;

@Override
public void setSourceName(String sourceFileName) {
this.sourceName = sourceFileName;
}

@Override
public void init(JobProfile jobConf) {
this.jobConf = jobConf;
jobInstanceId = jobConf.get(JOB_INSTANCE_ID);
inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
cache = new ConcurrentHashMap<>(10);
batchFlushInterval = jobConf.getInt(PROXY_BATCH_FLUSH_INTERVAL, DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
public void init(InstanceProfile profile) {
this.profile = profile;
jobInstanceId = profile.getInstanceId();
inlongGroupId = profile.getInlongGroupId();
inlongStreamId = profile.getInlongStreamId();
cache = new ProxyMessageCache(this.profile, inlongGroupId, inlongStreamId);
batchFlushInterval = profile.getInt(PROXY_BATCH_FLUSH_INTERVAL, DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);

this.dimensions = new HashMap<>();
dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@

package org.apache.inlong.agent.plugin.sinks;

import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;

import java.nio.charset.StandardCharsets;

/**
* message write to console
Expand All @@ -33,15 +30,8 @@ public ConsoleSink() {
}

@Override
public void write(Message message) {
if (message != null) {
System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
// increment the count of successful sinks
sinkMetric.sinkSuccessCount.incrementAndGet();
} else {
// increment the count of failed sinks
sinkMetric.sinkFailCount.incrementAndGet();
}
public boolean write(Message message) {
return true;
}

@Override
Expand All @@ -50,17 +40,17 @@ public void setSourceName(String sourceFileName) {
}

@Override
public MessageFilter initMessageFilter(JobProfile jobConf) {
return null;
}

@Override
public void init(JobProfile jobConf) {
public void init(InstanceProfile jobConf) {
super.init(jobConf);
}

@Override
public void destroy() {

}

@Override
public boolean sinkFinish() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,11 @@

import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.message.BatchProxyMessage;
import org.apache.inlong.agent.message.EndMessage;
import org.apache.inlong.agent.message.PackProxyMessage;
import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -88,7 +84,7 @@ public class KafkaSink extends AbstractSink {
private boolean asyncSend;

@Override
public void init(JobProfile jobConf) {
public void init(InstanceProfile jobConf) {
super.init(jobConf);
int sendQueueSize = agentConf.getInt(KAFKA_SINK_SEND_QUEUE_SIZE, DEFAULT_SEND_QUEUE_SIZE);
kafkaSendQueue = new LinkedBlockingQueue<>(sendQueueSize);
Expand All @@ -103,38 +99,11 @@ public void init(JobProfile jobConf) {
kafkaSenders = new ArrayList<>();
initKafkaSender();
EXECUTOR_SERVICE.execute(sendDataThread());
EXECUTOR_SERVICE.execute(flushCache());
}

@Override
public void write(Message message) {
if (message == null || message instanceof EndMessage) {
return;
}

try {
ProxyMessage proxyMessage = new ProxyMessage(message);
// add proxy message to cache.
cache.compute(proxyMessage.getBatchKey(),
(s, packProxyMessage) -> {
if (packProxyMessage == null) {
packProxyMessage =
new PackProxyMessage(jobInstanceId, jobConf, inlongGroupId, inlongStreamId);
packProxyMessage.generateExtraMap(proxyMessage.getDataKey());
packProxyMessage.addTopicAndDataTime(topic, System.currentTimeMillis());
}
// add message to package proxy
packProxyMessage.addProxyMessage(proxyMessage);
return packProxyMessage;
});
// increment the count of successful sinks
sinkMetric.sinkSuccessCount.incrementAndGet();
} catch (Exception e) {
sinkMetric.sinkFailCount.incrementAndGet();
LOGGER.error("write job[{}] data to cache error", jobInstanceId, e);
} catch (Throwable t) {
ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
}
public boolean write(Message message) {
return true;
}

@Override
Expand All @@ -153,47 +122,8 @@ public void destroy() {
}
}

private boolean sinkFinish() {
return cache.values().stream().allMatch(PackProxyMessage::isEmpty) && kafkaSendQueue.isEmpty();
}

/**
* flush cache by batch
*
* @return thread runner
*/
private Runnable flushCache() {
return () -> {
LOGGER.info("start kafka sink flush cache thread, job[{}], groupId[{}]", jobInstanceId, inlongGroupId);
while (!shutdown) {
try {
cache.forEach((batchKey, packProxyMessage) -> {
BatchProxyMessage batchProxyMessage = packProxyMessage.fetchBatch();
if (batchProxyMessage == null) {
return;
}
try {
kafkaSendQueue.put(batchProxyMessage);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"send group id {}, message key {},with message size {}, the job id is {}, "
+ "read source is {} sendTime is {}",
inlongGroupId, batchKey,
batchProxyMessage.getDataList().size(), jobInstanceId, sourceName,
batchProxyMessage.getDataTime());
}
} catch (Exception e) {
LOGGER.error("flush job[{}] data to send queue exception", jobInstanceId, e);
}
});
AgentUtils.silenceSleepInMs(batchFlushInterval);
} catch (Exception ex) {
LOGGER.error("error caught", ex);
} catch (Throwable t) {
ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
}
}
};
public boolean sinkFinish() {
return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.agent.plugin.sinks.filecollect;
package org.apache.inlong.agent.plugin.sinks;

import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.InstanceProfile;
Expand All @@ -29,6 +29,7 @@
import org.apache.inlong.agent.message.filecollect.SenderMessage;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.plugin.sinks.filecollect.SenderManager;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,11 @@

import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.message.BatchProxyMessage;
import org.apache.inlong.agent.message.EndMessage;
import org.apache.inlong.agent.message.PackProxyMessage;
import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;

Expand Down Expand Up @@ -115,7 +111,7 @@ public class PulsarSink extends AbstractSink {
private boolean asyncSend;

@Override
public void init(JobProfile jobConf) {
public void init(InstanceProfile jobConf) {
super.init(jobConf);
// agentConf
sendQueueSize = agentConf.getInt(PULSAR_SINK_SEND_QUEUE_SIZE, DEFAULT_SEND_QUEUE_SIZE);
Expand Down Expand Up @@ -149,41 +145,11 @@ public void init(JobProfile jobConf) {
pulsarSenders = new ArrayList<>();
initPulsarSender();
EXECUTOR_SERVICE.execute(sendDataThread());
EXECUTOR_SERVICE.execute(flushCache());
}

@Override
public void write(Message message) {
try {
if (message != null) {
if (!(message instanceof EndMessage)) {
ProxyMessage proxyMessage = new ProxyMessage(message);
// add proxy message to cache.
cache.compute(proxyMessage.getBatchKey(),
(s, packProxyMessage) -> {
if (packProxyMessage == null) {
packProxyMessage =
new PackProxyMessage(jobInstanceId, jobConf, inlongGroupId, inlongStreamId);
packProxyMessage.generateExtraMap(proxyMessage.getDataKey());
packProxyMessage.addTopicAndDataTime(topic, System.currentTimeMillis());
}
// add message to package proxy
packProxyMessage.addProxyMessage(proxyMessage);
return packProxyMessage;
});
// increment the count of successful sinks
sinkMetric.sinkSuccessCount.incrementAndGet();
} else {
// increment the count of failed sinks
sinkMetric.sinkFailCount.incrementAndGet();
}
}
} catch (Exception e) {
LOGGER.error("write message to Proxy sink error", e);
} catch (Throwable t) {
ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
}

public boolean write(Message message) {
return true;
}

@Override
Expand All @@ -203,45 +169,8 @@ public void destroy() {
}
}

private boolean sinkFinish() {
return cache.values().stream().allMatch(PackProxyMessage::isEmpty) && pulsarSendQueue.isEmpty();
}

/**
* flush cache by batch
*
* @return thread runner
*/
private Runnable flushCache() {
return () -> {
LOGGER.info("start flush cache thread for {} ProxySink", inlongGroupId);
while (!shutdown) {
try {
cache.forEach((batchKey, packProxyMessage) -> {
BatchProxyMessage batchProxyMessage = packProxyMessage.fetchBatch();
if (batchProxyMessage != null) {
try {
sendQueueSemaphore.acquire();
pulsarSendQueue.put(batchProxyMessage);
LOGGER.info("send group id {}, message key {},with message size {}, the job id is {}, "
+ "read source is {} sendTime is {}", inlongGroupId, batchKey,
batchProxyMessage.getDataList().size(), jobInstanceId, sourceName,
batchProxyMessage.getDataTime());
} catch (Exception e) {
sendQueueSemaphore.release();
LOGGER.error("flush data to send queue", e);
}
}
});
} catch (Exception ex) {
LOGGER.error("error caught", ex);
} catch (Throwable t) {
ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
} finally {
AgentUtils.silenceSleepInMs(batchFlushInterval);
}
}
};
public boolean sinkFinish() {
return true;
}

/**
Expand Down
Loading

0 comments on commit deef029

Please sign in to comment.