blob: 33d531d99e67ba39bbe2b5c1ef9dce1c31f01faa [file] [log] [blame]
David K. Bainbridge157bdab2020-01-16 14:38:05 -08001/*
2 Copyright 2020 the original author or authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17package openflow
18
19import (
20 "context"
21 "encoding/binary"
22 "encoding/json"
23 "errors"
24 "github.com/donNewtonAlpha/goloxi"
25 ofp "github.com/donNewtonAlpha/goloxi/of13"
26 "github.com/opencord/voltha-lib-go/v2/pkg/log"
27 "github.com/opencord/voltha-protos/v2/go/voltha"
28 "io"
29 "net"
30 "time"
31)
32
// logger is the package-level logger, obtained from log.AddPackage with the
// log.JSON and log.DebugLevel settings. The error returned by AddPackage is
// discarded.
var logger, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
34
// ofcEvent identifies an event delivered to the Run loop over the client's
// events channel.
type ofcEvent byte

// ofcState is the connection state tracked by the Run loop.
type ofcState byte

const (
	// Events consumed by Run.
	ofcEventStart = ofcEvent(iota)
	ofcEventConnected
	ofcEventDisconnected

	// Connection states. iota keeps counting within this single const block,
	// so these take the values 3 and 4 (not 0 and 1); the zero value of
	// ofcState therefore matches neither constant.
	ofcStateConnected = ofcState(iota)
	ofcStateDisconnected
)
46
// OFClient holds the configuration and runtime state of an OpenFlow client
// that connects to a single OpenFlow controller.
type OFClient struct {
	// OFControllerEndPoint is the controller address; it is resolved with
	// net.ResolveTCPAddr("tcp", ...) when connecting.
	OFControllerEndPoint string
	// Port is not referenced by the code in this file.
	// NOTE(review): possibly a leftover; confirm against callers.
	Port uint16
	// DeviceID is attached to log entries as "device-id".
	DeviceID string
	// KeepRunning is checked by the stream-processing loop on every
	// iteration; Stop sets it to false.
	KeepRunning bool
	// VolthaClient and PacketOutChannel are copied from the configuration by
	// NewOFClient; they are not otherwise used in this file.
	VolthaClient     voltha.VolthaServiceClient
	PacketOutChannel chan *voltha.PacketOut
	// ConnectionMaxRetries bounds the connection attempts; 0 means retry
	// forever.
	ConnectionMaxRetries int
	// ConnectionRetryDelay is the pause between connection attempts.
	// NewOFClient replaces non-positive values with 3 seconds.
	ConnectionRetryDelay time.Duration
	// conn is the current TCP connection to the controller, nil when there
	// is none.
	conn net.Conn

	// experimental
	// events carries connection life-cycle events to Run.
	events chan ofcEvent
	// sendChannel queues outbound messages for messageSender.
	sendChannel chan Message
	// lastUnsentMessage is the message messageSender failed to write; it is
	// retried first when messageSender is started again.
	lastUnsentMessage Message
}
64
65//NewClient contstructs a new Openflow Client and then starts up
66func NewOFClient(config *OFClient) *OFClient {
67
68 ofc := OFClient{
69 DeviceID: config.DeviceID,
70 OFControllerEndPoint: config.OFControllerEndPoint,
71 VolthaClient: config.VolthaClient,
72 PacketOutChannel: config.PacketOutChannel,
73 KeepRunning: config.KeepRunning,
74 ConnectionMaxRetries: config.ConnectionMaxRetries,
75 ConnectionRetryDelay: config.ConnectionRetryDelay,
76 events: make(chan ofcEvent, 10),
77 sendChannel: make(chan Message, 100),
78 }
79
80 if ofc.ConnectionRetryDelay <= 0 {
81 logger.Warnw("connection retry delay not valid, setting to default",
82 log.Fields{
83 "device-id": ofc.DeviceID,
84 "value": ofc.ConnectionRetryDelay.String(),
85 "default": (3 * time.Second).String()})
86 ofc.ConnectionRetryDelay = 3 * time.Second
87 }
88 return &ofc
89}
90
// Stop sets KeepRunning to false so that the stream-processing loop, which
// checks the flag on each iteration, exits.
func (ofc *OFClient) Stop() {
	ofc.KeepRunning = false
}
95
96func (ofc *OFClient) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
97 header := ofp.Header{}
98 header.Version = uint8(buf[0])
99 header.Type = uint8(buf[1])
100 header.Length = binary.BigEndian.Uint16(buf[2:4])
101 header.Xid = binary.BigEndian.Uint32(buf[4:8])
102
103 // TODO: add minimal validation of version and type
104
105 return &header, nil
106}
107
108func (ofc *OFClient) establishConnectionToController() error {
109 if ofc.conn != nil {
110 ofc.conn.Close()
111 ofc.conn = nil
112 }
113 try := 1
114 for ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
115 if raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint); err != nil {
116 logger.Debugw("openflow-client unable to resolve endpoint",
117 log.Fields{
118 "device-id": ofc.DeviceID,
119 "endpoint": ofc.OFControllerEndPoint})
120 } else {
121 if connection, err := net.DialTCP("tcp", nil, raddr); err == nil {
122 ofc.conn = connection
123 ofc.sayHello()
124 ofc.events <- ofcEventConnected
125 return nil
126 } else {
127 logger.Warnw("openflow-client-connect-error",
128 log.Fields{
129 "device-id": ofc.DeviceID,
130 "endpoint": ofc.OFControllerEndPoint})
131 }
132 }
133 if ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
134 if ofc.ConnectionMaxRetries != 0 {
135 try += 1
136 }
137 time.Sleep(ofc.ConnectionRetryDelay)
138 }
139 }
140 return errors.New("failed-to-connect-to-of-controller")
141}
142
143func (ofc *OFClient) Run(ctx context.Context) {
144
145 var ofCtx context.Context
146 var ofDone func()
147 ofc.events <- ofcEventStart
148 state := ofcStateDisconnected
149top:
150 for {
151 select {
152 case <-ctx.Done():
153 break top
154 case event := <-ofc.events:
155 switch event {
156 case ofcEventStart:
157 logger.Debugw("ofc-event-star",
158 log.Fields{"device-id": ofc.DeviceID})
159 go ofc.establishConnectionToController()
160 case ofcEventConnected:
161 if state == ofcStateDisconnected {
162 state = ofcStateConnected
163 logger.Debugw("ofc-event-connected",
164 log.Fields{"device-id": ofc.DeviceID})
165 ofCtx, ofDone = context.WithCancel(context.Background())
166 go ofc.messageSender(ofCtx)
167 go ofc.processOFStream(ofCtx)
168 }
169 case ofcEventDisconnected:
170 if state == ofcStateConnected {
171 state = ofcStateDisconnected
172 logger.Debugw("ofc-event-disconnected",
173 log.Fields{"device-id": ofc.DeviceID})
174 if ofDone != nil {
175 ofDone()
176 ofDone = nil
177 }
178 go ofc.establishConnectionToController()
179 }
180 }
181 }
182 }
183
184 if ofDone != nil {
185 ofDone()
186 ofDone = nil
187 }
188
189}
190
191// Run run loop for the openflow client
192func (ofc *OFClient) processOFStream(ctx context.Context) {
193 buf := make([]byte, 1500)
194 var need, have int
195 /*
196 * EXPLANATION
197 *
198 * The below loops reuses a byte array to read messages from the TCP
199 * connection to the OF controller. It reads messages into a large
200 * buffer in an attempt to optimize the read performance from the
201 * TCP connection. This means that on any given read there may be more
202 * than a single message in the byte array read.
203 *
204 * As the minimal size for an OF message is 8 bytes (because that is
205 * the size of the basic header) we know that if we have not read
206 * 8 bytes we need to read more before we can process a message.
207 *
208 * Once the mninium header is read, the complete length of the
209 * message is retrieved from the header and bytes are repeatedly read
210 * until we know the byte array contains at least one message.
211 *
212 * Once it is known that the buffer has at least a single message
213 * a slice (msg) is moved through the read bytes looking to process
214 * each message util the length of read data is < the length required
215 * i.e., the minimum size or the size of the next message.
216 *
217 * When no more message can be proessed from the byte array any unused
218 * bytes are moved to the front of the source array and more data is
219 * read from the TCP connection.
220 */
221
222 /*
223 * First thing we are looking for is an openflow header, so we need at
224 * least 8 bytes
225 */
226 need = 8
227
228top:
229 // Continue until we are told to stop
230 for ofc.KeepRunning {
231 logger.Debugw("before-read-from-controller",
232 log.Fields{
233 "device-id": ofc.DeviceID,
234 "have": have,
235 "need": need,
236 "buf-length": len(buf[have:])})
237 read, err := ofc.conn.Read(buf[have:])
238 have += read
239 logger.Debugw("read-from-controller",
240 log.Fields{
241 "device-id": ofc.DeviceID,
242 "byte-count": read,
243 "error": err})
244
245 /*
246 * If we have less than we need and there is no
247 * error, then continue to attempt to read more data
248 */
249 if have < need && err == nil {
250 // No bytes available, just continue
251 logger.Debugw("continue-to-read",
252 log.Fields{
253 "device-id": ofc.DeviceID,
254 "have": have,
255 "need": need,
256 "error": err})
257 continue
258 }
259
260 /*
261 * Single out EOF here, because if we have bytes
262 * but have an EOF we still want to process the
263 * the last meesage. A read of 0 bytes and EOF is
264 * a terminated connection.
265 */
266 if err != nil && (err != io.EOF || read == 0) {
267 logger.Errorw("voltha-connection-dead",
268 log.Fields{
269 "device-id": ofc.DeviceID,
270 "error": err})
271 break
272 }
273
274 /*
275 * We should have at least 1 message at this point so
276 * create a slice (msg) that points to the start of the
277 * buffer
278 */
279 msg := buf[0:]
280 for need <= have {
281 logger.Debugw("process-of-message-stream",
282 log.Fields{
283 "device-id": ofc.DeviceID,
284 "have": have,
285 "need": need})
286 /*
287 * If we get here, we have at least the 8 bytes of the
288 * header, if not enough for the complete message. So
289 * take a peek at the OF header to do simple validation
290 * and be able to get the full expected length of the
291 * packet
292 */
293 peek, err := ofc.peekAtOFHeader(msg)
294 if err != nil {
295 /*
296 * Header is bad, assume stream is corrupted
297 * and needs to be restarted
298 */
299 logger.Errorw("bad-of-packet",
300 log.Fields{
301 "device-id": ofc.DeviceID,
302 "error": err})
303 break top
304 }
305
306 /*
307 * If we don't have the full packet, then back around
308 * the outer loop to get more bytes
309 */
310 need = int(peek.GetLength())
311
312 logger.Debugw("processed-header-need-message",
313 log.Fields{
314 "device-id": ofc.DeviceID,
315 "have": have,
316 "need": need})
317
318 if have < need {
319 logger.Debugw("end-processing:continue-to-read",
320 log.Fields{
321 "device-id": ofc.DeviceID,
322 "have": have,
323 "need": need})
324 break
325 }
326
327 // Decode and process the packet
328 decoder := goloxi.NewDecoder(msg)
329 header, err := ofp.DecodeHeader(decoder)
330 if err != nil {
331 js, _ := json.Marshal(decoder)
332 logger.Errorw("failed-to-decode",
333 log.Fields{
334 "device-id": ofc.DeviceID,
335 "decoder": js,
336 "error": err})
337 break top
338 }
339 if logger.V(log.DebugLevel) {
340 js, _ := json.Marshal(header)
341 logger.Debugw("packet-header",
342 log.Fields{
343 "device-id": ofc.DeviceID,
344 "header": js})
345 }
346 ofc.parseHeader(header)
347
348 /*
349 * Move the msg slice to the start of the next
350 * message, which is the current message plus the
351 * used bytes (need)
352 */
353 msg = msg[need:]
354 have -= need
355
356 // Finished process method, need header again
357 need = 8
358
359 logger.Debugw("message-process-complete",
360 log.Fields{
361 "device-id": ofc.DeviceID,
362 "have": have,
363 "need": need,
364 "read-length": len(buf[have:])})
365 }
366 /*
367 * If we have any left over bytes move them to the front
368 * of the byte array to be appended to bny the next read
369 */
370 if have > 0 {
371 copy(buf, msg)
372 }
373 }
374 ofc.events <- ofcEventDisconnected
375}
376
377func (ofc *OFClient) sayHello() {
378 hello := ofp.NewHello()
379 hello.Xid = uint32(GetXid())
380 elem := ofp.NewHelloElemVersionbitmap()
381 elem.SetType(ofp.OFPHETVersionbitmap)
382 elem.SetLength(8)
383 elem.SetBitmaps([]*ofp.Uint32{&ofp.Uint32{Value: 16}})
384 hello.SetElements([]ofp.IHelloElem{elem})
385 if logger.V(log.DebugLevel) {
386 js, _ := json.Marshal(hello)
387 logger.Debugw("sayHello Called",
388 log.Fields{
389 "device-id": ofc.DeviceID,
390 "hello-message": js})
391 }
392 if err := ofc.SendMessage(hello); err != nil {
393 logger.Fatalw("Failed saying hello to Openflow Server, unable to proceed",
394 log.Fields{
395 "device-id": ofc.DeviceID,
396 "error": err})
397 }
398}
399
400func (ofc *OFClient) parseHeader(header ofp.IHeader) {
401 switch header.GetType() {
402 case ofp.OFPTHello:
403 //x := header.(*ofp.Hello)
404 case ofp.OFPTError:
405 go ofc.handleErrMsg(header.(*ofp.ErrorMsg))
406 case ofp.OFPTEchoRequest:
407 go ofc.handleEchoRequest(header.(*ofp.EchoRequest))
408 case ofp.OFPTEchoReply:
409 case ofp.OFPTExperimenter:
410 case ofp.OFPTFeaturesRequest:
411 go ofc.handleFeatureRequest(header.(*ofp.FeaturesRequest))
412 case ofp.OFPTFeaturesReply:
413 case ofp.OFPTGetConfigRequest:
414 go ofc.handleGetConfigRequest(header.(*ofp.GetConfigRequest))
415 case ofp.OFPTGetConfigReply:
416 case ofp.OFPTSetConfig:
417 go ofc.handleSetConfig(header.(*ofp.SetConfig))
418 case ofp.OFPTPacketIn:
419 case ofp.OFPTFlowRemoved:
420 case ofp.OFPTPortStatus:
421 case ofp.OFPTPacketOut:
422 go ofc.handlePacketOut(header.(*ofp.PacketOut))
423 case ofp.OFPTFlowMod:
424 /*
425 * Not using go routine to handle flow* messages or barrier requests
426 * onos typically issues barrier requests just before a flow* message.
427 * by handling in this thread I ensure all flow* are handled when barrier
428 * request is issued.
429 */
430 switch header.(ofp.IFlowMod).GetCommand() {
431 case ofp.OFPFCAdd:
432 ofc.handleFlowAdd(header.(*ofp.FlowAdd))
433 case ofp.OFPFCModify:
434 ofc.handleFlowMod(header.(*ofp.FlowMod))
435 case ofp.OFPFCModifyStrict:
436 ofc.handleFlowModStrict(header.(*ofp.FlowModifyStrict))
437 case ofp.OFPFCDelete:
438 ofc.handleFlowDelete(header.(*ofp.FlowDelete))
439 case ofp.OFPFCDeleteStrict:
440 ofc.handleFlowDeleteStrict(header.(*ofp.FlowDeleteStrict))
441 }
442 case ofp.OFPTStatsRequest:
443 go ofc.handleStatsRequest(header, header.(ofp.IStatsRequest).GetStatsType())
444 case ofp.OFPTBarrierRequest:
445 /* See note above at case ofp.OFPTFlowMod:*/
446 ofc.handleBarrierRequest(header.(*ofp.BarrierRequest))
447 case ofp.OFPTRoleRequest:
448 go ofc.handleRoleRequest(header.(*ofp.RoleRequest))
449 case ofp.OFPTMeterMod:
450 go ofc.handleMeterModRequest(header.(*ofp.MeterMod))
451 }
452}
453
// Message is anything that can serialize itself into a goloxi encoder. It
// exists so that a single SendMessage can accept every outbound OpenFlow
// message type.
type Message interface {
	// Serialize writes the message into encoder.
	Serialize(encoder *goloxi.Encoder) error
}
458
459func (ofc *OFClient) doSend(msg Message) error {
460 if ofc.conn == nil {
461 return errors.New("no-connection")
462 }
463 enc := goloxi.NewEncoder()
464 msg.Serialize(enc)
465 bytes := enc.Bytes()
466 if _, err := ofc.conn.Write(bytes); err != nil {
467 logger.Warnw("unable-to-send-message-to-controller",
468 log.Fields{
469 "device-id": ofc.DeviceID,
470 "message": msg,
471 "error": err})
472 return err
473 }
474 return nil
475}
476
477func (ofc *OFClient) messageSender(ctx context.Context) {
478
479 // first process last fail if it exists
480 if ofc.lastUnsentMessage != nil {
481 if err := ofc.doSend(ofc.lastUnsentMessage); err != nil {
482 ofc.events <- ofcEventDisconnected
483 return
484 }
485 ofc.lastUnsentMessage = nil
486 }
487top:
488 for {
489 select {
490 case <-ctx.Done():
491 break top
492 case msg := <-ofc.sendChannel:
493 if ofc.doSend(msg) != nil {
494 ofc.lastUnsentMessage = msg
495 ofc.events <- ofcEventDisconnected
496 return
497 }
498 ofc.lastUnsentMessage = nil
499 }
500 }
501}
502
// SendMessage queues message on sendChannel for delivery by messageSender.
// The send blocks while the channel is full. The returned error is always
// nil; delivery failures are handled by messageSender.
func (ofc *OFClient) SendMessage(message Message) error {
	ofc.sendChannel <- message
	return nil
}
507
508//SendMessage sends message to openflow server
509func (ofc *OFClient) SendMessageOrig(message Message) error {
510 if logger.V(log.DebugLevel) {
511 js, _ := json.Marshal(message)
512 logger.Debugw("SendMessage called",
513 log.Fields{
514 "device-id": ofc.DeviceID,
515 "message": js})
516 }
517 enc := goloxi.NewEncoder()
518 message.Serialize(enc)
519 for {
520 if ofc.conn == nil {
521 logger.Warnln("SendMessage Connection is Nil sleeping for 10 milliseconds")
522 time.Sleep(10 * time.Millisecond)
523 } else {
524 break
525 }
526 }
527 bytes := enc.Bytes()
528 if _, err := ofc.conn.Write(bytes); err != nil {
529 jMessage, _ := json.Marshal(message)
530 logger.Errorw("SendMessage failed sending message",
531 log.Fields{
532 "device-id": ofc.DeviceID,
533 "error": err,
534 "message": jMessage})
535 return err
536 }
537 return nil
538}