main.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674
  1. // Beacon Daemon - collects events from scanners and uploads to server
  2. package main
  3. import (
  4. "context"
  5. "encoding/json"
  6. "flag"
  7. "fmt"
  8. "log"
  9. "net"
  10. "net/http"
  11. "os"
  12. "os/signal"
  13. "strings"
  14. "sync"
  15. "syscall"
  16. "time"
  17. "github.com/go-zeromq/zmq4"
  18. )
  19. const (
  20. defaultConfigPath = "/opt/mybeacon/etc/config.json"
  21. defaultStatePath = "/opt/mybeacon/etc/device.json"
  22. defaultBinDir = "/opt/mybeacon/bin"
  23. defaultWiFiIface = "wlan0"
  24. maxSpoolBytes = 100 * 1024 * 1024 // 100 MB
  25. )
  26. type Daemon struct {
  27. cfg *Config
  28. state *DeviceState
  29. client *APIClient
  30. spooler *Spooler
  31. tunnel *SSHTunnel
  32. scanners *ScannerManager
  33. api *APIServer
  34. netmgr *NetworkManager
  35. bleEvents []interface{}
  36. wifiEvents []interface{}
  37. mu sync.Mutex
  38. configPath string
  39. statePath string
  40. httpAddr string // HTTP API listen address
  41. stopChan chan struct{}
  42. // Upload failure counters (for reducing log spam)
  43. bleUploadFailures int
  44. wifiUploadFailures int
  45. }
  46. func main() {
  47. var (
  48. configPath = flag.String("config", defaultConfigPath, "Config file path")
  49. statePath = flag.String("state", defaultStatePath, "Device state file path")
  50. serverAddr = flag.String("server", "", "API server address (e.g., http://192.168.5.2:5000)")
  51. binDir = flag.String("bindir", defaultBinDir, "Directory with scanner binaries")
  52. wifiIface = flag.String("wifi-iface", defaultWiFiIface, "WiFi interface for monitor mode")
  53. httpAddr = flag.String("http", ":8080", "HTTP API listen address")
  54. debug = flag.Bool("debug", false, "Enable debug logging")
  55. )
  56. flag.Parse()
  57. log.SetFlags(log.Ltime)
  58. log.Println("================================================================================")
  59. log.Println("Beacon Daemon starting...")
  60. // Load configuration
  61. cfg, err := LoadConfig(*configPath)
  62. if err != nil {
  63. log.Printf("Warning: failed to load config: %v (using defaults)", err)
  64. cfg = DefaultConfig()
  65. }
  66. cfg.Debug = *debug || cfg.Debug
  67. // Override server address if provided
  68. if *serverAddr != "" {
  69. cfg.APIBase = *serverAddr + "/api/v1"
  70. log.Printf("Using server: %s", *serverAddr)
  71. }
  72. // Store WiFi interface
  73. cfg.WiFiIface = *wifiIface
  74. // Load device state
  75. state, err := LoadDeviceState(*statePath)
  76. if err != nil {
  77. log.Printf("Warning: failed to load state: %v", err)
  78. state = &DeviceState{}
  79. }
  80. // Get device ID from MAC if not set
  81. if state.DeviceID == "" {
  82. state.DeviceID = getDeviceID()
  83. SaveDeviceState(*statePath, state)
  84. }
  85. log.Printf("Device ID: %s", state.DeviceID)
  86. // Create spooler
  87. spooler, err := NewSpooler(cfg.SpoolDir, maxSpoolBytes)
  88. if err != nil {
  89. log.Fatalf("Failed to create spooler: %v", err)
  90. }
  91. // Create SSH tunnel manager
  92. tunnel := NewSSHTunnel(cfg)
  93. // Create scanner manager
  94. scanners := NewScannerManager(*binDir, cfg.Debug)
  95. // Create network manager (manages eth0, wlan0 client, wlan0 AP fallback)
  96. netmgr := NewNetworkManager(cfg, scanners, state.DevicePassword)
  97. // Create daemon
  98. daemon := &Daemon{
  99. cfg: cfg,
  100. state: state,
  101. client: NewAPIClient(cfg.APIBase),
  102. spooler: spooler,
  103. tunnel: tunnel,
  104. scanners: scanners,
  105. netmgr: netmgr,
  106. configPath: *configPath,
  107. statePath: *statePath,
  108. httpAddr: *httpAddr,
  109. stopChan: make(chan struct{}),
  110. }
  111. // Create API server
  112. daemon.api = NewAPIServer(daemon)
  113. if state.DeviceToken != "" {
  114. daemon.client.SetToken(state.DeviceToken)
  115. }
  116. // Handle signals
  117. sigChan := make(chan os.Signal, 1)
  118. signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
  119. go func() {
  120. <-sigChan
  121. log.Println("Shutting down...")
  122. daemon.api.Stop()
  123. daemon.scanners.StopAll()
  124. daemon.tunnel.Stop()
  125. daemon.netmgr.Stop()
  126. close(daemon.stopChan)
  127. }()
  128. // Start HTTP API server if enabled (dashboard should be available immediately)
  129. if cfg.Dashboard.Enabled {
  130. go func() {
  131. log.Printf("Starting HTTP API server on %s", daemon.httpAddr)
  132. if err := daemon.api.Start(daemon.httpAddr); err != nil && err != http.ErrServerClosed {
  133. log.Printf("HTTP server error: %v", err)
  134. }
  135. }()
  136. // Give HTTP server time to bind
  137. time.Sleep(100 * time.Millisecond)
  138. } else {
  139. log.Println("Dashboard disabled - HTTP server not started")
  140. }
  141. // Start network manager (manages eth0, wlan0 client, wlan0 AP fallback, WiFi scanner)
  142. // Network manager will automatically handle all network priorities and scanner coordination
  143. daemon.netmgr.Start()
  144. // Start SSH tunnel if enabled
  145. if cfg.SSHTunnel.Enabled {
  146. daemon.tunnel.Start()
  147. }
  148. // Start BLE scanner if enabled (not managed by network manager)
  149. if cfg.BLE.Enabled {
  150. if !daemon.scanners.IsBLERunning() {
  151. log.Println("Starting BLE scanner...")
  152. if err := daemon.scanners.StartBLE(cfg.ZMQAddrBLE); err != nil {
  153. log.Printf("Failed to start BLE scanner: %v", err)
  154. }
  155. }
  156. }
  157. // Start registration loop (if not registered)
  158. go daemon.registrationLoop()
  159. // Start config polling loop
  160. go daemon.configLoop()
  161. // Start ZMQ subscribers
  162. go daemon.subscribeLoop("ble", cfg.ZMQAddrBLE)
  163. go daemon.subscribeLoop("wifi", cfg.ZMQAddrWiFi)
  164. // Start batch upload loops
  165. go daemon.uploadLoop("ble", "/ble", cfg.BLE.BatchIntervalMs)
  166. go daemon.uploadLoop("wifi", "/wifi", cfg.WiFi.BatchIntervalMs)
  167. // Start spool flush loop
  168. go daemon.spoolFlushLoop()
  169. // Wait for shutdown
  170. <-daemon.stopChan
  171. log.Println("Daemon stopped")
  172. }
  173. func (d *Daemon) registrationLoop() {
  174. for {
  175. select {
  176. case <-d.stopChan:
  177. return
  178. default:
  179. }
  180. if d.state.DeviceToken != "" {
  181. time.Sleep(60 * time.Second)
  182. continue
  183. }
  184. log.Println("Attempting device registration...")
  185. req := &RegistrationRequest{
  186. DeviceID: d.state.DeviceID,
  187. }
  188. // Try to get IPs
  189. if ip := getInterfaceIP("eth0"); ip != "" {
  190. req.EthIP = &ip
  191. }
  192. if ip := getInterfaceIP("wlan0"); ip != "" {
  193. req.WlanIP = &ip
  194. }
  195. resp, err := d.client.Register(req)
  196. if err != nil {
  197. log.Printf("Registration failed: %v", err)
  198. time.Sleep(10 * time.Second)
  199. continue
  200. }
  201. d.state.DeviceToken = resp.DeviceToken
  202. d.state.DevicePassword = resp.DevicePassword
  203. d.client.SetToken(resp.DeviceToken)
  204. SaveDeviceState(d.statePath, d.state)
  205. log.Printf("Device registered, token received")
  206. // Immediately fetch config after registration
  207. log.Println("Fetching initial config from server...")
  208. d.fetchAndApplyConfig()
  209. }
  210. }
  211. func (d *Daemon) configLoop() {
  212. // Initial fetch immediately
  213. d.fetchAndApplyConfig()
  214. ticker := time.NewTicker(30 * time.Second)
  215. defer ticker.Stop()
  216. for {
  217. select {
  218. case <-d.stopChan:
  219. return
  220. case <-ticker.C:
  221. }
  222. d.fetchAndApplyConfig()
  223. }
  224. }
  225. func (d *Daemon) fetchAndApplyConfig() {
  226. if d.state.DeviceToken == "" {
  227. return
  228. }
  229. serverCfg, err := d.client.GetConfig(d.state.DeviceID)
  230. if err != nil {
  231. // In LAN mode, suppress errors (server might be unreachable)
  232. if d.cfg.Mode == "lan" {
  233. // Silent - use local config
  234. return
  235. }
  236. if d.cfg.Debug {
  237. log.Printf("Config fetch failed: %v", err)
  238. }
  239. return
  240. }
  241. // Determine effective mode: force_cloud overrides local mode setting
  242. effectiveMode := d.cfg.Mode
  243. if effectiveMode == "" {
  244. effectiveMode = "cloud"
  245. }
  246. if serverCfg.ForceCloud {
  247. if effectiveMode == "lan" {
  248. log.Println("Server force_cloud enabled - switching to cloud mode")
  249. }
  250. effectiveMode = "cloud"
  251. }
  252. d.mu.Lock()
  253. // Track changes for scanner restart
  254. bleChanged := false
  255. wifiMonitorChanged := false
  256. wifiClientChanged := false
  257. if effectiveMode == "cloud" {
  258. // Cloud mode: server settings have priority
  259. bleChanged = d.cfg.BLE.Enabled != serverCfg.BLE.Enabled
  260. wifiMonitorChanged = d.cfg.WiFi.MonitorEnabled != serverCfg.WiFi.MonitorEnabled
  261. wifiClientChanged = d.cfg.WiFi.ClientEnabled != serverCfg.WiFi.ClientEnabled ||
  262. d.cfg.WiFi.SSID != serverCfg.WiFi.SSID ||
  263. d.cfg.WiFi.PSK != serverCfg.WiFi.PSK
  264. d.cfg.BLE.Enabled = serverCfg.BLE.Enabled
  265. d.cfg.BLE.BatchIntervalMs = serverCfg.BLE.BatchIntervalMs
  266. d.cfg.WiFi.MonitorEnabled = serverCfg.WiFi.MonitorEnabled
  267. d.cfg.WiFi.ClientEnabled = serverCfg.WiFi.ClientEnabled
  268. d.cfg.WiFi.SSID = serverCfg.WiFi.SSID
  269. d.cfg.WiFi.PSK = serverCfg.WiFi.PSK
  270. d.cfg.WiFi.BatchIntervalMs = serverCfg.WiFi.BatchIntervalMs
  271. d.cfg.Debug = serverCfg.Debug
  272. // NTP from server in cloud mode
  273. if len(serverCfg.Net.NTP.Servers) > 0 {
  274. d.cfg.Network.NTPServers = serverCfg.Net.NTP.Servers
  275. }
  276. }
  277. // LAN mode: local settings have priority, we keep what's in d.cfg
  278. // Only SSH tunnel comes from server (for remote support)
  279. // SSH tunnel ALWAYS from server (for remote support access)
  280. sshChanged := d.cfg.SSHTunnel.Enabled != serverCfg.SSHTunnel.Enabled
  281. if serverCfg.SSHTunnel.Enabled {
  282. d.cfg.SSHTunnel.Enabled = true
  283. d.cfg.SSHTunnel.Server = serverCfg.SSHTunnel.Server
  284. d.cfg.SSHTunnel.Port = serverCfg.SSHTunnel.Port
  285. d.cfg.SSHTunnel.User = serverCfg.SSHTunnel.User
  286. d.cfg.SSHTunnel.RemotePort = serverCfg.SSHTunnel.RemotePort
  287. d.cfg.SSHTunnel.KeepaliveInterval = serverCfg.SSHTunnel.KeepaliveInterval
  288. } else {
  289. d.cfg.SSHTunnel.Enabled = false
  290. }
  291. // Dashboard ALWAYS from server (for remote management)
  292. dashboardChanged := d.cfg.Dashboard.Enabled != serverCfg.Dashboard.Enabled
  293. d.cfg.Dashboard.Enabled = serverCfg.Dashboard.Enabled
  294. d.mu.Unlock()
  295. // Apply BLE scanner changes (not managed by Network Manager)
  296. if bleChanged {
  297. log.Printf("BLE config changed (mode=%s): enabled=%v", effectiveMode, d.cfg.BLE.Enabled)
  298. if d.cfg.BLE.Enabled {
  299. if !d.scanners.IsBLERunning() {
  300. log.Println("Starting BLE scanner...")
  301. d.scanners.StartBLE(d.cfg.ZMQAddrBLE)
  302. }
  303. } else {
  304. if d.scanners.IsBLERunning() {
  305. log.Println("Stopping BLE scanner...")
  306. d.scanners.StopBLE()
  307. }
  308. }
  309. }
  310. // Log WiFi config changes (Network Manager will handle automatically)
  311. if wifiMonitorChanged || wifiClientChanged {
  312. log.Printf("WiFi config changed (mode=%s): monitor=%v client=%v ssid=%s",
  313. effectiveMode, d.cfg.WiFi.MonitorEnabled, d.cfg.WiFi.ClientEnabled, d.cfg.WiFi.SSID)
  314. log.Println("Network Manager will apply changes automatically")
  315. }
  316. // Update tunnel config
  317. d.tunnel.UpdateConfig(d.cfg)
  318. if sshChanged {
  319. if d.cfg.SSHTunnel.Enabled {
  320. log.Println("SSH tunnel enabled by server")
  321. d.tunnel.Start()
  322. } else {
  323. log.Println("SSH tunnel disabled by server")
  324. d.tunnel.Stop()
  325. }
  326. }
  327. // Update dashboard state
  328. if dashboardChanged {
  329. if d.cfg.Dashboard.Enabled {
  330. if !d.api.IsRunning() {
  331. log.Println("Dashboard enabled by server - starting HTTP API")
  332. go func() {
  333. if err := d.api.Start(d.httpAddr); err != nil && err != http.ErrServerClosed {
  334. log.Printf("HTTP server error: %v", err)
  335. }
  336. }()
  337. }
  338. } else {
  339. if d.api.IsRunning() {
  340. log.Println("Dashboard disabled by server - stopping HTTP API")
  341. d.api.Stop()
  342. }
  343. }
  344. }
  345. // Update network manager config - it will handle all network changes automatically
  346. // (eth0 settings are local-only, never from server)
  347. // (WiFi client and scanner coordination handled by Network Manager)
  348. d.netmgr.UpdateConfig(d.cfg)
  349. // Save updated config
  350. SaveConfig(d.configPath, d.cfg)
  351. }
  352. func (d *Daemon) subscribeLoop(name string, addr string) {
  353. for {
  354. select {
  355. case <-d.stopChan:
  356. return
  357. default:
  358. }
  359. // Only try to connect if the corresponding scanner is running
  360. scannerRunning := false
  361. if name == "ble" {
  362. scannerRunning = d.scanners.IsBLERunning()
  363. } else if name == "wifi" {
  364. scannerRunning = d.scanners.IsWiFiRunning()
  365. }
  366. if !scannerRunning {
  367. // Wait before checking again
  368. time.Sleep(5 * time.Second)
  369. continue
  370. }
  371. if err := d.runSubscriber(name, addr); err != nil {
  372. log.Printf("[%s] Subscriber error: %v, reconnecting...", name, err)
  373. time.Sleep(time.Second)
  374. }
  375. }
  376. }
  377. func (d *Daemon) runSubscriber(name string, addr string) error {
  378. ctx, cancel := context.WithCancel(context.Background())
  379. defer cancel()
  380. sub := zmq4.NewSub(ctx)
  381. defer sub.Close()
  382. // Subscribe to all topics for this type
  383. if err := sub.SetOption(zmq4.OptionSubscribe, name+"."); err != nil {
  384. return err
  385. }
  386. if err := sub.Dial(addr); err != nil {
  387. return err
  388. }
  389. log.Printf("[%s] Connected to %s", name, addr)
  390. // Monitor stop channel in goroutine
  391. go func() {
  392. <-d.stopChan
  393. cancel()
  394. }()
  395. for {
  396. msg, err := sub.Recv()
  397. if err != nil {
  398. return err
  399. }
  400. // Message is in first frame
  401. data := string(msg.Frames[0])
  402. // Parse message: "topic JSON"
  403. parts := strings.SplitN(data, " ", 2)
  404. if len(parts) != 2 {
  405. continue
  406. }
  407. var event interface{}
  408. if err := json.Unmarshal([]byte(parts[1]), &event); err != nil {
  409. continue
  410. }
  411. d.mu.Lock()
  412. if name == "ble" {
  413. d.bleEvents = append(d.bleEvents, event)
  414. if d.cfg.Debug {
  415. log.Printf("[%s] Received event, queue size: %d", name, len(d.bleEvents))
  416. }
  417. // Add to API for WebSocket broadcast
  418. if d.api != nil {
  419. d.api.AddBLEEvent(event)
  420. }
  421. } else {
  422. d.wifiEvents = append(d.wifiEvents, event)
  423. if d.cfg.Debug {
  424. log.Printf("[%s] Received event, queue size: %d", name, len(d.wifiEvents))
  425. }
  426. // Add to API for WebSocket broadcast
  427. if d.api != nil {
  428. d.api.AddWiFiEvent(event)
  429. }
  430. }
  431. d.mu.Unlock()
  432. }
  433. }
  434. func (d *Daemon) uploadLoop(name string, endpoint string, intervalMs int) {
  435. if intervalMs <= 0 {
  436. intervalMs = 2500
  437. }
  438. ticker := time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
  439. defer ticker.Stop()
  440. for {
  441. select {
  442. case <-d.stopChan:
  443. return
  444. case <-ticker.C:
  445. }
  446. // Skip upload if not registered yet
  447. if d.state.DeviceToken == "" {
  448. continue
  449. }
  450. d.mu.Lock()
  451. var events []interface{}
  452. if name == "ble" {
  453. events = d.bleEvents
  454. d.bleEvents = nil
  455. } else {
  456. events = d.wifiEvents
  457. d.wifiEvents = nil
  458. }
  459. d.mu.Unlock()
  460. if len(events) == 0 {
  461. continue
  462. }
  463. if d.cfg.Debug {
  464. log.Printf("[%s] Batch ready: %d events, uploading...", name, len(events))
  465. }
  466. batch := &EventBatch{
  467. DeviceID: d.state.DeviceID,
  468. Events: events,
  469. }
  470. if err := d.client.UploadEvents(endpoint, batch); err != nil {
  471. // Increment failure counter
  472. if name == "ble" {
  473. d.bleUploadFailures++
  474. } else {
  475. d.wifiUploadFailures++
  476. }
  477. // Log only every 6th failure (once per minute) to reduce spam when network is down
  478. failCount := d.bleUploadFailures
  479. if name != "ble" {
  480. failCount = d.wifiUploadFailures
  481. }
  482. if failCount == 1 || failCount%6 == 0 {
  483. log.Printf("[%s] Upload failed: %v, spooling %d events (failures: %d)", name, err, len(events), failCount)
  484. }
  485. if err := d.spooler.Save(batch, name); err != nil {
  486. log.Printf("[%s] Spool save failed: %v", name, err)
  487. }
  488. } else {
  489. // Reset failure counter on success
  490. if name == "ble" {
  491. d.bleUploadFailures = 0
  492. } else {
  493. d.wifiUploadFailures = 0
  494. }
  495. log.Printf("[%s] Uploaded %d events to server", name, len(events))
  496. }
  497. }
  498. }
  499. func (d *Daemon) spoolFlushLoop() {
  500. ticker := time.NewTicker(10 * time.Second)
  501. defer ticker.Stop()
  502. for {
  503. select {
  504. case <-d.stopChan:
  505. return
  506. case <-ticker.C:
  507. }
  508. if d.state.DeviceToken == "" {
  509. continue
  510. }
  511. // Try to flush one batch
  512. batch, err := d.spooler.PopOldest()
  513. if err != nil || batch == nil {
  514. continue
  515. }
  516. // Try to upload (guess endpoint from event types)
  517. endpoint := "/events"
  518. eventType := "unknown"
  519. if len(batch.Events) > 0 {
  520. if ev, ok := batch.Events[0].(map[string]interface{}); ok {
  521. if t, ok := ev["type"].(string); ok {
  522. if strings.HasPrefix(t, "wifi") {
  523. endpoint = "/wifi"
  524. eventType = "wifi"
  525. } else {
  526. endpoint = "/ble"
  527. eventType = "ble"
  528. }
  529. }
  530. }
  531. }
  532. if err := d.client.UploadEvents(endpoint, batch); err != nil {
  533. // Re-spool if upload failed
  534. d.spooler.Save(batch, "retry")
  535. } else {
  536. log.Printf("[spool] Flushed %d %s events to server", len(batch.Events), eventType)
  537. }
  538. }
  539. }
  540. // getDeviceID returns a device ID based on MAC address
  541. func getDeviceID() string {
  542. // Try wlan0 first, then eth0
  543. for _, iface := range []string{"wlan0", "eth0"} {
  544. if mac := getInterfaceMAC(iface); mac != "" {
  545. return mac
  546. }
  547. }
  548. // Fallback to hostname
  549. host, _ := os.Hostname()
  550. return host
  551. }
  552. func getInterfaceMAC(name string) string {
  553. iface, err := net.InterfaceByName(name)
  554. if err != nil {
  555. return ""
  556. }
  557. return iface.HardwareAddr.String()
  558. }
  559. func getInterfaceIP(name string) string {
  560. iface, err := net.InterfaceByName(name)
  561. if err != nil {
  562. return ""
  563. }
  564. addrs, err := iface.Addrs()
  565. if err != nil {
  566. return ""
  567. }
  568. for _, addr := range addrs {
  569. if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() != nil {
  570. // Return IP with CIDR notation (e.g., 192.168.5.244/24)
  571. ones, _ := ipnet.Mask.Size()
  572. return fmt.Sprintf("%s/%d", ipnet.IP.String(), ones)
  573. }
  574. }
  575. return ""
  576. }