feat: add locally syncing and watchdog
This commit is contained in:
@@ -9,21 +9,6 @@ import (
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// 0: unspecified
|
||||
// 1: video
|
||||
// 2: presentation
|
||||
// 3: internet URL
|
||||
func categorizeFilemode(ext string) database.MediaType {
|
||||
switch ext {
|
||||
case ".mp4", ".mov", ".avi", ".mkv", ".webm", ".m4a":
|
||||
return database.Video
|
||||
case ".pptx", ".ppt", ".key", ".odp":
|
||||
return database.Presentation
|
||||
default:
|
||||
return database.Unspecified
|
||||
}
|
||||
}
|
||||
|
||||
func spawnApiRoutes(api *gin.RouterGroup /* env runtime.Environment,*/, db *gorm.DB) {
|
||||
// prefix: api
|
||||
// Display the information on what is going on at the moment
|
||||
|
||||
+15
-36
@@ -1,7 +1,6 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"eden-server/internal/crypto"
|
||||
"eden-server/internal/database"
|
||||
"eden-server/internal/utility"
|
||||
"errors"
|
||||
@@ -10,7 +9,6 @@ import (
|
||||
"path/filepath"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/uuid"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
@@ -35,57 +33,38 @@ func spawnFileRoutes(file *gin.RouterGroup, env utility.Environment, db *gorm.DB
|
||||
return
|
||||
}
|
||||
|
||||
checksum, err := crypto.CalculateHashFromRequest(f)
|
||||
readerStream, err := f.Open()
|
||||
if err != nil {
|
||||
slog.Error("failed to calculate hash of file at given path", "error", err)
|
||||
slog.Error("failed to open uploaded file in memory")
|
||||
}
|
||||
defer readerStream.Close()
|
||||
|
||||
fileData, err := database.BuildFileRecord(readerStream, f.Filename, env.ContentDirectory)
|
||||
if err != nil {
|
||||
slog.Error("failed to enroll file to the database", "error", err)
|
||||
c.JSON(http.StatusInternalServerError, RespObj{
|
||||
Msg: ieMes,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
e := filepath.Ext(f.Filename)
|
||||
m := categorizeFilemode(e)
|
||||
if m == database.Unspecified {
|
||||
slog.Debug("discarding file since its filetype is unsupported")
|
||||
c.JSON(http.StatusUnsupportedMediaType, RespObj{
|
||||
Msg: "unsupported filetype",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
safeName := uuid.New().String() + "_" + string(m) + e
|
||||
destPath := filepath.Join(env.DataDirectory, "content", safeName)
|
||||
|
||||
fData := database.File{
|
||||
MediaType: m,
|
||||
MetaName: f.Filename,
|
||||
FileName: safeName,
|
||||
FilePath: destPath,
|
||||
Checksum: checksum,
|
||||
}
|
||||
|
||||
// first register the file to the database
|
||||
// error out if something bad happens (duplicate, failing db, etc)
|
||||
err = database.RegisterFile(db, fData)
|
||||
if err != nil {
|
||||
if err := database.RegisterFile(db, fileData); err != nil {
|
||||
if errors.Is(err, gorm.ErrDuplicatedKey) {
|
||||
slog.Debug("discarding file since its a duplicate", "error", err)
|
||||
c.JSON(http.StatusConflict, RespObj{
|
||||
Msg: "file is a duplicate",
|
||||
})
|
||||
return
|
||||
} else {
|
||||
slog.Error("failed to insert filedata to the database", "error", err)
|
||||
c.JSON(http.StatusInternalServerError, RespObj{
|
||||
Msg: ieMes,
|
||||
})
|
||||
}
|
||||
|
||||
slog.Error("discarding file since an error occured", "error", err)
|
||||
c.JSON(http.StatusInternalServerError, RespObj{
|
||||
Msg: ieMes,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// save to filesystem after everything has given a green light
|
||||
if err := c.SaveUploadedFile(f, destPath); err != nil {
|
||||
if err := c.SaveUploadedFile(f, fileData.FilePath); err != nil {
|
||||
slog.Error("failed to receive the file over http:", "error", err)
|
||||
c.JSON(http.StatusInternalServerError, RespObj{
|
||||
Msg: ieMes,
|
||||
|
||||
@@ -1,26 +0,0 @@
|
||||
package crypto
|
||||
|
||||
import (
|
||||
"crypto/sha512"
|
||||
"encoding/hex"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
)
|
||||
|
||||
func CalculateHashFromRequest(fileHeader *multipart.FileHeader) (string, error) {
|
||||
src, err := fileHeader.Open()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer src.Close()
|
||||
|
||||
h := sha512.New()
|
||||
if _, err := io.Copy(h, src); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// return the sha checksum in hex
|
||||
return hex.EncodeToString(h.Sum(nil)), nil
|
||||
// alternatively return in base64
|
||||
//return base64.StdEncoding.EncodeToString(h.Sum(nil)), nil
|
||||
}
|
||||
@@ -2,7 +2,6 @@ package database
|
||||
|
||||
import (
|
||||
"eden-server/internal/utility"
|
||||
"log/slog"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
@@ -24,7 +23,6 @@ func KickoffDatabase(workDir string) (*gorm.DB, error) {
|
||||
}
|
||||
|
||||
// try to use GORM automigrate if the schema changes
|
||||
slog.Info("performing migration")
|
||||
if err := db.AutoMigrate(
|
||||
&State{},
|
||||
&Device{},
|
||||
@@ -36,7 +34,7 @@ func KickoffDatabase(workDir string) (*gorm.DB, error) {
|
||||
// create the first row if it does not exist yet
|
||||
if err := db.FirstOrCreate(&State{}, State{
|
||||
ID: 0,
|
||||
MediaType: "unspecified",
|
||||
MediaType: Unspecified,
|
||||
}).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -45,23 +43,12 @@ func KickoffDatabase(workDir string) (*gorm.DB, error) {
|
||||
}
|
||||
|
||||
func KickoffDatabaseWatchdog(env utility.Environment, db *gorm.DB) {
|
||||
timeInterval := time.Second * time.Duration(env.WatchInterval)
|
||||
timeInterval := time.Second * time.Duration(env.WatchdogInterval)
|
||||
ticker := time.NewTicker(timeInterval)
|
||||
|
||||
go func() {
|
||||
defer ticker.Stop()
|
||||
|
||||
/*
|
||||
// Possible future mechanism to stop the watchdog
|
||||
// must be inside a non-conditional for loop
|
||||
select {
|
||||
case <-ticker.C: // ticker event
|
||||
watchdog(env.DataDirectory, db)
|
||||
case <-watchdogStop:
|
||||
return
|
||||
}
|
||||
*/
|
||||
|
||||
// run the watchdog function once to see if all is well.
|
||||
watchdog(env, db)
|
||||
// then defer to a decoupled/disowned golang goroutine
|
||||
|
||||
+47
-21
@@ -2,49 +2,75 @@ package database
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"gorm.io/datatypes"
|
||||
)
|
||||
|
||||
type MediaType string
|
||||
|
||||
const (
|
||||
Unspecified MediaType = "unspecified"
|
||||
Video MediaType = "video"
|
||||
Presentation MediaType = "presentation"
|
||||
Internet MediaType = "internet"
|
||||
Unspecified MediaType = "unspecified"
|
||||
)
|
||||
|
||||
type Timestamps struct {
|
||||
CreatedAt time.Time `gorm:"not null;"`
|
||||
UpdatedAt time.Time `gorm:"not null;"`
|
||||
ExpiresAt time.Time
|
||||
}
|
||||
|
||||
type State struct {
|
||||
ID int `gorm:"primaryKey"`
|
||||
ID int `gorm:"primaryKey;not null;"`
|
||||
// unspecified
|
||||
// video
|
||||
// presentation
|
||||
// internet URL
|
||||
MediaType MediaType `gorm:"type:varchar(20);not null"` // Must specify what kind of file it is
|
||||
Targets datatypes.JSON
|
||||
Location string // Must be the location where the file is downloadable on the API
|
||||
UpdatedAt time.Time
|
||||
// Must be target list who are compelled to listen to the command
|
||||
// can be none when there is no targets specified (init stage)
|
||||
Targets []string `gorm:"type:json"`
|
||||
// Must be the location where the file is downloadable on the API
|
||||
// can be none when there is no media specified (init stage)
|
||||
Location string
|
||||
Timestamps
|
||||
}
|
||||
|
||||
type Key struct {
|
||||
ID int `gorm:"primaryKey;not null;"`
|
||||
MetaName string
|
||||
KeyName string `gorm:"not null;"`
|
||||
// We don't store the key itself, we hash the key
|
||||
KeyHash string `gorm:"not null;"`
|
||||
// we're cooking without pepper
|
||||
KeySalt string `gorm:"not null;"`
|
||||
CreatedAt time.Time `gorm:"not null;"`
|
||||
Timestamps
|
||||
}
|
||||
|
||||
type Device struct {
|
||||
ID int `gorm:"primaryKey"`
|
||||
ID int `gorm:"primaryKey;not null;"`
|
||||
// Device type is meant as a field where can be specified what type of device this is
|
||||
// eg Raspberry Pi, PC, things like that
|
||||
DeviceType string
|
||||
Hostname string
|
||||
RemoteAddress string
|
||||
Alive bool
|
||||
Compliant bool
|
||||
CreatedAt time.Time
|
||||
UpdatedAt time.Time
|
||||
Hostname string `gorm:"not null;"`
|
||||
RemoteAddress string `gorm:"not null;"`
|
||||
Alive bool `gorm:"not null;"`
|
||||
Compliant bool `gorm:"not null;"`
|
||||
Timestamps
|
||||
}
|
||||
|
||||
type File struct {
|
||||
ID int `gorm:"primaryKey"`
|
||||
ID int `gorm:"primaryKey;not null;"`
|
||||
// unspecified
|
||||
// video
|
||||
// presentation
|
||||
// internet URL
|
||||
MediaType MediaType `gorm:"type:varchar(20);not null;"`
|
||||
MetaName string
|
||||
FileName string
|
||||
FilePath string
|
||||
Checksum string `gorm:"uniqueIndex"` // hex encoded sha512 checksum
|
||||
CreatedAt time.Time
|
||||
UpdatedAt time.Time
|
||||
// the name given by the user
|
||||
MetaName string
|
||||
FileName string `gorm:"not null;"`
|
||||
FilePath string `gorm:"not null;"`
|
||||
// hex encoded sha512 checksum
|
||||
Checksum string `gorm:"uniqueIndex;not null;"`
|
||||
Timestamps
|
||||
}
|
||||
|
||||
@@ -1,13 +1,74 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"eden-server/internal/utility"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// 0: unspecified
|
||||
// 1: video
|
||||
// 2: presentation
|
||||
// 3: internet URL
|
||||
func CategorizeMediaType(ext string) (MediaType, bool) {
|
||||
|
||||
switch ext {
|
||||
case ".mp4", ".mov", ".avi", ".mkv", ".webm", ".m4a":
|
||||
return Video, true
|
||||
case ".pptx", ".ppt", ".key", ".odp":
|
||||
return Presentation, true
|
||||
default:
|
||||
slog.Debug("marking file as invalid undefined extension")
|
||||
return "", false
|
||||
}
|
||||
}
|
||||
|
||||
func GenerateSafeName(category MediaType, ext string) string {
|
||||
return uuid.New().String() + "_" + string(category) + ext
|
||||
}
|
||||
|
||||
// it has been made more general for DRY purposes
|
||||
// this function should only be called after manually checking the filetype
|
||||
func BuildFileRecord(r io.Reader, origName string, contentDirectory string) (File, error) {
|
||||
ext := filepath.Ext(origName)
|
||||
category, ok := CategorizeMediaType(ext)
|
||||
if !ok {
|
||||
return File{}, fmt.Errorf("unsupported filetype")
|
||||
}
|
||||
|
||||
checksum, err := utility.HashReader(r)
|
||||
if err != nil {
|
||||
slog.Error("failed to calculate hash of file at given path", "error", err)
|
||||
return File{}, err
|
||||
}
|
||||
|
||||
safeName := GenerateSafeName(category, ext)
|
||||
destPath := filepath.Join(contentDirectory, safeName)
|
||||
|
||||
fData := File{
|
||||
MediaType: category,
|
||||
MetaName: origName,
|
||||
FileName: safeName,
|
||||
FilePath: destPath,
|
||||
Checksum: checksum,
|
||||
}
|
||||
|
||||
return fData, nil
|
||||
}
|
||||
|
||||
func GetState(db *gorm.DB) (State, error) {
|
||||
var state State
|
||||
|
||||
return state, db.First(&state).Error
|
||||
if err := db.First(&state).Error; err != nil {
|
||||
return State{}, err
|
||||
}
|
||||
|
||||
return state, nil
|
||||
}
|
||||
|
||||
func GetFiles(db *gorm.DB) ([]File, error) {
|
||||
|
||||
@@ -2,6 +2,8 @@ package database
|
||||
|
||||
import (
|
||||
"eden-server/internal/utility"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@@ -9,19 +11,11 @@ import (
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
func watchdog(env utility.Environment, db *gorm.DB) {
|
||||
slog.Info("performing the watchdog cycle")
|
||||
|
||||
func filesystemGather(env utility.Environment) (map[string]struct{}, error) {
|
||||
fsFiles, err := os.ReadDir(env.ContentDirectory)
|
||||
if err != nil {
|
||||
slog.Error("failed to read the content directory on the filesystem", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
dbFiles, err := GetFiles(db)
|
||||
if err != nil {
|
||||
slog.Error("failed to retrieve the files indexed from the database", "error", err)
|
||||
return
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// generate a set of filesystem contents
|
||||
@@ -32,41 +26,104 @@ func watchdog(env utility.Environment, db *gorm.DB) {
|
||||
fsSet[fullPath] = struct{}{}
|
||||
}
|
||||
|
||||
return fsSet, nil
|
||||
}
|
||||
|
||||
func databaseGather(db *gorm.DB) (map[string]File, error) {
|
||||
dbFiles, err := GetFiles(db)
|
||||
if err != nil {
|
||||
slog.Error("failed to retrieve the files indexed from the database", "error", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// generate a set of Database contents
|
||||
dbSet := make(map[string]File) // cool name for the files that are going to be deregistered from the database
|
||||
for _, f := range dbFiles {
|
||||
dbSet[f.FilePath] = f
|
||||
}
|
||||
|
||||
return dbSet, nil
|
||||
}
|
||||
|
||||
func watchdog(env utility.Environment, db *gorm.DB) {
|
||||
slog.Info("performing the watchdog cycle")
|
||||
|
||||
fsSet, err := filesystemGather(env)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
dbSet, err := databaseGather(db)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// FS -> DB
|
||||
// check for orphaned filesystem files
|
||||
var fsPurgeScroll []string
|
||||
var fsOrphans []string
|
||||
for path := range fsSet {
|
||||
if _, exists := dbSet[path]; !exists {
|
||||
fsPurgeScroll = append(fsPurgeScroll, path)
|
||||
fsOrphans = append(fsOrphans, path)
|
||||
}
|
||||
}
|
||||
|
||||
if len(fsOrphans) > 0 {
|
||||
slog.Info("filesystem orphans detected, engaging flow")
|
||||
//filepath is stored in the slice, the filename or file object, see above
|
||||
|
||||
for _, fp := range fsOrphans {
|
||||
// this switch is guarded by the environment.go its check making sure its one of the three
|
||||
// the following logic is used to actually perform the sync modes, removal, enrollment or ignore
|
||||
switch env.WatchdogSyncMode {
|
||||
case "sync":
|
||||
readerStream, err := os.Open(fp)
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
defer readerStream.Close()
|
||||
|
||||
fileData, err := BuildFileRecord(readerStream, filepath.Base(fp), env.ContentDirectory)
|
||||
if err != nil {
|
||||
slog.Error("failed to enroll local file into the database", "error", err)
|
||||
}
|
||||
|
||||
fmt.Println(fileData)
|
||||
|
||||
if err := RegisterFile(db, fileData); err != nil {
|
||||
if errors.Is(err, gorm.ErrDuplicatedKey) {
|
||||
slog.Debug("discarding file since its a duplicate", "error", err)
|
||||
} else {
|
||||
slog.Error("failed to insert filedata to the database", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := os.Rename(fp, fileData.FilePath); err != nil {
|
||||
slog.Error("failed to move the locally inserted file", "error", err)
|
||||
}
|
||||
|
||||
case "strict":
|
||||
err := utility.RemoveFile(fp)
|
||||
if err != nil {
|
||||
slog.Warn("failed to remove local file from the filesystem", "error", err)
|
||||
}
|
||||
case "dry":
|
||||
slog.Debug("dry mode enabled, not purging", "filepath", fp)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// DB -> FS
|
||||
// check stale database records
|
||||
var dbPurgeScroll []File
|
||||
var dbdbOrphans []File
|
||||
for path, f := range dbSet {
|
||||
if _, exists := fsSet[path]; !exists {
|
||||
dbPurgeScroll = append(dbPurgeScroll, f)
|
||||
dbdbOrphans = append(dbdbOrphans, f)
|
||||
}
|
||||
}
|
||||
|
||||
if len(fsPurgeScroll) > 0 {
|
||||
slog.Info("filesystem purge scroll is populated, engaging purge")
|
||||
//filepath is stored in the slice, the filename or file object, see above
|
||||
for _, fp := range fsPurgeScroll {
|
||||
utility.RemoveFile(fp)
|
||||
}
|
||||
}
|
||||
|
||||
if len(dbPurgeScroll) > 0 {
|
||||
slog.Info("database purge scroll is populated, engaging purge")
|
||||
for _, f := range dbPurgeScroll {
|
||||
if len(dbdbOrphans) > 0 {
|
||||
slog.Info("database orphans detected, engaging flow")
|
||||
for _, f := range dbdbOrphans {
|
||||
DeregisterFile(db, f)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
package utility
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
@@ -13,14 +15,24 @@ type Environment struct {
|
||||
ContentDirectory string
|
||||
Hostname string
|
||||
Port int
|
||||
WatchInterval int
|
||||
WatchdogInterval int
|
||||
WatchdogSyncMode string
|
||||
}
|
||||
|
||||
var (
|
||||
validSyncModes = []string{
|
||||
"sync",
|
||||
"strict",
|
||||
"dry",
|
||||
}
|
||||
)
|
||||
|
||||
// part of environment checking
|
||||
func safeStringGrab(key, fallback string) string {
|
||||
if v, ok := os.LookupEnv(key); ok {
|
||||
return v
|
||||
}
|
||||
slog.Debug("using fallback", "key", key, "fallback", fallback)
|
||||
return fallback
|
||||
}
|
||||
|
||||
@@ -30,6 +42,17 @@ func safeIntGrab(key string, fallback int) int {
|
||||
return i
|
||||
}
|
||||
}
|
||||
slog.Debug("using fallback", "key", key, "fallback", fallback)
|
||||
return fallback
|
||||
}
|
||||
|
||||
func safeSyncModeGrab(key, fallback string) string {
|
||||
if v, ok := os.LookupEnv(key); ok {
|
||||
if slices.Contains(validSyncModes, v) {
|
||||
return v
|
||||
}
|
||||
}
|
||||
slog.Debug("using fallback", "key", key, "fallback", fallback)
|
||||
return fallback
|
||||
}
|
||||
|
||||
@@ -48,8 +71,11 @@ func GrabEnvironment() Environment {
|
||||
DataDirectory: safeStringGrab("DATA_DIR", fbBase),
|
||||
ContentDirectory: safeStringGrab("CONTENT_DIR", fbContent),
|
||||
Hostname: safeStringGrab("HOSTNAME", "0.0.0.0"),
|
||||
|
||||
Port: safeIntGrab("PORT", 8080),
|
||||
WatchInterval: safeIntGrab("WATCHDOG_INTERVAL", 60),
|
||||
Port: safeIntGrab("PORT", 8080),
|
||||
WatchdogInterval: safeIntGrab("WATCHDOG_INTERVAL", 60),
|
||||
// sync: sync local files to the database, for example when you want to allow local inserting files which then get added to the database
|
||||
// strict: make the database leading, this is when you only allow API uploads to be registered, and remove orphaned filesystem files
|
||||
// dry: do nothing
|
||||
WatchdogSyncMode: safeStringGrab("WATCHDOG_SYNC_MODE", "strict"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
package utility
|
||||
|
||||
import (
|
||||
"crypto/sha512"
|
||||
"encoding/hex"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"os"
|
||||
)
|
||||
|
||||
func HashReader(r io.Reader) (string, error) {
|
||||
h := sha512.New()
|
||||
if _, err := io.Copy(h, r); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// return the sha checksum in hex
|
||||
return hex.EncodeToString(h.Sum(nil)), nil
|
||||
// alternatively return in base64
|
||||
//return base64.StdEncoding.EncodeToString(h.Sum(nil)), nil
|
||||
}
|
||||
|
||||
func HashUpload(fileHeader *multipart.FileHeader) (string, error) {
|
||||
stream, err := fileHeader.Open()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer stream.Close()
|
||||
|
||||
return HashReader(stream)
|
||||
}
|
||||
|
||||
func HashFile(path string) (string, error) {
|
||||
file, err := os.Open(path)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
return HashReader(file)
|
||||
}
|
||||
Reference in New Issue
Block a user