blob: 87a50b30a8b42a5f03e26c8c4072cdd52e5cc1b3 [file] [log] [blame]
Jonathan Hart4b110f62020-03-13 17:36:19 -07001/*
2 Copyright 2020 the original author or authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16package openflow
17
18import (
19 "bufio"
20 "context"
21 "encoding/binary"
22 "encoding/json"
23 "errors"
Jonathan Hart4b110f62020-03-13 17:36:19 -070024 "io"
25 "net"
Matteo Scandolo936e2df2020-10-27 14:31:36 -070026 "sync"
Jonathan Hart4b110f62020-03-13 17:36:19 -070027 "time"
David Bainbridgef8ce7d22020-04-08 12:49:41 -070028
Jonathan Hart828908c2020-04-15 14:23:45 -070029 "github.com/opencord/goloxi"
30 ofp "github.com/opencord/goloxi/of13"
David Bainbridgef8ce7d22020-04-08 12:49:41 -070031 "github.com/opencord/ofagent-go/internal/pkg/holder"
Maninder12b909f2020-10-23 14:23:36 +053032 "github.com/opencord/voltha-lib-go/v4/pkg/log"
33 "github.com/opencord/voltha-protos/v4/go/voltha"
Jonathan Hart4b110f62020-03-13 17:36:19 -070034)
35
// OFConnection holds the configuration and the runtime state of a single
// connection to an OpenFlow controller.
type OFConnection struct {
	// OFControllerEndPoint is the controller address; it is resolved and
	// dialed as a TCP endpoint when the connection is established.
	OFControllerEndPoint string
	// DeviceID is attached to the log entries emitted for this connection.
	DeviceID string
	// VolthaClient is not referenced in this part of the file; presumably it
	// is used by the message handlers to reach VOLTHA — confirm.
	VolthaClient *holder.VolthaServiceClientHolder
	// PacketOutChannel is not referenced in this part of the file; presumably
	// it carries packet-out messages towards VOLTHA — confirm.
	PacketOutChannel chan *voltha.PacketOut
	// ConnectionMaxRetries bounds the connect loop; 0 means retry forever.
	ConnectionMaxRetries int
	// ConnectionRetryDelay is the pause after each failed connection attempt.
	ConnectionRetryDelay time.Duration

	// conn is the current connection to the controller; it is nil until a
	// connection is established and after it has been closed.
	conn net.Conn

	// current role of this connection
	role ofcRole
	// roleManager is not referenced in this part of the file.
	roleManager RoleManager

	// events feeds the state machine implemented by Run.
	events chan ofcEvent
	// sendChannel queues outgoing messages: SendMessage writes to it and
	// messageSender drains it.
	sendChannel chan Message
	// lastUnsentMessage is the message whose send failed; messageSender
	// retries it first the next time it is started.
	lastUnsentMessage Message

	// The chunk sizes below are not referenced in this part of the file;
	// presumably they limit the entries per multipart reply — confirm.
	flowsChunkSize int
	portsChunkSize int
	portsDescChunkSize int
}
58
59func (ofc *OFConnection) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
60 header := ofp.Header{}
61 header.Version = uint8(buf[0])
62 header.Type = uint8(buf[1])
63 header.Length = binary.BigEndian.Uint16(buf[2:4])
64 header.Xid = binary.BigEndian.Uint32(buf[4:8])
65
66 // TODO: add minimal validation of version and type
67
68 return &header, nil
69}
70
Rohan Agrawalc32d9932020-06-15 11:01:47 +000071func (ofc *OFConnection) establishConnectionToController(ctx context.Context) error {
Jonathan Hart4b110f62020-03-13 17:36:19 -070072 if ofc.conn != nil {
Rohan Agrawalc32d9932020-06-15 11:01:47 +000073 logger.Debugw(ctx, "closing-of-connection-to-reconnect",
Jonathan Hart4b110f62020-03-13 17:36:19 -070074 log.Fields{"device-id": ofc.DeviceID})
75 ofc.conn.Close()
76 ofc.conn = nil
77 }
78 try := 1
79 for ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
80 if raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint); err != nil {
Rohan Agrawalc32d9932020-06-15 11:01:47 +000081 logger.Debugw(ctx, "openflow-client unable to resolve endpoint",
Jonathan Hart4b110f62020-03-13 17:36:19 -070082 log.Fields{
83 "device-id": ofc.DeviceID,
84 "endpoint": ofc.OFControllerEndPoint})
85 } else {
86 if connection, err := net.DialTCP("tcp", nil, raddr); err == nil {
87 ofc.conn = connection
Rohan Agrawalc32d9932020-06-15 11:01:47 +000088 ofc.sayHello(ctx)
Jonathan Hart4b110f62020-03-13 17:36:19 -070089 ofc.events <- ofcEventConnect
90 return nil
91 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +000092 logger.Warnw(ctx, "openflow-client-connect-error",
Jonathan Hart4b110f62020-03-13 17:36:19 -070093 log.Fields{
94 "device-id": ofc.DeviceID,
95 "endpoint": ofc.OFControllerEndPoint})
96 }
97 }
98 if ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
99 if ofc.ConnectionMaxRetries != 0 {
100 try += 1
101 }
102 time.Sleep(ofc.ConnectionRetryDelay)
103 }
104 }
105 return errors.New("failed-to-connect-to-of-controller")
106}
107
108// Run implements the state machine for the OF client reacting to state change
109// events and invoking actions as a reaction to those state changes
110func (ofc *OFConnection) Run(ctx context.Context) {
111
112 var ofCtx context.Context
113 var ofDone func()
114 state := ofcStateCreated
115 ofc.events <- ofcEventStart
116top:
117 for {
118 select {
119 case <-ctx.Done():
120 state = ofcStateStopped
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000121 logger.Debugw(ctx, "state-transition-context-done",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700122 log.Fields{"device-id": ofc.DeviceID})
123 break top
124 case event := <-ofc.events:
125 previous := state
126 switch event {
127 case ofcEventStart:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000128 logger.Debugw(ctx, "ofc-event-start",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700129 log.Fields{"device-id": ofc.DeviceID})
130 if state == ofcStateCreated {
131 state = ofcStateStarted
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000132 logger.Debug(ctx, "STARTED MORE THAN ONCE")
Jonathan Hart4b110f62020-03-13 17:36:19 -0700133 go func() {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000134 if err := ofc.establishConnectionToController(ctx); err != nil {
135 logger.Errorw(ctx, "controller-connection-failed", log.Fields{"error": err})
Jonathan Hart4b110f62020-03-13 17:36:19 -0700136 panic(err)
137 }
138 }()
139 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000140 logger.Errorw(ctx, "illegal-state-transition",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700141 log.Fields{
142 "device-id": ofc.DeviceID,
143 "current-state": state.String(),
144 "event": event.String()})
145 }
146 case ofcEventConnect:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000147 logger.Debugw(ctx, "ofc-event-connected",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700148 log.Fields{"device-id": ofc.DeviceID})
149 if state == ofcStateStarted || state == ofcStateDisconnected {
150 state = ofcStateConnected
Girish Kumar01e0c632020-08-10 16:48:56 +0000151 ofCtx, ofDone = context.WithCancel(log.WithSpanFromContext(context.Background(), ctx))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700152 go ofc.messageSender(ofCtx)
153 go ofc.processOFStream(ofCtx)
154 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000155 logger.Errorw(ctx, "illegal-state-transition",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700156 log.Fields{
157 "device-id": ofc.DeviceID,
158 "current-state": state.String(),
159 "event": event.String()})
160 }
161 case ofcEventDisconnect:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000162 logger.Debugw(ctx, "ofc-event-disconnected",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700163 log.Fields{
164 "device-id": ofc.DeviceID,
165 "state": state.String()})
166 if state == ofcStateConnected {
167 state = ofcStateDisconnected
168 if ofDone != nil {
169 ofDone()
170 ofDone = nil
171 }
172 go func() {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000173 if err := ofc.establishConnectionToController(ctx); err != nil {
Girish Kumarbe2ea8a2020-08-19 17:52:55 +0000174 logger.Errorw(ctx, "controller-connection-failed", log.Fields{"error": err})
Jonathan Hart4b110f62020-03-13 17:36:19 -0700175 panic(err)
176 }
177 }()
178 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000179 logger.Errorw(ctx, "illegal-state-transition",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700180 log.Fields{
181 "device-id": ofc.DeviceID,
182 "current-state": state.String(),
183 "event": event.String()})
184 }
185 case ofcEventStop:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000186 logger.Debugw(ctx, "ofc-event-stop",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700187 log.Fields{"device-id": ofc.DeviceID})
188 if state == ofcStateCreated || state == ofcStateConnected || state == ofcStateDisconnected {
189 state = ofcStateStopped
190 break top
191 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000192 logger.Errorw(ctx, "illegal-state-transition",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700193 log.Fields{
194 "device-id": ofc.DeviceID,
195 "current-state": state.String(),
196 "event": event.String()})
197 }
198 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000199 logger.Debugw(ctx, "state-transition",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700200 log.Fields{
201 "device-id": ofc.DeviceID,
202 "previous-state": previous.String(),
203 "current-state": state.String(),
204 "event": event.String()})
205 }
206 }
207
208 // If the child context exists, then cancel it
209 if ofDone != nil {
Girish Kumarbe2ea8a2020-08-19 17:52:55 +0000210 logger.Debugw(ctx, "closing-child-processes",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700211 log.Fields{"device-id": ofc.DeviceID})
212 ofDone()
213 }
214
215 // If the connection is open, then close it
216 if ofc.conn != nil {
Girish Kumarbe2ea8a2020-08-19 17:52:55 +0000217 logger.Debugw(ctx, "closing-of-connection",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700218 log.Fields{"device-id": ofc.DeviceID})
219 ofc.conn.Close()
220 ofc.conn = nil
221 }
Girish Kumarbe2ea8a2020-08-19 17:52:55 +0000222 logger.Debugw(ctx, "state-machine-finished",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700223 log.Fields{"device-id": ofc.DeviceID})
224}
225
226// processOFStream processes the OF connection from the controller and invokes
227// the appropriate handler methods for each message.
228func (ofc *OFConnection) processOFStream(ctx context.Context) {
229 fromController := bufio.NewReader(ofc.conn)
230
231 /*
232 * We have a read buffer of a max size of 4096, so if we ever have
233 * a message larger than this then we will have issues
234 */
235 headerBuf := make([]byte, 8)
Matteo Scandolo936e2df2020-10-27 14:31:36 -0700236 wg := sync.WaitGroup{}
Jonathan Hart4b110f62020-03-13 17:36:19 -0700237top:
238 // Continue until we are told to stop
239 for {
240 select {
241 case <-ctx.Done():
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000242 logger.Error(ctx, "of-loop-ending-context-done")
Jonathan Hart4b110f62020-03-13 17:36:19 -0700243 break top
244 default:
245 // Read 8 bytes, the standard OF header
246 read, err := io.ReadFull(fromController, headerBuf)
247 if err != nil {
Jonathan Hart828908c2020-04-15 14:23:45 -0700248 if err == io.EOF {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000249 logger.Infow(ctx, "controller-disconnected",
Jonathan Hart828908c2020-04-15 14:23:45 -0700250 log.Fields{
251 "device-id": ofc.DeviceID,
252 "controller": ofc.OFControllerEndPoint,
253 })
254 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000255 logger.Errorw(ctx, "bad-of-header",
Jonathan Hart828908c2020-04-15 14:23:45 -0700256 log.Fields{
257 "byte-count": read,
258 "device-id": ofc.DeviceID,
259 "controller": ofc.OFControllerEndPoint,
260 "error": err})
261 }
Jonathan Hart4b110f62020-03-13 17:36:19 -0700262 break top
263 }
264
265 // Decode the header
266 peek, err := ofc.peekAtOFHeader(headerBuf)
267 if err != nil {
268 /*
269 * Header is bad, assume stream is corrupted
270 * and needs to be restarted
271 */
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000272 logger.Errorw(ctx, "bad-of-packet",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700273 log.Fields{
274 "device-id": ofc.DeviceID,
275 "error": err})
276 break top
277 }
278
279 // Calculate the size of the rest of the packet and read it
280 need := int(peek.GetLength())
281 messageBuf := make([]byte, need)
282 copy(messageBuf, headerBuf)
283 read, err = io.ReadFull(fromController, messageBuf[8:])
284 if err != nil {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000285 logger.Errorw(ctx, "bad-of-packet",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700286 log.Fields{
287 "byte-count": read,
288 "device-id": ofc.DeviceID,
289 "error": err})
290 break top
291 }
292
293 // Decode and process the packet
294 decoder := goloxi.NewDecoder(messageBuf)
295 msg, err := ofp.DecodeHeader(decoder)
296 if err != nil {
297 // nolint: staticcheck
298 js, _ := json.Marshal(decoder)
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000299 logger.Errorw(ctx, "failed-to-decode",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700300 log.Fields{
301 "device-id": ofc.DeviceID,
302 "decoder": js,
303 "error": err})
304 break top
305 }
306 if logger.V(log.DebugLevel) {
307 js, _ := json.Marshal(msg)
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000308 logger.Debugw(ctx, "packet-header",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700309 log.Fields{
310 "device-id": ofc.DeviceID,
311 "header": js})
312 }
Matteo Scandolo936e2df2020-10-27 14:31:36 -0700313
314 // We can parallelize the processing of all the operations
315 // that we get before a BarrieRequest, then we need to wait.
316 // What we are doing is:
317 // - spawn threads until we get a Barrier
318 // - when we get a barrier wait for the threads to complete before continuing
319
320 msgType := msg.GetType()
321 if msgType == ofp.OFPTBarrierRequest {
322 logger.Debug(ctx, "received-barrier-request-waiting-for-pending-requests")
323 wg.Wait()
324 logger.Debug(ctx, "restarting-requests-processing")
325 }
326
327 wg.Add(1)
328 go ofc.parseHeader(ctx, msg, &wg)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700329 }
330 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000331 logger.Debugw(ctx, "end-of-stream",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700332 log.Fields{"device-id": ofc.DeviceID})
333 ofc.events <- ofcEventDisconnect
334}
335
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000336func (ofc *OFConnection) sayHello(ctx context.Context) {
Jonathan Hart4b110f62020-03-13 17:36:19 -0700337 hello := ofp.NewHello()
338 hello.Xid = uint32(GetXid())
339 elem := ofp.NewHelloElemVersionbitmap()
340 elem.SetType(ofp.OFPHETVersionbitmap)
341 elem.SetLength(8)
342 elem.SetBitmaps([]*ofp.Uint32{{Value: 16}})
343 hello.SetElements([]ofp.IHelloElem{elem})
344 if logger.V(log.DebugLevel) {
345 js, _ := json.Marshal(hello)
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000346 logger.Debugw(ctx, "sayHello Called",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700347 log.Fields{
348 "device-id": ofc.DeviceID,
349 "hello-message": js})
350 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000351 if err := ofc.SendMessage(ctx, hello); err != nil {
352 logger.Fatalw(ctx, "Failed saying hello to Openflow Server, unable to proceed",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700353 log.Fields{
354 "device-id": ofc.DeviceID,
355 "error": err})
356 }
357}
358
Matteo Scandolo936e2df2020-10-27 14:31:36 -0700359func (ofc *OFConnection) parseHeader(ctx context.Context, header ofp.IHeader, wg *sync.WaitGroup) {
360 defer wg.Done()
Jonathan Hart4b110f62020-03-13 17:36:19 -0700361 headerType := header.GetType()
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000362 logger.Debugw(ctx, "packet-header-type",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700363 log.Fields{
364 "header-type": ofp.Type(headerType).String()})
365 switch headerType {
366 case ofp.OFPTHello:
367 //x := header.(*ofp.Hello)
368 case ofp.OFPTError:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000369 go ofc.handleErrMsg(ctx, header.(*ofp.ErrorMsg))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700370 case ofp.OFPTEchoRequest:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000371 go ofc.handleEchoRequest(ctx, header.(*ofp.EchoRequest))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700372 case ofp.OFPTEchoReply:
373 case ofp.OFPTExperimenter:
374 case ofp.OFPTFeaturesRequest:
375 go func() {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000376 if err := ofc.handleFeatureRequest(ctx, header.(*ofp.FeaturesRequest)); err != nil {
377 logger.Errorw(ctx, "handle-feature-request", log.Fields{"error": err})
Jonathan Hart4b110f62020-03-13 17:36:19 -0700378 }
379 }()
380 case ofp.OFPTFeaturesReply:
381 case ofp.OFPTGetConfigRequest:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000382 go ofc.handleGetConfigRequest(ctx, header.(*ofp.GetConfigRequest))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700383 case ofp.OFPTGetConfigReply:
384 case ofp.OFPTSetConfig:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000385 go ofc.handleSetConfig(ctx, header.(*ofp.SetConfig))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700386 case ofp.OFPTPacketIn:
387 case ofp.OFPTFlowRemoved:
388 case ofp.OFPTPortStatus:
389 case ofp.OFPTPacketOut:
390 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000391 ofc.sendRoleSlaveError(ctx, header)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700392 return
393 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000394 go ofc.handlePacketOut(ctx, header.(*ofp.PacketOut))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700395 case ofp.OFPTFlowMod:
396 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000397 ofc.sendRoleSlaveError(ctx, header)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700398 return
399 }
Jonathan Hart4b110f62020-03-13 17:36:19 -0700400 switch header.(ofp.IFlowMod).GetCommand() {
401 case ofp.OFPFCAdd:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000402 ofc.handleFlowAdd(ctx, header.(*ofp.FlowAdd))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700403 case ofp.OFPFCModify:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000404 ofc.handleFlowMod(ctx, header.(*ofp.FlowMod))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700405 case ofp.OFPFCModifyStrict:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000406 ofc.handleFlowModStrict(ctx, header.(*ofp.FlowModifyStrict))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700407 case ofp.OFPFCDelete:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000408 ofc.handleFlowDelete(ctx, header.(*ofp.FlowDelete))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700409 case ofp.OFPFCDeleteStrict:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000410 ofc.handleFlowDeleteStrict(ctx, header.(*ofp.FlowDeleteStrict))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700411 }
412 case ofp.OFPTStatsRequest:
413 go func() {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000414 if err := ofc.handleStatsRequest(ctx, header, header.(ofp.IStatsRequest).GetStatsType()); err != nil {
415 logger.Errorw(ctx, "ofpt-stats-request", log.Fields{"error": err})
Jonathan Hart4b110f62020-03-13 17:36:19 -0700416 }
417 }()
418 case ofp.OFPTBarrierRequest:
419 /* See note above at case ofp.OFPTFlowMod:*/
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000420 ofc.handleBarrierRequest(ctx, header.(*ofp.BarrierRequest))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700421 case ofp.OFPTRoleRequest:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000422 go ofc.handleRoleRequest(ctx, header.(*ofp.RoleRequest))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700423 case ofp.OFPTMeterMod:
424 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000425 ofc.sendRoleSlaveError(ctx, header)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700426 return
427 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000428 ofc.handleMeterModRequest(ctx, header.(*ofp.MeterMod))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700429 case ofp.OFPTGroupMod:
430 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000431 ofc.sendRoleSlaveError(ctx, header)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700432 return
433 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000434 ofc.handleGroupMod(ctx, header.(ofp.IGroupMod))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700435 }
436}
437
// Message is the interface that represents an OpenFlow message. It allows a
// single, unified implementation of SendMessage for every message type.
type Message interface {
	// Serialize encodes the message into encoder; the encoded bytes are what
	// gets written to the controller connection.
	Serialize(encoder *goloxi.Encoder) error
}
443
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000444func (ofc *OFConnection) doSend(ctx context.Context, msg Message) error {
Jonathan Hart4b110f62020-03-13 17:36:19 -0700445 if ofc.conn == nil {
446 return errors.New("no-connection")
447 }
448 enc := goloxi.NewEncoder()
449 if err := msg.Serialize(enc); err != nil {
450 return err
451 }
452
453 bytes := enc.Bytes()
454 if _, err := ofc.conn.Write(bytes); err != nil {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000455 logger.Errorw(ctx, "unable-to-send-message-to-controller",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700456 log.Fields{
457 "device-id": ofc.DeviceID,
458 "message": msg,
459 "error": err})
460 return err
461 }
462 return nil
463}
464
465func (ofc *OFConnection) messageSender(ctx context.Context) {
466 // first process last fail if it exists
467 if ofc.lastUnsentMessage != nil {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000468 if err := ofc.doSend(ctx, ofc.lastUnsentMessage); err != nil {
Jonathan Hart4b110f62020-03-13 17:36:19 -0700469 ofc.events <- ofcEventDisconnect
470 return
471 }
472 ofc.lastUnsentMessage = nil
473 }
474top:
475 for {
476 select {
477 case <-ctx.Done():
478 break top
479 case msg := <-ofc.sendChannel:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000480 if err := ofc.doSend(ctx, msg); err != nil {
Jonathan Hart4b110f62020-03-13 17:36:19 -0700481 ofc.lastUnsentMessage = msg
482 ofc.events <- ofcEventDisconnect
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000483 logger.Debugw(ctx, "message-sender-error",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700484 log.Fields{
485 "device-id": ofc.DeviceID,
486 "error": err.Error()})
487 break top
488 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000489 logger.Debugw(ctx, "message-sender-send",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700490 log.Fields{
491 "device-id": ofc.DeviceID})
492 ofc.lastUnsentMessage = nil
493 }
494 }
495
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000496 logger.Debugw(ctx, "message-sender-finished",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700497 log.Fields{
498 "device-id": ofc.DeviceID})
499}
500
501// SendMessage queues a message to be sent to the openflow controller
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000502func (ofc *OFConnection) SendMessage(ctx context.Context, message Message) error {
Matteo Scandolo65e96762020-09-18 14:24:57 -0700503 logger.Debugw(ctx, "queuing-message", log.Fields{
504 "endpoint": ofc.OFControllerEndPoint,
505 "role": ofc.role,
506 })
Jonathan Hart4b110f62020-03-13 17:36:19 -0700507 ofc.sendChannel <- message
508 return nil
509}