Fix startup issues and add pinging config peer.
This commit is contained in:
37
main.go
37
main.go
@@ -4,7 +4,6 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
@@ -20,6 +19,7 @@ const GroupStatePrimaryAlone = "wait-primary"
|
||||
const GroupStateStandby = "secondary"
|
||||
const DefaultFormation = "default"
|
||||
const DefaultSocketPath = "/var/run/pg_autoconfig.sock"
|
||||
const CyclesPerPing = 10
|
||||
|
||||
type postgresInstance struct {
|
||||
// Locally available info
|
||||
@@ -67,9 +67,11 @@ type pgpoolInstance struct {
|
||||
IsReadOnly bool
|
||||
}
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
type pgpoolPingMessage struct {
|
||||
NeedsConfig bool
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Logging
|
||||
var logLevelS string
|
||||
if logLevelS = os.Getenv("AUTOCONFIG_LOG_LEVEL"); logLevelS == "" {
|
||||
@@ -90,8 +92,8 @@ func main() {
|
||||
socketPath := DefaultSocketPath
|
||||
nextIsFormation := false
|
||||
formation := DefaultFormation
|
||||
var argsLeft []string
|
||||
for i, j := range os.Args {
|
||||
argsLeft := os.Args[1:]
|
||||
for i, j := range argsLeft {
|
||||
if nextIsType {
|
||||
nextIsType = false
|
||||
switch j {
|
||||
@@ -107,7 +109,8 @@ func main() {
|
||||
nextIsFormation = false
|
||||
formation = j
|
||||
} else if j == "--" {
|
||||
argsLeft = os.Args[i+1:]
|
||||
argsLeft = argsLeft[i+1:]
|
||||
break
|
||||
} else {
|
||||
switch j {
|
||||
case "--type":
|
||||
@@ -166,6 +169,7 @@ func main() {
|
||||
},
|
||||
}
|
||||
|
||||
pingWaitCounter := 0
|
||||
for !doComplete {
|
||||
cmdFormationState := exec.Command("/usr/bin/pg_autoctl", "show", "state", "--formation", formation, "--json")
|
||||
cmdFormationState.Env = os.Environ()
|
||||
@@ -220,6 +224,25 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
// Send a periodic ping to the configuration worker
|
||||
if pingWaitCounter == CyclesPerPing {
|
||||
pingWaitCounter = 0
|
||||
if response, err := httpc.Get("http://unix/ping"); err != nil {
|
||||
log.WithError(err).WithFields(log.Fields{"response": response}).Trace("Failed to ping to the peer process.")
|
||||
} else {
|
||||
var msg pgpoolPingMessage
|
||||
if err := json.NewDecoder(response.Body).Decode(&msg); err != nil {
|
||||
log.WithError(err).Warn("Failed to decode ping response.")
|
||||
} else {
|
||||
if msg.NeedsConfig {
|
||||
if response, err := httpc.Post("http://unix/config", "application/octet-stream", bytes.NewReader(lastSentStateJson)); err != nil {
|
||||
log.WithError(err).WithFields(log.Fields{"response": response}).Warn("Failed to send configuration to the peer process.")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
time.Sleep(1000 * time.Millisecond)
|
||||
}
|
||||
}()
|
||||
@@ -228,6 +251,8 @@ func main() {
|
||||
log.WithFields(log.Fields{"name": argsLeft[0], "args": argsLeft[1:]}).Info("Handling over to inner process.")
|
||||
cmd := exec.Command(argsLeft[0], argsLeft[1:]...)
|
||||
cmd.Env = os.Environ()
|
||||
cmd.Stdout = os.Stdout
|
||||
cmd.Stderr = os.Stderr
|
||||
cmd.Run()
|
||||
|
||||
log.Info("pg_auto_failover load balancer has completed.")
|
||||
|
||||
Reference in New Issue
Block a user