/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package device

import (
	"context"
	"math/rand"
	"sync"
	"testing"
	"time"

	"github.com/gogo/protobuf/proto"
	"github.com/opencord/voltha-go/db/model"
	"github.com/opencord/voltha-go/rw_core/config"
	"github.com/opencord/voltha-go/rw_core/core/adapter"
	tst "github.com/opencord/voltha-go/rw_core/test"
	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
	"github.com/opencord/voltha-lib-go/v3/pkg/db"
	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
	mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
	mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
	"github.com/opencord/voltha-protos/v3/go/voltha"
	"github.com/phayes/freeport"
	"github.com/stretchr/testify/assert"
)

func TestLogicalDeviceAgent_diff_nochange_1(t *testing.T) {
	currentLogicalPorts := []*voltha.LogicalPort{}
	updatedLogicalPorts := []*voltha.LogicalPort{}
	newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
	assert.Equal(t, 0, len(newPorts))
	assert.Equal(t, 0, len(changedPorts))
	assert.Equal(t, 0, len(deletedPorts))
}

func TestLogicalDeviceAgent_diff_nochange_2(t *testing.T) {
	currentLogicalPorts := []*voltha.LogicalPort{
		{
			Id:           "1231",
			DeviceId:     "d1234",
			DevicePortNo: 1,
			RootPort:     true,
			OfpPort: &ofp.OfpPort{
				PortNo: 1,
				Name:   "port1",
				Config: 1,
				State:  1,
			},
		},
		{
			Id:           "1232",
			DeviceId:     "d1234",
			DevicePortNo: 2,
			RootPort:     false,
			OfpPort: &ofp.OfpPort{
				PortNo: 2,
				Name:   "port2",
				Config: 1,
				State:  1,
			},
		},
		{
			Id:           "1233",
			DeviceId:     "d1234",
			DevicePortNo: 3,
			RootPort:     false,
			OfpPort: &ofp.OfpPort{
				PortNo: 3,
				Name:   "port3",
				Config: 1,
				State:  1,
			},
		},
	}
	updatedLogicalPorts := []*voltha.LogicalPort{
		{
			Id:           "1231",
			DeviceId:     "d1234",
			DevicePortNo: 1,
			RootPort:     true,
			OfpPort: &ofp.OfpPort{
				PortNo: 1,
				Name:   "port1",
				Config: 1,
				State:  1,
			},
		},
		{
			Id:           "1232",
			DeviceId:     "d1234",
			DevicePortNo: 2,
			RootPort:     false,
			OfpPort: &ofp.OfpPort{
				PortNo: 2,
				Name:   "port2",
				Config: 1,
				State:  1,
			},
		},
		{
			Id:           "1233",
			DeviceId:     "d1234",
			DevicePortNo: 3,
			RootPort:     false,
			OfpPort: &ofp.OfpPort{
				PortNo: 3,
				Name:   "port3",
				Config: 1,
				State:  1,
			},
		},
	}
	newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
	assert.Equal(t, 0, len(newPorts))
	assert.Equal(t, 0, len(changedPorts))
	assert.Equal(t, 0, len(deletedPorts))
}

func TestLogicalDeviceAgent_diff_add(t *testing.T) {
	currentLogicalPorts := []*voltha.LogicalPort{}
	updatedLogicalPorts := []*voltha.LogicalPort{
		{
			Id:           "1231",
			DeviceId:     "d1234",
			DevicePortNo: 1,
			RootPort:     true,
			OfpPort: &ofp.OfpPort{
				PortNo: 1,
				Name:   "port1",
				Config: 1,
				State:  1,
			},
		},
		{
			Id:           "1232",
			DeviceId:     "d1234",
			DevicePortNo: 2,
			RootPort:     true,
			OfpPort: &ofp.OfpPort{
				PortNo: 2,
				Name:   "port2",
				Config: 1,
				State:  1,
			},
		},
	}
	newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
	assert.Equal(t, 2, len(newPorts))
	assert.Equal(t, 0, len(changedPorts))
	assert.Equal(t, 0, len(deletedPorts))
	assert.Equal(t, updatedLogicalPorts[0], newPorts[updatedLogicalPorts[0].Id])
	assert.Equal(t, updatedLogicalPorts[1], newPorts[updatedLogicalPorts[1].Id])
}

func TestLogicalDeviceAgent_diff_delete(t *testing.T) {
	currentLogicalPorts := []*voltha.LogicalPort{
		{
			Id:           "1231",
			DeviceId:     "d1234",
			DevicePortNo: 1,
			RootPort:     true,
			OfpPort: &ofp.OfpPort{
				PortNo: 1,
				Name:   "port1",
				Config: 1,
				State:  1,
			},
		},
	}
	updatedLogicalPorts := []*voltha.LogicalPort{}
	newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
	assert.Equal(t, 0, len(newPorts))
	assert.Equal(t, 0, len(changedPorts))
	assert.Equal(t, 1, len(deletedPorts))
	assert.Equal(t, currentLogicalPorts[0], deletedPorts[currentLogicalPorts[0].Id])
}

func TestLogicalDeviceAgent_diff_changed(t *testing.T) {
	currentLogicalPorts := []*voltha.LogicalPort{
		{
			Id:           "1231",
			DeviceId:     "d1234",
			DevicePortNo: 1,
			RootPort:     true,
			OfpPort: &ofp.OfpPort{
				PortNo: 1,
				Name:   "port1",
				Config: 1,
				State:  1,
			},
		},
		{
			Id:           "1232",
			DeviceId:     "d1234",
			DevicePortNo: 2,
			RootPort:     false,
			OfpPort: &ofp.OfpPort{
				PortNo: 2,
				Name:   "port2",
				Config: 1,
				State:  1,
			},
		},
		{
			Id:           "1233",
			DeviceId:     "d1234",
			DevicePortNo: 3,
			RootPort:     false,
			OfpPort: &ofp.OfpPort{
				PortNo: 3,
				Name:   "port3",
				Config: 1,
				State:  1,
			},
		},
	}
	updatedLogicalPorts := []*voltha.LogicalPort{
		{
			Id:           "1231",
			DeviceId:     "d1234",
			DevicePortNo: 1,
			RootPort:     true,
			OfpPort: &ofp.OfpPort{
				PortNo: 1,
				Name:   "port1",
				Config: 4,
				State:  4,
			},
		},
		{
			Id:           "1232",
			DeviceId:     "d1234",
			DevicePortNo: 2,
			RootPort:     false,
			OfpPort: &ofp.OfpPort{
				PortNo: 2,
				Name:   "port2",
				Config: 4,
				State:  4,
			},
		},
		{
			Id:           "1233",
			DeviceId:     "d1234",
			DevicePortNo: 3,
			RootPort:     false,
			OfpPort: &ofp.OfpPort{
				PortNo: 3,
				Name:   "port3",
				Config: 1,
				State:  1,
			},
		},
	}
	newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
	assert.Equal(t, 0, len(newPorts))
	assert.Equal(t, 2, len(changedPorts))
	assert.Equal(t, 0, len(deletedPorts))
	assert.Equal(t, updatedLogicalPorts[0], changedPorts[updatedLogicalPorts[0].Id])
	assert.Equal(t, updatedLogicalPorts[1], changedPorts[updatedLogicalPorts[1].Id])
}

func TestLogicalDeviceAgent_diff_mix(t *testing.T) {
	currentLogicalPorts := []*voltha.LogicalPort{
		{
			Id:           "1231",
			DeviceId:     "d1234",
			DevicePortNo: 1,
			RootPort:     true,
			OfpPort: &ofp.OfpPort{
				PortNo: 1,
				Name:   "port1",
				Config: 1,
				State:  1,
			},
		},
		{
			Id:           "1232",
			DeviceId:     "d1234",
			DevicePortNo: 2,
			RootPort:     false,
			OfpPort: &ofp.OfpPort{
				PortNo: 2,
				Name:   "port2",
				Config: 1,
				State:  1,
			},
		},
		{
			Id:           "1233",
			DeviceId:     "d1234",
			DevicePortNo: 3,
			RootPort:     false,
			OfpPort: &ofp.OfpPort{
				PortNo: 3,
				Name:   "port3",
				Config: 1,
				State:  1,
			},
		},
	}
	updatedLogicalPorts := []*voltha.LogicalPort{
		{
			Id:           "1231",
			DeviceId:     "d1234",
			DevicePortNo: 1,
			RootPort:     true,
			OfpPort: &ofp.OfpPort{
				PortNo: 1,
				Name:   "port1",
				Config: 4,
				State:  4,
			},
		},
		{
			Id:           "1232",
			DeviceId:     "d1234",
			DevicePortNo: 2,
			RootPort:     false,
			OfpPort: &ofp.OfpPort{
				PortNo: 2,
				Name:   "port2",
				Config: 4,
				State:  4,
			},
		},
		{
			Id:           "1234",
			DeviceId:     "d1234",
			DevicePortNo: 4,
			RootPort:     false,
			OfpPort: &ofp.OfpPort{
				PortNo: 4,
				Name:   "port4",
				Config: 4,
				State:  4,
			},
		},
	}
	newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
	assert.Equal(t, 1, len(newPorts))
	assert.Equal(t, 2, len(changedPorts))
	assert.Equal(t, 1, len(deletedPorts))
	assert.Equal(t, updatedLogicalPorts[0], changedPorts[updatedLogicalPorts[0].Id])
	assert.Equal(t, updatedLogicalPorts[1], changedPorts[updatedLogicalPorts[1].Id])
	assert.Equal(t, currentLogicalPorts[2], deletedPorts[currentLogicalPorts[2].Id])
}

type LDATest struct {
	etcdServer       *mock_etcd.EtcdServer
	deviceMgr        *Manager
	kmp              kafka.InterContainerProxy
	logicalDeviceMgr *LogicalManager
	kClient          kafka.Client
	kvClientPort     int
	oltAdapterName   string
	onuAdapterName   string
	coreInstanceID   string
	defaultTimeout   time.Duration
	maxTimeout       time.Duration
	logicalDevice    *voltha.LogicalDevice
	deviceIds        []string
	done             chan int
}

func newLDATest() *LDATest {
	test := &LDATest{}
	// Start the embedded etcd server
	var err error
	test.etcdServer, test.kvClientPort, err = tst.StartEmbeddedEtcdServer("voltha.rwcore.lda.test", "voltha.rwcore.lda.etcd", "error")
	if err != nil {
		logger.Fatal(err)
	}
	// Create the kafka client
	test.kClient = mock_kafka.NewKafkaClient()
	test.oltAdapterName = "olt_adapter_mock"
	test.onuAdapterName = "onu_adapter_mock"
	test.coreInstanceID = "rw-da-test"
	test.defaultTimeout = 5 * time.Second
	test.maxTimeout = 20 * time.Second
	test.done = make(chan int)
	test.deviceIds = []string{com.GetRandomString(10), com.GetRandomString(10), com.GetRandomString(10)}
	test.logicalDevice = &voltha.LogicalDevice{
		Desc: &ofp.OfpDesc{
			HwDesc:    "olt_adapter_mock",
			SwDesc:    "olt_adapter_mock",
			SerialNum: com.GetRandomSerialNumber(),
		},
		SwitchFeatures: &ofp.OfpSwitchFeatures{
			NBuffers: 256,
			NTables:  2,
			Capabilities: uint32(ofp.OfpCapabilities_OFPC_FLOW_STATS |
				ofp.OfpCapabilities_OFPC_TABLE_STATS |
				ofp.OfpCapabilities_OFPC_PORT_STATS |
				ofp.OfpCapabilities_OFPC_GROUP_STATS),
		},
		RootDeviceId: test.deviceIds[0],
		Ports: []*voltha.LogicalPort{
			{
				Id:           "1001",
				DeviceId:     test.deviceIds[0],
				DevicePortNo: 1,
				RootPort:     true,
				OfpPort: &ofp.OfpPort{
					PortNo: 1,
					Name:   "port1",
					Config: 4,
					State:  4,
				},
			},
			{
				Id:           "1002",
				DeviceId:     test.deviceIds[1],
				DevicePortNo: 2,
				RootPort:     false,
				OfpPort: &ofp.OfpPort{
					PortNo: 2,
					Name:   "port2",
					Config: 4,
					State:  4,
				},
			},
			{
				Id:           "1003",
				DeviceId:     test.deviceIds[2],
				DevicePortNo: 3,
				RootPort:     false,
				OfpPort: &ofp.OfpPort{
					PortNo: 4,
					Name:   "port3",
					Config: 4,
					State:  4,
				},
			},
		},
	}
	return test
}

func (lda *LDATest) startCore(inCompeteMode bool) {
	cfg := config.NewRWCoreFlags()
	cfg.CoreTopic = "rw_core"
	cfg.DefaultRequestTimeout = lda.defaultTimeout
	cfg.KVStorePort = lda.kvClientPort
	cfg.InCompetingMode = inCompeteMode
	grpcPort, err := freeport.GetFreePort()
	if err != nil {
		logger.Fatal("Cannot get a freeport for grpc")
	}
	cfg.GrpcPort = grpcPort
	cfg.GrpcHost = "127.0.0.1"
	client := tst.SetupKVClient(cfg, lda.coreInstanceID)
	backend := &db.Backend{
		Client:                  client,
		StoreType:               cfg.KVStoreType,
		Host:                    cfg.KVStoreHost,
		Port:                    cfg.KVStorePort,
		Timeout:                 cfg.KVStoreTimeout,
		LivenessChannelInterval: cfg.LiveProbeInterval / 2}
	lda.kmp = kafka.NewInterContainerProxy(
		kafka.InterContainerHost(cfg.KafkaAdapterHost),
		kafka.InterContainerPort(cfg.KafkaAdapterPort),
		kafka.MsgClient(lda.kClient),
		kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
		kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))

	endpointMgr := kafka.NewEndpointManager(backend)
	proxy := model.NewDBPath(backend)
	adapterMgr := adapter.NewAdapterManager(proxy, lda.coreInstanceID, lda.kClient)

	lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CoreTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
	if err = lda.kmp.Start(); err != nil {
		logger.Fatal("Cannot start InterContainerProxy")
	}
	adapterMgr.Start(context.Background())
}

func (lda *LDATest) stopAll() {
	if lda.kClient != nil {
		lda.kClient.Stop()
	}
	if lda.kmp != nil {
		lda.kmp.Stop()
	}
	if lda.etcdServer != nil {
		tst.StopEmbeddedEtcdServer(lda.etcdServer)
	}
}

func (lda *LDATest) createLogicalDeviceAgent(t *testing.T) *LogicalAgent {
	lDeviceMgr := lda.logicalDeviceMgr
	deviceMgr := lda.deviceMgr
	clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
	clonedLD.Id = com.GetRandomString(10)
	clonedLD.DatapathId = rand.Uint64()
	lDeviceAgent := newLogicalAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbPath, lDeviceMgr.ldProxy, lDeviceMgr.defaultTimeout)
	lDeviceAgent.logicalDevice = clonedLD
	err := lDeviceAgent.ldProxy.Set(context.Background(), clonedLD.Id, clonedLD)
	assert.Nil(t, err)
	lDeviceMgr.addLogicalDeviceAgentToMap(lDeviceAgent)
	return lDeviceAgent
}

func (lda *LDATest) updateLogicalDeviceConcurrently(t *testing.T, ldAgent *LogicalAgent, globalWG *sync.WaitGroup) {
	originalLogicalDevice, _ := ldAgent.GetLogicalDevice(context.Background())
	assert.NotNil(t, originalLogicalDevice)
	var localWG sync.WaitGroup

	// Change the state of the first port to FAILED
	localWG.Add(1)
	go func() {
		err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[0].DeviceId, lda.logicalDevice.Ports[0].DevicePortNo, voltha.OperStatus_FAILED)
		assert.Nil(t, err)
		localWG.Done()
	}()

	// Change the state of the second port to TESTING
	localWG.Add(1)
	go func() {
		err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[1].DeviceId, lda.logicalDevice.Ports[1].DevicePortNo, voltha.OperStatus_TESTING)
		assert.Nil(t, err)
		localWG.Done()
	}()

	// Change the state of the third port to UNKNOWN and then back to ACTIVE
	localWG.Add(1)
	go func() {
		err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DeviceId, lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_UNKNOWN)
		assert.Nil(t, err)
		err = ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DeviceId, lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_ACTIVE)
		assert.Nil(t, err)
		localWG.Done()
	}()

	// Add a meter to the logical device
	meterMod := &ofp.OfpMeterMod{
		Command: ofp.OfpMeterModCommand_OFPMC_ADD,
		Flags:   rand.Uint32(),
		MeterId: rand.Uint32(),
		Bands: []*ofp.OfpMeterBandHeader{
			{Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
				Rate:      rand.Uint32(),
				BurstSize: rand.Uint32(),
				Data:      nil,
			},
		},
	}
	localWG.Add(1)
	go func() {
		err := ldAgent.meterAdd(context.Background(), meterMod)
		assert.Nil(t, err)
		localWG.Done()
	}()
	// wait for go routines to be done
	localWG.Wait()
	meterEntry := fu.MeterEntryFromMeterMod(meterMod)

	meterHandle, have := ldAgent.meterLoader.Lock(meterMod.MeterId)
	assert.Equal(t, have, true)
	if have {
		assert.True(t, proto.Equal(meterEntry, meterHandle.GetReadOnly()))
		meterHandle.Unlock()
	}

	expectedChange := proto.Clone(originalLogicalDevice).(*voltha.LogicalDevice)
	expectedChange.Ports[0].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
	expectedChange.Ports[0].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
	expectedChange.Ports[1].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
	expectedChange.Ports[1].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
	expectedChange.Ports[2].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
	expectedChange.Ports[2].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
	updatedLogicalDevice, _ := ldAgent.GetLogicalDevice(context.Background())
	assert.NotNil(t, updatedLogicalDevice)
	assert.True(t, proto.Equal(expectedChange, updatedLogicalDevice))
	globalWG.Done()
}

func TestConcurrentLogicalDeviceUpdate(t *testing.T) {
	lda := newLDATest()
	assert.NotNil(t, lda)
	defer lda.stopAll()

	// Start the Core
	lda.startCore(false)

	var wg sync.WaitGroup
	numConCurrentLogicalDeviceAgents := 3
	for i := 0; i < numConCurrentLogicalDeviceAgents; i++ {
		wg.Add(1)
		a := lda.createLogicalDeviceAgent(t)
		go lda.updateLogicalDeviceConcurrently(t, a, &wg)
	}

	wg.Wait()
}
