MaxMemory parameter. Prints progress on lengthy start-up.
This commit is contained in:
125
main.go
125
main.go
@@ -7,6 +7,7 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
@@ -22,11 +23,12 @@ const metaFileName = "xl.meta"
|
||||
const maxMetaLockSize = 8 * 1024
|
||||
|
||||
var (
|
||||
logLevelArg = flag.String("log", "", "log level")
|
||||
volumesArg = flag.String("volumes", "", "minio volumes using expansion notation (ie. /mnt/disk{1...3})")
|
||||
delayArg = flag.String("delay", "10m", "file refresh delay")
|
||||
pageSize = os.Getpagesize()
|
||||
junkCounter byte = 0 // to prevent any compiler optimizations
|
||||
logLevelArg = flag.String("log", "", "log level")
|
||||
volumesArg = flag.String("volumes", "", "minio volumes using expansion notation (ie. /mnt/disk{1...3})")
|
||||
delayArg = flag.String("delay", "10m", "file refresh delay")
|
||||
maxMemoryArg = flag.Uint("maxMemory", 2*1024, "maximum memory in MB can be protected as resident")
|
||||
pageSize = os.Getpagesize()
|
||||
junkCounter byte = 0 // to prevent any compiler optimizations
|
||||
)
|
||||
|
||||
var version = "1.0"
|
||||
@@ -109,6 +111,14 @@ func main() {
|
||||
}
|
||||
|
||||
// Try to raise resource limits
|
||||
var maxMemory uint64
|
||||
maxMemoryEnv := os.Getenv("MAX_MEMORY")
|
||||
if maxMemoryEnvI, err := strconv.ParseUint(maxMemoryEnv, 10, 32); err != nil && maxMemoryEnvI != 0 {
|
||||
maxMemory = maxMemoryEnvI * 1024 * 1024
|
||||
} else {
|
||||
maxMemory = uint64(*maxMemoryArg) * 1024 * 1024
|
||||
}
|
||||
|
||||
var limitFileNo unix.Rlimit
|
||||
var limitMemLock unix.Rlimit
|
||||
hasFileNo := unix.Getrlimit(unix.RLIMIT_NOFILE, &limitFileNo) == nil
|
||||
@@ -116,7 +126,7 @@ func main() {
|
||||
log.WithFields(log.Fields{
|
||||
"LimitsError": !hasFileNo || !hasMemLock,
|
||||
"FileLimit": limitFileNo.Cur,
|
||||
"LockLimit": limitMemLock.Cur,
|
||||
"MemoryLimit": limitMemLock.Cur,
|
||||
}).Info("OS limits")
|
||||
|
||||
if hasFileNo && limitFileNo.Cur < limitFileNo.Max {
|
||||
@@ -128,18 +138,37 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
if hasMemLock && (limitMemLock.Cur < 8*1024*1024*1024) {
|
||||
limitMemLock.Max = 8 * 1024 * 1024 * 1024
|
||||
if hasMemLock && (limitMemLock.Cur < maxMemory) {
|
||||
limitMemLock.Max = maxMemory
|
||||
limitMemLock.Cur = limitMemLock.Max
|
||||
if err := unix.Setrlimit(unix.RLIMIT_MEMLOCK, &limitMemLock); err == nil {
|
||||
log.WithFields(log.Fields{
|
||||
"Value": limitMemLock.Cur,
|
||||
}).Info("Raised memory-lock limit")
|
||||
} else {
|
||||
log.WithFields(log.Fields{
|
||||
"Value": limitMemLock.Cur,
|
||||
}).Warn("Failed to raise memory-lock limit.")
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("Consult sysctl vm.max_map_count value. It must be larger than the number of objects found on all disks combined.")
|
||||
|
||||
//
|
||||
// Initial execution with progress indication as it is slow
|
||||
//
|
||||
log.Info("Looking for metadata files")
|
||||
stats, err := executeCycle(volumes, true, lockStats{})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
log.WithFields(log.Fields{
|
||||
"FileCount": stats.objLockedCount,
|
||||
"LockedSizeMB": (stats.objAddedLockedSize + 1024*1024 - 1) / (1024 * 1024),
|
||||
}).Info("Found metadata files to be soon locked into RAM")
|
||||
|
||||
log.Info("Locking all metadata files into RAM")
|
||||
stats, err := executeCycle(volumes)
|
||||
stats, err = executeCycle(volumes, false, stats)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
@@ -159,7 +188,7 @@ func main() {
|
||||
|
||||
log.Debug("Re-locking metadata files")
|
||||
|
||||
stats, err := executeCycle(volumes)
|
||||
stats, err := executeCycle(volumes, false, lockStats{})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
@@ -173,8 +202,9 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func executeCycle(volumes []volume) (lockStats, error) {
|
||||
func executeCycle(volumes []volume, onlyStats bool, prevStats lockStats) (lockStats, error) {
|
||||
var waitGroup sync.WaitGroup
|
||||
var waitCounter int32 = int32(len(volumes))
|
||||
var mux sync.Mutex
|
||||
waitGroup.Add(len(volumes))
|
||||
|
||||
@@ -184,24 +214,38 @@ func executeCycle(volumes []volume) (lockStats, error) {
|
||||
// Execute each volume in parallel as they shall be separate devices
|
||||
// so data reads do not contend with each other.
|
||||
for i := range volumes {
|
||||
go func(volume *volume) {
|
||||
go func(volume *volume, onlyStats bool, stats *lockStats) {
|
||||
defer waitGroup.Done()
|
||||
statsVol, err := processVolume(volume)
|
||||
defer atomic.AddInt32(&waitCounter, -1)
|
||||
err := processVolume(volume, onlyStats, stats)
|
||||
if err != nil {
|
||||
mux.Lock()
|
||||
processErrors = append(processErrors, err)
|
||||
mux.Unlock()
|
||||
} else {
|
||||
atomic.AddInt32(&stats.objLockedCount, statsVol.objLockedCount)
|
||||
atomic.AddInt32(&stats.objNotLockedCount, statsVol.objNotLockedCount)
|
||||
atomic.AddInt32(&stats.objAddedCount, statsVol.objAddedCount)
|
||||
atomic.AddInt32(&stats.objRemovedCount, statsVol.objRemovedCount)
|
||||
atomic.AddInt64(&stats.objAddedLockedSize, statsVol.objAddedLockedSize)
|
||||
}
|
||||
}(&volumes[i])
|
||||
time.Sleep(500)
|
||||
}(&volumes[i], onlyStats, &stats)
|
||||
}
|
||||
|
||||
waitGroup.Wait()
|
||||
// Report progress if possible
|
||||
if prevStats.objLockedCount != 0 {
|
||||
const cyclesBeforeNextPrint = 5 * 60
|
||||
lastPrint := 0
|
||||
lastProgress := 0
|
||||
for waitCounter > 0 {
|
||||
currentProgress := int((stats.objLockedCount * 100) / prevStats.objLockedCount)
|
||||
if lastProgress != currentProgress && lastPrint > cyclesBeforeNextPrint {
|
||||
log.Info("..." + strconv.FormatInt(int64(currentProgress), 10) + " %")
|
||||
lastProgress = currentProgress
|
||||
lastPrint = 0
|
||||
}
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
lastPrint++
|
||||
}
|
||||
|
||||
} else {
|
||||
waitGroup.Wait()
|
||||
}
|
||||
|
||||
if processErrors != nil {
|
||||
return lockStats{}, errors.Join(processErrors...)
|
||||
@@ -210,20 +254,16 @@ func executeCycle(volumes []volume) (lockStats, error) {
|
||||
}
|
||||
}
|
||||
|
||||
func processVolume(volume *volume) (lockStats, error) {
|
||||
func processVolume(volume *volume, onlyStats bool, stats *lockStats) error {
|
||||
dir, err := os.Open(volume.path)
|
||||
if err != nil {
|
||||
return lockStats{}, err
|
||||
return err
|
||||
}
|
||||
|
||||
if volume.objectMetas == nil {
|
||||
volume.objectMetas = make(map[uint64]*metaFile)
|
||||
}
|
||||
|
||||
var objNewCount int32 = 0
|
||||
var objExistingCount int32 = 0
|
||||
var objFailCount int32 = 0
|
||||
var objNewLockedSize int64 = 0
|
||||
var stack stack[dirSearch]
|
||||
|
||||
stack = stack.Push(dirSearch{dir: dir, path: volume.path})
|
||||
@@ -239,7 +279,7 @@ func processVolume(volume *volume) (lockStats, error) {
|
||||
dir.dir.Close()
|
||||
continue
|
||||
} else if err != nil {
|
||||
return lockStats{}, err
|
||||
return err
|
||||
}
|
||||
dir.entriesBatch = names
|
||||
}
|
||||
@@ -255,19 +295,25 @@ func processVolume(volume *volume) (lockStats, error) {
|
||||
// Note: We do not descend further into the directory structure
|
||||
metaInfoEx, ok := metaInfo.Sys().(*syscall.Stat_t)
|
||||
if !ok {
|
||||
return lockStats{}, fmt.Errorf("must be running on a POSIX OS")
|
||||
return fmt.Errorf("must be running on a POSIX OS")
|
||||
}
|
||||
|
||||
if meta, ok := volume.objectMetas[metaInfoEx.Ino]; ok {
|
||||
objExistingCount++
|
||||
atomic.AddInt32(&stats.objLockedCount, 1)
|
||||
meta.visited = true
|
||||
} else if onlyStats {
|
||||
// Collect statistics only
|
||||
atomic.AddInt32(&stats.objLockedCount, 1)
|
||||
atomic.AddInt32(&stats.objAddedCount, 1)
|
||||
atomic.AddInt64(&stats.objAddedLockedSize, (metaInfo.Size()+int64(pageSize)-1)/int64(pageSize))
|
||||
} else if meta, lockedBytes, err := LockMetaFile(metaPath, metaInfo.Size()); err == nil {
|
||||
meta.visited = true
|
||||
volume.objectMetas[metaInfoEx.Ino] = meta
|
||||
objNewCount++
|
||||
objNewLockedSize += lockedBytes
|
||||
atomic.AddInt32(&stats.objLockedCount, 1)
|
||||
atomic.AddInt32(&stats.objAddedCount, 1)
|
||||
atomic.AddInt64(&stats.objAddedLockedSize, lockedBytes)
|
||||
} else {
|
||||
objFailCount++
|
||||
atomic.AddInt32(&stats.objNotLockedCount, 1)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -305,25 +351,18 @@ func processVolume(volume *volume) (lockStats, error) {
|
||||
}
|
||||
|
||||
// Remove all not visited items
|
||||
var objDeleteCount int32 = 0
|
||||
for i, j := range volume.objectMetas {
|
||||
if !j.visited {
|
||||
UnlockMetaFile(j)
|
||||
delete(volume.objectMetas, i)
|
||||
objDeleteCount++
|
||||
atomic.AddInt32(&stats.objRemovedCount, 1)
|
||||
} else {
|
||||
// Prepare for the next run
|
||||
j.visited = false
|
||||
}
|
||||
}
|
||||
|
||||
return lockStats{
|
||||
objLockedCount: objNewCount + objExistingCount,
|
||||
objAddedCount: objNewCount,
|
||||
objNotLockedCount: objFailCount,
|
||||
objAddedLockedSize: objNewLockedSize,
|
||||
objRemovedCount: objDeleteCount,
|
||||
}, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func LockMetaFile(path string, size int64) (*metaFile, int64, error) {
|
||||
@@ -359,7 +398,7 @@ func LockMetaFile(path string, size int64) (*metaFile, int64, error) {
|
||||
|
||||
return &metaFile{
|
||||
mappedData: data,
|
||||
}, sizeToLock, nil
|
||||
}, int64(dataPageCount * pageSize), nil
|
||||
}
|
||||
|
||||
func UnlockMetaFile(meta *metaFile) {
|
||||
|
||||
Reference in New Issue
Block a user