Module AssetAllocator.algorithms.TD3.agent

Source code
import torch
import torch.nn as nn
import torch.nn.functional as F
from .actor import Actor
from .critic import Critic
from .memory import Memory
import numpy as np

class TD3Agent:
    """This is the agent class for the TD3 Agent.

    Original paper can be found at https://arxiv.org/abs/1802.09477

    This implementation was adapted from https://github.com/saashanair/rl-series/tree/master/td3
    
    """
    def __init__(
            self,
            env,
            hidden_dim = 256,
            device = 'cuda',
            memory_dim=100_000,
            max_action = 1,
            discount=0.99,
            update_freq=2,
            tau=0.005,
            policy_noise_std=0.2,
            policy_noise_clip=0.5,
            actor_lr= 1e-3,
            critic_lr= 1e-3,
            batch_size=128,
            exploration_noise=0.1,
            num_layers = 3,
            dropout = 0.2,
            add_lstm = False,
            warmup_steps = 100):

        """Initializes the TD3 Agent

        Args:
            env (gym object): Gym environment for the agent to interact with
            hidden_dim (int, optional): Size of hidden layer neurons. Defaults to 256.
            device (str, optional): One of cuda or cpu. Defaults to 'cuda'.
            memory_dim (int, optional): Size of replay buffer. Defaults to 100_000.
            max_action (int, optional): Action scaling factor. Defaults to 1.
            discount (float, optional): Reward discount factor. Defaults to 0.99.
            update_freq (int, optional): Frequency, in learning iterations, of the delayed policy and target network updates. Defaults to 2.
            tau (float, optional): Polyak averaging soft update factor. Defaults to 0.005.
            policy_noise_std (float, optional): Standard deviation of the target policy smoothing noise. Defaults to 0.2.
            policy_noise_clip (float, optional): Clipping range for the target policy smoothing noise. Defaults to 0.5.
            actor_lr (float, optional): Actor's learning rate. Defaults to 1e-3.
            critic_lr (float, optional): Critic's learning rate. Defaults to 1e-3.
            batch_size (int, optional): Batch size sampled from the replay buffer for each update. Defaults to 128.
            exploration_noise (float, optional): Standard deviation of the Gaussian exploration noise added to actions. Defaults to 0.1.
            num_layers (int, optional): Number of LSTM layers. Defaults to 3.
            dropout (float, optional): Dropout value of LSTM. Defaults to 0.2.
            add_lstm (bool, optional): Boolean flag to add LSTM or not. Defaults to False.
            warmup_steps (int, optional): Number of environment steps used to pre-fill the replay buffer. Defaults to 100.
        """            

        self.env = env
        self.state_dim = env.observation_space.shape[0]
        self.hidden_dim = hidden_dim
        self.action_dim = env.action_space.shape[-1]
        self.lookback = env.lookback_period
        self.device = device
        self.max_action = max_action
        self.memory_dim = memory_dim
        self.discount = discount
        self.update_freq = update_freq
        self.tau = tau
        self.policy_noise_std = policy_noise_std
        self.policy_noise_clip = policy_noise_clip
        self.actor_lr = actor_lr
        self.critic_lr = critic_lr
        self.batch_size = batch_size
        self.exploration_noise = exploration_noise
        self.eval = False
        self.num_layers = num_layers
        self.dropout = dropout
        self.warmup_steps = warmup_steps

        # Instantiate Memory Buffer
        self.memory = Memory(self.memory_dim)

        # Instantiate Actor and Target Actor
        self.actor = Actor(state_dim = self.state_dim, 
                           action_dim = self.action_dim, 
                           hidden_dim = self.hidden_dim, 
                           lookback_dim = self.lookback, 
                           num_layers = self.num_layers,
                           lr = self.actor_lr, 
                           max_action = self.max_action, 
                           dropout = self.dropout, 
                           add_lstm = add_lstm)       
        self.actor.to(self.device)
        
        self.target_actor = Actor(state_dim = self.state_dim, 
                           action_dim = self.action_dim, 
                           hidden_dim = self.hidden_dim, 
                           lookback_dim = self.lookback, 
                           num_layers = self.num_layers,
                           lr = self.actor_lr, 
                           max_action = self.max_action, 
                           dropout = self.dropout,
                           add_lstm = add_lstm)       
        self.target_actor.to(self.device)

        # Instantiate Critic 1 and Target Critic 1
        self.critic1 = Critic(state_dim = self.state_dim, 
                              action_dim = self.action_dim , 
                              hidden_dim = self.hidden_dim,
                              lr = self.critic_lr)
        self.critic1.to(self.device)
        
        self.target_critic1 = Critic(state_dim = self.state_dim, 
                              action_dim = self.action_dim , 
                              hidden_dim = self.hidden_dim,
                              lr = self.critic_lr)
        self.target_critic1.to(self.device)

        # Instantiate Critic 2 and Target Critic 2
        self.critic2 = Critic(state_dim = self.state_dim, 
                              action_dim = self.action_dim , 
                              hidden_dim = self.hidden_dim,
                              lr = self.critic_lr)
        self.critic2.to(self.device)
        
        self.target_critic2 = Critic(state_dim = self.state_dim, 
                              action_dim = self.action_dim , 
                              hidden_dim = self.hidden_dim,
                              lr = self.critic_lr)
        self.target_critic2.to(self.device)

        # Copy weight to target networks
        self.target_actor.load_state_dict(self.actor.state_dict())
        self.target_critic1.load_state_dict(self.critic1.state_dict())
        self.target_critic2.load_state_dict(self.critic2.state_dict())
        
        # since we do not learn/train on the target networks
        self.target_actor.eval()
        self.target_critic1.eval()
        self.target_critic2.eval()

    # for test mode
    def eval_mode(self):
        """
        Switches agent from training mode to eval mode
        """        
        self.eval = True
        self.actor.eval()
        self.critic1.eval()
        self.critic2.eval()
    
    def select_action(self, state, exploration_noise=0.1):
        """Takes in current environment's state and returns the agent's action

        Args:
            state (array_like): Current environment state
            exploration_noise (float, optional): Policy exploration noise. Defaults to 0.1.

        Returns:
            action (np.ndarray): Softmax-normalized action (portfolio weights that sum to 1)
        """
        if not torch.is_tensor(state):
            state = torch.tensor(np.array([state]), dtype=torch.float32).to(self.device)

        # Forward pass through actor network
        action = self.actor(state).cpu().data.numpy().flatten()

        if self.eval:
            exploration_noise = 0.0

        noise = np.random.normal(0.0, exploration_noise, size=action.shape)

        noisy_action = (action + noise)
        noisy_action = TD3Agent._softmax(noisy_action)

        return noisy_action

    def _update(self, source_net_params, target_net_params):
        """
        Helper method to update target network weights
        """        
        for source_param, target_param in zip(
                source_net_params, target_net_params):
            target_param.data.copy_(
                self.tau * source_param.data + (1 - self.tau) * target_param.data)

    def update_targets(self):
        """
        Applies Polyak soft updates to the target actor and both target critics
        """
        self._update(self.actor.parameters(), self.target_actor.parameters())
        self._update(
            self.critic1.parameters(),
            self.target_critic1.parameters())
        self._update(
            self.critic2.parameters(),
            self.target_critic2.parameters())
    
    @staticmethod
    def _softmax(x, axis = 0):
        """Helper method to softmax action values

        Args:
            x (array_like): Action values
            axis (int, optional): Defaults to 0.
        """        
        # Use the LogSumExp Trick
        max_val = np.amax(x, axis=axis, keepdims = True)
        x = x - max_val

        # Softmax
        num = np.exp(x)
        denum = num.sum(axis = axis, keepdims = True)
        softmax = num/denum
        return softmax

    def _learn(self, iteration):
        """Helper method to train agent

        Args:
            iteration (int): Number of training iterations
        """        
        if len(self.memory) < self.batch_size:
            return

        # Memory Replay
        states, actions, next_states, rewards, dones = self.memory.sample(
            self.batch_size, self.device)

        actions =  nn.Softmax(dim = 1)(actions)
        actions = actions.view(-1, self.action_dim)
        rewards = rewards.view(-1, 1)
        
        
        with torch.no_grad():
            # Target Policy Smoothing
            pred_action = self.target_actor(next_states)
            noise = torch.zeros_like(pred_action).normal_(0, self.policy_noise_std).to(self.device)
            noisy_pred_action = pred_action + noise.clamp(-self.policy_noise_clip, self.policy_noise_clip)
            noisy_pred_action = nn.Softmax(dim = 1)(noisy_pred_action)

            # Clipped Double Q Learning
            target_q1 = self.target_critic1(next_states, noisy_pred_action)
            target_q2 = self.target_critic2(next_states, noisy_pred_action)
            target_q = torch.min(target_q1, target_q2).detach()
            target_q[dones] = 0.0
            y = rewards + self.discount * target_q

        # Loss Computation
        current_q1 = self.critic1(states, actions)
        current_q2 = self.critic2(states, actions)

        critic1_loss = F.mse_loss(current_q1, y).mean()
        critic2_loss = F.mse_loss(current_q2, y).mean()

        # Gradient Descent on critics
        self.critic1.optimizer.zero_grad()
        critic1_loss.backward()
        self.critic1.optimizer.step()

        self.critic2.optimizer.zero_grad()
        critic2_loss.backward()
        self.critic2.optimizer.step()


        # delayed policy and target updates
        if iteration % self.update_freq == 0:

            # Compute actor loss
            pred_current_actions = self.actor(states)
            pred_current_q1 = self.critic1(states, pred_current_actions)
            actor_loss = - pred_current_q1.mean()
            
            self.actor.optimizer.zero_grad()
            actor_loss.backward()
            self.actor.optimizer.step()

            # apply slow-update to all three target networks
            self.update_targets()

    def fill_memory(self):
        """
        Helper method to fill the replay buffer during the warmup steps
        """        
        fill_memory_epochs = max(1, self.warmup_steps // self.env.episode_length)  # run at least one warmup episode
        
        for _ in range(fill_memory_epochs):
            state = self.env.reset()
            done = False

            while not done:
                # take a random action for warmup and normalize it to a valid allocation (weights sum to 1)
                action = self.env.action_space.sample()
                action = np.array(action / sum(action))
                next_state, reward, done, _ = self.env.step(action)
                
                # store the transition to memory
                self.memory.store([state, action, next_state, reward, done]) 
                state = next_state

    def train(self, total_steps, timesteps, print_every, count_of_dones):
        """Helper method to train the agent

        Args:
            total_steps (int): Total steps the agent has taken
            timesteps (int): Total timesteps the agent has interacted for
            print_every (int): Verbosity control
            count_of_dones (int): Count of completed episodes
        """        
        done = False
        state = self.env.reset()
        ep_reward = 0
        flag = False
        
        while not done:
            action = self.select_action(state, self.exploration_noise)
            next_state, reward, done, _ = self.env.step(action)
            self.memory.store([state, action, next_state, reward, done])
            self._learn(total_steps)
            state = next_state
            total_steps += 1
            ep_reward += reward
                     
            if done:
                count_of_dones += 1
                flag = True
            
            if flag and count_of_dones % print_every == 0:
                print(f'Score at timestep {total_steps}: {ep_reward}.')
                flag = False
                    
            if total_steps >= timesteps:
                break
        
        return total_steps, count_of_dones

    def predict(self, state):
        """Returns agent's action based on a given state

        Args:
            state (array_like): Current environment state

        Returns:
            action (array_like): Agent's action
        """        
        self.eval_mode()
        action = self.select_action(state, self.exploration_noise)
        return action

    def learn(self, timesteps, print_every = 1):
        """
        Trains the agent

        Params
        ======
            timesteps (int): Number of timesteps the agent should interact with the environment
            print_every (int): Verbosity control
        """
        epochs = timesteps//self.env.episode_length + 1
        self.fill_memory()
        print('Startup memory filled!')
        
        count_of_dones = 0
        
        total_steps = 0
        for _ in range(epochs):
            total_steps, count_of_dones = self.train(total_steps, timesteps, print_every, count_of_dones)

    def save(self, filepath):
        """
        Saves trained model

        Params
        =====
        filepath (str): file path prefix used to save the agent's networks and optimizers
        """
        torch.save(self.critic1.state_dict(), filepath + '_critic1')
        torch.save(self.critic1.optimizer.state_dict(), filepath + '_critic1_optimizer')

        torch.save(self.critic2.state_dict(), filepath + '_critic2')
        torch.save(self.critic2.optimizer.state_dict(), filepath + '_critic2_optimizer')

        torch.save(self.actor.state_dict(), filepath + '_actor')
        torch.save(self.actor.optimizer.state_dict(), filepath + '_actor_optimizer')

    def load(self, filename):
        """
        Loads trained model

        Params
        =====
        filename (str): file path prefix from which to load the agent's networks and optimizers
        """
        self.critic1.load_state_dict(torch.load(filename + '_critic1'))
        self.critic1.optimizer.load_state_dict(torch.load(filename + '_critic1_optimizer'))

        self.critic2.load_state_dict(torch.load(filename + '_critic2'))
        self.critic2.optimizer.load_state_dict(torch.load(filename + '_critic2_optimizer'))

        self.actor.load_state_dict(torch.load(filename + '_actor'))
        self.actor.optimizer.load_state_dict(torch.load(filename + '_actor_optimizer'))
        
        self.target_actor.load_state_dict(self.actor.state_dict())
        self.target_critic1.load_state_dict(self.critic1.state_dict())
        self.target_critic2.load_state_dict(self.critic2.state_dict())

Classes

class TD3Agent (env, hidden_dim=256, device='cuda', memory_dim=100000, max_action=1, discount=0.99, update_freq=2, tau=0.005, policy_noise_std=0.2, policy_noise_clip=0.5, actor_lr=0.001, critic_lr=0.001, batch_size=128, exploration_noise=0.1, num_layers=3, dropout=0.2, add_lstm=False, warmup_steps=100)

Agent class for TD3 (Twin Delayed Deep Deterministic Policy Gradient).

The original paper is available at https://arxiv.org/abs/1802.09477

This implementation was adapted from https://github.com/saashanair/rl-series/tree/master/td3

Initializes the TD3 Agent

Args

env : gym object
Gym environment for the agent to interact with
hidden_dim : int, optional
Size of hidden layer neurons. Defaults to 256.
device : str, optional
One of cuda or cpu. Defaults to 'cuda'.
memory_dim : int, optional
Size of replay buffer. Defaults to 100_000.
max_action : int, optional
Action scaling factor. Defaults to 1.
discount : float, optional
Reward discount factor. Defaults to 0.99.
update_freq : int, optional
Frequency, in learning iterations, of the delayed policy and target network updates. Defaults to 2.
tau : float, optional
Polyak averaging soft update factor. Defaults to 0.005.
policy_noise_std : float, optional
Standard deviation of the target policy smoothing noise. Defaults to 0.2.
policy_noise_clip : float, optional
Clipping range for the target policy smoothing noise. Defaults to 0.5.
actor_lr : float, optional
Actor's learning rate. Defaults to 1e-3.
critic_lr : float, optional
Critic's learning rate. Defaults to 1e-3.
batch_size : int, optional
Batch size sampled from the replay buffer for each update. Defaults to 128.
exploration_noise : float, optional
Standard deviation of the Gaussian exploration noise added to actions. Defaults to 0.1.
num_layers : int, optional
Number of LSTM layers. Defaults to 3.
dropout : float, optional
Dropout value of LSTM. Defaults to 0.2.
add_lstm : bool, optional
Boolean flag to add LSTM or not. Defaults to False.
warmup_steps : int, optional
Number of environment steps used to pre-fill the replay buffer. Defaults to 100.
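
A minimal usage sketch is shown below. It assumes a gym-style portfolio environment that exposes the attributes the agent reads (observation_space, action_space, lookback_period, episode_length); make_portfolio_env and the hyperparameter values are illustrative placeholders, not part of this module.

from AssetAllocator.algorithms.TD3.agent import TD3Agent

env = make_portfolio_env()                      # hypothetical helper returning a gym-style env
agent = TD3Agent(env, device='cpu', batch_size=64, warmup_steps=500)

agent.learn(timesteps=10_000, print_every=10)   # warmup, then train
weights = agent.predict(env.reset())            # softmax-normalized allocation
agent.save('checkpoints/td3_experiment')        # writes *_actor, *_critic1, ... files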

Methods

def eval_mode(self)

Switches agent from training mode to eval mode

def fill_memory(self)

Helper method to fill the replay buffer during the warmup steps

def learn(self, timesteps, print_every=1)

Trains the agent

Params

timesteps (int): Number of timesteps the agent should interact with the environment
print_every (int): Verbosity control
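
For instance, assuming an environment with episode_length = 250 (an illustrative value), the call below pre-fills the replay buffer and then runs at most 1000 // 250 + 1 = 5 training episodes, stopping once 1000 environment steps have been taken:

agent.learn(timesteps=1_000, print_every=2)   # print the score of every 2nd completed episode
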
def load(self, filename)

Loads trained model

Params

filename (str) : file path prefix from which to load the agent's networks and optimizers
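
A short example of restoring a previously saved agent; the checkpoint prefix is illustrative and must match the prefix passed to save:

agent = TD3Agent(env, device='cpu')          # rebuild with the same architecture settings
agent.load('checkpoints/td3_experiment')     # reads *_actor, *_critic1, *_critic2 and their optimizers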

def predict(self, state)

Returns agent's action based on a given state

Args

state : array_like
Current environment state

Returns

action (array_like): Agent's action
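
A minimal evaluation sketch, assuming a gym-style environment; predict switches the agent to eval mode, so no exploration noise is added:

state, done, score = env.reset(), False, 0.0
while not done:
    weights = agent.predict(state)              # softmax-normalized portfolio weights
    state, reward, done, _ = env.step(weights)
    score += reward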

def save(self, filepath)

Saves trained model

Params

filepath (str) : file path prefix used to save the agent's networks and optimizers
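
For illustration (the directory and prefix are placeholders), a single call writes six files, one per network and optimizer state dict:

agent.save('checkpoints/run1')
# -> checkpoints/run1_actor,   checkpoints/run1_actor_optimizer,
#    checkpoints/run1_critic1, checkpoints/run1_critic1_optimizer,
#    checkpoints/run1_critic2, checkpoints/run1_critic2_optimizer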

def select_action(self, state, exploration_noise=0.1)

Takes the current environment state and returns the agent's action

Args

state : array_like
Current environment state
exploration_noise : float, optional
Policy exploration noise. Defaults to 0.1.

Returns

action (np.ndarray)
Softmax-normalized action (portfolio weights that sum to 1)
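
A small sketch of calling select_action directly (it is normally invoked inside train); the state comes from a reset observation and numpy is assumed to be imported:

import numpy as np

state = env.reset()
weights = agent.select_action(state, exploration_noise=0.1)
assert np.isclose(weights.sum(), 1.0)   # the softmax projection keeps weights on the simplex
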
def train(self, total_steps, timesteps, print_every, count_of_dones)

Helper method that runs a single training episode

Args

total_steps : int
Total steps the agent has taken so far
timesteps : int
Maximum number of timesteps the agent should interact for
print_every : int
Verbosity control
count_of_dones : int
Count of completed episodes

Returns

total_steps, count_of_dones
Updated step and episode counters
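
Although train is normally driven by learn, it can be called directly to run a single episode; the starting counters below are illustrative:

total_steps, episodes_done = agent.train(total_steps=0, timesteps=1_000, print_every=1, count_of_dones=0)
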
def update_targets(self)

Applies Polyak soft updates to the target actor and both target critics
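
For reference, the soft update applied to every (source, target) parameter pair follows the Polyak rule target <- tau * source + (1 - tau) * target. The snippet below is a self-contained sketch with stand-in linear layers, not part of this module:

import torch
import torch.nn as nn

tau = 0.005
source, target = nn.Linear(4, 2), nn.Linear(4, 2)
target.load_state_dict(source.state_dict())        # targets start as copies of the online networks

with torch.no_grad():
    for s_param, t_param in zip(source.parameters(), target.parameters()):
        t_param.data.copy_(tau * s_param.data + (1 - tau) * t_param.data)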