Module AssetAllocator.algorithms.DDPG.agent
Script that contains the training and testing loops for the DDPG agent.
import gym
import os
import pickle
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from .Network import Actor, Critic
from .Replay_Memory import ReplayMemory
from .OU_Noise import OrnsteinUhlenbeckNoise
class DDPGAgentHelper:
"""This is the agent class for the DDPG Agent.
Original paper can be found at
This implementation was adapted from
def __init__(
warmup_steps = 100
"""Helper class for Initializing a DDPG Agent
env (gym object): Gym environment for the agent to interact with
state_dim (int): State space dimension
action_dim (int): Action space dimension
max_action (int): the max value of the range in the action space (assumes a symmetric range in the action space)
device (str, optional): One of cuda or cpu. Defaults to 'cuda'.
memory_capacity (int, optional): Size of replay buffer. Defaults to 10_000.
num_memory_fill_episodes (int, optional): Number of elements to initialize in the replay buffer. Defaults to 10.
discount (float, optional): Reward discount factor. Defaults to 0.99.
tau (float, optional): Polyak averaging soft updates factor (i.e., soft updating of the target networks). Defaults to 0.005.
sigma (float, optional): Amount of noise to be applied to the OU process. Defaults to 0.2.
theta (float, optional): Amount of frictional force to be applied in OU noise generation. Defaults to 0.15.
actor_lr ([type], optional): Actor's learning rate. Defaults to 1e-4.
critic_lr ([type], optional): Critic's learning rate. Defaults to 1e-3.
batch_size (int, optional): Batch size for replay buffer and networks. Defaults to 128.
warmup_steps (int, optional): Memory warmup steps. Defaults to 100.
self.env = env
self.batch_size = batch_size
self.state_dim = state_dim # dimension of the state space
self.action_dim = action_dim # dimension of the action space
self.device = device # defines which cuda or cpu device is to be used to run the networks
# denoted a gamma in the equation for computation of the Q-value = discount
# defines the factor used for Polyak averaging (i.e., soft updating of the target networks)
self.tau = tau
# the max value of the range in the action space (assumes a symmetric range in the action space)
self.max_action = max_action
self.warmup_steps = warmup_steps
# create an instance of the replay buffer
self.memory_capacity = memory_capacity
self.num_memory_fill_episodes = num_memory_fill_episodes
self.memory = ReplayMemory(memory_capacity)
# create an instance of the noise generating process
self.ou_noise = OrnsteinUhlenbeckNoise(
mu=np.zeros(self.action_dim), sigma=sigma, theta=theta)
# instances of the networks for the actor and the critic = Actor(state_dim, action_dim, max_action, actor_lr)
self.critic = Critic(state_dim, action_dim, critic_lr)
# instance of the target networks for the actor and the critic
self.target_actor = Actor(state_dim, action_dim, max_action, actor_lr)
self.target_critic = Critic(state_dim, action_dim, critic_lr)
# initialise the targets to the same weight as their corresponding current networks
# since we do not learn/train on the target networks
def fill_memory(self):
Helper method to fill replay buffer during the warmup steps
epochs = self.warmup_steps//self.env.episode_length + 1
for epoch in range(epochs):
state = self.env.reset()
done = False
while not done:
action = self.env.action_space.sample() # do random action for warmup
action = action/action.sum() #normalize random actions
next_state, reward, done, _ = self.env.step(action)
# store the transition to memory[state, action, next_state, reward, done])
state = next_state
print("Done filling memory")
def _softmax(x, axis=0):
    """Return the softmax of ``x`` along ``axis``.

    The maximum along ``axis`` is subtracted before exponentiating. This
    leaves the result unchanged mathematically (softmax is shift-invariant)
    but keeps ``np.exp`` from overflowing on large inputs.

    This helper takes no ``self``; the agent invokes it through the class,
    i.e. ``DDPGAgentHelper._softmax(values)``.

    Args:
        x (array_like): Input scores.
        axis (int, optional): Axis along which the outputs sum to 1.
            Defaults to 0.

    Returns:
        numpy.ndarray: Array with the same shape as ``x`` whose entries are
        non-negative and sum to 1 along ``axis``.
    """
    scores = np.asarray(x)
    # Shift so the largest entry is 0; exp() then stays within (0, 1].
    shifted = scores - np.amax(scores, axis=axis, keepdims=True)
    exponentials = np.exp(shifted)
    return exponentials / exponentials.sum(axis=axis, keepdims=True)
def select_action(self, state):
Function to return the appropriate action for the given state.
During training, it adds a zero-mean OU noise to the action to encourage exploration.
During testing, no noise is added to the action decision.
state (array_like): The current state of the environment as observed by the agent
action: A numpy array representing the noisy action to be performed by the agent in the current state
if not torch.is_tensor(state):
state = torch.tensor(np.array([state]), dtype=torch.float32).to(self.device)
# performs inference using the actor based on the current state as the input and returns the corresponding np array
act =
noise = 0.0
# for adding Gaussian noise (to use, update the code pass the exploration noise as input)
# if self.train_mode:
# noise = np.random.normal(0.0, exploration_noise, size=act.shape) # generate the zero-mean gaussian noise with standard deviation determined by exploration_noise
# for adding OU noise
# if self.train_mode:
noise = self.ou_noise.generate_noise()
noisy_action = act + noise
# to ensure that the noisy action being returned is within the limit of "legal" actions afforded to the agent;
noisy_action = noisy_action.clip(
min=0, max=self.max_action)
return DDPGAgentHelper._softmax(noisy_action)
def _learn(self):
Function to perform the updates on the 4 neural networks that run the DDPG algorithm.
if len(self.memory) < self.batch_size:
states, actions, next_states, rewards, dones = self.memory.sample(
self.batch_size, self.device) # a batch of experiences randomly sampled form the memory
# ensure that the actions and rewards tensors have the appropriate shapes
actions = actions.view(-1, self.action_dim)
rewards = rewards.view(-1, 1)
with torch.no_grad():
# generate target actions
target_action = self.target_actor(next_states)
# calculate TD-Target
target_q = self.target_critic(next_states, target_action)
# being in a terminal state implies there are no more future states that the agent would encounter in the given episode and so set the associated Q-value to 0
target_q[dones] = 0.0
y = rewards + * target_q
current_q = self.critic(states, actions)
critic_loss = F.mse_loss(current_q, y).mean()
# actor loss is calculated by a gradient ascent along the crtic, thus need to apply the negative sign to convert to a gradient descent
pred_current_actions =
pred_current_q = self.critic(states, pred_current_actions)
actor_loss = - pred_current_q.mean()
# apply slow-update to the target networks
def learn(self, timesteps, print_every=100):
    """Train the agent.

    Fills the replay buffer first, then runs the training loop.

    Args:
        timesteps (int): Number of timesteps the agent should interact with
            the environment.
        print_every (int): Verbosity control, forwarded unchanged to
            ``train``.
    """
    # Populate the replay buffer before learning begins.
    self.fill_memory()
    self.train(timesteps, print_every)
def predict(self, state):
    """Return the agent's action for ``state`` without exploration noise.

    ``select_action`` always adds ``self.ou_noise.generate_noise()`` to the
    actor output. For prediction the noise process is therefore swapped for
    a stand-in that contributes zero, and the real process is put back
    afterwards, even if ``select_action`` raises.

    Args:
        state (array_like): Current environment state.

    Returns:
        array_like: Agent's action, as returned by ``select_action``.
    """

    class _SilentNoise:
        """Noise source whose contribution to the action is always zero."""

        @staticmethod
        def generate_noise():
            return 0.0

    saved_noise = self.ou_noise
    self.ou_noise = _SilentNoise()
    try:
        return self.select_action(state)
    finally:
        # Restore the real process so later training steps still explore.
        self.ou_noise = saved_noise
def soft_update_net(self, source_net_params, target_net_params):
    """Perform Polyak averaging to update the parameters of a target network.

    Each target parameter is overwritten in place with
    ``tau * source + (1 - tau) * target``; source parameters are not
    modified.

    Args:
        source_net_params (list): Trainable parameters of the source, i.e.
            the current version of the network.
        target_net_params (list): Trainable parameters of the corresponding
            target network, paired positionally with ``source_net_params``.
    """
    for source_param, target_param in zip(source_net_params, target_net_params):
        # Work on .data so the blend is not recorded by autograd.
        target_param.data.copy_(
            self.tau * source_param.data + (1 - self.tau) * target_param.data
        )
def soft_update_targets(self):
""" Function that calls Polyak averaging on both target networks """
def train(self, timesteps, print_every):
"""Helper method to train the agent
timesteps (int): Total timesteps the agent has interacted for
print_every (int): Verbosity control
reward_history = [] # tracks the reward per episode
best_score = -np.inf
epochs = timesteps//self.env.episode_length + 1
total_steps = 0
flag = False
count_of_dones = 0
for ep_cnt in range(epochs):
done = False
state = self.env.reset()
ep_reward = 0
while not done:
action = self.select_action(state) # generate noisy action
# print("Action:", action)
next_state, reward, done, _ = self.env.step(
action) # execute the action in the environment
# store the interaction in the replay buffer[state, action, next_state, reward, done])
self._learn(total_steps) # update the networks
state = next_state
total_steps += 1
ep_reward += reward
if done:
count_of_dones += 1
flag = True
if flag and count_of_dones % print_every == 0:
print(f'Score at timestep {total_steps}: {ep_reward}.')
flag = False
if total_steps >= timesteps:
def save(self, file_name):
Saves trained model
filepath(str) : folder path to save the agent
def load(self, file_name):
Loads trained model
filepath(str) : folder path to save the agent
# def save(self, path, model_name):
#'{}/{}_actor'.format(path, model_name))
# self.critic.save_model('{}/{}_critic'.format(path, model_name))
# def load(self, path, model_name):
#'{}/{}_actor'.format(path, model_name))
# self.critic.load_model('{}/{}_critic'.format(path, model_name))
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
default_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def DDPGAgent(env, device=default_device):
"""Factory function for creating a DDPG Agent
env (gym object): Gym environment for the agent to interact with
device (string, optional): Device for training - defaults to Cuda if GPU is detected
agent: DDPG Agent Instance
ddpg_agent = DDPGAgentHelper(env=env,
return ddpg_agent
def DDPGAgent(env, device=device(type='cpu'))
Factory function for creating a DDPG Agent
Args:
env (gym object) - Gym environment for the agent to interact with
device (string, optional) - Device for training; defaults to CUDA if a GPU is detected, otherwise CPU
Returns:
agent - DDPG Agent Instance
class DDPGAgentHelper (env, state_dim, action_dim, max_action, device, memory_capacity=10000, num_memory_fill_episodes=10, discount=0.99, tau=0.005, sigma=0.2, theta=0.15, actor_lr=0.0001, critic_lr=0.001, batch_size=64, warmup_steps=100)
This is the agent class for the DDPG Agent.
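Original paper (Lillicrap et al., "Continuous control with deep reinforcement learning") can be found at https://arxiv.org/abs/1509.02971
This implementation was adapted from an existing implementation (the link is missing from this copy of the page).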
Helper class for Initializing a DDPG Agent
Args:
env (gym object) - Gym environment for the agent to interact with
state_dim (int) - State space dimension
action_dim (int) - Action space dimension
max_action (int) - The max value of the range in the action space (assumes a symmetric range in the action space)
device (str) - One of cuda or cpu. Required here; the DDPGAgent factory selects CUDA when a GPU is detected.
memory_capacity (int, optional) - Size of replay buffer. Defaults to 10_000.
num_memory_fill_episodes (int, optional) - Number of elements to initialize in the replay buffer. Defaults to 10.
discount (float, optional) - Reward discount factor. Defaults to 0.99.
tau (float, optional) - Polyak averaging soft updates factor (i.e., soft updating of the target networks). Defaults to 0.005.
sigma (float, optional) - Amount of noise to be applied to the OU process. Defaults to 0.2.
theta (float, optional) - Amount of frictional force to be applied in OU noise generation. Defaults to 0.15.
actor_lr (float, optional) - Actor's learning rate. Defaults to 1e-4.
critic_lr (float, optional) - Critic's learning rate. Defaults to 1e-3.
batch_size (int, optional) - Batch size for replay buffer and networks. Defaults to 64.
warmup_steps (int, optional) - Memory warmup steps. Defaults to 100.
