Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions electrum/gui/qt/main_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -1880,6 +1880,7 @@ def task():
password=password)
def on_failure(exc_info):
type_, e, traceback = exc_info
#self.logger.error("Could not open channel", exc_info=exc_info)
self.show_error(_('Could not open channel: {}').format(repr(e)))
WaitingDialog(self, _('Opening channel...'), task, self.on_open_channel_success, on_failure)

Expand Down
91 changes: 57 additions & 34 deletions electrum/lnpeer.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def __init__(
# gossip uses a single queue to preserve message order
self.gossip_queue = asyncio.Queue()
self.ordered_message_queues = defaultdict(asyncio.Queue) # for messages that are ordered
self.temp_id_to_id = {} # to forward error messages
self.temp_id_to_id = {} # type: Dict[bytes, Optional[bytes]] # to forward error messages
self.funding_created_sent = set() # for channels in PREOPENING
self.funding_signed_sent = set() # for channels in PREOPENING
self.shutdown_received = {} # chan_id -> asyncio.Future()
Expand Down Expand Up @@ -224,36 +224,42 @@ def process_message(self, message):
if asyncio.iscoroutinefunction(f):
asyncio.ensure_future(self.taskgroup.spawn(execution_result))

def _get_channel_ids(self, channel_id):
# if channel_id is all zero: MUST fail all channels with the sending node.
# otherwise: MUST fail the channel referred to by channel_id, if that channel is with the sending node.
# if no existing channel is referred to by `channel_id: MUST ignore the message.
if channel_id == bytes(32):
return self.channels.keys()
elif channel_id in self.temp_id_to_id:
return [self.temp_id_to_id[channel_id]]
elif channel_id in self.channels:
return [channel_id]
else:
return []

def on_warning(self, payload):
# TODO: we could need some reconnection logic here -> delayed reconnect
self.logger.info(f"remote peer sent warning [DO NOT TRUST THIS MESSAGE]: {payload['data'].decode('ascii')}")
channel_ids = self._get_channel_ids(payload.get("channel_id"))
for cid in channel_ids:
self.ordered_message_queues[cid].put_nowait((None, {'warning': payload['data']}))
if channel_ids:
raise GracefulDisconnect
chan_id = payload.get("channel_id")
self.logger.info(f"remote peer sent warning [DO NOT TRUST THIS MESSAGE]: "
f"{payload['data'].decode('ascii')}. chan_id={chan_id.hex()}")
if chan_id in self.channels:
self.ordered_message_queues[chan_id].put_nowait((None, {'warning': payload['data']}))
elif chan_id in self.temp_id_to_id:
chan_id = self.temp_id_to_id[chan_id] or chan_id
self.ordered_message_queues[chan_id].put_nowait((None, {'warning': payload['data']}))
else:
# if no existing channel is referred to by channel_id:
# - MUST ignore the message.
return
raise GracefulDisconnect

def on_error(self, payload):
self.logger.info(f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: {payload['data'].decode('ascii')}")
channel_ids = self._get_channel_ids(payload.get("channel_id"))
for cid in channel_ids:
self.schedule_force_closing(cid)
self.ordered_message_queues[cid].put_nowait((None, {'error': payload['data']}))
if channel_ids:
raise GracefulDisconnect
chan_id = payload.get("channel_id")
self.logger.info(f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: "
f"{payload['data'].decode('ascii')}. chan_id={chan_id.hex()}")
if chan_id in self.channels:
self.schedule_force_closing(chan_id)
self.ordered_message_queues[chan_id].put_nowait((None, {'error': payload['data']}))
elif chan_id in self.temp_id_to_id:
chan_id = self.temp_id_to_id[chan_id] or chan_id
self.ordered_message_queues[chan_id].put_nowait((None, {'error': payload['data']}))
elif chan_id == bytes(32):
# if channel_id is all zero:
# - MUST fail all channels with the sending node.
for cid in self.channels:
self.schedule_force_closing(cid)
self.ordered_message_queues[cid].put_nowait((None, {'error': payload['data']}))
else:
# if no existing channel is referred to by channel_id:
# - MUST ignore the message.
return
raise GracefulDisconnect

async def send_warning(self, channel_id: bytes, message: str = None, *, close_connection=True):
"""Sends a warning and disconnects if close_connection.
Expand Down Expand Up @@ -298,8 +304,11 @@ async def send_error(self, channel_id: bytes, message: str = None, *, force_clos
# MUST fail the channel(s) referred to by the error message:
# we may violate this with force_close_channel
if force_close_channel:
for cid in self._get_channel_ids(channel_id):
if channel_id in self.channels:
self.schedule_force_closing(channel_id)
elif channel_id == bytes(32):
for cid in self.channels:
self.schedule_force_closing(cid)
raise GracefulDisconnect

def on_ping(self, payload):
Expand All @@ -310,11 +319,15 @@ def on_pong(self, payload):
pass

async def wait_for_message(self, expected_name, channel_id):
# errors and warnings are sent to the queue with name set to None, so that this task terminates
q = self.ordered_message_queues[channel_id]
name, payload = await asyncio.wait_for(q.get(), LN_P2P_NETWORK_TIMEOUT)
if name is None:
raise GracefulDisconnect
# raise exceptions for errors/warnings, so that the caller sees them
if payload.get('error'):
raise GracefulDisconnect(
f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: {payload['error'].decode('ascii')}")
elif payload.get('warning'):
raise GracefulDisconnect(
f"remote peer sent warning [DO NOT TRUST THIS MESSAGE]: {payload['warning'].decode('ascii')}")
if name != expected_name:
raise Exception(f"Received unexpected '{name}'")
return payload
Expand Down Expand Up @@ -663,7 +676,6 @@ async def wrapper(self: 'Peer', *args, **kwargs):
self.lnworker.wallet.set_reserved_state_of_address(addr, reserved=False)
return wrapper

@log_exceptions
@temporarily_reserve_funding_tx_change_address
async def channel_establishment_flow(
self, *,
Expand Down Expand Up @@ -714,6 +726,10 @@ async def channel_establishment_flow(
)
per_commitment_point_first = secret_to_pubkey(
int.from_bytes(per_commitment_secret_first, 'big'))

# store the temp id now, so that it is recognized for e.g. 'error' messages
# TODO: this is never cleaned up; the dict grows unbounded until disconnect
self.temp_id_to_id[temp_channel_id] = None
self.send_message(
"open_channel",
temporary_channel_id=temp_channel_id,
Expand Down Expand Up @@ -897,6 +913,9 @@ async def on_open_channel(self, payload):
push_msat = payload['push_msat']
feerate = payload['feerate_per_kw'] # note: we are not validating this
temp_chan_id = payload['temporary_channel_id']
# store the temp id now, so that it is recognized for e.g. 'error' messages
# TODO: this is never cleaned up; the dict grows unbounded until disconnect
self.temp_id_to_id[temp_chan_id] = None

open_channel_tlvs = payload.get('open_channel_tlvs')
channel_type = open_channel_tlvs.get('channel_type') if open_channel_tlvs else None
Expand Down Expand Up @@ -1018,6 +1037,7 @@ async def on_open_channel(self, payload):
channel_id=channel_id,
signature=sig_64,
)
self.temp_id_to_id[temp_chan_id] = channel_id
self.funding_signed_sent.add(chan.channel_id)
chan.open_with_first_pcp(payload['first_per_commitment_point'], remote_sig)
chan.set_state(ChannelState.OPENING)
Expand All @@ -1040,7 +1060,10 @@ def schedule_force_closing(self, channel_id: bytes):
channels_with_peer.extend(self.temp_id_to_id.values())
if channel_id not in channels_with_peer:
raise ValueError(f"channel {channel_id.hex()} does not belong to this peer")
self.lnworker.schedule_force_closing(channel_id)
if channel_id in self.channels:
self.lnworker.schedule_force_closing(channel_id)
else:
self.logger.warning(f"tried to force-close channel {channel_id.hex()} but it is not in self.channels yet")

def on_channel_reestablish(self, chan, msg):
their_next_local_ctn = msg["next_commitment_number"]
Expand Down