/*
   Copyright 2020 the original author or authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
16package openflow
17
18import (
19 "bufio"
20 "context"
21 "encoding/binary"
22 "encoding/json"
23 "errors"
Jonathan Hart4b110f62020-03-13 17:36:19 -070024 "io"
25 "net"
Matteo Scandolo936e2df2020-10-27 14:31:36 -070026 "sync"
Jonathan Hart4b110f62020-03-13 17:36:19 -070027 "time"
David Bainbridgef8ce7d22020-04-08 12:49:41 -070028
Jonathan Hart828908c2020-04-15 14:23:45 -070029 "github.com/opencord/goloxi"
30 ofp "github.com/opencord/goloxi/of13"
David Bainbridgef8ce7d22020-04-08 12:49:41 -070031 "github.com/opencord/ofagent-go/internal/pkg/holder"
David K. Bainbridgee05cf0c2021-08-19 03:16:50 +000032 "github.com/opencord/voltha-lib-go/v7/pkg/log"
khenaidoofcf0b8d2021-10-19 17:57:30 -040033 "github.com/opencord/voltha-protos/v5/go/openflow_13"
Jonathan Hart4b110f62020-03-13 17:36:19 -070034)
35
36type OFConnection struct {
37 OFControllerEndPoint string
38 DeviceID string
David Bainbridgef8ce7d22020-04-08 12:49:41 -070039 VolthaClient *holder.VolthaServiceClientHolder
khenaidoofcf0b8d2021-10-19 17:57:30 -040040 PacketOutChannel chan *openflow_13.PacketOut
Jonathan Hart4b110f62020-03-13 17:36:19 -070041 ConnectionMaxRetries int
42 ConnectionRetryDelay time.Duration
43
44 conn net.Conn
45
46 // current role of this connection
47 role ofcRole
48 roleManager RoleManager
49
50 events chan ofcEvent
51 sendChannel chan Message
52 lastUnsentMessage Message
Matteo Scandolo256266d2020-06-01 13:44:07 -070053
54 flowsChunkSize int
55 portsChunkSize int
56 portsDescChunkSize int
Jonathan Hart4b110f62020-03-13 17:36:19 -070057}
58
59func (ofc *OFConnection) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
60 header := ofp.Header{}
61 header.Version = uint8(buf[0])
62 header.Type = uint8(buf[1])
63 header.Length = binary.BigEndian.Uint16(buf[2:4])
64 header.Xid = binary.BigEndian.Uint32(buf[4:8])
65
66 // TODO: add minimal validation of version and type
67
68 return &header, nil
69}
70
Rohan Agrawalc32d9932020-06-15 11:01:47 +000071func (ofc *OFConnection) establishConnectionToController(ctx context.Context) error {
Jonathan Hart4b110f62020-03-13 17:36:19 -070072 if ofc.conn != nil {
Rohan Agrawalc32d9932020-06-15 11:01:47 +000073 logger.Debugw(ctx, "closing-of-connection-to-reconnect",
Jonathan Hart4b110f62020-03-13 17:36:19 -070074 log.Fields{"device-id": ofc.DeviceID})
Andrey Pozolotin536ee582021-05-28 16:31:44 +030075 err := ofc.conn.Close()
76 if err != nil {
77 logger.Errorw(ctx, "failed-connection-close-proceeding-setting-to-nil", log.Fields{"error": err})
78 }
Jonathan Hart4b110f62020-03-13 17:36:19 -070079 ofc.conn = nil
80 }
81 try := 1
82 for ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
83 if raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint); err != nil {
Rohan Agrawalc32d9932020-06-15 11:01:47 +000084 logger.Debugw(ctx, "openflow-client unable to resolve endpoint",
Jonathan Hart4b110f62020-03-13 17:36:19 -070085 log.Fields{
86 "device-id": ofc.DeviceID,
87 "endpoint": ofc.OFControllerEndPoint})
88 } else {
89 if connection, err := net.DialTCP("tcp", nil, raddr); err == nil {
90 ofc.conn = connection
Rohan Agrawalc32d9932020-06-15 11:01:47 +000091 ofc.sayHello(ctx)
Jonathan Hart4b110f62020-03-13 17:36:19 -070092 ofc.events <- ofcEventConnect
93 return nil
94 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +000095 logger.Warnw(ctx, "openflow-client-connect-error",
Jonathan Hart4b110f62020-03-13 17:36:19 -070096 log.Fields{
97 "device-id": ofc.DeviceID,
98 "endpoint": ofc.OFControllerEndPoint})
99 }
100 }
101 if ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
102 if ofc.ConnectionMaxRetries != 0 {
103 try += 1
104 }
105 time.Sleep(ofc.ConnectionRetryDelay)
106 }
107 }
108 return errors.New("failed-to-connect-to-of-controller")
109}
110
111// Run implements the state machine for the OF client reacting to state change
112// events and invoking actions as a reaction to those state changes
113func (ofc *OFConnection) Run(ctx context.Context) {
114
115 var ofCtx context.Context
116 var ofDone func()
117 state := ofcStateCreated
118 ofc.events <- ofcEventStart
119top:
120 for {
121 select {
122 case <-ctx.Done():
123 state = ofcStateStopped
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000124 logger.Debugw(ctx, "state-transition-context-done",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700125 log.Fields{"device-id": ofc.DeviceID})
126 break top
127 case event := <-ofc.events:
128 previous := state
129 switch event {
130 case ofcEventStart:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000131 logger.Debugw(ctx, "ofc-event-start",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700132 log.Fields{"device-id": ofc.DeviceID})
133 if state == ofcStateCreated {
134 state = ofcStateStarted
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000135 logger.Debug(ctx, "STARTED MORE THAN ONCE")
Jonathan Hart4b110f62020-03-13 17:36:19 -0700136 go func() {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000137 if err := ofc.establishConnectionToController(ctx); err != nil {
138 logger.Errorw(ctx, "controller-connection-failed", log.Fields{"error": err})
Jonathan Hart4b110f62020-03-13 17:36:19 -0700139 panic(err)
140 }
141 }()
142 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000143 logger.Errorw(ctx, "illegal-state-transition",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700144 log.Fields{
145 "device-id": ofc.DeviceID,
146 "current-state": state.String(),
147 "event": event.String()})
148 }
149 case ofcEventConnect:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000150 logger.Debugw(ctx, "ofc-event-connected",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700151 log.Fields{"device-id": ofc.DeviceID})
152 if state == ofcStateStarted || state == ofcStateDisconnected {
153 state = ofcStateConnected
Girish Kumar01e0c632020-08-10 16:48:56 +0000154 ofCtx, ofDone = context.WithCancel(log.WithSpanFromContext(context.Background(), ctx))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700155 go ofc.messageSender(ofCtx)
156 go ofc.processOFStream(ofCtx)
157 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000158 logger.Errorw(ctx, "illegal-state-transition",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700159 log.Fields{
160 "device-id": ofc.DeviceID,
161 "current-state": state.String(),
162 "event": event.String()})
163 }
164 case ofcEventDisconnect:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000165 logger.Debugw(ctx, "ofc-event-disconnected",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700166 log.Fields{
167 "device-id": ofc.DeviceID,
168 "state": state.String()})
169 if state == ofcStateConnected {
170 state = ofcStateDisconnected
171 if ofDone != nil {
172 ofDone()
173 ofDone = nil
174 }
175 go func() {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000176 if err := ofc.establishConnectionToController(ctx); err != nil {
Girish Kumarbe2ea8a2020-08-19 17:52:55 +0000177 logger.Errorw(ctx, "controller-connection-failed", log.Fields{"error": err})
Jonathan Hart4b110f62020-03-13 17:36:19 -0700178 panic(err)
179 }
180 }()
181 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000182 logger.Errorw(ctx, "illegal-state-transition",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700183 log.Fields{
184 "device-id": ofc.DeviceID,
185 "current-state": state.String(),
186 "event": event.String()})
187 }
188 case ofcEventStop:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000189 logger.Debugw(ctx, "ofc-event-stop",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700190 log.Fields{"device-id": ofc.DeviceID})
Andrea Campanellaeb7c5412021-10-29 15:50:24 +0200191 // check for empty send channel to make sure we are not loosing any message to the controller
192 for len(ofc.sendChannel) != 0 {
193 logger.Debugw(ctx, "wait-for-empty-send-channel-to-close-connection",
194 log.Fields{
195 "device-id": ofc.DeviceID,
196 "len of channel": len(ofc.sendChannel),
197 "event": event.String()})
198 }
Jonathan Hart4b110f62020-03-13 17:36:19 -0700199 if state == ofcStateCreated || state == ofcStateConnected || state == ofcStateDisconnected {
200 state = ofcStateStopped
201 break top
202 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000203 logger.Errorw(ctx, "illegal-state-transition",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700204 log.Fields{
205 "device-id": ofc.DeviceID,
206 "current-state": state.String(),
207 "event": event.String()})
208 }
209 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000210 logger.Debugw(ctx, "state-transition",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700211 log.Fields{
212 "device-id": ofc.DeviceID,
213 "previous-state": previous.String(),
214 "current-state": state.String(),
215 "event": event.String()})
216 }
217 }
218
219 // If the child context exists, then cancel it
220 if ofDone != nil {
Girish Kumarbe2ea8a2020-08-19 17:52:55 +0000221 logger.Debugw(ctx, "closing-child-processes",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700222 log.Fields{"device-id": ofc.DeviceID})
223 ofDone()
224 }
225
226 // If the connection is open, then close it
227 if ofc.conn != nil {
Girish Kumarbe2ea8a2020-08-19 17:52:55 +0000228 logger.Debugw(ctx, "closing-of-connection",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700229 log.Fields{"device-id": ofc.DeviceID})
Andrey Pozolotin536ee582021-05-28 16:31:44 +0300230 err := ofc.conn.Close()
231 if err != nil {
232 logger.Errorw(ctx, "closing-of-connection", log.Fields{"error": err})
233 }
Jonathan Hart4b110f62020-03-13 17:36:19 -0700234 ofc.conn = nil
235 }
Girish Kumarbe2ea8a2020-08-19 17:52:55 +0000236 logger.Debugw(ctx, "state-machine-finished",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700237 log.Fields{"device-id": ofc.DeviceID})
238}
239
240// processOFStream processes the OF connection from the controller and invokes
241// the appropriate handler methods for each message.
242func (ofc *OFConnection) processOFStream(ctx context.Context) {
243 fromController := bufio.NewReader(ofc.conn)
244
245 /*
246 * We have a read buffer of a max size of 4096, so if we ever have
247 * a message larger than this then we will have issues
248 */
249 headerBuf := make([]byte, 8)
Matteo Scandolo936e2df2020-10-27 14:31:36 -0700250 wg := sync.WaitGroup{}
Jonathan Hart4b110f62020-03-13 17:36:19 -0700251top:
252 // Continue until we are told to stop
253 for {
254 select {
255 case <-ctx.Done():
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000256 logger.Error(ctx, "of-loop-ending-context-done")
Jonathan Hart4b110f62020-03-13 17:36:19 -0700257 break top
258 default:
259 // Read 8 bytes, the standard OF header
260 read, err := io.ReadFull(fromController, headerBuf)
261 if err != nil {
Jonathan Hart828908c2020-04-15 14:23:45 -0700262 if err == io.EOF {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000263 logger.Infow(ctx, "controller-disconnected",
Jonathan Hart828908c2020-04-15 14:23:45 -0700264 log.Fields{
265 "device-id": ofc.DeviceID,
266 "controller": ofc.OFControllerEndPoint,
267 })
268 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000269 logger.Errorw(ctx, "bad-of-header",
Jonathan Hart828908c2020-04-15 14:23:45 -0700270 log.Fields{
271 "byte-count": read,
272 "device-id": ofc.DeviceID,
273 "controller": ofc.OFControllerEndPoint,
274 "error": err})
275 }
Jonathan Hart4b110f62020-03-13 17:36:19 -0700276 break top
277 }
278
279 // Decode the header
280 peek, err := ofc.peekAtOFHeader(headerBuf)
281 if err != nil {
282 /*
283 * Header is bad, assume stream is corrupted
284 * and needs to be restarted
285 */
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000286 logger.Errorw(ctx, "bad-of-packet",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700287 log.Fields{
288 "device-id": ofc.DeviceID,
289 "error": err})
290 break top
291 }
292
293 // Calculate the size of the rest of the packet and read it
294 need := int(peek.GetLength())
295 messageBuf := make([]byte, need)
296 copy(messageBuf, headerBuf)
297 read, err = io.ReadFull(fromController, messageBuf[8:])
298 if err != nil {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000299 logger.Errorw(ctx, "bad-of-packet",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700300 log.Fields{
301 "byte-count": read,
302 "device-id": ofc.DeviceID,
303 "error": err})
304 break top
305 }
306
307 // Decode and process the packet
308 decoder := goloxi.NewDecoder(messageBuf)
309 msg, err := ofp.DecodeHeader(decoder)
310 if err != nil {
311 // nolint: staticcheck
312 js, _ := json.Marshal(decoder)
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000313 logger.Errorw(ctx, "failed-to-decode",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700314 log.Fields{
315 "device-id": ofc.DeviceID,
316 "decoder": js,
317 "error": err})
318 break top
319 }
320 if logger.V(log.DebugLevel) {
321 js, _ := json.Marshal(msg)
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000322 logger.Debugw(ctx, "packet-header",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700323 log.Fields{
324 "device-id": ofc.DeviceID,
325 "header": js})
326 }
Matteo Scandolo936e2df2020-10-27 14:31:36 -0700327
328 // We can parallelize the processing of all the operations
329 // that we get before a BarrieRequest, then we need to wait.
330 // What we are doing is:
331 // - spawn threads until we get a Barrier
332 // - when we get a barrier wait for the threads to complete before continuing
333
334 msgType := msg.GetType()
335 if msgType == ofp.OFPTBarrierRequest {
336 logger.Debug(ctx, "received-barrier-request-waiting-for-pending-requests")
337 wg.Wait()
338 logger.Debug(ctx, "restarting-requests-processing")
339 }
340
341 wg.Add(1)
342 go ofc.parseHeader(ctx, msg, &wg)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700343 }
344 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000345 logger.Debugw(ctx, "end-of-stream",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700346 log.Fields{"device-id": ofc.DeviceID})
347 ofc.events <- ofcEventDisconnect
348}
349
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000350func (ofc *OFConnection) sayHello(ctx context.Context) {
Jonathan Hart4b110f62020-03-13 17:36:19 -0700351 hello := ofp.NewHello()
352 hello.Xid = uint32(GetXid())
353 elem := ofp.NewHelloElemVersionbitmap()
354 elem.SetType(ofp.OFPHETVersionbitmap)
355 elem.SetLength(8)
356 elem.SetBitmaps([]*ofp.Uint32{{Value: 16}})
357 hello.SetElements([]ofp.IHelloElem{elem})
358 if logger.V(log.DebugLevel) {
359 js, _ := json.Marshal(hello)
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000360 logger.Debugw(ctx, "sayHello Called",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700361 log.Fields{
362 "device-id": ofc.DeviceID,
363 "hello-message": js})
364 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000365 if err := ofc.SendMessage(ctx, hello); err != nil {
366 logger.Fatalw(ctx, "Failed saying hello to Openflow Server, unable to proceed",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700367 log.Fields{
368 "device-id": ofc.DeviceID,
369 "error": err})
370 }
371}
372
Matteo Scandolo936e2df2020-10-27 14:31:36 -0700373func (ofc *OFConnection) parseHeader(ctx context.Context, header ofp.IHeader, wg *sync.WaitGroup) {
374 defer wg.Done()
Jonathan Hart4b110f62020-03-13 17:36:19 -0700375 headerType := header.GetType()
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000376 logger.Debugw(ctx, "packet-header-type",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700377 log.Fields{
378 "header-type": ofp.Type(headerType).String()})
379 switch headerType {
380 case ofp.OFPTHello:
381 //x := header.(*ofp.Hello)
382 case ofp.OFPTError:
Matteo Scandoloe20dc1b2021-08-31 08:10:04 -0700383 ofc.handleErrMsg(ctx, header.(*ofp.ErrorMsg))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700384 case ofp.OFPTEchoRequest:
Matteo Scandoloe20dc1b2021-08-31 08:10:04 -0700385 ofc.handleEchoRequest(ctx, header.(*ofp.EchoRequest))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700386 case ofp.OFPTEchoReply:
387 case ofp.OFPTExperimenter:
388 case ofp.OFPTFeaturesRequest:
Matteo Scandoloe20dc1b2021-08-31 08:10:04 -0700389 if err := ofc.handleFeatureRequest(ctx, header.(*ofp.FeaturesRequest)); err != nil {
390 logger.Errorw(ctx, "handle-feature-request", log.Fields{"error": err})
391 }
Jonathan Hart4b110f62020-03-13 17:36:19 -0700392 case ofp.OFPTFeaturesReply:
393 case ofp.OFPTGetConfigRequest:
Matteo Scandoloe20dc1b2021-08-31 08:10:04 -0700394 ofc.handleGetConfigRequest(ctx, header.(*ofp.GetConfigRequest))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700395 case ofp.OFPTGetConfigReply:
396 case ofp.OFPTSetConfig:
Matteo Scandoloe20dc1b2021-08-31 08:10:04 -0700397 ofc.handleSetConfig(ctx, header.(*ofp.SetConfig))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700398 case ofp.OFPTPacketIn:
399 case ofp.OFPTFlowRemoved:
400 case ofp.OFPTPortStatus:
401 case ofp.OFPTPacketOut:
402 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000403 ofc.sendRoleSlaveError(ctx, header)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700404 return
405 }
Matteo Scandoloe20dc1b2021-08-31 08:10:04 -0700406 ofc.handlePacketOut(ctx, header.(*ofp.PacketOut))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700407 case ofp.OFPTFlowMod:
408 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000409 ofc.sendRoleSlaveError(ctx, header)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700410 return
411 }
Jonathan Hart4b110f62020-03-13 17:36:19 -0700412 switch header.(ofp.IFlowMod).GetCommand() {
413 case ofp.OFPFCAdd:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000414 ofc.handleFlowAdd(ctx, header.(*ofp.FlowAdd))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700415 case ofp.OFPFCModify:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000416 ofc.handleFlowMod(ctx, header.(*ofp.FlowMod))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700417 case ofp.OFPFCModifyStrict:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000418 ofc.handleFlowModStrict(ctx, header.(*ofp.FlowModifyStrict))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700419 case ofp.OFPFCDelete:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000420 ofc.handleFlowDelete(ctx, header.(*ofp.FlowDelete))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700421 case ofp.OFPFCDeleteStrict:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000422 ofc.handleFlowDeleteStrict(ctx, header.(*ofp.FlowDeleteStrict))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700423 }
424 case ofp.OFPTStatsRequest:
Matteo Scandoloe20dc1b2021-08-31 08:10:04 -0700425 if err := ofc.handleStatsRequest(ctx, header, header.(ofp.IStatsRequest).GetStatsType()); err != nil {
426 logger.Errorw(ctx, "ofpt-stats-request", log.Fields{"error": err})
427 }
Jonathan Hart4b110f62020-03-13 17:36:19 -0700428 case ofp.OFPTBarrierRequest:
429 /* See note above at case ofp.OFPTFlowMod:*/
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000430 ofc.handleBarrierRequest(ctx, header.(*ofp.BarrierRequest))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700431 case ofp.OFPTRoleRequest:
Matteo Scandoloe20dc1b2021-08-31 08:10:04 -0700432 ofc.handleRoleRequest(ctx, header.(*ofp.RoleRequest))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700433 case ofp.OFPTMeterMod:
434 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000435 ofc.sendRoleSlaveError(ctx, header)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700436 return
437 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000438 ofc.handleMeterModRequest(ctx, header.(*ofp.MeterMod))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700439 case ofp.OFPTGroupMod:
440 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000441 ofc.sendRoleSlaveError(ctx, header)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700442 return
443 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000444 ofc.handleGroupMod(ctx, header.(ofp.IGroupMod))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700445 }
446}
447
448// Message interface that represents an open flow message and enables for a
449// unified implementation of SendMessage
450type Message interface {
451 Serialize(encoder *goloxi.Encoder) error
452}
453
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000454func (ofc *OFConnection) doSend(ctx context.Context, msg Message) error {
Jonathan Hart4b110f62020-03-13 17:36:19 -0700455 if ofc.conn == nil {
456 return errors.New("no-connection")
457 }
458 enc := goloxi.NewEncoder()
459 if err := msg.Serialize(enc); err != nil {
460 return err
461 }
462
463 bytes := enc.Bytes()
464 if _, err := ofc.conn.Write(bytes); err != nil {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000465 logger.Errorw(ctx, "unable-to-send-message-to-controller",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700466 log.Fields{
467 "device-id": ofc.DeviceID,
468 "message": msg,
469 "error": err})
470 return err
471 }
472 return nil
473}
474
475func (ofc *OFConnection) messageSender(ctx context.Context) {
476 // first process last fail if it exists
477 if ofc.lastUnsentMessage != nil {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000478 if err := ofc.doSend(ctx, ofc.lastUnsentMessage); err != nil {
Jonathan Hart4b110f62020-03-13 17:36:19 -0700479 ofc.events <- ofcEventDisconnect
480 return
481 }
482 ofc.lastUnsentMessage = nil
483 }
484top:
485 for {
486 select {
487 case <-ctx.Done():
488 break top
489 case msg := <-ofc.sendChannel:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000490 if err := ofc.doSend(ctx, msg); err != nil {
Jonathan Hart4b110f62020-03-13 17:36:19 -0700491 ofc.lastUnsentMessage = msg
492 ofc.events <- ofcEventDisconnect
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000493 logger.Debugw(ctx, "message-sender-error",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700494 log.Fields{
495 "device-id": ofc.DeviceID,
496 "error": err.Error()})
497 break top
498 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000499 logger.Debugw(ctx, "message-sender-send",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700500 log.Fields{
501 "device-id": ofc.DeviceID})
502 ofc.lastUnsentMessage = nil
503 }
504 }
505
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000506 logger.Debugw(ctx, "message-sender-finished",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700507 log.Fields{
508 "device-id": ofc.DeviceID})
509}
510
511// SendMessage queues a message to be sent to the openflow controller
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000512func (ofc *OFConnection) SendMessage(ctx context.Context, message Message) error {
Andrea Campanella0f751d52021-07-27 10:54:08 +0200513 // Removing this very chatty DEBUG log, it's the same as line 273 of client.go
514 //which has been extended to include the role.
515 //logger.Debugw(ctx, "queuing-message", log.Fields{
516 // "endpoint": ofc.OFControllerEndPoint,
517 // "role": ofc.role,
518 //})
Jonathan Hart4b110f62020-03-13 17:36:19 -0700519 ofc.sendChannel <- message
520 return nil
521}