blob: f58d473fcefca405cf6f8800e877a9156fb0f11f [file] [log] [blame]
Stephane Barbarie35595062018-02-08 08:34:39 -05001package core
2
3import (
4 "context"
5 "crypto/tls"
6 "github.com/golang/protobuf/ptypes/empty"
7 "github.com/google/gopacket"
8 "github.com/google/uuid"
9 "github.com/opencord/voltha/ponsim/v2/common"
10 "github.com/opencord/voltha/protos/go/ponsim"
11 "github.com/sirupsen/logrus"
12 "google.golang.org/grpc"
13 "google.golang.org/grpc/credentials"
14 "strconv"
15 "strings"
16 "sync"
17 "time"
18)
19
20// TODO: Cleanup GRPC security config
21// TODO: Pass-in the certificate information as a structure parameter
22
23/*
24PonSimOnuDevice is the structure responsible for the handling of an ONU device
25*/
26type PonSimOnuDevice struct {
27 PonSimDevice
28
29 ParentAddress string
30 ParentPort int32
31 AssignedPort int32
32 Conn *grpc.ClientConn
33
34 oltClient ponsim.PonSimCommonClient
35 stream ponsim.PonSimCommon_ProcessDataClient
36 monitor chan PonSimDeviceState
37 state PonSimDeviceState
38}
39
40/*
41NewPonSimOnuDevice instantiates a new ONU device structure
42*/
43func NewPonSimOnuDevice(device PonSimDevice) *PonSimOnuDevice {
44 onu := &PonSimOnuDevice{PonSimDevice: device}
45
46 return onu
47}
48
49/*
50forwardToOLT defines a INGRESS function to forward a packet to the parent OLT
51*/
52func (o *PonSimOnuDevice) forwardToOLT() func(int, gopacket.Packet) {
53 return func(port int, frame gopacket.Packet) {
54 ipAddress := common.GetInterfaceIP(o.InternalIf)
55 incoming := &ponsim.IncomingData{
56 Id: "INGRESS.ONU." + ipAddress,
57 Address: ipAddress,
58 Port: int32(port),
59 Payload: frame.Data(),
60 }
61 common.Logger().WithFields(logrus.Fields{
62 "device": o,
63 "port": port,
64 "frame": frame,
65 "frameDump": frame.Dump(),
66 "incoming": incoming,
67 }).Debug("Forwarding to OLT")
68
69 // Forward packet to OLT
70 if err := o.stream.Send(incoming); err != nil {
71 common.Logger().WithFields(logrus.Fields{
72 "device": o,
73 "port": port,
74 "frameDump": frame.Dump(),
75 "incoming": incoming,
76 }).Fatal("A problem occurred while forwarding to OLT")
77 }
78 }
79}
80
81/*
82forwardToWAN defines a EGRESS function to forward a packet to the world
83*/
84func (o *PonSimOnuDevice) forwardToWAN() func(int, gopacket.Packet) {
85 return func(port int, frame gopacket.Packet) {
86 var err error
87 common.Logger().WithFields(logrus.Fields{
88 "device": o,
89 "port": port,
90 "frame": frame,
91 }).Debug("Forwarding packet to world")
92 if err = o.ingressHandler.WritePacketData(frame.Data()); err != nil {
93 common.Logger().WithFields(logrus.Fields{
94 "device": o,
95 "port": port,
96 "frame": frame,
97 }).Fatal("Problem while forwarding packet to world")
98 } else {
99 common.Logger().WithFields(logrus.Fields{
100 "device": o,
101 "port": port,
102 "frame": frame,
103 }).Debug("Forwarded packet to world")
104 }
105 }
106}
107
108/*
109Start performs setup operations for an ONU device
110*/
111func (o *PonSimOnuDevice) Start(ctx context.Context) {
112 // Initialize the parent
113 o.PonSimDevice.Start(ctx)
114
115 // Setup flow behaviours
116 // ONU -> OLT
117 o.AddLink(1, 0, o.forwardToOLT())
118 // ONU -> World
119 o.AddLink(2, 0, o.forwardToWAN())
120
121 go o.MonitorConnection(ctx)
122}
123
124/*
125Stop performs cleanup operations for an ONU device
126*/
127func (o *PonSimOnuDevice) Stop(ctx context.Context) {
128 common.Logger().WithFields(logrus.Fields{
129 "device": o,
130 }).Debug("Stopping ONU")
131
132 o.RemoveLink(1, 0)
133 o.RemoveLink(2, 0)
134
135 o.PonSimDevice.Stop(ctx)
136}
137
138/*
139Listen waits for incoming INGRESS data on the external interface
140*/
141func (o *PonSimOnuDevice) Listen(ctx context.Context) {
142 var reply *empty.Empty
143 var err error
144
145 if o.oltClient = ponsim.NewPonSimCommonClient(o.Conn); o.oltClient == nil {
146 common.Logger().WithFields(logrus.Fields{
147 "device": o,
148 }).Fatal("Problem establishing client connection to OLT")
149 panic("Problem establishing client connection to OLT")
150 }
151
152 // Establish GRPC connection with OLT
153 if o.stream, err = o.oltClient.ProcessData(ctx); err != nil {
154 common.Logger().WithFields(logrus.Fields{
155 "device": o,
156 "error": err.Error(),
157 }).Fatal("Problem establishing stream")
158 panic(err)
159 }
160
161 defer o.ingressHandler.Close()
162 packetSource := gopacket.NewPacketSource(o.ingressHandler, o.ingressHandler.LinkType())
163 common.Logger().WithFields(logrus.Fields{
164 "device": o,
165 "interface": o.ExternalIf,
166 }).Debug("Listening to incoming ONU data")
167
168 for packet := range packetSource.Packets() {
169 common.Logger().WithFields(logrus.Fields{
170 "device": o,
171 "packet": packet,
172 }).Debug("Received INGRESS packet")
173
174 o.Forward(ctx, 2, packet)
175 }
176
177 common.Logger().WithFields(logrus.Fields{
178 "device": o,
179 }).Debug("No more packets to process")
180
181 if reply, err = o.stream.CloseAndRecv(); err != nil {
182 common.Logger().Fatal("A problem occurred while closing Ingress stream", err.Error())
183 } else {
184 common.Logger().Info("Ingress stream closed", reply)
185 }
186}
187
188/*
189Register sends a registration request to the remote OLT
190*/
191func (o *PonSimOnuDevice) Register(ctx context.Context) error {
192 var err error
193 var rreq *ponsim.RegistrationRequest
194 var rrep *ponsim.RegistrationReply
195 var client ponsim.PonSimOltClient
196
197 if o.Conn != nil {
198 if client = ponsim.NewPonSimOltClient(o.Conn); client != nil {
199 rreq = &ponsim.RegistrationRequest{
200 Id: uuid.New().String(),
201 Address: common.GetInterfaceIP(o.InternalIf),
202 Port: o.Port,
203 }
204 common.Logger().Printf("Request details %+v\n", rreq)
205
206 // TODO: Loop registration until an OLT becomes available??
207
208 rrep, err = client.Register(ctx, rreq)
209 if err != nil {
210 common.Logger().Printf("Problem with registration", err.Error())
211 } else {
212 // Save OLT address details
213 o.ParentAddress = rrep.GetParentAddress()
214 o.ParentPort = rrep.GetParentPort()
215 o.AssignedPort = rrep.GetAssignedPort()
216
217 common.Logger().Printf("Registration details - %+v\n", rrep)
218
219 o.monitor <- REGISTERED_WITH_OLT
220 }
221
222 } else {
223 common.Logger().Info("Client is NIL")
224 }
225 }
226
227 return err
228}
229
230/*
231MonitorConnection verifies the communication with the OLT
232*/
233func (o *PonSimOnuDevice) MonitorConnection(ctx context.Context) {
234 for {
235 if o.state == DISCONNECTED_FROM_PON {
236 // Establish communication with OLT
237 o.Connect(ctx)
238 }
239
240 if o.state == CONNECTED_TO_PON {
241 // Just stay idle while the ONU-OLT connection is up
242 o.Conn.WaitForStateChange(ctx, o.Conn.GetState())
243
244 // The ONU-OLT connection was lost... need to cleanup
245 o.Disconnect(ctx)
246 }
247
248 time.Sleep(1 * time.Second)
249 }
250}
251
252/*
253Connect sets up communication and monitoring with remote OLT
254*/
255func (o *PonSimOnuDevice) Connect(ctx context.Context) {
256 o.monitor = make(chan PonSimDeviceState, 1)
257
258 // Define a waitgroup to block the current routine until
259 // a CONNECTED state is reached
260 wg := sync.WaitGroup{}
261 wg.Add(1)
262
263 go o.MonitorState(ctx, &wg)
264
265 o.ConnectToRemoteOlt()
266
267 // Wait until we establish a connection to the remote PON
268 wg.Wait()
269}
270
271/*
272Disconnect tears down communication and monitoring with remote OLT
273*/
274func (o *PonSimOnuDevice) Disconnect(ctx context.Context) {
275 if o.egressHandler != nil {
276 o.egressHandler.Close()
277 o.egressHandler = nil
278 }
279
280 if o.Conn != nil {
281 o.Conn.Close()
282 o.Conn = nil
283 }
284
285 if o.monitor != nil {
286 close(o.monitor)
287 o.monitor = nil
288 o.state = DISCONNECTED_FROM_PON
289 }
290}
291
292/*
293MonitorState follows the progress of the OLT connection
294*/
295func (o *PonSimOnuDevice) MonitorState(ctx context.Context, wg *sync.WaitGroup) {
296 // Start a concurrent routine to handle ONU state changes
297 var ok bool
298 for {
299 select {
300 case o.state, ok = <-o.monitor:
301 if ok {
302 common.Logger().WithFields(logrus.Fields{
303 "device": o,
304 "state": o.state,
305 }).Info("Received monitoring state")
306
307 switch o.state {
308 case CONNECTED_TO_PON:
309 // We have successfully connected to the OLT
310 // proceed with registration
311 wg.Done()
312
313 if err := o.Register(ctx); err != nil {
314 o.Disconnect(ctx)
315 }
316
317 case DISCONNECTED_FROM_PON:
318 // Connection to remote OLT was lost... exit
319 common.Logger().WithFields(logrus.Fields{
320 "device": o,
321 }).Warn("Exiting due to disconnection")
322 return
323
324 case REGISTERED_WITH_OLT:
325 // Start listening on network interfaces
326 o.connectNetworkInterfaces()
327 o.monitor <- CONNECTED_IO_INTERFACE
328
329 case CONNECTED_IO_INTERFACE:
330 // Start listening on local interfaces
331 go o.Listen(ctx)
332 }
333 } else {
334 common.Logger().WithFields(logrus.Fields{
335 "device": o,
336 }).Warn("Monitoring channel has closed")
337 return
338 }
339 case <-ctx.Done():
340 common.Logger().WithFields(logrus.Fields{
341 "device": o,
342 }).Warn("Received a cancellation notification")
343
344 return
345 }
346 }
347}
348
349/*
350ConnectToRemoteOlt establishes GRPC communication with the remote OLT
351*/
352func (o *PonSimOnuDevice) ConnectToRemoteOlt() {
353 common.Logger().WithFields(logrus.Fields{
354 "device": o,
355 }).Debug("Connecting to remote device")
356
357 var err error
358
359 host := strings.Join([]string{
360 o.ParentAddress,
361 strconv.Itoa(int(o.ParentPort)),
362 }, ":")
363
364 // TODO: make it secure
365 // GRPC communication needs to be secured
366 ta := credentials.NewTLS(&tls.Config{
367 //Certificates: []tls.Certificate{peerCert},
368 //RootCAs: caCertPool,
369 InsecureSkipVerify: true,
370 })
371
372 if o.Conn, err = grpc.DialContext(
373 context.Background(), host, grpc.WithTransportCredentials(ta), grpc.WithBlock(),
374 ); err != nil {
375 common.Logger().WithFields(logrus.Fields{
376 "device": o,
377 "error": err.Error(),
378 }).Error("Problem establishing connection")
379 } else {
380 // We are now connected
381 // time to move on
382 common.Logger().WithFields(logrus.Fields{
383 "device": o,
384 }).Info("Connected to OLT")
385 }
386
387 o.monitor <- CONNECTED_TO_PON
388}