Files
objectivefs-docker-volume/main.go
Roman Vanicek ca04d2d9e2
All checks were successful
continuous-integration/drone/push Build is passing
Automatic snapshot mounting and destroying. Upgrade to Objectivefs 7.2.
2024-03-19 18:03:29 +01:00

716 lines
20 KiB
Go

// Copyright (c) 2020, Objective Security Corporation
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
package main
import (
"bufio"
"bytes"
"encoding/gob"
"flag"
"fmt"
"os"
"os/exec"
"os/user"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"github.com/boltdb/bolt"
"github.com/docker/go-plugins-helpers/volume"
log "github.com/sirupsen/logrus"
)
// ofsVolume is the persistent record of one docker volume backed by an
// ObjectiveFS filesystem. It is gob-encoded and stored in the bolt
// database, so all fields must stay exported.
type ofsVolume struct {
	Volume         *volume.Volume // docker-facing metadata (name, mountpoint, creation time)
	Fs             string         // remote ObjectiveFS filesystem name (defaults to the volume name)
	Opts           string         // comma-separated mount options passed to mount.objectivefs via -o
	Env            []string       // KEY=VALUE environment for the mount helper (credentials etc.)
	Asap           bool           // unmount as soon as the last container detaches
	MountSnapshots bool           // run the snapshot auto-mount loop while mounted
}
// ofsVolumeRt is the per-volume runtime state. It lives only in memory
// (not in the bolt database) and is rebuilt empty after a plugin restart.
type ofsVolumeRt struct {
	use     map[string]bool // set of request/container IDs currently using the volume
	mounted bool            // whether the filesystem is currently mounted
}
// ofsDriver implements the docker volume plugin API on top of
// ObjectiveFS. Persistent volume records live in a bolt database while
// runtime state is kept in volumeRt; the embedded RWMutex serializes the
// driver entry points.
type ofsDriver struct {
	sync.RWMutex
	volumedb          *bolt.DB                // persistent volume records (see volumeBucket)
	volumeRt          map[string]*ofsVolumeRt // in-memory runtime state, keyed by volume name
	defEnv            map[string]string       // default environment handed to mount.objectivefs
	defMountOpt       string                  // default mount options (from OBJECTIVEFS_MOUNT_OPTIONS)
	defMountSnapshots bool                    // default for automatic snapshot mounting
}
// snapshotRule describes one snapshot retention rule: keep `count`
// snapshots taken at `period` intervals.
// NOTE(review): currently only referenced by the commented-out
// parseSnapshotRules function below — candidate for removal.
type snapshotRule struct {
	count  int
	period time.Duration
}
// version is reported in the startup log line.
var version = "1.0"

const (
	// volumeBucket is the bolt bucket holding the gob-encoded volume records.
	volumeBucket = "volumes"
	// snapshotsDirectory is the directory inside each volume where
	// snapshots are auto-mounted.
	snapshotsDirectory = "snapshots"
)
// Create registers a new ObjectiveFS-backed volume in the local database.
// Nothing is mounted here; mounting happens lazily on the first Mount
// request. Recognized options: "fs" (remote filesystem name, defaults to
// the volume name), "options"/"ptions" (extra mount options appended to
// the driver defaults), "asap" (unmount as soon as the last container
// detaches) and "mountSnapshots" ("yes" enables the snapshot auto-mount
// loop). Any other option is forwarded to the mount helper as an
// environment variable.
func (d *ofsDriver) Create(r *volume.CreateRequest) error {
	log.WithFields(log.Fields{"name": r.Name}).Info("Create ObjectiveFS Volume")
	d.Lock()
	defer d.Unlock()
	tx, err := d.volumedb.Begin(true)
	if err != nil {
		return err
	}
	defer tx.Rollback()
	_, _, volumeExists, err := d.getVolumeInfo(tx, r.Name)
	if err != nil {
		// BUG FIX: a decode error for an existing record previously went
		// unreported here.
		return err
	}
	if volumeExists {
		return fmt.Errorf("volume '%s' already exists", r.Name)
	}
	v := &ofsVolume{}
	v.Volume = &volume.Volume{Name: r.Name, Mountpoint: filepath.Join(volume.DefaultDockerRootDirectory, "objectivefs", r.Name), CreatedAt: time.Now().Format(time.RFC3339Nano)}
	v.Opts = d.defMountOpt
	v.Fs = r.Name
	v.MountSnapshots = d.defMountSnapshots
	// Start from the driver-wide environment defaults; volume options may
	// override or extend them below.
	env := make(map[string]string)
	for id, val := range d.defEnv {
		env[id] = val
	}
	for key, val := range r.Options {
		switch key {
		case "fs":
			v.Fs = val
		case "options", "ptions": // "ptions" kept — presumably backward compatibility for a historical typo
			if len(v.Opts) == 0 {
				v.Opts = val
			} else {
				v.Opts = v.Opts + "," + val
			}
		case "asap":
			// Presence of the option enables it, regardless of its value.
			v.Asap = true
		case "mountSnapshots":
			v.MountSnapshots = val == "yes"
		default:
			// Unrecognized options become environment variables for
			// mount.objectivefs (e.g. credentials).
			env[key] = val
		}
	}
	for id, val := range env {
		v.Env = append(v.Env, id+"="+val)
	}
	if err := d.storeVolumeInfo(tx, r.Name, v); err != nil {
		return err
	}
	// Commit the transaction and check for error.
	if err := tx.Commit(); err != nil {
		return err
	}
	return nil
}
// List returns the docker-facing metadata of every volume known to the
// driver.
func (d *ofsDriver) List() (*volume.ListResponse, error) {
	d.Lock()
	defer d.Unlock()
	tx, err := d.volumedb.Begin(false)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()
	volumeMap, err := d.getVolumeMap(tx)
	if err != nil {
		// BUG FIX: a decode failure was previously ignored, silently
		// returning a truncated (or empty) listing.
		return nil, err
	}
	var vols []*volume.Volume
	for _, v := range volumeMap {
		vols = append(vols, v.Volume)
	}
	return &volume.ListResponse{Volumes: vols}, nil
}
// Get looks up a single volume by name and returns its docker-facing
// metadata, or an error when the volume is unknown.
func (d *ofsDriver) Get(r *volume.GetRequest) (*volume.GetResponse, error) {
	d.Lock()
	defer d.Unlock()
	tx, err := d.volumedb.Begin(false)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()
	info, _, exists, infoErr := d.getVolumeInfo(tx, r.Name)
	switch {
	case !exists:
		return &volume.GetResponse{}, fmt.Errorf("volume %s does not exist", r.Name)
	case infoErr != nil:
		return &volume.GetResponse{}, infoErr
	}
	return &volume.GetResponse{Volume: info.Volume}, nil
}
// umount detaches a mounted volume from its mountpoint, removes the
// now-empty mountpoint directory and clears the runtime mounted flag.
// It fails when the volume is not currently marked as mounted.
func umount(v *ofsVolume, rt *ofsVolumeRt) error {
	log.WithFields(log.Fields{"name": v.Volume.Name}).Info("Unmount ObjectiveFS Volume")
	if !rt.mounted {
		return fmt.Errorf("Volume is not mounted")
	}
	mountpoint := v.Volume.Mountpoint
	if err := exec.Command("umount", mountpoint).Run(); err != nil {
		return err
	}
	if err := os.Remove(mountpoint); err != nil {
		return err
	}
	rt.mounted = false
	return nil
}
// Remove deletes a volume's persistent record, unmounting the filesystem
// first when necessary. Removal is refused while any container still
// uses the volume.
func (d *ofsDriver) Remove(r *volume.RemoveRequest) error {
	d.Lock()
	defer d.Unlock()
	tx, err := d.volumedb.Begin(true)
	if err != nil {
		return err
	}
	defer tx.Rollback()
	v, rt, exists, infoErr := d.getVolumeInfo(tx, r.Name)
	if !exists {
		return fmt.Errorf("volume %s does not exist", r.Name)
	}
	if infoErr != nil {
		return infoErr
	}
	if inUse := len(rt.use); inUse > 0 {
		return fmt.Errorf("volume '%s' currently in use (%d unique)", r.Name, inUse)
	}
	if rt.mounted {
		if umountErr := umount(v, rt); umountErr != nil {
			return umountErr
		}
	}
	if delErr := d.removeVolumeInfo(tx, r.Name); delErr != nil {
		return delErr
	}
	return tx.Commit()
}
// Path reports the host mountpoint of the named volume.
func (d *ofsDriver) Path(r *volume.PathRequest) (*volume.PathResponse, error) {
	d.Lock()
	defer d.Unlock()
	tx, err := d.volumedb.Begin(false)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()
	info, _, exists, infoErr := d.getVolumeInfo(tx, r.Name)
	switch {
	case !exists:
		return &volume.PathResponse{}, fmt.Errorf("volume %s does not exist", r.Name)
	case infoErr != nil:
		return &volume.PathResponse{}, infoErr
	}
	return &volume.PathResponse{Mountpoint: info.Volume.Mountpoint}, nil
}
// Mount attaches a container (identified by r.ID) to the named volume,
// mounting the ObjectiveFS filesystem on first use. The mount helper is
// run in the foreground and watched by a goroutine that relays its
// stderr into the log and resets the runtime state when it exits; when
// MountSnapshots is enabled, a second goroutine periodically mounts and
// destroys snapshots under <mountpoint>/snapshots.
func (d *ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error) {
	d.Lock()
	defer d.Unlock()
	tx, err := d.volumedb.Begin(true)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()
	v, rt, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
	if !volumeExists {
		return &volume.MountResponse{}, fmt.Errorf("volume %s does not exist", r.Name)
	}
	if getVolErr != nil {
		return &volume.MountResponse{}, getVolErr
	}
	log.WithFields(log.Fields{"name": r.Name, "id": r.ID}).Info("Attach ObjectiveFS Volume")
	if !rt.mounted {
		if err := os.MkdirAll(v.Volume.Mountpoint, 0755); err != nil {
			return &volume.MountResponse{}, err
		}
		// Note: The first argument ("mount") causes running in the foreground, its absence in the background
		var cmd *exec.Cmd
		if len(v.Opts) == 0 {
			cmd = exec.Command("/sbin/mount.objectivefs", "mount", v.Fs, v.Volume.Mountpoint)
		} else {
			cmd = exec.Command("/sbin/mount.objectivefs", "mount", "-o"+v.Opts, v.Fs, v.Volume.Mountpoint)
		}
		cmd.Env = v.Env
		cmdReader, _ := cmd.StderrPipe()
		log.WithFields(log.Fields{
			"name": r.Name,
			"cmd":  cmd,
			//"env": v.env, // for security reasons disabled
		}).Info("Mount ObjectiveFS Volume")
		// Relay the helper's stderr into the plugin log; when the helper
		// exits (planned or not) clear the runtime state so a later Mount
		// can retry cleanly.
		scanner := bufio.NewScanner(cmdReader)
		go func() {
			for scanner.Scan() {
				log.WithFields(log.Fields{"name": r.Name}).Info(scanner.Text())
			}
			cmd.Wait()
			log.WithFields(log.Fields{"name": r.Name}).Info("Completed mount process")
			// In case the mount stopped in unplanned fashion allow easy re-mount
			rt.mounted = false
			rt.use = make(map[string]bool)
		}()
		if err := cmd.Start(); err != nil {
			return &volume.MountResponse{}, fmt.Errorf("unexpected error mounting '%s' error: %s", r.Name, err.Error())
		}
		// The drawback of running the mount in the foreground is there is no easy way to tell if it failed
		// to initially connect. So we just wait a fixed amount of time and check for process exit or mount
		// success.
		// NOTE(review): this loop has no upper bound; if the helper neither
		// exits nor completes the mount, Mount blocks indefinitely while
		// holding the driver lock — confirm this is acceptable.
		for {
			// Check for process exit
			time.Sleep(100 * time.Millisecond)
			if cmd.ProcessState != nil {
				// The process has exited so consider an error occured
				log.WithFields(log.Fields{"name": r.Name, "exitStatus": cmd.ProcessState.ExitCode()}).Error("Volume mount failed")
				return &volume.MountResponse{}, fmt.Errorf("unexpected error mounting '%s' exist status: %v", r.Name, cmd.ProcessState.ExitCode())
			}
			// Check for mount
			if isObjfs, err := isObjectiveFsMount(v.Volume.Mountpoint); err == nil && isObjfs {
				break
			}
		}
		log.WithFields(log.Fields{"name": r.Name}).Info("Volume mounted")
		rt.mounted = true
		if v.MountSnapshots {
			// Snapshot maintenance loop: every 5 minutes, while the mount
			// helper is still alive, reconcile the mounted snapshot
			// directories with the retention schedule reported by the tool.
			go func() {
				snapshotsPath := path.Join(v.Volume.Mountpoint, snapshotsDirectory)
				log.WithFields(log.Fields{"name": r.Name, "path": snapshotsPath}).Info("Snapshot auto-mount is starting")
				if _, err := os.Stat(snapshotsPath); os.IsNotExist(err) {
					log.WithFields(log.Fields{"name": r.Name, "directory": snapshotsDirectory}).Info("Creating the snapshots mount directory")
					if err := os.Mkdir(snapshotsPath, os.ModePerm); err != nil {
						log.WithFields(log.Fields{"name": r.Name}).Error("Failed to create the snapshots mount directory. Snapshot mounting will be disabled.")
						return
					}
				}
				for cmd.ProcessState == nil {
					if snapshotRulesB, err := applyEnv(exec.Command("/sbin/mount.objectivefs", "snapshot", "-l", v.Fs), v.Env).Output(); err == nil {
						if expectedSnapshots, err := generateSnapshotsFromRulesForNow(string(snapshotRulesB)); err == nil {
							if existingSnapshotsB, err := applyEnv(exec.Command("/sbin/mount.objectivefs", "list", "-sz", v.Fs), v.Env).Output(); err == nil {
								if existingSnapshots, err := parseExistingSnapshots(string(existingSnapshotsB), v.Fs); err == nil {
									if existingMounts, err := getMountedSnaphots(snapshotsPath); err == nil {
										// Destroy old snapshots
										for i, name := range setMinus(existingSnapshots, expectedSnapshots) {
											expectedOutput := "Snapshot '" + name + "' destroyed."
											// NOTE(review): Output() usually includes a trailing newline, so this
											// exact string comparison may flag successful destroys — confirm.
											if output, err := applyEnv(exec.Command("/sbin/mount.objectivefs", "destroy", name, "-f"), v.Env).Output(); err != nil || string(output) != expectedOutput {
												log.WithFields(log.Fields{"name": r.Name, "snapshot": i}).Warn("Failed to destroy an expired snapshot.")
											}
										}
										// Remove mounts of expired snapshots
										for i, path := range setMinus(existingMounts, expectedSnapshots) {
											if err := exec.Command("umount", path).Run(); err != nil {
												log.WithFields(log.Fields{"name": r.Name, "snapshot": i}).Warn("Failed to unmount an expired snapshot.")
											}
											if err := os.Remove(path); err != nil {
												log.WithFields(log.Fields{"name": r.Name, "snapshot": i}).Warn("Failed to remove directory of an expired snapshot.")
											}
										}
										// Add new mounts
										for i, name := range setMinus(setIntersect(existingSnapshots, expectedSnapshots), existingMounts) {
											dest := filepath.Join(snapshotsPath, i)
											// NOTE(review): this warning says "destroy" but the operation here is
											// a mount — likely a copy-paste slip in the message.
											if err := applyEnv(exec.Command("/sbin/mount.objectivefs", name, dest), v.Env).Run(); err != nil {
												log.WithFields(log.Fields{"name": r.Name, "snapshot": i}).Warn("Failed to destroy an expired snapshot.")
											}
										}
									} else {
										log.WithFields(log.Fields{"name": r.Name, "Error": err}).Warn("Cannot determine existing snapshot mounts")
									}
								} else {
									log.WithFields(log.Fields{"name": r.Name, "Error": err}).Warn("Cannot parse existing snapshot names")
								}
							} else {
								log.WithFields(log.Fields{"name": r.Name, "Error": err}).Warn("Cannot list existing snapshot names")
							}
						} else {
							log.WithFields(log.Fields{"name": r.Name, "Error": err}).Warn("Cannot determine expected snapshot names")
						}
					} else {
						log.WithFields(log.Fields{"name": r.Name, "Error": err}).Warn("Cannot detect snapshot frequency")
					}
					// Cycle periodically
					time.Sleep(5 * time.Minute)
				}
				log.WithFields(log.Fields{"name": r.Name}).Info("Completed snapshot auto-mounting")
			}()
		}
	}
	rt.use[r.ID] = true
	d.storeVolumeInfo(tx, r.Name, v)
	return &volume.MountResponse{Mountpoint: v.Volume.Mountpoint}, tx.Commit()
}
// Unmount records that the container identified by r.ID no longer uses
// the volume. When the volume was created with the "asap" option and no
// other container still uses it, the filesystem is unmounted right away.
func (d *ofsDriver) Unmount(r *volume.UnmountRequest) error {
	d.Lock()
	defer d.Unlock()
	log.WithFields(log.Fields{"name": r.Name, "id": r.ID}).Info("Detach ObjectiveFS Volume")
	tx, err := d.volumedb.Begin(true)
	if err != nil {
		return err
	}
	defer tx.Rollback()
	v, rt, exists, infoErr := d.getVolumeInfo(tx, r.Name)
	if !exists {
		return fmt.Errorf("volume %s does not exist", r.Name)
	}
	if infoErr != nil {
		return infoErr
	}
	delete(rt.use, r.ID)
	if v.Asap && len(rt.use) == 0 {
		if umountErr := umount(v, rt); umountErr != nil {
			return umountErr
		}
	}
	d.storeVolumeInfo(tx, r.Name, v)
	return tx.Commit()
}
// Capabilities reports that volumes managed by this driver are local to
// the node they are mounted on.
func (d *ofsDriver) Capabilities() *volume.CapabilitiesResponse {
	caps := volume.Capability{Scope: "local"}
	return &volume.CapabilitiesResponse{Capabilities: caps}
}
var (
	// Command-line overrides for logging. When -log is empty, the
	// LOG_LEVEL environment variable (or "info") is used; when -logfile
	// is empty, logging stays on the default output.
	logLevel = flag.String("log", "", "log level")
	logFilePath = flag.String("logfile", "", "log file")
)
// main configures logging from flags/environment, opens the bolt volume
// database, builds the driver with defaults taken from the environment,
// and serves the docker volume plugin API on a unix socket.
func main() {
	flag.Parse()
	// Log level priority: -log flag, then LOG_LEVEL, then "info".
	if *logLevel == "" {
		if *logLevel = os.Getenv("LOG_LEVEL"); *logLevel == "" {
			*logLevel = "info"
		}
	}
	level, err := log.ParseLevel(*logLevel)
	if err != nil {
		log.WithError(err).Fatal("Failed to parse log level")
	}
	log.SetLevel(level)
	if *logFilePath != "" {
		f, err := os.OpenFile(*logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
		if err != nil {
			log.WithError(err).Fatal("Failed to open log file for writing")
		}
		defer f.Close()
		log.StandardLogger().Out = f
	}
	log.Info("Starting ObjectiveFS Volume Driver, version " + version)
	// Collect the well-known ObjectiveFS settings from the environment as
	// per-volume defaults; unset variables are simply omitted.
	defEnv := make(map[string]string)
	for _, i := range []string{"ACCESS_KEY", "SECRET_KEY", "OBJECTIVEFS_LICENSE", "OBJECTSTORE", "ENDPOINT", "CACHESIZE", "DISKCACHE_SIZE", "DISKCACHE_PATH", "OBJECTIVEFS_PASSPHRASE", "IDMAP"} {
		if a := os.Getenv(i); a != "" {
			defEnv[i] = a
		}
	}
	defMountOpt := os.Getenv("OBJECTIVEFS_MOUNT_OPTIONS")
	defMountSnapshots := os.Getenv("OBJECTIVEFS_MOUNT_SNAPSHOTS") == "yes"
	db, err := bolt.Open("objectivefs.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	// BUG FIX: the Update error was previously discarded; every request
	// needs the bucket, so a creation failure is fatal.
	if err := db.Update(func(tx *bolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists([]byte(volumeBucket))
		return err
	}); err != nil {
		log.Fatalf("create bucket: %s", err)
	}
	d := &ofsDriver{volumedb: db, volumeRt: make(map[string]*ofsVolumeRt), defEnv: defEnv, defMountOpt: defMountOpt, defMountSnapshots: defMountSnapshots}
	h := volume.NewHandler(d)
	// BUG FIX: lookup/parse errors were previously ignored, which could
	// lead to serving the socket with a bogus group id; fail fast instead.
	u, err := user.Lookup("root")
	if err != nil {
		log.WithError(err).Fatal("Failed to look up the root user")
	}
	gid, err := strconv.Atoi(u.Gid)
	if err != nil {
		log.WithError(err).Fatal("Failed to parse the root group id")
	}
	if err := h.ServeUnix("/run/docker/plugins/objectivefs", gid); err != nil {
		log.Fatal(err)
	}
}
// ----------------
// gobEncode serializes the volume record into a byte slice suitable for
// storage in the bolt database.
func (p *ofsVolume) gobEncode() ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(p); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}
// gobDecode deserializes a volume record previously produced by
// gobEncode.
func gobDecode(data []byte) (*ofsVolume, error) {
	var vol *ofsVolume
	if err := gob.NewDecoder(bytes.NewReader(data)).Decode(&vol); err != nil {
		return nil, err
	}
	return vol, nil
}
// storeVolumeInfo persists a volume record under its name in the volume
// bucket of the given writable transaction.
func (p *ofsDriver) storeVolumeInfo(tx *bolt.Tx, volumeName string, info *ofsVolume) error {
	encoded, err := info.gobEncode()
	if err != nil {
		return err
	}
	return tx.Bucket([]byte(volumeBucket)).Put([]byte(volumeName), encoded)
}
// getVolumeInfo fetches a volume's persistent record together with its
// in-memory runtime state. It reports (nil, nil, false, nil) for an
// unknown volume; the runtime entry is created lazily on first access.
// Callers must hold the driver lock (the runtime map may be mutated).
func (p *ofsDriver) getVolumeInfo(tx *bolt.Tx, volumeName string) (*ofsVolume, *ofsVolumeRt, bool, error) {
	raw := tx.Bucket([]byte(volumeBucket)).Get([]byte(volumeName))
	if raw == nil {
		return nil, nil, false, nil
	}
	info, decodeErr := gobDecode(raw)
	rt := p.volumeRt[volumeName]
	if rt == nil {
		rt = &ofsVolumeRt{use: make(map[string]bool), mounted: false}
		p.volumeRt[volumeName] = rt
	}
	return info, rt, true, decodeErr
}
// getVolumeMap decodes every stored volume record into a name-keyed map.
// The first decode failure aborts the walk and is returned alongside the
// partially filled map.
func (p *ofsDriver) getVolumeMap(tx *bolt.Tx) (map[string]ofsVolume, error) {
	volumes := make(map[string]ofsVolume)
	walkErr := tx.Bucket([]byte(volumeBucket)).ForEach(func(name, raw []byte) error {
		decoded, err := gobDecode(raw)
		if err != nil {
			return err
		}
		volumes[string(name)] = *decoded
		return nil
	})
	return volumes, walkErr
}
// removeVolumeInfo deletes the persistent record of a volume from the
// volume bucket of the given writable transaction.
func (p *ofsDriver) removeVolumeInfo(tx *bolt.Tx, volumeName string) error {
	return tx.Bucket([]byte(volumeBucket)).Delete([]byte(volumeName))
}
/*
func parseSnapshotRules(rulesS string) ([]snapshotRule, error) {
var result []snapshotRule
rules := strings.Split(rulesS, " ")
for _, i := range rules {
var rule snapshotRule
countAndRest := strings.SplitN(i, "@", 2)
if len(countAndRest) != 2 || len(countAndRest[1]) < 2 {
return nil, fmt.Errorf("Failed to parse snapshot rule '" + i + "'")
}
if count, err := strconv.ParseInt(countAndRest[0], 10, 32); err != nil {
return nil, fmt.Errorf("Failed to parse snapshot rule '" + i + "'")
} else {
rule.count = int(count)
}
period := countAndRest[1]
var multiplier time.Duration
switch period[len(period)-1] {
// see https://objectivefs.com/howto/snapshots
case 'm':
multiplier = time.Minute
case 'h':
multiplier = time.Hour
case 'd':
multiplier = 24 * time.Hour
case 'w':
multiplier = 7 * 24 * time.Hour
case 'n':
multiplier = 4 * 7 * 24 * time.Hour // Simple month (4 weeks)
case 'q':
multiplier = 12 * 7 * 24 * time.Hour // Simple quarter (12 weeks)
case 'y':
multiplier = 48 * 7 * 24 * time.Hour // Simple year (48 weeks)
default:
return nil, fmt.Errorf("Failed to parse snapshot period '" + period + "'")
}
if periodCount, err := strconv.ParseInt(period[:len(period)-1], 10, 32); err != nil {
return nil, fmt.Errorf("Failed to parse snapshot rule '" + i + "'")
} else {
rule.period = multiplier * time.Duration(periodCount)
}
result = append(result, rule)
}
return result, nil
}*/
func applyEnv(cmd *exec.Cmd, env []string) *exec.Cmd {
cmd.Env = env
return cmd
}
// generateSnapshotsFromRulesForNow asks the objectivefs tool which
// snapshot timestamps the given retention rules imply for the current
// time, and returns them as a set keyed by UTC timestamp string.
func generateSnapshotsFromRulesForNow(rules string) (map[string]bool, error) {
	timesB, err := exec.Command("/sbin/mount.objectivefs", "snapshot", "-vs", rules).Output()
	if err != nil {
		return nil, err
	}
	// BUG FIX: the map must be allocated before use; the original wrote
	// into a declared-but-nil map, which panics on the first timestamp.
	result := make(map[string]bool)
	scanner := bufio.NewScanner(strings.NewReader(string(timesB)))
	for scanner.Scan() {
		line := scanner.Text()
		// Skip the header lines that precede the timestamp list.
		if strings.HasPrefix(line, "Number of automatic snapshots:") || strings.HasPrefix(line, "Automatic schedule:") {
			continue
		}
		// Every remaining line must be a UTC timestamp; anything else is
		// treated as a parse failure of the tool output.
		if _, err := time.Parse("2006-01-02T15:04:05Z", line); err != nil {
			return nil, err
		}
		result[line] = false
	}
	return result, nil
}
// parseExistingSnapshots parses the output of "mount.objectivefs list
// -sz <fs>" and maps each snapshot timestamp to its full URI. Lines for
// the live filesystem itself and the column header are skipped; a URI
// that does not start with "<scheme>://<expectedPrefix>@" is an error.
func parseExistingSnapshots(data string, expectedPrefix string) (map[string]string, error) {
	// BUG FIX: allocate the result map (the original wrote into a nil map,
	// which panics), skip blank lines before indexing the first field, and
	// correct the "snapshosts" typo in both error messages.
	result := make(map[string]string)
	scanner := bufio.NewScanner(strings.NewReader(data))
	for scanner.Scan() {
		line := scanner.Text()
		if strings.HasPrefix(line, "NAME") {
			// Skip column headers
			continue
		}
		fields := strings.Fields(line)
		if len(fields) == 0 {
			// Tolerate blank lines in the tool output.
			continue
		}
		name := fields[0]
		uriSchemeAndRest := strings.SplitN(name, "://", 2)
		if len(uriSchemeAndRest) != 2 {
			return nil, fmt.Errorf("Expected URI scheme in the list of existing snapshots")
		}
		hostAndPath := uriSchemeAndRest[1]
		if hostAndPath == expectedPrefix {
			// This is the live filesystem not a snapshot
			continue
		}
		if !strings.HasPrefix(hostAndPath, expectedPrefix+"@") {
			return nil, fmt.Errorf("Unexpected URI in the list of existing snapshots")
		}
		// The snapshot timestamp is whatever follows "<fs>@".
		timestamp := hostAndPath[len(expectedPrefix)+1:]
		result[timestamp] = name
	}
	return result, nil
}
// getMountedSnaphots scans baseDir for subdirectories that are currently
// live ObjectiveFS mount points and returns a map from directory name
// (the snapshot timestamp) to its full path.
func getMountedSnaphots(baseDir string) (map[string]string, error) {
	entries, err := os.ReadDir(baseDir)
	if err != nil {
		// BUG FIX: the error test was inverted — a successful read returned
		// (nil, nil) and a failed one iterated nil entries; the result map
		// was also never allocated, so any hit would have panicked.
		return nil, err
	}
	result := make(map[string]string)
	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}
		entryPath := filepath.Join(baseDir, entry.Name())
		// Only report directories that really are ObjectiveFS mounts.
		if isObjFs, err := isObjectiveFsMount(entryPath); err == nil && isObjFs {
			result[entry.Name()] = entryPath
		}
	}
	return result, nil
}
// isObjectiveFsMount reports whether path is itself the mount point of a
// fuse.objectivefs filesystem, by asking df to resolve the nearest mount
// of that type.
func isObjectiveFsMount(path string) (bool, error) {
	out, err := exec.Command("df", "--output=target", "-t", "fuse.objectivefs", path).CombinedOutput()
	if err != nil {
		return false, err
	}
	// On success the first line contains column headers (in our case
	// "Mounted on") and the second line the nearest mount point on the
	// root path; path is a mount point exactly when that line equals path.
	lines := bufio.NewScanner(strings.NewReader(string(out)))
	if !lines.Scan() || !lines.Scan() {
		return false, nil
	}
	return lines.Text() == path, nil
}
// setMinus returns the entries of a whose keys are NOT present in b
// (set difference a \ b).
func setMinus[TKey comparable, TValue any, TValue2 any](a map[TKey]TValue, b map[TKey]TValue2) map[TKey]TValue {
	// BUG FIX: the original kept keys that WERE in b (computing an
	// intersection instead of a difference, which broke snapshot expiry in
	// Mount) and wrote into a nil map, which panics on the first entry.
	result := make(map[TKey]TValue)
	for key, val := range a {
		if _, contains := b[key]; !contains {
			result[key] = val
		}
	}
	return result
}
// setIntersect returns the entries of a whose keys are also present in b
// (set intersection, keeping a's values).
func setIntersect[TKey comparable, TValue any, TValue2 any](a map[TKey]TValue, b map[TKey]TValue2) map[TKey]TValue {
	// BUG FIX: allocate the result map; the original wrote into a nil map,
	// which panics as soon as the intersection is non-empty.
	result := make(map[TKey]TValue)
	for key, val := range a {
		if _, contains := b[key]; contains {
			result[key] = val
		}
	}
	return result
}