// Copyright (c) 2020, Objective Security Corporation // Permission to use, copy, modify, and/or distribute this software for any // purpose with or without fee is hereby granted, provided that the above // copyright notice and this permission notice appear in all copies. // THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES // WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF // MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR // ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES // WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN // ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF // OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. package main import ( "bufio" "bytes" "encoding/gob" "flag" "fmt" "os" "os/exec" "os/user" "path/filepath" "strconv" "sync" "time" "github.com/boltdb/bolt" "github.com/docker/go-plugins-helpers/volume" log "github.com/sirupsen/logrus" ) type ofsVolume struct { Volume *volume.Volume Fs string Opts string Env []string Use map[string]bool Mounted bool Asap bool } type ofsDriver struct { sync.RWMutex volumedb *bolt.DB defEnv map[string]string } var version = "1.0" const ( volumeBucket = "volumes" ) func (d *ofsDriver) Create(r *volume.CreateRequest) error { log.WithFields(log.Fields{"name": r.Name}).Info("Create ObjectiveFS Volume") d.Lock() defer d.Unlock() tx, err := d.volumedb.Begin(true) if err != nil { return err } defer tx.Rollback() _, volumeExists, err := d.getVolumeInfo(tx, r.Name) if volumeExists { return fmt.Errorf("volume '%s' already exists", r.Name) } v := &ofsVolume{} v.Volume = &volume.Volume{Name: r.Name, Mountpoint: filepath.Join(volume.DefaultDockerRootDirectory, "objectivefs", r.Name), CreatedAt: time.Now().Format(time.RFC3339Nano)} v.Use = make(map[string]bool) v.Opts = "" v.Fs = r.Name env := make(map[string]string) for id, val := range d.defEnv { env[id] = val } for key, val := range r.Options { switch key { case "fs": v.Fs = val case "options", "ptions": if len(v.Opts) == 0 { v.Opts = val } else { v.Opts = v.Opts + "," + val } case "asap": v.Asap = true default: env[key] = val } } for id, val := range env { v.Env = append(v.Env, id+"="+val) } if err := d.storeVolumeInfo(tx, r.Name, v); err != nil { return err } // Commit the transaction and check for error. if err := tx.Commit(); err != nil { return err } return nil } func (d *ofsDriver) List() (*volume.ListResponse, error) { d.Lock() defer d.Unlock() tx, err := d.volumedb.Begin(false) if err != nil { return nil, err } defer tx.Rollback() var vols []*volume.Volume volumeMap, err := d.getVolumeMap(tx) for _, v := range volumeMap { vols = append(vols, v.Volume) } return &volume.ListResponse{Volumes: vols}, nil } func (d *ofsDriver) Get(r *volume.GetRequest) (*volume.GetResponse, error) { d.Lock() defer d.Unlock() tx, err := d.volumedb.Begin(false) if err != nil { return nil, err } defer tx.Rollback() volumeInfo, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name) if !volumeExists { return &volume.GetResponse{}, fmt.Errorf("volume %s does not exist", r.Name) } if getVolErr != nil { return &volume.GetResponse{}, getVolErr } return &volume.GetResponse{Volume: volumeInfo.Volume}, nil } func umount(v *ofsVolume) error { log.WithFields(log.Fields{"name": v.Volume.Name}).Info("Unmount ObjectiveFS Volume") if !v.Mounted { return nil } if err := exec.Command("umount", v.Volume.Mountpoint).Run(); err != nil { return err } if err := os.Remove(v.Volume.Mountpoint); err != nil { return err } v.Mounted = false return nil } func (d *ofsDriver) Remove(r *volume.RemoveRequest) error { d.Lock() defer d.Unlock() tx, err := d.volumedb.Begin(true) if err != nil { return err } defer tx.Rollback() v, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name) if !volumeExists { return fmt.Errorf("volume %s does not exist", r.Name) } if getVolErr != nil { return getVolErr } if len(v.Use) != 0 { return fmt.Errorf("volume '%s' currently in use (%d unique)", r.Name, len(v.Use)) } if err := umount(v); err != nil { return err } if err := d.removeVolumeInfo(tx, r.Name); err != nil { return err } return tx.Commit() } func (d *ofsDriver) Path(r *volume.PathRequest) (*volume.PathResponse, error) { d.Lock() defer d.Unlock() tx, err := d.volumedb.Begin(false) if err != nil { return nil, err } defer tx.Rollback() volumeInfo, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name) if !volumeExists { return &volume.PathResponse{}, fmt.Errorf("volume %s does not exist", r.Name) } if getVolErr != nil { return &volume.PathResponse{}, getVolErr } return &volume.PathResponse{Mountpoint: volumeInfo.Volume.Mountpoint}, nil } func (d *ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error) { //d.Lock() //defer d.Unlock() tx, err := d.volumedb.Begin(true) if err != nil { return nil, err } defer tx.Rollback() v, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name) if !volumeExists { return &volume.MountResponse{}, fmt.Errorf("volume %s does not exist", r.Name) } if getVolErr != nil { return &volume.MountResponse{}, getVolErr } log.WithFields(log.Fields{"name": r.Name, "id": r.ID}).Info("Attach ObjectiveFS Volume") if !v.Mounted { if err := os.MkdirAll(v.Volume.Mountpoint, 0755); err != nil { return &volume.MountResponse{}, err } // Note: The first argument ("mount") causes running in the foreground, its absence in the background var cmd *exec.Cmd if len(v.Opts) == 0 { cmd = exec.Command("/sbin/mount.objectivefs", "mount", v.Fs, v.Volume.Mountpoint) } else { cmd = exec.Command("/sbin/mount.objectivefs", "mount", "-o"+v.Opts, v.Fs, v.Volume.Mountpoint) } cmd.Env = v.Env cmdReader, _ := cmd.StderrPipe() log.WithFields(log.Fields{ "name": r.Name, "cmd": cmd, //"env": v.env, // for security reasons disabled }).Info("Mount ObjectiveFS Volume") scanner := bufio.NewScanner(cmdReader) go func() { for scanner.Scan() { log.WithFields(log.Fields{"name": r.Name}).Info(scanner.Text()) } log.WithFields(log.Fields{"name": r.Name}).Info("Completing mount process") cmd.Wait() log.WithFields(log.Fields{"name": r.Name}).Info("Completed mount process") }() if err := cmd.Start(); err != nil { return &volume.MountResponse{}, fmt.Errorf("unexpected error mounting '%s' error: %s", r.Name, err.Error()) } log.WithFields(log.Fields{"name": r.Name}).Info("Process starting") // The drawback of running the mount in the foreground is there is no way to tell if it failed // to initially connect. So we just wait a fixed amount of time and check for process exit. time.Sleep(1 * time.Second) log.WithFields(log.Fields{"name": r.Name}).Info("Process started") if cmd.ProcessState != nil { // The process has exited so consider an error occured log.WithFields(log.Fields{"name": r.Name, "exitStatus": cmd.ProcessState.ExitCode()}).Error("Volume mount failed") return &volume.MountResponse{}, fmt.Errorf("unexpected error mounting '%s' exist status: %v", r.Name, cmd.ProcessState.ExitCode()) } log.WithFields(log.Fields{"name": r.Name}).Info("Volume mounted") v.Mounted = true } v.Use[r.ID] = true d.storeVolumeInfo(tx, r.Name, v) return &volume.MountResponse{Mountpoint: v.Volume.Mountpoint}, tx.Commit() } func (d *ofsDriver) Unmount(r *volume.UnmountRequest) error { d.Lock() defer d.Unlock() log.WithFields(log.Fields{"name": r.Name, "id": r.ID}).Info("Detach ObjectiveFS Volume") tx, err := d.volumedb.Begin(true) if err != nil { return err } defer tx.Rollback() v, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name) if !volumeExists { return fmt.Errorf("volume %s does not exist", r.Name) } if getVolErr != nil { return getVolErr } delete(v.Use, r.ID) if len(v.Use) == 0 && v.Asap { if err := umount(v); err != nil { return err } } d.storeVolumeInfo(tx, r.Name, v) return tx.Commit() } func (d *ofsDriver) Capabilities() *volume.CapabilitiesResponse { return &volume.CapabilitiesResponse{Capabilities: volume.Capability{Scope: "local"}} } var ( logLevel = flag.String("log", "", "log level") logFilePath = flag.String("logfile", "", "log file") ) func main() { flag.Parse() if *logLevel == "" { if *logLevel = os.Getenv("LOG_LEVEL"); *logLevel == "" { *logLevel = "info" } } level, err := log.ParseLevel(*logLevel) if err != nil { log.WithError(err).Fatal("Failed to parse log level") } log.SetLevel(level) if *logFilePath != "" { f, err := os.OpenFile(*logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) if err != nil { log.WithError(err).Fatal("Failed to open log file for writing") } defer f.Close() log.StandardLogger().Out = f } log.Info("Starting ObjectiveFS Volume Driver, version " + version) defEnv := make(map[string]string) for _, i := range []string{"ACCESS_KEY", "SECRET_KEY", "OBJECTIVEFS_LICENSE", "OBJECTSTORE", "ENDPOINT", "CACHESIZE", "DISKCACHE_SIZE", "DISKCACHE_PATH", "OBJECTIVEFS_PASSPHRASE"} { if a := os.Getenv(i); a != "" { defEnv[i] = a } } db, err := bolt.Open("objectivefs.db", 0600, nil) if err != nil { log.Fatal(err) } db.Update(func(tx *bolt.Tx) error { _, err := tx.CreateBucketIfNotExists([]byte(volumeBucket)) if err != nil { log.Fatalf("create bucket: %s", err) } return nil }) d := &ofsDriver{volumedb: db, defEnv: defEnv} h := volume.NewHandler(d) u, _ := user.Lookup("root") gid, _ := strconv.Atoi(u.Gid) if err := h.ServeUnix("/run/docker/plugins/objectivefs", gid); err != nil { log.Fatal(err) } } // ---------------- func (p *ofsVolume) gobEncode() ([]byte, error) { buf := new(bytes.Buffer) enc := gob.NewEncoder(buf) err := enc.Encode(p) if err != nil { return nil, err } return buf.Bytes(), nil } func gobDecode(data []byte) (*ofsVolume, error) { var p *ofsVolume buf := bytes.NewBuffer(data) dec := gob.NewDecoder(buf) err := dec.Decode(&p) if err != nil { return nil, err } return p, nil } func (p *ofsDriver) storeVolumeInfo(tx *bolt.Tx, volumeName string, info *ofsVolume) error { bucket := tx.Bucket([]byte(volumeBucket)) b, err := info.gobEncode() if err != nil { return err } return bucket.Put([]byte(volumeName), b) } func (p *ofsDriver) getVolumeInfo(tx *bolt.Tx, volumeName string) (*ofsVolume, bool, error) { bucket := tx.Bucket([]byte(volumeBucket)) v := bucket.Get([]byte(volumeName)) if v == nil { return nil, false, nil } info, err := gobDecode(v) return info, true, err } func (p *ofsDriver) getVolumeMap(tx *bolt.Tx) (map[string]ofsVolume, error) { bucket := tx.Bucket([]byte(volumeBucket)) ret := make(map[string]ofsVolume) err := bucket.ForEach(func(k, v []byte) error { info, err := gobDecode(v) if err != nil { return err } ret[string(k)] = *info return nil }) return ret, err } func (p *ofsDriver) removeVolumeInfo(tx *bolt.Tx, volumeName string) error { bucket := tx.Bucket([]byte(volumeBucket)) return bucket.Delete([]byte(volumeName)) }