/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19// Package channelz defines APIs for enabling channelz service, entry
20// registration/deletion, and accessing channelz data. It also defines channelz
21// metric struct formats.
22//
23// All APIs in this package are experimental.
24package channelz
25
26import (
27 "sort"
28 "sync"
29 "sync/atomic"
30 "time"
31
32 "google.golang.org/grpc/grpclog"
33)
34
// defaultMaxTraceEntry is the initial value of maxTraceEntry and the value
// ResetMaxTraceEntryToDefault restores.
const defaultMaxTraceEntry int32 = 30
38
39var (
40 db dbWrapper
41 idGen idGenerator
42 // EntryPerPage defines the number of channelz entries to be shown on a web page.
43 EntryPerPage = int64(50)
44 curState int32
45 maxTraceEntry = defaultMaxTraceEntry
46)
47
48// TurnOn turns on channelz data collection.
49func TurnOn() {
50 if !IsOn() {
51 NewChannelzStorage()
52 atomic.StoreInt32(&curState, 1)
53 }
54}
55
56// IsOn returns whether channelz data collection is on.
57func IsOn() bool {
58 return atomic.CompareAndSwapInt32(&curState, 1, 1)
59}
60
61// SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel).
62// Setting it to 0 will disable channel tracing.
63func SetMaxTraceEntry(i int32) {
64 atomic.StoreInt32(&maxTraceEntry, i)
65}
66
67// ResetMaxTraceEntryToDefault resets the maximum number of trace entry per entity to default.
68func ResetMaxTraceEntryToDefault() {
69 atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry)
70}
71
72func getMaxTraceEntry() int {
73 i := atomic.LoadInt32(&maxTraceEntry)
74 return int(i)
75}
76
77// dbWarpper wraps around a reference to internal channelz data storage, and
78// provide synchronized functionality to set and get the reference.
79type dbWrapper struct {
80 mu sync.RWMutex
81 DB *channelMap
82}
83
84func (d *dbWrapper) set(db *channelMap) {
85 d.mu.Lock()
86 d.DB = db
87 d.mu.Unlock()
88}
89
90func (d *dbWrapper) get() *channelMap {
91 d.mu.RLock()
92 defer d.mu.RUnlock()
93 return d.DB
94}
95
96// NewChannelzStorage initializes channelz data storage and id generator.
97//
98// Note: This function is exported for testing purpose only. User should not call
99// it in most cases.
100func NewChannelzStorage() {
101 db.set(&channelMap{
102 topLevelChannels: make(map[int64]struct{}),
103 channels: make(map[int64]*channel),
104 listenSockets: make(map[int64]*listenSocket),
105 normalSockets: make(map[int64]*normalSocket),
106 servers: make(map[int64]*server),
107 subChannels: make(map[int64]*subChannel),
108 })
109 idGen.reset()
110}
111
112// GetTopChannels returns a slice of top channel's ChannelMetric, along with a
113// boolean indicating whether there's more top channels to be queried for.
114//
115// The arg id specifies that only top channel with id at or above it will be included
116// in the result. The returned slice is up to a length of the arg maxResults or
117// EntryPerPage if maxResults is zero, and is sorted in ascending id order.
118func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
119 return db.get().GetTopChannels(id, maxResults)
120}
121
122// GetServers returns a slice of server's ServerMetric, along with a
123// boolean indicating whether there's more servers to be queried for.
124//
125// The arg id specifies that only server with id at or above it will be included
126// in the result. The returned slice is up to a length of the arg maxResults or
127// EntryPerPage if maxResults is zero, and is sorted in ascending id order.
128func GetServers(id int64, maxResults int64) ([]*ServerMetric, bool) {
129 return db.get().GetServers(id, maxResults)
130}
131
132// GetServerSockets returns a slice of server's (identified by id) normal socket's
133// SocketMetric, along with a boolean indicating whether there's more sockets to
134// be queried for.
135//
136// The arg startID specifies that only sockets with id at or above it will be
137// included in the result. The returned slice is up to a length of the arg maxResults
138// or EntryPerPage if maxResults is zero, and is sorted in ascending id order.
139func GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
140 return db.get().GetServerSockets(id, startID, maxResults)
141}
142
143// GetChannel returns the ChannelMetric for the channel (identified by id).
144func GetChannel(id int64) *ChannelMetric {
145 return db.get().GetChannel(id)
146}
147
148// GetSubChannel returns the SubChannelMetric for the subchannel (identified by id).
149func GetSubChannel(id int64) *SubChannelMetric {
150 return db.get().GetSubChannel(id)
151}
152
153// GetSocket returns the SocketInternalMetric for the socket (identified by id).
154func GetSocket(id int64) *SocketMetric {
155 return db.get().GetSocket(id)
156}
157
158// GetServer returns the ServerMetric for the server (identified by id).
159func GetServer(id int64) *ServerMetric {
160 return db.get().GetServer(id)
161}
162
163// RegisterChannel registers the given channel c in channelz database with ref
164// as its reference name, and add it to the child list of its parent (identified
165// by pid). pid = 0 means no parent. It returns the unique channelz tracking id
166// assigned to this channel.
167func RegisterChannel(c Channel, pid int64, ref string) int64 {
168 id := idGen.genID()
169 cn := &channel{
170 refName: ref,
171 c: c,
172 subChans: make(map[int64]string),
173 nestedChans: make(map[int64]string),
174 id: id,
175 pid: pid,
176 trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
177 }
178 if pid == 0 {
179 db.get().addChannel(id, cn, true, pid, ref)
180 } else {
181 db.get().addChannel(id, cn, false, pid, ref)
182 }
183 return id
184}
185
186// RegisterSubChannel registers the given channel c in channelz database with ref
187// as its reference name, and add it to the child list of its parent (identified
188// by pid). It returns the unique channelz tracking id assigned to this subchannel.
189func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
190 if pid == 0 {
191 grpclog.Error("a SubChannel's parent id cannot be 0")
192 return 0
193 }
194 id := idGen.genID()
195 sc := &subChannel{
196 refName: ref,
197 c: c,
198 sockets: make(map[int64]string),
199 id: id,
200 pid: pid,
201 trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
202 }
203 db.get().addSubChannel(id, sc, pid, ref)
204 return id
205}
206
207// RegisterServer registers the given server s in channelz database. It returns
208// the unique channelz tracking id assigned to this server.
209func RegisterServer(s Server, ref string) int64 {
210 id := idGen.genID()
211 svr := &server{
212 refName: ref,
213 s: s,
214 sockets: make(map[int64]string),
215 listenSockets: make(map[int64]string),
216 id: id,
217 }
218 db.get().addServer(id, svr)
219 return id
220}
221
222// RegisterListenSocket registers the given listen socket s in channelz database
223// with ref as its reference name, and add it to the child list of its parent
224// (identified by pid). It returns the unique channelz tracking id assigned to
225// this listen socket.
226func RegisterListenSocket(s Socket, pid int64, ref string) int64 {
227 if pid == 0 {
228 grpclog.Error("a ListenSocket's parent id cannot be 0")
229 return 0
230 }
231 id := idGen.genID()
232 ls := &listenSocket{refName: ref, s: s, id: id, pid: pid}
233 db.get().addListenSocket(id, ls, pid, ref)
234 return id
235}
236
237// RegisterNormalSocket registers the given normal socket s in channelz database
238// with ref as its reference name, and add it to the child list of its parent
239// (identified by pid). It returns the unique channelz tracking id assigned to
240// this normal socket.
241func RegisterNormalSocket(s Socket, pid int64, ref string) int64 {
242 if pid == 0 {
243 grpclog.Error("a NormalSocket's parent id cannot be 0")
244 return 0
245 }
246 id := idGen.genID()
247 ns := &normalSocket{refName: ref, s: s, id: id, pid: pid}
248 db.get().addNormalSocket(id, ns, pid, ref)
249 return id
250}
251
252// RemoveEntry removes an entry with unique channelz trakcing id to be id from
253// channelz database.
254func RemoveEntry(id int64) {
255 db.get().removeEntry(id)
256}
257
// TraceEventDesc is what the caller of AddTraceEvent should provide to describe the event to be added
// to the channel trace.
// The Parent field is optional. It is used for an event that will also be recorded in the entity's
// parent trace.
type TraceEventDesc struct {
	// Desc is the description recorded with the event in the entity's own trace.
	Desc string
	// Severity is the severity recorded with the event in the entity's own trace.
	Severity Severity
	// Parent, when non-nil, supplies the description and severity of the
	// companion event appended to the parent's trace.
	Parent *TraceEventDesc
}
267
268// AddTraceEvent adds trace related to the entity with specified id, using the provided TraceEventDesc.
269func AddTraceEvent(id int64, desc *TraceEventDesc) {
270 if getMaxTraceEntry() == 0 {
271 return
272 }
273 db.get().traceEvent(id, desc)
274}
275
// channelMap is the storage data structure for channelz.
// Methods of channelMap can be divided into two categories with respect to locking.
// 1. Methods that acquire the global lock.
// 2. Methods that can only be called when the global lock is held.
// A method of the second type must always be called from inside a method of the first type.
type channelMap struct {
	// mu is the global lock referred to above.
	mu sync.RWMutex
	// topLevelChannels is the set of ids of channels registered as top-level.
	topLevelChannels map[int64]struct{}
	// The maps below hold the registered entities of each kind, keyed by id.
	servers       map[int64]*server
	channels      map[int64]*channel
	subChannels   map[int64]*subChannel
	listenSockets map[int64]*listenSocket
	normalSockets map[int64]*normalSocket
}
290
291func (c *channelMap) addServer(id int64, s *server) {
292 c.mu.Lock()
293 s.cm = c
294 c.servers[id] = s
295 c.mu.Unlock()
296}
297
298func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) {
299 c.mu.Lock()
300 cn.cm = c
301 cn.trace.cm = c
302 c.channels[id] = cn
303 if isTopChannel {
304 c.topLevelChannels[id] = struct{}{}
305 } else {
306 c.findEntry(pid).addChild(id, cn)
307 }
308 c.mu.Unlock()
309}
310
311func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) {
312 c.mu.Lock()
313 sc.cm = c
314 sc.trace.cm = c
315 c.subChannels[id] = sc
316 c.findEntry(pid).addChild(id, sc)
317 c.mu.Unlock()
318}
319
320func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64, ref string) {
321 c.mu.Lock()
322 ls.cm = c
323 c.listenSockets[id] = ls
324 c.findEntry(pid).addChild(id, ls)
325 c.mu.Unlock()
326}
327
328func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref string) {
329 c.mu.Lock()
330 ns.cm = c
331 c.normalSockets[id] = ns
332 c.findEntry(pid).addChild(id, ns)
333 c.mu.Unlock()
334}
335
336// removeEntry triggers the removal of an entry, which may not indeed delete the entry, if it has to
337// wait on the deletion of its children and until no other entity's channel trace references it.
338// It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully
339// shutting down server will lead to the server being also deleted.
340func (c *channelMap) removeEntry(id int64) {
341 c.mu.Lock()
342 c.findEntry(id).triggerDelete()
343 c.mu.Unlock()
344}
345
346// c.mu must be held by the caller
347func (c *channelMap) decrTraceRefCount(id int64) {
348 e := c.findEntry(id)
349 if v, ok := e.(tracedChannel); ok {
350 v.decrTraceRefCount()
351 e.deleteSelfIfReady()
352 }
353}
354
355// c.mu must be held by the caller.
356func (c *channelMap) findEntry(id int64) entry {
357 var v entry
358 var ok bool
359 if v, ok = c.channels[id]; ok {
360 return v
361 }
362 if v, ok = c.subChannels[id]; ok {
363 return v
364 }
365 if v, ok = c.servers[id]; ok {
366 return v
367 }
368 if v, ok = c.listenSockets[id]; ok {
369 return v
370 }
371 if v, ok = c.normalSockets[id]; ok {
372 return v
373 }
374 return &dummyEntry{idNotFound: id}
375}
376
377// c.mu must be held by the caller
378// deleteEntry simply deletes an entry from the channelMap. Before calling this
379// method, caller must check this entry is ready to be deleted, i.e removeEntry()
380// has been called on it, and no children still exist.
381// Conditionals are ordered by the expected frequency of deletion of each entity
382// type, in order to optimize performance.
383func (c *channelMap) deleteEntry(id int64) {
384 var ok bool
385 if _, ok = c.normalSockets[id]; ok {
386 delete(c.normalSockets, id)
387 return
388 }
389 if _, ok = c.subChannels[id]; ok {
390 delete(c.subChannels, id)
391 return
392 }
393 if _, ok = c.channels[id]; ok {
394 delete(c.channels, id)
395 delete(c.topLevelChannels, id)
396 return
397 }
398 if _, ok = c.listenSockets[id]; ok {
399 delete(c.listenSockets, id)
400 return
401 }
402 if _, ok = c.servers[id]; ok {
403 delete(c.servers, id)
404 return
405 }
406}
407
408func (c *channelMap) traceEvent(id int64, desc *TraceEventDesc) {
409 c.mu.Lock()
410 child := c.findEntry(id)
411 childTC, ok := child.(tracedChannel)
412 if !ok {
413 c.mu.Unlock()
414 return
415 }
416 childTC.getChannelTrace().append(&TraceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
417 if desc.Parent != nil {
418 parent := c.findEntry(child.getParentID())
419 var chanType RefChannelType
420 switch child.(type) {
421 case *channel:
422 chanType = RefChannel
423 case *subChannel:
424 chanType = RefSubChannel
425 }
426 if parentTC, ok := parent.(tracedChannel); ok {
427 parentTC.getChannelTrace().append(&TraceEvent{
428 Desc: desc.Parent.Desc,
429 Severity: desc.Parent.Severity,
430 Timestamp: time.Now(),
431 RefID: id,
432 RefName: childTC.getRefName(),
433 RefType: chanType,
434 })
435 childTC.incrTraceRefCount()
436 }
437 }
438 c.mu.Unlock()
439}
440
// int64Slice is a []int64 that can be sorted in ascending order with sort.Sort.
type int64Slice []int64

// Len reports the number of elements.
func (s int64Slice) Len() int {
	return len(s)
}

// Swap exchanges the elements at positions i and j.
func (s int64Slice) Swap(i, j int) {
	s[j], s[i] = s[i], s[j]
}

// Less reports whether the element at i is smaller than the element at j.
func (s int64Slice) Less(i, j int) bool {
	return s[j] > s[i]
}
446
// copyMap returns a new map holding the same key/value pairs as m. The result
// is never nil, even when m is nil or empty, and is pre-sized to len(m) so the
// copy does not have to grow while it is filled.
func copyMap(m map[int64]string) map[int64]string {
	n := make(map[int64]string, len(m))
	for k, v := range m {
		n[k] = v
	}
	return n
}
454
// min returns the smaller of a and b.
func min(a, b int64) int64 {
	if b <= a {
		return b
	}
	return a
}
461
462func (c *channelMap) GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
463 if maxResults <= 0 {
464 maxResults = EntryPerPage
465 }
466 c.mu.RLock()
467 l := int64(len(c.topLevelChannels))
468 ids := make([]int64, 0, l)
469 cns := make([]*channel, 0, min(l, maxResults))
470
471 for k := range c.topLevelChannels {
472 ids = append(ids, k)
473 }
474 sort.Sort(int64Slice(ids))
475 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
476 count := int64(0)
477 var end bool
478 var t []*ChannelMetric
479 for i, v := range ids[idx:] {
480 if count == maxResults {
481 break
482 }
483 if cn, ok := c.channels[v]; ok {
484 cns = append(cns, cn)
485 t = append(t, &ChannelMetric{
486 NestedChans: copyMap(cn.nestedChans),
487 SubChans: copyMap(cn.subChans),
488 })
489 count++
490 }
491 if i == len(ids[idx:])-1 {
492 end = true
493 break
494 }
495 }
496 c.mu.RUnlock()
497 if count == 0 {
498 end = true
499 }
500
501 for i, cn := range cns {
502 t[i].ChannelData = cn.c.ChannelzMetric()
503 t[i].ID = cn.id
504 t[i].RefName = cn.refName
505 t[i].Trace = cn.trace.dumpData()
506 }
507 return t, end
508}
509
510func (c *channelMap) GetServers(id, maxResults int64) ([]*ServerMetric, bool) {
511 if maxResults <= 0 {
512 maxResults = EntryPerPage
513 }
514 c.mu.RLock()
515 l := int64(len(c.servers))
516 ids := make([]int64, 0, l)
517 ss := make([]*server, 0, min(l, maxResults))
518 for k := range c.servers {
519 ids = append(ids, k)
520 }
521 sort.Sort(int64Slice(ids))
522 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
523 count := int64(0)
524 var end bool
525 var s []*ServerMetric
526 for i, v := range ids[idx:] {
527 if count == maxResults {
528 break
529 }
530 if svr, ok := c.servers[v]; ok {
531 ss = append(ss, svr)
532 s = append(s, &ServerMetric{
533 ListenSockets: copyMap(svr.listenSockets),
534 })
535 count++
536 }
537 if i == len(ids[idx:])-1 {
538 end = true
539 break
540 }
541 }
542 c.mu.RUnlock()
543 if count == 0 {
544 end = true
545 }
546
547 for i, svr := range ss {
548 s[i].ServerData = svr.s.ChannelzMetric()
549 s[i].ID = svr.id
550 s[i].RefName = svr.refName
551 }
552 return s, end
553}
554
555func (c *channelMap) GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
556 if maxResults <= 0 {
557 maxResults = EntryPerPage
558 }
559 var svr *server
560 var ok bool
561 c.mu.RLock()
562 if svr, ok = c.servers[id]; !ok {
563 // server with id doesn't exist.
564 c.mu.RUnlock()
565 return nil, true
566 }
567 svrskts := svr.sockets
568 l := int64(len(svrskts))
569 ids := make([]int64, 0, l)
570 sks := make([]*normalSocket, 0, min(l, maxResults))
571 for k := range svrskts {
572 ids = append(ids, k)
573 }
574 sort.Sort(int64Slice(ids))
575 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
576 count := int64(0)
577 var end bool
578 for i, v := range ids[idx:] {
579 if count == maxResults {
580 break
581 }
582 if ns, ok := c.normalSockets[v]; ok {
583 sks = append(sks, ns)
584 count++
585 }
586 if i == len(ids[idx:])-1 {
587 end = true
588 break
589 }
590 }
591 c.mu.RUnlock()
592 if count == 0 {
593 end = true
594 }
595 var s []*SocketMetric
596 for _, ns := range sks {
597 sm := &SocketMetric{}
598 sm.SocketData = ns.s.ChannelzMetric()
599 sm.ID = ns.id
600 sm.RefName = ns.refName
601 s = append(s, sm)
602 }
603 return s, end
604}
605
606func (c *channelMap) GetChannel(id int64) *ChannelMetric {
607 cm := &ChannelMetric{}
608 var cn *channel
609 var ok bool
610 c.mu.RLock()
611 if cn, ok = c.channels[id]; !ok {
612 // channel with id doesn't exist.
613 c.mu.RUnlock()
614 return nil
615 }
616 cm.NestedChans = copyMap(cn.nestedChans)
617 cm.SubChans = copyMap(cn.subChans)
618 // cn.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of cn.c when
619 // holding the lock to prevent potential data race.
620 chanCopy := cn.c
621 c.mu.RUnlock()
622 cm.ChannelData = chanCopy.ChannelzMetric()
623 cm.ID = cn.id
624 cm.RefName = cn.refName
625 cm.Trace = cn.trace.dumpData()
626 return cm
627}
628
629func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
630 cm := &SubChannelMetric{}
631 var sc *subChannel
632 var ok bool
633 c.mu.RLock()
634 if sc, ok = c.subChannels[id]; !ok {
635 // subchannel with id doesn't exist.
636 c.mu.RUnlock()
637 return nil
638 }
639 cm.Sockets = copyMap(sc.sockets)
640 // sc.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of sc.c when
641 // holding the lock to prevent potential data race.
642 chanCopy := sc.c
643 c.mu.RUnlock()
644 cm.ChannelData = chanCopy.ChannelzMetric()
645 cm.ID = sc.id
646 cm.RefName = sc.refName
647 cm.Trace = sc.trace.dumpData()
648 return cm
649}
650
651func (c *channelMap) GetSocket(id int64) *SocketMetric {
652 sm := &SocketMetric{}
653 c.mu.RLock()
654 if ls, ok := c.listenSockets[id]; ok {
655 c.mu.RUnlock()
656 sm.SocketData = ls.s.ChannelzMetric()
657 sm.ID = ls.id
658 sm.RefName = ls.refName
659 return sm
660 }
661 if ns, ok := c.normalSockets[id]; ok {
662 c.mu.RUnlock()
663 sm.SocketData = ns.s.ChannelzMetric()
664 sm.ID = ns.id
665 sm.RefName = ns.refName
666 return sm
667 }
668 c.mu.RUnlock()
669 return nil
670}
671
672func (c *channelMap) GetServer(id int64) *ServerMetric {
673 sm := &ServerMetric{}
674 var svr *server
675 var ok bool
676 c.mu.RLock()
677 if svr, ok = c.servers[id]; !ok {
678 c.mu.RUnlock()
679 return nil
680 }
681 sm.ListenSockets = copyMap(svr.listenSockets)
682 c.mu.RUnlock()
683 sm.ID = svr.id
684 sm.RefName = svr.refName
685 sm.ServerData = svr.s.ChannelzMetric()
686 return sm
687}
688
// idGenerator hands out increasing int64 ids. Its counter is only accessed
// through atomic operations.
type idGenerator struct {
	// id is the most recently issued id; 0 means none has been issued since
	// the last reset.
	id int64
}

// reset sets the counter back to 0, so the next generated id is 1.
func (i *idGenerator) reset() {
	atomic.StoreInt64(&i.id, 0)
}

// genID atomically increments the counter and returns the new value.
func (i *idGenerator) genID() int64 {
	next := atomic.AddInt64(&i.id, 1)
	return next
}