Browse Source

Fix BLE scanner performance and memory leaks

- Increase D-Bus signal buffer from 100 to 10000
- Add device cache to avoid blocking D-Bus GetAll calls
- Extract MAC address from D-Bus path for PropertiesChanged events
- Clear ManufacturerData on update to handle different advert types
- Fix slice memory leak in recentBLE/recentWiFi (copy instead of re-slice)
- Add async broadcast to avoid blocking event loop
- Add queue overflow protection (max 10000 events)
- Add -buildvcs=false to Makefile for sudo builds

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
root 1 week ago
parent
commit
7c42a3cba3
4 changed files with 157 additions and 72 deletions
  1. 1 1
      Makefile
  2. 12 4
      cmd/beacon-daemon/api.go
  3. 12 0
      cmd/beacon-daemon/main.go
  4. 132 67
      cmd/ble-scanner/main.go

+ 1 - 1
Makefile

@@ -7,7 +7,7 @@ CMDS := ble-scanner wifi-scanner beacon-daemon
 
 # Build flags
 LDFLAGS := -s -w
-BUILD_FLAGS := -trimpath
+BUILD_FLAGS := -trimpath -buildvcs=false
 
 # Device settings
 DEVICE_IP ?= 192.168.1.100

+ 12 - 4
cmd/beacon-daemon/api.go

@@ -595,11 +595,15 @@ func (s *APIServer) AddBLEEvent(event interface{}) {
 	s.recentMu.Lock()
 	s.recentBLE = append(s.recentBLE, event)
 	if len(s.recentBLE) > 100 {
-		s.recentBLE = s.recentBLE[1:]
+		// Copy to new slice to allow GC of old backing array
+		newSlice := make([]interface{}, 100)
+		copy(newSlice, s.recentBLE[len(s.recentBLE)-100:])
+		s.recentBLE = newSlice
 	}
 	s.recentMu.Unlock()
 
-	s.broadcast(map[string]interface{}{
+	// Broadcast asynchronously to avoid blocking the event loop
+	go s.broadcast(map[string]interface{}{
 		"type":  "ble",
 		"event": event,
 	})
@@ -610,11 +614,15 @@ func (s *APIServer) AddWiFiEvent(event interface{}) {
 	s.recentMu.Lock()
 	s.recentWiFi = append(s.recentWiFi, event)
 	if len(s.recentWiFi) > 100 {
-		s.recentWiFi = s.recentWiFi[1:]
+		// Copy to new slice to allow GC of old backing array
+		newSlice := make([]interface{}, 100)
+		copy(newSlice, s.recentWiFi[len(s.recentWiFi)-100:])
+		s.recentWiFi = newSlice
 	}
 	s.recentMu.Unlock()
 
-	s.broadcast(map[string]interface{}{
+	// Broadcast asynchronously to avoid blocking the event loop
+	go s.broadcast(map[string]interface{}{
 		"type":  "wifi",
 		"event": event,
 	})

+ 12 - 0
cmd/beacon-daemon/main.go

@@ -608,6 +608,12 @@ func (d *Daemon) runSubscriber(name string, addr string) error {
 		d.mu.Lock()
 		if name == "ble" {
 			d.bleEvents = append(d.bleEvents, event)
+			// Prevent memory exhaustion: drop oldest events if queue too large
+			if len(d.bleEvents) > 10000 {
+				dropped := len(d.bleEvents) - 5000
+				d.bleEvents = append([]interface{}{}, d.bleEvents[dropped:]...)
+				log.Printf("[%s] Queue overflow, dropped %d oldest events", name, dropped)
+			}
 			if d.cfg.Debug {
 				log.Printf("[%s] Received event, queue size: %d", name, len(d.bleEvents))
 			}
@@ -617,6 +623,12 @@ func (d *Daemon) runSubscriber(name string, addr string) error {
 			}
 		} else {
 			d.wifiEvents = append(d.wifiEvents, event)
+			// Prevent memory exhaustion: drop oldest events if queue too large
+			if len(d.wifiEvents) > 10000 {
+				dropped := len(d.wifiEvents) - 5000
+				d.wifiEvents = append([]interface{}{}, d.wifiEvents[dropped:]...)
+				log.Printf("[%s] Queue overflow, dropped %d oldest events", name, dropped)
+			}
 			if d.cfg.Debug {
 				log.Printf("[%s] Received event, queue size: %d", name, len(d.wifiEvents))
 			}

+ 132 - 67
cmd/ble-scanner/main.go

@@ -11,6 +11,7 @@ import (
 	"os"
 	"os/signal"
 	"strings"
+	"sync"
 	"syscall"
 	"time"
 
@@ -31,6 +32,87 @@ const (
 	propertiesIface  = "org.freedesktop.DBus.Properties"
 )
 
+// DeviceCache stores device properties to avoid D-Bus calls
+type DeviceCache struct {
+	mu      sync.RWMutex
+	devices map[dbus.ObjectPath]*DeviceProps
+}
+
+type DeviceProps struct {
+	Address          string
+	RSSI             int16
+	ManufacturerData map[uint16][]byte
+	LastSeen         time.Time
+}
+
+func NewDeviceCache() *DeviceCache {
+	return &DeviceCache{
+		devices: make(map[dbus.ObjectPath]*DeviceProps),
+	}
+}
+
+func (c *DeviceCache) Update(path dbus.ObjectPath, props map[string]dbus.Variant) *DeviceProps {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	dev, exists := c.devices[path]
+	if !exists {
+		dev = &DeviceProps{
+			ManufacturerData: make(map[uint16][]byte),
+		}
+		c.devices[path] = dev
+	}
+
+	// Update fields from props
+	if v, ok := props["Address"]; ok {
+		if addr, ok := v.Value().(string); ok {
+			dev.Address = addr
+		}
+	}
+	// Extract address from path if not in props (e.g. /org/bluez/hci0/dev_AA_BB_CC_DD_EE_FF)
+	if dev.Address == "" {
+		pathStr := string(path)
+		if idx := strings.LastIndex(pathStr, "/dev_"); idx != -1 {
+			mac := strings.ReplaceAll(pathStr[idx+5:], "_", ":")
+			dev.Address = mac
+		}
+	}
+	if v, ok := props["RSSI"]; ok {
+		if rssi, ok := v.Value().(int16); ok {
+			dev.RSSI = rssi
+		}
+	}
+	if v, ok := props["ManufacturerData"]; ok {
+		if mfgVariant, ok := v.Value().(map[uint16]dbus.Variant); ok {
+			// Clear old data - device may send different advert types
+			dev.ManufacturerData = make(map[uint16][]byte)
+			for companyID, dataVariant := range mfgVariant {
+				if data, ok := dataVariant.Value().([]byte); ok {
+					dev.ManufacturerData[companyID] = data
+				}
+			}
+		}
+	}
+
+	dev.LastSeen = time.Now()
+	return dev
+}
+
+// Cleanup removes stale devices (not seen for 60 seconds)
+func (c *DeviceCache) Cleanup() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	threshold := time.Now().Add(-60 * time.Second)
+	for path, dev := range c.devices {
+		if dev.LastSeen.Before(threshold) {
+			delete(c.devices, path)
+		}
+	}
+}
+
+var debugMode bool
+
 func main() {
 	var (
 		zmqAddr = flag.String("zmq", defaultZMQAddr, "ZMQ PUB address")
@@ -38,6 +120,7 @@ func main() {
 		debug   = flag.Bool("debug", false, "Enable debug logging")
 	)
 	flag.Parse()
+	debugMode = *debug
 
 	log.SetFlags(log.Ltime)
 	log.Printf("BLE Scanner starting (adapter=%s, zmq=%s)", *adapter, *zmqAddr)
@@ -114,16 +197,33 @@ func main() {
 		os.Exit(0)
 	}()
 
-	// Process D-Bus signals
-	signals := make(chan *dbus.Signal, 100)
+	// Device cache for fast property lookup
+	cache := NewDeviceCache()
+
+	// Periodic cache cleanup
+	go func() {
+		ticker := time.NewTicker(30 * time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				cache.Cleanup()
+			}
+		}
+	}()
+
+	// Process D-Bus signals - large buffer for high-traffic environments
+	signals := make(chan *dbus.Signal, 10000)
 	conn.Signal(signals)
 
 	var eventCount uint64
 
 	for sig := range signals {
-		var ev interface{}
+		var dev *DeviceProps
 
-		if *debug {
+		if debugMode {
 			log.Printf("Signal: %s path=%s", sig.Name, sig.Path)
 		}
 
@@ -132,6 +232,10 @@ func main() {
 			if len(sig.Body) < 2 {
 				continue
 			}
+			path, ok := sig.Body[0].(dbus.ObjectPath)
+			if !ok {
+				continue
+			}
 			ifaces, ok := sig.Body[1].(map[string]map[string]dbus.Variant)
 			if !ok {
 				continue
@@ -140,10 +244,10 @@ func main() {
 			if !ok {
 				continue
 			}
-			if *debug {
-				log.Printf("InterfacesAdded: device props=%v", props)
+			if debugMode {
+				log.Printf("InterfacesAdded: path=%s props=%v", path, props)
 			}
-			ev = parseDeviceProperties(props, *debug)
+			dev = cache.Update(path, props)
 
 		case propertiesIface + ".PropertiesChanged":
 			if len(sig.Body) < 2 {
@@ -157,24 +261,18 @@ func main() {
 			if !ok {
 				continue
 			}
-			// Get full device properties
-			devicePath := sig.Path
-			deviceObj := conn.Object(bluezBus, devicePath)
-			var allProps map[string]dbus.Variant
-			if err := deviceObj.Call(propertiesIface+".GetAll", 0, deviceInterface).Store(&allProps); err != nil {
-				// Use partial props
-				if *debug {
-					log.Printf("PropertiesChanged (partial): %v", props)
-				}
-				ev = parseDeviceProperties(props, *debug)
-			} else {
-				if *debug {
-					log.Printf("PropertiesChanged (full): addr=%v mfg=%v", allProps["Address"], allProps["ManufacturerData"])
-				}
-				ev = parseDeviceProperties(allProps, *debug)
+			if debugMode {
+				log.Printf("PropertiesChanged: path=%s props=%v", sig.Path, props)
 			}
+			// Update cache with partial props - no D-Bus call needed!
+			dev = cache.Update(sig.Path, props)
+		}
+
+		if dev == nil || dev.Address == "" || len(dev.ManufacturerData) == 0 {
+			continue
 		}
 
+		ev := parseDeviceProps(dev)
 		if ev == nil {
 			continue
 		}
@@ -203,75 +301,42 @@ func main() {
 		}
 
 		eventCount++
-		if *debug {
+		if debugMode {
 			log.Printf("[%s] %s", topic, string(jsonData))
-		} else if eventCount%100 == 0 {
+		} else if eventCount%1000 == 0 {
 			log.Printf("[ble-scanner] %d events sent to daemon via ZMQ", eventCount)
 		}
 	}
 }
 
-func parseDeviceProperties(props map[string]dbus.Variant, debug bool) interface{} {
-	var mac string
-	var rssi int16
-	manufacturerData := make(map[uint16][]byte)
-
-	if v, ok := props["Address"]; ok {
-		mac, _ = v.Value().(string)
-	}
-	if v, ok := props["RSSI"]; ok {
-		rssi, _ = v.Value().(int16)
-	}
-	if v, ok := props["ManufacturerData"]; ok {
-		// D-Bus returns map[uint16]dbus.Variant where each Variant contains []byte
-		if mfgVariant, ok := v.Value().(map[uint16]dbus.Variant); ok {
-			for companyID, dataVariant := range mfgVariant {
-				if data, ok := dataVariant.Value().([]byte); ok {
-					manufacturerData[companyID] = data
-				}
-			}
-		}
-	}
-
-	if debug {
-		log.Printf("  parseDeviceProperties: mac=%s rssi=%d mfgData=%v", mac, rssi, manufacturerData)
-	}
-
-	if mac == "" {
-		return nil
-	}
-
-	// Normalize MAC address
-	mac = strings.ToLower(strings.ReplaceAll(mac, "-", ":"))
+func parseDeviceProps(dev *DeviceProps) interface{} {
+	mac := strings.ToLower(strings.ReplaceAll(dev.Address, "-", ":"))
 	ts := time.Now().UnixMilli()
-
-	if len(manufacturerData) == 0 {
-		return nil
-	}
+	rssi := int(dev.RSSI)
 
 	// Check for iBeacon (Apple company ID 0x004C)
-	if data, ok := manufacturerData[0x004C]; ok {
-		if debug {
+	if data, ok := dev.ManufacturerData[0x004C]; ok {
+		if debugMode {
 			log.Printf("  Found Apple mfg data: %x", data)
 		}
-		if ev := parseIBeacon(mac, int(rssi), ts, data); ev != nil {
+		if ev := parseIBeacon(mac, rssi, ts, data); ev != nil {
 			return ev
 		}
 	}
 
 	// Check for Nordic/custom (0x0059) - my-beacon_acc and rt_mybeacon
-	if data, ok := manufacturerData[0x0059]; ok {
-		if debug {
+	if data, ok := dev.ManufacturerData[0x0059]; ok {
+		if debugMode {
 			log.Printf("  Found Nordic mfg data: %x", data)
 		}
 		// Check for acc (0x01 0x15) or relay (0x02 0x15)
 		if len(data) >= 2 {
 			if data[0] == 0x01 && data[1] == 0x15 {
-				if ev := parseAccelBeacon(mac, int(rssi), ts, data); ev != nil {
+				if ev := parseAccelBeacon(mac, rssi, ts, data); ev != nil {
 					return ev
 				}
 			} else if data[0] == 0x02 && data[1] == 0x15 {
-				if ev := parseRelayBeacon(mac, int(rssi), ts, data); ev != nil {
+				if ev := parseRelayBeacon(mac, rssi, ts, data); ev != nil {
 					return ev
 				}
 			}