diff --git a/.gitignore b/.gitignore index 43681bc..3406207 100644 --- a/.gitignore +++ b/.gitignore @@ -103,4 +103,7 @@ venv.bak/ # mypy .mypy_cache/ -.vscode \ No newline at end of file +.vscode +.idea + +token.json diff --git a/README.md b/README.md index d5e1ba4..e4d878b 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ # python-panasonic-comfort-cloud + A python module for reading and changing status of panasonic climate devices through Panasonic Comfort Cloud app api ## Command line usage @@ -17,7 +18,7 @@ positional arguments: set Set status of a device dump Dump raw data of a device history Dump history of a device - + optional arguments: -h, --help show this help message and exit -t TOKEN, --token TOKEN @@ -95,12 +96,11 @@ optional arguments: ## Module usage - ```python import pcomfortcloud -session = pcomfortcloud.Session('user@example.com', 'mypassword') -session.login() +session = pcomfortcloud.ApiClient('user@example.com', 'mypassword') +session.start_session() devices = session.get_devices() @@ -114,8 +114,10 @@ session.set_device(devices[0]['id'], ``` ## PyPi package + can be found at https://pypi.org/project/pcomfortcloud/ ### How to publish package; + - `python .\setup.py sdist bdist_wheel` - `python -m twine upload dist/*` diff --git a/pcomfortcloud.py b/pcomfortcloud.py index 315a437..bb0a03d 100755 --- a/pcomfortcloud.py +++ b/pcomfortcloud.py @@ -1,5 +1,301 @@ #!/usr/bin/env python3 -""" Command line interface for Panasonic Comfort Cloud """ -from pcomfortcloud import __main__ -__main__.main() +import argparse +import os +import json +from enum import Enum + +import pcomfortcloud + + +def print_result(obj, indent=0): + for key in obj: + value = obj[key] + + if isinstance(value, dict): + print(" "*indent + key) + print_result(value, indent + 4) + elif isinstance(value, Enum): + print( + " "*indent + "{0: <{width}}: {1}".format(key, value.name, width=25-indent)) + elif isinstance(value, list): + print(" "*indent + "{0: <{width}}:".format(key, width=25-indent)) + for elt in value: + print_result(elt, indent + 4) + print("") + else: + print(" "*indent + + "{0: <{width}}: {1}".format(key, value, width=25-indent)) + + +def str2bool(boolean_string_value): + if boolean_string_value.lower() in ('yes', 'true', 't', 'y', '1'): + return True + if boolean_string_value.lower() in ('no', 'false', 'f', 'n', '0'): + return False + raise argparse.ArgumentTypeError('Boolean value expected.') + + +def main(): + """ Start pcomfortcloud Comfort Cloud command line """ + + parser = argparse.ArgumentParser( + description='Read or change status of pcomfortcloud Climate devices') + + parser.add_argument( + 'username', + help='Username for pcomfortcloud Comfort Cloud') + + parser.add_argument( + 'password', + help='Password for pcomfortcloud Comfort Cloud') + + parser.add_argument( + '-t', '--token', + help='File to store token in', + default='token.json') + + parser.add_argument( + '-r', '--raw', + help='Raw dump of response', + type=str2bool, nargs='?', const=True, + default=False) + + commandparser = parser.add_subparsers( + help='commands', + dest='command') + + commandparser.add_parser( + 'list', + help="Get a list of all devices") + + get_parser = commandparser.add_parser( + 'get', + help="Get status of a device") + + get_parser.add_argument( + dest='device', + type=int, + help='Device number #') + + set_parser = commandparser.add_parser( + 'set', + help="Set status of a device") + + set_parser.add_argument( + dest='device', + type=int, + help='Device number #' + ) + + set_parser.add_argument( + '-p', '--power', + choices=[ + pcomfortcloud.constants.Power.On.name, + pcomfortcloud.constants.Power.Off.name], + help='Power mode') + + set_parser.add_argument( + '-t', '--temperature', + type=float, + help="Temperature") + + set_parser.add_argument( + '-f', '--fanSpeed', + choices=[ + pcomfortcloud.constants.FanSpeed.Auto.name, + pcomfortcloud.constants.FanSpeed.Low.name, + pcomfortcloud.constants.FanSpeed.LowMid.name, + pcomfortcloud.constants.FanSpeed.Mid.name, + pcomfortcloud.constants.FanSpeed.HighMid.name, + pcomfortcloud.constants.FanSpeed.High.name], + help='Fan speed') + + set_parser.add_argument( + '-m', '--mode', + choices=[ + pcomfortcloud.constants.OperationMode.Auto.name, + pcomfortcloud.constants.OperationMode.Cool.name, + pcomfortcloud.constants.OperationMode.Dry.name, + pcomfortcloud.constants.OperationMode.Heat.name, + pcomfortcloud.constants.OperationMode.Fan.name], + help='Operation mode') + + set_parser.add_argument( + '-e', '--eco', + choices=[ + pcomfortcloud.constants.EcoMode.Auto.name, + pcomfortcloud.constants.EcoMode.Quiet.name, + pcomfortcloud.constants.EcoMode.Powerful.name], + help='Eco mode') + + set_parser.add_argument( + '-n', '--nanoe', + choices=[ + pcomfortcloud.constants.NanoeMode.On.name, + pcomfortcloud.constants.NanoeMode.Off.name, + pcomfortcloud.constants.NanoeMode.ModeG.name, + pcomfortcloud.constants.NanoeMode.All.name], + help='Nanoe mode') + + # set_parser.add_argument( + # '--airswingauto', + # choices=[ + # pcomfortcloud.constants.AirSwingAutoMode.Disabled.name, + # pcomfortcloud.constants.AirSwingAutoMode.AirSwingLR.name, + # pcomfortcloud.constants.AirSwingAutoMode.AirSwingUD.name, + # pcomfortcloud.constants.AirSwingAutoMode.Both.name], + # help='Automation of air swing') + + set_parser.add_argument( + '-y', '--airSwingVertical', + choices=[ + pcomfortcloud.constants.AirSwingUD.Auto.name, + pcomfortcloud.constants.AirSwingUD.Down.name, + pcomfortcloud.constants.AirSwingUD.DownMid.name, + pcomfortcloud.constants.AirSwingUD.Mid.name, + pcomfortcloud.constants.AirSwingUD.UpMid.name, + pcomfortcloud.constants.AirSwingUD.Up.name], + help='Vertical position of the air swing') + + set_parser.add_argument( + '-x', '--airSwingHorizontal', + choices=[ + pcomfortcloud.constants.AirSwingLR.Auto.name, + pcomfortcloud.constants.AirSwingLR.Left.name, + pcomfortcloud.constants.AirSwingLR.LeftMid.name, + pcomfortcloud.constants.AirSwingLR.Mid.name, + pcomfortcloud.constants.AirSwingLR.RightMid.name, + pcomfortcloud.constants.AirSwingLR.Right.name], + help='Horizontal position of the air swing') + + dump_parser = commandparser.add_parser( + 'dump', + help="Dump data of a device") + + dump_parser.add_argument( + dest='device', + type=int, + help='Device number 1-x') + + history_parser = commandparser.add_parser( + 'history', + help="Dump history of a device") + + history_parser.add_argument( + dest='device', + type=int, + help='Device number 1-x') + + history_parser.add_argument( + dest='mode', + type=str, + help='mode (Day, Week, Month, Year)') + + history_parser.add_argument( + dest='date', + type=str, + help='date of day like 20190807') + + args = parser.parse_args() + + if os.path.isfile(args.token): + with open(args.token, "r") as token_file: + json_token = json.load(token_file) + else: + json_token = None + + session = pcomfortcloud.ApiClient( + args.username, + args.password, + json_token, + args.raw) + + session.start_session() + json_token = session.get_token() + + with open(args.token, "w") as token_file: + json.dump(json_token, token_file, indent=4) + + try: + if args.command == 'list': + print("list of devices and its device id (1-x)") + for idx, device in enumerate(session.get_devices()): + if idx > 0: + print('') + + print("device #{}".format(idx + 1)) + print_result(device, 4) + + if args.command == 'get': + if int(args.device) <= 0 or int(args.device) > len(session.get_devices()): + raise Exception("device not found, acceptable device id is from {} to {}".format( + 1, len(session.get_devices()))) + + device = session.get_devices()[int(args.device) - 1] + print("reading from device '{}' ({})".format( + device['name'], device['id'])) + + print_result(session.get_device(device['id'])) + + if args.command == 'set': + if int(args.device) <= 0 or int(args.device) > len(session.get_devices()): + raise Exception("device not found, acceptable device id is from {} to {}".format( + 1, len(session.get_devices()))) + + device = session.get_devices()[int(args.device) - 1] + print("writing to device '{}' ({})".format( + device['name'], device['id'])) + + kwargs = {} + + if args.power is not None: + kwargs['power'] = pcomfortcloud.constants.Power[args.power] + + if args.temperature is not None: + kwargs['temperature'] = args.temperature + + if args.fanSpeed is not None: + kwargs['fanSpeed'] = pcomfortcloud.constants.FanSpeed[args.fanSpeed] + + if args.mode is not None: + kwargs['mode'] = pcomfortcloud.constants.OperationMode[args.mode] + + if args.eco is not None: + kwargs['eco'] = pcomfortcloud.constants.EcoMode[args.eco] + + if args.nanoe is not None: + kwargs['nanoe'] = pcomfortcloud.constants.NanoeMode[args.nanoe] + + if args.airSwingHorizontal is not None: + kwargs['airSwingHorizontal'] = pcomfortcloud.constants.AirSwingLR[args.airSwingHorizontal] + + if args.airSwingVertical is not None: + kwargs['airSwingVertical'] = pcomfortcloud.constants.AirSwingUD[args.airSwingVertical] + + session.set_device(device['id'], **kwargs) + + if args.command == 'dump': + if int(args.device) <= 0 or int(args.device) > len(session.get_devices()): + raise Exception("device not found, acceptable device id is from {} to {}".format( + 1, len(session.get_devices()))) + + device = session.get_devices()[int(args.device) - 1] + + print_result(session.dump(device['id'])) + + if args.command == 'history': + if int(args.device) <= 0 or int(args.device) > len(session.get_devices()): + raise Exception("device not found, acceptable device id is from {} to {}".format( + 1, len(session.get_devices()))) + + device = session.get_devices()[int(args.device) - 1] + + print_result(session.history(device['id'], args.mode, args.date)) + + except pcomfortcloud.ResponseError as ex: + print(ex) + + +if __name__ == "__main__": + main() diff --git a/pcomfortcloud/__init__.py b/pcomfortcloud/__init__.py index d6c134b..7ed735f 100644 --- a/pcomfortcloud/__init__.py +++ b/pcomfortcloud/__init__.py @@ -3,17 +3,22 @@ """ __all__ = [ + 'ApiClient', 'Error', 'LoginError', - 'ResponseError', - 'Session' + 'RequestError', + 'ResponseError' ] -from .session import ( +from .apiclient import ( + ApiClient +) + +from .exceptions import ( Error, LoginError, - ResponseError, - Session + RequestError, + ResponseError ) -from . import constants \ No newline at end of file +from . import constants diff --git a/pcomfortcloud/__main__.py b/pcomfortcloud/__main__.py deleted file mode 100644 index b2a19a6..0000000 --- a/pcomfortcloud/__main__.py +++ /dev/null @@ -1,279 +0,0 @@ -import argparse -import json -import pcomfortcloud - -from enum import Enum - -def print_result(obj, indent = 0): - for key in obj: - value = obj[key] - - if isinstance(value, dict): - print(" "*indent + key) - print_result(value, indent + 4) - elif isinstance(value, Enum): - print(" "*indent + "{0: <{width}}: {1}".format(key, value.name, width=25-indent)) - elif isinstance(value, list): - print(" "*indent + "{0: <{width}}:".format(key, width=25-indent)) - for elt in value: - print_result(elt, indent + 4) - print("") - else: - print(" "*indent + "{0: <{width}}: {1}".format(key, value, width=25-indent)) - -def str2bool(v): - if v.lower() in ('yes', 'true', 't', 'y', '1'): - return True - elif v.lower() in ('no', 'false', 'f', 'n', '0'): - return False - else: - raise argparse.ArgumentTypeError('Boolean value expected.') - -def main(): - """ Start pcomfortcloud Comfort Cloud command line """ - - parser = argparse.ArgumentParser( - description='Read or change status of pcomfortcloud Climate devices') - - parser.add_argument( - 'username', - help='Username for pcomfortcloud Comfort Cloud') - - parser.add_argument( - 'password', - help='Password for pcomfortcloud Comfort Cloud') - - parser.add_argument( - '-t', '--token', - help='File to store token in', - default='~/.pcomfortcloud-token') - - parser.add_argument( - '-s', '--skipVerify', - help='Skip Ssl verification if set as True', - type=str2bool, nargs='?', const=True, - default=False) - - parser.add_argument( - '-r', '--raw', - help='Raw dump of response', - type=str2bool, nargs='?', const=True, - default=False) - - commandparser = parser.add_subparsers( - help='commands', - dest='command') - - commandparser.add_parser( - 'list', - help="Get a list of all devices") - - get_parser = commandparser.add_parser( - 'get', - help="Get status of a device") - - get_parser.add_argument( - dest='device', - type=int, - help='Device number #') - - set_parser = commandparser.add_parser( - 'set', - help="Set status of a device") - - set_parser.add_argument( - dest='device', - type=int, - help='Device number #' - ) - - set_parser.add_argument( - '-p', '--power', - choices=[ - pcomfortcloud.constants.Power.On.name, - pcomfortcloud.constants.Power.Off.name], - help='Power mode') - - set_parser.add_argument( - '-t', '--temperature', - type=float, - help="Temperature") - - set_parser.add_argument( - '-f', '--fanSpeed', - choices=[ - pcomfortcloud.constants.FanSpeed.Auto.name, - pcomfortcloud.constants.FanSpeed.Low.name, - pcomfortcloud.constants.FanSpeed.LowMid.name, - pcomfortcloud.constants.FanSpeed.Mid.name, - pcomfortcloud.constants.FanSpeed.HighMid.name, - pcomfortcloud.constants.FanSpeed.High.name], - help='Fan speed') - - set_parser.add_argument( - '-m', '--mode', - choices=[ - pcomfortcloud.constants.OperationMode.Auto.name, - pcomfortcloud.constants.OperationMode.Cool.name, - pcomfortcloud.constants.OperationMode.Dry.name, - pcomfortcloud.constants.OperationMode.Heat.name, - pcomfortcloud.constants.OperationMode.Fan.name], - help='Operation mode') - - set_parser.add_argument( - '-e', '--eco', - choices=[ - pcomfortcloud.constants.EcoMode.Auto.name, - pcomfortcloud.constants.EcoMode.Quiet.name, - pcomfortcloud.constants.EcoMode.Powerful.name], - help='Eco mode') - - set_parser.add_argument( - '-n', '--nanoe', - choices=[ - pcomfortcloud.constants.NanoeMode.On.name, - pcomfortcloud.constants.NanoeMode.Off.name, - pcomfortcloud.constants.NanoeMode.ModeG.name, - pcomfortcloud.constants.NanoeMode.All.name], - help='Nanoe mode') - - # set_parser.add_argument( - # '--airswingauto', - # choices=[ - # pcomfortcloud.constants.AirSwingAutoMode.Disabled.name, - # pcomfortcloud.constants.AirSwingAutoMode.AirSwingLR.name, - # pcomfortcloud.constants.AirSwingAutoMode.AirSwingUD.name, - # pcomfortcloud.constants.AirSwingAutoMode.Both.name], - # help='Automation of air swing') - - set_parser.add_argument( - '-y', '--airSwingVertical', - choices=[ - pcomfortcloud.constants.AirSwingUD.Auto.name, - pcomfortcloud.constants.AirSwingUD.Down.name, - pcomfortcloud.constants.AirSwingUD.DownMid.name, - pcomfortcloud.constants.AirSwingUD.Mid.name, - pcomfortcloud.constants.AirSwingUD.UpMid.name, - pcomfortcloud.constants.AirSwingUD.Up.name], - help='Vertical position of the air swing') - - set_parser.add_argument( - '-x', '--airSwingHorizontal', - choices=[ - pcomfortcloud.constants.AirSwingLR.Auto.name, - pcomfortcloud.constants.AirSwingLR.Left.name, - pcomfortcloud.constants.AirSwingLR.LeftMid.name, - pcomfortcloud.constants.AirSwingLR.Mid.name, - pcomfortcloud.constants.AirSwingLR.RightMid.name, - pcomfortcloud.constants.AirSwingLR.Right.name], - help='Horizontal position of the air swing') - - dump_parser = commandparser.add_parser( - 'dump', - help="Dump data of a device") - - dump_parser.add_argument( - dest='device', - type=int, - help='Device number 1-x') - - history_parser = commandparser.add_parser( - 'history', - help="Dump history of a device") - - history_parser.add_argument( - dest='device', - type=int, - help='Device number 1-x') - - history_parser.add_argument( - dest='mode', - type=str, - help='mode (Day, Week, Month, Year)') - - history_parser.add_argument( - dest='date', - type=str, - help='date of day like 20190807') - - args = parser.parse_args() - - session = pcomfortcloud.Session(args.username, args.password, args.token, args.raw, args.skipVerify == False) - session.login() - try: - if args.command == 'list': - print("list of devices and its device id (1-x)") - for idx, device in enumerate(session.get_devices()): - if(idx > 0): - print('') - - print("device #{}".format(idx + 1)) - print_result(device, 4) - - if args.command == 'get': - if int(args.device) <= 0 or int(args.device) > len(session.get_devices()): - raise Exception("device not found, acceptable device id is from {} to {}".format(1, len(session.get_devices()))) - - device = session.get_devices()[int(args.device) - 1] - print("reading from device '{}' ({})".format(device['name'], device['id'])) - - print_result( session.get_device(device['id']) ) - - if args.command == 'set': - if int(args.device) <= 0 or int(args.device) > len(session.get_devices()): - raise Exception("device not found, acceptable device id is from {} to {}".format(1, len(session.get_devices()))) - - device = session.get_devices()[int(args.device) - 1] - print("writing to device '{}' ({})".format(device['name'], device['id'])) - - kwargs = {} - - if args.power is not None: - kwargs['power'] = pcomfortcloud.constants.Power[args.power] - - if args.temperature is not None: - kwargs['temperature'] = args.temperature - - if args.fanSpeed is not None: - kwargs['fanSpeed'] = pcomfortcloud.constants.FanSpeed[args.fanSpeed] - - if args.mode is not None: - kwargs['mode'] = pcomfortcloud.constants.OperationMode[args.mode] - - if args.eco is not None: - kwargs['eco'] = pcomfortcloud.constants.EcoMode[args.eco] - - if args.nanoe is not None: - kwargs['nanoe'] = pcomfortcloud.constants.NanoeMode[args.nanoe] - - if args.airSwingHorizontal is not None: - kwargs['airSwingHorizontal'] = pcomfortcloud.constants.AirSwingLR[args.airSwingHorizontal] - - if args.airSwingVertical is not None: - kwargs['airSwingVertical'] = pcomfortcloud.constants.AirSwingUD[args.airSwingVertical] - - session.set_device(device['id'], **kwargs) - - if args.command == 'dump': - if int(args.device) <= 0 or int(args.device) > len(session.get_devices()): - raise Exception("device not found, acceptable device id is from {} to {}".format(1, len(session.get_devices()))) - - device = session.get_devices()[int(args.device) - 1] - - print_result(session.dump(device['id'])) - - if args.command == 'history': - if int(args.device) <= 0 or int(args.device) > len(session.get_devices()): - raise Exception("device not found, acceptable device id is from {} to {}".format(1, len(session.get_devices()))) - - device = session.get_devices()[int(args.device) - 1] - - print_result(session.history(device['id'], args.mode, args.date)) - - except pcomfortcloud.ResponseError as ex: - print(ex.text) - - -# pylint: disable=C0103 -if __name__ == "__main__": - main() diff --git a/pcomfortcloud/apiclient.py b/pcomfortcloud/apiclient.py new file mode 100644 index 0000000..59de76a --- /dev/null +++ b/pcomfortcloud/apiclient.py @@ -0,0 +1,273 @@ +''' +Panasonic session, using Panasonic Comfort Cloud app api +''' + +import hashlib +import re +from urllib.parse import quote_plus + +from . import session +from . import constants + + +class ApiClient(session.PanasonicSession): + def __init__(self, + username, + password, + token=None, + raw=False): + super().__init__(username, password, token, raw) + + self._groups = None + self._devices = None + self._device_indexer = {} + self._raw = raw + self._acc_client_id = None + + def start_session(self): + super().start_session() + self._get_groups() + + def _get_groups(self): + self._groups = self.execute_get( + self._get_group_url(), + "get_groups", + 200 + ) + self._devices = None + + def get_devices(self): + if self._devices is None: + self._devices = [] + + for group in self._groups['groupList']: + if 'deviceList' in group: + device_list = group.get('deviceList', []) + else: + device_list = group.get('deviceIdList', []) + + for device in device_list: + if device: + if 'deviceHashGuid' in device: + device_id = device['deviceHashGuid'] + else: + device_id = hashlib.md5( + device['deviceGuid'].encode('utf-8')).hexdigest() + + self._device_indexer[device_id] = device['deviceGuid'] + self._devices.append({ + 'id': device_id, + 'name': device['deviceName'], + 'group': group['groupName'], + 'model': device['deviceModuleNumber'] if 'deviceModuleNumber' in device else '' + }) + return self._devices + + def dump(self, device_id): + device_guid = self._device_indexer.get(device_id) + if device_guid: + return self.execute_get(self._get_device_status_url(device_guid), "dump", 200) + return None + + def history(self, device_id, mode, date, time_zone="+01:00"): + device_guid = self._device_indexer.get(device_id) + + if device_guid: + try: + data_mode = constants.DataMode[mode].value + except KeyError: + raise Exception("Wrong mode parameter") + + payload = { + "deviceGuid": device_guid, + "dataMode": data_mode, + "date": date, + "osTimezone": time_zone + } + + json_response = self.execute_post( + self._get_device_history_url(), payload, "history", 200) + + return { + 'id': device_id, + 'parameters': self._read_parameters(json_response) + } + return None + + def get_device(self, device_id): + device_guid = self._device_indexer.get(device_id) + + if device_guid: + json_response = self.execute_get( + self._get_device_status_url(device_guid), "get_device", 200) + return { + 'id': device_id, + 'parameters': self._read_parameters(json_response['parameters']) + } + return None + + def set_device(self, device_id, **kwargs): + """ Set parameters of device + + Args: + device_id (str): Id of the device + kwargs : {temperature=float}, {mode=OperationMode}, {fanSpeed=FanSpeed}, {power=Power}, + {airSwingHorizontal=}, {airSwingVertical=}, {eco=EcoMode} + """ + + parameters = {} + air_x = None + air_y = None + + if kwargs is not None: + for key, value in kwargs.items(): + if key == 'power' and isinstance(value, constants.Power): + parameters['operate'] = value.value + + if key == 'temperature': + parameters['temperatureSet'] = value + + if key == 'mode' and isinstance(value, constants.OperationMode): + parameters['operationMode'] = value.value + + if key == 'fanSpeed' and isinstance(value, constants.FanSpeed): + parameters['fanSpeed'] = value.value + + if key == 'airSwingHorizontal' and isinstance(value, constants.AirSwingLR): + air_x = value + + if key == 'airSwingVertical' and isinstance(value, constants.AirSwingUD): + air_y = value + + if key == 'eco' and isinstance(value, constants.EcoMode): + parameters['ecoMode'] = value.value + + if key == 'nanoe' and \ + isinstance(value, constants.NanoeMode) and \ + value != constants.NanoeMode.Unavailable: + parameters['nanoe'] = value.value + + # routine to set the auto mode of fan + # (either horizontal, vertical, both or disabled) + if air_x is not None or air_y is not None: + fan_auto = 0 + device = self.get_device(device_id) + + if device and device['parameters']['airSwingHorizontal'].value == -1: + fan_auto = fan_auto | 1 + + if device and device['parameters']['airSwingVertical'].value == -1: + fan_auto = fan_auto | 2 + + if air_x is not None: + if air_x.value == -1: + fan_auto = fan_auto | 1 + else: + fan_auto = fan_auto & ~1 + parameters['airSwingLR'] = air_x.value + + if air_y is not None: + if air_y.value == -1: + fan_auto = fan_auto | 2 + else: + fan_auto = fan_auto & ~2 + print(air_y.name) + parameters['airSwingUD'] = air_y.value + + if fan_auto == 3: + parameters['fanAutoMode'] = constants.AirSwingAutoMode.Both.value + elif fan_auto == 1: + parameters['fanAutoMode'] = constants.AirSwingAutoMode.AirSwingLR.value + elif fan_auto == 2: + parameters['fanAutoMode'] = constants.AirSwingAutoMode.AirSwingUD.value + else: + parameters['fanAutoMode'] = constants.AirSwingAutoMode.Disabled.value + + device_guid = self._device_indexer.get(device_id) + if device_guid: + payload = { + "deviceGuid": device_guid, + "parameters": parameters + } + _ = self.execute_post( + self._get_device_status_control_url(), payload, "set_device", 200) + return True + return False + + def _read_parameters(self, parameters=dict()): + value = dict() + + _convert = { + 'insideTemperature': 'temperatureInside', + 'outTemperature': 'temperatureOutside', + 'temperatureSet': 'temperature', + 'currencyUnit': 'currencyUnit', + 'energyConsumption': 'energyConsumption', + 'estimatedCost': 'estimatedCost', + 'historyDataList': 'historyDataList', + } + for key in _convert: + if key in parameters: + value[_convert[key]] = parameters[key] + + if 'operate' in parameters: + value['power'] = constants.Power(parameters['operate']) + + if 'operationMode' in parameters: + value['mode'] = constants.OperationMode( + parameters['operationMode']) + + if 'fanSpeed' in parameters: + value['fanSpeed'] = constants.FanSpeed(parameters['fanSpeed']) + + if 'airSwingLR' in parameters: + value['airSwingHorizontal'] = constants.AirSwingLR( + parameters['airSwingLR']) + + if 'airSwingUD' in parameters: + value['airSwingVertical'] = constants.AirSwingUD( + parameters['airSwingUD']) + + if 'ecoMode' in parameters: + value['eco'] = constants.EcoMode(parameters['ecoMode']) + + if 'nanoe' in parameters: + value['nanoe'] = constants.NanoeMode(parameters['nanoe']) + + if 'fanAutoMode' in parameters: + if parameters['fanAutoMode'] == constants.AirSwingAutoMode.Both.value: + value['airSwingHorizontal'] = constants.AirSwingLR.Auto + value['airSwingVertical'] = constants.AirSwingUD.Auto + elif parameters['fanAutoMode'] == constants.AirSwingAutoMode.AirSwingLR.value: + value['airSwingHorizontal'] = constants.AirSwingLR.Auto + elif parameters['fanAutoMode'] == constants.AirSwingAutoMode.AirSwingUD.value: + value['airSwingVertical'] = constants.AirSwingUD.Auto + + return value + + def _get_group_url(self): + return '{base_url}/device/group'.format( + base_url=session.PanasonicSession.BASE_PATH_ACC + ) + + def _get_device_status_url(self, guid): + return '{base_url}/deviceStatus/{guid}'.format( + base_url=session.PanasonicSession.BASE_PATH_ACC, + guid=re.sub(r'(?i)\%2f', 'f', quote_plus(guid)) + ) + + def _get_device_status_now_url(self, guid): + return '{base_url}/deviceStatus/now/{guid}'.format( + base_url=session.PanasonicSession.BASE_PATH_ACC, + guid=re.sub(r'(?i)\%2f', 'f', quote_plus(guid)) + ) + + def _get_device_status_control_url(self): + return '{base_url}/deviceStatus/control'.format( + base_url=session.PanasonicSession.BASE_PATH_ACC + ) + + def _get_device_history_url(self): + return '{base_url}/deviceHistoryData'.format( + base_url=session.PanasonicSession.BASE_PATH_ACC, + ) diff --git a/pcomfortcloud/certificatechain.pem b/pcomfortcloud/certificatechain.pem deleted file mode 100644 index 2b66cc6..0000000 --- a/pcomfortcloud/certificatechain.pem +++ /dev/null @@ -1,50 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDdzCCAl+gAwIBAgIBADANBgkqhkiG9w0BAQsFADBdMQswCQYDVQQGEwJKUDEl -MCMGA1UEChMcU0VDT00gVHJ1c3QgU3lzdGVtcyBDTy4sTFRELjEnMCUGA1UECxMe -U2VjdXJpdHkgQ29tbXVuaWNhdGlvbiBSb290Q0EyMB4XDTA5MDUyOTA1MDAzOVoX -DTI5MDUyOTA1MDAzOVowXTELMAkGA1UEBhMCSlAxJTAjBgNVBAoTHFNFQ09NIFRy -dXN0IFN5c3RlbXMgQ08uLExURC4xJzAlBgNVBAsTHlNlY3VyaXR5IENvbW11bmlj -YXRpb24gUm9vdENBMjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANAV -OVKxUrO6xVmCxF1SrjpDZYBLx/KWvNs2l9amZIyoXvDjChz335c9S672XewhtUGr -zbl+dp+++T42NKA7wfYxEUV0kz1XgMX5iZnK5atq1LXaQZAQwdbWQonCv/Q4EpVM -VAX3NuRFg3sUZdbcDE3R3n4MqzvEFb46VqZab3ZpUql6ucjrappdUtAtCms1FgkQ -hNBqyjoGADdH5H5XTz+L62e4iKrFvlNVspHEfbmwhRkGeC7bYRr6hfVKkaHnFtWO -ojnflLhwHyg/i/xAXmODPIMqGplrz95Zajv8bxbXH/1KEOtOghY6rCcMU/Gt1SSw -awNQwS08Ft1ENCcadfsCAwEAAaNCMEAwHQYDVR0OBBYEFAqFqXdlBZh8QIH4D5cs -OPEK7DzPMA4GA1UdDwEB/wQEAwIBBjAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3 -DQEBCwUAA4IBAQBMOqNErLlFsceTfsgLCkLfZOoc7llsCLqJX2rKSpWeeo8HxdpF -coJxDjrSzG+ntKEju/Ykn8sX/oymzsLS28yN/HH8AynBbF0zX2S2ZTuJbxh2ePXc -okgfGT+Ok+vx+hfuzU7jBBJV1uXk3fs+BXziHV7Gp7yXT2g69ekuCkO2r1dcYmh8 -t/2jioSgrGK+KwmHNPBqAbubKVY8/gA3zyNs8U6qtnRGEmyR7jTV7JqR50S+kDFy -1UkC9gLl9B/rfNmWVan/7Ir5mUf/NVoCqgTLiluHcSmRvaS0eg29mvVXIwAHIRc/ -SjnRBUkLp7Y3gaVdjKozXoEofKd9J+sAro03 ------END CERTIFICATE----- ------BEGIN CERTIFICATE----- -MIIE7jCCA9agAwIBAgIJIrmxYwzstDwuMA0GCSqGSIb3DQEBCwUAMF0xCzAJBgNV -BAYTAkpQMSUwIwYDVQQKExxTRUNPTSBUcnVzdCBTeXN0ZW1zIENPLixMVEQuMScw -JQYDVQQLEx5TZWN1cml0eSBDb21tdW5pY2F0aW9uIFJvb3RDQTIwHhcNMTkwOTI3 -MDE1NDIzWhcNMjkwNTI5MDUwMDM5WjBeMQswCQYDVQQGEwJKUDEjMCEGA1UEChMa -Q3liZXJ0cnVzdCBKYXBhbiBDby4sIEx0ZC4xKjAoBgNVBAMTIUN5YmVydHJ1c3Qg -SmFwYW4gU3VyZVNlcnZlciBDQSBHNDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC -AQoCggEBAMtunFmosT8IxBkVFP+OnkGkcVmbui+hdVBlGZhniniVluAhigm2WUxx -p4X5V3B/QKJLZmeAswmzxGKXaDCzcomYxXTygNXcTLI+IMyRisEO7V1NXFHUjSEl -KaY1LzCA9/emldnmRjX6B9Zt5xXK5q12WOIWkJECEwwKku77tvtKZPRKaCNCGsZ5 -Hja7PBs07jLoE0rMuZLQZNQEB0W63attKGCGzEk50lDj+wQ0UlUbQk3zAEsvdE6X -o1qZy9l783Va40vSx3VqhGYb4jWQrg2CrAtJcKQNSJ0m9yxJVVQDwpQQwGxHO5Em -Qv1LGJExASegOXzhzqCr5yiwECfSrOsCAwEAAaOCAa4wggGqMB0GA1UdDgQWBBRi -p9La3oW2kvGFvPbolZ11oPpOHzAfBgNVHSMEGDAWgBQKhal3ZQWYfECB+A+XLDjx -Cuw8zzASBgNVHRMBAf8ECDAGAQH/AgEAMA4GA1UdDwEB/wQEAwIBBjBJBgNVHR8E -QjBAMD6gPKA6hjhodHRwOi8vcmVwb3NpdG9yeS5zZWNvbXRydXN0Lm5ldC9TQy1S -b290Mi9TQ1Jvb3QyQ1JMLmNybDBSBgNVHSAESzBJMEcGCiqDCIybG2SHBQQwOTA3 -BggrBgEFBQcCARYraHR0cHM6Ly9yZXBvc2l0b3J5LnNlY29tdHJ1c3QubmV0L1ND -LVJvb3QyLzCBhQYIKwYBBQUHAQEEeTB3MDAGCCsGAQUFBzABhiRodHRwOi8vc2Ny -b290Y2EyLm9jc3Auc2Vjb210cnVzdC5uZXQwQwYIKwYBBQUHMAKGN2h0dHA6Ly9y -ZXBvc2l0b3J5LnNlY29tdHJ1c3QubmV0L1NDLVJvb3QyL1NDUm9vdDJjYS5jZXIw -HQYDVR0lBBYwFAYIKwYBBQUHAwEGCCsGAQUFBwMCMA0GCSqGSIb3DQEBCwUAA4IB -AQDFUXkuQAZKY5Pb/JvgufizJ8uZKpN++fiI8kOTgkrSriNYNjO9iEdTwVV3sW3P -abr3lAdCGrJ+wgArLLUnBZ7VGM//UlDhTRe4A19c0TmV1udTSbxabwQJbN1zIGbG -3KQsumg9/70PaIkawnihx9/TxGCrJkAH5W+9pkX7OWefsahylwiPkKRkJRv2em/v -rMRyb3LxRU5t84k1xQqCnMxTEWOAiWoOtatExZ38a8kn8kqiE1NEvtPOzSrVjSvD -7pH56AlvO7hzGzng60kyWnAz6O5rQ0tsFIgW9xloWTQQVcfEtzvjc8ptuP9onkbA -jMRDJaqLXrIVtB7GMek7S6AO ------END CERTIFICATE----- diff --git a/pcomfortcloud/constants.py b/pcomfortcloud/constants.py index 95bfb53..591b282 100644 --- a/pcomfortcloud/constants.py +++ b/pcomfortcloud/constants.py @@ -1,9 +1,11 @@ from enum import Enum + class Power(Enum): Off = 0 On = 1 + class OperationMode(Enum): Auto = 0 Dry = 1 @@ -11,6 +13,7 @@ class OperationMode(Enum): Heat = 3 Fan = 4 + class AirSwingUD(Enum): Auto = -1 Up = 0 @@ -20,6 +23,7 @@ class AirSwingUD(Enum): Down = 1 Swing = 5 + class AirSwingLR(Enum): Auto = -1 Left = 1 @@ -28,17 +32,20 @@ class AirSwingLR(Enum): RightMid = 4 Right = 0 + class EcoMode(Enum): Auto = 0 Powerful = 1 Quiet = 2 + class AirSwingAutoMode(Enum): Disabled = 1 Both = 0 AirSwingLR = 3 AirSwingUD = 2 + class FanSpeed(Enum): Auto = 0 Low = 1 @@ -47,12 +54,14 @@ class FanSpeed(Enum): HighMid = 4 High = 5 -class dataMode(Enum): + +class DataMode(Enum): Day = 0 Week = 1 Month = 2 Year = 4 + class NanoeMode(Enum): Unavailable = 0 Off = 1 diff --git a/pcomfortcloud/exceptions.py b/pcomfortcloud/exceptions.py new file mode 100644 index 0000000..73e5e51 --- /dev/null +++ b/pcomfortcloud/exceptions.py @@ -0,0 +1,14 @@ +class Error(Exception): + pass + + +class LoginError(Error): + pass + + +class RequestError(Error): + pass + + +class ResponseError(Error): + pass diff --git a/pcomfortcloud/session.py b/pcomfortcloud/session.py index 0ba6996..80e9656 100644 --- a/pcomfortcloud/session.py +++ b/pcomfortcloud/session.py @@ -1,456 +1,425 @@ -''' -Panasonic session, using Panasonic Comfort Cloud app api -''' - +import base64 +import datetime +import hashlib import json +import random +import string +import time +import urllib import requests -import os -import urllib3 -import hashlib - -from . import urls -from . import constants -def _validate_response(response): - """ Verify that response is OK """ - if response.status_code == 200: - return - raise ResponseError(response.status_code, response.text) +from bs4 import BeautifulSoup +from . import exceptions -class Error(Exception): - ''' Panasonic session error ''' - pass +def generate_random_string(length): + return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(length)) -class RequestError(Error): - ''' Wrapped requests.exceptions.RequestException ''' - pass +def generate_random_string_hex(length): + return ''.join(random.choice(string.hexdigits) for _ in range(length)) -class LoginError(Error): - ''' Login failed ''' - pass -class ResponseError(Error): - ''' Unexcpected response ''' - def __init__(self, status_code, text): - super(ResponseError, self).__init__( - 'Invalid response' - ', status code: {0} - Data: {1}'.format( - status_code, - text)) - self.status_code = status_code - self.text = text +def check_response(response, function_description, expected_status): + if response.status_code != expected_status: + raise exceptions.ResponseError( + f"({function_description}: Expected status code {expected_status}, received: {response.status_code}: " + + f"{response.text}" + ) -class Session(object): - """ Verisure app session +def get_querystring_parameter_from_header_entry_url(response, header_entry, querystring_parameter): + header_entry_value = response.headers[header_entry] + parsed_url = urllib.parse.urlparse(header_entry_value) + params = urllib.parse.parse_qs(parsed_url.query) + return params.get(querystring_parameter, [None])[0] - Args: - username (str): Username used to login to verisure app - password (str): Password used to login to verisure app - """ +class PanasonicSession(): + APP_CLIENT_ID = "Xmy6xIYIitMxngjB2rHvlm6HSDNnaMJx" + AUTH_0_CLIENT = "eyJuYW1lIjoiQXV0aDAuQW5kcm9pZCIsImVudiI6eyJhbmRyb2lkIjoiMzAifSwidmVyc2lvbiI6IjIuOS4zIn0=" + REDIRECT_URI = "panasonic-iot-cfc://authglb.digital.panasonic.com/android/com.panasonic.ACCsmart/callback" + BASE_PATH_AUTH = "https://authglb.digital.panasonic.com" + BASE_PATH_ACC = "https://accsmart.panasonic.com" + X_APP_VERSION = "1.21.0" + APPBRAIN_URL = "https://www.appbrain.com/app/panasonic-comfort-cloud/com.panasonic.ACCsmart" + # token: + # - access_token + # - refresh_token + # - id_token + # - unix_timestamp_token_received + # - expires_in_sec + # - acc_client_id + # - scope - def __init__(self, username, password, tokenFileName='~/.panasonic-token', raw=False, verifySsl=True): + def __init__(self, username, password, token, raw=False): self._username = username self._password = password - self._tokenFileName = os.path.expanduser(tokenFileName) - self._vid = None - self._groups = None - self._devices = None - self._deviceIndexer = {} + self._token = token self._raw = raw + self._app_version = PanasonicSession.X_APP_VERSION + self._update_app_version() - if verifySsl == False: - urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - self._verifySsl = verifySsl + def _update_app_version(self): + if self._raw: + print("--- auto detecting latest app version") + try: + response = requests.get(PanasonicSession.APPBRAIN_URL) + responseContent = response.content + soup = BeautifulSoup(responseContent, "html.parser") + meta_tag = soup.find("meta", itemprop="softwareVersion") + if meta_tag is not None: + version = meta_tag['content'] + self._app_version = version + if self._raw: + print("--- found version: {}".format(self._app_version)) + return + else: + self._app_version = PanasonicSession.X_APP_VERSION + print("--- Error finding meta_tag") + return + + except Exception: + self._app_version = PanasonicSession.X_APP_VERSION + if self._raw: + print("--- failed to detect app version using version default") + pass + + def _check_token_is_valid(self): + if self._raw: + print("--- Checking token is valid") + if self._token is not None: + now = datetime.datetime.now() + now_unix = time.mktime(now.timetuple()) + + # multiple parts in access_token which are separated by . + part_of_token_b64 = str(self._token["access_token"].split(".")[1]) + # as seen here: https://stackoverflow.com/questions/3302946/how-to-decode-base64-url-in-python + part_of_token = base64.urlsafe_b64decode( + part_of_token_b64 + '=' * (4 - len(part_of_token_b64) % 4)) + token_info_json = json.loads(part_of_token) + + if self._raw: + print(json.dumps(token_info_json, indent=4)) + + expiry_in_token = token_info_json["exp"] + + if (now_unix > expiry_in_token) or \ + (now_unix > self._token["unix_timestamp_token_received"] + self._token["expires_in_sec"]): + return False + return True else: - self._verifySsl = os.path.join(os.path.dirname(__file__), - "certificatechain.pem") - - def __enter__(self): - self.login() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.logout() - - def login(self): - """ Login to verisure app api """ - - if os.path.exists(self._tokenFileName): - with open(self._tokenFileName, 'r') as cookieFile: - self._vid = cookieFile.read().strip() - - if self._raw: print("--- token found") - - try: - self._get_groups() - - except ResponseError: - if self._raw: print("--- token probably expired") - - self._vid = None - self._devices = None - os.remove(self._tokenFileName) - - if self._vid is None: - self._create_token() - with open(self._tokenFileName, 'w') as tokenFile: - tokenFile.write(self._vid) - - self._get_groups() + return False + + def _get_new_token(self): + requests_session = requests.Session() + + # generate initial state and code_challenge + state = generate_random_string(20) + code_verifier = generate_random_string(43) + + code_challenge = base64.urlsafe_b64encode( + hashlib.sha256( + code_verifier.encode('utf-8') + ).digest()).split('='.encode('utf-8'))[0].decode('utf-8') + + # -------------------------------------------------------------------- + # AUTHORIZE + # -------------------------------------------------------------------- + + response = requests_session.get( + f'{PanasonicSession.BASE_PATH_AUTH}/authorize', + headers={ + "user-agent": "okhttp/4.10.0", + }, + params={ + "scope": "openid offline_access comfortcloud.control a2w.control", + "audience": f"https://digital.panasonic.com/{PanasonicSession.APP_CLIENT_ID}/api/v1/", + "protocol": "oauth2", + "response_type": "code", + "code_challenge": code_challenge, + "code_challenge_method": "S256", + "auth0Client": PanasonicSession.AUTH_0_CLIENT, + "client_id": PanasonicSession.APP_CLIENT_ID, + "redirect_uri": PanasonicSession.REDIRECT_URI, + "state": state, + }, + allow_redirects=False) + check_response(response, 'authorize', 302) + + # ------------------------------------------------------------------- + # FOLLOW REDIRECT + # ------------------------------------------------------------------- + + location = response.headers['Location'] + state = get_querystring_parameter_from_header_entry_url( + response, 'Location', 'state') + + if not location.startswith(PanasonicSession.REDIRECT_URI): + response = requests_session.get( + f"{PanasonicSession.BASE_PATH_AUTH}/{location}", + allow_redirects=False) + check_response(response, 'authorize_redirect', 200) + + # get the "_csrf" cookie + csrf = response.cookies['_csrf'] + + # ------------------------------------------------------------------- + # LOGIN + # ------------------------------------------------------------------- + + response = requests_session.post( + f'{PanasonicSession.BASE_PATH_AUTH}/usernamepassword/login', + headers={ + "Auth0-Client": PanasonicSession.AUTH_0_CLIENT, + "user-agent": "okhttp/4.10.0", + }, + json={ + "client_id": PanasonicSession.APP_CLIENT_ID, + "redirect_uri": PanasonicSession.REDIRECT_URI, + "tenant": "pdpauthglb-a1", + "response_type": "code", + "scope": "openid offline_access comfortcloud.control a2w.control", + "audience": f"https://digital.panasonic.com/{PanasonicSession.APP_CLIENT_ID}/api/v1/", + "_csrf": csrf, + "state": state, + "_intstate": "deprecated", + "username": self._username, + "password": self._password, + "lang": "en", + "connection": "PanasonicID-Authentication" + }, + allow_redirects=False) + check_response(response, 'login', 200) + + # ------------------------------------------------------------------- + # CALLBACK + # ------------------------------------------------------------------- + + # get wa, wresult, wctx from body + soup = BeautifulSoup(response.content, "html.parser") + input_lines = soup.find_all("input", {"type": "hidden"}) + parameters = dict() + for input_line in input_lines: + parameters[input_line.get("name")] = input_line.get("value") + + user_agent = "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 " + user_agent += "(KHTML, like Gecko) Chrome/113.0.0.0 Mobile Safari/537.36" + + response = requests_session.post( + url=f"{PanasonicSession.BASE_PATH_AUTH}/login/callback", + data=parameters, + headers={ + "Content-Type": "application/x-www-form-urlencoded", + "User-Agent": user_agent, + }, + allow_redirects=False) + check_response(response, 'login_callback', 302) + + # ------------------------------------------------------------------ + # FOLLOW REDIRECT + # ------------------------------------------------------------------ + + location = response.headers['Location'] + + response = requests_session.get( + f"{PanasonicSession.BASE_PATH_AUTH}/{location}", + allow_redirects=False) + check_response(response, 'login_redirect', 302) + + # ------------------------------------------------------------------ + # GET TOKEN + # ------------------------------------------------------------------ + + code = get_querystring_parameter_from_header_entry_url( + response, 'Location', 'code') + + # do before, so that timestamp is older rather than newer + now = datetime.datetime.now() + unix_time_token_received = time.mktime(now.timetuple()) + + response = requests_session.post( + f'{PanasonicSession.BASE_PATH_AUTH}/oauth/token', + headers={ + "Auth0-Client": PanasonicSession.AUTH_0_CLIENT, + "user-agent": "okhttp/4.10.0", + }, + json={ + "scope": "openid", + "client_id": PanasonicSession.APP_CLIENT_ID, + "grant_type": "authorization_code", + "code": code, + "redirect_uri": PanasonicSession.REDIRECT_URI, + "code_verifier": code_verifier + }, + allow_redirects=False) + check_response(response, 'get_token', 200) + + token_response = json.loads(response.text) + + # ------------------------------------------------------------------ + # RETRIEVE ACC_CLIENT_ID + # ------------------------------------------------------------------ + now = datetime.datetime.now() + timestamp = now.strftime("%Y-%m-%d %H:%M:%S") + response = requests.post( + f'{PanasonicSession.BASE_PATH_ACC}/auth/v2/login', + headers={ + "Content-Type": "application/json;charset=utf-8", + "User-Agent": "G-RAC", + "x-app-name": "Comfort Cloud", + "x-app-timestamp": timestamp, + "x-app-type": "1", + "x-app-version": self._app_version, + "x-cfc-api-key": generate_random_string_hex(128), + "x-user-authorization-v2": "Bearer " + token_response["access_token"] + }, + json={ + "language": 0 + }) + check_response(response, 'get_acc_client_id', 200) + + json_body = json.loads(response.text) + acc_client_id = json_body["clientId"] + + self._token = { + "access_token": token_response["access_token"], + "refresh_token": token_response["refresh_token"], + "id_token": token_response["id_token"], + "unix_timestamp_token_received": unix_time_token_received, + "expires_in_sec": token_response["expires_in"], + "acc_client_id": acc_client_id, + "scope": token_response["scope"] + } - def logout(self): - """ Logout """ + def get_token(self): + return self._token - def _headers(self): - return { - "X-APP-TYPE": "1", - "X-APP-VERSION": "1.20.1", - "X-User-Authorization": self._vid, - "X-APP-TIMESTAMP": "1", - "X-APP-NAME": "Comfort Cloud", - "X-CFC-API-KEY": "Comfort Cloud", - "User-Agent": "G-RAC", - "Accept": "application/json; charset=utf-8", - "Content-Type": "application/json; charset=utf-8" + def start_session(self): + if self._token is not None: + if not self._check_token_is_valid(): + self._refresh_token() + else: + self._get_new_token() + + def stop_session(self): + response = requests.post( + f"{PanasonicSession.BASE_PATH_ACC}/auth/v2/logout", + headers=self._get_header_for_api_calls() + ) + check_response(response, "logout", 200) + if json.loads(response.text)["result"] != 0: + # issue during logout, but do we really care? + pass + + def _refresh_token(self): + # do before, so that timestamp is older rather than newer + now = datetime.datetime.now() + unix_time_token_received = time.mktime(now.timetuple()) + + response = requests.post( + f'{PanasonicSession.BASE_PATH_AUTH}/oauth/token', + headers={ + "Auth0-Client": PanasonicSession.AUTH_0_CLIENT, + "user-agent": "okhttp/4.10.0", + }, + json={ + "scope": self._token["scope"], + "client_id": PanasonicSession.APP_CLIENT_ID, + "refresh_token": self._token["refresh_token"], + "grant_type": "refresh_token" + }, + allow_redirects=False) + check_response(response, 'refresh_token', 200) + token_response = json.loads(response.text) + + self._token = { + "access_token": token_response["access_token"], + "refresh_token": token_response["refresh_token"], + "id_token": token_response["id_token"], + "unix_timestamp_token_received": unix_time_token_received, + "expires_in_sec": token_response["expires_in"], + "acc_client_id": self._token["acc_client_id"], + "scope": token_response["scope"] } - def _create_token(self): - response = None - - payload = { - "language": 0, - "loginId": self._username, - "password": self._password + def _get_header_for_api_calls(self, no_client_id=False): + now = datetime.datetime.now() + timestamp = now.strftime("%Y-%m-%d %H:%M:%S") + return { + "Content-Type": "application/json;charset=utf-8", + "x-app-name": "Comfort Cloud", + "user-agent": "G-RAC", + "x-app-timestamp": timestamp, + "x-app-type": "1", + "x-app-version": self._app_version, + # Seems to work by either setting X-CFC-API-KEY to 0 or to a 128-long hex string + # "X-CFC-API-KEY": "0", + "x-cfc-api-key": generate_random_string_hex(128), + "x-client-id": self._token["acc_client_id"], + "x-user-authorization-v2": "Bearer " + self._token["access_token"] } - if self._raw: print("--- creating token by authenticating") + def _get_user_info(self): + response = requests.get( + f'{PanasonicSession.BASE_PATH_AUTH}/userinfo', + headers={ + "Auth0-Client": self.AUTH_0_CLIENT, + "Authorization": "Bearer " + self._token["access_token"] + }) + check_response(response, 'userinfo', 200) + + def execute_post(self, + url, + json_data, + function_description, + expected_status_code): + self._ensure_valid_token() try: - response = requests.post(urls.login(), json=payload, headers=self._headers(), verify=self._verifySsl) - if 2 != response.status_code // 100: - raise ResponseError(response.status_code, response.text) - + response = requests.post( + url, + json=json_data, + headers=self._get_header_for_api_calls() + ) except requests.exceptions.RequestException as ex: - raise LoginError(ex) - - _validate_response(response) + raise exceptions.RequestError(ex) - if(self._raw is True): - print("--- raw beginning ---") - print(response.text) - print("--- raw ending ---\n") - - self._vid = json.loads(response.text)['uToken'] + self._print_response_if_raw_is_set(response, function_description) + check_response(response, function_description, expected_status_code) + return json.loads(response.text) - def _get_groups(self): - """ Get information about groups """ - response = None + def execute_get(self, url, function_description, expected_status_code): + self._ensure_valid_token() try: - response = requests.get(urls.get_groups(),headers=self._headers(), verify=self._verifySsl) - - if 2 != response.status_code // 100: - raise ResponseError(response.status_code, response.text) - + response = requests.get( + url, + headers=self._get_header_for_api_calls() + ) except requests.exceptions.RequestException as ex: - raise RequestError(ex) - - _validate_response(response) - - if(self._raw is True): - print("--- _get_groups()") - print("--- raw beginning ---") + raise exceptions.RequestError(ex) + + self._print_response_if_raw_is_set(response, function_description) + check_response(response, function_description, expected_status_code) + return json.loads(response.text) + + def _print_response_if_raw_is_set(self, response, function_description): + if self._raw: + print("=" * 79) + print(f"Response: {function_description}") + print("=" * 79) + print(f"Status: {response.status_code}") + print("-" * 79) + print("Headers:") + for header in response.headers: + print(f'{header}: {response.headers[header]}') + print("-" * 79) + print("Response body:") print(response.text) - print("--- raw ending ---\n") - - self._groups = json.loads(response.text) - self._devices = None - - def get_devices(self, group=None): - if self._vid is None: - self.login() - - if self._devices is None: - self._devices = [] - - for group in self._groups['groupList']: - if 'deviceList' in group: - list = group.get('deviceList', []) - else: - list = group.get('deviceIdList', []) - - for device in list: - if device: - id = None - if 'deviceHashGuid' in device: - id = device['deviceHashGuid'] - else: - id = hashlib.md5(device['deviceGuid'].encode('utf-8')).hexdigest() - - self._deviceIndexer[id] = device['deviceGuid'] - self._devices.append({ - 'id': id, - 'name': device['deviceName'], - 'group': group['groupName'], - 'model': device['deviceModuleNumber'] if 'deviceModuleNumber' in device else '' - }) - - return self._devices - - def dump(self, id): - deviceGuid = self._deviceIndexer.get(id) - - if(deviceGuid): - response = None - - try: - response = requests.get(urls.status(deviceGuid), headers=self._headers(), verify=self._verifySsl) - - if 2 != response.status_code // 100: - raise ResponseError(response.status_code, response.text) - - except requests.exceptions.RequestException as ex: - raise RequestError(ex) - - _validate_response(response) - return json.loads(response.text) - - return None - - def history(self, id, mode, date, tz="+01:00"): - deviceGuid = self._deviceIndexer.get(id) - - if(deviceGuid): - response = None - - try: - dataMode = constants.dataMode[mode].value - except KeyError: - raise Exception("Wrong mode parameter") - - payload = { - "deviceGuid": deviceGuid, - "dataMode": dataMode, - "date": date, - "osTimezone": tz - } - - try: - response = requests.post(urls.history(), json=payload, headers=self._headers(), verify=self._verifySsl) - - if 2 != response.status_code // 100: - raise ResponseError(response.status_code, response.text) - - except requests.exceptions.RequestException as ex: - raise RequestError(ex) - - _validate_response(response) - - if(self._raw is True): - print("--- history()") - print("--- raw beginning ---") - print(response.text) - print("--- raw ending ---") - - _json = json.loads(response.text) - return { - 'id': id, - 'parameters': self._read_parameters(_json) - } - - return None - - def get_device(self, id): - deviceGuid = self._deviceIndexer.get(id) - - if(deviceGuid): - response = None - - try: - response = requests.get(urls.status(deviceGuid), headers=self._headers(), verify=self._verifySsl) - - if 2 != response.status_code // 100: - raise ResponseError(response.status_code, response.text) - - except requests.exceptions.RequestException as ex: - raise RequestError(ex) - - _validate_response(response) - - if(self._raw is True): - print("--- get_device()") - print("--- raw beginning ---") - print(response.text) - print("--- raw ending ---") - - - _json = json.loads(response.text) - return { - 'id': id, - 'parameters': self._read_parameters(_json['parameters']) - } - - return None - - def set_device(self, id, **kwargs): - """ Set parameters of device - - Args: - id (str): Id of the device - kwargs : {temperature=float}, {mode=OperationMode}, {fanSpeed=FanSpeed}, {power=Power}, {airSwingHorizontal=}, {airSwingVertical=}, {eco=EcoMode} - """ - - parameters = {} - airX = None - airY = None - - if kwargs is not None: - for key, value in kwargs.items(): - if key == 'power' and isinstance(value, constants.Power): - parameters['operate'] = value.value - - if key == 'temperature': - parameters['temperatureSet'] = value - - if key == 'mode' and isinstance(value, constants.OperationMode): - parameters['operationMode'] = value.value - - if key == 'fanSpeed' and isinstance(value, constants.FanSpeed): - parameters['fanSpeed'] = value.value - - if key == 'airSwingHorizontal' and isinstance(value, constants.AirSwingLR): - airX = value - - if key == 'airSwingVertical' and isinstance(value, constants.AirSwingUD): - airY = value - - if key == 'eco' and isinstance(value, constants.EcoMode): - parameters['ecoMode'] = value.value - - if key == 'nanoe' and isinstance(value, constants.NanoeMode) and value != constants.NanoeMode.Unavailable: - parameters['nanoe'] = value.value - - - # routine to set the auto mode of fan (either horizontal, vertical, both or disabled) - if airX is not None or airY is not None: - fanAuto = 0 - device = self.get_device(id) - - if device and device['parameters']['airSwingHorizontal'].value == -1: - fanAuto = fanAuto | 1 - - if device and device['parameters']['airSwingVertical'].value == -1: - fanAuto = fanAuto | 2 - - if airX is not None: - if airX.value == -1: - fanAuto = fanAuto | 1 - else: - fanAuto = fanAuto & ~1 - parameters['airSwingLR'] = airX.value - - if airY is not None: - if airY.value == -1: - fanAuto = fanAuto | 2 - else: - fanAuto = fanAuto & ~2 - print(airY.name) - parameters['airSwingUD'] = airY.value - - if fanAuto == 3: - parameters['fanAutoMode'] = constants.AirSwingAutoMode.Both.value - elif fanAuto == 1: - parameters['fanAutoMode'] = constants.AirSwingAutoMode.AirSwingLR.value - elif fanAuto == 2: - parameters['fanAutoMode'] = constants.AirSwingAutoMode.AirSwingUD.value - else: - parameters['fanAutoMode'] = constants.AirSwingAutoMode.Disabled.value - - deviceGuid = self._deviceIndexer.get(id) - if(deviceGuid): - response = None - - payload = { - "deviceGuid": deviceGuid, - "parameters": parameters - } - - if(self._raw is True): - print("--- set_device()") - print("--- raw out beginning ---") - print(payload) - print("--- raw out ending ---") - - try: - response = requests.post(urls.control(), json=payload, headers=self._headers(), verify=self._verifySsl) - - if 2 != response.status_code // 100: - raise ResponseError(response.status_code, response.text) - - except requests.exceptions.RequestException as ex: - raise RequestError(ex) - - _validate_response(response) - - if(self._raw is True): - print("--- raw in beginning ---") - print(response.text) - print("--- raw in ending ---\n") - - _json = json.loads(response.text) - - return True - - return False - - def _read_parameters(self, parameters = {}): - value = {} - - _convert = { - 'insideTemperature': 'temperatureInside', - 'outTemperature': 'temperatureOutside', - 'temperatureSet': 'temperature', - 'currencyUnit': 'currencyUnit', - 'energyConsumption': 'energyConsumption', - 'estimatedCost': 'estimatedCost', - 'historyDataList': 'historyDataList', - } - for key in _convert: - if key in parameters: - value[_convert[key]] = parameters[key] - - if 'operate' in parameters: - value['power'] = constants.Power(parameters['operate']) - - if 'operationMode' in parameters: - value['mode'] = constants.OperationMode(parameters['operationMode']) - - if 'fanSpeed' in parameters: - value['fanSpeed'] = constants.FanSpeed(parameters['fanSpeed']) - - if 'airSwingLR' in parameters: - value['airSwingHorizontal'] = constants.AirSwingLR(parameters['airSwingLR']) - - if 'airSwingUD' in parameters: - value['airSwingVertical'] = constants.AirSwingUD(parameters['airSwingUD']) - - if 'ecoMode' in parameters: - value['eco'] = constants.EcoMode(parameters['ecoMode']) - - if 'nanoe' in parameters: - value['nanoe'] = constants.NanoeMode(parameters['nanoe']) - - if 'fanAutoMode' in parameters: - if parameters['fanAutoMode'] == constants.AirSwingAutoMode.Both.value: - value['airSwingHorizontal'] = constants.AirSwingLR.Auto - value['airSwingVertical'] = constants.AirSwingUD.Auto - elif parameters['fanAutoMode'] == constants.AirSwingAutoMode.AirSwingLR.value: - value['airSwingHorizontal'] = constants.AirSwingLR.Auto - elif parameters['fanAutoMode'] == constants.AirSwingAutoMode.AirSwingUD.value: - value['airSwingVertical'] = constants.AirSwingUD.Auto + print("-" * 79) - return value + def _ensure_valid_token(self): + if self._check_token_is_valid(): + return + self._refresh_token() diff --git a/pcomfortcloud/urls.py b/pcomfortcloud/urls.py deleted file mode 100644 index 44a69ec..0000000 --- a/pcomfortcloud/urls.py +++ /dev/null @@ -1,40 +0,0 @@ -import re - -try: - # Python 3 - from urllib.parse import quote_plus -except ImportError: - # Python 2 - from urllib import quote_plus - -BASE_URL = 'https://accsmart.panasonic.com' - -def login(): - return '{base_url}/auth/login'.format( - base_url=BASE_URL) - -def get_groups(): - return '{base_url}/device/group'.format( - base_url=BASE_URL) - -def status(guid): - return '{base_url}/deviceStatus/{guid}'.format( - base_url=BASE_URL, - guid=re.sub('(?i)\%2f', 'f', quote_plus(guid)) - ) - -def statusCache(guid): - return '{base_url}/deviceStatus/now/{guid}'.format( - base_url=BASE_URL, - guid=re.sub('(?i)\%2f', 'f', quote_plus(guid)) - ) - -def control(): - return '{base_url}/deviceStatus/control'.format( - base_url=BASE_URL - ) - -def history(): - return '{base_url}/deviceHistoryData'.format( - base_url=BASE_URL, - ) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..8263152 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +requests +urllib3 +bs4 diff --git a/setup.py b/setup.py index ca19901..d6e9081 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ author='Lostfields', license='MIT', classifiers=[ - 'Topic :: Home Automation', + 'Topic :: Home Automation', 'License :: OSI Approved :: MIT License', 'Programming Language :: Python :: 3.4', 'Programming Language :: Python :: 3.5',