-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrouting_engine.py
187 lines (138 loc) · 6.28 KB
/
routing_engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
##########################################################################
# Author: Griffin Della Grotte (gdellagr@andrew.cmu.edu)
#
# This module performs all the routing functions that MapCMU relies on
# for navigation, which includes the actual pathfinding as well as
# managing the data for the routes
##########################################################################
import mongodb_databases
from mapcmu_data import *
class RoutingEngine(object):
def __init__(self):
self.db = mongodb_databases.MDBDatabase(Constants.database)
self.segTable = mongodb_databases.SegmentTable(self.db.database)
#self.segTable.nukeTable()
def testTable(self):
# Set up some fake data in the database so the display can be tested
self.segTable.addSegment([10,10,0,"GHC"], [10,100,0,"GHC"])
self.segTable.addSegment([10,100,0,"GHC"], [100,100,0,"GHC"])
self.segTable.addSegment([10,10,0,"GHC"], [100,10,0,"GHC"])
self.segTable.addSegment([100,10,0,"GHC"], [200,10,0,"GHC"])
self.segTable.addSegment([200,10,0,"GHC"], [100,100,0,"GHC"])
self.segTable.addSegment([200,10,0,"GHC"], [100,150,0,"GHC"])
self.segTable.addSegment([100,100,0,"GHC"], [100,150,0,"GHC"])
self.segTable.addSegment([200,10,0,"GHC"], [200,220,0,"GHC"])
self.segTable.addSegment([100,150,0,"GHC"], [200,220,0,"GHC"])
def getRoomNode(self, room):
return self.segTable.collection.find(
{"NOTE" : room})[0]["LOC_A"]
def getAllSegments(self, onFloor=None):
if onFloor == None:
return self.segTable.collection.find()
else:
floorSegList = []
floorZ = (onFloor-1) * Constants.floorHeight
segments = self.segTable.collection.find()
for seg in segments:
if seg["LOC_A"][2] == floorZ and seg["LOC_B"][2] == floorZ:
floorSegList.append(seg)
return floorSegList
def getAllNodes(self, onFloor=None):
locs = set()
for row in self.getAllSegments(onFloor):
locs.add(tuple(row["LOC_A"]))
locs.add(tuple(row["LOC_B"]))
locsLists = list(map(lambda x: list(x), locs))
return locsLists
def getAllConnections(self, node):
# Find all the connected nodes an input node has
node = list(node)
q = { '$or' : [{"LOC_A" : node}, {"LOC_B" : node}]}
res = self.segTable.collection.find(q)
neighbors = []
for row in res:
if row["LOC_A"] == node:
neighbors.append(RoutingEngine.AStarNode(row["LOC_B"],node))
elif row["LOC_B"] == node:
neighbors.append(RoutingEngine.AStarNode(row["LOC_A"],node))
return neighbors
class AStarNode(object):
# Node object for the A* algorithm
def __init__(self, nodeInfo, parent):
self.info = nodeInfo
self.parent = parent
def __eq__(self, other):
if isinstance(other, type(self)):
return self.info == other.info
elif isinstance(other, list):
return other == self.info
def __repr__(self):
return repr(self.info)
def euclDist(self,a,b):
if isinstance(a, type(self)): ai = a.info
else: ai = a
if isinstance(b, type(self)): bi = b.info
else: bi = b
return ((ai[0] - bi[0])**2 + \
(ai[1] - bi[1])**2 + \
(ai[2] - bi[2])**2)**0.5
def calcHCost(self,testNode,dst):
# Just euclidian dist from current to end
return self.euclDist(testNode, dst)
def calcGCost(self,testNode):
# Recursively calulate distance to origin through nodes
# Euclidian distance between nodes (that's how the map's set up)
if not isinstance(testNode.parent, type(self)):
return 0
return self.euclDist(testNode, testNode.parent) + \
self.calcGCost(testNode.parent)
def calcFCost(self, testNode, dst):
return self.calcHCost(testNode, dst) + \
self.calcGCost(testNode)
def findRoute(self, origin, dst):
# Do the A* algorithm
# Based upon tutorials online (not copy/pasted)
nodes = self.getAllNodes()
if origin not in nodes or dst not in nodes:
return None
openNodes = list()
openNodes.append(RoutingEngine.AStarNode(origin, None))
closedNodes = list()
while(True):
#print("Open: %r" % openNodes)
#print("Closed: %r" % closedNodes)
currentNode = self.findLowestFCost(openNodes,dst)
if currentNode == None:
# Found No Path!
return None
openNodes.remove(currentNode)
closedNodes.append(currentNode)
if dst == currentNode:
return self.retracePath(currentNode)
for neighbor in self.getAllConnections(currentNode.info):
if neighbor in closedNodes: continue
if (neighbor not in openNodes or
(self.pathFromThru(neighbor, currentNode) <
neighbor.calcGCost(neighbor))):
neighbor.parent = currentNode # Setting new, better H cost
if neighbor not in openNodes:
openNodes.append(neighbor)
def retracePath(self, node):
# Once A* finds the terminus, retrace back through the node's parents
# to find the start. This recursive call returns this in the
# correct, start-to-end format.
if node == None:
return []
return self.retracePath(node.parent) + [node]
def pathFromThru(self, startNode, thruNode):
segA = startNode.euclDist(startNode, thruNode)
return segA + thruNode.calcGCost(thruNode)
def findLowestFCost(self, nodes, dst):
bestNode = None
bestCost = 10e42
for node in nodes:
fCost = node.calcFCost(node, dst)
if fCost < bestCost:
bestCost = fCost
bestNode = node
return bestNode