| /* |
| * Portions copyright 2019-present Open Networking Foundation |
| * Original copyright 2019-present Ciena Corporation |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package commands |
| |
| import ( |
| "context" |
| "fmt" |
| "github.com/fullstorydev/grpcurl" |
| "github.com/golang/protobuf/proto" |
| "github.com/golang/protobuf/protoc-gen-go/descriptor" |
| "github.com/jhump/protoreflect/desc" |
| "github.com/jhump/protoreflect/dynamic" |
| corderrors "github.com/opencord/cordctl/error" |
| "google.golang.org/grpc" |
| "io" |
| "strconv" |
| "strings" |
| "time" |
| ) |
| |
| // Flags for calling the *WithRetry methods |
| const ( |
| GM_QUIET = 1 |
| GM_UNTIL_FOUND = 2 |
| GM_UNTIL_ENACTED = 4 |
| GM_UNTIL_STATUS = 8 |
| ) |
| |
| // Valid choices for FilterModels `Kind` argument |
| const ( |
| FILTER_DEFAULT = "DEFAULT" |
| FILTER_ALL = "ALL" |
| FILTER_DIRTY = "SYNCHRONIZER_DIRTY_OBJECTS" |
| FILTER_DELETED = "SYNCHRONIZER_DELETED_OBJECTS" |
| FILTER_DIRTYPOL = "SYNCHRONIZER_DIRTY_POLICIES" |
| FILTER_DELETEDPOL = "SYNCHRONIZER_DELETED_POLICIES" |
| ) |
| |
| type QueryEventHandler struct { |
| RpcEventHandler |
| Elements map[string]string |
| Model *desc.MessageDescriptor |
| Kind string |
| EOF bool |
| } |
| |
| // Separate the operator from the query value. |
| // For example, |
| // "==foo" --> "EQUAL", "foo" |
| func DecodeOperator(query string) (string, string, bool, error) { |
| if strings.HasPrefix(query, "!=") { |
| return strings.TrimSpace(query[2:]), "EQUAL", true, nil |
| } else if strings.HasPrefix(query, "==") { |
| return "", "", false, corderrors.NewInvalidInputError("Operator == is now allowed. Suggest using = instead.") |
| } else if strings.HasPrefix(query, "=") { |
| return strings.TrimSpace(query[1:]), "EQUAL", false, nil |
| } else if strings.HasPrefix(query, ">=") { |
| return strings.TrimSpace(query[2:]), "GREATER_THAN_OR_EQUAL", false, nil |
| } else if strings.HasPrefix(query, ">") { |
| return strings.TrimSpace(query[1:]), "GREATER_THAN", false, nil |
| } else if strings.HasPrefix(query, "<=") { |
| return strings.TrimSpace(query[2:]), "LESS_THAN_OR_EQUAL", false, nil |
| } else if strings.HasPrefix(query, "<") { |
| return strings.TrimSpace(query[1:]), "LESS_THAN", false, nil |
| } else { |
| return strings.TrimSpace(query), "EQUAL", false, nil |
| } |
| } |
| |
| // Generate the parameters for Query messages. |
| func (h *QueryEventHandler) GetParams(msg proto.Message) error { |
| dmsg, err := dynamic.AsDynamicMessage(msg) |
| if err != nil { |
| return err |
| } |
| |
| //fmt.Printf("MessageName: %s\n", dmsg.XXX_MessageName()) |
| |
| if h.EOF { |
| return io.EOF |
| } |
| |
| // Get the MessageType for the `elements` field |
| md := dmsg.GetMessageDescriptor() |
| elements_fld := md.FindFieldByName("elements") |
| elements_mt := elements_fld.GetMessageType() |
| |
| for field_name, element := range h.Elements { |
| value, operator, invert, err := DecodeOperator(element) |
| if err != nil { |
| return err |
| } |
| |
| nm := dynamic.NewMessage(elements_mt) |
| |
| field_descriptor := h.Model.FindFieldByName(field_name) |
| if field_descriptor == nil { |
| return corderrors.WithStackTrace(&corderrors.FieldDoesNotExistError{ModelName: h.Model.GetName(), FieldName: field_name}) |
| } |
| |
| field_type := field_descriptor.GetType() |
| switch field_type { |
| case descriptor.FieldDescriptorProto_TYPE_INT32: |
| var i int64 |
| i, err = strconv.ParseInt(value, 10, 32) |
| nm.SetFieldByName("iValue", int32(i)) |
| case descriptor.FieldDescriptorProto_TYPE_UINT32: |
| var i int64 |
| i, err = strconv.ParseInt(value, 10, 32) |
| nm.SetFieldByName("iValue", uint32(i)) |
| case descriptor.FieldDescriptorProto_TYPE_FLOAT: |
| err = corderrors.NewInvalidInputError("Floating point filters are unsupported") |
| case descriptor.FieldDescriptorProto_TYPE_DOUBLE: |
| err = corderrors.NewInvalidInputError("Floating point filters are unsupported") |
| default: |
| nm.SetFieldByName("sValue", value) |
| err = nil |
| } |
| |
| if err != nil { |
| return err |
| } |
| |
| nm.SetFieldByName("name", field_name) |
| nm.SetFieldByName("invert", invert) |
| SetEnumValue(nm, "operator", operator) |
| dmsg.AddRepeatedFieldByName("elements", nm) |
| } |
| |
| SetEnumValue(dmsg, "kind", h.Kind) |
| |
| h.EOF = true |
| |
| return nil |
| } |
| |
| // Take a string list of queries and turns it into a map of queries |
| func QueryStringsToMap(query_args []string, allow_inequality bool) (map[string]string, error) { |
| queries := make(map[string]string) |
| for _, query_str := range query_args { |
| query_str := strings.TrimSpace(query_str) |
| operator_pos := -1 |
| for i, ch := range query_str { |
| if allow_inequality { |
| if (ch == '!') || (ch == '=') || (ch == '>') || (ch == '<') { |
| operator_pos = i |
| break |
| } |
| } else { |
| if ch == '=' { |
| operator_pos = i |
| break |
| } |
| } |
| } |
| if operator_pos == -1 { |
| return nil, corderrors.WithStackTrace(&corderrors.IllegalQueryError{Query: query_str}) |
| } |
| queries[strings.TrimSpace(query_str[:operator_pos])] = query_str[operator_pos:] |
| } |
| return queries, nil |
| } |
| |
| // Take a string of comma-separated queries and turn it into a map of queries |
| func CommaSeparatedQueryToMap(query_str string, allow_inequality bool) (map[string]string, error) { |
| if query_str == "" { |
| return nil, nil |
| } |
| |
| query_strings := strings.Split(query_str, ",") |
| return QueryStringsToMap(query_strings, allow_inequality) |
| } |
| |
| // Convert a string into the appropriate gRPC type for a given field |
| func TypeConvert(source grpcurl.DescriptorSource, modelName string, field_name string, v string) (interface{}, error) { |
| model_descriptor, err := source.FindSymbol("xos." + modelName) |
| if err != nil { |
| return nil, err |
| } |
| model_md, ok := model_descriptor.(*desc.MessageDescriptor) |
| if !ok { |
| return nil, corderrors.WithStackTrace(&corderrors.TypeConversionError{Source: modelName, Destination: "messageDescriptor"}) |
| } |
| field_descriptor := model_md.FindFieldByName(field_name) |
| if field_descriptor == nil { |
| return nil, corderrors.WithStackTrace(&corderrors.FieldDoesNotExistError{ModelName: modelName, FieldName: field_name}) |
| } |
| field_type := field_descriptor.GetType() |
| |
| var result interface{} |
| |
| switch field_type { |
| case descriptor.FieldDescriptorProto_TYPE_INT32: |
| var i int64 |
| i, err = strconv.ParseInt(v, 10, 32) |
| result = int32(i) |
| case descriptor.FieldDescriptorProto_TYPE_UINT32: |
| var i int64 |
| i, err = strconv.ParseInt(v, 10, 32) |
| result = uint32(i) |
| case descriptor.FieldDescriptorProto_TYPE_FLOAT: |
| var f float64 |
| f, err = strconv.ParseFloat(v, 32) |
| result = float32(f) |
| case descriptor.FieldDescriptorProto_TYPE_DOUBLE: |
| var f float64 |
| f, err = strconv.ParseFloat(v, 64) |
| result = f |
| default: |
| result = v |
| err = nil |
| } |
| |
| return result, err |
| } |
| |
| // Return a list of all available model names |
| func GetModelNames(source grpcurl.DescriptorSource) (map[string]bool, error) { |
| models := make(map[string]bool) |
| methods, err := grpcurl.ListMethods(source, "xos.xos") |
| |
| if err != nil { |
| return nil, err |
| } |
| |
| for _, method := range methods { |
| if strings.HasPrefix(method, "xos.xos.Get") { |
| models[method[11:]] = true |
| } |
| } |
| |
| return models, nil |
| } |
| |
| // Check to see if a model name is valid |
| func CheckModelName(source grpcurl.DescriptorSource, name string) error { |
| models, err := GetModelNames(source) |
| if err != nil { |
| return err |
| } |
| _, present := models[name] |
| if !present { |
| return corderrors.WithStackTrace(&corderrors.UnknownModelTypeError{Name: name}) |
| } |
| return nil |
| } |
| |
| // Create a model in XOS given a map of fields |
| func CreateModel(conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, modelName string, fields map[string]interface{}) error { |
| ctx, cancel := context.WithTimeout(context.Background(), GlobalConfig.Grpc.Timeout) |
| defer cancel() |
| |
| headers := GenerateHeaders() |
| |
| h := &RpcEventHandler{ |
| Fields: map[string]map[string]interface{}{"xos." + modelName: fields}, |
| } |
| err := grpcurl.InvokeRPC(ctx, descriptor, conn, "xos.xos.Create"+modelName, headers, h, h.GetParams) |
| if err != nil { |
| return corderrors.RpcErrorWithModelNameToCordError(err, modelName) |
| } else if h.Status != nil && h.Status.Err() != nil { |
| return corderrors.RpcErrorWithModelNameToCordError(h.Status.Err(), modelName) |
| } |
| |
| resp, err := dynamic.AsDynamicMessage(h.Response) |
| if err != nil { |
| return err |
| } |
| |
| fields["id"] = resp.GetFieldByName("id").(int32) |
| |
| if resp.HasFieldName("uuid") { |
| fields["uuid"] = resp.GetFieldByName("uuid").(string) |
| } |
| |
| return nil |
| } |
| |
| // Update a model in XOS given a map of fields |
| func UpdateModel(conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, modelName string, fields map[string]interface{}) error { |
| ctx, cancel := context.WithTimeout(context.Background(), GlobalConfig.Grpc.Timeout) |
| defer cancel() |
| |
| headers := GenerateHeaders() |
| |
| h := &RpcEventHandler{ |
| Fields: map[string]map[string]interface{}{"xos." + modelName: fields}, |
| } |
| err := grpcurl.InvokeRPC(ctx, descriptor, conn, "xos.xos.Update"+modelName, headers, h, h.GetParams) |
| if err != nil { |
| return corderrors.RpcErrorWithModelNameToCordError(err, modelName) |
| } else if h.Status != nil && h.Status.Err() != nil { |
| return corderrors.RpcErrorWithModelNameToCordError(h.Status.Err(), modelName) |
| } |
| |
| resp, err := dynamic.AsDynamicMessage(h.Response) |
| if err != nil { |
| return err |
| } |
| |
| // TODO: Do we need to do anything with the response? |
| _ = resp |
| |
| return nil |
| } |
| |
| // Get a model from XOS given its ID |
| func GetModel(ctx context.Context, conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, modelName string, id int32) (*dynamic.Message, error) { |
| ctx, cancel := context.WithTimeout(ctx, GlobalConfig.Grpc.Timeout) |
| defer cancel() |
| |
| headers := GenerateHeaders() |
| |
| h := &RpcEventHandler{ |
| Fields: map[string]map[string]interface{}{"xos.ID": map[string]interface{}{"id": id}}, |
| } |
| err := grpcurl.InvokeRPC(ctx, descriptor, conn, "xos.xos.Get"+modelName, headers, h, h.GetParams) |
| if err != nil { |
| return nil, corderrors.RpcErrorWithIdToCordError(err, modelName, id) |
| } |
| |
| if h.Status != nil && h.Status.Err() != nil { |
| return nil, corderrors.RpcErrorWithIdToCordError(h.Status.Err(), modelName, id) //h.Status.Err() |
| } |
| |
| d, err := dynamic.AsDynamicMessage(h.Response) |
| if err != nil { |
| return nil, err |
| } |
| |
| return d, nil |
| } |
| |
| // Get a model, but retry under a variety of circumstances |
| func GetModelWithRetry(ctx context.Context, conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, modelName string, id int32, flags uint32) (*grpc.ClientConn, *dynamic.Message, error) { |
| quiet := (flags & GM_QUIET) != 0 |
| until_found := (flags & GM_UNTIL_FOUND) != 0 |
| until_enacted := (flags & GM_UNTIL_ENACTED) != 0 |
| until_status := (flags & GM_UNTIL_STATUS) != 0 |
| |
| for { |
| var err error |
| |
| if conn == nil { |
| conn, err = NewConnection() |
| if err != nil { |
| return nil, nil, err |
| } |
| } |
| |
| model, err := GetModel(ctx, conn, descriptor, modelName, id) |
| if err != nil { |
| if strings.Contains(err.Error(), "rpc error: code = Unavailable") || |
| strings.Contains(err.Error(), "rpc error: code = Internal desc = stream terminated by RST_STREAM") { |
| if !quiet { |
| fmt.Print(".") |
| } |
| select { |
| case <-time.After(100 * time.Millisecond): |
| case <-ctx.Done(): |
| return nil, nil, ctx.Err() |
| } |
| conn.Close() |
| conn = nil |
| continue |
| } |
| |
| _, is_not_found_error := err.(*corderrors.ModelNotFoundError) |
| if until_found && is_not_found_error { |
| if !quiet { |
| fmt.Print("x") |
| } |
| select { |
| case <-time.After(100 * time.Millisecond): |
| case <-ctx.Done(): |
| return nil, nil, ctx.Err() |
| } |
| continue |
| } |
| return nil, nil, err |
| } |
| |
| if until_enacted && !IsEnacted(model) { |
| if !quiet { |
| fmt.Print("o") |
| } |
| select { |
| case <-time.After(100 * time.Millisecond): |
| case <-ctx.Done(): |
| return nil, nil, ctx.Err() |
| } |
| continue |
| } |
| |
| if until_status && model.GetFieldByName("status") == nil { |
| if !quiet { |
| fmt.Print("O") |
| } |
| select { |
| case <-time.After(100 * time.Millisecond): |
| case <-ctx.Done(): |
| return nil, nil, ctx.Err() |
| } |
| continue |
| } |
| |
| return conn, model, nil |
| } |
| } |
| |
| func ItemsToDynamicMessageList(items interface{}) []*dynamic.Message { |
| result := make([]*dynamic.Message, len(items.([]interface{}))) |
| for i, item := range items.([]interface{}) { |
| result[i] = item.(*dynamic.Message) |
| } |
| return result |
| } |
| |
| // List all objects of a given model |
| func ListModels(ctx context.Context, conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, modelName string) ([]*dynamic.Message, error) { |
| ctx, cancel := context.WithTimeout(ctx, GlobalConfig.Grpc.Timeout) |
| defer cancel() |
| |
| headers := GenerateHeaders() |
| |
| h := &RpcEventHandler{} |
| err := grpcurl.InvokeRPC(ctx, descriptor, conn, "xos.xos.List"+modelName, headers, h, h.GetParams) |
| if err != nil { |
| return nil, corderrors.RpcErrorWithModelNameToCordError(err, modelName) |
| } |
| |
| if h.Status != nil && h.Status.Err() != nil { |
| return nil, corderrors.RpcErrorWithModelNameToCordError(h.Status.Err(), modelName) |
| } |
| |
| d, err := dynamic.AsDynamicMessage(h.Response) |
| if err != nil { |
| return nil, err |
| } |
| |
| items, err := d.TryGetFieldByName("items") |
| if err != nil { |
| return nil, err |
| } |
| |
| return ItemsToDynamicMessageList(items), nil |
| } |
| |
| // Filter models based on field values |
| // queries is a map of <field_name> to <operator><query> |
| // For example, |
| // map[string]string{"name": "==mysite"} |
| func FilterModels(ctx context.Context, conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, modelName string, kind string, queries map[string]string) ([]*dynamic.Message, error) { |
| ctx, cancel := context.WithTimeout(ctx, GlobalConfig.Grpc.Timeout) |
| defer cancel() |
| |
| headers := GenerateHeaders() |
| |
| model_descriptor, err := descriptor.FindSymbol("xos." + modelName) |
| if err != nil { |
| return nil, err |
| } |
| model_md, ok := model_descriptor.(*desc.MessageDescriptor) |
| if !ok { |
| return nil, corderrors.WithStackTrace(&corderrors.TypeConversionError{Source: modelName, Destination: "messageDescriptor"}) |
| } |
| |
| h := &QueryEventHandler{ |
| RpcEventHandler: RpcEventHandler{ |
| Fields: map[string]map[string]interface{}{"xos.Query": map[string]interface{}{"kind": 0}}, |
| }, |
| Elements: queries, |
| Model: model_md, |
| Kind: kind, |
| } |
| err = grpcurl.InvokeRPC(ctx, descriptor, conn, "xos.xos.Filter"+modelName, headers, h, h.GetParams) |
| if err != nil { |
| return nil, corderrors.RpcErrorWithQueriesToCordError(err, modelName, queries) |
| } |
| |
| if h.Status != nil && h.Status.Err() != nil { |
| return nil, corderrors.RpcErrorWithQueriesToCordError(h.Status.Err(), modelName, queries) |
| } |
| |
| d, err := dynamic.AsDynamicMessage(h.Response) |
| if err != nil { |
| return nil, err |
| } |
| |
| items, err := d.TryGetFieldByName("items") |
| if err != nil { |
| return nil, err |
| } |
| |
| return ItemsToDynamicMessageList(items), nil |
| } |
| |
| // Call ListModels or FilterModels as appropriate |
| func ListOrFilterModels(ctx context.Context, conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, modelName string, kind string, queries map[string]string) ([]*dynamic.Message, error) { |
| if (len(queries) == 0) && (kind == FILTER_DEFAULT) { |
| return ListModels(ctx, conn, descriptor, modelName) |
| } else { |
| return FilterModels(ctx, conn, descriptor, modelName, kind, queries) |
| } |
| } |
| |
| // Get a model from XOS given a fieldName/fieldValue |
| func FindModel(ctx context.Context, conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, modelName string, queries map[string]string) (*dynamic.Message, error) { |
| models, err := FilterModels(ctx, conn, descriptor, modelName, FILTER_DEFAULT, queries) |
| if err != nil { |
| return nil, err |
| } |
| |
| if len(models) == 0 { |
| cordError := &corderrors.ModelNotFoundError{} |
| cordError.Obj = corderrors.ObjectReference{ModelName: modelName, Queries: queries} |
| return nil, corderrors.WithStackTrace(cordError) |
| } |
| |
| return models[0], nil |
| } |
| |
| // Find a model, but retry under a variety of circumstances |
| func FindModelWithRetry(ctx context.Context, conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, modelName string, queries map[string]string, flags uint32) (*grpc.ClientConn, *dynamic.Message, error) { |
| quiet := (flags & GM_QUIET) != 0 |
| until_found := (flags & GM_UNTIL_FOUND) != 0 |
| until_enacted := (flags & GM_UNTIL_ENACTED) != 0 |
| until_status := (flags & GM_UNTIL_STATUS) != 0 |
| |
| for { |
| var err error |
| |
| if conn == nil { |
| conn, err = NewConnection() |
| if err != nil { |
| return nil, nil, err |
| } |
| } |
| |
| model, err := FindModel(ctx, conn, descriptor, modelName, queries) |
| if err != nil { |
| if strings.Contains(err.Error(), "rpc error: code = Unavailable") || |
| strings.Contains(err.Error(), "rpc error: code = Internal desc = stream terminated by RST_STREAM") { |
| if !quiet { |
| fmt.Print(".") |
| } |
| select { |
| case <-time.After(100 * time.Millisecond): |
| case <-ctx.Done(): |
| return nil, nil, ctx.Err() |
| } |
| conn.Close() |
| conn = nil |
| continue |
| } |
| |
| _, is_not_found_error := err.(*corderrors.ModelNotFoundError) |
| if until_found && is_not_found_error { |
| if !quiet { |
| fmt.Print("x") |
| } |
| select { |
| case <-time.After(100 * time.Millisecond): |
| case <-ctx.Done(): |
| return nil, nil, ctx.Err() |
| } |
| continue |
| } |
| return nil, nil, err |
| } |
| |
| if until_enacted && !IsEnacted(model) { |
| if !quiet { |
| fmt.Print("o") |
| } |
| select { |
| case <-time.After(100 * time.Millisecond): |
| case <-ctx.Done(): |
| return nil, nil, ctx.Err() |
| } |
| continue |
| } |
| |
| if until_status && model.GetFieldByName("status") == nil { |
| if !quiet { |
| fmt.Print("O") |
| } |
| select { |
| case <-time.After(100 * time.Millisecond): |
| case <-ctx.Done(): |
| return nil, nil, ctx.Err() |
| } |
| continue |
| } |
| |
| return conn, model, nil |
| } |
| } |
| |
| // Get a model from XOS given its ID |
| func DeleteModel(conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, modelName string, id int32) error { |
| ctx, cancel := context.WithTimeout(context.Background(), GlobalConfig.Grpc.Timeout) |
| defer cancel() |
| |
| headers := GenerateHeaders() |
| |
| h := &RpcEventHandler{ |
| Fields: map[string]map[string]interface{}{"xos.ID": map[string]interface{}{"id": id}}, |
| } |
| err := grpcurl.InvokeRPC(ctx, descriptor, conn, "xos.xos.Delete"+modelName, headers, h, h.GetParams) |
| if err != nil { |
| return corderrors.RpcErrorWithIdToCordError(err, modelName, id) |
| } |
| |
| if h.Status != nil && h.Status.Err() != nil { |
| return corderrors.RpcErrorWithIdToCordError(h.Status.Err(), modelName, id) |
| } |
| |
| _, err = dynamic.AsDynamicMessage(h.Response) |
| if err != nil { |
| return err |
| } |
| |
| return nil |
| } |
| |
| // Takes a *dynamic.Message and turns it into a map of fields to interfaces |
| // TODO: Might be more useful to convert the values to strings and ints |
| func MessageToMap(d *dynamic.Message) map[string]interface{} { |
| fields := make(map[string]interface{}) |
| for _, field_desc := range d.GetKnownFields() { |
| field_name := field_desc.GetName() |
| fields[field_name] = d.GetFieldByName(field_name) |
| } |
| return fields |
| } |
| |
| // Returns True if a message has been enacted |
| func IsEnacted(d *dynamic.Message) bool { |
| enacted := d.GetFieldByName("enacted").(float64) |
| updated := d.GetFieldByName("updated").(float64) |
| |
| return (enacted >= updated) |
| } |