diff --git a/samples/DEMO.md b/samples/DEMO.md index f2a281445..0344b0f45 100644 --- a/samples/DEMO.md +++ b/samples/DEMO.md @@ -766,7 +766,9 @@ The array aux fields can be updated using bam_aux_update_array api. Refer: mod_aux_ba.c Shows the records updated with an array of integers, containing count of ACGT -and N in that order. +and N in that order. The bases are decoded before count for the sake of +simplicity. Refer qtask_ordered.c for a better counting where decoding is made +outside the loop. ./mod_aux_ba samtools/test/mpileup/mpileup.1.bam @@ -1386,22 +1388,21 @@ Refer: flags_htsopt_field.c The HTSLib api supports thread pooling for better performance. There are a few ways in which this can be used. The pool can be made specific for a file or a generic pool can be created and shared across multiple files. Thread pool can -also be used to execute user defined tasks. The tasks are to be queued on a -queue and threads in pool executes them and results can be queued back if -required. - -The samples are trivial ones to showcase the usage of api and the number of -threads to use has to be identified based on complexity and parallelism of the -job. +also be used to execute user defined tasks. The tasks are to be added to queue, +threads in pool executes them and results can be queued back if required. To have a thread pool specific for a file, hts_set_opt api can be used with the -file pointer, HTS_OPT_NTHREADS and the number of threads to use in the pool. -Closure of file releases the thread pool as well. To have a thread pool which -can be shared across different files, it needs to be initialized using -hts_tpool_init api, passing number of threads as argument. This thread pool can -be associated with a file using hts_set_opt api. The file pointer, -HTS_OPT_THREAD_POOL and the thread pool address are to be passed as arguments -to api. The thread pool has to be released with hts_tpool_destroy. +file pointer, HTS_OPT_NTHREADS and the number of threads to be in the pool. +Thread pool is released on closure of file. To have a thread pool which can be +shared across different files, it needs to be initialized using hts_tpool_init +api, passing number of threads as an argument. This thread pool can be +associated with a file using hts_set_opt api. The file pointer, +HTS_OPT_THREAD_POOL and the thread pool address are to be passed as arguments to +the api. The thread pool has to be released with hts_tpool_destroy. + +The samples are trivial ones to showcase the usage of api. The number of threads +to use for different tasks has to be identified based on complexity and +parallelism of the task. Below excerpt shows file specific thread pool, @@ -1444,9 +1445,9 @@ The order of execution of threads are decided based on many factors and load on each task may vary, so the completion of the tasks may not be in the order of their queueing. The queues can be used in two different ways, one where the result is enqueued to queue again to be read in same order as initial queueing, -two where the resuls is not enqueued and is readily available, possibly in a -different order than initial queueing. Explicitly created threads can also be -used along with hts thread pool usage. +second where the resuls are not enqueued and completed possibly in a different +order than initial queueing. Explicitly created threads can also be used along +with hts thread pool usage. hts_tpool_process_init initializes the queue / process, associates a queue with thread pool and reserves space for given number of tasks on queue. It takes a @@ -1464,124 +1465,156 @@ all the piled up tasks are processed, a possible case when the queueing and processing happen at different speeds. hts_tpool_process_shutdown api stops the processing of queue. +There are a few apis which let the user to check the status of processing. The +api hts_tpool_process_empty shows whether all the tasks are completed or not. +The api hts_tpool_process_sz gives the number of tasks, at different states of +processing. The api hts_tpool_process_len gives the number of results in output +queue waiting to be collected. + The order of execution of tasks depends on the number of threads involved and how the threads are scheduled by operating system. When the results are enqueued back to queue, they are read in same order of enqueueing of task and in that case the order of execution will not be noticed. When the results are not enqueued the results are available right away and the order of execution may be -noticeable. Based on the nature of task and need of order maintenance, users +noticeable. Based on the nature of task and the need of order maintenance, users can select either of the queueing. Below excerpts shows the usage of queues and threads in both cases. In the 1st, alignments are updated with an aux tag indicating GC ratio. The order of data has to be maintained even after update, hence the result queueing is used to -ensure same order as initial. A number of alignments are bunched together for -optimal processing. +ensure same order as initial. A number of alignments are bunched together and +reuse of allocated memory is made to make it perform better. A sentinel job is +used to identify the completion of all tasks at the result collection side. ... void *thread_ordered_proc(void *args) { ... for ( i = 0; i < bamdata->count; ++i) { - //add count - if (addcount(bamdata->bamarray[i]) < 0) { ... - return bamdata; + for (pos = 0; pos < bamdata->bamarray[i]->core.l_qseq; ++pos) { + count[bam_seqi(data,pos)]++; + ... + gcratio = (count[2] /*C*/ + count[4] /*G*/) / (float) (count[1] /*A*/ + count[8] /*T*/ + count[2] + count[4]); + + if (bam_aux_append(bamdata->bamarray[i], "xr", 'f', sizeof(gcratio), (const uint8_t*)&gcratio) < 0) { ... void *threadfn_orderedwrite(void *args) { - ... - do { - //get result and write; wait if no result is in queue - until shutdown of queue - while ((r = hts_tpool_next_result(tdata->queue))) { - bamdata = (data*) hts_tpool_result_data(r); - for (i = 0; i < bamdata->count; ++i) { - if (sam_write1(tdata->outfile, tdata->samhdr, bamdata->bamarray[i]) < 0) { - ... - hts_tpool_delete_result(r, 0); //release the result memory - .. - } while (pthread_cond_timedwait(tdata->done, tdata->lock, &timeout) == ETIMEDOUT); + ... + //get result and write; wait if no result is in queue - until shutdown of queue + while (tdata->result == 0 && + (r = hts_tpool_next_result_wait(tdata->queue)) != NULL) { + bamdata = (data*) hts_tpool_result_data(r); + ... + for (i = 0; i < bamdata->count; ++i) { + if (sam_write1(tdata->outfile, tdata->samhdr, bamdata->bamarray[i]) < 0) { + ... + hts_tpool_delete_result(r, 0); //release the result memory + + // Shut down the process queue. If we stopped early due to a write failure, + // this will signal to the other end that something has gone wrong. + hts_tpool_process_shutdown(tdata->queue); ... int main(int argc, char *argv[]) { ... if (!(pool = hts_tpool_init(cnt))) { //thread pool ... + tpool.pool = pool; //to share the pool for file read and write as well //queue to use with thread pool, for task and results if (!(queue = hts_tpool_process_init(pool, cnt * 2, 0))) { ... - if (!(infile = sam_open(inname, "r"))) { - ... - if (!(outfile = sam_open(file, "w"))) { + //share the thread pool with i/o files + if (hts_set_opt(infile, HTS_OPT_THREAD_POOL, &tpool) < 0 || + hts_set_opt(outfile, HTS_OPT_THREAD_POOL, &tpool) < 0) { ... - //start output writer thread for ordered processing if (pthread_create(&thread, NULL, threadfn_orderedwrite, &twritedata)) { ... while (c >= 0) { - bamdata = getbamstorage(chunk); - //read alignments, upto max size for this lot - for (cnt = 0; cnt < bamdata->size; ++cnt) { + if (!(bamdata = getbamstorage(chunk, &bamcache))) { + ... + for (cnt = 0; cnt < bamdata->maxsize; ++cnt) { + c = sam_read1(infile, in_samhdr, bamdata->bamarray[cnt]); + ... + if (hts_tpool_dispatch3(pool, queue, thread_ordered_proc, bamdata, + cleanup_bamstorage, cleanup_bamstorage, + 0) == -1) { ... - //queue the lot for processing + if (queue) { + if (-1 == c) { + // EOF read, send a marker to tell the threadfn_orderedwrite() + // function to shut down. if (hts_tpool_dispatch(pool, queue, thread_ordered_proc, - bamdata) == -1) { + NULL) == -1) { ... - if (-1 == c) { - //EOF read, trigger processing for anything pending, NOTE: will be blocked until queue is cleared - if (hts_tpool_process_flush(queue) == -1) { + hts_tpool_process_shutdown(queue); ... - usleep(WAIT * 1000000); //to ensure result check is done atleast once after processing - //trigger exit for ordered write thread - pthread_cond_signal(twritedata.done); + // Wait for threadfn_orderedwrite to finish. + if (started_thread) { + pthread_join(thread, NULL); ... - //shutdown queues to exit the result wait - hts_tpool_process_shutdown(queue); + if (queue) { + // Once threadfn_orderedwrite has stopped, the queue can be + // cleaned up. + hts_tpool_process_destroy(queue); + } ... Refer: qtask_ordered.c -In this 2nd, the alignments from input are split to different files containing -alignment specific to given regions. The order in which the output files are -created doesnt matter and hence no result queueing is used. +In this 2nd, the bases are counted and GC ratio of whole file is calculated. +Order in which bases are counted is not relevant and no result queue required. +The queue is created as input only. ... - void * splittoregions(void *args) + void *thread_unordered_proc(void *args) { ... - if (!(infile = sam_open(data->infile, "r"))) { - ... - if (!(in_samhdr = sam_hdr_read(infile))) { - ... - if (!(bamdata = bam_init1())) { - ... - if (!(idx = sam_index_load(infile, data->infile))) { - ... - if (!(iter = sam_itr_querys(idx, in_samhdr, region))) { - ... - if (!(outfile = sam_open(file, "w"))) { + for ( i = 0; i < bamdata->count; ++i) { + data = bam_get_seq(bamdata->bamarray[i]); + for (pos = 0; pos < bamdata->bamarray[i]->core.l_qseq; ++pos) { ... - if (sam_hdr_write(outfile, in_samhdr) < 0) { + counts[bam_seqi(data, pos)]++; ... - while ((ret = sam_itr_next(infile, iter, bamdata)) >= 0) { //read and write relevant data - if (sam_write1(outfile, in_samhdr, bamdata) < 0) { + //update result and add the memory block for reuse + pthread_mutex_lock(&bamdata->cache->lock); + for (i = 0; i < 16; i++) { + bamdata->bases->counts[i] += counts[i]; + } + + bamdata->next = bamdata->cache->list; + bamdata->cache->list = bamdata; + pthread_mutex_unlock(&bamdata->cache->lock); ... int main(int argc, char *argv[]) { - ... - //get regions from bedfile - if ((regcnt = readbedfile(bedfile, ®ions)) < 0) { - ... - if (!(pool = hts_tpool_init(cnt))) { //thread pool ... if (!(queue = hts_tpool_process_init(pool, cnt * 2, 1))) { ... - for (c = 0; c < regcnt; ++c) { + c = 0; + while (c >= 0) { + ... + for (cnt = 0; cnt < bamdata->maxsize; ++cnt) { + c = sam_read1(infile, in_samhdr, bamdata->bamarray[cnt]); ... - //schedule jobs to run in parallel - if (hts_tpool_dispatch(pool, queue, splittoregions, task) < 0) { + if (c >= -1 ) { ... - //trigger processing for anything pending, NOTE: will be blocked until queue is cleared - if (hts_tpool_process_flush(queue) == -1) { + if (hts_tpool_dispatch3(pool, queue, thread_unordered_proc, bamdata, + cleanup_bamstorage, cleanup_bamstorage, + 0) == -1) { ... - //shutdown queues to exit the result wait - hts_tpool_process_shutdown(queue); + if (-1 == c) { + // EOF read, ensure all are processed, waits for all to finish + if (hts_tpool_process_flush(queue) == -1) { + fprintf(stderr, "Failed to flush queue\n"); + } else { //all done + //refer seq_nt16_str to find position of required bases + fprintf(stdout, "GCratio: %f\nBase counts:\n", + (gccount.counts[2] /*C*/ + gccount.counts[4] /*G*/) / (float) + (gccount.counts[1] /*A*/ + gccount.counts[8] /*T*/ + + gccount.counts[2] + gccount.counts[4])); + ... + if (queue) { + hts_tpool_process_destroy(queue); + } ... Refer: qtask_unordered.c diff --git a/samples/README.md b/samples/README.md index 6ede74007..c68354329 100644 --- a/samples/README.md +++ b/samples/README.md @@ -209,9 +209,8 @@ indexed. [Qtask_unordered][Qtask_unordered] This application showcases the use of queues and threads for custom - processing. In this, alignments in input files are split to different files - for requested regions. The processing may occur in any order and results may - appear in the order completion of processing. + processing. The count of bases and GC ratio are calculated and displayed. + The order of counting is irrelevant and hence ordered retrieval is not used. ### More Information diff --git a/samples/qtask_ordered.c b/samples/qtask_ordered.c index 731587e97..a76d59826 100644 --- a/samples/qtask_ordered.c +++ b/samples/qtask_ordered.c @@ -33,19 +33,27 @@ DEALINGS IN THE SOFTWARE #include #include +typedef struct data { + int count; //used up size + int maxsize; //max size per data chunk + bam1_t **bamarray; //bam1_t array for optimal queueing + struct data *next; //pointer to next one - to reuse earlier allocations +} data; + +typedef struct datacache +{ + pthread_mutex_t lock; //synchronizes the access to cache + data *list; //data storage +} datacache; + typedef struct orderedwrite { samFile *outfile; //output file handle sam_hdr_t *samhdr; //header used to write data hts_tpool_process *queue; //queue from which results to be retrieved + datacache *cache; //to re-use allocated storage int result; //result code returned by writer thread } orderedwrite; -typedef struct data { - int count; //used up size - int size; //max size - bam1_t **bamarray; //bam1_t array for optimal queueing -} data; - /// print_usage - print the usage /** @param fp pointer to the file / terminal to which usage to be dumped returns nothing @@ -53,78 +61,71 @@ returns nothing static void print_usage(FILE *fp) { fprintf(fp, "Usage: qtask_ordered infile threadcount outdir [chunksize]\n\ -Calculates GC ratio - sum(G,C) / sum(A,T,C,G,N) - and adds to each alignment\n\ +Calculates GC ratio - sum(G,C) / sum(A,T,C,G) - and adds to each alignment\n\ as xr:f aux tag. Output is saved in outdir.\n\ -chunksize [100] sets the number of alignments clubbed together to process.\n"); +chunksize [4096] sets the number of alignments clubbed together to process.\n"); return; } -/// addcount - calculates and adds the aux tag -/** @param bamdata pointer to bam data -returns 0 on success and -1 for failure -*/ -int addcount(bam1_t *bamdata) -{ - int pos = 0, gc = 0, ret = 0; - float gcratio = 0; - uint8_t *data = bam_get_seq(bamdata); - for (pos = 0; pos < bamdata->core.l_qseq; ++pos) { - switch(seq_nt16_str[bam_seqi(data, pos)]) { - case 'G': //fall through - case 'C': - gc++; - break; - } - } - gcratio = gc / (float) bamdata->core.l_qseq; - - if (bam_aux_append(bamdata, "xr", 'f', sizeof(gcratio), (const uint8_t*)&gcratio) < 0) { - fprintf(stderr, "Failed to add aux tag xr, errno: %d\n", errno); - ret = -1; - } - return ret; -} - - /// getbamstorage - allocates storage for alignments to queue -/** @param chunk no of alignments to be queued together -returns allocated data. +/** @param chunk number of bam data to allocate + * @param bamcache cached storage +returns already allocated data storage if one is available, otherwise allocates new */ -data* getbamstorage(int chunk) +data* getbamstorage(int chunk, datacache *bamcache) { int i = 0; - data *bamdata = malloc(sizeof(data)); - if (!bamdata) { + data *bamdata = NULL; + + if (!bamcache) { return NULL; } + //get from cache if there is an already allocated storage + if (pthread_mutex_lock(&bamcache->lock)) { + return NULL; + } + if (bamcache->list) { //available + bamdata = bamcache->list; + bamcache->list = bamdata->next; //remove and set next one as available + bamdata->next = NULL; //remove link + bamdata->count = 0; + goto end; + } + //allocate and use + if (!(bamdata = malloc(sizeof(data)))) { + goto end; + } bamdata->bamarray = malloc(chunk * sizeof(bam1_t*)); if (!bamdata->bamarray) { - return NULL; + free(bamdata); + bamdata = NULL; + goto end; } for (i = 0; i < chunk; ++i) { bamdata->bamarray[i] = bam_init1(); } + bamdata->maxsize = chunk; bamdata->count = 0; - bamdata->size = chunk; + bamdata->next = NULL; +end: + pthread_mutex_unlock(&bamcache->lock); return bamdata; } /// cleanup_bamstorage - frees a bamdata struct plus contents /** @param arg Pointer to data to free - @p arg has type void * so it can be used as a callback passed to hts_tpool_dispatch3(). */ void cleanup_bamstorage(void *arg) { data *bamdata = (data *) arg; - if (!bamdata) return; if (bamdata->bamarray) { int i; - for (i = 0; i < bamdata->size; i++) { + for (i = 0; i < bamdata->maxsize; i++) { bam_destroy1(bamdata->bamarray[i]); } free(bamdata->bamarray); @@ -137,19 +138,31 @@ void cleanup_bamstorage(void *arg) returns the processed data the processing could be in any order based on the number of threads in use but read of output from queue will be in order +a null data indicates the end of input and a null is returned to be added back to result queue */ void *thread_ordered_proc(void *args) { - int i = 0; + int i = 0, pos = 0; data *bamdata = (data*)args; + float gcratio = 0; + uint8_t *data = NULL; if (bamdata == NULL) return NULL; // Indicates no more input for ( i = 0; i < bamdata->count; ++i) { //add count - if (addcount(bamdata->bamarray[i]) < 0) { - fprintf(stderr, "Failed to calculate gc data\n"); + uint64_t count[16] = {0}; + data = bam_get_seq(bamdata->bamarray[i]); + for (pos = 0; pos < bamdata->bamarray[i]->core.l_qseq; ++pos) { + count[bam_seqi(data,pos)]++; + } + /*it is faster to count all and use offset to get required counts rather than select + require ones inside the loop*/ + gcratio = (count[2] /*C*/ + count[4] /*G*/) / (float) (count[1] /*A*/ + count[8] /*T*/ + count[2] + count[4]); + + if (bam_aux_append(bamdata->bamarray[i], "xr", 'f', sizeof(gcratio), (const uint8_t*)&gcratio) < 0) { + fprintf(stderr, "Failed to add aux tag xr, errno: %d\n", errno); break; } } @@ -166,11 +179,6 @@ void *threadfn_orderedwrite(void *args) hts_tpool_result *r = NULL; data *bamdata = NULL; int i = 0; - int count = 0; - - struct timeval now; - struct timespec timeout; - long usec = 0; tdata->result = 0; @@ -192,8 +200,12 @@ void *threadfn_orderedwrite(void *args) break; } } - hts_tpool_delete_result(r, 0); //release the result memory - cleanup_bamstorage(bamdata); + hts_tpool_delete_result(r, 0); //release the result memory + + pthread_mutex_lock(&tdata->cache->lock); + bamdata->next = tdata->cache->list; //make current list as next + tdata->cache->list = bamdata; //set as current to reuse + pthread_mutex_unlock(&tdata->cache->lock); } // Shut down the process queue. If we stopped early due to a write failure, @@ -222,6 +234,7 @@ int main(int argc, char *argv[]) hts_tpool_process *queue = NULL; htsThreadPool tpool = {NULL, 0}; data *bamdata = NULL; + datacache bamcache = {PTHREAD_MUTEX_INITIALIZER, NULL}; //qtask infile threadcount outdir [chunksize] if (argc != 4 && argc != 5) { @@ -231,25 +244,23 @@ int main(int argc, char *argv[]) inname = argv[1]; cnt = atoi(argv[2]); outdir = argv[3]; - if (argc == 5) { + if (argc == 5) { //chunk size present chunk = atoi(argv[4]); } - - if (cnt < 1) { + if (cnt < 1) { //set proper thread count cnt = 1; } - if (chunk < 1) { - chunk = 10000; + if (chunk < 1) { //set valid chunk size + chunk = 4096; } //allocate space for output - size = (strlen(outdir) + sizeof("/out.sam") + 1); //space for output file name and null termination - file = malloc(size); - if (!file) { + size = (strlen(outdir) + sizeof("/out.bam") + 1); //space for output file name and null termination + if (!(file = malloc(size))) { fprintf(stderr, "Failed to set output path\n"); goto end; } - snprintf(file, size, "%s/out.sam", outdir); //output file name + snprintf(file, size, "%s/out.bam", outdir); //output file name if (!(pool = hts_tpool_init(cnt))) { //thread pool fprintf(stderr, "Failed to create thread pool\n"); goto end; @@ -266,7 +277,7 @@ int main(int argc, char *argv[]) goto end; } //open output files - w write as SAM, wb write as BAM - if (!(outfile = sam_open(file, "w"))) { + if (!(outfile = sam_open(file, "wb"))) { fprintf(stderr, "Could not open output file\n"); goto end; } @@ -281,21 +292,21 @@ int main(int argc, char *argv[]) fprintf(stderr, "Failed to read header from file!\n"); goto end; } - //write header if ((sam_hdr_write(outfile, in_samhdr) == -1)) { fprintf(stderr, "Failed to write header\n"); goto end; } - /*tasks are queued, worker threads get them and process in parallel; + /* tasks are queued, worker threads get them and process in parallel; the results are queued and they are to be removed in parallel as well */ // start output writer thread for ordered processing twritedata.outfile = outfile; - twritedata.queue = queue; - twritedata.samhdr = in_samhdr; - twritedata.result = 0; + twritedata.samhdr = in_samhdr; + twritedata.result = 0; + twritedata.queue = queue; + twritedata.cache = &bamcache; if (pthread_create(&thread, NULL, threadfn_orderedwrite, &twritedata)) { fprintf(stderr, "Failed to create writer thread\n"); goto end; @@ -304,9 +315,12 @@ int main(int argc, char *argv[]) c = 0; while (c >= 0) { - bamdata = getbamstorage(chunk); + if (!(bamdata = getbamstorage(chunk, &bamcache))) { + fprintf(stderr, "Failed to allocate memory\n"); + break; + } //read alignments, upto max size for this lot - for (cnt = 0; cnt < bamdata->size; ++cnt) { + for (cnt = 0; cnt < bamdata->maxsize; ++cnt) { c = sam_read1(infile, in_samhdr, bamdata->bamarray[cnt]); if (c < 0) { break; // EOF or failure @@ -325,8 +339,7 @@ int main(int argc, char *argv[]) goto end; } bamdata = NULL; - } - else { + } else { fprintf(stderr, "Error in reading data\n"); break; } @@ -351,14 +364,7 @@ int main(int argc, char *argv[]) // function to shut down. if (hts_tpool_dispatch(pool, queue, thread_ordered_proc, NULL) == -1) { - fprintf(stderr, "Failed to schedule processing\n"); - ret = EXIT_FAILURE; - } - - // trigger processing for anything pending - // NOTE: will be blocked until queue is cleared - if (hts_tpool_process_flush(queue) == -1) { - fprintf(stderr, "Failed to flush queues\n"); + fprintf(stderr, "Failed to schedule sentinel job\n"); ret = EXIT_FAILURE; } } else { @@ -373,8 +379,9 @@ int main(int argc, char *argv[]) pthread_join(thread, NULL); // Once the writer thread has finished, check the result it sent back - if (twritedata.result != 0) + if (twritedata.result != 0) { ret = EXIT_FAILURE; + } } if (queue) { @@ -387,17 +394,27 @@ int main(int argc, char *argv[]) sam_hdr_destroy(in_samhdr); } if (infile) { - if (sam_close(infile) != 0) + if (sam_close(infile) != 0) { ret = EXIT_FAILURE; + } } if (outfile) { - if (sam_close(outfile) != 0) + if (sam_close(outfile) != 0) { ret = EXIT_FAILURE; + } } - if (bamdata) { - cleanup_bamstorage(bamdata); + pthread_mutex_lock(&bamcache.lock); + if (bamcache.list) { + struct data *tmp = NULL; + while (bamcache.list) { + tmp = bamcache.list; + bamcache.list = bamcache.list->next; + cleanup_bamstorage(tmp); + } } + pthread_mutex_unlock(&bamcache.lock); + if (file) { free(file); } diff --git a/samples/qtask_unordered.c b/samples/qtask_unordered.c index 7e659fd3a..05fe50346 100644 --- a/samples/qtask_unordered.c +++ b/samples/qtask_unordered.c @@ -1,4 +1,4 @@ -/* qtask_unordered.c -- showcases the htslib api usage +/* qtask_ordered.c -- showcases the htslib api usage Copyright (C) 2024 Genome Research Ltd. @@ -30,21 +30,30 @@ DEALINGS IN THE SOFTWARE #include #include #include -#include #include #include -typedef struct beddata { - char *name; //chromosome name - hts_pos_t start; //start position - hts_pos_t end; //end position -} beddata; +struct datacache; -typedef struct splitdata { - beddata *region; //region information - const char *infile; //input file - const char *outdir; //output path -} splitdata; +typedef struct basecount { + uint64_t counts[16]; //count of all bases +} basecount; + +typedef struct data { + int count; //used up size + int maxsize; //max size per data chunk + bam1_t **bamarray; //bam1_t array for optimal queueing + + struct datacache *cache; + basecount *bases; //count of all possible bases + struct data *next; //pointer to next one - to reuse earlier allocations +} data; + +typedef struct datacache +{ + pthread_mutex_t lock; //synchronizes the access to cache + data *list; //data storage +} datacache; /// print_usage - print the usage /** @param fp pointer to the file / terminal to which usage to be dumped @@ -52,258 +61,258 @@ returns nothing */ static void print_usage(FILE *fp) { - fprintf(fp, "Usage: qtask_unordered infile threadcount outdir bedfile\n\ -Splits the input to files specific for regions given in bedfile. Output is\n\ -saved in outdir. Expects the index file to be present along with infile.\n"); + fprintf(fp, "Usage: qtask_unordered infile threadcount [chunksize]\n\ +Shows the base counts and calculates GC ratio - sum(G,C) / sum(A,T,C,G)\n\ +chunksize [4096] sets the number of alignments clubbed together to process.\n"); return; } -/// readbedfile - read the bedfile and return region array -/** @param file bed file name - * @param data - output, pointer to array of beddata data, to hold bed regions -returns number of regions in data +/// getbamstorage - allocates storage for alignments to queue +/** @param chunk number of bam data to allocate + * @param bases storage of result + * @param bamcache cached storage +returns already allocated data storage if one is available, otherwise allocates new */ -int readbedfile(const char *file, struct beddata **data) +data* getbamstorage(int chunk, basecount *bases, datacache *bamcache) { - int ret = -1, cnt = 0 , max = 0; - kstring_t line = KS_INITIALIZE; - htsFile *bedfile = NULL; - char *sptr = NULL, *token = NULL; - const char *sep ="\t"; - beddata *bedtoks = NULL; - - if (!data || *data) { - printf("Invalid argument\n"); - goto fail; - } - if (!(bedfile = hts_open(file, "r"))) { - printf("Failed to open bedfile\n"); - goto fail; - } - //get lines one by one and get region details - while (!(ret = kgetline(&line, (kgets_func*)hgets, bedfile->fp.hfile))) { - if (!line.l) { - continue; //skip empty line - } - if (line.s[0] == '#' || !strncmp("track ", line.s, sizeof("track ") - 1)) { - ks_clear(&line); - continue; //ignore track lines and comments - } - token = strtok_r(line.s, sep, &sptr); - if (token) { //allocate memory for regions - if ((cnt+1) > max) { - max += 100; //for another 100 regions - bedtoks = realloc(bedtoks, sizeof(beddata) * max); - } - } - else { - break; - } - bedtoks[cnt].name = strdup(token); //chromosome name - token = strtok_r(NULL, sep, &sptr); - if (!token) { - break; - } //start position - bedtoks[cnt].start = token ? atoll(token) : 0; - token = strtok_r(NULL, sep, &sptr); - if (!token) { - break; - } //end position - bedtoks[cnt++].end = token ? atoll(token) : 0; - ks_clear(&line); - } - if (ret != EOF) { - goto fail; + int i = 0; + data *bamdata = NULL; + + if (!bamcache || !bases) { + return NULL; + } + //get from cache if there is an already allocated storage + if (pthread_mutex_lock(&bamcache->lock)) { + return NULL; + } + if (bamcache->list) { //available + bamdata = bamcache->list; + bamcache->list = bamdata->next; //remove and set next one as available + bamdata->next = NULL; //remove link + bamdata->count = 0; + + bamdata->bases = bases; + bamdata->cache = bamcache; + goto end; } - ret = cnt; - if (bedfile) { - hts_close(bedfile); + //allocate and use + if (!(bamdata = malloc(sizeof(data)))) { + goto end; } - ks_free(&line); - if (!cnt) { - goto fail; + bamdata->bamarray = malloc(chunk * sizeof(bam1_t*)); + if (!bamdata->bamarray) { + free(bamdata); + bamdata = NULL; + goto end; } - *data = bedtoks; - return ret; -fail: - if (bedfile) { - hts_close(bedfile); + for (i = 0; i < chunk; ++i) { + bamdata->bamarray[i] = bam_init1(); } - ks_free(&line); - if (bedtoks) { - for (max = cnt, cnt = 0; cnt < max; ++cnt) { - free(bedtoks[cnt].name); + bamdata->maxsize = chunk; + bamdata->count = 0; + bamdata->next = NULL; + + bamdata->bases = bases; + bamdata->cache = bamcache; + +end: + pthread_mutex_unlock(&bamcache->lock); + return bamdata; +} + +/// cleanup_bamstorage - frees a bamdata struct plus contents +/** @param arg Pointer to data to free + @p arg has type void * so it can be used as a callback passed + to hts_tpool_dispatch3(). + */ +void cleanup_bamstorage(void *arg) +{ + data *bamdata = (data *) arg; + if (!bamdata) + return; + if (bamdata->bamarray) { + int i; + for (i = 0; i < bamdata->maxsize; i++) { + bam_destroy1(bamdata->bamarray[i]); } - free(bedtoks); + free(bamdata->bamarray); } - bedtoks = NULL; - return 0; + free(bamdata); } -/// splittoregions - saves the relevant data to separate file +/// thread_unordered_proc - does the processing of task in queue and updates result /** @param args pointer to set of data to be processed returns NULL the processing could be in any order based on the number of threads in use */ -void * splittoregions(void *args) +void *thread_unordered_proc(void *args) { - samFile *infile = NULL, *outfile = NULL; - sam_hdr_t *in_samhdr = NULL; - bam1_t *bamdata = NULL; - hts_itr_t *iter = NULL; - hts_idx_t *idx = NULL; - splitdata *data = (splitdata*)args; - char *file = NULL, *region = NULL; - int size = 0, ret = 0; - if (!(infile = sam_open(data->infile, "r"))) { - printf("Failed to open input file\n"); - goto end; - } - if (!(in_samhdr = sam_hdr_read(infile))) { - printf("Failed to read header data\n"); - goto end; - } - if (!(bamdata = bam_init1())) { - printf("Failed to initialize bamdata\n"); - goto end; - } - size = strlen(data->region->name) + 50; //region specification - if (!(region = malloc(size))) { - printf("Failed to allocate memory\n"); - goto end; - } - snprintf(region, size, "%s:%"PRIhts_pos"-%"PRIhts_pos, data->region->name, data->region->start, data->region->end); - size += strlen(data->outdir); //output file with path - if (!(file = malloc(size))) { - printf("Failed to allocate memory\n"); - goto end; - } - snprintf(file, size, "%s/%s_%"PRIhts_pos"_%"PRIhts_pos".sam", data->outdir, data->region->name, data->region->start, data->region->end); - if (!(idx = sam_index_load(infile, data->infile))) { - printf("Failed to load index\n"); - goto end; - } - if (!(iter = sam_itr_querys(idx, in_samhdr, region))) { - printf("Failed to create iterator\n"); - goto end; - } - if (!(outfile = sam_open(file, "w"))) { - printf("Failed to open output file\n"); - goto end; - } - if (sam_hdr_write(outfile, in_samhdr) < 0) { - printf("Failed to write header\n"); - goto end; - } - while ((ret = sam_itr_next(infile, iter, bamdata)) >= 0) { //read and write relevant data - if (sam_write1(outfile, in_samhdr, bamdata) < 0) { - printf("Failed to write data\n"); - goto end; + int i = 0; + data *bamdata = (data*)args; + uint64_t pos = 0; + uint8_t *data = NULL; + uint64_t counts[16] = {0}; + for ( i = 0; i < bamdata->count; ++i) { + data = bam_get_seq(bamdata->bamarray[i]); + for (pos = 0; pos < bamdata->bamarray[i]->core.l_qseq; ++pos) { + /* it is faster to count all bases and select required ones later + compared to select and count here */ + counts[bam_seqi(data, pos)]++; } } - if (ret != -1) { - printf("Failed to get all data\n"); + //update result and add the memory block for reuse + pthread_mutex_lock(&bamdata->cache->lock); + for (i = 0; i < 16; i++) { + bamdata->bases->counts[i] += counts[i]; } -end: - free(data); - if (infile) { - sam_close(infile); - } - if (outfile) { - sam_close(outfile); - } - if (iter) { - sam_itr_destroy(iter); - } - if (idx) { - hts_idx_destroy(idx); - } - if (bamdata) { - bam_destroy1(bamdata); - } - if (in_samhdr) { - sam_hdr_destroy(in_samhdr); - } - if (file) { - free(file); - } - if (region) { - free(region); - } + bamdata->next = bamdata->cache->list; + bamdata->cache->list = bamdata; + pthread_mutex_unlock(&bamdata->cache->lock); + return NULL; } -/// main - splits the data to region specific files +/// main - start of the demo /** @param argc - count of arguments * @param argv - pointer to array of arguments returns 1 on failure 0 on success */ int main(int argc, char *argv[]) { - const char *inname = NULL, *outdir = NULL, *bedfile = NULL; - int c = 0, ret = EXIT_FAILURE, cnt = 0, regcnt = 0; + const char *inname = NULL; + int c = 0, ret = EXIT_FAILURE, cnt = 0, chunk = 0; + samFile *infile = NULL; + sam_hdr_t *in_samhdr = NULL; hts_tpool *pool = NULL; hts_tpool_process *queue = NULL; - beddata *regions = NULL; + htsThreadPool tpool = {NULL, 0}; + data *bamdata = NULL; + basecount gccount = {{0}}; + datacache bamcache = {PTHREAD_MUTEX_INITIALIZER, NULL}; - //qtask infile threadcount outdir [chunksize] - if (argc != 5) { + //qtask infile threadcount [chunksize] + if (argc != 3 && argc != 4) { print_usage(stdout); goto end; } inname = argv[1]; cnt = atoi(argv[2]); - outdir = argv[3]; - bedfile = argv[4]; - //get regions from bedfile - if ((regcnt = readbedfile(bedfile, ®ions)) <= 0) { - printf("Failed to get bed data\n"); - goto end; + if (argc == 4) { + chunk = atoi(argv[3]); } if (cnt < 1) { cnt = 1; } - if (!(pool = hts_tpool_init(cnt))) { //thread pool - printf("Failed to create thread pool\n"); + if (chunk < 1) { + chunk = 4096; + } + + if (!(pool = hts_tpool_init(cnt))) { + fprintf(stderr, "Failed to create thread pool\n"); goto end; } + tpool.pool = pool; //to share the pool for file read and write as well //queue to use with thread pool, for tasks if (!(queue = hts_tpool_process_init(pool, cnt * 2, 1))) { - printf("Failed to create queue\n"); + fprintf(stderr, "Failed to create queue\n"); goto end; } - for (c = 0; c < regcnt; ++c) { - struct splitdata *task = malloc(sizeof(splitdata)); - task->infile = inname; - task->outdir = outdir; - task->region = regions + c; - //schedule jobs to run in parallel - if (hts_tpool_dispatch(pool, queue, splittoregions, task) < 0) { - printf("Failed to schedule processing\n"); - goto end; - } + //open input file - r reading + if (!(infile = sam_open(inname, "r"))) { + fprintf(stderr, "Could not open %s\n", inname); + goto end; } - //trigger processing for anything pending, NOTE: will be blocked until queue is cleared - if (hts_tpool_process_flush(queue) == -1) { - printf("Failed to flush queues\n"); + //share the thread pool with i/o files + if (hts_set_opt(infile, HTS_OPT_THREAD_POOL, &tpool) < 0) { + fprintf(stderr, "Failed to set threads to i/o files\n"); + goto end; + } + //read header, required to resolve the target names to proper ids + if (!(in_samhdr = sam_hdr_read(infile))) { + fprintf(stderr, "Failed to read header from file!\n"); goto end; } - ret = EXIT_SUCCESS; - //shutdown queues to exit the result wait - hts_tpool_process_shutdown(queue); + /*tasks are queued, worker threads get them and process in parallel; + all bases are counted instead of counting atcg alone as it is faster*/ -end: - //cleanup - for (c = 0; c < regcnt; ++c) { - free(regions[c].name); + c = 0; + while (c >= 0) { + //use cached storage to avoid allocate/deallocate overheads + if (!(bamdata = getbamstorage(chunk, &gccount, &bamcache))) { + fprintf(stderr, "Failed to allocate memory\n"); + break; + } + //read alignments, upto max size for this lot + for (cnt = 0; cnt < bamdata->maxsize; ++cnt) { + c = sam_read1(infile, in_samhdr, bamdata->bamarray[cnt]); + if (c < 0) { + break; // EOF or failure + } + } + if (c >= -1 ) { + //max size data or reached EOF + bamdata->count = cnt; + // Queue the data for processing. hts_tpool_dispatch3() is + // used here as it allows in-flight data to be cleaned up + // properly when stopping early due to errors. + if (hts_tpool_dispatch3(pool, queue, thread_unordered_proc, bamdata, + cleanup_bamstorage, cleanup_bamstorage, + 0) == -1) { + fprintf(stderr, "Failed to schedule processing\n"); + goto end; + } + bamdata = NULL; + } else { + fprintf(stderr, "Error in reading data\n"); + break; + } } - free(regions); + if (-1 == c) { + // EOF read, ensure all are processed, waits for all to finish + if (hts_tpool_process_flush(queue) == -1) { + fprintf(stderr, "Failed to flush queue\n"); + } else { //all done + //refer seq_nt16_str to find position of required bases + fprintf(stdout, "GCratio: %f\nBase counts:\n", + (gccount.counts[2] /*C*/ + gccount.counts[4] /*G*/) / (float) + (gccount.counts[1] /*A*/ + gccount.counts[8] /*T*/ + + gccount.counts[2] + gccount.counts[4])); + + for (cnt = 0; cnt < 16; ++cnt) { + fprintf(stdout, "%c: %"PRIu64"\n", seq_nt16_str[cnt], gccount.counts[cnt]); + } + + ret = EXIT_SUCCESS; + } + } + end: if (queue) { hts_tpool_process_destroy(queue); } + + if (in_samhdr) { + sam_hdr_destroy(in_samhdr); + } + if (infile) { + if (sam_close(infile) != 0) { + ret = EXIT_FAILURE; + } + } + + pthread_mutex_lock(&bamcache.lock); + if (bamcache.list) { + struct data *tmp = NULL; + while (bamcache.list) { + tmp = bamcache.list; + bamcache.list = bamcache.list->next; + cleanup_bamstorage(tmp); + } + } + pthread_mutex_unlock(&bamcache.lock); + if (pool) { hts_tpool_destroy(pool); }