blob: 691644c55d008897a0b5238b8d106c010b166c01 [file] [log] [blame]
/*
   Copyright 2020 the original author or authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
16package openflow
17
18import (
19 "bufio"
20 "context"
21 "encoding/binary"
22 "encoding/json"
23 "errors"
Jonathan Hart4b110f62020-03-13 17:36:19 -070024 "io"
25 "net"
26 "time"
David Bainbridgef8ce7d22020-04-08 12:49:41 -070027
28 "github.com/donNewtonAlpha/goloxi"
29 ofp "github.com/donNewtonAlpha/goloxi/of13"
30 "github.com/opencord/ofagent-go/internal/pkg/holder"
31 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 "github.com/opencord/voltha-protos/v3/go/voltha"
Jonathan Hart4b110f62020-03-13 17:36:19 -070033)
34
35type OFConnection struct {
36 OFControllerEndPoint string
37 DeviceID string
David Bainbridgef8ce7d22020-04-08 12:49:41 -070038 VolthaClient *holder.VolthaServiceClientHolder
Jonathan Hart4b110f62020-03-13 17:36:19 -070039 PacketOutChannel chan *voltha.PacketOut
40 ConnectionMaxRetries int
41 ConnectionRetryDelay time.Duration
42
43 conn net.Conn
44
45 // current role of this connection
46 role ofcRole
47 roleManager RoleManager
48
49 events chan ofcEvent
50 sendChannel chan Message
51 lastUnsentMessage Message
52}
53
54func (ofc *OFConnection) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
55 header := ofp.Header{}
56 header.Version = uint8(buf[0])
57 header.Type = uint8(buf[1])
58 header.Length = binary.BigEndian.Uint16(buf[2:4])
59 header.Xid = binary.BigEndian.Uint32(buf[4:8])
60
61 // TODO: add minimal validation of version and type
62
63 return &header, nil
64}
65
66func (ofc *OFConnection) establishConnectionToController() error {
67 if ofc.conn != nil {
68 logger.Debugw("closing-of-connection-to-reconnect",
69 log.Fields{"device-id": ofc.DeviceID})
70 ofc.conn.Close()
71 ofc.conn = nil
72 }
73 try := 1
74 for ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
75 if raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint); err != nil {
76 logger.Debugw("openflow-client unable to resolve endpoint",
77 log.Fields{
78 "device-id": ofc.DeviceID,
79 "endpoint": ofc.OFControllerEndPoint})
80 } else {
81 if connection, err := net.DialTCP("tcp", nil, raddr); err == nil {
82 ofc.conn = connection
83 ofc.sayHello()
84 ofc.events <- ofcEventConnect
85 return nil
86 } else {
87 logger.Warnw("openflow-client-connect-error",
88 log.Fields{
89 "device-id": ofc.DeviceID,
90 "endpoint": ofc.OFControllerEndPoint})
91 }
92 }
93 if ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
94 if ofc.ConnectionMaxRetries != 0 {
95 try += 1
96 }
97 time.Sleep(ofc.ConnectionRetryDelay)
98 }
99 }
100 return errors.New("failed-to-connect-to-of-controller")
101}
102
103// Run implements the state machine for the OF client reacting to state change
104// events and invoking actions as a reaction to those state changes
105func (ofc *OFConnection) Run(ctx context.Context) {
106
107 var ofCtx context.Context
108 var ofDone func()
109 state := ofcStateCreated
110 ofc.events <- ofcEventStart
111top:
112 for {
113 select {
114 case <-ctx.Done():
115 state = ofcStateStopped
116 logger.Debugw("state-transition-context-done",
117 log.Fields{"device-id": ofc.DeviceID})
118 break top
119 case event := <-ofc.events:
120 previous := state
121 switch event {
122 case ofcEventStart:
123 logger.Debugw("ofc-event-start",
124 log.Fields{"device-id": ofc.DeviceID})
125 if state == ofcStateCreated {
126 state = ofcStateStarted
127 logger.Debug("STARTED MORE THAN ONCE")
128 go func() {
129 if err := ofc.establishConnectionToController(); err != nil {
130 logger.Errorw("controller-connection-failed", log.Fields{"error": err})
131 panic(err)
132 }
133 }()
134 } else {
135 logger.Errorw("illegal-state-transition",
136 log.Fields{
137 "device-id": ofc.DeviceID,
138 "current-state": state.String(),
139 "event": event.String()})
140 }
141 case ofcEventConnect:
142 logger.Debugw("ofc-event-connected",
143 log.Fields{"device-id": ofc.DeviceID})
144 if state == ofcStateStarted || state == ofcStateDisconnected {
145 state = ofcStateConnected
146 ofCtx, ofDone = context.WithCancel(context.Background())
147 go ofc.messageSender(ofCtx)
148 go ofc.processOFStream(ofCtx)
149 } else {
150 logger.Errorw("illegal-state-transition",
151 log.Fields{
152 "device-id": ofc.DeviceID,
153 "current-state": state.String(),
154 "event": event.String()})
155 }
156 case ofcEventDisconnect:
157 logger.Debugw("ofc-event-disconnected",
158 log.Fields{
159 "device-id": ofc.DeviceID,
160 "state": state.String()})
161 if state == ofcStateConnected {
162 state = ofcStateDisconnected
163 if ofDone != nil {
164 ofDone()
165 ofDone = nil
166 }
167 go func() {
168 if err := ofc.establishConnectionToController(); err != nil {
169 log.Errorw("controller-connection-failed", log.Fields{"error": err})
170 panic(err)
171 }
172 }()
173 } else {
174 logger.Errorw("illegal-state-transition",
175 log.Fields{
176 "device-id": ofc.DeviceID,
177 "current-state": state.String(),
178 "event": event.String()})
179 }
180 case ofcEventStop:
181 logger.Debugw("ofc-event-stop",
182 log.Fields{"device-id": ofc.DeviceID})
183 if state == ofcStateCreated || state == ofcStateConnected || state == ofcStateDisconnected {
184 state = ofcStateStopped
185 break top
186 } else {
187 logger.Errorw("illegal-state-transition",
188 log.Fields{
189 "device-id": ofc.DeviceID,
190 "current-state": state.String(),
191 "event": event.String()})
192 }
193 }
194 logger.Debugw("state-transition",
195 log.Fields{
196 "device-id": ofc.DeviceID,
197 "previous-state": previous.String(),
198 "current-state": state.String(),
199 "event": event.String()})
200 }
201 }
202
203 // If the child context exists, then cancel it
204 if ofDone != nil {
205 log.Debugw("closing-child-processes",
206 log.Fields{"device-id": ofc.DeviceID})
207 ofDone()
208 }
209
210 // If the connection is open, then close it
211 if ofc.conn != nil {
212 log.Debugw("closing-of-connection",
213 log.Fields{"device-id": ofc.DeviceID})
214 ofc.conn.Close()
215 ofc.conn = nil
216 }
217 log.Debugw("state-machine-finished",
218 log.Fields{"device-id": ofc.DeviceID})
219}
220
221// processOFStream processes the OF connection from the controller and invokes
222// the appropriate handler methods for each message.
223func (ofc *OFConnection) processOFStream(ctx context.Context) {
224 fromController := bufio.NewReader(ofc.conn)
225
226 /*
227 * We have a read buffer of a max size of 4096, so if we ever have
228 * a message larger than this then we will have issues
229 */
230 headerBuf := make([]byte, 8)
231
232top:
233 // Continue until we are told to stop
234 for {
235 select {
236 case <-ctx.Done():
237 logger.Error("of-loop-ending-context-done")
238 break top
239 default:
240 // Read 8 bytes, the standard OF header
241 read, err := io.ReadFull(fromController, headerBuf)
242 if err != nil {
243 logger.Errorw("bad-of-header",
244 log.Fields{
245 "byte-count": read,
246 "device-id": ofc.DeviceID,
247 "error": err})
248 break top
249 }
250
251 // Decode the header
252 peek, err := ofc.peekAtOFHeader(headerBuf)
253 if err != nil {
254 /*
255 * Header is bad, assume stream is corrupted
256 * and needs to be restarted
257 */
258 logger.Errorw("bad-of-packet",
259 log.Fields{
260 "device-id": ofc.DeviceID,
261 "error": err})
262 break top
263 }
264
265 // Calculate the size of the rest of the packet and read it
266 need := int(peek.GetLength())
267 messageBuf := make([]byte, need)
268 copy(messageBuf, headerBuf)
269 read, err = io.ReadFull(fromController, messageBuf[8:])
270 if err != nil {
271 logger.Errorw("bad-of-packet",
272 log.Fields{
273 "byte-count": read,
274 "device-id": ofc.DeviceID,
275 "error": err})
276 break top
277 }
278
279 // Decode and process the packet
280 decoder := goloxi.NewDecoder(messageBuf)
281 msg, err := ofp.DecodeHeader(decoder)
282 if err != nil {
283 // nolint: staticcheck
284 js, _ := json.Marshal(decoder)
285 logger.Errorw("failed-to-decode",
286 log.Fields{
287 "device-id": ofc.DeviceID,
288 "decoder": js,
289 "error": err})
290 break top
291 }
292 if logger.V(log.DebugLevel) {
293 js, _ := json.Marshal(msg)
294 logger.Debugw("packet-header",
295 log.Fields{
296 "device-id": ofc.DeviceID,
297 "header": js})
298 }
299 ofc.parseHeader(msg)
300 }
301 }
302 logger.Debugw("end-of-stream",
303 log.Fields{"device-id": ofc.DeviceID})
304 ofc.events <- ofcEventDisconnect
305}
306
307func (ofc *OFConnection) sayHello() {
308 hello := ofp.NewHello()
309 hello.Xid = uint32(GetXid())
310 elem := ofp.NewHelloElemVersionbitmap()
311 elem.SetType(ofp.OFPHETVersionbitmap)
312 elem.SetLength(8)
313 elem.SetBitmaps([]*ofp.Uint32{{Value: 16}})
314 hello.SetElements([]ofp.IHelloElem{elem})
315 if logger.V(log.DebugLevel) {
316 js, _ := json.Marshal(hello)
317 logger.Debugw("sayHello Called",
318 log.Fields{
319 "device-id": ofc.DeviceID,
320 "hello-message": js})
321 }
322 if err := ofc.SendMessage(hello); err != nil {
323 logger.Fatalw("Failed saying hello to Openflow Server, unable to proceed",
324 log.Fields{
325 "device-id": ofc.DeviceID,
326 "error": err})
327 }
328}
329
330func (ofc *OFConnection) parseHeader(header ofp.IHeader) {
331 headerType := header.GetType()
332 logger.Debugw("packet-header-type",
333 log.Fields{
334 "header-type": ofp.Type(headerType).String()})
335 switch headerType {
336 case ofp.OFPTHello:
337 //x := header.(*ofp.Hello)
338 case ofp.OFPTError:
339 go ofc.handleErrMsg(header.(*ofp.ErrorMsg))
340 case ofp.OFPTEchoRequest:
341 go ofc.handleEchoRequest(header.(*ofp.EchoRequest))
342 case ofp.OFPTEchoReply:
343 case ofp.OFPTExperimenter:
344 case ofp.OFPTFeaturesRequest:
345 go func() {
346 if err := ofc.handleFeatureRequest(header.(*ofp.FeaturesRequest)); err != nil {
347 logger.Errorw("handle-feature-request", log.Fields{"error": err})
348 }
349 }()
350 case ofp.OFPTFeaturesReply:
351 case ofp.OFPTGetConfigRequest:
352 go ofc.handleGetConfigRequest(header.(*ofp.GetConfigRequest))
353 case ofp.OFPTGetConfigReply:
354 case ofp.OFPTSetConfig:
355 go ofc.handleSetConfig(header.(*ofp.SetConfig))
356 case ofp.OFPTPacketIn:
357 case ofp.OFPTFlowRemoved:
358 case ofp.OFPTPortStatus:
359 case ofp.OFPTPacketOut:
360 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
361 ofc.sendRoleSlaveError(header)
362 return
363 }
364 go ofc.handlePacketOut(header.(*ofp.PacketOut))
365 case ofp.OFPTFlowMod:
366 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
367 ofc.sendRoleSlaveError(header)
368 return
369 }
370 /*
371 * Not using go routine to handle flow* messages or barrier requests
372 * onos typically issues barrier requests just before a flow* message.
373 * by handling in this thread I ensure all flow* are handled when barrier
374 * request is issued.
375 */
376 switch header.(ofp.IFlowMod).GetCommand() {
377 case ofp.OFPFCAdd:
378 ofc.handleFlowAdd(header.(*ofp.FlowAdd))
379 case ofp.OFPFCModify:
380 ofc.handleFlowMod(header.(*ofp.FlowMod))
381 case ofp.OFPFCModifyStrict:
382 ofc.handleFlowModStrict(header.(*ofp.FlowModifyStrict))
383 case ofp.OFPFCDelete:
384 ofc.handleFlowDelete(header.(*ofp.FlowDelete))
385 case ofp.OFPFCDeleteStrict:
386 ofc.handleFlowDeleteStrict(header.(*ofp.FlowDeleteStrict))
387 }
388 case ofp.OFPTStatsRequest:
389 go func() {
390 if err := ofc.handleStatsRequest(header, header.(ofp.IStatsRequest).GetStatsType()); err != nil {
391 logger.Errorw("ofpt-stats-request", log.Fields{"error": err})
392 }
393 }()
394 case ofp.OFPTBarrierRequest:
395 /* See note above at case ofp.OFPTFlowMod:*/
396 ofc.handleBarrierRequest(header.(*ofp.BarrierRequest))
397 case ofp.OFPTRoleRequest:
398 go ofc.handleRoleRequest(header.(*ofp.RoleRequest))
399 case ofp.OFPTMeterMod:
400 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
401 ofc.sendRoleSlaveError(header)
402 return
403 }
404 ofc.handleMeterModRequest(header.(*ofp.MeterMod))
405 case ofp.OFPTGroupMod:
406 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
407 ofc.sendRoleSlaveError(header)
408 return
409 }
410 // TODO handle group mods
411 }
412}
413
414// Message interface that represents an open flow message and enables for a
415// unified implementation of SendMessage
416type Message interface {
417 Serialize(encoder *goloxi.Encoder) error
418}
419
420func (ofc *OFConnection) doSend(msg Message) error {
421 if ofc.conn == nil {
422 return errors.New("no-connection")
423 }
424 enc := goloxi.NewEncoder()
425 if err := msg.Serialize(enc); err != nil {
426 return err
427 }
428
429 bytes := enc.Bytes()
430 if _, err := ofc.conn.Write(bytes); err != nil {
431 logger.Errorw("unable-to-send-message-to-controller",
432 log.Fields{
433 "device-id": ofc.DeviceID,
434 "message": msg,
435 "error": err})
436 return err
437 }
438 return nil
439}
440
441func (ofc *OFConnection) messageSender(ctx context.Context) {
442 // first process last fail if it exists
443 if ofc.lastUnsentMessage != nil {
444 if err := ofc.doSend(ofc.lastUnsentMessage); err != nil {
445 ofc.events <- ofcEventDisconnect
446 return
447 }
448 ofc.lastUnsentMessage = nil
449 }
450top:
451 for {
452 select {
453 case <-ctx.Done():
454 break top
455 case msg := <-ofc.sendChannel:
456 if err := ofc.doSend(msg); err != nil {
457 ofc.lastUnsentMessage = msg
458 ofc.events <- ofcEventDisconnect
459 logger.Debugw("message-sender-error",
460 log.Fields{
461 "device-id": ofc.DeviceID,
462 "error": err.Error()})
463 break top
464 }
465 logger.Debugw("message-sender-send",
466 log.Fields{
467 "device-id": ofc.DeviceID})
468 ofc.lastUnsentMessage = nil
469 }
470 }
471
472 logger.Debugw("message-sender-finished",
473 log.Fields{
474 "device-id": ofc.DeviceID})
475}
476
477// SendMessage queues a message to be sent to the openflow controller
478func (ofc *OFConnection) SendMessage(message Message) error {
479 logger.Debug("queuing-message")
480 ofc.sendChannel <- message
481 return nil
482}