/*
 * Portions copyright 2019-present Open Networking Foundation
 * Original copyright 2019-present Ciena Corporation
 *
 * Licensed under the Apache License, Version 2.0 (the"github.com/stretchr/testify/assert" "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package afrouter

import (
	"fmt"
	"github.com/golang/protobuf/proto"
	"github.com/opencord/voltha-go/common/log"
	common_pb "github.com/opencord/voltha-protos/go/common"
	voltha_pb "github.com/opencord/voltha-protos/go/voltha"
	"github.com/stretchr/testify/assert"
	"google.golang.org/grpc"
	"testing"
)

const (
	AFFINITY_ROUTER_PROTOFILE = "../../../vendor/github.com/opencord/voltha-protos/voltha.pb"
)

// Unit test initialization
func init() {
	// Logger must be configured or bad things happen
	log.SetDefaultLogger(log.JSON, log.DebugLevel, log.Fields{"instanceId": 1})
	log.AddPackage(log.JSON, log.WarnLevel, nil)
}

// Build an affinity router configuration
func MakeAffinityTestConfig(numBackends int, numConnections int) (*RouteConfig, *RouterConfig) {

	var backends []BackendConfig
	for backendIndex := 0; backendIndex < numBackends; backendIndex++ {
		var connections []ConnectionConfig
		for connectionIndex := 0; connectionIndex < numConnections; connectionIndex++ {
			connectionConfig := ConnectionConfig{
				Name: fmt.Sprintf("rw_vcore%d%d", backendIndex, connectionIndex+1),
				Addr: "foo",
				Port: "123",
			}
			connections = append(connections, connectionConfig)
		}

		backendConfig := BackendConfig{
			Name:        fmt.Sprintf("rw_vcore%d", backendIndex),
			Type:        BackendSingleServer,
			Connections: connections,
		}

		backends = append(backends, backendConfig)
	}

	backendClusterConfig := BackendClusterConfig{
		Name:     "vcore",
		Backends: backends,
	}

	routeConfig := RouteConfig{
		Name:             "dev_manager",
		Type:             RouteTypeRpcAffinityMessage,
		Association:      AssociationRoundRobin,
		BackendCluster:   "vcore",
		backendCluster:   &backendClusterConfig,
		RouteField:       "id",
		Methods:          []string{"CreateDevice", "EnableDevice"},
		NbBindingMethods: []string{"CreateDevice"},
	}

	routerConfig := RouterConfig{
		Name:         "vcore",
		ProtoService: "VolthaService",
		ProtoPackage: "voltha",
		Routes:       []RouteConfig{routeConfig},
		ProtoFile:    AFFINITY_ROUTER_PROTOFILE,
	}
	return &routeConfig, &routerConfig
}

// Route() requires an open connection, so pretend we have one.
func PretendAffinityOpenConnection(router Router, clusterName string, backendIndex int, connectionName string) {
	cluster := router.FindBackendCluster(clusterName)

	// Route Method expects an open connection
	conn := cluster.backends[backendIndex].connections[connectionName]
	cluster.backends[backendIndex].openConns[conn] = &grpc.ClientConn{}
}

// Common setup to run before each unit test
func AffinityTestSetup() {
	// reset globals that need to be clean for each unit test

	clusters = make(map[string]*cluster)
	allRouters = make(map[string]Router)
}

// Test creation of a new AffinityRouter, and the Service(), Name(), FindBackendCluster(), and
// methods.
func TestAffinityRouterInit(t *testing.T) {
	AffinityTestSetup()

	routeConfig, routerConfig := MakeAffinityTestConfig(1, 1)

	router, err := newAffinityRouter(routerConfig, routeConfig)

	assert.NotNil(t, router)
	assert.Nil(t, err)

	assert.Equal(t, router.Service(), "VolthaService")
	assert.Equal(t, router.Name(), "dev_manager")

	cluster, err := router.BackendCluster("foo", "bar")
	assert.Equal(t, cluster, clusters["vcore"])
	assert.Nil(t, err)

	assert.Equal(t, router.FindBackendCluster("vcore"), clusters["vcore"])
}

// Should throw error if no name in configuration
func TestAffinityRouterInitNoName(t *testing.T) {
	AffinityTestSetup()

	routeConfig, routerConfig := MakeAffinityTestConfig(1, 1)
	routeConfig.Name = ""

	_, err := newAffinityRouter(routerConfig, routeConfig)

	assert.EqualError(t, err, "Failed to create a new router ''")
}

// Should thow error if now ProtoPackage in configuration
func TestAffinityRouterInitNoProtoPackage(t *testing.T) {
	AffinityTestSetup()

	routeConfig, routerConfig := MakeAffinityTestConfig(1, 1)
	routerConfig.ProtoPackage = ""

	_, err := newAffinityRouter(routerConfig, routeConfig)

	assert.EqualError(t, err, "Failed to create a new router 'dev_manager'")
}

// Should throw error if no ProtoServer in configuration
func TestAffinityRouterInitNoProtoService(t *testing.T) {
	AffinityTestSetup()

	routeConfig, routerConfig := MakeAffinityTestConfig(1, 1)
	routerConfig.ProtoService = ""

	_, err := newAffinityRouter(routerConfig, routeConfig)

	assert.EqualError(t, err, "Failed to create a new router 'dev_manager'")
}

// Tests a cluster with only one Backend
func TestAffinityRouteOne(t *testing.T) {
	AffinityTestSetup()

	_, routerConfig := MakeAffinityTestConfig(1, 1)

	router, err := newRouter(routerConfig)
	assert.Nil(t, err)

	PretendAffinityOpenConnection(router, "vcore", 0, "rw_vcore01")

	idMessage := &common_pb.ID{Id: "1234"}

	idData, err := proto.Marshal(idMessage)
	assert.Nil(t, err)

	sel := &requestFrame{payload: idData,
		err:        nil,
		metaKey:    NoMeta,
		methodInfo: newMethodDetails("/voltha.VolthaService/EnableDevice")}

	backend, connection := router.Route(sel)

	assert.Nil(t, sel.err)
	assert.NotNil(t, backend)
	assert.Equal(t, "rw_vcore0", backend.name)
	assert.Nil(t, connection)

	// Since we only have one backend, calling Route a second time should return the same one

	backend, connection = router.Route(sel)

	assert.Nil(t, sel.err)
	assert.NotNil(t, backend)
	assert.Equal(t, "rw_vcore0", backend.name)
	assert.Nil(t, connection)
}

// Tests a cluster with two Backends
func TestAffinityRouteTwo(t *testing.T) {
	AffinityTestSetup()

	_, routerConfig := MakeAffinityTestConfig(2, 1)

	router, err := newRouter(routerConfig)
	assert.Nil(t, err)

	PretendAffinityOpenConnection(router, "vcore", 0, "rw_vcore01")
	PretendAffinityOpenConnection(router, "vcore", 1, "rw_vcore11")

	idMessage := &common_pb.ID{Id: "1234"}
	idData, err := proto.Marshal(idMessage)
	assert.Nil(t, err)

	sel := &requestFrame{payload: idData,
		err:        nil,
		metaKey:    NoMeta,
		methodInfo: newMethodDetails("/voltha.VolthaService/EnableDevice")}

	// We should Route to the first core and bind affinity to it

	backend, connection := router.Route(sel)

	assert.Nil(t, sel.err)
	assert.NotNil(t, backend)
	assert.Equal(t, "rw_vcore0", backend.name)
	assert.Nil(t, connection)

	// We should have established affinity, and trying Route again should return the same core

	backend, connection = router.Route(sel)

	assert.Nil(t, sel.err)
	assert.NotNil(t, backend)
	assert.Equal(t, "rw_vcore0", backend.name)
	assert.Nil(t, connection)

	// Make up a message with a different id
	idMessage = &common_pb.ID{Id: "1235"}
	idData, err = proto.Marshal(idMessage)
	assert.Nil(t, err)

	sel = &requestFrame{payload: idData,
		err:        nil,
		metaKey:    NoMeta,
		methodInfo: newMethodDetails("/voltha.VolthaService/EnableDevice")}

	// Calling Route with the new ID should cause it to bind affinity to the second core

	backend, connection = router.Route(sel)

	assert.Nil(t, sel.err)
	assert.NotNil(t, backend)
	assert.Equal(t, "rw_vcore1", backend.name)
	assert.Nil(t, connection)
}

// Tests a cluster with one backend but no open connections
func TestAffinityRouteOneNoOpenConnection(t *testing.T) {
	AffinityTestSetup()

	_, routerConfig := MakeAffinityTestConfig(1, 1)

	router, err := newRouter(routerConfig)
	assert.Nil(t, err)

	idMessage := &common_pb.ID{Id: "1234"}

	idData, err := proto.Marshal(idMessage)
	assert.Nil(t, err)

	sel := &requestFrame{payload: idData,
		err:        nil,
		metaKey:    NoMeta,
		methodInfo: newMethodDetails("/voltha.VolthaService/EnableDevice")}

	backend, connection := router.Route(sel)

	assert.EqualError(t, sel.err, "No backend with open connections found")
	assert.Nil(t, backend)
	assert.Nil(t, connection)
}

// Tests binding on reply
func TestAffinityRouteReply(t *testing.T) {
	AffinityTestSetup()

	_, routerConfig := MakeAffinityTestConfig(2, 1)

	router, err := newRouter(routerConfig)
	assert.Nil(t, err)

	// Get the created AffinityRouter so we can inspect its state
	aRouter := allRouters["vcoredev_manager"].(AffinityRouter)

	PretendAffinityOpenConnection(router, "vcore", 0, "rw_vcore01")
	PretendAffinityOpenConnection(router, "vcore", 1, "rw_vcore11")

	idMessage := &voltha_pb.Device{Id: "1234"}
	idData, err := proto.Marshal(idMessage)
	assert.Nil(t, err)

	// Note that sel.backend must be set. As this is a response, it must
	// have come from a backend and that backend must be known.

	sel := &responseFrame{payload: idData,
		metaKey: NoMeta,
		backend: router.FindBackendCluster("vcore").backends[0],
		method:  "CreateDevice",
	}

	// affinity should be unset as we have not routed yet
	assert.Empty(t, aRouter.affinity)

	err = aRouter.ReplyHandler(sel)
	assert.Nil(t, err)

	// affinity should now be set
	assert.NotEmpty(t, aRouter.affinity)
	assert.Equal(t, router.FindBackendCluster("vcore").backends[0], aRouter.affinity["1234"])
}

// Tests binding on reply, with incorrect frame type
func TestAffinityRouteReplyIncorrectFrame(t *testing.T) {
	AffinityTestSetup()

	_, routerConfig := MakeAffinityTestConfig(2, 1)

	router, err := newRouter(routerConfig)
	assert.Nil(t, err)

	// Get the created AffinityRouter so we can inspect its state
	aRouter := allRouters["vcoredev_manager"].(AffinityRouter)

	PretendAffinityOpenConnection(router, "vcore", 0, "rw_vcore01")
	PretendAffinityOpenConnection(router, "vcore", 1, "rw_vcore11")

	idMessage := &voltha_pb.Device{Id: "1234"}
	idData, err := proto.Marshal(idMessage)
	assert.Nil(t, err)

	sel := &requestFrame{payload: idData,
		err:        nil,
		metaKey:    NoMeta,
		methodInfo: newMethodDetails("/voltha.VolthaService/EnableDevice"),
	}

	// ReplyHandler expects a replyFrame and we're giving it a requestFrame instead

	err = aRouter.ReplyHandler(sel)
	assert.EqualError(t, err, "Internal: invalid data type in ReplyHander call &{[10 4 49 50 51 52] <nil> <nil> <nil> <nil> {/voltha.VolthaService/EnableDevice voltha VolthaService EnableDevice}  nometa }")
}

func TestAffinityRouterDecodeProtoField(t *testing.T) {
	AffinityTestSetup()

	_, routerConfig := MakeAffinityTestConfig(2, 1)

	_, err := newRouter(routerConfig)
	assert.Nil(t, err)

	// Get the created AffinityRouter so we can inspect its state
	aRouter := allRouters["vcoredev_manager"].(AffinityRouter)

	// Pick something to test with lots of field types. Port is a good candidate.
	portMessage := &voltha_pb.Port{PortNo: 123,
		Label:     "testlabel",
		Type:      3,
		DeviceId:  "5678",
		RxPackets: 9876,
	}

	portData, err := proto.Marshal(portMessage)
	assert.Nil(t, err)

	/*
	 * Decode various fields in the protobuf. Decoding each subsequent
	 * field implies skipfield() is called on its predecessor.
	 */

	s, err := aRouter.decodeProtoField(portData, 1) // field 1 is PortNo
	assert.Equal(t, "123", s)

	// Test VOL-1882, skipping of varint field. Note: May cause infinite loop if not fixed!
	s, err = aRouter.decodeProtoField(portData, 2) // field 2 is Label
	assert.Equal(t, "testlabel", s)

	s, err = aRouter.decodeProtoField(portData, 3) // field 3 is PortType
	assert.Equal(t, "3", s)

	s, err = aRouter.decodeProtoField(portData, 7) // field 7 is DeviceId
	assert.Equal(t, "5678", s)

	// TODO: Seems like an int64 ought to be allowed...
	s, err = aRouter.decodeProtoField(portData, 9) // field 7 is RxPackets
	assert.EqualError(t, err, "Only integer and string route selectors are permitted")
}

// Test setting affinity for a key to a backend
func TestAffinitySetAffinity(t *testing.T) {
	AffinityTestSetup()

	_, routerConfig := MakeAffinityTestConfig(2, 1)

	router, err := newRouter(routerConfig)
	assert.Nil(t, err)

	// Get the created AffinityRouter so we can inspect its state
	aRouter := allRouters["vcoredev_manager"].(AffinityRouter)

	backend := router.FindBackendCluster("vcore").backends[0]
	err = aRouter.setAffinity("1234", backend)

	assert.Nil(t, err)
}

// Trying to set affinity when it has already been set should fail.
func TestAffinitySetAffinityChange(t *testing.T) {
	AffinityTestSetup()

	_, routerConfig := MakeAffinityTestConfig(2, 1)

	router, err := newRouter(routerConfig)
	assert.Nil(t, err)

	// Get the created AffinityRouter so we can inspect its state
	aRouter := allRouters["vcoredev_manager"].(AffinityRouter)

	backend := router.FindBackendCluster("vcore").backends[0]
	err = aRouter.setAffinity("1234", backend)

	assert.Nil(t, err)

	// Now pick a different backend
	backend = router.FindBackendCluster("vcore").backends[1]
	err = aRouter.setAffinity("1234", backend)

	assert.EqualError(t, err, "Attempting multiple sets of affinity for key 1234 to backend rw_vcore1 from rw_vcore0 on router dev_manager")
}
