// Beacon Daemon - collects events from scanners and uploads to server package main import ( "context" "encoding/json" "flag" "fmt" "log" "net" "net/http" "os" "os/signal" "strings" "sync" "syscall" "time" "github.com/go-zeromq/zmq4" ) const ( defaultConfigPath = "/opt/mybeacon/etc/config.json" defaultStatePath = "/opt/mybeacon/etc/device.json" defaultBinDir = "/opt/mybeacon/bin" defaultWiFiIface = "wlan0" maxSpoolBytes = 100 * 1024 * 1024 // 100 MB ) type Daemon struct { cfg *Config state *DeviceState client *APIClient spooler *Spooler sshTunnel *SSHTunnel dashboardTunnel *SSHTunnel scanners *ScannerManager api *APIServer netmgr *NetworkManager bleEvents []interface{} wifiEvents []interface{} mu sync.Mutex configPath string statePath string httpAddr string // HTTP API listen address stopChan chan struct{} // Upload failure counters (for reducing log spam) bleUploadFailures int wifiUploadFailures int } func main() { var ( configPath = flag.String("config", defaultConfigPath, "Config file path") statePath = flag.String("state", defaultStatePath, "Device state file path") serverAddr = flag.String("server", "", "API server address (e.g., http://192.168.5.2:5000)") binDir = flag.String("bindir", defaultBinDir, "Directory with scanner binaries") wifiIface = flag.String("wifi-iface", defaultWiFiIface, "WiFi interface for monitor mode") httpAddr = flag.String("http", ":8080", "HTTP API listen address") debug = flag.Bool("debug", false, "Enable debug logging") ) flag.Parse() log.SetFlags(log.Ltime) log.Println("================================================================================") log.Println("Beacon Daemon starting...") // Load configuration cfg, err := LoadConfig(*configPath) if err != nil { log.Printf("Warning: failed to load config: %v (using defaults)", err) cfg = DefaultConfig() } cfg.Debug = *debug || cfg.Debug // Override server address if provided if *serverAddr != "" { cfg.APIBase = *serverAddr + "/api/v1" log.Printf("Using server: %s", *serverAddr) } // Store WiFi interface cfg.WiFiIface = *wifiIface // Load device state state, err := LoadDeviceState(*statePath) if err != nil { log.Printf("Warning: failed to load state: %v", err) state = &DeviceState{} } // Get device ID from MAC if not set if state.DeviceID == "" { state.DeviceID = getDeviceID() SaveDeviceState(*statePath, state) } log.Printf("Device ID: %s", state.DeviceID) // Create spooler spooler, err := NewSpooler(cfg.SpoolDir, maxSpoolBytes) if err != nil { log.Fatalf("Failed to create spooler: %v", err) } // Create API client client := NewAPIClient(cfg.APIBase) // Create SSH tunnel managers (will be started when enabled in config) sshTunnelCfg := &TunnelConfig{ Enabled: cfg.SSHTunnel.Enabled, Server: cfg.SSHTunnel.Server, Port: cfg.SSHTunnel.Port, User: cfg.SSHTunnel.User, RemotePort: cfg.SSHTunnel.RemotePort, KeepaliveInterval: cfg.SSHTunnel.KeepaliveInterval, ReconnectDelay: cfg.SSHTunnel.ReconnectDelay, } sshTunnel := NewSSHTunnel("ssh", 22, sshTunnelCfg) dashboardTunnelCfg := &TunnelConfig{ Enabled: cfg.DashboardTunnel.Enabled, Server: cfg.DashboardTunnel.Server, Port: cfg.DashboardTunnel.Port, User: cfg.DashboardTunnel.User, RemotePort: cfg.DashboardTunnel.RemotePort, KeepaliveInterval: cfg.DashboardTunnel.KeepaliveInterval, ReconnectDelay: cfg.DashboardTunnel.ReconnectDelay, } dashboardTunnel := NewSSHTunnel("dashboard", 80, dashboardTunnelCfg) // Create scanner manager scanners := NewScannerManager(*binDir, cfg.Debug) // Create network manager (manages eth0, wlan0 client, wlan0 AP fallback) netmgr := NewNetworkManager(cfg, scanners, state.DevicePassword) // Create daemon daemon := &Daemon{ cfg: cfg, state: state, client: client, spooler: spooler, sshTunnel: sshTunnel, dashboardTunnel: dashboardTunnel, scanners: scanners, netmgr: netmgr, configPath: *configPath, statePath: *statePath, httpAddr: *httpAddr, stopChan: make(chan struct{}), } // Create API server daemon.api = NewAPIServer(daemon) if state.DeviceToken != "" { daemon.client.SetToken(state.DeviceToken) } // Handle signals sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigChan log.Println("Shutting down...") daemon.api.Stop() daemon.scanners.StopAll() daemon.sshTunnel.Stop() daemon.dashboardTunnel.Stop() daemon.netmgr.Stop() close(daemon.stopChan) }() // Start HTTP API server if enabled (dashboard should be available immediately) if cfg.Dashboard.Enabled { go func() { log.Printf("Starting HTTP API server on %s", daemon.httpAddr) if err := daemon.api.Start(daemon.httpAddr); err != nil && err != http.ErrServerClosed { log.Printf("HTTP server error: %v", err) } }() // Give HTTP server time to bind time.Sleep(100 * time.Millisecond) } else { log.Println("Dashboard disabled - HTTP server not started") } // Start network manager (manages eth0, wlan0 client, wlan0 AP fallback, WiFi scanner) // Network manager will automatically handle all network priorities and scanner coordination daemon.netmgr.Start() // Start SSH tunnel if enabled if cfg.SSHTunnel.Enabled && cfg.SSHTunnel.RemotePort != 0 { daemon.sshTunnel.Start() } // Start Dashboard tunnel if enabled if cfg.DashboardTunnel.Enabled && cfg.DashboardTunnel.RemotePort != 0 { daemon.dashboardTunnel.Start() } // Start BLE scanner if enabled (not managed by network manager) if cfg.BLE.Enabled { if !daemon.scanners.IsBLERunning() { log.Println("Starting BLE scanner...") if err := daemon.scanners.StartBLE(cfg.ZMQAddrBLE); err != nil { log.Printf("Failed to start BLE scanner: %v", err) } } } // Start registration loop (if not registered) go daemon.registrationLoop() // Start config polling loop go daemon.configLoop() // Start ZMQ subscribers go daemon.subscribeLoop("ble", cfg.ZMQAddrBLE) go daemon.subscribeLoop("wifi", cfg.ZMQAddrWiFi) // Start batch upload loops go daemon.uploadLoop("ble", "/ble", cfg.BLE.BatchIntervalMs) go daemon.uploadLoop("wifi", "/wifi", cfg.WiFi.BatchIntervalMs) // Start spool flush loop go daemon.spoolFlushLoop() // Wait for shutdown <-daemon.stopChan log.Println("Daemon stopped") } func (d *Daemon) registrationLoop() { for { select { case <-d.stopChan: return default: } if d.state.DeviceToken != "" { time.Sleep(60 * time.Second) continue } log.Println("Attempting device registration...") // Generate or load SSH key pair sshKeyPath := "/etc/beacon/ssh_tunnel_ed25519" sshPubKey, err := GenerateOrLoadSSHKey(sshKeyPath) if err != nil { log.Printf("Failed to generate/load SSH key: %v", err) time.Sleep(10 * time.Second) continue } log.Printf("SSH key ready: %s", sshKeyPath) req := &RegistrationRequest{ DeviceID: d.state.DeviceID, SSHPublicKey: sshPubKey, } // Try to get IPs if ip := getInterfaceIP("eth0"); ip != "" { req.EthIP = &ip } if ip := getInterfaceIP("wlan0"); ip != "" { req.WlanIP = &ip } resp, err := d.client.Register(req) if err != nil { log.Printf("Registration failed: %v", err) time.Sleep(10 * time.Second) continue } d.state.DeviceToken = resp.DeviceToken d.state.DevicePassword = resp.DevicePassword d.client.SetToken(resp.DeviceToken) SaveDeviceState(d.statePath, d.state) log.Printf("Device registered, token received") // Immediately fetch config after registration log.Println("Fetching initial config from server...") d.fetchAndApplyConfig() } } func (d *Daemon) configLoop() { // Initial fetch immediately d.fetchAndApplyConfig() ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-d.stopChan: return case <-ticker.C: } d.fetchAndApplyConfig() } } func (d *Daemon) fetchAndApplyConfig() { if d.state.DeviceToken == "" { return } serverCfg, err := d.client.GetConfig(d.state.DeviceID) if err != nil { // In LAN mode, suppress errors (server might be unreachable) if d.cfg.Mode == "lan" { // Silent - use local config return } if d.cfg.Debug { log.Printf("Config fetch failed: %v", err) } return } // Determine effective mode: force_cloud overrides local mode setting effectiveMode := d.cfg.Mode if effectiveMode == "" { effectiveMode = "cloud" } if serverCfg.ForceCloud { if effectiveMode == "lan" { log.Println("Server force_cloud enabled - switching to cloud mode") } effectiveMode = "cloud" } d.mu.Lock() // Track changes for scanner restart bleChanged := false wifiMonitorChanged := false wifiClientChanged := false if effectiveMode == "cloud" { // Cloud mode: server settings have priority bleChanged = d.cfg.BLE.Enabled != serverCfg.BLE.Enabled wifiMonitorChanged = d.cfg.WiFi.MonitorEnabled != serverCfg.WiFi.MonitorEnabled wifiClientChanged = d.cfg.WiFi.ClientEnabled != serverCfg.WiFi.ClientEnabled || d.cfg.WiFi.SSID != serverCfg.WiFi.SSID || d.cfg.WiFi.PSK != serverCfg.WiFi.PSK d.cfg.BLE.Enabled = serverCfg.BLE.Enabled d.cfg.BLE.BatchIntervalMs = serverCfg.BLE.BatchIntervalMs d.cfg.WiFi.MonitorEnabled = serverCfg.WiFi.MonitorEnabled d.cfg.WiFi.ClientEnabled = serverCfg.WiFi.ClientEnabled d.cfg.WiFi.SSID = serverCfg.WiFi.SSID d.cfg.WiFi.PSK = serverCfg.WiFi.PSK d.cfg.WiFi.BatchIntervalMs = serverCfg.WiFi.BatchIntervalMs d.cfg.Debug = serverCfg.Debug // NTP from server in cloud mode if len(serverCfg.Net.NTP.Servers) > 0 { d.cfg.Network.NTPServers = serverCfg.Net.NTP.Servers } } // LAN mode: local settings have priority, we keep what's in d.cfg // Only tunnels come from server (for remote support) // SSH tunnel ALWAYS from server (for remote support access) sshChanged := d.cfg.SSHTunnel.Enabled != serverCfg.SSHTunnel.Enabled || d.cfg.SSHTunnel.RemotePort != serverCfg.SSHTunnel.RemotePort if serverCfg.SSHTunnel.Enabled { d.cfg.SSHTunnel.Enabled = true d.cfg.SSHTunnel.Server = serverCfg.SSHTunnel.Server d.cfg.SSHTunnel.Port = serverCfg.SSHTunnel.Port d.cfg.SSHTunnel.User = serverCfg.SSHTunnel.User d.cfg.SSHTunnel.RemotePort = serverCfg.SSHTunnel.RemotePort d.cfg.SSHTunnel.KeepaliveInterval = serverCfg.SSHTunnel.KeepaliveInterval } else { d.cfg.SSHTunnel.Enabled = false } // Dashboard tunnel ALWAYS from server (for remote dashboard access) dashboardTunnelChanged := d.cfg.DashboardTunnel.Enabled != serverCfg.DashboardTunnel.Enabled || d.cfg.DashboardTunnel.RemotePort != serverCfg.DashboardTunnel.RemotePort if serverCfg.DashboardTunnel.Enabled { d.cfg.DashboardTunnel.Enabled = true d.cfg.DashboardTunnel.Server = serverCfg.DashboardTunnel.Server d.cfg.DashboardTunnel.Port = serverCfg.DashboardTunnel.Port d.cfg.DashboardTunnel.User = serverCfg.DashboardTunnel.User d.cfg.DashboardTunnel.RemotePort = serverCfg.DashboardTunnel.RemotePort d.cfg.DashboardTunnel.KeepaliveInterval = serverCfg.DashboardTunnel.KeepaliveInterval } else { d.cfg.DashboardTunnel.Enabled = false } // Dashboard HTTP API ALWAYS from server (for remote management) dashboardChanged := d.cfg.Dashboard.Enabled != serverCfg.Dashboard.Enabled d.cfg.Dashboard.Enabled = serverCfg.Dashboard.Enabled d.mu.Unlock() // Apply BLE scanner changes (not managed by Network Manager) if bleChanged { log.Printf("BLE config changed (mode=%s): enabled=%v", effectiveMode, d.cfg.BLE.Enabled) if d.cfg.BLE.Enabled { if !d.scanners.IsBLERunning() { log.Println("Starting BLE scanner...") d.scanners.StartBLE(d.cfg.ZMQAddrBLE) } } else { if d.scanners.IsBLERunning() { log.Println("Stopping BLE scanner...") d.scanners.StopBLE() } } } // Log WiFi config changes (Network Manager will handle automatically) if wifiMonitorChanged || wifiClientChanged { log.Printf("WiFi config changed (mode=%s): monitor=%v client=%v ssid=%s", effectiveMode, d.cfg.WiFi.MonitorEnabled, d.cfg.WiFi.ClientEnabled, d.cfg.WiFi.SSID) log.Println("Network Manager will apply changes automatically") } // Update SSH tunnel config sshTunnelCfg := &TunnelConfig{ Enabled: d.cfg.SSHTunnel.Enabled, Server: d.cfg.SSHTunnel.Server, Port: d.cfg.SSHTunnel.Port, User: d.cfg.SSHTunnel.User, RemotePort: d.cfg.SSHTunnel.RemotePort, KeepaliveInterval: d.cfg.SSHTunnel.KeepaliveInterval, ReconnectDelay: d.cfg.SSHTunnel.ReconnectDelay, } d.sshTunnel.UpdateConfig(sshTunnelCfg) if sshChanged { if d.cfg.SSHTunnel.Enabled && d.cfg.SSHTunnel.RemotePort != 0 { log.Printf("SSH tunnel enabled by server (port %d)", d.cfg.SSHTunnel.RemotePort) d.sshTunnel.Restart() } else { log.Println("SSH tunnel disabled by server") d.sshTunnel.Stop() } } // Update Dashboard tunnel config dashboardTunnelCfg := &TunnelConfig{ Enabled: d.cfg.DashboardTunnel.Enabled, Server: d.cfg.DashboardTunnel.Server, Port: d.cfg.DashboardTunnel.Port, User: d.cfg.DashboardTunnel.User, RemotePort: d.cfg.DashboardTunnel.RemotePort, KeepaliveInterval: d.cfg.DashboardTunnel.KeepaliveInterval, ReconnectDelay: d.cfg.DashboardTunnel.ReconnectDelay, } d.dashboardTunnel.UpdateConfig(dashboardTunnelCfg) if dashboardTunnelChanged { if d.cfg.DashboardTunnel.Enabled && d.cfg.DashboardTunnel.RemotePort != 0 { log.Printf("Dashboard tunnel enabled by server (port %d)", d.cfg.DashboardTunnel.RemotePort) d.dashboardTunnel.Restart() } else { log.Println("Dashboard tunnel disabled by server") d.dashboardTunnel.Stop() } } // Update dashboard state if dashboardChanged { if d.cfg.Dashboard.Enabled { if !d.api.IsRunning() { log.Println("Dashboard enabled by server - starting HTTP API") go func() { if err := d.api.Start(d.httpAddr); err != nil && err != http.ErrServerClosed { log.Printf("HTTP server error: %v", err) } }() } } else { if d.api.IsRunning() { log.Println("Dashboard disabled by server - stopping HTTP API") d.api.Stop() } } } // Update network manager config - it will handle all network changes automatically // (eth0 settings are local-only, never from server) // (WiFi client and scanner coordination handled by Network Manager) d.netmgr.UpdateConfig(d.cfg) // Save updated config SaveConfig(d.configPath, d.cfg) } func (d *Daemon) subscribeLoop(name string, addr string) { for { select { case <-d.stopChan: return default: } // Only try to connect if the corresponding scanner is running scannerRunning := false if name == "ble" { scannerRunning = d.scanners.IsBLERunning() } else if name == "wifi" { scannerRunning = d.scanners.IsWiFiRunning() } if !scannerRunning { // Wait before checking again time.Sleep(5 * time.Second) continue } if err := d.runSubscriber(name, addr); err != nil { log.Printf("[%s] Subscriber error: %v, reconnecting...", name, err) time.Sleep(time.Second) } } } func (d *Daemon) runSubscriber(name string, addr string) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() sub := zmq4.NewSub(ctx) defer sub.Close() // Subscribe to all topics for this type if err := sub.SetOption(zmq4.OptionSubscribe, name+"."); err != nil { return err } if err := sub.Dial(addr); err != nil { return err } log.Printf("[%s] Connected to %s", name, addr) // Monitor stop channel in goroutine go func() { <-d.stopChan cancel() }() for { msg, err := sub.Recv() if err != nil { return err } // Message is in first frame data := string(msg.Frames[0]) // Parse message: "topic JSON" parts := strings.SplitN(data, " ", 2) if len(parts) != 2 { continue } var event interface{} if err := json.Unmarshal([]byte(parts[1]), &event); err != nil { continue } d.mu.Lock() if name == "ble" { d.bleEvents = append(d.bleEvents, event) if d.cfg.Debug { log.Printf("[%s] Received event, queue size: %d", name, len(d.bleEvents)) } // Add to API for WebSocket broadcast if d.api != nil { d.api.AddBLEEvent(event) } } else { d.wifiEvents = append(d.wifiEvents, event) if d.cfg.Debug { log.Printf("[%s] Received event, queue size: %d", name, len(d.wifiEvents)) } // Add to API for WebSocket broadcast if d.api != nil { d.api.AddWiFiEvent(event) } } d.mu.Unlock() } } func (d *Daemon) uploadLoop(name string, endpoint string, intervalMs int) { if intervalMs <= 0 { intervalMs = 2500 } ticker := time.NewTicker(time.Duration(intervalMs) * time.Millisecond) defer ticker.Stop() for { select { case <-d.stopChan: return case <-ticker.C: } // Skip upload if not registered yet if d.state.DeviceToken == "" { continue } d.mu.Lock() var events []interface{} if name == "ble" { events = d.bleEvents d.bleEvents = nil } else { events = d.wifiEvents d.wifiEvents = nil } d.mu.Unlock() if len(events) == 0 { continue } if d.cfg.Debug { log.Printf("[%s] Batch ready: %d events, uploading...", name, len(events)) } batch := &EventBatch{ DeviceID: d.state.DeviceID, Events: events, } if err := d.client.UploadEvents(endpoint, batch); err != nil { // Increment failure counter if name == "ble" { d.bleUploadFailures++ } else { d.wifiUploadFailures++ } // Log only every 6th failure (once per minute) to reduce spam when network is down failCount := d.bleUploadFailures if name != "ble" { failCount = d.wifiUploadFailures } if failCount == 1 || failCount%6 == 0 { log.Printf("[%s] Upload failed: %v, spooling %d events (failures: %d)", name, err, len(events), failCount) } if err := d.spooler.Save(batch, name); err != nil { log.Printf("[%s] Spool save failed: %v", name, err) } } else { // Reset failure counter on success if name == "ble" { d.bleUploadFailures = 0 } else { d.wifiUploadFailures = 0 } log.Printf("[%s] Uploaded %d events to server", name, len(events)) } } } func (d *Daemon) spoolFlushLoop() { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { select { case <-d.stopChan: return case <-ticker.C: } if d.state.DeviceToken == "" { continue } // Try to flush one batch batch, err := d.spooler.PopOldest() if err != nil || batch == nil { continue } // Try to upload (guess endpoint from event types) endpoint := "/events" eventType := "unknown" if len(batch.Events) > 0 { if ev, ok := batch.Events[0].(map[string]interface{}); ok { if t, ok := ev["type"].(string); ok { if strings.HasPrefix(t, "wifi") { endpoint = "/wifi" eventType = "wifi" } else { endpoint = "/ble" eventType = "ble" } } } } if err := d.client.UploadEvents(endpoint, batch); err != nil { // Re-spool if upload failed d.spooler.Save(batch, "retry") } else { log.Printf("[spool] Flushed %d %s events to server", len(batch.Events), eventType) } } } // updateWiFiCredentials sends WiFi credentials to server (Cloud Mode only) func (d *Daemon) updateWiFiCredentials(ssid, psk string) error { return d.client.UpdateWiFiCredentials(ssid, psk) } // getDeviceID returns a device ID based on MAC address func getDeviceID() string { // Try wlan0 first, then eth0 for _, iface := range []string{"wlan0", "eth0"} { if mac := getInterfaceMAC(iface); mac != "" { return mac } } // Fallback to hostname host, _ := os.Hostname() return host } func getInterfaceMAC(name string) string { iface, err := net.InterfaceByName(name) if err != nil { return "" } return iface.HardwareAddr.String() } func getInterfaceIP(name string) string { iface, err := net.InterfaceByName(name) if err != nil { return "" } addrs, err := iface.Addrs() if err != nil { return "" } for _, addr := range addrs { if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() != nil { // Return IP with CIDR notation (e.g., 192.168.5.244/24) ones, _ := ipnet.Mask.Size() return fmt.Sprintf("%s/%d", ipnet.IP.String(), ones) } } return "" }