Update zipkin-go-opentracing.

This commit is contained in:
Ludovic Fernandez
2019-04-05 11:18:04 +02:00
committed by Traefiker Bot
parent 98a5e08553
commit de6e365b74
68 changed files with 2651 additions and 2293 deletions

View File

@@ -1,5 +1,5 @@
Apache Thrift
Copyright 2006-2010 The Apache Software Foundation.
Copyright 2006-2017 The Apache Software Foundation.
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
The Apache Software Foundation (http://www.apache.org/).

View File

@@ -30,11 +30,22 @@ const (
PROTOCOL_ERROR = 7
)
var defaultApplicationExceptionMessage = map[int32]string{
UNKNOWN_APPLICATION_EXCEPTION: "unknown application exception",
UNKNOWN_METHOD: "unknown method",
INVALID_MESSAGE_TYPE_EXCEPTION: "invalid message type",
WRONG_METHOD_NAME: "wrong method name",
BAD_SEQUENCE_ID: "bad sequence ID",
MISSING_RESULT: "missing result",
INTERNAL_ERROR: "unknown internal error",
PROTOCOL_ERROR: "unknown protocol error",
}
// Application level Thrift exception
type TApplicationException interface {
TException
TypeId() int32
Read(iprot TProtocol) (TApplicationException, error)
Read(iprot TProtocol) error
Write(oprot TProtocol) error
}
@@ -44,7 +55,10 @@ type tApplicationException struct {
}
func (e tApplicationException) Error() string {
return e.message
if e.message != "" {
return e.message
}
return defaultApplicationExceptionMessage[e.type_]
}
func NewTApplicationException(type_ int32, message string) TApplicationException {
@@ -55,10 +69,11 @@ func (p *tApplicationException) TypeId() int32 {
return p.type_
}
func (p *tApplicationException) Read(iprot TProtocol) (TApplicationException, error) {
func (p *tApplicationException) Read(iprot TProtocol) error {
// TODO: this should really be generated by the compiler
_, err := iprot.ReadStructBegin()
if err != nil {
return nil, err
return err
}
message := ""
@@ -67,7 +82,7 @@ func (p *tApplicationException) Read(iprot TProtocol) (TApplicationException, er
for {
_, ttype, id, err := iprot.ReadFieldBegin()
if err != nil {
return nil, err
return err
}
if ttype == STOP {
break
@@ -76,33 +91,40 @@ func (p *tApplicationException) Read(iprot TProtocol) (TApplicationException, er
case 1:
if ttype == STRING {
if message, err = iprot.ReadString(); err != nil {
return nil, err
return err
}
} else {
if err = SkipDefaultDepth(iprot, ttype); err != nil {
return nil, err
return err
}
}
case 2:
if ttype == I32 {
if type_, err = iprot.ReadI32(); err != nil {
return nil, err
return err
}
} else {
if err = SkipDefaultDepth(iprot, ttype); err != nil {
return nil, err
return err
}
}
default:
if err = SkipDefaultDepth(iprot, ttype); err != nil {
return nil, err
return err
}
}
if err = iprot.ReadFieldEnd(); err != nil {
return nil, err
return err
}
}
return NewTApplicationException(type_, message), iprot.ReadStructEnd()
if err := iprot.ReadStructEnd(); err != nil {
return err
}
p.message = message
p.type_ = type_
return nil
}
func (p *tApplicationException) Write(oprot TProtocol) (err error) {

View File

@@ -21,6 +21,7 @@ package thrift
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
@@ -447,9 +448,6 @@ func (p *TBinaryProtocol) ReadBinary() ([]byte, error) {
if size < 0 {
return nil, invalidDataLength
}
if uint64(size) > p.trans.RemainingBytes() {
return nil, invalidDataLength
}
isize := int(size)
buf := make([]byte, isize)
@@ -457,8 +455,8 @@ func (p *TBinaryProtocol) ReadBinary() ([]byte, error) {
return buf, NewTProtocolException(err)
}
func (p *TBinaryProtocol) Flush() (err error) {
return NewTProtocolException(p.trans.Flush())
func (p *TBinaryProtocol) Flush(ctx context.Context) (err error) {
return NewTProtocolException(p.trans.Flush(ctx))
}
func (p *TBinaryProtocol) Skip(fieldType TType) (err error) {
@@ -480,9 +478,6 @@ func (p *TBinaryProtocol) readStringBody(size int32) (value string, err error) {
if size < 0 {
return "", nil
}
if uint64(size) > p.trans.RemainingBytes() {
return "", invalidDataLength
}
var (
buf bytes.Buffer

View File

@@ -21,6 +21,7 @@ package thrift
import (
"bufio"
"context"
)
type TBufferedTransportFactory struct {
@@ -32,8 +33,8 @@ type TBufferedTransport struct {
tp TTransport
}
func (p *TBufferedTransportFactory) GetTransport(trans TTransport) TTransport {
return NewTBufferedTransport(trans, p.size)
func (p *TBufferedTransportFactory) GetTransport(trans TTransport) (TTransport, error) {
return NewTBufferedTransport(trans, p.size), nil
}
func NewTBufferedTransportFactory(bufferSize int) *TBufferedTransportFactory {
@@ -78,12 +79,12 @@ func (p *TBufferedTransport) Write(b []byte) (int, error) {
return n, err
}
func (p *TBufferedTransport) Flush() error {
func (p *TBufferedTransport) Flush(ctx context.Context) error {
if err := p.ReadWriter.Flush(); err != nil {
p.ReadWriter.Writer.Reset(p.tp)
return err
}
return p.tp.Flush()
return p.tp.Flush(ctx)
}
func (p *TBufferedTransport) RemainingBytes() (num_bytes uint64) {

View File

@@ -0,0 +1,85 @@
package thrift
import (
"context"
"fmt"
)
type TClient interface {
Call(ctx context.Context, method string, args, result TStruct) error
}
type TStandardClient struct {
seqId int32
iprot, oprot TProtocol
}
// TStandardClient implements TClient, and uses the standard message format for Thrift.
// It is not safe for concurrent use.
func NewTStandardClient(inputProtocol, outputProtocol TProtocol) *TStandardClient {
return &TStandardClient{
iprot: inputProtocol,
oprot: outputProtocol,
}
}
func (p *TStandardClient) Send(ctx context.Context, oprot TProtocol, seqId int32, method string, args TStruct) error {
if err := oprot.WriteMessageBegin(method, CALL, seqId); err != nil {
return err
}
if err := args.Write(oprot); err != nil {
return err
}
if err := oprot.WriteMessageEnd(); err != nil {
return err
}
return oprot.Flush(ctx)
}
func (p *TStandardClient) Recv(iprot TProtocol, seqId int32, method string, result TStruct) error {
rMethod, rTypeId, rSeqId, err := iprot.ReadMessageBegin()
if err != nil {
return err
}
if method != rMethod {
return NewTApplicationException(WRONG_METHOD_NAME, fmt.Sprintf("%s: wrong method name", method))
} else if seqId != rSeqId {
return NewTApplicationException(BAD_SEQUENCE_ID, fmt.Sprintf("%s: out of order sequence response", method))
} else if rTypeId == EXCEPTION {
var exception tApplicationException
if err := exception.Read(iprot); err != nil {
return err
}
if err := iprot.ReadMessageEnd(); err != nil {
return err
}
return &exception
} else if rTypeId != REPLY {
return NewTApplicationException(INVALID_MESSAGE_TYPE_EXCEPTION, fmt.Sprintf("%s: invalid message type", method))
}
if err := result.Read(iprot); err != nil {
return err
}
return iprot.ReadMessageEnd()
}
func (p *TStandardClient) Call(ctx context.Context, method string, args, result TStruct) error {
p.seqId++
seqId := p.seqId
if err := p.Send(ctx, p.oprot, seqId, method, args); err != nil {
return err
}
// method is oneway
if result == nil {
return nil
}
return p.Recv(p.iprot, seqId, method, result)
}

View File

@@ -20,6 +20,7 @@
package thrift
import (
"context"
"encoding/binary"
"fmt"
"io"
@@ -561,9 +562,6 @@ func (p *TCompactProtocol) ReadString() (value string, err error) {
if length < 0 {
return "", invalidDataLength
}
if uint64(length) > p.trans.RemainingBytes() {
return "", invalidDataLength
}
if length == 0 {
return "", nil
@@ -590,17 +588,14 @@ func (p *TCompactProtocol) ReadBinary() (value []byte, err error) {
if length < 0 {
return nil, invalidDataLength
}
if uint64(length) > p.trans.RemainingBytes() {
return nil, invalidDataLength
}
buf := make([]byte, length)
_, e = io.ReadFull(p.trans, buf)
return buf, NewTProtocolException(e)
}
func (p *TCompactProtocol) Flush() (err error) {
return NewTProtocolException(p.trans.Flush())
func (p *TCompactProtocol) Flush(ctx context.Context) (err error) {
return NewTProtocolException(p.trans.Flush(ctx))
}
func (p *TCompactProtocol) Skip(fieldType TType) (err error) {
@@ -806,7 +801,7 @@ func (p *TCompactProtocol) getTType(t tCompactType) (TType, error) {
case COMPACT_STRUCT:
return STRUCT, nil
}
return STOP, TException(fmt.Errorf("don't know what type: %s", t&0x0f))
return STOP, TException(fmt.Errorf("don't know what type: %v", t&0x0f))
}
// Given a TType value, find the appropriate TCompactProtocol.Types constant.

View File

@@ -19,12 +19,6 @@
package thrift
// A processor is a generic object which operates upon an input stream and
// writes to some output stream.
type TProcessor interface {
Process(in, out TProtocol) (bool, TException)
}
import "context"
type TProcessorFunction interface {
Process(seqId int32, in, out TProtocol) (bool, TException)
}
var defaultCtx = context.Background()

View File

@@ -20,6 +20,7 @@
package thrift
import (
"context"
"log"
)
@@ -258,8 +259,8 @@ func (tdp *TDebugProtocol) Skip(fieldType TType) (err error) {
log.Printf("%sSkip(fieldType=%#v) (err=%#v)", tdp.LogPrefix, fieldType, err)
return
}
func (tdp *TDebugProtocol) Flush() (err error) {
err = tdp.Delegate.Flush()
func (tdp *TDebugProtocol) Flush(ctx context.Context) (err error) {
err = tdp.Delegate.Flush(ctx)
log.Printf("%sFlush() (err=%#v)", tdp.LogPrefix, err)
return
}

View File

@@ -22,6 +22,7 @@ package thrift
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
@@ -48,11 +49,15 @@ func NewTFramedTransportFactory(factory TTransportFactory) TTransportFactory {
}
func NewTFramedTransportFactoryMaxLength(factory TTransportFactory, maxLength uint32) TTransportFactory {
return &tFramedTransportFactory{factory: factory, maxLength: maxLength}
return &tFramedTransportFactory{factory: factory, maxLength: maxLength}
}
func (p *tFramedTransportFactory) GetTransport(base TTransport) TTransport {
return NewTFramedTransportMaxLength(p.factory.GetTransport(base), p.maxLength)
func (p *tFramedTransportFactory) GetTransport(base TTransport) (TTransport, error) {
tt, err := p.factory.GetTransport(base)
if err != nil {
return nil, err
}
return NewTFramedTransportMaxLength(tt, p.maxLength), nil
}
func NewTFramedTransport(transport TTransport) *TFramedTransport {
@@ -131,21 +136,23 @@ func (p *TFramedTransport) WriteString(s string) (n int, err error) {
return p.buf.WriteString(s)
}
func (p *TFramedTransport) Flush() error {
func (p *TFramedTransport) Flush(ctx context.Context) error {
size := p.buf.Len()
buf := p.buffer[:4]
binary.BigEndian.PutUint32(buf, uint32(size))
_, err := p.transport.Write(buf)
if err != nil {
p.buf.Truncate(0)
return NewTTransportExceptionFromError(err)
}
if size > 0 {
if n, err := p.buf.WriteTo(p.transport); err != nil {
print("Error while flushing write buffer of size ", size, " to transport, only wrote ", n, " bytes: ", err.Error(), "\n")
p.buf.Truncate(0)
return NewTTransportExceptionFromError(err)
}
}
err = p.transport.Flush()
err = p.transport.Flush(ctx)
return NewTTransportExceptionFromError(err)
}
@@ -164,4 +171,3 @@ func (p *TFramedTransport) readFrameHeader() (uint32, error) {
func (p *TFramedTransport) RemainingBytes() (num_bytes uint64) {
return uint64(p.frameSize)
}

View File

@@ -21,6 +21,7 @@ package thrift
import (
"bytes"
"context"
"io"
"io/ioutil"
"net/http"
@@ -46,27 +47,16 @@ type THttpClient struct {
type THttpClientTransportFactory struct {
options THttpClientOptions
url string
isPost bool
}
func (p *THttpClientTransportFactory) GetTransport(trans TTransport) TTransport {
func (p *THttpClientTransportFactory) GetTransport(trans TTransport) (TTransport, error) {
if trans != nil {
t, ok := trans.(*THttpClient)
if ok && t.url != nil {
if t.requestBuffer != nil {
t2, _ := NewTHttpPostClientWithOptions(t.url.String(), p.options)
return t2
}
t2, _ := NewTHttpClientWithOptions(t.url.String(), p.options)
return t2
return NewTHttpClientWithOptions(t.url.String(), p.options)
}
}
if p.isPost {
s, _ := NewTHttpPostClientWithOptions(p.url, p.options)
return s
}
s, _ := NewTHttpClientWithOptions(p.url, p.options)
return s
return NewTHttpClientWithOptions(p.url, p.options)
}
type THttpClientOptions struct {
@@ -79,39 +69,10 @@ func NewTHttpClientTransportFactory(url string) *THttpClientTransportFactory {
}
func NewTHttpClientTransportFactoryWithOptions(url string, options THttpClientOptions) *THttpClientTransportFactory {
return &THttpClientTransportFactory{url: url, isPost: false, options: options}
}
func NewTHttpPostClientTransportFactory(url string) *THttpClientTransportFactory {
return NewTHttpPostClientTransportFactoryWithOptions(url, THttpClientOptions{})
}
func NewTHttpPostClientTransportFactoryWithOptions(url string, options THttpClientOptions) *THttpClientTransportFactory {
return &THttpClientTransportFactory{url: url, isPost: true, options: options}
return &THttpClientTransportFactory{url: url, options: options}
}
func NewTHttpClientWithOptions(urlstr string, options THttpClientOptions) (TTransport, error) {
parsedURL, err := url.Parse(urlstr)
if err != nil {
return nil, err
}
response, err := http.Get(urlstr)
if err != nil {
return nil, err
}
client := options.Client
if client == nil {
client = DefaultHttpClient
}
httpHeader := map[string][]string{"Content-Type": []string{"application/x-thrift"}}
return &THttpClient{client: client, response: response, url: parsedURL, header: httpHeader}, nil
}
func NewTHttpClient(urlstr string) (TTransport, error) {
return NewTHttpClientWithOptions(urlstr, THttpClientOptions{})
}
func NewTHttpPostClientWithOptions(urlstr string, options THttpClientOptions) (TTransport, error) {
parsedURL, err := url.Parse(urlstr)
if err != nil {
return nil, err
@@ -121,12 +82,12 @@ func NewTHttpPostClientWithOptions(urlstr string, options THttpClientOptions) (T
if client == nil {
client = DefaultHttpClient
}
httpHeader := map[string][]string{"Content-Type": []string{"application/x-thrift"}}
httpHeader := map[string][]string{"Content-Type": {"application/x-thrift"}}
return &THttpClient{client: client, url: parsedURL, requestBuffer: bytes.NewBuffer(buf), header: httpHeader}, nil
}
func NewTHttpPostClient(urlstr string) (TTransport, error) {
return NewTHttpPostClientWithOptions(urlstr, THttpClientOptions{})
func NewTHttpClient(urlstr string) (TTransport, error) {
return NewTHttpClientWithOptions(urlstr, THttpClientOptions{})
}
// Set the HTTP Header for this specific Thrift Transport
@@ -221,7 +182,7 @@ func (p *THttpClient) WriteString(s string) (n int, err error) {
return p.requestBuffer.WriteString(s)
}
func (p *THttpClient) Flush() error {
func (p *THttpClient) Flush(ctx context.Context) error {
// Close any previous response body to avoid leaking connections.
p.closeResponse()
@@ -230,6 +191,9 @@ func (p *THttpClient) Flush() error {
return NewTTransportExceptionFromError(err)
}
req.Header = p.header
if ctx != nil {
req = req.WithContext(ctx)
}
response, err := p.client.Do(req)
if err != nil {
return NewTTransportExceptionFromError(err)
@@ -256,3 +220,23 @@ func (p *THttpClient) RemainingBytes() (num_bytes uint64) {
const maxSize = ^uint64(0)
return maxSize // the thruth is, we just don't know unless framed is used
}
// Deprecated: Use NewTHttpClientTransportFactory instead.
func NewTHttpPostClientTransportFactory(url string) *THttpClientTransportFactory {
return NewTHttpClientTransportFactoryWithOptions(url, THttpClientOptions{})
}
// Deprecated: Use NewTHttpClientTransportFactoryWithOptions instead.
func NewTHttpPostClientTransportFactoryWithOptions(url string, options THttpClientOptions) *THttpClientTransportFactory {
return NewTHttpClientTransportFactoryWithOptions(url, options)
}
// Deprecated: Use NewTHttpClientWithOptions instead.
func NewTHttpPostClientWithOptions(urlstr string, options THttpClientOptions) (TTransport, error) {
return NewTHttpClientWithOptions(urlstr, options)
}
// Deprecated: Use NewTHttpClient instead.
func NewTHttpPostClient(urlstr string) (TTransport, error) {
return NewTHttpClientWithOptions(urlstr, THttpClientOptions{})
}

View File

@@ -19,16 +19,45 @@
package thrift
import "net/http"
import (
"compress/gzip"
"io"
"net/http"
"strings"
)
// NewThriftHandlerFunc is a function that create a ready to use Apache Thrift Handler function
func NewThriftHandlerFunc(processor TProcessor,
inPfactory, outPfactory TProtocolFactory) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
return gz(func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "application/x-thrift")
transport := NewStreamTransport(r.Body, w)
processor.Process(inPfactory.GetProtocol(transport), outPfactory.GetProtocol(transport))
processor.Process(r.Context(), inPfactory.GetProtocol(transport), outPfactory.GetProtocol(transport))
})
}
// gz transparently compresses the HTTP response if the client supports it.
func gz(handler http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
handler(w, r)
return
}
w.Header().Set("Content-Encoding", "gzip")
gz := gzip.NewWriter(w)
defer gz.Close()
gzw := gzipResponseWriter{Writer: gz, ResponseWriter: w}
handler(gzw, r)
}
}
type gzipResponseWriter struct {
io.Writer
http.ResponseWriter
}
func (w gzipResponseWriter) Write(b []byte) (int, error) {
return w.Writer.Write(b)
}

View File

@@ -21,6 +21,7 @@ package thrift
import (
"bufio"
"context"
"io"
)
@@ -38,38 +39,38 @@ type StreamTransportFactory struct {
isReadWriter bool
}
func (p *StreamTransportFactory) GetTransport(trans TTransport) TTransport {
func (p *StreamTransportFactory) GetTransport(trans TTransport) (TTransport, error) {
if trans != nil {
t, ok := trans.(*StreamTransport)
if ok {
if t.isReadWriter {
return NewStreamTransportRW(t.Reader.(io.ReadWriter))
return NewStreamTransportRW(t.Reader.(io.ReadWriter)), nil
}
if t.Reader != nil && t.Writer != nil {
return NewStreamTransport(t.Reader, t.Writer)
return NewStreamTransport(t.Reader, t.Writer), nil
}
if t.Reader != nil && t.Writer == nil {
return NewStreamTransportR(t.Reader)
return NewStreamTransportR(t.Reader), nil
}
if t.Reader == nil && t.Writer != nil {
return NewStreamTransportW(t.Writer)
return NewStreamTransportW(t.Writer), nil
}
return &StreamTransport{}
return &StreamTransport{}, nil
}
}
if p.isReadWriter {
return NewStreamTransportRW(p.Reader.(io.ReadWriter))
return NewStreamTransportRW(p.Reader.(io.ReadWriter)), nil
}
if p.Reader != nil && p.Writer != nil {
return NewStreamTransport(p.Reader, p.Writer)
return NewStreamTransport(p.Reader, p.Writer), nil
}
if p.Reader != nil && p.Writer == nil {
return NewStreamTransportR(p.Reader)
return NewStreamTransportR(p.Reader), nil
}
if p.Reader == nil && p.Writer != nil {
return NewStreamTransportW(p.Writer)
return NewStreamTransportW(p.Writer), nil
}
return &StreamTransport{}
return &StreamTransport{}, nil
}
func NewStreamTransportFactory(reader io.Reader, writer io.Writer, isReadWriter bool) *StreamTransportFactory {
@@ -138,7 +139,7 @@ func (p *StreamTransport) Close() error {
}
// Flushes the underlying output stream if not null.
func (p *StreamTransport) Flush() error {
func (p *StreamTransport) Flush(ctx context.Context) error {
if p.Writer == nil {
return NewTTransportException(NOT_OPEN, "Cannot flush null outputStream")
}
@@ -209,6 +210,5 @@ func (p *StreamTransport) WriteString(s string) (n int, err error) {
func (p *StreamTransport) RemainingBytes() (num_bytes uint64) {
const maxSize = ^uint64(0)
return maxSize // the thruth is, we just don't know unless framed is used
return maxSize // the thruth is, we just don't know unless framed is used
}

View File

@@ -20,6 +20,7 @@
package thrift
import (
"context"
"encoding/base64"
"fmt"
)
@@ -438,10 +439,10 @@ func (p *TJSONProtocol) ReadBinary() ([]byte, error) {
return v, p.ParsePostValue()
}
func (p *TJSONProtocol) Flush() (err error) {
func (p *TJSONProtocol) Flush(ctx context.Context) (err error) {
err = p.writer.Flush()
if err == nil {
err = p.trans.Flush()
err = p.trans.Flush(ctx)
}
return NewTProtocolException(err)
}

View File

@@ -21,6 +21,7 @@ package thrift
import (
"bytes"
"context"
)
// Memory buffer-based implementation of the TTransport interface.
@@ -33,14 +34,14 @@ type TMemoryBufferTransportFactory struct {
size int
}
func (p *TMemoryBufferTransportFactory) GetTransport(trans TTransport) TTransport {
func (p *TMemoryBufferTransportFactory) GetTransport(trans TTransport) (TTransport, error) {
if trans != nil {
t, ok := trans.(*TMemoryBuffer)
if ok && t.size > 0 {
return NewTMemoryBufferLen(t.size)
return NewTMemoryBufferLen(t.size), nil
}
}
return NewTMemoryBufferLen(p.size)
return NewTMemoryBufferLen(p.size), nil
}
func NewTMemoryBufferTransportFactory(size int) *TMemoryBufferTransportFactory {
@@ -70,7 +71,7 @@ func (p *TMemoryBuffer) Close() error {
}
// Flushing a memory buffer is a no-op
func (p *TMemoryBuffer) Flush() error {
func (p *TMemoryBuffer) Flush(ctx context.Context) error {
return nil
}

View File

@@ -20,6 +20,7 @@
package thrift
import (
"context"
"fmt"
"strings"
)
@@ -127,7 +128,7 @@ func (t *TMultiplexedProcessor) RegisterProcessor(name string, processor TProces
t.serviceProcessorMap[name] = processor
}
func (t *TMultiplexedProcessor) Process(in, out TProtocol) (bool, TException) {
func (t *TMultiplexedProcessor) Process(ctx context.Context, in, out TProtocol) (bool, TException) {
name, typeId, seqid, err := in.ReadMessageBegin()
if err != nil {
return false, err
@@ -140,7 +141,7 @@ func (t *TMultiplexedProcessor) Process(in, out TProtocol) (bool, TException) {
if len(v) != 2 {
if t.DefaultProcessor != nil {
smb := NewStoredMessageProtocol(in, name, typeId, seqid)
return t.DefaultProcessor.Process(smb, out)
return t.DefaultProcessor.Process(ctx, smb, out)
}
return false, fmt.Errorf("Service name not found in message name: %s. Did you forget to use a TMultiplexProtocol in your client?", name)
}
@@ -149,7 +150,7 @@ func (t *TMultiplexedProcessor) Process(in, out TProtocol) (bool, TException) {
return false, fmt.Errorf("Service name not found: %s. Did you forget to call registerProcessor()?", v[0])
}
smb := NewStoredMessageProtocol(in, v[1], typeId, seqid)
return actualProcessor.Process(smb, out)
return actualProcessor.Process(ctx, smb, out)
}
//Protocol that use stored message for ReadMessageBegin

View File

@@ -19,6 +19,18 @@
package thrift
import "context"
// A processor is a generic object which operates upon an input stream and
// writes to some output stream.
type TProcessor interface {
Process(ctx context.Context, in, out TProtocol) (bool, TException)
}
type TProcessorFunction interface {
Process(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException)
}
// The default processor factory just returns a singleton
// instance.
type TProcessorFactory interface {

View File

@@ -20,7 +20,9 @@
package thrift
import (
"context"
"errors"
"fmt"
)
const (
@@ -73,7 +75,7 @@ type TProtocol interface {
ReadBinary() (value []byte, err error)
Skip(fieldType TType) (err error)
Flush() (err error)
Flush(ctx context.Context) (err error)
Transport() TTransport
}
@@ -88,9 +90,9 @@ func SkipDefaultDepth(prot TProtocol, typeId TType) (err error) {
// Skips over the next data element from the provided input TProtocol object.
func Skip(self TProtocol, fieldType TType, maxDepth int) (err error) {
if maxDepth <= 0 {
return NewTProtocolExceptionWithType( DEPTH_LIMIT, errors.New("Depth limit exceeded"))
if maxDepth <= 0 {
return NewTProtocolExceptionWithType(DEPTH_LIMIT, errors.New("Depth limit exceeded"))
}
switch fieldType {
@@ -170,6 +172,8 @@ func Skip(self TProtocol, fieldType TType, maxDepth int) (err error) {
}
}
return self.ReadListEnd()
default:
return NewTProtocolExceptionWithType(INVALID_DATA, errors.New(fmt.Sprintf("Unknown data type %d", fieldType)))
}
return nil
}

View File

@@ -60,7 +60,7 @@ func NewTProtocolException(err error) TProtocolException {
if err == nil {
return nil
}
if e,ok := err.(TProtocolException); ok {
if e, ok := err.(TProtocolException); ok {
return e
}
if _, ok := err.(base64.CorruptInputError); ok {
@@ -75,4 +75,3 @@ func NewTProtocolExceptionWithType(errType int, err error) TProtocolException {
}
return &tProtocolException{errType, err.Error()}
}

View File

@@ -66,4 +66,3 @@ func writeByte(w io.Writer, c byte) error {
_, err := w.Write(v[0:1])
return err
}

View File

@@ -19,6 +19,10 @@
package thrift
import (
"context"
)
type TSerializer struct {
Transport *TMemoryBuffer
Protocol TProtocol
@@ -38,35 +42,35 @@ func NewTSerializer() *TSerializer {
protocol}
}
func (t *TSerializer) WriteString(msg TStruct) (s string, err error) {
func (t *TSerializer) WriteString(ctx context.Context, msg TStruct) (s string, err error) {
t.Transport.Reset()
if err = msg.Write(t.Protocol); err != nil {
return
}
if err = t.Protocol.Flush(); err != nil {
if err = t.Protocol.Flush(ctx); err != nil {
return
}
if err = t.Transport.Flush(); err != nil {
if err = t.Transport.Flush(ctx); err != nil {
return
}
return t.Transport.String(), nil
}
func (t *TSerializer) Write(msg TStruct) (b []byte, err error) {
func (t *TSerializer) Write(ctx context.Context, msg TStruct) (b []byte, err error) {
t.Transport.Reset()
if err = msg.Write(t.Protocol); err != nil {
return
}
if err = t.Protocol.Flush(); err != nil {
if err = t.Protocol.Flush(ctx); err != nil {
return
}
if err = t.Transport.Flush(); err != nil {
if err = t.Transport.Flush(ctx); err != nil {
return
}

View File

@@ -47,7 +47,14 @@ func NewTServerSocketTimeout(listenAddr string, clientTimeout time.Duration) (*T
return &TServerSocket{addr: addr, clientTimeout: clientTimeout}, nil
}
// Creates a TServerSocket from a net.Addr
func NewTServerSocketFromAddrTimeout(addr net.Addr, clientTimeout time.Duration) *TServerSocket {
return &TServerSocket{addr: addr, clientTimeout: clientTimeout}
}
func (p *TServerSocket) Listen() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.IsListening() {
return nil
}
@@ -67,10 +74,15 @@ func (p *TServerSocket) Accept() (TTransport, error) {
if interrupted {
return nil, errTransportInterrupted
}
if p.listener == nil {
p.mu.Lock()
listener := p.listener
p.mu.Unlock()
if listener == nil {
return nil, NewTTransportException(NOT_OPEN, "No underlying server socket")
}
conn, err := p.listener.Accept()
conn, err := listener.Accept()
if err != nil {
return nil, NewTTransportExceptionFromError(err)
}
@@ -84,6 +96,8 @@ func (p *TServerSocket) IsListening() bool {
// Connects the socket, creating a new socket object if necessary.
func (p *TServerSocket) Open() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.IsListening() {
return NewTTransportException(ALREADY_OPEN, "Server socket already open")
}
@@ -103,20 +117,21 @@ func (p *TServerSocket) Addr() net.Addr {
}
func (p *TServerSocket) Close() error {
defer func() {
p.listener = nil
}()
var err error
p.mu.Lock()
if p.IsListening() {
return p.listener.Close()
err = p.listener.Close()
p.listener = nil
}
return nil
p.mu.Unlock()
return err
}
func (p *TServerSocket) Interrupt() error {
p.mu.Lock()
p.interrupted = true
p.Close()
p.mu.Unlock()
p.Close()
return nil
}

View File

@@ -22,6 +22,7 @@ package thrift
import (
"bufio"
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
@@ -552,7 +553,7 @@ func (p *TSimpleJSONProtocol) ReadBinary() ([]byte, error) {
return v, p.ParsePostValue()
}
func (p *TSimpleJSONProtocol) Flush() (err error) {
func (p *TSimpleJSONProtocol) Flush(ctx context.Context) (err error) {
return NewTProtocolException(p.writer.Flush())
}
@@ -1064,7 +1065,7 @@ func (p *TSimpleJSONProtocol) ParseListEnd() error {
for _, char := range line {
switch char {
default:
e := fmt.Errorf("Expecting end of list \"]\", but found: \"", line, "\"")
e := fmt.Errorf("Expecting end of list \"]\", but found: \"%v\"", line)
return NewTProtocolExceptionWithType(INVALID_DATA, e)
case ' ', '\n', '\r', '\t', rune(JSON_RBRACKET[0]):
break

View File

@@ -23,11 +23,18 @@ import (
"log"
"runtime/debug"
"sync"
"sync/atomic"
)
// Simple, non-concurrent server for testing.
/*
* This is not a typical TSimpleServer as it is not blocked after accept a socket.
* It is more like a TThreadedServer that can handle different connections in different goroutines.
* This will work if golang user implements a conn-pool like thing in client side.
*/
type TSimpleServer struct {
quit chan struct{}
closed int32
wg sync.WaitGroup
mu sync.Mutex
processorFactory TProcessorFactory
serverTransport TServerTransport
@@ -87,7 +94,6 @@ func NewTSimpleServerFactory6(processorFactory TProcessorFactory, serverTranspor
outputTransportFactory: outputTransportFactory,
inputProtocolFactory: inputProtocolFactory,
outputProtocolFactory: outputProtocolFactory,
quit: make(chan struct{}, 1),
}
}
@@ -119,23 +125,37 @@ func (p *TSimpleServer) Listen() error {
return p.serverTransport.Listen()
}
func (p *TSimpleServer) innerAccept() (int32, error) {
client, err := p.serverTransport.Accept()
p.mu.Lock()
defer p.mu.Unlock()
closed := atomic.LoadInt32(&p.closed)
if closed != 0 {
return closed, nil
}
if err != nil {
return 0, err
}
if client != nil {
p.wg.Add(1)
go func() {
defer p.wg.Done()
if err := p.processRequests(client); err != nil {
log.Println("error processing request:", err)
}
}()
}
return 0, nil
}
func (p *TSimpleServer) AcceptLoop() error {
for {
client, err := p.serverTransport.Accept()
closed, err := p.innerAccept()
if err != nil {
select {
case <-p.quit:
return nil
default:
}
return err
}
if client != nil {
go func() {
if err := p.processRequests(client); err != nil {
log.Println("error processing request:", err)
}
}()
if closed != 0 {
return nil
}
}
}
@@ -149,21 +169,28 @@ func (p *TSimpleServer) Serve() error {
return nil
}
var once sync.Once
func (p *TSimpleServer) Stop() error {
q := func() {
p.quit <- struct{}{}
p.serverTransport.Interrupt()
p.mu.Lock()
defer p.mu.Unlock()
if atomic.LoadInt32(&p.closed) != 0 {
return nil
}
once.Do(q)
atomic.StoreInt32(&p.closed, 1)
p.serverTransport.Interrupt()
p.wg.Wait()
return nil
}
func (p *TSimpleServer) processRequests(client TTransport) error {
processor := p.processorFactory.GetProcessor(client)
inputTransport := p.inputTransportFactory.GetTransport(client)
outputTransport := p.outputTransportFactory.GetTransport(client)
inputTransport, err := p.inputTransportFactory.GetTransport(client)
if err != nil {
return err
}
outputTransport, err := p.outputTransportFactory.GetTransport(client)
if err != nil {
return err
}
inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
outputProtocol := p.outputProtocolFactory.GetProtocol(outputTransport)
defer func() {
@@ -171,6 +198,7 @@ func (p *TSimpleServer) processRequests(client TTransport) error {
log.Printf("panic in processor: %s: %s", e, debug.Stack())
}
}()
if inputTransport != nil {
defer inputTransport.Close()
}
@@ -178,17 +206,20 @@ func (p *TSimpleServer) processRequests(client TTransport) error {
defer outputTransport.Close()
}
for {
ok, err := processor.Process(inputProtocol, outputProtocol)
if atomic.LoadInt32(&p.closed) != 0 {
return nil
}
ok, err := processor.Process(defaultCtx, inputProtocol, outputProtocol)
if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
return nil
} else if err != nil {
log.Printf("error processing request: %s", err)
return err
}
if err, ok := err.(TApplicationException); ok && err.TypeId() == UNKNOWN_METHOD {
continue
}
if !ok {
if !ok {
break
}
}

View File

@@ -20,6 +20,7 @@
package thrift
import (
"context"
"net"
"time"
)
@@ -148,7 +149,7 @@ func (p *TSocket) Write(buf []byte) (int, error) {
return p.conn.Write(buf)
}
func (p *TSocket) Flush() error {
func (p *TSocket) Flush(ctx context.Context) error {
return nil
}
@@ -161,6 +162,5 @@ func (p *TSocket) Interrupt() error {
func (p *TSocket) RemainingBytes() (num_bytes uint64) {
const maxSize = ^uint64(0)
return maxSize // the thruth is, we just don't know unless framed is used
return maxSize // the thruth is, we just don't know unless framed is used
}

View File

@@ -20,9 +20,9 @@
package thrift
import (
"crypto/tls"
"net"
"time"
"crypto/tls"
)
type TSSLServerSocket struct {
@@ -38,6 +38,9 @@ func NewTSSLServerSocket(listenAddr string, cfg *tls.Config) (*TSSLServerSocket,
}
func NewTSSLServerSocketTimeout(listenAddr string, cfg *tls.Config, clientTimeout time.Duration) (*TSSLServerSocket, error) {
if cfg.MinVersion == 0 {
cfg.MinVersion = tls.VersionTLS10
}
addr, err := net.ResolveTCPAddr("tcp", listenAddr)
if err != nil {
return nil, err

View File

@@ -20,6 +20,7 @@
package thrift
import (
"context"
"crypto/tls"
"net"
"time"
@@ -48,6 +49,9 @@ func NewTSSLSocket(hostPort string, cfg *tls.Config) (*TSSLSocket, error) {
// NewTSSLSocketTimeout creates a net.Conn-backed TTransport, given a host and port
// it also accepts a tls Configuration and a timeout as a time.Duration
func NewTSSLSocketTimeout(hostPort string, cfg *tls.Config, timeout time.Duration) (*TSSLSocket, error) {
if cfg.MinVersion == 0 {
cfg.MinVersion = tls.VersionTLS10
}
return &TSSLSocket{hostPort: hostPort, timeout: timeout, cfg: cfg}, nil
}
@@ -87,7 +91,8 @@ func (p *TSSLSocket) Open() error {
// If we have a hostname, we need to pass the hostname to tls.Dial for
// certificate hostname checks.
if p.hostPort != "" {
if p.conn, err = tls.Dial("tcp", p.hostPort, p.cfg); err != nil {
if p.conn, err = tls.DialWithDialer(&net.Dialer{
Timeout: p.timeout}, "tcp", p.hostPort, p.cfg); err != nil {
return NewTTransportException(NOT_OPEN, err.Error())
}
} else {
@@ -103,7 +108,8 @@ func (p *TSSLSocket) Open() error {
if len(p.addr.String()) == 0 {
return NewTTransportException(NOT_OPEN, "Cannot open bad address.")
}
if p.conn, err = tls.Dial(p.addr.Network(), p.addr.String(), p.cfg); err != nil {
if p.conn, err = tls.DialWithDialer(&net.Dialer{
Timeout: p.timeout}, p.addr.Network(), p.addr.String(), p.cfg); err != nil {
return NewTTransportException(NOT_OPEN, err.Error())
}
}
@@ -153,7 +159,7 @@ func (p *TSSLSocket) Write(buf []byte) (int, error) {
return p.conn.Write(buf)
}
func (p *TSSLSocket) Flush() error {
func (p *TSSLSocket) Flush(ctx context.Context) error {
return nil
}
@@ -166,6 +172,5 @@ func (p *TSSLSocket) Interrupt() error {
func (p *TSSLSocket) RemainingBytes() (num_bytes uint64) {
const maxSize = ^uint64(0)
return maxSize // the thruth is, we just don't know unless framed is used
return maxSize // the thruth is, we just don't know unless framed is used
}

View File

@@ -20,6 +20,7 @@
package thrift
import (
"context"
"errors"
"io"
)
@@ -30,15 +31,18 @@ type Flusher interface {
Flush() (err error)
}
type ContextFlusher interface {
Flush(ctx context.Context) (err error)
}
type ReadSizeProvider interface {
RemainingBytes() (num_bytes uint64)
}
// Encapsulates the I/O layer
type TTransport interface {
io.ReadWriteCloser
Flusher
ContextFlusher
ReadSizeProvider
// Opens the transport for communication
@@ -52,7 +56,6 @@ type stringWriter interface {
WriteString(s string) (n int, err error)
}
// This is "enchanced" transport with extra capabilities. You need to use one of these
// to construct protocol.
// Notably, TSocket does not implement this interface, and it is always a mistake to use
@@ -62,7 +65,6 @@ type TRichTransport interface {
io.ByteReader
io.ByteWriter
stringWriter
Flusher
ContextFlusher
ReadSizeProvider
}

View File

@@ -24,14 +24,14 @@ package thrift
// a ServerTransport and then may want to mutate them (i.e. create
// a BufferedTransport from the underlying base transport)
type TTransportFactory interface {
GetTransport(trans TTransport) TTransport
GetTransport(trans TTransport) (TTransport, error)
}
type tTransportFactory struct{}
// Return a wrapped instance of the base Transport.
func (p *tTransportFactory) GetTransport(trans TTransport) TTransport {
return trans
func (p *tTransportFactory) GetTransport(trans TTransport) (TTransport, error) {
return trans, nil
}
func NewTTransportFactory() TTransportFactory {

View File

@@ -21,13 +21,15 @@ package thrift
import (
"compress/zlib"
"context"
"io"
"log"
)
// TZlibTransportFactory is a factory for TZlibTransport instances
type TZlibTransportFactory struct {
level int
level int
factory TTransportFactory
}
// TZlibTransport is a TTransport implementation that makes use of zlib compression.
@@ -38,14 +40,27 @@ type TZlibTransport struct {
}
// GetTransport constructs a new instance of NewTZlibTransport
func (p *TZlibTransportFactory) GetTransport(trans TTransport) TTransport {
t, _ := NewTZlibTransport(trans, p.level)
return t
func (p *TZlibTransportFactory) GetTransport(trans TTransport) (TTransport, error) {
if p.factory != nil {
// wrap other factory
var err error
trans, err = p.factory.GetTransport(trans)
if err != nil {
return nil, err
}
}
return NewTZlibTransport(trans, p.level)
}
// NewTZlibTransportFactory constructs a new instance of NewTZlibTransportFactory
func NewTZlibTransportFactory(level int) *TZlibTransportFactory {
return &TZlibTransportFactory{level: level}
return &TZlibTransportFactory{level: level, factory: nil}
}
// NewTZlibTransportFactory constructs a new instance of TZlibTransportFactory
// as a wrapper over existing transport factory
func NewTZlibTransportFactoryWithFactory(level int, factory TTransportFactory) *TZlibTransportFactory {
return &TZlibTransportFactory{level: level, factory: factory}
}
// NewTZlibTransport constructs a new instance of TZlibTransport
@@ -77,11 +92,11 @@ func (z *TZlibTransport) Close() error {
}
// Flush flushes the writer and its underlying transport.
func (z *TZlibTransport) Flush() error {
func (z *TZlibTransport) Flush(ctx context.Context) error {
if err := z.writer.Flush(); err != nil {
return err
}
return z.transport.Flush()
return z.transport.Flush(ctx)
}
// IsOpen returns true if the transport is open

239
vendor/github.com/apache/thrift/tutorial/hs/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,239 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
--------------------------------------------------
SOFTWARE DISTRIBUTED WITH THRIFT:
The Apache Thrift software includes a number of subcomponents with
separate copyright notices and license terms. Your use of the source
code for the these subcomponents is subject to the terms and
conditions of the following licenses.
--------------------------------------------------
Portions of the following files are licensed under the MIT License:
lib/erl/src/Makefile.am
Please see doc/otp-base-license.txt for the full terms of this license.
--------------------------------------------------
For the aclocal/ax_boost_base.m4 and contrib/fb303/aclocal/ax_boost_base.m4 components:
# Copyright (c) 2007 Thomas Porschberg <thomas@randspringer.de>
#
# Copying and distribution of this file, with or without
# modification, are permitted in any medium without royalty provided
# the copyright notice and this notice are preserved.
--------------------------------------------------
For the lib/nodejs/lib/thrift/json_parse.js:
/*
json_parse.js
2015-05-02
Public Domain.
NO WARRANTY EXPRESSED OR IMPLIED. USE AT YOUR OWN RISK.
*/
(By Douglas Crockford <douglas@crockford.com>)
--------------------------------------------------

View File

@@ -3,12 +3,11 @@ package zipkintracer
import (
"bytes"
"net/http"
"sync"
"time"
"github.com/apache/thrift/lib/go/thrift"
"github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
"github.com/openzipkin-contrib/zipkin-go-opentracing/thrift/gen-go/zipkincore"
)
// Default timeout for http request in seconds
@@ -29,12 +28,9 @@ type HTTPCollector struct {
batchInterval time.Duration
batchSize int
maxBacklog int
batch []*zipkincore.Span
spanc chan *zipkincore.Span
quit chan struct{}
shutdown chan error
sendMutex *sync.Mutex
batchMutex *sync.Mutex
reqCallback RequestCallback
}
@@ -100,25 +96,32 @@ func NewHTTPCollector(url string, options ...HTTPOption) (Collector, error) {
batchInterval: defaultHTTPBatchInterval * time.Second,
batchSize: defaultHTTPBatchSize,
maxBacklog: defaultHTTPMaxBacklog,
batch: []*zipkincore.Span{},
spanc: make(chan *zipkincore.Span),
quit: make(chan struct{}, 1),
shutdown: make(chan error, 1),
sendMutex: &sync.Mutex{},
batchMutex: &sync.Mutex{},
}
for _, option := range options {
option(c)
}
// spanc can immediately accept maxBacklog spans and everything else is dropped.
c.spanc = make(chan *zipkincore.Span, c.maxBacklog)
go c.loop()
return c, nil
}
// Collect implements Collector.
// attempts a non blocking send on the channel.
func (c *HTTPCollector) Collect(s *zipkincore.Span) error {
c.spanc <- s
select {
case c.spanc <- s:
// Accepted.
case <-c.quit:
// Collector concurrently closed.
default:
c.logger.Log("msg", "queue full, disposing spans.", "size", len(c.spanc))
}
return nil
}
@@ -153,55 +156,35 @@ func (c *HTTPCollector) loop() {
)
defer ticker.Stop()
// The following loop is single threaded
// allocate enough space so we don't have to reallocate.
batch := make([]*zipkincore.Span, 0, c.batchSize)
for {
select {
case span := <-c.spanc:
currentBatchSize := c.append(span)
if currentBatchSize >= c.batchSize {
batch = append(batch, span)
if len(batch) == c.batchSize {
c.send(batch)
batch = batch[0:0]
nextSend = time.Now().Add(c.batchInterval)
go c.send()
}
case <-tickc:
if time.Now().After(nextSend) {
if len(batch) > 0 {
c.send(batch)
batch = batch[0:0]
}
nextSend = time.Now().Add(c.batchInterval)
go c.send()
}
case <-c.quit:
c.shutdown <- c.send()
c.shutdown <- c.send(batch)
return
}
}
}
func (c *HTTPCollector) append(span *zipkincore.Span) (newBatchSize int) {
c.batchMutex.Lock()
defer c.batchMutex.Unlock()
c.batch = append(c.batch, span)
if len(c.batch) > c.maxBacklog {
dispose := len(c.batch) - c.maxBacklog
c.logger.Log("msg", "backlog too long, disposing spans.", "count", dispose)
c.batch = c.batch[dispose:]
}
newBatchSize = len(c.batch)
return
}
func (c *HTTPCollector) send() error {
// in order to prevent sending the same batch twice
c.sendMutex.Lock()
defer c.sendMutex.Unlock()
// Select all current spans in the batch to be sent
c.batchMutex.Lock()
sendBatch := c.batch[:]
c.batchMutex.Unlock()
// Do not send an empty batch
if len(sendBatch) == 0 {
return nil
}
func (c *HTTPCollector) send(sendBatch []*zipkincore.Span) error {
req, err := http.NewRequest(
"POST",
c.url,
@@ -214,15 +197,15 @@ func (c *HTTPCollector) send() error {
if c.reqCallback != nil {
c.reqCallback(req)
}
if _, err = c.client.Do(req); err != nil {
resp, err := c.client.Do(req)
if err != nil {
c.logger.Log("err", err.Error())
return err
}
// Remove sent spans from the batch
c.batchMutex.Lock()
c.batch = c.batch[len(sendBatch):]
c.batchMutex.Unlock()
resp.Body.Close()
// non 2xx code
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
c.logger.Log("err", "HTTP POST span failed", "code", resp.Status)
}
return nil
}

View File

@@ -4,7 +4,7 @@ import (
"github.com/Shopify/sarama"
"github.com/apache/thrift/lib/go/thrift"
"github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
"github.com/openzipkin-contrib/zipkin-go-opentracing/thrift/gen-go/zipkincore"
)
// defaultKafkaTopic sets the standard Kafka topic our Collector will publish

View File

@@ -1,6 +1,7 @@
package zipkintracer
import (
"context"
"encoding/base64"
"fmt"
"net"
@@ -9,8 +10,8 @@ import (
"github.com/apache/thrift/lib/go/thrift"
"github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/scribe"
"github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
"github.com/openzipkin-contrib/zipkin-go-opentracing/thrift/gen-go/scribe"
"github.com/openzipkin-contrib/zipkin-go-opentracing/thrift/gen-go/zipkincore"
)
const defaultScribeCategory = "zipkin"
@@ -112,8 +113,13 @@ func NewScribeCollector(addr string, timeout time.Duration, options ...ScribeOpt
// Collect implements Collector.
func (c *ScribeCollector) Collect(s *zipkincore.Span) error {
c.spanc <- s
return nil // accepted
select {
case c.spanc <- s:
// Accepted.
case <-c.quit:
// Collector concurrently closed.
}
return nil
}
// Close implements Collector.
@@ -198,7 +204,7 @@ func (c *ScribeCollector) send() error {
return err
}
}
if rc, err := c.client.Log(sendBatch); err != nil {
if rc, err := c.client.Log(context.Background(), sendBatch); err != nil {
c.client = nil
_ = c.logger.Log("err", fmt.Sprintf("during Log: %v", err))
return err

View File

@@ -3,7 +3,7 @@ package zipkintracer
import (
"strings"
"github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
"github.com/openzipkin-contrib/zipkin-go-opentracing/thrift/gen-go/zipkincore"
)
// Collector represents a Zipkin trace collector, which is probably a set of
@@ -47,6 +47,11 @@ func (c MultiCollector) aggregateErrors(f func(c Collector) error) error {
e.errs[i] = err
}
}
if e == nil {
return nil
}
return e
}

View File

@@ -1,8 +1,8 @@
package zipkintracer
import (
"github.com/openzipkin/zipkin-go-opentracing/flag"
"github.com/openzipkin/zipkin-go-opentracing/types"
"github.com/openzipkin-contrib/zipkin-go-opentracing/flag"
"github.com/openzipkin-contrib/zipkin-go-opentracing/types"
)
// SpanContext holds the basic Span metadata.

View File

@@ -1,8 +1,8 @@
package zipkintracer
import (
opentracing "github.com/opentracing/opentracing-go"
otobserver "github.com/opentracing-contrib/go-observer"
opentracing "github.com/opentracing/opentracing-go"
)
// observer is a dispatcher to other observers

View File

@@ -3,8 +3,8 @@ package zipkintracer
import (
opentracing "github.com/opentracing/opentracing-go"
"github.com/openzipkin/zipkin-go-opentracing/flag"
"github.com/openzipkin/zipkin-go-opentracing/types"
"github.com/openzipkin-contrib/zipkin-go-opentracing/flag"
"github.com/openzipkin-contrib/zipkin-go-opentracing/types"
)
type accessorPropagator struct {

View File

@@ -2,6 +2,7 @@ package zipkintracer
import (
"encoding/binary"
"fmt"
"io"
"strconv"
"strings"
@@ -9,9 +10,9 @@ import (
"github.com/gogo/protobuf/proto"
opentracing "github.com/opentracing/opentracing-go"
"github.com/openzipkin/zipkin-go-opentracing/flag"
"github.com/openzipkin/zipkin-go-opentracing/types"
"github.com/openzipkin/zipkin-go-opentracing/wire"
"github.com/openzipkin-contrib/zipkin-go-opentracing/flag"
"github.com/openzipkin-contrib/zipkin-go-opentracing/types"
"github.com/openzipkin-contrib/zipkin-go-opentracing/wire"
)
type textMapPropagator struct {
@@ -25,12 +26,11 @@ const (
prefixTracerState = "x-b3-" // we default to interop with non-opentracing zipkin tracers
prefixBaggage = "ot-baggage-"
tracerStateFieldCount = 3 // not 5, X-B3-ParentSpanId is optional and we allow optional Sampled header
zipkinTraceID = prefixTracerState + "traceid"
zipkinSpanID = prefixTracerState + "spanid"
zipkinParentSpanID = prefixTracerState + "parentspanid"
zipkinSampled = prefixTracerState + "sampled"
zipkinFlags = prefixTracerState + "flags"
zipkinTraceID = prefixTracerState + "traceid"
zipkinSpanID = prefixTracerState + "spanid"
zipkinParentSpanID = prefixTracerState + "parentspanid"
zipkinSampled = prefixTracerState + "sampled"
zipkinFlags = prefixTracerState + "flags"
)
func (p *textMapPropagator) Inject(
@@ -45,14 +45,23 @@ func (p *textMapPropagator) Inject(
if !ok {
return opentracing.ErrInvalidCarrier
}
carrier.Set(zipkinTraceID, sc.TraceID.ToHex())
carrier.Set(zipkinSpanID, strconv.FormatUint(sc.SpanID, 16))
carrier.Set(zipkinSampled, strconv.FormatBool(sc.Sampled))
if sc.ParentSpanID != nil {
// we only set ParentSpanID header if there is a parent span
carrier.Set(zipkinParentSpanID, strconv.FormatUint(*sc.ParentSpanID, 16))
// only inject IDs if both trace ID and span ID are present
if !sc.TraceID.Empty() && sc.SpanID > 0 {
carrier.Set(zipkinTraceID, sc.TraceID.ToHex())
carrier.Set(zipkinSpanID, fmt.Sprintf("%016x", sc.SpanID))
if sc.ParentSpanID != nil {
// we only set ParentSpanID header if there is a parent span
carrier.Set(zipkinParentSpanID, fmt.Sprintf("%016x", *sc.ParentSpanID))
}
}
if sc.Sampled {
carrier.Set(zipkinSampled, "1")
} else {
carrier.Set(zipkinSampled, "0")
}
// we only need to inject the debug flag if set. see flag package for details.
flags := sc.Flags & flag.Debug
carrier.Set(zipkinFlags, strconv.FormatUint(uint64(flags), 10))
@@ -80,6 +89,8 @@ func (p *textMapPropagator) Extract(
err error
)
decodedBaggage := make(map[string]string)
var traceIDFound, spanIDFound bool
err = carrier.ForeachKey(func(k, v string) error {
switch strings.ToLower(k) {
case zipkinTraceID:
@@ -87,11 +98,21 @@ func (p *textMapPropagator) Extract(
if err != nil {
return opentracing.ErrSpanContextCorrupted
}
// mark TraceID as found
if !traceIDFound {
requiredFieldCount++
traceIDFound = true
}
case zipkinSpanID:
spanID, err = strconv.ParseUint(v, 16, 64)
if err != nil {
return opentracing.ErrSpanContextCorrupted
}
// mark SpanID as found
if !spanIDFound {
requiredFieldCount++
spanIDFound = true
}
case zipkinParentSpanID:
var id uint64
id, err = strconv.ParseUint(v, 16, 64)
@@ -120,19 +141,15 @@ func (p *textMapPropagator) Extract(
if strings.HasPrefix(lowercaseK, prefixBaggage) {
decodedBaggage[strings.TrimPrefix(lowercaseK, prefixBaggage)] = v
}
// Balance off the requiredFieldCount++ just below...
requiredFieldCount--
}
requiredFieldCount++
return nil
})
if err != nil {
return nil, err
}
if requiredFieldCount < tracerStateFieldCount {
if requiredFieldCount == 0 {
return nil, opentracing.ErrSpanContextNotFound
}
// required fields must both be present or neither be present
if requiredFieldCount != 0 && requiredFieldCount != 2 {
return nil, opentracing.ErrSpanContextCorrupted
}

View File

@@ -8,8 +8,8 @@ import (
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
otobserver "github.com/opentracing-contrib/go-observer"
"github.com/openzipkin-contrib/zipkin-go-opentracing/thrift/gen-go/zipkincore"
)
// Span provides access to the essential details of the span, for use

View File

@@ -0,0 +1,7 @@
// Autogenerated by Thrift Compiler (1.0.0-dev)
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
package scribe
var GoUnusedProtection__ int;

View File

@@ -1,10 +1,12 @@
// Autogenerated by Thrift Compiler (0.9.3)
// Autogenerated by Thrift Compiler (1.0.0-dev)
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
package scribe
import (
"bytes"
"context"
"reflect"
"fmt"
"github.com/apache/thrift/lib/go/thrift"
)
@@ -12,7 +14,11 @@ import (
// (needed to ensure safety because of naive import list construction.)
var _ = thrift.ZERO
var _ = fmt.Printf
var _ = context.Background
var _ = reflect.DeepEqual
var _ = bytes.Equal
func init() {
}

View File

@@ -0,0 +1,551 @@
// Autogenerated by Thrift Compiler (1.0.0-dev)
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
package scribe
import (
"bytes"
"context"
"reflect"
"database/sql/driver"
"errors"
"fmt"
"github.com/apache/thrift/lib/go/thrift"
)
// (needed to ensure safety because of naive import list construction.)
var _ = thrift.ZERO
var _ = fmt.Printf
var _ = context.Background
var _ = reflect.DeepEqual
var _ = bytes.Equal
type ResultCode int64
const (
ResultCode_OK ResultCode = 0
ResultCode_TRY_LATER ResultCode = 1
)
func (p ResultCode) String() string {
switch p {
case ResultCode_OK: return "OK"
case ResultCode_TRY_LATER: return "TRY_LATER"
}
return "<UNSET>"
}
func ResultCodeFromString(s string) (ResultCode, error) {
switch s {
case "OK": return ResultCode_OK, nil
case "TRY_LATER": return ResultCode_TRY_LATER, nil
}
return ResultCode(0), fmt.Errorf("not a valid ResultCode string")
}
func ResultCodePtr(v ResultCode) *ResultCode { return &v }
func (p ResultCode) MarshalText() ([]byte, error) {
return []byte(p.String()), nil
}
func (p *ResultCode) UnmarshalText(text []byte) error {
q, err := ResultCodeFromString(string(text))
if (err != nil) {
return err
}
*p = q
return nil
}
func (p *ResultCode) Scan(value interface{}) error {
v, ok := value.(int64)
if !ok {
return errors.New("Scan value is not int64")
}
*p = ResultCode(v)
return nil
}
func (p * ResultCode) Value() (driver.Value, error) {
if p == nil {
return nil, nil
}
return int64(*p), nil
}
// Attributes:
// - Category
// - Message
type LogEntry struct {
Category string `thrift:"category,1" db:"category" json:"category"`
Message string `thrift:"message,2" db:"message" json:"message"`
}
func NewLogEntry() *LogEntry {
return &LogEntry{}
}
func (p *LogEntry) GetCategory() string {
return p.Category
}
func (p *LogEntry) GetMessage() string {
return p.Message
}
func (p *LogEntry) Read(iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
}
for {
_, fieldTypeId, fieldId, err := iprot.ReadFieldBegin()
if err != nil {
return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err)
}
if fieldTypeId == thrift.STOP { break; }
switch fieldId {
case 1:
if fieldTypeId == thrift.STRING {
if err := p.ReadField1(iprot); err != nil {
return err
}
} else {
if err := iprot.Skip(fieldTypeId); err != nil {
return err
}
}
case 2:
if fieldTypeId == thrift.STRING {
if err := p.ReadField2(iprot); err != nil {
return err
}
} else {
if err := iprot.Skip(fieldTypeId); err != nil {
return err
}
}
default:
if err := iprot.Skip(fieldTypeId); err != nil {
return err
}
}
if err := iprot.ReadFieldEnd(); err != nil {
return err
}
}
if err := iprot.ReadStructEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err)
}
return nil
}
func (p *LogEntry) ReadField1(iprot thrift.TProtocol) error {
if v, err := iprot.ReadString(); err != nil {
return thrift.PrependError("error reading field 1: ", err)
} else {
p.Category = v
}
return nil
}
func (p *LogEntry) ReadField2(iprot thrift.TProtocol) error {
if v, err := iprot.ReadString(); err != nil {
return thrift.PrependError("error reading field 2: ", err)
} else {
p.Message = v
}
return nil
}
func (p *LogEntry) Write(oprot thrift.TProtocol) error {
if err := oprot.WriteStructBegin("LogEntry"); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) }
if p != nil {
if err := p.writeField1(oprot); err != nil { return err }
if err := p.writeField2(oprot); err != nil { return err }
}
if err := oprot.WriteFieldStop(); err != nil {
return thrift.PrependError("write field stop error: ", err) }
if err := oprot.WriteStructEnd(); err != nil {
return thrift.PrependError("write struct stop error: ", err) }
return nil
}
func (p *LogEntry) writeField1(oprot thrift.TProtocol) (err error) {
if err := oprot.WriteFieldBegin("category", thrift.STRING, 1); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:category: ", p), err) }
if err := oprot.WriteString(string(p.Category)); err != nil {
return thrift.PrependError(fmt.Sprintf("%T.category (1) field write error: ", p), err) }
if err := oprot.WriteFieldEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 1:category: ", p), err) }
return err
}
func (p *LogEntry) writeField2(oprot thrift.TProtocol) (err error) {
if err := oprot.WriteFieldBegin("message", thrift.STRING, 2); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:message: ", p), err) }
if err := oprot.WriteString(string(p.Message)); err != nil {
return thrift.PrependError(fmt.Sprintf("%T.message (2) field write error: ", p), err) }
if err := oprot.WriteFieldEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 2:message: ", p), err) }
return err
}
func (p *LogEntry) String() string {
if p == nil {
return "<nil>"
}
return fmt.Sprintf("LogEntry(%+v)", *p)
}
type Scribe interface {
// Parameters:
// - Messages
Log(ctx context.Context, messages []*LogEntry) (r ResultCode, err error)
}
type ScribeClient struct {
c thrift.TClient
}
func NewScribeClientFactory(t thrift.TTransport, f thrift.TProtocolFactory) *ScribeClient {
return &ScribeClient{
c: thrift.NewTStandardClient(f.GetProtocol(t), f.GetProtocol(t)),
}
}
func NewScribeClientProtocol(t thrift.TTransport, iprot thrift.TProtocol, oprot thrift.TProtocol) *ScribeClient {
return &ScribeClient{
c: thrift.NewTStandardClient(iprot, oprot),
}
}
func NewScribeClient(c thrift.TClient) *ScribeClient {
return &ScribeClient{
c: c,
}
}
func (p *ScribeClient) Client_() thrift.TClient {
return p.c
}
// Parameters:
// - Messages
func (p *ScribeClient) Log(ctx context.Context, messages []*LogEntry) (r ResultCode, err error) {
var _args0 ScribeLogArgs
_args0.Messages = messages
var _result1 ScribeLogResult
if err = p.Client_().Call(ctx, "Log", &_args0, &_result1); err != nil {
return
}
return _result1.GetSuccess(), nil
}
type ScribeProcessor struct {
processorMap map[string]thrift.TProcessorFunction
handler Scribe
}
func (p *ScribeProcessor) AddToProcessorMap(key string, processor thrift.TProcessorFunction) {
p.processorMap[key] = processor
}
func (p *ScribeProcessor) GetProcessorFunction(key string) (processor thrift.TProcessorFunction, ok bool) {
processor, ok = p.processorMap[key]
return processor, ok
}
func (p *ScribeProcessor) ProcessorMap() map[string]thrift.TProcessorFunction {
return p.processorMap
}
func NewScribeProcessor(handler Scribe) *ScribeProcessor {
self2 := &ScribeProcessor{handler:handler, processorMap:make(map[string]thrift.TProcessorFunction)}
self2.processorMap["Log"] = &scribeProcessorLog{handler:handler}
return self2
}
func (p *ScribeProcessor) Process(ctx context.Context, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
name, _, seqId, err := iprot.ReadMessageBegin()
if err != nil { return false, err }
if processor, ok := p.GetProcessorFunction(name); ok {
return processor.Process(ctx, seqId, iprot, oprot)
}
iprot.Skip(thrift.STRUCT)
iprot.ReadMessageEnd()
x3 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function " + name)
oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId)
x3.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush(ctx)
return false, x3
}
type scribeProcessorLog struct {
handler Scribe
}
func (p *scribeProcessorLog) Process(ctx context.Context, seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
args := ScribeLogArgs{}
if err = args.Read(iprot); err != nil {
iprot.ReadMessageEnd()
x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error())
oprot.WriteMessageBegin("Log", thrift.EXCEPTION, seqId)
x.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush(ctx)
return false, err
}
iprot.ReadMessageEnd()
result := ScribeLogResult{}
var retval ResultCode
var err2 error
if retval, err2 = p.handler.Log(ctx, args.Messages); err2 != nil {
x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing Log: " + err2.Error())
oprot.WriteMessageBegin("Log", thrift.EXCEPTION, seqId)
x.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush(ctx)
return true, err2
} else {
result.Success = &retval
}
if err2 = oprot.WriteMessageBegin("Log", thrift.REPLY, seqId); err2 != nil {
err = err2
}
if err2 = result.Write(oprot); err == nil && err2 != nil {
err = err2
}
if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil {
err = err2
}
if err2 = oprot.Flush(ctx); err == nil && err2 != nil {
err = err2
}
if err != nil {
return
}
return true, err
}
// HELPER FUNCTIONS AND STRUCTURES
// Attributes:
// - Messages
type ScribeLogArgs struct {
Messages []*LogEntry `thrift:"messages,1" db:"messages" json:"messages"`
}
func NewScribeLogArgs() *ScribeLogArgs {
return &ScribeLogArgs{}
}
func (p *ScribeLogArgs) GetMessages() []*LogEntry {
return p.Messages
}
func (p *ScribeLogArgs) Read(iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
}
for {
_, fieldTypeId, fieldId, err := iprot.ReadFieldBegin()
if err != nil {
return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err)
}
if fieldTypeId == thrift.STOP { break; }
switch fieldId {
case 1:
if fieldTypeId == thrift.LIST {
if err := p.ReadField1(iprot); err != nil {
return err
}
} else {
if err := iprot.Skip(fieldTypeId); err != nil {
return err
}
}
default:
if err := iprot.Skip(fieldTypeId); err != nil {
return err
}
}
if err := iprot.ReadFieldEnd(); err != nil {
return err
}
}
if err := iprot.ReadStructEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err)
}
return nil
}
func (p *ScribeLogArgs) ReadField1(iprot thrift.TProtocol) error {
_, size, err := iprot.ReadListBegin()
if err != nil {
return thrift.PrependError("error reading list begin: ", err)
}
tSlice := make([]*LogEntry, 0, size)
p.Messages = tSlice
for i := 0; i < size; i ++ {
_elem4 := &LogEntry{}
if err := _elem4.Read(iprot); err != nil {
return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", _elem4), err)
}
p.Messages = append(p.Messages, _elem4)
}
if err := iprot.ReadListEnd(); err != nil {
return thrift.PrependError("error reading list end: ", err)
}
return nil
}
func (p *ScribeLogArgs) Write(oprot thrift.TProtocol) error {
if err := oprot.WriteStructBegin("Log_args"); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) }
if p != nil {
if err := p.writeField1(oprot); err != nil { return err }
}
if err := oprot.WriteFieldStop(); err != nil {
return thrift.PrependError("write field stop error: ", err) }
if err := oprot.WriteStructEnd(); err != nil {
return thrift.PrependError("write struct stop error: ", err) }
return nil
}
func (p *ScribeLogArgs) writeField1(oprot thrift.TProtocol) (err error) {
if err := oprot.WriteFieldBegin("messages", thrift.LIST, 1); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:messages: ", p), err) }
if err := oprot.WriteListBegin(thrift.STRUCT, len(p.Messages)); err != nil {
return thrift.PrependError("error writing list begin: ", err)
}
for _, v := range p.Messages {
if err := v.Write(oprot); err != nil {
return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", v), err)
}
}
if err := oprot.WriteListEnd(); err != nil {
return thrift.PrependError("error writing list end: ", err)
}
if err := oprot.WriteFieldEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 1:messages: ", p), err) }
return err
}
func (p *ScribeLogArgs) String() string {
if p == nil {
return "<nil>"
}
return fmt.Sprintf("ScribeLogArgs(%+v)", *p)
}
// Attributes:
// - Success
type ScribeLogResult struct {
Success *ResultCode `thrift:"success,0" db:"success" json:"success,omitempty"`
}
func NewScribeLogResult() *ScribeLogResult {
return &ScribeLogResult{}
}
var ScribeLogResult_Success_DEFAULT ResultCode
func (p *ScribeLogResult) GetSuccess() ResultCode {
if !p.IsSetSuccess() {
return ScribeLogResult_Success_DEFAULT
}
return *p.Success
}
func (p *ScribeLogResult) IsSetSuccess() bool {
return p.Success != nil
}
func (p *ScribeLogResult) Read(iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
}
for {
_, fieldTypeId, fieldId, err := iprot.ReadFieldBegin()
if err != nil {
return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err)
}
if fieldTypeId == thrift.STOP { break; }
switch fieldId {
case 0:
if fieldTypeId == thrift.I32 {
if err := p.ReadField0(iprot); err != nil {
return err
}
} else {
if err := iprot.Skip(fieldTypeId); err != nil {
return err
}
}
default:
if err := iprot.Skip(fieldTypeId); err != nil {
return err
}
}
if err := iprot.ReadFieldEnd(); err != nil {
return err
}
}
if err := iprot.ReadStructEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err)
}
return nil
}
func (p *ScribeLogResult) ReadField0(iprot thrift.TProtocol) error {
if v, err := iprot.ReadI32(); err != nil {
return thrift.PrependError("error reading field 0: ", err)
} else {
temp := ResultCode(v)
p.Success = &temp
}
return nil
}
func (p *ScribeLogResult) Write(oprot thrift.TProtocol) error {
if err := oprot.WriteStructBegin("Log_result"); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) }
if p != nil {
if err := p.writeField0(oprot); err != nil { return err }
}
if err := oprot.WriteFieldStop(); err != nil {
return thrift.PrependError("write field stop error: ", err) }
if err := oprot.WriteStructEnd(); err != nil {
return thrift.PrependError("write struct stop error: ", err) }
return nil
}
func (p *ScribeLogResult) writeField0(oprot thrift.TProtocol) (err error) {
if p.IsSetSuccess() {
if err := oprot.WriteFieldBegin("success", thrift.I32, 0); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field begin error 0:success: ", p), err) }
if err := oprot.WriteI32(int32(*p.Success)); err != nil {
return thrift.PrependError(fmt.Sprintf("%T.success (0) field write error: ", p), err) }
if err := oprot.WriteFieldEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 0:success: ", p), err) }
}
return err
}
func (p *ScribeLogResult) String() string {
if p == nil {
return "<nil>"
}
return fmt.Sprintf("ScribeLogResult(%+v)", *p)
}

View File

@@ -0,0 +1,7 @@
// Autogenerated by Thrift Compiler (1.0.0-dev)
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
package zipkincore
var GoUnusedProtection__ int;

View File

@@ -1,10 +1,12 @@
// Autogenerated by Thrift Compiler (0.9.3)
// Autogenerated by Thrift Compiler (1.0.0-dev)
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
package zipkincore
import (
"bytes"
"context"
"reflect"
"fmt"
"github.com/apache/thrift/lib/go/thrift"
)
@@ -12,6 +14,8 @@ import (
// (needed to ensure safety because of naive import list construction.)
var _ = thrift.ZERO
var _ = fmt.Printf
var _ = context.Background
var _ = reflect.DeepEqual
var _ = bytes.Equal
const CLIENT_SEND = "cs"
@@ -38,3 +42,4 @@ const SERVER_ADDR = "sa"
func init() {
}

File diff suppressed because it is too large Load Diff

View File

@@ -7,8 +7,8 @@ import (
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/openzipkin/zipkin-go-opentracing/flag"
otobserver "github.com/opentracing-contrib/go-observer"
"github.com/openzipkin-contrib/zipkin-go-opentracing/flag"
)
// ErrInvalidEndpoint will be thrown if hostPort parameter is corrupted or host

View File

@@ -27,11 +27,9 @@ func TraceIDFromHex(h string) (t TraceID, err error) {
// ToHex outputs the 128-bit traceID as hex string.
func (t TraceID) ToHex() string {
if t.High == 0 {
return strconv.FormatUint(t.Low, 16)
return fmt.Sprintf("%016x", t.Low)
}
return fmt.Sprintf(
"%016s%016s", strconv.FormatUint(t.High, 16), strconv.FormatUint(t.Low, 16),
)
return fmt.Sprintf("%016x%016x", t.High, t.Low)
}
// Empty returns if TraceID has zero value

View File

@@ -1,8 +1,8 @@
package wire
import (
"github.com/openzipkin/zipkin-go-opentracing/flag"
"github.com/openzipkin/zipkin-go-opentracing/types"
"github.com/openzipkin-contrib/zipkin-go-opentracing/flag"
"github.com/openzipkin-contrib/zipkin-go-opentracing/types"
)
// ProtobufCarrier is a DelegatingCarrier that uses protocol buffers as the

View File

@@ -1,6 +1,6 @@
package wire
//go:generate protoc --gogofaster_out=$GOPATH/src/github.com/openzipkin/zipkin-go-opentracing/wire wire.proto
//go:generate protoc --gogofaster_out=$GOPATH/src wire.proto
// Run `go get github.com/gogo/protobuf/protoc-gen-gogofaster` to install the
// gogofaster generator binary.

View File

@@ -624,24 +624,26 @@ var (
func init() { proto.RegisterFile("wire.proto", fileDescriptorWire) }
var fileDescriptorWire = []byte{
// 300 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2a, 0xcf, 0x2c, 0x4a,
0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x12, 0xa9, 0xca, 0x2c, 0xc8, 0xce, 0xcc, 0x2b, 0x29,
0x4a, 0x4c, 0x4e, 0x2d, 0x8a, 0x4f, 0xcf, 0xd7, 0x03, 0xc9, 0x29, 0x5d, 0x63, 0xe2, 0xe2, 0x0e,
0x01, 0x0b, 0x05, 0x97, 0x24, 0x96, 0xa4, 0x0a, 0x49, 0x72, 0x71, 0x80, 0x55, 0xc4, 0x67, 0xa6,
0x48, 0x30, 0x2a, 0x30, 0x6a, 0xb0, 0x05, 0xb1, 0x83, 0xf9, 0x9e, 0x29, 0x42, 0xe2, 0x5c, 0xec,
0xc5, 0x05, 0x89, 0x79, 0x20, 0x19, 0x26, 0xb0, 0x0c, 0x1b, 0x88, 0xeb, 0x99, 0x22, 0x24, 0xc1,
0xc5, 0x5e, 0x9c, 0x98, 0x5b, 0x90, 0x93, 0x9a, 0x22, 0xc1, 0xac, 0xc0, 0xa8, 0xc1, 0x11, 0x04,
0xe3, 0x0a, 0x45, 0x70, 0xf1, 0x26, 0x25, 0xa6, 0xa7, 0x27, 0xa6, 0xa7, 0xc6, 0x67, 0x96, 0xa4,
0xe6, 0x16, 0x4b, 0xb0, 0x28, 0x30, 0x6b, 0x70, 0x1b, 0x19, 0xeb, 0x61, 0x73, 0x8b, 0x1e, 0x92,
0x3b, 0xf4, 0x9c, 0x20, 0xda, 0x3c, 0x41, 0xba, 0x5c, 0xf3, 0x4a, 0x8a, 0x2a, 0x83, 0x78, 0x92,
0x90, 0x84, 0x84, 0x94, 0xb8, 0x78, 0x61, 0xee, 0x8c, 0xcf, 0xc8, 0x4c, 0xcf, 0x90, 0x10, 0x01,
0x3b, 0x89, 0x1b, 0xea, 0x58, 0x8f, 0xcc, 0xf4, 0x0c, 0x21, 0x15, 0x2e, 0xbe, 0x82, 0xc4, 0xa2,
0xd4, 0xbc, 0x92, 0x78, 0x98, 0xbb, 0x45, 0xc1, 0x8a, 0x78, 0x20, 0xa2, 0xc1, 0x10, 0xd7, 0x8b,
0x70, 0xb1, 0xa6, 0xe5, 0x24, 0xa6, 0x17, 0x4b, 0x88, 0x81, 0x25, 0x21, 0x1c, 0x29, 0x7b, 0x2e,
0x41, 0x0c, 0x27, 0x08, 0x09, 0x70, 0x31, 0x67, 0xa7, 0x56, 0x82, 0xc3, 0x85, 0x33, 0x08, 0xc4,
0x04, 0x69, 0x2e, 0x4b, 0xcc, 0x29, 0x4d, 0x05, 0x87, 0x08, 0x67, 0x10, 0x84, 0x63, 0xc5, 0x64,
0xc1, 0xe8, 0x24, 0x76, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9, 0x31,
0x4e, 0x78, 0x2c, 0xc7, 0x10, 0xc5, 0x02, 0xf2, 0x64, 0x12, 0x1b, 0x38, 0x36, 0x8c, 0x01, 0x01,
0x00, 0x00, 0xff, 0xff, 0xb5, 0x5e, 0x0d, 0x33, 0x9b, 0x01, 0x00, 0x00,
// 325 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x6c, 0x91, 0xcf, 0x4a, 0x03, 0x31,
0x18, 0xc4, 0x4d, 0xab, 0xfd, 0xf3, 0xb5, 0x15, 0x0d, 0x55, 0x57, 0x0f, 0x4b, 0x29, 0x1e, 0x7a,
0xe9, 0x56, 0xec, 0x45, 0xbc, 0x08, 0x05, 0xc1, 0x5e, 0xb7, 0x1e, 0xc4, 0xcb, 0x92, 0xed, 0xc6,
0x6c, 0x68, 0x9b, 0x84, 0x6c, 0xaa, 0xd4, 0xa7, 0xf0, 0xb1, 0x3c, 0x7a, 0xf2, 0x2c, 0xf5, 0x45,
0x24, 0x89, 0x85, 0x82, 0x9e, 0x76, 0x67, 0xe6, 0x1b, 0xf8, 0x31, 0x01, 0x78, 0xe1, 0x9a, 0x46,
0x4a, 0x4b, 0x23, 0x71, 0xfb, 0x95, 0xab, 0x19, 0x17, 0x46, 0x93, 0x29, 0xd5, 0x09, 0x93, 0x91,
0xcd, 0xba, 0x9f, 0x25, 0x68, 0xdc, 0x3b, 0x6b, 0x62, 0x88, 0xa1, 0xf8, 0x14, 0x6a, 0xee, 0x22,
0xe1, 0x59, 0x80, 0x3a, 0xa8, 0x57, 0x89, 0xab, 0x4e, 0x8f, 0x33, 0x7c, 0x02, 0xd5, 0x42, 0x11,
0x61, 0x93, 0x92, 0x4b, 0x2a, 0x56, 0x8e, 0x33, 0x1c, 0x40, 0xb5, 0x20, 0x0b, 0x35, 0xa7, 0x59,
0x50, 0xee, 0xa0, 0x5e, 0x2d, 0xde, 0x48, 0xfc, 0x00, 0xad, 0x94, 0x30, 0x46, 0x18, 0x4d, 0xb8,
0xa1, 0x8b, 0x22, 0xd8, 0xed, 0x94, 0x7b, 0x8d, 0xcb, 0x61, 0xf4, 0x1f, 0x4b, 0xb4, 0xc5, 0x11,
0x8d, 0x7c, 0x6d, 0x6c, 0x5b, 0xb7, 0xc2, 0xe8, 0x55, 0xdc, 0x4c, 0xb7, 0x2c, 0xdc, 0x85, 0xd6,
0x86, 0x33, 0xc9, 0x39, 0xcb, 0x83, 0xb6, 0x43, 0x6a, 0xfc, 0xc2, 0xde, 0x71, 0x96, 0xe3, 0x73,
0xd8, 0x57, 0x44, 0x53, 0x61, 0x92, 0x0d, 0xf7, 0x91, 0x3b, 0x6a, 0x7a, 0x77, 0xe2, 0xe9, 0xdb,
0xb0, 0xf7, 0x34, 0x27, 0xac, 0x08, 0x8e, 0x5d, 0xe8, 0xc5, 0xd9, 0x0d, 0x1c, 0xfe, 0x41, 0xc0,
0x07, 0x50, 0x9e, 0xd1, 0x95, 0xdb, 0xa5, 0x1e, 0xdb, 0x5f, 0x5b, 0x7e, 0x26, 0xf3, 0x25, 0x75,
0x8b, 0xd4, 0x63, 0x2f, 0xae, 0x4b, 0x57, 0x68, 0x34, 0x7a, 0x5f, 0x87, 0xe8, 0x63, 0x1d, 0xa2,
0xaf, 0x75, 0x88, 0xde, 0xbe, 0xc3, 0x9d, 0xc7, 0x0b, 0xc6, 0x4d, 0xbe, 0x4c, 0xa3, 0xa9, 0x5c,
0x0c, 0xa4, 0xa2, 0xc2, 0x6f, 0x30, 0xf0, 0x9f, 0x3e, 0x93, 0x7d, 0x6b, 0x5a, 0x7e, 0x2e, 0xd8,
0xc0, 0x0e, 0x92, 0x56, 0xdc, 0xcb, 0x0d, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x3a, 0xab, 0xcc,
0x6b, 0xc7, 0x01, 0x00, 0x00,
}

View File

@@ -6,7 +6,7 @@ import (
"strconv"
"strings"
"github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
"github.com/openzipkin-contrib/zipkin-go-opentracing/thrift/gen-go/zipkincore"
)
// makeEndpoint takes the hostport and service name that represent this Zipkin

View File

@@ -3,7 +3,6 @@ package zipkintracer
import (
"encoding/binary"
"fmt"
"math"
"net"
"strconv"
"time"
@@ -11,8 +10,8 @@ import (
otext "github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
"github.com/openzipkin/zipkin-go-opentracing/flag"
"github.com/openzipkin-contrib/zipkin-go-opentracing/flag"
"github.com/openzipkin-contrib/zipkin-go-opentracing/thrift/gen-go/zipkincore"
)
var (
@@ -162,6 +161,7 @@ func (r *Recorder) RecordSpan(sp RawSpan) {
default:
annotateBinary(span, zipkincore.LOCAL_COMPONENT, r.endpoint.GetServiceName(), r.endpoint)
}
delete(sp.Tags, string(otext.SpanKind))
} else {
annotateBinary(span, zipkincore.LOCAL_COMPONENT, r.endpoint.GetServiceName(), r.endpoint)
}
@@ -202,86 +202,17 @@ func annotate(span *zipkincore.Span, timestamp time.Time, value string, host *zi
// annotateBinary annotates the span with a key and a value that will be []byte
// encoded.
func annotateBinary(span *zipkincore.Span, key string, value interface{}, host *zipkincore.Endpoint) {
var a zipkincore.AnnotationType
var b []byte
// We are not using zipkincore.AnnotationType_I16 for types that could fit
// as reporting on it seems to be broken on the zipkin web interface
// (however, we can properly extract the number from zipkin storage
// directly). int64 has issues with negative numbers but seems ok for
// positive numbers needing more than 32 bit.
switch v := value.(type) {
case bool:
a = zipkincore.AnnotationType_BOOL
b = []byte("\x00")
if v {
b = []byte("\x01")
if b, ok := value.(bool); ok {
if b {
value = "true"
} else {
value = "false"
}
case []byte:
a = zipkincore.AnnotationType_BYTES
b = v
case byte:
a = zipkincore.AnnotationType_I32
b = make([]byte, 4)
binary.BigEndian.PutUint32(b, uint32(v))
case int8:
a = zipkincore.AnnotationType_I32
b = make([]byte, 4)
binary.BigEndian.PutUint32(b, uint32(v))
case int16:
a = zipkincore.AnnotationType_I32
b = make([]byte, 4)
binary.BigEndian.PutUint32(b, uint32(v))
case uint16:
a = zipkincore.AnnotationType_I32
b = make([]byte, 4)
binary.BigEndian.PutUint32(b, uint32(v))
case int32:
a = zipkincore.AnnotationType_I32
b = make([]byte, 4)
binary.BigEndian.PutUint32(b, uint32(v))
case uint32:
a = zipkincore.AnnotationType_I32
b = make([]byte, 4)
binary.BigEndian.PutUint32(b, v)
case int64:
a = zipkincore.AnnotationType_I64
b = make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(v))
case int:
a = zipkincore.AnnotationType_I32
b = make([]byte, 8)
binary.BigEndian.PutUint32(b, uint32(v))
case uint:
a = zipkincore.AnnotationType_I32
b = make([]byte, 8)
binary.BigEndian.PutUint32(b, uint32(v))
case uint64:
a = zipkincore.AnnotationType_I64
b = make([]byte, 8)
binary.BigEndian.PutUint64(b, v)
case float32:
a = zipkincore.AnnotationType_DOUBLE
b = make([]byte, 8)
bits := math.Float64bits(float64(v))
binary.BigEndian.PutUint64(b, bits)
case float64:
a = zipkincore.AnnotationType_DOUBLE
b = make([]byte, 8)
bits := math.Float64bits(v)
binary.BigEndian.PutUint64(b, bits)
case string:
a = zipkincore.AnnotationType_STRING
b = []byte(v)
default:
// we have no handler for type's value, but let's get a string
// representation of it.
a = zipkincore.AnnotationType_STRING
b = []byte(fmt.Sprintf("%+v", value))
}
span.BinaryAnnotations = append(span.BinaryAnnotations, &zipkincore.BinaryAnnotation{
Key: key,
Value: b,
AnnotationType: a,
Value: []byte(fmt.Sprintf("%+v", value)),
AnnotationType: zipkincore.AnnotationType_STRING,
Host: host,
})
}

View File

@@ -1,431 +0,0 @@
// Autogenerated by Thrift Compiler (0.9.3)
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
package scribe
import (
"bytes"
"fmt"
"github.com/apache/thrift/lib/go/thrift"
)
// (needed to ensure safety because of naive import list construction.)
var _ = thrift.ZERO
var _ = fmt.Printf
var _ = bytes.Equal
type Scribe interface {
// Parameters:
// - Messages
Log(messages []*LogEntry) (r ResultCode, err error)
}
type ScribeClient struct {
Transport thrift.TTransport
ProtocolFactory thrift.TProtocolFactory
InputProtocol thrift.TProtocol
OutputProtocol thrift.TProtocol
SeqId int32
}
func NewScribeClientFactory(t thrift.TTransport, f thrift.TProtocolFactory) *ScribeClient {
return &ScribeClient{Transport: t,
ProtocolFactory: f,
InputProtocol: f.GetProtocol(t),
OutputProtocol: f.GetProtocol(t),
SeqId: 0,
}
}
func NewScribeClientProtocol(t thrift.TTransport, iprot thrift.TProtocol, oprot thrift.TProtocol) *ScribeClient {
return &ScribeClient{Transport: t,
ProtocolFactory: nil,
InputProtocol: iprot,
OutputProtocol: oprot,
SeqId: 0,
}
}
// Parameters:
// - Messages
func (p *ScribeClient) Log(messages []*LogEntry) (r ResultCode, err error) {
if err = p.sendLog(messages); err != nil {
return
}
return p.recvLog()
}
func (p *ScribeClient) sendLog(messages []*LogEntry) (err error) {
oprot := p.OutputProtocol
if oprot == nil {
oprot = p.ProtocolFactory.GetProtocol(p.Transport)
p.OutputProtocol = oprot
}
p.SeqId++
if err = oprot.WriteMessageBegin("Log", thrift.CALL, p.SeqId); err != nil {
return
}
args := ScribeLogArgs{
Messages: messages,
}
if err = args.Write(oprot); err != nil {
return
}
if err = oprot.WriteMessageEnd(); err != nil {
return
}
return oprot.Flush()
}
func (p *ScribeClient) recvLog() (value ResultCode, err error) {
iprot := p.InputProtocol
if iprot == nil {
iprot = p.ProtocolFactory.GetProtocol(p.Transport)
p.InputProtocol = iprot
}
method, mTypeId, seqId, err := iprot.ReadMessageBegin()
if err != nil {
return
}
if method != "Log" {
err = thrift.NewTApplicationException(thrift.WRONG_METHOD_NAME, "Log failed: wrong method name")
return
}
if p.SeqId != seqId {
err = thrift.NewTApplicationException(thrift.BAD_SEQUENCE_ID, "Log failed: out of sequence response")
return
}
if mTypeId == thrift.EXCEPTION {
error0 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception")
var error1 error
error1, err = error0.Read(iprot)
if err != nil {
return
}
if err = iprot.ReadMessageEnd(); err != nil {
return
}
err = error1
return
}
if mTypeId != thrift.REPLY {
err = thrift.NewTApplicationException(thrift.INVALID_MESSAGE_TYPE_EXCEPTION, "Log failed: invalid message type")
return
}
result := ScribeLogResult{}
if err = result.Read(iprot); err != nil {
return
}
if err = iprot.ReadMessageEnd(); err != nil {
return
}
value = result.GetSuccess()
return
}
type ScribeProcessor struct {
processorMap map[string]thrift.TProcessorFunction
handler Scribe
}
func (p *ScribeProcessor) AddToProcessorMap(key string, processor thrift.TProcessorFunction) {
p.processorMap[key] = processor
}
func (p *ScribeProcessor) GetProcessorFunction(key string) (processor thrift.TProcessorFunction, ok bool) {
processor, ok = p.processorMap[key]
return processor, ok
}
func (p *ScribeProcessor) ProcessorMap() map[string]thrift.TProcessorFunction {
return p.processorMap
}
func NewScribeProcessor(handler Scribe) *ScribeProcessor {
self2 := &ScribeProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)}
self2.processorMap["Log"] = &scribeProcessorLog{handler: handler}
return self2
}
func (p *ScribeProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
name, _, seqId, err := iprot.ReadMessageBegin()
if err != nil {
return false, err
}
if processor, ok := p.GetProcessorFunction(name); ok {
return processor.Process(seqId, iprot, oprot)
}
iprot.Skip(thrift.STRUCT)
iprot.ReadMessageEnd()
x3 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name)
oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId)
x3.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush()
return false, x3
}
type scribeProcessorLog struct {
handler Scribe
}
func (p *scribeProcessorLog) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
args := ScribeLogArgs{}
if err = args.Read(iprot); err != nil {
iprot.ReadMessageEnd()
x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error())
oprot.WriteMessageBegin("Log", thrift.EXCEPTION, seqId)
x.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush()
return false, err
}
iprot.ReadMessageEnd()
result := ScribeLogResult{}
var retval ResultCode
var err2 error
if retval, err2 = p.handler.Log(args.Messages); err2 != nil {
x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing Log: "+err2.Error())
oprot.WriteMessageBegin("Log", thrift.EXCEPTION, seqId)
x.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush()
return true, err2
} else {
result.Success = &retval
}
if err2 = oprot.WriteMessageBegin("Log", thrift.REPLY, seqId); err2 != nil {
err = err2
}
if err2 = result.Write(oprot); err == nil && err2 != nil {
err = err2
}
if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil {
err = err2
}
if err2 = oprot.Flush(); err == nil && err2 != nil {
err = err2
}
if err != nil {
return
}
return true, err
}
// HELPER FUNCTIONS AND STRUCTURES
// Attributes:
// - Messages
type ScribeLogArgs struct {
Messages []*LogEntry `thrift:"messages,1" json:"messages"`
}
func NewScribeLogArgs() *ScribeLogArgs {
return &ScribeLogArgs{}
}
func (p *ScribeLogArgs) GetMessages() []*LogEntry {
return p.Messages
}
func (p *ScribeLogArgs) Read(iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
}
for {
_, fieldTypeId, fieldId, err := iprot.ReadFieldBegin()
if err != nil {
return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err)
}
if fieldTypeId == thrift.STOP {
break
}
switch fieldId {
case 1:
if err := p.readField1(iprot); err != nil {
return err
}
default:
if err := iprot.Skip(fieldTypeId); err != nil {
return err
}
}
if err := iprot.ReadFieldEnd(); err != nil {
return err
}
}
if err := iprot.ReadStructEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err)
}
return nil
}
func (p *ScribeLogArgs) readField1(iprot thrift.TProtocol) error {
_, size, err := iprot.ReadListBegin()
if err != nil {
return thrift.PrependError("error reading list begin: ", err)
}
tSlice := make([]*LogEntry, 0, size)
p.Messages = tSlice
for i := 0; i < size; i++ {
_elem4 := &LogEntry{}
if err := _elem4.Read(iprot); err != nil {
return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", _elem4), err)
}
p.Messages = append(p.Messages, _elem4)
}
if err := iprot.ReadListEnd(); err != nil {
return thrift.PrependError("error reading list end: ", err)
}
return nil
}
func (p *ScribeLogArgs) Write(oprot thrift.TProtocol) error {
if err := oprot.WriteStructBegin("Log_args"); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
}
if err := p.writeField1(oprot); err != nil {
return err
}
if err := oprot.WriteFieldStop(); err != nil {
return thrift.PrependError("write field stop error: ", err)
}
if err := oprot.WriteStructEnd(); err != nil {
return thrift.PrependError("write struct stop error: ", err)
}
return nil
}
func (p *ScribeLogArgs) writeField1(oprot thrift.TProtocol) (err error) {
if err := oprot.WriteFieldBegin("messages", thrift.LIST, 1); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:messages: ", p), err)
}
if err := oprot.WriteListBegin(thrift.STRUCT, len(p.Messages)); err != nil {
return thrift.PrependError("error writing list begin: ", err)
}
for _, v := range p.Messages {
if err := v.Write(oprot); err != nil {
return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", v), err)
}
}
if err := oprot.WriteListEnd(); err != nil {
return thrift.PrependError("error writing list end: ", err)
}
if err := oprot.WriteFieldEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 1:messages: ", p), err)
}
return err
}
func (p *ScribeLogArgs) String() string {
if p == nil {
return "<nil>"
}
return fmt.Sprintf("ScribeLogArgs(%+v)", *p)
}
// Attributes:
// - Success
type ScribeLogResult struct {
Success *ResultCode `thrift:"success,0" json:"success,omitempty"`
}
func NewScribeLogResult() *ScribeLogResult {
return &ScribeLogResult{}
}
var ScribeLogResult_Success_DEFAULT ResultCode
func (p *ScribeLogResult) GetSuccess() ResultCode {
if !p.IsSetSuccess() {
return ScribeLogResult_Success_DEFAULT
}
return *p.Success
}
func (p *ScribeLogResult) IsSetSuccess() bool {
return p.Success != nil
}
func (p *ScribeLogResult) Read(iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
}
for {
_, fieldTypeId, fieldId, err := iprot.ReadFieldBegin()
if err != nil {
return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err)
}
if fieldTypeId == thrift.STOP {
break
}
switch fieldId {
case 0:
if err := p.readField0(iprot); err != nil {
return err
}
default:
if err := iprot.Skip(fieldTypeId); err != nil {
return err
}
}
if err := iprot.ReadFieldEnd(); err != nil {
return err
}
}
if err := iprot.ReadStructEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err)
}
return nil
}
func (p *ScribeLogResult) readField0(iprot thrift.TProtocol) error {
if v, err := iprot.ReadI32(); err != nil {
return thrift.PrependError("error reading field 0: ", err)
} else {
temp := ResultCode(v)
p.Success = &temp
}
return nil
}
func (p *ScribeLogResult) Write(oprot thrift.TProtocol) error {
if err := oprot.WriteStructBegin("Log_result"); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
}
if err := p.writeField0(oprot); err != nil {
return err
}
if err := oprot.WriteFieldStop(); err != nil {
return thrift.PrependError("write field stop error: ", err)
}
if err := oprot.WriteStructEnd(); err != nil {
return thrift.PrependError("write struct stop error: ", err)
}
return nil
}
func (p *ScribeLogResult) writeField0(oprot thrift.TProtocol) (err error) {
if p.IsSetSuccess() {
if err := oprot.WriteFieldBegin("success", thrift.I32, 0); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field begin error 0:success: ", p), err)
}
if err := oprot.WriteI32(int32(*p.Success)); err != nil {
return thrift.PrependError(fmt.Sprintf("%T.success (0) field write error: ", p), err)
}
if err := oprot.WriteFieldEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 0:success: ", p), err)
}
}
return err
}
func (p *ScribeLogResult) String() string {
if p == nil {
return "<nil>"
}
return fmt.Sprintf("ScribeLogResult(%+v)", *p)
}

View File

@@ -1,185 +0,0 @@
// Autogenerated by Thrift Compiler (0.9.3)
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
package scribe
import (
"bytes"
"fmt"
"github.com/apache/thrift/lib/go/thrift"
)
// (needed to ensure safety because of naive import list construction.)
var _ = thrift.ZERO
var _ = fmt.Printf
var _ = bytes.Equal
var GoUnusedProtection__ int
type ResultCode int64
const (
ResultCode_OK ResultCode = 0
ResultCode_TRY_LATER ResultCode = 1
)
func (p ResultCode) String() string {
switch p {
case ResultCode_OK:
return "OK"
case ResultCode_TRY_LATER:
return "TRY_LATER"
}
return "<UNSET>"
}
func ResultCodeFromString(s string) (ResultCode, error) {
switch s {
case "OK":
return ResultCode_OK, nil
case "TRY_LATER":
return ResultCode_TRY_LATER, nil
}
return ResultCode(0), fmt.Errorf("not a valid ResultCode string")
}
func ResultCodePtr(v ResultCode) *ResultCode { return &v }
func (p ResultCode) MarshalText() ([]byte, error) {
return []byte(p.String()), nil
}
func (p *ResultCode) UnmarshalText(text []byte) error {
q, err := ResultCodeFromString(string(text))
if err != nil {
return err
}
*p = q
return nil
}
// Attributes:
// - Category
// - Message
type LogEntry struct {
Category string `thrift:"category,1" json:"category"`
Message string `thrift:"message,2" json:"message"`
}
func NewLogEntry() *LogEntry {
return &LogEntry{}
}
func (p *LogEntry) GetCategory() string {
return p.Category
}
func (p *LogEntry) GetMessage() string {
return p.Message
}
func (p *LogEntry) Read(iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
}
for {
_, fieldTypeId, fieldId, err := iprot.ReadFieldBegin()
if err != nil {
return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err)
}
if fieldTypeId == thrift.STOP {
break
}
switch fieldId {
case 1:
if err := p.readField1(iprot); err != nil {
return err
}
case 2:
if err := p.readField2(iprot); err != nil {
return err
}
default:
if err := iprot.Skip(fieldTypeId); err != nil {
return err
}
}
if err := iprot.ReadFieldEnd(); err != nil {
return err
}
}
if err := iprot.ReadStructEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err)
}
return nil
}
func (p *LogEntry) readField1(iprot thrift.TProtocol) error {
if v, err := iprot.ReadString(); err != nil {
return thrift.PrependError("error reading field 1: ", err)
} else {
p.Category = v
}
return nil
}
func (p *LogEntry) readField2(iprot thrift.TProtocol) error {
if v, err := iprot.ReadString(); err != nil {
return thrift.PrependError("error reading field 2: ", err)
} else {
p.Message = v
}
return nil
}
func (p *LogEntry) Write(oprot thrift.TProtocol) error {
if err := oprot.WriteStructBegin("LogEntry"); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
}
if err := p.writeField1(oprot); err != nil {
return err
}
if err := p.writeField2(oprot); err != nil {
return err
}
if err := oprot.WriteFieldStop(); err != nil {
return thrift.PrependError("write field stop error: ", err)
}
if err := oprot.WriteStructEnd(); err != nil {
return thrift.PrependError("write struct stop error: ", err)
}
return nil
}
func (p *LogEntry) writeField1(oprot thrift.TProtocol) (err error) {
if err := oprot.WriteFieldBegin("category", thrift.STRING, 1); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:category: ", p), err)
}
if err := oprot.WriteString(string(p.Category)); err != nil {
return thrift.PrependError(fmt.Sprintf("%T.category (1) field write error: ", p), err)
}
if err := oprot.WriteFieldEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 1:category: ", p), err)
}
return err
}
func (p *LogEntry) writeField2(oprot thrift.TProtocol) (err error) {
if err := oprot.WriteFieldBegin("message", thrift.STRING, 2); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:message: ", p), err)
}
if err := oprot.WriteString(string(p.Message)); err != nil {
return thrift.PrependError(fmt.Sprintf("%T.message (2) field write error: ", p), err)
}
if err := oprot.WriteFieldEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 2:message: ", p), err)
}
return err
}
func (p *LogEntry) String() string {
if p == nil {
return "<nil>"
}
return fmt.Sprintf("LogEntry(%+v)", *p)
}

File diff suppressed because it is too large Load Diff