diff --git a/.gitignore b/.gitignore index 43681bc..3406207 100644 --- a/.gitignore +++ b/.gitignore @@ -103,4 +103,7 @@ venv.bak/ # mypy .mypy_cache/ -.vscode \ No newline at end of file +.vscode +.idea + +token.json diff --git a/README.md b/README.md index d5e1ba4..2fcd9b8 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ positional arguments: set Set status of a device dump Dump raw data of a device history Dump history of a device - + optional arguments: -h, --help show this help message and exit -t TOKEN, --token TOKEN @@ -99,16 +99,19 @@ optional arguments: ```python import pcomfortcloud + session = pcomfortcloud.Session('user@example.com', 'mypassword') session.login() -devices = session.get_devices() +client = pcomfortcloud.ApiClient(session) + +devices = client.get_devices() print(devices) -print(session.get_device(devices[0]['id'])) +print(client.get_device(devices[0]['id'])) -session.set_device(devices[0]['id'], +client.set_device(devices[0]['id'], power = pcomfortcloud.constants.Power.On, temperature = 22.0) ``` diff --git a/pcomfortcloud/__init__.py b/pcomfortcloud/__init__.py index d6c134b..6d3dc99 100644 --- a/pcomfortcloud/__init__.py +++ b/pcomfortcloud/__init__.py @@ -3,17 +3,30 @@ """ __all__ = [ + 'ApiClient', 'Error', 'LoginError', - 'ResponseError', - 'Session' + 'RequestError', + 'ResponseError' ] +from .apiclient import ( + ApiClient +) + from .session import ( + Session +) + +from .authentication import ( + Authentication +) + +from .exceptions import ( Error, LoginError, - ResponseError, - Session + RequestError, + ResponseError ) -from . import constants \ No newline at end of file +from . import constants diff --git a/pcomfortcloud/__main__.py b/pcomfortcloud/__main__.py index b2a19a6..c65e7eb 100644 --- a/pcomfortcloud/__main__.py +++ b/pcomfortcloud/__main__.py @@ -1,10 +1,11 @@ import argparse +import os import json import pcomfortcloud from enum import Enum -def print_result(obj, indent = 0): +def print_result(obj, indent=0): for key in obj: value = obj[key] @@ -12,22 +13,25 @@ def print_result(obj, indent = 0): print(" "*indent + key) print_result(value, indent + 4) elif isinstance(value, Enum): - print(" "*indent + "{0: <{width}}: {1}".format(key, value.name, width=25-indent)) + print( + " "*indent + "{0: <{width}}: {1}".format(key, value.name, width=25-indent)) elif isinstance(value, list): print(" "*indent + "{0: <{width}}:".format(key, width=25-indent)) for elt in value: print_result(elt, indent + 4) print("") else: - print(" "*indent + "{0: <{width}}: {1}".format(key, value, width=25-indent)) + print(" "*indent + + "{0: <{width}}: {1}".format(key, value, width=25-indent)) -def str2bool(v): - if v.lower() in ('yes', 'true', 't', 'y', '1'): + +def str2bool(boolean_string_value): + if boolean_string_value.lower() in ('yes', 'true', 't', 'y', '1'): return True - elif v.lower() in ('no', 'false', 'f', 'n', '0'): + if boolean_string_value.lower() in ('no', 'false', 'f', 'n', '0'): return False - else: - raise argparse.ArgumentTypeError('Boolean value expected.') + raise argparse.ArgumentTypeError('Boolean value expected.') + def main(): """ Start pcomfortcloud Comfort Cloud command line """ @@ -46,13 +50,7 @@ def main(): parser.add_argument( '-t', '--token', help='File to store token in', - default='~/.pcomfortcloud-token') - - parser.add_argument( - '-s', '--skipVerify', - help='Skip Ssl verification if set as True', - type=str2bool, nargs='?', const=True, - default=False) + default='$HOME/.pcomfortcloud-oauth-token') parser.add_argument( '-r', '--raw', @@ -198,13 +196,13 @@ def main(): args = parser.parse_args() - session = pcomfortcloud.Session(args.username, args.password, args.token, args.raw, args.skipVerify == False) + session = pcomfortcloud.Session(args.username, args.password, args.token, args.raw) session.login() try: if args.command == 'list': print("list of devices and its device id (1-x)") for idx, device in enumerate(session.get_devices()): - if(idx > 0): + if idx > 0: print('') print("device #{}".format(idx + 1)) @@ -271,9 +269,8 @@ def main(): print_result(session.history(device['id'], args.mode, args.date)) except pcomfortcloud.ResponseError as ex: - print(ex.text) + print(ex) -# pylint: disable=C0103 if __name__ == "__main__": main() diff --git a/pcomfortcloud/apiclient.py b/pcomfortcloud/apiclient.py new file mode 100644 index 0000000..03abae4 --- /dev/null +++ b/pcomfortcloud/apiclient.py @@ -0,0 +1,276 @@ +''' +Panasonic session, using Panasonic Comfort Cloud app api +''' + +import hashlib +import re +from urllib.parse import quote_plus + +from . import authentication +from . import constants + + +class ApiClient(): + def __init__(self, auth: authentication.Authentication, raw=False): + self._auth = auth + + self._groups = None + self._devices = None + self._device_indexer = {} + self._raw = raw + self._acc_client_id = None + + def _ensure_logged_in(self): + self._auth.login() + + def _get_groups(self): + self._ensure_logged_in() + self._groups = self._auth.execute_get( + self._get_group_url(), + "get_groups", + 200 + ) + self._devices = None + + def get_devices(self): + if self._devices is None: + if self._groups is None: + self._get_groups() + + self._devices = [] + + for group in self._groups['groupList']: + if 'deviceList' in group: + device_list = group.get('deviceList', []) + else: + device_list = group.get('deviceIdList', []) + + for device in device_list: + if device: + if 'deviceHashGuid' in device: + device_id = device['deviceHashGuid'] + else: + device_id = hashlib.md5( + device['deviceGuid'].encode('utf-8')).hexdigest() + + self._device_indexer[device_id] = device['deviceGuid'] + self._devices.append({ + 'id': device_id, + 'name': device['deviceName'], + 'group': group['groupName'], + 'model': device['deviceModuleNumber'] if 'deviceModuleNumber' in device else '' + }) + return self._devices + + def dump(self, device_id): + device_guid = self._device_indexer.get(device_id) + if device_guid: + return self._auth.execute_get(self._get_device_status_url(device_guid), "dump", 200) + return None + + def history(self, device_id, mode, date, time_zone="+01:00"): + self._ensure_logged_in() + + device_guid = self._device_indexer.get(device_id) + + if device_guid: + try: + data_mode = constants.DataMode[mode].value + except KeyError: + raise Exception("Wrong mode parameter") + + payload = { + "deviceGuid": device_guid, + "dataMode": data_mode, + "date": date, + "osTimezone": time_zone + } + + json_response = self._auth.execute_post( + self._get_device_history_url(), payload, "history", 200) + + return { + 'id': device_id, + 'parameters': self._read_parameters(json_response) + } + return None + + def get_device(self, device_id): + self._ensure_logged_in() + + device_guid = self._device_indexer.get(device_id) + + if device_guid: + json_response = self._auth.execute_get( + self._get_device_status_url(device_guid), "get_device", 200) + return { + 'id': device_id, + 'parameters': self._read_parameters(json_response['parameters']) + } + return None + + def set_device(self, device_id, **kwargs): + """ Set parameters of device + + Args: + device_id (str): Id of the device + kwargs : {temperature=float}, {mode=OperationMode}, {fanSpeed=FanSpeed}, {power=Power}, + {airSwingHorizontal=}, {airSwingVertical=}, {eco=EcoMode} + """ + + parameters = {} + air_x = None + air_y = None + + if kwargs is not None: + for key, value in kwargs.items(): + if key == 'power' and isinstance(value, constants.Power): + parameters['operate'] = value.value + + if key == 'temperature': + parameters['temperatureSet'] = value + + if key == 'mode' and isinstance(value, constants.OperationMode): + parameters['operationMode'] = value.value + + if key == 'fanSpeed' and isinstance(value, constants.FanSpeed): + parameters['fanSpeed'] = value.value + + if key == 'airSwingHorizontal' and isinstance(value, constants.AirSwingLR): + air_x = value + + if key == 'airSwingVertical' and isinstance(value, constants.AirSwingUD): + air_y = value + + if key == 'eco' and isinstance(value, constants.EcoMode): + parameters['ecoMode'] = value.value + + if key == 'nanoe' and \ + isinstance(value, constants.NanoeMode) and \ + value != constants.NanoeMode.Unavailable: + parameters['nanoe'] = value.value + + # routine to set the auto mode of fan + # (either horizontal, vertical, both or disabled) + if air_x is not None or air_y is not None: + fan_auto = 0 + device = self.get_device(device_id) + + if device and device['parameters']['airSwingHorizontal'].value == -1: + fan_auto = fan_auto | 1 + + if device and device['parameters']['airSwingVertical'].value == -1: + fan_auto = fan_auto | 2 + + if air_x is not None: + if air_x.value == -1: + fan_auto = fan_auto | 1 + else: + fan_auto = fan_auto & ~1 + parameters['airSwingLR'] = air_x.value + + if air_y is not None: + if air_y.value == -1: + fan_auto = fan_auto | 2 + else: + fan_auto = fan_auto & ~2 + print(air_y.name) + parameters['airSwingUD'] = air_y.value + + if fan_auto == 3: + parameters['fanAutoMode'] = constants.AirSwingAutoMode.Both.value + elif fan_auto == 1: + parameters['fanAutoMode'] = constants.AirSwingAutoMode.AirSwingLR.value + elif fan_auto == 2: + parameters['fanAutoMode'] = constants.AirSwingAutoMode.AirSwingUD.value + else: + parameters['fanAutoMode'] = constants.AirSwingAutoMode.Disabled.value + + device_guid = self._device_indexer.get(device_id) + if device_guid: + payload = { + "deviceGuid": device_guid, + "parameters": parameters + } + _ = self._auth.execute_post( + self._get_device_status_control_url(), payload, "set_device", 200) + return True + return False + + def _read_parameters(self, parameters=dict()): + value = dict() + + _convert = { + 'insideTemperature': 'temperatureInside', + 'outTemperature': 'temperatureOutside', + 'temperatureSet': 'temperature', + 'currencyUnit': 'currencyUnit', + 'energyConsumption': 'energyConsumption', + 'estimatedCost': 'estimatedCost', + 'historyDataList': 'historyDataList', + } + for key in _convert: + if key in parameters: + value[_convert[key]] = parameters[key] + + if 'operate' in parameters: + value['power'] = constants.Power(parameters['operate']) + + if 'operationMode' in parameters: + value['mode'] = constants.OperationMode( + parameters['operationMode']) + + if 'fanSpeed' in parameters: + value['fanSpeed'] = constants.FanSpeed(parameters['fanSpeed']) + + if 'airSwingLR' in parameters: + value['airSwingHorizontal'] = constants.AirSwingLR( + parameters['airSwingLR']) + + if 'airSwingUD' in parameters: + value['airSwingVertical'] = constants.AirSwingUD( + parameters['airSwingUD']) + + if 'ecoMode' in parameters: + value['eco'] = constants.EcoMode(parameters['ecoMode']) + + if 'nanoe' in parameters: + value['nanoe'] = constants.NanoeMode(parameters['nanoe']) + + if 'fanAutoMode' in parameters: + if parameters['fanAutoMode'] == constants.AirSwingAutoMode.Both.value: + value['airSwingHorizontal'] = constants.AirSwingLR.Auto + value['airSwingVertical'] = constants.AirSwingUD.Auto + elif parameters['fanAutoMode'] == constants.AirSwingAutoMode.AirSwingLR.value: + value['airSwingHorizontal'] = constants.AirSwingLR.Auto + elif parameters['fanAutoMode'] == constants.AirSwingAutoMode.AirSwingUD.value: + value['airSwingVertical'] = constants.AirSwingUD.Auto + + return value + + def _get_group_url(self): + return '{base_url}/device/group'.format( + base_url=constants.BASE_PATH_ACC + ) + + def _get_device_status_url(self, guid): + return '{base_url}/deviceStatus/{guid}'.format( + base_url=constants.BASE_PATH_ACC, + guid=re.sub(r'(?i)\%2f', 'f', quote_plus(guid)) + ) + + def _get_device_status_now_url(self, guid): + return '{base_url}/deviceStatus/now/{guid}'.format( + base_url=constants.BASE_PATH_ACC, + guid=re.sub(r'(?i)\%2f', 'f', quote_plus(guid)) + ) + + def _get_device_status_control_url(self): + return '{base_url}/deviceStatus/control'.format( + base_url=constants.BASE_PATH_ACC + ) + + def _get_device_history_url(self): + return '{base_url}/deviceHistoryData'.format( + base_url=constants.BASE_PATH_ACC, + ) diff --git a/pcomfortcloud/authentication.py b/pcomfortcloud/authentication.py new file mode 100644 index 0000000..97b79c2 --- /dev/null +++ b/pcomfortcloud/authentication.py @@ -0,0 +1,436 @@ +import base64 +import datetime +import hashlib +import json +import random +import string +import time +import urllib +import requests + +from bs4 import BeautifulSoup +from . import exceptions +from . import constants + +def generate_random_string(length): + return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(length)) + + +def generate_random_string_hex(length): + return ''.join(random.choice(string.hexdigits) for _ in range(length)) + + +def check_response(response, function_description, expected_status): + if response.status_code != expected_status: + raise exceptions.ResponseError( + f"({function_description}: Expected status code {expected_status}, received: {response.status_code}: " + + f"{response.text}" + ) + + +def get_querystring_parameter_from_header_entry_url(response, header_entry, querystring_parameter): + header_entry_value = response.headers[header_entry] + parsed_url = urllib.parse.urlparse(header_entry_value) + params = urllib.parse.parse_qs(parsed_url.query) + return params.get(querystring_parameter, [None])[0] + + +class Authentication(): + # token: + # - access_token + # - refresh_token + # - id_token + # - unix_timestamp_token_received + # - expires_in_sec + # - acc_client_id + # - scope + + def __init__(self, username, password, token, raw=False): + self._username = username + self._password = password + self._token = token + self._raw = raw + self._app_version = constants.X_APP_VERSION + self._update_app_version() + + def _check_token_is_valid(self): + if self._token is not None: + now = datetime.datetime.now() + now_unix = time.mktime(now.timetuple()) + + # multiple parts in access_token which are separated by . + part_of_token_b64 = str(self._token["access_token"].split(".")[1]) + # as seen here: https://stackoverflow.com/questions/3302946/how-to-decode-base64-url-in-python + part_of_token = base64.urlsafe_b64decode( + part_of_token_b64 + '=' * (4 - len(part_of_token_b64) % 4)) + token_info_json = json.loads(part_of_token) + + expiry_in_token = token_info_json["exp"] + + if (now_unix > expiry_in_token) or \ + (now_unix > self._token["unix_timestamp_token_received"] + self._token["expires_in_sec"]): + + if self._raw: + print("--- Token is invalid") + return False + + if self._raw: + print("--- Token is valid") + return True + else: + if self._raw: + print("--- Token is invalid") + return False + + def _get_new_token(self): + requests_session = requests.Session() + + # generate initial state and code_challenge + state = generate_random_string(20) + code_verifier = generate_random_string(43) + + code_challenge = base64.urlsafe_b64encode( + hashlib.sha256( + code_verifier.encode('utf-8') + ).digest()).split('='.encode('utf-8'))[0].decode('utf-8') + + # -------------------------------------------------------------------- + # AUTHORIZE + # -------------------------------------------------------------------- + + response = requests_session.get( + f'{constants.BASE_PATH_AUTH}/authorize', + headers={ + "user-agent": "okhttp/4.10.0", + }, + params={ + "scope": "openid offline_access comfortcloud.control a2w.control", + "audience": f"https://digital.panasonic.com/{constants.APP_CLIENT_ID}/api/v1/", + "protocol": "oauth2", + "response_type": "code", + "code_challenge": code_challenge, + "code_challenge_method": "S256", + "auth0Client": constants.AUTH_0_CLIENT, + "client_id": constants.APP_CLIENT_ID, + "redirect_uri": constants.REDIRECT_URI, + "state": state, + }, + allow_redirects=False) + check_response(response, 'authorize', 302) + + # ------------------------------------------------------------------- + # FOLLOW REDIRECT + # ------------------------------------------------------------------- + + location = response.headers['Location'] + state = get_querystring_parameter_from_header_entry_url( + response, 'Location', 'state') + + if not location.startswith(constants.REDIRECT_URI): + response = requests_session.get( + f"{constants.BASE_PATH_AUTH}/{location}", + allow_redirects=False) + check_response(response, 'authorize_redirect', 200) + + # get the "_csrf" cookie + csrf = response.cookies['_csrf'] + + # ------------------------------------------------------------------- + # LOGIN + # ------------------------------------------------------------------- + + response = requests_session.post( + f'{constants.BASE_PATH_AUTH}/usernamepassword/login', + headers={ + "Auth0-Client": constants.AUTH_0_CLIENT, + "user-agent": "okhttp/4.10.0", + }, + json={ + "client_id": constants.APP_CLIENT_ID, + "redirect_uri": constants.REDIRECT_URI, + "tenant": "pdpauthglb-a1", + "response_type": "code", + "scope": "openid offline_access comfortcloud.control a2w.control", + "audience": f"https://digital.panasonic.com/{constants.APP_CLIENT_ID}/api/v1/", + "_csrf": csrf, + "state": state, + "_intstate": "deprecated", + "username": self._username, + "password": self._password, + "lang": "en", + "connection": "PanasonicID-Authentication" + }, + allow_redirects=False) + check_response(response, 'login', 200) + + # ------------------------------------------------------------------- + # CALLBACK + # ------------------------------------------------------------------- + + # get wa, wresult, wctx from body + soup = BeautifulSoup(response.content, "html.parser") + input_lines = soup.find_all("input", {"type": "hidden"}) + parameters = dict() + for input_line in input_lines: + parameters[input_line.get("name")] = input_line.get("value") + + user_agent = "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 " + user_agent += "(KHTML, like Gecko) Chrome/113.0.0.0 Mobile Safari/537.36" + + response = requests_session.post( + url=f"{constants.BASE_PATH_AUTH}/login/callback", + data=parameters, + headers={ + "Content-Type": "application/x-www-form-urlencoded", + "User-Agent": user_agent, + }, + allow_redirects=False) + check_response(response, 'login_callback', 302) + + # ------------------------------------------------------------------ + # FOLLOW REDIRECT + # ------------------------------------------------------------------ + + location = response.headers['Location'] + + response = requests_session.get( + f"{constants.BASE_PATH_AUTH}/{location}", + allow_redirects=False) + check_response(response, 'login_redirect', 302) + + # ------------------------------------------------------------------ + # GET TOKEN + # ------------------------------------------------------------------ + + code = get_querystring_parameter_from_header_entry_url( + response, 'Location', 'code') + + # do before, so that timestamp is older rather than newer + now = datetime.datetime.now() + unix_time_token_received = time.mktime(now.timetuple()) + + response = requests_session.post( + f'{constants.BASE_PATH_AUTH}/oauth/token', + headers={ + "Auth0-Client": constants.AUTH_0_CLIENT, + "user-agent": "okhttp/4.10.0", + }, + json={ + "scope": "openid", + "client_id": constants.APP_CLIENT_ID, + "grant_type": "authorization_code", + "code": code, + "redirect_uri": constants.REDIRECT_URI, + "code_verifier": code_verifier + }, + allow_redirects=False) + check_response(response, 'get_token', 200) + + token_response = json.loads(response.text) + + # ------------------------------------------------------------------ + # RETRIEVE ACC_CLIENT_ID + # ------------------------------------------------------------------ + now = datetime.datetime.now() + timestamp = now.strftime("%Y-%m-%d %H:%M:%S") + response = requests.post( + f'{constants.BASE_PATH_ACC}/auth/v2/login', + headers={ + "Content-Type": "application/json;charset=utf-8", + "User-Agent": "G-RAC", + "x-app-name": "Comfort Cloud", + "x-app-timestamp": timestamp, + "x-app-type": "1", + "x-app-version": self._app_version, + "x-cfc-api-key": generate_random_string_hex(128), + "x-user-authorization-v2": "Bearer " + token_response["access_token"] + }, + json={ + "language": 0 + }) + check_response(response, 'get_acc_client_id', 200) + + json_body = json.loads(response.text) + acc_client_id = json_body["clientId"] + + self._token = { + "access_token": token_response["access_token"], + "refresh_token": token_response["refresh_token"], + "id_token": token_response["id_token"], + "unix_timestamp_token_received": unix_time_token_received, + "expires_in_sec": token_response["expires_in"], + "acc_client_id": acc_client_id, + "scope": token_response["scope"] + } + + def get_token(self): + return self._token + + def set_token(self, token): + self._token = token + + def is_token_valid(self): + return self._check_token_is_valid() + + def login(self): + if self._token is not None: + if not self._check_token_is_valid(): + self._refresh_token() + return "Refreshing" + else: + self._get_new_token() + return "Authenticating" + + return "Valid" + + def logout(self): + response = requests.post( + f"{constants.BASE_PATH_ACC}/auth/v2/logout", + headers=self._get_header_for_api_calls() + ) + check_response(response, "logout", 200) + if json.loads(response.text)["result"] != 0: + # issue during logout, but do we really care? + pass + + def _refresh_token(self): + # do before, so that timestamp is older rather than newer + now = datetime.datetime.now() + unix_time_token_received = time.mktime(now.timetuple()) + + response = requests.post( + f'{constants.BASE_PATH_AUTH}/oauth/token', + headers={ + "Auth0-Client": constants.AUTH_0_CLIENT, + "user-agent": "okhttp/4.10.0", + }, + json={ + "scope": self._token["scope"], + "client_id": constants.APP_CLIENT_ID, + "refresh_token": self._token["refresh_token"], + "grant_type": "refresh_token" + }, + allow_redirects=False) + + if response.status_code != 200: + self._get_new_token() + return + + token_response = json.loads(response.text) + + self._token = { + "access_token": token_response["access_token"], + "refresh_token": token_response["refresh_token"], + "id_token": token_response["id_token"], + "unix_timestamp_token_received": unix_time_token_received, + "expires_in_sec": token_response["expires_in"], + "acc_client_id": self._token["acc_client_id"], + "scope": token_response["scope"] + } + + def _get_header_for_api_calls(self, no_client_id=False): + now = datetime.datetime.now() + timestamp = now.strftime("%Y-%m-%d %H:%M:%S") + return { + "Content-Type": "application/json;charset=utf-8", + "x-app-name": "Comfort Cloud", + "user-agent": "G-RAC", + "x-app-timestamp": timestamp, + "x-app-type": "1", + "x-app-version": self._app_version, + # Seems to work by either setting X-CFC-API-KEY to 0 or to a 128-long hex string + # "X-CFC-API-KEY": "0", + "x-cfc-api-key": generate_random_string_hex(128), + "x-client-id": self._token["acc_client_id"], + "x-user-authorization-v2": "Bearer " + self._token["access_token"] + } + + def _get_user_info(self): + response = requests.get( + f'{constants.BASE_PATH_AUTH}/userinfo', + headers={ + "Auth0-Client": self.AUTH_0_CLIENT, + "Authorization": "Bearer " + self._token["access_token"] + }) + check_response(response, 'userinfo', 200) + + def execute_post(self, + url, + json_data, + function_description, + expected_status_code): + self._ensure_valid_token() + + try: + response = requests.post( + url, + json=json_data, + headers=self._get_header_for_api_calls() + ) + except requests.exceptions.RequestException as ex: + raise exceptions.RequestError(ex) + + self._print_response_if_raw_is_set(response, function_description) + check_response(response, function_description, expected_status_code) + return json.loads(response.text) + + def execute_get(self, url, function_description, expected_status_code): + self._ensure_valid_token() + + try: + response = requests.get( + url, + headers=self._get_header_for_api_calls() + ) + except requests.exceptions.RequestException as ex: + raise exceptions.RequestError(ex) + + self._print_response_if_raw_is_set(response, function_description) + check_response(response, function_description, expected_status_code) + return json.loads(response.text) + + def _print_response_if_raw_is_set(self, response, function_description): + if self._raw: + print("=" * 79) + print(f"Response: {function_description}") + print("=" * 79) + print(f"Status: {response.status_code}") + print("-" * 79) + print("Headers:") + for header in response.headers: + print(f'{header}: {response.headers[header]}') + print("-" * 79) + print("Response body:") + print(response.text) + print("-" * 79) + + def _ensure_valid_token(self): + if self._check_token_is_valid(): + return + self._refresh_token() + + def _update_app_version(self): + if self._raw: + print("--- auto detecting latest app version") + try: + response = requests.get(constants.APPBRAIN_URL) + responseContent = response.content + soup = BeautifulSoup(responseContent, "html.parser") + meta_tag = soup.find("meta", itemprop="softwareVersion") + if meta_tag is not None: + version = meta_tag['content'] + self._app_version = version + if self._raw: + print("--- found version: {}".format(self._app_version)) + return + else: + self._app_version = constants.X_APP_VERSION + print("--- Error finding meta_tag") + return + + except Exception: + self._app_version = constants.X_APP_VERSION + if self._raw: + print("--- failed to detect app version using version default") + pass + diff --git a/pcomfortcloud/certificatechain.pem b/pcomfortcloud/certificatechain.pem deleted file mode 100644 index 2b66cc6..0000000 --- a/pcomfortcloud/certificatechain.pem +++ /dev/null @@ -1,50 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDdzCCAl+gAwIBAgIBADANBgkqhkiG9w0BAQsFADBdMQswCQYDVQQGEwJKUDEl -MCMGA1UEChMcU0VDT00gVHJ1c3QgU3lzdGVtcyBDTy4sTFRELjEnMCUGA1UECxMe -U2VjdXJpdHkgQ29tbXVuaWNhdGlvbiBSb290Q0EyMB4XDTA5MDUyOTA1MDAzOVoX -DTI5MDUyOTA1MDAzOVowXTELMAkGA1UEBhMCSlAxJTAjBgNVBAoTHFNFQ09NIFRy -dXN0IFN5c3RlbXMgQ08uLExURC4xJzAlBgNVBAsTHlNlY3VyaXR5IENvbW11bmlj -YXRpb24gUm9vdENBMjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANAV -OVKxUrO6xVmCxF1SrjpDZYBLx/KWvNs2l9amZIyoXvDjChz335c9S672XewhtUGr -zbl+dp+++T42NKA7wfYxEUV0kz1XgMX5iZnK5atq1LXaQZAQwdbWQonCv/Q4EpVM -VAX3NuRFg3sUZdbcDE3R3n4MqzvEFb46VqZab3ZpUql6ucjrappdUtAtCms1FgkQ -hNBqyjoGADdH5H5XTz+L62e4iKrFvlNVspHEfbmwhRkGeC7bYRr6hfVKkaHnFtWO -ojnflLhwHyg/i/xAXmODPIMqGplrz95Zajv8bxbXH/1KEOtOghY6rCcMU/Gt1SSw -awNQwS08Ft1ENCcadfsCAwEAAaNCMEAwHQYDVR0OBBYEFAqFqXdlBZh8QIH4D5cs -OPEK7DzPMA4GA1UdDwEB/wQEAwIBBjAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3 -DQEBCwUAA4IBAQBMOqNErLlFsceTfsgLCkLfZOoc7llsCLqJX2rKSpWeeo8HxdpF -coJxDjrSzG+ntKEju/Ykn8sX/oymzsLS28yN/HH8AynBbF0zX2S2ZTuJbxh2ePXc -okgfGT+Ok+vx+hfuzU7jBBJV1uXk3fs+BXziHV7Gp7yXT2g69ekuCkO2r1dcYmh8 -t/2jioSgrGK+KwmHNPBqAbubKVY8/gA3zyNs8U6qtnRGEmyR7jTV7JqR50S+kDFy -1UkC9gLl9B/rfNmWVan/7Ir5mUf/NVoCqgTLiluHcSmRvaS0eg29mvVXIwAHIRc/ -SjnRBUkLp7Y3gaVdjKozXoEofKd9J+sAro03 ------END CERTIFICATE----- ------BEGIN CERTIFICATE----- -MIIE7jCCA9agAwIBAgIJIrmxYwzstDwuMA0GCSqGSIb3DQEBCwUAMF0xCzAJBgNV -BAYTAkpQMSUwIwYDVQQKExxTRUNPTSBUcnVzdCBTeXN0ZW1zIENPLixMVEQuMScw -JQYDVQQLEx5TZWN1cml0eSBDb21tdW5pY2F0aW9uIFJvb3RDQTIwHhcNMTkwOTI3 -MDE1NDIzWhcNMjkwNTI5MDUwMDM5WjBeMQswCQYDVQQGEwJKUDEjMCEGA1UEChMa -Q3liZXJ0cnVzdCBKYXBhbiBDby4sIEx0ZC4xKjAoBgNVBAMTIUN5YmVydHJ1c3Qg -SmFwYW4gU3VyZVNlcnZlciBDQSBHNDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC -AQoCggEBAMtunFmosT8IxBkVFP+OnkGkcVmbui+hdVBlGZhniniVluAhigm2WUxx -p4X5V3B/QKJLZmeAswmzxGKXaDCzcomYxXTygNXcTLI+IMyRisEO7V1NXFHUjSEl -KaY1LzCA9/emldnmRjX6B9Zt5xXK5q12WOIWkJECEwwKku77tvtKZPRKaCNCGsZ5 -Hja7PBs07jLoE0rMuZLQZNQEB0W63attKGCGzEk50lDj+wQ0UlUbQk3zAEsvdE6X -o1qZy9l783Va40vSx3VqhGYb4jWQrg2CrAtJcKQNSJ0m9yxJVVQDwpQQwGxHO5Em -Qv1LGJExASegOXzhzqCr5yiwECfSrOsCAwEAAaOCAa4wggGqMB0GA1UdDgQWBBRi -p9La3oW2kvGFvPbolZ11oPpOHzAfBgNVHSMEGDAWgBQKhal3ZQWYfECB+A+XLDjx -Cuw8zzASBgNVHRMBAf8ECDAGAQH/AgEAMA4GA1UdDwEB/wQEAwIBBjBJBgNVHR8E -QjBAMD6gPKA6hjhodHRwOi8vcmVwb3NpdG9yeS5zZWNvbXRydXN0Lm5ldC9TQy1S -b290Mi9TQ1Jvb3QyQ1JMLmNybDBSBgNVHSAESzBJMEcGCiqDCIybG2SHBQQwOTA3 -BggrBgEFBQcCARYraHR0cHM6Ly9yZXBvc2l0b3J5LnNlY29tdHJ1c3QubmV0L1ND -LVJvb3QyLzCBhQYIKwYBBQUHAQEEeTB3MDAGCCsGAQUFBzABhiRodHRwOi8vc2Ny -b290Y2EyLm9jc3Auc2Vjb210cnVzdC5uZXQwQwYIKwYBBQUHMAKGN2h0dHA6Ly9y -ZXBvc2l0b3J5LnNlY29tdHJ1c3QubmV0L1NDLVJvb3QyL1NDUm9vdDJjYS5jZXIw -HQYDVR0lBBYwFAYIKwYBBQUHAwEGCCsGAQUFBwMCMA0GCSqGSIb3DQEBCwUAA4IB -AQDFUXkuQAZKY5Pb/JvgufizJ8uZKpN++fiI8kOTgkrSriNYNjO9iEdTwVV3sW3P -abr3lAdCGrJ+wgArLLUnBZ7VGM//UlDhTRe4A19c0TmV1udTSbxabwQJbN1zIGbG -3KQsumg9/70PaIkawnihx9/TxGCrJkAH5W+9pkX7OWefsahylwiPkKRkJRv2em/v -rMRyb3LxRU5t84k1xQqCnMxTEWOAiWoOtatExZ38a8kn8kqiE1NEvtPOzSrVjSvD -7pH56AlvO7hzGzng60kyWnAz6O5rQ0tsFIgW9xloWTQQVcfEtzvjc8ptuP9onkbA -jMRDJaqLXrIVtB7GMek7S6AO ------END CERTIFICATE----- diff --git a/pcomfortcloud/constants.py b/pcomfortcloud/constants.py index 95bfb53..9b999d1 100644 --- a/pcomfortcloud/constants.py +++ b/pcomfortcloud/constants.py @@ -1,5 +1,13 @@ from enum import Enum +APP_CLIENT_ID = "Xmy6xIYIitMxngjB2rHvlm6HSDNnaMJx" +AUTH_0_CLIENT = "eyJuYW1lIjoiQXV0aDAuQW5kcm9pZCIsImVudiI6eyJhbmRyb2lkIjoiMzAifSwidmVyc2lvbiI6IjIuOS4zIn0=" +REDIRECT_URI = "panasonic-iot-cfc://authglb.digital.panasonic.com/android/com.panasonic.ACCsmart/callback" +BASE_PATH_AUTH = "https://authglb.digital.panasonic.com" +BASE_PATH_ACC = "https://accsmart.panasonic.com" +X_APP_VERSION = "1.21.0" +APPBRAIN_URL = "https://www.appbrain.com/app/panasonic-comfort-cloud/com.panasonic.ACCsmart" + class Power(Enum): Off = 0 On = 1 @@ -47,7 +55,7 @@ class FanSpeed(Enum): HighMid = 4 High = 5 -class dataMode(Enum): +class DataMode(Enum): Day = 0 Week = 1 Month = 2 diff --git a/pcomfortcloud/exceptions.py b/pcomfortcloud/exceptions.py new file mode 100644 index 0000000..73e5e51 --- /dev/null +++ b/pcomfortcloud/exceptions.py @@ -0,0 +1,14 @@ +class Error(Exception): + pass + + +class LoginError(Error): + pass + + +class RequestError(Error): + pass + + +class ResponseError(Error): + pass diff --git a/pcomfortcloud/session.py b/pcomfortcloud/session.py index 0ba6996..ee15b8b 100644 --- a/pcomfortcloud/session.py +++ b/pcomfortcloud/session.py @@ -1,49 +1,12 @@ -''' -Panasonic session, using Panasonic Comfort Cloud app api -''' - -import json -import requests import os -import urllib3 -import hashlib +import json +from pathlib import Path -from . import urls +from .authentication import Authentication +from .apiclient import ApiClient from . import constants -def _validate_response(response): - """ Verify that response is OK """ - if response.status_code == 200: - return - raise ResponseError(response.status_code, response.text) - - -class Error(Exception): - ''' Panasonic session error ''' - pass - -class RequestError(Error): - ''' Wrapped requests.exceptions.RequestException ''' - pass - - -class LoginError(Error): - ''' Login failed ''' - pass - -class ResponseError(Error): - ''' Unexcpected response ''' - def __init__(self, status_code, text): - super(ResponseError, self).__init__( - 'Invalid response' - ', status code: {0} - Data: {1}'.format( - status_code, - text)) - self.status_code = status_code - self.text = text - - -class Session(object): +class Session(Authentication): """ Verisure app session Args: @@ -52,405 +15,60 @@ class Session(object): """ - def __init__(self, username, password, tokenFileName='~/.panasonic-token', raw=False, verifySsl=True): - self._username = username - self._password = password - self._tokenFileName = os.path.expanduser(tokenFileName) - self._vid = None - self._groups = None - self._devices = None - self._deviceIndexer = {} - self._raw = raw - - if verifySsl == False: - urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - self._verifySsl = verifySsl - else: - self._verifySsl = os.path.join(os.path.dirname(__file__), - "certificatechain.pem") + def __init__(self, username, password, tokenFileName='$HOME/.panasonic-oauth-token', raw=False): + super().__init__(username, password, None, raw) - def __enter__(self): - self.login() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.logout() + home = str(Path.home()) + self._tokenFileName = os.path.expanduser(tokenFileName.replace("$HOME", home)) + self._api = ApiClient(self, raw) def login(self): - """ Login to verisure app api """ - - if os.path.exists(self._tokenFileName): - with open(self._tokenFileName, 'r') as cookieFile: - self._vid = cookieFile.read().strip() + if super().is_token_valid() is True: + return - if self._raw: print("--- token found") + if super().get_token() is None and os.path.exists(self._tokenFileName): + with open(self._tokenFileName, "r") as tokenFile: + self.token = json.load(tokenFile) - try: - self._get_groups() + if self._raw: print("--- token read") + super().set_token(self.token) - except ResponseError: - if self._raw: print("--- token probably expired") + state = super().login() - self._vid = None - self._devices = None - os.remove(self._tokenFileName) + if self._raw: print("--- authentication state: " + state) - if self._vid is None: - self._create_token() - with open(self._tokenFileName, 'w') as tokenFile: - tokenFile.write(self._vid) + if state != "Valid": + self.token = super().get_token() + + with open(self._tokenFileName, "w") as tokenFile: + json.dump(self.token, tokenFile, indent=4) - self._get_groups() + if self._raw: print("--- token written") def logout(self): - """ Logout """ - - def _headers(self): - return { - "X-APP-TYPE": "1", - "X-APP-VERSION": "1.20.1", - "X-User-Authorization": self._vid, - "X-APP-TIMESTAMP": "1", - "X-APP-NAME": "Comfort Cloud", - "X-CFC-API-KEY": "Comfort Cloud", - "User-Agent": "G-RAC", - "Accept": "application/json; charset=utf-8", - "Content-Type": "application/json; charset=utf-8" - } - - def _create_token(self): - response = None - - payload = { - "language": 0, - "loginId": self._username, - "password": self._password - } - - if self._raw: print("--- creating token by authenticating") - - try: - response = requests.post(urls.login(), json=payload, headers=self._headers(), verify=self._verifySsl) - if 2 != response.status_code // 100: - raise ResponseError(response.status_code, response.text) - - except requests.exceptions.RequestException as ex: - raise LoginError(ex) - - _validate_response(response) - - if(self._raw is True): - print("--- raw beginning ---") - print(response.text) - print("--- raw ending ---\n") - - self._vid = json.loads(response.text)['uToken'] - - def _get_groups(self): - """ Get information about groups """ - response = None + super().logout() - try: - response = requests.get(urls.get_groups(),headers=self._headers(), verify=self._verifySsl) + def execute_post(self, + url, + json_data, + function_description, + expected_status_code): + return super().execute_post(url, json_data, function_description, expected_status_code) - if 2 != response.status_code // 100: - raise ResponseError(response.status_code, response.text) - - except requests.exceptions.RequestException as ex: - raise RequestError(ex) - - _validate_response(response) - - if(self._raw is True): - print("--- _get_groups()") - print("--- raw beginning ---") - print(response.text) - print("--- raw ending ---\n") - - self._groups = json.loads(response.text) - self._devices = None + def execute_get(self, url, function_description, expected_status_code): + return super().execute_get(url, function_description, expected_status_code) def get_devices(self, group=None): - if self._vid is None: - self.login() - - if self._devices is None: - self._devices = [] - - for group in self._groups['groupList']: - if 'deviceList' in group: - list = group.get('deviceList', []) - else: - list = group.get('deviceIdList', []) - - for device in list: - if device: - id = None - if 'deviceHashGuid' in device: - id = device['deviceHashGuid'] - else: - id = hashlib.md5(device['deviceGuid'].encode('utf-8')).hexdigest() - - self._deviceIndexer[id] = device['deviceGuid'] - self._devices.append({ - 'id': id, - 'name': device['deviceName'], - 'group': group['groupName'], - 'model': device['deviceModuleNumber'] if 'deviceModuleNumber' in device else '' - }) - - return self._devices + return self._api.get_devices() def dump(self, id): - deviceGuid = self._deviceIndexer.get(id) - - if(deviceGuid): - response = None - - try: - response = requests.get(urls.status(deviceGuid), headers=self._headers(), verify=self._verifySsl) - - if 2 != response.status_code // 100: - raise ResponseError(response.status_code, response.text) - - except requests.exceptions.RequestException as ex: - raise RequestError(ex) - - _validate_response(response) - return json.loads(response.text) - - return None + return self._api.dump(id) def history(self, id, mode, date, tz="+01:00"): - deviceGuid = self._deviceIndexer.get(id) - - if(deviceGuid): - response = None - - try: - dataMode = constants.dataMode[mode].value - except KeyError: - raise Exception("Wrong mode parameter") - - payload = { - "deviceGuid": deviceGuid, - "dataMode": dataMode, - "date": date, - "osTimezone": tz - } - - try: - response = requests.post(urls.history(), json=payload, headers=self._headers(), verify=self._verifySsl) - - if 2 != response.status_code // 100: - raise ResponseError(response.status_code, response.text) - - except requests.exceptions.RequestException as ex: - raise RequestError(ex) - - _validate_response(response) - - if(self._raw is True): - print("--- history()") - print("--- raw beginning ---") - print(response.text) - print("--- raw ending ---") - - _json = json.loads(response.text) - return { - 'id': id, - 'parameters': self._read_parameters(_json) - } - - return None + return self._api.history(id, mode, date, tz) def get_device(self, id): - deviceGuid = self._deviceIndexer.get(id) - - if(deviceGuid): - response = None - - try: - response = requests.get(urls.status(deviceGuid), headers=self._headers(), verify=self._verifySsl) - - if 2 != response.status_code // 100: - raise ResponseError(response.status_code, response.text) - - except requests.exceptions.RequestException as ex: - raise RequestError(ex) - - _validate_response(response) - - if(self._raw is True): - print("--- get_device()") - print("--- raw beginning ---") - print(response.text) - print("--- raw ending ---") - - - _json = json.loads(response.text) - return { - 'id': id, - 'parameters': self._read_parameters(_json['parameters']) - } - - return None + return self._api.get_device(id) def set_device(self, id, **kwargs): - """ Set parameters of device - - Args: - id (str): Id of the device - kwargs : {temperature=float}, {mode=OperationMode}, {fanSpeed=FanSpeed}, {power=Power}, {airSwingHorizontal=}, {airSwingVertical=}, {eco=EcoMode} - """ - - parameters = {} - airX = None - airY = None - - if kwargs is not None: - for key, value in kwargs.items(): - if key == 'power' and isinstance(value, constants.Power): - parameters['operate'] = value.value - - if key == 'temperature': - parameters['temperatureSet'] = value - - if key == 'mode' and isinstance(value, constants.OperationMode): - parameters['operationMode'] = value.value - - if key == 'fanSpeed' and isinstance(value, constants.FanSpeed): - parameters['fanSpeed'] = value.value - - if key == 'airSwingHorizontal' and isinstance(value, constants.AirSwingLR): - airX = value - - if key == 'airSwingVertical' and isinstance(value, constants.AirSwingUD): - airY = value - - if key == 'eco' and isinstance(value, constants.EcoMode): - parameters['ecoMode'] = value.value - - if key == 'nanoe' and isinstance(value, constants.NanoeMode) and value != constants.NanoeMode.Unavailable: - parameters['nanoe'] = value.value - - - # routine to set the auto mode of fan (either horizontal, vertical, both or disabled) - if airX is not None or airY is not None: - fanAuto = 0 - device = self.get_device(id) - - if device and device['parameters']['airSwingHorizontal'].value == -1: - fanAuto = fanAuto | 1 - - if device and device['parameters']['airSwingVertical'].value == -1: - fanAuto = fanAuto | 2 - - if airX is not None: - if airX.value == -1: - fanAuto = fanAuto | 1 - else: - fanAuto = fanAuto & ~1 - parameters['airSwingLR'] = airX.value - - if airY is not None: - if airY.value == -1: - fanAuto = fanAuto | 2 - else: - fanAuto = fanAuto & ~2 - print(airY.name) - parameters['airSwingUD'] = airY.value - - if fanAuto == 3: - parameters['fanAutoMode'] = constants.AirSwingAutoMode.Both.value - elif fanAuto == 1: - parameters['fanAutoMode'] = constants.AirSwingAutoMode.AirSwingLR.value - elif fanAuto == 2: - parameters['fanAutoMode'] = constants.AirSwingAutoMode.AirSwingUD.value - else: - parameters['fanAutoMode'] = constants.AirSwingAutoMode.Disabled.value - - deviceGuid = self._deviceIndexer.get(id) - if(deviceGuid): - response = None - - payload = { - "deviceGuid": deviceGuid, - "parameters": parameters - } - - if(self._raw is True): - print("--- set_device()") - print("--- raw out beginning ---") - print(payload) - print("--- raw out ending ---") - - try: - response = requests.post(urls.control(), json=payload, headers=self._headers(), verify=self._verifySsl) - - if 2 != response.status_code // 100: - raise ResponseError(response.status_code, response.text) - - except requests.exceptions.RequestException as ex: - raise RequestError(ex) - - _validate_response(response) - - if(self._raw is True): - print("--- raw in beginning ---") - print(response.text) - print("--- raw in ending ---\n") - - _json = json.loads(response.text) - - return True - - return False - - def _read_parameters(self, parameters = {}): - value = {} - - _convert = { - 'insideTemperature': 'temperatureInside', - 'outTemperature': 'temperatureOutside', - 'temperatureSet': 'temperature', - 'currencyUnit': 'currencyUnit', - 'energyConsumption': 'energyConsumption', - 'estimatedCost': 'estimatedCost', - 'historyDataList': 'historyDataList', - } - for key in _convert: - if key in parameters: - value[_convert[key]] = parameters[key] - - if 'operate' in parameters: - value['power'] = constants.Power(parameters['operate']) - - if 'operationMode' in parameters: - value['mode'] = constants.OperationMode(parameters['operationMode']) - - if 'fanSpeed' in parameters: - value['fanSpeed'] = constants.FanSpeed(parameters['fanSpeed']) - - if 'airSwingLR' in parameters: - value['airSwingHorizontal'] = constants.AirSwingLR(parameters['airSwingLR']) - - if 'airSwingUD' in parameters: - value['airSwingVertical'] = constants.AirSwingUD(parameters['airSwingUD']) - - if 'ecoMode' in parameters: - value['eco'] = constants.EcoMode(parameters['ecoMode']) - - if 'nanoe' in parameters: - value['nanoe'] = constants.NanoeMode(parameters['nanoe']) - - if 'fanAutoMode' in parameters: - if parameters['fanAutoMode'] == constants.AirSwingAutoMode.Both.value: - value['airSwingHorizontal'] = constants.AirSwingLR.Auto - value['airSwingVertical'] = constants.AirSwingUD.Auto - elif parameters['fanAutoMode'] == constants.AirSwingAutoMode.AirSwingLR.value: - value['airSwingHorizontal'] = constants.AirSwingLR.Auto - elif parameters['fanAutoMode'] == constants.AirSwingAutoMode.AirSwingUD.value: - value['airSwingVertical'] = constants.AirSwingUD.Auto - - return value + return self._api.set_device(id, **kwargs) diff --git a/pcomfortcloud/urls.py b/pcomfortcloud/urls.py deleted file mode 100644 index 44a69ec..0000000 --- a/pcomfortcloud/urls.py +++ /dev/null @@ -1,40 +0,0 @@ -import re - -try: - # Python 3 - from urllib.parse import quote_plus -except ImportError: - # Python 2 - from urllib import quote_plus - -BASE_URL = 'https://accsmart.panasonic.com' - -def login(): - return '{base_url}/auth/login'.format( - base_url=BASE_URL) - -def get_groups(): - return '{base_url}/device/group'.format( - base_url=BASE_URL) - -def status(guid): - return '{base_url}/deviceStatus/{guid}'.format( - base_url=BASE_URL, - guid=re.sub('(?i)\%2f', 'f', quote_plus(guid)) - ) - -def statusCache(guid): - return '{base_url}/deviceStatus/now/{guid}'.format( - base_url=BASE_URL, - guid=re.sub('(?i)\%2f', 'f', quote_plus(guid)) - ) - -def control(): - return '{base_url}/deviceStatus/control'.format( - base_url=BASE_URL - ) - -def history(): - return '{base_url}/deviceHistoryData'.format( - base_url=BASE_URL, - ) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..8263152 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +requests +urllib3 +bs4 diff --git a/setup.py b/setup.py index ca19901..d6e9081 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ author='Lostfields', license='MIT', classifiers=[ - 'Topic :: Home Automation', + 'Topic :: Home Automation', 'License :: OSI Approved :: MIT License', 'Programming Language :: Python :: 3.4', 'Programming Language :: Python :: 3.5',