diff --git a/Dockerfile b/Dockerfile index e7766d1..22498d9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,16 +9,13 @@ RUN mkdir bin/ && go build -o bin/ ./... FROM alpine:3.18 -#RUN ca-certificates curl ldap-utils libaudit1 libbsd0 libcap-ng0 libcom-err2 libcrypt1 libedit2 libffi8 libgcc-s1 libgmp10 libgnutls30 libgssapi-krb5-2 libhogweed6 libicu72 libidn2-0 libk5crypto3 libkeyutils1 libkrb5-3 libkrb5support0 libldap-2.5-0 libldap-common liblzma5 libmd0 libnettle8 libnss-ldapd libp11-kit0 libpam-ldapd libpam0g libpq5 libsasl2-2 libssl3 libstdc++6 libtasn1-6 libtinfo6 libunistring2 libuuid1 libxml2 libxslt1.1 nslcd procps zlib1g RUN apk add --no-cache pgpool gettext postgresql-client && \ mkdir /var/run/pgpool/ && \ chmod 777 /var/run/pgpool && \ chmod 777 /var/log COPY --from=builder /usr/local/src/pg_autopool/bin/pg_autopool / -#COPY ./conf /etc/pgpool EXPOSE 5432 -ENTRYPOINT ["/pg_autopool"] -CMD ["pgpool", "-n"] \ No newline at end of file +ENTRYPOINT ["/pg_autopool"] \ No newline at end of file diff --git a/main.go b/main.go index 32cb8dd..5458b66 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( "strconv" "strings" "sync" + "time" log "github.com/sirupsen/logrus" ) @@ -21,6 +22,9 @@ const DefaultSocketPath = "/var/run/pg_autoconfig.sock" const WorkDir = "/var/run/pgpool" const PgpoolConfigPath = WorkDir + "/pgpool.conf" const PcpConfigPath = WorkDir + "/pcp.conf" +const DefaultRemotePort = 5420 +const MaxNonPingedMessage = 20 * time.Second +const ConfigurationDelay = 4 * time.Second type pgpoolConfigMessage struct { Instances []pgpoolInstance @@ -52,6 +56,23 @@ type pgpoolForcedConfig struct { flag string } +type global struct { + nodeHistory map[pgpoolNodeKey]pgpoolNodeInfo + messageHistoryLock sync.Mutex + messageHistory map[string]receivedMessage + adminLogin string + adminPass string + intialConfigWait sync.WaitGroup + initialConfigDone bool + configLock sync.Mutex + configTimer *time.Timer +} + +type receivedMessage struct { + msg 
pgpoolConfigMessage + lastPing time.Time +} + type pgpoolNodeKey struct { address string // net.IP port int @@ -60,6 +81,7 @@ type pgpoolNodeKey struct { type pgpoolNodeInfo struct { num int isPrimary bool + isInUse bool } func main() { @@ -77,22 +99,43 @@ func main() { log.Info("Starting pg_auto_failover load balancer using pgpool, version " + version) + useLocal := false + useRemote := false nextIsSocket := false socketPath := DefaultSocketPath + nextIsPort := false + httpPort := DefaultRemotePort for _, j := range os.Args[1:] { if nextIsSocket { nextIsSocket = false socketPath = j + } else if nextIsPort { + nextIsPort = false + if value, err := strconv.ParseInt(j, 10, 32); err != nil { + log.Fatal("Invalid port number") + } else { + httpPort = int(value) + } } else { switch j { + case "--local": + useLocal = true case "--socket": nextIsSocket = true + case "--remote": + useRemote = true + case "--port": + nextIsPort = true default: log.WithFields(log.Fields{"value": j}).Fatal("Unknown command switch") } } } + if !useLocal && !useRemote { + log.Fatal("You must enable --local, --remote or both.") + } + // Set permissions as we usually run as non-root //if err := os.Chown(WorkDir, os.Getuid(), os.Getgid()); err != nil { // log.WithFields(log.Fields{"dir": WorkDir, "uid": os.Getuid(), "gid": os.Getgid()}).Warn("Failed to set owner of the work directory to the current user.") @@ -102,26 +145,27 @@ func main() { //} // Authentication for pgpool management - adminLogin := os.Getenv("PGPOOL_ADMIN_USERNAME") - adminPass := os.Getenv("PGPOOL_ADMIN_PASSWORD") + var state global + state.adminLogin = os.Getenv("PGPOOL_ADMIN_USERNAME") + state.adminPass = os.Getenv("PGPOOL_ADMIN_PASSWORD") adminPassPath := os.Getenv("PGPOOL_ADMIN_PASSWORD_FILE") if adminPassFile, err := os.Stat(adminPassPath); err == nil && !adminPassFile.IsDir() { if v, err := os.ReadFile(adminPassPath); err == nil { - adminPass = string(v) + state.adminPass = string(v) } } - if adminLogin == "" || adminPass 
== "" { + if state.adminLogin == "" || state.adminPass == "" { log.Fatal("Pgpool admin PGPOOL_ADMIN_USERNAME and either PGPOOL_ADMIN_PASSWORD or PGPOOL_ADMIN_PASSWORD_FILE must be set") } - adminPassHashB := md5.Sum([]byte(adminPass)) + adminPassHashB := md5.Sum([]byte(state.adminPass)) adminPassHash := hex.EncodeToString(adminPassHashB[:]) if pcpFile, err := os.Create(PcpConfigPath); err != nil { log.WithError(err).Fatal("Cannot create the pcp config file.") } else { defer pcpFile.Close() - if _, err := pcpFile.WriteString(adminLogin + ":" + adminPassHash); err != nil { + if _, err := pcpFile.WriteString(state.adminLogin + ":" + adminPassHash); err != nil { log.WithError(err).Fatal("Cannot write to the the pcp config file.") } pcpFile.Sync() @@ -131,16 +175,30 @@ func main() { // Note: we must remember all IP/port pairs that appeared during // the runtime of the pgpool instance so we can properly report // former nodes are down. - var intialConfigWait sync.WaitGroup - intialConfigWait.Add(1) - initialConfigDone := false - nodes := make(map[pgpoolNodeKey]pgpoolNodeInfo) + state.intialConfigWait.Add(1) + state.nodeHistory = make(map[pgpoolNodeKey]pgpoolNodeInfo) + state.messageHistory = make(map[string]receivedMessage) + doComplete := false handler := http.NewServeMux() handler.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) { log.Trace("Received a ping") + source := strings.Split(r.RemoteAddr, ":")[0] + + // Note: Look for message history cleanup go routine below + state.messageHistoryLock.Lock() + var needsConfig bool + if hist, exists := state.messageHistory[source]; exists { + hist.lastPing = time.Now() + state.messageHistory[source] = hist + needsConfig = false + } else { + needsConfig = true + } + state.messageHistoryLock.Unlock() + var msg pgpoolPingMessage - msg.NeedsConfig = !initialConfigDone + msg.NeedsConfig = needsConfig if msgS, err := json.Marshal(msg); err != nil { log.WithFields(log.Fields{"msg": msg}).Warn("Failed to serializeping 
message") } else { @@ -156,44 +214,125 @@ func main() { http.Error(w, err.Error(), http.StatusBadRequest) } - log.WithFields(log.Fields{"instances": msg.Instances}).Info("Received a new configuration") - configure(msg, nodes, !initialConfigDone, adminLogin, adminPass) - log.WithFields(log.Fields{"nodes": nodes}).Info("Configured") - if !initialConfigDone { - initialConfigDone = true - intialConfigWait.Done() - } + source := strings.Split(r.RemoteAddr, ":")[0] + log.WithFields(log.Fields{"source": source, "instances": msg.Instances}).Info("Received a new configuration") + + // Update the message history + state.messageHistoryLock.Lock() + state.messageHistory[source] = receivedMessage{msg: msg, lastPing: time.Now()} + state.messageHistoryLock.Unlock() + + // Prevent configuration trashing while receiving info from multiple sources using a timer + queueConfigure(&state) w.WriteHeader(http.StatusOK) w.Write([]byte("OK")) }) - server := http.Server{ - Handler: handler, - } + go func() { - os.Remove(socketPath) - if listener, err := net.Listen("unix", socketPath); err != nil { - log.WithError(err).Fatal("Failed to start config change listener") - } else { - server.Serve(listener) + // Cleanup stale message sources and cause reconfigurations + for !doComplete { + now := time.Now() + reconfigure := false + state.messageHistoryLock.Lock() + minExpire := MaxNonPingedMessage + for i, j := range state.messageHistory { + if toExpire := j.lastPing.Add(MaxNonPingedMessage).Sub(now); toExpire < 0 { + log.WithFields(log.Fields{"source": i}).Trace("Missing pings from source so removing a message") + delete(state.messageHistory, i) + reconfigure = true + } else if toExpire < minExpire { + minExpire = toExpire + } + } + state.messageHistoryLock.Unlock() + + log.WithFields(log.Fields{"reconfigure": reconfigure, "sleep": minExpire}).Trace("Stale messages cleaned-up") + if reconfigure { + queueConfigure(&state) + } + + time.Sleep(minExpire) } }() + var serverLocal, serverRemote 
http.Server + if useLocal { + serverLocal = http.Server{ + Handler: handler, + } + go func() { + os.Remove(socketPath) + if listener, err := net.Listen("unix", socketPath); err != nil { + log.WithError(err).Fatal("Failed to start local config change listener") + } else { + serverLocal.Serve(listener) + } + }() + } + if useRemote { + serverRemote = http.Server{ + Handler: handler, + } + go func() { + if listener, err := net.Listen("tcp", ":"+strconv.Itoa(httpPort)); err != nil { + log.WithError(err).Fatal("Failed to start remote config change listener") + } else { + serverRemote.Serve(listener) + } + }() + } + // Start the inner executable (usually starting with "pg_autoctl create postgres" or "pg_autoctl create monitor") log.Info("Waiting for the initial configuration to arrive") - intialConfigWait.Wait() + state.intialConfigWait.Wait() log.Info("Handling over to the inner pgpool process") cmd := exec.Command("pgpool", "-n", "--config-file", PgpoolConfigPath, "-F", PcpConfigPath) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr - //cmd.Env = os.Environ() cmd.Run() log.Info("pg_auto_failover load balancer has completed.") - server.Close() + doComplete = true + if useLocal { + serverLocal.Close() + } + if useRemote { + serverRemote.Close() + } } -func configure(msg pgpoolConfigMessage, nodesHistory map[pgpoolNodeKey]pgpoolNodeInfo, isInitial bool, adminLogin string, adminPass string) { +func queueConfigure(state *global) { + // Note: We use messageHistoryLock to protect configTimer as the lock + // is never held for too long as opposed to the configLock + state.messageHistoryLock.Lock() + if state.configTimer == nil || !state.configTimer.Reset(ConfigurationDelay) { + state.configTimer = time.NewTimer(ConfigurationDelay) + go func() { + <-state.configTimer.C + // We need to continue processing incoming pings so that + // messages do not get invalidated in a few seconds + snapshot := []pgpoolConfigMessage{} + state.messageHistoryLock.Lock() + for _, j := range
state.messageHistory { + snapshot = append(snapshot, j.msg) + } + state.messageHistoryLock.Unlock() + + state.configLock.Lock() + configure(state, snapshot) + if !state.initialConfigDone { + state.initialConfigDone = true + state.intialConfigWait.Done() + } + state.configLock.Unlock() + }() + } + state.messageHistoryLock.Unlock() +} + +func configure(state *global, messageSnapshot []pgpoolConfigMessage) { + log.WithFields(log.Fields{"messages": messageSnapshot}).Trace("Configuring") conf := make(map[string]interface{}) // Note: We are mostly compatible with the bitnami pgpool docker regarding env vars @@ -316,7 +455,7 @@ func configure(msg pgpoolConfigMessage, nodesHistory map[pgpoolNodeKey]pgpoolNod // Backend settings // - // Get forced configuration (currentyl only weights) + // Get forced configuration (currently only weights) forcedNodes, forcedNodesErr := parseForcedNodeConfigs(os.Getenv("PGPOOL_BACKEND_NODES")) if forcedNodesErr != nil { log.WithError(forcedNodesErr).Warn("Failed to parse forced node configuration. 
Ignoring.") @@ -325,50 +464,87 @@ func configure(msg pgpoolConfigMessage, nodesHistory map[pgpoolNodeKey]pgpoolNod // Merge node info by re-using address/port pairs as those cannot // be modified without pgpool restart // Note: However there is no need to print the nodes that are down - hasPrimaryChanged := false + changedDown := make(map[pgpoolNodeKey]int) + changedUp := make(map[pgpoolNodeKey]int) processedNums := make(map[int]bool) - for _, i := range msg.Instances { - nodeKey := pgpoolNodeKey{address: i.IpAddress.String(), port: i.Port} - var num int - if histNode, exists := nodesHistory[nodeKey]; exists { - num = histNode.num - hasPrimaryChanged = hasPrimaryChanged || histNode.isPrimary != !i.IsReadOnly - } else { - num = len(nodesHistory) - hasPrimaryChanged = hasPrimaryChanged || !i.IsReadOnly + var prevPrimary *pgpoolNodeKey + for i, j := range state.nodeHistory { + if j.isInUse { + changedDown[i] = j.num } - nodesHistory[nodeKey] = pgpoolNodeInfo{num: num, isPrimary: !i.IsReadOnly} - processedNums[num] = true - - var weight float32 - if forced, exists := forcedNodes[pgpoolForcedKey{hostName: i.HostName, port: i.Port}]; exists && forced.weight != -1 { - weight = forced.weight - } else { - weight = i.Weight + if j.isPrimary { + prevPrimary = &pgpoolNodeKey{address: i.address, port: i.port} + j.isPrimary = false + state.nodeHistory[i] = j } - - var flag string - if !i.IsReadOnly { - flag = "ALWAYS_PRIMARY|DISALLOW_TO_FAILOVER" - } else { - flag = "DISALLOW_TO_FAILOVER" - } - - suffix := strconv.Itoa(num) - conf["backend_hostname"+suffix] = i.IpAddress.String() - conf["backend_port"+suffix] = i.Port - conf["backend_weight"+suffix] = weight - conf["backend_flag"+suffix] = flag } - // Reset node primality in instances that disappeared - for i, j := range nodesHistory { - if _, exists := processedNums[j.num]; !exists { - j.isPrimary = false - nodesHistory[i] = j + // Determine the new primary + // Note: There may be several reported in a transition state although 
+ // we have a trashing protection in the caller of this function. Anyway + // max one primary can be chosen so use the first one + var newPrimary *pgpoolNodeKey + for _, i := range messageSnapshot { + for _, j := range i.Instances { + if newPrimary == nil && !j.IsReadOnly { + newPrimary = &pgpoolNodeKey{address: j.IpAddress.String(), port: j.Port} + break + } } } + for _, i := range messageSnapshot { + for _, j := range i.Instances { + nodeKey := pgpoolNodeKey{address: j.IpAddress.String(), port: j.Port} + var num int + if histNode, exists := state.nodeHistory[nodeKey]; exists { + num = histNode.num + } else { + num = len(state.nodeHistory) + } + + if _, exists := processedNums[num]; exists { + // node already seen in another message + continue + } + + processedNums[num] = true + isPrimary := newPrimary != nil && *newPrimary == nodeKey + state.nodeHistory[nodeKey] = pgpoolNodeInfo{num: num, isPrimary: isPrimary, isInUse: true} + + if _, exists := changedDown[nodeKey]; exists { + delete(changedDown, nodeKey) + } else { + changedUp[nodeKey] = num + } + + var weight float32 + if forced, exists := forcedNodes[pgpoolForcedKey{hostName: j.HostName, port: j.Port}]; exists && forced.weight != -1 { + weight = forced.weight + } else { + weight = j.Weight + } + + var flag string + if isPrimary { + flag = "ALWAYS_PRIMARY|DISALLOW_TO_FAILOVER" + } else { + flag = "DISALLOW_TO_FAILOVER" + } + + suffix := strconv.Itoa(num) + conf["backend_hostname"+suffix] = j.IpAddress.String() + conf["backend_port"+suffix] = j.Port + conf["backend_weight"+suffix] = weight + conf["backend_flag"+suffix] = flag + } + } + + // Complete transition of nodeHistory + for i, j := range changedDown { + state.nodeHistory[i] = pgpoolNodeInfo{num: j, isInUse: false} + } + /* read -r -a nodes <<<"$(tr ',;' ' ' <<<"${PGPOOL_BACKEND_NODES}")" @@ -430,49 +606,54 @@ func configure(msg pgpoolConfigMessage, nodesHistory map[pgpoolNodeKey]pgpoolNod configFile.Sync() } - if !isInitial { - if pcpAuth, err := createPcpAuth(adminLogin,
adminPass, "localhost"); err != nil { + if state.initialConfigDone { + if pcpAuth, err := createPcpAuth(state.adminLogin, state.adminPass, "localhost"); err != nil { log.WithError(err).Fatal("Cannot create PCP authentication file.") } else { defer releasePcpAuth(pcpAuth) + // Detach the nodes + for _, j := range changedDown { + log.WithFields(log.Fields{"num": j}).Info("Detaching node") + detachNode(j, state.adminLogin, pcpAuth) + } + // Reload config - reloadCmd := exec.Command("pcp_reload_config", "-h", "localhost", "-U", adminLogin, "-w") + reloadCmd := exec.Command("pcp_reload_config", "-h", "localhost", "-U", state.adminLogin, "-w") reloadCmd.Env = []string{"PCPPASSFILE=" + pcpAuth} if err := reloadCmd.Run(); err != nil { log.WithError(err).Warn("Failed to force config reload.") } - // Attach nodes - for _, j := range msg.Instances { - if info, exists := nodesHistory[pgpoolNodeKey{address: j.IpAddress.String(), port: j.Port}]; !exists { - log.WithFields(log.Fields{"address": j.IpAddress, "Port": j.Port}).Warn("Failed to resolve node number. 
Skipping attach.") - } else { - attachCmd := exec.Command("pcp_attach_node", "-h", "localhost", "-U", adminLogin, "-n", strconv.Itoa(info.num), "-w") - attachCmd.Env = []string{"PCPPASSFILE=" + pcpAuth} - if err := attachCmd.Run(); err != nil { - log.WithError(err).WithFields(log.Fields{"address": j.IpAddress, "Port": j.Port}).Warn("Node attach failed.") - } + // Change the primary first as it also brings down the previous primary (if any) + if newPrimary != nil && (prevPrimary == nil || *newPrimary != *prevPrimary) { + newNode := state.nodeHistory[*newPrimary] + if _, exists := changedUp[*newPrimary]; exists { + log.WithFields(log.Fields{"num": newNode.num}).Info("Attaching node") + attachNode(newNode.num, state.adminLogin, pcpAuth) + } + log.WithFields(log.Fields{"num": newNode.num}).Info("Promoting to primary") + promoteNode(newNode.num, state.adminLogin, pcpAuth) + } + if prevPrimary != nil { + prevNode := state.nodeHistory[*prevPrimary] + if prevNode.isInUse { + // There is no demote so re-attach it as secondary + attachNode(prevNode.num, state.adminLogin, pcpAuth) } } - // Set the primary - if hasPrimaryChanged { - for _, i := range msg.Instances { - if !i.IsReadOnly { - num := nodesHistory[pgpoolNodeKey{address: i.IpAddress.String(), port: i.Port}].num - log.WithFields(log.Fields{"host": i.HostName, "address": i.IpAddress, "port": i.Port, "id": num}).Info("Changing the primary node") - primaryCmd := exec.Command("pcp_promote_node", "-h", "localhost", "-U", adminLogin, "-n", strconv.Itoa(num), "-w") - primaryCmd.Env = []string{"PCPPASSFILE=" + pcpAuth} - if err := primaryCmd.Run(); err != nil { - log.WithError(err).Warn("Failed to change the primary node.") - } - break - } + // Attach the rest of the nodes + for i, j := range changedUp { + if newPrimary == nil || *newPrimary != i { + log.WithFields(log.Fields{"num": j}).Info("Attaching node") + attachNode(j, state.adminLogin, pcpAuth) + } } } } } + + log.WithFields(log.Fields{"nodes":
state.nodeHistory}).Info("Configured") } func getEnvOrDefault(name string, defaultValue string) string { @@ -617,3 +798,23 @@ func releasePcpAuth(pcpAuth string) { log.Warn("Failed to delete the PCP authentication file.") } } + +func attachNode(num int, adminLogin string, pcpAuth string) { + pcpNodeCall("pcp_attach_node", num, adminLogin, pcpAuth) +} + +func detachNode(num int, adminLogin string, pcpAuth string) { + pcpNodeCall("pcp_detach_node", num, adminLogin, pcpAuth) +} + +func promoteNode(num int, adminLogin string, pcpAuth string) { + pcpNodeCall("pcp_promote_node", num, adminLogin, pcpAuth) +} + +func pcpNodeCall(executable string, num int, adminLogin string, pcpAuth string) { + cmd := exec.Command(executable, "-h", "localhost", "-U", adminLogin, "-n", strconv.Itoa(num), "-w") + cmd.Env = []string{"PCPPASSFILE=" + pcpAuth} + if err := cmd.Run(); err != nil { + log.WithError(err).WithFields(log.Fields{"num": num}).Warn(executable + " failed.") + } +}