diff --git a/go.mod b/go.mod index 94c0bc7..16fd8d2 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module git.ivasoft.cz/sw/objectivefs-docker-volume go 1.20 require ( + github.com/boltdb/bolt v1.3.1 github.com/docker/go-plugins-helpers v0.0.0-20211224144127-6eecb7beb651 github.com/sirupsen/logrus v1.9.0 ) diff --git a/go.sum b/go.sum index b2ae7c8..c76cf07 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= +github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= +github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf h1:iW4rZ826su+pqaw19uhpSCzhj44qo35pNgKFGqzDKkU= github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/main.go b/main.go index 2c826c8..6f0ad34 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,8 @@ package main import ( "bufio" + "bytes" + "encoding/gob" "flag" "fmt" "os" @@ -22,8 +24,10 @@ import ( "os/user" "path/filepath" "strconv" + "sync" "time" + "github.com/boltdb/bolt" "github.com/docker/go-plugins-helpers/volume" log "github.com/sirupsen/logrus" ) @@ -39,21 +43,33 @@ type ofsVolume struct { } type ofsDriver struct { - //sync.RWMutex - volumes map[string]*ofsVolume - defEnv map[string]string + sync.RWMutex + volumedb *bolt.DB + defEnv map[string]string } var version = "1.0" -func (d ofsDriver) Create(r *volume.CreateRequest) error { - log.WithFields(log.Fields{"name": r.Name}).Info("Create ObjectiveFS Volume") - //d.Lock() - //defer d.Unlock() +const ( + volumeBucket = "volumes" +) - if _, ok := d.volumes[r.Name]; ok { +func (d *ofsDriver) Create(r *volume.CreateRequest) error { + log.WithFields(log.Fields{"name": r.Name}).Info("Create ObjectiveFS Volume") + d.Lock() + defer d.Unlock() + + tx, err := d.volumedb.Begin(true) + if err != nil { + return err + } + defer tx.Rollback() + + _, volumeExists, err := d.getVolumeInfo(tx, r.Name) + if volumeExists { return fmt.Errorf("volume '%s' already exists", r.Name) } + v := &ofsVolume{} v.volume = &volume.Volume{Name: r.Name, Mountpoint: filepath.Join(volume.DefaultDockerRootDirectory, "objectivefs", r.Name), CreatedAt: time.Now().Format(time.RFC3339Nano)} v.use = make(map[string]bool) @@ -68,7 +84,11 @@ func (d ofsDriver) Create(r *volume.CreateRequest) error { case "fs": v.fs = val case "options", "ptions": - v.opts = v.opts + "," + val + if len(v.opts) == 0 { + v.opts = val + } else { + v.opts = v.opts + "," + val + } case "asap": v.asap = true default: @@ -79,30 +99,53 @@ func (d ofsDriver) Create(r *volume.CreateRequest) error { v.env = append(v.env, id+"="+val) } - d.volumes[r.Name] = v + if err := d.storeVolumeInfo(tx, r.Name, v); err != nil { + return err + } + + // Commit the transaction and check for error. + if err := tx.Commit(); err != nil { + return err + } return nil } -func (d ofsDriver) List() (*volume.ListResponse, error) { - //d.Lock() - //defer d.Unlock() +func (d *ofsDriver) List() (*volume.ListResponse, error) { + d.Lock() + defer d.Unlock() - var vs []*volume.Volume - for _, v := range d.volumes { - vs = append(vs, v.volume) + tx, err := d.volumedb.Begin(false) + if err != nil { + return nil, err } - return &volume.ListResponse{Volumes: vs}, nil + defer tx.Rollback() + + var vols []*volume.Volume + volumeMap, err := d.getVolumeMap(tx) + for _, v := range volumeMap { + vols = append(vols, v.volume) + } + return &volume.ListResponse{Volumes: vols}, nil } -func (d ofsDriver) Get(r *volume.GetRequest) (*volume.GetResponse, error) { - //d.Lock() - //defer d.Unlock() +func (d *ofsDriver) Get(r *volume.GetRequest) (*volume.GetResponse, error) { + d.Lock() + defer d.Unlock() - v, ok := d.volumes[r.Name] - if !ok { - return &volume.GetResponse{}, fmt.Errorf("volume '%s' not found", r.Name) + tx, err := d.volumedb.Begin(false) + if err != nil { + return nil, err } - return &volume.GetResponse{Volume: v.volume}, nil + defer tx.Rollback() + + volumeInfo, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name) + if !volumeExists { + return &volume.GetResponse{}, fmt.Errorf("volume %s does not exist", r.Name) + } + if getVolErr != nil { + return &volume.GetResponse{}, getVolErr + } + return &volume.GetResponse{Volume: volumeInfo.volume}, nil } func umount(v *ofsVolume) error { @@ -120,13 +163,22 @@ func umount(v *ofsVolume) error { return nil } -func (d ofsDriver) Remove(r *volume.RemoveRequest) error { - //d.Lock() - //defer d.Unlock() +func (d *ofsDriver) Remove(r *volume.RemoveRequest) error { + d.Lock() + defer d.Unlock() - v, ok := d.volumes[r.Name] - if !ok { - return fmt.Errorf("volume '%s' not found", r.Name) + tx, err := d.volumedb.Begin(true) + if err != nil { + return err + } + defer tx.Rollback() + + v, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name) + if !volumeExists { + return fmt.Errorf("volume %s does not exist", r.Name) + } + if getVolErr != nil { + return getVolErr } if len(v.use) != 0 { return fmt.Errorf("volume '%s' currently in use (%d unique)", r.Name, len(v.use)) @@ -134,29 +186,52 @@ func (d ofsDriver) Remove(r *volume.RemoveRequest) error { if err := umount(v); err != nil { return err } - delete(d.volumes, r.Name) - return nil + + if err := d.removeVolumeInfo(tx, r.Name); err != nil { + return err + } + return tx.Commit() } -func (d ofsDriver) Path(r *volume.PathRequest) (*volume.PathResponse, error) { +func (d *ofsDriver) Path(r *volume.PathRequest) (*volume.PathResponse, error) { + d.Lock() + defer d.Unlock() + + tx, err := d.volumedb.Begin(false) + if err != nil { + return nil, err + } + defer tx.Rollback() + + volumeInfo, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name) + if !volumeExists { + return &volume.PathResponse{}, fmt.Errorf("volume %s does not exist", r.Name) + } + if getVolErr != nil { + return &volume.PathResponse{}, getVolErr + } + + return &volume.PathResponse{Mountpoint: volumeInfo.volume.Mountpoint}, nil +} + +func (d *ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error) { //d.Lock() //defer d.Unlock() - v, ok := d.volumes[r.Name] - if !ok { - return &volume.PathResponse{}, fmt.Errorf("volume '%s' not found", r.Name) + tx, err := d.volumedb.Begin(true) + if err != nil { + return nil, err } - return &volume.PathResponse{Mountpoint: v.volume.Mountpoint}, nil -} + defer tx.Rollback() -func (d ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error) { - //d.Lock() - //defer d.Unlock() - - v, ok := d.volumes[r.Name] - if !ok { - return &volume.MountResponse{}, fmt.Errorf("volume '%s' not found", r.Name) + v, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name) + if !volumeExists { + return &volume.MountResponse{}, fmt.Errorf("volume %s does not exist", r.Name) } + if getVolErr != nil { + return &volume.MountResponse{}, getVolErr + } + log.WithFields(log.Fields{"name": r.Name, "id": r.ID}).Info("Attach ObjectiveFS Volume") if !v.mounted { if err := os.MkdirAll(v.volume.Mountpoint, 0755); err != nil { @@ -206,31 +281,45 @@ func (d ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error) v.mounted = true } v.use[r.ID] = true - return &volume.MountResponse{Mountpoint: v.volume.Mountpoint}, nil + + d.storeVolumeInfo(tx, r.Name, v) + + return &volume.MountResponse{Mountpoint: v.volume.Mountpoint}, tx.Commit() } -func (d ofsDriver) Unmount(r *volume.UnmountRequest) error { - //d.Lock() - //defer d.Unlock() +func (d *ofsDriver) Unmount(r *volume.UnmountRequest) error { + d.Lock() + defer d.Unlock() - v, ok := d.volumes[r.Name] - if !ok { - return fmt.Errorf("volume '%s' not found", r.Name) - } log.WithFields(log.Fields{"name": r.Name, "id": r.ID}).Info("Detach ObjectiveFS Volume") + + tx, err := d.volumedb.Begin(true) + if err != nil { + return err + } + defer tx.Rollback() + + v, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name) + if !volumeExists { + return fmt.Errorf("volume %s does not exist", r.Name) + } + if getVolErr != nil { + return getVolErr + } + delete(v.use, r.ID) if len(v.use) == 0 && v.asap { if err := umount(v); err != nil { return err } } - return nil + + d.storeVolumeInfo(tx, r.Name, v) + + return tx.Commit() } -func (d ofsDriver) Capabilities() *volume.CapabilitiesResponse { - //d.Lock() - //defer d.Unlock() - +func (d *ofsDriver) Capabilities() *volume.CapabilitiesResponse { return &volume.CapabilitiesResponse{Capabilities: volume.Capability{Scope: "local"}} } @@ -272,7 +361,20 @@ func main() { } } - d := ofsDriver{volumes: make(map[string]*ofsVolume), defEnv: defEnv} + db, err := bolt.Open("objectivefs.db", 0600, nil) + if err != nil { + log.Fatal(err) + } + + db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists([]byte(volumeBucket)) + if err != nil { + log.Fatalf("create bucket: %s", err) + } + return nil + }) + + d := &ofsDriver{volumedb: db, defEnv: defEnv} h := volume.NewHandler(d) u, _ := user.Lookup("root") gid, _ := strconv.Atoi(u.Gid) @@ -280,3 +382,64 @@ func main() { log.Fatal(err) } } + +// ---------------- + +func (p *ofsVolume) gobEncode() ([]byte, error) { + buf := new(bytes.Buffer) + enc := gob.NewEncoder(buf) + err := enc.Encode(p) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func gobDecode(data []byte) (*ofsVolume, error) { + var p *ofsVolume + buf := bytes.NewBuffer(data) + dec := gob.NewDecoder(buf) + err := dec.Decode(&p) + if err != nil { + return nil, err + } + return p, nil +} + +func (p *ofsDriver) storeVolumeInfo(tx *bolt.Tx, volumeName string, info *ofsVolume) error { + bucket := tx.Bucket([]byte(volumeBucket)) + b, err := info.gobEncode() + if err != nil { + return err + } + return bucket.Put([]byte(volumeName), b) +} + +func (p *ofsDriver) getVolumeInfo(tx *bolt.Tx, volumeName string) (*ofsVolume, bool, error) { + bucket := tx.Bucket([]byte(volumeBucket)) + v := bucket.Get([]byte(volumeName)) + if v == nil { + return nil, false, nil + } + info, err := gobDecode(v) + return info, true, err +} + +func (p *ofsDriver) getVolumeMap(tx *bolt.Tx) (map[string]ofsVolume, error) { + bucket := tx.Bucket([]byte(volumeBucket)) + ret := make(map[string]ofsVolume) + err := bucket.ForEach(func(k, v []byte) error { + info, err := gobDecode(v) + if err != nil { + return err + } + ret[string(k)] = *info + return nil + }) + return ret, err +} + +func (p *ofsDriver) removeVolumeInfo(tx *bolt.Tx, volumeName string) error { + bucket := tx.Bucket([]byte(volumeBucket)) + return bucket.Delete([]byte(volumeName)) +}