blob: 7ad609200cb20b63485911af62ada1509a5605b3 [file] [log] [blame]
Jonathan Hart4b110f62020-03-13 17:36:19 -07001/*
2 Copyright 2020 the original author or authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16package openflow
17
18import (
19 "bufio"
20 "context"
21 "encoding/binary"
22 "encoding/json"
23 "errors"
Jonathan Hart4b110f62020-03-13 17:36:19 -070024 "io"
25 "net"
26 "time"
David Bainbridgef8ce7d22020-04-08 12:49:41 -070027
Jonathan Hart828908c2020-04-15 14:23:45 -070028 "github.com/opencord/goloxi"
29 ofp "github.com/opencord/goloxi/of13"
David Bainbridgef8ce7d22020-04-08 12:49:41 -070030 "github.com/opencord/ofagent-go/internal/pkg/holder"
31 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 "github.com/opencord/voltha-protos/v3/go/voltha"
Jonathan Hart4b110f62020-03-13 17:36:19 -070033)
34
35type OFConnection struct {
36 OFControllerEndPoint string
37 DeviceID string
David Bainbridgef8ce7d22020-04-08 12:49:41 -070038 VolthaClient *holder.VolthaServiceClientHolder
Jonathan Hart4b110f62020-03-13 17:36:19 -070039 PacketOutChannel chan *voltha.PacketOut
40 ConnectionMaxRetries int
41 ConnectionRetryDelay time.Duration
42
43 conn net.Conn
44
45 // current role of this connection
46 role ofcRole
47 roleManager RoleManager
48
49 events chan ofcEvent
50 sendChannel chan Message
51 lastUnsentMessage Message
Matteo Scandolo256266d2020-06-01 13:44:07 -070052
53 flowsChunkSize int
54 portsChunkSize int
55 portsDescChunkSize int
Jonathan Hart4b110f62020-03-13 17:36:19 -070056}
57
58func (ofc *OFConnection) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
59 header := ofp.Header{}
60 header.Version = uint8(buf[0])
61 header.Type = uint8(buf[1])
62 header.Length = binary.BigEndian.Uint16(buf[2:4])
63 header.Xid = binary.BigEndian.Uint32(buf[4:8])
64
65 // TODO: add minimal validation of version and type
66
67 return &header, nil
68}
69
70func (ofc *OFConnection) establishConnectionToController() error {
71 if ofc.conn != nil {
72 logger.Debugw("closing-of-connection-to-reconnect",
73 log.Fields{"device-id": ofc.DeviceID})
74 ofc.conn.Close()
75 ofc.conn = nil
76 }
77 try := 1
78 for ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
79 if raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint); err != nil {
80 logger.Debugw("openflow-client unable to resolve endpoint",
81 log.Fields{
82 "device-id": ofc.DeviceID,
83 "endpoint": ofc.OFControllerEndPoint})
84 } else {
85 if connection, err := net.DialTCP("tcp", nil, raddr); err == nil {
86 ofc.conn = connection
87 ofc.sayHello()
88 ofc.events <- ofcEventConnect
89 return nil
90 } else {
91 logger.Warnw("openflow-client-connect-error",
92 log.Fields{
93 "device-id": ofc.DeviceID,
94 "endpoint": ofc.OFControllerEndPoint})
95 }
96 }
97 if ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
98 if ofc.ConnectionMaxRetries != 0 {
99 try += 1
100 }
101 time.Sleep(ofc.ConnectionRetryDelay)
102 }
103 }
104 return errors.New("failed-to-connect-to-of-controller")
105}
106
107// Run implements the state machine for the OF client reacting to state change
108// events and invoking actions as a reaction to those state changes
109func (ofc *OFConnection) Run(ctx context.Context) {
110
111 var ofCtx context.Context
112 var ofDone func()
113 state := ofcStateCreated
114 ofc.events <- ofcEventStart
115top:
116 for {
117 select {
118 case <-ctx.Done():
119 state = ofcStateStopped
120 logger.Debugw("state-transition-context-done",
121 log.Fields{"device-id": ofc.DeviceID})
122 break top
123 case event := <-ofc.events:
124 previous := state
125 switch event {
126 case ofcEventStart:
127 logger.Debugw("ofc-event-start",
128 log.Fields{"device-id": ofc.DeviceID})
129 if state == ofcStateCreated {
130 state = ofcStateStarted
131 logger.Debug("STARTED MORE THAN ONCE")
132 go func() {
133 if err := ofc.establishConnectionToController(); err != nil {
134 logger.Errorw("controller-connection-failed", log.Fields{"error": err})
135 panic(err)
136 }
137 }()
138 } else {
139 logger.Errorw("illegal-state-transition",
140 log.Fields{
141 "device-id": ofc.DeviceID,
142 "current-state": state.String(),
143 "event": event.String()})
144 }
145 case ofcEventConnect:
146 logger.Debugw("ofc-event-connected",
147 log.Fields{"device-id": ofc.DeviceID})
148 if state == ofcStateStarted || state == ofcStateDisconnected {
149 state = ofcStateConnected
150 ofCtx, ofDone = context.WithCancel(context.Background())
151 go ofc.messageSender(ofCtx)
152 go ofc.processOFStream(ofCtx)
153 } else {
154 logger.Errorw("illegal-state-transition",
155 log.Fields{
156 "device-id": ofc.DeviceID,
157 "current-state": state.String(),
158 "event": event.String()})
159 }
160 case ofcEventDisconnect:
161 logger.Debugw("ofc-event-disconnected",
162 log.Fields{
163 "device-id": ofc.DeviceID,
164 "state": state.String()})
165 if state == ofcStateConnected {
166 state = ofcStateDisconnected
167 if ofDone != nil {
168 ofDone()
169 ofDone = nil
170 }
171 go func() {
172 if err := ofc.establishConnectionToController(); err != nil {
173 log.Errorw("controller-connection-failed", log.Fields{"error": err})
174 panic(err)
175 }
176 }()
177 } else {
178 logger.Errorw("illegal-state-transition",
179 log.Fields{
180 "device-id": ofc.DeviceID,
181 "current-state": state.String(),
182 "event": event.String()})
183 }
184 case ofcEventStop:
185 logger.Debugw("ofc-event-stop",
186 log.Fields{"device-id": ofc.DeviceID})
187 if state == ofcStateCreated || state == ofcStateConnected || state == ofcStateDisconnected {
188 state = ofcStateStopped
189 break top
190 } else {
191 logger.Errorw("illegal-state-transition",
192 log.Fields{
193 "device-id": ofc.DeviceID,
194 "current-state": state.String(),
195 "event": event.String()})
196 }
197 }
198 logger.Debugw("state-transition",
199 log.Fields{
200 "device-id": ofc.DeviceID,
201 "previous-state": previous.String(),
202 "current-state": state.String(),
203 "event": event.String()})
204 }
205 }
206
207 // If the child context exists, then cancel it
208 if ofDone != nil {
209 log.Debugw("closing-child-processes",
210 log.Fields{"device-id": ofc.DeviceID})
211 ofDone()
212 }
213
214 // If the connection is open, then close it
215 if ofc.conn != nil {
216 log.Debugw("closing-of-connection",
217 log.Fields{"device-id": ofc.DeviceID})
218 ofc.conn.Close()
219 ofc.conn = nil
220 }
221 log.Debugw("state-machine-finished",
222 log.Fields{"device-id": ofc.DeviceID})
223}
224
225// processOFStream processes the OF connection from the controller and invokes
226// the appropriate handler methods for each message.
227func (ofc *OFConnection) processOFStream(ctx context.Context) {
228 fromController := bufio.NewReader(ofc.conn)
229
230 /*
231 * We have a read buffer of a max size of 4096, so if we ever have
232 * a message larger than this then we will have issues
233 */
234 headerBuf := make([]byte, 8)
235
236top:
237 // Continue until we are told to stop
238 for {
239 select {
240 case <-ctx.Done():
241 logger.Error("of-loop-ending-context-done")
242 break top
243 default:
244 // Read 8 bytes, the standard OF header
245 read, err := io.ReadFull(fromController, headerBuf)
246 if err != nil {
Jonathan Hart828908c2020-04-15 14:23:45 -0700247 if err == io.EOF {
248 logger.Infow("controller-disconnected",
249 log.Fields{
250 "device-id": ofc.DeviceID,
251 "controller": ofc.OFControllerEndPoint,
252 })
253 } else {
254 logger.Errorw("bad-of-header",
255 log.Fields{
256 "byte-count": read,
257 "device-id": ofc.DeviceID,
258 "controller": ofc.OFControllerEndPoint,
259 "error": err})
260 }
Jonathan Hart4b110f62020-03-13 17:36:19 -0700261 break top
262 }
263
264 // Decode the header
265 peek, err := ofc.peekAtOFHeader(headerBuf)
266 if err != nil {
267 /*
268 * Header is bad, assume stream is corrupted
269 * and needs to be restarted
270 */
271 logger.Errorw("bad-of-packet",
272 log.Fields{
273 "device-id": ofc.DeviceID,
274 "error": err})
275 break top
276 }
277
278 // Calculate the size of the rest of the packet and read it
279 need := int(peek.GetLength())
280 messageBuf := make([]byte, need)
281 copy(messageBuf, headerBuf)
282 read, err = io.ReadFull(fromController, messageBuf[8:])
283 if err != nil {
284 logger.Errorw("bad-of-packet",
285 log.Fields{
286 "byte-count": read,
287 "device-id": ofc.DeviceID,
288 "error": err})
289 break top
290 }
291
292 // Decode and process the packet
293 decoder := goloxi.NewDecoder(messageBuf)
294 msg, err := ofp.DecodeHeader(decoder)
295 if err != nil {
296 // nolint: staticcheck
297 js, _ := json.Marshal(decoder)
298 logger.Errorw("failed-to-decode",
299 log.Fields{
300 "device-id": ofc.DeviceID,
301 "decoder": js,
302 "error": err})
303 break top
304 }
305 if logger.V(log.DebugLevel) {
306 js, _ := json.Marshal(msg)
307 logger.Debugw("packet-header",
308 log.Fields{
309 "device-id": ofc.DeviceID,
310 "header": js})
311 }
312 ofc.parseHeader(msg)
313 }
314 }
315 logger.Debugw("end-of-stream",
316 log.Fields{"device-id": ofc.DeviceID})
317 ofc.events <- ofcEventDisconnect
318}
319
320func (ofc *OFConnection) sayHello() {
321 hello := ofp.NewHello()
322 hello.Xid = uint32(GetXid())
323 elem := ofp.NewHelloElemVersionbitmap()
324 elem.SetType(ofp.OFPHETVersionbitmap)
325 elem.SetLength(8)
326 elem.SetBitmaps([]*ofp.Uint32{{Value: 16}})
327 hello.SetElements([]ofp.IHelloElem{elem})
328 if logger.V(log.DebugLevel) {
329 js, _ := json.Marshal(hello)
330 logger.Debugw("sayHello Called",
331 log.Fields{
332 "device-id": ofc.DeviceID,
333 "hello-message": js})
334 }
335 if err := ofc.SendMessage(hello); err != nil {
336 logger.Fatalw("Failed saying hello to Openflow Server, unable to proceed",
337 log.Fields{
338 "device-id": ofc.DeviceID,
339 "error": err})
340 }
341}
342
343func (ofc *OFConnection) parseHeader(header ofp.IHeader) {
344 headerType := header.GetType()
345 logger.Debugw("packet-header-type",
346 log.Fields{
347 "header-type": ofp.Type(headerType).String()})
348 switch headerType {
349 case ofp.OFPTHello:
350 //x := header.(*ofp.Hello)
351 case ofp.OFPTError:
352 go ofc.handleErrMsg(header.(*ofp.ErrorMsg))
353 case ofp.OFPTEchoRequest:
354 go ofc.handleEchoRequest(header.(*ofp.EchoRequest))
355 case ofp.OFPTEchoReply:
356 case ofp.OFPTExperimenter:
357 case ofp.OFPTFeaturesRequest:
358 go func() {
359 if err := ofc.handleFeatureRequest(header.(*ofp.FeaturesRequest)); err != nil {
360 logger.Errorw("handle-feature-request", log.Fields{"error": err})
361 }
362 }()
363 case ofp.OFPTFeaturesReply:
364 case ofp.OFPTGetConfigRequest:
365 go ofc.handleGetConfigRequest(header.(*ofp.GetConfigRequest))
366 case ofp.OFPTGetConfigReply:
367 case ofp.OFPTSetConfig:
368 go ofc.handleSetConfig(header.(*ofp.SetConfig))
369 case ofp.OFPTPacketIn:
370 case ofp.OFPTFlowRemoved:
371 case ofp.OFPTPortStatus:
372 case ofp.OFPTPacketOut:
373 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
374 ofc.sendRoleSlaveError(header)
375 return
376 }
377 go ofc.handlePacketOut(header.(*ofp.PacketOut))
378 case ofp.OFPTFlowMod:
379 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
380 ofc.sendRoleSlaveError(header)
381 return
382 }
383 /*
384 * Not using go routine to handle flow* messages or barrier requests
385 * onos typically issues barrier requests just before a flow* message.
386 * by handling in this thread I ensure all flow* are handled when barrier
387 * request is issued.
388 */
389 switch header.(ofp.IFlowMod).GetCommand() {
390 case ofp.OFPFCAdd:
391 ofc.handleFlowAdd(header.(*ofp.FlowAdd))
392 case ofp.OFPFCModify:
393 ofc.handleFlowMod(header.(*ofp.FlowMod))
394 case ofp.OFPFCModifyStrict:
395 ofc.handleFlowModStrict(header.(*ofp.FlowModifyStrict))
396 case ofp.OFPFCDelete:
397 ofc.handleFlowDelete(header.(*ofp.FlowDelete))
398 case ofp.OFPFCDeleteStrict:
399 ofc.handleFlowDeleteStrict(header.(*ofp.FlowDeleteStrict))
400 }
401 case ofp.OFPTStatsRequest:
402 go func() {
403 if err := ofc.handleStatsRequest(header, header.(ofp.IStatsRequest).GetStatsType()); err != nil {
404 logger.Errorw("ofpt-stats-request", log.Fields{"error": err})
405 }
406 }()
407 case ofp.OFPTBarrierRequest:
408 /* See note above at case ofp.OFPTFlowMod:*/
409 ofc.handleBarrierRequest(header.(*ofp.BarrierRequest))
410 case ofp.OFPTRoleRequest:
411 go ofc.handleRoleRequest(header.(*ofp.RoleRequest))
412 case ofp.OFPTMeterMod:
413 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
414 ofc.sendRoleSlaveError(header)
415 return
416 }
417 ofc.handleMeterModRequest(header.(*ofp.MeterMod))
418 case ofp.OFPTGroupMod:
419 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
420 ofc.sendRoleSlaveError(header)
421 return
422 }
Jonathan Hart60c5d772020-03-30 18:28:40 -0700423 ofc.handleGroupMod(header.(ofp.IGroupMod))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700424 }
425}
426
427// Message interface that represents an open flow message and enables for a
428// unified implementation of SendMessage
429type Message interface {
430 Serialize(encoder *goloxi.Encoder) error
431}
432
433func (ofc *OFConnection) doSend(msg Message) error {
434 if ofc.conn == nil {
435 return errors.New("no-connection")
436 }
437 enc := goloxi.NewEncoder()
438 if err := msg.Serialize(enc); err != nil {
439 return err
440 }
441
442 bytes := enc.Bytes()
443 if _, err := ofc.conn.Write(bytes); err != nil {
444 logger.Errorw("unable-to-send-message-to-controller",
445 log.Fields{
446 "device-id": ofc.DeviceID,
447 "message": msg,
448 "error": err})
449 return err
450 }
451 return nil
452}
453
454func (ofc *OFConnection) messageSender(ctx context.Context) {
455 // first process last fail if it exists
456 if ofc.lastUnsentMessage != nil {
457 if err := ofc.doSend(ofc.lastUnsentMessage); err != nil {
458 ofc.events <- ofcEventDisconnect
459 return
460 }
461 ofc.lastUnsentMessage = nil
462 }
463top:
464 for {
465 select {
466 case <-ctx.Done():
467 break top
468 case msg := <-ofc.sendChannel:
469 if err := ofc.doSend(msg); err != nil {
470 ofc.lastUnsentMessage = msg
471 ofc.events <- ofcEventDisconnect
472 logger.Debugw("message-sender-error",
473 log.Fields{
474 "device-id": ofc.DeviceID,
475 "error": err.Error()})
476 break top
477 }
478 logger.Debugw("message-sender-send",
479 log.Fields{
480 "device-id": ofc.DeviceID})
481 ofc.lastUnsentMessage = nil
482 }
483 }
484
485 logger.Debugw("message-sender-finished",
486 log.Fields{
487 "device-id": ofc.DeviceID})
488}
489
490// SendMessage queues a message to be sent to the openflow controller
491func (ofc *OFConnection) SendMessage(message Message) error {
492 logger.Debug("queuing-message")
493 ofc.sendChannel <- message
494 return nil
495}