initial commit

This commit is contained in:
root
2026-06-28 14:27:20 -04:00
commit ae0f1f559e
115 changed files with 30411 additions and 0 deletions
+403
View File
@@ -0,0 +1,403 @@
#!/usr/bin/env python3
"""
Litter Box Camera Analyzer - MQTT-based computer vision service.
Subscribes to base64-encoded JPEG images from the DFRobot DFR1154 ESP32-S3
camera (published via ESPHome to MQTT) and runs analysis to detect:
- Cat presence (motion-based: sustained blob in frame)
- Box visual state (open vs. rotated/closed) via frame comparison
- Litter box stuck closed (prolonged non-open state)
- Cleanliness / needs scooping
Publishes results back via MQTT for Home Assistant to consume.
Dependencies: pip3 install opencv-python numpy paho-mqtt
"""
import argparse
import base64
import logging
import os
import signal
import sys
import time
from collections import deque
from datetime import datetime, timezone
import cv2
import numpy as np
import paho.mqtt.client as mqtt
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
CONFIG = {
"mqtt_broker": os.environ.get("MQTT_BROKER", "localhost"),
"mqtt_port": int(os.environ.get("MQTT_PORT", "1883")),
"mqtt_username": os.environ.get("MQTT_USERNAME", ""),
"mqtt_password": os.environ.get("MQTT_PASSWORD", ""),
"mqtt_topic_image": "litter_box/camera/image",
"mqtt_topic_prefix": "litter_box/status",
"motion_threshold": 15.0, # avg pixel delta for motion (0-255)
"cat_detection_frames": 4, # consecutive motion frames to confirm cat
"cat_absent_frames": 3, # consecutive no-motion to confirm cat left
"box_state_threshold": 30.0, # avg diff from open ref -> closed state
"stuck_closed_frames": 10, # consecutive "closed" frames before stuck alert
"analysis_cooldown": 2.0, # seconds between analyses
"health_check_interval": 300, # seconds between health checks when no motion
"calibration_frames": 10, # frames to average for open reference
"debug_dir": os.environ.get("DEBUG_DIR", ""),
}
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("litter_box")
# ---------------------------------------------------------------------------
# Globals
# ---------------------------------------------------------------------------
shutdown_flag = False
mqtt_client: mqtt.Client | None = None
reference_open: np.ndarray | None = None # averaged "open/normal" frame
motion_streak: int = 0
no_motion_streak: int = 0
cat_present = False
cat_visit_completed = False
box_closed_streak: int = 0 # consecutive frames in "closed" state
box_stuck_notified = False
box_state: str = "unknown" # "open", "closed", "unknown"
cleanliness_score: int = 100
needs_scooping = False
calibration_frames: deque = deque(maxlen=CONFIG["calibration_frames"])
calibrated = False
last_analysis_time: float = 0.0
last_mqtt_publish: dict = {} # avoid re-publishing same values
# ---------------------------------------------------------------------------
# Image decoding
# ---------------------------------------------------------------------------
def decode_image(payload: bytes) -> np.ndarray | None:
try:
raw = base64.b64decode(payload)
arr = np.frombuffer(raw, dtype=np.uint8)
img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
if img is None:
log.warning("cv2.imdecode returned None (%d bytes)", len(raw))
return img
except Exception as exc:
log.error("Failed to decode image: %s", exc)
return None
# ---------------------------------------------------------------------------
# Image comparison helpers
# ---------------------------------------------------------------------------
def _downsample_gray(img: np.ndarray, size=(50, 40)) -> np.ndarray:
small = cv2.resize(img, size, interpolation=cv2.INTER_AREA)
return cv2.cvtColor(small, cv2.COLOR_BGR2GRAY)
def compute_diff_score(frame: np.ndarray, reference: np.ndarray) -> float:
"""Return mean absolute pixel difference (0-255) between frame and reference."""
f = _downsample_gray(frame)
r = _downsample_gray(reference)
return float(np.mean(np.abs(f.astype(float) - r.astype(float))))
def compute_cleanliness(frame: np.ndarray, reference: np.ndarray) -> int:
"""Return cleanliness % (100 = pristine, matches reference; 0 = very different)."""
diff = compute_diff_score(frame, reference)
raw = max(0.0, 100.0 - (diff * 2.5)) # scale: diff 40 -> 0%, diff 0 -> 100%
return int(min(100, raw))
def build_average_frame(frames: list[np.ndarray]) -> np.ndarray:
stacked = np.stack([f.astype(np.float32) for f in frames], axis=0)
return np.mean(stacked, axis=0).astype(np.uint8)
# ---------------------------------------------------------------------------
# Core analysis
# ---------------------------------------------------------------------------
def analyze_frame(frame: np.ndarray) -> dict:
global reference_open, motion_streak, no_motion_streak
global cat_present, cat_visit_completed
global box_closed_streak, box_stuck_notified, box_state
global cleanliness_score, needs_scooping
global calibration_frames, calibrated
results = {
"cat_present": False,
"cat_visit_complete": False,
"motion_detected": False,
"motion_score": 0.0,
"box_state": "unknown",
"litter_box_stuck": False,
"needs_scooping": False,
"cleanliness_score": 100,
"status_message": "OK",
}
# --- Calibration phase: build "open/normal" reference ---
if not calibrated:
calibration_frames.append(frame.copy())
results["status_message"] = f"Calibrating {len(calibration_frames)}/{CONFIG['calibration_frames']}..."
if len(calibration_frames) >= CONFIG["calibration_frames"]:
reference_open = build_average_frame(list(calibration_frames))
calibrated = True
log.info("Calibration complete - open reference frame established")
results["status_message"] = "Calibrated. Monitoring started."
return results
if reference_open is None:
results["status_message"] = "No reference frame - recalibrating..."
calibrated = False
return results
# --- 1. Box visual state detection (open vs. closed) ---
diff_from_open = compute_diff_score(frame, reference_open)
box_is_closed = diff_from_open > CONFIG["box_state_threshold"]
if box_is_closed:
box_closed_streak += 1
results["box_state"] = "closed"
if box_closed_streak >= CONFIG["stuck_closed_frames"] and not box_stuck_notified:
results["litter_box_stuck"] = True
box_stuck_notified = True
results["status_message"] = (
f"STUCK CLOSED - box closed for {box_closed_streak} frames"
)
else:
box_closed_streak = 0
box_stuck_notified = False
results["box_state"] = "open"
box_state = results["box_state"]
results["box_diff_score"] = round(diff_from_open, 1)
# --- 2. Cat presence via motion relative to the open reference ---
motion_score = diff_from_open # reuse - motion is deviation from empty/open state
results["motion_score"] = round(motion_score, 1)
if motion_score > CONFIG["motion_threshold"]:
results["motion_detected"] = True
motion_streak += 1
no_motion_streak = 0
if motion_streak >= CONFIG["cat_detection_frames"]:
results["cat_present"] = True
cat_present = True
else:
no_motion_streak += 1
motion_streak = 0
if cat_present and no_motion_streak >= CONFIG["cat_absent_frames"]:
cat_present = False
results["cat_visit_complete"] = True
results["cat_present"] = cat_present
# --- 3. Cleanliness ---
cleanliness = compute_cleanliness(frame, reference_open)
results["cleanliness_score"] = cleanliness
if cleanliness < 45:
results["needs_scooping"] = True
results["status_message"] = results.get("status_message", "") + " Needs scooping."
cleanliness_score = cleanliness
needs_scooping = results["needs_scooping"]
# --- 4. Debug ---
if CONFIG["debug_dir"]:
_save_debug(frame, results)
return results
# ---------------------------------------------------------------------------
# Debug frame saving
# ---------------------------------------------------------------------------
def _save_debug(frame: np.ndarray, results: dict) -> None:
ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
path = os.path.join(CONFIG["debug_dir"], f"litter_{ts}.jpg")
annotated = frame.copy()
y = 30
for label, color in [
(f"Box: {results.get('box_state', '?')} Diff: {results.get('box_diff_score', 0):.1f}", (255, 255, 0)),
(f"Motion: {results.get('motion_score', 0):.1f} Clean: {results.get('cleanliness_score', 100)}%", (0, 255, 0)),
]:
cv2.putText(annotated, label, (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
y += 20
if results.get("cat_present"):
cv2.putText(annotated, "CAT", (annotated.shape[1] - 100, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)
if results.get("litter_box_stuck"):
cv2.putText(annotated, "STUCK!", (annotated.shape[1] - 140, 60),
cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 3)
cv2.imwrite(path, annotated)
# ---------------------------------------------------------------------------
# MQTT Callbacks
# ---------------------------------------------------------------------------
def on_connect(client, userdata, flags, reason_code, properties=None):
if reason_code == 0:
log.info("Connected to MQTT broker at %s:%d", CONFIG["mqtt_broker"], CONFIG["mqtt_port"])
client.subscribe(CONFIG["mqtt_topic_image"])
log.info("Subscribed to %s", CONFIG["mqtt_topic_image"])
else:
log.error("MQTT connection failed: rc=%d", reason_code)
def on_disconnect(client, userdata, flags, reason_code, properties=None):
log.warning("MQTT disconnected (rc=%d), auto-reconnecting", reason_code)
def on_message(client, userdata, msg):
global last_analysis_time
now = time.time()
if now - last_analysis_time < CONFIG["analysis_cooldown"]:
return
img = decode_image(msg.payload)
if img is None:
return
last_analysis_time = now
results = analyze_frame(img)
_publish_results(results)
def _should_publish(topic: str, value: str) -> bool:
"""Skip re-publishing the same value to reduce MQTT traffic."""
if last_mqtt_publish.get(topic) == value:
return False
last_mqtt_publish[topic] = value
return True
def _publish_results(results: dict) -> None:
prefix = CONFIG["mqtt_topic_prefix"]
publish = mqtt_client.publish
cat_val = "ON" if results.get("cat_present") else "OFF"
motion_val = "ON" if results.get("motion_detected") else "OFF"
stuck_val = "ON" if results.get("litter_box_stuck") else "OFF"
scoop_val = "ON" if results.get("needs_scooping") else "OFF"
motion_score_val = str(results.get("motion_score", 0))
clean_val = str(results.get("cleanliness_score", 100))
box_state_val = results.get("box_state", "unknown")
msg_val = results.get("status_message", "OK")
publish(f"{prefix}/cat_present", cat_val, retain=False)
publish(f"{prefix}/motion_detected", motion_val, retain=False)
publish(f"{prefix}/motion_score", motion_score_val, retain=False)
publish(f"{prefix}/litter_box_stuck", stuck_val, retain=False)
publish(f"{prefix}/needs_scooping", scoop_val, retain=False)
publish(f"{prefix}/cleanliness_score", clean_val, retain=False)
publish(f"{prefix}/box_state", box_state_val, retain=False)
publish(f"{prefix}/status_message", msg_val, retain=False)
if results.get("cat_visit_complete"):
log.info("Cat visit completed!")
if results.get("litter_box_stuck"):
log.warning("LITTER BOX STUCK CLOSED!")
# ---------------------------------------------------------------------------
# Signal handling
# ---------------------------------------------------------------------------
def signal_handler(sig, frame):
global shutdown_flag
log.info("Received signal %d, shutting down...", sig)
shutdown_flag = True
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
global mqtt_client
parser = argparse.ArgumentParser(description="Litter Box Camera Analyzer")
parser.add_argument("--broker", default=CONFIG["mqtt_broker"], help="MQTT broker host")
parser.add_argument("--port", type=int, default=CONFIG["mqtt_port"], help="MQTT broker port")
parser.add_argument("--username", default=CONFIG["mqtt_username"], help="MQTT username")
parser.add_argument("--password", default=CONFIG["mqtt_password"], help="MQTT password")
parser.add_argument("--debug-dir", default=CONFIG["debug_dir"],
help="Save annotated images to this directory")
parser.add_argument("--motion-threshold", type=float, default=CONFIG["motion_threshold"],
help="Avg pixel diff for motion (0-255, default 15)")
parser.add_argument("--box-state-threshold", type=float, default=CONFIG["box_state_threshold"],
help="Avg pixel diff from open ref to flag closed (default 30)")
parser.add_argument("--stuck-frames", type=int, default=CONFIG["stuck_closed_frames"],
help="Consecutive 'closed' frames before stuck alert")
parser.add_argument("--calibration-frames", type=int, default=CONFIG["calibration_frames"],
help="Frames to average for open reference")
args = parser.parse_args()
CONFIG["mqtt_broker"] = args.broker
CONFIG["mqtt_port"] = args.port
CONFIG["mqtt_username"] = args.username
CONFIG["mqtt_password"] = args.password
CONFIG["motion_threshold"] = args.motion_threshold
CONFIG["box_state_threshold"] = args.box_state_threshold
CONFIG["stuck_closed_frames"] = args.stuck_frames
CONFIG["calibration_frames"] = args.calibration_frames
if args.debug_dir:
CONFIG["debug_dir"] = args.debug_dir
os.makedirs(args.debug_dir, exist_ok=True)
calibration_frames.clear()
for _ in range(CONFIG["calibration_frames"]):
calibration_frames.append(None)
calibration_frames.clear()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="litter_box_analyzer")
mqtt_client.on_connect = on_connect
mqtt_client.on_disconnect = on_disconnect
mqtt_client.on_message = on_message
if CONFIG["mqtt_username"]:
mqtt_client.username_pw_set(CONFIG["mqtt_username"], CONFIG["mqtt_password"])
mqtt_client.connect_async(CONFIG["mqtt_broker"], CONFIG["mqtt_port"], keepalive=60)
mqtt_client.loop_start()
log.info("=== Litter Box Analyzer Started ===")
log.info(" MQTT: %s:%d", CONFIG["mqtt_broker"], CONFIG["mqtt_port"])
log.info(" Image topic: %s", CONFIG["mqtt_topic_image"])
log.info(" Status prefix: %s", CONFIG["mqtt_topic_prefix"])
log.info(" Motion threshold: %.1f Box closed threshold: %.1f",
CONFIG["motion_threshold"], CONFIG["box_state_threshold"])
log.info(" Stuck-closed frames: %d Calibration: %d frames",
CONFIG["stuck_closed_frames"], CONFIG["calibration_frames"])
log.info(" Waiting for %d frames to calibrate open reference...",
CONFIG["calibration_frames"])
try:
while not shutdown_flag:
time.sleep(1)
finally:
mqtt_client.loop_stop()
mqtt_client.disconnect()
log.info("Litter Box Analyzer stopped")
if __name__ == "__main__":
main()