package main

import (
	"bytes"
	"context"
	"encoding/json"
	"net"
	"net/http"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"slices"
	"strconv"
	"strings"
	"syscall"
	"time"

	log "github.com/sirupsen/logrus"
)

const version = "1.2"

const GroupStatePrimary = "primary"
const GroupStatePrimaryDegraded = "wait_primary"
const GroupStatePrimaryAlone = "single"
const GroupStateStandby = "secondary"

const DefaultFormation = "default"
const DefaultSocketPath = "/var/run/pg_autoconfig.sock"
const ExecutionCycleSleep = 1000 * time.Millisecond
const ExecutionCyclesPerPing = 10
const DefaultRemotePort = 5420
const MonitorUpCheckDelay = 15 * time.Second

const PostgresOriginalEntrypoint = "/usr/local/bin/docker-entrypoint.sh"
const PostgresOriginalCmd = "postgres"
const AutoFailoverExecutable = "pg_autoctl"
const AutoFailoverPidPath = "/tmp/pg_autoctl/data/data/pg_autoctl.pid"
const AutoFailoverPidPath2 = "/tmp/pg_autoctl/db/data/pg_autoctl_postgres.pid"
const AutoFailoverPidPath3 = "/tmp/pg_autoctl/db/data/pg_autoctl_listener.pid"
const PgHbaConfFileName = "pg_hba.conf"
const PostgresInitDoneFileName = "PG_VERSION"
const PostgresDataDefaultPath = "/var/lib/postgresql/data"

type postgresInstance struct {
	// Locally available info
	NodeId        int    `json:"node_id"`
	GroupId       int    `json:"group_id"`
	NodeHost      string `json:"nodehost"`
	NodeName      string `json:"nodename"`
	NodePort      int    `json:"nodeport"`
	CurrentState  string `json:"current_group_state"`
	AssignedState string `json:"assigned_group_state"`
	// Monitor-provided extended info
	Health            int    `json:"health"`
	NodeCluster       string `json:"nodecluster"`
	ReportedLsn       string `json:"reported_lsn"`
	ReportedTli       int    `json:"reported_tli"`
	FormationKind     string `json:"formation_kind"`
	CandidatePriority int    `json:"candidate_priority"`
	ReplicationQuorum bool   `json:"replication_quorum"`
}

type configFiles struct {
	Config string `json:"config"`
	State  string `json:"state"`
	Init   string `json:"init"`
	Pid    string `json:"pid"`
}

type configMessage struct {
	Instances []configInstance
}

type configInstance struct {
	IpAddress  net.IP
	Port       int
	HostName   string
	Weight     float32
	IsReadOnly bool
}

type pingMessage struct {
	NeedsConfig bool
}

type global struct {
	formation           string
	useLocal            bool
	socketPath          string
	remotes             []string
	isMonitor           bool
	monitorHost         string
	monitorPort         int
	monitorPassword     string
	replicationPassword string
	hbaConfPath         string
	env                 []string
}

func main() {
	// Logging
	var logLevelS string
	if logLevelS = os.Getenv("AUTOCONFIG_LOG_LEVEL"); logLevelS == "" {
		logLevelS = "info"
	}
	logLevel, err := log.ParseLevel(logLevelS)
	if err != nil {
		log.WithError(err).Fatal("Failed to parse log level")
	}
	log.SetLevel(logLevel)
	log.Info("Starting pg_auto_failover load balancer configurator, version " + version)

	// Mode of operation
	var innerExec string
	var innerArgs []string
	innerEnv := os.Environ()
	doComplete := false
	if mode := os.Getenv("AUTOCONFIG_MODE"); mode == "monitor" || mode == "postgres" {
		innerExec = AutoFailoverExecutable
		innerArgs = append(innerArgs, []string{"create", mode, "--no-ssl"}...)
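		// From here on we assemble a `pg_autoctl create monitor|postgres --no-ssl ...`
		// command line plus its environment, and hand control over to it at the end of main().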
		var state global
		state.formation = getEnvOrDefault("AUTOCONFIG_FORMATION", DefaultFormation)
		state.useLocal = getEnvOrDefaultBool("AUTOCONFIG_LOCAL_PEER", false)
		state.socketPath = getEnvOrDefault("AUTOCONFIG_SOCKET", DefaultSocketPath)
		state.remotes = slices.DeleteFunc(strings.Split(getEnvOrDefault("AUTOCONFIG_REMOTE_PEERS", ""), ","),
			func(x string) bool { return x == "" })
		if !state.useLocal && len(state.remotes) == 0 {
			log.Warning("No configuration peers configured using AUTOCONFIG_LOCAL_PEER or AUTOCONFIG_REMOTE_PEERS. " +
				"Load balancing to the Postgres primary will only work if supported on the application level. " +
				"Load balancing of the read-only queries may be suboptimal.")
		}

		// Use a custom or the automatic pg_hba.conf (critical during secondary initialization)
		state.hbaConfPath = getEnvOrDefault("AUTOCONFIG_LINK_HBA_CONF", "")
		if state.hbaConfPath == "" {
			innerArgs = append(innerArgs, []string{"--auth", "scram-sha-256", "--pg-hba-lan"}...)
		} else {
			innerArgs = append(innerArgs, "--skip-pg-hba")
		}

		state.monitorPassword = getEnvOrDefault("AUTOCONFIG_MONITOR_PASSWORD", "")
		monitorPasswordPath := getEnvOrDefault("AUTOCONFIG_MONITOR_PASSWORD_FILE", "")
		if monitorPasswordFile, err := os.Stat(monitorPasswordPath); err == nil && !monitorPasswordFile.IsDir() {
			if v, err := os.ReadFile(monitorPasswordPath); err == nil {
				state.monitorPassword = string(v)
			}
		}
		if state.monitorPassword == "" {
			log.Fatal("AUTOCONFIG_MONITOR_PASSWORD or AUTOCONFIG_MONITOR_PASSWORD_FILE must be set")
		}

		if mode == "monitor" {
			state.isMonitor = true
			innerArgs = append(innerArgs, "--run")
			log.Info("Starting a monitor node.")
		} else {
			state.isMonitor = false
			state.monitorHost = getEnvOrDefault("AUTOCONFIG_MONITOR_HOST", "")
			if state.monitorHost == "" {
				log.Fatal("AUTOCONFIG_MONITOR_HOST must be set")
			}
			state.monitorPort = getEnvOrDefaultInt("AUTOCONFIG_MONITOR_PORT", getEnvOrDefaultInt("PGPORT", 5432))

			state.replicationPassword = getEnvOrDefault("AUTOCONFIG_REPLICATION_PASSWORD", "")
			replicationPasswordPath := getEnvOrDefault("AUTOCONFIG_REPLICATION_PASSWORD_FILE", "")
			if replicationPasswordFile, err := os.Stat(replicationPasswordPath); err == nil && !replicationPasswordFile.IsDir() {
				if v, err := os.ReadFile(replicationPasswordPath); err == nil {
					state.replicationPassword = string(v)
				}
			}
			if state.replicationPassword == "" {
				log.Fatal("AUTOCONFIG_REPLICATION_PASSWORD or AUTOCONFIG_REPLICATION_PASSWORD_FILE must be set")
			}

			// Pass the monitor URI through an env var, which is a little better than an argument.
			// Note: For security reasons we do not want to pass the monitor URI as an argument, as it contains a password.
			monitorEnvUri := "PG_AUTOCTL_MONITOR=postgresql://autoctl_node:" + state.monitorPassword + "@" +
				state.monitorHost + ":" + strconv.Itoa(state.monitorPort) + "/pg_auto_failover?sslmode=prefer"
			monitorSet := false
			for i, j := range innerEnv {
				if strings.HasPrefix(j, "PG_AUTOCTL_MONITOR=") {
					innerEnv[i] = monitorEnvUri
					monitorSet = true
					break
				}
			}
			if !monitorSet {
				innerEnv = append(innerEnv, monitorEnvUri)
			}
			state.env = innerEnv

			// The first secondary initialization is tricky, as we must inject pg_hba.conf
			// and soon after set the replication password for the second time
			targetDir := getEnvPgData()
			initDonePath := filepath.Join(targetDir, PostgresInitDoneFileName)
			if _, err := os.Stat(initDonePath); err != nil && !os.IsNotExist(err) {
				log.Fatal("Failed to access Postgres data directory")
			} else {
				if err != nil {
					// Note: pg_auto_failover refuses to start without pg_hba.conf
					log.WithFields(log.Fields{"path": targetDir}).Info("Postgres data dir is not initialized")
				}
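				// If a custom pg_hba.conf was requested we cannot link it yet: initdb requires
				// an empty data directory. A watcher goroutine waits for PG_VERSION to appear
				// and only then swaps the file in (see ensurePgHbaConfReplaced).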
				if state.hbaConfPath != "" {
					go func() {
						for {
							if _, err := os.Stat(initDonePath); err == nil {
								break
							} else if !os.IsNotExist(err) {
								log.Fatal("Failed to access Postgres data directory")
							}
							time.Sleep(100 * time.Millisecond)
						}
						ensurePgHbaConfReplaced(state.hbaConfPath)
					}()
				}
				if err != nil {
					// The storage is empty, so this is database initialization; propagate the
					// replication password (in case this is a secondary).
					// Note: There is no option to pass the replication password, and even
					// pg_autoctl create without --run already assumes it is present. Nor does
					// pg_autoctl config set work at this stage. So use PGPASSWORD.
					replEnvPassword := "PGPASSWORD=" + state.replicationPassword
					replSet := false
					initEnv := innerEnv
					for i, j := range initEnv {
						if strings.HasPrefix(j, "PGPASSWORD=") {
							initEnv[i] = replEnvPassword
							replSet = true
							break
						}
					}
					if !replSet {
						initEnv = append(initEnv, replEnvPassword)
					}
					log.WithFields(log.Fields{"name": innerExec, "args": innerArgs}).Info("Initializing pg_auto_failover on disk")
					cmdInit := exec.Command(innerExec, innerArgs...)
					cmdInit.Env = initEnv
					cmdInit.Stdout = os.Stdout
					cmdInit.Stderr = os.Stderr
					if err := cmdInit.Run(); err != nil {
						log.WithError(err).Warn("Initialization of pg_auto_failover encountered problems")
					}
					log.Info("Setting replication password in the configuration")
					setPassCmd := exec.Command(AutoFailoverExecutable, "config", "set", "replication.password", state.replicationPassword)
					setPassCmd.Env = state.env
					if err := setPassCmd.Run(); err != nil {
						log.WithError(err).Fatal("Failed to set password of user pgautofailover_replicator in the configuration")
					}
				}
			}
			innerArgs = append(innerArgs, "--run")
			log.Info("Starting a worker node.")
		}

		// Run the auto configuration
		go executionLoop(state, &doComplete)
	} else if mode != "" {
		log.Fatal("AUTOCONFIG_MODE must be either 'monitor' or 'postgres' to activate pg_auto_failover. Leave it empty for plain Postgres operation.")
	} else {
		// Execute the original entrypoint as if we never existed
		log.Info("No pg_auto_failover mode specified. Will execute plain Postgres.")
		innerExec = PostgresOriginalEntrypoint
		innerArgs = []string{PostgresOriginalCmd}
	}

	// Clean up any PID files left from a previous run
	os.Remove(AutoFailoverPidPath)
	os.Remove(AutoFailoverPidPath2)
	os.Remove(AutoFailoverPidPath3)

	// Start the inner executable (usually starting with "pg_autoctl create postgres" or "pg_autoctl create monitor")
	log.WithFields(log.Fields{"name": innerExec, "args": innerArgs}).Info("Handing over to the inner process.")
	cmd := exec.Command(innerExec, innerArgs...)
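	// The child inherits our environment and stdio, so pg_autoctl (or the stock
	// entrypoint) logs straight to the container output.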
	cmd.Env = innerEnv
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr

	// Pass termination signals to the inner process
	// Note: The container we are part of sets STOPSIGNAL SIGINT, but we also accept the default SIGTERM
	sigs := make(chan os.Signal, 1)
	defer close(sigs)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Reset()
	go func() {
		for sig := range sigs {
			log.Info("Passing signal '" + sig.String() + "' to the inner process")
			if cmd.Process != nil { // a signal may arrive before cmd.Run() has started the process
				cmd.Process.Signal(sig)
			}
		}
	}()
	cmd.Run()

	log.Info("pg_auto_failover load balancer configurator has completed.")
	doComplete = true
}

func executionLoop(state global, doComplete *bool) {
	// Link pg_hba.conf if asked to do so
	ensurePostgresReadyForQueries()
	ensurePgHbaConfReplaced(state.hbaConfPath)
	ensurePostgresReadyForQueries()

	// State
	var instances []postgresInstance
	monitorIsDown := false
	var lastSentStateJson []byte

	// The monitor automatically creates user autoctl_node, but we must set/propagate its password
	if state.isMonitor {
		log.Info("This is a monitor node, setting password for autoctl_node user")
		setPassCmd := exec.Command("psql", "-d", "pg_auto_failover", "-c",
			"alter user autoctl_node password '"+state.monitorPassword+"';")
		if err := setPassCmd.Run(); err != nil {
			log.WithError(err).Fatal("Failed to set password of user autoctl_node on the monitor node")
		}
		log.WithFields(log.Fields{"rule": "host pg_auto_failover autoctl_node <ip>/32 <auth-method>"}).Info(
			"Check rules exist in pg_hba.conf for each other node")
	} else {
		log.Info("Setting replication password in the database (will succeed on primary only)")
		setPassCmd2 := exec.Command("psql", "-c",
			"alter user pgautofailover_replicator password '"+state.replicationPassword+"';")
		if err := setPassCmd2.Run(); err != nil {
			log.WithError(err).Info("Failed to set password of user pgautofailover_replicator in the database (probably a standby read-only node)")
		}
		// TODO How to set password of pgautofailover_monitor?
		log.WithFields(log.Fields{
			"ruleMonitor":  "host all pgautofailover_monitor <ip>/32 <auth-method>",
			"ruleRegular1": "host postgres pgautofailover_replicator <ip>/32 <auth-method>",
			"ruleRegular2": "host replication pgautofailover_replicator <ip>/32 <auth-method>",
		}).Info("Check rules exist in pg_hba.conf for each other node and type")
	}

	peers := []http.Client{}
	if state.useLocal {
		peers = append(peers, http.Client{
			Transport: &http.Transport{
				DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
					return net.Dial("unix", state.socketPath)
				},
			},
		})
	}
	for _, i := range state.remotes {
		addr := i // copy, so the dial closure does not capture a shared loop variable (pre-Go 1.22 semantics)
		if iParts := strings.Split(addr, ":"); len(iParts) == 1 {
			addr = addr + ":" + strconv.Itoa(DefaultRemotePort)
		}
		peers = append(peers, http.Client{
			Transport: &http.Transport{
				DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
					return net.Dial("tcp", addr)
				},
			},
		})
	}

	pingWaitCounter := 0
	for !*doComplete {
		if !monitorIsDown {
			cmdFormationState := exec.Command("/usr/bin/pg_autoctl", "show", "state", "--formation", state.formation, "--json")
			cmdFormationState.Env = state.env
			if localStateB, err := cmdFormationState.Output(); err != nil {
				log.WithError(err).Warn("Failed to obtain the formation state from pg_autoctl. Monitor is probably down.")
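				// Degraded mode: keep serving the last known configuration while a background
				// goroutine polls until the monitor answers again.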
				monitorIsDown = true
				// Delegate the monitor up-checking to a separate goroutine so we do not block our main task of
				// sending pings to the configuration peer
				go func() {
					for !*doComplete {
						cmdMonitorUp := exec.Command("/usr/bin/pg_autoctl", "show", "state", "--formation", state.formation, "--json")
						cmdMonitorUp.Env = state.env
						if _, err := cmdMonitorUp.Output(); err == nil {
							log.Warn("Monitor node is no longer down.")
							monitorIsDown = false
							return
						}
						time.Sleep(MonitorUpCheckDelay)
					}
				}()
			} else {
				var formationState []postgresInstance
				if err := json.Unmarshal(localStateB, &formationState); err != nil {
					log.WithError(err).Warn("Failed to parse the formation state from pg_autoctl")
				} else {
					instances = formationState
				}
			}
		}

		if monitorIsDown && !state.isMonitor && len(instances) == 0 {
			// Try to obtain info about the current node. That way we may still be able to operate
			// in a read-only mode or, with luck, even in read-write mode. If the configuration peer
			// is connected to all the nodes then it can reconstruct the full picture.
			cmdLocalState := exec.Command("/usr/bin/pg_autoctl", "show", "state", "--local", "--json")
			cmdLocalState.Env = state.env
			if localStateB, err := cmdLocalState.Output(); err != nil {
				log.WithError(err).Warn("Failed to obtain the local state from pg_autoctl")
			} else {
				var localState postgresInstance
				if err := json.Unmarshal(localStateB, &localState); err != nil {
					log.WithError(err).Warn("Failed to parse the local state from pg_autoctl")
				} else {
					instances = []postgresInstance{localState}
				}
			}
		}

		// Keep maintaining the last known primary/standby configuration if the monitor is down.
		// Note: We keep producing configuration files, as some volatile values (like
		// IP addresses) may still change.
		if !monitorIsDown || len(instances) != 0 {
			if newState, err := configure(instances); err != nil {
				log.WithError(err).WithFields(log.Fields{"instances": instances}).Warn("Failed to produce configuration from instances.")
			} else if !bytes.Equal(newState, lastSentStateJson) {
				// Send the new state
				lastSentStateJson = newState
				log.WithFields(log.Fields{"state": string(newState)}).Info("Sending configuration to the peer processes (if any)")
				for _, i := range peers {
					if response, err := i.Post("http://unix/config", "application/octet-stream", bytes.NewReader(newState)); err != nil {
						log.WithError(err).WithFields(log.Fields{"peer": i, "response": response}).Warn("Failed to send configuration to a peer process.")
					} else {
						response.Body.Close() // we only care that delivery succeeded
					}
				}
			}
		}

		// Send a periodic ping to the configuration worker
		if pingWaitCounter == ExecutionCyclesPerPing {
			pingWaitCounter = 0
			log.Trace("Pinging configuration peers (if any)")
			for _, i := range peers {
				if response, err := i.Get("http://unix/ping"); err != nil {
					log.WithError(err).WithFields(log.Fields{"peer": i, "response": response}).Trace("Failed to ping a peer process.")
				} else {
					var msg pingMessage
					decodeErr := json.NewDecoder(response.Body).Decode(&msg)
					response.Body.Close() // always release the connection
					if decodeErr != nil {
						log.WithError(decodeErr).WithFields(log.Fields{"peer": i}).Warn("Failed to decode ping response.")
					} else {
						log.WithFields(log.Fields{"response": msg}).Trace("Ping response")
						if msg.NeedsConfig {
							log.WithFields(log.Fields{"peer": i, "state": string(lastSentStateJson)}).Info("Sending configuration to a peer process")
							if response, err := i.Post("http://unix/config", "application/octet-stream", bytes.NewReader(lastSentStateJson)); err != nil {
								log.WithError(err).WithFields(log.Fields{"response": response}).Warn("Failed to send configuration to the peer process.")
							} else {
								response.Body.Close()
							}
						}
					}
				}
			}
		} else {
			pingWaitCounter++
		}
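		// One cycle per second; peers are pinged only every ExecutionCyclesPerPing cycles
		// to keep the chatter low while still detecting peers that lost their configuration.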
		time.Sleep(ExecutionCycleSleep)
	}
}

func ensurePostgresReadyForQueries() {
	for {
		log.Trace("Testing if we can query the Postgres database")
		testCmd := exec.Command("psql", "-c", "SELECT 0;")
		if err := testCmd.Run(); err == nil {
			break
		}
		time.Sleep(1000 * time.Millisecond)
	}
	log.Trace("Postgres database accepts queries")
}

func ensurePgHbaConfReplaced(hbaConfPath string) {
	needsPgConfReload := false
	if hbaConfPath != "" {
		// Postgres refuses to initialize a data directory that is initially not empty
		targetDir := getEnvPgData()
		targetPath := filepath.Join(targetDir, PgHbaConfFileName)
		initDonePath := filepath.Join(targetDir, PostgresInitDoneFileName)
		log.WithFields(log.Fields{"path": targetDir}).Info("Ensuring Postgres data dir is initialized")
		for {
			if _, err := os.Stat(initDonePath); err == nil {
				break
			} else if !os.IsNotExist(err) {
				log.Fatal("Failed to access Postgres data directory")
			}
			time.Sleep(100 * time.Millisecond)
		}
		if info, err := os.Stat(hbaConfPath); err != nil || info.IsDir() {
			log.WithError(err).Fatal("Cannot find the pg_hba.conf to be linked as requested by AUTOCONFIG_LINK_HBA_CONF")
		}
		if info, err := os.Lstat(targetPath); (err != nil && os.IsNotExist(err)) || (err == nil && info.Mode().Type() != os.ModeSymlink) {
			// During secondary node initialization the pg_hba.conf is completely missing
			if !os.IsNotExist(err) {
				if err := os.Remove(targetPath); err != nil {
					log.Warn("Failed to remove the original pg_hba.conf")
				}
			}
			log.WithFields(log.Fields{"path": hbaConfPath}).Info("Applying the requested pg_hba.conf")
			if err := os.Symlink(hbaConfPath, targetPath); err != nil {
				log.WithError(err).Fatal("Failed to sym-link the pg_hba.conf requested by AUTOCONFIG_LINK_HBA_CONF")
			}
			needsPgConfReload = true
		}
	}
	if needsPgConfReload {
		ensurePostgresReadyForQueries()
		log.Info("Reloading Postgres pg_hba.conf")
		reloadCmd := exec.Command("psql", "-c", "SELECT 0 FROM pg_reload_conf();")
		if err := reloadCmd.Run(); err != nil {
			log.WithError(err).Fatal("Failed to reload Postgres configuration to apply new pg_hba.conf")
		}
	}
}

func configure(nodes []postgresInstance) ([]byte, error) {
	var msg configMessage
	for _, i := range nodes {
		var isPrimary bool
		switch i.AssignedState {
		case GroupStatePrimary, GroupStatePrimaryDegraded, GroupStatePrimaryAlone:
			isPrimary = true
		case GroupStateStandby:
			isPrimary = false
		default:
			// Skip the node as it is not ready to accept connections
			log.WithFields(log.Fields{"nodeId": i.NodeId, "nodeName": i.NodeName, "port": i.NodePort,
				"assignedState": i.AssignedState, "currentState": i.CurrentState}).Trace("Skipping node.")
			continue
		}
		// Pgpool does not re-resolve host names and assumes IP addresses are
		// fixed. Therefore we resolve them ourselves, and our peer application will
		// mark any IP addresses no longer used for Postgres nodes as down so
		// Pgpool happily uses the ones that are up.
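		// Note: only the first address returned by the resolver is published for each node.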
		var a configInstance
		if ips, err := net.LookupIP(i.NodeHost); err != nil || len(ips) == 0 {
			log.WithError(err).WithFields(log.Fields{"host": i.NodeHost}).Warn("Failed to resolve node's host name, skipping.")
			continue
		} else {
			a.IpAddress = ips[0]
		}
		a.Port = i.NodePort
		a.HostName = i.NodeHost
		a.IsReadOnly = !isPrimary
		msg.Instances = append(msg.Instances, a)
	}
	// TODO add user-configurable weights
	for i := range msg.Instances {
		msg.Instances[i].Weight = 1.0 / float32(len(msg.Instances))
	}
	return json.Marshal(msg)
}

func getEnvOrDefault(name string, defaultValue string) string {
	if val := os.Getenv(name); val != "" {
		return val
	}
	return defaultValue
}

func getEnvOrDefaultBool(name string, defaultValue bool) bool {
	if valS := os.Getenv(name); valS != "" {
		valS = strings.ToLower(valS)
		return valS == "1" || valS == "yes" || valS == "true"
	}
	return defaultValue
}

func getEnvOrDefaultInt(name string, defaultValue int) int {
	if val := os.Getenv(name); val != "" {
		if val2, err := strconv.Atoi(val); err == nil {
			return val2
		}
	}
	return defaultValue
}

func getEnvPgData() string {
	return getEnvOrDefault("PGDATA", PostgresDataDefaultPath)
}
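// For reference, the configuration message marshaled by configure() has this shape
// (field names follow Go's default JSON encoding since configMessage/configInstance
// carry no json tags; the values below are illustrative only):
//
//	{
//	  "Instances": [
//	    {"IpAddress": "10.0.0.11", "Port": 5432, "HostName": "pg-node-1", "Weight": 0.5, "IsReadOnly": false},
//	    {"IpAddress": "10.0.0.12", "Port": 5432, "HostName": "pg-node-2", "Weight": 0.5, "IsReadOnly": true}
//	  ]
//	}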