blob: 3908aeb20bc22007bc8b5d51b8cb57fbb402ef0d [file] [log] [blame]
Jonathan Hart4b110f62020-03-13 17:36:19 -07001/*
2 Copyright 2020 the original author or authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16package openflow
17
18import (
19 "bufio"
20 "context"
21 "encoding/binary"
22 "encoding/json"
23 "errors"
Jonathan Hart4b110f62020-03-13 17:36:19 -070024 "io"
25 "net"
26 "time"
David Bainbridgef8ce7d22020-04-08 12:49:41 -070027
Jonathan Hart828908c2020-04-15 14:23:45 -070028 "github.com/opencord/goloxi"
29 ofp "github.com/opencord/goloxi/of13"
David Bainbridgef8ce7d22020-04-08 12:49:41 -070030 "github.com/opencord/ofagent-go/internal/pkg/holder"
31 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 "github.com/opencord/voltha-protos/v3/go/voltha"
Jonathan Hart4b110f62020-03-13 17:36:19 -070033)
34
35type OFConnection struct {
36 OFControllerEndPoint string
37 DeviceID string
David Bainbridgef8ce7d22020-04-08 12:49:41 -070038 VolthaClient *holder.VolthaServiceClientHolder
Jonathan Hart4b110f62020-03-13 17:36:19 -070039 PacketOutChannel chan *voltha.PacketOut
40 ConnectionMaxRetries int
41 ConnectionRetryDelay time.Duration
42
43 conn net.Conn
44
45 // current role of this connection
46 role ofcRole
47 roleManager RoleManager
48
49 events chan ofcEvent
50 sendChannel chan Message
51 lastUnsentMessage Message
Matteo Scandolo256266d2020-06-01 13:44:07 -070052
53 flowsChunkSize int
54 portsChunkSize int
55 portsDescChunkSize int
Jonathan Hart4b110f62020-03-13 17:36:19 -070056}
57
58func (ofc *OFConnection) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
59 header := ofp.Header{}
60 header.Version = uint8(buf[0])
61 header.Type = uint8(buf[1])
62 header.Length = binary.BigEndian.Uint16(buf[2:4])
63 header.Xid = binary.BigEndian.Uint32(buf[4:8])
64
65 // TODO: add minimal validation of version and type
66
67 return &header, nil
68}
69
Rohan Agrawalc32d9932020-06-15 11:01:47 +000070func (ofc *OFConnection) establishConnectionToController(ctx context.Context) error {
Jonathan Hart4b110f62020-03-13 17:36:19 -070071 if ofc.conn != nil {
Rohan Agrawalc32d9932020-06-15 11:01:47 +000072 logger.Debugw(ctx, "closing-of-connection-to-reconnect",
Jonathan Hart4b110f62020-03-13 17:36:19 -070073 log.Fields{"device-id": ofc.DeviceID})
74 ofc.conn.Close()
75 ofc.conn = nil
76 }
77 try := 1
78 for ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
79 if raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint); err != nil {
Rohan Agrawalc32d9932020-06-15 11:01:47 +000080 logger.Debugw(ctx, "openflow-client unable to resolve endpoint",
Jonathan Hart4b110f62020-03-13 17:36:19 -070081 log.Fields{
82 "device-id": ofc.DeviceID,
83 "endpoint": ofc.OFControllerEndPoint})
84 } else {
85 if connection, err := net.DialTCP("tcp", nil, raddr); err == nil {
86 ofc.conn = connection
Rohan Agrawalc32d9932020-06-15 11:01:47 +000087 ofc.sayHello(ctx)
Jonathan Hart4b110f62020-03-13 17:36:19 -070088 ofc.events <- ofcEventConnect
89 return nil
90 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +000091 logger.Warnw(ctx, "openflow-client-connect-error",
Jonathan Hart4b110f62020-03-13 17:36:19 -070092 log.Fields{
93 "device-id": ofc.DeviceID,
94 "endpoint": ofc.OFControllerEndPoint})
95 }
96 }
97 if ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
98 if ofc.ConnectionMaxRetries != 0 {
99 try += 1
100 }
101 time.Sleep(ofc.ConnectionRetryDelay)
102 }
103 }
104 return errors.New("failed-to-connect-to-of-controller")
105}
106
107// Run implements the state machine for the OF client reacting to state change
108// events and invoking actions as a reaction to those state changes
109func (ofc *OFConnection) Run(ctx context.Context) {
110
111 var ofCtx context.Context
112 var ofDone func()
113 state := ofcStateCreated
114 ofc.events <- ofcEventStart
115top:
116 for {
117 select {
118 case <-ctx.Done():
119 state = ofcStateStopped
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000120 logger.Debugw(ctx, "state-transition-context-done",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700121 log.Fields{"device-id": ofc.DeviceID})
122 break top
123 case event := <-ofc.events:
124 previous := state
125 switch event {
126 case ofcEventStart:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000127 logger.Debugw(ctx, "ofc-event-start",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700128 log.Fields{"device-id": ofc.DeviceID})
129 if state == ofcStateCreated {
130 state = ofcStateStarted
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000131 logger.Debug(ctx, "STARTED MORE THAN ONCE")
Jonathan Hart4b110f62020-03-13 17:36:19 -0700132 go func() {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000133 if err := ofc.establishConnectionToController(ctx); err != nil {
134 logger.Errorw(ctx, "controller-connection-failed", log.Fields{"error": err})
Jonathan Hart4b110f62020-03-13 17:36:19 -0700135 panic(err)
136 }
137 }()
138 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000139 logger.Errorw(ctx, "illegal-state-transition",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700140 log.Fields{
141 "device-id": ofc.DeviceID,
142 "current-state": state.String(),
143 "event": event.String()})
144 }
145 case ofcEventConnect:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000146 logger.Debugw(ctx, "ofc-event-connected",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700147 log.Fields{"device-id": ofc.DeviceID})
148 if state == ofcStateStarted || state == ofcStateDisconnected {
149 state = ofcStateConnected
Girish Kumar01e0c632020-08-10 16:48:56 +0000150 ofCtx, ofDone = context.WithCancel(log.WithSpanFromContext(context.Background(), ctx))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700151 go ofc.messageSender(ofCtx)
152 go ofc.processOFStream(ofCtx)
153 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000154 logger.Errorw(ctx, "illegal-state-transition",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700155 log.Fields{
156 "device-id": ofc.DeviceID,
157 "current-state": state.String(),
158 "event": event.String()})
159 }
160 case ofcEventDisconnect:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000161 logger.Debugw(ctx, "ofc-event-disconnected",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700162 log.Fields{
163 "device-id": ofc.DeviceID,
164 "state": state.String()})
165 if state == ofcStateConnected {
166 state = ofcStateDisconnected
167 if ofDone != nil {
168 ofDone()
169 ofDone = nil
170 }
171 go func() {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000172 if err := ofc.establishConnectionToController(ctx); err != nil {
Girish Kumarbe2ea8a2020-08-19 17:52:55 +0000173 logger.Errorw(ctx, "controller-connection-failed", log.Fields{"error": err})
Jonathan Hart4b110f62020-03-13 17:36:19 -0700174 panic(err)
175 }
176 }()
177 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000178 logger.Errorw(ctx, "illegal-state-transition",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700179 log.Fields{
180 "device-id": ofc.DeviceID,
181 "current-state": state.String(),
182 "event": event.String()})
183 }
184 case ofcEventStop:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000185 logger.Debugw(ctx, "ofc-event-stop",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700186 log.Fields{"device-id": ofc.DeviceID})
187 if state == ofcStateCreated || state == ofcStateConnected || state == ofcStateDisconnected {
188 state = ofcStateStopped
189 break top
190 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000191 logger.Errorw(ctx, "illegal-state-transition",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700192 log.Fields{
193 "device-id": ofc.DeviceID,
194 "current-state": state.String(),
195 "event": event.String()})
196 }
197 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000198 logger.Debugw(ctx, "state-transition",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700199 log.Fields{
200 "device-id": ofc.DeviceID,
201 "previous-state": previous.String(),
202 "current-state": state.String(),
203 "event": event.String()})
204 }
205 }
206
207 // If the child context exists, then cancel it
208 if ofDone != nil {
Girish Kumarbe2ea8a2020-08-19 17:52:55 +0000209 logger.Debugw(ctx, "closing-child-processes",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700210 log.Fields{"device-id": ofc.DeviceID})
211 ofDone()
212 }
213
214 // If the connection is open, then close it
215 if ofc.conn != nil {
Girish Kumarbe2ea8a2020-08-19 17:52:55 +0000216 logger.Debugw(ctx, "closing-of-connection",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700217 log.Fields{"device-id": ofc.DeviceID})
218 ofc.conn.Close()
219 ofc.conn = nil
220 }
Girish Kumarbe2ea8a2020-08-19 17:52:55 +0000221 logger.Debugw(ctx, "state-machine-finished",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700222 log.Fields{"device-id": ofc.DeviceID})
223}
224
225// processOFStream processes the OF connection from the controller and invokes
226// the appropriate handler methods for each message.
227func (ofc *OFConnection) processOFStream(ctx context.Context) {
228 fromController := bufio.NewReader(ofc.conn)
229
230 /*
231 * We have a read buffer of a max size of 4096, so if we ever have
232 * a message larger than this then we will have issues
233 */
234 headerBuf := make([]byte, 8)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700235top:
236 // Continue until we are told to stop
237 for {
238 select {
239 case <-ctx.Done():
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000240 logger.Error(ctx, "of-loop-ending-context-done")
Jonathan Hart4b110f62020-03-13 17:36:19 -0700241 break top
242 default:
243 // Read 8 bytes, the standard OF header
244 read, err := io.ReadFull(fromController, headerBuf)
245 if err != nil {
Jonathan Hart828908c2020-04-15 14:23:45 -0700246 if err == io.EOF {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000247 logger.Infow(ctx, "controller-disconnected",
Jonathan Hart828908c2020-04-15 14:23:45 -0700248 log.Fields{
249 "device-id": ofc.DeviceID,
250 "controller": ofc.OFControllerEndPoint,
251 })
252 } else {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000253 logger.Errorw(ctx, "bad-of-header",
Jonathan Hart828908c2020-04-15 14:23:45 -0700254 log.Fields{
255 "byte-count": read,
256 "device-id": ofc.DeviceID,
257 "controller": ofc.OFControllerEndPoint,
258 "error": err})
259 }
Jonathan Hart4b110f62020-03-13 17:36:19 -0700260 break top
261 }
262
263 // Decode the header
264 peek, err := ofc.peekAtOFHeader(headerBuf)
265 if err != nil {
266 /*
267 * Header is bad, assume stream is corrupted
268 * and needs to be restarted
269 */
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000270 logger.Errorw(ctx, "bad-of-packet",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700271 log.Fields{
272 "device-id": ofc.DeviceID,
273 "error": err})
274 break top
275 }
276
277 // Calculate the size of the rest of the packet and read it
278 need := int(peek.GetLength())
279 messageBuf := make([]byte, need)
280 copy(messageBuf, headerBuf)
281 read, err = io.ReadFull(fromController, messageBuf[8:])
282 if err != nil {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000283 logger.Errorw(ctx, "bad-of-packet",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700284 log.Fields{
285 "byte-count": read,
286 "device-id": ofc.DeviceID,
287 "error": err})
288 break top
289 }
290
291 // Decode and process the packet
292 decoder := goloxi.NewDecoder(messageBuf)
293 msg, err := ofp.DecodeHeader(decoder)
294 if err != nil {
295 // nolint: staticcheck
296 js, _ := json.Marshal(decoder)
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000297 logger.Errorw(ctx, "failed-to-decode",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700298 log.Fields{
299 "device-id": ofc.DeviceID,
300 "decoder": js,
301 "error": err})
302 break top
303 }
304 if logger.V(log.DebugLevel) {
305 js, _ := json.Marshal(msg)
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000306 logger.Debugw(ctx, "packet-header",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700307 log.Fields{
308 "device-id": ofc.DeviceID,
309 "header": js})
310 }
Andrea Campanella3d955ef2020-06-19 15:29:55 +0200311 /*
312 * Spawning a go routine for every incoming message removes processing ordering guarantees.
313 * Removing such guarantees puts burden on the controller to ensure the correct ordering of
314 * incoming messages and is a less optimal and safe agent implementation.
315 * This is OK for now because ONOS keeps the order guaranteed but the agent needs to avoid
316 * relying on external fairness. Particular care and attention has to be placed in flow add/delete
317 * and relative barrier requests. e.g. a flowMod will be handled in thread different from a barrier,
318 * with no guarantees of handling all messages before a barrier.
319 * A multiple queue (incoming worker and outgoing) is a possible solution.
320 */
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000321 go ofc.parseHeader(ctx, msg)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700322 }
323 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000324 logger.Debugw(ctx, "end-of-stream",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700325 log.Fields{"device-id": ofc.DeviceID})
326 ofc.events <- ofcEventDisconnect
327}
328
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000329func (ofc *OFConnection) sayHello(ctx context.Context) {
Jonathan Hart4b110f62020-03-13 17:36:19 -0700330 hello := ofp.NewHello()
331 hello.Xid = uint32(GetXid())
332 elem := ofp.NewHelloElemVersionbitmap()
333 elem.SetType(ofp.OFPHETVersionbitmap)
334 elem.SetLength(8)
335 elem.SetBitmaps([]*ofp.Uint32{{Value: 16}})
336 hello.SetElements([]ofp.IHelloElem{elem})
337 if logger.V(log.DebugLevel) {
338 js, _ := json.Marshal(hello)
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000339 logger.Debugw(ctx, "sayHello Called",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700340 log.Fields{
341 "device-id": ofc.DeviceID,
342 "hello-message": js})
343 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000344 if err := ofc.SendMessage(ctx, hello); err != nil {
345 logger.Fatalw(ctx, "Failed saying hello to Openflow Server, unable to proceed",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700346 log.Fields{
347 "device-id": ofc.DeviceID,
348 "error": err})
349 }
350}
351
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000352func (ofc *OFConnection) parseHeader(ctx context.Context, header ofp.IHeader) {
Jonathan Hart4b110f62020-03-13 17:36:19 -0700353 headerType := header.GetType()
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000354 logger.Debugw(ctx, "packet-header-type",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700355 log.Fields{
356 "header-type": ofp.Type(headerType).String()})
357 switch headerType {
358 case ofp.OFPTHello:
359 //x := header.(*ofp.Hello)
360 case ofp.OFPTError:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000361 go ofc.handleErrMsg(ctx, header.(*ofp.ErrorMsg))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700362 case ofp.OFPTEchoRequest:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000363 go ofc.handleEchoRequest(ctx, header.(*ofp.EchoRequest))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700364 case ofp.OFPTEchoReply:
365 case ofp.OFPTExperimenter:
366 case ofp.OFPTFeaturesRequest:
367 go func() {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000368 if err := ofc.handleFeatureRequest(ctx, header.(*ofp.FeaturesRequest)); err != nil {
369 logger.Errorw(ctx, "handle-feature-request", log.Fields{"error": err})
Jonathan Hart4b110f62020-03-13 17:36:19 -0700370 }
371 }()
372 case ofp.OFPTFeaturesReply:
373 case ofp.OFPTGetConfigRequest:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000374 go ofc.handleGetConfigRequest(ctx, header.(*ofp.GetConfigRequest))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700375 case ofp.OFPTGetConfigReply:
376 case ofp.OFPTSetConfig:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000377 go ofc.handleSetConfig(ctx, header.(*ofp.SetConfig))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700378 case ofp.OFPTPacketIn:
379 case ofp.OFPTFlowRemoved:
380 case ofp.OFPTPortStatus:
381 case ofp.OFPTPacketOut:
382 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000383 ofc.sendRoleSlaveError(ctx, header)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700384 return
385 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000386 go ofc.handlePacketOut(ctx, header.(*ofp.PacketOut))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700387 case ofp.OFPTFlowMod:
388 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000389 ofc.sendRoleSlaveError(ctx, header)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700390 return
391 }
Jonathan Hart4b110f62020-03-13 17:36:19 -0700392 switch header.(ofp.IFlowMod).GetCommand() {
393 case ofp.OFPFCAdd:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000394 ofc.handleFlowAdd(ctx, header.(*ofp.FlowAdd))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700395 case ofp.OFPFCModify:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000396 ofc.handleFlowMod(ctx, header.(*ofp.FlowMod))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700397 case ofp.OFPFCModifyStrict:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000398 ofc.handleFlowModStrict(ctx, header.(*ofp.FlowModifyStrict))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700399 case ofp.OFPFCDelete:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000400 ofc.handleFlowDelete(ctx, header.(*ofp.FlowDelete))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700401 case ofp.OFPFCDeleteStrict:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000402 ofc.handleFlowDeleteStrict(ctx, header.(*ofp.FlowDeleteStrict))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700403 }
404 case ofp.OFPTStatsRequest:
405 go func() {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000406 if err := ofc.handleStatsRequest(ctx, header, header.(ofp.IStatsRequest).GetStatsType()); err != nil {
407 logger.Errorw(ctx, "ofpt-stats-request", log.Fields{"error": err})
Jonathan Hart4b110f62020-03-13 17:36:19 -0700408 }
409 }()
410 case ofp.OFPTBarrierRequest:
411 /* See note above at case ofp.OFPTFlowMod:*/
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000412 ofc.handleBarrierRequest(ctx, header.(*ofp.BarrierRequest))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700413 case ofp.OFPTRoleRequest:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000414 go ofc.handleRoleRequest(ctx, header.(*ofp.RoleRequest))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700415 case ofp.OFPTMeterMod:
416 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000417 ofc.sendRoleSlaveError(ctx, header)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700418 return
419 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000420 ofc.handleMeterModRequest(ctx, header.(*ofp.MeterMod))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700421 case ofp.OFPTGroupMod:
422 if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000423 ofc.sendRoleSlaveError(ctx, header)
Jonathan Hart4b110f62020-03-13 17:36:19 -0700424 return
425 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000426 ofc.handleGroupMod(ctx, header.(ofp.IGroupMod))
Jonathan Hart4b110f62020-03-13 17:36:19 -0700427 }
428}
429
430// Message interface that represents an open flow message and enables for a
431// unified implementation of SendMessage
432type Message interface {
433 Serialize(encoder *goloxi.Encoder) error
434}
435
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000436func (ofc *OFConnection) doSend(ctx context.Context, msg Message) error {
Jonathan Hart4b110f62020-03-13 17:36:19 -0700437 if ofc.conn == nil {
438 return errors.New("no-connection")
439 }
440 enc := goloxi.NewEncoder()
441 if err := msg.Serialize(enc); err != nil {
442 return err
443 }
444
445 bytes := enc.Bytes()
446 if _, err := ofc.conn.Write(bytes); err != nil {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000447 logger.Errorw(ctx, "unable-to-send-message-to-controller",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700448 log.Fields{
449 "device-id": ofc.DeviceID,
450 "message": msg,
451 "error": err})
452 return err
453 }
454 return nil
455}
456
457func (ofc *OFConnection) messageSender(ctx context.Context) {
458 // first process last fail if it exists
459 if ofc.lastUnsentMessage != nil {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000460 if err := ofc.doSend(ctx, ofc.lastUnsentMessage); err != nil {
Jonathan Hart4b110f62020-03-13 17:36:19 -0700461 ofc.events <- ofcEventDisconnect
462 return
463 }
464 ofc.lastUnsentMessage = nil
465 }
466top:
467 for {
468 select {
469 case <-ctx.Done():
470 break top
471 case msg := <-ofc.sendChannel:
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000472 if err := ofc.doSend(ctx, msg); err != nil {
Jonathan Hart4b110f62020-03-13 17:36:19 -0700473 ofc.lastUnsentMessage = msg
474 ofc.events <- ofcEventDisconnect
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000475 logger.Debugw(ctx, "message-sender-error",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700476 log.Fields{
477 "device-id": ofc.DeviceID,
478 "error": err.Error()})
479 break top
480 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000481 logger.Debugw(ctx, "message-sender-send",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700482 log.Fields{
483 "device-id": ofc.DeviceID})
484 ofc.lastUnsentMessage = nil
485 }
486 }
487
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000488 logger.Debugw(ctx, "message-sender-finished",
Jonathan Hart4b110f62020-03-13 17:36:19 -0700489 log.Fields{
490 "device-id": ofc.DeviceID})
491}
492
493// SendMessage queues a message to be sent to the openflow controller
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000494func (ofc *OFConnection) SendMessage(ctx context.Context, message Message) error {
495 logger.Debug(ctx, "queuing-message")
Jonathan Hart4b110f62020-03-13 17:36:19 -0700496 ofc.sendChannel <- message
497 return nil
498}