Aperçu visuel des gains d'une stratégie de trading
Avant-propos
TA-Lib est complémentaire à Pandas-ta
:
Pandas-ta
appellera par défaut les indicateurs de TA-Lib
si ce dernier est installé.TA_Lib-0.4.24-cp39-cp39-win_amd64.whl
dans un dossier quelconque.$ pip install D:\anaconda\TA_Lib-0.4.24-cp39-cp39-win_amd64.whl
$ pip list
ou dans les settings de Pycharm : TA-Lib -> 0.4.24
Description
ShowGeek
BackTest
CtrlHistos
ont dû être adaptées.CtrlHistos
, nouvelle version → \trading\historiques\ctrl_histos.py
:
""" Version 2022-03-05 """
# Imports externes
import datetime
import gzip
import os # http://www.python-simple.com/python-modules-fichiers/os-path.php (Ctrl + clic)
import requests
from io import BytesIO, StringIO
import pandas as pd
from pandas_ta.custom import create_dir, import_dir # https://github.com/twopirllc/pandas-ta
import keyboard # https://www.delftstack.com/fr/howto/python/python-detect-keypress/
# Imports internes
from functions.utils import DateTime, Utils
from trading.historiques.db_candle import DbCandle
from trading.historiques.db_tick import DbTick
class CtrlHistos:
def __init__(self, instrument):
""" https://github.com/fxcm/MarketData
Tous les symboles :
AUDCAD, AUDCHF, AUDJPY, AUDNZD, AUDUSD, CADCHF, CADJPY, EURAUD, EURCHF,
EURGBP, EURJPY, EURNZD, EURUSD, GBPCAD, GBPCHF, GBPJPY, GBPNZD, GBPUSD,
NZDCAD, NZDCHF, NZDJPY, NZDUSD, USDCAD, USDCHF, USDJPY, USDTRY
Les plus utilisés (Dans l'ordre d'importance) :
EURUSD, USDJPY, EURCHF, USDCAD, NZDUSD, EURGBP, EURJPY, GBPJPY, GBPCHF, GBPUSD """
self.instrument = instrument
self.symbol = instrument.replace('/', '')
self.dt = DateTime()
self.ut = Utils()
self.kb_break = 'f12' # 'shift', 'ctrl', ... à votre convenance.
self.pips = .01 if instrument.endswith('JPY') else .0001
self.symbol_dir = None
self.db_candle = None
self.db_tick = None
self.setup()
def setup(self):
""" Les données peuvent se trouver sur un disque dur SSD autre que celui en cours.
Le cas échéant, db_path doit être modifié (chemin absolu).
Il est également possible, pour les besoins des tests, d'avoir plusieurs bases de données.
|_ Il faudra alors modifier db_path chaque fois que l'on désire switcher. """
db_path = os.path.dirname(__file__) # <--- Chemin par défaut, à modifier si autre disque dur.
self.symbol_dir = os.path.abspath(f'{db_path}/db/{self.symbol}') # Dossier des fichiers d'historiques.
os.makedirs(self.symbol_dir, exist_ok=True)
self.db_candle = DbCandle(self)
self.db_tick = DbTick(self)
def _helper(self, key, *l_params):
""" ************ Collection d'utilitaires. Nombre de paramètres variable. ************ """
if key == 'nb_weeks_in_year':
""" Renvoie le nombre de semaines dans l'année {year}. """
year = l_params[0] # l_params = [année dont on veut connaître le nombre de semaines].
if len(l_params) > 1:
y, num_week, _ = datetime.date.today().isocalendar()
if y == year:
return num_week - 1
o_dat = self.dt.get_date_from_dtstamp(self.dt.get_dtstamp_from_dtstr(f'{year}-12-28', '%Y-%m-%d'))
return o_dat.isocalendar()[1]
elif key == 'l_iso_yw':
""" Renvoie une liste de tuples ISO (iso_year, iso_str_week) : <-- exemple de tuple : (2019, '08')
- Pour la devise en cours et son type (tick ou candle).
- Toutes les semaines, du début à aujourd'hui.
- L'année est un int, la semaine une str ('02', '12', ...) """
b_ticks = l_params[0] # l_params = [b_ticks].
st_yw = set() # Le set() évite les doublons.
since = 2018 if b_ticks else 2012
yw_today = datetime.date.today().isocalendar()[:2]
for year in range(since, yw_today[0] + 1):
for week in range(1, 54):
iso = self.dt.isoyw_from_fxyw(year, week)
st_yw.add((iso[0], f'0{iso[1]}'[-2:]))
if (year, week) >= yw_today:
lt_yw = list(st_yw)
lt_yw.sort()
return lt_yw
elif key == 'csv_file':
""" Renvoie le chemin complet du fichier correspondant à la devise en cours, à l'année et à la semaine. """
year, week, b_ticks = l_params # l_params = [*t_yw, b_ticks]
str_week = f'0{week}'[-2:]
return os.path.join(self.symbol_dir, str(year), f"{'tick' if b_ticks else 'candle'}_"
f"{str_week}.csv")
elif key == 'url_file':
""" Renvoie l'url du fichier d'histos correspondant à la devise en cours, à l'année et à la semaine. """
year, str_week, b_ticks = l_params # l_params = [*t_yw, b_ticks]
url_candle = 'https://candledata.fxcorporate.com/m1'
url_tick = 'https://tickdata.fxcorporate.com'
return f'{url_tick if b_ticks else url_candle}/{self.symbol}/{year}/{int(str_week)}.csv.gz'
def verify_weeks(self, b_ticks, b_silent=False):
""" Affichage d'un tableau dans la console (Lignes=années, Colonnes=semaines).
'▒' ou '▄' ou '▀' = semaine présente, '.' = semaine absente. """
""" 1 - Listes des semaines réellement existantes en base de données et en fichier .csv. """
l_weeks_csv, l_weeks_db = self._existing_lists(b_ticks)
""" 2 - Affichage du titre et entêtes de colonnes. """
typ = 'Ticks' if b_ticks else 'Candles'
self.ut.gauge(f"{self.instrument}-{typ}", large=17)
[self.ut.gauge(char=f"{str(week) :<2}") for week in range(1, 54)]
self.ut.gauge('end')
""" 3 - Affichage du contenu - Ordonnées = années, abscisse = semaines. """
l_weeks = l_weeks_csv + l_weeks_db
l_weeks.sort()
first_year = (2018 if b_ticks else 2012) if len(l_weeks) == 0 else l_weeks[0][0]
year_now = datetime.date.today().isocalendar()[0]
for year in range(first_year, year_now + 1):
self.ut.gauge(year, large=17)
""" Boucle sur les semaines de l'année {year} : 1 à (52 ou 53). """
for week in range(1, self._helper('nb_weeks_in_year', year, 'today') + 1): # today limite à aujourd'hui.
t_yw = year, f'0{week}'[-2:] # 7 -> 07
b_csv, b_db = t_yw in l_weeks_csv, t_yw in l_weeks_db
if not b_csv and not b_db: # 0 0 /x./y
char, color = '. ', 'BLEU'
elif not b_csv and b_db: # 0 1 /x.y
char, color = '▀ ', 'VERT'
elif b_csv and not b_db: # 1 0 x./y
char, color = '▄ ', 'VIOLET'
else: # 1 1 x.y
char, color = '▒ ', 'BLEU'
self.ut.gauge(char=char, color=color)
self.ut.gauge('end')
""" Vérification de la synchronisation entre la base de données et les fichiers csv. """
msg = "\nAttention : La base de données n'est pas entièrement synchronisée ! csv + db = rectangles bleus." \
"\nL'affichage ci-dessus montre les fichiers .csv en carrés-bas mauves, la db en carrés-hauts verts." \
if not b_silent and l_weeks_csv != l_weeks_db else ''
self.ut.printc(msg)
def download_histos(self, nb_weeks, b_ticks):
if nb_weeks > 1:
self.ut.printc(f"\nArrêt manuel : garder la touche '{self.kb_break.upper()}' appuyée.\n")
""" Intervalle des semaines à télécharger (nombres en base 53). """
""" |_ since = Depuis. """
l_weeks_csv, _ = self._existing_lists(b_ticks=b_ticks) # Semaines = str <-- '08', '09', '10', ...
l_csv = [(y, int(w)) for (y, w) in l_weeks_csv] # Semaines = int <-- 8, 9, 10, ...
for y, w in l_csv:
if w > 2:
since = y * 53 + w - 1
break
else:
since = (2018 if b_ticks else 2012) * 53
y_today, w_today = datetime.date.today().isocalendar()[:2]
now = y_today * 53 + w_today
""" |_ now = Jusqu'à. """
""" l_required = Liste des fichiers à télécharger. """
l_required = list()
for yw53 in range(since, now-1): # dernière semaine = (now-1) => pas la semaine en cours.
csv_yw = yw53 // 53, 1 + yw53 % 53
csv_file = self._helper('csv_file', *csv_yw, b_ticks)
if not os.path.isfile(csv_file):
""" Fichier absent localement => à télécharger. """
l_required.append(csv_yw)
db = self.db_tick if b_ticks else self.db_candle
nb_weeks_loaded = 0
""" Parcours des semaines à traiter. """
for csv_yw_required in l_required:
df = self._download_csv_file(csv_yw_required, b_ticks)
if df is None:
""" Arrêt manuel demandé. """
break
elif df.shape[0] > 0:
""" 1 - Enregistrement du fichier téléchargé sur disque dur. """
self._df_to_csv(df, csv_yw_required, b_ticks)
""" 2 - Écriture du contenu en base de données. break si arrêt manuel demandé. """
if not db.df_to_table(df):
continue
""" 3 - Update des volumes à partir des ticks. """
df_stamps = self.db_tick.get_df_stamps(*csv_yw_required)
if df_stamps.shape[0] > 0:
self.db_candle.update_volume(df_stamps)
""" Arrêt si le nombre de semaines demandées est atteint. """
nb_weeks_loaded += 1
if nb_weeks_loaded >= nb_weeks:
break
s = 's' if nb_weeks > 1 else ''
self.ut.printc(f"Téléchargement{s} terminé{s}.", color='vert')
def synchro_db_csv(self, b_ticks):
""" A l'issue de cette méthode, TOUTE la db sera l'image de TOUS les fichiers csv. """
""" 1 - Base de données. """
db, typ = (self.db_tick, 'Ticks') if b_ticks else (self.db_candle, 'Candles')
""" 2 - État actuel des historiques => Différences entre csv et db
- l_drop = semaines à supprimer de la db, l_add = semaines à ajouter à la db. """
l_drop, l_add, l_vol = self._lists_to_update_db(b_ticks=b_ticks)
if l_drop+l_add+l_vol == []:
self.ut.printc(f"{typ} csv <--> db : OK. La synchronisation est inutile.\n", color='VERT')
return True
""" 3 - Message : état initial. """
for l_weeks in [l_drop, l_add]:
if len(l_weeks) > 0:
s = 's sont' if len(l_weeks) > 1 else ' est'
verb = 'ajouter à' if l_weeks == l_add else 'supprimer de'
self.ut.printc(f"{len(l_weeks)} semaine{s} à {verb} la base de données '{typ.lower()}'.", color='VERT')
""" 4 - Ajout de données dans la db. """
self._add_weeks(l_add, b_ticks)
""" 5 - Suppression de données de la db. """
for yw in l_drop:
db.delete_week(*yw)
""" 6 - Update des volumes. """
for yw in l_vol:
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
break
df_stamps = self.db_tick.get_df_stamps(*yw)
self.db_candle.update_volume(df_stamps)
""" 7 - Vérification. """
l_drop, l_add, l_vol = self._lists_to_update_db(b_ticks=b_ticks)
success, color = ('réussi', 'VERT') if l_drop+l_add+l_vol == [] else ('échoué', 'ROUGE')
self.ut.printc(f'{typ} : La synchronisation a {success}.', color=color)
def _existing_lists(self, b_ticks):
""" Retourne les listes de semaines des fichiers csv et des enregistrements en base de données. """
""" s_weeks_csv n'est pas une list(), mais un set(), pour éviter les doublons. """
l_weeks_csv, s_weeks_db = list(), set()
db = self.db_tick if b_ticks else self.db_candle
l_iso_yw = self._helper('l_iso_yw', b_ticks) # Liste de toutes les semaines.
typ = 'Ticks' if b_ticks else 'Candles'
self.ut.gauge(f"{self.instrument}-{typ} Création listes :", large=0)
for num, iso_yw in enumerate(l_iso_yw):
""" Semaines existant en fichiers csv : Pour une semaine iso, on a 0, 1 ou 2 fichiers csv. """
l_csv_files, last_stamp = self._get_csv_files(*iso_yw, b_ticks)
csv_size = 0
if len(l_csv_files) > 0:
l_weeks_csv.append(iso_yw)
for csv_file in l_csv_files:
csv_size += os.path.getsize(csv_file)
""" Liste des semaines existant en base de données. """
csv = last_stamp if b_ticks else csv_size
if db.week_exists(*iso_yw, csv):
s_weeks_db.add(iso_yw) # (1)
if num % 8 == 0:
self.ut.gauge(char='.') # , color='Jaune')
self.ut.gauge('end')
self.ut.gauge('end') # Ligne vide supplémentaire.
""" Persistance en pkl. """
l_weeks_db = list(s_weeks_db)
l_weeks_csv.sort()
l_weeks_db.sort()
return l_weeks_csv, l_weeks_db
def _get_csv_files(self, iso_year, iso_week, b_ticks):
""" Attention ! Les noms de fichiers csv ne correspondent pas toujours à leur contenu ! """
""" Return : liste de noms complets de fichiers csv correspondant à (iso_year, iso_week) : 0, 1 ou 2 fichiers.
fxcorporate ne respecte pas la norme ISO. De plus :
- D'une année sur l'autre, le nom du fichier csv ne correspond pas à son contenu.
- On ne trouve pas de correspondance entre diverses devises.
- Par conséquent, le nom du fichier n'est pas fiable pour trouver le bon N° de semaine.
La méthode adoptée pour contourner ce problème est la suivante :
- Recherche de 3 fichiers csv voisins de (iso_year, iso_week) passés en paramètres.
- Lecture, pour chacun, de la date-heure du milieu du fichier.
- On en déduit le VRAI N° de semaine ISO.
Il arrive parfois, au changement d'année, qu'une même semaine soit composée de 2 fichiers csv.
- C'est pourquoi la valeur de retour est une liste. """
int_week, l_fxyw = int(iso_week), list()
last_stamp = 0
for week in range(int_week - 1, int_week + 1): # 2 Semaines : précédente, actuelle.
""" Correction : N° de semaine en base 53. """
y, w = (iso_year, week) if week > 0 else (iso_year - 1, week + 53)
y, w = (y, w) if w <= 53 else (y + 1, w - 53)
""" Les 3 fichiers csv candidats. """
str_w = f'0{w}'[-2:]
file_name = f"{'tick' if b_ticks else 'candle'}_{str_w}.csv" # 'tick_??.csv' ou 'candle_??.csv'.
csv_path = os.path.abspath(f"{os.path.dirname(self.symbol_dir)}/{self.symbol}/{y}/{file_name}")
if os.path.isfile(csv_path):
with open(csv_path, 'r') as fh:
""" On 'goûte' les 300 derniers bytes du fichier pour connaitre le N° ISO de semaine. """
nb_char = 300
fh.seek(0, os.SEEK_END)
fh.seek(fh.tell() - nb_char)
dt_str = fh.read().split('\n')[-3].split(',')[0]
last_stamp = self.dt.get_dtstamp_from_dtstr(dt_str)
o_d = self.dt.get_date_from_dtstamp(last_stamp) # Objet date.
if o_d.isocalendar()[:2] == (iso_year, int_week): # Filtrage.
l_fxyw.append(csv_path) # Candidat sélectionné.
return l_fxyw, last_stamp # 0, 1 ou 2 éléments.
def _download_csv_file(self, csv_yw_required, b_ticks):
""" Téléchargement, décompactage, décodage, DataFrame, enregistrement sur disque dur.
- return DataFrame. """
""" 1 - Demande au helper de construire l'url. """
url_required = self._helper('url_file', *csv_yw_required, b_ticks)
""" 2 - Initialisation des bytes. """
content = b''
""" 3 - Téléchargement en streaming avec lots de taille {size//50}, permettant l'insertion d'une jauge. """
y, w = csv_yw_required
try:
with requests.get(url_required, stream=True) as req:
size = int(req.headers['Content-Length'])
if size < 1_000:
return pd.DataFrame() # return df vide.
print()
self.ut.gauge(f"Téléchargement {' ticks ' if b_ticks else 'candles'} {self.symbol} ({y}, {w})")
for chunk in req.iter_content(chunk_size=size//50):
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
self.ut.gauge('end')
return None
content += chunk
""" 3.1 - Jauge de progression. """
self.ut.gauge()
self.ut.gauge('end')
except (Exception,):
self.ut.printc('Connexion internet interrompue.')
return None
""" 4 - Décompactage du buffer de données binaires (codées utf), en mémoire. """
buf = BytesIO(content)
f = gzip.GzipFile(fileobj=buf)
coded_bytes = f.read()
""" 5 - Décodage : utf-8 pour les candles, utf-16 pour les ticks. """
codec, type_data = ('utf-16', 'tick') if b_ticks else ('utf-8', 'candle')
decoded_bytes = coded_bytes.decode(codec)
""" 6 - Conversion Bytes -> String """
decoded_str = StringIO(decoded_bytes)
""" 7 - String to DataFrame. """
df = pd.read_csv(decoded_str) # Pas de colonne-index (index_col=0 retiré)
""" 8 - Retourne le dataframe. """
return df
def _df_to_csv(self, df, t_yw, b_ticks):
""" Contrôle préalable : si le dossier-cible n'existe pas, on le crée. """
csv_path = self._helper('csv_file', *t_yw, b_ticks)
os.makedirs(os.path.dirname(csv_path), exist_ok=True)
df.to_csv(csv_path, index=False) # Copie du fichier .csv sur disque dur.
@staticmethod
def _csv_to_df(csv_file):
""" Conversion du fichier {csv_file} en dataframe. """
return pd.read_csv(csv_file) if os.path.isfile(csv_file) else pd.DataFrame()
def _lists_to_update_db(self, b_ticks):
""" Retourne 3 listes : l_drop, l_add et l_vol.
Algorithme pour l_drop et l_add :
- (1) = l_weeks_db : Liste des semaines réellement existantes en base de données.
- (2) = l_weeks_csv : Liste des semaines réellement existantes en fichiers csv.
- Intersection (Inter) = (1) ⋂ (2)
- l_drop : Liste des semaines à supprimer de la db (celles qui n'existent pas en csv) : (1) - (Inter)
- l_add : Liste des semaines à ajouter dans la db (celles des csv qui n'existent pas en db) : (2) - (Inter)
--------------------------------
l_vol = liste des semaines (yw) de candles nécessitant un update du champ 'Volume' dans les tables.
Sans objet pour les ticks.
"""
""" 1 et 2 - Listes des semaines réellement existantes en base de données et en fichier .csv. """
l_weeks_csv, l_weeks_db = self._existing_lists(b_ticks=b_ticks)
""" 3 - Liste des semaines à supprimer de la db (celles qui n'existent pas en csv). """
l_drop = list()
for t_yw in l_weeks_db:
if t_yw not in l_weeks_csv:
# print('A supprimer :', t_yw) # MAP : Permet de lister les semaines qui sont à supprimer.
l_drop.append(t_yw)
""" 4 - Liste des semaines à ajouter à la db (celles des csv qui n'existent pas en db). """
l_add = list()
for t_yw in l_weeks_csv:
if t_yw not in l_weeks_db:
# print('A ajouter :', t_yw) # MAP : Permet de lister les semaines qui sont à ajouter.
l_add.append(t_yw)
""" 5 - Liste des semaines de ticks pour lesquelles les volumes des candles sont à updater. """
l_vol = list()
if not b_ticks: # Renkos non concernés.
""" Semaines ticks existantes (b_ticks=True). """
_, l_weeks_tick_db = self._existing_lists(b_ticks=True)
for yw in l_weeks_tick_db:
if yw in l_weeks_db:
if not self.db_candle.is_volume_in_week(*yw):
l_vol.append(yw)
return l_drop, l_add, l_vol
def _add_weeks(self, l_yw, b_ticks):
"""
@param l_yw : Liste des semaines à ajouter à la base de données.
@param b_ticks: True = ticks, False = candles.
"""
db = self.db_tick if b_ticks else self.db_candle
self.ut.printc(f"\nArrêt manuel : garder la touche '{self.kb_break.upper()}' appuyée.\n")
for iso_yw in l_yw:
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
break
l_csv_files, _ = self._get_csv_files(*iso_yw, b_ticks) # Liste de 0, 1 ou 2 fichiers.
for csv_file in l_csv_files:
df = self._csv_to_df(csv_file)
if df.shape[0] > 0:
db.df_to_table(df)
def get_stamps(self, table_name, nb_stamps):
db = self.db_tick if (table_name == 'Ticks' or table_name[0] == 'r') else self.db_candle
return db.get_last_stamps(table_name, nb_stamps)
def get_datas(self, table_name, final_stamp, nb_points):
b_ticks = table_name == 'Ticks' or table_name[0] == 'r' # Ticks ou Renko
db = self.db_tick if b_ticks else self.db_candle
return db.get_datas(table_name, final_stamp, nb_points)
""" *************************** Interface Trading <--> Base de données. *************************** """
@staticmethod
def custom_ta():
""" Indicateurs d'analyse technique personnalisés (dans /functions/custom_ta). """
path = os.path.abspath(__file__)
ta_dir = os.path.join(path.split(os.path.join('', 'trading', ''))[0], 'functions', 'custom_ta')
create_dir(ta_dir) # Création automatique de 9 sous-dossiers (peut être supprimé après le 1er passage).
import_dir(ta_dir, verbose=False)
def slice_tick(self, pc_pos, nb_rows):
"""
@param pc_pos: pourcentage -> 0 à 100.
@param nb_rows: Nombre d'enregistrements demandés -> si > 0, enr après pc_pos ; si < 0, enr avant pc_pos.
@return: pandas DataFrame.
"""
return self.db_tick.slice_tick(pc_pos, nb_rows)
def slice_renko(self, mesh, pc_from, pc_to, nb_rows):
return self.db_tick.slice_renko(mesh, pc_from, pc_to, nb_rows)
def slice_candle(self, period, pc_from, pc_to, nb_rows):
return self.db_candle.slice_candle(period, pc_from, pc_to, nb_rows)
def get_pilot(self, table=None, pc_from=None, pc_to=None, nb_rows=10_000, **kwargs):
""" - Récupération des {nb_rows} points AVANT {pc_to} (en %) ou APRES {pc_from} (en %). <- pc_ comme 'pourcent'
- Par défaut, les 10_000 derniers points. pc_to=100 signifie la fin du fichier, pc_to=50 le milieu. """
self.custom_ta()
if not (pc_from is None or pc_to is None):
nb_rows = None
if isinstance(table, int):
df = self._df_renko(table, pc_from, pc_to, nb_rows)
elif isinstance(table, str):
df = self._df_candle(table, pc_from, pc_to, nb_rows)
else: # elif table is None:
if nb_rows is None:
raise SystemExit("Ticks : Restriction à cause du trop grand nombre de points."
"\nSélectionner pc_from OU pc_to (pas les deux), et nb_rows.")
pc_pos = pc_from if isinstance(pc_from, (int, float)) else pc_to if isinstance(pc_to, (int, float)) else 50
nb_rows = -nb_rows if pc_pos == pc_to else nb_rows
df = self._df_tick(pc_pos, nb_rows)
return df
def _df_tick(self, pc_pos=50, nb_rows=50_000):
df = self.slice_tick(pc_pos=pc_pos, nb_rows=nb_rows)
""" Uniformisation des colonnes. """
df['Open'] = (df['Bid'] + df['Ask']) / (2 * self.pips)
df['High'] = (df['Bid'] + df['Ask']) / (2 * self.pips)
df['Low'] = (df['Bid'] + df['Ask']) / (2 * self.pips)
df['Close'] = (df['Bid'] + df['Ask']) / (2 * self.pips)
df['Volume'] = 1
df['Spread'] = (df['Ask'] - df['Bid']) / self.pips
return df[['timestamp', 'Open', 'High', 'Low', 'Close', 'Volume', 'Spread']] # Tri des colonnes.
def _df_renko(self, table, pc_from, pc_to, nb_rows):
"""
@param table: Maille 1, 2, 3, 4, 5, 7, 10, 14 ou 20.
@param pc_from: En % sur toute l'étendue des données.
@param pc_to: En % sur toute l'étendue des données.
@param nb_rows: nb_enr = abs(nb_rows). Si > 0, nb_enr depuis pc_from, si < 0, nb_enr jusqu'à pc_to.
@return: NA
"""
try:
df = self.slice_renko(table, pc_from=pc_from, pc_to=pc_to, nb_rows=nb_rows)
if df.empty:
raise SystemExit(f"La table {self.instrument} de maille {table} est vide.")
df['Volume'] = df['Volume'].fillna(0) # Remplace les nan par des zéros.
""" Uniformisation Candles-Renko : Colonne 'Renko' renommée en 'Close'. """
df = df.rename(columns={'Renko': 'Close'})
df['Open'] = df['Close'].shift(1)
df['High'] = df[['Close', 'Open']].max(axis=1) + table
df['Low'] = df[['Close', 'Open']].min(axis=1) - table
return df[['timestamp', 'Open', 'High', 'Low', 'Close', 'Volume', 'Spread']] # Tri des colonnes.
except (ValueError,) as err:
self.ut.printc(str(err))
return pd.DataFrame()
def _df_candle(self, table, pc_from, pc_to, nb_rows):
"""
@param table: 'm1', 'm5', 'm15', 'm30', 'h1', 'h4', 'day', 'week'
@param pc_from:
@param pc_to:
@param nb_rows: nb_enr = abs(nb_rows). Si > 0, nb_enr depuis pc_from, si < 0, nb_enr jusqu'à pc_to.
@return: NA
"""
try:
df = self.slice_candle(table, pc_from=pc_from, pc_to=pc_to, nb_rows=nb_rows)
if df.empty:
raise SystemExit(f"La table {self.instrument} de période {table} est vide.")
df['Volume'] = df['Volume'].fillna(0)
""" Uniformisation Candles-Renko : Ajout de la colonne 'Close', moyenne de 'BidClose' et 'AskClose'. """
df['Open'] = (df['BidOpen'] + df['AskOpen']) / (2 * self.pips)
df['High'] = (df['BidHigh'] + df['AskHigh']) / (2 * self.pips)
df['Low'] = (df['BidLow'] + df['AskLow']) / (2 * self.pips)
df['Close'] = (df['BidClose'] + df['AskClose']) / (2 * self.pips)
df['Spread'] = (df['BidClose'] - df['AskClose']) / self.pips
""" Filtrage et tri des colonnes. """
return df[['timestamp', 'Open', 'High', 'Low', 'Close', 'Volume', 'Spread']]
except (ValueError,) as err:
self.ut.printc(str(err))
return pd.DataFrame()
def main(): # MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP *****
""" Code de MAP (mise au point). Espace expérimental : ajouter, supprimer, commenter, décommenter, ... """
h = CtrlHistos('EUR/JPY')
# h = CtrlHistos('USD/JPY')
# h = CtrlHistos('GBP/USD')
# """ Candles. """
# h.download_histos(100, b_ticks=False)
# h.synchro_db_csv(b_ticks=False)
# h.verify_weeks(b_ticks=False)
""" Ticks. """
h.download_histos(5, b_ticks=True)
h.synchro_db_csv(b_ticks=True)
h.verify_weeks(b_ticks=True)
if __name__ == '__main__':
main()
ShowGeek
, nouvelle version → \show\show_geek.py
:
""" Version 2022-03-05 """
# Imports externes.
import numpy as np
import seaborn as sns
from matplotlib import pyplot as plt
from matplotlib.ticker import FormatStrFormatter
import matplotlib.animation
import copy
# Imports internes.
from functions.utils import Dictionary
from trading.historiques.ctrl_histos import CtrlHistos
# noinspection PyUnusedLocal
class ShowGeek:
def __init__(self): # , **args):
super().__init__() # Appelle Genetic.__init__(), 2ème classe héritée de la classe dérivée.
self.fig = plt.figure(1)
self.df_pilot = None
self.df_scats = None
self.nb_datas = 0
self.l_ax = None
self.pips = 0
""" Animation. """
self.b_anim = True
self.abscissa_size = 0
self.pointer = 0
self.b_paused = False
self.b_reverse = False
self.magn = 1 # Appui sur les flèches : Le pas d'avancement est : 1, 10 ou 100.
self.od = None
""" Signal pilote. """
self.od = Dictionary(self.set_heights())
self.get_df_pilot()
def get_args(self):
args = self.central_args('ui')
return dict( # Ces valeurs seront à la racine du super-dictionnaire self.od.
geometry=args.read('geometry', (100, 40, 1000, 700)), # x, y, w, h.
margins=args.read('margins', (6, 8, 10, 8)), # Marges : haut, droite, bas, gauche.
abscissa_size=args.read('abscissa_size', 600), # Nb de points affichés en abscisse.
window_title=args.read('window_title', "Modèle simple"), # Titre de la fenêtre.
figure_title=args.read('figure_title', "Modèle simple - Ne pas modifier"), # Titre des graphiques.
leader_lines=args.read('leader_lines', True), # Lignes hortogonales de repère, suivi de la souris.
subplots=args.read('subplots', dict( # Noms et hauteurs (en %) des graphiques modifiables.
Principal=65, # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
Milieu='', # '' : Si chaîne vide => Les hauteurs seront automatiquement calculées et réparties.
Bas='',
)),
show_pilot=args.read('show_pilot', True),
show_volume=args.read('show_volume', False),
show_linked_label=args.read('show_linked_label', True),
best_zones=dict(
show=args.read(['best_zones', 'show'], False),
gap=args.read(['best_zones', 'gap'], 20), # Nombre de pips take-profit ou stop-loss.
bandwidth=args.read(['best_zones', 'bandwidth'], 80), # Pourcentage : de 0 à 100.
up=args.read(['best_zones', 'up'], True), # Affichage des palliers haut.
down=args.read(['best_zones', 'down'], True), # Affichage des palliers bas.
scatters=args.read(['best_zones', 'scatters'], True), # Affichage des optimums sous forme de ronds.
confirm=args.read(['best_zones', 'confirm'], True),
zig_zag=args.read(['best_zones', 'zig_zag'], True), # Affichage de la courbe zig-zag.
colors=args.read(['best_zones', 'colors'], ('#ffff0030', '#ff00ff10')), # Coloriage des ouvertures.
),
animation=args.get('animation', True),
# Paramètres généraux supplémentaires ici ...
# Paramètres seaborn : https://www.python-simple.com/python-seaborn/seaborn-general.php
)
def show_ui(self):
""" ****************** Algorithme de construction ****************** """
""" Ajout des indicateurs dans la dataframe {df_pilot} <-- Colonnes ajoutées à {df_pilot}. """
self.add_indics()
""" Ajout du signal pilote. """
self.add_pilot()
""" Paramètres avant la création des graphiques (axes). """
self.pre_params()
""" Distribution des signaux (des colonnes) : un dataframe par axe dans {self.l_ax}. """
self.distrib()
""" Création des graphiques (axes). """
self.build_axis()
""" Paramètres après la création des graphiques (axes). """
self.post_params()
""" Affichage animé. """
self.show()
def animate(self, _=''):
""" Appelé dans la boucle matplotlib.FuncAnimation depuis self.show(). """
def get_line(_line_name):
_from, _to = x[0], x[0] + len(x)
return self.df_pilot[_line_name][_from: _to]
if self.b_paused:
return
""" Gestion du pointeur. """
self.set_pointer()
""" Abscisse commune : liste de {abscissa_size} valeurs depuis {pointer}. """
x = list(range(self.pointer, self.pointer + self.abscissa_size))
""" Parcours des graphiques (axes). """
for axis_name in self.od.keys():
if axis_name == 'figure':
continue
o_ax = self.od.read([axis_name, 'o_ax'])
if o_ax is None:
continue
df = self.od.read([axis_name, 'df'])
y_min, y_max = 10 ** 9, -10 ** 9
if not (df is None or df.empty):
""" Parcours des courbes (lines). """
for o_line in self.get_axis_lines(axis_name):
""" Plusieurs courbes (lines) dans un graphique (o_ax). """
line_name = o_line.get_label()
serie = df[line_name]
y = serie[self.pointer: self.pointer + self.abscissa_size]
""" Égalisation des tailles de x et y. """
len_min = min(len(x), len(y))
x, y = copy.copy(x[:len_min]), copy.copy(y[:len_min])
""" Hook pour l'injection de paramètres dans cette courbe. """
d_attr = self.hook_line_anim(axis_name, line_name, x, y, o_ax, df, o_line)
if d_attr is None:
d_attr = {}
""" Injection des datas à afficher. """
if d_attr.get('visible', True):
o_line.set_ydata(y)
o_line.set_xdata(x)
if not np.isnan(y).all():
y_min = min(y_min, np.nanmin(y))
y_max = max(y_max, np.nanmax(y))
""" Calcul des limites des ordonnées (y). """
if y_max < y_min:
y_min, y_max = -10, 10
y_padding = max(0, (y_max - y_min) * 0.05) # Marges top et bottom dans les axes (5%)
y_min, y_max = (round(y_min - y_padding, 6), round(y_max + y_padding, 6)) if y_max >= y_min else (-10, 10)
y_min = max(y_min, -10 ** 9)
y_max = min(y_max, 10 ** 9)
""" Limites x et y. """
o_ax.set_xlim(x[0], x[-1])
if y_min == y_max:
y_min = y_max = None
o_ax.set_ylim(y_min, y_max)
self.hook_axis_anim(axis_name, x, o_ax, df, y_min, y_max, get_line) # l_hook = [axis_name, x, o_ax, df]
if not self.b_anim:
plt.draw()
def set_pointer(self):
if self.b_reverse:
self.pointer -= self.magn
if self.pointer < 0:
self.pointer = self.nb_datas - self.abscissa_size
else:
self.pointer += self.magn
if self.pointer > self.nb_datas - self.abscissa_size:
self.pointer = 0
def add_pilot(self):
""" Affichage conditionnel du signal-pilote. """
self.od.write(['Principal', 'signals', 'Close', 'visible'], self.od.read(['figure', 'show_pilot'], False))
self.add_df('Principal', ['Close'])
def pre_params(self):
""" Fenêtre : taille, position, titre. """
mgr = plt.get_current_fig_manager()
mgr.set_window_title(self.od.read(['figure', 'window_title'], 'Stratégie'))
win = mgr.window
win.setGeometry(*self.od.read(['figure', 'geometry']))
""" Paramètres seaborn : https://www.python-simple.com/python-seaborn/seaborn-general.php """
sns.set_context(self.od.read(['figure', 'seaborn_context'], 'paper')) # paper, notebook, talk, poster
sns.set_style(self.od.read(['figure', 'seaborn_style'], 'darkgrid')) # white, dark, whitegrid, darkgrid, ticks
""" Ini attributs. """
self.abscissa_size = self.od.read(['figure', 'abscissa_size'], 600)
self.b_anim = self.od.read(['figure', 'animation'], False)
""" Affichage des best-zones dans le graphique (subplot) contenant le signal 'Close'. """
subplot_name = self.od.read(['figure', 'best_zones', 'subplot'])
if self.od.read(['figure', 'best_zones', 'show'], False) and subplot_name:
""" Appel de l'indicateur de trend {best_zones}. """
gap = self.od.read(['figure', 'best_zones', 'gap'], 24)
bandwidth = self.od.read(['figure', 'best_zones', 'bandwidth'], 80)
self.df_pilot[['Up', 'Down', 'Peak']], self.df_scats = self.df_pilot.ta.best_zones(gap=gap,
bandwidth=bandwidth)
""" Affichage des palliers haut et bas. """
l_columns = list()
if self.od.read(['figure', 'best_zones', 'up'], False):
l_columns.append('Up')
if self.od.read(['figure', 'best_zones', 'down'], False):
l_columns.append('Down')
self.add_df(subplot_name, l_columns)
def post_params(self):
""" Recherche des paramètres dans le dictionnaire. """
""" Titre des graphiques. """
axis_title = self.od.read(['figure', 'figure_title'])
if isinstance(axis_title, str) and len(axis_title) > 0:
o_ax = self.get_first_axis()
if self.is_axis(o_ax):
o_ax.set_title(axis_title)
""" y axis : position et visibilité des graduations (ticks). """
for l_keys in self.od.key_list():
if l_keys[-1] != 'y_ticks':
continue
position = self.od.read(l_keys)
o_ax = self.od.read([l_keys[0], 'o_ax'])
if not self.is_axis(o_ax):
continue
if position is False:
o_ax.axes.set_yticks([]) # Suppression (ticks + ticklabels + grille).
pass
else:
o_ax.yaxis.set_ticks_position(position) # left, right
""" Suppression de la grille. """
for l_keys in self.od.key_list():
if l_keys[-1] != 'grid':
continue
if self.od.read(l_keys) is False:
o_ax = self.od.read([l_keys[0], 'o_ax'])
o_ax.grid(False) # Suppression de la grille.
""" x axis : visibilité des graduations (ticks) """
for l_keys in self.od.key_list():
if l_keys[-1] != 'x_ticks':
continue
o_ax = self.od.read([l_keys[0], 'o_ax'])
if not self.is_axis(o_ax):
continue
if self.od.read(l_keys) is False:
o_ax.tick_params(axis='x', colors='#fff0') # '#fff0' = invisible
""" Légendes. """
for axis_name in self.get_axis_names():
o_ax = self.od.read([axis_name, 'o_ax'])
if self.get_axis_lines(axis_name):
legend = self.od.read([axis_name, 'legend'])
pos = ['auto', 'rh', 'lh', 'lb', 'rb', 'rm', 'lm', 'rm', 'cb', 'ch', 'cm']
if isinstance(legend, bool):
o_ax.legend().set_visible(legend) # Visibilité de la légende.
elif legend in pos:
o_ax.legend(loc=pos.index(legend)).set_title(axis_name)
else:
o_ax.legend().set_title(axis_name)
def build_axis(self):
""" Construction de la fenêtre Windows. """
""" Marges générales et répartition des subplots. """
l_margins = self.od.read(['figure', 'margins'], (6, 8, 10, 8)) # Marges : haut, droite, bas, gauche.
x, w = l_margins[3] / 100, 1 - (l_margins[1] + l_margins[3]) / 100
y_offset, y_height = l_margins[0], 100 - l_margins[0] - l_margins[2] # Valeurs : 0 à 100.
ax_top, y = 100, 1 - y_offset / 100
first_axis, axis_ante = '', ''
for axis_name, h in self.od.read(['figure', 'subplots']).items():
o_ax = None
if isinstance(h, (int, float)):
if first_axis == '':
first_axis = axis_name
else:
self.od.write([axis_ante, 'x_ticks'], False)
axis_ante = axis_name
h = h * y_height / 10_000
y -= h
o_ax = self.fig.add_axes((x, y, w, h), sharex=self.od.read([first_axis, 'o_ax']))
self.od.write([axis_name, 'geometry'], (x, y, w, h))
else:
""" Graphique jumelé (twined axis). """
tw = h.split()
if tw[0] == 'twinned':
""" Axes jumelés. """
parent_name = tw[-1]
parent_axis = self.od.read([parent_name, 'o_ax'])
if self.is_axis(parent_axis):
o_ax = parent_axis.twinx()
if self.is_axis(o_ax):
self.od.write([axis_name, 'o_ax'], o_ax)
""" Ajout artificiel d'attributs vides. """
o_ax.filled = list()
o_ax.note_x = o_ax.annotate('', (0, 0))
o_ax.note_y = o_ax.annotate('', (0, 0))
""" Repères : lignes hortogonales sous le curseur de la souris. Voir on_mouse_move() """
o_ax.hline = o_ax.axhline(y=-1000, color='k', lw=.2, ls='--') # -1000 = Souris hors figure.
o_ax.vline = o_ax.axvline(x=-1000, color='k', lw=.2, ls='--')
""" Format des graduations y. """
o_ax.yaxis.set_major_formatter(FormatStrFormatter('%.1f'))
""" Initialisation des courbes (DataFrames) dans les graphiques (axes). """
df = self.od.read([axis_name, 'df'])
if axis_name == self.od.read(['figure', 'best_zones', 'subplot']) and self.is_df(self.df_scats):
if self.od.read(['figure', 'best_zones', 'scatters'], False):
sns.scatterplot(data=self.df_scats, x='indx', y='Close', hue=axis_name, ax=o_ax)
if self.od.read(['figure', 'best_zones', 'confirm'], False):
sns.scatterplot(data=self.df_scats, x='confirm_x', y='confirm_y', ax=o_ax, hue='c_type',
alpha=.3)
if self.od.read(['figure', 'best_zones', 'zig_zag'], False):
sns.lineplot(data=self.df_scats, x='indx', y='Close', ax=o_ax, lw=.5)
o_ax.set_ylabel('')
""" Lines. """
if self.is_df(df) and len(df.columns) > 0:
sns.lineplot(data=df[:1], ax=o_ax)
""" Position (left, right) des graduations y. """
b_odd = False
l_axes = self.get_axis_names()
if self.od.read(['Volume', 'twined_axis']) == 'Principal':
b_odd = bool(l_axes.index('Principal') % 2) # Position impaire de 'Principal'.
for indx, axis_name in enumerate(l_axes):
self.od.write([axis_name, 'y_ticks'], 'left' if b_odd == indx % 2 else 'right')
""" Coloriage vertical conditionnel des zônes d'entrée en position (ouverture de trade). """
b_zones = self.od.read(['figure', 'best_zones', 'show'], False)
colors = self.od.read(['figure', 'best_zones', 'colors'])
if b_zones and colors is not None:
peaks = self.df_pilot['Peak']
fc_b, fc_t = colors
l_args = [
dict(x=peaks.index, y1=10 ** 5, y2=-10 ** 5, where=peaks <= -1, fc=fc_b, ec='#fff0', interpolate=True),
dict(x=peaks.index, y1=10 ** 5, y2=-10 ** 5, where=peaks >= 1, fc=fc_t, ec='#fff0', interpolate=True),
]
for ax_name in l_axes:
oax = self.o_ax(ax_name)
if self.is_axis(oax):
self.fill_between(oax, l_args)
def set_heights(self):
""" Calcul des hauteurs (en %) subplots. La somme fait 100%. """
args = self.get_args()
if not isinstance(args['subplots'], dict):
raise SystemExit("Super-dictionnaire : La clé 'subplots' doit exister et doit contenir un dictionnaire. ")
l_heights, sum_heights, nb_zeros = list(), 0, 0
for name, pc_height in args['subplots'].items():
height = pc_height if (isinstance(pc_height, int) and pc_height > 0) else 0
nb_zeros += 1 if height == 0 else 0
sum_heights += height
l_heights.append((name, height))
if sum_heights > 100:
""" La somme des hauteurs dépasse 100% : on effectue une réduction proportionnelle. """
l_heights = [(name, pc_height) for (name, pc_height) in l_heights if pc_height > 0] # Suppression des 0.
l_heights = [(name, pc_height * 100 / sum_heights) for (name, pc_height) in l_heights] # Normalisation.
else:
""" Si la somme == 100%, on ne fait rien. Si elle est < 100%, on ajoute un subplot, nommé 'rest'. """
if nb_zeros == 0:
l_heights.append(('rest', 0))
nb_zeros = 1
h_rest = (100 - sum_heights) / nb_zeros
if h_rest == 0:
l_heights = [(name, pc_height) for (name, pc_height) in l_heights if pc_height > 0] # Suppression des 0
for i, (name, pc_height) in enumerate(l_heights):
if pc_height == 0:
l_heights[i] = (name, h_rest)
args['subplots'] = dict(l_heights)
""" Ajout d'arguments par défaut. """
if 'seaborn_context' not in args: # Valeurs possibles : paper, notebook, talk, poster
args['seaborn_context'] = 'paper'
if 'seaborn_style' not in args: # Valeurs possibles : white, dark, whitegrid, darkgrid, ticks
args['seaborn_style'] = 'darkgrid'
return {'figure': args}
def add_df(self, subplot_name, l_columns, df=None):
""" Création du dataframe qui sera associé à un graphique.
- Méthode appelée par la méthode distrib() de la classe dérivée.
- Le DataFrame {df} a plusieurs colonnes, chacune associée à une courbe. """
""" Vérification. """
if self.od.read(['figure', 'subplots', subplot_name]) is None:
raise SystemExit(f"Erreur dans self.distrib() : Le subplot {subplot_name} n'existe pas.")
l_cols = list()
for column in l_columns:
""" Suppression des colonnes inexistantes. """
try:
self.df_pilot[column] if df is None else df[column]
except KeyError:
""" Si la colonne n'existe pas ... """
print(f"La colonne '{column}' n'existe pas dans le DataFrame.")
continue
l_cols.append(column)
""" Fusion par ajout de colonnes dans le df existant. """
df = self.df_pilot[l_cols] if df is None else df[l_cols]
df_exist = self.od.read([subplot_name, 'df'])
if df_exist is not None:
df_exist[df.columns] = df
df = df_exist # On retrouve les colonnes de df_exist en premières places.
if not self.is_df(df):
return
self.od.write([subplot_name, 'df'], df)
for column in df.columns:
if column.lower() == 'close':
""" Si ce dataframe a une colonne 'Close', on paramètre 3 traitements distincts par défaut :
- Une étiquette-suiveuse (linked_label).
- Un graphique jumelé (twined_axis) contenant l'affichage du volume en semi-transparence.
- Éléments du graphique jumelé non affichés : y_ticks, y_ticks_labels, grille, légende. """
self.od.write(['figure', 'best_zones', 'subplot'], subplot_name)
if self.od.read(['figure', 'show_linked_label'], False):
self.od.write([subplot_name, 'linked_label'], 'Close')
if self.od.read(['figure', 'table_name'], None) is None:
self.od.write(['figure', 'show_volume'], False)
if self.od.read(['figure', 'show_volume'], False):
self.od.write(['figure', 'subplots', 'Volume'], f'twinned with {subplot_name}')
self.od.write(['Volume', 'df'], self.df_pilot[['Volume']])
self.od.write(['Volume', 'signals', 'Volume', 'visible'], False)
self.od.write(['Volume', 'twined_axis'], subplot_name)
self.od.write(['Volume', 'y_ticks'], False), # Suppression (ticks + ticklabels + grille).
self.od.write(['Volume', 'legend'], False) # Suppression de la légende.
def get_df_pilot(self):
od_pilot = self.central_args('pilot')
instrument = od_pilot.read('instrument', 'EUR/USD')
self.pips = .01 if instrument.endswith('JPY') else .0001
table_name = od_pilot.read('table', 10)
self.df_pilot = CtrlHistos(instrument).get_pilot(**od_pilot) # DataFrame
self.nb_datas = self.df_pilot.shape[0]
self.od.write(['figure', 'instrument'], instrument)
self.od.write(['figure', 'table_name'], table_name)
self.od.write(['figure', 'nb_points'], self.nb_datas)
def linked_label(self, axis_name, x, o_ax, df):
""" https://matplotlib.org/stable/tutorials/text/annotations.html#annotating-with-text-with-box """
linked_label = self.od.read([axis_name, 'linked_label'])
if linked_label:
bbox = {'boxstyle': 'larrow', 'fc': '#ff02', 'ec': 'k', 'lw': .5} # fc=face color, ec=edge color
indx = min(x[-1], df.shape[0] - 1)
last_value = round(df[linked_label][indx], 2) # En pips.
""" Affichage de l'étiquette actuelle (m = marge). """
o_ax.note_y.remove() # Effacement de l'étiquette précédente.
m = self.abscissa_size / 70
o_ax.note_y = o_ax.annotate(last_value, (x[-1], last_value), xytext=(x[-1] + m, last_value), bbox=bbox)
def paint_volume(self, x, o_ax, df):
y = df['Volume'][x[0]: x[-1] + 1] * .8 # Affichage du volume à 80% d'amplitude.
l_args = [
dict(x=x, y1=y, y2=0, fc='#0079a333', ec='#fff0')
]
self.fill_between(o_ax, l_args)
def fill_between(self, o_ax, l_params):
""" Colorie entre 2 valeurs, conditionnellement.
@param o_ax: Axis matplotlib : graphique contenant des courbes.
@param l_params: Liste de coloriages. Chaque élément est un dictionnaire personnalisé. Voir lien ci-dessous.
https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.fill_between.html """
""" Effacement du coloriage antérieur, nécessaire si les couleurs ont une transparence. """
if not self.is_axis(o_ax):
raise SystemExit(f"Erreur dans self.fill_between() : L'objet o_ax ({o_ax}) n'est pas un axis.")
for fill in o_ax.filled:
o_ax.collections.remove(fill)
o_ax.filled.clear()
""" l_fill contient autant de paramètres que de coloriages à effectuer. """
for p in l_params:
""" Égalisation des vecteurs. """
if p.get('x') is None:
p['x'] = range(self.df_pilot.shape[0])
len_min = len(p['x'])
if self.is_df(p['y1']):
len_min = min(len_min, len(p['y1']))
if self.is_df(p['y2']):
len_min = min(len_min, len(p['y2']))
p['x'] = p['x'][:len_min]
""" Coloriage. """
o_ax.filled.append(o_ax.fill_between(**p))
""" *********************************** Helpers. ************************************ """
def trace_hline(self, axis_name, y, **kwargs):
o_ax = self.o_ax(axis_name)
if self.is_axis(o_ax):
o_ax.axhline(xmin=0, xmax=1, y=y, **kwargs)
def trace_vline(self, axis_name, x, **kwargs):
o_ax = self.o_ax(axis_name)
if self.is_axis(o_ax):
o_ax.axvline(ymin=0, ymax=1, x=x, **kwargs)
@staticmethod
def is_axis(o_ax):
return o_ax.__class__.__name__ == 'Axes'
@staticmethod
def is_df(df):
return df.__class__.__name__ in ['DataFrame', 'Series']
def get_axis_names(self, b_twin=False):
if b_twin:
""" Liste de noms de tous les graphiques (subplots) de base et twined_axis. """
return [ax_name for ax_name in list(self.od.read(['figure', 'subplots']).keys()) if ax_name != 'rest']
else:
""" Liste de noms de tous les graphiques (subplots) de base. """
return [ax_name for (ax_name, val) in list(self.od.read(['figure', 'subplots']).items())
if ax_name != 'rest' and not str(val).startswith('twinned')]
def get_first_axis(self):
l_axes = self.get_axis_names()
o_ax = self.o_ax(l_axes[0])
if self.is_axis(o_ax):
return o_ax
def get_last_axis(self):
l_axes = self.get_axis_names()
o_ax = self.o_ax(l_axes[-1])
if self.is_axis(o_ax):
return o_ax
def _get_all_ax(self):
l_ax = list()
for key, val in self.od.read(['figure', 'subplots']).items():
l_ax.append(self.od.read([key, 'o_ax']))
return l_ax
def get_axis_lines(self, axis_name):
""" Retourne une liste d'objets o_line. """
o_ax = self.o_ax(axis_name)
l_olines = list()
for o_line in o_ax.lines:
""" Plusieurs courbes (lines) dans un graphique (o_ax). """
line_name = o_line.get_label()
if line_name.startswith('_line'):
continue
l_olines.append(o_line)
return l_olines
def o_ax(self, axis_name):
return self.od.read([axis_name, 'o_ax'])
def df(self, axis_name):
return self.od.read([axis_name, 'df'])
def ax_df(self, axis_name):
o_ax = self.od.read([axis_name, 'o_ax'])
df = self.od.read([axis_name, 'df'])
return o_ax, df
""" ***************************** Méthodes surchargées. ***************************** """
@staticmethod
def get_args_ui():
pass
@staticmethod
def central_args(l_keys):
raise SystemExit("show_geek.py > ShowGeek :\nLa méthode 'central_args()' doit être surchargée.")
def get_pilot(self):
raise SystemExit("show_geek.py > ShowGeek :\nLa méthode 'get_pilot()' doit être surchargée.")
def add_indics(self, d_args=None):
raise SystemExit("show_geek.py > ShowGeek :\nLa méthode 'add_indics()' doit être surchargée.")
def distrib(self):
pass
""" ***************************** Méthodes à surcharger. **************************** """
def hook_line_anim(self, axis_name, line_name, x, y, o_ax, df, o_line):
return self.od.read([axis_name, 'signals', o_line.get_label()])
def hook_axis_anim(self, axis_name, x, o_ax, df, y_min, y_max, get_line):
""" Affichage de 2 éléments distincts :
- Volumes, semi-transparent, sans bordure.
- Étiquette-suiveuse """
""" 1 - Affichage des volumes, semi-transparent, sans bordure. """
if axis_name == 'Volume':
self.paint_volume(x, o_ax, df)
""" 2 - Étiquette-suiveuse à droite du graphique. """
self.linked_label(axis_name, x, o_ax, df)
""" **************************** Événements et affichage. *************************** """
def key_event(self, ev):
def arrows(b_forward):
b_reverse = self.b_reverse
self.b_paused, self.b_reverse = False, False if b_forward else True
self.animate()
self.b_paused = True
self.b_reverse = b_reverse
self.magn = 1
keycode = ev.key
if keycode == ' ':
""" Pause on/off. """
self.b_paused = not self.b_paused
elif keycode == 'tab':
""" Inversion de sens. """
self.b_reverse = not self.b_reverse
elif keycode == 'right' or keycode == 'left':
""" Appui sur touche flèche droite ou flèche gauche. """
arrows(keycode == 'right')
elif keycode == 'up' or keycode == 'down':
""" Appui sur touche flèche haut ou flèche bas. """
self.magn = 10
arrows(keycode == 'up')
elif keycode == 'pageup' or keycode == 'pagedown':
""" Appui sur touche flèche page-haut ou flèche page-bas. """
self.magn = 100
arrows(keycode == 'pageup')
def on_mouse_move(self, ev):
""" Le style des lignes est définidans self.build_axis(). """
for o_ax in self.l_ax:
o_ax.vline.set_data([ev.xdata, ev.xdata], [0, 1])
o_ax.hline.set_data(([0, 1], [ev.ydata, ev.ydata]) if o_ax == ev.inaxes else ([0, 0], [0, 0]))
if ev.xdata is not None and ev.ydata is not None:
ax = self.get_last_axis()
ax.note_x.remove() # Effacement de l'étiquette précédente.
bbox = {'boxstyle': 'round4', 'fc': '#ff02', 'ec': 'k', 'lw': .5} # fc=face color, ec=edge color
ax.note_x = ax.annotate(text=round(ev.xdata), xy=(ev.xdata, ax.viewLim.y0), bbox=bbox)
if self.pointer <= 1 or not self.b_anim:
plt.draw() # Nécessaire si pas d'animation (lorsque self.pointer reste à 1).
def show(self):
self.l_ax = self._get_all_ax()
self.fig.canvas.mpl_connect('key_press_event', self.key_event)
if self.od.read(['figure', 'leader_lines']):
self.fig.canvas.mpl_connect('motion_notify_event', self.on_mouse_move)
if self.b_anim:
ani = matplotlib.animation.FuncAnimation(self.fig, self.animate, frames=None, interval=.1, repeat=True)
# |_ La variable {ani} doit exister pour empêcher le garbage collector de supprimer l'animation.
self.animate()
plt.show()
BackTest
→ \trading\strategies\backtest.py
:
""" Version 2022-03-05 - Algorithme génétique. https://pypi.org/project/geneticalgorithm2/ v6.5.0 """
# Imports externes
import numpy as np
import pandas as pd
# Imports internes
from functions.utils import DateTime
# noinspection PyUnresolvedReferences, PyArgumentList
class BackTest:
def __init__(self):
self.l_columns = None
self.np_datas = None
self.nb_rows = 0
""" Statistiques. """
self.np_trades = None
self.close_cond = None # Fonction : condition de fermeture d'un trade.
self.df_marks = None # dataframe
self.d_cost = None # dictionary
def fitness(self, **d_args):
""" Code appelant : UI d'affichage. """
""" Élimination des colonnes Open, High, Low, ... ainsi que celles pour le ga (commençant par '_'). """
self.l_columns = ['Close'] + [col for col in list(self.df_pilot.columns[7:]) if not col.startswith('_')]
self.np_datas = self.df_pilot[self.l_columns].values
self.d_cost = d_args.get('cost', {}) # Arguments pour la fonction de coût.
self.nb_rows = self.np_datas.shape[0]
self.np_trades = np.zeros((self.nb_rows, 11), dtype=np.float32)
""" |_ Colonnes de self.np_trades : 0 : gains (>0 ou <0) clôturés, en pips (trades fermés). cumulé
1 : gains en cours d'évolution, en pips (trades ouverts). cumulé
2 : gains total (clôturés + en évolution), en pips. cumulé
3 : gains (>0 ou <0) clôturés, en monnaie (€ ou $). cumulé
4 : gains en cours d'évolution (€ ou $). cumulé
5 : gains total (clôturés + en évolution) (€ ou $). cumulé
6 : Flag d'ouverture (-1/+1), effacé à la fermeture.
7 : marks d'ouverture (-1/+1), permanent.
8 : indx de fermeture
9 : gain du trade, en pips
10 : gain du trade, en monnaie (€ ou $) """
indx = 0
while np.isnan(np.sum(self.np_datas[indx])):
""" Recherche de l'index de la 1ère ligne où aucune valeur n'est nan. """
indx += 1
if indx >= self.nb_rows - 1:
break
d_args['indx'] = indx # <-- Ajout de cette information dans les arguments.
return self.cost(**d_args)
def cost(self, **d_params):
"""
@param d_params: paramètres spécifiques à cette stratégie : stop-loss, take-profit, seuils divers, etc.
@return: dépend du code appelant -> (Calcul du gain + Marqueurs) ou (Process d'algorithme génétique).
- Si 'Gain + Marqueurs' : tuple de 2 DataFrames : (df_gains, df_marks).
- Si 'GA' : -1 * Gain. Le -1 s'explique par le fait que le ga cherche à minimiser le résultat.
"""
""" Calculs. """
spread = self.d_cost.get('spread', 2)
l_open_indx = np.where(self.np_trades[:, 7] != 0)[0] # Liste des index d'ouverture de trades.
ll_marks = list()
balance = self.d_cost.get('capital', 1000) # Capital initial.
self.np_trades[0, 3] = balance
risk = self.d_cost.get('risk', 1)
for open_indx in l_open_indx:
l_trade = self.np_trades[open_indx]
close_indx = int(l_trade[8])
coef = l_trade[7] # +1 ou -1 <-- achat ou vente.
price = self.np_datas[open_indx, 0]
gains = (self.np_datas[close_indx, 0] - price) * coef - spread
np_gains = (self.np_datas[open_indx: close_indx, 0] - price) * coef
""" Currencies (€ ou $) clôturés. """
curr = gains * balance * risk * self.pips # Currencies (€ ou $).
self.np_trades[open_indx, 10] = curr
self.np_trades[close_indx, 3] += curr
balance += curr
""" Currencies (€ ou $) ouverts. """
self.np_trades[open_indx: close_indx, 4] += np_gains * balance * risk * self.pips
""" Marqueurs. """
op = 'achat' if l_trade[7] == 1 else 'vente'
ll_marks.append([open_indx, price, f'Ouvre {op}'])
close_indx = int(l_trade[8])
if close_indx > 0:
ll_marks.append([close_indx, self.np_datas[close_indx, 0], f'Ferme {op}'])
self.np_trades[:, 0] = np.nancumsum(self.np_trades[:, 0])
self.np_trades[:, 2] = self.np_trades[:, 0] + self.np_trades[:, 1]
self.np_trades[:, 3] = np.cumsum(self.np_trades[:, 3])
self.np_trades[:, 5] = self.np_trades[:, 3] + self.np_trades[:, 4]
""" Anti-chevauchement des marqueurs. """
ll_marks.sort()
i, indx_ante = 4, 0
for l_mark in ll_marks:
if l_mark[0] == indx_ante:
l_mark[1] += i
else:
indx_ante = l_mark[0]
self.df_marks = pd.DataFrame(data=ll_marks, columns=['Index', 'Close', 'Type'])
return pd.DataFrame(data=self.np_trades[:, :6],
columns=['Close_p', 'Open_p', 'All_p', 'Close_c', 'Open_c', 'All_c'])
def bt_stats(self):
""" bt_stats = Statistiques des backtests. """
dt = DateTime()
stamp_from = self.df_pilot.iloc[0, 0]
stamp_to = self.df_pilot.iloc[-1, 0]
nb_days = max(1, round((stamp_to - stamp_from) / 86_400))
_from = dt.get_dtstr_from_dtstamp(stamp_from, dt_format='%d/%m/%Y')
_to = dt.get_dtstr_from_dtstamp(stamp_to, dt_format='%d/%m/%Y')
capital_ini = round(float(self.np_trades[0, 5]), 2)
capital_min = round(float(np.min(self.np_trades[:, 5])), 2)
crashed = '← crashed !' if capital_min < 0 else ''
capital_max = round(float(np.max(self.np_trades[:, 5])), 2)
capital_final = round(float(self.np_trades[-1, 5]), 2)
perf = round(100 * (capital_final - capital_ini) / capital_ini, 2)
nb_buy = np.sum(self.np_trades[:, 7] == 1)
nb_sell = np.sum(self.np_trades[:, 7] == -1)
nb_winner = np.sum(self.np_trades[:, 9] > 0)
nb_loser = np.sum(self.np_trades[:, 9] < 0)
mask = self.np_trades[:, 10] != 0
l_trades = self.np_trades[:, 10][mask]
win_consec = lose_consec = win_consec_max = lose_consec_max = 0
score_ante = l_trades[0]
score_win = score_lose = 0
for score in l_trades:
if score > 0:
score_win += score
else:
score_lose += score
if score > 0 and score_ante > 0:
win_consec += 1
win_consec_max = max(win_consec_max, win_consec)
elif score < 0 and score_ante < 0:
lose_consec += 1
lose_consec_max = max(lose_consec_max, lose_consec)
else:
win_consec = lose_consec = 0
score_ante = score
if nb_winner > 0:
score_win /= nb_winner
if nb_loser > 0:
score_lose /= nb_loser
pips_max_indx = np.argmax(self.np_trades[:, 9])
pips_max = round(float(self.np_trades[pips_max_indx, 9]), 2)
stamp = self.df_pilot.iloc[pips_max_indx, 0]
pips_max_dt = dt.get_dtstr_from_dtstamp(stamp, dt_format='%d/%m/%Y')
pips_max_indexes = f'index de {pips_max_indx} à {int(self.np_trades[pips_max_indx, 8])}'
pips_min_indx = np.argmin(self.np_trades[:, 9])
pips_min = round(float(self.np_trades[pips_min_indx, 9]), 2)
stamp = self.df_pilot.iloc[pips_min_indx, 0]
pips_min_dt = dt.get_dtstr_from_dtstamp(stamp, dt_format='%d/%m/%Y')
pips_min_indexes = f'index de {pips_min_indx} à {int(self.np_trades[pips_min_indx, 8])}'
curr_max_indx = np.argmax(self.np_trades[:, 10])
curr_max = round(float(self.np_trades[curr_max_indx, 10]), 2)
stamp = self.df_pilot.iloc[curr_max_indx, 0]
curr_max_dt = dt.get_dtstr_from_dtstamp(stamp, dt_format='%d/%m/%Y')
curr_max_indexes = f'index de {curr_max_indx} à {int(self.np_trades[curr_max_indx, 8])}'
curr_min_indx = np.argmin(self.np_trades[:, 10])
curr_min = round(float(self.np_trades[curr_min_indx, 10]), 2)
stamp = self.df_pilot.iloc[curr_min_indx, 0]
curr_min_dt = dt.get_dtstr_from_dtstamp(stamp, dt_format='%d/%m/%Y')
curr_min_indexes = f'index de {curr_min_indx} à {int(self.np_trades[curr_min_indx, 8])}'
g_max = -10**8
drawdown = 0
curr_dwn_indx = 0
for i, g in enumerate(self.np_trades[:, 2]):
g_max = max(g_max, g)
if (g_max - g) > drawdown:
curr_dwn_indx = i
drawdown = g_max - g
stamp = self.df_pilot.iloc[curr_dwn_indx, 0]
curr_dwn_dt = dt.get_dtstr_from_dtstamp(stamp, dt_format='%d/%m/%Y')
""" Contexte. """
total_pips = round(float(np.sum(self.np_trades[:, 9])), 2)
od_pilot = self.central_args('pilot')
instrument = od_pilot['instrument']
table = od_pilot.read('table')
pc_from = od_pilot.read('pc_from')
pc_to = od_pilot.read('pc_to')
nb_rows = self.df_pilot.shape[0]
typ = f'Renko {table}' if isinstance(table, int) else f'Candles-{table}' if isinstance(table, str) else 'Ticks'
if pc_from is None:
points = f"{nb_rows} points jusqu'à {pc_to} % de la table en base de données."
elif pc_to is None:
points = f"{nb_rows} points à partir de {pc_from} % de la table en base de données."
else:
points = f"Tous les points entre {pc_from} % et {pc_to} % de la table en base de données."
result = "Statistiques :" \
"\n=============" \
f"\nContexte : {instrument} - {typ} - {points}" \
f"\nPériode : {_from} -> {_to}" \
f"\nNb jours: {nb_days}" \
f"\nGain journalier: {round((capital_final - capital_ini) / nb_days, 2)} €." \
f"\n\nCapital initial: {capital_ini} €" \
f"\nCapital min: {capital_min} € {crashed}" \
f"\nCapital max: {capital_max} €" \
f"\nCapital final: {capital_final} €" \
f"\nPerformance: {perf} %" \
f"\nGain en pips: {total_pips} pips" \
f"\n\nNb trades à l'achat: {nb_buy}" \
f"\nNb trades à la vente: {nb_sell}" \
f"\nNb trades gagnants: {nb_winner}" \
f"\nNb trades perdants: {nb_loser}" \
f"\nNb max gagnants consécutifs: {win_consec_max}" \
f"\nNb max perdants consécutifs: {lose_consec_max}" \
f"\n\nGain moyen trades gagnants: {round(score_win, 2)} €" \
f"\nGain moyen trades perdants: {round(score_lose, 2)} €" \
f"\nMeilleur trade (pips): {pips_max} pips, le {pips_max_dt} ({pips_max_indexes})" \
f"\nMeilleur trade (monnaie): {curr_max} €, le {curr_max_dt} ({curr_max_indexes})" \
f"\nPire trade (pips): {pips_min} pips, le {pips_min_dt} ({pips_min_indexes})" \
f"\nPire trade (monnaie): {curr_min} €, le {curr_min_dt} ({curr_min_indexes})" \
f"\nPerte max (drawdown): {round(drawdown)} pips le {curr_dwn_dt} (index {curr_dwn_indx})" \
"\n-------------------------------------------------------------------------------"
print(result)
def closures(self, close_indx):
""" Fermetures, options possibles :
- Modifier le code pour permettre des fermetures partielles.
|_ https://www.google.fr/search?q=trading+fermetures+partielles
- Modifier le code dans {close_cond()} de la classe dérivée pour permettre les stops suiveurs.
|_ https://www.google.fr/search?q=trading+stop+suiveur
"""
spread = self.d_cost.get('spread', 2)
b_end = close_indx >= self.nb_rows - 1 # Flag de fermeture de tous les résiduels.
for open_indx in np.where(self.np_trades[:, 6] != 0)[0]: # Liste des indx des trades ouverts (flags col 6).
""" Liste des indx trades ouverts. Le flag6 (col 6) est obligatoirement +1 ou -1. """
if b_end or self.close_cond(open_indx): # Fonction dans la classe dérivée.
self.np_trades[open_indx, 6] = 0 # RAZ flag.
self.np_trades[open_indx, 8] = close_indx # Index de fermeture
coef = self.np_trades[open_indx, 7]
price = self.np_datas[open_indx, 0]
gains = (self.np_datas[close_indx, 0] - price) * coef - spread
np_gains = (self.np_datas[open_indx: close_indx, 0] - price) * coef
self.np_trades[open_indx, 9] = gains # 9 : gain du trade, en pips
self.np_trades[close_indx, 0] += gains # 0 : gains (>0 ou <0) clôturés, en pips.
self.np_trades[open_indx: close_indx, 1] += np_gains # 1 : gains en cours d'évolution, en pips.
def opening(self, op, indx):
""" 2 flags d'ouverture (-1/+1) : col 6 -> effacé à la fermeture, col 7 -> permanent. """
self.np_trades[indx, 6: 8] = 1 if op == 'buy' else -1 # 1='buy', -1='sell'.
""" Voir colonnes de self.np_trades dans self.fitness() """
def cond(self, txt, indx):
""" Exemple : txt = 'AO-DRV cu 0' signifie : L'indicateur AO-DRV croise vers le haut (cross-up) la valeur 0.
Opérateurs disponibles : /, \\, --, v, ^, cu, cd, ==, !=, >, >=, <, <= """
def iscolumn(column_name):
if self.txt_is_number(column_name):
return True
if column_name in self.l_columns:
return True
print(f"Classe GeneticAlgorithm, méthode 'cond()' :\nLa colonne '{column_name}' n'existe pas.")
return False
def col(col_name):
return self.l_columns.index(col_name)
l_c = txt.split() # Séparateur = ' ' (espace).
left = l_c[0] # str
op = l_c[1] # str
if not iscolumn(left):
return False
if len(l_c) == 2:
""" /, \\, --, v, ^ """
col = col(left)
if op in ['/', '\\', '--'] and indx > 0:
""" Pente positive (/), négative (\\) ou nulle(--). """
sign_1, sign_0 = self.np_datas[indx - 1, col], self.np_datas[indx, col]
return sign_0 > sign_1 if op == '/' else sign_0 == sign_1 if op == '--' else sign_0 < sign_1
elif op in ['v', '^'] and indx > 1:
""" Les 3 points précédents forment un creux (v) ou un sommet (^). """
sign_2, sign_1, sign_0 = (self.np_datas[indx - 2, col], self.np_datas[indx - 1, col],
self.np_datas[indx, col])
return sign_1 < sign_2 and sign_1 < sign_0 if op == 'v' else sign_1 > sign_2 and sign_1 > sign_0
pass
elif len(l_c) == 3:
""" cu, cd, ==, !=, >, >=, <, <= """
right = l_c[2]
if not iscolumn(right):
return False
if op in ['cu', 'cd']:
""" Cross up & cross down : Croisements à la hausse ou à la baisse. """
col_moved = left if self.txt_is_number(left) else col(left) # Valeur en str ou N° col.
col_ref = right if self.txt_is_number(right) else col(right)
way = 'up' if op == 'cu' else 'down'
return self.cross(way, indx, col_moved, col_ref)
elif op in ['==', '!=', '>', '>=', '<', '<=']:
""" Comparaisons. """
left = float(left) if self.txt_is_number(left) else self.np_datas[indx, col(left)]
right = float(right) if self.txt_is_number(right) else self.np_datas[indx, col(right)]
if op == '==':
return left == right
elif op == '!=':
return left != right
elif op == '>':
return left > right
elif op == '>=':
return left >= right
elif op == '<':
return left < right
else: # '<='
return left <= right
return False
@staticmethod
def txt_is_number(text):
""" {text} est une chaîne ET représente un nombre (int ou float). """
if isinstance(text, str):
try:
float(text)
return True # Est une chaîne ET représente un nombre => True.
except (Exception,):
return False # Est une chaîne mais ne représente pas un nombre => False.
else:
return False # N'est pas une chaîne => False.
def cross(self, way, indx, col_moved, col_ref):
if self.txt_is_number(col_ref):
ref_now = ref_ante = float(col_ref)
else:
ref_now, ref_ante = self.np_datas[indx, col_ref], self.np_datas[indx - 1, col_ref]
if self.txt_is_number(col_moved):
mov_now = mov_ante = int(col_moved)
else:
mov_now, mov_ante = self.np_datas[indx, col_moved], self.np_datas[indx - 1, col_moved]
if ref_now == mov_now:
return False
i = 2
while ref_ante == mov_ante:
ref_ante = float(col_ref) if self.txt_is_number(col_ref) else self.np_datas[indx - i, col_ref]
mov_ante = float(col_moved) if self.txt_is_number(col_moved) else self.np_datas[indx - i, col_moved]
i += 1
b_cross = (ref_now - mov_now) * (ref_ante - mov_ante) < 0
return (mov_now > ref_now) == (way == 'up') if b_cross else False
""" ***************************** Méthodes surchargées ****************************** """
@staticmethod
def central_args(l_keys):
raise SystemExit("genetic.py > GeneticAlgorithm :\nLa méthode 'central_args()' doit être surchargée.")
def add_indics(self, od_args=None):
raise SystemExit("genetic.py > GeneticAlgorithm :\nLa méthode 'add_indics()' doit être surchargée.")
\trading\strategies\models\
\trading\strategies\models\strategy_model.py
:
""" Version 2022-03-05 ---------------------------- SUIVRE CES ÉTAPES ----------------------------
--- Imagination de la stratégie - Théorie :
===========================================
1 - Simple texte explicatif de quelques lignes, en langage courant (non informatique).
2 - Algorithme au format texte.
3 - Méthodes d'optimisation.
4 - Liste des documents dans le même dossier : diagrammes, images, fichiers tableurs, liens internet, etc.
5 - Liste des indicateurs utilisés : existant ou inventés.
--- Construction de la stratégie - Pratique :
=======================================================================================
1 - Titres : fenêtre et graphiques -> Compléter window_title et figure_title (dans central_args()).
2 - Choix du signal pilote : compléter self.central_args('pilot'). Choisir peu de points pour la MAP du code (ex : 1000)
3 - Ajouter indicateurs : compléter {self.add_indics()} en même temps que {self.central_args('indics')}.
4 - Afficher indicateurs : compléter {self.distrib()}.
5 - Activer ou désactiver l'affichage des points d'intérêt sur les graphiques, dans {self.post_params()} :
|_ Un scatterplot est déjà proposé pour l'axe 'Principal', mais tout est modifiable.
6 - Méthode self.cost() :
a - Coder la condition d'ouverture à l'achat.
|_ Lancer l'appli : vérifier l'affichage des points d'ouverture et l'affichage des statistiques.
b - Coder la condition de fermeture dans la fonction close_cond().
|_ Lancer l'appli : vérifier également l'affichage des points de fermeture des achats.
c - Terminer en codant les ventes : conditions d'ouverture et de fermeture.
7 - Résultats obtenus : affichage des statistiques.
pandas_ta -> https://github.com/twopirllc/pandas-ta
"""
# Imports externes
# import pandas_ta as ta # Utilisé pour lire la doc : help(pdt.macd) Ne pas supprimer.
import seaborn as sns
# Imports internes
from functions.utils import Dictionary
from show.show_geek import ShowGeek
from trading.strategies.backtest import BackTest
# noinspection PyTypeChecker
class Geek(ShowGeek, BackTest):
def __init__(self):
super(Geek, self).__init__() # GeneticAlgorithm.__init__() est appelé dans ShowGeek.
""" Mode 'normal', selon votre imagination : Création, exécution et affichage de la stratégie. """
self.show_ui()
@staticmethod
def central_args(l_keys):
od_args = Dictionary(dict(
ui=dict( # Ces valeurs seront à la racine du super-dictionnaire self.od.
geometry=(100, 40, 1200, 600), # x, y, w, h. Par défaut : (100, 40, 1200, 700).
abscissa_size=600, # Nb de points affichés en abscisse. Par défaut : 600.
window_title="Stratégie xxxxxxxxxxxxxxxxxx", # Titre de la fenêtre.
figure_title="Mise au point du modèle 'strategy'", # Titre des graphiques.
subplots=dict( # Noms et hauteurs (en %) des graphiques modifiables. Supprimer / Ajouter.
Principal=40, # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
Gains='',
Pips='', # Si chaîne vide => Les hauteurs seront automatiquement calculées et réparties.
),
),
pilot=dict( # Dans cet exemple : les 10 000 derniers points de EUR/USD en Renko10.
instrument='EUR/USD', # EURUSD, USDJPY, EURCHF, USDCAD, NZDUSD, EURGBP, EURJPY, GBPJPY, GBPCHF, ...
table=5, # int (ex : 10) = Renko, str (ex: 'H1') = Candles, None = Ticks.
# pc_from=0, # en % : les valeurs après {pc_from}.
pc_to=100, # en % dans la table-db. Les valeurs avant {pc_to}.
nb_rows=20_000, # Nb points étudiés. Non utilisé si les 2 existent ({pc_from} ET {pc_to}).
),
indics=dict(
ma1=dict(length=170), # Indicateur moyenne mobile 1.
ma2=dict(length=273.7), # Indicateur moyenne mobile 2.
cost=dict(capital=1000, risk=20, spread=1.1) # Params suppl. pour la méthode cost().
),
))
return Dictionary(od_args.read(l_keys, {}))
def add_indics(self, od_args=None):
""" Indicateurs techniques en D:/anaconda/envs/robot/Lib/site-packages/pandas_ta <-- Ne pas les modifier !!!
Doc indicateurs : https://github.com/twopirllc/pandas-ta#indicators-by-category
Génération de signaux dérivés : ajout de colonnes au dataframe {df_pilot}. """
""" Voir la documentation d'un indicateur. """
# help(ta.trix) # <-- Affiche la doc (Décommenter la ligne import pandas_ta as ta).
""" Signaux pour le calcul des gains, les marqueurs et l'affichage. """
od_args = self.central_args('indics')
# Moyennes à tester : sma, ema, dema, tema, hma, wma, alma, kama, zlma, rma, median, ...
self.df_pilot['MA1'] = self.df_pilot.ta.zlma(**od_args['ma1'])
self.df_pilot['MA2'] = self.df_pilot.ta.zlma(**od_args['ma2'])
""" Gains en pips et en monnaie (€ ou $). """
l_gains = ['Balance (pips)', 'Open (pips)', 'Equity (pips)', 'Balance (€)', 'Open (€)', 'Equity (€)']
self.df_pilot[l_gains] = self.fitness(**od_args)
""" Option : Affiche les statistiques du backtest."""
self.bt_stats()
""" Option : Signaux uniquement pour l'affichage. """
# self.df_pilot['xxxxx'] = self.get_indic(self.df_pilot.ta.{indic}, **od_args['xxx']) # <-- Exemple
def distrib(self):
""" Méthode optionnelle : Répartition des signaux entre les différents graphiques (axes). """
""" - Les noms des graphiques doivent correspondre avec ceux déclarés dans central_args(['ui', 'subplots']).
- Ceux des signaux (indicateurs, ...) doivent correspondre avec ceux déclarés dans add_indics(). """
self.add_df('Principal', ['MA1', 'MA2'])
self.add_df('Gains', ['Balance (€)', 'Equity (€)'])
self.add_df('Pips', ['Balance (pips)', 'Equity (pips)'])
def pre_params(self):
""" Méthode optionnelle. A supprimer ou à commenter s'il n'y a pas de surcharge.
Surcharge, insertion (avant super()) ou ajout (après super()) au code pre_params() de la classe-parent. """
super().pre_params()
""" Légendes épinglées. Écriture dans le super-dictionnaire -> ex : cm = Center-Middle. """
self.od.write(['Principal', 'legend'], 'ch') # 'rh', 'lh', 'lb', 'rb', 'rm', 'lm', 'rm', 'cb', 'ch', 'cm'
self.od.write(['Milieu', 'legend'], 'cm')
""" Vérification (supprimer ou commenter après vérification). """
# self.od.print()
def post_params(self):
""" Méthode optionnelle. A supprimer ou à commenter si la surcharge est inutile.
Surcharge, insertion (avant super()) ou ajout (après super()) au code post_params() de la classe-parent. """
""" Affichage des marqueurs. """
if self.df_marks is not None:
o_ax = self.o_ax('Principal')
sns.scatterplot(data=self.df_marks, x='Index', y='Close', ax=o_ax, hue='Type', size='Type',
sizes=(30, 30), zorder=4)
""" Tracé de lignes - https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.axhline.html """
# self.trace_hline('Milieu', 0, lw=.4, ls='-.', c='b') # hline = ligne horizontale (vline = verticale).
# self.trace_hline('Bas', 0, lw=.4, ls='-.', c='g')
""" Coloriage statique. Exemple : Si le graphique 'Milieu' supporte un RSI, 2 bandes (30 à 40 et 60 à 70). """
# o_ax = self.o_ax('Milieu')
# if self.is_axis(o_ax):
# l_args = [
# dict(y1=30, y2=40, fc='#0aa1', ec='#f002'),
# dict(y1=60, y2=70, fc='#0aa1', ec='#f002'),
# ]
# self.fill_between(o_ax, l_args)
super().post_params() # Code avant -> inséré, code après -> ajouté.
def hook_axis_anim(self, axis_name, x, o_ax, df, y_min, y_max, get_line):
""" Méthode optionnelle. A supprimer ou à commenter si la surcharge est inutile.
Surcharge, insertion ou ajout au code hook_axis_anim() de la classe-parent. """
super().hook_axis_anim(axis_name, x, o_ax, df, y_min, y_max, get_line)
""" Coloriage dynamique : https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.fill_between.html """
if axis_name == 'Principal':
ma1 = get_line('MA1') # get_line = fonction passée en paramètre.
ma2 = get_line('MA2')
l_args = [
dict(x=x, y1=ma1, y2=ma2, where=ma1 > ma2, fc='#0aa2', ec='#f004', interpolate=True),
dict(x=x, y1=ma1, y2=ma2, where=ma1 < ma2, fc='#a0a2', ec='#0f04', interpolate=True),
]
self.fill_between(o_ax, l_args)
def hook_line_anim(self, axis_name, line_name, x, y, o_ax, df, o_line):
""" Méthode optionnelle. A supprimer ou à commenter si la surcharge est inutile.
Surcharge ou ajout au code hook_line_anim() de la classe-parent. """
hook = super().hook_line_anim(axis_name, line_name, x, y, o_ax, df, o_line) # hook = Valeur de retour.
""" - Masque la partie droite des graph. pour permettre de voir certaines prédictions (ex: nuage ichimoku).
- Masquage conditionnel : ajoutez des conditions avec axis_name, line_name, ... """
# y.loc[x[-100]:] = np.nan # La valeur 100 est un exemple, la remplacer par une variable.
return hook
def cost(self, **d_params):
""" Méthode obligatoire : coding de l'algorithme des ouvertures et fermetures.
Datas disponibles : {self.np_datas} <-- les colonnes correspondent à {self.l_columns}. """
""" Fonctions internes obligatoires. """
def c(txt):
""" txt : texte de la condition. Opérateurs disponibles : /, \\, --, v, ^, cu, cd, ==, !=, >, >=, <, <= """
return self.cond(txt, indx)
def open_cond(op):
self.opening(op, indx)
def close_cond(indx_open):
""" Fermeture conditionnelle du trade ouvert à {indx_open}. """
op = self.np_trades[indx_open, 6] # 1 = achat, -1=vente.
return c('MA1 cu MA2') if op == 1 else c('MA1 cd MA2')
self.close_cond = close_cond
""" Paramètres de la stratégie : self.d_cost. Commencent par 'p_'. """
# ...
# print(self.l_columns) # Colonnes de np_datas. A commenter ou a supprimer.
""" Colonnes de np_datas = ['Close', 'MA1', 'MA2'] <-- Liste fournie par le print ci-dessus. """
indx_min = d_params['indx']
for indx in range(indx_min, self.nb_rows):
""" Fermetures. """
self.closures(indx)
""" Ouverture achat (ob = Open Buy). """
if c('MA1 cd MA2'):
open_cond('buy')
""" Ouverture vente (os = Open Sell). """
if c('MA1 cu MA2'):
open_cond('sell')
return super().cost(**d_params)
if __name__ == '__main__':
Geek()
Créez vos propres modèles et placez-les dans ce dossier \models\
strategy_model.py
.EUR/USD, Renko de maille 5, les 20 000 derniers points
), sont :Statistiques : ============= Contexte : EUR/USD - Renko 5 - 20000 points jusqu'à 100 % de la table en base de données. Période : 28/10/2020 -> 10/12/2021 Nb jours: 408 Gain journalier: 25.49 €. Capital initial: 1000.0 € Capital min: 699.16 € Capital max: 11679.06 € Capital final: 11401.09 € Performance: 1040.11 % Gain en pips: 1697.5 pips Nb trades à l'achat: 88 Nb trades à la vente: 87 Nb trades gagnants: 118 Nb trades perdants: 57 Nb max gagnants consécutifs: 7 Nb max perdants consécutifs: 4 Gain moyen trades gagnants: 231.23 € Gain moyen trades perdants: -296.22 € Meilleur trade (pips): 113.9 pips, le 16/12/2020 (index de 3405 à 3464) Meilleur trade (monnaie): 1339.89 €, le 30/11/2021 (index de 19275 à 19352) Pire trade (pips): -181.1 pips, le 31/03/2021 (index de 9438 à 9773) Pire trade (monnaie): -1299.13 €, le 16/06/2021 (index de 12726 à 12927) Perte max (drawdown): 344 pips le 05/03/2021 (index 8094) -------------------------------------------------------------------------------
\trading\strategies\partie_1_voir\Strategie_01_double_RSI\
main_01.py
.Run 'main_01'
.RSI
: il faut donc refaire le code de self.add_indics()
et du dictionnaire de paramètres dans self.central_args()
.self.cost()
.EUR/USD, Renko10, du 11/04/2018 au 10/12/2021
.☐ \trading\strategies\partie_1_voir\Strategie_01_double_RSI\main_01.py
:
"""
- Utiliser les consignes du modèle 'voir_model.py' pour détailler ici toutes les explications nécessaires :
- Croquis, liens internet, bibliographie, vidéos, etc.
"""
# Imports externes
# import pandas_ta as ta # Utilisé seulement pour lire la doc : help(pdt.macd)
import seaborn as sns
# Imports internes
from functions.utils import Dictionary
from show.show_geek import ShowGeek
from trading.strategies.backtest import BackTest
# noinspection PyTypeChecker
class Geek(ShowGeek, BackTest):
def __init__(self):
super(Geek, self).__init__() # GeneticAlgorithm.__init__() est appelé dans ShowGeek.
""" Mode 'normal', selon votre imagination : Création, exécution et affichage de la stratégie. """
self.show_ui()
@staticmethod
def central_args(l_keys):
od_args = Dictionary(dict(
ui=dict( # Ces valeurs seront à la racine du super-dictionnaire self.od.
geometry=(400, 40, 1200, 600), # x, y, w, h. Par défaut : (100, 40, 1000, 700).
abscissa_size=600, # Nb de points affichés en abscisse. Par défaut : 600.
window_title="Stratégie 01", # Titre de la fenêtre.
figure_title="Double RSI", # Titre des graphiques.
subplots=dict( # Noms et hauteurs (en %) des graphiques modifiables. Supprimer / Ajouter.
Principal=40, # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
Milieu='',
Milieu2='',
Gains='', # Si chaîne vide => Les hauteurs seront automatiquement calculées et réparties.
),
),
pilot=dict( # Dans cet exemple : les 10 000 derniers points de EUR/USD en Renko10.
instrument='EUR/USD', # EURUSD, USDJPY, EURCHF, USDCAD, NZDUSD, EURGBP, EURJPY, GBPJPY, GBPCHF, ...
table=10, # int (ex : 10) = Renko, str (ex: 'H1') = Candles, None = Ticks.
# pc_from=0, # en % : les valeurs après {pc_from}.
pc_to=100, # en % dans la table-db. Les valeurs avant {pc_to}.
nb_rows=20_000, # Nb points étudiés. Non utilisé si {pc_from} ET {pc_to} existent.
),
indics=dict(
rsi_l=dict(length=50),
rsi_s=dict(length=4),
cost=dict(tp=50, sl=30, lim_rsi=40, capital=1000, risk=4, spread=1.1)
),
))
return Dictionary(od_args.read(l_keys, {}))
def add_indics(self, d_args=None):
""" Indicateurs utilisés dans cette stratégie. Les arguments sont dans d_args_ga, section 'Indicateurs'. """
""" Indicateurs en D:/anaconda/envs/robot/Lib/site-packages/pandas_ta <-- Ne pas les modifier !!!
Doc indicateurs : https://github.com/twopirllc/pandas-ta#indicators-by-category
Génération de signaux dérivés : ajout de colonnes au dataframe {df_pilot}. """
# help(ta.rsi) # <-- Affiche la doc.
""" Signaux pour le calcul des gains, les marqueurs et l'affichage. """
od_args = self.central_args('indics')
self.df_pilot['RSI_L'] = self.df_pilot.ta.rsi(**od_args['rsi_l'])
self.df_pilot['RSI_S'] = self.df_pilot.ta.rsi(**od_args['rsi_s'])
""" Gains en pips et en monnaie (€ ou $). """
l_gains = ['Balance (pips)', 'Open (pips)', 'Equity (pips)', 'Balance (€)', 'Open (€)', 'Equity (€)']
self.df_pilot[l_gains] = self.fitness(**od_args)
""" Affiche les statistiques du backtest."""
self.bt_stats()
""" Signaux seulement pour l'affichage. """
# self.df_pilot['xxxxx'] = self.get_indic(self.df_pilot.ta.{indic}, **d_args['xxx']) # <-- Exemple
def distrib(self):
""" Répartition des signaux entre les différents graphiques (axes). """
""" - Les noms des graphiques doivent correspondre avec ceux déclarés dans central_args(['ui', 'subplots']).
- Ceux des signaux (indicateurs, ...) doivent correspondre avec ceux déclarés dans add_indics(). """
self.add_df('Milieu', ['RSI_L'])
self.add_df('Milieu2', ['RSI_S'])
self.add_df('Gains', ['Balance (€)', 'Equity (€)'])
def pre_params(self):
""" Surcharge, insertion (avant super()) ou ajout (après super()) au code pre_params() de la classe-parent. """
super().pre_params()
""" Exemple : écriture dans le super-dictionnaire -> ici, légende épinglée au milieu (cm = Center-Middle) """
self.od.write(['Principal', 'legend'], 'ch') # 'rh', 'lh', 'lb', 'rb', 'rm', 'lm', 'rm', 'cb', 'ch', 'cm'
self.od.write(['Milieu', 'legend'], 'cm') # 'rh', 'lh', 'lb', 'rb', 'rm', 'lm', 'rm', 'cb', 'ch', 'cm'
def post_params(self):
""" Surcharge, insertion (avant super()) ou ajout (après super()) au code pre_params() de la classe-parent. """
""" Affichage des marqueurs. """
if self.df_marks is not None:
o_ax = self.o_ax('Principal')
sns.scatterplot(data=self.df_marks, x='Index', y='Close', ax=o_ax, hue='Type', size='Type',
sizes=(50, 50), zorder=4)
""" Exemple de tracé de ligne - https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.axhline.html """
self.trace_hline('Milieu', 50, lw=.4, ls='-.', c='r')
""" Coloriage statique. Le graphique 'Milieu2' supporte un RSI avec une bande de y1 à y2. """
o_ax = self.o_ax('Milieu2')
if self.is_axis(o_ax):
y1 = self.d_cost['lim_rsi']
if y1 is not None:
y2 = 100 - y1
l_args = [
dict(y1=y1, y2=y2, fc='#0aa3', ec='#f003'),
]
self.fill_between(o_ax, l_args)
super().post_params() # Code avant -> inséré, code après -> ajouté.
def cost(self, **d_params):
def c(txt):
return self.cond(txt, indx)
def open_cond(op):
self.opening(op, indx)
def close_cond(indx_open):
""" Fermeture conditionnelle du trade ouvert à {indx_open}.
Technique classique du take-profit / stop-loss.
"""
op = self.np_trades[indx_open, 6]
price = self.np_datas[indx, 0]
open_price = self.np_datas[indx_open, 0]
if op == 1:
""" A été ouvert à l'achat. """
return (price > open_price + p_tp) or (price < open_price - p_sl)
else:
""" A été ouvert à l'achat. """
return (price < open_price - p_tp) or (price > open_price + p_sl)
self.close_cond = close_cond
""" Paramètres de la stratégie : self.d_cost. Commencent par 'p_'. """
p_rsi_l = self.d_cost['lim_rsi']
p_rsi_h = 100 - p_rsi_l
p_tp = self.d_cost['tp']
p_sl = self.d_cost['sl']
indx = 0
# print(self.l_columns) # Colonnes de np_datas. A commenter ou a supprimer.
""" Colonnes de np_datas = ['Close', 'RSI_L', 'RSI_S'] """
indx_min = d_params['indx']
for indx in range(indx_min, self.nb_rows):
""" Fermetures. """
self.closures(indx)
""" Ouverture achat (ob = Open Buy). """
if c('RSI_L > 50') and c(f'RSI_S cu {p_rsi_l}'):
open_cond('buy')
""" Ouverture vente (os = Open Sell). """
if c('RSI_L < 50') and c(f'RSI_S cd {p_rsi_h}'):
open_cond('sell')
""" Fin. """
return super().cost(**d_params)
if __name__ == '__main__':
g = Geek()
Statistiques : ============= Contexte : EUR/USD - Renko 10 - 20000 points jusqu'à 100 % de la table en base de données. Période : 11/04/2018 -> 10/12/2021 Nb jours: 1339 Gain journalier: 3.2 €. Capital initial: 1000.0 € Capital min: 912.04 € Capital max: 7431.08 € Capital final: 5283.19 € Performance: 428.32 % Gain en pips: 5124.0 pips Nb trades à l'achat: 976 Nb trades à la vente: 984 Nb trades gagnants: 857 Nb trades perdants: 1103 Nb max gagnants consécutifs: 22 Nb max perdants consécutifs: 21 Gain moyen trades gagnants: 83.26 € Gain moyen trades perdants: -60.81 € Meilleur trade (pips): 58.9 pips, le 18/04/2018 (index de 81 à 129) Meilleur trade (monnaie): 161.73 €, le 07/09/2020 (index de 14338 à 14364) Pire trade (pips): -51.1 pips, le 10/11/2020 (index de 15446 à 15514) Pire trade (monnaie): -115.51 €, le 08/09/2020 (index de 14348 à 14412) Perte max (drawdown): 2660 pips le 18/03/2020 (index 9669) -------------------------------------------------------------------------------
\trading\strategies\partie_1_voir\Strategie_02_simple_MM\
p_lim
.skip
pour éviter un mitraillage agressif.self.cost()
.EUR/USD, Renko10, du 11/04/2018 au 10/12/2021
.\trading\strategies\partie_1_voir\Strategie_02_simple_MM\main_02.py
:
"""
- Utiliser les consignes du modèle pour détailler ici toutes les explications nécessaires :
- Croquis, liens internet, bibliographie, vidéos, etc.
"""
# Imports externes
# import pandas_ta as ta # Utilisé pour lire la doc : help(pdt.macd) Ne pas supprimer ni commenter.
import seaborn as sns
# Imports internes
from functions.utils import Dictionary
from show.show_geek import ShowGeek
from trading.strategies.backtest import BackTest
# noinspection PyTypeChecker
class Geek(ShowGeek, BackTest):
def __init__(self):
super(Geek, self).__init__() # GeneticAlgorithm.__init__() est appelé dans ShowGeek.
""" Exécute et affiche la stratégie. """
self.show_ui()
@staticmethod
def central_args(l_keys):
od_args = Dictionary(dict(
ui=dict( # Ces valeurs seront à la racine du super-dictionnaire self.od.
geometry=(400, 40, 1200, 600), # x, y, w, h. Par défaut : (100, 40, 1000, 700).
abscissa_size=20000, # Nb de points affichés en abscisse. Par défaut : 600.
window_title="Stratégie 02", # Titre de la fenêtre.
figure_title="Simple moyenne mobile", # Titre des graphiques.
subplots=dict( # Noms et hauteurs (en %) des graphiques modifiables. Supprimer / Ajouter.
Principal=40, # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
Milieu='',
Pips='',
Gains='',
),
),
pilot=dict( # Dans cet exemple : les 10 000 derniers points de EUR/USD en Renko10.
instrument='EUR/USD',
table=10,
# pc_from=0,
pc_to=100,
nb_rows=20_000,
),
indics=dict(
ma=dict(length=400),
cost=dict(tp=40, sl=30, lim=18, skip=3, capital=1000, risk=10, spread=1.2)
),
))
return Dictionary(od_args.read(l_keys, {}))
def add_indics(self, od_args=None):
""" Indicateurs techniques en D:/anaconda/envs/robot/Lib/site-packages/pandas_ta <-- Ne pas les modifier !!!
Doc indicateurs : https://github.com/twopirllc/pandas-ta#indicators-by-category
Génération de signaux dérivés : ajout de colonnes au dataframe {df_pilot}. """
""" Voir la documentation d'un indicateur. """
# help(ta.sma) # <-- Affiche la doc.
""" Signaux pour le calcul des gains, les marqueurs et l'affichage. """
od_args = self.central_args('indics')
self.df_pilot['MA'] = self.df_pilot.ta.sma(**od_args['ma'])
# |_ Moyennes à tester : sma, ema, dema, tema, hma, wma, alma, kama, zlma, rma, median, ...
self.df_pilot['CLOSE-FLAT'] = self.df_pilot['Close'] - self.df_pilot['MA']
# """ Gains et marqueurs. """
l_gains = ['Balance (pips)', 'Open (pips)', 'Equity (pips)', 'Balance (€)', 'Open (€)', 'Equity (€)']
self.df_pilot[l_gains] = self.fitness(**od_args)
""" Affiche les statistiques du backtest."""
self.bt_stats()
def distrib(self):
""" Répartition des signaux entre les différents graphiques (axes). """
""" - Les noms des graphiques doivent correspondre avec ceux déclarés dans central_args(['ui', 'subplots']).
- Ceux des signaux (indicateurs, ...) doivent correspondre avec ceux déclarés dans add_indics(). """
self.add_df('Principal', ['MA'])
self.add_df('Milieu', ['CLOSE-FLAT'])
self.add_df('Pips', ['Balance (pips)', 'Equity (pips)'])
self.add_df('Gains', ['Balance (€)', 'Equity (€)'])
def pre_params(self):
""" Surcharge, insertion (avant super()) ou ajout (après super()) au code pre_params() de la classe-parent. """
super().pre_params()
""" Exemple : écriture dans le super-dictionnaire -> ici, légende épinglée au milieu (cm = Center-Middle) """
self.od.write(['Principal', 'legend'], 'ch') # 'rh', 'lh', 'lb', 'rb', 'rm', 'lm', 'rm', 'cb', 'ch', 'cm'
self.od.write(['Milieu', 'legend'], 'cm') # 'rh', 'lh', 'lb', 'rb', 'rm', 'lm', 'rm', 'cb', 'ch', 'cm'
def post_params(self):
""" Surcharge, insertion (avant super()) ou ajout (après super()) au code pre_params() de la classe-parent. """
""" Affichage des marqueurs. """
if self.df_marks is not None:
o_ax = self.o_ax('Principal')
sns.scatterplot(data=self.df_marks, x='Index', y='Close', ax=o_ax, hue='Type', size='Type',
sizes=(50, 50), zorder=4)
""" Exemple de tracé de lignes - https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.axhline.html """
self.trace_hline('Milieu', 180, lw=.4, ls='-.', c='r')
self.trace_hline('Milieu', 0, lw=.4, ls='-.', c='b')
self.trace_hline('Milieu', -180, lw=.4, ls='-.', c='r')
super().post_params() # Code avant -> inséré, code après -> ajouté.
def cost(self, **d_params):
def c(txt):
return self.cond(txt, indx)
def opening(op):
""" 2 flags d'ouverture (-1/+1) : col 6 -> effacé à la fermeture, col 7 -> permanent. """
self.np_trades[indx, 6: 8] = 1 if op == 'ob' else -1
""" Voir colonnes de self.np_trades dans self.fitness() """
def close_cond(indx_open):
""" Fermeture conditionnelle ou au dernier point. """
op = self.np_trades[indx_open, 6] # 1 = achat, -1=vente.
price = self.np_datas[indx, 0]
open_price = self.np_datas[indx_open, 0]
return (price > open_price + p_tp or price < open_price - p_sl) if op == 1\
else (price < open_price - p_tp or price > open_price + p_sl)
self.close_cond = close_cond
""" Paramètres de la stratégie : self.d_cost. Commencent par 'p_'. """
p_tp = self.d_cost.get('tp', 20)
p_sl = self.d_cost.get('sl', 20)
p_lim = self.d_cost.get('lim', 20)
p_skip = self.d_cost.get('skip', 15)
skip = p_skip
# print(self.l_columns) # Colonnes de np_datas. A commenter ou a supprimer.
""" Colonnes de np_datas = ['Close', 'MA', 'MACD', 'HISTO', 'SIGNAL', 'CLOSE-FLAT'] """
indx_min = d_params['indx']
for indx in range(indx_min, self.nb_rows):
""" Fermetures. """
self.closures(indx)
if skip > 0:
skip -= 1
continue
if c('MA /') and c(f'CLOSE-FLAT < {p_lim}') and c(f'CLOSE-FLAT v'):
""" Ouverture achat (ob = Open Buy). """
opening('ob')
skip = p_skip
elif c('MA \\') and c(f'CLOSE-FLAT > {-p_lim}') and c(f'CLOSE-FLAT ^'):
""" Ouverture vente (os = Open Sell). """
opening('os')
skip = p_skip
return super().cost(**d_params)
if __name__ == '__main__':
Geek()
Statistiques : ============= Contexte : EUR/USD - Renko 10 - 20000 points jusqu'à 100 % de la table en base de données. Période : 11/04/2018 -> 10/12/2021 Nb jours: 1339 Gain journalier: 4.34 €. Capital initial: 1000.0 € Capital min: 514.53 € Capital max: 8817.96 € Capital final: 6812.1 € Performance: 581.21 % Gain en pips: 2596.0 pips Nb trades à l'achat: 292 Nb trades à la vente: 378 Nb trades gagnants: 335 Nb trades perdants: 335 Nb max gagnants consécutifs: 11 Nb max perdants consécutifs: 19 Gain moyen trades gagnants: 146.33 € Gain moyen trades perdants: -128.99 € Meilleur trade (pips): 48.8 pips, le 31/05/2018 (index de 982 à 995) Meilleur trade (monnaie): 402.57 €, le 14/10/2020 (index de 14957 à 14962) Pire trade (pips): -51.2 pips, le 30/10/2020 (index de 15136 à 15148) Pire trade (monnaie): -356.46 €, le 16/10/2020 (index de 14982 à 14990) Perte max (drawdown): 1291 pips le 15/04/2020 (index 11362) -------------------------------------------------------------------------------
Notez les variations régulières sur les pips (arithmétiques), et exponentielles sur les gains (géométrique).
En effet, un nombre de pips correspond à une fraction du capital (risque). Si ce dernier augmente, la valeur en €uros des mises (constante en pips), augmente.
\trading\strategies\partie_1_voir\Strategie_03_ichimoku\
L'instant présent se situe à peu près à 716, le nuage va au delà, et le signal chikou_span se termine à peu près à 690.
\trading\strategies\partie_1_voir\Strategie_03_ichimoku\main_03.py
:
# Imports externes
import numpy as np
# Imports internes
from functions.utils import Dictionary
from show.show_geek import ShowGeek
from trading.strategies.backtest import BackTest
class Geek(ShowGeek, BackTest):
def __init__(self):
super(Geek, self).__init__() # GeneticAlgorithm.__init__() est appelé dans ShowGeek.
self.kijun = None
self.show_ui()
@staticmethod
def central_args(l_keys):
od_args = Dictionary(dict(
ui=dict( # Ces valeurs seront à la racine du super-dictionnaire self.od.
geometry=(200, 40, 1200, 600), # x, y, w, h. Par défaut : (100, 40, 1000, 700).
abscissa_size=200, # Nb de points affichés en abscisse. Par défaut : 600.
window_title="Stratégie 03", # Titre de la fenêtre.
figure_title="Affichage ichimoku", # Titre des graphiques.
subplots=dict( # Noms et hauteurs (en %) des graphiques modifiables. Supprimer / Ajouter.
Principal=100, # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
),
),
pilot=dict( # Dans cet exemple : les 10 000 derniers points de EUR/USD en Renko10.
instrument='EUR/USD', # EURUSD, USDJPY, EURCHF, USDCAD, NZDUSD, EURGBP, EURJPY, GBPJPY, GBPCHF, ...
table=3, # int (ex : 10) = Renko, str (ex: 'H1') = Candles, None = Ticks.
# pc_from=0, # en % : les valeurs après {pc_from}.
pc_to=100, # en % dans la table-db. Les valeurs avant {pc_to}.
nb_rows=10_000, # Nb points étudiés. Non utilisé si les 2 existent ({pc_from} ET {pc_to}).
),
indics=dict(
ichimoku=dict(tenkan=9, kijun=26, senkou=52),
),
))
return Dictionary(od_args.read(l_keys, {}))
def add_indics(self, od_args=None):
od_args = self.central_args('indics')
self.kijun = od_args.read(['ichimoku', 'kijun'])
cols = ['spanA', 'spanB', 'tenkan_sen', 'kijun_sen', 'chikou_span']
self.df_pilot[cols] = self.df_pilot.ta.ichimoku(**od_args['ichimoku'])[0]
"""
tenkan sen = Conversion line -> (max + min) / 2 des {tenkan} derniers points.
kijun sen = Base line -> (max + min) / 2 des {kijun} derniers points.
spanA = Lead 1 -> (tenkan sen + kijun sen) / 2, dans le futur de {kijun} points.
spanB = Lead 2 -> (max + min) / 2 des {senkou} derniers points, dans le futur de {kijun} points.
kumo = nuage (cloud) -> zône colorée entre spanA et spanB
chiku span = Lagging span -> Signal, shift de {kijun} points dans le passé.
"""
def distrib(self):
self.add_df('Principal', ['kijun_sen', 'tenkan_sen', 'chikou_span'])
def hook_axis_anim(self, axis_name, x, o_ax, df, y_min, y_max, get_line):
""" Coloriage inter-zônes : https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.fill_between.html """
if axis_name == 'Principal':
span_a = get_line('spanA')
span_b = get_line('spanB')
l_args = [
dict(x=x, y1=span_a, y2=span_b, where=span_a > span_b, fc='#0aa2', ec='#f004', interpolate=True),
dict(x=x, y1=span_a, y2=span_b, where=span_a < span_b, fc='#a0a2', ec='#0f04', interpolate=True),
]
self.fill_between(o_ax, l_args)
super().hook_axis_anim(axis_name, x, o_ax, df, y_min, y_max, get_line)
def hook_line_anim(self, axis_name, line_name, x, y, o_ax, df, o_line):
""" Masque la partie droite des graphiques pour permettre de voir le futur 'prédit' par le nuage ichimoku. """
if line_name == 'chikou_span':
y.loc[x[2*-self.kijun]:] = np.nan
else:
y.loc[x[-self.kijun]:] = np.nan
return super().hook_line_anim(axis_name, line_name, x, y, o_ax, df, o_line)
if __name__ == '__main__':
Geek()
/partie_1_voir/
pour créer vos propres stratégies.pandas-ta
.self.cost()
.Bonjour les codeurs !