blob: 25041abc22e8d5c3da6183ddd5b2b7ea50a7e15d [file] [log] [blame]
/*
   Copyright 2020 the original author or authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
16
17package ofagent
18
19import (
20 "context"
David K. Bainbridge9cb404e2020-01-28 14:32:29 -080021 "fmt"
David K. Bainbridge157bdab2020-01-16 14:38:05 -080022 "github.com/opencord/ofagent-go/internal/pkg/openflow"
David K. Bainbridgeaea73cd2020-01-27 10:44:50 -080023 "github.com/opencord/voltha-lib-go/v3/pkg/log"
24 "github.com/opencord/voltha-lib-go/v3/pkg/probe"
25 "github.com/opencord/voltha-protos/v3/go/voltha"
David K. Bainbridge157bdab2020-01-16 14:38:05 -080026 "google.golang.org/grpc"
27 "sync"
28 "time"
29)
30
// logger is the package-level logger, registered with JSON output at debug
// level. The error returned by log.AddPackage is discarded here, so a failed
// registration is not reported at this point.
var logger, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
32
// ofaEvent identifies an event delivered to the agent's event loop.
type ofaEvent byte

// ofaState identifies the state of the agent's connection to voltha.
type ofaState byte

const (
	// Events, numbered 0 through 3.
	ofaEventStart ofaEvent = iota
	ofaEventVolthaConnected
	ofaEventVolthaDisconnected
	ofaEventError

	// States. iota is not reset within a const block, so these are numbered
	// 4 through 6 and the zero value of ofaState matches none of them.
	ofaStateConnected ofaState = iota
	ofaStateConnecting
	ofaStateDisconnected
)
46
// OFAgent holds the exported settings and the private runtime state of the
// agent. The exported fields are also what NewOFAgent copies out of the
// configuration value it is given.
type OFAgent struct {
	// VolthaApiEndPoint is logged by Run as "voltha-endpoint".
	VolthaApiEndPoint string
	// OFControllerEndPoint is logged by Run as "controller-endpoint".
	OFControllerEndPoint string
	// DeviceListRefreshInterval is replaced by one minute in NewOFAgent when
	// it is not positive.
	DeviceListRefreshInterval time.Duration
	// ConnectionMaxRetries is copied as-is by NewOFAgent.
	// NOTE(review): presumably bounds the voltha connection attempts; confirm
	// against establishConnectionToVoltha.
	ConnectionMaxRetries int
	// ConnectionRetryDelay is replaced by three seconds in NewOFAgent when it
	// is not positive.
	ConnectionRetryDelay time.Duration

	// NOTE(review): presumably the GRPC connection backing volthaClient; it is
	// not set in NewOFAgent or Run.
	volthaConnection *grpc.ClientConn
	// volthaClient is handed to every client in clientMap when Run sees a
	// connected event and is set to nil on a disconnected event.
	volthaClient voltha.VolthaServiceClient
	// NOTE(review): presumably guards clientMap; Run ranges over clientMap
	// without taking it, so verify how the other goroutines use it.
	mapLock sync.Mutex
	// clientMap holds the OpenFlow clients whose VolthaClient Run resets.
	// NOTE(review): the meaning of the string key is not visible here.
	clientMap map[string]*openflow.OFClient
	// events feeds Run's event loop; NewOFAgent creates it with capacity 100.
	events chan ofaEvent

	// The three channels below are created unbuffered by NewOFAgent.
	packetInChannel    chan *voltha.PacketIn
	packetOutChannel   chan *voltha.PacketOut
	changeEventChannel chan *voltha.ChangeEvent
}
64
65func NewOFAgent(config *OFAgent) (*OFAgent, error) {
66 ofa := OFAgent{
67 VolthaApiEndPoint: config.VolthaApiEndPoint,
68 OFControllerEndPoint: config.OFControllerEndPoint,
69 DeviceListRefreshInterval: config.DeviceListRefreshInterval,
70 ConnectionMaxRetries: config.ConnectionMaxRetries,
71 ConnectionRetryDelay: config.ConnectionRetryDelay,
72 packetInChannel: make(chan *voltha.PacketIn),
73 packetOutChannel: make(chan *voltha.PacketOut),
74 changeEventChannel: make(chan *voltha.ChangeEvent),
75 clientMap: make(map[string]*openflow.OFClient),
76 events: make(chan ofaEvent, 100),
77 }
78
79 if ofa.DeviceListRefreshInterval <= 0 {
80 logger.Warnw("device list refresh internal not valid, setting to default",
81 log.Fields{
82 "value": ofa.DeviceListRefreshInterval.String(),
83 "default": (1 * time.Minute).String()})
84 ofa.DeviceListRefreshInterval = 1 * time.Minute
85 }
86
87 if ofa.ConnectionRetryDelay <= 0 {
88 logger.Warnw("connection retry delay not value, setting to default",
89 log.Fields{
90 "value": ofa.ConnectionRetryDelay.String(),
91 "default": (3 * time.Second).String()})
92 ofa.ConnectionRetryDelay = 3 * time.Second
93 }
94
95 return &ofa, nil
96}
97
98// Run - make the inital connection to voltha and kicks off io streams
99func (ofa *OFAgent) Run(ctx context.Context) {
100
101 logger.Debugw("Starting GRPC - VOLTHA client",
102 log.Fields{
103 "voltha-endpoint": ofa.VolthaApiEndPoint,
104 "controller-endpoint": ofa.OFControllerEndPoint})
105
106 // If the context contains a k8s probe then register services
107 p := probe.GetProbeFromContext(ctx)
108 if p != nil {
109 p.RegisterService("voltha")
110 }
111 ofa.events <- ofaEventStart
112
113 /*
114 * Two sub-contexts are created here for different purposes so we can
115 * control the lifecyle of processing loops differently.
116 *
117 * volthaCtx - controls those processes that rely on the GRPC
118 * GRPCconnection to voltha and will be restarted when the
119 * GRPC connection is interrupted.
120 * hdlCtx - controls those processes that listen to channels and
121 * process each message. these will likely never be
122 * stopped until the ofagent is stopped.
123 */
124 var volthaCtx, hdlCtx context.Context
125 var volthaDone, hdlDone func()
126 state := ofaStateDisconnected
127
128 for {
129 select {
130 case <-ctx.Done():
131 if volthaDone != nil {
132 volthaDone()
133 volthaDone = nil
134 }
135 if hdlDone != nil {
136 hdlDone()
137 hdlDone = nil
138 }
139 return
140 case event := <-ofa.events:
141 switch event {
142 case ofaEventStart:
143 logger.Debug("ofagent-voltha-start-event")
144
145 // Start the loops that process messages
146 hdlCtx, hdlDone = context.WithCancel(context.Background())
147 go ofa.handlePacketsIn(hdlCtx)
148 go ofa.handleChangeEvents(hdlCtx)
149
150 // Kick off process to attempt to establish
151 // connection to voltha
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800152 state = ofaStateConnecting
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800153 go ofa.establishConnectionToVoltha(p)
154
155 case ofaEventVolthaConnected:
156 logger.Debug("ofagent-voltha-connect-event")
157
158 // Start the loops that poll from voltha
159 if state != ofaStateConnected {
160 state = ofaStateConnected
161 volthaCtx, volthaDone = context.WithCancel(context.Background())
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800162 // Reconnect clients
163 for _, client := range ofa.clientMap {
164 if logger.V(log.DebugLevel) {
165 logger.Debugw("reset-client-voltha-connection",
166 log.Fields{
167 "from": fmt.Sprintf("0x%p", &client.VolthaClient),
168 "to": fmt.Sprintf("0x%p", &ofa.volthaClient)})
169 }
170 client.VolthaClient = ofa.volthaClient
171 }
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800172 go ofa.receiveChangeEvents(volthaCtx)
173 go ofa.receivePacketsIn(volthaCtx)
174 go ofa.streamPacketOut(volthaCtx)
175 go ofa.synchronizeDeviceList(volthaCtx)
176 }
177
178 case ofaEventVolthaDisconnected:
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800179 if p != nil {
180 p.UpdateStatus("voltha", probe.ServiceStatusNotReady)
181 }
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800182 logger.Debug("ofagent-voltha-disconnect-event")
183 if state == ofaStateConnected {
184 state = ofaStateDisconnected
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800185 ofa.volthaClient = nil
186 for _, client := range ofa.clientMap {
187 client.VolthaClient = nil
188 if logger.V(log.DebugLevel) {
189 logger.Debugw("reset-client-voltha-connection",
190 log.Fields{
191 "from": fmt.Sprintf("0x%p", &client.VolthaClient),
192 "to": "nil"})
193 }
194 }
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800195 volthaDone()
196 volthaDone = nil
197 volthaCtx = nil
198 }
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800199 if state != ofaStateConnecting {
200 state = ofaStateConnecting
201 go ofa.establishConnectionToVoltha(p)
202 }
203
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800204 case ofaEventError:
205 logger.Debug("ofagent-error-event")
206 default:
207 logger.Fatalw("ofagent-unknown-event",
208 log.Fields{"event": event})
209 }
210 }
211 }
212}