forked from Ivasoft/objectivefs-docker-volume
All checks were successful
continuous-integration/drone/push Build is passing
983 lines
30 KiB
Go
983 lines
30 KiB
Go
// Copyright (c) 2020, Objective Security Corporation
|
|
|
|
// Permission to use, copy, modify, and/or distribute this software for any
|
|
// purpose with or without fee is hereby granted, provided that the above
|
|
// copyright notice and this permission notice appear in all copies.
|
|
|
|
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
|
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
|
// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
|
// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
|
// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
|
// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
|
// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"encoding/gob"
|
|
"flag"
|
|
"fmt"
|
|
"os"
|
|
"os/exec"
|
|
"os/user"
|
|
"path"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/boltdb/bolt"
|
|
"github.com/docker/go-plugins-helpers/volume"
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type ofsVolume struct {
|
|
Volume *volume.Volume
|
|
Fs string
|
|
Opts []string
|
|
Env []string
|
|
Asap bool
|
|
// Create a snaphot on each mount (and delete the previous one)
|
|
Backup bool
|
|
BackupSnapshot string
|
|
MountSnapshots bool
|
|
SnapshotsFilter string
|
|
}
|
|
|
|
// ofsVolumeRt is the in-memory runtime state of a volume; it is kept in
// ofsDriver.volumeRt and never written to the database.
type ofsVolumeRt struct {
	// use is the set of mount request IDs currently attached to the volume.
	use map[string]bool
	// mounted records whether the driver considers the filesystem mounted.
	mounted bool
}
|
|
|
|
type ofsDriver struct {
|
|
sync.RWMutex
|
|
volumedb *bolt.DB
|
|
volumeRt map[string]*ofsVolumeRt
|
|
defEnv map[string]string
|
|
defMountOpt map[string]string
|
|
defMountSnapshots bool
|
|
}
|
|
|
|
// Version of this driver and of the ObjectiveFS release it targets; both are
// printed in the startup log line.
var (
	version      = "1.4"
	objfsVersion = "7.0"
)
|
|
|
|
// volumeBucket is the bolt bucket holding the encoded volume records.
const volumeBucket = "volumes"

// snapshotsDirectory is the sub-directory of a mountpoint under which
// snapshots are auto-mounted.
const snapshotsDirectory = "snapshots"

// backupDirectory is the sub-directory of a mountpoint used as the mount
// target of backup volumes.
const backupDirectory = "backup"
|
|
|
|
func (d *ofsDriver) Create(r *volume.CreateRequest) error {
|
|
log.WithFields(log.Fields{"name": r.Name}).Info("Create ObjectiveFS Volume")
|
|
d.Lock()
|
|
defer d.Unlock()
|
|
|
|
tx, err := d.volumedb.Begin(true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
_, _, volumeExists, err := d.getVolumeInfo(tx, r.Name)
|
|
if volumeExists {
|
|
return fmt.Errorf("volume '%s' already exists", r.Name)
|
|
}
|
|
|
|
v := &ofsVolume{}
|
|
v.Volume = &volume.Volume{Name: r.Name, Mountpoint: filepath.Join(volume.DefaultDockerRootDirectory, "objectivefs", r.Name), CreatedAt: time.Now().Format(time.RFC3339Nano)}
|
|
v.Fs = r.Name
|
|
v.MountSnapshots = d.defMountSnapshots
|
|
env := make(map[string]string)
|
|
for id, val := range d.defEnv {
|
|
env[id] = val
|
|
}
|
|
opts := make(map[string]string)
|
|
for id, val := range d.defMountOpt {
|
|
opts[id] = val
|
|
}
|
|
for key, val := range r.Options {
|
|
switch key {
|
|
case "fs":
|
|
v.Fs = val
|
|
case "options", "ptions":
|
|
for _, i := range strings.Split(val, ",") {
|
|
addMountOption(opts, i)
|
|
}
|
|
case "asap":
|
|
v.Asap = true
|
|
case "backup":
|
|
v.Backup = true
|
|
case "mountSnapshots":
|
|
v.MountSnapshots = val == "yes"
|
|
case "snapshotsFilter":
|
|
v.SnapshotsFilter = val
|
|
default:
|
|
env[key] = val
|
|
}
|
|
}
|
|
for id, val := range env {
|
|
v.Env = append(v.Env, id+"="+val)
|
|
}
|
|
|
|
for id, val := range opts {
|
|
var keyVal string
|
|
if val == "" {
|
|
keyVal = id
|
|
} else {
|
|
keyVal = id + "=" + val
|
|
}
|
|
v.Opts = append(v.Opts, keyVal)
|
|
}
|
|
|
|
if err := d.storeVolumeInfo(tx, r.Name, v); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Commit the transaction and check for error.
|
|
if err := tx.Commit(); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *ofsDriver) List() (*volume.ListResponse, error) {
|
|
d.Lock()
|
|
defer d.Unlock()
|
|
|
|
tx, err := d.volumedb.Begin(false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
var vols []*volume.Volume
|
|
volumeMap, err := d.getVolumeMap(tx)
|
|
for _, v := range volumeMap {
|
|
vols = append(vols, v.Volume)
|
|
}
|
|
return &volume.ListResponse{Volumes: vols}, nil
|
|
}
|
|
|
|
func (d *ofsDriver) Get(r *volume.GetRequest) (*volume.GetResponse, error) {
|
|
d.Lock()
|
|
defer d.Unlock()
|
|
|
|
tx, err := d.volumedb.Begin(false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
volumeInfo, _, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
|
|
if !volumeExists {
|
|
return &volume.GetResponse{}, fmt.Errorf("volume %s does not exist", r.Name)
|
|
}
|
|
if getVolErr != nil {
|
|
return &volume.GetResponse{}, getVolErr
|
|
}
|
|
return &volume.GetResponse{Volume: volumeInfo.Volume}, nil
|
|
}
|
|
|
|
func umount(v *ofsVolume, rt *ofsVolumeRt, isRemount bool) error {
|
|
log.WithFields(log.Fields{"name": v.Volume.Name}).Info("Unmount ObjectiveFS Volume")
|
|
if !rt.mounted {
|
|
return fmt.Errorf("volume is not mounted")
|
|
}
|
|
|
|
if v.MountSnapshots {
|
|
// Snapshots must be unmounted first
|
|
// Note: We do NOT remove the mount directory as other distributed mounts may be using them
|
|
snapshotsPath := path.Join(v.Volume.Mountpoint, snapshotsDirectory)
|
|
if existingMounts, err := getMountedSnaphots(snapshotsPath, true); err == nil {
|
|
log.WithFields(log.Fields{"name": v.Volume.Name, "existingMounts": existingMounts}).Trace("Existing snapshot mounts")
|
|
for i, path := range existingMounts {
|
|
if err := exec.Command("umount", path).Run(); err != nil {
|
|
log.WithFields(log.Fields{"name": v.Volume.Name, "snapshot": i}).Warn("Failed to unmount a snapshot on unmount.")
|
|
} else {
|
|
log.WithFields(log.Fields{"name": v.Volume.Name, "snapshot": i}).Trace("Snapshot unmounted on unmount.")
|
|
}
|
|
}
|
|
|
|
if err := exec.Command("umount", snapshotsPath).Run(); err != nil {
|
|
log.WithFields(log.Fields{"name": v.Volume.Name}).Warn("Failed to unmount a snapshots tmpfs on unmount.")
|
|
} else {
|
|
log.WithFields(log.Fields{"name": v.Volume.Name}).Trace("Snapshot tmpfs unmounted on unmount.")
|
|
}
|
|
} else {
|
|
log.WithFields(log.Fields{"name": v.Volume.Name, "Error": err}).Warn("Cannot determine existing snapshot mounts on unmount")
|
|
}
|
|
}
|
|
|
|
var umountPath string
|
|
if v.Backup {
|
|
umountPath = path.Join(v.Volume.Mountpoint, backupDirectory)
|
|
} else {
|
|
umountPath = v.Volume.Mountpoint
|
|
}
|
|
|
|
if err := exec.Command("umount", umountPath).Run(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if v.Backup {
|
|
if err := applyEnv(exec.Command("/sbin/mount.objectivefs", "destroy", v.BackupSnapshot, "-f"), v.Env).Wait(); err == nil {
|
|
log.WithFields(log.Fields{"name": v.Volume.Name, "snapshot": v.BackupSnapshot}).Info("Failed to destroy the previous snapshot")
|
|
return err
|
|
}
|
|
v.BackupSnapshot = ""
|
|
|
|
if !isRemount {
|
|
if err := os.Remove(umountPath); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
if !isRemount {
|
|
if err := os.Remove(v.Volume.Mountpoint); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
rt.mounted = false
|
|
return nil
|
|
}
|
|
|
|
func (d *ofsDriver) Remove(r *volume.RemoveRequest) error {
|
|
d.Lock()
|
|
defer d.Unlock()
|
|
|
|
tx, err := d.volumedb.Begin(true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
v, rt, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
|
|
if !volumeExists {
|
|
return fmt.Errorf("volume %s does not exist", r.Name)
|
|
}
|
|
if getVolErr != nil {
|
|
return getVolErr
|
|
}
|
|
if len(rt.use) != 0 {
|
|
return fmt.Errorf("volume '%s' currently in use (%d unique)", r.Name, len(rt.use))
|
|
}
|
|
if rt.mounted {
|
|
if err := umount(v, rt, false); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err := d.removeVolumeInfo(tx, r.Name); err != nil {
|
|
return err
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
func (d *ofsDriver) Path(r *volume.PathRequest) (*volume.PathResponse, error) {
|
|
d.Lock()
|
|
defer d.Unlock()
|
|
|
|
tx, err := d.volumedb.Begin(false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
volumeInfo, _, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
|
|
if !volumeExists {
|
|
return &volume.PathResponse{}, fmt.Errorf("volume %s does not exist", r.Name)
|
|
}
|
|
if getVolErr != nil {
|
|
return &volume.PathResponse{}, getVolErr
|
|
}
|
|
|
|
return &volume.PathResponse{Mountpoint: volumeInfo.Volume.Mountpoint}, nil
|
|
}
|
|
|
|
func (d *ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error) {
|
|
d.Lock()
|
|
defer d.Unlock()
|
|
|
|
tx, err := d.volumedb.Begin(true)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
v, rt, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
|
|
if !volumeExists {
|
|
return &volume.MountResponse{}, fmt.Errorf("volume %s does not exist", r.Name)
|
|
}
|
|
if getVolErr != nil {
|
|
return &volume.MountResponse{}, getVolErr
|
|
}
|
|
|
|
log.WithFields(log.Fields{"name": r.Name, "id": r.ID}).Info("Attach ObjectiveFS Volume")
|
|
|
|
// Each mount request causes new snapshot to get created and the old one deleted
|
|
if v.Backup && rt.mounted {
|
|
log.WithFields(log.Fields{"name": r.Name, "snapshot": v.BackupSnapshot}).Info("Going to unmount and destroy the previous backup snapshot")
|
|
if err := umount(v, rt, true); err != nil {
|
|
return &volume.MountResponse{}, err
|
|
}
|
|
}
|
|
|
|
if !rt.mounted {
|
|
if err := os.MkdirAll(v.Volume.Mountpoint, 0755); err != nil {
|
|
return &volume.MountResponse{}, err
|
|
}
|
|
|
|
var fs, mountPath string
|
|
if v.Backup {
|
|
log.WithFields(log.Fields{"name": r.Name}).Info("Going to create a backup snapshot")
|
|
mountPath = path.Join(v.Volume.Mountpoint, backupDirectory)
|
|
if _, err := os.Stat(mountPath); os.IsNotExist(err) {
|
|
log.WithFields(log.Fields{"name": r.Name, "directory": mountPath}).Info("Creating the backup mount directory")
|
|
if err := os.Mkdir(mountPath, os.ModePerm); err != nil {
|
|
log.WithFields(log.Fields{"name": r.Name}).Error("Failed to create the backup mount directory.")
|
|
return &volume.MountResponse{}, err
|
|
}
|
|
}
|
|
if snapshotTextB, err := applyEnv(exec.Command("/sbin/mount.objectivefs", "snapshot", v.Fs), v.Env).CombinedOutput(); err != nil {
|
|
log.WithFields(log.Fields{"name": v.Volume.Name, "output": string(snapshotTextB[:])}).Info("Failed to create a new backup snapshot")
|
|
return &volume.MountResponse{}, err
|
|
} else {
|
|
// Format: NOTE: Successfully created snapshot: snapshotId (<current UTC time>)
|
|
const okPrefix = "NOTE: Successfully created snapshot: "
|
|
snapshotText := string(snapshotTextB[:])
|
|
if snapshotIdWithSuffix, isOk := strings.CutPrefix(snapshotText, okPrefix); !isOk {
|
|
log.WithFields(log.Fields{"name": v.Volume.Name, "snapshot": v.BackupSnapshot}).Info("Failed to create a new backup snapshot")
|
|
return &volume.MountResponse{}, err
|
|
} else {
|
|
v.BackupSnapshot = strings.Split(snapshotIdWithSuffix, " ")[0]
|
|
fs = v.BackupSnapshot
|
|
}
|
|
}
|
|
} else {
|
|
fs = v.Fs
|
|
mountPath = v.Volume.Mountpoint
|
|
}
|
|
|
|
// Note: The first argument ("mount") causes running in the foreground, its absence in the background
|
|
var cmd *exec.Cmd
|
|
if len(v.Opts) == 0 {
|
|
cmd = exec.Command("/sbin/mount.objectivefs", "mount", fs, mountPath)
|
|
} else {
|
|
cmd = exec.Command("/sbin/mount.objectivefs", "mount", "-o"+strings.Join(v.Opts, ","), fs, mountPath)
|
|
}
|
|
|
|
cmd.Env = v.Env
|
|
cmdReader, _ := cmd.StderrPipe()
|
|
log.WithFields(log.Fields{
|
|
"name": r.Name,
|
|
"cmd": cmd,
|
|
//"env": v.env, // for security reasons disabled
|
|
}).Info("Mount ObjectiveFS Volume")
|
|
scanner := bufio.NewScanner(cmdReader)
|
|
go func() {
|
|
for scanner.Scan() {
|
|
log.WithFields(log.Fields{"name": r.Name}).Info(scanner.Text())
|
|
}
|
|
cmd.Wait()
|
|
log.WithFields(log.Fields{"name": r.Name}).Info("Completed mount process")
|
|
|
|
// In case the mount stopped in unplanned fashion allow easy re-mount
|
|
rt.mounted = false
|
|
rt.use = make(map[string]bool)
|
|
}()
|
|
if err := cmd.Start(); err != nil {
|
|
return &volume.MountResponse{}, fmt.Errorf("unexpected error mounting '%s' error: %s", r.Name, err.Error())
|
|
}
|
|
|
|
// The drawback of running the mount in the foreground is there is no easy way to tell if it failed
|
|
// to initially connect. So we just wait a fixed amount of time and check for process exit or mount
|
|
// success.
|
|
for i := 0; ; i++ {
|
|
// Check for process exit
|
|
time.Sleep(100 * time.Millisecond)
|
|
if cmd.ProcessState != nil {
|
|
// The process has exited so consider an error occured
|
|
log.WithFields(log.Fields{"name": r.Name, "exitStatus": cmd.ProcessState.ExitCode()}).Error("Volume mount failed")
|
|
return &volume.MountResponse{}, fmt.Errorf("unexpected error mounting '%s' exist status: %v", r.Name, cmd.ProcessState.ExitCode())
|
|
}
|
|
|
|
// Check for mount
|
|
// BUG: mount.objectivefs has a bug when option "noocache" is used that prevents us from checking
|
|
// the mount point is properly initialized. If we check too early then something breaks in the
|
|
// mount.objectivefs process and the mount forever stays in startup state returning "Input/output error"
|
|
// for any filesystem interaction from us or other tools.
|
|
// UPDATE: The bug seems present also without "noocache" if S3 connection is fast enough
|
|
// HACK: Due to the bug we just wait a safe interval
|
|
if i == 0 /*&& slices.Contains(v.Opts, "noocache")*/ {
|
|
time.Sleep(1000 * time.Millisecond)
|
|
}
|
|
if isObjfs, err := isObjectiveFsMount(mountPath); err == nil && isObjfs {
|
|
break
|
|
}
|
|
}
|
|
|
|
log.WithFields(log.Fields{"name": r.Name}).Info("Volume mounted")
|
|
rt.mounted = true
|
|
|
|
if v.MountSnapshots {
|
|
go func() {
|
|
snapshotsPath := path.Join(v.Volume.Mountpoint, snapshotsDirectory)
|
|
log.WithFields(log.Fields{"name": r.Name, "path": snapshotsPath}).Info("Snapshot auto-mount is starting")
|
|
if _, err := os.Stat(snapshotsPath); os.IsNotExist(err) {
|
|
log.WithFields(log.Fields{"name": r.Name, "directory": snapshotsDirectory}).Info("Creating the snapshots mount directory")
|
|
if err := os.Mkdir(snapshotsPath, os.ModePerm); err != nil {
|
|
log.WithFields(log.Fields{"name": r.Name}).Error("Failed to create the snapshots mount directory. Snapshot mounting will be disabled.")
|
|
return
|
|
}
|
|
}
|
|
|
|
// Mount a tmpfs there so it becomes machine specific and we can promptly delete directories
|
|
// that are outside of filteredSnapshots
|
|
if err := exec.Command("/bin/mount", "-t", "tmpfs", "-o", "size=8m", "tmpfs", snapshotsPath).Run(); err != nil {
|
|
log.WithFields(log.Fields{"name": r.Name, "directory": snapshotsDirectory}).Warn("Failed to create tmpfs in the snapshots mount directory.")
|
|
}
|
|
|
|
for cmd.ProcessState == nil {
|
|
if snapshotRulesB, err := applyEnv(exec.Command("/sbin/mount.objectivefs", "snapshot", "-l", v.Fs), v.Env).Output(); err == nil {
|
|
log.WithFields(log.Fields{"name": r.Name, "snapshotRules": string(snapshotRulesB)}).Debug("Current snapshot rules")
|
|
if expectedSnapshots, err := generateSnapshotsFromRulesForNow(strings.TrimSpace(string(snapshotRulesB))); err == nil {
|
|
log.WithFields(log.Fields{"name": r.Name, "expectedSnapshots": expectedSnapshots}).Trace("Expected snapshots")
|
|
if existingSnapshotsB, err := applyEnv(exec.Command("/sbin/mount.objectivefs", "list", "-sz", v.Fs), v.Env).Output(); err == nil {
|
|
if existingSnapshots, err := parseExistingSnapshots(string(existingSnapshotsB), v.Fs); err == nil {
|
|
log.WithFields(log.Fields{"name": r.Name, "existingSnapshots": existingSnapshots}).Trace("Existing snapshots")
|
|
if existingMounts, err := getMountedSnaphots(snapshotsPath, true); err == nil {
|
|
log.WithFields(log.Fields{"name": r.Name, "existingMounts": existingMounts}).Trace("Existing snapshot mounts")
|
|
if existingMountDirs, err := getMountedSnaphots(snapshotsPath, false); err == nil {
|
|
log.WithFields(log.Fields{"name": r.Name, "existingMountDirs": existingMounts}).Trace("Existing snapshot mount dirs")
|
|
|
|
// PERF: We allow mounting just subset of snapshots for performance reasons (required RAM)
|
|
var filteredSnapshots map[string]bool
|
|
if len(strings.TrimSpace(v.SnapshotsFilter)) != 0 {
|
|
if a, err := generateSnapshotsFromRulesForNow(strings.TrimSpace(v.SnapshotsFilter)); err == nil {
|
|
filteredSnapshots = a
|
|
log.WithFields(log.Fields{"name": r.Name, "invalidFilter": a}).Trace("Snapshots filter.")
|
|
} else {
|
|
filteredSnapshots = expectedSnapshots
|
|
log.WithFields(log.Fields{"name": r.Name, "invalidFilter": v.SnapshotsFilter}).Warn("Failed to parse snapshots filter. Will mount all snapshots.")
|
|
}
|
|
} else {
|
|
filteredSnapshots = expectedSnapshots
|
|
}
|
|
|
|
// Unmount snapshots that are no longer in filteredSnapshots to conserve RAM
|
|
for i, path := range setMinus(existingMounts, setIntersect(expectedSnapshots, filteredSnapshots)) {
|
|
if err := exec.Command("umount", path).Run(); err != nil {
|
|
log.WithFields(log.Fields{"name": r.Name, "snapshot": i}).Warn("Failed to unmount an expired snapshot.")
|
|
} else {
|
|
log.WithFields(log.Fields{"name": r.Name, "snapshot": i}).Debug("Snapshot unmounted.")
|
|
}
|
|
}
|
|
|
|
// Remove left-over dirs of expired snapshots
|
|
// Note: As our root shall be on tmpfs deleting the directories does not affect
|
|
// other machines with different filteredSnapshots
|
|
for i, path := range setMinus(existingMountDirs, setIntersect(expectedSnapshots, filteredSnapshots)) {
|
|
if err := os.Remove(path); err != nil {
|
|
log.WithFields(log.Fields{"name": r.Name, "snapshot": i}).Warn("Failed to remove directory of an expired snapshot.")
|
|
}
|
|
}
|
|
|
|
// Destroy old snapshots
|
|
for i, name := range setMinus(existingSnapshots, expectedSnapshots) {
|
|
expectedOutput := "Snapshot '" + name + "' destroyed."
|
|
if output, err := applyEnv(exec.Command("/sbin/mount.objectivefs", "destroy", name, "-f"), v.Env).Output(); err != nil || strings.TrimSpace(string(output)) != expectedOutput {
|
|
log.WithFields(log.Fields{"name": r.Name, "snapshot": i}).Warn("Failed to destroy an expired snapshot.")
|
|
} else {
|
|
log.WithFields(log.Fields{"name": r.Name, "snapshot": i}).Debug("Snapshot destroyed.")
|
|
}
|
|
}
|
|
|
|
// Add new mounts
|
|
for i, name := range setMinus(setIntersect(existingSnapshots, setIntersect(expectedSnapshots, filteredSnapshots)), existingMounts) {
|
|
dest := filepath.Join(snapshotsPath, i)
|
|
if err := os.Mkdir(dest, os.ModePerm); err == nil || os.IsExist(err) {
|
|
// Note: There is a missing "mount" argument so the mount continues running in a background process
|
|
var ssCmd *exec.Cmd
|
|
if len(v.Opts) == 0 {
|
|
ssCmd = exec.Command("/sbin/mount.objectivefs", name, dest)
|
|
} else {
|
|
ssCmd = exec.Command("/sbin/mount.objectivefs", "-o"+strings.Join(v.Opts, ","), name, dest)
|
|
}
|
|
if err := applyEnv(ssCmd, makeSnapshotEnv(v.Env)).Run(); err != nil {
|
|
log.WithFields(log.Fields{"name": r.Name, "snapshot": i}).Warn("Failed to mount a new snapshot.")
|
|
} else {
|
|
log.WithFields(log.Fields{"name": r.Name, "snapshot": i}).Debug("Snapshot mounted.")
|
|
}
|
|
} else {
|
|
log.WithFields(log.Fields{"name": r.Name, "snapshot": i}).Warn("Failed to create directory for a snapshot.")
|
|
}
|
|
}
|
|
} else {
|
|
log.WithFields(log.Fields{"name": r.Name, "Error": err}).Warn("Cannot determine existing snapshot mount dirs")
|
|
}
|
|
} else {
|
|
log.WithFields(log.Fields{"name": r.Name, "Error": err}).Warn("Cannot determine existing snapshot mounts")
|
|
}
|
|
} else {
|
|
log.WithFields(log.Fields{"name": r.Name, "Error": err}).Warn("Cannot parse existing snapshot names")
|
|
}
|
|
} else {
|
|
log.WithFields(log.Fields{"name": r.Name, "Error": err}).Warn("Cannot list existing snapshot names")
|
|
}
|
|
} else {
|
|
log.WithFields(log.Fields{"name": r.Name, "Error": err}).Warn("Cannot determine expected snapshot names")
|
|
}
|
|
} else {
|
|
log.WithFields(log.Fields{"name": r.Name, "Error": err}).Warn("Cannot detect snapshot frequency")
|
|
}
|
|
|
|
// Cycle periodically
|
|
time.Sleep(5 * time.Minute)
|
|
}
|
|
log.WithFields(log.Fields{"name": r.Name}).Info("Completed snapshot auto-mounting")
|
|
}()
|
|
}
|
|
}
|
|
rt.use[r.ID] = true
|
|
|
|
d.storeVolumeInfo(tx, r.Name, v)
|
|
|
|
return &volume.MountResponse{Mountpoint: v.Volume.Mountpoint}, tx.Commit()
|
|
}
|
|
|
|
func (d *ofsDriver) Unmount(r *volume.UnmountRequest) error {
|
|
d.Lock()
|
|
defer d.Unlock()
|
|
|
|
log.WithFields(log.Fields{"name": r.Name, "id": r.ID}).Info("Detach ObjectiveFS Volume")
|
|
|
|
tx, err := d.volumedb.Begin(true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
v, rt, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
|
|
if !volumeExists {
|
|
return fmt.Errorf("volume %s does not exist", r.Name)
|
|
}
|
|
if getVolErr != nil {
|
|
return getVolErr
|
|
}
|
|
|
|
delete(rt.use, r.ID)
|
|
if len(rt.use) == 0 && v.Asap {
|
|
if err := umount(v, rt, false); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
d.storeVolumeInfo(tx, r.Name, v)
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
func (d *ofsDriver) Capabilities() *volume.CapabilitiesResponse {
|
|
return &volume.CapabilitiesResponse{Capabilities: volume.Capability{Scope: "local"}}
|
|
}
|
|
|
|
// logLevel is the "-log" flag; when left empty main falls back to the
// LOG_LEVEL environment variable and then to "info".
var logLevel = flag.String("log", "", "log level")

// logFilePath is the "-logfile" flag; when set the log is appended to that
// file.
var logFilePath = flag.String("logfile", "", "log file")
|
|
|
|
func main() {
|
|
flag.Parse()
|
|
|
|
if *logLevel == "" {
|
|
if *logLevel = os.Getenv("LOG_LEVEL"); *logLevel == "" {
|
|
*logLevel = "info"
|
|
}
|
|
}
|
|
|
|
level, err := log.ParseLevel(*logLevel)
|
|
if err != nil {
|
|
log.WithError(err).Fatal("Failed to parse log level")
|
|
}
|
|
log.SetLevel(level)
|
|
|
|
if *logFilePath != "" {
|
|
f, err := os.OpenFile(*logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
|
|
if err != nil {
|
|
log.WithError(err).Fatal("Failed to open log file for writing")
|
|
}
|
|
defer f.Close()
|
|
|
|
log.StandardLogger().Out = f
|
|
}
|
|
log.Info("Starting ObjectiveFS Volume Driver, version " + version + " ObjectiveFS " + objfsVersion)
|
|
|
|
defEnv := make(map[string]string)
|
|
for _, i := range []string{"ACCESS_KEY", "SECRET_KEY", "OBJECTIVEFS_LICENSE", "OBJECTSTORE", "ENDPOINT", "CACHESIZE", "DISKCACHE_SIZE", "DISKCACHE_PATH", "OBJECTIVEFS_PASSPHRASE", "IDMAP"} {
|
|
if a := os.Getenv(i); a != "" {
|
|
defEnv[i] = a
|
|
}
|
|
}
|
|
|
|
defMountOptS := os.Getenv("OBJECTIVEFS_MOUNT_OPTIONS")
|
|
defMountOpt := make(map[string]string)
|
|
for _, i := range strings.Split(defMountOptS, ",") {
|
|
addMountOption(defMountOpt, i)
|
|
}
|
|
|
|
defMountSnapshots := os.Getenv("OBJECTIVEFS_MOUNT_SNAPSHOTS") == "yes"
|
|
|
|
db, err := bolt.Open("objectivefs.db", 0600, nil)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
db.Update(func(tx *bolt.Tx) error {
|
|
_, err := tx.CreateBucketIfNotExists([]byte(volumeBucket))
|
|
if err != nil {
|
|
log.Fatalf("create bucket: %s", err)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
d := &ofsDriver{volumedb: db, volumeRt: make(map[string]*ofsVolumeRt), defEnv: defEnv, defMountOpt: defMountOpt, defMountSnapshots: defMountSnapshots}
|
|
h := volume.NewHandler(d)
|
|
u, _ := user.Lookup("root")
|
|
gid, _ := strconv.Atoi(u.Gid)
|
|
if err := h.ServeUnix("/run/docker/plugins/objectivefs", gid); err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
}
|
|
|
|
// ----------------
|
|
|
|
func (p *ofsVolume) gobEncode() ([]byte, error) {
|
|
buf := new(bytes.Buffer)
|
|
enc := gob.NewEncoder(buf)
|
|
err := enc.Encode(p)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return buf.Bytes(), nil
|
|
}
|
|
|
|
func gobDecode(data []byte) (*ofsVolume, error) {
|
|
var p *ofsVolume
|
|
buf := bytes.NewBuffer(data)
|
|
dec := gob.NewDecoder(buf)
|
|
err := dec.Decode(&p)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return p, nil
|
|
}
|
|
|
|
func (p *ofsDriver) storeVolumeInfo(tx *bolt.Tx, volumeName string, info *ofsVolume) error {
|
|
bucket := tx.Bucket([]byte(volumeBucket))
|
|
b, err := info.gobEncode()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return bucket.Put([]byte(volumeName), b)
|
|
}
|
|
|
|
func (p *ofsDriver) getVolumeInfo(tx *bolt.Tx, volumeName string) (*ofsVolume, *ofsVolumeRt, bool, error) {
|
|
bucket := tx.Bucket([]byte(volumeBucket))
|
|
v := bucket.Get([]byte(volumeName))
|
|
if v == nil {
|
|
return nil, nil, false, nil
|
|
}
|
|
info, err := gobDecode(v)
|
|
|
|
rt, ok := p.volumeRt[volumeName]
|
|
if !ok {
|
|
rt = &ofsVolumeRt{use: make(map[string]bool), mounted: false}
|
|
p.volumeRt[volumeName] = rt
|
|
}
|
|
|
|
return info, rt, true, err
|
|
}
|
|
|
|
func (p *ofsDriver) getVolumeMap(tx *bolt.Tx) (map[string]ofsVolume, error) {
|
|
bucket := tx.Bucket([]byte(volumeBucket))
|
|
ret := make(map[string]ofsVolume)
|
|
err := bucket.ForEach(func(k, v []byte) error {
|
|
info, err := gobDecode(v)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ret[string(k)] = *info
|
|
return nil
|
|
})
|
|
return ret, err
|
|
}
|
|
|
|
func (p *ofsDriver) removeVolumeInfo(tx *bolt.Tx, volumeName string) error {
|
|
bucket := tx.Bucket([]byte(volumeBucket))
|
|
return bucket.Delete([]byte(volumeName))
|
|
}
|
|
|
|
// applyEnv replaces the environment of cmd with env and returns the same
// command, so the call can be chained with Run, Output and friends.
func applyEnv(cmd *exec.Cmd, env []string) *exec.Cmd {
	configured := cmd
	configured.Env = env
	return configured
}
|
|
|
|
// makeSnapshotEnv returns a copy of env in which CACHESIZE is forced to
// 64MiB: every existing CACHESIZE entry is overwritten in place and, when
// there is none, one is appended. The input slice is left untouched.
//
// Memory must be conserved because there can be hundreds of snapshots
// mounted at once; the value is the minimum based on the documentation.
func makeSnapshotEnv(env []string) []string {
	const cacheSizeValue = "CACHESIZE=64MiB"

	snapshotEnv := make([]string, 0, len(env)+1)
	snapshotEnv = append(snapshotEnv, env...)

	overridden := false
	for idx := range snapshotEnv {
		if !strings.HasPrefix(snapshotEnv[idx], "CACHESIZE=") {
			continue
		}
		snapshotEnv[idx] = cacheSizeValue
		overridden = true
	}
	if overridden {
		return snapshotEnv
	}
	return append(snapshotEnv, cacheSizeValue)
}
|
|
|
|
// generateSnapshotsFromRulesForNow runs "mount.objectivefs snapshot -vs" on
// the given snapshot rules and collects the timestamps it prints into a set
// (all values are false). The two informational lines starting with
// "Number of automatic snapshots:" and "Automatic schedule:" are skipped;
// any other line that is not a timestamp of the form 2006-01-02T15:04:05Z
// makes the call fail, as does a failure of the command itself.
func generateSnapshotsFromRulesForNow(rules string) (map[string]bool, error) {
	output, err := exec.Command("/sbin/mount.objectivefs", "snapshot", "-vs", rules).Output()
	if err != nil {
		return nil, err
	}

	snapshots := make(map[string]bool)
	lines := bufio.NewScanner(strings.NewReader(string(output)))
	for lines.Scan() {
		entry := strings.TrimSpace(lines.Text())
		isInfoLine := strings.HasPrefix(entry, "Number of automatic snapshots:") ||
			strings.HasPrefix(entry, "Automatic schedule:")
		if isInfoLine {
			continue
		}
		if _, parseErr := time.Parse("2006-01-02T15:04:05Z", entry); parseErr != nil {
			return nil, parseErr
		}
		snapshots[entry] = false
	}
	return snapshots, nil
}
|
|
|
|
// parseExistingSnapshots parses the output of "mount.objectivefs list -sz"
// and returns the snapshots of one filesystem as a map from the snapshot
// time (the part after "@") to the full snapshot URI.
//
// data is the raw command output: an optional "NAME ..." header line
// followed by one entry per line whose first column is "<scheme>://<fs>" for
// the live filesystem or "<scheme>://<fs>@<time>" for a snapshot.
// expectedPrefix is the filesystem name without the URI scheme. Blank lines
// and the live filesystem entry are skipped. An error is returned for an
// entry without a URI scheme or one that belongs to another filesystem.
func parseExistingSnapshots(data string, expectedPrefix string) (map[string]string, error) {
	result := make(map[string]string)
	scanner := bufio.NewScanner(strings.NewReader(data))
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if strings.HasPrefix(line, "NAME") {
			// Skip column headers
			continue
		}

		fields := strings.Fields(line)
		if len(fields) == 0 {
			// Blank line; indexing fields[0] would panic.
			continue
		}
		name := fields[0]

		_, hostAndPath, hasScheme := strings.Cut(name, "://")
		if !hasScheme {
			return nil, fmt.Errorf("expected URI scheme in the list of existing snapshosts")
		}

		if hostAndPath == expectedPrefix {
			// This is the live filesystem not a snapshot
			continue
		}

		snapshotTime, isSnapshot := strings.CutPrefix(hostAndPath, expectedPrefix+"@")
		if !isSnapshot {
			return nil, fmt.Errorf("unexpected URI in the list of existing snapshosts")
		}
		result[snapshotTime] = name
	}
	if err := scanner.Err(); err != nil {
		return nil, err
	}
	return result, nil
}
|
|
|
|
func getMountedSnaphots(baseDir string, checkMount bool) (map[string]string, error) {
|
|
if entries, err := os.ReadDir(baseDir); err != nil {
|
|
return nil, err
|
|
} else {
|
|
var result = make(map[string]string)
|
|
for _, i := range entries {
|
|
if i.IsDir() {
|
|
iPath := filepath.Join(baseDir, i.Name())
|
|
if isObjFs, err := isObjectiveFsMount(iPath); !checkMount || (err == nil && isObjFs) {
|
|
result[i.Name()] = iPath
|
|
}
|
|
}
|
|
}
|
|
return result, nil
|
|
}
|
|
}
|
|
|
|
// isObjectiveFsMount reports whether path is the mount point of a
// fuse.objectivefs filesystem according to
// "df --output=target -t fuse.objectivefs path". A failing df is returned
// as the error with a false result.
func isObjectiveFsMount(path string) (bool, error) {
	output, err := exec.Command("df", "--output=target", "-t", "fuse.objectivefs", path).CombinedOutput()
	if err != nil {
		return false, err
	}

	// On success the first line contains column headers (in our case
	// "Mounted on") and the subsequent lines contain the ObjectiveFs mount
	// points.
	lines := bufio.NewScanner(strings.NewReader(string(output)))
	if !lines.Scan() {
		return false, nil
	}
	for lines.Scan() {
		if strings.TrimSpace(lines.Text()) == path {
			return true, nil
		}
	}
	return false, nil
}
|
|
|
|
// setMinus returns a new map holding the entries of a whose keys are not
// present in b. The values of b are ignored; neither input is modified.
func setMinus[TKey comparable, TValue any, TValue2 any](a map[TKey]TValue, b map[TKey]TValue2) map[TKey]TValue {
	difference := map[TKey]TValue{}
	for key, value := range a {
		if _, excluded := b[key]; excluded {
			continue
		}
		difference[key] = value
	}
	return difference
}
|
|
|
|
// setIntersect returns a new map holding the entries of a whose keys are
// also present in b. The values are taken from a; neither input is modified.
func setIntersect[TKey comparable, TValue any, TValue2 any](a map[TKey]TValue, b map[TKey]TValue2) map[TKey]TValue {
	common := map[TKey]TValue{}
	for key, value := range a {
		if _, shared := b[key]; !shared {
			continue
		}
		common[key] = value
	}
	return common
}
|
|
|
|
// mountOptionRule describes how a known mount option interacts with the
// options that are already set.
type mountOptionRule struct {
	// conflicts lists the options that are removed when this one is set.
	conflicts []string
	// hasValue tells whether the "=value" part is kept; for plain flags any
	// supplied value is discarded.
	hasValue bool
}

// mountOptionRules lists the mutually exclusive mount options known to
// addMountOption. Options that are not listed are stored verbatim.
//
// NOTE(review): "cputhreads" and "iothreads" remove each other, exactly as
// the original switch did; confirm that this is intended.
var mountOptionRules = map[string]mountOptionRule{
	"bulkdata":    {conflicts: []string{"nobulkdata"}},
	"nobulkdata":  {conflicts: []string{"bulkdata"}},
	"clean":       {conflicts: []string{"noclean"}, hasValue: true},
	"noclean":     {conflicts: []string{"clean"}},
	"compact":     {conflicts: []string{"nocompact"}, hasValue: true},
	"nocompact":   {conflicts: []string{"compact"}},
	"freebw":      {conflicts: []string{"nofreebw", "autofreebw"}},
	"nofreebw":    {conflicts: []string{"freebw", "autofreebw"}},
	"autofreebw":  {conflicts: []string{"freebw", "nofreebw"}},
	"hpc":         {conflicts: []string{"nohpc"}},
	"nohpc":       {conflicts: []string{"hpc"}},
	"mboost":      {conflicts: []string{"nomboost"}, hasValue: true},
	"nomboost":    {conflicts: []string{"mboost"}},
	"mt":          {conflicts: []string{"mtplus", "nomt", "cputhreads", "iothreads"}},
	"mtplus":      {conflicts: []string{"mt", "nomt", "cputhreads", "iothreads"}},
	"nomt":        {conflicts: []string{"mt", "mtplus", "cputhreads", "iothreads"}},
	"cputhreads":  {conflicts: []string{"mt", "mtplus", "nomt", "iothreads"}, hasValue: true},
	"iothreads":   {conflicts: []string{"mt", "mtplus", "nomt", "cputhreads"}, hasValue: true},
	"ocache":      {conflicts: []string{"noocache"}},
	"noocache":    {conflicts: []string{"ocache"}},
	"oob":         {conflicts: []string{"nooob"}},
	"nooob":       {conflicts: []string{"oob"}},
	"oom":         {conflicts: []string{"nooom"}},
	"nooom":       {conflicts: []string{"oom"}},
	"ratelimit":   {conflicts: []string{"noratelimit"}},
	"noratelimit": {conflicts: []string{"ratelimit"}},
	"snapshots":   {conflicts: []string{"nosnapshots"}},
	"nosnapshots": {conflicts: []string{"snapshots"}},
}

// addMountOption merges one mount option, given as "key" or "key=value",
// into mountOpts.
//
// Setting a known option first removes the options it is mutually exclusive
// with, so the most recently added option wins. Known plain flags are stored
// with an empty value; unknown options are stored with whatever value was
// given. Surrounding whitespace of the key is dropped, and an element with
// an empty key (for example the result of splitting an empty string or a
// list with a trailing comma) is ignored rather than stored as an option.
func addMountOption(mountOpts map[string]string, keyVal string) {
	key, val, _ := strings.Cut(keyVal, "=")
	key = strings.TrimSpace(key)
	if key == "" {
		return
	}

	rule, known := mountOptionRules[key]
	if !known {
		mountOpts[key] = val
		return
	}

	for _, conflicting := range rule.conflicts {
		delete(mountOpts, conflicting)
	}
	if !rule.hasValue {
		val = ""
	}
	mountOpts[key] = val
}
|