package main
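// pg_autoconfig wraps the pg_auto_failover tooling inside the docker-postgis image.
// Depending on AUTOCONFIG_MODE it runs "pg_autoctl create monitor" or "pg_autoctl
// create postgres", injects passwords and an optional pg_hba.conf, and pushes the
// current primary/standby topology to peer load-balancer configurators over HTTP
// (a local unix socket and/or remote TCP peers).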
import (
"bytes"
"context"
"encoding/json"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"slices"
"strconv"
"strings"
"time"
log "github.com/sirupsen/logrus"
)
const version = "1.0"
const GroupStatePrimary = "primary"
const GroupStatePrimaryDegraded = "wait_primary"
const GroupStatePrimaryAlone = "single"
const GroupStateStandby = "secondary"
const DefaultFormation = "default"
const DefaultSocketPath = "/var/run/pg_autoconfig.sock"
const ExecutionCycleSleep = 1000 * time.Millisecond
const ExecutionCyclesPerPing = 10
const DefaultRemotePort = 5420
const MonitorUpCheckDelay = 15 * time.Second
const PostgresOriginalEntrypoint = "/usr/local/bin/docker-entrypoint.sh"
const PostgresOriginalCmd = "postgres"
const AutoFailoverExecutable = "pg_autoctl"
const AutoFailoverPidPath = "/tmp/pg_autoctl/data/data/pg_autoctl.pid"
const AutoFailoverPidPath2 = "/tmp/pg_autoctl/db/data/pg_autoctl_postgres.pid"
const AutoFailoverPidPath3 = "/tmp/pg_autoctl/db/data/pg_autoctl_listener.pid"
const PgHbaConfFileName = "pg_hba.conf"
const PostgresInitDoneFileName = "PG_VERSION"
const PostgresDataDefaultPath = "/var/lib/postgresql/data"
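// postgresInstance mirrors one entry of the JSON emitted by "pg_autoctl show state
// --json"; the extended fields are populated only while the monitor is reachable.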
type postgresInstance struct {
// Locally available info
NodeId int `json:"node_id"`
GroupId int `json:"group_id"`
NodeHost string `json:"nodehost"`
NodeName string `json:"nodename"`
NodePort int `json:"nodeport"`
CurrentState string `json:"current_group_state"`
AssignedState string `json:"assigned_group_state"`
// Monitor provided extended info
Health int `json:"health"`
NodeCluster string `json:"nodecluster"`
ReportedLsn string `json:"reported_lsn"`
ReportedTli int `json:"reported_tli"`
FormationKind string `json:"formation_kind"`
CandidatePriority int `json:"candidate_priority"`
ReplicationQuorum bool `json:"replication_quorum"`
}
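// configFiles describes the per-node files maintained by pg_autoctl; the field set
// matches the JSON shape of "pg_autoctl show file". It is currently not referenced
// in this file.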
type configFiles struct {
Config string `json:"config"`
State string `json:"state"`
Init string `json:"init"`
Pid string `json:"pid"`
}
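// configMessage is the payload POSTed to peer configurators at /config, one entry
// per Postgres node ready to accept connections. Example with illustrative values:
// {"Instances":[{"IpAddress":"10.0.0.5","Port":5432,"HostName":"pg-0","Weight":0.5,"IsReadOnly":false}]}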
type configMessage struct {
Instances []configInstance
}
type configInstance struct {
IpAddress net.IP
Port int
HostName string
Weight float32
IsReadOnly bool
}
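// pingMessage is the reply expected from GET /ping on a peer, e.g. {"NeedsConfig":true};
// NeedsConfig asks us to re-send the last known configuration.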
type pingMessage struct {
NeedsConfig bool
}
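// global holds the process-wide settings derived from the AUTOCONFIG_* environment
// variables, e.g. (hypothetical values):
//   AUTOCONFIG_MODE=postgres
//   AUTOCONFIG_MONITOR_HOST=monitor.example.internal
//   AUTOCONFIG_REPLICATION_PASSWORD_FILE=/run/secrets/repl_pass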
type global struct {
formation string
useLocal bool
socketPath string
remotes []string
isMonitor bool
monitorHost string
monitorPort int
monitorPassword string
replicationPassword string
hbaConfPath string
env []string
}
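// main parses the environment, initializes a pg_auto_failover node on first start if
// needed, launches the background configuration loop, and then hands control to the
// inner process (pg_autoctl or the stock Postgres entrypoint).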
func main() {
// Logging
var logLevelS string
if logLevelS = os.Getenv("AUTOCONFIG_LOG_LEVEL"); logLevelS == "" {
logLevelS = "info"
}
logLevel, err := log.ParseLevel(logLevelS)
if err != nil {
log.WithError(err).Fatal("Failed to parse log level")
}
log.SetLevel(logLevel)
log.Info("Starting pg_auto_failover load balancer configurator, version " + version)
// Mode of operation
var innerExec string
var innerArgs []string
innerEnv := os.Environ()
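// doComplete signals the background goroutines to stop once the inner process exits.
// Note: it is shared across goroutines without synchronization; an atomic.Bool would be safer.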
doComplete := false
if mode := os.Getenv("AUTOCONFIG_MODE"); mode == "monitor" || mode == "postgres" {
innerExec = AutoFailoverExecutable
innerArgs = append(innerArgs, []string{"create", mode, "--no-ssl"}...)
var state global
state.formation = getEnvOrDefault("AUTOCONFIG_FORMATION", DefaultFormation)
state.useLocal = getEnvOrDefaultBool("AUTOCONFIG_LOCAL_PEER", false)
state.socketPath = getEnvOrDefault("AUTOCONFIG_SOCKET", DefaultSocketPath)
state.remotes = slices.DeleteFunc(strings.Split(getEnvOrDefault("AUTOCONFIG_REMOTE_PEERS", ""), ","), func(x string) bool {
return x == ""
})
if !state.useLocal && len(state.remotes) == 0 {
log.Warning("No configuration peers configured using AUTOCONFIG_LOCAL_PEER or AUTOCONFIG_REMOTE_PEERS. Load balancing to the Postgres primary will only work if supported on the application level. Load balancing of the readonly queries may be suboptimal.")
}
// Use custom or automatic pg_hba.conf (critical during secondary initialization)
state.hbaConfPath = getEnvOrDefault("AUTOCONFIG_LINK_HBA_CONF", "")
if state.hbaConfPath == "" {
innerArgs = append(innerArgs, []string{"--auth", "scram-sha-256", "--pg-hba-lan"}...)
} else {
innerArgs = append(innerArgs, "--skip-pg-hba")
}
state.monitorPassword = getEnvOrDefault("AUTOCONFIG_MONITOR_PASSWORD", "")
monitorPasswordPath := getEnvOrDefault("AUTOCONFIG_MONITOR_PASSWORD_FILE", "")
if monitorPasswordFile, err := os.Stat(monitorPasswordPath); err == nil && !monitorPasswordFile.IsDir() {
if v, err := os.ReadFile(monitorPasswordPath); err == nil {
state.monitorPassword = string(v)
}
}
if state.monitorPassword == "" {
log.Fatal("AUTOCONFIG_MONITOR_PASSWORD or AUTOCONFIG_MONITOR_PASSWORD_FILE must be set")
}
if mode == "monitor" {
state.isMonitor = true
innerArgs = append(innerArgs, "--run")
log.Info("Starting a monitor node.")
} else {
state.isMonitor = false
state.monitorHost = getEnvOrDefault("AUTOCONFIG_MONITOR_HOST", "")
if state.monitorHost == "" {
log.Fatal("AUTOCONFIG_MONITOR_HOST must be set")
}
state.monitorPort = getEnvOrDefaultInt("AUTOCONFIG_MONITOR_PORT", getEnvOrDefaultInt("PGPORT", 5432))
state.replicationPassword = getEnvOrDefault("AUTOCONFIG_REPLICATION_PASSWORD", "")
replicationPasswordPath := getEnvOrDefault("AUTOCONFIG_REPLICATION_PASSWORD_FILE", "")
if replicationPasswordFile, err := os.Stat(replicationPasswordPath); err == nil && !replicationPasswordFile.IsDir() {
if v, err := os.ReadFile(replicationPasswordPath); err == nil {
state.replicationPassword = string(v)
}
}
if state.replicationPassword == "" {
log.Fatal("AUTOCONFIG_REPLICATION_PASSWORD or AUTOCONFIG_REPLICATION_PASSWORD_FILE must be set")
}
// Pass the monitor URI through an env var rather than a command-line argument
// Note: For security reasons the monitor URI must not appear in the argument list as it contains a password
monitorEnvUri := "PG_AUTOCTL_MONITOR=postgresql://autoctl_node:" + state.monitorPassword + "@" + state.monitorHost + ":" + strconv.Itoa(state.monitorPort) + "/pg_auto_failover?sslmode=prefer"
monitorSet := false
for i, j := range innerEnv {
if strings.HasPrefix(j, "PG_AUTOCTL_MONITOR=") {
innerEnv[i] = monitorEnvUri
monitorSet = true
break
}
}
if !monitorSet {
innerEnv = append(innerEnv, monitorEnvUri)
}
state.env = innerEnv
// The first secondary initialization is tricky as we must inject pg_hba.conf
// and soon after set the replication password for the second time
targetDir := getEnvPgData()
initDonePath := filepath.Join(targetDir, PostgresInitDoneFileName)
if _, err := os.Stat(initDonePath); err != nil && !os.IsNotExist(err) {
log.Fatal("Failed to access Postgres data directory")
} else if err != nil {
log.WithFields(log.Fields{"path": targetDir}).Info("Postgres data dir is not initialized")
if state.hbaConfPath != "" {
go func() {
for {
if _, err := os.Stat(initDonePath); err == nil {
break
} else if !os.IsNotExist(err) {
log.Fatal("Failed to access Postgres data directory")
}
time.Sleep(100 * time.Millisecond)
}
ensurePgHbaConfReplaced(state.hbaConfPath)
}()
}
// Propagate the replication password
// Note: There is no CLI option to pass the replication password, yet even pg_autoctl create without --run
// already assumes it is present. pg_autoctl config set does not work at this stage either.
replEnvPassword := "PGPASSWORD=" + state.replicationPassword
replSet := false
initEnv := innerEnv
for i, j := range initEnv {
if strings.HasPrefix(j, "PGPASSWORD=") {
initEnv[i] = replEnvPassword
replSet = true
break
}
}
if !replSet {
initEnv = append(initEnv, replEnvPassword)
}
log.WithFields(log.Fields{"name": innerExec, "args": innerArgs}).Info("Initializing pg_auto_failure on disk")
cmdInit := exec.Command(innerExec, innerArgs...)
cmdInit.Env = initEnv
cmdInit.Stdout = os.Stdout
cmdInit.Stderr = os.Stderr
if err := cmdInit.Run(); err != nil {
log.WithError(err).Warn("Initialization of pg_auto_failure encountered problems")
}
log.Info("Setting replication password in the configuration")
setPassCmd := exec.Command(AutoFailoverExecutable, "config", "set", "replication.password", state.replicationPassword)
setPassCmd.Env = state.env
if err := setPassCmd.Run(); err != nil {
log.WithError(err).Fatal("Failed to set password of user pgautofailover_replicator in the configuration")
}
}
innerArgs = append(innerArgs, "--run")
log.Info("Starting a worker node.")
}
// Run the auto configuration
go executionLoop(state, &doComplete)
} else if mode != "" {
log.Fatal("AUTOCONFIG_MODE must be either 'monitor' or 'node' to activate pg_auto_failover. Leave it empty for plain Postgres operation.")
} else {
// Execute the original entrypoint as if we never existed
log.Info("No pg_auto_failover mode specified. Will execute plain Postgres.")
innerExec = PostgresOriginalEntrypoint
innerArgs = []string{PostgresOriginalCmd}
}
// Clean-up any PIDs left from previous run
os.Remove(AutoFailoverPidPath)
os.Remove(AutoFailoverPidPath2)
os.Remove(AutoFailoverPidPath3)
// Start the inner executable (usually starting with "pg_autoctl create postgres" or "pg_autoctl create monitor")
log.WithFields(log.Fields{"name": innerExec, "args": innerArgs}).Info("Handling over to the inner process.")
cmd := exec.Command(innerExec, innerArgs...)
cmd.Env = innerEnv
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Run()
log.Info("pg_auto_failover load balancer configurator has completed.")
doComplete = true
}
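// executionLoop periodically queries the formation state via pg_autoctl and, whenever
// the resulting configuration changes, pushes it to all configuration peers. While the
// monitor is down it keeps serving the last known topology, falling back to the
// node-local state when nothing is cached yet.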
func executionLoop(state global, doComplete *bool) {
// Link pg_hba.conf if asked to do so
ensurePostgresReadyForQueries()
ensurePgHbaConfReplaced(state.hbaConfPath)
ensurePostgresReadyForQueries()
// State
var instances []postgresInstance
monitorIsDown := false
var lastSentStateJson []byte
// Monitor automatically creates user autoctl_node but we must set/propagate its password
if state.isMonitor {
log.Info("This is a monitor node, setting password for autoctl_node user")
setPassCmd := exec.Command("psql", "-d", "pg_auto_failover", "-c", "alter user autoctl_node password '"+state.monitorPassword+"';")
if err := setPassCmd.Run(); err != nil {
log.WithError(err).Fatal("Failed to set password of user autoctl_node on the monitor node")
}
log.WithFields(log.Fields{"rule": "host pg_auto_failover autoctl_node <node ip>/32 <auth-type>"}).Info(
"Check rules exist in pg_hba.conf for each other node")
} else {
log.Info("Setting replication password in the database (will succeed on primary only)")
setPassCmd2 := exec.Command("psql", "-c", "alter user pgautofailover_replicator password '"+state.replicationPassword+"';")
if err := setPassCmd2.Run(); err != nil {
log.WithError(err).Info("Failed to set password of user pgautofailover_replicator in the database (probably a stand-by read-only node)")
}
// TODO How to set password of pgautofailover_monitor?
log.WithFields(log.Fields{
"ruleMonitor": "host all pgautofailover_monitor <monitor ip>/32 <auth-type>",
"ruleRegular1": "host postgres pgautofailover_replicator <node ip>/32 <auth-type>",
"ruleRegular2": "host replication pgautofailover_replicator <node ip>/32 <auth-type>",
}).Info("Check rules exist in pg_hba.conf for each other node and type")
}
peers := []http.Client{}
if state.useLocal {
peers = append(peers, http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", state.socketPath)
},
},
})
}
for _, i := range state.remotes {
addr := i // copy per iteration: the dial closure below must not share the loop variable (pre-Go 1.22 semantics)
if iParts := strings.Split(addr, ":"); len(iParts) == 1 {
addr = addr + ":" + strconv.Itoa(DefaultRemotePort)
}
peers = append(peers, http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("tcp", addr)
},
},
})
}
pingWaitCounter := 0
for !*doComplete {
if !monitorIsDown {
cmdFormationState := exec.Command(AutoFailoverExecutable, "show", "state", "--formation", state.formation, "--json")
cmdFormationState.Env = state.env
if localStateB, err := cmdFormationState.Output(); err != nil {
log.WithError(err).Warn("Failed to obtain the formation state from pg_autoctl. Monitor is probably down.")
monitorIsDown = true
// Delegate the monitor up-checking to a separate go routine so we do not block our main task of
// sending pings to the configuration peer
go func() {
for !*doComplete {
cmdMonitorUp := exec.Command(AutoFailoverExecutable, "show", "state", "--formation", state.formation, "--json")
cmdMonitorUp.Env = state.env
if _, err := cmdMonitorUp.Output(); err == nil {
log.Warn("Monitor node is no longer down.")
monitorIsDown = false
return
}
time.Sleep(MonitorUpCheckDelay)
}
}()
} else {
var formationState []postgresInstance
if err := json.Unmarshal(localStateB, &formationState); err != nil {
log.WithError(err).Warn("Failed to parse the local state from pg_autoctl")
} else {
instances = formationState
}
}
}
if monitorIsDown && !state.isMonitor && len(instances) == 0 {
// Try to obtain info about the current node. That way we may still be able to operate
// in a read-only mode or with luck even in read-write mode. If the configuration peer
// is connected to all the nodes then it can reconstruct the full picture.
cmdLocalState := exec.Command(AutoFailoverExecutable, "show", "state", "--local", "--json")
cmdLocalState.Env = state.env
if localStateB, err := cmdLocalState.Output(); err != nil {
log.WithError(err).Warn("Failed to obtain the local state from pg_autoctl")
} else {
var localState postgresInstance
if err := json.Unmarshal(localStateB, &localState); err != nil {
log.WithError(err).Warn("Failed to parse the local state from pg_autoctl")
} else {
instances = []postgresInstance{localState}
}
}
}
// Keep maintaining the last known primary/standby configuration if monitor is down
// Note: We do keep producing configuration files as some volatile values (like
// IP addresses) still may change
if !monitorIsDown || len(instances) != 0 {
if newState, err := configure(instances); err != nil {
log.WithError(err).WithFields(log.Fields{"instances": instances}).Warn("Failed to produce configuration from instances.")
} else if !bytes.Equal(newState, lastSentStateJson) {
// Send the new state
lastSentStateJson = newState
log.WithFields(log.Fields{"state": string(newState)}).Info("Sending configuration to the peer processes (if any)")
for _, i := range peers {
if response, err := i.Post("http://unix/config", "application/octet-stream", bytes.NewReader(newState)); err != nil {
log.WithError(err).WithFields(log.Fields{"peer": i}).Warn("Failed to send configuration to a peer process.")
} else {
response.Body.Close() // release the connection for reuse
}
}
}
}
// Send a periodic ping to the configuration worker
if pingWaitCounter == ExecutionCyclesPerPing {
pingWaitCounter = 0
log.Trace("Pinging configuration peers (if any)")
for _, i := range peers {
if response, err := i.Get("http://unix/ping"); err != nil {
log.WithError(err).WithFields(log.Fields{"peer": i}).Trace("Failed to ping a peer process.")
} else {
var msg pingMessage
err := json.NewDecoder(response.Body).Decode(&msg)
response.Body.Close() // always release the connection
if err != nil {
log.WithError(err).WithFields(log.Fields{"peer": i}).Warn("Failed to decode ping response.")
} else {
log.WithFields(log.Fields{"response": msg}).Trace("Ping response")
if msg.NeedsConfig {
log.WithFields(log.Fields{"peer": i, "state": string(lastSentStateJson)}).Info("Sending configuration to a peer process")
if response, err := i.Post("http://unix/config", "application/octet-stream", bytes.NewReader(lastSentStateJson)); err != nil {
log.WithError(err).Warn("Failed to send configuration to the peer process.")
} else {
response.Body.Close()
}
}
}
}
}
} else {
pingWaitCounter++
}
time.Sleep(ExecutionCycleSleep)
}
}
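// ensurePostgresReadyForQueries blocks until the local Postgres instance answers a trivial query.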
func ensurePostgresReadyForQueries() {
for {
log.Trace("Testing if we can query the Postgres database")
testCmd := exec.Command("psql", "-c", "SELECT 0;")
if err := testCmd.Run(); err == nil {
break
}
time.Sleep(1000 * time.Millisecond)
}
log.Trace("Postgres database accepts queries")
}
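// ensurePgHbaConfReplaced waits for the Postgres data directory to be initialized,
// symlinks the operator-provided pg_hba.conf over the generated one, and reloads the
// Postgres configuration if the link was newly created.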
func ensurePgHbaConfReplaced(hbaConfPath string) {
needsPgConfReload := false
if hbaConfPath != "" {
// Postgres refuses to initialize a data directory that is initially not empty
targetDir := getEnvPgData()
targetPath := filepath.Join(targetDir, PgHbaConfFileName)
initDonePath := filepath.Join(targetDir, PostgresInitDoneFileName)
log.WithFields(log.Fields{"path": targetDir}).Info("Ensuring Postgres data dir is initialized")
for {
if _, err := os.Stat(initDonePath); err == nil {
break
} else if !os.IsNotExist(err) {
log.Fatal("Failed to access Postgres data directory")
}
time.Sleep(100 * time.Millisecond)
}
if info, err := os.Stat(hbaConfPath); err != nil || info.IsDir() {
log.WithError(err).Fatal("Cannot find the pg_hba.conf to be linked as requested by AUTOCONFIG_LINK_HBA_CONF")
}
if info, err := os.Lstat(targetPath); (err != nil && os.IsNotExist(err)) || (err == nil && info.Mode().Type() != os.ModeSymlink) {
// During secondary node initialization the pg_hba.conf is completely missing
if !os.IsNotExist(err) {
if err := os.Remove(targetPath); err != nil {
log.Warn("Failed to remove the original pg_hba.conf")
}
}
log.WithFields(log.Fields{"path": hbaConfPath}).Info("Applying the requested pg_hba.conf")
if err := os.Symlink(hbaConfPath, targetPath); err != nil {
log.WithError(err).Fatal("Failed to sym-link the pg_hb.conf requested by AUTOCONFIG_LINK_HBA_CONF")
}
needsPgConfReload = true
}
}
if needsPgConfReload {
ensurePostgresReadyForQueries()
log.Info("Reloading Postgres pg_hba.conf")
reloadCmd := exec.Command("psql", "-c", "SELECT 0 FROM pg_reload_conf();")
if err := reloadCmd.Run(); err != nil {
log.WithError(err).Fatal("Failed to reload Postgres configuration to apply new pg_hba.conf")
}
}
}
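// configure converts the known instances into the JSON configMessage sent to peers.
// Host names are resolved to IP addresses here because Pgpool does not re-resolve
// them; nodes in transitional states are skipped.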
func configure(nodes []postgresInstance) ([]byte, error) {
var msg configMessage
for _, i := range nodes {
var isPrimary bool
switch i.AssignedState {
case GroupStatePrimary, GroupStatePrimaryDegraded, GroupStatePrimaryAlone:
isPrimary = true
case GroupStateStandby:
isPrimary = false
default:
// Skip the node as it is not ready to accept connections
log.WithFields(log.Fields{"nodeId": i.NodeId, "nodeName": i.NodeName, "port": i.NodePort, "assignedState": i.AssignedState, "currentState": i.CurrentState}).Trace("Skipping node.")
continue
}
// Pgpool does not re-resolve host names and assumes IP addresses are
// fixed. Therefore we do it ourselves and our peer application will
// mark any IP addresses no longer used for Postgres nodes as down so
// Pgpool happily uses the ones that are up.
var a configInstance
if ips, err := net.LookupIP(i.NodeHost); err != nil || len(ips) == 0 {
log.WithError(err).WithFields(log.Fields{"host": i.NodeHost}).Warn("Failed resolve node's host name, skipping.")
continue
} else {
a.IpAddress = ips[0]
}
a.Port = i.NodePort
a.HostName = i.NodeHost
a.IsReadOnly = !isPrimary
msg.Instances = append(msg.Instances, a)
}
// TODO add user-configurable weights
for i := range msg.Instances {
msg.Instances[i].Weight = 1.0 / float32(len(msg.Instances))
}
return json.Marshal(msg)
}
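// getEnvOrDefault and the typed variants below return the value of the named
// environment variable, or the given default when it is unset, empty, or unparsable.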
func getEnvOrDefault(name string, defaultValue string) string {
if val := os.Getenv(name); val != "" {
return val
}
return defaultValue
}
func getEnvOrDefaultBool(name string, defaultValue bool) bool {
if valS := strings.ToLower(os.Getenv(name)); valS != "" {
return valS == "1" || valS == "yes" || valS == "true"
}
return defaultValue
}
func getEnvOrDefaultInt(name string, defaultValue int) int {
if val := os.Getenv(name); val != "" {
if val2, err := strconv.Atoi(val); err == nil {
return val2
}
}
return defaultValue
}
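// getEnvPgData returns the Postgres data directory, honoring PGDATA as Postgres itself does.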
func getEnvPgData() string {
return getEnvOrDefault("PGDATA", PostgresDataDefaultPath)
}