
"""
Data object for livetiming data
"""

from datetime import timedelta
import json
import hashlib
import logging

from fastf1.utils import to_datetime, recursive_dict_get


_track_status_mapping = {
    'AllClear': '1',
    'Yellow': '2',
    'SCDeployed': '4',
    'Red': '5',
    'VSCDeployed': '6',
    'VSCEnding': '7'
}
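# The mapping above translates the textual track status values that appear in
# livetiming messages to the numeric status codes used by the F1 api. Values
# that are already numeric are passed through unchanged
# (see ``_parse_session_data``).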


class LiveTimingData:
    """Live timing data object for using saved livetiming data as a
    data source.

    This object is created from data that was recorded using
    :class:`fastf1.livetiming.SignalRClient`. It can be passed to various
    api calling functions using the ``livedata`` keyword.

    Usually you will only instantiate this class and pass it to other
    functions.

    See :mod:`fastf1.livetiming` for a usage example.

    If you want to load data from multiple files, you can simply pass
    multiple filenames::

        livedata = LiveTimingData('file1.txt', 'file2.txt', 'file3.txt')

    The files need to be in chronological order but may overlap. I.e. if
    the last five minutes of file1 are the same as the first five minutes
    of file2, this will be recognized while loading the data. No duplicate
    data will be loaded.

    Args:
        *files (str): One or multiple file names
        remove_duplicates (bool): Remove duplicate lines. Mainly useful
            when loading multiple overlapping recordings. (Checking for
            duplicates is currently very slow for large files. Therefore,
            it can be disabled if this causes problems.)
    """

    def __init__(self, *files, remove_duplicates=True):
        # file names
        self.files = files
        # parsed data
        self.data = dict()
        # number of json errors
        self.errorcount = 0
        # flag for auto loading on first access
        self._files_read = False
        # date when session was started
        self._start_date = None
        # whether any files were loaded previously, i.e. appending data
        self._previous_files = False
        # hash each line, used to skip duplicates from multiple files
        self._line_hashes = list()
        self._remove_duplicates = remove_duplicates
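    # A minimal usage sketch (an assumption, not verbatim from this module:
    # ``fastf1.get_testing_session`` and the ``livedata`` keyword of
    # ``Session.load_laps`` are taken from the public fastf1 API and may
    # differ between versions; see :mod:`fastf1.livetiming` for the
    # authoritative example):
    #
    #   import fastf1
    #   from fastf1.livetiming.data import LiveTimingData
    #
    #   livedata = LiveTimingData('saved_data.txt')
    #   session = fastf1.get_testing_session(2021, 1, 1)
    #   laps = session.load_laps(with_telemetry=True, livedata=livedata)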
    def load(self):
        """
        Read all files, parse the data and store it by category.

        Should usually not be called manually. This is called
        automatically the first time :meth:`get`, :meth:`has`
        or :meth:`list_categories` are called.
        """
        logging.info("Reading live timing data from recording. "
                     "This may take a bit.")
        for fname in self.files:
            self._load_single_file(fname)
        self._files_read = True
    def _load_single_file(self, fname):
        # read one file, parse its content and add it to the already
        # loaded data (if there is data already)
        with open(fname, 'r') as fobj:
            data = fobj.readlines()

        # try to find the correct start date (only if this is the first file)
        if not self._previous_files:
            self._try_set_correct_start_date(data)

        for line in data:
            self._parse_line(line)

        # first file was loaded, others are appended if any more are loaded
        self._previous_files = True

    def _parse_line(self, elem):
        # parse a single line of data
        if self._remove_duplicates:
            # prevent duplicates when loading data (slow, but it works...)
            # allows loading data from overlapping recordings
            lhash = hashlib.md5(elem.encode()).hexdigest()
            if lhash in self._line_hashes:
                return
            self._line_hashes.append(lhash)

        # load the three parts of each data element
        elem = self._fix_json(elem)
        try:
            cat, msg, dt = json.loads(elem)
        except (json.JSONDecodeError, ValueError):
            self.errorcount += 1
            return

        # convert string to datetime
        try:
            dt = to_datetime(dt)
        except (ValueError, TypeError):
            self.errorcount += 1
            return

        # if no start date could be determined beforehand, simply use the
        # first timestamp, as we need to have some date as start date;
        # convert timestamp to timedelta (SessionTime) based on start date
        if self._start_date is None:
            self._start_date = dt
            td = timedelta(seconds=0)
        else:
            td = dt - self._start_date

        self._store_message(cat, td, msg)

    def _store_message(self, cat, td, msg):
        # stores parsed messages by category
        # TrackStatus and SessionStatus categories need special handling
        if cat == 'SessionData':
            self._parse_session_data(msg)
        elif cat not in ('TrackStatus', 'SessionStatus'):
            self._add_to_category(cat, [td, msg])

    def _fix_json(self, elem):
        # fix F1's non-json-compliant data
        elem = elem.replace("'", '"') \
            .replace('True', 'true') \
            .replace('False', 'false')
        return elem

    def _add_to_category(self, cat, entry):
        if cat not in self.data:
            self.data[cat] = [entry, ]
        else:
            self.data[cat].append(entry)

    def _parse_session_data(self, msg):
        # make sure the categories exist as we want to append to them
        if 'TrackStatus' not in self.data.keys():
            self.data['TrackStatus'] = \
                {'Time': [], 'Status': [], 'Message': []}
        if 'SessionStatus' not in self.data.keys():
            self.data['SessionStatus'] = {'Time': [], 'Status': []}

        if ('StatusSeries' in msg) and isinstance(msg['StatusSeries'], dict):
            for entry in msg['StatusSeries'].values():
                # convert timestamp to timedelta
                try:
                    status_dt = to_datetime(entry['Utc'])
                except (KeyError, ValueError, TypeError):
                    self.errorcount += 1
                    continue
                status_timedelta = status_dt - self._start_date

                # add data to category
                if 'TrackStatus' in entry.keys():
                    status_value = str(entry['TrackStatus'])
                    # convert to the numeric system used by the api
                    if not status_value.isnumeric():
                        status_value = _track_status_mapping[status_value]
                    self.data['TrackStatus']['Time'].append(status_timedelta)
                    self.data['TrackStatus']['Status'].append(status_value)
                    self.data['TrackStatus']['Message'].append("")

                elif 'SessionStatus' in entry.keys():
                    self.data['SessionStatus']['Time'] \
                        .append(status_timedelta)
                    self.data['SessionStatus']['Status'] \
                        .append(entry['SessionStatus'])

    def _try_set_correct_start_date(self, data):
        # skim content to find 'Started' session status without actually
        # decoding each line to save time
        for elem in data:
            if 'SessionStatus' in elem and 'Started' in elem:
                break
        else:
            # didn't find 'Started'
            logging.error("Error while trying to set correct "
                          "session start date!")
            return

        # decode matching line
        elem = self._fix_json(elem)
        try:
            cat, msg, dt = json.loads(elem)
        except (json.JSONDecodeError, ValueError):
            logging.error("Error while trying to set correct "
                          "session start date!")
            return

        # find the correct entry in the series
        try:
            for entry in msg['StatusSeries']:
                status = recursive_dict_get(entry, 'SessionStatus')
                if status == 'Started':
                    try:
                        self._start_date = to_datetime(entry['Utc'])
                    except (KeyError, ValueError, TypeError):
                        self.errorcount += 1
                        logging.error("Error while trying to set correct "
                                      "session start date!")
                    return
        except AttributeError:
            for entry in msg['StatusSeries'].values():
                status = entry.get('SessionStatus', None)
                if status == 'Started':
                    try:
                        self._start_date = to_datetime(entry['Utc'])
                    except (KeyError, ValueError, TypeError):
                        self.errorcount += 1
                        logging.error("Error while trying to set correct "
                                      "session start date!")
                    return
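    # For reference, a raw line in a recording is expected to decode to a
    # three-element list of category, message and UTC timestamp. Roughly
    # (hypothetical payload, shown with the Python-style quoting that
    # ``_fix_json`` rewrites into valid JSON):
    #
    #   ['TimingData', {'Lines': {...}}, '2021-03-27T12:00:01.234Z']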
    def get(self, name):
        """
        Return data for category name.

        Will load data on first call, this will take a bit.

        Args:
            name (str): name of the category
        """
        if not self._files_read:
            self.load()
        return self.data[name]
    def has(self, name):
        """
        Check if data for a category name exists.

        Will load data on first call, this will take a bit.

        Args:
            name (str): name of the category
        """
        if not self._files_read:
            self.load()
        return name in self.data.keys()
    def list_categories(self):
        """
        List all available data categories.

        Will load data on first call, this will take a bit.

        Returns:
            list of category names
        """
        if not self._files_read:
            self.load()
        return list(self.data.keys())
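# A minimal sketch of the accessor methods above (hedged: 'WeatherData' is
# just an assumed category name; what is available depends on the recording):
#
#   livedata = LiveTimingData('saved_data.txt')
#   print(livedata.list_categories())  # triggers load() on first access
#   if livedata.has('WeatherData'):
#       weather = livedata.get('WeatherData')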