Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1'''
2Measurement Periods and Slots
3==============================
5Helper functions for calculating things about measurement periods and slots.
Measurement **Period**: Suggested to be a day. Each relay is intended to be
measured once each period. At the beginning of each period (or on startup, if
started mid-period without an existing schedule for that period), the
coordinator comes up with a schedule that it will follow for the rest of the
period, measuring relays in slots.
13Measurement **Slots**: Twice the length of the active measurement duration,
14slots subdivide a measurement period. The first slot of each period is 0, and
15there are ``N`` slots in a period, where ``N = meas_period / (meas_dur * 2)``.
16Relays get scheduled into measurement slots, and the extra length allows for
17wiggle room for overhead due to measurement creation (building 100+ circuits
18takes a while!) and any other causes.
20Here is a diagram of what happens during Period ``M``, which is divided into
21``N`` slots from ``0`` to ``N-1``. Slot 1 is further blown up to show how three
22relays are scheduled to be measured during it. The pre-measurement work for
23each took a different amount of time, which is okay: that's why slots are twice
24as long as the actual measurement duration. The moment the work is done to
25measure a relay, the measurement is started. There is still time left in the
26slot after the relays are measured. This is okay; we just wait until the next
27slot starts before doing any measurements scheduled in it.
29::
31 ------------ Time ------------>
33 |--Period M-------------------------------------------------|--Period M+1--
34 |--Slot 0----|--Slot 1----| ... |--Slot N-2----|--Slot N-1--|--Slot 0----
35 / \\
36 / \\
37 / -----------------------------------------\\
38 / |
39 |--Pre-meas relay1--|--Meas relay1--------------| |
40 |--Pre-meas relay2----|--Meas relay2--------------| |
41 |--Pre-meas relay3----------|--Meas relay3--------------| |
43It's possible a slot doesn't have any measurements in it. That's fine. We just
44wait until the next slot.
45'''
46import logging
47import random
48from typing import List, Dict, Tuple
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
#: How much larger is a slot than a single measurement's duration? E.g. ``2``
#: here means 30 second measurements are scheduled into 60 second slots.
#: Used as the multiplier that turns a measurement duration into a slot
#: duration.
MEAS_TO_SLOT_FACT = 2
57def _time_till_next(now: float, dur: int) -> float:
58 ''' Calculate time till the next ``dur`` period of time starts
60 :param now: The current timestamp
61 :parm dur: The slice size used to divide time
63 :returns: The time remaining in the current slice of time
64 '''
65 current_slice = int(now / dur)
66 next_slice = current_slice + 1
67 return next_slice * dur - now
def current_period(now: float, period_dur: int) -> int:
    ''' Work out which measurement period a timestamp falls in.

    :param now: The current timestamp
    :param period_dur: The duration of a measurement period

    :returns: The number of the measurement period containing ``now``, i.e.
        ``now / period_dur`` truncated to an integer.
    '''
    periods_elapsed = now / period_dur
    return int(periods_elapsed)
81# ## Unit test is written for this, but commented out
82# def time_till_next_period(now: float, period_dur: int) -> float:
83# ''' Return the time remaining until the next measurement period starts.
84#
85# :param now: The current timestamp
86# :param period_dur: The duration of a measurement period
87# '''
88# return _time_till_next(now, period_dur)
def current_slot(now: float, period_dur: int, meas_dur: int) -> int:
    ''' Work out which slot of the current measurement period ``now`` is in.

    :param now: The current timestamp
    :param period_dur: The duration of a measurement period
    :param meas_dur: The duration of a measurement

    :returns: The slot number in which ``now`` resides, counted from the
        start of the measurement period that contains ``now``.
    '''
    # Timestamp at which the period containing ``now`` began.
    period_start = period_dur * current_period(now, period_dur)
    offset_into_period = now - period_start
    # A slot is MEAS_TO_SLOT_FACT times as long as a single measurement.
    return int(offset_into_period / (meas_dur * MEAS_TO_SLOT_FACT))
def time_till_next_slot(now: float, meas_dur: int) -> float:
    ''' Compute how long until the next measurement slot begins.

    :param now: The current timestamp
    :param meas_dur: The duration of a measurement. The slot duration is
        derived from it by multiplying with ``MEAS_TO_SLOT_FACT``.

    :returns: The time remaining in the slot containing ``now``
    '''
    slot_dur = meas_dur * MEAS_TO_SLOT_FACT
    return _time_till_next(now, slot_dur)
class MeasrInfo:
    ''' General facts about one measurer, bundled into a single object so
    they can be handed around together.

    :param measr_id: The measurer's ID
    :param bw: The amount of bandwidth, in **bytes**/second, the measurer is
        capable of
    '''
    def __init__(self, measr_id: str, bw: int):
        #: Bandwidth capacity, in bytes/second
        self.bw: int = bw
        #: The measurer's ID
        self.measr_id: str = measr_id
class RelayInfo:
    ''' General facts about one relay, bundled into a single object so they
    can be handed around together.

    :param fp: The relay's fingerprint
    '''
    def __init__(self, fp: str):
        #: The relay's fingerprint
        self.fp: str = fp
class MeasrMeasInfo:
    ''' Store info associated to how a specific measurer participates in a
    specific measurement.

    :param measr_id: A unique ID for the measurer that will still be
        meaningful hours after making this object, ideally across reconnection
        with the measurer. In practice, we require measurers to use a unique
        ``organizationName`` in their certificate and use that.
    :param n_circs: The number of circuits this measurer shall open with the
        relay.
    :param bw: The amount of bandwidth, in bytes/second, the measurer should
        allocate to measuring this relay.
    '''
    def __init__(self, measr_id: str, n_circs: int, bw: int):
        self.measr_id = measr_id
        self.n_circs = n_circs
        self.bw = bw

    def to_dict(self) -> Dict:
        ''' Return this object's fields as a new plain dictionary.

        :returns: A dict with the keys ``measr_id``, ``n_circs``, and ``bw``,
            suitable for passing back to :meth:`MeasrMeasInfo.from_dict`.
        '''
        return {
            'measr_id': self.measr_id,
            'n_circs': self.n_circs,
            'bw': self.bw,
        }

    @classmethod
    def from_dict(cls, d: Dict) -> 'MeasrMeasInfo':
        ''' Build an instance from a dictionary like the one
        :meth:`MeasrMeasInfo.to_dict` produces.

        This is a classmethod so that calling it on a subclass yields an
        instance of that subclass.

        :param d: A dict with the keys ``measr_id``, ``n_circs``, and ``bw``

        :raises KeyError: If any of the three keys is missing from ``d``
        '''
        return cls(
            d['measr_id'],
            d['n_circs'],
            d['bw'],
        )
class Schedule:
    ''' Measurement Schedule for a Measurement Period.

    Constructing this class directly yields an empty schedule. Use
    :meth:`Schedule.gen` to build a randomized schedule from lists of relays
    and measurers, or :meth:`Schedule.from_dict` to rebuild one from the
    output of :meth:`Schedule.to_dict`.
    '''
    #: Key is slot number, value is a list of tuples containing information
    #: needed for each measurement.
    #:
    #: The contents of the tuple:
    #:   1. :class:`str`, the fingerprint of the relay to measure
    #:   2. List of :class:`MeasrMeasInfo` for the measurers to use for this
    #:      measurement
    #:
    #: Not every slot number will be in this dict. Missing slots have no
    #: measurements scheduled.
    slots: Dict[int, List[Tuple[str, List[MeasrMeasInfo]]]]

    def __init__(self):
        ''' Generate an empty schedule. You probably want :meth:`Schedule.gen`
        '''
        self.slots = {}

    @classmethod
    def gen(
            cls,
            relays: List[RelayInfo], measurers: List[MeasrInfo],
            n_slots: int, n_circs: int) -> 'Schedule':
        ''' Generate a randomized schedule with at most one relay per slot.

        Each selected relay is put in its own randomly chosen slot and is
        assigned one measurer picked uniformly at random. If there are more
        relays than slots, only ``n_slots`` relays (selected uniformly at
        random) are scheduled and the rest are left out.

        The ``relays`` list passed in is not modified.

        :param relays: List of relays to schedule during the
            measurement period
        :param measurers: List of :class:`MeasrInfo` we
            should use this measurement period
        :param n_slots: The number of slots there are in a measurement period
        :param n_circs: The number of circuits the measurers, in aggregate,
            should open with a relay to measure it

        :returns: The newly generated schedule
        :raises IndexError: If ``measurers`` is empty while at least one relay
            is to be scheduled
        '''
        sched = cls()
        log.info(
            'Building new %d-slot measurement schedule of %d relays with '
            '%d measurers capable of a combined %d Mbit/s',
            n_slots, len(relays), len(measurers),
            sum(m.bw for m in measurers) * 8 / 1e6,  # bytes to bits, then M
        )
        # Decide how many relays fit: all of them, or at most n_slots
        if len(relays) <= n_slots:
            log.debug(
                'Putting a maximum of one relay in each slot.')
            n_chosen = len(relays)
        else:
            log.warning(
                'Not enough slots available to simply measure one relay in '
                'each. Will measure %d relays (selected uniformly at random) '
                'and forego the rest', n_slots)
            n_chosen = n_slots
        # random.sample returns a new list in random order, so this both
        # shuffles and selects without touching the caller's list (an in-place
        # random.shuffle would reorder it behind the caller's back).
        chosen = random.sample(relays, n_chosen)
        # choose a random set of slots to use
        slot_ids = random.sample(range(n_slots), n_chosen)
        for slot_id, relay in zip(slot_ids, chosen):
            measurer = random.choice(measurers)
            sched.slots[slot_id] = [(
                relay.fp,
                [MeasrMeasInfo(measurer.measr_id, n_circs, measurer.bw)],
            )]
            log.debug(
                'Will measure %s with %s in slot %d',
                relay.fp, measurer.measr_id, slot_id)
        log.info(
            'Finished generating schedule. Scheduled %d measurements in '
            '%d slots', sum(len(_) for _ in sched.slots.values()),
            len(sched.slots))
        return sched

    def to_dict(self) -> Dict[int, List[Tuple[str, List[Dict]]]]:
        ''' Convert this schedule to plain dicts, lists, and tuples.

        :returns: A dict shaped like :attr:`slots`, but with every
            :class:`MeasrMeasInfo` replaced by its ``to_dict()`` output
        '''
        out: Dict[int, List[Tuple[str, List[Dict]]]] = {}
        for key, measurements in self.slots.items():
            out[key] = [
                (relay_fp, [m.to_dict() for m in measurers])
                for relay_fp, measurers in measurements
            ]
        return out

    @classmethod
    def from_dict(
            cls,
            d: Dict[int, List[Tuple[str, List[Dict]]]]) -> 'Schedule':
        ''' Rebuild a schedule from the output of :meth:`Schedule.to_dict`.

        :param d: A dict mapping slot number to a list of
            ``(relay_fp, [measurer dict, ...])`` pairs

        :returns: The reconstructed schedule
        '''
        sched = cls()
        for key, measurements in d.items():
            # Slot numbers are ints everywhere else in this class. Formats
            # such as JSON turn dict keys into strings, so coerce back to int;
            # this is a no-op for keys that are already ints.
            # NOTE(review): presumably these dicts get serialized by callers;
            # confirm.
            sched.slots[int(key)] = [
                (relay_fp, [MeasrMeasInfo.from_dict(m) for m in measurers])
                for relay_fp, measurers in measurements
            ]
        return sched