Track volume runtime info separately for correct reboot behaviour.
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2023-05-26 17:29:29 +02:00
parent 35d1d0c654
commit 12265880aa

81
main.go
View File

@@ -33,18 +33,22 @@ import (
)
type ofsVolume struct {
Volume *volume.Volume
Fs string
Opts string
Env []string
Use map[string]bool
Mounted bool
Asap bool
Volume *volume.Volume
Fs string
Opts string
Env []string
Asap bool
}
type ofsVolumeRt struct {
use map[string]bool
mounted bool
}
type ofsDriver struct {
sync.RWMutex
volumedb *bolt.DB
volumeRt map[string]*ofsVolumeRt
defEnv map[string]string
}
@@ -65,14 +69,13 @@ func (d *ofsDriver) Create(r *volume.CreateRequest) error {
}
defer tx.Rollback()
_, volumeExists, err := d.getVolumeInfo(tx, r.Name)
_, _, volumeExists, err := d.getVolumeInfo(tx, r.Name)
if volumeExists {
return fmt.Errorf("volume '%s' already exists", r.Name)
}
v := &ofsVolume{}
v.Volume = &volume.Volume{Name: r.Name, Mountpoint: filepath.Join(volume.DefaultDockerRootDirectory, "objectivefs", r.Name), CreatedAt: time.Now().Format(time.RFC3339Nano)}
v.Use = make(map[string]bool)
v.Opts = ""
v.Fs = r.Name
env := make(map[string]string)
@@ -138,7 +141,7 @@ func (d *ofsDriver) Get(r *volume.GetRequest) (*volume.GetResponse, error) {
}
defer tx.Rollback()
volumeInfo, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
volumeInfo, _, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
if !volumeExists {
return &volume.GetResponse{}, fmt.Errorf("volume %s does not exist", r.Name)
}
@@ -148,10 +151,10 @@ func (d *ofsDriver) Get(r *volume.GetRequest) (*volume.GetResponse, error) {
return &volume.GetResponse{Volume: volumeInfo.Volume}, nil
}
func umount(v *ofsVolume) error {
func umount(v *ofsVolume, rt *ofsVolumeRt) error {
log.WithFields(log.Fields{"name": v.Volume.Name}).Info("Unmount ObjectiveFS Volume")
if !v.Mounted {
return nil
if !rt.mounted {
return fmt.Errorf("Volume is not mounted")
}
if err := exec.Command("umount", v.Volume.Mountpoint).Run(); err != nil {
return err
@@ -159,7 +162,7 @@ func umount(v *ofsVolume) error {
if err := os.Remove(v.Volume.Mountpoint); err != nil {
return err
}
v.Mounted = false
rt.mounted = false
return nil
}
@@ -173,18 +176,20 @@ func (d *ofsDriver) Remove(r *volume.RemoveRequest) error {
}
defer tx.Rollback()
v, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
v, rt, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
if !volumeExists {
return fmt.Errorf("volume %s does not exist", r.Name)
}
if getVolErr != nil {
return getVolErr
}
if len(v.Use) != 0 {
return fmt.Errorf("volume '%s' currently in use (%d unique)", r.Name, len(v.Use))
if len(rt.use) != 0 {
return fmt.Errorf("volume '%s' currently in use (%d unique)", r.Name, len(rt.use))
}
if err := umount(v); err != nil {
return err
if rt.mounted {
if err := umount(v, rt); err != nil {
return err
}
}
if err := d.removeVolumeInfo(tx, r.Name); err != nil {
@@ -203,7 +208,7 @@ func (d *ofsDriver) Path(r *volume.PathRequest) (*volume.PathResponse, error) {
}
defer tx.Rollback()
volumeInfo, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
volumeInfo, _, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
if !volumeExists {
return &volume.PathResponse{}, fmt.Errorf("volume %s does not exist", r.Name)
}
@@ -215,8 +220,8 @@ func (d *ofsDriver) Path(r *volume.PathRequest) (*volume.PathResponse, error) {
}
func (d *ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error) {
//d.Lock()
//defer d.Unlock()
d.Lock()
defer d.Unlock()
tx, err := d.volumedb.Begin(true)
if err != nil {
@@ -224,7 +229,7 @@ func (d *ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error)
}
defer tx.Rollback()
v, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
v, rt, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
if !volumeExists {
return &volume.MountResponse{}, fmt.Errorf("volume %s does not exist", r.Name)
}
@@ -233,7 +238,7 @@ func (d *ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error)
}
log.WithFields(log.Fields{"name": r.Name, "id": r.ID}).Info("Attach ObjectiveFS Volume")
if !v.Mounted {
if !rt.mounted {
if err := os.MkdirAll(v.Volume.Mountpoint, 0755); err != nil {
return &volume.MountResponse{}, err
}
@@ -258,19 +263,16 @@ func (d *ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error)
for scanner.Scan() {
log.WithFields(log.Fields{"name": r.Name}).Info(scanner.Text())
}
log.WithFields(log.Fields{"name": r.Name}).Info("Completing mount process")
cmd.Wait()
log.WithFields(log.Fields{"name": r.Name}).Info("Completed mount process")
}()
if err := cmd.Start(); err != nil {
return &volume.MountResponse{}, fmt.Errorf("unexpected error mounting '%s' error: %s", r.Name, err.Error())
}
log.WithFields(log.Fields{"name": r.Name}).Info("Process starting")
// The drawback of running the mount in the foreground is there is no way to tell if it failed
// to initially connect. So we just wait a fixed amount of time and check for process exit.
time.Sleep(1 * time.Second)
log.WithFields(log.Fields{"name": r.Name}).Info("Process started")
if cmd.ProcessState != nil {
// The process has exited so consider an error occured
log.WithFields(log.Fields{"name": r.Name, "exitStatus": cmd.ProcessState.ExitCode()}).Error("Volume mount failed")
@@ -278,9 +280,9 @@ func (d *ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error)
}
log.WithFields(log.Fields{"name": r.Name}).Info("Volume mounted")
v.Mounted = true
rt.mounted = true
}
v.Use[r.ID] = true
rt.use[r.ID] = true
d.storeVolumeInfo(tx, r.Name, v)
@@ -299,7 +301,7 @@ func (d *ofsDriver) Unmount(r *volume.UnmountRequest) error {
}
defer tx.Rollback()
v, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
v, rt, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name)
if !volumeExists {
return fmt.Errorf("volume %s does not exist", r.Name)
}
@@ -307,9 +309,9 @@ func (d *ofsDriver) Unmount(r *volume.UnmountRequest) error {
return getVolErr
}
delete(v.Use, r.ID)
if len(v.Use) == 0 && v.Asap {
if err := umount(v); err != nil {
delete(rt.use, r.ID)
if len(rt.use) == 0 && v.Asap {
if err := umount(v, rt); err != nil {
return err
}
}
@@ -415,14 +417,21 @@ func (p *ofsDriver) storeVolumeInfo(tx *bolt.Tx, volumeName string, info *ofsVol
return bucket.Put([]byte(volumeName), b)
}
func (p *ofsDriver) getVolumeInfo(tx *bolt.Tx, volumeName string) (*ofsVolume, bool, error) {
func (p *ofsDriver) getVolumeInfo(tx *bolt.Tx, volumeName string) (*ofsVolume, *ofsVolumeRt, bool, error) {
bucket := tx.Bucket([]byte(volumeBucket))
v := bucket.Get([]byte(volumeName))
if v == nil {
return nil, false, nil
return nil, nil, false, nil
}
info, err := gobDecode(v)
return info, true, err
rt, ok := p.volumeRt[volumeName]
if !ok {
rt = &ofsVolumeRt{use: make(map[string]bool), mounted: false}
p.volumeRt[volumeName] = rt
}
return info, rt, true, err
}
func (p *ofsDriver) getVolumeMap(tx *bolt.Tx) (map[string]ofsVolume, error) {