Parallel Programming in Practice
Some basic concepts
- process
 - thread
 - Cooperative multitasking
 - mutex (互斥锁),用于加锁
 - conditional variable (条件变量),用于等待
 - semaphore (信号量),用于限制资源使用
 
Simultaneously excute multiple programs
GNU parallel
parallel -j 4 echo "hell await {1} {2}" ::: A B C ::: 1 2 3
#hell await A 1
#hell await A 2
#hell await A 3
#hell await B 1
#hell await B 2
#hell await B 3
#hell await C 1
#hell await C 2
#hell await C 3
- Here is some examples https://www.gnu.org/software/parallel/parallel.html#examples
 - A example for running a bash loop in parallel
 
#!/bin/bash
for a in 3 7 11 15 19 23;do
for b in 0.0 0.2 0.4 0.6 0.8 1.0 ;do
  sem -j 4  "scripts.py -a $a -b $b > log/${a}-${b}.log"
done
done
sem --wait
# sem is an alias for parallel --semaphore
GNU make
#cat sample_ids.txt
0001
0002
0003
0004
# the != operator read variables from bash command
idx != cat sample_ids.txt 
# pattern substitution
tgt = $(idx:%=count/%.txt)
.PHONY: all
# inform make not to remove intermediate files
.SECONDARY: data/%.txt
all: $(tgt)
# count occurence of each character in the random string
count/%.txt: data/%.txt
	scripts/count-character.py -i $^ -o $@
# generate some random strings
data/%.txt:
	openssl rand -base64 100000 > $@
make --jobs 4will spawn 4 process for parallel processing the pipeline
snakemake
- Inspired by gnu make, but more firendly
 - Better support for cluster environment
 - Native support for python scripting
 - Support for multiple named wildcards
 
Paralleling in programming language
C++
- pthread: gnu support for multithreading in C
 - openmp: higher level paralleling
 - 
    
std::thread: multithreading feature added in C++ 11
 - Discussion about pros and cons of these three methods
 - 
    
meme, bwa and STAR use pthread, kraken2 use openmp, seems few uses thread in C++ 11 …
 - Introductions for pthread
 - An example for 
pthreadwith mutex 
#include<stdio.h>
#include<pthread.h>
#define NTHREADS 10
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static int glob = 0;
void * func(void * args){
    printf("Thread id is: %ld\n", pthread_self());
    pthread_mutex_lock(&mutex);
    glob ++;
    pthread_mutex_unlock(&mutex);
}
int main(int argc, char* argv[]){
    pthread_t threads[NTHREADS];
    for(unsigned int i=0;i<NTHREADS;i++){
        pthread_create(&threads[i],NULL,func,NULL);
    }
    for(unsigned int j=0;j<NTHREADS;j++){
        pthread_join(threads[j],NULL);
    }  
    printf("The global variable is now: %d\n",glob);
    return 0;
}
- An example for 
pthreadwith conditional variable 
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t condition_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t condition_cond = PTHREAD_COND_INITIALIZER;
static int glob = 0;
void * func1(void * args){
  while(1){
    printf("func 1 try to aquire lock\n");
    pthread_mutex_lock(&condition_mutex);
    printf("func 1 aquired lock\n");
    while(glob>=3&&glob<=7){
        printf("thread 1 start waiting ...\n");
        pthread_cond_wait(&condition_cond,&condition_mutex);
        printf("thread 1 stop waiting.\n");
    }
    pthread_mutex_lock(&count_mutex);
    glob++;
    printf("func 1 update to glbal variable to: %d\n",glob);
    pthread_mutex_unlock(&count_mutex);
    pthread_mutex_unlock(&condition_mutex);
    printf("func 1 release lock\n");
    if(glob>=10)return NULL;
  }
}
void * func2(void * args){
  while(1){
    printf("func 2 try to aquire lock\n");
    pthread_mutex_lock(&condition_mutex);
    printf("func 2 aquired lock\n");
    if(glob<3||glob>7){
        printf("thread 2 is sending signal...\n");
        pthread_cond_signal(&condition_cond);
    }
    
    pthread_mutex_lock(&count_mutex);
    glob++;
    printf("func 2 update to glbal variable to: %d\n",glob);
    pthread_mutex_unlock(&count_mutex);
    pthread_mutex_unlock(&condition_mutex);
    printf("func 2 release lock\n");
    if(glob>=10)return NULL;
  }
}
int main(int argc,char * argv[]){
  // [3,10] will be printed out by function 1
  pthread_t t1,t2;
  pthread_create(&t1,NULL,&func1,NULL);
  pthread_create(&t2,NULL,&func2,NULL);
  pthread_join(t1,NULL);
  pthread_join(t2,NULL);
  return 0;
}
python
- Due to the global interpreter lock (GIL), pyhton is lack in native support for multithreading.
 - 
    
multithreading in python is not real multithreading, but it still helps in IO bounded problems.
 - CPU bounded problem
    
- multiprocessing
 
 - IO bounded problem
    
- multithreading
 - asyncio
 
 - OO style multithreading
 
#!/usr/bin/env python
from threading import Thread
from time import sleep
from random import randint
class MThread(Thread):
    def __init__(self,thread_id,delay):
        Thread.__init__(self)
        self.thread_id = thread_id
        self.delay = delay
    def run(self):
        print(f"{self.thread_id} start")
        sleep(self.delay)
        print(f"{self.thread_id} stop")
def main():
    threads = []
    for i in range(10):
        t = MThread(thread_id=i,delay=randint(1,5))
        threads.append(t)
        t.start()
    print("Join threads ...")
    for i in range(10):
        print(f"to join: {i}")
        threads[i].join() # block the program until this thread is finished
        print(f"{i} joined .")
if __name__ == "__main__":
    main()
- Multithreading with Lock
 
#!/usr/bin/env python
from threading import Lock, Thread
glob = 0
lock = Lock()
def increment():
    global glob
    for i in range(1000000):
        lock.acquire()
        glob += 1
        lock.release()
def decrement():
    global glob
    for i in range(1000000):
        lock.acquire()
        glob -= 1
        lock.release()
def main():
    threads = []
    for i in range(10):
        if i%2==0:
            target = increment
        else:
            target = decrement
        t = Thread(target=target)
        threads.append(t)
    for i in range(10):
        threads[i].start()
    for i in range(10):
        threads[i].join()
    print(glob)
if __name__ == "__main__":
    main()
- A multithreading thread example with semaphore
 
#!/usr/bin/env python
from threading import Thread, Semaphore, active_count
from time import sleep
from random import randint
n_active = 0
semaphore = Semaphore(5)
def func(i):
    global n_active
    print(f"thread {i} is accquiring a semphore ...")
    semaphore.acquire()
    print(f"thread {i} accquired a semphore ...")
    n_active += 1
    sleep(randint(1,3))
    semaphore.release()
    n_active -= 1
def main():
    threads = []
    for i in range(20):
        t = Thread(target=func,args=(i,))
        threads.append(t)
    for i in range(20):
        threads[i].start()
    for i in range(20):
        print(f"the number of activated threads is {n_active}")
        threads[i].join()
if __name__ == "__main__":
    main()
- API for multiprocessing is almost same as multithreading
 
#!/usr/bin/env python
from multiprocessing import Process
from time import sleep
from random import randint
def function(i):
    print(f"{i} start")
    sleep(randint(1,5))
    print(f"{i} stop")
def main():
    processes = []
    for i in range(10):
        p = Process(target=function,args=(i,))
        processes.append(p)
        p.start()
    print("Join threads ...")
    for i in range(10):
        print(f"to join: {i}")
        processes[i].join() # block the program until this thread is finished
        print(f"{i} joined .")
if __name__ == "__main__":
    main()
- Using 
ProcessPoolExecutorin concurrent.futures 
#!/usr/bin/env python
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
def count(number) :
    for i in range(0,10000000):
        i=i+1
    return i * number
def main():
    global numbers
    numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    start = time.time()
    with ProcessPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(count, number) for number in numbers]
        for future in as_completed(futures):
            print(future.result())
    end = time.time()
    print(f"{end-start} seconds passed.")
if __name__ == "__main__":
    main()
- 
    
For more examples, see python-parallel-programmning-cookbook
 - 
    
Simple processing with Pool class
 
#!/usr/bin/env python
from multiprocessing import Pool,Manager
import sys
import logging 
import argparse
import os
from tqdm import tqdm
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] %(name)s: %(message)s')
logger = logging.getLogger('grouping')
def load_fasta(path):
    sequences_ = {}
    with open(path) as f:
        for line in f:
            if line.startswith(">"):
                seq_id = line.strip()[1:].split(" ")[0]
                sequences_[seq_id] = ""
            else:
                sequences_[seq_id] += line.strip()
    for seq_id in sequences_:
        sequences[seq_id] = sequences_[seq_id]
def main():
    parser = argparse.ArgumentParser(description='group riboswitches by rfam')
    parser.add_argument('--input-directory','-i',type=str,required=True,help="input directory")
    parser.add_argument('--output-directory','-o',type=str,required=True,help="output directory")
    parser.add_argument('--jobs','-j',type=int,default=8,help="Works used for data loading")
    args = parser.parse_args()
    logger.info("Load riboswitch sequences ...")
    logger.info("Get fasta paths ... ")
    fasta_paths = []
    global sequences
    sequences = Manager().dict() 
    # note a normal dict() does not work here
    # the Manager object is specifically designed to allow multiple processes to manipulate them
    for fasta in os.listdir(args.input_directory):
        fasta_path = args.input_directory + "/" + fasta
        fasta_paths.append(fasta_path)
    logger.info(f"{len(fasta_paths)} fasta files to load .")
    logger.info(f"Load fasta files with {args.jobs} workers ...")
    pool = Pool(processes=args.jobs) 
    results = pool.map(load_fasta,fasta_paths) 
    logger.info(f"{len(sequences)} are loaded .") 
    if not os.path.exists(args.output_directory):
        logger.info(f"{args.output_directory} does not exists, create it .")
        os.mkdir(args.output_directory)
    handles = {}
    width = 70
    n_duplicate = 0
    encountered = set()
    logger.info(f"Saving groupped sequences to {args.output_directory} ...")
    for seq_id in tqdm(sequences):
        fields = seq_id.split(":")
        sequence = sequences[seq_id]
        if sequence in encountered:
            n_duplicate += 1
            continue
        encountered.add(sequence)
        rfam_id, rfam_name = fields[-2:]
        if rfam_id not in handles:
            handles[rfam_id] = open(args.output_directory + "/" + rfam_id + "-" + rfam_name + ".fa","w")
        handles[rfam_id].write(f">{seq_id}\n")
        p = 0
        while p < len(sequence):
            handles[rfam_id].write(f"{sequence[p:p+width]}\n")
            p += width
    logger.info(f"{n_duplicate} sequences are skipped .")
    for rfam_id in handles:
        handles[rfam_id].close()    
    logger.info("All done .")
- Parallel loading of gzipped files
 
R
- The parallel package https://stat.ethz.ch/R-manual/R-devel/library/parallel/doc/parallel.pdf
 - See this tutorial https://bookdown.org/rdpeng/rprogdatascience/parallel-computation.html