VOL-1460 Updated core to use voltha-protos instead of local protos
Moved protos to python directory in order to maintain functionaly of containers built there.
Added capability to do local builds of protos
Added instructions on running dep ensure for getting protos.
Updated github.com/golang/protobuf version to v1.3.1

Change-Id: Ia6ef55f07f0d5dcb5b750d7c37b21b71db85bfc4
diff --git a/vendor/github.com/Shopify/sarama/async_producer.go b/vendor/github.com/Shopify/sarama/async_producer.go
index 89a0c70..5174a35 100644
--- a/vendor/github.com/Shopify/sarama/async_producer.go
+++ b/vendor/github.com/Shopify/sarama/async_producer.go
@@ -483,6 +483,19 @@
 	return input
 }
 
+func (pp *partitionProducer) backoff(retries int) {
+	var backoff time.Duration
+	if pp.parent.conf.Producer.Retry.BackoffFunc != nil {
+		maxRetries := pp.parent.conf.Producer.Retry.Max
+		backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
+	} else {
+		backoff = pp.parent.conf.Producer.Retry.Backoff
+	}
+	if backoff > 0 {
+		time.Sleep(backoff)
+	}
+}
+
 func (pp *partitionProducer) dispatch() {
 	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
 	// on the first message
@@ -493,11 +506,31 @@
 		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
 	}
 
+	defer func() {
+		if pp.brokerProducer != nil {
+			pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
+		}
+	}()
+
 	for msg := range pp.input {
+
+		if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
+			select {
+			case <-pp.brokerProducer.abandoned:
+				// a message on the abandoned channel means that our current broker selection is out of date
+				Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
+				pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
+				pp.brokerProducer = nil
+				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
+			default:
+				// producer connection is still open.
+			}
+		}
+
 		if msg.retries > pp.highWatermark {
 			// a new, higher, retry level; handle it and then back off
 			pp.newHighWatermark(msg.retries)
-			time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
+			pp.backoff(msg.retries)
 		} else if pp.highWatermark > 0 {
 			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
 			if msg.retries < pp.highWatermark {
@@ -525,7 +558,7 @@
 		if pp.brokerProducer == nil {
 			if err := pp.updateLeader(); err != nil {
 				pp.parent.returnError(msg, err)
-				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
+				pp.backoff(msg.retries)
 				continue
 			}
 			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
@@ -533,10 +566,6 @@
 
 		pp.brokerProducer.input <- msg
 	}
-
-	if pp.brokerProducer != nil {
-		pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
-	}
 }
 
 func (pp *partitionProducer) newHighWatermark(hwm int) {
@@ -637,6 +666,10 @@
 		close(responses)
 	})
 
+	if p.conf.Producer.Retry.Max <= 0 {
+		bp.abandoned = make(chan struct{})
+	}
+
 	return bp
 }
 
@@ -655,6 +688,7 @@
 	input     chan *ProducerMessage
 	output    chan<- *produceSet
 	responses <-chan *brokerProducerResponse
+	abandoned chan struct{}
 
 	buffer     *produceSet
 	timer      <-chan time.Time
@@ -829,9 +863,17 @@
 		// Retriable errors
 		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
 			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
-			retryTopics = append(retryTopics, topic)
+			if bp.parent.conf.Producer.Retry.Max <= 0 {
+				bp.parent.abandonBrokerConnection(bp.broker)
+				bp.parent.returnErrors(pSet.msgs, block.Err)
+			} else {
+				retryTopics = append(retryTopics, topic)
+			}
 		// Other non-retriable errors
 		default:
+			if bp.parent.conf.Producer.Retry.Max <= 0 {
+				bp.parent.abandonBrokerConnection(bp.broker)
+			}
 			bp.parent.returnErrors(pSet.msgs, block.Err)
 		}
 	})
@@ -1048,5 +1090,10 @@
 	p.brokerLock.Lock()
 	defer p.brokerLock.Unlock()
 
+	bc, ok := p.brokers[broker]
+	if ok && bc.abandoned != nil {
+		close(bc.abandoned)
+	}
+
 	delete(p.brokers, broker)
 }