Stockage local des cours de devises 'Candles'
Avant-propos
Description
candle.sql
← dans ce tuto.tick.sql
← dans le prochain tuto.renko.sql
← dans le prochain tuto.DataBase
← Classe-mère, dans le fichier /trading/historiques/db.py
.DbCandle
← Hérite de DataBase, dans le fichier /trading/historiques/db_candle.py
.DbTick
← Hérite de DataBase, dans le fichier /trading/historiques/db_tick.py
.DbRenko
← Hérite de DataBase, dans le fichier /trading/historiques/db_renko.py
./trading/historiques/ctrl_histos.py
CtrlHistos.__init__()
, la variable db_path
a été définie par vous-même :
SSD
de préférence).ctrl_histos.py
.db_path
, et de son arborescence devront être sauvegardés.
utils.py
:
Utils.printc()
.Utils.gauge()
.DateTime.isoyw_from_fxyw()
← yw
signifie (year, week).DateTime.get_stamp_offset()
.utils.py, contenant entre autres, les classes Utils et DateTime
:
# Imports externes
from PyQt5.QtWidgets import QMessageBox
from PyQt5.QtCore import QSettings, QTimer, QRect, QTranslator, QLocale, QLibraryInfo, QFileSystemWatcher
import subprocess
import threading
import inspect
from collections import OrderedDict
import datetime
import time
import copy
import os
import sys
import winsound
import colorama
colorama.init()
class Dictionary(OrderedDict):
def __init__(self, dic=None, max_keys=0):
"""
:param dic: dict() or None.
:param max_keys: Nombre max de clés (de depth=0) autorisé dans le dictionnaire :
- Si -1 → infini.
- Sinon, gestion de l'ordre des clés lors de l'écriture :
- Les plus récentes en tête.
- Suppression de celles dont l'ordre est > max_keys.
"""
super().__init__()
self.max_keys = max_keys
if isinstance(dic, dict):
""" 'self' est un dictionnaire vide. """
self.update(dic)
def print(self, dic=None, length=60, dash='-', depth=0, blank=2):
"""
- Méthode récursive (qui s'appelle elle-même), d'où la présence du depth.
- Elle affiche plusieurs lignes, chacune se terminant par "<class '...'>"
:param dic: Dictionnaire à afficher.
:param length: Nombre de caractères, ou, plus précisément, position pour chaque ligne du texte "<class '...'>"
:param dash: Symbole '-', '_', '.', ' ', ou tout autre caractère.
:param depth: Le 1er élément a un depth de 0. S'il contient à so tour un dictionnaire,
celui-ci aura un depth de 1, et ainsi de suite.
:param blank: 0, 1, 2 ou 3
0 = pas de ligne vide avant ni après l'affichage du dictionnaire.
1 = Une ligne vide avant l'affichage du dictionnaire.
2 = Une ligne vide après l'affichage du dictionnaire.
3 = Une ligne vide avant ET après l'affichage du dictionnaire (valeur par défaut).
:return: NA, le retour est en fait une succession de print() dans la console 'run'.
En cas d'erreur :
- Si dic vide, afficher 'Dictionnaire vide.'
- Si dic n'est pas un dictionnaire, afficher :
f"Vérifie dico : Le paramètre reçu n'est pas un dictionnaire.\nType : {type(dic)}\nValeur : {dic}"
"""
if dic is None:
dic = self
if len(dic) == 0:
print('Dictionnaire vide.')
return
if blank & 1: # Bit 0 : Espace avant (ligne vide)
print()
try:
for key, val in dic.items(): # Attention ! key peut être de type autre que str.
if isinstance(val, dict):
print(self.dashes(f"{' ' * 4 * depth}{key}:", length, type(val), dash))
depth += 1
self.print(val, length, dash, depth, blank=0)
depth -= 1
else:
print(self.dashes(f"{' ' * 4 * depth}{key}: {val}", length, type(val), dash))
except (Exception,):
print(f"Vérifie dico : Le paramètre reçu n'est pas un dictionnaire."
f"\nType : {type(dic)}\nValeur : {dic}")
return
if blank & 2: # Bit 1 : Espace après (ligne vide)
print()
@staticmethod
def dashes(before, nb, after, dash='-'):
""" Formatage de chaine : https://pyformat.info/ <-- Ctrl + Clic
Exemple : " position: (-29, -25) ------------------------- <class 'tuple'>"
<------- before -------> <---- nb de tirets ----> <--- after --->
<------- nb = before + 1 + nb de tirets --------->
:param before: Chaîne avant les tirets.
:param nb: taille de before + 1 + Nombre de tirets.
:param after: Chaîne après les tirets.
:param dash: Symbole '-', '_', '.', ' ', ou tout autre caractère.
:return: Chaîne complète, exemple → " position: (-29, -25) ------------------------- <class 'tuple'>"
"""
return f"{before + ' ' :{dash}<{nb}} {after}"
def key_list(self):
"""
Mise à plat des clés arborescentes des feuilles.
:return: Liste des clés des feuilles. Chaque clé est elle-même une liste.
"""
def recursive(dico, l_keys):
for key, val in dico.items():
if isinstance(val, dict):
recursive(val, l_keys + [key])
else:
ll_keys.append(l_keys + [key])
ll_keys = list() # ll_ = Liste de listes
recursive(self, [])
return ll_keys
def read(self, l_keys, default=None):
"""
:param l_keys: Si c'est une liste, elle correspond à la hiérarchie dans l'arbre.
:param default: Valeur par défaut à retourner lorsque la recherche échoue.
:return: Valeur de la dernière clé de la liste -> sans enfant (feuille).
"""
if isinstance(l_keys, str):
l_keys = l_keys, # str devient tuple
dic = self
try:
for l_key in l_keys[:-1]:
dic = dic[l_key]
return dic[l_keys[-1]]
except (Exception,):
return default
def write(self, l_keys, value):
"""
Important : Si la clé n'existe pas, elle est créée.
:param l_keys: Si c'est une liste, elle correspond à la hiérarchie dans l'arbre.
:param value: Valeur affectée à la dernière clé de la liste -> sans enfant (feuille).
:return: Booléen de réussite.
"""
if isinstance(l_keys, str):
l_keys = l_keys, # str devient tuple
dic = self
try:
for l_key in l_keys[:-1]:
if l_key not in dic:
dic[l_key] = dict() # Création de la clé si inexistante.
dic = dic[l_key]
dic[l_keys[-1]] = value
self._sort_cut(l_keys[0])
return True
except (Exception,):
return False
def _sort_cut(self, master_key):
""" SORT AND CUT
Méthode appelée lorsqu'on accède au dictionnaire, en lecture comme en écriture.
self.max_keys est le nombre maximum admis de clés de 1er niveau (depth=0).
- Si self.max_keys <= 0 (par défaut) le nombre maximum admis est infini.
- Si self.max_keys > 0, tri en remontant la clé master_key en 1ère place, puis ...
|_ ... supprime les clés de depth 0 dont l'index est supérieur ou égal au max autorisé.
:param master_key: Clé à remonter (de depth 0).
:return: NA
"""
if self.max_keys > 0: # Si <= 0, infini.
self.move_to_end(master_key, last=False) # Remonte en tête.
for i, (key, val) in enumerate(self.items()):
if i >= self.max_keys:
self.delete(key,)
def delete(self, l_keys):
"""
Suppression, si elle existe, de la clé correspondant à l_keys.
:param l_keys: liste des clés, ordonnées hiérachiquement.
:return: True si la clé a effectivement été supprimée.
False si échec ou si la clé est absente.
"""
if isinstance(l_keys, str):
l_keys = l_keys,
dic = self
try:
for l_key in l_keys[:-1]:
dic = dic[l_key]
del dic[l_keys[-1]]
return True
except (Exception,):
return False
def fusion(self, d_added):
""" Les clés (et les valeurs) de d_added viennent s'ajouter à celle de self.
Si la même clé existe dans les 2 dictionnaires, celle de self est prioritaire. """
od_added = Dictionary(d_added)
l_keys = self.key_list() + od_added.key_list()
""" Dédoublonnage. """
s_keys = set()
for key in l_keys:
s_keys.add(tuple(key))
""" Update de self. """
for key in s_keys:
self.write(key, self.read(key, od_added.read(key)))
class DateTime:
"""
Classe dédiée à la gestion du temps :
- Dates, heures, différents formats (Unix, Windows, ...)
- Temporisateurs, horloges, ...
"""
def __init__(self):
self.d_tempo = dict() # Dictionnaire de temporisateurs.
def delay(self, client_function, delay=20, *args, **kwargs):
""" Asynchrone (ne prend pas la main).
:param client_function: Cette fonction sera exécutée après un délai.
:param delay: Délai en ms (20ms par défaut).
:param args: Optionnel. Arguments dans l'appel de la fonction (tuple ou liste).
:param kwargs: Optionnel. Arguments dans l'appel de la fonction (dictionnaire).
:return: NA
"""
key = client_function.__name__
if key in self.d_tempo:
try:
self.d_tempo[key].disconnect()
except TypeError:
pass
else:
self.d_tempo[key] = QTimer()
self.d_tempo[key].setSingleShot(True)
self.d_tempo[key].timeout.connect(lambda: client_function(*args, **kwargs))
self.d_tempo[key].start(delay)
@staticmethod
def get_dtstamp_from_dtstr(dt_str, dt_format="%m/%d/%Y %H:%M:%S.%f"):
""" - Fonction inverse de get_dtstr_from_dtstamp(). Précision à la microseconde.
- Remarque : le format iso est '%Y-%m-%d %H:%M:%S.%f'.
@param dt_str: Chaîne de caractères représentant une date-heure au format 'humain'.
@param dt_format: Format par défaut, dicté par celui utilisé dans les fichiers d'historiques téléchargés.
@return: Nombre réel au format stamp unix → https://www.unixtimestamp.com/
"""
dt_stamp = datetime.datetime.strptime(dt_str, dt_format)
time_stamp = time.mktime(dt_stamp.timetuple())
micro_s = dt_stamp.microsecond / 1000_000
return time_stamp + micro_s
@staticmethod
def get_dtstr_from_dtstamp(dt_stamp, dt_format="%m/%d/%Y %H:%M:%S"):
""" - Fonction inverse de get_dtstamp_from_dtstr().
- Remarque : le format iso est '%Y-%m-%d %H:%M:%S' """
micro_s = int((dt_stamp % 1) * 1_000_000) # Partie décimale * 1 000 000.
dt_stamp = int(dt_stamp)
structured_time = time.localtime(dt_stamp)
date_str = time.strftime(dt_format, structured_time)
return f'{date_str}.{micro_s}' if '%S' in dt_format else date_str
@staticmethod
def get_date_from_dtstamp(dt_stamp):
""" dtstamp --> o_date """
return datetime.date.fromtimestamp(dt_stamp) # Conversion timestamp en objet <class 'datetime.date'>
def get_dtstamp_from_date(self, o_date):
""" L'objet o_date ne possède pas d'attribut 'timestamp' => Conversion préalable en o_datetime """
o_datetime = self.get_datetime_from_date(o_date)
return o_datetime.timestamp()
@staticmethod
def get_datetime_from_dtstamp(dt_stamp):
return datetime.datetime.fromtimestamp(dt_stamp) # Conversion timestamp en objet <class 'datetime.datetime'>
@staticmethod
def get_dtstamp_from_datetime(o_datetime):
return o_datetime.timestamp()
@staticmethod
def get_datetime_from_date(o_date):
return datetime.datetime.combine(o_date, datetime.datetime.min.time())
@staticmethod
def get_date_from_datetime(o_datetime):
return o_datetime.date()
@staticmethod
def yw_from_date(d_date):
""" d_date est de type date ou datetime. """
d_iso = d_date.isocalendar()
return d_iso[0], d_iso[1] # (year, week) - 52 semaines, sauf pour 2020 : 53 semaines.
def isoyw_from_fxyw(self, fx_year, fx_week):
""" Une semaine au format FX produit une semaine au format ISO. """
stamp = self.get_dtstamp_from_dtstr(f'{fx_year}-{fx_week}-3 10', dt_format='%G-%V-%u %H') # Mercredi 10:00
o_dt = self.get_date_from_dtstamp(stamp)
return o_dt.isocalendar()[:2] # Les 2 premiers éléments = (year, week)
def get_stamp_offset(self, dtstamp, b_after, num_day, _time=None):
"""
@param dtstamp: Date-heure, point de départ.
@param b_after: On demande l'offset pour une date : postérieure si True, antérieure si False.
@param num_day: isoweekdat -> 1 = lundi, ..., 7 = dimanche.
@param _time: Même heure que dtstamp si None. Format = '%H:%M'
@return: float positif ou négatif, selon b_after.
"""
num_day = min(7, max(1, num_day))
o_dt = self.get_datetime_from_dtstamp(dtstamp)
delta_day = num_day - o_dt.isocalendar()[2]
if delta_day > 0:
delta_day -= 0 if b_after else 7
elif delta_day < 0:
delta_day += 7 if b_after else 0
else:
delta_day += 7 if b_after else -7
o_dt += datetime.timedelta(days=delta_day)
if _time is not None:
hour, minute = int(_time.split(':')[0]), int(_time.split(':')[1])
o_dt = o_dt.replace(hour=hour, minute=minute, second=0, microsecond=0)
return self.get_dtstamp_from_datetime(o_dt) - dtstamp
# noinspection PyUnresolvedReferences, DuplicatedCode
class Utils:
def __init__(self, path='', file='params'):
self.o_dt = DateTime() # o_dt = 'Object DateTime'
self.caller_dir = os.getcwd() # dossier du code appelant. Ne pas remplacer getcwd()
path_conf = self.caller_dir if path == '' else path
self.settings = QSettings(f"{path_conf}{os.sep}{file}.conf", QSettings.IniFormat)
self.watcher = None
def save_state(self, win):
def memo_state():
self.settings.setValue('Geometrie', win.geometry())
self.settings.setValue('Etat', win.saveState())
self.o_dt.delay(memo_state, 100)
def restore_state(self, win):
""" Lecture et application de la géométrie à la fenêtre principale. """
win.setGeometry(self.settings.value('Geometrie', QRect(200, 200, 400, 200)))
""" Lecture et application de la géométrie aux dockables. """
q_state = self.settings.value('Etat')
if q_state is not None:
win.restoreState(q_state) # Méthode du parent de win : QMainWindow.
def save_tabs(self, win):
""" Sauvegarde de l'état des onglets (liste ordonnée, sélection). """
l_tabs = []
for i in range(win.tabGraphs.count()):
l_tabs.append(win.tabGraphs.widget(i).graph_name)
self.settings.setValue('List tabs', l_tabs)
self.settings.setValue('Active tab', win.tabGraphs.currentIndex())
def restore_tabs(self, win):
""" Restauration initiale de l'état des onglets (nombre, ordre et sélection). """
l_tabs = self.settings.value('List tabs')
active_tab = self.settings.value('Active tab')
if not (l_tabs is None or active_tab is None):
""" Seulement si params.conf contient les clés 'List tabs' et 'Active tab'. """
for tab in l_tabs:
win.open_graph(tab)
win.tabGraphs.setCurrentIndex(int(active_tab)) # On active l'onglet.
def save_dock_nodes(self, win):
""" Mémorisation de l'ordre des onglets. """
l_tab_nodes = []
for i in range(win.tabNodes.count()):
l_tab_nodes.append(win.tabNodes.tabText(i))
self.settings.setValue('List tab_nodes', l_tab_nodes)
self.settings.setValue('Selected tab_node', win.tabNodes.currentIndex())
def restore_dock_nodes(self, win):
"""
:param win: objet = instance (unique) de la fenêtre Main.
:return: tuple (liste mémorisée des onglets, index mémorisé de l'onglet actif)
"""
l_tab_nodes = self.settings.value('List tab_nodes', [])
n_selected = int(self.settings.value('Selected tab_node', 0))
""" Liste des dossiers de /nodes par défaut. """
if len(l_tab_nodes) != len(win.do_groups_nodes):
""" Si l_tab_nodes n'est pas conforme (même taille que le dico), on crée
une liste par défaut à partir du dico et l'onglet sélectionné est le n°0.
"""
l_tab_nodes.clear()
for friendly_name in win.do_groups_nodes.keys():
l_tab_nodes.append(friendly_name)
n_selected = 0
return l_tab_nodes, n_selected
@staticmethod
def translate(app):
""" Exemple d'utilisation : ut.translate(app)
:param app: objet QApplication
:return: NA
"""
translator = QTranslator(app)
locale = QLocale.system().name().split('_')[0] # Paramètre de Windows. Exemple : 'fr'
path = QLibraryInfo.location(QLibraryInfo.TranslationsPath)
translator.load(f'{path}/qtbase_{locale}')
app.installTranslator(translator)
@staticmethod
def ui2py(action=None):
"""
Cette méthode est chargée de compiler tous les fichiers .ui et .qrc du dossier /design du code appelant.
- Le code appelant manipule des fenêtres ou des boîtes de dialogue créées par Qt Designer.
- Le code appelant est dans un dossier : /pc, /affichage, /tests, ... et d'autres à venir.
:param action: False, True ou None
- Si False, ne compile pas.
- Si True, compile systématiquement.
- si None (par défaut), compile si nécessaire (automatique).
|_ automatique = seuls les fichiers-source qui ont été modifiés.
:return: True si réussite, False si échec.
"""
if action is False:
return True
""" Création de listes des fichiers concernés. """
client_dir = os.path.abspath(os.path.dirname(inspect.stack()[1][1])) # dossier du code client.
design_dir = os.path.abspath(f"{client_dir}{os.sep}design")
l_files_ui = list() # Liste des fichiers-source '.ui' (interfaces utilisateur).
l_files_rc = list() # Liste des fichiers-source '.qrc' (ressources).
l_targets = list() # Liste des fichiers-cible à obtenir '.py'.
for file_name in os.listdir(design_dir):
if file_name.lower().endswith('.ui'):
l_files_ui.append(file_name[:-3]) # Nom du fichier sans l'extension '.ui'
l_targets.append(f"{client_dir}{os.sep}{file_name[:-3]}.py") # chemin complet, avec l'extension '.py'
elif file_name.lower().endswith('.qrc'):
l_files_rc.append(file_name[:-4]) # Nom du fichier sans l'extension '.qrc'
l_targets.append(f"{client_dir}{os.sep}{file_name[:-4]}_rc.py") # complet, avec l'extension '.py'
settings = QSettings(f"{client_dir}{os.sep}params.conf", QSettings.IniFormat)
if action is True:
""" action == True => Compilation forcée (pas de filtrage). """
filtered_files_ui = l_files_ui
filtered_files_rc = l_files_rc
else:
""" Compilation automatique => Filtrage (seulement les fichiers modifiés).
On compare les date-heure (de modification) de chaque fichier à compiler avec celles qui ont été mémorisées.
Si égalité => compilation inutile.
"""
filtered_files_ui = list()
for file_ui in l_files_ui:
file_source = f"{design_dir}{os.sep}{file_ui}.ui"
file_target = f"{client_dir}{os.sep}{file_ui}.py"
dh_now = os.path.getmtime(file_source)
dh_memo = settings.value(f"dh_{file_source}", 0.)
if not os.path.isfile(file_target) or float(dh_now) != float(dh_memo):
filtered_files_ui.append(file_ui)
filtered_files_rc = list()
for file_rc in l_files_rc:
file_source = f"{design_dir}{os.sep}{file_rc}.qrc"
file_target = f"{client_dir}{os.sep}{file_rc}.py"
dh_now = os.path.getmtime(file_source)
dh_memo = settings.value(f"dh_{file_source}", 0.)
if not os.path.isfile(file_target) or float(dh_now) != float(dh_memo):
filtered_files_rc.append(file_rc)
""" Compilation """
scripts_path = f"{os.path.dirname(sys.executable)}{os.sep}Scripts{os.sep}"
pyuic_exe = f"{scripts_path}pyuic5.exe"
for file_ui in filtered_files_ui:
f_source = f"{design_dir}{os.sep}{file_ui}.ui"
f_target = f"{client_dir}{os.sep}{file_ui}.py"
if subprocess.run([pyuic_exe, f_source, '-o', f_target, '-x']).returncode != 0:
return False
settings.setValue(f"dh_{f_source}", os.path.getmtime(f_source))
pyrcc_exe = f"{scripts_path}pyrcc5.exe"
for file_rc in filtered_files_rc:
f_source = f"{design_dir}{os.sep}{file_rc}.qrc"
f_target = f"{client_dir}{os.sep}{file_rc}_rc.py"
if subprocess.run([pyrcc_exe, f_source, '-o', f_target]).returncode != 0:
return False
settings.setValue(f"dh_{f_source}", os.path.getmtime(f_source))
return True
# **********************************************************
# Watching - Surveillance de fichier *
# **********************************************************
def watch_file(self, l_files, callback):
"""
QFileSystemWatcher peut renvoyer 2 signaux coup sur coup au lieu d'un seul :
|_ Le premier parce que le fichier à surveiller a été supprimé.
|_ Le deuxième parce qu'il a été re-créé.
Pour éviter ce défaut et un double callback, on met en place un délai de 100 ms ...
|_ ... qui prend en charge le callback.
:param l_files: Liste des fichiers et dossiers à surveiller (chemins complets)
:param callback: procédure à exécuter à chaque shoot du watcher.
:return: callback(<fic>) avec le fichier modifié en argument.
"""
def file_changed(fic):
if fic in status[0]:
return
status[0].append(fic) # current_file
self.o_dt.delay(send_file, 100)
def send_file(): # Fin du monostable.
callback(status[0])
status[0] = list() # Reset.
status = [list()] # [[fichiers modifiés]] <- Liste dans liste.
self.watcher = QFileSystemWatcher(l_files)
self.watcher.directoryChanged.connect(file_changed)
self.watcher.fileChanged.connect(file_changed)
# |_ self.watcher = Mémorisation du QFileSystemWatcher obligatoire à cause du garbage collector de Python.
# **********************************************************
# Messages Messages Messages Messages Messages Messages *
# **********************************************************
@staticmethod
def msg_info(title, msg, parent=None, sound=True, typed='info'):
"""
Si parent == None => Affichage au milieu de l'écran.
|_ Si parent est l'objet-widget appelant => Affichage au centre de la fenêtre.
"""
if sound:
winsound.MessageBeep() # winsound.MB_ICONASTERISK)
if typed == 'info':
QMessageBox.information(parent, title, msg)
elif typed == 'error':
QMessageBox.critical(parent, title, msg)
elif typed == 'yesno':
response = QMessageBox.question(parent, title, msg, QMessageBox.Yes, QMessageBox.No)
return response == QMessageBox.Yes # True si oui.
def msg_error(self, title, msg, parent=None, sound=True):
self.msg_info(title, msg, parent, sound, 'error')
def msg_yesno(self, title, msg, parent=None, sound=True):
return self.msg_info(title, msg, parent, sound, 'yesno')
def gauge(self, label='progress', char='▒', large=48, color='BLEU', b_silent=False):
""" Exemple de code. Pour 1 ligne, la méthode est appelée au début, en boucle et à la fin :
self.ut.gauge('label :', large=17) # <-- Début de ligne (label).
for week in range(1, 53):
self.ut.gauge(char='▒') # <-- Progression sur la même ligne (en boucle).
self.ut.gauge('end') # <-- Fin de ligne.
"""
if b_silent:
return
if label == 'progress': # En cours de progression, sur la même ligne.
self.printc(char, color=color, end=' ')
elif label == 'end': # Passe à la ligne en fin de traitement.
print()
else: # Début de ligne.
msg = f"{label :<{large}}"
self.printc(msg, 'JAUNE', end=' ')
@staticmethod
def printc(msg='', color='ROUGE', end='\n'):
"""
Écrit dans la console, en couleur - Rouge par défaut.
|_ S'il y a cumul de valeurs, les placer dans une liste -> []
|_ Exemple : printc('Bonjour le monde !', ['BLEU', 'UNDERLINE'])
Certaines consoles n'acceptent pas tous les paramètres.
|_ Par exemple, celle de Pycharm n'accepte pas BLINK, BOLD, et autres ...
:param msg: Message
:param color: Casse insensitive. Nom d'une couleur (voir dans Bcolors la liste des couleurs)
:param end: Paramètre 'end' du print()
:return: NA
"""
class Bcolors:
NOIR, ROUGE, VERT, JAUNE = '\33[30m', '\33[31m', '\33[32m', '\33[33m'
BLEU, VIOLET, CYAN, BLANC = '\33[34m', '\33[35m', '\33[36m', '\33[37m'
HEADER, OKBLUE, OKGREEN, WARNING = '\33[95m', '\33[94m', '\33[92m', '\33[93m'
FAIL, RESET, BOLD, UNDERLINE = '\33[91m', '\33[0m', '\33[1m', '\33[4m'
CEND, CBOLD, CITALIC, CURL = '\33[0m', '\33[1m', '\33[3m', '\33[4m'
CBLINK, CBLINK2, CSELECTED = '\33[5m', '\33[6m', '\33[7m'
CBLACK, CRED, CGREEN, CYELLOW = '\33[30m', '\33[31m', '\33[32m', '\33[33m'
CBLUE, CVIOLET, CCYAN, CWHITE = '\33[34m', '\33[35m', '\33[36m', '\33[37m'
CBLACKBG, CREDBG, CGREENBG, CYELLOWBG = '\33[40m', '\33[41m', '\33[42m', '\33[43m'
CBLUEBG, CVIOLETBG, CCYANBG, CWHITEBG = '\33[44m', '\33[45m', '\33[46m', '\33[47m'
CGREY, CRED2, CGREEN2, CYELLOW2 = '\33[90m', '\33[91m', '\33[92m', '\33[93m'
CBLUE2, CVIOLET2, CCYAN2, CWHITE2 = '\33[94m', '\33[95m', '\33[96m', '\33[97m'
CGREYBG, CREDBG2, CGREENBG2, CYELLOWBG2 = '\33[100m', '\33[101m', '\33[102m', '\33[103m'
CBLUEBG2, CVIOLETBG2, CCYANBG2, CWHITEBG2 = '\33[104m', '\33[105m', '\33[106m', '\33[107m'
try:
if isinstance(color, str):
color = [color]
couleur = ''
for coul in color:
couleur += eval('Bcolors.' + coul.upper())
print(couleur + msg + Bcolors.RESET, end=end)
except NameError:
print(msg, end=end)
# **********************************************************
# Divers *
# **********************************************************
@staticmethod
def deepcopy(reference):
return copy.deepcopy(reference)
# **********************************************************
# Couleurs Couleurs Couleurs Couleurs Couleurs Couleurs *
# **********************************************************
def color_to_qtcolor(self, color):
color = self.color_8_digit(color)
return '#' + color[7:] + color[1: 7]
@staticmethod
def color_8_digit(color):
""" La valeur de retour est exprimée sur 8 chiffres hexa : #rrvvbbaa """
if color[0] != '#':
return color
if len(color) == 4: # #rvb
color = '#' + color[1]*2 + color[2]*2 + color[3]*2 + 'ff'
elif len(color) == 5: # #rvba
color = '#' + color[1]*2 + color[2]*2 + color[3]*2 + color[4]*2
elif len(color) == 7: # #rrvvbb
color += 'ff'
return color
# **********************************************************
# Asynchrone Asynchrone Asynchrone Asynchrone Asynchrone *
# **********************************************************
@staticmethod
def asyn(my_target, my_callback=None, target_args=()):
"""
@param my_target: Fonction ou méthode principale (target), appelée de suite.
@param my_callback: Fonction ou méthode (callback) qui sera appelée lorsque my_target aura terminé son travail.
@param target_args: Tuple → Nb quelconque d'arguments passés à my_target.
@return: NA, réponse différée en callback.
"""
thread = MyThread(
target=my_target,
callback=my_callback,
target_args=target_args,
)
thread.start()
class MyThread(threading.Thread):
""" https://gist.github.com/amirasaran/e91c7253c03518b8f7b7955df0e954bb """
def __init__(self, target, callback, target_args):
super(MyThread, self).__init__(target=self.target_with_callback)
self.callback = callback
self.method = target
self.target_args = target_args
def target_with_callback(self):
response = self.method(*self.target_args)
self.callback(*response)
def main():
""" Code de MAP (mise au point). Espace expérimental : ajouter, supprimer, commenter, décommenter, ... """
""" Exécution : clic-droit > Run 'utils' """
dt = DateTime()
dt_stamp = 1636808631
print(dt.get_dtstr_from_dtstamp(dt_stamp))
if __name__ == '__main__':
main()
/nodes/generateurs/histos/histos.py
:
# Imports internes
from pc.ctrl_node import CtrlNode
from trading.historiques.ctrl_histos import CtrlHistos
d_datas = {
'name': 'Histos', # Label affiché sous l'icone.
'icon': 'histos.png', # Icone affichée.
}
class Node(CtrlNode):
def __init__(self, o_scene, s_id, pos):
super().__init__(o_scene, s_id, pos)
self.type = 'Histos'
self.setup()
def setup(self, child_file=__file__):
super().setup(child_file)
@property
def ld_outputs(self):
return [{
'label': 'Sortie',
'label_pos': (-38, -10)
}]
def fixed_params(self):
return {
'Instrument': ['EUR/USD', {'values': ['EUR/USD', 'USD/JPY', 'EUR/CHF', 'USD/CAD', 'NZD/USD',
'EUR/GBP', 'EUR/JPY', 'GBP/JPY', 'GBP/CHF', 'GBP/USD']}],
'Période': ['Les derniers', {'values': ['Les premiers', 'Les derniers', 'Intervalle']}],
'Nombre(s) ou semaine(s)': ['100 000',
{'tip': "Les nombres supportent des espaces pour les séparateurs de milliers."
"\nLes semaines doivent être écrites comme ceci : '2016-14'."
"\n\tSi premiers → une semaine de fin."
"\n\tSi derniers → une semaine de début."
"\n\tSi intervalle → 2 semaines, séparées par une virgule."}],
'Tick': False, # à chaque variation du signal.
'Maille 1': False, # Renko, maille de 1 pip.
'Maille 2': False, # 2 pips.
'Maille 3': False, # 3 pips.
'Maille 4': False, # 4 pips.
'Maille 5': True, # 5 pips.
'Maille 7': False, # 7 pips.
'Maille 10': False, # 10 pips.
'Maille 14': False, # 14 pips.
'Maille 20': False, # 20 pips.
'1 mn': False, # 1 minute.
'5 mn': False, # 5 minutes.
'15 mn': False, # 15 minutes.
'30 mn': False, # 30 minutes.
'1 h': False, # 1 heure.
'4 h': False, # 4 heures.
'1 j': False, # 1 jour.
'1 s': False, # 1 semaine.
'* Voir historiques ': '', # '*' = bouton.
'* Télécharger ': '',
'* Synchroniser bd ': '',
}
def my_signals(self, l_signals_in, num_socket_out):
l_signals = list()
for signal_key in self.fixed_params().keys():
b_signal = self.get_param(signal_key) # Booléen.
if isinstance(b_signal, bool) and b_signal:
typ_id, signal_ante, signal_now, signal_source = f'{self.type}{self.id}', '', signal_key, ''
l_signals.append((typ_id, signal_ante, signal_now, signal_source))
return l_signals
def get_state(self):
""" Surcharge. """
if self.b_chk:
for signal in self.fixed_params().keys():
if isinstance(self.get_param(signal), bool) and self.get_param(signal, False):
return True
return False
def histo_data(self, button):
""" Clic sur un bouton : Voir, télécharger, synchroniser. """
o_hist = CtrlHistos(self.get_param('Instrument')) # Objet Contrôleur d'historiques.
if button.startswith('Voir'):
""" Bouton 'Voir historiques'. Si Instrument == EUR/USD => Symbole == EURUSD (sans '/') """
o_hist.verify_weeks(b_ticks=False, b_silent=True)
print()
# o_hist.verify_weeks(b_ticks=True) # Ligne provisoirement commentée.
elif button.startswith('Télécharger'):
""" Bouton 'Télécharger'. """
o_hist.download_histos(nb_weeks=1, b_ticks=False) # Candles.
# o_hist.download_histos(nb_weeks=1, b_ticks=True) # Ticks - Ligne provisoirement commentée.
self.ut.printc('Écriture des ticks en base de données -> prochain tuto.')
""" Option : après le téléchargement, on affiche l'état des historiques. """
print()
o_hist.verify_weeks(b_ticks=False, b_silent=True) # Candles.
print()
# o_hist.verify_weeks(b_ticks=True) # Ticks. # Ligne provisoirement commentée.
elif button.startswith('Synchroniser'):
""" Bouton 'Synchroniser bd'. """
o_hist.synchro_db_csv(b_ticks=False)
# o_hist.synchro_db_csv(b_ticks=True) # Ligne provisoirement commentée.
self.ut.printc('Synchro ticks -> prochain tuto.')
def refresh(self, l_keys):
""" Méthode appelée automatiquement à chaque modification de paramètre dans le dockable. """
if l_keys[1].startswith(' '): # Les libellés des boutons commencent par des espaces.
""" Les 3 boutons pour la base de données. Affichages dans le terminal. """
self.histo_data(l_keys[1].strip())
elif isinstance(self.get_param(l_keys[-1]), bool):
""" Nodes à updater : Propagation aval, seulement pour les signaux (cases à cocher). """
self.lo_sockets_out[0].to_update()
""" Infrastructure : Insertion du mot 'infrastructure' dans l_keys, en 1ère place. """
l_keys = ['infrastructure'] + l_keys
super().refresh(l_keys)
# class Calcul(CtrlCalcul):
# pass
Le code a été simplifié. Les actions des boutons concernent à la fois les candles et les ticks.
CtrlHistos.download_histos()
2 fois : Candles
et Ticks
.CtrlHistos.verify_weeks()
2 fois : Candles
et Ticks
.CtrlHistos.verify_weeks()
.CtrlHistos.download_histos()
.CtrlHistos.synchro_bd_csv()
.ctrl_histos.py
, avec les méthodes download_histos()
et verify_weeks()
codées./trading/historiques/ctrl_histos.py
:
# Imports externes
import datetime
import gzip
import hashlib
import os # http://www.python-simple.com/python-modules-fichiers/os-path.php (Ctrl + clic)
import pickle
import requests
from io import BytesIO, StringIO
import pandas as pd
import keyboard
# Imports internes
from functions.utils import DateTime, Utils
from trading.historiques.db_candle import DbCandle
# from trading.historiques.db_tick import DbTick
class CtrlHistos:
def __init__(self, instrument):
""" https://github.com/fxcm/MarketData
Tous les symboles :
AUDCAD, AUDCHF, AUDJPY, AUDNZD, AUDUSD, CADCHF, CADJPY, EURAUD, EURCHF,
EURGBP, EURJPY, EURNZD, EURUSD, GBPCAD, GBPCHF, GBPJPY, GBPNZD, GBPUSD,
NZDCAD, NZDCHF, NZDJPY, NZDUSD, USDCAD, USDCHF, USDJPY, USDTRY
Les plus utilisés (Dans l'ordre d'importance) :
EURUSD, USDJPY, EURCHF, USDCAD, NZDUSD, EURGBP, EURJPY, GBPJPY, GBPCHF, GBPUSD """
self.instrument = instrument
self.symbol = instrument.replace('/', '')
self.dt = DateTime()
self.ut = Utils()
self.symbol_dir = None
self.db_candle = None
self.db_tick = None
self.setup()
def setup(self):
""" Les données peuvent se trouver sur un disque dur SSD autre que celui en cours.
Le cas échéant, db_path doit être modifié (chemin absolu).
Il est également possible, pour les besoins des tests, d'avoir plusieurs bases de données.
|_ Il faudra alors modifier db_path chaque fois que l'on désire switcher. """
db_path = os.path.dirname(__file__) # <--- Chemin par défaut, à modifier si autre disque dur.
self.symbol_dir = os.path.abspath(f'{db_path}/db/{self.symbol}') # Dossier des fichiers d'historiques.
os.makedirs(self.symbol_dir, exist_ok=True)
self.db_candle = DbCandle(self.symbol_dir)
# self.db_tick = DbTick(self.symbol_dir)
def _helper(self, key, *l_params):
""" ************ Collection d'utilitaires. Nombre de paramètres variable. ************ """
if key == 'nb_weeks_in_year':
""" Renvoie le nombre de semaines dans l'année {year}. """
year = l_params[0] # l_params = [année dont on veut connaître le nombre de semaines].
if len(l_params) > 1:
y, num_week, _ = datetime.date.today().isocalendar()
if y == year:
return num_week - 1
o_dat = self.dt.get_date_from_dtstamp(self.dt.get_dtstamp_from_dtstr(f'{year}-12-28', '%Y-%m-%d'))
return o_dat.isocalendar()[1]
elif key == 'l_iso_yw':
""" Renvoie une liste de tuples ISO (iso_year, iso_str_week) : <-- exemple de tuple : (2019, '08')
- Pour la devise en cours et son type (tick ou candle).
- Toutes les semaines, du début à aujourd'hui.
- L'année est un int, la semaine une str ('02', '12', ...) """
b_ticks = l_params[0] # l_params = [b_ticks].
st_yw = set() # Le set() évite les doublons.
since = 2018 if b_ticks else 2012
yw_today = datetime.date.today().isocalendar()[:2]
for year in range(since, yw_today[0] + 1):
for week in range(1, 54):
iso = self.dt.isoyw_from_fxyw(year, week)
st_yw.add((iso[0], f'0{iso[1]}'[-2:]))
if (year, week) >= yw_today:
lt_yw = list(st_yw)
lt_yw.sort()
return lt_yw
elif key == 'csv_file':
""" Renvoie le chemin complet du fichier correspondant à la devise en cours, à l'année et à la semaine. """
year, week, b_ticks = l_params # l_params = [*t_yw, b_ticks]
str_week = f'0{week}'[-2:]
return os.path.join(self.symbol_dir, str(year), f"{'tick' if b_ticks else 'candle'}_"
f"{str_week}.csv")
elif key == 'url_file':
""" Renvoie l'url du fichier d'histos correspondant à la devise en cours, à l'année et à la semaine. """
year, str_week, b_ticks = l_params # l_params = [*t_yw, b_ticks]
url_candle = 'https://candledata.fxcorporate.com/m1'
url_tick = 'https://tickdata.fxcorporate.com'
return f'{url_tick if b_ticks else url_candle}/{self.symbol}/{year}/{int(str_week)}.csv.gz'
def verify_weeks(self, b_ticks, b_silent=False):
""" Affichage d'un tableau dans la console (Lignes=années, Colonnes=semaines).
'▒' ou '▄' ou '▀' = semaine présente, '.' = semaine absente. """
""" 1 - Listes des semaines réellement existantes en base de données et en fichier .csv. """
l_weeks_csv, l_weeks_db = self._existing_lists(b_ticks)
""" 2 - Affichage du titre et entêtes de colonnes. """
typ = 'Ticks' if b_ticks else 'Candles'
self.ut.gauge(f"{self.instrument}-{typ}", large=17)
[self.ut.gauge(char=f"{str(week) :<2}") for week in range(1, 54)]
self.ut.gauge('end')
""" 3 - Affichage du contenu - Ordonnées = années, abscisse = semaines. """
l_weeks = l_weeks_csv + l_weeks_db
l_weeks.sort()
first_year = (2018 if b_ticks else 2012) if len(l_weeks) == 0 else l_weeks[0][0]
year_now = datetime.date.today().isocalendar()[0]
for year in range(first_year, year_now + 1):
self.ut.gauge(year, large=17)
""" Boucle sur les semaines de l'année {year} : 1 à (52 ou 53). """
for week in range(1, self._helper('nb_weeks_in_year', year, 'today') + 1): # today limite à aujourd'hui.
t_yw = year, f'0{week}'[-2:] # 7 -> 07
b_csv, b_db = t_yw in l_weeks_csv, t_yw in l_weeks_db
if not b_csv and not b_db: # 0 0 /x./y
char, color = '. ', 'BLEU'
elif not b_csv and b_db: # 0 1 /x.y
char, color = '▀ ', 'VERT'
elif b_csv and not b_db: # 1 0 x./y
char, color = '▄ ', 'VIOLET'
else: # 1 1 x.y
char, color = '▒ ', 'BLEU'
self.ut.gauge(char=char, color=color)
self.ut.gauge('end')
""" Vérification de la synchronisation entre la base de données et les fichiers csv. """
msg = "\nAttention : La base de données n'est pas entièrement synchronisée ! csv + db = rectangles bleus." \
"\nL'affichage ci-dessus montre les fichiers .csv en carrés-bas mauves, la db en carrés-hauts verts." \
if not b_silent and l_weeks_csv != l_weeks_db else ''
self.ut.printc(msg)
def download_histos(self, nb_weeks, b_ticks):
if nb_weeks > 1:
self.ut.printc("\nArrêt manuel : garder la touche 'CTRL' appuyée.\n")
""" Intervalle des semaines à télécharger (nombres en base 53). """
""" |_ since = Depuis. """
l_weeks_csv, _ = self._existing_lists(b_ticks=b_ticks) # Semaines = str <-- '08', '09', '10', ...
l_csv = [(y, int(w)) for (y, w) in l_weeks_csv] # Semaines = int <-- 8, 9, 10, ...
for y, w in l_csv:
if w > 2:
since = y * 53 + w - 1
break
else:
since = (2018 if b_ticks else 2012) * 53
y_today, w_today = datetime.date.today().isocalendar()[:2]
now = y_today * 53 + w_today - 1 # -1 = pas la semaine en cours.
""" |_ now = Jusqu'à. """
""" l_required = Liste des fichiers à télécharger. """
l_required = list()
for yw53 in range(since, now):
csv_yw = yw53 // 53, 1 + yw53 % 53
csv_file = self._helper('csv_file', *csv_yw, b_ticks)
if not os.path.isfile(csv_file):
""" Fichier absent localement => à télécharger. """
l_required.append(csv_yw)
db = self.db_tick if b_ticks else self.db_candle
nb_weeks_loaded = 0
for csv_yw_required in l_required:
if keyboard.is_pressed('ctrl'):
self.ut.printc("Arrêt manuel demandé.")
break
df = self._download_csv_file(csv_yw_required, b_ticks)
if df.shape[0] > 0:
""" Enregistrement du fichier téléchargé sur disque dur. """
self._df_to_csv(df, csv_yw_required, b_ticks)
""" Écriture du contenu en base de données. """
db.df_to_table(df)
""" Arrêt si le nombre de semaines est atteint. """
nb_weeks_loaded += 1
if nb_weeks_loaded >= nb_weeks:
break
s = 's' if nb_weeks > 1 else ''
self.ut.printc(f"Téléchargement{s} terminé{s}.", color='vert')
def synchro_db_csv(self, b_ticks):
""" A l'issue de cette méthode, TOUTE la db sera l'image de TOUS les fichiers csv. """
""" 1 - Base de données. """
db, typ = (self.db_tick, 'Ticks') if b_ticks else (self.db_candle, 'Candles')
""" 2 - État actuel des historiques => Différences entre csv et db
- l_drop = semaines à supprimer de la db, l_add = semaines à ajouter à la db. """
l_drop, l_add = self._lists_to_update_db(b_ticks=b_ticks)
if l_drop+l_add == []:
self.ut.printc(f"{typ} csv <--> db : OK. La synchronisation est inutile.\n", color='VERT')
return True
""" 3 - Message : état initial. """
for l_weeks in [l_drop, l_add]:
if len(l_weeks) > 0:
s = 's sont' if len(l_weeks) > 1 else ' est'
verb = 'ajouter à' if l_weeks == l_add else 'supprimer de'
self.ut.printc(f"{len(l_weeks)} semaine{s} à {verb} la base de données '{typ.lower()}'.", color='VERT')
""" 4 - Ajout de données dans la db. """
self._add_weeks(l_add, b_ticks)
""" 5 - Suppression de données de la db. """
for yw in l_drop:
db.delete_week(*yw)
""" 6 - Vérification. """
l_drop, l_add = self._lists_to_update_db(b_ticks=b_ticks)
success, color = ('réussi', 'VERT') if l_drop+l_add == [] else ('échoué', 'ROUGE')
self.ut.printc(f'La synchronisation a {success}.', color=color)
def _existing_lists(self, b_ticks):
""" Retourne les listes de semaines des fichiers csv et des enregistrements en base de données. """
""" Exécution rapide si les signatures correspondent (~1ms). """
sign_now = self._signature(b_ticks) # Signature actuelle.
lists_file = os.path.join(self.symbol_dir, 'lists_tick.pkl' if b_ticks else 'lists_candle.pkl')
if os.path.isfile(lists_file):
with open(lists_file, 'rb') as l_lists:
sign_ante, l_weeks_csv, l_weeks_db = pickle.load(l_lists) # Signature persistée.
if sign_now == sign_ante:
return l_weeks_csv, l_weeks_db
""" Exécution lente si les signatures ne correspondent pas (~3 secondes, mais seulement la 1ère fois). """
""" s_weeks_csv n'est pas une list(), mais un set(), pour éviter les doublons. """
l_weeks_csv, s_weeks_db = list(), set()
db = self.db_tick if b_ticks else self.db_candle
l_iso_yw = self._helper('l_iso_yw', b_ticks) # Liste de toutes les semaines.
typ = 'Ticks' if b_ticks else 'Candles'
self.ut.gauge(f"{self.instrument}-{typ} Création listes :", large=0)
for num, iso_yw in enumerate(l_iso_yw):
""" Semaines existant en fichiers csv : Pour une semaine iso, on a 0, 1 ou 2 fichiers csv. """
l_csv_files = self._get_csv_files(*iso_yw, b_ticks)
csv_size = 0
if len(l_csv_files) > 0:
l_weeks_csv.append(iso_yw)
for csv_file in l_csv_files:
csv_size += os.path.getsize(csv_file)
""" Liste des semaines existant en base de données. """
if db.week_exists(*iso_yw, csv_size):
s_weeks_db.add(iso_yw) # (1)
if num % 8 == 0:
self.ut.gauge(char='.') # , color='Jaune')
self.ut.gauge('end')
self.ut.gauge('end')
""" Persistance en pkl. """
l_weeks_db = list(s_weeks_db)
l_weeks_csv.sort()
l_weeks_db.sort()
with open(lists_file, 'wb') as l_lists:
pickle.dump((sign_now, l_weeks_csv, l_weeks_db), l_lists, pickle.HIGHEST_PROTOCOL)
return l_weeks_csv, l_weeks_db
def _signature(self, b_ticks):
""" - Cette méthode est d'exécution rapide (~1 ms).
- Elle produit une signature sur 256 bits.
- Elle permet de savoir si des modifications ont été apportées à la db et/ou aux fichiers csv. """
db = self.db_tick if b_ticks else self.db_candle
sign = str(db.get_db_size())
for root, sub_folder, l_csv in os.walk(self.symbol_dir):
if root == self.symbol_dir:
continue
else:
""" Concaténation de tous les noms de fichiers .csv dans {self.symbol_dir} """
l_csv = [csv for csv in l_csv if csv.startswith('tick' if b_ticks else 'candle')] # Filtrage.
sign += root + ''.join(l_csv)
return hashlib.sha256(sign.encode('utf-8')).hexdigest()
def _get_csv_files(self, iso_year, iso_week, b_ticks):
""" Attention ! Les noms de fichiers csv ne correspondent pas toujours à leur contenu ! """
""" Return : liste de noms complets de fichiers csv correspondant à (iso_year, iso_week) : 0, 1 ou 2 fichiers.
fxcorporate ne respecte pas la norme ISO. De plus :
- D'une année sur l'autre, le nom du fichier csv ne correspond pas à son contenu.
- On ne trouve pas de correspondance entre diverses devises.
- Par conséquent, le nom du fichier n'est pas fiable pour trouver le bon N° de semaine.
La méthode adoptée pour contourner ce problème est la suivante :
- Recherche de 3 fichiers csv voisins de (iso_year, iso_week) passés en paramètres.
- Lecture, pour chacun, de la date-heure du milieu du fichier.
- On en déduit le VRAI N° de semaine ISO.
Il arrive parfois, au changement d'année, qu'une même semaine soit composée de 2 fichiers csv.
- C'est pourquoi la valeur de retour est une liste. """
int_week, l_fxyw = int(iso_week), list()
for week in range(int_week - 1, int_week + 2): # 3 Semaines : précédente, actuelle, suivante.
""" Correction : N° de semaine en base 53. """
y, w = (iso_year, week) if week > 0 else (iso_year - 1, week + 53)
y, w = (y, w) if w <= 53 else (y + 1, w - 53)
""" Les 3 fichiers csv candidats. """
str_w = f'0{w}'[-2:]
file_name = f"{'tick' if b_ticks else 'candle'}_{str_w}.csv" # 'tick_??.csv' ou 'candle_??.csv'.
csv_path = os.path.abspath(f"{os.path.dirname(self.symbol_dir)}/{self.symbol}/{y}/{file_name}")
if os.path.isfile(csv_path):
with open(csv_path, 'r') as csv:
""" On 'goûte' la dernière ligne du fichier pour connaitre le N° ISO de semaine. """
last_line = list(csv)[-2]
stamp = self.dt.get_dtstamp_from_dtstr(last_line.split(',')[0])
o_d = self.dt.get_date_from_dtstamp(stamp) # Objet date.
if o_d.isocalendar()[:2] == (iso_year, int_week): # Filtrage.
l_fxyw.append(csv_path) # Candidat sélectionné.
return l_fxyw # 0, 1 ou 2 éléments.
def _download_csv_file(self, csv_yw_required, b_ticks):
""" Téléchargement, décompactage, décodage, DataFrame, enregistrement sur disque dur.
- return DataFrame. """
""" 1 - Demande au helper de construire l'url. """
url_required = self._helper('url_file', *csv_yw_required, b_ticks)
""" 2 - Initialisation des bytes. """
content = b''
""" 3 - Téléchargement en streaming avec lots de taille {size//50}, permettant l'insertion d'une jauge. """
y, w = csv_yw_required
with requests.get(url_required, stream=True) as req:
size = int(req.headers['Content-Length'])
if size < 1_000:
return pd.DataFrame() # return df vide.
self.ut.gauge(f"Téléchargement {' ticks ' if b_ticks else 'candles'} {self.symbol} ({y}, {w})")
for chunk in req.iter_content(chunk_size=size//50):
content += chunk
""" 3.1 - Jauge de progression. """
self.ut.gauge()
self.ut.gauge('end')
""" 4 - Décompactage du buffer de données binaires (codées utf), en mémoire. """
buf = BytesIO(content)
f = gzip.GzipFile(fileobj=buf)
coded_bytes = f.read()
""" 5 - Décodage : utf-8 pour les candles, utf-16 pour les ticks. """
codec, type_data = ('utf-16', 'tick') if b_ticks else ('utf-8', 'candle')
decoded_bytes = coded_bytes.decode(codec)
""" 6 - Conversion Bytes -> String """
decoded_str = StringIO(decoded_bytes)
""" 7 - String to DataFrame. """
df = pd.read_csv(decoded_str) # Pas de colonne-index (index_col=0 retiré)
""" 8 - Retourne le dataframe. """
return df
def _df_to_csv(self, df, t_yw, b_ticks):
""" Contrôle préalable : si le dossier-cible n'existe pas, on le crée. """
csv_path = self._helper('csv_file', *t_yw, b_ticks)
os.makedirs(os.path.dirname(csv_path), exist_ok=True)
df.to_csv(csv_path, index=False) # Copie du fichier .csv sur disque dur.
@staticmethod
def _csv_to_df(csv_file):
""" Conversion du fichier {csv_file} en dataframe (vide si fichier inexistant). """
# à coder ...
def _lists_to_update_db(self, b_ticks):
""" Retourne 2 listes : l_drop et l_add. Algorithme :
- (1) = l_weeks_db : Liste des semaines réellement existantes en base de données.
- (2) = l_weeks_csv : Liste des semaines réellement existantes en fichiers csv.
- Intersection (Inter) = (1) ⋂ (2)
- l_drop : Liste des semaines à supprimer de la db (celles qui n'existent pas en csv) : (1) - (Inter)
- l_add : Liste des semaines à ajouter dans la db (celles des csv qui n'existent pas en db) : (2) - (Inter)
"""
l_drop, l_add = list(), list()
""" 1 - Listes des semaines réellement existantes en base de données et en fichier .csv. """
# ...
""" 2 - Liste des semaines à supprimer de la db (celles qui n'existent pas en csv). """
# ...
""" 3 - Liste des semaines à ajouter à la db (celles des csv qui n'existent pas en db). """
# ...
return l_drop, l_add
def _add_weeks(self, l_yw, b_ticks):
"""
@param l_yw : Liste des semaines à ajouter à la base de données.
@param b_ticks: True = ticks, False = candles.
"""
# à coder ...
def main(): # MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP *****
""" Code de MAP (mise au point). Espace expérimental : ajouter, supprimer, commenter, décommenter, ... """
h = CtrlHistos('EUR/USD')
# h = CtrlHistos('USD/JPY')
""" Candles. """
# h.download_histos(1, b_ticks=False)
h.verify_weeks(b_ticks=False) # , b_silent=True)
# h.synchro_db_csv(b_ticks=False)
""" Ticks. """
# h.download_histos(1, b_ticks=True)
# h.verify_weeks(b_ticks=True)
# h.synchro_db_csv(b_ticks=True)
if __name__ == '__main__':
main()
La méthode verify_weeks()
utilise la jauge de progression, codée dans la classe Utils.
main()
).
main()
, commentez / décommentez les lignes qui vous conviennent.Dans cet exemple, la dernière semaine téléchargée est (2018, 38).
downolad_csv_file()
+ df_to_csv()
.downolad_csv_file()
+ df_to_table()
en direct, lors du téléchargement.csv_to_df()
+ df_to_table()
en différé = Synchronisation csv → db.download_histos()
, qui accepte comme paramètres le nombre de semaines à télécharger et le type (Candles ou Ticks).download_histos()
:
- Boucle sur le nombre de semaines à télécharger :
- Appel de la méthode privée
_next_week required()
:
- Recherche de la première semaine existant sur internet ET n'existant pas localement.
- Si l'instrument n'existe pas sur internet (ou mauvaise connexion) : Message et fin.
- Si la base de données est à jour : Message et fin.
- Appel de la méthode privée
_download_csv_file()
: ← Voir diagramme ci-dessus.
- Téléchargement du fichier.
- Décompactage.
- Décodage.
- Production d'une DataFrame Pandas.
- Appel de la méthode
df_to_csv()
: ← Voir diagramme ci-dessus.
- Production du fichier
.csv
à partir du DataFrame Pandas.- Copie de ce fichier sur disque dur.
- Appel de la méthode
df_to_table()
:
- L'écriture en base de données est déléguée aux objets db.
Le point suivant traite de la base de données. Nous avons besoin de fichiers csv pour effectuer une synchronisation (csv → bd
).
Prenons pour l'exemple l'instrument EUR/USD
.
☐ Modifier le code de MAP
, dans la fonction main()
à la fin de ce fichier ctrl_histos.py
.
☐ Vérifier leur présence sur disque dur.
ORM
pour gérer les db, en lieu et place du langage SQL
.
SQL
pour comprendre le mécanisme des requêtes.M1
contiendra les données téléchargées des candles.M1
.ORM
choisi est peewee
: il faudra installer ce module via Pycharm
.SQLite
.peewee
depuis Pycharm.Database
./trading/historiques/db.py
:
from peewee import SqliteDatabase
from functions.utils import DateTime, Utils
import datetime
import pandas as pd
import numpy as np
import os
class DataBase:
def __init__(self, symbol_dir, b_ticks):
self.b_ticks = b_ticks
self.pips = .01 if symbol_dir.endswith('JPY') else .0001
self.dt = DateTime()
self.ut = Utils()
self.week_delta = datetime.timedelta(weeks=1) # <class 'datetime.timedelta'>
self.day_delta = datetime.timedelta(days=1) # <class 'datetime.timedelta'>
self.o_table = None
pd.set_option('mode.chained_assignment', None)
""" 3 valeurs pour b_ticks : True (ticks), False (candles) ou None (renkos). """
file_name = 'tick' if b_ticks is True else ('candle' if b_ticks is False else 'renko')
sql_file = os.path.abspath(f"{symbol_dir}/{file_name}.sql")
self.db = SqliteDatabase(sql_file)
self.d_tables = dict()
""" Création physique du fichier candle.sql sur disque dur. """
self.db.connect()
print("Ici, la classe-mère 'DataBase'.")
def change_datetime_column(self, df):
""" Mise en conformité de la colonne DateTime : string to float. """
if 'DateTime' in df.columns:
df = df.rename(columns={'DateTime': 'timestamp'})
dt0 = np.datetime64(self.dt.get_dtstr_from_dtstamp(0, dt_format='%Y-%m-%d %H:%M:%S')) # '1970-01-01 01:00'
t_delta = np.timedelta64(1, 's') # <class 'numpy.timedelta64'>
pd_dt = pd.to_datetime(df.timestamp, format='%m/%d/%Y %H:%M:%S.%f') # format nécessaire, sinon très lent.
last_stamp = (np.datetime64(pd_dt.iloc[-1]) - dt0) / t_delta
o_d = self.dt.get_date_from_dtstamp(last_stamp)
yw = o_d.isocalendar()[:2] # (year, week)
len_batch = int(.95 * df.shape[0] / 50)
l_ts = list()
self.ut.gauge(f'Conformité {yw}')
for _from in range(0, df.shape[0], len_batch):
_to = _from + len_batch
self.ut.gauge()
l_ts += [(np.datetime64(x) - dt0) / t_delta for x in pd_dt[_from: _to]]
self.ut.gauge('end')
df.timestamp = l_ts
return df
def get_db_size(self):
nb_enr = 0
for o_table in list(self.d_tables.values()):
nb_enr += o_table.select().count()
return nb_enr
def _get_range_from_df(self, df):
""" Renvoie un tuple de stamps, qui encadrent df, à l'extérieur de df (samedis 22:00). """
f_stamp = df['timestamp'].iloc[0] # Premier stamp réel dans df. <-- début.
l_stamp = df['timestamp'].iloc[-1] # Dernier stamp réel dans df. <-- fin.
stamp_before = f_stamp + self.dt.get_stamp_offset(f_stamp, b_after=False, num_day=6, _time='22:00')
stamp_after = l_stamp + self.dt.get_stamp_offset(l_stamp, b_after=True, num_day=6, _time='22:00')
return stamp_before, stamp_after
def _get_range_from_yw(self, year, str_week):
stamp = self.dt.get_dtstamp_from_dtstr(f'{year}-{str_week}-3 10', dt_format='%G-%V-%u %H') # Mercredi 10:00
stamp_before = stamp + self.dt.get_stamp_offset(stamp, b_after=False, num_day=6, _time='22:00')
stamp_after = stamp + self.dt.get_stamp_offset(stamp, b_after=True, num_day=6, _time='22:00')
return stamp_before, stamp_after
""" Méthodes surchargées. """
def df_to_table(self, df):
pass
def update_derived(self, df):
pass
def update_derived_table(self, df, stamp_before, stamp_after, o_derived_table):
pass
def week_exists(self, year, week, csv_size):
return False
def delete_week(self, year, str_week):
pass
DbCandle
./trading/historiques/db_candle.py
:
# Imports externes
from peewee import Model, FloatField, IntegerField
# Imports internes
from trading.historiques.db import DataBase
# noinspection PyTypeChecker,PyProtectedMember
class DbCandle(DataBase):
def __init__(self, symbol_dir):
super().__init__(symbol_dir=symbol_dir, b_ticks=False)
db = self.db
print("Ici, la classe dérivée 'DbCandle'.")
""" Classe mère (hérite de peewee.Model). """
class BaseCandles(Model):
# https://docs.peewee-orm.com/en/latest/peewee/models.html#field-types-table
timestamp = IntegerField(primary_key=True) # Index==True et null==False par défaut => vide refusé.
BidOpen = FloatField()
BidHigh = FloatField()
BidLow = FloatField()
BidClose = FloatField()
AskOpen = FloatField()
AskHigh = FloatField()
AskLow = FloatField()
AskClose = FloatField()
Volume = IntegerField(null=True)
class Meta:
# db_table = 'xxxx' Si omis, le nom de la table sera automatique = nom de la classe en minuscules.
database = db
order_by = 'timestamp'
""" Classes dérivées (héritent de BaseCandles). """
class M1(BaseCandles):
""" Table principale. """
seconds = 60
class M5(BaseCandles):
seconds = 5 * 60
class M15(BaseCandles):
seconds = 15 * 60
class M30(BaseCandles):
seconds = 30 * 60
class H1(BaseCandles):
seconds = 60 * 60
class H4(BaseCandles):
seconds = 4 * 60 * 60
class Day(BaseCandles):
seconds = 24 * 60 * 60
class Week(BaseCandles):
seconds = 5 * 24 * 60 * 60 # Marché ouvert 5 jours par semaine.
""" Instanciations. """
self.d_tables = {'m1': M1, 'm5': M5, 'm15': M15, 'm30': M30, 'h1': H1, 'h4': H4, 'day': Day, 'week': Week}
self.o_table = M1
""" Création des tables dans le fichier candle.sql. """
with db:
self.db.create_tables(list(self.d_tables.values()))
def df_to_table(self, df):
""" Écriture SANS DOUBLONS des données de la DateFrame Pandas {df} dans la table principale M1. """
""" 1 - Mise en conformité de la colonne date-time. """
df = self.change_datetime_column(df)
""" 2 - Écriture dans la table M1 : ~38sec pour 1_000_000 lignes.
replace_many() préférable à insert_many() car remplace en cas de doublon, donc pas d'erreur. """
l_datas = df.to_dict(orient='records') # <class 'list'> <-- ~3sec pour 1_000_000 lignes.
self.o_table.replace_many(l_datas).execute()
self.update_derived(df)
def update_derived(self, df):
""" Tables dérivées de {M1}. """
stamp_before, stamp_after = self._get_range_from_df(df) # 2 samedis à 22:00.
self.ut.gauge('Tables dérivées de M1')
for o_derived_table in list(self.d_tables.values())[1:]: # Toutes les tables, sauf 'M1'
self.update_derived_table(df, stamp_before, stamp_after, o_derived_table)
self.ut.gauge(char=f'{o_derived_table._meta.table_name.upper()} ▒')
self.ut.gauge('end') # Passe à la ligne en fin de traitement.
self.ut.gauge('end') # Ligne vide supplémentaire.
def update_derived_table(self, df, stamp_before, stamp_after, o_derived_table):
""" Une seule table. """
l_batches = list()
first_stamp, last_stamp, step = int(stamp_before + 86_400), int(stamp_after - 86_400), o_derived_table.seconds
last = ((last_stamp - first_stamp) // step) - 1
for i, since in enumerate(range(first_stamp, last_stamp, step)):
_from = stamp_before if i == 0 else since
_to = stamp_after if i == last else since + step
filtered_df = df[(df['timestamp'] >= _from) & (df['timestamp'] <= _to)]
if filtered_df['timestamp'].count() == 0:
continue
l_record = list()
l_record.append(filtered_df['timestamp'].iloc[-1]) # timestamp de fin.
l_record.append(filtered_df['BidOpen'].iloc[0]) # bidopen de début.
l_record.append(filtered_df['BidHigh'].max()) # valeur max des bidhigh.
l_record.append(filtered_df['BidLow'].min()) # valeur min des bidlow.
l_record.append(filtered_df['BidClose'].iloc[-1]) # bidclose de fin.
l_record.append(filtered_df['AskOpen'].iloc[0]) # askopen de début.
l_record.append(filtered_df['AskHigh'].max()) # valeur max ds askhigh.
l_record.append(filtered_df['AskLow'].min()) # valeur min des asklow.
l_record.append(filtered_df['AskClose'].iloc[-1]) # askclose de fin.
l_batches.append(l_record)
""" Écriture de l_batches (plusieurs enregistrements en une seule fois) dans la table o_table. """
with self.db.transaction():
o_derived_table.replace_many(l_batches).execute()
def week_exists(self, year, str_week, csv_size):
""" Samedi avant, samedi après. Le milieu de la semaine est '3 10' -> mercredi 10:00. """
stamp = self.dt.get_dtstamp_from_dtstr(f'{year}-{str_week}-3 10', dt_format='%G-%V-%u %H') # Mercredi 10:00
stamp_before = stamp + self.dt.get_stamp_offset(stamp, b_after=False, num_day=6, _time='22:00')
stamp_after = stamp + self.dt.get_stamp_offset(stamp, b_after=True, num_day=6, _time='22:00')
for o_table in list(self.d_tables.values()): # Toutes les tables.
count = o_table.select().where(o_table.timestamp.between(stamp_before, stamp_after)).count()
""" Relation empirique entre la taille du fichier sur disque et le nombre attendu d'enregistrements. """
corr = .65 # Coefficient de correction.
nb_min = max(1, int(corr * csv_size / o_table.seconds))
if count < nb_min:
return False
return True
def delete_week(self, *yw):
""" Supprime la semaine {yw} dans chacune des tables """
range_stamps = self._get_range_from_yw(*yw)
for o_table in list(self.d_tables.values())[::-1]: # A l'envers, de 'Week' à 'M1'.
o_table.delete().where(o_table.timestamp.between(*range_stamps)).execute()
DbTick
./trading/historiques/db_tick.py
:
# Imports internes
from trading.historiques.db import DataBase # Classe-parent.
class DbTick(DataBase):
def __init__(self, symbol_dir):
super().__init__(symbol_dir=symbol_dir, b_ticks=True)
Cette classe sera codée dans le prochain tuto.
DbCandle
au début du fichier ctrl_histos.py
:
from trading.historiques.db_candle import DbCandle
DbCandle
à la fin du setup :
self.db_candle = DbCandle(self.symbol_dir)
main()
, en fin de fichier, garder uniquement l'instanciation du contrôleur :
def main(): # MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP *****
""" Code de MAP (mise au point). Espace expérimental : ajouter, supprimer, commenter, décommenter, ... """
h = CtrlHistos('EUR/USD')
☐ Vérifier :
peewee.Model
.BaseCandles
, qui hérite de peewee.Model
.M1, M5, M15, M30, H1, H4, Day, Week
, qui héritent de BaseCandles
.DbCandle.__init__()
.d_tables
.M1
→ self.o_table = M1
SQLite
(External tool), ainsi que le raccourci Alt + Q
.
candle.sql
)./trading/historiques/db/EURUSD/candle.sql
dans le DB Browser (Alt + Q
).Les tables ont bien été créées. On peut voir les colonnes de h1 sur la droite.
vers la fin de la méthode download_histos()
:
""" Écriture du contenu en base de données. """
db.df_to_table(df)
main()
de ctrl_histos.py
pour télécharger une semaine de candles.
def main(): # MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP *****
""" Code de MAP (mise au point). Espace expérimental : ajouter, supprimer, commenter, décommenter, ... """
h = CtrlHistos('EUR/USD')
""" Candles. """
h.download_histos(1, b_ticks=False)
h.verify_weeks(b_ticks=False)
ctrl_histos.py
. Dans notre exemple, la dernière semaine téléchargée était (2018, 38).
candle.sql
puis Alt + Q
) :
1 semaine téléchargée = 5 jours. Sélectionner une autre table pour voir son contenu.
.csv
sont hebdomadaires.timestamp
nous permettra de savoir quelles sont les semaines à synchroniser.candle.csv
à synchroniser, consistera à :
M1
.M5, M15, M30, H1, H4, Day et Week
.
table M1
à partir des fichiers csv
.csv_to_df()
puis df_to_table()
.synchro_bd_csv()
, appelée par le bouton du PC
, les utilise dans une boucle.synchro_db_csv()
:
Paramètre en entrée :
b_ticks
, booléen =type
de données (Ticks ou Candles).Création de 2 listes : Méthode privée
_lists_to_update_db()
qui renvoie l_drop et l_add
(1) - l_weeks_db : Liste des semaines existant réellement en base de données, déléguée à
DbCandle.week_exists()
.(2) - l_weeks_csv : Liste des semaines existant réellement en fichier csv.
|_ Intersection(Inter) = (1) ⋂ (2)
l_drop : Liste des semaines à supprimer de la db (celles qui n'existent pas en csv) :
(1) - (Inter)
l_add : Liste des semaines à ajouter dans la db (celles des csv qui n'existent pas en db) :
(2) - (Inter)
Si les 2 listes sont vides : Message et fin.
Ajout de données. Méthode
add_weeks()
. Boucle sur l_add :
csv_to_df()
, produisant leDataFrame df
.
db.change_datetime_column(df)
: Mise en conformité → la colonne df.DateTime (str) devient df.timestamp (float).
db.df_to_table()
: Écriture dedf
dans la table principaleM1
.
db.update_derived()
: Écriture dedf filtré
dans les tables dérivées :M5, M15, M30, H1, H4, Day et Week
.Suppression de données. Boucle sur l_drop :
db.delete_week(year, week)
: Suppression d'enregistrements de toutes les tables, de la semaine(year, week)
.
CtrlHistos._lists_to_update_db()
+ vérification.CtrlHistos.csv_to_df()
+ vérification.CtrlHistos.add_weeks()
+ vérification.
Bon coding, et bon courage !
Snippets
Bonjour les codeurs !