blob: 8bb0b944963aeb1eafa0e2c7549812756a9d90c9 [file] [log] [blame]
David K. Bainbridge157bdab2020-01-16 14:38:05 -08001/*
2 Copyright 2020 the original author or authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17package ofagent
18
19import (
20 "context"
David K. Bainbridge9cb404e2020-01-28 14:32:29 -080021 "fmt"
David K. Bainbridge157bdab2020-01-16 14:38:05 -080022 "github.com/opencord/ofagent-go/internal/pkg/openflow"
David K. Bainbridgeaea73cd2020-01-27 10:44:50 -080023 "github.com/opencord/voltha-lib-go/v3/pkg/log"
24 "github.com/opencord/voltha-lib-go/v3/pkg/probe"
25 "github.com/opencord/voltha-protos/v3/go/voltha"
David K. Bainbridge157bdab2020-01-16 14:38:05 -080026 "google.golang.org/grpc"
27 "sync"
28 "time"
29)
30
31var logger, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
32
// ofaEvent identifies a message delivered on the agent's event channel.
type ofaEvent byte

// ofaState identifies the agent's view of its connection to voltha.
type ofaState byte

const (
	// Events understood by the agent's run loop.
	ofaEventStart ofaEvent = iota
	ofaEventVolthaConnected
	ofaEventVolthaDisconnected
	ofaEventError

	// Connection states. These share the const block with the events above,
	// so iota keeps counting: the first state has the value 4, not 0.
	ofaStateConnected ofaState = iota
	ofaStateConnecting
	ofaStateDisconnected
)
46
47type OFAgent struct {
48 VolthaApiEndPoint string
49 OFControllerEndPoint string
50 DeviceListRefreshInterval time.Duration
51 ConnectionMaxRetries int
52 ConnectionRetryDelay time.Duration
53
54 volthaConnection *grpc.ClientConn
55 volthaClient voltha.VolthaServiceClient
56 mapLock sync.Mutex
57 clientMap map[string]*openflow.OFClient
58 events chan ofaEvent
59
60 packetInChannel chan *voltha.PacketIn
61 packetOutChannel chan *voltha.PacketOut
62 changeEventChannel chan *voltha.ChangeEvent
63}
64
65func NewOFAgent(config *OFAgent) (*OFAgent, error) {
66 ofa := OFAgent{
67 VolthaApiEndPoint: config.VolthaApiEndPoint,
68 OFControllerEndPoint: config.OFControllerEndPoint,
69 DeviceListRefreshInterval: config.DeviceListRefreshInterval,
70 ConnectionMaxRetries: config.ConnectionMaxRetries,
71 ConnectionRetryDelay: config.ConnectionRetryDelay,
72 packetInChannel: make(chan *voltha.PacketIn),
73 packetOutChannel: make(chan *voltha.PacketOut),
74 changeEventChannel: make(chan *voltha.ChangeEvent),
75 clientMap: make(map[string]*openflow.OFClient),
76 events: make(chan ofaEvent, 100),
77 }
78
79 if ofa.DeviceListRefreshInterval <= 0 {
80 logger.Warnw("device list refresh internal not valid, setting to default",
81 log.Fields{
82 "value": ofa.DeviceListRefreshInterval.String(),
83 "default": (1 * time.Minute).String()})
84 ofa.DeviceListRefreshInterval = 1 * time.Minute
85 }
86
87 if ofa.ConnectionRetryDelay <= 0 {
88 logger.Warnw("connection retry delay not value, setting to default",
89 log.Fields{
90 "value": ofa.ConnectionRetryDelay.String(),
91 "default": (3 * time.Second).String()})
92 ofa.ConnectionRetryDelay = 3 * time.Second
93 }
94
95 return &ofa, nil
96}
97
98// Run - make the inital connection to voltha and kicks off io streams
99func (ofa *OFAgent) Run(ctx context.Context) {
100
101 logger.Debugw("Starting GRPC - VOLTHA client",
102 log.Fields{
103 "voltha-endpoint": ofa.VolthaApiEndPoint,
104 "controller-endpoint": ofa.OFControllerEndPoint})
105
106 // If the context contains a k8s probe then register services
107 p := probe.GetProbeFromContext(ctx)
108 if p != nil {
109 p.RegisterService("voltha")
110 }
divyadesai81bb7ba2020-03-11 11:45:23 +0000111
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800112 ofa.events <- ofaEventStart
113
114 /*
115 * Two sub-contexts are created here for different purposes so we can
116 * control the lifecyle of processing loops differently.
117 *
118 * volthaCtx - controls those processes that rely on the GRPC
119 * GRPCconnection to voltha and will be restarted when the
120 * GRPC connection is interrupted.
121 * hdlCtx - controls those processes that listen to channels and
122 * process each message. these will likely never be
123 * stopped until the ofagent is stopped.
124 */
125 var volthaCtx, hdlCtx context.Context
126 var volthaDone, hdlDone func()
127 state := ofaStateDisconnected
128
129 for {
130 select {
131 case <-ctx.Done():
132 if volthaDone != nil {
133 volthaDone()
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800134 }
135 if hdlDone != nil {
136 hdlDone()
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800137 }
138 return
139 case event := <-ofa.events:
140 switch event {
141 case ofaEventStart:
142 logger.Debug("ofagent-voltha-start-event")
143
144 // Start the loops that process messages
145 hdlCtx, hdlDone = context.WithCancel(context.Background())
146 go ofa.handlePacketsIn(hdlCtx)
147 go ofa.handleChangeEvents(hdlCtx)
148
149 // Kick off process to attempt to establish
150 // connection to voltha
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800151 state = ofaStateConnecting
David K. Bainbridgecac73ac2020-02-19 07:00:12 -0800152 go func() {
153 if err := ofa.establishConnectionToVoltha(p); err != nil {
154 logger.Errorw("voltha-connection-failed", log.Fields{"error": err})
155 panic(err)
156 }
157 }()
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800158
159 case ofaEventVolthaConnected:
160 logger.Debug("ofagent-voltha-connect-event")
161
162 // Start the loops that poll from voltha
163 if state != ofaStateConnected {
164 state = ofaStateConnected
165 volthaCtx, volthaDone = context.WithCancel(context.Background())
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800166 // Reconnect clients
167 for _, client := range ofa.clientMap {
168 if logger.V(log.DebugLevel) {
169 logger.Debugw("reset-client-voltha-connection",
170 log.Fields{
171 "from": fmt.Sprintf("0x%p", &client.VolthaClient),
172 "to": fmt.Sprintf("0x%p", &ofa.volthaClient)})
173 }
174 client.VolthaClient = ofa.volthaClient
175 }
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800176 go ofa.receiveChangeEvents(volthaCtx)
177 go ofa.receivePacketsIn(volthaCtx)
178 go ofa.streamPacketOut(volthaCtx)
179 go ofa.synchronizeDeviceList(volthaCtx)
180 }
181
182 case ofaEventVolthaDisconnected:
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800183 if p != nil {
184 p.UpdateStatus("voltha", probe.ServiceStatusNotReady)
185 }
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800186 logger.Debug("ofagent-voltha-disconnect-event")
187 if state == ofaStateConnected {
188 state = ofaStateDisconnected
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800189 ofa.volthaClient = nil
190 for _, client := range ofa.clientMap {
191 client.VolthaClient = nil
192 if logger.V(log.DebugLevel) {
193 logger.Debugw("reset-client-voltha-connection",
194 log.Fields{
195 "from": fmt.Sprintf("0x%p", &client.VolthaClient),
196 "to": "nil"})
197 }
198 }
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800199 volthaDone()
200 volthaDone = nil
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800201 }
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800202 if state != ofaStateConnecting {
203 state = ofaStateConnecting
David K. Bainbridgecac73ac2020-02-19 07:00:12 -0800204 go func() {
205 if err := ofa.establishConnectionToVoltha(p); err != nil {
206 log.Errorw("voltha-connection-failed", log.Fields{"error": err})
207 panic(err)
208 }
209 }()
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800210 }
211
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800212 case ofaEventError:
213 logger.Debug("ofagent-error-event")
214 default:
215 logger.Fatalw("ofagent-unknown-event",
216 log.Fields{"event": event})
217 }
218 }
219 }
220}