diff --git a/pycaw/messari/defillama/__init__.py b/pycaw/defillama/__init__.py similarity index 64% rename from pycaw/messari/defillama/__init__.py rename to pycaw/defillama/__init__.py index f1d5bc4..e96a87d 100644 --- a/pycaw/messari/defillama/__init__.py +++ b/pycaw/defillama/__init__.py @@ -1,4 +1,7 @@ -"""Module to handle initialization, imports, for DeFiLlama class""" +"""Module to handle initialization, imports, for DeFiLlama class + +DeFiLLama API Docs: https://defillama.com/docs/api +""" from .defillama import * diff --git a/pycaw/messari/defillama/defillama.py b/pycaw/defillama/defillama.py similarity index 66% rename from pycaw/messari/defillama/defillama.py rename to pycaw/defillama/defillama.py index 3b5f6cd..3ef5ec1 100644 --- a/pycaw/messari/defillama/defillama.py +++ b/pycaw/defillama/defillama.py @@ -1,38 +1,60 @@ -"""This module is meant to contain the DeFiLlama class""" +"""This module is meant to contain the DeFiLlama class + +DeFiLLama API Docs: https://defillama.com/docs/api +""" -# Global imports import datetime -from string import Template -from typing import Union, List, Dict + +from pycaw.defillama import helpers +from pycaw.messari import dataloader +from pycaw.messari import utils import pandas as pd +from typing import Union, List, Dict -from messari.dataloader import DataLoader -# Local imports -from messari.utils import validate_input, get_taxonomy_dict, time_filter_df -from .helpers import format_df ########################## # URL Endpoints ########################## -DL_PROTOCOLS_URL = "https://api.llama.fi/protocols" -DL_GLOBAL_TVL_URL = "https://api.llama.fi/charts/" -DL_CURRENT_PROTOCOL_TVL_URL = Template("https://api.llama.fi/tvl/$slug") -DL_CHAIN_TVL_URL = Template("https://api.llama.fi/charts/$chain") -DL_GET_PROTOCOL_TVL_URL = Template("https://api.llama.fi/protocol/$slug") -class DeFiLlama(DataLoader): - """This class is a wrapper around the DeFi Llama API - """ +class DeFiLlama(dataloader.DataLoader): + """This class is a wrapper around the DeFi Llama API""" - def __init__(self): - messari_to_dl_dict = get_taxonomy_dict("messari_to_dl.json") - DataLoader.__init__(self, api_dict=None, taxonomy_dict=messari_to_dl_dict) + api_urls: Dict[str, str] - def get_protocol_tvl_timeseries(self, asset_slugs: Union[str, List], - start_date: Union[str, datetime.datetime] = None, - end_date: Union[str, datetime.datetime] = None) -> pd.DataFrame: + def __init__(self): + messari_to_dl_dict = utils.get_taxonomy_dict("messari_to_dl.json") + dataloader.DataLoader.__init__( + self, api_dict=None, taxonomy_dict=messari_to_dl_dict + ) + + @property + def api_urls(self) -> Dict[str, str]: + _endpoint_preamble: str = "https://api.llama.fi" + urls = dict( + # List all protocols on defillama along with their tvl + protocols="/".join([_endpoint_preamble, "protocols"]), + # Get historical TVL of a protocol and breakdowns by token and chain + get_protocol_tvl="/".join([_endpoint_preamble, "protocol", "{_slug}"]), + # Get historical TVL on DeFi on all chains + global_tvl="/".join([_endpoint_preamble, "charts"]), + # Get historical TVL of a chain + chain_tvl="/".join([_endpoint_preamble, "charts", "{_chain}"]), + # Get current TVL of a protocol + current_protocol_tvl="/".join([_endpoint_preamble, "tvl", "{_slug}"]), + # Get current TVL of all chains + all_chains_tvl="/".join([_endpoint_preamble, "chains"]), + ) + return urls + # TODO test: Check that the urls aren't broken now. + + def get_protocol_tvl_timeseries( + self, + asset_slugs: Union[str, List], + start_date: Union[str, datetime.datetime] = None, + end_date: Union[str, datetime.datetime] = None, + ) -> pd.DataFrame: """Returns times TVL of a protocol with token amounts as a pandas DataFrame. Returned DataFrame is indexed by df[protocol][chain][asset]. @@ -59,7 +81,7 @@ def get_protocol_tvl_timeseries(self, asset_slugs: Union[str, List], slug_df_list: List = [] for slug in slugs: - endpoint_url = DL_GET_PROTOCOL_TVL_URL.substitute(slug=slug) + endpoint_url = self.api_urls["get_protocol_tvl"].format(_slug=slug) protocol = self.get_response(endpoint_url) ########################### @@ -95,13 +117,15 @@ def get_protocol_tvl_timeseries(self, asset_slugs: Union[str, List], chain_tvl_tokens_usd_df = pd.DataFrame(chain_tvl_tokens_usd) # fix indexes - chain_tvl_df = format_df(chain_tvl_df) - chain_tvl_tokens_df = format_df(chain_tvl_tokens_df) - chain_tvl_tokens_usd_df = format_df(chain_tvl_tokens_usd_df) + chain_tvl_df = helpers.format_df(chain_tvl_df) + chain_tvl_tokens_df = helpers.format_df(chain_tvl_tokens_df) + chain_tvl_tokens_usd_df = helpers.format_df(chain_tvl_tokens_usd_df) chain_tvl_tokens_usd_df = chain_tvl_tokens_usd_df.add_suffix("_usd") # concat tokens and tokensInUsd - joint_tokens_df = pd.concat([chain_tvl_tokens_df, chain_tvl_tokens_usd_df], axis=1) + joint_tokens_df = pd.concat( + [chain_tvl_tokens_df, chain_tvl_tokens_usd_df], axis=1 + ) # Join total chain TVL w/ token TVL chain_df = chain_tvl_df.join(joint_tokens_df) chain_df_list.append(chain_df) @@ -119,7 +143,7 @@ def get_protocol_tvl_timeseries(self, asset_slugs: Union[str, List], token[key] = value token.pop("tokens", None) tokens_df = pd.DataFrame(tokens) - tokens_df = format_df(tokens_df) + tokens_df = helpers.format_df(tokens_df) ## tokens in USD tokens_usd = protocol["tokensInUsd"] @@ -128,13 +152,13 @@ def get_protocol_tvl_timeseries(self, asset_slugs: Union[str, List], token[key] = value token.pop("tokens", None) tokens_usd_df = pd.DataFrame(tokens_usd) - tokens_usd_df = format_df(tokens_usd_df) + tokens_usd_df = helpers.format_df(tokens_usd_df) tokens_usd_df = tokens_usd_df.add_suffix("_usd") # Get total tvl across chains tvl = protocol["tvl"] total_tvl_df = pd.DataFrame(tvl) - total_tvl_df = format_df(total_tvl_df) + total_tvl_df = helpers.format_df(total_tvl_df) # Working joint_tokens_df = pd.concat([tokens_df, tokens_usd_df], axis=1) @@ -150,11 +174,16 @@ def get_protocol_tvl_timeseries(self, asset_slugs: Union[str, List], total_slugs_df = pd.concat(slug_df_list, keys=slugs, axis=1) total_slugs_df.sort_index(inplace=True) - total_slugs_df = time_filter_df(total_slugs_df, start_date=start_date, end_date=end_date) + total_slugs_df = utils.time_filter_df( + total_slugs_df, start_date=start_date, end_date=end_date + ) return total_slugs_df - def get_global_tvl_timeseries(self, start_date: Union[str, datetime.datetime] = None, - end_date: Union[str, datetime.datetime] = None) -> pd.DataFrame: + def get_global_tvl_timeseries( + self, + start_date: Union[str, datetime.datetime] = None, + end_date: Union[str, datetime.datetime] = None, + ) -> pd.DataFrame: """Returns timeseries TVL from total of all Defi Llama supported protocols Parameters @@ -170,15 +199,20 @@ def get_global_tvl_timeseries(self, start_date: Union[str, datetime.datetime] = DataFrame DataFrame containing timeseries tvl data for every protocol """ - global_tvl = self.get_response(DL_GLOBAL_TVL_URL) + global_tvl = self.get_response(self.api_urls["global_tvl"]) global_tvl_df = pd.DataFrame(global_tvl) - global_tvl_df = format_df(global_tvl_df) - global_tvl_df = time_filter_df(global_tvl_df, start_date=start_date, end_date=end_date) + global_tvl_df = helpers.format_df(global_tvl_df) + global_tvl_df = utils.time_filter_df( + global_tvl_df, start_date=start_date, end_date=end_date + ) return global_tvl_df - def get_chain_tvl_timeseries(self, chains_in: Union[str, List], - start_date: Union[str, datetime.datetime] = None, - end_date: Union[str, datetime.datetime] = None) -> pd.DataFrame: + def get_chain_tvl_timeseries( + self, + chains_in: Union[str, List], + start_date: Union[str, datetime.datetime] = None, + end_date: Union[str, datetime.datetime] = None, + ) -> pd.DataFrame: """Retrive timeseries TVL for a given chain Parameters @@ -197,20 +231,22 @@ def get_chain_tvl_timeseries(self, chains_in: Union[str, List], DataFrame DataFrame containing timeseries tvl data for each chain """ - chains = validate_input(chains_in) + chains = utils.validate_input(chains_in) chain_df_list = [] for chain in chains: - endpoint_url = DL_CHAIN_TVL_URL.substitute(chain=chain) + endpoint_url = self.api_urls["chain_tvl"].format(_chain=chain) response = self.get_response(endpoint_url) chain_df = pd.DataFrame(response) - chain_df = format_df(chain_df) + chain_df = helpers.format_df(chain_df) chain_df_list.append(chain_df) # Join DataFrames from each chain & return chains_df = pd.concat(chain_df_list, axis=1) chains_df.columns = chains - chains_df = time_filter_df(chains_df, start_date=start_date, end_date=end_date) + chains_df = utils.time_filter_df( + chains_df, start_date=start_date, end_date=end_date + ) return chains_df def get_current_tvl(self, asset_slugs: Union[str, List]) -> Dict: @@ -226,11 +262,11 @@ def get_current_tvl(self, asset_slugs: Union[str, List]) -> Dict: DataFrame Pandas Series for tvl indexed by each slug {slug: tvl, ...} """ - slugs = validate_input(asset_slugs) + slugs = utils.validate_input(asset_slugs) tvl_dict = {} for slug in slugs: - endpoint_url = DL_CURRENT_PROTOCOL_TVL_URL.substitute(slug=slug) + endpoint_url = self.api_urls["current_protocol_tvl"].format(_slug=slug) tvl = self.get_response(endpoint_url) if isinstance(tvl, float): tvl_dict[slug] = tvl @@ -250,7 +286,7 @@ def get_protocols(self) -> pd.DataFrame: DataFrame DataFrame with one column per DeFi Llama supported protocol """ - protocols = self.get_response(DL_PROTOCOLS_URL) + protocols = self.get_response(self.api_urls["protocols"]) protocol_dict = {} for protocol in protocols: diff --git a/pycaw/messari/defillama/helpers.py b/pycaw/defillama/helpers.py similarity index 52% rename from pycaw/messari/defillama/helpers.py rename to pycaw/defillama/helpers.py index d2208bf..0a70fc5 100644 --- a/pycaw/messari/defillama/helpers.py +++ b/pycaw/defillama/helpers.py @@ -1,4 +1,8 @@ -"""This module is dedicated to helpers for the DeFiLlama class""" +"""A module with helper functions for the DeFiLlama class + +Methods: + format_df: Replaces dates and drops duplicates. +""" import pandas as pd @@ -7,27 +11,23 @@ def format_df(df_in: pd.DataFrame) -> pd.DataFrame: """format a typical DF from DL, replace date & drop duplicates - Parameters - ---------- - df_in: pd.DataFrame - input DataFrame + Args: + df_in (pd.DataFrame): input DataFrame - Returns - ------- - DataFrame - formated pandas DataFrame + Returns: + (pd.DataFrame): formated pandas DataFrame """ # set date to index df_new = df_in - if 'date' in df_in.columns: - df_new.set_index('date', inplace=True) - df_new.index = pd.to_datetime(df_new.index, unit='s', origin='unix') + if "date" in df_in.columns: + df_new.set_index("date", inplace=True) + df_new.index = pd.to_datetime(df_new.index, unit="s", origin="unix") df_new.index = df_new.index.date # drop duplicates # NOTE: sometimes DeFi Llama has duplicate dates, choosing to just keep the last # NOTE: Data for duplicates is not the same # TODO: Investigate which data should be kept (currently assuming last is more recent - df_new = df_new[~df_new.index.duplicated(keep='last')] + df_new = df_new[~df_new.index.duplicated(keep="last")] return df_new diff --git a/pycaw/etherscan/etherscan_connector.py b/pycaw/etherscan/etherscan_connector.py index 4eb22f8..f0c514b 100644 --- a/pycaw/etherscan/etherscan_connector.py +++ b/pycaw/etherscan/etherscan_connector.py @@ -26,12 +26,6 @@ def _validate_timestamp_format(self, timestamp: Union[int, str, pd.Timestamp]): raise NotImplementedError() # TODO - """ - @tenacity.retry(stop=tenacity.stop_after_attempt(3), - wait=tenacity.wait_exponential(min=0.1, max=5, multiplier=2)) - @ratelimit.sleep_and_retry - @ratelimit.limits(calls=30, period=1) # period (float) is in seconds. - """ def run_query(self, query: str, rate_limit: bool = True) -> Dict[str, Any]: """Func is wrapped with some ultimate limiters to ensure this method is never callled too much. However, the batch-call function should also diff --git a/pycaw/tests/__init__.py b/pycaw/tests/__init__.py index 81b6e24..ae2e9d6 100644 --- a/pycaw/tests/__init__.py +++ b/pycaw/tests/__init__.py @@ -1,3 +1,4 @@ """Tests for the pycaw package.""" import dotenv + dotenv.load_dotenv() diff --git a/pycaw/tests/cmc_test.py b/pycaw/tests/cmc_test.py new file mode 100644 index 0000000..771bcc5 --- /dev/null +++ b/pycaw/tests/cmc_test.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python + +import os +import json +import pytest +from pycaw import cmc +from typing import Any, Dict, List, Union + + +class TestCoinMarketCapAPI: + @pytest.fixture + def cmc_api(self) -> cmc.CoinMarketCapAPI: + return cmc.CoinMarketCapAPI() + + def test_cmc_id_map(self, cmc_api: cmc.CoinMarketCapAPI): + symbols: List[str] = ["BTC", "ETH"] + cmc_id_maps: List[dict] = cmc_api.cmc_id_map(symbols=symbols) + assert isinstance(cmc_id_maps, list) + assert isinstance(cmc_id_maps[0], dict) + assert all([k in cmc_id_maps[0].keys() for k in ["id", "slug", "name"]]) + + @pytest.fixture + def cmc_id_maps(self) -> List[dict]: + return [ + { + "id": 1, + "name": "Bitcoin", + "symbol": "BTC", + "slug": "bitcoin", + "rank": 1, + "is_active": 1, + "first_historical_data": "2013-04-28T18:47:21.000Z", + "last_historical_data": "2021-11-19T00:59:02.000Z", + "platform": None, + }, + { + "id": 1027, + "name": "Ethereum", + "symbol": "ETH", + "slug": "ethereum", + "rank": 2, + "is_active": 1, + "first_historical_data": "2015-08-07T14:49:30.000Z", + "last_historical_data": "2021-11-19T00:59:02.000Z", + "platform": None, + }, + ] + + def test_save_cmc_id_maps( + self, cmc_api: cmc.CoinMarketCapAPI, cmc_id_maps: List[dict] + ): + """Tests whether the CMC ID Map query saves correctly.""" + + temp_filename: str = "temp-foo.json" + temp_save_path = temp_filename + + assert not os.path.exists(temp_save_path) + cmc_api._save_cmc_id_maps(cmc_id_maps=cmc_id_maps, filename=temp_filename) + with open(temp_save_path, mode="r") as f: + saved_cmc_id_maps: List[dict] = json.load(f) + assert isinstance(saved_cmc_id_maps, list) + assert len(saved_cmc_id_maps) == 2 + assert all( + [ + [k in dict_.keys() for k in ["id", "name", "symbol"]] + for dict_ in saved_cmc_id_maps + ] + ) + + os.remove(temp_save_path) + assert not os.path.exists(temp_save_path) diff --git a/pycaw/tests/defillama_test.py b/pycaw/tests/defillama_test.py new file mode 100644 index 0000000..2b8f163 --- /dev/null +++ b/pycaw/tests/defillama_test.py @@ -0,0 +1,49 @@ +import pandas as pd +from pycaw import defillama as dl + + +class TestDeFiLlama(): + """Test suite for the 'defillama.DeFiLlama' class.""" + + def test_init(self): + """Test initializing DeFiLlama class""" + dl_conn = dl.DeFiLlama() + assert isinstance(dl_conn, dl.DeFiLlama) + + def test_get_protocol_tvl(self): + """Test getting protocol tvl""" + dl_conn = dl.DeFiLlama() + tvl = dl_conn.get_protocol_tvl_timeseries( + ["aave", "compound"], start_date="2021-10-01", end_date="2021-10-10" + ) + assert isinstance(tvl, pd.DataFrame) + + def test_global_tvl(self): + """Test getting global tvl""" + dl_conn = dl.DeFiLlama() + global_tvl = dl_conn.get_global_tvl_timeseries( + start_date="2021-10-01", end_date="2021-10-10" + ) + assert isinstance(global_tvl, pd.DataFrame) + + def test_chain_tvl(self): + """Test getting chain tvl""" + dl_conn = dl.DeFiLlama() + chains = ["Avalanche", "Harmony", "Polygon"] + chain_tvl = dl_conn.get_chain_tvl_timeseries( + chains, start_date="2021-10-01", end_date="2021-10-10" + ) + assert isinstance(chain_tvl, pd.DataFrame) + + def test_current_tvl(self): + """Test getting current protocol tvl""" + dl_conn = dl.DeFiLlama() + protocols = ["uniswap", "curve", "aave"] + current_tvl = dl_conn.get_current_tvl(protocols) + assert isinstance(current_tvl, pd.DataFrame) + + def test_get_protocols(self): + """Test getting protocol info""" + dl_conn = dl.DeFiLlama() + protocols = dl_conn.get_protocols() + assert isinstance(protocols, pd.DataFrame) \ No newline at end of file diff --git a/pycaw/tests/test_eth.py b/pycaw/tests/eth_test.py similarity index 56% rename from pycaw/tests/test_eth.py rename to pycaw/tests/eth_test.py index 18d7d7d..5e3b724 100644 --- a/pycaw/tests/test_eth.py +++ b/pycaw/tests/eth_test.py @@ -5,17 +5,18 @@ from typing import Any, Dict, List, Optional, Union + class TestGasInfo: - def test_init(self): - tx_hash: str = "0xe61b59cbe8381dff1c3152545f63c7bbdf03f27411a183c12bd7f90b68daf27a" - raw_tx_receipt: dict = { - "cumulativeGasUsed": "0xd7e11b", - "gasUsed": "0x2043b"} + tx_hash: str = ( + "0xe61b59cbe8381dff1c3152545f63c7bbdf03f27411a183c12bd7f90b68daf27a" + ) + raw_tx_receipt: dict = {"cumulativeGasUsed": "0xd7e11b", "gasUsed": "0x2043b"} gas_info = eth.GasInfo( - gas_used_wei=int(raw_tx_receipt["gasUsed"], base=16), - gas_price_wei=int(raw_tx_receipt["cumulativeGasUsed"], base=16), - tx_hash=tx_hash) + gas_used_wei=int(raw_tx_receipt["gasUsed"], base=16), + gas_price_wei=int(raw_tx_receipt["cumulativeGasUsed"], base=16), + tx_hash=tx_hash, + ) assert gas_info assert gas_info.eth_price_usd is None - assert gas_info.timestamp is None \ No newline at end of file + assert gas_info.timestamp is None diff --git a/pycaw/tests/test_etherscan.py b/pycaw/tests/etherscan_test.py similarity index 74% rename from pycaw/tests/test_etherscan.py rename to pycaw/tests/etherscan_test.py index 9bb9515..8090f5e 100644 --- a/pycaw/tests/test_etherscan.py +++ b/pycaw/tests/etherscan_test.py @@ -13,13 +13,14 @@ class TestEtherscanConnector: - @pytest.fixture def connector(self) -> etherscan.EtherscanConnector: return etherscan.EtherscanConnector() def test_get_tx_receipt(self, connector: etherscan.EtherscanConnector): - tx_hash: str = "0x20f98d428f3452a858ddb0972628991f50c529fbc5883111d1db1e6ba2eb4121" + tx_hash: str = ( + "0x20f98d428f3452a858ddb0972628991f50c529fbc5883111d1db1e6ba2eb4121" + ) tx_receipt: Dict[str, Any] = connector.get_tx_receipt(tx_hash=tx_hash) assert tx_receipt @@ -29,65 +30,61 @@ def test_get_tx_receipt(self, connector: etherscan.EtherscanConnector): assert "please use API Key" in tx_receipt else: assert False - + def test_get_tx_gas_info(self, connector: etherscan.EtherscanConnector): - tx_hash: str = "0xe61b59cbe8381dff1c3152545f63c7bbdf03f27411a183c12bd7f90b68daf27a" + tx_hash: str = ( + "0xe61b59cbe8381dff1c3152545f63c7bbdf03f27411a183c12bd7f90b68daf27a" + ) gas_info: eth.GasInfo = connector.get_tx_gas_info(tx_hash=tx_hash) assert isinstance(gas_info, eth.GasInfo) - assert gas_info.gas_price_wei > 0 + assert gas_info.gas_price_wei > 0 if gas_info.eth_price_usd: - assert gas_info.eth_price_usd > 0 + assert gas_info.eth_price_usd > 0 else: assert gas_info.eth_price_usd is None assert gas_info.tx_hash class TestTokenInfoConnector: - @pytest.fixture def connector(self) -> etherscan.TokenInfoConnector: return etherscan.TokenInfoConnector() def test_single_query(self, connector: etherscan.TokenInfoConnector): token_id = "0x0e3a2a1f2146d86a604adc220b4967a898d7fe07" - token_info_map: TokenInfoMap = connector.get_token_info( - token_ids=token_id) + token_info_map: TokenInfoMap = connector.get_token_info(token_ids=token_id) assert isinstance(token_info_map, dict) assert all([isinstance(token_id, TokenID) for token_id in token_info_map]) - + @pytest.fixture - def token_info_map(self, connector: etherscan.TokenInfoConnector - ) -> TokenInfoMap: + def token_info_map(self, connector: etherscan.TokenInfoConnector) -> TokenInfoMap: token_id_wbtc: str = "0x2260fac5e5542a773aa44fbcfedf7c193bc2c599" token_id_card: str = "0x0e3a2a1f2146d86a604adc220b4967a898d7fe07" token_ids: List[str] = [token_id_wbtc, token_id_card] - token_info_map: TokenInfoMap = connector.get_token_info( - token_ids=token_ids) + token_info_map: TokenInfoMap = connector.get_token_info(token_ids=token_ids) assert isinstance(token_info_map, dict) assert all([isinstance(token_id, TokenID) for token_id in token_info_map]) return token_info_map - def test_save_token_info_json(self, - connector: etherscan.TokenInfoConnector, - token_info_map: TokenInfoMap, - save_dir: Optional[str] = None): - """ l. """ + def test_save_token_info_json( + self, + connector: etherscan.TokenInfoConnector, + token_info_map: TokenInfoMap, + save_dir: Optional[str] = None, + ): + """l.""" save_dir: str = "_test_temp" if not os.path.exists(save_dir): os.mkdir(save_dir) - connector.save_token_info_json(token_info_map=token_info_map, - save_dir=save_dir) + connector.save_token_info_json(token_info_map=token_info_map, save_dir=save_dir) save_path: str = os.path.join(save_dir, "token_info.json") - with open(save_path, mode='r') as f: + with open(save_path, mode="r") as f: map_from_file: TokenInfoMap = json.load(f) - + for token_id in token_info_map: - assert token_id in map_from_file, ( - f"Failed to save token id: {token_id}") + assert token_id in map_from_file, f"Failed to save token id: {token_id}" os.remove(path=save_path) - - diff --git a/pycaw/tests/test_ftmscan.py b/pycaw/tests/ftmscan_test.py similarity index 88% rename from pycaw/tests/test_ftmscan.py rename to pycaw/tests/ftmscan_test.py index a5f0b14..6d6c38f 100644 --- a/pycaw/tests/test_ftmscan.py +++ b/pycaw/tests/ftmscan_test.py @@ -2,8 +2,8 @@ from typing import List import pytest -class TestFTMScanConnector: +class TestFTMScanConnector: @pytest.fixture def ftmscan_api(self) -> ftmscan.FTMScanConnector: return ftmscan.FTMScanConnector() @@ -12,12 +12,13 @@ def test_connector(self, ftmscan_api: ftmscan.FTMScanConnector): address = "0x33e0e07ca86c869ade3fc9de9126f6c73dad105e" balance: float = ftmscan_api.account_balance_single_address(address=address) assert isinstance(balance, float) - assert balance >= 0 + assert balance >= 0 def test_tx_list(self, ftmscan_api: ftmscan.FTMScanConnector): address = "0xba821dc848803900C01BA7Ac1D7a034B95B1eD97" tx_receipts: List[ftmscan.TxReceipt] = ftmscan_api.tx_receipt_list( - address=address) + address=address + ) assert isinstance(tx_receipts, (list)) if not len(tx_receipts) > 0: @@ -25,4 +26,4 @@ def test_tx_list(self, ftmscan_api: ftmscan.FTMScanConnector): tx_receipt = tx_receipts[0] assert isinstance(tx_receipt, dict) - assert "gasPrice" in tx_receipt.keys() \ No newline at end of file + assert "gasPrice" in tx_receipt.keys() diff --git a/pycaw/tests/messari_test.py b/pycaw/tests/messari_test.py new file mode 100644 index 0000000..3da4fad --- /dev/null +++ b/pycaw/tests/messari_test.py @@ -0,0 +1,115 @@ +"""Tests for 'pycaw.messari'.""" +import os +import sys +import time +import pytest + +from pycaw import messari + +import pandas as pd +from typing import Dict, List + +API_KEY = os.getenv("MESSARI_API_KEY") +if API_KEY is None: + print("Please define MESSARI_API_KEY in your runtime enviornment") + sys.exit() + + +class TestMessari: + """This is a unit testing class for testing the Messari class""" + + @pytest.fixture + def messari_connector(self) -> messari.Messari: + """Test initialization of the Messari class""" + messari_connector = messari.Messari(API_KEY) + assert isinstance(messari_connector, messari.Messari) + return messari_connector + + def test_get_all_assets(self, messari_connector: messari.Messari): + """Test 'Messari.get_all_assets'.""" + + response_data = messari_connector.get_all_assets() + assert isinstance(response_data, Dict) + + response_data_df = messari_connector.get_all_assets( + asset_fields=["metrics"], to_dataframe=True + ) + assert isinstance(response_data_df, pd.DataFrame) + + metric = "mining_stats" + response_data_df_market_data = messari_connector.get_all_assets( + asset_metric=metric, to_dataframe=True + ) + assert isinstance(response_data_df_market_data, pd.DataFrame) + + def test_get_asset(self, messari_connector: messari.Messari): + """Test get asset""" + + assets: List[str] = ["bitcoin", "ethereum", "tether"] + + asset_metadata = messari_connector.get_asset(asset_slugs=assets) + assert all( + [col in asset_metadata.columns for col in ["name", "symbol", "slug", "id"]] + ) + + fields = ["id", "name"] + asset_metadata_filtered = messari_connector.get_asset( + asset_slugs=assets, asset_fields=fields + ) + assert all([col in asset_metadata_filtered.columns for col in ["name", "id"]]) + + def test_get_asset_profile(self, messari_connector: messari.Messari): + """Test 'Messari.get_asset_profile'.""" + + assets = ["bitcoin", "ethereum", "tether"] + + asset_profile_data = messari_connector.get_asset_profile(asset_slugs=assets) + assert isinstance(asset_profile_data, Dict) + details = asset_profile_data["bitcoin"][ + "profile_general_overview_project_details" + ] + assert isinstance(details, str) + asset = "Uniswap" + profile_metric = "investors" + governance_data = messari_connector.get_asset_profile( + asset_slugs=asset, asset_profile_metric=profile_metric + ) + assert isinstance(governance_data, Dict) + assert asset in governance_data + expected_fields: List[str] = [ + "id", + "slug", + "profile_investors_individuals", + "profile_investors_organizations", + ] + assert all([field in governance_data for field in expected_fields]) + + def test_get_asset_metric(self, messari_connector: messari.Messari): + """Test get asset metric""" + assets = ["bitcoin", "ethereum", "tether"] + asset_metric_df = messari_connector.get_asset_metrics(asset_slugs=assets) + assert isinstance(asset_metric_df, pd.DataFrame) + expected_fields: List[str] = ["id", "symbol", "market_data_price_usd"] + assert all([col in asset_metric_df.columns for col in expected_fields]) + + def test_get_asset_market_data(self, messari_connector: messari.Messari): + """Test get asset market date""" + assets = ["bitcoin", "ethereum", "tether"] + market_data = messari_connector.get_asset_market_data(asset_slugs=assets) + assert isinstance(market_data, pd.DataFrame) + + def test_get_all_markets(self, messari_connector: messari.Messari): + """Test get all markets""" + markets_df = messari_connector.get_all_markets() + assert isinstance(markets_df, pd.DataFrame) + + def test_get_metric_timeseries(self, messari_connector: messari.Messari): + """Test get metic timeseries""" + metric = "price" + start = "2020-06-01" + end = "2021-01-01" + assets = ["bitcoin", "ethereum", "tether"] + timeseries_df = messari_connector.get_metric_timeseries( + asset_slugs=assets, asset_metric=metric, start=start, end=end + ) + assert isinstance(timeseries_df, pd.DataFrame) diff --git a/pycaw/tests/test_cmc.py b/pycaw/tests/test_cmc.py deleted file mode 100644 index ab5b29b..0000000 --- a/pycaw/tests/test_cmc.py +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/env python - -import os -import json -import pytest -from pycaw import cmc -from typing import Any, Dict, List, Union - - -class TestCoinMarketCapAPI: - - @pytest.fixture - def cmc_api(self) -> cmc.CoinMarketCapAPI: - return cmc.CoinMarketCapAPI() - - def test_cmc_id_map(self, cmc_api: cmc.CoinMarketCapAPI): - symbols: List[str] = ["BTC", "ETH"] - cmc_id_maps: List[dict] = cmc_api.cmc_id_map(symbols=symbols) - assert isinstance(cmc_id_maps, list) - assert isinstance(cmc_id_maps[0], dict) - assert all([k in cmc_id_maps[0].keys() for k in ['id', 'slug', 'name']]) - - @pytest.fixture - def cmc_id_maps(self) -> List[dict]: - return [ - {'id': 1, 'name': 'Bitcoin', 'symbol': 'BTC', 'slug': 'bitcoin', - 'rank': 1, 'is_active': 1, - 'first_historical_data': '2013-04-28T18:47:21.000Z', - 'last_historical_data': '2021-11-19T00:59:02.000Z', - 'platform': None}, - {'id': 1027, 'name': 'Ethereum', 'symbol': 'ETH', - 'slug': 'ethereum', 'rank': 2, 'is_active': 1, - 'first_historical_data': '2015-08-07T14:49:30.000Z', - 'last_historical_data': '2021-11-19T00:59:02.000Z', - 'platform': None}] - - def test_save_cmc_id_maps(self, - cmc_api: cmc.CoinMarketCapAPI, - cmc_id_maps: List[dict]): - """Tests whether the CMC ID Map query saves correctly.""" - - temp_filename: str = "temp-foo.json" - temp_save_path = temp_filename - - assert not os.path.exists(temp_save_path) - cmc_api._save_cmc_id_maps(cmc_id_maps=cmc_id_maps, - filename=temp_filename) - with open(temp_save_path, mode="r") as f: - saved_cmc_id_maps: List[dict] = json.load(f) - assert isinstance(saved_cmc_id_maps, list) - assert len(saved_cmc_id_maps) == 2 - assert all([[k in dict_.keys() for k in ["id", "name", "symbol"]] - for dict_ in saved_cmc_id_maps]) - - os.remove(temp_save_path) - assert not os.path.exists(temp_save_path) diff --git a/pycaw/tests/test_messari.py b/pycaw/tests/test_messari.py deleted file mode 100644 index 4173c89..0000000 --- a/pycaw/tests/test_messari.py +++ /dev/null @@ -1,20 +0,0 @@ -import pycaw -import pandas as pd -from typing import List - -#%% Price data - -from pycaw.messari import messari_api - -def prices_test_query(): - assets = ['btc', 'eth'] - metric = "price" - start = "2020-06-01" - end = "2020-07-01" - prices_df = messari_api.get_metric_timeseries( - asset_slugs=assets, asset_metric=metric, start=start, end=end) - breakpoint() - -# prices_test_query() # It works. Now, we just need parameters. - -