Calculs & affichages / Historiques /
Le node générateur d'historiques

Serveur d'historiques réels


Avant-propos

Ce tuto termine le chapitre 'Historiques'


Description

Plan du tuto :
  1. Préparation.
  2. Rappel de l'algorithme de fonctionnement.
  3. Application au node Histos.
  4. Affichage de courbes.

 


1 - Préparation :


Pour la mise au point, seul un signal Renko sera utilisé, exemple : Maille 5.
 


 


La modification de l'abscisse entraine quelques conséquences.

☐ Fichier complet /show/show_matplotlib.py :

# Imports externes.
from PyQt5.QtCore import QTimer
import psutil
import pickle
import numpy as np
import sys
import os
from matplotlib import pyplot as plt, rcParams
from matplotlib.widgets import Slider
# sys.path.append(os.path.dirname(sys.path[0]))     # Pour MAP depuis le terminal.

# Imports internes.
from functions.utils import Utils, Dictionary, DateTime
from nodes.afficheurs.plots.matplotlib_yaml import YamlParams
from pc.ctrl_node import CtrlCalcul


# noinspection PyUnresolvedReferences
class ShowMatPlotLib(CtrlCalcul):
    def __init__(self):
        super().__init__()

        """ Ini: récupération des arguments passés en ligne de commande. """
        self.graph_name = sys.argv[1]
        self.id = int(sys.argv[2])
        self.parent_pid = int(sys.argv[3])
        self.final_path = ''
        self.ut = None

        """ Watch. """
        self.l_files_ante = list()

        """ Matplotlib. """
        self.fig = None
        self.l_ax = [None for _ in range(8)]  # Liste de 8 valeurs (les axes, ou graphiques : 1 par entrée)
        self.b_busy = False
        self.b_paused = False
        self.b_reverse = False

        """ Graphiques. """
        self.l_inputs = list()
        self.width_x = None
        self.pointer = None

        """ Timers. """
        self.dt = DateTime()
        self.listen = QTimer()
        self.anim = QTimer()

        """ Setup. """
        self.setup()

    def setup(self):
        """ 1 - Chemin complet du node final 'Plots'. Ce chemin contient les modèles de calculs 'calc_*.pkl. """
        bk_path = os.path.dirname(__file__).replace('show', 'backups')  # Dossier de backup.
        self.final_path = os.path.normpath(os.path.join(bk_path, self.graph_name, f'node{self.id}'))  # 'Plots'
        self.ut = Utils(path=self.final_path, file='matplotlib')

        """ 2 - Aide pour la mise au point : styles et paramètres diponibles dans matplotlib. Décommenter pour voir. """
        # [print(style) for style in plt.style.available]   # Liste des styles disponibles.
        # Dictionary(rcParams).print()                      # Liste des paramètres disponibles.

        """ 3 - Fichier de configuration, éditable manuellement en yaml. """
        self.o_yaml = YamlParams(self)
        self.o_yaml.yaml_file = os.path.normpath(f'{self.final_path}/{self.id}-matplotlib.yaml')
        self.o_yaml.update_odyaml()
        self.o_yaml.set_rcparams(rcParams)
        plt.style.use(self.o_yaml.get_style())

        """ 4 - Vitesse d'animation (Modifiable : touches + et -), pause, reverse, pas à pas. """
        self.anim.indx = 5      # <-- Vitesse moyenne (de 0 à 11)
        speed = self.speed(self.anim.indx)
        self.anim.delay = speed[0]
        self.anim.step = speed[1]

        """ 5 - Initialisation de matplotlib.pyplot. """
        self.fig = plt.figure()
        self.mgr.set_window_title('**ini**')

        """ Slider 'Largeur'. https://matplotlib.org/stable/api/widgets_api.html#matplotlib.widgets.Slider """
        slide_width = plt.axes([.12, .06, .77, .02])    # Taille et position du slider 'Largeur'.
        slide_width.semilogx()
        self.width_x = Slider(slide_width, 'Largeur', 10, 10_000, 316)
        #      |_ 316 est la moyenne géométrique de 10 et 10_000. 10**1=10, 10**2.5=316 (au milieu), 10**4=10_000

        slide_posit = plt.axes([.12, .02, .77, .02])  # Taille et position du slider 'Position'.
        self.pointer = Slider(ax=slide_posit, label='Position', valmin=0,
                              valmax=self.len_buffer - self.width_x.val - 2, valfmt='%0.0f')
        self.pointer.set_val(-1)

        """ 6 - Restauration de l'état de la fenêtre. """
        self.ut.restore_state(self.mgr.window)

        """ 7 - Surveillance des fichiers '.pkl'. """
        self.ini_watch()

        """ 8 - Événements. """
        self.listen.timeout.connect(self.listener)
        self.anim.timeout.connect(self.show_anim)
        self.fig.canvas.mpl_connect('key_press_event', self.key_event)
        self.pointer.on_changed(self.show_anim)
        self.width_x.on_changed(self.set_width_x)

        """ 9 - Lancement des timers. """
        self.listen.start(1000)
        self.anim.start(self.anim.delay)

        """ 10 - Boucle d'exécution Matplotlib. """
        plt.show()

    @property
    def mgr(self):
        return plt.get_current_fig_manager()

    def listener(self):
        """ 1 - Si le poste de contrôle est fermé => fin programme, sauf en mode autonome. """
        pass
        if self.parent_pid > 0 and not psutil.pid_exists(self.parent_pid):
            exit()

        """ 2 - Persistance géométrie. """
        self.ut.save_state(self.mgr.window)

    def set_width_x(self, val):
        self.set_pointer_xlim()
        self.pointer.val += 1 if self.b_reverse else -1
        self.show_anim()

    def show_anim(self, _=0):
        """ Anti ré-entrance. """
        if self.b_busy:
            return
        self.b_busy = True
        """ Pointeur commun. """
        self.pointer.set_val(round(self.pointer.val + (-self.anim.step if self.b_reverse else self.anim.step)))
        if self.pointer.val > self.pointer.valmax - round(self.width_x.val) - 2:  # Bouclage provisoire.
            self.pointer.set_val(0)
        if self.pointer.val < 0:  # Bouclage provisoire.
            self.pointer.set_val(self.pointer.valmax - round(self.width_x.val) - 2)

        """ Scrolling des entrées connectées et actives du node 'Plots' : subplots ou graphiques. """
        for ax in self.l_ax:
            if ax is None or not hasattr(ax, 'o_server') or ax.o_server is None:
                continue

            """ Nombre de points en abscisse.
            Si non précisé dans le yaml, ce subplot utilisera la largeur commune, fixée par le slider 'Largeur'. """
            width_x = round(self.width_x.val)       # valeur du slider.

            """ Avancement individuel du pointeur. """
            if len(ax.lines) > 0:  # Immobile si aucune courbe n'est affichée.
                ax.pointer += -self.anim.step if self.b_reverse else self.anim.step
            if ax.pointer > self.len_buffer - width_x:  # Bouclage provisoire.
                ax.pointer = 0
            if ax.pointer < 0:  # Bouclage provisoire.
                ax.pointer = self.len_buffer - width_x

            """ pointer = Position du 1er point à gauche dans le graphique.
            Si ShareX dans le yaml (True par défaut), ce subplot utilisera le slider 'Position'. """
            pointer = self.pointer.val if ax.od_self_yaml.get('ShareX', True) else ax.pointer

            """ Récupération des données sous forme de matrices numpy. """
            x = np.arange(pointer, pointer + width_x)
            y = ax.o_server.get_matrix(pointer, width_x)
            if y is None:
                continue

            """ Boucle sur les courbes. """
            y_min, y_max = np.inf, -np.inf
            for o_line in ax.lines:
                num_col = int(o_line.od.get('num_col'))
                if not np.isnan(y[:, num_col]).all():
                    y_min = min(y_min, np.nanmin(y[:, num_col]))
                    y_max = max(y_max, np.nanmax(y[:, num_col]))

                """ Dessin d'une courbe. Attribution des valeurs. """
                try:
                    o_line.set_xdata(x)
                    o_line.set_ydata(y[:, num_col])
                except (Exception,):
                    pass

            """ Limites x et y (abscisses et ordonnées). """
            ax.set_xlim(pointer, pointer + width_x - 1)  # Abscisses.
            if y_min != np.inf:
                y_padding = max(0, (y_max - y_min) * 0.05)  # Marges top et bottom du graphique (5%)
                y_min, y_max = y_min - y_padding, y_max + y_padding
                ax.set_ylim(y_min, y_max)  # Ordonnées.

            """ Coloriage inter-courbes. """
            self.o_yaml.between(ax, x, y)

        """ Fin. """
        plt.draw()
        self.b_busy = False

    def ini_watch(self):
        """ Liste des éléments à surveiller : dossier {graphe} + dossiers {nodes} + fichiers .pkl + fichiers .yaml
        - Le dossier est self.final_path, dans les backups.
        - Les fichiers calc_*.pkl : un par entrée active de ce node d'affichage.
        - Les fichiers .yaml : valeurs des clés 'Yaml file' dans les .pkl """

        """ 1 - Dossier du graphe dans les backups :dossier-parent de tous les nodes du graphe. """
        graph_path = os.path.normpath(f"{os.path.dirname(__file__).replace('show', 'backups')}/{self.graph_name}")

        """ 2 - Liste des dossiers des nodes du graphe (qui contiennent des fichiers nécessaires aux calculs) """
        l_folders = [graph_path] + [f'{graph_path}{os.sep}{node}' for node in os.listdir(graph_path) if
                                    str(node).startswith('node')]
        """ 3 - Liste des modèles yaml. """
        l_models = list()
        nodes_dir = os.path.dirname(__file__).replace('show', 'nodes')
        for (folder, subfolder, files) in os.walk(nodes_dir):
            if folder.endswith('models'):
                for file in files:
                    f_include = os.path.join(folder, file)
                    l_models.append(f_include)

        """ 4 - Liste des fichiers calc_*.pkl physiquement présents dans le dossier self.final_path """
        l_watched_files = [os.path.normpath(f'{self.final_path}/{calc}') for calc in
                           os.listdir(self.final_path) if calc.startswith('calc_') or calc.endswith('.conf')]

        """ 5 - Liste des fichiers .yaml : chaque od_calc (extrait du pkl) contient plusieurs clés 'Yaml file'.
                 On en fait une liste sans doublons. """
        s_yaml = set()  # set() au lieu de list() pour s'affranchir des doublons.
        for path, folder, l_files in os.walk(graph_path):
            [s_yaml.add(os.path.normpath(f'{path}/{file}')) for file in l_files if os.path.splitext(file)[1] == '.yaml']

        """ 6 - Concaténation des listes précédentes. """
        l_watched_files += list(s_yaml) + l_models  # Concaténation de 3 listes de fichiers.
        l_files = l_folders + l_watched_files  # Concaténation des listes : dossiers et fichiers.

        """ 7 - Vérification des changements dans cette liste, mais non dans le contenu des fichiers. """
        if self.l_files_ante != l_files:
            self.l_files_ante = self.ut.deepcopy(l_files)  # Mémorisation (copie profonde).
            self.ut.watch_file(l_files, self.watch)  # (Re)lancement de la surveillance.
            self.dt.delay(self.watch, 10, l_watched_files)

    def watch(self, l_files):
        """ l_files : liste contenant les dossiers et fichiers modifiés."""
        files_only = [file for file in l_files if os.path.isfile(file)]     # Filtrage : que les fichiers.
        if len(files_only) == 0:
            """ 1 - l_files ne contient que des dossiers : cela signifie que des fichiers ont été ajoutés ou supprimés.
            Par conséquent on relance la méthode ini_watch() pour mettre à jour la liste des fichiers à surveiller. """
            self.ini_watch()
            return
        if len(files_only) == 1 and files_only[0].endswith('.conf'):
            return

        """ 2 - Lecture de tous les calc_*.pkl (un par entrée) et écriture de leur contenu dans self.od_calcs. """
        CtrlCalcul.od_calcs.clear()  # Attribut de la classe CtrlCalcul.
        l_calcs = [(f'calc_{i}', os.path.normpath(f'{self.final_path}/calc_{i}.pkl')) for i in range(8)]
        for name_input, calc_file in l_calcs:
            """ ex : name_input = 'calc_0' """
            try:
                with open(calc_file, 'rb') as pk:
                    self.od_calcs.write(name_input, pickle.load(pk))
            except (Exception,):
                pass

        """ 3 - Création ou update des paramètres étendus od_yaml de o_yaml. """
        self.o_yaml.update_odyaml()

        """ 4 - La méthode principale update_figure() est chargée des mises à jour des calculs et des affichages. """
        self.update_figure()

    def update_figure(self):
        """ Update suite aux modifications .pkl et .yaml """

        """ 1 - Titre de la fenêtre. """
        local = ' - Mode local' if len(sys.argv) < 6 else ''    # Nombre d'arguments passés en ligne de commande.
        title_ante = self.mgr.get_window_title()
        b_first = title_ante == '**ini**'  # b_first = Premier passage, au lancement de l'appli.
        name_input = list(self.od_calcs.keys())[0]  # Première entrée de 'Plots' connectée.
        title_now = self.od_calcs.read([name_input, f'Node{self.id}', 'Fenêtre', 'Titre'], 'Graphiques') + local
        l_inputs = [key for (key, val) in self.od_calcs.items() if val['edges']]  # Entrées actives seulement.
        if title_now != title_ante:
            self.mgr.set_window_title(title_now)
            """ 1.1 - Si seul le titre de la fenêtre a changé, inutile de continuer (sauf au 1er passage). """
            if l_inputs == self.l_inputs and not b_first:
                return True

        """ 2 - Paramètres étendus yaml. """
        self.o_yaml.set_rcparams(rcParams)

        """ 3 - Subplots : création, update, géométrie. Chaque entrée est associée à un subplot.
        - Celui-ci est créé ou mis à jour en fonction des changements de grille (nb d'entrées ou yaml). """
        if self.o_yaml.grid_modified or l_inputs != self.l_inputs:  # Respecter l'ordre de comparaison !
            self.l_inputs = self.ut.deepcopy(l_inputs)
            d_grids = self.o_yaml.get_dgrids()
            for num_input in range(8):
                self.update_ax(d_grids, num_input)

        """ 4 - Paramètres des subplots (un par entrée active) et de leurs courbes. """
        self.od_mydic.clear()
        pointer_size = self.len_buffer
        for num_input in range(8):  # self.l_inputs = ['calc_0', 'calc_1', ...]
            name_input = f'calc_{num_input}'
            if name_input not in l_inputs:
                self.l_ax[num_input] = None
                continue

            ax = self.l_ax[num_input]  # Copie par référence.

            """ 4.1 - Pour cette entrée : recherche de son serveur et de sa signature. """
            edge = self.get_edge(name_input)  # ex : edge = ((1, 0), (0, 0))
            ax.o_server, signature = self.get_server(edge[0][0])  # edge[0][0] = id du node serveur.
            ax.num_input = num_input
            if ax.o_server is None:
                continue    # Entrée non connectée ou désactivée.

            """ 4.2 - Si nécessaire, calculs et paramètres des courbes. """
            if not hasattr(ax, 'signature') or ax.signature != signature:
                ax.signature = signature
                ax.od_self_yaml = Dictionary(self.o_yaml.od_yaml.read(f'Entrée {num_input}'))
                ax.o_server.calculation()

            """ 4.3 - Paramètres des courbes. """
            self.update_subplot(ax)

            """ 4.4 - Nombre de points fournis par les nodes serveurs. On conserve le min. """
            pointer_size = min(pointer_size, ax.o_server.nb_points)

        """ 5 - Nombre de points en abscisse.
        https://matplotlib.org/stable/api/widgets_api.html#matplotlib.widgets.Slider """
        if self.pointer.valmax != pointer_size - self.width_x.val - 2:
            self.set_pointer_xlim(pointer_size)

        """ 6 - Actualisation du buffer d'affichage nécessaire lorsque il est en pause. """
        self.show_anim()

        """ 7 - Affichage à l'écran du contenu du buffer d'affichage. """
        plt.draw()

    def set_pointer_xlim(self, pointer_size=None):
        self.pointer.valmin, self.pointer.valmax = 0, self.pointer.valmax if pointer_size is None else pointer_size
        self.pointer.ax.set_xlim(self.pointer.valmin, self.pointer.valmax - round(self.width_x.val) - 2)
        self.pointer.val = min(self.pointer.val, self.pointer.valmax)

    def get_edge(self, name_input):  # ex: name_input = 'calc_1'.
        """ Recherche du socket_out emetteur (directement connecté au socket_in 'name_input' de ce node 'Plots'). """
        s_edges = self.od_calcs.read([name_input, 'edges'])
        for edge in s_edges:
            if edge[1][0] == self.id:
                return edge

    def update_subplot(self, ax):
        """ Mise à jour des paramètres de ce subplot (axe), ainsi que de chacune de ses courbes.
        @param ax: objet subplot.
        @return: NA.
        """

        def update_line():
            """ Code d'appel : point 2 un peu plus bas.
                https://matplotlib.org/stable/api/_as_gen/matplotlib.lines.Line2D.html """
            """ 2.1 - Affectation des attributs aux courbes : couleur, épaisseur, style, légende. """
            o_line.od = Dictionary(od_lines.read(o_line.name))
            if not o_line.od:
                return                                                      # 'Kalman2-Normal-Original'
            num_col = ax.o_server.od_descr.read([o_line.name, 'num_col'])   # 'Kalman2-Normal-kalman'
            o_line.od.write('Entrée', f'Entrée {ax.num_input}')
            o_line.od.write('num_col', num_col)
            o_line.set_color(o_line.od.read('Couleur'))
            o_line.set_linewidth(o_line.od.read('Épaisseur'))
            o_line.set_linestyle(self.get_linestyle(o_line.od.read('Style')))
            o_line.set_label(o_line.od.read('Légende'))
            self.o_yaml.line_params(o_line)  # Paramètres étendus pour cette courbe.

            """ 2.2 - Exemple de compilation dynamique au niveau de la courbe 'o_line'. Décommenter si nécessaire. """
            # d_context = {'line': o_line}
            # self.o_yaml.compile([f'Entrée {num_input}', o_line.name, 'Code'], d_context)

        """ 1 - Actualisation de la liste des courbes ax.lines.
              - Si 'Plots' a 1 seule entrée, clé de type ['calc_0', 'Node0'], sinon ['calc_0', 'Node0', 'Entrée 0']. """
        l_mykey = [f'calc_{ax.num_input}', f'Node{self.id}']  # Clé de od_mydic dans od_calcs.
        l_keys = l_mykey + [f'Entrée {ax.num_input}']  # ['calc_0', 'Node0', 'Entrée 0']
        od_lines = Dictionary(self.od_calcs.read(l_keys, self.od_calcs.read(l_mykey)))
        self.update_lines(ax.num_input, od_lines)  # Suppression et ajout de courbes.

        """ 2 - Mise à jour de chacune des courbe. """
        for o_line in ax.lines:  # Parcours des courbes de ce subplot (de cette entrée de node).
            update_line()

        """ 3 - Exemple de compilation dynamique au niveau du subplot 'num_input'. Décommenter si nécessaire. """
        # d_context = {'subplot': ax}
        # self.o_yaml.compile([f'Entrée {num_input}', 'Code'], d_context)

        """ 4 - Paramètres étendus yaml.  """
        self.o_yaml.subplot(self.l_ax, ax.num_input)

    def update_lines(self, num_input, od_plots):
        """ Actualisation de la liste de courbes self.l_ax[num_input].lines[].
        On compare les listes de coubes nécessaires et de courbes existantes.
        On en déduit celles qu'il faut ajouter, celles qu'il faut supprimer. """

        """ 1) Courbes nécessaires : l_needed. """
        l_needed = list()
        for key, val in od_plots.items():
            if isinstance(val, dict) and 'Signal actif' in val and val['Signal actif']:
                l_needed.append(key)

        """ 2) Courbes existantes : l_exist. """
        l_exist = self.line_exist(num_input)

        """ 3) Ajout (création) de courbes. """
        for needed in l_needed:
            if needed not in l_exist:
                o_line, = self.l_ax[num_input].plot(np.empty(0))  # [num_input] = Élément 0 de la liste.
                o_line.name = needed

        """ 4) Suppression de courbes. """
        for exist in l_exist:
            if exist not in l_needed:
                o_line = self.get_line(num_input, exist)
                if o_line is not None:
                    self.l_ax[num_input].lines.remove(o_line)

    def line_exist(self, num_input):
        """ Appelée par update_lines(). """
        l_lines = list()
        for line in self.l_ax[num_input].lines:
            l_lines.append(line.name)
        return l_lines

    def get_line(self, num_input, line_name):
        """ Appelée par update_lines(). """
        for o_line in self.l_ax[num_input].lines:
            if o_line.name == line_name:
                return o_line
        return None

    @staticmethod
    def get_linestyle(param_style):
        if param_style.startswith('___'):
            return '-'
        elif param_style.startswith('- -'):
            return '--'
        elif param_style.startswith('. .'):
            return ':'
        return '-.'

    def update_ax(self, d_grids, num_input):
        """ On vérifie les 8 inputs, connectés ou pas. """
        name_input = f'calc_{num_input}'
        ax = self.l_ax[num_input]

        if name_input not in self.l_inputs:
            """ Vérification des entrées non connectées aussi, afin éviter des affichages indésirables."""
            if ax is not None:
                """ Ce subplot a existé, il n'existe plus. """
                ax.set_visible(False)  # Suppression des graphiques (subplots) fantômes.
                self.l_ax[num_input] = None
            return

        """ Le subplot {ax} contient l'attribut 'pointer' qui disparait après cet update.
            Par conséquent, on le sauvegarde d'abord. """
        if ax is None:
            pointer = 0
        else:
            ax.set_visible(False)  # Suppression des graphiques (subplots) fantômes.
            pointer = ax.pointer

        """ Update du subplot {ax}. add_axes() permet un positionnement absolu, totalement libre. """
        ax = self.fig.add_axes(d_grids[name_input])   # Position absolue.   x, y, w, h. Valeurs entre 0 et 1.

        """ Réintégration de l'attribut 'pointer'. """
        ax.pointer = pointer

        self.l_ax[num_input] = ax

    @staticmethod
    def speed(indx):
        """ Calcul empirique. """
        values = [(3000, 1), (2000, 1), (1200, 1), (640, 1), (480, 1), (320, 1),    # (durée en ms, pas de 1) <-- Lents.
                  (280, 2), (296, 4), (273, 7), (280, 14), (286, 26), (282, 47), (400, 20)]      # <-- Rapides.
        return values[indx]

    def key_event(self, ev):
        """ Appui sur une touche du clavier. """
        keycode = ev.key
        if keycode == ' ':
            """ Pause on/off. """
            self.b_paused = not self.b_paused
            self.anim.stop() if self.b_paused else self.anim.start(int(self.anim.delay))
        elif keycode == '+':
            """ Accélérer. """
            self.anim.stop()
            self.anim.indx += 1
            self.anim.indx = min(self.anim.indx, 11)
            speed = self.speed(self.anim.indx)
            self.anim.delay = speed[0]
            self.anim.step = speed[1]
            self.anim.start(int(self.anim.delay))
        elif keycode == '-':
            """ Ralentir. """
            self.anim.stop()
            self.anim.indx -= 1
            self.anim.indx = max(self.anim.indx, 0)
            speed = self.speed(self.anim.indx)
            self.anim.delay = speed[0]
            self.anim.step = speed[1]
            self.anim.start(int(self.anim.delay))
        elif keycode == 'tab':
            self.b_reverse = not self.b_reverse
        elif keycode == 'right' or keycode == 'left':
            self.b_paused = True
            self.anim.stop()
            step = self.anim.step
            self.anim.step = 1 if self.b_reverse == (keycode == 'left') else -1     # Ou exclusif.
            self.show_anim()
            self.anim.step = step


if __name__ == '__main__':
    """ Ajout d'arguments à la ligne de commande permettant un lancement autonome (pour la MAP). """
    sys.argv.append('Calculs')  # Nom du graphe.
    sys.argv.append('0')  # Plots id. (La ligne de commande ne doit contenir que du str).
    sys.argv.append('-1')  # Gestion de la fermeture automatique. Poste de contrôle PID : -1 en local.
    mpl = ShowMatPlotLib()

La classe Calcul du node Plots devient inutile, vous pouvez la supprimer.


2 - Rappel de l'algorithme de fonctionnement :

Retour au plan


Les méthodes colorées sont spécifiques au node et sont implémentées dans sa classe Calcul.


Les dossiers el les fichiers :


A chaque démarrage de l'application ou à chaque modification des fichiers ou dossiers ci-dessus :

Pour chaque entrée du node Plots :


3 - Application au node Histos :

Retour au plan

# Imports externes
import numpy as np

# Imports internes
from pc.ctrl_node import CtrlNode, CtrlCalcul
from nodes.generateurs.histos.histos_yaml import YamlParams
from trading.historiques.ctrl_histos import CtrlHistos

d_datas = {
    'name': 'Histos',           # Label affiché sous l'icone.
    'icon': 'histos.png',       # Icone affichée.
}


class Node(CtrlNode):
    def __init__(self, o_scene, s_id, pos):
        super().__init__(o_scene, s_id, pos)
        self.type = 'Histos'
        self.setup()

    def setup(self, child_file=__file__):
        super().setup(child_file)

    @property
    def ld_outputs(self):
        return [{
            'label': 'Sortie',
            'label_pos': (-38, -10)
        }]

    def fixed_params(self):
        return {
            'Instrument': ['EUR/USD', {'values': ['EUR/USD', 'USD/JPY', 'EUR/CHF', 'USD/CAD', 'NZD/USD',
                                                  'EUR/GBP', 'EUR/JPY', 'GBP/JPY', 'GBP/CHF', 'GBP/USD']}],
            'Ticks': False,  # à chaque variation du signal.
            'Maille 1': False,  # Renko, maille de 1 pip.
            'Maille 2': False,  # 2 pips.
            'Maille 3': False,  # 3 pips.
            'Maille 4': False,  # 4 pips.
            'Maille 5': True,  # 5 pips.
            'Maille 7': False,  # 7 pips.
            'Maille 10': False,  # 10 pips.
            'Maille 14': False,  # 14 pips.
            'Maille 20': False,  # 20 pips.
            '1 mn': False,  # 1 minute.
            '5 mn': False,  # 5 minutes.
            '15 mn': False,  # 15 minutes.
            '30 mn': False,  # 30 minutes.
            '1 h': False,  # 1 heure.
            '4 h': False,  # 4 heures.
            '1 j': False,  # 1 jour.
            '1 s': False,  # 1 semaine.
            '*   Voir historiques   ': '',      # '*' = bouton.
            '*   Télécharger  ': '',
            '*   Synchroniser bd  ': '',
        }

    def my_signals(self, l_signals_in, num_socket_out):
        l_signals = list()
        for signal_key in self.fixed_params().keys():
            b_signal = self.get_param(signal_key)       # Booléen.
            if isinstance(b_signal, bool) and b_signal:
                typ_id, signal_ante, signal_now, signal_source = f'{self.type}{self.id}', '', signal_key, ''
                l_signals.append((typ_id, signal_ante, signal_now, signal_source))
        return l_signals

    def get_state(self):
        """ Surcharge. """
        if self.b_chk:
            for signal in self.fixed_params().keys():
                if isinstance(self.get_param(signal), bool) and self.get_param(signal, False):
                    return True
        return False

    def histo_data(self, button):
        """ Clic sur un bouton : Voir, télécharger, synchroniser. """
        o_hist = CtrlHistos(self.get_param('Instrument'))  # Objet Contrôleur d'historiques.
        if button.startswith('Voir'):
            """ Bouton 'Voir historiques'. Si Instrument == EUR/USD => Symbole == EURUSD (sans '/') """
            o_hist.verify_weeks(b_ticks=False, b_silent=True)
            print()
            o_hist.verify_weeks(b_ticks=True)
        elif button.startswith('Télécharger'):
            """ Bouton 'Télécharger'. """
            o_hist.download_histos(nb_weeks=1, b_ticks=False)  # Candles.
            o_hist.download_histos(nb_weeks=1, b_ticks=True)

            """ Option : après le téléchargement, on affiche l'état des historiques. """
            print()
            o_hist.verify_weeks(b_ticks=False, b_silent=True)  # Candles.
            print()
            o_hist.verify_weeks(b_ticks=True)  # Ticks.
        elif button.startswith('Synchroniser'):
            """ Bouton 'Synchroniser bd'. """
            o_hist.synchro_db_csv(b_ticks=False)
            o_hist.synchro_db_csv(b_ticks=True)

    def refresh(self, l_keys):
        """ Méthode appelée automatiquement à chaque modification de paramètre dans le dockable. """
        if l_keys[1].startswith('   '):     # Les libellés des boutons commencent par des espaces.
            """ Les 3 boutons pour la base de données. Affichages dans le terminal. """
            self.histo_data(l_keys[1].strip())

        elif isinstance(self.get_param(l_keys[-1]), bool) or l_keys[-1] in ['Instrument', 'Nombre']:
            """ Nodes à updater : Propagation aval, seulement pour les signaux (cases à cocher). """
            self.lo_sockets_out[0].to_update()

            """ Infrastructure : Insertion du mot 'infrastructure' dans l_keys, en 1ère place. """
            l_keys = ['infrastructure'] + l_keys

            super().refresh(l_keys)


class Calcul(CtrlCalcul):
    """ ********** Le code ci-dessous ne concerne pas le poste de contrôle, mais seulement les calculs. ********** """
    def __init__(self):
        super().__init__()
        self.o_yaml = YamlParams(self)
        self.d_stamps = dict()
        self.l_pilot = list()
        self.o_histos = None
        self.b_forcing_calc = True

    def descr_signal(self, od_descr, val, root_key):
        """ - Création du super-dictionnaire de description. Voir docstring dans la classe-mère.
            - key_dock doit avoir EXACTEMENT la même orthographe que la clé affichée
              dans le dockable des paramètres du node CLIENT.
            - Sources EXCLUSIVES : self.od_mydic et self.o_yaml. """
        instrument = self.od_mydic.read('Instrument')
        od_descr.write('changed', True)
        od_descr.write('instrument', instrument)
        for signal_name, val in self.od_mydic.items():
            if isinstance(val, bool) and signal_name != 'Checked':
                key_dock = f'{self.s_id}-{signal_name}'     # Ex : Histos1-Maille 7
                od_descr.write([key_dock, 'notes'], signal_name)
                od_descr.write([key_dock, 'actif'], self.od_mydic.read(signal_name, False))    # True ou False.
        if instrument != self.od_descr.get('instrument'):
            self.d_stamps.clear()

    def pre_process(self):
        """ Voir docstring dans la classe-mère. """
        """ La récupération des paramètres se fait EXCLUSIVEMENT depuis le super-dictionnaire self.od_descr. """
        """ Le tableau numpy (self.np_array) est entièrement rempli, en une seule passe. """
        """ Traitement du pilote (pilote = image du slider 'Position'). """
        """ 1 - Signal le +lent : ts_right = timestamp à droite de la fenêtre lorsque son début est à gauche. """
        """ 2 - Recherche l'index de ts_right dans le plus rapide. """
        """ 3 - ts_left = timestamp à gauche de la fenêtre du signal le +rapide. """
        """ 4 - Calcul de l'offset minimum pour que tous les signaux soient affichés à 100 %. """

        def get_index(_list, _val):
            """ La liste doit être triée (sens croissant). """
            try:
                return _list.index(_val)
            except ValueError:
                for indx, val in enumerate(_list):
                    if val > _val:
                        return indx
            return len(_list) - 1   # Le dernier par défaut

        d_tables = {
            'Ticks': 'Ticks', 'Maille 1': 'r1', 'Maille 2': 'r2', 'Maille 3': 'r3', 'Maille 4': 'r4', 'Maille 5': 'r5',
            'Maille 7': 'r7', 'Maille 10': 'r10', 'Maille 14': 'r14', 'Maille 20': 'r20', '1 mn': 'm1', '5 mn': 'm5',
            '15 mn': 'm15', '30 mn': 'm30', '1 h': 'h1', '4 h': 'h4', '1 j': 'day', '1 s': 'week'
        }
        abscissa_length = 300_000   # Étendue maximum du Slider 'Position'.
        width_x = 316               # Nombre de points affichés = valeur du slider 'Largeur' par défaut.
        instrument = self.od_descr.get('instrument', 'EUR/USD')
        self.o_histos = CtrlHistos(instrument)
        l_stamps, l_tables = list(), list()
        slowest, fastest, len_min, len_max, ts_right = '', '', 10**6, 0, 0
        for l_key in self.od_descr.key_list():
            if l_key[-1] == 'actif' and self.od_descr.read(l_key):
                """ Cette devise est cochée dans le dockable des paramètres. """
                key_dock = l_key[0]
                signal_name = key_dock.split('-')[1]
                table_name = d_tables.get(signal_name)
                l_tables.append((table_name, self.od_descr.read([key_dock, 'num_col'])))
                if signal_name not in self.d_stamps:
                    self.d_stamps[signal_name] = self.o_histos.get_stamps(table_name, abscissa_length)
                len_st = len(self.d_stamps[signal_name])
                if len_st < len_min:
                    """ Recherche du signal le plus lent (le moins dense) <-- dense = nb valeurs/unité de temps. """
                    len_min = len_st
                    ts_right = self.d_stamps[signal_name][min(width_x, len_min)]
                if len_st > len_max:
                    """ Recherche du signal le plus rapide (le plus dense). """
                    len_max = len_st
                    fastest = signal_name
                l_stamps += self.d_stamps[signal_name]

        self.l_pilot = list(set(l_stamps))     # Suppression des doublons.
        self.l_pilot.sort()                    # Tri.
        indx_left = max(0, get_index(self.d_stamps[fastest], ts_right) - width_x)
        stamp_left = self.d_stamps[fastest][indx_left]
        offset = get_index(self.l_pilot, stamp_left)
        self.nb_points = len(self.l_pilot) - offset + 1

        """ Infos nécessaires pour get_matrix(). """
        self.od_descr['offset'] = offset
        self.od_descr['bd_tables'] = l_tables
        self.od_descr['pips'] = self.o_histos.pips

    def get_matrix(self, pointer, nb_lines=0):
        """ Construction du ndarray de retour à partir des ndarray du dictionnaire self.d_datas. """
        _from, _to = pointer, pointer+nb_lines
        np_matrix = np.full((nb_lines, 19), np.nan)     # 1 + 18 colonnes (voir d_tables de pre_process() ci-dessus.)
        """ Signal pilote (celui qui présente le plus de détails). """
        pips = self.od_descr.get('pips')
        offset = self.od_descr.get('offset')

        try:
            final_stamp = self.l_pilot[offset + pointer + nb_lines]
        except (Exception,):
            return np_matrix
        for table_name, num_col in self.od_descr.get('bd_tables'):
            slice_histo = self.o_histos.get_datas(table_name, final_stamp, nb_lines)
            pass
            if table_name == 'Ticks':
                vector = (slice_histo[:, 1] + slice_histo[:, 2]) / (2 * pips)
            elif table_name.startswith('r'):
                vector = slice_histo[:, 1]
            else:
                vector = (slice_histo[:, 4] + slice_histo[:, 8]) / (2 * pips)
            np_matrix[-slice_histo.shape[0]:, num_col] = vector

        return np_matrix

 


Conséquences en cascade 🥵

/nodes/generateur/histos/histos_yaml.py :

# Imports internes.
from nodes.yaml_parent import YamlParent


class YamlParams(YamlParent):
    def __init__(self, o_calc):
        super().__init__(o_calc, __file__)

/nodes/generateur/histos/histos_seeder.yaml : Ce fichier est vide, mais il doit exister.


Oui mais ... une nouvelle version de CtrlHistos est nécessaire pour satisfaire à l'implémentation des calculs dans le node histos :

☐ Fichier /trading/historiques/ctrl_histos.py :

# Imports externes
import datetime
import gzip
import os           # http://www.python-simple.com/python-modules-fichiers/os-path.php (Ctrl + clic)
import requests
from io import BytesIO, StringIO
import pandas as pd
import keyboard     # https://www.delftstack.com/fr/howto/python/python-detect-keypress/

# Imports internes
from functions.utils import DateTime, Utils
from trading.historiques.db_candle import DbCandle
from trading.historiques.db_tick import DbTick


class CtrlHistos:
    def __init__(self, instrument):
        """ https://github.com/fxcm/MarketData
            Tous les symboles :
                AUDCAD, AUDCHF, AUDJPY, AUDNZD, AUDUSD, CADCHF, CADJPY, EURAUD, EURCHF,
                EURGBP, EURJPY, EURNZD, EURUSD, GBPCAD, GBPCHF, GBPJPY, GBPNZD, GBPUSD,
                NZDCAD, NZDCHF, NZDJPY, NZDUSD, USDCAD, USDCHF, USDJPY, USDTRY
            Les plus utilisés (Dans l'ordre d'importance) :
                EURUSD, USDJPY, EURCHF, USDCAD, NZDUSD, EURGBP, EURJPY, GBPJPY, GBPCHF, GBPUSD """
        self.instrument = instrument
        self.symbol = instrument.replace('/', '')
        self.dt = DateTime()
        self.ut = Utils()
        self.kb_break = 'f12'   # 'shift', 'ctrl', ... à votre convenance.
        self.pips = .01 if instrument.endswith('JPY') else .0001
        self.symbol_dir = None
        self.db_candle = None
        self.db_tick = None
        self.setup()

    def setup(self):
        """ Les données peuvent se trouver sur un disque dur SSD autre que celui en cours.
            Le cas échéant, db_path doit être modifié (chemin absolu).
            Il est également possible, pour les besoins des tests, d'avoir plusieurs bases de données.
                |_ Il faudra alors modifier db_path chaque fois que l'on désire switcher. """
        db_path = os.path.dirname(__file__)             # <--- Chemin par défaut, à modifier si autre disque dur.
        self.symbol_dir = os.path.abspath(f'{db_path}/db/{self.symbol}')    # Dossier des fichiers d'historiques.
        os.makedirs(self.symbol_dir, exist_ok=True)
        self.db_candle = DbCandle(self)
        self.db_tick = DbTick(self)

    def _helper(self, key, *l_params):
        """ ************ Collection d'utilitaires. Nombre de paramètres variable. ************ """
        if key == 'nb_weeks_in_year':
            """ Renvoie le nombre de semaines dans l'année {year}. """
            year = l_params[0]   # l_params = [année dont on veut connaître le nombre de semaines].
            if len(l_params) > 1:
                y, num_week, _ = datetime.date.today().isocalendar()
                if y == year:
                    return num_week - 1
            o_dat = self.dt.get_date_from_dtstamp(self.dt.get_dtstamp_from_dtstr(f'{year}-12-28', '%Y-%m-%d'))
            return o_dat.isocalendar()[1]

        elif key == 'l_iso_yw':
            """ Renvoie une liste de tuples ISO (iso_year, iso_str_week) :  <-- exemple de tuple : (2019, '08')
                - Pour la devise en cours et son type (tick ou candle).
                - Toutes les semaines, du début à aujourd'hui.
                - L'année est un int, la semaine une str ('02', '12', ...) """
            b_ticks = l_params[0]       # l_params = [b_ticks].
            st_yw = set()               # Le set() évite les doublons.
            since = 2018 if b_ticks else 2012
            yw_today = datetime.date.today().isocalendar()[:2]
            for year in range(since, yw_today[0] + 1):
                for week in range(1, 54):
                    iso = self.dt.isoyw_from_fxyw(year, week)
                    st_yw.add((iso[0], f'0{iso[1]}'[-2:]))
                    if (year, week) >= yw_today:
                        lt_yw = list(st_yw)
                        lt_yw.sort()
                        return lt_yw

        elif key == 'csv_file':
            """ Renvoie le chemin complet du fichier correspondant à la devise en cours, à l'année et à la semaine. """
            year, week, b_ticks = l_params       # l_params = [*t_yw, b_ticks]
            str_week = f'0{week}'[-2:]
            return os.path.join(self.symbol_dir, str(year), f"{'tick' if b_ticks else 'candle'}_"
                                                            f"{str_week}.csv")
        elif key == 'url_file':
            """ Renvoie l'url du fichier d'histos correspondant à la devise en cours, à l'année et à la semaine. """
            year, str_week, b_ticks = l_params       # l_params = [*t_yw, b_ticks]
            url_candle = 'https://candledata.fxcorporate.com/m1'
            url_tick = 'https://tickdata.fxcorporate.com'
            return f'{url_tick if b_ticks else url_candle}/{self.symbol}/{year}/{int(str_week)}.csv.gz'

    def verify_weeks(self, b_ticks, b_silent=False):
        """ Affichage d'un tableau dans la console (Lignes=années, Colonnes=semaines).
            '▒' ou '▄' ou '▀' = semaine présente, '.' = semaine absente. """

        """ 1 - Listes des semaines réellement existantes en base de données et en fichier .csv. """
        l_weeks_csv, l_weeks_db = self._existing_lists(b_ticks)

        """ 2 - Affichage du titre et entêtes de colonnes. """
        typ = 'Ticks' if b_ticks else 'Candles'
        self.ut.gauge(f"{self.instrument}-{typ}", large=17)
        [self.ut.gauge(char=f"{str(week) :<2}") for week in range(1, 54)]
        self.ut.gauge('end')

        """ 3 - Affichage du contenu - Ordonnées = années, abscisse = semaines. """
        l_weeks = l_weeks_csv + l_weeks_db
        l_weeks.sort()
        first_year = (2018 if b_ticks else 2012) if len(l_weeks) == 0 else l_weeks[0][0]
        year_now = datetime.date.today().isocalendar()[0]
        for year in range(first_year, year_now + 1):
            self.ut.gauge(year, large=17)
            """ Boucle sur les semaines de l'année {year} : 1 à (52 ou 53). """
            for week in range(1, self._helper('nb_weeks_in_year', year, 'today') + 1):  # today limite à aujourd'hui.
                t_yw = year, f'0{week}'[-2:]      # 7 -> 07
                b_csv, b_db = t_yw in l_weeks_csv, t_yw in l_weeks_db
                if not b_csv and not b_db:          # 0 0       /x./y
                    char, color = '. ', 'BLEU'
                elif not b_csv and b_db:            # 0 1       /x.y
                    char, color = '▀ ', 'VERT'
                elif b_csv and not b_db:            # 1 0       x./y
                    char, color = '▄ ', 'VIOLET'
                else:                               # 1 1       x.y
                    char, color = '▒ ', 'BLEU'
                self.ut.gauge(char=char, color=color)
            self.ut.gauge('end')

        """ Vérification de la synchronisation entre la base de données et les fichiers csv. """
        msg = "\nAttention : La base de données n'est pas entièrement synchronisée ! csv + db = rectangles bleus." \
              "\nL'affichage ci-dessus montre les fichiers .csv en carrés-bas mauves, la db en carrés-hauts verts." \
              if not b_silent and l_weeks_csv != l_weeks_db else ''
        self.ut.printc(msg)

    def download_histos(self, nb_weeks, b_ticks):
        if nb_weeks > 1:
            self.ut.printc(f"\nArrêt manuel : garder la touche '{self.kb_break.upper()}' appuyée.\n")

        """ Intervalle des semaines à télécharger (nombres en base 53). """
        """                     |_ since = Depuis. """
        l_weeks_csv, _ = self._existing_lists(b_ticks=b_ticks)      # Semaines = str <-- '08', '09', '10', ...
        l_csv = [(y, int(w)) for (y, w) in l_weeks_csv]             # Semaines = int <-- 8, 9, 10, ...
        for y, w in l_csv:
            if w > 2:
                since = y * 53 + w - 1
                break
        else:
            since = (2018 if b_ticks else 2012) * 53

        y_today, w_today = datetime.date.today().isocalendar()[:2]
        now = y_today * 53 + w_today
        """                     |_ now = Jusqu'à. """

        """ l_required = Liste des fichiers à télécharger. """
        l_required = list()
        for yw53 in range(since, now-1):          # dernière semaine = (now-1) => pas la semaine en cours.
            csv_yw = yw53 // 53, 1 + yw53 % 53
            csv_file = self._helper('csv_file', *csv_yw, b_ticks)
            if not os.path.isfile(csv_file):
                """ Fichier absent localement => à télécharger. """
                l_required.append(csv_yw)

        db = self.db_tick if b_ticks else self.db_candle
        nb_weeks_loaded = 0

        """ Parcours des semaines à traiter. """
        for csv_yw_required in l_required:
            df = self._download_csv_file(csv_yw_required, b_ticks)
            if df is None:
                """ Arrêt manuel demandé. """
                break
            elif df.shape[0] > 0:
                """ 1 - Enregistrement du fichier téléchargé sur disque dur. """
                self._df_to_csv(df, csv_yw_required, b_ticks)

                """ 2 - Écriture du contenu en base de données. break si arrêt manuel demandé. """
                if not db.df_to_table(df):
                    continue

                """ 3 - Update des volumes à partir des ticks. """
                df_stamps = self.db_tick.get_df_stamps(*csv_yw_required)
                if df_stamps.shape[0] > 0:
                    self.db_candle.update_volume(df_stamps)

                """ Arrêt si le nombre de semaines demandées est atteint. """
                nb_weeks_loaded += 1
                if nb_weeks_loaded >= nb_weeks:
                    break

        s = 's' if nb_weeks > 1 else ''
        self.ut.printc(f"Téléchargement{s} terminé{s}.", color='vert')

    def synchro_db_csv(self, b_ticks):
        """ A l'issue de cette méthode, TOUTE la db sera l'image de TOUS les fichiers csv. """
        """ 1 - Base de données. """
        db, typ = (self.db_tick, 'Ticks') if b_ticks else (self.db_candle, 'Candles')

        """ 2 - État actuel des historiques => Différences entre csv et db
                - l_drop = semaines à supprimer de la db, l_add = semaines à ajouter à la db. """
        l_drop, l_add, l_vol = self._lists_to_update_db(b_ticks=b_ticks)

        if l_drop+l_add+l_vol == []:
            self.ut.printc(f"{typ} csv <--> db : OK. La synchronisation est inutile.\n", color='VERT')
            return True

        """ 3 - Message : état initial. """
        for l_weeks in [l_drop, l_add]:
            if len(l_weeks) > 0:
                s = 's sont' if len(l_weeks) > 1 else ' est'
                verb = 'ajouter à' if l_weeks == l_add else 'supprimer de'
                self.ut.printc(f"{len(l_weeks)} semaine{s} à {verb} la base de données '{typ.lower()}'.", color='VERT')

        """ 4 - Ajout de données dans la db. """
        self._add_weeks(l_add, b_ticks)

        """ 5 - Suppression de données de la db. """
        for yw in l_drop:
            db.delete_week(*yw)

        """ 6 - Update des volumes. """
        for yw in l_vol:
            if keyboard.is_pressed(self.kb_break):
                self.ut.printc("Arrêt manuel demandé.")
                break
            df_stamps = self.db_tick.get_df_stamps(*yw)
            self.db_candle.update_volume(df_stamps)

        """ 7 - Vérification. """
        l_drop, l_add, l_vol = self._lists_to_update_db(b_ticks=b_ticks)
        success, color = ('réussi', 'VERT') if l_drop+l_add+l_vol == [] else ('échoué', 'ROUGE')
        self.ut.printc(f'{typ} : La synchronisation a {success}.', color=color)

    def _existing_lists(self, b_ticks):
        """ Retourne les listes de semaines des fichiers csv et des enregistrements en base de données. """
        """ s_weeks_csv n'est pas une list(), mais un set(), pour éviter les doublons. """
        l_weeks_csv, s_weeks_db = list(), set()
        db = self.db_tick if b_ticks else self.db_candle
        l_iso_yw = self._helper('l_iso_yw', b_ticks)            # Liste de toutes les semaines.
        typ = 'Ticks' if b_ticks else 'Candles'
        self.ut.gauge(f"{self.instrument}-{typ} Création listes :", large=0)
        for num, iso_yw in enumerate(l_iso_yw):
            """ Semaines existant en fichiers csv : Pour une semaine iso, on a 0, 1 ou 2 fichiers csv. """
            l_csv_files, last_stamp = self._get_csv_files(*iso_yw, b_ticks)
            csv_size = 0
            if len(l_csv_files) > 0:
                l_weeks_csv.append(iso_yw)
                for csv_file in l_csv_files:
                    csv_size += os.path.getsize(csv_file)
            """ Liste des semaines existant en base de données. """
            csv = last_stamp if b_ticks else csv_size
            if db.week_exists(*iso_yw, csv):
                s_weeks_db.add(iso_yw)             # (1)
            if num % 8 == 0:
                self.ut.gauge(char='.')  # , color='Jaune')
        self.ut.gauge('end')
        self.ut.gauge('end')        # Ligne vide supplémentaire.

        """ Persistance en pkl. """
        l_weeks_db = list(s_weeks_db)
        l_weeks_csv.sort()
        l_weeks_db.sort()
        return l_weeks_csv, l_weeks_db

    def _get_csv_files(self, iso_year, iso_week, b_ticks):
        """ Attention ! Les noms de fichiers csv ne correspondent pas toujours à leur contenu ! """
        """ Return : liste de noms complets de fichiers csv correspondant à (iso_year, iso_week) : 0, 1 ou 2 fichiers.
            fxcorporate ne respecte pas la norme ISO. De plus :
                - D'une année sur l'autre, le nom du fichier csv ne correspond pas à son contenu.
                - On ne trouve pas de correspondance entre diverses devises.
                - Par conséquent, le nom du fichier n'est pas fiable pour trouver le bon N° de semaine.
            La méthode adoptée pour contourner ce problème est la suivante :
                - Recherche de 3 fichiers csv voisins de (iso_year, iso_week) passés en paramètres.
                - Lecture, pour chacun, de la date-heure du milieu du fichier.
                - On en déduit le VRAI N° de semaine ISO.
            Il arrive parfois, au changement d'année, qu'une même semaine soit composée de 2 fichiers csv.
                - C'est pourquoi la valeur de retour est une liste. """
        int_week, l_fxyw = int(iso_week), list()
        last_stamp = 0
        for week in range(int_week - 1, int_week + 1):     # 2 Semaines : précédente, actuelle.
            """ Correction : N° de semaine en base 53. """
            y, w = (iso_year, week) if week > 0 else (iso_year - 1, week + 53)
            y, w = (y, w) if w <= 53 else (y + 1, w - 53)

            """ Les 3 fichiers csv candidats. """
            str_w = f'0{w}'[-2:]
            file_name = f"{'tick' if b_ticks else 'candle'}_{str_w}.csv"  # 'tick_??.csv' ou 'candle_??.csv'.
            csv_path = os.path.abspath(f"{os.path.dirname(self.symbol_dir)}/{self.symbol}/{y}/{file_name}")
            if os.path.isfile(csv_path):
                with open(csv_path, 'r') as fh:
                    """ On 'goûte' les 300 derniers bytes du fichier pour connaitre le N° ISO de semaine. """
                    nb_char = 300
                    fh.seek(0, os.SEEK_END)
                    fh.seek(fh.tell() - nb_char)
                    dt_str = fh.read().split('\n')[-3].split(',')[0]
                last_stamp = self.dt.get_dtstamp_from_dtstr(dt_str)
                o_d = self.dt.get_date_from_dtstamp(last_stamp)                  # Objet date.
                if o_d.isocalendar()[:2] == (iso_year, int_week):               # Filtrage.
                    l_fxyw.append(csv_path)                                     # Candidat sélectionné.

        return l_fxyw, last_stamp   # 0, 1 ou 2 éléments.

    def _download_csv_file(self, csv_yw_required, b_ticks):
        """ Téléchargement, décompactage, décodage, DataFrame, enregistrement sur disque dur.
            - return DataFrame. """

        """ 1 - Demande au helper de construire l'url. """
        url_required = self._helper('url_file', *csv_yw_required, b_ticks)

        """ 2 - Initialisation des bytes. """
        content = b''

        """ 3 - Téléchargement en streaming avec lots de taille {size//50}, permettant l'insertion d'une jauge. """
        y, w = csv_yw_required
        try:
            with requests.get(url_required, stream=True) as req:
                size = int(req.headers['Content-Length'])
                if size < 1_000:
                    return pd.DataFrame()       # return df vide.
                print()
                self.ut.gauge(f"Téléchargement {' ticks ' if b_ticks else 'candles'} {self.symbol} ({y}, {w})")
                for chunk in req.iter_content(chunk_size=size//50):
                    if keyboard.is_pressed(self.kb_break):
                        self.ut.printc("Arrêt manuel demandé.")
                        self.ut.gauge('end')
                        return None
                    content += chunk
                    """ 3.1 - Jauge de progression. """
                    self.ut.gauge()
                self.ut.gauge('end')
        except (Exception,):
            self.ut.printc('Connexion internet interrompue.')
            return None

        """ 4 - Décompactage du buffer de données binaires (codées utf), en mémoire. """
        buf = BytesIO(content)
        f = gzip.GzipFile(fileobj=buf)
        coded_bytes = f.read()

        """ 5 - Décodage : utf-8 pour les candles, utf-16 pour les ticks. """
        codec, type_data = ('utf-16', 'tick') if b_ticks else ('utf-8', 'candle')
        decoded_bytes = coded_bytes.decode(codec)

        """ 6 - Conversion Bytes -> String """
        decoded_str = StringIO(decoded_bytes)

        """ 7 - String to DataFrame. """
        df = pd.read_csv(decoded_str)  # Pas de colonne-index (index_col=0 retiré)

        """ 8 - Retourne le dataframe. """
        return df

    def _df_to_csv(self, df, t_yw, b_ticks):
        """ Contrôle préalable : si le dossier-cible n'existe pas, on le crée. """
        csv_path = self._helper('csv_file', *t_yw, b_ticks)
        os.makedirs(os.path.dirname(csv_path), exist_ok=True)
        df.to_csv(csv_path, index=False)        # Copie du fichier .csv sur disque dur.

    @staticmethod
    def _csv_to_df(csv_file):
        """ Conversion du fichier {csv_file} en dataframe. """
        return pd.read_csv(csv_file) if os.path.isfile(csv_file) else pd.DataFrame()

    def _lists_to_update_db(self, b_ticks):
        """ Retourne 3 listes : l_drop, l_add et l_vol.
        Algorithme pour l_drop et l_add :
        - (1) = l_weeks_db : Liste des semaines réellement existantes en base de données.
        - (2) = l_weeks_csv : Liste des semaines réellement existantes en fichiers csv.
        - Intersection (Inter) = (1) ⋂ (2)
        - l_drop : Liste des semaines à supprimer de la db (celles qui n'existent pas en csv) : (1) - (Inter)
        - l_add : Liste des semaines à ajouter dans la db (celles des csv qui n'existent pas en db) : (2) - (Inter)
        --------------------------------
        l_vol  = liste des semaines (yw) de candles nécessitant un update du champ 'Volume' dans les tables.
                 Sans objet pour les ticks.
        """

        """ 1 et 2 - Listes des semaines réellement existantes en base de données et en fichier .csv. """
        l_weeks_csv, l_weeks_db = self._existing_lists(b_ticks=b_ticks)

        """ 3 - Liste des semaines à supprimer de la db (celles qui n'existent pas en csv). """
        l_drop = list()
        for t_yw in l_weeks_db:
            if t_yw not in l_weeks_csv:
                # print('A supprimer :', t_yw)    # MAP : Permet de lister les semaines qui sont à supprimer.
                l_drop.append(t_yw)

        """ 4 - Liste des semaines à ajouter à la db (celles des csv qui n'existent pas en db). """
        l_add = list()
        for t_yw in l_weeks_csv:
            if t_yw not in l_weeks_db:
                # print('A ajouter :', t_yw)      # MAP : Permet de lister les semaines qui sont à ajouter.
                l_add.append(t_yw)

        """ 5 - Liste des semaines de ticks pour lesquelles les volumes des candles sont à updater. """
        l_vol = list()
        if not b_ticks:     # Renkos non concernés.
            """ Semaines ticks existantes (b_ticks=True). """
            _, l_weeks_tick_db = self._existing_lists(b_ticks=True)
            for yw in l_weeks_tick_db:
                if yw in l_weeks_db:
                    if not self.db_candle.is_volume_in_week(*yw):
                        l_vol.append(yw)

        return l_drop, l_add, l_vol

    def _add_weeks(self, l_yw, b_ticks):
        """
        @param l_yw : Liste des semaines à ajouter à la base de données.
        @param b_ticks: True = ticks, False = candles.
        """
        db = self.db_tick if b_ticks else self.db_candle
        self.ut.printc(f"\nArrêt manuel : garder la touche '{self.kb_break.upper()}' appuyée.\n")
        for iso_yw in l_yw:
            if keyboard.is_pressed(self.kb_break):
                self.ut.printc("Arrêt manuel demandé.")
                break
            l_csv_files, _ = self._get_csv_files(*iso_yw, b_ticks)      # Liste de 0, 1 ou 2 fichiers.
            for csv_file in l_csv_files:
                df = self._csv_to_df(csv_file)
                if df.shape[0] > 0:
                    db.df_to_table(df)

    def get_stamps(self, table_name, nb_stamps):
        db = self.db_tick if (table_name == 'Ticks' or table_name[0] == 'r') else self.db_candle
        return db.get_last_stamps(table_name, nb_stamps)

    def get_datas(self, table_name, final_stamp, nb_points):
        b_ticks = table_name == 'Ticks' or table_name[0] == 'r'     # Ticks ou Renko
        db = self.db_tick if b_ticks else self.db_candle
        return db.get_datas(table_name, final_stamp, nb_points)


def main():  # MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP *****
    """ Code de MAP (mise au point). Espace expérimental : ajouter, supprimer, commenter, décommenter, ... """
    h = CtrlHistos('EUR/USD')
    # h = CtrlHistos('USD/JPY')
    # h = CtrlHistos('GBP/USD')

    """ Candles. """
    # h.download_histos(1, b_ticks=False)
    # h.synchro_db_csv(b_ticks=False)
    h.verify_weeks(b_ticks=False)

    """ Ticks. """
    # h.download_histos(1, b_ticks=True)
    # h.synchro_db_csv(b_ticks=True)
    h.verify_weeks(b_ticks=True)


if __name__ == '__main__':
    main()

 


Oui mais ... une nouvelle version de la classe DataBase est nécessaire pour satisfaire à l'implémentation des calculs dans le node histos :

☐ Fichier /trading/historiques/db.py :

from peewee import SqliteDatabase
from functions.utils import DateTime, Utils
import datetime
import pandas as pd
import numpy as np
import keyboard
import os
""" (Russe - vidéo) https://www.youtube.com/watch?v=8dla28TLvwA   <- Ctrl + Clic
    (Français - txt) https://linuxtut.com/fr/9d4e1d0afac1865acdbb/
    (Anglais - vidéo) https://www.youtube.com/watch?v=Vk6Ptnvqr4M
"""


# noinspection PyUnresolvedReferences
class DataBase:
    def __init__(self, o_ctrl, b_ticks):
        self.o_ctrl = o_ctrl
        self.b_ticks = b_ticks
        self.dt = DateTime()
        self.ut = Utils()
        self.kb_break = o_ctrl.kb_break    # Nom de la touche du clavier défini dans le contrôleur CtrlHistos.
        self.pips = o_ctrl.pips
        self.week_delta = datetime.timedelta(weeks=1)  # <class 'datetime.timedelta'>
        self.day_delta = datetime.timedelta(days=1)  # <class 'datetime.timedelta'>
        self.o_table = None
        pd.set_option('mode.chained_assignment', None)

        """ 3 valeurs pour b_ticks : True (ticks), False (candles) ou None (renkos). """
        file_name = 'tick' if b_ticks is True else ('candle' if b_ticks is False else 'renko')
        sql_file = os.path.abspath(f"{o_ctrl.symbol_dir}/{file_name}.sql")
        self.db = SqliteDatabase(sql_file)
        self.d_tables = dict()

        """ Création physique du fichier *.sql sur disque dur. """
        self.db.connect()

    def change_datetime_column(self, df):
        """ Mise en conformité de la colonne DateTime : string to float. """
        if 'DateTime' in df.columns:
            df = df.rename(columns={'DateTime': 'timestamp'})
            dt0 = np.datetime64(self.dt.get_dtstr_from_dtstamp(0, dt_format='%Y-%m-%d %H:%M:%S'))  # '1970-01-01 01:00'
            t_delta = np.timedelta64(1, 's')        # <class 'numpy.timedelta64'>
            pd_dt = pd.to_datetime(df.timestamp, format='%m/%d/%Y %H:%M:%S.%f')  # format nécessaire, sinon très lent.
            last_stamp = (np.datetime64(pd_dt.iloc[-1]) - dt0) / t_delta
            o_d = self.dt.get_date_from_dtstamp(last_stamp)
            yw = o_d.isocalendar()[:2]      # (year, week)
            len_batch = int(.95 * df.shape[0] / 50)
            l_ts = list()
            self.ut.gauge(f'Conformité {yw}')
            for _from in range(0, df.shape[0], len_batch):
                if keyboard.is_pressed(self.kb_break):
                    self.ut.printc("Arrêt manuel demandé.")
                    self.ut.gauge('end')
                    return None
                _to = _from + len_batch
                self.ut.gauge()
                l_ts += [(np.datetime64(x) - dt0) / t_delta for x in pd_dt[_from: _to]]
            self.ut.gauge('end')
            df.timestamp = l_ts
        return df

    def get_db_sign(self):
        nb_enr = 0
        for o_table in list(self.d_tables.values()):
            nb_enr += o_table.select().count()
        return nb_enr

    def _get_range_from_df(self, df):
        """ Renvoie un tuple de stamps, qui encadrent df, à l'extérieur de df (samedis 22:00). """
        f_stamp = df['timestamp'].iloc[0]  # Premier stamp réel dans df.  <-- début.
        l_stamp = df['timestamp'].iloc[-1]  # Dernier stamp réel dans df.  <-- fin.
        stamp_before = f_stamp + self.dt.get_stamp_offset(f_stamp, b_after=False, num_day=6, _time='22:00')
        stamp_after = l_stamp + self.dt.get_stamp_offset(l_stamp, b_after=True, num_day=6, _time='22:00')
        return stamp_before, stamp_after

    def _get_range_from_yw(self, year, str_week):
        stamp = self.dt.get_dtstamp_from_dtstr(f'{year}-{str_week}-3 10', dt_format='%G-%V-%u %H')  # Mercredi 10:00
        stamp_before = stamp + self.dt.get_stamp_offset(stamp, b_after=False, num_day=6, _time='22:00')
        stamp_after = stamp + self.dt.get_stamp_offset(stamp, b_after=True, num_day=6, _time='22:00')
        return stamp_before, stamp_after

    def is_volume_in_week(self, *yw):
        """ Compte le nombre d'enregistrements total et le nombre d'enregistrements ayant du volume. """

        """ Limites de la semaine : samedi 22:00 à samedi 22:00. """
        range_stamps = self._get_range_from_yw(*yw)

        """ Parcours de toutes les tables et cumul des valeurs. """
        nb_total = nb_volume = 0
        for o_table in list(self.d_tables.values()):
            nb_total += o_table.select(o_table.Volume).where(
                o_table.timestamp.between(*range_stamps)
            ).count()
            nb_volume += o_table.select(o_table.Volume).where(
                o_table.timestamp.between(*range_stamps),
                ~o_table.Volume.is_null()
            ).count()

        return nb_volume > nb_total * .98    # Booléen.

    def delete_week(self, *yw):
        """ Supprime la semaine {yw} de chacune des tables """
        range_stamps = self._get_range_from_yw(*yw)
        for o_table in list(self.d_tables.values())[::-1]:  # Parcours à l'envers.
            o_table.delete().where(o_table.timestamp.between(*range_stamps)).execute()

    def get_datas(self, table_name, final_stamp, nb_points):
        o_table = self.get_otable(table_name)

        """ 1 - Lecture des {nb_points} derniers points, le plus récent en premier. """
        l_datas = o_table.select().order_by(o_table.timestamp.desc()).where(o_table.timestamp <= final_stamp).limit(nb_points)
        """ 2 - Peewee -> DataFrame Pandas """
        df = pd.DataFrame(list(l_datas.dicts()))

        """ 3 - DataFrame Pandas -> Numpy avec retournement : le plus récent en dernier. """
        np_datas = df.values[::-1]      # Retournement

        return np_datas

    def get_otable(self, table_name):
        if table_name == 'Ticks':
            return self.o_table
        else:
            d_tables = self.o_renko.d_tables if table_name[0] == 'r' else self.d_tables
            return d_tables.get(table_name)

    def get_last_stamps(self, table_name, nb_points):
        o_table = self.get_otable(table_name)
        """ 1 - Lecture des {nb_points} derniers points, le plus récent en premier. """
        l_datas = o_table.select(o_table.timestamp).order_by(o_table.timestamp.desc()).limit(nb_points)

        """ 2 - Peewee -> DataFrame Pandas """
        df = pd.DataFrame(list(l_datas.dicts()))

        """ 3 - DataFrame Pandas -> List avec retournement : le plus récent en dernier. """
        return list(df['timestamp'])[::-1] if df.shape[0] > 0 else []

    """ Méthodes surchargées. """
    def df_to_table(self, df):
        return True

    def update_derived(self, df):
        pass

    def update_derived_table(self, df, stamp_before, stamp_after, o_derived_table):
        pass

    def week_exists(self, year, str_week, last_stamp):
        return False

 


Oui mais ... une nouvelle version de la classe CtrlCalcul est nécessaire pour satisfaire à l'implémentation des calculs dans le node histos :

Fichier /pc/ctrl_node.py :

# Imports externes
import copy
import shutil
import os
import hashlib
import numpy as np
import pickle

# Imports internes
from functions.utils import Dictionary, DateTime, Utils
from pc.ui_node import UiNode
from pc.parameters import Parameters
from pc.ctrl_socket import CtrlSocket
from pc.ctrl_edge import CtrlEdge


class Helper:
    @staticmethod
    def deepcopy(reference):
        return copy.deepcopy(reference)

    @staticmethod
    def new_od(dic=None):
        return Dictionary(dic)

    @staticmethod
    def join(*l_variants, sep='-'):
        """ Exemple d'appel 1 : y = self.join(a, b, c, d, ...)
            Exemple d'appel 2 : y = self.join(*l_x)     <-- Ne pas oublier '*'
        Concatène les éléments de l_strings en une chaine. Les chaînes vides sont ignorées. le séparateur est sep.
        :param l_variants: Nombre quelconque d'éléments de tout type.
        :param sep: '-' par défaut. Peut être '' (vide) ou \n (retour à la ligne) ... ou autre.
        :return: Chaîne concaténée.
            Exemple l_strings = ['Paris', 'Lille', '', 'Bruxelles', 'Prague'] => return "Paris-Lille-Bruxelles-Prague"
        """
        string_txt = ''
        for string in l_variants:
            if string != '':
                string_txt += f'{sep}{string}'  # Accepte tout type de variable pour string.
        return string_txt[len(sep):]            # Suppression du 1er sep.


class CtrlNode(Helper):
    def __init__(self, o_scene, s_id, pos):
        self.o_scene = o_scene
        self.s_id = s_id  # Clé d'entrée dans le super-dictionnaire pkl.
        self.od_pkl = self.o_scene.o_pkl.od_pkl
        self.o_grnode = None
        self.o_params = None
        self.pos = pos  # pos = (x, y)
        self.width = 96
        self.height = 60
        self.type = ''
        self.ut = Utils()
        self.main_key = f"Paramètres du node '{self.s_id}'"
        self.b_chk = bool(self.od_pkl.read([self.s_id, 'b_chk'], True))
        self.b_on = self.b_chk
        self.child_file = ''

        """ Sockets. """
        self.lo_sockets_in = list()     # Liste d'objets 'sockets in'  instanciés.
        self.lo_sockets_out = list()    # Liste d'objets 'sockets out' instanciés.
        self.default_color = '#88bbff'  # Couleur par défaut des sockets de sortie.
        self.l_sep_inputs = []          # Emplacement des séparateurs (après les entrées indiquées).
        self.l_sep_outputs = []         # Emplacement des séparateurs (après les sorties indiquées).

        """ Contenu spécifique. """
        self.o_grcontent = None

        """ Secousses => Court-circuits. """
        self.dt = DateTime()
        self.l_short_circuits = list()  # Surchargé par les classes dérivées.
        self.l_shortables = list()
        self.b_removable = True  # Les edges d'entrée peuvent être détruits en cas de court-circuit.
        self.k_shake = 0  # Nombre de secousses.
        self.b_shaking = False  # Secousse en cours.
        self.x, self.y = 0, 0  # Position antérieure (x, y), mémorisée.
        self.dx, self.dy = 0, 0  # Deltas (variations) antérieurs, mémorisés.

    def setup(self, child_file):
        """ Code appelant : classe dérivée, on connait donc ses attributs. """
        """ Paramètres. """
        self.o_params = Parameters(self)

        """ Hauteur automatique du node : elle dépend du nombre de sockets en entrée ou en sortie (le plus grand).  """
        h_title = self.d_display['geometry']['h_title']
        nb_sockets_max = max(len(self.ld_inputs), len(self.ld_outputs))  # En fait : Sockets + séparateurs.
        first_y = 16 * (1 + (h_title + 4) // 16)  # Multiple de 16.
        self.height = first_y + nb_sockets_max * 16 - 4

        """ Nouveau node par drag & drop. """
        self.child_file = child_file
        b_new = self.pos != (0, 0)  # Nouveau node (node dropé).
        if b_new:
            """ - Cas d'un nouveau node (dropé).
                - Son centre apparaît exactement à la position du curseur de la souris.
                - On mémorise son path et sa position. """
            self.pos = self.pos[0] - self.width // 2, self.pos[1] - self.height // 2
            path = 'nodes' + os.path.relpath(child_file).split('nodes')[1][:-3].replace(os.sep, '.')
            self.od_pkl.write([self.s_id, 'path'], path)
        else:
            self.pos = self.od_pkl.read([self.s_id, 'position'], (0, 0))

        """ Instanciation de l'UI. """
        self.o_grnode = UiNode(self)  # Gestion de l'UI (Interface utilisateur) : dessin couleurs, ...
        if b_new:
            self.o_grnode.save_pos()  # Important ! à la fin (contient o_pkl.backup()).
        self.o_scene.o_grscene.addItem(self.o_grnode)  # Incorporation dans la scène.

        """ Sockets. """
        num_socket = 0
        for i, d_input in enumerate(self.ld_inputs):
            """ On complète les dictionnaires de base. """
            if isinstance(d_input, dict):  # Si ce n'est pas un dictionnaire, c'est un séparateur.
                d_input['pos'] = True, i  # On ajoute l'emplacement. Tuple (input: True, num_position).
                d_input['id'] = self.id, num_socket  # On ajoute l'identifiant. Tuple (id_node, num_socket)
                self.lo_sockets_in.append(CtrlSocket(self, d_input))  # Socket instancié et ajouté à la liste.
                num_socket += 1
        num_socket = 0
        for i, d_output in enumerate(self.ld_outputs):
            """ On complète les dictionnaires de base. """
            if isinstance(d_output, dict):  # Si ce n'est pas un dictionnaire, c'est un séparateur.
                d_output['pos'] = False, i  # On ajoute l'emplacement. Tuple (input: False, num_position).
                d_output['id'] = self.id, num_socket  # On ajoute l'identifiant. Tuple (id_node, num_socket)
                self.lo_sockets_out.append(CtrlSocket(self, d_output))  # Socket instancié et ajouté à la liste.
                num_socket += 1

        self.short_circuit_check()

    @property
    def id(self):
        """ Ex : 'Node 14' renvoie 14. """
        return int(self.s_id[4:])

    @property
    def d_display(self):
        """ Attributs par défaut pouvant être surchargés dans les classes dérivées. """
        return {
            'geometry': {'h_title': 32, 'round': 6},
            'col_pen': {'select': '#ffa637', 'hover': '#888', 'on': "#000", 'off': '#aaaa00'},  # ordre = priorité.
            'col_brush': {'on': "#8888bbcc", 'off': '#cccccccc'},
            'thick_pen': {'hover': 4., 'leave': 2.}
        }

    def get_default(self):
        """ Titre du node et couleurs des 'socket_out' pour chaque type de node. """
        """ Complétée par les classes dérivées avec leurs paramètres statiques et dynamiques. """

        """ 1) Paramètres communs : Titre, bouton et autant de couleurs que de sorties. """
        d_default = {
            'Titre du node': 'Choisir un titre',
            '*🔆': ''        # Un astérisque devant indique qu'il s'agit d'un bouton.
        }

        """ 2) Ajout de paramètres fixes spécifiques au type de node, AVANT les couleurs. """
        d_default.update(self.fixed0_params())  # méthode fixed_params()

        """ 3) Couleurs. """
        d_colors = dict()
        for d_output in self.ld_outputs:
            if isinstance(d_output, dict):
                d_colors[d_output['label']] = self.default_color
        if d_colors:
            d_default['Couleurs'] = d_colors

        """ 4) Ajout de paramètres fixes spécifiques au type de node, APRES les couleurs. """
        d_default.update(self.fixed_params())  # méthode fixed_params()

        """ 5) Ajout des paramètres dynamiques, qui dépendent des signaux entrants. """
        d_default.update(self.dynamic_params)  # propriété dynamic_params

        """ 6) Dictionnaire complet. Paramètres : fixes communs + fixes spécifiques + dynamiques. """
        return {self.main_key: d_default}

    def get_param(self, l_key, v_default=None):
        if self.o_params is None:
            return v_default
        od_params = Dictionary(self.o_params.od_params[self.main_key])
        return od_params.read(l_key, v_default)

    def set_checked(self, val):
        self.b_chk = bool(val)

        """ Update my_signals() """
        for o_socket_out in self.lo_sockets_out:
            o_socket_out.to_update()

        """ Infrastructure. """
        self.o_scene.infrastructure()

        """ Enregistrement. """
        self.od_pkl.write([self.s_id, 'b_chk'], self.b_chk)
        self.o_scene.o_ur.b_action = True  # Ajout dans l'historique du "Undo-Redo".
        self.o_scene.o_pkl.backup()

    """ ************************* Méthodes à surcharger ************************** """
    @property
    def ld_inputs(self):
        return list()

    @property
    def ld_outputs(self):
        return list()

    def fixed0_params(self):
        """ - Renvoie un dictionnaire de paramètres, indépendants des connexions.
            - Placé AVANT les couleurs.
            - Exemple : le node de type 'Labo' renvoie 'Expérience', etc. """
        return {}

    def fixed_params(self):
        """ - Renvoie un dictionnaire de paramètres, indépendants des connexions.
            - Placé APRES les couleurs.
            - Exemple : le node de type 'Signaux' renvoie 'Sinus', 'Cosinus', etc. """
        return {}

    def bundle_params(self):
        """ - Renvoie un dictionnaire de paramètres pour chaque entrée.
            - Une entrée reçoit un faisceau de signaux (bundle). """
        return {}

    def my_params(self, context):
        """
        :param context: Liste de 7 valeurs, pouvant être utilisée pour construire le dictionnaire de sortie :
            - context[0] typ_id_from = Type + id du node en amont, producteur du signal. Ex : 'MM12'.
            - context[1] signal_ante_from = Nom du signal créé par le node en amont du node producteur. Ex : 'Cosinus'.
            - context[2] signal_now_from = Nom du signal créé par le node en amont. Ex : 'SMA18'.
            - context[3] signal_source_from = 'Provenance du signal' affiché dans le node en amont.
            - context[4] (num_input, num_signal) = Tuple. num_signal = ordre du signal dans le faisceau entrant.
            - context[5] signal_title = Titre du signal dans le dockable des paramètres.
            - context[6] signal_source = 'Provenance du signal' -> texte avec retours à la ligne.
        :return: Dictionnaire formaté pour les paramètres.
            |_ Exemple {'Type de MM': ['SMA', {'values': ['SMA', 'EMA', 'SMMA', 'LWMA']}], "Périodes (sep=',')": '14'}
        """
        return {}

    def my_signals(self, l_signals_in, num_socket_out):
        """ Description des signaux délivrés à la sortie N° num_socket_out.
        Comment ça marche ?
            - Cette fonction prend en entrée tous les signaux entrants ainsi que les paramètres pkl.
            - Un traitement spécifique est décrit, qui produit plusieurs signaux.
            - Ces signaux sont ensuite distribués aux différentes sorties du node.
        :param l_signals_in: Signaux d'entrée. Cette liste contient autant d'éléments que d'entrées connectées.
            - Chaque élément est un tuple : (typ_id_from, signal_ante_from, signal_now_from, signal_source_from, \
            (num_input, num_signal), signal_title, typ_id, signal_ante, signal_source)
            (Les variables suffixées de '_from' proviennent du node en amont).
        :param num_socket_out: N° de la sortie.
        :return: Liste de signaux exclusivement destinés à la sortie N° num_socket_out.
            Chaque signal est décrit par un tuple à 4 valeurs : (typ_id, signal_ante, signal_now, signal_source).
            La valeur de signal_now doit être soigneusement choisie afin d'éviter des doublons de noms.
        """
        raise SystemExit(self.child_file + '\nCtrlNode.my_signals() : Cette méthode doit être surchargée.')
    """ *********************** Fin - Méthodes à surcharger ********************** """

    @property
    def dynamic_params(self):
        """ Code appelant : self.get_default().
        Ces paramètres sont dynamiques car ils dépendent des signaux connectés aux entrées.
        - Ce node demande la liste des signaux à chaque socket d'entrée : o_socket_in.l_signals
        - Cette liste est formatée de la même façon pour tous les types de node.
        - C'est une liste de tuples, chaque tuple a 4 valeurs et caractérise un signal.
        - Il est rappelé qu'un edge véhicule non pas un signal, mais un faisceau de signaux (un faisceau).
        - Le format d'un tuple est le suivant :
            - typ_id_from = Type + id du node en amont, producteur du signal. Ex : 'MM12'.
            - signal_ante_from = Nom du signal créé par le node en amont du node producteur. Ex : 'Cosinus'.
            - signal_now_from = Nom du signal créé par le node producteur. Ex : 'SMA18'.
            - signal_source_from = Texte destiné à être affiché dans le dockable, dans 'Provenance du signal :'
        - Node
            |_ Socket_in
                |_ Faisceau de signaux
                    |_ Signal
                        |_ Paramètre dynamique.
        """
        if not self.lo_sockets_in:
            """ Tant que l'initialisation n'est pas terminée, lo_sockets_in est une liste vide."""
            return {}

        d_params = dict()
        nb_inputs = len(self.lo_sockets_in)
        for k in range(nb_inputs):
            d_input = self.bundle_params()  # Paramètres pour cette entrée.
            """ Récupération en cascade : Socket_in -> Edge -> Socket_out -> Node -> Socket_in -> etc. """
            for num_signal, t_signal in enumerate(self.lo_sockets_in[k].l_signals):
                typ_id_from, signal_ante_from, signal_now_from, signal_source_from = t_signal[:4]   # tuple à 4 valeurs.
                signal_title = self.join(typ_id_from, signal_ante_from, signal_now_from)
                signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
                """ Paramètres communs à tous les signaux. """
                d_input[signal_title] = {
                    '$Provenance du signal :': [signal_source, {'readonly': True}],
                    'Signal actif': True,
                }
                l_args = [num_signal, signal_title, signal_source] + list(t_signal)
                d_input[signal_title].update(self.my_params(l_args))
                d_params[f'Entrée {k}'] = d_input
        if nb_inputs == 1 and 'Entrée 0' in d_params:
            """ S'il n'y a qu'une entrée, inutile d'écrire, dans le titre des paramètres, de laquelle il s'agit. """
            d_params = d_params['Entrée 0']
        return d_params

    def get_signals(self, num_socket):
        """ Code appelant : oSocket_out N° num_socket.
        - Ce node crée des signaux et les fournit aux nodes en aval par l'intermédiaire des sockets et des edges.
        - Il crée autant de faisceaux de signaux (vecteurs) qu'il a de sorties.
        - Pour chaque sortie, la liste fournie est formatée de la même façon pour tout type de node.
        - Voir détails ci-dessus, dans dynamic_params().
        """
        if not (self.b_chk and self.lo_sockets_out):  # Setup en cours
            return []

        """ Update od_params. """
        self.o_params.set_params()

        nb_inputs = len(self.lo_sockets_in)
        l_all_signals_in = list()
        for k in range(nb_inputs):
            """ Le nom de l'entrée se sera pas écrit dans le titre des paramètres, s'il n'y en a qu'une. """
            l_name_input = [f'Entrée {k}'] if nb_inputs > 1 else []
            l_signals = list()  # Liste des signaux actifs de l'entrée N° k.
            """ self.lo_sockets_in[k].l_signals = Faisceau de signaux arrivant sur l'entrée k. """
            for num_signal, t_signal in enumerate(self.lo_sockets_in[k].l_signals):
                typ_id_from, signal_ante_from, signal_now_from, signal_source_from = t_signal[:4]  # tuple à 4 valeurs.
                signal_title = self.join(typ_id_from, signal_ante_from, signal_now_from)
                if self.get_param(l_name_input + [signal_title, 'Signal actif'], False):
                    typ_id, signal_ante = f'{self.type}{self.id}', self.join(signal_ante_from, signal_now_from)
                    signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
                    l_signals.append(t_signal[:4] + ((k, num_signal), signal_title, typ_id, signal_ante, signal_source))
            l_all_signals_in.append(l_signals)

        return self.my_signals(l_all_signals_in, num_socket)

    """ ******************************* Secousses ******************************* """
    def short_circuit_check(self):
        """ Passage unique dans cette méthode, au démarrage. Fin programme si erreur.
            - Peut-on supprimer les edges d'entrée en cas de court-circuit ?
            - Oui, à condition que TOUTES les sorties existent dans self.l_short_circuits. """
        l_indx_outputs = list()  # Liste des index de sortie.
        d_counter = dict()
        for short_circuit in self.l_short_circuits:  # Exemple -> self.l_short_circuits = [(0, 0), (0, 2)]
            num_input = short_circuit[0]
            num_output = short_circuit[1]
            if num_input >= len(self.ld_inputs):
                """ Fin programme. """
                raise SystemExit(f"La liste des court-circuits du type de node '{self.type}' est erronée."
                                 f"\nL'entrée N°{num_input} est hors limites."
                                 f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")
            if num_output >= len(self.ld_outputs):
                """ Fin programme. """
                raise SystemExit(f"La liste des court-circuits du type de node '{self.type}' est erronée."
                                 f"\nLa sortie N°{num_output} est hors limites."
                                 f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")
            l_indx_outputs.append(num_output)  # Dans cet exemple ->                   ( 0  ,    2 )
            if num_output in d_counter:
                d_counter[num_output] += 1
            else:
                d_counter[num_output] = 1
        for i, d_output in enumerate(self.ld_outputs):
            if isinstance(d_output, dict):
                if i not in l_indx_outputs:
                    """ Condition non satisfaite. """
                    self.b_removable = False  # b_removable est à True dans __init__().
                    break
        """ Contrainte technologique : Plusieurs entrées sur une même sortie est interdit ! """
        for nb_inputs in d_counter.values():
            if nb_inputs > 1:
                """ Fin programme. """
                raise SystemExit("Plusieurs entrées sont court-circuitables sur une même sortie."
                                 f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")

    def set_shortables(self):
        """ Méthode appelée lorsque le bouton gauche de la souris est appuyé (sur ce node).
            - Chaque type de node possède sa liste de court-circuits faisables.
            - Retourne une liste de tuples.
            - Chaque tuple est composé des 2 gr_edges pouvant être réunis pour n'en faire qu'un. """
        self.l_shortables = list()
        for short_circuit in self.l_short_circuits:  # Exemple -> self.l_short_circuits = [(0, 0), (0, 2)]
            lo_gredges_in = list(self.lo_sockets_in[short_circuit[0]].get_gredges())
            lo_gredges_out = list(self.lo_sockets_out[short_circuit[1]].get_gredges())
            if lo_gredges_in:  # Cette liste contient 0 ou 1 edge.
                for o_gredge_out in lo_gredges_out:
                    self.l_shortables.append((lo_gredges_in[0], o_gredge_out))

    def shake(self):
        def shortcircuit():
            for to_gredge in self.l_shortables:  # Tuples de gredges.
                o_socket_out = to_gredge[0].o_edge.o_socket_out  # From, depuis.
                o_socket_in = to_gredge[1].o_edge.o_socket_in  # To, jusqu'à.

                """ Suppression des 2 edges du tuple. """
                self.o_scene.o_grscene.removeItem(to_gredge[1])  # Edge de sortie.
                if self.b_removable:
                    self.o_scene.o_grscene.removeItem(to_gredge[0])  # Edge d'entrée.

                """ Création du nouvel edge de court-circuit. """
                CtrlEdge(self.o_scene, o_socket_out, o_socket_in)
                o_socket_in.to_update()

            """ Persistance. """
            self.o_scene.save_edges(backup=True)

            """ Infrastructure. """
            self.o_scene.infrastructure()

        def raz_shakes():
            """ Après cette raz, un nouveau court-circuit est autorisé. """
            self.k_shake = 0
            self.b_shaking = False

        if self.b_shaking or not self.l_shortables:
            return
        self.dt.delay(raz_shakes, delay=300)
        x, y = self.o_grnode.pos().x(), self.o_grnode.pos().y()  # Positions.
        dx, dy = x - self.x, y - self.y  # Déplacements (delta x, delta y).
        b_cond = (round(dx*self.dx) < 0) or (round(dy*self.dy) < 0)

        """ Mémorisation. """
        self.x, self.y = x, y
        self.dx, self.dy = dx, dy

        if b_cond:
            """ Le mouvement a changé de sens => incrémentation du compteur. """
            self.k_shake += 1
            if self.k_shake > 10:  # Valeur à ajuster.
                self.b_shaking = True
                self.k_shake = 0
                self.dt.delay(shortcircuit, delay=50)

    """ ***************************** Fin secousses ***************************** """

    def set_colors(self, l_keys):
        """ - La couleur est appliquée au socket_out concerné.
            - Elle est également appliquée à tous ses éventuels edges connectés.
            - A leur tour, ceux-ci propagent cette couleur à leur socket d'arrivée. """
        for o_socket_out in self.lo_sockets_out:
            if o_socket_out.__class__.__name__ == 'CtrlSocket':
                if o_socket_out.label == l_keys[-1]:
                    """ Affectation de la couleur au socket_out concerné. """
                    o_socket_out.color = self.get_param(l_keys, '#666')
                    """ Récupération de tous les edgess partant de ce socket. """
                    l_gredges = list(o_socket_out.get_gredges())
                    for o_gredge in l_gredges:
                        """ Affectation de la même couleur à chaque edge. """
                        o_gredge.o_edge.color = o_socket_out.color
                        """ Affectation de la même couleur à chaque socket d'arrivée. """
                        o_gredge.o_edge.o_socket_in.color = o_socket_out.color
        """ Rafraîchissement de l'affichage. """
        self.o_grnode.update()

    def need_update(self, l_keys):
        """
        :param l_keys: Clé, dans od_params, du paramètre modifié (liste).
        :return: True si les nodes en aval nécessitent d'être recalculés, False sinon.
        """
        l_param_names = list(self.my_params('')) + list(self.fixed_params())     # Les paramètres du dockable.
        b_actif = self.get_param([l_keys[1], 'Signal actif'], True)  # True 'Signal actif' n'existe pas (générateurs).
        """ Update nécessaire (si l'état actif a changé) OU (si actif ET un des paramètres a changé). """
        return (l_keys[-1] == 'Signal actif') or (b_actif and l_keys[-1] in l_param_names)

    def get_state(self):
        """ Chaque node, selon ses spécificités, retourne un booléen de son état on/off (True/False). """
        b_on = False
        if self.b_chk:
            for key in self.o_params.od_params.key_list():
                if key[-1] == 'Signal actif':
                    if self.o_params.od_params.read(key, False):
                        b_on = True
                        break
        return b_on

    def recalculate(self, num_input):
        """ - Propagation vers l'aval. Les nodes finaux sont des afficheurs ou des actionneurs.
            - Après cela, les nodes finaux construisent leur dict de calcul par propagation vers l'amont. """
        for o_socket_out in self.lo_sockets_out:
            o_socket_out.recalculate()

    def update_dcalc(self, d_calc):
        if self.b_on:
            """ Propagation amont. """
            for o_socket_in in self.lo_sockets_in:
                o_socket_in.update_dcalc(d_calc)

        od_params = Dictionary(self.o_params.od_params[self.main_key])

        """ Construction du dictionnaire pour les calculs, à partir du od_params. """
        od_calc = Dictionary()
        path = 'nodes' + os.path.relpath(self.child_file).split('nodes')[1][:-3].replace(os.sep, '.')
        od_calc.write('Compute', self.type)
        od_calc.write('Yaml file', self.get_yaml_files()[0])
        od_calc.write('Compile from', path)
        od_calc.write('Checked', self.b_chk)
        for key in od_params.key_list():
            if key[0] == 'Titre du node' or key[0] == 'Couleurs' or key[0] == '*🔆':
                continue
            if key[-1] == '$Provenance du signal :':
                continue
            od_calc.write(key, od_params.read(key))

        """ Modification 'en place'. """
        d_calc[self.s_id] = dict(od_calc)

    def get_yaml_files(self):
        py_file = os.path.basename(self.child_file)
        seeder = self.child_file.replace(py_file, f'{py_file[:-3]}_seeder.yaml')

        path_node = f"{__file__.split('pc')[0]}backups/{self.o_scene.graph_name}/node{self.id}"
        extended_params = os.path.abspath(f'{path_node}/{self.id}-{py_file[:-3]}.yaml')

        return extended_params, seeder

    def yaml(self):
        """ Méthode appelée lorsqu'on clique sur le bouton '🔆'. Lancement de l'appli associée à '.yaml'. """
        extended_params, seeder_file = self.get_yaml_files()
        path_node = os.path.dirname(extended_params)
        os.makedirs(path_node, exist_ok=True)
        if not os.path.exists(extended_params):
            """ Création du fichier de paramètress étendus en yaml : copie du seeder si possible, sinon vide. """
            if os.path.exists(seeder_file):
                shutil.copyfile(seeder_file, extended_params)
            else:
                with open(extended_params, 'w'):
                    pass
        if not os.path.exists(extended_params):
            return
        """ Lancement du fichier avec application associée. """
        os.startfile(extended_params)

    def rebuild(self, b_input=True):
        """ Reconstruction du node suite à un changement dynamique du nombre d'entrées/sorties. """
        """ Suppression des (sockets + edges) de ce node. """
        lo_sockets = self.lo_sockets_in if b_input else self.lo_sockets_out
        for o_socket in lo_sockets:
            """ Suppression des éventuels edges connectés à ce socket. """
            for o_gredge in list(o_socket.get_gredges()):
                self.o_scene.o_grscene.removeItem(o_gredge)
            """ Suppression du socket. """
            self.o_scene.o_grscene.removeItem(o_socket.o_grsocket)
            o_socket.o_grsocket = None  # Suppression de l'objet.

        """ Nettoyage de la liste des entrées/sorties. """
        lo_sockets.clear()

        """ Redessine le node avec sa nouvelle hauteur. """
        ld_inputs, ld_outputs = self.deepcopy(self.ld_inputs), self.deepcopy(self.ld_outputs)
        nb_inputs, nb_outputs = len(ld_inputs), len(ld_outputs)
        h_title = self.d_display['geometry']['h_title']
        first_y = 16 * (1 + (h_title + 4) // 16)
        self.height = first_y + max(nb_inputs, nb_outputs) * 16 - 4  # Nouvelle hauteur.
        self.o_grnode.set_outline()

        """ Régénération de sockets. """
        for pos, d_input in enumerate(ld_inputs):
            """ On complète les dictionnaires de base. Pas de séparateurs. """
            if isinstance(d_input, dict):
                indx = len(self.lo_sockets_in)
                d_input['pos'] = True, pos  # On ajoute la pos. Tuple (input: True, num_position).
                d_input['id'] = self.id, indx  # On ajoute l'identifiant. Tuple (id_node, num_socket)
                self.lo_sockets_in.append(CtrlSocket(self, d_input))  # Socket instancié et ajouté à la liste.

        for pos, d_output in enumerate(ld_outputs):
            """ On complète les dictionnaires de base. Pas de séparateurs. """
            if isinstance(d_output, dict):
                indx = len(self.lo_sockets_out)
                d_output['pos'] = False, pos  # On ajoute la pos. Tuple (output: False, num_position).
                d_output['id'] = self.id, indx  # On ajoute l'identifiant. Tuple (id_node, num_socket)
                self.lo_sockets_out.append(CtrlSocket(self, d_output))  # Socket instancié et ajouté à la liste.

        """ Régénération des edges. """
        self.o_scene.show_edges()
        self.o_scene.save_edges()

        """ Dockable des paramètres : le nombre d'entrées a changé. """
        self.o_params.set_params()
        self.dt.delay(self.o_scene.show_params, 2000)  # delay : Permet de continuer la saisie dans le dockable.

    def refresh(self, l_keys):
        if l_keys[-1] == '🔆':
            self.yaml()
        elif l_keys[-1] == 'Titre du node':
            """ Titre du node. """
            self.o_grnode.set_title()

        elif l_keys[-2] == 'Couleurs':
            """ Couleur des sockets et des edges. """
            self.set_colors(l_keys[1:])

        else:
            """ Fin de refresh - Appel conditionnel à infrastructure(). """
            if self.b_chk and (l_keys[0] == 'infrastructure' or l_keys[-1] == 'Signal actif'):
                """ Les nodes de type générateur doivent insérer le mot 'infrastructure'
                en 1ère place dans la liste l_keys[], car ils n'ont pas 'Signal actif' dans leurs paramètres. """
                self.o_scene.infrastructure()

            """ Tous les paramètres, autres que le titre et les couleurs, influencent les calculs. """
            self.o_scene.recalculate()


class CtrlCalcul(Helper):
    """ Attributs de classe. """
    d_servers = dict()      # Dictionnaire de classe.
    od_calcs = Dictionary()

    def __init__(self):
        self.id = None
        self.s_id = None
        self.compute = None
        self.o_yaml = None
        self.l_mykey = list()
        self.np_array = None
        self.b_forcing_calc = False
        self.od_descr = Dictionary()
        self.od_mydic = Dictionary()
        self.nb_points = self.len_buffer
        self.no_params = ['Compute', 'Yaml file', 'Compile from', 'Checked']

    def setup_server(self, id_server):
        self.id = id_server
        self.l_mykey = self.get_keys_from_idnode(id_server)  # Clé dans self.od_calcs.
        self.od_mydic = Dictionary(self.od_calcs.read(self.l_mykey))
        self.o_yaml.yaml_file = self.od_mydic.read(['Yaml file'])
        self.compute = self.od_mydic.read(['Compute'])
        self.s_id = f"{self.compute}{self.id}"
        self.o_yaml.update_odyaml()  # Update du super-dictionnaire des paramètres (self.o_yaml.od_yaml).

    @property
    def len_buffer(self):
        for l_key in self.od_calcs.key_list():
            """ Recherche dans dictionnaire global (tous les nodes). """
            if l_key[-1] == 'Abscisse':
                len_buffer = self.od_calcs.read(l_key)
                break
        else:
            len_buffer = 10**5
        return int(len_buffer)

    @property
    def dt_servers_in(self):
        """ - Le dictionnaire retourné contient autant de clés que d'entrées connectées et actives.
            - Chaque valeur est un tuple : (o_server, signature, edge). """

        """ 1 - Récupération de la liste de tous les edges dans self.od_calcs. """
        s_edges = set()
        ll_keys = self.od_calcs.key_list()
        for l_keys in ll_keys:
            if l_keys[-1] == 'edges':
                s_edges.update(self.od_calcs.read(l_keys))

        """ 2 - Instance et signature des nodes-serveurs en amont connectés aux entrées. """
        d_servers = dict()
        for edge in list(s_edges):
            if edge[1][0] == self.id:
                """ edge = Edge connecté à l'entrée de self. """
                o_server, signature = self.get_server(edge[0][0])
                d_servers[f'e{edge[1][1]}'] = o_server, signature, edge

        return d_servers

    @classmethod
    def get_oserver(cls, id_server, compile_from):
        """ Ce code est appelé par un node-client. Il aura un serveur par entrée.
            - Chaque node-serveur est associé à une instance de classe Calcul (dérivée de CtrlCalcul).
            - Au premier passage concernant ce node-serveur, l'objet o_server n'existe pas.
            - Dans ce cas, on le crée par compilation dynamique et on l'enregistre dans un dictionnaire.
            - Le dictionnaire est un dictionnaire de classe : cls.d_servers.
            - Cela permet d'éviter de créer une nouvelle instance si celle-ci existe déjà.
        """
        server_sid = f"{compile_from.split('.')[-1]}{id_server}"    # Clé du dictionnaire des serveurs. ex : macd3
        if server_sid not in cls.d_servers:
            """ Compilation dynamique. Si l'objet o_server n'existe pas, on le crée."""
            d_context = {}
            code = f"from {compile_from} import Calcul; o_server=Calcul()"
            try:
                compiled = compile(code, '<string>', 'exec')
                eval(compiled, d_context)
                cls.d_servers[server_sid] = d_context['o_server']
            except (Exception,) as err:
                print(server_sid, ': Erreur de compilation dans', compile_from, '\n', err)
                return None
        return cls.d_servers[server_sid]

    def get_keys_from_idnode(self, id_node):
        """
        @param id_node: ex : 1
        @return: Renvoie la clé d'accès au node 'node_sid', dans le dictionnaire général self.od_calcs.
        """
        node_sid = f'Node{id_node}'     # Ex : 'Node1'
        for l_key in self.od_calcs.key_list():
            if l_key[1] == node_sid:
                return l_key[:2]  # ['calc_*', 'Node*'] Clé du node-serveur dans le dictionnaire général self.od_calcs.

    def get_server(self, id_server):
        l_key_node = self.get_keys_from_idnode(id_server)
        compile_from = self.od_calcs.read(l_key_node + ['Compile from'])
        if compile_from is None:
            return None
        child = self.od_calcs.read(l_key_node + ['Expérience'], '')
        compile_from += f'0{child}'[-2:] if isinstance(child, int) else ''
        o_server = self.get_oserver(id_server, compile_from)
        if o_server is None:
            return None, None
        o_server.setup_server(id_server)
        return o_server, o_server.get_signature()

    def get_signature(self):
        """ La valeur de retour est un hash de od_yaml, od_mydic, liste des signatures des nodes-serveurs. """
        l_signs_in = list()
        for server_in in self.dt_servers_in.values():
            """ server_in = tuple (instance, signature) du node-serveur. """
            l_signs_in.append(server_in[1])     # server_in[1] = signature.

        """ Assemblage des constituants de la signature. """
        params = (self.o_yaml.od_yaml, self.od_mydic, l_signs_in, self.len_buffer)
        return f'{self.id}-{hashlib.sha256(str(params).encode("utf-8")).hexdigest()}'

    def calculation(self):
        """ Ici node-serveur. Première méthode appelée par le node-client en aval, avant de commencer ses calculs. """
        """ 1 - Chargement des derniers datas mémorisés en *.calc (pickle) : self.od_descr et self.np_array. """
        self.load_last_datas()   # Super-dictionnaire et tableau numpy vides si *.calc inexistant ou erroné.

        """ 2 - Création du dict. de description actualisé od_descr, aux fins de comparaison avec self.od_descr. """
        od_descr = self.setup_od_descr()

        """ 3 - Comparaison sélective des descriptifs now et ante. Si aucune différence, return (calculs inutiles). """
        self.nb_points = self.len_buffer
        if self.now_equal_ante(od_descr):
            return
        self.od_descr = od_descr

        """ 4 - Propagation des calculs en amont. """
        min_points = self.len_buffer
        for o_server, _, edge in self.dt_servers_in.values():
            """ Bien que ce soit cette même méthode, ce n'est pas une récursivité, mais une propagation amont."""
            o_server.calculation()
            min_points = min(min_points, o_server.nb_points)
            if o_server.b_forcing_calc:
                self.b_forcing_calc = True

        """ 5 - self.np_array vide. """
        nb_cols = len([key for key in list(self.od_descr.keys()) if key.startswith(f'{self.s_id}-')])
        self.np_array = np.full((self.len_buffer, nb_cols+1), fill_value=np.nan, dtype=np.float32)  # nan
        self.np_array[:, 0] = 0        # col0 = status (0 ou 1)

        """ 6 - Traitement des calculs.  """
        self.pre_process()
        self.nb_points = min(self.nb_points, min_points)

        """ 7 - Persistance en *.calc (pickle). """
        self.create_out_calc()

    def load_last_datas(self):
        """ Ici, node-serveur. Récupération des derniers datas depuis son fichier out_*.calc. Les datas sont :
                - Un dictionnaire descriptif des consignes : 'self.od_descr'.
                - Un tableau numpy : 'self.np_array'.
            - Pour certains nodes, les datas ne sont pas dans out_*.cal, mais dans une base de données.
                - Le cas échéant, cette méthode sera surchargée. """
        server_root = os.path.dirname(self.o_yaml.yaml_file)
        os.makedirs(server_root, exist_ok=True)
        out_file = os.path.normpath(f'{server_root}/out.calc')

        try:
            with open(out_file, 'rb') as calc:
                self.od_descr, self.np_array = pickle.load(calc)
                if not self.od_descr or self.np_array.size == 0 or self.np_array.shape[0] != self.len_buffer:
                    raise()     # On force le passage par except.
        except (Exception,):
            """ Initialisation. """
            self.od_descr, self.np_array = self.new_od(), np.empty((self.len_buffer, 0))
        if not self.dt_servers_in:                                      # Dans la méthode suivante setup_od_descr() ...
            self.od_mydic.write(['self_server', 'Signal actif'], True)  # ... voir if l_keys[-1] != 'Signal actif':

    def setup_od_descr(self):
        """ Création du super-dictionnaire descriptif des calculs : od_descr.
        - Une clé pour chaque signal, actif ou pas, ayant exactement le même libellé que celui
            qui est affiché dans le dockable du node-client. Exemple pour un node 'Signaux' : 'Signaux1-Sinus'
            - La valeur de cette clé est un dictionnaire ayant pour clés obligatoires :
                - Diverses valeurs, spécifiques au signal, nécessaires aux calculs.
                - num_col : Entier. Défini au dernier passage, sinon -1, destinée aux nodes-clients.
                - actif : Booléen.
                - notes : Aucune utilité en programmation, à l'attention du développeur.
        - A la racine du dictionnaire, outre les clés de signaux, il y a :
            - signature : signature sha256. Contient un assemblage des paramètres + signatures des nodes-serveurs.
                        Cette signature-composite nous permet de savoir si les signaux entrants ont changé.
            - roots : liste de tuples, autant que de signaux en entrée : (name_input, key_dock)
                        Ex : name_input = 'e0', 'e1', ...       key_dock = ADD4-Poisson-Signal
            - Diverses valeurs globales, spécifiques au node, nécessaires aux calculs.

        - Valeur de retour : od_descr.
        """
        od_descr = Dictionary()
        """ Signature = ma signature + signatures des nodes-serveurs. """
        od_descr['signature'] = self.get_signature()
        od_descr['roots'] = list()

        num_col = 0
        for l_keys in self.od_mydic.key_list():
            if l_keys[-1] != 'Signal actif':
                continue

            key = l_keys[:-1]
            val = self.od_mydic.read(key)

            name_input = 'e' + l_keys[0][-1] if l_keys[0].startswith('Entrée ') else 'e0'
            root_key = self.join(*[self.s_id] + key[-1].split('-')[1:])
            if val.get('Signal actif', False):
                od_descr['roots'].append((name_input, root_key))

            od_descr['name_input'] = name_input
            self.descr_signal(od_descr, val, root_key)
            del od_descr['name_input']
            for key_dock in od_descr.keys():
                if key_dock.startswith(root_key):
                    if od_descr.read([key_dock, 'num_col']) is None:
                        num_col += 1
                        od_descr.write([key_dock, 'num_col'], num_col)
                    if key_dock.endswith('-Original'):
                        try:
                            od_descr.write([key_dock, 'actif'], val['Original'])
                        except (Exception,):
                            pass
                    elif od_descr.read([key_dock, 'actif']) is None:
                        od_descr.write([key_dock, 'actif'], val['Signal actif'])

        return od_descr

    def now_equal_ante(self, od_descr):
        l_no_compare = ['actif']
        for key_dock in od_descr.key_list():
            if key_dock[-1] in l_no_compare:
                continue
            elif key_dock[0] == 'changed':
                """ Passage forcé dans descr_signal() du node. """
                return False
            elif key_dock[-1] == 'signature' and od_descr.read(key_dock) != self.od_descr.read(key_dock):
                return False
            elif od_descr.read(key_dock) != self.od_descr.read(key_dock):
                return False
        else:
            for o_server, _, _ in self.dt_servers_in.values():
                if o_server.np_array is None:
                    return False
            else:
                return True

    def descr_signal(self, od_descr, val, root_key):
        """ Méthode OBLIGATOIREMENT surchargée par les classes dérivées. """
        """ - Création du super-dictionnaire de description 'od_descr'.
                - Les clés de 'od_descr' devront contenir les arguments nécessaires aux calculs.
                - Les calculs seront effectués par pre_process(), ou, en mode 'générateur', par get_matrix().
            - {key_dock} doit avoir EXACTEMENT la même orthographe que
              la clé affichée dans le dockable des paramètres du node CLIENT
            - Sources EXCLUSIVES : 'self.od_mydic' et 'self.o_yaml'. 
            
            - Dynamique - Cette méthode est appelée en boucle par la classe-mère :
                - Une fois par signal aux entrées (actif ou pas).
                - La construction du super-dictionnaire se fait donc par couches successives.
                - Cela ne signifie pas qu'il y a autant de signaux en sortie qu'en entrée !
                    - Ces 2 nombres sont INDÉPENDANTS !
                    - En effet, plusieurs signaux d'entrée peuvent contribuer pour un seul signal de sortie ...
                    - ... et/ou, inversement, un signal d'entrée peut produire plusieurs signaux de sortie.

            ************************* IMPORTANT *************************
            - Le coding se fera par itérations successives :
                - Commencer à coder les calculs (pre_process() ou calculate()), qui parcourra ce dictionnaire 'od_descr'
                - Si les calculs nécessitent une information absente dans 'od_descr', on revient ici pour l'ajouter.
                - On prolonge le coding des calculs, puis on boucle ici ...
                - ... jusqu'à la fin. 
        """
        raise SystemExit(f"{self.s_id} : ctrl_node.py > CtrlCalcul."
                         "\nLa méthode descr_signal() doit obligatoirement être surchargée.")

    def pre_process(self):
        """ Méthode surchargée par les classes 'générateur' dérivées : Signaux, Aléatoire, Histos, ... """
        """ Cette méthode peut également être surchargée par d'autres classes dérivées, mais attention :
            - La classe dérivée doit utiliser EXCLUSIVEMENT la méthode pre_process() OU la méthode get_matrix() :
                - pre_process() : Le tableau numpy va être affecté à 100% ...
                            |_ ... ce qui risque d'être chronophage pour certains calculs.
                - get_matrix() : Juste une toute petite partie du tableau numpy va être remplie en live, ...
                            |_ ... selon les besoins des nodes-clients, grâce à l'utilisation de générateurs python.
                            |_ Dans ce cas, la réactivité sera excellente, même si les calculs sont complexes. 
            
            Méthode pre_process() :
            ===================
            - La récupération des paramètres se fait exclusivement depuis le super-dictionnaire self.od_descr.
            - Le tableau numpy existe avec la bonne shape, mais est entièrement vide :
                - Autant de colonnes que de signaux, plus une : la colonne de status, en position 0.
                - Les valeurs sont des np.nan ...
                - ... sauf la colonne 0 qui est remplie de '0'.
                - Les valeurs sont produites :
                    - Par calculs, qui utilisent des fonctions "prêtes à l'emploi", du module 'indicators'.
                    - Par lecture d'une base de données.
        """
        pass

    def get_matrix(self, pointer, nb_lines=0):
        """
            @param pointer: Pointeur "Depuis", si pointer < 0 , la totalité du tableau est retournée.
            @param nb_lines: "Jusqu'a" = pointer + nb_lignes.
            - Pour certains nodes, les datas ne sont pas dans 'self.np_array', mais en base de données.
                - Le cas échéant, cette méthode sera surchargée.
            - Si le node n'est pas de type 'générateur', cette méthode est également surchargée.
                - Le traitement est court, ponctuel et partiel.
                - A l'issue du traitement, une toute petite partie de tableau numpy 'self.np_array' a été mise à jour.
                - La colonne 0 contient des flags (0 ou 1), levés lorsque les lignes correspondantes ont été traitées.
                - Le retour est une matrice ayant le même nombre de colonnes que np_array et {nb_lines} lignes.
            - Si les données demandées sont dans self.np_array, on les transmet directement.
            - Sinon, on les génère, on les place dans self.np_array, puis on les transmet.
            - Les paramètres sont à récupérer EXCLUSIVEMENT dans self.od_descr.
            - Les données d'entrée sont demandées aux nodes-serveurs des entrées.
        """
        _from, _to = pointer, pointer+nb_lines
        """ Calculer si 'forcing' ou si le status = 0 (si au moins une valeur de la colonne 0 est à 0). """
        if self.b_forcing_calc or np.prod(self.np_array[_from: _to, 0]) == 0:
            """ Update de self.np_array. Début de transaction. """
            self.process(pointer, nb_lines)     # Traitement de tous les signaux aux entrées.
            """ flag = 1 (fait) <--- Status. Fin de transaction. """
            self.np_array[_from: _to, 0] = 1            # <-- Écriture du status = 1 (en colonne 0).
        return self.np_array if pointer < 0 else self.np_array[_from: _to]     # Tout ou slice.

    def process(self, pointer, nb_lines):
        od_buffer = self.new_od()
        for name_input, root_key in self.od_descr.get('roots'):
            o_server = self.dt_servers_in[name_input][0]  # [0] = server, [1] = signature, [2] = edge
            """ On demande au serveur une matrice un peu plus grande que nécessaire,
            pour permettre l'amorce de certains calculs. On insère {previous_values} valeurs au début. """
            previous_values = min(pointer, int(self.od_descr.read(['max_length', root_key], 0)))
            np_matrix = o_server.get_matrix(pointer - previous_values, nb_lines + previous_values)
            num_col_server = o_server.get_num_col(root_key)
            vector_in = np_matrix[:, num_col_server]
            od_buffer['name_input'] = name_input
            self.calculate(pointer, nb_lines, root_key, vector_in, od_buffer)
            del od_buffer['name_input']

        if od_buffer:
            """ Si dictionnaire non vide, modifié par calculate(), on termine le traitement. """
            self.post_process(pointer, nb_lines, od_buffer)

    def calculate(self, pointer, nb_lines, root_key, vector_in, od_buffer):
        """ Pas de contrôle de surcharge pour les nodes sans entrées (type générateur, ...) """
        if self.dt_servers_in:
            raise SystemExit(f"{self.s_id} : ctrl_node.py > CtrlCalcul."
                             "\nLa méthode calculate() doit obligatoirement être surchargée.")

    def post_process(self, pointer, nb_lines, od_buffer):
        """ - Passage unique, après process() afin de finaliser certains calculs ...
                ... incluant d'autres signaux que le signal en cours.
            - Pré-traitement spécifique. """
        pass

    def create_out_calc(self):
        """ Création du fichier-résultat out_*.calc = (od_descr, liste de tableaux numpy). """

        """ 1 - Contrôle - Le tableau nd_array doit avoir une shape à 2 dimensions. """
        """ Correction nécessaire si le tableau est un vecteur. """
        if len(self.np_array.shape) == 1:        # <-- C'est un vecteur, on reshape.
            self.np_array = self.np_array.reshape(self.np_array.shape[0], 1)

        """ 2 - Persistance. """
        server_root = os.path.dirname(self.od_mydic['Yaml file'])
        out_file = os.path.normpath(f'{server_root}/out.calc')

        try:
            with open(out_file, 'wb') as calc:
                pickle.dump((self.od_descr, self.np_array), calc, pickle.HIGHEST_PROTOCOL)
        except (Exception,) as err:
            print(err)
            return False
        return True

    def add_vector(self, key_dock, num_col, vector):
        if vector is None:
            return
        vector = vector.reshape(vector.shape[0], 1)
        add_nan = np.full((self.len_buffer - vector.shape[0], 1), np.nan)
        vector = np.vstack((add_nan, vector))
        if num_col is None or num_col < 0:
            """ Nouvelle colonne d'indice num_col (égal au nombre actuel de colonnes). """
            num_col = self.np_array.shape[1]
            self.np_array = np.hstack((self.np_array, vector))
        else:
            """ Surcharge de la colonne num_col. """
            self.np_array[:, num_col:num_col + 1] = vector
        self.od_descr.write([key_dock, 'num_col'], num_col)

    def get_num_col(self, client_key):                          # client_key = MM4-Normal-Original
        sliced_key = self.join(*client_key.split('-')[1:])      # sliced_key =     Normal-Original (Début retiré).
        if sliced_key != '':
            for l_key in self.od_descr.key_list():              # [Aléatoire1-Normal, 'num_col']
                if l_key[-1] == 'num_col' and l_key[-2].endswith(sliced_key):
                    return self.od_descr.read(l_key)

    def get_vector_in(self, client_key):                        # client_key =  MM4-Normal-Original
        return self.np_array[:, self.get_num_col(client_key)]

 


Vérification

4 - Affichage de courbes :

Retour au plan

  • Avant toute chose, s'assurer que les bases de données de l'instrument (devise) en cours sont à jour.
  • L'entretien doit être hebdomadaire : téléchargement et synchronisation avec les fichiers csv.
    • Vous pouvez automatiser cette maintenance en ajoutant du code au lancement de l'application.
    • Cette option n'est pas traitée ici, à vous de jouer.
  • Mise en garde : Pendant ces opérations de téléchargement et de synchronisation, la base de données est verrouillée.
    • Attendre la fin de ces opérations pour afficher les courbes.
  • Vérifier que le code des nodes est à jour. Par sécurité, et pour être conforme, les voici :
  • /nodes/indicateurs/moy_mobile/moy_mobile.py :
    # Imports internes
    import functions.indicators as indic
    from nodes.indicateurs.moy_mobile.moy_mobile_yaml import YamlParams
    from pc.ctrl_node import CtrlNode, CtrlCalcul
    
    d_datas = {
        'name': 'MMobile',  # Label affiché sous l'icone.
        'icon': 'moy_mobile.png',  # Icone affichée.
    }
    
    
    class Node(CtrlNode):
        def __init__(self, o_scene, s_id, pos):
            super().__init__(o_scene, s_id, pos)
            self.type = 'MM'
            self.setup()
    
        def setup(self, child_file=__file__):
            self.l_short_circuits = [(0, 0)]  # Entrée N°0 'court-circuitable' avec la sortie N°0.
            super().setup(child_file)
    
        @property
        def ld_inputs(self):
            return [{
                'label': '',
                'label_pos': (6, -10)
            }]
    
        @property
        def ld_outputs(self):
            return [{
                'label': 'Sortie',
                'label_pos': (-38, -10)
            }]
    
        def my_params(self, context):
            return {
                'Original': True,
                'Type de MM': ['SMA', {'values': ['SMA', 'EMA', 'SMMA', 'LWMA']}],
                "Périodes (sep=',')": '14',
            }
    
        def my_signals(self, l_signals_in, num_socket_out):
            """ Faisceau de signaux (l_signals) délivré à la sortie. """
            l_signals = list()      # Liste de tuples à 4 valeurs.
            for signals_in in l_signals_in[0]:
                typ_id_from, signal_ante_from, signal_now_from, signal_source_from, \
                 _, signal_title, typ_id, signal_ante, signal_source = signals_in[:9]
                try:
                    periods = [int(p.strip()) for p in self.get_param([signal_title, "Périodes (sep=',')"]).split(',')]
                except (Exception,):
                    periods = [14]
                if self.get_param([signal_title, 'Original'], True):
                    l_signals.append((typ_id, signal_ante, 'Original', signal_source))
                for num, period in enumerate(periods):
                    legend = f"{signal_ante}-{self.get_param([signal_title, 'Type de MM'], 'SMA')}{period}"
                    signal_now = f'sign{num}'
                    signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
                    typ_id, signal_ante = f'{self.type}{self.id}', self.join(signal_ante_from, signal_now_from)
                    l_signals.append((typ_id, signal_ante, signal_now, signal_source, legend))
            return l_signals
    
        def refresh(self, l_keys):
            """ Code spécifique. """
            if self.need_update(l_keys):
                self.lo_sockets_out[0].to_update()  # Une seule sortie, N° 0.
            super().refresh(l_keys)
    
    
    class Calcul(CtrlCalcul):
        """ ********** Le code ci-dessous ne concerne pas le poste de contrôle, mais seulement les calculs. ********** """
        def __init__(self):
            super().__init__()
            self.o_yaml = YamlParams(self)
    
        def descr_signal(self, od_descr, val, root_key):
            """ Voir docstring dans la classe-mère. """
            signal_name = root_key.split('-')[1]
            """ Original. """
            key_dock = f"{root_key}-Original"
            od_descr.write([key_dock, 'notes'], f'{signal_name} original')
    
            typ = val['Type de MM']
            l_periods = [int(p.strip()) for p in val["Périodes (sep=',')"].split(',')]
            od_descr.write(['max_length', root_key], int(1.5 * max(l_periods)))
            for num, period in enumerate(l_periods):
                """ key_dock doit avoir EXACTEMENT la même orthographe que la clé affichée
                dans le dockable des paramètres du node CLIENT. """
                key_dock = f'{root_key}-sign{num}'
                od_descr.write([key_dock, 'notes'], f'{signal_name} {typ}{period}')
                od_descr.write([key_dock, 'typ'], typ)
                od_descr.write([key_dock, 'period'], period)
                od_descr.write([key_dock, 'kwargs', 'nb_last'], min(period, self.o_yaml.smma_nb_last))
                od_descr.write([key_dock, 'kwargs', 'ratio'], self.o_yaml.lwma_ratio)
    
        def pre_process(self):
            """ - Passage unique, après descr_signal() et avant get_matrix(). voir docstring dans la classe-mère.
                - Pré-traitement spécifique. """
            pass    # Cette méthode peut être supprimée.
    
        def calculate(self, pointer, nb_lines, root_key, vector_in, od_buffer):
            """ Passage multiple. """
            _to = pointer + nb_lines
            """ Original. """
            key_dock = root_key + '-Original'
            num_col = self.od_descr.read([key_dock, 'num_col'])
            self.np_array[pointer: _to, num_col] = vector_in[-nb_lines:]
    
            """ Si plusieurs moyennes, key_dock se termine par 'sign0', 'sign1', ... """
            for key_dock, d_val in self.od_descr.items():
                if key_dock.startswith(root_key+'-sign'):
                    num_col = d_val.get('num_col')
                    typ = d_val.get('typ')
                    period = d_val.get('period')
                    kwargs = d_val.get('kwargs')
                    vector_out = indic.get_mm(vector_in, typ=typ, period=period, **kwargs)
                    nb_valid_lines = min(len(vector_out), nb_lines)
                    self.np_array[_to - nb_valid_lines: _to, num_col] = vector_out[-nb_valid_lines:]
    

     

  • /nodes/indicateurs/macd/macd.py :
    # Imports internes.
    from pc.ctrl_node import CtrlNode, CtrlCalcul
    from nodes.indicateurs.macd.macd_yaml import YamlParams
    import functions.indicators as indic
    
    d_datas = {
        'name': 'MACD',         # Label affiché sous l'icone.
        'icon': 'macd.png',     # Icone affichée.
    }
    
    
    class Node(CtrlNode):
        def __init__(self, o_scene, s_id, pos):
            super().__init__(o_scene, s_id, pos)
            self.type = 'MACD'
            self.setup()
    
        def setup(self, child_file=__file__):
            """ Listes des dictionnaires de base attribués aux sockets. """
            self.l_short_circuits = [(0, 0), (0, 1)]  # e0 -> s0 et e0 -> s1
            super().setup(child_file)
    
        @property
        def ld_inputs(self):
            return [
                {  # Liste de 1 dictionnaire.
                    'label': 'Entrée',
                    'label_pos': (6, -10)
                }
            ]
    
        @property
        def ld_outputs(self):
            return [  # Liste de 2 dictionnaires.
                {
                    'label': 'Sortie',
                    'label_pos': (-38, -10)
                },
                'sep',  # Séparateur (facultatif).
                {
                    'label': 'Oscillateur',
                    'label_pos': (-59, -10)
                }
            ]
    
        def my_params(self, context):
            """ Code appelant : CtrlNode.dynamic_params() """
            return {
                'Original': True,
                'Type de MM longue': ['SMA', {'values': ['SMA', 'EMA', 'SMMA', 'LWMA']}],
                'Moyenne longue': [26, {'compactHeight': False}],
                'Type de MM courte': ['SMA', {'values': ['SMA', 'EMA', 'SMMA', 'LWMA']}],
                'Moyenne courte': [12, {'compactHeight': False}],
                'Type de MM Signal': ['SMA', {'values': ['SMA', 'EMA', 'SMMA', 'LWMA']}],
                'Signal': [9, {'compactHeight': False}],
            }
    
        def my_signals(self, l_signals_in, num_socket_out):
            """ Retourne un faisceau de signaux (l_signals) délivré à la sortie N° num_socket_out. """
            l_signals = [[], []]        # 2 sorties.
            for signals_in in l_signals_in[0]:
                typ_id_from, signal_ante_from, signal_now_from, signal_source_from, \
                 _, signal_title, typ_id, signal_ante, signal_source = signals_in
    
                signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
                typ_id, signal_ante = f'{self.type}{self.id}', self.join(signal_ante_from, signal_now_from)
    
                """ Sortie 0 : 2 ou 3 signaux. """
                orig = ['Original'] if self.get_param([signal_title, 'Original'], True) else []
                for signal_now in orig + ["Longue", "Courte"]:
                    l_signals[0].append((typ_id, signal_ante, signal_now, signal_source))
    
                """ Sortie 1 : 3 signaux. !!! Attention, ce n'est pas le signe moins, mais un caractère spécial !!!"""
                for signal_now in ['(Courte–Longue)', 'Signal', 'MACD']:
                    l_signals[1].append((typ_id, signal_ante, signal_now, signal_source))
    
            return l_signals[num_socket_out]
    
        def refresh(self, l_keys):
            """ Ex : l_keys = ["Paramètres du node 'Node0'", 'Signaux-1 : Sinus', 'Signal actif'] """
            if self.need_update(l_keys):
                """ Update (si l'état actif a changé) OU (si actif ET un des paramètres a changé). """
                self.lo_sockets_out[0].to_update()
                self.lo_sockets_out[1].to_update()
            super().refresh(l_keys)
    
    
    class Calcul(CtrlCalcul):
        """ ********** Le code ci-dessous ne concerne pas le poste de contrôle, mais seulement les calculs. ********** """
        def __init__(self):
            super().__init__()
            self.o_yaml = YamlParams(self)
    
        # noinspection DuplicatedCode
        def descr_signal(self, od_descr, val, root_key):
            """ Voir docstring dans la classe-mère. """
            signal_name = root_key.split('-')[1]
    
            """ Original. """
            key_dock = f"{root_key}-Original"
            od_descr.write([key_dock, 'notes'], f'{signal_name} original')
    
            """ Moyenne longue. """
            per_long = val.get('Moyenne longue', 26)
            key_dock = f"{root_key}-Longue"
            typ = val.get("Type de MM longue", 'SMA')
            od_descr.write([key_dock, 'notes'], f'{signal_name}-Longue {typ}{per_long}')
            od_descr.write([key_dock, 'typ'], typ)
            od_descr.write([key_dock, 'period'], per_long)
            od_descr.write([key_dock, 'kwargs', 'nb_last'], min(per_long, self.o_yaml.long_smma_nb_last))
            od_descr.write([key_dock, 'kwargs', 'ratio'], self.o_yaml.long_lwma_ratio)
    
            """ Moyenne courte. """
            per_short = val.get('Moyenne courte', 12)
            key_dock = f"{root_key}-Courte"
            typ = val.get("Type de MM courte", 'SMA')
            od_descr.write([key_dock, 'notes'], f'{signal_name}-Courte {typ}{per_short}')
            od_descr.write([key_dock, 'typ'], typ)
            od_descr.write([key_dock, 'period'], per_short)
            od_descr.write([key_dock, 'kwargs', 'nb_last'], min(per_short, self.o_yaml.short_smma_nb_last))
            od_descr.write([key_dock, 'kwargs', 'ratio'], self.o_yaml.short_lwma_ratio)
    
            """ Courte - Longue. """       # !!! Attention, ce n'est pas un tiret, mais un caractère spécial !!!
            """ Les key_dock sont traitées par la suite par des split('-'). Le tiret ne doit donc pas être utilisé. """
            key_dock = f"{root_key}-(Courte–Longue)"    # (Courte{caractère spécial}Longue)
            od_descr.write([key_dock, 'notes'], f'{signal_name}-Courte - {signal_name}-Longue')
    
            """ Signal. """
            per_signal = val.get('Signal', 9)
            key_dock = f"{root_key}-Signal"
            typ = val.get("Type de MM Signal", 'SMA')
            od_descr.write([key_dock, 'notes'], f'{signal_name}-Signal {typ}{per_signal}')
            od_descr.write([key_dock, 'typ'], val.get("Type de MM Signal", 'SMA'))
            od_descr.write([key_dock, 'period'], per_signal)
            od_descr.write([key_dock, 'kwargs', 'nb_last'], min(per_signal, self.o_yaml.signal_smma_nb_last))
            od_descr.write([key_dock, 'kwargs', 'ratio'], self.o_yaml.signal_lwma_ratio)
    
            """ MACD. """
            key_dock = f"{root_key}-MACD"
            od_descr.write([key_dock, 'notes'], f'{signal_name} MACD')
    
            """ La plus grande période (augmentée de 50% par sécurité). """
            od_descr.write(['max_length', root_key], 1.5 * max(per_long, per_short, per_signal))
    
        def pre_process(self):
            """ Passage unique, après descr_signal() et avant get_matrix(). voir docstring dans la classe-mère. """
            pass
    
        def calculate(self, pointer, nb_lines, root_key, vector_in, od_buffer):
            """ Passage mutiple - Voir docstring dans la classe-mère. """
            _to = pointer + nb_lines
            od_val = self.new_od()
            l_cols = list()
    
            """ Phase 1 : récupération des paramètres du macd (od_val) et des n° de colonnes (l_cols). """
            """ Original. """
            key_dock = root_key + '-Original'
            l_cols.append(self.od_descr.read([key_dock, 'num_col']))
    
            """ Moyenne longue. """
            key_dock = root_key + '-Longue'
            l_cols.append(self.od_descr.read([key_dock, 'num_col']))
            od_val.write(['long', 'typ'], self.od_descr.read([key_dock, 'typ']))
            od_val.write(['long', 'period'], self.od_descr.read([key_dock, 'period']))
            od_val.write(['long', 'nb_last'], self.od_descr.read([key_dock, 'kwargs', 'nb_last']))
            od_val.write(['long', 'ratio'], self.od_descr.read([key_dock, 'kwargs', 'ratio']))
    
            """ Moyenne courte. """
            key_dock = root_key + '-Courte'
            l_cols.append(self.od_descr.read([key_dock, 'num_col']))
            od_val.write(['short', 'typ'], self.od_descr.read([key_dock, 'typ']))
            od_val.write(['short', 'period'], self.od_descr.read([key_dock, 'period']))
            od_val.write(['short', 'nb_last'], self.od_descr.read([key_dock, 'kwargs', 'nb_last']))
            od_val.write(['short', 'ratio'], self.od_descr.read([key_dock, 'kwargs', 'ratio']))
    
            """ Différence : (Moyenne courte - Moyenne longue). """
            key_dock = root_key + '-(Courte–Longue)'     # '–' = Caractère spécial.
            l_cols.append(self.od_descr.read([key_dock, 'num_col']))
    
            """ Signal. """
            key_dock = root_key + '-Signal'
            l_cols.append(self.od_descr.read([key_dock, 'num_col']))
            od_val.write(['signal', 'typ'], self.od_descr.read([key_dock, 'typ']))
            od_val.write(['signal', 'period'], self.od_descr.read([key_dock, 'period']))
            od_val.write(['signal', 'nb_last'], self.od_descr.read([key_dock, 'kwargs', 'nb_last']))
            od_val.write(['signal', 'ratio'], self.od_descr.read([key_dock, 'kwargs', 'ratio']))
    
            """ MACD. """
            key_dock = root_key + '-MACD'     # '–' = Caractère spécial.
            l_cols.append(self.od_descr.read([key_dock, 'num_col']))
    
            """ Phase 2 : Utilisation de od_val pour obtenir le macd. """
            t_macd = indic.macd(vector_in, od_val)   # Tuple
    
            """ Phase 3 : Utilisation de t_macd et de l_cols pour updater self.np_array. """
            for i, vector_out in enumerate(t_macd):
                nb_valid_lines = min(len(vector_out), nb_lines)
                self.np_array[_to - nb_valid_lines: _to, l_cols[i]] = vector_out[-nb_valid_lines:]
    

     

  • /nodes/oscillateurs/rsi/rsi.py :
    from pc.ctrl_node import CtrlNode, CtrlCalcul
    from nodes.oscillateurs.rsi.rsi_yaml import YamlParams
    import functions.indicators as indic
    d_datas = {
        'name': 'RSI',           # Label affiché sous l'icone.
        'icon': 'rsi.png',       # Icone affichée.
    }
    
    
    class Node(CtrlNode):
        def __init__(self, o_scene, s_id, pos):
            super().__init__(o_scene, s_id, pos)
            self.type = 'RSI'
            self.setup()
    
        def setup(self, child_file=__file__):
            self.l_short_circuits = [(0, 0)]  # Entrée N°0 'court-circuitable' avec la sortie N°0.
            super().setup(child_file)
    
        @property
        def ld_inputs(self):
            return [{
                'label': '',
                'label_pos': (6, -10)
            }]
    
        @property
        def ld_outputs(self):
            return [{
                'label': 'Sortie',
                'label_pos': (-38, -10)
            }]
    
        def my_params(self, context):
            return {
                'Zone neutre': [70., {'step': 1, 'limits': (5, 95), 'suffix': '%', 'compactHeight': False}],
                "Périodes (sep=',')": '14',
            }
    
        def my_signals(self, l_signals_in, num_socket_out):
            """ Faisceau de signaux (l_signals) délivré à la sortie. """
            l_signals = list()
            for signals_in in l_signals_in[0]:
                typ_id_from, signal_ante_from, signal_now_from, signal_source_from, \
                 _, signal_title, typ_id, signal_ante, signal_source = signals_in
                periods = [int(p) for p in self.get_param([signal_title, "Périodes (sep=',')"]).split(',')]
                for period in periods:
                    signal_now = f't{period}'
                    signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
                    typ_id, signal_ante = f'{self.type}{self.id}', self.join(signal_ante_from, signal_now_from)
                    l_signals.append((typ_id, signal_ante, signal_now, signal_source))
            return l_signals
    
        def refresh(self, l_keys):
            """ Code spécifique. """
            if self.need_update(l_keys):
                self.lo_sockets_out[0].to_update()  # Une seule sortie, N° 0.
            super().refresh(l_keys)
    
    
    class Calcul(CtrlCalcul):
        """ ********** Le code ci-dessous ne concerne pas le poste de contrôle, mais seulement les calculs. ********** """
        def __init__(self):
            super().__init__()
            self.o_yaml = YamlParams(self)
    
        def descr_signal(self, od_descr, val, root_key):
            """ - Voir docstring dans la classe-mère.
                - Les clés 'low' et 'high' sont utilisées par matplotlib pour colorier les zônes extrêmes. """
            signal_name = root_key.split('-')[1]
            l_periods = [int(p.strip()) for p in val["Périodes (sep=',')"].split(',')]
            od_descr.write(['max_length', root_key], int(1.5 * max(l_periods)))
            for period in l_periods:
                key_dock = f"{root_key}-t{period}"
                """ Limites low et high. """
                low = (100 - val['Zone neutre']) / 2
                high = 100 - low
                od_descr.write([key_dock, 'notes'], f'{signal_name} {period}')
                od_descr.write([key_dock, 'period'], period)
                """ Variables 'low' et 'high' utilisables dans les paramètres étendus yaml. """
                od_descr.write([key_dock, 'low'], low)
                od_descr.write([key_dock, 'high'], high)
    
        def pre_process(self):
            """ - Passage unique, après descr_signal() et avant get_matrix(). voir docstring dans la classe-mère.
                - Pré-traitement spécifique. """
            pass
    
        def calculate(self, pointer, nb_lines, root_key, vector_in, od_buffer):
            _to = pointer + nb_lines
            """ Si plusieurs périodes de rsi, key_dock se termine par 't14', 't22', ... """
            for key_dock, d_val in self.od_descr.items():
                if key_dock.startswith(root_key+'-t'):
                    num_col = d_val.get('num_col')
                    period = d_val.get('period')
                    vector_out = indic.rsi(vector_in, period)
                    nb_valid_lines = min(len(vector_out), nb_lines)
                    self.np_array[_to - nb_valid_lines: _to, num_col] = vector_out[-nb_valid_lines:]
    

     



Ce signal est un Renko de maille 5 : chaque point est à 5 pips du voisin (+ ou -).

  • Ordonnée : la valeur du signal est exprimée en pips : c'est une unité pivot normalisée qui permet d'effectuer des comparaisons entre devises.
  • Abscisse : Les points sont affichés chronologiquement de la gauche vers la droite, mais la durée entre 2 points n'a aucun sens en mode Renko :
    • En effet, entre 2 points il peut y avoir 1 seconde ... ou 1 heure !
    • Celà n'a pas importance, puisque pour le trading, c'est le niveau qui fournit les gains, pas le temps.
  • Lorsque nous élaborons une stratégie de trading, il faut garder à l'esprit que les signaux du futur sont inconnus.
  • Bien sur, ils existent en base de données, puisque ce sont des historiques, mais les utiliser dans les calculs serait une triche.

Que se passe-t-il si l'on affiche plusieurs signaux ?

Les signaux sont sur des plans différents, à l'instar d'un paysage vu par la fenêtre d'un train :
L'arrière plan (mouvement lent) peut faire des kilomètres, le premier plan quelques centimètres.
Les signaux convergent à l'instant présent.


Exemple : moyenne mobile SMA14


Le seul point commun à toutes les courbes est à droite, à l'instant présent.


Exemple : MACD + RSI


Bonjour les codeurs !