Ajout d'une dimension à l'espace des recherches
Description
y=ax+b
, 2ème degré : y=ax^2 + bx + c
, 3ème degré : y=ax^3 + bx^2 + cx + d
, etc.y
en fonction de x
.
Voici 2 modèles conçus pour gagner du temps lors de la création d'une nouvelle stratégie basée sur des indicateurs dynamiques :
☐ /trading/strategies/models/dyn_model.py
:
""" Version 2022-04-22 """
# Imports externes
import pandas_ta as ta # Pour afficher la documentation d'un indicateur -> help(ta.kama)
# import numpy as np
import seaborn as sns
# Imports internes
from functions.utils import Dictionary
from show.show_geek import ShowGeek
# noinspection PyTypeChecker,PyUnresolvedReferences
class Geek(ShowGeek):
def __init__(self):
super().__init__()
def central_args(self, l_keys):
od_args = Dictionary(dict(
# ************************************ FENÊTRE ************************************
ui=dict(
geometry=(200, 40, 1_600, 700),
abscissa_size=200,
window_title="Signaux dynamiques (Modèle)",
figure_title="Modèle 'dyn_model.py'",
subplots=dict(
Principal=60,
Milieu='',
# Bas='',
),
# show_volume=True,
# animation=False,
interval=10,
),
# ******************************** DATAFRAME PILOTE *******************************
pilot=dict(
instrument='EUR/USD',
table=10,
test=dict(pc_from=None, pc_to=100, nb_rows=2_000),
),
# ********************************** INDICATEURS **********************************
indics=dict(
backtest=False,
stats=False,
lignes=dict(
function=ta.lines,
series=['Trace_H', 'Pente_H', 'X_H', 'Trace_L', 'Pente_L', 'X_L', 'Stats'],
params=dict(
# statistics=False,
delay=10,
),
),
custom=dict(
function=self.custom,
params=dict(),
),
ma=dict(
function=ta.ema,
series='MA',
params=dict(
length=80,
),
),
),
# *********************************** AFFICHAGE ***********************************
display=dict(
Principal=dict(
lines=dict( # c : 'r', 'g', 'b', 'y', 'm', 'c', 'w', 'k' - Voir matplotlib named_colors.
Trace_H=dict(c='b', lw=.6, ls='-'), # ls = '-', '--', '-.', ':' ('' = invisible)
Trace_L=dict(c='darkorange', lw=.6, ls='-'),
Mean_trace=dict(c='g', lw=.3, ls='-'),
MA=None,
),
legend='lm', # 2 lettres : [l|c|r][h|m,b], ou bien False.
color_between=[ # (Ctrl + clic) https://matplotlib.org/stable/gallery/color/named_colors.html
dict(y1='Trace_H', y2='Trace_L', color_up='#0a01', color_down='#00a1'),
],
),
Milieu=dict(
lines={
'Pente_H': dict(c='b', lw=.6, ls='-'),
'Pente_L': dict(c='darkorange', lw=.6, ls='-'),
# 'Length': dict(zoom=.05),
},
legend='lm', # 'rh', 'lh', 'lb', 'rb', 'rm', 'lm', 'rm', 'cb', 'ch', 'cm' ... ou False.
lines_H=[
dict(y=0, lw=.5, ls='-.', c='r'),
],
color_between=[
dict(y1='Pente_H', y2='Pente_L', color_up='#00aa0008', color_down='#00a1'),
],
),
)
))
return Dictionary(od_args.read(l_keys, {}))
def custom(self, **kwargs):
""" Indicateurs personnalisés.
- Le nom de cette méthode est libre.
- Il doit simplement correspondre au nom donné à un indicateur de [central_args, indics].
- Les 'Series' utilisées ici (les colonnes de df_pilot) doivent exister (préalablement créées).
"""
indx = self.df_pilot.index.to_series()
self.df_pilot['Length'] = 2*indx - (self.df_pilot['X_H'] + self.df_pilot['X_L'])
self.df_pilot['Mean_trace'] = (self.df_pilot['Trace_H'] + self.df_pilot['Trace_L']) / 2
def post_params(self):
""" En absence de surcharge, cette méthode peut être supprimée. """
""" Méthode optionnelle. A supprimer ou à commenter si la surcharge est inutile.
Surcharge, insertion (avant super()) ou ajout (après super()) au code post_params() de la classe-parent. """
""" Documentation de ta.lines """
help(ta.lines)
""" Affichage des peaks. """
t_added = self.get_d_added('lignes')
if t_added is not None:
pass
np_top, np_bottom = t_added
o_ax = self.o_ax('Principal')
sns.scatterplot(x=np_top[:, 0], y=np_top[:, 1], ax=o_ax)
sns.scatterplot(x=np_bottom[:, 0], y=np_bottom[:, 1], ax=o_ax)
ret = super().post_params() # Code avant -> inséré, code après -> ajouté.
return ret
def hook_axis_anim(self, axis_name, x, o_ax, df, y_min, y_max, get_line):
""" En absence de surcharge, cette méthode peut être supprimée.
Surcharge, insertion ou ajout au code hook_axis_anim() de la classe-parent. """
if axis_name == 'Principal':
kwargs = dict(
line_h=dict(col_trace='Trace_H', col_pente='Pente_H', col_x='X_H', c='b', lw=2, ls='-.'),
line_l=dict(col_trace='Trace_L', col_pente='Pente_L', col_x='X_L', c='darkorange', lw=2, ls='-.'),
# pattern_verbose=False, # Valeur par défaut : True.
)
""" Voir documentation de l'indicateur ta.lines """
ta.lines.plot(ax=o_ax, df=self.df_pilot, x=x, **kwargs)
return super().hook_axis_anim(axis_name, x, o_ax, df, y_min, y_max, get_line)
def hook_line_anim(self, axis_name, line_name, x, y, o_ax, df, o_line):
""" En absence de surcharge, cette méthode peut être supprimée.
Surcharge ou ajout au code hook_line_anim() de la classe-parent. """
hook = super().hook_line_anim(axis_name, line_name, x, y, o_ax, df, o_line) # hook = Valeur de retour.
""" - Masque la partie droite des graph. pour permettre de voir certaines prédictions (ex: nuage ichimoku).
- Masquage conditionnel : ajoutez des conditions avec axis_name, line_name, ... """
# y.loc[x[-10]:] = np.nan # La valeur 100 est un exemple, la remplacer par une variable.
return hook
if __name__ == '__main__':
Geek()
☐ /trading/strategies/models/dyn_quick_model.py
:
""" Version 2022-04-22 """
# Imports externes
# import pandas_ta as ta # Pour afficher la documentation d'un indicateur -> help(ta.kama)
# import seaborn as sns
# Imports internes
from functions.utils import Dictionary
from show.show_geek import ShowGeek
# noinspection PyTypeChecker,PyUnresolvedReferences
class Geek(ShowGeek):
def __init__(self):
super().__init__()
def central_args(self, l_keys):
od_args = Dictionary(dict(
# ************************************ FENÊTRE ************************************
ui=dict(
geometry=(200, 40, 1_600, 700),
abscissa_size=200,
window_title="Signaux dynamiques (Quick model)",
figure_title="Modèle 'dyn_quick_model.py'",
subplots=dict(
Principal=60,
Milieu='',
),
# animation=False,
interval=10,
),
# ******************************** DATAFRAME PILOTE *******************************
pilot=dict(
instrument='EUR/USD',
table=10,
test=dict(pc_from=None, pc_to=100, nb_rows=2_000),
),
# ********************************** INDICATEURS **********************************
indics=dict(
backtest=False,
stats=False,
# xxxxx=dict( # ... indicateurs
# function=ta.xxxxx,
# series=['xxxx', 'xxxx', 'xxxx'],
# params=dict(
# # ...
# ),
# ),
custom=dict(
function=self.custom,
params=dict(),
),
),
# *********************************** AFFICHAGE ***********************************
# color : 'r', 'g', 'b', 'y', 'm', 'c', 'w', 'k' - Voir aussi matplotlib named_colors.
# ls = '-', '--', '-.', ':' ('' = invisible) - Voir matplotlib linestyles.
# legend, 2 lettres : [l|c|r][h|m,b], ou bien False.
display=dict(
Principal=dict(
lines=dict(
# ...
),
# legend='lm',
# color_between=[ # (Ctrl + clic) https://matplotlib.org/stable/gallery/color/named_colors.html
# dict(y1='...', y2='...', color_up='...', color_down='...'),
# ],
),
Milieu=dict(
lines=dict(
# ...
),
),
)
))
return Dictionary(od_args.read(l_keys, {}))
def custom(self, **kwargs):
pass
if __name__ == '__main__':
Geek()
☐ Renommer quick_model
en ga_quick_model
.
Pas d'héritage multiple, ces modèles héritent seulement de show_geek, qui a du être adapté pour assurer la compatibilité ascendante avec les modèles étudiés précédemment.
☐ /show/show_geek.py
:
""" Version 2022-04-22 """
# Imports externes.
import numpy as np
import seaborn as sns
from matplotlib import pyplot as plt
from matplotlib.ticker import FormatStrFormatter
import matplotlib.animation
import copy
# Imports internes.
from functions.utils import Dictionary, Utils
from trading.historiques.ctrl_histos import CtrlHistos
# noinspection PyUnusedLocal,PyUnresolvedReferences
class ShowGeek:
def __init__(self, mode=None):
CtrlHistos.custom_ta()
self.b_ga = mode is not None
if self.b_ga:
super().__init__(mode=mode) # Appelle Genetic.__init__(), 2ème classe héritée de la classe dérivée.
else:
mode = 1
self.fig = plt.figure(1)
self.mode = mode
self.df_pilot = None
self.df_scats = None
self.d_added = dict()
self.nb_datas = 0
self.l_ax = None
self.pips = 0
""" Animation. """
self.b_anim = True
self.abscissa_size = 0
self.pointer = -1
self.b_paused = False
self.b_reverse = False
self.magn = 1 # Appui sur les flèches : Le pas d'avancement est : 1, 10 ou 100.
""" Signal pilote. """
self.od = Dictionary(self.set_heights())
self.get_df_pilot()
""" Mise au point : Exécuter les modes 1 à 5 dans l'ordre. """
if self.mode == 1:
""" Mode 'normal', selon votre imagination : Création, exécution et affichage de la stratégie. """
self.show_ui()
elif self.mode == 2:
""" Affichage des listes de paramètres à tester par l'algorithme génétique. """
self.control_bounds()
elif self.mode == 3:
""" Exécute une seule boucle de l'algorithme génétique pour la mise au point du code. """
self.ga(b_single_loop=True)
elif self.mode == 4:
""" Exécute l'algorithme génétique complet pour obtenir les meilleurs paramètres. """
self.ga() # Les indices fournis à la fin du traitement sont utilisées ci après (mode 5).
else: # 5 - Optionnel
""" Affichage des paramètres sélectionnés par l'algorithme génétique. """
# self.control_bounds(i1, i2, ..., i{n}) # <-- Remplacer les i par les indices fournis au mode 4.
def get_args(self):
args = self.central_args('ui')
return dict( # Ces valeurs seront à la racine du super-dictionnaire {self.od}.
geometry=args.read('geometry', (100, 40, 1000, 700)), # x, y, w, h.
margins=args.read('margins', (6, 8, 10, 8)), # Marges : haut, droite, bas, gauche.
abscissa_size=args.read('abscissa_size', 600), # Nb de points affichés en abscisse.
window_title=args.read('window_title', "Modèle simple"), # Titre de la fenêtre.
figure_title=args.read('figure_title', "Modèle simple - Ne pas modifier"), # Titre des graphiques.
leader_lines=args.read('leader_lines', True), # Lignes hortogonales de repère, suivi de la souris.
subplots=args.read('subplots', dict( # Noms et hauteurs (en %) des graphiques modifiables.
Principal=65, # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
Milieu='', # '' : Si chaîne vide => Les hauteurs seront automatiquement réparties.
Bas='',
)),
show_pilot=args.read('show_pilot', True),
show_volume=args.read('show_volume', False),
show_linked_label=args.read('show_linked_label', True),
best_zones=dict(
show=args.read(['best_zones', 'show'], False),
gap=args.read(['best_zones', 'gap'], 20), # Nombre de pips take-profit ou stop-loss.
bandwidth=args.read(['best_zones', 'bandwidth'], 80), # Pourcentage : de 0 à 100.
up=args.read(['best_zones', 'up'], True), # Affichage des palliers haut.
down=args.read(['best_zones', 'down'], True), # Affichage des palliers bas.
scatters=args.read(['best_zones', 'scatters'], True), # Affichage des optimums sous forme de ronds.
confirm=args.read(['best_zones', 'confirm'], True),
zig_zag=args.read(['best_zones', 'zig_zag'], True), # Affichage de la courbe zig-zag.
colors=args.read(['best_zones', 'colors'], ('#ffff0030', '#ff00ff10')), # Coloriage des ouvertures.
),
animation=args.get('animation', True),
interval=args.get('interval', .1),
# Paramètres généraux supplémentaires ici ...
# Paramètres seaborn : https://www.python-simple.com/python-seaborn/seaborn-general.php
)
def show_ui(self):
""" ****************** Algorithme de construction ****************** """
""" Paramètres avant la création des graphiques (axes). """
if self.l_ax is not None:
return
self.pre_params()
""" Ajout des indicateurs dans la dataframe {df_pilot} <-- Colonnes ajoutées à {df_pilot}. """
self.add_indics()
""" Ajout du signal pilote. """
self.show_pilot()
""" Distribution des signaux (des colonnes) : un dataframe par axe dans {self.l_ax}. """
self.distrib()
""" Création des graphiques (axes). """
self.build_axis()
""" Paramètres après la création des graphiques (axes). """
self.post_params()
""" Affichage animé. """
self.show()
def animate(self, _=''):
""" Appelé dans la boucle matplotlib.FuncAnimation depuis self.show(). """
def get_line(_line_name):
_from, _to = x[0], x[0] + len(x)
return self.df_pilot[_line_name][_from: _to]
if self.b_paused:
return
""" Gestion du pointeur. """
self.set_pointer()
""" Abscisse commune : liste de {abscissa_size} valeurs depuis {pointer}. """
x = list(range(self.pointer, self.pointer + self.abscissa_size))
""" Parcours des graphiques (axes). """
for axis_name in self.od.keys():
if axis_name == 'figure':
continue
o_ax = self.od.read([axis_name, 'o_ax'])
if o_ax is None:
continue
df = self.od.read([axis_name, 'df'])
y_min, y_max = 10 ** 9, -10 ** 9
if not (df is None or df.empty):
""" Parcours des courbes (lines). """
for o_line in self.get_axis_lines(axis_name):
""" Plusieurs courbes (lines) dans un graphique (o_ax). """
line_name = o_line.get_label()
serie = df[line_name]
y = serie[self.pointer: self.pointer + self.abscissa_size]
""" Égalisation des tailles de x et y. """
len_min = min(len(x), len(y))
x, y = copy.copy(x[:len_min]), copy.copy(y[:len_min])
""" Hook pour l'injection de paramètres dans cette courbe. """
d_attr = self.hook_line_anim(axis_name, line_name, x, y, o_ax, df, o_line)
if d_attr is None:
d_attr = {}
""" Injection des datas à afficher. """
if d_attr.get('visible', True):
o_line.set_ydata(y)
o_line.set_xdata(x)
if line_name == 'Close':
o_line.set_color('0.6')
if not np.isnan(y).all():
y_min = min(y_min, np.nanmin(y))
y_max = max(y_max, np.nanmax(y))
""" Calcul des limites des ordonnées (y). """
if y_max < y_min:
y_min, y_max = -10, 10
y_padding = max(0, (y_max - y_min) * 0.05) # Marges top et bottom dans les axes (5%)
y_min, y_max = (round(y_min - y_padding, 6), round(y_max + y_padding, 6)) if y_max >= y_min else (-10, 10)
y_min = max(y_min, -10 ** 9)
y_max = min(y_max, 10 ** 9)
""" Limites x et y. """
o_ax.set_xlim(x[0], x[-1])
if y_min == y_max:
y_min = y_max = None
o_ax.set_ylim(y_min, y_max)
""" Effacement d'éléments ajoutés dans le hook : lignes, scatters, zônes colorées, textes, ... """
# https://matplotlib.org/stable/api/collections_api.html#matplotlib.collections.PathCollection
[o_child.remove() for o_child in o_ax.l_added]
l_before = o_ax.get_children()
""" Injection d'éventuels éléments : lignes, scatters, zônes colorées, textes, ... """
self.hook_axis_anim(axis_name, x, o_ax, df, y_min, y_max, get_line)
""" Mémorisation des éventuels éléments. """
o_ax.l_added = [o_child for o_child in o_ax.get_children() if o_child not in l_before]
if not self.b_anim:
plt.draw()
def set_pointer(self):
if self.b_reverse:
self.pointer -= self.magn
if self.pointer < 0:
self.pointer = self.nb_datas - self.abscissa_size
else:
self.pointer += self.magn
if self.pointer > self.nb_datas - self.abscissa_size:
self.pointer = 0
def show_pilot(self):
""" Affichage conditionnel du signal-pilote. """
self.od.write(['Principal', 'signals', 'Close', 'visible'], self.od.read(['figure', 'show_pilot'], False))
self.add_df('Principal', ['Close'])
def pre_params(self):
""" Fenêtre : taille, position, titre. """
mgr = plt.get_current_fig_manager()
mgr.set_window_title(self.od.read(['figure', 'window_title'], 'Stratégie'))
win = mgr.window
win.setGeometry(*self.od.read(['figure', 'geometry']))
""" Paramètres seaborn : https://www.python-simple.com/python-seaborn/seaborn-general.php """
sns.set_context(self.od.read(['figure', 'seaborn_context'], 'paper')) # paper, notebook, talk, poster
sns.set_style(self.od.read(['figure', 'seaborn_style'], 'darkgrid')) # white, dark, whitegrid, darkgrid, ticks
""" Ini attributs. """
self.abscissa_size = self.od.read(['figure', 'abscissa_size'], 600)
self.b_anim = self.od.read(['figure', 'animation'], False)
""" Affichage des best-zones dans le graphique (subplot) contenant le signal 'Close'. """
subplot_name = self.od.read(['figure', 'best_zones', 'subplot'])
if self.od.read(['figure', 'best_zones', 'show'], False) and subplot_name:
""" Appel de l'indicateur de trend {best_zones}. """
gap = self.od.read(['figure', 'best_zones', 'gap'], 24)
bandwidth = self.od.read(['figure', 'best_zones', 'bandwidth'], 80)
self.df_pilot[['Up', 'Down', 'Peak']], self.df_scats = self.df_pilot.ta.best_zones(gap=gap,
bandwidth=bandwidth)
""" Affichage des palliers haut et bas. """
l_columns = list()
if self.od.read(['figure', 'best_zones', 'up'], False):
l_columns.append('Up')
if self.od.read(['figure', 'best_zones', 'down'], False):
l_columns.append('Down')
self.add_df(subplot_name, l_columns)
def post_params(self):
""" Recherche des paramètres dans le dictionnaire. """
""" Titre des graphiques. """
axis_title = self.od.read(['figure', 'figure_title'])
if isinstance(axis_title, str) and len(axis_title) > 0:
o_ax = self.get_first_axis()
if self.is_axis(o_ax):
o_ax.set_title(axis_title)
""" y axis : position et visibilité des graduations (ticks). """
for l_keys in self.od.key_list():
if l_keys[-1] != 'y_ticks':
continue
position = self.od.read(l_keys)
o_ax = self.od.read([l_keys[0], 'o_ax'])
if not self.is_axis(o_ax):
continue
if position is False:
o_ax.axes.set_yticks([]) # Suppression (ticks + ticklabels + grille).
pass
else:
o_ax.yaxis.set_ticks_position(position) # left, right
""" Suppression de la grille. """
for l_keys in self.od.key_list():
if l_keys[-1] != 'grid':
continue
if self.od.read(l_keys) is False:
o_ax = self.od.read([l_keys[0], 'o_ax'])
o_ax.grid(False) # Suppression de la grille.
""" x axis : visibilité des graduations (ticks) """
for l_keys in self.od.key_list():
if l_keys[-1] != 'x_ticks':
continue
o_ax = self.od.read([l_keys[0], 'o_ax'])
if not self.is_axis(o_ax):
continue
if self.od.read(l_keys) is False:
o_ax.tick_params(axis='x', colors='#fff0') # '#fff0' = invisible
""" Légendes, lignes horizontales et verticales, coloriages. """
od_display = self.central_args('display')
for axis_name, params in od_display.items():
o_ax = self.od.read([axis_name, 'o_ax'])
if self.get_axis_lines(axis_name): # Si le graphique n'a aucune ligne, on ne fait rien.
""" Légendes. """
legend = params.get('legend')
pos = ['auto', 'rh', 'lh', 'lb', 'rb', 'rm', 'lm', 'rm', 'cb', 'ch', 'cm']
if isinstance(legend, bool):
o_ax.legend().set_visible(legend) # Visibilité de la légende.
elif legend in pos:
o_ax.legend(loc=pos.index(legend)).set_title(axis_name)
else:
o_ax.legend().set_title(axis_name)
""" Lignes horizontales. """
l_lines_h = params.get('lines_H', [])
if isinstance(l_lines_h, dict):
l_lines_h = [l_lines_h]
for d_line in l_lines_h:
self.trace_hline(axis_name, **d_line)
""" Lignes verticales. """
l_lines_v = params.get('lines_V', [])
if isinstance(l_lines_v, dict):
l_lines_v = [l_lines_v]
for d_line in l_lines_v:
self.trace_vline(axis_name, **d_line)
""" Coloriage inter-zônes : statiques et dynamiques. """
ld_color_between = params.get('color_between', []) # Liste de dictionnaires.
l_args_dyn, l_args_stat = list(), list()
for d_between in ld_color_between:
y1, y2 = d_between.get('y1'), d_between.get('y2')
if isinstance(y1, str) or isinstance(y2, str):
""" Coloriage dynamique, rafraîssement (à chaque affichage), délégué au hook axis. """
color_up, color_down = d_between.get('color_up'), d_between.get('color_down')
if color_up is not None:
l_args_dyn.append(dict(y1=y1, y2=y2, where='>', fc=color_up, interpolate=True))
if color_down is not None:
l_args_dyn.append(dict(y1=y1, y2=y2, where='<', fc=color_down, interpolate=True))
elif isinstance(y1, (int, float)) and isinstance(y2, (int, float)):
""" Coloriage statique, passage unique. """
color = d_between.get('color')
if color is not None:
l_args_stat.append(dict(y1=y1, y2=y2, fc=color, ec='#fff0'))
if l_args_stat:
self.fill_between(o_ax, l_args_stat, b_static=True)
if l_args_dyn:
self.od.write([axis_name, 'color_between'], l_args_dyn)
def build_axis(self):
""" Construction de la fenêtre Windows. """
""" Marges générales et répartition des subplots. """
l_margins = self.od.read(['figure', 'margins'], (6, 8, 10, 8)) # Marges : haut, droite, bas, gauche.
x, w = l_margins[3] / 100, 1 - (l_margins[1] + l_margins[3]) / 100
y_offset, y_height = l_margins[0], 100 - l_margins[0] - l_margins[2] # Valeurs : 0 à 100.
ax_top, y = 100, 1 - y_offset / 100
first_axis, axis_ante = '', ''
for axis_name, h in self.od.read(['figure', 'subplots']).items():
o_ax = None
if isinstance(h, (int, float)):
if first_axis == '':
first_axis = axis_name
else:
self.od.write([axis_ante, 'x_ticks'], False)
axis_ante = axis_name
h = h * y_height / 10_000
y -= h
o_ax = self.fig.add_axes((x, y, w, h), sharex=self.od.read([first_axis, 'o_ax']))
self.od.write([axis_name, 'geometry'], (x, y, w, h))
else:
""" Graphique jumelé (twined axis). """
tw = h.split()
if tw[0] == 'twinned':
""" Axes jumelés. """
parent_name = tw[-1]
parent_axis = self.od.read([parent_name, 'o_ax'])
if self.is_axis(parent_axis):
o_ax = parent_axis.twinx()
if self.is_axis(o_ax):
self.od.write([axis_name, 'o_ax'], o_ax)
""" Ajout artificiel d'attributs vides. """
o_ax.note_x = o_ax.annotate('', (0, 0))
# o_ax.note_y = o_ax.annotate('', (0, 0))
o_ax.l_added = list()
""" Repères : lignes hortogonales sous le curseur de la souris. Voir on_mouse_move() """
o_ax.hline = o_ax.axhline(y=-1000, color='k', lw=.2, ls='--') # -1000 = Souris hors figure.
o_ax.vline = o_ax.axvline(x=-1000, color='k', lw=.2, ls='--')
""" Format des graduations y. """
o_ax.yaxis.set_major_formatter(FormatStrFormatter('%.1f'))
""" Initialisation des courbes (DataFrames) dans les graphiques (axes). """
df = self.od.read([axis_name, 'df'])
if axis_name == self.od.read(['figure', 'best_zones', 'subplot']) and self.is_df(self.df_scats):
if self.od.read(['figure', 'best_zones', 'scatters'], False):
sns.scatterplot(data=self.df_scats, x='Indx', y='Close', hue=axis_name, ax=o_ax)
if self.od.read(['figure', 'best_zones', 'confirm'], False):
sns.scatterplot(data=self.df_scats, x='confirm_x', y='confirm_y', ax=o_ax, hue='c_type',
alpha=.3)
if self.od.read(['figure', 'best_zones', 'zig_zag'], False):
sns.lineplot(data=self.df_scats, x='Indx', y='Close', ax=o_ax, lw=.5)
o_ax.set_ylabel('')
""" Lines. """
if self.is_df(df) and len(df.columns) > 0:
sns.lineplot(data=df[:1], ax=o_ax)
for o_line in o_ax.lines:
line_name = o_line.get_label()
if not line_name.startswith('_line'):
d_vals = self.od.read([axis_name, 'signals', line_name])
if d_vals is None:
continue
color = d_vals.get('c', d_vals.get('color'))
if color is not None:
o_line.set_color(color)
if line_name.endswith('__trace'):
o_line.set(alpha=.2)
o_line.set_linewidth(4)
else:
linewidth = d_vals.get('lw', d_vals.get('linewidth'))
if linewidth is not None:
o_line.set_linewidth(linewidth)
linestyle = d_vals.get('ls', d_vals.get('linestyle'))
if linestyle is not None:
o_line.set_linestyle(linestyle)
""" Position (left, right) des graduations y. """
b_odd = False
l_axes = self.get_axis_names()
if self.od.read(['Volume', 'twined_axis']) == 'Principal':
b_odd = bool(l_axes.index('Principal') % 2) # Position impaire de 'Principal'.
for indx, axis_name in enumerate(l_axes):
self.od.write([axis_name, 'y_ticks'], 'left' if b_odd == indx % 2 else 'right')
""" Coloriage vertical conditionnel des zônes d'entrée en position (ouverture de trade). """
b_zones = self.od.read(['figure', 'best_zones', 'show'], False)
colors = self.od.read(['figure', 'best_zones', 'colors'])
if b_zones and colors is not None:
peaks = self.df_pilot['Peak']
fc_b, fc_t = colors
l_args = [
dict(x=peaks.index, y1=10 ** 5, y2=-10 ** 5, where=peaks <= -1, fc=fc_b, ec='#fff0', interpolate=True),
dict(x=peaks.index, y1=10 ** 5, y2=-10 ** 5, where=peaks >= 1, fc=fc_t, ec='#fff0', interpolate=True),
]
for ax_name in l_axes:
oax = self.o_ax(ax_name)
if self.is_axis(oax):
self.fill_between(oax, l_args)
def set_heights(self):
""" Calcul des hauteurs (en %) subplots. La somme fait 100%. """
args = self.get_args()
if not isinstance(args['subplots'], dict):
raise SystemExit("Super-dictionnaire : La clé 'subplots' doit exister et doit contenir un dictionnaire. ")
l_heights, sum_heights, nb_zeros = list(), 0, 0
for name, pc_height in args['subplots'].items():
height = pc_height if (isinstance(pc_height, int) and pc_height > 0) else 0
nb_zeros += 1 if height == 0 else 0
sum_heights += height
l_heights.append((name, height))
if sum_heights > 100:
""" La somme des hauteurs dépasse 100% : on effectue une réduction proportionnelle. """
l_heights = [(name, pc_height) for (name, pc_height) in l_heights if pc_height > 0] # Suppression des 0.
l_heights = [(name, pc_height * 100 / sum_heights) for (name, pc_height) in l_heights] # Normalisation.
else:
""" Si la somme == 100%, on ne fait rien. Si elle est < 100%, on ajoute un subplot, nommé 'rest'. """
if nb_zeros == 0:
l_heights.append(('rest', 0))
nb_zeros = 1
h_rest = (100 - sum_heights) / nb_zeros
if h_rest == 0:
l_heights = [(name, pc_height) for (name, pc_height) in l_heights if pc_height > 0] # Suppression des 0
for i, (name, pc_height) in enumerate(l_heights):
if pc_height == 0:
l_heights[i] = (name, h_rest)
args['subplots'] = dict(l_heights)
""" Ajout d'arguments par défaut. """
if 'seaborn_context' not in args: # Valeurs possibles : paper, notebook, talk, poster
args['seaborn_context'] = 'paper'
if 'seaborn_style' not in args: # Valeurs possibles : white, dark, whitegrid, darkgrid, ticks
args['seaborn_style'] = 'darkgrid'
return {'figure': args}
def dyn_indic(self, l_columns, dyn_func, **kwargs):
""" Code appelant : add_indics(). A ce stade, on ne connaît pas encore les graphiques (axes) affectés. """
""" Mise en conformité des colonnes du DataFrame : type 'list', même s'il n'y en a qu'une. """
if isinstance(l_columns, str):
l_columns = [l_columns]
kwargs['l_cols'] = l_columns # type(l_columns) = list.
""" Nombre de points affichés. """
showed_length = min(self.abscissa_size, kwargs.get('showed_length', self.abscissa_size // 3))
kwargs['showed_length'] = showed_length
self.df_pilot = self.df_pilot.reindex(columns=self.df_pilot.columns.tolist() + o_dyn.l_columns)
def add_df(self, subplot_name, l_columns, df=None):
""" Création du dataframe qui sera associé à un graphique.
- Méthode appelée par la méthode distrib() de la classe dérivée.
- Le DataFrame {df} a plusieurs colonnes, chacune associée à une courbe. """
""" Vérification de l'existence du graphique {subplot_name}. """
if self.od.read(['figure', 'subplots', subplot_name]) is None:
raise SystemExit(f"Erreur dans self.distrib() : Le subplot {subplot_name} n'existe pas.")
l_entire_list = list()
for column in l_columns:
""" {column} est le nom de la colonne, ou bien un dictionnaire d'attributs (couleur, épaisseur, ...). """
if isinstance(column, dict):
""" Ajout du dictionnaire d'attributs au dictionnaire global {self.od}. """
col_name = column.pop('name')
d_params = column if column else None
self.od.write([subplot_name, 'signals', col_name.strip('_')], d_params)
column = col_name
""" Exemples : 'RegLine' : RegLine seul, 'RegLine_' : Trace seule, 'RegLine__' : RegLine + Trace """
if column.endswith('__'):
l_entire_list.append(column[:-2])
l_entire_list.append(f'{column[:-1]}trace')
elif column.endswith('_'):
l_entire_list.append(f'{column}trace')
else:
l_entire_list.append(column)
l_cols = list()
for column in l_entire_list:
""" Suppression des colonnes inexistantes. """
try: # Test colonne par colonne.
self.df_pilot[column] if df is None else df[column]
except KeyError:
""" Si la colonne n'existe pas ... """
Utils.printc(f"distrib()-{subplot_name} : La colonne '{column}' n'existe pas dans le DataFrame.")
continue
l_cols.append(column)
""" Fusion par ajout de colonnes dans le df existant. """
df = self.df_pilot[l_cols] if df is None else df[l_cols]
df_exist = self.od.read([subplot_name, 'df'])
if df_exist is not None:
df_exist[df.columns] = df
df = df_exist # On retrouve les colonnes de df_exist en premières places.
if not self.is_df(df):
return
self.od.write([subplot_name, 'df'], df)
for column in df.columns:
if column.lower() == 'close':
""" Si ce dataframe a une colonne 'Close', on paramètre 3 traitements distincts par défaut :
- Une étiquette-suiveuse (linked_label).
- Un graphique jumelé (twined_axis) contenant l'affichage du volume en semi-transparence.
- Éléments du graphique jumelé non affichés : y_ticks, y_ticks_labels, grille, légende. """
self.od.write(['figure', 'best_zones', 'subplot'], subplot_name)
if self.od.read(['figure', 'show_linked_label'], False):
self.od.write([subplot_name, 'linked_label'], 'Close')
if self.od.read(['figure', 'table_name'], None) is None:
self.od.write(['figure', 'show_volume'], False)
if self.od.read(['figure', 'show_volume'], False):
self.od.write(['figure', 'subplots', 'Volume'], f'twinned with {subplot_name}')
self.od.write(['Volume', 'df'], self.df_pilot[['Volume']])
self.od.write(['Volume', 'signals', 'Volume', 'visible'], False)
self.od.write(['Volume', 'twined_axis'], subplot_name)
self.od.write(['Volume', 'y_ticks'], False), # Suppression (ticks + ticklabels + grille).
self.od.write(['Volume', 'legend'], False) # Suppression de la légende.
def get_df_pilot(self):
od_pilot = self.central_args('pilot')
if 'test' in od_pilot:
od_pilot.fusion(od_pilot['test' if self.mode == 1 else 'learn'])
instrument = od_pilot.read('instrument', 'EUR/USD')
self.pips = .01 if instrument.endswith('JPY') else .0001
table_name = od_pilot.read('table', 10)
self.df_pilot = CtrlHistos(instrument).get_pilot(**od_pilot) # DataFrame
self.nb_datas = self.df_pilot.shape[0]
self.od.write(['figure', 'instrument'], instrument)
self.od.write(['figure', 'table_name'], table_name)
self.od.write(['figure', 'nb_points'], self.nb_datas)
def linked_label(self, axis_name, x, o_ax, df):
""" https://matplotlib.org/stable/tutorials/text/annotations.html#annotating-with-text-with-box """
linked_label = self.od.read([axis_name, 'linked_label'])
if linked_label:
bbox = {'boxstyle': 'larrow', 'fc': '#ff02', 'ec': 'k', 'lw': .5} # fc=face color, ec=edge color
indx = min(x[-1], df.shape[0] - 1)
last_value = round(df[linked_label][indx], 2) # En pips.
""" Affichage de l'étiquette actuelle (m = marge). """
m = self.abscissa_size / 70
o_ax.annotate(last_value, (x[-1], last_value), xytext=(x[-1] + m, last_value), bbox=bbox)
def paint_volume(self, x, o_ax, df):
y = df['Volume'][x[0]: x[-1] + 1] * .8 # Affichage du volume à 80% d'amplitude.
l_args = [
dict(x=x, y1=y, y2=0, fc='#0079a333', ec='#fff0')
]
self.fill_between(o_ax, l_args)
def fill_between(self, o_ax, l_params, b_static=False):
""" Colorie entre 2 valeurs, conditionnellement.
@param o_ax: Axis matplotlib : graphique contenant des courbes.
@param l_params: Liste de coloriages. Chaque élément est un dictionnaire personnalisé. Voir lien ci-dessous.
@param b_static: Coloriage statique (un seul passage).
https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.fill_between.html """
""" Effacement du coloriage antérieur, nécessaire si les couleurs ont une transparence. """
if not self.is_axis(o_ax):
raise SystemExit(f"Erreur dans self.fill_between() : L'objet o_ax ({o_ax}) n'est pas un axis.")
""" l_fill contient autant de paramètres que de coloriages à effectuer. """
for p in l_params:
""" Égalisation des vecteurs. """
if p.get('x') is None:
p['x'] = range(self.df_pilot.shape[0])
len_min = len(p['x'])
if self.is_df(p['y1']):
len_min = min(len_min, len(p['y1']))
if self.is_df(p['y2']):
len_min = min(len_min, len(p['y2']))
p['x'] = p['x'][:len_min]
""" Coloriage. """
o_ax.fill_between(**p)
""" *********************************** Helpers. ************************************ """
def trace_hline(self, axis_name, y, **kwargs):
o_ax = self.o_ax(axis_name)
if self.is_axis(o_ax):
o_ax.axhline(xmin=0, xmax=1, y=y, **kwargs)
def trace_vline(self, axis_name, x, **kwargs):
o_ax = self.o_ax(axis_name)
if self.is_axis(o_ax):
o_ax.axvline(ymin=0, ymax=1, x=x, **kwargs)
@staticmethod
def is_axis(o_ax):
return o_ax.__class__.__name__ == 'Axes'
@staticmethod
def is_df(df):
return df.__class__.__name__ in ['DataFrame', 'Series']
def get_axis_names(self, b_twin=False):
if b_twin:
""" Liste de noms de tous les graphiques (subplots) de base et twined_axis. """
return [ax_name for ax_name in list(self.od.read(['figure', 'subplots']).keys()) if ax_name != 'rest']
else:
""" Liste de noms de tous les graphiques (subplots) de base. """
return [ax_name for (ax_name, val) in list(self.od.read(['figure', 'subplots']).items())
if ax_name != 'rest' and not str(val).startswith('twinned')]
def get_first_axis(self):
l_axes = self.get_axis_names()
o_ax = self.o_ax(l_axes[0])
if self.is_axis(o_ax):
return o_ax
def get_last_axis(self):
l_axes = self.get_axis_names()
o_ax = self.o_ax(l_axes[-1])
if self.is_axis(o_ax):
return o_ax
def _get_all_ax(self):
l_ax = list()
for key, val in self.od.read(['figure', 'subplots']).items():
l_ax.append(self.od.read([key, 'o_ax']))
return l_ax
def get_axis_lines(self, axis_name):
""" Retourne une liste d'objets o_line. """
o_ax = self.o_ax(axis_name)
if o_ax is None:
return []
l_olines = list()
for o_line in o_ax.lines:
""" Plusieurs courbes (lines) dans un graphique (o_ax). """
line_name = o_line.get_label()
if line_name.startswith('_line'):
continue
l_olines.append(o_line)
return l_olines
def o_ax(self, axis_name):
return self.od.read([axis_name, 'o_ax'])
def df(self, axis_name):
return self.od.read([axis_name, 'df'])
def ax_df(self, axis_name):
o_ax = self.od.read([axis_name, 'o_ax'])
df = self.od.read([axis_name, 'df'])
return o_ax, df
def distrib(self):
od_display = self.central_args('display')
for axis, params in od_display.items():
d_lines = params.get('lines', {})
for line, d_params in d_lines.items():
if isinstance(d_params, dict):
d_params['name'] = line
else:
d_params = dict(name=line)
self.add_df(axis, [d_params])
""" ***************************** Méthodes surchargées. ***************************** """
@staticmethod
def get_args_ui():
pass
@staticmethod
def central_args(l_keys):
raise SystemExit("show_geek.py > ShowGeek :\nLa méthode 'central_args()' doit être surchargée.")
def get_pilot(self):
raise SystemExit("show_geek.py > ShowGeek :\nLa méthode 'get_pilot()' doit être surchargée.")
""" ***************************** Méthodes à surcharger. **************************** """
def get_d_added(self, indic_key):
return self.d_added.get(self.get_signature(indic_key))
def get_signature(self, indic_key):
""" Fournit une signature unique à chaque indicateur, par assemblage des ses paramètres et noms de colonnes. """
od_indics = self.central_args('indics')
d_params = od_indics.read([indic_key, 'params'], {})
d_params['l_cols'] = od_indics.read([indic_key, 'series'], [])
signature = '_'.join(['', indic_key] + [str(param) for param in d_params.values()])
return signature.replace("'", '') # Suppression des apostrophes.
def add_indics(self, od_indics=None):
if od_indics is None:
""" Indicateurs utilisés dans cette stratégie. Voir central_args(), section 'indics'. """
od_indics = self.central_args('indics')
""" Signaux pour le calcul des gains, les marqueurs et l'affichage. """
for l_keys in od_indics.key_list():
if l_keys[-1] == 'function':
indic_key = l_keys[0]
function = od_indics.read(l_keys)
od_indics.write([indic_key, 'name'], indic_key)
kwargs = od_indics.read([indic_key, 'params'], {})
if function.__module__ == '__main__':
""" Traitement personnalisé, dans la classe dérivée utilisatrice. """
function(**kwargs) # Exécution de la fonction ou méthode.
continue
if self.b_ga:
self.get_indic(function, **od_indics[indic_key])
else:
series = od_indics.read([indic_key, 'series'])
if series is None:
continue
close = kwargs.pop('close', 'Close')
df = function(self.df_pilot[close], **kwargs)
if df.__class__.__name__ == 'tuple':
signature = self.get_signature(indic_key)
self.df_pilot[series] = df[0]
self.d_added[signature] = df[1:]
else:
self.df_pilot[series] = df
if self.b_ga:
return super().add_indics(od_indics)
def hook_axis_anim(self, axis_name, x, o_ax, df, y_min, y_max, get_line):
""" Affichage de 2 éléments distincts :
- Volumes, semi-transparent, sans bordure.
- Étiquette-suiveuse """
""" 1 - Affichage des volumes, semi-transparent, sans bordure. """
if axis_name == 'Volume':
self.paint_volume(x, o_ax, df)
""" 2 - Étiquette-suiveuse à droite du graphique. """
self.linked_label(axis_name, x, o_ax, df)
""" 3 - Coloriage inter-zônes dynamique. """
l_od_args = self.od.read([axis_name, 'color_between'])
l_args = list()
if l_od_args:
for d_args in l_od_args:
params = dict()
try:
y1, y2 = d_args['y1'], d_args['y2']
if isinstance(y1, str):
y1 = get_line(y1)
if isinstance(y2, str):
y2 = get_line(y2)
params['x'] = x
params['y1'] = y1
params['y2'] = y2
params['where'] = (y1 > y2) if d_args['where'] == '>' else (y1 < y2)
params['fc'] = d_args['fc']
params['ec'] = '#fff0'
params['interpolate'] = d_args['interpolate']
l_args.append(params)
self.fill_between(o_ax, l_args)
except (Exception,) as err:
if not hasattr(o_ax, 'hook_error'): # Pour éviter la répétition.
o_ax.hook_error = f"ShowGeek.hook_axis_anim()." \
f" Erreur dans le coloriage inter-zônes du graphique '{axis_name}' :\n{err}"
Utils.printc(o_ax.hook_error)
def hook_line_anim(self, axis_name, line_name, x, y, o_ax, df, o_line):
d_attr = self.od.read([axis_name, 'signals', o_line.get_label()])
llab = o_line.get_label()
if d_attr is not None:
zoom = d_attr.get('zoom')
if zoom is not None:
y *= zoom
return d_attr
""" **************************** Événements et affichage. *************************** """
def key_event(self, ev):
def arrows(b_forward):
b_reverse = self.b_reverse
self.b_paused, self.b_reverse = False, False if b_forward else True
self.animate()
self.b_paused = True
self.b_reverse = b_reverse
self.magn = 1
keycode = ev.key
if keycode == ' ':
""" Pause on/off. """
self.b_paused = not self.b_paused
elif keycode == 'tab':
""" Inversion de sens. """
self.b_reverse = not self.b_reverse
elif keycode == 'right' or keycode == 'left':
""" Appui sur touche flèche droite ou flèche gauche. """
arrows(keycode == 'right')
elif keycode == 'up' or keycode == 'down':
""" Appui sur touche flèche haut ou flèche bas. """
self.magn = 10
arrows(keycode == 'up')
elif keycode == 'pageup' or keycode == 'pagedown':
""" Appui sur touche flèche page-haut ou flèche page-bas. """
self.magn = 100
arrows(keycode == 'pageup')
def on_mouse_move(self, ev):
""" Le style des lignes est définidans self.build_axis(). """
for o_ax in self.l_ax:
o_ax.vline.set_data([ev.xdata, ev.xdata], [0, 1])
o_ax.hline.set_data(([0, 1], [ev.ydata, ev.ydata]) if o_ax == ev.inaxes else ([0, 0], [0, 0]))
if ev.xdata is not None and ev.ydata is not None:
ax = self.get_last_axis()
ax.note_x.remove() # Effacement de l'étiquette précédente.
bbox = {'boxstyle': 'round4', 'fc': '#ff02', 'ec': 'k', 'lw': .5} # fc=face color, ec=edge color
ax.note_x = ax.annotate(text=round(ev.xdata), xy=(ev.xdata, ax.viewLim.y0), bbox=bbox)
if self.pointer <= 1 or not self.b_anim:
plt.draw() # Nécessaire si pas d'animation (lorsque self.pointer reste à 1).
def show(self):
self.l_ax = self._get_all_ax()
self.fig.canvas.mpl_connect('key_press_event', self.key_event)
if self.od.read(['figure', 'leader_lines']):
self.fig.canvas.mpl_connect('motion_notify_event', self.on_mouse_move)
if self.b_anim:
interval = self.od.read(['figure', 'interval'], 1)
ani = matplotlib.animation.FuncAnimation(self.fig, self.animate, interval=interval, repeat=True)
# |_ La variable {ani} doit exister pour empêcher le garbage collector de supprimer l'animation.
self.animate()
plt.show()
genetic.py
vers show_geek.py
, backtest.py
a été refactoré.\trading\strategies\genetic.py
:
""" Version 2022-04-22 - Algorithme génétique. https://pypi.org/project/geneticalgorithm2/ v6.5.0 """
# Imports externes
import copy
import numpy as np
from geneticalgorithm2 import geneticalgorithm2 as ga
# Imports internes
from functions.utils import Utils, Dictionary
from trading.strategies.backtest import BackTest
# noinspection PyUnresolvedReferences
class GeneticAlgorithm(BackTest):
def __init__(self, mode):
super().__init__()
self.df_pilot = None
self.mode = mode
self.d_indics = dict()
# self.d_added = dict()
self.odl_params = self._odl_params
self.l_boundaries = self._l_boundaries
self.args = None
self.chromosome = None
self.best = 10**8
@property
def _odl_params(self):
""" - Super-dictionnaire des listes de paramètres possibles pour la stratégie.
- Les chromosomes seront créés par tirage au sort dans ces listes. """
if self.mode == 1:
return Dictionary()
od_indics = self.central_args(['indics'])
d_bounds = dict()
for l_keys in od_indics.key_list():
if len(l_keys) > 2 and l_keys[1] == 'ga_bounds':
key = f'{l_keys[0]}.params.{l_keys[2]}'
d_bounds[key] = od_indics.read(l_keys)
if not d_bounds:
raise SystemExit("Le dictionnaire central (self.central_args())"
" doit contenir des clés ['indic', *, 'ga__bounds'].")
""" Exemple : d_bounds =
ma1__length: [60, 100, 5] ---------------------------------- <class 'list'>
ma2__length: [1.01, 3, .2, func1, 'ma1.length'] ------------ <class 'list'> """
odl_params = Dictionary()
for key, l_val in d_bounds.items():
l_vals = list()
val = l_val[0]
while val <= l_val[1]:
l_vals.append(val)
val += l_val[2]
odl_params.write([key, 'l_vals'], l_vals)
if len(l_val) > 3:
if l_val[3].__class__.__name__ != 'function':
raise SystemExit(f"Dictionnaire self.central_args(['ga', 'bounds', '{key}']) :"
f"\nLe 4ème élément de la liste '{key}[]' doit être une fonction.")
odl_params.write([key, 'add'], l_val[3:])
""" Exemple : odl_params =
ma1__length: ----------------------------------------------- <class 'dict'>
l_vals: [60, 65, 70, 75, 80, 85, 90, 95, 100] ---------- <class 'list'>
ma2__length: ----------------------------------------------- <class 'dict'>
l_vals: [1.01, 1.21, 1.41, 1.61, 1.81, 2.01, 2.21, 2.41, 2.61, 2.81] <class 'list'>
add: ['ma1__length'] ----------------------------------- <class 'list'> """
return odl_params
@property
def _l_boundaries(self):
l_boundaries = list()
odl_params = self.odl_params
for param_key in odl_params:
l_boundaries.append((0, len(self.odl_params.read([param_key, 'l_vals'])) - 1))
return l_boundaries
def add_indics(self, od_indics=None):
b_backtest, b_ga = od_indics.read('backtest', True), od_indics.read('b_ga', False)
if b_ga or b_backtest:
cost = self.fitness(**od_indics)
if b_ga:
return cost
else:
""" Gains en pips et en monnaie (€ ou $). """
l_gains = ['Balance (pips)', 'Open (pips)', 'Equity (pips)', 'Balance (€)', 'Open (€)', 'Equity (€)']
self.df_pilot[l_gains] = cost
""" Affiche les statistiques du backtest."""
if od_indics.read('stats', True):
self.bt_stats()
def get_indic(self, indic_function, **all_params):
""" Obtention 'intelligente' des datas d'un indicateur, pour optimiser l'exécution de l'algo génétique (ga) ...
... en effet, lors de son exécution, le ga est ammené à demander plusieurs fois le même indicateur.
- Un indicateur est caractérisé par sa signature : nom de sa fonction et ses paramètres.
- Les datas obtenus de cet indicateur sont une 'Serie' (une col.) ou un 'DataFrame' (plusieurs cols.) pandas.
- Ces datas sont stockés dans une (plusieurs) colonne(s) de self.df_pilot.
- Si les datas demandés existent déjà dans self.df_pilot, on les renvoie directement (rapide).
- Sinon la fonction de l'indicateur est appelée, le résultat est stocké dans self.df_pilot, puis renvoyé (lent).
****************************** EXCEPTION - EXCEPTION - EXCEPTION ******************************
Cette méthode n'est pas prévue pour les indicateurs qui retournent un tuple au lieu d'un DataFrame ou Series.
Le cas échéant, seul l'élément [0] du tuple sera retourné, à condition que ce soit un élément Pandas.
Si les autres éléments du tuple sont nécessaires, utiliser directement la fonction native de pandas_ta.
"""
d_params = all_params.get('params', {})
signature = self.get_signature(all_params['name'])
series = all_params.get('series')
if signature in self.d_indics:
self.df_pilot[series] = self.df_pilot[self.d_indics[signature]]
else:
close = d_params.pop('close', 'Close')
""" MAP : pour voir les erreurs, décommenter la ligne (1), commenter les lignes (2) ******************* """
# df = indic_function(self.df_pilot[close], **d_params) # (1) <-- Ne pas supprimer.
try: # (2) ... jusqu'à "raise SystemExit(msg)"
df = indic_function(self.df_pilot[close], **d_params)
except (Exception,) as err:
if close in self.df_pilot.columns:
msg = f"Erreur lors de la création de l'indicateur '{indic_function.__name__}' :\n{err}"
else:
msg = f"La colonne '{close}' n'existe pas dans le DataFrame pilote df_pilot."
raise SystemExit(msg)
if df.__class__.__name__ == 'tuple':
""" Le 1er élément du tuple est un DataFrame ou une Serie Pandas, les autres -> dans d_added{} """
self.d_added[signature] = df[1:]
df = df[0]
if df.__class__.__name__ not in ['DataFrame', 'Series']:
raise SystemExit(
f"L'indicateur '{indic_function.__name__}' retourne un tuple et non un tableau Pandas."
f"\nCe tuple ne contient pas de DataFrame ou de Series."
f"\nVeuillez utiliser directement la fonction 'df.ta.{indic_function.__name__}(...)'.")
if df.__class__.__name__ == 'DataFrame':
self.d_indics[signature] = ['_' + col for col in list(df.columns)] # [_col1, _col2, ...]
""" Si le nb de cols demandées ne correspond pas au nb de cols fournies, correction automatique. """
if isinstance(series, str):
series = [series]
for i, serie in enumerate(series):
if serie == '_':
continue
self.df_pilot[serie] = df.iloc[:, i]
else: # Series
self.d_indics[signature] = signature
self.df_pilot[series] = df
self.df_pilot[self.d_indics[signature]] = df # Mémorisation unique.
def control_bounds(self, *l_args):
"""
@param l_args: <vide> ou liste des paramètres produits par l'algorithme génétique.
Si l_args est une liste non vide, la méthode affiche les meilleurs paramètres trouvés.
Sinon, elle affiche les listes de paramètres sélectionnables par l'algorithme.
@return: NA -> Affichage dans le terminal.
"""
la, lp = len(l_args), len(self.odl_params)
if la != 0 and la != lp:
l_args = []
Utils.printc(f"Le nombre de valeurs reçues ({la}) est différent du nombre de paramètres à tester ({lp}).")
""" Vérification des paramètres de l'algorithme génétique. """
od_args = Dictionary()
l_pop = list()
for i, param_key in enumerate(self.odl_params):
l_p = [round(param, 2) for param in self.odl_params.read([param_key, 'l_vals'])]
l_pop.append(len(l_p))
print(param_key, ':', l_p)
if l_args:
key_args = param_key.split('__')
j = int(l_args[i])
param = self.odl_params.read([param_key, 'l_vals'])[j]
""" Paramètre dépendant. """
l_add = self.odl_params.read([param_key, 'add'])
depend = ''
if l_add:
l_keys = l_add[1].split('__')
depend = f'{od_args.read(l_keys)} * {round(param, 2)} = '
param = l_add[0](od_args, [param] + l_add[1:])
od_args.write(key_args, param)
print(f"\tSelectionné -> {param_key}[{j}] = {depend}{round(param, 2)}")
if len(l_args) == 0:
n_prod, s_prod = 1, ''
for pop in l_pop:
s_prod += f' * {pop}'
n_prod *= pop
print(f"Espace de recherche : {s_prod[3:]} = {n_prod} chromosomes.")
def fitness_function(self, chromosome):
""" Méthode starter pour l'algorithme génétique. Appelée à chaque génération avec un chromosome différent. """
""" Création de od_args, exemple :
chromosome = [3, 1] # <-- chromosome aléatoire, généré par le modèle.
self.odl_params =
ma1__length: ----------------------------------------------- <class 'dict'>
l_vals: [60, 65, 70, 75, 80, 85, 90, 95, 100] ---------- <class 'list'>
ma2__length: ----------------------------------------------- <class 'dict'>
l_vals: [1.01, 1.21, 1.41, 1.61, 1.81, 2.01, 2.21, 2.41, 2.61, 2.81] <class 'list'>
add: ['ma1__length'] ----------------------------------- <class 'list'>
A partir de {chomosome} et de {self.odl_params}, on crée {od_args} :
od_args =
ma1: ------------------------------------------------------- <class 'dict'>
length: 40 --------------------------------------------- <class 'int'>
ma2: ------------------------------------------------------- <class 'dict'>
length: 150 -------------------------------------------- <class 'int'>
b_ga: True ------------------------------------------------- <class 'bool'>
cost: ------------------------------------------------------ <class 'dict'>
tp: 30 ------------------------------------------------- <class 'int'>
sl: 200 ------------------------------------------------ <class 'int'>
skip: 4 ------------------------------------------------ <class 'int'>
"""
od_args = Dictionary()
for i, key in enumerate(self.odl_params.keys()): # ex : key = 'ma1.params.length'
key_args = key.split('.')
l_vals = self.odl_params.read([key, 'l_vals'])
l_add = self.odl_params.read([key, 'add'], [])
param = l_vals[int(chromosome[i])]
if l_add:
""" Traitement conditionnel -> l_add = [fonction, arg{0}, arg{1}, ..., arg{n} ...] """
l_params = list()
for p in l_add[1:]:
l_params.append(p.replace('.', '.params.'))
param = l_add[0](od_args, [param] + l_params) # l_add[0] est une fonction.
od_args.write(key_args, round(param, 2))
self.chromosome = chromosome
self.args = copy.deepcopy(dict(od_args))
""" Création des indicateurs nécessaires, paramétrés. La fonction de coût est appelée dans add_indics() """
od_args.write('b_ga', True)
od_args.fusion(self.central_args('indics'))
cost = self.add_indics(od_args)
if cost is None:
raise SystemExit("Erreur :\nLa méthode add_indics() de la classe dérivée doit retourner une valeur.")
return cost
def cost(self, **d_params):
""" |_ Colonnes de self.np_trades : 0 : gains (>0 ou <0) clôturés, en pips (trades fermés). cumulé
1 : gains en cours d'évolution, en pips (trades ouverts). cumulé
2 : gains total (clôturés + en évolution), en pips. cumulé
3 : gains (>0 ou <0) clôturés, en monnaie (€ ou $). cumulé
4 : gains en cours d'évolution (€ ou $). cumulé
5 : gains total (clôturés + en évolution) (€ ou $). cumulé
6 : Flag d'ouverture (-1/+1), effacé à la fermeture.
7 : marks d'ouverture (-1/+1), permanent.
8 : indx de fermeture
9 : gain du trade, en pips
10 : gain du trade, en monnaie (€ ou $) """
if d_params.get('b_ga', False):
""" Ini. """
close_index_min = 10**8
""" Parcours des trades ouverts. """
l_open_indx = np.where(self.np_trades[:, 7] != 0)[0] # Liste des index d'ouverture de trades.
# |_ Cette liste d'index {l_open_indx[]} pointe la grande table numpy {self.np_trades}.
for open_indx in l_open_indx:
l_trade = self.np_trades[open_indx] # 11 colonnes : [0., 0., 0., 0., 0., 0., 0., 1., 1.054e+03, 0., 0.]
close_indx = int(l_trade[8]) # . |_ ici, 1054
close_index_min = min(close_index_min, close_indx)
self.np_trades[:, 9] = np.nancumsum(self.np_trades[:, 9])
gains_pips = float(self.np_trades[-1, 9])
g_max, drawdown = -10**8, -10**8
self.np_trades[:, 0] = np.nancumsum(self.np_trades[:, 0])
self.np_trades[:, 2] = self.np_trades[:, 0] + self.np_trades[:, 1]
for g in self.np_trades[:, 2]:
""" Formule : https://arya.xyz/blog/enseignement/drawdown """
g_max = max(g_max, g)
drawdown = max(drawdown, g_max - g)
""" Choix minimize : On cherche à obtenir la plus petite valeur possible pour {minimize}.
Voici différentes possibilités :
- Maximisation des gains (pips) : minimize = -gains_pips
- Minimisation du drawdown : minimize = drawdown
- Maximisation du nb de trades : minimize = -nb_trades
- Mix des 3 : minimize = ((coef_drawdown * drawdown) - gains_pips) / ratio
- Etc. """
""" Maximise seulement les gains. *********************************** """
# minimize = -gains_pips
""" Minimise seulement le drawdawn. ********************************* """
# minimize = drawdown
""" Mix des 3. ****************************************************** """
nb_trades = len(l_open_indx)
if nb_trades == 0:
minimize = 10**7
else:
coef_drawdown = 3 # Importance donnée au faible drawdown par rapport au fort gain.
ratio = 10 ** (nb_trades / len(self.np_datas)) # ratio de 1 à 10 (10**0 à 10**1)
minimize = ((coef_drawdown * drawdown) - gains_pips) / ratio
""" Fin choix minimize ***************************************************************** """
if minimize == 0:
minimize = 10**7
if minimize < self.best:
self.best = minimize
print(f"\nMinimisation de l'algo génétique :\t{round(minimize, 3)}"
f"\nChromosome :\t\t\t\t\t\t{self.chromosome}"
f"\nGains :\t\t\t\t\t\t\t\t{round(gains_pips, 2)} pips"
f"\nDrawdown :\t\t\t\t\t\t\t{round(float(drawdown), 2)} pips"
f"\nArguments :\t\t\t\t\t\t\t{self.args}")
""" L'algorithme génétique tend à minimiser le résultat. """
return minimize
else:
return super().cost(**d_params)
def ga(self, b_single_loop=False):
""" Code appelant : self.__init__() ci-dessus, mode 3 puis 4. """
if b_single_loop:
""" MAP ***** MAP : mode 3 ***** mode 3 ***** mode 3 ***** mode 3 ***** mode 3 ***** mode 3 """
""" Pendant l'algorithme génétique, la fitness_function() est appelée un grand nombre de fois.
- Ici, une seule fois, avec un chromosome tiré au hasard, afin de permettre sa mise au point. """
""" Simulation de création d'un chromosome par le modèle. Tirage au sort des index des paramètres. """
chromosome = [int(np.random.choice(np.linspace(*bounds, bounds[1]))) for bounds in self.l_boundaries]
""" Vérification du chromosome tiré au hasard. """
print()
[print(f"{param_key}[{chromosome[i]}] -> {d_vals['l_vals'][chromosome[i]]}")
for i, (param_key, d_vals) in enumerate(self.odl_params.items())]
""" Résultat. """
self.fitness_function(chromosome) # Chromosome aléatoire.
else:
""" Conditions réelles : mode 4 ***** mode 4 ***** mode 4 ***** mode 4 ***** mode 4 ***** mode 4 ***** """
""" Création du modèle de GA. """
model = self.get_model()
""" Exécution du modèle de GA. """
model.run()
def get_model(self):
""" Création du modèle de GA. """
od_ga = self.central_args('ga')
l_bounds = self.l_boundaries
dim = len(l_bounds)
d_algo = od_ga.read('algo')
return ga(
function=self.fitness_function,
dimension=dim,
variable_type='int',
variable_boundaries=l_bounds,
function_timeout=od_ga.read('function_timeout', 10),
algorithm_parameters={
'max_num_iteration': d_algo.get('max_num_iteration', 200),
'population_size': d_algo.get('population_size', 100),
'mutation_probability': d_algo.get('mutation_probability', .1),
'elit_ratio': d_algo.get('elit_ratio', .01),
'crossover_probability': d_algo.get('crossover_probability', .5),
'parents_portion': d_algo.get('parents_portion', .3),
'crossover_type': d_algo.get('crossover_type', 'uniform'),
'mutation_type': d_algo.get('mutation_type', 'uniform_by_center'),
'selection_type': d_algo.get('selection_type', 'roulette'),
'max_iteration_without_improv': d_algo.get('xxxx', None)
})
\trading\strategies\backtest.py
:
""" Version 2022-04-22 """
# Imports externes
import numpy as np
import pandas as pd
# Imports internes
from functions.utils import DateTime, Utils
# noinspection PyUnresolvedReferences, PyArgumentList
class BackTest:
def __init__(self):
self.l_columns = None
self.np_datas = None
self.nb_rows = 0
""" Statistiques. """
self.np_trades = None
self.close_cond = None # Fonction : condition de fermeture d'un trade.
self.df_marks = None # dataframe
self.d_cost = None # dictionary
def fitness(self, **d_args):
""" Code appelant : UI d'affichage. """
""" Élimination des colonnes Open, High, Low, ... ainsi que celles pour le ga (commençant par '_'). """
self.l_columns = ['Close'] + [col for col in list(self.df_pilot.columns[7:]) if not col.startswith('_')]
self.np_datas = self.df_pilot[self.l_columns].values
self.d_cost = d_args.get('cost', {}) # Arguments pour la fonction de coût.
self.nb_rows = self.np_datas.shape[0]
self.np_trades = np.zeros((self.nb_rows, 11), dtype=np.float32)
""" |_ Colonnes de self.np_trades : 0 : gains (>0 ou <0) clôturés, en pips (trades fermés). cumulé
1 : gains en cours d'évolution, en pips (trades ouverts). cumulé
2 : gains total (clôturés + en évolution), en pips. cumulé
3 : gains (>0 ou <0) clôturés, en monnaie (€ ou $). cumulé
4 : gains en cours d'évolution (€ ou $). cumulé
5 : gains total (clôturés + en évolution) (€ ou $). cumulé
6 : Flag d'ouverture (-1/+1), effacé à la fermeture.
7 : marks d'ouverture (-1/+1), permanent.
8 : indx de fermeture
9 : gain du trade, en pips
10 : gain du trade, en monnaie (€ ou $) """
indx = 0
while np.isnan(np.sum(self.np_datas[indx])):
""" Recherche de l'index de la 1ère ligne où aucune valeur n'est nan. """
indx += 1
if indx >= self.nb_rows - 1:
break
d_args['indx'] = indx # <-- Ajout de cette information dans les arguments.
return self.cost(**d_args)
def cost(self, **d_params):
"""
@param d_params: paramètres spécifiques à cette stratégie : stop-loss, take-profit, seuils divers, etc.
@return: dépend du code appelant -> (Calcul du gain + Marqueurs) ou (Process d'algorithme génétique).
- Si 'Gain + Marqueurs' : tuple de 2 DataFrames : (df_gains, df_marks).
- Si 'GA' : -1 * Gain. Le -1 s'explique par le fait que le ga cherche à minimiser le résultat.
"""
""" Calculs. """
spread = self.d_cost.get('spread', 2)
l_open_indx = np.where(self.np_trades[:, 7] != 0)[0] # Liste des index d'ouverture de trades.
ll_marks = list()
balance = self.d_cost.get('capital', 1000) # Capital initial.
self.np_trades[0, 3] = balance
risk = self.d_cost.get('risk', 1)
for open_indx in l_open_indx:
l_trade = self.np_trades[open_indx]
close_indx = int(l_trade[8])
if close_indx < open_indx:
continue
coef = l_trade[7] # +1 ou -1 <-- achat ou vente.
price = self.np_datas[open_indx, 0]
gains = (self.np_datas[close_indx, 0] - price) * coef - spread
np_gains = (self.np_datas[open_indx: close_indx, 0] - price) * coef
""" Currencies (€ ou $) clôturés. """
curr = gains * balance * risk * self.pips # Currencies (€ ou $).
self.np_trades[open_indx, 10] = curr
self.np_trades[close_indx, 3] += curr
balance += curr
""" Currencies (€ ou $) ouverts. """
self.np_trades[open_indx: close_indx, 4] += np_gains * balance * risk * self.pips
""" Marqueurs. """
op = 'achat' if l_trade[7] == 1 else 'vente'
ll_marks.append([open_indx, price, f'Ouvre {op}'])
close_indx = int(l_trade[8])
if close_indx > 0:
ll_marks.append([close_indx, self.np_datas[close_indx, 0], f'Ferme {op}'])
self.np_trades[:, 0] = np.nancumsum(self.np_trades[:, 0])
self.np_trades[:, 2] = self.np_trades[:, 0] + self.np_trades[:, 1]
self.np_trades[:, 3] = np.cumsum(self.np_trades[:, 3])
self.np_trades[:, 5] = self.np_trades[:, 3] + self.np_trades[:, 4]
""" Anti-chevauchement des marqueurs. """
ll_marks.sort()
i, indx_ante = 4, 0
for l_mark in ll_marks:
if l_mark[0] == indx_ante:
l_mark[1] += i
else:
indx_ante = l_mark[0]
self.df_marks = pd.DataFrame(data=ll_marks, columns=['Index', 'Close', 'Type'])
return pd.DataFrame(data=self.np_trades[:, :6],
columns=['Close_p', 'Open_p', 'All_p', 'Close_c', 'Open_c', 'All_c'])
def bt_stats(self):
""" bt_stats = Statistiques des backtests. """
dt = DateTime()
stamp_from = self.df_pilot.iloc[0, 0]
stamp_to = self.df_pilot.iloc[-1, 0]
nb_days = max(1, round((stamp_to - stamp_from) / 86_400))
_from = dt.get_dtstr_from_dtstamp(stamp_from, dt_format='%d/%m/%Y')
_to = dt.get_dtstr_from_dtstamp(stamp_to, dt_format='%d/%m/%Y')
capital_ini = round(float(self.np_trades[0, 5]), 2)
capital_min = round(float(np.min(self.np_trades[:, 5])), 2)
crashed = '← crashed !' if capital_min < 0 else ''
capital_max = round(float(np.max(self.np_trades[:, 5])), 2)
capital_final = round(float(self.np_trades[-1, 5]), 2)
perf = round(100 * (capital_final - capital_ini) / capital_ini, 2)
nb_buy = np.sum(self.np_trades[:, 7] == 1)
nb_sell = np.sum(self.np_trades[:, 7] == -1)
nb_winner = np.sum(self.np_trades[:, 9] > 0)
nb_loser = np.sum(self.np_trades[:, 9] < 0)
mask = self.np_trades[:, 10] != 0
l_trades = self.np_trades[:, 10][mask]
win_consec = lose_consec = win_consec_max = lose_consec_max = 0
score_win = score_lose = 0
if len(l_trades) > 0:
score_ante = l_trades[0]
for score in l_trades:
if score > 0:
score_win += score
else:
score_lose += score
if score > 0 and score_ante > 0:
win_consec += 1
win_consec_max = max(win_consec_max, win_consec)
elif score < 0 and score_ante < 0:
lose_consec += 1
lose_consec_max = max(lose_consec_max, lose_consec)
else:
win_consec = lose_consec = 0
score_ante = score
if nb_winner > 0:
score_win /= nb_winner
if nb_loser > 0:
score_lose /= nb_loser
pips_max_indx = np.argmax(self.np_trades[:, 9])
pips_max = round(float(self.np_trades[pips_max_indx, 9]), 2)
stamp = self.df_pilot.iloc[pips_max_indx, 0]
pips_max_dt = dt.get_dtstr_from_dtstamp(stamp, dt_format='%d/%m/%Y')
pips_max_indexes = f'index de {pips_max_indx} à {int(self.np_trades[pips_max_indx, 8])}'
pips_min_indx = np.argmin(self.np_trades[:, 9])
pips_min = round(float(self.np_trades[pips_min_indx, 9]), 2)
stamp = self.df_pilot.iloc[pips_min_indx, 0]
pips_min_dt = dt.get_dtstr_from_dtstamp(stamp, dt_format='%d/%m/%Y')
pips_min_indexes = f'index de {pips_min_indx} à {int(self.np_trades[pips_min_indx, 8])}'
curr_max_indx = np.argmax(self.np_trades[:, 10])
curr_max = round(float(self.np_trades[curr_max_indx, 10]), 2)
stamp = self.df_pilot.iloc[curr_max_indx, 0]
curr_max_dt = dt.get_dtstr_from_dtstamp(stamp, dt_format='%d/%m/%Y')
curr_max_indexes = f'index de {curr_max_indx} à {int(self.np_trades[curr_max_indx, 8])}'
curr_min_indx = np.argmin(self.np_trades[:, 10])
curr_min = round(float(self.np_trades[curr_min_indx, 10]), 2)
stamp = self.df_pilot.iloc[curr_min_indx, 0]
curr_min_dt = dt.get_dtstr_from_dtstamp(stamp, dt_format='%d/%m/%Y')
curr_min_indexes = f'index de {curr_min_indx} à {int(self.np_trades[curr_min_indx, 8])}'
g_max = -10**8
drawdown = 0
curr_dwn_indx = 0
for i, g in enumerate(self.np_trades[:, 2]):
g_max = max(g_max, g)
if (g_max - g) > drawdown:
curr_dwn_indx = i
drawdown = g_max - g
stamp = self.df_pilot.iloc[curr_dwn_indx, 0]
curr_dwn_dt = dt.get_dtstr_from_dtstamp(stamp, dt_format='%d/%m/%Y')
""" Contexte. """
total_pips = round(float(np.sum(self.np_trades[:, 9])), 2)
od_pilot = self.central_args('pilot')
instrument = od_pilot['instrument']
table = od_pilot.read('table')
pc_from = od_pilot.read(['test', 'pc_from'])
pc_to = od_pilot.read(['test', 'pc_to'])
nb_rows = self.df_pilot.shape[0]
typ = f'Renko {table}' if isinstance(table, int) else f'Candles-{table}' if isinstance(table, str) else 'Ticks'
if pc_from is None:
points = f"{nb_rows} points jusqu'à {pc_to}% de la table en base de données."
elif pc_to is None:
points = f"{nb_rows} points à partir de {pc_from}% de la table en base de données."
else:
points = f"{nb_rows} points entre {pc_from}% et {pc_to}% de la table en base de données."
result = "Statistiques :" \
"\n=============" \
f"\nContexte : {instrument} - {typ} - {points}" \
f"\nPériode : {_from} -> {_to}" \
f"\nNb jours: {nb_days}" \
f"\nGain journalier: {round((capital_final - capital_ini) / nb_days, 2)} €." \
f"\n\nCapital initial: {capital_ini} €" \
f"\nCapital min: {capital_min} € {crashed}" \
f"\nCapital max: {capital_max} €" \
f"\nCapital final: {capital_final} €" \
f"\nPerformance: {perf} %" \
f"\nGain en pips: {total_pips} pips" \
f"\n\nNb trades à l'achat: {nb_buy}" \
f"\nNb trades à la vente: {nb_sell}" \
f"\nNb trades gagnants: {nb_winner}" \
f"\nNb trades perdants: {nb_loser}" \
f"\nNb max gagnants consécutifs: {win_consec_max}" \
f"\nNb max perdants consécutifs: {lose_consec_max}" \
f"\n\nGain moyen trades gagnants: {round(score_win, 2)} €" \
f"\nGain moyen trades perdants: {round(score_lose, 2)} €" \
f"\nMeilleur trade (pips): {pips_max} pips, le {pips_max_dt} ({pips_max_indexes})" \
f"\nMeilleur trade (monnaie): {curr_max} €, le {curr_max_dt} ({curr_max_indexes})" \
f"\nPire trade (pips): {pips_min} pips, le {pips_min_dt} ({pips_min_indexes})" \
f"\nPire trade (monnaie): {curr_min} €, le {curr_min_dt} ({curr_min_indexes})" \
f"\nPerte max (drawdown): {round(drawdown)} pips le {curr_dwn_dt} (index {curr_dwn_indx})" \
"\n-------------------------------------------------------------------------------"
print(result)
def closures(self, close_indx):
""" Fermetures, options possibles :
- Modifier le code pour permettre des fermetures partielles.
|_ https://www.google.fr/search?q=trading+fermetures+partielles
- Modifier le code dans {close_cond()} de la classe dérivée pour permettre les stops suiveurs.
|_ https://www.google.fr/search?q=trading+stop+suiveur
"""
spread = self.d_cost.get('spread', 2)
b_end = close_indx >= self.nb_rows - 1 # Flag de fermeture de tous les résiduels.
for open_indx in np.where(self.np_trades[:, 6] != 0)[0]: # Liste des indx des trades ouverts (flags col 6).
""" Liste des indx trades ouverts. Le flag6 (col 6) est obligatoirement +1 ou -1. """
if b_end or self.close_cond(open_indx): # Fonction dans la classe dérivée.
self.np_trades[open_indx, 6] = 0 # RAZ flag.
self.np_trades[open_indx, 8] = close_indx # Index de fermeture
coef = self.np_trades[open_indx, 7]
price = self.np_datas[open_indx, 0]
gains = (self.np_datas[close_indx, 0] - price) * coef - spread
np_gains = (self.np_datas[open_indx: close_indx, 0] - price) * coef
self.np_trades[open_indx, 9] = gains # 9 : gain du trade, en pips
self.np_trades[close_indx, 0] += gains # 0 : gains (>0 ou <0) clôturés, en pips.
self.np_trades[open_indx: close_indx, 1] += np_gains # 1 : gains en cours d'évolution, en pips.
def opening(self, op, indx):
""" 2 flags d'ouverture (-1/+1) : col 6 -> effacé à la fermeture, col 7 -> permanent. """
self.np_trades[indx, 6: 8] = 1 if op == 'buy' else -1 # 1='buy', -1='sell'.
""" Voir colonnes de self.np_trades dans self.fitness() """
def cond(self, txt, indx):
""" Exemple : txt = 'AO-DRV cu 0' signifie : L'indicateur AO-DRV croise vers le haut (cross-up) la valeur 0.
Opérateurs disponibles : /, \\, --, v, ^, cu, cd, ==, !=, >, >=, <, <= """
def iscolumn(column_name):
if self.txt_is_number(column_name):
return True
if column_name in self.l_columns:
return True
Utils.printc(f"Classe GeneticAlgorithm, méthode 'cond()' :\nLa colonne '{column_name}' n'existe pas.")
return False
def col(col_name):
return self.l_columns.index(col_name)
l_c = txt.split() # Séparateur = ' ' (espace).
left = l_c[0] # str
op = l_c[1] # str
if not iscolumn(left):
return False
if len(l_c) == 2:
""" /, \\, --, v, ^ """
col = col(left)
if op in ['/', '\\', '--'] and indx > 0:
""" Pente positive (/), négative (\\) ou nulle(--). """
sign_1, sign_0 = self.np_datas[indx - 1, col], self.np_datas[indx, col]
return sign_0 > sign_1 if op == '/' else sign_0 == sign_1 if op == '--' else sign_0 < sign_1
elif op in ['v', '^'] and indx > 1:
""" Les 3 points précédents forment un creux (v) ou un sommet (^). """
sign_2, sign_1, sign_0 = (self.np_datas[indx - 2, col], self.np_datas[indx - 1, col],
self.np_datas[indx, col])
return sign_1 < sign_2 and sign_1 < sign_0 if op == 'v' else sign_1 > sign_2 and sign_1 > sign_0
pass
elif len(l_c) == 3:
""" cu, cd, ==, !=, >, >=, <, <= """
right = l_c[2]
if not iscolumn(right):
return False
if op in ['cu', 'cd']:
""" Cross up & cross down : Croisements à la hausse ou à la baisse. """
col_moved = left if self.txt_is_number(left) else col(left) # Valeur en str ou N° col.
col_ref = right if self.txt_is_number(right) else col(right)
way = 'up' if op == 'cu' else 'down'
return self.cross(way, indx, col_moved, col_ref)
elif op in ['==', '!=', '>', '>=', '<', '<=']:
""" Comparaisons. """
left = float(left) if self.txt_is_number(left) else self.np_datas[indx, col(left)]
right = float(right) if self.txt_is_number(right) else self.np_datas[indx, col(right)]
if op == '==':
return left == right
elif op == '!=':
return left != right
elif op == '>':
return left > right
elif op == '>=':
return left >= right
elif op == '<':
return left < right
else: # '<='
return left <= right
return False
@staticmethod
def txt_is_number(text):
""" {text} est une chaîne ET représente un nombre (int ou float). """
if isinstance(text, str):
try:
float(text)
return True # Est une chaîne ET représente un nombre => True.
except (Exception,):
return False # Est une chaîne mais ne représente pas un nombre => False.
else:
return False # N'est pas une chaîne => False.
def cross(self, way, indx, col_moved, col_ref):
if self.txt_is_number(col_ref):
ref_now = ref_ante = float(col_ref)
else:
ref_now, ref_ante = self.np_datas[indx, col_ref], self.np_datas[indx - 1, col_ref]
if self.txt_is_number(col_moved):
mov_now = mov_ante = int(col_moved)
else:
mov_now, mov_ante = self.np_datas[indx, col_moved], self.np_datas[indx - 1, col_moved]
if ref_now == mov_now:
return False
i = 2
while ref_ante == mov_ante:
ref_ante = float(col_ref) if self.txt_is_number(col_ref) else self.np_datas[indx - i, col_ref]
mov_ante = float(col_moved) if self.txt_is_number(col_moved) else self.np_datas[indx - i, col_moved]
i += 1
b_cross = (ref_now - mov_now) * (ref_ante - mov_ante) < 0
return (mov_now > ref_now) == (way == 'up') if b_cross else False
poly
, créé pour la circonstance, est à placer dans les indicateurs personnalisés./functions/custom_ta/trend/poly.py
:
""" Version 2022-04-22 """
# Imports externes.
import numpy as np
import pandas as pd
from scipy.signal import argrelextrema
import seaborn as sns
def poly(close, length, delay, deg, **kwargs):
deg = min(4, max(1, deg)) # degré limité de 1 à 4.
nb_rows = close.shape[0]
np_datas = np.full((nb_rows, 2), np.nan)
np_datas[:, 0] = np.array(close.index)
np_datas[:, 1] = close.values
np_traces = np.full((nb_rows, 2), np.nan)
""" Sommets (t_top) du signal d'entrée (close). """
idx_top = argrelextrema(np_datas[:, 1], np.greater_equal)[0].astype(np.int)
np_top = np.zeros((idx_top.size, deg+3))
np_top[:, :2] = np_datas[idx_top] # datas filtrés -> 2 colonnes : 0 (indx) et 1 (val sommets)
for p in range(length, np_top.shape[0]):
np_idx_roll = np_top[p-length:p, 0]
np_val_roll = np_top[p-length:p, 1]
np_top[p, 2:] = np.polyfit(np_idx_roll, np_val_roll, deg)
indx_ante, indx = int(np_top[p-1, 0])+delay, int(np_top[p, 0])+delay
if indx >= nb_rows+1:
break
""" Trace du dernier point, pour le graphique. """
if p > length:
np_x = np.arange(indx_ante, indx).astype(np.int64) # int64, car {np_x} sera élévé à la puissance {deg}.
np_traces[indx_ante: indx, 0] = 0
for i in range(deg+1): # Pour deg==3 => ax^3 + bx^2 + cx + d
np_traces[indx_ante: indx, 0] += np_top[p-1, 2+i] * np_x**(deg-i)
""" Creux (t_bot) du signal d'entrée (close). """
idx_bot = argrelextrema(np_datas[:, 1], np.less_equal)[0].astype(np.int)
np_bot = np.zeros((idx_bot.size, deg+3))
np_bot[:, :2] = np_datas[idx_bot] # datas filtrés -> 2 colonnes : 0 (indx) et 1 (val creux)
for p in range(length, np_bot.shape[0]):
np_idx_roll = np_bot[p-length:p, 0]
np_val_roll = np_bot[p-length:p, 1]
np_bot[p, 2:] = np.polyfit(np_idx_roll, np_val_roll, deg)
indx_ante, indx = int(np_bot[p-1, 0])+delay, int(np_bot[p, 0])+delay
if indx >= nb_rows+1:
break
""" Trace du dernier point, pour le graphique. """
if p > length:
np_x = np.arange(indx_ante, indx).astype(np.int64)
np_traces[indx_ante: indx, 1] = 0
for i in range(deg+1): # Pour deg==3 => ax^3 + bx^2 + cx + d
np_traces[indx_ante: indx, 1] += np_bot[p-1, 2+i] * np_x**(deg-i)
return pd.DataFrame(np_traces, columns=['Trace_H', 'Trace_L']), np_top, np_bot
def poly_method(self, length=10, delay=1, deg=2, **kwargs):
""" Méthode appelée uniquement en mode "Pandas TA DataFrame Extension" (pas en mode "Standerd"). """
close = self._get_column(kwargs.pop("close", "Close"))
result = poly(close=close, length=length, delay=delay, deg=deg, **kwargs)
return self._post_process(result, **kwargs)
def plot(ax, l_x, np_top, np_bot, **kwargs):
""" Affichage dynamique des courbes polynomiales. """
length = 4*kwargs.get('length', 6)
deg = np_top.shape[1] - 3
delay = kwargs.get('delay', 5)
x = l_x[-1]
np_x = np.array(l_x[-length:], dtype=np.int64)
""" Recherche du batch-top concerné par np_x[-1] """
p = np.where(np_top[:, 0] <= x-delay)[0][-1]
coefs = np_top[p, 2:]
np_y_top = np.zeros((length,))
for i in range(deg + 1):
np_y_top += coefs[i] * np_x**(deg-i)
""" Recherche du batch-bot concerné par np_x[-1] """
p = np.where(np_bot[:, 0] <= x-delay)[0][-1]
coefs = np_bot[p, 2:]
np_y_bot = np.zeros((length,))
for i in range(deg + 1): # Pour deg==3 => y = ax^3 + bx^2 + cx + d
np_y_bot += coefs[i] * np_x**(deg-i)
""" Tracés dynamiques. """
sns.lineplot(x=np_x, y=np_y_top, ax=ax, legend=False, **kwargs.get('Trace_H'))
sns.lineplot(x=np_x, y=np_y_bot, ax=ax, legend=False, **kwargs.get('Trace_L'))
poly.plot = plot
poly.__doc__ = """
La fonction 'poly()' de cet indicateur est la fonction standard.
La fonction 'plot()' de cet indicateur peut être appelée pour tracer les segments de droite à la volée.
|_ Sur un graphique animé, l'appel se fera à chaque rafraîchissement de l'affichage, dans un "hook axis".
Fonction poly() :
==================
Ligne du haut, ligne du bas. Pour chacune d'elles, 3 points d'abscisse importants -> {x_m}, {x_n} et {x_présent} :
- Le batch étudié va de {x_m} à {x_n}, suivi d'un délai : l'ensemble {x_n, x_présent}, ignoré dans les calculs.
- La longueur du batch est le paramètre {length}.
|_ La longueur du segment tracé est celle du batch, à savoir {x_m} à {x_n}.
- La taille du délai est le paramètre {delay}.
- Le degré de la fonction est le paramètre {deg}. Les valeurs admises sont 1, 2, 3 et 4.
|_ 1er degré : y = ax + b, 2ème degré : y = ax^2 + bx + c, etc.
Paramètres :
------------
length : Valeur par défaut : 10
delay : Valeur par défaut : 1
deg : Valeur par défaut : 2
Return :
--------
Tuple à 3 valeurs : (pd.DataFrame ['Trace_H', 'Trace_L'], sommets (ndarray), creux (ndarray)).
Fonction plot():
================
- Cette fonction doit être appelée à chaque rafraîchissement du graphique, car le tracé est dynamique.
- Le code appelant est dans la méthode hook_axis_anim(), la syntaxe d'appel est : ta.poly.plot(...)
|_ Voir exemple de code d'appel ci-dessous.
- Elle permet d'afficher des courbes relatives aux sommets et aux creux.
- L'extrémité droite de ces courbes est le présent et coïncident avec les traces fournies par l'indicateur.
Paramètres :
------------
ax : objet {axis} matplotlib : graphique dans lequel seront affichées les courbes.
l_x = liste des abscisses.
np_top et np_bot : Les sommets et creux fournis dans un tuple par la fonction standard (voir 'Return' au dessus).
kwargs : dictionnaire d'arguments supplémentaires.
Exemple de code d'appel :
-------------------------
if axis_name == 'Principal':
# Remarque : {self.np_poly_top} et {self.np_poly_bot} doivent avoir été affectés dans la méthode post_params().
self.od_params.write('Trace_H', dict(color='b', lw=.6, ls='-'))
self.od_params.write('Trace_L', dict(color='darkorange', lw=.6, ls='-'))
ta.poly.plot(ax=o_ax, l_x=l_x, np_top=self.np_poly_top, np_bot=self.np_poly_bot, **self.od_params)
"""
/signaux_dynamiques
, /signaux_dynamiques/dyn_01_polyfit
ainsi que le fichier suivant./trading/strategies/signaux_dynamiques/dyn_01_polyfit/main_01.py
:
""" Version 2022-04-02 """
# Imports externes
import pandas_ta as ta # Pour afficher la documentation d'un indicateur -> help(ta.kama)
# import seaborn as sns
# Imports internes
from functions.utils import Dictionary
from show.show_geek import ShowGeek
# noinspection PyTypeChecker,PyUnresolvedReferences
class Geek(ShowGeek):
def __init__(self):
self.np_poly_top = None
self.np_poly_bot = None
self.od_params = None
super().__init__()
def central_args(self, l_keys):
od_args = Dictionary(dict(
# ************************************ FENÊTRE ************************************
ui=dict(
geometry=(200, 40, 1_000, 700),
abscissa_size=200,
window_title="Signaux dynamiques 01 - Polyfit",
figure_title="Régression polynomiale dynamique",
subplots=dict(
Principal=60,
Milieu='',
# Bas='',
),
# show_volume=True,
# animation=False,
interval=30,
),
# ******************************** DATAFRAME PILOTE *******************************
pilot=dict(
instrument='EUR/USD',
table=7,
test=dict(pc_from=None, pc_to=100, nb_rows=20_001),
),
# ********************************** INDICATEURS **********************************
indics=dict(
backtest=False,
stats=False,
poly=dict( # ... indicateurs
function=ta.poly,
series=['Trace_H', 'Trace_L'],
params=dict(
length=14,
delay=1,
deg=3
),
),
custom=dict(
function=self.custom,
params=dict(),
),
),
# *********************************** AFFICHAGE ***********************************
# color : 'r', 'g', 'b', 'y', 'm', 'c', 'w', 'k' - Voir aussi matplotlib named_colors.
# ls = '-', '--', '-.', ':' ('' = invisible) - Voir matplotlib linestyles.
# legend, 2 lettres : [l|c|r][h|m,b], ou bien False.
display=dict(
Principal=dict(
lines=dict(
# Trace_H=dict(color='b', lw=.6, ls=':'),
# Trace_L=dict(color='darkorange', lw=.6, ls=':'),
),
color_between=[ # (Ctrl + clic) https://matplotlib.org/stable/gallery/color/named_colors.html
# dict(y1='Trace_H', y2='Trace_L', color_up='#0a01', color_down='#00a1'),
],
# legend='lm',
),
Milieu=dict(
lines=dict(
# ...
),
),
)
))
return Dictionary(od_args.read(l_keys, {}))
def custom(self, **kwargs):
pass
def post_params(self):
""" Affichage des peaks. """
t_added = self.get_d_added('poly')
if t_added is not None:
o_ax = self.o_ax('Principal')
self.np_poly_top, self.np_poly_bot = t_added
np_top, np_bot = self.np_poly_top[:, :2], self.np_poly_bot[:, :2]
# sns.scatterplot(x=np_top[:, 0], y=np_top[:, 1], ax=o_ax)
# sns.scatterplot(x=np_bot[:, 0], y=np_bot[:, 1], ax=o_ax)
""" {self.od_params} utilisé en boucle dans hook_axis_anim() (ci-après). """
self.od_params = self.central_args(['indics', 'poly', 'params'])
ret = super().post_params() # Code avant -> inséré, code après -> ajouté.
return ret
def hook_axis_anim(self, axis_name, l_x, o_ax, df, y_min, y_max, get_line):
# if axis_name == 'Principal':
# """ {self.np_poly_top} et {self.np_poly_bot} doivent avoir été affectés dans la méthode post_params(). """
# self.od_params.write('Trace_H', dict(color='b', lw=.6, ls='-'))
# self.od_params.write('Trace_L', dict(color='darkorange', lw=.6, ls=''))
# ta.poly.plot(ax=o_ax, l_x=l_x, np_top=self.np_poly_top, np_bot=self.np_poly_bot, **self.od_params)
return super().hook_axis_anim(axis_name, l_x, o_ax, df, y_min, y_max, get_line)
if __name__ == '__main__':
Geek()
L'explication est exposée pas à pas. Ce point de départ ne représente que le signal pilote, à savoir les 20 000 derniers points d'un Renko de maille 7.
seaborn
.
deg=1
hook_axis_anim()
.
L'indicateur poly
calcule la droite de régression linéaire sur les derniers points du graphique affiché.
np_top
et np_bot
.plot()
, appellée dynamiquement, ces coefficients sont utilisés pour synthétiser la courbe en fonction de la dernière valeur x
de l'abscisse du graphique.scatterplot
de post_param()
.ls
dans le 2ème od_params
de hook_axis_anim()
, par exemple ls='-'
.
Les sommets et les creux sont affichés pour la compréhension.
scatterplot()
dans post_params()
).Trace_L
.dict
dans color_between.
Les segments de droite se comportent comme des stylets laissant une trace tout à droite. Avancer manuellement pour mieux observer.
deg
de l'indicateur.
Régression polynomiale de degré 3 : y = ax^3 + bx^2 + cx + d
Seuls les coefficients a, b, c et d
de chaque courbe ont été mémorisés par l'indicateur.
La fonction plot()
, appellée à chaque affichage, synthétise les courbes avec ces coefficients.
Chaque combinaison de droites prétend à des prédictions sur l'évolution du marché.
lines
./functions/custom_ta/trend/lines.py
:
""" Version 2022-04-22 """
# Imports externes.
import numpy as np
import pandas as pd
from scipy import stats
from scipy.signal import argrelextrema
import seaborn as sns
def lines(close, delay, **kwargs):
shape = close.shape[0], 9 # 0=indx, 1=close, 2=Trace_H, 3=Pente_H, 4=X_H, 5=Trace_L, 6=Pente_L, 7=X_L, 8=Stats
np_canals = np.full(shape, np.nan, dtype=np.float32)
np_canals[:, 0] = np.array(close.index) # x : Vecteur : [100 101 102 103 ... 297 298 299] <-- exemple.
np_canals[:, 1] = close.values
""" *************** 1ère partie : Création des lignes délimitant les bandes. **************** """
delay = max(1, delay)
max_et = .2 # Indépendant de la maille.
""" Sommets (t_top) du signal d'entrée (close). """
idx_top = argrelextrema(np_canals[:, 1], np.greater)[0]
np_top = np_canals[:, :2][idx_top] # 2 colonnes : 0 (indx) et 1 (sommets)
""" Creux (t_bottom) du signal d'entrée (close). """
idx_bottom = argrelextrema(np_canals[:, 1], np.less)[0]
np_bottom = np_canals[:, :2][idx_bottom] # 2 colonnes : indx(0) et creux(1)
""" Parcours en 2 passes : k==0 -> 1ère passe (np_peak==np_top), k==1 -> 2ème passe (np_peak==np_bottom) """
for k, np_peak in enumerate([np_top, np_bottom]):
""" Parcours des peaks (extremums). """
for indx in range(15, np_peak.shape[0]):
np_batch, a, b, i = None, 0., 0., 5
while True:
i += 1
""" Segment à tester, de plus en plus grand : fin fixe, début décroissant. """
b_cont = i > indx
if b_cont:
break
np_batch = np.copy(np_peak[indx - i: indx, :2])
""" Modèle de régression linéaire sur le nuage de points {(x, y)} de _np_batch. """
res = stats.linregress(np_batch)
a, b = res.slope, res.intercept
""" Synthèse de la droite de régression linéaire, sur {i} points. """
np_x = np_batch[:, 0].astype(np.int)
np_y = a * np_x + b
""" Mise 'à l'horizontale' du nuage, par rotation des {y}, valeur moyenne des y = 0. """
np_horiz = np_batch[:, 1] - np_y
""" Filtrage du batch. """
mask = (np_horiz > 0) if k == 0 else (np_horiz < 0)
mask[-2:] = True # Les 2 derniers points ne sont pas masqués.
np_batch = np_batch[mask]
if np_batch.shape[0] < 3:
continue
res = stats.linregress(np_batch)
a, b = res.slope, res.intercept
np_x = np_batch[:, 0].astype(np.int)
np_y = a * np_x + b
np_horiz = np_batch[:, 1] - np_y
""" Le dernier point ajouté est le 1er de la liste. Est-il abérrant ? si oui, fin de boucle. """
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.zscore.html
l_outliers = stats.zscore(np_horiz) # l_outliers[0] = Dernier point.
if (l_outliers[0] < -max_et and a <= 0) or (l_outliers[0] > max_et and a >= 0):
break
if b_cont:
continue
""" Mémorisation dans np_canals. """
first_x = np_batch[0, 0]
last_x = np_batch[-1, 0].astype(int) + delay
next_x = np_peak[indx, 0].astype(int) + delay
np_trace = a * np_canals[last_x: next_x, 0] + b
np_canals[last_x: next_x, 2 + 3 * k] = np_trace # cols 2 et 5 : ndarray de {last_x} à {next_x}.
np_canals[last_x: next_x, 3+3*k: 5+3*k] = a, first_x # 3-4 et 6-7 : Même valeurs de {last_x} à {next_x}.
""" *************** 2ème partie : Statistiques. *************** """
if kwargs.get('statistics', True):
maille = max(1, np.round(np.nanmean(abs(np_canals[-10:, 1] - np_canals[-11: -1, 1]))).astype(int))
length_peaks = 100
for x, row in enumerate(np_canals):
""" https://www.centralcharts.com/fr/gm/1-apprendre/7-analyse-technique/27-figure-chartiste """
if np.isnan(np.sum(row[2: 8])):
continue
signal_now = row[1]
np_y = np.array([row[2], row[5]]) # [2] et [5] = traces (y, dans l'équation b = y-ax)
np_a = np.array([row[3], row[6]])
np_b = np_y - np_a * x
np_x_ante = np.array([row[4], row[7]]).astype(int) # first_x : permet de calculer la longueur de la ligne.
np_y_ante = np_a * np_x_ante + np_b
""" Calcul de b_top_ante : Le signal anté est-il supérieur au point de début de la plus longue ligne ? """
indx = 0 if np_x_ante[0] < np_x_ante[1] else 1 # indx de la ligne la plus longue.
_from, _to = max(0, np_x_ante[indx] - length_peaks), np_x_ante[indx] # Range des points anté à étudier.
argmin = _from + np.argmin(np_canals[_from: _to, 1]) # Indice de la plus petite valeur.
argmax = _from + np.argmax(np_canals[_from: _to, 1]) # Indice de la plus grande valeur.
y_ante = np_y_ante[indx] # Valeur du point de début de la plus longue ligne.
opti = max(argmax, argmin) # Indx le plus proche du canal.
""" Validation de opti : le signal ne doit pas être dans le range du canal. Le cas échéant, on inverse. """
if np.min(np_y_ante) <= np_canals[opti, 1] <= np.max(np_y_ante):
opti = min(argmax, argmin) # Indx le plus éloigné du canal.
y_peak_ante = np_canals[opti, 1]
b_top_ante = y_peak_ante > y_ante
epsilon = 3 / maille # Pente limite.
b_top = (np_y[0] - signal_now) < (signal_now - np_y[1]) # Signal présent plus près du top que du bottom.
""" Cluster : composé de 5 lettres : [a|h|d] [a|h|d] [x|q|i] [t|b] [t|b] => 3 x 4 x 2 x 2 = 108 clusters.
Lettr : [np_a[0](pente-haut), np_a[1](pente-bas), np_a[1](par rapport à np_a[0]), b_top_ante, b_top_now] """
sign = 'a' if np_a[0] > epsilon else ('d' if np_a[0] < -epsilon else 'h') # (a)scend, (d)escend, (h)oriz.
sign += 'a' if np_a[1] > epsilon else ('d' if np_a[1] < -epsilon else 'h') # (a)scend, (d)escend, (h)oriz.
if abs(np_a[1]) > abs(np_a[0]) + epsilon:
sign += 'x' # a1 e(x)terne à a0 => abs(pente-bas) > abs(pente-haut) + ε
elif abs(np_a[1]) < abs(np_a[0]) - epsilon:
sign += 'i' # a1 (i)nterne à a0 => abs(pente-bas) < abs(pente-haut) - ε
else:
sign += 'q' # Parallèles ou symétriques : ~e(q)ual à a0 => abs(pente-bas) ~= abs(pente-haut)
sign += 't' if b_top_ante else 'b'
sign += 't' if b_top else 'b'
stat = float(sign.replace('a', '1').replace('h', '2').replace('d', '3')
.replace('x', '4').replace('q', '5').replace('i', '6').replace('t', '7').replace('b', '8'))
row[8] = stat # float pour pouvoir être porté par {np_canals}.
return pd.DataFrame(np_canals[:, 2:],
columns=['Trace_H', 'Pente_H', 'X_H', 'Trace_L', 'Pente_L', 'X_L',
'Stats']), np_top, np_bottom
def plot(ax, df, x, **kwargs):
""" Tracé dynamique - kwargs -> 1, 2 ou 3 clés : line_h, line_l et pattern_verbose (respecter l'orthographe). """
""" Listes à 2 valeurs : [top, bottom] """
col_trace = [kwargs.get('line_h', {}).get('col_trace'), kwargs.get('line_l', {}).get('col_trace')] # trace.
col_pente = [kwargs.get('line_h', {}).get('col_pente'), kwargs.get('line_l', {}).get('col_pente')] # pente.
col_x = [kwargs.get('line_h', {}).get('col_x'), kwargs.get('line_l', {}).get('col_x')]
color = [kwargs.get('line_h', {}).get('c', 'b'), kwargs.get('line_l', {}).get('c', 'darkorange')]
lw = [kwargs.get('line_h', {}).get('lw', 1), kwargs.get('line_l', {}).get('lw', 1)]
ls = [kwargs.get('line_h', {}).get('ls', '-.'), kwargs.get('line_l', {}).get('ls', '-.')]
""" 2 passes : i --> 0=top, 1=bottom. """
trace = [[], []]
for i in range(2):
trace[i] = df.loc[x[-1], col_trace[i]]
if np.isnan(trace[i]): # 282 - 11858
continue
a = df.loc[x[-1], col_pente[i]]
b = trace[i] - a * x[-1]
first_x = df.loc[x[-1], col_x[i]].astype(int)
np_x = np.array(range(first_x, x[-1] + 1))
np_y = a * np_x + b
""" Ne pas afficher une ligne inexistante dans kwargs. """
sns.lineplot(x=np_x, y=np_y, ax=ax, legend=False, color=color[i], lw=lw[i], ls=ls[i])
if kwargs.get('pattern_verbose', True):
""" https://www.centralcharts.com/fr/gm/1-apprendre/7-analyse-technique/27-figure-chartiste/ <-- suite ...
488-trading-avec-les-figures-chartistes <-- ... ici. """
col_type = kwargs.get('Stats', 'Stats')
sign = df.loc[x[-1], col_type]
if np.isnan(sign):
return
sign = str(int(df.loc[x[-1], col_type]))
sign = sign.replace('1', 'a').replace('2', 'h').replace('3', 'd').replace('4', 'x')
sign = sign.replace('5', 'q').replace('6', 'i').replace('7', 't').replace('8', 'b')
""" Cluster : composé de 5 lettres : [a|h|d] [a|h|d] [x|q|i] [t|b] [t|b] => 3 x 4 x 2 x 2 = 108 clusters.
Lettres : [np_a[0](pente-haut), np_a[1](pente-bas), np_a[1](par rapport à np_a[0]), b_top_ante, b_top_now] """
txt, typ = '', 0
if sign[:2] == 'hh':
if sign[-1] == 'b':
txt = "Canal horizontal :\nSortie à la baisse"
else:
txt = "Canal horizontal :\nSortie à la hausse"
elif sign[:3] == 'aaq':
txt = "Canal haussier"
if sign[-2:] == 'bt':
txt += " :\nSortie à la hausse"
elif sign[-2:] == 'tb':
txt = "Drapeau ascendant :\nSortie à la baisse"
elif sign == 'ddptb':
txt = "Canal baissier :\nSortie à la baisse"
elif sign[:2] == 'ha':
txt = "Triangle ascendant"
if sign[-2:] == 'bt':
txt += " :\nSortie à la hausse"
elif sign[:2] == 'dh':
txt = "Triangle descendant"
if sign[-2:] == 'tb':
txt += " :\nSortie à la baisse"
elif sign[:2] == 'da':
txt = "Triangle"
if sign[2] == 'q':
txt += " symétrique"
if sign[-1:] == 'b':
txt += " de creux"
else:
txt += " de sommet"
if sign[-2:] == 'bt':
txt += " :\nSortie à la hausse"
elif sign[-2:] == 'tb':
txt += " :\nSortie à la baisse"
elif sign[:3] == 'ddq':
txt = "Drapeau decendant"
if sign[-2:] == 'bt':
txt += " :\nSortie à la hausse"
elif sign[:3] == 'aax':
txt = "Biseau ascendant"
if sign[-2:] == 'bb':
txt += " :\nSortie à la baisse"
elif sign[:3] == 'ddi':
txt = "Biseau descendant"
if sign[-2:] == 'tt':
txt += " :\nSortie à la hausse"
elif sign[:3] == 'ddx':
txt = "Biseau d'élargissement descendant"
if sign[-1] == 't':
txt += " :\nSortie à la hausse"
elif sign[:3] == 'aai':
txt = "Biseau d'élargissement ascendant"
if sign[-1] == 'b':
txt += " :\nSortie à la baisse"
elif sign[:2] == 'ad':
txt = "Élargissement symétrique" if sign[2] == 'q' else 'Élargissement'
txt += " :\nSortie à la " + ('hausse' if sign[-1] == 't' else 'baisse')
elif sign[:2] == 'ah':
txt = "Élargissement ascendant droit :\nSortie à la " + ('hausse' if sign[-1] == 't' else 'baisse')
elif sign[:2] == 'hd':
txt = "Élargissement decendant droit :\nSortie à la " + ('hausse' if sign[-1] == 't' else 'baisse')
if txt:
""" Couleurs différentes. """
color = '#fddb' if txt.endswith('baisse') else ('#bcfb' if txt.endswith('hausse') else '#dddb')
bbox = {'boxstyle': 'round4', 'fc': color, 'ec': 'k', 'lw': .5} # fc=face color, ec=edge color
y_min, y_max = ax.get_ylim()
y_pos = min(y_max - 10, max(y_min + 15, -10 + (trace[0] + trace[1]) // 2))
ax.annotate(text=txt, xy=(x[-1] - 34, y_pos), bbox=bbox)
def lines_method(self, delay=20, **kwargs):
close = self._get_column(kwargs.pop("close", "Close"))
result = lines(close=close, delay=delay, **kwargs)
return self._post_process(result, **kwargs)
lines.plot = plot
lines.__doc__ = """
La fonction 'lines()' de cet indicateur est la fonction standard.
La fonction 'plot()' de cet indicateur peut être appelée pour tracer les segments de droite à la volée.
|_ Sur un graphique animé, l'appel se fera à chaque rafraîchissement de l'affichage, dans un "hook axis".
Fonction lines() :
==================
Ligne du haut, ligne du bas. Pour chacune d'elles, 3 points d'abscisse importants -> {x_m}, {x_n} et {x_présent} :
- Le batch étudié va de {x_m} à {x_n}, suivi d'un délai : l'ensemble {x_n, x_présent}, ignoré dans les calculs.
- La valeur {x_m} est ajustée automatiquement, elle dépend de la structure du marché.
- La longueur du segment tracé est celle du batch, à savoir {x_m} à {x_n}.
- La droite est une fonction du 1er degré : y = ax + b, mais tous les points du segments ne seront pas mémorisés.
|_ Seules 2 de ses valeurs seront retournées pour pouvoir la synthétser dans code utilisateur : {y} et {a}
|_ {y} est le dernier point, tout à droite du graphique, nommé 'trace'.
|_ {a} est la pente de la droite à ce moment là.
|_ La valeur {b} est alors calculée par le code utilisateur : b = y - ax
|_ Dès lors la fonction y = f(x) peut être utilisée pour synthétiser la droite.
Paramètres :
------------
delay : Nombre de points ignorés avant l'instant présent <-- l'ensemble {x_n, x_présent} décrit ci-dessus.
|_ Valeur par défaut : 1
statistics : booléen. Affiche le type de figure chartiste ainsi que ses prédictions, basées sur les statistiques.
|_ Valeur par défaut : True.
Return :
--------
pd.DataFrame ['Trace_H', 'Pente_H', 'X_H', 'Trace_L', 'Pente_L', 'X_L', 'Stats']
Fonction plot() :
=================
- Cette fonction doit être appelée à chaque rafraîchissement du graphique, car le tracé est dynamique.
- Le code appelant est dans la méthode hook_axis_anim(), la syntaxe d'appel est : ta.line.plot(...)
|_ Voir exemple de code d'appel ci-dessous.
- Elle permet de faire ressortir des figures chartistes : biseaux, canaux, triangles, etc.
- Probabilités de réussite : https://www.centralcharts.com/fr/gm/1-apprendre/7-analyse-technique/27-figure-chartiste/488-trading-avec-les-figures-chartistes
Paramètres :
------------
ax : objet {axis} matplotlib : graphique dans lequel seront affichées les lignes et les annotations.
df : DataFrame contenant les données.
x = liste des abscisses.
kwargs : dictionnaire d'arguments supplémentaires.
Exemple de code d'appel :
-------------------------
if axis_name == 'Principal':
kwargs = dict(
line_h=dict(col_trace='Trace_H', col_pente='Pente_H', col_x='X_H', c='b', lw=2, ls='-.'),
line_l=dict(col_trace='Trace_L', col_pente='Pente_L', col_x='X_L', c='darkorange', lw=2, ls='-.'),
# pattern_verbose=False, # Valeur par défaut : True. <-- Affichage des annotations.
)
ta.lines.plot(ax=o_ax, df=self.df_pilot, x=x, **kwargs)
"""
/trading/strategies/signaux_dynamiques/dyn_02_trendlines/main_02.py
:
""" Version 2022-04-02 """
# Imports externes
import seaborn as sns
import pandas_ta as ta # Pour afficher la documentation d'un indicateur -> help(ta.kama)
# Imports internes
from functions.utils import Dictionary
from show.show_geek import ShowGeek
# noinspection PyTypeChecker,PyUnresolvedReferences
class Geek(ShowGeek):
def __init__(self):
super().__init__()
def central_args(self, l_keys):
def multiply(od, l_params):
gene = l_params[0]
key = l_params[1].split('.')
val = od.read(key)
return val * gene
od_args = Dictionary(dict(
# ************************************ FENÊTRE ************************************
ui=dict(
geometry=(200, 40, 1_000, 700),
abscissa_size=200,
window_title="Signaux dynamiques 02 - Trendlines",
figure_title="Lignes de tendance",
subplots=dict(
Principal=60,
Milieu='',
# Bas='',
),
# show_volume=True,
# animation=False,
interval=10,
),
# ******************************** DATAFRAME PILOTE *******************************
pilot=dict(
instrument='EUR/USD',
table=10,
learn=dict(pc_from=0, pc_to=95, nb_rows=None),
# test=dict(pc_from=80, pc_to=100, nb_rows=None),
test=dict(pc_from=None, pc_to=100, nb_rows=2_000),
),
# ********************************** INDICATEURS **********************************
indics=dict(
backtest=False,
stats=False,
cost=dict(capital=1000, risk=8, spread=1.1),
lignes=dict(
function=ta.lines,
series=['Trace_H', 'Pente_H', 'X_H', 'Trace_L', 'Pente_L', 'X_L', 'Stats'],
params=dict(
# statistics=False,
delay=10,
),
),
custom=dict(
function=self.custom,
params=dict(),
),
),
# *********************************** AFFICHAGE ***********************************
# color : 'r', 'g', 'b', 'y', 'm', 'c', 'w', 'k' - Voir aussi matplotlib named_colors.
# ls = '-', '--', '-.', ':' ('' = invisible) - Voir matplotlib linestyles.
# legend, 2 lettres : [l|c|r][h|m,b], ou bien False.
display=dict(
Principal=dict(
lines=dict(
Trace_H=dict(c='b', lw=.6, ls='-'),
Trace_L=dict(c='darkorange', lw=.6, ls='-'),
Mean_trace=dict(c='g', lw=.3, ls='-'),
),
legend='ch',
color_between=[ # (Ctrl + clic) https://matplotlib.org/stable/gallery/color/named_colors.html
dict(y1='Trace_H', y2='Trace_L', color_up='#0a01', color_down='#00a1'),
],
),
Milieu=dict(
lines={
'Pente_H': dict(c='b', lw=.6, ls='-'),
'Pente_L': dict(c='darkorange', lw=.6, ls='-'),
},
lines_H=[
dict(y=0, lw=.5, ls='-.', c='r'),
],
color_between=[
dict(y1='Pente_H', y2='Pente_L', color_up='#00aa0008', color_down='#00a1'),
],
),
)
))
return Dictionary(od_args.read(l_keys, {}))
def custom(self, **dwargs):
indx = self.df_pilot.index.to_series()
self.df_pilot['Length'] = 2*indx - (self.df_pilot['X_H'] + self.df_pilot['X_L'])
self.df_pilot['Mean_trace'] = (self.df_pilot['Trace_H'] + self.df_pilot['Trace_L']) / 2
def post_params(self):
""" Affichage des peaks. """
t_added = self.get_d_added('lignes')
if t_added is not None:
np_top, np_bottom = t_added
o_ax = self.o_ax('Principal')
sns.scatterplot(x=np_top[:, 0], y=np_top[:, 1], ax=o_ax)
sns.scatterplot(x=np_bottom[:, 0], y=np_bottom[:, 1], ax=o_ax)
ret = super().post_params()
return ret
def hook_axis_anim(self, axis_name, x, o_ax, df, y_min, y_max, get_line):
if axis_name == 'Principal':
""" MAP lines ***** MAP lines ***** MAP lines ***** MAP lines ***** MAP lines ***** MAP lines ***** """
if axis_name == 'Principal':
kwargs = dict(
line_h=dict(col_trace='Trace_H', col_pente='Pente_H', col_x='X_H', c='b', lw=2, ls='-.'),
line_l=dict(col_trace='Trace_L', col_pente='Pente_L', col_x='X_L', c='darkorange', lw=2, ls='-.'),
# pattern_verbose=False, # Valeur par défaut : True.
)
ta.lines.plot(ax=o_ax, df=self.df_pilot, x=x, **kwargs)
return super().hook_axis_anim(axis_name, x, o_ax, df, y_min, y_max, get_line)
if __name__ == '__main__':
Geek()
Les pentes (les dérivées) sont positives, nulles ou négatives. Elles sont représentées dans le graphique 'Milieu'.
Ce tuto termine le mode geek.
Bonjour les codeurs !