“声明式 API”并不像“命令式 API”那样有着明显的执行逻辑。这就使得基于声明式 API 的业务功能实现,往往需要通过控制器模式来“监视”API 对象的变化(比如,创建或者删除 Network),然后以此来决定实际要执行的具体工作。

API对象中已经讲述了如何自定义API对象,那么现在来完成另一部分,如果自定义控制器来管理API对象。

编写自定义控制器包括:编写main函数,编写自定义控制器的定义,编写控制器里的业务逻辑三个部分。

编写main函数

main函数的主要功能是定义一个自定义控制器,并启动它。主要代码如下:

func main() {
  ...
  
  cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
  ...
  kubeClient, err := kubernetes.NewForConfig(cfg)
  ...
  networkClient, err := clientset.NewForConfig(cfg)
  ...
  
  networkInformerFactory := informers.NewSharedInformerFactory(networkClient, ...)
  
  controller := NewController(kubeClient, networkClient,
  networkInformerFactory.Samplecrd().V1().Networks())
  
  go networkInformerFactory.Start(stopCh)
 
  if err = controller.Run(2, stopCh); err != nil {
    glog.Fatalf("Error running controller: %s", err.Error())
  }
}

整个函数的流程分为三步:

  1. 根据提供的Master配置,创建一个k8s的Client(kubeClient)和network的Client(networkClient)。如果没有提供,就默认使用ServiceAccount数据卷里的授权信息来访问APIServer。
  2. 创建一个InformerFactory(networkInformerFactory)工厂,并使用它生成一个Network对象的Informer传递给控制器。
  3. 启动上述的Informer,然后执行controller.Run启动控制器。

编写自定义控制器的定义

k8s中的自定义控制器的工作原因可以用如下图来表示:

Informer工作原理

控制器要做的第一件事,就是从k8s的APIServer获取它关注的对象,也就是自定义的Network对象。这是通过Informer完成的,Informer与API对象一一对应。

Informer使用传递的networkClient与APIServer建立连接,不过这个连接时Informer通过Reflactor包维持的,使用了名为ListenAndWatch的方法。一旦APIServer端有新的Network实例被创建、删除或者更新都会通知Reflector事件发生。 Reflector将事件放入队列当中,然后根据事件的类型进行处理:一是进行同步本地缓存

  • 如果是Add事件,Informer就会通过Indexer将这个新增的API对象保存在本地缓存当中并为它创建索引。
  • 如果是Delete事件,Informer就会从本地缓存删除这个事件。

每经过 resyncPeriod 指定的时间,Informer 维护的本地缓存,都会使用最近一次 LIST 返回的结果强制更新一次,从而保证缓存的有效性。在 Kubernetes 中,这个缓存强制更新的操作就叫作:resync

二则是根据事件的类型,触发事先注册好的ResourceEventHandler。控制器的主要定义如下:

func NewController(
  kubeclientset kubernetes.Interface,
  networkclientset clientset.Interface,
  networkInformer informers.NetworkInformer) *Controller {
  ...
  controller := &Controller{
    kubeclientset:    kubeclientset,
    networkclientset: networkclientset,
    networksLister:   networkInformer.Lister(),
    networksSynced:   networkInformer.Informer().HasSynced,
    workqueue:        workqueue.NewNamedRateLimitingQueue(...,  "Networks"),
    ...
  }
    networkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: controller.enqueueNetwork,
    UpdateFunc: func(old, new interface{}) {
      oldNetwork := old.(*samplecrdv1.Network)
      newNetwork := new.(*samplecrdv1.Network)
      if oldNetwork.ResourceVersion == newNetwork.ResourceVersion {
        return
      }
      controller.enqueueNetwork(new)
    },
    DeleteFunc: controller.enqueueNetworkForDelete,
 return controller
}

这里使用了两个client和一个informer来初始化控制器,就是main函数中创建的几个对象。

还设置了一个工作队列,负责同步informer和控制循环之间的数据。注册了三个Handler,分别对应添加、更新、删除。而具体的处理操作,都是将该事件对应的 API 对象加入到工作队列中。实际入队的并不是 API 对象本身,而是它们的 Key,即:该 API 对象的<namespace>/<name>

再捋一遍,Informer 通过一种叫作 ListAndWatch 的方法,把 APIServer 中的 API 对象缓存在了本地,并负责更新和维护这个缓存。每经过一段时间就强制更新以便缓存以保证缓存的有效性。

编写控制器的业务逻辑

控制循环

控制循环就是不断循环,将Pod的状态变为预期状态。 这里编写的控制循环如下:

func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
 ...
  if ok := cache.WaitForCacheSync(stopCh, c.networksSynced); !ok {
    return fmt.Errorf("failed to wait for caches to sync")
  }
  
  ...
  for i := 0; i < threadiness; i++ {
    go wait.Until(c.runWorker, time.Second, stopCh)
  }
  
  ...
  return nil
}
  • 首先,等待 Informer 完成一次本地缓存的数据同步操作;
  • 然后,直接通过 goroutine 启动一个(或者并发启动多个)“无限循环”的任务。 这个无限循环就是控制器的业务逻辑,内容如下:
func (c *Controller) runWorker() {
  for c.processNextWorkItem() {
  }
}
 
func (c *Controller) processNextWorkItem() bool {
  obj, shutdown := c.workqueue.Get()
  
  ...
  
  err := func(obj interface{}) error {
    ...
    if err := c.syncHandler(key); err != nil {
     return fmt.Errorf("error syncing '%s': %s", key, err.Error())
    }
    
    c.workqueue.Forget(obj)
    ...
    return nil
  }(obj)
  
  ...
  
  return true
}
 
func (c *Controller) syncHandler(key string) error {
 
  namespace, name, err := cache.SplitMetaNamespaceKey(key)
  ...
  
  network, err := c.networksLister.Networks(namespace).Get(name)
  if err != nil {
    if errors.IsNotFound(err) {
      glog.Warningf("Network does not exist in local cache: %s/%s, will delete it from Neutron ...",
      namespace, name)
      
      glog.Warningf("Network: %s/%s does not exist in local cache, will delete it from Neutron ...",
    namespace, name)
    
     // FIX ME: call Neutron API to delete this network by name.
     //
     // neutron.Delete(namespace, name)
     
     return nil
  }
    ...
    
    return err
  }
  
  glog.Infof("[Neutron] Try to process network: %#v ...", network)
  
  // FIX ME: Do diff().
  //
  // actualNetwork, exists := neutron.Get(namespace, name)
  //
  // if !exists {
  //   neutron.Create(namespace, name)
  // } else if !reflect.DeepEqual(actualNetwork, network) {
  //   neutron.Update(namespace, name)
  // }
  
  return nil
}
  • 从工作队列里出队(workqueue.Get)了一个成员,也就是一个 Key(Network 对象的:namespace/name)。
  • 在 syncHandler 方法中,我使用这个 Key,尝试从 Informer 维护的缓存中拿到了它所对应的 Network 对象。如果拿不到,说明这个对象是删除事件添加到队列中的,所以这里就需要将这个对象从真实的集群当中删除。
  • 能够获取则可以比较真实状态与期望状态。期望状态就是从本地缓存读取到的,真实状态则通过API从集群中获得。不一致则进行依次协调。

tags: 容器编排 控制器 声明式API