flashflow package


flashflow.config module

Module handling the parsing of FlashFlow configuration files.

FlashFlow uses Python’s standard configparser.ConfigParser with configparser.ExtendedInterpolation to build a single config object. The same object contains the logging configuration suitable for handing off to Python via logging.config.fileConfig(). See logging.config for information on the format of the logging config.

Default options are loaded from DEF_CONF_INI (flashflow/config.default.ini), which is then extended with DEF_CONF_LOG_INI (flashflow/config.log.default.ini).

It is good practice to fetch ints, floats, and bools from the config with configparser.ConfigParser.getint(), configparser.ConfigParser.getfloat(), and configparser.ConfigParser.getboolean() respectively. FlashFlow extends ConfigParser with two additional converters:

  1. A path converter that automatically expands ~ and environment variables (written with two ‘$’, not one). See expand_path().

  2. An address converter that parses a hostname:port string into a (str, int) tuple. See expand_addr().

Use conf.getpath(...) and conf.getaddr(...) for these.
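As a rough illustration of how such a converter plugs into ConfigParser, the sketch below registers a path converter through the standard converters argument. The section name, option names, environment variable, and the expand_path_sketch helper are assumptions made for this example; it is not FlashFlow's actual code.

```python
import os
from configparser import ConfigParser, ExtendedInterpolation


def expand_path_sketch(path: str) -> str:
    # Hypothetical stand-in for expand_path(): expand $VARS, then ~.
    return os.path.expanduser(os.path.expandvars(path))


# converters={'path': f} makes ConfigParser provide conf.getpath().
conf = ConfigParser(
    interpolation=ExtendedInterpolation(),
    converters={'path': expand_path_sketch},
)

os.environ['FF_DEMO_DIR'] = '/tmp/ff-demo'  # made-up variable for the demo
conf.read_string("""
[coord]
n_circs = 8
datadir = $$FF_DEMO_DIR/data
keydir = ${datadir}/keys
""")

# ExtendedInterpolation turns "$$" into a literal "$" and resolves
# "${datadir}" before the converter sees the value.
n_circs = conf.getint('coord', 'n_circs')
datadir = conf.getpath('coord', 'datadir')
keydir = conf.getpath('coord', 'keydir')
```

The doubled ‘$’ is needed because ExtendedInterpolation reserves a single ‘$’ for its own ${option} references.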


Called near the very beginning of execution to finish configuring Python’s logging.


Parse the given string into a (hostname, port) tuple.

Not much effort is put into validation:
  • the port is checked to be a valid integer

  • if the host looks like an ipv6 address with brackets, they are removed

Otherwise the values are left as-is.

On success, returns (hostname, port) where port is an integer. On error, logs the error and returns None. ConfigParser does not see this as an error case worthy of special treatment, so you need to check whether the returned value is None yourself.

''        --> None (error: no port)
':1234'            --> None (error: no host)
'example.com:asdf' --> None (error: invalid port)
'localhost:1234'   --> ('localhost', 1234)
':1234'            --> ('', 1234)
''   --> ('', 1234)
'[::1]:0'          --> ('::1', 0)
'::1:0'            --> ('::1', 0)

It’s not up to this function to decide how to specify “listen on all hosts” or “pick a port for me.” These things should be documented and decided elsewhere.

This function is only public so it gets documented. It is not intended to be used outside of this module.

Return type

Optional[Tuple[str, int]]
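The parsing rules above can be sketched roughly as follows. This is a minimal approximation under stated assumptions, not the module's real implementation: the name expand_addr_sketch is hypothetical, logging is omitted, and the string is split on its last colon.

```python
from typing import Optional, Tuple


def expand_addr_sketch(addr: str) -> Optional[Tuple[str, int]]:
    # Split on the LAST colon so unbracketed IPv6 hosts keep their colons.
    host, sep, port_str = addr.rpartition(':')
    if not sep:
        return None  # no colon at all, so no port
    try:
        port = int(port_str)
    except ValueError:
        return None  # port is not a valid integer
    # Remove the brackets from an IPv6 literal such as [::1].
    if host.startswith('[') and host.endswith(']'):
        host = host[1:-1]
    # Everything else is left as-is, including an empty host.
    return host, port
```

Callers still need to handle the None case themselves, for example by refusing to start when a required address fails to parse.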


Expand path string containing shell variables and ~ into their values.

Environment variables must have their $ escaped by another $. For example, $$XDG_RUNTIME_DIR/foo.bar.

This function is only public so it gets documented. It is not intended to be used outside of this module.

Return type



THE function to call in order to parse and receive the configuration that the user wants to use.

First gather the default options, then apply the config found in the given filename, if any.

flashflow.flashflow module

flashflow.flashflow.call_real_main(args, conf)

Figure out what FlashFlow command the user gave and call into that command’s main function where the real work begins to happen. The only logic here should be figuring out what command’s main to call.

Return type



Entry point when called on the command line as flashflow ….

Do boring boilerplate stuff to get started initially. Parse the command line arguments and configuration file, then hand off control. This is where the bulk of the startup boring crap should happen.

Return type


flashflow.meas_period module

Measurement Periods and Slots

Helper functions for calculating things about measurement periods and slots.

Measurement Period: Suggested to be a day. Each relay is intended to be measured once per period. At the beginning of each period (or on startup, if started mid-period without an existing schedule for that period), the coordinator comes up with a schedule that it will follow for the rest of the period, measuring relays in slots.

Measurement Slots: Twice the length of the active measurement duration, slots subdivide a measurement period. The first slot of each period is 0, and there are N slots in a period, where N = meas_period / (meas_dur * 2). Relays get scheduled into measurement slots, and the extra length allows for wiggle room for overhead due to measurement creation (building 100+ circuits takes a while!) and any other causes.

Here is a diagram of what happens during Period M, which is divided into N slots from 0 to N-1. Slot 1 is further blown up to show how three relays are scheduled to be measured during it. The pre-measurement work for each took a different amount of time, which is okay: that’s why slots are twice as long as the actual measurement duration. The moment the work is done to measure a relay, the measurement is started. There is still time left in the slot after the relays are measured. This is okay; we just wait until the next slot starts before doing any measurements scheduled in it.

------------ Time ------------>

|--Period M-------------------------------------------------|--Period M+1--
|--Slot 0----|--Slot 1----| ... |--Slot N-2----|--Slot N-1--|--Slot 0----
            /             \
          /                 \
        /                     -----------------------------------------\
      /                                                                 |
      |--Pre-meas relay1--|--Meas relay1--------------|                 |
      |--Pre-meas relay2----|--Meas relay2--------------|               |
      |--Pre-meas relay3----------|--Meas relay3--------------|         |

It’s possible a slot doesn’t have any measurements in it. That’s fine. We just wait until the next slot.

flashflow.meas_period.MEAS_TO_SLOT_FACT = 2

How much larger is a slot than a single measurement’s duration? E.g. 2 here means 30 second measurements are scheduled into 60 second slots.
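For a concrete sense of the numbers, here is a small sketch of the slot arithmetic described in this module. The function name slots_per_period is hypothetical, and integer division is an assumption about how a non-divisible period would be handled.

```python
MEAS_TO_SLOT_FACT = 2  # same value as documented above


def slots_per_period(period_dur: int, meas_dur: int) -> int:
    # A slot lasts MEAS_TO_SLOT_FACT times as long as one measurement.
    slot_dur = meas_dur * MEAS_TO_SLOT_FACT
    return period_dur // slot_dur


# A one-day period with 30 second measurements uses 60 second slots.
n_slots = slots_per_period(24 * 60 * 60, 30)
```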

class flashflow.meas_period.MeasrInfo(measr_id, bw)

Bases: object

Store general information on a measurer in one object to easily pass it around.

  • measr_id (str) – The measurer’s ID

  • bw (int) – The amount of bandwidth, in bytes/second, the measurer is capable of

class flashflow.meas_period.MeasrMeasInfo(measr_id, n_circs, bw)

Bases: object

Store info associated to how a specific measurer participates in a specific measurement.

  • measr_id (str) – A unique ID for the measurer that will still be meaningful hours after making this object, ideally across reconnection with the measurer. In practice, we require measurers to use a unique organizationName in their certificate and use that.

  • n_circs (int) – The number of circuits this measurer shall open with the relay.

  • bw (int) – The amount of bandwidth, in bytes/second, the measurer should allocate to measuring this relay.

static from_dict(d)
Return type


Return type


class flashflow.meas_period.RelayInfo(fp)

Bases: object

Store general information on a relay in one object to easily pass it around.


fp (str) – The relay’s fingerprint

class flashflow.meas_period.Schedule

Bases: object

Measurement Schedule for a Measurement Period.

  • relays – List of relays to schedule during the measurement period

  • measurers – List of MeasrInfo we should use this measurement period

  • n_slots – The number of slots there are in a measurement period

  • n_circs – The number of circuits the measurers, in aggregate, should open with a relay to measure it

static from_dict(d)
Return type


static gen(relays, measurers, n_slots, n_circs)
Return type


slots: Dict[int, List[Tuple[str, List[MeasrMeasInfo]]]] = None

Key is slot number, value is a list of tuples containing information needed for each measurement.

The contents of the tuple:
  1. str, the fingerprint of the relay to measure

  2. List of MeasrMeasInfo for the measurers to use for this measurement

Not every slot number will be in this dict. Missing slots have no measurements scheduled.
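To make the shape of this dict easier to picture, the sketch below builds a small example by hand. MeasrMeasInfoSketch is a hypothetical stand-in with the same three fields as MeasrMeasInfo, and the fingerprints, measurer IDs, and bandwidth values are made up.

```python
from typing import Dict, List, NamedTuple, Tuple


class MeasrMeasInfoSketch(NamedTuple):
    # Stand-in for MeasrMeasInfo with the same three fields.
    measr_id: str
    n_circs: int
    bw: int


Slots = Dict[int, List[Tuple[str, List[MeasrMeasInfoSketch]]]]

slots: Slots = {
    # Slot 1: one relay measured by two measurers.
    1: [('RELAYFP1', [
        MeasrMeasInfoSketch('measr-a', 4, 5_000_000),
        MeasrMeasInfoSketch('measr-b', 4, 5_000_000),
    ])],
    # Slot 7: two relays, each measured by a single measurer.
    7: [
        ('RELAYFP2', [MeasrMeasInfoSketch('measr-a', 8, 10_000_000)]),
        ('RELAYFP3', [MeasrMeasInfoSketch('measr-b', 8, 10_000_000)]),
    ],
}


def relays_in_slot(slots: Slots, slot: int) -> List[str]:
    # Missing slot numbers simply have nothing scheduled.
    return [fp for fp, _ in slots.get(slot, [])]
```

Looking up a slot with dict.get and an empty default avoids a KeyError for slots that have no measurements.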

Return type

Dict[int, List[Tuple[str, List[Dict]]]]

flashflow.meas_period.current_period(now, period_dur)

Calculate the measurement period number and return it.

  • now (float) – The current timestamp

  • period_dur (int) – The duration of a measurement period

Return type



The measurement period number in which now resides.

flashflow.meas_period.current_slot(now, period_dur, meas_dur)

Calculate the slot number and return it.

  • now (float) – The current timestamp

  • period_dur (int) – The duration of a measurement period

  • meas_dur (int) – The duration of a measurement

Return type



The slot number in which now resides.
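One plausible way to compute both numbers is sketched below. It assumes that periods are counted from the Unix epoch and that a slot is twice the measurement duration; the function names are hypothetical and the real functions may differ in detail.

```python
def current_period_sketch(now: float, period_dur: int) -> int:
    # Number of whole periods elapsed since the epoch.
    return int(now // period_dur)


def current_slot_sketch(now: float, period_dur: int, meas_dur: int) -> int:
    slot_dur = meas_dur * 2  # MEAS_TO_SLOT_FACT
    time_into_period = now % period_dur
    # Slots are numbered from 0 within each period.
    return int(time_into_period // slot_dur)


# 125 seconds into the fourth one-day period (period number 3).
now = 3 * 86400 + 125.0
period = current_period_sketch(now, 86400)
slot = current_slot_sketch(now, 86400, 30)
```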

flashflow.meas_period.time_till_next_slot(now, meas_dur)

Calculate the time remaining until the next measurement slot starts.

  • now (float) – The current timestamp

  • meas_dur (int) – The duration of a measurement

Return type
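A minimal sketch of this calculation follows. It assumes slots are aligned to multiples of the slot duration since the Unix epoch, which holds when the period duration is a multiple of the slot duration; the function name is hypothetical.

```python
def time_till_next_slot_sketch(now: float, meas_dur: int) -> float:
    slot_dur = meas_dur * 2  # MEAS_TO_SLOT_FACT
    # Time already spent in the current slot.
    time_into_slot = now % slot_dur
    return slot_dur - time_into_slot
```

Under these assumptions, a call made exactly on a slot boundary returns a full slot duration rather than zero.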


flashflow.msg module

Messages that FlashFlow coordinators and measurers can send to each other.

Messages serialize to JSON. Each contains a MsgType integer, which is the way the message type is determined. Parties are trusted to not be malicious, so relatively little is done to verify that messages are well-formed.


To create a new message, create it directly with its constructor. E.g. ConnectToRelay().

To send a message, call its FFMsg.serialize() method and write the bytes you get out to the stream.

To receive a message, pass the bytes to the static method FFMsg.deserialize()


# "Send" message to measurer
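m_send = ConnectToRelay(1, 'DEADBEEF', 80, 1000000, 30)  # meas_id, fp, n_circs, bw, dur
print(m_send.serialize())  # outputs JSON byte string
# "Receive" message from measurer. JSON requires double-quoted strings.
# The key names are assumed to match the BwReport constructor parameters.
b = b'{"msg_type": -289, "meas_id": 1, "ts": 1591979504.0, "sent": 16666, "recv": 15555}'
m_recv = FFMsg.deserialize(b)  # Returns BwReport object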

Adding new messages

  1. Define its MsgType with a random integer

  2. Check for the new variant in FFMsg.deserialize()

  3. Define the new class, ensuring you
    1. Set msg_type to the new MsgType variant

    2. Define a _to_dict() method that takes self and returns a dict

    3. Define a from_dict() method that takes a dict and returns a valid instance of the new message type
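The three steps above can be sketched with stand-in classes. Everything here is hypothetical (MsgTypeSketch, MsgSketch, PingSketch, and the value 4242); it only mirrors the pattern described and does not reproduce the real FFMsg code.

```python
import enum
import json


class MsgTypeSketch(enum.Enum):
    PING = 4242  # step 1: an arbitrary, explicit integer


class MsgSketch:
    msg_type: MsgTypeSketch

    def _to_dict(self) -> dict:
        raise NotImplementedError

    def serialize(self) -> bytes:
        return json.dumps(self._to_dict()).encode('utf-8')

    @staticmethod
    def deserialize(b: bytes) -> 'MsgSketch':
        d = json.loads(b.decode('utf-8'))
        msg_type = MsgTypeSketch(d['msg_type'])
        # Step 2: one branch per known variant.
        if msg_type == MsgTypeSketch.PING:
            return PingSketch.from_dict(d)
        raise ValueError('unhandled message type')


class PingSketch(MsgSketch):
    # Step 3: set msg_type, define _to_dict() and from_dict().
    msg_type = MsgTypeSketch.PING

    def __init__(self, meas_id: int):
        self.meas_id = meas_id

    def _to_dict(self) -> dict:
        return {'msg_type': self.msg_type.value, 'meas_id': self.meas_id}

    @staticmethod
    def from_dict(d: dict) -> 'PingSketch':
        return PingSketch(d['meas_id'])


raw = PingSketch(7).serialize()
msg = MsgSketch.deserialize(raw)
```

Keeping the integer in the serialized dict is what lets the base class dispatch without knowing anything else about the message.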

class flashflow.msg.BwReport(meas_id, ts, sent, recv)

Bases: flashflow.msg.FFMsg

Measurer to Coordinator message containing the number of sent and received bytes with the target relay in the last second.

  • meas_id (int) – the ID of the measurement to which this applies

  • ts (float) – the seconds since the unix epoch for which this BwReport applies

  • sent (int) – number of sent bytes in the last second

  • recv (int) – number of received bytes in the last second

static from_dict(d)
Return type


msg_type = -289
class flashflow.msg.ConnectToRelay(meas_id, fp, n_circs, bw, dur)

Bases: flashflow.msg.FFMsg

Coordinator to Measurer message instructing them to connect to the specified relay.

  • meas_id (int) – the ID to assign to this measurement

  • fp (str) – the fingerprint of the relay to which the measurer should connect

  • n_circs (int) – the number of circuits they should open with the relay

  • bw (int) – the amount of bandwidth, in bytes/second, the measurer should allocate for this measurement

  • dur (int) – the duration of the active measurement phase, in seconds

static from_dict(d)
Return type


msg_type = 357
class flashflow.msg.ConnectedToRelay(orig)

Bases: flashflow.msg.FFMsg

Measurer to Coordinator message indicating they have successfully connected to the relay. Non-success is signaled with a Failure message.


orig (ConnectToRelay) – the original ConnectToRelay message

static from_dict(d)
Return type


msg_type = 78612
class flashflow.msg.FFMsg

Bases: object

Base class for all messages that FlashFlow coordinators and measurers can send to each other.

See the module-level documentation for more information.

static deserialize(b)
Return type


Return type


class flashflow.msg.FailCode

Bases: enum.Enum

Failure codes.

Those prefixed with M_ can only originate at a measurer. Those prefixed with C_ can only originate at a coordinator. All others can originate from anywhere.


Coordinator reached the end of the measurement’s slot and the measurement still hadn’t ended. Maybe hadn’t even begun!


Coordinator’s Tor client didn’t accept command to start active measurement


A Tor client was unable to launch the required circuit(s) with the relay


A Tor client sent its controller a response it couldn’t understand


Measurer cannot start a new measurement with the given ID because it already has one with the same ID


Measurer’s Tor client didn’t accept command to start active measurement


Measurer given a command containing an unknown measurement ID

class flashflow.msg.Failure(code, meas_id, extra_info=None)

Bases: flashflow.msg.FFMsg

Bidirectional message indicating the sending party has experienced some sort of error and the measurement should be halted.

  • meas_id (Optional[int]) – the ID of the measurement to which this applies, or None if the failure is not specific to a measurement

  • code (FailCode) – the FailCode

  • extra_info – optional, any arbitrary extra information already stringified

static from_dict(d)
Return type


msg_type = 62424
class flashflow.msg.Go(meas_id)

Bases: flashflow.msg.FFMsg

Coordinator to Measurer message indicating it’s time to start the measurement.


meas_id (int) – the ID of the measurement to which this applies

static from_dict(d)
Return type


msg_type = 1089
class flashflow.msg.MsgType

Bases: enum.Enum

Message types used so that the parent FFMsg class can tell which type of JSON it is looking at and pass deserialization work off to the appropriate subclass.

I would normally use enum.auto() for these since I don’t want to allow implicit assumptions about each variant’s value and their relation to each other. However in the off chance a version X coordinator tries to talk to version Y measurer with different values for the variants, setting static and explicit values helps preserve their ability to communicate.

BW_REPORT = -289
FAILURE = 62424
GO = 1089

flashflow.relay_filter_list module

Relay Filter List

Parse a relay filter list file and decide whether or not we should measure a relay based on its fingerprint.

Files are line and word based and read left-to-right, top-to-bottom. First word to match a given fingerprint wins.


Everything after a # is a comment:

# This is a comment
this is not a comment # but this is
other stuff

A word starting with ! is a negative match, meaning that if the rest of the word matches, then the relay should not be measured. Normally a match means yes, do measure.


A word containing a * is a wildcard word, meaning it matches all fingerprints.

*   # means measure all relays
!*  # means do not measure any relay

Relay fingerprints are the only other valid non-comment text that should be in this type of file.


You may find that non-fingerprint text is parsed as a fingerprint if it is not in a comment. Be careful. Don’t do this.

You can have multiple words/fingerprints per line. These snippets are parsed exactly the same way. They demonstrate a config where 3 relays have opted in to being measured and no other relay should be measured.

# First

# Second
RelayFP1 RelayFP2 RelayFP3

# Third
RelayFP1 RelayFP2 RelayFP3 !*

RelayFP1 RelayFP2 RelayFP3 !*   # Fourth



For brevity, pretend that relay fingerprints are four alphanumeric characters in the following examples.

Do not measure any relay, ever:


Measure all relays except one:


Only measure one relay:


Maybe two people have opted in to be measured and you want to organize their relays by their families:

# Jacob's relays

# Paul's relays
# Paul said we shouldn't measure this one
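The matching rules described in this module can be approximated with the sketch below. It is a simplified illustration with hypothetical names (parse_rfl_sketch, should_measure_sketch), and upper-casing is only an assumed form of fingerprint normalization. Fingerprints are four characters, as in the examples above.

```python
from typing import List, Tuple


def parse_rfl_sketch(text: str) -> List[Tuple[bool, str]]:
    # Returns (is_negative, word) pairs in file order.
    words = []
    for line in text.splitlines():
        line = line.split('#', 1)[0]  # drop the comment, if any
        for word in line.split():
            is_negative = word.startswith('!')
            if is_negative:
                word = word[1:]
            words.append((is_negative, word.upper()))
    return words


def should_measure_sketch(
        words: List[Tuple[bool, str]], fp: str, default: bool) -> bool:
    fp = fp.upper()
    for is_negative, word in words:
        if '*' in word or word == fp:
            return not is_negative  # first match wins
    return default


rfl = parse_rfl_sketch("""
# Two relays that opted in
AAAA BBBB
!CCCC   # this one opted out
!*      # and nothing else should be measured
""")
```

Because the first matching word wins, the trailing !* only affects fingerprints that no earlier word matched.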

class flashflow.relay_filter_list.RFLWord(s)

Bases: object

A single word read from a file. You should not need to use this directly.

is_negative: bool = False

Whether this is a negative-match word or not. If True, then if the fingerprint matches, the relay should NOT be measured.

is_wildcard: bool = False

Whether this is a wildcard word or not. If True, then this word matches all fingerprints.


Determine if the given string matches this RFLWord

Return type


class flashflow.relay_filter_list.RelayFilterList

Bases: object

static from_str(s)

Given the entire string contents of a file, return a new RelayFilterList

Return type


should_measure(fp, default)

Determine whether or not the given fp should be measured. If no match is found, then return default

Return type


words: List[RFLWord] = None

Ordered list of words we read from the file


Normalize a fingerprint so no matter the format in which it is received, it will be in a consistent format for later comparisons to work.

Currently this function is used for things that may not be exactly a fingerprint: comment lines, and wildcard words are two examples. Don’t edit this function without verifying this is no longer the case or that what you want to do won’t break those other things.

Return type


flashflow.results_logger module

Helper functions for writing per-second measurement results to a file that might rotate, as well as classes for reading those results from files later.

Note: The information here is only partially true until pastly/flashflow#4 is implemented and this message is removed.

Results are “logged” via logging at level INFO. It is important that the user does not edit the way these messages are logged. If the user would like to rotate the output file, e.g. with logrotate, they can do that because by default (and this should not be changed lightly) these “log messages” get “logged” via a logging.handlers.WatchedFileHandler, which handles this situation gracefully.


Call write_begin() once at the beginning of the active measurement phase. As measurement results come in every second from measurers, call write_meas() for each. Likewise for per-second background traffic reports and write_bg(). As soon as active measurement is over, call write_end().

Output Format

Output is line based. Multiple measurements can take place simultaneously, in which case per-second results from measurements of different relays can be interleaved.

A BEGIN line signals the start of data for the measurement of a relay. An END line signals the end. Between these lines there are zero or more result lines for the measurement of this relay, each with a per-second result from either a measurer measuring that relay or that relay itself reporting the amount of background traffic it saw that second.


<meas_id> <time> BEGIN <fp>


  • meas_id: the measurement ID for this measurement

  • time: the integer unix timestamp at which active measurement began.

  • fp: the fingerprint of the relay this BEGIN message is for.


58234 1591979504 BEGIN B0430D21D6609459D141078C0D7758B5CA753B6F

END line

<meas_id> <time> END


  • meas_id: the measurement ID for this measurement

  • time: the integer unix timestamp at which active measurement ended.


58234 1591979534 END B0430D21D6609459D141078C0D7758B5CA753B6F

Results line

<meas_id> <time> <is_bg> GIVEN=<given> TRUSTED=<trusted>


  • meas_id: the measurement ID for this measurement

  • time: the integer unix timestamp at which this result was received.

  • is_bg: ‘BG’ if this result is a report from the relay on the number of background bytes it saw in the last second, or ‘MEASR’ if this is a result from a measurer

  • given: the number of bytes reported

  • trusted: for a bg report from the relay, the maximum value that given is trusted to be; for a measurer result, the same as given.

Both given and trusted are in bytes. Yes, for measurer lines it is redundant to specify both.

Background traffic reports from the relay include the raw actual reported value in given; if the relay is malicious and claims 8 TiB of background traffic in the last second, you will see that here. trusted is the max that given can be. When reading results from this file, use min(given, trusted) as the trusted number of background bytes this second.


# bg report from relay, use GIVEN b/c less than TRUSTED
58234 1591979083 BG GIVEN=744904 TRUSTED=1659029
# bg report from relay, use TRUSTED b/c less than GIVEN
58234 1591979042 BG GIVEN=671858 TRUSTED=50960
# result from measurer, always trusted
58234 1591979083 MEASR GIVEN=5059082 TRUSTED=5059082
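A reader of this format might parse results lines along the following lines. This is a sketch under the format described above, with a hypothetical function name; it is not the module's MeasLine.parse().

```python
from typing import Optional, Tuple


def parse_result_line(line: str) -> Optional[Tuple[int, int, str, int]]:
    # Returns (meas_id, time, kind, usable_bytes), or None if the line
    # is not a well-formed results line.
    parts = line.split()
    if len(parts) != 5 or parts[2] not in ('BG', 'MEASR'):
        return None
    try:
        meas_id, ts = int(parts[0]), int(parts[1])
        fields = dict(p.split('=', 1) for p in parts[3:])
        given, trusted = int(fields['GIVEN']), int(fields['TRUSTED'])
    except (KeyError, ValueError):
        return None
    # Never count more than the trusted maximum.
    return meas_id, ts, parts[2], min(given, trusted)
```

Applying min(given, trusted) uniformly is harmless for measurer lines, where the two values are equal.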
class flashflow.results_logger.Meas(begin)

Bases: object

Accumulate MeasLine* objects into a single measurement summary.

The first measurement line you should see is a MeasLineBegin; create a Meas object with it. Then pass each MeasLineData that you encounter to either Meas.add_measr() or Meas.add_bg() based on where it came from. Finally pass the MeasLineEnd to tell the object it has all the data.

Not much is done to ensure you’re using this data storage class correctly. For example:

  • You can add more MeasLineData after marking the end.

  • You can pass untrusted MeasLineData from the relay to the Meas.add_measr() function where they will be treated as trusted.

  • You can get the Meas.result() before all data lines have been


  • You can provide data from different measurements for different


You shouldn’t do these things, but you can. It’s up to you to use your tools as prescribed.


Add a MeasLineData to our results that came from the relay and is regarding the amount of background traffic.

As it came from the relay, we do not trust a given_bw that is greater than trusted_bw. Thus we add the minimum of the two to the appropriate second.


Add a MeasLineData to our results that came from a measurer.

As it came from a measurer, we trust it entirely (and there’s no trusted_bw member) and simply add it to the appropriate second.
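The difference between the two add methods can be sketched as below. MeasSketch is a hypothetical, stripped-down stand-in; in particular, summing both kinds of data into one per-second bucket keyed by timestamp is an assumption about the bookkeeping, not a description of the real Meas class.

```python
from collections import defaultdict
from typing import Dict


class MeasSketch:
    def __init__(self) -> None:
        # Bytes per second, keyed by integer unix timestamp.
        self.per_second: Dict[int, int] = defaultdict(int)

    def add_measr(self, ts: int, given_bw: int) -> None:
        # Measurer results are trusted as-is.
        self.per_second[ts] += given_bw

    def add_bg(self, ts: int, given_bw: int, trusted_bw: int) -> None:
        # Relay reports are capped at the trusted maximum.
        self.per_second[ts] += min(given_bw, trusted_bw)


meas = MeasSketch()
meas.add_measr(1591979083, 5059082)
meas.add_bg(1591979083, 744904, 1659029)  # given is below the cap
meas.add_bg(1591979042, 671858, 50960)    # given is capped to trusted
```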


Check if we still expect to be given more data

Return type


property meas_id

The measurement ID, as given in the initial MeasLineBegin.

Return type


property relay_fp

The relay measured, as given in the initial MeasLineBegin.

Return type



Calculate and return the result of this measurement

Return type



Indicate that there is no more data to be loaded into this Meas.

property start_ts

The integer timestamp for when the measurement started, as given in the initial MeasLineBegin.

Return type


class flashflow.results_logger.MeasLine(meas_id, ts)

Bases: object

Parent class for other MeasLine* types. You should only ever need to interact with this class directly via its MeasLine.parse() method.

static parse(s)

Try to parse a MeasLine subclass from the given line s. If impossible, return None.

Return type


class flashflow.results_logger.MeasLineBegin(fp, *a, **kw)

Bases: flashflow.results_logger.MeasLine

class flashflow.results_logger.MeasLineData(given_bw, trusted_bw, *a, **kw)

Bases: flashflow.results_logger.MeasLine

Return type


class flashflow.results_logger.MeasLineEnd(*a, **kw)

Bases: flashflow.results_logger.MeasLine

flashflow.results_logger.write_begin(fp, meas_id, ts)

Write a log line indicating the start of the given relay’s measurement.

  • fp (str) – the fingerprint of the relay

  • meas_id (int) – the measurement ID

  • ts (int) – the unix timestamp at which the measurement began

flashflow.results_logger.write_bg(meas_id, ts, given, trusted)

Write a single per-second report of bg traffic from the relay to our results.

  • meas_id (int) – the measurement ID

  • ts (int) – the unix timestamp at which the result came in

  • given (int) – the number of reported bg bytes

  • trusted (int) – the maximum given should be (from our perspective in this logging code, it’s fine if given is bigger than trusted)

flashflow.results_logger.write_end(meas_id, ts)

Write a log line indicating the end of the given relay’s measurement.

  • meas_id (int) – the measurement ID

  • ts (int) – the unix timestamp at which the measurement ended

flashflow.results_logger.write_meas(meas_id, ts, res)

Write a single per-second result from a measurer to our results.

  • meas_id (int) – the measurement ID

  • ts (int) – the unix timestamp at which the result came in

  • res (int) – the number of measured bytes
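To show how the four write functions and the WatchedFileHandler fit together, here is a self-contained sketch. The logger name, the temporary file location, and the *_sketch functions are assumptions for illustration; FlashFlow configures its handlers through its logging config rather than in code like this.

```python
import logging
import logging.handlers
import os
import tempfile

results_path = os.path.join(tempfile.mkdtemp(), 'results.log')

# WatchedFileHandler reopens the file if logrotate moves it away.
handler = logging.handlers.WatchedFileHandler(results_path)
handler.setFormatter(logging.Formatter('%(message)s'))
results_log = logging.getLogger('results_sketch')  # hypothetical name
results_log.setLevel(logging.INFO)
results_log.propagate = False
results_log.addHandler(handler)


def write_begin_sketch(fp: str, meas_id: int, ts: int) -> None:
    results_log.info('%d %d BEGIN %s', meas_id, ts, fp)


def write_meas_sketch(meas_id: int, ts: int, res: int) -> None:
    # Measurer results are fully trusted, so GIVEN equals TRUSTED.
    results_log.info('%d %d MEASR GIVEN=%d TRUSTED=%d', meas_id, ts, res, res)


def write_bg_sketch(meas_id: int, ts: int, given: int, trusted: int) -> None:
    results_log.info('%d %d BG GIVEN=%d TRUSTED=%d', meas_id, ts, given, trusted)


def write_end_sketch(meas_id: int, ts: int) -> None:
    results_log.info('%d %d END', meas_id, ts)


write_begin_sketch('B0430D21D6609459D141078C0D7758B5CA753B6F', 58234, 1591979504)
write_meas_sketch(58234, 1591979505, 5059082)
write_bg_sketch(58234, 1591979505, 744904, 1659029)
write_end_sketch(58234, 1591979534)
handler.flush()
```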

flashflow.state_file module

State file

class flashflow.state_file.StateFile

Bases: object

d: Dict[str, Any] = None

The data

fname: Optional[str] = None

The filename we were loaded from, if any

static from_file(fname)

Load a state object from the given filename. If the file doesn’t exist, just return a new object.

Return type


get(key, default=None)

Get the value stored at key, or the provided default value if there is no such key. By default, default is None.

Return type


set(key, val, skip_write=False)

Set key to val, and write out this change to the state file, unless skip_write is set to True.


Write ourselves out to the given filename, overwriting anything that might already exist there.

  • If no file is given and we don’t know what file we were read from, do nothing.

  • If no file is given but we do know from where we were read, write out to that file.

  • If a file is given, write out to that regardless of where we were read (if anywhere).
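The behavior described for this class can be sketched as a small JSON-backed store. The on-disk format (JSON), the method name to_file, and remembering the filename when the file does not exist yet are all assumptions made for this example.

```python
import json
import os
import tempfile
from typing import Any, Dict, Optional


class StateFileSketch:
    def __init__(self) -> None:
        self.d: Dict[str, Any] = {}
        self.fname: Optional[str] = None

    @staticmethod
    def from_file(fname: str) -> 'StateFileSketch':
        state = StateFileSketch()
        state.fname = fname
        if os.path.exists(fname):
            with open(fname, 'rt') as fd:
                state.d = json.load(fd)
        return state

    def get(self, key: str, default: Any = None) -> Any:
        return self.d.get(key, default)

    def set(self, key: str, val: Any, skip_write: bool = False) -> None:
        self.d[key] = val
        if not skip_write:
            self.to_file()

    def to_file(self, fname: Optional[str] = None) -> None:
        fname = fname or self.fname
        if fname is None:
            return  # nowhere to write, so do nothing
        with open(fname, 'wt') as fd:
            json.dump(self.d, fd)


path = os.path.join(tempfile.mkdtemp(), 'state.json')
state = StateFileSketch.from_file(path)  # file does not exist yet
state.set('last_period', 18425)          # written out immediately
reloaded = StateFileSketch.from_file(path)
```

Passing skip_write=True is useful when setting several keys in a row, so the file is written once at the end instead of after every change.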

flashflow.tor_client module

Stem helper stuff.

flashflow.tor_client.launch(tor_bin, tor_datadir, torrc_extra)

Launch and connect to Tor, returning the stem.control.Controller on success, or None on failure.

  • tor_bin (str) – How to execute tor. I.e. either “tor” or “./path/to/tor”

  • tor_datadir (str) – DataDirectory to use

  • torrc_extra (str) – Extra arbitrary lines to add to the torrc we use

Return type


flashflow.tor_client.send_msg(c, m)

Send a message to Tor on the given Controller, wait for the response, and return it.

This should only be used for messages for which stem doesn’t already provide an interface. This is a thin wrapper. The reasons for it existing are:

  • To avoid using stem.control.BaseController.msg() directly.

  • Only allow ourselves to send specific messages.

  • Make it “impossible” to send malformed messages by only accepting TorCtrlMsg subtypes and using static analysis.

Return type


flashflow.tor_ctrl_msg module

Collection of Tor control commands that now exist in Tor in support of FlashFlow but for which Stem does not yet implement a better interface.

All messages must be of the ABC TorCtrlMsg and provide its methods. Most notably this means the __str__ method, which is used to turn the message into a string for sending to the ControlPort.

New messages are free to have a more complex construction process than simply passing in all necessary information at __init__ time. But at the end of the day they need to implement all TorCtrlMsg’s methods and have a __str__ that turns them into a one-line string for sending to Tor.
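The pattern described above might look roughly like this. All names are hypothetical, and HYPOTHETICAL_CMD is a placeholder string, not a real Tor control command; the real command text is defined by the classes documented below.

```python
import abc
from typing import Callable


class CtrlMsgSketch(abc.ABC):
    @abc.abstractmethod
    def __str__(self) -> str:
        ...


class StartMeasSketch(CtrlMsgSketch):
    def __init__(self, meas_id: int, nick_fp: str, dur: int):
        self.meas_id = meas_id
        self.nick_fp = nick_fp
        self.dur = dur

    def __str__(self) -> str:
        # HYPOTHETICAL_CMD is a placeholder, not a real Tor command.
        return 'HYPOTHETICAL_CMD %d %s %d' % (
            self.meas_id, self.nick_fp, self.dur)


def send_msg_sketch(send_line: Callable[[str], None], msg: CtrlMsgSketch) -> None:
    # Only accept our own message types, and only one-line strings.
    if not isinstance(msg, CtrlMsgSketch):
        raise TypeError('only CtrlMsgSketch subtypes may be sent')
    line = str(msg)
    if '\n' in line:
        raise ValueError('control messages must be a single line')
    send_line(line)


sent = []
send_msg_sketch(sent.append, StartMeasSketch(1, 'AAAA', 30))
```

Making __str__ abstract means a subclass that forgets to define it cannot be instantiated at all.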

class flashflow.tor_ctrl_msg.CoordStartMeas(meas_id, nick_fp, dur)

Bases: flashflow.tor_ctrl_msg.TorCtrlMsg

Coordinator to its client, instructing it to start the measurement process with the given relay.

  • meas_id (int) – Measurement ID

  • nick_fp (str) – Nickname or fingerprint of the relay to measure. Always use fingerprint in practice.

  • dur (int) – The duration, in seconds, of the measurement.

class flashflow.tor_ctrl_msg.MeasrStartMeas(meas_id, nick_fp, n_circs, bw, dur)

Bases: flashflow.tor_ctrl_msg.TorCtrlMsg

Dual-purpose message sent from a FlashFlow measurer to its Tor client.

First it’s used to tell it to open circuits with the given relay as part of the pre-measurement process.

Later, when everything is set up and ready to go, it is used to tell the Tor client to actually start sending measurement traffic with the relay.

  • meas_id (int) – Measurement ID

  • nick_fp (str) – Nickname or fingerprint of the relay to measure. Always use fingerprint in practice.

  • n_circs (int) – Number of circuits (and connections) to open with the relay.

  • bw (int) – The amount of bandwidth in bytes/second the client should allocate for this measurement.

  • dur (int) – The duration, in seconds, of the measurement.

class flashflow.tor_ctrl_msg.TorCtrlMsg

Bases: abc.ABC

Base class for our control commands.

flashflow.v3bw module

Functions to generate a v3bw file from the latest per-second measurement results

flashflow.v3bw.gen(v3bw_fname, results_fname, max_results_age)

Generate a v3bw file based on the latest per-second measurement results we have on disk.

  • v3bw_fname (str) – The path to the v3bw file to create

  • results_fname (str) – The path to the current results filename (e.g. data-coord/results/results.log). It will be read for the latest results, and if needed, an * appended to the name to search for adjacent logrotated files for additional necessary data.

  • max_results_age (float) – The maximum number of seconds in the past a measurement can have occurred and we’ll still include it in the v3bw file.

Return type



Path to the v3bw file created. This will be the v3bw_fname argument plus a suffix.
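Two pieces of the description above lend themselves to a short sketch: finding logrotated neighbors of the results file by appending a *, and deciding whether a measurement is recent enough. The function names are hypothetical, and the real function may select or order files differently.

```python
import glob
import os
import tempfile
from typing import List


def find_results_files(results_fname: str) -> List[str]:
    # Append * to also match rotated files such as results.log.1.
    # glob.escape() keeps special characters in the path itself literal.
    return sorted(glob.glob(glob.escape(results_fname) + '*'))


def is_recent(meas_ts: float, now: float, max_results_age: float) -> bool:
    # Keep measurements that happened at most max_results_age seconds ago.
    return now - meas_ts <= max_results_age


# Demo in a throwaway directory.
demo_dir = tempfile.mkdtemp()
for name in ('results.log', 'results.log.1', 'other.log'):
    open(os.path.join(demo_dir, name), 'w').close()
found = find_results_files(os.path.join(demo_dir, 'results.log'))
```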
