Files
objectivefs-docker-volume/main.go
Roman Vaníček 2b5e0a0c6c
All checks were successful
continuous-integration/drone/push Build is passing
Still hangs on mount
2023-05-26 14:39:25 +02:00

282 lines
7.7 KiB
Go

// Copyright (c) 2020, Objective Security Corporation
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
package main
import (
"bufio"
"flag"
"fmt"
"os"
"os/exec"
"os/user"
"path/filepath"
"strconv"
"time"
"github.com/docker/go-plugins-helpers/volume"
log "github.com/sirupsen/logrus"
)
// ofsVolume tracks one ObjectiveFS filesystem managed by this driver,
// including its mount state and which container requests currently use it.
type ofsVolume struct {
// Docker-facing metadata (name, mountpoint, creation timestamp).
volume *volume.Volume
// ObjectiveFS filesystem name passed to mount.objectivefs
// (defaults to the volume name; overridable via the "fs" option).
fs string
// Comma-separated mount options forwarded as "-o<opts>".
opts string
// Environment ("KEY=VALUE") for the mount helper: driver defaults
// merged with per-volume overrides from Create.
env []string
// Set of request IDs currently mounted; acts as a reference count.
use map[string]bool
// True while the filesystem is attached at volume.Mountpoint.
mounted bool
// If set (via the "asap" option), unmount as soon as use becomes empty.
asap bool
}
// ofsDriver implements the docker volume.Driver interface for ObjectiveFS.
// NOTE(review): the mutex is commented out throughout; driver state is not
// lock-protected, so concurrent plugin requests could race on these maps —
// confirm whether the plugin helper serializes requests.
type ofsDriver struct {
//sync.RWMutex
// All volumes known to the driver, keyed by volume name.
volumes map[string]*ofsVolume
// Default environment for mount.objectivefs, gathered from the
// plugin's own environment at startup (see main).
defEnv map[string]string
}
// version is the driver version string reported at startup.
var version = "1.0"
// Create registers a new ObjectiveFS volume with the driver.
//
// Recognized options: "fs" (filesystem name, defaults to the volume name),
// "options" (comma-separated mount options; repeated values are joined),
// "asap" (unmount as soon as the last container detaches). Any other option
// is passed to mount.objectivefs as an environment variable, overriding the
// driver-level defaults. Returns an error if the name is already taken.
func (d ofsDriver) Create(r *volume.CreateRequest) error {
	log.WithFields(log.Fields{"name": r.Name}).Info("Create ObjectiveFS Volume")
	//d.Lock()
	//defer d.Unlock()
	if _, ok := d.volumes[r.Name]; ok {
		return fmt.Errorf("volume '%s' already exists", r.Name)
	}
	v := &ofsVolume{
		volume: &volume.Volume{
			Name:       r.Name,
			Mountpoint: filepath.Join(volume.DefaultDockerRootDirectory, "objectivefs", r.Name),
			CreatedAt:  time.Now().Format(time.RFC3339Nano),
		},
		fs:  r.Name,
		use: make(map[string]bool),
	}
	// Start from the driver defaults; per-volume options override them.
	env := make(map[string]string, len(d.defEnv))
	for id, val := range d.defEnv {
		env[id] = val
	}
	for key, val := range r.Options {
		switch key {
		case "fs":
			v.fs = val
		// "ptions" kept for backward compatibility — presumably tolerating a
		// common typo of "options"; confirm before removing.
		case "options", "ptions":
			// Join without the leading comma the old concatenation produced
			// (it used to emit "-o,opt1,..." with an empty first option).
			if v.opts == "" {
				v.opts = val
			} else {
				v.opts = v.opts + "," + val
			}
		case "asap":
			v.asap = true
		default:
			env[key] = val
		}
	}
	v.env = make([]string, 0, len(env))
	for id, val := range env {
		v.env = append(v.env, id+"="+val)
	}
	d.volumes[r.Name] = v
	return nil
}
// List reports every volume currently registered with the driver.
func (d ofsDriver) List() (*volume.ListResponse, error) {
	// Intentionally a nil slice when empty (matches original JSON behavior).
	var out []*volume.Volume
	for _, entry := range d.volumes {
		out = append(out, entry.volume)
	}
	return &volume.ListResponse{Volumes: out}, nil
}
// Get returns metadata for the named volume, or an error if it is unknown.
func (d ofsDriver) Get(r *volume.GetRequest) (*volume.GetResponse, error) {
	if entry, found := d.volumes[r.Name]; found {
		return &volume.GetResponse{Volume: entry.volume}, nil
	}
	return &volume.GetResponse{}, fmt.Errorf("volume '%s' not found", r.Name)
}
// umount detaches the filesystem backing v and removes its mountpoint
// directory. It is a no-op when the volume is not currently mounted.
func umount(v *ofsVolume) error {
	log.WithFields(log.Fields{"name": v.volume.Name}).Info("Unmount ObjectiveFS Volume")
	if !v.mounted {
		return nil
	}
	// Wrap errors with context so callers can tell which step failed.
	if err := exec.Command("umount", v.volume.Mountpoint).Run(); err != nil {
		return fmt.Errorf("unmounting '%s': %w", v.volume.Mountpoint, err)
	}
	if err := os.Remove(v.volume.Mountpoint); err != nil {
		return fmt.Errorf("removing mountpoint '%s': %w", v.volume.Mountpoint, err)
	}
	v.mounted = false
	return nil
}
// Remove unmounts and forgets the named volume. It refuses to remove a
// volume while any container request still holds it.
func (d ofsDriver) Remove(r *volume.RemoveRequest) error {
	entry, found := d.volumes[r.Name]
	if !found {
		return fmt.Errorf("volume '%s' not found", r.Name)
	}
	if inUse := len(entry.use); inUse != 0 {
		return fmt.Errorf("volume '%s' currently in use (%d unique)", r.Name, inUse)
	}
	if err := umount(entry); err != nil {
		return err
	}
	delete(d.volumes, r.Name)
	return nil
}
// Path reports the host mountpoint of the named volume.
func (d ofsDriver) Path(r *volume.PathRequest) (*volume.PathResponse, error) {
	if entry, found := d.volumes[r.Name]; found {
		return &volume.PathResponse{Mountpoint: entry.volume.Mountpoint}, nil
	}
	return &volume.PathResponse{}, fmt.Errorf("volume '%s' not found", r.Name)
}
// Mount attaches the ObjectiveFS filesystem for the named volume (on first
// use) and records r.ID as a user of it. mount.objectivefs is run in the
// foreground and watched: if it exits within a one-second grace period the
// mount is reported as failed; otherwise it is assumed to have succeeded.
func (d ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error) {
	//d.Lock()
	//defer d.Unlock()
	v, ok := d.volumes[r.Name]
	if !ok {
		return &volume.MountResponse{}, fmt.Errorf("volume '%s' not found", r.Name)
	}
	log.WithFields(log.Fields{"name": r.Name, "id": r.ID}).Info("Attach ObjectiveFS Volume")
	if !v.mounted {
		if err := os.MkdirAll(v.volume.Mountpoint, 0755); err != nil {
			return &volume.MountResponse{}, err
		}
		// Note: The first argument ("mount") causes running in the foreground, its absence in the background
		args := []string{"mount"}
		if len(v.opts) != 0 {
			args = append(args, "-o"+v.opts)
		}
		args = append(args, v.fs, v.volume.Mountpoint)
		cmd := exec.Command("/sbin/mount.objectivefs", args...)
		cmd.Env = v.env
		cmdReader, err := cmd.StderrPipe()
		if err != nil {
			return &volume.MountResponse{}, fmt.Errorf("attaching stderr for '%s': %w", r.Name, err)
		}
		log.WithFields(log.Fields{
			"name": r.Name,
			"cmd":  cmd,
			//"env": v.env, // for security reasons disabled
		}).Info("Mount ObjectiveFS Volume")
		if err := cmd.Start(); err != nil {
			return &volume.MountResponse{}, fmt.Errorf("unexpected error mounting '%s' error: %s", r.Name, err.Error())
		}
		log.WithFields(log.Fields{"name": r.Name}).Info("Process started")
		// Forward the helper's stderr to our log, then reap the process.
		// The exit result is delivered on done so only this goroutine calls
		// Wait and nobody reads cmd.ProcessState concurrently (the old code
		// dereferenced ProcessState while it could still be nil, panicking
		// whenever the mount survived past the grace period).
		done := make(chan error, 1)
		go func() {
			scanner := bufio.NewScanner(cmdReader)
			for scanner.Scan() {
				log.WithFields(log.Fields{"name": r.Name}).Info(scanner.Text())
			}
			log.WithFields(log.Fields{"name": r.Name}).Info("Completing mount process")
			done <- cmd.Wait()
			log.WithFields(log.Fields{"name": r.Name}).Info("Completed mount process")
		}()
		// The drawback of running the mount in the foreground is there is no way to tell if it failed
		// to initially connect. So we just wait a fixed amount of time and check for process exit.
		select {
		case err := <-done:
			log.WithFields(log.Fields{"name": r.Name, "err": err}).Error("Volume mount failed")
			return &volume.MountResponse{}, fmt.Errorf("unexpected error mounting '%s': %v", r.Name, err)
		case <-time.After(1 * time.Second):
			// Still running after the grace period: assume the mount is up.
		}
		log.WithFields(log.Fields{"name": r.Name}).Info("Volume mounted")
		v.mounted = true
	}
	v.use[r.ID] = true
	return &volume.MountResponse{Mountpoint: v.volume.Mountpoint}, nil
}
// Unmount drops r.ID from the volume's user set. When the "asap" option is
// set and no users remain, the filesystem is unmounted immediately.
func (d ofsDriver) Unmount(r *volume.UnmountRequest) error {
	entry, found := d.volumes[r.Name]
	if !found {
		return fmt.Errorf("volume '%s' not found", r.Name)
	}
	log.WithFields(log.Fields{"name": r.Name, "id": r.ID}).Info("Detach ObjectiveFS Volume")
	delete(entry.use, r.ID)
	if entry.asap && len(entry.use) == 0 {
		return umount(entry)
	}
	return nil
}
// Capabilities advertises this driver as local-scoped.
func (d ofsDriver) Capabilities() *volume.CapabilitiesResponse {
	caps := volume.Capability{Scope: "local"}
	return &volume.CapabilitiesResponse{Capabilities: caps}
}
// Command-line flags. When -log is unset, main falls back to $LOG_LEVEL,
// then to "info"; when -logfile is unset, logging goes to the default output.
var (
logLevel = flag.String("log", "", "log level")
logFilePath = flag.String("logfile", "", "log file")
)
// main configures logging, collects ObjectiveFS defaults from the
// environment, and serves the volume plugin API on a unix socket.
func main() {
	flag.Parse()
	// Log level: -log flag takes precedence, then $LOG_LEVEL, then "info".
	if *logLevel == "" {
		if *logLevel = os.Getenv("LOG_LEVEL"); *logLevel == "" {
			*logLevel = "info"
		}
	}
	level, err := log.ParseLevel(*logLevel)
	if err != nil {
		log.WithError(err).Fatal("Failed to parse log level")
	}
	log.SetLevel(level)
	if *logFilePath != "" {
		f, err := os.OpenFile(*logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
		if err != nil {
			log.WithError(err).Fatal("Failed to open log file for writing")
		}
		defer f.Close()
		log.StandardLogger().Out = f
	}
	log.Info("Starting ObjectiveFS Volume Driver, version " + version)
	// Gather ObjectiveFS credentials/configuration from the plugin's own
	// environment; these become the per-volume defaults (see Create).
	defEnv := make(map[string]string)
	for _, i := range []string{"ACCESS_KEY", "SECRET_KEY", "OBJECTIVEFS_LICENSE", "OBJECTSTORE", "ENDPOINT", "CACHESIZE", "DISKCACHE_SIZE", "DISKCACHE_PATH", "OBJECTIVEFS_PASSPHRASE"} {
		if a := os.Getenv(i); a != "" {
			defEnv[i] = a
		}
	}
	d := ofsDriver{volumes: make(map[string]*ofsVolume), defEnv: defEnv}
	h := volume.NewHandler(d)
	// The plugin socket is created group-owned by root so dockerd can reach
	// it. The old code ignored both errors here, risking a nil dereference
	// (u.Gid) or serving with a bogus gid.
	u, err := user.Lookup("root")
	if err != nil {
		log.WithError(err).Fatal("Failed to look up root user")
	}
	gid, err := strconv.Atoi(u.Gid)
	if err != nil {
		log.WithError(err).Fatal("Failed to parse root gid")
	}
	if err := h.ServeUnix("/run/docker/plugins/objectivefs", gid); err != nil {
		log.Fatal(err)
	}
}