blob: fc1a6ec2f10c0667cb3672eb2698896d1585c564 [file] [log] [blame]
Sapan Bhatia24836f12013-08-27 10:16:05 -04001import os
2import base64
Tony Mack4fa85fb2013-09-25 14:39:57 -04003from datetime import datetime
Scott Baker86e132c2015-02-11 21:38:09 -08004from xos.config import Config
Andy Baviere7abb622013-10-18 15:11:56 -04005from util.logger import Logger, logging
Sapan Bhatiaeba08432014-04-28 23:58:36 -04006from observer.steps import *
Sapan Bhatiad9468eb2014-08-20 03:03:12 -04007from django.db.models import F, Q
Sapan Bhatia082ee362015-02-08 06:35:36 +00008from core.models import *
Sapan Bhatia8a51bf52015-01-29 20:55:40 +00009import json
10import time
11import pdb
Andy Baviere7abb622013-10-18 15:11:56 -040012
Andy Bavier04111b72013-10-22 16:47:10 -040013logger = Logger(level=logging.INFO)
Sapan Bhatia24836f12013-08-27 10:16:05 -040014
Sapan Bhatiab7b1b6e2015-02-08 06:36:32 +000015def f7(seq):
16 seen = set()
17 seen_add = seen.add
18 return [ x for x in seq if not (x in seen or seen_add(x))]
19
20def elim_dups(backend_str):
21 strs = backend_str.split(' // ')
22 strs2 = f7(strs)
23 return ' // '.join(strs2)
24
Sapan Bhatia082ee362015-02-08 06:35:36 +000025def deepgetattr(obj, attr):
26 return reduce(getattr, attr.split('.'), obj)
27
Sapan Bhatia13c7f112013-09-02 14:19:35 -040028class FailedDependency(Exception):
Tony Mackce79de02013-09-24 10:12:33 -040029 pass
Sapan Bhatia13c7f112013-09-02 14:19:35 -040030
Tony Macke10fbe52015-01-03 19:37:39 -050031class SyncStep(object):
Scott Baker03552842015-02-18 16:13:48 -080032 """ An XOS Sync step.
Sapan Bhatia24836f12013-08-27 10:16:05 -040033
Tony Mackce79de02013-09-24 10:12:33 -040034 Attributes:
35 psmodel Model name the step synchronizes
36 dependencies list of names of models that must be synchronized first if the current model depends on them
37 """
38 slow=False
Tony Mack5f49fad2015-02-24 14:16:43 -050039 def get_prop(self, prop):
Tony Mackce79de02013-09-24 10:12:33 -040040 try:
41 sync_config_dir = Config().sync_config_dir
42 except:
Scott Bakerace7ab52015-02-19 22:25:49 -080043 sync_config_dir = '/etc/xos/sync'
Tony Mackce79de02013-09-24 10:12:33 -040044 prop_config_path = '/'.join(sync_config_dir,self.name,prop)
45 return open(prop_config_path).read().rstrip()
Sapan Bhatia24836f12013-08-27 10:16:05 -040046
Tony Mackce79de02013-09-24 10:12:33 -040047 def __init__(self, **args):
48 """Initialize a sync step
49 Keyword arguments:
50 name -- Name of the step
Scott Baker03552842015-02-18 16:13:48 -080051 provides -- XOS models sync'd by this step
Tony Mackce79de02013-09-24 10:12:33 -040052 """
53 dependencies = []
Tony Mack387a73f2013-09-18 07:59:14 -040054 self.driver = args.get('driver')
Sapan Bhatiaeba08432014-04-28 23:58:36 -040055 self.error_map = args.get('error_map')
56
Tony Mackce79de02013-09-24 10:12:33 -040057 try:
58 self.soft_deadline = int(self.get_prop('soft_deadline_seconds'))
59 except:
60 self.soft_deadline = 5 # 5 seconds
Sapan Bhatia24836f12013-08-27 10:16:05 -040061
Tony Mackce79de02013-09-24 10:12:33 -040062 return
Sapan Bhatia24836f12013-08-27 10:16:05 -040063
Sapan Bhatiae17bc5b2014-04-30 00:53:06 -040064 def fetch_pending(self, deletion=False):
Sapan Bhatia21765662014-07-23 08:59:30 -040065 # This is the most common implementation of fetch_pending
66 # Steps should override it if they have their own logic
67 # for figuring out what objects are outstanding.
Sapan Bhatia39a775f2015-01-29 20:58:25 +000068 main_obj = self.observes
Sapan Bhatiad9468eb2014-08-20 03:03:12 -040069 if (not deletion):
Sapan Bhatia21765662014-07-23 08:59:30 -040070 objs = main_obj.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None))
71 else:
72 objs = main_obj.deleted_objects.all()
73
74 return objs
Sapan Bhatiaca2e21f2013-10-02 01:10:02 -040075 #return Sliver.objects.filter(ip=None)
Tony Mackce79de02013-09-24 10:12:33 -040076
Sapan Bhatiaca2e21f2013-10-02 01:10:02 -040077 def check_dependencies(self, obj, failed):
Tony Mackce79de02013-09-24 10:12:33 -040078 for dep in self.dependencies:
Scott Baker105b6b72014-05-12 10:40:25 -070079 peer_name = dep[0].lower() + dep[1:] # django names are camelCased with the first letter lower
Sapan Bhatia082ee362015-02-08 06:35:36 +000080
Sapan Bhatiacfef6ef2014-08-20 03:04:03 -040081 try:
Sapan Bhatia082ee362015-02-08 06:35:36 +000082 peer_object = deepgetattr(obj, peer_name)
83 try:
84 peer_objects = peer_object.all()
85 except AttributeError:
86 peer_objects = [peer_object]
Sapan Bhatiacfef6ef2014-08-20 03:04:03 -040087 except:
Sapan Bhatia082ee362015-02-08 06:35:36 +000088 peer_objects = []
Sapan Bhatiacfef6ef2014-08-20 03:04:03 -040089
Sapan Bhatia082ee362015-02-08 06:35:36 +000090 if (failed in peer_objects):
91 if (obj.backend_status!=failed.backend_status):
92 obj.backend_status = failed.backend_status
Sapan Bhatia7e482de2014-08-22 03:05:13 -040093 obj.save(update_fields=['backend_status'])
Scott Bakeraa86bfb2015-03-04 21:31:14 -080094 raise FailedDependency("Failed dependency for %s:%s peer %s:%s failed %s:%s" % (obj.__class__.__name__, str(getattr(obj,"pk","no_pk")), peer_object.__class__.__name__, str(getattr(peer_object,"pk","no_pk")), failed.__class__.__name__, str(getattr(failed,"pk","no_pk"))))
Sapan Bhatia24836f12013-08-27 10:16:05 -040095
Sapan Bhatia60823362014-04-30 00:52:32 -040096 def call(self, failed=[], deletion=False):
97 pending = self.fetch_pending(deletion)
Tony Mackce79de02013-09-24 10:12:33 -040098 for o in pending:
Sapan Bhatia8a51bf52015-01-29 20:55:40 +000099 sync_failed = False
Tony Mack68e818d2013-09-25 13:34:17 -0400100 try:
Sapan Bhatiade9f24c2015-02-10 17:21:33 -0500101 backoff_disabled = Config().observer_backoff_disabled
Sapan Bhatia25353592015-02-10 17:16:07 -0500102 except:
103 backoff_disabled = 0
104
105 try:
Sapan Bhatia8a51bf52015-01-29 20:55:40 +0000106 scratchpad = json.loads(o.backend_register)
107 if (scratchpad):
108 next_run = scratchpad['next_run']
Sapan Bhatia25353592015-02-10 17:16:07 -0500109 if (not backoff_disabled and next_run>time.time()):
Sapan Bhatia8a51bf52015-01-29 20:55:40 +0000110 sync_failed = True
111 print "BACKING OFF, exponent = %d"%scratchpad['exponent']
112 except:
113 pass
114
115 if (not sync_failed):
Sapan Bhatiaeba08432014-04-28 23:58:36 -0400116 try:
Sapan Bhatia8a51bf52015-01-29 20:55:40 +0000117 for f in failed:
118 self.check_dependencies(o,f) # Raises exception if failed
119 if (deletion):
120 self.delete_record(o)
121 o.delete(purge=True)
122 else:
123 self.sync_record(o)
124 o.enacted = datetime.now() # Is this the same timezone? XXX
125 scratchpad = {'next_run':0, 'exponent':0}
126 o.backend_register = json.dumps(scratchpad)
127 o.backend_status = "1 - OK"
128 o.save(update_fields=['enacted','backend_status','backend_register'])
129 except Exception,e:
130 logger.log_exc("sync step failed!")
Sapan Bhatia6d422152015-02-08 06:31:42 +0000131 try:
132 if (o.backend_status.startswith('2 - ')):
133 str_e = '%s // %r'%(o.backend_status[4:],e)
Sapan Bhatiab7b1b6e2015-02-08 06:36:32 +0000134 str_e = elim_dups(str_e)
Sapan Bhatia6d422152015-02-08 06:31:42 +0000135 else:
136 str_e = '%r'%e
137 except:
138 str_e = '%r'%e
139
Sapan Bhatia9c308fc2014-08-22 03:07:59 -0400140 try:
Sapan Bhatia8a51bf52015-01-29 20:55:40 +0000141 o.backend_status = '2 - %s'%self.error_map.map(str_e)
Sapan Bhatia9c308fc2014-08-22 03:07:59 -0400142 except:
Sapan Bhatia8a51bf52015-01-29 20:55:40 +0000143 o.backend_status = '2 - %s'%str_e
Sapan Bhatiaeba08432014-04-28 23:58:36 -0400144
Sapan Bhatia8a51bf52015-01-29 20:55:40 +0000145 try:
146 scratchpad = json.loads(o.backend_register)
147 scratchpad['exponent']
148 except:
149 scratchpad = {'next_run':0, 'exponent':0}
150
151 # Second failure
152 if (scratchpad['exponent']):
153 delay = scratchpad['exponent'] * 600 # 10 minutes
154 if (delay<1440):
155 delay = 1440
156 scratchpad['next_run'] = time.time() + delay
157
158 scratchpad['exponent']+=1
159
160 o.backend_register = json.dumps(scratchpad)
161
162 # TOFIX:
163 # DatabaseError: value too long for type character varying(140)
164 if (o.pk):
165 try:
Sapan Bhatia6d422152015-02-08 06:31:42 +0000166 o.backend_status = o.backend_status[:1024]
Sapan Bhatia8a51bf52015-01-29 20:55:40 +0000167 o.save(update_fields=['backend_status','backend_register'])
168 except:
169 print "Could not update backend status field!"
170 pass
171 sync_failed = True
172
173
174 if (sync_failed):
Tony Mack68e818d2013-09-25 13:34:17 -0400175 failed.append(o)
Sapan Bhatiaca2e21f2013-10-02 01:10:02 -0400176
Tony Mackce79de02013-09-24 10:12:33 -0400177 return failed
Sapan Bhatia24836f12013-08-27 10:16:05 -0400178
Tony Mack16f04742013-09-25 08:53:28 -0400179 def __call__(self, **args):
180 return self.call(**args)