Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1from argparse import ArgumentParser 

2import asyncio 

3import enum 

4import glob 

5import logging 

6import os 

7import random 

8import ssl 

9import time 

10from configparser import ConfigParser 

11from functools import partial 

12from traceback import StackSummary 

13from statistics import median 

14from tempfile import NamedTemporaryFile 

15from typing import Tuple, List, IO, Set, Dict, Optional 

16from .. import meas_period 

17from .. import msg 

18from .. import results_logger 

19from .. import tor_client 

20from .. import v3bw 

21from .. import __version__ as FF_VERSION, PKG_DIR 

22from ..relay_filter_list import RelayFilterList 

23from ..state_file import StateFile 

24from ..tor_ctrl_msg import CoordStartMeas 

25from stem import CircStatus # type: ignore 

26from stem.control import Controller, EventType # type: ignore 

27import stem.descriptor.router_status_entry as stem_rse # type: ignore 

28from stem.response.events import CircuitEvent, FFMeasEvent # type: ignore 

29from transitions import Machine # type: ignore 

30 

31RELAYS_TXT_FNAME = os.path.join(PKG_DIR, 'relays.txt') 

32 

33 

def next_meas_id() -> int:
    ''' Pick a random ID for a new measurement.

    :returns: an integer drawn from ``[1, 2**32 - 1]`` (both ends
        inclusive), i.e. any non-zero value that fits in 32 unsigned bits
    '''
    largest_id = 2**32 - 1
    return random.randint(1, largest_id)

37 

38 

def find_relay(c: Controller, nick_fp: str) \
        -> Optional[stem_rse.RouterStatusEntryV3]:
    ''' Look up a relay's :class:`RouterStatusEntryV3` via our tor client.

    :param c: Tor controller to query
    :param nick_fp: the relay's nickname or fingerprint
    :returns: whatever the controller's ``get_network_status()`` gives us
        for the relay, or ``None`` when it cannot provide one (e.g. the
        relay doesn't exist)
    '''
    entry = c.get_network_status(nick_fp, default=None)
    return entry

46 

47 

def get_measr_infos(conf: ConfigParser) -> List[meas_period.MeasrInfo]:
    ''' Parse the information we have on our measurers from our config
    file, and return a list of :class:`MeasrInfo` for them.

    The measurer IDs come from the comma-separated ``measurers`` option in
    the ``[coord]`` section. Whitespace around each ID is ignored and empty
    items (e.g. from a trailing comma) are skipped, so ``m1, m2,`` is
    treated the same as ``m1,m2``.

    Each measurer's bandwidth is the ``measr_bw`` option of its own
    ``[measr_<id>]`` section, falling back to ``measr_bw`` in
    ``[measr_default]``.

    :param conf: the parsed configuration
    :returns: one :class:`MeasrInfo` per configured measurer, in the order
        they are listed; an empty list if none are configured
    '''
    raw_ids = conf.get('coord', 'measurers')
    # Strip each item individually: without this "m1, m2" would make us
    # look for a section called "measr_ m2".
    measr_ids = [
        item.strip() for item in raw_ids.split(',') if item.strip()]
    if not measr_ids:
        return []
    # Only needed (and only required to exist) when there are measurers
    def_bw = conf.getint('measr_default', 'measr_bw')
    return [
        meas_period.MeasrInfo(
            measr_id,
            conf.getint('measr_' + measr_id, 'measr_bw', fallback=def_bw),
        )
        for measr_id in measr_ids]

65 

66 

def get_relays_to_measure(c: Controller, rfl_fname: str) \
        -> List[meas_period.RelayInfo]:
    ''' Build the list of relays that we should measure.

    Every relay the tor client knows about is checked against the
    :class:`RelayFilterList` read from ``rfl_fname``. Relays the filter
    list has no opinion on are not measured (``default=False``).

    :param c: Tor controller
    :param rfl_fname: Filename containing a :class:`RelayFilterList`
    :returns: a :class:`RelayInfo`, built from the fingerprint, for each
        relay that passed the filter
    '''
    with open(rfl_fname, 'rt') as fd:
        filter_list = RelayFilterList.from_str(fd.read())
    fingerprints = (ns.fingerprint for ns in c.get_network_statuses())
    return [
        meas_period.RelayInfo(fp)
        for fp in fingerprints
        if filter_list.should_measure(fp, default=False)]

83 

84 

def slots_in_period(conf: ConfigParser) -> int:
    ''' Work out how many measurement slots fit in one measurement period.

    A slot is treated as twice the ``meas_duration`` option of
    ``[meas_params]``; the period length is the ``meas_period`` option of
    ``[coord]``. Any partial slot at the end of the period is dropped.

    :param conf: the parsed configuration
    :returns: the number of whole slots in a period
    '''
    period_len = conf.getint('coord', 'meas_period')
    slot_len = conf.getint('meas_params', 'meas_duration') * 2
    return int(period_len / slot_len)

91 

92 

class MeasrProtocol(asyncio.Protocol):
    ''' The connection between us and a single measurer.

    This class is intentionally thin: it turns received bytes into message
    objects and passes everything of interest on to the main state machine.
    '''
    #: The transport of this connection. ``None`` until the connection is
    #: made.
    transport: Optional[asyncio.Transport] = None

    def connection_made(self, transport):
        ''' A measurer connected. Remember its transport and its ID, then
        tell the state machine about it. '''
        # TODO: log host:port of measurer
        log.debug('Connection from measurer')
        self.transport = transport
        # The measurer ID is taken from the first entry of the subject of
        # the measurer's certificate, expected to be its organizationName
        peercert = transport.get_extra_info('peercert')
        self.measr_id = peercert['subject'][0][0][1]
        log.info('Connection from measurer "%s"', self.measr_id)
        machine.notif_measurer_connected(self)

    def connection_lost(self, exc):
        ''' The connection with the measurer is gone. Tell the state
        machine. ``exc`` is not used. '''
        log.debug('Lost connection with measurer')
        machine.notif_measurer_disconnected(self)
        # TODO: anything else need to be done?

    def data_received(self, data: bytes):
        ''' Deserialize the received bytes into a message and hand it to
        the state machine. '''
        # XXX: It's possible we receive an incomplete JSON string
        # https://gitlab.torproject.org/pastly/flashflow/-/issues/10
        log.info('Received %d bytes: %s', len(data), data)
        parsed = msg.FFMsg.deserialize(data)
        machine.notif_measr_msg(self, parsed)

122 

123 

class CtrlProtocol(asyncio.Protocol):
    ''' The connection between us and a FlashFlow controller, used for
    development/debugging commands. '''
    #: The transport of this connection. ``None`` until the connection is
    #: made.
    transport: Optional[asyncio.Transport] = None

    def connection_made(self, transport):
        ''' A controller connected. Remember its transport. '''
        # TODO: log host:port of controller
        log.debug('Connection from controller')
        self.transport = transport
        # machine.notif_measurer_connected(self)

    def connection_lost(self, exc):
        ''' The connection with the controller is gone. Only logged. '''
        log.debug('Lost connection with controller')
        # machine.notif_measurer_disconnected(self)

    def data_received(self, data: bytes):
        ''' Pass the received command to the state machine, answer with
        either ``OK`` or the error string, and close the connection. '''
        # log.debug('ctrl: %s', data)
        assert self.transport is not None
        success, err_str = machine.notif_ctrl_message(data.decode('utf-8'))
        reply = b'OK' if success else err_str.encode('utf-8')
        self.transport.write(reply)
        self.transport.close()

147 

148 

class Measurement:
    ''' Everything we track about one measurement of one relay. '''
    #: ID of this measurement
    meas_id: int
    #: Fingerprint of the relay being measured
    relay_fp: str
    #: ID of our circuit with the relay. ``None`` until we have one.
    relay_circ: Optional[int]
    #: All measurers taking part in this measurement
    measurers: Set[MeasrProtocol]
    #: The subset of measurers that have told us they are ready to begin
    #: active measurement
    ready_measurers: Set[MeasrProtocol]
    #: How long, in seconds, active measurement should last
    meas_duration: int
    #: The share of background traffic the relay should be limiting itself
    #: to, as a fraction between 0 and 1
    bg_percent: float
    #: The relay's per-second reports of how much background traffic it is
    #: carrying. Each item is ``(time, sent_bytes, received_bytes)``: time
    #: is the timestamp at which we received the report, and sent/received
    #: are from the relay's perspective.
    bg_reports: List[Tuple[float, int, int]]
    #: Each measurer's per-second reports of measurement traffic. Each item
    #: is ``(time, sent_bytes, received_bytes)``: time is the timestamp at
    #: which the report was received, and sent/received are from the
    #: measurer's perspective.
    measr_reports: Dict[MeasrProtocol, List[Tuple[float, int, int]]]

    def __init__(
            self, meas_id: int, relay_fp: str,
            measurers: Set[MeasrProtocol],
            meas_duration: int, bg_percent: float):
        # What we were told about the measurement
        self.meas_id = meas_id
        self.relay_fp = relay_fp
        self.measurers = measurers
        self.meas_duration = meas_duration
        self.bg_percent = bg_percent
        # What we learn as the measurement progresses. The circuit is
        # built, and thus gets its ID, later.
        self.relay_circ = None
        self.ready_measurers = set()
        self.bg_reports = []
        self.measr_reports = {measr: [] for measr in measurers}

    def start_and_end(self) -> Tuple[float, float]:
        ''' Return our best guess at the timestamps at which this
        :class:`Measurement` started and ended.

        This method assumes the measurement is finished, and in particular
        that there is at least one background report.

        Only the timestamps of the background reports are considered.
        Those are timestamps that *we* generated, and by only ever trusting
        our own clock we stay consistent. Consider a deployment with at
        least two measurers, one with a very fast clock and another with a
        very slow one. If we instead took, for example, the earliest
        timestamp as the start and the latest as the end, then not only
        could the measurement "last" longer than its actual duration, but
        the accuracy of our start/end times would change with the set of
        measurers used for each measurement.

        :returns: ``(start, end)``, the timestamps of the first and last
            background reports
        '''
        first_report = self.bg_reports[0]
        last_report = self.bg_reports[-1]
        return first_report[0], last_report[0]

    def should_save_data(self) -> bool:
        ''' Return whether saving this measurement's results to disk seems
        worthwhile, which is the case once there is at least one background
        report. '''
        return len(self.bg_reports) > 0

216 

217 

@enum.unique
class States(enum.Enum):
    ''' The states a FlashFlow coordinator can be in. '''
    #: Where we begin, and where we come back to after a non-fatal error.
    START = enum.auto()
    #: The first "real" state: open all listening sockets.
    ENSURE_LISTEN_SOCKS = enum.auto()
    #: The second real state: launch a tor client and connect to it.
    ENSURE_CONN_W_TOR = enum.auto()
    #: The normal state, in which we spend most of our time. We are either
    #: doing measurements or waiting to decide to do them.
    READY = enum.auto()
    #: Something went wrong that calls for cleaning everything up and
    #: essentially relaunching, but not for dying outright.
    NONFATAL_ERROR = enum.auto()
    #: Something went seriously wrong and we cannot recover. Just cleanup
    #: and die.
    FATAL_ERROR = enum.auto()

235 

236 

237class StateMachine(Machine): 

238 ''' State machine and main control flow hub for FlashFlow coordinator. 

239 

240 change_state_*: 

241 State transitions are named change_state_* and don't exist here in the 

242 code. See the analogous docstring in 

243 :class:`flashflow.cmd.measurer.StateMachine` for more information. 

244 

245 on_enter_*: 

246 This is how the :class:`Machine` class finds functions to call upon 

247 entering the given state. See the analogous docstring in the 

248 StateMachine for measurers for more information. 

249 

250 _*: 

251 Other internal functions. See their documentation for more information 

252 on them. 

253 ''' 

254 # conf # This is set in __init__ 

255 #: Listening sockets for FlashFlow :mod:`flashflow.cmd.measurer` 

256 #: connections 

257 meas_server: asyncio.base_events.Server 

258 #: Listening sockets for FlashFlow :mod:`flashflow.cmd.ctrl` connections 

259 ctrl_server: asyncio.base_events.Server 

260 #: Stem controller object we have with our Tor client 

261 tor_client: Controller 

262 #: List of all connected measurers by their :class:`MeasrProtocol` 

263 measurers: List[MeasrProtocol] 

264 #: Storage of ongoing :class:`Measurement`\s 

265 measurements: Dict[int, Measurement] 

266 #: On-disk storage of long term information 

267 state_file: StateFile 

268 #: Our current measurement schedule 

269 schedule: Optional[meas_period.Schedule] 

270 #: If we have one, the handle for the event to be called at the beginning 

271 #: of the next measurement slot 

272 begin_of_slot_handle: Optional[asyncio.TimerHandle] 

273 

def __init__(self, conf):
    ''' Set up our (initially empty) state and configure the underlying
    :class:`Machine` with our states and the transitions between them.

    :param conf: the parsed configuration. The state file is loaded from
        the path in the ``state`` option of its ``[coord]`` section.
    '''
    self.conf = conf
    self.measurements = {}
    self.measurers = []
    self.state_file = StateFile.from_file(conf.getpath('coord', 'state'))
    self.schedule = None
    self.begin_of_slot_handle = None
    # Each item is (trigger, source state(s), destination state)
    transition_table = (
        (
            'change_state_starting',
            [States.START, States.NONFATAL_ERROR],
            States.ENSURE_LISTEN_SOCKS,
        ),
        (
            'change_state_listening',
            States.ENSURE_LISTEN_SOCKS,
            States.ENSURE_CONN_W_TOR,
        ),
        (
            'change_state_connected_to_tor',
            States.ENSURE_CONN_W_TOR,
            States.READY,
        ),
        # The error states can be entered from any state
        ('change_state_nonfatal_error', '*', States.NONFATAL_ERROR),
        ('change_state_fatal_error', '*', States.FATAL_ERROR),
    )
    super().__init__(
        model=self,
        states=States,
        transitions=[
            {'trigger': trigger, 'source': source, 'dest': dest}
            for trigger, source, dest in transition_table],
        initial=States.START,
        # Do not create .to_<state>() methods, which allow transition to
        # <state> regardless of current state
        auto_transitions=False,
    )

316 

def _ensure_listen_socks(self):
    ''' Main function in the ENSURE_LISTEN_SOCKS state. Open listening
    sockets for measurers as well as for a FlashFlow controller.

    The measurer socket uses TLS and requires the measurer to present a
    certificate we know about; the controller socket is plain.

    The sockets are opened asynchronously, after this function returns.
    Once the measurer socket is open we move on to the next state. Any
    failure, here or while opening either socket, moves us to the
    FATAL_ERROR state.
    '''
    # Get (host, port) from "host:port"
    measr_addr_port = self.conf.getaddr('coord', 'listen_addr')
    ctrl_addr_port = self.conf.getaddr('coord', 'ctrl_addr')
    if measr_addr_port is None:
        log.error('Don\'t know what to listen on')
        self.change_state_fatal_error()
        return
    if ctrl_addr_port is None:
        log.error('Don\'t know what to listen on')
        self.change_state_fatal_error()
        return
    # Make sure TLS key material exists
    our_key = self.conf.getpath('coord', 'key')
    keydir = self.conf.getpath('coord', 'keydir')
    if not os.path.isfile(our_key):
        log.error('%s does not exist', our_key)
        self.change_state_fatal_error()
        return
    if not os.path.isdir(keydir):
        log.error('%s does not exist', keydir)
        self.change_state_fatal_error()
        return
    # Start building ssl context. This first bit is a helper that takes the
    # measurer certificate files and combines them into one big file
    # listing them all, since that's what python's ssl wants.
    # NOTE(review): the first item is presumably the open file backing
    # measr_cert_fname; it is kept in a local so that it lives at least
    # until the certificates are loaded -- confirm against the helper.
    _cert_fd, measr_cert_fname = _gen_concated_measr_cert_file(
        keydir, our_key)
    # We only ever accept connections with this context, so say so
    # explicitly. Calling SSLContext() with no protocol is deprecated.
    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # Load our TLS private key and certificate
    ssl_context.load_cert_chain(our_key)
    # Load the certificate of the measurers
    ssl_context.load_verify_locations(measr_cert_fname)
    ssl_context.verify_mode = ssl.CERT_REQUIRED
    # Create the async task of opening this listen socks.
    measr_task = loop.create_task(loop.create_server(
        MeasrProtocol,
        measr_addr_port[0], measr_addr_port[1],
        ssl=ssl_context,
        reuse_address=True,
    ))
    ctrl_task = loop.create_task(loop.create_server(
        CtrlProtocol,
        ctrl_addr_port[0], ctrl_addr_port[1],
        reuse_address=True,
    ))

    # Callbacks to find out the result of the attempt to open listen
    # sockets. Only the measurer one advances the state machine.
    def measr_cb(fut):
        exc = fut.exception()
        if exc:
            log.error('Unable to open listen socket(s): %s', exc)
            self.change_state_fatal_error()
            return
        self.meas_server = fut.result()
        for s in self.meas_server.sockets:
            log.info('Listening on %s for measurers', s.getsockname())
        self.change_state_listening()

    def ctrl_cb(fut):
        exc = fut.exception()
        if exc:
            log.error('Unable to open listen socket(s): %s', exc)
            self.change_state_fatal_error()
            return
        self.ctrl_server = fut.result()
        for s in self.ctrl_server.sockets:
            log.info('Listening on %s for FF controllers', s.getsockname())
    # Attach the callbacks so we find out the results. This will happen
    # asynchronously after we return. And we're returning now.
    measr_task.add_done_callback(measr_cb)
    ctrl_task.add_done_callback(ctrl_cb)

390 

def _ensure_conn_w_tor(self):
    ''' Main function in the ENSURE_CONN_W_TOR state. Launch a tor client,
    connect to it, subscribe to the events we care about, and keep the
    resulting Controller object.

    On success we move on to the next state. If the tor client could not
    be launched we move to the FATAL_ERROR state instead.
    '''
    assert self.state == States.ENSURE_CONN_W_TOR
    # TODO: what happens if tor client disappears? Exception thrown? What??
    # And what should we do about it? Try to relaunch? Just die? Choose
    # **something**
    tor_bin = self.conf.getpath('tor', 'tor_bin')
    tor_datadir = self.conf.getpath('coord', 'tor_datadir')
    torrc_extra = self.conf.get('tor', 'torrc_extra_lines')
    controller = tor_client.launch(tor_bin, tor_datadir, torrc_extra)
    if not controller:
        log.error('Unable to launch and connect to tor client')
        self.change_state_fatal_error()
        return
    controller.add_event_listener(self.notif_circ_event, EventType.CIRC)
    controller.add_event_listener(
        self.notif_ffmeas_event, EventType.FF_MEAS)
    self.tor_client = controller
    self.change_state_connected_to_tor()

411 

def _now_ready(self):
    ''' Main function called at the beginning of the READY state.

    Record the restart in the state file, make sure we have a measurement
    schedule for the current measurement period (a new one if the period
    changed since the last run, else the one stored in the state file),
    and arm the timer for the first beginning-of-slot callback.
    '''
    log.info('(Re)started FlashFlow %s', FF_VERSION)
    now = time.time()
    self.state_file.set('last_restart', now)
    # Determine what measurement period we are in.
    period_len = self.conf.getint('coord', 'meas_period')
    recorded_period = self.state_file.get('last_meas_period', default=0)
    current_period = meas_period.current_period(now, period_len)
    log.info(
        'Last recorded measurement period: %d, current: %d',
        recorded_period, current_period)
    # Measurement period numbers should only go up, unless their duration
    # was changed to be longer between runs. Thus we only check for
    # equality (as opposed to the new one being larger) and make a new
    # schedule whenever they are different
    if recorded_period == current_period:
        stored_schedule = self.state_file.get('meas_schedule')
        self.schedule = meas_period.Schedule.from_dict(stored_schedule)
        assert self.schedule
        log.info('Loaded existing measurement schedule from state file')
    else:
        self._begin_of_new_period()
    # Kick off the first call of our beginning-of-slot event
    meas_dur = self.conf.getint('meas_params', 'meas_duration')
    delay = meas_period.time_till_next_slot(now, meas_dur)
    self.begin_of_slot_handle = loop.call_later(
        delay, self._begin_of_slot_cb)
    log.info('Will do begin-of-slot callback in %0.3f seconds', delay)

444 

def _begin_of_new_period(self):
    ''' We have entered a new measurement period. Replace the schedule
    with a freshly generated one and record both the schedule and the
    number of the current period in the state file. '''
    # Need to throw away old schedule and make a new one.
    new_schedule = self._make_new_schedule()
    self.schedule = new_schedule
    # The schedule is stored with skip_write=True; the set() for the
    # period number that follows is not.
    self.state_file.set(
        'meas_schedule', new_schedule.to_dict(), skip_write=True)
    # Update measurement period number
    period_len = self.conf.getint('coord', 'meas_period')
    period_num = meas_period.current_period(time.time(), period_len)
    self.state_file.set('last_meas_period', period_num)

455 

def _schedule_next_begin_of_slot(self):
    ''' Arm the timer that calls the begin-of-slot callback function at
    the start of the next measurement slot, replacing any timer we already
    have. '''
    # Get rid of the existing timer, if any. Canceling is safe even if it
    # was already canceled or has already run
    old_handle = self.begin_of_slot_handle
    if old_handle:
        old_handle.cancel()
    # Now for the new one
    now = time.time()
    meas_dur = self.conf.getint('meas_params', 'meas_duration')
    # NOTE(review): the extra millisecond is presumably there so that,
    # when we run exactly at the start of a slot, we wait for the *next*
    # slot instead of not waiting at all -- confirm.
    delay = meas_period.time_till_next_slot(now + 0.001, meas_dur)
    self.begin_of_slot_handle = loop.call_later(
        delay, self._begin_of_slot_cb)

468 

def _begin_of_slot_cb(self):
    ''' Called at the beginning of every measurement slot.

    1. Fail any measurement that is still running: its slot is over.
    2. If a new measurement period has begun, generate a new schedule.
    3. Start the measurements scheduled for this slot, if any.
    4. Arrange for this function to be called again at the next slot.
    '''
    now = time.time()
    period_dur = self.conf.getint('coord', 'meas_period')
    meas_dur = self.conf.getint('meas_params', 'meas_duration')
    current_period = meas_period.current_period(now, period_dur)
    current_slot = meas_period.current_slot(now, period_dur, meas_dur)
    log.info(
        'It\'s the beginning of period %d slot %d',
        current_period, current_slot)
    # Cancel any still running measurements. It's too late. Time to go and
    # free up resources.
    if self.measurements:
        # Iterate over a copy of the measurements. The dictionary is
        # updated in the :meth:`StateMachine._finish_measurement` call,
        # which would break iteration over the dictionary itself
        for meas in list(self.measurements.values()):
            log.warning(
                'Cleaning up after must-have-failed measurement %d',
                meas.meas_id)
            fail_msg = msg.Failure(
                msg.FailCode.C_END_OF_SLOT,
                meas.meas_id,
                None)
            self._finish_measurement(meas, fail_msg)
    assert not self.measurements
    # Check if we've entered a new period. If so, need to generate new
    # schedule
    if current_period != self.state_file.get('last_meas_period'):
        self._begin_of_new_period()
    # If no measurements this slot, return early
    if current_slot not in self.schedule.slots:
        log.info('Nothing to do')
        self._schedule_next_begin_of_slot()
        return
    # Stuff to do, so send ourself the command to do said stuff
    for relay_fp, measr_meas_infos in self.schedule.slots[current_slot]:
        log.info(
            'Would start meas of %s with %d measurers',
            relay_fp, len(measr_meas_infos))
        success, err_msg = self.notif_ctrl_message('measure ' + relay_fp)
        if not success:
            log.warning('Unable to start measurement: %s', err_msg)
    # And finally make sure we will run this function again
    self._schedule_next_begin_of_slot()

515 

def _make_new_schedule(self):
    ''' Generate and return a new measurement schedule in which every
    relay we should measure is measured at some point during the current
    measurement period. If now is not the very beginning of the period,
    some of the relays might end up scheduled for the past. '''
    rfl_fname = self.conf.getpath('coord', 'relay_filter_list')
    relay_infos = get_relays_to_measure(self.tor_client, rfl_fname)
    measr_infos = get_measr_infos(self.conf)
    num_slots = slots_in_period(self.conf)
    num_circs = self.conf.getint('meas_params', 'num_circs')
    return meas_period.Schedule.gen(
        relay_infos, measr_infos, num_slots, num_circs)

530 

def _cleanup(self):
    ''' Close the listening sockets and the tor client, whichever of them
    we have, taking great care that no exception bubbles up. Use this
    when in an error state and you want to cleanup before starting over or
    just dying. '''
    # Each item is (attribute name, what to log before closing it, what
    # to log if closing it fails)
    closables = (
        (
            'meas_server',
            'cleanup: closing listening sockets for measurers',
            'Error closing listening sockets: %s',
        ),
        (
            'ctrl_server',
            'cleanup: closing listening sockets for controllers',
            'Error closing listening sockets: %s',
        ),
        (
            'tor_client',
            'cleanup: closing tor',
            'Error closing tor: %s',
        ),
    )
    for attr, closing_text, error_text in closables:
        # The attribute doesn't exist at all if we never got that far
        thing = getattr(self, attr, None)
        if not thing:
            continue
        log.info(closing_text)
        try:
            thing.close()
        except Exception as e:
            log.error(error_text, e)

553 

def _die(self):
    ''' Stop the event loop, which ends execution of the program. '''
    loop.stop()

557 

def _have_all_bw_reports(self, meas: Measurement) -> bool:
    ''' Check if we have the expected number of ``bg_reports`` and
    ``measr_reports`` for the given measurement.

    One report per second of measurement is expected from the relay and
    from each measurer, i.e. ``meas.meas_duration`` reports from each
    party.

    :param meas: The measurement to check
    :returns: ``True`` if we have at least the expected number of reports
        from all parties, else ``False``
    '''
    num_expect = meas.meas_duration
    # For debug logging purposes, gather all the report counts here. We
    # could return early as soon as we find one that isn't large enough.
    # But this may help debug.
    num_bg = len(meas.bg_reports)
    num_per_measr = [
        len(reports) for reports in meas.measr_reports.values()]
    log.debug(
        'Meas %d has %d/%d bg reports. Num measr reports: %s',
        meas.meas_id, num_bg, num_expect,
        ', '.join(str(num) for num in num_per_measr))
    # If any party has given us too few reports, we are not done yet
    if any(num < num_expect for num in [num_bg] + num_per_measr):
        log.debug('Meas %d not finished', meas.meas_id)
        return False
    log.info('Meas %d is finished', meas.meas_id)
    return True

589 

def _finish_measurement(
        self, meas: Measurement, fail_msg: Optional[msg.Failure]):
    ''' A measurement is over, whether it worked or not. Write out
    whatever results we have, let the measurers know if it failed, and
    forget about the measurement.

    :param meas: The measurement that is over
    :param fail_msg: ``None`` if the measurement was successful. Otherwise
        the failure message to send to each of its measurers that has a
        transport.
    '''
    self._write_measurement_results(meas)
    if fail_msg is not None:
        reachable = [m for m in meas.measurers if m.transport]
        for measr in reachable:
            measr.transport.write(fail_msg.serialize())
    del self.measurements[meas.meas_id]
    err_text = ' Err="' + str(fail_msg) + '".' if fail_msg else ''
    log.info(
        'Meas %d finished.%s Now %d measurements.',
        meas.meas_id, err_text, len(self.measurements))

605 

def _write_measurement_results(self, meas: Measurement):
    ''' We have completed a measurement (maybe successfully) and should
    write out measurement results to our file.

    Nothing is written if the measurement says its data is not worth
    saving. Otherwise we write, in order: the begin line, every measurer
    report, every background report alongside the most background traffic
    we are willing to believe for that second, and the end line.

    The measurement may have failed part way through, so the measurers
    may have given us fewer reports than the relay did. A measurer without
    a report for a given second is counted as having measured nothing
    during that second.

    :param meas: The measurement to write results for
    '''
    if not meas.should_save_data():
        return
    start_ts, end_ts = meas.start_and_end()
    results_logger.write_begin(meas.relay_fp, meas.meas_id, int(start_ts))
    # Take the minimum of send/recv from the relay's bg reports for each
    # second. These are untrusted results because the relay may have lied
    # about having a massive amount of background traffic
    bg_report_untrust = [(ts, min(s, r)) for ts, s, r in meas.bg_reports]
    # Always take the recv side of measurer reports since that's the only
    # side that definitely made it back from the relay
    measr_reports = []
    for measr_report in meas.measr_reports.values():
        recv_per_sec = []
        for ts, _, r in measr_report:
            results_logger.write_meas(meas.meas_id, int(ts), r)
            recv_per_sec.append(r)
        measr_reports.append(recv_per_sec)
    # For each second, cap the amount of claimed bg traffic to the maximum
    # amount we will trust. I.e. if the relay is supposed to reserve no
    # more than 25% of its capacity for bg traffic, make sure the reported
    # background traffic is no more than 25% of all data we have for that
    # second.
    # TODO: make the fraction configurable
    frac = meas.bg_percent
    bg_report_trust = []
    for sec_i, (ts, bg_untrust) in enumerate(bg_report_untrust):
        # The relay is supposed to be throttling its bg traffic such that
        # it is no greater than some fraction of total traffic.
        #     frac = bg / (bg + meas)
        # We know and trust meas. We know frac. Thus we can solve for the
        # maximum allowed bg:
        #     frac * bg + frac * meas = bg
        #     frac * bg - bg = -frac * meas
        #     bg * (frac - 1) = -frac * meas
        #     bg = (-frac * meas) / (frac - 1)
        #     bg = (frac * meas) / (1 - frac)
        # Skip measurers that have no report for this second instead of
        # indexing past the end of their list of reports.
        measured = sum(
            measr_report[sec_i] for measr_report in measr_reports
            if sec_i < len(measr_report))
        max_bg = int(frac * measured / (1 - frac))
        if bg_untrust > max_bg:
            log.warning(
                'Meas %d capping %s\'s reported bg to %d as %d is too '
                'much', meas.meas_id, meas.relay_fp, max_bg, bg_untrust)
        bg_report_trust.append(min(bg_untrust, max_bg))
        results_logger.write_bg(
            meas.meas_id, int(ts), bg_untrust, max_bg)
    # Calculate each second's aggregate bytes. Only seconds for which
    # every party has a report are considered.
    aggs = [
        sum(sec_i_vals) for sec_i_vals
        in zip(bg_report_trust, *measr_reports)]
    if aggs:
        # Calculate the median over all seconds
        res = int(median(aggs))
        # Log as Mbit/s
        log.info(
            'Meas %d %s was measured at %.2f Mbit/s',
            meas.meas_id, meas.relay_fp, res*8/1e6)
    else:
        # median() raises on empty input, and there's nothing to report
        log.warning(
            'Meas %d %s has no second with reports from every party, '
            'so no bandwidth to log', meas.meas_id, meas.relay_fp)
    results_logger.write_end(meas.meas_id, int(end_ts))

666 

def _notif_circ_event_BUILT(self, meas: Measurement, event: CircuitEvent):
    ''' Received CIRC event with status BUILT for the circuit of the given
    measurement.

    For now this is only logged.

    :param meas: The measurement that was waiting on the circuit
    :param event: The CIRC event
    '''
    assert event.status == CircStatus.BUILT
    built_circ_id = int(event.id)
    log.debug(
        'Found meas %d waiting on circ %d to be built. Not doing '
        'anything yet.', meas.meas_id, built_circ_id)

676 

677 def _notif_circ_event_CLOSED(self, meas: Measurement, event: CircuitEvent): 

678 pass 

679 

680 def _notif_circ_event_FAILED(self, meas: Measurement, event: CircuitEvent): 

681 pass 

682 

def _notif_ffmeas_event_PARAMS_SENT(
        self, meas: Measurement, event: FFMeasEvent):
    ''' Received FF_MEAS event with type PARAMS_SENT

    All we do is log it. The relay's response to the parameters comes
    later, as a PARAMS_OK event, and until then there is nothing to do.

    :param meas: The measurement the event is for
    :param event: The FF_MEAS event. Not used.
    '''
    log.info(
        'Meas %d sent params on circ %d to relay %s',
        meas.meas_id, meas.relay_circ, meas.relay_fp)

693 

def _notif_ffmeas_event_PARAMS_OK(
        self, meas: Measurement, event: FFMeasEvent):
    ''' Received FF_MEAS event with type PARAMS_OK

    Check if the relay accepted the parameters or not. Drop the
    Measurement if not accepted, otherwise continue on to the next stage:
    telling the measurers to connect to the relay.

    :param meas: The measurement the event is for
    :param event: The FF_MEAS event, whose ``accepted`` attribute tells us
        the relay's answer
    '''
    if not event.accepted:
        # TODO: record as failed somehow pastly/flashflow#18
        log.warning(
            'Meas %d params not accepted: %s',
            meas.meas_id, event)
        del self.measurements[meas.meas_id]
        return
    # TODO: num circs as a param pastly/flashflow#11
    connect_msg = msg.ConnectToRelay(
        meas.meas_id, meas.relay_fp, 10, meas.meas_duration)
    for measr in meas.measurers:
        assert measr.transport is not None
        measr.transport.write(connect_msg.serialize())

717 

718 def _notif_ffmeas_event_BW_REPORT( 

719 self, meas: Measurement, event: FFMeasEvent): 

720 ''' Received FF_MEAS event with type BW_REPORT ''' 

721 meas.bg_reports.append((time.time(), event.sent, event.recv)) 

722 if self._have_all_bw_reports(meas): 

723 return self._finish_measurement(meas, None) 

724 return 

725 

def _notif_measr_msg_ConnectedToRelay(
        self, measr: MeasrProtocol, message: msg.ConnectedToRelay):
    ''' A measurer tells us it is connected to the relay and thus ready
    for active measurement to begin.

    Once every measurer of the measurement is ready, tell our tor client
    to start the measurement and then tell the measurers to go. If our tor
    client refuses, tell the measurers about the failure and forget about
    the measurement instead.

    The message is dropped if it is for a measurement we don't know
    about, if it comes from a measurer that is not taking part in the
    measurement, or if the measurer already told us that it is ready.
    Without the latter two checks, such a message could make the number of
    ready measurers match the number of expected ones and start (or
    restart) the measurement when it shouldn't.

    :param measr: The measurer that sent the message
    :param message: The message
    '''
    meas_id = message.orig.meas_id
    if meas_id not in self.measurements:
        log.info(
            'Received ConnectedToRelay for unknown meas %d, dropping.',
            meas_id)
        return
    meas = self.measurements[meas_id]
    if measr not in meas.measurers:
        log.warning(
            'Received ConnectedToRelay for meas %d from a measurer that '
            'is not part of it, dropping.', meas_id)
        return
    if measr in meas.ready_measurers:
        log.info(
            'Received duplicate ConnectedToRelay for meas %d, dropping.',
            meas_id)
        return
    meas.ready_measurers.add(measr)
    ready_measr = meas.ready_measurers
    all_measr = meas.measurers
    log.debug(
        '%d/%d measurers are ready for meas %d',
        len(ready_measr), len(all_measr), meas_id)
    if len(ready_measr) != len(all_measr):
        # Still waiting on somebody
        return
    log.debug('Sending Go message for meas %d', meas_id)
    ret = tor_client.send_msg(
        self.tor_client, CoordStartMeas(
            meas.meas_id, meas.relay_fp, meas.meas_duration))
    if not ret.is_ok():
        fail_msg = msg.Failure(
            msg.FailCode.C_START_ACTIVE_MEAS, meas_id,
            extra_info=str(ret))
        log.error(fail_msg)
        for ready in ready_measr:
            assert ready.transport is not None
            ready.transport.write(fail_msg.serialize())
        del self.measurements[meas_id]
        return
    go_msg = msg.Go(meas_id)
    for ready in ready_measr:
        assert ready.transport is not None
        ready.transport.write(go_msg.serialize())

761 

def _notif_measr_msg_BwReport(
        self, measr: MeasrProtocol, message: msg.BwReport):
    ''' Handle a BwReport message from the given measurer.

    The report's ``(ts, sent, recv)`` is appended to the list of reports
    kept for this measurer in the measurement named by the message. If that
    completes the set of reports the measurement needs, the measurement is
    finished (with no failure message). Reports for unknown measurements
    are logged and dropped.
    '''
    meas_id = message.meas_id
    if meas_id not in self.measurements:
        log.info(
            'Received BwReport for unknown meas %d, dropping.', meas_id)
        return
    meas = self.measurements[meas_id]
    sample = (message.ts, message.sent, message.recv)
    meas.measr_reports[measr].append(sample)
    if not self._have_all_bw_reports(meas):
        return
    return self._finish_measurement(meas, None)

775 

def _connect_to_relay(self, meas: Measurement):
    ''' Kick off the given measurement by asking our tor client to launch
    the circuit to the relay.

    On success the id of the launched circuit is stored in
    ``meas.relay_circ``. On any failure the problem is logged and the
    measurement is removed from ``self.measurements``. If the measurement
    already has a circuit, an error is logged and nothing else happens.
    '''
    # Sanity: a circuit must not have been launched for this meas already
    if meas.relay_circ is not None:
        log.error(
            'Ready to connect to relay, but meas %d already has circ %d',
            meas.meas_id, meas.relay_circ)
        return
    # Ask our tor to launch the circuit
    launch_cmd = CoordStartMeas(
        meas.meas_id, meas.relay_fp, meas.meas_duration)
    ret = tor_client.send_msg(self.tor_client, launch_cmd)
    # We only need LAUNCHED here, meaning circuit construction has begun.
    # BUILT (the circuit is actually usable) arrives later as an event.
    if not ret.is_ok():
        # TODO: record the failure somehow pastly/flashflow#18
        log.error(
            'Failed to launch circuit to %s: %s',
            meas.relay_fp, ret)
        del self.measurements[meas.meas_id]
        return
    # The reply should look like "250 LAUNCHED <circ_id>", for example
    # "250 LAUNCHED 24". Pull the circuit id out and keep it.
    code, _, content = ret.content()[0]
    assert code == '250'
    fields = content.split()
    well_formed = len(fields) == 2 and fields[0] == 'LAUNCHED'
    if not well_formed:
        # TODO: record the failure somehow pastly/flashflow#18
        log.error(
            'Did not expect body of message to be: %s', content)
        del self.measurements[meas.meas_id]
        return
    meas.relay_circ = int(fields[1])
    log.info(
        'Meas %d launched circ %d with relay %s',
        meas.meas_id, meas.relay_circ, meas.relay_fp)
    # Nothing more to do until Tor tells us the circuit is finished
    # building

816 

def _generate_v3bw(self):
    ''' Produce a v3bw file from our results log.

    The output location, the results log location, and the maximum age of
    results to consider all come from our config. Whatever ``v3bw.gen``
    returns is logged at debug level.
    '''
    out_path = self.conf.getpath('v3bw', 'v3bw')
    results_path = self.conf.getpath('coord', 'results_log')
    max_age = self.conf.getfloat('v3bw', 'max_results_age')
    v3bw_fname = v3bw.gen(out_path, results_path, max_age)
    log.debug(v3bw_fname)

824 

825 # ######################################################################## 

826 # STATE CHANGE EVENTS. These are called when entering the specified state. 

827 # ######################################################################## 

828 

def on_enter_ENSURE_LISTEN_SOCKS(self):
    ''' On entering ENSURE_LISTEN_SOCKS, schedule
    :meth:`_ensure_listen_socks` to run on the event loop. '''
    next_step = self._ensure_listen_socks
    loop.call_soon(next_step)

831 

def on_enter_ENSURE_CONN_W_TOR(self):
    ''' On entering ENSURE_CONN_W_TOR, schedule
    :meth:`_ensure_conn_w_tor` to run on the event loop. '''
    next_step = self._ensure_conn_w_tor
    loop.call_soon(next_step)

834 

def on_enter_READY(self):
    ''' On entering READY, schedule :meth:`_now_ready` to run on the event
    loop. '''
    next_step = self._now_ready
    loop.call_soon(next_step)

837 

def on_enter_NONFATAL_ERROR(self, err_msg):
    ''' On entering NONFATAL_ERROR: clean up, log the given error message,
    and schedule a transition back to the starting state. '''
    self._cleanup()
    log.error(err_msg)
    # Start over from the beginning on the next loop iteration
    restart = self.change_state_starting
    loop.call_soon(restart)

842 

def on_enter_FATAL_ERROR(self):
    ''' On entering FATAL_ERROR: clean up first, then die. '''
    for step in (self._cleanup, self._die):
        step()

846 

847 # ######################################################################## 

848 # MESSAGES FROM MEASRs. These are called when a measurer tells us 

849 # something. 

850 # ######################################################################## 

851 

def notif_measr_msg(self, measr: MeasrProtocol, message: msg.FFMsg):
    ''' Receive a FFMsg object from one of our measurers and hand it to the
    handler for its type.

    ConnectedToRelay and BwReport messages are only handled while we are in
    the READY state. Any other combination of message type and state moves
    us into the nonfatal error state.
    '''
    msg_type = type(message)
    state = self.state
    if state == States.READY:
        if msg_type == msg.ConnectedToRelay:
            assert isinstance(message, msg.ConnectedToRelay)  # so mypy knows
            return self._notif_measr_msg_ConnectedToRelay(measr, message)
        if msg_type == msg.BwReport:
            assert isinstance(message, msg.BwReport)  # so mypy knows
            return self._notif_measr_msg_BwReport(measr, message)
    self.change_state_nonfatal_error(
        'Unexpected %s message received in state %s' %
        (msg_type, state))

865 

866 # ######################################################################## 

867 # MISC EVENTS. These are called from other parts of the coord code. 

868 # ######################################################################## 

869 

def notif_sslerror(self, exc: ssl.SSLError, trans):
    ''' Called from the last-chance exception handler to tell us about TLS
    errors, for example a measurer connecting to us with a bad client cert.

    The peer and the error are logged at debug level and the offending
    transport is closed.
    '''
    peer = trans.get_extra_info('peername')
    log.debug(
        'Someone (%s) failed to TLS handshake with us: %s', peer, exc)
    trans.close()

878 

def notif_measurer_connected(self, measurer: MeasrProtocol):
    ''' Called from MeasrProtocol when a measurer has successfully
    connected to us. Remember it as one of our measurers. '''
    known = self.measurers
    known.append(measurer)
    log.debug('Now have %d measurers', len(known))

884 

def notif_measurer_disconnected(self, measurer: MeasrProtocol):
    ''' Called from MeasrProtocol when the connection with a measurer has
    been lost. Replace our list of measurers with a new list that leaves
    out every entry equal to the lost one. '''
    still_connected = []
    for other in self.measurers:
        if other != measurer:
            still_connected.append(other)
    self.measurers = still_connected
    log.debug('Measurer lost. Now have %d', len(self.measurers))
    # TODO: need to do error stuff if they were a part of any measurements

891 

def notif_ctrl_message(self, msg: str) -> Tuple[bool, str]:
    ''' Called from CtrlProtocol when a controller has given us a command.

    Recognized commands (matched case-insensitively):

    - ``measure <relay>``: start measuring the relay identified by the
      given nickname or fingerprint. Only accepted in the READY state.
    - ``v3bw``: generate a v3bw file from our results.

    Returns ``(True, '')`` if the message seems like a good, actionable
    message. Otherwise returns ``False`` and a human-meaningful string with
    more information. Malformed commands never raise; in particular a
    ``measure`` command lacking its relay argument is reported back to the
    controller.

    Note that the ``msg`` parameter hides the ``msg`` module inside this
    method.
    '''
    words = msg.lower().split()
    if not words:
        return False, 'Empty command?'
    command = words[0]
    if command == 'measure':
        # Validate the syntax before anything else: indexing words[1]
        # without this check raises IndexError on a bare "measure"
        if len(words) < 2:
            return False, 'Missing relay nickname or fingerprint to measure'
        if self.state != States.READY:
            return False, 'Not READY'
        relay_name = words[1]
        log.debug('told to measure %s', relay_name)
        meas_id = next_meas_id()
        relay = find_relay(self.tor_client, relay_name)
        if not relay:
            return False, 'No such relay ' + relay_name
        meas = Measurement(
            meas_id,
            relay.fingerprint,
            # A snapshot of the currently connected measurers
            set(self.measurers),
            self.conf.getint('meas_params', 'meas_duration'),
            self.conf.getfloat('meas_params', 'bg_percent'))
        self.measurements[meas_id] = meas
        # Do the real work on the next loop iteration so the controller
        # gets its answer right away
        loop.call_soon(partial(self._connect_to_relay, meas))
        return True, ''
    elif command == 'v3bw':
        self._generate_v3bw()
        return True, ''
    return False, 'Unknown ctrl command: ' + msg

923 

def notif_circ_event(self, event: CircuitEvent):
    ''' Called from stem to tell us about CIRC events. These matter while
    we wait for a circuit with a relay to be built.

    This runs in a different thread than the main one, so the event is
    handed (in a threadsafe manner) to the main thread's loop, where
    :meth:`_notif_circ_event` does the real work.
    '''
    loop.call_soon_threadsafe(self._notif_circ_event, event)

932 

def _notif_circ_event(self, event: CircuitEvent):
    ''' The real CIRC event handler, in the main thread's loop.

    Receive a CIRC event from our tor client.

    We want to know about circuit events for the following reasons:
        - When we have recently launched our circuit with the relay and
          want to know when it is built so we can go to the next state
        - TODO failures
        - TODO other reasons

    Events for circuits that no measurement is using are ignored. BUILT,
    CLOSED and FAILED events are passed to their own handlers; LAUNCHED and
    EXTENDED are ignored; anything else is logged as a warning.
    '''
    # Try to find a Measurement that is interested in this CIRC event based
    # on the circ_id
    matching_meas_ids = [
        meas_id for meas_id, meas in self.measurements.items()
        if meas.relay_circ == int(event.id)]
    # If none, then it's probably our tor client doing irrelevant things.
    # Ignore.
    if not matching_meas_ids:
        return
    # If more than one, serious programming issue. Drop.
    if len(matching_meas_ids) != 1:
        log.error(
            'It should not be possible for more than one Measurement to '
            'be using the same circuit id %d. Not handling CIRC event: %s',
            int(event.id), event)
        return
    # Found it. Pass off control based on the type of CIRC event
    meas = self.measurements[matching_meas_ids[0]]
    if event.status == CircStatus.BUILT:
        return self._notif_circ_event_BUILT(meas, event)
    elif event.status in [CircStatus.LAUNCHED, CircStatus.EXTENDED]:
        # ignore these
        return
    elif event.status == CircStatus.CLOSED:
        return self._notif_circ_event_CLOSED(meas, event)
    elif event.status == CircStatus.FAILED:
        return self._notif_circ_event_FAILED(meas, event)
    # Logger.warn is a deprecated alias; warning is the supported spelling
    log.warning('Not handling CIRC event: %s', event)

973 

def notif_ffmeas_event(self, event: FFMeasEvent):
    ''' Called from stem to tell us about FF_MEAS events.

    This runs in a different thread than the main one, so the event is
    handed (in a threadsafe manner) to the main thread's loop, where
    :meth:`_notif_ffmeas_event` does the real work.
    '''
    loop.call_soon_threadsafe(self._notif_ffmeas_event, event)

981 

def _notif_ffmeas_event(self, event: FFMeasEvent):
    ''' The real FF_MEAS event handler, in the main thread's loop.

    Receive a FF_MEAS event from our tor client and pass it to the handler
    for its type (PARAMS_SENT, PARAMS_OK or BW_REPORT). Events naming a
    measurement we do not know are logged as errors and dropped. Events of
    any other type are logged as warnings and ignored.
    '''
    meas_id = event.meas_id
    if meas_id not in self.measurements:
        log.error(
            'Received FF_MEAS event with meas %d we don\'t know about: %s',
            meas_id, event)
        return
    meas = self.measurements[meas_id]
    if event.ffmeas_type == 'PARAMS_SENT':
        return self._notif_ffmeas_event_PARAMS_SENT(meas, event)
    elif event.ffmeas_type == 'PARAMS_OK':
        return self._notif_ffmeas_event_PARAMS_OK(meas, event)
    elif event.ffmeas_type == 'BW_REPORT':
        return self._notif_ffmeas_event_BW_REPORT(meas, event)
    # Logger.warn is a deprecated alias; warning is the supported spelling
    log.warning(
        'Ignoring FF_MEAS event because unrecognized/unhandled type '
        '%s: %s', event.ffmeas_type, event)
    return

1004 

1005 

# Logger for this module, named after the module itself
log = logging.getLogger(__name__)
# The event loop this module schedules its callbacks on; main() runs it
# forever and closes it on the way out
loop = asyncio.get_event_loop()
# The coordinator's state machine. Only declared here (no value yet);
# main() creates the StateMachine and assigns it to this global.
machine: StateMachine

1009 

1010 

def _exception_handler(loop, context):
    ''' Last resort exception handler

    This will only catch exceptions that happen in the main thread. Others
    very well may go entirely unnoticed and unlogged.

    Some exceptions are unexpected, so we end up here. For these we kill
    ourselves after logging about the exception.

    Other exceptions are impossible to catch before we get here. For
    example, a client failing the TLS handshake with us. For these we notify
    the state machine so it can react.

    :param loop: the event loop that called us (unused)
    :param context: the dict describing the problem. We read its
        ``message`` key, and the ``exception``, ``transport``, ``handle``
        and ``source_traceback`` keys when present.
    '''
    # Check for exceptions that should not be fatal and we should tell other
    # parts of the code about so they can react intelligently
    if 'exception' in context:
        exc = context['exception']
        # Check for recoverable TLS errors. isinstance (not an exact type
        # comparison) so that subclasses of SSLError, such as
        # ssl.SSLCertVerificationError, are treated as recoverable too.
        if isinstance(exc, ssl.SSLError):
            if 'transport' in context:
                machine.notif_sslerror(exc, context['transport'])
                return
            log.warning(
                'SSLError caught without a transport too. Cannot pass ' +
                'to state machine to handle gracefully.')
        # Additional recoverable errors would continue here
    # All other exceptions. These are fatal
    log.error('%s', context['message'])
    if 'exception' in context:
        log.error('%s %s', type(context['exception']), context['exception'])
    if 'handle' in context:
        log.error(context['handle'])
    if 'source_traceback' in context:
        log.error('Traceback:')
        summary = StackSummary.from_list(context['source_traceback'])
        for line_super in summary.format():
            # The above line has multiple lines in it
            for line in line_super.split('\n'):
                if line:
                    log.error(' %s', line)
    else:
        log.error(
            'Traceback not available. Maybe run with PYTHONASYNCIODEBUG=1')
    machine.change_state_fatal_error()

1057 

1058 

def _gen_concated_measr_cert_file(
        d: str, coord_fname: str) -> Tuple[IO[str], str]:
    ''' Concatenate every measurer cert found in the given directory (taking
    care to skip any file matching the given coord cert filename) into a
    new temporary file.

    Returns both the open temporary file object and its name. A file is
    always returned, even if no certs were found and it is therefore empty.
    The file object is rewound to its beginning before being returned.
    '''
    measr_certs = _measr_cert_files(d, coord_fname)
    # + indicates "updating" AKA reading and writing
    combined = NamedTemporaryFile('w+')
    for cert_path in measr_certs:
        with open(cert_path, 'rt') as cert_fd:
            cert_text = cert_fd.read()
        combined.write(cert_text)
    combined.seek(0)
    log.debug(
        'Stored %d measurer certs in %s', len(measr_certs), combined.name)
    return combined, combined.name

1074 

1075 

def _measr_cert_files(d: str, coord_fname: str) -> List[str]:
    ''' Look in the directory `d` for files ending with '.pem', recursively.
    If any found file matches `coord_fname` by name exactly, then ignore it.
    Return all other files found. If no allowed files are found, returns an
    empty list.

    :param d: the directory to search, including all of its subdirectories
    :param coord_fname: the coordinator's own cert file, spelled exactly as
        the search will produce it (i.e. starting with `d`)
    '''
    # recursive=True only has an effect when the pattern contains '**',
    # which matches zero or more directories. Without it only the top level
    # of `d` would be searched, despite what is promised above.
    pattern = os.path.join(d, '**', '*.pem')
    out = []
    for fname in glob.iglob(pattern, recursive=True):
        if fname == coord_fname:
            continue
        log.debug('Treating %s as a measurer cert file', fname)
        out.append(fname)
    return out

1088 

1089 

def _ensure_relays_txt(conf):
    ''' Make sure the relay filter list named in our config exists. If it
    does not, create it as a copy of the relays.txt that ships with the
    package. An existing file is never touched. '''
    fname = conf.getpath('coord', 'relay_filter_list')
    if os.path.exists(fname):
        return
    # Read the packaged default before creating the destination. Were the
    # destination opened first and the default then failed to open, an empty
    # filter list would be left behind and accepted as-is on the next run.
    with open(RELAYS_TXT_FNAME, 'rt') as fd_in:
        default_contents = fd_in.read()
    with open(fname, 'wt') as fd_out:
        fd_out.write(default_contents)

1096 

1097 

def gen_parser(sub) -> ArgumentParser:
    ''' Register the ``coord`` command, with its description, on the given
    subparsers object and return the parser created for it. '''
    return sub.add_parser(
        'coord', description='Run as a FlashFlow coordinator.')

1103 

1104 

1105# This function needs **some sort** of type annotation so that mypy will check 

1106# the things it does. Adding the return value (e.g. '-> None') is enough 

def main(args, conf) -> None:
    ''' Run the coordinator.

    Makes sure our data and key directories exist (created with mode 0700
    if missing) and that the relay filter list exists, creates the state
    machine, and runs the event loop forever. When the loop stops for any
    reason, its async generators are shut down and the loop is closed.
    '''
    global machine
    for dir_option in ('datadir', 'keydir'):
        os.makedirs(
            conf.getpath('coord', dir_option), mode=0o700, exist_ok=True)
    _ensure_relays_txt(conf)
    machine = StateMachine(conf)
    loop.set_exception_handler(_exception_handler)
    # The first state transition happens once the loop is running
    loop.call_soon(machine.change_state_starting)
    try:
        loop.run_forever()
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

1120 return