Le poste de contrôle (PC) / Infrastructure /
Paramètres dynamiques d'un node

Transmission de signaux entre nodes


Avant-propos

Si vous laissez des bugs dans votre programme suite à un oubli ou à une erreur ... alors bienvenu au club !
N'hésitez pas à les signaler (les vôtres ou ceux du tuto), dans les commentaires de youtube.

1 - Révision : Nettoyez votre code en retirant les print(), les imports et les commentaires inutiles.
Customisez les formes, couleurs épaisseurs, polices, tailles, ... à votre convenance.
Vérifiez chaque script en appuyant sur la touche F2.

2 - Debug :

1 - Graphe

2 - Déplacement partiellement hors champ

3 - Relancement de l'application : les edges ont disparu !

 


3 - Debug : La position de la vue est mal mémorisée.

1 - État initial

2 - Retrait du dockable des nodes

3 - Relancement de l'appli : la position n'a pas été mémorisée.

Ce n'est pas très grave, mais nous allons tout de même le corriger :


4 - Ajouter l'id du node dans l'affichage de son type.
        self.type_item.setPlainText(f"{self.o_node.type}-{self.o_node.id}")


Description

Cet exemple affichera 2 courbes dans un graphique :

Exemple de graphe minimaliste.

Node 'Signaux'

Node 'Moyennes'

Node 'Graphique'

 


Attention. Ce tuto contient beaucoup de code. Soyez vigilant, procédez lentement pas à pas, un oubli est vite arrivé. Sur-vérifiez, visionnez la vidéo.
Affichage du graphe :

 


Algorithme de propagation pour cet exemple simple à 3 nodes et 2 edges :

Propagation vers l'amont. Partons dans notre graphe-exemple, du Plot-3, lire dans le sens des flèches :

Signaux(1) ← Socket_out(1.0) ← Edge((1.0),(2.0)) ← Socket_in(2.0) ← Moyennes(2) ← Socket_out(2.0) ← Edge((2.0),(3.0)) ← Socket_in(3.0) ← Plot(3)

  1. Plot-3 demande à son Socket_in-3-0 la liste des noms de signaux afin qu'il puisse les afficher dans le dockable de ses paramètres.
  2. Socket_in-3-0 demande cette liste à l'edge connecté.
  3. L'edge demande cette liste à son socket de départ, à savoir Socket_out-2-0.
  4. Socket_out-2-0 demande cette liste à son node MM-2.
  5. MM-2 doit faire un traitement : Sortie0 = f(Entrée0, paramètres).
    • On remarque Entrée0, qui est la liste fournie par Socket_in-2-0.
    • Pour cette entrée, le même algorithme est appliqué :
    • MM-2 demande à son Socket_in-2-0 la liste des noms de signaux, car il en a besoin :
      • Pour son traitement.
      • Pour les afficher dans le dockable des paramètres.
      • Pour les transmettre vers l'aval, à savoir Plot-3.
  6. Socket_in-2-0 demande cette liste à l'edge connecté.
  7. L'edge demande cette liste à son socket de départ, à savoir Socket_out-1-0.
  8. Socket_out-1-0 demande cette liste à son node Signaux-1.


Propagation vers l'aval, lire dans le sens des flèches :

Signaux(1) → Socket_out(1.0) → Edge((1.0),(2.0)) → Socket_in(2.0) → Moyennes(2) → Socket_out(2.0) → Edge((2.0),(3.0)) → Socket_in(3.0) → Plot(3)

  1. Signaux-1 retourne cette liste : ["Signaux-1 : Cosinus", "Signaux-1 : Carré"] à son Socket_out-1-0.
    • Il s'agit des signaux sélectionnés par les cases cochées.
  2. Socket_out-1-0 fournit cette liste à tous les edges connectés (ici, il n'y en a qu'un).
  3. L'edge fournit la liste à son socket d'arrivée, à savoir Socket_in-2-0.
  4. Socket_in-2-0 fournit cette liste à son node MM-2.
  5. MM-2 fait son traitement : Sortie0 = f(["Signaux-1 : Cosinus", "Signaux-1 : Carré"], paramètres) ...
    • ... puis retourne ce résultat à son socket de sortie Socket_out-2-0.
  6. Socket_out-2-0 fournit cette liste à tous les edges connectés (ici, il n'y en a qu'un).
  7. L'edge fournit cette liste à son socket d'arrivée, à savoir Socket_in-3-0
  8. Socket_in-3-0 fournit cette liste à son node, à savoir Plot-3.

Algorithme de propagation généralisé :


Représentation généralisée d'un node.

Cette apparente complexité est réduite à 4 traitements :

  1. Au niveau du node, le traitement sera fait dans la méthode get_signals(), exclusivement appelée par ses socket_out.
  2. Au niveau des socket_out, nous aurons la @property l_signals, exclusivement appelée par ses edges.
  3. Au niveau des edges, nous aurons la @property l_signals, exclusivement appelée par leur socket_in d'arrivée.
  4. Au niveau des socket_in, nous aurons la @property l_signals, exclusivement appelée par leur node.
Remarque : Les socket_in et les socket_out faisant partie de la même classe CtrlSocket, les traitements 2 et 4 seront groupés dans la même @property l_signals.


Création des paramètres dynamiques.


Propagation des paramètres :


Seuls les paramètres communs sont affichés.
 

  • Pourquoi ?
    • Les paramètres affichés dans le dockable sont ceux du dictionnaire od_params, construit à partir de  2 dictionnaires : od_default et od_real.
      • C'est la méthode Parameters.set_params() qui est chargée de cela.
    • Ces paramètres od_params sont créés dans la méthode CtrlNode.get_default(). Ils sont composés de 3 parties, qui dépendent fortement du type de node (figure ci-contre) :
      1. Les paramètres communs (titre du node, couleurs des sorties) 
      2. Les paramètres fixes, spécifiques au node.
      3. Les paramètres dynamiques, qui dépendent des connexions aux entrées.
    • Or pour l'instant, od_default ne contient que les paramètres  communs.
      • Voir figure au dessus : "Seuls les paramètres communs sont affichés".
  • Solution :
    •  Incorporer les paramètres fixes et dynamiques dans Parameters.od_default.
    • Ajouter à chaque type de node les méthodes spécifiques :
      • fixed_params()
      • my_params qui fournit un motif, appliqué à chaque signal. Ici : Légende, Couleur, Épaisseur, Style.
      • my_signals : signaux fournis par le node, groupés en faisceaux. Un faisceau par sortie.
  • Procédons par ordre :
    • Pour satisfaire aux algorithmes de propagation aval → amont et amont → aval, ainsi que l'incorporation des paramètres fixes et dynamiques, beaucoup de classes vont devoir être complétées : CtrlNode, CtrlSocket, CtrlEdge, ainsi que les classes Node de chaque type de node.
    • Nous allons faire ceci au travers d'exercices progressifs.

 


Node de type 'Plots' à 4 entrées.

Exercice :

Résultat à obtenir.
 


A faire, dans l'ordre :
  1. Classe Parameters, 2 modifications :
    • Déplacer l'affectation de l'attribut Parameters.od_default.
      Depuis Parameters.setup() vers Parameters.set_params() :
          def setup(self):
              self.o_scene = self.o_parent if self.o_parent.s_id == 'CtrlScene' else self.o_parent.o_scene
              self.od_real = self.o_scene.o_pkl.od_pkl
              self.layout = self.o_scene.o_main.params_layout     # Nom donné dans Qt Designer.
              self.od_params = Dictionary()
              self.set_params()
      
          def set_params(self):
              """ Les paramètres par défaut contiennent des paramètres fixes et des paramètres dynamiques. """
              self.od_default = Dictionary(self.o_parent.get_default())
              ll_keys = self.od_default.key_list()
              """ self.od_params n'est instancié qu'une fois (setup()), mais affecté plusieurs fois (ici). """
              self.od_params.clear()          # self.od_params conserve son adresse-mémoire.
              for l_keys in ll_keys:
                  v_default = self.od_default.read(l_keys)
                  if isinstance(v_default, list):
                      v_default = v_default[0]
                  l_k = [self.o_parent.s_id] + l_keys
                  self.od_params.write(l_keys, self.od_real.read(l_k, v_default))
      

      Cela permet de rafraîchir les paramètres dynamiques à chaque modification.
       

    • Amélioration de la méthode dic2params(). Ajout du type de paramètre text (Champ texte multi-lignes).
      Parameters.dic2params() :

          def dic2params(self):
              r""" La valeur de retour est une liste formatée pour satisfaire aux conditions imposées par pyqtgraph.Parameter.
                  Cette liste contient un ou plusieurs dictionnaires, contenant à leur tour :
                      - Des paramètres (nom, type, valeur par défaut, valeur réelle, paramètres optionnels)
                      - Des 'children' de type 'group' qui contiennent à leur tour une liste au même format. <-- list()
                  L'ensemble des paramètres constitue donc une arborescence (donc hiérarchique) assez complexe.
                  Voir exemple : D:\anaconda\envs\robot\Lib\site-packages\pyqtgraph\examples\parametertree.py
              """
              def _compute_val(l_keys, key, val):
                  """ Cette méthode produit un dictionnaire (5 clés ou plus) selon les exemples ci-après :
                  Exemple1 :
                      - Avec  l_keys = ["Paramètres du graphe 'Londres'"]
                              key = "Nom de l'onglet"
                              val = "Choisir un nom"
                      - On obtient : dic = {
                              'name': "Nom de l'onglet",
                              'default': 'Choisir un nom',    # Provient des valeurs par défaut (od_default).
                              'value': 'Stratégie 01',        # Provient des valeurs réelles (od_real).
                              'l_keys': ["Paramètres du graphe 'Londres'", "Nom de l'onglet"],
                              'type': 'str'
                              }
                  Exemple2 :
                      - Avec  l_keys = ["Paramètres du graphe 'Londres'", 'Épaisseurs']
                              key = "Traits"
                              val = [1.1, {'step': 0.1, 'limits': (0.1, 3.0)}]
                      - On obtient : dic = {
                              'step': 0.1,
                              'limits': (0.1, 3.0),
                              'name': 'Traits',
                              'default': 1.1,
                              'value': 1.0,
                              'l_keys': ["Paramètres du graphe 'Londres'", 'Épaisseurs', 'Traits'],
                              'type': 'float'
                              }
                  """
                  dic = dict()
                  if isinstance(val, list):
                      if len(val) != 2 or not isinstance(val[1], dict):
                          return False
                      dic = val[1]
                      val = val[0]
                      if 'values' in dic:
                          dic['type'] = 'list'        # Liste de choix.
                  if key[0] == '$':
                      dic['type'] = 'text'            # Champ texte multilignes.
                      key = key[1:]
                  dic['name'] = key
                  dic['default'] = val
                  dic['value'] = self.od_real.read([self.o_parent.s_id] + l_keys + [key], dic['default'])
                  dic['l_keys'] = l_keys + [key]
                  if 'type' not in dic:               # Si ce n'est pas une liste ...
                      if isinstance(val, bool):
                          dic['type'] = 'bool'
                      elif isinstance(val, int):
                          dic['type'] = 'int'
                      elif isinstance(val, float):
                          dic['type'] = 'float'
                      elif isinstance(val, str):
                          if val and val[0] == '#':
                              dic['type'] = 'color'
                              dic['default'] = pg.mkColor(dic['default'])
                              dic['value'] = pg.mkColor(dic['value'])
                          else:
                              dic['type'] = 'str'     # ... c'est un str.
                  return dic
      
              def _build_params(default_in, list_out, l_keys):
                  """ Fonction récursive.
                  :param default_in: Sous-dictionnaire des valeurs par défaut en entrée.
                  :param list_out: Liste en sortie, formatée pour ParameterTree. Modifiée 'in place'.
                  :param l_keys: clés 'à plat' (liste) des dictionnaires od_default et od_real.
                  :return: list_out, modifiée 'in place'.
                  """
                  for key, val in default_in.items():
                      if isinstance(val, dict):
                          branche = {'name': key, 'type': 'group', 'children': []}
                          list_out.append(branche)
                          l_keys.append(key)          # Ajout d'une clé pour traiter la descendance.
                          _build_params(val, branche['children'], l_keys)
                          del(l_keys[-1])             # Descendance traitée => suppression de la dernière clé.
                      else:
                          """ Traitement déporté pour simplifier la compréhension du code. """
                          dic = _compute_val(l_keys, key, val)    # Retourne un dictionnaire, sauf si erreur.
                          if isinstance(dic, dict):
                              list_out.append(dic)
      
              l_params = list()
              _build_params(copy.deepcopy(self.od_default), l_params, [])
              return l_params
      

      Cela permet d'afficher dans le dockable, le parcours du signal (node après node), sur plusieurs lignes.
       

  2. Classe UiView, 1 modification :

    • Propagation des modifications lors de l'ajout d'un edge. Modifier la méthode stop_edge().
      UiView.stop_edge() :

          def stop_edge(self, ev):
              def break_edge(o_socket_out):
                  """ Remplace un edge par 2 edges. Plusieurs conditions sont nécessaires :
                      - Le socket d'arrivée du mi-edge est déjà occupé par un edge.
                      - Le node de départ a au moins un socket d'entrée libre.
                      - Le tuple formé par ce socket d'entrée et le socket de sortie en cours est dans l_short_circuits.
                  """
                  o_node = self.o_mid_edge.o_socket_out.o_node            # Node de départ.
                  num_socket_out = self.o_mid_edge.o_socket_out.t_id[1]   # N° du socket de sortie en cours de liaison.
                  for s_c in o_node.l_short_circuits:
                      """ Recherche des sockets d'entrée pouvant être court-circuités avec le socket de sortie en cours. """
                      if s_c[1] == num_socket_out:
                          o_socket_in = o_node.lo_sockets_in[s_c[0]]
                          if len(o_socket_in.get_gredges()) == 0:
                              """ Construction de l'edge supplémentaire, à condition que l'entrée soit libre. """
                              CtrlEdge(self.o_scene, o_socket_out, o_socket_in)
      
              if self.o_mid_edge.__class__.__name__ == 'CtrlEdge':
                  """ Reset. """
                  self.normal_size_sockets()          # Rétablit les tailles normales aux sockets.
                  self.o_scene.o_grscene.removeItem(self.o_mid_edge.o_gredge)     # Effacement du mi_edge.
      
                  """ Drop = fin du drag. """
                  if self.is_dropable(ev):
                      gr_item = self.itemAt(ev.pos())     # UiSocket
      
                      """ Suppression d'un éventuel edge connecté à ce socket. """
                      for o_gredge in gr_item.o_socket.get_gredges():
                          break_edge(o_gredge.o_edge.o_socket_out)        # "Dé-court-circuit" : remplace un edge par 2 edges.
                          self.o_scene.o_grscene.removeItem(o_gredge)     # Effacement.
      
                      """ Création d'un edge complet. """
                      CtrlEdge(self.o_scene, self.o_mid_edge.o_socket_out, gr_item.o_socket)
                      gr_item.o_socket.to_update()     # Paramètres des nodes en aval à updater.
      
                      """ Enregistrement. """
                      self.o_scene.save_edges(backup=True)
      
                  """ Destruction de l'objet. """
                  self.o_mid_edge = None
      

      Cela permet de propager la baisse du drapeau b_updated_out vers tous les sockets en aval.
       

  3. Modifier les fichiers signaux.py et plot.py pour prendre en compte les adaptations nécessaires.
    /nodes/generateurs/signaux.py :
    from pc.ctrl_node import CtrlNode
    d_datas = {
        'name': 'Signaux',
        'icon': 'signaux.png',
    }
    
    
    class Node(CtrlNode):
        def __init__(self, o_scene, s_id, pos):
            super().__init__(o_scene, s_id, pos)
            self.type = 'Signaux'
            self.setup()
    
        def setup(self, child_file=__file__):
            super().setup(child_file)
    
        @property
        def ld_outputs(self):
            return [{
                'label': 'Sortie',
                'label_pos': (-38, -10)
            }]
    
        def fixed_params(self):
            return {
                'Sinus': True,
                'Cosinus': False,
                'Carré': False,
                'Triangle': False,
                'Dent de scie montante': False,
                'Dent de scie descendante': False,
                'Amplitude': 20,
                'Points par période': 100,
            }
    
        def my_signals(self, l_signals_in, num_socket_out):
            """ Voir documentation dans la classe-mère : CtrlNode.my_signals().
            :param l_signals_in: Liste toujours vide car ce node n'a pas d'entrée.
            :param num_socket_out: Ce node n'a qu'une sortie : N° 0.
            :return: Liste de tuples. Pour chaque signal actif, un tuple de 4 valeurs.
            """
            l_signals = list()
    
            # à coder ...
    
            return l_signals
    
        def refresh(self, l_keys):
            """ Propagation en cascade : Seulement sur les noms de signaux (type valeur = bool). """
            if isinstance(self.get_param(l_keys[-1]), bool):
                self.lo_sockets_out[0].to_update()
    
            super().refresh(l_keys)
    
    Vous devrez coder la méthode my_signals().

    /nodes/afficheurs/plot.py :
    from pc.ctrl_node import CtrlNode
    d_datas = {
        'name': 'Plot',
        'icon': 'plot.png',
    }
    
    
    class Node(CtrlNode):
        def __init__(self, o_scene, s_id, pos):
            super().__init__(o_scene, s_id, pos)
            self.type = 'Plot'
            self.setup()
    
        def setup(self, child_file=__file__):
            super().setup(child_file)
    
        @property
        def ld_inputs(self):
            return [{      # Liste de 1 dictionnaire.
                'label': 'Entrée',
                'label_pos': (6, -10)
            }]
    
        def fixed_params(self):
            return {
                'Graphique': {
                    'Titre': 'titre du graphique',
                }
            }
    
        def my_params(self, context):
            """ Voir documentation dans la classe-mère : CtrlNode.my_params(). """
            d_params = dict()
    
            # à coder ...
    
            return d_params
    

    Vous devrez coder la méthode my_params().
     

  4. Modifier les classes CtrlSocket et CtrlEdge pour prendre en compte les adaptations nécessaires.
    fichier ctrl_socket.py :
    from pc.ui_socket import UiSocket
    
    
    class CtrlSocket:
        def __init__(self, o_node, d_datas):
            self.o_node = o_node
            self.d_datas = d_datas                  # Dictionnaire. clés : label, label_pos, id, is_input
            self.t_id = self.d_datas['id']          # (id_node: int, num_socket: int) <- tuple.
            self.b_input = self.d_datas['pos'][0]   # Position Hori. physique : Entrée (True) à gauche, Sortie à droite.
            self.pos = self.d_datas['pos'][1]       # Position Vert. physique, tient compte des séparateurs.
            self.label = self.d_datas['label']
            self.t_label_pos = self.d_datas['label_pos']
            self.o_grsocket = None                  # Objet UiSocket.
            self.color = '#bbb'                     # Gris clair pour les socket_in libres (non connectés).
            self.b_updated_out = False
            self._l_signals = list()
            self.setup()
    
        def setup(self):
            if not self.b_input:
                """ Socket de sortie. """
                l_keys = ['Couleurs', self.label]
                self.color = self.o_node.get_param(l_keys, '#666')
            self.o_grsocket = UiSocket(self)
    
        @property
        def l_signals(self):
            """ Si les signaux sont à jour, on ne fait rien. """
            if not self.b_updated_out:
                """ Si les signaux ne sont pas à jour, on les régénère. """
                if self.b_input:
                    """ Socket d'entrée - code appelant : son propre CtrlNode. """
                    lo_gredges = list(self.get_gredges())
                    if not lo_gredges:
                        return []
                    """ On délègue la recherche des signaux à l'edge connecté. """
                    self._l_signals = lo_gredges[0].o_edge.l_signals    # [0] : Une entrée reçoit un edge au maximum.
                else:
                    """ Socket de sortie - codes appelant : Les CtrlEdge connectés à ce socket_out. """
                    self._l_signals = self.o_node.get_signals(num_socket=self.t_id[1])
                self.b_updated_out = True
    
            return self._l_signals
    
        def to_update(self):
            self.b_updated_out = False
            if self.b_input:                # Socket in
                """ Mise à jour des paramètres dynamiques de son node. """
                self.o_node.o_params.set_params()
    
                """ Propagation vers tous les sockets de sortie de son propre node. """
                for o_socket_out in self.o_node.lo_sockets_out:
                    o_socket_out.to_update()
            else:                           # Socket out
                """ Chaque edge de ce socket (sortie) propage la baisse du drapeau au socket (entrée) à l'autre bout. """
                for o_gredge in self.get_gredges():
                    o_gredge.o_edge.o_socket_in.to_update()
    
        def get_gredges(self):
            """ Retourne un set contenant tous les edges connectés à ce socket.
            Remarque :
                - Un socket d'entrée peut être connecté à 0 ou 1 edge.
                - Un socket de sortie peut être connecté à 0, 1 ou plusieurs edges.
            """
            s_gredges = set()
            for gredge in self.o_node.o_scene.o_grscene.items():
                if gredge.__class__.__name__ == 'UiEdge':
                    """ Choix du socket accroché à la bonne extrémité du edge. """
                    o_socket = gredge.o_edge.o_socket_in if self.b_input else gredge.o_edge.o_socket_out
                    if o_socket == self:
                        s_gredges.add(gredge)
            return s_gredges
    
        @property
        def position(self):
            """ Les positions sont optimisées pour une grille de maille 16. """
            h_title = self.o_node.d_display['geometry']['h_title']
            first_y = 16*(1+(h_title+4)//16)    # Multiple de 16
            place_y = self.pos                  # num socket
            y = first_y + place_y * 16
            x = 0 if self.b_input else self.o_node.width
            return x, y
    

    fichier ctrl_edge.py :
    from pc.ui_edge import UiEdge
    
    
    class CtrlEdge:
        def __init__(self, o_scene, o_socket_out, o_socket_in):
            """
            :param o_scene: objet CtrlScene
            :param o_socket_out: objet CtrlSocket
            :param o_socket_in: objet CtrlSocket ou QPointF
            """
            self.o_scene = o_scene
            self.o_gredge = None
            self.o_socket_out = o_socket_out    # Attention ! risque de confusion : Socket FROM (de départ)
            self.o_socket_in = o_socket_in      # Attention ! risque de confusion : Socket  TO  (d'arrivée)
            self.color = '#bbb'
            self.setup()
    
        def setup(self):
            self.o_gredge = UiEdge(self)                    # Dessin de l'edge dans l'UI.
            self.o_scene.o_grscene.addItem(self.o_gredge)   # Ajout de l'edge dans la scène.
    
            """ Couleurs, sauf si l'edge est en cours de construction. """
            if self.o_socket_in.__class__.__name__ == 'CtrlSocket':
                self.color = self.o_socket_out.color
                self.o_socket_in.color = self.color         # On impose sa couleur au socket_in d'arrivée.
    
        def get_tid(self):
            """ Renvoie un tuple de tuples. Exemple : (90, 0), (91, 4) """
            return self.o_socket_out.t_id, self.o_socket_in.t_id
    
        @property
        def l_signals(self):
            return self.o_socket_out.l_signals
    
        @property
        def end_points(self):
            """ - Renvoie un tuple : les points de départ et d'arrivée.
                - Les coordonnées d'un socket sont relatives à celles de son node.
                - Il faut donc les additionner pour obtenir les coordonnées absolues."""
            """ Point de départ. """
            pos_node_from = self.o_socket_out.o_node.o_grnode.pos()     # type QPointF
            pos_socket_out = self.o_socket_out.o_grsocket.pos()         # type QPointF
    
            """ Point d'arrivée. """
            if self.o_socket_in.__class__.__name__ == 'QPointF':
                """ Edge en construction : pos_to = position de la souris. """
                pos_to = self.o_socket_in                               # type QPointF
            else:
                pos_node_to = self.o_socket_in.o_node.o_grnode.pos()    # type QPointF
                pos_socket_in = self.o_socket_in.o_grsocket.pos()       # type QPointF
                pos_to = pos_node_to + pos_socket_in
    
            """ Les QPointF peuvent s'additionner. """
            return pos_node_from + pos_socket_out, pos_to    # Tuple.
    

     
  5. La classe CtrlNode : De nouvelles méthodes sont ajoutées.
    Elles sont fournies ci-dessous, mais prenez le temps de les étudier, voire même de les améliorer.
    • deepcopy() : Utilitaire, permettant une copie profonde des variables par référence (listes, dictionnaires, ...)
    • get_signals() : Fournit les signaux de sortie à partir des signaux d'entrée et des paramètres. Voir figure "Représentation généralisée d'un node".
    • join() : Utilitaire. Permet une concaténation aisée d'éléments (str, int, ...), en ignorant les chaines.vides.
    • @property ld_inputs() : Remplace l'ancien attribut ld_inputs.
    • @property ld_outputs() : Remplace l'ancien attribut ld_outputs.
    • my_params() : Doit obligatoirement être surchargée par les classes dérivées.
    • my_signals() : Doit obligatoirement être surchargée par les classes dérivées.
      fichier ctrl_node.py :
      from ui_node import UiNode
      from functions.utils import Dictionary, DateTime, Utils
      from parameters import Parameters
      from ctrl_socket import CtrlSocket
      from ctrl_edge import CtrlEdge
      import copy
      import os
      
      
      class CtrlNode:
          def __init__(self, o_scene, s_id, pos):
              self.o_scene = o_scene
              self.s_id = s_id  # Clé d'entrée dans le super-dictionnaire pkl.
              self.od_pkl = self.o_scene.o_pkl.od_pkl
              self.o_grnode = None
              self.o_params = None
              self.pos = pos  # pos = (x, y)
              self.width = 96
              self.height = 60
              self.type = ''
              self.ut = Utils()
              self.main_key = f"Paramètres du node '{self.s_id}'"
              self.b_chk = self.od_pkl.read([self.s_id, 'b_chk'], True)
              self.b_on = self.b_chk  # Provisoire.
              self.child_file = ''
      
              """ Sockets. """
              self.lo_sockets_in = list()  # Liste d'objets 'sockets in'  instanciés.
              self.lo_sockets_out = list()  # Liste d'objets 'sockets out' instanciés.
              self.default_color = '#88bbff'  # Couleur par défaut des sockets de sortie.
      
              """ Secousses => Court-circuits. """
              self.dt = DateTime()
              self.l_short_circuits = list()  # Surchargé par les classes dérivées.
              self.l_shortables = list()
              self.b_removable = True  # Les edges d'entrée peuvent être détruits en cas de court-circuit.
              self.k_shake = 0  # Nombre de secousses.
              self.b_shaking = False  # Secousse en cours.
              self.x, self.y = 0, 0  # Position antérieure (x, y), mémorisée.
              self.dx, self.dy = 0, 0  # Deltas (variations) antérieurs, mémorisés.
      
          def setup(self, child_file):
              """ Code appelant : classe dérivée, on connait donc ses attributs. """
              """ Paramètres. """
              self.o_params = Parameters(self)
      
              """ Hauteur automatique du node : elle dépend du nombre de sockets en entrée ou en sortie (le plus grand).  """
              h_title = self.d_display['geometry']['h_title']
              nb_sockets_max = max(len(self.ld_inputs), len(self.ld_outputs))  # En fait : Sockets + séparateurs.
              first_y = 16 * (1 + (h_title + 4) // 16)  # Multiple de 16.
              self.height = first_y + nb_sockets_max * 16 - 4
      
              """ Nouveau node par drag & drop. """
              self.child_file = child_file
              b_new = self.pos != (0, 0)  # Nouveau node (node dropé).
              if b_new:
                  """ - Cas d'un nouveau node (dropé).
                      - Son centre apparaît exactement à la position du curseur de la souris.
                      - On mémorise son path et sa position. """
                  self.pos = self.pos[0] - self.width // 2, self.pos[1] - self.height // 2
                  path = 'nodes' + os.path.relpath(child_file).split('nodes')[1][:-3].replace(os.sep, '.')
                  self.od_pkl.write([self.s_id, 'path'], path)
              else:
                  self.pos = self.od_pkl.read([self.s_id, 'position'], (0, 0))
      
              """ Instanciation de l'UI. """
              self.o_grnode = UiNode(self)  # Gestion de l'UI (Interface utilisateur) : dessin couleurs, ...
              if b_new:
                  self.o_grnode.save_pos()  # Important ! à la fin (contient o_pkl.backup()).
              self.o_scene.o_grscene.addItem(self.o_grnode)  # Incorporation dans la scène.
      
              """ Sockets. """
              num_socket = 0
              for i, d_input in enumerate(self.ld_inputs):
                  """ On complète les dictionnaires de base. """
                  if isinstance(d_input, dict):  # Si ce n'est pas un dictionnaire, c'est un séparateur.
                      d_input['pos'] = True, i  # On ajoute l'emplacement. Tuple (input: True, num_position).
                      d_input['id'] = self.id, num_socket  # On ajoute l'identifiant. Tuple (id_node, num_socket)
                      self.lo_sockets_in.append(CtrlSocket(self, d_input))  # Socket instancié et ajouté à la liste.
                      num_socket += 1
              num_socket = 0
              for i, d_output in enumerate(self.ld_outputs):
                  """ On complète les dictionnaires de base. """
                  if isinstance(d_output, dict):  # Si ce n'est pas un dictionnaire, c'est un séparateur.
                      d_output['pos'] = False, i  # On ajoute l'emplacement. Tuple (input: False, num_position).
                      d_output['id'] = self.id, num_socket  # On ajoute l'identifiant. Tuple (id_node, num_socket)
                      self.lo_sockets_out.append(CtrlSocket(self, d_output))  # Socket instancié et ajouté à la liste.
                      num_socket += 1
      
              self.short_circuit_check()
      
          @property
          def id(self):
              """ Ex : 'Node 14' renvoie 14. """
              return int(self.s_id[4:])
      
          @property
          def d_display(self):
              """ Attributs par défaut pouvant être surchargés dans les classes dérivées. """
              return {
                  'geometry': {'h_title': 32, 'round': 6},
                  'col_pen': {'select': '#ffa637', 'hover': '#888', 'on': "#000", 'off': '#aaaa00'},  # ordre = priorité.
                  'col_brush': {'on': "#8888bbcc", 'off': '#cccccccc'},
                  'thick_pen': {'hover': 4., 'leave': 2.}
              }
      
          @staticmethod
          def deepcopy(reference):
              return copy.deepcopy(reference)
      
          def get_default(self):
              """ 1) Paramètres communs : Titre, et autant de couleurs que de sorties. """
              d_default = {'Titre du node': 'Choisir un titre'}
              d_colors = dict()
              for d_output in self.ld_outputs:
                  if isinstance(d_output, dict):
                      d_colors[d_output['label']] = self.default_color
              if d_colors:
                  d_default['Couleurs'] = d_colors
      
              """ 2) Ajout de paramètres fixes spécifiques au type de node. """
              d_default.update(self.fixed_params())       # méthode fixed_params()
      
              """ 3) Ajout des paramètres dynamiques, qui dépendent des signaux entrants. """
              d_default.update(self.dynamic_params)       # propriété dynamic_params
      
              """ Dictionnaire complet. Paramètres : fixes communs + fixes spécifiques + dynamiques. """
              return {self.main_key: d_default}
      
          def get_param(self, l_keys, v_default=None):
              od_params = Dictionary(self.o_params.od_params[self.main_key])
              return od_params.read(l_keys, v_default)
      
          def set_checked(self, val):
              self.b_chk = val
              self.b_on = self.b_chk  # Provisoire.
      
              """ Enregistrement. """
              self.od_pkl.write([self.s_id, 'b_chk'], val)
              self.o_scene.o_pkl.backup()
      
          @staticmethod
          def join(*l_variants, sep='-'):
              """
              Concatène les éléments de l_strings en une chaine. Les chaînes vides sont ignorées. le séparateur est sep.
              :param l_variants: Nombre quelconque d'éléments de tout type.
              :param sep: '-' par défaut. Peut être '' (vide) ou \n (retour à la ligne) ... ou autre.
              :return: Chaîne concaténée.
                  Exemple l_strings = 'Paris', 'Lille', '', 'Bruxelles', 'Prague' => return "Paris-Lille-Bruxelles-Prague"
              """
              string_txt = ''
              for string in l_variants:
                  if string != '':
                      string_txt += f'{sep}{string}'  # Accepte tout type de variable pour string.
              return string_txt[len(sep):]            # Suppression du 1er sep.
      
          """ ************************* Méthodes à surcharger ************************** """
          @property
          def ld_inputs(self):
              return list()
      
          @property
          def ld_outputs(self):
              return list()
      
          def fixed_params(self):
              """ - Renvoie un dictionnaire de paramètres, indépendants des connexions.
                  - Exemple : le node de type 'Signaux' renvoie 'Sinus', 'Cosinus', etc. """
              return {}
      
          def my_params(self, context):
              """
              :param context: Liste de 7 valeurs, pouvant être utilisée pour construire le dictionnaire de sortie :
                  - context[0] typ_id_from = Type + id du node en amont, producteur du signal. Ex : 'MM12'.
                  - context[1] signal_ante_from = Nom du signal créé par le node en amont du node producteur. Ex : 'Cosinus'.
                  - context[2] signal_now_from = Nom du signal créé par le node en amont. Ex : 'SMA18'.
                  - context[3] signal_source_from = 'Provenance du signal' affiché dans le node en amont.
                  - context[4] num_signal = Ordre du signal dans le faisceau entrant.
                  - context[5] signal_title = Titre du signal dans le dockable des paramètres.
                  - context[6] signal_source = 'Provenance du signal' -> texte avec retours à la ligne.
              :return: Dictionnaire formaté pour les paramètres.
                  |_ Exemple {'Type de MA': ['SMA', {'values': ['SMA', 'EMA', 'SMMA', 'LWMA']}], "Périodes (sep=',')": '14'}
              """
              raise SystemExit(self.child_file + '\nCtrlNode.my_params() : Cette méthode doit être surchargée.')
      
          def my_signals(self, l_signals_in, num_socket_out):
              """ Description des signaux délivrés à la sortie N° num_socket_out.
              Comment ça marche ?
                  - Cette fonction prend en entrée tous les signaux entrants ainsi que les paramètres pkl.
                  - Un traitement spécifique est décrit, qui produit plusieurs signaux.
                  - Ces signaux sont ensuite distribués aux différentes sorties du node.
              :param l_signals_in: Signaux d'entrée. Cette liste contient autant d'éléments que d'entrées connectées.
                  - Chaque élément est un tuple : (typ_id_from, signal_ante_from, signal_now_from, signal_source_from, \
                  num_signal, signal_title, typ_id, signal_ante, signal_source)
                  (Les variables suffixées de '_from' proviennent du node en amont).
              :param num_socket_out: N° de la sortie.
              :return: Liste de signaux exclusivement destinés à la sortie N° num_socket_out.
                  Chaque signal est décrit par un tuple à 4 valeurs : (typ_id, signal_ante, signal_now, signal_source).
              """
              raise SystemExit(self.child_file + '\nCtrlNode.my_signals() : Cette méthode doit être surchargée.')
          """ *********************** Fin - Méthodes à surcharger ********************** """
      
          @property
          def dynamic_params(self):
              """ Code appelant : self.get_default().
              Ces paramètres sont dynamiques car ils dépendent des signaux connectés aux entrées.
              - Ce node demande la liste des signaux à chaque socket d'entrée : o_socket_in.l_signals
              - Cette liste est formatée de la même façon pour tous les types de node.
              - C'est une liste de tuples, chaque tuple a 4 valeurs et caractérise un signal.
              - Il est rappelé qu'un edge véhicule non pas un signal, mais un faisceau de signaux (un vecteur).
              - Le format d'un tuple est le suivant :
                  - typ_id_from = Type + id du node en amont, producteur du signal. Ex : 'MM12'.
                  - signal_ante_from = Nom du signal créé par le node en amont du node producteur. Ex : 'Cosinus'.
                  - signal_now_from = Nom du signal créé par le node producteur. Ex : 'SMA18'.
                  - signal_source_from = Texte destiné à être affiché dans le dockable, dans 'Provenance du signal :'
              - Node
                  |_ Socket_in
                      |_ Faisceau de signaux
                          |_ Signal
                              |_ Paramètre dynamique.
              """
              if not self.lo_sockets_in:
                  """ Tant que l'initialisation n'est pas terminée, lo_sockets_in est une liste vide."""
                  return {}
      
              d_params = dict()
      
              # à coder ...
      
              return d_params
      
          def get_signals(self, num_socket):
              """ Code appelant : oSocket_out N° num_socket.
              - Ce node crée des signaux et les fournit aux nodes en aval par l'intermédiaire des sockets et des edges.
              - Il crée autant de faisceaux de signaux (vecteurs) qu'il a de sorties.
              - Pour chaque sortie, la liste fournie est formatée de la même façon pour tout type de node.
              - Voir détails ci-dessus, dans dynamic_params().
              """
              if not self.lo_sockets_out:  # Setup en cours
                  return []
      
              """ Update od_params. """
              self.o_params.set_params()
      
              nb_inputs = len(self.lo_sockets_in)
              l_all_signals_in = list()
      
              # à coder ...
      
              return self.my_signals(l_all_signals_in, num_socket)
      
          """ ******************************* Secousses ******************************* """
          def short_circuit_check(self):
              """ Passage unique dans cette méthode, au démarrage. Fin programme si erreur.
                  - Peut-on supprimer les edges d'entrée en cas de court-circuit ?
                  - Oui, à condition que TOUTES les sorties existent dans self.l_short_circuits. """
              l_indx_outputs = list()  # Liste des index de sortie.
              d_counter = dict()
              for short_circuit in self.l_short_circuits:  # Exemple -> self.l_short_circuits = [(0, 0), (0, 2)]
                  num_input = short_circuit[0]
                  num_output = short_circuit[1]
                  if num_input >= len(self.ld_inputs):
                      """ Fin programme. """
                      raise SystemExit(f"La liste des court-circuits du type de node '{self.type}' est erronée."
                                       f"\nL'entrée N°{num_input} est hors limites."
                                       f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")
                  if num_output >= len(self.ld_outputs):
                      """ Fin programme. """
                      raise SystemExit(f"La liste des court-circuits du type de node '{self.type}' est erronée."
                                       f"\nLa sortie N°{num_output} est hors limites."
                                       f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")
                  l_indx_outputs.append(num_output)  # Dans cet exemple ->                   ( 0  ,    2 )
                  if num_output in d_counter:
                      d_counter[num_output] += 1
                  else:
                      d_counter[num_output] = 1
              for i, d_output in enumerate(self.ld_outputs):
                  if isinstance(d_output, dict):
                      if i not in l_indx_outputs:
                          """ Condition non satisfaite. """
                          self.b_removable = False  # b_removable est à True dans __init__().
                          break
              """ Contrainte technologique : Plusieurs entrées sur une même sortie est interdit ! """
              for nb_inputs in d_counter.values():
                  if nb_inputs > 1:
                      """ Fin programme. """
                      raise SystemExit("Plusieurs entrées sont court-circuitables sur une même sortie."
                                       f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")
      
          def set_shortables(self):
              """ Méthode appelée lorsque le bouton gauche de la souris est appuyé (sur ce node).
                  - Chaque type de node possède sa liste de court-circuits faisables.
                  - Retourne une liste de tuples.
                  - Chaque tuple est composé des 2 gredges pouvant être réunis pour n'en faire qu'un. """
              self.l_shortables = list()
              for short_circuit in self.l_short_circuits:  # Exemple -> self.l_short_circuits = [(0, 0), (0, 2)]
                  lo_gredges_in = list(self.lo_sockets_in[short_circuit[0]].get_gredges())
                  lo_gredges_out = list(self.lo_sockets_out[short_circuit[1]].get_gredges())
                  if lo_gredges_in:  # Cette liste contient 0 ou 1 edge.
                      for o_gredge_out in lo_gredges_out:
                          self.l_shortables.append((lo_gredges_in[0], o_gredge_out))
      
          def shake(self):
              def shortcircuit():
                  for to_gredge in self.l_shortables:                         # Tuples de gredges.
                      o_socket_out = to_gredge[0].o_edge.o_socket_out         # From, depuis.
                      o_socket_in = to_gredge[1].o_edge.o_socket_in           # To, jusqu'à.
      
                      """ Suppression des 2 edges du tuple. """
                      self.o_scene.o_grscene.removeItem(to_gredge[1])         # Edge de sortie.
                      if self.b_removable:
                          self.o_scene.o_grscene.removeItem(to_gredge[0])     # Edge d'entrée.
      
                      """ Création du nouvel edge de court-circuit. """
                      CtrlEdge(self.o_scene, o_socket_out, o_socket_in)
                      o_socket_in.to_update()
      
                  """ Persistance. """
                  self.o_scene.save_edges(backup=True)
      
              def raz_shakes():
                  """ Après cette raz, un nouveau court-circuit est autorisé. """
                  self.k_shake = 0
                  self.b_shaking = False
      
              if not self.l_shortables:
                  return
              self.dt.delay(raz_shakes, delay=100)
              x, y = self.o_grnode.pos().x(), self.o_grnode.pos().y()  # Positions.
              dx, dy = x - self.x, y - self.y  # Déplacements (delta x, delta y).
              if (dx * self.dx < 0) or (dy * self.dy < 0):
                  """ Le mouvement a changé de sens => incrémentation du compteur. """
                  self.k_shake += 1
                  if self.k_shake > 5:  # Valeur à ajuster.
                      shortcircuit()
      
              """ Mémorisation. """
              self.x, self.y = x, y
              self.dx, self.dy = dx, dy
          """ ***************************** Fin secousses ***************************** """
      
          def set_colors(self, l_keys):
              """ - La couleur est appliquée au socket_out concerné.
                  - Elle est également appliquée à tous ses éventuels edges connectés.
                  - A leur tour, ceux-ci propagent cette couleur à leur socket d'arrivée. """
              for o_socket_out in self.lo_sockets_out:
                  if o_socket_out.__class__.__name__ == 'CtrlSocket':
                      if o_socket_out.label == l_keys[-1]:
                          """ Affectation de la couleur au socket_out concerné. """
                          o_socket_out.color = self.get_param(l_keys, '#666')
                          """ Récupération de tous les edgess partant de ce socket. """
                          l_gredges = list(o_socket_out.get_gredges())
                          for o_gredge in l_gredges:
                              """ Affectation de la même couleur à chaque edge. """
                              o_gredge.o_edge.color = o_socket_out.color
                              """ Affectation de la même couleur à chaque socket d'arrivée. """
                              o_gredge.o_edge.o_socket_in.color = o_socket_out.color
              """ Rafraîchissement de l'affichage. """
              self.o_grnode.update()
      
          def refresh(self, l_keys):
              """ Titre du node. """
              if l_keys[-1] == 'Titre du node':
                  self.o_grnode.set_title()
              """ Couleur des sockets et des edges. """
              if l_keys[-2] == 'Couleurs':
                  self.set_colors(l_keys[1:])
      

      Vous devrez coder les méthodes dynamic_params() et get_signals().


Vérification

Pas de TDD, mais les résultats à obtenir sont ci-dessous.
  • Retirer le mode debug.
  • On conserve le même graphe : Un node Plot connecté à un node Signaux :



 

 

 

 

Bon coding et bon courage !


Snippets

Essayez de résoudre cette fonctionnalité par vous-même.
Consultez les réponses (snippets) seulement si vous n'avez pas trop de temps.

Bonjour les codeurs !