Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions cephsum/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@



def get_from_metatdata(ioctx, path, xattr_name = "XrdCks.adler32", use_multithreading=False, mt_workers=1):
    """Try to get checksum info from metadata only.

    Returns the stored checksum object from the object's xattr, or None if no
    checksum metadata exists for this path.

    use_multithreading and mt_workers are accepted only so this action has the
    same signature as the file-reading actions; a metadata lookup never reads
    file data, so both are ignored here.
    """
    xrdcks = cephtools.cks_from_metadata(ioctx, path, xattr_name)
    logging.info(xrdcks)
    return xrdcks  # returns None if not existing

def get_from_file(ioctx, path, readsize, use_multithreading=False, mt_workers=1):
    """Try to get checksum info by reading and checksumming the file data only.

    readsize is the per-read buffer size passed through to cks_from_file.
    When use_multithreading is True, the read is fanned out over mt_workers
    threads (forwarded as cephtools' max_workers argument).

    Returns the computed checksum object, or None if it could not be computed.
    """
    xrdcks = cephtools.cks_from_file(ioctx, path, readsize, use_multithreading=use_multithreading, max_workers=mt_workers)
    logging.info(xrdcks)
    return xrdcks  # returns None if not existing

Expand All @@ -41,7 +41,7 @@ def get_checksum(ioctx, path, readsize, xattr_name = "XrdCks.adler32"):



def inget(ioctx, path, readsize, xattr_name = "XrdCks.adler32",rewriteto_littleendian=True):
def inget(ioctx, path, readsize, xattr_name = "XrdCks.adler32",rewriteto_littleendian=True, use_multithreading=False, mt_workers=1):
"""Return a checksum; if in metadata, just return that. If no metadata, obtain from file and store metadata.
If rewriteto_littleendian and metadata was stored in big endian; write it back as little endian
"""
Expand All @@ -57,7 +57,7 @@ def inget(ioctx, path, readsize, xattr_name = "XrdCks.adler32",rewriteto_littlee

if xrdcks is None:
source = 'file'
xrdcks = cephtools.cks_from_file(ioctx, path,readsize)
xrdcks = cephtools.cks_from_file(ioctx, path,readsize, use_multithreading=use_multithreading, max_workers=mt_workers)
if xrdcks is None:
logging.warning(f"No checksum possible for {path} from file")
return None
Expand All @@ -73,7 +73,7 @@ def inget(ioctx, path, readsize, xattr_name = "XrdCks.adler32",rewriteto_littlee
return xrdcks


def verify(ioctx, path, readsize, xattr_name = "XrdCks.adler32", force_fileread=False):
def verify(ioctx, path, readsize, xattr_name = "XrdCks.adler32", force_fileread=False, use_multithreading=False, mt_workers=1):
"""compare the stored checksum against the file-computed value.
If no stored metadata, still compute file (if requested), but compare as false.
"""
Expand All @@ -85,7 +85,7 @@ def verify(ioctx, path, readsize, xattr_name = "XrdCks.adler32", force_fileread=
if xrdcks_stored is None and not force_fileread:
xrdcks_file = None
else:
xrdcks_file = cephtools.cks_from_file(ioctx, path,readsize)
xrdcks_file = cephtools.cks_from_file(ioctx, path,readsize, use_multithreading=use_multithreading, max_workers=mt_workers)

if xrdcks_stored is None:
matching = False
Expand Down
20 changes: 12 additions & 8 deletions cephsum/adler32.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,18 @@ def calc_checksum(self,buffer):
value = 1 # initilising value
bytes_read = 0
counter = 0
for buf in buffer:
# need to consider intra-file chunks
value = zlib.adler32( buf, value)
bytes_read += len(buf)
counter += 1
if self.log_each_step:
logging.debug('%s: %s %s %s' % (self.name, self.adler32_inttohex(value), len(buf), bytes_read) )

try:
for buf in buffer:
# need to consider intra-file chunks
value = zlib.adler32( buf, value)
bytes_read += len(buf)
counter += 1
if self.log_each_step:
logging.debug('%s: %s %s %s' % (self.name, self.adler32_inttohex(value), len(buf), bytes_read) )
except StopIteration:
logging.debug(f"Got Stopiteration after: {bytes_read} bytes")


self.value = self.adler32_inttohex(value)
self.bytes_read = bytes_read
self.number_buffers = counter
Expand Down
16 changes: 12 additions & 4 deletions cephsum/cephsum.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ def convert_path(path, xmlfile=None):
parser.add_argument('--cephuser',default='client.xrootd', dest='ceph_user',
help='ceph user name for the client keyring')

parser.add_argument('--mt',default=False,help='Use multithreading',action='store_true')
parser.add_argument('--workers',default=1,type=int,help='If multithreading, specify the max number of workers')



# actual path to use, as a positional argument; only one allowed
parser.add_argument('path', nargs=1)
Expand Down Expand Up @@ -136,6 +140,10 @@ def convert_path(path, xmlfile=None):
if args.action == 'check' and source_checksum is None:
raise ValueError("Need --type|-C in form adler32:<checksum> for 'check' action with source checksum value")


use_multithreading = args.mt
mt_workers = args.workers

xrdcks,adler = None,None
timestart = datetime.now()

Expand All @@ -146,15 +154,15 @@ def convert_path(path, xmlfile=None):
try:
with cluster.open_ioctx(pool) as ioctx:
if args.action in ['inget','check']:
xrdcks = actions.inget(ioctx,path,readsize,xattr_name)
xrdcks = actions.inget(ioctx,path,readsize,xattr_name, use_multithreading=use_multithreading, mt_workers=mt_workers)
elif args.action == 'verify':
xrdcks = actions.verify(ioctx,path,readsize,xattr_name)
xrdcks = actions.verify(ioctx,path,readsize,xattr_name, use_multithreading=use_multithreading, mt_workers=mt_workers)
elif args.action == 'get':
xrdcks = actions.get_checksum(ioctx,path,readsize, xattr_name)
xrdcks = actions.get_checksum(ioctx,path,readsize, xattr_name, use_multithreading=use_multithreading, mt_workers=mt_workers)
elif args.action == 'metaonly':
xrdcks = actions.get_from_metatdata(ioctx,path,xattr_name)
elif args.action == 'fileonly':
xrdcks = actions.get_from_file(ioctx,path, readsize)
xrdcks = actions.get_from_file(ioctx,path, readsize, use_multithreading=use_multithreading, mt_workers=mt_workers)
else:
logging.warning(f'Action {args.action} is not implemented')
raise NotImplementedError(f'Action {args.action} is not implemented')
Expand Down
79 changes: 74 additions & 5 deletions cephsum/cephtools.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
import XrdCks,adler32
import rados

import queue
from concurrent.futures import ThreadPoolExecutor, CancelledError, TimeoutError


chunk0=f'.{0:016x}' # Chunks are hex valued
nZeros=16

Expand Down Expand Up @@ -53,9 +57,9 @@ def get_chunks(ioctx,path, stripe_count=None):
while True:
try:
oid = path+f'.{counter:016x}' # chunks are hex encoded
ioctx.stat(oid)
size, ts = ioctx.stat(oid)
#logging.debug(oid)
yield oid
yield oid, size
except rados.ObjectNotFound:
raise StopIteration
counter += 1
Expand Down Expand Up @@ -105,12 +109,74 @@ def read_file_btyes(ioctx, path, stripe_size_bytes=None, number_of_stripes=None,
if stripe_size_bytes is None, will use READSIZE and read each stripe for all data.
if stripe_size_bytes is given, will assume each chunk is the given size.
"""
for oid in get_chunks(ioctx, path, number_of_stripes):
for oid, _ in get_chunks(ioctx, path, number_of_stripes):
for buffer in read_oid_bytes(ioctx, oid, stripe_size_bytes, readsize=readsize):
yield buffer
# Sanity stop statement at end.
raise StopIteration

# Module-level cancellation flag: set True by the consumer to make queued
# worker tasks return immediately without issuing their read.
stop_all_workers = False

def readio_mt(ioctx, oid: str, extent: slice) -> tuple:
    """Read one extent of one rados object; worker body for the thread pool.

    Returns a (oid, extent, buffer) triple so the consumer can match results
    back to submitted tasks. buffer is None when the task was cancelled via
    the stop_all_workers flag before it ran.

    NOTE: the original annotation claimed `-> bytearray`, but the function has
    always returned a 3-tuple; the annotation is corrected here.
    """
    global stop_all_workers
    if stop_all_workers:  # cancellation signal — skip the read entirely
        logging.debug(f'Got cancellation for thread: {oid}: {extent}')
        return oid, extent, None
    buf = ioctx.read(oid, length = extent.stop - extent.start, offset = extent.start)
    return oid, extent, buf

def read_file_btyes_multithreaded(ioctx, path, stripe_size_bytes=None, number_of_stripes=None,readsize=64*1024*1024, max_workers=1):
    """Yield all bytes in a file, looping over chunks, using a multi-threaded thread pool.

    Reads are submitted to a ThreadPoolExecutor one extent at a time, but the
    results are consumed in submission order, so the bytes are yielded as a
    sequential stream suitable for checksumming.

    stripe_size_bytes must be the byte size of each chunk and number_of_stripes
    the chunk count; readsize must evenly tile a chunk.

    Raises concurrent.futures.TimeoutError if any single read takes more than
    120 s; in that case (and when a zero-length/cancelled read is seen) the
    module-level stop_all_workers flag is set so queued workers abort quickly.
    """
    # queue.deque is an undocumented re-export inside the queue module;
    # import the real thing from collections instead.
    from collections import deque

    assert stripe_size_bytes is not None  # must know the number of bytes per chunk here
    assert number_of_stripes is not None  # must know the number of chunks here
    assert readsize <= stripe_size_bytes  # only read at most the number of bytes in a chunk
    assert stripe_size_bytes % readsize == 0  # reads must tile a chunk exactly

    global stop_all_workers
    # Reset the shared flag: a previous run that timed out / hit EOF would
    # otherwise leave it True and cancel every task of this run immediately.
    stop_all_workers = False

    futures = deque()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit one read task per (chunk, extent); the deque preserves file order.
        # get_chunks does a stat on each object to check for existence
        # (NOTE(review): a file deleted mid-read surfaces as an error there).
        for oid, chunk_size in get_chunks(ioctx, path, number_of_stripes):
            # Build the slices covering this chunk, clamped to its actual length.
            for extent in map(lambda offset: slice(offset, min(chunk_size, offset + readsize), None), range(0, min(chunk_size, stripe_size_bytes), readsize)):
                futures.append(executor.submit(readio_mt, ioctx, oid, extent))

        # Consume results in submission order so the byte stream is sequential.
        while futures:
            f = futures.popleft()
            try:
                oid, extent, buf = f.result(timeout=120)  # the buffer of bytes
                logging.debug(f"Result for {oid} {extent} {'None' if buf is None else len(buf)}")
            except TimeoutError:
                logging.error("Reached read timeout; aborting")
                stop_all_workers = True
                raise  # re-raise the original exception, preserving its traceback
            except CancelledError:
                logging.debug(f'Got Cancelled error; aborted the remaining tasks')
                # f.result() raised, so no values were unpacked; treat this
                # like a cancelled read so the stop-all path below runs.
                # (Previously buf could be unbound or stale here.)
                buf = None

            if buf is None or len(buf) == 0:
                # A problem, or we're at the end; tell remaining workers to stop,
                # but keep popping futures so the executor can drain cleanly.
                stop_all_workers = True
                logging.debug(f"Got 0 len array; stop all workers")
                continue  # don't yield below, but make sure we pop all the futures
            yield buf



Expand Down Expand Up @@ -234,7 +300,7 @@ def cks_write_metadata(ioctx, path, xattr_name, xattr_value, force_overwrite=Fal



def cks_from_file(ioctx, path, readsize):
def cks_from_file(ioctx, path, readsize, use_multithreading=False, max_workers=1):
"""Calculate checksum from path. Returns None or checksum object
Raise error if not existing"""

Expand All @@ -257,7 +323,10 @@ def cks_from_file(ioctx, path, readsize):

try:
cks_alg = adler32.adler32('adler32')
cks_hex = cks_alg.calc_checksum( read_file_btyes(ioctx, path, rados_object_size, num_stripes,readsize) )
if use_multithreading:
cks_hex = cks_alg.calc_checksum( read_file_btyes_multithreaded(ioctx, path, rados_object_size, num_stripes,readsize, max_workers) )
else:
cks_hex = cks_alg.calc_checksum( read_file_btyes(ioctx, path, rados_object_size, num_stripes,readsize) )
bytes_read = cks_alg.bytes_read
except Exception as e:
raise e
Expand Down