当前位置: 首页 > article >正文

【k8s深入学习之 event 记录】初步了解 k8s event 记录机制

event 事件记录初始化

  • 一般在控制器都会有如下的初始化函数,初始化 event 记录器等参数
1. 创建 EventBroadcaster
  • record.NewBroadcaster(): 创建事件广播器,用于记录和分发事件。
  • StartLogging(klog.Infof): 将事件以日志的形式输出。
  • StartRecordingToSink: 将事件发送到 Kubernetes API Server,存储为 Event 资源。
2. 创建 EventRecorder
  • NewRecorder(scheme, source)从广播器中创建事件记录器。
    • scheme: 用于验证和序列化资源。
    • source: 指定事件的来源(如 example-controller)。
import "k8s.io/client-go/tools/record"

func (c *controller) Initialize(opt *framework.ControllerOption) error {
	// ...
  // 1. 创建事件广播器 eventBroadcaster
  eventBroadcaster := record.NewBroadcaster()
  // 将 event 记录到 log
	eventBroadcaster.StartLogging(klog.Infof)
  // 将 event 记录到 apiserver
  // c.kubeClient.CoreV1().Events("") 这个是创建一个可以操作任意 ns 下 event 的 client
	eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")})
  // 2. 基于事件广播器 创建事件记录器 Recorder
  c.recorder = eventBroadcaster.NewRecorder(versionedscheme.Scheme, v1.EventSource{Component: "example-controller"})
}

// 事件的记录
const Create-Reason = "PodCreate"
func (c *controller)Controller_Do_Something(pod *corev1.Pod){
  newPod:= pod.DeepCopy()
  // 生成个 event,并记录
  // 内容为 newPod 创建成功,event等级为 Normal,Reason 是 PodCreate,Message 是 Create Pod succeed
  // 之后 Recorder 内的 eventBroadcaster 会将此 event 广播出去,然后 eventBroadcaster 之前注册的日志记录和event存储逻辑会执行
  // 日志记录逻辑,会通过 klog.Infof 将此 event 打印出来
  // event存储逻辑,会将此 event 存储到 apiserver
  c.recorder.Event(newPod, v1.EventTypeNormal, Create-Reason, "Create Pod succeed")
}

源码解析

在这里插入图片描述

0- 总体逻辑设计

  1. 控制中心 Broadcaster

    • eventBroadcaster := record.NewBroadcaster() 创建一个公共数据源(或理解为总控中心,也可以称之为控制器,但不是k8s 控制器)

      • 返回的是eventBroadcasterImpl 结构体,其封装了Broadcaster结构体,因此 eventBroadcasterImpl 结构体的字段很丰富
    • Broadcaster 中的字段主要记录处理 event 的监听者watchers,以及分发的控制等

      • eventBroadcaster.StartLogging(klog.Infof) 就是一个 watcher
      • eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")}) 也是个 watcher
      • 这些 watcher 都会被记录到Broadcaster结构体的watchers map[int64]*broadcasterWatcher 的map 中
    • eventBroadcasterImplBroadcaster 基础上增加少量配置参数和控制函数

  2. Event 分发和 watcher 处理逻辑

    • eventBroadcaster := record.NewBroadcaster() 执行过程中会调用 watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull) 函数,其会开启 event 分发逻辑go m.loop()
      • go m.loop() 用于处理 event 的分发,读取eventBroadcasterImplincoming chan Event 通道传来的 event,分发给各个 watcher 的 result channel
        • incoming 中的 event 是由 Recorder 写入的,Recorder.Event 会生成个 event ,并发送到incomimg channel 中
        • go m.loop()函数会读取incoming通道中的 Event,发送个各个watcher, 然后各个watcher执行自己的逻辑(如记录为 info级别日志、或写入apiserver等)
      • 同时为了避免主进程的结束导致go m.loop()进程结束,NewLongQueueBroadcaster 还利用distributing sync.WaitGroup变量,进行 m.distributing.Add(1),让主进程等待(避免主进程快速结束,导致 loop 进程结束)
    • StartLoggingStartRecordingToSink 函数会调用 StartEventWatcher 函数, StartEventWatcher 函数将传入的参数变为一个 event处理函数 eventHandler, StartEventWatcher 函数同时会开启一个 go 协程,读取各自 watcher result channel 中的 event,之后用eventHandler进行处理(如记录为 info级别日志、或写入apiserver等)
  3. Event 产生逻辑

    • Recorder 是由 eventBroadcaster.NewRecorder 创建出来的,相当于对eventBroadcasterImplBroadcaster 的封装

    • Recorder.Event 会生成个 event ,并发送到incomimg channel 中

      • Recorder 会利用Broadcasterincoming channel 写入 event

      • Recorder 会利用BroadcasterincomingBlock,控制写入时的并发,避免同一时间写入 event 过多导致错乱(这部分逻辑在blockQueue 函数中)

1- 控制中心的创建 —— NewBroadcaster 函数

  • 创建的eventBroadcaster ,实际上就是创建一个 eventBroadcasterImpl 结构体,并传入一些配置参数进行初始化
  • 注意 eventBroadcasterImpl封装了Broadcaster结构体
    • 注意Broadcaster中有很多channelwatchers和分发相关控制、并发控制字段等
      • eventBroadcaster.StartLogging(klog.Infof) 就是一个 watcher
      • eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")}) 也是个 watcher
      • 这些 watcher 都会被记录到watchers map[int64]*broadcasterWatcher 的map 中
    • 基于eventBroadcaster 创建的Recorder,实际上级就是对eventBroadcasterImpl结构体的封装
    • 之后Recorder创建 event 时,会传入到eventBroadcasterImplBroadcaster
// 路径 mod/k8s.io/client-go@v0.29.0/tools/record/event.go
const maxQueuedEvents = 1000
type FullChannelBehavior int
const (
	WaitIfChannelFull FullChannelBehavior = iota
	DropIfChannelFull
)

// Creates a new event broadcaster.
func NewBroadcaster(opts ...BroadcasterOption) EventBroadcaster {
	c := config{
		sleepDuration: defaultSleepDuration,
	}
	for _, opt := range opts {
		opt(&c)
	}
  // 重点关注
	eventBroadcaster := &eventBroadcasterImpl{
		Broadcaster:   watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
		sleepDuration: c.sleepDuration,
		options:       c.CorrelatorOptions,
	}
	ctx := c.Context
	if ctx == nil {
		ctx = context.Background()
	} else {
		// Calling Shutdown is not required when a context was provided:
		// when the context is canceled, this goroutine will shut down
		// the broadcaster.
		go func() {
			<-ctx.Done()
			eventBroadcaster.Broadcaster.Shutdown()
		}()
	}
	eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(ctx)
  // 重点关注
	return eventBroadcaster
}

// 路径 mod/k8s.io/apimachinery@v0.29.0/pkg/watch/mux.go
// NewLongQueueBroadcaster functions nearly identically to NewBroadcaster,
// except that the incoming queue is the same size as the outgoing queues
// (specified by queueLength).
func NewLongQueueBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
	m := &Broadcaster{
		watchers:            map[int64]*broadcasterWatcher{},
		incoming:            make(chan Event, queueLength),
		stopped:             make(chan struct{}),
		watchQueueLength:    queueLength,
		fullChannelBehavior: fullChannelBehavior,
	}
	m.distributing.Add(1)	// distributing sync.WaitGroup, 1 个进程
	go m.loop()  					// loop 进程,很关键! 处理 event 的分发,分发给 watcher 处理		
	return m
}
1.1- eventBroadcasterImpl 结构体
// 路径 mod/k8s.io/client-go@v0.29.0/tools/record/event.go
type eventBroadcasterImpl struct {
	*watch.Broadcaster  // 此处引用下面的结构体
	sleepDuration  time.Duration
	options        CorrelatorOptions
	cancelationCtx context.Context
	cancel         func()
}

// 路径 /mod/k8s.io/apimachinery@v0.29.0/pkg/watch/mux.go
// Broadcaster distributes event notifications among any number of watchers. Every event
// is delivered to every watcher.
type Broadcaster struct {
	watchers     map[int64]*broadcasterWatcher  // map 结构  id 和 watcher 的映射
	nextWatcher  int64													// 下一个 watcher 该分配的 id
	distributing sync.WaitGroup									// 用于保证分发函数 loop 正常运行,避免主函数停止,导致 loop 函数停止

	// incomingBlock allows us to ensure we don't race and end up sending events
	// to a closed channel following a broadcaster shutdown.
	incomingBlock sync.Mutex										// 避免接收 event 时,event 过多导致的并发,因此需要锁进行控制
	incoming      chan Event										// 承接生成的 event,其他 watcher 会从此 channel 中读取 event 进行记录到 apiserver 或日志打印等
	stopped       chan struct{}									// 承接关闭广播器 Broadcaster 的停止信号

	// How large to make watcher's channel.
	watchQueueLength int
	// If one of the watch channels is full, don't wait for it to become empty.
	// Instead just deliver it to the watchers that do have space in their
	// channels and move on to the next event.
	// It's more fair to do this on a per-watcher basis than to do it on the
	// "incoming" channel, which would allow one slow watcher to prevent all
	// other watchers from getting new events.
	fullChannelBehavior FullChannelBehavior
}
1.2- EventBroadcaster 接口
// 路径 mod/k8s.io/client-go@v0.29.0/tools/record/event.go
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
	// StartEventWatcher starts sending events received from this EventBroadcaster to the given
	// event handler function. The return value can be ignored or used to stop recording, if
	// desired.
	StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface

	// StartRecordingToSink starts sending events received from this EventBroadcaster to the given
	// sink. The return value can be ignored or used to stop recording, if desired.
	StartRecordingToSink(sink EventSink) watch.Interface

	// StartLogging starts sending events received from this EventBroadcaster to the given logging
	// function. The return value can be ignored or used to stop recording, if desired.
	StartLogging(logf func(format string, args ...interface{})) watch.Interface

	// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured
	// logging function. The return value can be ignored or used to stop recording, if desired.
	StartStructuredLogging(verbosity klog.Level) watch.Interface

	// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
	// with the event source set to the given event source.
	NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorderLogger

	// Shutdown shuts down the broadcaster. Once the broadcaster is shut
	// down, it will only try to record an event in a sink once before
	// giving up on it with an error message.
	Shutdown()
}
1.3- NewRecorder 接口的实现
  • Recorder 封装了 Broadcaster
// 路径 mod/k8s.io/client-go@v0.29.0/tools/record/event.go

// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorderLogger {
	return &recorderImplLogger{recorderImpl: &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}, logger: klog.Background()}
}

type recorderImplLogger struct {
	*recorderImpl
	logger klog.Logger
}

type recorderImpl struct {
	scheme *runtime.Scheme
	source v1.EventSource
	*watch.Broadcaster
	clock clock.PassiveClock
}
1.3- loop(event的分发)
// // 路径 /mod/k8s.io/apimachinery@v0.29.0/pkg/watch/mux.go
// loop receives from m.incoming and distributes to all watchers.
func (m *Broadcaster) loop() {
	// Deliberately not catching crashes here. Yes, bring down the process if there's a
	// bug in watch.Broadcaster.
	for event := range m.incoming {
		if event.Type == internalRunFunctionMarker {
			event.Object.(functionFakeRuntimeObject)()
			continue
		}
		m.distribute(event)  // 将 event 分发给 watcher
	}
	m.closeAll()
	m.distributing.Done()
}

// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {
	if m.fullChannelBehavior == DropIfChannelFull {
		for _, w := range m.watchers {
			select {
			case w.result <- event: // 将 event 发送到 watcher 的 result channel,等待 watcher 进行处理
			case <-w.stopped:
			default: // Don't block if the event can't be queued.
			}
		}
	} else {
		for _, w := range m.watchers {
			select {
			case w.result <- event:
			case <-w.stopped:
			}
		}
	}
}
1.4 event 的产生
// 路径 mod/k8s.io/client-go@v0.29.0/tools/record/event.go

// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
	// Event constructs an event from the given information and puts it in the queue for sending.
	// 'object' is the object this event is about. Event will make a reference-- or you may also
	// pass a reference to the object directly.
	// 'eventtype' of this event, and can be one of Normal, Warning. New types could be added in future
	// 'reason' is the reason this event is generated. 'reason' should be short and unique; it
	// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
	// to automate handling of events, so imagine people writing switch statements to handle them.
	// You want to make that easy.
	// 'message' is intended to be human readable.
	//
	// The resulting event will be created in the same namespace as the reference object.
	Event(object runtime.Object, eventtype, reason, message string)

	// Eventf is just like Event, but with Sprintf for the message field.
	Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

	// AnnotatedEventf is just like eventf, but with annotations attached
	AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}

func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
	recorder.generateEvent(klog.Background(), object, nil, eventtype, reason, message)
}

func (recorder *recorderImpl) generateEvent(logger klog.Logger, object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
	ref, err := ref.GetReference(recorder.scheme, object)
	if err != nil {
		logger.Error(err, "Could not construct reference, will not report event", "object", object, "eventType", eventtype, "reason", reason, "message", message)
		return
	}

	if !util.ValidateEventType(eventtype) {
		logger.Error(nil, "Unsupported event type", "eventType", eventtype)
		return
	}

	event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
	event.Source = recorder.source

	event.ReportingInstance = recorder.source.Host
	event.ReportingController = recorder.source.Component

	// NOTE: events should be a non-blocking operation, but we also need to not
	// put this in a goroutine, otherwise we'll race to write to a closed channel
	// when we go to shut down this broadcaster.  Just drop events if we get overloaded,
	// and log an error if that happens (we've configured the broadcaster to drop
	// outgoing events anyway).
	sent, err := recorder.ActionOrDrop(watch.Added, event)
	if err != nil {
		logger.Error(err, "Unable to record event (will not retry!)")
		return
	}
	if !sent {
		logger.Error(nil, "Unable to record event: too many queued events, dropped event", "event", event)
	}
}

// Action distributes the given event among all watchers, or drops it on the floor
// if too many incoming actions are queued up.  Returns true if the action was sent,
// false if dropped.
func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) (bool, error) {
	m.incomingBlock.Lock()
	defer m.incomingBlock.Unlock()

	// Ensure that if the broadcaster is stopped we do not send events to it.
	select {
	case <-m.stopped:
		return false, fmt.Errorf("broadcaster already stopped")
	default:
	}

	select {
	case m.incoming <- Event{action, obj}:
		return true, nil
	default:
		return false, nil
	}
}
1.5 watcher 的处理
eventBroadcaster.StartLogging(klog.Infof)


// StartLogging starts sending events received from this EventBroadcaster to the given logging function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
	return e.StartEventWatcher(
		func(e *v1.Event) {  // 对应 下面 eventHandler
			logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
		})
}

// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
	watcher, err := e.Watch()
	if err != nil {
		klog.FromContext(e.cancelationCtx).Error(err, "Unable start event watcher (will not retry!)")
	}
	go func() {  // 直接运行了
		defer utilruntime.HandleCrash()
		for {
			select {
			case <-e.cancelationCtx.Done():
				watcher.Stop()
				return
			case watchEvent := <-watcher.ResultChan():  // 从 watcher result channel 中取出 event 
				event, ok := watchEvent.Object.(*v1.Event)
				if !ok {
					// This is all local, so there's no reason this should
					// ever happen.
					continue
				}
				eventHandler(event) // 对 event 进行处理 
			}
		}
	}()
	return watcher
}



附录1 | 示例详解

以下是一个完整的 EventRecorderEventBroadcaster 实例化的代码示例,展示如何在 Kubernetes 控制器中记录事件。代码包含详细注释,适合用于实际开发或学习:


代码示例

package main

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	metav1 "k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
)

// 主要逻辑的入口
func main() {
	// 1. 创建 Kubernetes 客户端
	// 这里我们使用 fake 客户端进行演示,生产环境应替换为真实的 `kubernetes.Clientset`
	clientset := fake.NewSimpleClientset()

	// 2. 创建事件广播器(EventBroadcaster)
	// 事件广播器是事件处理的核心,负责分发事件到日志和 API Server
	eventBroadcaster := record.NewBroadcaster()

	// 3. 启动日志记录功能
	// 通过 `klog.Infof` 输出事件到控制台或日志文件
	eventBroadcaster.StartLogging(klog.Infof)

	// 4. 启动事件的 API Server 记录功能
	// 配置事件接收器,将事件发送到 API Server,通过 `kubeClient.CoreV1().Events` 接口记录事件
	eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{
		Interface: clientset.CoreV1().Events(""),
	})

	// 5. 创建事件记录器(EventRecorder)
	// EventRecorder 用于开发者实际记录事件
	recorder := eventBroadcaster.NewRecorder(
		scheme(), // 提供资源的模式信息,常用的是 `runtime.NewScheme()` 或自定义的 scheme
		corev1.EventSource{Component: "example-controller"},
	)

	// 6. 模拟一个 Kubernetes 对象(如 Pod)的引用
	// 事件通常需要与具体的 Kubernetes 资源关联
	objRef := &corev1.ObjectReference{
		Kind:       "Pod",                  // 资源类型
		Namespace:  "default",              // 命名空间
		Name:       "example-pod",          // 资源名称
		UID:        "12345-abcde-67890",    // 唯一标识符
		APIVersion: "v1",                   // API 版本
	}

	// 7. 使用 EventRecorder 记录事件
	// 记录一个正常类型的事件(EventTypeNormal)
	recorder.Eventf(objRef, corev1.EventTypeNormal, "PodCreated", "Successfully created Pod %s", objRef.Name)

	// 模拟一个警告事件(EventTypeWarning)
	recorder.Eventf(objRef, corev1.EventTypeWarning, "PodFailed", "Failed to create Pod %s due to insufficient resources", objRef.Name)

	// 模拟一个控制器逻辑的操作
	processQueue(recorder, objRef)

	// 等待事件记录完成
	time.Sleep(2 * time.Second)
}

// 模拟处理队列的函数
func processQueue(recorder record.EventRecorder, objRef *corev1.ObjectReference) {
	// 创建一个工作队列
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// 将任务添加到队列
	queue.Add("task1")

	// 模拟处理队列中的任务
	for queue.Len() > 0 {
		item, _ := queue.Get()
		defer queue.Done(item)

		// 记录一个事件,表明任务已处理
		recorder.Eventf(objRef, corev1.EventTypeNormal, "TaskProcessed", "Successfully processed task: %v", item)
		fmt.Printf("Processed task: %v\n", item)
	}
}

// 创建 Scheme 用于事件记录器
// Scheme 是 Kubernetes 中资源的模式定义,用于确定资源类型和序列化方式
func scheme() *runtime.Scheme {
	s := runtime.NewScheme()

	// 添加 CoreV1 资源到 Scheme 中
	corev1.AddToScheme(s)
	metav1.AddToGroupVersion(s, schema.GroupVersion{Version: "v1"})
	return s
}

代码详解

1. 创建 EventBroadcaster
  • record.NewBroadcaster(): 创建事件广播器,用于记录和分发事件。
  • StartLogging(klog.Infof): 将事件以日志的形式输出。
  • StartRecordingToSink: 将事件发送到 Kubernetes API Server,存储为 Event 资源。
2. 创建 EventRecorder
  • NewRecorder(scheme, source)从广播器中创建事件记录器。
    • scheme: 用于验证和序列化资源。
    • source: 指定事件的来源(如 example-controller)。
3. 创建对象引用
  • ObjectReference: 用于标识事件关联的 Kubernetes 资源。包括类型、名称、命名空间、UID 等信息。
4. 记录事件
  • Eventf
    用于记录事件,包括:
    • 类型corev1.EventTypeNormal(正常)或 corev1.EventTypeWarning(警告)。
    • 原因:事件发生的原因(如 PodCreated)。
    • 消息:事件的详细描述。
5. 模拟队列任务
  • 使用 workqueue 模拟处理任务,记录任务完成时的事件。

运行结果

日志输出

事件将输出到日志(通过 klog):

I1119 12:34:56.123456   12345 example.go:52] Event(v1.ObjectReference{...}): type: 'Normal' reason: 'PodCreated' message: 'Successfully created Pod example-pod'
I1119 12:34:56.123457   12345 example.go:53] Event(v1.ObjectReference{...}): type: 'Warning' reason: 'PodFailed' message: 'Failed to create Pod example-pod due to insufficient resources'
Processed task: task1
事件存储

如果使用真实客户端,事件会存储在集群中,可通过 kubectl 查看:

kubectl get events
事件输出
LAST SEEN   TYPE      REASON         OBJECT            MESSAGE
5s          Normal    PodCreated     Pod/example-pod   Successfully created Pod example-pod
5s          Warning   PodFailed      Pod/example-pod   Failed to create Pod example-pod due to insufficient resources
5s          Normal    TaskProcessed  Pod/example-pod   Successfully processed task: task1

代码用途

  • 日志记录和事件管理: 帮助开发者跟踪控制器的运行状态和资源变更。
  • 任务队列处理: 将业务逻辑与事件机制结合,记录每个关键操作的状态。

以上代码展示了如何使用 EventRecorderEventBroadcaster 实现 Kubernetes 控制器中的事件管理,适合用于开发自定义控制器或调试集群事件处理逻辑。

附录2 | stoped 变量的作用

NewBroadcaster 函数中的 stopped 通道用于实现对 Broadcaster 对象的停止和关闭控制。具体来说,它的作用是在广播器的生命周期中进行信号传递,用于通知 Broadcaster 是否已经停止运行。

详细分析:

1. 通道的定义:
stopped: make(chan struct{}),

stopped 是一个无缓冲的通道,类型为 struct{}。无缓冲的通道用于信号传递,表示某个事件的发生,而不需要传递具体数据。这里的 struct{} 是一个空结构体,占用零内存,因此不会传递任何实际数据。

2. 停止广播器的作用:

stopped 通道用于在广播器停止时传递一个信号。通常这种信号用于通知相关的 goroutine 或者处理流程,广播器已经停止工作,可以做一些清理操作或者退出。

3. 与 go m.loop() 的配合:
go m.loop()

NewBroadcaster 中,启动了一个新的 goroutine 来执行 m.loop()。这个 loop 方法通常是处理传入事件并进行分发的核心逻辑。loop 方法可能会定期检查 stopped 通道,判断是否已经收到停止信号。

4. 典型的停止逻辑(假设)
func (m *Broadcaster) loop() {
    for {
        select {
        case event := <-m.incoming:
            // 处理事件逻辑
        case <-m.stopped:
            // 收到停止信号后,退出 goroutine
            return
        }
    }
}

在这个假设的 loop 实现中,select 语句等待从 m.incoming 通道接收事件,或者等待 stopped 通道的信号。当收到 stopped 通道的信号时,loop 方法退出,从而停止事件的分发。

5. 停止广播器的触发:

在实际代码中,某个外部操作可能会通过 close(m.stopped) 或者发送一个信号到 m.stopped 通道,来通知 Broadcaster 停止工作。比如在处理完所有事件或者发生错误时,可能会调用停止操作:

close(m.stopped)

或者

m.stopped <- struct{}{}

这样 loop 就会检测到停止信号并退出。


总结

stopped 通道在 Broadcaster 中的作用是提供一个停止信号,通知正在运行的 goroutine(如 loop 方法)停止执行。这种设计使得 Broadcaster 能够优雅地处理停止操作,确保所有 goroutine 都能够适时退出并清理资源。


http://www.kler.cn/a/419787.html

相关文章:

  • sshd[1905]: refused connect from 192.168.1.* (192.168.1.*),ssh 拒绝连接的问题解决
  • 家政小程序开发,打造便捷家政生活小程序
  • CTF-PWN glibc源码阅读[1]: 寻找libc中堆结构的定义(2.31-0ubuntu9.16)
  • SpringBoot3.4.0和OpenFeign4.1.4不兼容
  • 探索仓颉编程语言:官网上线,在线体验与版本下载全面启航
  • sql分类
  • 【ROS2】Ubuntu22.04安装ROS humble
  • 网络诊断指南:网络故障排查步骤与技巧
  • iOS——MVC、MVP、MVVM
  • leetcode——二分法
  • 4.22CACHE计算
  • 如何在centos7 安装vscode软件教程(图文教程)
  • Meta Reality Labs的VR/AR投资战略转向:内部视角与市场影响
  • mysql数据库varchar截断问题
  • C# 编程效率提升指南:掌握算数运算、循环与方法封装
  • 【054A】基于51单片机指南针(LCD1602显示)【Keil程序+报告+原理图】
  • python创建临时文件
  • 爬虫—Scrapy 整合 ChromeDriver 实现动态网页拉取
  • oracle数据库的启动与关闭
  • mongodb下载与使用
  • 分布式协同 - 分布式系统的特性与互斥问题
  • TI毫米波雷达(七)——high accurary示例分析(二)
  • 微信小程序——文档下载功能分享(含代码)
  • MySQL基础(语句)知识复习 (除索引和视图)
  • Hive项目实战:大数据处理与分析
  • 算法:上楼梯(递归)升级版