spooler.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package main
  2. import (
  3. "compress/gzip"
  4. "encoding/json"
  5. "fmt"
  6. "os"
  7. "path/filepath"
  8. "sort"
  9. "time"
  10. )
  11. // Spooler handles offline event storage
  12. type Spooler struct {
  13. dir string
  14. maxBytes int64
  15. }
  16. // NewSpooler creates a new spooler
  17. func NewSpooler(dir string, maxBytes int64) (*Spooler, error) {
  18. if err := os.MkdirAll(dir, 0755); err != nil {
  19. return nil, err
  20. }
  21. return &Spooler{dir: dir, maxBytes: maxBytes}, nil
  22. }
  23. // Save saves a batch to the spool
  24. func (s *Spooler) Save(batch *EventBatch, prefix string) error {
  25. // Generate filename
  26. ts := time.Now().UnixNano()
  27. filename := fmt.Sprintf("%s_%d_%d.json.gz", prefix, ts, os.Getpid())
  28. path := filepath.Join(s.dir, filename)
  29. // Create gzipped file
  30. f, err := os.Create(path)
  31. if err != nil {
  32. return err
  33. }
  34. defer f.Close()
  35. gw := gzip.NewWriter(f)
  36. if err := json.NewEncoder(gw).Encode(batch); err != nil {
  37. gw.Close()
  38. os.Remove(path)
  39. return err
  40. }
  41. if err := gw.Close(); err != nil {
  42. os.Remove(path)
  43. return err
  44. }
  45. // Trim if needed
  46. s.trim()
  47. return nil
  48. }
  49. // PopOldest returns and removes the oldest spooled file
  50. func (s *Spooler) PopOldest() (*EventBatch, error) {
  51. files, err := s.listFiles()
  52. if err != nil || len(files) == 0 {
  53. return nil, err
  54. }
  55. path := files[0]
  56. batch, err := s.loadFile(path)
  57. if err != nil {
  58. // Remove corrupted file
  59. os.Remove(path)
  60. return nil, err
  61. }
  62. os.Remove(path)
  63. return batch, nil
  64. }
  65. // Size returns the total size of spooled files
  66. func (s *Spooler) Size() int64 {
  67. files, err := s.listFiles()
  68. if err != nil {
  69. return 0
  70. }
  71. var total int64
  72. for _, path := range files {
  73. if info, err := os.Stat(path); err == nil {
  74. total += info.Size()
  75. }
  76. }
  77. return total
  78. }
  79. // Count returns the number of spooled files
  80. func (s *Spooler) Count() int {
  81. files, _ := s.listFiles()
  82. return len(files)
  83. }
  84. func (s *Spooler) listFiles() ([]string, error) {
  85. entries, err := os.ReadDir(s.dir)
  86. if err != nil {
  87. return nil, err
  88. }
  89. var files []string
  90. for _, entry := range entries {
  91. if !entry.IsDir() && filepath.Ext(entry.Name()) == ".gz" {
  92. files = append(files, filepath.Join(s.dir, entry.Name()))
  93. }
  94. }
  95. // Sort by modification time (oldest first)
  96. sort.Slice(files, func(i, j int) bool {
  97. infoI, _ := os.Stat(files[i])
  98. infoJ, _ := os.Stat(files[j])
  99. if infoI == nil || infoJ == nil {
  100. return false
  101. }
  102. return infoI.ModTime().Before(infoJ.ModTime())
  103. })
  104. return files, nil
  105. }
  106. func (s *Spooler) loadFile(path string) (*EventBatch, error) {
  107. f, err := os.Open(path)
  108. if err != nil {
  109. return nil, err
  110. }
  111. defer f.Close()
  112. gr, err := gzip.NewReader(f)
  113. if err != nil {
  114. return nil, err
  115. }
  116. defer gr.Close()
  117. var batch EventBatch
  118. if err := json.NewDecoder(gr).Decode(&batch); err != nil {
  119. return nil, err
  120. }
  121. return &batch, nil
  122. }
  123. func (s *Spooler) trim() {
  124. for s.Size() > s.maxBytes {
  125. files, err := s.listFiles()
  126. if err != nil || len(files) == 0 {
  127. break
  128. }
  129. os.Remove(files[0])
  130. }
  131. }