[8055]
This update adds kubernetes deployment files for the OFagent and fluentd services.
The OFagent calls get_my_containers_name on the assumption that the HOSTNAME
environment variable contains the docker container's ID, which is not the
case under kubernetes. In kubernetes a container's HOSTNAME is its pod name,
and feeding a pod name into the docker API used by get_my_containers_name
raises an exception. This is the reason for the code changes to the OFagent.
The kubernetes config files in this submission still use the old image naming
convention. The names in these and all the other kubernetes deployment files
will be changed to conform to the new convention in a separate update.
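As a minimal sketch of the new behaviour (assuming, as the unit tests in this
change do, that the instance ID is taken from the HOSTNAME environment
variable), the ofagent now hands an externally supplied instance ID to
ConnectionManager instead of querying the docker API:

    import os

    # Under plain docker, HOSTNAME holds the container ID; under kubernetes
    # it holds the pod name. Either value works as the ofagent instance ID.
    instance_id = os.environ.get('HOSTNAME', 'localhost')

    # ConnectionManager now receives the ID directly, so it no longer needs
    # get_my_containers_name() or the docker API:
    #   ConnectionManager(consul_endpoint, vcore_endpoint,
    #                     controller_endpoints, instance_id, ...)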
Change-Id: I3eecfdcc8bba070ffe3e7372f195cf15f0cdcd56
diff --git a/k8s/fluentd.yml b/k8s/fluentd.yml
new file mode 100644
index 0000000..5b535e1
--- /dev/null
+++ b/k8s/fluentd.yml
@@ -0,0 +1,171 @@
+#
+# This file describes a cluster of 3 fluentd forwarders that
+# send logs to a cluster of 2 fluentd aggregators: one active
+# and one standby.
+#
+# The active fluentd aggregator
+#
+apiVersion: v1
+kind: Service
+metadata:
+ name: fluentdactv
+spec:
+ clusterIP: None
+ selector:
+ app: fluentdactv
+ ports:
+ - protocol: TCP
+ port: 24224
+ targetPort: 24224
+---
+#
+# Ensure that the active aggregator is not deployed to the
+# same node as the standby
+#
+apiVersion: apps/v1beta1
+kind: Deployment
+metadata:
+ name: fluentdactv
+spec:
+ replicas: 1
+ template:
+ metadata:
+ labels:
+ app: fluentdactv
+ spec:
+ terminationGracePeriodSeconds: 10
+ affinity:
+ podAntiAffinity:
+ requiredDuringSchedulingIgnoredDuringExecution:
+ - labelSelector:
+ matchExpressions:
+ - key: app
+ operator: In
+ values:
+ - fluentdstby
+ topologyKey: kubernetes.io/hostname
+ containers:
+ - name: fluentdactv
+ image: cord/fluentd
+ imagePullPolicy: Never
+ volumeMounts:
+ - name: fluentd-log
+ mountPath: /fluentd/log
+ ports:
+ - containerPort: 24224
+ env:
+ - name: FLUENTD_CONF
+ value: fluent-agg.conf
+ volumes:
+ - name: fluentd-log
+ hostPath:
+ path: /var/log/voltha/logging_volume
+ type: Directory
+---
+#
+# The standby fluentd aggregator
+#
+apiVersion: v1
+kind: Service
+metadata:
+ name: fluentdstby
+spec:
+ clusterIP: None
+ selector:
+ app: fluentdstby
+ ports:
+ - protocol: TCP
+ port: 24224
+ targetPort: 24224
+---
+#
+# Ensure that the standby aggregator is not deployed to the
+# same node as the active
+#
+apiVersion: apps/v1beta1
+kind: Deployment
+metadata:
+ name: fluentdstby
+spec:
+ replicas: 1
+ template:
+ metadata:
+ labels:
+ app: fluentdstby
+ spec:
+ terminationGracePeriodSeconds: 10
+ affinity:
+ podAntiAffinity:
+ requiredDuringSchedulingIgnoredDuringExecution:
+ - labelSelector:
+ matchExpressions:
+ - key: app
+ operator: In
+ values:
+ - fluentdactv
+ topologyKey: kubernetes.io/hostname
+ containers:
+ - name: fluentdstby
+ image: cord/fluentd
+ imagePullPolicy: Never
+ volumeMounts:
+ - name: fluentd-log
+ mountPath: /fluentd/log
+ ports:
+ - containerPort: 24224
+ env:
+ - name: FLUENTD_CONF
+ value: fluent-agg.conf
+ volumes:
+ - name: fluentd-log
+ hostPath:
+ path: /var/log/voltha/logging_volume
+ type: Directory
+---
+#
+# The cluster of fluentd forwarders
+#
+apiVersion: v1
+kind: Service
+metadata:
+ name: fluentd
+spec:
+ clusterIP: None
+ selector:
+ app: fluentd
+ ports:
+ - protocol: TCP
+ port: 24224
+ targetPort: 24224
+---
+apiVersion: apps/v1beta1
+kind: Deployment
+metadata:
+ name: fluentd
+spec:
+ replicas: 3
+ template:
+ metadata:
+ labels:
+ app: fluentd
+ spec:
+ terminationGracePeriodSeconds: 10
+ affinity:
+ podAntiAffinity:
+ requiredDuringSchedulingIgnoredDuringExecution:
+ - labelSelector:
+ matchExpressions:
+ - key: app
+ operator: In
+ values:
+ - fluentd
+ topologyKey: kubernetes.io/hostname
+ containers:
+ - name: fluentd
+ image: cord/fluentd
+ imagePullPolicy: Never
+ ports:
+ - containerPort: 24224
+ env:
+ - name: FLUENTD_CONF
+ value: fluent.conf
diff --git a/k8s/ofagent.yml b/k8s/ofagent.yml
new file mode 100644
index 0000000..c282fce
--- /dev/null
+++ b/k8s/ofagent.yml
@@ -0,0 +1,41 @@
+apiVersion: apps/v1beta1
+kind: Deployment
+metadata:
+ name: ofagent
+spec:
+ replicas: 3
+ template:
+ metadata:
+ labels:
+ app: ofagent
+ spec:
+ terminationGracePeriodSeconds: 10
+ affinity:
+ podAntiAffinity:
+ requiredDuringSchedulingIgnoredDuringExecution:
+ - labelSelector:
+ matchExpressions:
+ - key: app
+ operator: In
+ values:
+ - ofagent
+ topologyKey: kubernetes.io/hostname
+ containers:
+ - name: ofagent
+ image: cord/ofagent
+ imagePullPolicy: Never
+ env:
+ - name: NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ args:
+ - "/ofagent/ofagent/main.py"
+ - "-v"
+ - "--consul=consul.$(NAMESPACE).svc.cluster.local:8500"
+ - "--fluentd=fluentd.$(NAMESPACE).svc.cluster.local:24224"
+ - "--controller=onos:6653"
+ - "--grpc-endpoint=vcore.$(NAMESPACE).svc.cluster.local:50556"
+ - "--enable-tls"
+ - "--key-file=/ofagent/pki/voltha.key"
+ - "--cert-file=/ofagent/pki/voltha.crt"
diff --git a/k8s/vcore_for_consul.yml b/k8s/vcore_for_consul.yml
index 4052c80..40b3631 100644
--- a/k8s/vcore_for_consul.yml
+++ b/k8s/vcore_for_consul.yml
@@ -31,30 +31,29 @@
app: vcore
spec:
containers:
- - name: voltha
- image: "cord/voltha:latest"
- env:
- - name: NAMESPACE
- valueFrom:
- fieldRef:
- fieldPath: metadata.namespace
- args:
- - "voltha/voltha/main.py"
- - "-v"
- - "--consul=consul.$(NAMESPACE).svc.cluster.local:8500"
- - "--kafka=kafka.$(NAMESPACE).svc.cluster.local"
- - "--rest-port=8880"
- - "--grpc-port=50556"
- - "--interface=eth1"
- - "--backend=consul"
- - "--pon-subnet=172.29.19.0/24"
- ports:
- - containerPort: 8880
- name: rest-port
- - containerPort: 18880
- name: mystery-port
- - containerPort: 50556
- name: grpc-port
- imagePullPolicy: Never
-
-
+ - name: voltha
+ image: "cord/voltha:latest"
+ imagePullPolicy: Never
+ ports:
+ - containerPort: 8880
+ name: rest-port
+ - containerPort: 18880
+ name: mystery-port
+ - containerPort: 50556
+ name: grpc-port
+ env:
+ - name: NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ args:
+ - "voltha/voltha/main.py"
+ - "-v"
+ - "--consul=consul.$(NAMESPACE).svc.cluster.local:8500"
+ - "--kafka=kafka.$(NAMESPACE).svc.cluster.local"
+ - "--fluentd=fluentd.$(NAMESPACE).svc.cluster.local:24224"
+ - "--rest-port=8880"
+ - "--grpc-port=50556"
+ - "--interface=eth1"
+ - "--backend=consul"
+ - "--pon-subnet=172.29.19.0/24"
diff --git a/k8s/vcore_for_etcd.yml b/k8s/vcore_for_etcd.yml
index 35de8ab..ea207fc 100644
--- a/k8s/vcore_for_etcd.yml
+++ b/k8s/vcore_for_etcd.yml
@@ -43,6 +43,7 @@
- "-v"
- "--etcd=etcd.$(NAMESPACE).svc.cluster.local:2379"
- "--kafka=kafka.$(NAMESPACE).svc.cluster.local"
+ - "--fluentd=fluentd.$(NAMESPACE).svc.cluster.local:24224"
- "--rest-port=8880"
- "--grpc-port=50556"
- "--interface=eth1"
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
index a61f2ab..13ce2a0 100644
--- a/ofagent/connection_mgr.py
+++ b/ofagent/connection_mgr.py
@@ -41,6 +41,7 @@
class ConnectionManager(object):
def __init__(self, consul_endpoint, vcore_endpoint, controller_endpoints,
+ instance_id,
enable_tls=False, key_file=None, cert_file=None,
vcore_retry_interval=0.5, devices_refresh_interval=5,
subscription_refresh_interval=5):
@@ -50,6 +51,7 @@
self.controller_endpoints = controller_endpoints
self.consul_endpoint = consul_endpoint
self.vcore_endpoint = vcore_endpoint
+ self.instance_id = instance_id
self.enable_tls = enable_tls
self.key_file = key_file
self.cert_file = cert_file
@@ -157,7 +159,7 @@
self._assign_grpc_attributes()
# Send subscription request to register the current ofagent instance
- container_name = get_my_containers_name()
+ container_name = self.instance_id
stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
subscription = stub.Subscribe(OfAgentSubscriber(ofagent_id=container_name))
diff --git a/ofagent/main.py b/ofagent/main.py
index 6812514..5887b75 100755
--- a/ofagent/main.py
+++ b/ofagent/main.py
@@ -238,7 +238,7 @@
self.log.info('starting-internal-components')
args = self.args
self.connection_manager = yield ConnectionManager(
- args.consul, args.grpc_endpoint, args.controller,\
+ args.consul, args.grpc_endpoint, args.controller, args.instance_id, \
args.enable_tls, args.key_file, args.cert_file).start()
self.log.info('started-internal-services')
diff --git a/tests/utests/ofagent/test_connection_mgr.py b/tests/utests/ofagent/test_connection_mgr.py
index 179e691..2429cbb 100644
--- a/tests/utests/ofagent/test_connection_mgr.py
+++ b/tests/utests/ofagent/test_connection_mgr.py
@@ -1,3 +1,4 @@
+import os
from unittest import TestCase, main
from connection_mgr import ConnectionManager
@@ -9,6 +10,10 @@
controller_endpoints = ["localhost:6633","localhost:6644","localhost:6655"]
return (consul_endpoint,voltha_endpoint,controller_endpoints)
+ def gen_container_name(self):
+ instance_id = os.environ.get('HOSTNAME', 'localhost')
+ return instance_id
+
def gen_devices(self):
device =lambda: None
device.id = "1"
@@ -26,34 +31,39 @@
def test_connection_mgr_init(self):
consul_endpoint,voltha_endpoint,controller_endpoints = self.gen_endpoints()
- test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints)
+ my_name = self.gen_container_name()
+ test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints, my_name)
self.assertEqual(test_connection_init.consul_endpoint,consul_endpoint)
self.assertEqual(test_connection_init.vcore_endpoint, voltha_endpoint)
self.assertEqual(test_connection_init.controller_endpoints, controller_endpoints)
def test_resolve_endpoint(self):
consul_endpoint, voltha_endpoint, controller_endpoints = self.gen_endpoints()
- test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints)
+ my_name = self.gen_container_name()
+ test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints, my_name)
host,port = test_connection_init.resolve_endpoint(endpoint=consul_endpoint)
assert isinstance(port, int)
assert isinstance(host, basestring)
def test_refresh_agent_connections(self):
consul_endpoint, voltha_endpoint, controller_endpoints = self.gen_endpoints()
+ my_name = self.gen_container_name()
devices,device = self.gen_devices()
- test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints)
+ test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints, my_name)
test_connection_init.refresh_agent_connections(devices)
def test_create_agent(self):
consul_endpoint, voltha_endpoint, controller_endpoints = self.gen_endpoints()
+ my_name = self.gen_container_name()
devices,device = self.gen_devices()
- test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints)
+ test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints, my_name)
test_connection_init.create_agent(device)
def test_delete_agent(self):
consul_endpoint, voltha_endpoint, controller_endpoints = self.gen_endpoints()
+ my_name = self.gen_container_name()
devices,device = self.gen_devices()
- test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints)
+ test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints, my_name)
test_connection_init.create_agent(device)
with self.assertRaises(Exception) as context:
test_connection_init.delete_agent(device.datapath_id)
@@ -62,9 +72,10 @@
def test_forward_packet_in(self):
consul_endpoint, voltha_endpoint, controller_endpoints = self.gen_endpoints()
+ my_name = self.gen_container_name()
devices,device = self.gen_devices()
packet_in = self.gen_packet_in()
- test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints)
+ test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints, my_name)
test_connection_init.create_agent(device)
test_connection_init.forward_packet_in(device.id, packet_in)