Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1from argparse import ArgumentParser 

2from functools import partial 

3from traceback import StackSummary 

4import asyncio 

5import enum 

6import logging 

7import ssl 

8import time 

9import os 

10from stem import CircStatus # type: ignore 

11from stem.control import Controller, EventType # type: ignore 

12from stem.response.events import CircuitEvent, FFMeasEvent # type: ignore 

13from transitions import Machine # type: ignore 

14from typing import Tuple, Union, Set, Dict 

15from .. import tor_client 

16from .. import msg 

17from ..tor_ctrl_msg import MeasrStartMeas 

18 

19 

class CoordProtocol(asyncio.Protocol):
    ''' asyncio protocol for our connection with the coordinator.

    Bytes arriving from the coordinator are parsed into a message and handed
    to the state machine. Losing the connection is reported to the state
    machine as a non-fatal error. '''
    #: The transport handed to us in :meth:`connection_made`. None until then.
    transport = None

    def connection_made(self, transport):
        ''' Called once the connection exists. Keep the transport around. '''
        log.debug('Connected to coord')
        self.transport = transport

    def connection_lost(self, exc):
        ''' Called when the connection goes away. ``exc`` is not inspected;
        every loss is treated as a non-fatal error. '''
        machine.change_state_nonfatal_error('Lost connection with coord')

    def data_received(self, data: bytes):
        ''' Handle bytes sent by the coordinator: deserialize them into a
        FFMsg and notify the state machine.

        Nothing is buffered here. If this is ever called with only part of a
        message, deserialization is attempted on the partial bytes. Should
        that become a problem, this function must learn to accumulate bytes
        until a whole message is present. '''
        log.info('Received %d bytes: %s', len(data), data)
        machine.notif_coord_msg(msg.FFMsg.deserialize(data))

41 

42 

class Measurement:
    ''' Everything we track about one measurement of one relay. '''
    #: The :class:`flashflow.msg.ConnectToRelay` command that started this
    #: measurement. Kept so it can be echoed back to the coord once we are
    #: ready to go (or have failed).
    connect_msg: msg.ConnectToRelay
    #: Ids of all circuits launched with the relay for this measurement,
    #: whether or not they have finished building.
    circs: Set[int]
    #: Ids of the launched circuits that we have since learned are built.
    ready_circs: Set[int]
    #: Ids of circuits that were reported CLOSED or FAILED at any point.
    bad_circs: Set[int]

    def __init__(self, connect_msg: msg.ConnectToRelay):
        self.connect_msg = connect_msg
        # Each instance gets its own, initially empty, sets
        self.circs, self.ready_circs, self.bad_circs = set(), set(), set()

    @property
    def meas_id(self) -> int:
        ''' ID of this measurement, as given in the connect message '''
        return self.connect_msg.meas_id

    @property
    def relay_fp(self) -> str:
        ''' Fingerprint of the relay being measured '''
        return self.connect_msg.fp

    @property
    def meas_duration(self) -> int:
        ''' How long, in seconds, active measurement should last '''
        return self.connect_msg.dur

    @property
    def waiting_circs(self) -> Set[int]:
        ''' Circuits in ``circs`` that are not (yet) in ``ready_circs``, i.e.
        LAUNCHED circuits for which no BUILT has been seen.

        This is a plain set difference. It knows nothing about circuits
        closing, nor about a circuit becoming un-BUILT; whoever manipulates
        ``circs`` and ``ready_circs`` is responsible for keeping them
        meaningful.
        '''
        return self.circs.difference(self.ready_circs)

89 

90 

class States(enum.Enum):
    ''' The states a FlashFlow measurer moves through. '''
    #: Initial state. Also where a non-fatal error eventually leads us back
    #: to, via a fresh start.
    START = enum.auto()
    #: First state that does real work: launch a tor client and connect to it.
    ENSURE_CONN_W_TOR = enum.auto()
    #: Second state that does real work: connect to the coordinator.
    ENSURE_CONN_W_COORD = enum.auto()
    #: The usual state: performing measurements or waiting to be told to
    #: perform them.
    READY = enum.auto()
    #: Something went wrong that requires cleaning everything up and
    #: essentially relaunching, without the process exiting.
    NONFATAL_ERROR = enum.auto()
    #: Something unrecoverable went wrong. Cleanup and die.
    FATAL_ERROR = enum.auto()

108 

109 

class StateMachine(Machine):
    ''' State machine and main control flow hub for FlashFlow measurer.

    change_state_*:
        State transitions are named change_state_* and don't exist here in the
        code. The Machine class takes care of making them based on the triggers
        in the list of possible transitions. For example: change_state_starting
        is named as the trigger for transitions from either START or
        NONFATAL_ERROR into ENSURE_CONN_W_TOR.

    on_enter_*:
        This is how the Machine class finds functions to call upon entering the
        given state. For example, on_enter_NONFATAL_ERROR() is called when we
        are transitioning to the NONFATAL_ERROR state. These functions should
        be kept short. Significant work/logic should be done in other functions
        that these call or schedule for calling later.

    notif_*:
        Entry points other code uses to tell us that something happened: a
        message from the coordinator, or an event from the tor client.

    _*:
        Other internal functions. See their documentation for more information
        on them.
    '''
    # conf  # This is set in __init__
    #: Controller for the tor client we launched. Set in _ensure_conn_w_tor()
    tor_client: Controller
    #: How we communicate with the coord. Both are set once the connection
    #: attempt in _ensure_conn_w_coord() succeeds.
    coord_trans: asyncio.WriteTransport
    coord_proto: CoordProtocol
    #: Measurements we currently know about, keyed by measurement id
    measurements: Dict[int, Measurement]

    def __init__(self, conf):
        ''' Store the configuration and set up the states and transitions.

        :param conf: configuration object. It is read later with ``get()``,
            ``getpath()``, and ``getaddr()``.
        '''
        self.conf = conf
        self.measurements = {}
        super().__init__(
            model=self,
            states=States,
            transitions=[
                {
                    'trigger': 'change_state_starting',
                    'source': [States.START, States.NONFATAL_ERROR],
                    'dest': States.ENSURE_CONN_W_TOR,
                },
                {
                    'trigger': 'change_state_connected_to_tor',
                    'source': States.ENSURE_CONN_W_TOR,
                    'dest': States.ENSURE_CONN_W_COORD,
                },
                {
                    'trigger': 'change_state_connected_to_coord',
                    'source': States.ENSURE_CONN_W_COORD,
                    'dest': States.READY,
                },
                {
                    'trigger': 'change_state_nonfatal_error',
                    'source': '*',
                    'dest': States.NONFATAL_ERROR,
                },
                {
                    'trigger': 'change_state_fatal_error',
                    'source': '*',
                    'dest': States.FATAL_ERROR,
                },
            ],
            initial=States.START,
            # Do not create .to_<state>() methods, which allow transition to
            # <state> regardless of current state
            auto_transitions=False,
        )

    def _ensure_conn_w_tor(self):
        ''' Main function in the ENSURE_CONN_W_TOR state. Launch a tor client
        and connect to it. Save the Controller object.

        On failure to launch, move to FATAL_ERROR. On success, register for
        CIRC and FF_MEAS events and move on to the next state. '''
        assert self.state == States.ENSURE_CONN_W_TOR
        # TODO: what happens if tor client disappears? Exception thrown? What??
        # And what should we do about it? Try to relaunch? Just die? Choose
        # **something**
        c = tor_client.launch(
            self.conf.getpath('tor', 'tor_bin'),
            self.conf.getpath('measurer', 'tor_datadir'),
            self.conf.get('tor', 'torrc_extra_lines')
        )
        if not c:
            log.error('Unable to launch and connect to tor client')
            self.change_state_fatal_error()
            return
        c.add_event_listener(self.notif_circ_event, EventType.CIRC)
        c.add_event_listener(self.notif_ffmeas_event, EventType.FF_MEAS)
        self.tor_client = c
        self.change_state_connected_to_tor()

    def _ensure_conn_w_coord(self, delay: float):
        ''' Main function in the ENSURE_CONN_W_COORD state. Repeatedly try
        connecting to the coordinator until we are successful or have a fatal
        error warranting completely giving up on life.

        This function uses asynchronous python: the connection is represented
        by a transport and protocol, and we try connecting asynchronously and
        use a callback to find out the result. That said, the work done here
        should probably be the only thing going on.

        :param delay: the most recently used retry delay, in seconds. If this
            attempt fails in a retryable way, the next attempt is scheduled
            after twice this long, capped at 60 seconds.
        '''
        assert self.state == States.ENSURE_CONN_W_COORD
        # TODO: what if connection goes away?
        # Get the (host, port) from "host:port"
        coord_addr_port = self.conf.getaddr('measurer', 'coord_addr')
        if coord_addr_port is None:
            log.error('Don\'t know where coord is')
            self.change_state_fatal_error()
            return

        # Callback to get the result of one connection attempt. If it didn't
        # work and it wasn't fatal, schedule calling this function again some
        # time in the future. If fatal, die. If successful, save the transport
        # and protocol and move on!
        def cb(fut):
            nonlocal delay
            # It's possible that the programmer didn't catch all exceptions.
            # If the result is an exception, this *should* bubble up to the
            # default exception handler, _exception_handler(...).
            success_code, stuff_or_error = fut.result()
            # Now check if we were successful, fatally unable to connect, or if
            # we should retry.
            if success_code == CoordConnRes.FATAL_ERROR:
                log.error(
                    'Fatal error connecting to coordinator: %s',
                    stuff_or_error)
                self.change_state_fatal_error()
                return
            elif success_code == CoordConnRes.RETRY_ERROR:
                delay = min(2 * delay, 60)
                log.warning(
                    'Unable to connect to coordinator: %s. Retrying in %.2fs.',
                    stuff_or_error, delay)
                loop.call_later(
                    delay, partial(self._ensure_conn_w_coord, delay))
                return
            assert success_code == CoordConnRes.SUCCESS
            assert not isinstance(stuff_or_error, str)
            self.coord_trans, self.coord_proto = stuff_or_error
            self.change_state_connected_to_coord()
        # Kick off the asynchronous attempt to connect and attach the above
        # callback so we can get the result. Tasks are created through the
        # loop rather than by instantiating asyncio.Task directly.
        task = loop.create_task(_try_connect_to_coord(
            coord_addr_port,
            self.conf.getpath('measurer', 'key'),
            self.conf.getpath('measurer', 'coord_cert'),
        ))
        task.add_done_callback(cb)
        # This is asynchronous python. We end immediately and the callback will
        # eventually be called with the connection results. Nothing left to do
        # for now.

    def _complete_cleanup(self):
        ''' Cleanup all of our state while being very careful to not allow any
        exceptions to bubble up. Use this when in an error state and you want
        to cleanup before starting over or just dying. '''
        if hasattr(self, 'tor_client') and self.tor_client:
            log.info('cleanup: closing tor')
            try:
                self.tor_client.close()
            except Exception as e:
                log.error('Error closing tor: %s', e)
        if hasattr(self, 'coord_trans') and self.coord_trans:
            log.info('cleanup: closing coord transport')
            try:
                self.coord_trans.close()
            except Exception as e:
                log.error('Error closing transport with coord: %s', e)
        # Nothing to do for coord_proto
        if hasattr(self, 'measurements') and self.measurements:
            log.info(
                'cleanup: forgetting about %d measurements',
                len(self.measurements))
            self.measurements = {}

    def _die(self):
        ''' End execution of the program. '''
        loop.stop()

    def _send_failure(self, fail_msg: msg.Failure):
        ''' Log the given Failure message as an error and send it to the
        coordinator. '''
        log.error(fail_msg)
        self.coord_trans.write(fail_msg.serialize())

    @staticmethod
    def _parse_launched_circs(content: str) -> Union[Set[int], None]:
        ''' Parse the tor client's response to our request to launch circuits.

        We expect "FF_MEAS 0 LAUNCHED CIRCS=1,2,3,4,5", where the 0 is the
        measurement ID we told the tor client, and the actual list of launched
        circuits is CIRCS, the comma-separated list.

        :returns: the set of circuit ids, or None if the content does not look
            like the above (including circuit ids that are not integers).
        '''
        parts = content.split()
        if len(parts) != 4 or \
                parts[0] != 'FF_MEAS' or \
                parts[2] != 'LAUNCHED' or \
                not parts[3].startswith('CIRCS='):
            return None
        try:
            return {
                int(circ_id_str) for circ_id_str in
                parts[3][len('CIRCS='):].split(',')
            }
        except ValueError:
            # e.g. "CIRCS=" with nothing after it, or a non-numeric id
            return None

    # ########################################################################
    # STATE CHANGE EVENTS. These are called when entering the specified state.
    # ########################################################################

    def on_enter_READY(self):
        ''' Nothing to do; we wait to be notified of messages and events. '''
        pass

    def on_enter_ENSURE_CONN_W_TOR(self):
        ''' Schedule the work of this state to run soon. '''
        loop.call_soon(self._ensure_conn_w_tor)

    def on_enter_ENSURE_CONN_W_COORD(self):
        ''' Schedule the first connection attempt, with 0.5 as the initial
        retry delay value. '''
        loop.call_soon(partial(self._ensure_conn_w_coord, 0.5))

    def on_enter_NONFATAL_ERROR(self, err_msg: str):
        ''' Log the error, then schedule a complete cleanup followed by
        starting over.

        :param err_msg: description of what went wrong, given by whoever
            triggered change_state_nonfatal_error
        '''
        log.error('nonfatal error: %s', err_msg)
        loop.call_soon(self._complete_cleanup)
        loop.call_soon(self.change_state_starting)

    def on_enter_FATAL_ERROR(self):
        ''' Cleanup immediately and stop the loop. '''
        self._complete_cleanup()
        self._die()

    # ########################################################################
    # MESSAGES FROM COORD. These are called when the coordinator tells us
    # something.
    # ########################################################################

    def notif_coord_msg(self, message: msg.FFMsg):
        ''' Handle a message from the coordinator by dispatching it, based on
        its exact type, to the matching _notif_coord_msg_*() function.

        Messages are dropped (with a warning) if we are not in the READY state
        or if there is no handler for that type of message. '''
        msg_type = type(message)
        if self.state != States.READY:
            log.warning(
                'Coord sent us message but we are not ready. Dropping. %s',
                message)
            return
        # The asserts below are for shutting up mypy
        if msg_type is msg.ConnectToRelay:
            assert isinstance(message, msg.ConnectToRelay)
            return self._notif_coord_msg_ConnectToRelay(message)
        elif msg_type is msg.Failure:
            assert isinstance(message, msg.Failure)
            return self._notif_coord_msg_Failure(message)
        elif msg_type is msg.Go:
            assert isinstance(message, msg.Go)
            return self._notif_coord_msg_Go(message)
        log.warning(
            'Unexpected/unhandled %s message. Dropping. %s',
            msg_type, message)

    def _notif_coord_msg_ConnectToRelay(self, message: msg.ConnectToRelay):
        ''' The coordinator wants us to prepare to measure a relay. Ask the
        tor client to launch circuits with it and remember the new
        measurement.

        Any problem is reported to the coordinator with a Failure message, and
        in that case the measurement is not remembered. '''
        # caller should have verified and logged about this already
        assert self.state == States.READY
        meas_id = message.meas_id
        if meas_id in self.measurements:
            self._send_failure(
                msg.Failure(msg.FailCode.M_DUPE_MEAS_ID, meas_id))
            return
        meas = Measurement(message)
        ret = tor_client.send_msg(
            self.tor_client,
            MeasrStartMeas(
                meas.meas_id, meas.relay_fp, message.n_circs,
                meas.meas_duration))
        # Make sure the circuit launches went well. Note they aren't built yet.
        # It's just that tor found nothing obviously wrong with trying to build
        # these circuits.
        if not ret.is_ok():
            self._send_failure(msg.Failure(
                msg.FailCode.LAUNCH_CIRCS, meas_id,
                extra_info=str(ret)))
            return
        code, _, content = ret.content()[0]
        # Already checked this above with ret.is_ok()
        assert code == '250'
        launched = self._parse_launched_circs(content)
        if launched is None:
            self._send_failure(msg.Failure(
                msg.FailCode.MALFORMED_TOR_RESP, meas_id,
                extra_info=str(ret)))
            return
        meas.circs.update(launched)
        log.info(
            'Launched %d circuits with relay %s: %s', len(meas.circs),
            meas.relay_fp, meas.circs)
        self.measurements[meas_id] = meas
        # That's all for now. We stay in this state until Tor tells us it has
        # finished building all circuits

    def _notif_coord_msg_Go(self, go_msg: msg.Go):
        ''' The coordinator says to start active measurement. Tell the tor
        client to start, using however many circuits are ready.

        Failure is reported to the coordinator with a Failure message. '''
        # caller should have verified and logged about this already
        assert self.state == States.READY
        meas_id = go_msg.meas_id
        if meas_id not in self.measurements:
            self._send_failure(
                msg.Failure(msg.FailCode.M_UNKNOWN_MEAS_ID, meas_id))
            # TODO: cleanup Measurement
            return
        meas = self.measurements[meas_id]
        start_msg = MeasrStartMeas(
            meas.meas_id, meas.relay_fp, len(meas.ready_circs),
            meas.meas_duration)
        ret = tor_client.send_msg(self.tor_client, start_msg)
        if not ret.is_ok():
            self._send_failure(
                msg.Failure(msg.FailCode.M_START_ACTIVE_MEAS, meas_id))
            # TODO: cleanup Measurement
            return

    def _notif_coord_msg_Failure(self, fail_msg: msg.Failure):
        ''' The coordinator is telling us about a failure. For now all we do
        is log about it. '''
        # caller should have verified and logged about this already
        assert self.state == States.READY
        meas_id = fail_msg.meas_id
        log.warning(
            'Received FailCode %s regarding meas id %s',
            fail_msg.code.name, meas_id)
        if meas_id is None:
            return
        if meas_id not in self.measurements:
            log.info(
                'Unknown measurement %d in Failure message, so nothing to do',
                meas_id)
            return
        # TODO: uhh ... close circuits or something? Other clean up?

    # ########################################################################
    # MISC EVENTS. These are called from other parts of the measr code.
    # ########################################################################

    def notif_ffmeas_event(self, event: FFMeasEvent):
        ''' Called from stem to tell us about FF_MEAS events.

        These events come from a different thread. We tell the main thread's
        loop (in a threadsafe manner) to handle this event in the similarly
        named function with a leading underscore.
        '''
        loop.call_soon_threadsafe(partial(self._notif_ffmeas_event, event))

    def _notif_ffmeas_event(self, event: FFMeasEvent):
        ''' Actually handle the FF_MEAS event.

        We look for:
        - per-second BW_REPORTs of the amount of measurement traffic sent and
        received, which we forward on to the coordinator.
        - an END message at the end signaling whether the measurement
        finished successfully, upon which we forget about the measurement.

        Any other type of FF_MEAS event is dropped with a warning.
        '''
        if event.ffmeas_type == 'BW_REPORT':
            log.debug(
                'Forwarding report of %d/%d sent/recv meas bytes',
                event.sent, event.recv)
            report = msg.BwReport(
                event.meas_id, time.time(), event.sent, event.recv)
            self.coord_trans.write(report.serialize())
            return
        elif event.ffmeas_type == 'END':
            known = event.meas_id in self.measurements
            log.info(
                'Tor client tells us meas %d finished %ssuccessfully%s',
                event.meas_id, '' if event.success else 'un',
                '. Cleaning up.' if known else
                ', but we don\'t know about it. Dropping.')
            if known:
                del self.measurements[event.meas_id]
            return
        log.warning(
            'Unexpected FF_MEAS event type %s. Dropping.', event.ffmeas_type)
        return

    def notif_circ_event(self, event: CircuitEvent):
        ''' Called from stem to tell us about circuit events.

        These events come from a different thread. We tell the main thread's
        loop (in a threadsafe manner) to handle this event in the similarly
        named function with a leading underscore.
        '''
        loop.call_soon_threadsafe(partial(self._notif_circ_event, event))

    def _notif_circ_event(self, event: CircuitEvent):
        ''' Actually handle the circuit event. We usually don't care, but
        sometimes we are waiting on circuits to be built with a relay.

        This runs in the main thread's loop unlike the similarly named function
        (without a leading underscore) that tells the loop to call us.
        '''
        circ_id = int(event.id)
        # We don't care about anything unless we're in the main state where we
        # do measurements
        if self.state != States.READY:
            return
        # Make sure it's a circuit we care about: find every measurement that
        # claims this circuit (should just be one, but handle all that claim
        # to care about this circuit, just in case)
        interested = [
            meas for meas in self.measurements.values()
            if circ_id in meas.circs]
        if not interested:
            return
        # Act based on the type of CIRC event
        if event.status == CircStatus.BUILT:
            if not any(circ_id in meas.waiting_circs for meas in interested):
                log.warning(
                    'CIRC BUILT event for circ %d we do care about but that '
                    'isn\'t waiting. Shouldn\'t be possible. %s. Ignoring.',
                    circ_id, event)
                return
            # Tell all interested Measurements that the circuit is built
            for meas in interested:
                meas.ready_circs.add(circ_id)
                log.debug(
                    'Circ %d added to meas %d\'s built circs. Now '
                    'have %d/%d', circ_id, meas.meas_id,
                    len(meas.ready_circs), len(meas.circs))
                # If all are built, then tell coord this measurement is ready
                if len(meas.ready_circs) < len(meas.circs):
                    continue
                log.info('Meas %d built all circs', meas.meas_id)
                self.coord_trans.write(msg.ConnectedToRelay(
                    meas.connect_msg).serialize())
            return
        elif event.status in (CircStatus.LAUNCHED, CircStatus.EXTENDED):
            # ignore these
            return
        elif event.status in (CircStatus.CLOSED, CircStatus.FAILED):
            # Tell all interested Measurements that the circuit has closed or
            # failed
            for meas in interested:
                meas.bad_circs.add(circ_id)
                log.info(
                    'Meas %d\'s circ %d is now closed/failed: %s',
                    meas.meas_id, circ_id, event)
            return
        # It's for us, but don't know how to handle it yet
        log.warning('Not handling CIRC event for us: %s', event)

550 

551 

class CoordConnRes(enum.Enum):
    ''' First item of the tuple returned by :meth:`_try_connect_to_coord`,
    indicating how the connection attempt went. '''
    #: Connected to the coord and completed the TLS handshake. All is well.
    SUCCESS = enum.auto()
    #: The attempt failed, but the cause may be temporary. Trying again later
    #: is reasonable.
    RETRY_ERROR = enum.auto()
    #: The attempt failed, and trying again is extremely unlikely to work.
    #: Give up.
    FATAL_ERROR = enum.auto()

563 

564 

async def _try_connect_to_coord(
    addr_port: Tuple[str, int],
    our_key: str,
    coord_cert: str,
) -> Tuple[
    CoordConnRes, Union[
        str, Tuple[asyncio.BaseTransport, asyncio.BaseProtocol]]]:
    ''' Try to connect to the coordinator at the given (host, port) tuple.
    Perform the TLS handshake using our client TLS key in the file `our_key`
    and only trusting the coord server cert in the file `coord_cert`.

    Returns a tuple in all cases. The first item indicates success with
    CoordConnRes. If it is an *_ERROR, then the second item is a string with
    more details. If it is SUCCESS, then the second item is the transport and
    protocol with the coordinator.

    A missing or unloadable key/cert file is a FATAL_ERROR. An OSError while
    connecting is a RETRY_ERROR.

    This function is a coroutine and all exceptions **should** be handled
    within this function's body. If they aren't, that's a programming error.
    To handle the case of unhandled exceptions, wrap this function in a
    Task/Future, and catch the generic Exception when fetching the result in
    the Task's done callback.

        def cb(fut):
            # handle the completion of the Task, whether successful or not
            try:
                result = fut.result()
            except Exception as e:
                log.error(
                    'An unhandled exception occurred. Tell your programmer: '
                    '%s', e)
                # Additional code to handle the error, as necessary
        task = loop.create_task(_try_connect_to_coord(...))
        task.add_done_callback(cb)
    '''
    if not os.path.isfile(our_key):
        return CoordConnRes.FATAL_ERROR, our_key + ' does not exist'
    if not os.path.isfile(coord_cert):
        return CoordConnRes.FATAL_ERROR, coord_cert + ' does not exist'
    # NOTE(review): a bare SSLContext() does not check hostnames by default
    # per the ssl docs; trust rests on the single coord cert loaded below.
    # Confirm this is intended.
    ssl_context = ssl.SSLContext()
    try:
        # Load our TLS private key and certificate
        ssl_context.load_cert_chain(our_key)
        # Load the certificate of the coord
        ssl_context.load_verify_locations(coord_cert)
    except OSError as e:
        # ssl.SSLError is a subclass of OSError, so this also covers files
        # that exist but are unreadable or malformed. The files are not
        # expected to fix themselves, so do not retry.
        return CoordConnRes.FATAL_ERROR, \
            'Unable to load TLS key/cert from %s and %s: %s' % (
                our_key, coord_cert, e)
    ssl_context.verify_mode = ssl.CERT_REQUIRED
    try:
        res = await loop.create_connection(
            CoordProtocol,
            addr_port[0],
            addr_port[1],
            ssl=ssl_context,
        )
    except OSError as e:
        return CoordConnRes.RETRY_ERROR, str(e)
    return CoordConnRes.SUCCESS, res

618 

619 

def _exception_handler(loop, context):
    ''' Exception handler installed on the event loop. Log everything
    useful from the given context, then move the state machine to its fatal
    error state.

    :param loop: the event loop that caught the problem. Unused.
    :param context: dict describing the problem. 'message' is always read;
        'exception', 'handle', and 'source_traceback' are logged if present.
    '''
    log.error('%s', context['message'])
    for key in ('exception', 'handle'):
        if key in context:
            log.error(context[key])
    if 'source_traceback' not in context:
        log.error('Traceback not available. Run with PYTHONASYNCIODEBUG=1')
    else:
        log.error('Traceback:')
        formatted = StackSummary.from_list(
            context['source_traceback']).format()
        for chunk in formatted:
            # Each formatted chunk may hold several lines. Log them one at a
            # time, skipping empty ones.
            for line in chunk.split('\n'):
                if line:
                    log.error(' %s', line)
    machine.change_state_fatal_error()

637 

638 

# # Not sure if this would actually work here. Maybe add to the logging config
# # file?
# # https://docs.python.org/3.6/library/asyncio-dev.html#logging
# logging.getLogger('asyncio').setLevel(logging.WARNING)
# Logger used by everything in this module
log = logging.getLogger(__name__)
# Event loop used by everything in this module. It is obtained here at import
# time and is run (and eventually closed) by main().
loop = asyncio.get_event_loop()
# The one StateMachine instance. Only declared here; main() creates and
# assigns it, so it must not be used before main() has run.
machine: StateMachine

646 

647 

def gen_parser(sub) -> ArgumentParser:
    ''' Register the ``measurer`` command with the given subparsers object
    and return the new parser. There are no options specific to this command.

    :param sub: object with an ``add_parser()`` method, as returned by
        ``ArgumentParser.add_subparsers()``
    '''
    return sub.add_parser(
        'measurer', description='Run as a FlashFlow measurer.')

653 

654 

# mypy only checks the body of a function that has **some sort** of type
# annotation. Annotating just the return value (e.g. '-> None') is enough.
def main(args, conf) -> None:
    ''' Entry point for the measurer command. Make sure our directories
    exist, create the state machine, and run the event loop until it is
    stopped.

    :param args: parsed command line arguments. Unused here.
    :param conf: configuration object, read with ``getpath()`` and handed to
        the state machine
    '''
    global machine
    # Directories are created with owner-only permissions if missing
    for dir_key in ('datadir', 'keydir'):
        os.makedirs(
            conf.getpath('measurer', dir_key), mode=0o700, exist_ok=True)
    machine = StateMachine(conf)
    loop.set_exception_handler(_exception_handler)
    # The first state transition happens once the loop is running
    loop.call_soon(machine.change_state_starting)
    try:
        loop.run_forever()
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()