Coverage for flashflow/v3bw.py : 24%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1''' Functions to generate a v3bw file from the latest per-second measurement
2results '''
3from flashflow.results_logger import MeasLine, MeasLineBegin, MeasLineData,\
4 MeasLineEnd, Meas
5import datetime
6import gzip
7import logging
8import os
9import time
10from glob import iglob
11from shutil import rmtree
12from typing import List, Dict
13from . import __version__ as FF_VERSION
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Value written on the ``version=`` header line of every v3bw file this
# module generates.
V3BW_FORMAT_VERSION = "1.5.0"
def _read(fname: str) -> List[MeasLine]:
    ''' Read all per-second results from the given filename.

    :param fname: the filename to read
    :returns: List of :class:`MeasLine`, sorted by timestamp

    The file must exist. The file is partially assumed to be well-formed: any
    line for which :meth:`MeasLine.parse` returns a falsy value (e.g. invalid
    lines or comments) is skipped over just fine, but this function does not
    actually care that BEGIN lines come before data lines, which themselves
    come before END lines. :func:`sorted` is stable, so good order in == good
    order out.

    If the file name ends in ``.gz``, it is assumed to be gzip compressed.
    '''
    log.debug('Reading per-second results from %s', fname)
    # Both openers accept the same (name, mode) arguments and return a text
    # stream usable as a context manager.
    opener = gzip.open if fname.endswith('.gz') else open
    out = []
    # Use a context manager so the file is closed even if parsing or reading
    # raises part way through.
    with opener(fname, 'rt') as fd:
        for line in fd:
            meas_line = MeasLine.parse(line)
            if not meas_line:
                continue
            out.append(meas_line)
    return sorted(out, key=lambda item: item.ts)
def _find_files(base: str, min_ts: float = 0) -> List[str]:
    ''' Find all result files related to the given ``base`` filename.

    Sort them by modification time such that the oldest files are first, and
    return them all as a list. The ``base`` file *should* be last, given proper
    function of the rest of FlashFlow, but do not assume it will be.

    :param base: The path to the current results filename (e.g.
        ``data-coord/results/results.log``). Assuming it exists, it will
        appear in the returned list of files. A ``*`` will be appended, and
        any files found when shell globbing with that will also be returned.
    :param min_ts: Only consider files with mtime equal or greater than this.
        Defaults to zero, meaning consider all files found.
    :returns: List of matching file paths, oldest modification time first.
        Files with identical modification times are ordered by path.
    '''
    # List of (mod_time, file_name) tuples
    cache = []
    for fname in iglob(base + '*'):
        # Check if a file. It could be a directory.
        if not os.path.isfile(fname):
            continue
        try:
            mtime = os.stat(fname).st_mtime
        except FileNotFoundError:
            # The file was renamed or removed (e.g. by log rotation) between
            # the glob and the stat. Treat it the same as if the glob had
            # never seen it.
            continue
        # Check if new enough
        if mtime < min_ts:
            continue
        cache.append((mtime, fname))
    # Since modification time is first, sorting the list of tuples will sort by
    # mod_time s.t. oldest is first.
    cache.sort()
    return [fname for _, fname in cache]
def _write_results(
        symlink_fname: str, results: Dict[str, Meas], ts: float) -> str:
    ''' Write a new v3bw file and return the path to the new file.

    The output format follows the "Bandwidth File Format".
    https://gitweb.torproject.org/torspec.git/tree/bandwidth-file-spec.txt
    If FlashFlow claims to output version X, but it is found to violate the
    spec, revisions to FlashFlow should err on the side of simplicity. That may
    mean supporting version X-1 instead. The spec for what should be a simple
    document format is 1000 lines too long.

    :param symlink_fname: Path prefix for the file to create. Will end up being
        a symlink, and the actual file will be at this path plus a date suffix.
    :param results: The measurements, keyed by relay fingerprint
    :param ts: The current timestamp
    :returns: The path to the file we created
    '''
    # Prepare the header
    lines = [
        f'{int(ts)}',
        f'version={V3BW_FORMAT_VERSION}',
        'software=flashflow',
        f'software_version={FF_VERSION}',
        '=====',
    ]
    # Add each relay's line
    for relay_fp, meas in results.items():
        # Never report less than 1. NOTE(review): the division by 1000
        # presumably converts bytes/s into the KB/s the spec wants -- confirm
        # against Meas.result().
        bw = max(1, int(meas.result()/1000))
        lines.append(
            f'node_id=${relay_fp} bw={bw} '
            f'measured_at={int(meas.start_ts)}')
    # Every line, including the last, is newline terminated.
    out = '\n'.join(lines) + '\n'
    # Suffix is the UTC time formatted as YYYY-MM-DDTHH:MM:SS. Convert via an
    # aware datetime (utcfromtimestamp() is deprecated), then drop the tzinfo
    # so that no "+00:00" offset ends up in the file name.
    suffix = datetime.datetime\
        .fromtimestamp(ts, tz=datetime.timezone.utc)\
        .replace(tzinfo=None).isoformat(timespec='seconds')
    actual_fname = symlink_fname + '.' + suffix
    with open(actual_fname, 'wt') as fd:
        fd.write(out)
    # According to a comment in sbws code, renaming a file is atomic. Thus make
    # the symlink at a temporary location, then rename it to the real desired
    # location.
    # https://github.com/torproject/sbws/blob/15fbb2bd3e290f30b18536d0822c703dbca65c4c/sbws/lib/v3bwfile.py#L1477
    symlink_tmp_fname = symlink_fname + '.tmp'
    # First ensure the temporary file doesn't exist. Perhaps from a previous
    # run that crashed at a very exact wrong time.
    try:
        os.unlink(symlink_tmp_fname)
    except IsADirectoryError:
        rmtree(symlink_tmp_fname)
    except FileNotFoundError:
        # This is actually the normal case. This is fine.
        pass
    # The link target is just the base name, so the symlink is relative to
    # the directory it lives in.
    os.symlink(os.path.basename(actual_fname), symlink_tmp_fname)
    os.rename(symlink_tmp_fname, symlink_fname)
    return actual_fname
def gen(v3bw_fname: str, results_fname: str, max_results_age: float) -> str:
    ''' Generate a v3bw file based on the latest per-second measurement results
    we have on disk.

    :param v3bw_fname: The path to the v3bw file to create
    :param results_fname: The path to the current results filename (e.g.
        ``data-coord/results/results.log``). It will be read for the latest
        results, and if needed, an ``*`` appended to the name to search for
        adjacent logrotated files for additional necessary data.
    :param max_results_age: The maximum number of seconds in the past a
        measurement can have occurred and we'll still include it in the v3bw
        file. Result lines older than this are counted and skipped.

    :returns: Path to the v3bw file created. This will be the ``v3bw_fname``
        argument plus a suffix.
    '''
    log.info('Beginning v3bw file generation')
    now = time.time()
    oldest_allowed_ts = now - max_results_age
    ignored_lines = {
        'age': 0,
        'type': 0,
        'unknown_meas': 0,
    }
    used_lines = 0
    measurements: Dict[str, Meas] = {}
    # store for our internal purposes a mapping of measurement ID to relay FP
    mid_to_fp: Dict[int, str] = {}
    fnames = _find_files(results_fname, min_ts=oldest_allowed_ts)
    for fname in fnames:
        for line in _read(fname):
            if line.ts < oldest_allowed_ts:
                # Too old: count it and do not let it contribute to any
                # measurement. A new-enough file can still contain old lines.
                ignored_lines['age'] += 1
                continue
            if isinstance(line, MeasLineBegin):
                if line.relay_fp in measurements:
                    log.info(
                        'Found new measurement for %s, forgetting old one',
                        line.relay_fp)
                used_lines += 1
                measurements[line.relay_fp] = Meas(line)
                mid_to_fp[line.meas_id] = line.relay_fp
            elif isinstance(line, MeasLineEnd):
                if line.meas_id not in mid_to_fp:
                    log.warning(
                        'Found END line for unknown meas id %d. Dropping.',
                        line.meas_id)
                    ignored_lines['unknown_meas'] += 1
                    continue
                used_lines += 1
                measurements[mid_to_fp[line.meas_id]].set_end(line)
            elif isinstance(line, MeasLineData):
                if line.meas_id not in mid_to_fp:
                    log.warning(
                        'Found BG or MEASR line for unknown meas id %d. '
                        'Dropping.', line.meas_id)
                    ignored_lines['unknown_meas'] += 1
                    continue
                if line.is_bg():
                    measurements[mid_to_fp[line.meas_id]].add_bg(line)
                else:
                    measurements[mid_to_fp[line.meas_id]].add_measr(line)
                used_lines += 1
            else:
                log.warning(
                    'Unhandled MeasLine type %s', type(line))
                ignored_lines['type'] += 1
    # Collect the fingerprints first so the dict is not modified while it is
    # being iterated.
    incomplete_relays = {
        m.relay_fp for m in measurements.values() if not m.have_all_data()
    }
    for relay_fp in incomplete_relays:
        del measurements[relay_fp]
    log.info(
        'Finished v3bw file generation. Used %d lines from %d files to '
        'produce results for %d relays. Ignored %d lines due to age, '
        '%d lines due to unknown type, %d lines due to unknown meas. '
        'Ignored %d measurements that seem incomplete.',
        used_lines, len(fnames), len(measurements),
        ignored_lines['age'], ignored_lines['type'],
        ignored_lines['unknown_meas'],
        len(incomplete_relays),
    )
    return _write_results(v3bw_fname, measurements, now)