Module AssetAllocator.algorithms.DDPG.agent

Script that contains the training and testing loops

Expand source code
"""
Script that contains the training and testing loops
"""
import gym
import os
import pickle
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

from .Network import Actor, Critic
from .Replay_Memory import ReplayMemory
from .OU_Noise import OrnsteinUhlenbeckNoise


class DDPGAgentHelper:

    """This is the agent class for the DDPG Agent.

    Original paper can be found at https://arxiv.org/abs/1509.02971

    This implementation was adapted from https://github.com/saashanair/rl-series/tree/master/ddpg
    
    """
    def __init__(
        self,
        env, 
        state_dim, 
        action_dim, 
        max_action, 
        device, 
        memory_capacity=10000, 
        num_memory_fill_episodes=10, 
        discount=0.99, 
        tau=0.005, 
        sigma=0.2, 
        theta=0.15, 
        actor_lr=1e-4, 
        critic_lr=1e-3, 
        batch_size=64, 
        warmup_steps = 100
        ):
        """Helper class for Initializing a DDPG Agent

        Args:
            env (gym object): Gym environment for the agent to interact with
            state_dim (int): State space dimension
            action_dim (int): Action space dimension
            max_action (float): The max value of the range in the action space (assumes a symmetric range in the action space)
            device (str): Device on which the networks are run, e.g. 'cuda' or 'cpu'
            memory_capacity (int, optional): Size of replay buffer. Defaults to 10_000.
            num_memory_fill_episodes (int, optional): Number of episodes used to pre-fill the replay buffer. Defaults to 10.
            discount (float, optional): Reward discount factor. Defaults to 0.99.
            tau (float, optional): Polyak averaging soft updates factor (i.e., soft updating of the target networks). Defaults to 0.005.
            sigma (float, optional): Amount of noise to be applied to the OU process. Defaults to 0.2.
            theta (float, optional): Amount of frictional force to be applied in OU noise generation. Defaults to 0.15.
            actor_lr (float, optional): Actor's learning rate. Defaults to 1e-4.
            critic_lr (float, optional): Critic's learning rate. Defaults to 1e-3.
            batch_size (int, optional): Batch size for replay buffer sampling and network updates. Defaults to 64.
            warmup_steps (int, optional): Memory warmup steps. Defaults to 100.
        """      

        self.env = env
        self.batch_size = batch_size

        self.state_dim = state_dim  # dimension of the state space
        self.action_dim = action_dim  # dimension of the action space

        self.device = device  # defines which cuda or cpu device is to be used to run the networks
        # denoted as gamma in the equation for the computation of the Q-value
        self.discount = discount
        # defines the factor used for Polyak averaging (i.e., soft updating of the target networks)
        self.tau = tau
        # the max value of the range in the action space (assumes a symmetric range in the action space)
        self.max_action = max_action
        self.warmup_steps = warmup_steps
        # create an instance of the replay buffer
        self.memory_capacity = memory_capacity
        self.num_memory_fill_episodes = num_memory_fill_episodes
        self.memory = ReplayMemory(memory_capacity)

        # create an instance of the noise generating process
        self.ou_noise = OrnsteinUhlenbeckNoise(
            mu=np.zeros(self.action_dim), sigma=sigma, theta=theta)

        # instances of the networks for the actor and the critic
        self.actor = Actor(state_dim, action_dim, max_action, actor_lr)
        self.critic = Critic(state_dim, action_dim, critic_lr)

        # instance of the target networks for the actor and the critic
        self.target_actor = Actor(state_dim, action_dim, max_action, actor_lr)
        self.target_critic = Critic(state_dim, action_dim, critic_lr)

        # initialise the targets to the same weight as their corresponding current networks
        self.target_actor.load_state_dict(self.actor.state_dict())
        self.target_critic.load_state_dict(self.critic.state_dict())

        # since we do not learn/train on the target networks
        self.target_actor.eval()
        self.target_critic.eval()

        self.actor.to(self.device)
        self.critic.to(self.device)

        self.target_actor.to(self.device)
        self.target_critic.to(self.device)

    def fill_memory(self):
        """
        Helper method to fill replay buffer during the warmup steps
        """   
        epochs = self.warmup_steps//self.env.episode_length + 1
        for epoch in range(epochs):
            state = self.env.reset()
            done = False

            while not done:
                action = self.env.action_space.sample()  # do random action for warmup
                action = action/action.sum() #normalize random actions
                next_state, reward, done, _ = self.env.step(action)
                # store the transition to memory
                self.memory.store([state, action, next_state, reward, done])
                state = next_state
        print("Done filling memory")

    @staticmethod
    def _softmax(x, axis=0):
        # subtract the max for numerical stability (stable softmax)
        max_val = np.amax(x, axis=axis, keepdims=True)
        x = x - max_val

        # Softmax
        num = np.exp(x)
        denum = num.sum(axis=axis, keepdims=True)
        softmax = num/denum
        return softmax

    def select_action(self, state):
        """
        Function to return the appropriate action for the given state.
        During training, it adds a zero-mean OU noise to the action to encourage exploration.
        During testing, no noise is added to the action decision.
        Args:
            state (array_like): The current state of the environment as observed by the agent
        
        Returns:
            action: A numpy array representing the noisy action to be performed by the agent in the current state
        """

        if not torch.is_tensor(state):
            state = torch.tensor(np.array([state]), dtype=torch.float32).to(self.device)

        self.actor.eval()
        # performs inference using the actor based on the current state as the input and returns the corresponding np array
        act = self.actor(state).cpu().data.numpy().flatten()
        self.actor.train()

        noise = 0.0

        # to add Gaussian noise instead, pass an exploration_noise argument and use:
        # noise = np.random.normal(0.0, exploration_noise, size=act.shape)

        # OU noise is added during training; predict() temporarily sets self.ou_noise to None
        # so that no exploration noise is added at test time
        if self.ou_noise is not None:
            noise = self.ou_noise.generate_noise()
        noisy_action = act + noise
        # ensure that the noisy action returned is within the limits of the "legal" actions afforded to the agent
        noisy_action = noisy_action.clip(min=0, max=self.max_action)
        return DDPGAgentHelper._softmax(noisy_action)

    def _learn(self):
        """
        Function to perform the updates on the 4 neural networks that run the DDPG algorithm.
        """
        if len(self.memory) < self.batch_size:
            return
        states, actions, next_states, rewards, dones = self.memory.sample(
            self.batch_size, self.device)  # a batch of experiences randomly sampled from the memory

        # ensure that the actions and rewards tensors have the appropriate shapes
        actions = actions.view(-1, self.action_dim)
        rewards = rewards.view(-1, 1)

        with torch.no_grad():
            # generate target actions
            target_action = self.target_actor(next_states)

            # calculate TD-Target
            target_q = self.target_critic(next_states, target_action)
            # being in a terminal state implies there are no more future states that the agent would encounter in the given episode and so set the associated Q-value to 0
            target_q[dones] = 0.0
            y = rewards + self.discount * target_q

        current_q = self.critic(states, actions)
        critic_loss = F.mse_loss(current_q, y).mean()

        self.critic.optimizer.zero_grad()
        critic_loss.backward()
        self.critic.optimizer.step()

        # the actor loss performs gradient ascent along the critic, so a negative sign is applied to convert it into gradient descent
        pred_current_actions = self.actor(states)
        pred_current_q = self.critic(states, pred_current_actions)
        actor_loss = - pred_current_q.mean()

        self.actor.optimizer.zero_grad()
        actor_loss.backward()
        self.actor.optimizer.step()

        # apply slow-update to the target networks
        self.soft_update_targets()

    def learn(self, timesteps, print_every=100):
        """
        Trains the agent

        Params
        ======
            timesteps (int): Number of timesteps the agent should interact with the environment
            print_every (int): Verbosity control
        """
        self.fill_memory()  # to populate the replay buffer before learning begins
        self.train(timesteps, print_every)

    def predict(self, state):
        """Returns agent's action based on a given state

        Args:
            state (array_like): Current environment state

        Returns:
            action (array_like): Agent's action
        """      
        ou = self.ou_noise
        self.actor.eval()
        self.critic.eval()
        self.ou_noise = None  # temporarily disable OU exploration noise

        action = self.select_action(state)

        self.actor.train()
        self.critic.train()
        self.ou_noise = ou
        return action

    def soft_update_net(self, source_net_params, target_net_params):
        """
        Perform Polyak averaging to update the parameters of the provided network
        Args:
            source_net_params (list): trainable parameters of the source, i.e., the current version of the network
            target_net_params (list): trainable parameters of the corresponding target network
        """

        for source_param, target_param in zip(source_net_params, target_net_params):
            target_param.data.copy_(
                self.tau * source_param.data + (1 - self.tau) * target_param.data)

    def soft_update_targets(self):
        """ Function that calls Polyak averaging on both target networks """

        self.soft_update_net(self.actor.parameters(),
                             self.target_actor.parameters())
        self.soft_update_net(self.critic.parameters(),
                             self.target_critic.parameters())

    def train(self, timesteps, print_every):
        """Helper method to train the agent

        Args:
            timesteps (int): Number of timesteps to train the agent for
            print_every (int): Verbosity control
        """     
        reward_history = []  # tracks the reward per episode
        best_score = -np.inf

        epochs = timesteps//self.env.episode_length + 1
        total_steps = 0
        flag = False
        count_of_dones = 0
        
        for ep_cnt in range(epochs):
            done = False
            state = self.env.reset()
            ep_reward = 0

            while not done:
                action = self.select_action(state)  # generate noisy action
                # print("Action:", action)
                next_state, reward, done, _ = self.env.step(
                    action)  # execute the action in the environment
                # store the interaction in the replay buffer
                self.memory.store([state, action, next_state, reward, done])

                self._learn()  # update the networks

                state = next_state
                total_steps += 1
                ep_reward += reward
                
                if done:
                    count_of_dones += 1
                    flag = True
            
                if flag and count_of_dones % print_every == 0:
                    print(f'Score at timestep {total_steps}: {ep_reward}.')
                    flag = False

                if total_steps >= timesteps:
                    break
                
    def save(self, file_name):
        """
        Saves trained model

        Params
        =====
        file_name (str): file path prefix used to save the actor and critic models
        """
        self.actor.save_model(f"{file_name}_actor")
        self.critic.save_model(f"{file_name}_critic")

    def load(self, file_name):
        """
        Loads trained model

        Params
        =====
        file_name (str): file path prefix from which to load the actor and critic models
        """
        self.actor.load_model(f"{file_name}_actor")
        self.critic.load_model(f"{file_name}_critic")

    # def save(self, path, model_name):
    #     self.actor.save_model('{}/{}_actor'.format(path, model_name))
    #     self.critic.save_model('{}/{}_critic'.format(path, model_name))

    # def load(self, path, model_name):
    #     self.actor.load_model('{}/{}_actor'.format(path, model_name))
    #     self.critic.load_model('{}/{}_critic'.format(path, model_name))


default_device = torch.device(
    "cuda") if torch.cuda.is_available() else torch.device("cpu")


def DDPGAgent(env, device=default_device):
    """Factory function for creating a DDPG Agent

    Args:
        env (gym object): Gym environment for the agent to interact with
        device (string, optional): Device for training - defaults to Cuda if GPU is detected

    Returns:
        agent: DDPG Agent Instance
    """
    ddpg_agent = DDPGAgentHelper(env=env,
                           state_dim=env.observation_space.shape[0],
                           action_dim=env.action_space.shape[0],
                           max_action=env.action_space.high[0],
                           device=device,
                           memory_capacity=10000,
                           discount=0.99,
                           tau=0.005,
                           sigma=0.2,
                           theta=0.15,
                           actor_lr=1e-4,
                           critic_lr=1e-3,
                           batch_size=64)
    return ddpg_agent

Functions

def DDPGAgent(env, device=default_device)

Factory function for creating a DDPG Agent

Args

env : gym object
Gym environment for the agent to interact with
device : string, optional
Device for training - defaults to Cuda if GPU is detected

Returns

agent
DDPG Agent Instance
Expand source code
def DDPGAgent(env, device=default_device):
    """Factory function for creating a DDPG Agent

    Args:
        env (gym object): Gym environment for the agent to interact with
        device (string, optional): Device for training - defaults to Cuda if GPU is detected

    Returns:
        agent: DDPG Agent Instance
    """
    ddpg_agent = DDPGAgentHelper(env=env,
                           state_dim=env.observation_space.shape[0],
                           action_dim=env.action_space.shape[0],
                           max_action=env.action_space.high[0],
                           device=device,
                           memory_capacity=10000,
                           discount=0.99,
                           tau=0.005,
                           sigma=0.2,
                           theta=0.15,
                           actor_lr=1e-4,
                           critic_lr=1e-3,
                           batch_size=64)
    return ddpg_agent
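The factory above simply wires DDPGAgentHelper to the environment's observation and action spaces. The following usage sketch shows the intended workflow; the environment object is assumed (any Gym-style portfolio environment exposing observation_space, action_space and an episode_length attribute) rather than defined by this module.

import torch
from AssetAllocator.algorithms.DDPG.agent import DDPGAgent

env = ...  # assumed: a Gym-style portfolio environment with an episode_length attribute

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
agent = DDPGAgent(env, device=device)

# pre-fill the replay buffer, then train for a fixed number of timesteps
agent.learn(timesteps=10_000, print_every=10)

# persist the trained actor/critic weights under a common file-name prefix
agent.save("ddpg_portfolio")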

Classes

class DDPGAgentHelper (env, state_dim, action_dim, max_action, device, memory_capacity=10000, num_memory_fill_episodes=10, discount=0.99, tau=0.005, sigma=0.2, theta=0.15, actor_lr=0.0001, critic_lr=0.001, batch_size=64, warmup_steps=100)

This is the agent class for the DDPG Agent.

Original paper can be found at https://arxiv.org/abs/1509.02971

This implementation was adapted from https://github.com/saashanair/rl-series/tree/master/ddpg

Helper class for Initializing a DDPG Agent

Args

env : gym object
Gym environment for the agent to interact with
state_dim : int
State space dimension
action_dim : int
Action space dimension
max_action : float
The max value of the range in the action space (assumes a symmetric range in the action space)
device : str
Device on which the networks are run, e.g. 'cuda' or 'cpu'
memory_capacity : int, optional
Size of replay buffer. Defaults to 10_000.
num_memory_fill_episodes : int, optional
Number of episodes used to pre-fill the replay buffer. Defaults to 10.
discount : float, optional
Reward discount factor. Defaults to 0.99.
tau : float, optional
Polyak averaging soft updates factor (i.e., soft updating of the target networks). Defaults to 0.005.
sigma : float, optional
Amount of noise to be applied to the OU process. Defaults to 0.2.
theta : float, optional
Amount of frictional force to be applied in OU noise generation. Defaults to 0.15.
actor_lr : float, optional
Actor's learning rate. Defaults to 1e-4.
critic_lr : float, optional
Critic's learning rate. Defaults to 1e-3.
batch_size : int, optional
Batch size for replay buffer sampling and network updates. Defaults to 64.
warmup_steps : int, optional
Memory warmup steps. Defaults to 100.
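For reference, the sketch below shows how these arguments are typically derived from the environment when constructing the helper directly, mirroring what the DDPGAgent factory does; the env object itself is assumed rather than defined here.

import torch
from AssetAllocator.algorithms.DDPG.agent import DDPGAgentHelper

env = ...  # assumed: a Gym-style env with Box observation/action spaces and an episode_length attribute

agent = DDPGAgentHelper(
    env=env,
    state_dim=env.observation_space.shape[0],   # size of the observation vector
    action_dim=env.action_space.shape[0],       # number of assets / portfolio weights
    max_action=env.action_space.high[0],        # assumes a symmetric action range
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
    memory_capacity=10_000,
    discount=0.99,   # gamma in the TD target
    tau=0.005,       # Polyak averaging factor for the target networks
    sigma=0.2,       # OU noise scale
    theta=0.15,      # OU mean-reversion strength
    actor_lr=1e-4,
    critic_lr=1e-3,
    batch_size=64,
)
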
Expand source code
class DDPGAgentHelper:

    """This is the agent class for the DDPG Agent.

    Original paper can be found at https://arxiv.org/abs/1509.02971

    This implementation was adapted from https://github.com/saashanair/rl-series/tree/master/ddpg
    
    """
    def __init__(
        self,
        env, 
        state_dim, 
        action_dim, 
        max_action, 
        device, 
        memory_capacity=10000, 
        num_memory_fill_episodes=10, 
        discount=0.99, 
        tau=0.005, 
        sigma=0.2, 
        theta=0.15, 
        actor_lr=1e-4, 
        critic_lr=1e-3, 
        batch_size=64, 
        warmup_steps = 100
        ):
        """Helper class for Initializing a DDPG Agent

        Args:
            env (gym object): Gym environment for the agent to interact with
            state_dim (int): State space dimension
            action_dim (int): Action space dimension
            max_action (float): The max value of the range in the action space (assumes a symmetric range in the action space)
            device (str): Device on which the networks are run, e.g. 'cuda' or 'cpu'
            memory_capacity (int, optional): Size of replay buffer. Defaults to 10_000.
            num_memory_fill_episodes (int, optional): Number of episodes used to pre-fill the replay buffer. Defaults to 10.
            discount (float, optional): Reward discount factor. Defaults to 0.99.
            tau (float, optional): Polyak averaging soft updates factor (i.e., soft updating of the target networks). Defaults to 0.005.
            sigma (float, optional): Amount of noise to be applied to the OU process. Defaults to 0.2.
            theta (float, optional): Amount of frictional force to be applied in OU noise generation. Defaults to 0.15.
            actor_lr (float, optional): Actor's learning rate. Defaults to 1e-4.
            critic_lr (float, optional): Critic's learning rate. Defaults to 1e-3.
            batch_size (int, optional): Batch size for replay buffer sampling and network updates. Defaults to 64.
            warmup_steps (int, optional): Memory warmup steps. Defaults to 100.
        """      

        self.env = env
        self.batch_size = batch_size

        self.state_dim = state_dim  # dimension of the state space
        self.action_dim = action_dim  # dimension of the action space

        self.device = device  # defines which cuda or cpu device is to be used to run the networks
        # denoted as gamma in the equation for the computation of the Q-value
        self.discount = discount
        # defines the factor used for Polyak averaging (i.e., soft updating of the target networks)
        self.tau = tau
        # the max value of the range in the action space (assumes a symmetric range in the action space)
        self.max_action = max_action
        self.warmup_steps = warmup_steps
        # create an instance of the replay buffer
        self.memory_capacity = memory_capacity
        self.num_memory_fill_episodes = num_memory_fill_episodes
        self.memory = ReplayMemory(memory_capacity)

        # create an instance of the noise generating process
        self.ou_noise = OrnsteinUhlenbeckNoise(
            mu=np.zeros(self.action_dim), sigma=sigma, theta=theta)

        # instances of the networks for the actor and the critic
        self.actor = Actor(state_dim, action_dim, max_action, actor_lr)
        self.critic = Critic(state_dim, action_dim, critic_lr)

        # instance of the target networks for the actor and the critic
        self.target_actor = Actor(state_dim, action_dim, max_action, actor_lr)
        self.target_critic = Critic(state_dim, action_dim, critic_lr)

        # initialise the targets to the same weight as their corresponding current networks
        self.target_actor.load_state_dict(self.actor.state_dict())
        self.target_critic.load_state_dict(self.critic.state_dict())

        # since we do not learn/train on the target networks
        self.target_actor.eval()
        self.target_critic.eval()

        self.actor.to(self.device)
        self.critic.to(self.device)

        self.target_actor.to(self.device)
        self.target_critic.to(self.device)

    def fill_memory(self):
        """
        Helper method to fill replay buffer during the warmup steps
        """   
        epochs = self.warmup_steps//self.env.episode_length + 1
        for epoch in range(epochs):
            state = self.env.reset()
            done = False

            while not done:
                action = self.env.action_space.sample()  # do random action for warmup
                action = action/action.sum() #normalize random actions
                next_state, reward, done, _ = self.env.step(action)
                # store the transition to memory
                self.memory.store([state, action, next_state, reward, done])
                state = next_state
        print("Done filling memory")

    @staticmethod
    def _softmax(x, axis=0):
        # subtract the max for numerical stability (stable softmax)
        max_val = np.amax(x, axis=axis, keepdims=True)
        x = x - max_val

        # Softmax
        num = np.exp(x)
        denum = num.sum(axis=axis, keepdims=True)
        softmax = num/denum
        return softmax

    def select_action(self, state):
        """
        Function to return the appropriate action for the given state.
        During training, it adds a zero-mean OU noise to the action to encourage exploration.
        During testing, no noise is added to the action decision.
        Args:
            state (array_like): The current state of the environment as observed by the agent
        
        Returns:
            action: A numpy array representing the noisy action to be performed by the agent in the current state
        """

        if not torch.is_tensor(state):
            state = torch.tensor(np.array([state]), dtype=torch.float32).to(self.device)

        self.actor.eval()
        # performs inference using the actor based on the current state as the input and returns the corresponding np array
        act = self.actor(state).cpu().data.numpy().flatten()
        self.actor.train()

        noise = 0.0

        # to add Gaussian noise instead, pass an exploration_noise argument and use:
        # noise = np.random.normal(0.0, exploration_noise, size=act.shape)

        # OU noise is added during training; predict() temporarily sets self.ou_noise to None
        # so that no exploration noise is added at test time
        if self.ou_noise is not None:
            noise = self.ou_noise.generate_noise()
        noisy_action = act + noise
        # ensure that the noisy action returned is within the limits of the "legal" actions afforded to the agent
        noisy_action = noisy_action.clip(min=0, max=self.max_action)
        return DDPGAgentHelper._softmax(noisy_action)

    def _learn(self):
        """
        Function to perform the updates on the 4 neural networks that run the DDPG algorithm.
        """
        if len(self.memory) < self.batch_size:
            return
        states, actions, next_states, rewards, dones = self.memory.sample(
            self.batch_size, self.device)  # a batch of experiences randomly sampled from the memory

        # ensure that the actions and rewards tensors have the appropriate shapes
        actions = actions.view(-1, self.action_dim)
        rewards = rewards.view(-1, 1)

        with torch.no_grad():
            # generate target actions
            target_action = self.target_actor(next_states)

            # calculate TD-Target
            target_q = self.target_critic(next_states, target_action)
            # being in a terminal state implies there are no more future states that the agent would encounter in the given episode and so set the associated Q-value to 0
            target_q[dones] = 0.0
            y = rewards + self.discount * target_q

        current_q = self.critic(states, actions)
        critic_loss = F.mse_loss(current_q, y).mean()

        self.critic.optimizer.zero_grad()
        critic_loss.backward()
        self.critic.optimizer.step()

        # the actor loss performs gradient ascent along the critic, so a negative sign is applied to convert it into gradient descent
        pred_current_actions = self.actor(states)
        pred_current_q = self.critic(states, pred_current_actions)
        actor_loss = - pred_current_q.mean()

        self.actor.optimizer.zero_grad()
        actor_loss.backward()
        self.actor.optimizer.step()

        # apply slow-update to the target networks
        self.soft_update_targets()

    def learn(self, timesteps, print_every=100):
        """
        Trains the agent

        Params
        ======
            timesteps (int): Number of timesteps the agent should interact with the environment
            print_every (int): Verbosity control
        """
        self.fill_memory()  # to populate the replay buffer before learning begins
        self.train(timesteps, print_every)

    def predict(self, state):
        """Returns agent's action based on a given state

        Args:
            state (array_like): Current environment state

        Returns:
            action (array_like): Agent's action
        """      
        ou = self.ou_noise
        self.actor.eval()
        self.critic.eval()
        self.ou_noise = None  # temporarily disable OU exploration noise

        action = self.select_action(state)

        self.actor.train()
        self.critic.train()
        self.ou_noise = ou
        return action

    def soft_update_net(self, source_net_params, target_net_params):
        """
        Perform Polyak averaging to update the parameters of the provided network
        Args:
            source_net_params (list): trainable parameters of the source, i.e., the current version of the network
            target_net_params (list): trainable parameters of the corresponding target network
        """

        for source_param, target_param in zip(source_net_params, target_net_params):
            target_param.data.copy_(
                self.tau * source_param.data + (1 - self.tau) * target_param.data)

    def soft_update_targets(self):
        """ Function that calls Polyak averaging on both target networks """

        self.soft_update_net(self.actor.parameters(),
                             self.target_actor.parameters())
        self.soft_update_net(self.critic.parameters(),
                             self.target_critic.parameters())

    def train(self, timesteps, print_every):
        """Helper method to train the agent

        Args:
            timesteps (int): Number of timesteps to train the agent for
            print_every (int): Verbosity control
        """     
        reward_history = []  # tracks the reward per episode
        best_score = -np.inf

        epochs = timesteps//self.env.episode_length + 1
        total_steps = 0
        flag = False
        count_of_dones = 0
        
        for ep_cnt in range(epochs):
            done = False
            state = self.env.reset()
            ep_reward = 0

            while not done:
                action = self.select_action(state)  # generate noisy action
                # print("Action:", action)
                next_state, reward, done, _ = self.env.step(
                    action)  # execute the action in the environment
                # store the interaction in the replay buffer
                self.memory.store([state, action, next_state, reward, done])

                self._learn()  # update the networks

                state = next_state
                total_steps += 1
                ep_reward += reward
                
                if done:
                    count_of_dones += 1
                    flag = True
            
                if flag and count_of_dones % print_every == 0:
                    print(f'Score at timestep {total_steps}: {ep_reward}.')
                    flag = False

                if total_steps >= timesteps:
                    break
                
    def save(self, file_name):
        """
        Saves trained model

        Params
        =====
        file_name (str): file path prefix used to save the actor and critic models
        """
        self.actor.save_model(f"{file_name}_actor")
        self.critic.save_model(f"{file_name}_critic")

    def load(self, file_name):
        """
        Loads trained model

        Params
        =====
        file_name (str): file path prefix from which to load the actor and critic models
        """
        self.actor.load_model(f"{file_name}_actor")
        self.critic.load_model(f"{file_name}_critic")

Methods

def fill_memory(self)

Helper method to fill replay buffer during the warmup steps

Expand source code
def fill_memory(self):
    """
    Helper method to fill replay buffer during the warmup steps
    """   
    epochs = self.warmup_steps//self.env.episode_length + 1
    for epoch in range(epochs):
        state = self.env.reset()
        done = False

        while not done:
            action = self.env.action_space.sample()  # do random action for warmup
            action = action/action.sum() #normalize random actions
            next_state, reward, done, _ = self.env.step(action)
            # store the transition to memory
            self.memory.store([state, action, next_state, reward, done])
            state = next_state
    print("Done filling memory")
def learn(self, timesteps, print_every=100)

Trains the agent

Params

timesteps (int): Number of timesteps the agent should interact with the environment
print_every (int): Verbosity control
Expand source code
def learn(self, timesteps, print_every=100):
    """
    Trains the agent

    Params
    ======
        timesteps (int): Number of timesteps the agent should interact with the environment
        print_every (int): Verbosity control
    """
    self.fill_memory()  # to populate the replay buffer before learning begins
    self.train(timesteps, print_every)
def load(self, file_name)

Loads trained model

Params

file_name (str) : file path prefix from which to load the actor and critic models

Expand source code
def load(self, file_name):
    """
    Loads trained model

    Params
    =====
    file_name (str): file path prefix from which to load the actor and critic models
    """
    self.actor.load_model(f"{file_name}_actor")
    self.critic.load_model(f"{file_name}_critic")
def predict(self, state)

Returns agent's action based on a given state

Args

state : array_like
Current environment state

Returns

action (array_like): Agent's action

Expand source code
def predict(self, state):
    """Returns agent's action based on a given state

    Args:
        state (array_like): Current environment state

    Returns:
        action (array_like): Agent's action
    """      
    ou = self.ou_noise
    self.actor.eval()
    self.critic.eval()
    self.ou_noise = None  # temporarily disable OU exploration noise

    action = self.select_action(state)

    self.actor.train()
    self.critic.train()
    self.ou_noise = ou
    return action
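Because predict temporarily sets self.ou_noise to None, select_action adds no exploration noise here. A minimal evaluation sketch, assuming a trained agent and the same Gym-style environment used during training:

state = env.reset()
done, total_reward = False, 0.0

while not done:
    weights = agent.predict(state)             # portfolio weights: non-negative, summing to 1
    state, reward, done, _ = env.step(weights)
    total_reward += reward

print(f"Evaluation episode reward: {total_reward}")
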
def save(self, file_name)

Saves trained model

Params

file_name (str) : file path prefix used to save the actor and critic models

Expand source code
def save(self, file_name):
    """
    Saves trained model

    Params
    =====
    file_name (str): file path prefix used to save the actor and critic models
    """
    self.actor.save_model(f"{file_name}_actor")
    self.critic.save_model(f"{file_name}_critic")
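A save/load round trip might look as follows; the checkpoint prefix is illustrative, and the actual file handling is delegated to the Actor/Critic save_model and load_model methods.

# saves the actor under "<prefix>_actor" and the critic under "<prefix>_critic"
agent.save("checkpoints/ddpg_portfolio")

# later, restore the weights into a freshly constructed agent before calling predict()
new_agent = DDPGAgent(env)
new_agent.load("checkpoints/ddpg_portfolio")
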
def select_action(self, state)

Function to return the appropriate action for the given state. During training, it adds a zero-mean OU noise to the action to encourage exploration. During testing, no noise is added to the action decision.

Args

state : array_like
The current state of the environment as observed by the agent

Returns

action
A numpy array representing the noisy action to be performed by the agent in the current state
Expand source code
def select_action(self, state):
    """
    Function to return the appropriate action for the given state.
    During training, it adds a zero-mean OU noise to the action to encourage exploration.
    During testing, no noise is added to the action decision.
    Args:
        state (array_like): The current state of the environment as observed by the agent
    
    Returns:
        action: A numpy array representing the noisy action to be performed by the agent in the current state
    """

    if not torch.is_tensor(state):
        state = torch.tensor(np.array([state]), dtype=torch.float32).to(self.device)

    self.actor.eval()
    # performs inference using the actor based on the current state as the input and returns the corresponding np array
    act = self.actor(state).cpu().data.numpy().flatten()
    self.actor.train()

    noise = 0.0

    # to add Gaussian noise instead, pass an exploration_noise argument and use:
    # noise = np.random.normal(0.0, exploration_noise, size=act.shape)

    # OU noise is added during training; predict() temporarily sets self.ou_noise to None
    # so that no exploration noise is added at test time
    if self.ou_noise is not None:
        noise = self.ou_noise.generate_noise()
    noisy_action = act + noise
    # ensure that the noisy action returned is within the limits of the "legal" actions afforded to the agent
    noisy_action = noisy_action.clip(min=0, max=self.max_action)
    return DDPGAgentHelper._softmax(noisy_action)
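Since the noisy action is clipped to be non-negative and then passed through a softmax, the value returned by select_action is always a valid portfolio allocation (non-negative weights summing to 1). A standalone numpy illustration of that projection, independent of the agent:

import numpy as np

def stable_softmax(x, axis=0):
    # same max-shift trick used by DDPGAgentHelper._softmax
    x = x - np.amax(x, axis=axis, keepdims=True)
    e = np.exp(x)
    return e / e.sum(axis=axis, keepdims=True)

noisy_action = np.array([0.9, -0.3, 1.4, 0.1]).clip(min=0, max=1.0)  # clip as in select_action
weights = stable_softmax(noisy_action)
print(weights, weights.sum())  # non-negative weights that sum to 1.0
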
def soft_update_net(self, source_net_params, target_net_params)

Perform Polyak averaging to update the parameters of the provided network

Args

source_net_params : list
trainable parameters of the source, i.e., the current version of the network
target_net_params : list
trainable parameters of the corresponding target network
Expand source code
def soft_update_net(self, source_net_params, target_net_params):
    """
    Perform Polyak averaging to update the parameters of the provided network
    Args:
        source_net_params (list): trainable parameters of the source, i.e., the current version of the network
        target_net_params (list): trainable parameters of the corresponding target network
    """

    for source_param, target_param in zip(source_net_params, target_net_params):
        target_param.data.copy_(
            self.tau * source_param.data + (1 - self.tau) * target_param.data)
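This is the Polyak averaging update target = tau * source + (1 - tau) * target, so target parameters drift slowly toward the current networks. A standalone numerical illustration (the tensors stand in for one parameter of each network):

import torch

tau = 0.005
source = torch.tensor([1.0, 2.0, 3.0])   # stand-in for a current-network parameter
target = torch.tensor([0.0, 0.0, 0.0])   # stand-in for the corresponding target parameter

for _ in range(1000):
    target = tau * source + (1 - tau) * target

print(target)  # roughly 99.3% of the way to the source values after 1000 soft updates
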
def soft_update_targets(self)

Function that calls Polyak averaging on both target networks

Expand source code
def soft_update_targets(self):
    """ Function that calls Polyak averaging on both target networks """

    self.soft_update_net(self.actor.parameters(),
                         self.target_actor.parameters())
    self.soft_update_net(self.critic.parameters(),
                         self.target_critic.parameters())
def train(self, timesteps, print_every)

Helper method to train the agent

Args

timesteps : int
Number of timesteps to train the agent for
print_every : int
Verbosity control
Expand source code
def train(self, timesteps, print_every):
    """Helper method to train the agent

    Args:
        timesteps (int): Number of timesteps to train the agent for
        print_every (int): Verbosity control
    """     
    reward_history = []  # tracks the reward per episode
    best_score = -np.inf

    epochs = timesteps//self.env.episode_length + 1
    total_steps = 0
    flag = False
    count_of_dones = 0
    
    for ep_cnt in range(epochs):
        done = False
        state = self.env.reset()
        ep_reward = 0

        while not done:
            action = self.select_action(state)  # generate noisy action
            # print("Action:", action)
            next_state, reward, done, _ = self.env.step(
                action)  # execute the action in the environment
            # store the interaction in the replay buffer
            self.memory.store([state, action, next_state, reward, done])

            self._learn()  # update the networks

            state = next_state
            total_steps += 1
            ep_reward += reward
            
            if done:
                count_of_dones += 1
                flag = True
        
            if flag and count_of_dones % print_every == 0:
                print(f'Score at timestep {total_steps}: {ep_reward}.')
                flag = False

            if total_steps >= timesteps:
                break