程序印象

Kubelet - Pod 创建之 CRI 和 CNI 源码剖析

2019/12/02 Share

本文源码跟踪基于 Kubernetes v1.12.6。

CRI

创建 Pod 入口

k8s.io/kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// Note: kubeGenericRuntimeManager.runtimeService is initialized as
// newInstrumentedRuntimeService(runtimeService), so the CRI calls made below
// go through that wrapper (presumably adding metrics/instrumentation — TODO confirm).
//
// NOTE(review): this is an abridged excerpt. `// ...` marks elided upstream
// code; podContainerChanges, msg and err are produced by elided parts.

// SyncPod syncs the running pod into the desired pod by executing following steps:
//
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create init containers.
// 6. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
// Step 1: Compute sandbox and container changes.
// (Elided in this excerpt; it produces podContainerChanges used below.)

// Step 2: Kill the pod if the sandbox has changed.
if podContainerChanges.KillPod {
// ...
} else {
// Step 3: kill any running containers in this pod which are not to keep.
// ...
}

// Keep terminated init containers fairly aggressively controlled
// This is an optimization because container removals are typically handled
// by container garbage collector.
m.pruneInitContainersBeforeStart(pod, podStatus)

// We pass the value of the podIP down to generatePodSandboxConfig and
// generateContainerConfig, which in turn passes it to various other
// functions, in order to facilitate functionality that requires this
// value (hosts file and downward API) and avoid races determining
// the pod IP in cases where a container requires restart but the
// podIP isn't in the status manager yet.
//
// We default to the IP in the passed-in pod status, and overwrite it if the
// sandbox needs to be (re)started.
podIP := ""
if podStatus != nil {
podIP = podStatus.IP
}

// Step 4: Create a sandbox for the pod if necessary.
// The sandbox is the pod's infra container, created from the pause image.
podSandboxID := podContainerChanges.SandboxID
if podContainerChanges.CreateSandbox {
// ...
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
// Internally this calls m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler).

glog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))

podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
}

// Get podSandboxConfig for containers to start.
podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)

// Step 5: start the init container.
// Only the next pending init container (NextInitContainerToStart) is started
// per SyncPod call, not all init containers at once.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
// Start the next init container.
glog.V(4).Infof("Creating init container %+v in pod %v", container, format.Pod(pod))
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit); err != nil {
// ...
}

// The init container was started successfully.
glog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
}

// Step 6: start containers in podContainerChanges.ContainersToStart
// (indices into pod.Spec.Containers), i.e. the regular app containers.
for _, idx := range podContainerChanges.ContainersToStart {
container := &pod.Spec.Containers[idx]
// ...

glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular); err != nil {
// ...
}
}

return
}

CRI 接口调用和实现

CRI 相关文档: Introducing Container Runtime Interface (CRI) in Kubernetes

k8s.io/kubernetes/pkg/kubelet/dockershim/docker_sandbox.go

dockerService 类实现了 CRI 接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// dockerService is dockershim's implementation of the CRI on top of the
// Docker daemon, which it talks to through client.
type dockerService struct {
client libdocker.Interface
os kubecontainer.OSInterface
// Image used for sandbox (infra) containers; presumably empty means
// "use the built-in default" — TODO confirm.
podSandboxImage string
streamingRuntime *streamingRuntime
streamingServer streaming.Server

// Network plugin manager; RunPodSandbox calls ds.network.SetUpPod on it to
// configure pod networking.
network *network.PluginManager
// Map of podSandboxID :: network-is-ready
networkReady map[string]bool
networkReadyLock sync.Mutex

containerManager cm.ContainerManager
// cgroup driver used by Docker runtime.
cgroupDriver string
checkpointManager checkpointmanager.CheckpointManager
// caches the version of the runtime.
// To be compatible with multiple docker versions, we need to perform
// version checking for some operations. Use this cache to avoid querying
// the docker daemon every time we need to do such checks.
versionCache *cache.ObjectCache
// startLocalStreamingServer indicates whether dockershim should start a
// streaming server on localhost.
startLocalStreamingServer bool
}

RuntimeService 接口定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
// RuntimeService interface should be implemented by a container runtime.
// The methods should be thread-safe.
//
// It embeds RuntimeVersioner, ContainerManager, PodSandboxManager and
// ContainerStatsManager, and adds two runtime-wide calls.
type RuntimeService interface {
RuntimeVersioner
ContainerManager
PodSandboxManager
ContainerStatsManager

// UpdateRuntimeConfig updates runtime configuration if specified
UpdateRuntimeConfig(runtimeConfig *runtimeapi.RuntimeConfig) error
// Status returns the status of the runtime.
Status() (*runtimeapi.RuntimeStatus, error)
}

PodSandboxManager 接口定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// PodSandboxManager contains methods for operating on PodSandboxes. The methods
// are thread-safe.
//
// This is the kubelet-side (client) signature set. dockershim's dockerService
// implements the request/response server form instead, e.g.
// RunPodSandbox(ctx, *runtimeapi.RunPodSandboxRequest).
type PodSandboxManager interface {
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error)
// StopPodSandbox stops the sandbox. If there are any running containers in the
// sandbox, they should be force terminated.
StopPodSandbox(podSandboxID string) error
// RemovePodSandbox removes the sandbox. If there are running containers in the
// sandbox, they should be forcibly removed.
RemovePodSandbox(podSandboxID string) error
// PodSandboxStatus returns the Status of the PodSandbox.
PodSandboxStatus(podSandboxID string) (*runtimeapi.PodSandboxStatus, error)
// ListPodSandbox returns a list of Sandbox.
ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error)
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
PortForward(*runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error)
}

以 dockerService 对 RunPodSandbox 接口的实现为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
// For docker, PodSandbox is implemented by a container holding the network
// namespace for the pod.
// Note: docker doesn't use LogDirectory (yet).
//
// The steps are:
//  1. make sure the sandbox image is present;
//  2. create the sandbox container;
//  3. write a sandbox checkpoint;
//  4. start the container;
//  5. set up pod networking through ds.network, unless the pod uses the host
//     network namespace.
//
// Any failure aborts the call and returns that error. A network-setup failure
// is returned together with the response carrying the sandbox ID, so the
// caller can still reference the created sandbox. The sandbox is marked
// network-ready only after networking succeeded, or was skipped for a
// host-network pod.
func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
	config := r.GetConfig()

	// Step 1: Pull the image for the sandbox.
	// A configured sandbox image takes precedence over the built-in default.
	image := defaultSandboxImage
	if ds.podSandboxImage != "" {
		image = ds.podSandboxImage
	}
	if err := ensureSandboxImageExists(ds.client, image); err != nil {
		return nil, err
	}

	// Step 2: Create the sandbox container.
	createConfig, err := ds.makeSandboxDockerConfig(config, image)
	if err != nil {
		return nil, err
	}
	createResp, err := ds.client.CreateContainer(*createConfig)
	if err != nil {
		return nil, err
	}

	resp := &runtimeapi.RunPodSandboxResponse{PodSandboxId: createResp.ID}

	// The sandbox is not network-ready until Step 5 finishes, or is skipped.
	ds.setNetworkReady(createResp.ID, false)

	// Step 3: Create Sandbox Checkpoint.
	if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
		return nil, err
	}

	// Step 4: Start the sandbox container.
	// Assume kubelet's garbage collector would remove the sandbox later, if
	// startContainer failed.
	if err = ds.client.StartContainer(createResp.ID); err != nil {
		return nil, err
	}

	// (Setting up the sandbox's DNS config is elided from this excerpt.)

	// Do not invoke network plugins if in hostNetwork mode.
	if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE {
		ds.setNetworkReady(createResp.ID, true)
		return resp, nil
	}

	// Step 5: Setup networking for the sandbox.
	// All pod networking is setup by a CNI plugin discovered at startup time.
	// This plugin assigns the pod ip, sets up routes inside the sandbox,
	// creates interfaces etc. In theory, its jurisdiction ends with pod
	// sandbox networking, but it might insert iptables rules or open ports
	// on the host as well, to satisfy parts of the pod spec that aren't
	// recognized by the CNI standard yet.
	cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)
	if err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations); err != nil {
		// NOTE(review): partially configured networking is not torn down and
		// the sandbox container is left running here; consider cleaning up
		// before returning.
		return resp, err
	}

	ds.setNetworkReady(createResp.ID, true)
	return resp, nil
}

CNI

CNI 接口调用和实现

在 dockershim 模式下,cniNetworkPlugin 实现了 kubelet 定义的 NetworkPlugin 接口(内部通过 libcni 调用 CNI 插件),其 SetUpPod 方法定义如下:

k8s.io/kubernetes/pkg/kubelet/dockershim/network/cni/cni.go

1
2
3
4
5
6
7
8
9
10
11
12
13
// SetUpPod attaches the pod sandbox identified by id to its CNI networks.
//
// It resolves the sandbox's network namespace path, then adds the sandbox to
// the loopback network (when one is configured) and finally to the default
// network. It returns the first error encountered. An unset default network
// (for example, no valid CNI config has been loaded yet) is reported as an
// error instead of being passed on as nil.
func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error {
	netnsPath, err := plugin.host.GetNetNS(id.ID)
	if err != nil {
		return fmt.Errorf("CNI failed to retrieve network namespace path: %v", err)
	}

	// Windows doesn't have loNetwork. It comes only with Linux
	if plugin.loNetwork != nil {
		if _, err := plugin.addToNetwork(plugin.loNetwork, name, namespace, id, netnsPath, annotations); err != nil {
			return fmt.Errorf("error adding pod %s/%s to CNI loopback network: %v", namespace, name, err)
		}
	}

	defaultNetwork := plugin.getDefaultNetwork()
	if defaultNetwork == nil {
		return fmt.Errorf("CNI default network is not initialized; cannot set up pod %s/%s", namespace, name)
	}
	if _, err := plugin.addToNetwork(defaultNetwork, name, namespace, id, netnsPath, annotations); err != nil {
		return fmt.Errorf("error adding pod %s/%s to CNI network: %v", namespace, name, err)
	}

	return nil
}

dockerService 的 network 字段在函数 NewDockerService 中初始化:

k8s.io/kubernetes/pkg/kubelet/dockershim/docker_service.go

1
2
3
4
5
6
7
8
9
10
11
12
// NewDockerService (abridged: parameters and most of the body are elided)
// is shown here only for how dockershim wires up its network plugin.
func NewDockerService(...){
// dockershim currently only supports CNI plugins.
// Probe the CNI plugin using the supplied CNI config directory and binary
// directories; kubenet is appended as a second candidate for selection below.
pluginSettings.PluginBinDirs = cni.SplitDirs(pluginSettings.PluginBinDirString)
cniPlugins := cni.ProbeNetworkPlugins(pluginSettings.PluginConfDir, pluginSettings.PluginBinDirs)
cniPlugins = append(cniPlugins, kubenet.NewPlugin(pluginSettings.PluginBinDirs))

// cniPlugins holds every candidate plugin; InitNetworkPlugin presumably
// selects and initializes the one named pluginSettings.PluginName — TODO confirm.
// NOTE(review): err is not checked in this excerpt.
plug, err := network.InitNetworkPlugin(cniPlugins, pluginSettings.PluginName, netHost, pluginSettings.HairpinMode, pluginSettings.NonMasqueradeCIDR, pluginSettings.MTU)

// Wrap the chosen plugin; RunPodSandbox later uses ds.network.SetUpPod.
ds.network = network.NewPluginManager(plug)
}

CNI 接口定义

在 dockershim 中由 cniNetworkPlugin 实现网络插件接口,其结构体定义如下:

k8s.io/kubernetes/pkg/kubelet/dockershim/network/cni/cni.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// cniNetworkPlugin is dockershim's CNI-backed network plugin. Methods it does
// not define itself are provided by the embedded network.NoopNetworkPlugin.
type cniNetworkPlugin struct {
	network.NoopNetworkPlugin

	// loNetwork configures the pod's loopback interface (built by
	// getLoNetwork). It is nil on platforms without one, e.g. Windows.
	loNetwork *cniNetwork

	// defaultNetwork is the network loaded from confDir; it starts out nil.
	// It is declared right after the embedded RWMutex, presumably because that
	// mutex guards it — TODO confirm.
	sync.RWMutex
	defaultNetwork *cniNetwork

	host        network.Host
	execer      utilexec.Interface
	nsenterPath string
	confDir     string
	binDirs     []string
	podCidr     string
}

kubelet 的网络插件接口如下:

k8s.io/kubernetes/pkg/kubelet/dockershim/network/plugins.go 中定义了面向 Pod 的 NetworkPlugin 接口。CNI 规范参见:https://github.com/containernetworking/cni/blob/master/SPEC.md#network-configuration

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// NetworkPlugin is an interface to network plugins for the kubelet.
type NetworkPlugin interface {
// Init initializes the plugin. This will be called exactly once
// before any other methods are called.
Init(host Host, hairpinMode kubeletconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) error

// Called on various events like:
// NET_PLUGIN_EVENT_POD_CIDR_CHANGE
Event(name string, details map[string]interface{})

// Name returns the plugin's name. This will be used when searching
// for a plugin by name.
Name() string

// Returns a set of NET_PLUGIN_CAPABILITY_*
Capabilities() utilsets.Int

// SetUpPod is the method called after the infra container of
// the pod has been created but before the other containers of the
// pod are launched.
SetUpPod(namespace string, name string, podSandboxID kubecontainer.ContainerID, annotations map[string]string) error

// TearDownPod is the method called before a pod's infra container will be deleted
TearDownPod(namespace string, name string, podSandboxID kubecontainer.ContainerID) error

// GetPodNetworkStatus is the method called to obtain the ipv4 or ipv6 addresses of the container
GetPodNetworkStatus(namespace string, name string, podSandboxID kubecontainer.ContainerID) (*PodNetworkStatus, error)

// Status returns error if the network plugin is in error state
Status() error
}

cniNetworkPlugin 结构体的 loNetwork 和 defaultNetwork 字段都是 *cniNetwork 类型;cniNetwork 中的 CNIConfig 字段类型为 libcni.CNI 接口(实际赋值为 *libcni.CNIConfig):

1
2
3
4
5
6
7
8
9
10
11
12
// Abridged view of cniNetworkPlugin showing only its embedded no-op plugin
// and its two *cniNetwork fields; the remaining fields are elided here.
type cniNetworkPlugin struct {
network.NoopNetworkPlugin

loNetwork *cniNetwork // loopback network
defaultNetwork *cniNetwork // default network
}

// cniNetwork bundles one named CNI network configuration with the libcni
// handle used to invoke its plugins.
type cniNetwork struct {
name string
NetworkConfig *libcni.NetworkConfigList
CNIConfig libcni.CNI // libcni.CNI implementation; getLoNetwork sets it to &libcni.CNIConfig{Path: binDirs}
}

CNI 接口定义

1
2
3
4
5
6
7
// CNI is libcni's interface for invoking CNI plugins.
// The *List variants operate on a NetworkConfigList (a chain of plugins, as
// loaded from a .conflist file); AddNetwork/DelNetwork operate on a single
// NetworkConfig. RuntimeConf presumably carries per-container parameters
// such as the container ID and netns path — TODO confirm against libcni.
type CNI interface {
AddNetworkList(net *NetworkConfigList, rt *RuntimeConf) (types.Result, error)
DelNetworkList(net *NetworkConfigList, rt *RuntimeConf) error

AddNetwork(net *NetworkConfig, rt *RuntimeConf) (types.Result, error)
DelNetwork(net *NetworkConfig, rt *RuntimeConf) error
}

cniNetworkPlugin(及其中 CNIConfig)的初始化入口 ProbeNetworkPlugins:

1
2
3
4
5
6
7
8
9
10
11
12
// ProbeNetworkPlugins builds dockershim's CNI network plugin.
//
// confDir is the directory scanned for CNI network configuration files.
// binDirs are the directories searched for CNI plugin binaries.
//
// The loopback network is built immediately. The default network is then
// loaded from confDir on a best-effort basis via syncNetworkConfig; if that
// fails, defaultNetwork stays nil. The result is a single-element slice
// holding the plugin.
func ProbeNetworkPlugins(confDir string, binDirs []string) []network.NetworkPlugin {
	plugin := &cniNetworkPlugin{
		// defaultNetwork starts nil and is filled in by syncNetworkConfig below.
		loNetwork: getLoNetwork(binDirs),
		execer:    utilexec.New(),
		confDir:   confDir,
		binDirs:   binDirs,
	}

	// Without this call defaultNetwork would stay nil after probing.
	plugin.syncNetworkConfig()
	return []network.NetworkPlugin{plugin}
}

getLoNetwork 函数定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// getLoNetwork returns the CNI network, named "lo", that brings up a pod's
// loopback interface. It uses the "loopback" plugin, which libcni looks up
// in binDirs.
//
// The configuration is a fixed literal, so a parse failure is a programming
// error and panics.
func getLoNetwork(binDirs []string) *cniNetwork {
	loopbackConf := []byte(`{
"cniVersion": "0.2.0",
"name": "cni-loopback",
"plugins":[{
"type": "loopback"
}]
}`)

	confList, parseErr := libcni.ConfListFromBytes(loopbackConf)
	if parseErr != nil {
		// The hardcoded config above should always be valid and unit tests will
		// catch this
		panic(parseErr)
	}

	return &cniNetwork{
		name:          "lo",
		NetworkConfig: confList,
		CNIConfig:     &libcni.CNIConfig{Path: binDirs},
	}
}

syncNetworkConfig 负责初始化 cniNetworkPlugin 结构体中的默认网络字段 defaultNetwork,该函数在 cni.ProbeNetworkPlugins 中被调用:

1
2
3
4
// syncNetworkConfig (re)loads the default CNI network from plugin.confDir,
// with plugin binaries looked up in plugin.binDirs, and installs it with
// setDefaultNetwork.
//
// If no valid configuration can be loaded, the error is logged and the
// currently installed default network is kept. It is not overwritten with
// nil.
func (plugin *cniNetworkPlugin) syncNetworkConfig() {
	// Named defaultNet to avoid shadowing the imported network package.
	defaultNet, err := getDefaultCNINetwork(plugin.confDir, plugin.binDirs)
	if err != nil {
		glog.Warningf("Unable to update cni config: %s", err)
		return
	}
	plugin.setDefaultNetwork(defaultNet)
}

getDefaultCNINetwork 的定义如下,主要工作是按文件名排序读取 CNI conf 目录中的配置文件(.conf/.conflist/.json),并用第一个有效的配置完成默认 network 的初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// getDefaultCNINetwork loads the default CNI network from confDir.
//
// Files ending in .conf, .conflist or .json are tried in sorted order. The
// first one that loads into a network list with at least one plugin is used.
// Files that fail to load or convert are skipped with a warning. The returned
// network's libcni handle searches binDirs for plugin binaries.
//
// It returns an error when confDir cannot be listed, when it contains no
// candidate files, or when none of them is valid.
func getDefaultCNINetwork(confDir string, binDirs []string) (*cniNetwork, error) {
	files, err := libcni.ConfFiles(confDir, []string{".conf", ".conflist", ".json"})
	switch {
	case err != nil:
		return nil, err
	case len(files) == 0:
		return nil, fmt.Errorf("No networks found in %s", confDir)
	}

	sort.Strings(files)
	for _, confFile := range files {
		var confList *libcni.NetworkConfigList
		if strings.HasSuffix(confFile, ".conflist") {
			confList, err = libcni.ConfListFromFile(confFile)
			if err != nil {
				// Without this check confList would be nil below.
				glog.Warningf("Error loading CNI config list file %s: %v", confFile, err)
				continue
			}
		} else {
			conf, confErr := libcni.ConfFromFile(confFile)
			if confErr != nil {
				// Without this check conf would be nil below.
				glog.Warningf("Error loading CNI config file %s: %v", confFile, confErr)
				continue
			}

			// Ensure the config has a "type" so we know what plugin to run.
			// Also catches the case where somebody put a conflist into a conf file.
			if conf.Network.Type == "" {
				glog.Warningf("Error loading CNI config file %s: no 'type'; perhaps this is a .conflist?", confFile)
				continue
			}

			confList, err = libcni.ConfListFromConf(conf)
			if err != nil {
				glog.Warningf("Error converting CNI config file %s to list: %v", confFile, err)
				continue
			}
		}
		if len(confList.Plugins) == 0 {
			glog.Warningf("CNI config list %s has no networks, skipping", confFile)
			continue
		}

		glog.V(4).Infof("Using CNI configuration file %s", confFile)

		return &cniNetwork{
			name:          confList.Name,
			NetworkConfig: confList,
			CNIConfig:     &libcni.CNIConfig{Path: binDirs},
		}, nil
	}
	return nil, fmt.Errorf("No valid networks found in %s", confDir)
}

// getDefaulgotCNINetwork is a misspelled alias of getDefaultCNINetwork. It is
// kept only so that existing references keep compiling.
//
// Deprecated: use getDefaultCNINetwork.
func getDefaulgotCNINetwork(confDir string, binDirs []string) (*cniNetwork, error) {
	return getDefaultCNINetwork(confDir, binDirs)
}
CATALOG
  1. 1. CRI
    1. 1.1. 创建 Pod 入口
    2. 1.2. CRI 接口调用和实现
  2. 2. CNI
    1. 2.1. CNI 接口调用和实现
    2. 2.2. CNI 接口定义