Module AssetAllocator.algorithms.A2C.a2c
Expand source code
import math
import random
import sys
sys.path.append('..')
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
# helper function to convert numpy arrays to tensors
def tensor(x):
x = np.array(x) if not isinstance(x, np.ndarray) else x
return torch.from_numpy(x).float()
class Actor(nn.Module):
"""This is the actor network for the A2C Agent.
Original paper can be found at https://arxiv.org/abs/1802.09477
This implementation was adapted from https://github.com/higgsfield/RL-Adventure-2/blob/master/1.actor-critic.ipynb
"""
def __init__(self, state_dim, hidden_dim, n_actions):
super().__init__()
self.n_actions = n_actions
self.model = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, n_actions)
)
logstds_param = nn.Parameter(torch.full((n_actions,), 0.1))
self.register_parameter("logstds", logstds_param)
def forward(self, X):
means = self.model(X)
stds = torch.clamp(self.logstds.exp(), 1e-3, 50)
res = torch.distributions.Normal(means, stds)
return res
## Critic module
class Critic(nn.Module):
"""This is the critic network for the A2C Agent.
Original paper can be found at https://arxiv.org/abs/1802.09477
This implementation was adapted from https://github.com/higgsfield/RL-Adventure-2/blob/master/1.actor-critic.ipynb
"""
def __init__(self, state_dim, hidden_dim):
"""Initializes the A2C Critic Network
Args:
state_dim (int): State space dimension
action_dim (int): Action space dimension
hidden_dim (int): Size of hidden layer
"""
super().__init__()
self.model = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, 1),
)
def forward(self, X):
return self.model(X)
def discounted_rewards(rewards, dones, gamma):
    # walk the trajectory backwards, resetting the running return at episode boundaries
    ret = 0
    discounted = []
    for reward, done in zip(rewards[::-1], dones[::-1]):
        ret = reward + ret * gamma * (1 - done)
        discounted.append(ret)
    # flip back to chronological order
    return discounted[::-1]
def process_memory(memory, gamma=0.99, discount_rewards=True):
actions = []
states = []
next_states = []
rewards = []
dones = []
for action, reward, state, next_state, done in memory:
actions.append(action)
rewards.append(reward)
states.append(state)
next_states.append(next_state)
dones.append(done)
    if discount_rewards:
        # replace the raw rewards with discounted returns
        rewards = discounted_rewards(rewards, dones, gamma)
actions = tensor(actions)
states = tensor(states)
next_states = tensor(next_states)
rewards = tensor(rewards).view(-1, 1)
dones = tensor(dones).view(-1, 1)
return actions, rewards, states, next_states, dones
def clip_grad_norm_(module, max_grad_norm):
    # `module` is actually an optimizer: clip the gradient norm of every parameter it updates
    nn.utils.clip_grad_norm_([p for g in module.param_groups for p in g["params"]], max_grad_norm)
class A2CLearner():
def __init__(self, actor, critic, gamma=0.9, entropy_beta=0,
actor_lr=4e-4, critic_lr=4e-3, max_grad_norm=0.5):
self.gamma = gamma
self.max_grad_norm = max_grad_norm
self.actor = actor
self.critic = critic
self.entropy_beta = entropy_beta
self.actor_optim = torch.optim.Adam(actor.parameters(), lr=actor_lr)
self.critic_optim = torch.optim.Adam(critic.parameters(), lr=critic_lr)
def learn(self, memory, steps, discount_rewards=True):
"""
Trains the agent
Params
======
timesteps (int): Number of timesteps the agent should interact with the environment
print_every (int): Verbosity control
"""
actions, rewards, states, next_states, dones = process_memory(memory, self.gamma, discount_rewards)
td_target = rewards + self.gamma * self.critic(next_states) * (1-dones)
value = self.critic(states)
advantage = td_target - value
# actor
norm_dists = self.actor(states)
logs_probs = norm_dists.log_prob(actions)
entropy = norm_dists.entropy().mean()
actor_loss = (-logs_probs*advantage.detach()).mean() - entropy*self.entropy_beta
self.actor_optim.zero_grad()
actor_loss.backward()
clip_grad_norm_(self.actor_optim, self.max_grad_norm)
self.actor_optim.step()
# critic
critic_loss = F.mse_loss(td_target, value)
self.critic_optim.zero_grad()
critic_loss.backward()
clip_grad_norm_(self.critic_optim, self.max_grad_norm)
self.critic_optim.step()
def predict(self, state):
"""Returns the action for a given state
Params
======
state (array_like): current state
"""
dists = self.actor(tensor(state))
actions = dists.sample()
actions_clipped = torch.nn.Softmax(dim = 0)(actions).detach().data.numpy()
return actions_clipped
class Runner():
def __init__(self, env, actor):
self.env = env
self.actor = actor
self.state = None
self.done = True
self.steps = 0
self.episode_reward = 0
self.episode_rewards = []
def reset(self):
self.episode_reward = 0
self.done = False
self.state = self.env.reset()
def run(self, max_steps, print_every, memory=None):
if not memory: memory = []
count_of_dones = 0
flag = False
for i in range(max_steps):
if self.done:
self.reset()
dists = self.actor(tensor(self.state))
actions = dists.sample()
actions_clipped = torch.nn.Softmax(dim = 0)(actions).detach().data.numpy()
next_state, reward, self.done, info = self.env.step(actions_clipped)
memory.append((actions_clipped, reward, self.state, next_state, self.done))
self.state = next_state
self.steps += 1
self.episode_reward += reward
if self.done:
count_of_dones += 1
self.episode_rewards.append(self.episode_reward)
flag = True
if flag and count_of_dones % print_every == 0:
print(f'Score at timestep {self.steps}: {self.episode_reward}.')
flag = False
return memory
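The pieces above compose into a simple collect-then-update loop. A minimal sketch, assuming a gym-style environment with the old 4-tuple step API and flat vector observations; make_portfolio_env, the hidden sizes and the loop lengths are placeholders, not part of this module:

env = make_portfolio_env()                     # hypothetical environment factory
state_dim = env.observation_space.shape[0]
n_actions = env.action_space.shape[0]

actor = Actor(state_dim, hidden_dim=64, n_actions=n_actions)
critic = Critic(state_dim, hidden_dim=64)
learner = A2CLearner(actor, critic, gamma=0.99, entropy_beta=1e-3)
runner = Runner(env, actor)

for update in range(100):
    # collect a batch of transitions with the current policy, then update on it
    memory = runner.run(max_steps=128, print_every=10)
    learner.learn(memory, runner.steps, discount_rewards=True)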
Functions
def clip_grad_norm_(module, max_grad_norm)
-
Clips the gradient norm of every parameter tracked by the given optimizer to max_grad_norm. Despite the name, module is expected to be an optimizer, since the function reads its param_groups.
def discounted_rewards(rewards, dones, gamma)
-
Computes discounted returns for a trajectory, accumulating backwards and resetting the running return whenever done is 1.
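A small worked example with illustrative numbers: three unit rewards, an episode ending at the last step, and gamma = 0.9.

rewards = [1.0, 1.0, 1.0]
dones = [0, 0, 1]
discounted_rewards(rewards, dones, gamma=0.9)
# -> approximately [2.71, 1.9, 1.0]
# last step: 1.0; middle: 1.0 + 0.9 * 1.0 = 1.9; first: 1.0 + 0.9 * 1.9 = 2.71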
def process_memory(memory, gamma=0.99, discount_rewards=True)
-
Unpacks a list of (action, reward, state, next_state, done) transitions into tensors, optionally replacing the raw rewards with discounted returns.
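A sketch of the shapes it returns, assuming N transitions with 3-dimensional states and 2-dimensional actions (the sizes are illustrative):

# each memory entry is (action, reward, state, next_state, done)
actions, rewards, states, next_states, dones = process_memory(memory, gamma=0.99)
# actions:     shape (N, 2), float32
# rewards:     shape (N, 1), discounted returns when discount_rewards=True
# states:      shape (N, 3)
# next_states: shape (N, 3)
# dones:       shape (N, 1)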
def tensor(x)
-
Converts an array-like object into a float32 torch.Tensor.
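For example:

t = tensor([[1, 2], [3, 4]])
# lists go through np.array first; the result is a float32 torch.Tensor of shape (2, 2)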
Classes
class A2CLearner (actor, critic, gamma=0.9, entropy_beta=0, actor_lr=0.0004, critic_lr=0.004, max_grad_norm=0.5)
-
Advantage Actor-Critic learner that updates the actor and critic networks from a memory of collected transitions.
Methods
def learn(self, memory, steps, discount_rewards=True)
-
Performs one actor update and one critic update from stored transitions.
Params
memory (list): list of (action, reward, state, next_state, done) transitions
steps (int): current environment step count (not used in the update)
discount_rewards (bool): whether to replace raw rewards with discounted returns
def predict(self, state)
-
Returns the action for a given state: a sample from the actor's Gaussian policy, passed through a softmax so the components are non-negative and sum to one.
Params
state (array_like): current state
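A brief usage sketch; learner and state are assumed to come from an existing training setup:

weights = learner.predict(state)   # NumPy array of softmax-normalised components
weights.sum()                      # ~1.0, so the action can be read as portfolio weights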
class Actor (state_dim, hidden_dim, n_actions)
-
This is the actor network for the A2C Agent. Original paper can be found at https://arxiv.org/abs/1802.09477 This implementation was adapted from https://github.com/higgsfield/RL-Adventure-2/blob/master/1.actor-critic.ipynb
Ancestors
- torch.nn.modules.module.Module
Methods
def forward(self, X)
-
Runs the network on X and returns a torch.distributions.Normal over actions: the means come from the MLP and the standard deviations from the learned logstds parameter, clamped to [1e-3, 50]. Call the module instance itself rather than forward() so that registered hooks are run.
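A short sketch of what the forward pass produces, with illustrative sizes (10-dimensional state, 4 actions):

actor = Actor(state_dim=10, hidden_dim=64, n_actions=4)
dist = actor(torch.randn(10))     # torch.distributions.Normal with batch shape (4,)
action = dist.sample()            # unbounded Gaussian sample, shape (4,)
log_prob = dist.log_prob(action)  # per-dimension log-probabilities, shape (4,)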
class Critic (state_dim, hidden_dim)
-
This is the critic network for the A2C Agent. Original paper can be found at https://arxiv.org/abs/1802.09477 This implementation was adapted from https://github.com/higgsfield/RL-Adventure-2/blob/master/1.actor-critic.ipynb
Initializes the A2C Critic Network
Args
state_dim : int
- State space dimension
hidden_dim : int
- Size of hidden layer
Ancestors
- torch.nn.modules.module.Module
Methods
def forward(self, X)
-
Runs the network on X and returns the estimated state value with shape (..., 1). Call the module instance itself rather than forward() so that registered hooks are run.
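Correspondingly for the critic, with the same illustrative sizes:

critic = Critic(state_dim=10, hidden_dim=64)
values = critic(torch.randn(5, 10))   # state-value estimates for a batch of 5 states, shape (5, 1)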
class Runner (env, actor)
-
Collects experience by rolling the actor out in the environment and storing (action, reward, state, next_state, done) transitions.
Methods
def reset(self)
-
Starts a new episode: resets the episode reward, clears the done flag and resets the environment.
def run(self, max_steps, print_every, memory=None)
-
Interacts with the environment for max_steps steps, appending transitions to memory (a new list is created if none is given) and printing the episode score every print_every finished episodes. Returns the memory list.
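For reference, a minimal dummy environment that satisfies the interface Runner relies on (old-style Gym API: reset() returns a state, step() returns a 4-tuple). This is only a sketch for exercising the plumbing, not part of the module:

import numpy as np

class DummyEnv:
    # random-walk environment with 3 assets and 10-step episodes
    def __init__(self):
        self.t = 0
    def reset(self):
        self.t = 0
        return np.random.randn(3).astype(np.float32)
    def step(self, action):
        self.t += 1
        next_state = np.random.randn(3).astype(np.float32)
        reward = float(np.dot(action, next_state))   # toy reward
        done = self.t >= 10
        return next_state, reward, done, {}

runner = Runner(DummyEnv(), Actor(state_dim=3, hidden_dim=32, n_actions=3))
memory = runner.run(max_steps=50, print_every=1)   # 50 steps, roughly 5 short episodes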