Support for remote peers over TCP
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2024-04-06 01:32:24 +02:00
parent 65a8874da5
commit e1d7029fe0

116
main.go
View File

@@ -8,6 +8,8 @@ import (
"net/http"
"os"
"os/exec"
"strconv"
"strings"
"time"
log "github.com/sirupsen/logrus"
@@ -21,6 +23,8 @@ const GroupStateStandby = "secondary"
const DefaultFormation = "default"
const DefaultSocketPath = "/var/run/pg_autoconfig.sock"
const CyclesPerPing = 10
const DefaultRemotePort = 5420
const MonitorUpCheckDelay = 15 * time.Second
type postgresInstance struct {
// Locally available info
@@ -89,8 +93,11 @@ func main() {
nextIsType := false
var config IConfiguration
useLocal := false
nextIsSocket := false
socketPath := DefaultSocketPath
nextIsRemotes := false
remotes := []string{}
nextIsFormation := false
formation := DefaultFormation
argsLeft := os.Args[1:]
@@ -106,6 +113,9 @@ func main() {
} else if nextIsSocket {
nextIsSocket = false
socketPath = j
} else if nextIsRemotes {
nextIsRemotes = false
remotes = strings.Split(j, ",")
} else if nextIsFormation {
nextIsFormation = false
formation = j
@@ -116,6 +126,10 @@ func main() {
switch j {
case "--type":
nextIsType = true
case "--remotes":
nextIsRemotes = true
case "--local":
useLocal = true
case "--socket":
nextIsSocket = true
case "--formation":
@@ -129,6 +143,9 @@ func main() {
if config == nil {
log.Fatal("No configuration type specified. Use --type <type>")
}
if !useLocal && len(remotes) == 0 {
log.Fatal("At least one peer connector must be used, add --remotes, --local or both")
}
if len(argsLeft) == 0 {
log.Fatal("No inner command found to execute. Use -- to separate the inner executable and its args")
}
@@ -162,40 +179,65 @@ func main() {
}
}
httpc := http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", socketPath)
peers := []http.Client{}
if useLocal {
peers = append(peers, http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", socketPath)
},
},
},
})
}
for _, i := range remotes {
if iParts := strings.Split(i, ":"); len(iParts) == 1 {
i = i + ":" + strconv.Itoa(DefaultRemotePort)
}
peers = append(peers, http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("tcp", i)
},
},
})
}
pingWaitCounter := 0
for !doComplete {
cmdFormationState := exec.Command("/usr/bin/pg_autoctl", "show", "state", "--formation", formation, "--json")
cmdFormationState.Env = os.Environ()
if localStateB, err := cmdFormationState.Output(); err != nil {
if !monitorIsDown {
if !monitorIsDown {
cmdFormationState := exec.Command("/usr/bin/pg_autoctl", "show", "state", "--formation", formation, "--json")
cmdFormationState.Env = os.Environ()
if localStateB, err := cmdFormationState.Output(); err != nil {
log.WithError(err).Warn("Failed to obtain the formation state from pg_autoctl. Monitor is probably down.")
monitorIsDown = true
}
} else {
if monitorIsDown {
log.Warn("Monitor node is no longer down.")
monitorIsDown = false
}
var formationState []postgresInstance
if err := json.Unmarshal(localStateB, &formationState); err != nil {
log.WithError(err).Warn("Failed to parse the local state from pg_autoctl")
// Delegate the monitor up-checking to a separate go routine so we do not block our main task of
// sending pings to the configuration peer
go func() {
for !doComplete {
cmdMonitorUp := exec.Command("/usr/bin/pg_autoctl", "show", "state", "--formation", formation, "--json")
cmdMonitorUp.Env = os.Environ()
if _, err := cmdMonitorUp.Output(); err == nil {
log.Warn("Monitor node is no longer down.")
monitorIsDown = false
return
}
time.Sleep(MonitorUpCheckDelay)
}
}()
} else {
instances = formationState
var formationState []postgresInstance
if err := json.Unmarshal(localStateB, &formationState); err != nil {
log.WithError(err).Warn("Failed to parse the local state from pg_autoctl")
} else {
instances = formationState
}
}
}
if monitorIsDown && !isMonitor && len(instances) == 0 {
// Try to obtain info about the current node. That way we may still be able to operate
// in a read-only mode or with luck even in read-write mode.
// in a read-only mode or with luck even in read-write mode. If the configuration peer
// is connected to all the nodes then it can reconstruct the full picture.
cmdLocalState := exec.Command("/usr/bin/pg_autoctl", "show", "state", "--local", "--json")
cmdLocalState.Env = os.Environ()
if localStateB, err := cmdLocalState.Output(); err != nil {
@@ -219,9 +261,11 @@ func main() {
} else if !bytes.Equal(newState, lastSentStateJson) {
// Send the new state
lastSentStateJson = newState
log.WithFields(log.Fields{"state": string(newState)}).Info("Sending configuration to the peer process")
if response, err := httpc.Post("http://unix/config", "application/octet-stream", bytes.NewReader(newState)); err != nil {
log.WithError(err).WithFields(log.Fields{"response": response}).Warn("Failed to send configuration to the peer process.")
log.WithFields(log.Fields{"state": string(newState)}).Info("Sending configuration to the peer processes")
for _, i := range peers {
if response, err := i.Post("http://unix/config", "application/octet-stream", bytes.NewReader(newState)); err != nil {
log.WithError(err).WithFields(log.Fields{"peer": i, "response": response}).Warn("Failed to send configuration to a peer process.")
}
}
}
}
@@ -230,18 +274,20 @@ func main() {
if pingWaitCounter == CyclesPerPing {
pingWaitCounter = 0
log.Trace("Pinging configuration peer")
if response, err := httpc.Get("http://unix/ping"); err != nil {
log.WithError(err).WithFields(log.Fields{"response": response}).Trace("Failed to ping to the peer process.")
} else {
var msg pgpoolPingMessage
if err := json.NewDecoder(response.Body).Decode(&msg); err != nil {
log.WithError(err).Warn("Failed to decode ping response.")
for _, i := range peers {
if response, err := i.Get("http://unix/ping"); err != nil {
log.WithError(err).WithFields(log.Fields{"peer": i, "response": response}).Trace("Failed to ping to a peer process.")
} else {
log.WithFields(log.Fields{"Response": msg}).Trace("Ping response")
if msg.NeedsConfig {
log.WithFields(log.Fields{"state": string(lastSentStateJson)}).Info("Sending configuration to the peer process")
if response, err := httpc.Post("http://unix/config", "application/octet-stream", bytes.NewReader(lastSentStateJson)); err != nil {
log.WithError(err).WithFields(log.Fields{"response": response}).Warn("Failed to send configuration to the peer process.")
var msg pgpoolPingMessage
if err := json.NewDecoder(response.Body).Decode(&msg); err != nil {
log.WithError(err).WithFields(log.Fields{"peer": i}).Warn("Failed to decode ping response.")
} else {
log.WithFields(log.Fields{"peer": i, "response": msg}).Trace("Ping response")
if msg.NeedsConfig {
log.WithFields(log.Fields{"peer": i, "state": string(lastSentStateJson)}).Info("Sending configuration to a peer process")
if response, err := i.Post("http://unix/config", "application/octet-stream", bytes.NewReader(lastSentStateJson)); err != nil {
log.WithError(err).WithFields(log.Fields{"response": response}).Warn("Failed to send configuration to the peer process.")
}
}
}
}