[VOL-1462] Sync data between two voltha cores in the same pair

This commit consists of the following updates:
1) Background data syncing between two cores after a transaction
is completed by one core.
2) Add transaction management to southbound APIs (adapter facing).
This is enabled for adapter registration only for now; a sketch of
the reservation idea follows this list.
3) Fix an issue with flow decomposition.
4) Add the rough-in to allow a packet to be sent to an OFAgent
with a transaction ID. Two cores can therefore send the same
packet and let the OFAgent discard the duplicate; an illustrative
discard sketch also follows this list. The work in OFAgent remains.
5) Cleanups.
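
A note on item 2: the sketch below illustrates one way a pair of cores can
arbitrate a duplicated southbound request, by racing to create a transaction
key in the shared KV store; the core that wins the compare-and-swap processes
the request and the peer discards its copy. This is an illustrative sketch
against the etcd clientv3 API, not necessarily the mechanism this commit
implements; the key layout and helper name are hypothetical.

package main

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
)

// acquireTransaction reserves ownership of a transaction by creating its key.
// The compare-and-swap succeeds only for the first core to reach etcd; the
// peer sees Succeeded == false and drops its copy of the request.
func acquireTransaction(cli *clientv3.Client, txnID string) (bool, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	key := "service/voltha/transactions/" + txnID // hypothetical key layout
	resp, err := cli.Txn(ctx).
		If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)). // key must not exist yet
		Then(clientv3.OpPut(key, "owned")).
		Commit()
	if err != nil {
		return false, err
	}
	return resp.Succeeded, nil
}

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		panic(err)
	}
	defer cli.Close()
	if owned, err := acquireTransaction(cli, "txn-1234"); err == nil && owned {
		fmt.Println("this core owns the transaction; processing the request")
	} else {
		fmt.Println("peer core owns the transaction; discarding the duplicate")
	}
}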
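
Item 4 leaves the OFAgent half of the dedup unimplemented. A minimal sketch
of the missing discard step, assuming packets arrive tagged with the
transaction ID described above (the deduper type and its names are
hypothetical, not the OFAgent's actual code):

package main

import (
	"fmt"
	"sync"
	"time"
)

// deduper records transaction IDs already forwarded so that the second copy
// of a packet, sent by the other core, can be dropped. A real implementation
// would also expire stale entries.
type deduper struct {
	mu   sync.Mutex
	seen map[string]time.Time
}

func newDeduper() *deduper {
	return &deduper{seen: make(map[string]time.Time)}
}

// shouldForward returns true only for the first packet carrying txnID.
func (d *deduper) shouldForward(txnID string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, dup := d.seen[txnID]; dup {
		return false
	}
	d.seen[txnID] = time.Now()
	return true
}

func main() {
	d := newDeduper()
	fmt.Println(d.shouldForward("txn-1234")) // true: first copy, forward it
	fmt.Println(d.shouldForward("txn-1234")) // false: duplicate from the peer core
}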

Change-Id: Ibe9d75edb66cfd6a0954bdfeb16a7e7c8a3c53b6
diff --git a/tests/core/concurrency/core_concurrency_test.go b/tests/core/concurrency/core_concurrency_test.go
new file mode 100644
index 0000000..d781ffc
--- /dev/null
+++ b/tests/core/concurrency/core_concurrency_test.go
@@ -0,0 +1,379 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package concurrency
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"github.com/golang/protobuf/ptypes/empty"
+	"github.com/google/uuid"
+	com "github.com/opencord/voltha-go/adapters/common"
+	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/protos/common"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
+	"os"
+	"os/exec"
+	"strings"
+	"testing"
+)
+
+var conns []*grpc.ClientConn
+var stubs []voltha.VolthaServiceClient
+var volthaSerialNumberKey string
+var grpcPorts []int
+
+/*
+ This series of tests is executed with two RW_Cores
+*/
+
+var devices map[string]*voltha.Device
+
+func setup() {
+	var err error
+
+	if _, err = log.AddPackage(log.JSON, log.WarnLevel, log.Fields{"instanceId": "testing"}); err != nil {
+		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+	}
+	log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
+	log.SetAllLogLevel(log.ErrorLevel)
+
+	grpcPorts = []int{50057, 50058}
+	stubs = make([]voltha.VolthaServiceClient, 0)
+	conns = make([]*grpc.ClientConn, 0)
+
+	volthaSerialNumberKey = "voltha_serial_number"
+	devices = make(map[string]*voltha.Device)
+}
+
+func connectToCore(port int) (voltha.VolthaServiceClient, error) {
+	grpcHostIP := os.Getenv("DOCKER_HOST_IP")
+	grpcHost := fmt.Sprintf("%s:%d", grpcHostIP, port)
+	conn, err := grpc.Dial(grpcHost, grpc.WithInsecure())
+	if err != nil {
+		// Log and return the error; log.Fatalf would exit the process and make
+		// the error return below unreachable.
+		log.Errorw("did-not-connect", log.Fields{"error": err})
+		return nil, errors.New("failure-to-connect")
+	}
+	conns = append(conns, conn)
+	return voltha.NewVolthaServiceClient(conn), nil
+}
+
+func setupGrpcConnection() []voltha.VolthaServiceClient {
+	// We have 2 concurrent cores.  Connect to them
+	for _, port := range grpcPorts {
+		if client, err := connectToCore(port); err == nil {
+			stubs = append(stubs, client)
+			log.Infow("connected", log.Fields{"port": port})
+		}
+	}
+	return stubs
+}
+
+func clearAllDevices(clearMap bool) {
+	for key := range devices {
+		ctx := context.Background()
+		response, err := stubs[1].DeleteDevice(ctx, &voltha.ID{Id: key})
+		log.Infow("response", log.Fields{"res": response, "error": err})
+		if clearMap {
+			delete(devices, key)
+		}
+	}
+}
+
+// Verify if all ids are present in the global list of devices
+func hasAllIds(ids *voltha.IDs) bool {
+	if ids == nil && len(devices) == 0 {
+		return true
+	}
+	if ids == nil {
+		return false
+	}
+	for _, id := range ids.Items {
+		if _, exist := devices[id.Id]; !exist {
+			return false
+		}
+	}
+	return true
+}
+
+func startKafka() {
+	fmt.Println("Starting Zookeeper and Kafka ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../../compose/docker-compose-zk-kafka-test.yml", "up", "-d")
+	if err := cmd.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func startEtcd() {
+	fmt.Println("Starting Etcd ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../../compose/docker-compose-etcd.yml", "up", "-d")
+	if err := cmd.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func stopKafka() {
+	fmt.Println("Stopping Zookeeper and Kafka ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../../compose/docker-compose-zk-kafka-test.yml", "down")
+	if err := cmd.Run(); err != nil {
+		// Ignore the error - it is mostly due to the network being left behind,
+		// as it is still in use by other containers.
+		log.Warn(err)
+	}
+}
+
+func stopEtcd() {
+	fmt.Println("Stopping Etcd ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../../compose/docker-compose-etcd.yml", "down")
+	if err := cmd.Run(); err != nil {
+		// Ignore the error - it is mostly due to the network being left behind,
+		// as it is still in use by other containers.
+		log.Warn(err)
+	}
+}
+
+func startCores() {
+	fmt.Println("Starting voltha cores ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../../compose/rw_core_concurrency_test.yml", "up", "-d")
+	if err := cmd.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func stopCores() {
+	fmt.Println("Stopping voltha cores ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../../compose/rw_core_concurrency_test.yml", "down")
+	if err := cmd.Run(); err != nil {
+		// Ignore the error - it is mostly due to the network being left behind,
+		// as it is still in use by other containers.
+		log.Warn(err)
+	}
+}
+
+func startSimulatedOLTAndONUAdapters() {
+	fmt.Println("Starting simulated OLT and ONU adapters ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../../compose/adapters-simulated.yml", "up", "-d")
+	if err := cmd.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func stopSimulatedOLTAndONUAdapters() {
+	fmt.Println("Stopping simulated OLT and ONU adapters ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../../compose/adapters-simulated.yml", "down")
+	if err := cmd.Run(); err != nil {
+		// Ignore the error - it is mostly due to the network being left behind,
+		// as it is still in use by other containers.
+		log.Warn(err)
+	}
+}
+
+func sendCreateDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, device *voltha.Device, ch chan interface{}) {
+	fmt.Println("Sending create device ...")
+	if response, err := stub.CreateDevice(ctx, device); err != nil {
+		ch <- err
+	} else {
+		ch <- response
+	}
+}
+
+func sendEnableDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, deviceId string, ch chan interface{}) {
+	fmt.Println("Sending enable device ...")
+	if response, err := stub.EnableDevice(ctx, &common.ID{Id: deviceId}); err != nil {
+		ch <- err
+	} else {
+		ch <- response
+	}
+}
+
+//// createPonsimDevice sends the same create request to both cores and waits for both responses
+//func createPonsimDevice(stubs []voltha.VolthaServiceClient) (*voltha.Device, error) {
+//	ui := uuid.New()
+//	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
+//	//preprovision_olt -t ponsim_olt -H 172.20.0.11:50060
+//	device := &voltha.Device{Type: "ponsim_olt"}
+//	device.Address = &voltha.Device_HostAndPort{HostAndPort:"172.20.0.11:50060"}
+//	ch := make(chan interface{})
+//	defer close(ch)
+//	requestNum := 0
+//	for _, stub := range stubs {
+//		go sendCreateDeviceRequest(ctx, stub, device, ch)
+//		requestNum += 1
+//	}
+//	fmt.Println("Waiting for create device response ...")
+//	receivedResponse := 0
+//	var err error
+//	var returnedDevice *voltha.Device
+//	// Collect one response per outstanding request so neither sender blocks.
+//	for receivedResponse < requestNum {
+//		res, ok := <-ch
+//		receivedResponse += 1
+//		if !ok {
+//			break
+//		} else if er, ok := res.(error); ok {
+//			err = er
+//		} else if d, ok := res.(*voltha.Device); ok {
+//			returnedDevice = d
+//		}
+//	}
+//	if returnedDevice != nil {
+//		return returnedDevice, nil
+//	}
+//	return nil, err
+//}
+
+// createDevice sends the same create request to both cores and waits for both responses
+func createDevice(stubs []voltha.VolthaServiceClient) (*voltha.Device, error) {
+	ui := uuid.New()
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
+	randomMacAddress := strings.ToUpper(com.GetRandomMacAddress())
+	device := &voltha.Device{Type: "simulated_olt", MacAddress: randomMacAddress}
+	ch := make(chan interface{})
+	defer close(ch)
+	requestNum := 0
+	for _, stub := range stubs {
+		go sendCreateDeviceRequest(ctx, stub, device, ch)
+		requestNum += 1
+	}
+	fmt.Println("Waiting for create device response ...")
+	receivedResponse := 0
+	var err error
+	var returnedDevice *voltha.Device
+	// Collect one response per outstanding request; a bare select would stop
+	// after the first response and leave the second sender goroutine blocked.
+	for receivedResponse < requestNum {
+		res, ok := <-ch
+		receivedResponse += 1
+		if !ok {
+			break
+		} else if er, ok := res.(error); ok {
+			err = er
+		} else if d, ok := res.(*voltha.Device); ok {
+			returnedDevice = d
+		}
+	}
+	if returnedDevice != nil {
+		return returnedDevice, nil
+	}
+	return nil, err
+}
+
+// enableAllDevices sends the same enable request to both cores for each device and waits for both
+// responses before sending the enable request for the next device.
+func enableAllDevices(stubs []voltha.VolthaServiceClient) error {
+	for deviceId, val := range devices {
+		if val.AdminState == voltha.AdminState_PREPROVISIONED {
+			ui := uuid.New()
+			ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
+			ch := make(chan interface{})
+			defer close(ch)
+			requestNum := 0
+			for _, stub := range stubs {
+				go sendEnableDeviceRequest(ctx, stub, deviceId, ch)
+				requestNum += 1
+			}
+			receivedResponse := 0
+			var err error
+			fmt.Println("Waiting for enable device response ...")
+			validResponseReceived := false
+			// Collect one response per outstanding request so neither sender blocks.
+			for receivedResponse < requestNum {
+				res, ok := <-ch
+				receivedResponse += 1
+				if !ok {
+					break
+				} else if er, ok := res.(error); ok {
+					err = er
+				} else if _, ok := res.(*empty.Empty); ok {
+					validResponseReceived = true
+				}
+			}
+			// Fail fast on this device, but keep iterating so every
+			// preprovisioned device gets enabled.
+			if !validResponseReceived {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func TestConcurrentRequests(t *testing.T) {
+	fmt.Println("Testing Concurrent requests ...")
+
+	////0. Start Kafka and Etcd
+	//startKafka()
+	//startEtcd()
+	//
+	////1. Start the core
+	//startCores()
+	//
+	////2. Start the simulated adapters
+	//startSimulatedOLTAndONUAdapters()
+	//
+	//// Wait until the core and adapters sync up
+	//time.Sleep(10 * time.Second)
+
+	stubs = setupGrpcConnection()
+
+	//3.  Create the devices
+	response, err := createDevice(stubs)
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Nil(t, err)
+	// Guard against a nil response; assert.Nil does not stop the test on failure.
+	if response == nil {
+		t.Fatal("no device returned")
+	}
+	devices[response.Id] = response
+
+	//4. Enable all the devices
+	err = enableAllDevices(stubs)
+	assert.Nil(t, err)
+
+	////5. Stop the simulated adapters
+	//stopSimulatedOLTAndONUAdapters()
+	//
+	////6. Stop the cores
+	//stopCores()
+	//
+	////7. Stop Kafka and Etcd
+	//stopKafka()
+	//stopEtcd()
+}
+
+func shutdown() {
+	for _, conn := range conns {
+		conn.Close()
+	}
+}
+
+func TestMain(m *testing.M) {
+	setup()
+	code := m.Run()
+	shutdown()
+	os.Exit(code)
+}