Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1''' Messages that FlashFlow coordinators and measurers can send to each other. 

2 

3Messages serialize to JSON. Each contains a :class:`MsgType` integer, which is 

4**the** way the message type is determined. Parties are trusted to not be 

5malicious, so relatively little is done to verify that messages are 

6well-formed. 

7 

8Usage 

9===== 

10 

11To **create** a new message, create it directly with its constructor. E.g. 

12:func:`ConnectToRelay`. 

13 

14To **send** a message, call its :func:`FFMsg.serialize` method and write the 

15bytes you get out to the stream. 

16 

17To **receive** a message, pass the :class:`bytes` to the static method 

18:func:`FFMsg.deserialize` 

19 

20Example:: 

21 

22 # "Send" message to measurer 

23 m_send = ConnectToRelay('DEADBEEF', 80, 30) 

24 print(m_send.serialize()) # outputs JSON byte string 

25 # "Receive" message from coordinator 

26 b = b"{'msg_type': -289, 'sent': 16666, 'recv': 15555}" 

27 m_recv = FFMsg.deserialize(b) # Returns BwReport object 

28 

29Adding new messages 

30=================== 

31 

321. Define its :class:`MsgType` with a random integer 

332. Check for the new variant in :func:`FFMsg.deserialize` 

343. Define the new class, ensuring you 

35 1. Set ``msg_type`` to the new :class:`MsgType` variant 

36 2. Define a ``_to_dict()`` method that takes ``self`` and returns a 

37 ``dict`` 

38 3. Define a ``from_dict()`` method that takes a ``dict`` and returns a valid 

39 instance of the new message type 

40''' # noqa: E501 

41import enum 

42import json 

43from typing import NoReturn, Optional 

44 

45 

46class MsgType(enum.Enum): 

47 ''' Message types used so that the parent :class:`FFMsg` class can tell 

48 which type of JSON it is looking at and pass deserialization work off to 

49 the appropriate subclass. 

50 

51 I would normally use :py:func:`enum.auto` for these since I don't want to 

52 allow implicit assumptions about each variant's value and their relation to 

53 each other. However in the off chance a version ``X`` coordinator tries to 

54 talk to version ``Y`` measurer with different values for the variants, 

55 setting static and explicit values helps preserve their ability to 

56 communicate. 

57 ''' 

58 CONNECT_TO_RELAY = 357 

59 CONNECTED_TO_RELAY = 78612 

60 FAILURE = 62424 

61 GO = 1089 

62 BW_REPORT = -289 

63 

64 

65def _assert_never(x: NoReturn) -> NoReturn: 

66 ''' Helper that always throws an assert. 

67 

68 Used to help mypy ensure that all variants of an enum are covered. 

69 https://github.com/python/mypy/issues/6366#issuecomment-560369716 

70 ''' 

71 assert False, "Unhandled type: {}".format(type(x).__name__) 

72 

73 

74class FailCode(enum.Enum): 

75 ''' :class:`Failure` codes. 

76 

77 Those prefixed with ``M_`` can only originate 

78 at a measurer. Those prefixed with ``C_`` can only originate at a 

79 coordinator. All others can originate from anywhere. ''' 

80 #: A Tor client was unable to launch the required circuit(s) with the relay 

81 LAUNCH_CIRCS = enum.auto() 

82 #: A Tor client sent its controller a response it couldn't understand 

83 MALFORMED_TOR_RESP = enum.auto() 

84 #: Measurer cannot start a new measurement with the given ID because it 

85 #: already has one with the same ID 

86 M_DUPE_MEAS_ID = enum.auto() 

87 #: Measurer given a command containing an unknown measurement ID 

88 M_UNKNOWN_MEAS_ID = enum.auto() 

89 #: Measurer's Tor client didn't accept command to start active measurement 

90 M_START_ACTIVE_MEAS = enum.auto() 

91 #: Coordinator's Tor client didn't accept command to start active 

92 #: measurement 

93 C_START_ACTIVE_MEAS = enum.auto() 

94 #: Coordinator reached the end of the measurement's slot and the 

95 #: measurement still hadn't ended. Maybe hadn't even begun! 

96 C_END_OF_SLOT = enum.auto() 

97 

98 def __str__(self) -> str: 

99 if self is FailCode.LAUNCH_CIRCS: 

100 return 'Unable to launch circuit(s)' 

101 elif self is FailCode.MALFORMED_TOR_RESP: 

102 return 'Malformed response from Tor client' 

103 elif self is FailCode.M_DUPE_MEAS_ID: 

104 return 'Measurer already has measurement with given ID' 

105 elif self is FailCode.M_UNKNOWN_MEAS_ID: 

106 return 'Measurer does not have measurement with given ID' 

107 elif self is FailCode.M_START_ACTIVE_MEAS: 

108 return 'Measurer unable to tell Tor client to start active ' \ 

109 'measurement' 

110 elif self is FailCode.C_START_ACTIVE_MEAS: 

111 return 'Coordinator unable to tell Tor client to start active ' \ 

112 'measurement' 

113 elif self is FailCode.C_END_OF_SLOT: 

114 return 'Coordinator reached of measurement slot without ' \ 

115 'considering the measurement as completed' 

116 else: 

117 _assert_never(self) 

118 

119 

120class FFMsg: 

121 ''' Base class for all messages that FlashFlow coordinators and measurers 

122 can send to each other. 

123 

124 See the module-level documentation for more information. 

125 ''' 

126 def serialize(self) -> bytes: 

127 return json.dumps(self._to_dict()).encode('utf-8') 

128 

129 def _to_dict(self): 

130 assert None, 'Child FFMsg type did not implement _to_dict()' 

131 

132 @staticmethod 

133 def deserialize(b: bytes) -> 'FFMsg': 

134 j = json.loads(b.decode('utf-8')) 

135 msg_type = MsgType(j['msg_type']) 

136 if msg_type is MsgType.CONNECT_TO_RELAY: 

137 return ConnectToRelay.from_dict(j) 

138 elif msg_type is MsgType.CONNECTED_TO_RELAY: 

139 return ConnectedToRelay.from_dict(j) 

140 elif msg_type is MsgType.FAILURE: 

141 return Failure.from_dict(j) 

142 elif msg_type is MsgType.GO: 

143 return Go.from_dict(j) 

144 elif msg_type is MsgType.BW_REPORT: 

145 return BwReport.from_dict(j) 

146 else: 

147 _assert_never(msg_type) 

148 

149 

150class ConnectToRelay(FFMsg): 

151 ''' Coordinator to Measurer message instructing them to connect to the 

152 specified relay. 

153 

154 :param meas_id: the ID to assign to this measurement 

155 :param fp: the fingerprint of the relay to which the measurer should 

156 connect 

157 :param n_circs: the number of circuits they should open with the relay 

158 :param dur: the duration of the active measurement phase, in seconds 

159 ''' 

160 msg_type = MsgType.CONNECT_TO_RELAY 

161 

162 def __init__(self, meas_id: int, fp: str, n_circs: int, dur: int): 

163 self.meas_id = meas_id 

164 self.fp = fp 

165 self.n_circs = n_circs 

166 self.dur = dur 

167 

168 def _to_dict(self) -> dict: 

169 return { 

170 'msg_type': self.msg_type.value, 

171 'meas_id': self.meas_id, 

172 'fp': self.fp, 

173 'n_circs': self.n_circs, 

174 'dur': self.dur 

175 } 

176 

177 @staticmethod 

178 def from_dict(d: dict) -> 'ConnectToRelay': 

179 return ConnectToRelay(d['meas_id'], d['fp'], d['n_circs'], d['dur']) 

180 

181 

182class ConnectedToRelay(FFMsg): 

183 ''' Measurer to Coordinator message indicating the have 

184 successfully connected to the relay. Non-success is signed with a 

185 :class:`Failure` message 

186 

187 :param orig: the original :class:`ConnectToRelay` message 

188 ''' 

189 msg_type = MsgType.CONNECTED_TO_RELAY 

190 

191 def __init__(self, orig: ConnectToRelay): 

192 self.orig = orig 

193 

194 def _to_dict(self) -> dict: 

195 return { 

196 'msg_type': self.msg_type.value, 

197 'orig': self.orig._to_dict(), 

198 } 

199 

200 @staticmethod 

201 def from_dict(d: dict) -> 'ConnectedToRelay': 

202 return ConnectedToRelay(ConnectToRelay.from_dict(d['orig'])) 

203 

204 

205class Failure(FFMsg): 

206 ''' Bidirectional message indicating the sending party has experienced some 

207 sort of error and the measurement should be halted. 

208 

209 :param meas_id: the ID of the measurement to which this applies, or 

210 ``None`` if the failure is not specific to a measurement 

211 :param code: the :class:`FailCode` 

212 :param info: optional, any arbitrary extra information already stringified 

213 ''' 

214 msg_type = MsgType.FAILURE 

215 

216 def __init__( 

217 self, code: FailCode, meas_id: Optional[int], 

218 extra_info: Optional[str] = None): 

219 self.code = code 

220 self.meas_id = meas_id 

221 self.extra_info = extra_info 

222 

223 def _to_dict(self) -> dict: 

224 return { 

225 'msg_type': self.msg_type.value, 

226 'code': self.code.value, 

227 'meas_id': self.meas_id, 

228 'extra_info': self.extra_info, 

229 } 

230 

231 @staticmethod 

232 def from_dict(d: dict) -> 'Failure': 

233 return Failure( 

234 FailCode(d['code']), 

235 d['meas_id'], 

236 extra_info=d['extra_info'], 

237 ) 

238 

239 def __str__(self) -> str: 

240 prefix = str(self.code) 

241 if self.meas_id is not None: 

242 prefix += ' (meas %d)' % (self.meas_id,) 

243 if self.extra_info is not None: 

244 prefix += ': %s' % (str(self.extra_info),) 

245 return prefix 

246 

247 

248class Go(FFMsg): 

249 ''' Coordinator to Measurer message indicating its time to start the 

250 measurement 

251 

252 :param meas_id: the ID of the measurement to which this applies 

253 ''' 

254 msg_type = MsgType.GO 

255 

256 def __init__(self, meas_id: int): 

257 self.meas_id = meas_id 

258 

259 def _to_dict(self) -> dict: 

260 return { 

261 'msg_type': self.msg_type.value, 

262 'meas_id': self.meas_id 

263 } 

264 

265 @staticmethod 

266 def from_dict(d: dict) -> 'Go': 

267 return Go(d['meas_id']) 

268 

269 

270class BwReport(FFMsg): 

271 ''' Measurer to Coordinator message containing the number of sent and 

272 received bytes with the target relay in the last second. 

273 

274 :param meas_id: the ID of the measurement to which this applies 

275 :param ts: the seconds since the unix epoch for which this 

276 :class:`BwReport` applies 

277 :param sent: number of sent bytes in the last second 

278 :param recv: number of received bytes in the last second 

279 ''' 

280 msg_type = MsgType.BW_REPORT 

281 

282 def __init__(self, meas_id: int, ts: float, sent: int, recv: int): 

283 self.meas_id = meas_id 

284 self.ts = ts 

285 self.sent = sent 

286 self.recv = recv 

287 

288 def _to_dict(self) -> dict: 

289 return { 

290 'msg_type': self.msg_type.value, 

291 'meas_id': self.meas_id, 

292 'ts': self.ts, 

293 'sent': self.sent, 

294 'recv': self.recv, 

295 } 

296 

297 @staticmethod 

298 def from_dict(d: dict) -> 'BwReport': 

299 return BwReport(d['meas_id'], d['ts'], d['sent'], d['recv'])