Skip to content

Commit

Permalink
Improve qtask_ordered sample program
Browse files Browse the repository at this point in the history
Use hts_tpool_next_result_wait() in threadfn_orderedwrite() so it
blocks properly when it has nothing to do.  Remove lock and
conditional that are no longer needed.  Use a sentinel job to
tell threadfn_orderedwrite() that there is no more data and it can
terminate.

Improve error handling.  Add a data::result field so that
threadfn_orderedwrite() can report back to the main thread if
it encountered a problem.  Ensure everything is cleaned up
correctly if either the main thread or threadfn_orderedwrite()
fail, and in the right order.  Set the return value to EXIT_FAILURE
if either sam_close() or threadfn_orderedwrite() report failure.

Ensure error messages are written to stderr.
  • Loading branch information
daviesrob authored and vasudeva8 committed Jul 2, 2024
1 parent a73aaea commit c553f40
Showing 1 changed file with 141 additions and 100 deletions.
241 changes: 141 additions & 100 deletions samples/qtask_ordered.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ typedef struct orderedwrite {
samFile *outfile; //output file handle
sam_hdr_t *samhdr; //header used to write data
hts_tpool_process *queue; //queue from which results to be retrieved

pthread_cond_t *done; //indicates exit condition
pthread_mutex_t *lock; //to synchronise queue access
int result; //result code returned by writer thread
} orderedwrite;

typedef struct data {
Expand All @@ -48,7 +46,6 @@ typedef struct data {
bam1_t **bamarray; //bam1_t array for optimal queueing
} data;

#define WAIT 1 //1 sec
/// print_usage - print the usage
/** @param fp pointer to the file / terminal to which usage to be dumped
returns nothing
Expand Down Expand Up @@ -82,7 +79,7 @@ int addcount(bam1_t *bamdata)
gcratio = gc / (float) bamdata->core.l_qseq;

if (bam_aux_append(bamdata, "xr", 'f', sizeof(gcratio), (const uint8_t*)&gcratio) < 0) {
printf("Failed to add aux tag xr, errno: %d\n", errno);
fprintf(stderr, "Failed to add aux tag xr, errno: %d\n", errno);
ret = -1;
}
return ret;
Expand Down Expand Up @@ -113,6 +110,28 @@ data* getbamstorage(int chunk)
return bamdata;
}

/// cleanup_bamstorage - frees a bamdata struct plus contents
/** @param arg Pointer to data to free
@p arg has type void * so it can be used as a callback passed
to hts_tpool_dispatch3().
*/
void cleanup_bamstorage(void *arg)
{
data *bamdata = (data *) arg;

if (!bamdata)
return;
if (bamdata->bamarray) {
int i;
for (i = 0; i < bamdata->size; i++) {
bam_destroy1(bamdata->bamarray[i]);
}
free(bamdata->bamarray);
}
free(bamdata);
}

/// thread_ordered_proc - does the processing of task in queue and queues the output back
/** @param args pointer to set of data to be processed
returns the processed data
Expand All @@ -123,10 +142,14 @@ void *thread_ordered_proc(void *args)
{
int i = 0;
data *bamdata = (data*)args;

if (bamdata == NULL)
return NULL; // Indicates no more input

for ( i = 0; i < bamdata->count; ++i) {
//add count
if (addcount(bamdata->bamarray[i]) < 0) {
printf("Failed to calculate gc data\n");
fprintf(stderr, "Failed to calculate gc data\n");
break;
}
}
Expand All @@ -143,44 +166,39 @@ void *threadfn_orderedwrite(void *args)
hts_tpool_result *r = NULL;
data *bamdata = NULL;
int i = 0;
int count = 0;

struct timeval now;
struct timespec timeout;
long usec = 0;

pthread_mutex_lock(tdata->lock); //lock to check the exit condition
do {
//get result and write; wait if no result is in queue - until shutdown of queue
while ((r = hts_tpool_next_result(tdata->queue))) {
bamdata = (data*) hts_tpool_result_data(r);
for (i = 0; i < bamdata->count; ++i) {
if (sam_write1(tdata->outfile, tdata->samhdr, bamdata->bamarray[i]) < 0) {
printf("Failed to write output data\n");
break;
}
}
hts_tpool_delete_result(r, 0); //release the result memory
if (bamdata) {
for (i = 0; i < bamdata->size; ++i) {
bam_destroy1(bamdata->bamarray[i]); //clear the bamdata;
}
free(bamdata->bamarray);
free(bamdata);
}
}
//no more data in queues, check and wait till exit is triggered
gettimeofday(&now, NULL);
usec = now.tv_usec + 100000; //+100msec
if (usec >= 1000000) { //overflow
usec %= 1000000;
now.tv_sec++;
tdata->result = 0;

//get result and write; wait if no result is in queue - until shutdown of queue
while (tdata->result == 0 &&
(r = hts_tpool_next_result_wait(tdata->queue)) != NULL) {
bamdata = (data*) hts_tpool_result_data(r);

if (bamdata == NULL) {
// Indicator for no more input. Time to stop.
hts_tpool_delete_result(r, 0);
break;
}
timeout.tv_sec = now.tv_sec;
timeout.tv_nsec = usec * 1000;

} while (pthread_cond_timedwait(tdata->done, tdata->lock, &timeout) == ETIMEDOUT);
for (i = 0; i < bamdata->count; ++i) {
if (sam_write1(tdata->outfile, tdata->samhdr, bamdata->bamarray[i]) < 0) {
fprintf(stderr, "Failed to write output data\n");
tdata->result = -1;
break;
}
}
hts_tpool_delete_result(r, 0); //release the result memory
cleanup_bamstorage(bamdata);
}

pthread_mutex_unlock(tdata->lock);
// Shut down the process queue. If we stopped early due to a write failure,
// this will signal to the other end that something has gone wrong.
hts_tpool_process_shutdown(tdata->queue);

return NULL;
}
Expand All @@ -194,12 +212,10 @@ int main(int argc, char *argv[])
{
const char *inname = NULL, *outdir = NULL;
char *file = NULL;
int c = 0, ret = EXIT_FAILURE, cnt = 0, clearthread = 0, chunk = 0;
int c = 0, ret = EXIT_FAILURE, cnt = 0, started_thread = 0, chunk = 0;
size_t size = 0;
samFile *infile = NULL, *outfile = NULL;
sam_hdr_t *in_samhdr = NULL;
pthread_mutex_t stopcondsynch = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t stopcond = PTHREAD_COND_INITIALIZER;
pthread_t thread;
orderedwrite twritedata = {0};
hts_tpool *pool = NULL;
Expand Down Expand Up @@ -230,136 +246,161 @@ int main(int argc, char *argv[])
size = (strlen(outdir) + sizeof("/out.sam") + 1); //space for output file name and null termination
file = malloc(size);
if (!file) {
printf("Failed to set output path\n");
fprintf(stderr, "Failed to set output path\n");
goto end;
}
snprintf(file, size, "%s/out.sam", outdir); //output file name
if (!(pool = hts_tpool_init(cnt))) { //thread pool
printf("Failed to create thread pool\n");
fprintf(stderr, "Failed to create thread pool\n");
goto end;
}
tpool.pool = pool; //to share the pool for file read and write as well
//queue to use with thread pool, for task and results
if (!(queue = hts_tpool_process_init(pool, cnt * 2, 0))) {
printf("Failed to create queue\n");
fprintf(stderr, "Failed to create queue\n");
goto end;
}
//open input file - r reading
if (!(infile = sam_open(inname, "r"))) {
printf("Could not open %s\n", inname);
fprintf(stderr, "Could not open %s\n", inname);
goto end;
}
//open output files - w write as SAM, wb write as BAM
if (!(outfile = sam_open(file, "w"))) {
printf("Could not open output file\n");
fprintf(stderr, "Could not open output file\n");
goto end;
}
//share the thread pool with i/o files
if (hts_set_opt(infile, HTS_OPT_THREAD_POOL, &tpool) < 0 ||
hts_set_opt(outfile, HTS_OPT_THREAD_POOL, &tpool) < 0) {
printf("Failed to set threads to i/o files\n");
fprintf(stderr, "Failed to set threads to i/o files\n");
goto end;
}
//read header, required to resolve the target names to proper ids
if (!(in_samhdr = sam_hdr_read(infile))) {
printf("Failed to read header from file!\n");
fprintf(stderr, "Failed to read header from file!\n");
goto end;
}
/*tasks are queued, worker threads get them and processes in parallel;
the results are queued and they are to be removed in parallel as well*/
//start output writer thread for ordered processing

//write header
if ((sam_hdr_write(outfile, in_samhdr) == -1)) {
fprintf(stderr, "Failed to write header\n");
goto end;
}

/*tasks are queued, worker threads get them and process in parallel;
the results are queued and they are to be removed in parallel as well */

// start output writer thread for ordered processing
twritedata.outfile = outfile;
twritedata.queue = queue;
twritedata.done = &stopcond;
twritedata.lock = &stopcondsynch;
twritedata.samhdr = in_samhdr;
twritedata.result = 0;
if (pthread_create(&thread, NULL, threadfn_orderedwrite, &twritedata)) {
printf("Failed to create writer thread\n");
goto end;
}
clearthread = 1;
//write header
if ((sam_hdr_write(outfile, in_samhdr) == -1)) {
printf("Failed to write header\n");
fprintf(stderr, "Failed to create writer thread\n");
goto end;
}
started_thread = 1;

c = 0;
while (c >= 0) {
bamdata = getbamstorage(chunk);
//read alignments, upto max size for this lot
for (cnt = 0; cnt < bamdata->size; ++cnt) {
c = sam_read1(infile, in_samhdr, bamdata->bamarray[cnt]);
if (c >= 0) {
continue; //read next
}
else {
break; //failure
if (c < 0) {
break; // EOF or failure
}
}
if (c >= -1 ) {
//max size data or reached EOF
bamdata->count = ( c >= 0 )? bamdata->size : cnt;
//queue the lot for processing
if (hts_tpool_dispatch(pool, queue, thread_ordered_proc,
bamdata) == -1) {
printf("Failed to schedule processing\n");
bamdata->count = cnt;
// Queue the data for processing. hts_tpool_dispatch3() is
// used here as it allows in-flight data to be cleaned up
// properly when stopping early due to errors.
if (hts_tpool_dispatch3(pool, queue, thread_ordered_proc, bamdata,
cleanup_bamstorage, cleanup_bamstorage,
0) == -1) {
fprintf(stderr, "Failed to schedule processing\n");
goto end;
}
bamdata = NULL;
}
else {
printf("Error in reading data\n");
fprintf(stderr, "Error in reading data\n");
break;
}
}
if (-1 == c) {
//EOF read, trigger processing for anything pending, NOTE: will be blocked until queue is cleared
if (hts_tpool_process_flush(queue) == -1) {
printf("Failed to flush queues\n");
goto end;
}
//all tasks done, check for processing completion
while (1) {
if (!hts_tpool_process_empty(queue)) {
usleep(WAIT * 1000000); //results yet to be empty, check again
continue;

ret = EXIT_SUCCESS;

end:
// Tidy up after having dispatched all of the data.

// Note that the order here is important. In particular, we need
// to join the thread that was started earlier before freeing anything
// to avoid any use-after-free errors.

// It's also possible to get here early due to various error conditions,
// so we need to carefully check which parts of the program state have
// been created before trying to clean them up.

if (queue) {
if (-1 == c) {
// EOF read, send a marker to tell the threadfn_orderedwrite()
// function to shut down.
if (hts_tpool_dispatch(pool, queue, thread_ordered_proc,
NULL) == -1) {
fprintf(stderr, "Failed to schedule processing\n");
ret = EXIT_FAILURE;
}
break;

// trigger processing for anything pending
// NOTE: will be blocked until queue is cleared
if (hts_tpool_process_flush(queue) == -1) {
fprintf(stderr, "Failed to flush queues\n");
ret = EXIT_FAILURE;
}
} else {
// Error or we never wrote anything. Shut down the queue to
// ensure threadfn_orderedwrite() wakes up and terminates.
hts_tpool_process_shutdown(queue);
}
ret = EXIT_SUCCESS;
}

//trigger exit for ordered write thread
pthread_mutex_lock(twritedata.lock);
pthread_cond_signal(twritedata.done);
pthread_mutex_unlock(twritedata.lock);
end:
//cleanup
if (clearthread) {
// Wait for threadfn_orderedwrite to finish.
if (started_thread) {
pthread_join(thread, NULL);

// Once the writer thread has finished, check the result it sent back
if (twritedata.result != 0)
ret = EXIT_FAILURE;
}

if (queue) {
// Once threadfn_orderedwrite has stopped, the queue can be
// cleaned up.
hts_tpool_process_destroy(queue);
}

if (in_samhdr) {
sam_hdr_destroy(in_samhdr);
}
if (infile) {
sam_close(infile);
if (sam_close(infile) != 0)
ret = EXIT_FAILURE;
}
if (outfile) {
if (sam_close(outfile) != 0)
ret = EXIT_FAILURE;
}

if (bamdata) {
for (cnt = 0; cnt < bamdata->size; ++cnt) {
bam_destroy1(bamdata->bamarray[cnt]);
}
free(bamdata);
cleanup_bamstorage(bamdata);
}
if (file) {
free(file);
}
if (outfile) {
sam_close(outfile);
}
if (queue) {
hts_tpool_process_destroy(queue);
}
if (pool) {
hts_tpool_destroy(pool);
}
Expand Down

0 comments on commit c553f40

Please sign in to comment.