Module AssetAllocator.algorithms.REINFORCE.reinforce_continuous
Source code
import sys
import math
import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.nn.utils as utils
import torchvision.transforms as T
from torch.autograd import Variable
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
pi = Variable(torch.FloatTensor([math.pi])).to(device)
def normal(x, mu, sigma_sq):
    # Gaussian probability density of x under mean mu and variance sigma_sq
    a = (-1*(Variable(x)-mu).pow(2)/(2*sigma_sq)).exp()
    b = 1/(2*sigma_sq*pi.expand_as(sigma_sq)).sqrt()
    return a*b

class Policy(nn.Module):
    """This is the policy network for the REINFORCE Agent.
    This implementation was adapted from https://github.com/chingyaoc/pytorch-REINFORCE
    """
    def __init__(self, hidden_size, num_inputs, action_space):
        super(Policy, self).__init__()
        self.action_space = action_space
        num_outputs = action_space.shape[0]
        self.linear1 = nn.Linear(num_inputs, hidden_size)
        self.linear2 = nn.Linear(hidden_size, num_outputs)   # mean head
        self.linear2_ = nn.Linear(hidden_size, num_outputs)  # variance head

    def forward(self, inputs):
        x = inputs
        x = F.relu(self.linear1(x))
        mu = self.linear2(x)
        sigma_sq = self.linear2_(x)
        return mu, sigma_sq

class REINFORCE:
    def __init__(self, hidden_size, num_inputs, action_space):
        """Initializes the REINFORCE Agent
        Args:
            hidden_size (int): Size of the hidden layer in the policy network
            num_inputs (int): Size of the input to the first hidden layer
            action_space (gym.spaces.Box): Action space of the gym environment
        """
        self.action_space = action_space
        self.model = Policy(hidden_size, num_inputs, action_space)
        self.model = self.model.to(device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=1e-3)
        self.model.train()

    def select_action(self, state):
        """Takes in the current environment state and returns the agent's action,
        together with its log probability and the policy entropy.
        Args:
            state (array_like): Current environment state
        Returns:
            action: Sampled action
            log_prob: Log probability of the sampled action
            entropy: Entropy term of the policy's Gaussian distribution
        """
        mu, sigma_sq = self.model(Variable(state).to(device))
        sigma_sq = F.softplus(sigma_sq)  # keep the variance positive
        eps = torch.randn(mu.size())
        # sample an action from N(mu, sigma_sq)
        action = (mu + sigma_sq.sqrt()*Variable(eps).to(device)).data
        prob = normal(action, mu, sigma_sq)
        # entropy term as used in the upstream implementation
        entropy = -0.5*((sigma_sq+2*pi.expand_as(sigma_sq)).log()+1)
        log_prob = prob.log()
        return action, log_prob, entropy
    def update_parameters(self, rewards, log_probs, entropies, gamma):
        """Uses the episode's rewards, log probabilities, entropies and discount
        factor to update the policy network's parameters.
        Args:
            rewards (array_like): Rewards collected at each step of the episode
            log_probs (array_like): Log probabilities of the actions taken at each step
            entropies (array_like): Entropy terms of the policy at each step
            gamma (float): Discount factor
        """
        R = torch.zeros(1, 1)
        loss = 0
        for i in reversed(range(len(rewards))):
            # discounted return, accumulated backwards through the episode
            R = gamma * R + rewards[i]
            # policy-gradient loss with a small entropy bonus term
            loss = loss - (log_probs[i]*(Variable(R).expand_as(log_probs[i])).to(
                device)).sum() - (0.0001*entropies[i].to(device)).sum()
        loss = loss / len(rewards)
        self.optimizer.zero_grad()
        loss.backward()
        # clip gradients to stabilize training
        utils.clip_grad_norm_(self.model.parameters(), 40)
        self.optimizer.step()
Functions
def normal(x, mu, sigma_sq)
Computes the Gaussian probability density of x under mean mu and variance sigma_sq.
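As a sanity check, a minimal sketch (not part of the module) comparing the same formula against torch.distributions.Normal, which takes a standard deviation rather than a variance:

import math
import torch

mu = torch.tensor([0.5])
sigma_sq = torch.tensor([0.25])   # variance
x = torch.tensor([0.3])

# same formula as normal(), written with plain tensors
a = (-1 * (x - mu).pow(2) / (2 * sigma_sq)).exp()
b = 1 / (2 * sigma_sq * math.pi).sqrt()
manual_density = a * b

# reference density from torch.distributions
reference = torch.distributions.Normal(mu, sigma_sq.sqrt()).log_prob(x).exp()

print(manual_density, reference)  # both approximately 0.7365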
Classes
class Policy (hidden_size, num_inputs, action_space)
This is the policy network for the REINFORCE Agent.
This implementation was adapted from https://github.com/chingyaoc/pytorch-REINFORCE
Ancestors
- torch.nn.modules.module.Module
Class variables
var dump_patches : bool
var training : bool
Methods
def forward(self, inputs)
Defines the computation performed at every call.
Should be overridden by all subclasses.
Note
Although the recipe for the forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this, since the former takes care of running the registered hooks while the latter silently ignores them.
For this policy, the forward pass applies the hidden layer with a ReLU activation and returns the mean mu and the raw variance parameter sigma_sq from the two output heads.
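To make the two output heads concrete, a minimal sketch of a forward pass; the asset count and observation size are hypothetical values for illustration only:

import gym
import torch

from AssetAllocator.algorithms.REINFORCE.reinforce_continuous import Policy

num_assets = 10        # hypothetical portfolio size
num_inputs = 50        # hypothetical observation size
action_space = gym.spaces.Box(low=0.0, high=1.0, shape=(num_assets,))

policy = Policy(hidden_size=128, num_inputs=num_inputs, action_space=action_space)

state = torch.randn(1, num_inputs)   # batch of one dummy observation
mu, sigma_sq = policy(state)         # one value per asset from each head
print(mu.shape, sigma_sq.shape)      # torch.Size([1, 10]) torch.Size([1, 10])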
class REINFORCE (hidden_size, num_inputs, action_space)
Initializes the REINFORCE Agent
Args
hidden_size : int
- Size of the hidden layer in the policy network
num_inputs : int
- Size of the input to the first hidden layer
action_space : gym.spaces.Box
- Action space of the gym environment
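A short sketch of constructing the agent, reusing the hypothetical action space from the Policy example above; the constructor already moves the policy to the available device, builds an Adam optimizer with lr=1e-3 and puts the model in training mode:

import gym

from AssetAllocator.algorithms.REINFORCE.reinforce_continuous import REINFORCE

# hypothetical environment dimensions, for illustration only
action_space = gym.spaces.Box(low=0.0, high=1.0, shape=(10,))
agent = REINFORCE(hidden_size=128, num_inputs=50, action_space=action_space)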
Methods
def select_action(self, state)
Takes in the current environment state and returns the agent's action, together with its log probability and the policy entropy.
Args
state : array_like
- Current environment state
Returns
action
- Sampled action
log_prob
- Log probability of the sampled action under the current policy
entropy
- Entropy term of the policy's Gaussian distribution
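A sketch of a single action selection with the agent constructed above, assuming the environment state has already been converted to a float tensor with a leading batch dimension:

import torch

# hypothetical state; in practice this comes from the environment
state = torch.randn(1, 50)

action, log_prob, entropy = agent.select_action(state)

# action is a detached tensor of one weight per asset; log_prob and entropy
# remain attached to the graph so update_parameters can backpropagate through them
print(action.shape)  # torch.Size([1, 10])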
def update_parameters(self, rewards, log_probs, entropies, gamma)
Uses the episode's rewards, log probabilities, entropies and discount factor to update the policy network's parameters.
Args
rewards : array_like
- Rewards collected at each step of the episode
log_probs : array_like
- Log probabilities of the actions taken at each step
entropies : array_like
- Entropy terms of the policy at each step
gamma : float
- Discount factor
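Putting the pieces together, a hedged sketch of one training loop around the agent constructed above; the environment object, its reward scale and the classic 4-tuple gym step API are assumptions, not part of the module:

import torch

gamma = 0.99

for episode in range(100):
    # env is a hypothetical gym-style environment matching the sizes above
    state = torch.as_tensor(env.reset(), dtype=torch.float32).unsqueeze(0)

    rewards, log_probs, entropies = [], [], []
    done = False
    while not done:
        action, log_prob, entropy = agent.select_action(state)
        next_state, reward, done, info = env.step(action.squeeze(0).cpu().numpy())

        rewards.append(reward)
        log_probs.append(log_prob)
        entropies.append(entropy)
        state = torch.as_tensor(next_state, dtype=torch.float32).unsqueeze(0)

    # one REINFORCE update per episode, discounting rewards with gamma
    agent.update_parameters(rewards, log_probs, entropies, gamma)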