Skip to content

Commit a2cbd4b

Browse files
committed
Agent avoid lock by returning a bool if they need to reschedule
1 parent 0283488 commit a2cbd4b

File tree

6 files changed

+99
-58
lines changed

6 files changed

+99
-58
lines changed

src/nl/uu/cs/iss/ga/sim2apl/core/deliberation/DeliberationRunnable.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
package nl.uu.cs.iss.ga.sim2apl.core.deliberation;
2-
3-
import nl.uu.cs.iss.ga.sim2apl.core.platform.Platform;
2+
43
import nl.uu.cs.iss.ga.sim2apl.core.agent.Agent;
54
import nl.uu.cs.iss.ga.sim2apl.core.agent.AgentID;
65
import nl.uu.cs.iss.ga.sim2apl.core.plan.PlanExecutionError;
6+
import nl.uu.cs.iss.ga.sim2apl.core.platform.Platform;
77

88
import java.util.ArrayList;
9-
import java.util.Collections;
10-
import java.util.List;
119
import java.util.concurrent.Callable;
1210

1311
/**
@@ -17,7 +15,7 @@
1715
*
1816
* @author Bas Testerink
1917
*/
20-
public final class DeliberationRunnable<T> implements Callable<List<T>> {
18+
public final class DeliberationRunnable<T> implements Callable<ReschedulableResult<T>> {
2119
/** Interface to obtain the relevant agent's data. */
2220
private final nl.uu.cs.iss.ga.sim2apl.core.agent.Agent<T> agent;
2321
/** Interface to the relevant platform functionalities. */
@@ -34,7 +32,7 @@ public final class DeliberationRunnable<T> implements Callable<List<T>> {
3432
public DeliberationRunnable(final nl.uu.cs.iss.ga.sim2apl.core.agent.Agent<T> agent, final Platform platform) {
3533
this.agent = agent;
3634
this.platform = platform;
37-
this.agent.setSelfRescheduler(new SelfRescheduler(this));
35+
this.agent.setSelfRescheduler(new SelfRescheduler<>(this));
3836
}
3937

4038
/**
@@ -46,10 +44,11 @@ public DeliberationRunnable(final nl.uu.cs.iss.ga.sim2apl.core.agent.Agent<T> ag
4644
* killed and removed from the platform.
4745
*/
4846
@Override
49-
public List<T> call(){
47+
public ReschedulableResult<T> call(){
5048
if(!this.agent.isDone()){ // Check first if agent was killed outside of this runnable
5149
// Clear intended actions potential previous deliberation cycle
5250
this.intendedActions = new ArrayList<>();
51+
boolean reschedule = false;
5352

5453
try {
5554
// Go through the cycle and execute each step.
@@ -71,7 +70,8 @@ public List<T> call(){
7170
initiateShutdown(this.agent);
7271
} else {
7372
if (!this.agent.checkSleeping()) { // If the agents goes to sleep then it will be woken upon any external input (message, external trigger)
74-
reschedule();
73+
// reschedule();
74+
reschedule = true;
7575
} else {
7676
Platform.getLogger().log(DeliberationRunnable.class, String.format("Agent %s going to sleep",
7777
agent.getAID().getName()));
@@ -87,12 +87,13 @@ public List<T> call(){
8787
}
8888

8989
// Produce the set of intended actions
90-
return intendedActions;
90+
return new ReschedulableResult<T>(this, intendedActions, reschedule);
9191
} else {
9292
initiateShutdown(agent);
9393

9494
// An agent that shuts down will no longer perform actions
95-
return Collections.emptyList();
95+
// return Collections.emptyList();
96+
return new ReschedulableResult<>(this, intendedActions, false);
9697
}
9798
}
9899

@@ -115,7 +116,7 @@ private void initiateShutdown(Agent<T> agent) {
115116
public final AgentID getAgentID(){ return this.agent.getAID(); }
116117

117118
/** Reschedule this deliberation runnable so it will be executed again in the future. */
118-
public final synchronized void reschedule(){
119+
public final void reschedule(){
119120
this.platform.scheduleForExecution(this);
120-
}
121+
}
121122
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package nl.uu.cs.iss.ga.sim2apl.core.deliberation;
2+
3+
import nl.uu.cs.iss.ga.sim2apl.core.agent.AgentID;
4+
5+
import java.util.List;
6+
7+
public class ReschedulableResult<T> {
8+
9+
final AgentID agentID;
10+
final DeliberationRunnable<T> deliberationRunnable;
11+
final boolean reschedule;
12+
final List<T> result;
13+
14+
public ReschedulableResult(DeliberationRunnable<T> deliberationRunnable, List<T> result, boolean reschedule) {
15+
this.deliberationRunnable = deliberationRunnable;
16+
this.agentID = deliberationRunnable.getAgentID();
17+
this.reschedule = reschedule;
18+
this.result = result;
19+
}
20+
21+
public AgentID getAgentID() {
22+
return agentID;
23+
}
24+
25+
public DeliberationRunnable<T> getDeliberationRunnable() {
26+
return deliberationRunnable;
27+
}
28+
29+
public boolean isReschedule() {
30+
return reschedule;
31+
}
32+
33+
public List<T> getResult() {
34+
return result;
35+
}
36+
}

src/nl/uu/cs/iss/ga/sim2apl/core/deliberation/SelfRescheduler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55
*
66
* @author Bas Testerink
77
*/
8-
public final class SelfRescheduler {
8+
public final class SelfRescheduler<T> {
99
/** The deliberation runnable that can be rescheduled. */
10-
private final DeliberationRunnable deliberationRunnable;
10+
private final DeliberationRunnable<T> deliberationRunnable;
1111

12-
public SelfRescheduler(final DeliberationRunnable deliberationRunnable){
12+
public SelfRescheduler(final DeliberationRunnable<T> deliberationRunnable){
1313
this.deliberationRunnable = deliberationRunnable;
1414
}
1515

src/nl/uu/cs/iss/ga/sim2apl/core/platform/Platform.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,9 +275,9 @@ public Set<AgentID> getLocalDirectoryFacilitators() {
275275
* @param deliberationRunnable Deliberation cycle to be executed sometime in the future.
276276
*/
277277
public final void scheduleForExecution(final DeliberationRunnable deliberationRunnable) {
278-
synchronized (this.tickExecutor) {
278+
// synchronized (this.tickExecutor) {
279279
this.tickExecutor.scheduleForNextTick(deliberationRunnable);
280-
}
280+
// }
281281
}
282282

283283
/**

src/nl/uu/cs/iss/ga/sim2apl/core/tick/DefaultBlockingTickExecutor.java

Lines changed: 28 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@
22

33
import nl.uu.cs.iss.ga.sim2apl.core.agent.AgentID;
44
import nl.uu.cs.iss.ga.sim2apl.core.deliberation.DeliberationRunnable;
5+
import nl.uu.cs.iss.ga.sim2apl.core.deliberation.ReschedulableResult;
56

67
import java.util.*;
7-
import java.util.concurrent.ExecutionException;
8-
import java.util.concurrent.ExecutorService;
9-
import java.util.concurrent.Executors;
10-
import java.util.concurrent.Future;
8+
import java.util.concurrent.*;
119
import java.util.stream.Collectors;
1210

1311
/**
@@ -29,15 +27,15 @@ public class DefaultBlockingTickExecutor<T> implements TickExecutor<T> {
2927
private final ExecutorService executor;
3028

3129
/** The list of agents scheduled for the next tick **/
32-
private final ArrayList<DeliberationRunnable<T>> scheduledRunnables;
30+
private final Map<AgentID, DeliberationRunnable<T>> scheduledRunnables;
3331

3432
/**
3533
* Default constructor
3634
* @param nThreads Number of threads to use to execute the agent's sense-reason-act cycles.
3735
*/
3836
public DefaultBlockingTickExecutor(int nThreads) {
3937
this.executor = Executors.newFixedThreadPool(nThreads);
40-
this.scheduledRunnables = new ArrayList<>();
38+
this.scheduledRunnables = new ConcurrentHashMap<>();
4139
}
4240

4341
/**
@@ -60,11 +58,13 @@ public DefaultBlockingTickExecutor(int nThreads, Random random) {
6058
*/
6159
@Override
6260
public boolean scheduleForNextTick(DeliberationRunnable<T> agentDeliberationRunnable) {
63-
if (!this.scheduledRunnables.contains(agentDeliberationRunnable)) {
64-
this.scheduledRunnables.add(agentDeliberationRunnable);
65-
return true;
66-
}
67-
return false;
61+
this.scheduledRunnables.put(agentDeliberationRunnable.getAgentID(), agentDeliberationRunnable);
62+
// if (!this.scheduledRunnables.containsKey(agentDeliberationRunnable.getAgentID())) {
63+
// this.scheduledRunnables.add(agentDeliberationRunnable);
64+
// return true;
65+
// }
66+
// return false;
67+
return true;
6868
}
6969

7070
/**
@@ -75,7 +75,7 @@ public HashMap<AgentID, List<T>> doTick() {
7575
ArrayList<DeliberationRunnable<T>> runnables;
7676
// TODO make sure running can only happen once with some sort of mutex? How to verify if a tick is currently being executed?
7777
synchronized (this.scheduledRunnables) {
78-
runnables = new ArrayList<>(this.scheduledRunnables);
78+
runnables = new ArrayList<>(this.scheduledRunnables.values());
7979
this.scheduledRunnables.clear();
8080
}
8181

@@ -88,30 +88,21 @@ public HashMap<AgentID, List<T>> doTick() {
8888

8989
long startTime = System.currentTimeMillis();
9090
try {
91-
List<Future<List<T>>> currentAgentFutures = this.executor.invokeAll(runnables);
92-
for(int i = 0; i < currentAgentFutures.size(); i++) {
93-
agentPlanActions.put(runnables.get(i).getAgentID(),
94-
currentAgentFutures.get(i).get().stream().filter(Objects::nonNull).collect(Collectors.toList())); // TODO will this work?
91+
List<Future<ReschedulableResult<T>>> currentAgentFutures = this.executor.invokeAll(runnables);
92+
// for(int i = 0; i < currentAgentFutures.size(); i++) {
93+
// agentPlanActions.put(runnables.get(i).getAgentID(),
94+
// currentAgentFutures.get(i).get().getResult().stream().filter(Objects::nonNull).collect(Collectors.toList())); // TODO will this work?
95+
// }
96+
for(Future<ReschedulableResult<T>> futures : currentAgentFutures) {
97+
ReschedulableResult<T> result = futures.get();
98+
agentPlanActions.put(result.getAgentID(), result.getResult().stream().filter(Objects::nonNull).collect(Collectors.toList()));
99+
if(result.isReschedule()) {
100+
this.scheduleForNextTick(result.getDeliberationRunnable());
101+
}
95102
}
96103
} catch (InterruptedException | ExecutionException e) {
97104
e.printStackTrace();
98105
}
99-
// for(DeliberationRunnable<T> dr : runnables) {
100-
// try {
101-
// List<T> currentAgentActions = this.executor.submit(dr).get();
102-
// currentAgentActions = currentAgentActions.stream().filter(Objects::nonNull).collect(Collectors.toList());
103-
//
104-
// List<String> currentAgentActionStrings = new ArrayList<>();
105-
// for (Object action: currentAgentActions) {
106-
// currentAgentActionStrings.add((String) action);
107-
// }
108-
//
109-
// agentPlanActions.put(dr.getAgentID(), currentAgentActionStrings);
110-
//
111-
// } catch (InterruptedException | ExecutionException e) {
112-
// e.printStackTrace();
113-
// }
114-
// }
115106
this.stepDuration = (int) (System.currentTimeMillis() - startTime);
116107

117108
tick++;
@@ -150,11 +141,12 @@ public int getLastTickDuration() {
150141
public List<AgentID> getScheduledAgents() {
151142
List<AgentID> scheduledAgents = new ArrayList<>();
152143
synchronized (this.scheduledRunnables) {
153-
for(DeliberationRunnable runnable : this.scheduledRunnables) {
154-
scheduledAgents.add(runnable.getAgentID());
155-
}
144+
// for(DeliberationRunnable<T> runnable : this.scheduledRunnables) {
145+
// scheduledAgents.add(runnable.getAgentID());
146+
// }
147+
return new ArrayList<>(this.scheduledRunnables.keySet());
156148
}
157-
return scheduledAgents;
149+
// return scheduledAgents;
158150
}
159151

160152
/**

src/nl/uu/cs/iss/ga/sim2apl/core/tick/matrix/MatrixAgentThread.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.logging.Logger;
1414
import java.util.stream.Collectors;
1515
import nl.uu.cs.iss.ga.sim2apl.core.deliberation.DeliberationRunnable;
16+
import nl.uu.cs.iss.ga.sim2apl.core.deliberation.ReschedulableResult;
1617

1718
/**
1819
* An Agent Thread is responsible for producing events on behalf of a specific set of agents.
@@ -63,14 +64,25 @@ public void run() {
6364
long startTime = System.currentTimeMillis();
6465

6566
List<DeliberationRunnable<T>> runnables = inq.take();
66-
List<Future<List<T>>> agentActionFutures = this.executor.invokeAll(runnables);
67+
List<Future<ReschedulableResult<T>>> agentActionFutures = this.executor.invokeAll(runnables);
6768

6869
try {
69-
for(int i = 0; i < runnables.size(); i++) {
70-
List<String> agentActions = agentActionFutures.get(i).get().stream()
71-
.filter(Objects::nonNull).map(gson::toJson).collect(Collectors.toList());
70+
// for(int i = 0; i < runnables.size(); i++) {
71+
// List<String> agentActions = agentActionFutures.get(i).get().stream()
72+
// .filter(Objects::nonNull).map(gson::toJson).collect(Collectors.toList());
73+
// JsonObject update = new JsonObject();
74+
// update.addProperty("agentID", runnables.get(i).getAgentID().toString());
75+
// update.add("actions", gson.toJsonTree(agentActions, arrayListStringType));
76+
// JsonArray updates = new JsonArray();
77+
// updates.add(update);
78+
// this.proxy.register_events(agentproc_id, updates);
79+
// }
80+
for(Future<ReschedulableResult<T>> future : agentActionFutures) {
81+
ReschedulableResult<T> result = future.get();
82+
List<String> agentActions = result.getResult().stream().filter(Objects::nonNull)
83+
.map(gson::toJson).collect(Collectors.toList());
7284
JsonObject update = new JsonObject();
73-
update.addProperty("agentID", runnables.get(i).getAgentID().toString());
85+
update.addProperty("agentID", result.getAgentID().toString());
7486
update.add("actions", gson.toJsonTree(agentActions, arrayListStringType));
7587
JsonArray updates = new JsonArray();
7688
updates.add(update);

0 commit comments

Comments
 (0)