blob: ea3b0914a5cb3b923678216026e0590fc4ae7995 [file] [log] [blame]
David K. Bainbridge157bdab2020-01-16 14:38:05 -08001/*
2 Copyright 2020 the original author or authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17package ofagent
18
import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/opencord/ofagent-go/internal/pkg/openflow"
	"github.com/opencord/voltha-lib-go/v3/pkg/log"
	"github.com/opencord/voltha-lib-go/v3/pkg/probe"
	"github.com/opencord/voltha-protos/v3/go/voltha"
	"google.golang.org/grpc"
)
29
30var logger, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
31
// ofaEvent identifies a lifecycle event delivered on the agent's events
// channel.
type ofaEvent byte

// ofaState records whether the agent currently considers itself connected
// to voltha.
type ofaState byte

// The event and state constants share one const block, so iota is NOT reset
// between the two groups: the events take the values 0-3 and the states
// continue at 4 and 5. Neither state is therefore the zero value of ofaState.
const (
	ofaEventStart ofaEvent = iota
	ofaEventVolthaConnected
	ofaEventVolthaDisconnected
	ofaEventError

	ofaStateConnected ofaState = iota
	ofaStateDisconnected
)
44
45type OFAgent struct {
46 VolthaApiEndPoint string
47 OFControllerEndPoint string
48 DeviceListRefreshInterval time.Duration
49 ConnectionMaxRetries int
50 ConnectionRetryDelay time.Duration
51
52 volthaConnection *grpc.ClientConn
53 volthaClient voltha.VolthaServiceClient
54 mapLock sync.Mutex
55 clientMap map[string]*openflow.OFClient
56 events chan ofaEvent
57
58 packetInChannel chan *voltha.PacketIn
59 packetOutChannel chan *voltha.PacketOut
60 changeEventChannel chan *voltha.ChangeEvent
61}
62
63func NewOFAgent(config *OFAgent) (*OFAgent, error) {
64 ofa := OFAgent{
65 VolthaApiEndPoint: config.VolthaApiEndPoint,
66 OFControllerEndPoint: config.OFControllerEndPoint,
67 DeviceListRefreshInterval: config.DeviceListRefreshInterval,
68 ConnectionMaxRetries: config.ConnectionMaxRetries,
69 ConnectionRetryDelay: config.ConnectionRetryDelay,
70 packetInChannel: make(chan *voltha.PacketIn),
71 packetOutChannel: make(chan *voltha.PacketOut),
72 changeEventChannel: make(chan *voltha.ChangeEvent),
73 clientMap: make(map[string]*openflow.OFClient),
74 events: make(chan ofaEvent, 100),
75 }
76
77 if ofa.DeviceListRefreshInterval <= 0 {
78 logger.Warnw("device list refresh internal not valid, setting to default",
79 log.Fields{
80 "value": ofa.DeviceListRefreshInterval.String(),
81 "default": (1 * time.Minute).String()})
82 ofa.DeviceListRefreshInterval = 1 * time.Minute
83 }
84
85 if ofa.ConnectionRetryDelay <= 0 {
86 logger.Warnw("connection retry delay not value, setting to default",
87 log.Fields{
88 "value": ofa.ConnectionRetryDelay.String(),
89 "default": (3 * time.Second).String()})
90 ofa.ConnectionRetryDelay = 3 * time.Second
91 }
92
93 return &ofa, nil
94}
95
96// Run - make the inital connection to voltha and kicks off io streams
97func (ofa *OFAgent) Run(ctx context.Context) {
98
99 logger.Debugw("Starting GRPC - VOLTHA client",
100 log.Fields{
101 "voltha-endpoint": ofa.VolthaApiEndPoint,
102 "controller-endpoint": ofa.OFControllerEndPoint})
103
104 // If the context contains a k8s probe then register services
105 p := probe.GetProbeFromContext(ctx)
106 if p != nil {
107 p.RegisterService("voltha")
108 }
109 ofa.events <- ofaEventStart
110
111 /*
112 * Two sub-contexts are created here for different purposes so we can
113 * control the lifecyle of processing loops differently.
114 *
115 * volthaCtx - controls those processes that rely on the GRPC
116 * GRPCconnection to voltha and will be restarted when the
117 * GRPC connection is interrupted.
118 * hdlCtx - controls those processes that listen to channels and
119 * process each message. these will likely never be
120 * stopped until the ofagent is stopped.
121 */
122 var volthaCtx, hdlCtx context.Context
123 var volthaDone, hdlDone func()
124 state := ofaStateDisconnected
125
126 for {
127 select {
128 case <-ctx.Done():
129 if volthaDone != nil {
130 volthaDone()
131 volthaDone = nil
132 }
133 if hdlDone != nil {
134 hdlDone()
135 hdlDone = nil
136 }
137 return
138 case event := <-ofa.events:
139 switch event {
140 case ofaEventStart:
141 logger.Debug("ofagent-voltha-start-event")
142
143 // Start the loops that process messages
144 hdlCtx, hdlDone = context.WithCancel(context.Background())
145 go ofa.handlePacketsIn(hdlCtx)
146 go ofa.handleChangeEvents(hdlCtx)
147
148 // Kick off process to attempt to establish
149 // connection to voltha
150 go ofa.establishConnectionToVoltha(p)
151
152 case ofaEventVolthaConnected:
153 logger.Debug("ofagent-voltha-connect-event")
154
155 // Start the loops that poll from voltha
156 if state != ofaStateConnected {
157 state = ofaStateConnected
158 volthaCtx, volthaDone = context.WithCancel(context.Background())
159 go ofa.receiveChangeEvents(volthaCtx)
160 go ofa.receivePacketsIn(volthaCtx)
161 go ofa.streamPacketOut(volthaCtx)
162 go ofa.synchronizeDeviceList(volthaCtx)
163 }
164
165 case ofaEventVolthaDisconnected:
166 logger.Debug("ofagent-voltha-disconnect-event")
167 if state == ofaStateConnected {
168 state = ofaStateDisconnected
169 volthaDone()
170 volthaDone = nil
171 volthaCtx = nil
172 }
173 case ofaEventError:
174 logger.Debug("ofagent-error-event")
175 default:
176 logger.Fatalw("ofagent-unknown-event",
177 log.Fields{"event": event})
178 }
179 }
180 }
181}