【kubeedge源码分析】edgecore源码分析之二edged模块

【kubeedge源码分析】edgecore源码分析之二edged模块描述 Edged 一个运行在 edge 节点的 agent 程序 管理边缘的容器化应用程序 目前支持 docker 运行时 将来会支持更多的运行时 比如 containerd 等 Fig 1 EdgeD Functionalit 1 Register 函数 根据

大家好,我是讯享网,很高兴认识大家。

描述

  • Edged:一个运行在 edge 节点的 agent 程序,管理边缘的容器化应用程序,目前支持 docker 运行时,将来会支持更多的运行时,比如 containerd等


讯享网

                                               Fig 1: EdgeD Functionalities

 

1. Register 函数

    根据 registerModules 中模块注册,将调用 edged 模块,路径 kubeedge/edge/pkg/edged/edged.go,注册 edged 模块

// Register register edged func Register() { edged, err := newEdged() if err != nil { klog.Errorf("init new edged error, %v", err) return } core.Register(edged) }

讯享网

   1.1 newEged 函数

    newEged 实例化 eged 对象,并进行初始化,getConfig 函数主要是初始化 kubelet 启动参数,包括 node 名字,namespace imageGC docker endpoint CRI 接口类型,dns,内存等等

    包括 pod manager 初始化,imageGC 出发策略等,实例化 eged 并初始化 kubelet 启动参数,根目录默认为 /var/lib/edged

讯享网//newEdged creates new edged object and initialises it func newEdged() (*edged, error) { conf := getConfig() backoff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff) podManager := podmanager.NewPodManager() policy := images.ImageGCPolicy{ HighThresholdPercent: conf.imageGCHighThreshold, LowThresholdPercent: conf.imageGCLowThreshold, MinAge: minAge, } // build new object to match interface recorder := record.NewEventRecorder()

     1.1.1 如果未设置运行时 endpoint,则设置默认未 unix:///var/run/dockershim.sock

if conf.remoteRuntimeEndpoint != "" { // remoteImageEndpoint is same as remoteRuntimeEndpoint if not explicitly specified if conf.remoteImageEndpoint == "" { conf.remoteImageEndpoint = conf.remoteRuntimeEndpoint } }

     1.1.2 如果运行时 endpoint 为 docker 的情况

     目前支持的 runtime dockershim,初始化 docker 服务,启动 dockershim GRPC 服务,并注册 runtime 和 image服务

讯享网//create and start the docker shim running as a grpc server if conf.remoteRuntimeEndpoint == DockerShimEndpoint || conf.remoteRuntimeEndpoint == DockerShimEndpointDeprecated { streamingConfig := &streaming.Config{} ...... if err := server.Start(); err != nil { return nil, err } }

     1.1.3 使用 socket 建立 GRPC 连接 runtime 和 image 服务

containerRefManager := kubecontainer.NewRefManager() httpClient := &http.Client{} runtimeService, imageService, err := getRuntimeAndImageServices(conf.remoteRuntimeEndpoint, conf.remoteRuntimeEndpoint, conf.RuntimeRequestTimeout) if err != nil { return nil, err } if ed.os == nil { ed.os = kubecontainer.RealOS{} }

     1.1.4 容器运行时 manager 实例化

      复用了 kubelet 的 NewKubeGerericRuntimeManager 函数,实现在 kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go

讯享网var machineInfo cadvisorapi.MachineInfo machineInfo.MemoryCapacity = uint64(conf.memoryCapacity) containerRuntime, err := kuberuntime.NewKubeGenericRuntimeManager( recorder, ed.livenessManager, "", containerRefManager, &machineInfo, ed, ed.os, ed, httpClient, backoff, false, 0, 0, false, metav1.Duration{Duration: 100 * time.Millisecond}, runtimeService, imageService, ed.clcm.InternalContainerLifecycle(), nil, nil, )

     1.1.5 实例化 cadvisor manager

      复用了 kubelet 使用 cadvisor 模块,实现在 kubernetes/pkg/kubelet/cm/container_manager_linux.go

cadvisorInterface, err := cadvisor.New("") containerManager, err := cm.NewContainerManager(mount.New(""), cadvisorInterface, cm.NodeConfig{ CgroupDriver: conf.cgroupDriver, SystemCgroupsName: conf.cgroupDriver, KubeletCgroupsName: conf.cgroupDriver, ContainerRuntime: conf.runtimeType, KubeletRootDir: DefaultRootDir, }, false, conf.devicePluginEnabled, recorder)

     1.1.6 实例化 imageGC containerGC manager

     同样使用了 kubelet 中该代码,不再说明,可以参看 imageGC containerGC 文章

     https://blog.csdn.net/zhonglinzhang/article/details/

     https://blog.csdn.net/zhonglinzhang/article/details/

讯享网imageGCManager, err := images.NewImageGCManager(ed.containerRuntime, statsProvider, recorder, nodeRef, policy, conf.PodSandboxImage) if err != nil { return nil, fmt.Errorf("failed to initialize image manager: %v", err) } ed.imageGCManager = imageGCManager containerGCManager, err := kubecontainer.NewContainerGC(containerRuntime, containerGCPolicy, &containers.KubeSourcesReady{}) if err != nil { return nil, fmt.Errorf("init Container GC Manager failed with error %s", err.Error()) }

     1.1.7 实例化 volume plugin manager

     包裹了 kubernetes/pkg/volume/plugins.go,初始化 plugin

      可以参考:https://blog.csdn.net/zhonglinzhang/article/details/

// NewInitializedVolumePluginMgr returns a new instance of volume.VolumePluginMgr func NewInitializedVolumePluginMgr( edge *edged, plugins []volume.VolumePlugin) (*volume.VolumePluginMgr, error) { evh := &edgedVolumeHost{ edge: edge, volumePluginMgr: volume.VolumePluginMgr{}, } if err := evh.volumePluginMgr.InitPlugins(plugins, nil, evh); err != nil { return nil, fmt.Errorf( "Could not initialize volume plugins for KubeletVolumePluginMgr: %v", err) } return &evh.volumePluginMgr, nil }

   1.2 注册 edged 模块加入到全局变量 modules map中

讯享网// Register register module func Register(m Module) { if isModuleEnabled(m.Name()) { modules[m.Name()] = m klog.Infof("Module %v registered", m.Name()) } else { disabledModules[m.Name()] = m klog.Warningf("Module %v is not register, please check modules.yaml", m.Name()) } }

 

edged 同样实现了  Module 接口 

// Module interface type Module interface { Name() string Group() string Start() Cleanup() }

 

2. edged 的 name 设置为 edged,group 设置为 edged

讯享网func (e *edged) Name() string { return modules.EdgedModuleName } func (e *edged) Group() string { return modules.EdgedGroup }

 

   启动模块分别调用不同的 Start() 方法

3. edged 的 Start 函数

func (e *edged) Start() { e.metaClient = client.New() var ctx context.Context ctx, e.cancel = context.WithCancel(context.Background()) // use self defined client to replace fake kube client e.kubeClient = fakekube.NewSimpleClientset(e.metaClient)

   3.1 NewManager 函数包裹了 kubelet 的 status manager

    参考 status manager:https://blog.csdn.net/zhonglinzhang/article/details/

讯享网//NewManager creates and returns a new manager object func NewManager(kubeClient clientset.Interface, podManager podmanager.Manager, podDeletionSafety status.PodDeletionSafetyProvider, metaClient client.CoreInterface) status.Manager { kubeManager := status.NewManager(kubeClient, podManager, podDeletionSafety) return &manager{ Manager: kubeManager, metaClient: metaClient, podManager: podManager, apiStatusVersions: make(map[types.UID]*v1.PodStatus), } }

   3.2 initializeModules 函数调用了 kubelet 中 container manager 模块启动服务

func (e *edged) initializeModules() error { node, _ := e.initialNode() if err := e.containerManager.Start(node, e.GetActivePods, edgedutil.NewSourcesReady(), e.statusManager, e.runtimeService); err != nil { klog.Errorf("Failed to start device plugin manager %v", err) return err } return nil }

   3.3 volume manager 

    不用看了,使用了 kubernetes 的 volume manager 并启动,还有下面的 probe manager 

讯享网e.volumeManager = volumemanager.NewVolumeManager( true, types.NodeName(e.nodeName), e.podManager, e.statusManager, e.kubeClient, e.volumePluginMgr, e.containerRuntime, e.mounter, e.getPodsDir(), record.NewEventRecorder(), false, false, ) go e.volumeManager.Run(edgedutil.NewSourcesReady(), utilwait.NeverStop)

   3.4 podAddWorkerRun

     并发 5 个 worker 处理新增 pod 的请求,调用 consumePodAddition 函数创建 pod,创建 pod 的工作目录

func (e *edged) consumePodAddition(namespacedName *types.NamespacedName) error { podName := namespacedName.Name klog.Infof("start to consume added pod [%s]", podName) pod, ok := e.podManager.GetPodByName(namespacedName.Namespace, podName) if !ok || pod.DeletionTimestamp != nil { return apis.ErrPodNotFound } if err := e.makePodDataDirs(pod); err != nil { klog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err) return err }

 

总结:

     edged 模块的 Register 函数 newEdged 初始化如 kubelet 启动的参数     

 

参考:

     https://docs.kubeedge.io/en/latest/modules/edge/edged.html

小讯
上一篇 2025-04-02 23:01
下一篇 2025-03-30 15:29

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/50913.html