Skip to content

Commit 3dffe82

Browse files
committed
Changed data structures to slightly more concurency friendly
1 parent a2cbd4b commit 3dffe82

File tree

3 files changed

+22
-69
lines changed

3 files changed

+22
-69
lines changed

src/nl/uu/cs/iss/ga/sim2apl/core/deliberation/DeliberationRunnable.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@
66
import nl.uu.cs.iss.ga.sim2apl.core.platform.Platform;
77

88
import java.util.ArrayList;
9+
import java.util.Collections;
10+
import java.util.List;
11+
import java.util.Objects;
912
import java.util.concurrent.Callable;
13+
import java.util.stream.Collectors;
1014

1115
/**
1216
* A deliberation runnable implements how an agent is executed. This is done by
@@ -15,7 +19,7 @@
1519
*
1620
* @author Bas Testerink
1721
*/
18-
public final class DeliberationRunnable<T> implements Callable<ReschedulableResult<T>> {
22+
public final class DeliberationRunnable<T> implements Callable<List<T>> {
1923
/** Interface to obtain the relevant agent's data. */
2024
private final nl.uu.cs.iss.ga.sim2apl.core.agent.Agent<T> agent;
2125
/** Interface to the relevant platform functionalities. */
@@ -44,11 +48,10 @@ public DeliberationRunnable(final nl.uu.cs.iss.ga.sim2apl.core.agent.Agent<T> ag
4448
* killed and removed from the platform.
4549
*/
4650
@Override
47-
public ReschedulableResult<T> call(){
51+
public List<T> call(){
4852
if(!this.agent.isDone()){ // Check first if agent was killed outside of this runnable
4953
// Clear intended actions potential previous deliberation cycle
5054
this.intendedActions = new ArrayList<>();
51-
boolean reschedule = false;
5255

5356
try {
5457
// Go through the cycle and execute each step.
@@ -58,7 +61,7 @@ public ReschedulableResult<T> call(){
5861
}
5962

6063
for(DeliberationActionStep<T> step : this.agent.getActCycle()) {
61-
this.intendedActions.addAll(step.execute());
64+
this.intendedActions.addAll(step.execute().stream().filter(Objects::nonNull).collect(Collectors.toList()));
6265
}
6366

6467
// If all deliberation steps are finished, then check whether
@@ -70,8 +73,7 @@ public ReschedulableResult<T> call(){
7073
initiateShutdown(this.agent);
7174
} else {
7275
if (!this.agent.checkSleeping()) { // If the agents goes to sleep then it will be woken upon any external input (message, external trigger)
73-
// reschedule();
74-
reschedule = true;
76+
reschedule();
7577
} else {
7678
Platform.getLogger().log(DeliberationRunnable.class, String.format("Agent %s going to sleep",
7779
agent.getAID().getName()));
@@ -87,13 +89,12 @@ public ReschedulableResult<T> call(){
8789
}
8890

8991
// Produce the set of intended actions
90-
return new ReschedulableResult<T>(this, intendedActions, reschedule);
92+
return intendedActions;
9193
} else {
9294
initiateShutdown(agent);
9395

9496
// An agent that shuts down will no longer perform actions
95-
// return Collections.emptyList();
96-
return new ReschedulableResult<>(this, intendedActions, false);
97+
return Collections.emptyList();
9798
}
9899
}
99100

src/nl/uu/cs/iss/ga/sim2apl/core/tick/DefaultBlockingTickExecutor.java

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,9 @@
22

33
import nl.uu.cs.iss.ga.sim2apl.core.agent.AgentID;
44
import nl.uu.cs.iss.ga.sim2apl.core.deliberation.DeliberationRunnable;
5-
import nl.uu.cs.iss.ga.sim2apl.core.deliberation.ReschedulableResult;
65

76
import java.util.*;
87
import java.util.concurrent.*;
9-
import java.util.stream.Collectors;
108

119
/**
1210
* A default time step executor that uses a ThreadPoolExecutor to run the agents when the tick needs
@@ -53,17 +51,16 @@ public DefaultBlockingTickExecutor(int nThreads, Random random) {
5351
this.random = random;
5452
}
5553

54+
public <X> List<Future<X>> useExecutorForTasks(Collection<? extends Callable<X>> tasks) throws InterruptedException {
55+
return this.executor.invokeAll(tasks);
56+
}
57+
5658
/**
5759
* {@inheritDoc}
5860
*/
5961
@Override
6062
public boolean scheduleForNextTick(DeliberationRunnable<T> agentDeliberationRunnable) {
6163
this.scheduledRunnables.put(agentDeliberationRunnable.getAgentID(), agentDeliberationRunnable);
62-
// if (!this.scheduledRunnables.containsKey(agentDeliberationRunnable.getAgentID())) {
63-
// this.scheduledRunnables.add(agentDeliberationRunnable);
64-
// return true;
65-
// }
66-
// return false;
6764
return true;
6865
}
6966

@@ -88,17 +85,9 @@ public HashMap<AgentID, List<T>> doTick() {
8885

8986
long startTime = System.currentTimeMillis();
9087
try {
91-
List<Future<ReschedulableResult<T>>> currentAgentFutures = this.executor.invokeAll(runnables);
92-
// for(int i = 0; i < currentAgentFutures.size(); i++) {
93-
// agentPlanActions.put(runnables.get(i).getAgentID(),
94-
// currentAgentFutures.get(i).get().getResult().stream().filter(Objects::nonNull).collect(Collectors.toList())); // TODO will this work?
95-
// }
96-
for(Future<ReschedulableResult<T>> futures : currentAgentFutures) {
97-
ReschedulableResult<T> result = futures.get();
98-
agentPlanActions.put(result.getAgentID(), result.getResult().stream().filter(Objects::nonNull).collect(Collectors.toList()));
99-
if(result.isReschedule()) {
100-
this.scheduleForNextTick(result.getDeliberationRunnable());
101-
}
88+
List<Future<List<T>>> currentAgentFutures = this.executor.invokeAll(runnables);
89+
for(int i = 0; i < currentAgentFutures.size(); i++) {
90+
agentPlanActions.put(runnables.get(i).getAgentID(), currentAgentFutures.get(i).get());
10291
}
10392
} catch (InterruptedException | ExecutionException e) {
10493
e.printStackTrace();
@@ -141,12 +130,8 @@ public int getLastTickDuration() {
141130
public List<AgentID> getScheduledAgents() {
142131
List<AgentID> scheduledAgents = new ArrayList<>();
143132
synchronized (this.scheduledRunnables) {
144-
// for(DeliberationRunnable<T> runnable : this.scheduledRunnables) {
145-
// scheduledAgents.add(runnable.getAgentID());
146-
// }
147133
return new ArrayList<>(this.scheduledRunnables.keySet());
148134
}
149-
// return scheduledAgents;
150135
}
151136

152137
/**

src/nl/uu/cs/iss/ga/sim2apl/core/tick/matrix/MatrixAgentThread.java

Lines changed: 5 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -64,25 +64,14 @@ public void run() {
6464
long startTime = System.currentTimeMillis();
6565

6666
List<DeliberationRunnable<T>> runnables = inq.take();
67-
List<Future<ReschedulableResult<T>>> agentActionFutures = this.executor.invokeAll(runnables);
67+
List<Future<List<T>>> agentActionFutures = this.executor.invokeAll(runnables);
6868

6969
try {
70-
// for(int i = 0; i < runnables.size(); i++) {
71-
// List<String> agentActions = agentActionFutures.get(i).get().stream()
72-
// .filter(Objects::nonNull).map(gson::toJson).collect(Collectors.toList());
73-
// JsonObject update = new JsonObject();
74-
// update.addProperty("agentID", runnables.get(i).getAgentID().toString());
75-
// update.add("actions", gson.toJsonTree(agentActions, arrayListStringType));
76-
// JsonArray updates = new JsonArray();
77-
// updates.add(update);
78-
// this.proxy.register_events(agentproc_id, updates);
79-
// }
80-
for(Future<ReschedulableResult<T>> future : agentActionFutures) {
81-
ReschedulableResult<T> result = future.get();
82-
List<String> agentActions = result.getResult().stream().filter(Objects::nonNull)
83-
.map(gson::toJson).collect(Collectors.toList());
70+
for(int i = 0; i < runnables.size(); i++) {
71+
List<String> agentActions = agentActionFutures.get(i).get().stream()
72+
.filter(Objects::nonNull).map(gson::toJson).collect(Collectors.toList());
8473
JsonObject update = new JsonObject();
85-
update.addProperty("agentID", result.getAgentID().toString());
74+
update.addProperty("agentID", runnables.get(i).getAgentID().toString());
8675
update.add("actions", gson.toJsonTree(agentActions, arrayListStringType));
8776
JsonArray updates = new JsonArray();
8877
updates.add(update);
@@ -92,28 +81,6 @@ public void run() {
9281
LOG.severe("Error running runnable: " + ex.toString());
9382
ex.printStackTrace();
9483
}
95-
// for(DeliberationRunnable<T> dr : runnables) {
96-
// try {
97-
// List<T> currentAgentActions = this.executor.submit(dr).get();
98-
// currentAgentActions = currentAgentActions.stream().filter(Objects::nonNull).collect(Collectors.toList());
99-
//
100-
// ArrayList<String> currentAgentActionStrings = new ArrayList<>();
101-
// for (Object action: currentAgentActions) {
102-
// currentAgentActionStrings.add((String) action);
103-
// }
104-
// String agentID = dr.getAgentID().toString();
105-
//
106-
// JsonObject update = new JsonObject();
107-
// update.addProperty("agentID", agentID);
108-
// update.add("actions", gson.toJsonTree(currentAgentActionStrings, arrayListStringType));
109-
// JsonArray updates = new JsonArray();
110-
// updates.add(update);
111-
// this.proxy.register_events(agentproc_id, updates);
112-
// } catch (InterruptedException | ExecutionException ex) {
113-
// LOG.severe("Error running runnable: " + ex.toString());
114-
// ex.printStackTrace();
115-
// }
116-
// }
11784
long stepDuration = (System.currentTimeMillis() - startTime);
11885
LOG.info(String.format("Agent thread %d: Round %d: Event production took %d ms", agentproc_id, cur_round, stepDuration));
11986
}

0 commit comments

Comments
 (0)