Skip to content

Commit 4ee2ef2

Browse files
committed
[MINOR] Extended DML config for federated worker parallelism
sysds.federated.par_conn (number of concurrent connections in even loop) sysds.federated.par_inst (instruction parallelism worker instructions) For both, if the value is <=0, we use by default the number of virtual cores as reported by the JVM.
1 parent a865d75 commit 4ee2ef2

File tree

5 files changed

+20
-3
lines changed

5 files changed

+20
-3
lines changed

conf/SystemDS-config.xml.template

+6
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,12 @@
118118
<!-- set the federated plan generator (none, [runtime], compile_fed_all, compile_fed_heuristic, compile_cost_based) -->
119119
<sysds.federated.planner>runtime</sysds.federated.planner>
120120

121+
<!-- set the degree of parallelism of the federated worker event loop (<=0 means number of virtual cores) -->
122+
<sysds.federated.par_conn>0</sysds.federated.par_conn>
123+
124+
<!-- set the degree of parallelism of the federated worker instructions (<=0 means number of virtual cores) -->
125+
<sysds.federated.par_inst>0</sysds.federated.par_inst>
126+
121127
<!-- set buffer pool threshold (max size) in % of total heap -->
122128
<sysds.caching.bufferpoollimit>15</sysds.caching.bufferpoollimit>
123129

src/main/java/org/apache/sysds/conf/DMLConfig.java

+4
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ public class DMLConfig
116116
public static final String DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT = "sysds.federated.initialization.timeout"; // int seconds
117117
public static final String FEDERATED_TIMEOUT = "sysds.federated.timeout"; // single request timeout default -1 to indicate infinite.
118118
public static final String FEDERATED_PLANNER = "sysds.federated.planner";
119+
public static final String FEDERATED_PAR_INST = "sysds.federated.par_inst";
120+
public static final String FEDERATED_PAR_CONN = "sysds.federated.par_conn";
119121
public static final int DEFAULT_FEDERATED_PORT = 4040; // borrowed default Spark Port
120122
public static final int DEFAULT_NUMBER_OF_FEDERATED_WORKER_THREADS = 2;
121123

@@ -181,6 +183,8 @@ public class DMLConfig
181183
_defaultVals.put(DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT, "10");
182184
_defaultVals.put(FEDERATED_TIMEOUT, "-1");
183185
_defaultVals.put(FEDERATED_PLANNER, FederatedPlanner.RUNTIME.name());
186+
_defaultVals.put(FEDERATED_PAR_CONN, "-1"); // vcores
187+
_defaultVals.put(FEDERATED_PAR_INST, "-1"); // vcores
184188
}
185189

186190
public DMLConfig() {

src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDCZero.java

+2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
2929

3030
public abstract class ASDCZero extends APreAgg {
31+
private static final long serialVersionUID = -69266306137398807L;
32+
3133
/** Sparse row indexes for the data */
3234
protected AOffset _indexes;
3335

src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.log4j.Logger;
4545
import org.apache.sysds.conf.ConfigurationManager;
4646
import org.apache.sysds.conf.DMLConfig;
47+
import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
4748
import org.apache.sysds.runtime.lineage.LineageCacheConfig;
4849

4950
public class FederatedWorker {
@@ -67,7 +68,8 @@ public FederatedWorker(int port, boolean debug) {
6768

6869
public void run() throws CertificateException, SSLException {
6970
log.info("Setting up Federated Worker on port " + _port);
70-
final int EVENT_LOOP_THREADS = Math.max(4, Runtime.getRuntime().availableProcessors() * 4);
71+
int par_conn = ConfigurationManager.getDMLConfig().getIntValue(DMLConfig.FEDERATED_PAR_CONN);
72+
final int EVENT_LOOP_THREADS = (par_conn > 0) ? par_conn : InfrastructureAnalyzer.getLocalParallelism();
7173
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
7274
ThreadPoolExecutor workerTPE = new ThreadPoolExecutor(1, Integer.MAX_VALUE,
7375
10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(true));

src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.sysds.common.Types.DataType;
3838
import org.apache.sysds.common.Types.FileFormat;
3939
import org.apache.sysds.conf.ConfigurationManager;
40+
import org.apache.sysds.conf.DMLConfig;
4041
import org.apache.sysds.parser.DataExpression;
4142
import org.apache.sysds.runtime.DMLRuntimeException;
4243
import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
@@ -48,6 +49,7 @@
4849
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
4950
import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
5051
import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse.ResponseType;
52+
import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
5153
import org.apache.sysds.runtime.instructions.Instruction;
5254
import org.apache.sysds.runtime.instructions.Instruction.IType;
5355
import org.apache.sysds.runtime.instructions.InstructionParser;
@@ -436,8 +438,9 @@ private FederatedResponse execInstruction(FederatedRequest request, ExecutionCon
436438

437439
// set the number of threads according to the number of processors on the federated worker
438440
if(receivedInstruction.getOperator() instanceof MultiThreadedOperator) {
439-
int numProcessors = Runtime.getRuntime().availableProcessors();
440-
((MultiThreadedOperator)receivedInstruction.getOperator()).setNumThreads(numProcessors);
441+
int par_inst = ConfigurationManager.getDMLConfig().getIntValue(DMLConfig.FEDERATED_PAR_INST);
442+
((MultiThreadedOperator)receivedInstruction.getOperator())
443+
.setNumThreads((par_inst > 0) ? par_inst : InfrastructureAnalyzer.getLocalParallelism());
441444
}
442445

443446
BasicProgramBlock pb = new BasicProgramBlock(null);

0 commit comments

Comments
 (0)