blob: f0744f9937e940e12f635e57bffa8c4b5a78a5fa [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package channelz defines APIs for enabling channelz service, entry
20// registration/deletion, and accessing channelz data. It also defines channelz
21// metric struct formats.
22//
23// All APIs in this package are experimental.
24package channelz
25
26import (
27 "fmt"
28 "sort"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "google.golang.org/grpc/grpclog"
34)
35
36const (
37 defaultMaxTraceEntry int32 = 30
38)
39
40var (
41 db dbWrapper
42 idGen idGenerator
43 // EntryPerPage defines the number of channelz entries to be shown on a web page.
44 EntryPerPage = int64(50)
45 curState int32
46 maxTraceEntry = defaultMaxTraceEntry
47)
48
49// TurnOn turns on channelz data collection.
50func TurnOn() {
51 if !IsOn() {
52 NewChannelzStorage()
53 atomic.StoreInt32(&curState, 1)
54 }
55}
56
57// IsOn returns whether channelz data collection is on.
58func IsOn() bool {
59 return atomic.CompareAndSwapInt32(&curState, 1, 1)
60}
61
62// SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel).
63// Setting it to 0 will disable channel tracing.
64func SetMaxTraceEntry(i int32) {
65 atomic.StoreInt32(&maxTraceEntry, i)
66}
67
68// ResetMaxTraceEntryToDefault resets the maximum number of trace entry per entity to default.
69func ResetMaxTraceEntryToDefault() {
70 atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry)
71}
72
73func getMaxTraceEntry() int {
74 i := atomic.LoadInt32(&maxTraceEntry)
75 return int(i)
76}
77
78// dbWarpper wraps around a reference to internal channelz data storage, and
79// provide synchronized functionality to set and get the reference.
80type dbWrapper struct {
81 mu sync.RWMutex
82 DB *channelMap
83}
84
85func (d *dbWrapper) set(db *channelMap) {
86 d.mu.Lock()
87 d.DB = db
88 d.mu.Unlock()
89}
90
91func (d *dbWrapper) get() *channelMap {
92 d.mu.RLock()
93 defer d.mu.RUnlock()
94 return d.DB
95}
96
97// NewChannelzStorage initializes channelz data storage and id generator.
98//
99// This function returns a cleanup function to wait for all channelz state to be reset by the
100// grpc goroutines when those entities get closed. By using this cleanup function, we make sure tests
101// don't mess up each other, i.e. lingering goroutine from previous test doing entity removal happen
102// to remove some entity just register by the new test, since the id space is the same.
103//
104// Note: This function is exported for testing purpose only. User should not call
105// it in most cases.
106func NewChannelzStorage() (cleanup func() error) {
107 db.set(&channelMap{
108 topLevelChannels: make(map[int64]struct{}),
109 channels: make(map[int64]*channel),
110 listenSockets: make(map[int64]*listenSocket),
111 normalSockets: make(map[int64]*normalSocket),
112 servers: make(map[int64]*server),
113 subChannels: make(map[int64]*subChannel),
114 })
115 idGen.reset()
116 return func() error {
117 var err error
118 cm := db.get()
119 if cm == nil {
120 return nil
121 }
122 for i := 0; i < 1000; i++ {
123 cm.mu.Lock()
124 if len(cm.topLevelChannels) == 0 && len(cm.servers) == 0 && len(cm.channels) == 0 && len(cm.subChannels) == 0 && len(cm.listenSockets) == 0 && len(cm.normalSockets) == 0 {
125 cm.mu.Unlock()
126 // all things stored in the channelz map have been cleared.
127 return nil
128 }
129 cm.mu.Unlock()
130 time.Sleep(10 * time.Millisecond)
131 }
132
133 cm.mu.Lock()
134 err = fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets))
135 cm.mu.Unlock()
136 return err
137 }
138}
139
140// GetTopChannels returns a slice of top channel's ChannelMetric, along with a
141// boolean indicating whether there's more top channels to be queried for.
142//
143// The arg id specifies that only top channel with id at or above it will be included
144// in the result. The returned slice is up to a length of the arg maxResults or
145// EntryPerPage if maxResults is zero, and is sorted in ascending id order.
146func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
147 return db.get().GetTopChannels(id, maxResults)
148}
149
150// GetServers returns a slice of server's ServerMetric, along with a
151// boolean indicating whether there's more servers to be queried for.
152//
153// The arg id specifies that only server with id at or above it will be included
154// in the result. The returned slice is up to a length of the arg maxResults or
155// EntryPerPage if maxResults is zero, and is sorted in ascending id order.
156func GetServers(id int64, maxResults int64) ([]*ServerMetric, bool) {
157 return db.get().GetServers(id, maxResults)
158}
159
160// GetServerSockets returns a slice of server's (identified by id) normal socket's
161// SocketMetric, along with a boolean indicating whether there's more sockets to
162// be queried for.
163//
164// The arg startID specifies that only sockets with id at or above it will be
165// included in the result. The returned slice is up to a length of the arg maxResults
166// or EntryPerPage if maxResults is zero, and is sorted in ascending id order.
167func GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
168 return db.get().GetServerSockets(id, startID, maxResults)
169}
170
171// GetChannel returns the ChannelMetric for the channel (identified by id).
172func GetChannel(id int64) *ChannelMetric {
173 return db.get().GetChannel(id)
174}
175
176// GetSubChannel returns the SubChannelMetric for the subchannel (identified by id).
177func GetSubChannel(id int64) *SubChannelMetric {
178 return db.get().GetSubChannel(id)
179}
180
181// GetSocket returns the SocketInternalMetric for the socket (identified by id).
182func GetSocket(id int64) *SocketMetric {
183 return db.get().GetSocket(id)
184}
185
186// GetServer returns the ServerMetric for the server (identified by id).
187func GetServer(id int64) *ServerMetric {
188 return db.get().GetServer(id)
189}
190
191// RegisterChannel registers the given channel c in channelz database with ref
192// as its reference name, and add it to the child list of its parent (identified
193// by pid). pid = 0 means no parent. It returns the unique channelz tracking id
194// assigned to this channel.
195func RegisterChannel(c Channel, pid int64, ref string) int64 {
196 id := idGen.genID()
197 cn := &channel{
198 refName: ref,
199 c: c,
200 subChans: make(map[int64]string),
201 nestedChans: make(map[int64]string),
202 id: id,
203 pid: pid,
204 trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
205 }
206 if pid == 0 {
207 db.get().addChannel(id, cn, true, pid, ref)
208 } else {
209 db.get().addChannel(id, cn, false, pid, ref)
210 }
211 return id
212}
213
214// RegisterSubChannel registers the given channel c in channelz database with ref
215// as its reference name, and add it to the child list of its parent (identified
216// by pid). It returns the unique channelz tracking id assigned to this subchannel.
217func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
218 if pid == 0 {
219 grpclog.Error("a SubChannel's parent id cannot be 0")
220 return 0
221 }
222 id := idGen.genID()
223 sc := &subChannel{
224 refName: ref,
225 c: c,
226 sockets: make(map[int64]string),
227 id: id,
228 pid: pid,
229 trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
230 }
231 db.get().addSubChannel(id, sc, pid, ref)
232 return id
233}
234
235// RegisterServer registers the given server s in channelz database. It returns
236// the unique channelz tracking id assigned to this server.
237func RegisterServer(s Server, ref string) int64 {
238 id := idGen.genID()
239 svr := &server{
240 refName: ref,
241 s: s,
242 sockets: make(map[int64]string),
243 listenSockets: make(map[int64]string),
244 id: id,
245 }
246 db.get().addServer(id, svr)
247 return id
248}
249
250// RegisterListenSocket registers the given listen socket s in channelz database
251// with ref as its reference name, and add it to the child list of its parent
252// (identified by pid). It returns the unique channelz tracking id assigned to
253// this listen socket.
254func RegisterListenSocket(s Socket, pid int64, ref string) int64 {
255 if pid == 0 {
256 grpclog.Error("a ListenSocket's parent id cannot be 0")
257 return 0
258 }
259 id := idGen.genID()
260 ls := &listenSocket{refName: ref, s: s, id: id, pid: pid}
261 db.get().addListenSocket(id, ls, pid, ref)
262 return id
263}
264
265// RegisterNormalSocket registers the given normal socket s in channelz database
266// with ref as its reference name, and add it to the child list of its parent
267// (identified by pid). It returns the unique channelz tracking id assigned to
268// this normal socket.
269func RegisterNormalSocket(s Socket, pid int64, ref string) int64 {
270 if pid == 0 {
271 grpclog.Error("a NormalSocket's parent id cannot be 0")
272 return 0
273 }
274 id := idGen.genID()
275 ns := &normalSocket{refName: ref, s: s, id: id, pid: pid}
276 db.get().addNormalSocket(id, ns, pid, ref)
277 return id
278}
279
280// RemoveEntry removes an entry with unique channelz trakcing id to be id from
281// channelz database.
282func RemoveEntry(id int64) {
283 db.get().removeEntry(id)
284}
285
286// TraceEventDesc is what the caller of AddTraceEvent should provide to describe the event to be added
287// to the channel trace.
288// The Parent field is optional. It is used for event that will be recorded in the entity's parent
289// trace also.
290type TraceEventDesc struct {
291 Desc string
292 Severity Severity
293 Parent *TraceEventDesc
294}
295
296// AddTraceEvent adds trace related to the entity with specified id, using the provided TraceEventDesc.
297func AddTraceEvent(id int64, desc *TraceEventDesc) {
298 if getMaxTraceEntry() == 0 {
299 return
300 }
301 db.get().traceEvent(id, desc)
302}
303
304// channelMap is the storage data structure for channelz.
305// Methods of channelMap can be divided in two two categories with respect to locking.
306// 1. Methods acquire the global lock.
307// 2. Methods that can only be called when global lock is held.
308// A second type of method need always to be called inside a first type of method.
309type channelMap struct {
310 mu sync.RWMutex
311 topLevelChannels map[int64]struct{}
312 servers map[int64]*server
313 channels map[int64]*channel
314 subChannels map[int64]*subChannel
315 listenSockets map[int64]*listenSocket
316 normalSockets map[int64]*normalSocket
317}
318
319func (c *channelMap) addServer(id int64, s *server) {
320 c.mu.Lock()
321 s.cm = c
322 c.servers[id] = s
323 c.mu.Unlock()
324}
325
326func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) {
327 c.mu.Lock()
328 cn.cm = c
329 cn.trace.cm = c
330 c.channels[id] = cn
331 if isTopChannel {
332 c.topLevelChannels[id] = struct{}{}
333 } else {
334 c.findEntry(pid).addChild(id, cn)
335 }
336 c.mu.Unlock()
337}
338
339func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) {
340 c.mu.Lock()
341 sc.cm = c
342 sc.trace.cm = c
343 c.subChannels[id] = sc
344 c.findEntry(pid).addChild(id, sc)
345 c.mu.Unlock()
346}
347
348func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64, ref string) {
349 c.mu.Lock()
350 ls.cm = c
351 c.listenSockets[id] = ls
352 c.findEntry(pid).addChild(id, ls)
353 c.mu.Unlock()
354}
355
356func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref string) {
357 c.mu.Lock()
358 ns.cm = c
359 c.normalSockets[id] = ns
360 c.findEntry(pid).addChild(id, ns)
361 c.mu.Unlock()
362}
363
364// removeEntry triggers the removal of an entry, which may not indeed delete the entry, if it has to
365// wait on the deletion of its children and until no other entity's channel trace references it.
366// It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully
367// shutting down server will lead to the server being also deleted.
368func (c *channelMap) removeEntry(id int64) {
369 c.mu.Lock()
370 c.findEntry(id).triggerDelete()
371 c.mu.Unlock()
372}
373
374// c.mu must be held by the caller
375func (c *channelMap) decrTraceRefCount(id int64) {
376 e := c.findEntry(id)
377 if v, ok := e.(tracedChannel); ok {
378 v.decrTraceRefCount()
379 e.deleteSelfIfReady()
380 }
381}
382
383// c.mu must be held by the caller.
384func (c *channelMap) findEntry(id int64) entry {
385 var v entry
386 var ok bool
387 if v, ok = c.channels[id]; ok {
388 return v
389 }
390 if v, ok = c.subChannels[id]; ok {
391 return v
392 }
393 if v, ok = c.servers[id]; ok {
394 return v
395 }
396 if v, ok = c.listenSockets[id]; ok {
397 return v
398 }
399 if v, ok = c.normalSockets[id]; ok {
400 return v
401 }
402 return &dummyEntry{idNotFound: id}
403}
404
405// c.mu must be held by the caller
406// deleteEntry simply deletes an entry from the channelMap. Before calling this
407// method, caller must check this entry is ready to be deleted, i.e removeEntry()
408// has been called on it, and no children still exist.
409// Conditionals are ordered by the expected frequency of deletion of each entity
410// type, in order to optimize performance.
411func (c *channelMap) deleteEntry(id int64) {
412 var ok bool
413 if _, ok = c.normalSockets[id]; ok {
414 delete(c.normalSockets, id)
415 return
416 }
417 if _, ok = c.subChannels[id]; ok {
418 delete(c.subChannels, id)
419 return
420 }
421 if _, ok = c.channels[id]; ok {
422 delete(c.channels, id)
423 delete(c.topLevelChannels, id)
424 return
425 }
426 if _, ok = c.listenSockets[id]; ok {
427 delete(c.listenSockets, id)
428 return
429 }
430 if _, ok = c.servers[id]; ok {
431 delete(c.servers, id)
432 return
433 }
434}
435
436func (c *channelMap) traceEvent(id int64, desc *TraceEventDesc) {
437 c.mu.Lock()
438 child := c.findEntry(id)
439 childTC, ok := child.(tracedChannel)
440 if !ok {
441 c.mu.Unlock()
442 return
443 }
444 childTC.getChannelTrace().append(&TraceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
445 if desc.Parent != nil {
446 parent := c.findEntry(child.getParentID())
447 var chanType RefChannelType
448 switch child.(type) {
449 case *channel:
450 chanType = RefChannel
451 case *subChannel:
452 chanType = RefSubChannel
453 }
454 if parentTC, ok := parent.(tracedChannel); ok {
455 parentTC.getChannelTrace().append(&TraceEvent{
456 Desc: desc.Parent.Desc,
457 Severity: desc.Parent.Severity,
458 Timestamp: time.Now(),
459 RefID: id,
460 RefName: childTC.getRefName(),
461 RefType: chanType,
462 })
463 childTC.incrTraceRefCount()
464 }
465 }
466 c.mu.Unlock()
467}
468
469type int64Slice []int64
470
471func (s int64Slice) Len() int { return len(s) }
472func (s int64Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
473func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] }
474
475func copyMap(m map[int64]string) map[int64]string {
476 n := make(map[int64]string)
477 for k, v := range m {
478 n[k] = v
479 }
480 return n
481}
482
483func min(a, b int64) int64 {
484 if a < b {
485 return a
486 }
487 return b
488}
489
490func (c *channelMap) GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
491 if maxResults <= 0 {
492 maxResults = EntryPerPage
493 }
494 c.mu.RLock()
495 l := int64(len(c.topLevelChannels))
496 ids := make([]int64, 0, l)
497 cns := make([]*channel, 0, min(l, maxResults))
498
499 for k := range c.topLevelChannels {
500 ids = append(ids, k)
501 }
502 sort.Sort(int64Slice(ids))
503 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
504 count := int64(0)
505 var end bool
506 var t []*ChannelMetric
507 for i, v := range ids[idx:] {
508 if count == maxResults {
509 break
510 }
511 if cn, ok := c.channels[v]; ok {
512 cns = append(cns, cn)
513 t = append(t, &ChannelMetric{
514 NestedChans: copyMap(cn.nestedChans),
515 SubChans: copyMap(cn.subChans),
516 })
517 count++
518 }
519 if i == len(ids[idx:])-1 {
520 end = true
521 break
522 }
523 }
524 c.mu.RUnlock()
525 if count == 0 {
526 end = true
527 }
528
529 for i, cn := range cns {
530 t[i].ChannelData = cn.c.ChannelzMetric()
531 t[i].ID = cn.id
532 t[i].RefName = cn.refName
533 t[i].Trace = cn.trace.dumpData()
534 }
535 return t, end
536}
537
538func (c *channelMap) GetServers(id, maxResults int64) ([]*ServerMetric, bool) {
539 if maxResults <= 0 {
540 maxResults = EntryPerPage
541 }
542 c.mu.RLock()
543 l := int64(len(c.servers))
544 ids := make([]int64, 0, l)
545 ss := make([]*server, 0, min(l, maxResults))
546 for k := range c.servers {
547 ids = append(ids, k)
548 }
549 sort.Sort(int64Slice(ids))
550 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
551 count := int64(0)
552 var end bool
553 var s []*ServerMetric
554 for i, v := range ids[idx:] {
555 if count == maxResults {
556 break
557 }
558 if svr, ok := c.servers[v]; ok {
559 ss = append(ss, svr)
560 s = append(s, &ServerMetric{
561 ListenSockets: copyMap(svr.listenSockets),
562 })
563 count++
564 }
565 if i == len(ids[idx:])-1 {
566 end = true
567 break
568 }
569 }
570 c.mu.RUnlock()
571 if count == 0 {
572 end = true
573 }
574
575 for i, svr := range ss {
576 s[i].ServerData = svr.s.ChannelzMetric()
577 s[i].ID = svr.id
578 s[i].RefName = svr.refName
579 }
580 return s, end
581}
582
583func (c *channelMap) GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
584 if maxResults <= 0 {
585 maxResults = EntryPerPage
586 }
587 var svr *server
588 var ok bool
589 c.mu.RLock()
590 if svr, ok = c.servers[id]; !ok {
591 // server with id doesn't exist.
592 c.mu.RUnlock()
593 return nil, true
594 }
595 svrskts := svr.sockets
596 l := int64(len(svrskts))
597 ids := make([]int64, 0, l)
598 sks := make([]*normalSocket, 0, min(l, maxResults))
599 for k := range svrskts {
600 ids = append(ids, k)
601 }
602 sort.Sort(int64Slice(ids))
603 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
604 count := int64(0)
605 var end bool
606 for i, v := range ids[idx:] {
607 if count == maxResults {
608 break
609 }
610 if ns, ok := c.normalSockets[v]; ok {
611 sks = append(sks, ns)
612 count++
613 }
614 if i == len(ids[idx:])-1 {
615 end = true
616 break
617 }
618 }
619 c.mu.RUnlock()
620 if count == 0 {
621 end = true
622 }
623 var s []*SocketMetric
624 for _, ns := range sks {
625 sm := &SocketMetric{}
626 sm.SocketData = ns.s.ChannelzMetric()
627 sm.ID = ns.id
628 sm.RefName = ns.refName
629 s = append(s, sm)
630 }
631 return s, end
632}
633
634func (c *channelMap) GetChannel(id int64) *ChannelMetric {
635 cm := &ChannelMetric{}
636 var cn *channel
637 var ok bool
638 c.mu.RLock()
639 if cn, ok = c.channels[id]; !ok {
640 // channel with id doesn't exist.
641 c.mu.RUnlock()
642 return nil
643 }
644 cm.NestedChans = copyMap(cn.nestedChans)
645 cm.SubChans = copyMap(cn.subChans)
646 // cn.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of cn.c when
647 // holding the lock to prevent potential data race.
648 chanCopy := cn.c
649 c.mu.RUnlock()
650 cm.ChannelData = chanCopy.ChannelzMetric()
651 cm.ID = cn.id
652 cm.RefName = cn.refName
653 cm.Trace = cn.trace.dumpData()
654 return cm
655}
656
657func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
658 cm := &SubChannelMetric{}
659 var sc *subChannel
660 var ok bool
661 c.mu.RLock()
662 if sc, ok = c.subChannels[id]; !ok {
663 // subchannel with id doesn't exist.
664 c.mu.RUnlock()
665 return nil
666 }
667 cm.Sockets = copyMap(sc.sockets)
668 // sc.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of sc.c when
669 // holding the lock to prevent potential data race.
670 chanCopy := sc.c
671 c.mu.RUnlock()
672 cm.ChannelData = chanCopy.ChannelzMetric()
673 cm.ID = sc.id
674 cm.RefName = sc.refName
675 cm.Trace = sc.trace.dumpData()
676 return cm
677}
678
679func (c *channelMap) GetSocket(id int64) *SocketMetric {
680 sm := &SocketMetric{}
681 c.mu.RLock()
682 if ls, ok := c.listenSockets[id]; ok {
683 c.mu.RUnlock()
684 sm.SocketData = ls.s.ChannelzMetric()
685 sm.ID = ls.id
686 sm.RefName = ls.refName
687 return sm
688 }
689 if ns, ok := c.normalSockets[id]; ok {
690 c.mu.RUnlock()
691 sm.SocketData = ns.s.ChannelzMetric()
692 sm.ID = ns.id
693 sm.RefName = ns.refName
694 return sm
695 }
696 c.mu.RUnlock()
697 return nil
698}
699
700func (c *channelMap) GetServer(id int64) *ServerMetric {
701 sm := &ServerMetric{}
702 var svr *server
703 var ok bool
704 c.mu.RLock()
705 if svr, ok = c.servers[id]; !ok {
706 c.mu.RUnlock()
707 return nil
708 }
709 sm.ListenSockets = copyMap(svr.listenSockets)
710 c.mu.RUnlock()
711 sm.ID = svr.id
712 sm.RefName = svr.refName
713 sm.ServerData = svr.s.ChannelzMetric()
714 return sm
715}
716
717type idGenerator struct {
718 id int64
719}
720
721func (i *idGenerator) reset() {
722 atomic.StoreInt64(&i.id, 0)
723}
724
725func (i *idGenerator) genID() int64 {
726 return atomic.AddInt64(&i.id, 1)
727}