flashflow.cmd.coord package
Submodules
flashflow.cmd.coord.coord module
FlashFlow coordinator
The FlashFlow coordinator is the brain of a FlashFlow team. It controls one or more measurers, telling them when and how to perform bandwidth measurements of relays.
Code organization
The coordinator first opens its listening sockets for measurers and for the FlashFlow controller. Then it launches a Tor client, connects to it, and authenticates to it. These things are done in main().
At this point, events drive the execution of the program. The following are the message/event types:
- begin of slot/begin of period events,
- FF ctrl messages,
- CIRC Tor control messages,
- FF_MEAS Tor control messages, and
- FF measurer messages.
The remainder of this section explains the organization of the code and control flow by walking the reader through the process of a measurement taking place.
Begin of slot/period events
Main function(s): begin_of_slot_cb()
The measurement schedule divides time into periods (e.g. days) and periods into slots (e.g. minutes). We schedule a call to begin_of_slot_cb() at the beginning of every slot with schedule_next_begin_of_slot(). If it’s also the beginning of a new period, begin_of_new_period() is also called.
If there are measurements to do, begin_of_slot_cb() sends an FF ctrl message for each one to start it.
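The slot arithmetic and the self-rescheduling timer can be sketched with asyncio as follows. This is a minimal illustration, not FlashFlow's code: it assumes fixed-length slots aligned to the Unix epoch, and `SLOT_SECONDS`, `next_slot_start()`, and this simplified `schedule_next_begin_of_slot()` signature are hypothetical.

```python
import asyncio
import time
from typing import Callable

# Hypothetical slot length in seconds; FlashFlow derives its real slot
# timing from its configuration.
SLOT_SECONDS = 60


def next_slot_start(now: float, slot_len: int = SLOT_SECONDS) -> float:
    """Return the Unix timestamp at which the slot after `now` begins,
    assuming slots are aligned to the Unix epoch."""
    return float((int(now // slot_len) + 1) * slot_len)


def schedule_next_begin_of_slot(loop: asyncio.AbstractEventLoop,
                                callback: Callable[[], None],
                                slot_len: int = SLOT_SECONDS
                                ) -> asyncio.TimerHandle:
    """Arrange for `callback` to run at the start of the next slot.

    loop.call_at() takes a deadline on the loop's own monotonic clock
    (loop.time()), not wall-clock time, so convert the delay first."""
    now = time.time()
    delay = next_slot_start(now, slot_len) - now
    return loop.call_at(loop.time() + delay, callback)
```

In this sketch the callback would call `schedule_next_begin_of_slot()` again at its start, and the returned `TimerHandle` is what a field like State.begin_of_slot_handle would hold so the pending call can be cancelled.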
FlashFlow control messages
Main function(s): notif_ctrl_message() and the similarly named helper functions it calls
notif_ctrl_message() handles incoming FlashFlow control messages. These are simple strings that either come from a FlashFlow control process (not to be confused with a Tor controller or Tor control port) or are generated internally. The most notable message is measure <fingerprint>, which instructs us to start measuring the relay with the given fingerprint. A Measurement is constructed and connect_to_relay() is called to start the measurement by telling our Tor client to build a one-hop circuit to the relay for measurement purposes. At that point we stop working on this measurement until our Tor client reports its progress via CIRC and FF_MEAS Tor control messages.
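Validating a `measure <fingerprint>` message and reporting problems in the documented `(True, '')` / `(False, reason)` style might look like this. `parse_ctrl_message()` is hypothetical. It accepts only 40-hex-digit fingerprints and treats every other command as unknown, whereas the real handler may support more commands or relay identifiers.

```python
import re
from typing import Tuple

# A relay fingerprint is a SHA-1 digest written as 40 hexadecimal characters.
FP_RE = re.compile(r'^[0-9A-Fa-f]{40}$')


def parse_ctrl_message(msg: str) -> Tuple[bool, str]:
    """Return (True, '') for an actionable message, else (False, reason)."""
    words = msg.strip().split()
    if not words:
        return False, 'Empty message'
    command, args = words[0], words[1:]
    if command != 'measure':
        return False, f'Unknown command "{command}"'
    if len(args) != 1:
        return False, 'measure takes exactly one argument: <fingerprint>'
    if not FP_RE.match(args[0]):
        return False, f'"{args[0]}" is not a relay fingerprint'
    return True, ''
```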
CIRC Tor control messages
Main function(s): notif_circ_event() and the similarly named helper functions it calls
These come from stem in _notif_circ_event(), which runs in a different thread from our main event loop. Thus _notif_circ_event() passes the event to the main event loop thread, where notif_circ_event() handles it.
From there, control goes to a different function based on the status of the circuit (e.g. BUILT or LAUNCHED or CLOSED). But ultimately, the coordinator doesn’t really care: its Tor client knows to send measurement parameters to the relay after the circuit is BUILT, so all we do is debug-log when the circuit is BUILT.
The message we’re ultimately waiting for is an FF_MEAS event.
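The hand-off from stem's thread to the main event loop can be done with `loop.call_soon_threadsafe()`. The sketch below assumes this approach. `make_stem_listener()` and `demo()` are hypothetical names, and a plain string stands in for a stem event object. In real code the listener would be registered with stem's `Controller.add_event_listener()`.

```python
import asyncio
import threading


def make_stem_listener(loop: asyncio.AbstractEventLoop, handler):
    """Return a listener that is safe to call from stem's event thread.

    The listener does no work itself: call_soon_threadsafe() queues
    handler(event) to run in the thread that runs `loop`."""
    def listener(event):
        loop.call_soon_threadsafe(handler, event)
    return listener


async def demo():
    loop = asyncio.get_running_loop()
    loop_thread = threading.get_ident()
    seen = []
    done = asyncio.Event()

    def handler(event):
        # Runs in the event loop's thread, so touching `seen` and `done` is safe.
        seen.append((event, threading.get_ident() == loop_thread))
        done.set()

    listener = make_stem_listener(loop, handler)
    # Simulate stem delivering a CIRC event from its own thread.
    worker = threading.Thread(target=listener, args=('CIRC 7 BUILT',))
    worker.start()
    await done.wait()
    worker.join()
    return seen


result = asyncio.run(demo())  # [('CIRC 7 BUILT', True)]
```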
FF_MEAS Tor control messages
Main function(s): notif_ffmeas_event() and the similarly named helper functions it calls
These come from stem in _notif_ffmeas_event(), which runs in a different thread from our main event loop. Thus _notif_ffmeas_event() passes the event to the main event loop thread, where notif_ffmeas_event() handles it.
In the running example of a measurement taking place, we’re waiting for a PARAMS_OK FF_MEAS message. Upon receiving it we hand control off to a sub-function that determines if the relay agrees to be measured, and if so, sends messages to the appropriate measurers instructing them to connect to the relay.
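The PARAMS_OK decision amounts to either dropping the measurement or fanning out a "connect" instruction to each measurer. In this sketch, `PendingMeasurement`, `FakeMeasurer`, `on_params_ok()`, the `accepted` flag, and the plain-string `ConnectToRelay` message are all hypothetical stand-ins for FlashFlow's Measurement, MeasrProtocol, and message classes.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class FakeMeasurer:
    """Hypothetical measurer connection that just records what we send."""
    name: str
    outbox: List[str] = field(default_factory=list)

    def send(self, msg: str) -> None:
        self.outbox.append(msg)


@dataclass
class PendingMeasurement:
    """Hypothetical stand-in for FlashFlow's Measurement."""
    meas_id: int
    relay_fp: str
    measurers: List[FakeMeasurer]


def on_params_ok(meas: PendingMeasurement, accepted: bool,
                 measurements: Dict[int, PendingMeasurement]) -> None:
    """Drop the measurement if the relay refused; otherwise tell each
    measurer to connect to the relay."""
    if not accepted:
        measurements.pop(meas.meas_id, None)
        return
    for measr in meas.measurers:
        measr.send(f'ConnectToRelay {meas.meas_id} {meas.relay_fp}')
```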
FlashFlow measurer messages
Main function(s): notif_measurer_msg() and the similarly named helper functions it calls
We’re waiting for measurers to respond saying they’ve successfully connected to the relay. Once _notif_measurer_msg_ConnectedToRelay() has seen a ConnectedToRelay message from each measurer, it tells our Tor client that active measurement is starting and tells each measurer to start active measurement.
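The "wait until every measurer has checked in, then start once" logic can be sketched as a small tracker. `ReadinessTracker` is hypothetical. FlashFlow keeps the equivalent set in Measurement.ready_measurers, and `on_all_ready` here stands in for notifying the Tor client and the measurers.

```python
from typing import Callable, Hashable, Iterable, Set


class ReadinessTracker:
    """Track which measurers reported ConnectedToRelay for one measurement."""

    def __init__(self, measurers: Iterable[Hashable],
                 on_all_ready: Callable[[], None]) -> None:
        self.expected: Set[Hashable] = set(measurers)
        self.ready: Set[Hashable] = set()
        self.on_all_ready = on_all_ready
        self.started = False

    def connected(self, measr: Hashable) -> None:
        if measr not in self.expected:
            return  # ignore measurers that are not part of this measurement
        self.ready.add(measr)
        # Start exactly once, when the last expected measurer checks in.
        if not self.started and self.ready == self.expected:
            self.started = True
            self.on_all_ready()
```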
Per-second messages
During a measurement, we expect bandwidth reports each second from our Tor client and from each measurer. The former is an FF_MEAS Tor control message with type BW_REPORT, handled in notif_ffmeas_event_BW_REPORT(), and the latter is a flashflow.msg.BwReport FlashFlow measurer message.
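Deciding when all per-second reports have arrived could look like the check below. It assumes the expected count per party is simply `meas_duration` (one report per second). That is this sketch's assumption, and FlashFlow's have_all_bw_reports() may compute the target differently. `have_all_reports()` is a hypothetical name.

```python
from typing import Dict, Hashable, List, Tuple

# (time, sent_bytes, received_bytes), as in Measurement.bg_reports
Report = Tuple[float, int, int]


def have_all_reports(meas_duration: int,
                     bg_reports: List[Report],
                     measr_reports: Dict[Hashable, List[Report]]) -> bool:
    """Return True once the relay and every measurer have sent at least
    one report per second of active measurement."""
    if len(bg_reports) < meas_duration:
        return False
    return all(len(reports) >= meas_duration
               for reports in measr_reports.values())
```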
class flashflow.cmd.coord.coord.CtrlProtocol
Bases: asyncio.protocols.Protocol
Development/debugging control communication protocol

connection_lost(exc)
Called when the connection is lost or closed. The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).

connection_made(transport)
Called when a connection is made. The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.

data_received(data)
Called when some data is received. The argument is a bytes object.

transport: Optional[asyncio.transports.Transport] = None
class flashflow.cmd.coord.coord.MeasrProtocol
Bases: asyncio.protocols.Protocol
How we communicate with measurers.
Very little should be done here: parse the bytes, then give objects to the rest of the coord code to handle.

connection_lost(exc)
Called when the connection is lost or closed. The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).

connection_made(transport)
Called when a connection is made. The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.

data_received(data)
Called when some data is received. The argument is a bytes object.

transport: Optional[asyncio.transports.Transport] = None
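The "parse the bytes, hand off objects" split matters because TCP may deliver a message in pieces or several messages at once. The following protocol buffers input until a full message is available. It assumes newline-delimited JSON framing, which is hypothetical and not FlashFlow's actual wire format. `LineJSONProtocol` and its `on_message` callback are also illustrative names.

```python
import asyncio
import json
from typing import Any, Callable, Optional


class LineJSONProtocol(asyncio.Protocol):
    """Parse bytes into objects and pass each one to `on_message`.

    Assumes each message is one JSON object terminated by a newline byte;
    this framing is an illustration only."""

    def __init__(self, on_message: Callable[[Any], None]) -> None:
        self.on_message = on_message
        self.transport: Optional[asyncio.BaseTransport] = None
        self._buf = b''

    def connection_made(self, transport) -> None:
        self.transport = transport

    def data_received(self, data: bytes) -> None:
        # Buffer until at least one complete line is available.
        self._buf += data
        while b'\n' in self._buf:
            line, self._buf = self._buf.split(b'\n', 1)
            if line.strip():
                self.on_message(json.loads(line))

    def connection_lost(self, exc: Optional[Exception]) -> None:
        self.transport = None
```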
class flashflow.cmd.coord.coord.Measurement(meas_id, relay_fp, measurers, meas_duration, bg_percent)
Bases: object
State related to a single measurement.

bg_percent: float = None
The percent of background traffic, as a fraction between 0 and 1, that the relay should be limiting itself to.

bg_reports: List[Tuple[float, int, int]] = None
Per-second reports from the relay with the number of bytes of background traffic it is carrying. Each tuple is (time, sent_bytes, received_bytes), where time is the timestamp at which the report was received and sent/received are from the relay’s perspective.

meas_duration: int = None
The duration, in seconds, that active measurement should last.

meas_id: int = None
Measurement ID

measr_reports: Dict[MeasrProtocol, List[Tuple[float, int, int]]] = None
Per-second reports from the measurers with the number of bytes of measurement traffic. Each tuple is (time, sent_bytes, received_bytes), where time is the timestamp at which the report was received and sent/received are from the measurer’s perspective.

ready_measurers: Set[MeasrProtocol] = None
The measurers that have indicated they are ready to begin active measurement

relay_circ: Optional[int] = None
Our circuit ID with the relay. Filled in once we actually have one

relay_fp: str = None
The fingerprint of the relay to measure

should_save_data()
Return our best idea for whether or not it would be a good idea to save this measurement’s results to disk.
Return type: bool

start_and_end()
Return our best idea for what the start and end timestamps of this Measurement are. This method assumes the measurement is finished.
We currently only consider the timestamps in the background reports; these have timestamps that we generated, so if we always use only ourselves as the authority on what time it is, we’ll always be consistent. Consider a deployment with at least two measurers, one with a very fast clock and another with a very slow clock. If we instead, for example, took the earliest timestamp as the start and the latest timestamp as the end, then not only could the measurement “last” more than its actual duration, but our accuracy on start/end times would change as the set of measurers used for each measurement changes.
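One simple reading of the start_and_end() rationale is to take the earliest and latest timestamps from bg_reports only, since the coordinator generated all of them. The min/max choice and the function below are this sketch's assumptions, not necessarily what the method does.

```python
from typing import List, Tuple


def start_and_end(bg_reports: List[Tuple[float, int, int]]) -> Tuple[float, float]:
    """Estimate a finished measurement's start and end times using only
    coordinator-generated timestamps, so measurer clock skew cannot
    affect the result. Assumes bg_reports is non-empty."""
    times = [t for t, _sent, _recv in bg_reports]
    return min(times), max(times)
```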
class flashflow.cmd.coord.coord.State(conf)
Bases: object
Global state for the coord process

attach_sockets(tor_client, measr_server, ctrl_server)
Add the actual communication objects to the State object. Ideally the object doesn’t already have them, but if it does, we’ll warn.

begin_of_slot_handle: Optional[asyncio.TimerHandle] = None
If we have one, the handle for the event to be called at the beginning of the next measurement slot

conf: ConfigParser = None
Config file

ctrl_server: Optional[Server] = None
Our listening sockets for FF controllers, set in main

measr_server: Optional[Server] = None
Our listening sockets for measurers, set in main

measurements: Dict[int, Measurement] = None
Storage of ongoing Measurements

measurers: Set[MeasrProtocol] = None
All measurers that are currently connected

schedule: Optional[meas_period.Schedule] = None
Our current measurement schedule

state_file: StateFile = None
On-disk storage of long-term information

tor_client: Optional[Controller] = None
Our Tor client, set in main
flashflow.cmd.coord.coord.begin_of_new_period(state)

flashflow.cmd.coord.coord.begin_of_slot_cb()
flashflow.cmd.coord.coord.cleanup(state)
Clean up everything in the State object while being very careful not to allow any exceptions to bubble up. Use this when in an error state and you want to clean up before starting over or just dying.
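"Never let an exception escape cleanup" usually means wrapping each step individually, so one failure does not skip the rest. `cleanup_all()` and its list of `steps` are hypothetical. The steps stand in for closing the State object's servers, Tor client, and so on.

```python
import logging
from typing import Callable, List

log = logging.getLogger(__name__)


def cleanup_all(steps: List[Callable[[], None]]) -> None:
    """Run every cleanup step even if earlier ones fail.

    Each exception is logged and swallowed, so this function never raises
    an Exception subclass."""
    for step in steps:
        try:
            step()
        except Exception:
            log.exception('Ignoring error during cleanup')
```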
flashflow.cmd.coord.coord.connect_to_relay(meas, state)
Start the given measurement off by connecting ourselves to the necessary relay

flashflow.cmd.coord.coord.die()
End execution of the program.

flashflow.cmd.coord.coord.ensure_conn_w_tor(conf)
Create a Tor client, connect to its control socket, authenticate, and return the Controller. On success, return True and the controller. Otherwise return False and an operator-meaningful error message.
Return type: Tuple[bool, Union[Controller, str]]
async flashflow.cmd.coord.coord.ensure_ctrl_listen_socks(conf)
Create the listen sockets to which FF controllers will connect. On success, return True and the Server listening for FF controllers. Otherwise return False and an operator-meaningful error message.

async flashflow.cmd.coord.coord.ensure_listen_socks(conf)
Create all the listening sockets we need. On success, return True and a tuple of all the Servers. Otherwise return False and an operator-meaningful error message.

async flashflow.cmd.coord.coord.ensure_measurer_listen_socks(conf)
Create the listen sockets to which measurers will connect. On success, return True and the Server listening for measurers. Otherwise return False and an operator-meaningful error message.
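The `(success, server-or-message)` convention shared by the ensure_*_listen_socks() coroutines can be sketched with `loop.create_server()`. `ensure_listen_sock()` is a hypothetical simplification. It binds a single address and omits TLS, although the real measurer sockets check client certificates, which would require passing an `ssl` context.

```python
import asyncio
from typing import Callable, Tuple, Union


async def ensure_listen_sock(
        protocol_factory: Callable[[], asyncio.Protocol],
        host: str, port: int
        ) -> Tuple[bool, Union[asyncio.AbstractServer, str]]:
    """Return (True, server) on success, or (False, an
    operator-meaningful error message) if the socket cannot be opened."""
    loop = asyncio.get_running_loop()
    try:
        server = await loop.create_server(protocol_factory, host, port)
    except OSError as e:
        return False, f'Unable to listen on {host}:{port}: {e}'
    return True, server
```

Returning the error as a string lets main() print one clear message and exit instead of showing a traceback to the operator.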
flashflow.cmd.coord.coord.find_relay(c, nick_fp)
Ask our Tor client for the RouterStatusEntryV3 object for the given relay, identified either by nickname or fingerprint. If impossible (e.g. it doesn’t exist), return None.
Return type: Optional[RouterStatusEntryV3]
flashflow.cmd.coord.coord.finish_measurement(meas, fail_msg, state)
We have finished a measurement, successful or not. Write out the results we have, tell everyone we are done, and forget about it.

flashflow.cmd.coord.coord.g_state = None
Global in-memory state object, initialized in main
flashflow.cmd.coord.coord.gen_parser(sub)
Add the command-line options for this FlashFlow command
flashflow.cmd.coord.coord.generate_v3bw(conf)
Generate a v3bw file using our latest results for each relay.

flashflow.cmd.coord.coord.get_measr_infos(conf)
Parse the information we have on our measurers from our config file, and return a list of MeasrInfo for them.
flashflow.cmd.coord.coord.get_relays_to_measure(c, rfl_fname)
Return the list of relay fingerprints of all known relays that we should measure.
Parameters:
c (Controller) – Tor controller
rfl_fname (str) – Filename containing a RelayFilterList
Return type: List[str]
flashflow.cmd.coord.coord.have_all_bw_reports(meas)
Check if we have the expected number of bg_reports and measr_reports for the given measurement.
Parameters:
meas_id – The measurement ID. If we don’t know about it, we return False
Return type: bool
Returns:
True if we have the expected number of reports from all parties, else False (i.e. because the meas_id is unrecognized or the measurement is simply not yet complete)
flashflow.cmd.coord.coord.main(args, conf)
Return type: None
flashflow.cmd.coord.coord.make_new_schedule(conf, tor_client)
Make a new measurement schedule, scheduling each relay we know about to be measured at some point during the existing measurement period (if now is not the very beginning of a period, we might schedule some for the past).
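Spreading relays over a period's slots, including slots that may already have passed, can be sketched as a uniform random assignment. Uniform randomness, the `Dict[slot, fingerprints]` layout, and `make_schedule()` itself are this sketch's assumptions. FlashFlow's meas_period.Schedule may be built differently.

```python
import random
from typing import Dict, List, Sequence


def make_schedule(relay_fps: Sequence[str], n_slots: int,
                  rng: random.Random) -> Dict[int, List[str]]:
    """Assign every relay to one uniformly random slot of the period.

    Slots are not restricted to the future, so a relay may land in a
    slot of the current period that has already passed."""
    schedule: Dict[int, List[str]] = {}
    for fp in relay_fps:
        schedule.setdefault(rng.randrange(n_slots), []).append(fp)
    return schedule
```

Passing in a seeded `random.Random` keeps the sketch reproducible for testing without touching the global random state.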
flashflow.cmd.coord.coord.notif_circ_event(event, state=None)
The real CIRC event handler, in the main thread’s loop.
Receive a CIRC event from our Tor client.
We want to know about circuit events for the following reasons:
- When we have recently launched our circuit with the relay and want to know when it is built so we can go to the next state
- TODO failures
- TODO other reasons
flashflow.cmd.coord.coord.notif_circ_event_BUILT(meas, event)
Received CIRC event with status BUILT.
Look for a Measurement waiting on this circ_id to be BUILT.

flashflow.cmd.coord.coord.notif_circ_event_CLOSED(meas, event)

flashflow.cmd.coord.coord.notif_circ_event_FAILED(meas, event)
flashflow.cmd.coord.coord.notif_ctrl_message(msg, state=None)
Called from CtrlProtocol when a controller has given us a command. Returns (True, '') if the message seems like a good, actionable message. Otherwise returns False and a human-meaningful string with more information.
flashflow.cmd.coord.coord.notif_ffmeas_event(event, state=None)
The real FF_MEAS event handler, in the main thread’s loop.
Receive an FF_MEAS event from our Tor client.

flashflow.cmd.coord.coord.notif_ffmeas_event_BW_REPORT(meas, event)
Received FF_MEAS event with type BW_REPORT

flashflow.cmd.coord.coord.notif_ffmeas_event_PARAMS_OK(meas, event, state)
Received FF_MEAS event with type PARAMS_OK
Check if the relay accepted them or not. Drop the Measurement if not accepted, otherwise continue on to the next stage: telling the measurers to connect to the relay.

flashflow.cmd.coord.coord.notif_ffmeas_event_PARAMS_SENT(meas, event)
Received FF_MEAS event with type PARAMS_SENT
Log about it. There’s nothing to do until we learn the relay’s response, which we get later with PARAMS_OK.
flashflow.cmd.coord.coord.notif_measurer_connected(measr, state)
Called from MeasrProtocol when a connection is successfully made from a measurer

flashflow.cmd.coord.coord.notif_measurer_disconnected(measr, state)
Called from MeasrProtocol when a connection with a measurer has been lost

flashflow.cmd.coord.coord.notif_measurer_msg(measr, message, state=None)
Receive an FFMsg object from one of our measurers
flashflow.cmd.coord.coord.notif_sslerror(self, exc, trans)
Called from the last-chance exception handler to tell us about TLS errors, for example when a measurer connects to us with a bad client certificate.

flashflow.cmd.coord.coord.schedule_next_begin_of_slot(state)
Schedule the next call of the begin-of-slot callback function
flashflow.cmd.coord.coord.slots_in_period(conf)
Calculate the number of slots in a measurement period based on the given configuration
Return type: int
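The slot count is a division of the period length by the slot length. In this sketch, the `[coord]` section and the `meas_period`/`meas_slot` option names are hypothetical, so check FlashFlow's default config for the real ones. Rejecting a period that is not a whole multiple of the slot length is also this sketch's choice.

```python
from configparser import ConfigParser


def slots_in_period(conf: ConfigParser) -> int:
    """Number of whole slots in one measurement period."""
    period = conf.getint('coord', 'meas_period')  # seconds per period
    slot = conf.getint('coord', 'meas_slot')      # seconds per slot
    if slot <= 0 or period % slot:
        raise ValueError('period must be a positive multiple of the slot length')
    return period // slot
```

For example, a one-day period (86400 s) with 60-second slots gives 1440 slots.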
flashflow.cmd.coord.coord.write_measurement_results(meas)
We have completed a measurement (maybe successfully) and should write out measurement results to our file.