For Pods, the Kubelet watches for changes to the Pod spec from multiple data sources. For containers, the Kubelet periodically (e.g., every 10s) polls the container runtime to get the latest state of all containers.
As the number of Pods and containers grows, this polling introduces non-negligible overhead, which is further amplified by the Kubelet's parallelism (one goroutine per Pod is used to fetch container status). The periodic bursts of concurrent requests caused by polling lead to high CPU usage spikes (even when neither the Pod definitions nor the container states have changed) and degrade performance. In the worst case, the container runtime can be overwhelmed, which reduces the system's reliability and limits the Kubelet's scalability.
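To make the old model concrete, here is a minimal sketch of that per-Pod polling pattern. This is illustrative code only, not Kubelet source; pollAllPods and getPodStatus are made-up names standing in for the Kubelet's pod workers and the runtime call:

```go
package main

import (
	"fmt"
	"time"
)

// getPodStatus stands in for a remote call to the container runtime;
// in the real Kubelet this would be a CRI request.
func getPodStatus(podUID string) string {
	return "Running"
}

// pollAllPods sketches the pre-PLEG model: one goroutine per Pod, each
// waking up every period and querying the runtime even when nothing has
// changed. With N Pods this produces N concurrent runtime requests per
// period, which is the source of the periodic CPU spikes described above.
func pollAllPods(podUIDs []string, period time.Duration) {
	for _, uid := range podUIDs {
		go func(uid string) {
			for range time.Tick(period) {
				fmt.Printf("pod %s: %s\n", uid, getPodStatus(uid))
			}
		}(uid)
	}
}

func main() {
	pollAllPods([]string{"pod-a", "pod-b"}, 10*time.Second)
	select {} // keep the pollers running
}
```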
To reduce the overhead of managing Pods and to improve the Kubelet's performance and scalability, the PLEG (Pod Lifecycle Event Generator) was introduced to improve on the previous approach. Its Healthy() function, shown below, checks whether relist (the PLEG's key task, explained later) has completed within the last three minutes; it is registered with the runtimeState as the "PLEG" health check and is evaluated periodically from the Kubelet's main sync loop:
```go
// The threshold needs to be greater than the relisting period + the
// relisting time, which can vary significantly. Set a conservative
// threshold to avoid flipping between healthy and unhealthy.
relistThreshold = 3 * time.Minute
:
func (g *GenericPLEG) Healthy() (bool, error) {
	relistTime := g.getRelistTime()
	elapsed := g.clock.Since(relistTime)
	if elapsed > relistThreshold {
		return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v", elapsed, relistThreshold)
	}
	return true, nil
}
```
```go
//// pkg/kubelet/kubelet.go - syncLoop()
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
:
	// The resyncTicker wakes up kubelet to checks if there are any pod workers
	// that need to be sync'd. A one-second period is sufficient because the
	// sync interval is defaulted to 10s.
:
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base
	for {
		if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 {
			glog.Infof("skipping pod synchronization - %v", rs)
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
:
	}
:
}
```
```go
//// pkg/kubelet/runtime.go - runtimeErrors()
func (s *runtimeState) runtimeErrors() []string {
:
	for _, hc := range s.healthChecks {
		if ok, err := hc.fn(); !ok {
			ret = append(ret, fmt.Sprintf("%s is not healthy: %v", hc.name, err))
		}
	}
:
}
```
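For completeness, Healthy() ends up in that healthChecks slice because the Kubelet registers it when the PLEG is constructed. The wiring is essentially the following (paraphrased from pkg/kubelet/kubelet.go and pkg/kubelet/runtime.go; details may vary by version):

```go
//// pkg/kubelet/kubelet.go - NewMainKubelet() (paraphrased)
klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)

//// pkg/kubelet/runtime.go - addHealthCheck() (paraphrased)
func (s *runtimeState) addHealthCheck(name string, f healthCheckFnType) {
	s.Lock()
	defer s.Unlock()
	s.healthChecks = append(s.healthChecks, &healthCheck{name: name, fn: f})
}
```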
The relist function
As mentioned above, the Healthy() function checks how long ago relist last completed, but what exactly does relist do? Before explaining relist, we first need to explain Pod lifecycle events. A Pod lifecycle event is a pod-level abstraction over state changes of the underlying containers; it is independent of the underlying container runtime, which shields the Kubelet from runtime-specific details.
```go
// PodLifecycleEvent is an event reflects the change of the pod state.
type PodLifecycleEvent struct {
	// The pod ID.
	ID types.UID
	// The type of the event.
	Type PodLifeCycleEventType
	// The accompanied data which varies based on the event type.
	Data interface{}
}
```
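For context, the concrete event types that can appear in the Type field are defined next to this struct. The list below is paraphrased from pkg/kubelet/pleg/pleg.go and the exact set varies between releases; runtime-specific events such as the NetworkSetupCompleted event mentioned next are not part of this generic list:

```go
const (
	// ContainerStarted - the container has transitioned to the running state.
	ContainerStarted PodLifeCycleEventType = "ContainerStarted"
	// ContainerDied - the container has transitioned to the exited state.
	ContainerDied PodLifeCycleEventType = "ContainerDied"
	// ContainerRemoved - an exited container has been removed from the runtime.
	ContainerRemoved PodLifeCycleEventType = "ContainerRemoved"
	// PodSync is used to trigger a sync of a pod when the observed change
	// cannot be captured by any single event above.
	PodSync PodLifeCycleEventType = "PodSync"
	// ContainerChanged - the container has transitioned to an unknown state.
	ContainerChanged PodLifeCycleEventType = "ContainerChanged"
)
```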
Taking Docker as an example, starting the infra (pause) container of a Pod registers a NetworkSetupCompleted pod lifecycle event in the Kubelet. These pod-level events are generated by the relist process, which the GenericPLEG runs once per second (plegRelistPeriod), as configured in the Kubelet:
```go
// Generic PLEG relies on relisting for discovering container events.
// A longer period means that kubelet will take longer to detect container
// changes and to update pod status. On the other hand, a shorter period
// will cause more frequent relisting (e.g., container runtime operations),
// leading to higher cpu usage.
// Note that even though we set the period to 1s, the relisting itself can
// take more than 1s to finish if the container runtime responds slowly
// and/or when there are many container changes in one cycle.
plegRelistPeriod = time.Second * 1
```
```go
// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ...
:
	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
```
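The channel capacity passed in above bounds the buffered event channel between the PLEG and the sync loop. It is defined in the same file next to plegRelistPeriod; the value below is paraphrased from pkg/kubelet/kubelet.go and may differ in your version:

```go
// Capacity of the channel for receiving pod lifecycle events. This number
// is a bit arbitrary and may be adjusted in the future.
plegChannelCapacity = 1000
```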
```go
//// pkg/kubelet/pleg/generic.go - Start()

// Start spawns a goroutine to relist periodically.
func (g *GenericPLEG) Start() {
	go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}
```
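Start() itself is invoked from Kubelet.Run() just before the main sync loop is entered; roughly (paraphrased from pkg/kubelet/kubelet.go):

```go
//// pkg/kubelet/kubelet.go - Run() (paraphrased)
// Start the pod lifecycle event generator, then enter the main sync loop.
kl.pleg.Start()
kl.syncLoop(updates, kl)
```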
Going back to the figure above, the first thing relist does is record the relevant Kubelet metrics (for example, kubelet_pleg_relist_latency_microseconds); it then fetches the current list of Pods (including stopped Pods) from the container runtime through the CRI. This Pod list is compared with the previous one to find out which states have changed, and the corresponding pod lifecycle events and the updated states are generated at the same time.
```go
//// pkg/kubelet/pleg/generic.go - relist()
:
	// get a current timestamp
	timestamp := g.clock.Now()

	// kubelet_pleg_relist_latency_microseconds for prometheus metrics
	defer func() {
		metrics.PLEGRelistLatency.Observe(metrics.SinceInMicroseconds(timestamp))
	}()

	// Get all the pods.
	podList, err := g.runtime.GetPods(true)
:
```
```go
// GetPods returns a list of containers grouped by pods. The boolean parameter
// specifies whether the runtime returns all containers including those already
// exited and dead containers (used for garbage collection).
func (m *kubeGenericRuntimeManager) GetPods(all bool) ([]*kubecontainer.Pod, error) {
	pods := make(map[kubetypes.UID]*kubecontainer.Pod)
	sandboxes, err := m.getKubeletSandboxes(all)
:
}
```
```go
	// Compare the old and the current pods, and generate events.
	eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
	for pid := range g.podRecords {
		oldPod := g.podRecords.getOld(pid)
		pod := g.podRecords.getCurrent(pid)

		// Get all containers in the old and the new pod.
		allContainers := getContainersFromPods(oldPod, pod)
		for _, container := range allContainers {
			events := computeEvents(oldPod, pod, &container.ID)

			for _, e := range events {
				updateEvents(eventsByPodID, e)
			}
		}
	}
```
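computeEvents() ultimately compares each container's old and new state and translates the transition into pod-level events. The mapping lives in generateEvents() in the same file; the version below is a condensed paraphrase and details vary between releases:

```go
// Condensed paraphrase of generateEvents() in pkg/kubelet/pleg/generic.go.
func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
	if newState == oldState {
		return nil
	}
	switch newState {
	case plegContainerRunning:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
	case plegContainerExited:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}}
	case plegContainerUnknown:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}}
	case plegContainerNonExistent:
		if oldState == plegContainerExited {
			// The container's death was already reported; only report removal.
			return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}}
		}
		return []*PodLifecycleEvent{
			{ID: podID, Type: ContainerDied, Data: cid},
			{ID: podID, Type: ContainerRemoved, Data: cid},
		}
	}
	return nil
}
```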
In addition, remote calls are made for each Pod to retrieve the Pod's spec definition; as a result, the more Pods there are, the higher the latency can be, because more Pods also generate more events.
```go
//// pkg/kubelet/pleg/generic.go - relist()

	// If there are events associated with a pod, we should update the
	// podCache.
	for pid, events := range eventsByPodID {
		pod := g.podRecords.getCurrent(pid)
		if g.cacheEnabled() {
			// updateCache() will inspect the pod and update the cache. If an
			// error occurs during the inspection, we want PLEG to retry again
			// in the next relist. To achieve this, we do not update the
			// associated podRecord of the pod, so that the change will be
			// detect again in the next relist.
			// TODO: If many pods changed during the same relist period,
			// inspecting the pod and getting the PodStatus to update the cache
			// serially may take a while. We should be aware of this and
			// parallelize if needed.
			if err := g.updateCache(pod, pid); err != nil {
				glog.Errorf("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err)
:
			}
:
		}
		// Update the internal storage and send out the events.
		g.podRecords.update(pid)
		for i := range events {
			// Filter out events that are not reliable and no other components use yet.
			if events[i].Type == ContainerChanged {
				continue
			}
			g.eventChannel <- events[i]
		}
	}
```
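The events pushed onto g.eventChannel are consumed by the Kubelet's main sync loop: syncLoopIteration selects on the channel returned by pleg.Watch() and triggers a sync of the affected Pod. Roughly (paraphrased from pkg/kubelet/kubelet.go; details differ between versions):

```go
//// pkg/kubelet/kubelet.go - syncLoopIteration() (paraphrased excerpt)
// plegCh is the channel returned by kl.pleg.Watch(); other select cases
// (config updates, sync/housekeeping tickers, liveness events) are omitted.
select {
case e := <-plegCh:
	if isSyncPodWorthy(e) {
		// PLEG event for a pod; sync it.
		if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
			handler.HandlePodSyncs([]*v1.Pod{pod})
		}
	}
	if e.Type == pleg.ContainerDied {
		if containerID, ok := e.Data.(string); ok {
			kl.cleanUpContainersInPod(e.ID, containerID)
		}
	}
}
```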
The detailed call stack of updateCache() is shown in the figure below, where GetPodStatus() is used to retrieve the Pod's status information:
The relevant code is as follows:
```go
//// pkg/kubelet/pleg/generic.go - updateCache()

func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
:
	timestamp := g.clock.Now()
	// TODO: Consider adding a new runtime method
	// GetPodStatus(pod *kubecontainer.Pod) so that Docker can avoid listing
	// all containers again.
	status, err := g.runtime.GetPodStatus(pod.ID, pod.Name, pod.Namespace)
:
	g.cache.Set(pod.ID, status, err, timestamp)
	return err
}
```
```go
// GetPodStatus retrieves the status of the pod, including the
// information of all containers in the pod that are visible in Runtime.
func (m *kubeGenericRuntimeManager) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
	podSandboxIDs, err := m.getSandboxIDByPodUID(uid, nil)
:
	for idx, podSandboxID := range podSandboxIDs {
		podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
:
	}

	// Get statuses of all containers visible in the pod.
	containerStatuses, err := m.getPodContainerStatuses(uid, name, namespace)
:
}
```
```go
// getPodSandboxID gets the sandbox id by podUID and returns ([]sandboxID, error).
// Param state could be nil in order to get all sandboxes belonging to same pod.
func (m *kubeGenericRuntimeManager) getSandboxIDByPodUID(podUID kubetypes.UID, state *runtimeapi.PodSandboxState) ([]string, error) {
:
	sandboxes, err := m.runtimeService.ListPodSandbox(filter)
:
	return sandboxIDs, nil
}
```
```go
// getPodContainerStatuses gets all containers' statuses for the pod.
func (m *kubeGenericRuntimeManager) getPodContainerStatuses(uid kubetypes.UID, name, namespace string) ([]*kubecontainer.ContainerStatus, error) {
	// Select all containers of the given pod.
	containers, err := m.runtimeService.ListContainers(&runtimeapi.ContainerFilter{
		LabelSelector: map[string]string{types.KubernetesPodUIDLabel: string(uid)},
	})
:
	// TODO: optimization: set maximum number of containers per container name to examine.
	for i, c := range containers {
		status, err := m.runtimeService.ContainerStatus(c.Id)
:
	}
:
	return statuses, nil
}
```
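Putting these pieces together gives a feel for the cost of a single relist. Under the illustrative assumption of one sandbox and two containers per Pod, GetPods() costs roughly one ListPodSandbox plus one ListContainers call, and updateCache() adds, for every Pod that changed in that cycle, about one ListPodSandbox, one PodSandboxStatus, one ListContainers, and two ContainerStatus calls. With 10 changed Pods in a one-second cycle, that is already on the order of 2 + 10 × 5 = 52 CRI requests. The resulting PodStatus is then consumed by the pod workers (which block on the cache, e.g. via its GetNewerThan method), so a slow runtime or a burst of container changes not only delays Pod syncs but can also push relist past the 3-minute threshold checked by Healthy().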