diff --git a/bin/mount.objectivefs b/bin/mount.objectivefs index d8b524f..cb4f1d9 100644 Binary files a/bin/mount.objectivefs and b/bin/mount.objectivefs differ diff --git a/config.json b/config.json index 39fdafd..0870c1a 100644 --- a/config.json +++ b/config.json @@ -92,6 +92,13 @@ "settable": [ "value" ] + }, + { + "name": "OBJECTIVEFS_MOUNT_SNAPSHOTS", + "value": "no", + "settable": [ + "value" + ] } ], "network": { diff --git a/main.go b/main.go index b451a2a..2efbf23 100644 --- a/main.go +++ b/main.go @@ -22,6 +22,7 @@ import ( "os" "os/exec" "os/user" + "path" "path/filepath" "strconv" "strings" @@ -34,11 +35,12 @@ import ( ) type ofsVolume struct { - Volume *volume.Volume - Fs string - Opts string - Env []string - Asap bool + Volume *volume.Volume + Fs string + Opts string + Env []string + Asap bool + MountSnapshots bool } type ofsVolumeRt struct { @@ -48,16 +50,23 @@ type ofsVolumeRt struct { type ofsDriver struct { sync.RWMutex - volumedb *bolt.DB - volumeRt map[string]*ofsVolumeRt - defEnv map[string]string - defMountOpt string + volumedb *bolt.DB + volumeRt map[string]*ofsVolumeRt + defEnv map[string]string + defMountOpt string + defMountSnapshots bool +} + +type snapshotRule struct { + count int + period time.Duration } var version = "1.0" const ( - volumeBucket = "volumes" + volumeBucket = "volumes" + snapshotsDirectory = "snapshots" ) func (d *ofsDriver) Create(r *volume.CreateRequest) error { @@ -80,6 +89,7 @@ func (d *ofsDriver) Create(r *volume.CreateRequest) error { v.Volume = &volume.Volume{Name: r.Name, Mountpoint: filepath.Join(volume.DefaultDockerRootDirectory, "objectivefs", r.Name), CreatedAt: time.Now().Format(time.RFC3339Nano)} v.Opts = d.defMountOpt v.Fs = r.Name + v.MountSnapshots = d.defMountSnapshots env := make(map[string]string) for id, val := range d.defEnv { env[id] = val @@ -96,6 +106,8 @@ func (d *ofsDriver) Create(r *volume.CreateRequest) error { } case "asap": v.Asap = true + case "mountSnapshots": + v.MountSnapshots = val == "yes" default: env[key] = val } @@ -289,14 +301,79 @@ func (d *ofsDriver) Mount(r *volume.MountRequest) (*volume.MountResponse, error) } // Check for mount - mount := exec.Command("df", "--output=fstype", v.Volume.Mountpoint) - if out, err := mount.CombinedOutput(); err == nil && strings.Index(string(out), "fuse.objectivefs") >= 0 { + if isObjfs, err := isObjectiveFsMount(v.Volume.Mountpoint); err == nil && isObjfs { break } } log.WithFields(log.Fields{"name": r.Name}).Info("Volume mounted") rt.mounted = true + + if v.MountSnapshots { + go func() { + snapshotsPath := path.Join(v.Volume.Mountpoint, snapshotsDirectory) + log.WithFields(log.Fields{"name": r.Name, "path": snapshotsPath}).Info("Snapshot auto-mount is starting") + if _, err := os.Stat(snapshotsPath); os.IsNotExist(err) { + log.WithFields(log.Fields{"name": r.Name, "directory": snapshotsDirectory}).Info("Creating the snapshots mount directory") + if err := os.Mkdir(snapshotsPath, os.ModePerm); err != nil { + log.WithFields(log.Fields{"name": r.Name}).Error("Failed to create the snapshots mount directory. Snapshot mounting will be disabled.") + return + } + } + + for cmd.ProcessState == nil { + if snapshotRulesB, err := applyEnv(exec.Command("/sbin/mount.objectivefs", "snapshot", "-l", v.Fs), v.Env).Output(); err == nil { + if expectedSnapshots, err := generateSnapshotsFromRulesForNow(string(snapshotRulesB)); err == nil { + if existingSnapshotsB, err := applyEnv(exec.Command("/sbin/mount.objectivefs", "list", "-sz", v.Fs), v.Env).Output(); err == nil { + if existingSnapshots, err := parseExistingSnapshots(string(existingSnapshotsB), v.Fs); err == nil { + if existingMounts, err := getMountedSnaphots(snapshotsPath); err == nil { + // Destroy old snapshots + for i, name := range setMinus(existingSnapshots, expectedSnapshots) { + expectedOutput := "Snapshot '" + name + "' destroyed." + if output, err := applyEnv(exec.Command("/sbin/mount.objectivefs", "destroy", name, "-f"), v.Env).Output(); err != nil || string(output) != expectedOutput { + log.WithFields(log.Fields{"name": r.Name, "snapshot": i}).Warn("Failed to destroy an expired snapshot.") + } + } + + // Remove mounts of expired snapshots + for i, path := range setMinus(existingMounts, expectedSnapshots) { + if err := exec.Command("umount", path).Run(); err != nil { + log.WithFields(log.Fields{"name": r.Name, "snapshot": i}).Warn("Failed to unmount an expired snapshot.") + } + if err := os.Remove(path); err != nil { + log.WithFields(log.Fields{"name": r.Name, "snapshot": i}).Warn("Failed to remove directory of an expired snapshot.") + } + } + + // Add new mounts + for i, name := range setMinus(setIntersect(existingSnapshots, expectedSnapshots), existingMounts) { + dest := filepath.Join(snapshotsPath, i) + if err := applyEnv(exec.Command("/sbin/mount.objectivefs", name, dest), v.Env).Run(); err != nil { + log.WithFields(log.Fields{"name": r.Name, "snapshot": i}).Warn("Failed to destroy an expired snapshot.") + } + } + } else { + log.WithFields(log.Fields{"name": r.Name, "Error": err}).Warn("Cannot determine existing snapshot mounts") + } + } else { + log.WithFields(log.Fields{"name": r.Name, "Error": err}).Warn("Cannot parse existing snapshot names") + } + } else { + log.WithFields(log.Fields{"name": r.Name, "Error": err}).Warn("Cannot list existing snapshot names") + } + } else { + log.WithFields(log.Fields{"name": r.Name, "Error": err}).Warn("Cannot determine expected snapshot names") + } + } else { + log.WithFields(log.Fields{"name": r.Name, "Error": err}).Warn("Cannot detect snapshot frequency") + } + + // Cycle periodically + time.Sleep(5 * time.Minute) + } + log.WithFields(log.Fields{"name": r.Name}).Info("Completed snapshot auto-mounting") + }() + } } rt.use[r.ID] = true @@ -381,6 +458,8 @@ func main() { defMountOpt := os.Getenv("OBJECTIVEFS_MOUNT_OPTIONS") + defMountSnapshots := os.Getenv("OBJECTIVEFS_MOUNT_SNAPSHOTS") == "yes" + db, err := bolt.Open("objectivefs.db", 0600, nil) if err != nil { log.Fatal(err) @@ -394,7 +473,7 @@ func main() { return nil }) - d := &ofsDriver{volumedb: db, volumeRt: make(map[string]*ofsVolumeRt), defEnv: defEnv, defMountOpt: defMountOpt} + d := &ofsDriver{volumedb: db, volumeRt: make(map[string]*ofsVolumeRt), defEnv: defEnv, defMountOpt: defMountOpt, defMountSnapshots: defMountSnapshots} h := volume.NewHandler(d) u, _ := user.Lookup("root") gid, _ := strconv.Atoi(u.Gid) @@ -470,3 +549,167 @@ func (p *ofsDriver) removeVolumeInfo(tx *bolt.Tx, volumeName string) error { bucket := tx.Bucket([]byte(volumeBucket)) return bucket.Delete([]byte(volumeName)) } + +/* +func parseSnapshotRules(rulesS string) ([]snapshotRule, error) { + var result []snapshotRule + rules := strings.Split(rulesS, " ") + + for _, i := range rules { + var rule snapshotRule + + countAndRest := strings.SplitN(i, "@", 2) + if len(countAndRest) != 2 || len(countAndRest[1]) < 2 { + return nil, fmt.Errorf("Failed to parse snapshot rule '" + i + "'") + } + + if count, err := strconv.ParseInt(countAndRest[0], 10, 32); err != nil { + return nil, fmt.Errorf("Failed to parse snapshot rule '" + i + "'") + } else { + rule.count = int(count) + } + + period := countAndRest[1] + var multiplier time.Duration + switch period[len(period)-1] { + // see https://objectivefs.com/howto/snapshots + case 'm': + multiplier = time.Minute + case 'h': + multiplier = time.Hour + case 'd': + multiplier = 24 * time.Hour + case 'w': + multiplier = 7 * 24 * time.Hour + case 'n': + multiplier = 4 * 7 * 24 * time.Hour // Simple month (4 weeks) + case 'q': + multiplier = 12 * 7 * 24 * time.Hour // Simple quarter (12 weeks) + case 'y': + multiplier = 48 * 7 * 24 * time.Hour // Simple year (48 weeks) + default: + return nil, fmt.Errorf("Failed to parse snapshot period '" + period + "'") + } + + if periodCount, err := strconv.ParseInt(period[:len(period)-1], 10, 32); err != nil { + return nil, fmt.Errorf("Failed to parse snapshot rule '" + i + "'") + } else { + rule.period = multiplier * time.Duration(periodCount) + } + + result = append(result, rule) + } + + return result, nil +}*/ + +func applyEnv(cmd *exec.Cmd, env []string) *exec.Cmd { + cmd.Env = env + return cmd +} + +func generateSnapshotsFromRulesForNow(rules string) (map[string]bool, error) { + if timesB, err := exec.Command("/sbin/mount.objectivefs", "snapshot", "-vs", rules).Output(); err == nil { + var result map[string]bool + scanner := bufio.NewScanner(strings.NewReader(string(timesB))) + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, "Number of automatic snapshots:") || strings.HasPrefix(line, "Automatic schedule:") { + continue + } + if _, err := time.Parse("2006-01-02T15:04:05Z", line); err != nil { + return nil, err + } else { + result[line] = false + } + } + return result, nil + + } else { + return nil, err + } +} + +func parseExistingSnapshots(data string, expectedPrefix string) (map[string]string, error) { + var result map[string]string + scanner := bufio.NewScanner(strings.NewReader(data)) + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, "NAME") { + // Skip column headers + continue + } + + fields := strings.Fields(line) + name := fields[0] + + uriSchemeAndRest := strings.SplitN(name, "://", 2) + if len(uriSchemeAndRest) != 2 { + return nil, fmt.Errorf("Expected URI scheme in the list of existing snapshosts") + } + + hostAndPath := uriSchemeAndRest[1] + + if hostAndPath == expectedPrefix { + // This is the live filesystem not a snapshot + continue + } + + if !strings.HasPrefix(hostAndPath, expectedPrefix+"@") { + return nil, fmt.Errorf("Unexpected URI in the list of existing snapshosts") + } + + time := hostAndPath[len(expectedPrefix)+1:] + result[time] = name + } + return result, nil +} + +func getMountedSnaphots(baseDir string) (map[string]string, error) { + if entries, err := os.ReadDir(baseDir); err == nil { + return nil, err + } else { + var result map[string]string + for _, i := range entries { + if i.IsDir() { + iPath := filepath.Join(baseDir, i.Name()) + if isObjFs, err := isObjectiveFsMount(iPath); err == nil && isObjFs { + result[i.Name()] = iPath + } + } + } + return result, nil + } +} + +func isObjectiveFsMount(path string) (bool, error) { + mount := exec.Command("df", "--output=target", "-t", "fuse.objectivefs", path) + if data, err := mount.CombinedOutput(); err == nil { + scanner := bufio.NewScanner(strings.NewReader(string(data))) + // On success the first line contains column headers (in our case "Mounted on") + // and the second line contains the nearest mount point on the root path + return scanner.Scan() && scanner.Scan() && scanner.Text() == path, nil + } else { + return false, err + } +} + +func setMinus[TKey comparable, TValue any, TValue2 any](a map[TKey]TValue, b map[TKey]TValue2) map[TKey]TValue { + var result map[TKey]TValue + for i, j := range a { + if _, contains := b[i]; contains { + result[i] = j + } + } + return result +} + +func setIntersect[TKey comparable, TValue any, TValue2 any](a map[TKey]TValue, b map[TKey]TValue2) map[TKey]TValue { + var result map[TKey]TValue + for i, j := range a { + if _, contains := b[i]; contains { + result[i] = j + } + } + return result +}