
Kubernetes ReplicaSet Controller Design Principle

In Kubernetes, controllers play a vital role in orchestrating resources. For beginners learning Kubernetes, we first need to understand what controllers are. In short, controllers manipulate resources. In particular, resources are static object definitions that live in the etcd database (managed by the API server). As a simple example, you can submit the Pod YAML file below

apiVersion: v1
kind: Pod
metadata:
  name: nginx
spec:
  containers:
  - name: nginx
    image: nginx:1.14.2
    ports:
    - containerPort: 80

into etcd using kubectl, and you have a Pod resource. However, before the scheduler assigns it to a node and kubelet starts the runtime container, the resource is just a static definition. It is like an order for a meal: it is just there and may not have been cooked or served yet. The role of controllers is to orchestrate a lot of orders (but controllers do not cook or serve the dishes). As another example, if you put a ReplicaSet with 3 replicas into etcd, the ReplicaSet controller will then create 3 corresponding Pod resources in etcd. Again, they just place the orders; the actual work of launching containers is the responsibility of kubelet. As such, you can imagine that controller development and debugging does not require a container runtime at all.
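
To make the "place an order" part concrete, here is a minimal sketch of storing a ReplicaSet with 3 replicas through the API, using client-go's fake clientset so it runs without a real cluster. The name nginx-rs and the label app: nginx are made up for illustration; only the resource definition is stored, and no container is started.

package main

import (
	"context"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// The fake clientset stands in for the real API server, so this sketch
	// only demonstrates the "order" step, not scheduling or running containers.
	client := fake.NewSimpleClientset()

	replicas := int32(3)
	rs := &appsv1.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{Name: "nginx-rs", Namespace: "default"},
		Spec: appsv1.ReplicaSetSpec{
			Replicas: &replicas,
			Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "nginx"}},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "nginx"}},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{Name: "nginx", Image: "nginx:1.14.2"}},
				},
			},
		},
	}

	// Storing the ReplicaSet is all that "kubectl apply" does; it is then the
	// ReplicaSet controller's job to create the 3 Pod resources.
	created, err := client.AppsV1().ReplicaSets("default").Create(context.TODO(), rs, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("stored ReplicaSet %s with %d replicas\n", created.Name, *created.Spec.Replicas)
}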

The ReplicaSet controller is one of the most basic controllers in k8s. In this post, we will have a look at the implementation details of the ReplicaSet controller.

Run the ReplicaSet Controller

Earlier I shared a post on how to debug the k8s source code. However, that solution is rather cumbersome. In fact, there is a lighter way if we just want to run or debug a small unit of functionality. Let's go to the k8s source code folder and run:

➜  kubernetes git:(master) go test -v ./pkg/controller/replicaset -run TestWatchPods
=== RUN   TestWatchPods
I0427 10:34:13.498307   65353 replica_set.go:205] Starting replicaset controller
I0427 10:34:13.498605   65353 shared_informer.go:255] Waiting for caches to sync for ReplicaSet
I0427 10:34:13.498619   65353 shared_informer.go:262] Caches are synced for ReplicaSet
--- PASS: TestWatchPods (0.00s)
PASS
ok  	k8s.io/kubernetes/pkg/controller/replicaset	0.789s

Now you can see that we started a ReplicaSet controller and ran a test. The details of this test can be found in ./pkg/controller/replicaset/replica_set_test.go. You can also do a step-by-step debug using:

➜  kubernetes git:(master) dlv test ./pkg/controller/replicaset/

Type 'help' for list of commands.
(dlv) b TestWatchPods
Breakpoint 1 set at 0x2afd2d2 for k8s.io/kubernetes/pkg/controller/replicaset.TestWatchPods() ./pkg/controller/replicaset/replica_set_test.go:708
(dlv) c
=> 708:	func TestWatchPods(t *testing.T) {
   709:		client := fake.NewSimpleClientset()
   710:
   711:		fakeWatch := watch.NewFake()
   712:		client.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil))

Then you will be able to go through the steps of creating the controller and see how it works.

ReplicaSet Controller Design Principle

The ultimate goal of the ReplicaSet controller is to ensure that there is always the desired number of pods (here we can only ensure this at the resource manifest level; we cannot guarantee that the corresponding containers can actually be created). Before we have a look at the implementation, it is helpful to ask the following question:

How do we deal with the case when the controller gets restarted?

The controller may be interrupted before the desired number of pods has been reached. The general idea is to count the existing pods at the beginning of every sync loop: each managed pod is expected to carry the same labels and may have an owner reference pointing to the ReplicaSet. Then, depending on that count, the controller keeps creating or deleting pods until it reaches the desired number, as in the sketch below.
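
The decision itself is simple arithmetic. Here is an illustrative sketch (the function name reconcile is made up for this post; the real logic lives in manageReplicas, shown later):

// Illustrative only: compare the number of pods we currently own with the
// desired replica count, and derive how many to create or delete.
func reconcile(ownedPods, desiredReplicas int) (toCreate, toDelete int) {
	diff := ownedPods - desiredReplicas
	if diff < 0 {
		return -diff, 0 // too few pods: create the missing ones
	}
	return 0, diff // too many (or exactly enough) pods: delete the surplus
}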

Implementation Details

We will take a look at a few key implementation details. The ReplicaSetController is defined as a struct:

type ReplicaSetController struct {
	schema.GroupVersionKind
	kubeClient clientset.Interface

	// This is the pod control interface, which is responsible for creating or
	// deleting pod resources.
	podControl controller.PodControlInterface

	burstReplicas int
	syncHandler func(ctx context.Context, rsKey string) error

	// Think about what happens if we want to create 200 Pods in one sync, but a second
	// sync starts before the first one finishes. Here, in the first sync, expectations
	// will record "I expect to create 200 pods". Thus, if the expectations are not yet
	// satisfied, the second sync will quit.
	expectations *controller.UIDTrackingControllerExpectations

	rsLister appslisters.ReplicaSetLister
	rsListerSynced cache.InformerSynced

	// This is the storage for objects.
	rsIndexer cache.Indexer

	podLister corelisters.PodLister
	podListerSynced cache.InformerSynced

	// This is the work queue. Items in this queue will be processed one by one.
	queue workqueue.RateLimitingInterface
}
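
The expectations field deserves a closer look. Below is a minimal sketch of how it gates overlapping syncs, assuming it runs inside the kubernetes repo so that the k8s.io/kubernetes/pkg/controller package can be imported (the method names match the version referenced in this post; newer releases may additionally pass a logger):

// rsKey is the "namespace/name" key of the ReplicaSet.
exp := controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations())

// Sync #1 plans to create 2 pods.
exp.ExpectCreations(rsKey, 2)
fmt.Println(exp.SatisfiedExpectations(rsKey)) // false: a concurrent sync would skip manageReplicas

// The pod informer observes the two pod add events.
exp.CreationObserved(rsKey)
exp.CreationObserved(rsKey)
fmt.Println(exp.SatisfiedExpectations(rsKey)) // true: the next sync can proceed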

When the ReplicaSet controller is created, it listens for ReplicaSet and Pod Add/Update/Delete events.

func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		ratelimiter.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())
	}

    ...
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    rsc.addRS,
		UpdateFunc: rsc.updateRS,
		DeleteFunc: rsc.deleteRS,
	})
	...
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: rsc.addPod,
		// This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
		// overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
		// local storage, so it should be ok.
		UpdateFunc: rsc.updatePod,
		DeleteFunc: rsc.deletePod,
	})

Whichever event is detected, the corresponding handler adds the key of the respective ReplicaSet resource to the work queue. For instance:

func (rsc *ReplicaSetController) addRS(obj interface{}) {
	rs := obj.(*apps.ReplicaSet)
	klog.V(4).Infof("Adding %s %s/%s", rsc.Kind, rs.Namespace, rs.Name)
	rsc.enqueueRS(rs)
}
func (rsc *ReplicaSetController) enqueueRS(rs *apps.ReplicaSet) {
	key, err := controller.KeyFunc(rs)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
		return
	}
	rsc.queue.Add(key)
}
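
In the version referenced here, controller.KeyFunc is cache.DeletionHandlingMetaNamespaceKeyFunc, so for a live object the key is simply "namespace/name". A quick sketch, reusing the import aliases from the controller code (apps, metav1, cache) and the illustrative name nginx-rs:

rs := &apps.ReplicaSet{
	ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "nginx-rs"},
}
key, _ := cache.MetaNamespaceKeyFunc(rs) // what controller.KeyFunc does for non-tombstone objects
fmt.Println(key)                         // "default/nginx-rs"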

Now we can have a look at the control loop. Each worker takes one item from the work queue and calls syncHandler to start a sync loop.

func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool {
	key, quit := rsc.queue.Get()
	if quit {
		return false
	}
	defer rsc.queue.Done(key)

	err := rsc.syncHandler(ctx, key.(string))
	if err == nil {
		rsc.queue.Forget(key)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
	rsc.queue.AddRateLimited(key)

	return true
}
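
For context, the controller's Run method starts a configurable number of workers, and each worker keeps calling processNextWorkItem until the queue shuts down. Roughly, simplified from replica_set.go:

// Inside Run: start the worker goroutines.
for i := 0; i < workers; i++ {
	go wait.UntilWithContext(ctx, rsc.worker, time.Second)
}

// worker runs a loop that drains the work queue.
func (rsc *ReplicaSetController) worker(ctx context.Context) {
	for rsc.processNextWorkItem(ctx) {
	}
}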

Here syncHandler points to syncReplicaSet:

// Sync a specific ReplicaSet; key is the full name of the ReplicaSet.
func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error {
	...
	// Get the namespace and name.
	namespace, name, err := cache.SplitMetaNamespaceKey(key)

	// Get the ReplicaSet object.
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)

	// Check whether the last round of sync is satisfied (finished).
	rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)

	// First, get all the pods in that namespace.
	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())

	// Ignore inactive pods.
	filteredPods := controller.FilterActivePods(allPods)

	// Now claim the existing pods that belong to this ReplicaSet.
	filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods)

	var manageReplicasErr error
	if rsNeedsSync && rs.DeletionTimestamp == nil {
		// Depending on how many pods already exist, either create new pods
		// or delete pods.
		manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
	}
	....

The actual work of creating or deleting pods is done in manageReplicas.

func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {

	// Calculate the difference to decide whether to create or delete pods.
	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
	rsKey, err := controller.KeyFunc(rs)
	...
	if diff < 0 {
		diff *= -1
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		// Set the number of pods expected to be created; for each pod that is created
		// successfully, this number decreases by 1. When a pod add event is detected,
		// rsc.expectations.CreationObserved(rsKey) is called to reduce this number.
		// The next round of sync will quit if this number has not reached 0.
		rsc.expectations.ExpectCreations(rsKey, diff)
		klog.V(2).InfoS("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", diff)

		successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
			// The actual pod creation work is delegated to podControl.
			err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
			if err != nil {
				if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
					return nil
				}
			}
			return err
		})
		....
	}
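
The excerpt above only shows the scale-up path. The scale-down path (diff > 0) mirrors it: it registers deletion expectations first and then deletes pods concurrently through podControl. A simplified sketch of that branch (pod ranking and error handling omitted):

	// Scale down: there are more pods than desired.
	if diff > 0 {
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		// Choose which pods to remove (relatedPods comes from
		// rsc.getIndirectlyRelatedPods(rs), omitted here), then register
		// deletion expectations up front.
		podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
		rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))

		var wg sync.WaitGroup
		wg.Add(diff)
		for _, pod := range podsToDelete {
			go func(targetPod *v1.Pod) {
				defer wg.Done()
				// The actual pod deletion is delegated to podControl.
				if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil {
					// On failure, lower the expectation so the next sync retries.
					rsc.expectations.DeletionObserved(rsKey, controller.PodKey(targetPod))
				}
			}(pod)
		}
		wg.Wait()
	}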

The actual pod creation work is wrapped in slowStartBatch, where pods are created batch by batch with increasing batch sizes.

        errCh := make(chan error, batchSize)
		var wg sync.WaitGroup
		wg.Add(batchSize)
		for i := 0; i < batchSize; i++ {
			go func() {
				defer wg.Done()
				//Here fn is the function passed from the parameter of slowStartBatch
				if err := fn(); err != nil {
					errCh <- err
				}
			}()
		}
		wg.Wait()
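
The batch size starts at controller.SlowStartInitialBatchSize (1) and doubles after every fully successful batch, capped by how many pods remain; slowStartBatch stops at the first batch that returns an error, so a broken pod template fails fast instead of issuing hundreds of doomed API calls. Here is an illustrative sketch of the schedule only (not the upstream code; min is the Go 1.21+ builtin):

// slowStartSizes lists the batch sizes slow start would use to create
// "total" pods, e.g. slowStartSizes(200, 1) -> [1 2 4 8 16 32 64 73].
func slowStartSizes(total, initial int) []int {
	var sizes []int
	remaining := total
	for batch := min(initial, remaining); batch > 0; batch = min(2*batch, remaining) {
		sizes = append(sizes, batch)
		remaining -= batch
	}
	return sizes
}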

Summary

In this post, we had an overview of the design principle of the ReplicaSet controller in k8s. In each sync loop, the controller claims pods (i.e., counts how many pods exist under the ReplicaSet), then decides whether to create more pods or to delete pods. There are some implementation tricks worth paying attention to, such as using expectations to avoid concurrent sync loops, and using slowStartBatch to make the pod creation process smooth.

This post is licensed under CC BY 4.0 by the author.
