Serveur d'historiques réels
Avant-propos
Ce tuto termine le chapitre 'Historiques'
Description
Pour la mise au point, seul un signal Renko sera utilisé, exemple : Maille 5.
show_matplotlib.py
.À la fin de show_matplotlib.py
:
if __name__ == '__main__':
""" Ajout d'arguments à la ligne de commande permettant un lancement autonome (pour la MAP). """
sys.argv.append('Calculs') # Nom du graphe.
sys.argv.append('0') # Plots id. (La ligne de commande ne doit contenir que du str).
sys.argv.append('-1') # Gestion de la fermeture automatique. Poste de contrôle PID : -1 en local.
mpl = ShowMatPlotLib()
Histos
.Plots
:
fixed_params()
de ces nodes./nodes/generateurs/histos/histos.py > Node.fixed_params()
:
def fixed_params(self):
return {
'Instrument': ['EUR/USD', {'values': ['EUR/USD', 'USD/JPY', 'EUR/CHF', 'USD/CAD', 'NZD/USD',
'EUR/GBP', 'EUR/JPY', 'GBP/JPY', 'GBP/CHF', 'GBP/USD']}],
'Ticks': False, # à chaque variation du signal.
'Maille 1': False, # Renko, maille de 1 pip.
'Maille 2': False, # 2 pips.
'Maille 3': False, # 3 pips.
'Maille 4': False, # 4 pips.
'Maille 5': True, # 5 pips.
'Maille 7': False, # 7 pips.
'Maille 10': False, # 10 pips.
'Maille 14': False, # 14 pips.
'Maille 20': False, # 20 pips.
'1 mn': False, # 1 minute.
'5 mn': False, # 5 minutes.
'15 mn': False, # 15 minutes.
'30 mn': False, # 30 minutes.
'1 h': False, # 1 heure.
'4 h': False, # 4 heures.
'1 j': False, # 1 jour.
'1 s': False, # 1 semaine.
'* Voir historiques ': '', # '*' = bouton.
'* Télécharger ': '',
'* Synchroniser bd ': '',
}
/nodes/afficheurs/plots/plots.py > Node.fixed_params()
:
def fixed_params(self):
""" Valeurs par défaut. """
return {
"Nombre d'entrées": [1, {'step': 1, 'limits': (1, 8), 'compactHeight': False}],
'Abscisse': [50_000., {'suffix': ' points', 'limits': (1000, 300_000),
'compactHeight': False, 'siPrefix': True, 'dec': True}],
'Fenêtre': {
'Titre': 'Titre de la fenêtre',
'Interface': ['Matplotlib', {'values': list(self.scripts.keys())}]
}
}
Notez l'effet très intéressant de l'attribut dec
en modifiant 'Abscisse' depuis le dockable (échelle logarithmique).
main.py
)Abscisse
?
Position
(sa valeur max) doit être automatiquement modifiée ...show_matplotlib.py
. Voici son code.☐ Fichier complet /show/show_matplotlib.py
:
# Imports externes.
from PyQt5.QtCore import QTimer
import psutil
import pickle
import numpy as np
import sys
import os
from matplotlib import pyplot as plt, rcParams
from matplotlib.widgets import Slider
# sys.path.append(os.path.dirname(sys.path[0])) # Pour MAP depuis le terminal.
# Imports internes.
from functions.utils import Utils, Dictionary, DateTime
from nodes.afficheurs.plots.matplotlib_yaml import YamlParams
from pc.ctrl_node import CtrlCalcul
# noinspection PyUnresolvedReferences
class ShowMatPlotLib(CtrlCalcul):
def __init__(self):
super().__init__()
""" Ini: récupération des arguments passés en ligne de commande. """
self.graph_name = sys.argv[1]
self.id = int(sys.argv[2])
self.parent_pid = int(sys.argv[3])
self.final_path = ''
self.ut = None
""" Watch. """
self.l_files_ante = list()
""" Matplotlib. """
self.fig = None
self.l_ax = [None for _ in range(8)] # Liste de 8 valeurs (les axes, ou graphiques : 1 par entrée)
self.b_busy = False
self.b_paused = False
self.b_reverse = False
""" Graphiques. """
self.l_inputs = list()
self.width_x = None
self.pointer = None
""" Timers. """
self.dt = DateTime()
self.listen = QTimer()
self.anim = QTimer()
""" Setup. """
self.setup()
def setup(self):
""" 1 - Chemin complet du node final 'Plots'. Ce chemin contient les modèles de calculs 'calc_*.pkl. """
bk_path = os.path.dirname(__file__).replace('show', 'backups') # Dossier de backup.
self.final_path = os.path.normpath(os.path.join(bk_path, self.graph_name, f'node{self.id}')) # 'Plots'
self.ut = Utils(path=self.final_path, file='matplotlib')
""" 2 - Aide pour la mise au point : styles et paramètres diponibles dans matplotlib. Décommenter pour voir. """
# [print(style) for style in plt.style.available] # Liste des styles disponibles.
# Dictionary(rcParams).print() # Liste des paramètres disponibles.
""" 3 - Fichier de configuration, éditable manuellement en yaml. """
self.o_yaml = YamlParams(self)
self.o_yaml.yaml_file = os.path.normpath(f'{self.final_path}/{self.id}-matplotlib.yaml')
self.o_yaml.update_odyaml()
self.o_yaml.set_rcparams(rcParams)
plt.style.use(self.o_yaml.get_style())
""" 4 - Vitesse d'animation (Modifiable : touches + et -), pause, reverse, pas à pas. """
self.anim.indx = 5 # <-- Vitesse moyenne (de 0 à 11)
speed = self.speed(self.anim.indx)
self.anim.delay = speed[0]
self.anim.step = speed[1]
""" 5 - Initialisation de matplotlib.pyplot. """
self.fig = plt.figure()
self.mgr.set_window_title('**ini**')
""" Slider 'Largeur'. https://matplotlib.org/stable/api/widgets_api.html#matplotlib.widgets.Slider """
slide_width = plt.axes([.12, .06, .77, .02]) # Taille et position du slider 'Largeur'.
slide_width.semilogx()
self.width_x = Slider(slide_width, 'Largeur', 10, 10_000, 316)
# |_ 316 est la moyenne géométrique de 10 et 10_000. 10**1=10, 10**2.5=316 (au milieu), 10**4=10_000
slide_posit = plt.axes([.12, .02, .77, .02]) # Taille et position du slider 'Position'.
self.pointer = Slider(ax=slide_posit, label='Position', valmin=0,
valmax=self.len_buffer - self.width_x.val - 2, valfmt='%0.0f')
self.pointer.set_val(-1)
""" 6 - Restauration de l'état de la fenêtre. """
self.ut.restore_state(self.mgr.window)
""" 7 - Surveillance des fichiers '.pkl'. """
self.ini_watch()
""" 8 - Événements. """
self.listen.timeout.connect(self.listener)
self.anim.timeout.connect(self.show_anim)
self.fig.canvas.mpl_connect('key_press_event', self.key_event)
self.pointer.on_changed(self.show_anim)
self.width_x.on_changed(self.set_width_x)
""" 9 - Lancement des timers. """
self.listen.start(1000)
self.anim.start(self.anim.delay)
""" 10 - Boucle d'exécution Matplotlib. """
plt.show()
@property
def mgr(self):
return plt.get_current_fig_manager()
def listener(self):
""" 1 - Si le poste de contrôle est fermé => fin programme, sauf en mode autonome. """
pass
if self.parent_pid > 0 and not psutil.pid_exists(self.parent_pid):
exit()
""" 2 - Persistance géométrie. """
self.ut.save_state(self.mgr.window)
def set_width_x(self, val):
self.set_pointer_xlim()
self.pointer.val += 1 if self.b_reverse else -1
self.show_anim()
def show_anim(self, _=0):
""" Anti ré-entrance. """
if self.b_busy:
return
self.b_busy = True
""" Pointeur commun. """
self.pointer.set_val(round(self.pointer.val + (-self.anim.step if self.b_reverse else self.anim.step)))
if self.pointer.val > self.pointer.valmax - round(self.width_x.val) - 2: # Bouclage provisoire.
self.pointer.set_val(0)
if self.pointer.val < 0: # Bouclage provisoire.
self.pointer.set_val(self.pointer.valmax - round(self.width_x.val) - 2)
""" Scrolling des entrées connectées et actives du node 'Plots' : subplots ou graphiques. """
for ax in self.l_ax:
if ax is None or not hasattr(ax, 'o_server') or ax.o_server is None:
continue
""" Nombre de points en abscisse.
Si non précisé dans le yaml, ce subplot utilisera la largeur commune, fixée par le slider 'Largeur'. """
width_x = round(self.width_x.val) # valeur du slider.
""" Avancement individuel du pointeur. """
if len(ax.lines) > 0: # Immobile si aucune courbe n'est affichée.
ax.pointer += -self.anim.step if self.b_reverse else self.anim.step
if ax.pointer > self.len_buffer - width_x: # Bouclage provisoire.
ax.pointer = 0
if ax.pointer < 0: # Bouclage provisoire.
ax.pointer = self.len_buffer - width_x
""" pointer = Position du 1er point à gauche dans le graphique.
Si ShareX dans le yaml (True par défaut), ce subplot utilisera le slider 'Position'. """
pointer = self.pointer.val if ax.od_self_yaml.get('ShareX', True) else ax.pointer
""" Récupération des données sous forme de matrices numpy. """
x = np.arange(pointer, pointer + width_x)
y = ax.o_server.get_matrix(pointer, width_x)
if y is None:
continue
""" Boucle sur les courbes. """
y_min, y_max = np.inf, -np.inf
for o_line in ax.lines:
num_col = int(o_line.od.get('num_col'))
if not np.isnan(y[:, num_col]).all():
y_min = min(y_min, np.nanmin(y[:, num_col]))
y_max = max(y_max, np.nanmax(y[:, num_col]))
""" Dessin d'une courbe. Attribution des valeurs. """
try:
o_line.set_xdata(x)
o_line.set_ydata(y[:, num_col])
except (Exception,):
pass
""" Limites x et y (abscisses et ordonnées). """
ax.set_xlim(pointer, pointer + width_x - 1) # Abscisses.
if y_min != np.inf:
y_padding = max(0, (y_max - y_min) * 0.05) # Marges top et bottom du graphique (5%)
y_min, y_max = y_min - y_padding, y_max + y_padding
ax.set_ylim(y_min, y_max) # Ordonnées.
""" Coloriage inter-courbes. """
self.o_yaml.between(ax, x, y)
""" Fin. """
plt.draw()
self.b_busy = False
def ini_watch(self):
""" Liste des éléments à surveiller : dossier {graphe} + dossiers {nodes} + fichiers .pkl + fichiers .yaml
- Le dossier est self.final_path, dans les backups.
- Les fichiers calc_*.pkl : un par entrée active de ce node d'affichage.
- Les fichiers .yaml : valeurs des clés 'Yaml file' dans les .pkl """
""" 1 - Dossier du graphe dans les backups :dossier-parent de tous les nodes du graphe. """
graph_path = os.path.normpath(f"{os.path.dirname(__file__).replace('show', 'backups')}/{self.graph_name}")
""" 2 - Liste des dossiers des nodes du graphe (qui contiennent des fichiers nécessaires aux calculs) """
l_folders = [graph_path] + [f'{graph_path}{os.sep}{node}' for node in os.listdir(graph_path) if
str(node).startswith('node')]
""" 3 - Liste des modèles yaml. """
l_models = list()
nodes_dir = os.path.dirname(__file__).replace('show', 'nodes')
for (folder, subfolder, files) in os.walk(nodes_dir):
if folder.endswith('models'):
for file in files:
f_include = os.path.join(folder, file)
l_models.append(f_include)
""" 4 - Liste des fichiers calc_*.pkl physiquement présents dans le dossier self.final_path """
l_watched_files = [os.path.normpath(f'{self.final_path}/{calc}') for calc in
os.listdir(self.final_path) if calc.startswith('calc_') or calc.endswith('.conf')]
""" 5 - Liste des fichiers .yaml : chaque od_calc (extrait du pkl) contient plusieurs clés 'Yaml file'.
On en fait une liste sans doublons. """
s_yaml = set() # set() au lieu de list() pour s'affranchir des doublons.
for path, folder, l_files in os.walk(graph_path):
[s_yaml.add(os.path.normpath(f'{path}/{file}')) for file in l_files if os.path.splitext(file)[1] == '.yaml']
""" 6 - Concaténation des listes précédentes. """
l_watched_files += list(s_yaml) + l_models # Concaténation de 3 listes de fichiers.
l_files = l_folders + l_watched_files # Concaténation des listes : dossiers et fichiers.
""" 7 - Vérification des changements dans cette liste, mais non dans le contenu des fichiers. """
if self.l_files_ante != l_files:
self.l_files_ante = self.ut.deepcopy(l_files) # Mémorisation (copie profonde).
self.ut.watch_file(l_files, self.watch) # (Re)lancement de la surveillance.
self.dt.delay(self.watch, 10, l_watched_files)
def watch(self, l_files):
""" l_files : liste contenant les dossiers et fichiers modifiés."""
files_only = [file for file in l_files if os.path.isfile(file)] # Filtrage : que les fichiers.
if len(files_only) == 0:
""" 1 - l_files ne contient que des dossiers : cela signifie que des fichiers ont été ajoutés ou supprimés.
Par conséquent on relance la méthode ini_watch() pour mettre à jour la liste des fichiers à surveiller. """
self.ini_watch()
return
if len(files_only) == 1 and files_only[0].endswith('.conf'):
return
""" 2 - Lecture de tous les calc_*.pkl (un par entrée) et écriture de leur contenu dans self.od_calcs. """
CtrlCalcul.od_calcs.clear() # Attribut de la classe CtrlCalcul.
l_calcs = [(f'calc_{i}', os.path.normpath(f'{self.final_path}/calc_{i}.pkl')) for i in range(8)]
for name_input, calc_file in l_calcs:
""" ex : name_input = 'calc_0' """
try:
with open(calc_file, 'rb') as pk:
self.od_calcs.write(name_input, pickle.load(pk))
except (Exception,):
pass
""" 3 - Création ou update des paramètres étendus od_yaml de o_yaml. """
self.o_yaml.update_odyaml()
""" 4 - La méthode principale update_figure() est chargée des mises à jour des calculs et des affichages. """
self.update_figure()
def update_figure(self):
""" Update suite aux modifications .pkl et .yaml """
""" 1 - Titre de la fenêtre. """
local = ' - Mode local' if len(sys.argv) < 6 else '' # Nombre d'arguments passés en ligne de commande.
title_ante = self.mgr.get_window_title()
b_first = title_ante == '**ini**' # b_first = Premier passage, au lancement de l'appli.
name_input = list(self.od_calcs.keys())[0] # Première entrée de 'Plots' connectée.
title_now = self.od_calcs.read([name_input, f'Node{self.id}', 'Fenêtre', 'Titre'], 'Graphiques') + local
l_inputs = [key for (key, val) in self.od_calcs.items() if val['edges']] # Entrées actives seulement.
if title_now != title_ante:
self.mgr.set_window_title(title_now)
""" 1.1 - Si seul le titre de la fenêtre a changé, inutile de continuer (sauf au 1er passage). """
if l_inputs == self.l_inputs and not b_first:
return True
""" 2 - Paramètres étendus yaml. """
self.o_yaml.set_rcparams(rcParams)
""" 3 - Subplots : création, update, géométrie. Chaque entrée est associée à un subplot.
- Celui-ci est créé ou mis à jour en fonction des changements de grille (nb d'entrées ou yaml). """
if self.o_yaml.grid_modified or l_inputs != self.l_inputs: # Respecter l'ordre de comparaison !
self.l_inputs = self.ut.deepcopy(l_inputs)
d_grids = self.o_yaml.get_dgrids()
for num_input in range(8):
self.update_ax(d_grids, num_input)
""" 4 - Paramètres des subplots (un par entrée active) et de leurs courbes. """
self.od_mydic.clear()
pointer_size = self.len_buffer
for num_input in range(8): # self.l_inputs = ['calc_0', 'calc_1', ...]
name_input = f'calc_{num_input}'
if name_input not in l_inputs:
self.l_ax[num_input] = None
continue
ax = self.l_ax[num_input] # Copie par référence.
""" 4.1 - Pour cette entrée : recherche de son serveur et de sa signature. """
edge = self.get_edge(name_input) # ex : edge = ((1, 0), (0, 0))
ax.o_server, signature = self.get_server(edge[0][0]) # edge[0][0] = id du node serveur.
ax.num_input = num_input
if ax.o_server is None:
continue # Entrée non connectée ou désactivée.
""" 4.2 - Si nécessaire, calculs et paramètres des courbes. """
if not hasattr(ax, 'signature') or ax.signature != signature:
ax.signature = signature
ax.od_self_yaml = Dictionary(self.o_yaml.od_yaml.read(f'Entrée {num_input}'))
ax.o_server.calculation()
""" 4.3 - Paramètres des courbes. """
self.update_subplot(ax)
""" 4.4 - Nombre de points fournis par les nodes serveurs. On conserve le min. """
pointer_size = min(pointer_size, ax.o_server.nb_points)
""" 5 - Nombre de points en abscisse.
https://matplotlib.org/stable/api/widgets_api.html#matplotlib.widgets.Slider """
if self.pointer.valmax != pointer_size - self.width_x.val - 2:
self.set_pointer_xlim(pointer_size)
""" 6 - Actualisation du buffer d'affichage nécessaire lorsque il est en pause. """
self.show_anim()
""" 7 - Affichage à l'écran du contenu du buffer d'affichage. """
plt.draw()
def set_pointer_xlim(self, pointer_size=None):
self.pointer.valmin, self.pointer.valmax = 0, self.pointer.valmax if pointer_size is None else pointer_size
self.pointer.ax.set_xlim(self.pointer.valmin, self.pointer.valmax - round(self.width_x.val) - 2)
self.pointer.val = min(self.pointer.val, self.pointer.valmax)
def get_edge(self, name_input): # ex: name_input = 'calc_1'.
""" Recherche du socket_out emetteur (directement connecté au socket_in 'name_input' de ce node 'Plots'). """
s_edges = self.od_calcs.read([name_input, 'edges'])
for edge in s_edges:
if edge[1][0] == self.id:
return edge
def update_subplot(self, ax):
""" Mise à jour des paramètres de ce subplot (axe), ainsi que de chacune de ses courbes.
@param ax: objet subplot.
@return: NA.
"""
def update_line():
""" Code d'appel : point 2 un peu plus bas.
https://matplotlib.org/stable/api/_as_gen/matplotlib.lines.Line2D.html """
""" 2.1 - Affectation des attributs aux courbes : couleur, épaisseur, style, légende. """
o_line.od = Dictionary(od_lines.read(o_line.name))
if not o_line.od:
return # 'Kalman2-Normal-Original'
num_col = ax.o_server.od_descr.read([o_line.name, 'num_col']) # 'Kalman2-Normal-kalman'
o_line.od.write('Entrée', f'Entrée {ax.num_input}')
o_line.od.write('num_col', num_col)
o_line.set_color(o_line.od.read('Couleur'))
o_line.set_linewidth(o_line.od.read('Épaisseur'))
o_line.set_linestyle(self.get_linestyle(o_line.od.read('Style')))
o_line.set_label(o_line.od.read('Légende'))
self.o_yaml.line_params(o_line) # Paramètres étendus pour cette courbe.
""" 2.2 - Exemple de compilation dynamique au niveau de la courbe 'o_line'. Décommenter si nécessaire. """
# d_context = {'line': o_line}
# self.o_yaml.compile([f'Entrée {num_input}', o_line.name, 'Code'], d_context)
""" 1 - Actualisation de la liste des courbes ax.lines.
- Si 'Plots' a 1 seule entrée, clé de type ['calc_0', 'Node0'], sinon ['calc_0', 'Node0', 'Entrée 0']. """
l_mykey = [f'calc_{ax.num_input}', f'Node{self.id}'] # Clé de od_mydic dans od_calcs.
l_keys = l_mykey + [f'Entrée {ax.num_input}'] # ['calc_0', 'Node0', 'Entrée 0']
od_lines = Dictionary(self.od_calcs.read(l_keys, self.od_calcs.read(l_mykey)))
self.update_lines(ax.num_input, od_lines) # Suppression et ajout de courbes.
""" 2 - Mise à jour de chacune des courbe. """
for o_line in ax.lines: # Parcours des courbes de ce subplot (de cette entrée de node).
update_line()
""" 3 - Exemple de compilation dynamique au niveau du subplot 'num_input'. Décommenter si nécessaire. """
# d_context = {'subplot': ax}
# self.o_yaml.compile([f'Entrée {num_input}', 'Code'], d_context)
""" 4 - Paramètres étendus yaml. """
self.o_yaml.subplot(self.l_ax, ax.num_input)
def update_lines(self, num_input, od_plots):
""" Actualisation de la liste de courbes self.l_ax[num_input].lines[].
On compare les listes de coubes nécessaires et de courbes existantes.
On en déduit celles qu'il faut ajouter, celles qu'il faut supprimer. """
""" 1) Courbes nécessaires : l_needed. """
l_needed = list()
for key, val in od_plots.items():
if isinstance(val, dict) and 'Signal actif' in val and val['Signal actif']:
l_needed.append(key)
""" 2) Courbes existantes : l_exist. """
l_exist = self.line_exist(num_input)
""" 3) Ajout (création) de courbes. """
for needed in l_needed:
if needed not in l_exist:
o_line, = self.l_ax[num_input].plot(np.empty(0)) # [num_input] = Élément 0 de la liste.
o_line.name = needed
""" 4) Suppression de courbes. """
for exist in l_exist:
if exist not in l_needed:
o_line = self.get_line(num_input, exist)
if o_line is not None:
self.l_ax[num_input].lines.remove(o_line)
def line_exist(self, num_input):
""" Appelée par update_lines(). """
l_lines = list()
for line in self.l_ax[num_input].lines:
l_lines.append(line.name)
return l_lines
def get_line(self, num_input, line_name):
""" Appelée par update_lines(). """
for o_line in self.l_ax[num_input].lines:
if o_line.name == line_name:
return o_line
return None
@staticmethod
def get_linestyle(param_style):
if param_style.startswith('___'):
return '-'
elif param_style.startswith('- -'):
return '--'
elif param_style.startswith('. .'):
return ':'
return '-.'
def update_ax(self, d_grids, num_input):
""" On vérifie les 8 inputs, connectés ou pas. """
name_input = f'calc_{num_input}'
ax = self.l_ax[num_input]
if name_input not in self.l_inputs:
""" Vérification des entrées non connectées aussi, afin éviter des affichages indésirables."""
if ax is not None:
""" Ce subplot a existé, il n'existe plus. """
ax.set_visible(False) # Suppression des graphiques (subplots) fantômes.
self.l_ax[num_input] = None
return
""" Le subplot {ax} contient l'attribut 'pointer' qui disparait après cet update.
Par conséquent, on le sauvegarde d'abord. """
if ax is None:
pointer = 0
else:
ax.set_visible(False) # Suppression des graphiques (subplots) fantômes.
pointer = ax.pointer
""" Update du subplot {ax}. add_axes() permet un positionnement absolu, totalement libre. """
ax = self.fig.add_axes(d_grids[name_input]) # Position absolue. x, y, w, h. Valeurs entre 0 et 1.
""" Réintégration de l'attribut 'pointer'. """
ax.pointer = pointer
self.l_ax[num_input] = ax
@staticmethod
def speed(indx):
""" Calcul empirique. """
values = [(3000, 1), (2000, 1), (1200, 1), (640, 1), (480, 1), (320, 1), # (durée en ms, pas de 1) <-- Lents.
(280, 2), (296, 4), (273, 7), (280, 14), (286, 26), (282, 47), (400, 20)] # <-- Rapides.
return values[indx]
def key_event(self, ev):
""" Appui sur une touche du clavier. """
keycode = ev.key
if keycode == ' ':
""" Pause on/off. """
self.b_paused = not self.b_paused
self.anim.stop() if self.b_paused else self.anim.start(int(self.anim.delay))
elif keycode == '+':
""" Accélérer. """
self.anim.stop()
self.anim.indx += 1
self.anim.indx = min(self.anim.indx, 11)
speed = self.speed(self.anim.indx)
self.anim.delay = speed[0]
self.anim.step = speed[1]
self.anim.start(int(self.anim.delay))
elif keycode == '-':
""" Ralentir. """
self.anim.stop()
self.anim.indx -= 1
self.anim.indx = max(self.anim.indx, 0)
speed = self.speed(self.anim.indx)
self.anim.delay = speed[0]
self.anim.step = speed[1]
self.anim.start(int(self.anim.delay))
elif keycode == 'tab':
self.b_reverse = not self.b_reverse
elif keycode == 'right' or keycode == 'left':
self.b_paused = True
self.anim.stop()
step = self.anim.step
self.anim.step = 1 if self.b_reverse == (keycode == 'left') else -1 # Ou exclusif.
self.show_anim()
self.anim.step = step
if __name__ == '__main__':
""" Ajout d'arguments à la ligne de commande permettant un lancement autonome (pour la MAP). """
sys.argv.append('Calculs') # Nom du graphe.
sys.argv.append('0') # Plots id. (La ligne de commande ne doit contenir que du str).
sys.argv.append('-1') # Gestion de la fermeture automatique. Poste de contrôle PID : -1 en local.
mpl = ShowMatPlotLib()
La classe Calcul
du node Plots
devient inutile, vous pouvez la supprimer.
Les méthodes colorées sont spécifiques au node et sont implémentées dans sa classe Calcul
.
PC
est enregistré dans un sous-dossier du dossier backups
.params.pkl
.Plots
) possèdent un fichier calc_?.pkl
par entrée active et connectée.*.yaml
.o_server
.
Plots-0
n'a qu'une entrée dont le node-serveur est Histos-1
.o_server.descr_signal()
est appelée :
self.od_descr
à partir des fichiers pkl
et yaml
.o_server.pre_process()
est appelée : grâce à self.od_descr
, elle réalise un traitement d'initialisation des calculs.o_server.get_matrix()
est totalement autonome et séparée.
o_server.pre_process()
.numpy
avec autant de colonnes que de courbes à dessiner et avec autant de lignes que de points en abscisse du graphique.La classe Calcul
du node Histos
doit donc contenir ces 3 méthodes :
self.descr_signal()
.
self.pre_process()
.
self.get_matrix()
.
☐ /nodes/geberateurs/histos/histos.py > fichier complet
:
# Imports externes
import numpy as np
# Imports internes
from pc.ctrl_node import CtrlNode, CtrlCalcul
from nodes.generateurs.histos.histos_yaml import YamlParams
from trading.historiques.ctrl_histos import CtrlHistos
d_datas = {
'name': 'Histos', # Label affiché sous l'icone.
'icon': 'histos.png', # Icone affichée.
}
class Node(CtrlNode):
def __init__(self, o_scene, s_id, pos):
super().__init__(o_scene, s_id, pos)
self.type = 'Histos'
self.setup()
def setup(self, child_file=__file__):
super().setup(child_file)
@property
def ld_outputs(self):
return [{
'label': 'Sortie',
'label_pos': (-38, -10)
}]
def fixed_params(self):
return {
'Instrument': ['EUR/USD', {'values': ['EUR/USD', 'USD/JPY', 'EUR/CHF', 'USD/CAD', 'NZD/USD',
'EUR/GBP', 'EUR/JPY', 'GBP/JPY', 'GBP/CHF', 'GBP/USD']}],
'Ticks': False, # à chaque variation du signal.
'Maille 1': False, # Renko, maille de 1 pip.
'Maille 2': False, # 2 pips.
'Maille 3': False, # 3 pips.
'Maille 4': False, # 4 pips.
'Maille 5': True, # 5 pips.
'Maille 7': False, # 7 pips.
'Maille 10': False, # 10 pips.
'Maille 14': False, # 14 pips.
'Maille 20': False, # 20 pips.
'1 mn': False, # 1 minute.
'5 mn': False, # 5 minutes.
'15 mn': False, # 15 minutes.
'30 mn': False, # 30 minutes.
'1 h': False, # 1 heure.
'4 h': False, # 4 heures.
'1 j': False, # 1 jour.
'1 s': False, # 1 semaine.
'* Voir historiques ': '', # '*' = bouton.
'* Télécharger ': '',
'* Synchroniser bd ': '',
}
def my_signals(self, l_signals_in, num_socket_out):
l_signals = list()
for signal_key in self.fixed_params().keys():
b_signal = self.get_param(signal_key) # Booléen.
if isinstance(b_signal, bool) and b_signal:
typ_id, signal_ante, signal_now, signal_source = f'{self.type}{self.id}', '', signal_key, ''
l_signals.append((typ_id, signal_ante, signal_now, signal_source))
return l_signals
def get_state(self):
""" Surcharge. """
if self.b_chk:
for signal in self.fixed_params().keys():
if isinstance(self.get_param(signal), bool) and self.get_param(signal, False):
return True
return False
def histo_data(self, button):
""" Clic sur un bouton : Voir, télécharger, synchroniser. """
o_hist = CtrlHistos(self.get_param('Instrument')) # Objet Contrôleur d'historiques.
if button.startswith('Voir'):
""" Bouton 'Voir historiques'. Si Instrument == EUR/USD => Symbole == EURUSD (sans '/') """
o_hist.verify_weeks(b_ticks=False, b_silent=True)
print()
o_hist.verify_weeks(b_ticks=True)
elif button.startswith('Télécharger'):
""" Bouton 'Télécharger'. """
o_hist.download_histos(nb_weeks=1, b_ticks=False) # Candles.
o_hist.download_histos(nb_weeks=1, b_ticks=True)
""" Option : après le téléchargement, on affiche l'état des historiques. """
print()
o_hist.verify_weeks(b_ticks=False, b_silent=True) # Candles.
print()
o_hist.verify_weeks(b_ticks=True) # Ticks.
elif button.startswith('Synchroniser'):
""" Bouton 'Synchroniser bd'. """
o_hist.synchro_db_csv(b_ticks=False)
o_hist.synchro_db_csv(b_ticks=True)
def refresh(self, l_keys):
""" Méthode appelée automatiquement à chaque modification de paramètre dans le dockable. """
if l_keys[1].startswith(' '): # Les libellés des boutons commencent par des espaces.
""" Les 3 boutons pour la base de données. Affichages dans le terminal. """
self.histo_data(l_keys[1].strip())
elif isinstance(self.get_param(l_keys[-1]), bool) or l_keys[-1] in ['Instrument', 'Nombre']:
""" Nodes à updater : Propagation aval, seulement pour les signaux (cases à cocher). """
self.lo_sockets_out[0].to_update()
""" Infrastructure : Insertion du mot 'infrastructure' dans l_keys, en 1ère place. """
l_keys = ['infrastructure'] + l_keys
super().refresh(l_keys)
class Calcul(CtrlCalcul):
""" ********** Le code ci-dessous ne concerne pas le poste de contrôle, mais seulement les calculs. ********** """
def __init__(self):
super().__init__()
self.o_yaml = YamlParams(self)
self.d_stamps = dict()
self.l_pilot = list()
self.o_histos = None
self.b_forcing_calc = True
def descr_signal(self, od_descr, val, root_key):
""" - Création du super-dictionnaire de description. Voir docstring dans la classe-mère.
- key_dock doit avoir EXACTEMENT la même orthographe que la clé affichée
dans le dockable des paramètres du node CLIENT.
- Sources EXCLUSIVES : self.od_mydic et self.o_yaml. """
instrument = self.od_mydic.read('Instrument')
od_descr.write('changed', True)
od_descr.write('instrument', instrument)
for signal_name, val in self.od_mydic.items():
if isinstance(val, bool) and signal_name != 'Checked':
key_dock = f'{self.s_id}-{signal_name}' # Ex : Histos1-Maille 7
od_descr.write([key_dock, 'notes'], signal_name)
od_descr.write([key_dock, 'actif'], self.od_mydic.read(signal_name, False)) # True ou False.
if instrument != self.od_descr.get('instrument'):
self.d_stamps.clear()
def pre_process(self):
""" Voir docstring dans la classe-mère. """
""" La récupération des paramètres se fait EXCLUSIVEMENT depuis le super-dictionnaire self.od_descr. """
""" Le tableau numpy (self.np_array) est entièrement rempli, en une seule passe. """
""" Traitement du pilote (pilote = image du slider 'Position'). """
""" 1 - Signal le +lent : ts_right = timestamp à droite de la fenêtre lorsque son début est à gauche. """
""" 2 - Recherche l'index de ts_right dans le plus rapide. """
""" 3 - ts_left = timestamp à gauche de la fenêtre du signal le +rapide. """
""" 4 - Calcul de l'offset minimum pour que tous les signaux soient affichés à 100 %. """
def get_index(_list, _val):
""" La liste doit être triée (sens croissant). """
try:
return _list.index(_val)
except ValueError:
for indx, val in enumerate(_list):
if val > _val:
return indx
return len(_list) - 1 # Le dernier par défaut
d_tables = {
'Ticks': 'Ticks', 'Maille 1': 'r1', 'Maille 2': 'r2', 'Maille 3': 'r3', 'Maille 4': 'r4', 'Maille 5': 'r5',
'Maille 7': 'r7', 'Maille 10': 'r10', 'Maille 14': 'r14', 'Maille 20': 'r20', '1 mn': 'm1', '5 mn': 'm5',
'15 mn': 'm15', '30 mn': 'm30', '1 h': 'h1', '4 h': 'h4', '1 j': 'day', '1 s': 'week'
}
abscissa_length = 300_000 # Étendue maximum du Slider 'Position'.
width_x = 316 # Nombre de points affichés = valeur du slider 'Largeur' par défaut.
instrument = self.od_descr.get('instrument', 'EUR/USD')
self.o_histos = CtrlHistos(instrument)
l_stamps, l_tables = list(), list()
slowest, fastest, len_min, len_max, ts_right = '', '', 10**6, 0, 0
for l_key in self.od_descr.key_list():
if l_key[-1] == 'actif' and self.od_descr.read(l_key):
""" Cette devise est cochée dans le dockable des paramètres. """
key_dock = l_key[0]
signal_name = key_dock.split('-')[1]
table_name = d_tables.get(signal_name)
l_tables.append((table_name, self.od_descr.read([key_dock, 'num_col'])))
if signal_name not in self.d_stamps:
self.d_stamps[signal_name] = self.o_histos.get_stamps(table_name, abscissa_length)
len_st = len(self.d_stamps[signal_name])
if len_st < len_min:
""" Recherche du signal le plus lent (le moins dense) <-- dense = nb valeurs/unité de temps. """
len_min = len_st
ts_right = self.d_stamps[signal_name][min(width_x, len_min)]
if len_st > len_max:
""" Recherche du signal le plus rapide (le plus dense). """
len_max = len_st
fastest = signal_name
l_stamps += self.d_stamps[signal_name]
self.l_pilot = list(set(l_stamps)) # Suppression des doublons.
self.l_pilot.sort() # Tri.
indx_left = max(0, get_index(self.d_stamps[fastest], ts_right) - width_x)
stamp_left = self.d_stamps[fastest][indx_left]
offset = get_index(self.l_pilot, stamp_left)
self.nb_points = len(self.l_pilot) - offset + 1
""" Infos nécessaires pour get_matrix(). """
self.od_descr['offset'] = offset
self.od_descr['bd_tables'] = l_tables
self.od_descr['pips'] = self.o_histos.pips
def get_matrix(self, pointer, nb_lines=0):
""" Construction du ndarray de retour à partir des ndarray du dictionnaire self.d_datas. """
_from, _to = pointer, pointer+nb_lines
np_matrix = np.full((nb_lines, 19), np.nan) # 1 + 18 colonnes (voir d_tables de pre_process() ci-dessus.)
""" Signal pilote (celui qui présente le plus de détails). """
pips = self.od_descr.get('pips')
offset = self.od_descr.get('offset')
try:
final_stamp = self.l_pilot[offset + pointer + nb_lines]
except (Exception,):
return np_matrix
for table_name, num_col in self.od_descr.get('bd_tables'):
slice_histo = self.o_histos.get_datas(table_name, final_stamp, nb_lines)
pass
if table_name == 'Ticks':
vector = (slice_histo[:, 1] + slice_histo[:, 2]) / (2 * pips)
elif table_name.startswith('r'):
vector = slice_histo[:, 1]
else:
vector = (slice_histo[:, 4] + slice_histo[:, 8]) / (2 * pips)
np_matrix[-slice_histo.shape[0]:, num_col] = vector
return np_matrix
histos_yaml.py
et histos_seeder.yaml
existent./nodes/generateur/histos/histos_yaml.py
:
# Imports internes.
from nodes.yaml_parent import YamlParent
class YamlParams(YamlParent):
def __init__(self, o_calc):
super().__init__(o_calc, __file__)
/nodes/generateur/histos/histos_seeder.yaml
: Ce fichier est vide, mais il doit exister.
Oui mais ... une nouvelle version de CtrlHistos
est nécessaire pour satisfaire à l'implémentation des calculs dans le node histos
:
self.pips
ajouté.try ... except
ajouté autour de requests.get(...)
._existing_lists()
améliorée (plus rapide).get_stamps()
ajoutée.get_datas()
ajoutée.☐ Fichier /trading/historiques/ctrl_histos.py
:
# Imports externes
import datetime
import gzip
import os # http://www.python-simple.com/python-modules-fichiers/os-path.php (Ctrl + clic)
import requests
from io import BytesIO, StringIO
import pandas as pd
import keyboard # https://www.delftstack.com/fr/howto/python/python-detect-keypress/
# Imports internes
from functions.utils import DateTime, Utils
from trading.historiques.db_candle import DbCandle
from trading.historiques.db_tick import DbTick
class CtrlHistos:
def __init__(self, instrument):
""" https://github.com/fxcm/MarketData
Tous les symboles :
AUDCAD, AUDCHF, AUDJPY, AUDNZD, AUDUSD, CADCHF, CADJPY, EURAUD, EURCHF,
EURGBP, EURJPY, EURNZD, EURUSD, GBPCAD, GBPCHF, GBPJPY, GBPNZD, GBPUSD,
NZDCAD, NZDCHF, NZDJPY, NZDUSD, USDCAD, USDCHF, USDJPY, USDTRY
Les plus utilisés (Dans l'ordre d'importance) :
EURUSD, USDJPY, EURCHF, USDCAD, NZDUSD, EURGBP, EURJPY, GBPJPY, GBPCHF, GBPUSD """
self.instrument = instrument
self.symbol = instrument.replace('/', '')
self.dt = DateTime()
self.ut = Utils()
self.kb_break = 'f12' # 'shift', 'ctrl', ... à votre convenance.
self.pips = .01 if instrument.endswith('JPY') else .0001
self.symbol_dir = None
self.db_candle = None
self.db_tick = None
self.setup()
def setup(self):
""" Les données peuvent se trouver sur un disque dur SSD autre que celui en cours.
Le cas échéant, db_path doit être modifié (chemin absolu).
Il est également possible, pour les besoins des tests, d'avoir plusieurs bases de données.
|_ Il faudra alors modifier db_path chaque fois que l'on désire switcher. """
db_path = os.path.dirname(__file__) # <--- Chemin par défaut, à modifier si autre disque dur.
self.symbol_dir = os.path.abspath(f'{db_path}/db/{self.symbol}') # Dossier des fichiers d'historiques.
os.makedirs(self.symbol_dir, exist_ok=True)
self.db_candle = DbCandle(self)
self.db_tick = DbTick(self)
def _helper(self, key, *l_params):
""" ************ Collection d'utilitaires. Nombre de paramètres variable. ************ """
if key == 'nb_weeks_in_year':
""" Renvoie le nombre de semaines dans l'année {year}. """
year = l_params[0] # l_params = [année dont on veut connaître le nombre de semaines].
if len(l_params) > 1:
y, num_week, _ = datetime.date.today().isocalendar()
if y == year:
return num_week - 1
o_dat = self.dt.get_date_from_dtstamp(self.dt.get_dtstamp_from_dtstr(f'{year}-12-28', '%Y-%m-%d'))
return o_dat.isocalendar()[1]
elif key == 'l_iso_yw':
""" Renvoie une liste de tuples ISO (iso_year, iso_str_week) : <-- exemple de tuple : (2019, '08')
- Pour la devise en cours et son type (tick ou candle).
- Toutes les semaines, du début à aujourd'hui.
- L'année est un int, la semaine une str ('02', '12', ...) """
b_ticks = l_params[0] # l_params = [b_ticks].
st_yw = set() # Le set() évite les doublons.
since = 2018 if b_ticks else 2012
yw_today = datetime.date.today().isocalendar()[:2]
for year in range(since, yw_today[0] + 1):
for week in range(1, 54):
iso = self.dt.isoyw_from_fxyw(year, week)
st_yw.add((iso[0], f'0{iso[1]}'[-2:]))
if (year, week) >= yw_today:
lt_yw = list(st_yw)
lt_yw.sort()
return lt_yw
elif key == 'csv_file':
""" Renvoie le chemin complet du fichier correspondant à la devise en cours, à l'année et à la semaine. """
year, week, b_ticks = l_params # l_params = [*t_yw, b_ticks]
str_week = f'0{week}'[-2:]
return os.path.join(self.symbol_dir, str(year), f"{'tick' if b_ticks else 'candle'}_"
f"{str_week}.csv")
elif key == 'url_file':
""" Renvoie l'url du fichier d'histos correspondant à la devise en cours, à l'année et à la semaine. """
year, str_week, b_ticks = l_params # l_params = [*t_yw, b_ticks]
url_candle = 'https://candledata.fxcorporate.com/m1'
url_tick = 'https://tickdata.fxcorporate.com'
return f'{url_tick if b_ticks else url_candle}/{self.symbol}/{year}/{int(str_week)}.csv.gz'
def verify_weeks(self, b_ticks, b_silent=False):
""" Affichage d'un tableau dans la console (Lignes=années, Colonnes=semaines).
'▒' ou '▄' ou '▀' = semaine présente, '.' = semaine absente. """
""" 1 - Listes des semaines réellement existantes en base de données et en fichier .csv. """
l_weeks_csv, l_weeks_db = self._existing_lists(b_ticks)
""" 2 - Affichage du titre et entêtes de colonnes. """
typ = 'Ticks' if b_ticks else 'Candles'
self.ut.gauge(f"{self.instrument}-{typ}", large=17)
[self.ut.gauge(char=f"{str(week) :<2}") for week in range(1, 54)]
self.ut.gauge('end')
""" 3 - Affichage du contenu - Ordonnées = années, abscisse = semaines. """
l_weeks = l_weeks_csv + l_weeks_db
l_weeks.sort()
first_year = (2018 if b_ticks else 2012) if len(l_weeks) == 0 else l_weeks[0][0]
year_now = datetime.date.today().isocalendar()[0]
for year in range(first_year, year_now + 1):
self.ut.gauge(year, large=17)
""" Boucle sur les semaines de l'année {year} : 1 à (52 ou 53). """
for week in range(1, self._helper('nb_weeks_in_year', year, 'today') + 1): # today limite à aujourd'hui.
t_yw = year, f'0{week}'[-2:] # 7 -> 07
b_csv, b_db = t_yw in l_weeks_csv, t_yw in l_weeks_db
if not b_csv and not b_db: # 0 0 /x./y
char, color = '. ', 'BLEU'
elif not b_csv and b_db: # 0 1 /x.y
char, color = '▀ ', 'VERT'
elif b_csv and not b_db: # 1 0 x./y
char, color = '▄ ', 'VIOLET'
else: # 1 1 x.y
char, color = '▒ ', 'BLEU'
self.ut.gauge(char=char, color=color)
self.ut.gauge('end')
""" Vérification de la synchronisation entre la base de données et les fichiers csv. """
msg = "\nAttention : La base de données n'est pas entièrement synchronisée ! csv + db = rectangles bleus." \
"\nL'affichage ci-dessus montre les fichiers .csv en carrés-bas mauves, la db en carrés-hauts verts." \
if not b_silent and l_weeks_csv != l_weeks_db else ''
self.ut.printc(msg)
def download_histos(self, nb_weeks, b_ticks):
if nb_weeks > 1:
self.ut.printc(f"\nArrêt manuel : garder la touche '{self.kb_break.upper()}' appuyée.\n")
""" Intervalle des semaines à télécharger (nombres en base 53). """
""" |_ since = Depuis. """
l_weeks_csv, _ = self._existing_lists(b_ticks=b_ticks) # Semaines = str <-- '08', '09', '10', ...
l_csv = [(y, int(w)) for (y, w) in l_weeks_csv] # Semaines = int <-- 8, 9, 10, ...
for y, w in l_csv:
if w > 2:
since = y * 53 + w - 1
break
else:
since = (2018 if b_ticks else 2012) * 53
y_today, w_today = datetime.date.today().isocalendar()[:2]
now = y_today * 53 + w_today
""" |_ now = Jusqu'à. """
""" l_required = Liste des fichiers à télécharger. """
l_required = list()
for yw53 in range(since, now-1): # dernière semaine = (now-1) => pas la semaine en cours.
csv_yw = yw53 // 53, 1 + yw53 % 53
csv_file = self._helper('csv_file', *csv_yw, b_ticks)
if not os.path.isfile(csv_file):
""" Fichier absent localement => à télécharger. """
l_required.append(csv_yw)
db = self.db_tick if b_ticks else self.db_candle
nb_weeks_loaded = 0
""" Parcours des semaines à traiter. """
for csv_yw_required in l_required:
df = self._download_csv_file(csv_yw_required, b_ticks)
if df is None:
""" Arrêt manuel demandé. """
break
elif df.shape[0] > 0:
""" 1 - Enregistrement du fichier téléchargé sur disque dur. """
self._df_to_csv(df, csv_yw_required, b_ticks)
""" 2 - Écriture du contenu en base de données. break si arrêt manuel demandé. """
if not db.df_to_table(df):
continue
""" 3 - Update des volumes à partir des ticks. """
df_stamps = self.db_tick.get_df_stamps(*csv_yw_required)
if df_stamps.shape[0] > 0:
self.db_candle.update_volume(df_stamps)
""" Arrêt si le nombre de semaines demandées est atteint. """
nb_weeks_loaded += 1
if nb_weeks_loaded >= nb_weeks:
break
s = 's' if nb_weeks > 1 else ''
self.ut.printc(f"Téléchargement{s} terminé{s}.", color='vert')
def synchro_db_csv(self, b_ticks):
""" A l'issue de cette méthode, TOUTE la db sera l'image de TOUS les fichiers csv. """
""" 1 - Base de données. """
db, typ = (self.db_tick, 'Ticks') if b_ticks else (self.db_candle, 'Candles')
""" 2 - État actuel des historiques => Différences entre csv et db
- l_drop = semaines à supprimer de la db, l_add = semaines à ajouter à la db. """
l_drop, l_add, l_vol = self._lists_to_update_db(b_ticks=b_ticks)
if l_drop+l_add+l_vol == []:
self.ut.printc(f"{typ} csv <--> db : OK. La synchronisation est inutile.\n", color='VERT')
return True
""" 3 - Message : état initial. """
for l_weeks in [l_drop, l_add]:
if len(l_weeks) > 0:
s = 's sont' if len(l_weeks) > 1 else ' est'
verb = 'ajouter à' if l_weeks == l_add else 'supprimer de'
self.ut.printc(f"{len(l_weeks)} semaine{s} à {verb} la base de données '{typ.lower()}'.", color='VERT')
""" 4 - Ajout de données dans la db. """
self._add_weeks(l_add, b_ticks)
""" 5 - Suppression de données de la db. """
for yw in l_drop:
db.delete_week(*yw)
""" 6 - Update des volumes. """
for yw in l_vol:
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
break
df_stamps = self.db_tick.get_df_stamps(*yw)
self.db_candle.update_volume(df_stamps)
""" 7 - Vérification. """
l_drop, l_add, l_vol = self._lists_to_update_db(b_ticks=b_ticks)
success, color = ('réussi', 'VERT') if l_drop+l_add+l_vol == [] else ('échoué', 'ROUGE')
self.ut.printc(f'{typ} : La synchronisation a {success}.', color=color)
def _existing_lists(self, b_ticks):
""" Retourne les listes de semaines des fichiers csv et des enregistrements en base de données. """
""" s_weeks_csv n'est pas une list(), mais un set(), pour éviter les doublons. """
l_weeks_csv, s_weeks_db = list(), set()
db = self.db_tick if b_ticks else self.db_candle
l_iso_yw = self._helper('l_iso_yw', b_ticks) # Liste de toutes les semaines.
typ = 'Ticks' if b_ticks else 'Candles'
self.ut.gauge(f"{self.instrument}-{typ} Création listes :", large=0)
for num, iso_yw in enumerate(l_iso_yw):
""" Semaines existant en fichiers csv : Pour une semaine iso, on a 0, 1 ou 2 fichiers csv. """
l_csv_files, last_stamp = self._get_csv_files(*iso_yw, b_ticks)
csv_size = 0
if len(l_csv_files) > 0:
l_weeks_csv.append(iso_yw)
for csv_file in l_csv_files:
csv_size += os.path.getsize(csv_file)
""" Liste des semaines existant en base de données. """
csv = last_stamp if b_ticks else csv_size
if db.week_exists(*iso_yw, csv):
s_weeks_db.add(iso_yw) # (1)
if num % 8 == 0:
self.ut.gauge(char='.') # , color='Jaune')
self.ut.gauge('end')
self.ut.gauge('end') # Ligne vide supplémentaire.
""" Persistance en pkl. """
l_weeks_db = list(s_weeks_db)
l_weeks_csv.sort()
l_weeks_db.sort()
return l_weeks_csv, l_weeks_db
def _get_csv_files(self, iso_year, iso_week, b_ticks):
""" Attention ! Les noms de fichiers csv ne correspondent pas toujours à leur contenu ! """
""" Return : liste de noms complets de fichiers csv correspondant à (iso_year, iso_week) : 0, 1 ou 2 fichiers.
fxcorporate ne respecte pas la norme ISO. De plus :
- D'une année sur l'autre, le nom du fichier csv ne correspond pas à son contenu.
- On ne trouve pas de correspondance entre diverses devises.
- Par conséquent, le nom du fichier n'est pas fiable pour trouver le bon N° de semaine.
La méthode adoptée pour contourner ce problème est la suivante :
- Recherche de 3 fichiers csv voisins de (iso_year, iso_week) passés en paramètres.
- Lecture, pour chacun, de la date-heure du milieu du fichier.
- On en déduit le VRAI N° de semaine ISO.
Il arrive parfois, au changement d'année, qu'une même semaine soit composée de 2 fichiers csv.
- C'est pourquoi la valeur de retour est une liste. """
int_week, l_fxyw = int(iso_week), list()
last_stamp = 0
for week in range(int_week - 1, int_week + 1): # 2 Semaines : précédente, actuelle.
""" Correction : N° de semaine en base 53. """
y, w = (iso_year, week) if week > 0 else (iso_year - 1, week + 53)
y, w = (y, w) if w <= 53 else (y + 1, w - 53)
""" Les 3 fichiers csv candidats. """
str_w = f'0{w}'[-2:]
file_name = f"{'tick' if b_ticks else 'candle'}_{str_w}.csv" # 'tick_??.csv' ou 'candle_??.csv'.
csv_path = os.path.abspath(f"{os.path.dirname(self.symbol_dir)}/{self.symbol}/{y}/{file_name}")
if os.path.isfile(csv_path):
with open(csv_path, 'r') as fh:
""" On 'goûte' les 300 derniers bytes du fichier pour connaitre le N° ISO de semaine. """
nb_char = 300
fh.seek(0, os.SEEK_END)
fh.seek(fh.tell() - nb_char)
dt_str = fh.read().split('\n')[-3].split(',')[0]
last_stamp = self.dt.get_dtstamp_from_dtstr(dt_str)
o_d = self.dt.get_date_from_dtstamp(last_stamp) # Objet date.
if o_d.isocalendar()[:2] == (iso_year, int_week): # Filtrage.
l_fxyw.append(csv_path) # Candidat sélectionné.
return l_fxyw, last_stamp # 0, 1 ou 2 éléments.
def _download_csv_file(self, csv_yw_required, b_ticks):
""" Téléchargement, décompactage, décodage, DataFrame, enregistrement sur disque dur.
- return DataFrame. """
""" 1 - Demande au helper de construire l'url. """
url_required = self._helper('url_file', *csv_yw_required, b_ticks)
""" 2 - Initialisation des bytes. """
content = b''
""" 3 - Téléchargement en streaming avec lots de taille {size//50}, permettant l'insertion d'une jauge. """
y, w = csv_yw_required
try:
with requests.get(url_required, stream=True) as req:
size = int(req.headers['Content-Length'])
if size < 1_000:
return pd.DataFrame() # return df vide.
print()
self.ut.gauge(f"Téléchargement {' ticks ' if b_ticks else 'candles'} {self.symbol} ({y}, {w})")
for chunk in req.iter_content(chunk_size=size//50):
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
self.ut.gauge('end')
return None
content += chunk
""" 3.1 - Jauge de progression. """
self.ut.gauge()
self.ut.gauge('end')
except (Exception,):
self.ut.printc('Connexion internet interrompue.')
return None
""" 4 - Décompactage du buffer de données binaires (codées utf), en mémoire. """
buf = BytesIO(content)
f = gzip.GzipFile(fileobj=buf)
coded_bytes = f.read()
""" 5 - Décodage : utf-8 pour les candles, utf-16 pour les ticks. """
codec, type_data = ('utf-16', 'tick') if b_ticks else ('utf-8', 'candle')
decoded_bytes = coded_bytes.decode(codec)
""" 6 - Conversion Bytes -> String """
decoded_str = StringIO(decoded_bytes)
""" 7 - String to DataFrame. """
df = pd.read_csv(decoded_str) # Pas de colonne-index (index_col=0 retiré)
""" 8 - Retourne le dataframe. """
return df
def _df_to_csv(self, df, t_yw, b_ticks):
""" Contrôle préalable : si le dossier-cible n'existe pas, on le crée. """
csv_path = self._helper('csv_file', *t_yw, b_ticks)
os.makedirs(os.path.dirname(csv_path), exist_ok=True)
df.to_csv(csv_path, index=False) # Copie du fichier .csv sur disque dur.
@staticmethod
def _csv_to_df(csv_file):
""" Conversion du fichier {csv_file} en dataframe. """
return pd.read_csv(csv_file) if os.path.isfile(csv_file) else pd.DataFrame()
def _lists_to_update_db(self, b_ticks):
""" Retourne 3 listes : l_drop, l_add et l_vol.
Algorithme pour l_drop et l_add :
- (1) = l_weeks_db : Liste des semaines réellement existantes en base de données.
- (2) = l_weeks_csv : Liste des semaines réellement existantes en fichiers csv.
- Intersection (Inter) = (1) ⋂ (2)
- l_drop : Liste des semaines à supprimer de la db (celles qui n'existent pas en csv) : (1) - (Inter)
- l_add : Liste des semaines à ajouter dans la db (celles des csv qui n'existent pas en db) : (2) - (Inter)
--------------------------------
l_vol = liste des semaines (yw) de candles nécessitant un update du champ 'Volume' dans les tables.
Sans objet pour les ticks.
"""
""" 1 et 2 - Listes des semaines réellement existantes en base de données et en fichier .csv. """
l_weeks_csv, l_weeks_db = self._existing_lists(b_ticks=b_ticks)
""" 3 - Liste des semaines à supprimer de la db (celles qui n'existent pas en csv). """
l_drop = list()
for t_yw in l_weeks_db:
if t_yw not in l_weeks_csv:
# print('A supprimer :', t_yw) # MAP : Permet de lister les semaines qui sont à supprimer.
l_drop.append(t_yw)
""" 4 - Liste des semaines à ajouter à la db (celles des csv qui n'existent pas en db). """
l_add = list()
for t_yw in l_weeks_csv:
if t_yw not in l_weeks_db:
# print('A ajouter :', t_yw) # MAP : Permet de lister les semaines qui sont à ajouter.
l_add.append(t_yw)
""" 5 - Liste des semaines de ticks pour lesquelles les volumes des candles sont à updater. """
l_vol = list()
if not b_ticks: # Renkos non concernés.
""" Semaines ticks existantes (b_ticks=True). """
_, l_weeks_tick_db = self._existing_lists(b_ticks=True)
for yw in l_weeks_tick_db:
if yw in l_weeks_db:
if not self.db_candle.is_volume_in_week(*yw):
l_vol.append(yw)
return l_drop, l_add, l_vol
def _add_weeks(self, l_yw, b_ticks):
"""
@param l_yw : Liste des semaines à ajouter à la base de données.
@param b_ticks: True = ticks, False = candles.
"""
db = self.db_tick if b_ticks else self.db_candle
self.ut.printc(f"\nArrêt manuel : garder la touche '{self.kb_break.upper()}' appuyée.\n")
for iso_yw in l_yw:
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
break
l_csv_files, _ = self._get_csv_files(*iso_yw, b_ticks) # Liste de 0, 1 ou 2 fichiers.
for csv_file in l_csv_files:
df = self._csv_to_df(csv_file)
if df.shape[0] > 0:
db.df_to_table(df)
def get_stamps(self, table_name, nb_stamps):
db = self.db_tick if (table_name == 'Ticks' or table_name[0] == 'r') else self.db_candle
return db.get_last_stamps(table_name, nb_stamps)
def get_datas(self, table_name, final_stamp, nb_points):
b_ticks = table_name == 'Ticks' or table_name[0] == 'r' # Ticks ou Renko
db = self.db_tick if b_ticks else self.db_candle
return db.get_datas(table_name, final_stamp, nb_points)
def main(): # MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP *****
""" Code de MAP (mise au point). Espace expérimental : ajouter, supprimer, commenter, décommenter, ... """
h = CtrlHistos('EUR/USD')
# h = CtrlHistos('USD/JPY')
# h = CtrlHistos('GBP/USD')
""" Candles. """
# h.download_histos(1, b_ticks=False)
# h.synchro_db_csv(b_ticks=False)
h.verify_weeks(b_ticks=False)
""" Ticks. """
# h.download_histos(1, b_ticks=True)
# h.synchro_db_csv(b_ticks=True)
h.verify_weeks(b_ticks=True)
if __name__ == '__main__':
main()
Oui mais ... une nouvelle version de la classe DataBase
est nécessaire pour satisfaire à l'implémentation des calculs dans le node histos
:
self.pips
modifié.get_datas()
ajoutée.get_otable()
ajoutée.☐ Fichier /trading/historiques/db.py
:
from peewee import SqliteDatabase
from functions.utils import DateTime, Utils
import datetime
import pandas as pd
import numpy as np
import keyboard
import os
""" (Russe - vidéo) https://www.youtube.com/watch?v=8dla28TLvwA <- Ctrl + Clic
(Français - txt) https://linuxtut.com/fr/9d4e1d0afac1865acdbb/
(Anglais - vidéo) https://www.youtube.com/watch?v=Vk6Ptnvqr4M
"""
# noinspection PyUnresolvedReferences
class DataBase:
def __init__(self, o_ctrl, b_ticks):
self.o_ctrl = o_ctrl
self.b_ticks = b_ticks
self.dt = DateTime()
self.ut = Utils()
self.kb_break = o_ctrl.kb_break # Nom de la touche du clavier défini dans le contrôleur CtrlHistos.
self.pips = o_ctrl.pips
self.week_delta = datetime.timedelta(weeks=1) # <class 'datetime.timedelta'>
self.day_delta = datetime.timedelta(days=1) # <class 'datetime.timedelta'>
self.o_table = None
pd.set_option('mode.chained_assignment', None)
""" 3 valeurs pour b_ticks : True (ticks), False (candles) ou None (renkos). """
file_name = 'tick' if b_ticks is True else ('candle' if b_ticks is False else 'renko')
sql_file = os.path.abspath(f"{o_ctrl.symbol_dir}/{file_name}.sql")
self.db = SqliteDatabase(sql_file)
self.d_tables = dict()
""" Création physique du fichier *.sql sur disque dur. """
self.db.connect()
def change_datetime_column(self, df):
""" Mise en conformité de la colonne DateTime : string to float. """
if 'DateTime' in df.columns:
df = df.rename(columns={'DateTime': 'timestamp'})
dt0 = np.datetime64(self.dt.get_dtstr_from_dtstamp(0, dt_format='%Y-%m-%d %H:%M:%S')) # '1970-01-01 01:00'
t_delta = np.timedelta64(1, 's') # <class 'numpy.timedelta64'>
pd_dt = pd.to_datetime(df.timestamp, format='%m/%d/%Y %H:%M:%S.%f') # format nécessaire, sinon très lent.
last_stamp = (np.datetime64(pd_dt.iloc[-1]) - dt0) / t_delta
o_d = self.dt.get_date_from_dtstamp(last_stamp)
yw = o_d.isocalendar()[:2] # (year, week)
len_batch = int(.95 * df.shape[0] / 50)
l_ts = list()
self.ut.gauge(f'Conformité {yw}')
for _from in range(0, df.shape[0], len_batch):
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
self.ut.gauge('end')
return None
_to = _from + len_batch
self.ut.gauge()
l_ts += [(np.datetime64(x) - dt0) / t_delta for x in pd_dt[_from: _to]]
self.ut.gauge('end')
df.timestamp = l_ts
return df
def get_db_sign(self):
nb_enr = 0
for o_table in list(self.d_tables.values()):
nb_enr += o_table.select().count()
return nb_enr
def _get_range_from_df(self, df):
""" Renvoie un tuple de stamps, qui encadrent df, à l'extérieur de df (samedis 22:00). """
f_stamp = df['timestamp'].iloc[0] # Premier stamp réel dans df. <-- début.
l_stamp = df['timestamp'].iloc[-1] # Dernier stamp réel dans df. <-- fin.
stamp_before = f_stamp + self.dt.get_stamp_offset(f_stamp, b_after=False, num_day=6, _time='22:00')
stamp_after = l_stamp + self.dt.get_stamp_offset(l_stamp, b_after=True, num_day=6, _time='22:00')
return stamp_before, stamp_after
def _get_range_from_yw(self, year, str_week):
stamp = self.dt.get_dtstamp_from_dtstr(f'{year}-{str_week}-3 10', dt_format='%G-%V-%u %H') # Mercredi 10:00
stamp_before = stamp + self.dt.get_stamp_offset(stamp, b_after=False, num_day=6, _time='22:00')
stamp_after = stamp + self.dt.get_stamp_offset(stamp, b_after=True, num_day=6, _time='22:00')
return stamp_before, stamp_after
def is_volume_in_week(self, *yw):
""" Compte le nombre d'enregistrements total et le nombre d'enregistrements ayant du volume. """
""" Limites de la semaine : samedi 22:00 à samedi 22:00. """
range_stamps = self._get_range_from_yw(*yw)
""" Parcours de toutes les tables et cumul des valeurs. """
nb_total = nb_volume = 0
for o_table in list(self.d_tables.values()):
nb_total += o_table.select(o_table.Volume).where(
o_table.timestamp.between(*range_stamps)
).count()
nb_volume += o_table.select(o_table.Volume).where(
o_table.timestamp.between(*range_stamps),
~o_table.Volume.is_null()
).count()
return nb_volume > nb_total * .98 # Booléen.
def delete_week(self, *yw):
""" Supprime la semaine {yw} de chacune des tables """
range_stamps = self._get_range_from_yw(*yw)
for o_table in list(self.d_tables.values())[::-1]: # Parcours à l'envers.
o_table.delete().where(o_table.timestamp.between(*range_stamps)).execute()
def get_datas(self, table_name, final_stamp, nb_points):
o_table = self.get_otable(table_name)
""" 1 - Lecture des {nb_points} derniers points, le plus récent en premier. """
l_datas = o_table.select().order_by(o_table.timestamp.desc()).where(o_table.timestamp <= final_stamp).limit(nb_points)
""" 2 - Peewee -> DataFrame Pandas """
df = pd.DataFrame(list(l_datas.dicts()))
""" 3 - DataFrame Pandas -> Numpy avec retournement : le plus récent en dernier. """
np_datas = df.values[::-1] # Retournement
return np_datas
def get_otable(self, table_name):
if table_name == 'Ticks':
return self.o_table
else:
d_tables = self.o_renko.d_tables if table_name[0] == 'r' else self.d_tables
return d_tables.get(table_name)
def get_last_stamps(self, table_name, nb_points):
o_table = self.get_otable(table_name)
""" 1 - Lecture des {nb_points} derniers points, le plus récent en premier. """
l_datas = o_table.select(o_table.timestamp).order_by(o_table.timestamp.desc()).limit(nb_points)
""" 2 - Peewee -> DataFrame Pandas """
df = pd.DataFrame(list(l_datas.dicts()))
""" 3 - DataFrame Pandas -> List avec retournement : le plus récent en dernier. """
return list(df['timestamp'])[::-1] if df.shape[0] > 0 else []
""" Méthodes surchargées. """
def df_to_table(self, df):
return True
def update_derived(self, df):
pass
def update_derived_table(self, df, stamp_before, stamp_after, o_derived_table):
pass
def week_exists(self, year, str_week, last_stamp):
return False
Oui mais ... une nouvelle version de la classe CtrlCalcul
est nécessaire pour satisfaire à l'implémentation des calculs dans le node histos
:
self.len_buffer
devient un getter
(une @property
).
self.len_buffer
(voir méthode get_signature()).
# Imports externes
import copy
import shutil
import os
import hashlib
import numpy as np
import pickle
# Imports internes
from functions.utils import Dictionary, DateTime, Utils
from pc.ui_node import UiNode
from pc.parameters import Parameters
from pc.ctrl_socket import CtrlSocket
from pc.ctrl_edge import CtrlEdge
class Helper:
@staticmethod
def deepcopy(reference):
return copy.deepcopy(reference)
@staticmethod
def new_od(dic=None):
return Dictionary(dic)
@staticmethod
def join(*l_variants, sep='-'):
""" Exemple d'appel 1 : y = self.join(a, b, c, d, ...)
Exemple d'appel 2 : y = self.join(*l_x) <-- Ne pas oublier '*'
Concatène les éléments de l_strings en une chaine. Les chaînes vides sont ignorées. le séparateur est sep.
:param l_variants: Nombre quelconque d'éléments de tout type.
:param sep: '-' par défaut. Peut être '' (vide) ou \n (retour à la ligne) ... ou autre.
:return: Chaîne concaténée.
Exemple l_strings = ['Paris', 'Lille', '', 'Bruxelles', 'Prague'] => return "Paris-Lille-Bruxelles-Prague"
"""
string_txt = ''
for string in l_variants:
if string != '':
string_txt += f'{sep}{string}' # Accepte tout type de variable pour string.
return string_txt[len(sep):] # Suppression du 1er sep.
class CtrlNode(Helper):
def __init__(self, o_scene, s_id, pos):
self.o_scene = o_scene
self.s_id = s_id # Clé d'entrée dans le super-dictionnaire pkl.
self.od_pkl = self.o_scene.o_pkl.od_pkl
self.o_grnode = None
self.o_params = None
self.pos = pos # pos = (x, y)
self.width = 96
self.height = 60
self.type = ''
self.ut = Utils()
self.main_key = f"Paramètres du node '{self.s_id}'"
self.b_chk = bool(self.od_pkl.read([self.s_id, 'b_chk'], True))
self.b_on = self.b_chk
self.child_file = ''
""" Sockets. """
self.lo_sockets_in = list() # Liste d'objets 'sockets in' instanciés.
self.lo_sockets_out = list() # Liste d'objets 'sockets out' instanciés.
self.default_color = '#88bbff' # Couleur par défaut des sockets de sortie.
self.l_sep_inputs = [] # Emplacement des séparateurs (après les entrées indiquées).
self.l_sep_outputs = [] # Emplacement des séparateurs (après les sorties indiquées).
""" Contenu spécifique. """
self.o_grcontent = None
""" Secousses => Court-circuits. """
self.dt = DateTime()
self.l_short_circuits = list() # Surchargé par les classes dérivées.
self.l_shortables = list()
self.b_removable = True # Les edges d'entrée peuvent être détruits en cas de court-circuit.
self.k_shake = 0 # Nombre de secousses.
self.b_shaking = False # Secousse en cours.
self.x, self.y = 0, 0 # Position antérieure (x, y), mémorisée.
self.dx, self.dy = 0, 0 # Deltas (variations) antérieurs, mémorisés.
def setup(self, child_file):
""" Code appelant : classe dérivée, on connait donc ses attributs. """
""" Paramètres. """
self.o_params = Parameters(self)
""" Hauteur automatique du node : elle dépend du nombre de sockets en entrée ou en sortie (le plus grand). """
h_title = self.d_display['geometry']['h_title']
nb_sockets_max = max(len(self.ld_inputs), len(self.ld_outputs)) # En fait : Sockets + séparateurs.
first_y = 16 * (1 + (h_title + 4) // 16) # Multiple de 16.
self.height = first_y + nb_sockets_max * 16 - 4
""" Nouveau node par drag & drop. """
self.child_file = child_file
b_new = self.pos != (0, 0) # Nouveau node (node dropé).
if b_new:
""" - Cas d'un nouveau node (dropé).
- Son centre apparaît exactement à la position du curseur de la souris.
- On mémorise son path et sa position. """
self.pos = self.pos[0] - self.width // 2, self.pos[1] - self.height // 2
path = 'nodes' + os.path.relpath(child_file).split('nodes')[1][:-3].replace(os.sep, '.')
self.od_pkl.write([self.s_id, 'path'], path)
else:
self.pos = self.od_pkl.read([self.s_id, 'position'], (0, 0))
""" Instanciation de l'UI. """
self.o_grnode = UiNode(self) # Gestion de l'UI (Interface utilisateur) : dessin couleurs, ...
if b_new:
self.o_grnode.save_pos() # Important ! à la fin (contient o_pkl.backup()).
self.o_scene.o_grscene.addItem(self.o_grnode) # Incorporation dans la scène.
""" Sockets. """
num_socket = 0
for i, d_input in enumerate(self.ld_inputs):
""" On complète les dictionnaires de base. """
if isinstance(d_input, dict): # Si ce n'est pas un dictionnaire, c'est un séparateur.
d_input['pos'] = True, i # On ajoute l'emplacement. Tuple (input: True, num_position).
d_input['id'] = self.id, num_socket # On ajoute l'identifiant. Tuple (id_node, num_socket)
self.lo_sockets_in.append(CtrlSocket(self, d_input)) # Socket instancié et ajouté à la liste.
num_socket += 1
num_socket = 0
for i, d_output in enumerate(self.ld_outputs):
""" On complète les dictionnaires de base. """
if isinstance(d_output, dict): # Si ce n'est pas un dictionnaire, c'est un séparateur.
d_output['pos'] = False, i # On ajoute l'emplacement. Tuple (input: False, num_position).
d_output['id'] = self.id, num_socket # On ajoute l'identifiant. Tuple (id_node, num_socket)
self.lo_sockets_out.append(CtrlSocket(self, d_output)) # Socket instancié et ajouté à la liste.
num_socket += 1
self.short_circuit_check()
@property
def id(self):
""" Ex : 'Node 14' renvoie 14. """
return int(self.s_id[4:])
@property
def d_display(self):
""" Attributs par défaut pouvant être surchargés dans les classes dérivées. """
return {
'geometry': {'h_title': 32, 'round': 6},
'col_pen': {'select': '#ffa637', 'hover': '#888', 'on': "#000", 'off': '#aaaa00'}, # ordre = priorité.
'col_brush': {'on': "#8888bbcc", 'off': '#cccccccc'},
'thick_pen': {'hover': 4., 'leave': 2.}
}
def get_default(self):
""" Titre du node et couleurs des 'socket_out' pour chaque type de node. """
""" Complétée par les classes dérivées avec leurs paramètres statiques et dynamiques. """
""" 1) Paramètres communs : Titre, bouton et autant de couleurs que de sorties. """
d_default = {
'Titre du node': 'Choisir un titre',
'*🔆': '' # Un astérisque devant indique qu'il s'agit d'un bouton.
}
""" 2) Ajout de paramètres fixes spécifiques au type de node, AVANT les couleurs. """
d_default.update(self.fixed0_params()) # méthode fixed_params()
""" 3) Couleurs. """
d_colors = dict()
for d_output in self.ld_outputs:
if isinstance(d_output, dict):
d_colors[d_output['label']] = self.default_color
if d_colors:
d_default['Couleurs'] = d_colors
""" 4) Ajout de paramètres fixes spécifiques au type de node, APRES les couleurs. """
d_default.update(self.fixed_params()) # méthode fixed_params()
""" 5) Ajout des paramètres dynamiques, qui dépendent des signaux entrants. """
d_default.update(self.dynamic_params) # propriété dynamic_params
""" 6) Dictionnaire complet. Paramètres : fixes communs + fixes spécifiques + dynamiques. """
return {self.main_key: d_default}
def get_param(self, l_key, v_default=None):
if self.o_params is None:
return v_default
od_params = Dictionary(self.o_params.od_params[self.main_key])
return od_params.read(l_key, v_default)
def set_checked(self, val):
self.b_chk = bool(val)
""" Update my_signals() """
for o_socket_out in self.lo_sockets_out:
o_socket_out.to_update()
""" Infrastructure. """
self.o_scene.infrastructure()
""" Enregistrement. """
self.od_pkl.write([self.s_id, 'b_chk'], self.b_chk)
self.o_scene.o_ur.b_action = True # Ajout dans l'historique du "Undo-Redo".
self.o_scene.o_pkl.backup()
""" ************************* Méthodes à surcharger ************************** """
@property
def ld_inputs(self):
return list()
@property
def ld_outputs(self):
return list()
def fixed0_params(self):
""" - Renvoie un dictionnaire de paramètres, indépendants des connexions.
- Placé AVANT les couleurs.
- Exemple : le node de type 'Labo' renvoie 'Expérience', etc. """
return {}
def fixed_params(self):
""" - Renvoie un dictionnaire de paramètres, indépendants des connexions.
- Placé APRES les couleurs.
- Exemple : le node de type 'Signaux' renvoie 'Sinus', 'Cosinus', etc. """
return {}
def bundle_params(self):
""" - Renvoie un dictionnaire de paramètres pour chaque entrée.
- Une entrée reçoit un faisceau de signaux (bundle). """
return {}
def my_params(self, context):
"""
:param context: Liste de 7 valeurs, pouvant être utilisée pour construire le dictionnaire de sortie :
- context[0] typ_id_from = Type + id du node en amont, producteur du signal. Ex : 'MM12'.
- context[1] signal_ante_from = Nom du signal créé par le node en amont du node producteur. Ex : 'Cosinus'.
- context[2] signal_now_from = Nom du signal créé par le node en amont. Ex : 'SMA18'.
- context[3] signal_source_from = 'Provenance du signal' affiché dans le node en amont.
- context[4] (num_input, num_signal) = Tuple. num_signal = ordre du signal dans le faisceau entrant.
- context[5] signal_title = Titre du signal dans le dockable des paramètres.
- context[6] signal_source = 'Provenance du signal' -> texte avec retours à la ligne.
:return: Dictionnaire formaté pour les paramètres.
|_ Exemple {'Type de MM': ['SMA', {'values': ['SMA', 'EMA', 'SMMA', 'LWMA']}], "Périodes (sep=',')": '14'}
"""
return {}
def my_signals(self, l_signals_in, num_socket_out):
""" Description des signaux délivrés à la sortie N° num_socket_out.
Comment ça marche ?
- Cette fonction prend en entrée tous les signaux entrants ainsi que les paramètres pkl.
- Un traitement spécifique est décrit, qui produit plusieurs signaux.
- Ces signaux sont ensuite distribués aux différentes sorties du node.
:param l_signals_in: Signaux d'entrée. Cette liste contient autant d'éléments que d'entrées connectées.
- Chaque élément est un tuple : (typ_id_from, signal_ante_from, signal_now_from, signal_source_from, \
(num_input, num_signal), signal_title, typ_id, signal_ante, signal_source)
(Les variables suffixées de '_from' proviennent du node en amont).
:param num_socket_out: N° de la sortie.
:return: Liste de signaux exclusivement destinés à la sortie N° num_socket_out.
Chaque signal est décrit par un tuple à 4 valeurs : (typ_id, signal_ante, signal_now, signal_source).
La valeur de signal_now doit être soigneusement choisie afin d'éviter des doublons de noms.
"""
raise SystemExit(self.child_file + '\nCtrlNode.my_signals() : Cette méthode doit être surchargée.')
""" *********************** Fin - Méthodes à surcharger ********************** """
@property
def dynamic_params(self):
""" Code appelant : self.get_default().
Ces paramètres sont dynamiques car ils dépendent des signaux connectés aux entrées.
- Ce node demande la liste des signaux à chaque socket d'entrée : o_socket_in.l_signals
- Cette liste est formatée de la même façon pour tous les types de node.
- C'est une liste de tuples, chaque tuple a 4 valeurs et caractérise un signal.
- Il est rappelé qu'un edge véhicule non pas un signal, mais un faisceau de signaux (un faisceau).
- Le format d'un tuple est le suivant :
- typ_id_from = Type + id du node en amont, producteur du signal. Ex : 'MM12'.
- signal_ante_from = Nom du signal créé par le node en amont du node producteur. Ex : 'Cosinus'.
- signal_now_from = Nom du signal créé par le node producteur. Ex : 'SMA18'.
- signal_source_from = Texte destiné à être affiché dans le dockable, dans 'Provenance du signal :'
- Node
|_ Socket_in
|_ Faisceau de signaux
|_ Signal
|_ Paramètre dynamique.
"""
if not self.lo_sockets_in:
""" Tant que l'initialisation n'est pas terminée, lo_sockets_in est une liste vide."""
return {}
d_params = dict()
nb_inputs = len(self.lo_sockets_in)
for k in range(nb_inputs):
d_input = self.bundle_params() # Paramètres pour cette entrée.
""" Récupération en cascade : Socket_in -> Edge -> Socket_out -> Node -> Socket_in -> etc. """
for num_signal, t_signal in enumerate(self.lo_sockets_in[k].l_signals):
typ_id_from, signal_ante_from, signal_now_from, signal_source_from = t_signal[:4] # tuple à 4 valeurs.
signal_title = self.join(typ_id_from, signal_ante_from, signal_now_from)
signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
""" Paramètres communs à tous les signaux. """
d_input[signal_title] = {
'$Provenance du signal :': [signal_source, {'readonly': True}],
'Signal actif': True,
}
l_args = [num_signal, signal_title, signal_source] + list(t_signal)
d_input[signal_title].update(self.my_params(l_args))
d_params[f'Entrée {k}'] = d_input
if nb_inputs == 1 and 'Entrée 0' in d_params:
""" S'il n'y a qu'une entrée, inutile d'écrire, dans le titre des paramètres, de laquelle il s'agit. """
d_params = d_params['Entrée 0']
return d_params
def get_signals(self, num_socket):
""" Code appelant : oSocket_out N° num_socket.
- Ce node crée des signaux et les fournit aux nodes en aval par l'intermédiaire des sockets et des edges.
- Il crée autant de faisceaux de signaux (vecteurs) qu'il a de sorties.
- Pour chaque sortie, la liste fournie est formatée de la même façon pour tout type de node.
- Voir détails ci-dessus, dans dynamic_params().
"""
if not (self.b_chk and self.lo_sockets_out): # Setup en cours
return []
""" Update od_params. """
self.o_params.set_params()
nb_inputs = len(self.lo_sockets_in)
l_all_signals_in = list()
for k in range(nb_inputs):
""" Le nom de l'entrée se sera pas écrit dans le titre des paramètres, s'il n'y en a qu'une. """
l_name_input = [f'Entrée {k}'] if nb_inputs > 1 else []
l_signals = list() # Liste des signaux actifs de l'entrée N° k.
""" self.lo_sockets_in[k].l_signals = Faisceau de signaux arrivant sur l'entrée k. """
for num_signal, t_signal in enumerate(self.lo_sockets_in[k].l_signals):
typ_id_from, signal_ante_from, signal_now_from, signal_source_from = t_signal[:4] # tuple à 4 valeurs.
signal_title = self.join(typ_id_from, signal_ante_from, signal_now_from)
if self.get_param(l_name_input + [signal_title, 'Signal actif'], False):
typ_id, signal_ante = f'{self.type}{self.id}', self.join(signal_ante_from, signal_now_from)
signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
l_signals.append(t_signal[:4] + ((k, num_signal), signal_title, typ_id, signal_ante, signal_source))
l_all_signals_in.append(l_signals)
return self.my_signals(l_all_signals_in, num_socket)
""" ******************************* Secousses ******************************* """
def short_circuit_check(self):
""" Passage unique dans cette méthode, au démarrage. Fin programme si erreur.
- Peut-on supprimer les edges d'entrée en cas de court-circuit ?
- Oui, à condition que TOUTES les sorties existent dans self.l_short_circuits. """
l_indx_outputs = list() # Liste des index de sortie.
d_counter = dict()
for short_circuit in self.l_short_circuits: # Exemple -> self.l_short_circuits = [(0, 0), (0, 2)]
num_input = short_circuit[0]
num_output = short_circuit[1]
if num_input >= len(self.ld_inputs):
""" Fin programme. """
raise SystemExit(f"La liste des court-circuits du type de node '{self.type}' est erronée."
f"\nL'entrée N°{num_input} est hors limites."
f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")
if num_output >= len(self.ld_outputs):
""" Fin programme. """
raise SystemExit(f"La liste des court-circuits du type de node '{self.type}' est erronée."
f"\nLa sortie N°{num_output} est hors limites."
f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")
l_indx_outputs.append(num_output) # Dans cet exemple -> ( 0 , 2 )
if num_output in d_counter:
d_counter[num_output] += 1
else:
d_counter[num_output] = 1
for i, d_output in enumerate(self.ld_outputs):
if isinstance(d_output, dict):
if i not in l_indx_outputs:
""" Condition non satisfaite. """
self.b_removable = False # b_removable est à True dans __init__().
break
""" Contrainte technologique : Plusieurs entrées sur une même sortie est interdit ! """
for nb_inputs in d_counter.values():
if nb_inputs > 1:
""" Fin programme. """
raise SystemExit("Plusieurs entrées sont court-circuitables sur une même sortie."
f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")
def set_shortables(self):
""" Méthode appelée lorsque le bouton gauche de la souris est appuyé (sur ce node).
- Chaque type de node possède sa liste de court-circuits faisables.
- Retourne une liste de tuples.
- Chaque tuple est composé des 2 gr_edges pouvant être réunis pour n'en faire qu'un. """
self.l_shortables = list()
for short_circuit in self.l_short_circuits: # Exemple -> self.l_short_circuits = [(0, 0), (0, 2)]
lo_gredges_in = list(self.lo_sockets_in[short_circuit[0]].get_gredges())
lo_gredges_out = list(self.lo_sockets_out[short_circuit[1]].get_gredges())
if lo_gredges_in: # Cette liste contient 0 ou 1 edge.
for o_gredge_out in lo_gredges_out:
self.l_shortables.append((lo_gredges_in[0], o_gredge_out))
def shake(self):
def shortcircuit():
for to_gredge in self.l_shortables: # Tuples de gredges.
o_socket_out = to_gredge[0].o_edge.o_socket_out # From, depuis.
o_socket_in = to_gredge[1].o_edge.o_socket_in # To, jusqu'à.
""" Suppression des 2 edges du tuple. """
self.o_scene.o_grscene.removeItem(to_gredge[1]) # Edge de sortie.
if self.b_removable:
self.o_scene.o_grscene.removeItem(to_gredge[0]) # Edge d'entrée.
""" Création du nouvel edge de court-circuit. """
CtrlEdge(self.o_scene, o_socket_out, o_socket_in)
o_socket_in.to_update()
""" Persistance. """
self.o_scene.save_edges(backup=True)
""" Infrastructure. """
self.o_scene.infrastructure()
def raz_shakes():
""" Après cette raz, un nouveau court-circuit est autorisé. """
self.k_shake = 0
self.b_shaking = False
if self.b_shaking or not self.l_shortables:
return
self.dt.delay(raz_shakes, delay=300)
x, y = self.o_grnode.pos().x(), self.o_grnode.pos().y() # Positions.
dx, dy = x - self.x, y - self.y # Déplacements (delta x, delta y).
b_cond = (round(dx*self.dx) < 0) or (round(dy*self.dy) < 0)
""" Mémorisation. """
self.x, self.y = x, y
self.dx, self.dy = dx, dy
if b_cond:
""" Le mouvement a changé de sens => incrémentation du compteur. """
self.k_shake += 1
if self.k_shake > 10: # Valeur à ajuster.
self.b_shaking = True
self.k_shake = 0
self.dt.delay(shortcircuit, delay=50)
""" ***************************** Fin secousses ***************************** """
def set_colors(self, l_keys):
""" - La couleur est appliquée au socket_out concerné.
- Elle est également appliquée à tous ses éventuels edges connectés.
- A leur tour, ceux-ci propagent cette couleur à leur socket d'arrivée. """
for o_socket_out in self.lo_sockets_out:
if o_socket_out.__class__.__name__ == 'CtrlSocket':
if o_socket_out.label == l_keys[-1]:
""" Affectation de la couleur au socket_out concerné. """
o_socket_out.color = self.get_param(l_keys, '#666')
""" Récupération de tous les edgess partant de ce socket. """
l_gredges = list(o_socket_out.get_gredges())
for o_gredge in l_gredges:
""" Affectation de la même couleur à chaque edge. """
o_gredge.o_edge.color = o_socket_out.color
""" Affectation de la même couleur à chaque socket d'arrivée. """
o_gredge.o_edge.o_socket_in.color = o_socket_out.color
""" Rafraîchissement de l'affichage. """
self.o_grnode.update()
def need_update(self, l_keys):
"""
:param l_keys: Clé, dans od_params, du paramètre modifié (liste).
:return: True si les nodes en aval nécessitent d'être recalculés, False sinon.
"""
l_param_names = list(self.my_params('')) + list(self.fixed_params()) # Les paramètres du dockable.
b_actif = self.get_param([l_keys[1], 'Signal actif'], True) # True 'Signal actif' n'existe pas (générateurs).
""" Update nécessaire (si l'état actif a changé) OU (si actif ET un des paramètres a changé). """
return (l_keys[-1] == 'Signal actif') or (b_actif and l_keys[-1] in l_param_names)
def get_state(self):
""" Chaque node, selon ses spécificités, retourne un booléen de son état on/off (True/False). """
b_on = False
if self.b_chk:
for key in self.o_params.od_params.key_list():
if key[-1] == 'Signal actif':
if self.o_params.od_params.read(key, False):
b_on = True
break
return b_on
def recalculate(self, num_input):
""" - Propagation vers l'aval. Les nodes finaux sont des afficheurs ou des actionneurs.
- Après cela, les nodes finaux construisent leur dict de calcul par propagation vers l'amont. """
for o_socket_out in self.lo_sockets_out:
o_socket_out.recalculate()
def update_dcalc(self, d_calc):
if self.b_on:
""" Propagation amont. """
for o_socket_in in self.lo_sockets_in:
o_socket_in.update_dcalc(d_calc)
od_params = Dictionary(self.o_params.od_params[self.main_key])
""" Construction du dictionnaire pour les calculs, à partir du od_params. """
od_calc = Dictionary()
path = 'nodes' + os.path.relpath(self.child_file).split('nodes')[1][:-3].replace(os.sep, '.')
od_calc.write('Compute', self.type)
od_calc.write('Yaml file', self.get_yaml_files()[0])
od_calc.write('Compile from', path)
od_calc.write('Checked', self.b_chk)
for key in od_params.key_list():
if key[0] == 'Titre du node' or key[0] == 'Couleurs' or key[0] == '*🔆':
continue
if key[-1] == '$Provenance du signal :':
continue
od_calc.write(key, od_params.read(key))
""" Modification 'en place'. """
d_calc[self.s_id] = dict(od_calc)
def get_yaml_files(self):
py_file = os.path.basename(self.child_file)
seeder = self.child_file.replace(py_file, f'{py_file[:-3]}_seeder.yaml')
path_node = f"{__file__.split('pc')[0]}backups/{self.o_scene.graph_name}/node{self.id}"
extended_params = os.path.abspath(f'{path_node}/{self.id}-{py_file[:-3]}.yaml')
return extended_params, seeder
def yaml(self):
""" Méthode appelée lorsqu'on clique sur le bouton '🔆'. Lancement de l'appli associée à '.yaml'. """
extended_params, seeder_file = self.get_yaml_files()
path_node = os.path.dirname(extended_params)
os.makedirs(path_node, exist_ok=True)
if not os.path.exists(extended_params):
""" Création du fichier de paramètress étendus en yaml : copie du seeder si possible, sinon vide. """
if os.path.exists(seeder_file):
shutil.copyfile(seeder_file, extended_params)
else:
with open(extended_params, 'w'):
pass
if not os.path.exists(extended_params):
return
""" Lancement du fichier avec application associée. """
os.startfile(extended_params)
def rebuild(self, b_input=True):
""" Reconstruction du node suite à un changement dynamique du nombre d'entrées/sorties. """
""" Suppression des (sockets + edges) de ce node. """
lo_sockets = self.lo_sockets_in if b_input else self.lo_sockets_out
for o_socket in lo_sockets:
""" Suppression des éventuels edges connectés à ce socket. """
for o_gredge in list(o_socket.get_gredges()):
self.o_scene.o_grscene.removeItem(o_gredge)
""" Suppression du socket. """
self.o_scene.o_grscene.removeItem(o_socket.o_grsocket)
o_socket.o_grsocket = None # Suppression de l'objet.
""" Nettoyage de la liste des entrées/sorties. """
lo_sockets.clear()
""" Redessine le node avec sa nouvelle hauteur. """
ld_inputs, ld_outputs = self.deepcopy(self.ld_inputs), self.deepcopy(self.ld_outputs)
nb_inputs, nb_outputs = len(ld_inputs), len(ld_outputs)
h_title = self.d_display['geometry']['h_title']
first_y = 16 * (1 + (h_title + 4) // 16)
self.height = first_y + max(nb_inputs, nb_outputs) * 16 - 4 # Nouvelle hauteur.
self.o_grnode.set_outline()
""" Régénération de sockets. """
for pos, d_input in enumerate(ld_inputs):
""" On complète les dictionnaires de base. Pas de séparateurs. """
if isinstance(d_input, dict):
indx = len(self.lo_sockets_in)
d_input['pos'] = True, pos # On ajoute la pos. Tuple (input: True, num_position).
d_input['id'] = self.id, indx # On ajoute l'identifiant. Tuple (id_node, num_socket)
self.lo_sockets_in.append(CtrlSocket(self, d_input)) # Socket instancié et ajouté à la liste.
for pos, d_output in enumerate(ld_outputs):
""" On complète les dictionnaires de base. Pas de séparateurs. """
if isinstance(d_output, dict):
indx = len(self.lo_sockets_out)
d_output['pos'] = False, pos # On ajoute la pos. Tuple (output: False, num_position).
d_output['id'] = self.id, indx # On ajoute l'identifiant. Tuple (id_node, num_socket)
self.lo_sockets_out.append(CtrlSocket(self, d_output)) # Socket instancié et ajouté à la liste.
""" Régénération des edges. """
self.o_scene.show_edges()
self.o_scene.save_edges()
""" Dockable des paramètres : le nombre d'entrées a changé. """
self.o_params.set_params()
self.dt.delay(self.o_scene.show_params, 2000) # delay : Permet de continuer la saisie dans le dockable.
def refresh(self, l_keys):
if l_keys[-1] == '🔆':
self.yaml()
elif l_keys[-1] == 'Titre du node':
""" Titre du node. """
self.o_grnode.set_title()
elif l_keys[-2] == 'Couleurs':
""" Couleur des sockets et des edges. """
self.set_colors(l_keys[1:])
else:
""" Fin de refresh - Appel conditionnel à infrastructure(). """
if self.b_chk and (l_keys[0] == 'infrastructure' or l_keys[-1] == 'Signal actif'):
""" Les nodes de type générateur doivent insérer le mot 'infrastructure'
en 1ère place dans la liste l_keys[], car ils n'ont pas 'Signal actif' dans leurs paramètres. """
self.o_scene.infrastructure()
""" Tous les paramètres, autres que le titre et les couleurs, influencent les calculs. """
self.o_scene.recalculate()
class CtrlCalcul(Helper):
""" Attributs de classe. """
d_servers = dict() # Dictionnaire de classe.
od_calcs = Dictionary()
def __init__(self):
self.id = None
self.s_id = None
self.compute = None
self.o_yaml = None
self.l_mykey = list()
self.np_array = None
self.b_forcing_calc = False
self.od_descr = Dictionary()
self.od_mydic = Dictionary()
self.nb_points = self.len_buffer
self.no_params = ['Compute', 'Yaml file', 'Compile from', 'Checked']
def setup_server(self, id_server):
self.id = id_server
self.l_mykey = self.get_keys_from_idnode(id_server) # Clé dans self.od_calcs.
self.od_mydic = Dictionary(self.od_calcs.read(self.l_mykey))
self.o_yaml.yaml_file = self.od_mydic.read(['Yaml file'])
self.compute = self.od_mydic.read(['Compute'])
self.s_id = f"{self.compute}{self.id}"
self.o_yaml.update_odyaml() # Update du super-dictionnaire des paramètres (self.o_yaml.od_yaml).
@property
def len_buffer(self):
for l_key in self.od_calcs.key_list():
""" Recherche dans dictionnaire global (tous les nodes). """
if l_key[-1] == 'Abscisse':
len_buffer = self.od_calcs.read(l_key)
break
else:
len_buffer = 10**5
return int(len_buffer)
@property
def dt_servers_in(self):
""" - Le dictionnaire retourné contient autant de clés que d'entrées connectées et actives.
- Chaque valeur est un tuple : (o_server, signature, edge). """
""" 1 - Récupération de la liste de tous les edges dans self.od_calcs. """
s_edges = set()
ll_keys = self.od_calcs.key_list()
for l_keys in ll_keys:
if l_keys[-1] == 'edges':
s_edges.update(self.od_calcs.read(l_keys))
""" 2 - Instance et signature des nodes-serveurs en amont connectés aux entrées. """
d_servers = dict()
for edge in list(s_edges):
if edge[1][0] == self.id:
""" edge = Edge connecté à l'entrée de self. """
o_server, signature = self.get_server(edge[0][0])
d_servers[f'e{edge[1][1]}'] = o_server, signature, edge
return d_servers
@classmethod
def get_oserver(cls, id_server, compile_from):
""" Ce code est appelé par un node-client. Il aura un serveur par entrée.
- Chaque node-serveur est associé à une instance de classe Calcul (dérivée de CtrlCalcul).
- Au premier passage concernant ce node-serveur, l'objet o_server n'existe pas.
- Dans ce cas, on le crée par compilation dynamique et on l'enregistre dans un dictionnaire.
- Le dictionnaire est un dictionnaire de classe : cls.d_servers.
- Cela permet d'éviter de créer une nouvelle instance si celle-ci existe déjà.
"""
server_sid = f"{compile_from.split('.')[-1]}{id_server}" # Clé du dictionnaire des serveurs. ex : macd3
if server_sid not in cls.d_servers:
""" Compilation dynamique. Si l'objet o_server n'existe pas, on le crée."""
d_context = {}
code = f"from {compile_from} import Calcul; o_server=Calcul()"
try:
compiled = compile(code, '<string>', 'exec')
eval(compiled, d_context)
cls.d_servers[server_sid] = d_context['o_server']
except (Exception,) as err:
print(server_sid, ': Erreur de compilation dans', compile_from, '\n', err)
return None
return cls.d_servers[server_sid]
def get_keys_from_idnode(self, id_node):
"""
@param id_node: ex : 1
@return: Renvoie la clé d'accès au node 'node_sid', dans le dictionnaire général self.od_calcs.
"""
node_sid = f'Node{id_node}' # Ex : 'Node1'
for l_key in self.od_calcs.key_list():
if l_key[1] == node_sid:
return l_key[:2] # ['calc_*', 'Node*'] Clé du node-serveur dans le dictionnaire général self.od_calcs.
def get_server(self, id_server):
l_key_node = self.get_keys_from_idnode(id_server)
compile_from = self.od_calcs.read(l_key_node + ['Compile from'])
if compile_from is None:
return None
child = self.od_calcs.read(l_key_node + ['Expérience'], '')
compile_from += f'0{child}'[-2:] if isinstance(child, int) else ''
o_server = self.get_oserver(id_server, compile_from)
if o_server is None:
return None, None
o_server.setup_server(id_server)
return o_server, o_server.get_signature()
def get_signature(self):
""" La valeur de retour est un hash de od_yaml, od_mydic, liste des signatures des nodes-serveurs. """
l_signs_in = list()
for server_in in self.dt_servers_in.values():
""" server_in = tuple (instance, signature) du node-serveur. """
l_signs_in.append(server_in[1]) # server_in[1] = signature.
""" Assemblage des constituants de la signature. """
params = (self.o_yaml.od_yaml, self.od_mydic, l_signs_in, self.len_buffer)
return f'{self.id}-{hashlib.sha256(str(params).encode("utf-8")).hexdigest()}'
def calculation(self):
""" Ici node-serveur. Première méthode appelée par le node-client en aval, avant de commencer ses calculs. """
""" 1 - Chargement des derniers datas mémorisés en *.calc (pickle) : self.od_descr et self.np_array. """
self.load_last_datas() # Super-dictionnaire et tableau numpy vides si *.calc inexistant ou erroné.
""" 2 - Création du dict. de description actualisé od_descr, aux fins de comparaison avec self.od_descr. """
od_descr = self.setup_od_descr()
""" 3 - Comparaison sélective des descriptifs now et ante. Si aucune différence, return (calculs inutiles). """
self.nb_points = self.len_buffer
if self.now_equal_ante(od_descr):
return
self.od_descr = od_descr
""" 4 - Propagation des calculs en amont. """
min_points = self.len_buffer
for o_server, _, edge in self.dt_servers_in.values():
""" Bien que ce soit cette même méthode, ce n'est pas une récursivité, mais une propagation amont."""
o_server.calculation()
min_points = min(min_points, o_server.nb_points)
if o_server.b_forcing_calc:
self.b_forcing_calc = True
""" 5 - self.np_array vide. """
nb_cols = len([key for key in list(self.od_descr.keys()) if key.startswith(f'{self.s_id}-')])
self.np_array = np.full((self.len_buffer, nb_cols+1), fill_value=np.nan, dtype=np.float32) # nan
self.np_array[:, 0] = 0 # col0 = status (0 ou 1)
""" 6 - Traitement des calculs. """
self.pre_process()
self.nb_points = min(self.nb_points, min_points)
""" 7 - Persistance en *.calc (pickle). """
self.create_out_calc()
def load_last_datas(self):
""" Ici, node-serveur. Récupération des derniers datas depuis son fichier out_*.calc. Les datas sont :
- Un dictionnaire descriptif des consignes : 'self.od_descr'.
- Un tableau numpy : 'self.np_array'.
- Pour certains nodes, les datas ne sont pas dans out_*.cal, mais dans une base de données.
- Le cas échéant, cette méthode sera surchargée. """
server_root = os.path.dirname(self.o_yaml.yaml_file)
os.makedirs(server_root, exist_ok=True)
out_file = os.path.normpath(f'{server_root}/out.calc')
try:
with open(out_file, 'rb') as calc:
self.od_descr, self.np_array = pickle.load(calc)
if not self.od_descr or self.np_array.size == 0 or self.np_array.shape[0] != self.len_buffer:
raise() # On force le passage par except.
except (Exception,):
""" Initialisation. """
self.od_descr, self.np_array = self.new_od(), np.empty((self.len_buffer, 0))
if not self.dt_servers_in: # Dans la méthode suivante setup_od_descr() ...
self.od_mydic.write(['self_server', 'Signal actif'], True) # ... voir if l_keys[-1] != 'Signal actif':
def setup_od_descr(self):
""" Création du super-dictionnaire descriptif des calculs : od_descr.
- Une clé pour chaque signal, actif ou pas, ayant exactement le même libellé que celui
qui est affiché dans le dockable du node-client. Exemple pour un node 'Signaux' : 'Signaux1-Sinus'
- La valeur de cette clé est un dictionnaire ayant pour clés obligatoires :
- Diverses valeurs, spécifiques au signal, nécessaires aux calculs.
- num_col : Entier. Défini au dernier passage, sinon -1, destinée aux nodes-clients.
- actif : Booléen.
- notes : Aucune utilité en programmation, à l'attention du développeur.
- A la racine du dictionnaire, outre les clés de signaux, il y a :
- signature : signature sha256. Contient un assemblage des paramètres + signatures des nodes-serveurs.
Cette signature-composite nous permet de savoir si les signaux entrants ont changé.
- roots : liste de tuples, autant que de signaux en entrée : (name_input, key_dock)
Ex : name_input = 'e0', 'e1', ... key_dock = ADD4-Poisson-Signal
- Diverses valeurs globales, spécifiques au node, nécessaires aux calculs.
- Valeur de retour : od_descr.
"""
od_descr = Dictionary()
""" Signature = ma signature + signatures des nodes-serveurs. """
od_descr['signature'] = self.get_signature()
od_descr['roots'] = list()
num_col = 0
for l_keys in self.od_mydic.key_list():
if l_keys[-1] != 'Signal actif':
continue
key = l_keys[:-1]
val = self.od_mydic.read(key)
name_input = 'e' + l_keys[0][-1] if l_keys[0].startswith('Entrée ') else 'e0'
root_key = self.join(*[self.s_id] + key[-1].split('-')[1:])
if val.get('Signal actif', False):
od_descr['roots'].append((name_input, root_key))
od_descr['name_input'] = name_input
self.descr_signal(od_descr, val, root_key)
del od_descr['name_input']
for key_dock in od_descr.keys():
if key_dock.startswith(root_key):
if od_descr.read([key_dock, 'num_col']) is None:
num_col += 1
od_descr.write([key_dock, 'num_col'], num_col)
if key_dock.endswith('-Original'):
try:
od_descr.write([key_dock, 'actif'], val['Original'])
except (Exception,):
pass
elif od_descr.read([key_dock, 'actif']) is None:
od_descr.write([key_dock, 'actif'], val['Signal actif'])
return od_descr
def now_equal_ante(self, od_descr):
l_no_compare = ['actif']
for key_dock in od_descr.key_list():
if key_dock[-1] in l_no_compare:
continue
elif key_dock[0] == 'changed':
""" Passage forcé dans descr_signal() du node. """
return False
elif key_dock[-1] == 'signature' and od_descr.read(key_dock) != self.od_descr.read(key_dock):
return False
elif od_descr.read(key_dock) != self.od_descr.read(key_dock):
return False
else:
for o_server, _, _ in self.dt_servers_in.values():
if o_server.np_array is None:
return False
else:
return True
def descr_signal(self, od_descr, val, root_key):
""" Méthode OBLIGATOIREMENT surchargée par les classes dérivées. """
""" - Création du super-dictionnaire de description 'od_descr'.
- Les clés de 'od_descr' devront contenir les arguments nécessaires aux calculs.
- Les calculs seront effectués par pre_process(), ou, en mode 'générateur', par get_matrix().
- {key_dock} doit avoir EXACTEMENT la même orthographe que
la clé affichée dans le dockable des paramètres du node CLIENT
- Sources EXCLUSIVES : 'self.od_mydic' et 'self.o_yaml'.
- Dynamique - Cette méthode est appelée en boucle par la classe-mère :
- Une fois par signal aux entrées (actif ou pas).
- La construction du super-dictionnaire se fait donc par couches successives.
- Cela ne signifie pas qu'il y a autant de signaux en sortie qu'en entrée !
- Ces 2 nombres sont INDÉPENDANTS !
- En effet, plusieurs signaux d'entrée peuvent contribuer pour un seul signal de sortie ...
- ... et/ou, inversement, un signal d'entrée peut produire plusieurs signaux de sortie.
************************* IMPORTANT *************************
- Le coding se fera par itérations successives :
- Commencer à coder les calculs (pre_process() ou calculate()), qui parcourra ce dictionnaire 'od_descr'
- Si les calculs nécessitent une information absente dans 'od_descr', on revient ici pour l'ajouter.
- On prolonge le coding des calculs, puis on boucle ici ...
- ... jusqu'à la fin.
"""
raise SystemExit(f"{self.s_id} : ctrl_node.py > CtrlCalcul."
"\nLa méthode descr_signal() doit obligatoirement être surchargée.")
def pre_process(self):
""" Méthode surchargée par les classes 'générateur' dérivées : Signaux, Aléatoire, Histos, ... """
""" Cette méthode peut également être surchargée par d'autres classes dérivées, mais attention :
- La classe dérivée doit utiliser EXCLUSIVEMENT la méthode pre_process() OU la méthode get_matrix() :
- pre_process() : Le tableau numpy va être affecté à 100% ...
|_ ... ce qui risque d'être chronophage pour certains calculs.
- get_matrix() : Juste une toute petite partie du tableau numpy va être remplie en live, ...
|_ ... selon les besoins des nodes-clients, grâce à l'utilisation de générateurs python.
|_ Dans ce cas, la réactivité sera excellente, même si les calculs sont complexes.
Méthode pre_process() :
===================
- La récupération des paramètres se fait exclusivement depuis le super-dictionnaire self.od_descr.
- Le tableau numpy existe avec la bonne shape, mais est entièrement vide :
- Autant de colonnes que de signaux, plus une : la colonne de status, en position 0.
- Les valeurs sont des np.nan ...
- ... sauf la colonne 0 qui est remplie de '0'.
- Les valeurs sont produites :
- Par calculs, qui utilisent des fonctions "prêtes à l'emploi", du module 'indicators'.
- Par lecture d'une base de données.
"""
pass
def get_matrix(self, pointer, nb_lines=0):
"""
@param pointer: Pointeur "Depuis", si pointer < 0 , la totalité du tableau est retournée.
@param nb_lines: "Jusqu'a" = pointer + nb_lignes.
- Pour certains nodes, les datas ne sont pas dans 'self.np_array', mais en base de données.
- Le cas échéant, cette méthode sera surchargée.
- Si le node n'est pas de type 'générateur', cette méthode est également surchargée.
- Le traitement est court, ponctuel et partiel.
- A l'issue du traitement, une toute petite partie de tableau numpy 'self.np_array' a été mise à jour.
- La colonne 0 contient des flags (0 ou 1), levés lorsque les lignes correspondantes ont été traitées.
- Le retour est une matrice ayant le même nombre de colonnes que np_array et {nb_lines} lignes.
- Si les données demandées sont dans self.np_array, on les transmet directement.
- Sinon, on les génère, on les place dans self.np_array, puis on les transmet.
- Les paramètres sont à récupérer EXCLUSIVEMENT dans self.od_descr.
- Les données d'entrée sont demandées aux nodes-serveurs des entrées.
"""
_from, _to = pointer, pointer+nb_lines
""" Calculer si 'forcing' ou si le status = 0 (si au moins une valeur de la colonne 0 est à 0). """
if self.b_forcing_calc or np.prod(self.np_array[_from: _to, 0]) == 0:
""" Update de self.np_array. Début de transaction. """
self.process(pointer, nb_lines) # Traitement de tous les signaux aux entrées.
""" flag = 1 (fait) <--- Status. Fin de transaction. """
self.np_array[_from: _to, 0] = 1 # <-- Écriture du status = 1 (en colonne 0).
return self.np_array if pointer < 0 else self.np_array[_from: _to] # Tout ou slice.
def process(self, pointer, nb_lines):
od_buffer = self.new_od()
for name_input, root_key in self.od_descr.get('roots'):
o_server = self.dt_servers_in[name_input][0] # [0] = server, [1] = signature, [2] = edge
""" On demande au serveur une matrice un peu plus grande que nécessaire,
pour permettre l'amorce de certains calculs. On insère {previous_values} valeurs au début. """
previous_values = min(pointer, int(self.od_descr.read(['max_length', root_key], 0)))
np_matrix = o_server.get_matrix(pointer - previous_values, nb_lines + previous_values)
num_col_server = o_server.get_num_col(root_key)
vector_in = np_matrix[:, num_col_server]
od_buffer['name_input'] = name_input
self.calculate(pointer, nb_lines, root_key, vector_in, od_buffer)
del od_buffer['name_input']
if od_buffer:
""" Si dictionnaire non vide, modifié par calculate(), on termine le traitement. """
self.post_process(pointer, nb_lines, od_buffer)
def calculate(self, pointer, nb_lines, root_key, vector_in, od_buffer):
""" Pas de contrôle de surcharge pour les nodes sans entrées (type générateur, ...) """
if self.dt_servers_in:
raise SystemExit(f"{self.s_id} : ctrl_node.py > CtrlCalcul."
"\nLa méthode calculate() doit obligatoirement être surchargée.")
def post_process(self, pointer, nb_lines, od_buffer):
""" - Passage unique, après process() afin de finaliser certains calculs ...
... incluant d'autres signaux que le signal en cours.
- Pré-traitement spécifique. """
pass
def create_out_calc(self):
""" Création du fichier-résultat out_*.calc = (od_descr, liste de tableaux numpy). """
""" 1 - Contrôle - Le tableau nd_array doit avoir une shape à 2 dimensions. """
""" Correction nécessaire si le tableau est un vecteur. """
if len(self.np_array.shape) == 1: # <-- C'est un vecteur, on reshape.
self.np_array = self.np_array.reshape(self.np_array.shape[0], 1)
""" 2 - Persistance. """
server_root = os.path.dirname(self.od_mydic['Yaml file'])
out_file = os.path.normpath(f'{server_root}/out.calc')
try:
with open(out_file, 'wb') as calc:
pickle.dump((self.od_descr, self.np_array), calc, pickle.HIGHEST_PROTOCOL)
except (Exception,) as err:
print(err)
return False
return True
def add_vector(self, key_dock, num_col, vector):
if vector is None:
return
vector = vector.reshape(vector.shape[0], 1)
add_nan = np.full((self.len_buffer - vector.shape[0], 1), np.nan)
vector = np.vstack((add_nan, vector))
if num_col is None or num_col < 0:
""" Nouvelle colonne d'indice num_col (égal au nombre actuel de colonnes). """
num_col = self.np_array.shape[1]
self.np_array = np.hstack((self.np_array, vector))
else:
""" Surcharge de la colonne num_col. """
self.np_array[:, num_col:num_col + 1] = vector
self.od_descr.write([key_dock, 'num_col'], num_col)
def get_num_col(self, client_key): # client_key = MM4-Normal-Original
sliced_key = self.join(*client_key.split('-')[1:]) # sliced_key = Normal-Original (Début retiré).
if sliced_key != '':
for l_key in self.od_descr.key_list(): # [Aléatoire1-Normal, 'num_col']
if l_key[-1] == 'num_col' and l_key[-2].endswith(sliced_key):
return self.od_descr.read(l_key)
def get_vector_in(self, client_key): # client_key = MM4-Normal-Original
return self.np_array[:, self.get_num_col(client_key)]
Vérification
/nodes/indicateurs/moy_mobile/moy_mobile.py
:
# Imports internes
import functions.indicators as indic
from nodes.indicateurs.moy_mobile.moy_mobile_yaml import YamlParams
from pc.ctrl_node import CtrlNode, CtrlCalcul
d_datas = {
'name': 'MMobile', # Label affiché sous l'icone.
'icon': 'moy_mobile.png', # Icone affichée.
}
class Node(CtrlNode):
def __init__(self, o_scene, s_id, pos):
super().__init__(o_scene, s_id, pos)
self.type = 'MM'
self.setup()
def setup(self, child_file=__file__):
self.l_short_circuits = [(0, 0)] # Entrée N°0 'court-circuitable' avec la sortie N°0.
super().setup(child_file)
@property
def ld_inputs(self):
return [{
'label': '',
'label_pos': (6, -10)
}]
@property
def ld_outputs(self):
return [{
'label': 'Sortie',
'label_pos': (-38, -10)
}]
def my_params(self, context):
return {
'Original': True,
'Type de MM': ['SMA', {'values': ['SMA', 'EMA', 'SMMA', 'LWMA']}],
"Périodes (sep=',')": '14',
}
def my_signals(self, l_signals_in, num_socket_out):
""" Faisceau de signaux (l_signals) délivré à la sortie. """
l_signals = list() # Liste de tuples à 4 valeurs.
for signals_in in l_signals_in[0]:
typ_id_from, signal_ante_from, signal_now_from, signal_source_from, \
_, signal_title, typ_id, signal_ante, signal_source = signals_in[:9]
try:
periods = [int(p.strip()) for p in self.get_param([signal_title, "Périodes (sep=',')"]).split(',')]
except (Exception,):
periods = [14]
if self.get_param([signal_title, 'Original'], True):
l_signals.append((typ_id, signal_ante, 'Original', signal_source))
for num, period in enumerate(periods):
legend = f"{signal_ante}-{self.get_param([signal_title, 'Type de MM'], 'SMA')}{period}"
signal_now = f'sign{num}'
signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
typ_id, signal_ante = f'{self.type}{self.id}', self.join(signal_ante_from, signal_now_from)
l_signals.append((typ_id, signal_ante, signal_now, signal_source, legend))
return l_signals
def refresh(self, l_keys):
""" Code spécifique. """
if self.need_update(l_keys):
self.lo_sockets_out[0].to_update() # Une seule sortie, N° 0.
super().refresh(l_keys)
class Calcul(CtrlCalcul):
""" ********** Le code ci-dessous ne concerne pas le poste de contrôle, mais seulement les calculs. ********** """
def __init__(self):
super().__init__()
self.o_yaml = YamlParams(self)
def descr_signal(self, od_descr, val, root_key):
""" Voir docstring dans la classe-mère. """
signal_name = root_key.split('-')[1]
""" Original. """
key_dock = f"{root_key}-Original"
od_descr.write([key_dock, 'notes'], f'{signal_name} original')
typ = val['Type de MM']
l_periods = [int(p.strip()) for p in val["Périodes (sep=',')"].split(',')]
od_descr.write(['max_length', root_key], int(1.5 * max(l_periods)))
for num, period in enumerate(l_periods):
""" key_dock doit avoir EXACTEMENT la même orthographe que la clé affichée
dans le dockable des paramètres du node CLIENT. """
key_dock = f'{root_key}-sign{num}'
od_descr.write([key_dock, 'notes'], f'{signal_name} {typ}{period}')
od_descr.write([key_dock, 'typ'], typ)
od_descr.write([key_dock, 'period'], period)
od_descr.write([key_dock, 'kwargs', 'nb_last'], min(period, self.o_yaml.smma_nb_last))
od_descr.write([key_dock, 'kwargs', 'ratio'], self.o_yaml.lwma_ratio)
def pre_process(self):
""" - Passage unique, après descr_signal() et avant get_matrix(). voir docstring dans la classe-mère.
- Pré-traitement spécifique. """
pass # Cette méthode peut être supprimée.
def calculate(self, pointer, nb_lines, root_key, vector_in, od_buffer):
""" Passage multiple. """
_to = pointer + nb_lines
""" Original. """
key_dock = root_key + '-Original'
num_col = self.od_descr.read([key_dock, 'num_col'])
self.np_array[pointer: _to, num_col] = vector_in[-nb_lines:]
""" Si plusieurs moyennes, key_dock se termine par 'sign0', 'sign1', ... """
for key_dock, d_val in self.od_descr.items():
if key_dock.startswith(root_key+'-sign'):
num_col = d_val.get('num_col')
typ = d_val.get('typ')
period = d_val.get('period')
kwargs = d_val.get('kwargs')
vector_out = indic.get_mm(vector_in, typ=typ, period=period, **kwargs)
nb_valid_lines = min(len(vector_out), nb_lines)
self.np_array[_to - nb_valid_lines: _to, num_col] = vector_out[-nb_valid_lines:]
/nodes/indicateurs/macd/macd.py
:
# Imports internes.
from pc.ctrl_node import CtrlNode, CtrlCalcul
from nodes.indicateurs.macd.macd_yaml import YamlParams
import functions.indicators as indic
d_datas = {
'name': 'MACD', # Label affiché sous l'icone.
'icon': 'macd.png', # Icone affichée.
}
class Node(CtrlNode):
def __init__(self, o_scene, s_id, pos):
super().__init__(o_scene, s_id, pos)
self.type = 'MACD'
self.setup()
def setup(self, child_file=__file__):
""" Listes des dictionnaires de base attribués aux sockets. """
self.l_short_circuits = [(0, 0), (0, 1)] # e0 -> s0 et e0 -> s1
super().setup(child_file)
@property
def ld_inputs(self):
return [
{ # Liste de 1 dictionnaire.
'label': 'Entrée',
'label_pos': (6, -10)
}
]
@property
def ld_outputs(self):
return [ # Liste de 2 dictionnaires.
{
'label': 'Sortie',
'label_pos': (-38, -10)
},
'sep', # Séparateur (facultatif).
{
'label': 'Oscillateur',
'label_pos': (-59, -10)
}
]
def my_params(self, context):
""" Code appelant : CtrlNode.dynamic_params() """
return {
'Original': True,
'Type de MM longue': ['SMA', {'values': ['SMA', 'EMA', 'SMMA', 'LWMA']}],
'Moyenne longue': [26, {'compactHeight': False}],
'Type de MM courte': ['SMA', {'values': ['SMA', 'EMA', 'SMMA', 'LWMA']}],
'Moyenne courte': [12, {'compactHeight': False}],
'Type de MM Signal': ['SMA', {'values': ['SMA', 'EMA', 'SMMA', 'LWMA']}],
'Signal': [9, {'compactHeight': False}],
}
def my_signals(self, l_signals_in, num_socket_out):
""" Retourne un faisceau de signaux (l_signals) délivré à la sortie N° num_socket_out. """
l_signals = [[], []] # 2 sorties.
for signals_in in l_signals_in[0]:
typ_id_from, signal_ante_from, signal_now_from, signal_source_from, \
_, signal_title, typ_id, signal_ante, signal_source = signals_in
signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
typ_id, signal_ante = f'{self.type}{self.id}', self.join(signal_ante_from, signal_now_from)
""" Sortie 0 : 2 ou 3 signaux. """
orig = ['Original'] if self.get_param([signal_title, 'Original'], True) else []
for signal_now in orig + ["Longue", "Courte"]:
l_signals[0].append((typ_id, signal_ante, signal_now, signal_source))
""" Sortie 1 : 3 signaux. !!! Attention, ce n'est pas le signe moins, mais un caractère spécial !!!"""
for signal_now in ['(Courte–Longue)', 'Signal', 'MACD']:
l_signals[1].append((typ_id, signal_ante, signal_now, signal_source))
return l_signals[num_socket_out]
def refresh(self, l_keys):
""" Ex : l_keys = ["Paramètres du node 'Node0'", 'Signaux-1 : Sinus', 'Signal actif'] """
if self.need_update(l_keys):
""" Update (si l'état actif a changé) OU (si actif ET un des paramètres a changé). """
self.lo_sockets_out[0].to_update()
self.lo_sockets_out[1].to_update()
super().refresh(l_keys)
class Calcul(CtrlCalcul):
""" ********** Le code ci-dessous ne concerne pas le poste de contrôle, mais seulement les calculs. ********** """
def __init__(self):
super().__init__()
self.o_yaml = YamlParams(self)
# noinspection DuplicatedCode
def descr_signal(self, od_descr, val, root_key):
""" Voir docstring dans la classe-mère. """
signal_name = root_key.split('-')[1]
""" Original. """
key_dock = f"{root_key}-Original"
od_descr.write([key_dock, 'notes'], f'{signal_name} original')
""" Moyenne longue. """
per_long = val.get('Moyenne longue', 26)
key_dock = f"{root_key}-Longue"
typ = val.get("Type de MM longue", 'SMA')
od_descr.write([key_dock, 'notes'], f'{signal_name}-Longue {typ}{per_long}')
od_descr.write([key_dock, 'typ'], typ)
od_descr.write([key_dock, 'period'], per_long)
od_descr.write([key_dock, 'kwargs', 'nb_last'], min(per_long, self.o_yaml.long_smma_nb_last))
od_descr.write([key_dock, 'kwargs', 'ratio'], self.o_yaml.long_lwma_ratio)
""" Moyenne courte. """
per_short = val.get('Moyenne courte', 12)
key_dock = f"{root_key}-Courte"
typ = val.get("Type de MM courte", 'SMA')
od_descr.write([key_dock, 'notes'], f'{signal_name}-Courte {typ}{per_short}')
od_descr.write([key_dock, 'typ'], typ)
od_descr.write([key_dock, 'period'], per_short)
od_descr.write([key_dock, 'kwargs', 'nb_last'], min(per_short, self.o_yaml.short_smma_nb_last))
od_descr.write([key_dock, 'kwargs', 'ratio'], self.o_yaml.short_lwma_ratio)
""" Courte - Longue. """ # !!! Attention, ce n'est pas un tiret, mais un caractère spécial !!!
""" Les key_dock sont traitées par la suite par des split('-'). Le tiret ne doit donc pas être utilisé. """
key_dock = f"{root_key}-(Courte–Longue)" # (Courte{caractère spécial}Longue)
od_descr.write([key_dock, 'notes'], f'{signal_name}-Courte - {signal_name}-Longue')
""" Signal. """
per_signal = val.get('Signal', 9)
key_dock = f"{root_key}-Signal"
typ = val.get("Type de MM Signal", 'SMA')
od_descr.write([key_dock, 'notes'], f'{signal_name}-Signal {typ}{per_signal}')
od_descr.write([key_dock, 'typ'], val.get("Type de MM Signal", 'SMA'))
od_descr.write([key_dock, 'period'], per_signal)
od_descr.write([key_dock, 'kwargs', 'nb_last'], min(per_signal, self.o_yaml.signal_smma_nb_last))
od_descr.write([key_dock, 'kwargs', 'ratio'], self.o_yaml.signal_lwma_ratio)
""" MACD. """
key_dock = f"{root_key}-MACD"
od_descr.write([key_dock, 'notes'], f'{signal_name} MACD')
""" La plus grande période (augmentée de 50% par sécurité). """
od_descr.write(['max_length', root_key], 1.5 * max(per_long, per_short, per_signal))
def pre_process(self):
""" Passage unique, après descr_signal() et avant get_matrix(). voir docstring dans la classe-mère. """
pass
def calculate(self, pointer, nb_lines, root_key, vector_in, od_buffer):
""" Passage mutiple - Voir docstring dans la classe-mère. """
_to = pointer + nb_lines
od_val = self.new_od()
l_cols = list()
""" Phase 1 : récupération des paramètres du macd (od_val) et des n° de colonnes (l_cols). """
""" Original. """
key_dock = root_key + '-Original'
l_cols.append(self.od_descr.read([key_dock, 'num_col']))
""" Moyenne longue. """
key_dock = root_key + '-Longue'
l_cols.append(self.od_descr.read([key_dock, 'num_col']))
od_val.write(['long', 'typ'], self.od_descr.read([key_dock, 'typ']))
od_val.write(['long', 'period'], self.od_descr.read([key_dock, 'period']))
od_val.write(['long', 'nb_last'], self.od_descr.read([key_dock, 'kwargs', 'nb_last']))
od_val.write(['long', 'ratio'], self.od_descr.read([key_dock, 'kwargs', 'ratio']))
""" Moyenne courte. """
key_dock = root_key + '-Courte'
l_cols.append(self.od_descr.read([key_dock, 'num_col']))
od_val.write(['short', 'typ'], self.od_descr.read([key_dock, 'typ']))
od_val.write(['short', 'period'], self.od_descr.read([key_dock, 'period']))
od_val.write(['short', 'nb_last'], self.od_descr.read([key_dock, 'kwargs', 'nb_last']))
od_val.write(['short', 'ratio'], self.od_descr.read([key_dock, 'kwargs', 'ratio']))
""" Différence : (Moyenne courte - Moyenne longue). """
key_dock = root_key + '-(Courte–Longue)' # '–' = Caractère spécial.
l_cols.append(self.od_descr.read([key_dock, 'num_col']))
""" Signal. """
key_dock = root_key + '-Signal'
l_cols.append(self.od_descr.read([key_dock, 'num_col']))
od_val.write(['signal', 'typ'], self.od_descr.read([key_dock, 'typ']))
od_val.write(['signal', 'period'], self.od_descr.read([key_dock, 'period']))
od_val.write(['signal', 'nb_last'], self.od_descr.read([key_dock, 'kwargs', 'nb_last']))
od_val.write(['signal', 'ratio'], self.od_descr.read([key_dock, 'kwargs', 'ratio']))
""" MACD. """
key_dock = root_key + '-MACD' # '–' = Caractère spécial.
l_cols.append(self.od_descr.read([key_dock, 'num_col']))
""" Phase 2 : Utilisation de od_val pour obtenir le macd. """
t_macd = indic.macd(vector_in, od_val) # Tuple
""" Phase 3 : Utilisation de t_macd et de l_cols pour updater self.np_array. """
for i, vector_out in enumerate(t_macd):
nb_valid_lines = min(len(vector_out), nb_lines)
self.np_array[_to - nb_valid_lines: _to, l_cols[i]] = vector_out[-nb_valid_lines:]
/nodes/oscillateurs/rsi/rsi.py
:
from pc.ctrl_node import CtrlNode, CtrlCalcul
from nodes.oscillateurs.rsi.rsi_yaml import YamlParams
import functions.indicators as indic
d_datas = {
'name': 'RSI', # Label affiché sous l'icone.
'icon': 'rsi.png', # Icone affichée.
}
class Node(CtrlNode):
def __init__(self, o_scene, s_id, pos):
super().__init__(o_scene, s_id, pos)
self.type = 'RSI'
self.setup()
def setup(self, child_file=__file__):
self.l_short_circuits = [(0, 0)] # Entrée N°0 'court-circuitable' avec la sortie N°0.
super().setup(child_file)
@property
def ld_inputs(self):
return [{
'label': '',
'label_pos': (6, -10)
}]
@property
def ld_outputs(self):
return [{
'label': 'Sortie',
'label_pos': (-38, -10)
}]
def my_params(self, context):
return {
'Zone neutre': [70., {'step': 1, 'limits': (5, 95), 'suffix': '%', 'compactHeight': False}],
"Périodes (sep=',')": '14',
}
def my_signals(self, l_signals_in, num_socket_out):
""" Faisceau de signaux (l_signals) délivré à la sortie. """
l_signals = list()
for signals_in in l_signals_in[0]:
typ_id_from, signal_ante_from, signal_now_from, signal_source_from, \
_, signal_title, typ_id, signal_ante, signal_source = signals_in
periods = [int(p) for p in self.get_param([signal_title, "Périodes (sep=',')"]).split(',')]
for period in periods:
signal_now = f't{period}'
signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
typ_id, signal_ante = f'{self.type}{self.id}', self.join(signal_ante_from, signal_now_from)
l_signals.append((typ_id, signal_ante, signal_now, signal_source))
return l_signals
def refresh(self, l_keys):
""" Code spécifique. """
if self.need_update(l_keys):
self.lo_sockets_out[0].to_update() # Une seule sortie, N° 0.
super().refresh(l_keys)
class Calcul(CtrlCalcul):
""" ********** Le code ci-dessous ne concerne pas le poste de contrôle, mais seulement les calculs. ********** """
def __init__(self):
super().__init__()
self.o_yaml = YamlParams(self)
def descr_signal(self, od_descr, val, root_key):
""" - Voir docstring dans la classe-mère.
- Les clés 'low' et 'high' sont utilisées par matplotlib pour colorier les zônes extrêmes. """
signal_name = root_key.split('-')[1]
l_periods = [int(p.strip()) for p in val["Périodes (sep=',')"].split(',')]
od_descr.write(['max_length', root_key], int(1.5 * max(l_periods)))
for period in l_periods:
key_dock = f"{root_key}-t{period}"
""" Limites low et high. """
low = (100 - val['Zone neutre']) / 2
high = 100 - low
od_descr.write([key_dock, 'notes'], f'{signal_name} {period}')
od_descr.write([key_dock, 'period'], period)
""" Variables 'low' et 'high' utilisables dans les paramètres étendus yaml. """
od_descr.write([key_dock, 'low'], low)
od_descr.write([key_dock, 'high'], high)
def pre_process(self):
""" - Passage unique, après descr_signal() et avant get_matrix(). voir docstring dans la classe-mère.
- Pré-traitement spécifique. """
pass
def calculate(self, pointer, nb_lines, root_key, vector_in, od_buffer):
_to = pointer + nb_lines
""" Si plusieurs périodes de rsi, key_dock se termine par 't14', 't22', ... """
for key_dock, d_val in self.od_descr.items():
if key_dock.startswith(root_key+'-t'):
num_col = d_val.get('num_col')
period = d_val.get('period')
vector_out = indic.rsi(vector_in, period)
nb_valid_lines = min(len(vector_out), nb_lines)
self.np_array[_to - nb_valid_lines: _to, num_col] = vector_out[-nb_valid_lines:]
Ce signal est un Renko de maille 5 : chaque point est à 5 pips du voisin (+ ou -).
Les signaux sont sur des plans différents, à l'instar d'un paysage vu par la fenêtre d'un train :
L'arrière plan (mouvement lent) peut faire des kilomètres, le premier plan quelques centimètres.
Les signaux convergent à l'instant présent.
Le seul point commun à toutes les courbes est à droite, à l'instant présent.
Bonjour les codeurs !