feat: refactor watchdog

This commit is contained in:
2026-04-28 17:18:14 +02:00
parent f589ae4faf
commit 0a98ee455f
11 changed files with 261 additions and 178 deletions
+1 -1
View File
@@ -15,7 +15,7 @@ import (
// All error messages from slog must have an error field with the golang error
// See bottom the the kickoff function for details
func KickoffApi(logger *slog.Logger, env bootstrap.Environment, db *gorm.DB) {
func Kickoff(logger *slog.Logger, env bootstrap.Environment, db *gorm.DB) {
gin.SetMode(gin.ReleaseMode)
// For a nice looking logger:
-16
View File
@@ -8,22 +8,6 @@ import (
flag "github.com/spf13/pflag"
)
type Environment struct {
Version string `env:"VERSION" default:"0.0.1" flag:"version" usage:"option to specify a custom version"`
Codename string `env:"CODENAME" default:"Magical Anomaly" flag:"codename" usage:"option to change the release codename"`
LogLevel string `env:"LOG_LEVEL" default:"debug" flag:"log-level" usage:"option to change the loglevel"`
DataDirectory string `env:"DATA_DIR" default:"./data" flag:"data-dir" usage:"option to specify where the state data gets stored"`
ContentDirectory string `env:"CONTENT_DIR" default:"./content" flag:"content-dir" usage:"option to specify where the content gets stored"`
Hostname string `env:"HOSTNAME" default:"0.0.0.0" flag:"hostname" usage:"option to specify the address/hostname to bind the api server to"`
Port int `env:"PORT" default:"8080" flag:"port" usage:"option to specify the port to bind the api server to"`
Authentication bool `env:"AUTHENTICATION" default:"true" flag:"authentication" usage:"option to disable authentication"`
Watchdog bool `env:"WATCHDOG" default:"true" flag:"watchdog" usage:"option to disable watchdog"`
WatchdogInterval int `env:"WATCHDOG_INTERVAL" default:"60" flag:"watchdog-interval" usage:"option to specify the interval in second(s) on which watchdog runs"`
WatchdogSyncMode string `env:"WATCHDOG_SYNC_MODE" default:"strict" flag:"watchdog-mode" usage:"option to specify the mode watchdog will run with: strict|sync|dry"`
}
func loadFromEnv(env *Environment) {
v := reflect.ValueOf(env).Elem()
t := v.Type()
+18
View File
@@ -0,0 +1,18 @@
package bootstrap
type Environment struct {
Version string `env:"VERSION" default:"0.0.1" flag:"version" usage:"option to specify a custom version"`
Codename string `env:"CODENAME" default:"Magical Anomaly" flag:"codename" usage:"option to change the release codename"`
LogLevel string `env:"LOG_LEVEL" default:"debug" flag:"log-level" usage:"option to change the loglevel"`
DataDirectory string `env:"DATA_DIR" default:"./data" flag:"data-dir" usage:"option to specify where the state data gets stored"`
ContentDirectory string `env:"CONTENT_DIR" default:"./content" flag:"content-dir" usage:"option to specify where the content gets stored"`
Hostname string `env:"HOSTNAME" default:"0.0.0.0" flag:"hostname" usage:"option to specify the address/hostname to bind the api server to"`
Port int `env:"PORT" default:"8080" flag:"port" usage:"option to specify the port to bind the api server to"`
Authentication bool `env:"AUTHENTICATION" default:"true" flag:"authentication" usage:"option to disable authentication"`
AdminKey string `env:"ADMIN_KEY" default:"" flag:"admin-key" usage:"option to specify a custom admin top-level authentication key"`
Watchdog bool `env:"WATCHDOG" default:"true" flag:"watchdog" usage:"option to disable watchdog"`
WatchdogInterval int `env:"WATCHDOG_INTERVAL" default:"60" flag:"watchdog-interval" usage:"option to specify the interval in second(s) on which watchdog runs"`
WatchdogSyncMode string `env:"WATCHDOG_SYNC_MODE" default:"strict" flag:"watchdog-mode" usage:"option to specify the mode watchdog will run with: strict|sync|dry"`
}
+1 -20
View File
@@ -1,10 +1,8 @@
package database
import (
"orbits-server/internal/server/bootstrap"
"orbits-server/internal/shared/utility"
"path/filepath"
"time"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
@@ -13,7 +11,7 @@ import (
//var watchdogStop = make(chan struct{})
func KickoffDatabase(workDir string) (*gorm.DB, error) {
func Kickoff(workDir string) (*gorm.DB, error) {
dbLoc := filepath.Join(workDir, "station.db")
db, err := gorm.Open(sqlite.Open(dbLoc), &gorm.Config{
Logger: logger.Discard, // disable gorm logging since its not slog (yet)
@@ -45,20 +43,3 @@ func KickoffDatabase(workDir string) (*gorm.DB, error) {
return db, nil
}
func KickoffDatabaseWatchdog(env bootstrap.Environment, db *gorm.DB) {
timeInterval := time.Second * time.Duration(env.WatchdogInterval)
ticker := time.NewTicker(timeInterval)
go func() {
defer ticker.Stop()
// run the watchdog function once to see if all is well.
watchdog(env, db)
// then defer to a decoupled/disowned golang goroutine
for range ticker.C {
watchdog(env, db)
}
}()
}
+60 -2
View File
@@ -4,12 +4,60 @@ import (
"gorm.io/gorm"
)
/*
State functions
*/
func LatestState(db *gorm.DB) (Command, error) {
var state Command
err := db.Last(&state).Error
return state, err
}
/*
Key functions
*/
func CountKeys(db *gorm.DB) (int64, error) {
var count int64
err := db.Model(&AccessKey{}).Count(&count).Error
return count, err
}
func ListKeys(db *gorm.DB) ([]AccessKey, error) {
var keys []AccessKey
err := db.Find(&keys).Error
return keys, err
}
func CreateKey(db *gorm.DB, k AccessKey) error {
return db.Create(&k).Error
}
func DeleteKeyByID(db *gorm.DB, id int) error {
res := db.Delete(&AccessKey{}, id)
if res.Error != nil {
return res.Error
}
if res.RowsAffected == 0 {
return gorm.ErrRecordNotFound
}
return nil
}
/*
File functions
*/
func CountFiles(db *gorm.DB) (int64, error) {
var count int64
err := db.Model(&File{}).Count(&count).Error
return count, err
}
func ListFiles(db *gorm.DB) ([]File, error) {
var files []File
err := db.Find(&files).Error
@@ -26,6 +74,16 @@ func CreateFile(db *gorm.DB, f File) error {
return db.Create(&f).Error
}
func DeleteFile(db *gorm.DB, f File) error {
return db.Delete(&f).Error
func DeleteFileByID(db *gorm.DB, id int) error {
res := db.Delete(&File{}, id)
if res.Error != nil {
return res.Error
}
if res.RowsAffected == 0 {
return gorm.ErrRecordNotFound
}
return nil
}
@@ -33,7 +33,7 @@ type AccessKey struct {
MetaName string
KeyName string `gorm:"not null;"`
// We don't store the key itself, we hash the key
KeyHash string `gorm:"not null;"`
KeyHash string `gorm:"uniqueIndex;not null;"`
// we're cooking without pepper
Timestamps
}
-134
View File
@@ -1,134 +0,0 @@
package database
import (
"errors"
"log/slog"
"orbits-server/internal/server/bootstrap"
"orbits-server/internal/shared/utility"
"os"
"path/filepath"
"gorm.io/gorm"
)
func filesystemGather(env bootstrap.Environment) (map[string]struct{}, error) {
fsFiles, err := os.ReadDir(env.ContentDirectory)
if err != nil {
slog.Error("failed to read the content directory on the filesystem", "error", err)
return nil, err
}
// generate a set of filesystem contents
fsSet := make(map[string]struct{}) // cool name for the files that are (now) marked for annihilation
for _, f := range fsFiles {
// absolute path creation
fullPath := filepath.Join(env.ContentDirectory, f.Name())
fsSet[fullPath] = struct{}{}
}
return fsSet, nil
}
func databaseGather(db *gorm.DB) (map[string]File, error) {
dbFiles, err := ListFiles(db)
if err != nil {
slog.Error("failed to retrieve the files indexed from the database", "error", err)
return nil, err
}
// generate a set of Database contents
dbSet := make(map[string]File) // cool name for the files that are going to be deregistered from the database
for _, f := range dbFiles {
dbSet[f.FilePath] = f
}
return dbSet, nil
}
func watchdog(env bootstrap.Environment, db *gorm.DB) {
slog.Debug("performing the watchdog cycle")
fsSet, err := filesystemGather(env)
if err != nil {
return
}
dbSet, err := databaseGather(db)
if err != nil {
return
}
// FS -> DB
// check for orphaned filesystem files
var fsOrphans []string
for path := range fsSet {
if _, exists := dbSet[path]; !exists {
fsOrphans = append(fsOrphans, path)
}
}
if len(fsOrphans) > 0 {
slog.Info("filesystem orphans detected, engaging flow")
//filepath is stored in the slice, the filename or file object, see above
for _, fp := range fsOrphans {
// this switch is guarded by the environment.go its check making sure its one of the three
// the following logic is used to actually perform the sync modes, removal, enrollment or ignore
switch env.WatchdogSyncMode {
case "sync":
readerStream, err := os.Open(fp)
if err != nil {
slog.Error("failed to a reader stream")
continue
}
defer readerStream.Close()
fileData, err := BuildFileRecord(readerStream, filepath.Base(fp), env.ContentDirectory)
if err != nil {
slog.Error("failed to enroll local file into the database", "error", err)
continue
}
if err := CreateFile(db, fileData); err != nil {
if errors.Is(err, gorm.ErrDuplicatedKey) {
slog.Debug("discarding file since its a duplicate", "error", err)
} else {
slog.Error("failed to insert filedata to the database", "error", err)
}
continue
}
// to fully finalize the enrollment process, we rename the locally inserted file to a unique filename
// this is to make all files comply, wether uploaded via the api or locally inserted with the filesystem
if err := os.Rename(fp, fileData.FilePath); err != nil {
slog.Error("failed to move the locally inserted file", "error", err)
}
case "strict":
err := utility.RemoveFile(fp)
if err != nil {
slog.Error("failed to remove local file from the filesystem", "error", err)
}
case "dry":
slog.Debug("dry mode enabled, not purging", "filepath", fp)
default:
slog.Warn("unknown watchdog mode", "mode", env.WatchdogSyncMode)
}
}
}
// DB -> FS
// check stale database records
var dbdbOrphans []File
for path, f := range dbSet {
if _, exists := fsSet[path]; !exists {
dbdbOrphans = append(dbdbOrphans, f)
}
}
if len(dbdbOrphans) > 0 {
slog.Info("database orphans detected, engaging flow")
for _, f := range dbdbOrphans {
DeleteFile(db, f)
}
}
}
+57
View File
@@ -0,0 +1,57 @@
package watchdog
import (
"log/slog"
"orbits-server/internal/server/bootstrap"
"orbits-server/internal/server/database"
"orbits-server/internal/shared/utility"
"os"
"path/filepath"
"gorm.io/gorm"
)
func applyFS(env bootstrap.Environment, db *gorm.DB, fsOrphans []string) {
for _, fp := range fsOrphans {
switch env.WatchdogSyncMode {
case "sync":
f, err := os.Open(fp)
if err != nil {
continue
}
func() {
defer f.Close()
fileData, err := database.BuildFileRecord(
f,
filepath.Base(fp),
env.ContentDirectory,
)
if err != nil {
return
}
database.CreateFile(db, fileData)
os.Rename(fp, fileData.FilePath)
}()
case "strict":
utility.RemoveFile(fp)
case "dry":
slog.Debug("dry mode", "file", fp)
}
}
}
func applyDB(db *gorm.DB, dbOrphans []database.File) {
for _, f := range dbOrphans {
err := database.DeleteFileByID(db, f.ID)
if err != nil {
slog.Error("failed to apply database measures", "error", err)
}
}
}
+57
View File
@@ -0,0 +1,57 @@
package watchdog
import (
"orbits-server/internal/server/bootstrap"
"orbits-server/internal/server/database"
"os"
"path/filepath"
"gorm.io/gorm"
)
func reconcile(fs map[string]struct{}, db map[string]database.File) Result {
r := Result{}
for path := range fs {
if _, ok := db[path]; !ok {
r.FSOrphans = append(r.FSOrphans, path)
}
}
for path, f := range db {
if _, ok := fs[path]; !ok {
r.DBOrphans = append(r.DBOrphans, f)
}
}
return r
}
func scanFS(env bootstrap.Environment) (map[string]struct{}, error) {
fsFiles, err := os.ReadDir(env.ContentDirectory)
if err != nil {
return nil, err
}
res := make(map[string]struct{})
for _, f := range fsFiles {
full := filepath.Join(env.ContentDirectory, f.Name())
res[full] = struct{}{}
}
return res, nil
}
func scanDB(db *gorm.DB) (map[string]database.File, error) {
files, err := database.ListFiles(db)
if err != nil {
return nil, err
}
res := make(map[string]database.File)
for _, f := range files {
res[f.FilePath] = f
}
return res, nil
}
+61
View File
@@ -0,0 +1,61 @@
package watchdog
import (
"log/slog"
"orbits-server/internal/server/bootstrap"
"orbits-server/internal/server/database"
"time"
"gorm.io/gorm"
)
type State struct {
FS map[string]struct{}
DB map[string]database.File
}
type Result struct {
FSOrphans []string
DBOrphans []database.File
}
func Kickoff(env bootstrap.Environment, db *gorm.DB) {
interval := time.Second * time.Duration(env.WatchdogInterval)
ticker := time.NewTicker(interval)
go func() {
defer ticker.Stop()
run(env, db)
for range ticker.C {
run(env, db)
}
}()
}
func run(env bootstrap.Environment, db *gorm.DB) {
slog.Debug("watchdog cycle start")
fsState, err := scanFS(env)
if err != nil {
return
}
dbState, err := scanDB(db)
if err != nil {
return
}
result := reconcile(fsState, dbState)
if len(result.FSOrphans) > 0 {
slog.Info("filesystem orphans detected")
applyFS(env, db, result.FSOrphans)
}
if len(result.DBOrphans) > 0 {
slog.Info("database orphans detected")
applyDB(db, result.DBOrphans)
}
}