blob: f229383f21b53c9026d48fd5e9c8d348b5727762 [file] [log] [blame]
khenaidoofffcc8a2019-03-13 11:54:41 -04001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
19 "context"
20 "fmt"
21 "github.com/opencord/voltha-go/common/log"
22 "github.com/opencord/voltha-go/db/kvstore"
23 "google.golang.org/grpc/codes"
24 "google.golang.org/grpc/status"
25 "sync"
26 "time"
27)
28
29type ownership struct {
30 id string
31 owned bool
32 chnl chan int
33}
34
35type DeviceOwnership struct {
36 instanceId string
37 exitChannel chan int
38 kvClient kvstore.Client
39 reservationTimeout int64 // Duration in seconds
40 ownershipPrefix string
41 deviceMap map[string]*ownership
42 deviceMapLock *sync.RWMutex
43}
44
45func NewDeviceOwnership(id string, kvClient kvstore.Client, ownershipPrefix string, reservationTimeout int64) *DeviceOwnership {
46 var deviceOwnership DeviceOwnership
47 deviceOwnership.instanceId = id
48 deviceOwnership.exitChannel = make(chan int, 1)
49 deviceOwnership.kvClient = kvClient
50 deviceOwnership.ownershipPrefix = ownershipPrefix
51 deviceOwnership.reservationTimeout = reservationTimeout
52 deviceOwnership.deviceMap = make(map[string]*ownership)
53 deviceOwnership.deviceMapLock = &sync.RWMutex{}
54 return &deviceOwnership
55}
56
57func (da *DeviceOwnership) Start(ctx context.Context) {
58 log.Info("starting-deviceOwnership", log.Fields{"instanceId": da.instanceId})
59 log.Info("deviceOwnership-started")
60}
61
62func (da *DeviceOwnership) Stop(ctx context.Context) {
63 log.Info("stopping-deviceOwnership")
64 da.exitChannel <- 1
65 // Need to flush all device reservations
66 log.Info("deviceOwnership-stopped")
67}
68
69func (da *DeviceOwnership) tryToReserveKey(id string) bool {
70 var currOwner string
71 // Try to reserve the key
72 kvKey := fmt.Sprintf("%s_%s", da.ownershipPrefix, id)
73 value, err := da.kvClient.Reserve(kvKey, da.instanceId, da.reservationTimeout)
74 if value != nil {
75 if currOwner, err = kvstore.ToString(value); err != nil {
76 log.Error("unexpected-owner-type")
77 }
78 return currOwner == da.instanceId
79 }
80 return false
81}
82
83func (da *DeviceOwnership) startOwnershipMonitoring(id string, chnl chan int) {
Richard Jankowski199fd862019-03-18 14:49:51 -040084 var op string
85
khenaidoofffcc8a2019-03-13 11:54:41 -040086startloop:
87 for {
Richard Jankowski199fd862019-03-18 14:49:51 -040088 da.deviceMapLock.RLock()
89 val, exist := da.deviceMap[id]
90 da.deviceMapLock.RUnlock()
91 if exist && val.owned {
92 // Device owned; renew reservation
93 op = "renew"
94 kvKey := fmt.Sprintf("%s_%s", da.ownershipPrefix, id)
95 if err := da.kvClient.RenewReservation(kvKey); err != nil {
96 log.Errorw("reservation-renewal-error", log.Fields{"error": err})
97 }
98 } else {
99 // Device not owned; try to seize ownership
100 op = "retry"
101 if err := da.setOwnership(id, da.tryToReserveKey(id)); err != nil {
102 log.Errorw("unexpected-error", log.Fields{"error": err})
103 }
khenaidoofffcc8a2019-03-13 11:54:41 -0400104 }
105 select {
106 case <-da.exitChannel:
107 log.Infow("closing-monitoring", log.Fields{"Id": id})
108 break startloop
109 case <-time.After(time.Duration(da.reservationTimeout) / 3 * time.Second):
Richard Jankowski199fd862019-03-18 14:49:51 -0400110 msg := fmt.Sprintf("%s-reservation", op)
111 log.Infow(msg, log.Fields{"Id": id})
khenaidoofffcc8a2019-03-13 11:54:41 -0400112 case <-chnl:
113 log.Infow("closing-device-monitoring", log.Fields{"Id": id})
114 break startloop
115 }
116 }
117}
118
119func (da *DeviceOwnership) getOwnership(id string) bool {
120 da.deviceMapLock.RLock()
121 defer da.deviceMapLock.RUnlock()
122 if val, exist := da.deviceMap[id]; exist {
123 return val.owned
124 }
125 log.Debugw("setting-up-new-ownership", log.Fields{"Id": id})
126 // Not owned by me or maybe anybody else. Try to reserve it
127 reservedByMe := da.tryToReserveKey(id)
128 myChnl := make(chan int)
129 da.deviceMap[id] = &ownership{id: id, owned: reservedByMe, chnl: myChnl}
130 go da.startOwnershipMonitoring(id, myChnl)
131 return reservedByMe
132}
133
134func (da *DeviceOwnership) setOwnership(id string, owner bool) error {
135 da.deviceMapLock.Lock()
136 defer da.deviceMapLock.Unlock()
137 if _, exist := da.deviceMap[id]; exist {
138 if da.deviceMap[id].owned != owner {
139 log.Debugw("ownership-changed", log.Fields{"Id": id, "owner": owner})
140 }
141 da.deviceMap[id].owned = owner
142 return nil
143 }
144 return status.Error(codes.NotFound, fmt.Sprintf("id-inexistent-%s", id))
145}
146
147// OwnedByMe returns where this Core instance active owns this device. This function will automatically
148// trigger the process to monitor the device and update the device ownership regularly.
149func (da *DeviceOwnership) OwnedByMe(id string) bool {
150 return da.getOwnership(id)
151}
152
153//AbandonDevice must be invoked whenever a device is deleted from the Core
154func (da *DeviceOwnership) AbandonDevice(id string) error {
155 da.deviceMapLock.Lock()
156 defer da.deviceMapLock.Unlock()
157 if o, exist := da.deviceMap[id]; exist {
158 // Stop the Go routine monitoring the device
159 close(o.chnl)
160 delete(da.deviceMap, id)
161 log.Debugw("abandoning-device", log.Fields{"Id": id})
162 return nil
163 }
164 return status.Error(codes.NotFound, fmt.Sprintf("id-inexistent-%s", id))
Richard Jankowski199fd862019-03-18 14:49:51 -0400165}