package main import ( "compress/gzip" "encoding/json" "fmt" "os" "path/filepath" "sort" "time" ) // Spooler handles offline event storage type Spooler struct { dir string maxBytes int64 } // NewSpooler creates a new spooler func NewSpooler(dir string, maxBytes int64) (*Spooler, error) { if err := os.MkdirAll(dir, 0755); err != nil { return nil, err } return &Spooler{dir: dir, maxBytes: maxBytes}, nil } // Save saves a batch to the spool func (s *Spooler) Save(batch *EventBatch, prefix string) error { // Generate filename ts := time.Now().UnixNano() filename := fmt.Sprintf("%s_%d_%d.json.gz", prefix, ts, os.Getpid()) path := filepath.Join(s.dir, filename) // Create gzipped file f, err := os.Create(path) if err != nil { return err } defer f.Close() gw := gzip.NewWriter(f) if err := json.NewEncoder(gw).Encode(batch); err != nil { gw.Close() os.Remove(path) return err } if err := gw.Close(); err != nil { os.Remove(path) return err } // Trim if needed s.trim() return nil } // PopOldest returns and removes the oldest spooled file func (s *Spooler) PopOldest() (*EventBatch, error) { files, err := s.listFiles() if err != nil || len(files) == 0 { return nil, err } path := files[0] batch, err := s.loadFile(path) if err != nil { // Remove corrupted file os.Remove(path) return nil, err } os.Remove(path) return batch, nil } // Size returns the total size of spooled files func (s *Spooler) Size() int64 { files, err := s.listFiles() if err != nil { return 0 } var total int64 for _, path := range files { if info, err := os.Stat(path); err == nil { total += info.Size() } } return total } // Count returns the number of spooled files func (s *Spooler) Count() int { files, _ := s.listFiles() return len(files) } func (s *Spooler) listFiles() ([]string, error) { entries, err := os.ReadDir(s.dir) if err != nil { return nil, err } var files []string for _, entry := range entries { if !entry.IsDir() && filepath.Ext(entry.Name()) == ".gz" { files = append(files, filepath.Join(s.dir, entry.Name())) } } // Sort by modification time (oldest first) sort.Slice(files, func(i, j int) bool { infoI, _ := os.Stat(files[i]) infoJ, _ := os.Stat(files[j]) if infoI == nil || infoJ == nil { return false } return infoI.ModTime().Before(infoJ.ModTime()) }) return files, nil } func (s *Spooler) loadFile(path string) (*EventBatch, error) { f, err := os.Open(path) if err != nil { return nil, err } defer f.Close() gr, err := gzip.NewReader(f) if err != nil { return nil, err } defer gr.Close() var batch EventBatch if err := json.NewDecoder(gr).Decode(&batch); err != nil { return nil, err } return &batch, nil } func (s *Spooler) trim() { for s.Size() > s.maxBytes { files, err := s.listFiles() if err != nil || len(files) == 0 { break } os.Remove(files[0]) } }