#!/usr/bin/env python3
"""
Litter Box Camera Analyzer - MQTT-based computer vision service.

Subscribes to base64-encoded JPEG images from the DFRobot DFR1154 ESP32-S3 camera
(published via ESPHome to MQTT) and runs analysis to detect:
  - Cat presence (motion-based: sustained blob in frame)
  - Box visual state (open vs. rotated/closed) via frame comparison
  - Litter box stuck closed (prolonged non-open state)
  - Cleanliness / needs scooping

Publishes results back via MQTT for Home Assistant to consume.

Dependencies:
    pip3 install opencv-python numpy paho-mqtt

paho-mqtt >= 2.0 is required (the client is created with
``mqtt.CallbackAPIVersion.VERSION2``).
"""

import argparse
import base64
import logging
import os
import signal
import sys
import time
from collections import deque
from datetime import datetime, timezone

import cv2
import numpy as np
import paho.mqtt.client as mqtt

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
CONFIG = {
    "mqtt_broker": os.environ.get("MQTT_BROKER", "localhost"),
    "mqtt_port": int(os.environ.get("MQTT_PORT", "1883")),
    "mqtt_username": os.environ.get("MQTT_USERNAME", ""),
    "mqtt_password": os.environ.get("MQTT_PASSWORD", ""),
    "mqtt_topic_image": "litter_box/camera/image",
    "mqtt_topic_prefix": "litter_box/status",
    "motion_threshold": 15.0,       # avg pixel delta for motion (0-255)
    "cat_detection_frames": 4,      # consecutive motion frames to confirm cat
    "cat_absent_frames": 3,         # consecutive no-motion to confirm cat left
    "box_state_threshold": 30.0,    # avg diff from open ref -> closed state
    "stuck_closed_frames": 10,      # consecutive "closed" frames before stuck alert
    "analysis_cooldown": 2.0,       # seconds between analyses
    "health_check_interval": 300,   # seconds between health checks when no motion
    "calibration_frames": 10,       # frames to average for open reference
    "debug_dir": os.environ.get("DEBUG_DIR", ""),
}

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("litter_box")

# ---------------------------------------------------------------------------
# Globals
# ---------------------------------------------------------------------------
shutdown_flag = False
mqtt_client: mqtt.Client | None = None

reference_open: np.ndarray | None = None  # averaged "open/normal" frame
motion_streak: int = 0
no_motion_streak: int = 0
cat_present = False
cat_visit_completed = False
box_closed_streak: int = 0  # consecutive frames in "closed" state
box_stuck_notified = False
box_state: str = "unknown"  # "open", "closed", "unknown"
cleanliness_score: int = 100
needs_scooping = False
# Rebuilt in main() once the final calibration frame count is known.
calibration_frames: deque = deque(maxlen=CONFIG["calibration_frames"])
calibrated = False
last_analysis_time: float = 0.0
last_mqtt_publish: dict = {}  # avoid re-publishing same values


# ---------------------------------------------------------------------------
# Image decoding
# ---------------------------------------------------------------------------
def decode_image(payload: bytes) -> np.ndarray | None:
    """Decode a base64-encoded image payload into a BGR image.

    Returns the decoded image, or None when the payload cannot be base64
    decoded or OpenCV cannot decode the resulting bytes. Failures are logged
    and never raised.
    """
    try:
        raw = base64.b64decode(payload)
        arr = np.frombuffer(raw, dtype=np.uint8)
        img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
        if img is None:
            log.warning("cv2.imdecode returned None (%d bytes)", len(raw))
        return img
    except Exception as exc:
        # Deliberately broad: a bad frame must be dropped, not crash the service.
        log.error("Failed to decode image: %s", exc)
        return None


# ---------------------------------------------------------------------------
# Image comparison helpers
# ---------------------------------------------------------------------------
def _downsample_gray(img: np.ndarray, size=(50, 40)) -> np.ndarray:
    """Resize a BGR image to ``size`` (width, height) and convert it to grayscale."""
    small = cv2.resize(img, size, interpolation=cv2.INTER_AREA)
    return cv2.cvtColor(small, cv2.COLOR_BGR2GRAY)


def compute_diff_score(frame: np.ndarray, reference: np.ndarray) -> float:
    """Return mean absolute pixel difference (0-255) between frame and reference.

    Both images are downsampled to the same small grayscale size first, so
    they do not need to share a resolution.
    """
    f = _downsample_gray(frame)
    r = _downsample_gray(reference)
    return float(np.mean(np.abs(f.astype(float) - r.astype(float))))


def compute_cleanliness(frame: np.ndarray, reference: np.ndarray) -> int:
    """Return cleanliness % (100 = pristine, matches reference; 0 = very different)."""
    diff = compute_diff_score(frame, reference)
    raw = max(0.0, 100.0 - (diff * 2.5))  # scale: diff 40 -> 0%, diff 0 -> 100%
    return int(min(100, raw))


def build_average_frame(frames: list[np.ndarray]) -> np.ndarray:
    """Return the per-pixel mean of ``frames`` as a uint8 image.

    All frames must share one shape and the list must be non-empty;
    ``np.stack`` raises ValueError otherwise.
    """
    stacked = np.stack([f.astype(np.float32) for f in frames], axis=0)
    return np.mean(stacked, axis=0).astype(np.uint8)


# ---------------------------------------------------------------------------
# Core analysis
# ---------------------------------------------------------------------------
def analyze_frame(frame: np.ndarray) -> dict:
    """Analyze one camera frame and return a dict of detection results.

    While uncalibrated, frames are only collected; once
    ``CONFIG["calibration_frames"]`` frames are buffered they are averaged
    into the "open" reference. After calibration every frame is compared with
    that reference to derive box state, motion / cat presence and a
    cleanliness score. Module-level streak counters and state flags are
    updated as a side effect.

    ``litter_box_stuck`` is True only on the frame where the closed streak
    first reaches the threshold; it is re-armed when the box is seen open.
    """
    global reference_open, motion_streak, no_motion_streak
    global cat_present
    global box_closed_streak, box_stuck_notified, box_state
    global cleanliness_score, needs_scooping
    global calibrated

    results = {
        "cat_present": False,
        "cat_visit_complete": False,
        "motion_detected": False,
        "motion_score": 0.0,
        "box_state": "unknown",
        "litter_box_stuck": False,
        "needs_scooping": False,
        "cleanliness_score": 100,
        "status_message": "OK",
    }

    # --- Calibration phase: build "open/normal" reference ---
    if not calibrated:
        # Frames of different sizes cannot be averaged together; start over
        # if the camera resolution changes while calibrating.
        if calibration_frames and calibration_frames[0].shape != frame.shape:
            log.warning("Frame size changed during calibration - restarting calibration")
            calibration_frames.clear()
        calibration_frames.append(frame.copy())
        results["status_message"] = (
            f"Calibrating {len(calibration_frames)}/{CONFIG['calibration_frames']}..."
        )
        if len(calibration_frames) >= CONFIG["calibration_frames"]:
            reference_open = build_average_frame(list(calibration_frames))
            calibrated = True
            log.info("Calibration complete - open reference frame established")
            results["status_message"] = "Calibrated. Monitoring started."
        return results

    if reference_open is None:
        results["status_message"] = "No reference frame - recalibrating..."
        calibrated = False
        return results

    # --- 1. Box visual state detection (open vs. closed) ---
    diff_from_open = compute_diff_score(frame, reference_open)
    box_is_closed = diff_from_open > CONFIG["box_state_threshold"]

    if box_is_closed:
        box_closed_streak += 1
        results["box_state"] = "closed"
        if box_closed_streak >= CONFIG["stuck_closed_frames"] and not box_stuck_notified:
            results["litter_box_stuck"] = True
            box_stuck_notified = True
            results["status_message"] = (
                f"STUCK CLOSED - box closed for {box_closed_streak} frames"
            )
    else:
        box_closed_streak = 0
        box_stuck_notified = False
        results["box_state"] = "open"

    box_state = results["box_state"]
    results["box_diff_score"] = round(diff_from_open, 1)

    # --- 2. Cat presence via motion relative to the open reference ---
    motion_score = diff_from_open  # reuse - motion is deviation from empty/open state
    results["motion_score"] = round(motion_score, 1)

    if motion_score > CONFIG["motion_threshold"]:
        results["motion_detected"] = True
        motion_streak += 1
        no_motion_streak = 0
        if motion_streak >= CONFIG["cat_detection_frames"]:
            results["cat_present"] = True
            cat_present = True
    else:
        no_motion_streak += 1
        motion_streak = 0
        if cat_present and no_motion_streak >= CONFIG["cat_absent_frames"]:
            cat_present = False
            results["cat_visit_complete"] = True

    # Report the latched state so a brief lull does not flip the sensor off.
    results["cat_present"] = cat_present

    # --- 3. Cleanliness ---
    cleanliness = compute_cleanliness(frame, reference_open)
    results["cleanliness_score"] = cleanliness
    if cleanliness < 45:
        results["needs_scooping"] = True
        results["status_message"] = results.get("status_message", "") + " Needs scooping."

    cleanliness_score = cleanliness
    needs_scooping = results["needs_scooping"]

    # --- 4. Debug ---
    if CONFIG["debug_dir"]:
        _save_debug(frame, results)

    return results


# ---------------------------------------------------------------------------
# Debug frame saving
# ---------------------------------------------------------------------------
def _save_debug(frame: np.ndarray, results: dict) -> None:
    """Write an annotated copy of ``frame`` into ``CONFIG["debug_dir"]``.

    The file name carries a UTC timestamp with one-second resolution.
    """
    ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    path = os.path.join(CONFIG["debug_dir"], f"litter_{ts}.jpg")

    annotated = frame.copy()
    y = 30
    for label, color in [
        (f"Box: {results.get('box_state', '?')} Diff: {results.get('box_diff_score', 0):.1f}",
         (255, 255, 0)),
        (f"Motion: {results.get('motion_score', 0):.1f} Clean: {results.get('cleanliness_score', 100)}%",
         (0, 255, 0)),
    ]:
        cv2.putText(annotated, label, (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
        y += 20

    if results.get("cat_present"):
        cv2.putText(annotated, "CAT", (annotated.shape[1] - 100, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)
    if results.get("litter_box_stuck"):
        cv2.putText(annotated, "STUCK!", (annotated.shape[1] - 140, 60),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 3)

    cv2.imwrite(path, annotated)


# ---------------------------------------------------------------------------
# MQTT Callbacks
# ---------------------------------------------------------------------------
def on_connect(client, userdata, flags, reason_code, properties=None):
    """Subscribe to the image topic once the broker accepts the connection."""
    if reason_code == 0:
        log.info("Connected to MQTT broker at %s:%d", CONFIG["mqtt_broker"], CONFIG["mqtt_port"])
        client.subscribe(CONFIG["mqtt_topic_image"])
        log.info("Subscribed to %s", CONFIG["mqtt_topic_image"])
    else:
        # With CallbackAPIVersion.VERSION2 reason_code is a ReasonCode object,
        # not an int, so it is formatted with %s rather than %d.
        log.error("MQTT connection failed: rc=%s", reason_code)


def on_disconnect(client, userdata, flags, reason_code, properties=None):
    """Log a disconnect from the broker."""
    # %s for the same reason as in on_connect: reason_code is a ReasonCode object.
    log.warning("MQTT disconnected (rc=%s), auto-reconnecting", reason_code)


def on_message(client, userdata, msg):
    """Decode, analyze and publish results for one incoming image message.

    Messages arriving within ``CONFIG["analysis_cooldown"]`` seconds of the
    last analyzed frame are dropped, as are payloads that fail to decode.
    """
    global last_analysis_time

    now = time.time()
    if now - last_analysis_time < CONFIG["analysis_cooldown"]:
        return

    img = decode_image(msg.payload)
    if img is None:
        return

    last_analysis_time = now
    try:
        results = analyze_frame(img)
        _publish_results(results)
    except Exception:
        # paho re-raises exceptions from callbacks by default, which would
        # end the background network loop; log and keep the service running.
        log.exception("Failed to analyze or publish frame")


def _should_publish(topic: str, value: str) -> bool:
    """Skip re-publishing the same value to reduce MQTT traffic.

    Returns False when ``value`` equals the last value recorded for ``topic``;
    otherwise records it and returns True.
    NOTE: nothing in this file calls this helper; _publish_results publishes
    every value unconditionally.
    """
    if last_mqtt_publish.get(topic) == value:
        return False
    last_mqtt_publish[topic] = value
    return True


def _publish_results(results: dict) -> None:
    """Publish each analysis result to its own topic under the status prefix."""
    prefix = CONFIG["mqtt_topic_prefix"]
    publish = mqtt_client.publish

    cat_val = "ON" if results.get("cat_present") else "OFF"
    motion_val = "ON" if results.get("motion_detected") else "OFF"
    stuck_val = "ON" if results.get("litter_box_stuck") else "OFF"
    scoop_val = "ON" if results.get("needs_scooping") else "OFF"
    motion_score_val = str(results.get("motion_score", 0))
    clean_val = str(results.get("cleanliness_score", 100))
    box_state_val = results.get("box_state", "unknown")
    msg_val = results.get("status_message", "OK")

    publish(f"{prefix}/cat_present", cat_val, retain=False)
    publish(f"{prefix}/motion_detected", motion_val, retain=False)
    publish(f"{prefix}/motion_score", motion_score_val, retain=False)
    publish(f"{prefix}/litter_box_stuck", stuck_val, retain=False)
    publish(f"{prefix}/needs_scooping", scoop_val, retain=False)
    publish(f"{prefix}/cleanliness_score", clean_val, retain=False)
    publish(f"{prefix}/box_state", box_state_val, retain=False)
    publish(f"{prefix}/status_message", msg_val, retain=False)

    if results.get("cat_visit_complete"):
        log.info("Cat visit completed!")
    if results.get("litter_box_stuck"):
        log.warning("LITTER BOX STUCK CLOSED!")


# ---------------------------------------------------------------------------
# Signal handling
# ---------------------------------------------------------------------------
def signal_handler(sig, frame):
    """Request a clean shutdown of the main loop on SIGINT/SIGTERM."""
    global shutdown_flag
    log.info("Received signal %d, shutting down...", sig)
    shutdown_flag = True


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Parse CLI options, start the MQTT client and idle until signalled."""
    global mqtt_client, calibration_frames

    parser = argparse.ArgumentParser(description="Litter Box Camera Analyzer")
    parser.add_argument("--broker", default=CONFIG["mqtt_broker"], help="MQTT broker host")
    parser.add_argument("--port", type=int, default=CONFIG["mqtt_port"], help="MQTT broker port")
    parser.add_argument("--username", default=CONFIG["mqtt_username"], help="MQTT username")
    parser.add_argument("--password", default=CONFIG["mqtt_password"], help="MQTT password")
    parser.add_argument("--debug-dir", default=CONFIG["debug_dir"],
                        help="Save annotated images to this directory")
    parser.add_argument("--motion-threshold", type=float, default=CONFIG["motion_threshold"],
                        help="Avg pixel diff for motion (0-255, default 15)")
    parser.add_argument("--box-state-threshold", type=float, default=CONFIG["box_state_threshold"],
                        help="Avg pixel diff from open ref to flag closed (default 30)")
    parser.add_argument("--stuck-frames", type=int, default=CONFIG["stuck_closed_frames"],
                        help="Consecutive 'closed' frames before stuck alert")
    parser.add_argument("--calibration-frames", type=int, default=CONFIG["calibration_frames"],
                        help="Frames to average for open reference")
    args = parser.parse_args()

    # An empty calibration buffer cannot be averaged into a reference frame.
    if args.calibration_frames < 1:
        parser.error("--calibration-frames must be at least 1")

    CONFIG["mqtt_broker"] = args.broker
    CONFIG["mqtt_port"] = args.port
    CONFIG["mqtt_username"] = args.username
    CONFIG["mqtt_password"] = args.password
    CONFIG["motion_threshold"] = args.motion_threshold
    CONFIG["box_state_threshold"] = args.box_state_threshold
    CONFIG["stuck_closed_frames"] = args.stuck_frames
    CONFIG["calibration_frames"] = args.calibration_frames
    if args.debug_dir:
        CONFIG["debug_dir"] = args.debug_dir
        os.makedirs(args.debug_dir, exist_ok=True)

    # deque.maxlen is fixed at construction, so the buffer created at import
    # time (sized from the default) must be replaced. Otherwise a larger
    # --calibration-frames value could never be reached and calibration
    # would never complete.
    calibration_frames = deque(maxlen=CONFIG["calibration_frames"])

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="litter_box_analyzer")
    mqtt_client.on_connect = on_connect
    mqtt_client.on_disconnect = on_disconnect
    mqtt_client.on_message = on_message

    if CONFIG["mqtt_username"]:
        mqtt_client.username_pw_set(CONFIG["mqtt_username"], CONFIG["mqtt_password"])

    mqtt_client.connect_async(CONFIG["mqtt_broker"], CONFIG["mqtt_port"], keepalive=60)
    mqtt_client.loop_start()

    log.info("=== Litter Box Analyzer Started ===")
    log.info(" MQTT: %s:%d", CONFIG["mqtt_broker"], CONFIG["mqtt_port"])
    log.info(" Image topic: %s", CONFIG["mqtt_topic_image"])
    log.info(" Status prefix: %s", CONFIG["mqtt_topic_prefix"])
    log.info(" Motion threshold: %.1f Box closed threshold: %.1f",
             CONFIG["motion_threshold"], CONFIG["box_state_threshold"])
    log.info(" Stuck-closed frames: %d Calibration: %d frames",
             CONFIG["stuck_closed_frames"], CONFIG["calibration_frames"])
    log.info(" Waiting for %d frames to calibrate open reference...", CONFIG["calibration_frames"])

    try:
        # All work happens in the paho loop thread; this thread only waits
        # for the shutdown flag set by signal_handler.
        while not shutdown_flag:
            time.sleep(1)
    finally:
        mqtt_client.loop_stop()
        mqtt_client.disconnect()
        log.info("Litter Box Analyzer stopped")


if __name__ == "__main__":
    main()