package main import ( "encoding/json" "fmt" "net" "net/http" "os" "os/exec" "strconv" "strings" "sync" log "github.com/sirupsen/logrus" ) const version = "1.0" const DefaultSocketPath = "/var/run/pg_autoconfig.sock" const WorkDir = "/var/run/pgpool" const PgpoolConfigPath = WorkDir + "/pgpool.conf" type pgpoolConfigMessage struct { Instances []pgpoolInstance } type pgpoolInstance struct { IpAddress net.IP Port int HostName string Weight float32 IsReadOnly bool } type pgpoolPingMessage struct { NeedsConfig bool } type pgpoolForcedKey struct { hostName string port int } type pgpoolForcedConfig struct { num int host string port int weight int dir string flag string } type pgpoolNodeKey struct { address string // net.IP port int } type pgpoolNodeInfo struct { num int } func main() { // Logging var logLevelS string if logLevelS = os.Getenv("AUTOPOOL_LOG_LEVEL"); logLevelS == "" { logLevelS = "info" } logLevel, err := log.ParseLevel(logLevelS) if err != nil { log.WithError(err).Fatal("Failed to parse log level") } log.SetLevel(logLevel) log.Info("Starting pg_auto_failover load balancer using pgpool, version " + version) nextIsSocket := false socketPath := DefaultSocketPath for _, j := range os.Args[1:] { if nextIsSocket { nextIsSocket = false socketPath = j } else { switch j { case "--socket": nextIsSocket = true default: log.WithFields(log.Fields{"value": j}).Fatal("Unknown command switch") } } } // Set permissions as we usually run as non-root if err := os.Chown(WorkDir, os.Getuid(), os.Getgid()); err != nil { log.WithFields(log.Fields{"dir": WorkDir, "uid": os.Getuid(), "gid": os.Getgid()}).Warn("Failed to set owner of the work directory to the current user.") } if err := os.Chmod(WorkDir, 700); err != nil { log.WithFields(log.Fields{"dir": WorkDir, "uid": os.Getuid(), "gid": os.Getgid()}).Warn("Failed to chmod the working directory.") } // Run the configuration change listener // Note: we must remember all IP/port pairs that appeared during // the runtime of the pgpool instance so we can properly report // former nodes are down. var intialConfigWait sync.WaitGroup intialConfigWait.Add(1) initialConfigDone := false nodes := make(map[pgpoolNodeKey]pgpoolNodeInfo) handler := http.NewServeMux() handler.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) { log.Trace("Received a ping") var msg pgpoolPingMessage msg.NeedsConfig = !initialConfigDone if msgS, err := json.Marshal(msg); err != nil { log.WithFields(log.Fields{"msg": msg}).Warn("Failed to serializeping message") } else { w.Header()["Content-Type"] = []string{"application/json"} w.Header()["Content-Length"] = []string{strconv.Itoa(len(msgS))} w.WriteHeader(http.StatusOK) w.Write(msgS) } }) handler.HandleFunc("/config", func(w http.ResponseWriter, r *http.Request) { var msg pgpoolConfigMessage if err = json.NewDecoder(r.Body).Decode(&msg); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) } log.Info("Received a new configuration") configure(msg, nodes, !initialConfigDone) if !initialConfigDone { initialConfigDone = true intialConfigWait.Done() } w.WriteHeader(http.StatusOK) w.Write([]byte("OK")) }) server := http.Server{ Handler: handler, } go func() { os.Remove(socketPath) if listener, err := net.Listen("unix", socketPath); err != nil { log.WithError(err).Fatal("Failed to start config change listener") } else { server.Serve(listener) } }() // Start the inner executable (usually starting with "pg_autoctl create postgres" or "pg_autoctl create monitor") log.Info("Waiting for the initial configuration to arrive") intialConfigWait.Wait() log.Info("Handling over to the inner pgpool process") cmd := exec.Command("pgpool", "-n", "--config-file", PgpoolConfigPath) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr //cmd.Env = os.Environ() cmd.Run() log.Info("pg_auto_failover load balancer has completed.") server.Close() } func configure(msg pgpoolConfigMessage, nodesHistory map[pgpoolNodeKey]pgpoolNodeInfo, isInitial bool) { conf := make(map[string]string) // Note: We are mostly compatible with the bitnami pgpool docker regarding env vars // Connection settings // http://www.pgpool.net/docs/latest/en/html/configuring-pgpool.html conf["backend_clustering_mode"] = "'streaming_replication'" conf["listen_addresses"] = "*" pgpoolPort := getEnvOrDefault("PGPOOL_PORT_NUMBER", "5432") conf["port"] = pgpoolPort tmpDir := getEnvOrDefault("PGPOOL_TMP_DIR", "/tmp") conf["unix_socket_directories"] = tmpDir conf["pcp_socket_dir"] = tmpDir // Connection Pooling settings // http://www.pgpool.net/docs/latest/en/html/runtime-config-connection-pooling.html if val := os.Getenv("PGPOOL_NUM_INIT_CHILDREN"); val != "" { conf["num_init_children"] = val } if val := os.Getenv("PGPOOL_RESERVED_CONNECTIONS"); val != "" { conf["reserved_connections"] = val } conf["max_pool"] = getEnvOrDefault("PGPOOL_MAX_POOL", "15") if val := os.Getenv("PGPOOL_CHILD_MAX_CONNECTIONS"); val != "" { conf["child_max_connections"] = val } if val := os.Getenv("PGPOOL_CHILD_LIFE_TIME"); val != "" { conf["child_life_time"] = val } if val := os.Getenv("PGPOOL_CONNECTION_LIFE_TIME"); val != "" { conf["connection_life_time"] = val } if val := os.Getenv("PGPOOL_CLIENT_IDLE_LIMIT"); val != "" { conf["client_idle_limit"] = val } // Logging settings // https://www.pgpool.net/docs/latest/en/html/runtime-config-logging.html conf["log_connections"] = getEnvOrDefaultBool("PGPOOL_ENABLE_LOG_CONNECTIONS", false) conf["log_hostname"] = getEnvOrDefaultBool("PGPOOL_ENABLE_LOG_HOSTNAME", false) conf["log_per_node_statement"] = getEnvOrDefaultBool("PGPOOL_ENABLE_LOG_PER_NODE_STATEMENT", false) if val := os.Getenv("PGPOOL_LOG_LINE_PREFIX"); val != "" { conf["log_line_prefix"] = val } if val := os.Getenv("PGPOOL_CLIENT_MIN_MESSAGES"); val != "" { conf["client_min_messages"] = val } // Authentication settings const DefaultEnablePoolHba = true const DefaultEnablePoolPasswd = true var poolPasswd string if getEnvOrDefaultBoolRaw("PGPOOL_ENABLE_POOL_PASSWD", DefaultEnablePoolPasswd) { poolPasswd = getEnvOrDefault("PGPOOL_PASSWD_FILE", "pool_passwd") } else { // Specifying '' (empty) disables the use of password file. // https://www.pgpool.net/docs/latest/en/html/runtime-config-connection.html#GUC-POOL-PASSWD poolPasswd = "" } // http://www.pgpool.net/docs/latest/en/html/runtime-config-connection.html#RUNTIME-CONFIG-AUTHENTICATION-SETTINGS conf["enable_pool_hba"] = getEnvOrDefaultBool("PGPOOL_ENABLE_POOL_HBA", DefaultEnablePoolHba) // allow_clear_text_frontend_auth only works when enable_pool_hba is not enabled // https://www.pgpool.net/docs/latest/en/html/runtime-config-connection.html#GUC-ALLOW-CLEAR-TEXT-FRONTEND-AUTH // Note: the yes/no values are negatives off/on conf["allow_clear_text_frontend_auth"] = getEnvOrDefaultBoolNamed("PGPOOL_ENABLE_POOL_HBA", DefaultEnablePoolHba, "off", "on") conf["pool_passwd"] = getEnvOrDefault("PGPOOL_MAX_POOL", "15") conf["pool_passwd"] = poolPasswd conf["authentication_timeout"] = "30" // File Locations settings conf["pid_file_name"] = getEnvOrDefault("PGPOOL_PID_FILE", tmpDir+"/pgpool.pid") conf["logdir"] = getEnvOrDefault("PGPOOL_LOG_DIR", "/var/log") // Load Balancing settings // https://www.pgpool.net/docs/latest/en/html/runtime-config-load-balancing.html conf["load_balance_mode"] = getEnvOrDefaultBool("PGPOOL_ENABLE_LOAD_BALANCING", true) conf["black_function_list"] = "nextval,setval" conf["statement_level_load_balance"] = getEnvOrDefaultBool("PGPOOL_ENABLE_STATEMENT_LOAD_BALANCING", false) // Streaming Replication Check settings // https://www.pgpool.net/docs/latest/en/html/runtime-streaming-replication-check.html conf["sr_check_user"] = getEnvOrDefault("PGPOOL_SR_CHECK_USER", "") conf["sr_check_password"] = getEnvOrDefault("PGPOOL_SR_CHECK_PASSWORD", "") conf["sr_check_period"] = getEnvOrDefault("PGPOOL_SR_CHECK_PERIOD", "30") conf["sr_check_database"] = getEnvOrDefault("PGPOOL_SR_CHECK_DATABASE", "postgres") // Failover settings conf["failover_command"] = "echo \">>> Failover - that will initialize new primary node search!\"" conf["failover_on_backend_error"] = "off" // Keeps searching for a primary node forever when a failover occurs conf["search_primary_node_timeout"] = "0" conf["disable_load_balance_on_write"] = checkKnownValue(getEnvOrDefault("PGPOOL_DISABLE_LOAD_BALANCE_ON_WRITE", "transaction"), []string{"off", "transaction", "trans_transaction", "always"}, "PGPOOL_DISABLE_LOAD_BALANCE_ON_WRITE") // SSL settings // https://www.pgpool.net/docs/latest/en/html/runtime-ssl.html if getEnvOrDefaultBoolRaw("PGPOOL_ENABLE_TLS", false) { tlsKeyFile := getEnvOrDefault("PGPOOL_TLS_KEY_FILE", "") if keyStat, err := os.Stat(tlsKeyFile); err != nil { log.WithFields(log.Fields{"path": tlsKeyFile}).Fatal("TLS key file not found or access is denied") } else if keyStat.Mode() != 0600 { if err := os.Chmod(tlsKeyFile, 0600); err != nil { log.WithError(err).WithFields(log.Fields{"path": tlsKeyFile}).Fatal("TLS key file permission cannot be set to compulsory 0600") } } conf["ssl"] = "on" // Server ciphers are preferred by default if !getEnvOrDefaultBoolRaw("PGPOOL_TLS_PREFER_SERVER_CIPHERS", true) { conf["ssl_prefer_server_ciphers"] = "off" } if val := os.Getenv("PGPOOL_TLS_CA_FILE"); val != "" { conf["ssl_ca_cert"] = val } conf["ssl_cert"] = getEnvOrDefault("PGPOOL_TLS_CERT_FILE", "") conf["ssl_key"] = getEnvOrDefault("PGPOOL_TLS_KEY_FILE", "") } // // Backend settings // // Get forced configuration (currentyl only weights) forcedNodes, forcedNodesErr := parseForcedNodeConfigs(os.Getenv("PGPOOL_BACKEND_NODES")) if forcedNodesErr != nil { log.WithError(forcedNodesErr).Warn("Failed to parse forced node configuration. Ignoring.") } // Merge node info by re-using address/port pairs as those cannot // be modified without pgpool restart // Note: However there is no need to print the nodes that are down for _, i := range msg.Instances { nodeKey := pgpoolNodeKey{address: i.IpAddress.String(), port: i.Port} var num int if histNode, exists := nodesHistory[nodeKey]; exists { num = histNode.num } else { num = len(nodesHistory) } nodesHistory[nodeKey] = pgpoolNodeInfo{num: num} var weight int if forced, exists := forcedNodes[pgpoolForcedKey{hostName: i.HostName, port: i.Port}]; exists { weight = forced.weight } else { weight = -1 } var flag string if !i.IsReadOnly { flag = "ALWAYS_PRIMARY|DISALLOW_TO_FAILOVER" } else { flag = "DISALLOW_TO_FAILOVER" } suffix := strconv.Itoa(num) conf["backend_hostname"+suffix] = i.IpAddress.String() conf["backend_port"+suffix] = strconv.Itoa(i.Port) conf["backend_weight"+suffix] = strconv.Itoa(weight) conf["backend_flag"+suffix] = flag } /* read -r -a nodes <<<"$(tr ',;' ' ' <<<"${PGPOOL_BACKEND_NODES}")" if is_boolean_yes "$PGPOOL_AUTO_FAILBACK"; then pgpool_set_property "auto_failback" "on" read -r -a app_name <<<"$(tr ',;' ' ' <<<"${PGPOOL_BACKEND_APPLICATION_NAMES}")" fi for node in "${nodes[@]}"; do pgpool_create_backend_config "$node" "$(is_boolean_yes "$PGPOOL_AUTO_FAILBACK" && echo "${app_name[i]}")" ((i += 1)) done */ /* if [[ -f "$PGPOOL_USER_CONF_FILE" ]]; then info "Custom configuration '$PGPOOL_USER_CONF_FILE' detected!. Adding it to the configuration file." cat "$PGPOOL_USER_CONF_FILE" >>"$PGPOOL_CONF_FILE" fi if [[ -f "$PGPOOL_USER_HBA_FILE" ]]; then info "Custom configuration '$PGPOOL_USER_HBA_FILE' detected!. Overwriting the generated hba file." cat "$PGPOOL_USER_HBA_FILE" >"$PGPOOL_PGHBA_FILE" fi*/ // // Write the config file // log.WithFields(log.Fields{"conf": conf}).Trace("New configuration") if configFile, err := os.Create(PgpoolConfigPath); err != nil { log.WithError(err).Fatal("Cannot open the pgpool config file.") } else { defer configFile.Close() for i, j := range conf { if _, err := configFile.WriteString(i + " = " + j + "\n"); err != nil { log.WithError(err).Fatal("Cannot write to the the pgpool config file.") } } configFile.Sync() } if !isInitial { // Reload config reloadCmd := exec.Command("pcp_reload_config", "-h", tmpDir, "-p", pgpoolPort, "--no-password") if err := reloadCmd.Run(); err != nil { log.WithError(err).Warn("Failed to force config reload.") } // Attach nodes for _, j := range msg.Instances { if info, exists := nodesHistory[pgpoolNodeKey{address: j.IpAddress.String(), port: j.Port}]; !exists { log.WithFields(log.Fields{"address": j.IpAddress, "Port": j.Port}).Warn("Failed to resolve node number. Skipping attach.") } else { attachCmd := exec.Command("pcp_attach_node", "-h", tmpDir, "-p", pgpoolPort, "--no-password", strconv.Itoa(info.num)) if err := attachCmd.Run(); err != nil { log.WithError(err).WithFields(log.Fields{"address": j.IpAddress, "Port": j.Port}).Warn("Node attach failed.") } } } } } func getEnvOrDefault(name string, defaultValue string) string { if val := os.Getenv(name); val != "" { return val } else { return defaultValue } } func getEnvOrDefaultBool(name string, defaultValue bool) string { return getEnvOrDefaultBoolNamed(name, defaultValue, "on", "off") } func getEnvOrDefaultBoolNamed(name string, defaultValue bool, yes string, no string) string { if getEnvOrDefaultBoolRaw(name, defaultValue) { return yes } else { return no } } func getEnvOrDefaultBoolRaw(name string, defaultValue bool) bool { if valS := os.Getenv(name); name != "" { valS = strings.ToLower(valS) if valS == "1" || valS == "yes" || valS == "true" { return true } else { return false } } else { return defaultValue } } func checkKnownValue(value string, allowed []string, context string) string { valueLower := strings.ToLower(value) for _, i := range allowed { if i == valueLower { return value } } log.WithFields(log.Fields{context: value, "allowed": allowed}).Fatal("The value must be a well-known string") // Unreachable return "" } func parseForcedNodeConfigs(nodes string) (map[pgpoolForcedKey]pgpoolForcedConfig, error) { result := make(map[pgpoolForcedKey]pgpoolForcedConfig) for _, i := range strings.FieldsFunc(nodes, NodesSplit) { if val, err := parseForcedNodeConfig(i); err != nil { return nil, err } else { result[pgpoolForcedKey{hostName: val.host, port: val.port}] = val } } return result, nil } func NodesSplit(r rune) bool { return r == ',' || r == ';' } func parseForcedNodeConfig(value string) (pgpoolForcedConfig, error) { fields := strings.Split(value, ":") fieldsLen := len(fields) if fieldsLen < 2 { return pgpoolForcedConfig{}, fmt.Errorf("node config must have at least two parts separated by ':'.") } var result pgpoolForcedConfig if val, err := strconv.ParseInt(fields[0], 10, 32); err != nil { result.num = -1 // the number is not important, just skip it } else { result.num = int(val) } result.host = fields[1] if fieldsLen >= 3 { if val, err := strconv.ParseInt(fields[2], 10, 32); err == nil { result.port = int(val) } else { result.port = 5432 } } else { result.port = 5432 } if fieldsLen >= 4 { if val, err := strconv.ParseInt(fields[3], 10, 32); err == nil { result.weight = int(val) } else { result.weight = -1 } } else { result.weight = -1 } if fieldsLen >= 5 { result.dir = fields[4] } else { result.dir = getEnvOrDefault("PGPOOL_DATA_DIR", "/data") } if fieldsLen >= 6 { result.flag = fields[5] } else { result.flag = "ALLOW_TO_FAILOVER" } return result, nil }