Skip to content

Commit

Permalink
Version 4.2.0, revert last changes, keep dataclasses, fix issue with …
Browse files Browse the repository at this point in the history
…accessing the status attribute
  • Loading branch information
autinerd committed Jan 7, 2024
1 parent e472aa8 commit 842a67e
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 26 deletions.
2 changes: 1 addition & 1 deletion openwebif/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Top-level package for openwebif."""

__version__ = "4.1.0"
__version__ = "4.2.0"
118 changes: 93 additions & 25 deletions openwebif/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import logging
import unicodedata
from dataclasses import dataclass
from re import sub
from time import time
from typing import Any, Mapping, cast
from typing import Any, Mapping

import aiohttp
from yarl import URL
Expand Down Expand Up @@ -40,22 +41,54 @@ def enable_logging() -> None:
"""Set up the logging for home assistant."""
logging.basicConfig(level=logging.INFO)

# pylint: disable-next=too-many-instance-attributes, too-many-public-methods

@dataclass
class OpenWebIfServiceEvent:
"""Represent a OpenWebIf service event."""

filename: str | None = None
id: int | None = None
name: str | None = None
serviceref: str | None = None
begin: str | None = None
begin_timestamp: int | None = None
end: str | None = None
end_timestamp: int | None = None
description: str | None = None
fulldescription: str | None = None
station: str | None = None


@dataclass
class OpenWebIfStatus:
"""Repesent a OpenWebIf status."""

currservice: OpenWebIfServiceEvent
volume: int | None = None
muted: bool | None = None
in_standby: bool | None = False
is_recording: bool | None = False
streaming_list: str | None = None
is_streaming: bool | None = False
status_info: dict | None = None
is_recording_playback: bool | None = False


class OpenWebIfDevice:
"""Represent a OpenWebIf client device."""

_session: aiohttp.ClientSession | None
base: URL
status: OpenWebIfStatus
is_offline: bool = False
turn_off_to_deep: bool
picon_url: str | None = None
source_bouquet: str | None = None
mac_address: str | None = None
sources: dict[str, Any] | None = None
source_list: list[str] | None = None
status_info: dict[str, Any] = {}

# pylint: disable-next=too-many-arguments
# pylint: disable=too-many-arguments, disable=too-many-instance-attributes
def __init__(
self,
host: str | aiohttp.ClientSession,
Expand Down Expand Up @@ -92,6 +125,7 @@ def __init__(
self._session = host
self.turn_off_to_deep = turn_off_to_deep
self.source_bouquet = source_bouquet
self.status = OpenWebIfStatus(currservice=OpenWebIfServiceEvent())

async def close(self) -> None:
"""Close the connection."""
Expand All @@ -101,7 +135,7 @@ async def close(self) -> None:

def default_all(self) -> None:
"""Set all properties to default."""
self.status_info = {}
self.status = OpenWebIfStatus(currservice=OpenWebIfServiceEvent())

async def get_about(self) -> dict[str, Any] | None:
"""Get general information."""
Expand All @@ -113,28 +147,64 @@ async def get_status_info(self) -> dict[str, Any] | None:

async def update(self) -> None:
"""Refresh current state based from <host>/api/statusinfo."""
self.status_info = await self._call_api(PATH_STATUSINFO) or {}
self.status.status_info = await self._call_api(PATH_STATUSINFO)

if self.is_offline or self.status_info == {}:
if self.is_offline or not self.status.status_info:
self.default_all()
return

self.status.currservice.filename = self.status.status_info.get(
"currservice_filename"
)
self.status.currservice.id = self.status.status_info.get("currservice_id")
self.status.currservice.name = self.status.status_info.get("currservice_name")
self.status.currservice.serviceref = self.status.status_info.get(
"currservice_serviceref"
)
self.status.currservice.begin = self.status.status_info.get("currservice_begin")
self.status.currservice.begin_timestamp = self.status.status_info.get(
"currservice_begin_timestamp"
)

self.status.currservice.end = self.status.status_info.get("currservice_end")

self.status.currservice.end_timestamp = self.status.status_info.get(
"currservice_end_timestamp"
)

self.status.currservice.description = self.status.status_info.get(
"currservice_description"
)
self.status.currservice.station = self.status.status_info.get(
"currservice_station"
)
self.status.currservice.fulldescription = self.status.status_info.get(
"currservice_fulldescription"
)
self.status.in_standby = self.status.status_info["inStandby"] == "true"
self.status.is_recording = self.status.status_info["isRecording"] == "true"
if "isStreaming" in self.status.status_info:
self.status.is_streaming = self.status.status_info["isStreaming"] == "true"
else:
self.status.is_streaming = None
self.status.muted = self.status.status_info.get("muted")
self.status.volume = self.status.status_info.get("volume")

if not self.sources:
self.sources = await self.get_bouquet_sources(bouquet=self.source_bouquet)
self.source_list = list(self.sources.keys())

if self.get_current_playback_type() == PlaybackType.RECORDING:
# try get correct channel name
channel_name = self.get_channel_name_from_serviceref()
self.status_info["currservice_station"] = channel_name
self.status_info[
"currservice_name"
] = f"🔴 {self.status_info['currservice_name']}"
self.status.status_info["currservice_station"] = channel_name
self.status.currservice.station = channel_name
self.status.currservice.name = f"🔴 {self.status.currservice.name}"

if not self.status_info["inStandby"]:
if not self.status.in_standby:
url = await self.get_current_playing_picon_url(
channel_name=self.status_info["currservice_station"],
currservice_serviceref=self.status_info["currservice_serviceref"],
channel_name=self.status.currservice.station,
currservice_serviceref=self.status.currservice.serviceref,
)
self.picon_url = str(self.base.with_path(url)) if url is not None else None

Expand Down Expand Up @@ -250,11 +320,11 @@ def is_currently_recording_playback(self) -> bool:
def get_current_playback_type(self) -> PlaybackType | None:
"""Get the currservice_serviceref playing media type.
:return: PlaybackType.live or PlaybackType.recording
:return: PlaybackType.LIVE or PlaybackType.RECORDING
"""

if self.status_info.get("currservice_serviceref"):
if self.status_info["currservice_serviceref"].startswith("1:0:0"):
if self.status.currservice and self.status.currservice.serviceref:
if self.status.currservice.serviceref.startswith("1:0:0"):
# This is a recording, not a live channel
return PlaybackType.RECORDING

Expand All @@ -273,11 +343,11 @@ async def get_current_playing_picon_url(
"""

if channel_name is None:
channel_name = self.status_info["currservice_station"]
channel_name = self.status.currservice.station

currservice_serviceref = str(self.status_info["currservice_serviceref"])
currservice_serviceref = str(self.status.currservice.serviceref)

if self.is_currently_recording_playback():
if self.status.is_recording_playback:
channel_name = self.get_channel_name_from_serviceref()

url = f"/picon/{self.get_picon_name(str(channel_name))}.png"
Expand Down Expand Up @@ -306,15 +376,13 @@ async def get_current_playing_picon_url(
def get_channel_name_from_serviceref(self) -> str | None:
"""Try to get the channel name from the recording file name."""
try:
if self.status_info["currservice_serviceref"] is None:
if self.status.currservice.serviceref is None:
return None
return cast(
str, self.status_info["currservice_serviceref"].split("-")[1].strip()
)
return self.status.currservice.serviceref.split("-")[1].strip()
# pylint: disable=broad-except
except Exception:
_LOGGER.debug("cannot determine channel name from recording")
return cast(str, self.status_info["currservice_serviceref"])
return self.status.currservice.serviceref

async def url_exists(self, url: str) -> bool:
"""Check if a given URL responds to a HEAD request.
Expand Down

0 comments on commit 842a67e

Please sign in to comment.