blob: 062b1d15c1bc98a312b5a9a22f17dcc4348e9186 [file] [log] [blame]
Sapan Bhatia24836f12013-08-27 10:16:05 -04001import os
2import base64
Tony Mack4fa85fb2013-09-25 14:39:57 -04003from datetime import datetime
Sapan Bhatia24836f12013-08-27 10:16:05 -04004from planetstack.config import Config
Andy Baviere7abb622013-10-18 15:11:56 -04005from util.logger import Logger, logging
Sapan Bhatiaeba08432014-04-28 23:58:36 -04006from observer.steps import *
Sapan Bhatiad9468eb2014-08-20 03:03:12 -04007from django.db.models import F, Q
Sapan Bhatia709bebd2015-02-08 06:35:36 +00008from core.models import *
Sapan Bhatia47006112015-01-29 20:55:40 +00009import json
10import time
11import pdb
Andy Baviere7abb622013-10-18 15:11:56 -040012
Andy Bavier04111b72013-10-22 16:47:10 -040013logger = Logger(level=logging.INFO)
Sapan Bhatia24836f12013-08-27 10:16:05 -040014
Sapan Bhatia709bebd2015-02-08 06:35:36 +000015def deepgetattr(obj, attr):
16 return reduce(getattr, attr.split('.'), obj)
17
Sapan Bhatia13c7f112013-09-02 14:19:35 -040018class FailedDependency(Exception):
Tony Mackce79de02013-09-24 10:12:33 -040019 pass
Sapan Bhatia13c7f112013-09-02 14:19:35 -040020
Tony Mackb469d242015-01-03 19:37:39 -050021class SyncStep(object):
Tony Mackce79de02013-09-24 10:12:33 -040022 """ A PlanetStack Sync step.
Sapan Bhatia24836f12013-08-27 10:16:05 -040023
Tony Mackce79de02013-09-24 10:12:33 -040024 Attributes:
25 psmodel Model name the step synchronizes
26 dependencies list of names of models that must be synchronized first if the current model depends on them
27 """
28 slow=False
29 def get_prop(prop):
30 try:
31 sync_config_dir = Config().sync_config_dir
32 except:
33 sync_config_dir = '/etc/planetstack/sync'
34 prop_config_path = '/'.join(sync_config_dir,self.name,prop)
35 return open(prop_config_path).read().rstrip()
Sapan Bhatia24836f12013-08-27 10:16:05 -040036
Tony Mackce79de02013-09-24 10:12:33 -040037 def __init__(self, **args):
38 """Initialize a sync step
39 Keyword arguments:
40 name -- Name of the step
41 provides -- PlanetStack models sync'd by this step
42 """
43 dependencies = []
Tony Mack387a73f2013-09-18 07:59:14 -040044 self.driver = args.get('driver')
Sapan Bhatiaeba08432014-04-28 23:58:36 -040045 self.error_map = args.get('error_map')
46
Tony Mackce79de02013-09-24 10:12:33 -040047 try:
48 self.soft_deadline = int(self.get_prop('soft_deadline_seconds'))
49 except:
50 self.soft_deadline = 5 # 5 seconds
Sapan Bhatia24836f12013-08-27 10:16:05 -040051
Tony Mackce79de02013-09-24 10:12:33 -040052 return
Sapan Bhatia24836f12013-08-27 10:16:05 -040053
Sapan Bhatiae17bc5b2014-04-30 00:53:06 -040054 def fetch_pending(self, deletion=False):
Sapan Bhatia21765662014-07-23 08:59:30 -040055 # This is the most common implementation of fetch_pending
56 # Steps should override it if they have their own logic
57 # for figuring out what objects are outstanding.
Sapan Bhatia99f49682015-01-29 20:58:25 +000058 main_obj = self.observes
Sapan Bhatiad9468eb2014-08-20 03:03:12 -040059 if (not deletion):
Sapan Bhatia21765662014-07-23 08:59:30 -040060 objs = main_obj.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None))
61 else:
62 objs = main_obj.deleted_objects.all()
63
64 return objs
Sapan Bhatiaca2e21f2013-10-02 01:10:02 -040065 #return Sliver.objects.filter(ip=None)
Tony Mackce79de02013-09-24 10:12:33 -040066
Sapan Bhatiaca2e21f2013-10-02 01:10:02 -040067 def check_dependencies(self, obj, failed):
Tony Mackce79de02013-09-24 10:12:33 -040068 for dep in self.dependencies:
Scott Baker105b6b72014-05-12 10:40:25 -070069 peer_name = dep[0].lower() + dep[1:] # django names are camelCased with the first letter lower
Sapan Bhatia709bebd2015-02-08 06:35:36 +000070
Sapan Bhatiacfef6ef2014-08-20 03:04:03 -040071 try:
Sapan Bhatia709bebd2015-02-08 06:35:36 +000072 peer_object = deepgetattr(obj, peer_name)
73 try:
74 peer_objects = peer_object.all()
75 except AttributeError:
76 peer_objects = [peer_object]
Sapan Bhatiacfef6ef2014-08-20 03:04:03 -040077 except:
Sapan Bhatia709bebd2015-02-08 06:35:36 +000078 peer_objects = []
Sapan Bhatiacfef6ef2014-08-20 03:04:03 -040079
Sapan Bhatia709bebd2015-02-08 06:35:36 +000080 if (failed in peer_objects):
81 if (obj.backend_status!=failed.backend_status):
82 obj.backend_status = failed.backend_status
Sapan Bhatia7e482de2014-08-22 03:05:13 -040083 obj.save(update_fields=['backend_status'])
84 raise FailedDependency("Failed dependency for %s:%s peer %s:%s failed %s:%s" % (obj.__class__.__name__, str(obj.pk), peer_object.__class__.__name__, str(peer_object.pk), failed.__class__.__name__, str(failed.pk)))
Sapan Bhatia24836f12013-08-27 10:16:05 -040085
Sapan Bhatia60823362014-04-30 00:52:32 -040086 def call(self, failed=[], deletion=False):
87 pending = self.fetch_pending(deletion)
Tony Mackce79de02013-09-24 10:12:33 -040088 for o in pending:
Sapan Bhatia47006112015-01-29 20:55:40 +000089 sync_failed = False
Tony Mack68e818d2013-09-25 13:34:17 -040090 try:
Sapan Bhatia47006112015-01-29 20:55:40 +000091 scratchpad = json.loads(o.backend_register)
92 if (scratchpad):
93 next_run = scratchpad['next_run']
94 if (next_run>time.time()):
95 sync_failed = True
96 print "BACKING OFF, exponent = %d"%scratchpad['exponent']
97 except:
98 pass
99
100 if (not sync_failed):
Sapan Bhatiaeba08432014-04-28 23:58:36 -0400101 try:
Sapan Bhatia47006112015-01-29 20:55:40 +0000102 for f in failed:
103 self.check_dependencies(o,f) # Raises exception if failed
104 if (deletion):
105 self.delete_record(o)
106 o.delete(purge=True)
107 else:
108 self.sync_record(o)
109 o.enacted = datetime.now() # Is this the same timezone? XXX
110 scratchpad = {'next_run':0, 'exponent':0}
111 o.backend_register = json.dumps(scratchpad)
112 o.backend_status = "1 - OK"
113 o.save(update_fields=['enacted','backend_status','backend_register'])
114 except Exception,e:
115 logger.log_exc("sync step failed!")
Sapan Bhatia2175c1d2015-02-08 06:31:42 +0000116 try:
117 if (o.backend_status.startswith('2 - ')):
118 str_e = '%s // %r'%(o.backend_status[4:],e)
119 else:
120 str_e = '%r'%e
121 except:
122 str_e = '%r'%e
123
Sapan Bhatia9c308fc2014-08-22 03:07:59 -0400124 try:
Sapan Bhatia47006112015-01-29 20:55:40 +0000125 o.backend_status = '2 - %s'%self.error_map.map(str_e)
Sapan Bhatia9c308fc2014-08-22 03:07:59 -0400126 except:
Sapan Bhatia47006112015-01-29 20:55:40 +0000127 o.backend_status = '2 - %s'%str_e
Sapan Bhatiaeba08432014-04-28 23:58:36 -0400128
Sapan Bhatia47006112015-01-29 20:55:40 +0000129 try:
130 scratchpad = json.loads(o.backend_register)
131 scratchpad['exponent']
132 except:
133 scratchpad = {'next_run':0, 'exponent':0}
134
135 # Second failure
136 if (scratchpad['exponent']):
137 delay = scratchpad['exponent'] * 600 # 10 minutes
138 if (delay<1440):
139 delay = 1440
140 scratchpad['next_run'] = time.time() + delay
141
142 scratchpad['exponent']+=1
143
144 o.backend_register = json.dumps(scratchpad)
145
146 # TOFIX:
147 # DatabaseError: value too long for type character varying(140)
148 if (o.pk):
149 try:
Sapan Bhatia2175c1d2015-02-08 06:31:42 +0000150 o.backend_status = o.backend_status[:1024]
Sapan Bhatia47006112015-01-29 20:55:40 +0000151 o.save(update_fields=['backend_status','backend_register'])
152 except:
153 print "Could not update backend status field!"
154 pass
155 sync_failed = True
156
157
158 if (sync_failed):
Tony Mack68e818d2013-09-25 13:34:17 -0400159 failed.append(o)
Sapan Bhatiaca2e21f2013-10-02 01:10:02 -0400160
Tony Mackce79de02013-09-24 10:12:33 -0400161 return failed
Sapan Bhatia24836f12013-08-27 10:16:05 -0400162
Tony Mack16f04742013-09-25 08:53:28 -0400163 def __call__(self, **args):
164 return self.call(**args)