diff --git a/provider/kubernetes.go b/provider/kubernetes.go index 942094ebf..224c83d52 100644 --- a/provider/kubernetes.go +++ b/provider/kubernetes.go @@ -160,8 +160,13 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur } } service, exists, err := k8sClient.GetService(i.ObjectMeta.Namespace, pa.Backend.ServiceName) - if err != nil || !exists { - log.Warnf("Error retrieving service %s/%s: %v", i.ObjectMeta.Namespace, pa.Backend.ServiceName, err) + if err != nil { + log.Errorf("Error while retrieving service information from k8s API %s/%s: %v", service.ObjectMeta.Namespace, pa.Backend.ServiceName, err) + return nil, err + } + + if !exists { + log.Errorf("Service not found for %s/%s", service.ObjectMeta.Namespace, pa.Backend.ServiceName) delete(templateObjects.Frontends, r.Host+pa.Path) continue } @@ -184,13 +189,20 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur if port.Port == 443 { protocol = "https" } + endpoints, exists, err := k8sClient.GetEndpoints(service.ObjectMeta.Namespace, service.ObjectMeta.Name) - if err != nil || !exists { + if err != nil { log.Errorf("Error retrieving endpoints %s/%s: %v", service.ObjectMeta.Namespace, service.ObjectMeta.Name, err) + return nil, err + } + + if !exists { + log.Errorf("Endpoints not found for %s/%s", service.ObjectMeta.Namespace, service.ObjectMeta.Name) continue } + if len(endpoints.Subsets) == 0 { - log.Warnf("Endpoints not found for %s/%s, falling back to Service ClusterIP", service.ObjectMeta.Namespace, service.ObjectMeta.Name) + log.Warnf("Service endpoints not found for %s/%s, falling back to Service ClusterIP", service.ObjectMeta.Namespace, service.ObjectMeta.Name) templateObjects.Backends[r.Host+pa.Path].Servers[string(service.UID)] = types.Server{ URL: protocol + "://" + service.Spec.ClusterIP + ":" + strconv.Itoa(int(port.Port)), Weight: 1, diff --git a/provider/kubernetes_test.go b/provider/kubernetes_test.go index a3866f9cc..1f9057b6e 100644 --- a/provider/kubernetes_test.go +++ b/provider/kubernetes_test.go @@ -2,11 +2,13 @@ package provider import ( "encoding/json" + "errors" "reflect" "testing" "github.com/containous/traefik/provider/k8s" "github.com/containous/traefik/types" + "github.com/davecgh/go-spew/spew" "k8s.io/client-go/1.5/pkg/api/v1" "k8s.io/client-go/1.5/pkg/apis/extensions/v1beta1" "k8s.io/client-go/1.5/pkg/util/intstr" @@ -1524,11 +1526,286 @@ func TestServiceAnnotations(t *testing.T) { } } +func TestKubeAPIErrors(t *testing.T) { + ingresses := []*v1beta1.Ingress{{ + ObjectMeta: v1.ObjectMeta{ + Namespace: "testing", + }, + Spec: v1beta1.IngressSpec{ + Rules: []v1beta1.IngressRule{ + { + Host: "foo", + IngressRuleValue: v1beta1.IngressRuleValue{ + HTTP: &v1beta1.HTTPIngressRuleValue{ + Paths: []v1beta1.HTTPIngressPath{ + { + Path: "/bar", + Backend: v1beta1.IngressBackend{ + ServiceName: "service1", + ServicePort: intstr.FromInt(80), + }, + }, + }, + }, + }, + }, + }, + }, + }} + + services := []*v1.Service{{ + ObjectMeta: v1.ObjectMeta{ + Name: "service1", + UID: "1", + Namespace: "testing", + }, + Spec: v1.ServiceSpec{ + ClusterIP: "10.0.0.1", + Ports: []v1.ServicePort{ + { + Port: 80, + }, + }, + }, + }} + + endpoints := []*v1.Endpoints{} + watchChan := make(chan interface{}) + apiErr := errors.New("failed kube api call") + + testCases := []struct { + desc string + apiServiceErr error + apiEndpointsErr error + }{ + { + desc: "failed service call", + apiServiceErr: apiErr, + }, + { + desc: "failed endpoints call", + apiEndpointsErr: apiErr, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + client := clientMock{ + ingresses: ingresses, + services: services, + endpoints: endpoints, + watchChan: watchChan, + apiServiceError: tc.apiServiceErr, + apiEndpointsError: tc.apiEndpointsErr, + } + + provider := Kubernetes{} + if _, err := provider.loadIngresses(client); err != apiErr { + t.Errorf("Got error %v, wanted error %v", err, apiErr) + } + }) + } +} + +func TestMissingResources(t *testing.T) { + ingresses := []*v1beta1.Ingress{{ + ObjectMeta: v1.ObjectMeta{ + Namespace: "testing", + }, + Spec: v1beta1.IngressSpec{ + Rules: []v1beta1.IngressRule{ + { + Host: "fully_working", + IngressRuleValue: v1beta1.IngressRuleValue{ + HTTP: &v1beta1.HTTPIngressRuleValue{ + Paths: []v1beta1.HTTPIngressPath{ + { + Backend: v1beta1.IngressBackend{ + ServiceName: "fully_working_service", + ServicePort: intstr.FromInt(80), + }, + }, + }, + }, + }, + }, + { + Host: "missing_service", + IngressRuleValue: v1beta1.IngressRuleValue{ + HTTP: &v1beta1.HTTPIngressRuleValue{ + Paths: []v1beta1.HTTPIngressPath{ + { + Backend: v1beta1.IngressBackend{ + ServiceName: "missing_service_service", + ServicePort: intstr.FromInt(80), + }, + }, + }, + }, + }, + }, + { + Host: "missing_endpoints", + IngressRuleValue: v1beta1.IngressRuleValue{ + HTTP: &v1beta1.HTTPIngressRuleValue{ + Paths: []v1beta1.HTTPIngressPath{ + { + Backend: v1beta1.IngressBackend{ + ServiceName: "missing_endpoints_service", + ServicePort: intstr.FromInt(80), + }, + }, + }, + }, + }, + }, + }, + }, + }} + services := []*v1.Service{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "fully_working_service", + UID: "1", + Namespace: "testing", + }, + Spec: v1.ServiceSpec{ + ClusterIP: "10.0.0.1", + Ports: []v1.ServicePort{ + { + Port: 80, + }, + }, + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "missing_endpoints_service", + UID: "3", + Namespace: "testing", + }, + Spec: v1.ServiceSpec{ + ClusterIP: "10.0.0.3", + Ports: []v1.ServicePort{ + { + Port: 80, + }, + }, + }, + }, + } + endpoints := []*v1.Endpoints{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "fully_working_service", + UID: "1", + Namespace: "testing", + }, + Subsets: []v1.EndpointSubset{ + { + Addresses: []v1.EndpointAddress{ + { + IP: "10.10.0.1", + }, + }, + Ports: []v1.EndpointPort{ + { + Port: 8080, + }, + }, + }, + }, + }, + } + + watchChan := make(chan interface{}) + client := clientMock{ + ingresses: ingresses, + services: services, + endpoints: endpoints, + watchChan: watchChan, + + // TODO: Update all tests to cope with "properExists == true" correctly and remove flag. + // See https://github.com/containous/traefik/issues/1307 + properExists: true, + } + provider := Kubernetes{} + actual, err := provider.loadIngresses(client) + if err != nil { + t.Fatalf("error %+v", err) + } + + expected := &types.Configuration{ + Backends: map[string]*types.Backend{ + "fully_working": { + Servers: map[string]types.Server{ + "http://10.10.0.1:8080": { + URL: "http://10.10.0.1:8080", + Weight: 1, + }, + }, + CircuitBreaker: nil, + LoadBalancer: &types.LoadBalancer{ + Method: "wrr", + Sticky: false, + }, + }, + "missing_service": { + Servers: map[string]types.Server{}, + LoadBalancer: &types.LoadBalancer{ + Method: "wrr", + Sticky: false, + }, + }, + "missing_endpoints": { + Servers: map[string]types.Server{}, + CircuitBreaker: nil, + LoadBalancer: &types.LoadBalancer{ + Method: "wrr", + Sticky: false, + }, + }, + }, + Frontends: map[string]*types.Frontend{ + "fully_working": { + Backend: "fully_working", + PassHostHeader: true, + Routes: map[string]types.Route{ + "fully_working": { + Rule: "Host:fully_working", + }, + }, + }, + "missing_endpoints": { + Backend: "missing_endpoints", + PassHostHeader: true, + Routes: map[string]types.Route{ + "missing_endpoints": { + Rule: "Host:missing_endpoints", + }, + }, + }, + }, + } + + if !reflect.DeepEqual(actual, expected) { + t.Fatalf("expected\n%v\ngot\n\n%v", spew.Sdump(expected), spew.Sdump(actual)) + } +} + type clientMock struct { ingresses []*v1beta1.Ingress services []*v1.Service endpoints []*v1.Endpoints watchChan chan interface{} + + apiServiceError error + apiEndpointsError error + + properExists bool } func (c clientMock) GetIngresses(namespaces k8s.Namespaces) []*v1beta1.Ingress { @@ -1543,20 +1820,33 @@ func (c clientMock) GetIngresses(namespaces k8s.Namespaces) []*v1beta1.Ingress { } func (c clientMock) GetService(namespace, name string) (*v1.Service, bool, error) { + if c.apiServiceError != nil { + return &v1.Service{}, false, c.apiServiceError + } + for _, service := range c.services { if service.Namespace == namespace && service.Name == name { return service, true, nil } } - return &v1.Service{}, true, nil + return &v1.Service{}, false, nil } func (c clientMock) GetEndpoints(namespace, name string) (*v1.Endpoints, bool, error) { + if c.apiEndpointsError != nil { + return &v1.Endpoints{}, false, c.apiEndpointsError + } + for _, endpoints := range c.endpoints { if endpoints.Namespace == namespace && endpoints.Name == name { return endpoints, true, nil } } + + if c.properExists { + return &v1.Endpoints{}, false, nil + } + return &v1.Endpoints{}, true, nil }