From 12265880aadb77b4aa871d673d28a4b39c392988 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Van=C3=AD=C4=8Dek?= Date: Fri, 26 May 2023 17:29:29 +0200 Subject: [PATCH] Track volume runtime info separately for correct reboot behaviour. --- main.go | 81 ++++++++++++++++++++++++++++++++------------------------- 1 file changed, 45 insertions(+), 36 deletions(-) diff --git a/main.go b/main.go index d691dee..d04f4ee 100644 --- a/main.go +++ b/main.go @@ -33,18 +33,22 @@ import ( ) type ofsVolume struct { - Volume *volume.Volume - Fs string - Opts string - Env []string - Use map[string]bool - Mounted bool - Asap bool + Volume *volume.Volume + Fs string + Opts string + Env []string + Asap bool +} + +type ofsVolumeRt struct { + use map[string]bool + mounted bool } type ofsDriver struct { sync.RWMutex volumedb *bolt.DB + volumeRt map[string]*ofsVolumeRt defEnv map[string]string } @@ -65,14 +69,13 @@ func (d *ofsDriver) Create(r *volume.CreateRequest) error { } defer tx.Rollback() - _, volumeExists, err := d.getVolumeInfo(tx, r.Name) + _, _, volumeExists, err := d.getVolumeInfo(tx, r.Name) if volumeExists { return fmt.Errorf("volume '%s' already exists", r.Name) } v := &ofsVolume{} v.Volume = &volume.Volume{Name: r.Name, Mountpoint: filepath.Join(volume.DefaultDockerRootDirectory, "objectivefs", r.Name), CreatedAt: time.Now().Format(time.RFC3339Nano)} - v.Use = make(map[string]bool) v.Opts = "" v.Fs = r.Name env := make(map[string]string) @@ -138,7 +141,7 @@ func (d *ofsDriver) Get(r *volume.GetRequest) (*volume.GetResponse, error) { } defer tx.Rollback() - volumeInfo, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name) + volumeInfo, _, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name) if !volumeExists { return &volume.GetResponse{}, fmt.Errorf("volume %s does not exist", r.Name) } @@ -148,10 +151,10 @@ func (d *ofsDriver) Get(r *volume.GetRequest) (*volume.GetResponse, error) { return &volume.GetResponse{Volume: volumeInfo.Volume}, nil } -func umount(v *ofsVolume) error { +func umount(v *ofsVolume, rt *ofsVolumeRt) error { log.WithFields(log.Fields{"name": v.Volume.Name}).Info("Unmount ObjectiveFS Volume") - if !v.Mounted { - return nil + if !rt.mounted { + return fmt.Errorf("Volume is not mounted") } if err := exec.Command("umount", v.Volume.Mountpoint).Run(); err != nil { return err @@ -159,7 +162,7 @@ func umount(v *ofsVolume) error { if err := os.Remove(v.Volume.Mountpoint); err != nil { return err } - v.Mounted = false + rt.mounted = false return nil } @@ -173,18 +176,20 @@ func (d *ofsDriver) Remove(r *volume.RemoveRequest) error { } defer tx.Rollback() - v, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name) + v, rt, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name) if !volumeExists { return fmt.Errorf("volume %s does not exist", r.Name) } if getVolErr != nil { return getVolErr } - if len(v.Use) != 0 { - return fmt.Errorf("volume '%s' currently in use (%d unique)", r.Name, len(v.Use)) + if len(rt.use) != 0 { + return fmt.Errorf("volume '%s' currently in use (%d unique)", r.Name, len(rt.use)) } - if err := umount(v); err != nil { - return err + if rt.mounted { + if err := umount(v, rt); err != nil { + return err + } } if err := d.removeVolumeInfo(tx, r.Name); err != nil { @@ -203,7 +208,7 @@ func (d *ofsDriver) Path(r *volume.PathRequest) (*volume.PathResponse, error) { } defer tx.Rollback() - volumeInfo, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name) + volumeInfo, _, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name) if !volumeExists { return &volume.PathResponse{}, fmt.Errorf("volume %s does not exist", r.Name) } @@ -215,8 +220,8 @@ func (d *ofsDriver) Path(r *volume.PathRequest) (*volume.PathResponse, error) { } func (d *ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error) { - //d.Lock() - //defer d.Unlock() + d.Lock() + defer d.Unlock() tx, err := d.volumedb.Begin(true) if err != nil { @@ -224,7 +229,7 @@ func (d *ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error) } defer tx.Rollback() - v, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name) + v, rt, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name) if !volumeExists { return &volume.MountResponse{}, fmt.Errorf("volume %s does not exist", r.Name) } @@ -233,7 +238,7 @@ func (d *ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error) } log.WithFields(log.Fields{"name": r.Name, "id": r.ID}).Info("Attach ObjectiveFS Volume") - if !v.Mounted { + if !rt.mounted { if err := os.MkdirAll(v.Volume.Mountpoint, 0755); err != nil { return &volume.MountResponse{}, err } @@ -258,19 +263,16 @@ func (d *ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error) for scanner.Scan() { log.WithFields(log.Fields{"name": r.Name}).Info(scanner.Text()) } - log.WithFields(log.Fields{"name": r.Name}).Info("Completing mount process") cmd.Wait() log.WithFields(log.Fields{"name": r.Name}).Info("Completed mount process") }() if err := cmd.Start(); err != nil { return &volume.MountResponse{}, fmt.Errorf("unexpected error mounting '%s' error: %s", r.Name, err.Error()) } - log.WithFields(log.Fields{"name": r.Name}).Info("Process starting") // The drawback of running the mount in the foreground is there is no way to tell if it failed // to initially connect. So we just wait a fixed amount of time and check for process exit. time.Sleep(1 * time.Second) - log.WithFields(log.Fields{"name": r.Name}).Info("Process started") if cmd.ProcessState != nil { // The process has exited so consider an error occured log.WithFields(log.Fields{"name": r.Name, "exitStatus": cmd.ProcessState.ExitCode()}).Error("Volume mount failed") @@ -278,9 +280,9 @@ func (d *ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error) } log.WithFields(log.Fields{"name": r.Name}).Info("Volume mounted") - v.Mounted = true + rt.mounted = true } - v.Use[r.ID] = true + rt.use[r.ID] = true d.storeVolumeInfo(tx, r.Name, v) @@ -299,7 +301,7 @@ func (d *ofsDriver) Unmount(r *volume.UnmountRequest) error { } defer tx.Rollback() - v, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name) + v, rt, volumeExists, getVolErr := d.getVolumeInfo(tx, r.Name) if !volumeExists { return fmt.Errorf("volume %s does not exist", r.Name) } @@ -307,9 +309,9 @@ func (d *ofsDriver) Unmount(r *volume.UnmountRequest) error { return getVolErr } - delete(v.Use, r.ID) - if len(v.Use) == 0 && v.Asap { - if err := umount(v); err != nil { + delete(rt.use, r.ID) + if len(rt.use) == 0 && v.Asap { + if err := umount(v, rt); err != nil { return err } } @@ -415,14 +417,21 @@ func (p *ofsDriver) storeVolumeInfo(tx *bolt.Tx, volumeName string, info *ofsVol return bucket.Put([]byte(volumeName), b) } -func (p *ofsDriver) getVolumeInfo(tx *bolt.Tx, volumeName string) (*ofsVolume, bool, error) { +func (p *ofsDriver) getVolumeInfo(tx *bolt.Tx, volumeName string) (*ofsVolume, *ofsVolumeRt, bool, error) { bucket := tx.Bucket([]byte(volumeBucket)) v := bucket.Get([]byte(volumeName)) if v == nil { - return nil, false, nil + return nil, nil, false, nil } info, err := gobDecode(v) - return info, true, err + + rt, ok := p.volumeRt[volumeName] + if !ok { + rt = &ofsVolumeRt{use: make(map[string]bool), mounted: false} + p.volumeRt[volumeName] = rt + } + + return info, rt, true, err } func (p *ofsDriver) getVolumeMap(tx *bolt.Tx) (map[string]ofsVolume, error) {