Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from argparse import ArgumentParser
2from functools import partial
3from traceback import StackSummary
4import asyncio
5import enum
6import logging
7import ssl
8import time
9import os
10from stem import CircStatus # type: ignore
11from stem.control import Controller, EventType # type: ignore
12from stem.response.events import CircuitEvent, FFMeasEvent # type: ignore
13from transitions import Machine # type: ignore
14from typing import Tuple, Union, Set, Dict
15from .. import tor_client
16from .. import msg
17from ..tor_ctrl_msg import MeasrStartMeas
20class CoordProtocol(asyncio.Protocol):
21 transport = None
23 def connection_made(self, transport):
24 log.debug('Connected to coord')
25 self.transport = transport
27 def connection_lost(self, exc):
28 machine.change_state_nonfatal_error('Lost connection with coord')
29 pass
31 def data_received(self, data: bytes):
32 ''' Receive data from the coordinator. Parse it into a FFMsg and tell
33 other code about the message.
35 It's possible that this is called before the entire message is
36 received. In that case, we'll need to edit this function to buffer
37 bytes until the entire message has arrived. '''
38 log.info('Received %d bytes: %s', len(data), data)
39 m = msg.FFMsg.deserialize(data)
40 machine.notif_coord_msg(m)
43class Measurement:
44 ''' State related to a single measurement. '''
45 #: keep a copy of :class:`flashflow.msg.ConnectToRelay` command so we can
46 #: send it back to the coord when we're ready to go (or have failed)
47 connect_msg: msg.ConnectToRelay
48 #: Our circuit ids with the relay. Filled in once we know what they are
49 #: (they're launched) but not yet bullt
50 circs: Set[int]
51 #: Our built circuit ids with the relay. Filled in as we learn of launched
52 #: circuits becoming built.
53 ready_circs: Set[int]
54 #: Our circuit ids that we've been told have CLOSED or FAILED at any point
55 bad_circs: Set[int]
57 def __init__(self, connect_msg: msg.ConnectToRelay):
58 self.connect_msg = connect_msg
59 self.circs = set()
60 self.ready_circs = set()
61 self.bad_circs = set()
63 @property
64 def meas_id(self) -> int:
65 ''' The measurement ID '''
66 return self.connect_msg.meas_id
68 @property
69 def relay_fp(self) -> str:
70 ''' The fingerprint of the relay to measure '''
71 return self.connect_msg.fp
73 @property
74 def meas_duration(self) -> int:
75 ''' The duration, in seconds, that active measurement should last. '''
76 return self.connect_msg.dur
78 @property
79 def waiting_circs(self) -> Set[int]:
80 ''' Circs that we have LAUNCHED but have not yet added to ready_circs
81 because we haven't seen BUILT yet.
83 Note that as far as this function is concerned, there's no such thing
84 as a circuit becoming un-BUILT. This functiion doesn't know anything
85 about circuits closing. Other code needs to manipulate circs and
86 ready_circs as it deems fit.
87 '''
88 return self.circs - self.ready_circs
91class States(enum.Enum):
92 ''' States that we, as a FlashFlow measurer, can be in. '''
93 #: State in which we are created and to which we return when there's a
94 #: non-fatal error
95 START = enum.auto()
96 #: First "real" state. Launch a tor client and connect to it.
97 ENSURE_CONN_W_TOR = enum.auto()
98 #: Second real state. Connect to the coordinator.
99 ENSURE_CONN_W_COORD = enum.auto()
100 #: Normal state. We're doing measurements or waiting to be told to do them.
101 #: We are usually here.
102 READY = enum.auto()
103 #: There was some sort of error that calls for cleaning everything up and
104 #: essentially relaunching, but we shouldn't outright die.
105 NONFATAL_ERROR = enum.auto()
106 #: There is a serious error that isn't recoverable. Just cleanup and die.
107 FATAL_ERROR = enum.auto()
110class StateMachine(Machine):
111 ''' State machine and main control flow hub for FlashFlow measurer.
113 change_state_*:
114 State transitions are named change_state_* and don't exist here in the
115 code. The Machine class takes care of making them based on the triggers
116 in the list of possible transitions. For example: change_state_starting
117 is named as the trigger for transitions from either START or
118 NONFATAL_ERROR into ENSURE_CONN_W_TOR.
120 on_enter_*:
121 This is how the Machine class finds functions to call upon entering the
122 given state. For example, on_enter_NONFATAL_ERROR() is called when we
123 are transitioning to the NONFATAL_ERROR state. These functions should
124 be kept short. Significant work/logic should be done in other functions
125 that these call or schedule for calling later.
127 _*:
128 Other internal functions. See their documentation for more information
129 on them.
130 '''
131 # conf # This is set in __init__
132 tor_client: Controller
133 # how we communicate with the coord
134 coord_trans: asyncio.WriteTransport
135 coord_proto: CoordProtocol
136 measurements: Dict[int, Measurement]
138 def __init__(self, conf):
139 self.conf = conf
140 self.measurements = {}
141 super().__init__(
142 model=self,
143 states=States,
144 transitions=[
145 {
146 'trigger': 'change_state_starting',
147 'source': [States.START, States.NONFATAL_ERROR],
148 'dest': States.ENSURE_CONN_W_TOR,
149 },
150 {
151 'trigger': 'change_state_connected_to_tor',
152 'source': States.ENSURE_CONN_W_TOR,
153 'dest': States.ENSURE_CONN_W_COORD,
154 },
155 {
156 'trigger': 'change_state_connected_to_coord',
157 'source': States.ENSURE_CONN_W_COORD,
158 'dest': States.READY,
159 },
160 {
161 'trigger': 'change_state_nonfatal_error',
162 'source': '*',
163 'dest': States.NONFATAL_ERROR,
164 },
165 {
166 'trigger': 'change_state_fatal_error',
167 'source': '*',
168 'dest': States.FATAL_ERROR,
169 },
170 ],
171 initial=States.START,
172 # Do not create .to_<state>() methods, which allow transition to
173 # <state> regardless of current state
174 auto_transitions=False,
175 )
177 def _ensure_conn_w_tor(self):
178 ''' Main function in the ENSURE_CONN_W_TOR state. Launch a tor client
179 and connect to it. Save the Controller object. '''
180 assert self.state == States.ENSURE_CONN_W_TOR
181 # TODO: what happens if tor client disappears? Exception thrown? What??
182 # And what should we do about it? Try to relaunch? Just die? Choose
183 # **something**
184 c = tor_client.launch(
185 self.conf.getpath('tor', 'tor_bin'),
186 self.conf.getpath('measurer', 'tor_datadir'),
187 self.conf.get('tor', 'torrc_extra_lines')
188 )
189 if not c:
190 log.error('Unable to launch and connect to tor client')
191 self.change_state_fatal_error()
192 return
193 c.add_event_listener(self.notif_circ_event, EventType.CIRC)
194 c.add_event_listener(self.notif_ffmeas_event, EventType.FF_MEAS)
195 self.tor_client = c
196 self.change_state_connected_to_tor()
198 def _ensure_conn_w_coord(self, delay: float):
199 ''' Main function in the ENSURE_CONN_W_COORD state. Repeatedly try
200 connecting to the coordinator until we are successful or have a fatal
201 error warranting completely giving up on life.
203 This function uses asynchronous python: the connection is represented
204 by a transport and protocol, and we try connecting asynchronously and
205 use a callback to find out the result. That said, the work done here
206 should probably be the only thing going on.
207 '''
208 assert self.state == States.ENSURE_CONN_W_COORD
209 # TODO: what if connection goes away?
210 # Get the (host, port) from "host:port"
211 coord_addr_port = self.conf.getaddr('measurer', 'coord_addr')
212 if coord_addr_port is None:
213 log.error('Don\'t know where coord is')
214 self.change_state_fatal_error()
215 return
217 # Callback to get the result of one connection attempt. If it didn't
218 # work and it wasn't fatal, schedule calling this function again some
219 # time in the future. If fatal, die. If successful, save the transport
220 # and protocol and move on!
221 def cb(fut):
222 nonlocal delay
223 # It's possible that the programmer didn't catch all exceptions.
224 # If the result is an exception, this *should* bubble up to the
225 # default exception handler, _exception_handler(...).
226 success_code, stuff_or_error = fut.result()
227 # Now check if we were successful, fatally unable to connect, or if
228 # we should retry.
229 if success_code == CoordConnRes.FATAL_ERROR:
230 log.error(
231 'Fatal error connecting to coordinator: %s',
232 stuff_or_error)
233 self.change_state_fatal_error()
234 return
235 elif success_code == CoordConnRes.RETRY_ERROR:
236 delay = min(2 * delay, 60)
237 log.warn(
238 'Unable to connect to coordinator: %s. Retrying in %.2fs.',
239 stuff_or_error, delay)
240 loop.call_later(
241 delay, partial(self._ensure_conn_w_coord, delay))
242 return
243 assert success_code == CoordConnRes.SUCCESS
244 assert not isinstance(stuff_or_error, str)
245 self.coord_trans, self.coord_proto = stuff_or_error
246 self.change_state_connected_to_coord()
247 # Kick off the asyncronous attempt to connect and attach the above
248 # callback so we can get the result.
249 task = asyncio.Task(_try_connect_to_coord(
250 coord_addr_port,
251 self.conf.getpath('measurer', 'key'),
252 self.conf.getpath('measurer', 'coord_cert'),
253 ))
254 task.add_done_callback(cb)
255 # This is asynchronous python. We end immediately and the callback will
256 # eventually be called with the connection results. Nothing left to do
257 # for now.
259 def _complete_cleanup(self):
260 ''' Cleanup all of our state while being very careful to not allow any
261 exceptions to bubble up. Use this when in an error state and you want
262 to cleanup before starting over or just dying. '''
263 if hasattr(self, 'tor_client') and self.tor_client:
264 log.info('cleanup: closing tor')
265 try:
266 self.tor_client.close()
267 except Exception as e:
268 log.error('Error closing tor: %s', e)
269 if hasattr(self, 'coord_trans') and self.coord_trans:
270 log.info('cleanup: closing coord transport')
271 try:
272 self.coord_trans.close()
273 except Exception as e:
274 log.error('Error closing transport with coord: %s', e)
275 if hasattr(self, 'coord_proto') and self.coord_proto:
276 # nothing to do
277 pass
278 if hasattr(self, 'measurements') and self.measurements:
279 log.info(
280 'cleanup: forgetting about %d measurements',
281 len(self.measurements))
282 self.measurements = {}
284 def _die(self):
285 ''' End execution of the program. '''
286 loop.stop()
288 # ########################################################################
289 # STATE CHANGE EVENTS. These are called when entering the specified state.
290 # ########################################################################
292 def on_enter_READY(self):
293 pass
295 def on_enter_ENSURE_CONN_W_TOR(self):
296 loop.call_soon(self._ensure_conn_w_tor)
298 def on_enter_ENSURE_CONN_W_COORD(self):
299 loop.call_soon(partial(self._ensure_conn_w_coord, 0.5))
301 def on_enter_NONFATAL_ERROR(self, err_msg: str):
302 log.error('nonfatal error: %s', err_msg)
303 loop.call_soon(self._complete_cleanup)
304 loop.call_soon(self.change_state_starting)
306 def on_enter_FATAL_ERROR(self):
307 # log.error('We encountered a fatal error :(')
308 self._complete_cleanup()
309 self._die()
311 # ########################################################################
312 # MESSAGES FROM COORD. These are called when the coordinator tells us
313 # something.
314 # ########################################################################
316 def notif_coord_msg(self, message: msg.FFMsg):
317 msg_type = type(message)
318 if self.state != States.READY:
319 log.warn(
320 'Coord sent us message but we are not ready. Dropping. %s',
321 message)
322 return
323 # The asserts below are for shutting up mypy
324 if msg_type == msg.ConnectToRelay:
325 assert isinstance(message, msg.ConnectToRelay)
326 return self._notif_coord_msg_ConnectToRelay(message)
327 elif msg_type == msg.Failure:
328 assert isinstance(message, msg.Failure)
329 return self._notif_coord_msg_Failure(message)
330 elif msg_type == msg.Go:
331 assert isinstance(message, msg.Go)
332 return self._notif_coord_msg_Go(message)
333 log.warn(
334 'Unexpected/unhandled %s message. Dropping. %s',
335 msg_type, message)
337 def _notif_coord_msg_ConnectToRelay(self, message: msg.ConnectToRelay):
338 # caller should have verified and logged about this already
339 assert self.state == States.READY
340 meas_id = message.meas_id
341 if meas_id in self.measurements:
342 fail_msg = msg.Failure(msg.FailCode.M_DUPE_MEAS_ID, meas_id)
343 log.error(fail_msg)
344 self.coord_trans.write(fail_msg.serialize())
345 return
346 meas = Measurement(message)
347 ret = tor_client.send_msg(
348 self.tor_client,
349 MeasrStartMeas(
350 meas.meas_id, meas.relay_fp, message.n_circs,
351 meas.meas_duration))
352 # Make sure the circuit launches went well. Note they aren't built yet.
353 # It's just that tor found nothing obviously wrong with trying to build
354 # these circuits.
355 if not ret.is_ok():
356 fail_msg = msg.Failure(
357 msg.FailCode.LAUNCH_CIRCS, meas_id,
358 extra_info=str(ret))
359 log.error(fail_msg)
360 self.coord_trans.write(fail_msg.serialize())
361 return
362 # We expect to see "250 FF_MEAS 0 LAUNCHED CIRCS=1,2,3,4,5", where the
363 # 0 is the measurement ID we told the tor client, and the actual list
364 # of launched circuits is CIRCS the comma-separated list
365 code, _, content = ret.content()[0]
366 # Already checked this above with ret.is_ok()
367 assert code == '250'
368 parts = content.split()
369 if len(parts) != 4 or \
370 not parts[0] == 'FF_MEAS' or \
371 not parts[2] == 'LAUNCHED' or \
372 not parts[3].startswith('CIRCS='):
373 fail_msg = msg.Failure(
374 msg.FailCode.MALFORMED_TOR_RESP, meas_id,
375 extra_info=str(ret))
376 log.error(fail_msg)
377 self.coord_trans.write(fail_msg.serialize())
378 return
379 meas.circs.update({
380 int(circ_id_str) for circ_id_str in
381 parts[3].split('=')[1].split(',')
382 })
383 log.info(
384 'Launched %d circuits with relay %s: %s', len(meas.circs),
385 meas.relay_fp, meas.circs)
386 self.measurements[meas_id] = meas
387 # That's all for now. We stay in this state until Tor tells us it has
388 # finished building all circuits
390 def _notif_coord_msg_Go(self, go_msg: msg.Go):
391 # caller should have verified and logged about this already
392 assert self.state == States.READY
393 meas_id = go_msg.meas_id
394 if meas_id not in self.measurements:
395 fail_msg = msg.Failure(msg.FailCode.M_UNKNOWN_MEAS_ID, meas_id)
396 log.error(fail_msg)
397 self.coord_trans.write(fail_msg.serialize())
398 # TODO: cleanup Measurement
399 return
400 meas = self.measurements[meas_id]
401 start_msg = MeasrStartMeas(
402 meas.meas_id, meas.relay_fp, len(meas.ready_circs),
403 meas.meas_duration)
404 ret = tor_client.send_msg(self.tor_client, start_msg)
405 if not ret.is_ok():
406 fail_msg = msg.Failure(msg.FailCode.M_START_ACTIVE_MEAS, meas_id)
407 log.error(fail_msg)
408 self.coord_trans.write(fail_msg.serialize())
409 # TODO: cleanup Measurement
410 return
412 def _notif_coord_msg_Failure(self, fail_msg: msg.Failure):
413 # caller should have verified and logged about this already
414 assert self.state == States.READY
415 meas_id = fail_msg.meas_id
416 log.warn(
417 'Received FailCode %s regarding meas id %s',
418 fail_msg.code.name, meas_id)
419 if meas_id is None:
420 return
421 if meas_id not in self.measurements:
422 log.info(
423 'Unknown measurement %d in Failure message, so nothing to do',
424 meas_id)
425 return
426 # TODO: uhh ... close circuits or something? Other clean up?
428 # ########################################################################
429 # MISC EVENTS. These are called from other parts of the measr code.
430 # ########################################################################
432 def notif_ffmeas_event(self, event: FFMeasEvent):
433 ''' Called from stem to tell us about FF_MEAS events.
435 These events come from a different thread. We tell the main thread's
436 loop (in a threadsafe manner) to handle this event in the similarly
437 named function with a leading underscore.
438 '''
439 loop.call_soon_threadsafe(partial(self._notif_ffmeas_event, event))
441 def _notif_ffmeas_event(self, event: FFMeasEvent):
442 ''' Actually handle the FF_MEAS event.
444 We look for:
445 - per-second BW_REPORTs of the amount of measurement traffic sent and
446 received, and we will fowarded those on to the coordinator.
447 - a END message at the end signally success.
448 '''
449 if event.ffmeas_type == 'BW_REPORT':
450 log.debug(
451 'Forwarding report of %d/%d sent/recv meas bytes',
452 event.sent, event.recv)
453 report = msg.BwReport(
454 event.meas_id, time.time(), event.sent, event.recv)
455 self.coord_trans.write(report.serialize())
456 return
457 elif event.ffmeas_type == 'END':
458 log.info(
459 'Tor client tells us meas %d finished %ssuccessfully%s',
460 event.meas_id, '' if event.success else 'un',
461 '. Cleaning up.' if event.meas_id in self.measurements else
462 ', but we don\'t know about it. Dropping.')
463 if event.meas_id not in self.measurements:
464 return
465 del self.measurements[event.meas_id]
466 return
467 log.warn(
468 'Unexpected FF_MEAS event type %s. Dropping.', event.ffmeas_type)
469 return
471 def notif_circ_event(self, event: CircuitEvent):
472 ''' Called from stem to tell us about circuit events.
474 These events come from a different thread. We tell the main thread's
475 loop (in a threadsafe manner) to handle this event in the similarly
476 named function with a leading underscore.
477 '''
478 loop.call_soon_threadsafe(partial(self._notif_circ_event, event))
480 def _notif_circ_event(self, event: CircuitEvent):
481 ''' Actually handle the circuit event. We usually don't care, but
482 sometimes we are waiting on circuits to be built with a relay.
484 This runs in the main thread's loop unlike the similarly named function
485 (without a leading underscore) that tells the loop to call us.
486 '''
487 circ_id = int(event.id)
488 # We don't care about anything unless we're in the main state where we
489 # do measurements
490 if self.state != States.READY:
491 return
492 # Make sure it's a circuit we care about
493 all_circs: Set[int] = set.union(
494 # in case there's no measurements, add empty set to avoid errors
495 set(),
496 *[meas.circs for meas in self.measurements.values()])
497 waiting_circs: Set[int] = set.union(
498 # in case there's no measurements, add empty set to avoid errors
499 set(),
500 *[meas.waiting_circs for meas in self.measurements.values()])
501 if circ_id not in all_circs:
502 # log.warn(
503 # 'Ignoring CIRC event not for us. %d not in any '
504 # 'measurement\'s set of all circuits',
505 # circ_id)
506 return
507 # Act based on the type of CIRC event
508 if event.status == CircStatus.BUILT:
509 if circ_id not in waiting_circs:
510 log.warn(
511 'CIRC BUILT event for circ %d we do care about but that '
512 'isn\'t waiting. Shouldn\'t be possible. %s. Ignoring.',
513 circ_id, event)
514 return
515 # Tell all interested Measurements (should just be one, but do all
516 # that claim to care about this circuit, just in case) that the
517 # circuit is built
518 for meas in self.measurements.values():
519 if circ_id not in meas.circs:
520 continue
521 meas.ready_circs.add(circ_id)
522 log.debug(
523 'Circ %d added to meas %d\'s built circs. Now '
524 'have %d/%d', circ_id, meas.meas_id,
525 len(meas.ready_circs), len(meas.circs))
526 # If all are built, then tell coord this measurement is ready
527 if len(meas.ready_circs) < len(meas.circs):
528 continue
529 log.info('Meas %d built all circs', meas.meas_id)
530 self.coord_trans.write(msg.ConnectedToRelay(
531 meas.connect_msg).serialize())
532 return
533 elif event.status in [CircStatus.LAUNCHED, CircStatus.EXTENDED]:
534 # ignore these
535 return
536 elif event.status in [CircStatus.CLOSED, CircStatus.FAILED]:
537 # Tell all interested Measurements (should just be one, but do all
538 # that claim to care about this circuit, just in case) that the
539 # circuit has closed or failed
540 for meas in self.measurements.values():
541 if circ_id not in meas.circs:
542 continue
543 meas.bad_circs.add(circ_id)
544 log.info(
545 'Meas %d\'s circ %d is now closed/failed: %s',
546 meas.meas_id, circ_id, event)
547 return
548 # It's for us, but don't know how to handle it yet
549 log.warn('Not handling CIRC event for us: %s', event)
552class CoordConnRes(enum.Enum):
553 ''' Part of the return value of :meth:`_try_connect_to_coord`. '''
554 #: We successfully connected to the coord, shook our TLS hands, and all is
555 #: well.
556 SUCCESS = enum.auto()
557 #: We were not successful, but whatever happened may be temporary and it's
558 #: logical to try connecting again in the future.
559 RETRY_ERROR = enum.auto()
560 #: We were not successful, and trying again in the future is extremely
561 #: unlikely to be successful. We should give up.
562 FATAL_ERROR = enum.auto()
565async def _try_connect_to_coord(
566 addr_port: Tuple[str, int],
567 our_key: str,
568 coord_cert: str,
569) -> Tuple[
570 CoordConnRes, Union[
571 str, Tuple[asyncio.BaseTransport, asyncio.BaseProtocol]]]:
572 ''' Try to connect to the coordinator at the given (host, port) tuple.
573 Perform the TLS handshake using our client TLS key in the file `our_key`
574 and only trusting the coord server cert in the file `coord_cert`.
576 Returns a tuple in all cases. The first item indicates success with
577 CoordConnRes. If it is an *_ERROR, then the second item is a string with
578 more details. If it is SUCCESS, then the second item is the transport and
579 protocol with the coordinator.
581 This function is a coroutine and all exceptions **should** be handled
582 within this function's body. If they aren't, that's a programming error.
583 To handle the case of unhandled exceptions, wrap this function in a
584 Task/Future, then catch and handle the generic Exception.
586 def cb(fut):
587 # handle the completion of the Task, whether successful or not
588 pass
589 task = asyncio.Task(_try_connect_to_coord(...))
590 task.add_done_callback(cb)
591 try:
592 result = task.result()
593 except Exception as e:
594 log.error(
595 'An unhandled exception occurred. Tell your programmer: %s', e)
596 # Additional code to handle the error, as necessary
597 '''
598 if not os.path.isfile(our_key):
599 return CoordConnRes.FATAL_ERROR, our_key + ' does not exist'
600 if not os.path.isfile(coord_cert):
601 return CoordConnRes.FATAL_ERROR, coord_cert + ' does not exist'
602 ssl_context = ssl.SSLContext()
603 # Load our TLS private key and certificate
604 ssl_context.load_cert_chain(our_key)
605 # Load the certificate of the coord
606 ssl_context.load_verify_locations(coord_cert)
607 ssl_context.verify_mode = ssl.CERT_REQUIRED
608 try:
609 res = await loop.create_connection(
610 CoordProtocol,
611 addr_port[0],
612 addr_port[1],
613 ssl=ssl_context,
614 )
615 except OSError as e:
616 return CoordConnRes.RETRY_ERROR, str(e)
617 return CoordConnRes.SUCCESS, res
620def _exception_handler(loop, context):
621 log.error('%s', context['message'])
622 if 'exception' in context:
623 log.error(context['exception'])
624 if 'handle' in context:
625 log.error(context['handle'])
626 if 'source_traceback' in context:
627 log.error('Traceback:')
628 summary = StackSummary.from_list(context['source_traceback'])
629 for line_super in summary.format():
630 # The above line has multiple lines in it
631 for line in line_super.split('\n'):
632 if len(line):
633 log.error(' %s', line)
634 else:
635 log.error('Traceback not available. Run with PYTHONASYNCIODEBUG=1')
636 machine.change_state_fatal_error()
639# # Not sure if this would actually work here. Maybe add to the logging config
640# # file?
641# # https://docs.python.org/3.6/library/asyncio-dev.html#logging
642# logging.getLogger('asyncio').setLevel(logging.WARNING)
643log = logging.getLogger(__name__)
644loop = asyncio.get_event_loop()
645machine: StateMachine
648def gen_parser(sub) -> ArgumentParser:
649 ''' Add the cmd line options for this FlashFlow command '''
650 d = 'Run as a FlashFlow measurer.'
651 p = sub.add_parser('measurer', description=d)
652 return p
655# This function needs **some sort** of type annotation so that mypy will check
656# the things it does. Adding the return value (e.g. '-> None') is enough
657def main(args, conf) -> None:
658 global machine
659 os.makedirs(conf.getpath('measurer', 'datadir'), mode=0o700, exist_ok=True)
660 os.makedirs(conf.getpath('measurer', 'keydir'), mode=0o700, exist_ok=True)
661 machine = StateMachine(conf)
662 loop.set_exception_handler(_exception_handler)
663 loop.call_soon(machine.change_state_starting)
664 try:
665 loop.run_forever()
666 finally:
667 loop.run_until_complete(loop.shutdown_asyncgens())
668 loop.close()
669 return