// Command pgpool autoconfig: listens for backend topology messages (over a
// unix socket and/or TCP), renders a pgpool.conf from them plus PGPOOL_*
// environment variables, starts pgpool, and afterwards applies topology
// changes to the running pgpool through the pcp_* command line tools.
package main

import (
	"crypto/md5"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"net"
	"net/http"
	"os"
	"os/exec"
	"strconv"
	"strings"
	"sync"
	"time"

	log "github.com/sirupsen/logrus"
)

const version = "1.0"

const (
	// DefaultSocketPath is the unix socket used by --local unless --socket overrides it.
	DefaultSocketPath = "/var/run/pg_autoconfig.sock"
	// WorkDir is the directory holding the generated pgpool and pcp config files.
	WorkDir = "/var/run/pgpool"
	// PgpoolConfigPath is the location of the generated pgpool.conf.
	PgpoolConfigPath = WorkDir + "/pgpool.conf"
	// PcpConfigPath is the location of the generated pcp.conf.
	PcpConfigPath = WorkDir + "/pcp.conf"
	// DefaultRemotePort is the TCP port used by --remote unless --port overrides it.
	DefaultRemotePort = 5420
	// MaxNonPingedMessage is how long a received configuration message stays
	// valid without a ping from the same source.
	MaxNonPingedMessage = 20 * time.Second
	// ConfigurationDelay is the quiet period after the last change request
	// before the configuration is actually applied.
	ConfigurationDelay = 4 * time.Second
)

// pgpoolConfigMessage is the JSON body accepted by the /config endpoint.
type pgpoolConfigMessage struct {
	Instances []pgpoolInstance
}

// pgpoolInstance describes one backend node reported in a config message.
type pgpoolInstance struct {
	IpAddress  net.IP
	Port       int
	HostName   string
	Weight     float32
	IsReadOnly bool
}

// pgpoolPingMessage is the JSON body returned by the /ping endpoint.
type pgpoolPingMessage struct {
	NeedsConfig bool
}

// pgpoolForcedKey identifies an entry of PGPOOL_BACKEND_NODES by host and port.
type pgpoolForcedKey struct {
	hostName string
	port     int
}

// pgpoolForcedConfig is one parsed entry of PGPOOL_BACKEND_NODES.
type pgpoolForcedConfig struct {
	num    int
	host   string
	port   int
	weight float32
	dir    string
	flag   string
}

// global is the shared state of the process.
type global struct {
	nodeHistory        map[pgpoolNodeKey]pgpoolNodeInfo
	messageHistoryLock sync.Mutex
	messageHistory     map[string]receivedMessage
	adminLogin         string
	adminPass          string
	intialConfigWait   sync.WaitGroup
	initialConfigDone  bool
	configLock         sync.Mutex
	configTimer        *time.Timer
}

// receivedMessage is the last config message of a source and its last ping time.
type receivedMessage struct {
	msg      pgpoolConfigMessage
	lastPing time.Time
}

// pgpoolNodeKey identifies a backend node by address and port.
type pgpoolNodeKey struct {
	address string // net.IP
	port    int
}

// pgpoolNodeInfo is the per-node bookkeeping kept in global.nodeHistory.
type pgpoolNodeInfo struct {
	num       int
	isPrimary bool
	isInUse   bool
}

func main() {
	// Logging
	logLevelS := os.Getenv("AUTOPOOL_LOG_LEVEL")
	if logLevelS == "" {
		logLevelS = "info"
	}
	logLevel, err := log.ParseLevel(logLevelS)
	if err != nil {
		log.WithError(err).Fatal("Failed to parse log level")
	}
	log.SetLevel(logLevel)
	log.Info("Starting pg_auto_failover load balancer using pgpool, version " + version)

	// Command line: --local [--socket PATH] --remote [--port N]
	useLocal := false
	useRemote := false
	nextIsSocket := false
	socketPath := DefaultSocketPath
	nextIsPort := false
	httpPort := DefaultRemotePort
	for _, arg := range os.Args[1:] {
		switch {
		case nextIsSocket:
			nextIsSocket = false
			socketPath = arg
		case nextIsPort:
			nextIsPort = false
			value, err := strconv.ParseInt(arg, 10, 32)
			if err != nil {
				log.Fatal("Invalid port number")
			}
			httpPort = int(value)
		default:
			switch arg {
			case "--local":
				useLocal = true
			case "--socket":
				nextIsSocket = true
			case "--remote":
				useRemote = true
			case "--port":
				nextIsPort = true
			default:
				log.WithFields(log.Fields{"value": arg}).Fatal("Unknown command switch")
			}
		}
	}
	if nextIsSocket || nextIsPort {
		// A trailing --socket/--port without a value used to be silently ignored.
		log.Fatal("Missing value for the last command switch")
	}
	if !useLocal && !useRemote {
		log.Fatal("You must enable --local, --remote or both.")
	}

	// Set permissions as we usually run as non-root
	// NOTE(review): the disabled Chmod below passes decimal 700, not octal 0700.
	//if err := os.Chown(WorkDir, os.Getuid(), os.Getgid()); err != nil {
	//	log.WithFields(log.Fields{"dir": WorkDir, "uid": os.Getuid(), "gid": os.Getgid()}).Warn("Failed to set owner of the work directory to the current user.")
	//}
	//if err := os.Chmod(WorkDir, 700); err != nil {
	//	log.WithFields(log.Fields{"dir": WorkDir, "uid": os.Getuid(), "gid": os.Getgid()}).Warn("Failed to chmod the working directory.")
	//}

	// Authentication for pgpool management
	var state global
	state.adminLogin = os.Getenv("PGPOOL_ADMIN_USERNAME")
	state.adminPass = readSecretFile(os.Getenv("PGPOOL_ADMIN_PASSWORD_FILE"), os.Getenv("PGPOOL_ADMIN_PASSWORD"))
	if state.adminLogin == "" || state.adminPass == "" {
		log.Fatal("Pgpool admin PGPOOL_ADMIN_USERNAME and either PGPOOL_ADMIN_PASSWORD or PGPOOL_ADMIN_PASSWORD_FILE must be set")
	}
	writePcpConfig(state.adminLogin, state.adminPass)

	// Run the configuration change listener
	// Note: we must remember all IP/port pairs that appeared during
	// the runtime of the pgpool instance so we can properly report
	// former nodes are down.
	state.intialConfigWait.Add(1)
	state.nodeHistory = make(map[pgpoolNodeKey]pgpoolNodeInfo)
	state.messageHistory = make(map[string]receivedMessage)

	handler := http.NewServeMux()
	handler.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
		handlePing(&state, w, r)
	})
	handler.HandleFunc("/config", func(w http.ResponseWriter, r *http.Request) {
		handleConfig(&state, w, r)
	})

	// Closed once pgpool exits so the cleanup goroutine stops.
	done := make(chan struct{})
	go runMessageCleanup(&state, done)

	// The servers are configured in place (no := here) so that the Close
	// calls at the end of main act on the servers that are actually serving.
	var serverLocal, serverRemote http.Server
	if useLocal {
		serverLocal.Handler = handler
		go func() {
			// Best effort: a stale socket file may or may not exist.
			_ = os.Remove(socketPath)
			listenAndServe(&serverLocal, "unix", socketPath, "local")
		}()
	}
	if useRemote {
		serverRemote.Handler = handler
		go listenAndServe(&serverRemote, "tcp", ":"+strconv.Itoa(httpPort), "remote")
	}

	// Start the inner executable (usually starting with "pg_autoctl create postgres" or "pg_autoctl create monitor")
	log.Info("Waiting for the initial configuration to arrive")
	state.intialConfigWait.Wait()
	log.Info("Handling over to the inner pgpool process")
	cmd := exec.Command("pgpool", "-n", "--config-file", PgpoolConfigPath, "-F", PcpConfigPath)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	if err := cmd.Run(); err != nil {
		log.WithError(err).Error("The pgpool process exited with an error")
	}

	log.Info("pg_auto_failover load balancer has completed.")
	close(done)
	if useLocal {
		if err := serverLocal.Close(); err != nil {
			log.WithError(err).Warn("Failed to close the local config change listener")
		}
	}
	if useRemote {
		if err := serverRemote.Close(); err != nil {
			log.WithError(err).Warn("Failed to close the remote config change listener")
		}
	}
}

// readSecretFile returns the content of the file at path, or fallback when
// path does not name a readable regular (non-directory) file.
func readSecretFile(path string, fallback string) string {
	info, err := os.Stat(path)
	if err != nil || info.IsDir() {
		return fallback
	}
	content, err := os.ReadFile(path)
	if err != nil {
		log.WithError(err).WithFields(log.Fields{"path": path}).Warn("Failed to read the password file")
		return fallback
	}
	return string(content)
}

// writePcpConfig writes "login:md5(password)" to PcpConfigPath and exits the
// process when the file cannot be created or written.
func writePcpConfig(adminLogin string, adminPass string) {
	adminPassHashB := md5.Sum([]byte(adminPass))
	adminPassHash := hex.EncodeToString(adminPassHashB[:])
	pcpFile, err := os.Create(PcpConfigPath)
	if err != nil {
		log.WithError(err).Fatal("Cannot create the pcp config file.")
	}
	if _, err := pcpFile.WriteString(adminLogin + ":" + adminPassHash); err != nil {
		log.WithError(err).Fatal("Cannot write to the the pcp config file.")
	}
	if err := pcpFile.Sync(); err != nil {
		log.WithError(err).Warn("Failed to sync the pcp config file.")
	}
	// Close right away instead of keeping the handle open while pgpool runs.
	if err := pcpFile.Close(); err != nil {
		log.WithError(err).Warn("Failed to close the pcp config file.")
	}
}

// remoteHost extracts the host part of an http.Request.RemoteAddr. Addresses
// without a host:port shape (for example those seen on a unix socket) are
// returned unchanged. Unlike splitting on the first ':', this keeps distinct
// IPv6 peers distinct.
func remoteHost(remoteAddr string) string {
	host, _, err := net.SplitHostPort(remoteAddr)
	if err != nil {
		return remoteAddr
	}
	return host
}

// handlePing refreshes the ping time of the caller's stored message and tells
// the caller whether it has to (re)send its configuration.
func handlePing(state *global, w http.ResponseWriter, r *http.Request) {
	log.Trace("Received a ping")
	source := remoteHost(r.RemoteAddr)

	// Note: stale entries are removed by runMessageCleanup
	state.messageHistoryLock.Lock()
	hist, known := state.messageHistory[source]
	if known {
		hist.lastPing = time.Now()
		state.messageHistory[source] = hist
	}
	state.messageHistoryLock.Unlock()

	msg := pgpoolPingMessage{NeedsConfig: !known}
	msgS, err := json.Marshal(msg)
	if err != nil {
		log.WithFields(log.Fields{"msg": msg}).Warn("Failed to serializeping message")
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Header().Set("Content-Length", strconv.Itoa(len(msgS)))
	w.WriteHeader(http.StatusOK)
	if _, err := w.Write(msgS); err != nil {
		log.WithError(err).Trace("Failed to write the ping response")
	}
}

// handleConfig stores the configuration message of the caller and queues a
// reconfiguration. A body that cannot be decoded is rejected with 400 and is
// not stored.
func handleConfig(state *global, w http.ResponseWriter, r *http.Request) {
	var msg pgpoolConfigMessage
	if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	source := remoteHost(r.RemoteAddr)
	log.WithFields(log.Fields{"source": source, "instances": msg.Instances}).Info("Received a new configuration")

	// Update the message history
	state.messageHistoryLock.Lock()
	state.messageHistory[source] = receivedMessage{msg: msg, lastPing: time.Now()}
	state.messageHistoryLock.Unlock()

	// Prevent configuration trashing while receiving info from multiple sources using a timer
	queueConfigure(state)

	w.WriteHeader(http.StatusOK)
	if _, err := w.Write([]byte("OK")); err != nil {
		log.WithError(err).Trace("Failed to write the config response")
	}
}

// expireStaleMessages removes messages whose source has not pinged within
// MaxNonPingedMessage. It reports whether anything was removed and how long
// until the next remaining message may expire.
func expireStaleMessages(state *global, now time.Time) (bool, time.Duration) {
	state.messageHistoryLock.Lock()
	defer state.messageHistoryLock.Unlock()

	reconfigure := false
	minExpire := MaxNonPingedMessage
	for source, received := range state.messageHistory {
		toExpire := received.lastPing.Add(MaxNonPingedMessage).Sub(now)
		switch {
		case toExpire < 0:
			log.WithFields(log.Fields{"source": source}).Trace("Missing pings from source so removing a message")
			delete(state.messageHistory, source)
			reconfigure = true
		case toExpire < minExpire:
			minExpire = toExpire
		}
	}
	return reconfigure, minExpire
}

// runMessageCleanup periodically drops stale message sources and queues a
// reconfiguration when any were dropped. It returns once done is closed.
func runMessageCleanup(state *global, done <-chan struct{}) {
	for {
		reconfigure, sleep := expireStaleMessages(state, time.Now())
		log.WithFields(log.Fields{"reconfigure": reconfigure, "sleep": sleep}).Trace("Stale messages cleaned-up")
		if reconfigure {
			queueConfigure(state)
		}
		select {
		case <-done:
			return
		case <-time.After(sleep):
		}
	}
}

// listenAndServe listens on network/address and serves until the server is
// closed. kind ("local" or "remote") only selects the log message. A listen
// failure terminates the process.
func listenAndServe(server *http.Server, network string, address string, kind string) {
	listener, err := net.Listen(network, address)
	if err != nil {
		log.WithError(err).Fatal("Failed to start " + kind + " config change listener")
	}
	if err := server.Serve(listener); err != nil && err != http.ErrServerClosed {
		log.WithError(err).Warn("The " + kind + " config change listener stopped")
	}
}

// queueConfigure (re)starts the ConfigurationDelay countdown after which the
// current message history is applied, so that bursts of messages result in a
// single reconfiguration.
func queueConfigure(state *global) {
	// Note: We use messageHistoryLock to protect configTimer as the lock
	// is never held for too long as opposed to the configLock
	state.messageHistoryLock.Lock()
	defer state.messageHistoryLock.Unlock()

	if state.configTimer == nil {
		state.configTimer = time.AfterFunc(ConfigurationDelay, func() {
			applyQueuedConfiguration(state)
		})
		return
	}
	// For an AfterFunc timer Reset either postpones the pending run or,
	// when the function has already run, schedules it to run again.
	state.configTimer.Reset(ConfigurationDelay)
}

// applyQueuedConfiguration snapshots the message history and applies it. The
// first successful pass releases main, which is waiting to start pgpool.
func applyQueuedConfiguration(state *global) {
	// We need to continue processing incoming pings so that
	// messages do not get invalidated in a few seconds, hence the
	// snapshot: messageHistoryLock is not held while configuring.
	state.messageHistoryLock.Lock()
	snapshot := make([]pgpoolConfigMessage, 0, len(state.messageHistory))
	for _, received := range state.messageHistory {
		snapshot = append(snapshot, received.msg)
	}
	state.messageHistoryLock.Unlock()

	state.configLock.Lock()
	defer state.configLock.Unlock()
	configure(state, snapshot)
	if !state.initialConfigDone {
		state.initialConfigDone = true
		state.intialConfigWait.Done()
	}
}

// configure renders pgpool.conf from the environment and the given message
// snapshot, updates state.nodeHistory and, once pgpool is running, applies
// the node changes through the pcp tools. The caller holds state.configLock.
func configure(state *global, messageSnapshot []pgpoolConfigMessage) {
	log.WithFields(log.Fields{"messages": messageSnapshot}).Trace("Configuring")

	conf := baseConfigFromEnv()

	//
	// Backend settings
	//

	// Get forced configuration (currently only weights)
	forcedNodes, forcedNodesErr := parseForcedNodeConfigs(os.Getenv("PGPOOL_BACKEND_NODES"))
	if forcedNodesErr != nil {
		log.WithError(forcedNodesErr).Warn("Failed to parse forced node configuration. Ignoring.")
	}

	// Merge node info by re-using address/port pairs as those cannot
	// be modified without pgpool restart
	// Note: However there is no need to print the nodes that are down
	changedDown := make(map[pgpoolNodeKey]int)
	changedUp := make(map[pgpoolNodeKey]int)
	processedNums := make(map[int]bool)
	var prevPrimary *pgpoolNodeKey
	for key, info := range state.nodeHistory {
		if info.isInUse {
			// Assume down until the node shows up in the snapshot below.
			changedDown[key] = info.num
		}
		if info.isPrimary {
			prevPrimary = &pgpoolNodeKey{address: key.address, port: key.port}
			info.isPrimary = false
			state.nodeHistory[key] = info
		}
	}

	// Determine the new primary
	// Note: There may be several reported in a transition state although
	// we have a trashing protection in the caller of this function. Anyway
	// max one primary can be chosen so use the first one
	newPrimary := findPrimary(messageSnapshot)

	for _, msg := range messageSnapshot {
		for _, inst := range msg.Instances {
			nodeKey := pgpoolNodeKey{address: inst.IpAddress.String(), port: inst.Port}
			var num int
			if histNode, exists := state.nodeHistory[nodeKey]; exists {
				num = histNode.num
			} else {
				// New nodes get the next free backend number.
				num = len(state.nodeHistory)
			}
			if processedNums[num] {
				// node already seen in another message
				continue
			}
			processedNums[num] = true

			isPrimary := newPrimary != nil && *newPrimary == nodeKey
			state.nodeHistory[nodeKey] = pgpoolNodeInfo{num: num, isPrimary: isPrimary, isInUse: true}
			if _, exists := changedDown[nodeKey]; exists {
				delete(changedDown, nodeKey)
			} else {
				changedUp[nodeKey] = num
			}

			// A forced weight of -1 means "not specified".
			weight := inst.Weight
			if forced, exists := forcedNodes[pgpoolForcedKey{hostName: inst.HostName, port: inst.Port}]; exists && forced.weight != -1 {
				weight = forced.weight
			}

			//var flag string
			//if isPrimary {
			//	flag = "DISALLOW_TO_FAILOVER" //"ALWAYS_PRIMARY|DISALLOW_TO_FAILOVER"
			//} else {
			//	flag = "DISALLOW_TO_FAILOVER"
			//}
			flag := "ALLOW_TO_FAILOVER"

			suffix := strconv.Itoa(num)
			conf["backend_hostname"+suffix] = inst.IpAddress.String()
			conf["backend_port"+suffix] = inst.Port
			conf["backend_weight"+suffix] = weight
			conf["backend_flag"+suffix] = flag
		}
	}

	// Complete transition of nodeHistory
	for key, num := range changedDown {
		state.nodeHistory[key] = pgpoolNodeInfo{num: num, isInUse: false}
	}

	/*
		read -r -a nodes <<<"$(tr ',;' ' ' <<<"${PGPOOL_BACKEND_NODES}")"
		if is_boolean_yes "$PGPOOL_AUTO_FAILBACK"; then
			pgpool_set_property "auto_failback" "on"
			read -r -a app_name <<<"$(tr ',;' ' ' <<<"${PGPOOL_BACKEND_APPLICATION_NAMES}")"
		fi
		for node in "${nodes[@]}"; do
			pgpool_create_backend_config "$node" "$(is_boolean_yes "$PGPOOL_AUTO_FAILBACK" && echo "${app_name[i]}")"
			((i += 1))
		done
	*/

	/*
		if [[ -f "$PGPOOL_USER_CONF_FILE" ]]; then
			info "Custom configuration '$PGPOOL_USER_CONF_FILE' detected!. Adding it to the configuration file."
			cat "$PGPOOL_USER_CONF_FILE" >>"$PGPOOL_CONF_FILE"
		fi
		if [[ -f "$PGPOOL_USER_HBA_FILE" ]]; then
			info "Custom configuration '$PGPOOL_USER_HBA_FILE' detected!. Overwriting the generated hba file."
			cat "$PGPOOL_USER_HBA_FILE" >"$PGPOOL_PGHBA_FILE"
		fi
	*/

	//
	// Write the config file
	//
	log.WithFields(log.Fields{"conf": conf}).Trace("New configuration")
	writeConfigFile(PgpoolConfigPath, conf)

	if state.initialConfigDone {
		applyNodeChanges(state, changedDown, changedUp, prevPrimary, newPrimary)
	}

	log.WithFields(log.Fields{"nodes": state.nodeHistory}).Info("Configured")
}

// setOptionalInt copies the integer value of the environment variable env
// into conf[key]; unset or non-numeric values leave conf untouched.
func setOptionalInt(conf map[string]interface{}, key string, env string) {
	if val, err := getEnvInt(env); err == nil {
		conf[key] = val
	}
}

// baseConfigFromEnv builds every pgpool setting that does not depend on the
// backend nodes, from fixed values and PGPOOL_* environment variables.
func baseConfigFromEnv() map[string]interface{} {
	conf := make(map[string]interface{})

	// Note: We are mostly compatible with the bitnami pgpool docker regarding env vars

	// Connection settings
	// http://www.pgpool.net/docs/latest/en/html/configuring-pgpool.html
	conf["backend_clustering_mode"] = "streaming_replication"
	conf["listen_addresses"] = "*"
	conf["port"] = getEnvOrDefaultInt("PGPOOL_PORT_NUMBER", 5432)
	tmpDir := getEnvOrDefault("PGPOOL_TMP_DIR", "/tmp")
	conf["unix_socket_directories"] = tmpDir
	conf["pcp_socket_dir"] = tmpDir

	// Connection Pooling settings
	// http://www.pgpool.net/docs/latest/en/html/runtime-config-connection-pooling.html
	setOptionalInt(conf, "num_init_children", "PGPOOL_NUM_INIT_CHILDREN")
	setOptionalInt(conf, "reserved_connections", "PGPOOL_RESERVED_CONNECTIONS")
	conf["max_pool"] = getEnvOrDefaultInt("PGPOOL_MAX_POOL", 15)
	setOptionalInt(conf, "child_max_connections", "PGPOOL_CHILD_MAX_CONNECTIONS")
	setOptionalInt(conf, "child_life_time", "PGPOOL_CHILD_LIFE_TIME")
	setOptionalInt(conf, "connection_life_time", "PGPOOL_CONNECTION_LIFE_TIME")
	setOptionalInt(conf, "client_idle_limit", "PGPOOL_CLIENT_IDLE_LIMIT")

	// Logging settings
	// https://www.pgpool.net/docs/latest/en/html/runtime-config-logging.html
	conf["log_connections"] = getEnvOrDefaultBool("PGPOOL_ENABLE_LOG_CONNECTIONS", false)
	conf["log_hostname"] = getEnvOrDefaultBool("PGPOOL_ENABLE_LOG_HOSTNAME", false)
	conf["log_per_node_statement"] = getEnvOrDefaultBool("PGPOOL_ENABLE_LOG_PER_NODE_STATEMENT", false)
	if val := os.Getenv("PGPOOL_LOG_LINE_PREFIX"); val != "" {
		conf["log_line_prefix"] = val
	}
	// client_min_messages takes a level name, so pass it through as a
	// string rather than requiring an integer.
	if val := os.Getenv("PGPOOL_CLIENT_MIN_MESSAGES"); val != "" {
		conf["client_min_messages"] = val
	}

	// Authentication settings
	const DefaultEnablePoolHba = true
	const DefaultEnablePoolPasswd = true
	// Specifying '' (empty) disables the use of password file.
	// https://www.pgpool.net/docs/latest/en/html/runtime-config-connection.html#GUC-POOL-PASSWD
	poolPasswd := ""
	if getEnvOrDefaultBool("PGPOOL_ENABLE_POOL_PASSWD", DefaultEnablePoolPasswd) {
		poolPasswd = getEnvOrDefault("PGPOOL_PASSWD_FILE", "pool_passwd")
	}

	// http://www.pgpool.net/docs/latest/en/html/runtime-config-connection.html#RUNTIME-CONFIG-AUTHENTICATION-SETTINGS
	enablePoolHba := getEnvOrDefaultBool("PGPOOL_ENABLE_POOL_HBA", DefaultEnablePoolHba)
	conf["enable_pool_hba"] = enablePoolHba
	// allow_clear_text_frontend_auth only works when enable_pool_hba is not enabled
	// https://www.pgpool.net/docs/latest/en/html/runtime-config-connection.html#GUC-ALLOW-CLEAR-TEXT-FRONTEND-AUTH
	conf["allow_clear_text_frontend_auth"] = !enablePoolHba
	conf["pool_passwd"] = poolPasswd
	conf["authentication_timeout"] = 30

	// File Locations settings
	conf["pid_file_name"] = getEnvOrDefault("PGPOOL_PID_FILE", tmpDir+"/pgpool.pid")
	conf["logdir"] = getEnvOrDefault("PGPOOL_LOG_DIR", "/var/log")

	// Load Balancing settings
	// https://www.pgpool.net/docs/latest/en/html/runtime-config-load-balancing.html
	conf["load_balance_mode"] = getEnvOrDefaultBool("PGPOOL_ENABLE_LOAD_BALANCING", true)
	// NOTE(review): newer pgpool releases may expect write_function_list
	// instead of black_function_list - confirm against the deployed version.
	conf["black_function_list"] = "nextval,setval"
	conf["statement_level_load_balance"] = getEnvOrDefaultBool("PGPOOL_ENABLE_STATEMENT_LOAD_BALANCING", false)

	// Streaming Replication Check settings
	// https://www.pgpool.net/docs/latest/en/html/runtime-streaming-replication-check.html
	srCheckUser := getEnvOrDefault("PGPOOL_SR_CHECK_USER", "")
	srCheckPassword := readSecretFile(os.Getenv("PGPOOL_SR_CHECK_PASSWORD_FILE"), getEnvOrDefault("PGPOOL_SR_CHECK_PASSWORD", ""))
	conf["sr_check_user"] = srCheckUser
	conf["sr_check_password"] = srCheckPassword
	conf["sr_check_period"] = getEnvOrDefaultInt("PGPOOL_SR_CHECK_PERIOD", 30)
	conf["sr_check_database"] = getEnvOrDefault("PGPOOL_SR_CHECK_DATABASE", "postgres")

	// Healthcheck per node settings
	// https://www.pgpool.net/docs/latest/en/html/runtime-config-health-check.html
	conf["health_check_period"] = getEnvOrDefaultInt("PGPOOL_HEALTH_CHECK_PERIOD", 30)
	conf["health_check_timeout"] = getEnvOrDefaultInt("PGPOOL_HEALTH_CHECK_TIMEOUT", 10)
	// The health check credentials default to the replication check ones.
	conf["health_check_user"] = getEnvOrDefault("PGPOOL_HEALTH_CHECK_USER", srCheckUser)
	conf["health_check_password"] = readSecretFile(os.Getenv("PGPOOL_HEALTH_CHECK_PASSWORD_FILE"), getEnvOrDefault("PGPOOL_HEALTH_CHECK_PASSWORD", srCheckPassword))
	conf["health_check_max_retries"] = getEnvOrDefaultInt("PGPOOL_HEALTH_CHECK_MAX_RETRIES", 5)
	conf["health_check_retry_delay"] = getEnvOrDefaultInt("PGPOOL_HEALTH_CHECK_RETRY_DELAY", 5)
	conf["connect_timeout"] = getEnvOrDefaultInt("PGPOOL_CONNECT_TIMEOUT", 10000)

	// Failover settings
	conf["failover_command"] = "echo \">>> Failover - that will initialize new primary node search!\""
	conf["failover_on_backend_error"] = "off"
	// Keeps searching for a primary node forever when a failover occurs
	conf["search_primary_node_timeout"] = "0"
	conf["disable_load_balance_on_write"] = checkKnownValue(getEnvOrDefault("PGPOOL_DISABLE_LOAD_BALANCE_ON_WRITE", "transaction"), []string{"off", "transaction", "trans_transaction", "always"}, "PGPOOL_DISABLE_LOAD_BALANCE_ON_WRITE")

	// SSL settings
	// https://www.pgpool.net/docs/latest/en/html/runtime-ssl.html
	if getEnvOrDefaultBool("PGPOOL_ENABLE_TLS", false) {
		tlsKeyFile := getEnvOrDefault("PGPOOL_TLS_KEY_FILE", "")
		keyStat, err := os.Stat(tlsKeyFile)
		if err != nil {
			log.WithFields(log.Fields{"path": tlsKeyFile}).Fatal("TLS key file not found or access is denied")
		}
		// Compare the permission bits only, not the file type bits.
		if keyStat.Mode().Perm() != 0600 {
			if err := os.Chmod(tlsKeyFile, 0600); err != nil {
				log.WithError(err).WithFields(log.Fields{"path": tlsKeyFile}).Fatal("TLS key file permission cannot be set to compulsory 0600")
			}
		}
		conf["ssl"] = "on"
		// Server ciphers are preferred by default
		if !getEnvOrDefaultBool("PGPOOL_TLS_PREFER_SERVER_CIPHERS", true) {
			conf["ssl_prefer_server_ciphers"] = "off"
		}
		if val := os.Getenv("PGPOOL_TLS_CA_FILE"); val != "" {
			conf["ssl_ca_cert"] = val
		}
		conf["ssl_cert"] = getEnvOrDefault("PGPOOL_TLS_CERT_FILE", "")
		conf["ssl_key"] = tlsKeyFile
	}

	return conf
}

// findPrimary returns the key of the first writable (not read-only) instance
// found in the messages, or nil when there is none.
func findPrimary(messages []pgpoolConfigMessage) *pgpoolNodeKey {
	for _, msg := range messages {
		for _, inst := range msg.Instances {
			if !inst.IsReadOnly {
				return &pgpoolNodeKey{address: inst.IpAddress.String(), port: inst.Port}
			}
		}
	}
	return nil
}

// writeConfigFile writes conf to path as "key = value" lines. Strings are
// single-quoted and booleans become on/off. Any failure or an unsupported
// value type terminates the process.
func writeConfigFile(path string, conf map[string]interface{}) {
	configFile, err := os.Create(path)
	if err != nil {
		log.WithError(err).Fatal("Cannot open the pgpool config file.")
	}
	defer configFile.Close()

	for key, raw := range conf {
		var text string
		switch value := raw.(type) {
		case int:
			text = strconv.Itoa(value)
		case float32:
			text = strconv.FormatFloat(float64(value), 'f', 5, 32)
		case string:
			text = "'" + value + "'"
		case bool:
			if value {
				text = "on"
			} else {
				text = "off"
			}
		default:
			log.Fatal("Unsupported config type")
		}
		if _, err := configFile.WriteString(key + " = " + text + "\n"); err != nil {
			log.WithError(err).Fatal("Cannot write to the the pgpool config file.")
		}
	}
	if err := configFile.Sync(); err != nil {
		log.WithError(err).Warn("Failed to sync the pgpool config file.")
	}
}

// applyNodeChanges pushes the computed node transitions to the running pgpool
// using the pcp tools: detach removed nodes, reload the config, promote the
// new primary, re-attach the previous one and attach the remaining new nodes.
func applyNodeChanges(state *global, changedDown map[pgpoolNodeKey]int, changedUp map[pgpoolNodeKey]int, prevPrimary *pgpoolNodeKey, newPrimary *pgpoolNodeKey) {
	pcpAuth, err := createPcpAuth(state.adminLogin, state.adminPass, "localhost")
	if err != nil {
		log.WithError(err).Fatal("Cannot create PCP authentication file.")
	}
	defer releasePcpAuth(pcpAuth)

	// Detach the nodes
	for _, num := range changedDown {
		log.WithFields(log.Fields{"num": num}).Info("Detaching node")
		detachNode(num, state.adminLogin, pcpAuth)
	}

	// Reload config
	reloadCmd := exec.Command("pcp_reload_config", "-h", "localhost", "-U", state.adminLogin, "-w")
	reloadCmd.Env = []string{"PCPPASSFILE=" + pcpAuth}
	if err := reloadCmd.Run(); err != nil {
		log.WithError(err).Warn("Failed to force config reload.")
	}

	// Change the primary first as it also brings down the previous primary (if any)
	if newPrimary != nil && (prevPrimary == nil || *newPrimary != *prevPrimary) {
		newNode := state.nodeHistory[*newPrimary]
		if _, exists := changedUp[*newPrimary]; exists {
			log.WithFields(log.Fields{"num": newNode.num}).Info("Attaching node")
			attachNode(newNode.num, state.adminLogin, pcpAuth)
		}
		log.WithFields(log.Fields{"num": newNode.num}).Info("Promoting to primary")
		promoteNode(newNode.num, state.adminLogin, pcpAuth)
	}
	if prevPrimary != nil {
		prevNode := state.nodeHistory[*prevPrimary]
		if prevNode.isInUse {
			// There is no demote so re-attach it as secondary
			attachNode(prevNode.num, state.adminLogin, pcpAuth)
		}
	}

	// Attach the rest of the nodes
	for key, num := range changedUp {
		if newPrimary == nil || *newPrimary != key {
			log.WithFields(log.Fields{"num": num}).Info("Attaching node")
			attachNode(num, state.adminLogin, pcpAuth)
		}
	}
}

// getEnvOrDefault returns the environment variable name, or defaultValue when
// it is unset or empty.
func getEnvOrDefault(name string, defaultValue string) string {
	if val := os.Getenv(name); val != "" {
		return val
	}
	return defaultValue
}

// getEnvInt parses the environment variable name as an integer. It returns an
// error when the variable is unset, empty or not a number.
func getEnvInt(name string) (int, error) {
	val := os.Getenv(name)
	if val == "" {
		return -1, fmt.Errorf("no value")
	}
	parsed, err := strconv.Atoi(val)
	if err != nil {
		return 0, err
	}
	return parsed, nil
}

// getEnvOrDefaultInt returns the integer value of the environment variable
// name, or defaultValue when it is unset, empty or not a number.
func getEnvOrDefaultInt(name string, defaultValue int) int {
	if parsed, err := getEnvInt(name); err == nil {
		return parsed
	}
	return defaultValue
}

// getEnvOrDefaultBool returns defaultValue when the environment variable name
// is unset or empty; otherwise true for "1", "yes" or "true" (any case) and
// false for everything else.
func getEnvOrDefaultBool(name string, defaultValue bool) bool {
	valS := os.Getenv(name)
	if valS == "" {
		return defaultValue
	}
	switch strings.ToLower(valS) {
	case "1", "yes", "true":
		return true
	default:
		return false
	}
}

// checkKnownValue returns value when its lower-case form is one of allowed
// and terminates the process otherwise. context names the offending setting
// in the log entry.
func checkKnownValue(value string, allowed []string, context string) string {
	valueLower := strings.ToLower(value)
	for _, candidate := range allowed {
		if candidate == valueLower {
			return value
		}
	}
	log.WithFields(log.Fields{context: value, "allowed": allowed}).Fatal("The value must be a well-known string")
	// Unreachable
	return ""
}

// parseForcedNodeConfigs parses a ','/';' separated list of node entries (see
// parseForcedNodeConfig) into a map keyed by host and port. The first invalid
// entry aborts parsing with an error.
func parseForcedNodeConfigs(nodes string) (map[pgpoolForcedKey]pgpoolForcedConfig, error) {
	result := make(map[pgpoolForcedKey]pgpoolForcedConfig)
	for _, entry := range strings.FieldsFunc(nodes, NodesSplit) {
		val, err := parseForcedNodeConfig(entry)
		if err != nil {
			return nil, err
		}
		result[pgpoolForcedKey{hostName: val.host, port: val.port}] = val
	}
	return result, nil
}

// NodesSplit reports whether r separates entries of a node list.
func NodesSplit(r rune) bool {
	return r == ',' || r == ';'
}

// parseForcedNodeConfig parses one "num:host[:port[:weight[:dir[:flag]]]]"
// entry. Missing or unparsable optional fields fall back to: num -1,
// port 5432, weight -1 (meaning "not specified"), dir from PGPOOL_DATA_DIR
// (default "/data") and flag "ALLOW_TO_FAILOVER".
func parseForcedNodeConfig(value string) (pgpoolForcedConfig, error) {
	fields := strings.Split(value, ":")
	fieldsLen := len(fields)
	if fieldsLen < 2 {
		return pgpoolForcedConfig{}, fmt.Errorf("node config must have at least two parts separated by ':'")
	}

	result := pgpoolForcedConfig{
		num:    -1, // the number is not important, just skip it
		host:   fields[1],
		port:   5432,
		weight: -1,
		flag:   "ALLOW_TO_FAILOVER",
	}
	if num, err := strconv.ParseInt(fields[0], 10, 32); err == nil {
		result.num = int(num)
	}
	if fieldsLen >= 3 {
		if port, err := strconv.ParseInt(fields[2], 10, 32); err == nil {
			result.port = int(port)
		}
	}
	if fieldsLen >= 4 {
		if weight, err := strconv.ParseFloat(fields[3], 32); err == nil {
			result.weight = float32(weight)
		}
	}
	if fieldsLen >= 5 {
		result.dir = fields[4]
	} else {
		result.dir = getEnvOrDefault("PGPOOL_DATA_DIR", "/data")
	}
	if fieldsLen >= 6 {
		result.flag = fields[5]
	}
	return result, nil
}

// createPcpAuth writes a temporary PCP password file holding
// "host:9898:login:password" and returns its path. The caller removes it with
// releasePcpAuth. On a write failure the temporary file is removed.
func createPcpAuth(adminLogin string, adminPass string, host string) (string, error) {
	file, err := os.CreateTemp(os.TempDir(), "pcppass*")
	if err != nil {
		return "", err
	}
	defer file.Close()

	result := file.Name()
	if _, err := file.WriteString(host + ":9898:" + adminLogin + ":" + adminPass); err != nil {
		// Do not leave a half-written credentials file behind.
		_ = os.Remove(result)
		return "", err
	}
	if err := os.Chmod(result, 0600); err != nil {
		log.Warn("Failed to set permissions for the PCP password file.")
	}
	return result, nil
}

// releasePcpAuth deletes the password file created by createPcpAuth.
func releasePcpAuth(pcpAuth string) {
	if err := os.Remove(pcpAuth); err != nil {
		log.Warn("Failed to delete the PCP authentication file.")
	}
}

// attachNode runs pcp_attach_node for the backend number num.
func attachNode(num int, adminLogin string, pcpAuth string) {
	pcpNodeCall("pcp_attach_node", num, adminLogin, pcpAuth)
}

// detachNode runs pcp_detach_node for the backend number num.
func detachNode(num int, adminLogin string, pcpAuth string) {
	pcpNodeCall("pcp_detach_node", num, adminLogin, pcpAuth)
}

// promoteNode runs pcp_promote_node for the backend number num.
func promoteNode(num int, adminLogin string, pcpAuth string) {
	pcpNodeCall("pcp_promote_node", num, adminLogin, pcpAuth)
}

// pcpNodeCall runs the given pcp executable against localhost for node num,
// authenticating through the password file pcpAuth. Failures are logged as
// warnings only.
func pcpNodeCall(executable string, num int, adminLogin string, pcpAuth string) {
	cmd := exec.Command(executable, "-h", "localhost", "-U", adminLogin, "-n", strconv.Itoa(num), "-w")
	// The child gets PCPPASSFILE as its only environment variable.
	cmd.Env = []string{"PCPPASSFILE=" + pcpAuth}
	if err := cmd.Run(); err != nil {
		log.WithError(err).WithFields(log.Fields{"num": num}).Warn(executable + " failed.")
	}
}