From ec1b611781756b50cb3d14bf1a9e3374e67fd1e6 Mon Sep 17 00:00:00 2001 From: Roman Vanicek Date: Wed, 3 Apr 2024 00:46:52 +0200 Subject: [PATCH] Initial version of pg_autoconfig --- .gitignore | 1 + Dockerfile | 12 +++ go.mod | 7 ++ go.sum | 16 ++++ main.go | 274 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 310 insertions(+) create mode 100644 .gitignore create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7f3e5ce --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +pg_autoconfig \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 1859689..9e90a72 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,16 @@ +FROM golang:1.21-bullseye AS builder + +WORKDIR /usr/local/src/pg_autoconfig +COPY go.* ./ +COPY *.go ./ +RUN go mod download + +RUN mkdir bin/ && go build -o bin/ ./... + FROM postgis/postgis:16-master + +COPY --from=builder /usr/local/src/pg_autoconfig/bin/pg_autoconfig / + RUN localedef -i cs_CZ -c -f UTF-8 -A /usr/share/locale/locale.alias cz_CZ.UTF-8 && \ apt-get update && apt-get install -y postgresql-16-auto-failover pg-auto-failover-cli && rm -rf /var/lib/apt/lists/* diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..58e0eaa --- /dev/null +++ b/go.mod @@ -0,0 +1,7 @@ +module git.ivasoft.cz/sw/pg_autoconfig + +go 1.21 + +require github.com/sirupsen/logrus v1.9.0 + +require golang.org/x/sys v0.15.0 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..ddaaa62 --- /dev/null +++ b/go.sum @@ -0,0 +1,16 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= +github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..784dd78 --- /dev/null +++ b/main.go @@ -0,0 +1,274 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "flag" + "net" + "net/http" + "os" + "os/exec" + "time" + + log "github.com/sirupsen/logrus" +) + +const version = "1.0" +const GroupStatePrimary = "primary" +const GroupStatePrimaryAlone = "wait-primary" +const GroupStateStandby = "secondary" +const DefaultFormation = "default" +const DefaultSocketPath = "/var/run/pg_autoconfig.sock" + +type postgresInstance struct { + // Locally available info + NodeId int `json:"node_id"` + GroupId int `json:"group_id"` + NodeHost string `json:"nodehost"` + NodeName string `json:"nodename"` + NodePort int `json:"nodeport"` + CurrentState string `json:"current_group_state"` + AssignedState string `json:"assigned_group_state"` + + // Monitor provided extended info + Health int `json:"health"` + NodeCluster string `json:"nodecluster"` + ReportedLsn string `json:"reported_lsn"` + ReportedTli int `json:"reported_tli"` + FormationKind string `json:"formation_kind"` + CandidatePriority int `json:"candidate_priority"` + ReplicationQuorum bool `json:"replication_quorum"` +} + +type configFiles struct { + Config string `json:"config"` + State string `json:"state"` + Init string `json:"init"` + Pid string `json:"pid"` +} + +type IConfiguration interface { + configure(nodes []postgresInstance) ([]byte, error) +} + +type pgpoolConfiguration struct { +} + +type pgpoolConfigMessage struct { + Instances []pgpoolInstance +} + +type pgpoolInstance struct { + IpAddress net.IP + Port int + HostName string + Weight float32 + IsReadOnly bool +} + +func main() { + flag.Parse() + + // Logging + var logLevelS string + if logLevelS = os.Getenv("AUTOCONFIG_LOG_LEVEL"); logLevelS == "" { + logLevelS = "info" + } + + logLevel, err := log.ParseLevel(logLevelS) + if err != nil { + log.WithError(err).Fatal("Failed to parse log level") + } + log.SetLevel(logLevel) + + log.Info("Starting pg_auto_failover load balancer configurator, version " + version) + + nextIsType := false + var config IConfiguration + nextIsSocket := false + socketPath := DefaultSocketPath + nextIsFormation := false + formation := DefaultFormation + var argsLeft []string + for i, j := range os.Args { + if nextIsType { + nextIsType = false + switch j { + case "pgpool": + config = &pgpoolConfiguration{} + default: + log.WithFields(log.Fields{"value": j}).Fatal("Unknown configuration type") + } + } else if nextIsSocket { + nextIsSocket = false + socketPath = j + } else if nextIsFormation { + nextIsFormation = false + formation = j + } else if j == "--" { + argsLeft = os.Args[i+1:] + } else { + switch j { + case "--type": + nextIsType = true + case "--socket": + nextIsSocket = true + case "--formation": + nextIsFormation = true + default: + log.WithFields(log.Fields{"value": j}).Fatal("Unknown command switch") + } + } + } + + if config == nil { + log.Fatal("No configuration type specified. Use --type ") + } + if len(argsLeft) == 0 { + log.Fatal("No inner command found to execute. Use -- to separate the inner executable and its args") + } + + // Run the auto configuration + doComplete := false + go func() { + // Wait for the inner process to start + time.Sleep(10 * time.Second) + + // State + var instances []postgresInstance + monitorIsDown := false + var lastSentStateJson []byte + + // Determine if we are a monitor + cmdIsMonitor := exec.Command("/usr/bin/pg_autoctl", "show", "uri", "--json") + cmdIsMonitor.Env = os.Environ() + var isMonitor bool + if configsB, err := cmdIsMonitor.Output(); err != nil { + log.WithError(err).Warn("Failed to detect if we are a monitor node.") + isMonitor = false + } else { + var configs *configFiles + if err := json.Unmarshal(configsB, &configs); err != nil { + log.WithError(err).Warn("Failed to parse monitor node detection") + isMonitor = false + } else { + isMonitor = configs.State == "" + log.WithFields(log.Fields{"isMonitor": isMonitor}).Info("Determined if this is a monitor") + } + } + + httpc := http.Client{ + Transport: &http.Transport{ + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", socketPath) + }, + }, + } + + for !doComplete { + cmdFormationState := exec.Command("/usr/bin/pg_autoctl", "show", "state", "--formation", formation, "--json") + cmdFormationState.Env = os.Environ() + if localStateB, err := cmdFormationState.Output(); err != nil { + if !monitorIsDown { + log.WithError(err).Warn("Failed to obtain the formation state from pg_autoctl. Monitor is probably down.") + monitorIsDown = true + } + } else { + if monitorIsDown { + log.Warn("Monitor node is no longer down.") + monitorIsDown = false + } + + var formationState []postgresInstance + if err := json.Unmarshal(localStateB, &formationState); err != nil { + log.WithError(err).Warn("Failed to parse the local state from pg_autoctl") + } else { + instances = formationState + } + } + + if monitorIsDown && !isMonitor && len(instances) == 0 { + // Try to obtain info about the current node. That way we may still be able to operate + // in a read-only mode or with luck even in read-write mode. + cmdLocalState := exec.Command("/usr/bin/pg_autoctl", "show", "state", "--local", "--json") + cmdLocalState.Env = os.Environ() + if localStateB, err := cmdLocalState.Output(); err != nil { + log.WithError(err).Warn("Failed to obtain the local state from pg_autoctl") + } else { + var localState postgresInstance + if err := json.Unmarshal(localStateB, &localState); err != nil { + log.WithError(err).Warn("Failed to parse the local state from pg_autoctl") + } else { + instances = []postgresInstance{localState} + } + } + } + + // Keep maintaining the last known primary/standby configuration if monitor is down + // Note: We do keep producing configuration files as some volatile values (like + // IP addresses) still may change + if !monitorIsDown || len(instances) != 0 { + if newState, err := config.configure(instances); err != nil { + log.WithFields(log.Fields{"instances": instances}).Warn("Failed to produce configuration from instances.") + } else if !bytes.Equal(newState, lastSentStateJson) { + // Send the new state + lastSentStateJson = newState + if response, err := httpc.Post("http://unix/config", "application/octet-stream", bytes.NewReader(newState)); err != nil { + log.WithError(err).WithFields(log.Fields{"response": response}).Warn("Failed to send configuration to the peer process.") + } + } + } + + time.Sleep(1000 * time.Millisecond) + } + }() + + // Start the inner executable (usually starting with "pg_autoctl create postgres" or "pg_autoctl create monitor") + log.WithFields(log.Fields{"name": argsLeft[0], "args": argsLeft[1:]}).Info("Handling over to inner process.") + cmd := exec.Command(argsLeft[0], argsLeft[1:]...) + cmd.Env = os.Environ() + cmd.Run() + + log.Info("pg_auto_failover load balancer has completed.") + doComplete = true +} + +func (t *pgpoolConfiguration) configure(nodes []postgresInstance) ([]byte, error) { + var msg pgpoolConfigMessage + for _, i := range nodes { + var isPrimary bool + switch i.AssignedState { + case GroupStatePrimary, GroupStatePrimaryAlone: + isPrimary = true + case GroupStateStandby: + isPrimary = false + default: + // Skip the node as it is not ready to accept connections + continue + } + + // Pgpool does not re-resolve host names and assumes IP addresses are + // fixed. Therefore we do it ourselves and our peer application will + // mark any IP addresses no longer used for Postgres nodes as down so + // Pgpool happily uses the ones that are up. + var a pgpoolInstance + if ips, err := net.LookupIP(i.NodeHost); err != nil || len(ips) == 0 { + log.WithError(err).WithFields(log.Fields{"host": i.NodeHost}).Warn("Failed resolve node's host name, skipping.") + continue + } else { + a.IpAddress = ips[0] + } + a.Port = i.NodePort + a.HostName = i.NodeHost + a.IsReadOnly = !isPrimary + msg.Instances = append(msg.Instances, a) + } + + // TODO add user=configurable weights + for i := range msg.Instances { + msg.Instances[i].Weight = 1.0 / float32(len(msg.Instances)) + } + + return json.Marshal(msg) +}