Optimise la recherche des meilleurs paramètres
Avant-propos
moyennes mobiles, RSI, MACD, ADX
, etc.
import pandas_ta as ta
... puis dans une méthode de la classe, par exemple dans add_indics()
, d'exécuter ce code : help(ta.rsi)
:
help(ta.rsi)
La doc est affichée : On voit dans la partie Args : close, length, scalar, talib, drift, offset. Ils ont tous des valeurs par défaut.
L'argument close
(de type str) indique le nom de la colonne du DataFrame associé à pandas_ta qui sera utilisée comme signal à traiter.
Sa valeur par défaut est 'Close', mais tout autre nom de colonne existant dans le DataFrame peut être utilisé.
Exemples des principaux arguments numériques (entre parenthèses, la valeur par défaut) pour quelques indicateurs :
geneticalgorithm2
, depuis Pycharm.Description
4 classes, codées dans les 4 fichiers ci-après
GeneticAlgorithm
étend la classe BackTest
par des méthodes spécifiques à l'algorithme génétique./trading/strategies/genetic.py
:
""" Version 2022-04-02 - Algorithme génétique. https://pypi.org/project/geneticalgorithm2/ v6.5.0 """
# Imports externes
import copy
import numpy as np
from geneticalgorithm2 import geneticalgorithm2 as ga
# Imports internes
from functions.utils import Utils, Dictionary
from trading.strategies.backtest import BackTest
# noinspection PyUnresolvedReferences
class GeneticAlgorithm(BackTest):
def __init__(self, mode):
super().__init__()
self.df_pilot = None
self.mode = mode
self.d_indics = dict()
self.odl_params = self._odl_params
self.l_boundaries = self._l_boundaries
self.args = None
self.chromosome = None
self.best = 10**8
@property
def _odl_params(self):
""" - Super-dictionnaire des listes de paramètres possibles pour la stratégie.
- Les chromosomes seront créés par tirage au sort dans ces listes. """
od_indics = self.central_args(['indics'])
d_bounds = dict()
for l_keys in od_indics.key_list():
if len(l_keys) > 2 and l_keys[1] == 'ga_bounds':
key = f'{l_keys[0]}.params.{l_keys[2]}'
d_bounds[key] = od_indics.read(l_keys)
if not d_bounds:
raise SystemExit("Le dictionnaire central (self.central_args())"
" doit contenir des clés ['indic', *, 'ga__bounds'].")
""" Exemple : d_bounds =
ma1__length: [60, 100, 5] ---------------------------------- <class 'list'>
ma2__length: [1.01, 3, .2, func1, 'ma1.length'] ------------ <class 'list'> """
odl_params = Dictionary()
for key, l_val in d_bounds.items():
l_vals = list()
val = l_val[0]
while val <= l_val[1]:
l_vals.append(val)
val += l_val[2]
odl_params.write([key, 'l_vals'], l_vals)
if len(l_val) > 3:
if l_val[3].__class__.__name__ != 'function':
raise SystemExit(f"Dictionnaire self.central_args(['ga', 'bounds', '{key}']) :"
f"\nLe 4ème élément de la liste '{key}[]' doit être une fonction.")
odl_params.write([key, 'add'], l_val[3:])
""" Exemple : odl_params =
ma1__length: ----------------------------------------------- <class 'dict'>
l_vals: [60, 65, 70, 75, 80, 85, 90, 95, 100] ---------- <class 'list'>
ma2__length: ----------------------------------------------- <class 'dict'>
l_vals: [1.01, 1.21, 1.41, 1.61, 1.81, 2.01, 2.21, 2.41, 2.61, 2.81] <class 'list'>
add: ['ma1__length'] ----------------------------------- <class 'list'> """
return odl_params
@property
def _l_boundaries(self):
l_boundaries = list()
odl_params = self.odl_params
for param_key in odl_params:
l_boundaries.append((0, len(self.odl_params.read([param_key, 'l_vals'])) - 1))
return l_boundaries
def add_indics(self, od_indics=None):
if od_indics is None:
""" Indicateurs utilisés dans cette stratégie. Voir ci-dessus central_args(), section 'indics'. """
od_indics = self.central_args('indics')
""" Signaux pour le calcul des gains, les marqueurs et l'affichage. """
for l_keys in od_indics.key_list():
if l_keys[-1] == 'function':
indic_key = l_keys[0]
function = od_indics.read(l_keys)
self.get_indic(function, **od_indics[indic_key])
b_backtest, b_ga = od_indics.read('backtest', True), od_indics.read('b_ga', False)
if b_ga or b_backtest:
cost = self.fitness(**od_indics)
if b_ga:
return cost
else:
""" Gains en pips et en monnaie (€ ou $). """
l_gains = ['Balance (pips)', 'Open (pips)', 'Equity (pips)', 'Balance (€)', 'Open (€)', 'Equity (€)']
self.df_pilot[l_gains] = cost
""" Affiche les statistiques du backtest."""
if od_indics.read('stats', True):
self.bt_stats()
def get_indic(self, indic_function, **all_params):
""" Obtention 'intelligente' des datas d'un indicateur, pour optimiser l'exécution de l'algo génétique (ga) ...
... en effet, lors de son exécution, le ga est ammené à demander plusieurs fois le même indicateur.
- Un indicateur est caractérisé par sa signature : nom de sa fonction et ses paramètres.
- Les datas obtenus de cet indicateur sont une 'Serie' (une col.) ou un 'DataFrame' (plusieurs cols.) pandas.
- Ces datas sont stockés dans une (plusieurs) colonne(s) de self.df_pilot.
- Si les datas demandés existent déjà dans self.df_pilot, on les renvoie directement (rapide).
- Sinon la fonction de l'indicateur est appelée, le résultat est stocké dans self.df_pilot, puis renvoyé (lent).
****************************** EXCEPTION - EXCEPTION - EXCEPTION ******************************
Cette méthode n'est pas prévue pour les indicateurs qui retournent un tuple au lieu d'un DataFrame ou Series.
Le cas échéant, seul l'élément [0] du tuple sera retourné, à condition que ce soit un élément Pandas.
Si les autres éléments du tuple sont nécessaires, utiliser directement la fonction native de pandas_ta.
"""
if indic_function.__module__ == '__main__':
""" Traitement personnalisé, dans la classe dérivée utilisatrice. """
indic_function() # Exécution de la fonction ou méthode.
return
d_params = all_params.get('params')
signature = '_'.join(['', indic_function.__name__] + [str(param) for param in d_params.values()])
series = all_params.get('series')
if signature in self.d_indics:
self.df_pilot[series] = self.df_pilot[self.d_indics[signature]]
else:
close = d_params.pop('close', 'Close')
try:
df = indic_function(self.df_pilot[close], **d_params)
except (Exception,) as err:
raise SystemExit(f"La colonne '{close}' n'existe pas dans le DataFrame pilote df_pilot.\n{err}")
if df.__class__.__name__ == 'tuple':
df = df[0]
if df.__class__.__name__ not in ['DataFrame', 'Series']:
raise SystemExit(
f"L'indicateur '{indic_function.__name__}' retourne un tuple et non un tableau Pandas."
f"\nCe tuple ne contient pas de DataFrame ou de Series."
f"\nVeuillez utiliser directement la fonction 'df.ta.{indic_function.__name__}(...)'.")
if df.__class__.__name__ == 'DataFrame':
self.d_indics[signature] = ['_' + col for col in list(df.columns)] # [_col1, _col2, ...]
""" Si le nb de cols demandées ne correspond pas au nb de cols fournies, correction automatique. """
if isinstance(series, str):
series = [series]
for i, serie in enumerate(series):
if serie == '_':
continue
self.df_pilot[serie] = df.iloc[:, i]
else: # Series
self.d_indics[signature] = signature
self.df_pilot[series] = df
self.df_pilot[self.d_indics[signature]] = df # Mémorisation unique.
def control_bounds(self, *l_args):
"""
@param l_args: <vide> ou liste des paramètres produits par l'algorithme génétique.
Si l_args est une liste non vide, la méthode affiche les meilleurs paramètres trouvés.
Sinon, elle affiche les listes de paramètres sélectionnables par l'algorithme.
@return: NA -> Affichage dans le terminal.
"""
la, lp = len(l_args), len(self.odl_params)
if la != 0 and la != lp:
l_args = []
Utils.printc(f"Le nombre de valeurs reçues ({la}) est différent du nombre de paramètres à tester ({lp}).")
""" Vérification des paramètres de l'algorithme génétique. """
od_args = Dictionary()
l_pop = list()
for i, param_key in enumerate(self.odl_params):
l_p = [round(param, 2) for param in self.odl_params.read([param_key, 'l_vals'])]
l_pop.append(len(l_p))
print(param_key, ':', l_p)
if l_args:
key_args = param_key.split('__')
j = int(l_args[i])
param = self.odl_params.read([param_key, 'l_vals'])[j]
""" Paramètre dépendant. """
l_add = self.odl_params.read([param_key, 'add'])
depend = ''
if l_add:
l_keys = l_add[1].split('__')
depend = f'{od_args.read(l_keys)} * {round(param, 2)} = '
param = l_add[0](od_args, [param] + l_add[1:])
od_args.write(key_args, param)
print(f"\tSelectionné -> {param_key}[{j}] = {depend}{round(param, 2)}")
if len(l_args) == 0:
n_prod, s_prod = 1, ''
for pop in l_pop:
s_prod += f' * {pop}'
n_prod *= pop
print(f"Espace de recherche : {s_prod[3:]} = {n_prod} chromosomes.")
def fitness_function(self, chromosome):
""" Méthode starter pour l'algorithme génétique. Appelée à chaque génération avec un chromosome différent. """
""" Création de od_args, exemple :
chromosome = [3, 1] # <-- chromosome aléatoire, généré par le modèle.
self.odl_params =
ma1__length: ----------------------------------------------- <class 'dict'>
l_vals: [60, 65, 70, 75, 80, 85, 90, 95, 100] ---------- <class 'list'>
ma2__length: ----------------------------------------------- <class 'dict'>
l_vals: [1.01, 1.21, 1.41, 1.61, 1.81, 2.01, 2.21, 2.41, 2.61, 2.81] <class 'list'>
add: ['ma1__length'] ----------------------------------- <class 'list'>
A partir de {chomosome} et de {self.odl_params}, on crée {od_args} :
od_args =
ma1: ------------------------------------------------------- <class 'dict'>
length: 40 --------------------------------------------- <class 'int'>
ma2: ------------------------------------------------------- <class 'dict'>
length: 150 -------------------------------------------- <class 'int'>
b_ga: True ------------------------------------------------- <class 'bool'>
cost: ------------------------------------------------------ <class 'dict'>
tp: 30 ------------------------------------------------- <class 'int'>
sl: 200 ------------------------------------------------ <class 'int'>
skip: 4 ------------------------------------------------ <class 'int'>
"""
od_args = Dictionary()
for i, key in enumerate(self.odl_params.keys()): # ex : key = 'ma1.params.length'
key_args = key.split('.')
l_vals = self.odl_params.read([key, 'l_vals'])
l_add = self.odl_params.read([key, 'add'], [])
param = l_vals[int(chromosome[i])]
if l_add:
""" Traitement conditionnel -> l_add = [fonction, arg{0}, arg{1}, ..., arg{n} ...] """
l_params = list()
for p in l_add[1:]:
l_params.append(p.replace('.', '.params.'))
param = l_add[0](od_args, [param] + l_params) # l_add[0] est une fonction.
od_args.write(key_args, round(param, 2))
self.chromosome = chromosome
self.args = copy.deepcopy(dict(od_args))
""" Création des indicateurs nécessaires, paramétrés. La fonction de coût est appelée dans add_indics() """
od_args.write('b_ga', True)
od_args.fusion(self.central_args('indics'))
cost = self.add_indics(od_args)
if cost is None:
raise SystemExit("Erreur :\nLa méthode add_indics() de la classe dérivée doit retourner une valeur.")
return cost
def cost(self, **d_params):
""" |_ Colonnes de self.np_trades : 0 : gains (>0 ou <0) clôturés, en pips (trades fermés). cumulé
1 : gains en cours d'évolution, en pips (trades ouverts). cumulé
2 : gains total (clôturés + en évolution), en pips. cumulé
3 : gains (>0 ou <0) clôturés, en monnaie (€ ou $). cumulé
4 : gains en cours d'évolution (€ ou $). cumulé
5 : gains total (clôturés + en évolution) (€ ou $). cumulé
6 : Flag d'ouverture (-1/+1), effacé à la fermeture.
7 : marks d'ouverture (-1/+1), permanent.
8 : indx de fermeture
9 : gain du trade, en pips
10 : gain du trade, en monnaie (€ ou $) """
if d_params.get('b_ga', False):
""" Ini. """
close_index_min = 10**8
""" Parcours des trades ouverts. """
l_open_indx = np.where(self.np_trades[:, 7] != 0)[0] # Liste des index d'ouverture de trades.
# |_ Cette liste d'index {l_open_indx[]} pointe la grande table numpy {self.np_trades}.
for open_indx in l_open_indx:
l_trade = self.np_trades[open_indx] # 11 colonnes : [0., 0., 0., 0., 0., 0., 0., 1., 1.054e+03, 0., 0.]
close_indx = int(l_trade[8]) # . |_ ici, 1054
close_index_min = min(close_index_min, close_indx)
self.np_trades[:, 9] = np.nancumsum(self.np_trades[:, 9])
gains_pips = float(self.np_trades[-1, 9])
g_max, drawdown = -10**8, -10**8
self.np_trades[:, 0] = np.nancumsum(self.np_trades[:, 0])
self.np_trades[:, 2] = self.np_trades[:, 0] + self.np_trades[:, 1]
for g in self.np_trades[:, 2]:
""" Formule : https://arya.xyz/blog/enseignement/drawdown """
g_max = max(g_max, g)
drawdown = max(drawdown, g_max - g)
""" Choix minimize : On cherche à obtenir la plus petite valeur possible pour {minimize}.
Voici différentes possibilités :
- Maximisation des gains (pips) : minimize = -gains_pips
- Minimisation du drawdown : minimize = drawdown
- Maximisation du nb de trades : minimize = -nb_trades
- Mix des 3 : minimize = ((coef_drawdown * drawdown) - gains_pips) / ratio
- Etc. """
""" Maximise seulement les gains. *********************************** """
# minimize = -gains_pips
""" Minimise seulement le drawdawn. ********************************* """
# minimize = drawdown
""" Mix des 3. ****************************************************** """
nb_trades = len(l_open_indx)
if nb_trades == 0:
minimize = 10**7
else:
coef_drawdown = 3 # Importance donnée au faible drawdown par rapport au fort gain.
ratio = 10 ** (nb_trades / len(self.np_datas)) # ratio de 1 à 10 (10**0 à 10**1)
minimize = ((coef_drawdown * drawdown) - gains_pips) / ratio
""" Fin choix minimize ***************************************************************** """
if minimize == 0:
minimize = 10**7
if minimize < self.best:
self.best = minimize
print(f"\nMinimisation de l'algo génétique :\t{round(minimize, 3)}"
f"\nChromosome :\t\t\t\t\t\t{self.chromosome}"
f"\nGains :\t\t\t\t\t\t\t\t{round(gains_pips, 2)} pips"
f"\nDrawdown :\t\t\t\t\t\t\t{round(float(drawdown), 2)} pips"
f"\nArguments :\t\t\t\t\t\t\t{self.args}")
""" L'algorithme génétique tend à minimiser le résultat. """
return minimize
else:
return super().cost(**d_params)
def ga(self, b_single_loop=False):
""" Code appelant : self.__init__() ci-dessus, mode 3 puis 4. """
if b_single_loop:
""" MAP ***** MAP : mode 3 ***** mode 3 ***** mode 3 ***** mode 3 ***** mode 3 ***** mode 3 """
""" Pendant l'algorithme génétique, la fitness_function() est appelée un grand nombre de fois.
- Ici, une seule fois, avec un chromosome tiré au hasard, afin de permettre sa mise au point. """
""" Simulation de création d'un chromosome par le modèle. Tirage au sort des index des paramètres. """
chromosome = [int(np.random.choice(np.linspace(*bounds, bounds[1]))) for bounds in self.l_boundaries]
""" Vérification du chromosome tiré au hasard. """
print()
[print(f"{param_key}[{chromosome[i]}] -> {d_vals['l_vals'][chromosome[i]]}")
for i, (param_key, d_vals) in enumerate(self.odl_params.items())]
""" Résultat. """
self.fitness_function(chromosome) # Chromosome aléatoire.
else:
""" Conditions réelles : mode 4 ***** mode 4 ***** mode 4 ***** mode 4 ***** mode 4 ***** mode 4 ***** """
""" Création du modèle de GA. """
model = self.get_model()
""" Exécution du modèle de GA. """
model.run()
def get_model(self):
""" Création du modèle de GA. """
od_ga = self.central_args('ga')
l_bounds = self.l_boundaries
dim = len(l_bounds)
d_algo = od_ga.read('algo')
return ga(
function=self.fitness_function,
dimension=dim,
variable_type='int',
variable_boundaries=l_bounds,
function_timeout=od_ga.read('function_timeout', 10),
algorithm_parameters={
'max_num_iteration': d_algo.get('max_num_iteration', 200),
'population_size': d_algo.get('population_size', 100),
'mutation_probability': d_algo.get('mutation_probability', .1),
'elit_ratio': d_algo.get('elit_ratio', .01),
'crossover_probability': d_algo.get('crossover_probability', .5),
'parents_portion': d_algo.get('parents_portion', .3),
'crossover_type': d_algo.get('crossover_type', 'uniform'),
'mutation_type': d_algo.get('mutation_type', 'uniform_by_center'),
'selection_type': d_algo.get('selection_type', 'roulette'),
'max_iteration_without_improv': d_algo.get('xxxx', None)
})
BackTest
, implémentée lors du tuto 'Partie 1'./trading/strategies/backtest.py
:
""" Version 2022-04-02 """
# Imports externes
import numpy as np
import pandas as pd
# Imports internes
from functions.utils import DateTime, Utils
# noinspection PyUnresolvedReferences, PyArgumentList
class BackTest:
def __init__(self):
self.l_columns = None
self.np_datas = None
self.nb_rows = 0
""" Statistiques. """
self.np_trades = None
self.close_cond = None # Fonction : condition de fermeture d'un trade.
self.df_marks = None # dataframe
self.d_cost = None # dictionary
def fitness(self, **d_args):
""" Code appelant : UI d'affichage. """
""" Élimination des colonnes Open, High, Low, ... ainsi que celles pour le ga (commençant par '_'). """
self.l_columns = ['Close'] + [col for col in list(self.df_pilot.columns[7:]) if not col.startswith('_')]
self.np_datas = self.df_pilot[self.l_columns].values
self.d_cost = d_args.get('cost', {}) # Arguments pour la fonction de coût.
self.nb_rows = self.np_datas.shape[0]
self.np_trades = np.zeros((self.nb_rows, 11), dtype=np.float32)
""" |_ Colonnes de self.np_trades : 0 : gains (>0 ou <0) clôturés, en pips (trades fermés). cumulé
1 : gains en cours d'évolution, en pips (trades ouverts). cumulé
2 : gains total (clôturés + en évolution), en pips. cumulé
3 : gains (>0 ou <0) clôturés, en monnaie (€ ou $). cumulé
4 : gains en cours d'évolution (€ ou $). cumulé
5 : gains total (clôturés + en évolution) (€ ou $). cumulé
6 : Flag d'ouverture (-1/+1), effacé à la fermeture.
7 : marks d'ouverture (-1/+1), permanent.
8 : indx de fermeture
9 : gain du trade, en pips
10 : gain du trade, en monnaie (€ ou $) """
indx = 0
while np.isnan(np.sum(self.np_datas[indx])):
""" Recherche de l'index de la 1ère ligne où aucune valeur n'est nan. """
indx += 1
if indx >= self.nb_rows - 1:
break
d_args['indx'] = indx # <-- Ajout de cette information dans les arguments.
return self.cost(**d_args)
def cost(self, **d_params):
"""
@param d_params: paramètres spécifiques à cette stratégie : stop-loss, take-profit, seuils divers, etc.
@return: dépend du code appelant -> (Calcul du gain + Marqueurs) ou (Process d'algorithme génétique).
- Si 'Gain + Marqueurs' : tuple de 2 DataFrames : (df_gains, df_marks).
- Si 'GA' : -1 * Gain. Le -1 s'explique par le fait que le ga cherche à minimiser le résultat.
"""
""" Calculs. """
spread = self.d_cost.get('spread', 2)
l_open_indx = np.where(self.np_trades[:, 7] != 0)[0] # Liste des index d'ouverture de trades.
ll_marks = list()
balance = self.d_cost.get('capital', 1000) # Capital initial.
self.np_trades[0, 3] = balance
risk = self.d_cost.get('risk', 1)
for open_indx in l_open_indx:
l_trade = self.np_trades[open_indx]
close_indx = int(l_trade[8])
if close_indx < open_indx:
continue
coef = l_trade[7] # +1 ou -1 <-- achat ou vente.
price = self.np_datas[open_indx, 0]
gains = (self.np_datas[close_indx, 0] - price) * coef - spread
np_gains = (self.np_datas[open_indx: close_indx, 0] - price) * coef
""" Currencies (€ ou $) clôturés. """
curr = gains * balance * risk * self.pips # Currencies (€ ou $).
self.np_trades[open_indx, 10] = curr
self.np_trades[close_indx, 3] += curr
balance += curr
""" Currencies (€ ou $) ouverts. """
self.np_trades[open_indx: close_indx, 4] += np_gains * balance * risk * self.pips
""" Marqueurs. """
op = 'achat' if l_trade[7] == 1 else 'vente'
ll_marks.append([open_indx, price, f'Ouvre {op}'])
close_indx = int(l_trade[8])
if close_indx > 0:
ll_marks.append([close_indx, self.np_datas[close_indx, 0], f'Ferme {op}'])
self.np_trades[:, 0] = np.nancumsum(self.np_trades[:, 0])
self.np_trades[:, 2] = self.np_trades[:, 0] + self.np_trades[:, 1]
self.np_trades[:, 3] = np.cumsum(self.np_trades[:, 3])
self.np_trades[:, 5] = self.np_trades[:, 3] + self.np_trades[:, 4]
""" Anti-chevauchement des marqueurs. """
ll_marks.sort()
i, indx_ante = 4, 0
for l_mark in ll_marks:
if l_mark[0] == indx_ante:
l_mark[1] += i
else:
indx_ante = l_mark[0]
self.df_marks = pd.DataFrame(data=ll_marks, columns=['Index', 'Close', 'Type'])
return pd.DataFrame(data=self.np_trades[:, :6],
columns=['Close_p', 'Open_p', 'All_p', 'Close_c', 'Open_c', 'All_c'])
def bt_stats(self):
""" bt_stats = Statistiques des backtests. """
dt = DateTime()
stamp_from = self.df_pilot.iloc[0, 0]
stamp_to = self.df_pilot.iloc[-1, 0]
nb_days = max(1, round((stamp_to - stamp_from) / 86_400))
_from = dt.get_dtstr_from_dtstamp(stamp_from, dt_format='%d/%m/%Y')
_to = dt.get_dtstr_from_dtstamp(stamp_to, dt_format='%d/%m/%Y')
capital_ini = round(float(self.np_trades[0, 5]), 2)
capital_min = round(float(np.min(self.np_trades[:, 5])), 2)
crashed = '← crashed !' if capital_min < 0 else ''
capital_max = round(float(np.max(self.np_trades[:, 5])), 2)
capital_final = round(float(self.np_trades[-1, 5]), 2)
perf = round(100 * (capital_final - capital_ini) / capital_ini, 2)
nb_buy = np.sum(self.np_trades[:, 7] == 1)
nb_sell = np.sum(self.np_trades[:, 7] == -1)
nb_winner = np.sum(self.np_trades[:, 9] > 0)
nb_loser = np.sum(self.np_trades[:, 9] < 0)
mask = self.np_trades[:, 10] != 0
l_trades = self.np_trades[:, 10][mask]
win_consec = lose_consec = win_consec_max = lose_consec_max = 0
score_win = score_lose = 0
if len(l_trades) > 0:
score_ante = l_trades[0]
for score in l_trades:
if score > 0:
score_win += score
else:
score_lose += score
if score > 0 and score_ante > 0:
win_consec += 1
win_consec_max = max(win_consec_max, win_consec)
elif score < 0 and score_ante < 0:
lose_consec += 1
lose_consec_max = max(lose_consec_max, lose_consec)
else:
win_consec = lose_consec = 0
score_ante = score
if nb_winner > 0:
score_win /= nb_winner
if nb_loser > 0:
score_lose /= nb_loser
pips_max_indx = np.argmax(self.np_trades[:, 9])
pips_max = round(float(self.np_trades[pips_max_indx, 9]), 2)
stamp = self.df_pilot.iloc[pips_max_indx, 0]
pips_max_dt = dt.get_dtstr_from_dtstamp(stamp, dt_format='%d/%m/%Y')
pips_max_indexes = f'index de {pips_max_indx} à {int(self.np_trades[pips_max_indx, 8])}'
pips_min_indx = np.argmin(self.np_trades[:, 9])
pips_min = round(float(self.np_trades[pips_min_indx, 9]), 2)
stamp = self.df_pilot.iloc[pips_min_indx, 0]
pips_min_dt = dt.get_dtstr_from_dtstamp(stamp, dt_format='%d/%m/%Y')
pips_min_indexes = f'index de {pips_min_indx} à {int(self.np_trades[pips_min_indx, 8])}'
curr_max_indx = np.argmax(self.np_trades[:, 10])
curr_max = round(float(self.np_trades[curr_max_indx, 10]), 2)
stamp = self.df_pilot.iloc[curr_max_indx, 0]
curr_max_dt = dt.get_dtstr_from_dtstamp(stamp, dt_format='%d/%m/%Y')
curr_max_indexes = f'index de {curr_max_indx} à {int(self.np_trades[curr_max_indx, 8])}'
curr_min_indx = np.argmin(self.np_trades[:, 10])
curr_min = round(float(self.np_trades[curr_min_indx, 10]), 2)
stamp = self.df_pilot.iloc[curr_min_indx, 0]
curr_min_dt = dt.get_dtstr_from_dtstamp(stamp, dt_format='%d/%m/%Y')
curr_min_indexes = f'index de {curr_min_indx} à {int(self.np_trades[curr_min_indx, 8])}'
g_max = -10**8
drawdown = 0
curr_dwn_indx = 0
for i, g in enumerate(self.np_trades[:, 2]):
g_max = max(g_max, g)
if (g_max - g) > drawdown:
curr_dwn_indx = i
drawdown = g_max - g
stamp = self.df_pilot.iloc[curr_dwn_indx, 0]
curr_dwn_dt = dt.get_dtstr_from_dtstamp(stamp, dt_format='%d/%m/%Y')
""" Contexte. """
total_pips = round(float(np.sum(self.np_trades[:, 9])), 2)
od_pilot = self.central_args('pilot')
instrument = od_pilot['instrument']
table = od_pilot.read('table')
pc_from = od_pilot.read(['test', 'pc_from'])
pc_to = od_pilot.read(['test', 'pc_to'])
nb_rows = self.df_pilot.shape[0]
typ = f'Renko {table}' if isinstance(table, int) else f'Candles-{table}' if isinstance(table, str) else 'Ticks'
if pc_from is None:
points = f"{nb_rows} points jusqu'à {pc_to}% de la table en base de données."
elif pc_to is None:
points = f"{nb_rows} points à partir de {pc_from}% de la table en base de données."
else:
points = f"{nb_rows} points entre {pc_from}% et {pc_to}% de la table en base de données."
result = "Statistiques :" \
"\n=============" \
f"\nContexte : {instrument} - {typ} - {points}" \
f"\nPériode : {_from} -> {_to}" \
f"\nNb jours: {nb_days}" \
f"\nGain journalier: {round((capital_final - capital_ini) / nb_days, 2)} €." \
f"\n\nCapital initial: {capital_ini} €" \
f"\nCapital min: {capital_min} € {crashed}" \
f"\nCapital max: {capital_max} €" \
f"\nCapital final: {capital_final} €" \
f"\nPerformance: {perf} %" \
f"\nGain en pips: {total_pips} pips" \
f"\n\nNb trades à l'achat: {nb_buy}" \
f"\nNb trades à la vente: {nb_sell}" \
f"\nNb trades gagnants: {nb_winner}" \
f"\nNb trades perdants: {nb_loser}" \
f"\nNb max gagnants consécutifs: {win_consec_max}" \
f"\nNb max perdants consécutifs: {lose_consec_max}" \
f"\n\nGain moyen trades gagnants: {round(score_win, 2)} €" \
f"\nGain moyen trades perdants: {round(score_lose, 2)} €" \
f"\nMeilleur trade (pips): {pips_max} pips, le {pips_max_dt} ({pips_max_indexes})" \
f"\nMeilleur trade (monnaie): {curr_max} €, le {curr_max_dt} ({curr_max_indexes})" \
f"\nPire trade (pips): {pips_min} pips, le {pips_min_dt} ({pips_min_indexes})" \
f"\nPire trade (monnaie): {curr_min} €, le {curr_min_dt} ({curr_min_indexes})" \
f"\nPerte max (drawdown): {round(drawdown)} pips le {curr_dwn_dt} (index {curr_dwn_indx})" \
"\n-------------------------------------------------------------------------------"
print(result)
def closures(self, close_indx):
""" Fermetures, options possibles :
- Modifier le code pour permettre des fermetures partielles.
|_ https://www.google.fr/search?q=trading+fermetures+partielles
- Modifier le code dans {close_cond()} de la classe dérivée pour permettre les stops suiveurs.
|_ https://www.google.fr/search?q=trading+stop+suiveur
"""
spread = self.d_cost.get('spread', 2)
b_end = close_indx >= self.nb_rows - 1 # Flag de fermeture de tous les résiduels.
for open_indx in np.where(self.np_trades[:, 6] != 0)[0]: # Liste des indx des trades ouverts (flags col 6).
""" Liste des indx trades ouverts. Le flag6 (col 6) est obligatoirement +1 ou -1. """
if b_end or self.close_cond(open_indx): # Fonction dans la classe dérivée.
self.np_trades[open_indx, 6] = 0 # RAZ flag.
self.np_trades[open_indx, 8] = close_indx # Index de fermeture
coef = self.np_trades[open_indx, 7]
price = self.np_datas[open_indx, 0]
gains = (self.np_datas[close_indx, 0] - price) * coef - spread
np_gains = (self.np_datas[open_indx: close_indx, 0] - price) * coef
self.np_trades[open_indx, 9] = gains # 9 : gain du trade, en pips
self.np_trades[close_indx, 0] += gains # 0 : gains (>0 ou <0) clôturés, en pips.
self.np_trades[open_indx: close_indx, 1] += np_gains # 1 : gains en cours d'évolution, en pips.
def opening(self, op, indx):
""" 2 flags d'ouverture (-1/+1) : col 6 -> effacé à la fermeture, col 7 -> permanent. """
self.np_trades[indx, 6: 8] = 1 if op == 'buy' else -1 # 1='buy', -1='sell'.
""" Voir colonnes de self.np_trades dans self.fitness() """
def cond(self, txt, indx):
""" Exemple : txt = 'AO-DRV cu 0' signifie : L'indicateur AO-DRV croise vers le haut (cross-up) la valeur 0.
Opérateurs disponibles : /, \\, --, v, ^, cu, cd, ==, !=, >, >=, <, <= """
def iscolumn(column_name):
if self.txt_is_number(column_name):
return True
if column_name in self.l_columns:
return True
Utils.printc(f"Classe GeneticAlgorithm, méthode 'cond()' :\nLa colonne '{column_name}' n'existe pas.")
return False
def col(col_name):
return self.l_columns.index(col_name)
l_c = txt.split() # Séparateur = ' ' (espace).
left = l_c[0] # str
op = l_c[1] # str
if not iscolumn(left):
return False
if len(l_c) == 2:
""" /, \\, --, v, ^ """
col = col(left)
if op in ['/', '\\', '--'] and indx > 0:
""" Pente positive (/), négative (\\) ou nulle(--). """
sign_1, sign_0 = self.np_datas[indx - 1, col], self.np_datas[indx, col]
return sign_0 > sign_1 if op == '/' else sign_0 == sign_1 if op == '--' else sign_0 < sign_1
elif op in ['v', '^'] and indx > 1:
""" Les 3 points précédents forment un creux (v) ou un sommet (^). """
sign_2, sign_1, sign_0 = (self.np_datas[indx - 2, col], self.np_datas[indx - 1, col],
self.np_datas[indx, col])
return sign_1 < sign_2 and sign_1 < sign_0 if op == 'v' else sign_1 > sign_2 and sign_1 > sign_0
pass
elif len(l_c) == 3:
""" cu, cd, ==, !=, >, >=, <, <= """
right = l_c[2]
if not iscolumn(right):
return False
if op in ['cu', 'cd']:
""" Cross up & cross down : Croisements à la hausse ou à la baisse. """
col_moved = left if self.txt_is_number(left) else col(left) # Valeur en str ou N° col.
col_ref = right if self.txt_is_number(right) else col(right)
way = 'up' if op == 'cu' else 'down'
return self.cross(way, indx, col_moved, col_ref)
elif op in ['==', '!=', '>', '>=', '<', '<=']:
""" Comparaisons. """
left = float(left) if self.txt_is_number(left) else self.np_datas[indx, col(left)]
right = float(right) if self.txt_is_number(right) else self.np_datas[indx, col(right)]
if op == '==':
return left == right
elif op == '!=':
return left != right
elif op == '>':
return left > right
elif op == '>=':
return left >= right
elif op == '<':
return left < right
else: # '<='
return left <= right
return False
@staticmethod
def txt_is_number(text):
""" {text} est une chaîne ET représente un nombre (int ou float). """
if isinstance(text, str):
try:
float(text)
return True # Est une chaîne ET représente un nombre => True.
except (Exception,):
return False # Est une chaîne mais ne représente pas un nombre => False.
else:
return False # N'est pas une chaîne => False.
def cross(self, way, indx, col_moved, col_ref):
if self.txt_is_number(col_ref):
ref_now = ref_ante = float(col_ref)
else:
ref_now, ref_ante = self.np_datas[indx, col_ref], self.np_datas[indx - 1, col_ref]
if self.txt_is_number(col_moved):
mov_now = mov_ante = int(col_moved)
else:
mov_now, mov_ante = self.np_datas[indx, col_moved], self.np_datas[indx - 1, col_moved]
if ref_now == mov_now:
return False
i = 2
while ref_ante == mov_ante:
ref_ante = float(col_ref) if self.txt_is_number(col_ref) else self.np_datas[indx - i, col_ref]
mov_ante = float(col_moved) if self.txt_is_number(col_moved) else self.np_datas[indx - i, col_moved]
i += 1
b_cross = (ref_now - mov_now) * (ref_ante - mov_ante) < 0
return (mov_now > ref_now) == (way == 'up') if b_cross else False
/show/show_geek.py
:
""" Version 2022-04-02 """
# Imports externes.
import numpy as np
import seaborn as sns
from matplotlib import pyplot as plt
from matplotlib.ticker import FormatStrFormatter
import matplotlib.animation
import copy
# Imports internes.
from functions.utils import Dictionary, Utils
from trading.historiques.ctrl_histos import CtrlHistos
# noinspection PyUnusedLocal,PyUnresolvedReferences
class ShowGeek:
def __init__(self, mode):
CtrlHistos.custom_ta()
super().__init__(mode=mode) # Appelle Genetic.__init__(), 2ème classe héritée de la classe dérivée.
self.fig = plt.figure(1)
self.mode = mode
self.df_pilot = None
self.df_scats = None
self.nb_datas = 0
self.l_ax = None
self.pips = 0
""" Animation. """
self.b_anim = True
self.abscissa_size = 0
self.pointer = -1
self.b_paused = False
self.b_reverse = False
self.magn = 1 # Appui sur les flèches : Le pas d'avancement est : 1, 10 ou 100.
""" Signal pilote. """
self.od = Dictionary(self.set_heights())
self.get_df_pilot()
""" Mise au point : Exécuter les modes 1 à 5 dans l'ordre. """
if self.mode == 1:
""" Mode 'normal', selon votre imagination : Création, exécution et affichage de la stratégie. """
self.show_ui()
elif self.mode == 2:
""" Affichage des listes de paramètres à tester par l'algorithme génétique. """
self.control_bounds()
elif self.mode == 3:
""" Exécute une seule boucle de l'algorithme génétique pour la mise au point du code. """
self.ga(b_single_loop=True)
elif self.mode == 4:
""" Exécute l'algorithme génétique complet pour obtenir les meilleurs paramètres. """
self.ga() # Les indices fournis à la fin du traitement sont utilisées ci après (mode 5).
else: # 5 - Optionnel
""" Affichage des paramètres sélectionnés par l'algorithme génétique. """
# self.control_bounds(i1, i2, ..., i{n}) # <-- Remplacer les i par les indices fournis au mode 4.
def get_args(self):
args = self.central_args('ui')
return dict( # Ces valeurs seront à la racine du super-dictionnaire {self.od}.
geometry=args.read('geometry', (100, 40, 1000, 700)), # x, y, w, h.
margins=args.read('margins', (6, 8, 10, 8)), # Marges : haut, droite, bas, gauche.
abscissa_size=args.read('abscissa_size', 600), # Nb de points affichés en abscisse.
window_title=args.read('window_title', "Modèle simple"), # Titre de la fenêtre.
figure_title=args.read('figure_title', "Modèle simple - Ne pas modifier"), # Titre des graphiques.
leader_lines=args.read('leader_lines', True), # Lignes hortogonales de repère, suivi de la souris.
subplots=args.read('subplots', dict( # Noms et hauteurs (en %) des graphiques modifiables.
Principal=65, # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
Milieu='', # '' : Si chaîne vide => Les hauteurs seront automatiquement réparties.
Bas='',
)),
show_pilot=args.read('show_pilot', True),
show_volume=args.read('show_volume', False),
show_linked_label=args.read('show_linked_label', True),
best_zones=dict(
show=args.read(['best_zones', 'show'], False),
gap=args.read(['best_zones', 'gap'], 20), # Nombre de pips take-profit ou stop-loss.
bandwidth=args.read(['best_zones', 'bandwidth'], 80), # Pourcentage : de 0 à 100.
up=args.read(['best_zones', 'up'], True), # Affichage des palliers haut.
down=args.read(['best_zones', 'down'], True), # Affichage des palliers bas.
scatters=args.read(['best_zones', 'scatters'], True), # Affichage des optimums sous forme de ronds.
confirm=args.read(['best_zones', 'confirm'], True),
zig_zag=args.read(['best_zones', 'zig_zag'], True), # Affichage de la courbe zig-zag.
colors=args.read(['best_zones', 'colors'], ('#ffff0030', '#ff00ff10')), # Coloriage des ouvertures.
),
animation=args.get('animation', True),
interval=args.get('interval', .1),
# Paramètres généraux supplémentaires ici ...
# Paramètres seaborn : https://www.python-simple.com/python-seaborn/seaborn-general.php
)
def show_ui(self):
""" ****************** Algorithme de construction ****************** """
""" Paramètres avant la création des graphiques (axes). """
self.pre_params()
""" Ajout des indicateurs dans la dataframe {df_pilot} <-- Colonnes ajoutées à {df_pilot}. """
self.add_indics()
""" Ajout du signal pilote. """
self.show_pilot()
""" Distribution des signaux (des colonnes) : un dataframe par axe dans {self.l_ax}. """
self.distrib()
""" Création des graphiques (axes). """
self.build_axis()
""" Paramètres après la création des graphiques (axes). """
self.post_params()
""" Affichage animé. """
self.show()
def animate(self, _=''):
""" Appelé dans la boucle matplotlib.FuncAnimation depuis self.show(). """
def get_line(_line_name):
_from, _to = x[0], x[0] + len(x)
return self.df_pilot[_line_name][_from: _to]
if self.b_paused:
return
""" Gestion du pointeur. """
self.set_pointer()
""" Abscisse commune : liste de {abscissa_size} valeurs depuis {pointer}. """
x = list(range(self.pointer, self.pointer + self.abscissa_size))
""" Parcours des graphiques (axes). """
for axis_name in self.od.keys():
if axis_name == 'figure':
continue
o_ax = self.od.read([axis_name, 'o_ax'])
if o_ax is None:
continue
df = self.od.read([axis_name, 'df'])
y_min, y_max = 10 ** 9, -10 ** 9
if not (df is None or df.empty):
""" Parcours des courbes (lines). """
for o_line in self.get_axis_lines(axis_name):
""" Plusieurs courbes (lines) dans un graphique (o_ax). """
line_name = o_line.get_label()
serie = df[line_name]
y = serie[self.pointer: self.pointer + self.abscissa_size]
""" Égalisation des tailles de x et y. """
len_min = min(len(x), len(y))
x, y = copy.copy(x[:len_min]), copy.copy(y[:len_min])
""" Hook pour l'injection de paramètres dans cette courbe. """
d_attr = self.hook_line_anim(axis_name, line_name, x, y, o_ax, df, o_line)
if d_attr is None:
d_attr = {}
""" Injection des datas à afficher. """
if d_attr.get('visible', True):
o_line.set_ydata(y)
o_line.set_xdata(x)
if line_name == 'Close':
o_line.set_color('0.6')
if not np.isnan(y).all():
y_min = min(y_min, np.nanmin(y))
y_max = max(y_max, np.nanmax(y))
""" Calcul des limites des ordonnées (y). """
if y_max < y_min:
y_min, y_max = -10, 10
y_padding = max(0, (y_max - y_min) * 0.05) # Marges top et bottom dans les axes (5%)
y_min, y_max = (round(y_min - y_padding, 6), round(y_max + y_padding, 6)) if y_max >= y_min else (-10, 10)
y_min = max(y_min, -10 ** 9)
y_max = min(y_max, 10 ** 9)
""" Limites x et y. """
o_ax.set_xlim(x[0], x[-1])
if y_min == y_max:
y_min = y_max = None
o_ax.set_ylim(y_min, y_max)
self.hook_axis_anim(axis_name, x, o_ax, df, y_min, y_max, get_line)
if not self.b_anim:
plt.draw()
def set_pointer(self):
if self.b_reverse:
self.pointer -= self.magn
if self.pointer < 0:
self.pointer = self.nb_datas - self.abscissa_size
else:
self.pointer += self.magn
if self.pointer > self.nb_datas - self.abscissa_size:
self.pointer = 0
def show_pilot(self):
""" Affichage conditionnel du signal-pilote. """
self.od.write(['Principal', 'signals', 'Close', 'visible'], self.od.read(['figure', 'show_pilot'], False))
self.add_df('Principal', ['Close'])
def pre_params(self):
""" Fenêtre : taille, position, titre. """
mgr = plt.get_current_fig_manager()
mgr.set_window_title(self.od.read(['figure', 'window_title'], 'Stratégie'))
win = mgr.window
win.setGeometry(*self.od.read(['figure', 'geometry']))
""" Paramètres seaborn : https://www.python-simple.com/python-seaborn/seaborn-general.php """
sns.set_context(self.od.read(['figure', 'seaborn_context'], 'paper')) # paper, notebook, talk, poster
sns.set_style(self.od.read(['figure', 'seaborn_style'], 'darkgrid')) # white, dark, whitegrid, darkgrid, ticks
""" Ini attributs. """
self.abscissa_size = self.od.read(['figure', 'abscissa_size'], 600)
self.b_anim = self.od.read(['figure', 'animation'], False)
""" Affichage des best-zones dans le graphique (subplot) contenant le signal 'Close'. """
subplot_name = self.od.read(['figure', 'best_zones', 'subplot'])
if self.od.read(['figure', 'best_zones', 'show'], False) and subplot_name:
""" Appel de l'indicateur de trend {best_zones}. """
gap = self.od.read(['figure', 'best_zones', 'gap'], 24)
bandwidth = self.od.read(['figure', 'best_zones', 'bandwidth'], 80)
self.df_pilot[['Up', 'Down', 'Peak']], self.df_scats = self.df_pilot.ta.best_zones(gap=gap,
bandwidth=bandwidth)
""" Affichage des palliers haut et bas. """
l_columns = list()
if self.od.read(['figure', 'best_zones', 'up'], False):
l_columns.append('Up')
if self.od.read(['figure', 'best_zones', 'down'], False):
l_columns.append('Down')
self.add_df(subplot_name, l_columns)
def post_params(self):
""" Recherche des paramètres dans le dictionnaire. """
""" Titre des graphiques. """
axis_title = self.od.read(['figure', 'figure_title'])
if isinstance(axis_title, str) and len(axis_title) > 0:
o_ax = self.get_first_axis()
if self.is_axis(o_ax):
o_ax.set_title(axis_title)
""" y axis : position et visibilité des graduations (ticks). """
for l_keys in self.od.key_list():
if l_keys[-1] != 'y_ticks':
continue
position = self.od.read(l_keys)
o_ax = self.od.read([l_keys[0], 'o_ax'])
if not self.is_axis(o_ax):
continue
if position is False:
o_ax.axes.set_yticks([]) # Suppression (ticks + ticklabels + grille).
pass
else:
o_ax.yaxis.set_ticks_position(position) # left, right
""" Suppression de la grille. """
for l_keys in self.od.key_list():
if l_keys[-1] != 'grid':
continue
if self.od.read(l_keys) is False:
o_ax = self.od.read([l_keys[0], 'o_ax'])
o_ax.grid(False) # Suppression de la grille.
""" x axis : visibilité des graduations (ticks) """
for l_keys in self.od.key_list():
if l_keys[-1] != 'x_ticks':
continue
o_ax = self.od.read([l_keys[0], 'o_ax'])
if not self.is_axis(o_ax):
continue
if self.od.read(l_keys) is False:
o_ax.tick_params(axis='x', colors='#fff0') # '#fff0' = invisible
""" Légendes, lignes horizontales et verticales, coloriages. """
od_display = self.central_args('display')
for axis_name, params in od_display.items():
o_ax = self.od.read([axis_name, 'o_ax'])
if self.get_axis_lines(axis_name): # Si le graphique n'a aucune ligne, on ne fait rien.
""" Légendes. """
legend = params.get('legend')
pos = ['auto', 'rh', 'lh', 'lb', 'rb', 'rm', 'lm', 'rm', 'cb', 'ch', 'cm']
if isinstance(legend, bool):
o_ax.legend().set_visible(legend) # Visibilité de la légende.
elif legend in pos:
o_ax.legend(loc=pos.index(legend)).set_title(axis_name)
else:
o_ax.legend().set_title(axis_name)
""" Lignes horizontales. """
l_lines_h = params.get('lines_H', [])
if isinstance(l_lines_h, dict):
l_lines_h = [l_lines_h]
for d_line in l_lines_h:
self.trace_hline(axis_name, **d_line)
""" Lignes verticales. """
l_lines_v = params.get('lines_V', [])
if isinstance(l_lines_v, dict):
l_lines_v = [l_lines_v]
for d_line in l_lines_v:
self.trace_vline(axis_name, **d_line)
""" Coloriage inter-zônes : statiques et dynamiques. """
ld_color_between = params.get('color_between', []) # Liste de dictionnaires.
l_args_dyn, l_args_stat = list(), list()
for d_between in ld_color_between:
y1, y2 = d_between.get('y1'), d_between.get('y2')
if isinstance(y1, str) or isinstance(y2, str):
""" Coloriage dynamique, rafraîssement (à chaque affichage), délégué au hook axis. """
color_up, color_down = d_between.get('color_up'), d_between.get('color_down')
if color_up is not None:
l_args_dyn.append(dict(y1=y1, y2=y2, where='>', fc=color_up, interpolate=True))
if color_down is not None:
l_args_dyn.append(dict(y1=y1, y2=y2, where='<', fc=color_down, interpolate=True))
elif isinstance(y1, (int, float)) and isinstance(y2, (int, float)):
""" Coloriage statique, passage unique. """
color = d_between.get('color')
if color is not None:
l_args_stat.append(dict(y1=y1, y2=y2, fc=color, ec='#fff0'))
if l_args_stat:
self.fill_between(o_ax, l_args_stat, b_static=True)
if l_args_dyn:
self.od.write([axis_name, 'color_between'], l_args_dyn)
def build_axis(self):
""" Construction de la fenêtre Windows. """
""" Marges générales et répartition des subplots. """
l_margins = self.od.read(['figure', 'margins'], (6, 8, 10, 8)) # Marges : haut, droite, bas, gauche.
x, w = l_margins[3] / 100, 1 - (l_margins[1] + l_margins[3]) / 100
y_offset, y_height = l_margins[0], 100 - l_margins[0] - l_margins[2] # Valeurs : 0 à 100.
ax_top, y = 100, 1 - y_offset / 100
first_axis, axis_ante = '', ''
for axis_name, h in self.od.read(['figure', 'subplots']).items():
o_ax = None
if isinstance(h, (int, float)):
if first_axis == '':
first_axis = axis_name
else:
self.od.write([axis_ante, 'x_ticks'], False)
axis_ante = axis_name
h = h * y_height / 10_000
y -= h
o_ax = self.fig.add_axes((x, y, w, h), sharex=self.od.read([first_axis, 'o_ax']))
self.od.write([axis_name, 'geometry'], (x, y, w, h))
else:
""" Graphique jumelé (twined axis). """
tw = h.split()
if tw[0] == 'twinned':
""" Axes jumelés. """
parent_name = tw[-1]
parent_axis = self.od.read([parent_name, 'o_ax'])
if self.is_axis(parent_axis):
o_ax = parent_axis.twinx()
if self.is_axis(o_ax):
self.od.write([axis_name, 'o_ax'], o_ax)
""" Ajout artificiel d'attributs vides. """
o_ax.filled = list()
o_ax.note_x = o_ax.annotate('', (0, 0))
o_ax.note_y = o_ax.annotate('', (0, 0))
""" Repères : lignes hortogonales sous le curseur de la souris. Voir on_mouse_move() """
o_ax.hline = o_ax.axhline(y=-1000, color='k', lw=.2, ls='--') # -1000 = Souris hors figure.
o_ax.vline = o_ax.axvline(x=-1000, color='k', lw=.2, ls='--')
""" Format des graduations y. """
o_ax.yaxis.set_major_formatter(FormatStrFormatter('%.1f'))
""" Initialisation des courbes (DataFrames) dans les graphiques (axes). """
df = self.od.read([axis_name, 'df'])
if axis_name == self.od.read(['figure', 'best_zones', 'subplot']) and self.is_df(self.df_scats):
if self.od.read(['figure', 'best_zones', 'scatters'], False):
sns.scatterplot(data=self.df_scats, x='Indx', y='Close', hue=axis_name, ax=o_ax)
if self.od.read(['figure', 'best_zones', 'confirm'], False):
sns.scatterplot(data=self.df_scats, x='confirm_x', y='confirm_y', ax=o_ax, hue='c_type',
alpha=.3)
if self.od.read(['figure', 'best_zones', 'zig_zag'], False):
sns.lineplot(data=self.df_scats, x='Indx', y='Close', ax=o_ax, lw=.5)
o_ax.set_ylabel('')
""" Lines. """
if self.is_df(df) and len(df.columns) > 0:
sns.lineplot(data=df[:1], ax=o_ax)
for o_line in o_ax.lines:
line_name = o_line.get_label()
if not line_name.startswith('_line'):
d_vals = self.od.read([axis_name, 'signals', line_name.strip('_trace')])
if d_vals is None:
continue
color = d_vals.get('c', d_vals.get('color'))
if color is not None:
o_line.set_color(color)
if line_name.endswith('_trace'):
o_line.set(alpha=.2)
o_line.set_linewidth(4)
else:
linewidth = d_vals.get('lw', d_vals.get('linewidth'))
if linewidth is not None:
o_line.set_linewidth(linewidth)
linestyle = d_vals.get('ls', d_vals.get('linestyle'))
if linestyle is not None:
o_line.set_linestyle(linestyle)
""" Position (left, right) des graduations y. """
b_odd = False
l_axes = self.get_axis_names()
if self.od.read(['Volume', 'twined_axis']) == 'Principal':
b_odd = bool(l_axes.index('Principal') % 2) # Position impaire de 'Principal'.
for indx, axis_name in enumerate(l_axes):
self.od.write([axis_name, 'y_ticks'], 'left' if b_odd == indx % 2 else 'right')
""" Coloriage vertical conditionnel des zônes d'entrée en position (ouverture de trade). """
b_zones = self.od.read(['figure', 'best_zones', 'show'], False)
colors = self.od.read(['figure', 'best_zones', 'colors'])
if b_zones and colors is not None:
peaks = self.df_pilot['Peak']
fc_b, fc_t = colors
l_args = [
dict(x=peaks.index, y1=10 ** 5, y2=-10 ** 5, where=peaks <= -1, fc=fc_b, ec='#fff0', interpolate=True),
dict(x=peaks.index, y1=10 ** 5, y2=-10 ** 5, where=peaks >= 1, fc=fc_t, ec='#fff0', interpolate=True),
]
for ax_name in l_axes:
oax = self.o_ax(ax_name)
if self.is_axis(oax):
self.fill_between(oax, l_args)
def set_heights(self):
""" Calcul des hauteurs (en %) subplots. La somme fait 100%. """
args = self.get_args()
if not isinstance(args['subplots'], dict):
raise SystemExit("Super-dictionnaire : La clé 'subplots' doit exister et doit contenir un dictionnaire. ")
l_heights, sum_heights, nb_zeros = list(), 0, 0
for name, pc_height in args['subplots'].items():
height = pc_height if (isinstance(pc_height, int) and pc_height > 0) else 0
nb_zeros += 1 if height == 0 else 0
sum_heights += height
l_heights.append((name, height))
if sum_heights > 100:
""" La somme des hauteurs dépasse 100% : on effectue une réduction proportionnelle. """
l_heights = [(name, pc_height) for (name, pc_height) in l_heights if pc_height > 0] # Suppression des 0.
l_heights = [(name, pc_height * 100 / sum_heights) for (name, pc_height) in l_heights] # Normalisation.
else:
""" Si la somme == 100%, on ne fait rien. Si elle est < 100%, on ajoute un subplot, nommé 'rest'. """
if nb_zeros == 0:
l_heights.append(('rest', 0))
nb_zeros = 1
h_rest = (100 - sum_heights) / nb_zeros
if h_rest == 0:
l_heights = [(name, pc_height) for (name, pc_height) in l_heights if pc_height > 0] # Suppression des 0
for i, (name, pc_height) in enumerate(l_heights):
if pc_height == 0:
l_heights[i] = (name, h_rest)
args['subplots'] = dict(l_heights)
""" Ajout d'arguments par défaut. """
if 'seaborn_context' not in args: # Valeurs possibles : paper, notebook, talk, poster
args['seaborn_context'] = 'paper'
if 'seaborn_style' not in args: # Valeurs possibles : white, dark, whitegrid, darkgrid, ticks
args['seaborn_style'] = 'darkgrid'
return {'figure': args}
def dyn_indic(self, l_columns, dyn_func, **kwargs):
""" Code appelant : add_indics(). A ce stade, on ne connaît pas encore les graphiques (axes) affectés. """
""" Mise en conformité des colonnes du DataFrame : type 'list', même s'il n'y en a qu'une. """
if isinstance(l_columns, str):
l_columns = [l_columns]
kwargs['l_cols'] = l_columns # type(l_columns) = list.
""" Nombre de points affichés. """
showed_length = min(self.abscissa_size, kwargs.get('showed_length', self.abscissa_size // 3))
kwargs['showed_length'] = showed_length
def add_df(self, subplot_name, l_columns, df=None):
""" Création du dataframe qui sera associé à un graphique.
- Méthode appelée par la méthode distrib() de la classe dérivée.
- Le DataFrame {df} a plusieurs colonnes, chacune associée à une courbe. """
""" Vérification de l'existence du graphique {subplot_name}. """
if self.od.read(['figure', 'subplots', subplot_name]) is None:
raise SystemExit(f"Erreur dans self.distrib() : Le subplot {subplot_name} n'existe pas.")
l_entire_list = list()
for column in l_columns:
""" {column} est le nom de la colonne, ou bien un dictionnaire d'attributs (couleur, épaisseur, ...). """
if isinstance(column, dict):
""" Ajout du dictionnaire d'attributs au dictionnaire global {self.od}. """
col_name = column.pop('name')
d_params = column if column else None
self.od.write([subplot_name, 'signals', col_name.strip('_')], d_params)
column = col_name
""" Exemples : 'RegLine' : RegLine seul, 'RegLine_' : Trace seule, 'RegLine__' : RegLine + Trace """
if column.endswith('__'):
l_entire_list.append(column[:-2])
l_entire_list.append(f'{column[:-1]}trace')
elif column.endswith('_'):
l_entire_list.append(f'{column}trace')
else:
l_entire_list.append(column)
l_cols = list()
for column in l_entire_list:
""" Suppression des colonnes inexistantes. """
try: # Test colonne par colonne.
self.df_pilot[column] if df is None else df[column]
except KeyError:
""" Si la colonne n'existe pas ... """
Utils.printc(f"distrib()-{subplot_name} : La colonne '{column}' n'existe pas dans le DataFrame.")
continue
l_cols.append(column)
""" Fusion par ajout de colonnes dans le df existant. """
df = self.df_pilot[l_cols] if df is None else df[l_cols]
df_exist = self.od.read([subplot_name, 'df'])
if df_exist is not None:
df_exist[df.columns] = df
df = df_exist # On retrouve les colonnes de df_exist en premières places.
if not self.is_df(df):
return
self.od.write([subplot_name, 'df'], df)
for column in df.columns:
if column.lower() == 'close':
""" Si ce dataframe a une colonne 'Close', on paramètre 3 traitements distincts par défaut :
- Une étiquette-suiveuse (linked_label).
- Un graphique jumelé (twined_axis) contenant l'affichage du volume en semi-transparence.
- Éléments du graphique jumelé non affichés : y_ticks, y_ticks_labels, grille, légende. """
self.od.write(['figure', 'best_zones', 'subplot'], subplot_name)
if self.od.read(['figure', 'show_linked_label'], False):
self.od.write([subplot_name, 'linked_label'], 'Close')
if self.od.read(['figure', 'table_name'], None) is None:
self.od.write(['figure', 'show_volume'], False)
if self.od.read(['figure', 'show_volume'], False):
self.od.write(['figure', 'subplots', 'Volume'], f'twinned with {subplot_name}')
self.od.write(['Volume', 'df'], self.df_pilot[['Volume']])
self.od.write(['Volume', 'signals', 'Volume', 'visible'], False)
self.od.write(['Volume', 'twined_axis'], subplot_name)
self.od.write(['Volume', 'y_ticks'], False), # Suppression (ticks + ticklabels + grille).
self.od.write(['Volume', 'legend'], False) # Suppression de la légende.
def get_df_pilot(self):
od_pilot = self.central_args('pilot')
od_pilot.fusion(od_pilot['test' if self.mode == 1 else 'learn'])
instrument = od_pilot.read('instrument', 'EUR/USD')
self.pips = .01 if instrument.endswith('JPY') else .0001
table_name = od_pilot.read('table', 10)
self.df_pilot = CtrlHistos(instrument).get_pilot(**od_pilot) # DataFrame
self.nb_datas = self.df_pilot.shape[0]
self.od.write(['figure', 'instrument'], instrument)
self.od.write(['figure', 'table_name'], table_name)
self.od.write(['figure', 'nb_points'], self.nb_datas)
def linked_label(self, axis_name, x, o_ax, df):
""" https://matplotlib.org/stable/tutorials/text/annotations.html#annotating-with-text-with-box """
linked_label = self.od.read([axis_name, 'linked_label'])
if linked_label:
bbox = {'boxstyle': 'larrow', 'fc': '#ff02', 'ec': 'k', 'lw': .5} # fc=face color, ec=edge color
indx = min(x[-1], df.shape[0] - 1)
last_value = round(df[linked_label][indx], 2) # En pips.
""" Affichage de l'étiquette actuelle (m = marge). """
o_ax.note_y.remove() # Effacement de l'étiquette précédente.
m = self.abscissa_size / 70
o_ax.note_y = o_ax.annotate(last_value, (x[-1], last_value), xytext=(x[-1] + m, last_value), bbox=bbox)
def paint_volume(self, x, o_ax, df):
y = df['Volume'][x[0]: x[-1] + 1] * .8 # Affichage du volume à 80% d'amplitude.
l_args = [
dict(x=x, y1=y, y2=0, fc='#0079a333', ec='#fff0')
]
self.fill_between(o_ax, l_args)
def fill_between(self, o_ax, l_params, b_static=False):
""" Colorie entre 2 valeurs, conditionnellement.
@param o_ax: Axis matplotlib : graphique contenant des courbes.
@param l_params: Liste de coloriages. Chaque élément est un dictionnaire personnalisé. Voir lien ci-dessous.
@param b_static: Coloriage statique (un seul passage).
https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.fill_between.html """
""" Effacement du coloriage antérieur, nécessaire si les couleurs ont une transparence. """
if not self.is_axis(o_ax):
raise SystemExit(f"Erreur dans self.fill_between() : L'objet o_ax ({o_ax}) n'est pas un axis.")
for fill in o_ax.filled:
""" Suppression des coloriages mémorisés. """
o_ax.collections.remove(fill)
o_ax.filled.clear()
""" l_fill contient autant de paramètres que de coloriages à effectuer. """
for p in l_params:
""" Égalisation des vecteurs. """
if p.get('x') is None:
p['x'] = range(self.df_pilot.shape[0])
len_min = len(p['x'])
if self.is_df(p['y1']):
len_min = min(len_min, len(p['y1']))
if self.is_df(p['y2']):
len_min = min(len_min, len(p['y2']))
p['x'] = p['x'][:len_min]
""" Coloriage. """
if b_static:
""" Sans mémorisation. """
o_ax.fill_between(**p)
else:
""" Avec mémorisation. """
o_ax.filled.append(o_ax.fill_between(**p))
""" *********************************** Helpers. ************************************ """
def trace_hline(self, axis_name, y, **kwargs):
o_ax = self.o_ax(axis_name)
if self.is_axis(o_ax):
o_ax.axhline(xmin=0, xmax=1, y=y, **kwargs)
def trace_vline(self, axis_name, x, **kwargs):
o_ax = self.o_ax(axis_name)
if self.is_axis(o_ax):
o_ax.axvline(ymin=0, ymax=1, x=x, **kwargs)
@staticmethod
def is_axis(o_ax):
return o_ax.__class__.__name__ == 'Axes'
@staticmethod
def is_df(df):
return df.__class__.__name__ in ['DataFrame', 'Series']
def get_axis_names(self, b_twin=False):
if b_twin:
""" Liste de noms de tous les graphiques (subplots) de base et twined_axis. """
return [ax_name for ax_name in list(self.od.read(['figure', 'subplots']).keys()) if ax_name != 'rest']
else:
""" Liste de noms de tous les graphiques (subplots) de base. """
return [ax_name for (ax_name, val) in list(self.od.read(['figure', 'subplots']).items())
if ax_name != 'rest' and not str(val).startswith('twinned')]
def get_first_axis(self):
l_axes = self.get_axis_names()
o_ax = self.o_ax(l_axes[0])
if self.is_axis(o_ax):
return o_ax
def get_last_axis(self):
l_axes = self.get_axis_names()
o_ax = self.o_ax(l_axes[-1])
if self.is_axis(o_ax):
return o_ax
def _get_all_ax(self):
l_ax = list()
for key, val in self.od.read(['figure', 'subplots']).items():
l_ax.append(self.od.read([key, 'o_ax']))
return l_ax
def get_axis_lines(self, axis_name):
""" Retourne une liste d'objets o_line. """
o_ax = self.o_ax(axis_name)
l_olines = list()
for o_line in o_ax.lines:
""" Plusieurs courbes (lines) dans un graphique (o_ax). """
line_name = o_line.get_label()
if line_name.startswith('_line'):
continue
l_olines.append(o_line)
return l_olines
def o_ax(self, axis_name):
return self.od.read([axis_name, 'o_ax'])
def df(self, axis_name):
return self.od.read([axis_name, 'df'])
def ax_df(self, axis_name):
o_ax = self.od.read([axis_name, 'o_ax'])
df = self.od.read([axis_name, 'df'])
return o_ax, df
def distrib(self):
od_display = self.central_args('display')
for axis, params in od_display.items():
d_lines = params.get('lines', {})
for line, d_params in d_lines.items():
if isinstance(d_params, dict):
d_params['name'] = line
else:
d_params = dict(name=line)
self.add_df(axis, [d_params])
""" ***************************** Méthodes surchargées. ***************************** """
@staticmethod
def get_args_ui():
pass
@staticmethod
def central_args(l_keys):
raise SystemExit("show_geek.py > ShowGeek :\nLa méthode 'central_args()' doit être surchargée.")
def get_pilot(self):
raise SystemExit("show_geek.py > ShowGeek :\nLa méthode 'get_pilot()' doit être surchargée.")
""" ***************************** Méthodes à surcharger. **************************** """
def hook_axis_anim(self, axis_name, x, o_ax, df, y_min, y_max, get_line):
""" Affichage de 2 éléments distincts :
- Volumes, semi-transparent, sans bordure.
- Étiquette-suiveuse """
""" 1 - Affichage des volumes, semi-transparent, sans bordure. """
if axis_name == 'Volume':
self.paint_volume(x, o_ax, df)
""" 2 - Étiquette-suiveuse à droite du graphique. """
self.linked_label(axis_name, x, o_ax, df)
""" 3 - Coloriage inter-zônes dynamique. """
l_od_args = self.od.read([axis_name, 'color_between'])
l_args = list()
if l_od_args:
for d_args in l_od_args:
params = dict()
try:
y1, y2 = d_args['y1'], d_args['y2']
if isinstance(y1, str):
y1 = get_line(y1)
if isinstance(y2, str):
y2 = get_line(y2)
params['x'] = x
params['y1'] = y1
params['y2'] = y2
params['where'] = (y1 > y2) if d_args['where'] == '>' else (y1 < y2)
params['fc'] = d_args['fc']
params['ec'] = '#fff0'
params['interpolate'] = d_args['interpolate']
l_args.append(params)
self.fill_between(o_ax, l_args)
except (Exception,) as err:
raise SystemExit(f"ShowGeek.hook_axis_anim(). Erreur dans le coloriage inter-zônes :\n{err}")
def hook_line_anim(self, axis_name, line_name, x, y, o_ax, df, o_line):
if line_name[-6:] != '_trace':
tr = line_name[-6:]
return self.od.read([axis_name, 'signals', o_line.get_label()])
""" **************************** Événements et affichage. *************************** """
def key_event(self, ev):
def arrows(b_forward):
b_reverse = self.b_reverse
self.b_paused, self.b_reverse = False, False if b_forward else True
self.animate()
self.b_paused = True
self.b_reverse = b_reverse
self.magn = 1
keycode = ev.key
if keycode == ' ':
""" Pause on/off. """
self.b_paused = not self.b_paused
elif keycode == 'tab':
""" Inversion de sens. """
self.b_reverse = not self.b_reverse
elif keycode == 'right' or keycode == 'left':
""" Appui sur touche flèche droite ou flèche gauche. """
arrows(keycode == 'right')
elif keycode == 'up' or keycode == 'down':
""" Appui sur touche flèche haut ou flèche bas. """
self.magn = 10
arrows(keycode == 'up')
elif keycode == 'pageup' or keycode == 'pagedown':
""" Appui sur touche flèche page-haut ou flèche page-bas. """
self.magn = 100
arrows(keycode == 'pageup')
def on_mouse_move(self, ev):
""" Le style des lignes est définidans self.build_axis(). """
for o_ax in self.l_ax:
o_ax.vline.set_data([ev.xdata, ev.xdata], [0, 1])
o_ax.hline.set_data(([0, 1], [ev.ydata, ev.ydata]) if o_ax == ev.inaxes else ([0, 0], [0, 0]))
if ev.xdata is not None and ev.ydata is not None:
ax = self.get_last_axis()
ax.note_x.remove() # Effacement de l'étiquette précédente.
bbox = {'boxstyle': 'round4', 'fc': '#ff02', 'ec': 'k', 'lw': .5} # fc=face color, ec=edge color
ax.note_x = ax.annotate(text=round(ev.xdata), xy=(ev.xdata, ax.viewLim.y0), bbox=bbox)
if self.pointer <= 1 or not self.b_anim:
plt.draw() # Nécessaire si pas d'animation (lorsque self.pointer reste à 1).
def show(self):
self.l_ax = self._get_all_ax()
self.fig.canvas.mpl_connect('key_press_event', self.key_event)
if self.od.read(['figure', 'leader_lines']):
self.fig.canvas.mpl_connect('motion_notify_event', self.on_mouse_move)
if self.b_anim:
interval = self.od.read(['figure', 'interval'], 1)
ani = matplotlib.animation.FuncAnimation(self.fig, self.animate, interval=interval, repeat=True)
# |_ La variable {ani} doit exister pour empêcher le garbage collector de supprimer l'animation.
self.animate()
plt.show()
/trading/strategies/models/ga_model.py
:
""" Version 2022-04-02 ---------------------------- SUIVRE CES ÉTAPES ----------------------------
--- MODE 1 : Reprendre les étapes du modèle 'strategy_model.py'.
====================================================================================
- Si le paramètre indics.backtest est True, 6 colonnes sont automatiquement crées. Elles pourront être affichées.
- Les 6 colonnes : ['Balance (pips)', 'Open (pips)', 'Equity (pips)', 'Balance (€)', 'Open (€)', 'Equity (€)']
--- MODE 2 : Contrôle des listes de paramètres de l'algorithme génétique - Pratique :
====================================================================================
1 - La stratégie doit avoir été construite (point précédent, MODE 1).
2 - Vérifier que les listes de paramètres sont bien celles que vous avez choisi.
--- MODE 3 puis 4 : Algorithme génétique - Pratique :
====================================================================================
1 - La stratégie doit avoir été construite, les paramètres du ga contrôlés (points précédents, MODE 1 et 2).
2 - MODE 3 : Exécution de self.ga(b_single_loop=True). Cela n'exécutera qu'une seule boucle de l'algorithme ...
|_ ... pour permettre sa mise au point et son refactoring.
3 - MODE 4 : Exécution de self.ga() : La boucle peut être parcourue des centaines de fois pendant la convergence.
|_ Cette opération est chronophage et peut durer des heures.
|_ Attention toutefois ! -> Pour éviter un sur-apprentissage :
|_ Utiliser une partie des données pour cet entraînement (df_pilot). Voir pilot.learn de central_args().
|_ Utiliser une autre partie pour la vérification. Voir pilot.test de central_args().
|_ Un rapport final est affiché, ainsi qu'un graphique score = f(générations).
|_ Les meilleurs paramètres sont affichés à la ligne 'Arguments' de ce rapport : les noter.
--- Retour au MODE 1 - Pratique - Meilleurs paramètres et vérification :
====================================================================================
1 - Placer les paramètres notés précédemment dans le dictionnaire de self.central_args(), section 'indics'.
2 - MODE 1 - Lancer l'appli : Les résultats seront différents car le jeu de données n'est pas le même (learn != test).
"""
# Imports externes
import seaborn as sns
import pandas_ta as ta # Pour afficher la documentation d'un indicateur -> help(ta.kama)
# import numpy as np
# Imports internes
from functions.utils import Dictionary
from show.show_geek import ShowGeek
from trading.strategies.genetic import GeneticAlgorithm
""" MODE -> Méthodes appelées : *****************************************************************************
1 : Mode normal -> ShowGeek.show_ui(). *
2 : Affiche listes de paramètres -> GeneticAlgorithm.control_bounds() *
3 : Mise au point (MAP) de GA -> GeneticAlgorithm.ga(b_single_loop=True) # Une seule boucle. *
4 : Algorithme génétique (GA) -> GeneticAlgorithm.ga() # Boucles multiples. *
********************************************************************************************************* """
MODE = 1
# noinspection PyTypeChecker,PyUnresolvedReferences
class Geek(ShowGeek, GeneticAlgorithm):
def __init__(self):
super(Geek, self).__init__(mode=MODE) # GeneticAlgorithm.__init__() est appelé dans ShowGeek.
""" Pas de code ici ! La classe-parent principale (ShowGeek) garde la main. """
def central_args(self, l_keys):
def multiply(od, l_params):
gene = l_params[0]
key = l_params[1].split('.')
val = od.read(key)
return val * gene
od_args = Dictionary(dict(
# ************************************ FENÊTRE ************************************
ui=dict( # Cette grappe {ui} sera à la racine du super-dictionnaire {self.od}.
geometry=(200, 40, 950, 500), # x, y, w, h. Par défaut : (100, 40, 1000, 700).
abscissa_size=600, # Nb de points affichés en abscisse. Par défaut : 600.
window_title="Stratégie xxxxxxxxxxxxxxxxxx", # Titre de la fenêtre.
figure_title="Mise au point du modèle 'ga'", # Titre des graphiques.
subplots=dict( # Noms et hauteurs (en %) des graphiques modifiables. Supprimer / Ajouter.
Principal=50, # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
Milieu='',
Bas='', # Si chaîne vide => Les hauteurs seront automatiquement réparties.
),
show_volume=True,
# animation=False,
interval=50,
# ... Autres arguments pouvant être ajoutés ici : voir ShowGeek.get_args().
),
# ******************************** DATAFRAME PILOTE *******************************
pilot=dict( # Dans cet exemple : les points EUR/USD en Renko7, de {pc_from} à {pc_to}.
instrument='EUR/USD', # EURUSD, USDJPY, EURCHF, USDCAD, NZDUSD, EURGBP, EURJPY, GBPJPY, GBPCHF, ...
table=5, # int (ex : 10) = Renko, str (ex: 'H1') = Candles, None = Ticks.
learn=dict(pc_from=0, pc_to=95, nb_rows=None), # Données différentes pour le learn et pour le test.
test=dict(pc_from=95, pc_to=100, nb_rows=None), # Choisir learn au moins 4 fois plus grand que test.
), # ... pour réduire l'overfitting.
# ****************************** ALGORITHME GÉNÉTIQUE *****************************
ga=dict( # ga = Algorithme génétique.
# Paramètres (Ctrl+clic) : https://pypi.org/project/geneticalgorithm2/#constructor-parameters
function_timeout=120,
algo=dict(
max_num_iteration=150,
population_size=10,
),
# ... Autres arguments pouvant être ajoutés ici : voir GeneticAlgorithm.get_model().
),
# ********************************** INDICATEURS **********************************
indics=dict(
# backtest=False, # par défaut : True.
# stats=False, # En MODE 1 : Affichage des statistiques (si backtest). Par défaut : True.
cost=dict(capital=1000, risk=8, spread=1.1), # Params suppl. pour la méthode cost().
ma1=dict(
function=ta.zlma, # sma, ema, dema, tema, hma, wma, alma, kama, zlma, rma, median, ...
series='MA1', # str or list.
params=dict( # help(ta.zlma)
# close='Close', # Colonne traitée (en entrée) par l'indicateur. par défaut : 'Close'.
length=110,
),
ga_bounds=dict( # Paramètres testés par l'algorithme génétique : limites, pas.
length=[100, 300, 5], # [from, to, step] ex : [100, 300, 5] -> 100 à 300 par pas de 5.
),
),
ma2=dict(
function=ta.zlma, # help(ta.zlma)
series='MA2', # str ou list.
params=dict(
# close='MA1', # par défaut : 'Close'.
length=120,
),
ga_bounds=dict(
length=[1.01, 1.2, .02, multiply, 'ma1.length'], # multiply = fonction interne.
),
),
ma3=dict(
function=self.ma3, # Nom de la fonction personnalisée qui sera exécutée.
),
stochrsi=dict(
series=['RSI', 'RSI_SMA'], # Le nb de cols demandé correspond au nb de cols fournies par l'indic.
# series='RSI', # On ne désire que la 1ère colonne du DataFrame fourni par l'indic.
# series=['_', 'RSI_SMA'], # On ne désire que la 2ème colonne : '_' = colonne ignorée.
function=ta.stochrsi,
params=dict(
# close='MA1', # Signal traité par cet indicateur. Par défaut 'Close' (de df_pilot).
length=41,
rsi_length=30,
k=3,
d=3,
),
),
),
# *********************************** AFFICHAGE ***********************************
display=dict( # c=color, lw=linewidth, ls=linestyle
Principal=dict( # # c : 'r', 'g', 'b', 'y', 'm', 'c', 'w', 'k' - Voir matplotlib named_colors.
lines=dict(
MA1=dict(c='b', lw=.6, ls='-'), # ls = '-', '--', '-.', ':' ('' = invisible)
MA2=dict(c='g'),
MA3=None, # None = Couleur, épaisseur, style, ..., définis automatiquement par Seaborn.
),
# Options dans le graphique 'Principal' : lignes H ou V, coloriage, légendes, etc.
legend='ch', # 'rh', 'lh', 'lb', 'rb', 'rm', 'lm', 'rm', 'cb', 'ch', 'cm' ... ou False.
color_between=[ # (Ctrl + clic) https://matplotlib.org/stable/gallery/color/named_colors.html
dict(y1='MA1', y2='MA2', color_up='#0a04', color_down='#00a4'),
dict(y1='MA1', y2='MA3', color_up='#0aa2', color_down='#a0a2'),
],
),
Milieu=dict(
lines={
# 'Balance (€)': None,
# 'Equity (€)': None,
'RSI': None,
'RSI_SMA': None,
},
# Options : lignes H ou V, coloriage, etc.
legend='cm',
lines_H=[
dict(y=70, lw=.4, ls='-.', c='r'),
dict(y=50, lw=.4, ls='-.', c='b'),
dict(y=30, lw=.4, ls='-.', c='r'),
],
lines_V=[
dict(x=570, lw=40, ls='-', c='#00f3'),
],
color_between=[
dict(y1='RSI', y2=10, color_down='orange'),
dict(y1='RSI', y2=30, color_down='#0aa2'),
dict(y1='RSI', y2=90, color_up='orange'),
dict(y1='RSI', y2=70, color_up='#0aa2'),
dict(y1=100, y2=70, color='#0aa2'),
dict(y1=0, y2=30, color='#0aa2'),
],
),
Bas=dict(
lines={ # Colonnes dans {self.df_pilot} automatiquement créées par le backtest.
'Balance (pips)': None,
'Equity (pips)': dict(c='b'),
# 'Open (pips)': None,
}
),
)
))
return Dictionary(od_args.read(l_keys, {}))
def ma3(self):
""" Méthode personnalisée. Ne peut utiliser que les indicateurs déclarés en amont de central_args.indics.ma3 """
self.df_pilot['MA3'] = 2 * self.df_pilot['Close'] - self.df_pilot['MA2']
def add_indics(self, od_args=None):
""" En absence de surcharge, cette méthode peut être supprimée. """
""" Valeur de retour obligatoire pour les modes 3 et 4 (Algorithme génétique). """
ret = super().add_indics(od_args)
# help(ta.stochrsi) # Affiche la documentation de l'indicateur mentionné.
return ret
def pre_params(self): # <--- Option.
""" En absence de surcharge, cette méthode peut être supprimée. """
""" Surcharge, insertion (avant super()) ou ajout (après super()) au code pre_params() de la classe-parent. """
super().pre_params()
def post_params(self):
""" En absence de surcharge, cette méthode peut être supprimée. """
""" Méthode optionnelle. A supprimer ou à commenter si la surcharge est inutile.
Surcharge, insertion (avant super()) ou ajout (après super()) au code post_params() de la classe-parent. """
""" Affichage des marqueurs. """
if self.df_marks is not None:
o_ax = self.o_ax('Principal')
sns.scatterplot(data=self.df_marks, x='Index', y='Close', ax=o_ax, hue='Type', size='Type',
sizes=(30, 30), zorder=4)
ret = super().post_params() # Code avant -> inséré, code après -> ajouté.
return ret
def hook_axis_anim(self, axis_name, x, o_ax, df, y_min, y_max, get_line):
""" En absence de surcharge, cette méthode peut être supprimée.
Surcharge, insertion ou ajout au code hook_axis_anim() de la classe-parent. """
hook = super().hook_axis_anim(axis_name, x, o_ax, df, y_min, y_max, get_line)
return hook
def hook_line_anim(self, axis_name, line_name, x, y, o_ax, df, o_line):
""" En absence de surcharge, cette méthode peut être supprimée.
Surcharge ou ajout au code hook_line_anim() de la classe-parent. """
hook = super().hook_line_anim(axis_name, line_name, x, y, o_ax, df, o_line) # hook = Valeur de retour.
""" - Masque la partie droite des graph. pour permettre de voir certaines prédictions (ex: nuage ichimoku).
- Masquage conditionnel : ajoutez des conditions avec axis_name, line_name, ... """
# y.loc[x[-100]:] = np.nan # La valeur 100 est un exemple, la remplacer par une variable.
return hook
def cost(self, **d_params):
""" Datas disponibles : {self.np_datas} <-- les colonnes correspondent à {self.l_columns}. """
""" Fonctions internes obligatoires. """
def c(txt):
""" txt : texte de la condition. Opérateurs disponibles : /, \\, --, v, ^, cu, cd, ==, !=, >, >=, <, <= """
return self.cond(txt, indx)
def open_cond(op):
self.opening(op, indx)
def close_cond(indx_open):
""" Fermeture conditionnelle du trade ouvert à {indx_open}. """
op = self.np_trades[indx_open, 6] # 1 = achat, -1=vente.
return c('MA1 cu MA2') if op == 1 else c('MA1 cd MA2')
self.close_cond = close_cond # <-- Fonction close_cond
""" Paramètres de la stratégie : self.d_cost. Commencent par 'p_'. """
# ...
# [print(f'{i}={col}', end=', ') for i, col in enumerate(self.l_columns)] # np_datas. Commenter ou supprimer.
""" Colonnes de np_datas = 0=Close, 1=MA1, 2=MA2, 3=RSI, 4=RSI_SMA, 5=Surf_H """
indx_min = d_params['indx']
for indx in range(indx_min, self.nb_rows):
""" Fermetures. """
self.closures(indx)
""" Ouverture achat (ob = Open Buy). """
if c('MA1 cd MA2'):
open_cond('buy')
""" Ouverture vente (os = Open Sell). """
if c('MA1 cu MA2'):
open_cond('sell')
return super().cost(**d_params)
if __name__ == '__main__':
Geek()
Fichier à cloner puis à adapter à vos besoins.
ga_model.py
:
Les 2 longueurs de moyennes mobiles sont 140 et 180 : Le résultat n'est pas satisfaisant.
Statistiques :
=============
Contexte : EUR/USD - Renko 5 - Tous les points entre 90 % et 100 % de la table en base de données.
Période : 20/07/2021 -> 10/12/2021
Nb jours: 144
Gain journalier: 0.47 €.Capital initial: 1000.0 €
Capital min: 718.71 €
Capital max: 1074.24 €
Capital final: 1067.74 €
Performance: 6.77 %
Gain en pips: 118.7 pipsNb trades à l'achat: 41
Nb trades à la vente: 42
Nb trades gagnants: 47
Nb trades perdants: 36
Nb max gagnants consécutifs: 4
Nb max perdants consécutifs: 3Gain moyen trades gagnants: 19.71 €
Gain moyen trades perdants: -23.85 €
Meilleur trade (pips): 63.9 pips, le 12/11/2021 (index de 4094 à 4191)
Meilleur trade (monnaie): 50.66 €, le 12/11/2021 (index de 4094 à 4191)
Pire trade (pips): -156.1 pips, le 30/07/2021 (index de 521 à 821)
Pire trade (monnaie): -147.49 €, le 30/07/2021 (index de 521 à 821)
Perte max (drawdown): 309 pips le 10/09/2021 (index 1781)
-------------------------------------------------------------------------------
ga_model.py
est volontairement surchargé de commentaires et de méthodes à surcharger vides./trading/strategies/models/ga_quick_model.py
:
""" Version 2022-04-02 """
# Imports externes
import seaborn as sns
import pandas_ta as ta # Pour afficher la documentation d'un indicateur -> help(ta.kama)
# import numpy as np
# Imports internes
from functions.utils import Dictionary
from show.show_geek import ShowGeek
from trading.strategies.genetic import GeneticAlgorithm
""" MODE -> 1 : Mode normal, 2 : Affiche listes de paramètres, 3 et 4 :Genetic Algorithm """
MODE = 1
# noinspection PyTypeChecker,PyUnresolvedReferences
class Geek(ShowGeek, GeneticAlgorithm):
def __init__(self):
super(Geek, self).__init__(mode=MODE) # GeneticAlgorithm.__init__() est appelé dans ShowGeek.
def central_args(self, l_keys):
def multiply(od, l_params):
gene = l_params[0]
key = l_params[1].split('.')
val = od.read(key)
return val * gene
od_args = Dictionary(dict(
ui=dict(
geometry=(200, 40, 950, 500),
abscissa_size=600,
window_title="Stratégie xxxxxxxxxxxxxxxxxx",
figure_title="Mise au point du modèle 'quick'",
subplots=dict(
Principal=50,
Milieu='',
Bas='',
),
# show_volume=True,
# animation=False,
# interval=50,
),
pilot=dict(
instrument='EUR/USD',
table=7,
learn=dict(pc_from=0, pc_to=95, nb_rows=None),
test=dict(pc_from=95, pc_to=100, nb_rows=None),
),
ga=dict(
function_timeout=120,
algo=dict(
max_num_iteration=150,
population_size=10,
),
),
indics=dict(
# backtest=False,
# stats=False,
cost=dict(capital=1000, risk=8, spread=1.1),
ma1=dict(
function=ta.zlma,
series='MA1',
params=dict(
length=110,
),
ga_bounds=dict(
length=[100, 300, 5],
),
),
ma2=dict(
function=ta.zlma,
series='MA2',
params=dict(
length=120,
),
ga_bounds=dict(
length=[1.01, 1.2, .02, multiply, 'ma1.length'],
),
),
ma3=dict(
function=self.ma3,
),
stochrsi=dict(
series=['RSI', 'RSI_SMA'],
function=ta.stochrsi,
params=dict(
length=41,
rsi_length=30,
k=3,
d=3,
),
),
),
display=dict(
Principal=dict( # # c : 'r', 'g', 'b', 'y', 'm', 'c', 'w', 'k' - Voir matplotlib named_colors.
lines=dict(
MA1=dict(c='b', lw=.6, ls='-'), # ls = '-', '--', '-.', ':' ('' = invisible)
MA2=dict(c='g'),
MA3=None,
),
legend='ch', # 'rh', 'lh', 'lb', 'rb', 'rm', 'lm', 'rm', 'cb', 'ch', 'cm' ... ou False.
color_between=[ # (Ctrl + clic) https://matplotlib.org/stable/gallery/color/named_colors.html
dict(y1='MA1', y2='MA2', color_up='#0a04', color_down='#00a4'),
dict(y1='MA1', y2='MA3', color_up='#0aa2', color_down='#a0a2'),
],
),
Milieu=dict(
lines={
'RSI': None,
'RSI_SMA': None,
},
legend='cm',
lines_H=[
dict(y=70, lw=.4, ls='-.', c='r'),
dict(y=50, lw=.4, ls='-.', c='b'),
dict(y=30, lw=.4, ls='-.', c='r'),
dict(y=10, lw=.4, ls=':', c='g'),
],
lines_V=[
dict(x=570, lw=4, ls='-', c='#00f6'),
],
color_between=[
dict(y1=100, y2=70, color='#0aa2'),
dict(y1='RSI', y2=30, color_down='#0aa2'),
dict(y1='RSI', y2=10, color_down='orange'),
],
),
Bas=dict(
lines={ # Colonnes automatiquement créées par le backtest.
'Balance (pips)': None,
'Equity (pips)': dict(c='b'),
'Open (pips)': None,
}
),
)
))
return Dictionary(od_args.read(l_keys, {}))
def ma3(self):
""" Méthode personnalisée. Ne peut utiliser que les indicateurs déclarés en amont de central_args.indics.ma3 """
self.df_pilot['MA3'] = 2 * self.df_pilot['Close'] - self.df_pilot['MA2']
def post_params(self):
""" En absence de surcharge, cette méthode peut être supprimée. """
""" Méthode optionnelle. A supprimer ou à commenter si la surcharge est inutile.
Surcharge, insertion (avant super()) ou ajout (après super()) au code post_params() de la classe-parent. """
""" Affichage des marqueurs. """
if self.df_marks is not None:
o_ax = self.o_ax('Principal')
sns.scatterplot(data=self.df_marks, x='Index', y='Close', ax=o_ax, hue='Type', size='Type',
sizes=(30, 30), zorder=4)
ret = super().post_params() # Code avant -> inséré, code après -> ajouté.
return ret
def cost(self, **d_params):
""" Datas disponibles : {self.np_datas} <-- les colonnes correspondent à {self.l_columns}. """
""" Fonctions internes obligatoires. """
def c(txt):
""" txt : texte de la condition. Opérateurs disponibles : /, \\, --, v, ^, cu, cd, ==, !=, >, >=, <, <= """
return self.cond(txt, indx)
def open_cond(op):
self.opening(op, indx)
def close_cond(indx_open):
""" Fermeture conditionnelle du trade ouvert à {indx_open}. """
op = self.np_trades[indx_open, 6] # 1 = achat, -1=vente.
return c('MA1 cu MA2') if op == 1 else c('MA1 cd MA2')
self.close_cond = close_cond # <-- Fonction close_cond
""" Paramètres de la stratégie : self.d_cost. Commencent par 'p_'. """
# ...
# [print(f'{i}={col}', end=', ') for i, col in enumerate(self.l_columns)] # np_datas. Commenter ou supprimer.
""" Colonnes de np_datas = 0=Close, 1=MA1, 2=MA2, 3=RSI, 4=RSI_SMA, 5=Surf_H """
indx_min = d_params['indx']
for indx in range(indx_min, self.nb_rows):
""" Fermetures. """
self.closures(indx)
""" Ouverture achat (ob = Open Buy). """
if c('MA1 cd MA2'):
open_cond('buy')
""" Ouverture vente (os = Open Sell). """
if c('MA1 cu MA2'):
open_cond('sell')
return super().cost(**d_params)
if __name__ == '__main__':
Geek()
/trading/strategies
.ga_model.py
(ou ga_quick_model.py
), de la documentation, des dépendances, etc.central_args()
.GeneticAlgorithm.get_model()
.ma3
.ga_model.py
contient des exemples desquels vous pouvez vous inspirer.MA1
et MA2
, de type zmla
.ma1.length
ma2.length
MA1
, on désire tester les valeurs 100 à 300, par pas de 5 : [100, 105, 110, 115, ..., 300]MA2
, on désire la même valeur que pour MA1
, multipliée par un coefficient de 1.01 à 1.1 par pas de 0.02
MA2
doit être plus longue que MA1
.multiply()
dans central_args()
.ga_bounds
de chaque indicateur concerné, du bloc indics
.
indics=dict(
ma1=dict(
ga_bounds=dict( # Paramètres testés par l'algorithme génétique : limites, pas.
length=[100, 300, 5], # [from, to, step] ex : [100, 300, 5] -> 100 à 300 par pas de 5.
),
),
ma2=dict(
ga_bounds=dict(
length=[1.01, 1.2, .02, multiply, 'ma1.length'], # multiply = fonction interne.
),
),
),
Ce snippet ne montre que la partie ga_bounds
, dans un souci de clarté.
ga_bounds
, comme indiqué dans le code ci-dessus.cost()
, comme vue dans la Partie 1 : Voir les ouvertures et les fermetures.La constante MODE est déclarée et affectée au début du fichier, après les imports.
Exécuter, on obtient l'affichage suivant :
ma1.params.length : [100, 105, 110, 115, 120, 125, 130, 135, 140, 145, 150, ..., 290, 295, 300]
ma2.params.length : [1.01, 1.03, 1.05, 1.07, 1.09, 1.11, 1.13, 1.15, 1.17, 1.19]
Espace de recherche : 41 * 10 = 410 chromosomes.
L'affichage nous indique les différentes valeurs qui seront testées en MODE 3 et 4..
ma1.params.length[23] -> 215
ma2.params.length[7] -> 1.1500000000000001
Minimisation de l'algo génétique : 2676.618
Chromosome : [23, 7]
Gains : 1053.6 pips
Drawdown : 1270.1 pips
Arguments : {'ma1': {'params': {'length': 215}}, 'ma2': {'params': {'length': 247.25}}}
A chaque exécution le chromosome est différent : il en résulte des scores différents pour les gains et le drawdown.
Par exemple, ici, les arguments sont 215 et 247.
population_size=100
dans la grappe ga{}
du dictionnaire de central_args()
.max_num_iteration=50
ici.self.cost()
.Minimisation de l'algo génétique : 3905.692 ← Score à minimiser.
Chromosome : [7. 0.]
Gains : -176.6 pips
Drawdown : 1264.6 pips
Arguments : {'ma1': {'params': {'length': 135}}, 'ma2': {'params': {'length': 136.35}}}
Minimisation de l'algo génétique : 3890.635
Chromosome : [38. 8.]
Gains : 564.8 pips
Drawdown : 1510.8 pips
Arguments : {'ma1': {'params': {'length': 290}}, 'ma2': {'params': {'length': 339.3}}}
Minimisation de l'algo génétique : 2734.467
Chromosome : [13. 0.]
Gains : 170.0 pips
Drawdown : 980.8 pips
Arguments : {'ma1': {'params': {'length': 165}}, 'ma2': {'params': {'length': 166.65}}}etc ...
Ici, le score à minimiser diminue avec le temps : 3905, puis 3890, puis 2734, etc.
Arguments
.indics{}
du dictionnaire de central_args()
.
Vous obtiendrez de meilleurs résultats avec une stratégie plus élaborée. La stratégie des 2 moyennes est un cas d'école.
cost()
de la classe-mère, à savoir GeneticAlgorithm.cost()
.minimize
.'Choix minimize'
qui se termine par 'Fin choix minimize'
.minimize = drawdown
.minimize = -gains_pips
(notez le signe moins).coef_drawdown
, coef_gain
, etc.central_args()
: trouvez 'Apprentissage'
et 'Test'
vers le début.\trading\strategies\partie_3_ga
.\Strategie_01_xxxx
, \Strategie_02_xxxx
, \Strategie_03_xxxx
, etc selon vos recherches.Bonjour les codeurs !