flashflow.cmd.coord package

Submodules

flashflow.cmd.coord.coord module

FlashFlow coordinator

The FlashFlow coordinator is the brain of a FlashFlow team. It controls one or more measurers, telling them when and how to perform bandwidth measurements of relays.

Code organization

The coordinator first opens its listening sockets for measurers and for the FlashFlow controller. Then it launches a Tor client and connects and authenticates to it. These things are done in main().

At this point, events drive the execution of the program. The following are the message/event types:

  • begin of slot/begin of period events,

  • FF ctrl messages,

  • CIRC Tor control messages,

  • FF_MEAS Tor control messages, and

  • FF measurer messages.

The remainder of this section explains the organization of the code and control flow by walking the reader through the process of a measurement taking place.

Begin of slot/period events

Main function(s):

The measurement schedule divides time into periods (e.g. days) and periods into slots (e.g. minutes). We schedule a call to begin_of_slot_cb() at the beginning of every slot with schedule_next_begin_of_slot(). If it’s also the beginning of a new period, begin_of_new_period() is called as well.

If there are measurements to do, begin_of_slot_cb() sends a FF ctrl message for each to start them.
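The slot arithmetic behind this scheduling can be sketched as follows. This is a minimal illustration and not FlashFlow's actual code: the helper names seconds_until_next_slot() and slot_index(), the alignment of slots to the Unix epoch, and the example slot and period lengths are all assumptions.

```python
def seconds_until_next_slot(now: float, slot_len: int) -> float:
    """Return how long to wait until the next slot boundary.

    Assumes (for this sketch only) that slots are aligned to multiples
    of slot_len seconds since the Unix epoch.
    """
    return slot_len - (now % slot_len)


def slot_index(now: float, slot_len: int, period_len: int) -> int:
    """Return the index, within its period, of the slot containing now."""
    return int(now % period_len) // slot_len


# Hypothetical values: 60-second slots in a one-day period.
SLOT_LEN = 60
PERIOD_LEN = 24 * 60 * 60
```

With helpers like these, a callback could be scheduled with asyncio's loop.call_later(), and a slot index of 0 would indicate that a new period has begun.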

FlashFlow control messages

Main function(s):

notif_ctrl_message() handles incoming FlashFlow control messages. These are simple strings that come either from a FlashFlow control process (not to be confused with a Tor controller or Tor control port) or from within the coordinator itself. The most notable message is measure <fingerprint>, which instructs us to start measuring the relay with the given fingerprint. A Measurement is constructed and connect_to_relay() is called to start the measurement by telling our Tor client to construct a one-hop circuit to the relay for measurement purposes. At that point we stop doing things related to this measurement until our Tor client reports its progress via CIRC and FF_MEAS Tor control messages.
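As a rough illustration of handling a measure <fingerprint> string, the sketch below validates a command and returns a (bool, str) pair. The function name parse_ctrl_message() is hypothetical, and the sketch assumes the argument is always a 40-character hexadecimal relay fingerprint; the real handler may accept other forms and returns an empty string on success rather than the fingerprint.

```python
from typing import Tuple


def parse_ctrl_message(msg: str) -> Tuple[bool, str]:
    """Validate a control message; return (ok, fingerprint_or_error)."""
    words = msg.split()
    if not words:
        return False, 'Empty command'
    if words[0] != 'measure':
        return False, 'Unknown command: ' + words[0]
    if len(words) != 2:
        return False, 'Usage: measure <fingerprint>'
    fp = words[1].upper()
    # A relay fingerprint is 40 hexadecimal characters.
    if len(fp) != 40 or any(c not in '0123456789ABCDEF' for c in fp):
        return False, 'Malformed fingerprint'
    return True, fp
```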

CIRC Tor control messages

Main function(s):

These come from stem in _notif_circ_event(), but in a different thread. Thus we have to send the event to the main event loop thread in notif_circ_event().
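The hand-off from a library thread to the main event loop thread can be sketched with asyncio's loop.call_soon_threadsafe(). This is an assumption about the general technique, not a copy of the coordinator's code; a plain threading.Thread stands in for stem's listener thread, and the function names inside demo() are illustrative.

```python
import asyncio
import threading


async def demo() -> str:
    loop = asyncio.get_running_loop()
    done = loop.create_future()

    def notif_event(event: str) -> None:
        # Runs in the event loop's thread: safe to touch loop state here.
        done.set_result(event)

    def _notif_event(event: str) -> None:
        # Runs in the other thread: only forwards the event to the loop.
        loop.call_soon_threadsafe(notif_event, event)

    # Stand-in for the library's thread delivering one event.
    thread = threading.Thread(target=_notif_event, args=('CIRC BUILT',))
    thread.start()
    result = await done
    thread.join()
    return result


result = asyncio.run(demo())
```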

From there, control goes to a different function based on the status of the circuit (e.g. BUILT or LAUNCHED or CLOSED). But ultimately, the coordinator doesn’t really care: its Tor client knows to send measurement parameters to the relay after the circuit is BUILT, so all we do is debug-log when the circuit is BUILT.

The message we’re ultimately waiting for is a FF_MEAS event.
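The status-based dispatch described in this section could look roughly like the sketch below. The handler names and their return values are hypothetical; only the idea of choosing a function by circuit status comes from the text.

```python
from typing import Callable, Dict, Optional


def on_built(circ_id: int) -> str:
    # In the coordinator this would only be a debug log line.
    return 'circuit %d is built' % circ_id


def on_closed(circ_id: int) -> str:
    return 'circuit %d is closed' % circ_id


def on_failed(circ_id: int) -> str:
    return 'circuit %d failed' % circ_id


HANDLERS: Dict[str, Callable[[int], str]] = {
    'BUILT': on_built,
    'CLOSED': on_closed,
    'FAILED': on_failed,
}


def dispatch(status: str, circ_id: int) -> Optional[str]:
    """Call the handler for this status, ignoring statuses without one."""
    handler = HANDLERS.get(status)
    if handler is None:
        return None
    return handler(circ_id)
```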

FF_MEAS Tor control messages

Main function(s):

These come from stem in _notif_ffmeas_event(), but in a different thread. Thus we have to send the event to the main event loop thread in notif_ffmeas_event().

In the running example of a measurement taking place, we’re waiting for a PARAMS_OK FF_MEAS message. Upon receiving it we hand control off to a sub-function that determines if the relay agrees to be measured, and if so, sends messages to the appropriate measurers instructing them to connect to the relay.

FlashFlow measurer messages

Main function(s):

We’re waiting for measurers to respond saying they’ve successfully connected to the relay. Once _notif_measurer_msg_ConnectedToRelay() has seen the ConnectedToRelay from each measurer, it tells our tor client that active measurement is starting and tells each measurer to start active measurement.
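Waiting until every measurer has reported in can be sketched with two sets. The ReadyTracker class below is hypothetical and uses strings in place of MeasrProtocol objects; it only illustrates the "start once all have connected" condition.

```python
from typing import Iterable, Set


class ReadyTracker:
    """Track which of the expected measurers have connected to the relay."""

    def __init__(self, measurers: Iterable[str]) -> None:
        self.measurers: Set[str] = set(measurers)
        self.ready: Set[str] = set()

    def mark_connected(self, measr: str) -> bool:
        """Record one measurer; return True once all of them are ready."""
        if measr not in self.measurers:
            # Ignore measurers that are not part of this measurement.
            return False
        self.ready.add(measr)
        return self.ready == self.measurers
```

When mark_connected() returns True, the caller would tell the Tor client and each measurer to begin active measurement.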

Per-second messages

During a measurement, we expect bandwidth reports each second from our tor client and from each measurer. The former is a FF_MEAS Tor control message with type BW_REPORT handled in notif_ffmeas_event_BW_REPORT(), and the latter is a flashflow.msg.BwReport FlashFlow measurer message.

class flashflow.cmd.coord.coord.CtrlProtocol

Bases: asyncio.protocols.Protocol

Development/debugging control communication protocol

connection_lost(exc)

Called when the connection is lost or closed.

The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).

connection_made(transport)

Called when a connection is made.

The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.

data_received(data)

Called when some data is received.

The argument is a bytes object.

transport: Optional[asyncio.transports.Transport] = None

class flashflow.cmd.coord.coord.MeasrProtocol

Bases: asyncio.protocols.Protocol

How we communicate with measurers.

Very little should be done here. Parse the bytes then give objects to the rest of coord code to handle.
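A protocol that only parses bytes and hands objects to other code might be sketched as below. The wire format here (one JSON object per line) and the class name LineJsonProtocol are assumptions made for illustration; the actual FlashFlow message encoding is not described in this section.

```python
import asyncio
import json
from typing import Any, Callable, Optional


class LineJsonProtocol(asyncio.Protocol):
    """Parse newline-delimited JSON and pass each object to a callback."""

    def __init__(self, on_message: Callable[[Any], None]) -> None:
        self.on_message = on_message
        self.transport: Optional[asyncio.BaseTransport] = None
        self._buf = b''

    def connection_made(self, transport: asyncio.BaseTransport) -> None:
        self.transport = transport

    def data_received(self, data: bytes) -> None:
        # Data may arrive in arbitrary chunks, so buffer until a full line.
        self._buf += data
        while b'\n' in self._buf:
            line, self._buf = self._buf.split(b'\n', 1)
            if line:
                self.on_message(json.loads(line))

    def connection_lost(self, exc: Optional[Exception]) -> None:
        self.transport = None
```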

connection_lost(exc)

Called when the connection is lost or closed.

The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).

connection_made(transport)

Called when a connection is made.

The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.

data_received(data)

Called when some data is received.

The argument is a bytes object.

transport: Optional[asyncio.transports.Transport] = None

class flashflow.cmd.coord.coord.Measurement(meas_id, relay_fp, measurers, meas_duration, bg_percent)

Bases: object

State related to a single measurement.

bg_percent: float = None

The percent of background traffic, as a fraction between 0 and 1, that the relay should be limiting itself to.

bg_reports: List[Tuple[float, int, int]] = None

Per-second reports from the relay with the number of the bytes of background traffic it is carrying. Each tuple is (time, sent_bytes, received_bytes) where time is the timestamp at which the report was received and sent/received are from the relay’s perspective.

meas_duration: int = None

The duration, in seconds, that active measurement should last.

meas_id: int = None

Measurement ID

measr_reports: Dict[MeasrProtocol, List[Tuple[float, int, int]]] = None

Per-second reports from the measurers with the number of bytes of measurement traffic. Each tuple is (time, sent_bytes, received_bytes), where time is the timestamp at which the report was received and sent/received are from the measurer’s perspective.

ready_measurers: Set[MeasrProtocol] = None

The measurers that have indicated they are ready to begin active measurement

relay_circ: Optional[int] = None

Our circuit id with the relay. Filled in once we actually have one

relay_fp: str = None

The fingerprint of the relay to measure

should_save_data()

Return our best idea for whether or not it would be a good idea to save this measurement’s results to disk.

Return type

bool

start_and_end()

Return our best idea for what the start and end timestamp of this Measurement are.

This method assumes the measurement is finished.

We currently only consider the timestamps in the background reports. These timestamps are generated by us, so if we always use only ourselves as the authority on what time it is, we’ll always be consistent. Consider a deployment with at least two measurers, one with a very fast clock and another with a very slow clock. If we instead, for example, took the earliest timestamp as the start and the latest timestamp as the end, then not only could the measurement “last” more than its actual duration, but our accuracy on start/end times would change as the set of measurers used for each measurement changes.

Return type

Tuple[float, float]
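One way to derive start and end times from the background reports alone is sketched below. It is a standalone function written for illustration, assuming the reports list is non-empty; the real method may select the timestamps differently.

```python
from typing import List, Tuple


def start_and_end_from_bg(
        bg_reports: List[Tuple[float, int, int]]) -> Tuple[float, float]:
    """Return (start, end) using only coordinator-generated timestamps."""
    if not bg_reports:
        raise ValueError('no background reports to take timestamps from')
    # Each report is (time, sent_bytes, received_bytes).
    times = [t for t, _, _ in bg_reports]
    return min(times), max(times)
```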

class flashflow.cmd.coord.coord.State(conf)

Bases: object

Global state for the coord process

attach_sockets(tor_client, measr_server, ctrl_server)

Add to the State object the actual communication stuff. Ideally the object doesn’t already have them, but if it does, we’ll warn.

begin_of_slot_handle: Optional[asyncio.TimerHandle] = None

If we have one, the handle for the event to be called at the beginning of the next measurement slot

conf: ConfigParser = None

Config file

ctrl_server: Optional[Server] = None

Our listening sockets for FF controllers, set in main

measr_server: Optional[Server] = None

Our listening sockets for measurers, set in main

measurements: Dict[int, Measurement] = None

Storage of ongoing Measurements

measurers: Set[MeasrProtocol] = None

All measurers that are currently connected

schedule: Optional[meas_period.Schedule] = None

Our current measurement schedule

state_file: StateFile = None

On-disk storage of long term information

tor_client: Optional[Controller] = None

Our Tor client, set in main

flashflow.cmd.coord.coord.begin_of_new_period(state)

flashflow.cmd.coord.coord.begin_of_slot_cb()

flashflow.cmd.coord.coord.cleanup(state)

Clean up the whole State object while being very careful not to allow any exceptions to bubble up. Use this when in an error state and you want to clean up before starting over or just dying.

flashflow.cmd.coord.coord.connect_to_relay(meas, state)

Start the given measurement off by connecting ourselves to the necessary relay

flashflow.cmd.coord.coord.die()

End execution of the program.

flashflow.cmd.coord.coord.ensure_conn_w_tor(conf)

Create a Tor client, connect to its control socket, authenticate, and return the Controller. On success, return True and the controller. Otherwise return False and an operator-meaningful error message.

Return type

Tuple[bool, Union[Controller, str]]
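The (success, value-or-error) return convention used by the ensure_* functions can be consumed as in the sketch below. Both functions are hypothetical stand-ins that validate a port number instead of launching Tor; only the shape of the return value mirrors the documentation.

```python
from typing import Tuple, Union


def ensure_port(port: int) -> Tuple[bool, Union[int, str]]:
    """Return (True, port) on success, else (False, error message)."""
    if not 0 < port < 65536:
        return False, 'Invalid port %d' % port
    return True, port


def start(port: int) -> str:
    ok, value = ensure_port(port)
    if not ok:
        # On failure the second element is an operator-meaningful message.
        return 'fatal: %s' % value
    return 'listening on %d' % value
```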

async flashflow.cmd.coord.coord.ensure_ctrl_listen_socks(conf)

Create the listen sockets to which FF controllers will connect. On success, return True and the Server listening for FF controllers. Otherwise return False and an operator-meaningful error message.

Return type

Tuple[bool, Union[Server, str]]

async flashflow.cmd.coord.coord.ensure_listen_socks(conf)

Create all the listening sockets we need. On success return True and a tuple of all the Servers. Otherwise return False and an operator-meaningful error message.

Return type

Tuple[bool, Tuple[Server, Server], str]

async flashflow.cmd.coord.coord.ensure_measurer_listen_socks(conf)

Create the listen sockets to which measurers will connect. On success, return True and the Server listening for measurers. Otherwise return False and an operator-meaningful error message.

Return type

Tuple[bool, Union[Server, str]]

flashflow.cmd.coord.coord.find_relay(c, nick_fp)

Ask our tor client for the RouterStatusEntryV3 object for the given relay, identified either by nickname or fingerprint. If impossible, (e.g. it doesn’t exist), return None.

Return type

Optional[RouterStatusEntryV3]

flashflow.cmd.coord.coord.finish_measurement(meas, fail_msg, state)

We have finished a measurement, successful or not. Write out the results we have, tell everyone we are done, and forget about it.

flashflow.cmd.coord.coord.g_state = None

global in-memory state object, init in main

flashflow.cmd.coord.coord.gen_parser(sub)

Add the cmd line options for this FlashFlow command

Return type

ArgumentParser

flashflow.cmd.coord.coord.generate_v3bw(conf)

Generate a v3bw file using our latest results for each relay.

flashflow.cmd.coord.coord.get_measr_infos(conf)

Parse the information we have on our measurers from our config file, and return a list of MeasrInfo for them.

Return type

List[MeasrInfo]

flashflow.cmd.coord.coord.get_relays_to_measure(c, rfl_fname)

Return the list of relay fingerprints of all known relays that we should measure.

Parameters
  • c (Controller) – Tor controller

  • rfl_fname (str) – Filename containing a RelayFilterList

Return type

List[RelayInfo]

flashflow.cmd.coord.coord.have_all_bw_reports(meas)

Check if we have the expected number of bg_reports and measr_reports for the given measurement

Parameters

meas_id – The measurement ID. If we don’t know about it, we return False

Return type

bool

Returns

True if we have the expected number of reports from all parties, else False (e.g. because the meas_id is unrecognized or the measurement is simply not yet complete)
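A completeness check of this kind might be sketched as follows. It assumes one report per second from each party, so that the expected count equals the measurement duration in seconds; the function name have_expected_reports() and the use of strings as measurer keys are illustrative only.

```python
from typing import Dict, List, Tuple

Report = Tuple[float, int, int]  # (time, sent_bytes, received_bytes)


def have_expected_reports(
        meas_duration: int,
        bg_reports: List[Report],
        measr_reports: Dict[str, List[Report]]) -> bool:
    """Return True once the relay and every measurer have reported enough."""
    if len(bg_reports) < meas_duration:
        return False
    return all(len(reports) >= meas_duration
               for reports in measr_reports.values())
```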

flashflow.cmd.coord.coord.main(args, conf)

Return type

None

flashflow.cmd.coord.coord.make_new_schedule(conf, tor_client)

Make a new measurement schedule, scheduling each relay we know about to be measured at some point during the existing measurement period (if now is not the very beginning of a period, we might schedule some for the past).

Return type

Schedule

flashflow.cmd.coord.coord.next_meas_id()

Generate a new measurement ID

Return type

int

flashflow.cmd.coord.coord.notif_circ_event(event, state=None)

The real CIRC event handler, in the main thread’s loop.

Receive a CIRC event from our tor client.

We want to know about circuit events for the following reasons:
  • When we have recently launched our circuit with the relay and want to know when it is built so we can go to the next state

  • TODO failures

  • TODO other reasons

flashflow.cmd.coord.coord.notif_circ_event_BUILT(meas, event)

Received CIRC event with status BUILT.

Look for a Measurement waiting on this circ_id to be BUILT.

flashflow.cmd.coord.coord.notif_circ_event_CLOSED(meas, event)

flashflow.cmd.coord.coord.notif_circ_event_FAILED(meas, event)

flashflow.cmd.coord.coord.notif_ctrl_message(msg, state=None)

Called from CtrlProtocol when a controller has given us a command. Returns (True, ‘’) if the message seems like a good, actionable message. Otherwise returns False and a human-meaningful string with more information.

Return type

Tuple[bool, str]

flashflow.cmd.coord.coord.notif_ffmeas_event(event, state=None)

The real FF_MEAS event handler, in the main thread’s loop.

Receive a FF_MEAS event from our tor client.

flashflow.cmd.coord.coord.notif_ffmeas_event_BW_REPORT(meas, event)

Received FF_MEAS event with type BW_REPORT

flashflow.cmd.coord.coord.notif_ffmeas_event_PARAMS_OK(meas, event, state)

Received FF_MEAS event with type PARAMS_OK

Check if the relay accepted them or not. Drop the Measurement if not accepted, otherwise continue on to the next stage: telling the measurers to connect to the relay.

flashflow.cmd.coord.coord.notif_ffmeas_event_PARAMS_SENT(meas, event)

Received FF_MEAS event with type PARAMS_SENT

Log about it. There’s nothing to do until we learn the relay’s response, which we get later with PARAMS_OK.

flashflow.cmd.coord.coord.notif_measurer_connected(measr, state)

Called from MeasrProtocol when a connection is successfully made from a measurer

flashflow.cmd.coord.coord.notif_measurer_disconnected(measr, state)

Called from MeasrProtocol when a connection with a measurer has been lost

flashflow.cmd.coord.coord.notif_measurer_msg(measr, message, state=None)

Receive a FFMsg object from one of our measurers

flashflow.cmd.coord.coord.notif_sslerror(self, exc, trans)

Called from the last-chance exception handler to tell us about TLS errors. For example, a measurer connected to us with a bad client cert.

flashflow.cmd.coord.coord.schedule_next_begin_of_slot(state)

Schedule the next call of the begin-of-slot callback function

flashflow.cmd.coord.coord.slots_in_period(conf)

Calculate the number of slots in a measurement period based on the given configuration

Return type

int
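The calculation could be as simple as an integer division of the period length by the slot length. The sketch below uses a hypothetical config section and hypothetical key names (coord, period_len, slot_len); the real configuration options are not listed in this section.

```python
from configparser import ConfigParser


def count_slots(conf: ConfigParser) -> int:
    """Return how many whole slots fit in one measurement period."""
    # Section and key names are assumptions made for this sketch.
    period_len = conf.getint('coord', 'period_len')
    slot_len = conf.getint('coord', 'slot_len')
    return period_len // slot_len


example_conf = ConfigParser()
example_conf.read_dict({'coord': {'period_len': '86400', 'slot_len': '60'}})
```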

flashflow.cmd.coord.coord.write_measurement_results(meas)

We have completed a measurement (maybe successfully) and should write out measurement results to our file.

Module contents