From bee370ec6be6bd6192faf1a3a91350d99ba3838a Mon Sep 17 00:00:00 2001 From: Ben Weissmann Date: Fri, 30 Aug 2019 06:16:04 -0400 Subject: [PATCH] Throttle Kubernetes config refresh --- docs/configuration/backends/kubernetes.md | 12 +++++- provider/kubernetes/kubernetes.go | 52 ++++++++++++++++++++++- 2 files changed, 61 insertions(+), 3 deletions(-) diff --git a/docs/configuration/backends/kubernetes.md b/docs/configuration/backends/kubernetes.md index 5ee509c2e..acf384bed 100644 --- a/docs/configuration/backends/kubernetes.md +++ b/docs/configuration/backends/kubernetes.md @@ -73,6 +73,14 @@ See also [Kubernetes user guide](/user-guide/kubernetes). # # enablePassTLSCert = true +# Throttle how frequently we refresh our configuration from Ingresses when there +# are frequent changes. +# +# Optional +# Default: 0 (no throttling) +# +# throttleDuration = 10s + # Override default configuration template. # # Optional @@ -210,7 +218,7 @@ infos: serialnumber: true ``` -If `pem` is set, it will add a `X-Forwarded-Tls-Client-Cert` header that contains the escaped pem as value. +If `pem` is set, it will add a `X-Forwarded-Tls-Client-Cert` header that contains the escaped pem as value. If at least one flag of the `infos` part is set, it will add a `X-Forwarded-Tls-Client-Cert-Infos` header that contains an escaped string composed of the client certificate data selected by the infos flags. This infos part is composed like the following example (not escaped): ```Subject="C=FR,ST=SomeState,L=Lyon,O=Cheese,CN=*.cheese.org",NB=1531900816,NA=1563436816,SAN=*.cheese.org,*.cheese.net,cheese.in,test@cheese.org,test@cheese.net,10.0.1.0,10.0.1.2``` @@ -231,7 +239,7 @@ rateset: ``` <5> `traefik.ingress.kubernetes.io/rule-type` -Note: `ReplacePath` is deprecated in this annotation, use the `traefik.ingress.kubernetes.io/request-modifier` annotation instead. Default: `PathPrefix`. +Note: `ReplacePath` is deprecated in this annotation, use the `traefik.ingress.kubernetes.io/request-modifier` annotation instead. Default: `PathPrefix`. <6> `traefik.ingress.kubernetes.io/service-weights`: Service weights enable to split traffic across multiple backing services in a fine-grained manner. diff --git a/provider/kubernetes/kubernetes.go b/provider/kubernetes/kubernetes.go index be9b97ec2..2bbbca04c 100644 --- a/provider/kubernetes/kubernetes.go +++ b/provider/kubernetes/kubernetes.go @@ -16,6 +16,7 @@ import ( "time" "github.com/cenk/backoff" + "github.com/containous/flaeg" "github.com/containous/traefik/job" "github.com/containous/traefik/log" "github.com/containous/traefik/provider" @@ -68,6 +69,7 @@ type Provider struct { LabelSelector string `description:"Kubernetes Ingress label selector to use" export:"true"` IngressClass string `description:"Value of kubernetes.io/ingress.class annotation to watch for" export:"true"` IngressEndpoint *IngressEndpoint `description:"Kubernetes Ingress Endpoint"` + ThrottleDuration flaeg.Duration `description:"Ingress refresh throttle duration"` lastConfiguration safe.Safe } @@ -137,16 +139,26 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s return nil } } + + throttleDuration := time.Duration(p.ThrottleDuration) + eventsChanToRead := throttleEvents(throttleDuration, stop, eventsChan) + for { select { case <-stop: return nil - case event := <-eventsChan: + case event := <-eventsChanToRead: + // Note that event is the *first* event that came in during this + // throttling interval -- if we're hitting our throttle, we may have + // dropped events. This is fine, because we don't treat different + // event types differently. But if we do in the future, we'll need to + // track more information about the dropped events. log.Debugf("Received Kubernetes event kind %T", event) templateObjects, err := p.loadIngresses(k8sClient) if err != nil { return err } + if reflect.DeepEqual(p.lastConfiguration.Get(), templateObjects) { log.Debugf("Skipping Kubernetes event kind %T", event) } else { @@ -156,6 +168,11 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s Configuration: p.loadConfig(*templateObjects), } } + + // If we're throttling, we sleep here for the throttle duration to + // enforce that we don't refresh faster than our throttle. time.Sleep + // returns immediately if p.ThrottleDuration is 0 (no throttle). + time.Sleep(throttleDuration) } } } @@ -599,6 +616,39 @@ func (p *Provider) addGlobalBackend(cl Client, i *extensionsv1beta1.Ingress, tem return nil } +func throttleEvents(throttleDuration time.Duration, stop chan bool, eventsChan <-chan interface{}) chan interface{} { + if throttleDuration == 0 { + return nil + } + // Create a buffered channel to hold the pending event (if we're delaying processing the event due to throttling) + eventsChanBuffered := make(chan interface{}, 1) + + // Run a goroutine that reads events from eventChan and does a + // non-blocking write to pendingEvent. This guarantees that writing to + // eventChan will never block, and that pendingEvent will have + // something in it if there's been an event since we read from that channel. + go func() { + for { + select { + case <-stop: + return + case nextEvent := <-eventsChan: + select { + case eventsChanBuffered <- nextEvent: + default: + // We already have an event in eventsChanBuffered, so we'll + // do a refresh as soon as our throttle allows us to. It's fine + // to drop the event and keep whatever's in the buffer -- we + // don't do different things for different events + log.Debugf("Dropping event kind %T due to throttling", nextEvent) + } + } + } + }() + + return eventsChanBuffered +} + func getRuleForPath(pa extensionsv1beta1.HTTPIngressPath, i *extensionsv1beta1.Ingress) (string, error) { if len(pa.Path) == 0 { return "", nil