Fix ping, primary detection. More logging.
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2024-04-04 18:59:22 +02:00
parent e44155a459
commit 1353ae0a63

12
main.go
View File

@@ -15,7 +15,8 @@ import (
const version = "1.0"
const GroupStatePrimary = "primary"
const GroupStatePrimaryAlone = "wait-primary"
const GroupStatePrimaryDegraded = "wait-primary"
const GroupStatePrimaryAlone = "single"
const GroupStateStandby = "secondary"
const DefaultFormation = "default"
const DefaultSocketPath = "/var/run/pg_autoconfig.sock"
@@ -218,6 +219,7 @@ func main() {
} else if !bytes.Equal(newState, lastSentStateJson) {
// Send the new state
lastSentStateJson = newState
log.WithFields(log.Fields{"state": string(newState)}).Info("Sending configuration to the peer process")
if response, err := httpc.Post("http://unix/config", "application/octet-stream", bytes.NewReader(newState)); err != nil {
log.WithError(err).WithFields(log.Fields{"response": response}).Warn("Failed to send configuration to the peer process.")
}
@@ -227,6 +229,7 @@ func main() {
// Send a periodic ping to the configuration worker
if pingWaitCounter == CyclesPerPing {
pingWaitCounter = 0
log.Trace("Pinging configuration peer")
if response, err := httpc.Get("http://unix/ping"); err != nil {
log.WithError(err).WithFields(log.Fields{"response": response}).Trace("Failed to ping to the peer process.")
} else {
@@ -234,13 +237,17 @@ func main() {
if err := json.NewDecoder(response.Body).Decode(&msg); err != nil {
log.WithError(err).Warn("Failed to decode ping response.")
} else {
log.WithFields(log.Fields{"Response": msg}).Trace("Ping response")
if msg.NeedsConfig {
log.WithFields(log.Fields{"state": string(lastSentStateJson)}).Info("Sending configuration to the peer process")
if response, err := httpc.Post("http://unix/config", "application/octet-stream", bytes.NewReader(lastSentStateJson)); err != nil {
log.WithError(err).WithFields(log.Fields{"response": response}).Warn("Failed to send configuration to the peer process.")
}
}
}
}
} else {
pingWaitCounter++
}
time.Sleep(1000 * time.Millisecond)
@@ -264,12 +271,13 @@ func (t *pgpoolConfiguration) configure(nodes []postgresInstance) ([]byte, error
for _, i := range nodes {
var isPrimary bool
switch i.AssignedState {
case GroupStatePrimary, GroupStatePrimaryAlone:
case GroupStatePrimary, GroupStatePrimaryDegraded, GroupStatePrimaryAlone:
isPrimary = true
case GroupStateStandby:
isPrimary = false
default:
// Skip the node as it is not ready to accept connections
log.WithFields(log.Fields{"nodeId": i.NodeId, "nodeName": i.NodeName, "port": i.NodePort, "assignedState": i.AssignedState, "currentState": i.CurrentState}).Trace("Skipping node.")
continue
}