blob: c81fd5822f076615b5f2acb82c6bf38a18d3fecf [file] [log] [blame]
David K. Bainbridge157bdab2020-01-16 14:38:05 -08001/*
2 Copyright 2020 the original author or authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17package ofagent
18
19import (
20 "context"
David K. Bainbridge9cb404e2020-01-28 14:32:29 -080021 "fmt"
David K. Bainbridge157bdab2020-01-16 14:38:05 -080022 "github.com/opencord/ofagent-go/internal/pkg/openflow"
David K. Bainbridgeaea73cd2020-01-27 10:44:50 -080023 "github.com/opencord/voltha-lib-go/v3/pkg/log"
24 "github.com/opencord/voltha-lib-go/v3/pkg/probe"
25 "github.com/opencord/voltha-protos/v3/go/voltha"
David K. Bainbridge157bdab2020-01-16 14:38:05 -080026 "google.golang.org/grpc"
27 "sync"
28 "time"
29)
30
31var logger, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
32
33type ofaEvent byte
34type ofaState byte
35
36const (
37 ofaEventStart = ofaEvent(iota)
38 ofaEventVolthaConnected
39 ofaEventVolthaDisconnected
40 ofaEventError
41
42 ofaStateConnected = ofaState(iota)
David K. Bainbridge9cb404e2020-01-28 14:32:29 -080043 ofaStateConnecting
David K. Bainbridge157bdab2020-01-16 14:38:05 -080044 ofaStateDisconnected
45)
46
47type OFAgent struct {
48 VolthaApiEndPoint string
49 OFControllerEndPoint string
50 DeviceListRefreshInterval time.Duration
51 ConnectionMaxRetries int
52 ConnectionRetryDelay time.Duration
53
54 volthaConnection *grpc.ClientConn
55 volthaClient voltha.VolthaServiceClient
56 mapLock sync.Mutex
57 clientMap map[string]*openflow.OFClient
58 events chan ofaEvent
59
60 packetInChannel chan *voltha.PacketIn
61 packetOutChannel chan *voltha.PacketOut
62 changeEventChannel chan *voltha.ChangeEvent
63}
64
65func NewOFAgent(config *OFAgent) (*OFAgent, error) {
66 ofa := OFAgent{
67 VolthaApiEndPoint: config.VolthaApiEndPoint,
68 OFControllerEndPoint: config.OFControllerEndPoint,
69 DeviceListRefreshInterval: config.DeviceListRefreshInterval,
70 ConnectionMaxRetries: config.ConnectionMaxRetries,
71 ConnectionRetryDelay: config.ConnectionRetryDelay,
72 packetInChannel: make(chan *voltha.PacketIn),
73 packetOutChannel: make(chan *voltha.PacketOut),
74 changeEventChannel: make(chan *voltha.ChangeEvent),
75 clientMap: make(map[string]*openflow.OFClient),
76 events: make(chan ofaEvent, 100),
77 }
78
79 if ofa.DeviceListRefreshInterval <= 0 {
80 logger.Warnw("device list refresh internal not valid, setting to default",
81 log.Fields{
82 "value": ofa.DeviceListRefreshInterval.String(),
83 "default": (1 * time.Minute).String()})
84 ofa.DeviceListRefreshInterval = 1 * time.Minute
85 }
86
87 if ofa.ConnectionRetryDelay <= 0 {
88 logger.Warnw("connection retry delay not value, setting to default",
89 log.Fields{
90 "value": ofa.ConnectionRetryDelay.String(),
91 "default": (3 * time.Second).String()})
92 ofa.ConnectionRetryDelay = 3 * time.Second
93 }
94
95 return &ofa, nil
96}
97
98// Run - make the inital connection to voltha and kicks off io streams
99func (ofa *OFAgent) Run(ctx context.Context) {
100
101 logger.Debugw("Starting GRPC - VOLTHA client",
102 log.Fields{
103 "voltha-endpoint": ofa.VolthaApiEndPoint,
104 "controller-endpoint": ofa.OFControllerEndPoint})
105
106 // If the context contains a k8s probe then register services
107 p := probe.GetProbeFromContext(ctx)
108 if p != nil {
109 p.RegisterService("voltha")
110 }
111 ofa.events <- ofaEventStart
112
113 /*
114 * Two sub-contexts are created here for different purposes so we can
115 * control the lifecyle of processing loops differently.
116 *
117 * volthaCtx - controls those processes that rely on the GRPC
118 * GRPCconnection to voltha and will be restarted when the
119 * GRPC connection is interrupted.
120 * hdlCtx - controls those processes that listen to channels and
121 * process each message. these will likely never be
122 * stopped until the ofagent is stopped.
123 */
124 var volthaCtx, hdlCtx context.Context
125 var volthaDone, hdlDone func()
126 state := ofaStateDisconnected
127
128 for {
129 select {
130 case <-ctx.Done():
131 if volthaDone != nil {
132 volthaDone()
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800133 }
134 if hdlDone != nil {
135 hdlDone()
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800136 }
137 return
138 case event := <-ofa.events:
139 switch event {
140 case ofaEventStart:
141 logger.Debug("ofagent-voltha-start-event")
142
143 // Start the loops that process messages
144 hdlCtx, hdlDone = context.WithCancel(context.Background())
145 go ofa.handlePacketsIn(hdlCtx)
146 go ofa.handleChangeEvents(hdlCtx)
147
148 // Kick off process to attempt to establish
149 // connection to voltha
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800150 state = ofaStateConnecting
David K. Bainbridgecac73ac2020-02-19 07:00:12 -0800151 go func() {
152 if err := ofa.establishConnectionToVoltha(p); err != nil {
153 logger.Errorw("voltha-connection-failed", log.Fields{"error": err})
154 panic(err)
155 }
156 }()
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800157
158 case ofaEventVolthaConnected:
159 logger.Debug("ofagent-voltha-connect-event")
160
161 // Start the loops that poll from voltha
162 if state != ofaStateConnected {
163 state = ofaStateConnected
164 volthaCtx, volthaDone = context.WithCancel(context.Background())
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800165 // Reconnect clients
166 for _, client := range ofa.clientMap {
167 if logger.V(log.DebugLevel) {
168 logger.Debugw("reset-client-voltha-connection",
169 log.Fields{
170 "from": fmt.Sprintf("0x%p", &client.VolthaClient),
171 "to": fmt.Sprintf("0x%p", &ofa.volthaClient)})
172 }
173 client.VolthaClient = ofa.volthaClient
174 }
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800175 go ofa.receiveChangeEvents(volthaCtx)
176 go ofa.receivePacketsIn(volthaCtx)
177 go ofa.streamPacketOut(volthaCtx)
178 go ofa.synchronizeDeviceList(volthaCtx)
179 }
180
181 case ofaEventVolthaDisconnected:
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800182 if p != nil {
183 p.UpdateStatus("voltha", probe.ServiceStatusNotReady)
184 }
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800185 logger.Debug("ofagent-voltha-disconnect-event")
186 if state == ofaStateConnected {
187 state = ofaStateDisconnected
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800188 ofa.volthaClient = nil
189 for _, client := range ofa.clientMap {
190 client.VolthaClient = nil
191 if logger.V(log.DebugLevel) {
192 logger.Debugw("reset-client-voltha-connection",
193 log.Fields{
194 "from": fmt.Sprintf("0x%p", &client.VolthaClient),
195 "to": "nil"})
196 }
197 }
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800198 volthaDone()
199 volthaDone = nil
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800200 }
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800201 if state != ofaStateConnecting {
202 state = ofaStateConnecting
David K. Bainbridgecac73ac2020-02-19 07:00:12 -0800203 go func() {
204 if err := ofa.establishConnectionToVoltha(p); err != nil {
205 log.Errorw("voltha-connection-failed", log.Fields{"error": err})
206 panic(err)
207 }
208 }()
David K. Bainbridge9cb404e2020-01-28 14:32:29 -0800209 }
210
David K. Bainbridge157bdab2020-01-16 14:38:05 -0800211 case ofaEventError:
212 logger.Debug("ofagent-error-event")
213 default:
214 logger.Fatalw("ofagent-unknown-event",
215 log.Fields{"event": event})
216 }
217 }
218 }
219}