blob: dd2f00b77ad32d277ba2401fc38038ef88312812 [file] [log] [blame]
Scott Baker761e1062016-06-20 17:18:17 -07001import requests
2import logging
3import json
4import sys
5from rest_framework.exceptions import APIException
6
7""" format of settings
8
    ["settings"]
        ["watershed"]
        ["rating"]
        ["categories"]
        ["blocklist"]
        ["allowlist"]

    ["users"]
        array
            ["account_id"] - 58
            ["reporting"] - False
            ["name"] - Scott1
            ["devices"]
            ["settings"] -
                ["watershed"]
                ["rating"]
                ["categories"]
                ["blocklist"]
                ["allowlist"]

    ["devices"]
        array
            ["username"] - "Scott1" or "" if whole-house
            ["uuid"] - empty
            ["mac_address"] - mac address as hex digits in ascii
            ["type"] - "laptop"
            ["name"] - human readable name of device ("Scott's laptop")
            ["settings"]
                ["watershed"]
                    array
                        array
                            ["rating"]
                            ["category"]
                ["rating"] - ["G" | "NONE"]
                ["categories"] - list of categories set by rating
                ["blocklist"] - []
                ["allowlist"] - []
46"""
47
class BBS_Failure(APIException):
    """API exception raised when a BroadbandShield operation fails.

    The exception detail is a dict with three keys: "error" (always
    "BBS_Failure"), "specific_error" (the ``why`` text) and "fields".
    ``status_code`` is set to 400.

    Args:
        why: human-readable description of what failed.
        fields: optional dict stored under the "fields" key of the detail;
            an empty dict is used when omitted.
    """
    status_code = 400

    def __init__(self, why="broadbandshield error", fields=None):
        # Build a fresh dict per instance; a literal {} default would be one
        # object shared by every exception created without ``fields``.
        if fields is None:
            fields = {}
        APIException.__init__(self, {"error": "BBS_Failure",
                                     "specific_error": why,
                                     "fields": fields})
54
55
class BBS:
    """Client for the BroadbandShield (BBS) account HTTP API.

    Holds the account credentials, a lazily created ``requests`` session and
    a cached copy of the account document (``self.settings``) as returned by
    ``GET <api>/account``.  Every request that does not answer with HTTP 200
    raises ``BBS_Failure``.
    """

    # Translates caller-side level names into the rating names used when
    # talking to BBS; levels that are not listed are passed through as-is.
    level_map = {"PG_13": "PG13",
                 "NONE": "OFF",
                 "ALL": "NONE",
                 None: "NONE"}

    def __init__(self, username, password, bbs_hostname=None, bbs_port=None):
        """Store the credentials and build the endpoint URLs.

        Args:
            username: account email, sent as "email" when logging in.
            password: account password.
            bbs_hostname: server host; "cordcompute01.onlab.us" when falsy.
            bbs_port: server port (anything ``int()`` accepts); 8018 when
                falsy.
        """
        self.username = username
        self.password = password

        # XXX not tested on port 80
        #self.bbs_hostname = "www.broadbandshield.com"
        #self.bbs_port = 80

        if not bbs_hostname:
            bbs_hostname = "cordcompute01.onlab.us"
        if not bbs_port:
            bbs_port = 8018

        self.bbs_hostname = bbs_hostname
        self.bbs_port = int(bbs_port)

        self.api = "http://%s:%d/api" % (self.bbs_hostname, self.bbs_port)
        self.nic_update = "http://%s:%d/nic/update" % (self.bbs_hostname, self.bbs_port)

        # Both are filled in lazily by login() / get_account().
        self.session = None
        self.settings = None

    def _raise_unless_ok(self, response, action):
        """Raise BBS_Failure("Failed to <action> (<status>)") unless 200."""
        if response.status_code != 200:
            raise BBS_Failure("Failed to %s (%d)" % (action, response.status_code))

    def _ensure_session(self):
        """Return the logged-in session, logging in first if there is none."""
        if self.session is None:
            self.login()
        return self.session

    def login(self):
        """Open a new session and log in with the stored credentials.

        Raises:
            BBS_Failure: if the login request does not return HTTP 200.
        """
        session = requests.Session()
        r = session.post(self.api + "/login",
                         data=json.dumps({"email": self.username,
                                          "password": self.password}))
        self._raise_unless_ok(r, "login")
        # Only keep the session once the login succeeded, so a failed login
        # is retried on the next call instead of reusing a dead session.
        self.session = session

    def get_account(self):
        """Fetch the account document, cache it in self.settings, return it.

        Raises:
            BBS_Failure: if login or the account request fails.
        """
        r = self._ensure_session().get(self.api + "/account")
        self._raise_unless_ok(r, "get account settings")
        self.settings = r.json()

        return self.settings

    def post_account(self):
        """Post the cached self.settings to <api>/account/settings.

        Raises:
            BBS_Failure: if there are no cached settings to post, or if the
                request does not return HTTP 200.
        """
        if not self.settings:
            raise BBS_Failure("no settings to post")

        r = self._ensure_session().post(self.api + "/account/settings",
                                        data=json.dumps(self.settings))
        self._raise_unless_ok(r, "set account settings")

    def add_device(self, name, mac, type="tablet", username=""):
        """Create a device with the given name, MAC address, type and user.

        Raises:
            BBS_Failure: if the request does not return HTTP 200.
        """
        data = {"name": name, "mac_address": mac, "type": type, "username": username}
        r = self._ensure_session().post(self.api + "/device", data=json.dumps(data))
        self._raise_unless_ok(r, "add device")

    def delete_device(self, data):
        """Delete a device; ``data`` is JSON-encoded and sent as the body.

        Callers in this class pass an entry of self.settings["devices"].

        Raises:
            BBS_Failure: if the request does not return HTTP 200.
        """
        r = self._ensure_session().delete(self.api + "/device", data=json.dumps(data))
        self._raise_unless_ok(r, "delete device")

    def add_user(self, name, rating="NONE", categories=None):
        """Create a user with the given rating and category list.

        ``categories`` defaults to an empty list.  sync() also uses this
        call to update the rating of an existing user.

        Raises:
            BBS_Failure: if the request does not return HTTP 200.
        """
        if categories is None:
            categories = []
        data = {"name": name, "settings": {"rating": rating, "categories": categories}}
        r = self._ensure_session().post(self.api + "/users", data=json.dumps(data))
        self._raise_unless_ok(r, "add user")

    def delete_user(self, data):
        """Delete a user; ``data`` is JSON-encoded and sent as the body.

        Callers in this class pass an entry of self.settings["users"].

        Raises:
            BBS_Failure: if the request does not return HTTP 200.
        """
        r = self._ensure_session().delete(self.api + "/users", data=json.dumps(data))
        self._raise_unless_ok(r, "delete user")

    def clear_users_and_devices(self):
        """Delete every device, then every user, listed in the settings.

        Fetches the account first when nothing is cached.  The cached
        settings are not refreshed afterwards.
        """
        if not self.settings:
            self.get_account()

        for device in self.settings["devices"]:
            self.delete_device(device)

        for user in self.settings["users"]:
            self.delete_user(user)

    def get_whole_home_level(self):
        """Return the account-wide rating, fetching the account if needed."""
        if not self.settings:
            self.get_account()

        return self.settings["settings"]["rating"]

    def sync(self, whole_home_level, users):
        """Make the BBS account match the desired level and user list.

        Each entry of ``users`` is a dict with "name" and "level" and an
        optional "mac" (treated as "" when missing).  For every entry one
        BBS user and one BBS device of the same name are kept in step:
        users/devices unknown to ``users`` are deleted, missing ones are
        added, changed ratings are re-posted and devices whose MAC changed
        are deleted and re-created.  Levels are translated through
        ``level_map`` first.  The input dicts are not modified, and the
        cached settings are not refreshed after the changes.

        Raises:
            BBS_Failure: if any request does not return HTTP 200.
        """
        if not self.settings:
            self.get_account()

        # Desired state indexed by name; work on copies of the caller's dicts.
        vcpe_users = {}
        for user in users:
            user = user.copy()
            user["level"] = self.level_map.get(user["level"], user["level"])
            user["mac"] = user.get("mac", "")
            vcpe_users[user["name"]] = user

        whole_home_level = self.level_map.get(whole_home_level, whole_home_level)

        if whole_home_level != self.settings["settings"]["rating"]:
            print("*** set whole_home %s ***" % whole_home_level)
            self.settings["settings"]["rating"] = whole_home_level
            self.post_account()

        bbs_usernames = set(bbs_user["name"] for bbs_user in self.settings["users"])
        bbs_devicenames = set(bbs_device["name"] for bbs_device in self.settings["devices"])

        add_users = []
        add_devices = []
        delete_users = []
        delete_devices = []

        for bbs_user in self.settings["users"]:
            vcpe_user = vcpe_users.get(bbs_user["name"])
            if vcpe_user is None:
                delete_users.append(bbs_user)
            elif bbs_user["settings"]["rating"] != vcpe_user["level"]:
                print("set user %s rating %s" % (vcpe_user["name"], vcpe_user["level"]))
                # add can be used as an update
                add_users.append(vcpe_user)

        for bbs_device in self.settings["devices"]:
            vcpe_user = vcpe_users.get(bbs_device["name"])
            if vcpe_user is None:
                delete_devices.append(bbs_device)
            elif bbs_device["mac_address"] != vcpe_user["mac"]:
                print("set device %s mac %s" % (vcpe_user["name"], vcpe_user["mac"]))
                # add of a device can't be used as an update, as you'll end
                # up with two of them.
                delete_devices.append(bbs_device)
                add_devices.append(vcpe_user)

        for (username, user) in vcpe_users.items():
            if username not in bbs_usernames:
                add_users.append(user)
            if username not in bbs_devicenames:
                add_devices.append(user)

        # Deletions run before additions so a re-created device never
        # coexists with its stale predecessor.
        for bbs_user in delete_users:
            print("delete user %s" % bbs_user["name"])
            self.delete_user(bbs_user)

        for bbs_device in delete_devices:
            print("delete device %s" % bbs_device["name"])
            self.delete_device(bbs_device)

        for vcpe_user in add_users:
            print("add user %s level %s" % (vcpe_user["name"], vcpe_user["level"]))
            self.add_user(vcpe_user["name"], vcpe_user["level"])

        for vcpe_user in add_devices:
            print("add device %s mac %s" % (vcpe_user["name"], vcpe_user["mac"]))
            self.add_device(vcpe_user["name"], vcpe_user["mac"], "tablet", vcpe_user["name"])

    def get_whole_home_rating(self):
        """Return the account-wide rating (same value as get_whole_home_level)."""
        return self.get_whole_home_level()

    def get_user(self, name):
        """Return the first user dict whose "name" equals ``name``, or None.

        Fetches the account first when nothing is cached.
        """
        if not self.settings:
            self.get_account()

        for user in self.settings["users"]:
            if user["name"] == name:
                return user
        return None

    def get_device(self, name):
        """Return the first device dict whose "name" equals ``name``, or None.

        Fetches the account first when nothing is cached.
        """
        if not self.settings:
            self.get_account()

        for device in self.settings["devices"]:
            if device["name"] == name:
                return device
        return None

    def dump(self):
        """Print the whole-home rating and every user and device to stdout.

        Fetches the account first when nothing is cached.
        """
        if not self.settings:
            self.get_account()

        print("whole_home_rating: %s" % self.settings["settings"]["rating"])
        print("users:")
        for user in self.settings["users"]:
            print(" user %s rating %s" % (user["name"], user["settings"]["rating"]))

        print("devices:")
        for device in self.settings["devices"]:
            print(" device %s user %s rating %s mac %s" % (device["name"],
                                                           device["username"],
                                                           device["settings"]["rating"],
                                                           device["mac_address"]))

    def associate(self, ip):
        """Call the nic/update endpoint with ``ip`` as X-Forwarded-For.

        The request uses HTTP basic auth with the account credentials and
        a fixed "hostname" parameter of "onlab.us"; it does not use the
        logged-in session.

        Raises:
            BBS_Failure: if the request does not return HTTP 200.
        """
        r = requests.get(self.nic_update,
                         params={"hostname": "onlab.us"},
                         headers={"X-Forwarded-For": ip},
                         auth=requests.auth.HTTPBasicAuth(self.username, self.password))
        self._raise_unless_ok(r, "associate account with ip")
254
def dump():
    """Print the account named by the command-line email and password.

    Reads the email from sys.argv[2] and the password from sys.argv[3].
    """
    email, password = sys.argv[2], sys.argv[3]
    BBS(email, password).dump()
258
def associate():
    """Associate the account with the IP address given on the command line.

    Expects email, password and IP address as sys.argv[2], [3] and [4];
    prints a message and exits with status -1 when the IP is missing.
    """
    cli_args = sys.argv[2:]
    if len(cli_args) < 3:
        print("you need to specify IP address")
        sys.exit(-1)

    email, password, ip = cli_args[:3]
    client = BBS(email, password)
    client.associate(ip)
266
def _assert_bbs_state(bbs, whole_home, user_ratings, device_macs):
    """Assert that the cached account matches the expected state.

    ``user_ratings`` and ``device_macs`` are sequences of (name, value)
    pairs; a value of None means that user or device must not exist.
    """
    assert bbs.get_whole_home_rating() == whole_home
    for name, rating in user_ratings:
        user = bbs.get_user(name)
        if rating is None:
            assert user is None
        else:
            assert user["settings"]["rating"] == rating
    for name, mac in device_macs:
        device = bbs.get_device(name)
        if mac is None:
            assert device is None
        else:
            assert device["mac_address"] == mac


def self_test():
    """Exercise sync() against the live account given on the command line.

    Uses sys.argv[2] / sys.argv[3] as email and password.  The account's
    initial settings are written to "bbs.json" in the current directory,
    then all users and devices are DELETED and replaced by test data; the
    state is checked with assertions after every step.
    """
    bbs = BBS(sys.argv[2], sys.argv[3])

    print("*** initial ***")
    bbs.dump()

    # Save the settings as they were before the test changes anything.
    with open("bbs.json", "w") as backup:
        backup.write(json.dumps(bbs.settings))

    # a new BBS account will throw a 500 error if it has no rating
    bbs.settings["settings"]["rating"] = "R"
    # delete everything
    bbs.post_account()
    bbs.clear_users_and_devices()

    print("*** cleared ***")
    bbs.settings = None
    bbs.dump()

    users = [{"name": "Moms pc", "level": "R", "mac": "010203040506"},
             {"name": "Dads pc", "level": "R", "mac": "010203040507"},
             {"name": "Jacks ipad", "level": "PG", "mac": "010203040508"},
             {"name": "Jills iphone", "level": "G", "mac": "010203040509"}]

    print("*** syncing mom-R, Dad-R, jack-PG, Jill-G, wholehome-PG-13 ***")

    bbs.settings = None
    bbs.sync("PG-13", users)

    initial_ratings = [("Moms pc", "R"),
                       ("Dads pc", "R"),
                       ("Jacks ipad", "PG"),
                       ("Jills iphone", "G")]
    initial_macs = [("Moms pc", "010203040506"),
                    ("Dads pc", "010203040507"),
                    ("Jacks ipad", "010203040508"),
                    ("Jills iphone", "010203040509")]

    print("*** after sync ***")
    bbs.settings = None
    bbs.dump()
    _assert_bbs_state(bbs, "PG-13", initial_ratings, initial_macs)

    print("*** update whole home level ***")
    bbs.settings = None
    bbs.get_account()
    bbs.settings["settings"]["rating"] = "PG"
    bbs.post_account()

    # Only the whole-home rating may have changed.
    print("*** after sync ***")
    bbs.settings = None
    bbs.dump()
    _assert_bbs_state(bbs, "PG", initial_ratings, initial_macs)

    print("*** delete dad, change moms IP, change jills level to PG, change whole home to PG-13 ***")
    users = [{"name": "Moms pc", "level": "R", "mac": "010203040511"},
             {"name": "Jacks ipad", "level": "PG", "mac": "010203040508"},
             {"name": "Jills iphone", "level": "PG", "mac": "010203040509"}]

    bbs.settings = None
    bbs.sync("PG-13", users)

    print("*** after sync ***")
    bbs.settings = None
    bbs.dump()
    _assert_bbs_state(bbs, "PG-13",
                      [("Moms pc", "R"),
                       ("Dads pc", None),
                       ("Jacks ipad", "PG"),
                       ("Jills iphone", "PG")],
                      [("Moms pc", "010203040511"),
                       ("Dads pc", None),
                       ("Jacks ipad", "010203040508")])

    print("add dad's laptop")
    users = [{"name": "Moms pc", "level": "R", "mac": "010203040511"},
             {"name": "Dads laptop", "level": "PG-13", "mac": "010203040512"},
             {"name": "Jacks ipad", "level": "PG", "mac": "010203040508"},
             {"name": "Jills iphone", "level": "PG", "mac": "010203040509"}]

    bbs.settings = None
    bbs.sync("PG-13", users)

    print("*** after sync ***")
    bbs.settings = None
    bbs.dump()
    _assert_bbs_state(bbs, "PG-13",
                      [("Moms pc", "R"),
                       ("Dads pc", None),
                       ("Dads laptop", "PG-13"),
                       ("Jacks ipad", "PG"),
                       ("Jills iphone", "PG")],
                      [("Moms pc", "010203040511"),
                       ("Dads pc", None),
                       ("Dads laptop", "010203040512"),
                       ("Jacks ipad", "010203040508")])
377
def main():
    """Command-line entry point: run one operation against a BBS account.

    Expects ``<operation> <email> <password>`` in sys.argv, where operation
    is "dump", "selftest" or "associate".  Prints the usage text and exits
    with status -1 when arguments are missing or the operation is unknown.
    """
    operation = sys.argv[1] if len(sys.argv) >= 4 else None

    if operation == "dump":
        dump()
    elif operation == "selftest":
        self_test()
    elif operation == "associate":
        associate()
    else:
        # Too few arguments or an unrecognised operation.
        print("syntax: broadbandshield.py <operation> <email> <password>")
        print(" operation = [dump | selftest | associate]")
        sys.exit(-1)


if __name__ == "__main__":
    main()
395
396