SEBA-767 Directory restructuring in accordance with best practices
Change-Id: Id651366a3545ad0141a7854e99fa46867e543295
diff --git a/internal/pkg/commands/orm.go b/internal/pkg/commands/orm.go
new file mode 100644
index 0000000..0ac846a
--- /dev/null
+++ b/internal/pkg/commands/orm.go
@@ -0,0 +1,658 @@
+/*
+ * Portions copyright 2019-present Open Networking Foundation
+ * Original copyright 2019-present Ciena Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package commands
+
+import (
+ "context"
+ "fmt"
+ "github.com/fullstorydev/grpcurl"
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/protoc-gen-go/descriptor"
+ "github.com/jhump/protoreflect/desc"
+ "github.com/jhump/protoreflect/dynamic"
+ corderrors "github.com/opencord/cordctl/internal/pkg/error"
+ "google.golang.org/grpc"
+ "io"
+ "strconv"
+ "strings"
+ "time"
+)
+
+// Flags for calling the *WithRetry methods
+const (
+ GM_QUIET = 1
+ GM_UNTIL_FOUND = 2
+ GM_UNTIL_ENACTED = 4
+ GM_UNTIL_STATUS = 8
+)
+
+// Valid choices for FilterModels `Kind` argument
+const (
+ FILTER_DEFAULT = "DEFAULT"
+ FILTER_ALL = "ALL"
+ FILTER_DIRTY = "SYNCHRONIZER_DIRTY_OBJECTS"
+ FILTER_DELETED = "SYNCHRONIZER_DELETED_OBJECTS"
+ FILTER_DIRTYPOL = "SYNCHRONIZER_DIRTY_POLICIES"
+ FILTER_DELETEDPOL = "SYNCHRONIZER_DELETED_POLICIES"
+)
+
+type QueryEventHandler struct {
+ RpcEventHandler
+ Elements map[string]string
+ Model *desc.MessageDescriptor
+ Kind string
+ EOF bool
+}
+
+// Separate the operator from the query value.
+// For example,
+// "==foo" --> "EQUAL", "foo"
+func DecodeOperator(query string) (string, string, bool, error) {
+ if strings.HasPrefix(query, "!=") {
+ return strings.TrimSpace(query[2:]), "EQUAL", true, nil
+ } else if strings.HasPrefix(query, "==") {
+ return "", "", false, corderrors.NewInvalidInputError("Operator == is now allowed. Suggest using = instead.")
+ } else if strings.HasPrefix(query, "=") {
+ return strings.TrimSpace(query[1:]), "EQUAL", false, nil
+ } else if strings.HasPrefix(query, ">=") {
+ return strings.TrimSpace(query[2:]), "GREATER_THAN_OR_EQUAL", false, nil
+ } else if strings.HasPrefix(query, ">") {
+ return strings.TrimSpace(query[1:]), "GREATER_THAN", false, nil
+ } else if strings.HasPrefix(query, "<=") {
+ return strings.TrimSpace(query[2:]), "LESS_THAN_OR_EQUAL", false, nil
+ } else if strings.HasPrefix(query, "<") {
+ return strings.TrimSpace(query[1:]), "LESS_THAN", false, nil
+ } else {
+ return strings.TrimSpace(query), "EQUAL", false, nil
+ }
+}
+
+// Generate the parameters for Query messages.
+func (h *QueryEventHandler) GetParams(msg proto.Message) error {
+ dmsg, err := dynamic.AsDynamicMessage(msg)
+ if err != nil {
+ return err
+ }
+
+ //fmt.Printf("MessageName: %s\n", dmsg.XXX_MessageName())
+
+ if h.EOF {
+ return io.EOF
+ }
+
+ // Get the MessageType for the `elements` field
+ md := dmsg.GetMessageDescriptor()
+ elements_fld := md.FindFieldByName("elements")
+ elements_mt := elements_fld.GetMessageType()
+
+ for field_name, element := range h.Elements {
+ value, operator, invert, err := DecodeOperator(element)
+ if err != nil {
+ return err
+ }
+
+ nm := dynamic.NewMessage(elements_mt)
+
+ field_descriptor := h.Model.FindFieldByName(field_name)
+ if field_descriptor == nil {
+ return corderrors.WithStackTrace(&corderrors.FieldDoesNotExistError{ModelName: h.Model.GetName(), FieldName: field_name})
+ }
+
+ field_type := field_descriptor.GetType()
+ switch field_type {
+ case descriptor.FieldDescriptorProto_TYPE_INT32:
+ var i int64
+ i, err = strconv.ParseInt(value, 10, 32)
+ nm.SetFieldByName("iValue", int32(i))
+ case descriptor.FieldDescriptorProto_TYPE_UINT32:
+ var i int64
+ i, err = strconv.ParseInt(value, 10, 32)
+ nm.SetFieldByName("iValue", uint32(i))
+ case descriptor.FieldDescriptorProto_TYPE_FLOAT:
+ err = corderrors.NewInvalidInputError("Floating point filters are unsupported")
+ case descriptor.FieldDescriptorProto_TYPE_DOUBLE:
+ err = corderrors.NewInvalidInputError("Floating point filters are unsupported")
+ default:
+ nm.SetFieldByName("sValue", value)
+ err = nil
+ }
+
+ if err != nil {
+ return err
+ }
+
+ nm.SetFieldByName("name", field_name)
+ nm.SetFieldByName("invert", invert)
+ SetEnumValue(nm, "operator", operator)
+ dmsg.AddRepeatedFieldByName("elements", nm)
+ }
+
+ SetEnumValue(dmsg, "kind", h.Kind)
+
+ h.EOF = true
+
+ return nil
+}
+
+// Take a string list of queries and turns it into a map of queries
+func QueryStringsToMap(query_args []string, allow_inequality bool) (map[string]string, error) {
+ queries := make(map[string]string)
+ for _, query_str := range query_args {
+ query_str := strings.TrimSpace(query_str)
+ operator_pos := -1
+ for i, ch := range query_str {
+ if allow_inequality {
+ if (ch == '!') || (ch == '=') || (ch == '>') || (ch == '<') {
+ operator_pos = i
+ break
+ }
+ } else {
+ if ch == '=' {
+ operator_pos = i
+ break
+ }
+ }
+ }
+ if operator_pos == -1 {
+ return nil, corderrors.WithStackTrace(&corderrors.IllegalQueryError{Query: query_str})
+ }
+ queries[strings.TrimSpace(query_str[:operator_pos])] = query_str[operator_pos:]
+ }
+ return queries, nil
+}
+
+// Take a string of comma-separated queries and turn it into a map of queries
+func CommaSeparatedQueryToMap(query_str string, allow_inequality bool) (map[string]string, error) {
+ if query_str == "" {
+ return nil, nil
+ }
+
+ query_strings := strings.Split(query_str, ",")
+ return QueryStringsToMap(query_strings, allow_inequality)
+}
+
+// Convert a string into the appropriate gRPC type for a given field
+func TypeConvert(source grpcurl.DescriptorSource, modelName string, field_name string, v string) (interface{}, error) {
+ model_descriptor, err := source.FindSymbol("xos." + modelName)
+ if err != nil {
+ return nil, err
+ }
+ model_md, ok := model_descriptor.(*desc.MessageDescriptor)
+ if !ok {
+ return nil, corderrors.WithStackTrace(&corderrors.TypeConversionError{Source: modelName, Destination: "messageDescriptor"})
+ }
+ field_descriptor := model_md.FindFieldByName(field_name)
+ if field_descriptor == nil {
+ return nil, corderrors.WithStackTrace(&corderrors.FieldDoesNotExistError{ModelName: modelName, FieldName: field_name})
+ }
+ field_type := field_descriptor.GetType()
+
+ var result interface{}
+
+ switch field_type {
+ case descriptor.FieldDescriptorProto_TYPE_INT32:
+ var i int64
+ i, err = strconv.ParseInt(v, 10, 32)
+ result = int32(i)
+ case descriptor.FieldDescriptorProto_TYPE_UINT32:
+ var i int64
+ i, err = strconv.ParseInt(v, 10, 32)
+ result = uint32(i)
+ case descriptor.FieldDescriptorProto_TYPE_FLOAT:
+ var f float64
+ f, err = strconv.ParseFloat(v, 32)
+ result = float32(f)
+ case descriptor.FieldDescriptorProto_TYPE_DOUBLE:
+ var f float64
+ f, err = strconv.ParseFloat(v, 64)
+ result = f
+ default:
+ result = v
+ err = nil
+ }
+
+ return result, err
+}
+
+// Return a list of all available model names
+func GetModelNames(source grpcurl.DescriptorSource) (map[string]bool, error) {
+ models := make(map[string]bool)
+ methods, err := grpcurl.ListMethods(source, "xos.xos")
+
+ if err != nil {
+ return nil, err
+ }
+
+ for _, method := range methods {
+ if strings.HasPrefix(method, "xos.xos.Get") {
+ models[method[11:]] = true
+ }
+ }
+
+ return models, nil
+}
+
+// Check to see if a model name is valid
+func CheckModelName(source grpcurl.DescriptorSource, name string) error {
+ models, err := GetModelNames(source)
+ if err != nil {
+ return err
+ }
+ _, present := models[name]
+ if !present {
+ return corderrors.WithStackTrace(&corderrors.UnknownModelTypeError{Name: name})
+ }
+ return nil
+}
+
+// Create a model in XOS given a map of fields
+func CreateModel(conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, modelName string, fields map[string]interface{}) error {
+ ctx, cancel := context.WithTimeout(context.Background(), GlobalConfig.Grpc.Timeout)
+ defer cancel()
+
+ headers := GenerateHeaders()
+
+ h := &RpcEventHandler{
+ Fields: map[string]map[string]interface{}{"xos." + modelName: fields},
+ }
+ err := grpcurl.InvokeRPC(ctx, descriptor, conn, "xos.xos.Create"+modelName, headers, h, h.GetParams)
+ if err != nil {
+ return corderrors.RpcErrorWithModelNameToCordError(err, modelName)
+ } else if h.Status != nil && h.Status.Err() != nil {
+ return corderrors.RpcErrorWithModelNameToCordError(h.Status.Err(), modelName)
+ }
+
+ resp, err := dynamic.AsDynamicMessage(h.Response)
+ if err != nil {
+ return err
+ }
+
+ fields["id"] = resp.GetFieldByName("id").(int32)
+
+ if resp.HasFieldName("uuid") {
+ fields["uuid"] = resp.GetFieldByName("uuid").(string)
+ }
+
+ return nil
+}
+
+// Update a model in XOS given a map of fields
+func UpdateModel(conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, modelName string, fields map[string]interface{}) error {
+ ctx, cancel := context.WithTimeout(context.Background(), GlobalConfig.Grpc.Timeout)
+ defer cancel()
+
+ headers := GenerateHeaders()
+
+ h := &RpcEventHandler{
+ Fields: map[string]map[string]interface{}{"xos." + modelName: fields},
+ }
+ err := grpcurl.InvokeRPC(ctx, descriptor, conn, "xos.xos.Update"+modelName, headers, h, h.GetParams)
+ if err != nil {
+ return corderrors.RpcErrorWithModelNameToCordError(err, modelName)
+ } else if h.Status != nil && h.Status.Err() != nil {
+ return corderrors.RpcErrorWithModelNameToCordError(h.Status.Err(), modelName)
+ }
+
+ resp, err := dynamic.AsDynamicMessage(h.Response)
+ if err != nil {
+ return err
+ }
+
+ // TODO: Do we need to do anything with the response?
+ _ = resp
+
+ return nil
+}
+
+// Get a model from XOS given its ID
+func GetModel(ctx context.Context, conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, modelName string, id int32) (*dynamic.Message, error) {
+ ctx, cancel := context.WithTimeout(ctx, GlobalConfig.Grpc.Timeout)
+ defer cancel()
+
+ headers := GenerateHeaders()
+
+ h := &RpcEventHandler{
+ Fields: map[string]map[string]interface{}{"xos.ID": map[string]interface{}{"id": id}},
+ }
+ err := grpcurl.InvokeRPC(ctx, descriptor, conn, "xos.xos.Get"+modelName, headers, h, h.GetParams)
+ if err != nil {
+ return nil, corderrors.RpcErrorWithIdToCordError(err, modelName, id)
+ }
+
+ if h.Status != nil && h.Status.Err() != nil {
+ return nil, corderrors.RpcErrorWithIdToCordError(h.Status.Err(), modelName, id) //h.Status.Err()
+ }
+
+ d, err := dynamic.AsDynamicMessage(h.Response)
+ if err != nil {
+ return nil, err
+ }
+
+ return d, nil
+}
+
+// Get a model, but retry under a variety of circumstances
+func GetModelWithRetry(ctx context.Context, conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, modelName string, id int32, flags uint32) (*grpc.ClientConn, *dynamic.Message, error) {
+ quiet := (flags & GM_QUIET) != 0
+ until_found := (flags & GM_UNTIL_FOUND) != 0
+ until_enacted := (flags & GM_UNTIL_ENACTED) != 0
+ until_status := (flags & GM_UNTIL_STATUS) != 0
+
+ for {
+ var err error
+
+ if conn == nil {
+ conn, err = NewConnection()
+ if err != nil {
+ return nil, nil, err
+ }
+ }
+
+ model, err := GetModel(ctx, conn, descriptor, modelName, id)
+ if err != nil {
+ if strings.Contains(err.Error(), "rpc error: code = Unavailable") ||
+ strings.Contains(err.Error(), "rpc error: code = Internal desc = stream terminated by RST_STREAM") {
+ if !quiet {
+ fmt.Print(".")
+ }
+ select {
+ case <-time.After(100 * time.Millisecond):
+ case <-ctx.Done():
+ return nil, nil, ctx.Err()
+ }
+ conn.Close()
+ conn = nil
+ continue
+ }
+
+ _, is_not_found_error := err.(*corderrors.ModelNotFoundError)
+ if until_found && is_not_found_error {
+ if !quiet {
+ fmt.Print("x")
+ }
+ select {
+ case <-time.After(100 * time.Millisecond):
+ case <-ctx.Done():
+ return nil, nil, ctx.Err()
+ }
+ continue
+ }
+ return nil, nil, err
+ }
+
+ if until_enacted && !IsEnacted(model) {
+ if !quiet {
+ fmt.Print("o")
+ }
+ select {
+ case <-time.After(100 * time.Millisecond):
+ case <-ctx.Done():
+ return nil, nil, ctx.Err()
+ }
+ continue
+ }
+
+ if until_status && model.GetFieldByName("status") == nil {
+ if !quiet {
+ fmt.Print("O")
+ }
+ select {
+ case <-time.After(100 * time.Millisecond):
+ case <-ctx.Done():
+ return nil, nil, ctx.Err()
+ }
+ continue
+ }
+
+ return conn, model, nil
+ }
+}
+
+func ItemsToDynamicMessageList(items interface{}) []*dynamic.Message {
+ result := make([]*dynamic.Message, len(items.([]interface{})))
+ for i, item := range items.([]interface{}) {
+ result[i] = item.(*dynamic.Message)
+ }
+ return result
+}
+
+// List all objects of a given model
+func ListModels(ctx context.Context, conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, modelName string) ([]*dynamic.Message, error) {
+ ctx, cancel := context.WithTimeout(ctx, GlobalConfig.Grpc.Timeout)
+ defer cancel()
+
+ headers := GenerateHeaders()
+
+ h := &RpcEventHandler{}
+ err := grpcurl.InvokeRPC(ctx, descriptor, conn, "xos.xos.List"+modelName, headers, h, h.GetParams)
+ if err != nil {
+ return nil, corderrors.RpcErrorWithModelNameToCordError(err, modelName)
+ }
+
+ if h.Status != nil && h.Status.Err() != nil {
+ return nil, corderrors.RpcErrorWithModelNameToCordError(h.Status.Err(), modelName)
+ }
+
+ d, err := dynamic.AsDynamicMessage(h.Response)
+ if err != nil {
+ return nil, err
+ }
+
+ items, err := d.TryGetFieldByName("items")
+ if err != nil {
+ return nil, err
+ }
+
+ return ItemsToDynamicMessageList(items), nil
+}
+
+// Filter models based on field values
+// queries is a map of <field_name> to <operator><query>
+// For example,
+// map[string]string{"name": "==mysite"}
+func FilterModels(ctx context.Context, conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, modelName string, kind string, queries map[string]string) ([]*dynamic.Message, error) {
+ ctx, cancel := context.WithTimeout(ctx, GlobalConfig.Grpc.Timeout)
+ defer cancel()
+
+ headers := GenerateHeaders()
+
+ model_descriptor, err := descriptor.FindSymbol("xos." + modelName)
+ if err != nil {
+ return nil, err
+ }
+ model_md, ok := model_descriptor.(*desc.MessageDescriptor)
+ if !ok {
+ return nil, corderrors.WithStackTrace(&corderrors.TypeConversionError{Source: modelName, Destination: "messageDescriptor"})
+ }
+
+ h := &QueryEventHandler{
+ RpcEventHandler: RpcEventHandler{
+ Fields: map[string]map[string]interface{}{"xos.Query": map[string]interface{}{"kind": 0}},
+ },
+ Elements: queries,
+ Model: model_md,
+ Kind: kind,
+ }
+ err = grpcurl.InvokeRPC(ctx, descriptor, conn, "xos.xos.Filter"+modelName, headers, h, h.GetParams)
+ if err != nil {
+ return nil, corderrors.RpcErrorWithQueriesToCordError(err, modelName, queries)
+ }
+
+ if h.Status != nil && h.Status.Err() != nil {
+ return nil, corderrors.RpcErrorWithQueriesToCordError(h.Status.Err(), modelName, queries)
+ }
+
+ d, err := dynamic.AsDynamicMessage(h.Response)
+ if err != nil {
+ return nil, err
+ }
+
+ items, err := d.TryGetFieldByName("items")
+ if err != nil {
+ return nil, err
+ }
+
+ return ItemsToDynamicMessageList(items), nil
+}
+
+// Call ListModels or FilterModels as appropriate
+func ListOrFilterModels(ctx context.Context, conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, modelName string, kind string, queries map[string]string) ([]*dynamic.Message, error) {
+ if (len(queries) == 0) && (kind == FILTER_DEFAULT) {
+ return ListModels(ctx, conn, descriptor, modelName)
+ } else {
+ return FilterModels(ctx, conn, descriptor, modelName, kind, queries)
+ }
+}
+
+// Get a model from XOS given a fieldName/fieldValue
+func FindModel(ctx context.Context, conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, modelName string, queries map[string]string) (*dynamic.Message, error) {
+ models, err := FilterModels(ctx, conn, descriptor, modelName, FILTER_DEFAULT, queries)
+ if err != nil {
+ return nil, err
+ }
+
+ if len(models) == 0 {
+ cordError := &corderrors.ModelNotFoundError{}
+ cordError.Obj = corderrors.ObjectReference{ModelName: modelName, Queries: queries}
+ return nil, corderrors.WithStackTrace(cordError)
+ }
+
+ return models[0], nil
+}
+
+// Find a model, but retry under a variety of circumstances
+func FindModelWithRetry(ctx context.Context, conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, modelName string, queries map[string]string, flags uint32) (*grpc.ClientConn, *dynamic.Message, error) {
+ quiet := (flags & GM_QUIET) != 0
+ until_found := (flags & GM_UNTIL_FOUND) != 0
+ until_enacted := (flags & GM_UNTIL_ENACTED) != 0
+ until_status := (flags & GM_UNTIL_STATUS) != 0
+
+ for {
+ var err error
+
+ if conn == nil {
+ conn, err = NewConnection()
+ if err != nil {
+ return nil, nil, err
+ }
+ }
+
+ model, err := FindModel(ctx, conn, descriptor, modelName, queries)
+ if err != nil {
+ if strings.Contains(err.Error(), "rpc error: code = Unavailable") ||
+ strings.Contains(err.Error(), "rpc error: code = Internal desc = stream terminated by RST_STREAM") {
+ if !quiet {
+ fmt.Print(".")
+ }
+ select {
+ case <-time.After(100 * time.Millisecond):
+ case <-ctx.Done():
+ return nil, nil, ctx.Err()
+ }
+ conn.Close()
+ conn = nil
+ continue
+ }
+
+ _, is_not_found_error := err.(*corderrors.ModelNotFoundError)
+ if until_found && is_not_found_error {
+ if !quiet {
+ fmt.Print("x")
+ }
+ select {
+ case <-time.After(100 * time.Millisecond):
+ case <-ctx.Done():
+ return nil, nil, ctx.Err()
+ }
+ continue
+ }
+ return nil, nil, err
+ }
+
+ if until_enacted && !IsEnacted(model) {
+ if !quiet {
+ fmt.Print("o")
+ }
+ select {
+ case <-time.After(100 * time.Millisecond):
+ case <-ctx.Done():
+ return nil, nil, ctx.Err()
+ }
+ continue
+ }
+
+ if until_status && model.GetFieldByName("status") == nil {
+ if !quiet {
+ fmt.Print("O")
+ }
+ select {
+ case <-time.After(100 * time.Millisecond):
+ case <-ctx.Done():
+ return nil, nil, ctx.Err()
+ }
+ continue
+ }
+
+ return conn, model, nil
+ }
+}
+
+// Get a model from XOS given its ID
+func DeleteModel(conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, modelName string, id int32) error {
+ ctx, cancel := context.WithTimeout(context.Background(), GlobalConfig.Grpc.Timeout)
+ defer cancel()
+
+ headers := GenerateHeaders()
+
+ h := &RpcEventHandler{
+ Fields: map[string]map[string]interface{}{"xos.ID": map[string]interface{}{"id": id}},
+ }
+ err := grpcurl.InvokeRPC(ctx, descriptor, conn, "xos.xos.Delete"+modelName, headers, h, h.GetParams)
+ if err != nil {
+ return corderrors.RpcErrorWithIdToCordError(err, modelName, id)
+ }
+
+ if h.Status != nil && h.Status.Err() != nil {
+ return corderrors.RpcErrorWithIdToCordError(h.Status.Err(), modelName, id)
+ }
+
+ _, err = dynamic.AsDynamicMessage(h.Response)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// Takes a *dynamic.Message and turns it into a map of fields to interfaces
+// TODO: Might be more useful to convert the values to strings and ints
+func MessageToMap(d *dynamic.Message) map[string]interface{} {
+ fields := make(map[string]interface{})
+ for _, field_desc := range d.GetKnownFields() {
+ field_name := field_desc.GetName()
+ fields[field_name] = d.GetFieldByName(field_name)
+ }
+ return fields
+}
+
+// Returns True if a message has been enacted
+func IsEnacted(d *dynamic.Message) bool {
+ enacted := d.GetFieldByName("enacted").(float64)
+ updated := d.GetFieldByName("updated").(float64)
+
+ return (enacted >= updated)
+}