Stockage local des cours en 'ticks' et en 'Renko'
Avant-propos
tick.sql
et renko.sql
.Volume
des tables de Candles
.
m15
), on voit que le champ Volume
est rempli de NULL
.
Examen de la table m15
par DB Browser for SQLite.
DataBase
et de DbCandle
est modifié. On trouvera notamment :
DbCandle.update_volume()
.DataBase.is_volume_in_week()
.delete_week
de DbCandle
vers DataBase
.self.o_ctrl
, self.kb_break
, .../trading/historiques/db.py
:
from peewee import SqliteDatabase
from functions.utils import DateTime, Utils
import datetime
import pandas as pd
import numpy as np
import keyboard
import os
""" (Russe - vidéo) https://www.youtube.com/watch?v=8dla28TLvwA <- Ctrl + Clic
(Français - txt) https://linuxtut.com/fr/9d4e1d0afac1865acdbb/
(Anglais - vidéo) https://www.youtube.com/watch?v=Vk6Ptnvqr4M
"""
class DataBase:
def __init__(self, o_ctrl, b_ticks):
self.o_ctrl = o_ctrl
self.b_ticks = b_ticks
self.pips = .01 if o_ctrl.symbol_dir.endswith('JPY') else .0001
self.dt = DateTime()
self.ut = Utils()
self.kb_break = o_ctrl.kb_break # Nom de la touche du clavier défini dans le contrôleur CtrlHistos.
self.week_delta = datetime.timedelta(weeks=1) # <class 'datetime.timedelta'>
self.day_delta = datetime.timedelta(days=1) # <class 'datetime.timedelta'>
self.o_table = None
pd.set_option('mode.chained_assignment', None)
""" 3 valeurs pour b_ticks : True (ticks), False (candles) ou None (renkos). """
file_name = 'tick' if b_ticks is True else ('candle' if b_ticks is False else 'renko')
sql_file = os.path.abspath(f"{o_ctrl.symbol_dir}/{file_name}.sql")
self.db = SqliteDatabase(sql_file)
self.d_tables = dict()
""" Création physique du fichier *.sql sur disque dur. """
self.db.connect()
def change_datetime_column(self, df):
""" Mise en conformité de la colonne DateTime : string to float. """
if 'DateTime' in df.columns:
df = df.rename(columns={'DateTime': 'timestamp'})
dt0 = np.datetime64(self.dt.get_dtstr_from_dtstamp(0, dt_format='%Y-%m-%d %H:%M:%S')) # '1970-01-01 01:00'
t_delta = np.timedelta64(1, 's') # <class 'numpy.timedelta64'>
pd_dt = pd.to_datetime(df.timestamp, format='%m/%d/%Y %H:%M:%S.%f') # format nécessaire, sinon très lent.
last_stamp = (np.datetime64(pd_dt.iloc[-1]) - dt0) / t_delta
o_d = self.dt.get_date_from_dtstamp(last_stamp)
yw = o_d.isocalendar()[:2] # (year, week)
len_batch = int(.95 * df.shape[0] / 50)
l_ts = list()
self.ut.gauge(f'Conformité {yw}')
for _from in range(0, df.shape[0], len_batch):
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
self.ut.gauge('end')
return None
_to = _from + len_batch
self.ut.gauge()
l_ts += [(np.datetime64(x) - dt0) / t_delta for x in pd_dt[_from: _to]]
self.ut.gauge('end')
df.timestamp = l_ts
return df
def get_db_sign(self):
nb_enr = 0
for o_table in list(self.d_tables.values()):
nb_enr += o_table.select().count()
return nb_enr
def _get_range_from_df(self, df):
""" Renvoie un tuple de stamps, qui encadrent df, à l'extérieur de df (samedis 22:00). """
f_stamp = df['timestamp'].iloc[0] # Premier stamp réel dans df. <-- début.
l_stamp = df['timestamp'].iloc[-1] # Dernier stamp réel dans df. <-- fin.
stamp_before = f_stamp + self.dt.get_stamp_offset(f_stamp, b_after=False, num_day=6, _time='22:00')
stamp_after = l_stamp + self.dt.get_stamp_offset(l_stamp, b_after=True, num_day=6, _time='22:00')
return stamp_before, stamp_after
def _get_range_from_yw(self, year, str_week):
stamp = self.dt.get_dtstamp_from_dtstr(f'{year}-{str_week}-3 10', dt_format='%G-%V-%u %H') # Mercredi 10:00
stamp_before = stamp + self.dt.get_stamp_offset(stamp, b_after=False, num_day=6, _time='22:00')
stamp_after = stamp + self.dt.get_stamp_offset(stamp, b_after=True, num_day=6, _time='22:00')
return stamp_before, stamp_after
def is_volume_in_week(self, *yw):
""" Compte le nombre d'enregistrements total et le nombre d'enregistrements ayant du volume. """
""" Limites de la semaine : samedi 22:00 à samedi 22:00. """
range_stamps = self._get_range_from_yw(*yw)
""" Parcours de toutes les tables et cumul des valeurs. """
nb_total = nb_volume = 0
for o_table in list(self.d_tables.values()):
nb_total += o_table.select(o_table.Volume).where(
o_table.timestamp.between(*range_stamps)
).count()
nb_volume += o_table.select(o_table.Volume).where(
o_table.timestamp.between(*range_stamps),
~o_table.Volume.is_null()
).count()
return nb_volume > nb_total * .98 # Booléen.
def delete_week(self, *yw):
""" Supprime la semaine {yw} de chacune des tables """
range_stamps = self._get_range_from_yw(*yw)
for o_table in list(self.d_tables.values())[::-1]: # Parcours à l'envers.
o_table.delete().where(o_table.timestamp.between(*range_stamps)).execute()
""" Méthodes surchargées. """
def df_to_table(self, df):
return True
def update_derived(self, df):
pass
def update_derived_table(self, df, stamp_before, stamp_after, o_derived_table):
pass
def week_exists(self, year, str_week, csv):
return False
Remarque : Pour ces tests, le code concernant les volumes est provisoirement commenté (recherchez 'volume')./trading/historiques/db_candle.py
:
# Imports externes
from peewee import Model, FloatField, IntegerField
# Imports internes
from trading.historiques.db import DataBase
# noinspection PyTypeChecker, PyProtectedMember, DuplicatedCode
class DbCandle(DataBase):
def __init__(self, o_ctrl):
super().__init__(o_ctrl=o_ctrl, b_ticks=False)
db = self.db
""" Classe mère (hérite de peewee.Model). """
class BaseCandles(Model):
# https://docs.peewee-orm.com/en/latest/peewee/models.html#field-types-table
timestamp = IntegerField(primary_key=True) # Index==True et null==False par défaut => vide refusé.
BidOpen = FloatField()
BidHigh = FloatField()
BidLow = FloatField()
BidClose = FloatField()
AskOpen = FloatField()
AskHigh = FloatField()
AskLow = FloatField()
AskClose = FloatField()
Volume = IntegerField(null=True)
class Meta:
# db_table = 'xxxx' Si omis, le nom de la table sera automatique = nom de la classe en minuscules.
database = db
order_by = 'timestamp'
""" Classes dérivées (héritent de BaseCandles). """
class M1(BaseCandles):
""" Table principale. """
seconds = 60
class M5(BaseCandles):
seconds = 5 * 60
class M15(BaseCandles):
seconds = 15 * 60
class M30(BaseCandles):
seconds = 30 * 60
class H1(BaseCandles):
seconds = 60 * 60
class H4(BaseCandles):
seconds = 4 * 60 * 60
class Day(BaseCandles):
seconds = 24 * 60 * 60
class Week(BaseCandles):
seconds = 5 * 24 * 60 * 60 # Marché ouvert 5 jours par semaine.
""" Instanciations. """
self.d_tables = {'m1': M1, 'm5': M5, 'm15': M15, 'm30': M30, 'h1': H1, 'h4': H4, 'day': Day, 'week': Week}
self.o_table = M1
""" Création des tables dans le fichier candle.sql. """
with db:
self.db.create_tables(list(self.d_tables.values()))
def df_to_table(self, df):
""" Écriture SANS DOUBLONS des données de la DateFrame Pandas {df} dans la table principale M1. """
""" 1 - Mise en conformité de la colonne date-time. """
df = self.change_datetime_column(df)
if df is None:
return False
""" 2 - Écriture dans la table M1 : ~38sec pour 1_000_000 lignes.
replace_many() préférable à insert_many() car remplace en cas de doublon, donc pas d'erreur. """
l_datas = df.to_dict(orient='records') # <class 'list'> <-- ~3sec pour 1_000_000 lignes.
self.o_table.replace_many(l_datas).execute()
""" 3 - Écriture dans les tables dérivées : M5 à week. """
return self.update_derived(df)
def update_derived(self, df):
""" Tables dérivées de {M1}. """
stamp_before, stamp_after = self._get_range_from_df(df) # 2 samedis à 22:00.
self.ut.gauge('Tables dérivées de M1')
for o_derived_table in list(self.d_tables.values())[1:]: # Toutes les tables, sauf 'M1'
self.update_derived_table(df, stamp_before, stamp_after, o_derived_table)
self.ut.gauge(char=f'{o_derived_table._meta.table_name.upper()} ▒')
self.ut.gauge('end') # Passe à la ligne en fin de traitement.
self.ut.gauge('end') # Ligne vide supplémentaire.
return True
def update_derived_table(self, df, stamp_before, stamp_after, o_derived_table):
""" Une seule table. """
l_batches = list()
first_stamp, last_stamp, step = int(stamp_before + 86_400), int(stamp_after - 86_400), o_derived_table.seconds
last = ((last_stamp - first_stamp) // step) - 1
for i, since in enumerate(range(first_stamp, last_stamp, step)):
_from = stamp_before if i == 0 else since
_to = stamp_after if i == last else since + step
filtered_df = df[(df['timestamp'] >= _from) & (df['timestamp'] <= _to)]
if filtered_df['timestamp'].count() == 0:
continue
l_record = list()
l_record.append(filtered_df['timestamp'].iloc[-1]) # timestamp de fin.
l_record.append(filtered_df['BidOpen'].iloc[0]) # bidopen de début.
l_record.append(filtered_df['BidHigh'].max()) # valeur max des bidhigh.
l_record.append(filtered_df['BidLow'].min()) # valeur min des bidlow.
l_record.append(filtered_df['BidClose'].iloc[-1]) # bidclose de fin.
l_record.append(filtered_df['AskOpen'].iloc[0]) # askopen de début.
l_record.append(filtered_df['AskHigh'].max()) # valeur max ds askhigh.
l_record.append(filtered_df['AskLow'].min()) # valeur min des asklow.
l_record.append(filtered_df['AskClose'].iloc[-1]) # askclose de fin.
l_batches.append(l_record)
""" Écriture de l_batches (plusieurs enregistrements en une seule fois) dans la table o_table. """
with self.db.transaction():
o_derived_table.replace_many(l_batches).execute()
def week_exists(self, year, str_week, csv_size):
""" Toutes les tables sont testées. Une seule table défaillante suffit pour renvoyer False. """
""" Samedi avant, samedi après. Le milieu de la semaine est '3 10' -> mercredi 10:00. """
stamp = self.dt.get_dtstamp_from_dtstr(f'{year}-{str_week}-3 10', dt_format='%G-%V-%u %H') # Mercredi 10:00
stamp_before = stamp + self.dt.get_stamp_offset(stamp, b_after=False, num_day=6, _time='22:00')
stamp_after = stamp + self.dt.get_stamp_offset(stamp, b_after=True, num_day=6, _time='22:00')
for o_table in list(self.d_tables.values()): # Toutes les tables.
count = o_table.select().where(o_table.timestamp.between(stamp_before, stamp_after)).count()
""" Relation empirique entre la taille du fichier sur disque et le nombre attendu d'enregistrements. """
corr = .65 # Coefficient de correction.
nb_min = max(1, int(corr * csv_size / o_table.seconds))
if count < nb_min:
return False
return True
def update_volume(self, df_tick):
""" Update la colonne 'Volume' de chaque table, à partir de la dataframe des ticks. """
""" Limites de la semaine : samedi 22:00 à samedi 22:00. """
stamp_before, stamp_after = df_tick['timestamp'].iloc[0], df_tick['timestamp'].iloc[-1]
yw = self.dt.get_date_from_dtstamp((stamp_before + stamp_after)/2).isocalendar()[:2]
nb_enr = 0
for o_table in list(self.d_tables.values()):
nb_enr += o_table.select().where(o_table.timestamp.between(stamp_before, stamp_after)).count()
gauge_step = nb_enr // 50
indx = 0
self.ut.gauge(f"Volume dans les tables Candle ({yw[0]}, {yw[1]})") # >1514580360 >1514840400
for o_table in list(self.d_tables.values()):
batch = o_table.select().where(o_table.timestamp.between(stamp_before, stamp_after))
timestamp_ante = int(stamp_before)
l_batch = list()
for rec in batch:
indx += 1
if indx % gauge_step == 0:
nb = indx // gauge_step # 0 <= nb <= 50
char = '▒' if nb < 41 else ('▄' if nb < 46 else '.')
self.ut.gauge(char=char)
period = timestamp_ante + 1, rec.timestamp
vol = df_tick[df_tick['timestamp'].between(*period)].shape[0]
l_batch.append((rec.timestamp, rec.BidOpen, rec.BidHigh, rec.BidLow, rec.BidClose,
rec.AskOpen, rec.AskHigh, rec.AskLow, rec.AskClose, vol))
timestamp_ante = rec.timestamp
o_table.replace_many(l_batch).execute()
self.ut.gauge('end')
if keyboard.is_pressed('ctrl')
2 fois dans ctrl_histos.py
.Ctrl
, souvent utilisée, provoquera un arrêt involontaire du programme Python.self.kb_break
à la place de 'ctrl'
.ctrl_histos.py
./trading/historiques/ctrl_histos.py
:
# Imports externes
import datetime
import gzip
import hashlib
import os # http://www.python-simple.com/python-modules-fichiers/os-path.php (Ctrl + clic)
import pickle
import requests
from io import BytesIO, StringIO
import pandas as pd
import keyboard # https://www.delftstack.com/fr/howto/python/python-detect-keypress/
# Imports internes
from functions.utils import DateTime, Utils
from trading.historiques.db_candle import DbCandle
from trading.historiques.db_tick import DbTick
class CtrlHistos:
def __init__(self, instrument):
""" https://github.com/fxcm/MarketData
Tous les symboles :
AUDCAD, AUDCHF, AUDJPY, AUDNZD, AUDUSD, CADCHF, CADJPY, EURAUD, EURCHF,
EURGBP, EURJPY, EURNZD, EURUSD, GBPCAD, GBPCHF, GBPJPY, GBPNZD, GBPUSD,
NZDCAD, NZDCHF, NZDJPY, NZDUSD, USDCAD, USDCHF, USDJPY, USDTRY
Les plus utilisés (Dans l'ordre d'importance) :
EURUSD, USDJPY, EURCHF, USDCAD, NZDUSD, EURGBP, EURJPY, GBPJPY, GBPCHF, GBPUSD """
self.instrument = instrument
self.symbol = instrument.replace('/', '')
self.dt = DateTime()
self.ut = Utils()
self.kb_break = 'f12' # 'shift', 'ctrl', ... à votre convenance.
self.symbol_dir = None
self.db_candle = None
self.db_tick = None
self.setup()
def setup(self):
""" Les données peuvent se trouver sur un disque dur SSD autre que celui en cours.
Le cas échéant, db_path doit être modifié (chemin absolu).
Il est également possible, pour les besoins des tests, d'avoir plusieurs bases de données.
|_ Il faudra alors modifier db_path chaque fois que l'on désire switcher. """
db_path = os.path.dirname(__file__) # <--- Chemin par défaut, à modifier si autre disque dur.
self.symbol_dir = os.path.abspath(f'{db_path}/db/{self.symbol}') # Dossier des fichiers d'historiques.
os.makedirs(self.symbol_dir, exist_ok=True)
self.db_candle = DbCandle(self)
self.db_tick = DbTick(self)
def _helper(self, key, *l_params):
""" ************ Collection d'utilitaires. Nombre de paramètres variable. ************ """
if key == 'nb_weeks_in_year':
""" Renvoie le nombre de semaines dans l'année {year}. """
year = l_params[0] # l_params = [année dont on veut connaître le nombre de semaines].
if len(l_params) > 1:
y, num_week, _ = datetime.date.today().isocalendar()
if y == year:
return num_week - 1
o_dat = self.dt.get_date_from_dtstamp(self.dt.get_dtstamp_from_dtstr(f'{year}-12-28', '%Y-%m-%d'))
return o_dat.isocalendar()[1]
elif key == 'l_iso_yw':
""" Renvoie une liste de tuples ISO (iso_year, iso_str_week) : <-- exemple de tuple : (2019, '08')
- Pour la devise en cours et son type (tick ou candle).
- Toutes les semaines, du début à aujourd'hui.
- L'année est un int, la semaine une str ('02', '12', ...) """
b_ticks = l_params[0] # l_params = [b_ticks].
st_yw = set() # Le set() évite les doublons.
since = 2018 if b_ticks else 2012
yw_today = datetime.date.today().isocalendar()[:2]
for year in range(since, yw_today[0] + 1):
for week in range(1, 54):
iso = self.dt.isoyw_from_fxyw(year, week)
st_yw.add((iso[0], f'0{iso[1]}'[-2:]))
if (year, week) >= yw_today:
lt_yw = list(st_yw)
lt_yw.sort()
return lt_yw
elif key == 'csv_file':
""" Renvoie le chemin complet du fichier correspondant à la devise en cours, à l'année et à la semaine. """
year, week, b_ticks = l_params # l_params = [*t_yw, b_ticks]
str_week = f'0{week}'[-2:]
return os.path.join(self.symbol_dir, str(year), f"{'tick' if b_ticks else 'candle'}_"
f"{str_week}.csv")
elif key == 'url_file':
""" Renvoie l'url du fichier d'histos correspondant à la devise en cours, à l'année et à la semaine. """
year, str_week, b_ticks = l_params # l_params = [*t_yw, b_ticks]
url_candle = 'https://candledata.fxcorporate.com/m1'
url_tick = 'https://tickdata.fxcorporate.com'
return f'{url_tick if b_ticks else url_candle}/{self.symbol}/{year}/{int(str_week)}.csv.gz'
def verify_weeks(self, b_ticks, b_silent=False):
""" Affichage d'un tableau dans la console (Lignes=années, Colonnes=semaines).
'▒' ou '▄' ou '▀' = semaine présente, '.' = semaine absente. """
""" 1 - Listes des semaines réellement existantes en base de données et en fichier .csv. """
l_weeks_csv, l_weeks_db = self._existing_lists(b_ticks)
""" 2 - Affichage du titre et entêtes de colonnes. """
typ = 'Ticks' if b_ticks else 'Candles'
self.ut.gauge(f"{self.instrument}-{typ}", large=17)
[self.ut.gauge(char=f"{str(week) :<2}") for week in range(1, 54)]
self.ut.gauge('end')
""" 3 - Affichage du contenu - Ordonnées = années, abscisse = semaines. """
l_weeks = l_weeks_csv + l_weeks_db
l_weeks.sort()
first_year = (2018 if b_ticks else 2012) if len(l_weeks) == 0 else l_weeks[0][0]
year_now = datetime.date.today().isocalendar()[0]
for year in range(first_year, year_now + 1):
self.ut.gauge(year, large=17)
""" Boucle sur les semaines de l'année {year} : 1 à (52 ou 53). """
for week in range(1, self._helper('nb_weeks_in_year', year, 'today') + 1): # today limite à aujourd'hui.
t_yw = year, f'0{week}'[-2:] # 7 -> 07
b_csv, b_db = t_yw in l_weeks_csv, t_yw in l_weeks_db
if not b_csv and not b_db: # 0 0 /x./y
char, color = '. ', 'BLEU'
elif not b_csv and b_db: # 0 1 /x.y
char, color = '▀ ', 'VERT'
elif b_csv and not b_db: # 1 0 x./y
char, color = '▄ ', 'VIOLET'
else: # 1 1 x.y
char, color = '▒ ', 'BLEU'
self.ut.gauge(char=char, color=color)
self.ut.gauge('end')
""" Vérification de la synchronisation entre la base de données et les fichiers csv. """
msg = "\nAttention : La base de données n'est pas entièrement synchronisée ! csv + db = rectangles bleus." \
"\nL'affichage ci-dessus montre les fichiers .csv en carrés-bas mauves, la db en carrés-hauts verts." \
if not b_silent and l_weeks_csv != l_weeks_db else ''
self.ut.printc(msg)
def download_histos(self, nb_weeks, b_ticks):
if nb_weeks > 1:
self.ut.printc(f"\nArrêt manuel : garder la touche '{self.kb_break.upper()}' appuyée.\n")
""" Intervalle des semaines à télécharger (nombres en base 53). """
""" |_ since = Depuis. """
l_weeks_csv, _ = self._existing_lists(b_ticks=b_ticks) # Semaines = str <-- '08', '09', '10', ...
l_csv = [(y, int(w)) for (y, w) in l_weeks_csv] # Semaines = int <-- 8, 9, 10, ...
for y, w in l_csv:
if w > 2:
since = y * 53 + w - 1
break
else:
since = (2018 if b_ticks else 2012) * 53
y_today, w_today = datetime.date.today().isocalendar()[:2]
now = y_today * 53 + w_today
""" |_ now = Jusqu'à. """
""" l_required = Liste des fichiers à télécharger. """
l_required = list()
for yw53 in range(since, now-1): # dernière semaine = (now-1) => pas la semaine en cours.
csv_yw = yw53 // 53, 1 + yw53 % 53
csv_file = self._helper('csv_file', *csv_yw, b_ticks)
if not os.path.isfile(csv_file):
""" Fichier absent localement => à télécharger. """
l_required.append(csv_yw)
db = self.db_tick if b_ticks else self.db_candle
nb_weeks_loaded = 0
""" Parcours des semaines à traiter. """
for csv_yw_required in l_required:
df = self._download_csv_file(csv_yw_required, b_ticks)
if df is None:
""" Arrêt manuel demandé. """
break
elif df.shape[0] > 0:
""" 1 - Enregistrement du fichier téléchargé sur disque dur. """
self._df_to_csv(df, csv_yw_required, b_ticks)
""" 2 - Écriture du contenu en base de données. break si arrêt manuel demandé. """
if not db.df_to_table(df):
continue
""" 3 - Update des volumes à partir des ticks. """
# df_stamps = self.db_tick.get_df_stamps(*csv_yw_required)
# if df_stamps.shape[0] > 0:
# self.db_candle.update_volume(df_stamps)
""" Arrêt si le nombre de semaines demandées est atteint. """
nb_weeks_loaded += 1
if nb_weeks_loaded >= nb_weeks:
break
s = 's' if nb_weeks > 1 else ''
self.ut.printc(f"Téléchargement{s} terminé{s}.", color='vert')
def synchro_db_csv(self, b_ticks):
""" A l'issue de cette méthode, TOUTE la db sera l'image de TOUS les fichiers csv. """
""" 1 - Base de données. """
db, typ = (self.db_tick, 'Ticks') if b_ticks else (self.db_candle, 'Candles')
""" 2 - État actuel des historiques => Différences entre csv et db
- l_drop = semaines à supprimer de la db, l_add = semaines à ajouter à la db. """
l_drop, l_add, l_vol = self._lists_to_update_db(b_ticks=b_ticks)
if l_drop+l_add+l_vol == []:
self.ut.printc(f"{typ} csv <--> db : OK. La synchronisation est inutile.\n", color='VERT')
return True
""" 3 - Message : état initial. """
for l_weeks in [l_drop, l_add]:
if len(l_weeks) > 0:
s = 's sont' if len(l_weeks) > 1 else ' est'
verb = 'ajouter à' if l_weeks == l_add else 'supprimer de'
self.ut.printc(f"{len(l_weeks)} semaine{s} à {verb} la base de données '{typ.lower()}'.", color='VERT')
""" 4 - Ajout de données dans la db. """
self._add_weeks(l_add, b_ticks)
""" 5 - Suppression de données de la db. """
for yw in l_drop:
db.delete_week(*yw)
""" 6 - Update des volumes. """
# for yw in l_vol:
# if keyboard.is_pressed(self.kb_break):
# self.ut.printc("Arrêt manuel demandé.")
# break
# df_stamps = self.db_tick.get_df_stamps(*yw)
# self.db_candle.update_volume(df_stamps)
""" 7 - Vérification. """
l_drop, l_add, l_vol = self._lists_to_update_db(b_ticks=b_ticks)
success, color = ('réussi', 'VERT') if l_drop+l_add+l_vol == [] else ('échoué', 'ROUGE')
self.ut.printc(f'{typ} : La synchronisation a {success}.', color=color)
def _existing_lists(self, b_ticks):
""" Retourne les listes de semaines des fichiers csv et des enregistrements en base de données. """
""" Exécution rapide si les signatures correspondent (~1ms). """
sign_now = self._signature(b_ticks) # Signature actuelle.
lists_file = os.path.join(self.symbol_dir, 'lists_tick.pkl' if b_ticks else 'lists_candle.pkl')
if os.path.isfile(lists_file):
with open(lists_file, 'rb') as l_lists:
sign_ante, l_weeks_csv, l_weeks_db = pickle.load(l_lists) # Signature persistée.
if sign_now == sign_ante:
return l_weeks_csv, l_weeks_db
""" Exécution lente si les signatures ne correspondent pas (~3 secondes, mais seulement la 1ère fois). """
""" s_weeks_csv n'est pas une list(), mais un set(), pour éviter les doublons. """
l_weeks_csv, s_weeks_db = list(), set()
db = self.db_tick if b_ticks else self.db_candle
l_iso_yw = self._helper('l_iso_yw', b_ticks) # Liste de toutes les semaines.
typ = 'Ticks' if b_ticks else 'Candles'
self.ut.gauge(f"{self.instrument}-{typ} Création listes :", large=0)
for num, iso_yw in enumerate(l_iso_yw):
""" Semaines existant en fichiers csv : Pour une semaine iso, on a 0, 1 ou 2 fichiers csv. """
l_csv_files, last_stamp = self._get_csv_files(*iso_yw, b_ticks)
csv_size = 0
if len(l_csv_files) > 0:
l_weeks_csv.append(iso_yw)
for csv_file in l_csv_files:
csv_size += os.path.getsize(csv_file)
""" Liste des semaines existant en base de données. """
csv = last_stamp if b_ticks else csv_size
if db.week_exists(*iso_yw, csv):
s_weeks_db.add(iso_yw) # (1)
if num % 8 == 0:
self.ut.gauge(char='.') # , color='Jaune')
self.ut.gauge('end')
self.ut.gauge('end')
""" Persistance en pkl. """
l_weeks_db = list(s_weeks_db)
l_weeks_csv.sort()
l_weeks_db.sort()
with open(lists_file, 'wb') as l_lists:
pickle.dump((sign_now, l_weeks_csv, l_weeks_db), l_lists, pickle.HIGHEST_PROTOCOL)
return l_weeks_csv, l_weeks_db
def _signature(self, b_ticks):
""" - Cette méthode est d'exécution rapide (~1 ms).
- Elle produit une signature sur 256 bits.
- Elle permet de savoir si des modifications ont été apportées à la db et/ou aux fichiers csv. """
db = self.db_tick if b_ticks else self.db_candle
y, w = datetime.date.today().isocalendar()[:2]
sign = str(53*y + w + db.get_db_sign())
for root, sub_folder, l_csv in os.walk(self.symbol_dir):
if root == self.symbol_dir:
continue
else:
""" Concaténation de tous les noms de fichiers .csv dans {self.symbol_dir} """
l_csv = [csv for csv in l_csv if csv.startswith('tick' if b_ticks else 'candle')] # Filtrage.
sign += root + ''.join(l_csv)
_hash = hashlib.sha256(sign.encode('utf-8')).hexdigest()
return _hash
def _get_csv_files(self, iso_year, iso_week, b_ticks):
""" Attention ! Les noms de fichiers csv ne correspondent pas toujours à leur contenu ! """
""" Return : liste de noms complets de fichiers csv correspondant à (iso_year, iso_week) : 0, 1 ou 2 fichiers.
fxcorporate ne respecte pas la norme ISO. De plus :
- D'une année sur l'autre, le nom du fichier csv ne correspond pas à son contenu.
- On ne trouve pas de correspondance entre diverses devises.
- Par conséquent, le nom du fichier n'est pas fiable pour trouver le bon N° de semaine.
La méthode adoptée pour contourner ce problème est la suivante :
- Recherche de 3 fichiers csv voisins de (iso_year, iso_week) passés en paramètres.
- Lecture, pour chacun, de la date-heure du milieu du fichier.
- On en déduit le VRAI N° de semaine ISO.
Il arrive parfois, au changement d'année, qu'une même semaine soit composée de 2 fichiers csv.
- C'est pourquoi la valeur de retour est une liste. """
int_week, l_fxyw = int(iso_week), list()
last_stamp = 0
for week in range(int_week - 1, int_week + 2): # 3 Semaines : précédente, actuelle, suivante.
""" Correction : N° de semaine en base 53. """
y, w = (iso_year, week) if week > 0 else (iso_year - 1, week + 53)
y, w = (y, w) if w <= 53 else (y + 1, w - 53)
""" Les 3 fichiers csv candidats. """
str_w = f'0{w}'[-2:]
file_name = f"{'tick' if b_ticks else 'candle'}_{str_w}.csv" # 'tick_??.csv' ou 'candle_??.csv'.
csv_path = os.path.abspath(f"{os.path.dirname(self.symbol_dir)}/{self.symbol}/{y}/{file_name}")
if os.path.isfile(csv_path):
with open(csv_path, 'r') as fh:
""" On 'goûte' les 300 derniers bytes du fichier pour connaitre le N° ISO de semaine. """
nb_char = 300
fh.seek(0, os.SEEK_END)
fh.seek(fh.tell() - nb_char)
dt_str = fh.read().split('\n')[-3].split(',')[0]
last_stamp = self.dt.get_dtstamp_from_dtstr(dt_str)
o_d = self.dt.get_date_from_dtstamp(last_stamp) # Objet date.
if o_d.isocalendar()[:2] == (iso_year, int_week): # Filtrage.
l_fxyw.append(csv_path) # Candidat sélectionné.
return l_fxyw, last_stamp # 0, 1 ou 2 éléments.
def _download_csv_file(self, csv_yw_required, b_ticks):
""" Téléchargement, décompactage, décodage, DataFrame, enregistrement sur disque dur.
- return DataFrame. """
""" 1 - Demande au helper de construire l'url. """
url_required = self._helper('url_file', *csv_yw_required, b_ticks)
""" 2 - Initialisation des bytes. """
content = b''
""" 3 - Téléchargement en streaming avec lots de taille {size//50}, permettant l'insertion d'une jauge. """
y, w = csv_yw_required
with requests.get(url_required, stream=True) as req:
size = int(req.headers['Content-Length'])
if size < 1_000:
return pd.DataFrame() # return df vide.
print()
self.ut.gauge(f"Téléchargement {' ticks ' if b_ticks else 'candles'} {self.symbol} ({y}, {w})")
for chunk in req.iter_content(chunk_size=size//50):
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
self.ut.gauge('end')
return None
content += chunk
""" 3.1 - Jauge de progression. """
self.ut.gauge()
self.ut.gauge('end')
""" 4 - Décompactage du buffer de données binaires (codées utf), en mémoire. """
buf = BytesIO(content)
f = gzip.GzipFile(fileobj=buf)
coded_bytes = f.read()
""" 5 - Décodage : utf-8 pour les candles, utf-16 pour les ticks. """
codec, type_data = ('utf-16', 'tick') if b_ticks else ('utf-8', 'candle')
decoded_bytes = coded_bytes.decode(codec)
""" 6 - Conversion Bytes -> String """
decoded_str = StringIO(decoded_bytes)
""" 7 - String to DataFrame. """
df = pd.read_csv(decoded_str) # Pas de colonne-index (index_col=0 retiré)
""" 8 - Retourne le dataframe. """
return df
def _df_to_csv(self, df, t_yw, b_ticks):
""" Contrôle préalable : si le dossier-cible n'existe pas, on le crée. """
csv_path = self._helper('csv_file', *t_yw, b_ticks)
os.makedirs(os.path.dirname(csv_path), exist_ok=True)
df.to_csv(csv_path, index=False) # Copie du fichier .csv sur disque dur.
@staticmethod
def _csv_to_df(csv_file):
""" Conversion du fichier {csv_file} en dataframe. """
return pd.read_csv(csv_file) if os.path.isfile(csv_file) else pd.DataFrame()
def _lists_to_update_db(self, b_ticks):
""" Retourne 3 listes : l_drop, l_add et l_vol.
Algorithme pour l_drop et l_add :
- (1) = l_weeks_db : Liste des semaines réellement existantes en base de données.
- (2) = l_weeks_csv : Liste des semaines réellement existantes en fichiers csv.
- Intersection (Inter) = (1) ⋂ (2)
- l_drop : Liste des semaines à supprimer de la db (celles qui n'existent pas en csv) : (1) - (Inter)
- l_add : Liste des semaines à ajouter dans la db (celles des csv qui n'existent pas en db) : (2) - (Inter)
--------------------------------
l_vol = liste des semaines (yw) de candles nécessitant un update du champ 'Volume' dans les tables.
Sans objet pour les ticks.
"""
""" 1 et 2 - Listes des semaines réellement existantes en base de données et en fichier .csv. """
l_weeks_csv, l_weeks_db = self._existing_lists(b_ticks=b_ticks)
""" 3 - Liste des semaines à supprimer de la db (celles qui n'existent pas en csv). """
l_drop = list()
for t_yw in l_weeks_db:
if t_yw not in l_weeks_csv:
# print('A supprimer :', t_yw) # MAP : Permet de lister les semaines qui sont à supprimer.
l_drop.append(t_yw)
""" 4 - Liste des semaines à ajouter à la db (celles des csv qui n'existent pas en db). """
l_add = list()
for t_yw in l_weeks_csv:
if t_yw not in l_weeks_db:
# print('A ajouter :', t_yw) # MAP : Permet de lister les semaines qui sont à ajouter.
l_add.append(t_yw)
""" 5 - Liste des semaines de ticks pour lesquelles les volumes des candles sont à updater. """
l_vol = list()
# if not b_ticks: # Renkos non concernés.
# """ Semaines ticks existantes (b_ticks=True). """
# _, l_weeks_tick_db = self._existing_lists(b_ticks=True)
# for yw in l_weeks_tick_db:
# if yw in l_weeks_db:
# if not self.db_candle.is_volume_in_week(*yw):
# l_vol.append(yw)
return l_drop, l_add, l_vol
def _add_weeks(self, l_yw, b_ticks):
"""
@param l_yw : Liste des semaines à ajouter à la base de données.
@param b_ticks: True = ticks, False = candles.
"""
db = self.db_tick if b_ticks else self.db_candle
self.ut.printc(f"\nArrêt manuel : garder la touche '{self.kb_break.upper()}' appuyée.\n")
for iso_yw in l_yw:
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
break
l_csv_files, _ = self._get_csv_files(*iso_yw, b_ticks) # Liste de 0, 1 ou 2 fichiers.
for csv_file in l_csv_files:
df = self._csv_to_df(csv_file)
if df.shape[0] > 0:
db.df_to_table(df)
def main(): # MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP *****
""" Code de MAP (mise au point). Espace expérimental : ajouter, supprimer, commenter, décommenter, ... """
h = CtrlHistos('EUR/USD')
# h = CtrlHistos('USD/JPY')
""" Candles. """
# h.download_histos(10, b_ticks=False)
# h.synchro_db_csv(b_ticks=False)
h.verify_weeks(b_ticks=False) # , b_silent=True)
""" Ticks. """
# h.download_histos(2, b_ticks=True)
# h.synchro_db_csv(b_ticks=True)
# h.verify_weeks(b_ticks=True)
if __name__ == '__main__':
main()
self.kb_break = 'f12'
dans l'init, mais vous pouvez modifier cette valeur à votre convenance.
DbTick
a changé : self
au lieu de self.symbol_dir
. Remplacer le fichier db_tick.py
./trading/historiques/db_tick.py
:
# Imports internes
from trading.historiques.db import DataBase # Classe-parent.
class DbTick(DataBase):
def __init__(self, o_ctrl):
super().__init__(o_ctrl=o_ctrl, b_ticks=True)
main()
: Vérifiez que seul verify_weeks()
pour les Candles est activé.
ctrl_histos.py
: Run 'ctrl_histos'.main()
: Activer download_histos()
pour les Candles.
ctrl_histos.py
.main()
: Activer synchro_db_csv()
pour les Candles.
Description
main() à la fin du fichier ctrl_histos.py
:
def main(): # MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP *****
""" Code de MAP (mise au point). Espace expérimental : ajouter, supprimer, commenter, décommenter, ... """
h = CtrlHistos('EUR/USD')
# h = CtrlHistos('USD/JPY')
""" Candles. """
# h.download_histos(1, b_ticks=False)
# h.synchro_db_csv(b_ticks=False)
# h.verify_weeks(b_ticks=False) # , b_silent=True)
""" Ticks. """
# h.download_histos(1, b_ticks=True)
h.verify_weeks(b_ticks=True)
# h.synchro_db_csv(b_ticks=True)
Une seule ligne est activée : h.verify_weeks(b_ticks=True)
Run 'ctrl_histos'
).DbTick
n'est pas encore codée.
Petits carrés mauves = seulement fichiers dans disque dur.
main()
: Activer synchro_db_csv()
pour les Ticks.
ctrl_histos.py
(Run 'ctrl_histos').
La synchronisation a échoué car la classe DbTick
n'est pas encore codée.
DbCandle
, elle hérite de DataBase
.tick.sql
.Ticks
.timestamp
, Bid
et Ask
./trading/historiques/db_tick.py
:
# Imports externes
from peewee import Model, FloatField
import pandas as pd
# Imports internes
from trading.historiques.db import DataBase # Classe-parent.
from trading.historiques.db_renko import DbRenko
# noinspection DuplicatedCode
class DbTick(DataBase):
def __init__(self, o_ctrl):
super().__init__(o_ctrl=o_ctrl, b_ticks=True)
db = self.db
self.o_renko = DbRenko(o_ctrl)
class Ticks(Model):
timestamp = FloatField(primary_key=True) # Index==True et null==False par défaut => vide refusé.
Bid = FloatField()
Ask = FloatField()
class Meta:
database = db
order_by = 'timestamp'
""" Instanciations. """
self.o_table = Ticks
""" Création de la table 'ticks' dans le fichier tick.sql. """
self.db.create_tables([Ticks])
Ce code permet juste de créer la base de données tick.sql
et la table Tick
.
self.o_renko
est une instance de la classe DbRenko
.DbRenko
minimaliste pour éviter des erreurs./trading/historiques/db_renko.py
:
# Imports internes
from trading.historiques.db import DataBase
class DbRenko(DataBase):
def __init__(self, o_ctrl):
super().__init__(o_ctrl=o_ctrl, b_ticks=None)
def range_exists(self, stamp_before, stamp_after):
""" Les 5 premières tables sont testées. Une seule table défaillante suffit pour renvoyer False.
- En effet, pour les grandes mailles (> 7), il arrive qu'il n'y ait pas d'enr pendant plus d'une semaine. """
# ...
return True
tick.sql
ave DB Browser SQLite
:
Une seule table (ticks
), contenant 3 champs : timestamp
, Bid
et Ask
.
DbTick
sont : get_db_sign()
, week_exists()
, df_to_table()
et delete_week()
.CtrlHistos
.
Pour plus de clarté, seules les méthodes de DbTick
sont représentées dans leurs contenants.
/trading/historiques/db_tick.py > DbTick
:
def get_db_sign(self):
""" self.o_table.select().count() est chronophage. Remplacé par la lecture du dernier timestamp. """
try:
last_ts = int(self.o_table.select().order_by(self.o_table.timestamp.desc()).get().timestamp)
except (Exception,):
last_ts = 0
return last_ts + self.o_renko.get_db_sign()
def week_exists(self, year, str_week, last_stamp):
""" - La table de ticks est testée. Les tables Renko sont également testées.
- Une seule table défaillante suffit pour renvoyer False. """
""" Samedi avant, samedi après. Le milieu de la semaine est '3 10' -> mercredi 10:00. """
first_stamp = last_stamp - 10_000
count = self.o_table.select().where(self.o_table.timestamp.between(first_stamp, last_stamp)).count()
""" Relation empirique entre la taille du fichier sur disque et le nombre attendu d'enregistrements. """
if count == 0:
return False
return self.o_renko.range_exists(*self._get_range_from_yw(year, str_week))
def df_to_table(self, df):
""" a) Écriture Ticks (SANS DOUBLONS) : Données de la DateFrame Pandas {df} dans la table de ticks.
b) Écriture Renko (df conforme) : Délégation à l'objet {o_renko}, en dernière ligne."""
""" 1 - Mise en conformité de la colonne date-time. """
df = self.change_datetime_column(df)
if df is None:
return False
""" 2 - df -> l_datas (~700_000 lignes). """
l_datas = df.to_dict(orient='records')
""" 3 - Découpage de l_datas en 50 batches (au minimum). """
len_datas = len(l_datas)
batch_size = max(1, min(50_000, len_datas // 50))
nb_steps = len_datas // batch_size
""" 4 - Boucle d'écritures avec jauge de progression. """
self.ut.gauge('Écriture dans la table de ticks')
with self.db.transaction():
for nb in range(1 + nb_steps):
""" {batch_size} enr. pour éviter l'erreur 'OperationalError: too many SQL variables' (si > 82_000) """
indx_min = nb * batch_size
indx_max = min(len_datas, indx_min + batch_size)
datas = l_datas[indx_min: indx_max] # Slice.
""" replace_many() préférable à insert_many() car remplace en cas de doublon, donc pas d'erreur. """
self.o_table.replace_many(datas).execute()
char = '▒' if nb < (nb_steps - 9) else ('▄' if nb < (nb_steps - 4) else '.')
self.ut.gauge(char=char)
self.ut.gauge('end')
""" 5 - Écriture dans les tables Renko. """
return self.o_renko.df_to_table(df)
def delete_week(self, *yw):
""" 1) Suppression de la semaine {yw} dans la table {tick}.
2) Suppression dans chacune des tables renko, déléguée à l'objet {o_renko}, en dernière ligne. """
range_stamps = self._get_range_from_yw(*yw)
self.o_table.delete().where(self.o_table.timestamp.between(*range_stamps)).execute()
self.o_renko.delete_week(*yw)
Exemple : 7 semaines de ticks on été synchronisées.
tick.sql
ave DB Browser SQLite
:
7 semaines : 11 millions d'enregistrementes et 600 Mo sur disque dur !
DB Browser for SQLite
ne sera utilisé que pour les Candles
et les Renkos
.
tick
...Python
et Peewee
n'ont pas ces contraintes 😊download_histos()
et verify_weeks()
.
La semaine (2018, 13) a bien été téléchargée et entrée en base de données : rectangle bleu.
/trading/historiques/db_renko.py
:
# Imports externes
from peewee import Model, FloatField, IntegerField
import keyboard
# Imports internes
from trading.historiques.db import DataBase
# noinspection PyTypeChecker,PyProtectedMember,DuplicatedCode
class DbRenko(DataBase):
def __init__(self, o_ctrl):
super().__init__(o_ctrl=o_ctrl, b_ticks=None)
db = self.db
""" Classe mère (hérite de peewee.Model). """
class BaseRenko(Model):
# https://docs.peewee-orm.com/en/latest/peewee/models.html#field-types-table
timestamp = FloatField(primary_key=True) # Index==True et null==False par défaut => vide refusé.
Renko = IntegerField()
Spread = FloatField()
Volume = FloatField()
class Meta:
database = db
order_by = 'timestamp'
""" Classes dérivées (héritent de BaseRenko). """
class Renko1(BaseRenko):
maille = 1
class Renko2(BaseRenko):
maille = 2
class Renko3(BaseRenko):
maille = 3
class Renko4(BaseRenko):
maille = 4
class Renko5(BaseRenko):
maille = 5
class Renko7(BaseRenko):
maille = 7
class Renko10(BaseRenko):
maille = 10
class Renko14(BaseRenko):
maille = 14
class Renko20(BaseRenko):
maille = 20
""" Instanciations. """
self.d_tables = {'r1': Renko1, 'r2': Renko2, 'r3': Renko3, 'r4': Renko4, 'r5': Renko5,
'r7': Renko7, 'r10': Renko10, 'r14': Renko14, 'r20': Renko20}
self.o_table = Renko1
""" Création des tables dans le fichier renko.sql. """
with db:
self.db.create_tables(list(self.d_tables.values()))
def range_exists(self, stamp_before, stamp_after):
""" Les 5 premières tables sont testées. Une seule table défaillante suffit pour renvoyer False.
- En effet, pour les grandes mailles (> 7), il arrive qu'il n'y ait pas d'enr pendant plus d'une semaine. """
# ...
return True
ctrl_histos.py
localement.
Comparer le code de db_renko.py
avec l'image ci-dessus pour mettre en évidence les tables et la champs.
Algorithme :
- Le contrôleur d'historiques fait la liste des semaines de Candles à updater au niveau des volumes.
- Point 5 de la méthode
CtrlHistos._lists_to_update_db()
- La méthode d'update des volumes de Candles existe déjà, il s'agit de
DbCandle.update_volume()
.- Le contrôleur d'historiques appelle cette méthode à 2 occasions :
- Lors de la synchronisation des ticks : Point 6 de la méthode
CtrlHistos.synchro_db_csv()
.- Lors du téléchargement de ticks : Point 3 de la méthode
CtrlHistos.download_histos()
.- Pour l'instant, les 3 points mentionnés sont commentés.
update_volume()
appelée a besoin d'un argument : df_stamps
.
DbTick.get_df_stamp()
qui n'existe pas. Créons-la :DbTick.get_df_stamp()
:
def get_df_stamps(self, *yw):
""" Filtrage de la table de ticks sur la semaine passée en paramètre, puis => df (pandas.DataFrame).
Le volume est le nombre de variations du tick : https://www.ig.com/fr/glossaire-trading/volume-definition
Valeur retournée : vecteur des stamps sur la semaine {yw}.
"""
range_stamps = self._get_range_from_yw(*yw)
query_ts = self.o_table.select(self.o_table.timestamp).where(
self.o_table.timestamp.between(*range_stamps)
).order_by(self.o_table.timestamp.asc())
return pd.DataFrame(list(query_ts.dicts()))
main()
de lancement de ctrl_histos.py : activer uniquement la synchro des Candles.ctrl_histos.py
localement.timestamp
avec cette condition : >1514000000
(1 514 millions). Cela correspond à la fin de 2017.DbTick.week_exists()
.True
si la semaine visée existe, False
sinon.DbTick.week_exists()
, à la fin :
return self.o_renko.range_exists(*self._get_range_from_yw(year, str_week))
DbRenko.range_exists()
n'est pas encore codée et renvoie True
de façon provisoire.La méthode DbRenko.range_exists() doit donc être codée.
DbTick.df_to_table()
.
""" 5 - Écriture dans les tables Renko. """
return self.o_renko.df_to_table(df)
DbRenko.df_to_table()
n'existe pas. C'est la méthode-parent DataBase.df_to_table()
qui est donc appelée et qui renvoie simplement True
.La méthode DbRenko.df_to_table() doit donc être créée et codée.
DbTick.delete_week()
.
self.o_renko.delete_week(*yw)
En résumé, 2 méthodes sont à coder dans
DbRenko
:range_exists()
etdf_to_table()
Vérification
main()
de ctrl_histos
.F12
) apres quelques synchronisations.
renko.sql
dans DB Browser for SQLite
, afficher le contenu des tables :Toutes les tables Renko contiennent des données.
/nodes/generateurs/histos/histos.py > Node.histo_data()
.
self.ut.printc()
final.Snippets
Bonjour les codeurs !