Monday, August 23, 2010

query, download, upload and update with rsync


#!/home/ying/py2.6.5/bin/python
"""
Use rsync to synchronize DATA_HOME (hci-bio2, where the raw data is stored) and COMP_HOME (chpc-node,
which processes the raw data and returns the output to the source). All rsync operations are initiated from chpc-node.

Functions:
1. query: ask the source whether new data is available.
2. download: download new data (from source to destination).
3. upload: upload a result (output) file to the source.
4. update: update the source by uploading a log file from the destination that shows the alignment progress on chpc ("begin, percent-finished, end, error, etc.").

"""
import os
import time
import operator
import subprocess

# Remote rsync location (user@host:path) that holds the raw data and receives
# uploaded results and log files.
DATA_HOME = 'ying@123.123.123.123:~/alignment/'
# Local working directory on the machine that runs this script.
COMP_HOME = '/home/ying/alignment/'
# rsync options used by query(): archive, recursive, transfer-over-ssh.
RSYNC_QUERY_PARAMS = '-are ssh'
# NOTE(review): "RYSNC" is a misspelling of "RSYNC"; the names are kept as-is
# because the functions in this file reference them.
RYSNC_DOWNLOAD_PARAMS = '-avuze ssh' #archive, verbose, update-only, compress-during-transfer and transfer-over-ssh
RYSNC_UPLOAD_PARAMS = '-avuze ssh' #archive, verbose, update-only, compress-during-transfer and transfer-over-ssh
# Name of the per-job log file written by update(); query() treats a directory
# that already contains this file as a job that has been started.
LOG_FILE_NAME = 'log.txt'

def _parse_rsync_listing(listing):
    """Group the regular files of an rsync listing by parent directory.

    Each listing line is expected to look like
    ``-rw-r--r--  1234 2010/08/20 16:27:40 A/begin``.

    Returns:
        A pair ``(files_by_dir, time_by_dir)``: the first maps a directory to
        the base names of the files listed in it, the second maps a directory
        to the timestamp (whole seconds since the epoch) of the last file
        listed for it.
    """
    files_by_dir = {}
    time_by_dir = {}
    for line in listing.splitlines():
        # Split at most four times so a file name containing spaces stays whole.
        fields = line.split(None, 4)
        if len(fields) != 5:
            continue  # blank line or something that is not a listing entry
        perms, _size, day, clock, name = fields
        if not perms.startswith('-'):
            continue  # directories, links, etc. are not job files
        try:
            # '2010/08/20 16:27:40' -> 1282343260
            stamp = int(time.mktime(
                time.strptime(day + ' ' + clock, '%Y/%m/%d %H:%M:%S')))
        except ValueError:
            continue  # not a timestamp, so not a listing entry after all
        parent = os.path.dirname(name)
        files_by_dir.setdefault(parent, []).append(os.path.basename(name))
        time_by_dir[parent] = stamp
    return files_by_dir, time_by_dir


def query():
    """Return the job directories on DATA_HOME that are ready to run.

    Lists DATA_HOME with rsync and groups the listed files by their parent
    directory.  A directory is a runnable job when it contains a file named
    "begin" (the data is ready) and does not contain LOG_FILE_NAME (the job
    is not running yet).

    Returns:
        A list of ``(directory, timestamp)`` tuples sorted by ascending
        timestamp (first in, first run).  The timestamp is the one rsync
        printed for the last file listed in that directory, in whole seconds
        since the epoch.  The list is empty when nothing is ready.

    Raises:
        OSError: if the rsync executable cannot be started.
    """
    # The option string holds several arguments ("-are" and "ssh"); split it
    # so the command can run without a shell.
    cmd = ["rsync"] + RSYNC_QUERY_PARAMS.split() + [DATA_HOME]
    # universal_newlines=True yields text (not bytes) on every Python version.
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True)
    listing = p.communicate()[0]
    files_by_dir, time_by_dir = _parse_rsync_listing(listing)
    jobs = []
    for directory, names in files_by_dir.items():
        if "begin" in names and LOG_FILE_NAME not in names:
            jobs.append((directory, time_by_dir[directory]))
    return sorted(jobs, key=operator.itemgetter(1))

def download(path_to_download):
    """Download a whole directory from DATA_HOME to COMP_HOME and log it.

    Args:
        path_to_download: directory below DATA_HOME.  Leading and trailing
            slashes are ignored, so '/A', '/A/' and 'A/' all mean 'A'.

    The lines rsync printed are passed to update(), which records them in the
    directory's log file.

    Raises:
        OSError: if the rsync executable cannot be started.
    """
    normalized_path = path_to_download.strip('/')  # /A, /A/ or A/ -> A
    source = os.path.join(DATA_HOME, normalized_path)
    # Build an argument list instead of a shell string so that a path with
    # spaces or shell metacharacters is passed to rsync unchanged.
    cmd = ["rsync"] + RYSNC_DOWNLOAD_PARAMS.split() + [source, COMP_HOME]
    # universal_newlines=True yields text (not bytes) on every Python version.
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True)
    output = p.communicate()[0]
    stdout_message = output.strip().split('\n')
    update(normalized_path, stdout_message)
def upload(path, file_to_upload):
    """Upload one file from COMP_HOME to the same directory on DATA_HOME.

    Args:
        path: directory below COMP_HOME that holds the file.  Leading and
            trailing slashes are ignored, so '/A', '/A/' and 'A' all mean
            'A/'.
        file_to_upload: name of the file inside that directory.

    COMP_HOME/path/file_to_upload is copied into DATA_HOME/path/.  The lines
    rsync printed are passed to update(), which records them in the
    directory's log file.

    Raises:
        OSError: if the rsync executable cannot be started.
    """
    normalized_path = path.strip('/') + "/"  # /A, /A/ or A/ -> A/
    source = os.path.join(COMP_HOME, normalized_path, file_to_upload)
    dest = os.path.join(DATA_HOME, normalized_path)
    # Build an argument list instead of a shell string so that a path with
    # spaces or shell metacharacters is passed to rsync unchanged.
    cmd = ["rsync"] + RYSNC_UPLOAD_PARAMS.split() + [source, dest]
    # universal_newlines=True yields text (not bytes) on every Python version.
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True)
    output = p.communicate()[0]
    stdout_message = output.strip().split('\n')
    update(normalized_path, stdout_message)
def update(path, message):
    """Append *message* to a directory's log file and push the log to DATA_HOME.

    Args:
        path: directory below COMP_HOME; leading and trailing slashes are
            ignored.
        message: iterable of lines to record.

    A separator line and the current time are written first, then one line
    per item of *message*.  The log file (LOG_FILE_NAME) is then copied with
    rsync into the matching directory on DATA_HOME.

    Logging is best effort: a failure to write the file or to start rsync is
    reported on stderr and does not raise.
    """
    import sys  # local import: only needed to report a failure

    relative = path.strip('/')  # /A, /A/ or A/ -> A
    fn = os.path.join(COMP_HOME, relative, LOG_FILE_NAME)
    try:
        # Mode 'a' appends to an existing log and creates a missing one, so
        # earlier entries are never truncated.
        with open(fn, 'a') as ofn:
            ofn.write('-' * 50 + '\n')
            ofn.write(time.asctime() + '\n')
            for m in message:
                ofn.write('%s\n' % (m,))
        dest = os.path.join(DATA_HOME, relative + "/")
        # The option string holds several arguments; split it because the
        # command runs without a shell.
        cmd = ["rsync"] + RYSNC_UPLOAD_PARAMS.split() + [fn, dest]
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        # Wait for rsync so the pipe is drained and the child is reaped.
        p.communicate()
    except EnvironmentError as err:
        sys.stderr.write('update: could not log to %s: %s\n' % (fn, err))
def clear():
    """Placeholder that is not implemented yet; does nothing and returns None."""
    pass
# Manual checks; uncomment a line to exercise that function.
#print query()
#download('A')
# NOTE(review): this call is not guarded by `if __name__ == "__main__":`, so it
# runs every time the file is executed or imported. It uploads
# COMP_HOME/A/hello.txt to DATA_HOME/A/.
upload('A','hello.txt')

No comments:

Post a Comment