Merged
bin/stashcp: 82 changes (33 additions, 49 deletions)
@@ -49,8 +49,7 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
payload['sitename']=sitename

# Calculate the starting time
- date = datetime.datetime.now()
- start1=int(time.mktime(date.timetuple()))*1000
+ start1 = int(time.time()*1000)

# First, check if the file is available in CVMFS
# Really, we don't need to check for close caches before this, but oh well
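
Note on the timestamp change above: both expressions produce a Unix epoch timestamp in milliseconds, but the old time.mktime(date.timetuple()) round trip drops the sub-second part before scaling, whereas int(time.time()*1000) keeps millisecond resolution in a single call. A minimal standalone sketch (illustrative only, not part of this diff; Python 2, standard library):

import datetime
import time

date = datetime.datetime.now()
old_ms = int(time.mktime(date.timetuple())) * 1000  # whole seconds padded to ms, always ends in 000
new_ms = int(time.time() * 1000)                    # true millisecond resolution
print old_ms, new_ms
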
@@ -63,8 +62,7 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
try:
shutil.copy(cvmfs_file, destination)
logging.debug("Succesfully copied file from CVMFS!")
- date = datetime.datetime.now()
- end1=int(time.mktime(date.timetuple()))*1000
+ end1 = int(time.time()*1000)
dlSz=os.stat(filename).st_size
dltime=end1-start1
destSpace=1
@@ -79,13 +77,7 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
payload['start1']=start1
payload['end1']=end1
payload['cache']="CVMFS"
- try:
- p = multiprocessing.Process(target=es_send, name="es_send", args=(payload,))
- p.start()
- p.join(5)
- p.terminate()
- except:
- logging.error("Error curling to ES")
+ es_send(payload)

return

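With the POST logic folded into es_send, each reporting branch now only finishes populating the payload and makes a single call; the timestamp, host field, and bounded HTTP POST are all handled inside es_send. A sketch of the resulting call pattern, using only keys that already appear in this function (values illustrative):

payload['download_size'] = dlSz
payload['download_time'] = dltime
payload['status'] = status   # e.g. 'Success' or 'Timeout'
payload['tries'] = tries
payload['cache'] = cache     # es_send copies this into payload['host']
es_send(payload)             # stamps 'timestamp', sets 'host', and POSTs from a short-lived child process
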
@@ -95,18 +87,15 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
else:
logging.debug("CVMFS File does not exist")

- date = datetime.datetime.now()
- end1=int(time.mktime(date.timetuple()))*1000
+ end1=int(time.time()*1000)
payload['end1']=end1
payload['start1']=start1

- date = datetime.datetime.now()
- start2=int(time.mktime(date.timetuple()))*1000
+ start2 = int(time.time()*1000)

xrd_exit=timed_transfer(timeout=TIMEOUT,filename=sourceFile,diff=DIFF,expSize=fileSize,debug=debug,cache=cache,destination=destination)

- date = datetime.datetime.now()
- end2=int(time.mktime(date.timetuple()))*1000
+ end2=int(time.time()*1000)
if os.path.exists(filename):
dlSz=os.stat(filename).st_size
destSpace=1
@@ -121,31 +110,21 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
status = 'Success'
tries=2

- payload['timestamp']=end2
- payload['host']=cache
payload['download_size']=dlSz
payload['download_time']=dltime
payload['sitename']=sitename
payload['destination_space']=destSpace
payload['status']=status
payload['tries']=tries
payload['cache']=cache
- try:
- p = multiprocessing.Process(target=es_send, name="es_send", args=(payload,))
- p.start()
- p.join(5)
- p.terminate()
- except:
- logging.error("Error curling to ES")
+ es_send(payload)

else: #pull from origin
logging.warning("XrdCP from cache failed on %s, pulling from origin" % cache)
cache="root://stash.osgconnect.net"
- date=datetime.datetime.now()
- start3=int(time.mktime(date.timetuple()))*1000
+ start3 = int(time.time()*1000)
xrd_exit=timed_transfer(timeout=TIMEOUT,filename=sourceFile,diff=DIFF,expSize=fileSize,debug=debug,cache=cache,destination=destination)
- date=datetime.datetime.now()
- end3=int(time.mktime(date.timetuple()))*1000
+ end3=int(time.time()*1000)
if os.path.exists(filename):
dlSz=os.stat(filename).st_size
dltime=end3-start3
@@ -157,8 +136,6 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
logging.error("stashcp failed after 3 attempts")
status = 'Timeout'
tries = 3
- payload['timestamp']=end3
- payload['host']=cache
payload['download_size']=dlSz
payload['download_time']=dltime
payload['destination_space']=destSpace
@@ -168,13 +145,7 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
payload['start3']=start3
payload['end3']=end3
payload['cache']=cache
- try:
- p = multiprocessing.Process(target=es_send, name="es_send", args=(payload,))
- p.start()
- p.join(5)
- p.terminate()
- except:
- logging.error("Error curling to ES")
+ es_send(payload)


def dostashcpdirectory(sourceDir, destination, cache, debug=False):
@@ -189,16 +160,29 @@ def dostashcpdirectory(sourceDir, destination, cache, debug=False):


def es_send(payload):
- data = payload
- data=json.dumps(data)
- try:
- url = "http://uct2-collectd.mwt2.org:9951"
- req = urllib2.Request(url, data=data, headers={'Content-Type': 'application/json'})
- f = urllib2.urlopen(req)
- response = f.read()
- f.close()
- except:
- print "Error posting to ES"

+ # Calculate the current timestamp
+ payload['timestamp'] = int(time.time()*1000)
+ payload['host'] = payload['cache']
+
+ def _es_send(payload):
+ data = payload
+ data=json.dumps(data)
+ try:
+ url = "http://uct2-collectd.mwt2.org:9951"
+ req = urllib2.Request(url, data=data, headers={'Content-Type': 'application/json'})
+ f = urllib2.urlopen(req)
+ response = f.read()
+ f.close()
+ except:
+ logging.error("Error posting to ES")
+
+ p = multiprocessing.Process(target=_es_send, name="_es_send", args=(payload,))
+ p.start()
+ p.join(5)
+ p.terminate()
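
The refactored sender keeps the bounded, fire-and-forget behaviour the call sites used to repeat inline: the blocking urllib2 POST runs in a child process that is given roughly five seconds before it is terminated, so a slow or unreachable collector cannot stall the actual transfer. The same pattern in isolation (a sketch; the helper name post_with_timeout is illustrative and assumes the script's existing multiprocessing import):

def post_with_timeout(payload, timeout=5):
    # Run the blocking POST in a child process and give up after `timeout` seconds.
    p = multiprocessing.Process(target=_es_send, name="_es_send", args=(payload,))
    p.start()
    p.join(timeout)   # wait at most `timeout` seconds for the child to finish
    p.terminate()     # no-op if the child already exited; kills it if the POST is hung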




def timed_transfer(filename,expSize,cache,destination,timeout=TIMEOUT,diff=DIFF,debug=False):