diff --git a/cephsum/actions.py b/cephsum/actions.py index 184f291..405875f 100644 --- a/cephsum/actions.py +++ b/cephsum/actions.py @@ -13,17 +13,17 @@ -def get_from_metatdata(ioctx, path, xattr_name = "XrdCks.adler32"): +def get_from_metatdata(ioctx, path, xattr_name = "XrdCks.adler32", use_multithreading=False, mt_workers=1): """Try to get checksum info from metadata only. """ xrdcks = cephtools.cks_from_metadata(ioctx,path,xattr_name) logging.info(xrdcks) return xrdcks # returns None if not existing -def get_from_file(ioctx, path, readsize): +def get_from_file(ioctx, path, readsize,use_multithreading=False, mt_workers=1): """Try to get checksum info from file only. """ - xrdcks = cephtools.cks_from_file(ioctx,path,readsize) + xrdcks = cephtools.cks_from_file(ioctx,path,readsize, use_multithreading=use_multithreading, max_workers=mt_workers) logging.info(xrdcks) return xrdcks # returns None if not existing @@ -41,7 +41,7 @@ def get_checksum(ioctx, path, readsize, xattr_name = "XrdCks.adler32"): -def inget(ioctx, path, readsize, xattr_name = "XrdCks.adler32",rewriteto_littleendian=True): +def inget(ioctx, path, readsize, xattr_name = "XrdCks.adler32",rewriteto_littleendian=True, use_multithreading=False, mt_workers=1): """Return a checksum; if in metadata, just return that. If no metadata, obtain from file and store metadata. 
If rewriteto_littleendian and metadata was stored in big endian; write it back as little endian """ @@ -57,7 +57,7 @@ def inget(ioctx, path, readsize, xattr_name = "XrdCks.adler32",rewriteto_littlee if xrdcks is None: source = 'file' - xrdcks = cephtools.cks_from_file(ioctx, path,readsize) + xrdcks = cephtools.cks_from_file(ioctx, path,readsize, use_multithreading=use_multithreading, max_workers=mt_workers) if xrdcks is None: logging.warning(f"No checksum possible for {path} from file") return None @@ -73,7 +73,7 @@ def inget(ioctx, path, readsize, xattr_name = "XrdCks.adler32",rewriteto_littlee return xrdcks -def verify(ioctx, path, readsize, xattr_name = "XrdCks.adler32", force_fileread=False): +def verify(ioctx, path, readsize, xattr_name = "XrdCks.adler32", force_fileread=False, use_multithreading=False, mt_workers=1): """compare the stored checksum against the file-computed value. If no stored metadata, still compute file (if requested), but compare as false. """ @@ -85,7 +85,7 @@ def verify(ioctx, path, readsize, xattr_name = "XrdCks.adler32", force_fileread= if xrdcks_stored is None and not force_fileread: xrdcks_file = None else: - xrdcks_file = cephtools.cks_from_file(ioctx, path,readsize) + xrdcks_file = cephtools.cks_from_file(ioctx, path,readsize, use_multithreading=use_multithreading, max_workers=mt_workers) if xrdcks_stored is None: matching = False diff --git a/cephsum/adler32.py b/cephsum/adler32.py index c6a95b5..a5683fb 100644 --- a/cephsum/adler32.py +++ b/cephsum/adler32.py @@ -44,14 +44,18 @@ def calc_checksum(self,buffer): value = 1 # initilising value bytes_read = 0 counter = 0 - for buf in buffer: - # need to consider intra-file chunks - value = zlib.adler32( buf, value) - bytes_read += len(buf) - counter += 1 - if self.log_each_step: - logging.debug('%s: %s %s %s' % (self.name, self.adler32_inttohex(value), len(buf), bytes_read) ) - + try: + for buf in buffer: + # need to consider intra-file chunks + value = zlib.adler32( buf, value) + 
bytes_read += len(buf) + counter += 1 + if self.log_each_step: + logging.debug('%s: %s %s %s' % (self.name, self.adler32_inttohex(value), len(buf), bytes_read) ) + except StopIteration: + logging.debug(f"Got Stopiteration after: {bytes_read} bytes") + + self.value = self.adler32_inttohex(value) self.bytes_read = bytes_read self.number_buffers = counter diff --git a/cephsum/cephsum.py b/cephsum/cephsum.py index 6e6fcd6..2358907 100644 --- a/cephsum/cephsum.py +++ b/cephsum/cephsum.py @@ -88,6 +88,10 @@ def convert_path(path, xmlfile=None): parser.add_argument('--cephuser',default='client.xrootd', dest='ceph_user', help='ceph user name for the client keyring') + parser.add_argument('--mt',default=False,help='Use multithreading',action='store_true') + parser.add_argument('--workers',default=1,type=int,help='If multithreading, specify the max number of workers') + + # actual path to use, as a positional argument; only one allowed parser.add_argument('path', nargs=1) @@ -136,6 +140,10 @@ def convert_path(path, xmlfile=None): if args.action == 'check' and source_checksum is None: raise ValueError("Need --type|-C in form adler32: for 'check' action with source checksum value") + + use_multithreading = args.mt + mt_workers = args.workers + xrdcks,adler = None,None timestart = datetime.now() @@ -146,15 +154,15 @@ def convert_path(path, xmlfile=None): try: with cluster.open_ioctx(pool) as ioctx: if args.action in ['inget','check']: - xrdcks = actions.inget(ioctx,path,readsize,xattr_name) + xrdcks = actions.inget(ioctx,path,readsize,xattr_name, use_multithreading=use_multithreading, mt_workers=mt_workers) elif args.action == 'verify': - xrdcks = actions.verify(ioctx,path,readsize,xattr_name) + xrdcks = actions.verify(ioctx,path,readsize,xattr_name, use_multithreading=use_multithreading, mt_workers=mt_workers) elif args.action == 'get': - xrdcks = actions.get_checksum(ioctx,path,readsize, xattr_name) + xrdcks = actions.get_checksum(ioctx,path,readsize, xattr_name, 
use_multithreading=use_multithreading, mt_workers=mt_workers) elif args.action == 'metaonly': xrdcks = actions.get_from_metatdata(ioctx,path,xattr_name) elif args.action == 'fileonly': - xrdcks = actions.get_from_file(ioctx,path, readsize) + xrdcks = actions.get_from_file(ioctx,path, readsize, use_multithreading=use_multithreading, mt_workers=mt_workers) else: logging.warning(f'Action {args.action} is not implemented') raise NotImplementedError(f'Action {args.action} is not implemented') diff --git a/cephsum/cephtools.py b/cephsum/cephtools.py index 155b364..d0b0e31 100644 --- a/cephsum/cephtools.py +++ b/cephsum/cephtools.py @@ -5,6 +5,10 @@ import XrdCks,adler32 import rados +import queue +from concurrent.futures import ThreadPoolExecutor, CancelledError, TimeoutError + + chunk0=f'.{0:016x}' # Chunks are hex valued nZeros=16 @@ -53,9 +57,9 @@ def get_chunks(ioctx,path, stripe_count=None): while True: try: oid = path+f'.{counter:016x}' # chunks are hex encoded - ioctx.stat(oid) + size, ts = ioctx.stat(oid) #logging.debug(oid) - yield oid + yield oid, size except rados.ObjectNotFound: raise StopIteration counter += 1 @@ -105,12 +109,74 @@ def read_file_btyes(ioctx, path, stripe_size_bytes=None, number_of_stripes=None, if stripe_size_bytes is None, will use READSIZE and read each stripe for all data. if stripe_size_bytes is given, will assume each chunk is the given size. """ - for oid in get_chunks(ioctx, path, number_of_stripes): + for oid, _ in get_chunks(ioctx, path, number_of_stripes): for buffer in read_oid_bytes(ioctx, oid, stripe_size_bytes, readsize=readsize): yield buffer # Sanity stop statement at end. 
raise StopIteration +stop_all_workers = False + +def readio_mt(ioctx, oid: str, extent: slice) -> bytearray: + global stop_all_workers + if stop_all_workers: # cancellation signal + logging.debug(f'Got cancellation for thread: {oid}: {extent}') + return oid, extent, None + buf = ioctx.read(oid, length = extent.stop - extent.start, offset = extent.start) + return oid,extent, buf + +def read_file_btyes_multithreaded(ioctx, path, stripe_size_bytes=None, number_of_stripes=None,readsize=64*1024*1024, max_workers=1): + """Yield all bytes in a file, looping over chunks, and then bytes within the file using a multi-threaded threadpool + + if stripe_size_bytes is None, will use READSIZE and read each stripe for all data. + if stripe_size_bytes is given, will assume each chunk is the given size. + """ + assert stripe_size_bytes is not None # must know the number of bytes per chunk here + assert number_of_stripes is not None # must know the number of chunks here + assert readsize <= stripe_size_bytes # only read at most the number of bytes in a chunk + assert stripe_size_bytes % readsize == 0 # + + futures = queue.deque() + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # load up the deque with all the slices + # get_chunks does a stat on each file to check for existence (what if the file is deleted mid read?)
+ for oid, chunk_size in get_chunks(ioctx, path, number_of_stripes): + # create an iterable of slices corresponding to the reads within each chunk + # only allow to read to the length of the chunk + for extent in map(lambda offset: slice(offset,min(chunk_size, offset+readsize),None), range(0, min(chunk_size, stripe_size_bytes), readsize)): + futures.append(executor.submit(readio_mt, ioctx, oid, extent) ) + + # wait for the ordered futures to complete: + global stop_all_workers + while True: + try: + f = futures.popleft() + except IndexError: # no more entries + break + try: + oid, extent, buf = f.result(timeout=120) # return the buffer of bytes + logging.debug(f"Result for {oid} {extent} {'None' if buf is None else len(buf)}") + except TimeoutError: + logging.error("Reached read timeout; aborting") + stop_all_workers = True + raise TimeoutError() + + + except CancelledError: + logging.debug(f'Got Cancelled error; aborted the remaining tasks') + + if buf is None or len(buf) == 0: + # a problem or we're at the end; cancel the others + # global stop_all_workers + stop_all_workers = True + logging.debug(f"Got 0 len array; stop all workers") + + continue # don't yield below, but make sure we pop all the futures + yield buf + + #raise StopIteration + ## Sanity stop statement at end. + @@ -234,7 +300,7 @@ def cks_write_metadata(ioctx, path, xattr_name, xattr_value, force_overwrite=Fal -def cks_from_file(ioctx, path, readsize): +def cks_from_file(ioctx, path, readsize, use_multithreading=False, max_workers=1): """Calculate checksum from path.
Returns None or checksum object Raise error if not existing""" @@ -257,7 +323,10 @@ def cks_from_file(ioctx, path, readsize): try: cks_alg = adler32.adler32('adler32') - cks_hex = cks_alg.calc_checksum( read_file_btyes(ioctx, path, rados_object_size, num_stripes,readsize) ) + if use_multithreading: + cks_hex = cks_alg.calc_checksum( read_file_btyes_multithreaded(ioctx, path, rados_object_size, num_stripes,readsize, max_workers) ) + else: + cks_hex = cks_alg.calc_checksum( read_file_btyes(ioctx, path, rados_object_size, num_stripes,readsize) ) bytes_read = cks_alg.bytes_read except Exception as e: raise e