blob: 1037cb965947585fe7545e85c517338d648b50a2 [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package core
17
18import (
19 "context"
20 "crypto/tls"
21 "github.com/golang/protobuf/ptypes/empty"
22 "github.com/google/gopacket"
23 "github.com/google/uuid"
24 "github.com/opencord/voltha/ponsim/v2/common"
25 "github.com/opencord/voltha/protos/go/ponsim"
26 "github.com/sirupsen/logrus"
27 "google.golang.org/grpc"
28 "google.golang.org/grpc/credentials"
29 "strconv"
30 "strings"
31 "sync"
32 "time"
33)
34
35// TODO: Cleanup GRPC security config
36// TODO: Pass-in the certificate information as a structure parameter
37
38/*
39PonSimOnuDevice is the structure responsible for the handling of an ONU device
40*/
41type PonSimOnuDevice struct {
42 PonSimDevice
43
44 ParentAddress string
45 ParentPort int32
46 AssignedPort int32
47 Conn *grpc.ClientConn
48
49 oltClient ponsim.PonSimCommonClient
50 stream ponsim.PonSimCommon_ProcessDataClient
51 monitor chan PonSimDeviceState
52 state PonSimDeviceState
53}
54
55/*
56NewPonSimOnuDevice instantiates a new ONU device structure
57*/
58func NewPonSimOnuDevice(device PonSimDevice) *PonSimOnuDevice {
59 onu := &PonSimOnuDevice{PonSimDevice: device}
60
61 return onu
62}
63
64/*
65forwardToOLT defines a INGRESS function to forward a packet to the parent OLT
66*/
67func (o *PonSimOnuDevice) forwardToOLT() func(int, gopacket.Packet) {
68 return func(port int, frame gopacket.Packet) {
69 ipAddress := common.GetInterfaceIP(o.InternalIf)
70 incoming := &ponsim.IncomingData{
71 Id: "INGRESS.ONU." + ipAddress,
72 Address: ipAddress,
73 Port: int32(port),
74 Payload: frame.Data(),
75 }
76 common.Logger().WithFields(logrus.Fields{
77 "device": o,
78 "port": port,
79 "frame": frame,
80 "frameDump": frame.Dump(),
81 "incoming": incoming,
82 }).Debug("Forwarding to OLT")
83
84 // Forward packet to OLT
85 if err := o.stream.Send(incoming); err != nil {
86 common.Logger().WithFields(logrus.Fields{
87 "device": o,
88 "port": port,
89 "frameDump": frame.Dump(),
90 "incoming": incoming,
91 }).Fatal("A problem occurred while forwarding to OLT")
92 }
93 }
94}
95
96/*
97forwardToWAN defines a EGRESS function to forward a packet to the world
98*/
99func (o *PonSimOnuDevice) forwardToWAN() func(int, gopacket.Packet) {
100 return func(port int, frame gopacket.Packet) {
101 var err error
102 common.Logger().WithFields(logrus.Fields{
103 "device": o,
104 "port": port,
105 "frame": frame,
106 }).Debug("Forwarding packet to world")
107 if err = o.ingressHandler.WritePacketData(frame.Data()); err != nil {
108 common.Logger().WithFields(logrus.Fields{
109 "device": o,
110 "port": port,
111 "frame": frame,
112 }).Fatal("Problem while forwarding packet to world")
113 } else {
114 common.Logger().WithFields(logrus.Fields{
115 "device": o,
116 "port": port,
117 "frame": frame,
118 }).Debug("Forwarded packet to world")
119 }
120 }
121}
122
123/*
124Start performs setup operations for an ONU device
125*/
126func (o *PonSimOnuDevice) Start(ctx context.Context) {
127 // Initialize the parent
128 o.PonSimDevice.Start(ctx)
129
130 // Setup flow behaviours
131 // ONU -> OLT
132 o.AddLink(1, 0, o.forwardToOLT())
133 // ONU -> World
134 o.AddLink(2, 0, o.forwardToWAN())
135
136 go o.MonitorConnection(ctx)
137}
138
139/*
140Stop performs cleanup operations for an ONU device
141*/
142func (o *PonSimOnuDevice) Stop(ctx context.Context) {
143 common.Logger().WithFields(logrus.Fields{
144 "device": o,
145 }).Debug("Stopping ONU")
146
147 o.RemoveLink(1, 0)
148 o.RemoveLink(2, 0)
149
150 o.PonSimDevice.Stop(ctx)
151}
152
153/*
154Listen waits for incoming INGRESS data on the external interface
155*/
156func (o *PonSimOnuDevice) Listen(ctx context.Context) {
157 var reply *empty.Empty
158 var err error
159
160 if o.oltClient = ponsim.NewPonSimCommonClient(o.Conn); o.oltClient == nil {
161 common.Logger().WithFields(logrus.Fields{
162 "device": o,
163 }).Fatal("Problem establishing client connection to OLT")
164 panic("Problem establishing client connection to OLT")
165 }
166
167 // Establish GRPC connection with OLT
168 if o.stream, err = o.oltClient.ProcessData(ctx); err != nil {
169 common.Logger().WithFields(logrus.Fields{
170 "device": o,
171 "error": err.Error(),
172 }).Fatal("Problem establishing stream")
173 panic(err)
174 }
175
176 defer o.ingressHandler.Close()
177 packetSource := gopacket.NewPacketSource(o.ingressHandler, o.ingressHandler.LinkType())
178 common.Logger().WithFields(logrus.Fields{
179 "device": o,
180 "interface": o.ExternalIf,
181 }).Debug("Listening to incoming ONU data")
182
183 for packet := range packetSource.Packets() {
184 common.Logger().WithFields(logrus.Fields{
185 "device": o,
186 "packet": packet,
187 }).Debug("Received INGRESS packet")
188
189 o.Forward(ctx, 2, packet)
190 }
191
192 common.Logger().WithFields(logrus.Fields{
193 "device": o,
194 }).Debug("No more packets to process")
195
196 if reply, err = o.stream.CloseAndRecv(); err != nil {
197 common.Logger().Fatal("A problem occurred while closing Ingress stream", err.Error())
198 } else {
199 common.Logger().Info("Ingress stream closed", reply)
200 }
201}
202
203/*
204Register sends a registration request to the remote OLT
205*/
206func (o *PonSimOnuDevice) Register(ctx context.Context) error {
207 var err error
208 var rreq *ponsim.RegistrationRequest
209 var rrep *ponsim.RegistrationReply
210 var client ponsim.PonSimOltClient
211
212 if o.Conn != nil {
213 if client = ponsim.NewPonSimOltClient(o.Conn); client != nil {
214 rreq = &ponsim.RegistrationRequest{
215 Id: uuid.New().String(),
216 Address: common.GetInterfaceIP(o.InternalIf),
217 Port: o.Port,
218 }
219 common.Logger().Printf("Request details %+v\n", rreq)
220
221 // TODO: Loop registration until an OLT becomes available??
222
223 rrep, err = client.Register(ctx, rreq)
224 if err != nil {
225 common.Logger().Printf("Problem with registration", err.Error())
226 } else {
227 // Save OLT address details
228 o.ParentAddress = rrep.GetParentAddress()
229 o.ParentPort = rrep.GetParentPort()
230 o.AssignedPort = rrep.GetAssignedPort()
231
232 common.Logger().Printf("Registration details - %+v\n", rrep)
233
234 o.monitor <- REGISTERED_WITH_OLT
235 }
236
237 } else {
238 common.Logger().Info("Client is NIL")
239 }
240 }
241
242 return err
243}
244
245/*
246MonitorConnection verifies the communication with the OLT
247*/
248func (o *PonSimOnuDevice) MonitorConnection(ctx context.Context) {
249 for {
250 if o.state == DISCONNECTED_FROM_PON {
251 // Establish communication with OLT
252 o.Connect(ctx)
253 }
254
255 if o.state == CONNECTED_TO_PON {
256 // Just stay idle while the ONU-OLT connection is up
257 o.Conn.WaitForStateChange(ctx, o.Conn.GetState())
258
259 // The ONU-OLT connection was lost... need to cleanup
260 o.Disconnect(ctx)
261 }
262
263 time.Sleep(1 * time.Second)
264 }
265}
266
267/*
268Connect sets up communication and monitoring with remote OLT
269*/
270func (o *PonSimOnuDevice) Connect(ctx context.Context) {
271 o.monitor = make(chan PonSimDeviceState, 1)
272
273 // Define a waitgroup to block the current routine until
274 // a CONNECTED state is reached
275 wg := sync.WaitGroup{}
276 wg.Add(1)
277
278 go o.MonitorState(ctx, &wg)
279
280 o.ConnectToRemoteOlt()
281
282 // Wait until we establish a connection to the remote PON
283 wg.Wait()
284}
285
286/*
287Disconnect tears down communication and monitoring with remote OLT
288*/
289func (o *PonSimOnuDevice) Disconnect(ctx context.Context) {
290 if o.egressHandler != nil {
291 o.egressHandler.Close()
292 o.egressHandler = nil
293 }
294
295 if o.Conn != nil {
296 o.Conn.Close()
297 o.Conn = nil
298 }
299
300 if o.monitor != nil {
301 close(o.monitor)
302 o.monitor = nil
303 o.state = DISCONNECTED_FROM_PON
304 }
305}
306
307/*
308MonitorState follows the progress of the OLT connection
309*/
310func (o *PonSimOnuDevice) MonitorState(ctx context.Context, wg *sync.WaitGroup) {
311 // Start a concurrent routine to handle ONU state changes
312 var ok bool
313 for {
314 select {
315 case o.state, ok = <-o.monitor:
316 if ok {
317 common.Logger().WithFields(logrus.Fields{
318 "device": o,
319 "state": o.state,
320 }).Info("Received monitoring state")
321
322 switch o.state {
323 case CONNECTED_TO_PON:
324 // We have successfully connected to the OLT
325 // proceed with registration
326 wg.Done()
327
328 if err := o.Register(ctx); err != nil {
329 o.Disconnect(ctx)
330 }
331
332 case DISCONNECTED_FROM_PON:
333 // Connection to remote OLT was lost... exit
334 common.Logger().WithFields(logrus.Fields{
335 "device": o,
336 }).Warn("Exiting due to disconnection")
337 return
338
339 case REGISTERED_WITH_OLT:
340 // Start listening on network interfaces
341 o.connectNetworkInterfaces()
342 o.monitor <- CONNECTED_IO_INTERFACE
343
344 case CONNECTED_IO_INTERFACE:
345 // Start listening on local interfaces
346 go o.Listen(ctx)
347 }
348 } else {
349 common.Logger().WithFields(logrus.Fields{
350 "device": o,
351 }).Warn("Monitoring channel has closed")
352 return
353 }
354 case <-ctx.Done():
355 common.Logger().WithFields(logrus.Fields{
356 "device": o,
357 }).Warn("Received a cancellation notification")
358
359 return
360 }
361 }
362}
363
364/*
365ConnectToRemoteOlt establishes GRPC communication with the remote OLT
366*/
367func (o *PonSimOnuDevice) ConnectToRemoteOlt() {
368 common.Logger().WithFields(logrus.Fields{
369 "device": o,
370 }).Debug("Connecting to remote device")
371
372 var err error
373
374 host := strings.Join([]string{
375 o.ParentAddress,
376 strconv.Itoa(int(o.ParentPort)),
377 }, ":")
378
379 // TODO: make it secure
380 // GRPC communication needs to be secured
381 ta := credentials.NewTLS(&tls.Config{
382 //Certificates: []tls.Certificate{peerCert},
383 //RootCAs: caCertPool,
384 InsecureSkipVerify: true,
385 })
386
387 if o.Conn, err = grpc.DialContext(
388 context.Background(), host, grpc.WithTransportCredentials(ta), grpc.WithBlock(),
389 ); err != nil {
390 common.Logger().WithFields(logrus.Fields{
391 "device": o,
392 "error": err.Error(),
393 }).Error("Problem establishing connection")
394 } else {
395 // We are now connected
396 // time to move on
397 common.Logger().WithFields(logrus.Fields{
398 "device": o,
399 }).Info("Connected to OLT")
400 }
401
402 o.monitor <- CONNECTED_TO_PON
403}