From 53de9f8c716f567c532fdb3595299526826dd85a Mon Sep 17 00:00:00 2001 From: junderw Date: Sun, 8 Oct 2023 00:34:14 -0700 Subject: [PATCH 1/5] Feat: Log client IP during REST and Electrum requests --- Cargo.lock | 39 ++++ Cargo.toml | 2 + src/bin/electrs.rs | 9 + src/config.rs | 31 ++++ src/electrum/server.rs | 400 +++++++++++++++++++++++++++++++++-------- src/new_index/fetch.rs | 8 +- src/rest.rs | 47 ++++- src/util/mod.rs | 33 ++-- start | 19 ++ 9 files changed, 486 insertions(+), 102 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index db6d368a..85e9d1b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -391,6 +391,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + [[package]] name = "either" version = "1.6.1" @@ -843,9 +849,11 @@ dependencies = [ "lazy_static", "libc", "log", + "memchr", "num_cpus", "page_size", "prometheus", + "proxy-protocol", "rayon", "rocksdb", "serde", @@ -1098,6 +1106,16 @@ version = "2.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf7e6d18738ecd0902d30d1ad232c9125985a3422929b16c65517b38adc14f96" +[[package]] +name = "proxy-protocol" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e50c72c21c738f5c5f350cc33640aee30bf7cd20f9d9da20ed41bce2671d532" +dependencies = [ + "bytes", + "snafu", +] + [[package]] name = "quote" version = "1.0.31" @@ -1512,6 +1530,27 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" +[[package]] +name = "snafu" +version = "0.6.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eab12d3c261b2308b0d80c26fffb58d17eba81a4be97890101f416b478c79ca7" +dependencies = [ + "doc-comment", + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.6.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1508efa03c362e23817f96cde18abed596a25219a8b2c66e8db33c03543d315b" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.91", +] + [[package]] name = "socket2" version = "0.4.4" diff --git a/Cargo.toml b/Cargo.toml index 58c5b579..133362f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,8 +43,10 @@ libc = "0.2.81" log = "0.4.11" socket2 = { version = "0.4", features = ["all"] } num_cpus = "1.12.0" +memchr = "2.4.1" page_size = "0.4.2" prometheus = "0.13" +proxy-protocol = { version = "0.5", features = ["always_exhaustive"] } rayon = "1.5.0" rocksdb = "0.21.0" serde = "1.0.118" diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index 0c649355..b8b32c7e 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -118,6 +118,15 @@ fn run_server(config: Arc) -> Result<()> { ); } + if std::env::var("ELECTRS_PERIODIC_THREAD_LOGGER").is_ok() { + electrs::util::spawn_thread("periodic_thread_logger", || loop { + electrs::util::with_spawned_threads(|threads| { + debug!("THREADS: {:?}", threads); + }); + std::thread::sleep(std::time::Duration::from_millis(5000)); + }); + } + loop { if let Err(err) = signal.wait(Duration::from_millis(config.main_loop_delay), true) { info!("stopping server: {}", err); diff --git a/src/config.rs b/src/config.rs index a5e903ce..96a07fd5 100644 --- a/src/config.rs +++ b/src/config.rs @@ -40,6 +40,8 @@ pub struct Config { pub daemon_rpc_addr: SocketAddr, pub cookie: Option, pub electrum_rpc_addr: SocketAddr, + pub electrum_proxy_depth: usize, + pub rest_proxy_depth: usize, pub http_addr: SocketAddr, pub http_socket_file: Option, pub rpc_socket_file: Option, @@ -150,6 +152,23 @@ impl Config { .help("Electrum server JSONRPC 'addr:port' to listen on (default: '127.0.0.1:50001' for mainnet, '127.0.0.1:60001' for testnet and '127.0.0.1:60401' for regtest)") .takes_value(true), ) + .arg( + Arg::with_name("electrum_proxy_depth") + .long("electrum-proxy-depth") + .help("Electrum server's PROXY protocol header depth. \ + ie. a value of 2 means the 2nd closest hop's PROXY header \ + will be used to find the source IP. A value of 0 means all \ + IPs are ignored. (default: 0)") + .takes_value(true), + ) + .arg( + Arg::with_name("rest_proxy_depth") + .long("rest-proxy-depth") + .help("REST server's X-Forwarded-For IP address depth. \ + ie. a value of 2 means the 2nd IP address in the header(s) is used. \ + (default: 0)") + .takes_value(true), + ) .arg( Arg::with_name("http_addr") .long("http-addr") @@ -440,6 +459,16 @@ impl Config { .unwrap_or(&format!("127.0.0.1:{}", default_electrum_port)), "Electrum RPC", ); + let electrum_proxy_depth = m + .value_of("electrum_proxy_depth") + .unwrap_or("0") + .parse::() + .expect("invalid electrum_proxy_depth"); + let rest_proxy_depth = m + .value_of("rest_proxy_depth") + .unwrap_or("0") + .parse::() + .expect("invalid rest_proxy_depth"); let http_addr: SocketAddr = str_to_socketaddr( m.value_of("http_addr") .unwrap_or(&format!("127.0.0.1:{}", default_http_port)), @@ -515,6 +544,8 @@ impl Config { cookie, utxos_limit: value_t_or_exit!(m, "utxos_limit", usize), electrum_rpc_addr, + electrum_proxy_depth, + rest_proxy_depth, electrum_txs_limit: value_t_or_exit!(m, "electrum_txs_limit", usize), electrum_banner, http_addr, diff --git a/src/electrum/server.rs b/src/electrum/server.rs index ae427cd2..1a2ef37c 100644 --- a/src/electrum/server.rs +++ b/src/electrum/server.rs @@ -1,3 +1,4 @@ +use std::cell::{Cell, RefCell}; use std::collections::HashMap; use std::convert::TryInto; use std::fs; @@ -9,14 +10,15 @@ use std::os::unix::fs::FileTypeExt; use std::os::unix::net::{UnixListener, UnixStream}; use std::path::Path; use std::sync::atomic::AtomicBool; -use std::sync::mpsc::{Receiver, Sender}; +use std::sync::mpsc::Sender; use std::sync::{Arc, Mutex}; use std::thread; use bitcoin::hashes::sha256d::Hash as Sha256dHash; use error_chain::ChainedError; use hex; -use serde_json::{from_str, Value}; +use proxy_protocol::{version1, version2, ProxyHeader}; +use serde_json::Value; use sha2::{Digest, Sha256}; #[cfg(not(feature = "liquid"))] @@ -100,39 +102,78 @@ fn get_status_hash(txs: Vec<(Txid, Option)>, query: &Query) -> Option>>, + electrum_proxy_depth: usize, + // Chain info related query: Arc, last_header_entry: Option, status_hashes: HashMap, // ScriptHash -> StatusHash - stream: ConnectionStream, - chan: SyncChannel, stats: Arc, txs_limit: usize, - die_please: Option>, + // Stream related + stream: ConnectionStream, + _arc_stream: Arc, // Needs to be kept alive until drop + reader: RefCell>>, + // Channel related + message_chan: SyncChannel, + shutdown_replies: crossbeam_channel::Receiver<()>, // For reply select branch + shutdown_send: crossbeam_channel::Sender<()>, // For Drop. Kills properly-die thread + // Discovery related #[cfg(feature = "electrum-discovery")] discovery: Option>, } +impl Drop for Connection { + fn drop(&mut self) { + let _ = self.shutdown_send.send(()); + } +} + impl Connection { pub fn new( query: Arc, stream: ConnectionStream, stats: Arc, - txs_limit: usize, - die_please: Receiver<()>, + (txs_limit, electrum_proxy_depth): (usize, usize), + shutdown: SyncChannel<()>, #[cfg(feature = "electrum-discovery")] discovery: Option>, ) -> Connection { - Connection { + // Channels + let (reply_killer, shutdown_replies) = crossbeam_channel::unbounded(); + let shutdown_send = shutdown.sender(); + + // Using this Arc to prevent any thread leaks from keeping the stream alive + let _arc_stream = Arc::new(stream.try_clone().expect("failed to clone TcpStream")); + let maybe_stream = Arc::downgrade(&_arc_stream); + + spawn_thread("properly-die", move || { + let _ = shutdown.receiver().map(|c| c.recv()); + let _ = maybe_stream.upgrade().map(|s| s.shutdown(Shutdown::Both)); + let _ = reply_killer.send(()); + }); + + let ret = Connection { + proxy_proto_addr: Cell::new(None), + electrum_proxy_depth, query, last_header_entry: None, // disable header subscription for now status_hashes: HashMap::new(), + reader: RefCell::new(Some(BufReader::new( + stream.try_clone().expect("failed to clone TcpStream"), + ))), stream, - chan: SyncChannel::new(10), + _arc_stream, + message_chan: SyncChannel::new(10), stats, txs_limit, - die_please: Some(die_please), + shutdown_replies, + shutdown_send, #[cfg(feature = "electrum-discovery")] discovery, - } + }; + // Wait for first request to find + ret.get_source_addr(); + ret } fn blockchain_headers_subscribe(&mut self) -> Result { @@ -354,8 +395,12 @@ impl Connection { let tx = params.get(0).chain_err(|| "missing tx")?; let tx = tx.as_str().chain_err(|| "non-string tx")?.to_string(); let txid = self.query.broadcast_raw(&tx)?; - if let Err(e) = self.chan.sender().try_send(Message::PeriodicUpdate) { - warn!("failed to issue PeriodicUpdate after broadcast: {}", e); + if let Err(e) = self.message_chan.sender().try_send(Message::PeriodicUpdate) { + warn!( + "[{}] failed to issue PeriodicUpdate after broadcast: {}", + self.get_source_addr_str(), + e + ); } Ok(json!(txid)) } @@ -455,7 +500,8 @@ impl Connection { Ok(result) => json!({"jsonrpc": "2.0", "id": id, "result": result}), Err(e) => { warn!( - "rpc #{} {} {:?} failed: {}", + "[{}] rpc #{} {} {:?} failed: {}", + self.get_source_addr_str(), id, method, params, @@ -512,16 +558,48 @@ impl Connection { Ok(()) } - fn handle_replies(&mut self, shutdown: crossbeam_channel::Receiver<()>) -> Result<()> { + fn get_source_addr_str(&self) -> String { + self.get_source_addr() + .map(|s| s.to_string()) + .unwrap_or_else(|| self.stream.addr_string()) + } + + /// This will only check the PROXY protocol once + /// and store the result in the first Option. + /// Some(None) means "we checked, but there was no address" + /// The inner option is returned as a Copy. + fn get_source_addr(&self) -> Option { + // Option is Copy + if let Some(v) = self.proxy_proto_addr.get() { + v + } else { + let v = self + .reader + .borrow_mut() + .as_mut() + .and_then(|r| r.fill_buf().ok()) + .and_then(|mut available| { + parse_proxy_headers(&mut available, self.electrum_proxy_depth).0 + }) + .map(|addr| { + trace!("RPC Received PROXY Protocol address: {}", addr); + addr + }); + self.proxy_proto_addr.set(Some(v)); + v + } + } + + fn handle_replies(&mut self) -> Result<()> { let empty_params = json!([]); + let addr_str = self.get_source_addr_str(); loop { crossbeam_channel::select! { - recv(self.chan.receiver()) -> msg => { - let msg = msg.chain_err(|| "channel closed")?; - trace!("RPC {:?}", msg); + recv(self.message_chan.receiver().chain_err(|| format!("[{addr_str}] channel closed"))?) -> msg => { + let msg = msg.chain_err(|| format!("[{addr_str}] channel closed"))?; + trace!("RPC [{addr_str}] {:?}", msg); match msg { - Message::Request(line) => { - let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?; + Message::Request(cmd) => { let reply = match ( cmd.get("method"), cmd.get("params").unwrap_or(&empty_params), @@ -530,24 +608,24 @@ impl Connection { (Some(Value::String(method)), Value::Array(params), Some(id)) => { self.handle_command(method, params, id)? } - _ => bail!("invalid command: {}", cmd), + _ => bail!("[{addr_str}] invalid command: {}", cmd), }; self.send_values(&[reply])? } Message::PeriodicUpdate => { let values = self .update_subscriptions() - .chain_err(|| "failed to update subscriptions")?; + .chain_err(|| format!("[{addr_str}] failed to update subscriptions"))?; self.send_values(&values)? } Message::Done => { - self.chan.close(); + self.message_chan.close(); return Ok(()); } } } - recv(shutdown) -> _ => { - self.chan.close(); + recv(self.shutdown_replies) -> _ => { + self.message_chan.close(); return Ok(()); } } @@ -556,29 +634,32 @@ impl Connection { fn handle_requests( mut reader: BufReader, - tx: crossbeam_channel::Sender, + tx: &crossbeam_channel::Sender, + addr_str: &str, ) -> Result<()> { loop { - let mut line = Vec::::new(); - reader - .read_until(b'\n', &mut line) - .chain_err(|| "failed to read a request")?; - if line.is_empty() { - tx.send(Message::Done).chain_err(|| "channel closed")?; + let mut recv_data = Vec::::new(); + match read_until(&mut reader, b'\n', &mut recv_data) { + Ok(bytes) => trace!("[{addr_str}] Read {bytes} bytes from connection"), + Err(e) => bail!("[{addr_str}] Failed to read: {}", e), + } + if recv_data.is_empty() { return Ok(()); } else { - if line.starts_with(&[22, 3, 1]) { + if recv_data.starts_with(&[22, 3, 1]) { // (very) naive SSL handshake detection - let _ = tx.send(Message::Done); - bail!("invalid request - maybe SSL-encrypted data?: {:?}", line) + bail!( + "[{addr_str}] invalid request - maybe SSL-encrypted data?: {:?}", + recv_data + ) } - match String::from_utf8(line) { + match serde_json::from_slice(&recv_data) { Ok(req) => tx .send(Message::Request(req)) - .chain_err(|| "channel closed")?, + .chain_err(|| format!("[{}] channel closed", addr_str))?, Err(err) => { let _ = tx.send(Message::Done); - bail!("invalid UTF8: {}", err) + bail!("[{}] invalid UTF8: {}", addr_str, err) } } } @@ -587,29 +668,36 @@ impl Connection { pub fn run(mut self) { self.stats.clients.inc(); - let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream")); - let tx = self.chan.sender(); - - let die_please = self.die_please.take().unwrap(); - let (reply_killer, reply_receiver) = crossbeam_channel::unbounded(); - - // We create a clone of the stream and put it in an Arc - // This will drop at the end of the function. - let arc_stream = Arc::new(self.stream.try_clone().expect("failed to clone TcpStream")); - // We don't want to keep the stream alive until SIGINT - // It should drop (close) no matter what. - let maybe_stream = Arc::downgrade(&arc_stream); - spawn_thread("properly-die", move || { - let _ = die_please.recv(); - let _ = maybe_stream.upgrade().map(|s| s.shutdown(Shutdown::Both)); - let _ = reply_killer.send(()); + let reader = self.reader.take().unwrap(); + let tx = self.message_chan.sender(); + + let rpc_addr = self.get_source_addr(); + let addr_str = self.get_source_addr_str(); + let shutdown_send = self.shutdown_send.clone(); + let child = spawn_thread("reader", move || { + let addr_str = rpc_addr + .map(|a| a.to_string()) + .unwrap_or_else(|| reader.get_ref().addr_string()); + let result = + std::panic::catch_unwind(|| Connection::handle_requests(reader, &tx, &addr_str)) + .unwrap_or_else(|e| { + Err(format!( + "[{}] RPC Panic in request handler: {}", + addr_str, + parse_panic_error(&e) + ) + .into()) + }); + // This shuts down the other graceful shutdown thread, + // which also shuts down the handle_replies loop + // regardless of panic, Err, or Ok + let _ = shutdown_send.send(()); + result }); - - let child = spawn_thread("reader", || Connection::handle_requests(reader, tx)); - if let Err(e) = self.handle_replies(reply_receiver) { + if let Err(e) = self.handle_replies() { error!( "[{}] connection handling failed: {}", - self.stream.addr_string(), + addr_str, e.display_chain().to_string() ); } @@ -618,13 +706,11 @@ impl Connection { .subscriptions .sub(self.status_hashes.len() as i64); - let addr = self.stream.addr_string(); - debug!("[{}] shutting down connection", addr); - // Drop the Arc so that the stream properly closes. - drop(arc_stream); + debug!("[{}] shutting down connection", addr_str); let _ = self.stream.shutdown(Shutdown::Both); - if let Err(err) = child.join().expect("receiver panicked") { - error!("[{}] receiver failed: {}", addr, err); + self.message_chan.close(); + if let Err(err) = child.join().expect("receiver can't panic") { + error!("[{}] receiver failed: {}", addr_str, err); } } } @@ -654,7 +740,7 @@ struct GetHistoryResult { #[derive(Debug)] pub enum Message { - Request(String), + Request(Value), PeriodicUpdate, Done, } @@ -770,6 +856,7 @@ impl RPC { }); let txs_limit = config.electrum_txs_limit; + let electrum_proxy_depth = config.electrum_proxy_depth; RPC { notification: notification.sender(), @@ -791,7 +878,6 @@ impl RPC { let (garbage_sender, garbage_receiver) = crossbeam_channel::unbounded(); while let Some(stream) = acceptor.receiver().recv().unwrap() { - let addr = stream.addr_string(); // explicitely scope the shadowed variables for the new thread let query = Arc::clone(&query); let senders = Arc::clone(&senders); @@ -799,36 +885,38 @@ impl RPC { let garbage_sender = garbage_sender.clone(); // Kill the peers properly - let (killer, peace_receiver) = std::sync::mpsc::channel(); - let killer_clone = killer.clone(); + let shutdown_channel = SyncChannel::new(1); + let shutdown_sender = shutdown_channel.sender(); #[cfg(feature = "electrum-discovery")] let discovery = discovery.clone(); let spawned = spawn_thread("peer", move || { - let addr = stream.addr_string(); - info!("[{}] connected peer", addr); + let shutdown_sender = shutdown_channel.sender(); + info!("connected peer. waiting for first request..."); let conn = Connection::new( query, stream, stats, - txs_limit, - peace_receiver, + (txs_limit, electrum_proxy_depth), + shutdown_channel, #[cfg(feature = "electrum-discovery")] discovery, ); - senders.lock().unwrap().push(conn.chan.sender()); + let addr = conn.get_source_addr_str(); + info!("[{}] connected peer", addr); + senders.lock().unwrap().push(conn.message_chan.sender()); conn.run(); info!("[{}] disconnected peer", addr); - let _ = killer_clone.send(()); + let _ = shutdown_sender.send(()); let _ = garbage_sender.send(std::thread::current().id()); }); - trace!("[{}] spawned {:?}", addr, spawned.thread().id()); - threads.insert(spawned.thread().id(), (spawned, killer)); + trace!("spawned {:?}", spawned.thread().id()); + threads.insert(spawned.thread().id(), (spawned, shutdown_sender)); while let Ok(id) = garbage_receiver.try_recv() { if let Some((thread, killer)) = threads.remove(&id) { - trace!("[{}] joining {:?}", addr, id); + trace!("joining {:?}", id); let _ = killer.send(()); if let Err(error) = thread.join() { error!("failed to join {:?}: {:?}", id, error); @@ -992,7 +1080,7 @@ impl ConnectionStream { fn addr_string(&self) -> String { match self { ConnectionStream::Tcp(_, a) => format!("{a}"), - ConnectionStream::Unix(_, a, _) => format!("{a:?}"), + ConnectionStream::Unix(_, _, _) => "(Unix socket)".to_string(), } } @@ -1050,3 +1138,159 @@ impl Read for ConnectionStream { } } } + +/// This is a slightly modified version of read_until from standard library BufRead trait. +/// After every read we check if there's a PROXY protocol header at the beginning of the read. +fn read_until( + r: &mut BufReader, + delim: u8, + buf: &mut Vec, +) -> std::io::Result { + let mut read = 0; + let mut carry_over_arr = [0_u8; 256]; + let mut carrying_over = 0; + loop { + let (done, used) = { + let mut available = match r.fill_buf() { + Ok(n) => n, + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue, + Err(e) => return Err(e), + }; + + let skipped_count = if carrying_over > 0 { + process_carry_over(&mut carrying_over, &mut carry_over_arr, &mut available) + } else { + // Added to read_until impl: Try parsing PROXY headers after every read. + let (_, skipped_count) = parse_proxy_headers(&mut available, 0); + skipped_count + }; + + match memchr::memchr(delim, available) { + Some(i) => { + buf.extend_from_slice(&available[..=i]); + (true, i + 1 + skipped_count) + } + None => { + // Added: carry over + carrying_over = available.len(); + carry_over_arr[..carrying_over].copy_from_slice(available); + + buf.extend_from_slice(available); + (false, available.len() + skipped_count) + } + } + }; + r.consume(used); + read += used; + if done || used == 0 { + return Ok(read); + } + } +} + +fn proxy_header_to_source_socket_addr(p_header: ProxyHeader) -> Option { + match p_header { + ProxyHeader::Version1 { + addresses: version1::ProxyAddresses::Ipv4 { source, .. }, + } => Some(SocketAddr::V4(source)), + ProxyHeader::Version1 { + addresses: version1::ProxyAddresses::Ipv6 { source, .. }, + } => Some(SocketAddr::V6(source)), + ProxyHeader::Version2 { + addresses: version2::ProxyAddresses::Ipv4 { source, .. }, + .. + } => Some(SocketAddr::V4(source)), + ProxyHeader::Version2 { + addresses: version2::ProxyAddresses::Ipv6 { source, .. }, + .. + } => Some(SocketAddr::V6(source)), + _ => None, + } +} + +fn parse_proxy_headers( + buf: &mut &[u8], + electrum_proxy_depth: usize, +) -> (Option, usize) { + trace!("Starting parse PROXY headers: {:?}", &buf[..12]); + let mut addr = None; + let mut current_header_index = 0; + let before_len = buf.len(); + // The last header is the outer-most proxy + // Warning do not early return. ONLY break the loop. + loop { + let p_header = match proxy_protocol::parse(buf) { + Ok(h) => h, + Err(proxy_protocol::ParseError::NotProxyHeader) => break, + // This won't move the buf cursor forward + // and will most likely end in an error higher in the call stack + // This means "PROXY protocol was used, but it was in an unknown format" + // (Maybe someday if nginx/etc. uses a new version of the protocol + // and we don't update the dependency to a version that handles the new + // version, it might break.) + Err(_) => break, + }; + trace!("Parsed PROXY protocol header: {:?}", p_header); + // Increment from 0 to 1 before the first check + current_header_index += 1; + // 0 should always continue + // 1 should only get the 1st header's IP address etc. + if current_header_index != electrum_proxy_depth { + continue; + } + // The address is only attempted to be + // parsed when the 1 based index is equal + addr = proxy_header_to_source_socket_addr(p_header); + } + (addr, before_len - buf.len()) +} + +fn parse_panic_error(e: &(dyn std::any::Any + Send)) -> &str { + if let Some(s) = e.downcast_ref::<&str>() { + s + } else if let Some(s) = e.downcast_ref::() { + s + } else { + "Unknown panic" + } +} + +/// The goal of this function is to take the carried over bytes from the last loop +/// and connect them with the first bytes of the next read, then check if it's a header. +/// This is to prevent headers from straddling the BufReader's buffer end. +/// A simple static array should be quick and easy. +fn process_carry_over( + carrying_over: &mut usize, + carry_over: &mut [u8], + available: &mut &[u8], +) -> usize { + // How much space do we have left in the array? + let empty_space = carry_over.len() - *carrying_over; + // How many bytes should we copy over? + let copy_bytes = available.len().min(empty_space); + // Copy over the bytes to join with the carried over bytes + carry_over[*carrying_over..*carrying_over + copy_bytes] + .copy_from_slice(&available[..copy_bytes]); + + // Figure out if it was a proxy header or not. + #[allow(clippy::redundant_slicing)] + let mut cursor = &carry_over[..]; + let before_len = cursor.len(); + let was_proxy = proxy_protocol::parse(&mut cursor).is_ok(); + let skipped_count = before_len - cursor.len(); + + let skip_count = if was_proxy { + // We only want to skip the amount in the new buffer + skipped_count.saturating_sub(*carrying_over) + } else { + 0 + }; + + // Move the available cursor + *available = &available[skip_count..]; + // Reset carrying over (writing 0s to the array is unnecessary) + *carrying_over = 0; + + // Return the skip count so we can call consume later + skip_count +} diff --git a/src/new_index/fetch.rs b/src/new_index/fetch.rs index ec4c2cb6..6d06ce2e 100644 --- a/src/new_index/fetch.rs +++ b/src/new_index/fetch.rs @@ -74,7 +74,7 @@ fn bitcoind_fetcher( let chan = SyncChannel::new(1); let sender = chan.sender(); Ok(Fetcher::from( - chan.into_receiver(), + chan.into_receiver().expect("not closed"), spawn_thread("bitcoind_fetcher", move || { for entries in new_headers.chunks(100) { let blockhashes: Vec = entries.iter().map(|e| *e.hash()).collect(); @@ -115,7 +115,7 @@ fn blkfiles_fetcher( let parser = blkfiles_parser(blkfiles_reader(blk_files), magic); Ok(Fetcher::from( - chan.into_receiver(), + chan.into_receiver().expect("not closed"), spawn_thread("blkfiles_fetcher", move || { parser.map(|sizedblocks| { let block_entries: Vec = sizedblocks @@ -151,7 +151,7 @@ fn blkfiles_reader(blk_files: Vec) -> Fetcher> { let sender = chan.sender(); Fetcher::from( - chan.into_receiver(), + chan.into_receiver().expect("not closed"), spawn_thread("blkfiles_reader", move || { for path in blk_files { trace!("reading {:?}", path); @@ -170,7 +170,7 @@ fn blkfiles_parser(blobs: Fetcher>, magic: u32) -> Fetcher, rest_proxy_depth: usize) -> Option { + if rest_proxy_depth == 0 { + return None; + } + headers + .get_all("X-Forwarded-For") + .iter() + .filter_map(|v| v.to_str().ok()) + .join(",") + .split(',') + .nth(rest_proxy_depth - 1) + .and_then(|ip| ip.trim().parse::().ok()) +} + +fn get_client_ip_str() -> Cow<'static, str> { + get_rest_addr() + .map(|a| Cow::Owned(a.to_string())) + .unwrap_or_else(|| Cow::Borrowed("Unknown IP")) +} + #[tokio::main] async fn run_server( config: Arc, @@ -577,16 +604,20 @@ async fn run_server( let config = Arc::clone(&config); let timer = metric.with_label_values(&["all_methods"]).start_timer(); - async move { + REST_CLIENT_ADDR.scope(Cell::new(None), async move { let method = req.method().clone(); let uri = req.uri().clone(); + + // Set the task local IP addr from X-Forwarded-For + set_rest_addr(get_client_ip(req.headers(), config.rest_proxy_depth)); + let body = hyper::body::to_bytes(req.into_body()).await?; let mut resp = tokio::task::block_in_place(|| { handle_request(method, uri, body, &query, &config) }) .unwrap_or_else(|err| { - warn!("{:?}", err); + warn!("[{}] {:?}", get_client_ip_str(), err); Response::builder() .status(err.0) .header("Content-Type", "text/plain") @@ -600,7 +631,7 @@ async fn run_server( } timer.observe_duration(); Ok::<_, hyper::Error>(resp) - } + }) })) } }; @@ -688,7 +719,7 @@ fn handle_request( None => HashMap::new(), }; - info!("handle {:?} {:?}", method, uri); + info!("[{}] handle {:?} {:?}", get_client_ip_str(), method, uri); match ( &method, path.first(), diff --git a/src/util/mod.rs b/src/util/mod.rs index 22775911..b699b500 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -14,6 +14,7 @@ pub use self::transaction::{ sigops::transaction_sigop_count, TransactionStatus, TxInput, }; +use std::cell::Cell; use std::collections::HashMap; use std::sync::atomic::AtomicUsize; use std::sync::mpsc::{channel, Receiver, Sender}; @@ -23,7 +24,7 @@ use std::thread::{self, ThreadId}; use crate::chain::BlockHeader; use bitcoin::hashes::sha256d::Hash as Sha256dHash; use socket2::{Domain, Protocol, Socket, Type}; -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; pub type Bytes = Vec; pub type HeaderMap = HashMap; @@ -38,29 +39,26 @@ pub fn full_hash(hash: &[u8]) -> FullHash { } pub struct SyncChannel { - tx: Option>, + tx: crossbeam_channel::Sender, rx: Option>, } impl SyncChannel { pub fn new(size: usize) -> SyncChannel { let (tx, rx) = crossbeam_channel::bounded(size); - SyncChannel { - tx: Some(tx), - rx: Some(rx), - } + SyncChannel { tx, rx: Some(rx) } } pub fn sender(&self) -> crossbeam_channel::Sender { - self.tx.as_ref().expect("No Sender").clone() + self.tx.clone() } - pub fn receiver(&self) -> &crossbeam_channel::Receiver { - self.rx.as_ref().expect("No Receiver") + pub fn receiver(&self) -> Option<&crossbeam_channel::Receiver> { + self.rx.as_ref() } - pub fn into_receiver(self) -> crossbeam_channel::Receiver { - self.rx.expect("No Receiver") + pub fn into_receiver(self) -> Option> { + self.rx } /// This drops the sender and receiver, causing all other methods to panic. @@ -68,7 +66,6 @@ impl SyncChannel { /// Use only when you know that the channel will no longer be used. /// ie. shutdown. pub fn close(&mut self) -> Option> { - self.tx.take(); self.rx.take() } } @@ -225,3 +222,15 @@ pub mod serde_hex { } } } + +pub(crate) fn get_rest_addr() -> Option { + REST_CLIENT_ADDR.with(|addr| addr.get()) +} +pub(crate) fn set_rest_addr(input: Option) { + REST_CLIENT_ADDR.with(|addr| { + addr.set(input); + }); +} +tokio::task_local! { + pub(crate) static REST_CLIENT_ADDR: Cell>; +} diff --git a/start b/start index 7e5cd80f..567e5acc 100755 --- a/start +++ b/start @@ -8,6 +8,23 @@ DB_FOLDER=/electrs NODENAME=$(hostname|cut -d . -f1) LOCATION=$(hostname|cut -d . -f2) +# since we know that our nginx will always be the first +# (closest) proxy from electrs, we set it to 1. +# If some servers prefer to disable PROXY protocol, +# set to 0. If the client themselves is proxying the +# electrum RPCs from other clients, this should be set to +# the hop that you want to trust to source IP addresses. +# If that hop doesn't exist, TcpStream's local_addr or +# "Unix socket" is used, and the PROXY headers are discarded. +# Image: +# [electrs] <> [proxy "1"] <> [proxy "2"] <> [electrum client] +RPC_PROXY_DEPTH=1 +# This chooses the depth of X-Forwarded-For IP to show. +# A value of 1 is the first value from left to right. +# Image: +# "X-Forwarded-For: proxy1, proxy2, proxy3..." +REST_PROXY_DEPTH=1 + # load rust if necessary if [ -e "${HOME}/.cargo/env" ];then source "${HOME}/.cargo/env" @@ -161,6 +178,8 @@ do --address-search \ --utxos-limit "${UTXOS_LIMIT}" \ --electrum-txs-limit "${ELECTRUM_TXS_LIMIT}" \ + --electrum-proxy-depth "${RPC_PROXY_DEPTH}" \ + --rest-proxy-depth "${REST_PROXY_DEPTH}" \ -vv sleep 1 done From db47f5ff4fdc5ae8d0ae61438d528ef8807c6bbd Mon Sep 17 00:00:00 2001 From: junderw Date: Thu, 12 Oct 2023 18:26:40 -0700 Subject: [PATCH 2/5] Fix: Indexing panic + read errors when headers hit buffer boundary --- src/electrum/server.rs | 192 +++++++++++++++++++++++++++++++++-------- 1 file changed, 158 insertions(+), 34 deletions(-) diff --git a/src/electrum/server.rs b/src/electrum/server.rs index 1a2ef37c..51f2112c 100644 --- a/src/electrum/server.rs +++ b/src/electrum/server.rs @@ -1141,11 +1141,7 @@ impl Read for ConnectionStream { /// This is a slightly modified version of read_until from standard library BufRead trait. /// After every read we check if there's a PROXY protocol header at the beginning of the read. -fn read_until( - r: &mut BufReader, - delim: u8, - buf: &mut Vec, -) -> std::io::Result { +fn read_until(r: &mut impl BufRead, delim: u8, buf: &mut Vec) -> std::io::Result { let mut read = 0; let mut carry_over_arr = [0_u8; 256]; let mut carrying_over = 0; @@ -1157,25 +1153,35 @@ fn read_until( Err(e) => return Err(e), }; - let skipped_count = if carrying_over > 0 { + // If carry over, try to parse PROXY headers. + let (carry_skipped_count, exit_error) = if carrying_over > 0 { process_carry_over(&mut carrying_over, &mut carry_over_arr, &mut available) } else { - // Added to read_until impl: Try parsing PROXY headers after every read. - let (_, skipped_count) = parse_proxy_headers(&mut available, 0); - skipped_count + (0, false) }; + // Rare edge case. If we carry over a proxy parse and it still errors + // It is most likely an unknown format + if exit_error { + return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof)); + } + + // Try parsing PROXY headers after every read. + let (_, skipped_count, exit_error) = parse_proxy_headers(&mut available, 0); + let skipped_count = carry_skipped_count + skipped_count; - match memchr::memchr(delim, available) { - Some(i) => { + match (memchr::memchr(delim, available), exit_error) { + (Some(i), false) => { buf.extend_from_slice(&available[..=i]); (true, i + 1 + skipped_count) } - None => { + (None, _) | (_, true) => { // Added: carry over - carrying_over = available.len(); - carry_over_arr[..carrying_over].copy_from_slice(available); + insert_carry_over(&mut carrying_over, &mut carry_over_arr, available); + + if !exit_error { + buf.extend_from_slice(available); + } - buf.extend_from_slice(available); (false, available.len() + skipped_count) } } @@ -1211,25 +1217,40 @@ fn proxy_header_to_source_socket_addr(p_header: ProxyHeader) -> Option (Option, usize) { - trace!("Starting parse PROXY headers: {:?}", &buf[..12]); +) -> (Option, usize, bool) { + trace!("Starting parse PROXY headers: {:?}", buf.get(..12)); let mut addr = None; let mut current_header_index = 0; let before_len = buf.len(); + // Save the original state of the buf + let mut original_buf = *buf; + let mut error_exit = false; // The last header is the outer-most proxy // Warning do not early return. ONLY break the loop. loop { let p_header = match proxy_protocol::parse(buf) { Ok(h) => h, + // NotProxyHeader definitely does not move the buf pointer. Err(proxy_protocol::ParseError::NotProxyHeader) => break, - // This won't move the buf cursor forward + // This can move the buf cursor forward // and will most likely end in an error higher in the call stack // This means "PROXY protocol was used, but it was in an unknown format" // (Maybe someday if nginx/etc. uses a new version of the protocol // and we don't update the dependency to a version that handles the new // version, it might break.) - Err(_) => break, + // OR it could mean a PROXY header fell on the BufReader buffer boundary. + Err(_) => { + // This is the buf state before this error returned. + // This match arm will modify buf past the version bytes + // and stop in a state pointing to the middle of a broken header. + // We return the buf state to the beginning of the previous version bytes. + *buf = original_buf; + error_exit = true; + break; + } }; + // After each successful header parse, save the new buf state. + original_buf = *buf; trace!("Parsed PROXY protocol header: {:?}", p_header); // Increment from 0 to 1 before the first check current_header_index += 1; @@ -1242,7 +1263,7 @@ fn parse_proxy_headers( // parsed when the 1 based index is equal addr = proxy_header_to_source_socket_addr(p_header); } - (addr, before_len - buf.len()) + (addr, before_len - buf.len(), error_exit) } fn parse_panic_error(e: &(dyn std::any::Any + Send)) -> &str { @@ -1263,7 +1284,8 @@ fn process_carry_over( carrying_over: &mut usize, carry_over: &mut [u8], available: &mut &[u8], -) -> usize { +) -> (usize, bool) { + // Step 0: Copy as much from available into carry_over to try and parse // How much space do we have left in the array? let empty_space = carry_over.len() - *carrying_over; // How many bytes should we copy over? @@ -1272,25 +1294,127 @@ fn process_carry_over( carry_over[*carrying_over..*carrying_over + copy_bytes] .copy_from_slice(&available[..copy_bytes]); - // Figure out if it was a proxy header or not. + // Step 1: Figure out if it was a proxy header or not. #[allow(clippy::redundant_slicing)] let mut cursor = &carry_over[..]; - let before_len = cursor.len(); - let was_proxy = proxy_protocol::parse(&mut cursor).is_ok(); - let skipped_count = before_len - cursor.len(); + let (_, skipped_count, exit_error) = parse_proxy_headers(&mut cursor, 0); - let skip_count = if was_proxy { - // We only want to skip the amount in the new buffer - skipped_count.saturating_sub(*carrying_over) - } else { - 0 - }; + // Step 2: Figure out how much we need to skip available forward + let skip_count = skipped_count.saturating_sub(*carrying_over); - // Move the available cursor + // Step 3: Move the available cursor *available = &available[skip_count..]; - // Reset carrying over (writing 0s to the array is unnecessary) + // Step 4: Reset carrying over (writing 0s to the array is unnecessary) *carrying_over = 0; // Return the skip count so we can call consume later - skip_count + (skip_count, exit_error) +} + +/// Insert the carry over +fn insert_carry_over(carrying_over: &mut usize, carry_over_arr: &mut [u8], available: &[u8]) { + *carrying_over = available.len().min(carry_over_arr.len()); + carry_over_arr[..*carrying_over].copy_from_slice(&available[..*carrying_over]); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_read_until() { + // len 48 + let v1 = "PROXY TCP6 ab:ce:ef:01:23:45:67:89 ::1 0 65535\r\n" + .as_bytes() + .to_vec(); + // len 31 + let v2 = + hex::decode("0d0a0d0a000d0a515549540a2111000f7f000001c0a80001ffff0101450000").unwrap(); + let simple_json = "{}\n".as_bytes().to_vec(); + // len 26 + let larger_json = r#"{"id":2,"name":"electrs"}\n"#.as_bytes().to_vec(); + + let vectors: Vec<(Vec, usize, &[u8], std::result::Result)> = vec![ + // Simple JSON with LF + (simple_json.clone(), 3, &simple_json, Ok(3)), + // Simple JSON with LF + PROXY v1 + ( + [v1.clone(), simple_json.clone()].concat(), + 51, + &simple_json, + Ok(51), + ), + // Simple JSON with LF + PROXY v2 + ( + [v2.clone(), simple_json.clone()].concat(), + 34, + &simple_json, + Ok(34), + ), + // Simple JSON with LF + two layers of proxy + ( + [v1.clone(), v2.clone(), simple_json.clone()].concat(), + 82, + &simple_json, + Ok(82), + ), + // Simple JSON that goes over the buffer boundary + ( + larger_json.clone(), + 2, // capacity + &larger_json, + Ok(27), + ), + // Simple JSON with LF + two layers of proxy + ( + [v1.clone(), v2.clone(), simple_json.clone()].concat(), + 45, // 3 bytes before v1 header ends + &simple_json, + Ok(82), + ), + // Simple JSON with LF + two layers of proxy + ( + [v1.clone(), v2.clone(), simple_json.clone()].concat(), + 46, // 2 bytes before v1 header ends + &simple_json, + Ok(82), + ), + // Capacity exactly on the last byte of v1 == parser error (library) + // TODO: make PR for upstream + // ( + // [v1.clone(), v2.clone(), simple_json.clone()].concat(), + // 47, // 1 bytes before v1 header ends (just befor \n) + // &simple_json, + // Ok(82), + // ), + // TODO: When the BufReader boundary hits in a v2 PROXY header it crashes + // ( + // [v1.clone(), v2.clone(), simple_json.clone()].concat(), + // 49, // 1 byte into v2 header + // &simple_json, + // Ok(82), + // ), + // TODO: make PR for upstream to bail when TLV is cut short + // ( + // [v1.clone(), v2.clone(), simple_json.clone()].concat(), + // 77, // 2 bytes short of v2 ending + // &simple_json, + // Ok(82), + // ), + ( + [v1.clone(), v2.clone(), larger_json.clone()].concat(), + 80, // 1 after v2 ends + &larger_json, + Ok(106), + ), + ]; + + for (input, capacity, bytes, count) in vectors { + let mut buf = BufReader::with_capacity(capacity, input.as_slice()); + let mut v = vec![]; + let result_count = read_until(&mut buf, b'\n', &mut v).map_err(|e| format!("{e}")); + assert_eq!(count, result_count); + assert_eq!(bytes, &v); + } + } } From 9f6506594d3546ec3ae212067193b595a96b24ac Mon Sep 17 00:00:00 2001 From: junderw Date: Thu, 12 Oct 2023 21:04:23 -0700 Subject: [PATCH 3/5] Unwind panics --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 133362f6..81e4e7b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,7 +73,7 @@ tempfile = "3.0" [profile.release] lto = true -panic = 'abort' +panic = 'unwind' # This is default, but required, so explicitly writing it codegen-units = 1 [patch.crates-io.electrum-client] From 071a72331dbe451fd7f365f27d1bd3ba776de5e0 Mon Sep 17 00:00:00 2001 From: junderw Date: Sat, 27 Jul 2024 15:32:36 +0900 Subject: [PATCH 4/5] Fix cargo fmt --- src/rest.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rest.rs b/src/rest.rs index 7abbe0d5..6191cb30 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -19,15 +19,15 @@ use bitcoin::hashes::Error as HashError; use hex::{self, FromHexError}; use hyper::service::{make_service_fn, service_fn}; use hyper::{header::HeaderValue, Body, HeaderMap, Method, Response, Server, StatusCode}; -use prometheus::{HistogramOpts, HistogramVec}; use itertools::Itertools; +use prometheus::{HistogramOpts, HistogramVec}; use tokio::sync::oneshot; use hyperlocal::UnixServerExt; use std::borrow::Cow; use std::cell::Cell; -use std::{cmp, fs}; use std::net::IpAddr; +use std::{cmp, fs}; #[cfg(feature = "liquid")] use { crate::elements::{peg::PegoutValue, AssetSorting, IssuanceValue}, From 0d12575047099ffd8f7518527ca09caa2d78c843 Mon Sep 17 00:00:00 2001 From: junderw Date: Wed, 4 Sep 2024 22:49:20 +0900 Subject: [PATCH 5/5] Fix: Upstream issues with rare panics --- Cargo.lock | 3 +-- Cargo.toml | 4 ++++ src/electrum/server.rs | 26 ++++++++++++-------------- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 19caf596..92184f3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1109,8 +1109,7 @@ checksum = "cf7e6d18738ecd0902d30d1ad232c9125985a3422929b16c65517b38adc14f96" [[package]] name = "proxy-protocol" version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e50c72c21c738f5c5f350cc33640aee30bf7cd20f9d9da20ed41bce2671d532" +source = "git+https://github.com/junderw/proxy-protocol?rev=5f5431ecdae75c7e8aba0f7aebcfc2e0102b70dc#5f5431ecdae75c7e8aba0f7aebcfc2e0102b70dc" dependencies = [ "bytes", "snafu", diff --git a/Cargo.toml b/Cargo.toml index 006361df..841a7065 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,3 +79,7 @@ codegen-units = 1 [patch.crates-io.electrum-client] git = "https://github.com/Blockstream/rust-electrum-client" rev = "d3792352992a539afffbe11501d1aff9fd5b919d" # add-peer branch + +[patch.crates-io.proxy-protocol] +git = "https://github.com/junderw/proxy-protocol" +rev = "5f5431ecdae75c7e8aba0f7aebcfc2e0102b70dc" # fix/eof-panics branch diff --git a/src/electrum/server.rs b/src/electrum/server.rs index 4d4db23f..31c1a772 100644 --- a/src/electrum/server.rs +++ b/src/electrum/server.rs @@ -1424,13 +1424,12 @@ mod tests { Ok(82), ), // Capacity exactly on the last byte of v1 == parser error (library) - // TODO: make PR for upstream - // ( - // [v1.clone(), v2.clone(), simple_json.clone()].concat(), - // 47, // 1 bytes before v1 header ends (just befor \n) - // &simple_json, - // Ok(82), - // ), + ( + [v1.clone(), v2.clone(), simple_json.clone()].concat(), + 47, // 1 bytes before v1 header ends (just befor \n) + &simple_json, + Ok(82), + ), // TODO: When the BufReader boundary hits in a v2 PROXY header it crashes // ( // [v1.clone(), v2.clone(), simple_json.clone()].concat(), @@ -1438,13 +1437,12 @@ mod tests { // &simple_json, // Ok(82), // ), - // TODO: make PR for upstream to bail when TLV is cut short - // ( - // [v1.clone(), v2.clone(), simple_json.clone()].concat(), - // 77, // 2 bytes short of v2 ending - // &simple_json, - // Ok(82), - // ), + ( + [v1.clone(), v2.clone(), simple_json.clone()].concat(), + 77, // 2 bytes short of v2 ending + &simple_json, + Ok(82), + ), ( [v1.clone(), v2.clone(), larger_json.clone()].concat(), 80, // 1 after v2 ends