- 1. Dynamic Provisioner
- 1.1. Provisioner Interface
- 1.2. Steps to develop a provisioner
- 2. CSI Provisioner
- 2.1. The main function
- 2.1.1. Parsing command-line flags
- 2.1.2. Creating the clientset objects
- 2.1.3. Kubernetes version check
- 2.1.4. Connecting to the CSI socket
- 2.1.5. Constructing the csiProvisioner object
- 2.1.6. Running the ProvisionController
- 2.2. The Provision and Delete methods
- 2.2.1. The Provision method
- 2.2.2. The Delete method
- 2.3. Summary
- 3. csi-client
- 3.1. Constructing the csi-client
- 3.1.1. Constructing the grpcClient
- 3.1.2. Constructing the csi-client
- 3.2. csiClient.CreateVolume
- 3.3. csiClient.DeleteVolume
- 4. ProvisionController.Run
This article analyzes the source code of csi-provisioner. For how to develop a Dynamic Provisioner in general, see the separate source code analysis of nfs-client-provisioner.
1. Dynamic Provisioner
1.1. Provisioner Interface
Developing a Dynamic Provisioner requires implementing the Provisioner interface, which has two methods (a sketch of the interface follows the list):
- Provision: creates the backing storage resource and returns a PV object.
- Delete: removes the backing storage resource, but does not delete the PV object itself.
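For reference, the interface looks roughly like this in the kubernetes-incubator/external-storage controller library that external-provisioner builds on (a paraphrased sketch, not copied verbatim from the library):

```go
// Paraphrased sketch of the Provisioner interface from the
// external-storage controller library (lib/controller).
type Provisioner interface {
	// Provision creates the backing storage and returns a PV template for it.
	Provision(VolumeOptions) (*v1.PersistentVolume, error)
	// Delete removes the backing storage of the given (released) PV.
	Delete(*v1.PersistentVolume) error
}
```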
1.2. Steps to develop a provisioner
- Write a provisioner that implements the Provisioner interface (i.e. the Provision and Delete methods).
- Build a ProvisionController from that provisioner.
- Call the ProvisionController's Run method (a minimal skeleton follows this list).
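Putting the three steps together, a minimal skeleton might look like the following. It assumes the kubernetes-incubator/external-storage controller library; myProvisioner and the provisioner name are hypothetical placeholders, and error handling is omitted.

```go
package main

import (
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"

	"github.com/kubernetes-incubator/external-storage/lib/controller"
)

// myProvisioner is a hypothetical provisioner used only to illustrate the wiring.
type myProvisioner struct{}

// Provision would allocate the backing storage and return a PV template for it.
func (p *myProvisioner) Provision(options controller.VolumeOptions) (*v1.PersistentVolume, error) {
	// ... allocate storage, then fill in and return a v1.PersistentVolume ...
	return &v1.PersistentVolume{}, nil
}

// Delete would release the backing storage of the given PV.
func (p *myProvisioner) Delete(volume *v1.PersistentVolume) error {
	// ... release the storage ...
	return nil
}

func main() {
	config, _ := rest.InClusterConfig()
	clientset, _ := kubernetes.NewForConfig(config)
	serverVersion, _ := clientset.Discovery().ServerVersion()

	// Step 2: build a ProvisionController from the provisioner.
	pc := controller.NewProvisionController(clientset, "example.com/my-provisioner", &myProvisioner{}, serverVersion.GitVersion)

	// Step 3: run it as a long-running loop.
	pc.Run(wait.NeverStop)
}
```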
2. CSI Provisioner
The source code of the CSI Provisioner can be found at: https://github.com/kubernetes-csi/external-provisioner.
2.1. The main function
2.1.1. Parsing command-line flags
The source code is as follows:
```go
var (
	provisioner          = flag.String("provisioner", "", "Name of the provisioner. The provisioner will only provision volumes for claims that request a StorageClass with a provisioner field set equal to this name.")
	master               = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
	kubeconfig           = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Either this or master needs to be set if the provisioner is being run out of cluster.")
	csiEndpoint          = flag.String("csi-address", "/run/csi/socket", "The gRPC endpoint for Target CSI Volume")
	connectionTimeout    = flag.Duration("connection-timeout", 10*time.Second, "Timeout for waiting for CSI driver socket.")
	volumeNamePrefix     = flag.String("volume-name-prefix", "pvc", "Prefix to apply to the name of a created volume")
	volumeNameUUIDLength = flag.Int("volume-name-uuid-length", -1, "Truncates generated UUID of a created volume to this length. Defaults behavior is to NOT truncate.")
	showVersion          = flag.Bool("version", false, "Show version.")

	provisionController *controller.ProvisionController
	version             = "unknown"
)

func init() {
	var config *rest.Config
	var err error
	flag.Parse()
	flag.Set("logtostderr", "true")

	if *showVersion {
		fmt.Println(os.Args[0], version)
		os.Exit(0)
	}
	glog.Infof("Version: %s", version)
	...
}
```
The init function parses these flags. The provisioner flag specifies the name of the provisioner that provisions PVs for PVCs; it must match the provisioner field of the StorageClass object.
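As an illustration of that coupling, the snippet below (hypothetical names, not part of external-provisioner) creates a StorageClass whose Provisioner field carries the same value as the --provisioner flag. It assumes storagev1 is k8s.io/api/storage/v1 and metav1 is k8s.io/apimachinery/pkg/apis/meta/v1; with the older client-go used here, Create takes only the object, while newer versions also require a context and options.

```go
// Hypothetical example: only PVCs referencing this StorageClass will be
// handled by a provisioner started with --provisioner=csi-example-provisioner.
sc := &storagev1.StorageClass{
	ObjectMeta:  metav1.ObjectMeta{Name: "csi-example-sc"},
	Provisioner: "csi-example-provisioner", // must equal the --provisioner flag value
}
if _, err := clientset.StorageV1().StorageClasses().Create(sc); err != nil {
	glog.Fatalf("Failed to create StorageClass: %v", err)
}
```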
2.1.2. Creating the clientset objects
The source code is as follows:
```go
// get the KUBECONFIG from env if specified (useful for local/debug cluster)
kubeconfigEnv := os.Getenv("KUBECONFIG")

if kubeconfigEnv != "" {
	glog.Infof("Found KUBECONFIG environment variable set, using that..")
	kubeconfig = &kubeconfigEnv
}

if *master != "" || *kubeconfig != "" {
	glog.Infof("Either master or kubeconfig specified. building kube config from that..")
	config, err = clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
} else {
	glog.Infof("Building kube configs for running in cluster...")
	config, err = rest.InClusterConfig()
}
if err != nil {
	glog.Fatalf("Failed to create config: %v", err)
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
	glog.Fatalf("Failed to create client: %v", err)
}

// snapclientset.NewForConfig creates a new Clientset for VolumesnapshotV1alpha1Client
snapClient, err := snapclientset.NewForConfig(config)
if err != nil {
	glog.Fatalf("Failed to create snapshot client: %v", err)
}

csiAPIClient, err := csiclientset.NewForConfig(config)
if err != nil {
	glog.Fatalf("Failed to create CSI API client: %v", err)
}
```
Based on the resolved Kubernetes configuration, the clientset objects are created for calling the Kubernetes API, mainly for creating and deleting objects such as PVs and PVCs.
2.1.3. Kubernetes version check
```go
// The controller needs to know what the server version is because out-of-tree
// provisioners aren't officially supported until 1.5
serverVersion, err := clientset.Discovery().ServerVersion()
if err != nil {
	glog.Fatalf("Error getting server version: %v", err)
}
```
The Kubernetes server version is retrieved because out-of-tree provisioners are only supported on Kubernetes 1.5 and later.
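The snippet above only fetches the version; as a purely hypothetical illustration (not the library's actual code), such a gate could be expressed with k8s.io/apimachinery's version utilities:

```go
// Hypothetical check, not taken from external-provisioner: refuse to run
// against servers older than 1.5, where out-of-tree provisioners are unsupported.
// Assumes "k8s.io/apimachinery/pkg/util/version" is imported as version.
v, err := version.ParseGeneric(serverVersion.GitVersion)
if err != nil {
	glog.Fatalf("Error parsing server version %q: %v", serverVersion.GitVersion, err)
}
if v.LessThan(version.MustParseGeneric("1.5.0")) {
	glog.Fatalf("Out-of-tree provisioners require Kubernetes 1.5+, server is %s", serverVersion.GitVersion)
}
```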
2.1.4. Connecting to the CSI socket
```go
// Generate a unique ID for this provisioner
timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + *provisioner

// Provisioner will stay in Init until driver opens csi socket, once it's done
// controller will exit this loop and proceed normally.
socketDown := true
grpcClient := &grpc.ClientConn{}
for socketDown {
	grpcClient, err = ctrl.Connect(*csiEndpoint, *connectionTimeout)
	if err == nil {
		socketDown = false
		continue
	}
	time.Sleep(10 * time.Second)
}
```
The provisioner stays in the Init state until the connection to the CSI socket succeeds; only then does it proceed. If the connection fails, it sleeps for 10 seconds and retries. Two flags are involved:
- csiEndpoint: the gRPC endpoint of the target CSI volume driver, which defaults to /run/csi/socket.
- connectionTimeout: the timeout for waiting for the CSI driver socket, which defaults to 10 seconds.
2.1.5. Constructing the csiProvisioner object
```go
// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *csiEndpoint, *connectionTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient)

provisionController = controller.NewProvisionController(
	clientset,
	*provisioner,
	csiProvisioner,
	serverVersion.GitVersion,
)
```
The csiProvisioner object is built from the clientset, csiAPIClient, csiEndpoint, connectionTimeout, identity, volumeNamePrefix, volumeNameUUIDLength, grpcClient and snapClient parameters.
The ProvisionController object is then built from this csiProvisioner.
2.1.6. Running the ProvisionController
```go
func main() {
	provisionController.Run(wait.NeverStop)
}
```
The ProvisionController implements the actual PV/PVC handling logic; its Run method runs as a long-lived, never-terminating loop.
2.2. The Provision and Delete methods
2.2.1. The Provision method
The full source of csiProvisioner's Provision method is at: https://github.com/kubernetes-csi/external-provisioner/blob/master/pkg/controller/controller.go#L336
The Provision method creates the storage resource and returns a PV object. Its input is a VolumeOptions value that specifies the attributes of the PV object to build.
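For orientation, the VolumeOptions fields that this Provision method actually reads are roughly the following (an abridged paraphrase of the library type, not its full definition):

```go
// Abridged paraphrase of controller.VolumeOptions; only the fields used by
// csiProvisioner.Provision are listed here.
type VolumeOptions struct {
	// Reclaim policy copied onto the resulting PV.
	PersistentVolumeReclaimPolicy v1.PersistentVolumeReclaimPolicy
	// The PVC being provisioned; its UID feeds the generated PV name and its
	// AccessModes are copied to the PV.
	PVC *v1.PersistentVolumeClaim
	// StorageClass parameters, e.g. "fstype" and the secret name/namespace keys.
	Parameters map[string]string
}
```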
1. Construct the PV-related attributes
```go
pvName, err := makeVolumeName(p.volumeNamePrefix, fmt.Sprintf("%s", options.PVC.ObjectMeta.UID), p.volumeNameUUIDLength)
if err != nil {
	return nil, err
}
```
2. Construct the CSIPersistentVolumeSource-related attributes
```go
driverState, err := checkDriverState(p.grpcClient, p.timeout, needSnapshotSupport)
if err != nil {
	return nil, err
}
...
// Resolve controller publish, node stage, node publish secret references
controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretNameKey, controllerPublishSecretNamespaceKey, options.Parameters, pvName, options.PVC)
if err != nil {
	return nil, err
}
nodeStageSecretRef, err := getSecretReference(nodeStageSecretNameKey, nodeStageSecretNamespaceKey, options.Parameters, pvName, options.PVC)
if err != nil {
	return nil, err
}
nodePublishSecretRef, err := getSecretReference(nodePublishSecretNameKey, nodePublishSecretNamespaceKey, options.Parameters, pvName, options.PVC)
if err != nil {
	return nil, err
}
...
volumeAttributes := map[string]string{provisionerIDKey: p.identity}
for k, v := range rep.Volume.Attributes {
	volumeAttributes[k] = v
}
...
fsType := ""
for k, v := range options.Parameters {
	switch strings.ToLower(k) {
	case "fstype":
		fsType = v
	}
}
if len(fsType) == 0 {
	fsType = defaultFSType
}
```
3. Create the CSI CreateVolumeRequest
```go
// Create a CSI CreateVolumeRequest and Response
req := csi.CreateVolumeRequest{
	Name:               pvName,
	Parameters:         options.Parameters,
	VolumeCapabilities: volumeCaps,
	CapacityRange: &csi.CapacityRange{
		RequiredBytes: int64(volSizeBytes),
	},
}
...
glog.V(5).Infof("CreateVolumeRequest %+v", req)
rep := &csi.CreateVolumeResponse{}
...
opts := wait.Backoff{Duration: backoffDuration, Factor: backoffFactor, Steps: backoffSteps}
err = wait.ExponentialBackoff(opts, func() (bool, error) {
	ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
	defer cancel()
	rep, err = p.csiClient.CreateVolume(ctx, &req)
	if err == nil {
		// CreateVolume has finished successfully
		return true, nil
	}
	if status, ok := status.FromError(err); ok {
		if status.Code() == codes.DeadlineExceeded {
			// CreateVolume timed out, give it another chance to complete
			glog.Warningf("CreateVolume timeout: %s has expired, operation will be retried", p.timeout.String())
			return false, nil
		}
	}
	// CreateVolume failed , no reason to retry, bailing from ExponentialBackoff
	return false, err
})
if err != nil {
	return nil, err
}

if rep.Volume != nil {
	glog.V(3).Infof("create volume rep: %+v", *rep.Volume)
}

respCap := rep.GetVolume().GetCapacityBytes()
if respCap < volSizeBytes {
	capErr := fmt.Errorf("created volume capacity %v less than requested capacity %v", respCap, volSizeBytes)
	delReq := &csi.DeleteVolumeRequest{
		VolumeId: rep.GetVolume().GetId(),
	}
	delReq.ControllerDeleteSecrets = provisionerCredentials
	ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
	defer cancel()
	_, err := p.csiClient.DeleteVolume(ctx, delReq)
	if err != nil {
		capErr = fmt.Errorf("%v. Cleanup of volume %s failed, volume is orphaned: %v", capErr, pvName, err)
	}
	return nil, capErr
}
```
The core of the Provision method is the call to p.csiClient.CreateVolume(ctx, &req).
4. Construct the PV object
```go
pv := &v1.PersistentVolume{
	ObjectMeta: metav1.ObjectMeta{
		Name: pvName,
	},
	Spec: v1.PersistentVolumeSpec{
		PersistentVolumeReclaimPolicy: options.PersistentVolumeReclaimPolicy,
		AccessModes:                   options.PVC.Spec.AccessModes,
		Capacity: v1.ResourceList{
			v1.ResourceName(v1.ResourceStorage): bytesToGiQuantity(respCap),
		},
		// TODO wait for CSI VolumeSource API
		PersistentVolumeSource: v1.PersistentVolumeSource{
			CSI: &v1.CSIPersistentVolumeSource{
				Driver:                     driverState.driverName,
				VolumeHandle:               p.volumeIdToHandle(rep.Volume.Id),
				FSType:                     fsType,
				VolumeAttributes:           volumeAttributes,
				ControllerPublishSecretRef: controllerPublishSecretRef,
				NodeStageSecretRef:         nodeStageSecretRef,
				NodePublishSecretRef:       nodePublishSecretRef,
			},
		},
	},
}

if driverState.capabilities.Has(PluginCapability_ACCESSIBILITY_CONSTRAINTS) {
	pv.Spec.NodeAffinity = GenerateVolumeNodeAffinity(rep.Volume.AccessibleTopology)
}

glog.Infof("successfully created PV %+v", pv.Spec.PersistentVolumeSource)
return pv, nil
```
The Provision method only builds a PV object from the VolumeOptions parameter; it does not itself create (or delete) the PV object in the API server.
Different kinds of provisioners mainly differ in the PersistentVolumeSource type and its parameters. For the csi-provisioner, the PersistentVolumeSource is CSI, which requires the following CSI-specific fields:
- Driver
- VolumeHandle
- FSType
- VolumeAttributes
- ControllerPublishSecretRef
- NodeStageSecretRef
- NodePublishSecretRef
2.2.2. The Delete method
The full source of csiProvisioner's Delete method is at: https://github.com/kubernetes-csi/external-provisioner/blob/master/pkg/controller/controller.go#L606
```go
func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
	if volume == nil || volume.Spec.CSI == nil {
		return fmt.Errorf("invalid CSI PV")
	}
	volumeId := p.volumeHandleToId(volume.Spec.CSI.VolumeHandle)

	_, err := checkDriverState(p.grpcClient, p.timeout, false)
	if err != nil {
		return err
	}

	req := csi.DeleteVolumeRequest{
		VolumeId: volumeId,
	}
	// get secrets if StorageClass specifies it
	storageClassName := volume.Spec.StorageClassName
	if len(storageClassName) != 0 {
		if storageClass, err := p.client.StorageV1().StorageClasses().Get(storageClassName, metav1.GetOptions{}); err == nil {
			// Resolve provision secret credentials.
			// No PVC is provided when resolving provision/delete secret names, since the PVC may or may not exist at delete time.
			provisionerSecretRef, err := getSecretReference(provisionerSecretNameKey, provisionerSecretNamespaceKey, storageClass.Parameters, volume.Name, nil)
			if err != nil {
				return err
			}
			credentials, err := getCredentials(p.client, provisionerSecretRef)
			if err != nil {
				return err
			}
			req.ControllerDeleteSecrets = credentials
		}
	}
	ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
	defer cancel()

	_, err = p.csiClient.DeleteVolume(ctx, &req)
	return err
}
```
The Delete method mainly calls p.csiClient.DeleteVolume(ctx, &req).
2.3. Summary
The csi-provisioner implements the Provisioner interface, i.e. the Provision and Delete methods:
- Provision: calls csiClient.CreateVolume, and constructs and returns a PV object.
- Delete: calls csiClient.DeleteVolume.
Both core methods of the csi-provisioner delegate to the corresponding csi-client calls.
3. csi-client
The csi-client code can be found at: https://github.com/container-storage-interface/spec/blob/master/lib/go/csi/v0/csi.pb.go
3.1. Constructing the csi-client
3.1.1. Constructing the grpcClient
```go
// Provisioner will stay in Init until driver opens csi socket, once it's done
// controller will exit this loop and proceed normally.
socketDown := true
grpcClient := &grpc.ClientConn{}
for socketDown {
	grpcClient, err = ctrl.Connect(*csiEndpoint, *connectionTimeout)
	if err == nil {
		socketDown = false
		continue
	}
	time.Sleep(10 * time.Second)
}
```
A usable grpcClient is only obtained once the connection to the CSI socket has succeeded.
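The actual dialing logic lives in ctrl.Connect; as a rough, hypothetical sketch of what connecting to a unix-socket CSI endpoint with gRPC typically looks like (using the older grpc-go dial options):

```go
// Hypothetical sketch only; ctrl.Connect in external-provisioner is the real implementation.
func connectToCSI(endpoint string, timeout time.Duration) (*grpc.ClientConn, error) {
	return grpc.Dial(
		endpoint,
		grpc.WithInsecure(), // CSI sockets are local, unauthenticated unix sockets
		grpc.WithBlock(),    // block until the connection is up or the timeout expires
		grpc.WithTimeout(timeout),
		grpc.WithDialer(func(addr string, t time.Duration) (net.Conn, error) {
			return net.DialTimeout("unix", addr, t)
		}),
	)
}
```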
3.1.2. Constructing the csi-client
The csi-client is built from the grpcClient.
```go
// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *csiEndpoint, *connectionTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient)
```
NewCSIProvisioner:
```go
// NewCSIProvisioner creates new CSI provisioner
func NewCSIProvisioner(client kubernetes.Interface,
	csiAPIClient csiclientset.Interface,
	csiEndpoint string,
	connectionTimeout time.Duration,
	identity string,
	volumeNamePrefix string,
	volumeNameUUIDLength int,
	grpcClient *grpc.ClientConn,
	snapshotClient snapclientset.Interface) controller.Provisioner {

	csiClient := csi.NewControllerClient(grpcClient)
	provisioner := &csiProvisioner{
		client:               client,
		grpcClient:           grpcClient,
		csiClient:            csiClient,
		csiAPIClient:         csiAPIClient,
		snapshotClient:       snapshotClient,
		timeout:              connectionTimeout,
		identity:             identity,
		volumeNamePrefix:     volumeNamePrefix,
		volumeNameUUIDLength: volumeNameUUIDLength,
	}
	return provisioner
}
```
NewControllerClient:
```go
csiClient := csi.NewControllerClient(grpcClient)
...
type controllerClient struct {
	cc *grpc.ClientConn
}

func NewControllerClient(cc *grpc.ClientConn) ControllerClient {
	return &controllerClient{cc}
}
```
3.2. csiClient.CreateVolume
The code in csi-provisioner that calls csiClient.CreateVolume is as follows:
```go
opts := wait.Backoff{Duration: backoffDuration, Factor: backoffFactor, Steps: backoffSteps}
err = wait.ExponentialBackoff(opts, func() (bool, error) {
	ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
	defer cancel()
	rep, err = p.csiClient.CreateVolume(ctx, &req)
	if err == nil {
		// CreateVolume has finished successfully
		return true, nil
	}
	if status, ok := status.FromError(err); ok {
		if status.Code() == codes.DeadlineExceeded {
			// CreateVolume timed out, give it another chance to complete
			glog.Warningf("CreateVolume timeout: %s has expired, operation will be retried", p.timeout.String())
			return false, nil
		}
	}
	// CreateVolume failed , no reason to retry, bailing from ExponentialBackoff
	return false, err
})
```
Construction of the CreateVolumeRequest:
```go
// Create a CSI CreateVolumeRequest and Response
req := csi.CreateVolumeRequest{
	Name:               pvName,
	Parameters:         options.Parameters,
	VolumeCapabilities: volumeCaps,
	CapacityRange: &csi.CapacityRange{
		RequiredBytes: int64(volSizeBytes),
	},
}
...
req.VolumeContentSource = volumeContentSource
...
req.AccessibilityRequirements = requirements
...
req.ControllerCreateSecrets = provisionerCredentials
```
The concrete CreateVolume implementation is shown below. csiClient is an interface type; its implementation here is controllerClient.CreateVolume:
```go
func (c *controllerClient) CreateVolume(ctx context.Context, in *CreateVolumeRequest, opts ...grpc.CallOption) (*CreateVolumeResponse, error) {
	out := new(CreateVolumeResponse)
	err := grpc.Invoke(ctx, "/csi.v0.Controller/CreateVolume", in, out, c.cc, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}
```
3.3. csiClient.DeleteVolume
The code in csi-provisioner that calls csiClient.DeleteVolume is as follows:
```go
func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
	...
	req := csi.DeleteVolumeRequest{
		VolumeId: volumeId,
	}
	// get secrets if StorageClass specifies it
	...
	ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
	defer cancel()

	_, err = p.csiClient.DeleteVolume(ctx, &req)
	return err
}
```
Construction of the DeleteVolumeRequest:
```go
req := csi.DeleteVolumeRequest{
	VolumeId: volumeId,
}
...
req.ControllerDeleteSecrets = credentials
```
The constructed DeleteVolumeRequest is then passed to the DeleteVolume method.
The concrete DeleteVolume implementation is controllerClient.DeleteVolume:
```go
func (c *controllerClient) DeleteVolume(ctx context.Context, in *DeleteVolumeRequest, opts ...grpc.CallOption) (*DeleteVolumeResponse, error) {
	out := new(DeleteVolumeResponse)
	err := grpc.Invoke(ctx, "/csi.v0.Controller/DeleteVolume", in, out, c.cc, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}
```
4. ProvisionController.Run
A custom provisioner implements the Provision and Delete methods of the Provisioner interface. These two methods create and delete the backing storage, but they do not create or delete the PV object itself.
The PV object operations are carried out by provisionClaimOperation and deleteVolumeOperation inside the ProvisionController, which in turn call the provisioner's Provision and Delete methods to handle the backing storage.
```go
func main() {
	provisionController.Run(wait.NeverStop)
}
```
For this part of the logic, see the source code analysis of nfs-client-provisioner; a simplified sketch of the provisioning flow follows.
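To make that division of labour concrete, the following is a heavily simplified, hypothetical sketch of the flow inside the library's provisionClaimOperation (locking, events, retries and error handling are omitted; this is not the actual library code):

```go
// Simplified sketch: the controller calls the provisioner for the backing
// storage, then creates the PV object itself.
func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) error {
	// Build the provisioning options from the claim and its StorageClass.
	options := VolumeOptions{
		PVC:        claim,
		Parameters: map[string]string{ /* StorageClass.Parameters */ },
	}

	// 1. Ask the provisioner (here: csiProvisioner) to create the backing
	//    storage and return a PV template.
	volume, err := ctrl.provisioner.Provision(options)
	if err != nil {
		return err
	}

	// 2. Create the PV object in the API server; the provisioner never does this itself.
	_, err = ctrl.client.CoreV1().PersistentVolumes().Create(volume)
	return err
}
```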
References:
- https://github.com/kubernetes-csi/external-provisioner
- https://github.com/container-storage-interface/spec
- https://github.com/kubernetes/community/blob/master/contributors/design-proposals/storage/container-storage-interface.md
- https://github.com/container-storage-interface/spec/blob/master/spec.md
