main.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608
  1. // Beacon Daemon - collects events from scanners and uploads to server
  2. package main
  3. import (
  4. "context"
  5. "encoding/json"
  6. "flag"
  7. "fmt"
  8. "log"
  9. "net"
  10. "os"
  11. "os/signal"
  12. "strings"
  13. "sync"
  14. "syscall"
  15. "time"
  16. "github.com/go-zeromq/zmq4"
  17. )
  18. const (
  19. defaultConfigPath = "/opt/mybeacon/etc/config.json"
  20. defaultStatePath = "/opt/mybeacon/etc/device.json"
  21. defaultBinDir = "/opt/mybeacon/bin"
  22. defaultWiFiIface = "wlan0"
  23. maxSpoolBytes = 100 * 1024 * 1024 // 100 MB
  24. )
  25. type Daemon struct {
  26. cfg *Config
  27. state *DeviceState
  28. client *APIClient
  29. spooler *Spooler
  30. tunnel *SSHTunnel
  31. scanners *ScannerManager
  32. api *APIServer
  33. netmgr *NetworkManager
  34. bleEvents []interface{}
  35. wifiEvents []interface{}
  36. mu sync.Mutex
  37. configPath string
  38. statePath string
  39. stopChan chan struct{}
  40. }
  41. func main() {
  42. var (
  43. configPath = flag.String("config", defaultConfigPath, "Config file path")
  44. statePath = flag.String("state", defaultStatePath, "Device state file path")
  45. serverAddr = flag.String("server", "", "API server address (e.g., http://192.168.5.2:5000)")
  46. binDir = flag.String("bindir", defaultBinDir, "Directory with scanner binaries")
  47. wifiIface = flag.String("wifi-iface", defaultWiFiIface, "WiFi interface for monitor mode")
  48. httpAddr = flag.String("http", ":8080", "HTTP API listen address")
  49. debug = flag.Bool("debug", false, "Enable debug logging")
  50. )
  51. flag.Parse()
  52. log.SetFlags(log.Ltime)
  53. log.Println("================================================================================")
  54. log.Println("Beacon Daemon starting...")
  55. // Load configuration
  56. cfg, err := LoadConfig(*configPath)
  57. if err != nil {
  58. log.Printf("Warning: failed to load config: %v (using defaults)", err)
  59. cfg = DefaultConfig()
  60. }
  61. cfg.Debug = *debug || cfg.Debug
  62. // Override server address if provided
  63. if *serverAddr != "" {
  64. cfg.APIBase = *serverAddr + "/api/v1"
  65. log.Printf("Using server: %s", *serverAddr)
  66. }
  67. // Store WiFi interface
  68. cfg.WiFiIface = *wifiIface
  69. // Load device state
  70. state, err := LoadDeviceState(*statePath)
  71. if err != nil {
  72. log.Printf("Warning: failed to load state: %v", err)
  73. state = &DeviceState{}
  74. }
  75. // Get device ID from MAC if not set
  76. if state.DeviceID == "" {
  77. state.DeviceID = getDeviceID()
  78. SaveDeviceState(*statePath, state)
  79. }
  80. log.Printf("Device ID: %s", state.DeviceID)
  81. // Create spooler
  82. spooler, err := NewSpooler(cfg.SpoolDir, maxSpoolBytes)
  83. if err != nil {
  84. log.Fatalf("Failed to create spooler: %v", err)
  85. }
  86. // Create SSH tunnel manager
  87. tunnel := NewSSHTunnel(cfg)
  88. // Create scanner manager
  89. scanners := NewScannerManager(*binDir, cfg.Debug)
  90. // Create network manager (manages eth0, wlan0 client, wlan0 AP fallback)
  91. netmgr := NewNetworkManager(cfg, scanners)
  92. // Create daemon
  93. daemon := &Daemon{
  94. cfg: cfg,
  95. state: state,
  96. client: NewAPIClient(cfg.APIBase),
  97. spooler: spooler,
  98. tunnel: tunnel,
  99. scanners: scanners,
  100. netmgr: netmgr,
  101. configPath: *configPath,
  102. statePath: *statePath,
  103. stopChan: make(chan struct{}),
  104. }
  105. // Create API server
  106. daemon.api = NewAPIServer(daemon)
  107. if state.DeviceToken != "" {
  108. daemon.client.SetToken(state.DeviceToken)
  109. }
  110. // Handle signals
  111. sigChan := make(chan os.Signal, 1)
  112. signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
  113. go func() {
  114. <-sigChan
  115. log.Println("Shutting down...")
  116. daemon.scanners.StopAll()
  117. daemon.tunnel.Stop()
  118. daemon.netmgr.Stop()
  119. close(daemon.stopChan)
  120. }()
  121. // Start HTTP API server FIRST (dashboard should be available immediately)
  122. go func() {
  123. log.Printf("Starting HTTP API server on %s", *httpAddr)
  124. if err := daemon.api.Start(*httpAddr); err != nil {
  125. log.Printf("HTTP server error: %v", err)
  126. }
  127. }()
  128. // Give HTTP server time to bind
  129. time.Sleep(100 * time.Millisecond)
  130. // Start network manager (manages eth0, wlan0 client, wlan0 AP fallback, WiFi scanner)
  131. // Network manager will automatically handle all network priorities and scanner coordination
  132. daemon.netmgr.Start()
  133. // Start SSH tunnel if enabled
  134. if cfg.SSHTunnel.Enabled {
  135. daemon.tunnel.Start()
  136. }
  137. // Start BLE scanner if enabled (not managed by network manager)
  138. if cfg.BLE.Enabled {
  139. if !daemon.scanners.IsBLERunning() {
  140. log.Println("Starting BLE scanner...")
  141. if err := daemon.scanners.StartBLE(cfg.ZMQAddrBLE); err != nil {
  142. log.Printf("Failed to start BLE scanner: %v", err)
  143. }
  144. }
  145. }
  146. // Start registration loop (if not registered)
  147. go daemon.registrationLoop()
  148. // Start config polling loop
  149. go daemon.configLoop()
  150. // Start ZMQ subscribers
  151. go daemon.subscribeLoop("ble", cfg.ZMQAddrBLE)
  152. go daemon.subscribeLoop("wifi", cfg.ZMQAddrWiFi)
  153. // Start batch upload loops
  154. go daemon.uploadLoop("ble", "/ble", cfg.BLE.BatchIntervalMs)
  155. go daemon.uploadLoop("wifi", "/wifi", cfg.WiFi.BatchIntervalMs)
  156. // Start spool flush loop
  157. go daemon.spoolFlushLoop()
  158. // Wait for shutdown
  159. <-daemon.stopChan
  160. log.Println("Daemon stopped")
  161. }
  162. func (d *Daemon) registrationLoop() {
  163. for {
  164. select {
  165. case <-d.stopChan:
  166. return
  167. default:
  168. }
  169. if d.state.DeviceToken != "" {
  170. time.Sleep(60 * time.Second)
  171. continue
  172. }
  173. log.Println("Attempting device registration...")
  174. req := &RegistrationRequest{
  175. DeviceID: d.state.DeviceID,
  176. }
  177. // Try to get IPs
  178. if ip := getInterfaceIP("eth0"); ip != "" {
  179. req.EthIP = &ip
  180. }
  181. if ip := getInterfaceIP("wlan0"); ip != "" {
  182. req.WlanIP = &ip
  183. }
  184. resp, err := d.client.Register(req)
  185. if err != nil {
  186. log.Printf("Registration failed: %v", err)
  187. time.Sleep(10 * time.Second)
  188. continue
  189. }
  190. d.state.DeviceToken = resp.DeviceToken
  191. d.state.DevicePassword = resp.DevicePassword
  192. d.client.SetToken(resp.DeviceToken)
  193. SaveDeviceState(d.statePath, d.state)
  194. log.Printf("Device registered, token received")
  195. }
  196. }
  197. func (d *Daemon) configLoop() {
  198. // Initial fetch immediately
  199. d.fetchAndApplyConfig()
  200. ticker := time.NewTicker(30 * time.Second)
  201. defer ticker.Stop()
  202. for {
  203. select {
  204. case <-d.stopChan:
  205. return
  206. case <-ticker.C:
  207. }
  208. d.fetchAndApplyConfig()
  209. }
  210. }
  211. func (d *Daemon) fetchAndApplyConfig() {
  212. if d.state.DeviceToken == "" {
  213. return
  214. }
  215. serverCfg, err := d.client.GetConfig(d.state.DeviceID)
  216. if err != nil {
  217. // In LAN mode, suppress errors (server might be unreachable)
  218. if d.cfg.Mode == "lan" {
  219. // Silent - use local config
  220. return
  221. }
  222. if d.cfg.Debug {
  223. log.Printf("Config fetch failed: %v", err)
  224. }
  225. return
  226. }
  227. // Determine effective mode: force_cloud overrides local mode setting
  228. effectiveMode := d.cfg.Mode
  229. if effectiveMode == "" {
  230. effectiveMode = "cloud"
  231. }
  232. if serverCfg.ForceCloud {
  233. if effectiveMode == "lan" {
  234. log.Println("Server force_cloud enabled - switching to cloud mode")
  235. }
  236. effectiveMode = "cloud"
  237. }
  238. d.mu.Lock()
  239. // Track changes for scanner restart
  240. bleChanged := false
  241. wifiMonitorChanged := false
  242. wifiClientChanged := false
  243. if effectiveMode == "cloud" {
  244. // Cloud mode: server settings have priority
  245. bleChanged = d.cfg.BLE.Enabled != serverCfg.BLE.Enabled
  246. wifiMonitorChanged = d.cfg.WiFi.MonitorEnabled != serverCfg.WiFi.MonitorEnabled
  247. wifiClientChanged = d.cfg.WiFi.ClientEnabled != serverCfg.WiFi.ClientEnabled ||
  248. d.cfg.WiFi.SSID != serverCfg.WiFi.SSID ||
  249. d.cfg.WiFi.PSK != serverCfg.WiFi.PSK
  250. d.cfg.BLE.Enabled = serverCfg.BLE.Enabled
  251. d.cfg.BLE.BatchIntervalMs = serverCfg.BLE.BatchIntervalMs
  252. d.cfg.WiFi.MonitorEnabled = serverCfg.WiFi.MonitorEnabled
  253. d.cfg.WiFi.ClientEnabled = serverCfg.WiFi.ClientEnabled
  254. d.cfg.WiFi.SSID = serverCfg.WiFi.SSID
  255. d.cfg.WiFi.PSK = serverCfg.WiFi.PSK
  256. d.cfg.WiFi.BatchIntervalMs = serverCfg.WiFi.BatchIntervalMs
  257. d.cfg.Debug = serverCfg.Debug
  258. // NTP from server in cloud mode
  259. if len(serverCfg.Net.NTP.Servers) > 0 {
  260. d.cfg.Network.NTPServers = serverCfg.Net.NTP.Servers
  261. }
  262. }
  263. // LAN mode: local settings have priority, we keep what's in d.cfg
  264. // Only SSH tunnel comes from server (for remote support)
  265. // SSH tunnel ALWAYS from server (for remote support access)
  266. sshChanged := d.cfg.SSHTunnel.Enabled != serverCfg.SSHTunnel.Enabled
  267. if serverCfg.SSHTunnel.Enabled {
  268. d.cfg.SSHTunnel.Enabled = true
  269. d.cfg.SSHTunnel.Server = serverCfg.SSHTunnel.Server
  270. d.cfg.SSHTunnel.Port = serverCfg.SSHTunnel.Port
  271. d.cfg.SSHTunnel.User = serverCfg.SSHTunnel.User
  272. d.cfg.SSHTunnel.RemotePort = serverCfg.SSHTunnel.RemotePort
  273. d.cfg.SSHTunnel.KeepaliveInterval = serverCfg.SSHTunnel.KeepaliveInterval
  274. } else {
  275. d.cfg.SSHTunnel.Enabled = false
  276. }
  277. d.mu.Unlock()
  278. // Apply BLE scanner changes (not managed by Network Manager)
  279. if bleChanged {
  280. log.Printf("BLE config changed (mode=%s): enabled=%v", effectiveMode, d.cfg.BLE.Enabled)
  281. if d.cfg.BLE.Enabled {
  282. if !d.scanners.IsBLERunning() {
  283. log.Println("Starting BLE scanner...")
  284. d.scanners.StartBLE(d.cfg.ZMQAddrBLE)
  285. }
  286. } else {
  287. if d.scanners.IsBLERunning() {
  288. log.Println("Stopping BLE scanner...")
  289. d.scanners.StopBLE()
  290. }
  291. }
  292. }
  293. // Log WiFi config changes (Network Manager will handle automatically)
  294. if wifiMonitorChanged || wifiClientChanged {
  295. log.Printf("WiFi config changed (mode=%s): monitor=%v client=%v ssid=%s",
  296. effectiveMode, d.cfg.WiFi.MonitorEnabled, d.cfg.WiFi.ClientEnabled, d.cfg.WiFi.SSID)
  297. log.Println("Network Manager will apply changes automatically")
  298. }
  299. // Update tunnel config
  300. d.tunnel.UpdateConfig(d.cfg)
  301. if sshChanged {
  302. if d.cfg.SSHTunnel.Enabled {
  303. log.Println("SSH tunnel enabled by server")
  304. d.tunnel.Start()
  305. } else {
  306. log.Println("SSH tunnel disabled by server")
  307. d.tunnel.Stop()
  308. }
  309. }
  310. // Update network manager config - it will handle all network changes automatically
  311. // (eth0 settings are local-only, never from server)
  312. // (WiFi client and scanner coordination handled by Network Manager)
  313. d.netmgr.UpdateConfig(d.cfg)
  314. // Save updated config
  315. SaveConfig(d.configPath, d.cfg)
  316. }
  317. func (d *Daemon) subscribeLoop(name string, addr string) {
  318. for {
  319. select {
  320. case <-d.stopChan:
  321. return
  322. default:
  323. }
  324. // Only try to connect if the corresponding scanner is running
  325. scannerRunning := false
  326. if name == "ble" {
  327. scannerRunning = d.scanners.IsBLERunning()
  328. } else if name == "wifi" {
  329. scannerRunning = d.scanners.IsWiFiRunning()
  330. }
  331. if !scannerRunning {
  332. // Wait before checking again
  333. time.Sleep(5 * time.Second)
  334. continue
  335. }
  336. if err := d.runSubscriber(name, addr); err != nil {
  337. log.Printf("[%s] Subscriber error: %v, reconnecting...", name, err)
  338. time.Sleep(time.Second)
  339. }
  340. }
  341. }
  342. func (d *Daemon) runSubscriber(name string, addr string) error {
  343. ctx, cancel := context.WithCancel(context.Background())
  344. defer cancel()
  345. sub := zmq4.NewSub(ctx)
  346. defer sub.Close()
  347. // Subscribe to all topics for this type
  348. if err := sub.SetOption(zmq4.OptionSubscribe, name+"."); err != nil {
  349. return err
  350. }
  351. if err := sub.Dial(addr); err != nil {
  352. return err
  353. }
  354. log.Printf("[%s] Connected to %s", name, addr)
  355. // Monitor stop channel in goroutine
  356. go func() {
  357. <-d.stopChan
  358. cancel()
  359. }()
  360. for {
  361. msg, err := sub.Recv()
  362. if err != nil {
  363. return err
  364. }
  365. // Message is in first frame
  366. data := string(msg.Frames[0])
  367. // Parse message: "topic JSON"
  368. parts := strings.SplitN(data, " ", 2)
  369. if len(parts) != 2 {
  370. continue
  371. }
  372. var event interface{}
  373. if err := json.Unmarshal([]byte(parts[1]), &event); err != nil {
  374. continue
  375. }
  376. d.mu.Lock()
  377. if name == "ble" {
  378. d.bleEvents = append(d.bleEvents, event)
  379. if d.cfg.Debug {
  380. log.Printf("[%s] Received event, queue size: %d", name, len(d.bleEvents))
  381. }
  382. // Add to API for WebSocket broadcast
  383. if d.api != nil {
  384. d.api.AddBLEEvent(event)
  385. }
  386. } else {
  387. d.wifiEvents = append(d.wifiEvents, event)
  388. if d.cfg.Debug {
  389. log.Printf("[%s] Received event, queue size: %d", name, len(d.wifiEvents))
  390. }
  391. // Add to API for WebSocket broadcast
  392. if d.api != nil {
  393. d.api.AddWiFiEvent(event)
  394. }
  395. }
  396. d.mu.Unlock()
  397. }
  398. }
  399. func (d *Daemon) uploadLoop(name string, endpoint string, intervalMs int) {
  400. if intervalMs <= 0 {
  401. intervalMs = 2500
  402. }
  403. ticker := time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
  404. defer ticker.Stop()
  405. for {
  406. select {
  407. case <-d.stopChan:
  408. return
  409. case <-ticker.C:
  410. }
  411. d.mu.Lock()
  412. var events []interface{}
  413. if name == "ble" {
  414. events = d.bleEvents
  415. d.bleEvents = nil
  416. } else {
  417. events = d.wifiEvents
  418. d.wifiEvents = nil
  419. }
  420. d.mu.Unlock()
  421. if len(events) == 0 {
  422. continue
  423. }
  424. if d.cfg.Debug {
  425. log.Printf("[%s] Batch ready: %d events, uploading...", name, len(events))
  426. }
  427. batch := &EventBatch{
  428. DeviceID: d.state.DeviceID,
  429. Events: events,
  430. }
  431. if err := d.client.UploadEvents(endpoint, batch); err != nil {
  432. log.Printf("[%s] Upload failed: %v, spooling %d events", name, err, len(events))
  433. if err := d.spooler.Save(batch, name); err != nil {
  434. log.Printf("[%s] Spool save failed: %v", name, err)
  435. }
  436. } else {
  437. log.Printf("[%s] Uploaded %d events to server", name, len(events))
  438. }
  439. }
  440. }
  441. func (d *Daemon) spoolFlushLoop() {
  442. ticker := time.NewTicker(10 * time.Second)
  443. defer ticker.Stop()
  444. for {
  445. select {
  446. case <-d.stopChan:
  447. return
  448. case <-ticker.C:
  449. }
  450. if d.state.DeviceToken == "" {
  451. continue
  452. }
  453. // Try to flush one batch
  454. batch, err := d.spooler.PopOldest()
  455. if err != nil || batch == nil {
  456. continue
  457. }
  458. // Try to upload (guess endpoint from event types)
  459. endpoint := "/events"
  460. eventType := "unknown"
  461. if len(batch.Events) > 0 {
  462. if ev, ok := batch.Events[0].(map[string]interface{}); ok {
  463. if t, ok := ev["type"].(string); ok {
  464. if strings.HasPrefix(t, "wifi") {
  465. endpoint = "/wifi"
  466. eventType = "wifi"
  467. } else {
  468. endpoint = "/ble"
  469. eventType = "ble"
  470. }
  471. }
  472. }
  473. }
  474. if err := d.client.UploadEvents(endpoint, batch); err != nil {
  475. // Re-spool if upload failed
  476. d.spooler.Save(batch, "retry")
  477. } else {
  478. log.Printf("[spool] Flushed %d %s events to server", len(batch.Events), eventType)
  479. }
  480. }
  481. }
  482. // getDeviceID returns a device ID based on MAC address
  483. func getDeviceID() string {
  484. // Try wlan0 first, then eth0
  485. for _, iface := range []string{"wlan0", "eth0"} {
  486. if mac := getInterfaceMAC(iface); mac != "" {
  487. return mac
  488. }
  489. }
  490. // Fallback to hostname
  491. host, _ := os.Hostname()
  492. return host
  493. }
  494. func getInterfaceMAC(name string) string {
  495. iface, err := net.InterfaceByName(name)
  496. if err != nil {
  497. return ""
  498. }
  499. return iface.HardwareAddr.String()
  500. }
  501. func getInterfaceIP(name string) string {
  502. iface, err := net.InterfaceByName(name)
  503. if err != nil {
  504. return ""
  505. }
  506. addrs, err := iface.Addrs()
  507. if err != nil {
  508. return ""
  509. }
  510. for _, addr := range addrs {
  511. if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() != nil {
  512. // Return IP with CIDR notation (e.g., 192.168.5.244/24)
  513. ones, _ := ipnet.Mask.Size()
  514. return fmt.Sprintf("%s/%d", ipnet.IP.String(), ones)
  515. }
  516. }
  517. return ""
  518. }