// Command minio-hdd keeps MinIO per-object metadata files (xl.meta) resident
// in RAM: it walks the configured volumes, memory-maps the beginning of every
// metadata file it finds, and mlocks those pages so that metadata lookups do
// not have to seek on spinning disks.
package main

import (
	"errors"
	"flag"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/minio/pkg/v2/ellipses"
	log "github.com/sirupsen/logrus"
	"golang.org/x/sys/unix"
)
// metaFileName is the name of MinIO's per-object metadata file.
const metaFileName = "xl.meta"

// maxMetaLockSize caps how many bytes of each metadata file are mapped and
// locked into RAM.
const maxMetaLockSize = 8 * 1024
var (
	logLevelArg  = flag.String("log", "", "log level")
	volumesArg   = flag.String("volumes", "", "MinIO volumes using expansion notation (e.g. /mnt/disk{1...3})")
	delayArg     = flag.String("delay", "", "file refresh delay")
	maxMemoryArg = flag.Uint("maxMemory", 0, "maximum memory in MB that may be locked resident")
	pageSize     = os.Getpagesize()

	junkCounter byte = 0 // accumulates bytes read when touching pages, to keep the compiler from optimizing the reads away
)
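
// Example invocation (binary name assumed from the repository layout,
// minio-hdd/main.go):
//
//	minio-hdd -volumes '/mnt/disk{1...4}' -delay 10m -maxMemory 2048
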
var version = "1.1"
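
// volume is one disk mount point together with the metadata files currently
// locked from it.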
type volume struct {
	path        string
	objectMetas map[uint64]*metaFile // objects keyed by their metadata file inode
}
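
// metaFile is one memory-mapped and mlocked xl.meta file.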
type metaFile struct {
	visited    bool // indicates a visited node in a search
	mappedData []byte
}
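
// dirSearch is the saved state of a directory being traversed: its open
// handle and the batch of entry names not yet processed.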
type dirSearch struct {
	dir          *os.File
	path         string
	entriesBatch []string
}
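
// lockStats collects per-cycle counters; fields are updated atomically from
// the per-volume goroutines.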
type lockStats struct {
	objLockedCount     int32
	objNotLockedCount  int32
	objAddedCount      int32
	objRemovedCount    int32
	objAddedLockedSize int64
}
func main() {
	flag.Parse()

	// Logging
	if *logLevelArg == "" {
		if *logLevelArg = os.Getenv("LOG_LEVEL"); *logLevelArg == "" {
			*logLevelArg = "info"
		}
	}
	logLevel, err := log.ParseLevel(*logLevelArg)
	if err != nil {
		log.WithError(err).Fatal("Failed to parse log level")
	}
	log.SetLevel(logLevel)
	log.Info("Starting MinIO Hard Disk Drive optimizer, version " + version)

	// Disk volumes
	if *volumesArg == "" {
		if *volumesArg = os.Getenv("MINIO_VOLUMES"); *volumesArg == "" {
			log.Fatal("Missing info about volumes. Use -volumes or MINIO_VOLUMES.")
		}
	}
	var volumes []volume
	if !ellipses.HasEllipses(*volumesArg) {
		volumes = append(volumes, volume{path: *volumesArg})
	} else {
		patterns, err := ellipses.FindEllipsesPatterns(*volumesArg)
		if err != nil {
			log.Fatal(fmt.Errorf("invalid expansion notation: %w", err))
		}
		for _, pattern := range patterns {
			for _, lbls := range pattern.Expand() {
				volumes = append(volumes, volume{path: lbls})
			}
		}
	}

	// Delay between refresh cycles
	if *delayArg == "" {
		if *delayArg = os.Getenv("CYCLE_DELAY"); *delayArg == "" {
			*delayArg = "10m"
		}
	}
	delay, err := time.ParseDuration(*delayArg)
	if err != nil {
		log.Fatal(fmt.Errorf("invalid cycle delay argument: %w", err))
	}
	// Try to raise resource limits
	var maxMemory uint64
	if *maxMemoryArg == 0 {
		maxMemoryEnv := os.Getenv("MAX_MEMORY")
		if maxMemoryEnvI, err := strconv.ParseUint(maxMemoryEnv, 10, 32); err != nil || maxMemoryEnvI == 0 {
			maxMemory = 2 * 1024 * 1024 * 1024 // default: 2 GiB
		} else {
			maxMemory = maxMemoryEnvI * 1024 * 1024
		}
	} else {
		maxMemory = uint64(*maxMemoryArg) * 1024 * 1024
	}
	var limitFileNo unix.Rlimit
	var limitMemLock unix.Rlimit
	hasFileNo := unix.Getrlimit(unix.RLIMIT_NOFILE, &limitFileNo) == nil
	hasMemLock := unix.Getrlimit(unix.RLIMIT_MEMLOCK, &limitMemLock) == nil
	log.WithFields(log.Fields{
		"LimitsError": !hasFileNo || !hasMemLock,
		"FileLimit":   limitFileNo.Cur,
		"MemoryLimit": limitMemLock.Cur,
	}).Info("OS limits")
	if hasFileNo && limitFileNo.Cur < limitFileNo.Max {
		limitFileNo.Cur = limitFileNo.Max
		if err := unix.Setrlimit(unix.RLIMIT_NOFILE, &limitFileNo); err == nil {
			log.WithFields(log.Fields{
				"Value": limitFileNo.Cur,
			}).Info("Raised file limit")
		}
	}
	if hasMemLock && limitMemLock.Cur < maxMemory {
		limitMemLock.Max = maxMemory
		limitMemLock.Cur = limitMemLock.Max
		if err := unix.Setrlimit(unix.RLIMIT_MEMLOCK, &limitMemLock); err == nil {
			log.WithFields(log.Fields{
				"Value": limitMemLock.Cur,
			}).Info("Raised memory-lock limit")
		} else {
			log.WithFields(log.Fields{
				"Value": limitMemLock.Cur,
			}).Warn("Failed to raise memory-lock limit.")
		}
	}
	log.Info("Check the sysctl vm.max_map_count value. It must be larger than the number of objects found on all disks combined.")
	//
	// Initial cycle with progress indication, as it is slow
	//
	log.Info("Looking for metadata files")
	stats, err := executeCycle(volumes, true, lockStats{})
	if err != nil {
		log.Fatal(err)
	}
	log.WithFields(log.Fields{
		"FileCount":    stats.objLockedCount,
		"LockedSizeMB": (stats.objAddedLockedSize + 1024*1024 - 1) / (1024 * 1024),
	}).Info("Found metadata files to lock into RAM")
	log.Info("Locking all metadata files into RAM")
	stats, err = executeCycle(volumes, false, stats)
	if err != nil {
		log.Fatal(err)
	}
	log.WithFields(log.Fields{
		"LockedCount":  stats.objLockedCount,
		"FailCount":    stats.objNotLockedCount,
		"LockedSizeMB": (stats.objAddedLockedSize + 1024*1024 - 1) / (1024 * 1024),
	}).Info("Locked metadata files into RAM")
	//
	// Refresh loop
	//
	defer log.WithFields(log.Fields{"Junk": junkCounter}).Info("Stopping")
	log.WithFields(log.Fields{"Delay": delay.String()}).Info("Will re-lock metadata files after each delay until killed")
	for {
		time.Sleep(delay)
		log.Debug("Re-locking metadata files")
		stats, err := executeCycle(volumes, false, lockStats{})
		if err != nil {
			log.Fatal(err)
		}
		log.WithFields(log.Fields{
			"AddedCount":        stats.objAddedCount,
			"RemovedCount":      stats.objRemovedCount,
			"FailCount":         stats.objNotLockedCount,
			"NewlyLockedSizeMB": (stats.objAddedLockedSize + 1024*1024 - 1) / (1024 * 1024),
		}).Debug("Locked new and removed stale metadata files.")
	}
}
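
// executeCycle scans all volumes in parallel. With onlyStats set it only
// counts and sizes the metadata files; otherwise it locks newly found files
// and unlocks stale ones. A non-zero prevStats.objLockedCount enables
// periodic progress logging, using the previous count as the expected total.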
func executeCycle(volumes []volume, onlyStats bool, prevStats lockStats) (lockStats, error) {
	var waitGroup sync.WaitGroup
	waitCounter := int32(len(volumes))
	var mux sync.Mutex
	waitGroup.Add(len(volumes))
	var stats lockStats
	var processErrors []error
	// Process each volume in parallel: they should be separate devices,
	// so data reads do not contend with each other.
	for i := range volumes {
		go func(volume *volume, onlyStats bool, stats *lockStats) {
			defer waitGroup.Done()
			defer atomic.AddInt32(&waitCounter, -1)
			err := processVolume(volume, onlyStats, stats)
			if err != nil {
				mux.Lock()
				processErrors = append(processErrors, err)
				mux.Unlock()
			}
		}(&volumes[i], onlyStats, &stats)
	}
	// Report progress if a previous count is available to compare against
	if prevStats.objLockedCount != 0 {
		const cyclesBeforeNextPrint = 5 * 60 // at 200 ms per cycle: at most one line per minute
		lastPrint := 0
		lastProgress := 0
		for atomic.LoadInt32(&waitCounter) > 0 {
			currentProgress := int(int64(atomic.LoadInt32(&stats.objLockedCount)) * 100 / int64(prevStats.objLockedCount))
			if lastProgress != currentProgress && lastPrint > cyclesBeforeNextPrint {
				log.Info("..." + strconv.FormatInt(int64(currentProgress), 10) + " %")
				lastProgress = currentProgress
				lastPrint = 0
			}
			time.Sleep(200 * time.Millisecond)
			lastPrint++
		}
	}
	waitGroup.Wait()
	if processErrors != nil {
		return lockStats{}, errors.Join(processErrors...)
	}
	return stats, nil
}
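
// processVolume walks one volume's directory tree iteratively using an
// explicit stack, marks metadata files that still exist, locks newly found
// ones (unless onlyStats is set), and unlocks files that have disappeared
// since the previous cycle.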
func processVolume(volume *volume, onlyStats bool, stats *lockStats) error {
	dir, err := os.Open(volume.path)
	if err != nil {
		return err
	}
	if volume.objectMetas == nil {
		volume.objectMetas = make(map[uint64]*metaFile)
	}
	var stack stack[dirSearch]
	stack = stack.Push(dirSearch{dir: dir, path: volume.path})
	for !stack.IsEmpty() {
		var dir dirSearch
		stack, dir = stack.Pop()
		if len(dir.entriesBatch) == 0 {
			// Get the next batch of directory entries
			names, err := dir.dir.Readdirnames(1024)
			if err == io.EOF {
				dir.dir.Close()
				continue
			} else if err != nil {
				log.WithFields(log.Fields{"Directory": dir.path, "Error": err}).Warn("Failed to enumerate directory. Skipping.")
				dir.dir.Close()
				continue
			}
			dir.entriesBatch = names
		}
		batchDone := true
		for i, name := range dir.entriesBatch {
			// Objects are directories that contain an xl.meta file.
			// PERF: The most probable case is an object.
			metaPath := filepath.Join(dir.path, name, metaFileName)
			metaInfo, err := os.Stat(metaPath)
			if err == nil {
				// We found an object.
				// Note: We do not descend further into the directory structure.
				metaInfoEx, ok := metaInfo.Sys().(*syscall.Stat_t)
				if !ok {
					return fmt.Errorf("must be running on a POSIX OS")
				}
				if meta, ok := volume.objectMetas[metaInfoEx.Ino]; ok {
					atomic.AddInt32(&stats.objLockedCount, 1)
					meta.visited = true
				} else if onlyStats {
					// Collect statistics only
					atomic.AddInt32(&stats.objLockedCount, 1)
					atomic.AddInt32(&stats.objAddedCount, 1)
					var sizeToLock int64
					if metaInfo.Size() < maxMetaLockSize {
						sizeToLock = metaInfo.Size()
					} else {
						sizeToLock = maxMetaLockSize
					}
					// Account for whole pages, as mmap and mlock operate on pages
					atomic.AddInt64(&stats.objAddedLockedSize, (sizeToLock+int64(pageSize)-1)/int64(pageSize)*int64(pageSize))
				} else if meta, lockedBytes, err := LockMetaFile(metaPath, metaInfo.Size()); err == nil {
					meta.visited = true
					volume.objectMetas[metaInfoEx.Ino] = meta
					atomic.AddInt32(&stats.objLockedCount, 1)
					atomic.AddInt32(&stats.objAddedCount, 1)
					atomic.AddInt64(&stats.objAddedLockedSize, lockedBytes)
				} else {
					atomic.AddInt32(&stats.objNotLockedCount, 1)
					continue
				}
			} else {
				// Descend into subdirectories and ignore regular files
				entryPath := filepath.Join(dir.path, name)
				entryInfo, err := os.Stat(entryPath)
				if err != nil {
					// Race condition: the file/directory disappeared
					continue
				}
				if entryInfo.IsDir() {
					subdir, err := os.Open(entryPath)
					if err != nil {
						// Race condition: the directory disappeared, or no permission
						continue
					}
					// Save the remaining entries and descend into the subdirectory
					dir.entriesBatch = dir.entriesBatch[i+1:]
					stack = stack.Push(dir)
					stack = stack.Push(dirSearch{dir: subdir, path: entryPath})
					batchDone = false
					break
				}
			}
		}
		if batchDone {
			// Continue with another batch
			dir.entriesBatch = nil
			stack = stack.Push(dir)
		}
	}
	// Unlock and remove all entries not visited in this cycle
	for ino, meta := range volume.objectMetas {
		if !meta.visited {
			UnlockMetaFile(meta)
			delete(volume.objectMetas, ino)
			atomic.AddInt32(&stats.objRemovedCount, 1)
		} else {
			// Reset the mark for the next cycle
			meta.visited = false
		}
	}
	return nil
}
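
// LockMetaFile maps up to maxMetaLockSize bytes of the file at path, touches
// every page to fault it in, and mlocks the mapping. It returns the mapping
// and the number of bytes locked, rounded up to whole pages. Note that an
// empty file cannot be mapped and is reported as an error.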
func LockMetaFile(path string, size int64) (*metaFile, int64, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, 0, err
	}
	defer file.Close()
	var sizeToLock int64
	if size < maxMetaLockSize {
		sizeToLock = size
	} else {
		sizeToLock = maxMetaLockSize
	}
	data, err := unix.Mmap(int(file.Fd()), 0, int(sizeToLock), unix.PROT_READ, unix.MAP_SHARED)
	if err != nil {
		return nil, 0, err
	}
	// Touch each memory page to cause page faults and bring the
	// data into memory if it is not already there.
	dataPageCount := (len(data) + pageSize - 1) / pageSize
	for i := 0; i < dataPageCount; i++ {
		junkCounter += data[i*pageSize]
	}
	if err := unix.Mlock(data); err != nil {
		unix.Munmap(data)
		return nil, 0, err
	}
	return &metaFile{
		mappedData: data,
	}, int64(dataPageCount * pageSize), nil
}
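
// UnlockMetaFile unlocks and unmaps a previously locked metadata file.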
func UnlockMetaFile(meta *metaFile) {
	unix.Munlock(meta.mappedData)
	unix.Munmap(meta.mappedData)
}
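
// stack is a simple generic LIFO; Push and Pop return the updated stack
// rather than mutating the receiver.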
type stack[T any] struct {
	data []T
}

func (s stack[T]) Push(v T) stack[T] {
	return stack[T]{data: append(s.data, v)}
}

func (s stack[T]) Pop() (stack[T], T) {
	l := len(s.data)
	if l == 0 {
		panic("Pop from an empty stack")
	}
	return stack[T]{data: s.data[:l-1]}, s.data[l-1]
}

func (s stack[T]) IsEmpty() bool {
	return len(s.data) == 0
}