Module AssetAllocator.experiment

import os
import sys

import numpy as np

if '__file__' in vars():
    # Running non-interactively: add the package's parent directory to the import path.
    path = os.path.join(os.path.dirname(__file__), os.pardir)
    sys.path.append(path)
else:
    # Running interactively (e.g. in a notebook): assume the package root is one level up.
    sys.path.append("..")

from .algorithms.Baselines.agent import BaselineAgent
from .algorithms.NAF.agent import NAFAgent
from .algorithms.TD3.agent import TD3Agent

from .algorithms.PPO.agent import PPOAgent
from .algorithms.TRPO.agent import TRPOAgent

from .algorithms.DDPG.agent import DDPGAgent
from .algorithms.REINFORCE.agent import REINFORCEAgent

from .algorithms.SAC.agent import SACAgent
from .algorithms.A2C.agent import A2CAgent

from .environments.utils import log_to_simple, simple_to_log
from .trainer import Trainer

from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
from stable_baselines3 import TD3 as STBTD3
from stable_baselines3.common.noise import NormalActionNoise, OrnsteinUhlenbeckActionNoise
from stable_baselines3 import SAC as STBSAC
from stable_baselines3 import PPO as STBPPO
from stable_baselines3 import DDPG as STBDDPG
from stable_baselines3 import A2C as STBA2C

class Experiment:
    def __init__(self, trainer_kwargs = {}, model_kwargs = {}, timesteps = None):
        self.trainer_kwargs = trainer_kwargs
        self.model_kwargs = model_kwargs
        self.timesteps = timesteps
        
    def run(self, model_name, dataset = None):
        if dataset is None:
            dataset = 'DOW30'
        
        if self.timesteps is None:
            timesteps = [10_000, 100_000]
        else:
            timesteps = self.timesteps
        
        rewards = [True]  # [True, False] to also sweep the 'ShRt' reward setting
        trading_costs = [0, 0.001, 0.01]
        retain_cash = False
        lookback = 64
        
        if model_name in ['MPT', 'Uniform', 'Random', 'BuyAndHold']:
            # Baseline strategies need no training; only the trading cost is swept.
            for trading_cost in trading_costs:
                path = f'./Baseline_{dataset}_Results/'
                os.makedirs(path, exist_ok=True)
                name = f'{path}{model_name}-Trading_cost-{trading_cost}'

                # Work on a copy so the shared kwargs are not mutated across runs.
                trainer_kwargs = dict(self.trainer_kwargs)
                if model_name == 'Random':
                    trainer_kwargs['test_runs'] = 100

                trainer = Trainer(filepath = f'data/{dataset}.csv',
                              experiment_name = name,
                              timesteps = -1,
                              lookback_period = lookback,
                              trading_cost_ratio = trading_cost,
                              **trainer_kwargs)

                env = trainer.get_train_env()
                model = BaselineAgent(model_name, env, **self.model_kwargs)
                    
                returns = trainer.run(model)

                # Average each run's compounded simple return across all test runs.
                log_rets = 0
                for ret in returns:
                    log_rets += log_to_simple(sum(simple_to_log(i[0]) for i in ret))
                log_rets /= len(returns)

                n = name.split('/')[-1]
                print(f'{n} : {log_rets}')

        else:
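            # Learned agents: sweep reward type x training budget x trading cost.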
            for reward in rewards:
                for timestep in timesteps:
                    for trading_cost in trading_costs:                                            
                        reward_name = 'LogRets' if reward else 'ShRt'

                        path = f'./{model_name}_{dataset}_Results/'
                        os.makedirs(path, exist_ok=True)

                        name = f'{path}Reward-{reward_name}_timestep-{timestep}_trading_cost-{trading_cost}'

                        trainer = Trainer(filepath = f'data/{dataset}.csv',
                              experiment_name = name,
                              timesteps = timestep,
                              lookback_period = lookback,
                              trading_cost_ratio = trading_cost,
                              returns = reward,
                              retain_cash = retain_cash,
                              **self.trainer_kwargs)

                        env = trainer.get_train_env()
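                        # Instantiate the requested agent; 'STB-*' names use the stable-baselines3 implementations.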
                                         
                        if model_name in ['TD3']:
                            model = TD3Agent(env, **self.model_kwargs)
                        elif model_name in ['NAF']:
                            model = NAFAgent(env, **self.model_kwargs)
                        elif model_name in ['PPO']:
                            model = PPOAgent(env, **self.model_kwargs)
                        elif model_name in ['TRPO']:
                            #env = SubprocVecEnv([env  for i in range(4)])
                            model = TRPOAgent(env, **self.model_kwargs)
                        elif model_name in ['DDPG']:
                            model = DDPGAgent(env, **self.model_kwargs)
                        elif model_name in ['REINFORCE']:
                            model = REINFORCEAgent(env, **self.model_kwargs)
                        elif model_name in ['SAC']:
                            model = SACAgent(env, **self.model_kwargs)
                        elif model_name in ['A2C']:
                            model = A2CAgent(env, **self.model_kwargs)
                        elif model_name in ['STB-TD3']:
                            n_actions = env.action_space.shape[-1]
                            action_noise = NormalActionNoise(mean=np.zeros(n_actions), sigma=0.1 * np.ones(n_actions))
                            model = STBTD3("MlpPolicy", env, action_noise=action_noise, verbose=1, **self.model_kwargs)
                        elif model_name in ['STB-SAC']:
                            model = STBSAC("MlpPolicy", env, verbose=1, **self.model_kwargs)
                        elif model_name in ['STB-PPO']:
                            #env = SubprocVecEnv([env  for i in range(4)])
                            model = STBPPO("MlpPolicy", env, verbose=1)
                        elif model_name in ['STB-A2C']:
                            #env = SubprocVecEnv([env  for i in range(4)])
                            model = STBA2C("MlpPolicy", env, verbose=1)
                        elif model_name in ['STB-DDPG']:
                            n_actions = env.action_space.shape[-1]
                            action_noise = NormalActionNoise(mean=np.zeros(n_actions), sigma=0.1 * np.ones(n_actions))
                            model = STBDDPG("MlpPolicy", env, action_noise=action_noise, verbose=1, **self.model_kwargs)
                        else:
                            raise ValueError(f'Unknown model name: {model_name}')

                        returns = trainer.run(model)

                        # Average each run's compounded simple return across all test runs.
                        log_rets = 0
                        for ret in returns:
                            log_rets += log_to_simple(sum(simple_to_log(i[0]) for i in ret))
                        log_rets /= len(returns)

                        n = name.split('/')[-1]
                        print(f'{n} : {log_rets}')
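
Both branches end by collapsing the output of trainer.run(model) into one number: each test run's per-step simple returns are compounded by summing in log space, and the compounded returns are then averaged across runs. A small self-contained sketch of that aggregation, with assumed definitions for the two helpers (the real ones live in AssetAllocator.environments.utils and may differ):

import math

def simple_to_log(r):
    return math.log1p(r)   # simple return -> log return (assumed definition)

def log_to_simple(x):
    return math.expm1(x)   # log return -> simple return (assumed definition)

# Two test runs; each step yields a tuple whose first element is the simple return.
returns = [
    [(0.01,), (-0.02,), (0.03,)],
    [(0.00,), (0.01,), (0.01,)],
]

avg = 0
for ret in returns:
    # Compound one run's returns by summing in log space, then convert back.
    avg += log_to_simple(sum(simple_to_log(r[0]) for r in ret))
avg /= len(returns)
print(round(avg, 4))       # 0.0198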

Classes

class Experiment (trainer_kwargs={}, model_kwargs={}, timesteps=None)
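A minimal usage sketch, assuming the data CSV is available under data/ relative to the working directory (any extra Trainer or agent keyword arguments are project-specific and not shown):

from AssetAllocator.experiment import Experiment

# Sweep one RL agent over a single training budget; the trading-cost grid
# and reward type come from run()'s defaults.
exp = Experiment(timesteps=[10_000])
exp.run('SAC', dataset='DOW30')     # expects data/DOW30.csv

# Baseline strategies ignore the training budget and only vary the trading cost.
Experiment().run('BuyAndHold')      # results land in ./Baseline_DOW30_Results/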

Methods

def run(self, model_name, dataset=None)
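run dispatches on model_name; any value outside the lists below raises a ValueError. The grouping variables are illustrative only (they are not defined by the module), shown here with a sketch that loops over every supported agent:

from AssetAllocator.experiment import Experiment

BASELINES = ['MPT', 'Uniform', 'Random', 'BuyAndHold']
AGENTS = ['TD3', 'NAF', 'PPO', 'TRPO', 'DDPG', 'REINFORCE', 'SAC', 'A2C']
SB3_AGENTS = ['STB-TD3', 'STB-SAC', 'STB-PPO', 'STB-A2C', 'STB-DDPG']

# Run every supported configuration back to back (a long job; sketch only).
for name in BASELINES + AGENTS + SB3_AGENTS:
    Experiment(timesteps=[10_000]).run(name)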