From 083613f7e5d3720af45ec4e48ba84b836402a7e9 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Thu, 13 Feb 2025 21:21:59 +0800 Subject: [PATCH] [INLONG-11760][Agent] Increase the number of global instances control --- .../inlong/agent/constant/AgentConstants.java | 2 ++ .../agent/constant/CommonConstants.java | 2 +- .../apache/inlong/agent/plugin/file/Task.java | 2 ++ .../inlong/agent/core/HeartbeatManager.java | 2 +- .../agent/core/instance/InstanceManager.java | 21 +++++++++++++++++-- .../inlong/agent/core/task/TaskManager.java | 8 +++++++ .../inlong/agent/plugin/sinks/ProxySink.java | 4 ++-- .../inlong/agent/plugin/sinks/Sender.java | 4 ++-- .../agent/plugin/task/AbstractTask.java | 7 ++++++- .../inlong/agent/plugin/task/CronTask.java | 5 +++++ .../plugin/instance/TestInstanceManager.java | 2 +- .../inlong/agent/plugin/task/MockTask.java | 5 +++++ 12 files changed, 54 insertions(+), 10 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java index 0c73c12852a..45ba7892506 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java @@ -81,6 +81,8 @@ public class AgentConstants { public static final String DEFAULT_AGENT_SCAN_RANGE_DAY = "-2"; public static final String DEFAULT_AGENT_SCAN_RANGE_HOUR = "-10"; public static final String DEFAULT_AGENT_SCAN_RANGE_MINUTE = "-600"; + public static final String AGENT_INSTANCE_LIMIT = "agent.instance.limit"; + public static final int DEFAULT_AGENT_INSTANCE_LIMIT = 100; // pulsar sink config public static final String PULSAR_CLIENT_IO_TREHAD_NUM = "agent.sink.pulsar.client.io.thread.num"; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java index 757db41afdd..db0e509718f 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java @@ -29,7 +29,7 @@ public class CommonConstants { public static final String DEFAULT_PROXY_INLONG_STREAM_ID = "default_inlong_stream_id"; public static final String PROXY_TOTAL_ASYNC_PROXY_SIZE = "proxy.total.async.proxy.size"; - public static final int DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE = 200 * 1024 * 1024; + public static final int DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE_KB = 200 * 1024; public static final String PROXY_ALIVE_CONNECTION_NUM = "proxy.alive.connection.num"; public static final int DEFAULT_PROXY_ALIVE_CONNECTION_NUM = 10; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java index 40ad855db79..ea1577917cc 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java @@ -54,4 +54,6 @@ public abstract class Task extends AbstractStateWrapper { * is profile valid */ public abstract boolean isProfileValid(TaskProfile profile); + + public abstract int getInstanceNum(); } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java index 82cd5cb6484..d1d759cd0af 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java @@ -202,7 +202,7 @@ private void createMessageSender() { proxyClientConfig = new TcpMsgSenderConfig(managerAddr, INLONG_AGENT_SYSTEM, authSecretId, authSecretKey); proxyClientConfig.setMaxInFlightSizeInKb( - CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE / 1024); + CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE_KB); proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM); proxyClientConfig.setNettyWorkerThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM); proxyClientConfig.setRequestTimeoutMs(30000L); diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index 23c16cabe8c..65a584fa8a5 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -22,6 +22,8 @@ import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.core.task.TaskManager; import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.Instance; import org.apache.inlong.agent.store.InstanceStore; @@ -81,6 +83,8 @@ public class InstanceManager extends AbstractDaemon { private long auditVersion; private volatile boolean running = false; private final double reserveCoefficient = 0.8; + protected TaskManager taskManager; + private final int globalInstanceLimit; private class InstancePrintStat { @@ -118,7 +122,9 @@ public String toString() { /** * Init task manager. */ - public InstanceManager(String taskId, int instanceLimit, Store basicStore, TaskStore taskStore) { + public InstanceManager(TaskManager taskManager, String taskId, int instanceLimit, Store basicStore, + TaskStore taskStore) { + this.taskManager = taskManager; this.taskId = taskId; instanceStore = new InstanceStore(basicStore); this.taskStore = taskStore; @@ -127,6 +133,8 @@ public InstanceManager(String taskId, int instanceLimit, Store basicStore, TaskS this.instanceLimit = instanceLimit; actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY); addActionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY); + globalInstanceLimit = agentConf.getInt(AgentConstants.AGENT_INSTANCE_LIMIT, + AgentConstants.DEFAULT_AGENT_INSTANCE_LIMIT); } public String getTaskId() { @@ -292,8 +300,12 @@ private void dealWithActionQueue() { private void dealWithAddActionQueue() { while (isRunnable()) { + if (taskManager != null && taskManager.getInstanceNum() > globalInstanceLimit) { + LOGGER.error("global instance num {} over limit {}", taskManager.getInstanceNum(), globalInstanceLimit); + return; + } if (instanceMap.size() > instanceLimit) { - LOGGER.error("instanceMap size {} over limit {}", instanceMap.size(), instanceLimit); + LOGGER.error("task {} instanceMap size {} over limit {}", taskId, instanceMap.size(), instanceLimit); return; } InstanceAction action = addActionQueue.poll(); @@ -507,4 +519,9 @@ public long getFinishedInstanceCount() { } return count; } + + public int getInstanceNum() { + + return instanceMap.size(); + } } \ No newline at end of file diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java index 6db94a546b6..201318a30ff 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java @@ -527,6 +527,14 @@ public Task getTask(String taskId) { return taskMap.get(taskId); } + public int getInstanceNum() { + int num = 0; + for (Task task : taskMap.values()) { + num += task.getInstanceNum(); + } + return num; + } + public TaskProfile getTaskProfile(String taskId) { return taskStore.getTask(taskId); } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java index 92bfaa427f5..288878a43be 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java @@ -275,8 +275,8 @@ private void doFlushOffset() { } MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, lenToRelease); if (info != null) { - LOGGER.info("save offset {} taskId {} instanceId {}", info.getOffset(), profile.getTaskId(), - profile.getInstanceId()); + LOGGER.info("save offset {} taskId {} instanceId {} ackInfoList {}", info.getOffset(), profile.getTaskId(), + profile.getInstanceId(), ackInfoList.size()); OffsetProfile offsetProfile = new OffsetProfile(profile.getTaskId(), profile.getInstanceId(), info.getOffset(), profile.get(INODE_INFO)); offsetManager.setOffset(offsetProfile); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java index 3195ee45c2c..a310bfcdba0 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java @@ -120,7 +120,7 @@ public Sender(InstanceProfile profile, String inlongGroupId, String sourcePath) totalAsyncBufSize = profile .getInt( CommonConstants.PROXY_TOTAL_ASYNC_PROXY_SIZE, - CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE); + CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE_KB); aliveConnectionNum = profile .getInt( CommonConstants.PROXY_ALIVE_CONNECTION_NUM, CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM); @@ -203,7 +203,7 @@ private AgentMetricItem getMetricItem(String groupId, String streamId) { private void createMessageSender() throws Exception { TcpMsgSenderConfig proxyClientConfig = new TcpMsgSenderConfig( managerAddr, inlongGroupId, authSecretId, authSecretKey); - proxyClientConfig.setMaxInFlightSizeInKb(totalAsyncBufSize / 1024); + proxyClientConfig.setMaxInFlightSizeInKb(totalAsyncBufSize); proxyClientConfig.setAliveConnections(aliveConnectionNum); proxyClientConfig.setRequestTimeoutMs(maxSenderTimeout * 1000L); proxyClientConfig.setNettyWorkerThreadNum(ioThreadNum); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java index ef8107c68ec..780a99b8448 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java @@ -60,7 +60,7 @@ public void init(Object srcManager, TaskProfile taskProfile, Store basicStore) t this.taskProfile = taskProfile; this.basicStore = basicStore; auditVersion = Long.parseLong(taskProfile.get(TASK_AUDIT_VERSION)); - instanceManager = new InstanceManager(taskProfile.getTaskId(), getInstanceLimit(), + instanceManager = new InstanceManager(taskManager, taskProfile.getTaskId(), getInstanceLimit(), basicStore, taskManager.getTaskStore()); try { instanceManager.start(); @@ -163,4 +163,9 @@ protected boolean shouldAddAgain(String fileName, long lastModifyTime) { protected boolean isFull() { return instanceManager.isFull(); } + + @Override + public int getInstanceNum() { + return instanceManager.getInstanceNum(); + } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java index 8f333b8d465..deecc120f38 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java @@ -56,6 +56,11 @@ public boolean isProfileValid(TaskProfile profile) { return true; } + @Override + public int getInstanceNum() { + return 0; + } + @Override public void addCallbacks() { diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java index 4340bd6176c..172387bce27 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java @@ -83,7 +83,7 @@ public static void setup() { Store taskBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK); TaskStore taskStore = new TaskStore(taskBasicStore); taskStore.storeTask(taskProfile); - manager = new InstanceManager("1", 20, basicInstanceStore, taskStore); + manager = new InstanceManager(null, "1", 20, basicInstanceStore, taskStore); manager.CORE_THREAD_SLEEP_TIME_MS = 100; } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockTask.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockTask.java index acfce5bb9a7..32faf568d25 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockTask.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockTask.java @@ -62,6 +62,11 @@ public boolean isProfileValid(TaskProfile profile) { return true; } + @Override + public int getInstanceNum() { + return 0; + } + @Override public void addCallbacks() {