||
- // Beacon Daemon - collects events from scanners and uploads to server
- package main
- import (
- "context"
- "encoding/json"
- "flag"
- "fmt"
- "log"
- "net"
- "os"
- "os/signal"
- "strings"
- "sync"
- "syscall"
- "time"
- "github.com/go-zeromq/zmq4"
- )
- const (
- defaultConfigPath = "/opt/mybeacon/etc/config.json"
- defaultStatePath = "/opt/mybeacon/etc/device.json"
- defaultBinDir = "/opt/mybeacon/bin"
- defaultWiFiIface = "wlan0"
- maxSpoolBytes = 100 * 1024 * 1024 // 100 MB
- )
- type Daemon struct {
- cfg *Config
- state *DeviceState
- client *APIClient
- spooler *Spooler
- tunnel *SSHTunnel
- scanners *ScannerManager
- api *APIServer
- netmgr *NetworkManager
- bleEvents []interface{}
- wifiEvents []interface{}
- mu sync.Mutex
- configPath string
- statePath string
- stopChan chan struct{}
- }
- func main() {
- var (
- configPath = flag.String("config", defaultConfigPath, "Config file path")
- statePath = flag.String("state", defaultStatePath, "Device state file path")
- serverAddr = flag.String("server", "", "API server address (e.g., http://192.168.5.2:5000)")
- binDir = flag.String("bindir", defaultBinDir, "Directory with scanner binaries")
- wifiIface = flag.String("wifi-iface", defaultWiFiIface, "WiFi interface for monitor mode")
- httpAddr = flag.String("http", ":8080", "HTTP API listen address")
- debug = flag.Bool("debug", false, "Enable debug logging")
- )
- flag.Parse()
- log.SetFlags(log.Ltime)
- log.Println("================================================================================")
- log.Println("Beacon Daemon starting...")
- // Load configuration
- cfg, err := LoadConfig(*configPath)
- if err != nil {
- log.Printf("Warning: failed to load config: %v (using defaults)", err)
- cfg = DefaultConfig()
- }
- cfg.Debug = *debug || cfg.Debug
- // Override server address if provided
- if *serverAddr != "" {
- cfg.APIBase = *serverAddr + "/api/v1"
- log.Printf("Using server: %s", *serverAddr)
- }
- // Store WiFi interface
- cfg.WiFiIface = *wifiIface
- // Load device state
- state, err := LoadDeviceState(*statePath)
- if err != nil {
- log.Printf("Warning: failed to load state: %v", err)
- state = &DeviceState{}
- }
- // Get device ID from MAC if not set
- if state.DeviceID == "" {
- state.DeviceID = getDeviceID()
- SaveDeviceState(*statePath, state)
- }
- log.Printf("Device ID: %s", state.DeviceID)
- // Create spooler
- spooler, err := NewSpooler(cfg.SpoolDir, maxSpoolBytes)
- if err != nil {
- log.Fatalf("Failed to create spooler: %v", err)
- }
- // Create SSH tunnel manager
- tunnel := NewSSHTunnel(cfg)
- // Create scanner manager
- scanners := NewScannerManager(*binDir, cfg.Debug)
- // Create network manager (manages eth0, wlan0 client, wlan0 AP fallback)
- netmgr := NewNetworkManager(cfg, scanners)
- // Create daemon
- daemon := &Daemon{
- cfg: cfg,
- state: state,
- client: NewAPIClient(cfg.APIBase),
- spooler: spooler,
- tunnel: tunnel,
- scanners: scanners,
- netmgr: netmgr,
- configPath: *configPath,
- statePath: *statePath,
- stopChan: make(chan struct{}),
- }
- // Create API server
- daemon.api = NewAPIServer(daemon)
- if state.DeviceToken != "" {
- daemon.client.SetToken(state.DeviceToken)
- }
- // Handle signals
- sigChan := make(chan os.Signal, 1)
- signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
- go func() {
- <-sigChan
- log.Println("Shutting down...")
- daemon.scanners.StopAll()
- daemon.tunnel.Stop()
- daemon.netmgr.Stop()
- close(daemon.stopChan)
- }()
- // Start HTTP API server FIRST (dashboard should be available immediately)
- go func() {
- log.Printf("Starting HTTP API server on %s", *httpAddr)
- if err := daemon.api.Start(*httpAddr); err != nil {
- log.Printf("HTTP server error: %v", err)
- }
- }()
- // Give HTTP server time to bind
- time.Sleep(100 * time.Millisecond)
- // Start network manager (manages eth0, wlan0 client, wlan0 AP fallback, WiFi scanner)
- // Network manager will automatically handle all network priorities and scanner coordination
- daemon.netmgr.Start()
- // Start SSH tunnel if enabled
- if cfg.SSHTunnel.Enabled {
- daemon.tunnel.Start()
- }
- // Start BLE scanner if enabled (not managed by network manager)
- if cfg.BLE.Enabled {
- if !daemon.scanners.IsBLERunning() {
- log.Println("Starting BLE scanner...")
- if err := daemon.scanners.StartBLE(cfg.ZMQAddrBLE); err != nil {
- log.Printf("Failed to start BLE scanner: %v", err)
- }
- }
- }
- // Start registration loop (if not registered)
- go daemon.registrationLoop()
- // Start config polling loop
- go daemon.configLoop()
- // Start ZMQ subscribers
- go daemon.subscribeLoop("ble", cfg.ZMQAddrBLE)
- go daemon.subscribeLoop("wifi", cfg.ZMQAddrWiFi)
- // Start batch upload loops
- go daemon.uploadLoop("ble", "/ble", cfg.BLE.BatchIntervalMs)
- go daemon.uploadLoop("wifi", "/wifi", cfg.WiFi.BatchIntervalMs)
- // Start spool flush loop
- go daemon.spoolFlushLoop()
- // Wait for shutdown
- <-daemon.stopChan
- log.Println("Daemon stopped")
- }
- // applyScannerConfig starts/stops scanners based on current config
- func (d *Daemon) applyScannerConfig() {
- d.mu.Lock()
- bleEnabled := d.cfg.BLE.Enabled
- wifiMonitorEnabled := d.cfg.WiFi.MonitorEnabled
- wifiClientEnabled := d.cfg.WiFi.ClientEnabled
- zmqBLE := d.cfg.ZMQAddrBLE
- zmqWiFi := d.cfg.ZMQAddrWiFi
- wifiIface := d.cfg.WiFiIface
- d.mu.Unlock()
- // BLE scanner
- if bleEnabled {
- if !d.scanners.IsBLERunning() {
- log.Println("Starting BLE scanner...")
- if err := d.scanners.StartBLE(zmqBLE); err != nil {
- log.Printf("Failed to start BLE scanner: %v", err)
- }
- }
- } else {
- if d.scanners.IsBLERunning() {
- log.Println("Stopping BLE scanner...")
- d.scanners.StopBLE()
- }
- }
- // WiFi scanner - NEVER run if WiFi client is enabled (they share wlan0)
- // Client mode takes priority over monitor mode
- if wifiClientEnabled {
- if d.scanners.IsWiFiRunning() {
- log.Println("Stopping WiFi scanner (client mode enabled in config)...")
- d.scanners.StopWiFi()
- }
- return
- }
- // WiFi scanner - only if monitor enabled AND client disabled
- // Client has HIGHER priority - if both enabled, client works, scanner doesn't
- if wifiMonitorEnabled {
- if !d.scanners.IsWiFiRunning() {
- log.Println("Starting WiFi scanner...")
- if err := d.scanners.StartWiFi(zmqWiFi, wifiIface); err != nil {
- log.Printf("Failed to start WiFi scanner: %v", err)
- }
- }
- } else {
- if d.scanners.IsWiFiRunning() {
- log.Println("Stopping WiFi scanner (monitor disabled)...")
- d.scanners.StopWiFi()
- }
- }
- }
- // applyWiFiClientState checks if WiFi client state matches config and applies changes
- func (d *Daemon) applyWiFiClientState() {
- d.mu.Lock()
- clientEnabled := d.cfg.WiFi.ClientEnabled
- ssid := d.cfg.WiFi.SSID
- psk := d.cfg.WiFi.PSK
- d.mu.Unlock()
- // Check if wlan0 has an IP (meaning it's connected)
- wlan0IP := getInterfaceIP("wlan0")
- isConnected := wlan0IP != ""
- if clientEnabled && ssid != "" {
- // Should be connected
- if !isConnected {
- // Stop WiFi scanner before connecting client (they can't coexist)
- if d.scanners.IsWiFiRunning() {
- log.Println("Stopping WiFi scanner before connecting client...")
- d.scanners.StopWiFi()
- }
- log.Printf("WiFi client should be connected (SSID=%s), connecting...", ssid)
- if err := applyWiFiSettings(ssid, psk); err != nil {
- log.Printf("Failed to connect WiFi client: %v", err)
- }
- }
- } else {
- // Should be disconnected
- if isConnected {
- log.Println("WiFi client should be disconnected, disconnecting...")
- disconnectWiFiClient()
- }
- }
- }
- func (d *Daemon) registrationLoop() {
- for {
- select {
- case <-d.stopChan:
- return
- default:
- }
- if d.state.DeviceToken != "" {
- time.Sleep(60 * time.Second)
- continue
- }
- log.Println("Attempting device registration...")
- req := &RegistrationRequest{
- DeviceID: d.state.DeviceID,
- }
- // Try to get IPs
- if ip := getInterfaceIP("eth0"); ip != "" {
- req.EthIP = &ip
- }
- if ip := getInterfaceIP("wlan0"); ip != "" {
- req.WlanIP = &ip
- }
- resp, err := d.client.Register(req)
- if err != nil {
- log.Printf("Registration failed: %v", err)
- time.Sleep(10 * time.Second)
- continue
- }
- d.state.DeviceToken = resp.DeviceToken
- d.state.DevicePassword = resp.DevicePassword
- d.client.SetToken(resp.DeviceToken)
- SaveDeviceState(d.statePath, d.state)
- log.Printf("Device registered, token received")
- }
- }
- func (d *Daemon) configLoop() {
- // Initial fetch immediately
- d.fetchAndApplyConfig()
- ticker := time.NewTicker(30 * time.Second)
- defer ticker.Stop()
- for {
- select {
- case <-d.stopChan:
- return
- case <-ticker.C:
- }
- d.fetchAndApplyConfig()
- }
- }
- func (d *Daemon) fetchAndApplyConfig() {
- if d.state.DeviceToken == "" {
- return
- }
- serverCfg, err := d.client.GetConfig(d.state.DeviceID)
- if err != nil {
- // In LAN mode, suppress errors (server might be unreachable)
- if d.cfg.Mode == "lan" {
- // Silent - use local config
- return
- }
- if d.cfg.Debug {
- log.Printf("Config fetch failed: %v", err)
- }
- return
- }
- // Determine effective mode: force_cloud overrides local mode setting
- effectiveMode := d.cfg.Mode
- if effectiveMode == "" {
- effectiveMode = "cloud"
- }
- if serverCfg.ForceCloud {
- if effectiveMode == "lan" {
- log.Println("Server force_cloud enabled - switching to cloud mode")
- }
- effectiveMode = "cloud"
- }
- d.mu.Lock()
- // Track changes for scanner restart
- bleChanged := false
- wifiMonitorChanged := false
- wifiClientChanged := false
- oldClientEnabled := d.cfg.WiFi.ClientEnabled
- if effectiveMode == "cloud" {
- // Cloud mode: server settings have priority
- bleChanged = d.cfg.BLE.Enabled != serverCfg.BLE.Enabled
- wifiMonitorChanged = d.cfg.WiFi.MonitorEnabled != serverCfg.WiFi.MonitorEnabled
- wifiClientChanged = d.cfg.WiFi.ClientEnabled != serverCfg.WiFi.ClientEnabled ||
- d.cfg.WiFi.SSID != serverCfg.WiFi.SSID ||
- d.cfg.WiFi.PSK != serverCfg.WiFi.PSK
- d.cfg.BLE.Enabled = serverCfg.BLE.Enabled
- d.cfg.BLE.BatchIntervalMs = serverCfg.BLE.BatchIntervalMs
- d.cfg.WiFi.MonitorEnabled = serverCfg.WiFi.MonitorEnabled
- d.cfg.WiFi.ClientEnabled = serverCfg.WiFi.ClientEnabled
- d.cfg.WiFi.SSID = serverCfg.WiFi.SSID
- d.cfg.WiFi.PSK = serverCfg.WiFi.PSK
- d.cfg.WiFi.BatchIntervalMs = serverCfg.WiFi.BatchIntervalMs
- d.cfg.Debug = serverCfg.Debug
- // NTP from server in cloud mode
- if len(serverCfg.Net.NTP.Servers) > 0 {
- d.cfg.Network.NTPServers = serverCfg.Net.NTP.Servers
- }
- }
- // LAN mode: local settings have priority, we keep what's in d.cfg
- // Only SSH tunnel comes from server (for remote support)
- // SSH tunnel ALWAYS from server (for remote support access)
- sshChanged := d.cfg.SSHTunnel.Enabled != serverCfg.SSHTunnel.Enabled
- if serverCfg.SSHTunnel.Enabled {
- d.cfg.SSHTunnel.Enabled = true
- d.cfg.SSHTunnel.Server = serverCfg.SSHTunnel.Server
- d.cfg.SSHTunnel.Port = serverCfg.SSHTunnel.Port
- d.cfg.SSHTunnel.User = serverCfg.SSHTunnel.User
- d.cfg.SSHTunnel.RemotePort = serverCfg.SSHTunnel.RemotePort
- d.cfg.SSHTunnel.KeepaliveInterval = serverCfg.SSHTunnel.KeepaliveInterval
- } else {
- d.cfg.SSHTunnel.Enabled = false
- }
- d.mu.Unlock()
- // Apply scanner config changes
- if bleChanged || wifiMonitorChanged {
- log.Printf("Config changed (mode=%s): ble=%v wifi_monitor=%v", effectiveMode, d.cfg.BLE.Enabled, d.cfg.WiFi.MonitorEnabled)
- d.applyScannerConfig()
- }
- // Apply WiFi client changes
- if wifiClientChanged {
- if d.cfg.WiFi.ClientEnabled && d.cfg.WiFi.SSID != "" {
- // Stop WiFi scanner before connecting client
- if d.scanners.IsWiFiRunning() {
- log.Println("Stopping WiFi scanner before connecting client...")
- d.scanners.StopWiFi()
- }
- log.Printf("WiFi client enabled by server: SSID=%s", d.cfg.WiFi.SSID)
- if err := applyWiFiSettings(d.cfg.WiFi.SSID, d.cfg.WiFi.PSK); err != nil {
- log.Printf("Failed to apply WiFi client settings: %v", err)
- }
- // After client connects, update scanner config (will skip WiFi scanner)
- d.applyScannerConfig()
- } else if oldClientEnabled && !d.cfg.WiFi.ClientEnabled {
- log.Println("WiFi client disabled by server")
- disconnectWiFiClient()
- // After client disconnects, WiFi scanner may start (if monitor enabled)
- d.applyScannerConfig()
- }
- }
- // Update tunnel config
- d.tunnel.UpdateConfig(d.cfg)
- if sshChanged {
- if d.cfg.SSHTunnel.Enabled {
- log.Println("SSH tunnel enabled by server")
- d.tunnel.Start()
- } else {
- log.Println("SSH tunnel disabled by server")
- d.tunnel.Stop()
- }
- }
- // Update network manager config (eth0 settings are local-only, never from server)
- d.netmgr.UpdateConfig(d.cfg)
- // Save updated config
- SaveConfig(d.configPath, d.cfg)
- }
- func (d *Daemon) subscribeLoop(name string, addr string) {
- for {
- select {
- case <-d.stopChan:
- return
- default:
- }
- // Only try to connect if the corresponding scanner is running
- scannerRunning := false
- if name == "ble" {
- scannerRunning = d.scanners.IsBLERunning()
- } else if name == "wifi" {
- scannerRunning = d.scanners.IsWiFiRunning()
- }
- if !scannerRunning {
- // Wait before checking again
- time.Sleep(5 * time.Second)
- continue
- }
- if err := d.runSubscriber(name, addr); err != nil {
- log.Printf("[%s] Subscriber error: %v, reconnecting...", name, err)
- time.Sleep(time.Second)
- }
- }
- }
- func (d *Daemon) runSubscriber(name string, addr string) error {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- sub := zmq4.NewSub(ctx)
- defer sub.Close()
- // Subscribe to all topics for this type
- if err := sub.SetOption(zmq4.OptionSubscribe, name+"."); err != nil {
- return err
- }
- if err := sub.Dial(addr); err != nil {
- return err
- }
- log.Printf("[%s] Connected to %s", name, addr)
- // Monitor stop channel in goroutine
- go func() {
- <-d.stopChan
- cancel()
- }()
- for {
- msg, err := sub.Recv()
- if err != nil {
- return err
- }
- // Message is in first frame
- data := string(msg.Frames[0])
- // Parse message: "topic JSON"
- parts := strings.SplitN(data, " ", 2)
- if len(parts) != 2 {
- continue
- }
- var event interface{}
- if err := json.Unmarshal([]byte(parts[1]), &event); err != nil {
- continue
- }
- d.mu.Lock()
- if name == "ble" {
- d.bleEvents = append(d.bleEvents, event)
- if d.cfg.Debug {
- log.Printf("[%s] Received event, queue size: %d", name, len(d.bleEvents))
- }
- // Add to API for WebSocket broadcast
- if d.api != nil {
- d.api.AddBLEEvent(event)
- }
- } else {
- d.wifiEvents = append(d.wifiEvents, event)
- if d.cfg.Debug {
- log.Printf("[%s] Received event, queue size: %d", name, len(d.wifiEvents))
- }
- // Add to API for WebSocket broadcast
- if d.api != nil {
- d.api.AddWiFiEvent(event)
- }
- }
- d.mu.Unlock()
- }
- }
- func (d *Daemon) uploadLoop(name string, endpoint string, intervalMs int) {
- if intervalMs <= 0 {
- intervalMs = 2500
- }
- ticker := time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
- defer ticker.Stop()
- for {
- select {
- case <-d.stopChan:
- return
- case <-ticker.C:
- }
- d.mu.Lock()
- var events []interface{}
- if name == "ble" {
- events = d.bleEvents
- d.bleEvents = nil
- } else {
- events = d.wifiEvents
- d.wifiEvents = nil
- }
- d.mu.Unlock()
- if len(events) == 0 {
- continue
- }
- if d.cfg.Debug {
- log.Printf("[%s] Batch ready: %d events, uploading...", name, len(events))
- }
- batch := &EventBatch{
- DeviceID: d.state.DeviceID,
- Events: events,
- }
- if err := d.client.UploadEvents(endpoint, batch); err != nil {
- log.Printf("[%s] Upload failed: %v, spooling %d events", name, err, len(events))
- if err := d.spooler.Save(batch, name); err != nil {
- log.Printf("[%s] Spool save failed: %v", name, err)
- }
- } else {
- log.Printf("[%s] Uploaded %d events to server", name, len(events))
- }
- }
- }
- func (d *Daemon) spoolFlushLoop() {
- ticker := time.NewTicker(10 * time.Second)
- defer ticker.Stop()
- for {
- select {
- case <-d.stopChan:
- return
- case <-ticker.C:
- }
- if d.state.DeviceToken == "" {
- continue
- }
- // Try to flush one batch
- batch, err := d.spooler.PopOldest()
- if err != nil || batch == nil {
- continue
- }
- // Try to upload (guess endpoint from event types)
- endpoint := "/events"
- eventType := "unknown"
- if len(batch.Events) > 0 {
- if ev, ok := batch.Events[0].(map[string]interface{}); ok {
- if t, ok := ev["type"].(string); ok {
- if strings.HasPrefix(t, "wifi") {
- endpoint = "/wifi"
- eventType = "wifi"
- } else {
- endpoint = "/ble"
- eventType = "ble"
- }
- }
- }
- }
- if err := d.client.UploadEvents(endpoint, batch); err != nil {
- // Re-spool if upload failed
- d.spooler.Save(batch, "retry")
- } else {
- log.Printf("[spool] Flushed %d %s events to server", len(batch.Events), eventType)
- }
- }
- }
- // getDeviceID returns a device ID based on MAC address
- func getDeviceID() string {
- // Try wlan0 first, then eth0
- for _, iface := range []string{"wlan0", "eth0"} {
- if mac := getInterfaceMAC(iface); mac != "" {
- return mac
- }
- }
- // Fallback to hostname
- host, _ := os.Hostname()
- return host
- }
- func getInterfaceMAC(name string) string {
- iface, err := net.InterfaceByName(name)
- if err != nil {
- return ""
- }
- return iface.HardwareAddr.String()
- }
- func getInterfaceIP(name string) string {
- iface, err := net.InterfaceByName(name)
- if err != nil {
- return ""
- }
- addrs, err := iface.Addrs()
- if err != nil {
- return ""
- }
- for _, addr := range addrs {
- if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() != nil {
- // Return IP with CIDR notation (e.g., 192.168.5.244/24)
- ones, _ := ipnet.Mask.Size()
- return fmt.Sprintf("%s/%d", ipnet.IP.String(), ones)
- }
- }
- return ""
- }
|