Source code for steamloop.cli

"""Command-line interface for steamloop thermostat control."""

from __future__ import annotations

import argparse
import asyncio
import logging
import sys
from typing import TYPE_CHECKING, Any

import orjson

from .connection import ThermostatConnection, load_pairing, save_pairing
from .const import DEFAULT_PORT, FanMode, HoldType, ZoneMode
from .exceptions import SteamloopConnectionError, SteamloopError

if TYPE_CHECKING:
    from .models import ThermostatState

_LOGGER = logging.getLogger(__name__)

_EHEAT_LABELS = {"": "N/A", "0": "Off", "1": "On"}
_ACTIVE_LABELS = {"": "N/A", "0": "Inactive", "1": "Idle", "2": "Active"}

_HOLD_MAP: dict[str, HoldType] = {
    "manual": HoldType.MANUAL,
    "man": HoldType.MANUAL,
    "schedule": HoldType.SCHEDULE,
    "sched": HoldType.SCHEDULE,
    "hold": HoldType.HOLD,
    "next": HoldType.HOLD,
}
_MODE_MAP: dict[str, ZoneMode] = {
    "off": ZoneMode.OFF,
    "auto": ZoneMode.AUTO,
    "cool": ZoneMode.COOL,
    "heat": ZoneMode.HEAT,
}
_FAN_MAP: dict[str, FanMode] = {
    "auto": FanMode.AUTO,
    "on": FanMode.ALWAYS_ON,
    "always": FanMode.ALWAYS_ON,
    "circulate": FanMode.CIRCULATE,
}


def _print_state(state: ThermostatState) -> None:
    """Display the current thermostat state."""
    fan = state.fan_mode.name.replace("_", " ").title()
    eheat = _EHEAT_LABELS.get(state.emergency_heat, state.emergency_heat)
    cooling = _ACTIVE_LABELS.get(state.cooling_active, state.cooling_active)
    heating = _ACTIVE_LABELS.get(state.heating_active, state.heating_active)
    modes = ", ".join(m.name for m in state.supported_modes)
    print("\n--- Thermostat State ---")
    print(f"  Fan mode: {fan}")
    print(f"  Emergency heat: {eheat}")
    print(f"  Relative humidity: {state.relative_humidity}%")
    print(f"  Cooling: {cooling}")
    print(f"  Heating: {heating}")
    print(f"  Supported modes: {modes}")
    for zid, zone in sorted(state.zones.items()):
        print(f"  Zone {zid} ({zone.name}):")
        print(f"    Temperature: {zone.indoor_temperature}\u00b0F")
        print(f"    Mode: {zone.mode.name}")
        print(f"    Heat setpoint: {zone.heat_setpoint}\u00b0F")
        print(f"    Cool setpoint: {zone.cool_setpoint}\u00b0F")
        print(f"    Deadband: {zone.deadband}")
        print(f"    Hold: {zone.hold_type.name.title()}")
    print("------------------------\n")


def _print_help(active_zone: str) -> None:
    """Display available commands."""
    print("Commands:")
    print("  status                      Show thermostat state")
    print(f"  zone <id>                   Select active zone (current: {active_zone})")
    print("  heat <temp>                 Set heat setpoint")
    print("  cool <temp>                 Set cool setpoint")
    print("  setpoint <heat> <cool>      Set both setpoints")
    print("  hold <manual|schedule|hold> Set hold type")
    print("  mode <off|auto|cool|heat>   Set zone HVAC mode")
    print("  fan <auto|on|circulate>     Set fan mode")
    print("  eheat <on|off>              Set emergency heat")
    print("  raw <json>                  Send raw JSON message")
    print("  ping                        Send heartbeat")
    print("  quit                        Disconnect and exit")


def _cmd_heat(conn: ThermostatConnection, active_zone: str, temp: str) -> None:
    """Handle the heat command."""
    print(f"Setting heat setpoint to {temp} (zone {active_zone})")
    conn.set_temperature_setpoint(active_zone, heat_setpoint=temp)


def _cmd_cool(conn: ThermostatConnection, active_zone: str, temp: str) -> None:
    """Handle the cool command."""
    print(f"Setting cool setpoint to {temp} (zone {active_zone})")
    conn.set_temperature_setpoint(active_zone, cool_setpoint=temp)


def _handle_command(
    conn: ThermostatConnection,
    cmd: str,
    parts: list[str],
    active_zone: str,
) -> str | None:
    """
    Handle a single interactive command.

    Returns the (possibly updated) active_zone, or None to quit.
    """
    if parts[0] in ("quit", "q"):
        return None

    if parts[0] == "status":
        _print_state(conn.state)

    elif parts[0] == "zone" and len(parts) >= 2:
        zid = parts[1]
        if zid in conn.state.zones:
            active_zone = zid
            print(f"Active zone: {zid} ({conn.state.zones[zid].name})")
        else:
            available = ", ".join(sorted(conn.state.zones))
            print(f"Zone {zid} not found. Available: {available}")

    elif parts[0] == "heat" and len(parts) >= 2:
        _cmd_heat(conn, active_zone, parts[1])

    elif parts[0] == "cool" and len(parts) >= 2:
        _cmd_cool(conn, active_zone, parts[1])

    elif parts[0] == "setpoint" and len(parts) >= 3:
        print(f"Setting heat={parts[1]}, cool={parts[2]} (zone {active_zone})")
        conn.set_temperature_setpoint(
            active_zone, heat_setpoint=parts[1], cool_setpoint=parts[2]
        )

    elif parts[0] == "hold" and len(parts) >= 2:
        ht = _HOLD_MAP.get(parts[1])
        if ht is not None:
            print(f"Setting hold type to {parts[1]} (zone {active_zone})")
            conn.set_temperature_setpoint(active_zone, hold_type=ht)
        else:
            print(f"Unknown hold type: {parts[1]}. Try: manual, schedule, hold")

    elif parts[0] == "mode" and len(parts) >= 2:
        mode = _MODE_MAP.get(parts[1])
        if mode is not None:
            print(f"Setting zone mode to {parts[1]} (zone {active_zone})")
            conn.set_zone_mode(active_zone, mode)
        else:
            print(f"Unknown mode: {parts[1]}")

    elif parts[0] == "fan" and len(parts) >= 2:
        fan = _FAN_MAP.get(parts[1])
        if fan is not None:
            print(f"Setting fan mode to {parts[1]}")
            conn.set_fan_mode(fan)
        else:
            print(f"Unknown fan mode: {parts[1]}")

    elif parts[0] == "eheat" and len(parts) >= 2:
        enabled = parts[1] in ("on", "1", "true", "enable")
        print(f"Setting emergency heat {'on' if enabled else 'off'}")
        conn.set_emergency_heat(enabled)

    elif cmd.lower().startswith("raw "):
        raw_json = cmd[4:].strip()
        try:
            msg = orjson.loads(raw_json)
            print(f"Sending: {orjson.dumps(msg).decode()}")
            conn.send(msg)
        except orjson.JSONDecodeError as exc:
            print(f"Invalid JSON: {exc}")

    elif parts[0] == "ping":
        print("Sending heartbeat")
        conn.heartbeat()

    elif parts[0] in ("help", "?"):
        _print_help(active_zone)

    else:
        print("Unknown command. Type 'help' for available commands.")

    return active_zone


async def _do_pair(ip: str, port: int) -> None:
    """Run pairing mode to obtain a secret key from the thermostat."""
    print("\n=== PAIRING MODE ===")
    print("Make sure the thermostat is in pairing mode!")
    print("(On the thermostat: Menu > Settings > Remote Access > Pair New Device)\n")

    conn = ThermostatConnection(ip, port, secret_key="")
    try:
        await conn.connect()
        ssk = await conn.pair()
    except (SteamloopError, OSError) as exc:
        print(f"Pairing failed: {exc}")
        print("\nMake sure:")
        print("  1. The thermostat IP is correct")
        print("  2. The thermostat is in pairing mode")
        print("  3. You are on the same network as the thermostat")
        return
    finally:
        await conn.disconnect()

    secret_key = ssk["secret_key"]
    if secret_key:
        await save_pairing(
            ip,
            {
                "secret_key": secret_key,
                "device_type": "automation",
                "device_id": "module",
            },
        )
        print("Pairing complete! Reconnecting...\n")
    else:
        print("Already paired. Connecting...\n")

    await asyncio.sleep(2)
    await _do_monitor(ip, port)


async def _do_monitor(ip: str, port: int, secret_key: str | None = None) -> None:
    """Run monitoring mode with interactive command loop."""
    if secret_key is not None:
        print("\n=== MONITORING MODE ===")
        print(f"Using provided secret key for {ip}")
        device_type = "automation"
        device_id = "module"
    else:
        pairing = await load_pairing(ip)
        if not pairing:
            print("No pairing found. Run with --pair or --key first.")
            return
        print("\n=== MONITORING MODE ===")
        print(f"Using saved pairing for {ip}")
        secret_key = pairing["secret_key"]
        device_type = pairing.get("device_type", "automation")
        device_id = pairing.get("device_id", "module")

    conn = ThermostatConnection(
        ip,
        port,
        secret_key=secret_key,
        device_type=device_type,
        device_id=device_id,
    )

    def on_event(msg: dict[str, Any]) -> None:
        print(f"[<] {orjson.dumps(msg).decode()}")

    conn.add_event_callback(on_event)

    try:
        async with conn:
            active_zone = "1"
            print("\nListening for thermostat events... (Ctrl+C to quit)")
            _print_help(active_zone)
            print()

            loop = asyncio.get_running_loop()
            while conn.connected:
                try:
                    line = await loop.run_in_executor(None, sys.stdin.readline)
                    cmd = line.strip()
                except EOFError:
                    break

                if not cmd:
                    continue

                parts = cmd.lower().split()
                try:
                    result = _handle_command(conn, cmd, parts, active_zone)
                    if result is None:
                        break
                    active_zone = result
                except SteamloopConnectionError as exc:
                    print(f"Connection error: {exc}")
                    break

    except SteamloopError as exc:
        print(f"Connection failed: {exc}")
    except KeyboardInterrupt:
        pass

    print("\nDisconnected.")


[docs] def main() -> None: """Entry point for the steamloop CLI.""" parser = argparse.ArgumentParser(description="Thermostat Local Control CLI") parser.add_argument("ip", help="Thermostat IP address") parser.add_argument( "--port", type=int, default=DEFAULT_PORT, help=f"Port (default: {DEFAULT_PORT})", ) parser.add_argument("--pair", action="store_true", help="Enter pairing mode") parser.add_argument( "--key", help="Secret key from a previous pairing (skip pairing file lookup)" ) parser.add_argument("--debug", action="store_true", help="Enable debug logging") args = parser.parse_args() level = logging.DEBUG if args.debug else logging.INFO logging.basicConfig( level=level, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S", ) if args.pair: asyncio.run(_do_pair(args.ip, args.port)) else: asyncio.run(_do_monitor(args.ip, args.port, secret_key=args.key))