Merged
101 changes: 49 additions & 52 deletions bin/stashcp
@@ -3,16 +3,12 @@
import optparse
import sys
import subprocess
import datetime
import time
import re
import os
import json
import multiprocessing
import urllib2
import threading
import math
import socket
import random
import shutil

@@ -29,24 +25,21 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):

logging.debug("Checking size of file.")
xrdfs = subprocess.Popen(["xrdfs", "root://stash.osgconnect.net", "stat", sourceFile], stdout=subprocess.PIPE).communicate()[0]
xrdcp_version=subprocess.Popen(['echo $(xrdcp -V 2>&1)'],stdout=subprocess.PIPE,shell=True).communicate()[0][:-1]
fileSize=int(re.findall(r"Size: \d+",xrdfs)[0].split(": ")[1])
logging.debug("Size of the file %s is %i" % (sourceFile, fileSize))
xrdcp_version = subprocess.Popen(['echo $(xrdcp -V 2>&1)'], stdout=subprocess.PIPE, shell=True).communicate()[0][:-1]
fileSize = int(re.findall(r"Size: \d+", xrdfs)[0].split(": ")[1])
logging.debug("Size of the file %s is %i", sourceFile, str(fileSize))
#cache=get_best_stashcache()
logging.debug("Using Cache %s" % cache)
logging.debug("Using Cache %s", cache)

try:
sitename=os.environ['OSG_SITE_NAME']
except:
sitename="siteNotFound"
sitename = os.environ.setdefault("OSG_SITE_NAME", "siteNotFound")

# Fill out the payload as much as possible
filename=destination+'/'+sourceFile.split('/')[-1]
payload={}
payload['xrdcp_version']=xrdcp_version
payload['filesize']=fileSize
payload['filename']=sourceFile
payload['sitename']=sitename
filename = destination + '/' + sourceFile.split('/')[-1]
payload = {}
payload['xrdcp_version'] = xrdcp_version
payload['filesize'] = fileSize
payload['filename'] = sourceFile
payload['sitename'] = sitename

# Calculate the starting time
start1 = int(time.time()*1000)
@@ -57,7 +50,7 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
cvmfs_file = os.path.join("/cvmfs/stash.osgstorage.org/", sourceFile[1:])
else:
cvmfs_file = os.path.join("/cvmfs/stash.osgstorage.org/", sourceFile)
logging.debug("Checking if the CVMFS file exists: %s" % cvmfs_file)
logging.debug("Checking if the CVMFS file exists: %s", cvmfs_file)
if os.path.exists(cvmfs_file):
try:
shutil.copy(cvmfs_file, destination)
@@ -81,8 +74,8 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):

return

except IOError as e:
logging.error("Unable to copy with CVMFS, even though file exists: %s" % str(e))
except IOError as e:
logging.error("Unable to copy with CVMFS, even though file exists: %s", str(e))

else:
logging.debug("CVMFS File does not exist")
@@ -93,7 +86,7 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):

start2 = int(time.time()*1000)

xrd_exit=timed_transfer(timeout=TIMEOUT,filename=sourceFile,diff=DIFF,expSize=fileSize,debug=debug,cache=cache,destination=destination)
xrd_exit=timed_transfer(filename=sourceFile, debug=debug, cache=cache, destination=destination)

end2=int(time.time()*1000)
if os.path.exists(filename):
@@ -105,7 +98,7 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
payload['end2']=end2

if xrd_exit=='0': #worked first try
logging.debug("Transfer success using %s" % cache)
logging.debug("Transfer success using %s", cache)
dltime=end2-start2
status = 'Success'
tries=2
@@ -120,10 +113,10 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
es_send(payload)

else: #pull from origin
logging.warning("XrdCP from cache failed on %s, pulling from origin" % cache)
logging.warning("XrdCP from cache failed on %s, pulling from origin", cache)
cache="root://stash.osgconnect.net"
start3 = int(time.time()*1000)
xrd_exit=timed_transfer(timeout=TIMEOUT,filename=sourceFile,diff=DIFF,expSize=fileSize,debug=debug,cache=cache,destination=destination)
xrd_exit=timed_transfer(filename=sourceFile, debug=debug, cache=cache, destination=destination)
end3=int(time.time()*1000)
if os.path.exists(filename):
dlSz=os.stat(filename).st_size
@@ -150,13 +143,13 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):

def dostashcpdirectory(sourceDir, destination, cache, debug=False):
sourceItems = subprocess.Popen(["xrdfs", "root://stash.osgconnect.net", "ls", sourceDir], stdout=subprocess.PIPE).communicate()[0].split()
for file in sourceItems:
command2 = 'xrdfs root://stash.osgconnect.net stat '+ file + ' | grep "IsDir" | wc -l'
for remote_file in sourceItems:
command2 = 'xrdfs root://stash.osgconnect.net stat '+ remote_file + ' | grep "IsDir" | wc -l'
isdir=subprocess.Popen([command2],stdout=subprocess.PIPE,shell=True).communicate()[0].split()[0]
if isdir!='0':
dostashcpdirectory(file, destination, cache, debug)
dostashcpdirectory(remote_file, destination, cache, debug)
else:
doStashCpSingle(file,destination, cache, debug)
doStashCpSingle(remote_file,destination, cache, debug)


def es_send(payload):
@@ -172,20 +165,22 @@ def es_send(payload):
url = "http://uct2-collectd.mwt2.org:9951"
req = urllib2.Request(url, data=data, headers={'Content-Type': 'application/json'})
f = urllib2.urlopen(req)
response = f.read()
f.read()
f.close()
except:
logging.error("Error posting to ES")
except urllib2.URLError as e:
logging.error("Error posting to ES: %s", str(e))

p = multiprocessing.Process(target=_es_send, name="_es_send", args=(payload,))
p.start()
p.join(5)
p.terminate()




def timed_transfer(filename,expSize,cache,destination,timeout=TIMEOUT,diff=DIFF,debug=False):
def timed_transfer(filename, cache, destination, debug=False):
"""
Transfer the filename from the cache to the destination using xrdcp
"""


# All these values can be found on the xrdcp man page
@@ -207,7 +202,7 @@
os.remove(filename)
xrdcp=subprocess.Popen([command ],shell=True,stdout=subprocess.PIPE)

streamdata=xrdcp.communicate()[0]
xrdcp.communicate()
xrd_exit=xrdcp.returncode

return str(xrd_exit)
@@ -219,7 +214,7 @@ def get_best_stashcache():
dir_path = os.path.dirname(os.path.realpath(__file__))
cache_file = os.path.join(dir_path, 'caches.json')
if not os.path.isfile(cache_file):
logging.error("Unable to find caches.json in %s" % dir_path)
logging.error("Unable to find caches.json in %s", dir_path)
return None

# Get all the caches from the json file
@@ -260,36 +255,37 @@ def get_best_stashcache():
cur_site = geo_ip_sites[i]
logging.debug("Trying geoip site of: %s", cur_site)
final_url = "%s/%s/%s" % (cur_site, append_text, caches_string)
logging.debug("Querying for closest cache: %s" % final_url)
logging.debug("Querying for closest cache: %s", final_url)
try:
response = urllib2.urlopen(final_url)
if response.getcode() == 200:
logging.debug("Got error code 200 from %s" % cur_site)
logging.debug("Got error code 200 from %s", cur_site)
found = True
break
except urllib2.URLError as e:
logging.debug("URL error: %s" % str(e))
except urllib2.URLError as e:
logging.debug("URL error: %s", str(e))
i+=1

if found == False:
# Unable to find a geo_ip server to use, return random choice from caches!
minsite = random.choice(caches_list)
logging.error("Unable to use Geoip to find closest cache! Returning random cache %s" % minsite)
logging.error("Unable to use Geoip to find closest cache! Returning random cache %s", minsite)
return minsite
else:

# From the response, should respond with something like:
# 3,1,2
ordered_list = response.read().strip().split(",")
logging.debug("Got response %s" % str(ordered_list))
logging.debug("Got response %s", str(ordered_list))
minsite = caches_list[int(ordered_list[0])-1]['name']

logging.debug("Returning closest cache: %s" % minsite)
logging.debug("Returning closest cache: %s", minsite)
return minsite


def main():
parser = optparse.OptionParser()
usage = "usage: %prog [options] source destination"
parser = optparse.OptionParser(usage)
parser.add_option('-d', '--debug', dest='debug', action='store_true', help='debug')
parser.add_option('-r', dest='recursive', action='store_true', help='recursively copy')
parser.add_option('--closest', action='store_true')
@@ -301,16 +297,17 @@ def main():
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.WARNING)

if args.closest:
print get_best_stashcache()
sys.exit(0)

if not args.closest:
try:
source=opts[0]
destination=opts[1]
except:
parser.error('Source and Destination must be last two arguments')
if len(opts) != 2:
parser.error('Source and Destination must be specified on command line')
else:
print get_best_stashcache()
sys.exit()
source=opts[0]
destination=opts[1]


# Check for manually entered cache to use
if args.cache and len(args.cache) > 0:
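A note on the recurring change above: most hunks in bin/stashcp replace eager %-formatting inside logging calls with logging's lazy argument style. A minimal standalone sketch of the difference (plain standard-library logging, nothing stashcp-specific):

import logging

logging.basicConfig(level=logging.WARNING)

cache = "root://stash.osgconnect.net"

# Eager: the message is built with % before logging.debug() runs,
# even though DEBUG is disabled and the result is thrown away.
logging.debug("Using Cache %s" % cache)

# Lazy: the format string and arguments are passed separately, and the
# logging module interpolates them only if the DEBUG level is enabled.
logging.debug("Using Cache %s", cache)

# The arguments must still match the format specifiers: %i needs an
# integer, so pass fileSize itself rather than str(fileSize).
logging.debug("Size of the file %s is %i", "query1", 1024)

Deferring the interpolation avoids building strings for messages that are discarded at the current log level.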
5 changes: 3 additions & 2 deletions bin/stashcp2/tests/test_inside_docker.sh
@@ -15,7 +15,7 @@ rpm -Uvh https://dl.fedoraproject.org/pub/epel/epel-release-latest-${OS_VERSION}
yum -y install yum-plugin-priorities
rpm -Uvh https://repo.grid.iu.edu/osg/3.3/osg-3.3-el${OS_VERSION}-release-latest.rpm

yum -y install osg-oasis
yum -y install osg-oasis pylint

echo "user_allow_other" >> /etc/fuse.conf

@@ -34,7 +34,8 @@ set +e
set -e
module load xrootd

cp /StashCache/bin/caches.json /StashCache/bin/stashcp2/caches.json
# For now, disable pylint failures
pylint /StashCache/bin/stashcp || /bin/true

# Perform tests
/StashCache/bin/stashcp --cache=$XRD_CACHE -d /user/dweitzel/public/blast/queries/query1 ./
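For context on the cache-selection code touched in bin/stashcp, the closest-cache lookup in get_best_stashcache() reduces to the pattern below. This is a sketch under stated assumptions, not the script's actual implementation: the GeoIP service URL and the exact caches.json layout are invented for illustration; only the comma-separated ranking response (e.g. "3,1,2", 1-based, nearest first) and the random fallback mirror the diff.

import json
import logging
import random
import urllib2

def pick_closest_cache(caches_file, geoip_url="http://geoip.example.org/rank"):
    # caches.json is assumed to be a list of objects whose 'name' field
    # holds each cache's root:// URL.
    with open(caches_file) as handle:
        caches_list = json.load(handle)

    caches_string = ",".join(cache['name'] for cache in caches_list)
    try:
        # Ask the (hypothetical) GeoIP service to rank the caches by
        # distance from this host.
        response = urllib2.urlopen("%s/%s" % (geoip_url, caches_string))
        # The service answers with a 1-based ordering such as "3,1,2";
        # the first index names the closest cache.
        ordered_list = response.read().strip().split(",")
        return caches_list[int(ordered_list[0]) - 1]['name']
    except urllib2.URLError as e:
        # No ranking service reachable: fall back to a random cache,
        # as stashcp does.
        logging.error("GeoIP lookup failed (%s); returning a random cache", e)
        return random.choice(caches_list)['name']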