Serveur d'historiques réels
Avant-propos
Ce tuto termine le chapitre 'Historiques'
Description

Pour la mise au point, seul un signal Renko sera utilisé, exemple : Maille 5.
show_matplotlib.py.À la fin de show_matplotlib.py :
if __name__ == '__main__':
""" Ajout d'arguments à la ligne de commande permettant un lancement autonome (pour la MAP). """
sys.argv.append('Calculs') # Nom du graphe.
sys.argv.append('0') # Plots id. (La ligne de commande ne doit contenir que du str).
sys.argv.append('-1') # Gestion de la fermeture automatique. Poste de contrôle PID : -1 en local.
mpl = ShowMatPlotLib()
Histos.Plots :
fixed_params() de ces nodes./nodes/generateurs/histos/histos.py > Node.fixed_params() :
def fixed_params(self):
return {
'Instrument': ['EUR/USD', {'values': ['EUR/USD', 'USD/JPY', 'EUR/CHF', 'USD/CAD', 'NZD/USD',
'EUR/GBP', 'EUR/JPY', 'GBP/JPY', 'GBP/CHF', 'GBP/USD']}],
'Ticks': False, # à chaque variation du signal.
'Maille 1': False, # Renko, maille de 1 pip.
'Maille 2': False, # 2 pips.
'Maille 3': False, # 3 pips.
'Maille 4': False, # 4 pips.
'Maille 5': True, # 5 pips.
'Maille 7': False, # 7 pips.
'Maille 10': False, # 10 pips.
'Maille 14': False, # 14 pips.
'Maille 20': False, # 20 pips.
'1 mn': False, # 1 minute.
'5 mn': False, # 5 minutes.
'15 mn': False, # 15 minutes.
'30 mn': False, # 30 minutes.
'1 h': False, # 1 heure.
'4 h': False, # 4 heures.
'1 j': False, # 1 jour.
'1 s': False, # 1 semaine.
'* Voir historiques ': '', # '*' = bouton.
'* Télécharger ': '',
'* Synchroniser bd ': '',
}
/nodes/afficheurs/plots/plots.py > Node.fixed_params() :
def fixed_params(self):
""" Valeurs par défaut. """
return {
"Nombre d'entrées": [1, {'step': 1, 'limits': (1, 8), 'compactHeight': False}],
'Abscisse': [50_000., {'suffix': ' points', 'limits': (1000, 300_000),
'compactHeight': False, 'siPrefix': True, 'dec': True}],
'Fenêtre': {
'Titre': 'Titre de la fenêtre',
'Interface': ['Matplotlib', {'values': list(self.scripts.keys())}]
}
}
Notez l'effet très intéressant de l'attribut dec en modifiant 'Abscisse' depuis le dockable (échelle logarithmique).
main.py)Abscisse ?
Position (sa valeur max) doit être automatiquement modifiée ...show_matplotlib.py. Voici son code.☐ Fichier complet /show/show_matplotlib.py :
# Imports externes.
from PyQt5.QtCore import QTimer
import psutil
import pickle
import numpy as np
import sys
import os
from matplotlib import pyplot as plt, rcParams
from matplotlib.widgets import Slider
# sys.path.append(os.path.dirname(sys.path[0])) # Pour MAP depuis le terminal.
# Imports internes.
from functions.utils import Utils, Dictionary, DateTime
from nodes.afficheurs.plots.matplotlib_yaml import YamlParams
from pc.ctrl_node import CtrlCalcul
# noinspection PyUnresolvedReferences
class ShowMatPlotLib(CtrlCalcul):
def __init__(self):
super().__init__()
""" Ini: récupération des arguments passés en ligne de commande. """
self.graph_name = sys.argv[1]
self.id = int(sys.argv[2])
self.parent_pid = int(sys.argv[3])
self.final_path = ''
self.ut = None
""" Watch. """
self.l_files_ante = list()
""" Matplotlib. """
self.fig = None
self.l_ax = [None for _ in range(8)] # Liste de 8 valeurs (les axes, ou graphiques : 1 par entrée)
self.b_busy = False
self.b_paused = False
self.b_reverse = False
""" Graphiques. """
self.l_inputs = list()
self.width_x = None
self.pointer = None
""" Timers. """
self.dt = DateTime()
self.listen = QTimer()
self.anim = QTimer()
""" Setup. """
self.setup()
def setup(self):
""" 1 - Chemin complet du node final 'Plots'. Ce chemin contient les modèles de calculs 'calc_*.pkl. """
bk_path = os.path.dirname(__file__).replace('show', 'backups') # Dossier de backup.
self.final_path = os.path.normpath(os.path.join(bk_path, self.graph_name, f'node{self.id}')) # 'Plots'
self.ut = Utils(path=self.final_path, file='matplotlib')
""" 2 - Aide pour la mise au point : styles et paramètres diponibles dans matplotlib. Décommenter pour voir. """
# [print(style) for style in plt.style.available] # Liste des styles disponibles.
# Dictionary(rcParams).print() # Liste des paramètres disponibles.
""" 3 - Fichier de configuration, éditable manuellement en yaml. """
self.o_yaml = YamlParams(self)
self.o_yaml.yaml_file = os.path.normpath(f'{self.final_path}/{self.id}-matplotlib.yaml')
self.o_yaml.update_odyaml()
self.o_yaml.set_rcparams(rcParams)
plt.style.use(self.o_yaml.get_style())
""" 4 - Vitesse d'animation (Modifiable : touches + et -), pause, reverse, pas à pas. """
self.anim.indx = 5 # <-- Vitesse moyenne (de 0 à 11)
speed = self.speed(self.anim.indx)
self.anim.delay = speed[0]
self.anim.step = speed[1]
""" 5 - Initialisation de matplotlib.pyplot. """
self.fig = plt.figure()
self.mgr.set_window_title('**ini**')
""" Slider 'Largeur'. https://matplotlib.org/stable/api/widgets_api.html#matplotlib.widgets.Slider """
slide_width = plt.axes([.12, .06, .77, .02]) # Taille et position du slider 'Largeur'.
slide_width.semilogx()
self.width_x = Slider(slide_width, 'Largeur', 10, 10_000, 316)
# |_ 316 est la moyenne géométrique de 10 et 10_000. 10**1=10, 10**2.5=316 (au milieu), 10**4=10_000
slide_posit = plt.axes([.12, .02, .77, .02]) # Taille et position du slider 'Position'.
self.pointer = Slider(ax=slide_posit, label='Position', valmin=0,
valmax=self.len_buffer - self.width_x.val - 2, valfmt='%0.0f')
self.pointer.set_val(-1)
""" 6 - Restauration de l'état de la fenêtre. """
self.ut.restore_state(self.mgr.window)
""" 7 - Surveillance des fichiers '.pkl'. """
self.ini_watch()
""" 8 - Événements. """
self.listen.timeout.connect(self.listener)
self.anim.timeout.connect(self.show_anim)
self.fig.canvas.mpl_connect('key_press_event', self.key_event)
self.pointer.on_changed(self.show_anim)
self.width_x.on_changed(self.set_width_x)
""" 9 - Lancement des timers. """
self.listen.start(1000)
self.anim.start(self.anim.delay)
""" 10 - Boucle d'exécution Matplotlib. """
plt.show()
@property
def mgr(self):
return plt.get_current_fig_manager()
def listener(self):
""" 1 - Si le poste de contrôle est fermé => fin programme, sauf en mode autonome. """
pass
if self.parent_pid > 0 and not psutil.pid_exists(self.parent_pid):
exit()
""" 2 - Persistance géométrie. """
self.ut.save_state(self.mgr.window)
def set_width_x(self, val):
self.set_pointer_xlim()
self.pointer.val += 1 if self.b_reverse else -1
self.show_anim()
def show_anim(self, _=0):
""" Anti ré-entrance. """
if self.b_busy:
return
self.b_busy = True
""" Pointeur commun. """
self.pointer.set_val(round(self.pointer.val + (-self.anim.step if self.b_reverse else self.anim.step)))
if self.pointer.val > self.pointer.valmax - round(self.width_x.val) - 2: # Bouclage provisoire.
self.pointer.set_val(0)
if self.pointer.val < 0: # Bouclage provisoire.
self.pointer.set_val(self.pointer.valmax - round(self.width_x.val) - 2)
""" Scrolling des entrées connectées et actives du node 'Plots' : subplots ou graphiques. """
for ax in self.l_ax:
if ax is None or not hasattr(ax, 'o_server') or ax.o_server is None:
continue
""" Nombre de points en abscisse.
Si non précisé dans le yaml, ce subplot utilisera la largeur commune, fixée par le slider 'Largeur'. """
width_x = round(self.width_x.val) # valeur du slider.
""" Avancement individuel du pointeur. """
if len(ax.lines) > 0: # Immobile si aucune courbe n'est affichée.
ax.pointer += -self.anim.step if self.b_reverse else self.anim.step
if ax.pointer > self.len_buffer - width_x: # Bouclage provisoire.
ax.pointer = 0
if ax.pointer < 0: # Bouclage provisoire.
ax.pointer = self.len_buffer - width_x
""" pointer = Position du 1er point à gauche dans le graphique.
Si ShareX dans le yaml (True par défaut), ce subplot utilisera le slider 'Position'. """
pointer = self.pointer.val if ax.od_self_yaml.get('ShareX', True) else ax.pointer
""" Récupération des données sous forme de matrices numpy. """
x = np.arange(pointer, pointer + width_x)
y = ax.o_server.get_matrix(pointer, width_x)
if y is None:
continue
""" Boucle sur les courbes. """
y_min, y_max = np.inf, -np.inf
for o_line in ax.lines:
num_col = int(o_line.od.get('num_col'))
if not np.isnan(y[:, num_col]).all():
y_min = min(y_min, np.nanmin(y[:, num_col]))
y_max = max(y_max, np.nanmax(y[:, num_col]))
""" Dessin d'une courbe. Attribution des valeurs. """
try:
o_line.set_xdata(x)
o_line.set_ydata(y[:, num_col])
except (Exception,):
pass
""" Limites x et y (abscisses et ordonnées). """
ax.set_xlim(pointer, pointer + width_x - 1) # Abscisses.
if y_min != np.inf:
y_padding = max(0, (y_max - y_min) * 0.05) # Marges top et bottom du graphique (5%)
y_min, y_max = y_min - y_padding, y_max + y_padding
ax.set_ylim(y_min, y_max) # Ordonnées.
""" Coloriage inter-courbes. """
self.o_yaml.between(ax, x, y)
""" Fin. """
plt.draw()
self.b_busy = False
def ini_watch(self):
""" Liste des éléments à surveiller : dossier {graphe} + dossiers {nodes} + fichiers .pkl + fichiers .yaml
- Le dossier est self.final_path, dans les backups.
- Les fichiers calc_*.pkl : un par entrée active de ce node d'affichage.
- Les fichiers .yaml : valeurs des clés 'Yaml file' dans les .pkl """
""" 1 - Dossier du graphe dans les backups :dossier-parent de tous les nodes du graphe. """
graph_path = os.path.normpath(f"{os.path.dirname(__file__).replace('show', 'backups')}/{self.graph_name}")
""" 2 - Liste des dossiers des nodes du graphe (qui contiennent des fichiers nécessaires aux calculs) """
l_folders = [graph_path] + [f'{graph_path}{os.sep}{node}' for node in os.listdir(graph_path) if
str(node).startswith('node')]
""" 3 - Liste des modèles yaml. """
l_models = list()
nodes_dir = os.path.dirname(__file__).replace('show', 'nodes')
for (folder, subfolder, files) in os.walk(nodes_dir):
if folder.endswith('models'):
for file in files:
f_include = os.path.join(folder, file)
l_models.append(f_include)
""" 4 - Liste des fichiers calc_*.pkl physiquement présents dans le dossier self.final_path """
l_watched_files = [os.path.normpath(f'{self.final_path}/{calc}') for calc in
os.listdir(self.final_path) if calc.startswith('calc_') or calc.endswith('.conf')]
""" 5 - Liste des fichiers .yaml : chaque od_calc (extrait du pkl) contient plusieurs clés 'Yaml file'.
On en fait une liste sans doublons. """
s_yaml = set() # set() au lieu de list() pour s'affranchir des doublons.
for path, folder, l_files in os.walk(graph_path):
[s_yaml.add(os.path.normpath(f'{path}/{file}')) for file in l_files if os.path.splitext(file)[1] == '.yaml']
""" 6 - Concaténation des listes précédentes. """
l_watched_files += list(s_yaml) + l_models # Concaténation de 3 listes de fichiers.
l_files = l_folders + l_watched_files # Concaténation des listes : dossiers et fichiers.
""" 7 - Vérification des changements dans cette liste, mais non dans le contenu des fichiers. """
if self.l_files_ante != l_files:
self.l_files_ante = self.ut.deepcopy(l_files) # Mémorisation (copie profonde).
self.ut.watch_file(l_files, self.watch) # (Re)lancement de la surveillance.
self.dt.delay(self.watch, 10, l_watched_files)
def watch(self, l_files):
""" l_files : liste contenant les dossiers et fichiers modifiés."""
files_only = [file for file in l_files if os.path.isfile(file)] # Filtrage : que les fichiers.
if len(files_only) == 0:
""" 1 - l_files ne contient que des dossiers : cela signifie que des fichiers ont été ajoutés ou supprimés.
Par conséquent on relance la méthode ini_watch() pour mettre à jour la liste des fichiers à surveiller. """
self.ini_watch()
return
if len(files_only) == 1 and files_only[0].endswith('.conf'):
return
""" 2 - Lecture de tous les calc_*.pkl (un par entrée) et écriture de leur contenu dans self.od_calcs. """
CtrlCalcul.od_calcs.clear() # Attribut de la classe CtrlCalcul.
l_calcs = [(f'calc_{i}', os.path.normpath(f'{self.final_path}/calc_{i}.pkl')) for i in range(8)]
for name_input, calc_file in l_calcs:
""" ex : name_input = 'calc_0' """
try:
with open(calc_file, 'rb') as pk:
self.od_calcs.write(name_input, pickle.load(pk))
except (Exception,):
pass
""" 3 - Création ou update des paramètres étendus od_yaml de o_yaml. """
self.o_yaml.update_odyaml()
""" 4 - La méthode principale update_figure() est chargée des mises à jour des calculs et des affichages. """
self.update_figure()
def update_figure(self):
""" Update suite aux modifications .pkl et .yaml """
""" 1 - Titre de la fenêtre. """
local = ' - Mode local' if len(sys.argv) < 6 else '' # Nombre d'arguments passés en ligne de commande.
title_ante = self.mgr.get_window_title()
b_first = title_ante == '**ini**' # b_first = Premier passage, au lancement de l'appli.
name_input = list(self.od_calcs.keys())[0] # Première entrée de 'Plots' connectée.
title_now = self.od_calcs.read([name_input, f'Node{self.id}', 'Fenêtre', 'Titre'], 'Graphiques') + local
l_inputs = [key for (key, val) in self.od_calcs.items() if val['edges']] # Entrées actives seulement.
if title_now != title_ante:
self.mgr.set_window_title(title_now)
""" 1.1 - Si seul le titre de la fenêtre a changé, inutile de continuer (sauf au 1er passage). """
if l_inputs == self.l_inputs and not b_first:
return True
""" 2 - Paramètres étendus yaml. """
self.o_yaml.set_rcparams(rcParams)
""" 3 - Subplots : création, update, géométrie. Chaque entrée est associée à un subplot.
- Celui-ci est créé ou mis à jour en fonction des changements de grille (nb d'entrées ou yaml). """
if self.o_yaml.grid_modified or l_inputs != self.l_inputs: # Respecter l'ordre de comparaison !
self.l_inputs = self.ut.deepcopy(l_inputs)
d_grids = self.o_yaml.get_dgrids()
for num_input in range(8):
self.update_ax(d_grids, num_input)
""" 4 - Paramètres des subplots (un par entrée active) et de leurs courbes. """
self.od_mydic.clear()
pointer_size = self.len_buffer
for num_input in range(8): # self.l_inputs = ['calc_0', 'calc_1', ...]
name_input = f'calc_{num_input}'
if name_input not in l_inputs:
self.l_ax[num_input] = None
continue
ax = self.l_ax[num_input] # Copie par référence.
""" 4.1 - Pour cette entrée : recherche de son serveur et de sa signature. """
edge = self.get_edge(name_input) # ex : edge = ((1, 0), (0, 0))
ax.o_server, signature = self.get_server(edge[0][0]) # edge[0][0] = id du node serveur.
ax.num_input = num_input
if ax.o_server is None:
continue # Entrée non connectée ou désactivée.
""" 4.2 - Si nécessaire, calculs et paramètres des courbes. """
if not hasattr(ax, 'signature') or ax.signature != signature:
ax.signature = signature
ax.od_self_yaml = Dictionary(self.o_yaml.od_yaml.read(f'Entrée {num_input}'))
ax.o_server.calculation()
""" 4.3 - Paramètres des courbes. """
self.update_subplot(ax)
""" 4.4 - Nombre de points fournis par les nodes serveurs. On conserve le min. """
pointer_size = min(pointer_size, ax.o_server.nb_points)
""" 5 - Nombre de points en abscisse.
https://matplotlib.org/stable/api/widgets_api.html#matplotlib.widgets.Slider """
if self.pointer.valmax != pointer_size - self.width_x.val - 2:
self.set_pointer_xlim(pointer_size)
""" 6 - Actualisation du buffer d'affichage nécessaire lorsque il est en pause. """
self.show_anim()
""" 7 - Affichage à l'écran du contenu du buffer d'affichage. """
plt.draw()
def set_pointer_xlim(self, pointer_size=None):
self.pointer.valmin, self.pointer.valmax = 0, self.pointer.valmax if pointer_size is None else pointer_size
self.pointer.ax.set_xlim(self.pointer.valmin, self.pointer.valmax - round(self.width_x.val) - 2)
self.pointer.val = min(self.pointer.val, self.pointer.valmax)
def get_edge(self, name_input): # ex: name_input = 'calc_1'.
""" Recherche du socket_out emetteur (directement connecté au socket_in 'name_input' de ce node 'Plots'). """
s_edges = self.od_calcs.read([name_input, 'edges'])
for edge in s_edges:
if edge[1][0] == self.id:
return edge
def update_subplot(self, ax):
""" Mise à jour des paramètres de ce subplot (axe), ainsi que de chacune de ses courbes.
@param ax: objet subplot.
@return: NA.
"""
def update_line():
""" Code d'appel : point 2 un peu plus bas.
https://matplotlib.org/stable/api/_as_gen/matplotlib.lines.Line2D.html """
""" 2.1 - Affectation des attributs aux courbes : couleur, épaisseur, style, légende. """
o_line.od = Dictionary(od_lines.read(o_line.name))
if not o_line.od:
return # 'Kalman2-Normal-Original'
num_col = ax.o_server.od_descr.read([o_line.name, 'num_col']) # 'Kalman2-Normal-kalman'
o_line.od.write('Entrée', f'Entrée {ax.num_input}')
o_line.od.write('num_col', num_col)
o_line.set_color(o_line.od.read('Couleur'))
o_line.set_linewidth(o_line.od.read('Épaisseur'))
o_line.set_linestyle(self.get_linestyle(o_line.od.read('Style')))
o_line.set_label(o_line.od.read('Légende'))
self.o_yaml.line_params(o_line) # Paramètres étendus pour cette courbe.
""" 2.2 - Exemple de compilation dynamique au niveau de la courbe 'o_line'. Décommenter si nécessaire. """
# d_context = {'line': o_line}
# self.o_yaml.compile([f'Entrée {num_input}', o_line.name, 'Code'], d_context)
""" 1 - Actualisation de la liste des courbes ax.lines.
- Si 'Plots' a 1 seule entrée, clé de type ['calc_0', 'Node0'], sinon ['calc_0', 'Node0', 'Entrée 0']. """
l_mykey = [f'calc_{ax.num_input}', f'Node{self.id}'] # Clé de od_mydic dans od_calcs.
l_keys = l_mykey + [f'Entrée {ax.num_input}'] # ['calc_0', 'Node0', 'Entrée 0']
od_lines = Dictionary(self.od_calcs.read(l_keys, self.od_calcs.read(l_mykey)))
self.update_lines(ax.num_input, od_lines) # Suppression et ajout de courbes.
""" 2 - Mise à jour de chacune des courbe. """
for o_line in ax.lines: # Parcours des courbes de ce subplot (de cette entrée de node).
update_line()
""" 3 - Exemple de compilation dynamique au niveau du subplot 'num_input'. Décommenter si nécessaire. """
# d_context = {'subplot': ax}
# self.o_yaml.compile([f'Entrée {num_input}', 'Code'], d_context)
""" 4 - Paramètres étendus yaml. """
self.o_yaml.subplot(self.l_ax, ax.num_input)
def update_lines(self, num_input, od_plots):
""" Actualisation de la liste de courbes self.l_ax[num_input].lines[].
On compare les listes de coubes nécessaires et de courbes existantes.
On en déduit celles qu'il faut ajouter, celles qu'il faut supprimer. """
""" 1) Courbes nécessaires : l_needed. """
l_needed = list()
for key, val in od_plots.items():
if isinstance(val, dict) and 'Signal actif' in val and val['Signal actif']:
l_needed.append(key)
""" 2) Courbes existantes : l_exist. """
l_exist = self.line_exist(num_input)
""" 3) Ajout (création) de courbes. """
for needed in l_needed:
if needed not in l_exist:
o_line, = self.l_ax[num_input].plot(np.empty(0)) # [num_input] = Élément 0 de la liste.
o_line.name = needed
""" 4) Suppression de courbes. """
for exist in l_exist:
if exist not in l_needed:
o_line = self.get_line(num_input, exist)
if o_line is not None:
self.l_ax[num_input].lines.remove(o_line)
def line_exist(self, num_input):
""" Appelée par update_lines(). """
l_lines = list()
for line in self.l_ax[num_input].lines:
l_lines.append(line.name)
return l_lines
def get_line(self, num_input, line_name):
""" Appelée par update_lines(). """
for o_line in self.l_ax[num_input].lines:
if o_line.name == line_name:
return o_line
return None
@staticmethod
def get_linestyle(param_style):
if param_style.startswith('___'):
return '-'
elif param_style.startswith('- -'):
return '--'
elif param_style.startswith('. .'):
return ':'
return '-.'
def update_ax(self, d_grids, num_input):
""" On vérifie les 8 inputs, connectés ou pas. """
name_input = f'calc_{num_input}'
ax = self.l_ax[num_input]
if name_input not in self.l_inputs:
""" Vérification des entrées non connectées aussi, afin éviter des affichages indésirables."""
if ax is not None:
""" Ce subplot a existé, il n'existe plus. """
ax.set_visible(False) # Suppression des graphiques (subplots) fantômes.
self.l_ax[num_input] = None
return
""" Le subplot {ax} contient l'attribut 'pointer' qui disparait après cet update.
Par conséquent, on le sauvegarde d'abord. """
if ax is None:
pointer = 0
else:
ax.set_visible(False) # Suppression des graphiques (subplots) fantômes.
pointer = ax.pointer
""" Update du subplot {ax}. add_axes() permet un positionnement absolu, totalement libre. """
ax = self.fig.add_axes(d_grids[name_input]) # Position absolue. x, y, w, h. Valeurs entre 0 et 1.
""" Réintégration de l'attribut 'pointer'. """
ax.pointer = pointer
self.l_ax[num_input] = ax
@staticmethod
def speed(indx):
""" Calcul empirique. """
values = [(3000, 1), (2000, 1), (1200, 1), (640, 1), (480, 1), (320, 1), # (durée en ms, pas de 1) <-- Lents.
(280, 2), (296, 4), (273, 7), (280, 14), (286, 26), (282, 47), (400, 20)] # <-- Rapides.
return values[indx]
def key_event(self, ev):
""" Appui sur une touche du clavier. """
keycode = ev.key
if keycode == ' ':
""" Pause on/off. """
self.b_paused = not self.b_paused
self.anim.stop() if self.b_paused else self.anim.start(int(self.anim.delay))
elif keycode == '+':
""" Accélérer. """
self.anim.stop()
self.anim.indx += 1
self.anim.indx = min(self.anim.indx, 11)
speed = self.speed(self.anim.indx)
self.anim.delay = speed[0]
self.anim.step = speed[1]
self.anim.start(int(self.anim.delay))
elif keycode == '-':
""" Ralentir. """
self.anim.stop()
self.anim.indx -= 1
self.anim.indx = max(self.anim.indx, 0)
speed = self.speed(self.anim.indx)
self.anim.delay = speed[0]
self.anim.step = speed[1]
self.anim.start(int(self.anim.delay))
elif keycode == 'tab':
self.b_reverse = not self.b_reverse
elif keycode == 'right' or keycode == 'left':
self.b_paused = True
self.anim.stop()
step = self.anim.step
self.anim.step = 1 if self.b_reverse == (keycode == 'left') else -1 # Ou exclusif.
self.show_anim()
self.anim.step = step
if __name__ == '__main__':
""" Ajout d'arguments à la ligne de commande permettant un lancement autonome (pour la MAP). """
sys.argv.append('Calculs') # Nom du graphe.
sys.argv.append('0') # Plots id. (La ligne de commande ne doit contenir que du str).
sys.argv.append('-1') # Gestion de la fermeture automatique. Poste de contrôle PID : -1 en local.
mpl = ShowMatPlotLib()
La classe Calcul du node Plots devient inutile, vous pouvez la supprimer.

Les méthodes colorées sont spécifiques au node et sont implémentées dans sa classe Calcul.
PC est enregistré dans un sous-dossier du dossier backups.params.pkl.Plots) possèdent un fichier calc_?.pkl par entrée active et connectée.*.yaml.o_server.
Plots-0 n'a qu'une entrée dont le node-serveur est Histos-1.o_server.descr_signal() est appelée :
self.od_descr à partir des fichiers pkl et yaml.o_server.pre_process() est appelée : grâce à self.od_descr, elle réalise un traitement d'initialisation des calculs.o_server.get_matrix() est totalement autonome et séparée.
o_server.pre_process().numpy avec autant de colonnes que de courbes à dessiner et avec autant de lignes que de points en abscisse du graphique.La classe Calcul du node Histos doit donc contenir ces 3 méthodes :
self.descr_signal().
self.pre_process().
self.get_matrix().
☐ /nodes/geberateurs/histos/histos.py > fichier complet :
# Imports externes
import numpy as np
# Imports internes
from pc.ctrl_node import CtrlNode, CtrlCalcul
from nodes.generateurs.histos.histos_yaml import YamlParams
from trading.historiques.ctrl_histos import CtrlHistos
d_datas = {
'name': 'Histos', # Label affiché sous l'icone.
'icon': 'histos.png', # Icone affichée.
}
class Node(CtrlNode):
def __init__(self, o_scene, s_id, pos):
super().__init__(o_scene, s_id, pos)
self.type = 'Histos'
self.setup()
def setup(self, child_file=__file__):
super().setup(child_file)
@property
def ld_outputs(self):
return [{
'label': 'Sortie',
'label_pos': (-38, -10)
}]
def fixed_params(self):
return {
'Instrument': ['EUR/USD', {'values': ['EUR/USD', 'USD/JPY', 'EUR/CHF', 'USD/CAD', 'NZD/USD',
'EUR/GBP', 'EUR/JPY', 'GBP/JPY', 'GBP/CHF', 'GBP/USD']}],
'Ticks': False, # à chaque variation du signal.
'Maille 1': False, # Renko, maille de 1 pip.
'Maille 2': False, # 2 pips.
'Maille 3': False, # 3 pips.
'Maille 4': False, # 4 pips.
'Maille 5': True, # 5 pips.
'Maille 7': False, # 7 pips.
'Maille 10': False, # 10 pips.
'Maille 14': False, # 14 pips.
'Maille 20': False, # 20 pips.
'1 mn': False, # 1 minute.
'5 mn': False, # 5 minutes.
'15 mn': False, # 15 minutes.
'30 mn': False, # 30 minutes.
'1 h': False, # 1 heure.
'4 h': False, # 4 heures.
'1 j': False, # 1 jour.
'1 s': False, # 1 semaine.
'* Voir historiques ': '', # '*' = bouton.
'* Télécharger ': '',
'* Synchroniser bd ': '',
}
def my_signals(self, l_signals_in, num_socket_out):
l_signals = list()
for signal_key in self.fixed_params().keys():
b_signal = self.get_param(signal_key) # Booléen.
if isinstance(b_signal, bool) and b_signal:
typ_id, signal_ante, signal_now, signal_source = f'{self.type}{self.id}', '', signal_key, ''
l_signals.append((typ_id, signal_ante, signal_now, signal_source))
return l_signals
def get_state(self):
""" Surcharge. """
if self.b_chk:
for signal in self.fixed_params().keys():
if isinstance(self.get_param(signal), bool) and self.get_param(signal, False):
return True
return False
def histo_data(self, button):
""" Clic sur un bouton : Voir, télécharger, synchroniser. """
o_hist = CtrlHistos(self.get_param('Instrument')) # Objet Contrôleur d'historiques.
if button.startswith('Voir'):
""" Bouton 'Voir historiques'. Si Instrument == EUR/USD => Symbole == EURUSD (sans '/') """
o_hist.verify_weeks(b_ticks=False, b_silent=True)
print()
o_hist.verify_weeks(b_ticks=True)
elif button.startswith('Télécharger'):
""" Bouton 'Télécharger'. """
o_hist.download_histos(nb_weeks=1, b_ticks=False) # Candles.
o_hist.download_histos(nb_weeks=1, b_ticks=True)
""" Option : après le téléchargement, on affiche l'état des historiques. """
print()
o_hist.verify_weeks(b_ticks=False, b_silent=True) # Candles.
print()
o_hist.verify_weeks(b_ticks=True) # Ticks.
elif button.startswith('Synchroniser'):
""" Bouton 'Synchroniser bd'. """
o_hist.synchro_db_csv(b_ticks=False)
o_hist.synchro_db_csv(b_ticks=True)
def refresh(self, l_keys):
""" Méthode appelée automatiquement à chaque modification de paramètre dans le dockable. """
if l_keys[1].startswith(' '): # Les libellés des boutons commencent par des espaces.
""" Les 3 boutons pour la base de données. Affichages dans le terminal. """
self.histo_data(l_keys[1].strip())
elif isinstance(self.get_param(l_keys[-1]), bool) or l_keys[-1] in ['Instrument', 'Nombre']:
""" Nodes à updater : Propagation aval, seulement pour les signaux (cases à cocher). """
self.lo_sockets_out[0].to_update()
""" Infrastructure : Insertion du mot 'infrastructure' dans l_keys, en 1ère place. """
l_keys = ['infrastructure'] + l_keys
super().refresh(l_keys)
class Calcul(CtrlCalcul):
""" ********** Le code ci-dessous ne concerne pas le poste de contrôle, mais seulement les calculs. ********** """
def __init__(self):
super().__init__()
self.o_yaml = YamlParams(self)
self.d_stamps = dict()
self.l_pilot = list()
self.o_histos = None
self.b_forcing_calc = True
def descr_signal(self, od_descr, val, root_key):
""" - Création du super-dictionnaire de description. Voir docstring dans la classe-mère.
- key_dock doit avoir EXACTEMENT la même orthographe que la clé affichée
dans le dockable des paramètres du node CLIENT.
- Sources EXCLUSIVES : self.od_mydic et self.o_yaml. """
instrument = self.od_mydic.read('Instrument')
od_descr.write('changed', True)
od_descr.write('instrument', instrument)
for signal_name, val in self.od_mydic.items():
if isinstance(val, bool) and signal_name != 'Checked':
key_dock = f'{self.s_id}-{signal_name}' # Ex : Histos1-Maille 7
od_descr.write([key_dock, 'notes'], signal_name)
od_descr.write([key_dock, 'actif'], self.od_mydic.read(signal_name, False)) # True ou False.
if instrument != self.od_descr.get('instrument'):
self.d_stamps.clear()
def pre_process(self):
""" Voir docstring dans la classe-mère. """
""" La récupération des paramètres se fait EXCLUSIVEMENT depuis le super-dictionnaire self.od_descr. """
""" Le tableau numpy (self.np_array) est entièrement rempli, en une seule passe. """
""" Traitement du pilote (pilote = image du slider 'Position'). """
""" 1 - Signal le +lent : ts_right = timestamp à droite de la fenêtre lorsque son début est à gauche. """
""" 2 - Recherche l'index de ts_right dans le plus rapide. """
""" 3 - ts_left = timestamp à gauche de la fenêtre du signal le +rapide. """
""" 4 - Calcul de l'offset minimum pour que tous les signaux soient affichés à 100 %. """
def get_index(_list, _val):
""" La liste doit être triée (sens croissant). """
try:
return _list.index(_val)
except ValueError:
for indx, val in enumerate(_list):
if val > _val:
return indx
return len(_list) - 1 # Le dernier par défaut
d_tables = {
'Ticks': 'Ticks', 'Maille 1': 'r1', 'Maille 2': 'r2', 'Maille 3': 'r3', 'Maille 4': 'r4', 'Maille 5': 'r5',
'Maille 7': 'r7', 'Maille 10': 'r10', 'Maille 14': 'r14', 'Maille 20': 'r20', '1 mn': 'm1', '5 mn': 'm5',
'15 mn': 'm15', '30 mn': 'm30', '1 h': 'h1', '4 h': 'h4', '1 j': 'day', '1 s': 'week'
}
abscissa_length = 300_000 # Étendue maximum du Slider 'Position'.
width_x = 316 # Nombre de points affichés = valeur du slider 'Largeur' par défaut.
instrument = self.od_descr.get('instrument', 'EUR/USD')
self.o_histos = CtrlHistos(instrument)
l_stamps, l_tables = list(), list()
slowest, fastest, len_min, len_max, ts_right = '', '', 10**6, 0, 0
for l_key in self.od_descr.key_list():
if l_key[-1] == 'actif' and self.od_descr.read(l_key):
""" Cette devise est cochée dans le dockable des paramètres. """
key_dock = l_key[0]
signal_name = key_dock.split('-')[1]
table_name = d_tables.get(signal_name)
l_tables.append((table_name, self.od_descr.read([key_dock, 'num_col'])))
if signal_name not in self.d_stamps:
self.d_stamps[signal_name] = self.o_histos.get_stamps(table_name, abscissa_length)
len_st = len(self.d_stamps[signal_name])
if len_st < len_min:
""" Recherche du signal le plus lent (le moins dense) <-- dense = nb valeurs/unité de temps. """
len_min = len_st
ts_right = self.d_stamps[signal_name][min(width_x, len_min)]
if len_st > len_max:
""" Recherche du signal le plus rapide (le plus dense). """
len_max = len_st
fastest = signal_name
l_stamps += self.d_stamps[signal_name]
self.l_pilot = list(set(l_stamps)) # Suppression des doublons.
self.l_pilot.sort() # Tri.
indx_left = max(0, get_index(self.d_stamps[fastest], ts_right) - width_x)
stamp_left = self.d_stamps[fastest][indx_left]
offset = get_index(self.l_pilot, stamp_left)
self.nb_points = len(self.l_pilot) - offset + 1
""" Infos nécessaires pour get_matrix(). """
self.od_descr['offset'] = offset
self.od_descr['bd_tables'] = l_tables
self.od_descr['pips'] = self.o_histos.pips
def get_matrix(self, pointer, nb_lines=0):
""" Construction du ndarray de retour à partir des ndarray du dictionnaire self.d_datas. """
_from, _to = pointer, pointer+nb_lines
np_matrix = np.full((nb_lines, 19), np.nan) # 1 + 18 colonnes (voir d_tables de pre_process() ci-dessus.)
""" Signal pilote (celui qui présente le plus de détails). """
pips = self.od_descr.get('pips')
offset = self.od_descr.get('offset')
try:
final_stamp = self.l_pilot[offset + pointer + nb_lines]
except (Exception,):
return np_matrix
for table_name, num_col in self.od_descr.get('bd_tables'):
slice_histo = self.o_histos.get_datas(table_name, final_stamp, nb_lines)
pass
if table_name == 'Ticks':
vector = (slice_histo[:, 1] + slice_histo[:, 2]) / (2 * pips)
elif table_name.startswith('r'):
vector = slice_histo[:, 1]
else:
vector = (slice_histo[:, 4] + slice_histo[:, 8]) / (2 * pips)
np_matrix[-slice_histo.shape[0]:, num_col] = vector
return np_matrix
histos_yaml.py et histos_seeder.yaml existent./nodes/generateur/histos/histos_yaml.py :
# Imports internes.
from nodes.yaml_parent import YamlParent
class YamlParams(YamlParent):
def __init__(self, o_calc):
super().__init__(o_calc, __file__)
/nodes/generateur/histos/histos_seeder.yaml : Ce fichier est vide, mais il doit exister.
Oui mais ... une nouvelle version de CtrlHistos est nécessaire pour satisfaire à l'implémentation des calculs dans le node histos :
self.pips ajouté.try ... except ajouté autour de requests.get(...)._existing_lists() améliorée (plus rapide).get_stamps() ajoutée.get_datas() ajoutée.☐ Fichier /trading/historiques/ctrl_histos.py :
# Imports externes
import datetime
import gzip
import os # http://www.python-simple.com/python-modules-fichiers/os-path.php (Ctrl + clic)
import requests
from io import BytesIO, StringIO
import pandas as pd
import keyboard # https://www.delftstack.com/fr/howto/python/python-detect-keypress/
# Imports internes
from functions.utils import DateTime, Utils
from trading.historiques.db_candle import DbCandle
from trading.historiques.db_tick import DbTick
class CtrlHistos:
def __init__(self, instrument):
""" https://github.com/fxcm/MarketData
Tous les symboles :
AUDCAD, AUDCHF, AUDJPY, AUDNZD, AUDUSD, CADCHF, CADJPY, EURAUD, EURCHF,
EURGBP, EURJPY, EURNZD, EURUSD, GBPCAD, GBPCHF, GBPJPY, GBPNZD, GBPUSD,
NZDCAD, NZDCHF, NZDJPY, NZDUSD, USDCAD, USDCHF, USDJPY, USDTRY
Les plus utilisés (Dans l'ordre d'importance) :
EURUSD, USDJPY, EURCHF, USDCAD, NZDUSD, EURGBP, EURJPY, GBPJPY, GBPCHF, GBPUSD """
self.instrument = instrument
self.symbol = instrument.replace('/', '')
self.dt = DateTime()
self.ut = Utils()
self.kb_break = 'f12' # 'shift', 'ctrl', ... à votre convenance.
self.pips = .01 if instrument.endswith('JPY') else .0001
self.symbol_dir = None
self.db_candle = None
self.db_tick = None
self.setup()
def setup(self):
""" Les données peuvent se trouver sur un disque dur SSD autre que celui en cours.
Le cas échéant, db_path doit être modifié (chemin absolu).
Il est également possible, pour les besoins des tests, d'avoir plusieurs bases de données.
|_ Il faudra alors modifier db_path chaque fois que l'on désire switcher. """
db_path = os.path.dirname(__file__) # <--- Chemin par défaut, à modifier si autre disque dur.
self.symbol_dir = os.path.abspath(f'{db_path}/db/{self.symbol}') # Dossier des fichiers d'historiques.
os.makedirs(self.symbol_dir, exist_ok=True)
self.db_candle = DbCandle(self)
self.db_tick = DbTick(self)
def _helper(self, key, *l_params):
""" ************ Collection d'utilitaires. Nombre de paramètres variable. ************ """
if key == 'nb_weeks_in_year':
""" Renvoie le nombre de semaines dans l'année {year}. """
year = l_params[0] # l_params = [année dont on veut connaître le nombre de semaines].
if len(l_params) > 1:
y, num_week, _ = datetime.date.today().isocalendar()
if y == year:
return num_week - 1
o_dat = self.dt.get_date_from_dtstamp(self.dt.get_dtstamp_from_dtstr(f'{year}-12-28', '%Y-%m-%d'))
return o_dat.isocalendar()[1]
elif key == 'l_iso_yw':
""" Renvoie une liste de tuples ISO (iso_year, iso_str_week) : <-- exemple de tuple : (2019, '08')
- Pour la devise en cours et son type (tick ou candle).
- Toutes les semaines, du début à aujourd'hui.
- L'année est un int, la semaine une str ('02', '12', ...) """
b_ticks = l_params[0] # l_params = [b_ticks].
st_yw = set() # Le set() évite les doublons.
since = 2018 if b_ticks else 2012
yw_today = datetime.date.today().isocalendar()[:2]
for year in range(since, yw_today[0] + 1):
for week in range(1, 54):
iso = self.dt.isoyw_from_fxyw(year, week)
st_yw.add((iso[0], f'0{iso[1]}'[-2:]))
if (year, week) >= yw_today:
lt_yw = list(st_yw)
lt_yw.sort()
return lt_yw
elif key == 'csv_file':
""" Renvoie le chemin complet du fichier correspondant à la devise en cours, à l'année et à la semaine. """
year, week, b_ticks = l_params # l_params = [*t_yw, b_ticks]
str_week = f'0{week}'[-2:]
return os.path.join(self.symbol_dir, str(year), f"{'tick' if b_ticks else 'candle'}_"
f"{str_week}.csv")
elif key == 'url_file':
""" Renvoie l'url du fichier d'histos correspondant à la devise en cours, à l'année et à la semaine. """
year, str_week, b_ticks = l_params # l_params = [*t_yw, b_ticks]
url_candle = 'https://candledata.fxcorporate.com/m1'
url_tick = 'https://tickdata.fxcorporate.com'
return f'{url_tick if b_ticks else url_candle}/{self.symbol}/{year}/{int(str_week)}.csv.gz'
def verify_weeks(self, b_ticks, b_silent=False):
""" Affichage d'un tableau dans la console (Lignes=années, Colonnes=semaines).
'▒' ou '▄' ou '▀' = semaine présente, '.' = semaine absente. """
""" 1 - Listes des semaines réellement existantes en base de données et en fichier .csv. """
l_weeks_csv, l_weeks_db = self._existing_lists(b_ticks)
""" 2 - Affichage du titre et entêtes de colonnes. """
typ = 'Ticks' if b_ticks else 'Candles'
self.ut.gauge(f"{self.instrument}-{typ}", large=17)
[self.ut.gauge(char=f"{str(week) :<2}") for week in range(1, 54)]
self.ut.gauge('end')
""" 3 - Affichage du contenu - Ordonnées = années, abscisse = semaines. """
l_weeks = l_weeks_csv + l_weeks_db
l_weeks.sort()
first_year = (2018 if b_ticks else 2012) if len(l_weeks) == 0 else l_weeks[0][0]
year_now = datetime.date.today().isocalendar()[0]
for year in range(first_year, year_now + 1):
self.ut.gauge(year, large=17)
""" Boucle sur les semaines de l'année {year} : 1 à (52 ou 53). """
for week in range(1, self._helper('nb_weeks_in_year', year, 'today') + 1): # today limite à aujourd'hui.
t_yw = year, f'0{week}'[-2:] # 7 -> 07
b_csv, b_db = t_yw in l_weeks_csv, t_yw in l_weeks_db
if not b_csv and not b_db: # 0 0 /x./y
char, color = '. ', 'BLEU'
elif not b_csv and b_db: # 0 1 /x.y
char, color = '▀ ', 'VERT'
elif b_csv and not b_db: # 1 0 x./y
char, color = '▄ ', 'VIOLET'
else: # 1 1 x.y
char, color = '▒ ', 'BLEU'
self.ut.gauge(char=char, color=color)
self.ut.gauge('end')
""" Vérification de la synchronisation entre la base de données et les fichiers csv. """
msg = "\nAttention : La base de données n'est pas entièrement synchronisée ! csv + db = rectangles bleus." \
"\nL'affichage ci-dessus montre les fichiers .csv en carrés-bas mauves, la db en carrés-hauts verts." \
if not b_silent and l_weeks_csv != l_weeks_db else ''
self.ut.printc(msg)
def download_histos(self, nb_weeks, b_ticks):
if nb_weeks > 1:
self.ut.printc(f"\nArrêt manuel : garder la touche '{self.kb_break.upper()}' appuyée.\n")
""" Intervalle des semaines à télécharger (nombres en base 53). """
""" |_ since = Depuis. """
l_weeks_csv, _ = self._existing_lists(b_ticks=b_ticks) # Semaines = str <-- '08', '09', '10', ...
l_csv = [(y, int(w)) for (y, w) in l_weeks_csv] # Semaines = int <-- 8, 9, 10, ...
for y, w in l_csv:
if w > 2:
since = y * 53 + w - 1
break
else:
since = (2018 if b_ticks else 2012) * 53
y_today, w_today = datetime.date.today().isocalendar()[:2]
now = y_today * 53 + w_today
""" |_ now = Jusqu'à. """
""" l_required = Liste des fichiers à télécharger. """
l_required = list()
for yw53 in range(since, now-1): # dernière semaine = (now-1) => pas la semaine en cours.
csv_yw = yw53 // 53, 1 + yw53 % 53
csv_file = self._helper('csv_file', *csv_yw, b_ticks)
if not os.path.isfile(csv_file):
""" Fichier absent localement => à télécharger. """
l_required.append(csv_yw)
db = self.db_tick if b_ticks else self.db_candle
nb_weeks_loaded = 0
""" Parcours des semaines à traiter. """
for csv_yw_required in l_required:
df = self._download_csv_file(csv_yw_required, b_ticks)
if df is None:
""" Arrêt manuel demandé. """
break
elif df.shape[0] > 0:
""" 1 - Enregistrement du fichier téléchargé sur disque dur. """
self._df_to_csv(df, csv_yw_required, b_ticks)
""" 2 - Écriture du contenu en base de données. break si arrêt manuel demandé. """
if not db.df_to_table(df):
continue
""" 3 - Update des volumes à partir des ticks. """
df_stamps = self.db_tick.get_df_stamps(*csv_yw_required)
if df_stamps.shape[0] > 0:
self.db_candle.update_volume(df_stamps)
""" Arrêt si le nombre de semaines demandées est atteint. """
nb_weeks_loaded += 1
if nb_weeks_loaded >= nb_weeks:
break
s = 's' if nb_weeks > 1 else ''
self.ut.printc(f"Téléchargement{s} terminé{s}.", color='vert')
def synchro_db_csv(self, b_ticks):
""" A l'issue de cette méthode, TOUTE la db sera l'image de TOUS les fichiers csv. """
""" 1 - Base de données. """
db, typ = (self.db_tick, 'Ticks') if b_ticks else (self.db_candle, 'Candles')
""" 2 - État actuel des historiques => Différences entre csv et db
- l_drop = semaines à supprimer de la db, l_add = semaines à ajouter à la db. """
l_drop, l_add, l_vol = self._lists_to_update_db(b_ticks=b_ticks)
if l_drop+l_add+l_vol == []:
self.ut.printc(f"{typ} csv <--> db : OK. La synchronisation est inutile.\n", color='VERT')
return True
""" 3 - Message : état initial. """
for l_weeks in [l_drop, l_add]:
if len(l_weeks) > 0:
s = 's sont' if len(l_weeks) > 1 else ' est'
verb = 'ajouter à' if l_weeks == l_add else 'supprimer de'
self.ut.printc(f"{len(l_weeks)} semaine{s} à {verb} la base de données '{typ.lower()}'.", color='VERT')
""" 4 - Ajout de données dans la db. """
self._add_weeks(l_add, b_ticks)
""" 5 - Suppression de données de la db. """
for yw in l_drop:
db.delete_week(*yw)
""" 6 - Update des volumes. """
for yw in l_vol:
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
break
df_stamps = self.db_tick.get_df_stamps(*yw)
self.db_candle.update_volume(df_stamps)
""" 7 - Vérification. """
l_drop, l_add, l_vol = self._lists_to_update_db(b_ticks=b_ticks)
success, color = ('réussi', 'VERT') if l_drop+l_add+l_vol == [] else ('échoué', 'ROUGE')
self.ut.printc(f'{typ} : La synchronisation a {success}.', color=color)
def _existing_lists(self, b_ticks):
""" Retourne les listes de semaines des fichiers csv et des enregistrements en base de données. """
""" s_weeks_csv n'est pas une list(), mais un set(), pour éviter les doublons. """
l_weeks_csv, s_weeks_db = list(), set()
db = self.db_tick if b_ticks else self.db_candle
l_iso_yw = self._helper('l_iso_yw', b_ticks) # Liste de toutes les semaines.
typ = 'Ticks' if b_ticks else 'Candles'
self.ut.gauge(f"{self.instrument}-{typ} Création listes :", large=0)
for num, iso_yw in enumerate(l_iso_yw):
""" Semaines existant en fichiers csv : Pour une semaine iso, on a 0, 1 ou 2 fichiers csv. """
l_csv_files, last_stamp = self._get_csv_files(*iso_yw, b_ticks)
csv_size = 0
if len(l_csv_files) > 0:
l_weeks_csv.append(iso_yw)
for csv_file in l_csv_files:
csv_size += os.path.getsize(csv_file)
""" Liste des semaines existant en base de données. """
csv = last_stamp if b_ticks else csv_size
if db.week_exists(*iso_yw, csv):
s_weeks_db.add(iso_yw) # (1)
if num % 8 == 0:
self.ut.gauge(char='.') # , color='Jaune')
self.ut.gauge('end')
self.ut.gauge('end') # Ligne vide supplémentaire.
""" Persistance en pkl. """
l_weeks_db = list(s_weeks_db)
l_weeks_csv.sort()
l_weeks_db.sort()
return l_weeks_csv, l_weeks_db
def _get_csv_files(self, iso_year, iso_week, b_ticks):
""" Attention ! Les noms de fichiers csv ne correspondent pas toujours à leur contenu ! """
""" Return : liste de noms complets de fichiers csv correspondant à (iso_year, iso_week) : 0, 1 ou 2 fichiers.
fxcorporate ne respecte pas la norme ISO. De plus :
- D'une année sur l'autre, le nom du fichier csv ne correspond pas à son contenu.
- On ne trouve pas de correspondance entre diverses devises.
- Par conséquent, le nom du fichier n'est pas fiable pour trouver le bon N° de semaine.
La méthode adoptée pour contourner ce problème est la suivante :
- Recherche de 3 fichiers csv voisins de (iso_year, iso_week) passés en paramètres.
- Lecture, pour chacun, de la date-heure du milieu du fichier.
- On en déduit le VRAI N° de semaine ISO.
Il arrive parfois, au changement d'année, qu'une même semaine soit composée de 2 fichiers csv.
- C'est pourquoi la valeur de retour est une liste. """
int_week, l_fxyw = int(iso_week), list()
last_stamp = 0
for week in range(int_week - 1, int_week + 1): # 2 Semaines : précédente, actuelle.
""" Correction : N° de semaine en base 53. """
y, w = (iso_year, week) if week > 0 else (iso_year - 1, week + 53)
y, w = (y, w) if w <= 53 else (y + 1, w - 53)
""" Les 3 fichiers csv candidats. """
str_w = f'0{w}'[-2:]
file_name = f"{'tick' if b_ticks else 'candle'}_{str_w}.csv" # 'tick_??.csv' ou 'candle_??.csv'.
csv_path = os.path.abspath(f"{os.path.dirname(self.symbol_dir)}/{self.symbol}/{y}/{file_name}")
if os.path.isfile(csv_path):
with open(csv_path, 'r') as fh:
""" On 'goûte' les 300 derniers bytes du fichier pour connaitre le N° ISO de semaine. """
nb_char = 300
fh.seek(0, os.SEEK_END)
fh.seek(fh.tell() - nb_char)
dt_str = fh.read().split('\n')[-3].split(',')[0]
last_stamp = self.dt.get_dtstamp_from_dtstr(dt_str)
o_d = self.dt.get_date_from_dtstamp(last_stamp) # Objet date.
if o_d.isocalendar()[:2] == (iso_year, int_week): # Filtrage.
l_fxyw.append(csv_path) # Candidat sélectionné.
return l_fxyw, last_stamp # 0, 1 ou 2 éléments.
def _download_csv_file(self, csv_yw_required, b_ticks):
""" Téléchargement, décompactage, décodage, DataFrame, enregistrement sur disque dur.
- return DataFrame. """
""" 1 - Demande au helper de construire l'url. """
url_required = self._helper('url_file', *csv_yw_required, b_ticks)
""" 2 - Initialisation des bytes. """
content = b''
""" 3 - Téléchargement en streaming avec lots de taille {size//50}, permettant l'insertion d'une jauge. """
y, w = csv_yw_required
try:
with requests.get(url_required, stream=True) as req:
size = int(req.headers['Content-Length'])
if size < 1_000:
return pd.DataFrame() # return df vide.
print()
self.ut.gauge(f"Téléchargement {' ticks ' if b_ticks else 'candles'} {self.symbol} ({y}, {w})")
for chunk in req.iter_content(chunk_size=size//50):
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
self.ut.gauge('end')
return None
content += chunk
""" 3.1 - Jauge de progression. """
self.ut.gauge()
self.ut.gauge('end')
except (Exception,):
self.ut.printc('Connexion internet interrompue.')
return None
""" 4 - Décompactage du buffer de données binaires (codées utf), en mémoire. """
buf = BytesIO(content)
f = gzip.GzipFile(fileobj=buf)
coded_bytes = f.read()
""" 5 - Décodage : utf-8 pour les candles, utf-16 pour les ticks. """
codec, type_data = ('utf-16', 'tick') if b_ticks else ('utf-8', 'candle')
decoded_bytes = coded_bytes.decode(codec)
""" 6 - Conversion Bytes -> String """
decoded_str = StringIO(decoded_bytes)
""" 7 - String to DataFrame. """
df = pd.read_csv(decoded_str) # Pas de colonne-index (index_col=0 retiré)
""" 8 - Retourne le dataframe. """
return df
def _df_to_csv(self, df, t_yw, b_ticks):
""" Contrôle préalable : si le dossier-cible n'existe pas, on le crée. """
csv_path = self._helper('csv_file', *t_yw, b_ticks)
os.makedirs(os.path.dirname(csv_path), exist_ok=True)
df.to_csv(csv_path, index=False) # Copie du fichier .csv sur disque dur.
@staticmethod
def _csv_to_df(csv_file):
""" Conversion du fichier {csv_file} en dataframe. """
return pd.read_csv(csv_file) if os.path.isfile(csv_file) else pd.DataFrame()
def _lists_to_update_db(self, b_ticks):
""" Retourne 3 listes : l_drop, l_add et l_vol.
Algorithme pour l_drop et l_add :
- (1) = l_weeks_db : Liste des semaines réellement existantes en base de données.
- (2) = l_weeks_csv : Liste des semaines réellement existantes en fichiers csv.
- Intersection (Inter) = (1) ⋂ (2)
- l_drop : Liste des semaines à supprimer de la db (celles qui n'existent pas en csv) : (1) - (Inter)
- l_add : Liste des semaines à ajouter dans la db (celles des csv qui n'existent pas en db) : (2) - (Inter)
--------------------------------
l_vol = liste des semaines (yw) de candles nécessitant un update du champ 'Volume' dans les tables.
Sans objet pour les ticks.
"""
""" 1 et 2 - Listes des semaines réellement existantes en base de données et en fichier .csv. """
l_weeks_csv, l_weeks_db = self._existing_lists(b_ticks=b_ticks)
""" 3 - Liste des semaines à supprimer de la db (celles qui n'existent pas en csv). """
l_drop = list()
for t_yw in l_weeks_db:
if t_yw not in l_weeks_csv:
# print('A supprimer :', t_yw) # MAP : Permet de lister les semaines qui sont à supprimer.
l_drop.append(t_yw)
""" 4 - Liste des semaines à ajouter à la db (celles des csv qui n'existent pas en db). """
l_add = list()
for t_yw in l_weeks_csv:
if t_yw not in l_weeks_db:
# print('A ajouter :', t_yw) # MAP : Permet de lister les semaines qui sont à ajouter.
l_add.append(t_yw)
""" 5 - Liste des semaines de ticks pour lesquelles les volumes des candles sont à updater. """
l_vol = list()
if not b_ticks: # Renkos non concernés.
""" Semaines ticks existantes (b_ticks=True). """
_, l_weeks_tick_db = self._existing_lists(b_ticks=True)
for yw in l_weeks_tick_db:
if yw in l_weeks_db:
if not self.db_candle.is_volume_in_week(*yw):
l_vol.append(yw)
return l_drop, l_add, l_vol
def _add_weeks(self, l_yw, b_ticks):
"""
@param l_yw : Liste des semaines à ajouter à la base de données.
@param b_ticks: True = ticks, False = candles.
"""
db = self.db_tick if b_ticks else self.db_candle
self.ut.printc(f"\nArrêt manuel : garder la touche '{self.kb_break.upper()}' appuyée.\n")
for iso_yw in l_yw:
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
break
l_csv_files, _ = self._get_csv_files(*iso_yw, b_ticks) # Liste de 0, 1 ou 2 fichiers.
for csv_file in l_csv_files:
df = self._csv_to_df(csv_file)
if df.shape[0] > 0:
db.df_to_table(df)
def get_stamps(self, table_name, nb_stamps):
db = self.db_tick if (table_name == 'Ticks' or table_name[0] == 'r') else self.db_candle
return db.get_last_stamps(table_name, nb_stamps)
def get_datas(self, table_name, final_stamp, nb_points):
b_ticks = table_name == 'Ticks' or table_name[0] == 'r' # Ticks ou Renko
db = self.db_tick if b_ticks else self.db_candle
return db.get_datas(table_name, final_stamp, nb_points)
def main(): # MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP *****
""" Code de MAP (mise au point). Espace expérimental : ajouter, supprimer, commenter, décommenter, ... """
h = CtrlHistos('EUR/USD')
# h = CtrlHistos('USD/JPY')
# h = CtrlHistos('GBP/USD')
""" Candles. """
# h.download_histos(1, b_ticks=False)
# h.synchro_db_csv(b_ticks=False)
h.verify_weeks(b_ticks=False)
""" Ticks. """
# h.download_histos(1, b_ticks=True)
# h.synchro_db_csv(b_ticks=True)
h.verify_weeks(b_ticks=True)
if __name__ == '__main__':
main()
Oui mais ... une nouvelle version de la classe DataBase est nécessaire pour satisfaire à l'implémentation des calculs dans le node histos :
self.pips modifié.get_datas() ajoutée.get_otable() ajoutée.☐ Fichier /trading/historiques/db.py :
from peewee import SqliteDatabase
from functions.utils import DateTime, Utils
import datetime
import pandas as pd
import numpy as np
import keyboard
import os
""" (Russe - vidéo) https://www.youtube.com/watch?v=8dla28TLvwA <- Ctrl + Clic
(Français - txt) https://linuxtut.com/fr/9d4e1d0afac1865acdbb/
(Anglais - vidéo) https://www.youtube.com/watch?v=Vk6Ptnvqr4M
"""
# noinspection PyUnresolvedReferences
class DataBase:
def __init__(self, o_ctrl, b_ticks):
self.o_ctrl = o_ctrl
self.b_ticks = b_ticks
self.dt = DateTime()
self.ut = Utils()
self.kb_break = o_ctrl.kb_break # Nom de la touche du clavier défini dans le contrôleur CtrlHistos.
self.pips = o_ctrl.pips
self.week_delta = datetime.timedelta(weeks=1) # <class 'datetime.timedelta'>
self.day_delta = datetime.timedelta(days=1) # <class 'datetime.timedelta'>
self.o_table = None
pd.set_option('mode.chained_assignment', None)
""" 3 valeurs pour b_ticks : True (ticks), False (candles) ou None (renkos). """
file_name = 'tick' if b_ticks is True else ('candle' if b_ticks is False else 'renko')
sql_file = os.path.abspath(f"{o_ctrl.symbol_dir}/{file_name}.sql")
self.db = SqliteDatabase(sql_file)
self.d_tables = dict()
""" Création physique du fichier *.sql sur disque dur. """
self.db.connect()
def change_datetime_column(self, df):
""" Mise en conformité de la colonne DateTime : string to float. """
if 'DateTime' in df.columns:
df = df.rename(columns={'DateTime': 'timestamp'})
dt0 = np.datetime64(self.dt.get_dtstr_from_dtstamp(0, dt_format='%Y-%m-%d %H:%M:%S')) # '1970-01-01 01:00'
t_delta = np.timedelta64(1, 's') # <class 'numpy.timedelta64'>
pd_dt = pd.to_datetime(df.timestamp, format='%m/%d/%Y %H:%M:%S.%f') # format nécessaire, sinon très lent.
last_stamp = (np.datetime64(pd_dt.iloc[-1]) - dt0) / t_delta
o_d = self.dt.get_date_from_dtstamp(last_stamp)
yw = o_d.isocalendar()[:2] # (year, week)
len_batch = int(.95 * df.shape[0] / 50)
l_ts = list()
self.ut.gauge(f'Conformité {yw}')
for _from in range(0, df.shape[0], len_batch):
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
self.ut.gauge('end')
return None
_to = _from + len_batch
self.ut.gauge()
l_ts += [(np.datetime64(x) - dt0) / t_delta for x in pd_dt[_from: _to]]
self.ut.gauge('end')
df.timestamp = l_ts
return df
def get_db_sign(self):
nb_enr = 0
for o_table in list(self.d_tables.values()):
nb_enr += o_table.select().count()
return nb_enr
def _get_range_from_df(self, df):
""" Renvoie un tuple de stamps, qui encadrent df, à l'extérieur de df (samedis 22:00). """
f_stamp = df['timestamp'].iloc[0] # Premier stamp réel dans df. <-- début.
l_stamp = df['timestamp'].iloc[-1] # Dernier stamp réel dans df. <-- fin.
stamp_before = f_stamp + self.dt.get_stamp_offset(f_stamp, b_after=False, num_day=6, _time='22:00')
stamp_after = l_stamp + self.dt.get_stamp_offset(l_stamp, b_after=True, num_day=6, _time='22:00')
return stamp_before, stamp_after
def _get_range_from_yw(self, year, str_week):
stamp = self.dt.get_dtstamp_from_dtstr(f'{year}-{str_week}-3 10', dt_format='%G-%V-%u %H') # Mercredi 10:00
stamp_before = stamp + self.dt.get_stamp_offset(stamp, b_after=False, num_day=6, _time='22:00')
stamp_after = stamp + self.dt.get_stamp_offset(stamp, b_after=True, num_day=6, _time='22:00')
return stamp_before, stamp_after
def is_volume_in_week(self, *yw):
""" Compte le nombre d'enregistrements total et le nombre d'enregistrements ayant du volume. """
""" Limites de la semaine : samedi 22:00 à samedi 22:00. """
range_stamps = self._get_range_from_yw(*yw)
""" Parcours de toutes les tables et cumul des valeurs. """
nb_total = nb_volume = 0
for o_table in list(self.d_tables.values()):
nb_total += o_table.select(o_table.Volume).where(
o_table.timestamp.between(*range_stamps)
).count()
nb_volume += o_table.select(o_table.Volume).where(
o_table.timestamp.between(*range_stamps),
~o_table.Volume.is_null()
).count()
return nb_volume > nb_total * .98 # Booléen.
def delete_week(self, *yw):
""" Supprime la semaine {yw} de chacune des tables """
range_stamps = self._get_range_from_yw(*yw)
for o_table in list(self.d_tables.values())[::-1]: # Parcours à l'envers.
o_table.delete().where(o_table.timestamp.between(*range_stamps)).execute()
def get_datas(self, table_name, final_stamp, nb_points):
o_table = self.get_otable(table_name)
""" 1 - Lecture des {nb_points} derniers points, le plus récent en premier. """
l_datas = o_table.select().order_by(o_table.timestamp.desc()).where(o_table.timestamp <= final_stamp).limit(nb_points)
""" 2 - Peewee -> DataFrame Pandas """
df = pd.DataFrame(list(l_datas.dicts()))
""" 3 - DataFrame Pandas -> Numpy avec retournement : le plus récent en dernier. """
np_datas = df.values[::-1] # Retournement
return np_datas
def get_otable(self, table_name):
if table_name == 'Ticks':
return self.o_table
else:
d_tables = self.o_renko.d_tables if table_name[0] == 'r' else self.d_tables
return d_tables.get(table_name)
def get_last_stamps(self, table_name, nb_points):
o_table = self.get_otable(table_name)
""" 1 - Lecture des {nb_points} derniers points, le plus récent en premier. """
l_datas = o_table.select(o_table.timestamp).order_by(o_table.timestamp.desc()).limit(nb_points)
""" 2 - Peewee -> DataFrame Pandas """
df = pd.DataFrame(list(l_datas.dicts()))
""" 3 - DataFrame Pandas -> List avec retournement : le plus récent en dernier. """
return list(df['timestamp'])[::-1] if df.shape[0] > 0 else []
""" Méthodes surchargées. """
def df_to_table(self, df):
return True
def update_derived(self, df):
pass
def update_derived_table(self, df, stamp_before, stamp_after, o_derived_table):
pass
def week_exists(self, year, str_week, last_stamp):
return False
Oui mais ... une nouvelle version de la classe CtrlCalcul est nécessaire pour satisfaire à l'implémentation des calculs dans le node histos :
self.len_buffer devient un getter (une @property).
self.len_buffer (voir méthode get_signature()).
# Imports externes
import copy
import shutil
import os
import hashlib
import numpy as np
import pickle
# Imports internes
from functions.utils import Dictionary, DateTime, Utils
from pc.ui_node import UiNode
from pc.parameters import Parameters
from pc.ctrl_socket import CtrlSocket
from pc.ctrl_edge import CtrlEdge
class Helper:
@staticmethod
def deepcopy(reference):
return copy.deepcopy(reference)
@staticmethod
def new_od(dic=None):
return Dictionary(dic)
@staticmethod
def join(*l_variants, sep='-'):
""" Exemple d'appel 1 : y = self.join(a, b, c, d, ...)
Exemple d'appel 2 : y = self.join(*l_x) <-- Ne pas oublier '*'
Concatène les éléments de l_strings en une chaine. Les chaînes vides sont ignorées. le séparateur est sep.
:param l_variants: Nombre quelconque d'éléments de tout type.
:param sep: '-' par défaut. Peut être '' (vide) ou \n (retour à la ligne) ... ou autre.
:return: Chaîne concaténée.
Exemple l_strings = ['Paris', 'Lille', '', 'Bruxelles', 'Prague'] => return "Paris-Lille-Bruxelles-Prague"
"""
string_txt = ''
for string in l_variants:
if string != '':
string_txt += f'{sep}{string}' # Accepte tout type de variable pour string.
return string_txt[len(sep):] # Suppression du 1er sep.
class CtrlNode(Helper):
def __init__(self, o_scene, s_id, pos):
self.o_scene = o_scene
self.s_id = s_id # Clé d'entrée dans le super-dictionnaire pkl.
self.od_pkl = self.o_scene.o_pkl.od_pkl
self.o_grnode = None
self.o_params = None
self.pos = pos # pos = (x, y)
self.width = 96
self.height = 60
self.type = ''
self.ut = Utils()
self.main_key = f"Paramètres du node '{self.s_id}'"
self.b_chk = bool(self.od_pkl.read([self.s_id, 'b_chk'], True))
self.b_on = self.b_chk
self.child_file = ''
""" Sockets. """
self.lo_sockets_in = list() # Liste d'objets 'sockets in' instanciés.
self.lo_sockets_out = list() # Liste d'objets 'sockets out' instanciés.
self.default_color = '#88bbff' # Couleur par défaut des sockets de sortie.
self.l_sep_inputs = [] # Emplacement des séparateurs (après les entrées indiquées).
self.l_sep_outputs = [] # Emplacement des séparateurs (après les sorties indiquées).
""" Contenu spécifique. """
self.o_grcontent = None
""" Secousses => Court-circuits. """
self.dt = DateTime()
self.l_short_circuits = list() # Surchargé par les classes dérivées.
self.l_shortables = list()
self.b_removable = True # Les edges d'entrée peuvent être détruits en cas de court-circuit.
self.k_shake = 0 # Nombre de secousses.
self.b_shaking = False # Secousse en cours.
self.x, self.y = 0, 0 # Position antérieure (x, y), mémorisée.
self.dx, self.dy = 0, 0 # Deltas (variations) antérieurs, mémorisés.
def setup(self, child_file):
""" Code appelant : classe dérivée, on connait donc ses attributs. """
""" Paramètres. """
self.o_params = Parameters(self)
""" Hauteur automatique du node : elle dépend du nombre de sockets en entrée ou en sortie (le plus grand). """
h_title = self.d_display['geometry']['h_title']
nb_sockets_max = max(len(self.ld_inputs), len(self.ld_outputs)) # En fait : Sockets + séparateurs.
first_y = 16 * (1 + (h_title + 4) // 16) # Multiple de 16.
self.height = first_y + nb_sockets_max * 16 - 4
""" Nouveau node par drag & drop. """
self.child_file = child_file
b_new = self.pos != (0, 0) # Nouveau node (node dropé).
if b_new:
""" - Cas d'un nouveau node (dropé).
- Son centre apparaît exactement à la position du curseur de la souris.
- On mémorise son path et sa position. """
self.pos = self.pos[0] - self.width // 2, self.pos[1] - self.height // 2
path = 'nodes' + os.path.relpath(child_file).split('nodes')[1][:-3].replace(os.sep, '.')
self.od_pkl.write([self.s_id, 'path'], path)
else:
self.pos = self.od_pkl.read([self.s_id, 'position'], (0, 0))
""" Instanciation de l'UI. """
self.o_grnode = UiNode(self) # Gestion de l'UI (Interface utilisateur) : dessin couleurs, ...
if b_new:
self.o_grnode.save_pos() # Important ! à la fin (contient o_pkl.backup()).
self.o_scene.o_grscene.addItem(self.o_grnode) # Incorporation dans la scène.
""" Sockets. """
num_socket = 0
for i, d_input in enumerate(self.ld_inputs):
""" On complète les dictionnaires de base. """
if isinstance(d_input, dict): # Si ce n'est pas un dictionnaire, c'est un séparateur.
d_input['pos'] = True, i # On ajoute l'emplacement. Tuple (input: True, num_position).
d_input['id'] = self.id, num_socket # On ajoute l'identifiant. Tuple (id_node, num_socket)
self.lo_sockets_in.append(CtrlSocket(self, d_input)) # Socket instancié et ajouté à la liste.
num_socket += 1
num_socket = 0
for i, d_output in enumerate(self.ld_outputs):
""" On complète les dictionnaires de base. """
if isinstance(d_output, dict): # Si ce n'est pas un dictionnaire, c'est un séparateur.
d_output['pos'] = False, i # On ajoute l'emplacement. Tuple (input: False, num_position).
d_output['id'] = self.id, num_socket # On ajoute l'identifiant. Tuple (id_node, num_socket)
self.lo_sockets_out.append(CtrlSocket(self, d_output)) # Socket instancié et ajouté à la liste.
num_socket += 1
self.short_circuit_check()
@property
def id(self):
""" Ex : 'Node 14' renvoie 14. """
return int(self.s_id[4:])
@property
def d_display(self):
""" Attributs par défaut pouvant être surchargés dans les classes dérivées. """
return {
'geometry': {'h_title': 32, 'round': 6},
'col_pen': {'select': '#ffa637', 'hover': '#888', 'on': "#000", 'off': '#aaaa00'}, # ordre = priorité.
'col_brush': {'on': "#8888bbcc", 'off': '#cccccccc'},
'thick_pen': {'hover': 4., 'leave': 2.}
}
def get_default(self):
""" Titre du node et couleurs des 'socket_out' pour chaque type de node. """
""" Complétée par les classes dérivées avec leurs paramètres statiques et dynamiques. """
""" 1) Paramètres communs : Titre, bouton et autant de couleurs que de sorties. """
d_default = {
'Titre du node': 'Choisir un titre',
'*🔆': '' # Un astérisque devant indique qu'il s'agit d'un bouton.
}
""" 2) Ajout de paramètres fixes spécifiques au type de node, AVANT les couleurs. """
d_default.update(self.fixed0_params()) # méthode fixed_params()
""" 3) Couleurs. """
d_colors = dict()
for d_output in self.ld_outputs:
if isinstance(d_output, dict):
d_colors[d_output['label']] = self.default_color
if d_colors:
d_default['Couleurs'] = d_colors
""" 4) Ajout de paramètres fixes spécifiques au type de node, APRES les couleurs. """
d_default.update(self.fixed_params()) # méthode fixed_params()
""" 5) Ajout des paramètres dynamiques, qui dépendent des signaux entrants. """
d_default.update(self.dynamic_params) # propriété dynamic_params
""" 6) Dictionnaire complet. Paramètres : fixes communs + fixes spécifiques + dynamiques. """
return {self.main_key: d_default}
def get_param(self, l_key, v_default=None):
if self.o_params is None:
return v_default
od_params = Dictionary(self.o_params.od_params[self.main_key])
return od_params.read(l_key, v_default)
def set_checked(self, val):
self.b_chk = bool(val)
""" Update my_signals() """
for o_socket_out in self.lo_sockets_out:
o_socket_out.to_update()
""" Infrastructure. """
self.o_scene.infrastructure()
""" Enregistrement. """
self.od_pkl.write([self.s_id, 'b_chk'], self.b_chk)
self.o_scene.o_ur.b_action = True # Ajout dans l'historique du "Undo-Redo".
self.o_scene.o_pkl.backup()
""" ************************* Méthodes à surcharger ************************** """
@property
def ld_inputs(self):
return list()
@property
def ld_outputs(self):
return list()
def fixed0_params(self):
""" - Renvoie un dictionnaire de paramètres, indépendants des connexions.
- Placé AVANT les couleurs.
- Exemple : le node de type 'Labo' renvoie 'Expérience', etc. """
return {}
def fixed_params(self):
""" - Renvoie un dictionnaire de paramètres, indépendants des connexions.
- Placé APRES les couleurs.
- Exemple : le node de type 'Signaux' renvoie 'Sinus', 'Cosinus', etc. """
return {}
def bundle_params(self):
""" - Renvoie un dictionnaire de paramètres pour chaque entrée.
- Une entrée reçoit un faisceau de signaux (bundle). """
return {}
def my_params(self, context):
"""
:param context: Liste de 7 valeurs, pouvant être utilisée pour construire le dictionnaire de sortie :
- context[0] typ_id_from = Type + id du node en amont, producteur du signal. Ex : 'MM12'.
- context[1] signal_ante_from = Nom du signal créé par le node en amont du node producteur. Ex : 'Cosinus'.
- context[2] signal_now_from = Nom du signal créé par le node en amont. Ex : 'SMA18'.
- context[3] signal_source_from = 'Provenance du signal' affiché dans le node en amont.
- context[4] (num_input, num_signal) = Tuple. num_signal = ordre du signal dans le faisceau entrant.
- context[5] signal_title = Titre du signal dans le dockable des paramètres.
- context[6] signal_source = 'Provenance du signal' -> texte avec retours à la ligne.
:return: Dictionnaire formaté pour les paramètres.
|_ Exemple {'Type de MM': ['SMA', {'values': ['SMA', 'EMA', 'SMMA', 'LWMA']}], "Périodes (sep=',')": '14'}
"""
return {}
def my_signals(self, l_signals_in, num_socket_out):
""" Description des signaux délivrés à la sortie N° num_socket_out.
Comment ça marche ?
- Cette fonction prend en entrée tous les signaux entrants ainsi que les paramètres pkl.
- Un traitement spécifique est décrit, qui produit plusieurs signaux.
- Ces signaux sont ensuite distribués aux différentes sorties du node.
:param l_signals_in: Signaux d'entrée. Cette liste contient autant d'éléments que d'entrées connectées.
- Chaque élément est un tuple : (typ_id_from, signal_ante_from, signal_now_from, signal_source_from, \
(num_input, num_signal), signal_title, typ_id, signal_ante, signal_source)
(Les variables suffixées de '_from' proviennent du node en amont).
:param num_socket_out: N° de la sortie.
:return: Liste de signaux exclusivement destinés à la sortie N° num_socket_out.
Chaque signal est décrit par un tuple à 4 valeurs : (typ_id, signal_ante, signal_now, signal_source).
La valeur de signal_now doit être soigneusement choisie afin d'éviter des doublons de noms.
"""
raise SystemExit(self.child_file + '\nCtrlNode.my_signals() : Cette méthode doit être surchargée.')
""" *********************** Fin - Méthodes à surcharger ********************** """
@property
def dynamic_params(self):
""" Code appelant : self.get_default().
Ces paramètres sont dynamiques car ils dépendent des signaux connectés aux entrées.
- Ce node demande la liste des signaux à chaque socket d'entrée : o_socket_in.l_signals
- Cette liste est formatée de la même façon pour tous les types de node.
- C'est une liste de tuples, chaque tuple a 4 valeurs et caractérise un signal.
- Il est rappelé qu'un edge véhicule non pas un signal, mais un faisceau de signaux (un faisceau).
- Le format d'un tuple est le suivant :
- typ_id_from = Type + id du node en amont, producteur du signal. Ex : 'MM12'.
- signal_ante_from = Nom du signal créé par le node en amont du node producteur. Ex : 'Cosinus'.
- signal_now_from = Nom du signal créé par le node producteur. Ex : 'SMA18'.
- signal_source_from = Texte destiné à être affiché dans le dockable, dans 'Provenance du signal :'
- Node
|_ Socket_in
|_ Faisceau de signaux
|_ Signal
|_ Paramètre dynamique.
"""
if not self.lo_sockets_in:
""" Tant que l'initialisation n'est pas terminée, lo_sockets_in est une liste vide."""
return {}
d_params = dict()
nb_inputs = len(self.lo_sockets_in)
for k in range(nb_inputs):
d_input = self.bundle_params() # Paramètres pour cette entrée.
""" Récupération en cascade : Socket_in -> Edge -> Socket_out -> Node -> Socket_in -> etc. """
for num_signal, t_signal in enumerate(self.lo_sockets_in[k].l_signals):
typ_id_from, signal_ante_from, signal_now_from, signal_source_from = t_signal[:4] # tuple à 4 valeurs.
signal_title = self.join(typ_id_from, signal_ante_from, signal_now_from)
signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
""" Paramètres communs à tous les signaux. """
d_input[signal_title] = {
'$Provenance du signal :': [signal_source, {'readonly': True}],
'Signal actif': True,
}
l_args = [num_signal, signal_title, signal_source] + list(t_signal)
d_input[signal_title].update(self.my_params(l_args))
d_params[f'Entrée {k}'] = d_input
if nb_inputs == 1 and 'Entrée 0' in d_params:
""" S'il n'y a qu'une entrée, inutile d'écrire, dans le titre des paramètres, de laquelle il s'agit. """
d_params = d_params['Entrée 0']
return d_params
def get_signals(self, num_socket):
""" Code appelant : oSocket_out N° num_socket.
- Ce node crée des signaux et les fournit aux nodes en aval par l'intermédiaire des sockets et des edges.
- Il crée autant de faisceaux de signaux (vecteurs) qu'il a de sorties.
- Pour chaque sortie, la liste fournie est formatée de la même façon pour tout type de node.
- Voir détails ci-dessus, dans dynamic_params().
"""
if not (self.b_chk and self.lo_sockets_out): # Setup en cours
return []
""" Update od_params. """
self.o_params.set_params()
nb_inputs = len(self.lo_sockets_in)
l_all_signals_in = list()
for k in range(nb_inputs):
""" Le nom de l'entrée se sera pas écrit dans le titre des paramètres, s'il n'y en a qu'une. """
l_name_input = [f'Entrée {k}'] if nb_inputs > 1 else []
l_signals = list() # Liste des signaux actifs de l'entrée N° k.
""" self.lo_sockets_in[k].l_signals = Faisceau de signaux arrivant sur l'entrée k. """
for num_signal, t_signal in enumerate(self.lo_sockets_in[k].l_signals):
typ_id_from, signal_ante_from, signal_now_from, signal_source_from = t_signal[:4] # tuple à 4 valeurs.
signal_title = self.join(typ_id_from, signal_ante_from, signal_now_from)
if self.get_param(l_name_input + [signal_title, 'Signal actif'], False):
typ_id, signal_ante = f'{self.type}{self.id}', self.join(signal_ante_from, signal_now_from)
signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
l_signals.append(t_signal[:4] + ((k, num_signal), signal_title, typ_id, signal_ante, signal_source))
l_all_signals_in.append(l_signals)
return self.my_signals(l_all_signals_in, num_socket)
""" ******************************* Secousses ******************************* """
def short_circuit_check(self):
""" Passage unique dans cette méthode, au démarrage. Fin programme si erreur.
- Peut-on supprimer les edges d'entrée en cas de court-circuit ?
- Oui, à condition que TOUTES les sorties existent dans self.l_short_circuits. """
l_indx_outputs = list() # Liste des index de sortie.
d_counter = dict()
for short_circuit in self.l_short_circuits: # Exemple -> self.l_short_circuits = [(0, 0), (0, 2)]
num_input = short_circuit[0]
num_output = short_circuit[1]
if num_input >= len(self.ld_inputs):
""" Fin programme. """
raise SystemExit(f"La liste des court-circuits du type de node '{self.type}' est erronée."
f"\nL'entrée N°{num_input} est hors limites."
f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")
if num_output >= len(self.ld_outputs):
""" Fin programme. """
raise SystemExit(f"La liste des court-circuits du type de node '{self.type}' est erronée."
f"\nLa sortie N°{num_output} est hors limites."
f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")
l_indx_outputs.append(num_output) # Dans cet exemple -> ( 0 , 2 )
if num_output in d_counter:
d_counter[num_output] += 1
else:
d_counter[num_output] = 1
for i, d_output in enumerate(self.ld_outputs):
if isinstance(d_output, dict):
if i not in l_indx_outputs:
""" Condition non satisfaite. """
self.b_removable = False # b_removable est à True dans __init__().
break
""" Contrainte technologique : Plusieurs entrées sur une même sortie est interdit ! """
for nb_inputs in d_counter.values():
if nb_inputs > 1:
""" Fin programme. """
raise SystemExit("Plusieurs entrées sont court-circuitables sur une même sortie."
f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")
def set_shortables(self):
""" Méthode appelée lorsque le bouton gauche de la souris est appuyé (sur ce node).
- Chaque type de node possède sa liste de court-circuits faisables.
- Retourne une liste de tuples.
- Chaque tuple est composé des 2 gr_edges pouvant être réunis pour n'en faire qu'un. """
self.l_shortables = list()
for short_circuit in self.l_short_circuits: # Exemple -> self.l_short_circuits = [(0, 0), (0, 2)]
lo_gredges_in = list(self.lo_sockets_in[short_circuit[0]].get_gredges())
lo_gredges_out = list(self.lo_sockets_out[short_circuit[1]].get_gredges())
if lo_gredges_in: # Cette liste contient 0 ou 1 edge.
for o_gredge_out in lo_gredges_out:
self.l_shortables.append((lo_gredges_in[0], o_gredge_out))
def shake(self):
def shortcircuit():
for to_gredge in self.l_shortables: # Tuples de gredges.
o_socket_out = to_gredge[0].o_edge.o_socket_out # From, depuis.
o_socket_in = to_gredge[1].o_edge.o_socket_in # To, jusqu'à.
""" Suppression des 2 edges du tuple. """
self.o_scene.o_grscene.removeItem(to_gredge[1]) # Edge de sortie.
if self.b_removable:
self.o_scene.o_grscene.removeItem(to_gredge[0]) # Edge d'entrée.
""" Création du nouvel edge de court-circuit. """
CtrlEdge(self.o_scene, o_socket_out, o_socket_in)
o_socket_in.to_update()
""" Persistance. """
self.o_scene.save_edges(backup=True)
""" Infrastructure. """
self.o_scene.infrastructure()
def raz_shakes():
""" Après cette raz, un nouveau court-circuit est autorisé. """
self.k_shake = 0
self.b_shaking = False
if self.b_shaking or not self.l_shortables:
return
self.dt.delay(raz_shakes, delay=300)
x, y = self.o_grnode.pos().x(), self.o_grnode.pos().y() # Positions.
dx, dy = x - self.x, y - self.y # Déplacements (delta x, delta y).
b_cond = (round(dx*self.dx) < 0) or (round(dy*self.dy) < 0)
""" Mémorisation. """
self.x, self.y = x, y
self.dx, self.dy = dx, dy
if b_cond:
""" Le mouvement a changé de sens => incrémentation du compteur. """
self.k_shake += 1
if self.k_shake > 10: # Valeur à ajuster.
self.b_shaking = True
self.k_shake = 0
self.dt.delay(shortcircuit, delay=50)
""" ***************************** Fin secousses ***************************** """
def set_colors(self, l_keys):
""" - La couleur est appliquée au socket_out concerné.
- Elle est également appliquée à tous ses éventuels edges connectés.
- A leur tour, ceux-ci propagent cette couleur à leur socket d'arrivée. """
for o_socket_out in self.lo_sockets_out:
if o_socket_out.__class__.__name__ == 'CtrlSocket':
if o_socket_out.label == l_keys[-1]:
""" Affectation de la couleur au socket_out concerné. """
o_socket_out.color = self.get_param(l_keys, '#666')
""" Récupération de tous les edgess partant de ce socket. """
l_gredges = list(o_socket_out.get_gredges())
for o_gredge in l_gredges:
""" Affectation de la même couleur à chaque edge. """
o_gredge.o_edge.color = o_socket_out.color
""" Affectation de la même couleur à chaque socket d'arrivée. """
o_gredge.o_edge.o_socket_in.color = o_socket_out.color
""" Rafraîchissement de l'affichage. """
self.o_grnode.update()
def need_update(self, l_keys):
"""
:param l_keys: Clé, dans od_params, du paramètre modifié (liste).
:return: True si les nodes en aval nécessitent d'être recalculés, False sinon.
"""
l_param_names = list(self.my_params('')) + list(self.fixed_params()) # Les paramètres du dockable.
b_actif = self.get_param([l_keys[1], 'Signal actif'], True) # True 'Signal actif' n'existe pas (générateurs).
""" Update nécessaire (si l'état actif a changé) OU (si actif ET un des paramètres a changé). """
return (l_keys[-1] == 'Signal actif') or (b_actif and l_keys[-1] in l_param_names)
def get_state(self):
""" Chaque node, selon ses spécificités, retourne un booléen de son état on/off (True/False). """
b_on = False
if self.b_chk:
for key in self.o_params.od_params.key_list():
if key[-1] == 'Signal actif':
if self.o_params.od_params.read(key, False):
b_on = True
break
return b_on
def recalculate(self, num_input):
""" - Propagation vers l'aval. Les nodes finaux sont des afficheurs ou des actionneurs.
- Après cela, les nodes finaux construisent leur dict de calcul par propagation vers l'amont. """
for o_socket_out in self.lo_sockets_out:
o_socket_out.recalculate()
def update_dcalc(self, d_calc):
if self.b_on:
""" Propagation amont. """
for o_socket_in in self.lo_sockets_in:
o_socket_in.update_dcalc(d_calc)
od_params = Dictionary(self.o_params.od_params[self.main_key])
""" Construction du dictionnaire pour les calculs, à partir du od_params. """
od_calc = Dictionary()
path = 'nodes' + os.path.relpath(self.child_file).split('nodes')[1][:-3].replace(os.sep, '.')
od_calc.write('Compute', self.type)
od_calc.write('Yaml file', self.get_yaml_files()[0])
od_calc.write('Compile from', path)
od_calc.write('Checked', self.b_chk)
for key in od_params.key_list():
if key[0] == 'Titre du node' or key[0] == 'Couleurs' or key[0] == '*🔆':
continue
if key[-1] == '$Provenance du signal :':
continue
od_calc.write(key, od_params.read(key))
""" Modification 'en place'. """
d_calc[self.s_id] = dict(od_calc)
def get_yaml_files(self):
py_file = os.path.basename(self.child_file)
seeder = self.child_file.replace(py_file, f'{py_file[:-3]}_seeder.yaml')
path_node = f"{__file__.split('pc')[0]}backups/{self.o_scene.graph_name}/node{self.id}"
extended_params = os.path.abspath(f'{path_node}/{self.id}-{py_file[:-3]}.yaml')
return extended_params, seeder
def yaml(self):
""" Méthode appelée lorsqu'on clique sur le bouton '🔆'. Lancement de l'appli associée à '.yaml'. """
extended_params, seeder_file = self.get_yaml_files()
path_node = os.path.dirname(extended_params)
os.makedirs(path_node, exist_ok=True)
if not os.path.exists(extended_params):
""" Création du fichier de paramètress étendus en yaml : copie du seeder si possible, sinon vide. """
if os.path.exists(seeder_file):
shutil.copyfile(seeder_file, extended_params)
else:
with open(extended_params, 'w'):
pass
if not os.path.exists(extended_params):
return
""" Lancement du fichier avec application associée. """
os.startfile(extended_params)
def rebuild(self, b_input=True):
""" Reconstruction du node suite à un changement dynamique du nombre d'entrées/sorties. """
""" Suppression des (sockets + edges) de ce node. """
lo_sockets = self.lo_sockets_in if b_input else self.lo_sockets_out
for o_socket in lo_sockets:
""" Suppression des éventuels edges connectés à ce socket. """
for o_gredge in list(o_socket.get_gredges()):
self.o_scene.o_grscene.removeItem(o_gredge)
""" Suppression du socket. """
self.o_scene.o_grscene.removeItem(o_socket.o_grsocket)
o_socket.o_grsocket = None # Suppression de l'objet.
""" Nettoyage de la liste des entrées/sorties. """
lo_sockets.clear()
""" Redessine le node avec sa nouvelle hauteur. """
ld_inputs, ld_outputs = self.deepcopy(self.ld_inputs), self.deepcopy(self.ld_outputs)
nb_inputs, nb_outputs = len(ld_inputs), len(ld_outputs)
h_title = self.d_display['geometry']['h_title']
first_y = 16 * (1 + (h_title + 4) // 16)
self.height = first_y + max(nb_inputs, nb_outputs) * 16 - 4 # Nouvelle hauteur.
self.o_grnode.set_outline()
""" Régénération de sockets. """
for pos, d_input in enumerate(ld_inputs):
""" On complète les dictionnaires de base. Pas de séparateurs. """
if isinstance(d_input, dict):
indx = len(self.lo_sockets_in)
d_input['pos'] = True, pos # On ajoute la pos. Tuple (input: True, num_position).
d_input['id'] = self.id, indx # On ajoute l'identifiant. Tuple (id_node, num_socket)
self.lo_sockets_in.append(CtrlSocket(self, d_input)) # Socket instancié et ajouté à la liste.
for pos, d_output in enumerate(ld_outputs):
""" On complète les dictionnaires de base. Pas de séparateurs. """
if isinstance(d_output, dict):
indx = len(self.lo_sockets_out)
d_output['pos'] = False, pos # On ajoute la pos. Tuple (output: False, num_position).
d_output['id'] = self.id, indx # On ajoute l'identifiant. Tuple (id_node, num_socket)
self.lo_sockets_out.append(CtrlSocket(self, d_output)) # Socket instancié et ajouté à la liste.
""" Régénération des edges. """
self.o_scene.show_edges()
self.o_scene.save_edges()
""" Dockable des paramètres : le nombre d'entrées a changé. """
self.o_params.set_params()
self.dt.delay(self.o_scene.show_params, 2000) # delay : Permet de continuer la saisie dans le dockable.
def refresh(self, l_keys):
if l_keys[-1] == '🔆':
self.yaml()
elif l_keys[-1] == 'Titre du node':
""" Titre du node. """
self.o_grnode.set_title()
elif l_keys[-2] == 'Couleurs':
""" Couleur des sockets et des edges. """
self.set_colors(l_keys[1:])
else:
""" Fin de refresh - Appel conditionnel à infrastructure(). """
if self.b_chk and (l_keys[0] == 'infrastructure' or l_keys[-1] == 'Signal actif'):
""" Les nodes de type générateur doivent insérer le mot 'infrastructure'
en 1ère place dans la liste l_keys[], car ils n'ont pas 'Signal actif' dans leurs paramètres. """
self.o_scene.infrastructure()
""" Tous les paramètres, autres que le titre et les couleurs, influencent les calculs. """
self.o_scene.recalculate()
class CtrlCalcul(Helper):
""" Attributs de classe. """
d_servers = dict() # Dictionnaire de classe.
od_calcs = Dictionary()
def __init__(self):
self.id = None
self.s_id = None
self.compute = None
self.o_yaml = None
self.l_mykey = list()
self.np_array = None
self.b_forcing_calc = False
self.od_descr = Dictionary()
self.od_mydic = Dictionary()
self.nb_points = self.len_buffer
self.no_params = ['Compute', 'Yaml file', 'Compile from', 'Checked']
def setup_server(self, id_server):
self.id = id_server
self.l_mykey = self.get_keys_from_idnode(id_server) # Clé dans self.od_calcs.
self.od_mydic = Dictionary(self.od_calcs.read(self.l_mykey))
self.o_yaml.yaml_file = self.od_mydic.read(['Yaml file'])
self.compute = self.od_mydic.read(['Compute'])
self.s_id = f"{self.compute}{self.id}"
self.o_yaml.update_odyaml() # Update du super-dictionnaire des paramètres (self.o_yaml.od_yaml).
@property
def len_buffer(self):
for l_key in self.od_calcs.key_list():
""" Recherche dans dictionnaire global (tous les nodes). """
if l_key[-1] == 'Abscisse':
len_buffer = self.od_calcs.read(l_key)
break
else:
len_buffer = 10**5
return int(len_buffer)
@property
def dt_servers_in(self):
""" - Le dictionnaire retourné contient autant de clés que d'entrées connectées et actives.
- Chaque valeur est un tuple : (o_server, signature, edge). """
""" 1 - Récupération de la liste de tous les edges dans self.od_calcs. """
s_edges = set()
ll_keys = self.od_calcs.key_list()
for l_keys in ll_keys:
if l_keys[-1] == 'edges':
s_edges.update(self.od_calcs.read(l_keys))
""" 2 - Instance et signature des nodes-serveurs en amont connectés aux entrées. """
d_servers = dict()
for edge in list(s_edges):
if edge[1][0] == self.id:
""" edge = Edge connecté à l'entrée de self. """
o_server, signature = self.get_server(edge[0][0])
d_servers[f'e{edge[1][1]}'] = o_server, signature, edge
return d_servers
@classmethod
def get_oserver(cls, id_server, compile_from):
""" Ce code est appelé par un node-client. Il aura un serveur par entrée.
- Chaque node-serveur est associé à une instance de classe Calcul (dérivée de CtrlCalcul).
- Au premier passage concernant ce node-serveur, l'objet o_server n'existe pas.
- Dans ce cas, on le crée par compilation dynamique et on l'enregistre dans un dictionnaire.
- Le dictionnaire est un dictionnaire de classe : cls.d_servers.
- Cela permet d'éviter de créer une nouvelle instance si celle-ci existe déjà.
"""
server_sid = f"{compile_from.split('.')[-1]}{id_server}" # Clé du dictionnaire des serveurs. ex : macd3
if server_sid not in cls.d_servers:
""" Compilation dynamique. Si l'objet o_server n'existe pas, on le crée."""
d_context = {}
code = f"from {compile_from} import Calcul; o_server=Calcul()"
try:
compiled = compile(code, '<string>', 'exec')
eval(compiled, d_context)
cls.d_servers[server_sid] = d_context['o_server']
except (Exception,) as err:
print(server_sid, ': Erreur de compilation dans', compile_from, '\n', err)
return None
return cls.d_servers[server_sid]
def get_keys_from_idnode(self, id_node):
"""
@param id_node: ex : 1
@return: Renvoie la clé d'accès au node 'node_sid', dans le dictionnaire général self.od_calcs.
"""
node_sid = f'Node{id_node}' # Ex : 'Node1'
for l_key in self.od_calcs.key_list():
if l_key[1] == node_sid:
return l_key[:2] # ['calc_*', 'Node*'] Clé du node-serveur dans le dictionnaire général self.od_calcs.
def get_server(self, id_server):
l_key_node = self.get_keys_from_idnode(id_server)
compile_from = self.od_calcs.read(l_key_node + ['Compile from'])
if compile_from is None:
return None
child = self.od_calcs.read(l_key_node + ['Expérience'], '')
compile_from += f'0{child}'[-2:] if isinstance(child, int) else ''
o_server = self.get_oserver(id_server, compile_from)
if o_server is None:
return None, None
o_server.setup_server(id_server)
return o_server, o_server.get_signature()
def get_signature(self):
""" La valeur de retour est un hash de od_yaml, od_mydic, liste des signatures des nodes-serveurs. """
l_signs_in = list()
for server_in in self.dt_servers_in.values():
""" server_in = tuple (instance, signature) du node-serveur. """
l_signs_in.append(server_in[1]) # server_in[1] = signature.
""" Assemblage des constituants de la signature. """
params = (self.o_yaml.od_yaml, self.od_mydic, l_signs_in, self.len_buffer)
return f'{self.id}-{hashlib.sha256(str(params).encode("utf-8")).hexdigest()}'
def calculation(self):
""" Ici node-serveur. Première méthode appelée par le node-client en aval, avant de commencer ses calculs. """
""" 1 - Chargement des derniers datas mémorisés en *.calc (pickle) : self.od_descr et self.np_array. """
self.load_last_datas() # Super-dictionnaire et tableau numpy vides si *.calc inexistant ou erroné.
""" 2 - Création du dict. de description actualisé od_descr, aux fins de comparaison avec self.od_descr. """
od_descr = self.setup_od_descr()
""" 3 - Comparaison sélective des descriptifs now et ante. Si aucune différence, return (calculs inutiles). """
self.nb_points = self.len_buffer
if self.now_equal_ante(od_descr):
return
self.od_descr = od_descr
""" 4 - Propagation des calculs en amont. """
min_points = self.len_buffer
for o_server, _, edge in self.dt_servers_in.values():
""" Bien que ce soit cette même méthode, ce n'est pas une récursivité, mais une propagation amont."""
o_server.calculation()
min_points = min(min_points, o_server.nb_points)
if o_server.b_forcing_calc:
self.b_forcing_calc = True
""" 5 - self.np_array vide. """
nb_cols = len([key for key in list(self.od_descr.keys()) if key.startswith(f'{self.s_id}-')])
self.np_array = np.full((self.len_buffer, nb_cols+1), fill_value=np.nan, dtype=np.float32) # nan
self.np_array[:, 0] = 0 # col0 = status (0 ou 1)
""" 6 - Traitement des calculs. """
self.pre_process()
self.nb_points = min(self.nb_points, min_points)
""" 7 - Persistance en *.calc (pickle). """
self.create_out_calc()
def load_last_datas(self):
""" Ici, node-serveur. Récupération des derniers datas depuis son fichier out_*.calc. Les datas sont :
- Un dictionnaire descriptif des consignes : 'self.od_descr'.
- Un tableau numpy : 'self.np_array'.
- Pour certains nodes, les datas ne sont pas dans out_*.cal, mais dans une base de données.
- Le cas échéant, cette méthode sera surchargée. """
server_root = os.path.dirname(self.o_yaml.yaml_file)
os.makedirs(server_root, exist_ok=True)
out_file = os.path.normpath(f'{server_root}/out.calc')
try:
with open(out_file, 'rb') as calc:
self.od_descr, self.np_array = pickle.load(calc)
if not self.od_descr or self.np_array.size == 0 or self.np_array.shape[0] != self.len_buffer:
raise() # On force le passage par except.
except (Exception,):
""" Initialisation. """
self.od_descr, self.np_array = self.new_od(), np.empty((self.len_buffer, 0))
if not self.dt_servers_in: # Dans la méthode suivante setup_od_descr() ...
self.od_mydic.write(['self_server', 'Signal actif'], True) # ... voir if l_keys[-1] != 'Signal actif':
def setup_od_descr(self):
""" Création du super-dictionnaire descriptif des calculs : od_descr.
- Une clé pour chaque signal, actif ou pas, ayant exactement le même libellé que celui
qui est affiché dans le dockable du node-client. Exemple pour un node 'Signaux' : 'Signaux1-Sinus'
- La valeur de cette clé est un dictionnaire ayant pour clés obligatoires :
- Diverses valeurs, spécifiques au signal, nécessaires aux calculs.
- num_col : Entier. Défini au dernier passage, sinon -1, destinée aux nodes-clients.
- actif : Booléen.
- notes : Aucune utilité en programmation, à l'attention du développeur.
- A la racine du dictionnaire, outre les clés de signaux, il y a :
- signature : signature sha256. Contient un assemblage des paramètres + signatures des nodes-serveurs.
Cette signature-composite nous permet de savoir si les signaux entrants ont changé.
- roots : liste de tuples, autant que de signaux en entrée : (name_input, key_dock)
Ex : name_input = 'e0', 'e1', ... key_dock = ADD4-Poisson-Signal
- Diverses valeurs globales, spécifiques au node, nécessaires aux calculs.
- Valeur de retour : od_descr.
"""
od_descr = Dictionary()
""" Signature = ma signature + signatures des nodes-serveurs. """
od_descr['signature'] = self.get_signature()
od_descr['roots'] = list()
num_col = 0
for l_keys in self.od_mydic.key_list():
if l_keys[-1] != 'Signal actif':
continue
key = l_keys[:-1]
val = self.od_mydic.read(key)
name_input = 'e' + l_keys[0][-1] if l_keys[0].startswith('Entrée ') else 'e0'
root_key = self.join(*[self.s_id] + key[-1].split('-')[1:])
if val.get('Signal actif', False):
od_descr['roots'].append((name_input, root_key))
od_descr['name_input'] = name_input
self.descr_signal(od_descr, val, root_key)
del od_descr['name_input']
for key_dock in od_descr.keys():
if key_dock.startswith(root_key):
if od_descr.read([key_dock, 'num_col']) is None:
num_col += 1
od_descr.write([key_dock, 'num_col'], num_col)
if key_dock.endswith('-Original'):
try:
od_descr.write([key_dock, 'actif'], val['Original'])
except (Exception,):
pass
elif od_descr.read([key_dock, 'actif']) is None:
od_descr.write([key_dock, 'actif'], val['Signal actif'])
return od_descr
def now_equal_ante(self, od_descr):
l_no_compare = ['actif']
for key_dock in od_descr.key_list():
if key_dock[-1] in l_no_compare:
continue
elif key_dock[0] == 'changed':
""" Passage forcé dans descr_signal() du node. """
return False
elif key_dock[-1] == 'signature' and od_descr.read(key_dock) != self.od_descr.read(key_dock):
return False
elif od_descr.read(key_dock) != self.od_descr.read(key_dock):
return False
else:
for o_server, _, _ in self.dt_servers_in.values():
if o_server.np_array is None:
return False
else:
return True
def descr_signal(self, od_descr, val, root_key):
""" Méthode OBLIGATOIREMENT surchargée par les classes dérivées. """
""" - Création du super-dictionnaire de description 'od_descr'.
- Les clés de 'od_descr' devront contenir les arguments nécessaires aux calculs.
- Les calculs seront effectués par pre_process(), ou, en mode 'générateur', par get_matrix().
- {key_dock} doit avoir EXACTEMENT la même orthographe que
la clé affichée dans le dockable des paramètres du node CLIENT
- Sources EXCLUSIVES : 'self.od_mydic' et 'self.o_yaml'.
- Dynamique - Cette méthode est appelée en boucle par la classe-mère :
- Une fois par signal aux entrées (actif ou pas).
- La construction du super-dictionnaire se fait donc par couches successives.
- Cela ne signifie pas qu'il y a autant de signaux en sortie qu'en entrée !
- Ces 2 nombres sont INDÉPENDANTS !
- En effet, plusieurs signaux d'entrée peuvent contribuer pour un seul signal de sortie ...
- ... et/ou, inversement, un signal d'entrée peut produire plusieurs signaux de sortie.
************************* IMPORTANT *************************
- Le coding se fera par itérations successives :
- Commencer à coder les calculs (pre_process() ou calculate()), qui parcourra ce dictionnaire 'od_descr'
- Si les calculs nécessitent une information absente dans 'od_descr', on revient ici pour l'ajouter.
- On prolonge le coding des calculs, puis on boucle ici ...
- ... jusqu'à la fin.
"""
raise SystemExit(f"{self.s_id} : ctrl_node.py > CtrlCalcul."
"\nLa méthode descr_signal() doit obligatoirement être surchargée.")
def pre_process(self):
""" Méthode surchargée par les classes 'générateur' dérivées : Signaux, Aléatoire, Histos, ... """
""" Cette méthode peut également être surchargée par d'autres classes dérivées, mais attention :
- La classe dérivée doit utiliser EXCLUSIVEMENT la méthode pre_process() OU la méthode get_matrix() :
- pre_process() : Le tableau numpy va être affecté à 100% ...
|_ ... ce qui risque d'être chronophage pour certains calculs.
- get_matrix() : Juste une toute petite partie du tableau numpy va être remplie en live, ...
|_ ... selon les besoins des nodes-clients, grâce à l'utilisation de générateurs python.
|_ Dans ce cas, la réactivité sera excellente, même si les calculs sont complexes.
Méthode pre_process() :
===================
- La récupération des paramètres se fait exclusivement depuis le super-dictionnaire self.od_descr.
- Le tableau numpy existe avec la bonne shape, mais est entièrement vide :
- Autant de colonnes que de signaux, plus une : la colonne de status, en position 0.
- Les valeurs sont des np.nan ...
- ... sauf la colonne 0 qui est remplie de '0'.
- Les valeurs sont produites :
- Par calculs, qui utilisent des fonctions "prêtes à l'emploi", du module 'indicators'.
- Par lecture d'une base de données.
"""
pass
def get_matrix(self, pointer, nb_lines=0):
"""
@param pointer: Pointeur "Depuis", si pointer < 0 , la totalité du tableau est retournée.
@param nb_lines: "Jusqu'a" = pointer + nb_lignes.
- Pour certains nodes, les datas ne sont pas dans 'self.np_array', mais en base de données.
- Le cas échéant, cette méthode sera surchargée.
- Si le node n'est pas de type 'générateur', cette méthode est également surchargée.
- Le traitement est court, ponctuel et partiel.
- A l'issue du traitement, une toute petite partie de tableau numpy 'self.np_array' a été mise à jour.
- La colonne 0 contient des flags (0 ou 1), levés lorsque les lignes correspondantes ont été traitées.
- Le retour est une matrice ayant le même nombre de colonnes que np_array et {nb_lines} lignes.
- Si les données demandées sont dans self.np_array, on les transmet directement.
- Sinon, on les génère, on les place dans self.np_array, puis on les transmet.
- Les paramètres sont à récupérer EXCLUSIVEMENT dans self.od_descr.
- Les données d'entrée sont demandées aux nodes-serveurs des entrées.
"""
_from, _to = pointer, pointer+nb_lines
""" Calculer si 'forcing' ou si le status = 0 (si au moins une valeur de la colonne 0 est à 0). """
if self.b_forcing_calc or np.prod(self.np_array[_from: _to, 0]) == 0:
""" Update de self.np_array. Début de transaction. """
self.process(pointer, nb_lines) # Traitement de tous les signaux aux entrées.
""" flag = 1 (fait) <--- Status. Fin de transaction. """
self.np_array[_from: _to, 0] = 1 # <-- Écriture du status = 1 (en colonne 0).
return self.np_array if pointer < 0 else self.np_array[_from: _to] # Tout ou slice.
def process(self, pointer, nb_lines):
od_buffer = self.new_od()
for name_input, root_key in self.od_descr.get('roots'):
o_server = self.dt_servers_in[name_input][0] # [0] = server, [1] = signature, [2] = edge
""" On demande au serveur une matrice un peu plus grande que nécessaire,
pour permettre l'amorce de certains calculs. On insère {previous_values} valeurs au début. """
previous_values = min(pointer, int(self.od_descr.read(['max_length', root_key], 0)))
np_matrix = o_server.get_matrix(pointer - previous_values, nb_lines + previous_values)
num_col_server = o_server.get_num_col(root_key)
vector_in = np_matrix[:, num_col_server]
od_buffer['name_input'] = name_input
self.calculate(pointer, nb_lines, root_key, vector_in, od_buffer)
del od_buffer['name_input']
if od_buffer:
""" Si dictionnaire non vide, modifié par calculate(), on termine le traitement. """
self.post_process(pointer, nb_lines, od_buffer)
def calculate(self, pointer, nb_lines, root_key, vector_in, od_buffer):
""" Pas de contrôle de surcharge pour les nodes sans entrées (type générateur, ...) """
if self.dt_servers_in:
raise SystemExit(f"{self.s_id} : ctrl_node.py > CtrlCalcul."
"\nLa méthode calculate() doit obligatoirement être surchargée.")
def post_process(self, pointer, nb_lines, od_buffer):
""" - Passage unique, après process() afin de finaliser certains calculs ...
... incluant d'autres signaux que le signal en cours.
- Pré-traitement spécifique. """
pass
def create_out_calc(self):
""" Création du fichier-résultat out_*.calc = (od_descr, liste de tableaux numpy). """
""" 1 - Contrôle - Le tableau nd_array doit avoir une shape à 2 dimensions. """
""" Correction nécessaire si le tableau est un vecteur. """
if len(self.np_array.shape) == 1: # <-- C'est un vecteur, on reshape.
self.np_array = self.np_array.reshape(self.np_array.shape[0], 1)
""" 2 - Persistance. """
server_root = os.path.dirname(self.od_mydic['Yaml file'])
out_file = os.path.normpath(f'{server_root}/out.calc')
try:
with open(out_file, 'wb') as calc:
pickle.dump((self.od_descr, self.np_array), calc, pickle.HIGHEST_PROTOCOL)
except (Exception,) as err:
print(err)
return False
return True
def add_vector(self, key_dock, num_col, vector):
if vector is None:
return
vector = vector.reshape(vector.shape[0], 1)
add_nan = np.full((self.len_buffer - vector.shape[0], 1), np.nan)
vector = np.vstack((add_nan, vector))
if num_col is None or num_col < 0:
""" Nouvelle colonne d'indice num_col (égal au nombre actuel de colonnes). """
num_col = self.np_array.shape[1]
self.np_array = np.hstack((self.np_array, vector))
else:
""" Surcharge de la colonne num_col. """
self.np_array[:, num_col:num_col + 1] = vector
self.od_descr.write([key_dock, 'num_col'], num_col)
def get_num_col(self, client_key): # client_key = MM4-Normal-Original
sliced_key = self.join(*client_key.split('-')[1:]) # sliced_key = Normal-Original (Début retiré).
if sliced_key != '':
for l_key in self.od_descr.key_list(): # [Aléatoire1-Normal, 'num_col']
if l_key[-1] == 'num_col' and l_key[-2].endswith(sliced_key):
return self.od_descr.read(l_key)
def get_vector_in(self, client_key): # client_key = MM4-Normal-Original
return self.np_array[:, self.get_num_col(client_key)]
Vérification
/nodes/indicateurs/moy_mobile/moy_mobile.py :
# Imports internes
import functions.indicators as indic
from nodes.indicateurs.moy_mobile.moy_mobile_yaml import YamlParams
from pc.ctrl_node import CtrlNode, CtrlCalcul
d_datas = {
'name': 'MMobile', # Label affiché sous l'icone.
'icon': 'moy_mobile.png', # Icone affichée.
}
class Node(CtrlNode):
def __init__(self, o_scene, s_id, pos):
super().__init__(o_scene, s_id, pos)
self.type = 'MM'
self.setup()
def setup(self, child_file=__file__):
self.l_short_circuits = [(0, 0)] # Entrée N°0 'court-circuitable' avec la sortie N°0.
super().setup(child_file)
@property
def ld_inputs(self):
return [{
'label': '',
'label_pos': (6, -10)
}]
@property
def ld_outputs(self):
return [{
'label': 'Sortie',
'label_pos': (-38, -10)
}]
def my_params(self, context):
return {
'Original': True,
'Type de MM': ['SMA', {'values': ['SMA', 'EMA', 'SMMA', 'LWMA']}],
"Périodes (sep=',')": '14',
}
def my_signals(self, l_signals_in, num_socket_out):
""" Faisceau de signaux (l_signals) délivré à la sortie. """
l_signals = list() # Liste de tuples à 4 valeurs.
for signals_in in l_signals_in[0]:
typ_id_from, signal_ante_from, signal_now_from, signal_source_from, \
_, signal_title, typ_id, signal_ante, signal_source = signals_in[:9]
try:
periods = [int(p.strip()) for p in self.get_param([signal_title, "Périodes (sep=',')"]).split(',')]
except (Exception,):
periods = [14]
if self.get_param([signal_title, 'Original'], True):
l_signals.append((typ_id, signal_ante, 'Original', signal_source))
for num, period in enumerate(periods):
legend = f"{signal_ante}-{self.get_param([signal_title, 'Type de MM'], 'SMA')}{period}"
signal_now = f'sign{num}'
signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
typ_id, signal_ante = f'{self.type}{self.id}', self.join(signal_ante_from, signal_now_from)
l_signals.append((typ_id, signal_ante, signal_now, signal_source, legend))
return l_signals
def refresh(self, l_keys):
""" Code spécifique. """
if self.need_update(l_keys):
self.lo_sockets_out[0].to_update() # Une seule sortie, N° 0.
super().refresh(l_keys)
class Calcul(CtrlCalcul):
""" ********** Le code ci-dessous ne concerne pas le poste de contrôle, mais seulement les calculs. ********** """
def __init__(self):
super().__init__()
self.o_yaml = YamlParams(self)
def descr_signal(self, od_descr, val, root_key):
""" Voir docstring dans la classe-mère. """
signal_name = root_key.split('-')[1]
""" Original. """
key_dock = f"{root_key}-Original"
od_descr.write([key_dock, 'notes'], f'{signal_name} original')
typ = val['Type de MM']
l_periods = [int(p.strip()) for p in val["Périodes (sep=',')"].split(',')]
od_descr.write(['max_length', root_key], int(1.5 * max(l_periods)))
for num, period in enumerate(l_periods):
""" key_dock doit avoir EXACTEMENT la même orthographe que la clé affichée
dans le dockable des paramètres du node CLIENT. """
key_dock = f'{root_key}-sign{num}'
od_descr.write([key_dock, 'notes'], f'{signal_name} {typ}{period}')
od_descr.write([key_dock, 'typ'], typ)
od_descr.write([key_dock, 'period'], period)
od_descr.write([key_dock, 'kwargs', 'nb_last'], min(period, self.o_yaml.smma_nb_last))
od_descr.write([key_dock, 'kwargs', 'ratio'], self.o_yaml.lwma_ratio)
def pre_process(self):
""" - Passage unique, après descr_signal() et avant get_matrix(). voir docstring dans la classe-mère.
- Pré-traitement spécifique. """
pass # Cette méthode peut être supprimée.
def calculate(self, pointer, nb_lines, root_key, vector_in, od_buffer):
""" Passage multiple. """
_to = pointer + nb_lines
""" Original. """
key_dock = root_key + '-Original'
num_col = self.od_descr.read([key_dock, 'num_col'])
self.np_array[pointer: _to, num_col] = vector_in[-nb_lines:]
""" Si plusieurs moyennes, key_dock se termine par 'sign0', 'sign1', ... """
for key_dock, d_val in self.od_descr.items():
if key_dock.startswith(root_key+'-sign'):
num_col = d_val.get('num_col')
typ = d_val.get('typ')
period = d_val.get('period')
kwargs = d_val.get('kwargs')
vector_out = indic.get_mm(vector_in, typ=typ, period=period, **kwargs)
nb_valid_lines = min(len(vector_out), nb_lines)
self.np_array[_to - nb_valid_lines: _to, num_col] = vector_out[-nb_valid_lines:]
/nodes/indicateurs/macd/macd.py :
# Imports internes.
from pc.ctrl_node import CtrlNode, CtrlCalcul
from nodes.indicateurs.macd.macd_yaml import YamlParams
import functions.indicators as indic
d_datas = {
'name': 'MACD', # Label affiché sous l'icone.
'icon': 'macd.png', # Icone affichée.
}
class Node(CtrlNode):
def __init__(self, o_scene, s_id, pos):
super().__init__(o_scene, s_id, pos)
self.type = 'MACD'
self.setup()
def setup(self, child_file=__file__):
""" Listes des dictionnaires de base attribués aux sockets. """
self.l_short_circuits = [(0, 0), (0, 1)] # e0 -> s0 et e0 -> s1
super().setup(child_file)
@property
def ld_inputs(self):
return [
{ # Liste de 1 dictionnaire.
'label': 'Entrée',
'label_pos': (6, -10)
}
]
@property
def ld_outputs(self):
return [ # Liste de 2 dictionnaires.
{
'label': 'Sortie',
'label_pos': (-38, -10)
},
'sep', # Séparateur (facultatif).
{
'label': 'Oscillateur',
'label_pos': (-59, -10)
}
]
def my_params(self, context):
""" Code appelant : CtrlNode.dynamic_params() """
return {
'Original': True,
'Type de MM longue': ['SMA', {'values': ['SMA', 'EMA', 'SMMA', 'LWMA']}],
'Moyenne longue': [26, {'compactHeight': False}],
'Type de MM courte': ['SMA', {'values': ['SMA', 'EMA', 'SMMA', 'LWMA']}],
'Moyenne courte': [12, {'compactHeight': False}],
'Type de MM Signal': ['SMA', {'values': ['SMA', 'EMA', 'SMMA', 'LWMA']}],
'Signal': [9, {'compactHeight': False}],
}
def my_signals(self, l_signals_in, num_socket_out):
""" Retourne un faisceau de signaux (l_signals) délivré à la sortie N° num_socket_out. """
l_signals = [[], []] # 2 sorties.
for signals_in in l_signals_in[0]:
typ_id_from, signal_ante_from, signal_now_from, signal_source_from, \
_, signal_title, typ_id, signal_ante, signal_source = signals_in
signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
typ_id, signal_ante = f'{self.type}{self.id}', self.join(signal_ante_from, signal_now_from)
""" Sortie 0 : 2 ou 3 signaux. """
orig = ['Original'] if self.get_param([signal_title, 'Original'], True) else []
for signal_now in orig + ["Longue", "Courte"]:
l_signals[0].append((typ_id, signal_ante, signal_now, signal_source))
""" Sortie 1 : 3 signaux. !!! Attention, ce n'est pas le signe moins, mais un caractère spécial !!!"""
for signal_now in ['(Courte–Longue)', 'Signal', 'MACD']:
l_signals[1].append((typ_id, signal_ante, signal_now, signal_source))
return l_signals[num_socket_out]
def refresh(self, l_keys):
""" Ex : l_keys = ["Paramètres du node 'Node0'", 'Signaux-1 : Sinus', 'Signal actif'] """
if self.need_update(l_keys):
""" Update (si l'état actif a changé) OU (si actif ET un des paramètres a changé). """
self.lo_sockets_out[0].to_update()
self.lo_sockets_out[1].to_update()
super().refresh(l_keys)
class Calcul(CtrlCalcul):
""" ********** Le code ci-dessous ne concerne pas le poste de contrôle, mais seulement les calculs. ********** """
def __init__(self):
super().__init__()
self.o_yaml = YamlParams(self)
# noinspection DuplicatedCode
def descr_signal(self, od_descr, val, root_key):
""" Voir docstring dans la classe-mère. """
signal_name = root_key.split('-')[1]
""" Original. """
key_dock = f"{root_key}-Original"
od_descr.write([key_dock, 'notes'], f'{signal_name} original')
""" Moyenne longue. """
per_long = val.get('Moyenne longue', 26)
key_dock = f"{root_key}-Longue"
typ = val.get("Type de MM longue", 'SMA')
od_descr.write([key_dock, 'notes'], f'{signal_name}-Longue {typ}{per_long}')
od_descr.write([key_dock, 'typ'], typ)
od_descr.write([key_dock, 'period'], per_long)
od_descr.write([key_dock, 'kwargs', 'nb_last'], min(per_long, self.o_yaml.long_smma_nb_last))
od_descr.write([key_dock, 'kwargs', 'ratio'], self.o_yaml.long_lwma_ratio)
""" Moyenne courte. """
per_short = val.get('Moyenne courte', 12)
key_dock = f"{root_key}-Courte"
typ = val.get("Type de MM courte", 'SMA')
od_descr.write([key_dock, 'notes'], f'{signal_name}-Courte {typ}{per_short}')
od_descr.write([key_dock, 'typ'], typ)
od_descr.write([key_dock, 'period'], per_short)
od_descr.write([key_dock, 'kwargs', 'nb_last'], min(per_short, self.o_yaml.short_smma_nb_last))
od_descr.write([key_dock, 'kwargs', 'ratio'], self.o_yaml.short_lwma_ratio)
""" Courte - Longue. """ # !!! Attention, ce n'est pas un tiret, mais un caractère spécial !!!
""" Les key_dock sont traitées par la suite par des split('-'). Le tiret ne doit donc pas être utilisé. """
key_dock = f"{root_key}-(Courte–Longue)" # (Courte{caractère spécial}Longue)
od_descr.write([key_dock, 'notes'], f'{signal_name}-Courte - {signal_name}-Longue')
""" Signal. """
per_signal = val.get('Signal', 9)
key_dock = f"{root_key}-Signal"
typ = val.get("Type de MM Signal", 'SMA')
od_descr.write([key_dock, 'notes'], f'{signal_name}-Signal {typ}{per_signal}')
od_descr.write([key_dock, 'typ'], val.get("Type de MM Signal", 'SMA'))
od_descr.write([key_dock, 'period'], per_signal)
od_descr.write([key_dock, 'kwargs', 'nb_last'], min(per_signal, self.o_yaml.signal_smma_nb_last))
od_descr.write([key_dock, 'kwargs', 'ratio'], self.o_yaml.signal_lwma_ratio)
""" MACD. """
key_dock = f"{root_key}-MACD"
od_descr.write([key_dock, 'notes'], f'{signal_name} MACD')
""" La plus grande période (augmentée de 50% par sécurité). """
od_descr.write(['max_length', root_key], 1.5 * max(per_long, per_short, per_signal))
def pre_process(self):
""" Passage unique, après descr_signal() et avant get_matrix(). voir docstring dans la classe-mère. """
pass
def calculate(self, pointer, nb_lines, root_key, vector_in, od_buffer):
""" Passage mutiple - Voir docstring dans la classe-mère. """
_to = pointer + nb_lines
od_val = self.new_od()
l_cols = list()
""" Phase 1 : récupération des paramètres du macd (od_val) et des n° de colonnes (l_cols). """
""" Original. """
key_dock = root_key + '-Original'
l_cols.append(self.od_descr.read([key_dock, 'num_col']))
""" Moyenne longue. """
key_dock = root_key + '-Longue'
l_cols.append(self.od_descr.read([key_dock, 'num_col']))
od_val.write(['long', 'typ'], self.od_descr.read([key_dock, 'typ']))
od_val.write(['long', 'period'], self.od_descr.read([key_dock, 'period']))
od_val.write(['long', 'nb_last'], self.od_descr.read([key_dock, 'kwargs', 'nb_last']))
od_val.write(['long', 'ratio'], self.od_descr.read([key_dock, 'kwargs', 'ratio']))
""" Moyenne courte. """
key_dock = root_key + '-Courte'
l_cols.append(self.od_descr.read([key_dock, 'num_col']))
od_val.write(['short', 'typ'], self.od_descr.read([key_dock, 'typ']))
od_val.write(['short', 'period'], self.od_descr.read([key_dock, 'period']))
od_val.write(['short', 'nb_last'], self.od_descr.read([key_dock, 'kwargs', 'nb_last']))
od_val.write(['short', 'ratio'], self.od_descr.read([key_dock, 'kwargs', 'ratio']))
""" Différence : (Moyenne courte - Moyenne longue). """
key_dock = root_key + '-(Courte–Longue)' # '–' = Caractère spécial.
l_cols.append(self.od_descr.read([key_dock, 'num_col']))
""" Signal. """
key_dock = root_key + '-Signal'
l_cols.append(self.od_descr.read([key_dock, 'num_col']))
od_val.write(['signal', 'typ'], self.od_descr.read([key_dock, 'typ']))
od_val.write(['signal', 'period'], self.od_descr.read([key_dock, 'period']))
od_val.write(['signal', 'nb_last'], self.od_descr.read([key_dock, 'kwargs', 'nb_last']))
od_val.write(['signal', 'ratio'], self.od_descr.read([key_dock, 'kwargs', 'ratio']))
""" MACD. """
key_dock = root_key + '-MACD' # '–' = Caractère spécial.
l_cols.append(self.od_descr.read([key_dock, 'num_col']))
""" Phase 2 : Utilisation de od_val pour obtenir le macd. """
t_macd = indic.macd(vector_in, od_val) # Tuple
""" Phase 3 : Utilisation de t_macd et de l_cols pour updater self.np_array. """
for i, vector_out in enumerate(t_macd):
nb_valid_lines = min(len(vector_out), nb_lines)
self.np_array[_to - nb_valid_lines: _to, l_cols[i]] = vector_out[-nb_valid_lines:]
/nodes/oscillateurs/rsi/rsi.py :
from pc.ctrl_node import CtrlNode, CtrlCalcul
from nodes.oscillateurs.rsi.rsi_yaml import YamlParams
import functions.indicators as indic
d_datas = {
'name': 'RSI', # Label affiché sous l'icone.
'icon': 'rsi.png', # Icone affichée.
}
class Node(CtrlNode):
def __init__(self, o_scene, s_id, pos):
super().__init__(o_scene, s_id, pos)
self.type = 'RSI'
self.setup()
def setup(self, child_file=__file__):
self.l_short_circuits = [(0, 0)] # Entrée N°0 'court-circuitable' avec la sortie N°0.
super().setup(child_file)
@property
def ld_inputs(self):
return [{
'label': '',
'label_pos': (6, -10)
}]
@property
def ld_outputs(self):
return [{
'label': 'Sortie',
'label_pos': (-38, -10)
}]
def my_params(self, context):
return {
'Zone neutre': [70., {'step': 1, 'limits': (5, 95), 'suffix': '%', 'compactHeight': False}],
"Périodes (sep=',')": '14',
}
def my_signals(self, l_signals_in, num_socket_out):
""" Faisceau de signaux (l_signals) délivré à la sortie. """
l_signals = list()
for signals_in in l_signals_in[0]:
typ_id_from, signal_ante_from, signal_now_from, signal_source_from, \
_, signal_title, typ_id, signal_ante, signal_source = signals_in
periods = [int(p) for p in self.get_param([signal_title, "Périodes (sep=',')"]).split(',')]
for period in periods:
signal_now = f't{period}'
signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
typ_id, signal_ante = f'{self.type}{self.id}', self.join(signal_ante_from, signal_now_from)
l_signals.append((typ_id, signal_ante, signal_now, signal_source))
return l_signals
def refresh(self, l_keys):
""" Code spécifique. """
if self.need_update(l_keys):
self.lo_sockets_out[0].to_update() # Une seule sortie, N° 0.
super().refresh(l_keys)
class Calcul(CtrlCalcul):
""" ********** Le code ci-dessous ne concerne pas le poste de contrôle, mais seulement les calculs. ********** """
def __init__(self):
super().__init__()
self.o_yaml = YamlParams(self)
def descr_signal(self, od_descr, val, root_key):
""" - Voir docstring dans la classe-mère.
- Les clés 'low' et 'high' sont utilisées par matplotlib pour colorier les zônes extrêmes. """
signal_name = root_key.split('-')[1]
l_periods = [int(p.strip()) for p in val["Périodes (sep=',')"].split(',')]
od_descr.write(['max_length', root_key], int(1.5 * max(l_periods)))
for period in l_periods:
key_dock = f"{root_key}-t{period}"
""" Limites low et high. """
low = (100 - val['Zone neutre']) / 2
high = 100 - low
od_descr.write([key_dock, 'notes'], f'{signal_name} {period}')
od_descr.write([key_dock, 'period'], period)
""" Variables 'low' et 'high' utilisables dans les paramètres étendus yaml. """
od_descr.write([key_dock, 'low'], low)
od_descr.write([key_dock, 'high'], high)
def pre_process(self):
""" - Passage unique, après descr_signal() et avant get_matrix(). voir docstring dans la classe-mère.
- Pré-traitement spécifique. """
pass
def calculate(self, pointer, nb_lines, root_key, vector_in, od_buffer):
_to = pointer + nb_lines
""" Si plusieurs périodes de rsi, key_dock se termine par 't14', 't22', ... """
for key_dock, d_val in self.od_descr.items():
if key_dock.startswith(root_key+'-t'):
num_col = d_val.get('num_col')
period = d_val.get('period')
vector_out = indic.rsi(vector_in, period)
nb_valid_lines = min(len(vector_out), nb_lines)
self.np_array[_to - nb_valid_lines: _to, num_col] = vector_out[-nb_valid_lines:]

Ce signal est un Renko de maille 5 : chaque point est à 5 pips du voisin (+ ou -).

Les signaux sont sur des plans différents, à l'instar d'un paysage vu par la fenêtre d'un train :
L'arrière plan (mouvement lent) peut faire des kilomètres, le premier plan quelques centimètres.
Les signaux convergent à l'instant présent.


Le seul point commun à toutes les courbes est à droite, à l'instant présent.


Bonjour les codeurs !