blob: 209990d25fdf2f0cc41828a7f2db42f5325eb969 [file] [log] [blame]
Jonathan Hart4b110f62020-03-13 17:36:19 -07001/*
2 Copyright 2020 the original author or authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16package openflow
17
18import (
19 "bufio"
20 "context"
21 "encoding/binary"
22 "encoding/json"
23 "errors"
Jonathan Hart4b110f62020-03-13 17:36:19 -070024 "io"
25 "net"
26 "time"
David Bainbridgef8ce7d22020-04-08 12:49:41 -070027
Jonathan Hart828908c2020-04-15 14:23:45 -070028 "github.com/opencord/goloxi"
29 ofp "github.com/opencord/goloxi/of13"
David Bainbridgef8ce7d22020-04-08 12:49:41 -070030 "github.com/opencord/ofagent-go/internal/pkg/holder"
31 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 "github.com/opencord/voltha-protos/v3/go/voltha"
Jonathan Hart4b110f62020-03-13 17:36:19 -070033)
34
35type OFConnection struct {
36 OFControllerEndPoint string
37 DeviceID string
David Bainbridgef8ce7d22020-04-08 12:49:41 -070038 VolthaClient *holder.VolthaServiceClientHolder
Jonathan Hart4b110f62020-03-13 17:36:19 -070039 PacketOutChannel chan *voltha.PacketOut
40 ConnectionMaxRetries int
41 ConnectionRetryDelay time.Duration
42
43 conn net.Conn
44
45 // current role of this connection
46 role ofcRole
47 roleManager RoleManager
48
49 events chan ofcEvent
50 sendChannel chan Message
51 lastUnsentMessage Message
52}
53
54func (ofc *OFConnection) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
55 header := ofp.Header{}
56 header.Version = uint8(buf[0])
57 header.Type = uint8(buf[1])
58 header.Length = binary.BigEndian.Uint16(buf[2:4])
59 header.Xid = binary.BigEndian.Uint32(buf[4:8])
60
61 // TODO: add minimal validation of version and type
62
63 return &header, nil
64}
65
66func (ofc *OFConnection) establishConnectionToController() error {
67 if ofc.conn != nil {
68 logger.Debugw("closing-of-connection-to-reconnect",
69 log.Fields{"device-id": ofc.DeviceID})
70 ofc.conn.Close()
71 ofc.conn = nil
72 }
73 try := 1
74 for ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
75 if raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint); err != nil {
76 logger.Debugw("openflow-client unable to resolve endpoint",
77 log.Fields{
78 "device-id": ofc.DeviceID,
79 "endpoint": ofc.OFControllerEndPoint})
80 } else {
81 if connection, err := net.DialTCP("tcp", nil, raddr); err == nil {
82 ofc.conn = connection
83 ofc.sayHello()
84 ofc.events <- ofcEventConnect
85 return nil
86 } else {
87 logger.Warnw("openflow-client-connect-error",
88 log.Fields{
89 "device-id": ofc.DeviceID,
90 "endpoint": ofc.OFControllerEndPoint})
91 }
92 }
93 if ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
94 if ofc.ConnectionMaxRetries != 0 {
95 try += 1
96 }
97 time.Sleep(ofc.ConnectionRetryDelay)
98 }
99 }
100 return errors.New("failed-to-connect-to-of-controller")
101}
102
103// Run implements the state machine for the OF client reacting to state change
104// events and invoking actions as a reaction to those state changes
105func (ofc *OFConnection) Run(ctx context.Context) {
106
107 var ofCtx context.Context
108 var ofDone func()
109 state := ofcStateCreated
110 ofc.events <- ofcEventStart
111top:
112 for {
113 select {
114 case <-ctx.Done():
115 state = ofcStateStopped
116 logger.Debugw("state-transition-context-done",
117 log.Fields{"device-id": ofc.DeviceID})
118 break top
119 case event := <-ofc.events:
120 previous := state
121 switch event {
122 case ofcEventStart:
123 logger.Debugw("ofc-event-start",
124 log.Fields{"device-id": ofc.DeviceID})
125 if state == ofcStateCreated {
126 state = ofcStateStarted
127 logger.Debug("STARTED MORE THAN ONCE")
128 go func() {
129 if err := ofc.establishConnectionToController(); err != nil {
130 logger.Errorw("controller-connection-failed", log.Fields{"error": err})
131 panic(err)
132 }
133 }()
134 } else {
135 logger.Errorw("illegal-state-transition",
136 log.Fields{
137 "device-id": ofc.DeviceID,
138 "current-state": state.String(),
139 "event": event.String()})
140 }
141 case ofcEventConnect:
142 logger.Debugw("ofc-event-connected",
143 log.Fields{"device-id": ofc.DeviceID})
144 if state == ofcStateStarted || state == ofcStateDisconnected {
145 state = ofcStateConnected
146 ofCtx, ofDone = context.WithCancel(context.Background())
147 go ofc.messageSender(ofCtx)
148 go ofc.processOFStream(ofCtx)
149 } else {
150 logger.Errorw("illegal-state-transition",
151 log.Fields{
152 "device-id": ofc.DeviceID,
153 "current-state": state.String(),
154 "event": event.String()})
155 }
156 case ofcEventDisconnect:
157 logger.Debugw("ofc-event-disconnected",
158 log.Fields{
159 "device-id": ofc.DeviceID,
160 "state": state.String()})
161 if state == ofcStateConnected {
162 state = ofcStateDisconnected
163 if ofDone != nil {
164 ofDone()
165 ofDone = nil
166 }
167 go func() {
168 if err := ofc.establishConnectionToController(); err != nil {
169 log.Errorw("controller-connection-failed", log.Fields{"error": err})
170 panic(err)
171 }
172 }()
173 } else {
174 logger.Errorw("illegal-state-transition",
175 log.Fields{
176 "device-id": ofc.DeviceID,
177 "current-state": state.String(),
178 "event": event.String()})
179 }
180 case ofcEventStop:
181 logger.Debugw("ofc-event-stop",
182 log.Fields{"device-id": ofc.DeviceID})
183 if state == ofcStateCreated || state == ofcStateConnected || state == ofcStateDisconnected {
184 state = ofcStateStopped
185 break top
186 } else {
187 logger.Errorw("illegal-state-transition",
188 log.Fields{
189 "device-id": ofc.DeviceID,
190 "current-state": state.String(),
191 "event": event.String()})
192 }
193 }
194 logger.Debugw("state-transition",
195 log.Fields{
196 "device-id": ofc.DeviceID,
197 "previous-state": previous.String(),
198 "current-state": state.String(),
199 "event": event.String()})
200 }
201 }
202
203 // If the child context exists, then cancel it
204 if ofDone != nil {
205 log.Debugw("closing-child-processes",
206 log.Fields{"device-id": ofc.DeviceID})
207 ofDone()
208 }
209
210 // If the connection is open, then close it
211 if ofc.conn != nil {
212 log.Debugw("closing-of-connection",
213 log.Fields{"device-id": ofc.DeviceID})
214 ofc.conn.Close()
215 ofc.conn = nil
216 }
217 log.Debugw("state-machine-finished",
218 log.Fields{"device-id": ofc.DeviceID})
219}
220
221// processOFStream processes the OF connection from the controller and invokes
222// the appropriate handler methods for each message.
223func (ofc *OFConnection) processOFStream(ctx context.Context) {
224 fromController := bufio.NewReader(ofc.conn)
225
226 /*
227 * We have a read buffer of a max size of 4096, so if we ever have
228 * a message larger than this then we will have issues
229 */
230 headerBuf := make([]byte, 8)
231
232top:
233 // Continue until we are told to stop
234 for {
235 select {
236 case <-ctx.Done():
237 logger.Error("of-loop-ending-context-done")
238 break top
239 default:
240 // Read 8 bytes, the standard OF header
241 read, err := io.ReadFull(fromController, headerBuf)
242 if err != nil {
Jonathan Hart828908c2020-04-15 14:23:45 -0700243 if err == io.EOF {
244 logger.Infow("controller-disconnected",
245 log.Fields{
246 "device-id": ofc.DeviceID,
247 "controller": ofc.OFControllerEndPoint,
248 })
249 } else {
250 logger.Errorw("bad-of-header",
251 log.Fields{
252 "byte-count": read,
253 "device-id": ofc.DeviceID,
254 "controller": ofc.OFControllerEndPoint,
255 "error": err})
256 }
Jonathan Hart4b110f62020-03-13 17:36:19 -0700257 break top
258 }
259
260 // Decode the header
261 peek, err := ofc.peekAtOFHeader(headerBuf)
262 if err != nil {
263 /*
264 * Header is bad, assume stream is corrupted
265 * and needs to be restarted
266 */
267 logger.Errorw("bad-of-packet",
268 log.Fields{
269 "device-id": ofc.DeviceID,
270 "error": err})
271 break top
272 }
273
274 // Calculate the size of the rest of the packet and read it
275 need := int(peek.GetLength())
276 messageBuf := make([]byte, need)
277 copy(messageBuf, headerBuf)
278 read, err = io.ReadFull(fromController, messageBuf[8:])
279 if err != nil {
280 logger.Errorw("bad-of-packet",
281 log.Fields{
282 "byte-count": read,
283 "device-id": ofc.DeviceID,
284 "error": err})
285 break top
286 }
287
288 // Decode and process the packet
289 decoder := goloxi.NewDecoder(messageBuf)
290 msg, err := ofp.DecodeHeader(decoder)
291 if err != nil {
292 // nolint: staticcheck
293 js, _ := json.Marshal(decoder)
294 logger.Errorw("failed-to-decode",
295 log.Fields{
296 "device-id": ofc.DeviceID,
297 "decoder": js,
298 "error": err})
299 break top
300 }
301 if logger.V(log.DebugLevel) {
302 js, _ := json.Marshal(msg)
303 logger.Debugw("packet-header",
304 log.Fields{
305 "device-id": ofc.DeviceID,
306 "header": js})
307 }
308 ofc.parseHeader(msg)
309 }
310 }
311 logger.Debugw("end-of-stream",
312 log.Fields{"device-id": ofc.DeviceID})
313 ofc.events <- ofcEventDisconnect
314}
315
316func (ofc *OFConnection) sayHello() {
317 hello := ofp.NewHello()
318 hello.Xid = uint32(GetXid())
319 elem := ofp.NewHelloElemVersionbitmap()
320 elem.SetType(ofp.OFPHETVersionbitmap)
321 elem.SetLength(8)
322 elem.SetBitmaps([]*ofp.Uint32{{Value: 16}})
323 hello.SetElements([]ofp.IHelloElem{elem})
324 if logger.V(log.DebugLevel) {
325 js, _ := json.Marshal(hello)
326 logger.Debugw("sayHello Called",
327 log.Fields{
328 "device-id": ofc.DeviceID,
329 "hello-message": js})
330 }
331 if err := ofc.SendMessage(hello); err != nil {
332 logger.Fatalw("Failed saying hello to Openflow Server, unable to proceed",
333 log.Fields{
334 "device-id": ofc.DeviceID,
335 "error": err})
336 }
337}
338
339func (ofc *OFConnection) parseHeader(header ofp.IHeader) {
340 headerType := header.GetType()
341 logger.Debugw("packet-header-type",
342 log.Fields{
343 "header-type": ofp.Type(headerType).String()})
344 switch headerType {
345 case ofp.OFPTHello:
346 //x := header.(*ofp.Hello)
347 case ofp.OFPTError:
348 go ofc.handleErrMsg(header.(*ofp.ErrorMsg))
349 case ofp.OFPTEchoRequest:
350 go ofc.handleEchoRequest(header.(*ofp.EchoRequest))
351 case ofp.OFPTEchoReply:
352 case ofp.OFPTExperimenter:
353 case ofp.OFPTFeaturesRequest:
354 go func() {
355 if err := ofc.handleFeatureRequest(header.(*ofp.FeaturesRequest)); err != nil {
356 logger.Errorw("handle-feature-request", log.Fields{"error": err})
357 }
358 }()
359 case ofp.OFPTFeaturesReply:
360 case ofp.OFPTGetConfigRequest:
361 go ofc.handleGetConfigRequest(header.(*ofp.GetConfigRequest))
362 case ofp.OFPTGetConfigReply:
363 case ofp.OFPTSetConfig:
364 go ofc.handleSetConfig(header.(*ofp.SetConfig))
365 case ofp.OFPTPacketIn:
366 case ofp.OFPTFlowRemoved:
367 case ofp.OFPTPortStatus:
368 case ofp.OFPTPacketOut:
369 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
370 ofc.sendRoleSlaveError(header)
371 return
372 }
373 go ofc.handlePacketOut(header.(*ofp.PacketOut))
374 case ofp.OFPTFlowMod:
375 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
376 ofc.sendRoleSlaveError(header)
377 return
378 }
379 /*
380 * Not using go routine to handle flow* messages or barrier requests
381 * onos typically issues barrier requests just before a flow* message.
382 * by handling in this thread I ensure all flow* are handled when barrier
383 * request is issued.
384 */
385 switch header.(ofp.IFlowMod).GetCommand() {
386 case ofp.OFPFCAdd:
387 ofc.handleFlowAdd(header.(*ofp.FlowAdd))
388 case ofp.OFPFCModify:
389 ofc.handleFlowMod(header.(*ofp.FlowMod))
390 case ofp.OFPFCModifyStrict:
391 ofc.handleFlowModStrict(header.(*ofp.FlowModifyStrict))
392 case ofp.OFPFCDelete:
393 ofc.handleFlowDelete(header.(*ofp.FlowDelete))
394 case ofp.OFPFCDeleteStrict:
395 ofc.handleFlowDeleteStrict(header.(*ofp.FlowDeleteStrict))
396 }
397 case ofp.OFPTStatsRequest:
398 go func() {
399 if err := ofc.handleStatsRequest(header, header.(ofp.IStatsRequest).GetStatsType()); err != nil {
400 logger.Errorw("ofpt-stats-request", log.Fields{"error": err})
401 }
402 }()
403 case ofp.OFPTBarrierRequest:
404 /* See note above at case ofp.OFPTFlowMod:*/
405 ofc.handleBarrierRequest(header.(*ofp.BarrierRequest))
406 case ofp.OFPTRoleRequest:
407 go ofc.handleRoleRequest(header.(*ofp.RoleRequest))
408 case ofp.OFPTMeterMod:
409 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
410 ofc.sendRoleSlaveError(header)
411 return
412 }
413 ofc.handleMeterModRequest(header.(*ofp.MeterMod))
414 case ofp.OFPTGroupMod:
415 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
416 ofc.sendRoleSlaveError(header)
417 return
418 }
Jonathan Hart60c5d772020-03-30 18:28:40 -0700419 ofc.handleGroupMod(header.(ofp.IGroupMod))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700420 }
421}
422
423// Message interface that represents an open flow message and enables for a
424// unified implementation of SendMessage
425type Message interface {
426 Serialize(encoder *goloxi.Encoder) error
427}
428
429func (ofc *OFConnection) doSend(msg Message) error {
430 if ofc.conn == nil {
431 return errors.New("no-connection")
432 }
433 enc := goloxi.NewEncoder()
434 if err := msg.Serialize(enc); err != nil {
435 return err
436 }
437
438 bytes := enc.Bytes()
439 if _, err := ofc.conn.Write(bytes); err != nil {
440 logger.Errorw("unable-to-send-message-to-controller",
441 log.Fields{
442 "device-id": ofc.DeviceID,
443 "message": msg,
444 "error": err})
445 return err
446 }
447 return nil
448}
449
450func (ofc *OFConnection) messageSender(ctx context.Context) {
451 // first process last fail if it exists
452 if ofc.lastUnsentMessage != nil {
453 if err := ofc.doSend(ofc.lastUnsentMessage); err != nil {
454 ofc.events <- ofcEventDisconnect
455 return
456 }
457 ofc.lastUnsentMessage = nil
458 }
459top:
460 for {
461 select {
462 case <-ctx.Done():
463 break top
464 case msg := <-ofc.sendChannel:
465 if err := ofc.doSend(msg); err != nil {
466 ofc.lastUnsentMessage = msg
467 ofc.events <- ofcEventDisconnect
468 logger.Debugw("message-sender-error",
469 log.Fields{
470 "device-id": ofc.DeviceID,
471 "error": err.Error()})
472 break top
473 }
474 logger.Debugw("message-sender-send",
475 log.Fields{
476 "device-id": ofc.DeviceID})
477 ofc.lastUnsentMessage = nil
478 }
479 }
480
481 logger.Debugw("message-sender-finished",
482 log.Fields{
483 "device-id": ofc.DeviceID})
484}
485
486// SendMessage queues a message to be sent to the openflow controller
487func (ofc *OFConnection) SendMessage(message Message) error {
488 logger.Debug("queuing-message")
489 ofc.sendChannel <- message
490 return nil
491}