forked from jtan-gh/multi-agent-path-finder
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDGSolver.py
119 lines (100 loc) · 5.03 KB
/
DGSolver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import time as timer
from single_agent_planner import a_star, get_sum_of_cost
from CBSSolver import CBSSolver
import copy
from mdd import get_all_optimal_paths, buildMDDTree, buildJointMDD, check_jointMDD_for_dependency
from mvc import Graph
class DGSolver(CBSSolver):
def get_dg_heuristic(self, my_map, paths, starts, goals, low_level_h, constraints):
dependencies = []
all_roots = []
all_paths = []
all_mdds = []
for i in range(len(paths)):
path = get_all_optimal_paths(my_map, starts[i], goals[i], low_level_h[i], i, constraints)
root, nodes_dict = buildMDDTree(path)
all_roots.append(root)
all_paths.append(path)
all_mdds.append(nodes_dict)
for i in range(len(paths)): # num of agents in map
# paths1 = get_all_optimal_paths(my_map, starts[i], goals[i], low_level_h[i], i, constraints)
# print(f"All optimal paths for agent {i}: {paths1}")
paths1 = all_paths[i]
root1 = all_roots[i]
node_dict1 = all_mdds[i]
for j in range(i+1,len(paths)):
# paths2 = get_all_optimal_paths(my_map, starts[j], goals[j], low_level_h[j], j, constraints)
# print(f"All optimal paths for agent {j}: {paths2}")
paths2 = all_paths[j]
root2 = all_roots[j]
node_dict2 = all_mdds[j]
root, bottom_node = buildJointMDD(paths1, paths2, root1, node_dict1, root2, node_dict2)
if (check_jointMDD_for_dependency(bottom_node, paths1, paths2)):
dependencies.append((i,j))
g = Graph(len(paths))
for conflict in dependencies:
g.addEdge(conflict[0], conflict[1])
vertex_cover = g.getVertexCover()
return len(vertex_cover)
def find_solution(self, disjoint=True):
""" Finds paths for all agents from their start locations to their goal locations
disjoint - use disjoint splitting or not
"""
self.start_time = timer.time()
root = {'cost': 0,
'h': 0,
'constraints': [],
'paths': [],
'collisions': []}
for i in range(self.num_of_agents): # Find initial path for each agent
path = a_star(self.my_map, self.starts[i], self.goals[i], self.heuristics[i],
i, root['constraints'])
if path is None:
raise BaseException('No solutions')
root['paths'].append(path)
root['cost'] = get_sum_of_cost(root['paths'])
root['h'] = self.get_dg_heuristic(self.my_map, root['paths'], self.starts, self.goals, self.heuristics, root['constraints'])
root['collisions'] = super().detect_collisions(root['paths'])
self.push_node(root)
while len(self.open_list) > 0:
curr = self.pop_node()
if not curr['collisions']:
self.print_results(curr)
self.write_results()
return curr['paths'] # this is the goal node
collision = curr['collisions'][0]
# constraints = standard_splitting(collision)
constraints = super().disjoint_splitting(collision)
for constraint in constraints:
if super().is_conflicting_constraint(constraint, curr['constraints']):
continue
child = {}
child['constraints'] = copy.deepcopy(curr['constraints'])
if constraint not in child['constraints']:
child['constraints'].append(constraint)
child['paths']= copy.deepcopy(curr['paths'])
prune_child = False
if constraint['positive']:
conflicted_agents = super().paths_violate_constraint(constraint, child['paths'])
for i in conflicted_agents:
new_path = a_star(self.my_map, self.starts[i], self.goals[i], self.heuristics[i],
i, child['constraints'])
if new_path is None:
prune_child = True
break
else:
child['paths'][i] = new_path
if prune_child:
continue
agent = constraint['agent']
path = a_star(self.my_map, self.starts[agent], self.goals[agent], self.heuristics[agent],
agent, child['constraints'])
if path is not None:
child['paths'][agent] = path
child['collisions'] = super().detect_collisions(child['paths'])
child['cost'] = get_sum_of_cost(child['paths'])
child['h'] = self.get_dg_heuristic(self.my_map, child['paths'], self.starts, self.goals, self.heuristics, child['constraints'])
self.push_node(child)
self.write_results()
self.print_results(root)
return root['paths']