Add NETCONF notification for ONU activation and Kafka client to receive events, update dependencies
Change-Id: I5f768fa8077ef7c64e00a534744ca47492344935
diff --git a/vendor/github.com/Shopify/sarama/record.go b/vendor/github.com/Shopify/sarama/record.go
new file mode 100644
index 0000000..a3fe8c0
--- /dev/null
+++ b/vendor/github.com/Shopify/sarama/record.go
@@ -0,0 +1,116 @@
+package sarama
+
+import (
+ "encoding/binary"
+ "time"
+)
+
+const (
+ isTransactionalMask = 0x10
+ controlMask = 0x20
+ maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1
+)
+
+// RecordHeader stores key and value for a record header
+type RecordHeader struct {
+ Key []byte
+ Value []byte
+}
+
+func (h *RecordHeader) encode(pe packetEncoder) error {
+ if err := pe.putVarintBytes(h.Key); err != nil {
+ return err
+ }
+ return pe.putVarintBytes(h.Value)
+}
+
+func (h *RecordHeader) decode(pd packetDecoder) (err error) {
+ if h.Key, err = pd.getVarintBytes(); err != nil {
+ return err
+ }
+
+ if h.Value, err = pd.getVarintBytes(); err != nil {
+ return err
+ }
+ return nil
+}
+
+// Record is kafka record type
+type Record struct {
+ Headers []*RecordHeader
+
+ Attributes int8
+ TimestampDelta time.Duration
+ OffsetDelta int64
+ Key []byte
+ Value []byte
+ length varintLengthField
+}
+
+func (r *Record) encode(pe packetEncoder) error {
+ pe.push(&r.length)
+ pe.putInt8(r.Attributes)
+ pe.putVarint(int64(r.TimestampDelta / time.Millisecond))
+ pe.putVarint(r.OffsetDelta)
+ if err := pe.putVarintBytes(r.Key); err != nil {
+ return err
+ }
+ if err := pe.putVarintBytes(r.Value); err != nil {
+ return err
+ }
+ pe.putVarint(int64(len(r.Headers)))
+
+ for _, h := range r.Headers {
+ if err := h.encode(pe); err != nil {
+ return err
+ }
+ }
+
+ return pe.pop()
+}
+
+func (r *Record) decode(pd packetDecoder) (err error) {
+ if err = pd.push(&r.length); err != nil {
+ return err
+ }
+
+ if r.Attributes, err = pd.getInt8(); err != nil {
+ return err
+ }
+
+ timestamp, err := pd.getVarint()
+ if err != nil {
+ return err
+ }
+ r.TimestampDelta = time.Duration(timestamp) * time.Millisecond
+
+ if r.OffsetDelta, err = pd.getVarint(); err != nil {
+ return err
+ }
+
+ if r.Key, err = pd.getVarintBytes(); err != nil {
+ return err
+ }
+
+ if r.Value, err = pd.getVarintBytes(); err != nil {
+ return err
+ }
+
+ numHeaders, err := pd.getVarint()
+ if err != nil {
+ return err
+ }
+
+ if numHeaders >= 0 {
+ r.Headers = make([]*RecordHeader, numHeaders)
+ }
+ for i := int64(0); i < numHeaders; i++ {
+ hdr := new(RecordHeader)
+ if err := hdr.decode(pd); err != nil {
+ return err
+ }
+ r.Headers[i] = hdr
+ }
+
+ return pd.pop()
+}