From 08685bda470787ad08f23ac8035d8fff1dac0830 Mon Sep 17 00:00:00 2001 From: Roman Vanicek Date: Wed, 3 Apr 2024 00:37:40 +0200 Subject: [PATCH] Initial commit --- .drone.yml | 16 ++ .gitignore | 1 + Dockerfile | 22 +++ go.mod | 7 + go.sum | 16 ++ main.go | 471 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 533 insertions(+) create mode 100644 .drone.yml create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..ca480fd --- /dev/null +++ b/.drone.yml @@ -0,0 +1,16 @@ +kind: pipeline +name: default + +steps: +- name: docker + image: plugins/docker + settings: + registry: https://git.ivasoft.cz + username: + from_secret: repo_user + password: + from_secret: repo_pass + repo: git.ivasoft.cz/sw/docker-pgpool + tags: + - latest + - ${DRONE_TAG:-latest} \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d89834c --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +pg_autopool \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..c7f5a9a --- /dev/null +++ b/Dockerfile @@ -0,0 +1,22 @@ +FROM golang:1.21-bullseye AS builder + +WORKDIR /usr/local/src/pg_autopool +COPY go.* ./ +COPY *.go ./ +RUN go mod download + +RUN mkdir bin/ && go build -o bin/ ./... + +FROM alpine + +#RUN ca-certificates curl ldap-utils libaudit1 libbsd0 libcap-ng0 libcom-err2 libcrypt1 libedit2 libffi8 libgcc-s1 libgmp10 libgnutls30 libgssapi-krb5-2 libhogweed6 libicu72 libidn2-0 libk5crypto3 libkeyutils1 libkrb5-3 libkrb5support0 libldap-2.5-0 libldap-common liblzma5 libmd0 libnettle8 libnss-ldapd libp11-kit0 libpam-ldapd libpam0g libpq5 libsasl2-2 libssl3 libstdc++6 libtasn1-6 libtinfo6 libunistring2 libuuid1 libxml2 libxslt1.1 nslcd procps zlib1g +RUN apk add --no-cache pgpool gettext postgresql-client && \ + mkdir /var/run/pgpool/ + +COPY --from=builder /usr/local/src/pg_autopool/bin/pg_autopool / +#COPY ./conf /etc/pgpool + +EXPOSE 5432 + +ENTRYPOINT ["/bin/sh", "/pg_autopool"] +CMD ["pgpool", "-n"] \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..949f159 --- /dev/null +++ b/go.mod @@ -0,0 +1,7 @@ +module git.ivasoft.cz/sw/pg_autopool + +go 1.21 + +require github.com/sirupsen/logrus v1.9.0 + +require golang.org/x/sys v0.15.0 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..ddaaa62 --- /dev/null +++ b/go.sum @@ -0,0 +1,16 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= +github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..42ef101 --- /dev/null +++ b/main.go @@ -0,0 +1,471 @@ +package main + +import ( + "encoding/json" + "flag" + "fmt" + "net" + "net/http" + "os" + "os/exec" + "strconv" + "strings" + + log "github.com/sirupsen/logrus" +) + +const version = "1.0" +const DefaultSocketPath = "/var/run/pg_autoconfig.sock" +const PgpoolConfigPath = "/etc/pgpool/pgpool.conf" + +type pgpoolConfigMessage struct { + Instances []pgpoolInstance +} + +type pgpoolInstance struct { + IpAddress net.IP + Port int + HostName string + Weight float32 + IsReadOnly bool +} + +type pgpoolForcedKey struct { + hostName string + port int +} + +type pgpoolForcedConfig struct { + num int + host string + port int + weight int + dir string + flag string +} + +type pgpoolNodeKey struct { + address string // net.IP + port int +} + +type pgpoolNodeInfo struct { + num int +} + +func main() { + flag.Parse() + + // Logging + var logLevelS string + if logLevelS = os.Getenv("AUTOPOOL_LOG_LEVEL"); logLevelS == "" { + logLevelS = "info" + } + + logLevel, err := log.ParseLevel(logLevelS) + if err != nil { + log.WithError(err).Fatal("Failed to parse log level") + } + log.SetLevel(logLevel) + + log.Info("Starting pg_auto_failover load balancer using pgpool, version " + version) + + nextIsSocket := false + socketPath := DefaultSocketPath + var argsLeft []string + for i, j := range os.Args { + if nextIsSocket { + nextIsSocket = false + socketPath = j + } else if j == "--" { + argsLeft = os.Args[i+1:] + } else { + switch j { + case "--socket": + nextIsSocket = true + default: + log.WithFields(log.Fields{"value": j}).Fatal("Unknown command switch") + } + } + } + + if len(argsLeft) == 0 { + log.Info("No inner command found to execute. Will execute pgpool -n") + argsLeft = []string{"pgpool", "-n"} + } + + // Run the configuration change listener + // Note: we must remember all IP/port pairs that appeared during + // the runtime of the pgpool instance so we can properly report + // former nodes are down. + nodes := make(map[pgpoolNodeKey]pgpoolNodeInfo) + handler := http.NewServeMux() + handler.HandleFunc("/config", func(w http.ResponseWriter, r *http.Request) { + var msg pgpoolConfigMessage + if err = json.NewDecoder(r.Body).Decode(&msg); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + } + + configure(msg, nodes) + + w.WriteHeader(http.StatusOK) + w.Write([]byte("OK")) + }) + server := http.Server{ + Handler: handler, + } + go func() { + if listener, err := net.Listen("unix", socketPath); err != nil { + log.WithError(err).Fatal("Failed to start config change listener") + } else { + server.Serve(listener) + } + }() + + // Start the inner executable (usually starting with "pg_autoctl create postgres" or "pg_autoctl create monitor") + log.WithFields(log.Fields{"name": argsLeft[0], "args": argsLeft[1:]}).Info("Handling over to inner process.") + cmd := exec.Command(argsLeft[0], argsLeft[1:]...) + //cmd.Env = os.Environ() + cmd.Run() + + log.Info("pg_auto_failover load balancer has completed.") + server.Close() +} + +func configure(msg pgpoolConfigMessage, nodesHistory map[pgpoolNodeKey]pgpoolNodeInfo) { + conf := make(map[string]string) + + // Note: We are mostly compatible with the bitnami pgpool docker regarding env vars + + // Connection settings + // http://www.pgpool.net/docs/latest/en/html/configuring-pgpool.html + conf["backend_clustering_mode"] = "'streaming_replication'" + conf["listen_addresses"] = "*" + pgpoolPort := getEnvOrDefault("PGPOOL_PORT_NUMBER", "5432") + conf["port"] = pgpoolPort + tmpDir := getEnvOrDefault("PGPOOL_TMP_DIR", "/tmp") + conf["unix_socket_directories"] = tmpDir + conf["pcp_socket_dir"] = tmpDir + + // Connection Pooling settings + // http://www.pgpool.net/docs/latest/en/html/runtime-config-connection-pooling.html + if val := os.Getenv("PGPOOL_NUM_INIT_CHILDREN"); val != "" { + conf["num_init_children"] = val + } + if val := os.Getenv("PGPOOL_RESERVED_CONNECTIONS"); val != "" { + conf["reserved_connections"] = val + } + conf["max_pool"] = getEnvOrDefault("PGPOOL_MAX_POOL", "15") + if val := os.Getenv("PGPOOL_CHILD_MAX_CONNECTIONS"); val != "" { + conf["child_max_connections"] = val + } + if val := os.Getenv("PGPOOL_CHILD_LIFE_TIME"); val != "" { + conf["child_life_time"] = val + } + if val := os.Getenv("PGPOOL_CONNECTION_LIFE_TIME"); val != "" { + conf["connection_life_time"] = val + } + if val := os.Getenv("PGPOOL_CLIENT_IDLE_LIMIT"); val != "" { + conf["client_idle_limit"] = val + } + + // Logging settings + // https://www.pgpool.net/docs/latest/en/html/runtime-config-logging.html + conf["log_connections"] = getEnvOrDefaultBool("PGPOOL_ENABLE_LOG_CONNECTIONS", false) + conf["log_hostname"] = getEnvOrDefaultBool("PGPOOL_ENABLE_LOG_HOSTNAME", false) + conf["log_per_node_statement"] = getEnvOrDefaultBool("PGPOOL_ENABLE_LOG_PER_NODE_STATEMENT", false) + if val := os.Getenv("PGPOOL_LOG_LINE_PREFIX"); val != "" { + conf["log_line_prefix"] = val + } + if val := os.Getenv("PGPOOL_CLIENT_MIN_MESSAGES"); val != "" { + conf["client_min_messages"] = val + } + + // Authentication settings + const DefaultEnablePoolHba = true + const DefaultEnablePoolPasswd = true + var poolPasswd string + if getEnvOrDefaultBoolRaw("PGPOOL_ENABLE_POOL_PASSWD", DefaultEnablePoolPasswd) { + poolPasswd = getEnvOrDefault("PGPOOL_PASSWD_FILE", "pool_passwd") + } else { + // Specifying '' (empty) disables the use of password file. + // https://www.pgpool.net/docs/latest/en/html/runtime-config-connection.html#GUC-POOL-PASSWD + poolPasswd = "" + } + + // http://www.pgpool.net/docs/latest/en/html/runtime-config-connection.html#RUNTIME-CONFIG-AUTHENTICATION-SETTINGS + conf["enable_pool_hba"] = getEnvOrDefaultBool("PGPOOL_ENABLE_POOL_HBA", DefaultEnablePoolHba) + // allow_clear_text_frontend_auth only works when enable_pool_hba is not enabled + // https://www.pgpool.net/docs/latest/en/html/runtime-config-connection.html#GUC-ALLOW-CLEAR-TEXT-FRONTEND-AUTH + // Note: the yes/no values are negatives off/on + conf["allow_clear_text_frontend_auth"] = getEnvOrDefaultBoolNamed("PGPOOL_ENABLE_POOL_HBA", DefaultEnablePoolHba, "off", "on") + conf["pool_passwd"] = getEnvOrDefault("PGPOOL_MAX_POOL", "15") + conf["pool_passwd"] = poolPasswd + conf["authentication_timeout"] = "30" + + // File Locations settings + conf["pid_file_name"] = getEnvOrDefault("PGPOOL_PID_FILE", tmpDir+"/pgpool.pid") + conf["logdir"] = getEnvOrDefault("PGPOOL_LOG_DIR", "/var/log") + + // Load Balancing settings + // https://www.pgpool.net/docs/latest/en/html/runtime-config-load-balancing.html + conf["load_balance_mode"] = getEnvOrDefaultBool("PGPOOL_ENABLE_LOAD_BALANCING", true) + conf["black_function_list"] = "nextval,setval" + conf["statement_level_load_balance"] = getEnvOrDefaultBool("PGPOOL_ENABLE_STATEMENT_LOAD_BALANCING", false) + + // Streaming Replication Check settings + // https://www.pgpool.net/docs/latest/en/html/runtime-streaming-replication-check.html + conf["sr_check_user"] = getEnvOrDefault("PGPOOL_SR_CHECK_USER", "") + conf["sr_check_password"] = getEnvOrDefault("PGPOOL_SR_CHECK_PASSWORD", "") + conf["sr_check_period"] = getEnvOrDefault("PGPOOL_SR_CHECK_PERIOD", "30") + conf["sr_check_database"] = getEnvOrDefault("PGPOOL_SR_CHECK_DATABASE", "postgres") + + // Failover settings + conf["failover_command"] = "echo \">>> Failover - that will initialize new primary node search!\"" + conf["failover_on_backend_error"] = "off" + // Keeps searching for a primary node forever when a failover occurs + conf["search_primary_node_timeout"] = "0" + conf["disable_load_balance_on_write"] = checkKnownValue(getEnvOrDefault("PGPOOL_DISABLE_LOAD_BALANCE_ON_WRITE", "transaction"), + []string{"off", "transaction", "trans_transaction", "always"}, "PGPOOL_DISABLE_LOAD_BALANCE_ON_WRITE") + + // SSL settings + // https://www.pgpool.net/docs/latest/en/html/runtime-ssl.html + if getEnvOrDefaultBoolRaw("PGPOOL_ENABLE_TLS", false) { + tlsKeyFile := getEnvOrDefault("PGPOOL_TLS_KEY_FILE", "") + if keyStat, err := os.Stat(tlsKeyFile); err != nil { + log.WithFields(log.Fields{"path": tlsKeyFile}).Fatal("TLS key file not found or access is denied") + } else if keyStat.Mode() != 0600 { + if err := os.Chmod(tlsKeyFile, 0600); err != nil { + log.WithError(err).WithFields(log.Fields{"path": tlsKeyFile}).Fatal("TLS key file permission cannot be set to compulsory 0600") + } + } + conf["ssl"] = "on" + // Server ciphers are preferred by default + if !getEnvOrDefaultBoolRaw("PGPOOL_TLS_PREFER_SERVER_CIPHERS", true) { + conf["ssl_prefer_server_ciphers"] = "off" + } + + if val := os.Getenv("PGPOOL_TLS_CA_FILE"); val != "" { + conf["ssl_ca_cert"] = val + } + conf["ssl_cert"] = getEnvOrDefault("PGPOOL_TLS_CERT_FILE", "") + conf["ssl_cert"] = getEnvOrDefault("PGPOOL_TLS_KEY_FILE", "") + } + + // + // Backend settings + // + + // Get forced configuration (currentyl only weights) + forcedNodes, forcedNodesErr := parseForcedNodeConfigs(os.Getenv("PGPOOL_BACKEND_NODES")) + if forcedNodesErr != nil { + log.WithError(forcedNodesErr).Warn("Failed to parse forced node configuration. Ignoring.") + } + + // Merge node info by re-using address/port pairs as those cannot + // be modified without pgpool restart + // Note: However there is no need to print the nodes that are down + for _, i := range msg.Instances { + nodeKey := pgpoolNodeKey{address: i.IpAddress.String(), port: i.Port} + var num int + if histNode, exists := nodesHistory[nodeKey]; exists { + num = histNode.num + } else { + num = len(nodesHistory) + } + nodesHistory[nodeKey] = pgpoolNodeInfo{num: num} + + var weight int + if forced, exists := forcedNodes[pgpoolForcedKey{hostName: i.HostName, port: i.Port}]; exists { + weight = forced.weight + } else { + weight = -1 + } + + var flag string + if !i.IsReadOnly { + flag = "ALWAYS_PRIMARY|DISALLOW_TO_FAILOVER" + } else { + flag = "DISALLOW_TO_FAILOVER" + } + + suffix := strconv.Itoa(num) + conf["backend_hostname"+suffix] = i.IpAddress.String() + conf["backend_port"+suffix] = strconv.Itoa(i.Port) + conf["backend_weight"+suffix] = strconv.Itoa(weight) + conf["backend_flag"+suffix] = flag + } + + /* + + read -r -a nodes <<<"$(tr ',;' ' ' <<<"${PGPOOL_BACKEND_NODES}")" + if is_boolean_yes "$PGPOOL_AUTO_FAILBACK"; then + pgpool_set_property "auto_failback" "on" + + read -r -a app_name <<<"$(tr ',;' ' ' <<<"${PGPOOL_BACKEND_APPLICATION_NAMES}")" + fi + + for node in "${nodes[@]}"; do + pgpool_create_backend_config "$node" "$(is_boolean_yes "$PGPOOL_AUTO_FAILBACK" && echo "${app_name[i]}")" + ((i += 1)) + done + */ + + /* + + if [[ -f "$PGPOOL_USER_CONF_FILE" ]]; then + info "Custom configuration '$PGPOOL_USER_CONF_FILE' detected!. Adding it to the configuration file." + cat "$PGPOOL_USER_CONF_FILE" >>"$PGPOOL_CONF_FILE" + fi + + if [[ -f "$PGPOOL_USER_HBA_FILE" ]]; then + info "Custom configuration '$PGPOOL_USER_HBA_FILE' detected!. Overwriting the generated hba file." + cat "$PGPOOL_USER_HBA_FILE" >"$PGPOOL_PGHBA_FILE" + fi*/ + + // + // Write the config file + // + if configFile, err := os.Create(PgpoolConfigPath); err != nil { + log.WithError(err).Fatal("Cannot open the pgpool config file.") + } else { + defer configFile.Close() + + for i, j := range conf { + if _, err := configFile.WriteString(i + " = " + j + "\n"); err != nil { + log.WithError(err).Fatal("Cannot write to the the pgpool config file.") + } + } + configFile.Sync() + } + + // Reload config + reloadCmd := exec.Command("pcp_reload_config", "-h", tmpDir, "-p", pgpoolPort, "--no-password") + if err := reloadCmd.Run(); err != nil { + log.WithError(err).Warn("Failed to force config reload.") + } + + // Attach nodes + for _, j := range msg.Instances { + if info, exists := nodesHistory[pgpoolNodeKey{address: j.IpAddress.String(), port: j.Port}]; !exists { + log.WithFields(log.Fields{"address": j.IpAddress, "Port": j.Port}).Warn("Failed to resolve node number. Skipping attach.") + } else { + attachCmd := exec.Command("pcp_attach_node", "-h", tmpDir, "-p", pgpoolPort, "--no-password", strconv.Itoa(info.num)) + if err := attachCmd.Run(); err != nil { + log.WithError(err).WithFields(log.Fields{"address": j.IpAddress, "Port": j.Port}).Warn("Node attach failed.") + } + } + } +} + +func getEnvOrDefault(name string, defaultValue string) string { + if val := os.Getenv(name); name != "" { + return val + } else { + return defaultValue + } +} + +func getEnvOrDefaultBool(name string, defaultValue bool) string { + return getEnvOrDefaultBoolNamed(name, defaultValue, "on", "off") +} + +func getEnvOrDefaultBoolNamed(name string, defaultValue bool, yes string, no string) string { + if getEnvOrDefaultBoolRaw(name, defaultValue) { + return yes + } else { + return no + } +} + +func getEnvOrDefaultBoolRaw(name string, defaultValue bool) bool { + if valS := os.Getenv(name); name != "" { + valS = strings.ToLower(valS) + if valS == "1" || valS == "yes" || valS == "true" { + return true + } else { + return false + } + } else { + return defaultValue + } +} + +func checkKnownValue(value string, allowed []string, context string) string { + valueLower := strings.ToLower(value) + for _, i := range allowed { + if i == valueLower { + return value + } + } + log.WithFields(log.Fields{context: value, "allowed": allowed}).Fatal("The value must be a well-known string") + // Unreachable + return "" +} + +func parseForcedNodeConfigs(nodes string) (map[pgpoolForcedKey]pgpoolForcedConfig, error) { + result := make(map[pgpoolForcedKey]pgpoolForcedConfig) + for _, i := range strings.FieldsFunc(nodes, NodesSplit) { + if val, err := parseForcedNodeConfig(i); err != nil { + return nil, err + } else { + result[pgpoolForcedKey{hostName: val.host, port: val.port}] = val + } + } + return result, nil +} + +func NodesSplit(r rune) bool { + return r == ',' || r == ';' +} + +func parseForcedNodeConfig(value string) (pgpoolForcedConfig, error) { + fields := strings.Split(value, ":") + fieldsLen := len(fields) + if fieldsLen < 2 { + return pgpoolForcedConfig{}, fmt.Errorf("node config must have at least two parts separated by ':'.") + } + + var result pgpoolForcedConfig + if val, err := strconv.ParseInt(fields[0], 10, 32); err != nil { + result.num = -1 // the number is not important, just skip it + } else { + result.num = int(val) + } + + result.host = fields[1] + + if fieldsLen >= 3 { + if val, err := strconv.ParseInt(fields[2], 10, 32); err == nil { + result.port = int(val) + } else { + result.port = 5432 + } + } else { + result.port = 5432 + } + + if fieldsLen >= 4 { + if val, err := strconv.ParseInt(fields[3], 10, 32); err == nil { + result.weight = int(val) + } else { + result.weight = -1 + } + } else { + result.weight = -1 + } + + if fieldsLen >= 5 { + result.dir = fields[4] + } else { + result.dir = getEnvOrDefault("PGPOOL_DATA_DIR", "/data") + } + + if fieldsLen >= 6 { + result.flag = fields[5] + } else { + result.flag = "ALLOW_TO_FAILOVER" + } + + return result, nil +}