Module AssetAllocator.algorithms.DDPG.DDPG

Expand source code
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

from .Network import Actor, Critic
from .Replay_Memory import ReplayMemory
from .OU_Noise import OrnsteinUhlenbeckNoise

class DDPGAgent:
    """This is the agent class for the DDPG Agent.

    Original paper can be found at https://arxiv.org/abs/1509.02971

    This implementation was adapted from https://github.com/saashanair/rl-series/tree/master/ddpg
    
    """

    def __init__(
        self, 
        state_dim, 
        action_dim,
        max_action, 
        device, 
        memory_capacity=10000, 
        discount=0.99, 
        tau=0.005, 
        sigma=0.2, 
        theta=0.15, 
        actor_lr=1e-4, 
        critic_lr=1e-3, 
        train_mode=True):

        """Initializes the DDPG Agent

        Args:
            state_dim (int): State space dimension
            action_dim (int): Action space dimension
            max_action (int): The max value of the action range (assumes a symmetric action space)
            device (str): Device to run the networks on, e.g. 'cuda' or 'cpu'.
            memory_capacity (int, optional): Size of the replay buffer. Defaults to 10000.
            discount (float, optional): Reward discount factor (gamma). Defaults to 0.99.
            tau (float, optional): Polyak averaging factor for soft updates of the target networks. Defaults to 0.005.
            sigma (float, optional): Scale of the noise applied by the OU process. Defaults to 0.2.
            theta (float, optional): Mean-reversion (frictional) coefficient of the OU process. Defaults to 0.15.
            actor_lr (float, optional): Actor's learning rate. Defaults to 1e-4.
            critic_lr (float, optional): Critic's learning rate. Defaults to 1e-3.
            train_mode (bool, optional): Training or eval mode flag. Defaults to True.
        """      

        self.train_mode = train_mode # whether the agent is in training or testing mode

        self.state_dim = state_dim # dimension of the state space
        self.action_dim = action_dim # dimension of the action space
        
        self.device = device # defines which cuda or cpu device is to be used to run the networks
        self.discount = discount # denoted as gamma in the Q-value (TD-target) computation
        self.tau = tau # defines the factor used for Polyak averaging (i.e., soft updating of the target networks)
        self.max_action = max_action # the max value of the range in the action space (assumes a symmetric range in the action space)
        
        # create an instance of the replay buffer
        self.memory = ReplayMemory(memory_capacity)

        # create an instance of the noise generating process
        self.ou_noise = OrnsteinUhlenbeckNoise(mu=np.zeros(self.action_dim), sigma=sigma, theta=theta)

        # instances of the networks for the actor and the critic
        self.actor = Actor(state_dim, action_dim, max_action, actor_lr)
        self.critic = Critic(state_dim, action_dim, critic_lr)

        # instance of the target networks for the actor and the critic
        self.target_actor = Actor(state_dim, action_dim, max_action, actor_lr)
        self.target_critic = Critic(state_dim, action_dim, critic_lr)

        # initialise the targets to the same weight as their corresponding current networks
        self.target_actor.load_state_dict(self.actor.state_dict())
        self.target_critic.load_state_dict(self.critic.state_dict())

        # since we do not learn/train on the target networks
        self.target_actor.eval()
        self.target_critic.eval()

        # for test mode
        if not self.train_mode:
            self.actor.eval()
            self.critic.eval()
            self.ou_noise = None # no exploration noise is used in test mode

        self.actor.to(self.device)
        self.critic.to(self.device)

        self.target_actor.to(self.device)
        self.target_critic.to(self.device)

    def select_action(self, state):
        """
        Returns the action to take in the given state.
        During training, zero-mean OU noise is added to the action to encourage exploration.
        During testing, no noise is added to the action decision.

        Args:
            state (array_like): The current state of the environment as observed by the agent

        Returns:
            action: A numpy array representing the (possibly noisy) action to be performed by the agent in the current state
        """

        if not torch.is_tensor(state):
            state = torch.tensor([state], dtype=torch.float32).to(self.device)
        
        self.actor.eval()
        act = self.actor(state).cpu().data.numpy().flatten() # performs inference using the actor based on the current state as the input and returns the corresponding np array
        self.actor.train()

        noise = 0.0

        ## alternative: Gaussian noise (to use, update the code to pass an exploration_noise value as input)
        # if self.train_mode:
        #     noise = np.random.normal(0.0, exploration_noise, size=act.shape) # zero-mean Gaussian noise with standard deviation exploration_noise

        # for adding OU noise
        if self.train_mode:
            noise = self.ou_noise.generate_noise()

        noisy_action = act + noise
        noisy_action = noisy_action.clip(min=-self.max_action, max=self.max_action) # to ensure that the noisy action being returned is within the limit of "legal" actions afforded to the agent; assumes action range is symmetric

        return noisy_action

    def learn(self, batchsize):
        """
        Function to perform the updates on the 4 neural networks that run the DDPG algorithm.
        Args: 
            batchsize (int): Number of experiences to be randomly sampled from the memory for the agent to learn from
        """

        if len(self.memory) < batchsize:
            return
        states, actions, next_states, rewards, dones = self.memory.sample(batchsize, self.device) # a batch of experiences randomly sampled from the memory

        # ensure that the actions and rewards tensors have the appropriate shapes
        actions = actions.view(-1, self.action_dim) 
        rewards = rewards.view(-1, 1)

        with torch.no_grad():
            # generate target actions
            target_action = self.target_actor(next_states)

            # calculate TD-Target
            target_q = self.target_critic(next_states, target_action)
            target_q[dones] = 0.0 # being in a terminal state implies there are no more future states that the agent would encounter in the given episode and so set the associated Q-value to 0
            y = rewards + self.discount * target_q

        current_q = self.critic(states, actions)
        critic_loss = F.mse_loss(current_q, y).mean()
        
        self.critic.optimizer.zero_grad()
        critic_loss.backward()
        self.critic.optimizer.step()

        # the actor is updated by gradient ascent on the critic's Q-value, so the sign is negated to perform gradient descent
        pred_current_actions = self.actor(states)
        pred_current_q = self.critic(states, pred_current_actions)
        actor_loss = - pred_current_q.mean()

        self.actor.optimizer.zero_grad()
        actor_loss.backward()
        self.actor.optimizer.step()

        # apply slow-update to the target networks
        self.soft_update_targets()


    def soft_update_net(self, source_net_params, target_net_params):
        """
        Perform Polyak averaging to update the parameters of the provided network
        Args:
            source_net_params (iterable): trainable parameters of the source, i.e. the current version of the network
            target_net_params (iterable): trainable parameters of the corresponding target network
        """

        for source_param, target_param in zip(source_net_params, target_net_params):
            target_param.data.copy_(self.tau * source_param.data + (1 - self.tau) * target_param.data)

    def soft_update_targets(self):
        """ Function that calls Polyak averaging on both target networks """

        self.soft_update_net(self.actor.parameters(), self.target_actor.parameters())
        self.soft_update_net(self.critic.parameters(), self.target_critic.parameters())

    def save(self, path, model_name):
        """
        Saves the trained actor and critic weights

        Args:
            path (str): folder path in which to save the agent's weights
            model_name (str): name under which to save the agent's weights
        """
        self.actor.save_model('{}/{}_actor'.format(path, model_name))
        self.critic.save_model('{}/{}_critic'.format(path, model_name))

    def load(self, path, model_name):
        """
        Loads trained actor and critic weights

        Args:
            path (str): folder path containing the agent's weights
            model_name (str): name of the saved agent's weights
        """
        self.actor.load_model('{}/{}_actor'.format(path, model_name))
        self.critic.load_model('{}/{}_critic'.format(path, model_name))

Classes

class DDPGAgent (state_dim, action_dim, max_action, device, memory_capacity=10000, discount=0.99, tau=0.005, sigma=0.2, theta=0.15, actor_lr=0.0001, critic_lr=0.001, train_mode=True)

This is the agent class for the DDPG Agent.

Original paper can be found at https://arxiv.org/abs/1509.02971

This implementation was adapted from https://github.com/saashanair/rl-series/tree/master/ddpg

Initializes the DDPG Agent

Args

state_dim : int
State space dimension
action_dim : int
Action space dimension
max_action : int
The max value of the action range (assumes a symmetric action space)
device : str
Device to run the networks on, e.g. 'cuda' or 'cpu'.
memory_capacity : int, optional
Size of the replay buffer. Defaults to 10000.
discount : float, optional
Reward discount factor (gamma). Defaults to 0.99.
tau : float, optional
Polyak averaging factor for soft updates of the target networks. Defaults to 0.005.
sigma : float, optional
Scale of the noise applied by the OU process. Defaults to 0.2.
theta : float, optional
Mean-reversion (frictional) coefficient of the OU process. Defaults to 0.15.
actor_lr : float, optional
Actor's learning rate. Defaults to 1e-4.
critic_lr : float, optional
Critic's learning rate. Defaults to 1e-3.
train_mode : bool, optional
Training or eval mode flag. Defaults to True.
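
For illustration, a minimal construction-and-inference sketch: the environment dimensions (10 state features, 3 assets), the action bound of 1.0, and the random observation are hypothetical stand-ins, not values prescribed by the library.

import numpy as np
import torch

from AssetAllocator.algorithms.DDPG.DDPG import DDPGAgent

device = 'cuda' if torch.cuda.is_available() else 'cpu'
agent = DDPGAgent(state_dim=10, action_dim=3, max_action=1.0, device=device)

state = np.random.rand(10).astype(np.float32)  # stand-in for an environment observation
action = agent.select_action(state)            # OU-noise-perturbed action while train_mode=True
print(action.shape)                            # (3,)
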
Methods

def learn(self, batchsize)

Performs the updates on the four neural networks that run the DDPG algorithm.

Args

batchsize : int
Number of experiences to be randomly sampled from the memory for the agent to learn from
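
For reference, these updates implement the standard DDPG objectives. In the notation of the original paper, with \mu and Q the current actor and critic, \mu' and Q' their target networks, and the terminal flag written as a (1 - d_i) factor to mirror the zeroing of target_q for done transitions:

    y_i = r_i + \gamma \, (1 - d_i) \, Q'\big(s_{i+1}, \mu'(s_{i+1})\big)

    L_{\text{critic}} = \frac{1}{N} \sum_i \big( Q(s_i, a_i) - y_i \big)^2

    L_{\text{actor}} = -\frac{1}{N} \sum_i Q\big(s_i, \mu(s_i)\big)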

def load(self, path, model_name)

Loads trained actor and critic weights

Args

path : str
folder path containing the agent's weights
model_name : str
name of the saved agent's weights

def save(self, path, model_name)

Saves the trained actor and critic weights

Args

path : str
folder path in which to save the agent's weights
model_name : str
name under which to save the agent's weights
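
A brief, hypothetical usage sketch, continuing the construction example above. save joins path and model_name with a '/'; the exact file naming and any extension are determined by the networks' save_model and load_model methods.

import os

os.makedirs('checkpoints', exist_ok=True)    # create the folder first in case save_model does not
agent.save('checkpoints', 'ddpg_portfolio')  # forwards 'checkpoints/ddpg_portfolio_actor' and 'checkpoints/ddpg_portfolio_critic'
agent.load('checkpoints', 'ddpg_portfolio')  # restores the actor and critic weights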

def select_action(self, state)

Returns the action to take in the given state. During training, zero-mean OU noise is added to the action to encourage exploration. During testing, no noise is added to the action decision.

Args

state : array_like
The current state of the environment as observed by the agent

Returns

action
A numpy array representing the (possibly noisy) action to be performed by the agent in the current state
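
A noise-free evaluation sketch, reusing the hypothetical dimensions and checkpoint name from the earlier examples; with train_mode=False the OU-noise branch is skipped and the actor's output is returned directly, clipped to [-max_action, max_action].

eval_agent = DDPGAgent(state_dim=10, action_dim=3, max_action=1.0,
                       device='cpu', train_mode=False)
eval_agent.load('checkpoints', 'ddpg_portfolio')  # hypothetical checkpoint from the save example
greedy_action = eval_agent.select_action(np.random.rand(10).astype(np.float32))
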
def soft_update_net(self, source_net_params, target_net_params)

Perform Polyak averaging to update the parameters of the provided network

Args

source_net_params : list
trainable parameters of the source, i.e. the current version of the network
target_net_params : list
trainable parameters of the corresponding target network
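
Concretely, each target parameter is moved a small step toward its source counterpart using the soft-update rule from the DDPG paper (with the default tau = 0.005):

    \theta_{\text{target}} \leftarrow \tau \, \theta_{\text{source}} + (1 - \tau) \, \theta_{\text{target}}
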
def soft_update_targets(self)

Function that calls Polyak averaging on both target networks
