blob: 2485a6752bfd0914adb64cef8db566706468b9d5 [file] [log] [blame]
Stephane Barbarie35595062018-02-08 08:34:39 -05001package core
2
3import (
4 "context"
5 "crypto/tls"
6 "github.com/golang/protobuf/ptypes/empty"
7 "github.com/google/gopacket"
8 "github.com/opencord/voltha/ponsim/v2/common"
9 "github.com/opencord/voltha/protos/go/ponsim"
10 "github.com/sirupsen/logrus"
11 "google.golang.org/grpc"
12 "google.golang.org/grpc/connectivity"
13 "google.golang.org/grpc/credentials"
14 "strconv"
15 "strings"
16 "time"
17)
18
19// TODO: Pass-in the certificate information as a structure parameter
20
21/*
22PonSimOltDevice is the structure responsible for the handling of an OLT device
23*/
24type PonSimOltDevice struct {
25 PonSimDevice `json:pon_device`
26 VCoreEndpoint string `json:vcore_ep`
27 MaxOnuCount int `json:max_onu`
28 Onus map[int32]*OnuRegistree `json:onu_registrees`
29 outgoing chan []byte
30
31 counterLoop *common.IntervalHandler
32 alarmLoop *common.IntervalHandler
33}
34
35/*
36
37 */
38type OnuRegistree struct {
39 Device *PonSimOnuDevice `json:onu_device`
40 Conn *grpc.ClientConn `json:grpc_conn`
41 Client ponsim.PonSimCommonClient `json:client`
42 Stream ponsim.PonSimCommon_ProcessDataClient `json:stream`
43}
44
45const (
46 BASE_PORT_NUMBER = 128
47)
48
49/*
50NewPonSimOltDevice instantiates a new OLT device structure
51*/
52func NewPonSimOltDevice(device PonSimDevice) *PonSimOltDevice {
53 olt := &PonSimOltDevice{PonSimDevice: device}
54 return olt
55}
56
57/*
58forwardToONU defines a EGRESS function to forward a packet to a specific ONU
59*/
60func (o *PonSimOltDevice) forwardToONU(onuPort int32) func(int, gopacket.Packet) {
61 return func(port int, frame gopacket.Packet) {
62 ipAddress := common.GetInterfaceIP(o.ExternalIf)
63 incoming := &ponsim.IncomingData{
64 Id: "EGRESS.OLT." + ipAddress,
65 Address: ipAddress,
66 Port: int32(port),
67 Payload: frame.Data(),
68 }
69 common.Logger().WithFields(logrus.Fields{
70 "device": o,
71 "port": port,
72 "frame": frame,
73 }).Debug("Forwarding to ONU")
74
75 // Forward packet to ONU
76 if err := o.GetOnu(onuPort).Stream.Send(incoming); err != nil {
77 common.Logger().WithFields(logrus.Fields{
78 "device": o,
79 "frameDump": frame.Dump(),
80 "incoming": incoming,
81 "error": err.Error(),
82 }).Error("A problem occurred while forwarding to ONU")
83 }
84
85 }
86}
87
88/*
89forwardToLAN defines an INGRESS function to forward a packet to VOLTHA
90*/
91func (o *PonSimOltDevice) forwardToLAN() func(int, gopacket.Packet) {
92 return func(port int, frame gopacket.Packet) {
93 common.Logger().WithFields(logrus.Fields{
94 "frame": frame.Dump(),
95 }).Info("Sending packet")
96
97 select {
98 case o.outgoing <- frame.Data():
99 common.Logger().WithFields(logrus.Fields{
100 "frame": frame.Dump(),
101 }).Info("Sent packet")
102 default:
103 common.Logger().WithFields(logrus.Fields{
104 "frame": frame.Dump(),
105 }).Warn("Unable to send packet")
106 }
107 }
108}
109
110/*
111Start performs setup operations for an OLT device
112*/
113func (o *PonSimOltDevice) Start(ctx context.Context) {
114 common.Logger().Info("Starting OLT device...")
115 o.PonSimDevice.Start(ctx)
116
117 // Open network interfaces for listening
118 o.connectNetworkInterfaces()
119
120 o.outgoing = make(chan []byte, 1)
121
122 // Add INGRESS operation
123 o.AddLink(2, 0, o.forwardToLAN())
124
125 // Start PM counter logging
126 o.counterLoop = common.NewIntervalHandler(90, o.Counter.LogCounts)
127 o.counterLoop.Start()
128
129 // Start alarm simulation
130 if o.AlarmsOn {
131 common.Logger().WithFields(logrus.Fields{
132 "device": o,
133 }).Debug("Starting alarm simulation")
134
135 alarms := NewPonSimAlarm(o.InternalIf, o.VCoreEndpoint, o.forwardToLAN())
136 o.alarmLoop = common.NewIntervalHandler(o.AlarmsFreq, alarms.GenerateAlarm)
137 o.alarmLoop.Start()
138 }
139}
140
141/*
142Stop performs cleanup operations for an OLT device
143*/
144func (o *PonSimOltDevice) Stop(ctx context.Context) {
145 common.Logger().Info("Stopping OLT device...")
146
147 // Stop PM counters loop
148 o.counterLoop.Stop()
149 o.counterLoop = nil
150
151 // Stop alarm simulation
152 if o.AlarmsOn {
153 o.alarmLoop.Stop()
154 }
155 o.alarmLoop = nil
156
157 o.ingressHandler.Close()
158 o.egressHandler.Close()
159
160 o.PonSimDevice.Stop(ctx)
161}
162
163/*
164ConnectToRemoteOnu establishes communication to a remote ONU device
165*/
166func (o *PonSimOltDevice) ConnectToRemoteOnu(onu *OnuRegistree) error {
167 var err error
168
169 host := strings.Join([]string{
170 onu.Device.Address,
171 strconv.Itoa(int(onu.Device.Port)),
172 }, ":")
173
174 common.Logger().WithFields(logrus.Fields{
175 "device": o,
176 "host": host,
177 }).Debug("Formatting host address")
178
179 // TODO: make it secure
180 ta := credentials.NewTLS(&tls.Config{
181 //Certificates: []tls.Certificate{peerCert},
182 //RootCAs: caCertPool,
183 InsecureSkipVerify: true,
184 })
185
186 // GRPC communication needs to be secured
187 if onu.Conn, err = grpc.DialContext(
188 context.Background(),
189 host,
190 grpc.WithTransportCredentials(ta),
191 ); err != nil {
192 common.Logger().WithFields(logrus.Fields{
193 "device": o,
194 "error": err.Error(),
195 }).Error("Problem with client connection")
196 }
197
198 return err
199}
200
201/*
202Listen waits for incoming EGRESS data on the internal interface
203*/
204func (o *PonSimOltDevice) Listen(ctx context.Context, port int32) {
205 var reply *empty.Empty
206 var err error
207
208 // Establish a GRPC connection with the ONU
209 onu := o.GetOnu(port)
210
211 common.Logger().WithFields(logrus.Fields{
212 "onu": onu,
213 }).Debug("Connecting to remote ONU")
214
215 if onu.Client = ponsim.NewPonSimCommonClient(onu.Conn); onu.Client == nil {
216 common.Logger().WithFields(logrus.Fields{
217 "device": o,
218 }).Error("Problem establishing client connection to ONU")
219 o.RemoveOnu(ctx, port)
220 return
221 }
222
223 // Prepare stream to ONU to forward incoming data as needed
224 if onu.Stream, err = onu.Client.ProcessData(ctx); err != nil {
225 common.Logger().WithFields(logrus.Fields{
226 "device": o,
227 }).Error("Problem establishing stream to ONU")
228 o.RemoveOnu(ctx, port)
229 return
230 }
231
232 defer o.egressHandler.Close()
233 packetSource := gopacket.NewPacketSource(o.egressHandler, o.egressHandler.LinkType())
234 common.Logger().WithFields(logrus.Fields{
235 "device": o,
236 "interface": o.InternalIf,
237 }).Debug("Listening to incoming EGRESS data")
238
239 // Wait for incoming EGRESS data
240 for packet := range packetSource.Packets() {
241 if dot1q := common.GetDot1QLayer(packet); dot1q != nil {
242 common.Logger().WithFields(logrus.Fields{
243 "device": o,
244 "packet": packet,
245 }).Debug("Received EGRESS packet")
246
247 o.Forward(ctx, 2, packet)
248 }
249 }
250
251 common.Logger().WithFields(logrus.Fields{
252 "device": o,
253 }).Debug("No more packets to process")
254
255 if reply, err = onu.Stream.CloseAndRecv(); err != nil {
256 common.Logger().WithFields(logrus.Fields{
257 "device": o,
258 "error": err.Error(),
259 }).Error("A problem occurred while closing client stream")
260 } else {
261 common.Logger().WithFields(logrus.Fields{
262 "device": o,
263 "reply": reply,
264 }).Warn("Client stream closed")
265 }
266}
267
268/*
269GetOnus returns the list of registered ONU devices
270*/
271func (o *PonSimOltDevice) GetOnus() map[int32]*OnuRegistree {
272 if o.Onus == nil {
273 o.Onus = make(map[int32]*OnuRegistree)
274 }
275
276 return o.Onus
277}
278
279/*
280GetOnu return a specific registered ONU
281*/
282func (o *PonSimOltDevice) GetOnu(index int32) *OnuRegistree {
283 var onu *OnuRegistree
284 var ok bool
285
286 if onu, ok = (o.GetOnus())[index]; ok {
287 return onu
288 }
289
290 return nil
291}
292
293func (o *PonSimOltDevice) GetOutgoing() chan []byte {
294 return o.outgoing
295}
296
297/*
298nextAvailablePort returns a port that is not already used by a registered ONU
299*/
300func (o *PonSimOltDevice) nextAvailablePort() int32 {
301 var port int32 = BASE_PORT_NUMBER
302
303 if len(o.GetOnus()) < o.MaxOnuCount {
304 for {
305 if o.GetOnu(port) != nil {
306 // port is already used
307 port += 1
308 } else {
309 // port is available... use it
310 return port
311 }
312 }
313 } else {
314 // OLT has reached its max number of ONUs
315 return -1
316 }
317}
318
319/*
320AddOnu registers an ONU device and sets up all required monitoring and connections
321*/
322func (o *PonSimOltDevice) AddOnu(onu *PonSimOnuDevice) (int32, error) {
323 var portNum int32
324 ctx := context.Background()
325
326 if portNum = o.nextAvailablePort(); portNum != -1 {
327 common.Logger().WithFields(logrus.Fields{
328 "device": o,
329 "port": portNum,
330 "onu": onu,
331 }).Info("Adding ONU")
332
333 registree := &OnuRegistree{Device: onu}
334
335 // Setup GRPC communication and check if it succeeded
336 if err := o.ConnectToRemoteOnu(registree); err == nil {
337 o.GetOnus()[portNum] = registree
338
339 o.AddLink(1, int(portNum), o.forwardToONU(portNum))
340 go o.MonitorOnu(ctx, portNum)
341 go o.Listen(ctx, portNum)
342 }
343
344 } else {
345 common.Logger().WithFields(logrus.Fields{
346 "device": o,
347 }).Warn("ONU Map is full")
348 }
349
350 return int32(portNum), nil
351}
352
353/*
354RemoveOnu removes the reference to a registered ONU
355*/
356func (o *PonSimOltDevice) RemoveOnu(ctx context.Context, onuIndex int32) error {
357 onu := o.GetOnu(onuIndex)
358 if err := onu.Conn.Close(); err != nil {
359 common.Logger().WithFields(logrus.Fields{
360 "device": o,
361 "onu": onu.Device,
362 "onuIndex": onuIndex,
363 }).Error("Problem closing connection to ONU")
364 }
365
366 common.Logger().WithFields(logrus.Fields{
367 "device": o,
368 "onu": onu,
369 "onuIndex": onuIndex,
370 }).Info("Removing ONU")
371
372 delete(o.Onus, onuIndex)
373
374 // Remove link entries for this ONU
375 o.RemoveLink(1, int(onuIndex))
376
377 return nil
378}
379
380/*
381MonitorOnu verifies the connection status of a specific ONU and cleans up as necessary
382*/
383func (o *PonSimOltDevice) MonitorOnu(ctx context.Context, onuIndex int32) {
384 for {
385 if o.GetOnu(onuIndex) != nil {
386 if conn := o.GetOnu(onuIndex).Conn; conn.GetState() == connectivity.Ready {
387 // Wait for any change to occur
388 conn.WaitForStateChange(ctx, conn.GetState())
389 // We lost communication with the ONU ... remove it
390 o.RemoveOnu(ctx, onuIndex)
391 return
392 }
393 common.Logger().WithFields(logrus.Fields{
394 "device": o,
395 "ctx": ctx,
396 "onuIndex": onuIndex,
397 }).Debug("ONU is not ready")
398 time.Sleep(1 * time.Second)
399 } else {
400 return
401 }
402 }
403}