Stockage local des cours en 'ticks' et en 'Renko'
Avant-propos
tick.sql et renko.sql.Volume des tables de Candles.
m15), on voit que le champ Volume est rempli de NULL.
Examen de la table m15 par DB Browser for SQLite.
DataBase et de DbCandle est modifié. On trouvera notamment :
DbCandle.update_volume().DataBase.is_volume_in_week().delete_week de DbCandle vers DataBase.self.o_ctrl, self.kb_break, .../trading/historiques/db.py :
from peewee import SqliteDatabase
from functions.utils import DateTime, Utils
import datetime
import pandas as pd
import numpy as np
import keyboard
import os
""" (Russe - vidéo) https://www.youtube.com/watch?v=8dla28TLvwA <- Ctrl + Clic
(Français - txt) https://linuxtut.com/fr/9d4e1d0afac1865acdbb/
(Anglais - vidéo) https://www.youtube.com/watch?v=Vk6Ptnvqr4M
"""
class DataBase:
def __init__(self, o_ctrl, b_ticks):
self.o_ctrl = o_ctrl
self.b_ticks = b_ticks
self.pips = .01 if o_ctrl.symbol_dir.endswith('JPY') else .0001
self.dt = DateTime()
self.ut = Utils()
self.kb_break = o_ctrl.kb_break # Nom de la touche du clavier défini dans le contrôleur CtrlHistos.
self.week_delta = datetime.timedelta(weeks=1) # <class 'datetime.timedelta'>
self.day_delta = datetime.timedelta(days=1) # <class 'datetime.timedelta'>
self.o_table = None
pd.set_option('mode.chained_assignment', None)
""" 3 valeurs pour b_ticks : True (ticks), False (candles) ou None (renkos). """
file_name = 'tick' if b_ticks is True else ('candle' if b_ticks is False else 'renko')
sql_file = os.path.abspath(f"{o_ctrl.symbol_dir}/{file_name}.sql")
self.db = SqliteDatabase(sql_file)
self.d_tables = dict()
""" Création physique du fichier *.sql sur disque dur. """
self.db.connect()
def change_datetime_column(self, df):
""" Mise en conformité de la colonne DateTime : string to float. """
if 'DateTime' in df.columns:
df = df.rename(columns={'DateTime': 'timestamp'})
dt0 = np.datetime64(self.dt.get_dtstr_from_dtstamp(0, dt_format='%Y-%m-%d %H:%M:%S')) # '1970-01-01 01:00'
t_delta = np.timedelta64(1, 's') # <class 'numpy.timedelta64'>
pd_dt = pd.to_datetime(df.timestamp, format='%m/%d/%Y %H:%M:%S.%f') # format nécessaire, sinon très lent.
last_stamp = (np.datetime64(pd_dt.iloc[-1]) - dt0) / t_delta
o_d = self.dt.get_date_from_dtstamp(last_stamp)
yw = o_d.isocalendar()[:2] # (year, week)
len_batch = int(.95 * df.shape[0] / 50)
l_ts = list()
self.ut.gauge(f'Conformité {yw}')
for _from in range(0, df.shape[0], len_batch):
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
self.ut.gauge('end')
return None
_to = _from + len_batch
self.ut.gauge()
l_ts += [(np.datetime64(x) - dt0) / t_delta for x in pd_dt[_from: _to]]
self.ut.gauge('end')
df.timestamp = l_ts
return df
def get_db_sign(self):
nb_enr = 0
for o_table in list(self.d_tables.values()):
nb_enr += o_table.select().count()
return nb_enr
def _get_range_from_df(self, df):
""" Renvoie un tuple de stamps, qui encadrent df, à l'extérieur de df (samedis 22:00). """
f_stamp = df['timestamp'].iloc[0] # Premier stamp réel dans df. <-- début.
l_stamp = df['timestamp'].iloc[-1] # Dernier stamp réel dans df. <-- fin.
stamp_before = f_stamp + self.dt.get_stamp_offset(f_stamp, b_after=False, num_day=6, _time='22:00')
stamp_after = l_stamp + self.dt.get_stamp_offset(l_stamp, b_after=True, num_day=6, _time='22:00')
return stamp_before, stamp_after
def _get_range_from_yw(self, year, str_week):
stamp = self.dt.get_dtstamp_from_dtstr(f'{year}-{str_week}-3 10', dt_format='%G-%V-%u %H') # Mercredi 10:00
stamp_before = stamp + self.dt.get_stamp_offset(stamp, b_after=False, num_day=6, _time='22:00')
stamp_after = stamp + self.dt.get_stamp_offset(stamp, b_after=True, num_day=6, _time='22:00')
return stamp_before, stamp_after
def is_volume_in_week(self, *yw):
""" Compte le nombre d'enregistrements total et le nombre d'enregistrements ayant du volume. """
""" Limites de la semaine : samedi 22:00 à samedi 22:00. """
range_stamps = self._get_range_from_yw(*yw)
""" Parcours de toutes les tables et cumul des valeurs. """
nb_total = nb_volume = 0
for o_table in list(self.d_tables.values()):
nb_total += o_table.select(o_table.Volume).where(
o_table.timestamp.between(*range_stamps)
).count()
nb_volume += o_table.select(o_table.Volume).where(
o_table.timestamp.between(*range_stamps),
~o_table.Volume.is_null()
).count()
return nb_volume > nb_total * .98 # Booléen.
def delete_week(self, *yw):
""" Supprime la semaine {yw} de chacune des tables """
range_stamps = self._get_range_from_yw(*yw)
for o_table in list(self.d_tables.values())[::-1]: # Parcours à l'envers.
o_table.delete().where(o_table.timestamp.between(*range_stamps)).execute()
""" Méthodes surchargées. """
def df_to_table(self, df):
return True
def update_derived(self, df):
pass
def update_derived_table(self, df, stamp_before, stamp_after, o_derived_table):
pass
def week_exists(self, year, str_week, csv):
return False
Remarque : Pour ces tests, le code concernant les volumes est provisoirement commenté (recherchez 'volume')./trading/historiques/db_candle.py :
# Imports externes
from peewee import Model, FloatField, IntegerField
# Imports internes
from trading.historiques.db import DataBase
# noinspection PyTypeChecker, PyProtectedMember, DuplicatedCode
class DbCandle(DataBase):
def __init__(self, o_ctrl):
super().__init__(o_ctrl=o_ctrl, b_ticks=False)
db = self.db
""" Classe mère (hérite de peewee.Model). """
class BaseCandles(Model):
# https://docs.peewee-orm.com/en/latest/peewee/models.html#field-types-table
timestamp = IntegerField(primary_key=True) # Index==True et null==False par défaut => vide refusé.
BidOpen = FloatField()
BidHigh = FloatField()
BidLow = FloatField()
BidClose = FloatField()
AskOpen = FloatField()
AskHigh = FloatField()
AskLow = FloatField()
AskClose = FloatField()
Volume = IntegerField(null=True)
class Meta:
# db_table = 'xxxx' Si omis, le nom de la table sera automatique = nom de la classe en minuscules.
database = db
order_by = 'timestamp'
""" Classes dérivées (héritent de BaseCandles). """
class M1(BaseCandles):
""" Table principale. """
seconds = 60
class M5(BaseCandles):
seconds = 5 * 60
class M15(BaseCandles):
seconds = 15 * 60
class M30(BaseCandles):
seconds = 30 * 60
class H1(BaseCandles):
seconds = 60 * 60
class H4(BaseCandles):
seconds = 4 * 60 * 60
class Day(BaseCandles):
seconds = 24 * 60 * 60
class Week(BaseCandles):
seconds = 5 * 24 * 60 * 60 # Marché ouvert 5 jours par semaine.
""" Instanciations. """
self.d_tables = {'m1': M1, 'm5': M5, 'm15': M15, 'm30': M30, 'h1': H1, 'h4': H4, 'day': Day, 'week': Week}
self.o_table = M1
""" Création des tables dans le fichier candle.sql. """
with db:
self.db.create_tables(list(self.d_tables.values()))
def df_to_table(self, df):
""" Écriture SANS DOUBLONS des données de la DateFrame Pandas {df} dans la table principale M1. """
""" 1 - Mise en conformité de la colonne date-time. """
df = self.change_datetime_column(df)
if df is None:
return False
""" 2 - Écriture dans la table M1 : ~38sec pour 1_000_000 lignes.
replace_many() préférable à insert_many() car remplace en cas de doublon, donc pas d'erreur. """
l_datas = df.to_dict(orient='records') # <class 'list'> <-- ~3sec pour 1_000_000 lignes.
self.o_table.replace_many(l_datas).execute()
""" 3 - Écriture dans les tables dérivées : M5 à week. """
return self.update_derived(df)
def update_derived(self, df):
""" Tables dérivées de {M1}. """
stamp_before, stamp_after = self._get_range_from_df(df) # 2 samedis à 22:00.
self.ut.gauge('Tables dérivées de M1')
for o_derived_table in list(self.d_tables.values())[1:]: # Toutes les tables, sauf 'M1'
self.update_derived_table(df, stamp_before, stamp_after, o_derived_table)
self.ut.gauge(char=f'{o_derived_table._meta.table_name.upper()} ▒')
self.ut.gauge('end') # Passe à la ligne en fin de traitement.
self.ut.gauge('end') # Ligne vide supplémentaire.
return True
def update_derived_table(self, df, stamp_before, stamp_after, o_derived_table):
""" Une seule table. """
l_batches = list()
first_stamp, last_stamp, step = int(stamp_before + 86_400), int(stamp_after - 86_400), o_derived_table.seconds
last = ((last_stamp - first_stamp) // step) - 1
for i, since in enumerate(range(first_stamp, last_stamp, step)):
_from = stamp_before if i == 0 else since
_to = stamp_after if i == last else since + step
filtered_df = df[(df['timestamp'] >= _from) & (df['timestamp'] <= _to)]
if filtered_df['timestamp'].count() == 0:
continue
l_record = list()
l_record.append(filtered_df['timestamp'].iloc[-1]) # timestamp de fin.
l_record.append(filtered_df['BidOpen'].iloc[0]) # bidopen de début.
l_record.append(filtered_df['BidHigh'].max()) # valeur max des bidhigh.
l_record.append(filtered_df['BidLow'].min()) # valeur min des bidlow.
l_record.append(filtered_df['BidClose'].iloc[-1]) # bidclose de fin.
l_record.append(filtered_df['AskOpen'].iloc[0]) # askopen de début.
l_record.append(filtered_df['AskHigh'].max()) # valeur max ds askhigh.
l_record.append(filtered_df['AskLow'].min()) # valeur min des asklow.
l_record.append(filtered_df['AskClose'].iloc[-1]) # askclose de fin.
l_batches.append(l_record)
""" Écriture de l_batches (plusieurs enregistrements en une seule fois) dans la table o_table. """
with self.db.transaction():
o_derived_table.replace_many(l_batches).execute()
def week_exists(self, year, str_week, csv_size):
""" Toutes les tables sont testées. Une seule table défaillante suffit pour renvoyer False. """
""" Samedi avant, samedi après. Le milieu de la semaine est '3 10' -> mercredi 10:00. """
stamp = self.dt.get_dtstamp_from_dtstr(f'{year}-{str_week}-3 10', dt_format='%G-%V-%u %H') # Mercredi 10:00
stamp_before = stamp + self.dt.get_stamp_offset(stamp, b_after=False, num_day=6, _time='22:00')
stamp_after = stamp + self.dt.get_stamp_offset(stamp, b_after=True, num_day=6, _time='22:00')
for o_table in list(self.d_tables.values()): # Toutes les tables.
count = o_table.select().where(o_table.timestamp.between(stamp_before, stamp_after)).count()
""" Relation empirique entre la taille du fichier sur disque et le nombre attendu d'enregistrements. """
corr = .65 # Coefficient de correction.
nb_min = max(1, int(corr * csv_size / o_table.seconds))
if count < nb_min:
return False
return True
def update_volume(self, df_tick):
""" Update la colonne 'Volume' de chaque table, à partir de la dataframe des ticks. """
""" Limites de la semaine : samedi 22:00 à samedi 22:00. """
stamp_before, stamp_after = df_tick['timestamp'].iloc[0], df_tick['timestamp'].iloc[-1]
yw = self.dt.get_date_from_dtstamp((stamp_before + stamp_after)/2).isocalendar()[:2]
nb_enr = 0
for o_table in list(self.d_tables.values()):
nb_enr += o_table.select().where(o_table.timestamp.between(stamp_before, stamp_after)).count()
gauge_step = nb_enr // 50
indx = 0
self.ut.gauge(f"Volume dans les tables Candle ({yw[0]}, {yw[1]})") # >1514580360 >1514840400
for o_table in list(self.d_tables.values()):
batch = o_table.select().where(o_table.timestamp.between(stamp_before, stamp_after))
timestamp_ante = int(stamp_before)
l_batch = list()
for rec in batch:
indx += 1
if indx % gauge_step == 0:
nb = indx // gauge_step # 0 <= nb <= 50
char = '▒' if nb < 41 else ('▄' if nb < 46 else '.')
self.ut.gauge(char=char)
period = timestamp_ante + 1, rec.timestamp
vol = df_tick[df_tick['timestamp'].between(*period)].shape[0]
l_batch.append((rec.timestamp, rec.BidOpen, rec.BidHigh, rec.BidLow, rec.BidClose,
rec.AskOpen, rec.AskHigh, rec.AskLow, rec.AskClose, vol))
timestamp_ante = rec.timestamp
o_table.replace_many(l_batch).execute()
self.ut.gauge('end')
if keyboard.is_pressed('ctrl') 2 fois dans ctrl_histos.py.Ctrl, souvent utilisée, provoquera un arrêt involontaire du programme Python.self.kb_break à la place de 'ctrl'.ctrl_histos.py./trading/historiques/ctrl_histos.py :
# Imports externes
import datetime
import gzip
import hashlib
import os # http://www.python-simple.com/python-modules-fichiers/os-path.php (Ctrl + clic)
import pickle
import requests
from io import BytesIO, StringIO
import pandas as pd
import keyboard # https://www.delftstack.com/fr/howto/python/python-detect-keypress/
# Imports internes
from functions.utils import DateTime, Utils
from trading.historiques.db_candle import DbCandle
from trading.historiques.db_tick import DbTick
class CtrlHistos:
def __init__(self, instrument):
""" https://github.com/fxcm/MarketData
Tous les symboles :
AUDCAD, AUDCHF, AUDJPY, AUDNZD, AUDUSD, CADCHF, CADJPY, EURAUD, EURCHF,
EURGBP, EURJPY, EURNZD, EURUSD, GBPCAD, GBPCHF, GBPJPY, GBPNZD, GBPUSD,
NZDCAD, NZDCHF, NZDJPY, NZDUSD, USDCAD, USDCHF, USDJPY, USDTRY
Les plus utilisés (Dans l'ordre d'importance) :
EURUSD, USDJPY, EURCHF, USDCAD, NZDUSD, EURGBP, EURJPY, GBPJPY, GBPCHF, GBPUSD """
self.instrument = instrument
self.symbol = instrument.replace('/', '')
self.dt = DateTime()
self.ut = Utils()
self.kb_break = 'f12' # 'shift', 'ctrl', ... à votre convenance.
self.symbol_dir = None
self.db_candle = None
self.db_tick = None
self.setup()
def setup(self):
""" Les données peuvent se trouver sur un disque dur SSD autre que celui en cours.
Le cas échéant, db_path doit être modifié (chemin absolu).
Il est également possible, pour les besoins des tests, d'avoir plusieurs bases de données.
|_ Il faudra alors modifier db_path chaque fois que l'on désire switcher. """
db_path = os.path.dirname(__file__) # <--- Chemin par défaut, à modifier si autre disque dur.
self.symbol_dir = os.path.abspath(f'{db_path}/db/{self.symbol}') # Dossier des fichiers d'historiques.
os.makedirs(self.symbol_dir, exist_ok=True)
self.db_candle = DbCandle(self)
self.db_tick = DbTick(self)
def _helper(self, key, *l_params):
""" ************ Collection d'utilitaires. Nombre de paramètres variable. ************ """
if key == 'nb_weeks_in_year':
""" Renvoie le nombre de semaines dans l'année {year}. """
year = l_params[0] # l_params = [année dont on veut connaître le nombre de semaines].
if len(l_params) > 1:
y, num_week, _ = datetime.date.today().isocalendar()
if y == year:
return num_week - 1
o_dat = self.dt.get_date_from_dtstamp(self.dt.get_dtstamp_from_dtstr(f'{year}-12-28', '%Y-%m-%d'))
return o_dat.isocalendar()[1]
elif key == 'l_iso_yw':
""" Renvoie une liste de tuples ISO (iso_year, iso_str_week) : <-- exemple de tuple : (2019, '08')
- Pour la devise en cours et son type (tick ou candle).
- Toutes les semaines, du début à aujourd'hui.
- L'année est un int, la semaine une str ('02', '12', ...) """
b_ticks = l_params[0] # l_params = [b_ticks].
st_yw = set() # Le set() évite les doublons.
since = 2018 if b_ticks else 2012
yw_today = datetime.date.today().isocalendar()[:2]
for year in range(since, yw_today[0] + 1):
for week in range(1, 54):
iso = self.dt.isoyw_from_fxyw(year, week)
st_yw.add((iso[0], f'0{iso[1]}'[-2:]))
if (year, week) >= yw_today:
lt_yw = list(st_yw)
lt_yw.sort()
return lt_yw
elif key == 'csv_file':
""" Renvoie le chemin complet du fichier correspondant à la devise en cours, à l'année et à la semaine. """
year, week, b_ticks = l_params # l_params = [*t_yw, b_ticks]
str_week = f'0{week}'[-2:]
return os.path.join(self.symbol_dir, str(year), f"{'tick' if b_ticks else 'candle'}_"
f"{str_week}.csv")
elif key == 'url_file':
""" Renvoie l'url du fichier d'histos correspondant à la devise en cours, à l'année et à la semaine. """
year, str_week, b_ticks = l_params # l_params = [*t_yw, b_ticks]
url_candle = 'https://candledata.fxcorporate.com/m1'
url_tick = 'https://tickdata.fxcorporate.com'
return f'{url_tick if b_ticks else url_candle}/{self.symbol}/{year}/{int(str_week)}.csv.gz'
def verify_weeks(self, b_ticks, b_silent=False):
""" Affichage d'un tableau dans la console (Lignes=années, Colonnes=semaines).
'▒' ou '▄' ou '▀' = semaine présente, '.' = semaine absente. """
""" 1 - Listes des semaines réellement existantes en base de données et en fichier .csv. """
l_weeks_csv, l_weeks_db = self._existing_lists(b_ticks)
""" 2 - Affichage du titre et entêtes de colonnes. """
typ = 'Ticks' if b_ticks else 'Candles'
self.ut.gauge(f"{self.instrument}-{typ}", large=17)
[self.ut.gauge(char=f"{str(week) :<2}") for week in range(1, 54)]
self.ut.gauge('end')
""" 3 - Affichage du contenu - Ordonnées = années, abscisse = semaines. """
l_weeks = l_weeks_csv + l_weeks_db
l_weeks.sort()
first_year = (2018 if b_ticks else 2012) if len(l_weeks) == 0 else l_weeks[0][0]
year_now = datetime.date.today().isocalendar()[0]
for year in range(first_year, year_now + 1):
self.ut.gauge(year, large=17)
""" Boucle sur les semaines de l'année {year} : 1 à (52 ou 53). """
for week in range(1, self._helper('nb_weeks_in_year', year, 'today') + 1): # today limite à aujourd'hui.
t_yw = year, f'0{week}'[-2:] # 7 -> 07
b_csv, b_db = t_yw in l_weeks_csv, t_yw in l_weeks_db
if not b_csv and not b_db: # 0 0 /x./y
char, color = '. ', 'BLEU'
elif not b_csv and b_db: # 0 1 /x.y
char, color = '▀ ', 'VERT'
elif b_csv and not b_db: # 1 0 x./y
char, color = '▄ ', 'VIOLET'
else: # 1 1 x.y
char, color = '▒ ', 'BLEU'
self.ut.gauge(char=char, color=color)
self.ut.gauge('end')
""" Vérification de la synchronisation entre la base de données et les fichiers csv. """
msg = "\nAttention : La base de données n'est pas entièrement synchronisée ! csv + db = rectangles bleus." \
"\nL'affichage ci-dessus montre les fichiers .csv en carrés-bas mauves, la db en carrés-hauts verts." \
if not b_silent and l_weeks_csv != l_weeks_db else ''
self.ut.printc(msg)
def download_histos(self, nb_weeks, b_ticks):
if nb_weeks > 1:
self.ut.printc(f"\nArrêt manuel : garder la touche '{self.kb_break.upper()}' appuyée.\n")
""" Intervalle des semaines à télécharger (nombres en base 53). """
""" |_ since = Depuis. """
l_weeks_csv, _ = self._existing_lists(b_ticks=b_ticks) # Semaines = str <-- '08', '09', '10', ...
l_csv = [(y, int(w)) for (y, w) in l_weeks_csv] # Semaines = int <-- 8, 9, 10, ...
for y, w in l_csv:
if w > 2:
since = y * 53 + w - 1
break
else:
since = (2018 if b_ticks else 2012) * 53
y_today, w_today = datetime.date.today().isocalendar()[:2]
now = y_today * 53 + w_today
""" |_ now = Jusqu'à. """
""" l_required = Liste des fichiers à télécharger. """
l_required = list()
for yw53 in range(since, now-1): # dernière semaine = (now-1) => pas la semaine en cours.
csv_yw = yw53 // 53, 1 + yw53 % 53
csv_file = self._helper('csv_file', *csv_yw, b_ticks)
if not os.path.isfile(csv_file):
""" Fichier absent localement => à télécharger. """
l_required.append(csv_yw)
db = self.db_tick if b_ticks else self.db_candle
nb_weeks_loaded = 0
""" Parcours des semaines à traiter. """
for csv_yw_required in l_required:
df = self._download_csv_file(csv_yw_required, b_ticks)
if df is None:
""" Arrêt manuel demandé. """
break
elif df.shape[0] > 0:
""" 1 - Enregistrement du fichier téléchargé sur disque dur. """
self._df_to_csv(df, csv_yw_required, b_ticks)
""" 2 - Écriture du contenu en base de données. break si arrêt manuel demandé. """
if not db.df_to_table(df):
continue
""" 3 - Update des volumes à partir des ticks. """
# df_stamps = self.db_tick.get_df_stamps(*csv_yw_required)
# if df_stamps.shape[0] > 0:
# self.db_candle.update_volume(df_stamps)
""" Arrêt si le nombre de semaines demandées est atteint. """
nb_weeks_loaded += 1
if nb_weeks_loaded >= nb_weeks:
break
s = 's' if nb_weeks > 1 else ''
self.ut.printc(f"Téléchargement{s} terminé{s}.", color='vert')
def synchro_db_csv(self, b_ticks):
""" A l'issue de cette méthode, TOUTE la db sera l'image de TOUS les fichiers csv. """
""" 1 - Base de données. """
db, typ = (self.db_tick, 'Ticks') if b_ticks else (self.db_candle, 'Candles')
""" 2 - État actuel des historiques => Différences entre csv et db
- l_drop = semaines à supprimer de la db, l_add = semaines à ajouter à la db. """
l_drop, l_add, l_vol = self._lists_to_update_db(b_ticks=b_ticks)
if l_drop+l_add+l_vol == []:
self.ut.printc(f"{typ} csv <--> db : OK. La synchronisation est inutile.\n", color='VERT')
return True
""" 3 - Message : état initial. """
for l_weeks in [l_drop, l_add]:
if len(l_weeks) > 0:
s = 's sont' if len(l_weeks) > 1 else ' est'
verb = 'ajouter à' if l_weeks == l_add else 'supprimer de'
self.ut.printc(f"{len(l_weeks)} semaine{s} à {verb} la base de données '{typ.lower()}'.", color='VERT')
""" 4 - Ajout de données dans la db. """
self._add_weeks(l_add, b_ticks)
""" 5 - Suppression de données de la db. """
for yw in l_drop:
db.delete_week(*yw)
""" 6 - Update des volumes. """
# for yw in l_vol:
# if keyboard.is_pressed(self.kb_break):
# self.ut.printc("Arrêt manuel demandé.")
# break
# df_stamps = self.db_tick.get_df_stamps(*yw)
# self.db_candle.update_volume(df_stamps)
""" 7 - Vérification. """
l_drop, l_add, l_vol = self._lists_to_update_db(b_ticks=b_ticks)
success, color = ('réussi', 'VERT') if l_drop+l_add+l_vol == [] else ('échoué', 'ROUGE')
self.ut.printc(f'{typ} : La synchronisation a {success}.', color=color)
def _existing_lists(self, b_ticks):
""" Retourne les listes de semaines des fichiers csv et des enregistrements en base de données. """
""" Exécution rapide si les signatures correspondent (~1ms). """
sign_now = self._signature(b_ticks) # Signature actuelle.
lists_file = os.path.join(self.symbol_dir, 'lists_tick.pkl' if b_ticks else 'lists_candle.pkl')
if os.path.isfile(lists_file):
with open(lists_file, 'rb') as l_lists:
sign_ante, l_weeks_csv, l_weeks_db = pickle.load(l_lists) # Signature persistée.
if sign_now == sign_ante:
return l_weeks_csv, l_weeks_db
""" Exécution lente si les signatures ne correspondent pas (~3 secondes, mais seulement la 1ère fois). """
""" s_weeks_csv n'est pas une list(), mais un set(), pour éviter les doublons. """
l_weeks_csv, s_weeks_db = list(), set()
db = self.db_tick if b_ticks else self.db_candle
l_iso_yw = self._helper('l_iso_yw', b_ticks) # Liste de toutes les semaines.
typ = 'Ticks' if b_ticks else 'Candles'
self.ut.gauge(f"{self.instrument}-{typ} Création listes :", large=0)
for num, iso_yw in enumerate(l_iso_yw):
""" Semaines existant en fichiers csv : Pour une semaine iso, on a 0, 1 ou 2 fichiers csv. """
l_csv_files, last_stamp = self._get_csv_files(*iso_yw, b_ticks)
csv_size = 0
if len(l_csv_files) > 0:
l_weeks_csv.append(iso_yw)
for csv_file in l_csv_files:
csv_size += os.path.getsize(csv_file)
""" Liste des semaines existant en base de données. """
csv = last_stamp if b_ticks else csv_size
if db.week_exists(*iso_yw, csv):
s_weeks_db.add(iso_yw) # (1)
if num % 8 == 0:
self.ut.gauge(char='.') # , color='Jaune')
self.ut.gauge('end')
self.ut.gauge('end')
""" Persistance en pkl. """
l_weeks_db = list(s_weeks_db)
l_weeks_csv.sort()
l_weeks_db.sort()
with open(lists_file, 'wb') as l_lists:
pickle.dump((sign_now, l_weeks_csv, l_weeks_db), l_lists, pickle.HIGHEST_PROTOCOL)
return l_weeks_csv, l_weeks_db
def _signature(self, b_ticks):
""" - Cette méthode est d'exécution rapide (~1 ms).
- Elle produit une signature sur 256 bits.
- Elle permet de savoir si des modifications ont été apportées à la db et/ou aux fichiers csv. """
db = self.db_tick if b_ticks else self.db_candle
y, w = datetime.date.today().isocalendar()[:2]
sign = str(53*y + w + db.get_db_sign())
for root, sub_folder, l_csv in os.walk(self.symbol_dir):
if root == self.symbol_dir:
continue
else:
""" Concaténation de tous les noms de fichiers .csv dans {self.symbol_dir} """
l_csv = [csv for csv in l_csv if csv.startswith('tick' if b_ticks else 'candle')] # Filtrage.
sign += root + ''.join(l_csv)
_hash = hashlib.sha256(sign.encode('utf-8')).hexdigest()
return _hash
def _get_csv_files(self, iso_year, iso_week, b_ticks):
""" Attention ! Les noms de fichiers csv ne correspondent pas toujours à leur contenu ! """
""" Return : liste de noms complets de fichiers csv correspondant à (iso_year, iso_week) : 0, 1 ou 2 fichiers.
fxcorporate ne respecte pas la norme ISO. De plus :
- D'une année sur l'autre, le nom du fichier csv ne correspond pas à son contenu.
- On ne trouve pas de correspondance entre diverses devises.
- Par conséquent, le nom du fichier n'est pas fiable pour trouver le bon N° de semaine.
La méthode adoptée pour contourner ce problème est la suivante :
- Recherche de 3 fichiers csv voisins de (iso_year, iso_week) passés en paramètres.
- Lecture, pour chacun, de la date-heure du milieu du fichier.
- On en déduit le VRAI N° de semaine ISO.
Il arrive parfois, au changement d'année, qu'une même semaine soit composée de 2 fichiers csv.
- C'est pourquoi la valeur de retour est une liste. """
int_week, l_fxyw = int(iso_week), list()
last_stamp = 0
for week in range(int_week - 1, int_week + 2): # 3 Semaines : précédente, actuelle, suivante.
""" Correction : N° de semaine en base 53. """
y, w = (iso_year, week) if week > 0 else (iso_year - 1, week + 53)
y, w = (y, w) if w <= 53 else (y + 1, w - 53)
""" Les 3 fichiers csv candidats. """
str_w = f'0{w}'[-2:]
file_name = f"{'tick' if b_ticks else 'candle'}_{str_w}.csv" # 'tick_??.csv' ou 'candle_??.csv'.
csv_path = os.path.abspath(f"{os.path.dirname(self.symbol_dir)}/{self.symbol}/{y}/{file_name}")
if os.path.isfile(csv_path):
with open(csv_path, 'r') as fh:
""" On 'goûte' les 300 derniers bytes du fichier pour connaitre le N° ISO de semaine. """
nb_char = 300
fh.seek(0, os.SEEK_END)
fh.seek(fh.tell() - nb_char)
dt_str = fh.read().split('\n')[-3].split(',')[0]
last_stamp = self.dt.get_dtstamp_from_dtstr(dt_str)
o_d = self.dt.get_date_from_dtstamp(last_stamp) # Objet date.
if o_d.isocalendar()[:2] == (iso_year, int_week): # Filtrage.
l_fxyw.append(csv_path) # Candidat sélectionné.
return l_fxyw, last_stamp # 0, 1 ou 2 éléments.
def _download_csv_file(self, csv_yw_required, b_ticks):
""" Téléchargement, décompactage, décodage, DataFrame, enregistrement sur disque dur.
- return DataFrame. """
""" 1 - Demande au helper de construire l'url. """
url_required = self._helper('url_file', *csv_yw_required, b_ticks)
""" 2 - Initialisation des bytes. """
content = b''
""" 3 - Téléchargement en streaming avec lots de taille {size//50}, permettant l'insertion d'une jauge. """
y, w = csv_yw_required
with requests.get(url_required, stream=True) as req:
size = int(req.headers['Content-Length'])
if size < 1_000:
return pd.DataFrame() # return df vide.
print()
self.ut.gauge(f"Téléchargement {' ticks ' if b_ticks else 'candles'} {self.symbol} ({y}, {w})")
for chunk in req.iter_content(chunk_size=size//50):
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
self.ut.gauge('end')
return None
content += chunk
""" 3.1 - Jauge de progression. """
self.ut.gauge()
self.ut.gauge('end')
""" 4 - Décompactage du buffer de données binaires (codées utf), en mémoire. """
buf = BytesIO(content)
f = gzip.GzipFile(fileobj=buf)
coded_bytes = f.read()
""" 5 - Décodage : utf-8 pour les candles, utf-16 pour les ticks. """
codec, type_data = ('utf-16', 'tick') if b_ticks else ('utf-8', 'candle')
decoded_bytes = coded_bytes.decode(codec)
""" 6 - Conversion Bytes -> String """
decoded_str = StringIO(decoded_bytes)
""" 7 - String to DataFrame. """
df = pd.read_csv(decoded_str) # Pas de colonne-index (index_col=0 retiré)
""" 8 - Retourne le dataframe. """
return df
def _df_to_csv(self, df, t_yw, b_ticks):
""" Contrôle préalable : si le dossier-cible n'existe pas, on le crée. """
csv_path = self._helper('csv_file', *t_yw, b_ticks)
os.makedirs(os.path.dirname(csv_path), exist_ok=True)
df.to_csv(csv_path, index=False) # Copie du fichier .csv sur disque dur.
@staticmethod
def _csv_to_df(csv_file):
""" Conversion du fichier {csv_file} en dataframe. """
return pd.read_csv(csv_file) if os.path.isfile(csv_file) else pd.DataFrame()
def _lists_to_update_db(self, b_ticks):
""" Retourne 3 listes : l_drop, l_add et l_vol.
Algorithme pour l_drop et l_add :
- (1) = l_weeks_db : Liste des semaines réellement existantes en base de données.
- (2) = l_weeks_csv : Liste des semaines réellement existantes en fichiers csv.
- Intersection (Inter) = (1) ⋂ (2)
- l_drop : Liste des semaines à supprimer de la db (celles qui n'existent pas en csv) : (1) - (Inter)
- l_add : Liste des semaines à ajouter dans la db (celles des csv qui n'existent pas en db) : (2) - (Inter)
--------------------------------
l_vol = liste des semaines (yw) de candles nécessitant un update du champ 'Volume' dans les tables.
Sans objet pour les ticks.
"""
""" 1 et 2 - Listes des semaines réellement existantes en base de données et en fichier .csv. """
l_weeks_csv, l_weeks_db = self._existing_lists(b_ticks=b_ticks)
""" 3 - Liste des semaines à supprimer de la db (celles qui n'existent pas en csv). """
l_drop = list()
for t_yw in l_weeks_db:
if t_yw not in l_weeks_csv:
# print('A supprimer :', t_yw) # MAP : Permet de lister les semaines qui sont à supprimer.
l_drop.append(t_yw)
""" 4 - Liste des semaines à ajouter à la db (celles des csv qui n'existent pas en db). """
l_add = list()
for t_yw in l_weeks_csv:
if t_yw not in l_weeks_db:
# print('A ajouter :', t_yw) # MAP : Permet de lister les semaines qui sont à ajouter.
l_add.append(t_yw)
""" 5 - Liste des semaines de ticks pour lesquelles les volumes des candles sont à updater. """
l_vol = list()
# if not b_ticks: # Renkos non concernés.
# """ Semaines ticks existantes (b_ticks=True). """
# _, l_weeks_tick_db = self._existing_lists(b_ticks=True)
# for yw in l_weeks_tick_db:
# if yw in l_weeks_db:
# if not self.db_candle.is_volume_in_week(*yw):
# l_vol.append(yw)
return l_drop, l_add, l_vol
def _add_weeks(self, l_yw, b_ticks):
"""
@param l_yw : Liste des semaines à ajouter à la base de données.
@param b_ticks: True = ticks, False = candles.
"""
db = self.db_tick if b_ticks else self.db_candle
self.ut.printc(f"\nArrêt manuel : garder la touche '{self.kb_break.upper()}' appuyée.\n")
for iso_yw in l_yw:
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
break
l_csv_files, _ = self._get_csv_files(*iso_yw, b_ticks) # Liste de 0, 1 ou 2 fichiers.
for csv_file in l_csv_files:
df = self._csv_to_df(csv_file)
if df.shape[0] > 0:
db.df_to_table(df)
def main(): # MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP *****
""" Code de MAP (mise au point). Espace expérimental : ajouter, supprimer, commenter, décommenter, ... """
h = CtrlHistos('EUR/USD')
# h = CtrlHistos('USD/JPY')
""" Candles. """
# h.download_histos(10, b_ticks=False)
# h.synchro_db_csv(b_ticks=False)
h.verify_weeks(b_ticks=False) # , b_silent=True)
""" Ticks. """
# h.download_histos(2, b_ticks=True)
# h.synchro_db_csv(b_ticks=True)
# h.verify_weeks(b_ticks=True)
if __name__ == '__main__':
main()
self.kb_break = 'f12' dans l'init, mais vous pouvez modifier cette valeur à votre convenance.
DbTick a changé : self au lieu de self.symbol_dir. Remplacer le fichier db_tick.py./trading/historiques/db_tick.py :
# Imports internes
from trading.historiques.db import DataBase # Classe-parent.
class DbTick(DataBase):
def __init__(self, o_ctrl):
super().__init__(o_ctrl=o_ctrl, b_ticks=True)
main() : Vérifiez que seul verify_weeks() pour les Candles est activé.
ctrl_histos.py : Run 'ctrl_histos'.main() : Activer download_histos() pour les Candles.
ctrl_histos.py.main() : Activer synchro_db_csv() pour les Candles.
Description
main() à la fin du fichier ctrl_histos.py :
def main(): # MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP *****
""" Code de MAP (mise au point). Espace expérimental : ajouter, supprimer, commenter, décommenter, ... """
h = CtrlHistos('EUR/USD')
# h = CtrlHistos('USD/JPY')
""" Candles. """
# h.download_histos(1, b_ticks=False)
# h.synchro_db_csv(b_ticks=False)
# h.verify_weeks(b_ticks=False) # , b_silent=True)
""" Ticks. """
# h.download_histos(1, b_ticks=True)
h.verify_weeks(b_ticks=True)
# h.synchro_db_csv(b_ticks=True)
Une seule ligne est activée : h.verify_weeks(b_ticks=True)
Run 'ctrl_histos').DbTick n'est pas encore codée.
Petits carrés mauves = seulement fichiers dans disque dur.
main() : Activer synchro_db_csv() pour les Ticks.
ctrl_histos.py (Run 'ctrl_histos').
La synchronisation a échoué car la classe DbTick n'est pas encore codée.
DbCandle, elle hérite de DataBase.tick.sql.Ticks.timestamp, Bid et Ask./trading/historiques/db_tick.py :
# Imports externes
from peewee import Model, FloatField
import pandas as pd
# Imports internes
from trading.historiques.db import DataBase # Classe-parent.
from trading.historiques.db_renko import DbRenko
# noinspection DuplicatedCode
class DbTick(DataBase):
def __init__(self, o_ctrl):
super().__init__(o_ctrl=o_ctrl, b_ticks=True)
db = self.db
self.o_renko = DbRenko(o_ctrl)
class Ticks(Model):
timestamp = FloatField(primary_key=True) # Index==True et null==False par défaut => vide refusé.
Bid = FloatField()
Ask = FloatField()
class Meta:
database = db
order_by = 'timestamp'
""" Instanciations. """
self.o_table = Ticks
""" Création de la table 'ticks' dans le fichier tick.sql. """
self.db.create_tables([Ticks])
Ce code permet juste de créer la base de données tick.sql et la table Tick.
self.o_renko est une instance de la classe DbRenko.DbRenko minimaliste pour éviter des erreurs./trading/historiques/db_renko.py :
# Imports internes
from trading.historiques.db import DataBase
class DbRenko(DataBase):
def __init__(self, o_ctrl):
super().__init__(o_ctrl=o_ctrl, b_ticks=None)
def range_exists(self, stamp_before, stamp_after):
""" Les 5 premières tables sont testées. Une seule table défaillante suffit pour renvoyer False.
- En effet, pour les grandes mailles (> 7), il arrive qu'il n'y ait pas d'enr pendant plus d'une semaine. """
# ...
return True
tick.sql ave DB Browser SQLite :
Une seule table (ticks), contenant 3 champs : timestamp, Bid et Ask.
DbTick sont : get_db_sign(), week_exists(), df_to_table() et delete_week().CtrlHistos.
Pour plus de clarté, seules les méthodes de DbTick sont représentées dans leurs contenants.
/trading/historiques/db_tick.py > DbTick :
def get_db_sign(self):
""" self.o_table.select().count() est chronophage. Remplacé par la lecture du dernier timestamp. """
try:
last_ts = int(self.o_table.select().order_by(self.o_table.timestamp.desc()).get().timestamp)
except (Exception,):
last_ts = 0
return last_ts + self.o_renko.get_db_sign()
def week_exists(self, year, str_week, last_stamp):
""" - La table de ticks est testée. Les tables Renko sont également testées.
- Une seule table défaillante suffit pour renvoyer False. """
""" Samedi avant, samedi après. Le milieu de la semaine est '3 10' -> mercredi 10:00. """
first_stamp = last_stamp - 10_000
count = self.o_table.select().where(self.o_table.timestamp.between(first_stamp, last_stamp)).count()
""" Relation empirique entre la taille du fichier sur disque et le nombre attendu d'enregistrements. """
if count == 0:
return False
return self.o_renko.range_exists(*self._get_range_from_yw(year, str_week))
def df_to_table(self, df):
""" a) Écriture Ticks (SANS DOUBLONS) : Données de la DateFrame Pandas {df} dans la table de ticks.
b) Écriture Renko (df conforme) : Délégation à l'objet {o_renko}, en dernière ligne."""
""" 1 - Mise en conformité de la colonne date-time. """
df = self.change_datetime_column(df)
if df is None:
return False
""" 2 - df -> l_datas (~700_000 lignes). """
l_datas = df.to_dict(orient='records')
""" 3 - Découpage de l_datas en 50 batches (au minimum). """
len_datas = len(l_datas)
batch_size = max(1, min(50_000, len_datas // 50))
nb_steps = len_datas // batch_size
""" 4 - Boucle d'écritures avec jauge de progression. """
self.ut.gauge('Écriture dans la table de ticks')
with self.db.transaction():
for nb in range(1 + nb_steps):
""" {batch_size} enr. pour éviter l'erreur 'OperationalError: too many SQL variables' (si > 82_000) """
indx_min = nb * batch_size
indx_max = min(len_datas, indx_min + batch_size)
datas = l_datas[indx_min: indx_max] # Slice.
""" replace_many() préférable à insert_many() car remplace en cas de doublon, donc pas d'erreur. """
self.o_table.replace_many(datas).execute()
char = '▒' if nb < (nb_steps - 9) else ('▄' if nb < (nb_steps - 4) else '.')
self.ut.gauge(char=char)
self.ut.gauge('end')
""" 5 - Écriture dans les tables Renko. """
return self.o_renko.df_to_table(df)
def delete_week(self, *yw):
""" 1) Suppression de la semaine {yw} dans la table {tick}.
2) Suppression dans chacune des tables renko, déléguée à l'objet {o_renko}, en dernière ligne. """
range_stamps = self._get_range_from_yw(*yw)
self.o_table.delete().where(self.o_table.timestamp.between(*range_stamps)).execute()
self.o_renko.delete_week(*yw)

Exemple : 7 semaines de ticks on été synchronisées.
tick.sql ave DB Browser SQLite :
7 semaines : 11 millions d'enregistrementes et 600 Mo sur disque dur !
DB Browser for SQLite ne sera utilisé que pour les Candles et les Renkos.
tick ...Python et Peewee n'ont pas ces contraintes 😊download_histos() et verify_weeks().
La semaine (2018, 13) a bien été téléchargée et entrée en base de données : rectangle bleu.

/trading/historiques/db_renko.py :
# Imports externes
from peewee import Model, FloatField, IntegerField
import keyboard
# Imports internes
from trading.historiques.db import DataBase
# noinspection PyTypeChecker,PyProtectedMember,DuplicatedCode
class DbRenko(DataBase):
def __init__(self, o_ctrl):
super().__init__(o_ctrl=o_ctrl, b_ticks=None)
db = self.db
""" Classe mère (hérite de peewee.Model). """
class BaseRenko(Model):
# https://docs.peewee-orm.com/en/latest/peewee/models.html#field-types-table
timestamp = FloatField(primary_key=True) # Index==True et null==False par défaut => vide refusé.
Renko = IntegerField()
Spread = FloatField()
Volume = FloatField()
class Meta:
database = db
order_by = 'timestamp'
""" Classes dérivées (héritent de BaseRenko). """
class Renko1(BaseRenko):
maille = 1
class Renko2(BaseRenko):
maille = 2
class Renko3(BaseRenko):
maille = 3
class Renko4(BaseRenko):
maille = 4
class Renko5(BaseRenko):
maille = 5
class Renko7(BaseRenko):
maille = 7
class Renko10(BaseRenko):
maille = 10
class Renko14(BaseRenko):
maille = 14
class Renko20(BaseRenko):
maille = 20
""" Instanciations. """
self.d_tables = {'r1': Renko1, 'r2': Renko2, 'r3': Renko3, 'r4': Renko4, 'r5': Renko5,
'r7': Renko7, 'r10': Renko10, 'r14': Renko14, 'r20': Renko20}
self.o_table = Renko1
""" Création des tables dans le fichier renko.sql. """
with db:
self.db.create_tables(list(self.d_tables.values()))
def range_exists(self, stamp_before, stamp_after):
""" Les 5 premières tables sont testées. Une seule table défaillante suffit pour renvoyer False.
- En effet, pour les grandes mailles (> 7), il arrive qu'il n'y ait pas d'enr pendant plus d'une semaine. """
# ...
return True
ctrl_histos.py localement.
Comparer le code de db_renko.py avec l'image ci-dessus pour mettre en évidence les tables et la champs.

Algorithme :
- Le contrôleur d'historiques fait la liste des semaines de Candles à updater au niveau des volumes.
- Point 5 de la méthode
CtrlHistos._lists_to_update_db()- La méthode d'update des volumes de Candles existe déjà, il s'agit de
DbCandle.update_volume().- Le contrôleur d'historiques appelle cette méthode à 2 occasions :
- Lors de la synchronisation des ticks : Point 6 de la méthode
CtrlHistos.synchro_db_csv().- Lors du téléchargement de ticks : Point 3 de la méthode
CtrlHistos.download_histos().- Pour l'instant, les 3 points mentionnés sont commentés.
update_volume() appelée a besoin d'un argument : df_stamps.
DbTick.get_df_stamp() qui n'existe pas. Créons-la :DbTick.get_df_stamp() :
def get_df_stamps(self, *yw):
""" Filtrage de la table de ticks sur la semaine passée en paramètre, puis => df (pandas.DataFrame).
Le volume est le nombre de variations du tick : https://www.ig.com/fr/glossaire-trading/volume-definition
Valeur retournée : vecteur des stamps sur la semaine {yw}.
"""
range_stamps = self._get_range_from_yw(*yw)
query_ts = self.o_table.select(self.o_table.timestamp).where(
self.o_table.timestamp.between(*range_stamps)
).order_by(self.o_table.timestamp.asc())
return pd.DataFrame(list(query_ts.dicts()))
main() de lancement de ctrl_histos.py : activer uniquement la synchro des Candles.ctrl_histos.py localement.timestamp avec cette condition : >1514000000 (1 514 millions). Cela correspond à la fin de 2017.
DbTick.week_exists().True si la semaine visée existe, False sinon.DbTick.week_exists(), à la fin :
return self.o_renko.range_exists(*self._get_range_from_yw(year, str_week))
DbRenko.range_exists() n'est pas encore codée et renvoie True de façon provisoire.La méthode DbRenko.range_exists() doit donc être codée.
DbTick.df_to_table().
""" 5 - Écriture dans les tables Renko. """
return self.o_renko.df_to_table(df)
DbRenko.df_to_table() n'existe pas. C'est la méthode-parent DataBase.df_to_table() qui est donc appelée et qui renvoie simplement True.La méthode DbRenko.df_to_table() doit donc être créée et codée.
DbTick.delete_week().
self.o_renko.delete_week(*yw)
En résumé, 2 méthodes sont à coder dans
DbRenko:range_exists()etdf_to_table()
Vérification
main() de ctrl_histos.F12) apres quelques synchronisations.
renko.sql dans DB Browser for SQLite, afficher le contenu des tables :Toutes les tables Renko contiennent des données.
/nodes/generateurs/histos/histos.py > Node.histo_data().
self.ut.printc() final.Snippets
Bonjour les codeurs !