Repousser les limites
Avant-propos
Voici un exemple de pensée :
Caractéristiques non aléatoires :
Sujets à maîtriser :
Description
best_zones
proposé lors du tuto Élaboration d'une stratégie de trading a été amélioré en apportant des points supplémentaires de confirmation :Ce n'est pas un indicateur, mais un gabarit, c'est à dire un outil d'aide à la création d'indicateurs.
Les indicateurs que vous créerez devront proposer des points d'entrée dans les zônes de couleur : jaune = achat, rose = vente.
/functions/custom_ta/trend/best_zones.py
:
import numpy as np
import pandas as pd
def mark_peaks(np_vals, gap, bandwidth, l_indx):
if len(l_indx) < 2:
return
l_indx = [0] + l_indx
l_indx.sort()
v1, v2 = np_vals[l_indx[-1], 0], np_vals[l_indx[-2], 0]
if np_vals[l_indx[-2], 0] > np_vals[l_indx[-1], 0]:
""" Sommet : ouverture à la vente. """
for i in range(l_indx[-3]+1, l_indx[-1]):
threshold = v2 - (v2 - v1 - gap) * bandwidth / 100
if i < l_indx[-2]:
threshold = max(threshold, v2-gap)
elif i == l_indx[-2]:
continue
if np_vals[i, 0] >= threshold:
np_vals[i, 3] = 1
else:
""" Vallée : ouverture à l'achat. """
for i in range(l_indx[-3]+1, l_indx[-1]):
threshold = v2 + (v1 - v2 - gap) * bandwidth / 100
if i < l_indx[-2]:
threshold = min(threshold, v2+gap)
elif i == l_indx[-2]:
continue
if np_vals[i, 0] <= threshold:
np_vals[i, 3] = -1
def best_zones(close, gap, bandwidth):
""" Initialisation. """
len_close = close.shape[0]
np_vals = np.zeros((len_close, 4), dtype=np.float32) # 0=Close, 1=Up, 2=Down, 3=Peak
np_vals[:, 0] = close.values # 0 = Close
np_indx = last_indx_max = last_indx_min = 0
max_val = min_val = np_vals[0, 0] # Valeur du 1er Close.
step = 'ini'
b_stay = False
l_max, l_min, l_confirm = list(), list(), list()
""" Grafcet. """
while True:
""" Compteur. """
if b_stay:
""" Le compteur n'avance pas, on reste sur le même {Close}. """
b_stay = False
else:
""" Avant que le compteur n'avance, on mémorise les palliers. """
np_vals[np_indx, 1] = max_val # Valeur du pallier 'Up'.
np_vals[np_indx, 2] = min_val # Valeur du pallier 'Down'.
np_indx += 1 # Avancement du compteur.
if np_indx >= len_close: # Fin.
break
val_now = np_vals[np_indx, 0] # Close.
if step == 'ini':
max_val = max(max_val, val_now)
min_val = min(min_val, val_now)
if val_now == max_val:
last_indx_max = np_indx
if max_val - min_val >= gap:
step = 'up'
elif val_now == min_val:
last_indx_min = np_indx
if max_val - min_val >= gap:
step = 'down'
elif step == 'up':
max_val = max(max_val, val_now)
if val_now == max_val: # Plafond
last_indx_max = np_indx
min_val = max_val - gap
if val_now <= min_val:
np_vals[last_indx_min, 3] = -2
l_min.append(last_indx_min)
l_confirm.append((np_indx, np_vals[last_indx_min, 0]-2, 'confirm_b'))
mark_peaks(np_vals, gap, bandwidth, l_min+l_max)
b_stay = True # La prochaine boucle passera par 'down', mais avec le même {Close} (b_stay = True).
step = 'down'
elif step == 'down':
min_val = min(min_val, val_now)
if val_now == min_val:
last_indx_min = np_indx
max_val = min_val + gap
if val_now >= max_val:
np_vals[last_indx_max, 3] = 2
l_max.append(last_indx_max)
l_confirm.append((np_indx, np_vals[last_indx_max, 0]+2, 'confirm_t'))
mark_peaks(np_vals, gap, bandwidth, l_min+l_max)
b_stay = True
step = 'up'
np_x = np_vals
l_scats = [(indx, np_x[indx, 0], 'top') for indx in l_max] + [(indx, np_x[indx, 0], 'bottom') for indx in l_min]
l_scats.sort()
df_scats = pd.DataFrame(columns=['Indx', 'Close', 'Principal'], data=l_scats)
df_scats[['confirm_x', 'confirm_y', 'c_type']] = l_confirm
return pd.DataFrame(np_vals[:, 1:], columns=['Up', 'Down', 'Peak']), df_scats
def best_zones_method(self, gap=12, bandwidth=80, **kwargs):
close = self._get_column(kwargs.pop("close", "close"))
result = best_zones(close=close, gap=gap, bandwidth=min(100, bandwidth))
return result
best_zones.__doc__ = """
Ceci est un faux indicateur :
=============================
- En effet, les signaux sont calculés en tenant compte du futur dans les historiques.
- Il ne faut donc pas s'en servir comme indicateur de trading.
- Toutefois, il peut être précieux pour la mise au point de stratégies.
Il affiche sur les courbes :
- Les zônes où l'on peut ouvrir un trade à l'achat.
- Les zônes où l'on peut ouvrir un trade à la vente.
- Les zônes où l'on ne doit pas ouvrir de trade.
Il est adapté pour la technique du trade-profit / stop-loss.
Calcul:
Paramètres :
close = Par défaut, série de la colonne CLOSE.
gap = Par défaut, 12. Valeur du take profit et du stop loss (Même valeur pour les deux).
bandwidth = Par défaut, 80. Pourcentage, valeur de 0 à 100.
Permet d'obtenir des zônes d'ouverture plus ou moins larges.
Valeur de retour : un tuple de 2 DataFrames.
DataFrame 0 : ['Close', 'Up', 'Down', 'Peak'], même nombre de lignes que la série d'entrée.
DataFrame 1 : ['indx', 'close', 'Principal'], petite taille, ne contient que les points nécessaires
pour afficher les scatters et les lignes zig-zag sur le signal d'entrée.
Le signal 'Peak' a pour valeurs -2, -1, 0, 1, 2 :
0 : Ne pas entrer dans le marché.
1, 2 : Ouverture à la vente.
-1, -2 : Ouverture à l'achat.
Activation automatique :
Dictionnaire {d_args} du script de stratégie : Commenter / Décommenter les clés de {best_zones}.
"""
ShowGeek
a dû être modifiée en conséquence./show/show_geek.py
:
""" Version 2022-03-15 """
# Imports externes.
import numpy as np
import seaborn as sns
from matplotlib import pyplot as plt
from matplotlib.ticker import FormatStrFormatter
import matplotlib.animation
import copy
# Imports internes.
from functions.utils import Dictionary
from trading.historiques.ctrl_histos import CtrlHistos
# noinspection PyUnusedLocal
class ShowGeek:
def __init__(self): # , **args):
super().__init__() # Appelle Genetic.__init__(), 2ème classe héritée de la classe dérivée.
self.fig = plt.figure(1)
self.df_pilot = None
self.df_scats = None
self.nb_datas = 0
self.l_ax = None
self.pips = 0
""" Animation. """
self.b_anim = True
self.abscissa_size = 0
self.pointer = 0
self.b_paused = False
self.b_reverse = False
self.magn = 1 # Appui sur les flèches : Le pas d'avancement est : 1, 10 ou 100.
self.od = None
""" Signal pilote. """
self.od = Dictionary(self.set_heights())
self.get_df_pilot()
def get_args(self):
args = self.central_args('ui')
return dict( # Ces valeurs seront à la racine du super-dictionnaire self.od.
geometry=args.read('geometry', (100, 40, 1000, 700)), # x, y, w, h.
margins=args.read('margins', (6, 8, 10, 8)), # Marges : haut, droite, bas, gauche.
abscissa_size=args.read('abscissa_size', 600), # Nb de points affichés en abscisse.
window_title=args.read('window_title', "Modèle simple"), # Titre de la fenêtre.
figure_title=args.read('figure_title', "Modèle simple - Ne pas modifier"), # Titre des graphiques.
leader_lines=args.read('leader_lines', True), # Lignes hortogonales de repère, suivi de la souris.
subplots=args.read('subplots', dict( # Noms et hauteurs (en %) des graphiques modifiables.
Principal=65, # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
Milieu='', # '' : Si chaîne vide => Les hauteurs seront automatiquement calculées et réparties.
Bas='',
)),
show_pilot=args.read('show_pilot', True),
show_volume=args.read('show_volume', False),
show_linked_label=args.read('show_linked_label', True),
best_zones=dict(
show=args.read(['best_zones', 'show'], False),
gap=args.read(['best_zones', 'gap'], 20), # Nombre de pips take-profit ou stop-loss.
bandwidth=args.read(['best_zones', 'bandwidth'], 80), # Pourcentage : de 0 à 100.
up=args.read(['best_zones', 'up'], True), # Affichage des palliers haut.
down=args.read(['best_zones', 'down'], True), # Affichage des palliers bas.
scatters=args.read(['best_zones', 'scatters'], True), # Affichage des optimums sous forme de ronds.
confirm=args.read(['best_zones', 'confirm'], True),
zig_zag=args.read(['best_zones', 'zig_zag'], True), # Affichage de la courbe zig-zag.
colors=args.read(['best_zones', 'colors'], ('#ffff0030', '#ff00ff10')), # Coloriage des ouvertures.
),
animation=args.get('animation', True),
# Paramètres généraux supplémentaires ici ...
# Paramètres seaborn : https://www.python-simple.com/python-seaborn/seaborn-general.php
)
def show_ui(self):
""" ****************** Algorithme de construction ****************** """
""" Ajout des indicateurs dans la dataframe {df_pilot} <-- Colonnes ajoutées à {df_pilot}. """
self.add_indics()
""" Ajout du signal pilote. """
self.add_pilot()
""" Paramètres avant la création des graphiques (axes). """
self.pre_params()
""" Distribution des signaux (des colonnes) : un dataframe par axe dans {self.l_ax}. """
self.distrib()
""" Création des graphiques (axes). """
self.build_axis()
""" Paramètres après la création des graphiques (axes). """
self.post_params()
""" Affichage animé. """
self.show()
def animate(self, _=''):
""" Appelé dans la boucle matplotlib.FuncAnimation depuis self.show(). """
def get_line(_line_name):
_from, _to = x[0], x[0] + len(x)
return self.df_pilot[_line_name][_from: _to]
if self.b_paused:
return
""" Gestion du pointeur. """
self.set_pointer()
""" Abscisse commune : liste de {abscissa_size} valeurs depuis {pointer}. """
x = list(range(self.pointer, self.pointer + self.abscissa_size))
""" Parcours des graphiques (axes). """
for axis_name in self.od.keys():
if axis_name == 'figure':
continue
o_ax = self.od.read([axis_name, 'o_ax'])
if o_ax is None:
continue
df = self.od.read([axis_name, 'df'])
y_min, y_max = 10 ** 9, -10 ** 9
if not (df is None or df.empty):
""" Parcours des courbes (lines). """
for o_line in self.get_axis_lines(axis_name):
""" Plusieurs courbes (lines) dans un graphique (o_ax). """
line_name = o_line.get_label()
serie = df[line_name]
y = serie[self.pointer: self.pointer + self.abscissa_size]
""" Égalisation des tailles de x et y. """
len_min = min(len(x), len(y))
x, y = copy.copy(x[:len_min]), copy.copy(y[:len_min])
""" Hook pour l'injection de paramètres dans cette courbe. """
d_attr = self.hook_line_anim(axis_name, line_name, x, y, o_ax, df, o_line)
if d_attr is None:
d_attr = {}
""" Injection des datas à afficher. """
if d_attr.get('visible', True):
o_line.set_ydata(y)
o_line.set_xdata(x)
if not np.isnan(y).all():
y_min = min(y_min, np.nanmin(y))
y_max = max(y_max, np.nanmax(y))
""" Calcul des limites des ordonnées (y). """
if y_max < y_min:
y_min, y_max = -10, 10
y_padding = max(0, (y_max - y_min) * 0.05) # Marges top et bottom dans les axes (5%)
y_min, y_max = (round(y_min - y_padding, 6), round(y_max + y_padding, 6)) if y_max >= y_min else (-10, 10)
y_min = max(y_min, -10 ** 9)
y_max = min(y_max, 10 ** 9)
""" Limites x et y. """
o_ax.set_xlim(x[0], x[-1])
if y_min == y_max:
y_min = y_max = None
o_ax.set_ylim(y_min, y_max)
self.hook_axis_anim(axis_name, x, o_ax, df, y_min, y_max, get_line) # l_hook = [axis_name, x, o_ax, df]
if not self.b_anim:
plt.draw()
def set_pointer(self):
if self.b_reverse:
self.pointer -= self.magn
if self.pointer < 0:
self.pointer = self.nb_datas - self.abscissa_size
else:
self.pointer += self.magn
if self.pointer > self.nb_datas - self.abscissa_size:
self.pointer = 0
def add_pilot(self):
""" Affichage conditionnel du signal-pilote. """
self.od.write(['Principal', 'signals', 'Close', 'visible'], self.od.read(['figure', 'show_pilot'], False))
self.add_df('Principal', ['Close'])
def pre_params(self):
""" Fenêtre : taille, position, titre. """
mgr = plt.get_current_fig_manager()
mgr.set_window_title(self.od.read(['figure', 'window_title'], 'Stratégie'))
win = mgr.window
win.setGeometry(*self.od.read(['figure', 'geometry']))
""" Paramètres seaborn : https://www.python-simple.com/python-seaborn/seaborn-general.php """
sns.set_context(self.od.read(['figure', 'seaborn_context'], 'paper')) # paper, notebook, talk, poster
sns.set_style(self.od.read(['figure', 'seaborn_style'], 'darkgrid')) # white, dark, whitegrid, darkgrid, ticks
""" Ini attributs. """
self.abscissa_size = self.od.read(['figure', 'abscissa_size'], 600)
self.b_anim = self.od.read(['figure', 'animation'], False)
""" Affichage des best-zones dans le graphique (subplot) contenant le signal 'Close'. """
subplot_name = self.od.read(['figure', 'best_zones', 'subplot'])
if self.od.read(['figure', 'best_zones', 'show'], False) and subplot_name:
""" Appel de l'indicateur de trend {best_zones}. """
gap = self.od.read(['figure', 'best_zones', 'gap'], 24)
bandwidth = self.od.read(['figure', 'best_zones', 'bandwidth'], 80)
self.df_pilot[['Up', 'Down', 'Peak']], self.df_scats = self.df_pilot.ta.best_zones(gap=gap,
bandwidth=bandwidth)
""" Affichage des palliers haut et bas. """
l_columns = list()
if self.od.read(['figure', 'best_zones', 'up'], False):
l_columns.append('Up')
if self.od.read(['figure', 'best_zones', 'down'], False):
l_columns.append('Down')
self.add_df(subplot_name, l_columns)
def post_params(self):
""" Recherche des paramètres dans le dictionnaire. """
""" Titre des graphiques. """
axis_title = self.od.read(['figure', 'figure_title'])
if isinstance(axis_title, str) and len(axis_title) > 0:
o_ax = self.get_first_axis()
if self.is_axis(o_ax):
o_ax.set_title(axis_title)
""" y axis : position et visibilité des graduations (ticks). """
for l_keys in self.od.key_list():
if l_keys[-1] != 'y_ticks':
continue
position = self.od.read(l_keys)
o_ax = self.od.read([l_keys[0], 'o_ax'])
if not self.is_axis(o_ax):
continue
if position is False:
o_ax.axes.set_yticks([]) # Suppression (ticks + ticklabels + grille).
pass
else:
o_ax.yaxis.set_ticks_position(position) # left, right
""" Suppression de la grille. """
for l_keys in self.od.key_list():
if l_keys[-1] != 'grid':
continue
if self.od.read(l_keys) is False:
o_ax = self.od.read([l_keys[0], 'o_ax'])
o_ax.grid(False) # Suppression de la grille.
""" x axis : visibilité des graduations (ticks) """
for l_keys in self.od.key_list():
if l_keys[-1] != 'x_ticks':
continue
o_ax = self.od.read([l_keys[0], 'o_ax'])
if not self.is_axis(o_ax):
continue
if self.od.read(l_keys) is False:
o_ax.tick_params(axis='x', colors='#fff0') # '#fff0' = invisible
""" Légendes. """
for axis_name in self.get_axis_names():
o_ax = self.od.read([axis_name, 'o_ax'])
if self.get_axis_lines(axis_name):
legend = self.od.read([axis_name, 'legend'])
pos = ['auto', 'rh', 'lh', 'lb', 'rb', 'rm', 'lm', 'rm', 'cb', 'ch', 'cm']
if isinstance(legend, bool):
o_ax.legend().set_visible(legend) # Visibilité de la légende.
elif legend in pos:
o_ax.legend(loc=pos.index(legend)).set_title(axis_name)
else:
o_ax.legend().set_title(axis_name)
def build_axis(self):
""" Construction de la fenêtre Windows. """
""" Marges générales et répartition des subplots. """
l_margins = self.od.read(['figure', 'margins'], (6, 8, 10, 8)) # Marges : haut, droite, bas, gauche.
x, w = l_margins[3] / 100, 1 - (l_margins[1] + l_margins[3]) / 100
y_offset, y_height = l_margins[0], 100 - l_margins[0] - l_margins[2] # Valeurs : 0 à 100.
ax_top, y = 100, 1 - y_offset / 100
first_axis, axis_ante = '', ''
for axis_name, h in self.od.read(['figure', 'subplots']).items():
o_ax = None
if isinstance(h, (int, float)):
if first_axis == '':
first_axis = axis_name
else:
self.od.write([axis_ante, 'x_ticks'], False)
axis_ante = axis_name
h = h * y_height / 10_000
y -= h
o_ax = self.fig.add_axes((x, y, w, h), sharex=self.od.read([first_axis, 'o_ax']))
self.od.write([axis_name, 'geometry'], (x, y, w, h))
else:
""" Graphique jumelé (twined axis). """
tw = h.split()
if tw[0] == 'twinned':
""" Axes jumelés. """
parent_name = tw[-1]
parent_axis = self.od.read([parent_name, 'o_ax'])
if self.is_axis(parent_axis):
o_ax = parent_axis.twinx()
if self.is_axis(o_ax):
self.od.write([axis_name, 'o_ax'], o_ax)
""" Ajout artificiel d'attributs vides. """
o_ax.filled = list()
o_ax.note_x = o_ax.annotate('', (0, 0))
o_ax.note_y = o_ax.annotate('', (0, 0))
""" Repères : lignes hortogonales sous le curseur de la souris. Voir on_mouse_move() """
o_ax.hline = o_ax.axhline(y=-1000, color='k', lw=.2, ls='--') # -1000 = Souris hors figure.
o_ax.vline = o_ax.axvline(x=-1000, color='k', lw=.2, ls='--')
""" Format des graduations y. """
o_ax.yaxis.set_major_formatter(FormatStrFormatter('%.1f'))
""" Initialisation des courbes (DataFrames) dans les graphiques (axes). """
df = self.od.read([axis_name, 'df'])
if axis_name == self.od.read(['figure', 'best_zones', 'subplot']) and self.is_df(self.df_scats):
if self.od.read(['figure', 'best_zones', 'scatters'], False):
sns.scatterplot(data=self.df_scats, x='Indx', y='Close', hue=axis_name, ax=o_ax)
if self.od.read(['figure', 'best_zones', 'confirm'], False):
sns.scatterplot(data=self.df_scats, x='confirm_x', y='confirm_y', ax=o_ax, hue='c_type',
alpha=.3)
if self.od.read(['figure', 'best_zones', 'zig_zag'], False):
sns.lineplot(data=self.df_scats, x='Indx', y='Close', ax=o_ax, lw=.5)
o_ax.set_ylabel('')
""" Lines. """
if self.is_df(df) and len(df.columns) > 0:
sns.lineplot(data=df[:1], ax=o_ax)
""" Position (left, right) des graduations y. """
b_odd = False
l_axes = self.get_axis_names()
if self.od.read(['Volume', 'twined_axis']) == 'Principal':
b_odd = bool(l_axes.index('Principal') % 2) # Position impaire de 'Principal'.
for indx, axis_name in enumerate(l_axes):
self.od.write([axis_name, 'y_ticks'], 'left' if b_odd == indx % 2 else 'right')
""" Coloriage vertical conditionnel des zônes d'entrée en position (ouverture de trade). """
b_zones = self.od.read(['figure', 'best_zones', 'show'], False)
colors = self.od.read(['figure', 'best_zones', 'colors'])
if b_zones and colors is not None:
peaks = self.df_pilot['Peak']
fc_b, fc_t = colors
l_args = [
dict(x=peaks.index, y1=10 ** 5, y2=-10 ** 5, where=peaks <= -1, fc=fc_b, ec='#fff0', interpolate=True),
dict(x=peaks.index, y1=10 ** 5, y2=-10 ** 5, where=peaks >= 1, fc=fc_t, ec='#fff0', interpolate=True),
]
for ax_name in l_axes:
oax = self.o_ax(ax_name)
if self.is_axis(oax):
self.fill_between(oax, l_args)
def set_heights(self):
""" Calcul des hauteurs (en %) subplots. La somme fait 100%. """
args = self.get_args()
if not isinstance(args['subplots'], dict):
raise SystemExit("Super-dictionnaire : La clé 'subplots' doit exister et doit contenir un dictionnaire. ")
l_heights, sum_heights, nb_zeros = list(), 0, 0
for name, pc_height in args['subplots'].items():
height = pc_height if (isinstance(pc_height, int) and pc_height > 0) else 0
nb_zeros += 1 if height == 0 else 0
sum_heights += height
l_heights.append((name, height))
if sum_heights > 100:
""" La somme des hauteurs dépasse 100% : on effectue une réduction proportionnelle. """
l_heights = [(name, pc_height) for (name, pc_height) in l_heights if pc_height > 0] # Suppression des 0.
l_heights = [(name, pc_height * 100 / sum_heights) for (name, pc_height) in l_heights] # Normalisation.
else:
""" Si la somme == 100%, on ne fait rien. Si elle est < 100%, on ajoute un subplot, nommé 'rest'. """
if nb_zeros == 0:
l_heights.append(('rest', 0))
nb_zeros = 1
h_rest = (100 - sum_heights) / nb_zeros
if h_rest == 0:
l_heights = [(name, pc_height) for (name, pc_height) in l_heights if pc_height > 0] # Suppression des 0
for i, (name, pc_height) in enumerate(l_heights):
if pc_height == 0:
l_heights[i] = (name, h_rest)
args['subplots'] = dict(l_heights)
""" Ajout d'arguments par défaut. """
if 'seaborn_context' not in args: # Valeurs possibles : paper, notebook, talk, poster
args['seaborn_context'] = 'paper'
if 'seaborn_style' not in args: # Valeurs possibles : white, dark, whitegrid, darkgrid, ticks
args['seaborn_style'] = 'darkgrid'
return {'figure': args}
def add_df(self, subplot_name, l_columns, df=None):
""" Création du dataframe qui sera associé à un graphique.
- Méthode appelée par la méthode distrib() de la classe dérivée.
- Le DataFrame {df} a plusieurs colonnes, chacune associée à une courbe. """
""" Vérification. """
if self.od.read(['figure', 'subplots', subplot_name]) is None:
raise SystemExit(f"Erreur dans self.distrib() : Le subplot {subplot_name} n'existe pas.")
l_cols = list()
for column in l_columns:
""" Suppression des colonnes inexistantes. """
try:
self.df_pilot[column] if df is None else df[column]
except KeyError:
""" Si la colonne n'existe pas ... """
print(f"La colonne '{column}' n'existe pas dans le DataFrame.")
continue
l_cols.append(column)
""" Fusion par ajout de colonnes dans le df existant. """
df = self.df_pilot[l_cols] if df is None else df[l_cols]
df_exist = self.od.read([subplot_name, 'df'])
if df_exist is not None:
df_exist[df.columns] = df
df = df_exist # On retrouve les colonnes de df_exist en premières places.
if not self.is_df(df):
return
self.od.write([subplot_name, 'df'], df)
for column in df.columns:
if column.lower() == 'close':
""" Si ce dataframe a une colonne 'Close', on paramètre 3 traitements distincts par défaut :
- Une étiquette-suiveuse (linked_label).
- Un graphique jumelé (twined_axis) contenant l'affichage du volume en semi-transparence.
- Éléments du graphique jumelé non affichés : y_ticks, y_ticks_labels, grille, légende. """
self.od.write(['figure', 'best_zones', 'subplot'], subplot_name)
if self.od.read(['figure', 'show_linked_label'], False):
self.od.write([subplot_name, 'linked_label'], 'Close')
if self.od.read(['figure', 'table_name'], None) is None:
self.od.write(['figure', 'show_volume'], False)
if self.od.read(['figure', 'show_volume'], False):
self.od.write(['figure', 'subplots', 'Volume'], f'twinned with {subplot_name}')
self.od.write(['Volume', 'df'], self.df_pilot[['Volume']])
self.od.write(['Volume', 'signals', 'Volume', 'visible'], False)
self.od.write(['Volume', 'twined_axis'], subplot_name)
self.od.write(['Volume', 'y_ticks'], False), # Suppression (ticks + ticklabels + grille).
self.od.write(['Volume', 'legend'], False) # Suppression de la légende.
def get_df_pilot(self):
od_pilot = self.central_args('pilot')
instrument = od_pilot.read('instrument', 'EUR/USD')
self.pips = .01 if instrument.endswith('JPY') else .0001
table_name = od_pilot.read('table', 10)
self.df_pilot = CtrlHistos(instrument).get_pilot(**od_pilot) # DataFrame
self.nb_datas = self.df_pilot.shape[0]
self.od.write(['figure', 'instrument'], instrument)
self.od.write(['figure', 'table_name'], table_name)
self.od.write(['figure', 'nb_points'], self.nb_datas)
def linked_label(self, axis_name, x, o_ax, df):
""" https://matplotlib.org/stable/tutorials/text/annotations.html#annotating-with-text-with-box """
linked_label = self.od.read([axis_name, 'linked_label'])
if linked_label:
bbox = {'boxstyle': 'larrow', 'fc': '#ff02', 'ec': 'k', 'lw': .5} # fc=face color, ec=edge color
indx = min(x[-1], df.shape[0] - 1)
last_value = round(df[linked_label][indx], 2) # En pips.
""" Affichage de l'étiquette actuelle (m = marge). """
o_ax.note_y.remove() # Effacement de l'étiquette précédente.
m = self.abscissa_size / 70
o_ax.note_y = o_ax.annotate(last_value, (x[-1], last_value), xytext=(x[-1] + m, last_value), bbox=bbox)
def paint_volume(self, x, o_ax, df):
y = df['Volume'][x[0]: x[-1] + 1] * .8 # Affichage du volume à 80% d'amplitude.
l_args = [
dict(x=x, y1=y, y2=0, fc='#0079a333', ec='#fff0')
]
self.fill_between(o_ax, l_args)
def fill_between(self, o_ax, l_params):
""" Colorie entre 2 valeurs, conditionnellement.
@param o_ax: Axis matplotlib : graphique contenant des courbes.
@param l_params: Liste de coloriages. Chaque élément est un dictionnaire personnalisé. Voir lien ci-dessous.
https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.fill_between.html """
""" Effacement du coloriage antérieur, nécessaire si les couleurs ont une transparence. """
if not self.is_axis(o_ax):
raise SystemExit(f"Erreur dans self.fill_between() : L'objet o_ax ({o_ax}) n'est pas un axis.")
for fill in o_ax.filled:
o_ax.collections.remove(fill)
o_ax.filled.clear()
""" l_fill contient autant de paramètres que de coloriages à effectuer. """
for p in l_params:
""" Égalisation des vecteurs. """
if p.get('x') is None:
p['x'] = range(self.df_pilot.shape[0])
len_min = len(p['x'])
if self.is_df(p['y1']):
len_min = min(len_min, len(p['y1']))
if self.is_df(p['y2']):
len_min = min(len_min, len(p['y2']))
p['x'] = p['x'][:len_min]
""" Coloriage. """
o_ax.filled.append(o_ax.fill_between(**p))
""" *********************************** Helpers. ************************************ """
def trace_hline(self, axis_name, y, **kwargs):
o_ax = self.o_ax(axis_name)
if self.is_axis(o_ax):
o_ax.axhline(xmin=0, xmax=1, y=y, **kwargs)
def trace_vline(self, axis_name, x, **kwargs):
o_ax = self.o_ax(axis_name)
if self.is_axis(o_ax):
o_ax.axvline(ymin=0, ymax=1, x=x, **kwargs)
@staticmethod
def is_axis(o_ax):
return o_ax.__class__.__name__ == 'Axes'
@staticmethod
def is_df(df):
return df.__class__.__name__ in ['DataFrame', 'Series']
def get_axis_names(self, b_twin=False):
if b_twin:
""" Liste de noms de tous les graphiques (subplots) de base et twined_axis. """
return [ax_name for ax_name in list(self.od.read(['figure', 'subplots']).keys()) if ax_name != 'rest']
else:
""" Liste de noms de tous les graphiques (subplots) de base. """
return [ax_name for (ax_name, val) in list(self.od.read(['figure', 'subplots']).items())
if ax_name != 'rest' and not str(val).startswith('twinned')]
def get_first_axis(self):
l_axes = self.get_axis_names()
o_ax = self.o_ax(l_axes[0])
if self.is_axis(o_ax):
return o_ax
def get_last_axis(self):
l_axes = self.get_axis_names()
o_ax = self.o_ax(l_axes[-1])
if self.is_axis(o_ax):
return o_ax
def _get_all_ax(self):
l_ax = list()
for key, val in self.od.read(['figure', 'subplots']).items():
l_ax.append(self.od.read([key, 'o_ax']))
return l_ax
def get_axis_lines(self, axis_name):
""" Retourne une liste d'objets o_line. """
o_ax = self.o_ax(axis_name)
l_olines = list()
for o_line in o_ax.lines:
""" Plusieurs courbes (lines) dans un graphique (o_ax). """
line_name = o_line.get_label()
if line_name.startswith('_line'):
continue
l_olines.append(o_line)
return l_olines
def o_ax(self, axis_name):
return self.od.read([axis_name, 'o_ax'])
def df(self, axis_name):
return self.od.read([axis_name, 'df'])
def ax_df(self, axis_name):
o_ax = self.od.read([axis_name, 'o_ax'])
df = self.od.read([axis_name, 'df'])
return o_ax, df
""" ***************************** Méthodes surchargées. ***************************** """
@staticmethod
def get_args_ui():
pass
@staticmethod
def central_args(l_keys):
raise SystemExit("show_geek.py > ShowGeek :\nLa méthode 'central_args()' doit être surchargée.")
def get_pilot(self):
raise SystemExit("show_geek.py > ShowGeek :\nLa méthode 'get_pilot()' doit être surchargée.")
def add_indics(self, d_args=None):
raise SystemExit("show_geek.py > ShowGeek :\nLa méthode 'add_indics()' doit être surchargée.")
def distrib(self):
pass
""" ***************************** Méthodes à surcharger. **************************** """
def hook_line_anim(self, axis_name, line_name, x, y, o_ax, df, o_line):
return self.od.read([axis_name, 'signals', o_line.get_label()])
def hook_axis_anim(self, axis_name, x, o_ax, df, y_min, y_max, get_line):
""" Affichage de 2 éléments distincts :
- Volumes, semi-transparent, sans bordure.
- Étiquette-suiveuse """
""" 1 - Affichage des volumes, semi-transparent, sans bordure. """
if axis_name == 'Volume':
self.paint_volume(x, o_ax, df)
""" 2 - Étiquette-suiveuse à droite du graphique. """
self.linked_label(axis_name, x, o_ax, df)
""" **************************** Événements et affichage. *************************** """
def key_event(self, ev):
def arrows(b_forward):
b_reverse = self.b_reverse
self.b_paused, self.b_reverse = False, False if b_forward else True
self.animate()
self.b_paused = True
self.b_reverse = b_reverse
self.magn = 1
keycode = ev.key
if keycode == ' ':
""" Pause on/off. """
self.b_paused = not self.b_paused
elif keycode == 'tab':
""" Inversion de sens. """
self.b_reverse = not self.b_reverse
elif keycode == 'right' or keycode == 'left':
""" Appui sur touche flèche droite ou flèche gauche. """
arrows(keycode == 'right')
elif keycode == 'up' or keycode == 'down':
""" Appui sur touche flèche haut ou flèche bas. """
self.magn = 10
arrows(keycode == 'up')
elif keycode == 'pageup' or keycode == 'pagedown':
""" Appui sur touche flèche page-haut ou flèche page-bas. """
self.magn = 100
arrows(keycode == 'pageup')
def on_mouse_move(self, ev):
""" Le style des lignes est définidans self.build_axis(). """
for o_ax in self.l_ax:
o_ax.vline.set_data([ev.xdata, ev.xdata], [0, 1])
o_ax.hline.set_data(([0, 1], [ev.ydata, ev.ydata]) if o_ax == ev.inaxes else ([0, 0], [0, 0]))
if ev.xdata is not None and ev.ydata is not None:
ax = self.get_last_axis()
ax.note_x.remove() # Effacement de l'étiquette précédente.
bbox = {'boxstyle': 'round4', 'fc': '#ff02', 'ec': 'k', 'lw': .5} # fc=face color, ec=edge color
ax.note_x = ax.annotate(text=round(ev.xdata), xy=(ev.xdata, ax.viewLim.y0), bbox=bbox)
if self.pointer <= 1 or not self.b_anim:
plt.draw() # Nécessaire si pas d'animation (lorsque self.pointer reste à 1).
def show(self):
self.l_ax = self._get_all_ax()
self.fig.canvas.mpl_connect('key_press_event', self.key_event)
if self.od.read(['figure', 'leader_lines']):
self.fig.canvas.mpl_connect('motion_notify_event', self.on_mouse_move)
if self.b_anim:
ani = matplotlib.animation.FuncAnimation(self.fig, self.animate, frames=None, interval=.1, repeat=True)
# |_ La variable {ani} doit exister pour empêcher le garbage collector de supprimer l'animation.
self.animate()
plt.show()
\trading\strategies\partie_2_creer\Strategie_01_Best_zones
.\trading\strategies\partie_2_creer\Strategie_01_Best_zones\main_01.py
:
# Imports internes
from functions.utils import Dictionary
from show.show_geek import ShowGeek
from trading.strategies.backtest import BackTest
class Geek(ShowGeek, BackTest):
def __init__(self):
super(Geek, self).__init__()
self.show_ui()
@staticmethod
def central_args(l_keys):
od_args = Dictionary(dict(
ui=dict(
geometry=(100, 40, 1200, 600),
abscissa_size=200,
window_title="Guide de construction d'un indicateur",
figure_title="Faux indicateur, ne pas l'utiliser pour trader :"
" Gabarit pour construire un vrai indicateur.",
subplots=dict(
Principal=70,
Bas='',
),
best_zones=dict(
show=True,
gap=20, # Nombre de pips take-profit ou stop-loss.
bandwidth=80, # Pourcentage : de 0 à 100.
up=True, # Affichage des palliers haut.
down=True, # Affichage des palliers bas.
scatters=True, # Affichage des optimums.
confirm=True,
zig_zag=True, # Affichage de la courbe zig-zag.
colors=('#ffff0030', '#ff00ff10'), # Coloriage ouvertures.
),
),
pilot=dict(
instrument='EUR/USD',
table=5,
# pc_from=0,
pc_to=100,
nb_rows=20_000,
),
))
return Dictionary(od_args.read(l_keys, {}))
def add_indics(self, od_args=None):
# ... à coder.
pass
if __name__ == '__main__':
Geek()
Il faut effectuer des backtests pour vérifier que les rebonds l'emportent sur les traversées (breakouts).
statistics
./functions/custom_ta/statistics/polarity.py
:
# Imports externes.
import numpy as np
import pandas as pd
# Imports internes.
from functions.indicators import np_rolling
def polarity(close, length, nb_lines, nb_peaks, b_sorted_force):
""" close = Pandas Serie. """
nb_rows = close.shape[0]
np_datas = np.full((nb_rows, 5), np.nan, dtype=np.float32) # Matrice de nan -> shape : (nb_rows, 5)
np_datas[:, 0] = close.values
np_datas[2:, 2] = np_datas[1:-1, 0] - np_datas[:-2, 0] # d0 = Derniers deltas.
np_datas[2:, 3] = np_datas[2:, 0] - np_datas[1:-1, 0] # d1 = Avant-derniers deltas.
np_datas[:, 1] = np_datas[:, 2] * np_datas[:, 3] # Si d0*d1<0 => peak (sommet ou creux)
np_roll_close = np_rolling(np_datas[:, 0], length) # Vecteur-close -> matrice 2D - shape : (nb_rows, nb_lines)
np_mask = np_datas[:, 1] < 0 # Masque.
np_roll_mask = np_rolling(np_mask, length, dtype=np.bool) # Vecteur-masque -> matrice 2D (nb_rows, nb_lines)
""" En mode 'renko', les valeurs absolues de la colonne 1 de np_datas sont égales (double contrôle). """
b_renko = (abs(np_datas[-1, 1]) == abs(np_datas[-2, 1])) and (abs(np_datas[-2, 1]) == abs(np_datas[-3, 1]))
np_best = np.full((nb_rows, nb_lines), np.nan, dtype=np.float32) # Matrice de nan -> shape : (nb_rows, nb_lines)
for i in range(1, nb_rows):
mask = np_roll_mask[i] # Le masque (i) ... [False False False False False True True False ... False]
l_vals = np_roll_close[i-1] # ...correspond au close (i-1) [12160 12180 12200 12200 12220 12240 ... 12240]
np_peaks = l_vals[mask] # l_vals filtré : [12240 12220 12280 12260 12280 12260 12280 12260 12280 12220]
if b_renko:
unique, counts = np.unique(np_peaks, return_counts=True) # [12220. 12240. 12260. 12280.], [2 1 3 4]
else:
try:
counts, l_binends = np.histogram(np_peaks, bins=nb_lines)
except (Exception,):
continue
unique = (l_binends[:-1] + l_binends[1:]) / 2
mask = counts >= nb_peaks
unique = unique[mask]
counts = counts[mask]
if b_sorted_force:
np_stack = np.column_stack((counts, unique)) # [[2 12220], [1 12240], [3 12260], [4 12280]]
col = 1
else:
np_stack = np.column_stack((unique, counts)) # [[12220 2], [12240 1], [12260 3], [12280 4]]
col = 0
np_sorted_peaks = list(np_stack[np.argsort(np_stack[:, 0])][::-1][:, col]) + [np.nan for _ in range(nb_lines)]
# |_ [12280.0, 12260.0, 12220.0, 12240.0, nan, nan, nan, nan, nan, nan]
""" Résultat. """
np_best[i] = np_sorted_peaks[:nb_lines] # [12280.0, 12260.0, 12220.0, 12240.0, nan, nan]
return pd.DataFrame(data=np_best, columns=[f'Line{i}' for i in range(nb_lines)]) # ['Line0', 'Line1', ...]
def polarity_method(self, length=40, nb_lines=6, nb_peaks=3, b_sorted_force=False, **kwargs):
close = self._get_column(kwargs.pop("close", "close"))
result = polarity(close=close, length=length, nb_lines=nb_lines, nb_peaks=nb_peaks, b_sorted_force=b_sorted_force)
return self._post_process(result, **kwargs)
polarity.__doc__ = """
Les lignes de polarité sont les lignes horizontales de support et/ou de résistance.
Seules les traces à l'instant t {indx} sont mémorisées, permettant de voir où se situe le signal dans ce contexte.
L'indicateur retourne plusieurs signaux :
- Line0, Line1, ..., Line_n : Niveaux des peaks des {length} points passés : les plus souvent rencontrés.
Args :
length : Nombre de points passés à étudier. Valeur par défaut 40.
nb_lines : Nombre de lignes retournées. Valeur par défaut : 6.
nb_peaks : Chaque ligne retournée relie au moins {nb_peaks} peaks (sommets et creux). Valeur par défaut : 3.
b_sorted_force : Lignes triées par ordre d'influente. Valeur par défaut : False.
Kwargs :
NA.
Return :
pd.DataFrame: [Line0, Line1, ..., Line{n}]
"""
np_rolling
du module indicators
, qui n'existe pas. Créons la.indicators.py
./functions/indicators.py
:
""" Version 2022-03-15 """
# Imports externes
import copy
import warnings
import numpy as np
import scipy.signal as sig
warnings.filterwarnings('ignore')
def get_periodic(len_vector=100_000, signal_name='Sinus', period=100, amplitude=20):
"""
@param len_vector: Taille du vecteur de sortie.
@param signal_name: 'Sinus', 'Cosinus', 'Carré', 'Triangle', 'Dent de scie montante' ou 'Dent de scie descendante'
@param period: Période du signal.
@param amplitude: Amplitude du signal.
@return: vecteur numpy de taille len_vector.
"""
nb_iter = 2 + len_vector // (2 * period)
""" y = Calcul d'une seule période. """
if signal_name == 'Sinus':
x = np.linspace(0, 2 * np.pi, period + 1)[:-1]
y = np.reshape(amplitude * np.sin(x), (period, 1))
elif signal_name == 'Cosinus':
x = np.linspace(0, 2 * np.pi, period + 1)[:-1]
y = np.reshape(amplitude * np.cos(x), (period, 1))
elif signal_name == 'Carré':
y = np.ones((period, 1))
y[:period // 2] *= amplitude
y[period // 2:] *= - amplitude
elif signal_name == 'Triangle':
y = np.ones((period, 1))
y[:period // 2] *= np.linspace(amplitude, -amplitude, period // 2, endpoint=False).reshape((period // 2, 1))
y[period // 2:] *= np.linspace(-amplitude, amplitude, period // 2, endpoint=False).reshape((period // 2, 1))
elif signal_name == 'Dent de scie montante':
y = np.ones((period, 1)) * np.linspace(-amplitude, amplitude, period).reshape((period, 1))
else: # Dent de scie descendante
y = np.ones((period, 1)) * np.linspace(amplitude, -amplitude, period).reshape((period, 1))
""" Répétition du motif. """
y = np.concatenate((y, y) * nb_iter)
return y[: len_vector]
def get_random(len_vector=100_000, typ='Normal', seeder=None, b_sum=True, **kwargs):
"""
@param len_vector: Taille du vecteur de sortie.
@param typ: Normal, poisson, binomial ou logistic.
@param seeder: Si seeder >=0, le signal aléatoire sera toujours le même, en fonction de la valeur seeder.
Cela permet de répéter des expériences à l'identique.
@param b_sum: Si Vrai, chaque valeur est cumulée, imitant un historique de trading.
@param kwargs: Certains 'randoms' nécessitent des arguments spécifiques : nb trials, lambda, etc.
@return: Vecteur de shape (len_vector,)
"""
""" https://numpy.org/doc/1.16/reference/routines.random.html """
state_ini = np.random.get_state() # On mémorise l'état du seeder.
np.random.seed(None) if seeder is None or seeder < 0 else np.random.seed(seeder)
if typ == 'Poisson':
lambd = kwargs.get('lambda', 10)
vector = np.random.poisson(lambd, len_vector) - lambd
elif typ == 'Binomial':
nb_trials = kwargs.get('nb_trials', 20) # Nombre d'essais'
success_prob = kwargs.get('success_prob', .5) # Probabilité de succès. 0.5 = centré sur 0.
vector = np.random.binomial(nb_trials, success_prob, len_vector) - nb_trials / 2
elif typ == 'Logistic':
loc = kwargs.get('loc', 0) # Location (défaut = 0)
scale = kwargs.get('scale', 2.) # Scale
vector = np.random.logistic(loc, scale, len_vector)
else: # 'Normal'
amplitude = kwargs.get('amplitude', 10)
vector = np.random.standard_normal(len_vector) * amplitude / 2
np.random.set_state(state_ini) # On rétablit l'état initial.
return np.cumsum(vector) if b_sum else vector
def sma(v_signal, period):
""" Moyenne mobile arithmétique. https://fr.wikipedia.org/wiki/Moyenne_mobile
:param v_signal: vecteur numpy.
:param period: Longueur de la sma.
:return: Vecteur plus court que celui d'entrée (longueur raccourcie de {period})
"""
if period <= 1:
return v_signal
_mm_control(v_signal, period, 'sma')
tiles = rolling_window(v_signal, period) # Warning supprimé par warnings.filterwarnings('ignore'), dans imports.
return np.nanmean(tiles, axis=1) # RuntimeWarning: Mean of empty slice
def ema(v_signal, period):
""" Moyenne Mobile Exponentielle. https://fr.wikipedia.org/wiki/Moyenne_mobile
:param v_signal: vecteur numpy.
:param period: Longueur de la ema.
:return: Vecteur plus court que celui d'entrée (longueur raccourcie de {period})
"""
if period <= 1:
return v_signal
# beta = (1 - .8645) ** (1 / period)
beta = (period - 1) / (period + 1)
""" vp = Vecteur de pondération. """
vp = np.array([beta ** (period - n - 1) for n in range(period)])
tiles = rolling_window(v_signal, period)
return np.sum(tiles * vp / np.sum(vp), axis=1)
def smma(v_signal, period, nb_last=2): # Remplacer nb_last par un % de period.
""" Moyenne mobile lissée.
https://admiralmarkets.com/fr/formation/articles/indicateurs-forex/indicateur-moyenne-mobile-mt4
:param v_signal: vecteur numpy à traiter.
:param period: Longueur de la smma.
:param nb_last: nb derniers.
:return: Vecteur plus court que celui d'entrée (longueur raccourcie de {period})
"""
if period <= 1:
return v_signal
tiles = rolling_window(v_signal, period)
_sum = np.nanmean(tiles[:-nb_last, :], axis=1) * (period - nb_last)
try:
return (_sum + np.sum(tiles[nb_last:, -nb_last:], axis=1)) / period
except(Exception,):
return v_signal
def lwma(v_signal, period, ratio=1.02):
""" Moyenne mobile linéaire pondérée (Linear Weighted Moving Average, LWMA).
https://www.instaforex.eu/fr/forex_technical_indicators/moving_average
:param v_signal: vecteur numpy.
:param period: Longueur de la lwma.
:param ratio: Raison géométrique de la pondération.
:return: Vecteur plus court que celui d'entrée (longueur raccourcie de {period})
"""
if period <= 1:
return v_signal
""" Suite géométrique de raison {ratio}, pour pondérer les valeurs (les plus récentes = les plus fortes). """
np_pond = np.ones(period)
for p in range(period):
np_pond[p] = ratio ** p
_sum_pond = sum(np_pond)
tiles = rolling_window(v_signal, period)
tiles_pond = tiles * np_pond
_sum_rows = np.sum(tiles_pond, axis=1)
return _sum_rows / _sum_pond
def is_invalid_vector(v_signal, indx):
if indx >= 0:
return v_signal.shape[0] < indx + 1
else:
return v_signal.shape[0] < -indx
def single_rsi(v_signal, _from, _to, period=14):
indx = _from
while indx < _to:
if is_invalid_vector(v_signal, indx - period - 1):
val = np.nan
else:
""" Dérivée de v_signal (les deltas). """
l_dv = v_signal[indx - period: indx] - v_signal[indx - period - 1: indx - 1]
up, down = 0, .0001
for dv in l_dv:
up += max(0, dv)
down += max(0, -dv)
rs = up / down
val = 100 - (100 / (1 + rs))
yield indx, val
indx += 1
def rsi(v_signal, period):
""" https://fr.wikipedia.org/wiki/Relative_strength_index
RS = H / B => RSI = 100 - (100 / (1 + RS)) <-- Normalisé à [0, 100]
"""
""" La dérivée de v_signal (les deltas). """
dv = v_signal[1:] - v_signal[:-1]
""" Tuilage numpy. """
rw = rolling_window(dv, period)
""" Copies profondes. """
rwh, rwb = copy.deepcopy(rw), copy.deepcopy(rw * -1)
""" Suppression des valeurs négatives. """
rwh[rwh < 0] = 0
rwb[rwb < 0] = 0
""" Coefficient Beta. """
beta = (period - 1) / (period + 1)
""" vp = Vecteur de pondération. """
vp = np.array([beta ** (period - n - 1) for n in range(period)])
""" Somme du vecteur de pondération. """
svp = np.sum(vp)
""" Moyennes mobiles exponentielles. """
h = np.sum(rwh * vp / svp, axis=1)
b = np.sum(rwb * vp / svp, axis=1)
""" Suppression des zéros pour éviter des erreurs lors de la division par b """
b[b <= 0] = .0001
rs = h / b
""" Normalisation entre 0 et 100. """
return 100 - (100 / (1 + rs))
def get_mm(v_signal, typ='SMA', period=14, **kwargs):
"""
@param v_signal: Vecteur sur lequel la moyenne doit être calculée.
@param typ: 'SMA', 'EMA', 'SMMA', 'LWMA'
@param period: Longueur de la moyenne.
@return:
"""
if period > 0:
if typ == 'SMA': # Moyenne Mobile Arithmétique.
return sma(v_signal, period)
elif typ == 'EMA': # Moyenne Mobile Exponentielle.
return ema(v_signal, period)
elif typ == 'SMMA': # Moyenne Mobile Lissée.
nb_last = kwargs.get('nb_last', 4)
return smma(v_signal, period, nb_last=nb_last)
elif typ == 'LWMA': # Moyenne Mobile Linéaire Pondérée.
ratio = kwargs.get('ratio', 1.02)
return lwma(v_signal, period, ratio=ratio)
""" Si le type est inconnu ou la période < 1, on retourne le signal d'entrée. """
return v_signal
def macd(v_signal, d_val):
""" Ordre des calculs des vecteurs : https://fr.wikipedia.org/wiki/MACD
1) _long = mm(v_signal)
2) _short = mm(v_signal)
3) _diff = (_short - _long)
4) _signal = mm(_diff)
5) _macd = (_diff - _signal) """
_long, _short, _diff, _signal, _macd = None, None, None, None, None,
for key, val in d_val.items():
typ = val.get('typ', 'SMA')
period = val.get('period', 14)
kwargs = {
'nb_last': min(period, val.get('nb_last', 4)),
'ratio': d_val.get('ratio', 1.02)
}
if key == 'long':
_long = get_mm(v_signal, typ=typ, period=period, **kwargs)
elif key == 'short':
_short = get_mm(v_signal, typ=typ, period=period, **kwargs)
else:
nb_rows = min(_long.shape[0], _short.shape[0])
_diff = _short[-nb_rows:] - _long[-nb_rows:]
""" key = 'signal' """
_signal = get_mm(_diff, typ=typ, period=period, **kwargs)
nb_rows = min(_diff.shape[0], _signal.shape[0])
_macd = _diff[-nb_rows:] - _signal[-nb_rows:] # diff - signal
return v_signal, _long, _short, _diff, _signal, _macd
def _mm_control(v_signal, period, indic):
if len(v_signal) < period:
raise SystemExit(f"Erreur {indic} :\nLa période ({period}) est supérieure"
f" au nombre de points à traiter ({len(v_signal)})")
def savitzky_golay(v_signal, window_size, order, mode='interp'):
""" (Ctrl + Clic)
https://docs.scipy.org/doc/scipy-1.5.0/reference/generated/scipy.signal.savgol_filter.html#scipy.signal.savgol_filter
"""
if mode == 'interp' and len(v_signal) < window_size:
""" En mode 'interp', v_size doit être >= window_size. """
return np.full(v_signal.shape, np.nan)
else:
return sig.savgol_filter(v_signal, window_size, order, mode=mode)
def rolling_window(a, window): # a.shape = (2, 5), window = 3
""" https://www.rigtorp.se/2011/01/01/rolling-statistics-numpy.html
:param a: tableau numpy
:param window: tranche glissante.
return: Le tableau de sortie 's' a une dimension de plus que a.
|_ Ex : Avec w=3 et a.shape = (2, 7) => s.shape = (2, 5, 3)
|_ a.shape = (x, y) => s.shape = (x, y-w+1, w)
Chaque ligne de a est traitée -> Traitement d'une ligne :
row = [0 1 2 3 4 5], window = 3 => [[0 1 2]
[1 2 3]
(6,) => (6-3+1, 3) [2 3 4] <- shape = (4, 3)
[3 4 5]]
"""
shape = a.shape[:-1] + (max(0, a.shape[-1] - window + 1), window) # (2,) + (5 - 3 + 1, 3) = (2, 3, 3)
strides = a.strides + (a.strides[-1],) # (20, 4) + (4,) -> (20, 4, 4)
return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)
def np_rolling(vector, length, dtype=np.float32):
""" Retourne np_array à 2 dimensions, même nb de lignes que df_series, mais {length} colonnes. Le début -> nan. """
np_vector = vector.values if vector.__class__.__name__ == 'Series' else vector
shape = max(0, np_vector.shape[0] - length + 1), length
strides = np_vector.strides + (np_vector.strides[-1],)
np_roll = np.full((np_vector.shape[0], length), np.nan, dtype=dtype)
np_roll[length-1:, :] = np.lib.stride_tricks.as_strided(np_vector, shape=shape, strides=strides)
return np_roll
# ********************************************* Générateurs *********************************************
def to_generate(v_status):
"""
On cherche, dans l_status, des successions de 0. Chacune donne lieu à un tuple d'index (x, y). x inclu, y exclu.
Ces tuples informent le code appelant des parties à générer.
@param v_status: vecteur numpy de flags (0 ou 1).
@return: Liste de tuples.
"""
nb_lines = v_status.shape[0]
start, b_start = 0, False
lt_gener = list()
for k, flag in enumerate(v_status):
""" i==0 ou i==1 """
if not b_start and flag == 0:
""" 1er passage. """
b_start = True
start = k - nb_lines # Valeur négative
elif b_start and flag == 1:
b_start = False
lt_gener.append((start, k - nb_lines)) # Valeurs négatives.
if b_start:
""" Si, en fin de boucle, le flag est toujours à 0, on finalise à la taille du vecteur. """
lt_gener.append((start, 0))
return lt_gener
def single_sma(v_signal, period, indx):
""" Retourne un seul point. : SMA[t] = Somme (cours[t-p] à cours[t]) / p """
return np.nanmean(v_signal[indx - period: indx])
def single_ema(v_signal, v_self_signal, period, indx):
""" Retourne un seul point. : EMA[t] = EMA[t-1] - alpha * (Cours[t] – EMA[t-1])"""
alpha = 2 / (period + 1)
if np.isnan(v_self_signal[indx - 1]): # Donnée antérieure nécessaire, mais absente ... on la régénère.
val_ante = np.nan
for v_in in v_signal[1: indx]:
if np.isnan(val_ante) and not np.isnan(v_in):
val_ante = v_in
val_ante -= alpha * (val_ante - v_in)
else:
val_ante = v_self_signal[indx - 1]
val = val_ante - alpha * (val_ante - v_signal[indx])
return val
def single_smma(v_signal, v_self_signal, period, indx, nb_last):
""" Moyenne mobile lissée.
https://admiralmarkets.com/fr/formation/articles/indicateurs-forex/indicateur-moyenne-mobile-mt4 """
if np.isnan(v_self_signal[indx - 1]):
val = single_sma(v_signal, period, indx)
else:
sum1 = (period - nb_last) * v_self_signal[indx - 1]
sum2 = np.nansum(v_signal[indx - nb_last: indx])
val = (sum1 + sum2) / period
return val
def single_lwma(v_signal, period, indx, np_pond):
""" Moyenne mobile linéaire pondérée (Linear Weighted Moving Average, LWMA). """
""" https://www.metatrader5.com/fr/mobile-trading/iphone/help/chart/indicators/trend_indicators/moving_average
Suite géométrique de raison {ratio}, pour pondérer les valeurs (les plus récentes = les plus fortes). """
return np.nansum(v_signal[indx - period: indx] * np_pond) / np.sum(np_pond)
def gener_mm(v_signal_in, v_self_signal, _from, _to, typ='SMA', period=14, **kwargs):
""" https://www.instaforex.eu/fr/forex_technical_indicators/moving_average
@param v_signal_in: Vecteur sur lequel la moyenne doit être calculée.
@param v_self_signal: Valeurs précédentes du vecteur produit.
@param _from: Début d'itération.
@param _to: Fin d'itération.
@param typ: 'SMA', 'EMA', 'SMMA', 'LWMA'
@param period: Longueur de la moyenne.
@return: Yield un tuple (pointeur, valeur).
"""
nb_last = kwargs.get('nb_last', 3)
ratio = kwargs.get('ratio', 1.02)
""" Création d'une liste de pondération. """
np_pond = np.ones(period) # shape = (period, 0)
for p in range(period):
np_pond[p] = ratio ** p
indx = _from
while indx < _to:
if period == 0:
val = v_signal_in[indx]
elif v_signal_in.shape[0] + indx < period or np.isnan(min(v_signal_in[indx - period: indx])):
val = np.nan
elif typ == 'SMA':
""" Moyenne Mobile Arithmétique. """
val = single_sma(v_signal_in, period, indx)
elif typ == 'EMA':
""" Moyenne Mobile Exponentielle. """
val = single_ema(v_signal_in, v_self_signal, period, indx)
elif typ == 'SMMA':
""" Moyenne Mobile Lissée. """
val = single_smma(v_signal_in, v_self_signal, period, indx, nb_last)
elif typ == 'LWMA':
""" Moyenne mobile linéaire pondérée (Linear Weighted Moving Average, LWMA). """
val = single_lwma(v_signal_in, period, indx, np_pond)
else:
val = np.nan
yield indx, val
indx += 1
def main():
""" Lancement autonome, nécessaire en phase de développement pour la MAP (mise au point des fonctions). """
pass
if __name__ == '__main__':
main()
\trading\strategies\partie_2_creer\Strategie_02_Lignes_polarisees
.\trading\strategies\partie_2_creer\Strategie_02_Lignes_polarisees\main_02.py
:
# Imports internes
from functions.utils import Dictionary
from show.show_geek import ShowGeek
from trading.strategies.backtest import BackTest
class Geek(ShowGeek, BackTest):
def __init__(self):
super(Geek, self).__init__() # GeneticAlgorithm.__init__() est appelé dans ShowGeek.
self.show_ui()
@staticmethod
def central_args(l_keys):
od_args = Dictionary(dict(
ui=dict( # Ces valeurs seront à la racine du super-dictionnaire self.od.
geometry=(100, 40, 1200, 600), # x, y, w, h. Par défaut : (100, 40, 1200, 700).
abscissa_size=60, # Nb de points affichés en abscisse. Par défaut : 600.
window_title="Création 02 : L'indicateur 'polarity'", # Titre de la fenêtre.
figure_title="Lignes polarisées : de support et de résistance", # Titre des graphiques.
subplots=dict( # Noms et hauteurs (en %) des graphiques modifiables. Supprimer / Ajouter.
Principal=100, # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
),
best_zones=dict(
show=True,
gap=20, # Nombre de pips take-profit ou stop-loss.
bandwidth=80, # Pourcentage : de 0 à 100.
up=False, # Affichage des palliers haut.
down=False, # Affichage des palliers bas.
scatters=False, # Affichage des optimums.
confirm=False,
zig_zag=False, # Affichage de la courbe zig-zag.
colors=('#ffff0030', '#ff00ff10'), # Coloriage ouvertures.
),
),
pilot=dict( # Dans cet exemple : les 10 000 derniers points de EUR/USD en Renko10.
instrument='EUR/USD', # EURUSD, USDJPY, EURCHF, USDCAD, NZDUSD, EURGBP, EURJPY, GBPJPY, GBPCHF, ...
table=7, # int (ex : 10) = Renko, str (ex: 'H1') = Candles, None = Ticks.
# pc_from=0, # en % : les valeurs après {pc_from}.
pc_to=100, # en % dans la table-db. Les valeurs avant {pc_to}.
nb_rows=10_000, # Nb points étudiés. Non utilisé si les 2 existent ({pc_from} ET {pc_to}).
),
indics=dict(
polarity=dict(length=80, nb_lines=6, nb_peaks=4), # , b_sorted_force=True),
),
))
return Dictionary(od_args.read(l_keys, {}))
def add_indics(self, od_args=None):
od_args = self.central_args('indics')
nb_lines = od_args.read(['polarity', 'nb_lines'], 3)
l_cols = [f'Line{i}' for i in range(nb_lines)] # Liste des noms de lignes polarisées.
self.df_pilot[l_cols] = self.df_pilot.ta.polarity(**od_args['polarity'])
def distrib(self):
nb_lines = self.central_args(['indics', 'polarity']).read('nb_lines')
self.add_df('Principal', [f'Line{i}' for i in range(nb_lines)])
if __name__ == '__main__':
Geek()
...\site-packages\pandas_ta\custom.py
.
import pandas_ta as ta
, puis faire help(ta.custom)
...
/functions/custom_ta/volatility/normalize.py
.normalize
doit impérativement se trouver aux 5 endroits indiqués.normalize_method()
, elle doit avoir les valeurs par défaut des paramètres.normalize()
.Les zones de saturation provoquent des pertes d'information.
volatility
./functions/custom_ta/volatility/normalize.py
:
# Imports externes.
import numpy as np
import pandas as pd
# Imports internes.
from functions.indicators import np_rolling
def normalize(close, length):
np_roll = np_rolling(close, length)
np_min = np.min(np_roll, axis=1)
np_max = np.max(np_roll, axis=1)
np_norm = (close.values - np_min) / (np_max - np_min) # Valeurs de 0 à 1 ...
return pd.DataFrame(np_norm, columns=['Norm'])
def normalize_method(self, length=30, **kwargs):
close = self._get_column(kwargs.pop("close", "close"))
result = normalize(close=close, length=length)
return self._post_process(result, **kwargs)
normalize.__doc__ = """ https://datascientest.com/normalisation-des-donnees
Normalisation d'un signal, borné entre 0 et 1.
vector_norm = (vector - vector_min) / (vector_max - vector_min)
Arguments:
==========
length : longueur du vecteur. Valeur par défaut : 30.
Return :
========
pd.Series: Column name = 'Norm'.
"""
\trading\strategies\partie_2_creer\Strategie_03_Normalisation
.\trading\strategies\partie_2_creer\Strategie_03_Normalisation\main_03.py
:
# Imports internes
from functions.utils import Dictionary
from show.show_geek import ShowGeek
from trading.strategies.backtest import BackTest
class Geek(ShowGeek, BackTest):
def __init__(self):
super(Geek, self).__init__() # GeneticAlgorithm.__init__() est appelé dans ShowGeek.
self.show_ui()
@staticmethod
def central_args(l_keys):
od_args = Dictionary(dict(
ui=dict( # Ces valeurs seront à la racine du super-dictionnaire self.od.
geometry=(100, 40, 1200, 700), # x, y, w, h. Par défaut : (100, 40, 1200, 700).
abscissa_size=2_000, # Nb de points affichés en abscisse. Par défaut : 600.
window_title="Création 03 : Normalisation", # Titre de la fenêtre.
figure_title="Normalisation - entre 0 et 1 - v_norm = (v-min) / (max-min)", # Titre des graphiques.
subplots=dict( # Noms et hauteurs (en %) des graphiques modifiables. Supprimer / Ajouter.
Principal=60, # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
Milieu=''
),
best_zones=dict(
show=True,
gap=20, # Nombre de pips take-profit ou stop-loss.
bandwidth=80, # Pourcentage : de 0 à 100.
up=False, # Affichage des palliers haut.
down=False, # Affichage des palliers bas.
scatters=False, # Affichage des optimums.
confirm=False,
zig_zag=False, # Affichage de la courbe zig-zag.
colors=('#ffff0030', '#ff00ff10'), # Coloriage ouvertures.
),
),
pilot=dict( # Dans cet exemple : les 10 000 derniers points de EUR/USD en Renko10.
instrument='EUR/USD', # EURUSD, USDJPY, EURCHF, USDCAD, NZDUSD, EURGBP, EURJPY, GBPJPY, GBPCHF, ...
table=7, # int (ex : 10) = Renko, str (ex: 'H1') = Candles, None = Ticks.
# pc_from=0, # en % : les valeurs après {pc_from}.
pc_to=100, # en % dans la table-db. Les valeurs avant {pc_to}.
nb_rows=10_000, # Nb points étudiés. Non utilisé si les 2 existent ({pc_from} ET {pc_to}).
),
indics=dict(
normalize=dict(length=100),
),
))
return Dictionary(od_args.read(l_keys, {}))
def add_indics(self, od_args=None):
od_args = self.central_args('indics')
self.df_pilot['NORM'] = self.df_pilot.ta.normalize(**od_args['normalize'])
def distrib(self):
self.add_df('Milieu', ['NORM'])
def post_params(self):
self.trace_hline('Milieu', .5, lw=.4, ls='-.', c='b')
super().post_params() # Code avant -> inséré, code après -> ajouté.
if __name__ == '__main__':
Geek()
Le problème de saturation est résolu.
volatility
./functions/custom_ta/volatility/standardize.py
:
# Imports externes.
import numpy as np
import pandas as pd
# Imports internes.
from functions.indicators import np_rolling
def standardize(close, length, weighted):
np_roll = np_rolling(close, length) # nan sur les {length-1} 1ères lignes. shape = (close.shape[0], length).
w = np.linspace(1, weighted, length) # Coefficients de pondération.
np_mean = np.average(np_roll, weights=w, axis=1)
np_std = np.std(np_roll, axis=1)
np_norm = (np_roll[:, -1] - np_mean) / np_std
return pd.DataFrame(np_norm, columns=['Stand'])
def standardize_method(self, length=30, weighted=1, **kwargs):
close = self._get_column(kwargs.pop("close", "close"))
result = standardize(close=close, length=length, weighted=weighted)
return self._post_process(result, **kwargs)
standardize.__doc__ = """ https://datascientest.com/normalisation-des-donnees
vector_stand = (vector - μ) / σ <-- μ = moyenne, σ = écart-type.
Arguments:
==========
length : longueur du vecteur. Valeur par défaut : 30.
weighted : Coefficient de pondération pour le calcul de la moyenne mobile. Valeur par défaut : 1 (sma).
Return :
========
pd.Series: Column name = 'Stand'.
"""
\trading\strategies\partie_2_creer\Strategie_04_Standardisation
.\trading\strategies\partie_2_creer\Strategie_04_Standardisation\main_04.py
:
# Imports internes
from functions.utils import Dictionary
from show.show_geek import ShowGeek
from trading.strategies.backtest import BackTest
class Geek(ShowGeek, BackTest):
def __init__(self):
super(Geek, self).__init__() # GeneticAlgorithm.__init__() est appelé dans ShowGeek.
self.show_ui()
@staticmethod
def central_args(l_keys):
od_args = Dictionary(dict(
ui=dict( # Ces valeurs seront à la racine du super-dictionnaire self.od.
geometry=(100, 40, 1200, 700), # x, y, w, h. Par défaut : (100, 40, 1200, 700).
abscissa_size=2000, # Nb de points affichés en abscisse. Par défaut : 600.
window_title="Création 04 : Standardisation", # Titre de la fenêtre.
figure_title="Standardisation - centré sur 0 - v_stand = (v-μ) / σ", # Titre des graphiques.
subplots=dict( # Noms et hauteurs (en %) des graphiques modifiables. Supprimer / Ajouter.
Principal=60, # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
Milieu=''
),
best_zones=dict(
show=True,
gap=20, # Nombre de pips take-profit ou stop-loss.
bandwidth=80, # Pourcentage : de 0 à 100.
up=False, # Affichage des palliers haut.
down=False, # Affichage des palliers bas.
scatters=False, # Affichage des optimums.
confirm=False,
zig_zag=False, # Affichage de la courbe zig-zag.
colors=('#ffff0030', '#ff00ff10'), # Coloriage ouvertures.
),
),
pilot=dict( # Dans cet exemple : les 10 000 derniers points de EUR/USD en Renko10.
instrument='EUR/USD', # EURUSD, USDJPY, EURCHF, USDCAD, NZDUSD, EURGBP, EURJPY, GBPJPY, GBPCHF, ...
table=7, # int (ex : 10) = Renko, str (ex: 'H1') = Candles, None = Ticks.
# pc_from=0, # en % : les valeurs après {pc_from}.
pc_to=100, # en % dans la table-db. Les valeurs avant {pc_to}.
nb_rows=10_000, # Nb points étudiés. Non utilisé si les 2 existent ({pc_from} ET {pc_to}).
),
indics=dict(
standardize=dict(length=100, weighted=3),
),
))
return Dictionary(od_args.read(l_keys, {}))
def add_indics(self, od_args=None):
od_args = self.central_args('indics')
self.df_pilot['STAND'] = self.df_pilot.ta.standardize(**od_args['standardize'])
def distrib(self):
self.add_df('Milieu', ['STAND'])
def post_params(self):
self.trace_hline('Milieu', 0, lw=.4, ls='-.', c='b')
super().post_params() # Code avant -> inséré, code après -> ajouté.
if __name__ == '__main__':
Geek()
Si la moyenne mobile est d'ordre 1 (weighted=1 ← valeur par défaut), ce sera une moyenne mobile simple (SMA). Dans cet exemple, elle est d'ordre 3.
pandas-ta
, vos modifications seraient perdues./functions/custom_ta/{dans sa famille}/
.zlma
présente un bug. Nous allons le cloner, corriger le bug, puis enfin, nous en servir.\trading\strategies\partie_2_creer\Strategie_05_Surcharge
.\trading\strategies\partie_2_creer\
Strategie_05_Surcharge
\main_05.py
:
# Imports internes
from functions.utils import Dictionary
from show.show_geek import ShowGeek
from trading.strategies.backtest import BackTest
class Geek(ShowGeek, BackTest):
def __init__(self):
super(Geek, self).__init__() # GeneticAlgorithm.__init__() est appelé dans ShowGeek.
self.show_ui()
@staticmethod
def central_args(l_keys):
od_args = Dictionary(dict(
ui=dict( # Ces valeurs seront à la racine du super-dictionnaire self.od.
geometry=(100, 40, 1200, 700), # x, y, w, h. Par défaut : (100, 40, 1200, 700).
abscissa_size=200, # Nb de points affichés en abscisse. Par défaut : 600.
window_title="Création 05 : Surcharge", # Titre de la fenêtre.
figure_title="Surcharge : correction de bug", # Titre des graphiques.
subplots=dict( # Noms et hauteurs (en %) des graphiques modifiables. Supprimer / Ajouter.
Principal=100, # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
),
best_zones=dict(
show=True,
gap=20, # Nombre de pips take-profit ou stop-loss.
bandwidth=80, # Pourcentage : de 0 à 100.
up=False, # Affichage des palliers haut.
down=False, # Affichage des palliers bas.
scatters=False, # Affichage des optimums.
confirm=False,
zig_zag=False, # Affichage de la courbe zig-zag.
colors=('#ffff0030', '#ff00ff10'), # Coloriage ouvertures.
),
),
pilot=dict( # Dans cet exemple : les 10 000 derniers points de EUR/USD en Renko10.
instrument='EUR/USD', # EURUSD, USDJPY, EURCHF, USDCAD, NZDUSD, EURGBP, EURJPY, GBPJPY, GBPCHF, ...
table=10, # int (ex : 10) = Renko, str (ex: 'H1') = Candles, None = Ticks.
# pc_from=0, # en % : les valeurs après {pc_from}.
pc_to=100, # en % dans la table-db. Les valeurs avant {pc_to}.
nb_rows=10_000, # Nb points étudiés. Non utilisé si les 2 existent ({pc_from} ET {pc_to}).
),
indics=dict(
zlma=dict(length=20, mamode='sma'),
),
))
return Dictionary(od_args.read(l_keys, {}))
def add_indics(self, od_args=None):
od_args = self.central_args('indics')
self.df_pilot['ZLMA'] = self.df_pilot.ta.zlma(**od_args['zlma'])
def distrib(self):
self.add_df('Principal', ['ZLMA'])
if __name__ == '__main__':
Geek()
On obtient cette erreur : 'module' object is not callable.
mamode='sma'
← Voir code de main_05.py
ci-dessus, ligne 43.
ema (par défaut), dema, hma, linreg, rma, sma, swma, t3, tema, trima, vidya, wma
ema
et dema
.zlma
, dans ses imports.custom_ta/overlap/
.overlap
./functions/custom_ta/overlap/zlma.py
:
# -*- coding: utf-8 -*-
# from . import (
# dema, ema, hma, linreg, rma, sma, swma, t3, tema, trima, vidya, wma
# )
""" Modification WL -> Erreur dans la version originale :
- TypeError: 'module' object is not callable.
- Correction : Les lignes d'import ci-dessus ont été remplacées par la ligne ci-dessous"""
from pandas_ta import dema, ema, hma, linreg, rma, sma, swma, t3, tema, trima, vidya, wma
from pandas_ta.utils import get_offset, verify_series
def zlma(close, length=None, mamode=None, offset=None, **kwargs):
"""Indicator: Zero Lag Moving Average (ZLMA)"""
# Validate Arguments
length = int(length) if length and length > 0 else 10
mamode = mamode.lower() if isinstance(mamode, str) else "ema"
close = verify_series(close, length)
offset = get_offset(offset)
if close is None: return
# Calculate Result
lag = int(0.5 * (length - 1))
close_ = 2 * close - close.shift(lag)
if mamode == "dema": zlma = dema(close_, length=length, **kwargs)
elif mamode == "hma": zlma = hma(close_, length=length, **kwargs)
elif mamode == "linreg": zlma = linreg(close_, length=length, **kwargs)
elif mamode == "rma": zlma = rma(close_, length=length, **kwargs)
elif mamode == "sma": zlma = sma(close_, length=length, **kwargs)
elif mamode == "swma": zlma = swma(close_, length=length, **kwargs)
elif mamode == "t3": zlma = t3(close_, length=length, **kwargs)
elif mamode == "tema": zlma = tema(close_, length=length, **kwargs)
elif mamode == "trima": zlma = trima(close_, length=length, **kwargs)
elif mamode == "vidya": zlma = vidya(close_, length=length, **kwargs)
elif mamode == "wma": zlma = wma(close_, length=length, **kwargs)
else: zlma = ema(close_, length=length, **kwargs) # "ema"
# Offset
if offset != 0:
zlma = zlma.shift(offset)
# Handle fills
if "fillna" in kwargs:
zlma.fillna(kwargs["fillna"], inplace=True)
if "fill_method" in kwargs:
zlma.fillna(method=kwargs["fill_method"], inplace=True)
# Name & Category
zlma.name = f"ZL_{zlma.name}"
zlma.category = "overlap"
return zlma
# Pour inhiber cet indicateur surchargé, renommer l'extension de ce fichier, par exemple .py --> .pyx.
def zlma_method(self, length=None, mamode=None, offset=None, **kwargs):
close = self._get_column(kwargs.pop("close", "close"))
result = zlma(close=close, length=length, mamode=mamode, offset=offset, **kwargs)
return self._post_process(result, **kwargs)
zlma.__doc__ = """Zero Lag Moving Average (ZLMA)
The Zero Lag Moving Average attempts to eliminate the lag associated
with moving averages. This is an adaption created by John Ehler and Ric Way.
Sources:
https://en.wikipedia.org/wiki/Zero_lag_exponential_moving_average
Calculation:
Default Inputs:
length=10, mamode=EMA
EMA = Exponential Moving Average
lag = int(0.5 * (length - 1))
SOURCE = 2 * close - close.shift(lag)
ZLMA = MA(kind=mamode, SOURCE, length)
Args:
close (pd.Series): Series of 'close's
length (int): It's period. Default: 10
mamode (str): Options: 'dema', 'ema', 'hma', 'linreg', 'rma', 'sma', 'swma', 't3', 'tema', 'trima', 'vidya', 'wma'.
|_ Default: 'ema'
offset (int): How many periods to offset the result. Default: 0
Kwargs:
fillna (value, optional): pd.DataFrame.fillna(value)
fill_method (value, optional): Type of fill method
Returns:
pd.Series: New feature generated.
"""
Le commentaire, en début de code, indique le problème ... et sa correction.
main_05.py
.Le problème est résolu, quelle que soit la valeur de l'argument mamode
.
Notez la méthode zlma_method()
ajoutée, nécessaire pour activer cet indicateur.
Le bug dans un indicateur n'est pas la seule raison pour procéder de la sorte.
Vous pouvez vouloir améliorer un indicateur, l'alléger ou l'enrichir, ou bien en créer un nouveau qui y ressemble à quelques détails près.
- Si votre indicateur porte le même nom que l'original, il sera prioritaire.
- S'il porte un nom différent, par exemple ici
my_zlma.py
, vous devrez faire le choix dans le code appelant.
Bonjour les codeurs !