Moins de code, plus de rapidité.
Avant-propos
/functions
, /nodes
, /pc
et / show
en packages Python :
__init__.py
vide dans /functions
.__init__.py
vide dans /nodes
.__init__.py
vide dans /pc
.__init__.py
vide dans /show
..py
, et améliorez les imports comme dans le code de main.py
ci-après :
# Imports externes
.# Imports internes
.from ctrl_scene import CtrlScene
from pc.ctrl_scene import CtrlScene
← Ajout du préfixe pc.
/pc
: Normalement, il est affiché en bleu dans Pycharm, car nous l'avions marqué comme Sources Root
.
Clic droit > Mark Directory as > Unmark as Sources Root
.backups
.Signaux
, avec 2 nodes reliés : Plots-0
et Signaux-1
PC
(fenêtre main
).Voir
du node Plots
: Le graphique devrait s'afficher.D:\Robot\pc
robot
(Sur Pycharm c'est déjà fait).python main.py
Enter
en fin de saisie."ModuleNotFoundError: ...etc."
Robot
n'existe pas dans les variables d'environnement de Windows.PYTHONPATH
dans les variables d'environnement de Windows. (Voir explication détaillées sur Google).main.py
comme ceci :
# Imports externes.
from PyQt5.QtWidgets import QApplication, QMainWindow, QDialog, QFileDialog
from PyQt5.QtCore import QEvent
import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # Activer cette ligne pour debugger en ligne de commande.
# Imports internes.
from pc.dock_nodes import GroupNodes
from pc.ctrl_scene import CtrlScene
from functions.utils import Utils
Pendant la phase de développement, la ligne sys.path.append(...)
pourra être commentée.
python main.py
Voir
du node Plots
, la fenêtre Matplotlib
ne s'affiche pas.show_matplotlib.py
le même correctif :
sys.path.append(os.path.dirname(sys.path[0]))
python main.py
, cette fois-ci tout fonctionne.PYTHONPATH
est inutile.sys.path.append( ... etc.
)
main.py
et show_matplotlib.py
.Parameters.collapsed_state()
, commenter 2 lignes et modifier 1 ligne.parameters.py > Parameters.collapsed_state()
:
def collapsed_state(self, pt):
""" Ce code est exécuté à chaque affichage des paramètres d'un node ou de la scène.
- Lecture et application du dernier état mémorisé.
- Déclaration d'événements pour détecter les actions 'expanded-collapsed' produites par l'utilisateur.
|_ Ces événements appellent une fonction qui mémorise dans le pkl l'état de l'arbre des paramètres.
"""
def set_collapsed_state():
""" s_collapsed ne contient QUE les nodes collapsés. """
def branches(tree, parent=''):
if tree.param.hasChildren():
parent += '$' + tree.param.name()
if parent != '$params': # On ignore le tronc de l'arbre ('params').
if parent[8:] in s_collapsed:
tree.setExpanded(False)
for e in range(tree.childCount()):
branches(tree.child(e), parent)
# elif tree.param.name() == key_text and f'{parent[8:]}${key_text}' in s_collapsed:
elif tree.param.name() == key_text:
tree.setExpanded(False)
""" Lecture. """
s_collapsed = self.od_real.read([self.o_parent.s_id] + ['collapsed'], set())
branches(item_top)
def get_collapsed_state():
""" Exploration de l'arbre pour relever l'état des 'collapse'.
- Envoi de cet état au parent qui se charge de l'enregistrer.
- Note : s_collapsed ne contient QUE les nodes collapsés. """
def branches(tree, parent=''):
if tree.param.hasChildren():
parent += '$' + tree.param.name()
if parent != '$params' and not tree.isExpanded(): # On ignore le tronc de l'arbre ('params').
s_collapsed.add(parent[8:]) # On retire '$params$' du début.
for e in range(tree.childCount()):
branches(tree.child(e), parent)
# elif tree.param.name() == key_text and not tree.isExpanded():
# s_collapsed.add(f'{parent[8:]}${key_text}')
s_collapsed = set()
branches(item_top)
""" Écriture + Enregistrement. """
self.od_real.write([self.o_parent.s_id] + ['collapsed'], s_collapsed)
self.o_scene.o_pkl.backup()
item_top = pt.topLevelItem(0)
key_text = 'Provenance du signal :' # Clé du champ de type 'text'.
""" Événements. """
pt.itemExpanded.connect(get_collapsed_state)
pt.itemCollapsed.connect(get_collapsed_state)
set_collapsed_state()
Parameters.change()
.parameters.py > Parameters.change()
:
def change(self, _, l_params):
""" 1 - Mise à jour du dictionnaire od_real. """
o_param = l_params[0][0]
od_param = Dictionary(o_param.__dict__)
value = o_param.value()
if isinstance(value, QColor):
value = value.name() + hex(value.alpha())[2:]
l_keys = od_param.read(['opts', 'l_keys'])
self.od_real.write([self.o_parent.s_id] + l_keys, value)
if self.o_parent.__class__.__name__ == 'Node':
self.o_scene.o_ur.b_action = True
""" 2 - Enregistrement. """
self.o_scene.o_pkl.backup()
self.set_params()
self.o_parent.refresh(l_keys)
value.alpha
a été ajouté.
Description
Calcul
, qui hérite de CtrlCalcul
.ctrl_node.py > CtrlCalcul
est généraliste, c'est à dire qu'il concerne tous les nodes.
Calcul
.Calcul
est spécifique à chaque type de node. Il est composé de 3 parties :
descr_signal()
.
od_descr
, qui génère une signature par hachage.process()
.
numpy
, à plusieurs colonnes :
process()
.ctrl_node.py
par celui-ci.ctrl_node.py
:
# Imports externes
import copy
import shutil
import os
import hashlib
import numpy as np
import pickle
# Imports internes
from functions.utils import Dictionary, DateTime, Utils
from pc.ui_node import UiNode
from pc.parameters import Parameters
from pc.ctrl_socket import CtrlSocket
from pc.ctrl_edge import CtrlEdge
class Helper:
@staticmethod
def deepcopy(reference):
return copy.deepcopy(reference)
@staticmethod
def new_od(dic=None):
return Dictionary(dic)
@staticmethod
def join(*l_variants, sep='-'):
""" Exemple d'appel 1 : y = self.join(a, b, c, d, ...)
Exemple d'appel 2 : y = self.join(*l_x) <-- Ne pas oublier '*'
Concatène les éléments de l_strings en une chaine. Les chaînes vides sont ignorées. le séparateur est sep.
:param l_variants: Nombre quelconque d'éléments de tout type.
:param sep: '-' par défaut. Peut être '' (vide) ou \n (retour à la ligne) ... ou autre.
:return: Chaîne concaténée.
Exemple l_strings = ['Paris', 'Lille', '', 'Bruxelles', 'Prague'] => return "Paris-Lille-Bruxelles-Prague"
"""
string_txt = ''
for string in l_variants:
if string != '':
string_txt += f'{sep}{string}' # Accepte tout type de variable pour string.
return string_txt[len(sep):] # Suppression du 1er sep.
class CtrlNode(Helper):
def __init__(self, o_scene, s_id, pos):
self.o_scene = o_scene
self.s_id = s_id # Clé d'entrée dans le super-dictionnaire pkl.
self.od_pkl = self.o_scene.o_pkl.od_pkl
self.o_grnode = None
self.o_params = None
self.pos = pos # pos = (x, y)
self.width = 96
self.height = 60
self.type = ''
self.ut = Utils()
self.main_key = f"Paramètres du node '{self.s_id}'"
self.b_chk = bool(self.od_pkl.read([self.s_id, 'b_chk'], True))
self.b_on = self.b_chk
self.child_file = ''
""" Sockets. """
self.lo_sockets_in = list() # Liste d'objets 'sockets in' instanciés.
self.lo_sockets_out = list() # Liste d'objets 'sockets out' instanciés.
self.default_color = '#88bbff' # Couleur par défaut des sockets de sortie.
self.l_sep_inputs = [] # Emplacement des séparateurs (après les entrées indiquées).
self.l_sep_outputs = [] # Emplacement des séparateurs (après les sorties indiquées).
""" Contenu spécifique. """
self.o_grcontent = None
""" Secousses => Court-circuits. """
self.dt = DateTime()
self.l_short_circuits = list() # Surchargé par les classes dérivées.
self.l_shortables = list()
self.b_removable = True # Les edges d'entrée peuvent être détruits en cas de court-circuit.
self.k_shake = 0 # Nombre de secousses.
self.b_shaking = False # Secousse en cours.
self.x, self.y = 0, 0 # Position antérieure (x, y), mémorisée.
self.dx, self.dy = 0, 0 # Deltas (variations) antérieurs, mémorisés.
def setup(self, child_file):
""" Code appelant : classe dérivée, on connait donc ses attributs. """
""" Paramètres. """
self.o_params = Parameters(self)
""" Hauteur automatique du node : elle dépend du nombre de sockets en entrée ou en sortie (le plus grand). """
h_title = self.d_display['geometry']['h_title']
nb_sockets_max = max(len(self.ld_inputs), len(self.ld_outputs)) # En fait : Sockets + séparateurs.
first_y = 16 * (1 + (h_title + 4) // 16) # Multiple de 16.
self.height = first_y + nb_sockets_max * 16 - 4
""" Nouveau node par drag & drop. """
self.child_file = child_file
b_new = self.pos != (0, 0) # Nouveau node (node dropé).
if b_new:
""" - Cas d'un nouveau node (dropé).
- Son centre apparaît exactement à la position du curseur de la souris.
- On mémorise son path et sa position. """
self.pos = self.pos[0] - self.width // 2, self.pos[1] - self.height // 2
path = 'nodes' + os.path.relpath(child_file).split('nodes')[1][:-3].replace(os.sep, '.')
self.od_pkl.write([self.s_id, 'path'], path)
else:
self.pos = self.od_pkl.read([self.s_id, 'position'], (0, 0))
""" Instanciation de l'UI. """
self.o_grnode = UiNode(self) # Gestion de l'UI (Interface utilisateur) : dessin couleurs, ...
if b_new:
self.o_grnode.save_pos() # Important ! à la fin (contient o_pkl.backup()).
self.o_scene.o_grscene.addItem(self.o_grnode) # Incorporation dans la scène.
""" Sockets. """
num_socket = 0
for i, d_input in enumerate(self.ld_inputs):
""" On complète les dictionnaires de base. """
if isinstance(d_input, dict): # Si ce n'est pas un dictionnaire, c'est un séparateur.
d_input['pos'] = True, i # On ajoute l'emplacement. Tuple (input: True, num_position).
d_input['id'] = self.id, num_socket # On ajoute l'identifiant. Tuple (id_node, num_socket)
self.lo_sockets_in.append(CtrlSocket(self, d_input)) # Socket instancié et ajouté à la liste.
num_socket += 1
num_socket = 0
for i, d_output in enumerate(self.ld_outputs):
""" On complète les dictionnaires de base. """
if isinstance(d_output, dict): # Si ce n'est pas un dictionnaire, c'est un séparateur.
d_output['pos'] = False, i # On ajoute l'emplacement. Tuple (input: False, num_position).
d_output['id'] = self.id, num_socket # On ajoute l'identifiant. Tuple (id_node, num_socket)
self.lo_sockets_out.append(CtrlSocket(self, d_output)) # Socket instancié et ajouté à la liste.
num_socket += 1
self.short_circuit_check()
@property
def id(self):
""" Ex : 'Node 14' renvoie 14. """
return int(self.s_id[4:])
@property
def d_display(self):
""" Attributs par défaut pouvant être surchargés dans les classes dérivées. """
return {
'geometry': {'h_title': 32, 'round': 6},
'col_pen': {'select': '#ffa637', 'hover': '#888', 'on': "#000", 'off': '#aaaa00'}, # ordre = priorité.
'col_brush': {'on': "#8888bbcc", 'off': '#cccccccc'},
'thick_pen': {'hover': 4., 'leave': 2.}
}
def get_default(self):
""" Titre du node et couleurs des 'socket_out' pour chaque type de node. """
""" Complétée par les classes dérivées avec leurs paramètres statiques et dynamiques. """
""" 1) Paramètres communs : Titre, bouton et autant de couleurs que de sorties. """
d_default = {
'Titre du node': 'Choisir un titre',
'*🔆': '' # Un astérisque devant indique qu'il s'agit d'un bouton.
}
""" 2) Ajout de paramètres fixes spécifiques au type de node, AVANT les couleurs. """
d_default.update(self.fixed0_params()) # méthode fixed_params()
""" 3) Couleurs. """
d_colors = dict()
for d_output in self.ld_outputs:
if isinstance(d_output, dict):
d_colors[d_output['label']] = self.default_color
if d_colors:
d_default['Couleurs'] = d_colors
""" 4) Ajout de paramètres fixes spécifiques au type de node, APRES les couleurs. """
d_default.update(self.fixed_params()) # méthode fixed_params()
""" 5) Ajout des paramètres dynamiques, qui dépendent des signaux entrants. """
d_default.update(self.dynamic_params) # propriété dynamic_params
""" 6) Dictionnaire complet. Paramètres : fixes communs + fixes spécifiques + dynamiques. """
return {self.main_key: d_default}
def get_param(self, l_key, v_default=None):
if self.o_params is None:
return v_default
od_params = Dictionary(self.o_params.od_params[self.main_key])
return od_params.read(l_key, v_default)
def set_checked(self, val):
self.b_chk = val
""" Infrastructure. """
self.o_scene.infrastructure()
""" Enregistrement. """
self.od_pkl.write([self.s_id, 'b_chk'], self.b_chk)
self.o_scene.o_ur.b_action = True # Ajout dans l'historique du "Undo-Redo".
self.o_scene.o_pkl.backup()
""" ************************* Méthodes à surcharger ************************** """
@property
def ld_inputs(self):
return list()
@property
def ld_outputs(self):
return list()
def fixed0_params(self):
""" - Renvoie un dictionnaire de paramètres, indépendants des connexions.
- Placé AVANT les couleurs.
- Exemple : le node de type 'Labo' renvoie 'Expérience', etc. """
return {}
def fixed_params(self):
""" - Renvoie un dictionnaire de paramètres, indépendants des connexions.
- Placé APRES les couleurs.
- Exemple : le node de type 'Signaux' renvoie 'Sinus', 'Cosinus', etc. """
return {}
def bundle_params(self):
""" - Renvoie un dictionnaire de paramètres pour chaque entrée du node.
- Une entrée reçoit un faisceau de signaux (bundle). """
return {}
def my_params(self, context):
"""
:param context: Liste de 7 valeurs, pouvant être utilisée pour construire le dictionnaire de sortie :
- context[0] typ_id_from = Type + id du node en amont, producteur du signal. Ex : 'MM12'.
- context[1] signal_ante_from = Nom du signal créé par le node en amont du node producteur. Ex : 'Cosinus'.
- context[2] signal_now_from = Nom du signal créé par le node en amont. Ex : 'SMA18'.
- context[3] signal_source_from = 'Provenance du signal' affiché dans le node en amont.
- context[4] num_signal = Ordre du signal dans le faisceau entrant.
- context[5] signal_title = Titre du signal dans le dockable des paramètres.
- context[6] signal_source = 'Provenance du signal' -> texte avec retours à la ligne.
:return: Dictionnaire formaté pour les paramètres.
|_ Exemple {'Type de MA': ['SMA', {'values': ['SMA', 'EMA', 'SMMA', 'LWMA']}], "Périodes (sep=',')": '14'}
"""
return {}
def my_signals(self, l_signals_in, num_socket_out):
""" Description des signaux délivrés à la sortie N° num_socket_out.
Comment ça marche ?
- Cette fonction prend en entrée tous les signaux entrants ainsi que les paramètres pkl.
- Un traitement spécifique est décrit, qui produit plusieurs signaux.
- Ces signaux sont ensuite distribués aux différentes sorties du node.
:param l_signals_in: Signaux d'entrée. Cette liste contient autant d'éléments que d'entrées connectées.
- Chaque élément est un tuple : (typ_id_from, signal_ante_from, signal_now_from, signal_source_from, \
num_signal, signal_title, typ_id, signal_ante, signal_source)
(Les variables suffixées de '_from' proviennent du node en amont).
:param num_socket_out: N° de la sortie.
:return: Liste de signaux exclusivement destinés à la sortie N° num_socket_out.
Chaque signal est décrit par un tuple à 4 valeurs : (typ_id, signal_ante, signal_now, signal_source).
"""
raise SystemExit(self.child_file + '\nCtrlNode.my_signals() : Cette méthode doit être surchargée.')
""" *********************** Fin - Méthodes à surcharger ********************** """
@property
def dynamic_params(self):
""" Code appelant : self.get_default().
Ces paramètres sont dynamiques car ils dépendent des signaux connectés aux entrées.
- Ce node demande la liste des signaux à chaque socket d'entrée : o_socket_in.l_signals
- Cette liste est formatée de la même façon pour tous les types de node.
- C'est une liste de tuples, chaque tuple a 4 valeurs et caractérise un signal.
- Il est rappelé qu'un edge véhicule non pas un signal, mais un faisceau de signaux (un faisceau).
- Le format d'un tuple est le suivant :
- typ_id_from = Type + id du node en amont, producteur du signal. Ex : 'MM12'.
- signal_ante_from = Nom du signal créé par le node en amont du node producteur. Ex : 'Cosinus'.
- signal_now_from = Nom du signal créé par le node producteur. Ex : 'SMA18'.
- signal_source_from = Texte destiné à être affiché dans le dockable, dans 'Provenance du signal :'
- Node
|_ Socket_in
|_ Faisceau de signaux
|_ Signal
|_ Paramètre dynamique.
"""
if not self.lo_sockets_in:
""" Tant que l'initialisation n'est pas terminée, lo_sockets_in est une liste vide."""
return {}
d_params = dict()
nb_inputs = len(self.lo_sockets_in)
for k in range(nb_inputs):
d_input = self.bundle_params() # Paramètres pour cette entrée.
""" Récupération en cascade : Socket_in -> Edge -> Socket_out -> Node -> Socket_in -> etc. """
for num_signal, t_signal in enumerate(self.lo_sockets_in[k].l_signals):
typ_id_from, signal_ante_from, signal_now_from, signal_source_from = t_signal[:4] # tuple à 4 valeurs.
signal_title = self.join(typ_id_from, signal_ante_from, signal_now_from)
signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
""" Paramètres communs à tous les signaux. """
d_input[signal_title] = {
'$Provenance du signal :': [signal_source, {'readonly': True}],
'Signal actif': True,
}
l_args = [num_signal, signal_title, signal_source] + list(t_signal)
d_input[signal_title].update(self.my_params(l_args))
d_params[f'Entrée {k}'] = d_input
if nb_inputs == 1 and 'Entrée 0' in d_params:
""" S'il n'y a qu'une entrée, inutile d'écrire, dans le titre des paramètres, de laquelle il s'agit. """
d_params = d_params['Entrée 0']
return d_params
def get_signals(self, num_socket):
""" Code appelant : oSocket_out N° num_socket.
- Ce node crée des signaux et les fournit aux nodes en aval par l'intermédiaire des sockets et des edges.
- Il crée autant de faisceaux de signaux (vecteurs) qu'il a de sorties.
- Pour chaque sortie, la liste fournie est formatée de la même façon pour tout type de node.
- Voir détails ci-dessus, dans dynamic_params().
"""
if not self.lo_sockets_out: # Setup en cours
return []
""" Update od_params. """
self.o_params.set_params()
nb_inputs = len(self.lo_sockets_in)
l_all_signals_in = list()
for k in range(nb_inputs):
""" Le nom de l'entrée se sera pas écrit dans le titre des paramètres, s'il n'y en a qu'une. """
l_name_input = [f'Entrée {k}'] if nb_inputs > 1 else []
l_signals = list() # Liste des signaux actifs de l'entrée N° k.
""" self.lo_sockets_in[k].l_signals = Faisceau de signaux arrivant sur l'entrée k. """
for num_signal, t_signal in enumerate(self.lo_sockets_in[k].l_signals):
typ_id_from, signal_ante_from, signal_now_from, signal_source_from = t_signal[:4] # tuple à 4 valeurs.
signal_title = self.join(typ_id_from, signal_ante_from, signal_now_from)
if self.get_param(l_name_input + [signal_title, 'Signal actif'], False):
typ_id, signal_ante = f'{self.type}{self.id}', self.join(signal_ante_from, signal_now_from)
signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
l_signals.append(t_signal[:4] + (num_signal, signal_title, typ_id, signal_ante, signal_source))
l_all_signals_in.append(l_signals)
return self.my_signals(l_all_signals_in, num_socket)
""" ******************************* Secousses ******************************* """
def short_circuit_check(self):
""" Passage unique dans cette méthode, au démarrage. Fin programme si erreur.
- Peut-on supprimer les edges d'entrée en cas de court-circuit ?
- Oui, à condition que TOUTES les sorties existent dans self.l_short_circuits. """
l_indx_outputs = list() # Liste des index de sortie.
d_counter = dict()
for short_circuit in self.l_short_circuits: # Exemple -> self.l_short_circuits = [(0, 0), (0, 2)]
num_input = short_circuit[0]
num_output = short_circuit[1]
if num_input >= len(self.ld_inputs):
""" Fin programme. """
raise SystemExit(f"La liste des court-circuits du type de node '{self.type}' est erronée."
f"\nL'entrée N°{num_input} est hors limites."
f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")
if num_output >= len(self.ld_outputs):
""" Fin programme. """
raise SystemExit(f"La liste des court-circuits du type de node '{self.type}' est erronée."
f"\nLa sortie N°{num_output} est hors limites."
f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")
l_indx_outputs.append(num_output) # Dans cet exemple -> ( 0 , 2 )
if num_output in d_counter:
d_counter[num_output] += 1
else:
d_counter[num_output] = 1
for i, d_output in enumerate(self.ld_outputs):
if isinstance(d_output, dict):
if i not in l_indx_outputs:
""" Condition non satisfaite. """
self.b_removable = False # b_removable est à True dans __init__().
break
""" Contrainte technologique : Plusieurs entrées sur une même sortie est interdit ! """
for nb_inputs in d_counter.values():
if nb_inputs > 1:
""" Fin programme. """
raise SystemExit("Plusieurs entrées sont court-circuitables sur une même sortie."
f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")
def set_shortables(self):
""" Méthode appelée lorsque le bouton gauche de la souris est appuyé (sur ce node).
- Chaque type de node possède sa liste de court-circuits faisables.
- Retourne une liste de tuples.
- Chaque tuple est composé des 2 gr_edges pouvant être réunis pour n'en faire qu'un. """
self.l_shortables = list()
for short_circuit in self.l_short_circuits: # Exemple -> self.l_short_circuits = [(0, 0), (0, 2)]
lo_gredges_in = list(self.lo_sockets_in[short_circuit[0]].get_gredges())
lo_gredges_out = list(self.lo_sockets_out[short_circuit[1]].get_gredges())
if lo_gredges_in: # Cette liste contient 0 ou 1 edge.
for o_gredge_out in lo_gredges_out:
self.l_shortables.append((lo_gredges_in[0], o_gredge_out))
def shake(self):
def shortcircuit():
for to_gredge in self.l_shortables: # Tuples de gredges.
o_socket_out = to_gredge[0].o_edge.o_socket_out # From, depuis.
o_socket_in = to_gredge[1].o_edge.o_socket_in # To, jusqu'à.
""" Suppression des 2 edges du tuple. """
self.o_scene.o_grscene.removeItem(to_gredge[1]) # Edge de sortie.
if self.b_removable:
self.o_scene.o_grscene.removeItem(to_gredge[0]) # Edge d'entrée.
""" Création du nouvel edge de court-circuit. """
CtrlEdge(self.o_scene, o_socket_out, o_socket_in)
o_socket_in.to_update()
""" Persistance. """
self.o_scene.save_edges(backup=True)
""" Infrastructure. """
self.o_scene.infrastructure()
def raz_shakes():
""" Après cette raz, un nouveau court-circuit est autorisé. """
self.k_shake = 0
self.b_shaking = False
if self.b_shaking or not self.l_shortables:
return
self.dt.delay(raz_shakes, delay=1000)
x, y = self.o_grnode.pos().x(), self.o_grnode.pos().y() # Positions.
dx, dy = x - self.x, y - self.y # Déplacements (delta x, delta y).
b_cond = (round(dx*self.dx) < 0) or (round(dy*self.dy) < 0)
""" Mémorisation. """
self.x, self.y = x, y
self.dx, self.dy = dx, dy
if b_cond:
""" Le mouvement a changé de sens => incrémentation du compteur. """
self.k_shake += 1
if self.k_shake > 10: # Valeur à ajuster.
self.b_shaking = True
self.k_shake = 0
self.dt.delay(shortcircuit, delay=50)
""" ***************************** Fin secousses ***************************** """
def set_colors(self, l_keys):
""" - La couleur est appliquée au socket_out concerné.
- Elle est également appliquée à tous ses éventuels edges connectés.
- A leur tour, ceux-ci propagent cette couleur à leur socket d'arrivée. """
for o_socket_out in self.lo_sockets_out:
if o_socket_out.__class__.__name__ == 'CtrlSocket':
if o_socket_out.label == l_keys[-1]:
""" Affectation de la couleur au socket_out concerné. """
o_socket_out.color = self.get_param(l_keys, '#666')
""" Récupération de tous les edgess partant de ce socket. """
l_gredges = list(o_socket_out.get_gredges())
for o_gredge in l_gredges:
""" Affectation de la même couleur à chaque edge. """
o_gredge.o_edge.color = o_socket_out.color
""" Affectation de la même couleur à chaque socket d'arrivée. """
o_gredge.o_edge.o_socket_in.color = o_socket_out.color
""" Rafraîchissement de l'affichage. """
self.o_grnode.update()
def need_update(self, l_keys):
"""
:param l_keys: Clé, dans od_params, du paramètre modifié (liste).
:return: True si les nodes en aval nécessitent d'être recalculés, False sinon.
"""
l_param_names = list(self.my_params('')) + list(self.fixed_params()) # Les paramètres du dockable.
b_actif = self.get_param([l_keys[1], 'Signal actif'], True) # True 'Signal actif' n'existe pas (générateurs).
""" Update nécessaire (si l'état actif a changé) OU (si actif ET un des paramètres a changé). """
return (l_keys[-1] == 'Signal actif') or (b_actif and l_keys[-1] in l_param_names)
def get_state(self):
""" Chaque node, selon ses spécificités, retourne un booléen de son état on/off (True/False). """
b_on = False
if self.b_chk:
for key in self.o_params.od_params.key_list():
if key[-1] == 'Signal actif':
if self.o_params.od_params.read(key, False):
b_on = True
break
return b_on
def recalculate(self, num_input):
""" - Propagation vers l'aval. Les nodes finaux sont des afficheurs ou des actionneurs.
- Après cela, les nodes finaux construisent leur dict de calcul par propagation vers l'amont. """
for o_socket_out in self.lo_sockets_out:
o_socket_out.recalculate()
def update_dcalc(self, d_calc):
if self.b_on:
""" Propagation amont. """
for o_socket_in in self.lo_sockets_in:
o_socket_in.update_dcalc(d_calc)
od_params = Dictionary(self.o_params.od_params[self.main_key])
""" Construction du dictionnaire pour les calculs, à partir du od_params. """
od_calc = Dictionary()
path = 'nodes' + os.path.relpath(self.child_file).split('nodes')[1][:-3].replace(os.sep, '.')
od_calc.write('Compute', self.type)
od_calc.write('Yaml file', self.get_yaml_files()[0])
od_calc.write('Compile from', path)
od_calc.write('Checked', self.b_chk)
for key in od_params.key_list():
if key[0] == 'Titre du node' or key[0] == 'Couleurs' or key[0] == '*🔆':
continue
if key[-1] == '$Provenance du signal :':
continue
od_calc.write(key, od_params.read(key))
""" Modification 'en place'. """
d_calc[self.s_id] = dict(od_calc)
def get_yaml_files(self):
py_file = os.path.basename(self.child_file)
seeder = self.child_file.replace(py_file, f'{py_file[:-3]}_seeder.yaml')
path_node = f"{__file__.split('pc')[0]}backups/{self.o_scene.graph_name}/node{self.id}"
extended_params = os.path.abspath(f'{path_node}/{self.id}-{py_file[:-3]}.yaml')
return extended_params, seeder
def yaml(self):
""" Méthode appelée lorsqu'on clique sur le bouton '🔆'. Lancement de l'appli associée à '.yaml'. """
extended_params, seeder_file = self.get_yaml_files()
path_node = os.path.dirname(extended_params)
os.makedirs(path_node, exist_ok=True)
if not os.path.exists(extended_params):
""" Création du fichier de paramètress étendus en yaml : copie du seeder si possible, sinon vide. """
if os.path.exists(seeder_file):
shutil.copyfile(seeder_file, extended_params)
else:
with open(extended_params, 'w'):
pass
if not os.path.exists(extended_params):
return
""" Lancement du fichier avec application associée. """
os.startfile(extended_params)
def rebuild(self, b_input=True):
""" Reconstruction du node suite à un changement dynamique du nombre d'entrées/sorties. """
""" Suppression des (sockets + edges) de ce node. """
lo_sockets = self.lo_sockets_in if b_input else self.lo_sockets_out
for o_socket in lo_sockets:
""" Suppression des éventuels edges connectés à ce socket. """
for o_gredge in list(o_socket.get_gredges()):
self.o_scene.o_grscene.removeItem(o_gredge)
""" Suppression du socket. """
self.o_scene.o_grscene.removeItem(o_socket.o_grsocket)
o_socket.o_grsocket = None # Suppression de l'objet.
""" Nettoyage de la liste des entrées/sorties. """
lo_sockets.clear()
""" Redessine le node avec sa nouvelle hauteur. """
ld_inputs, ld_outputs = self.deepcopy(self.ld_inputs), self.deepcopy(self.ld_outputs)
nb_inputs, nb_outputs = len(ld_inputs), len(ld_outputs)
h_title = self.d_display['geometry']['h_title']
first_y = 16 * (1 + (h_title + 4) // 16)
self.height = first_y + max(nb_inputs, nb_outputs) * 16 - 4 # Nouvelle hauteur.
self.o_grnode.set_outline()
""" Régénération de sockets. """
for pos, d_input in enumerate(ld_inputs):
""" On complète les dictionnaires de base. Pas de séparateurs. """
if isinstance(d_input, dict):
indx = len(self.lo_sockets_in)
d_input['pos'] = True, pos # On ajoute la pos. Tuple (input: True, num_position).
d_input['id'] = self.id, indx # On ajoute l'identifiant. Tuple (id_node, num_socket)
self.lo_sockets_in.append(CtrlSocket(self, d_input)) # Socket instancié et ajouté à la liste.
for pos, d_output in enumerate(ld_outputs):
""" On complète les dictionnaires de base. Pas de séparateurs. """
if isinstance(d_output, dict):
indx = len(self.lo_sockets_out)
d_output['pos'] = False, pos # On ajoute la pos. Tuple (output: False, num_position).
d_output['id'] = self.id, indx # On ajoute l'identifiant. Tuple (id_node, num_socket)
self.lo_sockets_out.append(CtrlSocket(self, d_output)) # Socket instancié et ajouté à la liste.
""" Régénération des edges. """
self.o_scene.show_edges()
self.o_scene.save_edges()
""" Dockable des paramètres : le nombre d'entrées a changé. """
self.o_params.set_params()
self.dt.delay(self.o_scene.show_params, 2000) # delay : Permet de continuer la saisie dans le dockable.
def refresh(self, l_keys):
if l_keys[-1] == '🔆':
self.yaml()
elif l_keys[-1] == 'Titre du node':
""" Titre du node. """
self.o_grnode.set_title()
elif l_keys[-2] == 'Couleurs':
""" Couleur des sockets et des edges. """
self.set_colors(l_keys[1:])
else:
""" Fin de refresh - Appel conditionnel à infrastructure(). """
if self.b_chk and (l_keys[0] == 'infrastructure' or l_keys[-1] == 'Signal actif'):
""" Les nodes de type générateur doivent insérer le mot 'infrastructure'
en 1ère place dans la liste l_keys[], car ils n'ont pas 'Signal actif' dans leurs paramètres. """
self.o_scene.infrastructure()
""" Tous les paramètres, autres que le titre et les couleurs, influencent les calculs. """
self.o_scene.recalculate()
class CtrlCalcul(Helper):
""" Attributs de classe. """
d_servers = dict() # Dictionnaire de classe.
od_calcs = Dictionary()
def __init__(self):
self.id = None
self.s_id = None
self.compute = None
self.o_yaml = None
self.l_mykey = list()
self.np_array = None
self.od_descr = Dictionary()
self.od_mydic = Dictionary()
self.len_buffer = 300_000
self.no_params = ['Compute', 'Yaml file', 'Compile from', 'Checked']
# def setup_server(self, id_server):
# self.id = id_server
# self.l_mykey = self.get_keys_from_idnode(id_server) # Clé dans self.od_calcs.
# self.od_mydic = Dictionary(self.od_calcs.read(self.l_mykey))
# self.o_yaml.yaml_file = self.od_mydic.read(['Yaml file'])
# self.compute = self.od_mydic.read(['Compute'])
# self.s_id = f"{self.compute}{self.id}"
# self.o_yaml.update_odyaml() # Update du super-dictionnaire des paramètres (self.o_yaml.od_yaml).
#
# def get_matrix(self, pointer, nb_lines=0):
# """ - Pour certains nodes, les datas ne sont pas dans 'self.np_array', mais en base de données.
# - Le cas échéant, cette méthode sera surchargée.
# @param pointer: Pointeur "Depuis", si pointer<0 , la totalité du tableau est retourné.
# @param nb_lines: "Jusqu'a" = pointer + nb_lignes.
# """
# if self.np_array.__class__.__name__ == 'ndarray':
# return self.np_array if pointer < 0 else self.np_array[pointer: pointer + nb_lines] # Tout ou slice.
#
# @property
# def dt_servers_in(self):
# """ - Le dictionnaire retourné contient autant de clés que d'entrées connectées et actives.
# - Chaque valeur est un tuple : (o_server, signature, edge). """
#
# """ 1 - Récupération de la liste de tous les edges dans self.od_calcs. """
# s_edges = set()
# ll_keys = self.od_calcs.key_list()
# for l_keys in ll_keys:
# if l_keys[-1] == 'edges':
# s_edges.update(self.od_calcs.read(l_keys))
#
# """ 2 - Instance et signature des nodes-serveurs en amont connectés aux entrées. """
# d_servers = dict()
# for edge in list(s_edges):
# if edge[1][0] == self.id:
# """ edge = Edge connecté à l'entrée de self. """
# o_server, signature = self.get_server(edge[0][0])
# d_servers[f'e{edge[1][1]}'] = o_server, signature, edge
#
# return d_servers
#
# @classmethod
# def get_oserver(cls, id_server, compile_from):
# """ Ce code est appelé par un node-client ayant des entrées. Il aura un serveur par entrée.
# - Chaque node-serveur est associé à une instance de classe Calcul (dérivée de CtrlCalcul).
# - Au premier passage concernant ce node-serveur, l'objet o_server n'existe pas.
# - Dans ce cas, on le crée par compilation dynamique et on l'enregistre dans un dictionnaire.
# - Le dictionnaire est un dictionnaire de classe : cls.d_servers.
# - Cela permet d'éviter de créer une nouvelle instance si celle-ci existe déjà.
# """
# server_sid = f"{compile_from.split('.')[-1]}{id_server}" # Clé du dictionnaire des serveurs. ex : macd3
# if server_sid not in cls.d_servers:
# """ Compilation dynamique. Si l'objet o_server n'existe pas, on le crée."""
# d_context = {}
# code = f"from {compile_from} import Calcul; o_server=Calcul()"
# try:
# compiled = compile(code, '<string>', 'exec')
# eval(compiled, d_context)
# cls.d_servers[server_sid] = d_context['o_server']
# except (Exception,) as err:
# print(server_sid, ': Erreur de compilation dans', compile_from, '\n', err)
# return None
# return cls.d_servers[server_sid] # Objet o_server.
#
# def get_keys_from_idnode(self, id_node):
# """
# @param id_node: ex : 1
# @return: Renvoie la clé d'accès au node 'node_sid', dans le dictionnaire général self.od_calcs.
# """
# node_sid = f'Node{id_node}' # Ex : 'Node1'
# for l_key in self.od_calcs.key_list():
# if l_key[1] == node_sid:
# return l_key[:2] # ['calc_*', 'Node*'] Clé du node-serveur dans le dictionnaire général self.od_calcs.
#
# def get_server(self, id_server):
# l_key_node = self.get_keys_from_idnode(id_server)
# compile_from = self.od_calcs.read(l_key_node + ['Compile from'])
# if compile_from is None:
# return None
# child = self.od_calcs.read(l_key_node + ['Expérience'], '')
# compile_from += f'0{child}'[-2:] if isinstance(child, int) else ''
# o_server = self.get_oserver(id_server, compile_from)
# if o_server is None:
# return None, None
# o_server.setup_server(id_server)
# return o_server, o_server.get_signature()
#
# def get_signature(self):
# l_signs_in = list()
# for server_in in self.dt_servers_in.values():
# """ server_in = tuple (instance, signature) du node-serveur. """
# l_signs_in.append(server_in[1]) # server_in[1] = signature.
#
# """ Assemblage des constituants de la signature. """
# params = (self.o_yaml.od_yaml, self.od_mydic, l_signs_in)
# return f'{self.id}-{hashlib.sha256(str(params).encode("utf-8")).hexdigest()}'
#
# def calculation(self, num_out):
# """ Ici node-serveur. Première méthode appelée par le node-client en aval, avant de commencer ses calculs. """
# """ 1 - Chargement des derniers datas mémorisés en *.calc (pickle) : self.od_descr et self.np_array. """
# if self.id == 1:
# a = self.np_array
#
# self.load_last_datas(num_out) # Super-dictionnaire et tableau numpy vides si *.calc inexistant ou erroné.
#
# """ 2 - Création du dict. de description actualisé od_descr, aux fins de comparaison avec self.od_descr. """
# od_descr = self.setup_od_descr()
#
# """ 3 - Comparaison des descriptifs now et ante. Si aucune différence, return (calculs inutiles). """
# if not self.update_od_descr(od_descr):
# return
#
# """ 4 - Propagation des calculs en amont. """
# for o_server, _, edge in self.dt_servers_in.values():
# """ Bien que ce soit cette même méthode, ce n'est pas une récursivité, mais une propagation amont."""
# o_server.calculation(edge[0][1])
#
# """ 5 - Traitement des calculs. """
# self.process()
#
# """ 6 - Persistance en *.calc (pickle). """
# self.create_out_calc(num_out)
#
# def load_last_datas(self, num_out):
# """ Ici, node-serveur. Récupération des derniers datas depuis son fichier out_*.calc. Les datas sont :
# - Un dictionnaire descriptif des consignes : 'self.od_descr'.
# - Un tableau numpy : 'self.np_array'.
# - Pour certains nodes, les datas ne sont pas dans out_*.cal, mais dans une base de données.
# - Le cas échéant, cette méthode sera surchargée. """
# server_root = os.path.dirname(self.o_yaml.yaml_file)
# os.makedirs(server_root, exist_ok=True)
# out_file = os.path.normpath(f'{server_root}/out.calc')
#
# try:
# with open(out_file, 'rb') as calc:
# self.od_descr, self.np_array = pickle.load(calc)
# if not self.od_descr or self.np_array.size == 0:
# raise() # On force le passage par except.
# except (Exception,):
# """ Initialisation. """
# self.od_descr, self.np_array = self.new_od(), np.empty((self.len_buffer, 0))
# if not self.dt_servers_in:
# self.od_mydic.write(['self_server', 'Signal actif'], True)
#
# def setup_od_descr(self):
# """ Création du super-dictionnaire descriptif des calculs : od_descr.
# - Une clé pour chaque signal, actif ou pas, ayant exactement le même libellé que celui
# qui est affiché dans le dockable du node-client. Exemple pour un node 'Signaux' : 'Signaux1-Sinus'
# - La valeur de cette clé est un dictionnaire ayant pour clés obligatoires :
# - Diverses valeurs, spécifiques au node, nécessaires aux calculs.
# - num_col : Entier. Défini au dernier passage, sinon -1, destinée aux nodes-clients.
# - A la racine du dictionnaire, outre les clés de signaux, il y a :
# - error : Booléen. Erreur générale. Par exemple, ici, on vérifie que le tableau numpy
# a bien le nombre de colonnes indiqué dans le dictionnaire de description.
# - signatures : liste de signatures sha256. Une signature par entrée. Pour le node 'Signaux', 0.
# Ces signatures nous permettent de savoir si les signaux entrants ont changé.
# - Valeur de retour : od_descr.
# """
# od_descr = Dictionary()
# """ Signature = ma signature + signatures des nodes-serveurs. """
# od_descr['signature'] = self.get_signature()
# for _, signature_in, _ in self.dt_servers_in.values():
# od_descr['signature'] += signature_in
#
# for l_keys in self.od_mydic.key_list():
# if l_keys[-1] != 'Signal actif':
# continue
# my_sid = self.s_id
# if l_keys[0].startswith('Entrée '):
# key = l_keys[:2]
# my_sid += '.' + l_keys[0][-1]
# else:
# key = l_keys[:1]
# val = self.od_mydic.read(key)
# root_key = self.join(*[my_sid] + key[-1].split('-')[1:])
# self.descr_signal(od_descr, val, root_key)
# for key_dock in od_descr.keys():
# if key_dock.startswith(root_key):
# if key_dock.endswith('-Original'):
# try:
# od_descr.write([key_dock, 'actif'], val['Original'])
# except (Exception,):
# pass
# else:
# od_descr.write([key_dock, 'actif'], val['Signal actif'])
# return od_descr
#
# def descr_signal(self, od_descr, val, root_key):
# """ Méthode surchargée par les classes dérivées. """
# """ Ici, node-serveur. Création du dictionnaire de description 'od_descr'.
# Les clés de 'od_descr' contiennent les données nécessaires aux calculs ...
# ... sauf la clé 'num_col', utilisée par le client qui exploite le tableau numpy créé par les calculs. """
# raise SystemExit(f"{self.s_id} : ctrl_node.py > CtrlCalcul."
# "\nLa méthode descr_signal() doit obligatoirement être surchargée.")
#
# def update_od_descr(self, od_descr):
# nb_cols = 0
# for key_dock in od_descr.key_list():
# if key_dock[-1] == 'actif':
# num_col_ante = self.od_descr.read(key_dock[:-1] + ['num_col'])
# num_col = -1 if num_col_ante is None else num_col_ante
# nb_cols += 1 if num_col >= 0 else 0
# od_descr.write(key_dock[:-1] + ['num_col'], num_col)
#
# b_changed = self.np_array.size != self.len_buffer * nb_cols # Taille avant (nb de valeurs dans np_array)
# b_changed = b_changed or od_descr != self.od_descr
# if b_changed:
# """ Le node-serveur a changé => réinitialisation du tableau numpy de sortie. """
# self.np_array = np.empty((self.len_buffer, 0))
# for l_key in od_descr.key_list():
# if l_key[-1] == 'num_col':
# od_descr.write(l_key, -1)
# self.od_descr = od_descr
# return b_changed
#
# def process(self):
# """ Méthode surchargée par les classes dérivées.
# 1 - Tous les calculs nécessaires sont écrits dans des méthodes bien documentées.
# - Elles peuvent recevoir les paramètres suivants :
# - Les signaux aux entrées du node.
# - Les paramètres du dockable.
# - Les paramètres du yaml.
# - Elles génèrent un vecteur ou une matrice numpy.
# 2 - Création du super-dictionnaire descriptif des calculs : od_descr (voir ci-dessous self.set_od_descr()).
# 3 - En cas d'erreur générale, on refait tous les calculs.
# 4 - Sinon, on ne refait que les calculs nécessaires. Pour chaque signal :
# - On prépare les arguments nécessaires.
# - On appelle la méthode spécifique à son traitement.
# - Le vecteur ou la matrice de retour est incorporée aux tableaux numpy de sortie.
# 5 - sha .....
# 6 - Dictionnaire de directives pour le client.
# """
# raise SystemExit(f"{self.s_id} : ctrl_node.py > CtrlCalcul."
# f"\nLa méthode process() doit obligatoirement être surchargée.")
#
# def create_out_calc(self, num_out):
# """ Création du fichier-résultat out_*.calc = (od_descr, liste de tableaux numpy). """
#
# """ 1 - Contrôle - Le tableau nd_array doit avoir une shape à 2 dimensions. """
# """ Correction nécessaire si le tableau est un vecteur. """
# if len(self.np_array.shape) == 1: # <-- C'est un vecteur, on reshape.
# self.np_array = self.np_array.reshape(self.np_array.shape[0], 1)
#
# """ 2 - Persistance. """
# server_root = os.path.dirname(self.od_mydic['Yaml file'])
# out_file = os.path.normpath(f'{server_root}/out.calc')
#
# try:
# with open(out_file, 'wb') as calc:
# pickle.dump((self.od_descr, self.np_array), calc, pickle.HIGHEST_PROTOCOL)
# except (Exception,) as err:
# print(err)
# return False
# return True
#
# def add_vector(self, key_dock, num_col, vector):
# if vector is None:
# return
# vector = vector.reshape(vector.shape[0], 1)
# add_nan = np.full((self.len_buffer - vector.shape[0], 1), np.nan)
# vector = np.vstack((add_nan, vector))
# if num_col is None or num_col < 0:
# """ Nouvelle colonne d'indice num_col (égal au nombre actuel de colonnes). """
# num_col = self.np_array.shape[1]
# self.np_array = np.hstack((self.np_array, vector))
# else:
# """ Surcharge de la colonne num_col. """
# self.np_array[:, num_col:num_col + 1] = vector
# self.od_descr.write([key_dock, 'num_col'], num_col)
#
# def get_vector_in(self, client_key): # client_key = MM4-Normal-Original-Original-Original
# sliced_key = self.join(*client_key.split('-')[1: -1]) # Normal-Original-Original (extrémités retirées).
# for l_key in self.od_descr.key_list(): # [Aléatoire1-Normal, 'num_col']
# if l_key[-1] == 'num_col' and l_key[-2].endswith(sliced_key):
# num_col = self.od_descr.read(l_key)
# return self.np_array[:, num_col]
La majorité du code de la classe CtrlCalcul est commentée. Nous allons l'activer progressivement.
ShowMatPlotLib
par celle-ci.show_matplotlib.py
:
# Imports externes. *******************************************
from PyQt5.QtCore import QTimer
import psutil
import pickle
import numpy as np
import sys
import os
from matplotlib import pyplot as plt, rcParams
sys.path.append(os.path.dirname(sys.path[0]))
# Imports internes. *******************************************
from functions.utils import Utils, Dictionary, DateTime
from nodes.afficheurs.plots.matplotlib_yaml import YamlParams
from nodes.afficheurs.plots.plots import Calcul
# noinspection PyUnresolvedReferences
class ShowMatPlotLib(Calcul):
def __init__(self):
super().__init__()
""" Ini: récupération des arguments passés en ligne de commande. """
self.graph_name = sys.argv[1]
self.id = int(sys.argv[2])
self.parent_pid = int(sys.argv[3])
self.final_path = ''
self.ut = None
""" Watch. """
self.l_files_ante = list()
""" Matplotlib. """
self.fig = None
self.l_ax = [None for _ in range(8)] # Liste de 8 valeurs (les axes, ou graphiques : 1 par entrée)
self.b_busy = False
self.b_paused = False
self.nb_points = 200
self.b_reverse = False
""" Graphiques. """
self.l_inputs = list()
""" Timers. """
self.dt = DateTime()
self.geometry = None
self.listen = QTimer()
self.anim = QTimer()
""" Setup. """
self.setup()
def setup(self):
""" 1 - Chemin complet du node final 'Plots'. Ce chemin contient les modèles de calculs 'calc_*.pkl. """
bk_path = os.path.dirname(__file__).replace('show', 'backups') # Dossier de backup.
self.final_path = os.path.normpath(os.path.join(bk_path, self.graph_name, f'node{self.id}')) # 'Plots'
self.ut = Utils(path=self.final_path, file='matplotlib')
""" 2 - Aide pour la mise au point : styles et paramètres diponibles dans matplotlib. Décommenter pour voir. """
# [print(style) for style in plt.style.available] # Liste des styles disponibles.
# Dictionary(rcParams).print() # Liste des paramètres disponibles.
""" 3 - Fichier de configuration, éditable manuellement en yaml. """
# self.o_yaml = YamlParams(self)
# self.o_yaml.yaml_file = os.path.normpath(f'{self.final_path}/{self.id}-matplotlib.yaml')
# self.o_yaml.update_odyaml()
# self.o_yaml.set_rcparams(rcParams)
# plt.style.use(self.o_yaml.get_style())
""" 4 - Vitesse d'animation (Modifiable : touches + et -), pause, reverse, pas à pas. """
# self.anim.indx = 5 # <-- Vitesse moyenne (de 0 à 11)
# speed = self.speed(self.anim.indx)
# self.anim.delay = speed[0]
# self.anim.step = speed[1]
""" 5 - Initialisation de matplotlib.pyplot. """
self.fig = plt.figure() # .93
self.mgr.set_window_title('**ini**')
""" 6 - Restauration de l'état de la fenêtre. """
self.ut.restore_state(self.mgr.window)
""" 7 - Surveillance des fichiers '.pkl'. """
# self.ini_watch()
""" 8 - Événements. """
self.listen.timeout.connect(self.listener)
# self.anim.timeout.connect(self.show_anim)
# self.fig.canvas.mpl_connect('key_press_event', self.key_event)
""" 9 - Lancement des timers. """
self.listen.start(1000)
# self.anim.start(self.anim.delay)
""" 10 - Boucle d'exécution Matplotlib. """
plt.show()
@property
def mgr(self):
return plt.get_current_fig_manager()
def listener(self):
""" 1 - Si le poste de contrôle est fermé => fin programme, sauf en mode autonome. """
if self.parent_pid > 0 and not psutil.pid_exists(self.parent_pid):
exit()
""" 2 - Persistance géométrie. """
geom = self.mgr.window.geometry()
geometry = geom.x(), geom.y(), geom.width(), geom.height()
if self.geometry != geometry:
self.ut.save_state(self.mgr.window)
self.geometry = geometry
# def show_anim(self):
# def draw_subplot():
# def draw_line():
# """ Attribution des valeurs. """
# try:
# o_line.set_xdata(x)
# o_line.set_ydata(y[:, num_col])
# except (Exception,):
# pass
#
# if ax is None or not hasattr(ax, 'o_server') or ax.o_server is None:
# return
# x = np.arange(ax.pointer, ax.pointer + self.nb_points)
# y = ax.o_server.get_matrix(ax.pointer, self.nb_points)
# if y is None or np.isnan(y).all() or len(y.shape) == 1 or y.shape[1] == 0:
# return # Nombre de courbes dans le subplot en cours == 0.
#
# """ Boucle sur les courbes. """
# y_min, y_max = np.inf, -np.inf
# for o_line in ax.lines:
# try:
# num_col = int(o_line.od['num_col'])
# except (Exception,):
# continue
# y_min, y_max = min(y_min, np.nanmin(y[:, num_col])), max(y_max, np.nanmax(y[:, num_col]))
#
# """ Dessin d'une courbe. """
# draw_line()
#
# """ Limites x et y (abscisses et ordonnées). """
# try:
# ax.set_xlim(ax.pointer, ax.pointer + self.nb_points) # Abscisses.
# except (Exception,):
# pass
# if y_min != np.inf:
# y_padding = max(1, (y_max - y_min) * 0.05) # Marges top et bottom du graphique (5%)
# y_min, y_max = y_min - y_padding, y_max + y_padding
# ax.set_ylim(y_min, y_max) # Ordonnées.
#
# self.o_yaml.between(ax, x, y) # Paramètres étendus pour le subplot.
#
# """ Avancement du pointeur. """
# if len(ax.lines) > 0: # Immobile si aucune courbe n'est affichée.
# ax.pointer += -self.anim.step if self.b_reverse else self.anim.step
# if ax.pointer > self.len_buffer - self.nb_points: # Bouclage provisoire.
# ax.pointer = 0
# if ax.pointer < 0: # Bouclage provisoire.
# ax.pointer = self.len_buffer - self.nb_points
#
# if self.b_busy:
# return
# self.b_busy = True
# """ Boucle asyncio sur les subplots. """
# for ax in self.l_ax:
# draw_subplot()
#
# plt.draw()
# self.b_busy = False
# def ini_watch(self):
# """ Liste des éléments à surveiller : dossier {graphe} + dossiers {nodes} + fichiers .pkl + fichiers .yaml
# - Le dossier est self.final_path, dans les backups.
# - Les fichiers calc_*.pkl : un par entrée active de ce node d'affichage.
# - Les fichiers .yaml : valeurs des clés 'Yaml file' dans les .pkl """
#
# """ 1 - Dossier du graphe dans les backups :dossier-parent de tous les nodes du graphe. """
# graph_path = os.path.normpath(f"{os.path.dirname(__file__).replace('show', 'backups')}/{self.graph_name}")
#
# """ 2 - Liste des dossiers des nodes du graphe (qui contiennent des fichiers nécessaires aux calculs) """
# l_folders = [graph_path] + [f'{graph_path}{os.sep}{node}' for node in os.listdir(graph_path) if
# str(node).startswith('node')]
#
# """ 3 - Liste des fichiers calc_*.pkl physiquement présents dans le dossier self.final_path """
# l_watched_files = [os.path.normpath(f'{self.final_path}/{calc}') for calc in
# os.listdir(self.final_path) if calc.startswith('calc_')]
#
# """ 4 - Liste des fichiers .yaml : chaque od_calc (extrait du pkl) contient plusieurs clés 'Yaml file'.
# On en fait une liste sans doublons. """
# s_yaml = set() # set() au lieu de list() pour s'affranchir des doublons.
# for path, folder, l_files in os.walk(graph_path):
# [s_yaml.add(os.path.normpath(f'{path}/{file}')) for file in l_files if os.path.splitext(file)[1] == '.yaml']
#
# """ 5 - Concaténation des listes précédentes. """
# l_watched_files += list(s_yaml) # Concaténation des listes de fichiers.
# l_files = l_folders + l_watched_files # Concaténation des listes : dossiers et fichiers.
#
# """ 6 - Vérification des changements dans cette liste, mais non dans le contenu des fichiers. """
# if self.l_files_ante != l_files:
# self.l_files_ante = self.ut.deepcopy(l_files) # Mémorisation (copie profonde).
# self.ut.watch_file(l_files, self.watch) # (Re)lancement de la surveillance.
# self.dt.delay(self.watch, 10, l_watched_files)
def watch(self, l_files):
""" l_files : liste contenant les dossiers et fichiers modifiés."""
files_only = [file for file in l_files if os.path.isfile(file)] # Filtrage : que les fichiers.
if len(files_only) == 0:
""" 1 - l_files ne contient que des dossiers : cela signifie que des fichiers ont été ajoutés ou supprimés.
Par conséquent on relance la méthode ini_watch() pour mettre à jour la liste des fichiers à surveiller. """
self.ini_watch()
return
print(l_files)
# """ 2 - Lecture de tous les calc_*.pkl (un par entrée) et écriture de leur contenu dans self.od_calcs. """
# Calcul.od_calcs.clear() # Attribut de la classe CtrlCalcul.
# l_calcs = [(f'calc_{i}', os.path.normpath(f'{self.final_path}/calc_{i}.pkl')) for i in range(8)]
# for name_input, calc_file in l_calcs:
# """ ex : name_input = 'calc_0' """
# try:
# with open(calc_file, 'rb') as pk:
# self.od_calcs.write(name_input, pickle.load(pk))
# except (Exception,):
# pass
#
# """ 3 - Création ou update des paramètres étendus od_yaml de o_yaml. """
# self.o_yaml.update_odyaml()
#
# """ 4 - La méthode principale update_figure() est chargée des mises à jour des calculs et des affichages. """
# self.update_figure()
# def update_figure(self):
# """ Update suite aux modifications .pkl et .yaml """
# """ 1 - Titre de la fenêtre. """
# local = ' - Mode local' if len(sys.argv) < 6 else '' # Nombre d'arguments passés en ligne de commande.
# title_ante = self.mgr.get_window_title()
# node_sid = f'Node{self.id}'
# b_first = title_ante == '**ini**' # b_first = Premier passage, au lancement de l'appli.
# for key in self.od_calcs.keys():
# name_input = key
# title_now = self.od_calcs.read([name_input, node_sid, 'Fenêtre', 'Titre'], 'Graphiques')
# break
# else:
# title_now = 'Graphiques'
# title_now += local
# l_inputs = [key for (key, val) in self.od_calcs.items() if val['edges']] # Entrées actives seulement.
# if title_now != title_ante:
# self.mgr.set_window_title(title_now)
# """ 1.1 - Si seul le titre de la fenêtre a changé, inutile de continuer (sauf au 1er passage). """
# if l_inputs == self.l_inputs and not b_first:
# return True
#
# """ 2 - Paramètres étendus yaml. """
# self.o_yaml.set_rcparams(rcParams)
#
# """ 3 - Subplots : création, update, géométrie. Chaque entrée est associée à un subplot.
# - Celui-ci est créé ou mis à jour en fonction des changements de grille (nb d'entrées ou yaml). """
# if self.o_yaml.grid_modified or l_inputs != self.l_inputs: # Respecter l'ordre de comparaison !
# self.l_inputs = self.ut.deepcopy(l_inputs)
# d_grids = self.o_yaml.get_dgrids()
# for num_input in range(8):
# self.update_ax(d_grids, num_input)
#
# """ 4 - Paramètres des subplots (un par entrée active) et de leurs courbes. """
# for num_input in range(8): # self.l_inputs = ['calc_0', 'calc_1', ...]
# name_input = f'calc_{num_input}'
# if name_input not in l_inputs:
# self.l_ax[num_input] = None
# continue
#
# """ 4.1 - Pour cette entrée : recherche de son serveur et de sa signature. """
# self.l_mykey = [name_input, node_sid] # Clé de od_mydic dans od_calcs.
# self.od_mydic = Dictionary(self.od_calcs.read(self.l_mykey))
#
# ax = self.l_ax[num_input] # Copie par référence.
# edge = self.get_edge(name_input) # ex : edge = ((1, 0), (0, 0))
# ax.o_server, signature = self.get_server(edge[0][0]) # edge[0][0] = id du node serveur.
# ax.num_input = num_input
# if ax.o_server is None:
# continue
# print(ax.o_server)
#
# """ 4.2 - Calculs et paramètres des courbes. """
# if not hasattr(ax, 'signature') or ax.signature != signature:
# ax.signature = signature
# ax.o_server.calculation(edge[0][1]) # edge[0][1] = num socket out du node serveur.
# print(ax.o_server.np_array.shape)
#
# """ 4.3 - Paramètres des courbes. """
# self.update_subplot(ax)
#
# """ 5 - Actualisation du buffer d'affichage nécessaire lorsque il est en pause. """
# self.show_anim()
#
# """ 6 - Affichage à l'écran du contenu du buffer d'affichage. """
# plt.draw()
# def get_edge(self, name_input): # ex: name_input = 'calc_1'.
# """ Recherche du socket_out emetteur (directement connecté au socket_in 'name_input' de ce node 'Plots'). """
# s_edges = self.od_calcs.read([name_input, 'edges'])
# for edge in s_edges:
# if edge[1][0] == self.id:
# return edge
# def update_subplot(self, ax):
# """ Mise à jour des paramètres de ce subplot (axe), ainsi que de chacune de ses courbes.
# @param ax: objet subplot.
# @return: NA.
# """
#
# def update_line():
# """ Code d'appel : point 2 un peu plus bas.
# https://matplotlib.org/stable/api/_as_gen/matplotlib.lines.Line2D.html """
# """ 2.1 - Affectation des attributs aux courbes : couleur, épaisseur, style, légende. """
# o_line.od = Dictionary(od_lines.read(o_line.name))
# if not o_line.od:
# return
# num_col = ax.o_server.od_descr.read([o_line.name, 'num_col'])
# o_line.od.write('Entrée', f'Entrée {ax.num_input}')
# o_line.od.write('num_col', num_col)
# o_line.set_color(o_line.od.read('Couleur'))
# o_line.set_linewidth(o_line.od.read('Épaisseur'))
# o_line.set_linestyle(self.get_linestyle(o_line.od.read('Style')))
# o_line.set_label(o_line.od.read('Légende'))
# self.o_yaml.line_params(o_line) # Paramètres étendus pour cette courbe.
#
# """ 2.2 - Exemple de compilation dynamique au niveau de la courbe 'o_line'. Décommenter si nécessaire. """
# # d_context = {'line': o_line}
# # self.o_yaml.compile([f'Entrée {num_input}', o_line.name, 'Code'], d_context)
#
# """ 1 - Actualisation de la liste des courbes ax.lines.
# - Si 'Plots' a 1 seule entrée, clé de type ['calc_0', 'Node0'], sinon ['calc_0', 'Node0', 'Entrée e']. """
# l_keys = self.l_mykey + [f'Entrée {ax.num_input}'] # ['calc_0', 'Node0', 'Entrée 0']
# od_lines = Dictionary(self.od_calcs.read(l_keys, self.od_calcs.read(self.l_mykey)))
# self.update_lines(ax.num_input, od_lines) # Suppression et ajout de courbes.
#
# """ 2 - Mise à jour de chacune des courbe. """
# for o_line in ax.lines: # Parcours des courbes de ce subplot (de cette entrée de node).
# update_line()
#
# """ 3 - Exemple de compilation dynamique au niveau du subplot 'num_input'. Décommenter si nécessaire. """
# # d_context = {'subplot': ax}
# # self.o_yaml.compile([f'Entrée {num_input}', 'Code'], d_context)
#
# """ 4 - Paramètres étendus yaml. """
# self.o_yaml.subplot(self.l_ax, ax.num_input)
# def update_lines(self, num_input, od_plots):
# """ Actualisation de la liste de courbes self.l_ax[num_input].lines[].
# On compare les listes de coubes nécessaires et de courbes existantes.
# On en déduit celles qu'il faut ajouter, celles qu'il faut supprimer. """
#
# """ 1) Courbes nécessaires : l_needed. """
# l_needed = list()
# for key, val in od_plots.items():
# if isinstance(val, dict) and 'Signal actif' in val and val['Signal actif']:
# l_needed.append(key)
#
# """ 2) Courbes existantes : l_exist. """
# l_exist = self.line_exist(num_input)
#
# """ 3) Ajout (création) de courbes. """
# for needed in l_needed:
# if needed not in l_exist:
# o_line, = self.l_ax[num_input].plot(np.empty(0)) # [num_input] = Élément 0 de la liste.
# o_line.name = needed
#
# """ 4) Suppression de courbes. """
# for exist in l_exist:
# if exist not in l_needed:
# o_line = self.get_line(num_input, exist)
# if o_line is not None:
# self.l_ax[num_input].lines.remove(o_line)
# def line_exist(self, num_input):
# """ Appelée par update_lines(). """
# l_lines = list()
# for line in self.l_ax[num_input].lines:
# l_lines.append(line.name)
# return l_lines
# def get_line(self, num_input, line_name):
# """ Appelée par update_lines(). """
# for o_line in self.l_ax[num_input].lines:
# if o_line.name == line_name:
# return o_line
# return None
# @staticmethod
# def get_linestyle(param_style):
# if param_style.startswith('___'):
# return '-'
# elif param_style.startswith('- -'):
# return '--'
# elif param_style.startswith('. .'):
# return ':'
# return '-.'
# def update_ax(self, d_grids, num_input):
# """ On vérifie les 8 inputs, connectés ou pas. """
# name_input = f'calc_{num_input}'
# ax = self.l_ax[num_input]
#
# if name_input not in self.l_inputs:
# """ Vérification des entrées non connectées aussi, afin éviter des affichages indésirables."""
# if ax is not None:
# """ Ce subplot a existé, il n'existe plus. """
# ax.set_visible(False) # Suppression des graphiques (subplots) fantômes.
# self.l_ax[num_input] = None
# return
#
# """ Le subplot {ax} contient l'attribut 'pointer' qui disparait après cet update.
# Par conséquent, on le sauvegarde d'abord. """
# if ax is None:
# pointer = 0
# else:
# ax.set_visible(False) # Suppression des graphiques (subplots) fantômes.
# pointer = ax.pointer
#
# """ Update du subplot {ax}. add_axes() permet un positionnement absolu, totalement libre. """
# ax = self.fig.add_axes(d_grids[name_input]) # Position absolue. x, y, w, h. Valeurs entre 0 et 1.
#
# """ Réintégration de l'attribut 'pointer'. """
# ax.pointer = pointer
#
# self.l_ax[num_input] = ax
# @staticmethod
# def speed(indx):
# """ Calcul empirique. """
# values = [(3000, 1), (2000, 1), (1200, 1), (640, 1), (480, 1), (320, 1), # <-- Lents.
# (280, 2), (296, 4), (273, 7), (280, 14), (286, 26), (282, 47), ] # <-- Rapides.
# return values[indx]
# def key_event(self, ev):
# """ Appui sur une touche du clavier. """
# keycode = ev.key
# if keycode == ' ':
# """ Pause on/off. """
# self.b_paused = not self.b_paused
# self.anim.stop() if self.b_paused else self.anim.start(int(self.anim.delay))
# elif keycode == '+':
# """ Accélérer. """
# self.anim.stop()
# self.anim.indx += 1
# self.anim.indx = min(self.anim.indx, 11)
# speed = self.speed(self.anim.indx)
# self.anim.delay = speed[0]
# self.anim.step = speed[1]
# self.anim.start(int(self.anim.delay))
# elif keycode == '-':
# """ Ralentir. """
# self.anim.stop()
# self.anim.indx -= 1
# self.anim.indx = max(self.anim.indx, 0)
# speed = self.speed(self.anim.indx)
# self.anim.delay = speed[0]
# self.anim.step = speed[1]
# self.anim.start(int(self.anim.delay))
# elif keycode == 'tab':
# self.b_reverse = not self.b_reverse
# elif keycode == 'right' or keycode == 'left':
# self.b_paused = True
# self.anim.stop()
# step = self.anim.step
# self.anim.step = 1 if keycode == 'right' else -1
# self.show_anim()
# self.anim.step = step
if __name__ == '__main__':
""" Les 3 premières lignes permettent un fonctionnement autonome, en phase de mise au point.
- Elles simulent l'ajout d'arguments en ligne de commande.
- Elles définissent le nom du graphe ainsi que l'id du node 'Plots'.
- Elles pourront être commentées en phase de production.
"""
sys.argv.append('Signaux') # Nom du graphe.
sys.argv.append('0') # Plots id. (La ligne de commande ne doit contenir que du str).
sys.argv.append('-1') # Gestion de la fermeture automatique. Poste de contrôle PID : -1 en local.
""" Lancement de l'appli. """
mpl = ShowMatPlotLib()
La majorité du code est commentée. Nous allons l'activer progressivement.
ShowMatPlotLib
hérite de Calcul
et non plus de ShowBase
.
ShowBase
devient inutile, on supprime le fichier /show/show_base.py
.plots.py
par cellui-ci.plots.py
:
# Imports externes
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QPushButton
from PyQt5.QtCore import Qt, QTimer
import pickle
import os
import psutil
# Imports externes
from pc.ctrl_node import CtrlNode, CtrlCalcul
d_datas = {
'name': 'Plots',
'icon': 'plots.png',
}
class Node(CtrlNode):
def __init__(self, o_scene, s_id, pos):
super().__init__(o_scene, s_id, pos)
self.type = 'Plots'
self.l_sep_inputs = [2] # Emplacement des séparateurs
self.scripts = dict()
self.graph_pid = None # PID du process affichant le graphique.
self.path_node = ''
self.ld_calc = [None for i in range(8)] # Liste de 8 None.
self.clock = QTimer()
self.setup()
def setup(self, child_file=__file__):
self.o_grcontent = UiContentPlots(self)
self.clock.timeout.connect(self.detect_graph_closed)
self.scripts = {
'Matplotlib': 'show_matplotlib.py',
'Pyqtgraph': 'show_pyqtgraph.py',
'Plotly': 'show_plotly.py',
'Bokeh': 'show_bokeh.py',
'Cufflinks': 'show_cufflinks.py',
'Dash': 'show_dash.py',
'Guiqwt': 'show_guiqwt.py',
'OpenCV': 'show_opencv.py',
'Seaborn': 'show_seaborn.py',
}
super().setup(child_file)
self.init_calc()
def init_calc(self):
""" Persistance dans un pkl. Un sous-dossier par node. """
self.path_node = os.path.abspath(
f"{os.path.dirname(__file__).split('nodes')[0]}backups/{self.o_scene.graph_name}/node{self.id}")
os.makedirs(self.path_node, exist_ok=True) # Création dossier. Si ce dossier existe, pas d'erreur renvoyée.
""" Affectaion de self.ld_calc[] : Lecture des modèles de calcul mémorisés. """
for num_input in range(len(self.lo_sockets_in)):
fic_pkl = os.path.abspath(f'{self.path_node}/calc_{num_input}.pkl')
if os.path.isfile(fic_pkl):
try:
""" Lecture du fichier pkl. """
with open(fic_pkl, 'rb') as pk: # Pas d'encoding pour les fichiers binaires.
self.ld_calc[num_input] = pickle.load(pk)
except (Exception,):
pass
@property
def ld_inputs(self):
""" Lecture des paramètres pour connaître le nombre d'entrées. """
nb_inputs = self.get_param(["Nombre d'entrées"], 1) # 1 entrée par défaut.
l_inputs = list()
for k in range(nb_inputs):
""" Ajout des entrées. """
label = 'gr' if nb_inputs == 1 else f'gr {k}'
l_inputs.append({'label': label, 'label_pos': (6, -10)})
""" Ajout des séparateurs (on évite un séparateur en fin de liste). """
if k in self.l_sep_inputs and k < nb_inputs - 1:
l_inputs.append('sep')
return l_inputs
def fixed_params(self):
""" Valeurs par défaut. """
return {
"Nombre d'entrées": [1, {'step': 1, 'limits': (1, 8)}],
'Fenêtre': {
'Titre': 'Titre de la fenêtre',
'Interface': ['Matplotlib', {'values': list(self.scripts.keys())}]
}
}
def bundle_params(self):
""" Valeurs par défaut. """
return {
'Titre': 'Titre du graphique',
}
def my_params(self, context):
""" Voir documentation dans la classe-mère : CtrlNode.my_params(). """
palette = ['ff0000', 'aa007f', '55aa00', '0000ff', 'ff5500', 'aa557f', '00aa00', 'ff00ff', '55aaff', '00ff7f']
ident = context[7] if len(context) >= 8 else context[5]
legend = self.join(context[4], ident) # {Nom du signal}-{traitement}. Ex : 'Cosinus-SMA18'.
indx = context[0] % len(palette) # Ordre du signal dans le faisceau entrant.
return {
'Légende': legend,
'Couleur': '#' + palette[indx],
'Épaisseur': [1., {'step': .1, 'limits': (.1, 7.)}],
'Style': ['__________', {
'values': ['__________', '- - - - - - - - -', '. . . . . . . . .', '_ . _ . _ . _ .',
'_ . . _ . . _ . .']}]
}
def get_yaml_files(self):
py_file = os.path.basename(__file__)
seeder_name = f"{self.get_param(['Fenêtre', 'Interface']).lower()}"
seeder = __file__.replace(py_file, seeder_name) + '_seeder.yaml'
path_node = f"{os.path.dirname(__file__).split('nodes')[0]}backups/{self.o_scene.graph_name}/node{self.id}"
extended_params = os.path.abspath(f'{path_node}/{self.id}-{seeder_name}.yaml')
return extended_params, seeder
def recalculate(self, num_input):
d_calc = {'edges': set()}
o_socket_in = self.lo_sockets_in[num_input]
o_socket_in.update_dcalc(d_calc)
od_params = self.new_od(self.o_params.od_params[self.main_key])
od_calc = self.new_od()
od_calc.write('Compute', self.type)
od_calc.write('Yaml file', self.get_yaml_files()[0])
od_calc.write('Checked', self.b_chk)
for key in od_params.key_list():
if key[0] == 'Titre du node' or key[0] == 'Couleurs' or key[0] == '*🔆':
continue
if key[-1] == '$Provenance du signal :' or key[-1] == "Nombre d'entrées":
continue
b_add = True
if key[0].startswith('Entrée'):
t_socket = self.id, int(key[0][7:])
b_add = False
for edge in d_calc['edges']:
if t_socket in edge:
b_add = True
break
if b_add:
od_calc.write(key, od_params.read(key))
d_calc[self.s_id] = dict(od_calc)
self.fix_colors()
""" On n'enregistre le pkl que s'il a été modifié. """
if d_calc != self.ld_calc[num_input]:
fic_pkl = os.path.abspath(f'{self.path_node}/calc_{num_input}.pkl')
try:
with open(fic_pkl, 'wb') as pk:
pickle.dump(d_calc, pk, pickle.HIGHEST_PROTOCOL)
self.ld_calc[num_input] = self.deepcopy(d_calc) # Mémorisation.
except (Exception,):
pass
def fix_colors(self):
""" Dès qu'une couleur par défaut est attribuée, on l'enregistre dans le pkl afin d'éviter des changements
lorsque le N° de courbe change. Exemple : on a [Sinus, Cosinus, Carré] produits par 'Signaux' :
- Carré est à l'indice 2, donc prend par défaut la couleur palette[2].
- On désactive Cosinus, donc Carré passe à l'indice 1 => Couleur par défaut = palette[1].
- Pour éviter ce défaaut visuel, on enregistre la couleur dès son attribution.
- En effet, les paramètres enregistrés dans le pkl sont prioritaires par rapport à ceux par défaut.
"""
b_modified = False
for l_keys in self.o_params.od_params.key_list():
if l_keys[-1] == 'Couleur':
color = self.o_params.od_params.read(l_keys)
if self.od_pkl.read([self.s_id] + l_keys) is None:
self.od_pkl.write([self.s_id] + l_keys, color)
b_modified = True
if b_modified:
self.o_scene.o_pkl.backup()
def refresh(self, l_keys):
if l_keys[-1] == "Nombre d'entrées":
self.rebuild()
""" L'entrée ajoutée ou retirée peut modifier l'infrastructure. """
l_keys = ['infrastructure'] + l_keys
super().refresh(l_keys)
def detect_graph_closed(self):
if self.graph_pid is None or not psutil.pid_exists(self.graph_pid):
self.o_grcontent.button.setText('Voir')
self.clock.stop()
def show_figure(self):
""" Fonctions internes. """
def get_fic(nom_afficheur):
return os.path.abspath(f"{os.path.dirname(__file__).split('nodes')[0]}show/{nom_afficheur}")
def close_figure():
if self.graph_pid is None:
return
for proc in psutil.process_iter():
if proc.pid == self.graph_pid:
proc.kill()
""" Désélection de tous les items de la scène, puis sélection de moi-même. """
for item in self.o_scene.o_grscene.items():
item.setSelected(False)
self.o_grnode.setSelected(True)
ui_fig = self.get_param(['Fenêtre', 'Interface'])
nom_projet = self.o_scene.graph_name
appli = ["pythonw", get_fic(self.scripts[ui_fig]), nom_projet, str(self.id), str(os.getpid())]
if self.graph_pid is None or not psutil.pid_exists(self.graph_pid):
""" Ne peut être lancé qu'une fois. """
self.graph_pid = psutil.Popen(args=appli).pid
self.o_grcontent.button.setText('Fermer')
self.clock.start(3000)
else:
close_figure()
self.o_grcontent.button.setText('Voir')
class UiContentPlots(QWidget):
""" Widgets affichés dans le node (ici, un bouton). L'affichage est assuré par 'UiNode.initContent()'. """
def __init__(self, o_parent):
super().__init__()
self.o_parent = o_parent
""" Le style est dans qss """
self.setGeometry(40, 38, 10, 10)
self.layout = QVBoxLayout()
self.layout.setContentsMargins(0, 0, 0, 0) # g, h, d, b
self.setLayout(self.layout)
""" Bouton : le style est dans qss/nodestyle.qss. """
self.button = QPushButton('Voir', self)
self.button.setCursor(Qt.PointingHandCursor)
self.layout.addWidget(self.button)
self.button.clicked.connect(self.o_parent.show_figure)
class Calcul(CtrlCalcul):
def __init__(self):
super().__init__() # Plots est client => d_map doit exister, même vide.
self.od_calcs = CtrlCalcul.od_calcs
show_matplotlib.py
.
Point de départ : nous sommes prêts à activer le code commenté, progressivement.
*.pkl, *.calc et *.yaml
.
ShowMatPlotLib.setup()
, activer (décommenter) le point 7 : "Surveillance de ..."ini_watch()
. A la fin, celle-ci appelle la méthode watch().watch()
est également appelée automatiquement par le watcher.watch()
contient un print()
de mise au point permettant de voir les dossiers et fichiers modifiés.
print()
.watch()
, (pas le 3 !).
update_figure()
: "Titre de la fenêtre".
matplotlib_yaml
par celui-ci./nodes/afficheurs/plots/matplotlib_yaml.py
:
""" Mise en garde importante :
Cette classe YamlParams est intimement couplée aux fichiers de paramètres étendus yaml de chaque node 'Plots'.
Les méthodes ci-dessous, ainsi que les fichiers seeders sont volontairement limités à certains exemples pratiques.
En effet les possibilités sont infinies et leur étude sort du cadre de ce cours.
Il vous appartient de vous en inspirer pour augmenter cette classe en ajoutant vos propres méthodes.
"""
# Imports externes
from matplotlib import pyplot as plt
# Imports internes
from nodes.yaml_parent import YamlParent
class YamlParams(YamlParent):
def __init__(self, o_calc):
super().__init__(o_calc)
self.grid_ante = []
self.l_geom = list()
def get_dgrids(self):
""" Return un dictionnaire : une clé par entrée active. Pour chacune, définition de sa géométrie à l'écran. """
d_geometry = dict()
default_fig_margins = [0, 0, 3, 0] # [ft, fr, fb, fl] -> [top, right, bottom, left]. <-- en % (0 à 100)
fig_margins = self.od_yaml.read(['Figure', 'marges'], default_fig_margins)
for indx, name_input in enumerate(self.o_calc.l_inputs):
num_input = int(name_input[-1]) # 0 à 7
default_geometry = self.get_default_grid(indx) + default_fig_margins # 12 valeurs par défaut.
size = self.od_yaml.read([f'Entrée {num_input}', 'Géométrie', 'taille'], default_geometry[:4])
margins = self.od_yaml.read([f'Entrée {num_input}', 'Géométrie', 'marges'], default_geometry[4:8])
geometry = size + margins + fig_margins # 12 valeurs réelles.
for i, val in enumerate(geometry):
if not isinstance(val, (int, float)):
geometry[i] = default_geometry[i]
px, py, pw, ph, pt, pr, pb, pl, ft, fr, fb, fl = geometry # p* = Plot margin *, f* = Figure margin *.
fw = 100-fl-fr
fu = fw/100 # fu = Figure Unit = 1% de fw
_pw = pw*fu
x = (fl + px*fu + pl*fu)/100
w = (_pw - (pl+pr)*fu)/100
fh = 100-ft-fb
fu = fh/100 # 1% de fh
_ph = ph*fu
y = (100 - (ft + (py+ph-pb)*fu))/100 # (100 - (0 + 0*1 + (50-0)*.5))/100
h = (_ph - (pt+pb)*fu)/100
d_geometry[name_input] = x, y, w, h
return d_geometry
def get_default_grid(self, indx):
""" Disposition par défaut des subplots : 1 à 8 """
nb_inputs = len(self.o_calc.l_inputs)
if nb_inputs == 0:
return []
xywh, trbl = [], [5, .5, 1.5, 5] # top, right, bottom, left. <-- en % (0 à 100).
if nb_inputs < 4:
h = 100 / nb_inputs
y = indx * h
xywh = [0, y, 100, h] # x, y, width, height. <-- en % (0 à 100).
elif nb_inputs in [4, 6, 8]:
h = 200 / nb_inputs
x = (indx % 2) * 50
y = h * (indx // 2)
xywh = [x, y, 50, h]
elif nb_inputs in [5, 7]:
h = 200 / (nb_inputs + 1)
if indx == 0:
x, y, w = 0, 0, 100
else:
i = indx - 1
x = (i % 2) * 50
y = h + h * (i // 2)
w = 50
xywh = [x, y, w, h]
return xywh + trbl
@property
def grid_modified(self):
""" Grille de disposition des subplots ou géométrie(s) modifiées depuis le fichier yaml.
@return: Bool """
l_geom = [self.od_yaml.read(['Figure', 'marges'])]
for i in range(8):
l_geom.append(self.od_yaml.read([f'Entrée {i}', 'Géométrie']))
b_modified = l_geom != self.l_geom
self.l_geom = l_geom # Mémorisation (pour le prochain passage).
return b_modified
""" Fenêtre. Relancer l'affichage pour la prise en compte des modifications. """
def set_rcparams(self, rc_params):
d_params = self.od_yaml.read(['Fenêtre', 'rcParams'])
if not isinstance(d_params, dict):
d_params = dict()
for key, val in d_params.items():
rc_params[key] = val
def get_style(self):
style = self.od_yaml.read(['Fenêtre', 'style'], 'seaborn')
if style not in plt.style.available:
style = 'seaborn'
return style
""" Subplots. """
def subplot(self, l_ax, num_input):
ax = l_ax[num_input]
self.legend(ax, num_input) # Cartouche de légendes : visibilité, position, nb colonnes, couleurs.
self.axes_xy(ax, num_input) # Axe abscisse et Axe ordonnée : visibilités, polices, positions.
""" Cadre : visibilité. """
b_cadre = self.od_yaml.read([f'Entrée {num_input}', 'Cadre'], True)
ax.set_frame_on(b_cadre)
""" Titre : Texte vient du dockable, visibilité vient du yaml. """
title = self.o_calc.od_mydic.read('Titre', self.o_calc.od_mydic.read([f'Entrée {num_input}', 'Titre']))
title_visible = self.od_yaml.read([f'Entrée {num_input}', 'Titre visible']) # True, False ou None.
title_visible = False if title_visible is False else True
ax.title.set_text(title)
ax.title.set_visible(title_visible)
""" Réticule : liste de 3 valeurs [axis, linestyle, color]. axis = 'x', 'y' ou 'both' """
l_grid = self.od_yaml.read([f'Entrée {num_input}', 'Réticule']) # True, False ou None.
ax.grid(False)
try:
ax.grid(axis=l_grid[0], linestyle=l_grid[1], color=l_grid[2])
except (Exception,):
pass
""" Axes couplés. """
xaxis_pilot = self.od_yaml.read([f'Entrée {num_input}', 'ShareX']) # (0 à 6) ou None.
if isinstance(xaxis_pilot, int) and (0 <= xaxis_pilot < 7) and l_ax[xaxis_pilot] is not None:
try:
l_ax[xaxis_pilot].get_shared_x_axes().join(l_ax[xaxis_pilot], ax)
ax.pointer = l_ax[xaxis_pilot].pointer
except(Exception,):
print(f"Yaml.matplotlib, erreur de saisie dans Entrée {num_input} / ShareX.")
def between(self, ax, x, y):
""" Coloriage d'une zône entre 2 limites. Chacune des limites peut être :
- Une courbe : Une portion du nom suffit pour identifier le signal.
(Vérifier qu'il n'y ait pas risque de doublon)
- Un nombre (int ou float) constant => Ligne horizontale.
- Une variable existant dans le dictionnaire od_descr du node-serveur.
- Le format d'écriture dans yaml est le suivant : {portion du nom de la courbe}${nom de la variable}
- Exemple pour le RSI : courbes: [ Normal, Normal$low ] <-- Voir rsi.py """
def get_oline(line_name_or_int): # ex : line_name = 'Signaux1-Cosinus'
""" Renvoie l'objet line (courbe) à partir d'une partie du nom affiché dans le dockable des paramètres. """
for o_line in ax.lines:
if isinstance(line_name_or_int, str):
line = line_name_or_int.split('$')
if line[0] in o_line.name:
if len(line) == 1:
return o_line
else:
var = line[1]
value = ax.o_server.od_descr.read([o_line.name, var], False)
return value if isinstance(value, (int, float)) else False
else:
return line_name_or_int if isinstance(line_name_or_int, (int, float)) else False
name_input = f"Entrée {ax.num_input}"
color_between = self.od_yaml.read([name_input, 'color between'])
if not isinstance(color_between, list):
return
""" Nettoyage pour éviter les cumuls. """
try:
if hasattr(ax, 'filled'):
for fill in ax.filled:
ax.collections.remove(fill)
except (Exception,):
pass
ax.filled = list()
for d_between in color_between:
for key, val in d_between.items():
if not isinstance(val, dict):
continue
try:
o_line0 = get_oline(val['courbes'][0])
o_line1 = get_oline(val['courbes'][1])
if o_line0 is False or o_line1 is False:
continue
b_active, num_col0, num_col1 = True, None, None
if hasattr(o_line0, 'od'):
num_col0 = o_line0.od['num_col']
b_active = b_active and o_line0.od['Signal actif']
if hasattr(o_line1, 'od'):
num_col1 = o_line1.od['num_col']
b_active = b_active and o_line1.od['Signal actif']
b_active = b_active and o_line0 is not None and o_line1 is not None
if not b_active:
return
colors = val['couleurs']
y0 = o_line0 if isinstance(o_line0, (int, float)) else y[:, num_col0]
y1 = o_line1 if isinstance(o_line1, (int, float)) else y[:, num_col1]
if len(colors) == 1:
ax.filled.append(ax.fill_between(x, y0, y1, color=colors, alpha=.45))
if len(colors) > 1:
ax.filled.append(ax.fill_between(x, y0, y1, where=(y0 > y1), color=colors[0]))
ax.filled.append(ax.fill_between(x, y0, y1, where=(y1 > y0), color=colors[1]))
except (Exception,):
print(f"Yaml.matplotlib, erreur de saisie dans Entrée {ax.num_input} / color between")
def axes_xy(self, ax, num_input):
""" Axe abscisse et Axe ordonnée : visibilités, polices, positions. """
for b_x in [True, False]:
dic = self.od_yaml.read([f'Entrée {num_input}', 'Axe ' + ('abscisse' if b_x else 'ordonnée')])
od = self.new_od(dic)
axis = ax.xaxis if b_x else ax.yaxis
label = od.read('label', '')
fsize = od.read('label font size', 8)
tsize = od.read('ticks font size', 8)
pos_label = od.read('label pos', [7, 6] if b_x else [2, 22])
pos_axe = od.read('ticks pos', 'bottom' if b_x else 'left')
b_tick_visible = pos_axe is not False
try:
for tick in axis.get_major_ticks():
""" 1 = left ou bottom. """
tick.tick1line.set_visible(b_tick_visible) # Petit tiret : visibilité.
tick.label1.set_visible(b_tick_visible) # Texte : visibilité.
tick.label1.set_fontsize(tsize) # Texte : taille police.
""" 2 = right ou top. """
tick.tick2line.set_visible(b_tick_visible)
tick.label2.set_visible(b_tick_visible)
tick.label2.set_fontsize(tsize)
if b_tick_visible:
axis.set_ticks_position(pos_axe)
ax.set_xlabel(xlabel=label, fontsize=fsize) if b_x else ax.set_ylabel(ylabel=label, fontsize=fsize)
axis.set_label_coords(pos_label[0]/100, pos_label[1]/100)
except (Exception,):
axe = f"Entrée {num_input} / Axe {'abscisse' if b_x else 'ordonnée'}"
print(f"Yaml.matplotlib, erreur de saisie dans {axe}")
def legend(self, ax, num_input):
""" Cartouche de légendes : visibilité, position, nb colonnes, couleurs. """
""" https://matplotlib.org/stable/api/legend_api.html (Pycharm : Ctrl + clic) """
od_legend = self.new_od(self.od_yaml.read([f'Entrée {num_input}', 'Légende'], {}))
d_legend = {
'loc': od_legend.read('loc', 'upper left'),
'ncol': od_legend.read('ncol', 2),
'framealpha': od_legend.read('framealpha', .3),
'facecolor': od_legend.read('facecolor', '#ffe'),
'edgecolor': od_legend.read('edgecolor', '#333'),
}
b_visible = od_legend.read('visible', True)
if b_visible:
ax.legend(**d_legend).set_frame_on(True)
else:
legend = ax.get_legend()
if legend is not None:
legend.remove()
def line_params(self, o_line):
"""
:param o_line: objet Line2D, auquel on a ajouté un membre :
o_line.od = super-dictionnaire des paramètres du dockable.
:return: NA.
On a ici 2 super-dictionnaires exploitables :
- o_line.od = dockable des paramètres du PC : seulement la grappe concernant cette courbe (line).
- self.od_yaml = paramètres du fichier yaml éditable manuellement.
Code libre, à compléter au fur et à mesure des besoins du yaml.
"""
l_key_line = [o_line.od.read('Entrée'), o_line.name]
""" alpha (transparence) """
alpha = self.od_yaml.read(l_key_line + ['alpha'], None)
if alpha is not None:
o_line.set_alpha(alpha)
yaml_parent
par celui-ci./nodes/yaml_parent.py
:
""" https://yaml.org/spec/1.2/ <-- Ctrl + Clic """
# Imports externes
import yaml
import os
# Imports internes
from functions.utils import Utils, Dictionary
class YamlParent:
def __init__(self, o_calc):
self.o_calc = o_calc
self.yaml_file = None # Modifiable à tout moment, manuellement par l'utilisateur.
self.ut = Utils()
self.od_yaml = Dictionary()
def update_odyaml(self): # , od_params=None):
""" Mise à jour de od_yaml. """
try:
with open(self.yaml_file, 'r', encoding='utf-8') as yml:
self.od_yaml = Dictionary(yaml.safe_load(yml))
""" Fusion de modèles. """
except (Exception,) as err:
if os.path.isfile(self.yaml_file):
print(f'Erreur dans le fichier {self.yaml_file}.\n{err}')
""" Fichier yaml absent ou incorrect => dictionnaire vide. """
self.od_yaml = Dictionary() # Vide.
def get_od_dock(self):
return Dictionary(self.o_calc.od_calcs.read(self.o_calc.l_mykey))
def compile(self, l_keys, d_context):
"""
Exemple de code appelant :
d_context = {'msg': 'Bonjour les codeurs !'}
self.o_yaml.compile(d_context)
print(d_context['return_value'])
Exemple de code dans le fichier Yaml :
Figure:
Code: |
print(f'Je suis dans le code compilé.\n{msg}')
return_value = 'Terminé.'
En cas d'erreur de compilation, celle-ci peut être lue dans d_context['error'].
"""
yaml_code = self.od_yaml.read(l_keys)
try:
code = compile(yaml_code, '<string>', 'exec')
eval(code, d_context)
d_context['error'] = "Pas d'erreur."
except (Exception,) as err:
d_context['error'] = f'Erreur :\n{err}'
@staticmethod
def new_od(dic=None):
return Dictionary(dic)
matplotlib_seeder
par celui-ci./nodes/afficheurs/plots/matplotlib_seeder.yaml
:
---
Models: # /nodes/afficheurs/plots/models
# Liste de modèles fusionnés. Cependant, les données de ce fichier restent prioritaires.
- xxxxxxxx.yaml
- yyyyyyyy.yaml
- zzzzzzzz.yaml
Fenêtre:
# Redémarrer pour la prise en compte des paramètres de la fenêtre.
# Styles disponibles, faire : [print(style) for style in plt.style.available]
# ['seaborn', 'dark_background', 'Solarize_Light2', '_classic_test_patch', 'bmh', 'classic', 'fast', ...]
style: seaborn
# Pour voir tous les paramètres disponibles, faire: Dictionary(rcParams).print()
# (Vérifier que l'import existe : from matplotlib import rcParams)
rcParams:
toolbar: 'toolbar2' # 'None', 'toolbar2', 'toolmanager'
axes.titlesize: 6 # Relancer. N'est pas pris en compte par certains styles (seaborn, ... à tester.)
legend.fontsize: 7 # Ne pas relancer.
# Types de graphiques -> plt.{x} : plot, hist, pie, bar, scatter
# Exemples d'attributs : https://python.doctor/page-creer-graphiques-scientifiques-python-apprendre
# - plt.grid(True)
# - plt.text(150, 6.5, r'Danger')
# - plt.xlabel('Vitesse')
# - plt.ylabel('Temps')
# - plt.legend()
# - plt.annotate('Limite', xy=(150, 7), xytext=(165, 5.5), arrowprops={'facecolor':'black', 'shrink':0.05})
Figure:
marges: [0, 0, 3, 0] # En % : haute, droite, basse, gauche. '_' = valeur par défaut
# Entrée 0:
# Entrée 1:
# Entrée 2:
# Entrée 3:
# Entrée 4:
# Entrée 5:
# Entrée 6:
# Entrée 7:
Entrée x - exemples de code:
# Note sur les couleurs. Palette ou Hexa :
# Palette : C0 à C9 ...
# ... ou bien écriture hexadécimale -> 3, 4, 6 ou 8 chiffres hexa :
# 6 chiffres hexa. ex : #2fd155 -> rouge : #2f, vert : #d1, bleu : #55
# 8 chiffres hexa. ex : #f411a680 -> 6 chiffres = Couleur, les 2 derniers = alpha.
# 3 chiffres hexa. ex : #4b6 identique à #44bb66
# 4 chiffres hexa. ex : #87fc identique à #8877ffcc <-- transparence = #cc
Géométrie: # _ = valeur par défaut. Valeurs de 0 à 100.
taille: [0, 0, 60, 70] # En % : x, y (coin haut gauche), largeur, hauteur
marges: [10, _, _, 12] # En % : haut, droit, bas, gauche.
Légende:
visible: False # True par défaut
loc: 'upper right' # Automatique par défaut
ncol: 2 # 1 par défaut
framealpha: .2 # 1 par défaut (opaque)
facecolor: '#ffe' # Transparent par défaut
edgecolor: '#333' # Bordure blanche par défaut
Titre visible: False
Cadre: False # ax.set_frame_on(bool)
Réticule: [ both, dotted, C9 ] # Traits verticaux(x), horizontaux (y) ou les 2 (both).
color between:
- Nom quelconque 1: # Juste pour l'utilisateur. Non utilisé dans le code.
# Nom des courbes : seule une partie du nom affiché dns le dockable.
courbes: [ -MACD, 0, Courte ] # Seules le 2 premières valeurs de la liste sont traitées.
couleurs: [ '#00f8', '#f006' ]
- RSI - Nom quelconque 2:
courbes: [ Normal, Normal$low ] # Ici emploi de la variable 'low' du signal contenant 'Normal' dans son nom.
couleurs: [ '#0000', '#f008' ]
Axe abscisse:
label: Blah-x Blah-1 # ax.set_xlabel(...)
label pos: [ 7, 4 ] # 0 à 100. Pourcentages (horizontal, vertical) : (50, 0) = center
label font size: 8
ticks pos: False # bottom (défaut), top. ax.xaxis.tick_top() Position de l'abscisse : bottom, top, False (invisible)
ticks font size: 8
Axe ordonnée:
label: Blah-y Blah-1 # ax.set_ylabel(...)
label pos: [ 2, 14 ]
label font size: 8
ticks pos: False # ax.yaxis.tick_left() Position de l'ordonnée : left, right, False (invisible)
ticks font size: 8
ShareX: 0 # ax.sharex(ax_ante) - Axe x partagé avec le subplot 0 (Entrée 0)
/backups/Signaux/node0/0-matplotlib.yaml
et /backups/Signaux/node5/5-matplotlib.yaml
.ShowMatPlotLib.setup()
→ décommenter le point 3 : "Fichier de configuration ..."ShowMatPlotLib.watch()
→ décommenter le point 3 : "Création ou update des paramètres étendus ..."ShowMatPlotLib.update_figure()
→ décommenter le point 2 : "Paramètres étendus yaml"PC
, pas matplotlib
.Voir
d'un node Plots
→ La fenêtre matplotlib
s'affiche, le bouton devient Fermer
Paramètres étendus
de ce node, dans le dockable → Les paramètres yaml de base (idem au seeder) s'affichent.Fenêtre
, modifier le style et retirer la toolbar (None).PC
: Clic sur le bouton Fermer
, puis clic à nouveau sur ce bouton (Voir
).Matplotlib
.ShowMatPlotLib.update_figure()
, décommenter le point 3 : "Subplots : création, ... etc."ShowMatPlotLib.update_figure()
, décommenter le point 6 : "Affichage à l'écran ... etc."ShowMatPlotLib.update_ax()
.CtrlSocket.update_calc()
doit être modifiée, pour ne prendre en compte que les entrées actives.ctrl_socket.py > CtrlSocket.update_dcalc()
:
def update_dcalc(self, d_calc):
""" Propagation vers l'amont. """
if self.b_input:
lo_edges = [gredge.o_edge for gredge in list(self.get_gredges()) if gredge.o_edge.b_on]
if lo_edges:
""" Il ne peut y avoir qu'un seul edge dans la liste. """
o_edge = lo_edges[0]
d_calc['edges'].add(o_edge.get_tid())
o_edge.update_dcalc(d_calc)
else:
self.o_node.update_dcalc(d_calc)
Voir
de Plots-0
.PC
, activer 0 à 8 entrées de Plots-0
, en cliquant sur les cases à cocher des nodes Signaux
.e0
à e3
.
Entrée 3:
Géométrie: # _ = valeur par défaut. Valeurs de 0 à 100.
taille: [65, 5, 30, 30] # En % : x, y (coin haut gauche), largeur, hauteur
Calcul
de Signaux
par celle-ci./nodes/generateurs/signaux/signaux.py > classe Calcul
:
class Calcul(CtrlCalcul):
""" ********** Le code ci-dessous ne concerne pas le poste de contrôle, mais seulement les calculs. ********** """
def __init__(self):
super().__init__()
self.o_yaml = YamlParams(self) # Classe spécifique à 'Signaux'.
def descr_signal(self, od_descr, val, root_key):
""" Voir docstring dans la classe-mère. """
""" Boucle sur les signaux en sortie. """
for key in ['Sinus', 'Cosinus', 'Carré', 'Triangle', 'Dent de scie montante', 'Dent de scie descendante']:
""" 'key_dock' doit avoir LA MÊME ORTHOGRAPHE que celle affichée dans le dockable de ses nodes-clients. """
key_dock = f'{self.s_id}-{key}' # Ex : Signaux2-Carré
per_amp = self.o_yaml.get_params(key)
od_descr.write([key_dock, 'actif'], self.od_mydic.read(key, False)) # True ou False.
od_descr.write([key_dock, 'per_amp'], per_amp)
def process(self):
""" Voir docstring dans la classe-mère. """
""" Mapping signal -> méthode. """
l_signals = [
('Sinus', self.get_sinus_wave),
('Cosinus', self.get_cosinus_wave),
('Carré', self.get_square_wave),
('Triangle', self.get_triangle_wave),
('Dent de scie montante', self.get_up_sawtooth_wave),
('Dent de scie descendante', self.get_down_sawtooth_wave)
]
""" Traitement des signaux. """
for signal_name, method in l_signals:
key_dock = self.s_id + '-' + signal_name
b_signal_actif = self.od_descr.read([key_dock, 'actif'], False)
if b_signal_actif:
period, amplitude = self.o_yaml.get_params(signal_name)
nb_iter = 2 + self.len_buffer // (2 * period)
vector = method(nb_iter, period, amplitude)
num_col = self.od_descr.read([key_dock, 'num_col'], -1)
self.add_vector(key_dock, num_col, vector)
def get_sinus_wave(self, nb_iter, period, amplitude):
x = np.linspace(0, 2 * np.pi, period + 1)[:-1]
y = np.reshape(amplitude * np.sin(x), (period, 1))
y = np.concatenate((y, y) * nb_iter)
return y[: self.len_buffer]
def get_cosinus_wave(self, nb_iter, period, amplitude):
x = np.linspace(0, 2 * np.pi, period + 1)[:-1]
y = np.reshape(amplitude * np.cos(x), (period, 1))
y = np.concatenate((y, y) * nb_iter)
return y[: self.len_buffer]
def get_square_wave(self, nb_iter, period, amplitude):
y = np.ones((period, 1))
y[:period // 2] *= amplitude
y[period // 2:] *= - amplitude
y = np.concatenate((y, y) * nb_iter)
return y[: self.len_buffer]
def get_triangle_wave(self, nb_iter, period, amplitude):
y = np.ones((period, 1))
y[:period // 2] *= np.linspace(amplitude, -amplitude, period // 2, endpoint=False).reshape((period // 2, 1))
y[period // 2:] *= np.linspace(-amplitude, amplitude, period // 2, endpoint=False).reshape((period // 2, 1))
y = np.concatenate((y, y) * nb_iter)
return y[: self.len_buffer]
def get_up_sawtooth_wave(self, nb_iter, period, amplitude):
y = np.ones((period, 1)) * np.linspace(-amplitude, amplitude, period).reshape((period, 1))
y = np.concatenate((y, y) * nb_iter)
return y[: self.len_buffer]
def get_down_sawtooth_wave(self, nb_iter, period, amplitude):
y = np.ones((period, 1)) * np.linspace(amplitude, -amplitude, period).reshape((period, 1))
y = np.concatenate((y, y) * nb_iter)
return y[: self.len_buffer]
signaux_yaml.py
par celui-ci./nodes/generateurs/signaux/signaux_yaml.py
:
# Imports internes.
from nodes.yaml_parent import YamlParent
class YamlParams(YamlParent):
def __init__(self, o_calc):
super().__init__(o_calc)
def get_params(self, signal_name):
od_mydic = self.get_od_dock()
default_period = od_mydic['Points par période']
default_amplitude = od_mydic['Amplitude']
period = self.od_yaml.read([signal_name, 'Points par période'], default_period)
amplitude = self.od_yaml.read([signal_name, 'Amplitude'], default_amplitude)
return period, amplitude
CtrlCalculs
, des méthodes suivantes :
get_server()
.get_keys_from_idnode()
.get_oserver()
. ← @classmethodsetup_server()
.get_signature()
.dt_servers_in()
. ← @propertyShowMatPlotLib
:
get_edge()
.update_figure()
, décommenter le début du point 4 : jusqu'à la fin du point 4.1 (se termine par print(ax.o_server)
).CtrlCalculs
, des méthodes suivantes :
calculation()
.load_last_datas()
.setup_od_descr()
.descr_signal()
.update_od_descr()
.process()
.create_out_calc()
.add_vector()
.ShowMatPlotLib
:
update_figure()
, décommenter le point 4.2 (il se termine par un print()
de vérification).show_matplotlib.py
.
ShowMatPlotLib
, des méthodes :
update_subplot()
.update_lines()
.line_exist()
.get_line()
.get_linestyle()
.update_subplot()
.ShowMatPlotLib
:
update_figure()
, décommenter le point 4.3.show_matplotlib.py
.
PC
, modifier les paramètres de chaque entrée de Plots
:
Signaux
.CtrlCalcul
de la méthode :
get_matrix()
← fournit, depuis le tableau numpy, seulement la partie (la matrice) à afficher.ShowMatPlotLib
des méthodes :
show_anim()
.speed()
.setup()
, décommenter le point 4 : "Vitesse d'animation ..."update_figure()
, décommenter le point 5 : "Actualisation du buffer d'affichage ..."PC
, modifier les paramètres des courbes ; couleur, épaisseur, style, ...yaml
, colorier des zônes entre 2 courbes ou valeurs.ShowMatPlotLib
:key_event()
.
key_event()
.setup()
, les événements et les timers.
poste de contrôle
, ne pas lancer show_matplotlib.py
.self.nb_points
(nombre de points en abscisse, fixé à 200 dans l'ini).Bonjour les codeurs !