背景

Kubernetes Operator 是 CNCF 主推的云原生扩展机制。用 Go 写 Operator 是我日常工作的重要部分。

这篇文章聊聊怎么从零开发一个生产级的 Operator。

核心概念

Operator 的核心是声明式 API 加调和循环(reconciliation loop):

用户声明期望状态 → Controller 调和 → 实际状态趋近期望

项目结构

my-operator/
├── main.go
├── api/
│   └── v1/
│       └── myapp_types.go    # CRD 定义
├── controllers/
│   └── myapp_controller.go   # Reconciliation 逻辑
└── config/
    ├── crd/
    └── rbac/

第一步:定义 CRD (Custom Resource Definition)

// api/v1/myapp_types.go
package v1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// MyAppSpec is the desired state a user declares for a MyApp.
type MyAppSpec struct {
    // Replicas is the requested number of pods. It is omitted from JSON when zero.
    Replicas int32 `json:"replicas,omitempty"`
    // Image is the container image to run.
    Image string `json:"image"`
    // Port is the container port exposed by the workload.
    Port int32 `json:"port"`
    // EnvVars lists extra environment variables for the container.
    EnvVars []EnvVar `json:"envVars,omitempty"`
}

// EnvVar is a single name/value environment variable entry.
type EnvVar struct {
    // Name is the variable name.
    Name string `json:"name"`
    // Value is the literal value assigned to the variable.
    Value string `json:"value"`
}

// MyAppStatus is the observed state reported back on a MyApp.
type MyAppStatus struct {
    // AvailableReplicas is the observed count of available replicas.
    AvailableReplicas int32 `json:"availableReplicas,omitempty"`
    // Conditions holds the standard Kubernetes condition entries for this resource.
    Conditions []metav1.Condition `json:"conditions,omitempty"`
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:resource:shortName=myapp

// MyApp is the custom resource type: standard type/object metadata plus the
// user-declared Spec and the controller-reported Status.
type MyApp struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   MyAppSpec   `json:"spec,omitempty"`
    Status MyAppStatus `json:"status,omitempty"`
}

// Hub is an empty marker method.
// NOTE(review): presumably this marks MyApp v1 as the conversion hub version
// (controller-runtime's conversion.Hub interface) — confirm against the webhook setup.
func (*MyApp) Hub() {}

第二步:生成代码

# 安装 controller-gen
go install sigs.k8s.io/controller-tools/cmd/controller-gen@latest

# 生成 DeepCopy 方法(CRD 由下一条命令生成;RBAC 需另外使用 rbac:roleName=... 生成器)
controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."

# 生成 CRD YAML
controller-gen crd:crdVersions=v1 paths="./..." output:crd:artifacts:config=config/crd/bases

第三步:实现 Controller

// controllers/myapp_controller.go
package controllers

// MyAppReconciler reconciles MyApp resources.
type MyAppReconciler struct {
    // Client is embedded so that its methods (Get, Create, Update, ...) are
    // promoted onto the reconciler, which is how the Reconcile methods call
    // them. The field is still named Client, so composite literals such as
    // `Client: mgr.GetClient()` and explicit r.Client.X calls keep working.
    client.Client
    // Scheme is passed to ctrl.SetControllerReference when setting owner references.
    Scheme *runtime.Scheme
    // Log is the base logger; Reconcile derives a per-request logger from it.
    Log logr.Logger
}

// Reconcile drives the Deployment owned by a MyApp towards the state declared
// in the MyApp spec.
//
// It fetches the MyApp named by req, builds the desired Deployment, creates it
// if it does not exist or updates its spec if it differs, and then refreshes
// the status. Any API error is returned so that controller-runtime retries the
// request; on success the request is requeued after 30 seconds.
func (r *MyAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    log := r.Log.WithValues("myapp", req.NamespacedName)

    // 1. Fetch the MyApp. A NotFound error means it was deleted; nothing to do.
    var myapp v1.MyApp
    if err := r.Client.Get(ctx, req.NamespacedName, &myapp); err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    // 2. Build the desired Deployment and make the MyApp its controlling owner.
    deploy := r.buildDeployment(&myapp)
    if err := ctrl.SetControllerReference(&myapp, deploy, r.Scheme); err != nil {
        return ctrl.Result{}, err
    }

    // 3. Create or update the Deployment. The Deployment shares the MyApp's
    // name and namespace, so req.NamespacedName identifies it as well.
    found := &appsv1.Deployment{}
    err := r.Client.Get(ctx, req.NamespacedName, found)
    switch {
    case errors.IsNotFound(err):
        log.Info("Creating Deployment", "name", deploy.Name)
        if err := r.Client.Create(ctx, deploy); err != nil {
            return ctrl.Result{}, err
        }
    case err != nil:
        // Any other lookup failure must not be swallowed; return it to retry.
        return ctrl.Result{}, err
    case !r.deploymentEqual(found, deploy):
        found.Spec = deploy.Spec
        log.Info("Updating Deployment")
        if err := r.Client.Update(ctx, found); err != nil {
            return ctrl.Result{}, err
        }
    }

    // 4. Update the status from the Deployment that was looked up. Right after
    // a create, found is still the empty object from the failed lookup.
    // NOTE(review): if updateStatus returns an error it is dropped here — confirm its signature.
    r.updateStatus(&myapp, found)

    return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}

// buildDeployment returns the Deployment described by app's spec. The
// Deployment takes the MyApp's name and namespace, selects pods by the label
// app=<name>, and runs a single container named "myapp".
//
// A Replicas value of 0 is replaced by 1, so the spec cannot request zero replicas.
func (r *MyAppReconciler) buildDeployment(app *v1.MyApp) *appsv1.Deployment {
    replicas := int32(1)
    if app.Spec.Replicas != 0 {
        replicas = app.Spec.Replicas
    }

    container := corev1.Container{
        Name:  "myapp",
        Image: app.Spec.Image,
        Ports: []corev1.ContainerPort{{ContainerPort: app.Spec.Port}},
        Env:   r.buildEnvVars(app.Spec.EnvVars),
    }

    podTemplate := corev1.PodTemplateSpec{
        ObjectMeta: metav1.ObjectMeta{
            Labels: map[string]string{"app": app.Name},
        },
        Spec: corev1.PodSpec{
            Containers: []corev1.Container{container},
        },
    }

    return &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name:      app.Name,
            Namespace: app.Namespace,
        },
        Spec: appsv1.DeploymentSpec{
            Replicas: &replicas,
            // The selector gets its own map so it does not alias the template labels.
            Selector: &metav1.LabelSelector{
                MatchLabels: map[string]string{"app": app.Name},
            },
            Template: podTemplate,
        },
    }
}

第四步:启动 Controller

// main.go
// main configures logging, creates the controller manager, registers the
// MyApp reconciler with it, and runs the manager until a termination signal
// arrives. Any setup or runtime failure is logged and exits with status 1.
func main() {
    ctrl.SetLogger(zap.New(zap.UseDevMode(true)))

    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme: scheme,
    })
    if err != nil {
        setupLog.Error(err, "unable to start manager")
        os.Exit(1)
    }

    if err = (&controllers.MyAppReconciler{
        Client: mgr.GetClient(),
        Scheme: mgr.GetScheme(),
        // Reconcile calls r.Log.WithValues, so the logger has to be set here
        // rather than left as the zero value.
        Log: ctrl.Log.WithName("controllers").WithName("MyApp"),
    }).SetupWithManager(mgr); err != nil {
        setupLog.Error(err, "unable to create controller")
        os.Exit(1)
    }

    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        setupLog.Error(err, "problem running manager")
        os.Exit(1)
    }
}

高级特性

1. Webhook 验证

// api/v1/myapp_webhook.go
// ValidateCreate rejects a MyApp whose spec.replicas is negative. It returns
// nil when the value is zero or positive, and a field.Invalid error pointing
// at spec.replicas otherwise.
func (r *MyApp) ValidateCreate() error {
    replicas := r.Spec.Replicas
    if replicas >= 0 {
        return nil
    }

    replicasPath := field.NewPath("spec").Child("replicas")
    return field.Invalid(replicasPath, replicas, "replicas must be non-negative")
}

2. Finalizer(删除前执行清理)

// Reconcile manages the "myapp.finalizer" finalizer on a MyApp.
//
// While the object is not being deleted, it makes sure the finalizer is
// present. Once a deletion timestamp is set and the finalizer is still
// present, it runs the cleanup logic and then removes the finalizer so the
// API server can finish deleting the object. API errors are returned so the
// request is retried.
func (r *MyAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    const finalizerName = "myapp.finalizer"

    myapp := &v1.MyApp{}
    if err := r.Client.Get(ctx, req.NamespacedName, myapp); err != nil {
        // The object may already be gone; that is not an error worth retrying.
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    // Not marked for deletion: ensure the finalizer is registered.
    if myapp.DeletionTimestamp.IsZero() {
        if !containsString(myapp.GetFinalizers(), finalizerName) {
            myapp.Finalizers = append(myapp.GetFinalizers(), finalizerName)
            if err := r.Client.Update(ctx, myapp); err != nil {
                return ctrl.Result{}, err
            }
        }
        return ctrl.Result{}, nil
    }

    // Marked for deletion: clean up only while the finalizer is still present,
    // so the cleanup does not run again after it has been removed.
    if containsString(myapp.GetFinalizers(), finalizerName) {
        // NOTE(review): if cleanup returns an error it is dropped here — confirm its
        // signature; the finalizer should only be removed after a successful cleanup.
        r.cleanup(myapp)

        myapp.Finalizers = removeString(myapp.GetFinalizers(), finalizerName)
        if err := r.Client.Update(ctx, myapp); err != nil {
            return ctrl.Result{}, err
        }
    }

    return ctrl.Result{}, nil
}

测试

import (
    . "github.com/onsi/ginkgo/v2"
    . "github.com/onsi/gomega"
    appsv1 "k8s.io/api/apps/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// Spec for the MyApp controller: creating a MyApp must result in a Deployment
// with the same name and namespace that carries the requested replicas and image.
// NOTE(review): assumes the suite setup starts the manager with MyAppReconciler
// registered; otherwise no Deployment will ever appear — confirm in the suite file.
var _ = Describe("MyApp controller", func() {
    Context("with basic spec", func() {
        It("should create a Deployment", func() {
            myapp := &v1.MyApp{
                ObjectMeta: metav1.ObjectMeta{
                    Name:      "test",
                    Namespace: "default",
                },
                Spec: v1.MyAppSpec{
                    Replicas: 2,
                    Image:    "nginx:latest",
                    Port:     80,
                },
            }
            Expect(k8sClient.Create(ctx, myapp)).Should(Succeed())

            // Reconciliation is asynchronous, so poll until the Deployment
            // shows up instead of reading it once.
            deploy := &appsv1.Deployment{}
            Eventually(func() error {
                return k8sClient.Get(ctx, client.ObjectKeyFromObject(myapp), deploy)
            }, "10s", "250ms").Should(Succeed())

            Expect(deploy.Spec.Replicas).NotTo(BeNil())
            Expect(*deploy.Spec.Replicas).To(Equal(int32(2)))
            Expect(deploy.Spec.Template.Spec.Containers).To(HaveLen(1))
            Expect(deploy.Spec.Template.Spec.Containers[0].Image).To(Equal("nginx:latest"))
        })
    })
})

部署 Operator

# config/manager/manager.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-operator
spec:
  replicas: 1
  # apps/v1 requires spec.selector, and it must match the pod template labels.
  selector:
    matchLabels:
      app: my-operator
  template:
    metadata:
      labels:
        app: my-operator
    spec:
      containers:
      - name: operator
        image: myorg/my-operator:v1.0.0
        env:
        # NOTE(review): presumably an empty value means "watch all namespaces";
        # confirm that the manager setup actually reads this variable.
        - name: WATCH_NAMESPACE
          value: ""
# OLM (Operator Lifecycle Manager) 安装
operator-sdk olm install
operator-sdk run bundle myorg/my-operator-bundle:v1.0.0

总结

Operator 开发的核心:

  1. CRD — 定义声明式 API
  2. Reconcile Loop — 调和实际状态到期望状态
  3. Status — 反馈当前状态给用户
  4. Finalizer — 安全清理资源

用 Go 写 Operator 是一种享受——强类型 + K8s 生态 + 声明式哲学,完美结合。

你的下一个 Operator 在哪里?


Operator SDK 是开发 Operator 的利器,推荐从它开始。