Support for remote peers over TCP
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2024-04-06 01:36:31 +02:00
parent 852a717784
commit 5a07511649
2 changed files with 295 additions and 97 deletions

View File

@@ -9,16 +9,13 @@ RUN mkdir bin/ && go build -o bin/ ./...
FROM alpine:3.18
#RUN ca-certificates curl ldap-utils libaudit1 libbsd0 libcap-ng0 libcom-err2 libcrypt1 libedit2 libffi8 libgcc-s1 libgmp10 libgnutls30 libgssapi-krb5-2 libhogweed6 libicu72 libidn2-0 libk5crypto3 libkeyutils1 libkrb5-3 libkrb5support0 libldap-2.5-0 libldap-common liblzma5 libmd0 libnettle8 libnss-ldapd libp11-kit0 libpam-ldapd libpam0g libpq5 libsasl2-2 libssl3 libstdc++6 libtasn1-6 libtinfo6 libunistring2 libuuid1 libxml2 libxslt1.1 nslcd procps zlib1g
RUN apk add --no-cache pgpool gettext postgresql-client && \
mkdir /var/run/pgpool/ && \
chmod 777 /var/run/pgpool && \
chmod 777 /var/log
COPY --from=builder /usr/local/src/pg_autopool/bin/pg_autopool /
#COPY ./conf /etc/pgpool
EXPOSE 5432
ENTRYPOINT ["/pg_autopool"]
CMD ["pgpool", "-n"]
ENTRYPOINT ["/pg_autopool"]

387
main.go
View File

@@ -12,6 +12,7 @@ import (
"strconv"
"strings"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
@@ -21,6 +22,9 @@ const DefaultSocketPath = "/var/run/pg_autoconfig.sock"
const WorkDir = "/var/run/pgpool"
const PgpoolConfigPath = WorkDir + "/pgpool.conf"
const PcpConfigPath = WorkDir + "/pcp.conf"
const DefaultRemotePort = 5420
const MaxNonPingedMessage = 20 * time.Second
const ConfigurationDelay = 4 * time.Second
type pgpoolConfigMessage struct {
Instances []pgpoolInstance
@@ -52,6 +56,23 @@ type pgpoolForcedConfig struct {
flag string
}
// global holds all process-wide state shared between the HTTP handlers,
// the stale-message cleanup loop and the debounced (re)configuration timer.
type global struct {
// History of every backend address/port pair seen during this run; pgpool
// backend numbers must stay stable for the lifetime of the pgpool process.
nodeHistory map[pgpoolNodeKey]pgpoolNodeInfo
// Guards messageHistory and configTimer; held only briefly, unlike configLock.
messageHistoryLock sync.Mutex
// Last configuration message and ping time, keyed by source IP address.
messageHistory map[string]receivedMessage
// Credentials for pgpool PCP management commands (from env / password file).
adminLogin string
adminPass string
// Released exactly once, after the first configuration has been applied,
// so main can delay starting the inner pgpool process until then.
// NOTE(review): field name has a typo ("intial"); renaming would require
// touching every user of the field, so it is left as-is here.
intialConfigWait sync.WaitGroup
initialConfigDone bool
// Serializes calls to configure().
configLock sync.Mutex
// Debounce timer coalescing bursts of incoming configuration messages.
configTimer *time.Timer
}
// receivedMessage is one entry in global.messageHistory: the most recent
// configuration from a source plus the time that source last checked in.
type receivedMessage struct {
msg pgpoolConfigMessage
// Refreshed on each ping or configuration from the source; entries whose
// lastPing is older than MaxNonPingedMessage are expired by the cleanup loop.
lastPing time.Time
}
type pgpoolNodeKey struct {
address string // net.IP
port int
@@ -60,6 +81,7 @@ type pgpoolNodeKey struct {
// pgpoolNodeInfo records what is known about a backend node across
// configuration updates.
type pgpoolNodeInfo struct {
num int // stable pgpool backend number, assigned when the node is first seen
isPrimary bool // true if this node was chosen as primary in the last configuration
isInUse bool // false once the node no longer appears in any current message
}
func main() {
@@ -77,22 +99,43 @@ func main() {
log.Info("Starting pg_auto_failover load balancer using pgpool, version " + version)
useLocal := false
useRemote := false
nextIsSocket := false
socketPath := DefaultSocketPath
nextIsPort := false
httpPort := DefaultRemotePort
for _, j := range os.Args[1:] {
if nextIsSocket {
nextIsSocket = false
socketPath = j
} else if nextIsPort {
nextIsPort = false
if value, err := strconv.ParseInt(j, 10, 32); err != nil {
log.Fatal("Invalid port number")
} else {
httpPort = int(value)
}
} else {
switch j {
case "--local":
useLocal = true
case "--socket":
nextIsSocket = true
case "--remote":
useRemote = true
case "--port":
nextIsPort = true
default:
log.WithFields(log.Fields{"value": j}).Fatal("Unknown command switch")
}
}
}
if !useLocal && !useRemote {
log.Fatal("You must enable --local, --remote or both.")
}
// Set permissions as we usually run as non-root
//if err := os.Chown(WorkDir, os.Getuid(), os.Getgid()); err != nil {
// log.WithFields(log.Fields{"dir": WorkDir, "uid": os.Getuid(), "gid": os.Getgid()}).Warn("Failed to set owner of the work directory to the current user.")
@@ -102,26 +145,27 @@ func main() {
//}
// Authentication for pgpool management
adminLogin := os.Getenv("PGPOOL_ADMIN_USERNAME")
adminPass := os.Getenv("PGPOOL_ADMIN_PASSWORD")
var state global
state.adminLogin = os.Getenv("PGPOOL_ADMIN_USERNAME")
state.adminPass = os.Getenv("PGPOOL_ADMIN_PASSWORD")
adminPassPath := os.Getenv("PGPOOL_ADMIN_PASSWORD_FILE")
if adminPassFile, err := os.Stat(adminPassPath); err == nil && !adminPassFile.IsDir() {
if v, err := os.ReadFile(adminPassPath); err == nil {
adminPass = string(v)
state.adminPass = string(v)
}
}
if adminLogin == "" || adminPass == "" {
if state.adminLogin == "" || state.adminPass == "" {
log.Fatal("Pgpool admin PGPOOL_ADMIN_USERNAME and either PGPOOL_ADMIN_PASSWORD or PGPOOL_ADMIN_PASSWORD_FILE must be set")
}
adminPassHashB := md5.Sum([]byte(adminPass))
adminPassHashB := md5.Sum([]byte(state.adminPass))
adminPassHash := hex.EncodeToString(adminPassHashB[:])
if pcpFile, err := os.Create(PcpConfigPath); err != nil {
log.WithError(err).Fatal("Cannot create the pcp config file.")
} else {
defer pcpFile.Close()
if _, err := pcpFile.WriteString(adminLogin + ":" + adminPassHash); err != nil {
if _, err := pcpFile.WriteString(state.adminLogin + ":" + adminPassHash); err != nil {
log.WithError(err).Fatal("Cannot write to the the pcp config file.")
}
pcpFile.Sync()
@@ -131,16 +175,30 @@ func main() {
// Note: we must remember all IP/port pairs that appeared during
// the runtime of the pgpool instance so we can properly report
// former nodes are down.
var intialConfigWait sync.WaitGroup
intialConfigWait.Add(1)
initialConfigDone := false
nodes := make(map[pgpoolNodeKey]pgpoolNodeInfo)
state.intialConfigWait.Add(1)
state.nodeHistory = make(map[pgpoolNodeKey]pgpoolNodeInfo)
state.messageHistory = make(map[string]receivedMessage)
doComplete := false
handler := http.NewServeMux()
handler.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
log.Trace("Received a ping")
source := strings.Split(r.RemoteAddr, ":")[0]
// Note: Look for message history cleanup go routine below
state.messageHistoryLock.Lock()
var needsConfig bool
if hist, exists := state.messageHistory[source]; exists {
hist.lastPing = time.Now()
state.messageHistory[source] = hist
needsConfig = false
} else {
needsConfig = true
}
state.messageHistoryLock.Unlock()
var msg pgpoolPingMessage
msg.NeedsConfig = !initialConfigDone
msg.NeedsConfig = needsConfig
if msgS, err := json.Marshal(msg); err != nil {
log.WithFields(log.Fields{"msg": msg}).Warn("Failed to serializeping message")
} else {
@@ -156,44 +214,125 @@ func main() {
http.Error(w, err.Error(), http.StatusBadRequest)
}
log.WithFields(log.Fields{"instances": msg.Instances}).Info("Received a new configuration")
configure(msg, nodes, !initialConfigDone, adminLogin, adminPass)
log.WithFields(log.Fields{"nodes": nodes}).Info("Configured")
if !initialConfigDone {
initialConfigDone = true
intialConfigWait.Done()
}
source := strings.Split(r.RemoteAddr, ":")[0]
log.WithFields(log.Fields{"source": source, "instances": msg.Instances}).Info("Received a new configuration")
// Update the message history
state.messageHistoryLock.Lock()
state.messageHistory[source] = receivedMessage{msg: msg, lastPing: time.Now()}
state.messageHistoryLock.Unlock()
// Prevent configuration thrashing while receiving info from multiple sources using a timer
queueConfigure(&state)
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
})
server := http.Server{
Handler: handler,
}
go func() {
os.Remove(socketPath)
if listener, err := net.Listen("unix", socketPath); err != nil {
log.WithError(err).Fatal("Failed to start config change listener")
} else {
server.Serve(listener)
// Cleanup stale message sources and cause reconfigurations
for !doComplete {
now := time.Now()
reconfigure := false
state.messageHistoryLock.Lock()
minExpire := MaxNonPingedMessage
for i, j := range state.messageHistory {
if toExpire := j.lastPing.Add(MaxNonPingedMessage).Sub(now); toExpire < 0 {
log.WithFields(log.Fields{"source": i}).Trace("Missing pings from source so removing a message")
delete(state.messageHistory, i)
reconfigure = true
} else if toExpire < minExpire {
minExpire = toExpire
}
}
state.messageHistoryLock.Unlock()
log.WithFields(log.Fields{"reconfigure": reconfigure, "sleep": minExpire}).Trace("Stale messages cleaned-up")
if reconfigure {
queueConfigure(&state)
}
time.Sleep(minExpire)
}
}()
var serverLocal, serverRemote http.Server
if useLocal {
serverLocal := http.Server{
Handler: handler,
}
go func() {
os.Remove(socketPath)
if listener, err := net.Listen("unix", socketPath); err != nil {
log.WithError(err).Fatal("Failed to start local config change listener")
} else {
serverLocal.Serve(listener)
}
}()
}
if useRemote {
serverRemote := http.Server{
Handler: handler,
}
go func() {
if listener, err := net.Listen("tcp", ":"+strconv.Itoa(httpPort)); err != nil {
log.WithError(err).Fatal("Failed to start remote config change listener")
} else {
serverRemote.Serve(listener)
}
}()
}
// Start the inner executable (usually starting with "pg_autoctl create postgres" or "pg_autoctl create monitor")
log.Info("Waiting for the initial configuration to arrive")
intialConfigWait.Wait()
state.intialConfigWait.Wait()
log.Info("Handling over to the inner pgpool process")
cmd := exec.Command("pgpool", "-n", "--config-file", PgpoolConfigPath, "-F", PcpConfigPath)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
//cmd.Env = os.Environ()
cmd.Run()
log.Info("pg_auto_failover load balancer has completed.")
server.Close()
doComplete = true
if useLocal {
serverLocal.Close()
}
if useRemote {
serverRemote.Close()
}
}
func configure(msg pgpoolConfigMessage, nodesHistory map[pgpoolNodeKey]pgpoolNodeInfo, isInitial bool, adminLogin string, adminPass string) {
// queueConfigure schedules a single call to configure() after
// ConfigurationDelay, coalescing bursts of configuration messages arriving
// from multiple sources into one reconfiguration (debounce).
func queueConfigure(state *global) {
	// Note: We use messageHistoryLock to protect configTimer as the lock
	// is never held for too long as opposed to the configLock
	state.messageHistoryLock.Lock()
	// Timer.Reset returns false when the timer has already fired (or was
	// stopped), in which case the previous drain goroutine has run and a
	// fresh timer with a fresh goroutine is needed.
	if state.configTimer == nil || !state.configTimer.Reset(ConfigurationDelay) {
		timer := time.NewTimer(ConfigurationDelay)
		state.configTimer = timer
		go func() {
			// Read the captured local timer, not state.configTimer: the
			// field may be replaced by a later queueConfigure call, and
			// reading it here without the lock would be a data race.
			<-timer.C
			// We need to continue processing incoming pings so that
			// messages do not get invalidated in a few seconds
			snapshot := []pgpoolConfigMessage{}
			state.messageHistoryLock.Lock()
			for _, j := range state.messageHistory {
				snapshot = append(snapshot, j.msg)
			}
			state.messageHistoryLock.Unlock()
			state.configLock.Lock()
			configure(state, snapshot)
			if !state.initialConfigDone {
				state.initialConfigDone = true
				state.intialConfigWait.Done()
			}
			state.configLock.Unlock()
		}()
	}
	state.messageHistoryLock.Unlock()
}
func configure(state *global, messageSnapshot []pgpoolConfigMessage) {
log.WithFields(log.Fields{"messages": messageSnapshot}).Trace("Configuring")
conf := make(map[string]interface{})
// Note: We are mostly compatible with the bitnami pgpool docker regarding env vars
@@ -316,7 +455,7 @@ func configure(msg pgpoolConfigMessage, nodesHistory map[pgpoolNodeKey]pgpoolNod
// Backend settings
//
// Get forced configuration (currentyl only weights)
// Get forced configuration (currently only weights)
forcedNodes, forcedNodesErr := parseForcedNodeConfigs(os.Getenv("PGPOOL_BACKEND_NODES"))
if forcedNodesErr != nil {
log.WithError(forcedNodesErr).Warn("Failed to parse forced node configuration. Ignoring.")
@@ -325,50 +464,87 @@ func configure(msg pgpoolConfigMessage, nodesHistory map[pgpoolNodeKey]pgpoolNod
// Merge node info by re-using address/port pairs as those cannot
// be modified without pgpool restart
// Note: However there is no need to print the nodes that are down
hasPrimaryChanged := false
changedDown := make(map[pgpoolNodeKey]int)
changedUp := make(map[pgpoolNodeKey]int)
processedNums := make(map[int]bool)
for _, i := range msg.Instances {
nodeKey := pgpoolNodeKey{address: i.IpAddress.String(), port: i.Port}
var num int
if histNode, exists := nodesHistory[nodeKey]; exists {
num = histNode.num
hasPrimaryChanged = hasPrimaryChanged || histNode.isPrimary != !i.IsReadOnly
} else {
num = len(nodesHistory)
hasPrimaryChanged = hasPrimaryChanged || !i.IsReadOnly
var prevPrimary *pgpoolNodeKey
for i, j := range state.nodeHistory {
if j.isInUse {
changedDown[i] = j.num
}
nodesHistory[nodeKey] = pgpoolNodeInfo{num: num, isPrimary: !i.IsReadOnly}
processedNums[num] = true
var weight float32
if forced, exists := forcedNodes[pgpoolForcedKey{hostName: i.HostName, port: i.Port}]; exists && forced.weight != -1 {
weight = forced.weight
} else {
weight = i.Weight
if j.isPrimary {
prevPrimary = &pgpoolNodeKey{address: i.address, port: i.port}
j.isPrimary = false
state.nodeHistory[i] = j
}
var flag string
if !i.IsReadOnly {
flag = "ALWAYS_PRIMARY|DISALLOW_TO_FAILOVER"
} else {
flag = "DISALLOW_TO_FAILOVER"
}
suffix := strconv.Itoa(num)
conf["backend_hostname"+suffix] = i.IpAddress.String()
conf["backend_port"+suffix] = i.Port
conf["backend_weight"+suffix] = weight
conf["backend_flag"+suffix] = flag
}
// Reset node primality in instances that disappeared
for i, j := range nodesHistory {
if _, exists := processedNums[j.num]; !exists {
j.isPrimary = false
nodesHistory[i] = j
// Determine the new primary
// Note: There may be several reported in a transition state although
// we have a thrashing protection in the caller of this function. Anyway
// max one primary can be chosen so use the first one
var newPrimary *pgpoolNodeKey
for _, i := range messageSnapshot {
for _, j := range i.Instances {
if !j.IsReadOnly {
newPrimary = &pgpoolNodeKey{address: j.IpAddress.String(), port: j.Port}
break
}
}
}
for _, i := range messageSnapshot {
for _, j := range i.Instances {
nodeKey := pgpoolNodeKey{address: j.IpAddress.String(), port: j.Port}
var num int
if histNode, exists := state.nodeHistory[nodeKey]; exists {
num = histNode.num
} else {
num = len(state.nodeHistory)
}
if _, exists := processedNums[num]; exists {
// node already seen in another message
continue
}
processedNums[num] = true
isPrimary := newPrimary != nil && *newPrimary == nodeKey
state.nodeHistory[nodeKey] = pgpoolNodeInfo{num: num, isPrimary: isPrimary, isInUse: true}
if _, exists := changedDown[nodeKey]; exists {
delete(changedDown, nodeKey)
} else {
changedUp[nodeKey] = num
}
var weight float32
if forced, exists := forcedNodes[pgpoolForcedKey{hostName: j.HostName, port: j.Port}]; exists && forced.weight != -1 {
weight = forced.weight
} else {
weight = j.Weight
}
var flag string
if isPrimary {
flag = "ALWAYS_PRIMARY|DISALLOW_TO_FAILOVER"
} else {
flag = "DISALLOW_TO_FAILOVER"
}
suffix := strconv.Itoa(num)
conf["backend_hostname"+suffix] = j.IpAddress.String()
conf["backend_port"+suffix] = j.Port
conf["backend_weight"+suffix] = weight
conf["backend_flag"+suffix] = flag
}
}
// Complete transition of nodeHistory
for i, j := range changedDown {
state.nodeHistory[i] = pgpoolNodeInfo{num: j, isInUse: false}
}
/*
read -r -a nodes <<<"$(tr ',;' ' ' <<<"${PGPOOL_BACKEND_NODES}")"
@@ -430,49 +606,54 @@ func configure(msg pgpoolConfigMessage, nodesHistory map[pgpoolNodeKey]pgpoolNod
configFile.Sync()
}
if !isInitial {
if pcpAuth, err := createPcpAuth(adminLogin, adminPass, "localhost"); err != nil {
if state.initialConfigDone {
if pcpAuth, err := createPcpAuth(state.adminLogin, state.adminPass, "localhost"); err != nil {
log.WithError(err).Fatal("Cannot create PCP authentication file.")
} else {
defer releasePcpAuth(pcpAuth)
// Detach the nodes
for _, j := range changedDown {
log.WithFields(log.Fields{"num": j}).Info("Detaching node")
detachNode(j, state.adminLogin, pcpAuth)
}
// Reload config
reloadCmd := exec.Command("pcp_reload_config", "-h", "localhost", "-U", adminLogin, "-w")
reloadCmd := exec.Command("pcp_reload_config", "-h", "localhost", "-U", state.adminLogin, "-w")
reloadCmd.Env = []string{"PCPPASSFILE=" + pcpAuth}
if err := reloadCmd.Run(); err != nil {
log.WithError(err).Warn("Failed to force config reload.")
}
// Attach nodes
for _, j := range msg.Instances {
if info, exists := nodesHistory[pgpoolNodeKey{address: j.IpAddress.String(), port: j.Port}]; !exists {
log.WithFields(log.Fields{"address": j.IpAddress, "Port": j.Port}).Warn("Failed to resolve node number. Skipping attach.")
} else {
attachCmd := exec.Command("pcp_attach_node", "-h", "localhost", "-U", adminLogin, "-n", strconv.Itoa(info.num), "-w")
attachCmd.Env = []string{"PCPPASSFILE=" + pcpAuth}
if err := attachCmd.Run(); err != nil {
log.WithError(err).WithFields(log.Fields{"address": j.IpAddress, "Port": j.Port}).Warn("Node attach failed.")
}
// Change the primary first as it also brings down the previous primary (if any)
if newPrimary != nil && (prevPrimary == nil || *newPrimary != *prevPrimary) {
newNode := state.nodeHistory[*newPrimary]
if _, exists := changedUp[*newPrimary]; exists {
log.WithFields(log.Fields{"num": newNode.num}).Info("Attaching node")
attachNode(newNode.num, state.adminLogin, pcpAuth)
}
log.WithFields(log.Fields{"num": newNode.num}).Info("Promoting to primary")
promoteNode(newNode.num, state.adminLogin, pcpAuth)
}
if prevPrimary != nil {
prevNode := state.nodeHistory[*prevPrimary]
if prevNode.isInUse {
// There is no demote, so re-attach it as secondary
attachNode(prevNode.num, state.adminLogin, pcpAuth)
}
}
// Set the primary
if hasPrimaryChanged {
for _, i := range msg.Instances {
if !i.IsReadOnly {
num := nodesHistory[pgpoolNodeKey{address: i.IpAddress.String(), port: i.Port}].num
log.WithFields(log.Fields{"host": i.HostName, "address": i.IpAddress, "port": i.Port, "id": num}).Info("Changing the primary node")
primaryCmd := exec.Command("pcp_promote_node", "-h", "localhost", "-U", adminLogin, "-n", strconv.Itoa(num), "-w")
primaryCmd.Env = []string{"PCPPASSFILE=" + pcpAuth}
if err := primaryCmd.Run(); err != nil {
log.WithError(err).Warn("Failed to change the primary node.")
}
break
}
// Attach the rest of the nodes
for i, j := range changedUp {
if newPrimary == nil || *newPrimary != i {
log.WithFields(log.Fields{"num": j}).Info("Attaching node")
attachNode(j, state.adminLogin, pcpAuth)
}
}
}
}
log.WithFields(log.Fields{"nodes": state.nodeHistory}).Info("Configured")
}
func getEnvOrDefault(name string, defaultValue string) string {
@@ -617,3 +798,23 @@ func releasePcpAuth(pcpAuth string) {
log.Warn("Failed to delete the PCP authentication file.")
}
}
// attachNode asks the running pgpool to (re)attach backend node num via
// pcp_attach_node; failures are logged by pcpNodeCall, not returned.
func attachNode(num int, adminLogin string, pcpAuth string) {
pcpNodeCall("pcp_attach_node", num, adminLogin, pcpAuth)
}
// detachNode asks the running pgpool to detach backend node num via
// pcp_detach_node; failures are logged by pcpNodeCall, not returned.
func detachNode(num int, adminLogin string, pcpAuth string) {
pcpNodeCall("pcp_detach_node", num, adminLogin, pcpAuth)
}
// promoteNode asks the running pgpool to promote backend node num to primary
// via pcp_promote_node; failures are logged by pcpNodeCall, not returned.
func promoteNode(num int, adminLogin string, pcpAuth string) {
pcpNodeCall("pcp_promote_node", num, adminLogin, pcpAuth)
}
// pcpNodeCall runs the given pgpool PCP executable against the local pgpool
// instance for backend node num, authenticating through the PCPPASSFILE at
// pcpAuth. Errors are logged as warnings and otherwise ignored.
func pcpNodeCall(executable string, num int, adminLogin string, pcpAuth string) {
	args := []string{"-h", "localhost", "-U", adminLogin, "-n", strconv.Itoa(num), "-w"}
	pcpCmd := exec.Command(executable, args...)
	pcpCmd.Env = []string{"PCPPASSFILE=" + pcpAuth}
	err := pcpCmd.Run()
	if err != nil {
		log.WithError(err).WithFields(log.Fields{"num": num}).Warn(executable + " failed.")
	}
}