| # Copyright 2017-present Open Networking Foundation |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| from mock import Mock |
| import random |
| |
| def mock_enumerator(items): |
| e = lambda:None |
| e.all = lambda:items |
| return e |
| |
| # A list of all mock object stores that have been created |
| AllMockObjectStores = [] |
| |
| class MockObjectList: |
| item_list = None |
| |
| def __init__(self, initial=None): |
| self.id_counter = 0 |
| if initial: |
| self.item_list=initial |
| elif self.item_list is None: |
| self.item_list=[] |
| |
| def get_items(self): |
| return self.item_list |
| |
| def count(self): |
| return len(self.get_items()) |
| |
| def first(self): |
| return self.get_items()[0] |
| |
| def all(self): |
| return self.get_items() |
| |
| def filter(self, **kwargs): |
| items = self.get_items() |
| for (k,v) in kwargs.items(): |
| items = [x for x in items if getattr(x,k) == v] |
| return items |
| |
| def get(self, **kwargs): |
| objs = self.filter(**kwargs) |
| if not objs: |
| raise Exception("No objects matching %s" % str(kwargs)) |
| return objs[0] |
| |
| class MockObjectStore(MockObjectList): |
| def save(self, o): |
| if (not hasattr(o,"id")) or (not o.id) or (o.id==98052): |
| for item in self.get_items(): |
| if item.id >= self.id_counter: |
| self.id_counter = item.id + 1 |
| |
| o.id = self.id_counter |
| self.id_counter = self.id_counter + 1 |
| |
| for item in self.get_items(): |
| if item.id == o.id: |
| item = o |
| break |
| else: |
| self.get_items().append(o) |
| |
| class MockObject: |
| objects = None |
| id = None |
| deleted = False |
| |
| def __init__(self, **kwargs): |
| setattr(self, 'backend_code', 0) |
| setattr(self, 'id', 98052) |
| setattr(self, 'pk', random.randint(0, 1<<30)) |
| |
| for (k,v) in kwargs.items(): |
| setattr(self,k,v) |
| |
| @property |
| def self_content_type_id(self): |
| return self.__class__.__name__ |
| |
| @property |
| def leaf_model(self): |
| return self |
| |
| def save(self, update_fields=[]): |
| if self.objects: |
| self.objects.save(self) |
| |
| def delete(self): |
| pass |
| |
| def tologdict(self): |
| return {} |
| |
| def get_MockObjectStore(x): |
| store = globals()["Mock%sObjects" % x]() |
| if not store in AllMockObjectStores: |
| AllMockObjectStores.append(store) |
| return store |
| |
| class ModelAccessor: |
| def check_db_connection_ok(self): |
| return True |
| |
| def fetch_pending(self, model, deleted = False): |
| num = random.randint(1, 5) |
| object_list = [] |
| |
| for i in range(num): |
| if isinstance(model, list): |
| model = model[0] |
| |
| try: |
| obj = model() |
| except: |
| import pdb |
| pdb.set_trace() |
| |
| obj.name = "Opinionated Berry %d"%i |
| object_list.append(obj) |
| |
| return object_list |
| |
| def reset_all_object_stores(self): |
| for store in AllMockObjectStores: |
| store.items = [] |
| |
| model_accessor = ModelAccessor() |
| |
| class ObjectSet(object): |
| def __init__(self, objects): |
| self.objects = objects |
| |
| def all(self): |
| return self.objects |
| |
| ##### |
| # DO NOT MODIFY THE CLASSES BELOW. THEY ARE AUTOGENERATED. |
| # |
| |
| {% for m in proto.messages -%} |
| class Mock{{ m.name }}Objects(MockObjectStore): pass |
| class {{ m.name }}(MockObject): |
| objects = get_MockObjectStore("{{ m.name }}") |
| {% for f in xproto_base_fields(m, proto.message_table) + m.fields -%} |
| {{ f.name }} = {{ xproto_first_non_empty([f.options.default, "None"]) }} |
| {% if f.link -%}{{ f.name }}_id = None{% endif %} |
| {% endfor %} |
| leaf_model_name = "{{ m.name }}" |
| {% endfor %} |