“声明式 API”并不像“命令式 API”那样有着明显的执行逻辑。这就使得基于声明式 API 的业务功能实现,往往需要通过控制器模式来“监视”API 对象的变化(比如,创建或者删除 Network),然后以此来决定实际要执行的具体工作。
在API对象中已经讲述了如何自定义API对象,那么现在来完成另一部分,如果自定义控制器来管理API对象。
编写自定义控制器包括:编写main函数,编写自定义控制器的定义,编写控制器里的业务逻辑三个部分。
编写main函数
main函数的主要功能是定义一个自定义控制器,并启动它。主要代码如下:
func main() {
...
cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
...
kubeClient, err := kubernetes.NewForConfig(cfg)
...
networkClient, err := clientset.NewForConfig(cfg)
...
networkInformerFactory := informers.NewSharedInformerFactory(networkClient, ...)
controller := NewController(kubeClient, networkClient,
networkInformerFactory.Samplecrd().V1().Networks())
go networkInformerFactory.Start(stopCh)
if err = controller.Run(2, stopCh); err != nil {
glog.Fatalf("Error running controller: %s", err.Error())
}
}整个函数的流程分为三步:
- 根据提供的Master配置,创建一个k8s的Client(kubeClient)和network的Client(networkClient)。如果没有提供,就默认使用ServiceAccount数据卷里的授权信息来访问APIServer。
- 创建一个InformerFactory(networkInformerFactory)工厂,并使用它生成一个Network对象的Informer传递给控制器。
- 启动上述的Informer,然后执行controller.Run启动控制器。
编写自定义控制器的定义
k8s中的自定义控制器的工作原因可以用如下图来表示:

Informer工作原理
控制器要做的第一件事,就是从k8s的APIServer获取它关注的对象,也就是自定义的Network对象。这是通过Informer完成的,Informer与API对象一一对应。
Informer使用传递的networkClient与APIServer建立连接,不过这个连接时Informer通过Reflactor包维持的,使用了名为ListenAndWatch的方法。一旦APIServer端有新的Network实例被创建、删除或者更新都会通知Reflector事件发生。
Reflector将事件放入队列当中,然后根据事件的类型进行处理:一是进行同步本地缓存。
- 如果是Add事件,Informer就会通过Indexer将这个新增的API对象保存在本地缓存当中并为它创建索引。
- 如果是Delete事件,Informer就会从本地缓存删除这个事件。
每经过 resyncPeriod 指定的时间,Informer 维护的本地缓存,都会使用最近一次 LIST 返回的结果强制更新一次,从而保证缓存的有效性。在 Kubernetes 中,这个缓存强制更新的操作就叫作:resync。
二则是根据事件的类型,触发事先注册好的ResourceEventHandler。控制器的主要定义如下:
func NewController(
kubeclientset kubernetes.Interface,
networkclientset clientset.Interface,
networkInformer informers.NetworkInformer) *Controller {
...
controller := &Controller{
kubeclientset: kubeclientset,
networkclientset: networkclientset,
networksLister: networkInformer.Lister(),
networksSynced: networkInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(..., "Networks"),
...
}
networkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueNetwork,
UpdateFunc: func(old, new interface{}) {
oldNetwork := old.(*samplecrdv1.Network)
newNetwork := new.(*samplecrdv1.Network)
if oldNetwork.ResourceVersion == newNetwork.ResourceVersion {
return
}
controller.enqueueNetwork(new)
},
DeleteFunc: controller.enqueueNetworkForDelete,
return controller
}这里使用了两个client和一个informer来初始化控制器,就是main函数中创建的几个对象。
还设置了一个工作队列,负责同步informer和控制循环之间的数据。注册了三个Handler,分别对应添加、更新、删除。而具体的处理操作,都是将该事件对应的 API 对象加入到工作队列中。实际入队的并不是 API 对象本身,而是它们的 Key,即:该 API 对象的<namespace>/<name>。
再捋一遍,Informer 通过一种叫作 ListAndWatch 的方法,把 APIServer 中的 API 对象缓存在了本地,并负责更新和维护这个缓存。每经过一段时间就强制更新以便缓存以保证缓存的有效性。
编写控制器的业务逻辑
控制循环
控制循环就是不断循环,将Pod的状态变为预期状态。 这里编写的控制循环如下:
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
...
if ok := cache.WaitForCacheSync(stopCh, c.networksSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
...
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
...
return nil
}- 首先,等待 Informer 完成一次本地缓存的数据同步操作;
- 然后,直接通过 goroutine 启动一个(或者并发启动多个)“无限循环”的任务。 这个无限循环就是控制器的业务逻辑,内容如下:
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}
func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
...
err := func(obj interface{}) error {
...
if err := c.syncHandler(key); err != nil {
return fmt.Errorf("error syncing '%s': %s", key, err.Error())
}
c.workqueue.Forget(obj)
...
return nil
}(obj)
...
return true
}
func (c *Controller) syncHandler(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
...
network, err := c.networksLister.Networks(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
glog.Warningf("Network does not exist in local cache: %s/%s, will delete it from Neutron ...",
namespace, name)
glog.Warningf("Network: %s/%s does not exist in local cache, will delete it from Neutron ...",
namespace, name)
// FIX ME: call Neutron API to delete this network by name.
//
// neutron.Delete(namespace, name)
return nil
}
...
return err
}
glog.Infof("[Neutron] Try to process network: %#v ...", network)
// FIX ME: Do diff().
//
// actualNetwork, exists := neutron.Get(namespace, name)
//
// if !exists {
// neutron.Create(namespace, name)
// } else if !reflect.DeepEqual(actualNetwork, network) {
// neutron.Update(namespace, name)
// }
return nil
}- 从工作队列里出队(workqueue.Get)了一个成员,也就是一个 Key(Network 对象的:namespace/name)。
- 在 syncHandler 方法中,我使用这个 Key,尝试从 Informer 维护的缓存中拿到了它所对应的 Network 对象。如果拿不到,说明这个对象是删除事件添加到队列中的,所以这里就需要将这个对象从真实的集群当中删除。
- 能够获取则可以比较真实状态与期望状态。期望状态就是从本地缓存读取到的,真实状态则通过API从集群中获得。不一致则进行依次协调。