Calculs & affichages / Le mode geek /
Partie 1 : Voir les ouvertures et les fermetures

Aperçu visuel des gains d'une stratégie de trading


Avant-propos

Installer TA-Lib :

TA-Lib est complémentaire à Pandas-ta :


Stratégie :


Nous allons créer un outil pour mettre en évidence les possibles améliorations :


Optimisation :


Description

Plan du tuto :
  1. Préparation.
  2. Test du modèle.
  3. Exemple 1 : double RSI.
  4. Exemple 2 : simple moyenne mobile.
  5. Exemple 3 : Affichage d'un indicateur avec datas dans le futur.
  6. A vous de jouer.

1 - Préparation :

Retour au plan


""" Version 2022-03-05 """
# Imports externes
import datetime
import gzip
import os           # http://www.python-simple.com/python-modules-fichiers/os-path.php (Ctrl + clic)
import requests
from io import BytesIO, StringIO
import pandas as pd
from pandas_ta.custom import create_dir, import_dir  # https://github.com/twopirllc/pandas-ta
import keyboard     # https://www.delftstack.com/fr/howto/python/python-detect-keypress/

# Imports internes
from functions.utils import DateTime, Utils
from trading.historiques.db_candle import DbCandle
from trading.historiques.db_tick import DbTick


class CtrlHistos:
    def __init__(self, instrument):
        """ https://github.com/fxcm/MarketData
            Tous les symboles :
                AUDCAD, AUDCHF, AUDJPY, AUDNZD, AUDUSD, CADCHF, CADJPY, EURAUD, EURCHF,
                EURGBP, EURJPY, EURNZD, EURUSD, GBPCAD, GBPCHF, GBPJPY, GBPNZD, GBPUSD,
                NZDCAD, NZDCHF, NZDJPY, NZDUSD, USDCAD, USDCHF, USDJPY, USDTRY
            Les plus utilisés (Dans l'ordre d'importance) :
                EURUSD, USDJPY, EURCHF, USDCAD, NZDUSD, EURGBP, EURJPY, GBPJPY, GBPCHF, GBPUSD """
        self.instrument = instrument
        self.symbol = instrument.replace('/', '')
        self.dt = DateTime()
        self.ut = Utils()
        self.kb_break = 'f12'   # 'shift', 'ctrl', ... à votre convenance.
        self.pips = .01 if instrument.endswith('JPY') else .0001
        self.symbol_dir = None
        self.db_candle = None
        self.db_tick = None
        self.setup()

    def setup(self):
        """ Les données peuvent se trouver sur un disque dur SSD autre que celui en cours.
            Le cas échéant, db_path doit être modifié (chemin absolu).
            Il est également possible, pour les besoins des tests, d'avoir plusieurs bases de données.
                |_ Il faudra alors modifier db_path chaque fois que l'on désire switcher. """
        db_path = os.path.dirname(__file__)             # <--- Chemin par défaut, à modifier si autre disque dur.
        self.symbol_dir = os.path.abspath(f'{db_path}/db/{self.symbol}')    # Dossier des fichiers d'historiques.
        os.makedirs(self.symbol_dir, exist_ok=True)
        self.db_candle = DbCandle(self)
        self.db_tick = DbTick(self)

    def _helper(self, key, *l_params):
        """ ************ Collection d'utilitaires. Nombre de paramètres variable. ************ """
        if key == 'nb_weeks_in_year':
            """ Renvoie le nombre de semaines dans l'année {year}. """
            year = l_params[0]   # l_params = [année dont on veut connaître le nombre de semaines].
            if len(l_params) > 1:
                y, num_week, _ = datetime.date.today().isocalendar()
                if y == year:
                    return num_week - 1
            o_dat = self.dt.get_date_from_dtstamp(self.dt.get_dtstamp_from_dtstr(f'{year}-12-28', '%Y-%m-%d'))
            return o_dat.isocalendar()[1]

        elif key == 'l_iso_yw':
            """ Renvoie une liste de tuples ISO (iso_year, iso_str_week) :  <-- exemple de tuple : (2019, '08')
                - Pour la devise en cours et son type (tick ou candle).
                - Toutes les semaines, du début à aujourd'hui.
                - L'année est un int, la semaine une str ('02', '12', ...) """
            b_ticks = l_params[0]       # l_params = [b_ticks].
            st_yw = set()               # Le set() évite les doublons.
            since = 2018 if b_ticks else 2012
            yw_today = datetime.date.today().isocalendar()[:2]
            for year in range(since, yw_today[0] + 1):
                for week in range(1, 54):
                    iso = self.dt.isoyw_from_fxyw(year, week)
                    st_yw.add((iso[0], f'0{iso[1]}'[-2:]))
                    if (year, week) >= yw_today:
                        lt_yw = list(st_yw)
                        lt_yw.sort()
                        return lt_yw

        elif key == 'csv_file':
            """ Renvoie le chemin complet du fichier correspondant à la devise en cours, à l'année et à la semaine. """
            year, week, b_ticks = l_params       # l_params = [*t_yw, b_ticks]
            str_week = f'0{week}'[-2:]
            return os.path.join(self.symbol_dir, str(year), f"{'tick' if b_ticks else 'candle'}_"
                                                            f"{str_week}.csv")
        elif key == 'url_file':
            """ Renvoie l'url du fichier d'histos correspondant à la devise en cours, à l'année et à la semaine. """
            year, str_week, b_ticks = l_params       # l_params = [*t_yw, b_ticks]
            url_candle = 'https://candledata.fxcorporate.com/m1'
            url_tick = 'https://tickdata.fxcorporate.com'
            return f'{url_tick if b_ticks else url_candle}/{self.symbol}/{year}/{int(str_week)}.csv.gz'

    def verify_weeks(self, b_ticks, b_silent=False):
        """ Affichage d'un tableau dans la console (Lignes=années, Colonnes=semaines).
            '▒' ou '▄' ou '▀' = semaine présente, '.' = semaine absente. """

        """ 1 - Listes des semaines réellement existantes en base de données et en fichier .csv. """
        l_weeks_csv, l_weeks_db = self._existing_lists(b_ticks)

        """ 2 - Affichage du titre et entêtes de colonnes. """
        typ = 'Ticks' if b_ticks else 'Candles'
        self.ut.gauge(f"{self.instrument}-{typ}", large=17)
        [self.ut.gauge(char=f"{str(week) :<2}") for week in range(1, 54)]
        self.ut.gauge('end')

        """ 3 - Affichage du contenu - Ordonnées = années, abscisse = semaines. """
        l_weeks = l_weeks_csv + l_weeks_db
        l_weeks.sort()
        first_year = (2018 if b_ticks else 2012) if len(l_weeks) == 0 else l_weeks[0][0]
        year_now = datetime.date.today().isocalendar()[0]
        for year in range(first_year, year_now + 1):
            self.ut.gauge(year, large=17)
            """ Boucle sur les semaines de l'année {year} : 1 à (52 ou 53). """
            for week in range(1, self._helper('nb_weeks_in_year', year, 'today') + 1):  # today limite à aujourd'hui.
                t_yw = year, f'0{week}'[-2:]      # 7 -> 07
                b_csv, b_db = t_yw in l_weeks_csv, t_yw in l_weeks_db
                if not b_csv and not b_db:          # 0 0       /x./y
                    char, color = '. ', 'BLEU'
                elif not b_csv and b_db:            # 0 1       /x.y
                    char, color = '▀ ', 'VERT'
                elif b_csv and not b_db:            # 1 0       x./y
                    char, color = '▄ ', 'VIOLET'
                else:                               # 1 1       x.y
                    char, color = '▒ ', 'BLEU'
                self.ut.gauge(char=char, color=color)
            self.ut.gauge('end')

        """ Vérification de la synchronisation entre la base de données et les fichiers csv. """
        msg = "\nAttention : La base de données n'est pas entièrement synchronisée ! csv + db = rectangles bleus." \
              "\nL'affichage ci-dessus montre les fichiers .csv en carrés-bas mauves, la db en carrés-hauts verts." \
              if not b_silent and l_weeks_csv != l_weeks_db else ''
        self.ut.printc(msg)

    def download_histos(self, nb_weeks, b_ticks):
        if nb_weeks > 1:
            self.ut.printc(f"\nArrêt manuel : garder la touche '{self.kb_break.upper()}' appuyée.\n")

        """ Intervalle des semaines à télécharger (nombres en base 53). """
        """                     |_ since = Depuis. """
        l_weeks_csv, _ = self._existing_lists(b_ticks=b_ticks)      # Semaines = str <-- '08', '09', '10', ...
        l_csv = [(y, int(w)) for (y, w) in l_weeks_csv]             # Semaines = int <-- 8, 9, 10, ...
        for y, w in l_csv:
            if w > 2:
                since = y * 53 + w - 1
                break
        else:
            since = (2018 if b_ticks else 2012) * 53

        y_today, w_today = datetime.date.today().isocalendar()[:2]
        now = y_today * 53 + w_today
        """                     |_ now = Jusqu'à. """

        """ l_required = Liste des fichiers à télécharger. """
        l_required = list()
        for yw53 in range(since, now-1):          # dernière semaine = (now-1) => pas la semaine en cours.
            csv_yw = yw53 // 53, 1 + yw53 % 53
            csv_file = self._helper('csv_file', *csv_yw, b_ticks)
            if not os.path.isfile(csv_file):
                """ Fichier absent localement => à télécharger. """
                l_required.append(csv_yw)

        db = self.db_tick if b_ticks else self.db_candle
        nb_weeks_loaded = 0

        """ Parcours des semaines à traiter. """
        for csv_yw_required in l_required:
            df = self._download_csv_file(csv_yw_required, b_ticks)
            if df is None:
                """ Arrêt manuel demandé. """
                break
            elif df.shape[0] > 0:
                """ 1 - Enregistrement du fichier téléchargé sur disque dur. """
                self._df_to_csv(df, csv_yw_required, b_ticks)

                """ 2 - Écriture du contenu en base de données. break si arrêt manuel demandé. """
                if not db.df_to_table(df):
                    continue

                """ 3 - Update des volumes à partir des ticks. """
                df_stamps = self.db_tick.get_df_stamps(*csv_yw_required)
                if df_stamps.shape[0] > 0:
                    self.db_candle.update_volume(df_stamps)

                """ Arrêt si le nombre de semaines demandées est atteint. """
                nb_weeks_loaded += 1
                if nb_weeks_loaded >= nb_weeks:
                    break

        s = 's' if nb_weeks > 1 else ''
        self.ut.printc(f"Téléchargement{s} terminé{s}.", color='vert')

    def synchro_db_csv(self, b_ticks):
        """ A l'issue de cette méthode, TOUTE la db sera l'image de TOUS les fichiers csv. """
        """ 1 - Base de données. """
        db, typ = (self.db_tick, 'Ticks') if b_ticks else (self.db_candle, 'Candles')

        """ 2 - État actuel des historiques => Différences entre csv et db
                - l_drop = semaines à supprimer de la db, l_add = semaines à ajouter à la db. """
        l_drop, l_add, l_vol = self._lists_to_update_db(b_ticks=b_ticks)

        if l_drop+l_add+l_vol == []:
            self.ut.printc(f"{typ} csv <--> db : OK. La synchronisation est inutile.\n", color='VERT')
            return True

        """ 3 - Message : état initial. """
        for l_weeks in [l_drop, l_add]:
            if len(l_weeks) > 0:
                s = 's sont' if len(l_weeks) > 1 else ' est'
                verb = 'ajouter à' if l_weeks == l_add else 'supprimer de'
                self.ut.printc(f"{len(l_weeks)} semaine{s} à {verb} la base de données '{typ.lower()}'.", color='VERT')

        """ 4 - Ajout de données dans la db. """
        self._add_weeks(l_add, b_ticks)

        """ 5 - Suppression de données de la db. """
        for yw in l_drop:
            db.delete_week(*yw)

        """ 6 - Update des volumes. """
        for yw in l_vol:
            if keyboard.is_pressed(self.kb_break):
                self.ut.printc("Arrêt manuel demandé.")
                break
            df_stamps = self.db_tick.get_df_stamps(*yw)
            self.db_candle.update_volume(df_stamps)

        """ 7 - Vérification. """
        l_drop, l_add, l_vol = self._lists_to_update_db(b_ticks=b_ticks)
        success, color = ('réussi', 'VERT') if l_drop+l_add+l_vol == [] else ('échoué', 'ROUGE')
        self.ut.printc(f'{typ} : La synchronisation a {success}.', color=color)

    def _existing_lists(self, b_ticks):
        """ Retourne les listes de semaines des fichiers csv et des enregistrements en base de données. """
        """ s_weeks_csv n'est pas une list(), mais un set(), pour éviter les doublons. """
        l_weeks_csv, s_weeks_db = list(), set()
        db = self.db_tick if b_ticks else self.db_candle
        l_iso_yw = self._helper('l_iso_yw', b_ticks)            # Liste de toutes les semaines.
        typ = 'Ticks' if b_ticks else 'Candles'
        self.ut.gauge(f"{self.instrument}-{typ} Création listes :", large=0)
        for num, iso_yw in enumerate(l_iso_yw):
            """ Semaines existant en fichiers csv : Pour une semaine iso, on a 0, 1 ou 2 fichiers csv. """
            l_csv_files, last_stamp = self._get_csv_files(*iso_yw, b_ticks)
            csv_size = 0
            if len(l_csv_files) > 0:
                l_weeks_csv.append(iso_yw)
                for csv_file in l_csv_files:
                    csv_size += os.path.getsize(csv_file)
            """ Liste des semaines existant en base de données. """
            csv = last_stamp if b_ticks else csv_size
            if db.week_exists(*iso_yw, csv):
                s_weeks_db.add(iso_yw)             # (1)
            if num % 8 == 0:
                self.ut.gauge(char='.')  # , color='Jaune')
        self.ut.gauge('end')
        self.ut.gauge('end')        # Ligne vide supplémentaire.

        """ Persistance en pkl. """
        l_weeks_db = list(s_weeks_db)
        l_weeks_csv.sort()
        l_weeks_db.sort()
        return l_weeks_csv, l_weeks_db

    def _get_csv_files(self, iso_year, iso_week, b_ticks):
        """ Attention ! Les noms de fichiers csv ne correspondent pas toujours à leur contenu ! """
        """ Return : liste de noms complets de fichiers csv correspondant à (iso_year, iso_week) : 0, 1 ou 2 fichiers.
            fxcorporate ne respecte pas la norme ISO. De plus :
                - D'une année sur l'autre, le nom du fichier csv ne correspond pas à son contenu.
                - On ne trouve pas de correspondance entre diverses devises.
                - Par conséquent, le nom du fichier n'est pas fiable pour trouver le bon N° de semaine.
            La méthode adoptée pour contourner ce problème est la suivante :
                - Recherche de 3 fichiers csv voisins de (iso_year, iso_week) passés en paramètres.
                - Lecture, pour chacun, de la date-heure du milieu du fichier.
                - On en déduit le VRAI N° de semaine ISO.
            Il arrive parfois, au changement d'année, qu'une même semaine soit composée de 2 fichiers csv.
                - C'est pourquoi la valeur de retour est une liste. """
        int_week, l_fxyw = int(iso_week), list()
        last_stamp = 0
        for week in range(int_week - 1, int_week + 1):     # 2 Semaines : précédente, actuelle.
            """ Correction : N° de semaine en base 53. """
            y, w = (iso_year, week) if week > 0 else (iso_year - 1, week + 53)
            y, w = (y, w) if w <= 53 else (y + 1, w - 53)

            """ Les 3 fichiers csv candidats. """
            str_w = f'0{w}'[-2:]
            file_name = f"{'tick' if b_ticks else 'candle'}_{str_w}.csv"  # 'tick_??.csv' ou 'candle_??.csv'.
            csv_path = os.path.abspath(f"{os.path.dirname(self.symbol_dir)}/{self.symbol}/{y}/{file_name}")
            if os.path.isfile(csv_path):
                with open(csv_path, 'r') as fh:
                    """ On 'goûte' les 300 derniers bytes du fichier pour connaitre le N° ISO de semaine. """
                    nb_char = 300
                    fh.seek(0, os.SEEK_END)
                    fh.seek(fh.tell() - nb_char)
                    dt_str = fh.read().split('\n')[-3].split(',')[0]
                last_stamp = self.dt.get_dtstamp_from_dtstr(dt_str)
                o_d = self.dt.get_date_from_dtstamp(last_stamp)                  # Objet date.
                if o_d.isocalendar()[:2] == (iso_year, int_week):               # Filtrage.
                    l_fxyw.append(csv_path)                                     # Candidat sélectionné.

        return l_fxyw, last_stamp   # 0, 1 ou 2 éléments.

    def _download_csv_file(self, csv_yw_required, b_ticks):
        """ Téléchargement, décompactage, décodage, DataFrame, enregistrement sur disque dur.
            - return DataFrame. """

        """ 1 - Demande au helper de construire l'url. """
        url_required = self._helper('url_file', *csv_yw_required, b_ticks)

        """ 2 - Initialisation des bytes. """
        content = b''

        """ 3 - Téléchargement en streaming avec lots de taille {size//50}, permettant l'insertion d'une jauge. """
        y, w = csv_yw_required
        try:
            with requests.get(url_required, stream=True) as req:
                size = int(req.headers['Content-Length'])
                if size < 1_000:
                    return pd.DataFrame()       # return df vide.
                print()
                self.ut.gauge(f"Téléchargement {' ticks ' if b_ticks else 'candles'} {self.symbol} ({y}, {w})")
                for chunk in req.iter_content(chunk_size=size//50):
                    if keyboard.is_pressed(self.kb_break):
                        self.ut.printc("Arrêt manuel demandé.")
                        self.ut.gauge('end')
                        return None
                    content += chunk
                    """ 3.1 - Jauge de progression. """
                    self.ut.gauge()
                self.ut.gauge('end')
        except (Exception,):
            self.ut.printc('Connexion internet interrompue.')
            return None

        """ 4 - Décompactage du buffer de données binaires (codées utf), en mémoire. """
        buf = BytesIO(content)
        f = gzip.GzipFile(fileobj=buf)
        coded_bytes = f.read()

        """ 5 - Décodage : utf-8 pour les candles, utf-16 pour les ticks. """
        codec, type_data = ('utf-16', 'tick') if b_ticks else ('utf-8', 'candle')
        decoded_bytes = coded_bytes.decode(codec)

        """ 6 - Conversion Bytes -> String """
        decoded_str = StringIO(decoded_bytes)

        """ 7 - String to DataFrame. """
        df = pd.read_csv(decoded_str)  # Pas de colonne-index (index_col=0 retiré)

        """ 8 - Retourne le dataframe. """
        return df

    def _df_to_csv(self, df, t_yw, b_ticks):
        """ Contrôle préalable : si le dossier-cible n'existe pas, on le crée. """
        csv_path = self._helper('csv_file', *t_yw, b_ticks)
        os.makedirs(os.path.dirname(csv_path), exist_ok=True)
        df.to_csv(csv_path, index=False)        # Copie du fichier .csv sur disque dur.

    @staticmethod
    def _csv_to_df(csv_file):
        """ Conversion du fichier {csv_file} en dataframe. """
        return pd.read_csv(csv_file) if os.path.isfile(csv_file) else pd.DataFrame()

    def _lists_to_update_db(self, b_ticks):
        """ Retourne 3 listes : l_drop, l_add et l_vol.
        Algorithme pour l_drop et l_add :
        - (1) = l_weeks_db : Liste des semaines réellement existantes en base de données.
        - (2) = l_weeks_csv : Liste des semaines réellement existantes en fichiers csv.
        - Intersection (Inter) = (1) ⋂ (2)
        - l_drop : Liste des semaines à supprimer de la db (celles qui n'existent pas en csv) : (1) - (Inter)
        - l_add : Liste des semaines à ajouter dans la db (celles des csv qui n'existent pas en db) : (2) - (Inter)
        --------------------------------
        l_vol  = liste des semaines (yw) de candles nécessitant un update du champ 'Volume' dans les tables.
                 Sans objet pour les ticks.
        """

        """ 1 et 2 - Listes des semaines réellement existantes en base de données et en fichier .csv. """
        l_weeks_csv, l_weeks_db = self._existing_lists(b_ticks=b_ticks)

        """ 3 - Liste des semaines à supprimer de la db (celles qui n'existent pas en csv). """
        l_drop = list()
        for t_yw in l_weeks_db:
            if t_yw not in l_weeks_csv:
                # print('A supprimer :', t_yw)    # MAP : Permet de lister les semaines qui sont à supprimer.
                l_drop.append(t_yw)

        """ 4 - Liste des semaines à ajouter à la db (celles des csv qui n'existent pas en db). """
        l_add = list()
        for t_yw in l_weeks_csv:
            if t_yw not in l_weeks_db:
                # print('A ajouter :', t_yw)      # MAP : Permet de lister les semaines qui sont à ajouter.
                l_add.append(t_yw)

        """ 5 - Liste des semaines de ticks pour lesquelles les volumes des candles sont à updater. """
        l_vol = list()
        if not b_ticks:     # Renkos non concernés.
            """ Semaines ticks existantes (b_ticks=True). """
            _, l_weeks_tick_db = self._existing_lists(b_ticks=True)
            for yw in l_weeks_tick_db:
                if yw in l_weeks_db:
                    if not self.db_candle.is_volume_in_week(*yw):
                        l_vol.append(yw)

        return l_drop, l_add, l_vol

    def _add_weeks(self, l_yw, b_ticks):
        """
        @param l_yw : Liste des semaines à ajouter à la base de données.
        @param b_ticks: True = ticks, False = candles.
        """
        db = self.db_tick if b_ticks else self.db_candle
        self.ut.printc(f"\nArrêt manuel : garder la touche '{self.kb_break.upper()}' appuyée.\n")
        for iso_yw in l_yw:
            if keyboard.is_pressed(self.kb_break):
                self.ut.printc("Arrêt manuel demandé.")
                break
            l_csv_files, _ = self._get_csv_files(*iso_yw, b_ticks)      # Liste de 0, 1 ou 2 fichiers.
            for csv_file in l_csv_files:
                df = self._csv_to_df(csv_file)
                if df.shape[0] > 0:
                    db.df_to_table(df)

    def get_stamps(self, table_name, nb_stamps):
        db = self.db_tick if (table_name == 'Ticks' or table_name[0] == 'r') else self.db_candle
        return db.get_last_stamps(table_name, nb_stamps)

    def get_datas(self, table_name, final_stamp, nb_points):
        b_ticks = table_name == 'Ticks' or table_name[0] == 'r'     # Ticks ou Renko
        db = self.db_tick if b_ticks else self.db_candle
        return db.get_datas(table_name, final_stamp, nb_points)

    """ *************************** Interface Trading <--> Base de données. *************************** """
    @staticmethod
    def custom_ta():
        """ Indicateurs d'analyse technique personnalisés (dans /functions/custom_ta). """
        path = os.path.abspath(__file__)
        ta_dir = os.path.join(path.split(os.path.join('', 'trading', ''))[0], 'functions', 'custom_ta')
        create_dir(ta_dir)  # Création automatique de 9 sous-dossiers (peut être supprimé après le 1er passage).
        import_dir(ta_dir, verbose=False)

    def slice_tick(self, pc_pos, nb_rows):
        """
        @param pc_pos: pourcentage -> 0 à 100.
        @param nb_rows: Nombre d'enregistrements demandés -> si > 0, enr après pc_pos ; si < 0, enr avant pc_pos.
        @return: pandas DataFrame.
        """
        return self.db_tick.slice_tick(pc_pos, nb_rows)

    def slice_renko(self, mesh, pc_from, pc_to, nb_rows):
        return self.db_tick.slice_renko(mesh, pc_from, pc_to, nb_rows)

    def slice_candle(self, period, pc_from, pc_to, nb_rows):
        return self.db_candle.slice_candle(period, pc_from, pc_to, nb_rows)

    def get_pilot(self, table=None, pc_from=None, pc_to=None, nb_rows=10_000, **kwargs):
        """ - Récupération des {nb_rows} points AVANT {pc_to} (en %) ou APRES {pc_from} (en %). <- pc_ comme 'pourcent'
            - Par défaut, les 10_000 derniers points. pc_to=100 signifie la fin du fichier, pc_to=50 le milieu. """

        self.custom_ta()
        if not (pc_from is None or pc_to is None):
            nb_rows = None
        if isinstance(table, int):
            df = self._df_renko(table, pc_from, pc_to, nb_rows)
        elif isinstance(table, str):
            df = self._df_candle(table, pc_from, pc_to, nb_rows)
        else:                       # elif table is None:
            if nb_rows is None:
                raise SystemExit("Ticks : Restriction à cause du trop grand nombre de points."
                                 "\nSélectionner pc_from OU pc_to (pas les deux), et nb_rows.")
            pc_pos = pc_from if isinstance(pc_from, (int, float)) else pc_to if isinstance(pc_to, (int, float)) else 50
            nb_rows = -nb_rows if pc_pos == pc_to else nb_rows
            df = self._df_tick(pc_pos, nb_rows)
        return df

    def _df_tick(self, pc_pos=50, nb_rows=50_000):
        df = self.slice_tick(pc_pos=pc_pos, nb_rows=nb_rows)

        """ Uniformisation des colonnes. """
        df['Open'] = (df['Bid'] + df['Ask']) / (2 * self.pips)
        df['High'] = (df['Bid'] + df['Ask']) / (2 * self.pips)
        df['Low'] = (df['Bid'] + df['Ask']) / (2 * self.pips)
        df['Close'] = (df['Bid'] + df['Ask']) / (2 * self.pips)
        df['Volume'] = 1
        df['Spread'] = (df['Ask'] - df['Bid']) / self.pips

        return df[['timestamp', 'Open', 'High', 'Low', 'Close', 'Volume', 'Spread']]  # Tri des colonnes.

    def _df_renko(self, table, pc_from, pc_to, nb_rows):
        """
        @param table: Maille 1, 2, 3, 4, 5, 7, 10, 14 ou 20.
        @param pc_from: En % sur toute l'étendue des données.
        @param pc_to: En % sur toute l'étendue des données.
        @param nb_rows: nb_enr = abs(nb_rows). Si > 0, nb_enr depuis pc_from, si < 0, nb_enr jusqu'à pc_to.
        @return: NA
        """
        try:
            df = self.slice_renko(table, pc_from=pc_from, pc_to=pc_to, nb_rows=nb_rows)
            if df.empty:
                raise SystemExit(f"La table {self.instrument} de maille {table} est vide.")
            df['Volume'] = df['Volume'].fillna(0)   # Remplace les nan par des zéros.

            """ Uniformisation Candles-Renko : Colonne 'Renko' renommée en 'Close'. """
            df = df.rename(columns={'Renko': 'Close'})
            df['Open'] = df['Close'].shift(1)
            df['High'] = df[['Close', 'Open']].max(axis=1) + table
            df['Low'] = df[['Close', 'Open']].min(axis=1) - table
            return df[['timestamp', 'Open', 'High', 'Low', 'Close', 'Volume', 'Spread']]  # Tri des colonnes.
        except (ValueError,) as err:
            self.ut.printc(str(err))
            return pd.DataFrame()

    def _df_candle(self, table, pc_from, pc_to, nb_rows):
        """
        @param table: 'm1', 'm5', 'm15', 'm30', 'h1', 'h4', 'day', 'week'
        @param pc_from:
        @param pc_to:
        @param nb_rows: nb_enr = abs(nb_rows). Si > 0, nb_enr depuis pc_from, si < 0, nb_enr jusqu'à pc_to.
        @return: NA
        """
        try:
            df = self.slice_candle(table, pc_from=pc_from, pc_to=pc_to, nb_rows=nb_rows)
            if df.empty:
                raise SystemExit(f"La table {self.instrument} de période {table} est vide.")
            df['Volume'] = df['Volume'].fillna(0)

            """ Uniformisation Candles-Renko : Ajout de la colonne 'Close', moyenne de 'BidClose' et 'AskClose'. """
            df['Open'] = (df['BidOpen'] + df['AskOpen']) / (2 * self.pips)
            df['High'] = (df['BidHigh'] + df['AskHigh']) / (2 * self.pips)
            df['Low'] = (df['BidLow'] + df['AskLow']) / (2 * self.pips)
            df['Close'] = (df['BidClose'] + df['AskClose']) / (2 * self.pips)
            df['Spread'] = (df['BidClose'] - df['AskClose']) / self.pips
            """ Filtrage et tri des colonnes. """
            return df[['timestamp', 'Open', 'High', 'Low', 'Close', 'Volume', 'Spread']]
        except (ValueError,) as err:
            self.ut.printc(str(err))
            return pd.DataFrame()


def main():  # MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP *****
    """ Code de MAP (mise au point). Espace expérimental : ajouter, supprimer, commenter, décommenter, ... """
    h = CtrlHistos('EUR/JPY')
    # h = CtrlHistos('USD/JPY')
    # h = CtrlHistos('GBP/USD')

    # """ Candles. """
    # h.download_histos(100, b_ticks=False)
    # h.synchro_db_csv(b_ticks=False)
    # h.verify_weeks(b_ticks=False)

    """ Ticks. """
    h.download_histos(5, b_ticks=True)
    h.synchro_db_csv(b_ticks=True)
    h.verify_weeks(b_ticks=True)


if __name__ == '__main__':
    main()

 

""" Version 2022-03-05 """
# Imports externes.
import numpy as np
import seaborn as sns
from matplotlib import pyplot as plt
from matplotlib.ticker import FormatStrFormatter
import matplotlib.animation
import copy

# Imports internes.
from functions.utils import Dictionary
from trading.historiques.ctrl_histos import CtrlHistos


# noinspection PyUnusedLocal
class ShowGeek:
    def __init__(self):  # , **args):
        super().__init__()      # Appelle Genetic.__init__(), 2ème classe héritée de la classe dérivée.
        self.fig = plt.figure(1)
        self.df_pilot = None
        self.df_scats = None
        self.nb_datas = 0
        self.l_ax = None
        self.pips = 0

        """ Animation. """
        self.b_anim = True
        self.abscissa_size = 0
        self.pointer = 0
        self.b_paused = False
        self.b_reverse = False
        self.magn = 1  # Appui sur les flèches : Le pas d'avancement est : 1, 10 ou 100.
        self.od = None

        """ Signal pilote. """
        self.od = Dictionary(self.set_heights())
        self.get_df_pilot()

    def get_args(self):
        args = self.central_args('ui')
        return dict(  # Ces valeurs seront à la racine du super-dictionnaire self.od.
            geometry=args.read('geometry', (100, 40, 1000, 700)),  # x, y, w, h.
            margins=args.read('margins', (6, 8, 10, 8)),  # Marges : haut, droite, bas, gauche.
            abscissa_size=args.read('abscissa_size', 600),  # Nb de points affichés en abscisse.
            window_title=args.read('window_title', "Modèle simple"),  # Titre de la fenêtre.
            figure_title=args.read('figure_title', "Modèle simple - Ne pas modifier"),  # Titre des graphiques.
            leader_lines=args.read('leader_lines', True),  # Lignes hortogonales de repère, suivi de la souris.
            subplots=args.read('subplots', dict(  # Noms et hauteurs (en %) des graphiques modifiables.
                Principal=65,  # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
                Milieu='',  # '' : Si chaîne vide => Les hauteurs seront automatiquement calculées et réparties.
                Bas='',
            )),
            show_pilot=args.read('show_pilot', True),
            show_volume=args.read('show_volume', False),
            show_linked_label=args.read('show_linked_label', True),
            best_zones=dict(
                show=args.read(['best_zones', 'show'], False),
                gap=args.read(['best_zones', 'gap'], 20),  # Nombre de pips take-profit ou stop-loss.
                bandwidth=args.read(['best_zones', 'bandwidth'], 80),  # Pourcentage : de 0 à 100.
                up=args.read(['best_zones', 'up'], True),  # Affichage des palliers haut.
                down=args.read(['best_zones', 'down'], True),  # Affichage des palliers bas.
                scatters=args.read(['best_zones', 'scatters'], True),  # Affichage des optimums sous forme de ronds.
                confirm=args.read(['best_zones', 'confirm'], True),
                zig_zag=args.read(['best_zones', 'zig_zag'], True),  # Affichage de la courbe zig-zag.
                colors=args.read(['best_zones', 'colors'], ('#ffff0030', '#ff00ff10')),  # Coloriage des ouvertures.
            ),
            animation=args.get('animation', True),
            # Paramètres généraux supplémentaires ici ...
            # Paramètres seaborn : https://www.python-simple.com/python-seaborn/seaborn-general.php
        )

    def show_ui(self):
        """ ****************** Algorithme de construction ****************** """
        """ Ajout des indicateurs dans la dataframe {df_pilot} <-- Colonnes ajoutées à {df_pilot}. """
        self.add_indics()

        """ Ajout du signal pilote. """
        self.add_pilot()

        """ Paramètres avant la création des graphiques (axes). """
        self.pre_params()

        """ Distribution des signaux (des colonnes) : un dataframe par axe dans {self.l_ax}. """
        self.distrib()

        """ Création des graphiques (axes). """
        self.build_axis()

        """ Paramètres après la création des graphiques (axes). """
        self.post_params()

        """ Affichage animé. """
        self.show()

    def animate(self, _=''):
        """ Appelé dans la boucle matplotlib.FuncAnimation depuis self.show(). """
        def get_line(_line_name):
            _from, _to = x[0], x[0] + len(x)
            return self.df_pilot[_line_name][_from: _to]

        if self.b_paused:
            return

        """ Gestion du pointeur. """
        self.set_pointer()

        """ Abscisse commune : liste de {abscissa_size} valeurs depuis {pointer}. """
        x = list(range(self.pointer, self.pointer + self.abscissa_size))

        """ Parcours des graphiques (axes). """
        for axis_name in self.od.keys():
            if axis_name == 'figure':
                continue
            o_ax = self.od.read([axis_name, 'o_ax'])
            if o_ax is None:
                continue
            df = self.od.read([axis_name, 'df'])
            y_min, y_max = 10 ** 9, -10 ** 9
            if not (df is None or df.empty):
                """ Parcours des courbes (lines). """
                for o_line in self.get_axis_lines(axis_name):
                    """ Plusieurs courbes (lines) dans un graphique (o_ax). """
                    line_name = o_line.get_label()
                    serie = df[line_name]
                    y = serie[self.pointer: self.pointer + self.abscissa_size]

                    """ Égalisation des tailles de x et y. """
                    len_min = min(len(x), len(y))
                    x, y = copy.copy(x[:len_min]), copy.copy(y[:len_min])

                    """ Hook pour l'injection de paramètres dans cette courbe. """
                    d_attr = self.hook_line_anim(axis_name, line_name, x, y, o_ax, df, o_line)
                    if d_attr is None:
                        d_attr = {}

                    """ Injection des datas à afficher. """
                    if d_attr.get('visible', True):
                        o_line.set_ydata(y)
                        o_line.set_xdata(x)

                    if not np.isnan(y).all():
                        y_min = min(y_min, np.nanmin(y))
                        y_max = max(y_max, np.nanmax(y))

            """ Calcul des limites des ordonnées (y). """
            if y_max < y_min:
                y_min, y_max = -10, 10
            y_padding = max(0, (y_max - y_min) * 0.05)  # Marges top et bottom dans les axes (5%)
            y_min, y_max = (round(y_min - y_padding, 6), round(y_max + y_padding, 6)) if y_max >= y_min else (-10, 10)
            y_min = max(y_min, -10 ** 9)
            y_max = min(y_max, 10 ** 9)
            """ Limites x et y. """
            o_ax.set_xlim(x[0], x[-1])
            if y_min == y_max:
                y_min = y_max = None
            o_ax.set_ylim(y_min, y_max)
            self.hook_axis_anim(axis_name, x, o_ax, df, y_min, y_max, get_line)      # l_hook = [axis_name, x, o_ax, df]
        if not self.b_anim:
            plt.draw()

    def set_pointer(self):
        if self.b_reverse:
            self.pointer -= self.magn
            if self.pointer < 0:
                self.pointer = self.nb_datas - self.abscissa_size
        else:
            self.pointer += self.magn
            if self.pointer > self.nb_datas - self.abscissa_size:
                self.pointer = 0

    def add_pilot(self):
        """ Affichage conditionnel du signal-pilote. """
        self.od.write(['Principal', 'signals', 'Close', 'visible'], self.od.read(['figure', 'show_pilot'], False))
        self.add_df('Principal', ['Close'])

    def pre_params(self):
        """ Fenêtre : taille, position, titre. """
        mgr = plt.get_current_fig_manager()
        mgr.set_window_title(self.od.read(['figure', 'window_title'], 'Stratégie'))
        win = mgr.window
        win.setGeometry(*self.od.read(['figure', 'geometry']))

        """ Paramètres seaborn : https://www.python-simple.com/python-seaborn/seaborn-general.php """
        sns.set_context(self.od.read(['figure', 'seaborn_context'], 'paper'))  # paper, notebook, talk, poster
        sns.set_style(self.od.read(['figure', 'seaborn_style'], 'darkgrid'))  # white, dark, whitegrid, darkgrid, ticks

        """ Ini attributs. """
        self.abscissa_size = self.od.read(['figure', 'abscissa_size'], 600)
        self.b_anim = self.od.read(['figure', 'animation'], False)

        """ Affichage des best-zones dans le graphique (subplot) contenant le signal 'Close'. """
        subplot_name = self.od.read(['figure', 'best_zones', 'subplot'])
        if self.od.read(['figure', 'best_zones', 'show'], False) and subplot_name:
            """ Appel de l'indicateur de trend {best_zones}. """
            gap = self.od.read(['figure', 'best_zones', 'gap'], 24)
            bandwidth = self.od.read(['figure', 'best_zones', 'bandwidth'], 80)
            self.df_pilot[['Up', 'Down', 'Peak']], self.df_scats = self.df_pilot.ta.best_zones(gap=gap,
                                                                                               bandwidth=bandwidth)
            """ Affichage des palliers haut et bas. """
            l_columns = list()
            if self.od.read(['figure', 'best_zones', 'up'], False):
                l_columns.append('Up')
            if self.od.read(['figure', 'best_zones', 'down'], False):
                l_columns.append('Down')
            self.add_df(subplot_name, l_columns)

    def post_params(self):
        """ Recherche des paramètres dans le dictionnaire. """

        """ Titre des graphiques. """
        axis_title = self.od.read(['figure', 'figure_title'])
        if isinstance(axis_title, str) and len(axis_title) > 0:
            o_ax = self.get_first_axis()
            if self.is_axis(o_ax):
                o_ax.set_title(axis_title)

        """ y axis : position et visibilité des graduations (ticks). """
        for l_keys in self.od.key_list():
            if l_keys[-1] != 'y_ticks':
                continue
            position = self.od.read(l_keys)
            o_ax = self.od.read([l_keys[0], 'o_ax'])
            if not self.is_axis(o_ax):
                continue
            if position is False:
                o_ax.axes.set_yticks([])  # Suppression (ticks + ticklabels + grille).
                pass
            else:
                o_ax.yaxis.set_ticks_position(position)  # left, right

        """ Suppression de la grille. """
        for l_keys in self.od.key_list():
            if l_keys[-1] != 'grid':
                continue
            if self.od.read(l_keys) is False:
                o_ax = self.od.read([l_keys[0], 'o_ax'])
                o_ax.grid(False)  # Suppression de la grille.

        """ x axis : visibilité des graduations (ticks) """
        for l_keys in self.od.key_list():
            if l_keys[-1] != 'x_ticks':
                continue
            o_ax = self.od.read([l_keys[0], 'o_ax'])
            if not self.is_axis(o_ax):
                continue
            if self.od.read(l_keys) is False:
                o_ax.tick_params(axis='x', colors='#fff0')  # '#fff0' = invisible

        """ Légendes. """
        for axis_name in self.get_axis_names():
            o_ax = self.od.read([axis_name, 'o_ax'])
            if self.get_axis_lines(axis_name):
                legend = self.od.read([axis_name, 'legend'])
                pos = ['auto', 'rh', 'lh', 'lb', 'rb', 'rm', 'lm', 'rm', 'cb', 'ch', 'cm']
                if isinstance(legend, bool):
                    o_ax.legend().set_visible(legend)  # Visibilité de la légende.
                elif legend in pos:
                    o_ax.legend(loc=pos.index(legend)).set_title(axis_name)
                else:
                    o_ax.legend().set_title(axis_name)

    def build_axis(self):
        """ Construction de la fenêtre Windows. """
        """ Marges générales et répartition des subplots. """
        l_margins = self.od.read(['figure', 'margins'], (6, 8, 10, 8))  # Marges : haut, droite, bas, gauche.
        x, w = l_margins[3] / 100, 1 - (l_margins[1] + l_margins[3]) / 100
        y_offset, y_height = l_margins[0], 100 - l_margins[0] - l_margins[2]  # Valeurs : 0 à 100.
        ax_top, y = 100, 1 - y_offset / 100
        first_axis, axis_ante = '', ''
        for axis_name, h in self.od.read(['figure', 'subplots']).items():
            o_ax = None
            if isinstance(h, (int, float)):
                if first_axis == '':
                    first_axis = axis_name
                else:
                    self.od.write([axis_ante, 'x_ticks'], False)
                axis_ante = axis_name
                h = h * y_height / 10_000
                y -= h
                o_ax = self.fig.add_axes((x, y, w, h), sharex=self.od.read([first_axis, 'o_ax']))
                self.od.write([axis_name, 'geometry'], (x, y, w, h))
            else:
                """ Graphique jumelé (twined axis). """
                tw = h.split()
                if tw[0] == 'twinned':
                    """ Axes jumelés. """
                    parent_name = tw[-1]
                    parent_axis = self.od.read([parent_name, 'o_ax'])
                    if self.is_axis(parent_axis):
                        o_ax = parent_axis.twinx()
            if self.is_axis(o_ax):
                self.od.write([axis_name, 'o_ax'], o_ax)
                """ Ajout artificiel d'attributs vides. """
                o_ax.filled = list()
                o_ax.note_x = o_ax.annotate('', (0, 0))
                o_ax.note_y = o_ax.annotate('', (0, 0))

                """ Repères : lignes hortogonales sous le curseur de la souris. Voir on_mouse_move()  """
                o_ax.hline = o_ax.axhline(y=-1000, color='k', lw=.2, ls='--')  # -1000 = Souris hors figure.
                o_ax.vline = o_ax.axvline(x=-1000, color='k', lw=.2, ls='--')

                """ Format des graduations y. """
                o_ax.yaxis.set_major_formatter(FormatStrFormatter('%.1f'))

                """ Initialisation des courbes (DataFrames) dans les graphiques (axes). """
                df = self.od.read([axis_name, 'df'])
                if axis_name == self.od.read(['figure', 'best_zones', 'subplot']) and self.is_df(self.df_scats):
                    if self.od.read(['figure', 'best_zones', 'scatters'], False):
                        sns.scatterplot(data=self.df_scats, x='indx', y='Close', hue=axis_name, ax=o_ax)
                    if self.od.read(['figure', 'best_zones', 'confirm'], False):
                        sns.scatterplot(data=self.df_scats, x='confirm_x', y='confirm_y', ax=o_ax, hue='c_type',
                                        alpha=.3)
                    if self.od.read(['figure', 'best_zones', 'zig_zag'], False):
                        sns.lineplot(data=self.df_scats, x='indx', y='Close', ax=o_ax, lw=.5)
                    o_ax.set_ylabel('')

                """ Lines. """
                if self.is_df(df) and len(df.columns) > 0:
                    sns.lineplot(data=df[:1], ax=o_ax)

        """ Position (left, right) des graduations y. """
        b_odd = False
        l_axes = self.get_axis_names()
        if self.od.read(['Volume', 'twined_axis']) == 'Principal':
            b_odd = bool(l_axes.index('Principal') % 2)  # Position impaire de 'Principal'.
        for indx, axis_name in enumerate(l_axes):
            self.od.write([axis_name, 'y_ticks'], 'left' if b_odd == indx % 2 else 'right')

        """ Coloriage vertical conditionnel des zônes d'entrée en position (ouverture de trade). """
        b_zones = self.od.read(['figure', 'best_zones', 'show'], False)
        colors = self.od.read(['figure', 'best_zones', 'colors'])
        if b_zones and colors is not None:
            peaks = self.df_pilot['Peak']
            fc_b, fc_t = colors
            l_args = [
                dict(x=peaks.index, y1=10 ** 5, y2=-10 ** 5, where=peaks <= -1, fc=fc_b, ec='#fff0', interpolate=True),
                dict(x=peaks.index, y1=10 ** 5, y2=-10 ** 5, where=peaks >= 1, fc=fc_t, ec='#fff0', interpolate=True),
            ]
            for ax_name in l_axes:
                oax = self.o_ax(ax_name)
                if self.is_axis(oax):
                    self.fill_between(oax, l_args)

    def set_heights(self):
        """ Calcul des hauteurs (en %) subplots. La somme fait 100%. """
        args = self.get_args()

        if not isinstance(args['subplots'], dict):
            raise SystemExit("Super-dictionnaire : La clé 'subplots' doit exister et doit contenir un dictionnaire. ")
        l_heights, sum_heights, nb_zeros = list(), 0, 0
        for name, pc_height in args['subplots'].items():
            height = pc_height if (isinstance(pc_height, int) and pc_height > 0) else 0
            nb_zeros += 1 if height == 0 else 0
            sum_heights += height
            l_heights.append((name, height))

        if sum_heights > 100:
            """ La somme des hauteurs dépasse 100% : on effectue une réduction proportionnelle. """
            l_heights = [(name, pc_height) for (name, pc_height) in l_heights if pc_height > 0]  # Suppression des 0.
            l_heights = [(name, pc_height * 100 / sum_heights) for (name, pc_height) in l_heights]  # Normalisation.
        else:
            """ Si la somme == 100%, on ne fait rien. Si elle est < 100%, on ajoute un subplot, nommé 'rest'. """
            if nb_zeros == 0:
                l_heights.append(('rest', 0))
                nb_zeros = 1
            h_rest = (100 - sum_heights) / nb_zeros
            if h_rest == 0:
                l_heights = [(name, pc_height) for (name, pc_height) in l_heights if pc_height > 0]  # Suppression des 0
            for i, (name, pc_height) in enumerate(l_heights):
                if pc_height == 0:
                    l_heights[i] = (name, h_rest)
        args['subplots'] = dict(l_heights)

        """ Ajout d'arguments par défaut. """
        if 'seaborn_context' not in args:  # Valeurs possibles : paper, notebook, talk, poster
            args['seaborn_context'] = 'paper'
        if 'seaborn_style' not in args:  # Valeurs possibles : white, dark, whitegrid, darkgrid, ticks
            args['seaborn_style'] = 'darkgrid'

        return {'figure': args}

    def add_df(self, subplot_name, l_columns, df=None):
        """ Création du dataframe qui sera associé à un graphique.
                - Méthode appelée par la méthode distrib() de la classe dérivée.
                - Le DataFrame {df} a plusieurs colonnes, chacune associée à une courbe. """

        """ Vérification. """
        if self.od.read(['figure', 'subplots', subplot_name]) is None:
            raise SystemExit(f"Erreur dans self.distrib() : Le subplot {subplot_name} n'existe pas.")

        l_cols = list()
        for column in l_columns:
            """ Suppression des colonnes inexistantes. """
            try:
                self.df_pilot[column] if df is None else df[column]
            except KeyError:
                """ Si la colonne n'existe pas ... """
                print(f"La colonne '{column}' n'existe pas dans le DataFrame.")
                continue
            l_cols.append(column)

        """ Fusion par ajout de colonnes dans le df existant. """
        df = self.df_pilot[l_cols] if df is None else df[l_cols]
        df_exist = self.od.read([subplot_name, 'df'])
        if df_exist is not None:
            df_exist[df.columns] = df
            df = df_exist  # On retrouve les colonnes de df_exist en premières places.

        if not self.is_df(df):
            return

        self.od.write([subplot_name, 'df'], df)

        for column in df.columns:
            if column.lower() == 'close':
                """ Si ce dataframe a une colonne 'Close', on paramètre 3 traitements distincts par défaut :
                        - Une étiquette-suiveuse (linked_label).
                        - Un graphique jumelé (twined_axis) contenant l'affichage du volume en semi-transparence.
                        - Éléments du graphique jumelé non affichés : y_ticks, y_ticks_labels, grille, légende. """
                self.od.write(['figure', 'best_zones', 'subplot'], subplot_name)
                if self.od.read(['figure', 'show_linked_label'], False):
                    self.od.write([subplot_name, 'linked_label'], 'Close')
                if self.od.read(['figure', 'table_name'], None) is None:
                    self.od.write(['figure', 'show_volume'], False)
                if self.od.read(['figure', 'show_volume'], False):
                    self.od.write(['figure', 'subplots', 'Volume'], f'twinned with {subplot_name}')
                    self.od.write(['Volume', 'df'], self.df_pilot[['Volume']])
                    self.od.write(['Volume', 'signals', 'Volume', 'visible'], False)
                    self.od.write(['Volume', 'twined_axis'], subplot_name)
                    self.od.write(['Volume', 'y_ticks'], False),  # Suppression (ticks + ticklabels + grille).
                    self.od.write(['Volume', 'legend'], False)  # Suppression de la légende.

    def get_df_pilot(self):
        od_pilot = self.central_args('pilot')
        instrument = od_pilot.read('instrument', 'EUR/USD')
        self.pips = .01 if instrument.endswith('JPY') else .0001
        table_name = od_pilot.read('table', 10)
        self.df_pilot = CtrlHistos(instrument).get_pilot(**od_pilot)  # DataFrame
        self.nb_datas = self.df_pilot.shape[0]
        self.od.write(['figure', 'instrument'], instrument)
        self.od.write(['figure', 'table_name'], table_name)
        self.od.write(['figure', 'nb_points'], self.nb_datas)

    def linked_label(self, axis_name, x, o_ax, df):
        """ https://matplotlib.org/stable/tutorials/text/annotations.html#annotating-with-text-with-box """
        linked_label = self.od.read([axis_name, 'linked_label'])
        if linked_label:
            bbox = {'boxstyle': 'larrow', 'fc': '#ff02', 'ec': 'k', 'lw': .5}  # fc=face color, ec=edge color
            indx = min(x[-1], df.shape[0] - 1)
            last_value = round(df[linked_label][indx], 2)  # En pips.
            """ Affichage de l'étiquette actuelle (m = marge). """
            o_ax.note_y.remove()  # Effacement de l'étiquette précédente.
            m = self.abscissa_size / 70
            o_ax.note_y = o_ax.annotate(last_value, (x[-1], last_value), xytext=(x[-1] + m, last_value), bbox=bbox)

    def paint_volume(self, x, o_ax, df):
        y = df['Volume'][x[0]: x[-1] + 1] * .8  # Affichage du volume à 80% d'amplitude.
        l_args = [
            dict(x=x, y1=y, y2=0, fc='#0079a333', ec='#fff0')
        ]
        self.fill_between(o_ax, l_args)

    def fill_between(self, o_ax, l_params):
        """ Colorie entre 2 valeurs, conditionnellement.
        @param o_ax: Axis matplotlib : graphique contenant des courbes.
        @param l_params: Liste de coloriages. Chaque élément est un dictionnaire personnalisé. Voir lien ci-dessous.
        https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.fill_between.html """

        """ Effacement du coloriage antérieur, nécessaire si les couleurs ont une transparence. """
        if not self.is_axis(o_ax):
            raise SystemExit(f"Erreur dans self.fill_between() : L'objet o_ax ({o_ax}) n'est pas un axis.")

        for fill in o_ax.filled:
            o_ax.collections.remove(fill)
        o_ax.filled.clear()

        """ l_fill contient autant de paramètres que de coloriages à effectuer. """
        for p in l_params:
            """ Égalisation des vecteurs. """
            if p.get('x') is None:
                p['x'] = range(self.df_pilot.shape[0])
            len_min = len(p['x'])
            if self.is_df(p['y1']):
                len_min = min(len_min, len(p['y1']))
            if self.is_df(p['y2']):
                len_min = min(len_min, len(p['y2']))
            p['x'] = p['x'][:len_min]

            """ Coloriage. """
            o_ax.filled.append(o_ax.fill_between(**p))

    """ *********************************** Helpers. ************************************ """

    def trace_hline(self, axis_name, y, **kwargs):
        o_ax = self.o_ax(axis_name)
        if self.is_axis(o_ax):
            o_ax.axhline(xmin=0, xmax=1, y=y, **kwargs)

    def trace_vline(self, axis_name, x, **kwargs):
        o_ax = self.o_ax(axis_name)
        if self.is_axis(o_ax):
            o_ax.axvline(ymin=0, ymax=1, x=x, **kwargs)

    @staticmethod
    def is_axis(o_ax):
        return o_ax.__class__.__name__ == 'Axes'

    @staticmethod
    def is_df(df):
        return df.__class__.__name__ in ['DataFrame', 'Series']

    def get_axis_names(self, b_twin=False):
        if b_twin:
            """ Liste de noms de tous les graphiques (subplots) de base et twined_axis. """
            return [ax_name for ax_name in list(self.od.read(['figure', 'subplots']).keys()) if ax_name != 'rest']
        else:
            """ Liste de noms de tous les graphiques (subplots) de base. """
            return [ax_name for (ax_name, val) in list(self.od.read(['figure', 'subplots']).items())
                    if ax_name != 'rest' and not str(val).startswith('twinned')]

    def get_first_axis(self):
        l_axes = self.get_axis_names()
        o_ax = self.o_ax(l_axes[0])
        if self.is_axis(o_ax):
            return o_ax

    def get_last_axis(self):
        l_axes = self.get_axis_names()
        o_ax = self.o_ax(l_axes[-1])
        if self.is_axis(o_ax):
            return o_ax

    def _get_all_ax(self):
        l_ax = list()
        for key, val in self.od.read(['figure', 'subplots']).items():
            l_ax.append(self.od.read([key, 'o_ax']))
        return l_ax

    def get_axis_lines(self, axis_name):
        """ Retourne une liste d'objets o_line. """
        o_ax = self.o_ax(axis_name)
        l_olines = list()
        for o_line in o_ax.lines:
            """ Plusieurs courbes (lines) dans un graphique (o_ax). """
            line_name = o_line.get_label()
            if line_name.startswith('_line'):
                continue
            l_olines.append(o_line)
        return l_olines

    def o_ax(self, axis_name):
        return self.od.read([axis_name, 'o_ax'])

    def df(self, axis_name):
        return self.od.read([axis_name, 'df'])

    def ax_df(self, axis_name):
        o_ax = self.od.read([axis_name, 'o_ax'])
        df = self.od.read([axis_name, 'df'])
        return o_ax, df

    """ ***************************** Méthodes surchargées. ***************************** """
    @staticmethod
    def get_args_ui():
        pass

    @staticmethod
    def central_args(l_keys):
        raise SystemExit("show_geek.py > ShowGeek :\nLa méthode 'central_args()' doit être surchargée.")

    def get_pilot(self):
        raise SystemExit("show_geek.py > ShowGeek :\nLa méthode 'get_pilot()' doit être surchargée.")

    def add_indics(self, d_args=None):
        raise SystemExit("show_geek.py > ShowGeek :\nLa méthode 'add_indics()' doit être surchargée.")

    def distrib(self):
        pass

    """ ***************************** Méthodes à surcharger. **************************** """

    def hook_line_anim(self, axis_name, line_name, x, y, o_ax, df, o_line):
        return self.od.read([axis_name, 'signals', o_line.get_label()])

    def hook_axis_anim(self, axis_name, x, o_ax, df, y_min, y_max, get_line):
        """ Affichage de 2 éléments distincts :
            - Volumes, semi-transparent, sans bordure.
            - Étiquette-suiveuse """

        """ 1 - Affichage des volumes, semi-transparent, sans bordure. """
        if axis_name == 'Volume':
            self.paint_volume(x, o_ax, df)

        """ 2 - Étiquette-suiveuse à droite du graphique. """
        self.linked_label(axis_name, x, o_ax, df)

    """ **************************** Événements et affichage. *************************** """

    def key_event(self, ev):
        def arrows(b_forward):
            b_reverse = self.b_reverse
            self.b_paused, self.b_reverse = False, False if b_forward else True
            self.animate()
            self.b_paused = True
            self.b_reverse = b_reverse
            self.magn = 1

        keycode = ev.key
        if keycode == ' ':
            """ Pause on/off. """
            self.b_paused = not self.b_paused
        elif keycode == 'tab':
            """ Inversion de sens. """
            self.b_reverse = not self.b_reverse
        elif keycode == 'right' or keycode == 'left':
            """ Appui sur touche flèche droite ou flèche gauche. """
            arrows(keycode == 'right')
        elif keycode == 'up' or keycode == 'down':
            """ Appui sur touche flèche haut ou flèche bas. """
            self.magn = 10
            arrows(keycode == 'up')
        elif keycode == 'pageup' or keycode == 'pagedown':
            """ Appui sur touche flèche page-haut ou flèche page-bas. """
            self.magn = 100
            arrows(keycode == 'pageup')

    def on_mouse_move(self, ev):
        """ Le style des lignes est définidans self.build_axis(). """
        for o_ax in self.l_ax:
            o_ax.vline.set_data([ev.xdata, ev.xdata], [0, 1])
            o_ax.hline.set_data(([0, 1], [ev.ydata, ev.ydata]) if o_ax == ev.inaxes else ([0, 0], [0, 0]))

            if ev.xdata is not None and ev.ydata is not None:
                ax = self.get_last_axis()
                ax.note_x.remove()  # Effacement de l'étiquette précédente.
                bbox = {'boxstyle': 'round4', 'fc': '#ff02', 'ec': 'k', 'lw': .5}  # fc=face color, ec=edge color
                ax.note_x = ax.annotate(text=round(ev.xdata), xy=(ev.xdata, ax.viewLim.y0), bbox=bbox)

            if self.pointer <= 1 or not self.b_anim:
                plt.draw()  # Nécessaire si pas d'animation (lorsque self.pointer reste à 1).

    def show(self):
        self.l_ax = self._get_all_ax()
        self.fig.canvas.mpl_connect('key_press_event', self.key_event)
        if self.od.read(['figure', 'leader_lines']):
            self.fig.canvas.mpl_connect('motion_notify_event', self.on_mouse_move)
        if self.b_anim:
            ani = matplotlib.animation.FuncAnimation(self.fig, self.animate, frames=None, interval=.1, repeat=True)
        # |_ La variable {ani} doit exister pour empêcher le garbage collector de supprimer l'animation.
        self.animate()
        plt.show()

 

""" Version 2022-03-05 - Algorithme génétique. https://pypi.org/project/geneticalgorithm2/ v6.5.0 """
# Imports externes
import numpy as np
import pandas as pd

# Imports internes
from functions.utils import DateTime


# noinspection PyUnresolvedReferences, PyArgumentList
class BackTest:
    def __init__(self):
        self.l_columns = None
        self.np_datas = None
        self.nb_rows = 0

        """ Statistiques. """
        self.np_trades = None
        self.close_cond = None      # Fonction : condition de fermeture d'un trade.
        self.df_marks = None        # dataframe
        self.d_cost = None          # dictionary

    def fitness(self, **d_args):
        """ Code appelant : UI d'affichage. """
        """ Élimination des colonnes Open, High, Low, ... ainsi que celles pour le ga (commençant par '_'). """
        self.l_columns = ['Close'] + [col for col in list(self.df_pilot.columns[7:]) if not col.startswith('_')]
        self.np_datas = self.df_pilot[self.l_columns].values
        self.d_cost = d_args.get('cost', {})  # Arguments pour la fonction de coût.
        self.nb_rows = self.np_datas.shape[0]
        self.np_trades = np.zeros((self.nb_rows, 11), dtype=np.float32)
        """    |_ Colonnes de self.np_trades :  0 : gains (>0 ou <0) clôturés, en pips (trades fermés).     cumulé
                                                1 : gains en cours d'évolution, en pips (trades ouverts).   cumulé
                                                2 : gains total (clôturés + en évolution), en pips.         cumulé
                                                3 : gains (>0 ou <0) clôturés, en monnaie (€ ou $).         cumulé
                                                4 : gains en cours d'évolution (€ ou $).                    cumulé
                                                5 : gains total (clôturés + en évolution) (€ ou $).         cumulé
                                                6 : Flag d'ouverture (-1/+1), effacé à la fermeture.
                                                7 : marks d'ouverture (-1/+1), permanent.
                                                8 : indx de fermeture
                                                9 : gain du trade, en pips
                                               10 : gain du trade, en monnaie (€ ou $) """
        indx = 0
        while np.isnan(np.sum(self.np_datas[indx])):
            """ Recherche de l'index de la 1ère ligne où aucune valeur n'est nan. """
            indx += 1
            if indx >= self.nb_rows - 1:
                break
        d_args['indx'] = indx   # <-- Ajout de cette information dans les arguments.

        return self.cost(**d_args)

    def cost(self, **d_params):
        """
        @param d_params: paramètres spécifiques à cette stratégie : stop-loss, take-profit, seuils divers, etc.
        @return: dépend du code appelant -> (Calcul du gain + Marqueurs) ou (Process d'algorithme génétique).
            - Si 'Gain + Marqueurs' : tuple de 2 DataFrames : (df_gains, df_marks).
            - Si 'GA' : -1 * Gain. Le -1 s'explique par le fait que le ga cherche à minimiser le résultat.
        """
        """ Calculs. """
        spread = self.d_cost.get('spread', 2)
        l_open_indx = np.where(self.np_trades[:, 7] != 0)[0]    # Liste des index d'ouverture de trades.

        ll_marks = list()
        balance = self.d_cost.get('capital', 1000)      # Capital initial.
        self.np_trades[0, 3] = balance
        risk = self.d_cost.get('risk', 1)
        for open_indx in l_open_indx:
            l_trade = self.np_trades[open_indx]
            close_indx = int(l_trade[8])

            coef = l_trade[7]       # +1 ou -1      <-- achat ou vente.
            price = self.np_datas[open_indx, 0]
            gains = (self.np_datas[close_indx, 0] - price) * coef - spread
            np_gains = (self.np_datas[open_indx: close_indx, 0] - price) * coef

            """ Currencies (€ ou $) clôturés. """
            curr = gains * balance * risk * self.pips           # Currencies (€ ou $).
            self.np_trades[open_indx, 10] = curr
            self.np_trades[close_indx, 3] += curr
            balance += curr

            """ Currencies (€ ou $) ouverts. """
            self.np_trades[open_indx: close_indx, 4] += np_gains * balance * risk * self.pips

            """ Marqueurs. """
            op = 'achat' if l_trade[7] == 1 else 'vente'
            ll_marks.append([open_indx, price, f'Ouvre {op}'])
            close_indx = int(l_trade[8])
            if close_indx > 0:
                ll_marks.append([close_indx, self.np_datas[close_indx, 0], f'Ferme {op}'])

        self.np_trades[:, 0] = np.nancumsum(self.np_trades[:, 0])
        self.np_trades[:, 2] = self.np_trades[:, 0] + self.np_trades[:, 1]
        self.np_trades[:, 3] = np.cumsum(self.np_trades[:, 3])
        self.np_trades[:, 5] = self.np_trades[:, 3] + self.np_trades[:, 4]

        """ Anti-chevauchement des marqueurs. """
        ll_marks.sort()
        i, indx_ante = 4, 0
        for l_mark in ll_marks:
            if l_mark[0] == indx_ante:
                l_mark[1] += i
            else:
                indx_ante = l_mark[0]

        self.df_marks = pd.DataFrame(data=ll_marks, columns=['Index', 'Close', 'Type'])

        return pd.DataFrame(data=self.np_trades[:, :6],
                            columns=['Close_p', 'Open_p', 'All_p', 'Close_c', 'Open_c', 'All_c'])

    def bt_stats(self):
        """ bt_stats = Statistiques des backtests. """
        dt = DateTime()
        stamp_from = self.df_pilot.iloc[0, 0]
        stamp_to = self.df_pilot.iloc[-1, 0]
        nb_days = max(1, round((stamp_to - stamp_from) / 86_400))
        _from = dt.get_dtstr_from_dtstamp(stamp_from, dt_format='%d/%m/%Y')
        _to = dt.get_dtstr_from_dtstamp(stamp_to, dt_format='%d/%m/%Y')
        capital_ini = round(float(self.np_trades[0, 5]), 2)
        capital_min = round(float(np.min(self.np_trades[:, 5])), 2)
        crashed = '← crashed !' if capital_min < 0 else ''
        capital_max = round(float(np.max(self.np_trades[:, 5])), 2)
        capital_final = round(float(self.np_trades[-1, 5]), 2)
        perf = round(100 * (capital_final - capital_ini) / capital_ini, 2)
        nb_buy = np.sum(self.np_trades[:, 7] == 1)
        nb_sell = np.sum(self.np_trades[:, 7] == -1)
        nb_winner = np.sum(self.np_trades[:, 9] > 0)
        nb_loser = np.sum(self.np_trades[:, 9] < 0)
        mask = self.np_trades[:, 10] != 0
        l_trades = self.np_trades[:, 10][mask]
        win_consec = lose_consec = win_consec_max = lose_consec_max = 0
        score_ante = l_trades[0]
        score_win = score_lose = 0
        for score in l_trades:
            if score > 0:
                score_win += score
            else:
                score_lose += score
            if score > 0 and score_ante > 0:
                win_consec += 1
                win_consec_max = max(win_consec_max, win_consec)
            elif score < 0 and score_ante < 0:
                lose_consec += 1
                lose_consec_max = max(lose_consec_max, lose_consec)
            else:
                win_consec = lose_consec = 0
            score_ante = score
        if nb_winner > 0:
            score_win /= nb_winner
        if nb_loser > 0:
            score_lose /= nb_loser

        pips_max_indx = np.argmax(self.np_trades[:, 9])
        pips_max = round(float(self.np_trades[pips_max_indx, 9]), 2)
        stamp = self.df_pilot.iloc[pips_max_indx, 0]
        pips_max_dt = dt.get_dtstr_from_dtstamp(stamp, dt_format='%d/%m/%Y')
        pips_max_indexes = f'index de {pips_max_indx} à {int(self.np_trades[pips_max_indx, 8])}'

        pips_min_indx = np.argmin(self.np_trades[:, 9])
        pips_min = round(float(self.np_trades[pips_min_indx, 9]), 2)
        stamp = self.df_pilot.iloc[pips_min_indx, 0]
        pips_min_dt = dt.get_dtstr_from_dtstamp(stamp, dt_format='%d/%m/%Y')
        pips_min_indexes = f'index de {pips_min_indx} à {int(self.np_trades[pips_min_indx, 8])}'

        curr_max_indx = np.argmax(self.np_trades[:, 10])
        curr_max = round(float(self.np_trades[curr_max_indx, 10]), 2)
        stamp = self.df_pilot.iloc[curr_max_indx, 0]
        curr_max_dt = dt.get_dtstr_from_dtstamp(stamp, dt_format='%d/%m/%Y')
        curr_max_indexes = f'index de {curr_max_indx} à {int(self.np_trades[curr_max_indx, 8])}'

        curr_min_indx = np.argmin(self.np_trades[:, 10])
        curr_min = round(float(self.np_trades[curr_min_indx, 10]), 2)
        stamp = self.df_pilot.iloc[curr_min_indx, 0]
        curr_min_dt = dt.get_dtstr_from_dtstamp(stamp, dt_format='%d/%m/%Y')
        curr_min_indexes = f'index de {curr_min_indx} à {int(self.np_trades[curr_min_indx, 8])}'

        g_max = -10**8
        drawdown = 0
        curr_dwn_indx = 0
        for i, g in enumerate(self.np_trades[:, 2]):
            g_max = max(g_max, g)
            if (g_max - g) > drawdown:
                curr_dwn_indx = i
                drawdown = g_max - g
        stamp = self.df_pilot.iloc[curr_dwn_indx, 0]
        curr_dwn_dt = dt.get_dtstr_from_dtstamp(stamp, dt_format='%d/%m/%Y')

        """ Contexte. """
        total_pips = round(float(np.sum(self.np_trades[:, 9])), 2)
        od_pilot = self.central_args('pilot')
        instrument = od_pilot['instrument']
        table = od_pilot.read('table')
        pc_from = od_pilot.read('pc_from')
        pc_to = od_pilot.read('pc_to')
        nb_rows = self.df_pilot.shape[0]
        typ = f'Renko {table}' if isinstance(table, int) else f'Candles-{table}' if isinstance(table, str) else 'Ticks'
        if pc_from is None:
            points = f"{nb_rows} points jusqu'à {pc_to} % de la table en base de données."
        elif pc_to is None:
            points = f"{nb_rows} points à partir de {pc_from} % de la table en base de données."
        else:
            points = f"Tous les points entre {pc_from} % et {pc_to} % de la table en base de données."

        result = "Statistiques :" \
                 "\n=============" \
                 f"\nContexte :                      {instrument} - {typ} - {points}" \
                 f"\nPériode :                       {_from} -> {_to}" \
                 f"\nNb jours:                       {nb_days}" \
                 f"\nGain journalier:                {round((capital_final - capital_ini) / nb_days, 2)} €." \
                 f"\n\nCapital initial:                {capital_ini} €" \
                 f"\nCapital min:                    {capital_min} € {crashed}" \
                 f"\nCapital max:                    {capital_max} €" \
                 f"\nCapital final:                  {capital_final} €" \
                 f"\nPerformance:                    {perf} %" \
                 f"\nGain en pips:                   {total_pips} pips" \
                 f"\n\nNb trades à l'achat:            {nb_buy}" \
                 f"\nNb trades à la vente:           {nb_sell}" \
                 f"\nNb trades gagnants:             {nb_winner}" \
                 f"\nNb trades perdants:             {nb_loser}" \
                 f"\nNb max gagnants consécutifs:    {win_consec_max}" \
                 f"\nNb max perdants consécutifs:    {lose_consec_max}" \
                 f"\n\nGain moyen trades gagnants:     {round(score_win, 2)} €" \
                 f"\nGain moyen trades perdants:     {round(score_lose, 2)} €" \
                 f"\nMeilleur trade (pips):          {pips_max} pips, le {pips_max_dt} ({pips_max_indexes})" \
                 f"\nMeilleur trade (monnaie):       {curr_max} €, le {curr_max_dt} ({curr_max_indexes})" \
                 f"\nPire trade (pips):              {pips_min} pips, le {pips_min_dt} ({pips_min_indexes})" \
                 f"\nPire trade (monnaie):           {curr_min} €, le {curr_min_dt} ({curr_min_indexes})" \
                 f"\nPerte max (drawdown):           {round(drawdown)} pips le {curr_dwn_dt} (index {curr_dwn_indx})" \
                 "\n-------------------------------------------------------------------------------"
        print(result)

    def closures(self, close_indx):
        """ Fermetures, options possibles :
            - Modifier le code pour permettre des fermetures partielles.
                    |_ https://www.google.fr/search?q=trading+fermetures+partielles
            - Modifier le code dans {close_cond()} de la classe dérivée pour permettre les stops suiveurs.
                    |_ https://www.google.fr/search?q=trading+stop+suiveur
        """
        spread = self.d_cost.get('spread', 2)
        b_end = close_indx >= self.nb_rows - 1                            # Flag de fermeture de tous les résiduels.
        for open_indx in np.where(self.np_trades[:, 6] != 0)[0]:    # Liste des indx des trades ouverts (flags col 6).
            """ Liste des indx trades ouverts. Le flag6 (col 6) est obligatoirement +1 ou -1. """
            if b_end or self.close_cond(open_indx):          # Fonction dans la classe dérivée.
                self.np_trades[open_indx, 6] = 0                    # RAZ flag.
                self.np_trades[open_indx, 8] = close_indx                 # Index de fermeture

                coef = self.np_trades[open_indx, 7]
                price = self.np_datas[open_indx, 0]
                gains = (self.np_datas[close_indx, 0] - price) * coef - spread
                np_gains = (self.np_datas[open_indx: close_indx, 0] - price) * coef

                self.np_trades[open_indx, 9] = gains                    # 9 : gain du trade, en pips
                self.np_trades[close_indx, 0] += gains                  # 0 : gains (>0 ou <0) clôturés, en pips.
                self.np_trades[open_indx: close_indx, 1] += np_gains    # 1 : gains en cours d'évolution, en pips.

    def opening(self, op, indx):
        """ 2 flags d'ouverture (-1/+1) : col 6 -> effacé à la fermeture, col 7 -> permanent. """
        self.np_trades[indx, 6: 8] = 1 if op == 'buy' else -1   # 1='buy', -1='sell'.
        """ Voir colonnes de self.np_trades dans self.fitness() """

    def cond(self, txt, indx):
        """ Exemple : txt = 'AO-DRV cu 0' signifie : L'indicateur AO-DRV croise vers le haut (cross-up) la valeur 0.
        Opérateurs disponibles : /, \\, --, v, ^, cu, cd, ==, !=, >, >=, <, <= """

        def iscolumn(column_name):
            if self.txt_is_number(column_name):
                return True
            if column_name in self.l_columns:
                return True
            print(f"Classe GeneticAlgorithm, méthode 'cond()' :\nLa colonne '{column_name}' n'existe pas.")
            return False

        def col(col_name):
            return self.l_columns.index(col_name)

        l_c = txt.split()  # Séparateur = ' ' (espace).
        left = l_c[0]  # str
        op = l_c[1]  # str
        if not iscolumn(left):
            return False
        if len(l_c) == 2:
            """ /, \\, --, v, ^ """
            col = col(left)
            if op in ['/', '\\', '--'] and indx > 0:
                """ Pente positive (/), négative (\\) ou nulle(--). """
                sign_1, sign_0 = self.np_datas[indx - 1, col], self.np_datas[indx, col]
                return sign_0 > sign_1 if op == '/' else sign_0 == sign_1 if op == '--' else sign_0 < sign_1
            elif op in ['v', '^'] and indx > 1:
                """ Les 3 points précédents forment un creux (v) ou un sommet (^). """
                sign_2, sign_1, sign_0 = (self.np_datas[indx - 2, col], self.np_datas[indx - 1, col],
                                          self.np_datas[indx, col])
                return sign_1 < sign_2 and sign_1 < sign_0 if op == 'v' else sign_1 > sign_2 and sign_1 > sign_0
            pass
        elif len(l_c) == 3:
            """ cu, cd, ==, !=, >, >=, <, <= """
            right = l_c[2]
            if not iscolumn(right):
                return False
            if op in ['cu', 'cd']:
                """ Cross up & cross down : Croisements à la hausse ou à la baisse. """
                col_moved = left if self.txt_is_number(left) else col(left)  # Valeur en str ou N° col.
                col_ref = right if self.txt_is_number(right) else col(right)
                way = 'up' if op == 'cu' else 'down'
                return self.cross(way, indx, col_moved, col_ref)
            elif op in ['==', '!=', '>', '>=', '<', '<=']:
                """ Comparaisons. """
                left = float(left) if self.txt_is_number(left) else self.np_datas[indx, col(left)]
                right = float(right) if self.txt_is_number(right) else self.np_datas[indx, col(right)]
                if op == '==':
                    return left == right
                elif op == '!=':
                    return left != right
                elif op == '>':
                    return left > right
                elif op == '>=':
                    return left >= right
                elif op == '<':
                    return left < right
                else:    # '<='
                    return left <= right
        return False

    @staticmethod
    def txt_is_number(text):
        """ {text} est une chaîne ET représente un nombre (int ou float). """
        if isinstance(text, str):
            try:
                float(text)
                return True  # Est une chaîne ET représente un nombre => True.
            except (Exception,):
                return False  # Est une chaîne mais ne représente pas un nombre => False.
        else:
            return False  # N'est pas une chaîne => False.

    def cross(self, way, indx, col_moved, col_ref):
        if self.txt_is_number(col_ref):
            ref_now = ref_ante = float(col_ref)
        else:
            ref_now, ref_ante = self.np_datas[indx, col_ref], self.np_datas[indx - 1, col_ref]

        if self.txt_is_number(col_moved):
            mov_now = mov_ante = int(col_moved)
        else:
            mov_now, mov_ante = self.np_datas[indx, col_moved], self.np_datas[indx - 1, col_moved]

        if ref_now == mov_now:
            return False
        i = 2
        while ref_ante == mov_ante:
            ref_ante = float(col_ref) if self.txt_is_number(col_ref) else self.np_datas[indx - i, col_ref]
            mov_ante = float(col_moved) if self.txt_is_number(col_moved) else self.np_datas[indx - i, col_moved]
            i += 1

        b_cross = (ref_now - mov_now) * (ref_ante - mov_ante) < 0
        return (mov_now > ref_now) == (way == 'up') if b_cross else False

    """ ***************************** Méthodes surchargées ****************************** """
    @staticmethod
    def central_args(l_keys):
        raise SystemExit("genetic.py > GeneticAlgorithm :\nLa méthode 'central_args()' doit être surchargée.")

    def add_indics(self, od_args=None):
        raise SystemExit("genetic.py > GeneticAlgorithm :\nLa méthode 'add_indics()' doit être surchargée.")

 

""" Version 2022-03-05 ---------------------------- SUIVRE CES ÉTAPES ----------------------------

--- Imagination de la stratégie - Théorie :
===========================================
1 - Simple texte explicatif de quelques lignes, en langage courant (non informatique).
2 - Algorithme au format texte.
3 - Méthodes d'optimisation.
4 - Liste des documents dans le même dossier : diagrammes, images, fichiers tableurs, liens internet, etc.
5 - Liste des indicateurs utilisés : existant ou inventés.

--- Construction de la stratégie - Pratique :
=======================================================================================
1 - Titres : fenêtre et graphiques -> Compléter window_title et figure_title (dans central_args()).
2 - Choix du signal pilote : compléter self.central_args('pilot'). Choisir peu de points pour la MAP du code (ex : 1000)
3 - Ajouter indicateurs : compléter {self.add_indics()} en même temps que {self.central_args('indics')}.
4 - Afficher indicateurs : compléter {self.distrib()}.
5 - Activer ou désactiver l'affichage des points d'intérêt sur les graphiques, dans {self.post_params()} :
        |_ Un scatterplot est déjà proposé pour l'axe 'Principal', mais tout est modifiable.
6 - Méthode self.cost() :
    a - Coder la condition d'ouverture à l'achat.
        |_ Lancer l'appli : vérifier l'affichage des points d'ouverture et l'affichage des statistiques.
    b - Coder la condition de fermeture dans la fonction close_cond().
        |_  Lancer l'appli : vérifier également l'affichage des points de fermeture des achats.
    c - Terminer en codant les ventes : conditions d'ouverture et de fermeture.
7 - Résultats obtenus : affichage des statistiques.

pandas_ta -> https://github.com/twopirllc/pandas-ta
"""
# Imports externes
# import pandas_ta as ta  # Utilisé pour lire la doc : help(pdt.macd) Ne pas supprimer.
import seaborn as sns

# Imports internes
from functions.utils import Dictionary
from show.show_geek import ShowGeek
from trading.strategies.backtest import BackTest


# noinspection PyTypeChecker
class Geek(ShowGeek, BackTest):
    def __init__(self):
        super(Geek, self).__init__()  # GeneticAlgorithm.__init__() est appelé dans ShowGeek.

        """ Mode 'normal', selon votre imagination : Création, exécution et affichage de la stratégie. """
        self.show_ui()

    @staticmethod
    def central_args(l_keys):
        od_args = Dictionary(dict(
            ui=dict(                    # Ces valeurs seront à la racine du super-dictionnaire self.od.
                geometry=(100, 40, 1200, 600),  # x, y, w, h. Par défaut : (100, 40, 1200, 700).
                abscissa_size=600,      # Nb de points affichés en abscisse. Par défaut : 600.
                window_title="Stratégie xxxxxxxxxxxxxxxxxx",        # Titre de la fenêtre.
                figure_title="Mise au point du modèle 'strategy'",      # Titre des graphiques.
                subplots=dict(          # Noms et hauteurs (en %) des graphiques modifiables. Supprimer / Ajouter.
                    Principal=40,       # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
                    Gains='',
                    Pips='',             # Si chaîne vide => Les hauteurs seront automatiquement calculées et réparties.
                ),
            ),
            pilot=dict(                 # Dans cet exemple : les 10 000 derniers points de EUR/USD en Renko10.
                instrument='EUR/USD',   # EURUSD, USDJPY, EURCHF, USDCAD, NZDUSD, EURGBP, EURJPY, GBPJPY, GBPCHF, ...
                table=5,                # int (ex : 10) = Renko, str (ex: 'H1') = Candles, None = Ticks.
                # pc_from=0,            # en % : les valeurs après {pc_from}.
                pc_to=100,              # en % dans la table-db. Les valeurs avant {pc_to}.
                nb_rows=20_000,         # Nb points étudiés. Non utilisé si les 2 existent ({pc_from} ET {pc_to}).
            ),
            indics=dict(
                ma1=dict(length=170),                               # Indicateur moyenne mobile 1.
                ma2=dict(length=273.7),                             # Indicateur moyenne mobile 2.
                cost=dict(capital=1000, risk=20, spread=1.1)        # Params suppl. pour la méthode cost().
            ),
        ))
        return Dictionary(od_args.read(l_keys, {}))

    def add_indics(self, od_args=None):
        """ Indicateurs techniques en D:/anaconda/envs/robot/Lib/site-packages/pandas_ta <-- Ne pas les modifier !!!
            Doc indicateurs : https://github.com/twopirllc/pandas-ta#indicators-by-category
            Génération de signaux dérivés : ajout de colonnes au dataframe {df_pilot}. """

        """ Voir la documentation d'un indicateur. """
        # help(ta.trix)    # <-- Affiche la doc (Décommenter la ligne import pandas_ta as ta).

        """ Signaux pour le calcul des gains, les marqueurs et l'affichage. """
        od_args = self.central_args('indics')
        # Moyennes à tester : sma, ema, dema, tema, hma, wma, alma, kama, zlma, rma, median, ...
        self.df_pilot['MA1'] = self.df_pilot.ta.zlma(**od_args['ma1'])
        self.df_pilot['MA2'] = self.df_pilot.ta.zlma(**od_args['ma2'])

        """ Gains en pips et en monnaie (€ ou $). """
        l_gains = ['Balance (pips)', 'Open (pips)', 'Equity (pips)', 'Balance (€)', 'Open (€)', 'Equity (€)']
        self.df_pilot[l_gains] = self.fitness(**od_args)

        """ Option : Affiche les statistiques du backtest."""
        self.bt_stats()

        """ Option : Signaux uniquement pour l'affichage. """
        # self.df_pilot['xxxxx'] = self.get_indic(self.df_pilot.ta.{indic}, **od_args['xxx'])  # <-- Exemple

    def distrib(self):
        """ Méthode optionnelle : Répartition des signaux entre les différents graphiques (axes). """
        """ - Les noms des graphiques doivent correspondre avec ceux déclarés dans central_args(['ui', 'subplots']).
            - Ceux des signaux (indicateurs, ...) doivent correspondre avec ceux déclarés dans add_indics(). """
        self.add_df('Principal', ['MA1', 'MA2'])
        self.add_df('Gains', ['Balance (€)', 'Equity (€)'])
        self.add_df('Pips', ['Balance (pips)', 'Equity (pips)'])

    def pre_params(self):
        """ Méthode optionnelle. A supprimer ou à commenter s'il n'y a pas de surcharge.
        Surcharge, insertion (avant super()) ou ajout (après super()) au code pre_params() de la classe-parent. """
        super().pre_params()

        """ Légendes épinglées. Écriture dans le super-dictionnaire -> ex : cm = Center-Middle. """
        self.od.write(['Principal', 'legend'], 'ch')  # 'rh', 'lh', 'lb', 'rb', 'rm', 'lm', 'rm', 'cb', 'ch', 'cm'
        self.od.write(['Milieu', 'legend'], 'cm')

        """ Vérification (supprimer ou commenter après vérification). """
        # self.od.print()

    def post_params(self):
        """ Méthode optionnelle. A supprimer ou à commenter si la surcharge est inutile.
        Surcharge, insertion (avant super()) ou ajout (après super()) au code post_params() de la classe-parent. """

        """ Affichage des marqueurs. """
        if self.df_marks is not None:
            o_ax = self.o_ax('Principal')
            sns.scatterplot(data=self.df_marks, x='Index', y='Close', ax=o_ax, hue='Type', size='Type',
                            sizes=(30, 30), zorder=4)

        """ Tracé de lignes - https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.axhline.html """
        # self.trace_hline('Milieu', 0, lw=.4, ls='-.', c='b')  # hline = ligne horizontale (vline = verticale).
        # self.trace_hline('Bas', 0, lw=.4, ls='-.', c='g')

        """ Coloriage statique. Exemple : Si le graphique 'Milieu' supporte un RSI, 2 bandes (30 à 40 et 60 à 70). """
        # o_ax = self.o_ax('Milieu')
        # if self.is_axis(o_ax):
        #     l_args = [
        #         dict(y1=30, y2=40, fc='#0aa1', ec='#f002'),
        #         dict(y1=60, y2=70, fc='#0aa1', ec='#f002'),
        #     ]
        #     self.fill_between(o_ax, l_args)

        super().post_params()  # Code avant -> inséré, code après -> ajouté.

    def hook_axis_anim(self, axis_name, x, o_ax, df, y_min, y_max, get_line):
        """ Méthode optionnelle. A supprimer ou à commenter si la surcharge est inutile.
        Surcharge, insertion ou ajout au code hook_axis_anim() de la classe-parent. """
        super().hook_axis_anim(axis_name, x, o_ax, df, y_min, y_max, get_line)

        """ Coloriage dynamique : https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.fill_between.html """
        if axis_name == 'Principal':
            ma1 = get_line('MA1')   # get_line = fonction passée en paramètre.
            ma2 = get_line('MA2')
            l_args = [
                dict(x=x, y1=ma1, y2=ma2, where=ma1 > ma2, fc='#0aa2', ec='#f004', interpolate=True),
                dict(x=x, y1=ma1, y2=ma2, where=ma1 < ma2, fc='#a0a2', ec='#0f04', interpolate=True),
            ]
            self.fill_between(o_ax, l_args)

    def hook_line_anim(self, axis_name, line_name, x, y, o_ax, df, o_line):
        """ Méthode optionnelle. A supprimer ou à commenter si la surcharge est inutile.
        Surcharge ou ajout au code hook_line_anim() de la classe-parent. """
        hook = super().hook_line_anim(axis_name, line_name, x, y, o_ax, df, o_line)     # hook = Valeur de retour.

        """ - Masque la partie droite des graph. pour permettre de voir certaines prédictions (ex: nuage ichimoku).
            - Masquage conditionnel : ajoutez des conditions avec axis_name, line_name, ... """
        # y.loc[x[-100]:] = np.nan        # La valeur 100 est un exemple, la remplacer par une variable.

        return hook

    def cost(self, **d_params):
        """ Méthode obligatoire : coding de l'algorithme des ouvertures et fermetures.
        Datas disponibles : {self.np_datas} <-- les colonnes correspondent à {self.l_columns}. """

        """ Fonctions internes obligatoires. """
        def c(txt):
            """ txt : texte de la condition. Opérateurs disponibles : /, \\, --, v, ^, cu, cd, ==, !=, >, >=, <, <= """
            return self.cond(txt, indx)

        def open_cond(op):
            self.opening(op, indx)

        def close_cond(indx_open):
            """ Fermeture conditionnelle du trade ouvert à {indx_open}. """
            op = self.np_trades[indx_open, 6]   # 1 = achat, -1=vente.
            return c('MA1 cu MA2') if op == 1 else c('MA1 cd MA2')
        self.close_cond = close_cond

        """ Paramètres de la stratégie : self.d_cost. Commencent par 'p_'. """
        # ...

        # print(self.l_columns)     # Colonnes de np_datas. A commenter ou a supprimer.
        """ Colonnes de np_datas = ['Close', 'MA1', 'MA2'] <--  Liste fournie par le print ci-dessus. """
        indx_min = d_params['indx']
        for indx in range(indx_min, self.nb_rows):
            """ Fermetures. """
            self.closures(indx)

            """ Ouverture achat (ob = Open Buy). """
            if c('MA1 cd MA2'):
                open_cond('buy')

            """ Ouverture vente (os = Open Sell). """
            if c('MA1 cu MA2'):
                open_cond('sell')

        return super().cost(**d_params)


if __name__ == '__main__':
    Geek()

Créez vos propres modèles et placez-les dans ce dossier \models\


2 - Test du modèle :

Retour au plan

Statistiques :
=============
Contexte :                      EUR/USD - Renko 5 - 20000 points jusqu'à 100 % de la table en base de données.
Période :                       28/10/2020 -> 10/12/2021
Nb jours:                       408
Gain journalier:                25.49 €.

Capital initial:                1000.0 €
Capital min:                    699.16 € 
Capital max:                    11679.06 €
Capital final:                  11401.09 €
Performance:                    1040.11 %
Gain en pips:                   1697.5 pips

Nb trades à l'achat:            88
Nb trades à la vente:           87
Nb trades gagnants:             118
Nb trades perdants:             57
Nb max gagnants consécutifs:    7
Nb max perdants consécutifs:    4

Gain moyen trades gagnants:     231.23 €
Gain moyen trades perdants:     -296.22 €
Meilleur trade (pips):          113.9 pips, le 16/12/2020 (index de 3405 à 3464)
Meilleur trade (monnaie):       1339.89 €, le 30/11/2021 (index de 19275 à 19352)
Pire trade (pips):              -181.1 pips, le 31/03/2021 (index de 9438 à 9773)
Pire trade (monnaie):           -1299.13 €, le 16/06/2021 (index de 12726 à 12927)
Perte max (drawdown):           344 pips le 05/03/2021 (index 8094)
-------------------------------------------------------------------------------

3 - Exemple 1 : double RSI.

Retour au plan

\trading\strategies\partie_1_voir\Strategie_01_double_RSI\main_01.py :

"""
- Utiliser les consignes du modèle 'voir_model.py' pour détailler ici toutes les explications nécessaires :
- Croquis, liens internet, bibliographie, vidéos, etc.
"""
# Imports externes
# import pandas_ta as ta  # Utilisé seulement pour lire la doc : help(pdt.macd)
import seaborn as sns

# Imports internes
from functions.utils import Dictionary
from show.show_geek import ShowGeek
from trading.strategies.backtest import BackTest


# noinspection PyTypeChecker
class Geek(ShowGeek, BackTest):
    def __init__(self):
        super(Geek, self).__init__()  # GeneticAlgorithm.__init__() est appelé dans ShowGeek.

        """ Mode 'normal', selon votre imagination : Création, exécution et affichage de la stratégie. """
        self.show_ui()

    @staticmethod
    def central_args(l_keys):
        od_args = Dictionary(dict(
            ui=dict(  # Ces valeurs seront à la racine du super-dictionnaire self.od.
                geometry=(400, 40, 1200, 600),  # x, y, w, h. Par défaut : (100, 40, 1000, 700).
                abscissa_size=600,  # Nb de points affichés en abscisse. Par défaut : 600.
                window_title="Stratégie 01",  # Titre de la fenêtre.
                figure_title="Double RSI",  # Titre des graphiques.
                subplots=dict(  # Noms et hauteurs (en %) des graphiques modifiables. Supprimer / Ajouter.
                    Principal=40,  # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
                    Milieu='',
                    Milieu2='',
                    Gains='',  # Si chaîne vide => Les hauteurs seront automatiquement calculées et réparties.
                ),
            ),
            pilot=dict(  # Dans cet exemple : les 10 000 derniers points de EUR/USD en Renko10.
                instrument='EUR/USD',  # EURUSD, USDJPY, EURCHF, USDCAD, NZDUSD, EURGBP, EURJPY, GBPJPY, GBPCHF, ...
                table=10,  # int (ex : 10) = Renko, str (ex: 'H1') = Candles, None = Ticks.
                # pc_from=0,          # en % : les valeurs après {pc_from}.
                pc_to=100,  # en % dans la table-db. Les valeurs avant {pc_to}.
                nb_rows=20_000,  # Nb points étudiés. Non utilisé si {pc_from} ET {pc_to} existent.
            ),
            indics=dict(
                rsi_l=dict(length=50),
                rsi_s=dict(length=4),
                cost=dict(tp=50, sl=30, lim_rsi=40, capital=1000, risk=4, spread=1.1)
            ),
        ))
        return Dictionary(od_args.read(l_keys, {}))

    def add_indics(self, d_args=None):
        """ Indicateurs utilisés dans cette stratégie. Les arguments sont dans d_args_ga, section 'Indicateurs'. """
        """ Indicateurs en D:/anaconda/envs/robot/Lib/site-packages/pandas_ta <-- Ne pas les modifier !!!
            Doc indicateurs : https://github.com/twopirllc/pandas-ta#indicators-by-category
            Génération de signaux dérivés : ajout de colonnes au dataframe {df_pilot}. """
        # help(ta.rsi)    # <-- Affiche la doc.

        """ Signaux pour le calcul des gains, les marqueurs et l'affichage. """
        od_args = self.central_args('indics')
        self.df_pilot['RSI_L'] = self.df_pilot.ta.rsi(**od_args['rsi_l'])
        self.df_pilot['RSI_S'] = self.df_pilot.ta.rsi(**od_args['rsi_s'])

        """ Gains en pips et en monnaie (€ ou $). """
        l_gains = ['Balance (pips)', 'Open (pips)', 'Equity (pips)', 'Balance (€)', 'Open (€)', 'Equity (€)']
        self.df_pilot[l_gains] = self.fitness(**od_args)

        """ Affiche les statistiques du backtest."""
        self.bt_stats()

        """ Signaux seulement pour l'affichage. """
        # self.df_pilot['xxxxx'] = self.get_indic(self.df_pilot.ta.{indic}, **d_args['xxx'])  # <-- Exemple

    def distrib(self):
        """ Répartition des signaux entre les différents graphiques (axes). """
        """ - Les noms des graphiques doivent correspondre avec ceux déclarés dans central_args(['ui', 'subplots']).
            - Ceux des signaux (indicateurs, ...) doivent correspondre avec ceux déclarés dans add_indics(). """
        self.add_df('Milieu', ['RSI_L'])
        self.add_df('Milieu2', ['RSI_S'])
        self.add_df('Gains', ['Balance (€)', 'Equity (€)'])

    def pre_params(self):
        """ Surcharge, insertion (avant super()) ou ajout (après super()) au code pre_params() de la classe-parent. """
        super().pre_params()

        """ Exemple : écriture dans le super-dictionnaire -> ici, légende épinglée au milieu (cm = Center-Middle) """
        self.od.write(['Principal', 'legend'], 'ch')  # 'rh', 'lh', 'lb', 'rb', 'rm', 'lm', 'rm', 'cb', 'ch', 'cm'
        self.od.write(['Milieu', 'legend'], 'cm')  # 'rh', 'lh', 'lb', 'rb', 'rm', 'lm', 'rm', 'cb', 'ch', 'cm'

    def post_params(self):
        """ Surcharge, insertion (avant super()) ou ajout (après super()) au code pre_params() de la classe-parent. """

        """ Affichage des marqueurs. """
        if self.df_marks is not None:
            o_ax = self.o_ax('Principal')
            sns.scatterplot(data=self.df_marks, x='Index', y='Close', ax=o_ax, hue='Type', size='Type',
                            sizes=(50, 50), zorder=4)

        """ Exemple de tracé de ligne - https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.axhline.html """
        self.trace_hline('Milieu', 50, lw=.4, ls='-.', c='r')

        """ Coloriage statique. Le graphique 'Milieu2' supporte un RSI avec une bande de y1 à y2. """
        o_ax = self.o_ax('Milieu2')
        if self.is_axis(o_ax):
            y1 = self.d_cost['lim_rsi']
            if y1 is not None:
                y2 = 100 - y1
                l_args = [
                    dict(y1=y1, y2=y2, fc='#0aa3', ec='#f003'),
                ]
                self.fill_between(o_ax, l_args)

        super().post_params()  # Code avant -> inséré, code après -> ajouté.

    def cost(self, **d_params):
        def c(txt):
            return self.cond(txt, indx)

        def open_cond(op):
            self.opening(op, indx)

        def close_cond(indx_open):
            """ Fermeture conditionnelle du trade ouvert à {indx_open}.
                Technique classique du take-profit / stop-loss.
            """
            op = self.np_trades[indx_open, 6]
            price = self.np_datas[indx, 0]
            open_price = self.np_datas[indx_open, 0]
            if op == 1:
                """ A été ouvert à l'achat. """
                return (price > open_price + p_tp) or (price < open_price - p_sl)
            else:
                """ A été ouvert à l'achat. """
                return (price < open_price - p_tp) or (price > open_price + p_sl)
        self.close_cond = close_cond

        """ Paramètres de la stratégie : self.d_cost. Commencent par 'p_'. """
        p_rsi_l = self.d_cost['lim_rsi']
        p_rsi_h = 100 - p_rsi_l
        p_tp = self.d_cost['tp']
        p_sl = self.d_cost['sl']

        indx = 0
        # print(self.l_columns)     # Colonnes de np_datas. A commenter ou a supprimer.
        """ Colonnes de np_datas = ['Close', 'RSI_L', 'RSI_S'] """
        indx_min = d_params['indx']
        for indx in range(indx_min, self.nb_rows):
            """ Fermetures. """
            self.closures(indx)

            """ Ouverture achat (ob = Open Buy). """
            if c('RSI_L > 50') and c(f'RSI_S cu {p_rsi_l}'):
                open_cond('buy')

            """ Ouverture vente (os = Open Sell). """
            if c('RSI_L < 50') and c(f'RSI_S cd {p_rsi_h}'):
                open_cond('sell')

        """ Fin. """
        return super().cost(**d_params)


if __name__ == '__main__':
    g = Geek()

 

Statistiques :
=============
Contexte :                      EUR/USD - Renko 10 - 20000 points jusqu'à 100 % de la table en base de données.
Période :                       11/04/2018 -> 10/12/2021
Nb jours:                       1339
Gain journalier:                3.2 €.

Capital initial:                1000.0 €
Capital min:                    912.04 € 
Capital max:                    7431.08 €
Capital final:                  5283.19 €
Performance:                    428.32 %
Gain en pips:                   5124.0 pips

Nb trades à l'achat:            976
Nb trades à la vente:           984
Nb trades gagnants:             857
Nb trades perdants:             1103
Nb max gagnants consécutifs:    22
Nb max perdants consécutifs:    21

Gain moyen trades gagnants:     83.26 €
Gain moyen trades perdants:     -60.81 €
Meilleur trade (pips):          58.9 pips, le 18/04/2018 (index de 81 à 129)
Meilleur trade (monnaie):       161.73 €, le 07/09/2020 (index de 14338 à 14364)
Pire trade (pips):              -51.1 pips, le 10/11/2020 (index de 15446 à 15514)
Pire trade (monnaie):           -115.51 €, le 08/09/2020 (index de 14348 à 14412)
Perte max (drawdown):           2660 pips le 18/03/2020 (index 9669)
-------------------------------------------------------------------------------


4 - Exemple 2 : Simple moyenne mobile.

Retour au plan

Notez les variations régulières sur les pips (arithmétiques), et exponentielles sur les gains (géométrique).
En effet, un nombre de pips  correspond à une fraction du capital (risque). Si ce dernier augmente, la valeur en €uros des mises (constante en pips), augmente.


5 - Exemple 3 : Affichage d'un indicateur avec datas dans le futur.

Retour au plan

L'instant présent se situe à peu près à 716, le nuage va au delà, et le signal chikou_span se termine à peu près à 690.

 


6 - A vous de jouer :

Retour au plan


Bon coding et bon courage !


Bonjour les codeurs !