main.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838
  1. // Beacon Daemon - collects events from scanners and uploads to server
  2. package main
  3. import (
  4. "context"
  5. "encoding/json"
  6. "flag"
  7. "fmt"
  8. "log"
  9. "net"
  10. "net/http"
  11. "os"
  12. "os/signal"
  13. "strings"
  14. "sync"
  15. "syscall"
  16. "time"
  17. "github.com/go-zeromq/zmq4"
  18. )
  19. const (
  20. defaultConfigPath = "/opt/mybeacon/etc/config.json"
  21. defaultStatePath = "/opt/mybeacon/etc/device.json"
  22. defaultBinDir = "/opt/mybeacon/bin"
  23. defaultWiFiIface = "wlan0"
  24. maxSpoolBytes = 100 * 1024 * 1024 // 100 MB
  25. )
  26. type Daemon struct {
  27. cfg *Config
  28. state *DeviceState
  29. client *APIClient
  30. spooler *Spooler
  31. sshTunnel *SSHTunnel
  32. dashboardTunnel *SSHTunnel
  33. scanners *ScannerManager
  34. api *APIServer
  35. netmgr *NetworkManager
  36. bleEvents []interface{}
  37. wifiEvents []interface{}
  38. mu sync.Mutex
  39. configPath string
  40. statePath string
  41. httpAddr string // HTTP API listen address
  42. stopChan chan struct{}
  43. // Upload failure counters (for reducing log spam)
  44. bleUploadFailures int
  45. wifiUploadFailures int
  46. }
  47. func main() {
  48. var (
  49. configPath = flag.String("config", defaultConfigPath, "Config file path")
  50. statePath = flag.String("state", defaultStatePath, "Device state file path")
  51. serverAddr = flag.String("server", "", "API server address (e.g., http://192.168.5.2:5000)")
  52. binDir = flag.String("bindir", defaultBinDir, "Directory with scanner binaries")
  53. wifiIface = flag.String("wifi-iface", defaultWiFiIface, "WiFi interface for monitor mode")
  54. httpAddr = flag.String("http", ":8080", "HTTP API listen address")
  55. debug = flag.Bool("debug", false, "Enable debug logging")
  56. )
  57. flag.Parse()
  58. log.SetFlags(log.Ltime)
  59. log.Println("================================================================================")
  60. log.Println("Beacon Daemon starting...")
  61. // Load configuration
  62. cfg, err := LoadConfig(*configPath)
  63. if err != nil {
  64. log.Printf("Warning: failed to load config: %v (using defaults)", err)
  65. cfg = DefaultConfig()
  66. }
  67. cfg.Debug = *debug || cfg.Debug
  68. // Override server address if provided
  69. if *serverAddr != "" {
  70. cfg.APIBase = *serverAddr + "/api/v1"
  71. log.Printf("Using server: %s", *serverAddr)
  72. }
  73. // Store WiFi interface
  74. cfg.WiFiIface = *wifiIface
  75. // Load device state
  76. state, err := LoadDeviceState(*statePath)
  77. if err != nil {
  78. log.Printf("Warning: failed to load state: %v", err)
  79. state = &DeviceState{}
  80. }
  81. // Get device ID from MAC if not set
  82. if state.DeviceID == "" {
  83. state.DeviceID = getDeviceID()
  84. SaveDeviceState(*statePath, state)
  85. }
  86. log.Printf("Device ID: %s", state.DeviceID)
  87. // Create spooler
  88. spooler, err := NewSpooler(cfg.SpoolDir, maxSpoolBytes)
  89. if err != nil {
  90. log.Fatalf("Failed to create spooler: %v", err)
  91. }
  92. // Create API client
  93. client := NewAPIClient(cfg.APIBase)
  94. // Create SSH tunnel managers (will be started when enabled in config)
  95. sshTunnelCfg := &TunnelConfig{
  96. Enabled: cfg.SSHTunnel.Enabled,
  97. Server: cfg.SSHTunnel.Server,
  98. Port: cfg.SSHTunnel.Port,
  99. User: cfg.SSHTunnel.User,
  100. RemotePort: cfg.SSHTunnel.RemotePort,
  101. KeepaliveInterval: cfg.SSHTunnel.KeepaliveInterval,
  102. ReconnectDelay: cfg.SSHTunnel.ReconnectDelay,
  103. }
  104. sshTunnel := NewSSHTunnel("ssh", 22, sshTunnelCfg, client)
  105. dashboardTunnelCfg := &TunnelConfig{
  106. Enabled: cfg.DashboardTunnel.Enabled,
  107. Server: cfg.DashboardTunnel.Server,
  108. Port: cfg.DashboardTunnel.Port,
  109. User: cfg.DashboardTunnel.User,
  110. RemotePort: cfg.DashboardTunnel.RemotePort,
  111. KeepaliveInterval: cfg.DashboardTunnel.KeepaliveInterval,
  112. ReconnectDelay: cfg.DashboardTunnel.ReconnectDelay,
  113. }
  114. dashboardTunnel := NewSSHTunnel("dashboard", 80, dashboardTunnelCfg, client)
  115. // Create scanner manager
  116. scanners := NewScannerManager(*binDir, cfg.Debug)
  117. // Create network manager (manages eth0, wlan0 client, wlan0 AP fallback)
  118. netmgr := NewNetworkManager(cfg, scanners, state.DevicePassword)
  119. // Create daemon
  120. daemon := &Daemon{
  121. cfg: cfg,
  122. state: state,
  123. client: client,
  124. spooler: spooler,
  125. sshTunnel: sshTunnel,
  126. dashboardTunnel: dashboardTunnel,
  127. scanners: scanners,
  128. netmgr: netmgr,
  129. configPath: *configPath,
  130. statePath: *statePath,
  131. httpAddr: *httpAddr,
  132. stopChan: make(chan struct{}),
  133. }
  134. // Create API server
  135. daemon.api = NewAPIServer(daemon)
  136. if state.DeviceToken != "" {
  137. daemon.client.SetToken(state.DeviceToken)
  138. }
  139. // Handle signals
  140. sigChan := make(chan os.Signal, 1)
  141. signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
  142. go func() {
  143. <-sigChan
  144. log.Println("Shutting down...")
  145. daemon.api.Stop()
  146. daemon.scanners.StopAll()
  147. daemon.sshTunnel.Stop()
  148. daemon.dashboardTunnel.Stop()
  149. daemon.netmgr.Stop()
  150. close(daemon.stopChan)
  151. }()
  152. // Start HTTP API server if enabled (dashboard should be available immediately)
  153. if cfg.Dashboard.Enabled {
  154. go func() {
  155. log.Printf("Starting HTTP API server on %s", daemon.httpAddr)
  156. if err := daemon.api.Start(daemon.httpAddr); err != nil && err != http.ErrServerClosed {
  157. log.Printf("HTTP server error: %v", err)
  158. }
  159. }()
  160. // Give HTTP server time to bind
  161. time.Sleep(100 * time.Millisecond)
  162. } else {
  163. log.Println("Dashboard disabled - HTTP server not started")
  164. }
  165. // Start network manager (manages eth0, wlan0 client, wlan0 AP fallback, WiFi scanner)
  166. // Network manager will automatically handle all network priorities and scanner coordination
  167. daemon.netmgr.Start()
  168. // Start SSH tunnel if enabled
  169. if cfg.SSHTunnel.Enabled && cfg.SSHTunnel.RemotePort != 0 {
  170. daemon.sshTunnel.Start()
  171. }
  172. // Start Dashboard tunnel if enabled
  173. if cfg.DashboardTunnel.Enabled && cfg.DashboardTunnel.RemotePort != 0 {
  174. daemon.dashboardTunnel.Start()
  175. }
  176. // Start BLE scanner if enabled (not managed by network manager)
  177. if cfg.BLE.Enabled {
  178. if !daemon.scanners.IsBLERunning() {
  179. log.Println("Starting BLE scanner...")
  180. if err := daemon.scanners.StartBLE(cfg.ZMQAddrBLE); err != nil {
  181. log.Printf("Failed to start BLE scanner: %v", err)
  182. }
  183. }
  184. }
  185. // Start registration loop (if not registered)
  186. go daemon.registrationLoop()
  187. // Start config polling loop
  188. go daemon.configLoop()
  189. // Start ZMQ subscribers
  190. go daemon.subscribeLoop("ble", cfg.ZMQAddrBLE)
  191. go daemon.subscribeLoop("wifi", cfg.ZMQAddrWiFi)
  192. // Start batch upload loops
  193. go daemon.uploadLoop("ble", cfg.BLE.BatchIntervalMs)
  194. go daemon.uploadLoop("wifi", cfg.WiFi.BatchIntervalMs)
  195. // Start spool flush loop
  196. go daemon.spoolFlushLoop()
  197. // Wait for shutdown
  198. <-daemon.stopChan
  199. log.Println("Daemon stopped")
  200. }
  201. func (d *Daemon) registrationLoop() {
  202. for {
  203. select {
  204. case <-d.stopChan:
  205. return
  206. default:
  207. }
  208. if d.state.DeviceToken != "" {
  209. time.Sleep(60 * time.Second)
  210. continue
  211. }
  212. log.Println("Attempting device registration...")
  213. // Generate or load SSH key pair
  214. sshKeyPath := "/opt/mybeacon/etc/ssh_tunnel_ed25519"
  215. sshPubKey, err := GenerateOrLoadSSHKey(sshKeyPath)
  216. if err != nil {
  217. log.Printf("Failed to generate/load SSH key: %v", err)
  218. time.Sleep(10 * time.Second)
  219. continue
  220. }
  221. log.Printf("SSH key ready: %s", sshKeyPath)
  222. req := &RegistrationRequest{
  223. DeviceID: d.state.DeviceID,
  224. SSHPublicKey: sshPubKey,
  225. }
  226. // Try to get IPs
  227. if ip := getInterfaceIP("eth0"); ip != "" {
  228. req.EthIP = &ip
  229. }
  230. if ip := getInterfaceIP("wlan0"); ip != "" {
  231. req.WlanIP = &ip
  232. }
  233. resp, err := d.client.Register(req)
  234. if err != nil {
  235. log.Printf("Registration failed: %v", err)
  236. time.Sleep(10 * time.Second)
  237. continue
  238. }
  239. d.state.DeviceToken = resp.DeviceToken
  240. d.state.DevicePassword = resp.DevicePassword
  241. d.client.SetToken(resp.DeviceToken)
  242. SaveDeviceState(d.statePath, d.state)
  243. log.Printf("Device registered, token received")
  244. // Immediately fetch config after registration
  245. log.Println("Fetching initial config from server...")
  246. d.fetchAndApplyConfig()
  247. }
  248. }
  249. func (d *Daemon) configLoop() {
  250. // Initial fetch immediately
  251. d.fetchAndApplyConfig()
  252. // Get initial polling interval (default 30 seconds)
  253. d.mu.Lock()
  254. interval := d.cfg.ConfigPollingInterval
  255. if interval <= 0 {
  256. interval = 30
  257. }
  258. d.mu.Unlock()
  259. ticker := time.NewTicker(time.Duration(interval) * time.Second)
  260. defer ticker.Stop()
  261. for {
  262. select {
  263. case <-d.stopChan:
  264. return
  265. case <-ticker.C:
  266. }
  267. d.fetchAndApplyConfig()
  268. // Check if interval changed and reset ticker if needed
  269. d.mu.Lock()
  270. newInterval := d.cfg.ConfigPollingInterval
  271. if newInterval <= 0 {
  272. newInterval = 30
  273. }
  274. if newInterval != interval {
  275. interval = newInterval
  276. ticker.Reset(time.Duration(interval) * time.Second)
  277. if d.cfg.Debug {
  278. log.Printf("[config] Polling interval changed to %d seconds", interval)
  279. }
  280. }
  281. d.mu.Unlock()
  282. }
  283. }
  284. func (d *Daemon) fetchAndApplyConfig() {
  285. if d.state.DeviceToken == "" {
  286. return
  287. }
  288. serverCfg, err := d.client.GetConfig(d.state.DeviceID)
  289. if err != nil {
  290. // In LAN mode, suppress errors (server might be unreachable)
  291. if d.cfg.Mode == "lan" {
  292. // Silent - use local config
  293. return
  294. }
  295. log.Printf("[config] Fetch failed: %v", err)
  296. return
  297. }
  298. log.Printf("[config] Fetched from server (polling interval: %ds)", d.cfg.ConfigPollingInterval)
  299. // Determine effective mode: force_cloud overrides local mode setting
  300. effectiveMode := d.cfg.Mode
  301. if effectiveMode == "" {
  302. effectiveMode = "cloud"
  303. }
  304. if serverCfg.ForceCloud {
  305. if effectiveMode == "lan" {
  306. log.Println("Server force_cloud enabled - switching to cloud mode")
  307. }
  308. effectiveMode = "cloud"
  309. }
  310. d.mu.Lock()
  311. // Track changes for scanner restart
  312. bleChanged := false
  313. wifiMonitorChanged := false
  314. wifiClientChanged := false
  315. if effectiveMode == "cloud" {
  316. // Cloud mode: server settings have priority
  317. bleChanged = d.cfg.BLE.Enabled != serverCfg.BLE.Enabled
  318. wifiMonitorChanged = d.cfg.WiFi.MonitorEnabled != serverCfg.WiFi.MonitorEnabled
  319. wifiClientChanged = d.cfg.WiFi.ClientEnabled != serverCfg.WiFi.ClientEnabled ||
  320. d.cfg.WiFi.SSID != serverCfg.WiFi.SSID ||
  321. d.cfg.WiFi.PSK != serverCfg.WiFi.PSK
  322. d.cfg.BLE.Enabled = serverCfg.BLE.Enabled
  323. d.cfg.BLE.BatchIntervalMs = serverCfg.BLE.BatchIntervalMs
  324. d.cfg.BLE.UploadEndpoint = serverCfg.BLE.UploadEndpoint
  325. d.cfg.WiFi.MonitorEnabled = serverCfg.WiFi.MonitorEnabled
  326. d.cfg.WiFi.ClientEnabled = serverCfg.WiFi.ClientEnabled
  327. d.cfg.WiFi.SSID = serverCfg.WiFi.SSID
  328. d.cfg.WiFi.PSK = serverCfg.WiFi.PSK
  329. d.cfg.WiFi.BatchIntervalMs = serverCfg.WiFi.BatchIntervalMs
  330. d.cfg.WiFi.UploadEndpoint = serverCfg.WiFi.UploadEndpoint
  331. d.cfg.Debug = serverCfg.Debug
  332. // Config polling interval from server
  333. if serverCfg.ConfigPollingInterval > 0 {
  334. d.cfg.ConfigPollingInterval = serverCfg.ConfigPollingInterval
  335. }
  336. // NTP from server in cloud mode
  337. if len(serverCfg.Net.NTP.Servers) > 0 {
  338. d.cfg.Network.NTPServers = serverCfg.Net.NTP.Servers
  339. }
  340. }
  341. // LAN mode: local settings have priority, we keep what's in d.cfg
  342. // Only tunnels come from server (for remote support)
  343. // SSH tunnel ALWAYS from server (for remote support access)
  344. sshChanged := d.cfg.SSHTunnel.Enabled != serverCfg.SSHTunnel.Enabled ||
  345. d.cfg.SSHTunnel.RemotePort != serverCfg.SSHTunnel.RemotePort
  346. if serverCfg.SSHTunnel.Enabled {
  347. d.cfg.SSHTunnel.Enabled = true
  348. d.cfg.SSHTunnel.Server = serverCfg.SSHTunnel.Server
  349. d.cfg.SSHTunnel.Port = serverCfg.SSHTunnel.Port
  350. d.cfg.SSHTunnel.User = serverCfg.SSHTunnel.User
  351. d.cfg.SSHTunnel.RemotePort = serverCfg.SSHTunnel.RemotePort
  352. d.cfg.SSHTunnel.KeepaliveInterval = serverCfg.SSHTunnel.KeepaliveInterval
  353. } else {
  354. d.cfg.SSHTunnel.Enabled = false
  355. }
  356. // Dashboard tunnel ALWAYS from server (for remote dashboard access)
  357. dashboardTunnelChanged := d.cfg.DashboardTunnel.Enabled != serverCfg.DashboardTunnel.Enabled ||
  358. d.cfg.DashboardTunnel.RemotePort != serverCfg.DashboardTunnel.RemotePort
  359. if serverCfg.DashboardTunnel.Enabled {
  360. d.cfg.DashboardTunnel.Enabled = true
  361. d.cfg.DashboardTunnel.Server = serverCfg.DashboardTunnel.Server
  362. d.cfg.DashboardTunnel.Port = serverCfg.DashboardTunnel.Port
  363. d.cfg.DashboardTunnel.User = serverCfg.DashboardTunnel.User
  364. d.cfg.DashboardTunnel.RemotePort = serverCfg.DashboardTunnel.RemotePort
  365. d.cfg.DashboardTunnel.KeepaliveInterval = serverCfg.DashboardTunnel.KeepaliveInterval
  366. } else {
  367. d.cfg.DashboardTunnel.Enabled = false
  368. }
  369. // Dashboard HTTP API ALWAYS from server (for remote management)
  370. dashboardChanged := d.cfg.Dashboard.Enabled != serverCfg.Dashboard.Enabled
  371. d.cfg.Dashboard.Enabled = serverCfg.Dashboard.Enabled
  372. d.mu.Unlock()
  373. // Apply BLE scanner changes (not managed by Network Manager)
  374. if bleChanged {
  375. log.Printf("BLE config changed (mode=%s): enabled=%v", effectiveMode, d.cfg.BLE.Enabled)
  376. if d.cfg.BLE.Enabled {
  377. if !d.scanners.IsBLERunning() {
  378. log.Println("Starting BLE scanner...")
  379. d.scanners.StartBLE(d.cfg.ZMQAddrBLE)
  380. }
  381. } else {
  382. if d.scanners.IsBLERunning() {
  383. log.Println("Stopping BLE scanner...")
  384. d.scanners.StopBLE()
  385. }
  386. }
  387. }
  388. // Log WiFi config changes (Network Manager will handle automatically)
  389. if wifiMonitorChanged || wifiClientChanged {
  390. log.Printf("WiFi config changed (mode=%s): monitor=%v client=%v ssid=%s",
  391. effectiveMode, d.cfg.WiFi.MonitorEnabled, d.cfg.WiFi.ClientEnabled, d.cfg.WiFi.SSID)
  392. log.Println("Network Manager will apply changes automatically")
  393. }
  394. // Update SSH tunnel config
  395. sshTunnelCfg := &TunnelConfig{
  396. Enabled: d.cfg.SSHTunnel.Enabled,
  397. Server: d.cfg.SSHTunnel.Server,
  398. Port: d.cfg.SSHTunnel.Port,
  399. User: d.cfg.SSHTunnel.User,
  400. RemotePort: d.cfg.SSHTunnel.RemotePort,
  401. KeepaliveInterval: d.cfg.SSHTunnel.KeepaliveInterval,
  402. ReconnectDelay: d.cfg.SSHTunnel.ReconnectDelay,
  403. }
  404. d.sshTunnel.UpdateConfig(sshTunnelCfg)
  405. if sshChanged {
  406. if d.cfg.SSHTunnel.Enabled && d.cfg.SSHTunnel.RemotePort != 0 {
  407. log.Printf("SSH tunnel enabled by server (port %d)", d.cfg.SSHTunnel.RemotePort)
  408. d.sshTunnel.Restart()
  409. } else {
  410. log.Println("SSH tunnel disabled by server")
  411. d.sshTunnel.Stop()
  412. }
  413. }
  414. // Update Dashboard tunnel config
  415. dashboardTunnelCfg := &TunnelConfig{
  416. Enabled: d.cfg.DashboardTunnel.Enabled,
  417. Server: d.cfg.DashboardTunnel.Server,
  418. Port: d.cfg.DashboardTunnel.Port,
  419. User: d.cfg.DashboardTunnel.User,
  420. RemotePort: d.cfg.DashboardTunnel.RemotePort,
  421. KeepaliveInterval: d.cfg.DashboardTunnel.KeepaliveInterval,
  422. ReconnectDelay: d.cfg.DashboardTunnel.ReconnectDelay,
  423. }
  424. d.dashboardTunnel.UpdateConfig(dashboardTunnelCfg)
  425. if dashboardTunnelChanged {
  426. if d.cfg.DashboardTunnel.Enabled && d.cfg.DashboardTunnel.RemotePort != 0 {
  427. log.Printf("Dashboard tunnel enabled by server (port %d)", d.cfg.DashboardTunnel.RemotePort)
  428. d.dashboardTunnel.Restart()
  429. } else {
  430. log.Println("Dashboard tunnel disabled by server")
  431. d.dashboardTunnel.Stop()
  432. }
  433. }
  434. // Update dashboard state
  435. if dashboardChanged {
  436. if d.cfg.Dashboard.Enabled {
  437. if !d.api.IsRunning() {
  438. log.Println("Dashboard enabled by server - starting HTTP API")
  439. go func() {
  440. if err := d.api.Start(d.httpAddr); err != nil && err != http.ErrServerClosed {
  441. log.Printf("HTTP server error: %v", err)
  442. }
  443. }()
  444. }
  445. } else {
  446. if d.api.IsRunning() {
  447. log.Println("Dashboard disabled by server - stopping HTTP API")
  448. d.api.Stop()
  449. }
  450. }
  451. }
  452. // Update network manager config - it will handle all network changes automatically
  453. // (eth0 settings are local-only, never from server)
  454. // (WiFi client and scanner coordination handled by Network Manager)
  455. d.netmgr.UpdateConfig(d.cfg)
  456. // Save updated config
  457. SaveConfig(d.configPath, d.cfg)
  458. }
  459. func (d *Daemon) subscribeLoop(name string, addr string) {
  460. for {
  461. select {
  462. case <-d.stopChan:
  463. return
  464. default:
  465. }
  466. // Only try to connect if the corresponding scanner is running
  467. scannerRunning := false
  468. if name == "ble" {
  469. scannerRunning = d.scanners.IsBLERunning()
  470. } else if name == "wifi" {
  471. scannerRunning = d.scanners.IsWiFiRunning()
  472. }
  473. if !scannerRunning {
  474. // Wait before checking again
  475. time.Sleep(5 * time.Second)
  476. continue
  477. }
  478. if err := d.runSubscriber(name, addr); err != nil {
  479. log.Printf("[%s] Subscriber error: %v, reconnecting...", name, err)
  480. time.Sleep(time.Second)
  481. }
  482. }
  483. }
  484. func (d *Daemon) runSubscriber(name string, addr string) error {
  485. ctx, cancel := context.WithCancel(context.Background())
  486. defer cancel()
  487. sub := zmq4.NewSub(ctx)
  488. defer sub.Close()
  489. // Subscribe to all topics for this type
  490. if err := sub.SetOption(zmq4.OptionSubscribe, name+"."); err != nil {
  491. return err
  492. }
  493. if err := sub.Dial(addr); err != nil {
  494. return err
  495. }
  496. log.Printf("[%s] Connected to %s", name, addr)
  497. // Monitor stop channel in goroutine
  498. go func() {
  499. <-d.stopChan
  500. cancel()
  501. }()
  502. for {
  503. msg, err := sub.Recv()
  504. if err != nil {
  505. return err
  506. }
  507. // Message is in first frame
  508. data := string(msg.Frames[0])
  509. // Parse message: "topic JSON"
  510. parts := strings.SplitN(data, " ", 2)
  511. if len(parts) != 2 {
  512. continue
  513. }
  514. var event interface{}
  515. if err := json.Unmarshal([]byte(parts[1]), &event); err != nil {
  516. continue
  517. }
  518. d.mu.Lock()
  519. if name == "ble" {
  520. d.bleEvents = append(d.bleEvents, event)
  521. // Prevent memory exhaustion: drop oldest events if queue too large
  522. if len(d.bleEvents) > 10000 {
  523. dropped := len(d.bleEvents) - 5000
  524. d.bleEvents = append([]interface{}{}, d.bleEvents[dropped:]...)
  525. log.Printf("[%s] Queue overflow, dropped %d oldest events", name, dropped)
  526. }
  527. if d.cfg.Debug {
  528. log.Printf("[%s] Received event, queue size: %d", name, len(d.bleEvents))
  529. }
  530. // Add to API for WebSocket broadcast
  531. if d.api != nil {
  532. d.api.AddBLEEvent(event)
  533. }
  534. } else {
  535. d.wifiEvents = append(d.wifiEvents, event)
  536. // Prevent memory exhaustion: drop oldest events if queue too large
  537. if len(d.wifiEvents) > 10000 {
  538. dropped := len(d.wifiEvents) - 5000
  539. d.wifiEvents = append([]interface{}{}, d.wifiEvents[dropped:]...)
  540. log.Printf("[%s] Queue overflow, dropped %d oldest events", name, dropped)
  541. }
  542. if d.cfg.Debug {
  543. log.Printf("[%s] Received event, queue size: %d", name, len(d.wifiEvents))
  544. }
  545. // Add to API for WebSocket broadcast
  546. if d.api != nil {
  547. d.api.AddWiFiEvent(event)
  548. }
  549. }
  550. d.mu.Unlock()
  551. }
  552. }
  553. func (d *Daemon) uploadLoop(name string, intervalMs int) {
  554. if intervalMs <= 0 {
  555. intervalMs = 2500
  556. }
  557. ticker := time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
  558. defer ticker.Stop()
  559. for {
  560. select {
  561. case <-d.stopChan:
  562. return
  563. case <-ticker.C:
  564. }
  565. // Skip upload if not registered yet
  566. if d.state.DeviceToken == "" {
  567. continue
  568. }
  569. d.mu.Lock()
  570. var events []interface{}
  571. var endpoint string
  572. if name == "ble" {
  573. events = d.bleEvents
  574. d.bleEvents = nil
  575. // Use custom endpoint if set, otherwise default to /ble
  576. if d.cfg.BLE.UploadEndpoint != "" {
  577. endpoint = d.cfg.BLE.UploadEndpoint
  578. } else {
  579. endpoint = "/ble"
  580. }
  581. } else {
  582. events = d.wifiEvents
  583. d.wifiEvents = nil
  584. // Use custom endpoint if set, otherwise default to /wifi
  585. if d.cfg.WiFi.UploadEndpoint != "" {
  586. endpoint = d.cfg.WiFi.UploadEndpoint
  587. } else {
  588. endpoint = "/wifi"
  589. }
  590. }
  591. d.mu.Unlock()
  592. if len(events) == 0 {
  593. continue
  594. }
  595. if d.cfg.Debug {
  596. log.Printf("[%s] Batch ready: %d events, uploading...", name, len(events))
  597. }
  598. batch := &EventBatch{
  599. DeviceID: d.state.DeviceID,
  600. Events: events,
  601. }
  602. if err := d.client.UploadEvents(endpoint, batch); err != nil {
  603. // Increment failure counter
  604. if name == "ble" {
  605. d.bleUploadFailures++
  606. } else {
  607. d.wifiUploadFailures++
  608. }
  609. // Log only every 6th failure (once per minute) to reduce spam when network is down
  610. failCount := d.bleUploadFailures
  611. if name != "ble" {
  612. failCount = d.wifiUploadFailures
  613. }
  614. if failCount == 1 || failCount%6 == 0 {
  615. log.Printf("[%s] Upload failed: %v, spooling %d events (failures: %d)", name, err, len(events), failCount)
  616. }
  617. if err := d.spooler.Save(batch, name); err != nil {
  618. log.Printf("[%s] Spool save failed: %v", name, err)
  619. }
  620. } else {
  621. // Reset failure counter on success
  622. if name == "ble" {
  623. d.bleUploadFailures = 0
  624. } else {
  625. d.wifiUploadFailures = 0
  626. }
  627. log.Printf("[%s] Uploaded %d events to server", name, len(events))
  628. }
  629. }
  630. }
  631. func (d *Daemon) spoolFlushLoop() {
  632. ticker := time.NewTicker(10 * time.Second)
  633. defer ticker.Stop()
  634. for {
  635. select {
  636. case <-d.stopChan:
  637. return
  638. case <-ticker.C:
  639. }
  640. if d.state.DeviceToken == "" {
  641. continue
  642. }
  643. // Try to flush one batch
  644. batch, err := d.spooler.PopOldest()
  645. if err != nil || batch == nil {
  646. continue
  647. }
  648. // Try to upload (guess endpoint from event types)
  649. endpoint := "/events"
  650. eventType := "unknown"
  651. if len(batch.Events) > 0 {
  652. if ev, ok := batch.Events[0].(map[string]interface{}); ok {
  653. if t, ok := ev["type"].(string); ok {
  654. if strings.HasPrefix(t, "wifi") {
  655. // Use custom endpoint if set, otherwise default to /wifi
  656. d.mu.Lock()
  657. if d.cfg.WiFi.UploadEndpoint != "" {
  658. endpoint = d.cfg.WiFi.UploadEndpoint
  659. } else {
  660. endpoint = "/wifi"
  661. }
  662. d.mu.Unlock()
  663. eventType = "wifi"
  664. } else {
  665. // Use custom endpoint if set, otherwise default to /ble
  666. d.mu.Lock()
  667. if d.cfg.BLE.UploadEndpoint != "" {
  668. endpoint = d.cfg.BLE.UploadEndpoint
  669. } else {
  670. endpoint = "/ble"
  671. }
  672. d.mu.Unlock()
  673. eventType = "ble"
  674. }
  675. }
  676. }
  677. }
  678. if err := d.client.UploadEvents(endpoint, batch); err != nil {
  679. // Re-spool if upload failed
  680. d.spooler.Save(batch, "retry")
  681. } else {
  682. log.Printf("[spool] Flushed %d %s events to server", len(batch.Events), eventType)
  683. }
  684. }
  685. }
  686. // updateWiFiCredentials sends WiFi credentials to server (Cloud Mode only)
  687. func (d *Daemon) updateWiFiCredentials(ssid, psk string) error {
  688. return d.client.UpdateWiFiCredentials(ssid, psk)
  689. }
  690. // getDeviceID returns a device ID based on MAC address
  691. func getDeviceID() string {
  692. // Try eth0 first, then wlan0
  693. for _, iface := range []string{"eth0", "wlan0"} {
  694. if mac := getInterfaceMAC(iface); mac != "" {
  695. return mac
  696. }
  697. }
  698. // Fallback to hostname
  699. host, _ := os.Hostname()
  700. return host
  701. }
  702. func getInterfaceMAC(name string) string {
  703. iface, err := net.InterfaceByName(name)
  704. if err != nil {
  705. return ""
  706. }
  707. return iface.HardwareAddr.String()
  708. }
  709. func getInterfaceIP(name string) string {
  710. iface, err := net.InterfaceByName(name)
  711. if err != nil {
  712. return ""
  713. }
  714. addrs, err := iface.Addrs()
  715. if err != nil {
  716. return ""
  717. }
  718. for _, addr := range addrs {
  719. if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() != nil {
  720. // Return IP with CIDR notation (e.g., 192.168.5.244/24)
  721. ones, _ := ipnet.Mask.Size()
  722. return fmt.Sprintf("%s/%d", ipnet.IP.String(), ones)
  723. }
  724. }
  725. return ""
  726. }