| package sarama |
| |
| import "errors" |
| |
| // ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics, |
| // brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0. |
| // Methods with stricter requirements will specify the minimum broker version required. |
| // You MUST call Close() on a client to avoid leaks |
| type ClusterAdmin interface { |
| // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher. |
| // It may take several seconds after CreateTopic returns success for all the brokers |
| // to become aware that the topic has been created. During this time, listTopics |
| // may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0. |
| CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error |
| |
| // Delete a topic. It may take several seconds after the DeleteTopic to returns success |
| // and for all the brokers to become aware that the topics are gone. |
| // During this time, listTopics may continue to return information about the deleted topic. |
| // If delete.topic.enable is false on the brokers, deleteTopic will mark |
| // the topic for deletion, but not actually delete them. |
| // This operation is supported by brokers with version 0.10.1.0 or higher. |
| DeleteTopic(topic string) error |
| |
| // Increase the number of partitions of the topics according to the corresponding values. |
| // If partitions are increased for a topic that has a key, the partition logic or ordering of |
| // the messages will be affected. It may take several seconds after this method returns |
| // success for all the brokers to become aware that the partitions have been created. |
| // During this time, ClusterAdmin#describeTopics may not return information about the |
| // new partitions. This operation is supported by brokers with version 1.0.0 or higher. |
| CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error |
| |
| // Delete records whose offset is smaller than the given offset of the corresponding partition. |
| // This operation is supported by brokers with version 0.11.0.0 or higher. |
| DeleteRecords(topic string, partitionOffsets map[int32]int64) error |
| |
| // Get the configuration for the specified resources. |
| // The returned configuration includes default values and the Default is true |
| // can be used to distinguish them from user supplied values. |
| // Config entries where ReadOnly is true cannot be updated. |
| // The value of config entries where Sensitive is true is always nil so |
| // sensitive information is not disclosed. |
| // This operation is supported by brokers with version 0.11.0.0 or higher. |
| DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) |
| |
| // Update the configuration for the specified resources with the default options. |
| // This operation is supported by brokers with version 0.11.0.0 or higher. |
| // The resources with their configs (topic is the only resource type with configs |
| // that can be updated currently Updates are not transactional so they may succeed |
| // for some resources while fail for others. The configs for a particular resource are updated automatically. |
| AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error |
| |
| // Creates access control lists (ACLs) which are bound to specific resources. |
| // This operation is not transactional so it may succeed for some ACLs while fail for others. |
| // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but |
| // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher. |
| CreateACL(resource Resource, acl Acl) error |
| |
| // Lists access control lists (ACLs) according to the supplied filter. |
| // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls |
| // This operation is supported by brokers with version 0.11.0.0 or higher. |
| ListAcls(filter AclFilter) ([]ResourceAcls, error) |
| |
| // Deletes access control lists (ACLs) according to the supplied filters. |
| // This operation is not transactional so it may succeed for some ACLs while fail for others. |
| // This operation is supported by brokers with version 0.11.0.0 or higher. |
| DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) |
| |
| // Close shuts down the admin and closes underlying client. |
| Close() error |
| } |
| |
| type clusterAdmin struct { |
| client Client |
| conf *Config |
| } |
| |
| // NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration. |
| func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) { |
| client, err := NewClient(addrs, conf) |
| if err != nil { |
| return nil, err |
| } |
| |
| //make sure we can retrieve the controller |
| _, err = client.Controller() |
| if err != nil { |
| return nil, err |
| } |
| |
| ca := &clusterAdmin{ |
| client: client, |
| conf: client.Config(), |
| } |
| return ca, nil |
| } |
| |
| func (ca *clusterAdmin) Close() error { |
| return ca.client.Close() |
| } |
| |
| func (ca *clusterAdmin) Controller() (*Broker, error) { |
| return ca.client.Controller() |
| } |
| |
| func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error { |
| |
| if topic == "" { |
| return ErrInvalidTopic |
| } |
| |
| if detail == nil { |
| return errors.New("You must specify topic details") |
| } |
| |
| topicDetails := make(map[string]*TopicDetail) |
| topicDetails[topic] = detail |
| |
| request := &CreateTopicsRequest{ |
| TopicDetails: topicDetails, |
| ValidateOnly: validateOnly, |
| Timeout: ca.conf.Admin.Timeout, |
| } |
| |
| if ca.conf.Version.IsAtLeast(V0_11_0_0) { |
| request.Version = 1 |
| } |
| if ca.conf.Version.IsAtLeast(V1_0_0_0) { |
| request.Version = 2 |
| } |
| |
| b, err := ca.Controller() |
| if err != nil { |
| return err |
| } |
| |
| rsp, err := b.CreateTopics(request) |
| if err != nil { |
| return err |
| } |
| |
| topicErr, ok := rsp.TopicErrors[topic] |
| if !ok { |
| return ErrIncompleteResponse |
| } |
| |
| if topicErr.Err != ErrNoError { |
| return topicErr.Err |
| } |
| |
| return nil |
| } |
| |
| func (ca *clusterAdmin) DeleteTopic(topic string) error { |
| |
| if topic == "" { |
| return ErrInvalidTopic |
| } |
| |
| request := &DeleteTopicsRequest{ |
| Topics: []string{topic}, |
| Timeout: ca.conf.Admin.Timeout, |
| } |
| |
| if ca.conf.Version.IsAtLeast(V0_11_0_0) { |
| request.Version = 1 |
| } |
| |
| b, err := ca.Controller() |
| if err != nil { |
| return err |
| } |
| |
| rsp, err := b.DeleteTopics(request) |
| if err != nil { |
| return err |
| } |
| |
| topicErr, ok := rsp.TopicErrorCodes[topic] |
| if !ok { |
| return ErrIncompleteResponse |
| } |
| |
| if topicErr != ErrNoError { |
| return topicErr |
| } |
| return nil |
| } |
| |
| func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error { |
| if topic == "" { |
| return ErrInvalidTopic |
| } |
| |
| topicPartitions := make(map[string]*TopicPartition) |
| topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment} |
| |
| request := &CreatePartitionsRequest{ |
| TopicPartitions: topicPartitions, |
| Timeout: ca.conf.Admin.Timeout, |
| } |
| |
| b, err := ca.Controller() |
| if err != nil { |
| return err |
| } |
| |
| rsp, err := b.CreatePartitions(request) |
| if err != nil { |
| return err |
| } |
| |
| topicErr, ok := rsp.TopicPartitionErrors[topic] |
| if !ok { |
| return ErrIncompleteResponse |
| } |
| |
| if topicErr.Err != ErrNoError { |
| return topicErr.Err |
| } |
| |
| return nil |
| } |
| |
| func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error { |
| |
| if topic == "" { |
| return ErrInvalidTopic |
| } |
| |
| topics := make(map[string]*DeleteRecordsRequestTopic) |
| topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: partitionOffsets} |
| request := &DeleteRecordsRequest{ |
| Topics: topics, |
| Timeout: ca.conf.Admin.Timeout, |
| } |
| |
| b, err := ca.Controller() |
| if err != nil { |
| return err |
| } |
| |
| rsp, err := b.DeleteRecords(request) |
| if err != nil { |
| return err |
| } |
| |
| _, ok := rsp.Topics[topic] |
| if !ok { |
| return ErrIncompleteResponse |
| } |
| |
| //todo since we are dealing with couple of partitions it would be good if we return slice of errors |
| //for each partition instead of one error |
| return nil |
| } |
| |
| func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) { |
| |
| var entries []ConfigEntry |
| var resources []*ConfigResource |
| resources = append(resources, &resource) |
| |
| request := &DescribeConfigsRequest{ |
| Resources: resources, |
| } |
| |
| b, err := ca.Controller() |
| if err != nil { |
| return nil, err |
| } |
| |
| rsp, err := b.DescribeConfigs(request) |
| if err != nil { |
| return nil, err |
| } |
| |
| for _, rspResource := range rsp.Resources { |
| if rspResource.Name == resource.Name { |
| if rspResource.ErrorMsg != "" { |
| return nil, errors.New(rspResource.ErrorMsg) |
| } |
| for _, cfgEntry := range rspResource.Configs { |
| entries = append(entries, *cfgEntry) |
| } |
| } |
| } |
| return entries, nil |
| } |
| |
| func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error { |
| |
| var resources []*AlterConfigsResource |
| resources = append(resources, &AlterConfigsResource{ |
| Type: resourceType, |
| Name: name, |
| ConfigEntries: entries, |
| }) |
| |
| request := &AlterConfigsRequest{ |
| Resources: resources, |
| ValidateOnly: validateOnly, |
| } |
| |
| b, err := ca.Controller() |
| if err != nil { |
| return err |
| } |
| |
| rsp, err := b.AlterConfigs(request) |
| if err != nil { |
| return err |
| } |
| |
| for _, rspResource := range rsp.Resources { |
| if rspResource.Name == name { |
| if rspResource.ErrorMsg != "" { |
| return errors.New(rspResource.ErrorMsg) |
| } |
| } |
| } |
| return nil |
| } |
| |
| func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error { |
| var acls []*AclCreation |
| acls = append(acls, &AclCreation{resource, acl}) |
| request := &CreateAclsRequest{AclCreations: acls} |
| |
| b, err := ca.Controller() |
| if err != nil { |
| return err |
| } |
| |
| _, err = b.CreateAcls(request) |
| return err |
| } |
| |
| func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) { |
| |
| request := &DescribeAclsRequest{AclFilter: filter} |
| |
| b, err := ca.Controller() |
| if err != nil { |
| return nil, err |
| } |
| |
| rsp, err := b.DescribeAcls(request) |
| if err != nil { |
| return nil, err |
| } |
| |
| var lAcls []ResourceAcls |
| for _, rAcl := range rsp.ResourceAcls { |
| lAcls = append(lAcls, *rAcl) |
| } |
| return lAcls, nil |
| } |
| |
| func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) { |
| var filters []*AclFilter |
| filters = append(filters, &filter) |
| request := &DeleteAclsRequest{Filters: filters} |
| |
| b, err := ca.Controller() |
| if err != nil { |
| return nil, err |
| } |
| |
| rsp, err := b.DeleteAcls(request) |
| if err != nil { |
| return nil, err |
| } |
| |
| var mAcls []MatchingAcl |
| for _, fr := range rsp.FilterResponses { |
| for _, mACL := range fr.MatchingAcls { |
| mAcls = append(mAcls, *mACL) |
| } |
| |
| } |
| return mAcls, nil |
| } |