Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1''' Functions to generate a v3bw file from the latest per-second measurement 

2results ''' 

3from flashflow.results_logger import MeasLine, MeasLineBegin, MeasLineData,\ 

4 MeasLineEnd, Meas 

5import datetime 

6import gzip 

7import logging 

8import os 

9import time 

10from glob import iglob 

11from shutil import rmtree 

12from typing import List, Dict 

13from . import __version__ as FF_VERSION 

14 

15 

# Module-level logger used by every function in this file.
log = logging.getLogger(__name__)
# Bandwidth File Format version written into the ``version=`` header line of
# every v3bw file produced by _write_results.
V3BW_FORMAT_VERSION = "1.5.0"

18 

19 

def _read(fname: str) -> List[MeasLine]:
    ''' Read all per-second results from the given filename.

    :param fname: the filename to read
    :returns: List of :class:`MeasLine`, sorted by timestamp

    The file must exist. The file is partially assumed to be well-formed: it
    might have invalid lines or comments, which will be skipped over just fine,
    but this function does not actually care that BEGIN lines come before data
    lines, which themselves come before END lines. :func:`sorted` is stable, so
    good order in == good order out.

    If the file name ends in ``.gz``, it is assumed to be gzip compressed.
    '''
    log.debug('Reading per-second results from %s', fname)
    out = []
    opener = gzip.open if fname.endswith('.gz') else open
    # A context manager guarantees the file is closed even if parsing raises
    # part-way through the file.
    with opener(fname, 'rt') as fd:
        for line in fd:
            meas_line = MeasLine.parse(line)
            # parse() yields a falsy value for lines it cannot interpret
            # (comments, junk); those are skipped.
            if meas_line:
                out.append(meas_line)
    return sorted(out, key=lambda item: item.ts)

47 

48 

def _find_files(base: str, min_ts: float = 0) -> List[str]:
    ''' Find all result files related to the given ``base`` filename.

    Sort them by modification time such that the oldest files are first, and
    return them all as a list. The ``base`` file *should* be last, given proper
    function of the rest of FlashFlow, but do not assume it will be.

    :param base: The path to the current results filename (e.g.
        ``data-coord/results/results.log``). Assuming it exists, it will appear
        in the returned list of files. A ``*`` will be appended, and any
        regular files found when shell globbing with that will also be
        returned. Glob metacharacters inside ``base`` itself are matched
        literally.
    :param min_ts: Only consider files with mtime equal or greater than this.
        Defaults to zero, meaning consider all files found.
    :returns: Matching regular-file paths, oldest mtime first (ties broken by
        path name).
    '''
    from glob import escape as glob_escape
    import stat as stat_mod
    # List of (mod_time, file_name) tuples
    cache = []
    # Escape base so characters like '[', ']', '?' or '*' in the configured
    # path are matched literally; only the appended '*' is a wildcard.
    for fname in iglob(glob_escape(base) + '*'):
        try:
            st = os.stat(fname)
        except OSError:
            # The file vanished (e.g. rotated away) or is unreadable between
            # globbing and stat'ing it; there is nothing to read, so skip it.
            continue
        # Only regular files; the glob can also match directories.
        if not stat_mod.S_ISREG(st.st_mode):
            continue
        # Check if new enough
        if st.st_mtime < min_ts:
            continue
        cache.append((st.st_mtime, fname))
    # Since modification time is first, sorting the list of tuples will sort
    # by mod_time s.t. oldest is first.
    cache.sort()
    return [fname for _, fname in cache]

78 

79 

def _write_results(
        symlink_fname: str, results: Dict[str, Meas], ts: float) -> str:
    ''' Write a new v3bw file and return the path to the new file.

    The output format follows the "Bandwidth File Format".
    https://gitweb.torproject.org/torspec.git/tree/bandwidth-file-spec.txt
    If FlashFlow claims to output version X, but it is found to violate the
    spec, revisions to FlashFlow should err on the side of simplicity. That may
    mean supporting version X-1 instead. The spec for what should be a simple
    document format is 1000 lines too long.

    :param symlink_fname: Path prefix for the file to create. Will end up being
        a symlink, and the actual file will be at this path plus a date suffix.
    :param results: The measurements, keyed by relay fingerprint
    :param ts: The current timestamp
    :returns: The path to the file we created
    '''
    # Header: timestamp line, key=value lines, then the terminator.
    lines = [
        f'{int(ts)}',
        f'version={V3BW_FORMAT_VERSION}',
        'software=flashflow',
        f'software_version={FF_VERSION}',
        '=====',
    ]
    # Add each relay's line
    for relay_fp, meas in results.items():
        # Divide by 1000 and truncate, with a floor of 1. NOTE(review): this
        # assumes meas.result() is in bytes/s so that bw ends up in the
        # spec's kilobytes/s -- confirm against Meas.result().
        bw = max(1, int(meas.result()/1000))
        lines.append(
            f'node_id=${relay_fp} bw={bw} measured_at={int(meas.start_ts)}')
    # Every line, including the last, is newline-terminated.
    out = '\n'.join(lines) + '\n'
    # Date suffix is the naive UTC ISO time. utcfromtimestamp() is deprecated
    # since Python 3.12; an aware conversion with tzinfo stripped yields the
    # identical string.
    suffix = datetime.datetime.fromtimestamp(
        ts, tz=datetime.timezone.utc).replace(tzinfo=None).isoformat(
            timespec='seconds')
    actual_fname = symlink_fname + '.' + suffix
    with open(actual_fname, 'wt') as fd:
        fd.write(out)
    # According to a comment in sbws code, renaming a file is atomic. Thus make
    # the symlink at a temporary location, then rename it to the real desired
    # location.
    # https://github.com/torproject/sbws/blob/15fbb2bd3e290f30b18536d0822c703dbca65c4c/sbws/lib/v3bwfile.py#L1477
    symlink_tmp_fname = symlink_fname + '.tmp'
    # First ensure the temporary file doesn't exist. Perhaps from a previous
    # run that crashed at a very exact wrong time.
    try:
        os.unlink(symlink_tmp_fname)
    except IsADirectoryError:
        rmtree(symlink_tmp_fname)
    except FileNotFoundError:
        # This is actually the normal case. This is fine.
        pass
    os.symlink(os.path.basename(actual_fname), symlink_tmp_fname)
    # os.replace overwrites an existing destination on every platform (unlike
    # os.rename on Windows); on POSIX both map to an atomic rename(2).
    os.replace(symlink_tmp_fname, symlink_fname)
    return actual_fname

130 

131 

def gen(v3bw_fname: str, results_fname: str, max_results_age: float) -> str:
    ''' Generate a v3bw file based on the latest per-second measurement results
    we have on disk.

    :param v3bw_fname: The path to the v3bw file to create
    :param results_fname: The path to the current results filename (e.g.
        ``data-coord/results/results.log``). It will be read for the latest
        results, and if needed, an ``*`` appended to the name to search for
        adjacent logrotated files for additional necessary data.
    :param max_results_age: The maximum number of seconds in the past a
        measurement can have occurred and we'll still include it in the v3bw
        file. Result lines timestamped earlier than that are skipped.

    :returns: Path to the v3bw file created. This will be the ``v3bw_fname``
        argument plus a suffix.
    '''
    log.info('Beginning v3bw file generation')
    now = time.time()
    # Anything (files by mtime, lines by ts) before this is too old to use.
    min_ts = now - max_results_age
    ignored_lines = {
        'age': 0,
        'type': 0,
        'unknown_meas': 0,
    }
    used_lines = 0
    # Latest measurement seen for each relay fingerprint
    measurements: Dict[str, Meas] = {}
    # store for our internal purposes a mapping of measurement ID to relay FP
    mid_to_fp: Dict[int, str] = {}
    fnames = _find_files(results_fname, min_ts=min_ts)
    for fname in fnames:
        for line in _read(fname):
            if line.ts < min_ts:
                # Too old: count it AND skip it. If a BEGIN line is skipped
                # here, that measurement's newer lines are reported below as
                # belonging to an unknown measurement.
                ignored_lines['age'] += 1
                continue
            if isinstance(line, MeasLineBegin):
                if line.relay_fp in measurements:
                    log.info(
                        'Found new measurement for %s, forgetting old one',
                        line.relay_fp)
                used_lines += 1
                measurements[line.relay_fp] = Meas(line)
                mid_to_fp[line.meas_id] = line.relay_fp
            elif isinstance(line, MeasLineEnd):
                if line.meas_id not in mid_to_fp:
                    log.warning(
                        'Found END line for unknown meas id %d. Dropping.',
                        line.meas_id)
                    ignored_lines['unknown_meas'] += 1
                    continue
                used_lines += 1
                measurements[mid_to_fp[line.meas_id]].set_end(line)
            elif isinstance(line, MeasLineData):
                if line.meas_id not in mid_to_fp:
                    log.warning(
                        'Found BG or MEASR line for unknown meas id %d. '
                        'Dropping.', line.meas_id)
                    ignored_lines['unknown_meas'] += 1
                    continue
                if line.is_bg():
                    measurements[mid_to_fp[line.meas_id]].add_bg(line)
                else:
                    measurements[mid_to_fp[line.meas_id]].add_measr(line)
                used_lines += 1
            else:
                log.warning(
                    'Unhandled MeasLine type %s', type(line))
                ignored_lines['type'] += 1
    # Drop measurements for which Meas.have_all_data() is false. Iterate the
    # dict's own keys so deletion cannot miss or KeyError.
    incomplete_relays = [
        relay_fp for relay_fp, meas in measurements.items()
        if not meas.have_all_data()
    ]
    for relay_fp in incomplete_relays:
        del measurements[relay_fp]
    log.info(
        'Finished v3bw file generation. Used %d lines from %d files to '
        'produce results for %d relays. Ignored %d lines due to age, '
        '%d lines due to unknown type, %d lines due to unknown meas. '
        'Ignored %d measurements that seem incomplete.',
        used_lines, len(fnames), len(measurements),
        ignored_lines['age'], ignored_lines['type'],
        ignored_lines['unknown_meas'],
        len(incomplete_relays),
    )
    return _write_results(v3bw_fname, measurements, now)