-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgridworld_mdp.py
118 lines (94 loc) · 3.39 KB
/
gridworld_mdp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import numpy as np
UP = 0
DOWN = 1
RIGHT = 2
LEFT = 3
# Canonical action ordering; indexes the action axis of P and R.
actions_list = [UP, DOWN, RIGHT, LEFT]


class GridworldMDP():
    """Tabular gridworld MDP built from a whitespace-separated text map.

    Cell codes in the map file:
        0 = free cell (step reward -1)
        1 = wall (treated as terminal, reward 0)
        2 = penalty cell (step reward -5)
        3 = goal (terminal, reward 0)

    States are numbered row-major over the grid. After construction the
    instance exposes:
        P          -- transition tensor, shape (nS, nA, nS)
        R          -- reward matrix, shape (nS, nA)
        d_0        -- uniform initial-state distribution, shape (nS, 1)
        state_vars -- per-state (x, y) grid coordinates, shape (nS, 2)
        gamma, nS, nA, shape
    """

    def _read_grid_map(self, grid_map_path):
        """Parse the grid file at *grid_map_path* into a 2-D int array.

        Tokens that do not parse as integers (e.g. empty strings produced
        by splitting on a single space) are skipped.
        """
        grid_map_array = []
        # Context manager guarantees the file handle is closed (the
        # original opened the file and never closed it).
        with open(grid_map_path, 'r') as f:
            for line in f:
                row = []
                for token in line.split(' '):
                    try:
                        row.append(int(token))
                    except ValueError:
                        # Narrowed from a bare except: int() only raises
                        # ValueError on non-numeric tokens; skip them.
                        pass
                grid_map_array.append(row)
        return np.array(grid_map_array)

    def __init__(self, plan_file='plan0.txt', gamma=0.9, random_slide=0.0):
        """Build the transition and reward model from *plan_file*.

        Parameters
        ----------
        plan_file : str
            Path to the grid-map text file.
        gamma : float
            Discount factor, stored for consumers of the MDP.
        random_slide : float
            If > 0, each action keeps probability 1 - 2*random_slide of
            reaching its intended cell, and mass random_slide each slides
            to the up/down neighbor of that intended cell.
        """
        grid_map = self._read_grid_map(plan_file)
        shape = grid_map.shape
        nS = shape[0] * shape[1]
        nA = len(actions_list)
        MAX_X = shape[0]
        MAX_Y = shape[1]
        # grid_values[x, y] is the row-major state index of cell (x, y).
        grid_values = np.arange(0, nS).reshape(shape)

        def get_xy(ss):
            # np.where yields 1-element arrays; convert to Python scalars
            # so the comparisons below are plain scalar comparisons (the
            # original relied on implicit 1-element-array truthiness).
            x, y = np.where(grid_values == ss)
            return int(x[0]), int(y[0])

        def ns_up(ss):
            x, y = get_xy(ss)
            # Stay in place at the top edge or when blocked by a wall.
            return ss if x == 0 or grid_map[x - 1, y] == 1 else ss - MAX_Y

        def ns_down(ss):
            x, y = get_xy(ss)
            return ss if x == MAX_X - 1 or grid_map[x + 1, y] == 1 else ss + MAX_Y

        def ns_right(ss):
            x, y = get_xy(ss)
            return ss if y == MAX_Y - 1 or grid_map[x, y + 1] == 1 else ss + 1

        def ns_left(ss):
            x, y = get_xy(ss)
            return ss if y == 0 or grid_map[x, y - 1] == 1 else ss - 1

        # Must stay aligned with actions_list ordering.
        actions_func_list = [ns_up, ns_down, ns_right, ns_left]

        P = np.zeros((nS, nA, nS))
        R = np.zeros((nS, nA))
        it = np.nditer(grid_map, flags=['multi_index'])
        state_vars = np.zeros((nS, 2))
        while not it.finished:
            s = it.iterindex
            x, y = it.multi_index
            state_vars[s] = [x, y]
            # Goal (3) and wall (1) cells are absorbing.
            is_done = grid_map[x, y] == 3 or grid_map[x, y] == 1
            reward = -1.0
            if grid_map[x, y] == 3:
                reward = 0.0
            elif grid_map[x, y] == 1:
                reward = 0.0
            elif grid_map[x, y] == 2:
                reward = -5.0
            for a in range(nA):
                R[s, a] = reward
            if is_done:
                # Terminal state: self-loop with probability 1 for all
                # actions.
                for a in range(nA):
                    P[s, a, s] = 1.0
            else:
                for action, func in zip(actions_list, actions_func_list):
                    if random_slide > 0:
                        # Intended move, then stochastic up/down slides
                        # from the intended cell. += because a blocked
                        # slide lands back on the same cell; probabilities
                        # still sum to 1.
                        P[s, action, func(s)] = 1.0 - 2 * random_slide
                        P[s, action, ns_up(func(s))] += random_slide
                        P[s, action, ns_down(func(s))] += random_slide
                    else:
                        P[s, action, func(s)] = 1.0
            it.iternext()

        # Initial state distribution is uniform over all states.
        self.d_0 = np.reshape(np.ones(nS) / nS, [-1, 1])
        # MDP components
        self.P = P
        self.R = R
        self.gamma = gamma
        self.nS = nS
        self.nA = nA
        self.shape = shape
        # State features: (x, y) coordinates per state.
        self.state_vars = state_vars
        # Sanity-check the transition model on construction.
        self.test_p_stochastic()

    def test_p_stochastic(self):
        """Assert every row of P is a probability distribution."""
        for s in range(self.nS):
            np.testing.assert_array_almost_equal(self.P[s, :, :].sum(axis=1), np.ones(self.nA), decimal=2)
        print("P is stochastic")