Module AssetAllocator.environments.PortfolioGym

Expand source code
import sys
sys.path.append('../')

import gym
from gym import spaces
from gym.utils import seeding

import numpy as np
import pandas as pd
import yfinance as yf

from .utils import softmax, log_to_simple

class PortfolioManagementGym(gym.Env):
    """
    Portfolio Management Gym
    """
    def __init__(self,
                 data,
                 episode_length = None,
                 returns = True,
                 trading_cost_ratio = 0.001,
                 lookback_period = 64,
                 initial_investment = 1_000_000,
                 retain_cash = True,
                 random_start_range = 20,
                 dsr_constant = 1e-4,
                 add_softmax = False,
                 start_date = '2009-01-01',
                 end_date = '2022-01-01',
                 seed = 0):
        """
        Initializes the gym environment

        Args:
            data : pandas dataframe with date index and stock columns with price data
                or list of stock tickers

            episode_length : how long the agent should interact with the environment

            returns : if True, use log returns as the reward; otherwise, use the Sharpe ratio

            trading_cost_ratio : fraction of the traded stock value charged as trading costs

            lookback_period : fixed-size window determining how much historical data is returned to the agent as the observation

            initial_investment : how much the agent wants to invest

            retain_cash : bool value telling the environment whether to retain a cash position

            random_start_range : random start position for training; should be set to 0 for testing

            dsr_constant : smoothing parameter for the differential Sharpe ratio

            add_softmax : bool value telling the environment whether to soft-normalize the input action with a softmax

            start_date : start date for the Yahoo Finance download

            end_date : end date for the Yahoo Finance download

            seed : seed value for environment reproducibility
                  
        """

        self.data = data
        self.episode_length = episode_length
        self.returns = returns
        self.trading_cost_ratio = trading_cost_ratio
        self.lookback_period = lookback_period
        self.initial_investment = initial_investment
        self.retain_cash = retain_cash
        self.random_start_range = random_start_range
        self.dsr_constant = dsr_constant
        self.add_softmax = add_softmax
        self.start_date = start_date
        self.end_date = end_date

        if isinstance(data, pd.DataFrame):
            self.stocks_names = list(self.data.columns)
            _, self.n = self.data.shape
        else:
            self.stocks_names = data
            self.n = len(data)
        
        action_dim = self.n + self.retain_cash
        state_dim = self.lookback_period * self.n
        
        self.observation_space = spaces.Box(np.finfo('d').min, np.finfo('d').max, shape=(state_dim,))
        self.action_space = spaces.Box(0, 1, shape=(action_dim,))
        
        self._seed(seed)
        self.reset()

    def reset(self):
        """
        Resets the environment to the start state

        Returns:
            Initial observation (array_like)
        """        
        self._initialize_env()
        obs = self._get_new_state()
        # left-pad with zeros in case the window is shorter than the full state size
        width = self.observation_space.shape[0] - len(obs)
        obs = np.array(np.pad(obs, (width, 0), constant_values=0), dtype='float32')
        return obs

    def step(self, action):
        """
        Takes in an action of size action_dim
        Returns the observation, reward, episode status, and info dict
        """

        if not self._get_done_status():
            self.num_actions_taken += 1
            self._take_action(action)
        
        new_state = np.array(self._get_new_state(), dtype = 'float32')
        assert self.observation_space.contains(new_state), \
            'observation does not belong to space'

        reward = self._get_reward()
        episode_over = self._get_done_status()
        info = {}

        return new_state, reward, episode_over, info

    def render(self):
        """
        Returns an array of simple returns, differential sharpe ratio, and available amount
        """        
        return [log_to_simple(self.log_returns[-1]), 
                    self.sharpe_ratios[-1], self.AVAILABLE_AMT]


    #################################################################################################
    #################################### HELPER FUNCTIONS ###########################################
    #################################################################################################
    

    def _seed(self, seed = None):
        """
        Helper method to set seed
        """        
        self.np_random, self.seed = seeding.np_random(seed)


    def _load_data(self):
        """
        Helper method to load the data
        """
        assert isinstance(self.data, pd.DataFrame) or isinstance(self.data, list), \
        'Please provide a list of tickers or a dataframe'

        # downloading data from yahoo finance if tickers were provided
        if isinstance(self.data, list):
            prices = yf.download(self.data, start = self.start_date,
                                    end = self.end_date, interval="1d", actions=True)
            prices.dropna(inplace=True)
            prices = prices["Adj Close"]
        else:
            prices = self.data.copy()
            prices.dropna(inplace=True)
        
        prices.index = pd.to_datetime(prices.index)
        
        for col in prices.columns:
            prices[col] = prices[col].astype(np.float32)
        return prices
    
    def _preprocess_data(self, df):
        """
        Helper method to preprocess the data
        """
        date_range = range(len(df))
        index_to_date_map = dict(zip(date_range, df.index))
        returns_df = df.pct_change().fillna(0)
        return index_to_date_map, returns_df

    def _initialize_env(self):
        """
        Helper method to create all the environment variables
        """
        ext_stock_list = self.stocks_names.copy() + ['Cash', 'Trading Costs']
        self.weights = [dict(zip(ext_stock_list, [0]*(self.n) + [1, 0]))]
        self.current_holding = [0] * self.n
        self.AVAILABLE_AMT = self.initial_investment
        self.CASH = self.initial_investment

        self.num_actions_taken = 0
        self.curr_reward = 0
        self.log_returns = []
        self.sharpe_ratios = []
        self.date_map = None

        self.prices = self._load_data()
        self.date_map, self.observations = self._preprocess_data(self.prices)
        
        self.start_day = self.np_random.choice(range(self.lookback_period, 
                            self.lookback_period + self.random_start_range + 1))
        # default to the full dataset when no episode length is given
        episode_length = self.episode_length if self.episode_length is not None else len(self.prices)
        self.end_day = min(self.start_day + episode_length, 
                            len(self.prices) - 1)

    def _get_new_state(self):
        """
        Helper method to return current observation state
        """
        self.current_day = self.num_actions_taken + self.start_day
        state_end_day = self.current_day - 1
        state_start_day = state_end_day - self.lookback_period + 1

        state_start_date = self.date_map[state_start_day]
        state_end_date = self.date_map[state_end_day]


        state_obs = self.observations[state_start_date : state_end_date]
        state_obs = np.array(np.concatenate(state_obs.fillna(0).values, 
                    axis = 0), dtype = 'float32')
        return state_obs

    def _check_actions(self, action, check_dim = True):
        """
        Helper method to check validity of the actions received
        """
        assert abs(sum(action) - 1) <= 1e-3, \
            f'Wrong portfolio weights: {action}'
        
        if check_dim:
            assert self.action_space.contains(np.array(action, dtype = 'float32')), \
                    f'{action} action does not belong to space'

    
    def _compute_buyable_shares(self, budgets, prices):
        """Helper method to compute buyable shares
        """        
        shares = [budget/price for budget, price in zip(budgets, prices)]
        return shares
    
    def _compute_trading_costs(self, shares_now, shares_prev, prices):
        """
        Helper method to compute trading costs
        """        
        trading_costs = []
        
        for now, prev, price in zip(shares_now, shares_prev, prices):
            diff = abs(now - prev)
            if diff < 1:
                trading_costs.append(0)
            else:
                trading_costs.append(diff * price * self.trading_cost_ratio)
        
        return trading_costs

    def _take_action(self, actions):
        """
        Helper method to compute effects of agent's action on environment
        """
        # For stable baseline model implementations
        if isinstance(actions, tuple):
            actions = actions[0]

        if self.add_softmax:
            actions = softmax(actions)

        self._check_actions(actions)
        
        # Allocating Budget
        if self.retain_cash:
            actions_ = actions[:-1]
            cash_budget_ratio = actions[-1]
        else:
            actions_ = actions
            cash_budget_ratio = 0

        budget_allocation = [action * self.AVAILABLE_AMT \
                            for action in actions_]
        self.CASH = cash_budget_ratio * self.AVAILABLE_AMT

        # Computing Trading Costs
        current_date = self.date_map[self.current_day]
        prices = self.prices.loc[current_date]
        
        buyable_shares = self._compute_buyable_shares(
                                                      budget_allocation,
                                                      prices)

        trading_costs = self._compute_trading_costs(buyable_shares, 
                                                    self.current_holding,
                                                    list(prices.values))

        self.current_holding = buyable_shares

        # Recomputing portfolio weights
        total_trading_costs = sum(trading_costs)
        ext_stock_list = self.stocks_names.copy() + ['Cash', 'Trading Costs']
        ext_allocation = budget_allocation + [self.CASH, total_trading_costs]
        total_amount = sum(ext_allocation)
        
        portfolio_weights = {stock : allocation/total_amount \
                            for stock, allocation in \
                            zip(ext_stock_list, ext_allocation)}

        self._check_actions(list(portfolio_weights.values()), check_dim = False)
        self.weights.append(portfolio_weights)
        
        assert total_trading_costs >= 0, 'Error in trading costs calculations'
        self.AVAILABLE_AMT = total_amount - total_trading_costs
        

    def _get_returns(self):
        """
        Helper method to calculate log returns
        """
        current_date = self.date_map[self.current_day]
        observation = self.observations.loc[current_date]
        curr_portfolio = self.weights[self.num_actions_taken]
        
        
        # portfolio simple return for the day, weighted by the current allocation
        simple_returns = 0
        for stock in self.stocks_names:
            simple_returns += curr_portfolio[stock] * observation[stock]
        
        # grow the portfolio value by the day's return
        self.AVAILABLE_AMT += simple_returns * self.AVAILABLE_AMT
        
        # store the equivalent log return
        self.log_returns += [np.log(simple_returns + 1)]
        
    def _get_sharpe_ratio(self):
        """
        Helper method to calculate differential sharpe ratio
        """
        if self.num_actions_taken < self.lookback_period:
            S = 0
        else:
            # computed as an annualized rolling Sharpe ratio, scaled by the window length
            window = [log_to_simple(i) for i in self.log_returns]
            S = np.nanmean(window) / np.nanstd(window) * np.sqrt(252) / len(window)
        self.sharpe_ratios += [S]

    def _get_reward(self):
        """
        Helper method to compute both reward signals and return the selected one
        """
        self._get_returns()
        self._get_sharpe_ratio()
        
        if self.returns:
            curr_reward = self.log_returns[-1]
        else:
            curr_reward = self.sharpe_ratios[-1]
        return curr_reward

    def _get_done_status(self):
        """
        Helper method to get end state status
        """
        return self.current_day >= self.end_day

Classes

class PortfolioManagementGym (data, episode_length=None, returns=True, trading_cost_ratio=0.001, lookback_period=64, initial_investment=1000000, retain_cash=True, random_start_range=20, dsr_constant=0.0001, add_softmax=False, start_date='2009-01-01', end_date='2022-01-01', seed=0)

Portfolio Management Gym

Initializes the gym environment

Args

data : pandas dataframe with date index and stock columns with price data, or list of stock tickers

episode_length : how long the agent should interact with the environment

returns : if True, use log returns as the reward; otherwise, use the Sharpe ratio

trading_cost_ratio : fraction of the traded stock value charged as trading costs

lookback_period : fixed-size window determining how much historical data is returned to the agent as the observation

initial_investment : how much the agent wants to invest

retain_cash : bool value telling the environment whether to retain a cash position

random_start_range : random start position for training; should be set to 0 for testing

dsr_constant : smoothing parameter for the differential Sharpe ratio

add_softmax : bool value telling the environment whether to soft-normalize the input action with a softmax

start_date : start date for the Yahoo Finance download

end_date : end date for the Yahoo Finance download

seed : seed value for environment reproducibility
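
A minimal usage sketch follows. The tickers, episode length, and seed are illustrative assumptions; any valid Yahoo Finance symbols (or a price dataframe with a date index) should work, provided the yfinance package is installed:

from AssetAllocator.environments.PortfolioGym import PortfolioManagementGym

# illustrative settings; data may also be a price dataframe with a date index
env = PortfolioManagementGym(data=['AAPL', 'MSFT', 'GOOG'],
                             episode_length=252,   # roughly one trading year
                             lookback_period=64,
                             retain_cash=True,
                             seed=42)

obs = env.reset()
print(obs.shape)  # (lookback_period * n_assets,) = (192,)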

Expand source code
class PortfolioManagementGym(gym.Env):
    """
    Portfolio Management Gym
    """
    def __init__(self,
                 data,
                 episode_length = None,
                 returns = True,
                 trading_cost_ratio = 0.001,
                 lookback_period = 64,
                 initial_investment = 1_000_000,
                 retain_cash = True,
                 random_start_range = 20,
                 dsr_constant = 1e-4,
                 add_softmax = False,
                 start_date = '2009-01-01',
                 end_date = '2022-01-01',
                 seed = 0):
        """
        Initializes the gym environment

        Args:
            data : pandas dataframe with date index and stock columns with price data
                or list of stock tickers

            episode_length : how long the agent should interact with the environment

            returns : if True, use log returns as the reward; otherwise, use the Sharpe ratio

            trading_cost_ratio : fraction of the traded stock value charged as trading costs

            lookback_period : fixed-size window determining how much historical data is returned to the agent as the observation

            initial_investment : how much the agent wants to invest

            retain_cash : bool value telling the environment whether to retain a cash position

            random_start_range : random start position for training; should be set to 0 for testing

            dsr_constant : smoothing parameter for the differential Sharpe ratio

            add_softmax : bool value telling the environment whether to soft-normalize the input action with a softmax

            start_date : start date for the Yahoo Finance download

            end_date : end date for the Yahoo Finance download

            seed : seed value for environment reproducibility
                  
        """

        self.data = data
        self.episode_length = episode_length
        self.returns = returns
        self.trading_cost_ratio = trading_cost_ratio
        self.lookback_period = lookback_period
        self.initial_investment = initial_investment
        self.retain_cash = retain_cash
        self.random_start_range = random_start_range
        self.dsr_constant = dsr_constant
        self.add_softmax = add_softmax
        self.start_date = start_date
        self.end_date = end_date

        if isinstance(data, pd.DataFrame):
            self.stocks_names = list(self.data.columns)
            _, self.n = self.data.shape
        else:
            self.stocks_names = data
            self.n = len(data)
        
        action_dim = self.n + self.retain_cash
        state_dim = self.lookback_period * self.n
        
        self.observation_space = spaces.Box(np.finfo('d').min, np.finfo('d').max, shape=(state_dim,))
        self.action_space = spaces.Box(0, 1, shape=(action_dim,))
        
        self._seed(seed)
        self.reset()

    def reset(self):
        """
        Resets the environment to the start state

        Returns:
            Initial observation (array_like)
        """        
        self._initialize_env()
        obs = self._get_new_state()
        # left-pad with zeros in case the window is shorter than the full state size
        width = self.observation_space.shape[0] - len(obs)
        obs = np.array(np.pad(obs, (width, 0), constant_values=0), dtype='float32')
        return obs

    def step(self, action):
        """
        Takes in an action of size action_dim
        Returns the observation, reward, episode status, and info dict
        """

        if not self._get_done_status():
            self.num_actions_taken += 1
            self._take_action(action)
        
        new_state = np.array(self._get_new_state(), dtype = 'float32')
        assert self.observation_space.contains(new_state), \
            'observation does not belong to space'

        reward = self._get_reward()
        episode_over = self._get_done_status()
        info = {}

        return new_state, reward, episode_over, info

    def render(self):
        """
        Returns an array of simple returns, differential sharpe ratio, and available amount
        """        
        return [log_to_simple(self.log_returns[-1]), 
                    self.sharpe_ratios[-1], self.AVAILABLE_AMT]


    #################################################################################################
    #################################### HELPER FUNCTIONS ###########################################
    #################################################################################################
    

    def _seed(self, seed = None):
        """
        Helper method to set seed
        """        
        self.np_random, self.seed = seeding.np_random(seed)


    def _load_data(self):
        """
        Helper method to load the data
        """
        assert isinstance(self.data, pd.DataFrame) or isinstance(self.data, list), \
        'Please provide a list of tickers or a dataframe'

        # downloading data from yahoo finance if tickers were provided
        if isinstance(self.data, list):
            prices = yf.download(self.data, start = self.start_date,
                                    end = self.end_date, interval="1d", actions=True)
            prices.dropna(inplace=True)
            prices = prices["Adj Close"]
        else:
            prices = self.data.copy()
            prices.dropna(inplace=True)
        
        prices.index = pd.to_datetime(prices.index)
        
        for col in prices.columns:
            prices[col] = prices[col].astype(np.float32)
        return prices
    
    def _preprocess_data(self, df):
        """
        Helper method to preprocess the data
        """
        date_range = range(len(df))
        index_to_date_map = dict(zip(date_range, df.index))
        returns_df = df.pct_change().fillna(0)
        return index_to_date_map, returns_df

    def _initialize_env(self):
        """
        Helper method to create all the environment variables
        """
        ext_stock_list = self.stocks_names.copy() + ['Cash', 'Trading Costs']
        self.weights = [dict(zip(ext_stock_list, [0]*(self.n) + [1, 0]))]
        self.current_holding = [0] * self.n
        self.AVAILABLE_AMT = self.initial_investment
        self.CASH = self.initial_investment

        self.num_actions_taken = 0
        self.curr_reward = 0
        self.log_returns = []
        self.sharpe_ratios = []
        self.date_map = None

        self.prices = self._load_data()
        self.date_map, self.observations = self._preprocess_data(self.prices)
        
        self.start_day = self.np_random.choice(range(self.lookback_period, 
                            self.lookback_period + self.random_start_range + 1))
        # default to the full dataset when no episode length is given
        episode_length = self.episode_length if self.episode_length is not None else len(self.prices)
        self.end_day = min(self.start_day + episode_length, 
                            len(self.prices) - 1)

    def _get_new_state(self):
        """
        Helper method to return current observation state
        """
        self.current_day = self.num_actions_taken + self.start_day
        state_end_day = self.current_day - 1
        state_start_day = state_end_day - self.lookback_period + 1

        state_start_date = self.date_map[state_start_day]
        state_end_date = self.date_map[state_end_day]


        state_obs = self.observations[state_start_date : state_end_date]
        state_obs = np.array(np.concatenate(state_obs.fillna(0).values, 
                    axis = 0), dtype = 'float32')
        return state_obs

    def _check_actions(self, action, check_dim = True):
        """
        Helper method to check validity of the actions received
        """
        assert abs(sum(action) - 1) <= 1e-3, \
            f'Wrong portfolio weights: {action}'
        
        if check_dim:
            assert self.action_space.contains(np.array(action, dtype = 'float32')), \
                    f'{action} action does not belong to space'

    
    def _compute_buyable_shares(self, budgets, prices):
        """Helper method to compute buyable shares
        """        
        shares = [budget/price for budget, price in zip(budgets, prices)]
        return shares
    
    def _compute_trading_costs(self, shares_now, shares_prev, prices):
        """
        Helper method to compute trading costs
        """        
        trading_costs = []
        
        for now, prev, price in zip(shares_now, shares_prev, prices):
            diff = abs(now - prev)
            if diff < 1:
                trading_costs.append(0)
            else:
                trading_costs.append(diff * price * self.trading_cost_ratio)
        
        return trading_costs

    def _take_action(self, actions):
        """
        Helper method to compute effects of agent's action on environment
        """
        # For stable baseline model implementations
        if isinstance(actions, tuple):
            actions = actions[0]

        if self.add_softmax:
            actions = softmax(actions)

        self._check_actions(actions)
        
        # Allocating Budget
        if self.retain_cash:
            actions_ = actions[:-1]
            cash_budget_ratio = actions[-1]
        else:
            actions_ = actions
            cash_budget_ratio = 0

        budget_allocation = [action * self.AVAILABLE_AMT \
                            for action in actions_]
        self.CASH = cash_budget_ratio * self.AVAILABLE_AMT

        # Computing Trading Costs
        current_date = self.date_map[self.current_day]
        prices = self.prices.loc[current_date]
        
        buyable_shares = self._compute_buyable_shares(
                                                      budget_allocation,
                                                      prices)

        trading_costs = self._compute_trading_costs(buyable_shares, 
                                                    self.current_holding,
                                                    list(prices.values))

        self.current_holding = buyable_shares

        # Recomputing portfolio weights
        total_trading_costs = sum(trading_costs)
        ext_stock_list = self.stocks_names.copy() + ['Cash', 'Trading Costs']
        ext_allocation = budget_allocation + [self.CASH, total_trading_costs]
        total_amount = sum(ext_allocation)
        
        portfolio_weights = {stock : allocation/total_amount \
                            for stock, allocation in \
                            zip(ext_stock_list, ext_allocation)}

        self._check_actions(list(portfolio_weights.values()), check_dim = False)
        self.weights.append(portfolio_weights)
        
        assert total_trading_costs >= 0, 'Error in trading costs calculations'
        self.AVAILABLE_AMT = total_amount - total_trading_costs
        

    def _get_returns(self):
        """
        Helper method to calculate log returns
        """
        current_date = self.date_map[self.current_day]
        observation = self.observations.loc[current_date]
        curr_portfolio = self.weights[self.num_actions_taken]
        
        
        # portfolio simple return for the day, weighted by the current allocation
        simple_returns = 0
        for stock in self.stocks_names:
            simple_returns += curr_portfolio[stock] * observation[stock]
        
        # grow the portfolio value by the day's return
        self.AVAILABLE_AMT += simple_returns * self.AVAILABLE_AMT
        
        # store the equivalent log return
        self.log_returns += [np.log(simple_returns + 1)]
        
    def _get_sharpe_ratio(self):
        """
        Helper method to calculate differential sharpe ratio
        """
        if self.num_actions_taken < self.lookback_period:
            S = 0
        else:
            # computed as an annualized rolling Sharpe ratio, scaled by the window length
            window = [log_to_simple(i) for i in self.log_returns]
            S = np.nanmean(window) / np.nanstd(window) * np.sqrt(252) / len(window)
        self.sharpe_ratios += [S]

    def _get_reward(self):
        """
        Helper method to compute both reward signals and return the selected one
        """
        self._get_returns()
        self._get_sharpe_ratio()
        
        if self.returns:
            curr_reward = self.log_returns[-1]
        else:
            curr_reward = self.sharpe_ratios[-1]
        return curr_reward

    def _get_done_status(self):
        """
        Helper method to get end state status
        """
        return self.current_day >= self.end_day

Ancestors

  • gym.core.Env

Methods

def render(self)

Returns an array of simple returns, differential sharpe ratio, and available amount
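
For illustration (the numbers are hypothetical), unpacking the result of a render call looks like:

r, s, amt = env.render()
# r   -> most recent simple portfolio return, e.g. 0.0042
# s   -> current Sharpe-ratio term (0 until lookback_period steps have elapsed)
# amt -> current portfolio value, e.g. 1_004_200.0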

Expand source code
def render(self):
    """
    Returns an array of simple returns, differential sharpe ratio, and available amount
    """        
    return [log_to_simple(self.log_returns[-1]), 
                self.sharpe_ratios[-1], self.AVAILABLE_AMT]
def reset(self)

Resets the environment to the start state

Returns

Initial observation (array_like)
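
The initial observation is a flat float32 vector of length lookback_period * n_assets. A quick sanity check, assuming an env constructed as in the example above:

obs = env.reset()
assert obs.shape == (env.lookback_period * env.n,)
assert obs.dtype == 'float32'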

Expand source code
def reset(self):
    """
    Resets the environment to the start state

    Returns:
        Initial observation (array_like)
    """        
    self._initialize_env()
    obs = self._get_new_state()
    # left-pad with zeros in case the window is shorter than the full state size
    width = self.observation_space.shape[0] - len(obs)
    obs = np.array(np.pad(obs, (width, 0), constant_values=0), dtype='float32')
    return obs
def step(self, action)

Takes in an action of size action_dim. Returns the observation, reward, episode status, and info dict.
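
A typical interaction loop looks like the sketch below; the random policy is a stand-in assumption for a trained agent, and actions must be non-negative weights summing to one unless add_softmax=True:

import numpy as np

obs = env.reset()
done = False
while not done:
    # random portfolio weights that sum to 1 (stand-in for a trained agent)
    raw = np.random.rand(env.action_space.shape[0])
    action = raw / raw.sum()
    obs, reward, done, info = env.step(action)

print(env.render())  # [latest simple return, Sharpe term, portfolio value]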

Expand source code
def step(self, action):
    """
    Takes in an action of size action_dim
    Returns the observation, reward, episode status, and info dict
    """

    if not self._get_done_status():
        self.num_actions_taken += 1
        self._take_action(action)
    
    new_state = np.array(self._get_new_state(), dtype = 'float32')
    assert self.observation_space.contains(new_state), \
        'observation does not belong to space'

    reward = self._get_reward()
    episode_over = self._get_done_status()
    info = {}

    return new_state, reward, episode_over, info