blob: 1d225b2c5ae7733c53571abf6d01c50c72591546 [file] [log] [blame]
David K. Bainbridge157bdab2020-01-16 14:38:05 -08001/*
2 Copyright 2020 the original author or authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17package ofagent
18
19import (
20 "context"
David K. Bainbridge9cb404e2020-01-28 14:32:29 -080021 "fmt"
David K. Bainbridge157bdab2020-01-16 14:38:05 -080022 "github.com/opencord/ofagent-go/internal/pkg/openflow"
David K. Bainbridgeaea73cd2020-01-27 10:44:50 -080023 "github.com/opencord/voltha-lib-go/v3/pkg/log"
24 "github.com/opencord/voltha-lib-go/v3/pkg/probe"
25 "github.com/opencord/voltha-protos/v3/go/voltha"
David K. Bainbridge157bdab2020-01-16 14:38:05 -080026 "google.golang.org/grpc"
27 "sync"
28 "time"
29)
30
David K. Bainbridge157bdab2020-01-16 14:38:05 -080031type ofaEvent byte
32type ofaState byte
33
34const (
35 ofaEventStart = ofaEvent(iota)
36 ofaEventVolthaConnected
37 ofaEventVolthaDisconnected
38 ofaEventError
39
40 ofaStateConnected = ofaState(iota)
David K. Bainbridge9cb404e2020-01-28 14:32:29 -080041 ofaStateConnecting
David K. Bainbridge157bdab2020-01-16 14:38:05 -080042 ofaStateDisconnected
43)
44
45type OFAgent struct {
46 VolthaApiEndPoint string
47 OFControllerEndPoint string
48 DeviceListRefreshInterval time.Duration
49 ConnectionMaxRetries int
50 ConnectionRetryDelay time.Duration
51
52 volthaConnection *grpc.ClientConn
53 volthaClient voltha.VolthaServiceClient
54 mapLock sync.Mutex
55 clientMap map[string]*openflow.OFClient
56 events chan ofaEvent
57
58 packetInChannel chan *voltha.PacketIn
59 packetOutChannel chan *voltha.PacketOut
60 changeEventChannel chan *voltha.ChangeEvent
61}
62
63func NewOFAgent(config *OFAgent) (*OFAgent, error) {
64 ofa := OFAgent{
65 VolthaApiEndPoint: config.VolthaApiEndPoint,
66 OFControllerEndPoint: config.OFControllerEndPoint,
67 DeviceListRefreshInterval: config.DeviceListRefreshInterval,
68 ConnectionMaxRetries: config.ConnectionMaxRetries,
69 ConnectionRetryDelay: config.ConnectionRetryDelay,
70 packetInChannel: make(chan *voltha.PacketIn),
71 packetOutChannel: make(chan *voltha.PacketOut),
72 changeEventChannel: make(chan *voltha.ChangeEvent),
73 clientMap: make(map[string]*openflow.OFClient),
74 events: make(chan ofaEvent, 100),
75 }
76
77 if ofa.DeviceListRefreshInterval <= 0 {
78 logger.Warnw("device list refresh internal not valid, setting to default",
79 log.Fields{
80 "value": ofa.DeviceListRefreshInterval.String(),
81 "default": (1 * time.Minute).String()})
82 ofa.DeviceListRefreshInterval = 1 * time.Minute
83 }
84
85 if ofa.ConnectionRetryDelay <= 0 {
86 logger.Warnw("connection retry delay not value, setting to default",
87 log.Fields{
88 "value": ofa.ConnectionRetryDelay.String(),
89 "default": (3 * time.Second).String()})
90 ofa.ConnectionRetryDelay = 3 * time.Second
91 }
92
93 return &ofa, nil
94}
95
96// Run - make the inital connection to voltha and kicks off io streams
97func (ofa *OFAgent) Run(ctx context.Context) {
98
99 logger.Debugw("Starting GRPC - VOLTHA client",
100 log.Fields{
101 "voltha-endpoint": ofa.VolthaApiEndPoint,
102 "controller-endpoint": ofa.OFControllerEndPoint})
103
104 // If the context contains a k8s probe then register services
105 p := probe.GetProbeFromContext(ctx)
106 if p != nil {
107 p.RegisterService("voltha")
108 }
divyadesai81bb7ba2020-03-11 11:45:23 +0000109
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800110 ofa.events <- ofaEventStart
111
112 /*
113 * Two sub-contexts are created here for different purposes so we can
114 * control the lifecyle of processing loops differently.
115 *
116 * volthaCtx - controls those processes that rely on the GRPC
117 * GRPCconnection to voltha and will be restarted when the
118 * GRPC connection is interrupted.
119 * hdlCtx - controls those processes that listen to channels and
120 * process each message. these will likely never be
121 * stopped until the ofagent is stopped.
122 */
123 var volthaCtx, hdlCtx context.Context
124 var volthaDone, hdlDone func()
125 state := ofaStateDisconnected
126
127 for {
128 select {
129 case <-ctx.Done():
130 if volthaDone != nil {
131 volthaDone()
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800132 }
133 if hdlDone != nil {
134 hdlDone()
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800135 }
136 return
137 case event := <-ofa.events:
138 switch event {
139 case ofaEventStart:
140 logger.Debug("ofagent-voltha-start-event")
141
142 // Start the loops that process messages
143 hdlCtx, hdlDone = context.WithCancel(context.Background())
144 go ofa.handlePacketsIn(hdlCtx)
145 go ofa.handleChangeEvents(hdlCtx)
146
147 // Kick off process to attempt to establish
148 // connection to voltha
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800149 state = ofaStateConnecting
David K. Bainbridgecac73ac2020-02-19 07:00:12 -0800150 go func() {
151 if err := ofa.establishConnectionToVoltha(p); err != nil {
152 logger.Errorw("voltha-connection-failed", log.Fields{"error": err})
153 panic(err)
154 }
155 }()
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800156
157 case ofaEventVolthaConnected:
158 logger.Debug("ofagent-voltha-connect-event")
159
160 // Start the loops that poll from voltha
161 if state != ofaStateConnected {
162 state = ofaStateConnected
163 volthaCtx, volthaDone = context.WithCancel(context.Background())
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800164 // Reconnect clients
165 for _, client := range ofa.clientMap {
166 if logger.V(log.DebugLevel) {
167 logger.Debugw("reset-client-voltha-connection",
168 log.Fields{
169 "from": fmt.Sprintf("0x%p", &client.VolthaClient),
170 "to": fmt.Sprintf("0x%p", &ofa.volthaClient)})
171 }
172 client.VolthaClient = ofa.volthaClient
173 }
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800174 go ofa.receiveChangeEvents(volthaCtx)
175 go ofa.receivePacketsIn(volthaCtx)
176 go ofa.streamPacketOut(volthaCtx)
177 go ofa.synchronizeDeviceList(volthaCtx)
178 }
179
180 case ofaEventVolthaDisconnected:
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800181 if p != nil {
182 p.UpdateStatus("voltha", probe.ServiceStatusNotReady)
183 }
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800184 logger.Debug("ofagent-voltha-disconnect-event")
185 if state == ofaStateConnected {
186 state = ofaStateDisconnected
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800187 ofa.volthaClient = nil
188 for _, client := range ofa.clientMap {
189 client.VolthaClient = nil
190 if logger.V(log.DebugLevel) {
191 logger.Debugw("reset-client-voltha-connection",
192 log.Fields{
193 "from": fmt.Sprintf("0x%p", &client.VolthaClient),
194 "to": "nil"})
195 }
196 }
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800197 volthaDone()
198 volthaDone = nil
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800199 }
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800200 if state != ofaStateConnecting {
201 state = ofaStateConnecting
David K. Bainbridgecac73ac2020-02-19 07:00:12 -0800202 go func() {
203 if err := ofa.establishConnectionToVoltha(p); err != nil {
Girish Kumare9d76172020-03-20 20:26:04 +0000204 logger.Errorw("voltha-connection-failed", log.Fields{"error": err})
David K. Bainbridgecac73ac2020-02-19 07:00:12 -0800205 panic(err)
206 }
207 }()
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800208 }
209
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800210 case ofaEventError:
211 logger.Debug("ofagent-error-event")
212 default:
213 logger.Fatalw("ofagent-unknown-event",
214 log.Fields{"event": event})
215 }
216 }
217 }
218}