Initial commit for flat directory storage.
This commit is contained in:
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
test/
|
||||
minio-hdd
|
||||
10
go.mod
Normal file
10
go.mod
Normal file
@@ -0,0 +1,10 @@
|
||||
module git.ivasoft.cz/sw/minio-hdd
|
||||
|
||||
go 1.21
|
||||
|
||||
require (
|
||||
github.com/minio/pkg/v2 v2.0.9
|
||||
github.com/sirupsen/logrus v1.9.0
|
||||
)
|
||||
|
||||
require golang.org/x/sys v0.15.0
|
||||
18
go.sum
Normal file
18
go.sum
Normal file
@@ -0,0 +1,18 @@
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/minio/pkg/v2 v2.0.9 h1:JzqkNtqLekhdv1B/cdC8WiN97Qb/qW/AW03VNuaTIR4=
|
||||
github.com/minio/pkg/v2 v2.0.9/go.mod h1:yayUTo82b0RK+e97hGb1naC787mOtUEyDs3SIcwSyHI=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
|
||||
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
|
||||
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
307
main.go
Normal file
307
main.go
Normal file
@@ -0,0 +1,307 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/minio/pkg/v2/ellipses"
|
||||
"golang.org/x/sys/unix"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const metaFileName = "xl.meta"
|
||||
const maxMetaLockSize = 8 * 1024
|
||||
|
||||
var (
|
||||
logLevelArg = flag.String("log", "", "log level")
|
||||
volumesArg = flag.String("volumes", "", "minio volumes using expansion notation (ie. /mnt/disk{1...3})")
|
||||
delayArg = flag.String("delay", "10m", "file refresh delay")
|
||||
pageSize = os.Getpagesize()
|
||||
junkCounter byte = 0 // to prevent any compiler optimizations
|
||||
)
|
||||
|
||||
var version = "1.0"
|
||||
|
||||
type volume struct {
|
||||
path string
|
||||
objectMetas map[uint64]*metaFile // objects keyed by their metadata file inode
|
||||
}
|
||||
|
||||
type metaFile struct {
|
||||
visited bool // indicates a visited node in a search
|
||||
mappedData []byte
|
||||
}
|
||||
|
||||
type lockStats struct {
|
||||
objLockedCount int32
|
||||
objNotLockedCount int32
|
||||
objAddedCount int32
|
||||
objRemovedCount int32
|
||||
objAddedLockedSize int64
|
||||
}
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
|
||||
// Logging
|
||||
if *logLevelArg == "" {
|
||||
if *logLevelArg = os.Getenv("LOG_LEVEL"); *logLevelArg == "" {
|
||||
*logLevelArg = "info"
|
||||
}
|
||||
}
|
||||
|
||||
logLevel, err := log.ParseLevel(*logLevelArg)
|
||||
if err != nil {
|
||||
log.WithError(err).Fatal("Failed to parse log level")
|
||||
}
|
||||
log.SetLevel(logLevel)
|
||||
|
||||
log.Info("Starting Minio Hard Disk Drive optimizer, version " + version)
|
||||
|
||||
// Disk volumes
|
||||
if *volumesArg == "" {
|
||||
if *volumesArg = os.Getenv("MINIO_VOLUMES"); *volumesArg == "" {
|
||||
//*volumesArg = "./test"
|
||||
log.Fatal("Missing info about volumes. Use -volumes or MINIO_VOLUMES.")
|
||||
}
|
||||
}
|
||||
|
||||
var volumes []volume
|
||||
if !ellipses.HasEllipses(*volumesArg) {
|
||||
volumes = append(volumes, volume{path: *volumesArg})
|
||||
} else {
|
||||
patterns, err := ellipses.FindEllipsesPatterns(*volumesArg)
|
||||
if err != nil {
|
||||
log.Fatal(fmt.Errorf("invalid expansion notation: %w", err))
|
||||
}
|
||||
|
||||
for _, pattern := range patterns {
|
||||
for _, lbls := range pattern.Expand() {
|
||||
volumes = append(volumes, volume{path: lbls})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Delay
|
||||
if *delayArg == "" {
|
||||
if *delayArg = os.Getenv("CYCLE_DELAY"); *delayArg == "" {
|
||||
*delayArg = "5m"
|
||||
}
|
||||
}
|
||||
delay, err := time.ParseDuration(*delayArg)
|
||||
if err != nil {
|
||||
log.Fatal(fmt.Errorf("invalid cycle delay argument: %w", err))
|
||||
}
|
||||
|
||||
log.Info("Locking all metadata files into RAM")
|
||||
stats, err := executeCycle(volumes)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
log.WithFields(log.Fields{
|
||||
"LockedCount": stats.objLockedCount,
|
||||
"FailCount": stats.objNotLockedCount,
|
||||
"LockedSizeMB": (stats.objAddedLockedSize + 1024*1024 - 1) / (1024 * 1024),
|
||||
}).Info("Locked metadata files into RAM")
|
||||
|
||||
//
|
||||
// Execute
|
||||
//
|
||||
defer log.WithFields(log.Fields{"Junk": junkCounter}).Info("Stopping")
|
||||
log.WithFields(log.Fields{"Delay": delay.String()}).Info("Will re-lock metadata files after each delay until killed")
|
||||
for {
|
||||
time.Sleep(delay)
|
||||
|
||||
log.Debug("Re-locking metadata files")
|
||||
|
||||
stats, err := executeCycle(volumes)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"AddedCount": stats.objAddedCount,
|
||||
"RemovedCount": stats.objRemovedCount,
|
||||
"FailCount": stats.objNotLockedCount,
|
||||
"LockedSizeMB": (stats.objAddedLockedSize + 1024*1024 - 1) / (1024 * 1024),
|
||||
}).Debug("Locked new and removed stale metadata files.")
|
||||
}
|
||||
}
|
||||
|
||||
func executeCycle(volumes []volume) (lockStats, error) {
|
||||
var waitGroup sync.WaitGroup
|
||||
var mux sync.Mutex
|
||||
waitGroup.Add(len(volumes))
|
||||
|
||||
var stats lockStats
|
||||
var processErrors []error
|
||||
|
||||
// Execute each volume in parallel as they shall be separate devices
|
||||
// so data reads do not contend with each other.
|
||||
for i := range volumes {
|
||||
go func(volume *volume) {
|
||||
defer waitGroup.Done()
|
||||
statsVol, err := processVolume(volume)
|
||||
if err != nil {
|
||||
mux.Lock()
|
||||
processErrors = append(processErrors, err)
|
||||
mux.Unlock()
|
||||
} else {
|
||||
atomic.AddInt32(&stats.objLockedCount, statsVol.objLockedCount)
|
||||
atomic.AddInt32(&stats.objNotLockedCount, statsVol.objNotLockedCount)
|
||||
atomic.AddInt32(&stats.objAddedCount, statsVol.objAddedCount)
|
||||
atomic.AddInt32(&stats.objRemovedCount, statsVol.objRemovedCount)
|
||||
atomic.AddInt64(&stats.objAddedLockedSize, statsVol.objAddedLockedSize)
|
||||
}
|
||||
}(&volumes[i])
|
||||
}
|
||||
|
||||
waitGroup.Wait()
|
||||
|
||||
if processErrors != nil {
|
||||
return lockStats{}, errors.Join(processErrors...)
|
||||
} else {
|
||||
return stats, nil
|
||||
}
|
||||
}
|
||||
|
||||
func processVolume(volume *volume) (lockStats, error) {
|
||||
dir, err := os.Open(volume.path)
|
||||
if err != nil {
|
||||
return lockStats{}, err
|
||||
}
|
||||
|
||||
if volume.objectMetas == nil {
|
||||
volume.objectMetas = make(map[uint64]*metaFile)
|
||||
}
|
||||
|
||||
var objNewCount int32 = 0
|
||||
var objExistingCount int32 = 0
|
||||
var objFailCount int32 = 0
|
||||
var objNewLockedSize int64 = 0
|
||||
for {
|
||||
bucketNames, err := dir.Readdirnames(1024)
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return lockStats{}, err
|
||||
}
|
||||
|
||||
for _, i := range bucketNames {
|
||||
bucket, err := os.Open(filepath.Join(volume.path, i))
|
||||
if err != nil {
|
||||
objFailCount++
|
||||
continue
|
||||
}
|
||||
|
||||
for {
|
||||
objectNames, err := bucket.Readdirnames(1024)
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
objFailCount++
|
||||
continue
|
||||
}
|
||||
|
||||
for _, j := range objectNames {
|
||||
metaPath := filepath.Join(volume.path, i, j, metaFileName)
|
||||
metaInfo, err := os.Stat(metaPath)
|
||||
if err != nil {
|
||||
objFailCount++
|
||||
continue
|
||||
}
|
||||
metaInfoEx, ok := metaInfo.Sys().(*syscall.Stat_t)
|
||||
if !ok {
|
||||
return lockStats{}, fmt.Errorf("must be running on a POSIX OS")
|
||||
}
|
||||
|
||||
if meta, ok := volume.objectMetas[metaInfoEx.Ino]; ok {
|
||||
objExistingCount++
|
||||
meta.visited = true
|
||||
} else if meta, lockedBytes, err := LockMetaFile(metaPath, metaInfo.Size()); err == nil {
|
||||
meta.visited = true
|
||||
volume.objectMetas[metaInfoEx.Ino] = meta
|
||||
objNewCount++
|
||||
objNewLockedSize += lockedBytes
|
||||
} else {
|
||||
objFailCount++
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bucket.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Remove all not visited items
|
||||
var objDeleteCount int32 = 0
|
||||
for i, j := range volume.objectMetas {
|
||||
if !j.visited {
|
||||
UnlockMetaFile(j)
|
||||
delete(volume.objectMetas, i)
|
||||
objDeleteCount++
|
||||
} else {
|
||||
// Prepare for the next run
|
||||
j.visited = false
|
||||
}
|
||||
}
|
||||
|
||||
return lockStats{
|
||||
objLockedCount: objNewCount + objExistingCount,
|
||||
objAddedCount: objNewCount,
|
||||
objNotLockedCount: objFailCount,
|
||||
objAddedLockedSize: objNewLockedSize,
|
||||
objRemovedCount: objDeleteCount,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func LockMetaFile(path string, size int64) (*metaFile, int64, error) {
|
||||
file, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
var sizeToLock int64
|
||||
if size < maxMetaLockSize {
|
||||
sizeToLock = size
|
||||
} else {
|
||||
sizeToLock = maxMetaLockSize
|
||||
}
|
||||
|
||||
data, err := unix.Mmap(int(file.Fd()), 0, int(sizeToLock), syscall.PROT_READ, syscall.MAP_SHARED)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// Touch each memory page to cause page-faults and bring the
|
||||
// data into memory if not already there.
|
||||
dataPageCount := (len(data) + pageSize - 1) / pageSize
|
||||
for i := 0; i < dataPageCount; i++ {
|
||||
junkCounter += data[i*pageSize]
|
||||
}
|
||||
|
||||
if err := unix.Mlock(data); err != nil {
|
||||
unix.Munmap(data)
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
return &metaFile{
|
||||
mappedData: data,
|
||||
}, sizeToLock, nil
|
||||
}
|
||||
|
||||
func UnlockMetaFile(meta *metaFile) {
|
||||
unix.Munlock(meta.mappedData)
|
||||
unix.Munmap(meta.mappedData)
|
||||
}
|
||||
Reference in New Issue
Block a user