-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvisualize_graph.py
155 lines (144 loc) · 5.95 KB
/
visualize_graph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""Convert OSM map to GeoJSON format."""
from networkx import degree
from networkx import all_neighbors
from osm2networkx import read_osm
from random import shuffle
from random import randint
from sys import argv
from os import remove
from os.path import isfile
def get_random_nodes(graph, K=100):
    """Sample up to K random nodes from graph.

    Args:
        graph: object whose ``nodes()`` call yields the node ids.
        K: maximum number of nodes to return.

    Returns:
        A list of at most K distinct node ids in random order.
    """
    # Copy into a real list first: ``graph.nodes()`` may hand back a
    # read-only view, which ``shuffle`` cannot reorder in place.
    node_sample = list(graph.nodes())
    shuffle(node_sample)
    # Clamp so that a negative K yields an empty sample instead of a
    # surprising "all but the last |K|" slice.
    K = max(0, min(K, len(node_sample)))
    return node_sample[:K]
def plot_random_nodes(graph, outfile_name, K=100):
    """Write up to K random nodes of graph to a GeoJSON file.

    Args:
        graph: graph to sample nodes from.
        outfile_name: path of the file to (over)write.
        K: maximum number of nodes to sample.
    """
    node_sample = get_random_nodes(graph, K)
    with open(outfile_name, 'w') as outfile:
        # Let plot_nodes emit the FeatureCollection wrapper itself. The
        # nodes are written exactly once; writing them again after the
        # closing bracket would produce an invalid document.
        plot_nodes(node_sample, graph, outfile, header=True, footer=True)
def plot_nodes(node_list, graph, outfile, header=False, footer=False, color="#F5A207"):
    """Serialize the given nodes as GeoJSON features into outfile.

    The features are comma-joined. When ``header`` / ``footer`` are set,
    the opening / closing of the enclosing FeatureCollection is written
    before / after them.
    """
    if header:
        outfile.write('{ "type" : "FeatureCollection", \n')
        outfile.write('"features" : [\n')

    features = [node_to_GeoJSON(node, graph, color) for node in node_list]
    outfile.write(','.join(features))

    if footer:
        outfile.write(']\n}')
    print('done writing nodes to file')
def node_to_GeoJSON(node, graph, color="#F5A207"):
    """Return the GeoJSON Point feature string for one node.

    The node's 'lat' and 'lon' attributes are read from ``graph.nodes``;
    coordinates are emitted in [lon, lat] order and ``color`` is stored
    as the feature's "marker-color" property.
    """
    attrs = graph.nodes[node]
    lat, lon = attrs['lat'], attrs['lon']
    geometry = '"geometry" : {"type": "Point", "coordinates": [%f,%f]},\n' % (lon, lat)
    properties = '"properties": {"marker-color": "%s"}\n' % color
    return ''.join(('{ "type" : "Feature",\n', geometry, properties, '}\n'))
def plot_edges(edge_list, graph, outfile, header=False, footer=False, color="#F5A207"):
    """Serialize the given edges as GeoJSON features into outfile.

    The features are comma-joined. When ``header`` / ``footer`` are set,
    the opening / closing of the enclosing FeatureCollection is written
    before / after them.
    """
    if header:
        outfile.write('{ "type" : "FeatureCollection", \n')
        outfile.write('"features" : [\n')

    features = [edge_to_GeoJSON(edge, graph, color) for edge in edge_list]
    outfile.write(','.join(features))

    if footer:
        outfile.write(']\n}')
    print('done writing edges to file')
def edge_to_GeoJSON(edge, graph, color="#F5A207"):
    """Return the GeoJSON LineString feature string for one edge.

    ``edge`` is indexed as (start, end); each endpoint's 'lon' and 'lat'
    attributes are read from ``graph.nodes``. ``color`` is stored as the
    feature's "marker-color" property.
    """
    tail, head = edge[0], edge[1]

    tail_attrs = graph.nodes[tail]
    tail_lon = tail_attrs['lon']
    tail_lat = tail_attrs['lat']

    head_attrs = graph.nodes[head]
    head_lon = head_attrs['lon']
    head_lat = head_attrs['lat']

    pieces = [
        '{ "type" : "Feature",\n',
        '"geometry" : {"type": "LineString", ',
        '"coordinates": [[%f,%f], [%f,%f]]},\n' % (
            tail_lon, tail_lat, head_lon, head_lat),
        '"properties": {"marker-color": "%s"}\n' % color,
        '}\n',
    ]
    return ''.join(pieces)
def get_random_path(graph, K=100):
    """Build a random walk of up to K nodes through graph.

    The walk starts at a randomly chosen one of the (at most) 10
    highest-degree nodes. Each step moves to a random pick among the
    (at most) three highest-degree neighbors that are not yet on the
    path. It stops once the path holds K nodes or no unvisited neighbor
    is left.

    Args:
        graph: graph exposing ``nodes()`` and ``degree(node)``.
        K: maximum number of nodes on the path (capped at graph size).

    Returns:
        A tuple ``(path_nodes, explored_nodes)``: the list of node ids in
        walk order, beginning with the start node, and the set of every
        node that was considered (the start node plus all candidate
        neighbors seen along the way). An empty graph gives
        ``([], set())``.
    """
    nodes = sorted(graph.nodes(), key=graph.degree, reverse=True)
    if not nodes:
        return [], set()

    # randint is inclusive on both ends, hence the "- 1".
    start_node = nodes[randint(0, min(10, len(nodes)) - 1)]
    print('start node has degree %d'%(graph.degree(start_node)))

    # The start node belongs to the path: this keeps the first edge in
    # the plotted path and stops the walk from stepping back onto it.
    path_nodes = [start_node]
    visited = {start_node}  # set mirror of path_nodes for O(1) lookups
    explored_nodes = {start_node}
    last_node = start_node

    K = min(K, len(nodes))
    for _ in range(1, K):
        neighbors = list(all_neighbors(graph, last_node))
        new_nodes = set(neighbors) - visited
        if not new_nodes:
            print('finish with node %s with neighbors %s and explored_nodes %s'%
                  (last_node, str(neighbors), str(path_nodes)))
            break
        explored_nodes |= new_nodes
        candidates = sorted(new_nodes, key=graph.degree, reverse=True)
        last_node = candidates[randint(0, min(2, len(candidates) - 1))]
        path_nodes.append(last_node)
        visited.add(last_node)
    return path_nodes, explored_nodes
def plot_search(graph, outfile_name, path_nodes, explored_nodes):
    """Write a search result to a GeoJSON FeatureCollection file.

    Path nodes and the edges between consecutive path nodes are written
    in the default color; explored nodes that are not on the path are
    written in black.

    Args:
        graph: graph the node ids refer to.
        outfile_name: path of the file to (over)write.
        path_nodes: sequence of node ids in path order.
        explored_nodes: either a mapping of node id to a count (only
            nodes with a count > 0 are kept) or a plain iterable of node
            ids; may be empty or None.
    """
    path_nodes = list(path_nodes)
    path_edges = list(zip(path_nodes, path_nodes[1:]))

    if not explored_nodes:
        other_nodes = set()
    elif hasattr(explored_nodes, 'keys'):
        other_nodes = set(key for key in explored_nodes.keys()
                          if explored_nodes[key] > 0)
    else:
        # Plain collection of ids, e.g. the set from get_random_path.
        other_nodes = set(explored_nodes)
    other_nodes = other_nodes.difference(path_nodes)

    with open(outfile_name, 'w') as outfile:
        outfile.write('{ "type" : "FeatureCollection", \n')
        outfile.write('"features" : [\n')

        # A separator goes only between two non-empty categories;
        # otherwise the output would contain a dangling comma and not
        # parse as JSON.
        need_separator = False

        # path nodes
        if path_nodes:
            plot_nodes(path_nodes, graph, outfile, False, False)
            need_separator = True

        # path edges
        if path_edges:
            if need_separator:
                outfile.write(',\n')
            plot_edges(path_edges, graph, outfile, False, False)
            need_separator = True

        # explored nodes (non-path ones), plotted as black
        if other_nodes:
            if need_separator:
                outfile.write(',\n')
            plot_nodes(other_nodes, graph, outfile, False, False, color="#000000")

        outfile.write(']\n}')
def plot_random_path(graph, outfile_name, K=100):
    """Generate a random path through graph and plot it to a file.

    Args:
        graph: graph to walk.
        outfile_name: path of the GeoJSON file to (over)write.
        K: maximum path length, forwarded to get_random_path.
    """
    path_nodes, explored_nodes = get_random_path(graph, K)
    # get_random_path returns a plain set, while plot_search reads its
    # explored-nodes argument as a node -> count mapping. Give every
    # explored node a count of 1 so that all of them are kept.
    explored_counts = dict.fromkeys(explored_nodes, 1)
    plot_search(graph, outfile_name, path_nodes, explored_counts)
    print('done plotting random path')
def test_plot_random_path(graph, outfile_name):
    """Exercise random path plotting end to end.

    ``graph`` is passed to ``read_osm`` to load the graph; any file
    already present at ``outfile_name`` is deleted before plotting.
    """
    osm_graph = read_osm(graph)
    output_exists = isfile(outfile_name)
    if output_exists:
        remove(outfile_name)
    plot_random_path(osm_graph, outfile_name, K=100)
if __name__ == "__main__":
    # Script entry point: read the graph from the file named by the first
    # argument and write a random path through it to the file named by
    # the second argument.
    if len(argv) > 2:
        test_plot_random_path(argv[1], argv[2])
    elif len(argv) == 2:
        # Only one argument was supplied; indexing argv[2] would raise
        # IndexError, so exit with a usage hint instead.
        raise SystemExit('usage: %s <osm_file> <geojson_outfile>' % argv[0])