sensor.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. """PJLink2 projector sensor platform."""
  2. from __future__ import annotations
  3. from collections.abc import Callable
  4. from datetime import timedelta
  5. import logging
  6. from typing import Any
  7. from aiopjlink import PJLink, PJLinkException, PJLinkProjectorError, Power, Sources, Lamp, Information
  8. from homeassistant import config_entries, core
  9. from homeassistant.components.sensor import PLATFORM_SCHEMA
  10. from homeassistant.const import CONF_HOST, CONF_PORT, CONF_NAME, CONF_PASSWORD, CONF_TIMEOUT
  11. import homeassistant.helpers.config_validation as cv
  12. from homeassistant.helpers.entity import Entity
  13. from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType, HomeAssistantType
  14. import voluptuous as vol
  15. from .const import DOMAIN, CONF_ENCODING, DEFAULT_ENCODING, DEFAULT_PORT, DEFAULT_TIMEOUT, ATTR_PRODUCT_NAME, ATTR_MANUFACTURER_NAME, ATTR_PROJECTOR_NAME, ATTR_RESOLUTION_X, ATTR_RESOLUTION_Y, ATTR_LAMP_HOURS, ProjectorState
  16. _LOGGER = logging.getLogger(__name__)
  17. # Time between updating data from projector
  18. SCAN_INTERVAL = timedelta(seconds=3)
  19. PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
  20. {
  21. vol.Required(CONF_HOST): cv.string,
  22. vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
  23. vol.Optional(CONF_NAME): cv.string,
  24. vol.Optional(CONF_ENCODING, default=DEFAULT_ENCODING): cv.string,
  25. vol.Optional(CONF_PASSWORD) : cv.string,
  26. vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT) : cv.positive_float
  27. }
  28. )
  29. async def async_setup_platform(
  30. hass: HomeAssistantType,
  31. config: ConfigType,
  32. async_add_entities: Callable,
  33. discovery_info: DiscoveryInfoType | None = None,
  34. ) -> None:
  35. """Set up the sensor platform."""
  36. host = config.get(CONF_HOST)
  37. port = config.get(CONF_PORT)
  38. password = config.get(CONF_PASSWORD)
  39. timeout = config.get(CONF_TIMEOUT)
  40. name = config.get(CONF_NAME)
  41. pjl = PJLink(host, port, password, timeout)
  42. sensors = [PJLink2Sensor(pjl, name)]
  43. async_add_entities(sensors, update_before_add=True)
  44. class PJLink2Sensor(Entity):
  45. """Representation of a PJLink2 sensor."""
  46. def __init__(self, pjl, name):
  47. super().__init__()
  48. self._projector = pjl
  49. self.attrs: dict[str, Any] = {}
  50. self._name = name
  51. self._state = None
  52. self._available = False
  53. async def async_will_remove_from_hass(self) -> None:
  54. """Close connection."""
  55. await super().async_will_remove_from_hass()
  56. try:
  57. await self._projector.__aexit__(0,0,0)
  58. except PJLinkException as err:
  59. _LOGGER.exception("PJLink2 ERROR for %s: %s", self._name, repr(err))
  60. else:
  61. _LOGGER.info("PJLink2 INFO for %s: Connection closed.", self._name)
  62. @property
  63. def name(self) -> str:
  64. """Return the name of the entity."""
  65. return self._name
  66. @property
  67. def unique_id(self) -> str:
  68. """Return the unique ID of the sensor."""
  69. return self._projector._address
  70. @property
  71. def available(self) -> bool:
  72. """Return True if entity is available."""
  73. return self._available
  74. @property
  75. def state(self) -> str | None:
  76. return self._state
  77. @property
  78. def extra_state_attributes(self) -> dict[str, Any]:
  79. return self.attrs
  80. async def async_update(self) -> None:
  81. """Update all sensors."""
  82. try:
  83. if not self._available:
  84. # connect and init static information
  85. await self._projector.__aenter__()
  86. self._available = True
  87. info = await Information(self._projector).table()
  88. self.attrs[ATTR_PRODUCT_NAME] = info["product_name"]
  89. self.attrs[ATTR_MANUFACTURER_NAME] = info["manufacturer_name"]
  90. self.attrs[ATTR_PROJECTOR_NAME] = info["projector_name"]
  91. if self._name == None: self._name = info["projector_name"]
  92. _LOGGER.info("PJLink2 INFO for %s: Connection opened.", self._name)
  93. pwr = await Power(self._projector).get()
  94. if pwr == Power.State.OFF: self._state = ProjectorState.OFF
  95. elif pwr == Power.State.ON: self._state = ProjectorState.ON
  96. elif pwr == Power.State.COOLING: self._state = ProjectorState.COOLING
  97. elif pwr == Power.State.WARMING: self._state = ProjectorState.WARMING
  98. if pwr==Power.ON:
  99. res = await Sources(self._projector).resolution()
  100. self.attrs[ATTR_RESOLUTION_X] = res[0]
  101. self.attrs[ATTR_RESOLUTION_Y] = res[1]
  102. lmpHrs = await Lamp(self._projector).hours()
  103. self.attrs[ATTR_LAMP_HOURS] = lmpHrs
  104. else:
  105. if ATTR_RESOLUTION_X in self.attrs: del self.attrs[ATTR_RESOLUTION_X]
  106. if ATTR_RESOLUTION_Y in self.attrs: del self.attrs[ATTR_RESOLUTION_Y]
  107. except PJLinkProjectorError:
  108. # resolution cannot be queried due to no input
  109. if ATTR_RESOLUTION_X in self.attrs: del self.attrs[ATTR_RESOLUTION_X]
  110. if ATTR_RESOLUTION_Y in self.attrs: del self.attrs[ATTR_RESOLUTION_Y]
  111. _LOGGER.info("PJLink2 INFO for %s: Cannot get resolution", self._name)
  112. except PJLinkException as err:
  113. self._state = None
  114. self._available = False
  115. _LOGGER.exception("PJLink2 ERROR for %s: %s", self._name, repr(err))
  116. try:
  117. await self._projector.__aexit__(0,0,0)
  118. except PJLinkException as err:
  119. _LOGGER.exception("PJLink2 ERROR for %s: %s", self._name, repr(err))
  120. else:
  121. _LOGGER.info("PJLink2 INFO for %s: Connection closed.", self._name)