blob: 3021a31a525cc45e6c304c5d41d5b914f33a80e2 [file] [log] [blame]
/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
// Package channelz defines APIs for enabling channelz service, entry
// registration/deletion, and accessing channelz data. It also defines channelz
// metric struct formats.
//
// All APIs in this package are experimental.
24package channelz
25
26import (
27 "sort"
28 "sync"
29 "sync/atomic"
30 "time"
31
32 "google.golang.org/grpc/grpclog"
33)
34
35const (
36 defaultMaxTraceEntry int32 = 30
37)
38
39var (
40 db dbWrapper
41 idGen idGenerator
42 // EntryPerPage defines the number of channelz entries to be shown on a web page.
43 EntryPerPage = 50
44 curState int32
45 maxTraceEntry = defaultMaxTraceEntry
46)
47
48// TurnOn turns on channelz data collection.
49func TurnOn() {
50 if !IsOn() {
51 NewChannelzStorage()
52 atomic.StoreInt32(&curState, 1)
53 }
54}
55
56// IsOn returns whether channelz data collection is on.
57func IsOn() bool {
58 return atomic.CompareAndSwapInt32(&curState, 1, 1)
59}
60
61// SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel).
62// Setting it to 0 will disable channel tracing.
63func SetMaxTraceEntry(i int32) {
64 atomic.StoreInt32(&maxTraceEntry, i)
65}
66
67// ResetMaxTraceEntryToDefault resets the maximum number of trace entry per entity to default.
68func ResetMaxTraceEntryToDefault() {
69 atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry)
70}
71
72func getMaxTraceEntry() int {
73 i := atomic.LoadInt32(&maxTraceEntry)
74 return int(i)
75}
76
77// dbWarpper wraps around a reference to internal channelz data storage, and
78// provide synchronized functionality to set and get the reference.
79type dbWrapper struct {
80 mu sync.RWMutex
81 DB *channelMap
82}
83
84func (d *dbWrapper) set(db *channelMap) {
85 d.mu.Lock()
86 d.DB = db
87 d.mu.Unlock()
88}
89
90func (d *dbWrapper) get() *channelMap {
91 d.mu.RLock()
92 defer d.mu.RUnlock()
93 return d.DB
94}
95
96// NewChannelzStorage initializes channelz data storage and id generator.
97//
98// Note: This function is exported for testing purpose only. User should not call
99// it in most cases.
100func NewChannelzStorage() {
101 db.set(&channelMap{
102 topLevelChannels: make(map[int64]struct{}),
103 channels: make(map[int64]*channel),
104 listenSockets: make(map[int64]*listenSocket),
105 normalSockets: make(map[int64]*normalSocket),
106 servers: make(map[int64]*server),
107 subChannels: make(map[int64]*subChannel),
108 })
109 idGen.reset()
110}
111
112// GetTopChannels returns a slice of top channel's ChannelMetric, along with a
113// boolean indicating whether there's more top channels to be queried for.
114//
115// The arg id specifies that only top channel with id at or above it will be included
116// in the result. The returned slice is up to a length of EntryPerPage, and is
117// sorted in ascending id order.
118func GetTopChannels(id int64) ([]*ChannelMetric, bool) {
119 return db.get().GetTopChannels(id)
120}
121
122// GetServers returns a slice of server's ServerMetric, along with a
123// boolean indicating whether there's more servers to be queried for.
124//
125// The arg id specifies that only server with id at or above it will be included
126// in the result. The returned slice is up to a length of EntryPerPage, and is
127// sorted in ascending id order.
128func GetServers(id int64) ([]*ServerMetric, bool) {
129 return db.get().GetServers(id)
130}
131
132// GetServerSockets returns a slice of server's (identified by id) normal socket's
133// SocketMetric, along with a boolean indicating whether there's more sockets to
134// be queried for.
135//
136// The arg startID specifies that only sockets with id at or above it will be
137// included in the result. The returned slice is up to a length of EntryPerPage,
138// and is sorted in ascending id order.
139func GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
140 return db.get().GetServerSockets(id, startID)
141}
142
143// GetChannel returns the ChannelMetric for the channel (identified by id).
144func GetChannel(id int64) *ChannelMetric {
145 return db.get().GetChannel(id)
146}
147
148// GetSubChannel returns the SubChannelMetric for the subchannel (identified by id).
149func GetSubChannel(id int64) *SubChannelMetric {
150 return db.get().GetSubChannel(id)
151}
152
153// GetSocket returns the SocketInternalMetric for the socket (identified by id).
154func GetSocket(id int64) *SocketMetric {
155 return db.get().GetSocket(id)
156}
157
158// RegisterChannel registers the given channel c in channelz database with ref
159// as its reference name, and add it to the child list of its parent (identified
160// by pid). pid = 0 means no parent. It returns the unique channelz tracking id
161// assigned to this channel.
162func RegisterChannel(c Channel, pid int64, ref string) int64 {
163 id := idGen.genID()
164 cn := &channel{
165 refName: ref,
166 c: c,
167 subChans: make(map[int64]string),
168 nestedChans: make(map[int64]string),
169 id: id,
170 pid: pid,
171 trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
172 }
173 if pid == 0 {
174 db.get().addChannel(id, cn, true, pid, ref)
175 } else {
176 db.get().addChannel(id, cn, false, pid, ref)
177 }
178 return id
179}
180
181// RegisterSubChannel registers the given channel c in channelz database with ref
182// as its reference name, and add it to the child list of its parent (identified
183// by pid). It returns the unique channelz tracking id assigned to this subchannel.
184func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
185 if pid == 0 {
186 grpclog.Error("a SubChannel's parent id cannot be 0")
187 return 0
188 }
189 id := idGen.genID()
190 sc := &subChannel{
191 refName: ref,
192 c: c,
193 sockets: make(map[int64]string),
194 id: id,
195 pid: pid,
196 trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
197 }
198 db.get().addSubChannel(id, sc, pid, ref)
199 return id
200}
201
202// RegisterServer registers the given server s in channelz database. It returns
203// the unique channelz tracking id assigned to this server.
204func RegisterServer(s Server, ref string) int64 {
205 id := idGen.genID()
206 svr := &server{
207 refName: ref,
208 s: s,
209 sockets: make(map[int64]string),
210 listenSockets: make(map[int64]string),
211 id: id,
212 }
213 db.get().addServer(id, svr)
214 return id
215}
216
217// RegisterListenSocket registers the given listen socket s in channelz database
218// with ref as its reference name, and add it to the child list of its parent
219// (identified by pid). It returns the unique channelz tracking id assigned to
220// this listen socket.
221func RegisterListenSocket(s Socket, pid int64, ref string) int64 {
222 if pid == 0 {
223 grpclog.Error("a ListenSocket's parent id cannot be 0")
224 return 0
225 }
226 id := idGen.genID()
227 ls := &listenSocket{refName: ref, s: s, id: id, pid: pid}
228 db.get().addListenSocket(id, ls, pid, ref)
229 return id
230}
231
232// RegisterNormalSocket registers the given normal socket s in channelz database
233// with ref as its reference name, and add it to the child list of its parent
234// (identified by pid). It returns the unique channelz tracking id assigned to
235// this normal socket.
236func RegisterNormalSocket(s Socket, pid int64, ref string) int64 {
237 if pid == 0 {
238 grpclog.Error("a NormalSocket's parent id cannot be 0")
239 return 0
240 }
241 id := idGen.genID()
242 ns := &normalSocket{refName: ref, s: s, id: id, pid: pid}
243 db.get().addNormalSocket(id, ns, pid, ref)
244 return id
245}
246
247// RemoveEntry removes an entry with unique channelz trakcing id to be id from
248// channelz database.
249func RemoveEntry(id int64) {
250 db.get().removeEntry(id)
251}
252
// TraceEventDesc is what the caller of AddTraceEvent should provide to describe the event to be added
// to the channel trace.
// The Parent field is optional. It is used for event that will be recorded in the entity's parent
// trace also.
type TraceEventDesc struct {
	// Desc is the description copied into the recorded trace event.
	Desc string
	// Severity is the severity copied into the recorded trace event.
	Severity Severity
	// Parent, when non-nil, describes the event that is also recorded in the
	// trace of the entity's parent.
	Parent *TraceEventDesc
}
262
263// AddTraceEvent adds trace related to the entity with specified id, using the provided TraceEventDesc.
264func AddTraceEvent(id int64, desc *TraceEventDesc) {
265 if getMaxTraceEntry() == 0 {
266 return
267 }
268 db.get().traceEvent(id, desc)
269}
270
// channelMap is the storage data structure for channelz.
// Methods of channelMap can be divided in two categories with respect to locking.
// 1. Methods that acquire the global lock.
// 2. Methods that can only be called when the global lock is held.
// A method of the second type must always be called inside a method of the first type.
type channelMap struct {
	// mu is the global lock guarding the maps below.
	mu sync.RWMutex
	// topLevelChannels is the set of ids of channels registered without a parent.
	topLevelChannels map[int64]struct{}
	// The remaining maps index each kind of registered entity by its id.
	servers map[int64]*server
	channels map[int64]*channel
	subChannels map[int64]*subChannel
	listenSockets map[int64]*listenSocket
	normalSockets map[int64]*normalSocket
}
285
286func (c *channelMap) addServer(id int64, s *server) {
287 c.mu.Lock()
288 s.cm = c
289 c.servers[id] = s
290 c.mu.Unlock()
291}
292
293func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) {
294 c.mu.Lock()
295 cn.cm = c
296 cn.trace.cm = c
297 c.channels[id] = cn
298 if isTopChannel {
299 c.topLevelChannels[id] = struct{}{}
300 } else {
301 c.findEntry(pid).addChild(id, cn)
302 }
303 c.mu.Unlock()
304}
305
306func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) {
307 c.mu.Lock()
308 sc.cm = c
309 sc.trace.cm = c
310 c.subChannels[id] = sc
311 c.findEntry(pid).addChild(id, sc)
312 c.mu.Unlock()
313}
314
315func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64, ref string) {
316 c.mu.Lock()
317 ls.cm = c
318 c.listenSockets[id] = ls
319 c.findEntry(pid).addChild(id, ls)
320 c.mu.Unlock()
321}
322
323func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref string) {
324 c.mu.Lock()
325 ns.cm = c
326 c.normalSockets[id] = ns
327 c.findEntry(pid).addChild(id, ns)
328 c.mu.Unlock()
329}
330
331// removeEntry triggers the removal of an entry, which may not indeed delete the entry, if it has to
332// wait on the deletion of its children and until no other entity's channel trace references it.
333// It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully
334// shutting down server will lead to the server being also deleted.
335func (c *channelMap) removeEntry(id int64) {
336 c.mu.Lock()
337 c.findEntry(id).triggerDelete()
338 c.mu.Unlock()
339}
340
341// c.mu must be held by the caller
342func (c *channelMap) decrTraceRefCount(id int64) {
343 e := c.findEntry(id)
344 if v, ok := e.(tracedChannel); ok {
345 v.decrTraceRefCount()
346 e.deleteSelfIfReady()
347 }
348}
349
350// c.mu must be held by the caller.
351func (c *channelMap) findEntry(id int64) entry {
352 var v entry
353 var ok bool
354 if v, ok = c.channels[id]; ok {
355 return v
356 }
357 if v, ok = c.subChannels[id]; ok {
358 return v
359 }
360 if v, ok = c.servers[id]; ok {
361 return v
362 }
363 if v, ok = c.listenSockets[id]; ok {
364 return v
365 }
366 if v, ok = c.normalSockets[id]; ok {
367 return v
368 }
369 return &dummyEntry{idNotFound: id}
370}
371
372// c.mu must be held by the caller
373// deleteEntry simply deletes an entry from the channelMap. Before calling this
374// method, caller must check this entry is ready to be deleted, i.e removeEntry()
375// has been called on it, and no children still exist.
376// Conditionals are ordered by the expected frequency of deletion of each entity
377// type, in order to optimize performance.
378func (c *channelMap) deleteEntry(id int64) {
379 var ok bool
380 if _, ok = c.normalSockets[id]; ok {
381 delete(c.normalSockets, id)
382 return
383 }
384 if _, ok = c.subChannels[id]; ok {
385 delete(c.subChannels, id)
386 return
387 }
388 if _, ok = c.channels[id]; ok {
389 delete(c.channels, id)
390 delete(c.topLevelChannels, id)
391 return
392 }
393 if _, ok = c.listenSockets[id]; ok {
394 delete(c.listenSockets, id)
395 return
396 }
397 if _, ok = c.servers[id]; ok {
398 delete(c.servers, id)
399 return
400 }
401}
402
403func (c *channelMap) traceEvent(id int64, desc *TraceEventDesc) {
404 c.mu.Lock()
405 child := c.findEntry(id)
406 childTC, ok := child.(tracedChannel)
407 if !ok {
408 c.mu.Unlock()
409 return
410 }
411 childTC.getChannelTrace().append(&TraceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
412 if desc.Parent != nil {
413 parent := c.findEntry(child.getParentID())
414 var chanType RefChannelType
415 switch child.(type) {
416 case *channel:
417 chanType = RefChannel
418 case *subChannel:
419 chanType = RefSubChannel
420 }
421 if parentTC, ok := parent.(tracedChannel); ok {
422 parentTC.getChannelTrace().append(&TraceEvent{
423 Desc: desc.Parent.Desc,
424 Severity: desc.Parent.Severity,
425 Timestamp: time.Now(),
426 RefID: id,
427 RefName: childTC.getRefName(),
428 RefType: chanType,
429 })
430 childTC.incrTraceRefCount()
431 }
432 }
433 c.mu.Unlock()
434}
435
// int64Slice is a []int64 with Len, Swap and Less methods so that it can be
// passed to sort.Sort; Less orders elements ascending.
type int64Slice []int64

// Len reports the number of elements.
func (s int64Slice) Len() int {
	return len(s)
}

// Swap exchanges the elements at positions i and j.
func (s int64Slice) Swap(i, j int) {
	s[j], s[i] = s[i], s[j]
}

// Less reports whether the element at i is smaller than the element at j.
func (s int64Slice) Less(i, j int) bool {
	return s[j] > s[i]
}
441
// copyMap returns a new map holding the same key/value pairs as m, so that the
// result can be used independently of m. The result is never nil, even when m
// is nil or empty.
func copyMap(m map[int64]string) map[int64]string {
	// The final size is known up front, so size the destination once.
	n := make(map[int64]string, len(m))
	for k, v := range m {
		n[k] = v
	}
	return n
}
449
// min returns the smaller of a and b.
func min(a, b int) int {
	if b <= a {
		return b
	}
	return a
}
456
457func (c *channelMap) GetTopChannels(id int64) ([]*ChannelMetric, bool) {
458 c.mu.RLock()
459 l := len(c.topLevelChannels)
460 ids := make([]int64, 0, l)
461 cns := make([]*channel, 0, min(l, EntryPerPage))
462
463 for k := range c.topLevelChannels {
464 ids = append(ids, k)
465 }
466 sort.Sort(int64Slice(ids))
467 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
468 count := 0
469 var end bool
470 var t []*ChannelMetric
471 for i, v := range ids[idx:] {
472 if count == EntryPerPage {
473 break
474 }
475 if cn, ok := c.channels[v]; ok {
476 cns = append(cns, cn)
477 t = append(t, &ChannelMetric{
478 NestedChans: copyMap(cn.nestedChans),
479 SubChans: copyMap(cn.subChans),
480 })
481 count++
482 }
483 if i == len(ids[idx:])-1 {
484 end = true
485 break
486 }
487 }
488 c.mu.RUnlock()
489 if count == 0 {
490 end = true
491 }
492
493 for i, cn := range cns {
494 t[i].ChannelData = cn.c.ChannelzMetric()
495 t[i].ID = cn.id
496 t[i].RefName = cn.refName
497 t[i].Trace = cn.trace.dumpData()
498 }
499 return t, end
500}
501
502func (c *channelMap) GetServers(id int64) ([]*ServerMetric, bool) {
503 c.mu.RLock()
504 l := len(c.servers)
505 ids := make([]int64, 0, l)
506 ss := make([]*server, 0, min(l, EntryPerPage))
507 for k := range c.servers {
508 ids = append(ids, k)
509 }
510 sort.Sort(int64Slice(ids))
511 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
512 count := 0
513 var end bool
514 var s []*ServerMetric
515 for i, v := range ids[idx:] {
516 if count == EntryPerPage {
517 break
518 }
519 if svr, ok := c.servers[v]; ok {
520 ss = append(ss, svr)
521 s = append(s, &ServerMetric{
522 ListenSockets: copyMap(svr.listenSockets),
523 })
524 count++
525 }
526 if i == len(ids[idx:])-1 {
527 end = true
528 break
529 }
530 }
531 c.mu.RUnlock()
532 if count == 0 {
533 end = true
534 }
535
536 for i, svr := range ss {
537 s[i].ServerData = svr.s.ChannelzMetric()
538 s[i].ID = svr.id
539 s[i].RefName = svr.refName
540 }
541 return s, end
542}
543
544func (c *channelMap) GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
545 var svr *server
546 var ok bool
547 c.mu.RLock()
548 if svr, ok = c.servers[id]; !ok {
549 // server with id doesn't exist.
550 c.mu.RUnlock()
551 return nil, true
552 }
553 svrskts := svr.sockets
554 l := len(svrskts)
555 ids := make([]int64, 0, l)
556 sks := make([]*normalSocket, 0, min(l, EntryPerPage))
557 for k := range svrskts {
558 ids = append(ids, k)
559 }
560 sort.Sort(int64Slice(ids))
561 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
562 count := 0
563 var end bool
564 for i, v := range ids[idx:] {
565 if count == EntryPerPage {
566 break
567 }
568 if ns, ok := c.normalSockets[v]; ok {
569 sks = append(sks, ns)
570 count++
571 }
572 if i == len(ids[idx:])-1 {
573 end = true
574 break
575 }
576 }
577 c.mu.RUnlock()
578 if count == 0 {
579 end = true
580 }
581 var s []*SocketMetric
582 for _, ns := range sks {
583 sm := &SocketMetric{}
584 sm.SocketData = ns.s.ChannelzMetric()
585 sm.ID = ns.id
586 sm.RefName = ns.refName
587 s = append(s, sm)
588 }
589 return s, end
590}
591
592func (c *channelMap) GetChannel(id int64) *ChannelMetric {
593 cm := &ChannelMetric{}
594 var cn *channel
595 var ok bool
596 c.mu.RLock()
597 if cn, ok = c.channels[id]; !ok {
598 // channel with id doesn't exist.
599 c.mu.RUnlock()
600 return nil
601 }
602 cm.NestedChans = copyMap(cn.nestedChans)
603 cm.SubChans = copyMap(cn.subChans)
604 // cn.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of cn.c when
605 // holding the lock to prevent potential data race.
606 chanCopy := cn.c
607 c.mu.RUnlock()
608 cm.ChannelData = chanCopy.ChannelzMetric()
609 cm.ID = cn.id
610 cm.RefName = cn.refName
611 cm.Trace = cn.trace.dumpData()
612 return cm
613}
614
615func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
616 cm := &SubChannelMetric{}
617 var sc *subChannel
618 var ok bool
619 c.mu.RLock()
620 if sc, ok = c.subChannels[id]; !ok {
621 // subchannel with id doesn't exist.
622 c.mu.RUnlock()
623 return nil
624 }
625 cm.Sockets = copyMap(sc.sockets)
626 // sc.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of sc.c when
627 // holding the lock to prevent potential data race.
628 chanCopy := sc.c
629 c.mu.RUnlock()
630 cm.ChannelData = chanCopy.ChannelzMetric()
631 cm.ID = sc.id
632 cm.RefName = sc.refName
633 cm.Trace = sc.trace.dumpData()
634 return cm
635}
636
637func (c *channelMap) GetSocket(id int64) *SocketMetric {
638 sm := &SocketMetric{}
639 c.mu.RLock()
640 if ls, ok := c.listenSockets[id]; ok {
641 c.mu.RUnlock()
642 sm.SocketData = ls.s.ChannelzMetric()
643 sm.ID = ls.id
644 sm.RefName = ls.refName
645 return sm
646 }
647 if ns, ok := c.normalSockets[id]; ok {
648 c.mu.RUnlock()
649 sm.SocketData = ns.s.ChannelzMetric()
650 sm.ID = ns.id
651 sm.RefName = ns.refName
652 return sm
653 }
654 c.mu.RUnlock()
655 return nil
656}
657
// idGenerator hands out increasing int64 ids. Its counter is only touched
// through atomic operations.
type idGenerator struct {
	id int64
}

// reset sets the counter back to 0, so the next generated id is 1.
func (g *idGenerator) reset() {
	atomic.StoreInt64(&g.id, 0)
}

// genID atomically increments the counter and returns the new value.
func (g *idGenerator) genID() int64 {
	const step = 1
	return atomic.AddInt64(&g.id, step)
}