Monday, October 21, 2013

Implement a hadoop pipeline for NGS data analysis - 4

"Align Fastq to ReferenceGenome and get Alignment"

This is always the first step in an NGS data analysis, assuming we have already done QC on the "Fastq" files. In this command, both "Fastq" and "ReferenceGenome" are inputs, and "Alignment" is the output. The key difference between the two types of inputs is that every job has its own "Fastq", while many similar jobs reuse the same "ReferenceGenome", as well as the application "Align".
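
Conceptually, once a slave node has its own piece of the "Fastq" plus locally synchronized copies of the aligner and the indexed "ReferenceGenome" (the synchronization is described below), the step boils down to launching an external aligner. A minimal Java sketch, assuming "bwa mem" and the node-local paths used later in this post:

// Hypothetical sketch: run BWA on a slave node against the locally
// synchronized aligner and index (paths and invocation are assumptions).
import java.io.File;
import java.io.IOException;

public class AlignStep {
    public static void align(String fastq, String samOut)
            throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(
                "/hadoop/app/bwa/bwa", "mem",
                "/hadoop/data/bwa/hg19",   // shared, read-only index prefix
                fastq);                    // per-job input
        pb.redirectOutput(new File(samOut));             // bwa writes SAM to stdout
        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
        int rc = pb.start().waitFor();
        if (rc != 0) throw new IOException("bwa exited with code " + rc);
    }
}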

For NGS data analysis, we always have some pre-existing files, including:

1. All bioinformatics applications and packages used in the data analysis.

2. All associated data files used by the above applications, which include:

  1. Reference sequence file (and index) 
  2. Indexed reference sequence file for different aligners
  3. Databases like dbSNP, COSMIC, UCSC Gene Table, etc.
  4. Other pre-defined common resources

The total size of these applications and data files could be tens of GB, depending on how many reference genomes we are going to support in our pipeline.

To achieve the best performance we need to keep a copy of all apps and data on each slave node. We do NOT want to copy tens of GB of "immutable" apps and library files from one place to all slave nodes for every job. It is also very important to keep all apps and library files consistent across all nodes; in other words, we MUST guarantee everything is the same. The idea is simple: we maintain a single copy of all bioinformatics applications and library files on the master name node (or on another dedicated node, as long as that node can reach all slave nodes with passwordless ssh). Then, before accepting EVERY new job, we synchronize all slave nodes so that each of them holds an up-to-date duplicate. Why can we NOT synchronize the slave nodes at a fixed interval? Because changes to the backend library files may cause inconsistency while a job is being processed. So, before we pop a job from the job queue and push it to hadoop, we synchronize all slave nodes first. OK, then why not do the synchronization only once when we start our framework? Because we need to update applications and library files while the service is running: if we are going to deploy this service 24*7, we have to update these things on-the-fly. Luckily it does not take much time, since most of the work is just comparing file timestamps. No real data transfer happens unless we add/update/remove data on the name node, and even then the transfer is incremental. It is quite simple to get all of this done - "rsync" is our friend.



For simplicity, we use the name node as the central repository. We already created the BWA-indexed genome hg19 and put it under "/home/hadoop/data/hg19/bwa". We also put the BWA binary executable under "/home/hadoop/tools/bwa" on the name node.

Now we need to transfer all these files to all slave nodes. There is a script named "slaves.sh" in hadoop's distribution package which makes it convenient to execute a command on all slave nodes. However, this synchronization operation should be done in our application. For demonstration, we can do it manually first to learn the process; a sketch of doing the same thing from within the application follows the commands below.

#master is name node.
#n1, n2 and n3 are slave nodes.


#on slave node 
hadoop@n1$ mkdir -p /hadoop/data
hadoop@n1$ mkdir -p /hadoop/app


#on master node 
hadoop@master$ rsync -arvue ssh /home/hadoop/tools/bwa hadoop@n1:/hadoop/app/


hadoop@master$ rsync -arvue ssh /home/hadoop/data/hg19/bwa hadoop@n1:/hadoop/data/
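
Since this synchronization ultimately has to happen inside our application rather than by hand, here is a minimal sketch of how the framework could run the same rsync transfers from Java before popping each job from the queue. The class name, the host list and the --delete flag (so removals on the name node propagate to the slaves) are my assumptions:

// Hypothetical helper: mirror the central app/data directories to every slave
// before a new job is dispatched. rsync only transfers what has changed.
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class SlaveSync {
    // slave host names as configured in /etc/hosts
    private static final List<String> SLAVES = Arrays.asList("n1", "n2", "n3");

    public static void syncAll(String localDir, String remoteDir)
            throws IOException, InterruptedException {
        for (String slave : SLAVES) {
            ProcessBuilder pb = new ProcessBuilder(
                    "rsync", "-a", "-e", "ssh", "--delete",
                    localDir, "hadoop@" + slave + ":" + remoteDir);
            pb.inheritIO();                       // show rsync output in our log
            int rc = pb.start().waitFor();
            if (rc != 0)
                throw new IOException("rsync to " + slave + " failed, exit code " + rc);
        }
    }

    public static void main(String[] args) throws Exception {
        // the same two transfers as the manual commands above, for all slaves
        syncAll("/home/hadoop/tools/bwa", "/hadoop/app/");
        syncAll("/home/hadoop/data/hg19/bwa", "/hadoop/data/");
    }
}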



Thursday, October 17, 2013

Implement a hadoop pipeline for NGS data analysis - 3

The baby step.

The WordCount example in the hadoop distribution provides an excellent demonstration of how to implement a simple MapReduce application. About 7 years ago, when I read Google's MapReduce paper, I realized it would be an effective solution for many problems in bioinformatics, so I developed a lightweight MapReduce application, ABCGrid (http://abcgrid.cbi.pku.edu.cn/), mainly for BLAST-like sequence analysis.

The WordCount application counts the frequency of words: it reads the text file and tokenizes each line (Map stage); the tokens are then partitioned to the Reducers, which count the occurrences of each word (Reduce stage).
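
For reference, the Map and Reduce stages of WordCount look roughly like this in the org.apache.hadoop.mapreduce API (a condensed sketch, close to the example shipped with Hadoop but not copied from it):

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {
    // Map stage: tokenize each input line and emit (word, 1)
    public static class TokenizerMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // Reduce stage: all counts for the same word arrive together; sum them
    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) sum += v.get();
            context.write(key, new IntWritable(sum));
        }
    }
}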

The input file format for NGS is FASTQ, whose size ranges from hundreds of MB to a few GB, depending on the platform, coverage, etc. We can use the FASTQ file name as the key, but then we need to split the FASTQ files. There are a few ways to do that:

1) Split the FASTQ manually, and save as files.
   
  #count the lines; suppose we get 1M lines
  $zcat A_1.fastq.gz | wc -l

  1.1 NUM_LINES % 4 == 0
  #every 4 lines represent one read, so we must make sure
  #each split is a multiple of 4 lines

  1.2 size of the raw fastq
  #if the fastq is small, we do not need to bother splitting it
  #into too many pieces, because of the overhead

  1.3 number of nodes in the cluster, N
  #number of splits >= N

  1.4 HDFS block size, M
  #size of each split <= M

  Suppose that after some calculation we decide to split the big
  fastq into 100 pieces of 10,000 lines (2,500 reads) each.

  $zcat A_1.fastq.gz | split -l 10000 -a 4 -d - A_1.

  Also, we have to upload the split data to HDFS manually:
  $hadoop fs -put A_1.0* /user/hdfs/suny/

2) Split the FASTQ in the application automatically, and save the pieces as files. The split size is still determined by a set of rules like the above, so we still need to take out our calculator.

   2.1 //split the local fastq into pieces under outputPath,
       //return the split size (lines per piece)
       public static int splitFastq(File input, File outputPath)
     {...
        return 10000;
     }
   2.2 //upload the split pieces from outputPath to HDFS
       //using the Hadoop FileSystem API
       public static void copyFastq(File outputPath, Path inputHDFS) throws IOException
     {
        FileSystem fs = FileSystem.get(new Configuration());
        for (File piece : outputPath.listFiles())
            fs.copyFromLocalFile(new Path(piece.getPath()), inputHDFS);
     }

3) Split the FASTQ in the application automatically, but do NOT save it as files. Instead, feed the content to the Map function as the value. To do this we need to derive our own classes from InputFormat and RecordReader, as sketched below.
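
Here is a minimal sketch of what such a pair of classes could look like with the org.apache.hadoop.mapreduce API. The class names are hypothetical, splitting is disabled so a read never straddles a split boundary, and error handling for truncated files is omitted:

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// Treats every 4 lines of a FASTQ file as one record (one read).
public class FastqInputFormat extends FileInputFormat<Text, Text> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // keep the sketch simple: one mapper per (pre-split) file
        return false;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
                                                       TaskAttemptContext context) {
        return new FastqRecordReader();
    }

    // wraps the stock LineRecordReader and groups lines in blocks of 4
    public static class FastqRecordReader extends RecordReader<Text, Text> {
        private final LineRecordReader lines = new LineRecordReader();
        private final Text key = new Text();    // the '@' read-name line
        private final Text value = new Text();  // sequence, '+' line and qualities

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException {
            lines.initialize(split, context);
        }

        @Override
        public boolean nextKeyValue() throws IOException {
            if (!lines.nextKeyValue()) return false;        // @read_name
            key.set(lines.getCurrentValue());
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < 3; i++) {                   // seq, '+', quality
                if (!lines.nextKeyValue()) return false;    // truncated record
                if (i > 0) sb.append('\n');
                sb.append(lines.getCurrentValue().toString());
            }
            value.set(sb.toString());
            return true;
        }

        @Override public Text getCurrentKey()   { return key; }
        @Override public Text getCurrentValue() { return value; }
        @Override public float getProgress() throws IOException { return lines.getProgress(); }
        @Override public void close() throws IOException { lines.close(); }
    }
}

The job would then call job.setInputFormatClass(FastqInputFormat.class), and each Map invocation would receive one whole read instead of one text line.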

Monday, October 14, 2013

Implement a hadoop pipeline for NGS data analysis - 2

The fundamental idea behind any big data processing is "divide and conquer": we try to divide the big data into small pieces - small enough to be processed in parallel on many computers. The prerequisite is that the processing of these small pieces should be as independent of each other as possible, without any inter-process communication.


Suppose a FASTA file has 10 million sequences, and we want to run a BLAST search against the human reference genome hg19 to find all good alignments.
The question is how we should "divide and conquer" this computation-intensive job. The first thing that comes to mind is to divide the FASTA file into many fragments. Let us assume the optimal size of a fragment is 10 sequences - in other words, we will have 1 million FASTA files, each containing ten sequences. If we submit this job to a cluster consisting of 100 computation nodes (sometimes also called "slaves" or "workers"), then on average each node will process 10,000 fragments. In an ideal situation, we can expect a 100x speedup.
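
As a toy illustration of this splitting step (a hypothetical helper with no handling of malformed input), cutting on the ">" sequence headers is all that is needed:

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;

public class FastaSplitter {
    // writes fragments of seqsPerFragment sequences each into outDir,
    // returns the number of fragment files produced
    public static int split(File fasta, File outDir, int seqsPerFragment) throws IOException {
        int fragment = 0, seqsInFragment = 0;
        BufferedWriter out = null;
        try (BufferedReader in = new BufferedReader(new FileReader(fasta))) {
            String line;
            while ((line = in.readLine()) != null) {
                if (line.startsWith(">")) {                       // a new sequence starts
                    if (out == null || seqsInFragment == seqsPerFragment) {
                        if (out != null) out.close();
                        out = new BufferedWriter(new FileWriter(
                                new File(outDir, String.format("frag_%06d.fa", fragment++))));
                        seqsInFragment = 0;
                    }
                    seqsInFragment++;
                }
                out.write(line);
                out.newLine();
            }
        } finally {
            if (out != null) out.close();
        }
        return fragment;
    }
}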

Another approach is to divide the reference data. Say the query FASTA file is very small, with only 10 sequences; then it does not make sense to split it, because of the overhead. In this case, we can divide (and index) the reference genome, as long as it is divisible. At the very least, the chromosome is a natural and safe unit for any dividing scheme.

Now, coming back to the NGS era, what file formats are involved in a typical DNA-Seq data analysis?


  1. FASTQ, sequencing read
  2. SAM/BAM, alignment
  3. BED, genomic regions
  4. VCF, variants






Thursday, October 10, 2013

Implement a hadoop pipeline for NGS data analysis - 1


Setup a hadoop cluster

1. Software


Windows8 + VMWare-Workstation9.0 + Ubuntu-13.04-x64-server + hadoop-2.1.0-beta


Name the first ubuntu installation "master"; this will be the master server (namenode + secondarynamenode).

Then snapshot/clone 3 images from "master" and name them "n1", "n2" and "n3". These 3 images will be the slave nodes.


2. Hadoop configuration


Too many steps, so I will not list them one by one here.
Google is your friend. Do NOT forget to change /etc/hosts and /etc/hostname on n1, n2 and n3. Start hadoop and open a web browser to check that it works: http://192.168.1.2:8088/cluster
(192.168.1.2 is the IP of my master server.)

You should be able to see 3 active nodes in the Cluster Metrics table.


Get all necessary apps and data


1. install the BWA package



  1. #get the code
  2. git clone https://github.com/lh3/bwa.git
  3. cd bwa

  4. #install two required libs
  5. sudo apt-get install gcc
  6. sudo apt-get install zlib1g-dev

  7. #compile
  8. make

  9. #delete other files, only keep the executable BWA
  10. find . ! -name "bwa" -exec rm -fr {} \;


2. Download some reference genome data - for test purposes, we don't need to get all of it


  1. wget http://hgdownload.soe.ucsc.edu/goldenPath/hg19/chromosomes/chr1.fa.gz
  2. wget http://hgdownload.soe.ucsc.edu/goldenPath/hg19/chromosomes/chr2.fa.gz
  3. zcat *.gz > hg19.fasta

3. Build genome index for BWA

  1. bwa index -p hg19 hg19.fasta

########################################


#/etc/hosts
192.168.221.128 prime
192.168.221.133 n1
192.168.221.139 n2
192.168.221.140 n3

#/etc/hostname
prime

sudo apt-get install vim
sudo apt-get install openssh-server
sudo apt-get install screen
sudo apt-get install openjdk-7-jdk 
sudo apt-get install eclipse-platform

wget http://apache.cs.utah.edu/hadoop/common/hadoop-2.2.0/hadoop-2.2.0.tar.gz
tar -zxvf hadoop-2.2.0.tar.gz

#.bash_profile
alias hj="hadoop jar"
alias hf="hadoop fs"
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
export DISPLAY=192.168.1.133:0.0
export HADOOP_HOME=/home/hadoop/hadoop-2.2.0
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/hadoop/hadoop-2.2.0/lib/native
export EC2_HOME=$HOME/tools/ec2-api-tools-1.6.11.0
export PATH=$PATH:/home/hadoop/hadoop-2.2.0/bin
export PATH=$PATH:/home/hadoop/hadoop-2.2.0/sbin
export PATH=$PATH:/home/hadoop/tools/bwa:/home/hadoop/tools/snap-0.15.4-linux
export PATH=$PATH:$HOME/tools/ec2-api-tools-1.6.11.0/bin


#hadoop-env.sh
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
export HADOOP_OPTS=-Djava.net.preferIPv4Stack=true

#core-site.xml
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://prime:50011/</value>
    <final>true</final>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/home/hadoop/dfstmp</value>
  </property>
</configuration>

#hdfs-site.xml
<configuration>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>/home/hadoop/dfsname</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>/home/hadoop/dfsdata</value>
  </property>
  <property>
    <name>dfs.namenode.secondary.http-address</name>
    <value>prime:50012</value>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
</configuration>

#yarn-site.xml
<configuration>
  <property>
    <name>yarn.nodemanager.log-dirs</name>
    <value>/home/hadoop/logs</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
  <property>
    <name>yarn.resourcemanager.scheduler.address</name>
    <value>prime:8030</value>
  </property>
  <property>
    <name>yarn.resourcemanager.resource-tracker.address</name>
    <value>prime:8031</value>
  </property>
  <property>
    <name>yarn.resourcemanager.address</name>
    <value>prime:8032</value>
  </property>
  <property>
    <name>yarn.resourcemanager.admin.address</name>
    <value>prime:8033</value>
  </property>
  <property>
    <name>yarn.resourcemanager.webapp.address</name>
    <value>prime:8088</value>
  </property>
</configuration>

#slaves
n1
n2
n3