forked from Ivasoft/objectivefs-docker-volume
Persisting volumes states in Bolt db. Locks reinstated.
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
1
go.mod
1
go.mod
@@ -3,6 +3,7 @@ module git.ivasoft.cz/sw/objectivefs-docker-volume
|
||||
go 1.20
|
||||
|
||||
require (
|
||||
github.com/boltdb/bolt v1.3.1
|
||||
github.com/docker/go-plugins-helpers v0.0.0-20211224144127-6eecb7beb651
|
||||
github.com/sirupsen/logrus v1.9.0
|
||||
)
|
||||
|
||||
2
go.sum
2
go.sum
@@ -1,5 +1,7 @@
|
||||
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
|
||||
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
|
||||
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
|
||||
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
|
||||
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf h1:iW4rZ826su+pqaw19uhpSCzhj44qo35pNgKFGqzDKkU=
|
||||
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
|
||||
281
main.go
281
main.go
@@ -15,6 +15,8 @@ package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
@@ -22,8 +24,10 @@ import (
|
||||
"os/user"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/docker/go-plugins-helpers/volume"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
@@ -39,21 +43,33 @@ type ofsVolume struct {
|
||||
}
|
||||
|
||||
type ofsDriver struct {
|
||||
//sync.RWMutex
|
||||
volumes map[string]*ofsVolume
|
||||
defEnv map[string]string
|
||||
sync.RWMutex
|
||||
volumedb *bolt.DB
|
||||
defEnv map[string]string
|
||||
}
|
||||
|
||||
var version = "1.0"
|
||||
|
||||
func (d ofsDriver) Create(r *volume.CreateRequest) error {
|
||||
log.WithFields(log.Fields{"name": r.Name}).Info("Create ObjectiveFS Volume")
|
||||
//d.Lock()
|
||||
//defer d.Unlock()
|
||||
const (
|
||||
volumeBucket = "volumes"
|
||||
)
|
||||
|
||||
if _, ok := d.volumes[r.Name]; ok {
|
||||
func (d *ofsDriver) Create(r *volume.CreateRequest) error {
|
||||
log.WithFields(log.Fields{"name": r.Name}).Info("Create ObjectiveFS Volume")
|
||||
d.Lock()
|
||||
defer d.Unlock()
|
||||
|
||||
tx, err := d.volumedb.Begin(true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
_, volumeExists, err := d.getVolumeInfo(tx, r.Name)
|
||||
if volumeExists {
|
||||
return fmt.Errorf("volume '%s' already exists", r.Name)
|
||||
}
|
||||
|
||||
v := &ofsVolume{}
|
||||
v.volume = &volume.Volume{Name: r.Name, Mountpoint: filepath.Join(volume.DefaultDockerRootDirectory, "objectivefs", r.Name), CreatedAt: time.Now().Format(time.RFC3339Nano)}
|
||||
v.use = make(map[string]bool)
|
||||
@@ -68,7 +84,11 @@ func (d ofsDriver) Create(r *volume.CreateRequest) error {
|
||||
case "fs":
|
||||
v.fs = val
|
||||
case "options", "ptions":
|
||||
v.opts = v.opts + "," + val
|
||||
if len(v.opts) == 0 {
|
||||
v.opts = val
|
||||
} else {
|
||||
v.opts = v.opts + "," + val
|
||||
}
|
||||
case "asap":
|
||||
v.asap = true
|
||||
default:
|
||||
@@ -79,30 +99,53 @@ func (d ofsDriver) Create(r *volume.CreateRequest) error {
|
||||
v.env = append(v.env, id+"="+val)
|
||||
}
|
||||
|
||||
d.volumes[r.Name] = v
|
||||
if err := d.storeVolumeInfo(tx, r.Name, v); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Commit the transaction and check for error.
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d ofsDriver) List() (*volume.ListResponse, error) {
|
||||
//d.Lock()
|
||||
//defer d.Unlock()
|
||||
func (d *ofsDriver) List() (*volume.ListResponse, error) {
|
||||
d.Lock()
|
||||
defer d.Unlock()
|
||||
|
||||
var vs []*volume.Volume
|
||||
for _, v := range d.volumes {
|
||||
vs = append(vs, v.volume)
|
||||
tx, err := d.volumedb.Begin(false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &volume.ListResponse{Volumes: vs}, nil
|
||||
defer tx.Rollback()
|
||||
|
||||
var vols []*volume.Volume
|
||||
volumeMap, err := d.getVolumeMap(tx)
|
||||
for _, v := range volumeMap {
|
||||
vols = append(vols, v.volume)
|
||||
}
|
||||
return &volume.ListResponse{Volumes: vols}, nil
|
||||
}
|
||||
|
||||
func (d ofsDriver) Get(r *volume.GetRequest) (*volume.GetResponse, error) {
|
||||
//d.Lock()
|
||||
//defer d.Unlock()
|
||||
func (d *ofsDriver) Get(r *volume.GetRequest) (*volume.GetResponse, error) {
|
||||
d.Lock()
|
||||
defer d.Unlock()
|
||||
|
||||
v, ok := d.volumes[r.Name]
|
||||
if !ok {
|
||||
return &volume.GetResponse{}, fmt.Errorf("volume '%s' not found", r.Name)
|
||||
tx, err := d.volumedb.Begin(false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &volume.GetResponse{Volume: v.volume}, nil
|
||||
defer tx.Rollback()
|
||||
|
||||
volumeInfo, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
|
||||
if !volumeExists {
|
||||
return &volume.GetResponse{}, fmt.Errorf("volume %s does not exist", r.Name)
|
||||
}
|
||||
if getVolErr != nil {
|
||||
return &volume.GetResponse{}, getVolErr
|
||||
}
|
||||
return &volume.GetResponse{Volume: volumeInfo.volume}, nil
|
||||
}
|
||||
|
||||
func umount(v *ofsVolume) error {
|
||||
@@ -120,13 +163,22 @@ func umount(v *ofsVolume) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d ofsDriver) Remove(r *volume.RemoveRequest) error {
|
||||
//d.Lock()
|
||||
//defer d.Unlock()
|
||||
func (d *ofsDriver) Remove(r *volume.RemoveRequest) error {
|
||||
d.Lock()
|
||||
defer d.Unlock()
|
||||
|
||||
v, ok := d.volumes[r.Name]
|
||||
if !ok {
|
||||
return fmt.Errorf("volume '%s' not found", r.Name)
|
||||
tx, err := d.volumedb.Begin(true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
v, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
|
||||
if !volumeExists {
|
||||
return fmt.Errorf("volume %s does not exist", r.Name)
|
||||
}
|
||||
if getVolErr != nil {
|
||||
return getVolErr
|
||||
}
|
||||
if len(v.use) != 0 {
|
||||
return fmt.Errorf("volume '%s' currently in use (%d unique)", r.Name, len(v.use))
|
||||
@@ -134,29 +186,52 @@ func (d ofsDriver) Remove(r *volume.RemoveRequest) error {
|
||||
if err := umount(v); err != nil {
|
||||
return err
|
||||
}
|
||||
delete(d.volumes, r.Name)
|
||||
return nil
|
||||
|
||||
if err := d.removeVolumeInfo(tx, r.Name); err != nil {
|
||||
return err
|
||||
}
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (d ofsDriver) Path(r *volume.PathRequest) (*volume.PathResponse, error) {
|
||||
func (d *ofsDriver) Path(r *volume.PathRequest) (*volume.PathResponse, error) {
|
||||
d.Lock()
|
||||
defer d.Unlock()
|
||||
|
||||
tx, err := d.volumedb.Begin(false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
volumeInfo, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
|
||||
if !volumeExists {
|
||||
return &volume.PathResponse{}, fmt.Errorf("volume %s does not exist", r.Name)
|
||||
}
|
||||
if getVolErr != nil {
|
||||
return &volume.PathResponse{}, getVolErr
|
||||
}
|
||||
|
||||
return &volume.PathResponse{Mountpoint: volumeInfo.volume.Mountpoint}, nil
|
||||
}
|
||||
|
||||
func (d *ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error) {
|
||||
//d.Lock()
|
||||
//defer d.Unlock()
|
||||
|
||||
v, ok := d.volumes[r.Name]
|
||||
if !ok {
|
||||
return &volume.PathResponse{}, fmt.Errorf("volume '%s' not found", r.Name)
|
||||
tx, err := d.volumedb.Begin(true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &volume.PathResponse{Mountpoint: v.volume.Mountpoint}, nil
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
func (d ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error) {
|
||||
//d.Lock()
|
||||
//defer d.Unlock()
|
||||
|
||||
v, ok := d.volumes[r.Name]
|
||||
if !ok {
|
||||
return &volume.MountResponse{}, fmt.Errorf("volume '%s' not found", r.Name)
|
||||
v, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
|
||||
if !volumeExists {
|
||||
return &volume.MountResponse{}, fmt.Errorf("volume %s does not exist", r.Name)
|
||||
}
|
||||
if getVolErr != nil {
|
||||
return &volume.MountResponse{}, getVolErr
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{"name": r.Name, "id": r.ID}).Info("Attach ObjectiveFS Volume")
|
||||
if !v.mounted {
|
||||
if err := os.MkdirAll(v.volume.Mountpoint, 0755); err != nil {
|
||||
@@ -206,31 +281,45 @@ func (d ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error)
|
||||
v.mounted = true
|
||||
}
|
||||
v.use[r.ID] = true
|
||||
return &volume.MountResponse{Mountpoint: v.volume.Mountpoint}, nil
|
||||
|
||||
d.storeVolumeInfo(tx, r.Name, v)
|
||||
|
||||
return &volume.MountResponse{Mountpoint: v.volume.Mountpoint}, tx.Commit()
|
||||
}
|
||||
|
||||
func (d ofsDriver) Unmount(r *volume.UnmountRequest) error {
|
||||
//d.Lock()
|
||||
//defer d.Unlock()
|
||||
func (d *ofsDriver) Unmount(r *volume.UnmountRequest) error {
|
||||
d.Lock()
|
||||
defer d.Unlock()
|
||||
|
||||
v, ok := d.volumes[r.Name]
|
||||
if !ok {
|
||||
return fmt.Errorf("volume '%s' not found", r.Name)
|
||||
}
|
||||
log.WithFields(log.Fields{"name": r.Name, "id": r.ID}).Info("Detach ObjectiveFS Volume")
|
||||
|
||||
tx, err := d.volumedb.Begin(true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
v, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
|
||||
if !volumeExists {
|
||||
return fmt.Errorf("volume %s does not exist", r.Name)
|
||||
}
|
||||
if getVolErr != nil {
|
||||
return getVolErr
|
||||
}
|
||||
|
||||
delete(v.use, r.ID)
|
||||
if len(v.use) == 0 && v.asap {
|
||||
if err := umount(v); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
d.storeVolumeInfo(tx, r.Name, v)
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (d ofsDriver) Capabilities() *volume.CapabilitiesResponse {
|
||||
//d.Lock()
|
||||
//defer d.Unlock()
|
||||
|
||||
func (d *ofsDriver) Capabilities() *volume.CapabilitiesResponse {
|
||||
return &volume.CapabilitiesResponse{Capabilities: volume.Capability{Scope: "local"}}
|
||||
}
|
||||
|
||||
@@ -272,7 +361,20 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
d := ofsDriver{volumes: make(map[string]*ofsVolume), defEnv: defEnv}
|
||||
db, err := bolt.Open("objectivefs.db", 0600, nil)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
db.Update(func(tx *bolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists([]byte(volumeBucket))
|
||||
if err != nil {
|
||||
log.Fatalf("create bucket: %s", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
d := &ofsDriver{volumedb: db, defEnv: defEnv}
|
||||
h := volume.NewHandler(d)
|
||||
u, _ := user.Lookup("root")
|
||||
gid, _ := strconv.Atoi(u.Gid)
|
||||
@@ -280,3 +382,64 @@ func main() {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// ----------------
|
||||
|
||||
func (p *ofsVolume) gobEncode() ([]byte, error) {
|
||||
buf := new(bytes.Buffer)
|
||||
enc := gob.NewEncoder(buf)
|
||||
err := enc.Encode(p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
func gobDecode(data []byte) (*ofsVolume, error) {
|
||||
var p *ofsVolume
|
||||
buf := bytes.NewBuffer(data)
|
||||
dec := gob.NewDecoder(buf)
|
||||
err := dec.Decode(&p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (p *ofsDriver) storeVolumeInfo(tx *bolt.Tx, volumeName string, info *ofsVolume) error {
|
||||
bucket := tx.Bucket([]byte(volumeBucket))
|
||||
b, err := info.gobEncode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return bucket.Put([]byte(volumeName), b)
|
||||
}
|
||||
|
||||
func (p *ofsDriver) getVolumeInfo(tx *bolt.Tx, volumeName string) (*ofsVolume, bool, error) {
|
||||
bucket := tx.Bucket([]byte(volumeBucket))
|
||||
v := bucket.Get([]byte(volumeName))
|
||||
if v == nil {
|
||||
return nil, false, nil
|
||||
}
|
||||
info, err := gobDecode(v)
|
||||
return info, true, err
|
||||
}
|
||||
|
||||
func (p *ofsDriver) getVolumeMap(tx *bolt.Tx) (map[string]ofsVolume, error) {
|
||||
bucket := tx.Bucket([]byte(volumeBucket))
|
||||
ret := make(map[string]ofsVolume)
|
||||
err := bucket.ForEach(func(k, v []byte) error {
|
||||
info, err := gobDecode(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ret[string(k)] = *info
|
||||
return nil
|
||||
})
|
||||
return ret, err
|
||||
}
|
||||
|
||||
func (p *ofsDriver) removeVolumeInfo(tx *bolt.Tx, volumeName string) error {
|
||||
bucket := tx.Bucket([]byte(volumeBucket))
|
||||
return bucket.Delete([]byte(volumeName))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user