blob: 7d57d34cb7880f3ccc11265da72243ea7b2853fc [file] [log] [blame]
Jonathan Hart4b110f62020-03-13 17:36:19 -07001/*
2 Copyright 2020 the original author or authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16package openflow
17
18import (
19 "bufio"
20 "context"
21 "encoding/binary"
22 "encoding/json"
23 "errors"
24 "github.com/donNewtonAlpha/goloxi"
25 ofp "github.com/donNewtonAlpha/goloxi/of13"
26 "github.com/opencord/voltha-lib-go/v3/pkg/log"
27 "github.com/opencord/voltha-protos/v3/go/voltha"
28 "io"
29 "net"
30 "time"
31)
32
33type OFConnection struct {
34 OFControllerEndPoint string
35 DeviceID string
36 VolthaClient voltha.VolthaServiceClient
37 PacketOutChannel chan *voltha.PacketOut
38 ConnectionMaxRetries int
39 ConnectionRetryDelay time.Duration
40
41 conn net.Conn
42
43 // current role of this connection
44 role ofcRole
45 roleManager RoleManager
46
47 events chan ofcEvent
48 sendChannel chan Message
49 lastUnsentMessage Message
50}
51
52func (ofc *OFConnection) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
53 header := ofp.Header{}
54 header.Version = uint8(buf[0])
55 header.Type = uint8(buf[1])
56 header.Length = binary.BigEndian.Uint16(buf[2:4])
57 header.Xid = binary.BigEndian.Uint32(buf[4:8])
58
59 // TODO: add minimal validation of version and type
60
61 return &header, nil
62}
63
64func (ofc *OFConnection) establishConnectionToController() error {
65 if ofc.conn != nil {
66 logger.Debugw("closing-of-connection-to-reconnect",
67 log.Fields{"device-id": ofc.DeviceID})
68 ofc.conn.Close()
69 ofc.conn = nil
70 }
71 try := 1
72 for ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
73 if raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint); err != nil {
74 logger.Debugw("openflow-client unable to resolve endpoint",
75 log.Fields{
76 "device-id": ofc.DeviceID,
77 "endpoint": ofc.OFControllerEndPoint})
78 } else {
79 if connection, err := net.DialTCP("tcp", nil, raddr); err == nil {
80 ofc.conn = connection
81 ofc.sayHello()
82 ofc.events <- ofcEventConnect
83 return nil
84 } else {
85 logger.Warnw("openflow-client-connect-error",
86 log.Fields{
87 "device-id": ofc.DeviceID,
88 "endpoint": ofc.OFControllerEndPoint})
89 }
90 }
91 if ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
92 if ofc.ConnectionMaxRetries != 0 {
93 try += 1
94 }
95 time.Sleep(ofc.ConnectionRetryDelay)
96 }
97 }
98 return errors.New("failed-to-connect-to-of-controller")
99}
100
101// Run implements the state machine for the OF client reacting to state change
102// events and invoking actions as a reaction to those state changes
103func (ofc *OFConnection) Run(ctx context.Context) {
104
105 var ofCtx context.Context
106 var ofDone func()
107 state := ofcStateCreated
108 ofc.events <- ofcEventStart
109top:
110 for {
111 select {
112 case <-ctx.Done():
113 state = ofcStateStopped
114 logger.Debugw("state-transition-context-done",
115 log.Fields{"device-id": ofc.DeviceID})
116 break top
117 case event := <-ofc.events:
118 previous := state
119 switch event {
120 case ofcEventStart:
121 logger.Debugw("ofc-event-start",
122 log.Fields{"device-id": ofc.DeviceID})
123 if state == ofcStateCreated {
124 state = ofcStateStarted
125 logger.Debug("STARTED MORE THAN ONCE")
126 go func() {
127 if err := ofc.establishConnectionToController(); err != nil {
128 logger.Errorw("controller-connection-failed", log.Fields{"error": err})
129 panic(err)
130 }
131 }()
132 } else {
133 logger.Errorw("illegal-state-transition",
134 log.Fields{
135 "device-id": ofc.DeviceID,
136 "current-state": state.String(),
137 "event": event.String()})
138 }
139 case ofcEventConnect:
140 logger.Debugw("ofc-event-connected",
141 log.Fields{"device-id": ofc.DeviceID})
142 if state == ofcStateStarted || state == ofcStateDisconnected {
143 state = ofcStateConnected
144 ofCtx, ofDone = context.WithCancel(context.Background())
145 go ofc.messageSender(ofCtx)
146 go ofc.processOFStream(ofCtx)
147 } else {
148 logger.Errorw("illegal-state-transition",
149 log.Fields{
150 "device-id": ofc.DeviceID,
151 "current-state": state.String(),
152 "event": event.String()})
153 }
154 case ofcEventDisconnect:
155 logger.Debugw("ofc-event-disconnected",
156 log.Fields{
157 "device-id": ofc.DeviceID,
158 "state": state.String()})
159 if state == ofcStateConnected {
160 state = ofcStateDisconnected
161 if ofDone != nil {
162 ofDone()
163 ofDone = nil
164 }
165 go func() {
166 if err := ofc.establishConnectionToController(); err != nil {
167 log.Errorw("controller-connection-failed", log.Fields{"error": err})
168 panic(err)
169 }
170 }()
171 } else {
172 logger.Errorw("illegal-state-transition",
173 log.Fields{
174 "device-id": ofc.DeviceID,
175 "current-state": state.String(),
176 "event": event.String()})
177 }
178 case ofcEventStop:
179 logger.Debugw("ofc-event-stop",
180 log.Fields{"device-id": ofc.DeviceID})
181 if state == ofcStateCreated || state == ofcStateConnected || state == ofcStateDisconnected {
182 state = ofcStateStopped
183 break top
184 } else {
185 logger.Errorw("illegal-state-transition",
186 log.Fields{
187 "device-id": ofc.DeviceID,
188 "current-state": state.String(),
189 "event": event.String()})
190 }
191 }
192 logger.Debugw("state-transition",
193 log.Fields{
194 "device-id": ofc.DeviceID,
195 "previous-state": previous.String(),
196 "current-state": state.String(),
197 "event": event.String()})
198 }
199 }
200
201 // If the child context exists, then cancel it
202 if ofDone != nil {
203 log.Debugw("closing-child-processes",
204 log.Fields{"device-id": ofc.DeviceID})
205 ofDone()
206 }
207
208 // If the connection is open, then close it
209 if ofc.conn != nil {
210 log.Debugw("closing-of-connection",
211 log.Fields{"device-id": ofc.DeviceID})
212 ofc.conn.Close()
213 ofc.conn = nil
214 }
215 log.Debugw("state-machine-finished",
216 log.Fields{"device-id": ofc.DeviceID})
217}
218
219// processOFStream processes the OF connection from the controller and invokes
220// the appropriate handler methods for each message.
221func (ofc *OFConnection) processOFStream(ctx context.Context) {
222 fromController := bufio.NewReader(ofc.conn)
223
224 /*
225 * We have a read buffer of a max size of 4096, so if we ever have
226 * a message larger than this then we will have issues
227 */
228 headerBuf := make([]byte, 8)
229
230top:
231 // Continue until we are told to stop
232 for {
233 select {
234 case <-ctx.Done():
235 logger.Error("of-loop-ending-context-done")
236 break top
237 default:
238 // Read 8 bytes, the standard OF header
239 read, err := io.ReadFull(fromController, headerBuf)
240 if err != nil {
241 logger.Errorw("bad-of-header",
242 log.Fields{
243 "byte-count": read,
244 "device-id": ofc.DeviceID,
245 "error": err})
246 break top
247 }
248
249 // Decode the header
250 peek, err := ofc.peekAtOFHeader(headerBuf)
251 if err != nil {
252 /*
253 * Header is bad, assume stream is corrupted
254 * and needs to be restarted
255 */
256 logger.Errorw("bad-of-packet",
257 log.Fields{
258 "device-id": ofc.DeviceID,
259 "error": err})
260 break top
261 }
262
263 // Calculate the size of the rest of the packet and read it
264 need := int(peek.GetLength())
265 messageBuf := make([]byte, need)
266 copy(messageBuf, headerBuf)
267 read, err = io.ReadFull(fromController, messageBuf[8:])
268 if err != nil {
269 logger.Errorw("bad-of-packet",
270 log.Fields{
271 "byte-count": read,
272 "device-id": ofc.DeviceID,
273 "error": err})
274 break top
275 }
276
277 // Decode and process the packet
278 decoder := goloxi.NewDecoder(messageBuf)
279 msg, err := ofp.DecodeHeader(decoder)
280 if err != nil {
281 // nolint: staticcheck
282 js, _ := json.Marshal(decoder)
283 logger.Errorw("failed-to-decode",
284 log.Fields{
285 "device-id": ofc.DeviceID,
286 "decoder": js,
287 "error": err})
288 break top
289 }
290 if logger.V(log.DebugLevel) {
291 js, _ := json.Marshal(msg)
292 logger.Debugw("packet-header",
293 log.Fields{
294 "device-id": ofc.DeviceID,
295 "header": js})
296 }
297 ofc.parseHeader(msg)
298 }
299 }
300 logger.Debugw("end-of-stream",
301 log.Fields{"device-id": ofc.DeviceID})
302 ofc.events <- ofcEventDisconnect
303}
304
305func (ofc *OFConnection) sayHello() {
306 hello := ofp.NewHello()
307 hello.Xid = uint32(GetXid())
308 elem := ofp.NewHelloElemVersionbitmap()
309 elem.SetType(ofp.OFPHETVersionbitmap)
310 elem.SetLength(8)
311 elem.SetBitmaps([]*ofp.Uint32{{Value: 16}})
312 hello.SetElements([]ofp.IHelloElem{elem})
313 if logger.V(log.DebugLevel) {
314 js, _ := json.Marshal(hello)
315 logger.Debugw("sayHello Called",
316 log.Fields{
317 "device-id": ofc.DeviceID,
318 "hello-message": js})
319 }
320 if err := ofc.SendMessage(hello); err != nil {
321 logger.Fatalw("Failed saying hello to Openflow Server, unable to proceed",
322 log.Fields{
323 "device-id": ofc.DeviceID,
324 "error": err})
325 }
326}
327
328func (ofc *OFConnection) parseHeader(header ofp.IHeader) {
329 headerType := header.GetType()
330 logger.Debugw("packet-header-type",
331 log.Fields{
332 "header-type": ofp.Type(headerType).String()})
333 switch headerType {
334 case ofp.OFPTHello:
335 //x := header.(*ofp.Hello)
336 case ofp.OFPTError:
337 go ofc.handleErrMsg(header.(*ofp.ErrorMsg))
338 case ofp.OFPTEchoRequest:
339 go ofc.handleEchoRequest(header.(*ofp.EchoRequest))
340 case ofp.OFPTEchoReply:
341 case ofp.OFPTExperimenter:
342 case ofp.OFPTFeaturesRequest:
343 go func() {
344 if err := ofc.handleFeatureRequest(header.(*ofp.FeaturesRequest)); err != nil {
345 logger.Errorw("handle-feature-request", log.Fields{"error": err})
346 }
347 }()
348 case ofp.OFPTFeaturesReply:
349 case ofp.OFPTGetConfigRequest:
350 go ofc.handleGetConfigRequest(header.(*ofp.GetConfigRequest))
351 case ofp.OFPTGetConfigReply:
352 case ofp.OFPTSetConfig:
353 go ofc.handleSetConfig(header.(*ofp.SetConfig))
354 case ofp.OFPTPacketIn:
355 case ofp.OFPTFlowRemoved:
356 case ofp.OFPTPortStatus:
357 case ofp.OFPTPacketOut:
358 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
359 ofc.sendRoleSlaveError(header)
360 return
361 }
362 go ofc.handlePacketOut(header.(*ofp.PacketOut))
363 case ofp.OFPTFlowMod:
364 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
365 ofc.sendRoleSlaveError(header)
366 return
367 }
368 /*
369 * Not using go routine to handle flow* messages or barrier requests
370 * onos typically issues barrier requests just before a flow* message.
371 * by handling in this thread I ensure all flow* are handled when barrier
372 * request is issued.
373 */
374 switch header.(ofp.IFlowMod).GetCommand() {
375 case ofp.OFPFCAdd:
376 ofc.handleFlowAdd(header.(*ofp.FlowAdd))
377 case ofp.OFPFCModify:
378 ofc.handleFlowMod(header.(*ofp.FlowMod))
379 case ofp.OFPFCModifyStrict:
380 ofc.handleFlowModStrict(header.(*ofp.FlowModifyStrict))
381 case ofp.OFPFCDelete:
382 ofc.handleFlowDelete(header.(*ofp.FlowDelete))
383 case ofp.OFPFCDeleteStrict:
384 ofc.handleFlowDeleteStrict(header.(*ofp.FlowDeleteStrict))
385 }
386 case ofp.OFPTStatsRequest:
387 go func() {
388 if err := ofc.handleStatsRequest(header, header.(ofp.IStatsRequest).GetStatsType()); err != nil {
389 logger.Errorw("ofpt-stats-request", log.Fields{"error": err})
390 }
391 }()
392 case ofp.OFPTBarrierRequest:
393 /* See note above at case ofp.OFPTFlowMod:*/
394 ofc.handleBarrierRequest(header.(*ofp.BarrierRequest))
395 case ofp.OFPTRoleRequest:
396 go ofc.handleRoleRequest(header.(*ofp.RoleRequest))
397 case ofp.OFPTMeterMod:
398 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
399 ofc.sendRoleSlaveError(header)
400 return
401 }
402 ofc.handleMeterModRequest(header.(*ofp.MeterMod))
403 case ofp.OFPTGroupMod:
404 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
405 ofc.sendRoleSlaveError(header)
406 return
407 }
408 // TODO handle group mods
409 }
410}
411
412// Message interface that represents an open flow message and enables for a
413// unified implementation of SendMessage
414type Message interface {
415 Serialize(encoder *goloxi.Encoder) error
416}
417
418func (ofc *OFConnection) doSend(msg Message) error {
419 if ofc.conn == nil {
420 return errors.New("no-connection")
421 }
422 enc := goloxi.NewEncoder()
423 if err := msg.Serialize(enc); err != nil {
424 return err
425 }
426
427 bytes := enc.Bytes()
428 if _, err := ofc.conn.Write(bytes); err != nil {
429 logger.Errorw("unable-to-send-message-to-controller",
430 log.Fields{
431 "device-id": ofc.DeviceID,
432 "message": msg,
433 "error": err})
434 return err
435 }
436 return nil
437}
438
439func (ofc *OFConnection) messageSender(ctx context.Context) {
440 // first process last fail if it exists
441 if ofc.lastUnsentMessage != nil {
442 if err := ofc.doSend(ofc.lastUnsentMessage); err != nil {
443 ofc.events <- ofcEventDisconnect
444 return
445 }
446 ofc.lastUnsentMessage = nil
447 }
448top:
449 for {
450 select {
451 case <-ctx.Done():
452 break top
453 case msg := <-ofc.sendChannel:
454 if err := ofc.doSend(msg); err != nil {
455 ofc.lastUnsentMessage = msg
456 ofc.events <- ofcEventDisconnect
457 logger.Debugw("message-sender-error",
458 log.Fields{
459 "device-id": ofc.DeviceID,
460 "error": err.Error()})
461 break top
462 }
463 logger.Debugw("message-sender-send",
464 log.Fields{
465 "device-id": ofc.DeviceID})
466 ofc.lastUnsentMessage = nil
467 }
468 }
469
470 logger.Debugw("message-sender-finished",
471 log.Fields{
472 "device-id": ofc.DeviceID})
473}
474
475// SendMessage queues a message to be sent to the openflow controller
476func (ofc *OFConnection) SendMessage(message Message) error {
477 logger.Debug("queuing-message")
478 ofc.sendChannel <- message
479 return nil
480}