package main

import (
	"compress/gzip"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"time"
)

// Spooler handles offline event storage.
type Spooler struct {
	dir      string
	maxBytes int64
}

// NewSpooler creates a new spooler.
func NewSpooler(dir string, maxBytes int64) (*Spooler, error) {
	if err := os.MkdirAll(dir, 0755); err != nil {
		return nil, err
	}
	return &Spooler{dir: dir, maxBytes: maxBytes}, nil
}

// Save saves a batch to the spool.
func (s *Spooler) Save(batch *EventBatch, prefix string) error {
	// Generate a unique filename from the prefix, timestamp, and PID.
	ts := time.Now().UnixNano()
	filename := fmt.Sprintf("%s_%d_%d.json.gz", prefix, ts, os.Getpid())
	path := filepath.Join(s.dir, filename)

	// Create the gzipped file.
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()

	gw := gzip.NewWriter(f)
	if err := json.NewEncoder(gw).Encode(batch); err != nil {
		gw.Close()
		os.Remove(path)
		return err
	}
	if err := gw.Close(); err != nil {
		os.Remove(path)
		return err
	}

	// Trim the spool if it has grown past maxBytes.
	s.trim()
	return nil
}

// PopOldest returns and removes the oldest spooled file.
func (s *Spooler) PopOldest() (*EventBatch, error) {
	files, err := s.listFiles()
	if err != nil || len(files) == 0 {
		return nil, err
	}
	path := files[0]
	batch, err := s.loadFile(path)
	if err != nil {
		// Remove the corrupted file so it does not block the queue.
		os.Remove(path)
		return nil, err
	}
	os.Remove(path)
	return batch, nil
}

// Size returns the total size of spooled files.
func (s *Spooler) Size() int64 {
	files, err := s.listFiles()
	if err != nil {
		return 0
	}
	var total int64
	for _, path := range files {
		if info, err := os.Stat(path); err == nil {
			total += info.Size()
		}
	}
	return total
}

// Count returns the number of spooled files.
func (s *Spooler) Count() int {
	files, _ := s.listFiles()
	return len(files)
}

// listFiles returns the spooled file paths sorted by modification time,
// oldest first.
func (s *Spooler) listFiles() ([]string, error) {
	entries, err := os.ReadDir(s.dir)
	if err != nil {
		return nil, err
	}
	var files []string
	for _, entry := range entries {
		if !entry.IsDir() && filepath.Ext(entry.Name()) == ".gz" {
			files = append(files, filepath.Join(s.dir, entry.Name()))
		}
	}
	// Sort by modification time (oldest first).
	sort.Slice(files, func(i, j int) bool {
		infoI, _ := os.Stat(files[i])
		infoJ, _ := os.Stat(files[j])
		if infoI == nil || infoJ == nil {
			return false
		}
		return infoI.ModTime().Before(infoJ.ModTime())
	})
	return files, nil
}

// loadFile reads and decodes a single gzipped JSON batch file.
func (s *Spooler) loadFile(path string) (*EventBatch, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	gr, err := gzip.NewReader(f)
	if err != nil {
		return nil, err
	}
	defer gr.Close()

	var batch EventBatch
	if err := json.NewDecoder(gr).Decode(&batch); err != nil {
		return nil, err
	}
	return &batch, nil
}

// trim removes the oldest spooled files until the total size is back under
// maxBytes.
func (s *Spooler) trim() {
	for s.Size() > s.maxBytes {
		files, err := s.listFiles()
		if err != nil || len(files) == 0 {
			break
		}
		os.Remove(files[0])
	}
}
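
// Example usage (a minimal sketch; EventBatch is assumed to be defined
// elsewhere in this package, and the directory and size cap below are
// illustrative):
//
//	spool, err := NewSpooler(filepath.Join(os.TempDir(), "spool"), 64<<20)
//	if err != nil {
//		// handle error
//	}
//	if err := spool.Save(&EventBatch{}, "events"); err != nil {
//		// the batch could not be spooled
//	}
//	for spool.Count() > 0 {
//		batch, err := spool.PopOldest()
//		if err != nil {
//			continue // the corrupted file has already been removed
//		}
//		_ = batch // re-send the recovered batch
//	}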