|
14 | 14 | # |
15 | 15 |
|
16 | 16 | from abc import ABC, abstractmethod |
17 | | -from contextlib import contextmanager |
| 17 | +from contextlib import contextmanager, nullcontext |
18 | 18 | from datetime import datetime |
19 | 19 | from functools import partial |
20 | 20 | from weakref import WeakSet |
21 | 21 | from shlex import quote |
22 | | -from time import monotonic |
23 | 22 | import os |
24 | 23 | import signal |
25 | 24 | import socket |
@@ -61,12 +60,26 @@ class ConnectionBase(InitCheckpoint): |
61 | 60 | """ |
62 | 61 | Base class for all connections. |
63 | 62 | """ |
64 | | - def __init__(self): |
| 63 | + def __init__( |
| 64 | + self, |
| 65 | + poll_transfers=False, |
| 66 | + start_transfer_poll_delay=30, |
| 67 | + total_transfer_timeout=3600, |
| 68 | + transfer_poll_period=30, |
| 69 | + ): |
65 | 70 | self._current_bg_cmds = WeakSet() |
66 | 71 | self._closed = False |
67 | 72 | self._close_lock = threading.Lock() |
68 | 73 | self.busybox = None |
69 | 74 |
|
| 75 | + self.transfer_mgr = TransferManager( |
| 76 | + self, |
| 77 | + start_transfer_poll_delay=start_transfer_poll_delay, |
| 78 | + total_transfer_timeout=total_transfer_timeout, |
| 79 | + transfer_poll_period=transfer_poll_period, |
| 80 | + ) if poll_transfers else NoopTransferManager() |
| 81 | + |
| 82 | + |
70 | 83 | def cancel_running_command(self): |
71 | 84 | bg_cmds = set(self._current_bg_cmds) |
72 | 85 | for bg_cmd in bg_cmds: |
@@ -500,161 +513,147 @@ def __exit__(self, *args, **kwargs): |
500 | 513 | self.adb_popen.__exit__(*args, **kwargs) |
501 | 514 |
|
502 | 515 |
|
503 | | -class TransferManagerBase(ABC): |
504 | | - |
505 | | - def _pull_dest_size(self, dest): |
506 | | - if os.path.isdir(dest): |
507 | | - return sum( |
508 | | - os.stat(os.path.join(dirpath, f)).st_size |
509 | | - for dirpath, _, fnames in os.walk(dest) |
510 | | - for f in fnames |
511 | | - ) |
512 | | - else: |
513 | | - return os.stat(dest).st_size |
514 | | - |
515 | | - def _push_dest_size(self, dest): |
516 | | - cmd = '{} du -s {}'.format(quote(self.conn.busybox), quote(dest)) |
517 | | - out = self.conn.execute(cmd) |
518 | | - try: |
519 | | - return int(out.split()[0]) |
520 | | - except ValueError: |
521 | | - return 0 |
522 | | - |
523 | | - def __init__(self, conn, poll_period, start_transfer_poll_delay, total_timeout): |
| 516 | +class TransferManager: |
| 517 | + def __init__(self, conn, poll_period=30, start_transfer_poll_delay=30, total_timeout=3600): |
524 | 518 | self.conn = conn |
525 | 519 | self.poll_period = poll_period |
526 | 520 | self.total_timeout = total_timeout |
527 | 521 | self.start_transfer_poll_delay = start_transfer_poll_delay |
528 | 522 |
|
529 | 523 | self.logger = logging.getLogger('FileTransfer') |
530 | | - self.managing = threading.Event() |
531 | | - self.transfer_started = threading.Event() |
532 | | - self.transfer_completed = threading.Event() |
533 | | - self.transfer_aborted = threading.Event() |
534 | 524 |
|
535 | | - self.monitor_thread = None |
536 | | - self.sources = None |
537 | | - self.dest = None |
538 | | - self.direction = None |
| 525 | + @contextmanager |
| 526 | + def manage(self, sources, dest, direction, handle): |
| 527 | + excep = None |
| 528 | + stop_thread = threading.Event() |
| 529 | + |
| 530 | + def monitor(): |
| 531 | + nonlocal excep |
| 532 | + |
| 533 | + def cancel(reason): |
| 534 | + self.logger.warning( |
| 535 | + f'Cancelling file transfer {sources} -> {dest} due to: {reason}' |
| 536 | + ) |
| 537 | + handle.cancel() |
| 538 | + |
| 539 | + start_t = time.monotonic() |
| 540 | + stop_thread.wait(self.start_transfer_poll_delay) |
| 541 | + while not stop_thread.wait(self.poll_period): |
| 542 | + if not handle.isactive(): |
| 543 | + cancel(reason='transfer inactive') |
| 544 | + elif monotonic() - start_t > self.total_timeout: |
| 545 | + cancel(reason='transfer timed out') |
| 546 | + excep = TimeoutError(f'{direction}: {sources} -> {dest}') |
| 547 | + |
| 548 | + m_thread = threading.Thread(target=monitor, daemon=True) |
| 549 | + try: |
| 550 | + m_thread.start() |
| 551 | + yield self |
| 552 | + finally: |
| 553 | + stop_thread.set() |
| 554 | + m_thread.join() |
| 555 | + if excep is not None: |
| 556 | + raise excep |
539 | 557 |
|
540 | | - @abstractmethod |
541 | | - def _cancel(self): |
542 | | - pass |
543 | 558 |
|
544 | | - def cancel(self, reason=None): |
545 | | - msg = 'Cancelling file transfer {} -> {}'.format(self.sources, self.dest) |
546 | | - if reason is not None: |
547 | | - msg += ' due to \'{}\''.format(reason) |
548 | | - self.logger.warning(msg) |
549 | | - self.transfer_aborted.set() |
550 | | - self._cancel() |
| 559 | +class NoopTransferManager: |
| 560 | + def manage(self, *args, **kwargs): |
| 561 | + return nullcontext(self) |
| 562 | + |
| 563 | + |
| 564 | +class TransferHandleBase(ABC): |
| 565 | + def __init__(self, mgr): |
| 566 | + self.mgr = mgr |
551 | 567 |
|
552 | 568 | @abstractmethod |
553 | 569 | def isactive(self): |
554 | 570 | pass |
555 | 571 |
|
556 | | - @contextmanager |
557 | | - def manage(self, sources, dest, direction): |
558 | | - try: |
559 | | - self.sources, self.dest, self.direction = sources, dest, direction |
560 | | - m_thread = threading.Thread(target=self._monitor) |
| 572 | + @abstractmethod |
| 573 | + def cancel(self): |
| 574 | + pass |
561 | 575 |
|
562 | | - self.transfer_completed.clear() |
563 | | - self.transfer_aborted.clear() |
564 | | - self.transfer_started.set() |
565 | 576 |
|
566 | | - m_thread.start() |
567 | | - yield self |
568 | | - except BaseException: |
569 | | - self.cancel(reason='exception during transfer') |
570 | | - raise |
571 | | - finally: |
572 | | - self.transfer_completed.set() |
573 | | - self.transfer_started.set() |
574 | | - m_thread.join() |
575 | | - self.transfer_started.clear() |
576 | | - self.transfer_completed.clear() |
577 | | - self.transfer_aborted.clear() |
| 577 | +class PopenTransferHandle(TransferHandleBase): |
| 578 | + def __init__(self, bg_cmd, dest, direction, *args, **kwargs): |
| 579 | + super().__init__(*args, **kwargs) |
578 | 580 |
|
579 | | - def _monitor(self): |
580 | | - start_t = monotonic() |
581 | | - self.transfer_completed.wait(self.start_transfer_poll_delay) |
582 | | - while not self.transfer_completed.wait(self.poll_period): |
583 | | - if not self.isactive(): |
584 | | - self.cancel(reason='transfer inactive') |
585 | | - elif monotonic() - start_t > self.total_timeout: |
586 | | - self.cancel(reason='transfer timed out') |
| 581 | + if direction == 'push': |
| 582 | + sample_size = self._push_dest_size |
| 583 | + elif direction == 'pull': |
| 584 | + sample_size = self._pull_dest_size |
| 585 | + else: |
| 586 | + raise ValueError(f'Unknown direction: {direction}') |
587 | 587 |
|
| 588 | + self.sample_size = lambda: sample_size(dest) |
588 | 589 |
|
589 | | -class PopenTransferManager(TransferManagerBase): |
| 590 | + self.bg_cmd = bg_cmd |
| 591 | + self.last_sample = 0 |
590 | 592 |
|
591 | | - def __init__(self, conn, poll_period=30, start_transfer_poll_delay=30, total_timeout=3600): |
592 | | - super().__init__(conn, poll_period, start_transfer_poll_delay, total_timeout) |
593 | | - self.transfer = None |
594 | | - self.last_sample = None |
| 593 | + @staticmethod |
| 594 | + def _pull_dest_size(dest): |
| 595 | + if os.path.isdir(dest): |
| 596 | + return sum( |
| 597 | + os.stat(os.path.join(dirpath, f)).st_size |
| 598 | + for dirpath, _, fnames in os.walk(dest) |
| 599 | + for f in fnames |
| 600 | + ) |
| 601 | + else: |
| 602 | + return os.stat(dest).st_size |
595 | 603 |
|
596 | | - def _cancel(self): |
597 | | - if self.transfer: |
598 | | - self.transfer.cancel() |
599 | | - self.transfer = None |
600 | | - self.last_sample = None |
| 604 | + def _push_dest_size(self, dest): |
| 605 | + conn = self.mgr.conn |
| 606 | + cmd = '{} du -s -- {}'.format(quote(conn.busybox), quote(dest)) |
| 607 | + out = conn.execute(cmd) |
| 608 | + return int(out.split()[0]) |
| 609 | + |
| 610 | + def cancel(self): |
| 611 | + self.bg_cmd.cancel() |
601 | 612 |
|
602 | 613 | def isactive(self): |
603 | | - size_fn = self._push_dest_size if self.direction == 'push' else self._pull_dest_size |
604 | | - curr_size = size_fn(self.dest) |
605 | | - self.logger.debug('Polled file transfer, destination size {}'.format(curr_size)) |
606 | | - active = True if self.last_sample is None else curr_size > self.last_sample |
607 | | - self.last_sample = curr_size |
608 | | - return active |
| 614 | + try: |
| 615 | + curr_size = self.sample_size() |
| 616 | + except Exception as e: |
| 617 | + self.logger.debug(f'File size polling failed: {e}') |
| 618 | + return True |
| 619 | + else: |
| 620 | + self.logger.debug(f'Polled file transfer, destination size: {curr_size}') |
| 621 | + if curr_size: |
| 622 | + active = curr_size > self.last_sample |
| 623 | + self.last_sample = curr_size |
| 624 | + return active |
| 625 | + # If the file is empty it will never grow in size, so we assume |
| 626 | + # everything is going well. |
| 627 | + else: |
| 628 | + return True |
609 | 629 |
|
610 | | - def set_transfer_and_wait(self, popen_bg_cmd): |
611 | | - self.transfer = popen_bg_cmd |
612 | | - self.last_sample = None |
613 | | - ret = self.transfer.wait() |
614 | 630 |
|
615 | | - if ret and not self.transfer_aborted.is_set(): |
616 | | - raise subprocess.CalledProcessError(ret, self.transfer.popen.args) |
617 | | - elif self.transfer_aborted.is_set(): |
618 | | - raise TimeoutError(self.transfer.popen.args) |
| 631 | +class SSHTransferHandle(TransferHandleBase): |
619 | 632 |
|
| 633 | + def __init__(self, handle, *args, **kwargs): |
| 634 | + super().__init__(*args, **kwargs) |
620 | 635 |
|
621 | | -class SSHTransferManager(TransferManagerBase): |
| 636 | + # SFTPClient or SSHClient |
| 637 | + self.handle = handle |
622 | 638 |
|
623 | | - def __init__(self, conn, poll_period=30, start_transfer_poll_delay=30, total_timeout=3600): |
624 | | - super().__init__(conn, poll_period, start_transfer_poll_delay, total_timeout) |
625 | | - self.transferer = None |
626 | 639 | self.progressed = False |
627 | | - self.transferred = None |
628 | | - self.to_transfer = None |
| 640 | + self.transferred = 0 |
| 641 | + self.to_transfer = 0 |
629 | 642 |
|
630 | | - def _cancel(self): |
631 | | - self.transferer.close() |
| 643 | + def cancel(self): |
| 644 | + self.handle.close() |
632 | 645 |
|
633 | 646 | def isactive(self): |
634 | 647 | progressed = self.progressed |
635 | | - self.progressed = False |
636 | | - msg = 'Polled transfer: {}% [{}B/{}B]' |
637 | | - pc = format((self.transferred / self.to_transfer) * 100, '.2f') |
638 | | - self.logger.debug(msg.format(pc, self.transferred, self.to_transfer)) |
| 648 | + if progressed: |
| 649 | + self.progressed = False |
| 650 | + pc = self.transferred / self.to_transfer * 100 |
| 651 | + self.logger.debug( |
| 652 | + f'Polled transfer: {pc:.2f}% [{self.transferred}B/{self.to_transfer}B]' |
| 653 | + ) |
639 | 654 | return progressed |
640 | 655 |
|
641 | | - @contextmanager |
642 | | - def manage(self, sources, dest, direction, transferer): |
643 | | - with super().manage(sources, dest, direction): |
644 | | - try: |
645 | | - self.progressed = False |
646 | | - self.transferer = transferer # SFTPClient or SCPClient |
647 | | - yield self |
648 | | - except socket.error as e: |
649 | | - if self.transfer_aborted.is_set(): |
650 | | - self.transfer_aborted.clear() |
651 | | - method = 'SCP' if self.conn.use_scp else 'SFTP' |
652 | | - raise TimeoutError('{} {}: {} -> {}'.format(method, self.direction, sources, self.dest)) |
653 | | - else: |
654 | | - raise e |
655 | | - |
656 | 656 | def progress_cb(self, to_transfer, transferred): |
657 | | - if self.transfer_started.is_set(): |
658 | | - self.progressed = True |
659 | | - self.transferred = transferred |
660 | | - self.to_transfer = to_transfer |
| 657 | + self.progressed = True |
| 658 | + self.transferred = transferred |
| 659 | + self.to_transfer = to_transfer |
0 commit comments