Module modules.algo.td
Source code
import numpy as np


class OneStepTDControl:

    def __init__(self, env, policy, mode, alpha):
        self.env = env
        self.policy = policy
        self.alpha = alpha
        assert mode in ('sarsa', 'qlearning')
        self.mode = mode
        self.initialize_tables()

    def initialize_tables(self):
        self.q = self.policy.q.copy()
        self.num_updates = np.zeros_like(self.q)

    def sample_greedy_trajectory(self):
        states = []
        self.env.reset()
        while not self.env.is_episode_terminated():
            s = self.env.current_coord
            states.append(s)
            a = self.policy.act_greedily(s)
            s_prime, r = self.env.step(a)
        states.append(self.env.current_coord)  # include the terminal state
        return states

    def do_td_control_for_one_trajectory(self):
        total_reward = 0
        self.env.reset()
        while not self.env.is_episode_terminated():
            s = self.env.current_coord
            a = self.policy.act_softly(s)
            s_prime, r = self.env.step(a)
            total_reward += r
            if self.mode == 'sarsa':
                # SARSA target: bootstrap with an action sampled by the soft policy in s_prime
                bellman_sample = r + self.q[s_prime][self.policy.act_softly(s_prime)]
            elif self.mode == 'qlearning':
                # Q-learning target: bootstrap with the greedy action in s_prime
                bellman_sample = r + self.q[s_prime][self.policy.act_greedily(s_prime)]
            self.q[s][a] = self.q[s][a] + self.alpha * (bellman_sample - self.q[s][a])
            self.num_updates[s][a] += 1
        self.policy.q = self.q.copy()  # update the policy automatically
        return total_reward

    def run(self, max_iterations, which_tqdm):
        if which_tqdm == 'terminal':
            from tqdm import tqdm
        elif which_tqdm == 'notebook':
            from tqdm.notebook import tqdm
        total_rewards = []
        for i in tqdm(range(max_iterations), leave=False):
            total_rewards.append(self.do_td_control_for_one_trajectory())
        return total_rewards
class NStepTDPrediction:

    def __init__(self, env, policy, alpha, n, gamma=1, use_td_errors=False, true_v=None):
        self.env = env
        self.policy = policy
        self.alpha = alpha
        self.n = n
        self.gamma = gamma
        self.use_td_errors = use_td_errors  # for solution to exercise 7.2
        self.true_v = true_v  # for solution to exercise 7.2
        self.initialize_tables()

    def initialize_tables(self):
        self.v = np.zeros(self.env.state_space_shape)

    def do_td_prediction_for_one_trajectory(self):
        states = {}
        rewards = {}
        td_errors = {}
        self.env.reset()
        states[0] = self.env.current_coord
        T = np.inf
        t = 0
        while True:
            if t < T:
                a = self.policy.act_softly(states[t])
                s_prime, r = self.env.step(a)
                states[t + 1] = s_prime
                rewards[t + 1] = r
                td_errors[t] = r + self.gamma * self.v[s_prime] - self.v[states[t]]
                if self.env.is_episode_terminated():
                    T = t + 1
                    if self.use_td_errors:
                        break  # we won't be updating the value of the terminal state, so no further information needed beyond this point
            if not self.use_td_errors:
                tau = t - self.n + 1  # at timestep t, update the value of the state encountered at timestep tau
                if tau >= 0:  # no changes at all are made during the first n - 1 steps of each episode
                    # G_{t : t + n} is
                    # - the truncated return for time t up until time t + n (after n actions are taken)
                    # - plus the discounted estimate (gamma ** n) * v(S_{t+n}) at the end
                    first_index = tau + 1
                    final_index = np.min([tau + self.n, T])
                    truncated_G = np.sum([
                        self.gamma ** (i - tau - 1) * rewards[i] for i in np.arange(first_index, final_index + 1)
                    ])
                    if tau + self.n < T:
                        corrected_G = truncated_G + (self.gamma ** self.n) * self.v[states[tau + self.n]]
                    else:
                        corrected_G = truncated_G
                    error = corrected_G - self.v[states[tau]]
                    self.v[states[tau]] = self.v[states[tau]] + self.alpha * error
                if tau == T - 1:
                    break  # the terminal state needs no update since no further actions are taken
            t += 1
        if self.use_td_errors:
            t = 0
            while True:
                tau = t - self.n + 1
                if tau >= 0:
                    first_index = tau
                    final_index = np.min([tau + self.n - 1, T - 1])  # td error is zero for the Tth timestep
                    error = np.sum([
                        self.gamma ** (k - tau) * td_errors[k] for k in np.arange(tau, final_index + 1)
                    ])  # see solution to exercise 7.1 for a derivation of this formula
                    self.v[states[tau]] = self.v[states[tau]] + self.alpha * error
                if tau == T - 1:
                    break
                t += 1
        if self.true_v is not None:
            return np.mean((self.true_v - self.v) ** 2) ** 0.5

    def run(self, max_iterations, which_tqdm, seed):
        if which_tqdm == 'terminal':
            from tqdm import tqdm
        elif which_tqdm == 'notebook':
            from tqdm.notebook import tqdm
        np.random.seed(seed)
        if self.true_v is None:
            for i in tqdm(range(max_iterations), leave=False):
                self.do_td_prediction_for_one_trajectory()
        else:
            rms_errors = []
            for i in tqdm(range(max_iterations), leave=False):
                rms_errors.append(self.do_td_prediction_for_one_trajectory())
            return rms_errors
Classes
class NStepTDPrediction (env, policy, alpha, n, gamma=1, use_td_errors=False, true_v=None)
n-step TD prediction of state values under a soft policy (Sutton & Barto, chapter 7). The update uses either the standard n-step return or, with use_td_errors=True, the sum-of-TD-errors form from exercise 7.2; when true_v is supplied, each trajectory returns the RMS error of the current value estimates.
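A minimal usage sketch, assuming an environment object with the interface this class relies on (reset(), step(a), current_coord, is_episode_terminated(), state_space_shape) and a policy exposing act_softly(s); RandomWalkEnv, UniformRandomPolicy and true_values below are hypothetical stand-ins, not part of this module:

    env = RandomWalkEnv()            # hypothetical environment with the interface listed above
    policy = UniformRandomPolicy()   # hypothetical policy providing act_softly(s)

    predictor = NStepTDPrediction(env, policy, alpha=0.1, n=4, gamma=1,
                                  use_td_errors=False, true_v=true_values)
    # a list of per-episode RMS errors is returned only because true_v was supplied
    rms_errors = predictor.run(max_iterations=100, which_tqdm='terminal', seed=0)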
Methods
def do_td_prediction_for_one_trajectory(self)
Generate one episode under the soft policy and apply n-step TD updates to the value table. Returns the RMS error against true_v when it is provided, otherwise None.
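For reference, the update target computed here is the n-step return from chapter 7 of Sutton and Barto; with use_td_errors=True the method instead applies the sum-of-TD-errors form referenced in the source comments (exercises 7.1 and 7.2), which equals the n-step error exactly when the value estimates are held fixed while the TD errors are computed:

    G_{\tau:\tau+n} = \sum_{i=\tau+1}^{\min(\tau+n,\,T)} \gamma^{\,i-\tau-1} R_i
                      + \mathbf{1}\{\tau+n < T\}\, \gamma^{n}\, V(S_{\tau+n})

    V(S_\tau) \leftarrow V(S_\tau) + \alpha \big( G_{\tau:\tau+n} - V(S_\tau) \big)

    G_{\tau:\tau+n} - V(S_\tau) = \sum_{k=\tau}^{\min(\tau+n,\,T)-1} \gamma^{\,k-\tau}\, \delta_k,
    \qquad \delta_k = R_{k+1} + \gamma V(S_{k+1}) - V(S_k)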
def initialize_tables(self)
Reset the state-value table to zeros, shaped like the environment's state space.
def run(self, max_iterations, which_tqdm, seed)
Run n-step TD prediction for max_iterations episodes after seeding NumPy's random number generator. Returns the list of per-episode RMS errors when true_v is provided.
class OneStepTDControl (env, policy, mode, alpha)
Tabular one-step TD control that updates an action-value table with either SARSA ('sarsa') or Q-learning ('qlearning') targets while following the policy's soft (exploratory) action selection.
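A minimal usage sketch, assuming a gridworld-style environment with the interface this class relies on (reset(), step(a), current_coord, is_episode_terminated()) and a policy exposing a q table plus act_softly(s) and act_greedily(s); GridworldEnv and EpsilonGreedyPolicy are hypothetical stand-ins, not part of this module:

    env = GridworldEnv()               # hypothetical environment
    policy = EpsilonGreedyPolicy(env)  # hypothetical policy with q, act_softly(s), act_greedily(s)

    agent = OneStepTDControl(env, policy, mode='qlearning', alpha=0.5)
    total_rewards = agent.run(max_iterations=500, which_tqdm='terminal')
    greedy_path = agent.sample_greedy_trajectory()   # inspect the learned greedy behaviour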
Methods
def do_td_control_for_one_trajectory(self)
Generate one episode under the soft policy, updating the action-value table after every step; the policy's table is refreshed from the learned values at the end of the episode. Returns the total reward collected.
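For reference, the two bootstrapped targets used in this update are the standard one-step targets; this class exposes no discount factor, so the code effectively uses gamma = 1. Note also that the SARSA branch samples a fresh action from the soft policy at the next state rather than carrying over the action actually executed there, a common simplification of the textbook algorithm:

    \text{SARSA:}\quad      G = R + \gamma\, Q(S', A'),\ \ A' \sim \text{soft policy at } S'
    \text{Q-learning:}\quad G = R + \gamma \max_{a} Q(S', a)
    \text{update:}\quad     Q(S, A) \leftarrow Q(S, A) + \alpha \big( G - Q(S, A) \big)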
def initialize_tables(self)
Initialize the learner's action-value table as a copy of the policy's table and reset the per-(state, action) update counters.
def run(self, max_iterations, which_tqdm)
Run TD control for max_iterations episodes and return the list of per-episode total rewards.
def sample_greedy_trajectory(self)
Roll out one episode acting greedily with respect to the policy's current action values and return the list of visited states.