blob: 9a781ecbfafb51182d63c37aa029bd128ef897a6 [file] [log] [blame]
Jonathan Hart4b110f62020-03-13 17:36:19 -07001/*
2 Copyright 2020 the original author or authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16package openflow
17
18import (
19 "bufio"
20 "context"
21 "encoding/binary"
22 "encoding/json"
23 "errors"
Jonathan Hart4b110f62020-03-13 17:36:19 -070024 "io"
25 "net"
Matteo Scandolo936e2df2020-10-27 14:31:36 -070026 "sync"
Jonathan Hart4b110f62020-03-13 17:36:19 -070027 "time"
David Bainbridgef8ce7d22020-04-08 12:49:41 -070028
Jonathan Hart828908c2020-04-15 14:23:45 -070029 "github.com/opencord/goloxi"
30 ofp "github.com/opencord/goloxi/of13"
David Bainbridgef8ce7d22020-04-08 12:49:41 -070031 "github.com/opencord/ofagent-go/internal/pkg/holder"
Andrea Campanella18448bc2021-07-08 18:47:22 +020032 "github.com/opencord/voltha-lib-go/v5/pkg/log"
Maninder12b909f2020-10-23 14:23:36 +053033 "github.com/opencord/voltha-protos/v4/go/voltha"
Jonathan Hart4b110f62020-03-13 17:36:19 -070034)
35
36type OFConnection struct {
37 OFControllerEndPoint string
38 DeviceID string
David Bainbridgef8ce7d22020-04-08 12:49:41 -070039 VolthaClient *holder.VolthaServiceClientHolder
Jonathan Hart4b110f62020-03-13 17:36:19 -070040 PacketOutChannel chan *voltha.PacketOut
41 ConnectionMaxRetries int
42 ConnectionRetryDelay time.Duration
43
44 conn net.Conn
45
46 // current role of this connection
47 role ofcRole
48 roleManager RoleManager
49
50 events chan ofcEvent
51 sendChannel chan Message
52 lastUnsentMessage Message
Matteo Scandolo256266d2020-06-01 13:44:07 -070053
54 flowsChunkSize int
55 portsChunkSize int
56 portsDescChunkSize int
Jonathan Hart4b110f62020-03-13 17:36:19 -070057}
58
59func (ofc *OFConnection) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
60 header := ofp.Header{}
61 header.Version = uint8(buf[0])
62 header.Type = uint8(buf[1])
63 header.Length = binary.BigEndian.Uint16(buf[2:4])
64 header.Xid = binary.BigEndian.Uint32(buf[4:8])
65
66 // TODO: add minimal validation of version and type
67
68 return &header, nil
69}
70
Rohan Agrawalc32d9932020-06-15 11:01:47 +000071func (ofc *OFConnection) establishConnectionToController(ctx context.Context) error {
Jonathan Hart4b110f62020-03-13 17:36:19 -070072 if ofc.conn != nil {
Rohan Agrawalc32d9932020-06-15 11:01:47 +000073 logger.Debugw(ctx, "closing-of-connection-to-reconnect",
Jonathan Hart4b110f62020-03-13 17:36:19 -070074 log.Fields{"device-id": ofc.DeviceID})
Andrey Pozolotin536ee582021-05-28 16:31:44 +030075 err := ofc.conn.Close()
76 if err != nil {
77 logger.Errorw(ctx, "failed-connection-close-proceeding-setting-to-nil", log.Fields{"error": err})
78 }
Jonathan Hart4b110f62020-03-13 17:36:19 -070079 ofc.conn = nil
80 }
81 try := 1
82 for ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
83 if raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint); err != nil {
Rohan Agrawalc32d9932020-06-15 11:01:47 +000084 logger.Debugw(ctx, "openflow-client unable to resolve endpoint",
Jonathan Hart4b110f62020-03-13 17:36:19 -070085 log.Fields{
86 "device-id": ofc.DeviceID,
87 "endpoint": ofc.OFControllerEndPoint})
88 } else {
89 if connection, err := net.DialTCP("tcp", nil, raddr); err == nil {
90 ofc.conn = connection
Rohan Agrawalc32d9932020-06-15 11:01:47 +000091 ofc.sayHello(ctx)
Jonathan Hart4b110f62020-03-13 17:36:19 -070092 ofc.events <- ofcEventConnect
93 return nil
94 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +000095 logger.Warnw(ctx, "openflow-client-connect-error",
Jonathan Hart4b110f62020-03-13 17:36:19 -070096 log.Fields{
97 "device-id": ofc.DeviceID,
98 "endpoint": ofc.OFControllerEndPoint})
99 }
100 }
101 if ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
102 if ofc.ConnectionMaxRetries != 0 {
103 try += 1
104 }
105 time.Sleep(ofc.ConnectionRetryDelay)
106 }
107 }
108 return errors.New("failed-to-connect-to-of-controller")
109}
110
111// Run implements the state machine for the OF client reacting to state change
112// events and invoking actions as a reaction to those state changes
113func (ofc *OFConnection) Run(ctx context.Context) {
114
115 var ofCtx context.Context
116 var ofDone func()
117 state := ofcStateCreated
118 ofc.events <- ofcEventStart
119top:
120 for {
121 select {
122 case <-ctx.Done():
123 state = ofcStateStopped
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000124 logger.Debugw(ctx, "state-transition-context-done",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700125 log.Fields{"device-id": ofc.DeviceID})
126 break top
127 case event := <-ofc.events:
128 previous := state
129 switch event {
130 case ofcEventStart:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000131 logger.Debugw(ctx, "ofc-event-start",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700132 log.Fields{"device-id": ofc.DeviceID})
133 if state == ofcStateCreated {
134 state = ofcStateStarted
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000135 logger.Debug(ctx, "STARTED MORE THAN ONCE")
Jonathan Hart4b110f62020-03-13 17:36:19 -0700136 go func() {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000137 if err := ofc.establishConnectionToController(ctx); err != nil {
138 logger.Errorw(ctx, "controller-connection-failed", log.Fields{"error": err})
Jonathan Hart4b110f62020-03-13 17:36:19 -0700139 panic(err)
140 }
141 }()
142 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000143 logger.Errorw(ctx, "illegal-state-transition",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700144 log.Fields{
145 "device-id": ofc.DeviceID,
146 "current-state": state.String(),
147 "event": event.String()})
148 }
149 case ofcEventConnect:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000150 logger.Debugw(ctx, "ofc-event-connected",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700151 log.Fields{"device-id": ofc.DeviceID})
152 if state == ofcStateStarted || state == ofcStateDisconnected {
153 state = ofcStateConnected
Girish Kumar01e0c632020-08-10 16:48:56 +0000154 ofCtx, ofDone = context.WithCancel(log.WithSpanFromContext(context.Background(), ctx))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700155 go ofc.messageSender(ofCtx)
156 go ofc.processOFStream(ofCtx)
157 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000158 logger.Errorw(ctx, "illegal-state-transition",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700159 log.Fields{
160 "device-id": ofc.DeviceID,
161 "current-state": state.String(),
162 "event": event.String()})
163 }
164 case ofcEventDisconnect:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000165 logger.Debugw(ctx, "ofc-event-disconnected",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700166 log.Fields{
167 "device-id": ofc.DeviceID,
168 "state": state.String()})
169 if state == ofcStateConnected {
170 state = ofcStateDisconnected
171 if ofDone != nil {
172 ofDone()
173 ofDone = nil
174 }
175 go func() {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000176 if err := ofc.establishConnectionToController(ctx); err != nil {
Girish Kumarbe2ea8a2020-08-19 17:52:55 +0000177 logger.Errorw(ctx, "controller-connection-failed", log.Fields{"error": err})
Jonathan Hart4b110f62020-03-13 17:36:19 -0700178 panic(err)
179 }
180 }()
181 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000182 logger.Errorw(ctx, "illegal-state-transition",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700183 log.Fields{
184 "device-id": ofc.DeviceID,
185 "current-state": state.String(),
186 "event": event.String()})
187 }
188 case ofcEventStop:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000189 logger.Debugw(ctx, "ofc-event-stop",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700190 log.Fields{"device-id": ofc.DeviceID})
191 if state == ofcStateCreated || state == ofcStateConnected || state == ofcStateDisconnected {
192 state = ofcStateStopped
193 break top
194 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000195 logger.Errorw(ctx, "illegal-state-transition",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700196 log.Fields{
197 "device-id": ofc.DeviceID,
198 "current-state": state.String(),
199 "event": event.String()})
200 }
201 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000202 logger.Debugw(ctx, "state-transition",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700203 log.Fields{
204 "device-id": ofc.DeviceID,
205 "previous-state": previous.String(),
206 "current-state": state.String(),
207 "event": event.String()})
208 }
209 }
210
211 // If the child context exists, then cancel it
212 if ofDone != nil {
Girish Kumarbe2ea8a2020-08-19 17:52:55 +0000213 logger.Debugw(ctx, "closing-child-processes",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700214 log.Fields{"device-id": ofc.DeviceID})
215 ofDone()
216 }
217
218 // If the connection is open, then close it
219 if ofc.conn != nil {
Girish Kumarbe2ea8a2020-08-19 17:52:55 +0000220 logger.Debugw(ctx, "closing-of-connection",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700221 log.Fields{"device-id": ofc.DeviceID})
Andrey Pozolotin536ee582021-05-28 16:31:44 +0300222 err := ofc.conn.Close()
223 if err != nil {
224 logger.Errorw(ctx, "closing-of-connection", log.Fields{"error": err})
225 }
Jonathan Hart4b110f62020-03-13 17:36:19 -0700226 ofc.conn = nil
227 }
Girish Kumarbe2ea8a2020-08-19 17:52:55 +0000228 logger.Debugw(ctx, "state-machine-finished",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700229 log.Fields{"device-id": ofc.DeviceID})
230}
231
232// processOFStream processes the OF connection from the controller and invokes
233// the appropriate handler methods for each message.
234func (ofc *OFConnection) processOFStream(ctx context.Context) {
235 fromController := bufio.NewReader(ofc.conn)
236
237 /*
238 * We have a read buffer of a max size of 4096, so if we ever have
239 * a message larger than this then we will have issues
240 */
241 headerBuf := make([]byte, 8)
Matteo Scandolo936e2df2020-10-27 14:31:36 -0700242 wg := sync.WaitGroup{}
Jonathan Hart4b110f62020-03-13 17:36:19 -0700243top:
244 // Continue until we are told to stop
245 for {
246 select {
247 case <-ctx.Done():
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000248 logger.Error(ctx, "of-loop-ending-context-done")
Jonathan Hart4b110f62020-03-13 17:36:19 -0700249 break top
250 default:
251 // Read 8 bytes, the standard OF header
252 read, err := io.ReadFull(fromController, headerBuf)
253 if err != nil {
Jonathan Hart828908c2020-04-15 14:23:45 -0700254 if err == io.EOF {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000255 logger.Infow(ctx, "controller-disconnected",
Jonathan Hart828908c2020-04-15 14:23:45 -0700256 log.Fields{
257 "device-id": ofc.DeviceID,
258 "controller": ofc.OFControllerEndPoint,
259 })
260 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000261 logger.Errorw(ctx, "bad-of-header",
Jonathan Hart828908c2020-04-15 14:23:45 -0700262 log.Fields{
263 "byte-count": read,
264 "device-id": ofc.DeviceID,
265 "controller": ofc.OFControllerEndPoint,
266 "error": err})
267 }
Jonathan Hart4b110f62020-03-13 17:36:19 -0700268 break top
269 }
270
271 // Decode the header
272 peek, err := ofc.peekAtOFHeader(headerBuf)
273 if err != nil {
274 /*
275 * Header is bad, assume stream is corrupted
276 * and needs to be restarted
277 */
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000278 logger.Errorw(ctx, "bad-of-packet",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700279 log.Fields{
280 "device-id": ofc.DeviceID,
281 "error": err})
282 break top
283 }
284
285 // Calculate the size of the rest of the packet and read it
286 need := int(peek.GetLength())
287 messageBuf := make([]byte, need)
288 copy(messageBuf, headerBuf)
289 read, err = io.ReadFull(fromController, messageBuf[8:])
290 if err != nil {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000291 logger.Errorw(ctx, "bad-of-packet",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700292 log.Fields{
293 "byte-count": read,
294 "device-id": ofc.DeviceID,
295 "error": err})
296 break top
297 }
298
299 // Decode and process the packet
300 decoder := goloxi.NewDecoder(messageBuf)
301 msg, err := ofp.DecodeHeader(decoder)
302 if err != nil {
303 // nolint: staticcheck
304 js, _ := json.Marshal(decoder)
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000305 logger.Errorw(ctx, "failed-to-decode",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700306 log.Fields{
307 "device-id": ofc.DeviceID,
308 "decoder": js,
309 "error": err})
310 break top
311 }
312 if logger.V(log.DebugLevel) {
313 js, _ := json.Marshal(msg)
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000314 logger.Debugw(ctx, "packet-header",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700315 log.Fields{
316 "device-id": ofc.DeviceID,
317 "header": js})
318 }
Matteo Scandolo936e2df2020-10-27 14:31:36 -0700319
320 // We can parallelize the processing of all the operations
321 // that we get before a BarrieRequest, then we need to wait.
322 // What we are doing is:
323 // - spawn threads until we get a Barrier
324 // - when we get a barrier wait for the threads to complete before continuing
325
326 msgType := msg.GetType()
327 if msgType == ofp.OFPTBarrierRequest {
328 logger.Debug(ctx, "received-barrier-request-waiting-for-pending-requests")
329 wg.Wait()
330 logger.Debug(ctx, "restarting-requests-processing")
331 }
332
333 wg.Add(1)
334 go ofc.parseHeader(ctx, msg, &wg)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700335 }
336 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000337 logger.Debugw(ctx, "end-of-stream",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700338 log.Fields{"device-id": ofc.DeviceID})
339 ofc.events <- ofcEventDisconnect
340}
341
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000342func (ofc *OFConnection) sayHello(ctx context.Context) {
Jonathan Hart4b110f62020-03-13 17:36:19 -0700343 hello := ofp.NewHello()
344 hello.Xid = uint32(GetXid())
345 elem := ofp.NewHelloElemVersionbitmap()
346 elem.SetType(ofp.OFPHETVersionbitmap)
347 elem.SetLength(8)
348 elem.SetBitmaps([]*ofp.Uint32{{Value: 16}})
349 hello.SetElements([]ofp.IHelloElem{elem})
350 if logger.V(log.DebugLevel) {
351 js, _ := json.Marshal(hello)
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000352 logger.Debugw(ctx, "sayHello Called",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700353 log.Fields{
354 "device-id": ofc.DeviceID,
355 "hello-message": js})
356 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000357 if err := ofc.SendMessage(ctx, hello); err != nil {
358 logger.Fatalw(ctx, "Failed saying hello to Openflow Server, unable to proceed",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700359 log.Fields{
360 "device-id": ofc.DeviceID,
361 "error": err})
362 }
363}
364
Matteo Scandolo936e2df2020-10-27 14:31:36 -0700365func (ofc *OFConnection) parseHeader(ctx context.Context, header ofp.IHeader, wg *sync.WaitGroup) {
366 defer wg.Done()
Jonathan Hart4b110f62020-03-13 17:36:19 -0700367 headerType := header.GetType()
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000368 logger.Debugw(ctx, "packet-header-type",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700369 log.Fields{
370 "header-type": ofp.Type(headerType).String()})
371 switch headerType {
372 case ofp.OFPTHello:
373 //x := header.(*ofp.Hello)
374 case ofp.OFPTError:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000375 go ofc.handleErrMsg(ctx, header.(*ofp.ErrorMsg))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700376 case ofp.OFPTEchoRequest:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000377 go ofc.handleEchoRequest(ctx, header.(*ofp.EchoRequest))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700378 case ofp.OFPTEchoReply:
379 case ofp.OFPTExperimenter:
380 case ofp.OFPTFeaturesRequest:
381 go func() {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000382 if err := ofc.handleFeatureRequest(ctx, header.(*ofp.FeaturesRequest)); err != nil {
383 logger.Errorw(ctx, "handle-feature-request", log.Fields{"error": err})
Jonathan Hart4b110f62020-03-13 17:36:19 -0700384 }
385 }()
386 case ofp.OFPTFeaturesReply:
387 case ofp.OFPTGetConfigRequest:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000388 go ofc.handleGetConfigRequest(ctx, header.(*ofp.GetConfigRequest))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700389 case ofp.OFPTGetConfigReply:
390 case ofp.OFPTSetConfig:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000391 go ofc.handleSetConfig(ctx, header.(*ofp.SetConfig))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700392 case ofp.OFPTPacketIn:
393 case ofp.OFPTFlowRemoved:
394 case ofp.OFPTPortStatus:
395 case ofp.OFPTPacketOut:
396 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000397 ofc.sendRoleSlaveError(ctx, header)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700398 return
399 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000400 go ofc.handlePacketOut(ctx, header.(*ofp.PacketOut))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700401 case ofp.OFPTFlowMod:
402 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000403 ofc.sendRoleSlaveError(ctx, header)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700404 return
405 }
Jonathan Hart4b110f62020-03-13 17:36:19 -0700406 switch header.(ofp.IFlowMod).GetCommand() {
407 case ofp.OFPFCAdd:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000408 ofc.handleFlowAdd(ctx, header.(*ofp.FlowAdd))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700409 case ofp.OFPFCModify:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000410 ofc.handleFlowMod(ctx, header.(*ofp.FlowMod))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700411 case ofp.OFPFCModifyStrict:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000412 ofc.handleFlowModStrict(ctx, header.(*ofp.FlowModifyStrict))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700413 case ofp.OFPFCDelete:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000414 ofc.handleFlowDelete(ctx, header.(*ofp.FlowDelete))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700415 case ofp.OFPFCDeleteStrict:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000416 ofc.handleFlowDeleteStrict(ctx, header.(*ofp.FlowDeleteStrict))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700417 }
418 case ofp.OFPTStatsRequest:
419 go func() {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000420 if err := ofc.handleStatsRequest(ctx, header, header.(ofp.IStatsRequest).GetStatsType()); err != nil {
421 logger.Errorw(ctx, "ofpt-stats-request", log.Fields{"error": err})
Jonathan Hart4b110f62020-03-13 17:36:19 -0700422 }
423 }()
424 case ofp.OFPTBarrierRequest:
425 /* See note above at case ofp.OFPTFlowMod:*/
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000426 ofc.handleBarrierRequest(ctx, header.(*ofp.BarrierRequest))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700427 case ofp.OFPTRoleRequest:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000428 go ofc.handleRoleRequest(ctx, header.(*ofp.RoleRequest))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700429 case ofp.OFPTMeterMod:
430 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000431 ofc.sendRoleSlaveError(ctx, header)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700432 return
433 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000434 ofc.handleMeterModRequest(ctx, header.(*ofp.MeterMod))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700435 case ofp.OFPTGroupMod:
436 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000437 ofc.sendRoleSlaveError(ctx, header)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700438 return
439 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000440 ofc.handleGroupMod(ctx, header.(ofp.IGroupMod))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700441 }
442}
443
444// Message interface that represents an open flow message and enables for a
445// unified implementation of SendMessage
446type Message interface {
447 Serialize(encoder *goloxi.Encoder) error
448}
449
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000450func (ofc *OFConnection) doSend(ctx context.Context, msg Message) error {
Jonathan Hart4b110f62020-03-13 17:36:19 -0700451 if ofc.conn == nil {
452 return errors.New("no-connection")
453 }
454 enc := goloxi.NewEncoder()
455 if err := msg.Serialize(enc); err != nil {
456 return err
457 }
458
459 bytes := enc.Bytes()
460 if _, err := ofc.conn.Write(bytes); err != nil {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000461 logger.Errorw(ctx, "unable-to-send-message-to-controller",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700462 log.Fields{
463 "device-id": ofc.DeviceID,
464 "message": msg,
465 "error": err})
466 return err
467 }
468 return nil
469}
470
471func (ofc *OFConnection) messageSender(ctx context.Context) {
472 // first process last fail if it exists
473 if ofc.lastUnsentMessage != nil {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000474 if err := ofc.doSend(ctx, ofc.lastUnsentMessage); err != nil {
Jonathan Hart4b110f62020-03-13 17:36:19 -0700475 ofc.events <- ofcEventDisconnect
476 return
477 }
478 ofc.lastUnsentMessage = nil
479 }
480top:
481 for {
482 select {
483 case <-ctx.Done():
484 break top
485 case msg := <-ofc.sendChannel:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000486 if err := ofc.doSend(ctx, msg); err != nil {
Jonathan Hart4b110f62020-03-13 17:36:19 -0700487 ofc.lastUnsentMessage = msg
488 ofc.events <- ofcEventDisconnect
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000489 logger.Debugw(ctx, "message-sender-error",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700490 log.Fields{
491 "device-id": ofc.DeviceID,
492 "error": err.Error()})
493 break top
494 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000495 logger.Debugw(ctx, "message-sender-send",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700496 log.Fields{
497 "device-id": ofc.DeviceID})
498 ofc.lastUnsentMessage = nil
499 }
500 }
501
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000502 logger.Debugw(ctx, "message-sender-finished",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700503 log.Fields{
504 "device-id": ofc.DeviceID})
505}
506
507// SendMessage queues a message to be sent to the openflow controller
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000508func (ofc *OFConnection) SendMessage(ctx context.Context, message Message) error {
Matteo Scandolo65e96762020-09-18 14:24:57 -0700509 logger.Debugw(ctx, "queuing-message", log.Fields{
510 "endpoint": ofc.OFControllerEndPoint,
511 "role": ofc.role,
512 })
Jonathan Hart4b110f62020-03-13 17:36:19 -0700513 ofc.sendChannel <- message
514 return nil
515}