Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1''' 

2Measurement Periods and Slots 

3============================== 

4 

5Helper functions for calculating things about measurement periods and slots. 

6 

7Measurement **Period**: Suggested to be a day, each relay is intended to be 

8measured once each period. At the beginning of each period (or on startup, if 

9started mid-period without an existing schedule for that period), the 

10coordinator comes up with a schedule that it will follow for the rest of the 

11period, measuring relays in slots. 

12 

13Measurement **Slots**: Twice the length of the active measurement duration, 

14slots subdivide a measurement period. The first slot of each period is 0, and 

15there are ``N`` slots in a period, where ``N = meas_period / (meas_dur * 2)``. 

16Relays get scheduled into measurement slots, and the extra length allows for 

17wiggle room for overhead due to measurement creation (building 100+ circuits 

18takes a while!) and any other causes. 

19 

20Here is a diagram of what happens during Period ``M``, which is divided into 

21``N`` slots from ``0`` to ``N-1``. Slot 1 is further blown up to show how three 

22relays are scheduled to be measured during it. The pre-measurement work for 

23each took a different amount of time, which is okay: that's why slots are twice 

24as long as the actual measurement duration. The moment the work is done to 

25measure a relay, the measurement is started. There is still time left in the 

26slot after the relays are measured. This is okay; we just wait until the next 

27slot starts before doing any measurements scheduled in it. 

28 

29:: 

30 

31 ------------ Time ------------> 

32 

33 |--Period M-------------------------------------------------|--Period M+1-- 

34 |--Slot 0----|--Slot 1----| ... |--Slot N-2----|--Slot N-1--|--Slot 0---- 

35 / \\ 

36 / \\ 

37 / -----------------------------------------\\ 

38 / | 

39 |--Pre-meas relay1--|--Meas relay1--------------| | 

40 |--Pre-meas relay2----|--Meas relay2--------------| | 

41 |--Pre-meas relay3----------|--Meas relay3--------------| | 

42 

43It's possible a slot doesn't have any measurements in it. That's fine. We just 

44wait until the next slot. 

45''' 

46import logging 

47import random 

48from typing import List, Dict, Tuple 

49 

50 

51log = logging.getLogger(__name__) 

52#: How much larger is a slot than a single measurement's duration? E.g. ``2`` 

53#: here means 30 second measurements are scheduled into 60 second slots. 

54MEAS_TO_SLOT_FACT = 2 

55 

56 

57def _time_till_next(now: float, dur: int) -> float: 

58 ''' Calculate time till the next ``dur`` period of time starts 

59 

60 :param now: The current timestamp 

61 :parm dur: The slice size used to divide time 

62 

63 :returns: The time remaining in the current slice of time 

64 ''' 

65 current_slice = int(now / dur) 

66 next_slice = current_slice + 1 

67 return next_slice * dur - now 

68 

69 

70def current_period(now: float, period_dur: int) -> int: 

71 ''' Calculate the measurement period number and return it. 

72 

73 :param now: The current timestamp 

74 :param period_dur: The duration of a measurement period 

75 

76 :returns: The measurement period number in which ``now`` resides. 

77 ''' 

78 return int(now / period_dur) 

79 

80 

81# ## Unit test is written for this, but commented out 

82# def time_till_next_period(now: float, period_dur: int) -> float: 

83# ''' Return the time remaining until the next measurement period starts. 

84# 

85# :param now: The current timestamp 

86# :param period_dur: The duration of a measurement period 

87# ''' 

88# return _time_till_next(now, period_dur) 

89 

90 

91def current_slot(now: float, period_dur: int, meas_dur: int) -> int: 

92 ''' Calculate the slot number and return it. 

93 

94 :param now: The current timestamp 

95 :param period_dur: The duration of a measurement period 

96 :param meas_dur: The duration of a measurement 

97 

98 :returns: The slot number in which ``now`` resides. 

99 ''' 

100 slot_dur = meas_dur * MEAS_TO_SLOT_FACT 

101 time_since_period_begin = now - period_dur*current_period(now, period_dur) 

102 return int(time_since_period_begin / slot_dur) 

103 

104 

105def time_till_next_slot(now: float, meas_dur: int) -> float: 

106 ''' Calculate the time remaining until the next measurement slot starts. 

107 

108 :param now: The current timestamp 

109 :param meas_dur: The duration of a measurement period 

110 ''' 

111 return _time_till_next(now, meas_dur * MEAS_TO_SLOT_FACT) 

112 

113 

114class MeasrInfo: 

115 ''' Store general information on a measurer in one object to easily pass it 

116 around. 

117 

118 :param measr_id: The measurer's ID 

119 :param bw: The amount of bandwidth, in **bytes**/second, the measurer is 

120 capable of 

121 ''' 

122 def __init__(self, measr_id: str, bw: int): 

123 self.measr_id = measr_id 

124 self.bw = bw 

125 

126 

127class RelayInfo: 

128 ''' Store general information on a relay in one object to easily pass it 

129 around. 

130 

131 :param fp: The relay's fingerprint 

132 ''' 

133 def __init__(self, fp: str): 

134 self.fp = fp 

135 

136 

137class MeasrMeasInfo: 

138 ''' Store info associated to how a specific measurer participates in a 

139 specific measurement. 

140 

141 :param measr_id: A unique ID for the measurer that will be still be 

142 meaningful hours after making this object, ideally across reconnection 

143 with the measurer. In practice, we require measurers to use a unique 

144 ``organizationName`` in their certificate and use that. 

145 :param n_circs: The number of circuits this measurer shall open with the 

146 relay. 

147 :param bw: The amount of bandwidth, in bytes/second, the measurer should 

148 allocate to measuring this relay. 

149 ''' 

150 def __init__(self, measr_id: str, n_circs: int, bw: int): 

151 self.measr_id = measr_id 

152 self.n_circs = n_circs 

153 self.bw = bw 

154 

155 def to_dict(self) -> Dict: 

156 return { 

157 'measr_id': self.measr_id, 

158 'n_circs': self.n_circs, 

159 'bw': self.bw, 

160 } 

161 

162 @staticmethod 

163 def from_dict(d: Dict) -> 'MeasrMeasInfo': 

164 return MeasrMeasInfo( 

165 d['measr_id'], 

166 d['n_circs'], 

167 d['bw'], 

168 ) 

169 

170 

171class Schedule: 

172 ''' Measurement Schedule for a Measurement Period. 

173 

174 :param relays: List of relays to schedule during the 

175 measurement period 

176 :param measurers: List of :class:`MeasrInfo` we 

177 should use this measurement period 

178 :param n_slots: The number of slots there are in a measurement period 

179 :param n_circs: The number of circuits the measurers, in aggregate, should 

180 open with a relay to measure it 

181 ''' 

182 #: Key is slot number, value is a list tuples containing information needed 

183 #: for each measurement. 

184 #: 

185 #: The contents of the tuple: 

186 #: 1. :class:`str`, the fingerprint of the relay to measure 

187 #: 2. List of :class:`MeasrMeasInfo` for the measurers to use for this 

188 #: measurement 

189 #: 

190 #: Not every slot number will be in this dict. Missing slots have no 

191 #: measurements scheduled. 

192 slots: Dict[int, List[Tuple[str, List[MeasrMeasInfo]]]] 

193 

194 def __init__(self): 

195 ''' Generate an empty schedule. You probably want :meth:`Schedule.gen` 

196 ''' 

197 self.slots = {} 

198 

199 @staticmethod 

200 def gen( 

201 relays: List[RelayInfo], measurers: List[MeasrInfo], 

202 n_slots: int, n_circs: int) -> 'Schedule': 

203 sched = Schedule() 

204 log.info( 

205 'Building new %d-slot measurement schedule of %d relays with ' 

206 '%d measurers capable of a combined %d Mbit/s', 

207 n_slots, len(relays), len(measurers), 

208 sum([m.bw for m in measurers]) * 8 / 1e6, # bytes to bits, then M 

209 ) 

210 # Shuffle the relays and take at most n_slots of them 

211 if len(relays) <= n_slots: 

212 log.debug( 

213 'Putting a maximum of one relay in each slot.') 

214 random.shuffle(relays) 

215 else: 

216 log.warn( 

217 'Not enough slots available to simply measure one relay in ' 

218 'each. Will measure %d relays (selected uniformly at random) ' 

219 'and forego the rest', n_slots) 

220 relays = random.sample(relays, n_slots) 

221 # choose a random set of slots to use 

222 slot_ids = random.sample(range(n_slots), len(relays)) 

223 for slot_id, relay in zip(slot_ids, relays): 

224 measurer = random.choice(measurers) 

225 sched.slots[slot_id] = [( 

226 relay.fp, 

227 [MeasrMeasInfo(measurer.measr_id, n_circs, measurer.bw)], 

228 )] 

229 log.debug( 

230 'Will measure %s with %s in slot %d', 

231 relay.fp, measurer.measr_id, slot_id) 

232 log.info( 

233 'Finished generating schedule. Scheduled %d measurements in ' 

234 '%d slots', sum([len(_) for _ in sched.slots.values()]), 

235 len(sched.slots)) 

236 return sched 

237 

238 def to_dict(self) -> Dict[int, List[Tuple[str, List[Dict]]]]: 

239 out: Dict[int, List[Tuple[str, List[Dict]]]] = {} 

240 for key, measurements in self.slots.items(): 

241 out[key] = [] 

242 for relay_fp, measurers in measurements: 

243 out[key].append(( 

244 relay_fp, 

245 [m.to_dict() for m in measurers], 

246 )) 

247 return out 

248 

249 @staticmethod 

250 def from_dict(d: Dict[int, List[Tuple[str, List[Dict]]]]) -> 'Schedule': 

251 sched = Schedule() 

252 for key, measurements in d.items(): 

253 sched.slots[key] = [] 

254 for relay_fp, measurers in measurements: 

255 sched.slots[key].append(( 

256 relay_fp, [MeasrMeasInfo.from_dict(m) for m in measurers])) 

257 return sched