XOS sFlow service models, synchronizer, tosca scripts
diff --git a/xos/configurations/cord/ceilometer.yaml b/xos/configurations/cord/ceilometer.yaml
index 0ea5cbd..215a1cc 100644
--- a/xos/configurations/cord/ceilometer.yaml
+++ b/xos/configurations/cord/ceilometer.yaml
@@ -6,6 +6,58 @@
- custom_types/xos.yaml
node_types:
+ tosca.nodes.SFlowService:
+ derived_from: tosca.nodes.Root
+ description: >
+ XOS SFlow Collection Service
+ capabilities:
+ scalable:
+ type: tosca.capabilities.Scalable
+ service:
+ type: tosca.capabilities.xos.Service
+ properties:
+ kind:
+ type: string
+ default: generic
+ description: Type of service.
+ view_url:
+ type: string
+ required: false
+ description: URL to follow when icon is clicked in the Service Directory.
+ icon_url:
+ type: string
+ required: false
+ description: ICON to display in the Service Directory.
+ enabled:
+ type: boolean
+ default: true
+ published:
+ type: boolean
+ default: true
+ description: If True then display this Service in the Service Directory.
+ public_key:
+ type: string
+ required: false
+ description: Public key to install into Instances to allows Services to SSH into them.
+ private_key_fn:
+ type: string
+ required: false
+ description: Location of private key file
+ versionNumber:
+ type: string
+ required: false
+ description: Version number of Service.
+ sflow_port:
+ type: integer
+ required: false
+ default: 6643
+ description: sFlow listening port
+ sflow_api_port:
+ type: integer
+ required: false
+ default: 33333
+ description: sFlow publish subscribe api listening port
+
tosca.nodes.CeilometerTenant:
derived_from: tosca.nodes.Root
description: >
@@ -28,6 +80,14 @@
# artifacts:
# pubkey: /opt/xos/synchronizers/vcpe/vcpe_public_key
+ service_sflow:
+ type: tosca.nodes.SFlowService
+ requirements:
+ properties:
+ view_url: /admin/ceilometer/sflowservice/$id$/
+ kind: sflow
+ sflow_port: 6643
+ sflow_api_port: 33333
Private:
type: tosca.nodes.NetworkTemplate
@@ -70,6 +130,17 @@
properties:
default_flavor: m1.small
+ mysite_sflow:
+ description: Slice for sFlow service
+ type: tosca.nodes.Slice
+ requirements:
+ - sflow_service:
+ node: service_sflow
+ relationship: tosca.relationships.MemberOfService
+ - site:
+ node: mysite
+ relationship: tosca.relationships.MemberOfSite
+
my_ceilometer_tenant:
description: Ceilometer Service default Tenant
type: tosca.nodes.CeilometerTenant
@@ -78,6 +149,29 @@
node: service_ceilometer
relationship: tosca.relationships.MemberOfService
+ # Virtual machines
+ sflow_service_instance:
+ type: tosca.nodes.Compute
+ capabilities:
+ # Host container properties
+ host:
+ properties:
+ num_cpus: 1
+ disk_size: 10 GB
+ mem_size: 4 MB
+ # Guest Operating System properties
+ os:
+ properties:
+ # host Operating System image properties
+ architecture: x86_64
+ type: linux
+ distribution: Ubuntu
+ version: 14.10
+ requirements:
+ - slice:
+ node: mysite_sflow
+ relationship: tosca.relationships.MemberOfSlice
+
Ceilometer:
type: tosca.nodes.DashboardView
properties:
diff --git a/xos/services/ceilometer/admin.py b/xos/services/ceilometer/admin.py
index ed8e47a..73b205e 100644
--- a/xos/services/ceilometer/admin.py
+++ b/xos/services/ceilometer/admin.py
@@ -10,7 +10,7 @@
from django.utils import timezone
from django.contrib.contenttypes import generic
from suit.widgets import LinkedSelect
-from core.admin import ServiceAppAdmin,SliceInline,ServiceAttrAsTabInline, ReadOnlyAwareAdmin, XOSTabularInline, ServicePrivilegeInline, TenantRootTenantInline, TenantRootPrivilegeInline
+from core.admin import ServiceAppAdmin,SliceInline,ServiceAttrAsTabInline, ReadOnlyAwareAdmin, XOSTabularInline, ServicePrivilegeInline, TenantRootTenantInline, TenantRootPrivilegeInline, TenantAttrAsTabInline
from core.middleware import get_request
from functools import update_wrapper
@@ -97,6 +97,101 @@
def queryset(self, request):
return MonitoringChannel.get_tenant_objects_by_user(request.user)
+class SFlowServiceForm(forms.ModelForm):
+ sflow_port = forms.IntegerField(required=False)
+ sflow_api_port = forms.IntegerField(required=False)
+
+ def __init__(self,*args,**kwargs):
+ super (SFlowServiceForm,self ).__init__(*args,**kwargs)
+ if self.instance:
+ # fields for the attributes
+ self.fields['sflow_port'].initial = self.instance.sflow_port
+ self.fields['sflow_api_port'].initial = self.instance.sflow_api_port
+ if (not self.instance) or (not self.instance.pk):
+ # default fields for an 'add' form
+ self.fields['sflow_port'].initial = SFLOW_PORT
+ self.fields['sflow_api_port'].initial = SFLOW_API_PORT
+
+ def save(self, commit=True):
+ self.instance.sflow_port = self.cleaned_data.get("sflow_port")
+ self.instance.sflow_api_port = self.cleaned_data.get("sflow_api_port")
+ return super(SFlowServiceForm, self).save(commit=commit)
+
+ class Meta:
+ model = SFlowService
+
+class SFlowServiceAdmin(ReadOnlyAwareAdmin):
+ model = SFlowService
+ verbose_name = "SFlow Service"
+ verbose_name_plural = "SFlow Service"
+ list_display = ("backend_status_icon", "name", "enabled")
+ list_display_links = ('backend_status_icon', 'name', )
+ fieldsets = [(None, {'fields': ['backend_status_text', 'name','enabled','versionNumber', 'description',"view_url","sflow_port","sflow_api_port","icon_url" ], 'classes':['suit-tab suit-tab-general']})]
+ readonly_fields = ('backend_status_text', )
+ inlines = [SliceInline,ServiceAttrAsTabInline,ServicePrivilegeInline]
+ form = SFlowServiceForm
+
+ extracontext_registered_admins = True
+
+ user_readonly_fields = ["name", "enabled", "versionNumber", "description"]
+
+ suit_form_tabs =(('general', 'SFlow Service Details'),
+ ('administration', 'Administration'),
+ ('slices','Slices'),
+ ('serviceattrs','Additional Attributes'),
+ ('serviceprivileges','Privileges'),
+ )
+
+ suit_form_includes = (('sflowadmin.html', 'top', 'administration'),
+ )
+
+ def queryset(self, request):
+ return SFlowService.get_service_objects_by_user(request.user)
+
+class SFlowTenantForm(forms.ModelForm):
+ creator = forms.ModelChoiceField(queryset=User.objects.all())
+ listening_endpoint = forms.CharField(max_length=1024, help_text="sFlow listening endpoint in udp://IP:port format")
+
+ def __init__(self,*args,**kwargs):
+ super (SFlowTenantForm,self ).__init__(*args,**kwargs)
+ self.fields['kind'].widget.attrs['readonly'] = True
+ self.fields['provider_service'].queryset = SFlowService.get_service_objects().all()
+ if self.instance:
+ # fields for the attributes
+ self.fields['creator'].initial = self.instance.creator
+ self.fields['listening_endpoint'].initial = self.instance.listening_endpoint
+ if (not self.instance) or (not self.instance.pk):
+ # default fields for an 'add' form
+ self.fields['kind'].initial = SFLOW_KIND
+ self.fields['creator'].initial = get_request().user
+ if SFlowService.get_service_objects().exists():
+ self.fields["provider_service"].initial = SFlowService.get_service_objects().all()[0]
+
+ def save(self, commit=True):
+ self.instance.creator = self.cleaned_data.get("creator")
+ self.instance.listening_endpoint = self.cleaned_data.get("listening_endpoint")
+ return super(SFlowTenantForm, self).save(commit=commit)
+
+ class Meta:
+ model = SFlowTenant
+
+class SFlowTenantAdmin(ReadOnlyAwareAdmin):
+ list_display = ('backend_status_icon', 'creator', 'listening_endpoint' )
+ list_display_links = ('backend_status_icon', 'listening_endpoint')
+ fieldsets = [ (None, {'fields': ['backend_status_text', 'kind', 'provider_service', 'subscriber_service', 'service_specific_attribute', 'listening_endpoint',
+ 'creator'],
+ 'classes':['suit-tab suit-tab-general']})]
+ readonly_fields = ('backend_status_text', 'instance', 'service_specific_attribute')
+ inlines = [TenantAttrAsTabInline]
+ form = SFlowTenantForm
+
+ suit_form_tabs = (('general','Details'), ('tenantattrs', 'Attributes'))
+
+ def queryset(self, request):
+ return SFlowTenant.get_tenant_objects_by_user(request.user)
+
admin.site.register(CeilometerService, CeilometerServiceAdmin)
+admin.site.register(SFlowService, SFlowServiceAdmin)
admin.site.register(MonitoringChannel, MonitoringChannelAdmin)
+admin.site.register(SFlowTenant, SFlowTenantAdmin)
diff --git a/xos/services/ceilometer/models.py b/xos/services/ceilometer/models.py
index 826b4d4..6c749f3 100644
--- a/xos/services/ceilometer/models.py
+++ b/xos/services/ceilometer/models.py
@@ -166,16 +166,18 @@
SFLOW_KIND = "sflow"
+SFLOW_PORT = 6343
+SFLOW_API_PORT = 33333
class SFlowService(Service):
KIND = SFLOW_KIND
class Meta:
- app_label = "sflow"
+ app_label = "ceilometer"
verbose_name = "sFlow Collection Service"
proxy = True
- default_attributes = {"sflow_port": "6343", "sflow_api_port":"33333"}
+ default_attributes = {"sflow_port": SFLOW_PORT, "sflow_api_port": SFLOW_API_PORT}
sync_attributes = ("sflow_port", "sflow_api_port",)
diff --git a/xos/services/ceilometer/templates/sflowadmin.html b/xos/services/ceilometer/templates/sflowadmin.html
new file mode 100644
index 0000000..da2a8dd
--- /dev/null
+++ b/xos/services/ceilometer/templates/sflowadmin.html
@@ -0,0 +1,6 @@
+<div class = "left-nav">
+<ul>
+<li><a href="/admin/ceilometer/sflowtenant/">sFlow Tenants</a></li>
+</ul>
+</div>
+
diff --git a/xos/synchronizers/monitoring_channel/steps/sync_sflowservice.py b/xos/synchronizers/monitoring_channel/steps/sync_sflowservice.py
new file mode 100644
index 0000000..0471955
--- /dev/null
+++ b/xos/synchronizers/monitoring_channel/steps/sync_sflowservice.py
@@ -0,0 +1,74 @@
+import hashlib
+import os
+import socket
+import sys
+import base64
+import time
+from django.db.models import F, Q
+from xos.config import Config
+from synchronizers.base.syncstep import SyncStep
+from synchronizers.base.ansible import run_template_ssh
+from synchronizers.base.SyncInstanceUsingAnsible import SyncInstanceUsingAnsible
+from core.models import Service, Slice
+from services.ceilometer.models import SFlowService
+from xos.logger import Logger, logging
+
+# hpclibrary will be in steps/..
+parentdir = os.path.join(os.path.dirname(__file__),"..")
+sys.path.insert(0,parentdir)
+
+logger = Logger(level=logging.INFO)
+
+class SyncSFlowService(SyncInstanceUsingAnsible):
+ provides=[SFlowService]
+ observes=SFlowService
+ requested_interval=0
+ template_name = "sync_sflowservice.yaml"
+ service_key_name = "/opt/xos/synchronizers/monitoring_channel/monitoring_channel_private_key"
+
+ def __init__(self, *args, **kwargs):
+ super(SyncSFlowService, self).__init__(*args, **kwargs)
+
+ def fetch_pending(self, deleted):
+ if (not deleted):
+ objs = SFlowService.get_service_objects().filter(Q(enacted__lt=F('updated')) | Q(enacted=None),Q(lazy_blocked=False))
+ else:
+ objs = SFlowService.get_deleted_service_objects()
+
+ return objs
+
+ def get_instance(self, o):
+ # We assume the ONOS service owns a slice, so pick one of the instances
+ # inside that slice to sync to.
+
+ serv = o
+
+ if serv.slices.exists():
+ slice = serv.slices.all()[0]
+ if slice.instances.exists():
+ return slice.instances.all()[0]
+
+ return None
+
+ def get_extra_attributes(self, o):
+ fields={}
+ fields["instance_hostname"] = self.get_instance(o).instance_name.replace("_","-")
+ fields["listening_endpoint"] = o.listening_endpoint
+ fields["nat_ip"] = self.get_instance(o).get_ssh_ip()
+ fields["sflow_container"] = "sflowpubsub"
+ return fields
+
+ def sync_fields(self, o, fields):
+ # the super causes the playbook to be run
+ super(SyncSFlowService, self).sync_fields(o, fields)
+
+ def run_playbook(self, o, fields):
+ instance = self.get_instance(o)
+ if (instance.isolation=="container"):
+ # If the instance is already a container, then we don't need to
+ # install ONOS.
+ return
+ super(SyncSFlowService, self).run_playbook(o, fields)
+
+ def delete_record(self, m):
+ pass
diff --git a/xos/synchronizers/monitoring_channel/steps/sync_sflowservice.yaml b/xos/synchronizers/monitoring_channel/steps/sync_sflowservice.yaml
new file mode 100644
index 0000000..470fc34
--- /dev/null
+++ b/xos/synchronizers/monitoring_channel/steps/sync_sflowservice.yaml
@@ -0,0 +1,69 @@
+---
+- hosts: {{ instance_name }}
+ gather_facts: False
+ connection: ssh
+ user: ubuntu
+ sudo: yes
+
+ tasks:
+
+ - name: Fix /etc/hosts
+ lineinfile:
+ dest=/etc/hosts
+ regexp="127.0.0.1 localhost"
+ line="127.0.0.1 localhost {{ instance_hostname }}"
+
+ - name: Add repo key
+ apt_key:
+ keyserver=hkp://pgp.mit.edu:80
+ id=58118E89F3A912897C070ADBF76221572C52609D
+
+ - name: Install Docker repo
+ apt_repository:
+ repo="deb https://apt.dockerproject.org/repo ubuntu-trusty main"
+ state=present
+
+ - name: Install Docker
+ apt:
+ name={{ '{{' }} item {{ '}}' }}
+ state=latest
+ update_cache=yes
+ with_items:
+ - docker-engine
+ - python-pip
+ - python-httplib2
+
+ - name: Install docker-py
+ pip:
+ name=docker-py
+ state=latest
+
+ - name: sflow pub-sub config
+ template: src=/opt/xos/synchronizers/monitoring_channel/templates/sflow_pub_sub/sflow_pub_sub_config.j2 dest=/usr/local/share/sflow_pub_sub.conf mode=0777
+
+ - name: Start SFLOW pub-sub container
+ docker:
+ docker_api_version: "1.18"
+ name: {{ sflow_container }}
+ # was: reloaded
+ state: running
+ image: srikanthvavila/sflowpubsub
+ ports:
+ - "{{ sflow_port }}:{{ sflow_port }}/udp"
+ - "{{ sflow_api_port }}:{{ sflow_api_port }}"
+ volumes:
+ - /usr/local/share/sflow_pub_sub.conf:/usr/local/share/sflow_pub_sub/sflow_pub_sub.conf
+
+ - name: Get Docker IP
+ #TODO: copy dockerip.sh to monitoring service synchronizer
+ script: /opt/xos/synchronizers/onos/scripts/dockerip.sh {{ sflow_container }}
+ register: dockerip
+
+ - name: Wait for SFlow service to come up
+ wait_for:
+ host={{ '{{' }} dockerip.stdout {{ '}}' }}
+ port={{ '{{' }} item {{ '}}' }}
+ state=present
+ with_items:
+ - {{ sflow_port }}
+ - {{ sflow_api_port }}
diff --git a/xos/synchronizers/monitoring_channel/steps/sync_sflowtenant.py b/xos/synchronizers/monitoring_channel/steps/sync_sflowtenant.py
new file mode 100644
index 0000000..3e14b1e
--- /dev/null
+++ b/xos/synchronizers/monitoring_channel/steps/sync_sflowtenant.py
@@ -0,0 +1,69 @@
+import hashlib
+import os
+import socket
+import socket
+import sys
+import base64
+import time
+import re
+import json
+from django.db.models import F, Q
+from xos.config import Config
+from synchronizers.base.syncstep import SyncStep
+from synchronizers.base.ansible import run_template_ssh
+from synchronizers.base.SyncInstanceUsingAnsible import SyncInstanceUsingAnsible
+from core.models import Service, Slice, ControllerSlice, ControllerUser
+from services.ceilometer.models import SFlowTenant
+from xos.logger import Logger, logging
+
+# hpclibrary will be in steps/..
+parentdir = os.path.join(os.path.dirname(__file__),"..")
+sys.path.insert(0,parentdir)
+
+logger = Logger(level=logging.INFO)
+
+class SyncSFlowTenant(SyncInstanceUsingAnsible):
+ provides=[SFlowTenant]
+ observes=SFlowTenant
+ requested_interval=0
+ template_name = "sync_sflowtenant.yaml"
+ service_key_name = "/opt/xos/synchronizers/monitoring_channel/monitoring_channel_private_key"
+
+ def __init__(self, *args, **kwargs):
+ super(SyncSFlowTenant, self).__init__(*args, **kwargs)
+
+ def fetch_pending(self, deleted):
+ if (not deleted):
+ objs = SFlowTenant.get_tenant_objects().filter(Q(enacted__lt=F('updated')) | Q(enacted=None),Q(lazy_blocked=False))
+ else:
+ objs = SFlowTenant.get_deleted_tenant_objects()
+
+ return objs
+
+ def get_sflow_service(self, o):
+ sflows = SFlowService.get_service_objects().filter(id=o.provider_service.id)
+ if not sflows:
+ raise "No associated SFlow service"
+
+ return sflows[0]
+
+ def get_extra_attributes(self, o):
+ instance = self.get_instance(o)
+
+ fields={}
+ fields["sflow_api_base_url"] = get_sflow_service(self, o).sflow_api_url
+ fields["sflow_api_port"] = get_sflow_service(self, o).sflow_api_port
+ fields["listening_endpoint"] = o.listening_endpoint
+ fields["sflow_container"] = "sflowpubsub"
+
+ return fields
+
+ def sync_fields(self, o, fields):
+ # the super causes the playbook to be run
+ super(SyncSFlowTenant, self).sync_fields(o, fields)
+
+ def run_playbook(self, o, fields):
+ super(SyncSFlowTenant, self).run_playbook(o, fields)
+
+ def delete_record(self, m):
+ pass
diff --git a/xos/synchronizers/monitoring_channel/steps/sync_sflowtenant.yaml b/xos/synchronizers/monitoring_channel/steps/sync_sflowtenant.yaml
new file mode 100644
index 0000000..701ce5c
--- /dev/null
+++ b/xos/synchronizers/monitoring_channel/steps/sync_sflowtenant.yaml
@@ -0,0 +1,28 @@
+---
+- hosts: {{ instance_name }}
+ gather_facts: False
+ connection: ssh
+ user: {{ username }}
+ sudo: yes
+
+ tasks:
+
+ - name: Get Docker IP
+ #TODO: copy dockerip.sh to monitoring service synchronizer
+ script: /opt/xos/synchronizers/onos/scripts/dockerip.sh {{ sflow_container }}
+ register: sflowserviceaddr
+
+ - name: Wait for SFlow service to come up
+ wait_for:
+ host={{ '{{' }} sflowserviceaddr.stdout {{ '}}' }}
+ port={{ '{{' }} item {{ '}}' }}
+ state=present
+ with_items:
+ - {{ sflow_api_port }}
+
+ - name: Invoke SFlow service REST API to subscribe
+ uri:
+ url: http://{{ '{{' }} sflowserviceaddr.stdout {{ '}}' }}:{{ sflow_api_port }}/subscribe
+ body: "{{ listening_endpoint }}"
+ body_format: raw
+ method: POST
diff --git a/xos/synchronizers/monitoring_channel/templates/Dockerfile.sflowpubsub b/xos/synchronizers/monitoring_channel/templates/Dockerfile.sflowpubsub
new file mode 100644
index 0000000..c9025ee
--- /dev/null
+++ b/xos/synchronizers/monitoring_channel/templates/Dockerfile.sflowpubsub
@@ -0,0 +1,22 @@
+FROM ubuntu:14.04.2
+MAINTAINER Andy Bavier <acb@cs.princeton.edu>
+
+# XXX Workaround for docker bug:
+# https://github.com/docker/docker/issues/6345
+# Kernel 3.15 breaks docker, uss the line below as a workaround
+# until there is a fix
+RUN ln -s -f /bin/true /usr/bin/chfn
+# XXX End workaround
+
+# Install.
+RUN apt-get update && apt-get install -y \
+ python-pip \
+ python-dev
+
+RUN pip install Flask
+RUN mkdir -p /usr/local/share/
+ADD sflow_pub_sub /usr/local/share/sflow_pub_sub
+RUN chmod +x /usr/local/share/sflow_pub_sub/sflow_pub_sub_main.py
+RUN chmod +x /usr/local/share/sflow_pub_sub/start_sflow_pub_sub
+WORKDIR /usr/local/share/sflow_pub_sub/
+CMD /usr/local/share/sflow_pub_sub/start_sflow_pub_sub
diff --git a/xos/synchronizers/monitoring_channel/templates/sflow_pub_sub/README b/xos/synchronizers/monitoring_channel/templates/sflow_pub_sub/README
new file mode 100644
index 0000000..ee8ad9b
--- /dev/null
+++ b/xos/synchronizers/monitoring_channel/templates/sflow_pub_sub/README
@@ -0,0 +1,37 @@
+
+Subscribe-Publish Frame Work:
+1.Command to Install Flask Webserver frame work.
+ sudo pip install Flask
+
+ Along with flask we need the following packages:
+ msgpack
+ fnmatch
+ operator
+ logging
+ oslo_utils
+ ConfigParser
+
+2.Files: i.sub_main.py
+ ii.pubrecords.py
+ iii.pub_sub.conf
+
+3.Command to start the server:
+ #python sun_main.py
+4.Command for subscription:
+ i.app_id:Application ID,should be unique.
+ ii.target:
+ Presently only udp is supported.
+ a.udp:<ip:portno>
+ b.kafka:<kafkaip:kafkaport>
+ iii.sub_info:Sunscription notifications.ex:cpu_util,cpu_*
+ iv.query:
+ Below information need to provide as part of query.
+ a.field:fileds like user id ,porject id etc.,
+ b.op:"eq","gt","lt" etc.,
+ c.value:value of the fileds.
+ Example:
+ curl -i -H "Content-Type: application/json" -X SUB -d '{"app_id":"10","target":"udp://10.11.10.1:5006","sub_info":"cpu_util","query":[{"field":"user_id","op":"eq","value":"e1271a86bd4e413c87248baf2e5f01e0"},{"field":"project_id","op":"eq","value":"b1a3bf16d2014b47be9aefea88087318"},{"field":"resource_id","op":"eq","value":"658cd03f-d0f0-4f55-9f48-39e7222a8646"}]}' -L http://10.11.10.1:4455/subscribe
+
+5.Command for unsunscription:
+ For unsubcription only appid will be needed.
+ curl -i -H "Content-Type: application/json" -X UNSUB -d '{"app_id":"10"}' http://10.11.10.1:4455/unsubscribe
diff --git a/xos/synchronizers/monitoring_channel/templates/sflow_pub_sub/sample_sflow_pub_sub.conf_sample b/xos/synchronizers/monitoring_channel/templates/sflow_pub_sub/sample_sflow_pub_sub.conf_sample
new file mode 100644
index 0000000..40b5bf5
--- /dev/null
+++ b/xos/synchronizers/monitoring_channel/templates/sflow_pub_sub/sample_sflow_pub_sub.conf_sample
@@ -0,0 +1,11 @@
+[LOGGING]
+level = DEBUG
+filename = sflow_pub_sub.log
+
+[WEB_SERVER]
+webserver_host = 0.0.0.0
+webserver_port = 33333
+
+[SFLOW]
+listening_ip_addr = 0.0.0.0
+listening_port = 6343
diff --git a/xos/synchronizers/monitoring_channel/templates/sflow_pub_sub/sflow_pub_sub_config.j2 b/xos/synchronizers/monitoring_channel/templates/sflow_pub_sub/sflow_pub_sub_config.j2
new file mode 100644
index 0000000..1c5c88c
--- /dev/null
+++ b/xos/synchronizers/monitoring_channel/templates/sflow_pub_sub/sflow_pub_sub_config.j2
@@ -0,0 +1,15 @@
+# This file autogenerated by sflow service synchronizer
+# It contains a list of attributes to be used by sflow service
+# syntax: key=value
+
+[LOGGING]
+level = DEBUG
+filename = sflow_pub_sub.log
+
+[WEB_SERVER]
+webserver_host = 0.0.0.0
+webserver_port = {{ sflow_api_port }}
+
+[SFLOW]
+listening_ip_addr = 0.0.0.0
+listening_port = {{ sflow_port }}
diff --git a/xos/synchronizers/monitoring_channel/templates/sflow_pub_sub/sflow_pub_sub_main.py b/xos/synchronizers/monitoring_channel/templates/sflow_pub_sub/sflow_pub_sub_main.py
new file mode 100644
index 0000000..1276721
--- /dev/null
+++ b/xos/synchronizers/monitoring_channel/templates/sflow_pub_sub/sflow_pub_sub_main.py
@@ -0,0 +1,136 @@
+#!/usr/bin/python
+import socket,thread
+import sys
+import fnmatch
+import operator
+import logging
+import ConfigParser
+from urlparse import urlparse
+from sflow_sub_records import *
+
+from flask import request, Request, jsonify
+from flask import Flask
+from flask import make_response
+app = Flask(__name__)
+
+COMPARATORS = {
+ 'gt': operator.gt,
+ 'lt': operator.lt,
+ 'ge': operator.ge,
+ 'le': operator.le,
+ 'eq': operator.eq,
+ 'ne': operator.ne,
+}
+
+LEVELS = {'DEBUG': logging.DEBUG,
+ 'INFO': logging.INFO,
+ 'WARNING': logging.WARNING,
+ 'ERROR': logging.ERROR,
+ 'CRITICAL': logging.CRITICAL}
+
+_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
+
+@app.route('/subscribe',methods=['POST'])
+def subscribe():
+ logging.debug(" SUB data:%s",request.data)
+ target = request.data
+ parse_target=urlparse(target)
+ if not parse_target.netloc:
+ err_str = "Error:Invalid target format"
+ logging.error("* Invalid target format")
+ return err_str
+
+ status = ""
+ if parse_target.scheme == "udp" :
+ host=parse_target.hostname
+ port=parse_target.port
+ scheme = parse_target.scheme
+ app_ip = host
+ app_port = port
+
+ if host == None or port == None :
+ err_str = "* Error: Invalid IP Address format"
+ logging.error("* Invalid IP Address format")
+ return err_str
+
+ subscrip_obj=sflow_sub_record(scheme,None,app_ip,app_port,None,None)
+ status = add_sflow_sub_record(subscrip_obj)
+ print_sflow_sub_records()
+
+ if parse_target.scheme == "kafka" :
+ pass
+ if parse_target.scheme == "file" :
+ pass
+ return status
+
+@app.route('/unsubscribe',methods=['POST'])
+def unsubscribe():
+ try :
+ target = request.data
+ parse_target=urlparse(target)
+ if not parse_target.netloc:
+ err_str = "Error:Invalid target format"
+ logging.error("* Invalid target format")
+ return err_str
+
+ status = ""
+ if parse_target.scheme == "udp" :
+ host=parse_target.hostname
+ port=parse_target.port
+ scheme = parse_target.scheme
+ app_ip = host
+ app_port = port
+
+ delete_sflow_sub_record(app_ip, app_port)
+ except Exception as e:
+ logging.error("* %s",e.__str__())
+ return e.__str__()
+ return "UnSubscrition is sucessful! \n"
+
+@app.errorhandler(404)
+def not_found(error):
+ return make_response(jsonify({'error': 'Not found'}), 404)
+
+def sflow_recv(host,port):
+ udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
+ udp.bind((host, port))
+ logging.info("Started sflow receive thread on %s:%s",host, str(port))
+
+ while True:
+ data, source = udp.recvfrom(64000)
+ for obj in sflow_sub_database:
+ target_host = obj.ipaddress
+ target_port = int(obj.portno)
+ try:
+ logging.debug("Replicating the sFlow data to:%s:%s",target_host, str(target_port))
+ udp.sendto(data,(target_host,target_port))
+ except Exception:
+ logging.error ("Unable to send sFlow data to target %s:%s ",target_host,str(target_port))
+ logging.warn("Exiting sflow receive thread")
+
+
+def initialize(host,port):
+ thread.start_new(sflow_recv,(host,port,))
+
+if __name__ == "__main__":
+
+ try:
+ config = ConfigParser.ConfigParser()
+ config.read('sflow_pub_sub.conf')
+ webserver_host = config.get('WEB_SERVER','webserver_host')
+ webserver_port = int (config.get('WEB_SERVER','webserver_port'))
+ sflow_listening_ip_addr = config.get('SFLOW','listening_ip_addr')
+ sflow_listening_port = int (config.get('SFLOW','listening_port'))
+
+ log_level = config.get('LOGGING','level')
+ log_file = config.get('LOGGING','filename')
+
+ level = LEVELS.get(log_level, logging.NOTSET)
+ logging.basicConfig(filename=log_file,format='%(asctime)s %(levelname)s %(message)s',\
+ datefmt=_DEFAULT_LOG_DATE_FORMAT,level=level)
+ except Exception as e:
+ print("* Error in config file:",e.__str__())
+ logging.error("* Error in confing file:%s",e.__str__())
+ else:
+ initialize(sflow_listening_ip_addr,sflow_listening_port)
+ app.run(host=webserver_host,port=webserver_port,debug=False)
diff --git a/xos/synchronizers/monitoring_channel/templates/sflow_pub_sub/sflow_sub_records.py b/xos/synchronizers/monitoring_channel/templates/sflow_pub_sub/sflow_sub_records.py
new file mode 100644
index 0000000..f8b0038
--- /dev/null
+++ b/xos/synchronizers/monitoring_channel/templates/sflow_pub_sub/sflow_sub_records.py
@@ -0,0 +1,63 @@
+#!/usr/bin/python
+import fnmatch
+import logging
+
+class sflow_sub_record:
+ def __init__(self,scheme,app_id,app_ip,app_port,subscription_info,sub_info_filter):
+ logging.debug("* Updating subscription_info ")
+ self.scheme = scheme
+ self.app_id = app_id
+ self.ipaddress = app_ip
+ self.portno = app_port
+ self.subscription_info = subscription_info
+ self.sub_info_filter = sub_info_filter
+
+sflow_sub_database=[]
+def add_sflow_sub_record(record):
+ logging.info("* inside %s",add_sflow_sub_record.__name__)
+ if not sflow_sub_database:
+ logging.debug("* -----------List is EMpty -------------")
+ sflow_sub_database.append(record)
+ logging.debug("* Subscription is sucessful")
+ return "Subscription is sucessful \n"
+ for x in sflow_sub_database:
+ if (record.ipaddress == x.ipaddress) and (record.portno == x.portno) :
+ logging.warning("* entry already exists\n")
+ return "entry already exists \n"
+ sflow_sub_database.append(record)
+ return "Subscription is sucessful \n"
+
+def delete_sflow_sub_record(ip,port):
+ logging.info("* inside %s",delete_sflow_sub_record.__name__)
+ Flag = False
+ for x in sflow_sub_database:
+ if (ip == x.ipaddress) and (port == x.portno) :
+ sflow_sub_database.remove(x)
+ Flag = True
+ logging.debug("* Un-Subscription is sucessful")
+ return "Un-Subscription is sucessful \n"
+ if not Flag :
+ err_str = "No subscription exists with target: udp://" + ip + ":" + str(port) + "\n"
+ logging.error(err_str)
+ raise Exception (err_str)
+
+def print_sflow_sub_records():
+ logging.info("* inside %s",print_sflow_sub_records.__name__)
+ for obj in sflow_sub_database:
+ logging.debug("* ------------------------------------------------")
+ logging.debug("* scheme:%s",obj.scheme)
+ logging.debug("* app_id:%s",obj.app_id)
+ logging.debug("* portno:%s",obj.portno )
+ logging.debug("* ipaddress:%s",obj.ipaddress)
+ logging.debug("* portno:%s",obj.portno)
+ logging.debug("* subscription_info:%s",obj.subscription_info)
+ logging.debug("* sub_info_filter:%s",obj.sub_info_filter)
+ logging.debug("* ------------------------------------------------")
+
+def get_sflow_sub_records(notif_subscription_info):
+ logging.info("* inside %s",get_sflow_sub_records.__name__)
+ sub_list=[]
+ for obj in sflow_sub_database:
+ if obj.subscription_info == notif_subscription_info:
+ sub_list.append(obj)
+ return sub_list
diff --git a/xos/synchronizers/monitoring_channel/templates/sflow_pub_sub/start_sflow_pub_sub b/xos/synchronizers/monitoring_channel/templates/sflow_pub_sub/start_sflow_pub_sub
new file mode 100644
index 0000000..e2edda2
--- /dev/null
+++ b/xos/synchronizers/monitoring_channel/templates/sflow_pub_sub/start_sflow_pub_sub
@@ -0,0 +1 @@
+/usr/local/share/sflow_pub_sub/sflow_pub_sub_main.py
diff --git a/xos/tosca/resources/sflowservice.py b/xos/tosca/resources/sflowservice.py
new file mode 100644
index 0000000..272518e
--- /dev/null
+++ b/xos/tosca/resources/sflowservice.py
@@ -0,0 +1,41 @@
+import os
+import pdb
+import sys
+import tempfile
+sys.path.append("/opt/tosca")
+from translator.toscalib.tosca_template import ToscaTemplate
+
+from core.models import ServiceAttribute
+from services.ceilometer.models import SFlowService
+
+from service import XOSService
+
+class XOSSFlowService(XOSService):
+ provides = "tosca.nodes.SFlowService"
+ xos_model = SFlowService
+ copyin_props = ["view_url", "icon_url", "enabled", "published", "public_key", "versionNumber", "sflow_port", "sflow_api_port"]
+
+ def set_service_attr(self, obj, prop_name, value):
+ value = self.try_intrinsic_function(value)
+ if value:
+ attrs = ServiceAttribute.objects.filter(service=obj, name=prop_name)
+ if attrs:
+ attr = attrs[0]
+ if attr.value != value:
+ self.info("updating attribute %s" % prop_name)
+ attr.value = value
+ attr.save()
+ else:
+ self.info("adding attribute %s" % prop_name)
+ ta = ServiceAttribute(service=obj, name=prop_name, value=value)
+ ta.save()
+
+ def postprocess(self, obj):
+ props = self.nodetemplate.get_properties()
+ for (k,d) in props.items():
+ v = d.value
+ if k.startswith("config_"):
+ self.set_service_attr(obj, k, v)
+ elif k.startswith("rest_"):
+ self.set_service_attr(obj, k, v)
+