Compare commits

...

7 Commits

Author SHA1 Message Date
Julien Salleyron
1db9482a8e Prepare release v1.3.4 2017-07-27 17:24:19 +02:00
Julien Salleyron
888e6dcbc8 Oxy with gorilla for websocket(+integration tests) 2017-07-27 15:43:12 +02:00
dedalusj
a09a8b1235 Fix replace path rule
* Fix replace path rule
* test: add RequestURI tests.
2017-07-19 10:27:52 +02:00
Fernandez Ludovic
36ee69609e fix: double compression. 2017-07-18 11:27:24 +02:00
Fernandez Ludovic
98b52d1f54 Prepare release v1.3.3 2017-07-06 17:53:35 +02:00
Timo Reimann
4892b2b0da [kubernetes] Undo the Secrets controller sync wait.
When Secrets permissions have not been granted (which is likely to be
the case for users not needing the basic auth feature), the watch on the
Secrets API will never yield a response, thereby causing the controller
to never sync successfully, and in turn causing the check for all
controller synchronizations to fail consistently. Thus, no event will
ever be handled.
2017-07-06 17:12:25 +02:00
Timo Reimann
91ce78da46 [k8s] Tell glog to log everything into STDERR.
Logging errors into a file inside a minimalistic container might not be
possible, and glog bails out with an exit code > 0 if it fails.
2017-07-04 17:11:50 +02:00
16 changed files with 418 additions and 158 deletions

View File

@@ -1,5 +1,20 @@
# Change Log
## [v1.3.4](https://github.com/containous/traefik/tree/v1.3.4) (2017-07-27)
[All Commits](https://github.com/containous/traefik/compare/v1.3.3...v1.3.4)
**Bug fixes:**
- **[middleware]** Double compression. ([#1863](https://github.com/containous/traefik/pull/1863) by [ldez](https://github.com/ldez))
- **[middleware]** Fix replace path rule ([#1859](https://github.com/containous/traefik/pull/1859) by [dedalusj](https://github.com/dedalusj))
- **[websocket]** New oxy with gorilla for websocket with integration tests ([#1896](https://github.com/containous/traefik/pull/1896) by [Juliens](https://github.com/Juliens))
## [v1.3.3](https://github.com/containous/traefik/tree/v1.3.3) (2017-07-06)
[All Commits](https://github.com/containous/traefik/compare/v1.3.2...v1.3.3)
**Bug fixes:**
- **[k8s]** Undo the Secrets controller sync wait. ([#1828](https://github.com/containous/traefik/pull/1828) by [timoreimann](https://github.com/timoreimann))
- **[k8s]** Tell glog to log everything into STDERR. ([#1817](https://github.com/containous/traefik/pull/1817) by [timoreimann](https://github.com/timoreimann))
## [v1.3.2](https://github.com/containous/traefik/tree/v1.3.2) (2017-06-29)
[All Commits](https://github.com/containous/traefik/compare/v1.3.1...v1.3.2)

8
glide.lock generated
View File

@@ -1,4 +1,4 @@
hash: cebc972cf87c4b0a8f86801f38750c51b09c8dee3bf62bb48f8eaa6ab7946352
hash: bfc5801ed56be5f703a0924d8832dcccc42bf02f9e2b035ef77eab62c0cb4884
updated: 2017-06-29T16:47:14.848940186+02:00
imports:
- name: cloud.google.com/go
@@ -320,7 +320,9 @@ imports:
- name: github.com/mvdan/xurls
version: db96455566f05ffe42bd6ac671f05eeb1152b45d
- name: github.com/NYTimes/gziphandler
version: 22d4470af89e09998fc16b35029df973932df4ae
version: 316adfc72ed3b0157975917adf62ba2dc31842ce
repo: https://github.com/containous/gziphandler.git
vcs: git
- name: github.com/ogier/pflag
version: 45c278ab3607870051a2ea9040bb85fcb8557481
- name: github.com/opencontainers/runc
@@ -409,7 +411,7 @@ imports:
- name: github.com/vdemeester/docker-events
version: be74d4929ec1ad118df54349fda4b0cba60f849b
- name: github.com/vulcand/oxy
version: 7da864c1d53bd58165435bb78bbf8c01f01c8f4a
version: 49f1894c20d972f5c73ff44b859f87deb83f0076
repo: https://github.com/containous/oxy.git
vcs: git
subpackages:

View File

@@ -8,7 +8,7 @@ import:
- package: github.com/cenk/backoff
- package: github.com/containous/flaeg
- package: github.com/vulcand/oxy
version: 7da864c1d53bd58165435bb78bbf8c01f01c8f4a
version: 49f1894c20d972f5c73ff44b859f87deb83f0076
repo: https://github.com/containous/oxy.git
vcs: git
subpackages:
@@ -87,6 +87,8 @@ import:
vcs: git
- package: github.com/abbot/go-http-auth
- package: github.com/NYTimes/gziphandler
repo: https://github.com/containous/gziphandler.git
vcs: git
- package: github.com/docker/leadership
- package: github.com/satori/go.uuid
version: ^1.1.0

View File

@@ -0,0 +1,24 @@
defaultEntryPoints = ["http"]
logLevel = "DEBUG"
[entryPoints]
[entryPoints.http]
address = ":8000"
[web]
address = ":8080"
[file]
[backends]
[backends.backend1]
[backends.backend1.servers.server1]
url = "{{ .WebsocketServer }}"
[frontends]
[frontends.frontend1]
backend = "backend1"
[frontends.frontend1.routes.test_1]
rule = "Path:/ws"

View File

@@ -14,6 +14,8 @@ import (
"github.com/containous/traefik/integration/utils"
"github.com/go-check/check"
"bytes"
compose "github.com/libkermit/compose/check"
checker "github.com/vdemeester/shakers"
)
@@ -38,6 +40,7 @@ func init() {
check.Suite(&EurekaSuite{})
check.Suite(&AcmeSuite{})
check.Suite(&DynamoDBSuite{})
check.Suite(&WebsocketSuite{})
}
var traefikBinary = "../dist/traefik"
@@ -71,6 +74,18 @@ func (s *BaseSuite) createComposeProject(c *check.C, name string) {
s.composeProject = compose.CreateProject(c, projectName, composeFile)
}
func withConfigFile(file string) string {
return "--configFile=" + file
}
func (s *BaseSuite) cmdTraefik(args ...string) (*exec.Cmd, *bytes.Buffer) {
cmd := exec.Command(traefikBinary, args...)
var out bytes.Buffer
cmd.Stdout = &out
cmd.Stderr = &out
return cmd, &out
}
func (s *BaseSuite) traefikCmd(c *check.C, args ...string) (*exec.Cmd, string) {
cmd, out, err := utils.RunCommand(traefikBinary, args...)
c.Assert(err, checker.IsNil, check.Commentf("Fail to run %s with %v", traefikBinary, args))

View File

@@ -0,0 +1,81 @@
package main
import (
"net/http"
"net/http/httptest"
"time"
"github.com/go-check/check"
"errors"
"io/ioutil"
"os"
"strings"
"github.com/containous/traefik/integration/utils"
"github.com/gorilla/websocket"
checker "github.com/vdemeester/shakers"
)
// WebsocketSuite
type WebsocketSuite struct{ BaseSuite }
func (suite *WebsocketSuite) TestBase(c *check.C) {
var upgrader = websocket.Upgrader{} // use default options
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer c.Close()
for {
mt, message, err := c.ReadMessage()
if err != nil {
break
}
err = c.WriteMessage(mt, message)
if err != nil {
break
}
}
}))
file := suite.adaptFile(c, "fixtures/websocket/config.toml", struct {
WebsocketServer string
}{
WebsocketServer: srv.URL,
})
defer os.Remove(file)
cmd, _ := suite.cmdTraefik(withConfigFile(file), "--debug")
err := cmd.Start()
c.Assert(err, check.IsNil)
defer cmd.Process.Kill()
// wait for traefik
err = utils.TryRequest("http://127.0.0.1:8080/api/providers", 60*time.Second, func(res *http.Response) error {
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return err
}
if !strings.Contains(string(body), "127.0.0.1") {
return errors.New("Incorrect traefik config")
}
return nil
})
c.Assert(err, checker.IsNil)
conn, _, err := websocket.DefaultDialer.Dial("ws://127.0.0.1:8000/ws", nil)
c.Assert(err, checker.IsNil)
conn.WriteMessage(websocket.TextMessage, []byte("OK"))
_, msg, err := conn.ReadMessage()
c.Assert(err, checker.IsNil)
c.Assert(string(msg), checker.Equals, "OK")
}

View File

@@ -1,13 +1,11 @@
package middlewares
import (
"compress/gzip"
"net/http"
"github.com/NYTimes/gziphandler"
)
const (
contentEncodingHeader = "Content-Encoding"
"github.com/containous/traefik/log"
)
// Compress is a middleware that allows redirection
@@ -15,17 +13,13 @@ type Compress struct{}
// ServerHTTP is a function used by Negroni
func (c *Compress) ServeHTTP(rw http.ResponseWriter, r *http.Request, next http.HandlerFunc) {
if isEncoded(r.Header) {
next.ServeHTTP(rw, r)
} else {
newGzipHandler := gziphandler.GzipHandler(next)
newGzipHandler.ServeHTTP(rw, r)
}
gzipHandler(next).ServeHTTP(rw, r)
}
func isEncoded(headers http.Header) bool {
header := headers.Get(contentEncodingHeader)
// According to https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding,
// content is not encoded if the header 'Content-Encoding' is empty or equals to 'identity'.
return header != "" && header != "identity"
func gzipHandler(h http.Handler) http.Handler {
wrapper, err := gziphandler.NewGzipHandler(gzip.DefaultCompression, gziphandler.DefaultMinSize, &gziphandler.GzipResponseWriterWrapper{})
if err != nil {
log.Error(err)
}
return wrapper(h)
}

View File

@@ -1,36 +1,39 @@
package middlewares
import (
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"github.com/NYTimes/gziphandler"
"github.com/codegangsta/negroni"
"github.com/containous/traefik/testhelpers"
"github.com/stretchr/testify/assert"
)
const (
acceptEncodingHeader = "Accept-Encoding"
varyHeader = "Vary"
gzip = "gzip"
acceptEncodingHeader = "Accept-Encoding"
contentEncodingHeader = "Content-Encoding"
varyHeader = "Vary"
gzipValue = "gzip"
)
func TestShouldCompressWhenNoContentEncodingHeader(t *testing.T) {
handler := &Compress{}
req := testhelpers.MustNewRequest(http.MethodGet, "http://localhost", nil)
req.Header.Add(acceptEncodingHeader, gzip)
req.Header.Add(acceptEncodingHeader, gzipValue)
baseBody := generateBytes(gziphandler.DefaultMinSize)
next := func(rw http.ResponseWriter, r *http.Request) {
rw.Write(baseBody)
}
rw := httptest.NewRecorder()
rw := httptest.NewRecorder()
handler.ServeHTTP(rw, req, next)
assert.Equal(t, gzip, rw.Header().Get(contentEncodingHeader))
assert.Equal(t, gzipValue, rw.Header().Get(contentEncodingHeader))
assert.Equal(t, acceptEncodingHeader, rw.Header().Get(varyHeader))
if assert.ObjectsAreEqualValues(rw.Body.Bytes(), baseBody) {
@@ -42,28 +45,105 @@ func TestShouldNotCompressWhenContentEncodingHeader(t *testing.T) {
handler := &Compress{}
req := testhelpers.MustNewRequest(http.MethodGet, "http://localhost", nil)
req.Header.Add(acceptEncodingHeader, gzip)
req.Header.Add(contentEncodingHeader, gzip)
baseBody := generateBytes(gziphandler.DefaultMinSize)
req.Header.Add(acceptEncodingHeader, gzipValue)
fakeCompressedBody := generateBytes(gziphandler.DefaultMinSize)
next := func(rw http.ResponseWriter, r *http.Request) {
rw.Write(baseBody)
rw.Header().Add(contentEncodingHeader, gzipValue)
rw.Header().Add(varyHeader, acceptEncodingHeader)
rw.Write(fakeCompressedBody)
}
rw := httptest.NewRecorder()
handler.ServeHTTP(rw, req, next)
assert.Equal(t, "", rw.Header().Get(contentEncodingHeader))
assert.Equal(t, "", rw.Header().Get(varyHeader))
assert.Equal(t, gzipValue, rw.Header().Get(contentEncodingHeader))
assert.Equal(t, acceptEncodingHeader, rw.Header().Get(varyHeader))
assert.EqualValues(t, rw.Body.Bytes(), baseBody)
assert.EqualValues(t, rw.Body.Bytes(), fakeCompressedBody)
}
func TestShouldNotCompressWhenNoAcceptEncodingHeader(t *testing.T) {
handler := &Compress{}
req := testhelpers.MustNewRequest(http.MethodGet, "http://localhost", nil)
fakeBody := generateBytes(gziphandler.DefaultMinSize)
next := func(rw http.ResponseWriter, r *http.Request) {
rw.Write(fakeBody)
}
rw := httptest.NewRecorder()
handler.ServeHTTP(rw, req, next)
assert.Empty(t, rw.Header().Get(contentEncodingHeader))
assert.EqualValues(t, rw.Body.Bytes(), fakeBody)
}
func TestIntegrationShouldNotCompressWhenContentAlreadyCompressed(t *testing.T) {
fakeCompressedBody := generateBytes(100000)
handler := func(rw http.ResponseWriter, r *http.Request) {
rw.Header().Add(contentEncodingHeader, gzipValue)
rw.Header().Add(varyHeader, acceptEncodingHeader)
rw.Write(fakeCompressedBody)
}
comp := &Compress{}
negro := negroni.New(comp)
negro.UseHandlerFunc(handler)
ts := httptest.NewServer(negro)
defer ts.Close()
client := &http.Client{}
req := testhelpers.MustNewRequest(http.MethodGet, ts.URL, nil)
req.Header.Add(acceptEncodingHeader, gzipValue)
resp, err := client.Do(req)
assert.NoError(t, err, "there should be no error")
assert.Equal(t, gzipValue, resp.Header.Get(contentEncodingHeader))
assert.Equal(t, acceptEncodingHeader, resp.Header.Get(varyHeader))
body, err := ioutil.ReadAll(resp.Body)
assert.EqualValues(t, fakeCompressedBody, body)
}
func TestIntegrationShouldCompressWhenAcceptEncodingHeaderIsPresent(t *testing.T) {
fakeBody := generateBytes(100000)
handler := func(rw http.ResponseWriter, r *http.Request) {
rw.Write(fakeBody)
}
comp := &Compress{}
negro := negroni.New(comp)
negro.UseHandlerFunc(handler)
ts := httptest.NewServer(negro)
defer ts.Close()
client := &http.Client{}
req := testhelpers.MustNewRequest(http.MethodGet, ts.URL, nil)
req.Header.Add(acceptEncodingHeader, gzipValue)
resp, err := client.Do(req)
assert.NoError(t, err, "there should be no error")
assert.Equal(t, gzipValue, resp.Header.Get(contentEncodingHeader))
assert.Equal(t, acceptEncodingHeader, resp.Header.Get(varyHeader))
body, err := ioutil.ReadAll(resp.Body)
if assert.ObjectsAreEqualValues(body, fakeBody) {
assert.Fail(t, "expected a compressed body", "got %v", body)
}
}
func generateBytes(len int) []byte {
var value []byte
for i := 0; i < len; i++ {
value = append(value, 0x61)
value = append(value, 0x61+byte(i))
}
return value
}

View File

@@ -16,5 +16,6 @@ const ReplacedPathHeader = "X-Replaced-Path"
func (s *ReplacePath) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Header.Add(ReplacedPathHeader, r.URL.Path)
r.URL.Path = s.Path
r.RequestURI = r.URL.RequestURI()
s.Handler.ServeHTTP(w, r)
}

View File

@@ -1,10 +1,11 @@
package middlewares_test
package middlewares
import (
"net/http"
"testing"
"github.com/containous/traefik/middlewares"
"github.com/containous/traefik/testhelpers"
"github.com/stretchr/testify/assert"
)
func TestReplacePath(t *testing.T) {
@@ -17,28 +18,24 @@ func TestReplacePath(t *testing.T) {
for _, path := range paths {
t.Run(path, func(t *testing.T) {
var newPath, oldPath string
handler := &middlewares.ReplacePath{
var expectedPath, actualHeader, requestURI string
handler := &ReplacePath{
Path: replacementPath,
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
newPath = r.URL.Path
oldPath = r.Header.Get("X-Replaced-Path")
expectedPath = r.URL.Path
actualHeader = r.Header.Get(ReplacedPathHeader)
requestURI = r.RequestURI
}),
}
req, err := http.NewRequest("GET", "http://localhost"+path, nil)
if err != nil {
t.Error(err)
}
req := testhelpers.MustNewRequest(http.MethodGet, "http://localhost"+path, nil)
handler.ServeHTTP(nil, req)
if newPath != replacementPath {
t.Fatalf("new path should be '%s'", replacementPath)
}
if oldPath != path {
t.Fatalf("old path should be '%s'", path)
}
assert.Equal(t, expectedPath, replacementPath, "Unexpected path.")
assert.Equal(t, path, actualHeader, "Unexpected '%s' header.", ReplacedPathHeader)
assert.Equal(t, expectedPath, requestURI, "Unexpected request URI.")
})
}
}

View File

@@ -263,7 +263,7 @@ func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan struct{}) (<-c
// fireEvent checks if all controllers have synced before firing
// Used after startup or a reconnect
func (c *clientImpl) fireEvent(event interface{}, eventCh chan interface{}) {
if !c.ingController.HasSynced() || !c.svcController.HasSynced() || !c.epController.HasSynced() || !c.secController.HasSynced() {
if !c.ingController.HasSynced() || !c.svcController.HasSynced() || !c.epController.HasSynced() {
return
}
eventHandlerFunc(eventCh, event)

View File

@@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"errors"
"flag"
"fmt"
"os"
"reflect"
@@ -62,6 +63,12 @@ func (p *Provider) newK8sClient() (Client, error) {
// Provide allows the k8s provider to provide configurations to traefik
// using the given configuration channel.
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
// Tell glog (used by client-go) to log into STDERR. Otherwise, we risk
// certain kinds of API errors getting logged into a directory not
// available in a `FROM scratch` Docker container, causing glog to abort
// hard with an exit code > 0.
flag.Set("logtostderr", "true")
k8sClient, err := p.newK8sClient()
if err != nil {
return err

View File

@@ -3,6 +3,7 @@ package gziphandler
import (
"bufio"
"compress/gzip"
"errors"
"fmt"
"io"
"net"
@@ -97,6 +98,7 @@ func (w *GzipResponseWriter) Write(b []byte) (int, error) {
}
// Save the write into a buffer for later use in GZIP responseWriter (if content is long enough) or at close with regular responseWriter.
// On the first write, w.buf changes from nil to a valid slice
w.buf = append(w.buf, b...)
// If the global writes are bigger than the minSize, compression is enable.
@@ -122,7 +124,9 @@ func (w *GzipResponseWriter) startGzip() error {
w.Header().Del(contentLength)
// Write the header to gzip response.
w.writeHeader()
if w.code != 0 {
w.ResponseWriter.WriteHeader(w.code)
}
// Initialize the GZIP response.
w.init()
@@ -146,14 +150,6 @@ func (w *GzipResponseWriter) WriteHeader(code int) {
w.code = code
}
// writeHeader uses the saved code to send it to the ResponseWriter.
func (w *GzipResponseWriter) writeHeader() {
if w.code == 0 {
w.code = http.StatusOK
}
w.ResponseWriter.WriteHeader(w.code)
}
// init graps a new gzip writer from the gzipWriterPool and writes the correct
// content encoding header.
func (w *GzipResponseWriter) init() {
@@ -166,19 +162,18 @@ func (w *GzipResponseWriter) init() {
// Close will close the gzip.Writer and will put it back in the gzipWriterPool.
func (w *GzipResponseWriter) Close() error {
// Buffer not nil means the regular response must be returned.
if w.buf != nil {
w.writeHeader()
// Make the write into the regular response.
_, writeErr := w.ResponseWriter.Write(w.buf)
// Returns the error if any at write.
if writeErr != nil {
return fmt.Errorf("gziphandler: write to regular responseWriter at close gets error: %q", writeErr.Error())
}
}
// If the GZIP responseWriter is not set no needs to close it.
if w.gw == nil {
// Gzip not trigged yet, write out regular response.
if w.code != 0 {
w.ResponseWriter.WriteHeader(w.code)
}
if w.buf != nil {
_, writeErr := w.ResponseWriter.Write(w.buf)
// Returns the error if any at write.
if writeErr != nil {
return fmt.Errorf("gziphandler: write to regular responseWriter at close gets error: %q", writeErr.Error())
}
}
return nil
}
@@ -236,12 +231,22 @@ func NewGzipLevelHandler(level int) (func(http.Handler) http.Handler, error) {
// NewGzipLevelAndMinSize behave as NewGzipLevelHandler except it let the caller
// specify the minimum size before compression.
func NewGzipLevelAndMinSize(level, minSize int) (func(http.Handler) http.Handler, error) {
return NewGzipHandler(level, minSize, &GzipResponseWriter{})
}
// NewGzipHandler behave as NewGzipLevelHandler except it let the caller
// specify the minimum size before compression and a GzipWriter.
func NewGzipHandler(level, minSize int, gw GzipWriter) (func(http.Handler) http.Handler, error) {
if level != gzip.DefaultCompression && (level < gzip.BestSpeed || level > gzip.BestCompression) {
return nil, fmt.Errorf("invalid compression level requested: %d", level)
}
if minSize < 0 {
return nil, fmt.Errorf("minimum size must be more than zero")
return nil, errors.New("minimum size must be more than zero")
}
if gw == nil {
return nil, errors.New("the GzipWriter must be defined")
}
return func(h http.Handler) http.Handler {
index := poolIndex(level)
@@ -249,13 +254,9 @@ func NewGzipLevelAndMinSize(level, minSize int) (func(http.Handler) http.Handler
w.Header().Add(vary, acceptEncoding)
if acceptsGzip(r) {
gw := &GzipResponseWriter{
ResponseWriter: w,
index: index,
minSize: minSize,
buf: []byte{},
}
gw.SetResponseWriter(w)
gw.setIndex(index)
gw.setMinSize(minSize)
defer gw.Close()
h.ServeHTTP(gw, r)

58
vendor/github.com/NYTimes/gziphandler/wrapper.go generated vendored Normal file
View File

@@ -0,0 +1,58 @@
package gziphandler
import (
"bufio"
"net"
"net/http"
)
const (
contentEncodingHeader = "Content-Encoding"
)
// ----------
// http.ResponseWriter
// http.Hijacker
type GzipWriter interface {
Header() http.Header
Write([]byte) (int, error)
WriteHeader(int)
Hijack() (net.Conn, *bufio.ReadWriter, error)
Close() error
SetResponseWriter(http.ResponseWriter)
setIndex(int)
setMinSize(int)
}
func (w *GzipResponseWriter) SetResponseWriter(rw http.ResponseWriter) {
w.ResponseWriter = rw
}
func (w *GzipResponseWriter) setIndex(index int) {
w.index = index
}
func (w *GzipResponseWriter) setMinSize(minSize int) {
w.minSize = minSize
}
// --------
type GzipResponseWriterWrapper struct {
GzipResponseWriter
}
func (g *GzipResponseWriterWrapper) Write(b []byte) (int, error) {
if g.gw == nil && isEncoded(g.Header()) {
return g.ResponseWriter.Write(b)
}
return g.GzipResponseWriter.Write(b)
}
func isEncoded(headers http.Header) bool {
header := headers.Get(contentEncodingHeader)
// According to https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding,
// content is not encoded if the header 'Content-Encoding' is empty or equals to 'identity'.
return header != "" && header != "identity"
}

View File

@@ -4,18 +4,16 @@
package forward
import (
"bufio"
"crypto/tls"
"io"
"net"
"net/http"
"net/url"
"os"
"reflect"
"strconv"
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/vulcand/oxy/utils"
)
@@ -255,77 +253,47 @@ func (f *httpForwarder) copyRequest(req *http.Request, u *url.URL) *http.Request
// serveHTTP forwards websocket traffic
func (f *websocketForwarder) serveHTTP(w http.ResponseWriter, req *http.Request, ctx *handlerContext) {
outReq := f.copyRequest(req, req.URL)
host := outReq.URL.Host
dial := net.Dial
// if host does not specify a port, use the default http port
if !strings.Contains(host, ":") {
if outReq.URL.Scheme == "wss" {
host = host + ":443"
} else {
host = host + ":80"
}
dialer := websocket.DefaultDialer
if outReq.URL.Scheme == "wss" && f.TLSClientConfig != nil {
dialer.TLSClientConfig = f.TLSClientConfig
}
if outReq.URL.Scheme == "wss" {
if f.TLSClientConfig == nil {
f.TLSClientConfig = http.DefaultTransport.(*http.Transport).TLSClientConfig
}
dial = func(network, address string) (net.Conn, error) {
return tls.Dial("tcp", host, f.TLSClientConfig)
}
}
targetConn, err := dial("tcp", host)
targetConn, resp, err := dialer.Dial(outReq.URL.String(), outReq.Header)
if err != nil {
ctx.log.Errorf("Error dialing `%v`: %v", host, err)
ctx.log.Errorf("Error dialing `%v`: %v", outReq.Host, err)
ctx.errHandler.ServeHTTP(w, req, err)
return
}
hijacker, ok := w.(http.Hijacker)
if !ok {
ctx.log.Errorf("Unable to hijack the connection: %v", reflect.TypeOf(w))
ctx.errHandler.ServeHTTP(w, req, nil)
return
}
underlyingConn, _, err := hijacker.Hijack()
upgrader := websocket.Upgrader{}
utils.RemoveHeaders(resp.Header, WebsocketUpgradeHeaders...)
underlyingConn, err := upgrader.Upgrade(w, req, resp.Header)
if err != nil {
ctx.log.Errorf("Unable to hijack the connection: %v %v", reflect.TypeOf(w), err)
ctx.errHandler.ServeHTTP(w, req, err)
ctx.log.Errorf("Error while upgrading connection : %v", err)
return
}
// it is now caller's responsibility to Close the underlying connection
defer underlyingConn.Close()
defer targetConn.Close()
ctx.log.Infof("Writing outgoing Websocket request to target connection: %+v", outReq)
// write the modified incoming request to the dialed connection
if err = outReq.Write(targetConn); err != nil {
ctx.log.Errorf("Unable to copy request to target: %v", err)
ctx.errHandler.ServeHTTP(w, req, err)
return
errc := make(chan error, 2)
replicate := func(dst io.Writer, src io.Reader) {
_, err := io.Copy(dst, src)
errc <- err
}
br := bufio.NewReader(targetConn)
resp, err := http.ReadResponse(br, req)
resp.Write(underlyingConn)
defer resp.Body.Close()
go replicate(targetConn.UnderlyingConn(), underlyingConn.UnderlyingConn())
// We connect the conn only if the switching protocol has not failed
if resp.StatusCode == http.StatusSwitchingProtocols {
ctx.log.Infof("Switching protocol success")
errc := make(chan error, 2)
replicate := func(dst io.Writer, src io.Reader) {
_, err := io.Copy(dst, src)
errc <- err
}
go replicate(targetConn, underlyingConn)
go replicate(underlyingConn, targetConn)
<-errc
// Try to read the first message
t, msg, err := targetConn.ReadMessage()
if err != nil {
ctx.log.Errorf("Couldn't read first message : %v", err)
} else {
ctx.log.Infof("Switching protocol failed")
underlyingConn.WriteMessage(t, msg)
}
go replicate(underlyingConn.UnderlyingConn(), targetConn.UnderlyingConn())
<-errc
}
// copyRequest makes a copy of the specified request.
@@ -335,6 +303,7 @@ func (f *websocketForwarder) copyRequest(req *http.Request, u *url.URL) (outReq
outReq.URL = utils.CopyURL(req.URL)
outReq.URL.Scheme = u.Scheme
outReq.URL.Path = outReq.RequestURI
//sometimes backends might be registered as HTTP/HTTPS servers so translate URLs to websocket URLs.
switch u.Scheme {
@@ -345,19 +314,12 @@ func (f *websocketForwarder) copyRequest(req *http.Request, u *url.URL) (outReq
}
outReq.URL.Host = u.Host
outReq.URL.Opaque = req.RequestURI
// raw query is already included in RequestURI, so ignore it to avoid dupes
outReq.URL.RawQuery = ""
outReq.Proto = "HTTP/1.1"
outReq.ProtoMajor = 1
outReq.ProtoMinor = 1
// Overwrite close flag so we can keep persistent connection for the backend servers
outReq.Close = false
outReq.Header = make(http.Header)
utils.CopyHeaders(outReq.Header, req.Header)
utils.RemoveHeaders(outReq.Header, WebsocketDialHeaders...)
if f.rewriter != nil {
f.rewriter.Rewrite(outReq)

View File

@@ -1,20 +1,25 @@
package forward
const (
XForwardedProto = "X-Forwarded-Proto"
XForwardedFor = "X-Forwarded-For"
XForwardedHost = "X-Forwarded-Host"
XForwardedServer = "X-Forwarded-Server"
Connection = "Connection"
KeepAlive = "Keep-Alive"
ProxyAuthenticate = "Proxy-Authenticate"
ProxyAuthorization = "Proxy-Authorization"
Te = "Te" // canonicalized version of "TE"
Trailers = "Trailers"
TransferEncoding = "Transfer-Encoding"
Upgrade = "Upgrade"
ContentLength = "Content-Length"
ContentType = "Content-Type"
XForwardedProto = "X-Forwarded-Proto"
XForwardedFor = "X-Forwarded-For"
XForwardedHost = "X-Forwarded-Host"
XForwardedServer = "X-Forwarded-Server"
Connection = "Connection"
KeepAlive = "Keep-Alive"
ProxyAuthenticate = "Proxy-Authenticate"
ProxyAuthorization = "Proxy-Authorization"
Te = "Te" // canonicalized version of "TE"
Trailers = "Trailers"
TransferEncoding = "Transfer-Encoding"
Upgrade = "Upgrade"
ContentLength = "Content-Length"
ContentType = "Content-Type"
SecWebsocketKey = "Sec-Websocket-Key"
SecWebsocketVersion = "Sec-Websocket-Version"
SecWebsocketExtensions = "Sec-Websocket-Extensions"
SecWebsocketProtocol = "Sec-Websocket-Protocol"
SecWebsocketAccept = "Sec-Websocket-Accept"
)
// Hop-by-hop headers. These are removed when sent to the backend.
@@ -30,3 +35,19 @@ var HopHeaders = []string{
TransferEncoding,
Upgrade,
}
var WebsocketDialHeaders = []string{
Upgrade,
Connection,
SecWebsocketKey,
SecWebsocketVersion,
SecWebsocketExtensions,
SecWebsocketProtocol,
SecWebsocketAccept,
}
var WebsocketUpgradeHeaders = []string{
Upgrade,
Connection,
SecWebsocketAccept,
}