Initial version of pg_autoconfig
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
pg_autoconfig
|
||||
12
Dockerfile
12
Dockerfile
@@ -1,4 +1,16 @@
|
||||
FROM golang:1.21-bullseye AS builder
|
||||
|
||||
WORKDIR /usr/local/src/pg_autoconfig
|
||||
COPY go.* ./
|
||||
COPY *.go ./
|
||||
RUN go mod download
|
||||
|
||||
RUN mkdir bin/ && go build -o bin/ ./...
|
||||
|
||||
FROM postgis/postgis:16-master
|
||||
|
||||
COPY --from=builder /usr/local/src/pg_autoconfig/bin/pg_autoconfig /
|
||||
|
||||
RUN localedef -i cs_CZ -c -f UTF-8 -A /usr/share/locale/locale.alias cz_CZ.UTF-8 && \
|
||||
apt-get update && apt-get install -y postgresql-16-auto-failover pg-auto-failover-cli && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
|
||||
7
go.mod
Normal file
7
go.mod
Normal file
@@ -0,0 +1,7 @@
|
||||
module git.ivasoft.cz/sw/pg_autoconfig
|
||||
|
||||
go 1.21
|
||||
|
||||
require github.com/sirupsen/logrus v1.9.0
|
||||
|
||||
require golang.org/x/sys v0.15.0 // indirect
|
||||
16
go.sum
Normal file
16
go.sum
Normal file
@@ -0,0 +1,16 @@
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
|
||||
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
|
||||
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
274
main.go
Normal file
274
main.go
Normal file
@@ -0,0 +1,274 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const version = "1.0"
|
||||
const GroupStatePrimary = "primary"
|
||||
const GroupStatePrimaryAlone = "wait-primary"
|
||||
const GroupStateStandby = "secondary"
|
||||
const DefaultFormation = "default"
|
||||
const DefaultSocketPath = "/var/run/pg_autoconfig.sock"
|
||||
|
||||
type postgresInstance struct {
|
||||
// Locally available info
|
||||
NodeId int `json:"node_id"`
|
||||
GroupId int `json:"group_id"`
|
||||
NodeHost string `json:"nodehost"`
|
||||
NodeName string `json:"nodename"`
|
||||
NodePort int `json:"nodeport"`
|
||||
CurrentState string `json:"current_group_state"`
|
||||
AssignedState string `json:"assigned_group_state"`
|
||||
|
||||
// Monitor provided extended info
|
||||
Health int `json:"health"`
|
||||
NodeCluster string `json:"nodecluster"`
|
||||
ReportedLsn string `json:"reported_lsn"`
|
||||
ReportedTli int `json:"reported_tli"`
|
||||
FormationKind string `json:"formation_kind"`
|
||||
CandidatePriority int `json:"candidate_priority"`
|
||||
ReplicationQuorum bool `json:"replication_quorum"`
|
||||
}
|
||||
|
||||
type configFiles struct {
|
||||
Config string `json:"config"`
|
||||
State string `json:"state"`
|
||||
Init string `json:"init"`
|
||||
Pid string `json:"pid"`
|
||||
}
|
||||
|
||||
type IConfiguration interface {
|
||||
configure(nodes []postgresInstance) ([]byte, error)
|
||||
}
|
||||
|
||||
type pgpoolConfiguration struct {
|
||||
}
|
||||
|
||||
type pgpoolConfigMessage struct {
|
||||
Instances []pgpoolInstance
|
||||
}
|
||||
|
||||
type pgpoolInstance struct {
|
||||
IpAddress net.IP
|
||||
Port int
|
||||
HostName string
|
||||
Weight float32
|
||||
IsReadOnly bool
|
||||
}
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
|
||||
// Logging
|
||||
var logLevelS string
|
||||
if logLevelS = os.Getenv("AUTOCONFIG_LOG_LEVEL"); logLevelS == "" {
|
||||
logLevelS = "info"
|
||||
}
|
||||
|
||||
logLevel, err := log.ParseLevel(logLevelS)
|
||||
if err != nil {
|
||||
log.WithError(err).Fatal("Failed to parse log level")
|
||||
}
|
||||
log.SetLevel(logLevel)
|
||||
|
||||
log.Info("Starting pg_auto_failover load balancer configurator, version " + version)
|
||||
|
||||
nextIsType := false
|
||||
var config IConfiguration
|
||||
nextIsSocket := false
|
||||
socketPath := DefaultSocketPath
|
||||
nextIsFormation := false
|
||||
formation := DefaultFormation
|
||||
var argsLeft []string
|
||||
for i, j := range os.Args {
|
||||
if nextIsType {
|
||||
nextIsType = false
|
||||
switch j {
|
||||
case "pgpool":
|
||||
config = &pgpoolConfiguration{}
|
||||
default:
|
||||
log.WithFields(log.Fields{"value": j}).Fatal("Unknown configuration type")
|
||||
}
|
||||
} else if nextIsSocket {
|
||||
nextIsSocket = false
|
||||
socketPath = j
|
||||
} else if nextIsFormation {
|
||||
nextIsFormation = false
|
||||
formation = j
|
||||
} else if j == "--" {
|
||||
argsLeft = os.Args[i+1:]
|
||||
} else {
|
||||
switch j {
|
||||
case "--type":
|
||||
nextIsType = true
|
||||
case "--socket":
|
||||
nextIsSocket = true
|
||||
case "--formation":
|
||||
nextIsFormation = true
|
||||
default:
|
||||
log.WithFields(log.Fields{"value": j}).Fatal("Unknown command switch")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if config == nil {
|
||||
log.Fatal("No configuration type specified. Use --type <type>")
|
||||
}
|
||||
if len(argsLeft) == 0 {
|
||||
log.Fatal("No inner command found to execute. Use -- to separate the inner executable and its args")
|
||||
}
|
||||
|
||||
// Run the auto configuration
|
||||
doComplete := false
|
||||
go func() {
|
||||
// Wait for the inner process to start
|
||||
time.Sleep(10 * time.Second)
|
||||
|
||||
// State
|
||||
var instances []postgresInstance
|
||||
monitorIsDown := false
|
||||
var lastSentStateJson []byte
|
||||
|
||||
// Determine if we are a monitor
|
||||
cmdIsMonitor := exec.Command("/usr/bin/pg_autoctl", "show", "uri", "--json")
|
||||
cmdIsMonitor.Env = os.Environ()
|
||||
var isMonitor bool
|
||||
if configsB, err := cmdIsMonitor.Output(); err != nil {
|
||||
log.WithError(err).Warn("Failed to detect if we are a monitor node.")
|
||||
isMonitor = false
|
||||
} else {
|
||||
var configs *configFiles
|
||||
if err := json.Unmarshal(configsB, &configs); err != nil {
|
||||
log.WithError(err).Warn("Failed to parse monitor node detection")
|
||||
isMonitor = false
|
||||
} else {
|
||||
isMonitor = configs.State == ""
|
||||
log.WithFields(log.Fields{"isMonitor": isMonitor}).Info("Determined if this is a monitor")
|
||||
}
|
||||
}
|
||||
|
||||
httpc := http.Client{
|
||||
Transport: &http.Transport{
|
||||
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
|
||||
return net.Dial("unix", socketPath)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for !doComplete {
|
||||
cmdFormationState := exec.Command("/usr/bin/pg_autoctl", "show", "state", "--formation", formation, "--json")
|
||||
cmdFormationState.Env = os.Environ()
|
||||
if localStateB, err := cmdFormationState.Output(); err != nil {
|
||||
if !monitorIsDown {
|
||||
log.WithError(err).Warn("Failed to obtain the formation state from pg_autoctl. Monitor is probably down.")
|
||||
monitorIsDown = true
|
||||
}
|
||||
} else {
|
||||
if monitorIsDown {
|
||||
log.Warn("Monitor node is no longer down.")
|
||||
monitorIsDown = false
|
||||
}
|
||||
|
||||
var formationState []postgresInstance
|
||||
if err := json.Unmarshal(localStateB, &formationState); err != nil {
|
||||
log.WithError(err).Warn("Failed to parse the local state from pg_autoctl")
|
||||
} else {
|
||||
instances = formationState
|
||||
}
|
||||
}
|
||||
|
||||
if monitorIsDown && !isMonitor && len(instances) == 0 {
|
||||
// Try to obtain info about the current node. That way we may still be able to operate
|
||||
// in a read-only mode or with luck even in read-write mode.
|
||||
cmdLocalState := exec.Command("/usr/bin/pg_autoctl", "show", "state", "--local", "--json")
|
||||
cmdLocalState.Env = os.Environ()
|
||||
if localStateB, err := cmdLocalState.Output(); err != nil {
|
||||
log.WithError(err).Warn("Failed to obtain the local state from pg_autoctl")
|
||||
} else {
|
||||
var localState postgresInstance
|
||||
if err := json.Unmarshal(localStateB, &localState); err != nil {
|
||||
log.WithError(err).Warn("Failed to parse the local state from pg_autoctl")
|
||||
} else {
|
||||
instances = []postgresInstance{localState}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Keep maintaining the last known primary/standby configuration if monitor is down
|
||||
// Note: We do keep producing configuration files as some volatile values (like
|
||||
// IP addresses) still may change
|
||||
if !monitorIsDown || len(instances) != 0 {
|
||||
if newState, err := config.configure(instances); err != nil {
|
||||
log.WithFields(log.Fields{"instances": instances}).Warn("Failed to produce configuration from instances.")
|
||||
} else if !bytes.Equal(newState, lastSentStateJson) {
|
||||
// Send the new state
|
||||
lastSentStateJson = newState
|
||||
if response, err := httpc.Post("http://unix/config", "application/octet-stream", bytes.NewReader(newState)); err != nil {
|
||||
log.WithError(err).WithFields(log.Fields{"response": response}).Warn("Failed to send configuration to the peer process.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
time.Sleep(1000 * time.Millisecond)
|
||||
}
|
||||
}()
|
||||
|
||||
// Start the inner executable (usually starting with "pg_autoctl create postgres" or "pg_autoctl create monitor")
|
||||
log.WithFields(log.Fields{"name": argsLeft[0], "args": argsLeft[1:]}).Info("Handling over to inner process.")
|
||||
cmd := exec.Command(argsLeft[0], argsLeft[1:]...)
|
||||
cmd.Env = os.Environ()
|
||||
cmd.Run()
|
||||
|
||||
log.Info("pg_auto_failover load balancer has completed.")
|
||||
doComplete = true
|
||||
}
|
||||
|
||||
func (t *pgpoolConfiguration) configure(nodes []postgresInstance) ([]byte, error) {
|
||||
var msg pgpoolConfigMessage
|
||||
for _, i := range nodes {
|
||||
var isPrimary bool
|
||||
switch i.AssignedState {
|
||||
case GroupStatePrimary, GroupStatePrimaryAlone:
|
||||
isPrimary = true
|
||||
case GroupStateStandby:
|
||||
isPrimary = false
|
||||
default:
|
||||
// Skip the node as it is not ready to accept connections
|
||||
continue
|
||||
}
|
||||
|
||||
// Pgpool does not re-resolve host names and assumes IP addresses are
|
||||
// fixed. Therefore we do it ourselves and our peer application will
|
||||
// mark any IP addresses no longer used for Postgres nodes as down so
|
||||
// Pgpool happily uses the ones that are up.
|
||||
var a pgpoolInstance
|
||||
if ips, err := net.LookupIP(i.NodeHost); err != nil || len(ips) == 0 {
|
||||
log.WithError(err).WithFields(log.Fields{"host": i.NodeHost}).Warn("Failed resolve node's host name, skipping.")
|
||||
continue
|
||||
} else {
|
||||
a.IpAddress = ips[0]
|
||||
}
|
||||
a.Port = i.NodePort
|
||||
a.HostName = i.NodeHost
|
||||
a.IsReadOnly = !isPrimary
|
||||
msg.Instances = append(msg.Instances, a)
|
||||
}
|
||||
|
||||
// TODO add user=configurable weights
|
||||
for i := range msg.Instances {
|
||||
msg.Instances[i].Weight = 1.0 / float32(len(msg.Instances))
|
||||
}
|
||||
|
||||
return json.Marshal(msg)
|
||||
}
|
||||
Reference in New Issue
Block a user