diff --git a/raiden/api/python.py b/raiden/api/python.py index bd9715cda5..a11552e980 100644 --- a/raiden/api/python.py +++ b/raiden/api/python.py @@ -268,11 +268,13 @@ def channel_deposit( # Wait until the `ChannelNewBalance` event is processed. with gevent.Timeout(poll_timeout, EthNodeCommunicationError(msg)): - waiting.wait_for_newbalance( + target_address = self.raiden.address + waiting.wait_for_participant_newbalance( self.raiden, registry_address, token_address, partner_address, + target_address, target_balance, self.raiden.alarm.wait_time, ) diff --git a/raiden/network/blockchain_service.py b/raiden/network/blockchain_service.py index 9bab67f3fb..cac1ed7c84 100644 --- a/raiden/network/blockchain_service.py +++ b/raiden/network/blockchain_service.py @@ -49,7 +49,7 @@ def block_number(self) -> int: return self.client.block_number() def is_synced(self) -> bool: - result = self.client.call('eth_syncing') + result = self.client.rpccall_with_retry('eth_syncing') # the node is synchronized if result is False: @@ -87,7 +87,7 @@ def estimate_blocktime(self, oldest: int = 256) -> float: def get_block_header(self, block_number: int): block_number = block_tag_encoder(block_number) - return self.client.call('eth_getBlockByNumber', block_number, False) + return self.client.rpccall_with_retry('eth_getBlockByNumber', block_number, False) def next_block(self) -> int: target_block_number = self.block_number() + 1 @@ -166,7 +166,7 @@ def registry(self, registry_address: bytes) -> Registry: return self.address_to_registry[registry_address] def uninstall_filter(self, filter_id_raw): - self.client.call('eth_uninstallFilter', filter_id_raw) + self.client.rpccall_with_retry('eth_uninstallFilter', filter_id_raw) def deploy_contract(self, contract_name, contract_path, constructor_parameters=None): contracts = _solidity.compile_file(contract_path, libraries=dict()) @@ -177,7 +177,6 @@ def deploy_contract(self, contract_name, contract_path, constructor_parameters=N ) proxy = self.client.deploy_solidity_contract( - self.node_address, contract_name, contracts, list(), diff --git a/raiden/network/rpc/client.py b/raiden/network/rpc/client.py index ec741800b1..f331806117 100644 --- a/raiden/network/rpc/client.py +++ b/raiden/network/rpc/client.py @@ -1,8 +1,8 @@ # -*- coding: utf-8 -*- import os import warnings -from time import time as now -from binascii import hexlify, unhexlify +import time +from binascii import unhexlify from typing import Optional, List, Dict, Union import rlp @@ -60,9 +60,14 @@ def check_address_has_code( """ Checks that the given address contains code. """ result = client.eth_getCode(address, 'latest') - if len(result) == 0: + if not result: + if contract_name: + formated_contract_name = '[{}]: '.format(contract_name) + else: + formated_contract_name = '' + raise AddressWithoutCode('{}Address {} does not contain code'.format( - '[{}]: '.format(contract_name) if contract_name else '', + formated_contract_name, address_encoder(address), )) @@ -122,7 +127,7 @@ def dependencies_order_of_build(target_contract, dependencies_map): return order -def format_data_for_call( +def format_data_for_rpccall( sender: Address = b'', to: Address = b'', value: int = 0, @@ -141,31 +146,6 @@ def format_data_for_call( } -def check_node_connection(func): - """ A decorator to reconnect if the connection to the node is lost. - Decorator should only wrap methods of the JSONRPCClient class""" - def retry_on_disconnect(self, *args, **kwargs): - for i, timeout in enumerate(timeout_two_stage(10, 3, 10)): - - if self.stop_event and self.stop_event.is_set(): - raise RaidenShuttingDown() - - try: - result = func(self, *args, **kwargs) - if i > 0: - log.info('Client reconnected') - return result - - except (requests.exceptions.ConnectionError, InvalidReplyError): - log.info( - 'Timeout in eth client connection to {}. Is the client offline? Trying ' - 'again in {}s.'.format(self.transport.endpoint, timeout) - ) - gevent.sleep(timeout) - - return retry_on_disconnect - - class JSONRPCClient: """ Ethereum JSON RPC client. @@ -187,6 +167,9 @@ def __init__( nonce_update_interval: float = 5.0, nonce_offset: int = 0): + if privkey is None or len(privkey) != 32: + raise ValueError('Invalid private key') + endpoint = 'http://{}:{}'.format(host, port) self.session = requests.Session() adapter = requests.adapters.HTTPAdapter(pool_maxsize=50) @@ -207,7 +190,7 @@ def __init__( self.stop_event = None self.nonce_last_update = 0 - self.nonce_current_value = None + self.nonce_available_value = None self.nonce_lock = Semaphore() self.nonce_update_interval = nonce_update_interval self.nonce_offset = nonce_offset @@ -234,72 +217,62 @@ def __repr__(self): def block_number(self): """ Return the most recent block. """ - return quantity_decoder(self.call('eth_blockNumber')) + return quantity_decoder(self.rpccall_with_retry('eth_blockNumber')) - def nonce(self, address): - if len(address) == 40: - address = unhexlify(address) + def nonce_needs_update(self): + if self.nonce_available_value is None: + return True - with self.nonce_lock: - initialized = self.nonce_current_value is not None - query_time = now() + now = time.time() - if self.nonce_last_update > query_time: - # Python's 2.7 time is not monotonic and it's affected by clock - # resets, force an update. - self.nonce_update_interval = query_time - self.nonce_update_interval - needs_update = True + # Python's 2.7 time is not monotonic and it's affected by clock resets, + # force an update. + if self.nonce_last_update > now: + return True - else: - last_update_interval = query_time - self.nonce_last_update - needs_update = last_update_interval > self.nonce_update_interval + return now - self.nonce_last_update > self.nonce_update_interval - if initialized and not needs_update: - self.nonce_current_value += 1 - return self.nonce_current_value + def nonce_update_from_node(self): + nonce = -2 + nonce_available_value = self.nonce_available_value or -1 - pending_transactions_hex = self.call( + # Wait until all tx are registered as pending + while nonce < nonce_available_value: + pending_transactions_hex = self.rpccall_with_retry( 'eth_getTransactionCount', - address_encoder(address), + address_encoder(self.sender), 'pending', ) pending_transactions = quantity_decoder(pending_transactions_hex) nonce = pending_transactions + self.nonce_offset - # we may have hammered the server and not all tx are - # registered as `pending` yet - if initialized: - while nonce < self.nonce_current_value: - log.debug( - 'nonce on server too low; retrying', - server=nonce, - local=self.nonce_current_value, - ) + log.debug( + 'updated nonce from server', + server=nonce, + local=nonce_available_value, + ) - query_time = now() - pending_transactions_hex = self.call( - 'eth_getTransactionCount', - address_encoder(address), - 'pending', - ) - pending_transactions = quantity_decoder(pending_transactions_hex) - nonce = pending_transactions + self.nonce_offset + self.nonce_last_update = time.time() + self.nonce_available_value = nonce - self.nonce_current_value = nonce - self.nonce_last_update = query_time + def nonce(self): + with self.nonce_lock: + if self.nonce_needs_update(): + self.nonce_update_from_node() - return self.nonce_current_value + self.nonce_available_value += 1 + return self.nonce_available_value - 1 def inject_stop_event(self, event): self.stop_event = event def balance(self, account: Address): """ Return the balance of the account of given address. """ - res = self.call('eth_getBalance', address_encoder(account), 'pending') + res = self.rpccall_with_retry('eth_getBalance', address_encoder(account), 'pending') return quantity_decoder(res) def _gaslimit(self, location='pending') -> int: - last_block = self.call('eth_getBlockByNumber', location, True) + last_block = self.rpccall_with_retry('eth_getBlockByNumber', location, True) gas_limit = quantity_decoder(last_block['gasLimit']) return gas_limit * 8 // 10 @@ -307,7 +280,7 @@ def _gasprice(self) -> int: if self.given_gas_price: return self.given_gas_price - gas_price = self.call('eth_gasPrice') + gas_price = self.rpccall_with_retry('eth_gasPrice') return quantity_decoder(gas_price) def check_startgas(self, startgas): @@ -333,7 +306,6 @@ def new_contract_proxy(self, contract_interface, contract_address: Address): def deploy_solidity_contract( self, # pylint: disable=too-many-locals - sender, contract_name, all_contracts, libraries, @@ -401,7 +373,6 @@ def deploy_solidity_contract( dependency_contract['bin'] = bytecode transaction_hash_hex = self.send_transaction( - sender, to=b'', data=bytecode, ) @@ -418,7 +389,7 @@ def deploy_solidity_contract( deployed_code = self.eth_getCode(address_decoder(contract_address)) - if len(deployed_code) == 0: + if not deployed_code: raise RuntimeError('Contract address has no code, check gas usage.') hex_bytecode = solidity_resolve_symbols(contract['bin_hex'], libraries) @@ -435,7 +406,6 @@ def deploy_solidity_contract( bytecode = contract['bin'] transaction_hash_hex = self.send_transaction( - sender, to=b'', data=bytecode, ) @@ -447,7 +417,7 @@ def deploy_solidity_contract( deployed_code = self.eth_getCode(address_decoder(contract_address)) - if len(deployed_code) == 0: + if not deployed_code: raise RuntimeError( 'Deployment of {} failed. Contract address has no code, check gas usage.'.format( contract_name, @@ -479,11 +449,11 @@ def new_filter(self, fromBlock=None, toBlock=None, address=None, topics=None): json_data['topics'] = [topic_encoder(topic) for topic in topics] - filter_id = self.call('eth_newFilter', json_data) + filter_id = self.rpccall_with_retry('eth_newFilter', json_data) return quantity_decoder(filter_id) def filter_changes(self, fid: int) -> List: - changes = self.call('eth_getFilterChanges', quantity_encoder(fid)) + changes = self.rpccall_with_retry('eth_getFilterChanges', quantity_encoder(fid)) if not changes: return list() @@ -504,12 +474,11 @@ def filter_changes(self, fid: int) -> List: for c in changes ] - @check_node_connection - def call(self, method: str, *args): + def rpccall_with_retry(self, method: str, *args): """ Do the request and return the result. Args: - method: The RPC method. + method: The JSON-RPC method. args: The encoded arguments expected by the method. - Object arguments must be supplied as a dictionary. - Quantity arguments must be hex encoded starting with '0x' and @@ -517,24 +486,47 @@ def call(self, method: str, *args): - Data arguments must be hex encoded starting with '0x' """ request = self.protocol.create_request(method, args) - reply = self.transport.send_message(request.serialize().encode()) + request_serialized = request.serialize().encode() - jsonrpc_reply = self.protocol.parse_reply(reply) - if isinstance(jsonrpc_reply, JSONRPCSuccessResponse): - return jsonrpc_reply.result - elif isinstance(jsonrpc_reply, JSONRPCErrorResponse): - raise EthNodeCommunicationError(jsonrpc_reply.error, jsonrpc_reply._jsonrpc_error_code) - else: - raise EthNodeCommunicationError('Unknown type of JSONRPC reply') + for i, timeout in enumerate(timeout_two_stage(10, 3, 10)): + if self.stop_event and self.stop_event.is_set(): + raise RaidenShuttingDown() + + try: + reply = self.transport.send_message(request_serialized) + except (requests.exceptions.ConnectionError, InvalidReplyError): + log.info( + 'Timeout in eth client connection to {}. Is the client offline? Trying ' + 'again in {}s.'.format(self.transport.endpoint, timeout) + ) + else: + if self.stop_event and self.stop_event.is_set(): + raise RaidenShuttingDown() + + if i > 0: + log.info('Client reconnected') + + jsonrpc_reply = self.protocol.parse_reply(reply) + + if isinstance(jsonrpc_reply, JSONRPCSuccessResponse): + return jsonrpc_reply.result + elif isinstance(jsonrpc_reply, JSONRPCErrorResponse): + raise EthNodeCommunicationError( + jsonrpc_reply.error, + jsonrpc_reply._jsonrpc_error_code, # pylint: disable=protected-access + ) + else: + raise EthNodeCommunicationError('Unknown type of JSONRPC reply') + + gevent.sleep(timeout) def send_transaction( self, - sender: Address, to: Address, value: int = 0, data: bytes = b'', startgas: int = None, - nonce: int = None): + ): """ Helper to send signed messages. This method will use the `privkey` provided in the constructor to @@ -542,74 +534,6 @@ def send_transaction( implementation that accepts the variables v, r, and s. """ - if not self.privkey and not sender: - raise ValueError('Either privkey or sender needs to be supplied.') - - if self.privkey: - privkey_address = privatekey_to_address(self.privkey) - sender = sender or privkey_address - - if sender != privkey_address: - raise ValueError('sender for a different privkey.') - - if nonce is None: - nonce = self.nonce(sender) - else: - if nonce is None: - nonce = 0 - - startgas = self.check_startgas(startgas) - - tx = Transaction(nonce, self.gasprice(), startgas, to=to, value=value, data=data) - - if self.privkey: - tx.sign(self.privkey) - result = self.call( - 'eth_sendRawTransaction', - data_encoder(rlp.encode(tx)), - ) - return result[2 if result.startswith('0x') else 0:] - - else: - - # rename the fields to match the eth_sendTransaction signature - tx_dict = tx.to_dict() - tx_dict.pop('hash') - tx_dict['sender'] = sender - tx_dict['gasPrice'] = tx_dict.pop('gasprice') - tx_dict['gas'] = tx_dict.pop('startgas') - - res = self.eth_sendTransaction(**tx_dict) - - assert len(res) in (20, 32) - return hexlify(res) - - def eth_sendTransaction( - self, - sender: Address = b'', - to: Address = b'', - value: int = 0, - data: bytes = b'', - gas: int = GAS_LIMIT, - nonce: int = None - ) -> bytes: - """ Creates new message call transaction or a contract creation, if the - data field contains code. - - Args: - sender: The address the transaction is sent from. - to: The address the transaction is directed to. - (optional when creating new contract) - gas: Gas provided for the transaction execution. It will - return unused gas. - gasPrice: gasPrice used for each unit of gas paid. - value: Value sent with this transaction. - data: The compiled code of a contract OR the hash of the - invoked method signature and encoded parameters. - nonce: This allows to overwrite your own pending transactions - that use the same nonce. - """ - if to == b'' and data.isalnum(): warnings.warn( 'Verify that the data parameter is _not_ hex encoded, if this is the case ' @@ -620,24 +544,24 @@ def eth_sendTransaction( if to == b'0' * 40: warnings.warn('For contract creation the empty string must be used.') - if sender is None: - raise ValueError('sender needs to be provided.') + nonce = self.nonce() + startgas = self.check_startgas(startgas) - json_data = format_data_for_call( - sender, - to, - value, - data, - gas, - self.gasprice() + tx = Transaction( + nonce, + self.gasprice(), + startgas, + to=to, + value=value, + data=data, ) - if nonce is not None: - json_data['nonce'] = quantity_encoder(nonce) - - res = self.call('eth_sendTransaction', json_data) - - return data_decoder(res) + tx.sign(self.privkey) + result = self.rpccall_with_retry( + 'eth_sendRawTransaction', + data_encoder(rlp.encode(tx)), + ) + return result[2 if result.startswith('0x') else 0:] def eth_call( self, @@ -665,7 +589,7 @@ def eth_call( call. """ startgas = self.check_startgas(startgas) - json_data = format_data_for_call( + json_data = format_data_for_rpccall( sender, to, value, @@ -673,7 +597,7 @@ def eth_call( startgas, self.gasprice(), ) - res = self.call('eth_call', json_data, block_number) + res = self.rpccall_with_retry('eth_call', json_data, block_number) return data_decoder(res) @@ -702,7 +626,7 @@ def eth_estimateGas( call. """ startgas = self.check_startgas(startgas) - json_data = format_data_for_call( + json_data = format_data_for_rpccall( sender, to, value, @@ -710,7 +634,7 @@ def eth_estimateGas( startgas ) try: - res = self.call('eth_estimateGas', json_data) + res = self.rpccall_with_retry('eth_estimateGas', json_data) except EthNodeCommunicationError as e: tx_would_fail = e.error_code and e.error_code in (-32015, -32000) if tx_would_fail: # -32015 is parity and -32000 is geth @@ -742,7 +666,7 @@ def eth_getTransactionReceipt(self, transaction_hash: bytes) -> Dict: ) transaction_hash = data_encoder(transaction_hash) - return self.call('eth_getTransactionReceipt', transaction_hash) + return self.rpccall_with_retry('eth_getTransactionReceipt', transaction_hash) def eth_getCode(self, code_address: Address, block: Union[int, str] = 'latest') -> bytes: """ Returns code at a given address. @@ -763,7 +687,7 @@ def eth_getCode(self, code_address: Address, block: Union[int, str] = 'latest') 'address length must be 20 (it might be hex encoded)' ) - result = self.call('eth_getCode', address_encoder(code_address), block) + result = self.rpccall_with_retry('eth_getCode', address_encoder(code_address), block) return data_decoder(result) def eth_getTransactionByHash(self, transaction_hash: bytes): @@ -783,7 +707,7 @@ def eth_getTransactionByHash(self, transaction_hash: bytes): ) transaction_hash = data_encoder(transaction_hash) - return self.call('eth_getTransactionByHash', transaction_hash) + return self.rpccall_with_retry('eth_getTransactionByHash', transaction_hash) def poll( self, @@ -830,7 +754,10 @@ def poll( while True: # Could return None for a short period of time, until the # transaction is added to the pool - transaction = self.call('eth_getTransactionByHash', transaction_hash) + transaction = self.rpccall_with_retry( + 'eth_getTransactionByHash', + transaction_hash, + ) # if the transaction was added to the pool and then removed if transaction is None and last_result is not None: diff --git a/raiden/network/rpc/filters.py b/raiden/network/rpc/filters.py index e3f749f1f4..601ca12457 100644 --- a/raiden/network/rpc/filters.py +++ b/raiden/network/rpc/filters.py @@ -39,7 +39,7 @@ def new_filter( for topic in topics ] - return jsonrpc_client.call('eth_newFilter', json_data) + return jsonrpc_client.rpccall_with_retry('eth_newFilter', json_data) def get_filter_events( @@ -70,7 +70,7 @@ def get_filter_events( for topic in topics ] - filter_changes = jsonrpc_client.call('eth_getLogs', json_data) + filter_changes = jsonrpc_client.rpccall_with_retry('eth_getLogs', json_data) # geth could return None if filter_changes is None: @@ -106,7 +106,7 @@ def __init__(self, jsonrpc_client: JSONRPCClient, filter_id_raw: int): self.client = jsonrpc_client def _query_filter(self, function: str) -> List[Dict]: - filter_changes = self.client.call(function, self.filter_id_raw) + filter_changes = self.client.rpccall_with_retry(function, self.filter_id_raw) # geth could return None if filter_changes is None: @@ -142,4 +142,4 @@ def getall(self) -> List[Dict]: return self._query_filter('eth_getFilterLogs') def uninstall(self): - self.client.call('eth_uninstallFilter', self.filter_id_raw) + self.client.rpccall_with_retry('eth_uninstallFilter', self.filter_id_raw) diff --git a/raiden/network/rpc/smartcontract_proxy.py b/raiden/network/rpc/smartcontract_proxy.py index e7cc1fb50b..88c5144cfa 100644 --- a/raiden/network/rpc/smartcontract_proxy.py +++ b/raiden/network/rpc/smartcontract_proxy.py @@ -48,7 +48,6 @@ def transact(self, function_name: str, *args, **kargs): data = self.translator.encode_function_call(function_name, args) txhash = self.transaction_function( - sender=self.sender, to=self.contract_address, value=kargs.pop('value', 0), data=data, diff --git a/raiden/network/rpc/transactions.py b/raiden/network/rpc/transactions.py index bc4d6a1ed5..b383981b64 100644 --- a/raiden/network/rpc/transactions.py +++ b/raiden/network/rpc/transactions.py @@ -11,7 +11,7 @@ def check_transaction_threw(client, transaction_hash): transaction's status indicator is 0x0. """ encoded_transaction = data_encoder(unhexlify(transaction_hash)) - receipt = client.call('eth_getTransactionReceipt', encoded_transaction) + receipt = client.rpccall_with_retry('eth_getTransactionReceipt', encoded_transaction) if 'status' not in receipt: raise ValueError( diff --git a/raiden/tests/api/test_restapi.py b/raiden/tests/api/test_restapi.py index f08d19ca50..3abef391d3 100644 --- a/raiden/tests/api/test_restapi.py +++ b/raiden/tests/api/test_restapi.py @@ -581,6 +581,7 @@ def test_api_transfers(api_backend, raiden_network, token_addresses): @pytest.mark.parametrize('number_of_tokens', [0]) @pytest.mark.parametrize('number_of_nodes', [1]) +@pytest.mark.parametrize('channels_per_node', [0]) def test_register_token(api_backend, token_amount, token_addresses, raiden_network): app0 = raiden_network[0] new_token_address = app0.raiden.chain.deploy_contract( diff --git a/raiden/tests/fixtures/blockchain.py b/raiden/tests/fixtures/blockchain.py index e7cc4d5cdb..518417e2fc 100644 --- a/raiden/tests/fixtures/blockchain.py +++ b/raiden/tests/fixtures/blockchain.py @@ -4,6 +4,7 @@ from os import path from collections import namedtuple +import gevent import pytest from ethereum import slogging from ethereum.tools._solidity import compile_file @@ -31,6 +32,7 @@ create_apps, create_network_channels, create_sequential_channels, + netting_channel_open_and_deposit, ) from raiden.settings import GAS_PRICE @@ -104,7 +106,7 @@ def cached_genesis(request): """ if not request.getfixturevalue('blockchain_cache'): - return + return None # this will create the tester _and_ deploy the Registry deploy_key = request.getfixturevalue('deploy_key') @@ -162,23 +164,43 @@ def cached_genesis(request): ) if 'raiden_network' in request.fixturenames: - create_network_channels( + app_channels = create_network_channels( raiden_apps, - token_contract_addresses, request.getfixturevalue('channels_per_node'), - request.getfixturevalue('deposit'), - request.getfixturevalue('settle_timeout'), ) + greenlets = [] + for token_address in token_contract_addresses: + for app_pair in app_channels: + greenlets.append(gevent.spawn( + netting_channel_open_and_deposit, + app_pair[0], + app_pair[1], + token_address, + request.getfixturevalue('deposit'), + request.getfixturevalue('settle_timeout'), + )) + gevent.wait(greenlets) + elif 'raiden_chain' in request.fixturenames: - create_sequential_channels( + app_channels = create_sequential_channels( raiden_apps, - token_contract_addresses[0], request.getfixturevalue('channels_per_node'), - request.getfixturevalue('deposit'), - request.getfixturevalue('settle_timeout'), ) + greenlets = [] + for token_address in token_contract_addresses: + for app_pair in app_channels: + greenlets.append(gevent.spawn( + netting_channel_open_and_deposit, + app_pair[0], + app_pair[1], + token_address, + request.getfixturevalue('deposit'), + request.getfixturevalue('settle_timeout'), + )) + gevent.wait(greenlets) + # else: a test that is not creating channels for app in raiden_apps: @@ -441,14 +463,11 @@ def _jsonrpc_services( # we cannot instantiate BlockChainService without a registry, so first # deploy it directly with a JSONRPCClient if registry_address is None: - address = privatekey_to_address(deploy_key) - registry_path = get_contract_path('Registry.sol') registry_contracts = compile_file(registry_path, libraries=dict()) log.info('Deploying registry contract') registry_proxy = deploy_client.deploy_solidity_contract( - address, 'Registry', registry_contracts, dict(), diff --git a/raiden/tests/fixtures/raiden_network.py b/raiden/tests/fixtures/raiden_network.py index f7de22ae2a..a588f59c0d 100644 --- a/raiden/tests/fixtures/raiden_network.py +++ b/raiden/tests/fixtures/raiden_network.py @@ -3,15 +3,15 @@ import pytest from ethereum import slogging +from raiden import waiting from raiden.exceptions import RaidenShuttingDown -from raiden.transfer import views -from raiden.network.protocol import NODE_NETWORK_REACHABLE from raiden.tests.utils.tests import cleanup_tasks from raiden.tests.utils.network import ( CHAIN, create_apps, create_network_channels, create_sequential_channels, + netting_channel_open_and_deposit, ) log = slogging.getLogger(__name__) # pylint: disable=invalid-name @@ -34,30 +34,83 @@ def _cleanup(): request.addfinalizer(_cleanup) -def wait_for_partners(app_list, sleep=0.5, timeout=10): - waiting = list(app_list) +def wait_for_usable_channel( + app0, + app1, + registry_address, + token_address, + our_deposit, + partner_deposit, + events_poll_timeout=0.5, +): + """ Wait until the channel from app0 to app1 is usable. - while waiting: - # Poll the events to register the new channels - app = waiting[0] - app.raiden.poll_blockchain_events() + The channel and the deposits are registered, and the partner network state + is reachable. + """ + waiting.wait_for_newchannel( + app0.raiden, + registry_address, + token_address, + app1.raiden.address, + events_poll_timeout, + ) - node_state = views.state_from_app(app) - network_statuses = views.get_networkstatuses(node_state) + waiting.wait_for_participant_newbalance( + app0.raiden, + registry_address, + token_address, + app1.raiden.address, + app0.raiden.address, + our_deposit, + events_poll_timeout, + ) - all_healthy = all( - status == NODE_NETWORK_REACHABLE - for status in network_statuses.values() - ) + waiting.wait_for_participant_newbalance( + app0.raiden, + registry_address, + token_address, + app1.raiden.address, + app1.raiden.address, + partner_deposit, + events_poll_timeout, + ) + + waiting.wait_for_healthy( + app0.raiden, + app1.raiden.address, + events_poll_timeout, + ) - if timeout <= 0: - raise RuntimeError('fixture setup failed, nodes are unreachable') - if all_healthy: - waiting.pop(0) - else: - timeout -= sleep - gevent.sleep(sleep) +def wait_for_channels( + app_channels, + registry_address, + token_addresses, + deposit, + events_poll_timeout=0.5, +): + """ Wait until all channels are usable from both directions. """ + for app0, app1 in app_channels: + for token_address in token_addresses: + wait_for_usable_channel( + app0, + app1, + registry_address, + token_address, + deposit, + deposit, + events_poll_timeout, + ) + wait_for_usable_channel( + app1, + app0, + registry_address, + token_address, + deposit, + deposit, + events_poll_timeout, + ) @pytest.fixture @@ -108,19 +161,37 @@ def raiden_chain( nat_keepalive_timeout, ) + for app in raiden_apps: + app.raiden.register_payment_network(app.raiden.default_registry.address) + + app_channels = create_sequential_channels( + raiden_apps, + channels_per_node, + ) + if not cached_genesis: - create_sequential_channels( - raiden_apps, - token_addresses[0], - channels_per_node, + greenlets = [] + for token_address in token_addresses: + for app_pair in app_channels: + greenlets.append(gevent.spawn( + netting_channel_open_and_deposit, + app_pair[0], + app_pair[1], + token_address, + deposit, + settle_timeout, + )) + gevent.wait(greenlets) + + exception = RuntimeError('fixture setup failed, nodes are unreachable') + with gevent.Timeout(seconds=30, exception=exception): + wait_for_channels( + app_channels, + blockchain_services.deploy_registry.address, + token_addresses, deposit, - settle_timeout, ) - for app in raiden_apps: - app.raiden.register_payment_network(app.raiden.default_registry.address) - - wait_for_partners(raiden_apps) _raiden_cleanup(request, raiden_apps) return raiden_apps @@ -166,22 +237,37 @@ def raiden_network( nat_keepalive_timeout, ) + app_channels = create_network_channels( + raiden_apps, + channels_per_node, + ) + if not cached_genesis: - create_network_channels( - raiden_apps, + greenlets = [] + for token_address in token_addresses: + for app_pair in app_channels: + greenlets.append(gevent.spawn( + netting_channel_open_and_deposit, + app_pair[0], + app_pair[1], + token_address, + deposit, + settle_timeout, + )) + gevent.wait(greenlets) + + exception = RuntimeError('fixture setup failed, nodes are unreachable') + with gevent.Timeout(seconds=30, exception=exception): + wait_for_channels( + app_channels, + blockchain_services.deploy_registry.address, token_addresses, - channels_per_node, deposit, - settle_timeout ) - wait_for_partners(raiden_apps) _raiden_cleanup(request, raiden_apps) - # The block_number is primed on the app creation, but after the app is - # created all the channels are deployed, for the tester implementation this - # will advance the block_number with synchronous execution, making the - # apps' block_number to greatly fall behind. + # Force blocknumber update for the tester backend if not cached_genesis: for app in raiden_apps: app.raiden.alarm.poll_for_new_block() diff --git a/raiden/tests/integration/rpc/test_assumptions.py b/raiden/tests/integration/rpc/test_assumptions.py index 7525a8aff4..6fb576981c 100644 --- a/raiden/tests/integration/rpc/test_assumptions.py +++ b/raiden/tests/integration/rpc/test_assumptions.py @@ -19,7 +19,6 @@ def deploy_rpc_test_contract(deploy_client): contracts = _solidity.compile_file(contract_path, libraries=dict()) contract_proxy = deploy_client.deploy_solidity_contract( - deploy_client.sender, 'RpcTest', contracts, libraries=dict(), diff --git a/raiden/tests/integration/test_blockchainservice.py b/raiden/tests/integration/test_blockchainservice.py index 51da440747..a9dbaa8212 100644 --- a/raiden/tests/integration/test_blockchainservice.py +++ b/raiden/tests/integration/test_blockchainservice.py @@ -226,7 +226,6 @@ def test_blockchain( humantoken_path = get_contract_path('HumanStandardToken.sol') humantoken_contracts = compile_file(humantoken_path, libraries=dict()) token_proxy = jsonrpc_client.deploy_solidity_contract( - address, 'HumanStandardToken', humantoken_contracts, list(), @@ -238,7 +237,6 @@ def test_blockchain( registry_path = get_contract_path('Registry.sol') registry_contracts = compile_file(registry_path) registry_proxy = jsonrpc_client.deploy_solidity_contract( - address, 'Registry', registry_contracts, list(), @@ -247,7 +245,7 @@ def test_blockchain( timeout=poll_timeout, ) - log_list = jsonrpc_client.call( + log_list = jsonrpc_client.rpccall_with_retry( 'eth_getLogs', { 'fromBlock': '0x0', @@ -266,7 +264,7 @@ def test_blockchain( assert len(registry_proxy.call('tokenAddresses')) == 1 - log_list = jsonrpc_client.call( + log_list = jsonrpc_client.rpccall_with_retry( 'eth_getLogs', { 'fromBlock': '0x0', @@ -308,7 +306,7 @@ def test_blockchain( ) jsonrpc_client.poll(unhexlify(transaction_hash), timeout=poll_timeout) - log_list = jsonrpc_client.call( + log_list = jsonrpc_client.rpccall_with_retry( 'eth_getLogs', { 'fromBlock': '0x0', diff --git a/raiden/tests/integration/test_pythonapi.py b/raiden/tests/integration/test_pythonapi.py index 770e726c92..0cb4fec4b4 100644 --- a/raiden/tests/integration/test_pythonapi.py +++ b/raiden/tests/integration/test_pythonapi.py @@ -2,7 +2,6 @@ import pytest import gevent -from raiden import waiting from raiden.api.python import RaidenAPI from raiden.exceptions import ( AlreadyRegisteredTokenAddress, @@ -186,28 +185,9 @@ def test_token_swap(raiden_network, deposit, token_addresses): @pytest.mark.parametrize('blockchain_cache', [False]) @pytest.mark.parametrize('channels_per_node', [1]) @pytest.mark.parametrize('number_of_nodes', [2]) -def test_api_channel_events(raiden_chain, token_addresses, deposit, events_poll_timeout): +def test_api_channel_events(raiden_chain, token_addresses): app0, app1 = raiden_chain token_address = token_addresses[0] - registry_address = app0.raiden.default_registry.address - - # Without blockchain_cache we need to wait for the events to be processed - waiting.wait_for_newbalance( - app0.raiden, - registry_address, - token_address, - app1.raiden.address, - deposit, - events_poll_timeout, - ) - waiting.wait_for_newbalance( - app1.raiden, - registry_address, - token_address, - app0.raiden.address, - deposit, - events_poll_timeout, - ) amount = 30 direct_transfer( diff --git a/raiden/tests/integration/test_regression.py b/raiden/tests/integration/test_regression.py index 8bb0b42c48..da9b26e668 100644 --- a/raiden/tests/integration/test_regression.py +++ b/raiden/tests/integration/test_regression.py @@ -13,9 +13,9 @@ ) from raiden.tests.fixtures.raiden_network import ( CHAIN, - wait_for_partners, + wait_for_channels, ) -from raiden.tests.utils.network import setup_channels +from raiden.tests.utils.network import netting_channel_open_and_deposit from raiden.tests.utils.transfer import get_channelstate from raiden.transfer.mediated_transfer.events import SendRevealSecret from raiden.transfer.state import EMPTY_MERKLE_ROOT @@ -27,7 +27,12 @@ @pytest.mark.parametrize('number_of_nodes', [5]) @pytest.mark.parametrize('channels_per_node', [0]) @pytest.mark.parametrize('settle_timeout', [32]) # default settlement is too low for 3 hops -def test_regression_unfiltered_routes(raiden_network, token_addresses, settle_timeout, deposit): +def test_regression_unfiltered_routes( + raiden_network, + token_addresses, + settle_timeout, + deposit, +): """ The transfer should proceed without triggering an assert. Transfers failed in networks where two or more paths to the destination are @@ -35,6 +40,7 @@ def test_regression_unfiltered_routes(raiden_network, token_addresses, settle_ti """ app0, app1, app2, app3, app4 = raiden_network token = token_addresses[0] + registry_address = app0.raiden.default_registry.address # Topology: # @@ -49,16 +55,25 @@ def test_regression_unfiltered_routes(raiden_network, token_addresses, settle_ti (app2, app4), ] - setup_channels( - token, + greenlets = [] + for first_app, second_app in app_channels: + greenlets.append(gevent.spawn( + netting_channel_open_and_deposit, + first_app, + second_app, + token, + deposit, + settle_timeout, + )) + gevent.wait(greenlets) + + wait_for_channels( app_channels, + registry_address, + [token], deposit, - settle_timeout, ) - # poll the channel manager events - wait_for_partners(raiden_network) - transfer = app0.raiden.mediated_transfer_async( token_address=token, amount=1, diff --git a/raiden/tests/utils/blockchain.py b/raiden/tests/utils/blockchain.py index bb732560e2..7dc81b9f05 100644 --- a/raiden/tests/utils/blockchain.py +++ b/raiden/tests/utils/blockchain.py @@ -189,7 +189,7 @@ def geth_wait_and_check(deploy_client, privatekeys, random_marker): tries = 5 while not jsonrpc_running and tries > 0: try: - block = deploy_client.call('eth_getBlockByNumber', '0x0', True) + block = deploy_client.rpccall_with_retry('eth_getBlockByNumber', '0x0', True) except ConnectionError: gevent.sleep(0.5) tries -= 1 @@ -211,7 +211,7 @@ def geth_wait_and_check(deploy_client, privatekeys, random_marker): tries = 10 balance = '0x0' while balance == '0x0' and tries > 0: - balance = deploy_client.call('eth_getBalance', address, 'latest') + balance = deploy_client.rpccall_with_retry('eth_getBalance', address, 'latest') gevent.sleep(1) tries -= 1 diff --git a/raiden/tests/utils/network.py b/raiden/tests/utils/network.py index cef6167ab4..12a5e316b5 100644 --- a/raiden/tests/utils/network.py +++ b/raiden/tests/utils/network.py @@ -14,10 +14,15 @@ CHAIN = object() # Flag used by create a network does make a loop with the channels -def check_channel(app1, app2, netting_channel_address, deposit_amount): +def check_channel(app1, app2, netting_channel_address, settle_timeout, deposit_amount): netcontract1 = app1.raiden.chain.netting_channel(netting_channel_address) netcontract2 = app2.raiden.chain.netting_channel(netting_channel_address) + # Check a valid settle timeout was used, the netting contract has an + # enforced minimum and maximum + assert settle_timeout == netcontract1.settle_timeout() + assert settle_timeout == netcontract2.settle_timeout() + if deposit_amount > 0: assert netcontract1.can_transfer() assert netcontract2.can_transfer() @@ -31,56 +36,48 @@ def check_channel(app1, app2, netting_channel_address, deposit_amount): assert app1_details['our_balance'] == app2_details['partner_balance'] assert app1_details['partner_balance'] == app2_details['our_balance'] + assert app1_details['our_balance'] == deposit_amount + assert app1_details['partner_balance'] == deposit_amount + assert app2_details['our_balance'] == deposit_amount + assert app2_details['partner_balance'] == deposit_amount -def setup_channels(token_address, app_pairs, deposit, settle_timeout): - for first, second in app_pairs: - assert token_address - - manager = first.raiden.default_registry.manager_by_token(token_address) - - netcontract_address = manager.new_netting_channel( - second.raiden.address, - settle_timeout, - ) - - assert netcontract_address - - # use each app's own chain because of the private key / local signing - for app in [first, second]: - token = app.raiden.chain.token(token_address) - netting_channel = app.raiden.chain.netting_channel(netcontract_address) - previous_balance = token.balance_of(app.raiden.address) - - assert previous_balance >= deposit - - token.approve(netcontract_address, deposit) - netting_channel.deposit(deposit) - new_balance = token.balance_of(app.raiden.address) +def netting_channel_open_and_deposit(app0, app1, token_address, deposit, settle_timeout): + """ Open a new channel with app0 and app1 as participants """ + assert token_address - assert previous_balance - deposit == new_balance - - # netting contract does allow settle time lower than 30 - contract_settle_timeout = netting_channel.settle_timeout() - assert contract_settle_timeout == max(6, settle_timeout) - - check_channel( - first, - second, - netcontract_address, - deposit, - ) - - first_netting_channel = first.raiden.chain.netting_channel(netcontract_address) - second_netting_channel = second.raiden.chain.netting_channel(netcontract_address) - - details1 = first_netting_channel.detail() - details2 = second_netting_channel.detail() - - assert details1['our_balance'] == deposit - assert details1['partner_balance'] == deposit - assert details2['our_balance'] == deposit - assert details2['partner_balance'] == deposit + manager = app0.raiden.default_registry.manager_by_token(token_address) + netcontract_address = manager.new_netting_channel( + app1.raiden.address, + settle_timeout, + ) + assert netcontract_address + + for app in [app0, app1]: + # Use each app's own chain because of the private key / local signing + token = app.raiden.chain.token(token_address) + netting_channel = app.raiden.chain.netting_channel(netcontract_address) + + # This check can succeed and the deposit still fail, if channels are + # openned in parallel + previous_balance = token.balance_of(app.raiden.address) + assert previous_balance >= deposit + + token.approve(netcontract_address, deposit) + netting_channel.deposit(deposit) + + # Balance must decrease by at least but not exactly `deposit` amount, + # because channels can be openned in parallel + new_balance = token.balance_of(app.raiden.address) + assert new_balance <= previous_balance - deposit + + check_channel( + app0, + app1, + netcontract_address, + settle_timeout, + deposit, + ) def network_with_minimum_channels(apps, channels_per_node): @@ -95,6 +92,9 @@ def network_with_minimum_channels(apps, channels_per_node): if channels_per_node > len(apps): raise ValueError("Can't create more channels than nodes") + if len(apps) == 1: + raise ValueError("Can't create channels with only one node") + # If we use random nodes we can hit some edge cases, like the # following: # @@ -143,38 +143,23 @@ def network_with_minimum_channels(apps, channels_per_node): yield curr_app, least_connect -def create_network_channels( - raiden_apps, - token_addresses, - channels_per_node, - deposit, - settle_timeout): - +def create_network_channels(raiden_apps, channels_per_node): num_nodes = len(raiden_apps) if channels_per_node is not CHAIN and channels_per_node > num_nodes: raise ValueError("Can't create more channels than nodes") - for token in token_addresses: - if channels_per_node == CHAIN: - app_channels = list(zip(raiden_apps[:-1], raiden_apps[1:])) - else: - app_channels = list(network_with_minimum_channels(raiden_apps, channels_per_node)) - - setup_channels( - token, - app_channels, - deposit, - settle_timeout, - ) + if channels_per_node == 0: + app_channels = [] + elif channels_per_node == CHAIN: + app_channels = list(zip(raiden_apps[:-1], raiden_apps[1:])) + else: + app_channels = list(network_with_minimum_channels(raiden_apps, channels_per_node)) + return app_channels -def create_sequential_channels( - raiden_apps, - token_addresses, - channels_per_node, - deposit, - settle_timeout): + +def create_sequential_channels(raiden_apps, channels_per_node): """ Create a fully connected network with `num_nodes`, the nodes are connect sequentially. @@ -206,12 +191,7 @@ def create_sequential_channels( if channels_per_node == CHAIN: app_channels = list(zip(raiden_apps[:-1], raiden_apps[1:])) - setup_channels( - token_addresses, - app_channels, - deposit, - settle_timeout, - ) + return app_channels def create_apps( diff --git a/raiden/ui/cli.py b/raiden/ui/cli.py index 3fd9ada8bf..917ebbc2ff 100644 --- a/raiden/ui/cli.py +++ b/raiden/ui/cli.py @@ -96,7 +96,7 @@ def toggle_trace_profiler(raiden): def check_json_rpc(client): try: - client_version = client.call('web3_clientVersion') + client_version = client.rpccall_with_retry('web3_clientVersion') except (requests.exceptions.ConnectionError, EthNodeCommunicationError): print( '\n' @@ -559,7 +559,7 @@ def app( check_json_rpc(rpc_client) check_discovery_registration_gas(blockchain_service, address) - net_id = int(blockchain_service.client.call('net_version')) + net_id = int(blockchain_service.client.rpccall_with_retry('net_version')) if sync_check: check_synced(net_id, blockchain_service) diff --git a/raiden/ui/console.py b/raiden/ui/console.py index 2f1611dfef..147a82d0cb 100644 --- a/raiden/ui/console.py +++ b/raiden/ui/console.py @@ -321,12 +321,13 @@ def create_token( contract_path = get_contract_path('HumanStandardToken.sol') # Deploy a new ERC20 token token_proxy = self._chain.client.deploy_solidity_contract( - self._raiden.address, 'HumanStandardToken', + 'HumanStandardToken', compile_file(contract_path), dict(), (initial_alloc, name, decimals, symbol), contract_path=contract_path, - timeout=timeout) + timeout=timeout, + ) token_address_hex = hexlify(token_proxy.contract_address) if auto_register: self.register_token(token_address_hex) diff --git a/raiden/utils/events.py b/raiden/utils/events.py index e504099f2c..c60a310117 100644 --- a/raiden/utils/events.py +++ b/raiden/utils/events.py @@ -29,7 +29,7 @@ def all_contract_events_raw( Returns: events """ - return rpc.call('eth_getLogs', { + return rpc.rpccall_with_retry('eth_getLogs', { 'fromBlock': str(start_block), 'toBlock': str(end_block), 'address': address_encoder(normalize_address(contract_address)), diff --git a/raiden/waiting.py b/raiden/waiting.py index 1c701931b4..c4d347df41 100644 --- a/raiden/waiting.py +++ b/raiden/waiting.py @@ -2,6 +2,7 @@ import gevent from ethereum import slogging +from raiden.network.protocol import NODE_NETWORK_REACHABLE from raiden.transfer.state import ( CHANNEL_STATE_SETTLED, CHANNEL_AFTER_CLOSE_STATES, @@ -39,11 +40,12 @@ def wait_for_newchannel( ) -def wait_for_newbalance( +def wait_for_participant_newbalance( raiden, payment_network_id, token_address, partner_address, + target_address, target_balance, poll_timeout): """Wait until a given channels balance exceeds the target balance. @@ -51,6 +53,13 @@ def wait_for_newbalance( Note: This does not time out, use gevent.Timeout. """ + if target_address == raiden.address: + balance = lambda channel_state: channel_state.our_state.contract_balance + elif target_address == partner_address: + balance = lambda channel_state: channel_state.partner_state.contract_balance + else: + raise ValueError('target_address must be one of the channel participants') + channel_state = views.get_channelstate_for( views.state_from_raiden(raiden), payment_network_id, @@ -58,7 +67,7 @@ def wait_for_newbalance( partner_address, ) - while channel_state.our_state.contract_balance < target_balance: + while balance(channel_state) < target_balance: gevent.sleep(poll_timeout) channel_state = views.get_channelstate_for( views.state_from_raiden(raiden), @@ -149,3 +158,20 @@ def wait_for_settle_all_channels(raiden, poll_timeout): channel_ids, poll_timeout, ) + + +def wait_for_healthy(raiden, node_address, poll_timeout): + """Wait until `node_address` becomes healthy. + + Note: + This does not time out, use gevent.Timeout. + """ + network_statuses = views.get_networkstatuses( + views.state_from_raiden(raiden), + ) + + while network_statuses.get(node_address) != NODE_NETWORK_REACHABLE: + gevent.sleep(poll_timeout) + network_statuses = views.get_networkstatuses( + views.state_from_raiden(raiden), + ) diff --git a/tools/deploy.py b/tools/deploy.py index 5a81930a85..04d370ac8b 100755 --- a/tools/deploy.py +++ b/tools/deploy.py @@ -96,7 +96,6 @@ def deploy_file(contract, compiled_contracts, client): filename, _, name = contract.partition(":") log.info(f"Deploying {name}") proxy = client.deploy_solidity_contract( - client.sender, name, compiled_contracts, libraries, diff --git a/tools/init_blockchain.py b/tools/init_blockchain.py index cf0b97156d..5b5d4b0bc0 100644 --- a/tools/init_blockchain.py +++ b/tools/init_blockchain.py @@ -29,7 +29,6 @@ def create_and_distribute_token( name = name or hexlify(sha3(''.join(receivers).encode())) contract_path = get_contract_path('HumanStandardToken.sol') token_proxy = client.deploy_solidity_contract( - client.sender, 'HumanStandardToken', compile_file(contract_path), dict(), diff --git a/tools/transfer_eth.py b/tools/transfer_eth.py index c8a13324ca..6d6e9c225e 100755 --- a/tools/transfer_eth.py +++ b/tools/transfer_eth.py @@ -36,7 +36,7 @@ def main(private_key, eth_amount, targets_file, port, host): print("Sending {} eth to:".format(eth_amount)) for target in targets: print(" - {}".format(target)) - client.send_transaction(sender=client.sender, to=target, value=eth_amount * WEI_TO_ETH) + client.send_transaction(to=target, value=eth_amount * WEI_TO_ETH) if __name__ == "__main__":