Commit e5dbcf8

Merge pull request open-mpi#12196 from edgargabriel/topic/fbtl-posix-reorg
fbtl/posix: prepare component for new request types
2 parents e852c0e + 39d369a commit e5dbcf8

4 files changed: +229 -220 lines changed

ompi/mca/fbtl/posix/fbtl_posix.c

Lines changed: 113 additions & 112 deletions
@@ -36,7 +36,7 @@
 #include <aio.h>
 #endif
 
-int ompi_fbtl_posix_max_aio_active_reqs=2048;
+int ompi_fbtl_posix_max_prd_active_reqs=2048;
 
 #include "ompi/mca/fbtl/fbtl.h"
 #include "ompi/mca/fbtl/posix/fbtl_posix.h"
@@ -106,7 +106,7 @@ int mca_fbtl_posix_module_init (ompio_file_t *file) {
 #if defined (FBTL_POSIX_HAVE_AIO)
     long val = sysconf(_SC_AIO_MAX);
     if ( -1 != val ) {
-        ompi_fbtl_posix_max_aio_active_reqs = (int)val;
+        ompi_fbtl_posix_max_prd_active_reqs = (int)val;
     }
 #endif
     return OMPI_SUCCESS;
@@ -125,147 +125,148 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
     mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data;
     off_t start_offset, end_offset, total_length;
 
-    for (i=data->aio_first_active_req; i < data->aio_last_active_req; i++ ) {
-        if ( EINPROGRESS == data->aio_req_status[i] ) {
-            data->aio_req_status[i] = aio_error ( &data->aio_reqs[i]);
-            if ( 0 == data->aio_req_status[i]){
-                /* assuming right now that aio_return will return
-                ** the number of bytes written/read and not an error code,
-                ** since aio_error should have returned an error in that
-                ** case and not 0 ( which means request is complete)
-                */
-                ssize_t ret2 = aio_return (&data->aio_reqs[i]);
-                data->aio_total_len += ret2;
-                if ( data->aio_reqs[i].aio_nbytes != (size_t)ret2 ) {
+    for (i=data->prd_first_active_req; i < data->prd_last_active_req; i++ ) {
+        if ( EINPROGRESS == data->prd_aio.aio_req_status[i] ) {
+            data->prd_aio.aio_req_status[i] = aio_error ( &data->prd_aio.aio_reqs[i]);
+            if ( 0 == data->prd_aio.aio_req_status[i]){
+                /* assuming right now that aio_return will return
+                ** the number of bytes written/read and not an error code,
+                ** since aio_error should have returned an error in that
+                ** case and not 0 ( which means request is complete)
+                */
+                ssize_t ret2 = aio_return (&data->prd_aio.aio_reqs[i]);
+                data->prd_total_len += ret2;
+                if ( data->prd_aio.aio_reqs[i].aio_nbytes != (size_t)ret2 ) {
                     /* Partial completion */
-                    data->aio_reqs[i].aio_offset += ret2;
-                    data->aio_reqs[i].aio_buf = (char*)data->aio_reqs[i].aio_buf + ret2;
-                    data->aio_reqs[i].aio_nbytes -= ret2;
-                    data->aio_reqs[i].aio_reqprio = 0;
-                    data->aio_reqs[i].aio_sigevent.sigev_notify = SIGEV_NONE;
-                    data->aio_req_status[i] = EINPROGRESS;
-                    start_offset = data->aio_reqs[i].aio_offset;
-                    total_length = data->aio_reqs[i].aio_nbytes;
+                    data->prd_aio.aio_reqs[i].aio_offset += ret2;
+                    data->prd_aio.aio_reqs[i].aio_buf = (char*)data->prd_aio.aio_reqs[i].aio_buf + ret2;
+                    data->prd_aio.aio_reqs[i].aio_nbytes -= ret2;
+                    data->prd_aio.aio_reqs[i].aio_reqprio = 0;
+                    data->prd_aio.aio_reqs[i].aio_sigevent.sigev_notify = SIGEV_NONE;
+                    data->prd_aio.aio_req_status[i] = EINPROGRESS;
+                    start_offset = data->prd_aio.aio_reqs[i].aio_offset;
+                    total_length = data->prd_aio.aio_reqs[i].aio_nbytes;
                     /* release previous lock */
-                    mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
+                    mca_fbtl_posix_unlock ( &data->prd_lock, data->prd_fh, &data->prd_lock_counter );
 
-                    if ( data->aio_req_type == FBTL_POSIX_WRITE ) {
-                        ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length,
-                                                        OMPIO_LOCK_ENTIRE_REGION, &data->aio_lock_counter );
+                    if ( data->prd_req_type == FBTL_POSIX_AIO_WRITE ) {
+                        ret_code = mca_fbtl_posix_lock( &data->prd_lock, data->prd_fh, F_WRLCK, start_offset, total_length,
+                                                        OMPIO_LOCK_ENTIRE_REGION, &data->prd_lock_counter );
                         if ( 0 < ret_code ) {
                             opal_output(1, "mca_fbtl_posix_progress: error in mca_fbtl_posix_lock() %d", ret_code);
                             /* Just in case some part of the lock actually succeeded. */
-                            mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
-                            return OMPI_ERROR;
+                            mca_fbtl_posix_unlock ( &data->prd_lock, data->prd_fh, &data->prd_lock_counter );
+                            return false;
                         }
-                        if (-1 == aio_write(&data->aio_reqs[i])) {
+                        if (-1 == aio_write(&data->prd_aio.aio_reqs[i])) {
                             opal_output(1, "mca_fbtl_posix_progress: error in aio_write()");
-                            mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
-                            return OMPI_ERROR;
-                        }
+                            mca_fbtl_posix_unlock ( &data->prd_lock, data->prd_fh, &data->prd_lock_counter );
+                            return false;
+                        }
                     }
-                    else if ( data->aio_req_type == FBTL_POSIX_READ ) {
-                        ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length,
-                                                        OMPIO_LOCK_ENTIRE_REGION, &data->aio_lock_counter );
+                    else if ( data->prd_req_type == FBTL_POSIX_AIO_READ ) {
+                        ret_code = mca_fbtl_posix_lock( &data->prd_lock, data->prd_fh, F_RDLCK, start_offset, total_length,
+                                                        OMPIO_LOCK_ENTIRE_REGION, &data->prd_lock_counter );
                         if ( 0 < ret_code ) {
                             opal_output(1, "mca_fbtl_posix_progress: error in mca_fbtl_posix_lock() %d", ret_code);
                             /* Just in case some part of the lock actually succeeded. */
-                            mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
-                            return OMPI_ERROR;
+                            mca_fbtl_posix_unlock ( &data->prd_lock, data->prd_fh, &data->prd_lock_counter );
+                            return false;
                         }
-                        if (-1 == aio_read(&data->aio_reqs[i])) {
+                        if (-1 == aio_read(&data->prd_aio.aio_reqs[i])) {
                             opal_output(1, "mca_fbtl_posix_progress: error in aio_read()");
-                            mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
-                            return OMPI_ERROR;
+                            mca_fbtl_posix_unlock ( &data->prd_lock, data->prd_fh, &data->prd_lock_counter );
+                            return false;
                         }
                     }
                 }
-                else {
-                    data->aio_open_reqs--;
+                else {
+                    data->prd_open_reqs--;
                     lcount++;
                 }
-            }
-            else if ( EINPROGRESS == data->aio_req_status[i]){
-                /* not yet done */
-                continue;
-            }
-            else {
-                /* an error occurred. Mark the request done, but
-                   set an error code in the status */
-                req->req_ompi.req_status.MPI_ERROR = OMPI_ERROR;
-                req->req_ompi.req_status._ucount = data->aio_total_len;
-                ret = true;
-                break;
-            }
-        }
-        else {
-            lcount++;
-        }
+            }
+            else if ( EINPROGRESS == data->prd_aio.aio_req_status[i]){
+                /* not yet done */
+                continue;
+            }
+            else {
+                /* an error occurred. Mark the request done, but
+                   set an error code in the status */
+                req->req_ompi.req_status.MPI_ERROR = OMPI_ERROR;
+                req->req_ompi.req_status._ucount = data->prd_total_len;
+                ret = true;
+                break;
+            }
+        }
+        else {
+            lcount++;
+        }
     }
 #if 0
-    printf("lcount=%d open_reqs=%d\n", lcount, data->aio_open_reqs );
+    printf("lcount=%d open_reqs=%d\n", lcount, data->prd_open_reqs );
 #endif
-    if ( (lcount == data->aio_req_chunks) && (0 != data->aio_open_reqs )) {
+    if ( (lcount == data->prd_req_chunks) && (0 != data->prd_open_reqs )) {
         /* release the lock of the previous operations */
-        mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
+        mca_fbtl_posix_unlock ( &data->prd_lock, data->prd_fh, &data->prd_lock_counter );
 
-        /* post the next batch of operations */
-        data->aio_first_active_req = data->aio_last_active_req;
-        if ( (data->aio_req_count-data->aio_last_active_req) > data->aio_req_chunks ) {
-            data->aio_last_active_req += data->aio_req_chunks;
-        }
-        else {
-            data->aio_last_active_req = data->aio_req_count;
-        }
+        /* post the next batch of operations */
+        data->prd_first_active_req = data->prd_last_active_req;
+        if ( (data->prd_req_count-data->prd_last_active_req) > data->prd_req_chunks ) {
+            data->prd_last_active_req += data->prd_req_chunks;
+        }
+        else {
+            data->prd_last_active_req = data->prd_req_count;
+        }
 
-        start_offset = data->aio_reqs[data->aio_first_active_req].aio_offset;
-        end_offset = data->aio_reqs[data->aio_last_active_req-1].aio_offset + data->aio_reqs[data->aio_last_active_req-1].aio_nbytes;
+        start_offset = data->prd_aio.aio_reqs[data->prd_first_active_req].aio_offset;
+        end_offset = data->prd_aio.aio_reqs[data->prd_last_active_req-1].aio_offset +
+                     data->prd_aio.aio_reqs[data->prd_last_active_req-1].aio_nbytes;
         total_length = (end_offset - start_offset);
 
-        if ( FBTL_POSIX_READ == data->aio_req_type ) {
-            ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length,
-                                            OMPIO_LOCK_ENTIRE_REGION, &data->aio_lock_counter );
+        if ( FBTL_POSIX_AIO_READ == data->prd_req_type ) {
+            ret_code = mca_fbtl_posix_lock( &data->prd_lock, data->prd_fh, F_RDLCK, start_offset, total_length,
+                                            OMPIO_LOCK_ENTIRE_REGION, &data->prd_lock_counter );
        }
-        else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
-            ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length,
-                                            OMPIO_LOCK_ENTIRE_REGION, &data->aio_lock_counter );
+        else if ( FBTL_POSIX_AIO_WRITE == data->prd_req_type ) {
+            ret_code = mca_fbtl_posix_lock( &data->prd_lock, data->prd_fh, F_WRLCK, start_offset, total_length,
+                                            OMPIO_LOCK_ENTIRE_REGION, &data->prd_lock_counter );
        }
        if ( 0 < ret_code ) {
            opal_output(1, "mca_fbtl_posix_progress: error in mca_fbtl_posix_lock() %d", ret_code);
            /* Just in case some part of the lock actually succeeded. */
-            mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
-            return OMPI_ERROR;
+            mca_fbtl_posix_unlock ( &data->prd_lock, data->prd_fh, &data->prd_lock_counter );
+            return false;
        }
 
-        for ( i=data->aio_first_active_req; i< data->aio_last_active_req; i++ ) {
-            if ( FBTL_POSIX_READ == data->aio_req_type ) {
-                if (-1 == aio_read(&data->aio_reqs[i])) {
-                    opal_output(1, "mca_fbtl_posix_progress: error in aio_read()");
-                    mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
-                    return OMPI_ERROR;
-                }
-            }
-            else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
-                if (-1 == aio_write(&data->aio_reqs[i])) {
-                    opal_output(1, "mca_fbtl_posix_progress: error in aio_write()");
-                    mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
-                    return OMPI_ERROR;
-                }
-            }
-        }
+        for ( i=data->prd_first_active_req; i< data->prd_last_active_req; i++ ) {
+            if ( FBTL_POSIX_AIO_READ == data->prd_req_type ) {
+                if (-1 == aio_read(&data->prd_aio.aio_reqs[i])) {
+                    opal_output(1, "mca_fbtl_posix_progress: error in aio_read()");
+                    mca_fbtl_posix_unlock ( &data->prd_lock, data->prd_fh, &data->prd_lock_counter );
+                    return false;
+                }
+            }
+            else if ( FBTL_POSIX_AIO_WRITE == data->prd_req_type ) {
+                if (-1 == aio_write(&data->prd_aio.aio_reqs[i])) {
+                    opal_output(1, "mca_fbtl_posix_progress: error in aio_write()");
+                    mca_fbtl_posix_unlock ( &data->prd_lock, data->prd_fh, &data->prd_lock_counter );
+                    return false;
+                }
+            }
+        }
 #if 0
-        printf("posting new batch: first=%d last=%d\n", data->aio_first_active_req, data->aio_last_active_req );
+        printf("posting new batch: first=%d last=%d\n", data->prd_first_active_req, data->prd_last_active_req );
 #endif
     }
 
-    if ( 0 == data->aio_open_reqs ) {
-        /* all pending operations are finished for this request */
-        req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
-        req->req_ompi.req_status._ucount = data->aio_total_len;
-        mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
+    if ( 0 == data->prd_open_reqs ) {
+        /* all pending operations are finished for this request */
+        req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
+        req->req_ompi.req_status._ucount = data->prd_total_len;
+        mca_fbtl_posix_unlock ( &data->prd_lock, data->prd_fh, &data->prd_lock_counter );
 
-        if ( data->aio_fh->f_atomicity ) {
-            mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
+        if ( data->prd_fh->f_atomicity ) {
+            mca_fbtl_posix_unlock ( &data->prd_lock, data->prd_fh, &data->prd_lock_counter );
         }
 
         ret = true;
@@ -281,14 +282,14 @@ void mca_fbtl_posix_request_free ( mca_ompio_request_t *req)
     mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data;
     if (NULL != data ) {
 
-        if ( NULL != data->aio_reqs ) {
-            free ( data->aio_reqs);
-        }
-        if ( NULL != data->aio_req_status ) {
-            free ( data->aio_req_status );
-        }
-        free ( data );
-        req->req_data = NULL;
+        if ( NULL != data->prd_aio.aio_reqs ) {
+            free ( data->prd_aio.aio_reqs);
+        }
+        if ( NULL != data->prd_aio.aio_req_status ) {
+            free ( data->prd_aio.aio_req_status );
+        }
+        free (data);
+        req->req_data = NULL;
     }
 #endif
     return;
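
Note: the hunks above mechanically rename the aio_* bookkeeping fields to prd_* and reach the aiocb array through the new prd_aio union member; the control flow itself is unchanged: post at most a chunk of requests, poll them with aio_error()/aio_return(), resubmit partial completions, and post the next batch once the current one has drained. The standalone sketch below shows that same post/poll/resubmit cycle with plain POSIX AIO. It is an illustration only, not OMPIO code: the file name, the NREQS/CHUNK/BLK constants, and the omission of the OMPIO locking calls are assumptions of the sketch.

/* Minimal sketch of the batched POSIX AIO pattern used by
 * mca_fbtl_posix_progress(); constants and file name are illustrative. */
#include <aio.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define NREQS 8      /* total sub-requests              */
#define CHUNK 4      /* max sub-requests posted at once */
#define BLK   4096   /* bytes per sub-request           */

int main(void)
{
    static char buf[NREQS][BLK];
    struct aiocb reqs[NREQS];
    int status[NREQS];
    int first = 0, last = CHUNK, open_reqs = NREQS;

    int fd = open("aio_sketch.tmp", O_CREAT | O_WRONLY | O_TRUNC, 0600);
    if (fd < 0) { perror("open"); return 1; }

    for (int i = 0; i < NREQS; i++) {
        memset(buf[i], 'a' + i, BLK);
        memset(&reqs[i], 0, sizeof(reqs[i]));
        reqs[i].aio_fildes = fd;
        reqs[i].aio_buf    = buf[i];
        reqs[i].aio_nbytes = BLK;
        reqs[i].aio_offset = (off_t)i * BLK;
        reqs[i].aio_sigevent.sigev_notify = SIGEV_NONE;
        status[i] = EINPROGRESS;
    }

    /* post the first batch */
    for (int i = first; i < last; i++) {
        if (-1 == aio_write(&reqs[i])) { perror("aio_write"); return 1; }
    }

    /* busy-poll for brevity; the real component is driven by the progress engine */
    while (open_reqs > 0) {
        int done_in_batch = 0;
        for (int i = first; i < last; i++) {
            if (EINPROGRESS != status[i]) { done_in_batch++; continue; }
            status[i] = aio_error(&reqs[i]);
            if (EINPROGRESS == status[i]) continue;   /* still in flight */
            if (0 != status[i]) {
                fprintf(stderr, "aio error: %s\n", strerror(status[i]));
                return 1;
            }
            ssize_t ret = aio_return(&reqs[i]);
            if ((size_t)ret < reqs[i].aio_nbytes) {
                /* partial completion: advance buffer/offset and resubmit */
                reqs[i].aio_offset += ret;
                reqs[i].aio_buf     = (char *)reqs[i].aio_buf + ret;
                reqs[i].aio_nbytes -= ret;
                status[i] = EINPROGRESS;
                if (-1 == aio_write(&reqs[i])) { perror("aio_write"); return 1; }
            } else {
                open_reqs--;
                done_in_batch++;
            }
        }
        /* current batch fully drained: post the next chunk */
        if (done_in_batch == (last - first) && open_reqs > 0) {
            first = last;
            last  = (NREQS - last > CHUNK) ? last + CHUNK : NREQS;
            for (int i = first; i < last; i++) {
                if (-1 == aio_write(&reqs[i])) { perror("aio_write"); return 1; }
            }
        }
    }

    close(fd);
    unlink("aio_sketch.tmp");
    return 0;
}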

ompi/mca/fbtl/posix/fbtl_posix.h

Lines changed: 24 additions & 17 deletions
@@ -47,7 +47,7 @@ int mca_fbtl_posix_component_file_unquery (ompio_file_t *file);
 int mca_fbtl_posix_module_init (ompio_file_t *file);
 int mca_fbtl_posix_module_finalize (ompio_file_t *file);
 
-extern int ompi_fbtl_posix_max_aio_active_reqs;
+extern int ompi_fbtl_posix_max_prd_active_reqs;
 
 OMPI_DECLSPEC extern mca_fbtl_base_component_2_0_0_t mca_fbtl_posix_component;
 /*
@@ -72,29 +72,36 @@ int mca_fbtl_posix_lock ( struct flock *lock, ompio_file_t *fh, int op,
                           int *lock_counter);
 void mca_fbtl_posix_unlock ( struct flock *lock, ompio_file_t *fh, int *lock_counter );
 
+/* Right now statically defined, will become a configure check */
+#define FBTL_POSIX_HAVE_AIO 1
 
 struct mca_fbtl_posix_request_data_t {
-    int aio_req_count;            /* total number of aio reqs */
-    int aio_open_reqs;            /* number of unfinished reqs */
-    int aio_req_type;             /* read or write */
-    int aio_req_chunks;           /* max. no. of aio reqs that can be posted at once*/
-    int aio_first_active_req;     /* first active posted req */
-    int aio_last_active_req;      /* last currently active poted req */
-    struct aiocb *aio_reqs;       /* pointer array of req structures */
-    int *aio_req_status;          /* array of statuses */
-    ssize_t aio_total_len;        /* total amount of data written */
-    struct flock aio_lock;        /* lock used for certain file systems */
-    int aio_lock_counter;         /* to keep track of no. of lock calls */
-    ompio_file_t *aio_fh;         /* pointer back to the mca_io_ompio_fh structure */
+    int prd_req_count;            /* total number of sub reqs */
+    int prd_open_reqs;            /* number of unfinished reqs */
+    int prd_req_type;             /* read or write */
+    int prd_req_chunks;           /* max. no. of sub reqs that can be posted at once*/
+    int prd_first_active_req;     /* first active posted req */
+    int prd_last_active_req;      /* last currently active poted req */
+    ssize_t prd_total_len;        /* total amount of data written */
+    struct flock prd_lock;        /* lock used for certain file systems */
+    int prd_lock_counter;         /* to keep track of no. of lock calls */
+    ompio_file_t *prd_fh;         /* pointer to the ompio_fh structure */
+    union {
+#if defined (FBTL_POSIX_HAVE_AIO)
+        struct {
+            struct aiocb *aio_reqs;   /* pointer array of req structures */
+            int *aio_req_status;      /* array of statuses */
+        } prd_aio;
+#endif
+    };
+
 };
 typedef struct mca_fbtl_posix_request_data_t mca_fbtl_posix_request_data_t;
 
-/* Right now statically defined, will become a configure check */
-#define FBTL_POSIX_HAVE_AIO 1
 
 /* define constants for AIO requests */
-#define FBTL_POSIX_READ 1
-#define FBTL_POSIX_WRITE 2
+#define FBTL_POSIX_AIO_READ 1
+#define FBTL_POSIX_AIO_WRITE 2
 
 #define OMPIO_SET_ATOMICITY_LOCK(_fh, _lock, _lock_counter, _op) { \
     int32_t _orig_flags = _fh->f_flags; \