// Beacon Daemon - collects events from scanners and uploads to server package main import ( "context" "encoding/json" "flag" "fmt" "log" "net" "os" "os/signal" "strings" "sync" "syscall" "time" "github.com/go-zeromq/zmq4" ) const ( defaultConfigPath = "/opt/mybeacon/etc/config.json" defaultStatePath = "/opt/mybeacon/etc/device.json" defaultBinDir = "/opt/mybeacon/bin" defaultWiFiIface = "wlan0" maxSpoolBytes = 100 * 1024 * 1024 // 100 MB ) type Daemon struct { cfg *Config state *DeviceState client *APIClient spooler *Spooler tunnel *SSHTunnel scanners *ScannerManager api *APIServer netmgr *NetworkManager bleEvents []interface{} wifiEvents []interface{} mu sync.Mutex configPath string statePath string stopChan chan struct{} } func main() { var ( configPath = flag.String("config", defaultConfigPath, "Config file path") statePath = flag.String("state", defaultStatePath, "Device state file path") serverAddr = flag.String("server", "", "API server address (e.g., http://192.168.5.2:5000)") binDir = flag.String("bindir", defaultBinDir, "Directory with scanner binaries") wifiIface = flag.String("wifi-iface", defaultWiFiIface, "WiFi interface for monitor mode") httpAddr = flag.String("http", ":8080", "HTTP API listen address") debug = flag.Bool("debug", false, "Enable debug logging") ) flag.Parse() log.SetFlags(log.Ltime) log.Println("================================================================================") log.Println("Beacon Daemon starting...") // Load configuration cfg, err := LoadConfig(*configPath) if err != nil { log.Printf("Warning: failed to load config: %v (using defaults)", err) cfg = DefaultConfig() } cfg.Debug = *debug || cfg.Debug // Override server address if provided if *serverAddr != "" { cfg.APIBase = *serverAddr + "/api/v1" log.Printf("Using server: %s", *serverAddr) } // Store WiFi interface cfg.WiFiIface = *wifiIface // Load device state state, err := LoadDeviceState(*statePath) if err != nil { log.Printf("Warning: failed to load state: %v", err) state = &DeviceState{} } // Get device ID from MAC if not set if state.DeviceID == "" { state.DeviceID = getDeviceID() SaveDeviceState(*statePath, state) } log.Printf("Device ID: %s", state.DeviceID) // Create spooler spooler, err := NewSpooler(cfg.SpoolDir, maxSpoolBytes) if err != nil { log.Fatalf("Failed to create spooler: %v", err) } // Create SSH tunnel manager tunnel := NewSSHTunnel(cfg) // Create scanner manager scanners := NewScannerManager(*binDir, cfg.Debug) // Create network manager (manages eth0, wlan0 client, wlan0 AP fallback) netmgr := NewNetworkManager(cfg, scanners) // Create daemon daemon := &Daemon{ cfg: cfg, state: state, client: NewAPIClient(cfg.APIBase), spooler: spooler, tunnel: tunnel, scanners: scanners, netmgr: netmgr, configPath: *configPath, statePath: *statePath, stopChan: make(chan struct{}), } // Create API server daemon.api = NewAPIServer(daemon) if state.DeviceToken != "" { daemon.client.SetToken(state.DeviceToken) } // Handle signals sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigChan log.Println("Shutting down...") daemon.scanners.StopAll() daemon.tunnel.Stop() daemon.netmgr.Stop() close(daemon.stopChan) }() // Start HTTP API server FIRST (dashboard should be available immediately) go func() { log.Printf("Starting HTTP API server on %s", *httpAddr) if err := daemon.api.Start(*httpAddr); err != nil { log.Printf("HTTP server error: %v", err) } }() // Give HTTP server time to bind time.Sleep(100 * time.Millisecond) // Start network manager (manages eth0, wlan0 client, wlan0 AP fallback, WiFi scanner) // Network manager will automatically handle all network priorities and scanner coordination daemon.netmgr.Start() // Start SSH tunnel if enabled if cfg.SSHTunnel.Enabled { daemon.tunnel.Start() } // Start BLE scanner if enabled (not managed by network manager) if cfg.BLE.Enabled { if !daemon.scanners.IsBLERunning() { log.Println("Starting BLE scanner...") if err := daemon.scanners.StartBLE(cfg.ZMQAddrBLE); err != nil { log.Printf("Failed to start BLE scanner: %v", err) } } } // Start registration loop (if not registered) go daemon.registrationLoop() // Start config polling loop go daemon.configLoop() // Start ZMQ subscribers go daemon.subscribeLoop("ble", cfg.ZMQAddrBLE) go daemon.subscribeLoop("wifi", cfg.ZMQAddrWiFi) // Start batch upload loops go daemon.uploadLoop("ble", "/ble", cfg.BLE.BatchIntervalMs) go daemon.uploadLoop("wifi", "/wifi", cfg.WiFi.BatchIntervalMs) // Start spool flush loop go daemon.spoolFlushLoop() // Wait for shutdown <-daemon.stopChan log.Println("Daemon stopped") } // applyScannerConfig starts/stops scanners based on current config func (d *Daemon) applyScannerConfig() { d.mu.Lock() bleEnabled := d.cfg.BLE.Enabled wifiMonitorEnabled := d.cfg.WiFi.MonitorEnabled wifiClientEnabled := d.cfg.WiFi.ClientEnabled zmqBLE := d.cfg.ZMQAddrBLE zmqWiFi := d.cfg.ZMQAddrWiFi wifiIface := d.cfg.WiFiIface d.mu.Unlock() // BLE scanner if bleEnabled { if !d.scanners.IsBLERunning() { log.Println("Starting BLE scanner...") if err := d.scanners.StartBLE(zmqBLE); err != nil { log.Printf("Failed to start BLE scanner: %v", err) } } } else { if d.scanners.IsBLERunning() { log.Println("Stopping BLE scanner...") d.scanners.StopBLE() } } // WiFi scanner - NEVER run if WiFi client is enabled (they share wlan0) // Client mode takes priority over monitor mode if wifiClientEnabled { if d.scanners.IsWiFiRunning() { log.Println("Stopping WiFi scanner (client mode enabled in config)...") d.scanners.StopWiFi() } return } // WiFi scanner - only if monitor enabled AND client disabled // Client has HIGHER priority - if both enabled, client works, scanner doesn't if wifiMonitorEnabled { if !d.scanners.IsWiFiRunning() { log.Println("Starting WiFi scanner...") if err := d.scanners.StartWiFi(zmqWiFi, wifiIface); err != nil { log.Printf("Failed to start WiFi scanner: %v", err) } } } else { if d.scanners.IsWiFiRunning() { log.Println("Stopping WiFi scanner (monitor disabled)...") d.scanners.StopWiFi() } } } // applyWiFiClientState checks if WiFi client state matches config and applies changes func (d *Daemon) applyWiFiClientState() { d.mu.Lock() clientEnabled := d.cfg.WiFi.ClientEnabled ssid := d.cfg.WiFi.SSID psk := d.cfg.WiFi.PSK d.mu.Unlock() // Check if wlan0 has an IP (meaning it's connected) wlan0IP := getInterfaceIP("wlan0") isConnected := wlan0IP != "" if clientEnabled && ssid != "" { // Should be connected if !isConnected { // Stop WiFi scanner before connecting client (they can't coexist) if d.scanners.IsWiFiRunning() { log.Println("Stopping WiFi scanner before connecting client...") d.scanners.StopWiFi() } log.Printf("WiFi client should be connected (SSID=%s), connecting...", ssid) if err := applyWiFiSettings(ssid, psk); err != nil { log.Printf("Failed to connect WiFi client: %v", err) } } } else { // Should be disconnected if isConnected { log.Println("WiFi client should be disconnected, disconnecting...") disconnectWiFiClient() } } } func (d *Daemon) registrationLoop() { for { select { case <-d.stopChan: return default: } if d.state.DeviceToken != "" { time.Sleep(60 * time.Second) continue } log.Println("Attempting device registration...") req := &RegistrationRequest{ DeviceID: d.state.DeviceID, } // Try to get IPs if ip := getInterfaceIP("eth0"); ip != "" { req.EthIP = &ip } if ip := getInterfaceIP("wlan0"); ip != "" { req.WlanIP = &ip } resp, err := d.client.Register(req) if err != nil { log.Printf("Registration failed: %v", err) time.Sleep(10 * time.Second) continue } d.state.DeviceToken = resp.DeviceToken d.state.DevicePassword = resp.DevicePassword d.client.SetToken(resp.DeviceToken) SaveDeviceState(d.statePath, d.state) log.Printf("Device registered, token received") } } func (d *Daemon) configLoop() { // Initial fetch immediately d.fetchAndApplyConfig() ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-d.stopChan: return case <-ticker.C: } d.fetchAndApplyConfig() } } func (d *Daemon) fetchAndApplyConfig() { if d.state.DeviceToken == "" { return } serverCfg, err := d.client.GetConfig(d.state.DeviceID) if err != nil { // In LAN mode, suppress errors (server might be unreachable) if d.cfg.Mode == "lan" { // Silent - use local config return } if d.cfg.Debug { log.Printf("Config fetch failed: %v", err) } return } // Determine effective mode: force_cloud overrides local mode setting effectiveMode := d.cfg.Mode if effectiveMode == "" { effectiveMode = "cloud" } if serverCfg.ForceCloud { if effectiveMode == "lan" { log.Println("Server force_cloud enabled - switching to cloud mode") } effectiveMode = "cloud" } d.mu.Lock() // Track changes for scanner restart bleChanged := false wifiMonitorChanged := false wifiClientChanged := false oldClientEnabled := d.cfg.WiFi.ClientEnabled if effectiveMode == "cloud" { // Cloud mode: server settings have priority bleChanged = d.cfg.BLE.Enabled != serverCfg.BLE.Enabled wifiMonitorChanged = d.cfg.WiFi.MonitorEnabled != serverCfg.WiFi.MonitorEnabled wifiClientChanged = d.cfg.WiFi.ClientEnabled != serverCfg.WiFi.ClientEnabled || d.cfg.WiFi.SSID != serverCfg.WiFi.SSID || d.cfg.WiFi.PSK != serverCfg.WiFi.PSK d.cfg.BLE.Enabled = serverCfg.BLE.Enabled d.cfg.BLE.BatchIntervalMs = serverCfg.BLE.BatchIntervalMs d.cfg.WiFi.MonitorEnabled = serverCfg.WiFi.MonitorEnabled d.cfg.WiFi.ClientEnabled = serverCfg.WiFi.ClientEnabled d.cfg.WiFi.SSID = serverCfg.WiFi.SSID d.cfg.WiFi.PSK = serverCfg.WiFi.PSK d.cfg.WiFi.BatchIntervalMs = serverCfg.WiFi.BatchIntervalMs d.cfg.Debug = serverCfg.Debug // NTP from server in cloud mode if len(serverCfg.Net.NTP.Servers) > 0 { d.cfg.Network.NTPServers = serverCfg.Net.NTP.Servers } } // LAN mode: local settings have priority, we keep what's in d.cfg // Only SSH tunnel comes from server (for remote support) // SSH tunnel ALWAYS from server (for remote support access) sshChanged := d.cfg.SSHTunnel.Enabled != serverCfg.SSHTunnel.Enabled if serverCfg.SSHTunnel.Enabled { d.cfg.SSHTunnel.Enabled = true d.cfg.SSHTunnel.Server = serverCfg.SSHTunnel.Server d.cfg.SSHTunnel.Port = serverCfg.SSHTunnel.Port d.cfg.SSHTunnel.User = serverCfg.SSHTunnel.User d.cfg.SSHTunnel.RemotePort = serverCfg.SSHTunnel.RemotePort d.cfg.SSHTunnel.KeepaliveInterval = serverCfg.SSHTunnel.KeepaliveInterval } else { d.cfg.SSHTunnel.Enabled = false } d.mu.Unlock() // Apply scanner config changes if bleChanged || wifiMonitorChanged { log.Printf("Config changed (mode=%s): ble=%v wifi_monitor=%v", effectiveMode, d.cfg.BLE.Enabled, d.cfg.WiFi.MonitorEnabled) d.applyScannerConfig() } // Apply WiFi client changes if wifiClientChanged { if d.cfg.WiFi.ClientEnabled && d.cfg.WiFi.SSID != "" { // Stop WiFi scanner before connecting client if d.scanners.IsWiFiRunning() { log.Println("Stopping WiFi scanner before connecting client...") d.scanners.StopWiFi() } log.Printf("WiFi client enabled by server: SSID=%s", d.cfg.WiFi.SSID) if err := applyWiFiSettings(d.cfg.WiFi.SSID, d.cfg.WiFi.PSK); err != nil { log.Printf("Failed to apply WiFi client settings: %v", err) } // After client connects, update scanner config (will skip WiFi scanner) d.applyScannerConfig() } else if oldClientEnabled && !d.cfg.WiFi.ClientEnabled { log.Println("WiFi client disabled by server") disconnectWiFiClient() // After client disconnects, WiFi scanner may start (if monitor enabled) d.applyScannerConfig() } } // Update tunnel config d.tunnel.UpdateConfig(d.cfg) if sshChanged { if d.cfg.SSHTunnel.Enabled { log.Println("SSH tunnel enabled by server") d.tunnel.Start() } else { log.Println("SSH tunnel disabled by server") d.tunnel.Stop() } } // Update network manager config (eth0 settings are local-only, never from server) d.netmgr.UpdateConfig(d.cfg) // Save updated config SaveConfig(d.configPath, d.cfg) } func (d *Daemon) subscribeLoop(name string, addr string) { for { select { case <-d.stopChan: return default: } // Only try to connect if the corresponding scanner is running scannerRunning := false if name == "ble" { scannerRunning = d.scanners.IsBLERunning() } else if name == "wifi" { scannerRunning = d.scanners.IsWiFiRunning() } if !scannerRunning { // Wait before checking again time.Sleep(5 * time.Second) continue } if err := d.runSubscriber(name, addr); err != nil { log.Printf("[%s] Subscriber error: %v, reconnecting...", name, err) time.Sleep(time.Second) } } } func (d *Daemon) runSubscriber(name string, addr string) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() sub := zmq4.NewSub(ctx) defer sub.Close() // Subscribe to all topics for this type if err := sub.SetOption(zmq4.OptionSubscribe, name+"."); err != nil { return err } if err := sub.Dial(addr); err != nil { return err } log.Printf("[%s] Connected to %s", name, addr) // Monitor stop channel in goroutine go func() { <-d.stopChan cancel() }() for { msg, err := sub.Recv() if err != nil { return err } // Message is in first frame data := string(msg.Frames[0]) // Parse message: "topic JSON" parts := strings.SplitN(data, " ", 2) if len(parts) != 2 { continue } var event interface{} if err := json.Unmarshal([]byte(parts[1]), &event); err != nil { continue } d.mu.Lock() if name == "ble" { d.bleEvents = append(d.bleEvents, event) if d.cfg.Debug { log.Printf("[%s] Received event, queue size: %d", name, len(d.bleEvents)) } // Add to API for WebSocket broadcast if d.api != nil { d.api.AddBLEEvent(event) } } else { d.wifiEvents = append(d.wifiEvents, event) if d.cfg.Debug { log.Printf("[%s] Received event, queue size: %d", name, len(d.wifiEvents)) } // Add to API for WebSocket broadcast if d.api != nil { d.api.AddWiFiEvent(event) } } d.mu.Unlock() } } func (d *Daemon) uploadLoop(name string, endpoint string, intervalMs int) { if intervalMs <= 0 { intervalMs = 2500 } ticker := time.NewTicker(time.Duration(intervalMs) * time.Millisecond) defer ticker.Stop() for { select { case <-d.stopChan: return case <-ticker.C: } d.mu.Lock() var events []interface{} if name == "ble" { events = d.bleEvents d.bleEvents = nil } else { events = d.wifiEvents d.wifiEvents = nil } d.mu.Unlock() if len(events) == 0 { continue } if d.cfg.Debug { log.Printf("[%s] Batch ready: %d events, uploading...", name, len(events)) } batch := &EventBatch{ DeviceID: d.state.DeviceID, Events: events, } if err := d.client.UploadEvents(endpoint, batch); err != nil { log.Printf("[%s] Upload failed: %v, spooling %d events", name, err, len(events)) if err := d.spooler.Save(batch, name); err != nil { log.Printf("[%s] Spool save failed: %v", name, err) } } else { log.Printf("[%s] Uploaded %d events to server", name, len(events)) } } } func (d *Daemon) spoolFlushLoop() { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { select { case <-d.stopChan: return case <-ticker.C: } if d.state.DeviceToken == "" { continue } // Try to flush one batch batch, err := d.spooler.PopOldest() if err != nil || batch == nil { continue } // Try to upload (guess endpoint from event types) endpoint := "/events" eventType := "unknown" if len(batch.Events) > 0 { if ev, ok := batch.Events[0].(map[string]interface{}); ok { if t, ok := ev["type"].(string); ok { if strings.HasPrefix(t, "wifi") { endpoint = "/wifi" eventType = "wifi" } else { endpoint = "/ble" eventType = "ble" } } } } if err := d.client.UploadEvents(endpoint, batch); err != nil { // Re-spool if upload failed d.spooler.Save(batch, "retry") } else { log.Printf("[spool] Flushed %d %s events to server", len(batch.Events), eventType) } } } // getDeviceID returns a device ID based on MAC address func getDeviceID() string { // Try wlan0 first, then eth0 for _, iface := range []string{"wlan0", "eth0"} { if mac := getInterfaceMAC(iface); mac != "" { return mac } } // Fallback to hostname host, _ := os.Hostname() return host } func getInterfaceMAC(name string) string { iface, err := net.InterfaceByName(name) if err != nil { return "" } return iface.HardwareAddr.String() } func getInterfaceIP(name string) string { iface, err := net.InterfaceByName(name) if err != nil { return "" } addrs, err := iface.Addrs() if err != nil { return "" } for _, addr := range addrs { if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() != nil { // Return IP with CIDR notation (e.g., 192.168.5.244/24) ones, _ := ipnet.Mask.Size() return fmt.Sprintf("%s/%d", ipnet.IP.String(), ones) } } return "" }