blob: db53ae52e690af4251c0e23397cc57700690bf19 [file] [log] [blame]
Jonathan Hart4b110f62020-03-13 17:36:19 -07001/*
2 Copyright 2020 the original author or authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16package openflow
17
18import (
19 "bufio"
20 "context"
21 "encoding/binary"
22 "encoding/json"
23 "errors"
Jonathan Hart4b110f62020-03-13 17:36:19 -070024 "io"
25 "net"
26 "time"
David Bainbridgef8ce7d22020-04-08 12:49:41 -070027
Jonathan Hart828908c2020-04-15 14:23:45 -070028 "github.com/opencord/goloxi"
29 ofp "github.com/opencord/goloxi/of13"
David Bainbridgef8ce7d22020-04-08 12:49:41 -070030 "github.com/opencord/ofagent-go/internal/pkg/holder"
31 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 "github.com/opencord/voltha-protos/v3/go/voltha"
Jonathan Hart4b110f62020-03-13 17:36:19 -070033)
34
// OFConnection holds the configuration and the runtime state of one
// connection between the agent and an OpenFlow controller.
type OFConnection struct {
	// Controller address; resolved as a TCP endpoint on every connect attempt.
	OFControllerEndPoint string
	// Identifier attached to the log entries emitted for this connection.
	DeviceID         string
	VolthaClient     *holder.VolthaServiceClientHolder
	PacketOutChannel chan *voltha.PacketOut
	// Bound used by the connect loop; 0 means keep trying forever.
	ConnectionMaxRetries int
	// Pause inserted after each failed connect attempt.
	ConnectionRetryDelay time.Duration

	// Connection to the controller; nil while no connection is open.
	conn net.Conn

	// current role of this connection
	role        ofcRole
	roleManager RoleManager

	// Events consumed by the state machine in Run.
	events chan ofcEvent
	// Outbound messages queued by SendMessage and drained by messageSender.
	sendChannel chan Message
	// Message whose write failed; it is retried first when messageSender
	// is started again.
	lastUnsentMessage Message

	flowsChunkSize     int
	portsChunkSize     int
	portsDescChunkSize int
}
57
58func (ofc *OFConnection) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
59 header := ofp.Header{}
60 header.Version = uint8(buf[0])
61 header.Type = uint8(buf[1])
62 header.Length = binary.BigEndian.Uint16(buf[2:4])
63 header.Xid = binary.BigEndian.Uint32(buf[4:8])
64
65 // TODO: add minimal validation of version and type
66
67 return &header, nil
68}
69
70func (ofc *OFConnection) establishConnectionToController() error {
71 if ofc.conn != nil {
72 logger.Debugw("closing-of-connection-to-reconnect",
73 log.Fields{"device-id": ofc.DeviceID})
74 ofc.conn.Close()
75 ofc.conn = nil
76 }
77 try := 1
78 for ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
79 if raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint); err != nil {
80 logger.Debugw("openflow-client unable to resolve endpoint",
81 log.Fields{
82 "device-id": ofc.DeviceID,
83 "endpoint": ofc.OFControllerEndPoint})
84 } else {
85 if connection, err := net.DialTCP("tcp", nil, raddr); err == nil {
86 ofc.conn = connection
87 ofc.sayHello()
88 ofc.events <- ofcEventConnect
89 return nil
90 } else {
91 logger.Warnw("openflow-client-connect-error",
92 log.Fields{
93 "device-id": ofc.DeviceID,
94 "endpoint": ofc.OFControllerEndPoint})
95 }
96 }
97 if ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
98 if ofc.ConnectionMaxRetries != 0 {
99 try += 1
100 }
101 time.Sleep(ofc.ConnectionRetryDelay)
102 }
103 }
104 return errors.New("failed-to-connect-to-of-controller")
105}
106
107// Run implements the state machine for the OF client reacting to state change
108// events and invoking actions as a reaction to those state changes
109func (ofc *OFConnection) Run(ctx context.Context) {
110
111 var ofCtx context.Context
112 var ofDone func()
113 state := ofcStateCreated
114 ofc.events <- ofcEventStart
115top:
116 for {
117 select {
118 case <-ctx.Done():
119 state = ofcStateStopped
120 logger.Debugw("state-transition-context-done",
121 log.Fields{"device-id": ofc.DeviceID})
122 break top
123 case event := <-ofc.events:
124 previous := state
125 switch event {
126 case ofcEventStart:
127 logger.Debugw("ofc-event-start",
128 log.Fields{"device-id": ofc.DeviceID})
129 if state == ofcStateCreated {
130 state = ofcStateStarted
131 logger.Debug("STARTED MORE THAN ONCE")
132 go func() {
133 if err := ofc.establishConnectionToController(); err != nil {
134 logger.Errorw("controller-connection-failed", log.Fields{"error": err})
135 panic(err)
136 }
137 }()
138 } else {
139 logger.Errorw("illegal-state-transition",
140 log.Fields{
141 "device-id": ofc.DeviceID,
142 "current-state": state.String(),
143 "event": event.String()})
144 }
145 case ofcEventConnect:
146 logger.Debugw("ofc-event-connected",
147 log.Fields{"device-id": ofc.DeviceID})
148 if state == ofcStateStarted || state == ofcStateDisconnected {
149 state = ofcStateConnected
150 ofCtx, ofDone = context.WithCancel(context.Background())
151 go ofc.messageSender(ofCtx)
152 go ofc.processOFStream(ofCtx)
153 } else {
154 logger.Errorw("illegal-state-transition",
155 log.Fields{
156 "device-id": ofc.DeviceID,
157 "current-state": state.String(),
158 "event": event.String()})
159 }
160 case ofcEventDisconnect:
161 logger.Debugw("ofc-event-disconnected",
162 log.Fields{
163 "device-id": ofc.DeviceID,
164 "state": state.String()})
165 if state == ofcStateConnected {
166 state = ofcStateDisconnected
167 if ofDone != nil {
168 ofDone()
169 ofDone = nil
170 }
171 go func() {
172 if err := ofc.establishConnectionToController(); err != nil {
Girish Kumar182049b2020-07-08 18:53:34 +0000173 logger.Errorw("controller-connection-failed", log.Fields{"error": err})
Jonathan Hart4b110f62020-03-13 17:36:19 -0700174 panic(err)
175 }
176 }()
177 } else {
178 logger.Errorw("illegal-state-transition",
179 log.Fields{
180 "device-id": ofc.DeviceID,
181 "current-state": state.String(),
182 "event": event.String()})
183 }
184 case ofcEventStop:
185 logger.Debugw("ofc-event-stop",
186 log.Fields{"device-id": ofc.DeviceID})
187 if state == ofcStateCreated || state == ofcStateConnected || state == ofcStateDisconnected {
188 state = ofcStateStopped
189 break top
190 } else {
191 logger.Errorw("illegal-state-transition",
192 log.Fields{
193 "device-id": ofc.DeviceID,
194 "current-state": state.String(),
195 "event": event.String()})
196 }
197 }
198 logger.Debugw("state-transition",
199 log.Fields{
200 "device-id": ofc.DeviceID,
201 "previous-state": previous.String(),
202 "current-state": state.String(),
203 "event": event.String()})
204 }
205 }
206
207 // If the child context exists, then cancel it
208 if ofDone != nil {
Girish Kumar182049b2020-07-08 18:53:34 +0000209 logger.Debugw("closing-child-processes",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700210 log.Fields{"device-id": ofc.DeviceID})
211 ofDone()
212 }
213
214 // If the connection is open, then close it
215 if ofc.conn != nil {
Girish Kumar182049b2020-07-08 18:53:34 +0000216 logger.Debugw("closing-of-connection",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700217 log.Fields{"device-id": ofc.DeviceID})
218 ofc.conn.Close()
219 ofc.conn = nil
220 }
Girish Kumar182049b2020-07-08 18:53:34 +0000221 logger.Debugw("state-machine-finished",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700222 log.Fields{"device-id": ofc.DeviceID})
223}
224
225// processOFStream processes the OF connection from the controller and invokes
226// the appropriate handler methods for each message.
227func (ofc *OFConnection) processOFStream(ctx context.Context) {
228 fromController := bufio.NewReader(ofc.conn)
229
230 /*
231 * We have a read buffer of a max size of 4096, so if we ever have
232 * a message larger than this then we will have issues
233 */
234 headerBuf := make([]byte, 8)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700235top:
236 // Continue until we are told to stop
237 for {
238 select {
239 case <-ctx.Done():
240 logger.Error("of-loop-ending-context-done")
241 break top
242 default:
243 // Read 8 bytes, the standard OF header
244 read, err := io.ReadFull(fromController, headerBuf)
245 if err != nil {
Jonathan Hart828908c2020-04-15 14:23:45 -0700246 if err == io.EOF {
247 logger.Infow("controller-disconnected",
248 log.Fields{
249 "device-id": ofc.DeviceID,
250 "controller": ofc.OFControllerEndPoint,
251 })
252 } else {
253 logger.Errorw("bad-of-header",
254 log.Fields{
255 "byte-count": read,
256 "device-id": ofc.DeviceID,
257 "controller": ofc.OFControllerEndPoint,
258 "error": err})
259 }
Jonathan Hart4b110f62020-03-13 17:36:19 -0700260 break top
261 }
262
263 // Decode the header
264 peek, err := ofc.peekAtOFHeader(headerBuf)
265 if err != nil {
266 /*
267 * Header is bad, assume stream is corrupted
268 * and needs to be restarted
269 */
270 logger.Errorw("bad-of-packet",
271 log.Fields{
272 "device-id": ofc.DeviceID,
273 "error": err})
274 break top
275 }
276
277 // Calculate the size of the rest of the packet and read it
278 need := int(peek.GetLength())
279 messageBuf := make([]byte, need)
280 copy(messageBuf, headerBuf)
281 read, err = io.ReadFull(fromController, messageBuf[8:])
282 if err != nil {
283 logger.Errorw("bad-of-packet",
284 log.Fields{
285 "byte-count": read,
286 "device-id": ofc.DeviceID,
287 "error": err})
288 break top
289 }
290
291 // Decode and process the packet
292 decoder := goloxi.NewDecoder(messageBuf)
293 msg, err := ofp.DecodeHeader(decoder)
294 if err != nil {
295 // nolint: staticcheck
296 js, _ := json.Marshal(decoder)
297 logger.Errorw("failed-to-decode",
298 log.Fields{
299 "device-id": ofc.DeviceID,
300 "decoder": js,
301 "error": err})
302 break top
303 }
304 if logger.V(log.DebugLevel) {
305 js, _ := json.Marshal(msg)
306 logger.Debugw("packet-header",
307 log.Fields{
308 "device-id": ofc.DeviceID,
309 "header": js})
310 }
Andrea Campanella3d955ef2020-06-19 15:29:55 +0200311 /*
312 * Spawning a go routine for every incoming message removes processing ordering guarantees.
313 * Removing such guarantees puts burden on the controller to ensure the correct ordering of
314 * incoming messages and is a less optimal and safe agent implementation.
315 * This is OK for now because ONOS keeps the order guaranteed but the agent needs to avoid
316 * relying on external fairness. Particular care and attention has to be placed in flow add/delete
317 * and relative barrier requests. e.g. a flowMod will be handled in thread different from a barrier,
318 * with no guarantees of handling all messages before a barrier.
319 * A multiple queue (incoming worker and outgoing) is a possible solution.
320 */
321 go ofc.parseHeader(msg)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700322 }
323 }
324 logger.Debugw("end-of-stream",
325 log.Fields{"device-id": ofc.DeviceID})
326 ofc.events <- ofcEventDisconnect
327}
328
329func (ofc *OFConnection) sayHello() {
330 hello := ofp.NewHello()
331 hello.Xid = uint32(GetXid())
332 elem := ofp.NewHelloElemVersionbitmap()
333 elem.SetType(ofp.OFPHETVersionbitmap)
334 elem.SetLength(8)
335 elem.SetBitmaps([]*ofp.Uint32{{Value: 16}})
336 hello.SetElements([]ofp.IHelloElem{elem})
337 if logger.V(log.DebugLevel) {
338 js, _ := json.Marshal(hello)
339 logger.Debugw("sayHello Called",
340 log.Fields{
341 "device-id": ofc.DeviceID,
342 "hello-message": js})
343 }
344 if err := ofc.SendMessage(hello); err != nil {
345 logger.Fatalw("Failed saying hello to Openflow Server, unable to proceed",
346 log.Fields{
347 "device-id": ofc.DeviceID,
348 "error": err})
349 }
350}
351
352func (ofc *OFConnection) parseHeader(header ofp.IHeader) {
353 headerType := header.GetType()
354 logger.Debugw("packet-header-type",
355 log.Fields{
356 "header-type": ofp.Type(headerType).String()})
357 switch headerType {
358 case ofp.OFPTHello:
359 //x := header.(*ofp.Hello)
360 case ofp.OFPTError:
361 go ofc.handleErrMsg(header.(*ofp.ErrorMsg))
362 case ofp.OFPTEchoRequest:
363 go ofc.handleEchoRequest(header.(*ofp.EchoRequest))
364 case ofp.OFPTEchoReply:
365 case ofp.OFPTExperimenter:
366 case ofp.OFPTFeaturesRequest:
367 go func() {
368 if err := ofc.handleFeatureRequest(header.(*ofp.FeaturesRequest)); err != nil {
369 logger.Errorw("handle-feature-request", log.Fields{"error": err})
370 }
371 }()
372 case ofp.OFPTFeaturesReply:
373 case ofp.OFPTGetConfigRequest:
374 go ofc.handleGetConfigRequest(header.(*ofp.GetConfigRequest))
375 case ofp.OFPTGetConfigReply:
376 case ofp.OFPTSetConfig:
377 go ofc.handleSetConfig(header.(*ofp.SetConfig))
378 case ofp.OFPTPacketIn:
379 case ofp.OFPTFlowRemoved:
380 case ofp.OFPTPortStatus:
381 case ofp.OFPTPacketOut:
382 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
383 ofc.sendRoleSlaveError(header)
384 return
385 }
386 go ofc.handlePacketOut(header.(*ofp.PacketOut))
387 case ofp.OFPTFlowMod:
388 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
389 ofc.sendRoleSlaveError(header)
390 return
391 }
Jonathan Hart4b110f62020-03-13 17:36:19 -0700392 switch header.(ofp.IFlowMod).GetCommand() {
393 case ofp.OFPFCAdd:
394 ofc.handleFlowAdd(header.(*ofp.FlowAdd))
395 case ofp.OFPFCModify:
396 ofc.handleFlowMod(header.(*ofp.FlowMod))
397 case ofp.OFPFCModifyStrict:
398 ofc.handleFlowModStrict(header.(*ofp.FlowModifyStrict))
399 case ofp.OFPFCDelete:
400 ofc.handleFlowDelete(header.(*ofp.FlowDelete))
401 case ofp.OFPFCDeleteStrict:
402 ofc.handleFlowDeleteStrict(header.(*ofp.FlowDeleteStrict))
403 }
404 case ofp.OFPTStatsRequest:
405 go func() {
406 if err := ofc.handleStatsRequest(header, header.(ofp.IStatsRequest).GetStatsType()); err != nil {
407 logger.Errorw("ofpt-stats-request", log.Fields{"error": err})
408 }
409 }()
410 case ofp.OFPTBarrierRequest:
411 /* See note above at case ofp.OFPTFlowMod:*/
412 ofc.handleBarrierRequest(header.(*ofp.BarrierRequest))
413 case ofp.OFPTRoleRequest:
414 go ofc.handleRoleRequest(header.(*ofp.RoleRequest))
415 case ofp.OFPTMeterMod:
416 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
417 ofc.sendRoleSlaveError(header)
418 return
419 }
420 ofc.handleMeterModRequest(header.(*ofp.MeterMod))
421 case ofp.OFPTGroupMod:
422 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
423 ofc.sendRoleSlaveError(header)
424 return
425 }
Jonathan Hart60c5d772020-03-30 18:28:40 -0700426 ofc.handleGroupMod(header.(ofp.IGroupMod))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700427 }
428}
429
// Message is the interface that represents an OpenFlow message and enables a
// unified implementation of SendMessage: any value that can serialize itself
// into a goloxi encoder can be queued and written to the controller.
type Message interface {
	// Serialize writes the wire representation of the message into encoder.
	Serialize(encoder *goloxi.Encoder) error
}
435
436func (ofc *OFConnection) doSend(msg Message) error {
437 if ofc.conn == nil {
438 return errors.New("no-connection")
439 }
440 enc := goloxi.NewEncoder()
441 if err := msg.Serialize(enc); err != nil {
442 return err
443 }
444
445 bytes := enc.Bytes()
446 if _, err := ofc.conn.Write(bytes); err != nil {
447 logger.Errorw("unable-to-send-message-to-controller",
448 log.Fields{
449 "device-id": ofc.DeviceID,
450 "message": msg,
451 "error": err})
452 return err
453 }
454 return nil
455}
456
457func (ofc *OFConnection) messageSender(ctx context.Context) {
458 // first process last fail if it exists
459 if ofc.lastUnsentMessage != nil {
460 if err := ofc.doSend(ofc.lastUnsentMessage); err != nil {
461 ofc.events <- ofcEventDisconnect
462 return
463 }
464 ofc.lastUnsentMessage = nil
465 }
466top:
467 for {
468 select {
469 case <-ctx.Done():
470 break top
471 case msg := <-ofc.sendChannel:
472 if err := ofc.doSend(msg); err != nil {
473 ofc.lastUnsentMessage = msg
474 ofc.events <- ofcEventDisconnect
475 logger.Debugw("message-sender-error",
476 log.Fields{
477 "device-id": ofc.DeviceID,
478 "error": err.Error()})
479 break top
480 }
481 logger.Debugw("message-sender-send",
482 log.Fields{
483 "device-id": ofc.DeviceID})
484 ofc.lastUnsentMessage = nil
485 }
486 }
487
488 logger.Debugw("message-sender-finished",
489 log.Fields{
490 "device-id": ofc.DeviceID})
491}
492
// SendMessage queues a message to be sent to the openflow controller.
//
// The message is only placed on sendChannel; the actual write is performed by
// messageSender. The call blocks until the channel accepts the message and
// the returned error is always nil.
func (ofc *OFConnection) SendMessage(message Message) error {
	logger.Debug("queuing-message")
	ofc.sendChannel <- message
	return nil
}