diff --git a/cvxportfolio/simulator.py b/cvxportfolio/simulator.py index 59e2b3360..9026e668b 100644 --- a/cvxportfolio/simulator.py +++ b/cvxportfolio/simulator.py @@ -50,10 +50,11 @@ import numpy as np import pandas as pd -try: # pragma: no cover + +try: # pragma: no cover from multiprocess import Lock, Pool # pylint: disable=no-name-in-module except ImportError: - from multiprocessing import Lock, Pool + from multiprocessing import Lock, Pool, cpu_count from .cache import _load_cache, _mp_init, _store_cache from .costs import StocksHoldingCost, StocksTransactionCost @@ -61,10 +62,11 @@ from .result import BacktestResult PPY = 252 -__all__ = ['StockMarketSimulator', 'MarketSimulator'] +__all__ = ["StockMarketSimulator", "MarketSimulator"] logger = logging.getLogger(__name__) + class MarketSimulator: """This class is a generic financial market simulator. @@ -155,36 +157,45 @@ class MarketSimulator: """ # pylint: disable=too-many-arguments - def __init__(self, universe=(), returns=None, volumes=None, - prices=None, market_data=None, costs=(), round_trades=False, - max_fraction_liquidity=None, - datasource='YahooFinance', - cash_key="USDOLLAR", - base_location=BASE_LOCATION, - min_history=pd.Timedelta('365.24d'), - trading_frequency=None, - backtest_result_cls=BacktestResult): + def __init__( + self, + universe=(), + returns=None, + volumes=None, + prices=None, + market_data=None, + costs=(), + round_trades=False, + max_fraction_liquidity=None, + datasource="YahooFinance", + cash_key="USDOLLAR", + base_location=BASE_LOCATION, + min_history=pd.Timedelta("365.24d"), + trading_frequency=None, + backtest_result_cls=BacktestResult, + ): """Initialize the Simulator and download data if necessary.""" self.base_location = Path(base_location) - if not market_data is None: + if market_data is not None: self.market_data = market_data else: - if (len(universe) == 0) and prices is None: if round_trades: raise SyntaxError( - "If you don't specify prices you can't request " - + "`round_trades`.") + "If you don't specify prices you can't request " + "`round_trades`." + ) if len(universe) == 0: self.market_data = UserProvidedMarketData( returns=returns, - volumes=volumes, prices=prices, + volumes=volumes, + prices=prices, cash_key=cash_key, base_location=base_location, min_history=min_history, - trading_frequency=trading_frequency) + trading_frequency=trading_frequency, + ) else: self.market_data = DownloadedMarketData( universe=universe, @@ -192,16 +203,17 @@ def __init__(self, universe=(), returns=None, volumes=None, base_location=base_location, min_history=min_history, trading_frequency=trading_frequency, - datasource=datasource) + datasource=datasource, + ) self.round_trades = round_trades if max_fraction_liquidity is not None: - if (max_fraction_liquidity <= 0.) or ( - max_fraction_liquidity > 1.): + if (max_fraction_liquidity <= 0.0) or (max_fraction_liquidity > 1.0): raise SyntaxError( "max_fraction_liquidity should be a positive number" - + " lesser or equal than 1.0.") + + " lesser or equal than 1.0." + ) self.max_fraction_liquidity = max_fraction_liquidity @@ -220,8 +232,17 @@ def _round_trade_vector(u, current_prices): # pylint: disable=too-many-arguments def simulate( - self, t, t_next, h, policy, past_returns, current_returns, - past_volumes, current_volumes, current_prices): + self, + t, + t_next, + h, + policy, + past_returns, + current_returns, + past_volumes, + current_volumes, + current_prices, + ): """Get next portfolio and statistics used by Backtest for reporting. The signature of this method differs from other estimators @@ -253,18 +274,20 @@ def simulate( # translate to weights current_portfolio_value = sum(h) - logger.info( - 'Portfolio value at time %s: %s', t, current_portfolio_value) + logger.info("Portfolio value at time %s: %s", t, current_portfolio_value) current_weights = pd.to_numeric(h / current_portfolio_value) # evaluate the policy s = time.time() - logger.info('Evaluating the policy at time %s', t) + logger.info("Evaluating the policy at time %s", t) policy_w = policy.values_in_time_recursive( - t=t, current_weights=current_weights, + t=t, + current_weights=current_weights, current_portfolio_value=current_portfolio_value, - past_returns=past_returns, past_volumes=past_volumes, - current_prices=current_prices) + past_returns=past_returns, + past_volumes=past_volumes, + current_prices=current_prices, + ) z = policy_w - current_weights @@ -274,37 +297,43 @@ def simulate( z.iloc[-1] = -sum(z.iloc[:-1]) # only in Python < 3.12 https://github.com/python/cpython/issues/111933 - if sys.version.split(' ', maxsplit=1)[0] < '3.12': - assert sum(z) == 0. + if sys.version.split(" ", maxsplit=1)[0] < "3.12": + assert sum(z) == 0.0 # trades in dollars u = z * current_portfolio_value # zero out trades on stock that weren't trading on that day - if not current_volumes is None: + if current_volumes is not None: non_tradable_stocks = current_volumes[current_volumes <= 0].index if len(non_tradable_stocks): logger.info( "At time %s %s canceled trades on assets %s" + " because their market volumes for the period are zero.", - t, self.__class__.__name__, non_tradable_stocks) - u[non_tradable_stocks] = 0. + t, + self.__class__.__name__, + non_tradable_stocks, + ) + u[non_tradable_stocks] = 0.0 if self.max_fraction_liquidity is not None: - capped_stocks = ( - np.abs(u.iloc[:-1]) - > current_volumes * self.max_fraction_liquidity) + capped_stocks = np.abs(u.iloc[:-1]) > current_volumes * self.max_fraction_liquidity capped_stocks = u.index[:-1][capped_stocks] if len(capped_stocks): logger.info( "At time %s %s capped trades on assets %s" + " because their trades exceedes %s of the available" - + " liquidity.", t, self.__class__.__name__, - capped_stocks, self.max_fraction_liquidity) + + " liquidity.", + t, + self.__class__.__name__, + capped_stocks, + self.max_fraction_liquidity, + ) u[capped_stocks] = ( np.sign(u[capped_stocks]) * current_volumes[capped_stocks] - * self.max_fraction_liquidity) + * self.max_fraction_liquidity + ) # round trades if self.round_trades: @@ -314,25 +343,33 @@ def simulate( u.iloc[-1] = -sum(u.iloc[:-1]) # only in Python < 3.12 https://github.com/python/cpython/issues/111933 - if sys.version.split(' ', maxsplit=1)[0] < '3.12': - assert sum(u) == 0. + if sys.version.split(" ", maxsplit=1)[0] < "3.12": + assert sum(u) == 0.0 # compute post-trade holdings (including cash balance) h_plus = h + u # evaluate cost functions - realized_costs = {cost.__class__.__name__: getattr(cost, + realized_costs = { + cost.__class__.__name__: getattr( + cost, # to support interface before 1.2.0 - 'simulate_recursive' if hasattr(cost, 'simulate_recursive') - else 'simulate')( - t=t, u=u, h_plus=h_plus, past_volumes=past_volumes, - current_volumes=current_volumes, past_returns=past_returns, - current_returns=current_returns, - current_prices=current_prices, - current_weights=current_weights, - current_portfolio_value=current_portfolio_value, - t_next=t_next) - for cost in self.costs} + "simulate_recursive" if hasattr(cost, "simulate_recursive") else "simulate", + )( + t=t, + u=u, + h_plus=h_plus, + past_volumes=past_volumes, + current_volumes=current_volumes, + past_returns=past_returns, + current_returns=current_returns, + current_prices=current_prices, + current_weights=current_weights, + current_portfolio_value=current_portfolio_value, + t_next=t_next, + ) + for cost in self.costs + } # initialize tomorrow's holdings h_next = pd.Series(h_plus, copy=True) @@ -342,101 +379,107 @@ def simulate( # multiply positions (including cash) by market returns assert not np.any(current_returns.isnull()) - h_next *= (1 + current_returns) + h_next *= 1 + current_returns return h_next, z, u, realized_costs, policy_time def _get_initialized_policy(self, orig_policy, universe, trading_calendar): - # TODO: more testing to make sure we don't need this any more # policy = copy.deepcopy(orig_policy) policy = orig_policy # caching will be handled here - policy.initialize_estimator_recursive( - universe=universe, trading_calendar=trading_calendar) + policy.initialize_estimator_recursive(universe=universe, trading_calendar=trading_calendar) # we also initialize cost objects for cost in self.costs: - if hasattr(cost, 'initialize_estimator_recursive'): + if hasattr(cost, "initialize_estimator_recursive"): # to support interface before 1.2.0 cost.initialize_estimator_recursive( - universe=universe, trading_calendar=trading_calendar) + universe=universe, trading_calendar=trading_calendar + ) # TODO: this will be handled by initialize_estimator of the forecasters # if policy uses a cache load it from disk - if hasattr(policy, '_cache'): - logger.info('Trying to load cache from disk...') + if hasattr(policy, "_cache"): + logger.info("Trying to load cache from disk...") policy._cache = _load_cache( - signature=self.market_data.partial_universe_signature(universe), - base_location=self.base_location) + signature=self.market_data.partial_universe_signature(universe), + base_location=self.base_location, + ) return policy def _finalize_policy(self, policy, universe): - # TODO: this will be handled by finalize_estimator of the forecasters # save cache to disk - if hasattr(policy, '_cache'): - logger.info('Storing cache from policy to disk...') + if hasattr(policy, "_cache"): + logger.info("Storing cache from policy to disk...") _store_cache( - cache=policy._cache, - signature=self.market_data.partial_universe_signature(universe), - base_location=self.base_location) + cache=policy._cache, + signature=self.market_data.partial_universe_signature(universe), + base_location=self.base_location, + ) policy.finalize_estimator_recursive() for cost in self.costs: - if hasattr(cost, 'finalize_estimator_recursive'): + if hasattr(cost, "finalize_estimator_recursive"): # to support interface before 1.2.0 - cost.finalize_estimator_recursive() # currently unused + cost.finalize_estimator_recursive() # currently unused def _backtest(self, policy, start_time, end_time, h): """Run a backtest with changing universe.""" timer = time.time() - trading_calendar = self.market_data.trading_calendar( - start_time, end_time, include_end=True) + trading_calendar = self.market_data.trading_calendar(start_time, end_time, include_end=True) universe = self.market_data.universe_at_time(trading_calendar[0]) used_policy = self._get_initialized_policy( - policy, universe=universe, trading_calendar=trading_calendar) + policy, universe=universe, trading_calendar=trading_calendar + ) with self.backtest_result_cls( - universe=universe, trading_calendar=trading_calendar, - costs=self.costs) as result: - + universe=universe, trading_calendar=trading_calendar, costs=self.costs + ) as result: for t, t_next in zip(trading_calendar[:-1], trading_calendar[1:]): - md_timer = time.time() served = self.market_data.serve(t) market_data_time = time.time() - md_timer - past_returns, current_returns, past_volumes, current_volumes, \ - current_prices = served[:5] + ( + past_returns, + current_returns, + past_volumes, + current_volumes, + current_prices, + ) = served[:5] current_universe = current_returns.index if not current_universe.equals(h.index): - self._finalize_policy(used_policy, h.index) h = self._adjust_h_new_universe(h, current_universe) used_policy = self._get_initialized_policy( - policy, universe=current_universe, - trading_calendar=trading_calendar[ - trading_calendar >= t]) + policy, + universe=current_universe, + trading_calendar=trading_calendar[trading_calendar >= t], + ) h_next, z, u, realized_costs, policy_time = self.simulate( - t=t, h=h, policy=used_policy, + t=t, + h=h, + policy=used_policy, t_next=t_next, past_returns=past_returns, current_returns=current_returns, past_volumes=past_volumes, current_volumes=current_volumes, - current_prices=current_prices) + current_prices=current_prices, + ) - if hasattr(used_policy, 'benchmark'): + if hasattr(used_policy, "benchmark"): w_bm = used_policy.benchmark.current_value bm_ret = w_bm @ current_returns else: @@ -446,24 +489,28 @@ def _backtest(self, policy, start_time, end_time, h): timer = time.time() - result.log_trading(t=t, h=h, z=z, u=u, costs=realized_costs, - policy_time=policy_time, - simulator_time=simulator_time, - market_data_time = market_data_time, - cash_return=current_returns.iloc[-1], - benchmark_return=bm_ret) + result.log_trading( + t=t, + h=h, + z=z, + u=u, + costs=realized_costs, + policy_time=policy_time, + simulator_time=simulator_time, + market_data_time=market_data_time, + cash_return=current_returns.iloc[-1], + benchmark_return=bm_ret, + ) h = h_next - if sum(h) <= 0.: # bankruptcy - logger.warning( - 'Back-test ended in bankruptcy at time %s!', t) + if sum(h) <= 0.0: # bankruptcy + logger.warning("Back-test ended in bankruptcy at time %s!", t) break self._finalize_policy(used_policy, h.index) - result.log_final(t, t_next, h, - extra_simulator_time=time.time() - timer) + result.log_final(t, t_next, h, extra_simulator_time=time.time() - timer) return result @@ -501,7 +548,7 @@ def _adjust_h_new_universe(self, h, new_universe): assert new_universe[-1] == h.index[-1] intersection = pd.Index(set(new_universe).intersection(h.index)) - new_h = pd.Series(0., new_universe) + new_h = pd.Series(0.0, new_universe) new_h[intersection] = h[intersection] new_assets = pd.Index(set(new_universe).difference(h.index)) @@ -514,7 +561,10 @@ def _adjust_h_new_universe(self, h, new_universe): logger.info( "Adjusting h vector by removing assets %s." + " Their current market value of %s is added" - + " to the cash account.", remove_assets, total_liquidation) + + " to the cash account.", + remove_assets, + total_liquidation, + ) new_h.iloc[-1] += total_liquidation return new_h @@ -524,8 +574,16 @@ def _worker(policy, simulator, start_time, end_time, h): return simulator._backtest(policy, start_time, end_time, h) # pylint: disable=too-many-arguments - def optimize_hyperparameters(self, policy, start_time=None, end_time=None, - initial_value=1E6, h=None, objective='sharpe_ratio', parallel=True): + def optimize_hyperparameters( + self, + policy, + start_time=None, + end_time=None, + initial_value=1e6, + h=None, + objective="sharpe_ratio", + parallel=True, + ): """Optimize hyperparameters to maximize back-test objective. :param policy: Trading policy with symbolic hyperparameters. @@ -565,20 +623,20 @@ def modify_orig_policy(target_policy): results = {} - current_result = self.backtest(policy, start_time=start_time, - end_time=end_time, - initial_value=initial_value, h=h) + current_result = self.backtest( + policy, start_time=start_time, end_time=end_time, initial_value=initial_value, h=h + ) current_objective = getattr(current_result, objective) results[str(policy)] = current_objective for i in range(100): - print('iteration', i) + print("iteration", i) # print('Current optimal hyper-parameters:') # print(policy) - logger.info('Current policy: %s', policy) - print('Current objective:') + logger.info("Current policy: %s", policy) + print("Current objective:") print(current_objective) # print() # print('Current result:') @@ -605,13 +663,16 @@ def modify_orig_policy(target_policy): if len(test_policies) == 0: break - results_partial = self.backtest_many(test_policies, - start_time=start_time, end_time=end_time, + results_partial = self.backtest_many( + test_policies, + start_time=start_time, + end_time=end_time, initial_value=initial_value, - h=h, parallel=parallel) + h=h, + parallel=parallel, + ) - objectives_partial = [getattr(res, objective) - for res in results_partial] + objectives_partial = [getattr(res, objective) for res in results_partial] for pol, obje in zip(test_policies, objectives_partial): results[str(pol)] = obje @@ -629,9 +690,7 @@ def modify_orig_policy(target_policy): return policy # pylint: disable=too-many-arguments - def backtest( - self, policy, start_time=None, end_time=None, initial_value=1E6, - h=None): + def backtest(self, policy, start_time=None, end_time=None, initial_value=1e6, h=None): """Back-test a trading policy. The default initial portfolio is all cash, or you can pass any @@ -659,17 +718,28 @@ def backtest( :rtype: :class:`cvxportfolio.BacktestResult` """ return self.backtest_many( - [policy], start_time=start_time, end_time=end_time, - initial_value=initial_value, h=None if h is None else [h], - parallel=False)[0] + [policy], + start_time=start_time, + end_time=end_time, + initial_value=initial_value, + h=None if h is None else [h], + parallel=False, + )[0] # Alias to match original syntax run_backtest = backtest # pylint: disable=too-many-arguments def backtest_many( - self, policies, start_time=None, end_time=None, initial_value=1E6, - h=None, parallel=True): + self, + policies, + start_time=None, + end_time=None, + initial_value=1e6, + h=None, + parallel=True, + n_processes=None, + ): """Back-test many trading policies. The default initial portfolio is all cash, or you can pass any @@ -703,6 +773,8 @@ def backtest_many( It is good practice to define the market simulator and execute this inside a ``if __name__ == '__main__:'`` clause, if in a script. :type parallel: bool + :param n_processes: Number of processes to use (for parallel). + :type n_processes: int or None :raises SyntaxError: If the lenghts of objects passed don't match, .... :raises ValueError: If there are no trading days between the provided @@ -714,34 +786,33 @@ def backtest_many( :rtype: list """ - if not hasattr(policies, '__len__'): - raise SyntaxError('You should pass a list of policies.') + if not hasattr(policies, "__len__"): + raise SyntaxError("You should pass a list of policies.") - if not hasattr(h, '__len__'): + if not hasattr(h, "__len__"): h = [h] * len(policies) if not len(policies) == len(h): raise SyntaxError( - 'If passing lists of policies and initial portfolios' - + 'they must have the same length.') + "If passing lists of policies and initial portfolios" + + "they must have the same length." + ) if start_time is not None: start_time = pd.Timestamp(start_time) if start_time.tz is None: - start_time = start_time.tz_localize( - self.market_data.trading_calendar().tz) + start_time = start_time.tz_localize(self.market_data.trading_calendar().tz) if end_time is not None: end_time = pd.Timestamp(end_time) if end_time.tz is None: - end_time = end_time.tz_localize( - self.market_data.trading_calendar().tz) + end_time = end_time.tz_localize(self.market_data.trading_calendar().tz) trading_calendar_inclusive = self.market_data.trading_calendar( - start_time, end_time, include_end=True) + start_time, end_time, include_end=True + ) if len(trading_calendar_inclusive) < 1: - raise ValueError( - 'There are no trading days between the provided times.') + raise ValueError("There are no trading days between the provided times.") start_time = trading_calendar_inclusive[0] end_time = trading_calendar_inclusive[-1] initial_universe = self.market_data.universe_at_time(start_time) @@ -752,24 +823,27 @@ def backtest_many( if sorted(single_h.index) != sorted(initial_universe): raise ValueError( "Holdings provided don't match the universe" - " implied by the market data server.") + " implied by the market data server." + ) h[i] = single_h[initial_universe] # initialize policies and get initial portfolios for i in range(len(policies)): if h[i] is None: - h[i] = pd.Series(0., initial_universe) + h[i] = pd.Series(0.0, initial_universe) h[i].iloc[-1] = initial_value n = len(policies) - zip_args = zip(policies, [self] * n, - [start_time] * n, [end_time] * n, h) + zip_args = zip(policies, [self] * n, [start_time] * n, [end_time] * n, h) if (not parallel) or len(policies) == 1: result = list(starmap(self._worker, zip_args)) else: - with Pool(initializer=_mp_init, initargs=(Lock(),)) as p: + pool_kwargs = dict(initializer=_mp_init, initargs=(Lock(),)) + if n_processes: + pool_kwargs["processes"] = n_processes + with Pool(**pool_kwargs) as p: result = p.starmap(self._worker, zip_args) return list(result) @@ -821,15 +895,24 @@ class StockMarketSimulator(MarketSimulator): :type kwargs: dict """ - def __init__(self, universe=(), - costs=(StocksTransactionCost, StocksHoldingCost), - round_trades=True, - max_fraction_liquidity=0.05, - cash_key="USDOLLAR", base_location=BASE_LOCATION, - trading_frequency=None, **kwargs): - - super().__init__(universe=universe, - costs=costs, round_trades=round_trades, - max_fraction_liquidity=max_fraction_liquidity, - cash_key=cash_key, base_location=base_location, - trading_frequency=trading_frequency, **kwargs) + def __init__( + self, + universe=(), + costs=(StocksTransactionCost, StocksHoldingCost), + round_trades=True, + max_fraction_liquidity=0.05, + cash_key="USDOLLAR", + base_location=BASE_LOCATION, + trading_frequency=None, + **kwargs + ): + super().__init__( + universe=universe, + costs=costs, + round_trades=round_trades, + max_fraction_liquidity=max_fraction_liquidity, + cash_key=cash_key, + base_location=base_location, + trading_frequency=trading_frequency, + **kwargs + )