Skip to content

Commit 94f13bf

Browse files
authored
Merge pull request OSGeo#11915 from rouault/fix_11904
VRT and multithreaded warp: propagate errors from auxiliary threads to main thread
2 parents cf9dc1a + 4970a4f commit 94f13bf

15 files changed

+415
-288
lines changed

alg/gdalwarpoperation.cpp

+29-20
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "cpl_config.h"
3030
#include "cpl_conv.h"
3131
#include "cpl_error.h"
32+
#include "cpl_error_internal.h"
3233
#include "cpl_mask.h"
3334
#include "cpl_multiproc.h"
3435
#include "cpl_string.h"
@@ -1044,20 +1045,22 @@ CPLErr GDALChunkAndWarpImage(GDALWarpOperationH hOperation, int nDstXOff,
10441045
/* ChunkThreadMain() */
10451046
/************************************************************************/
10461047

1047-
typedef struct
1048+
struct ChunkThreadData
10481049
{
1049-
GDALWarpOperation *poOperation;
1050-
GDALWarpChunk *pasChunkInfo;
1051-
CPLJoinableThread *hThreadHandle;
1052-
CPLErr eErr;
1053-
double dfProgressBase;
1054-
double dfProgressScale;
1055-
CPLMutex *hIOMutex;
1056-
1057-
CPLMutex *hCondMutex;
1058-
volatile int bIOMutexTaken;
1059-
CPLCond *hCond;
1060-
} ChunkThreadData;
1050+
GDALWarpOperation *poOperation = nullptr;
1051+
GDALWarpChunk *pasChunkInfo = nullptr;
1052+
CPLJoinableThread *hThreadHandle = nullptr;
1053+
CPLErr eErr = CE_None;
1054+
double dfProgressBase = 0;
1055+
double dfProgressScale = 0;
1056+
CPLMutex *hIOMutex = nullptr;
1057+
1058+
CPLMutex *hCondMutex = nullptr;
1059+
volatile int bIOMutexTaken = 0;
1060+
CPLCond *hCond = nullptr;
1061+
1062+
CPLErrorAccumulator *poErrorAccumulator = nullptr;
1063+
};
10611064

10621065
static void ChunkThreadMain(void *pThreadData)
10631066

@@ -1086,6 +1089,10 @@ static void ChunkThreadMain(void *pThreadData)
10861089
CPLReleaseMutex(psData->hCondMutex);
10871090
}
10881091

1092+
auto oAccumulator =
1093+
psData->poErrorAccumulator->InstallForCurrentScope();
1094+
CPL_IGNORE_RET_VAL(oAccumulator);
1095+
10891096
psData->eErr = psData->poOperation->WarpRegion(
10901097
pasChunkInfo->dx, pasChunkInfo->dy, pasChunkInfo->dsx,
10911098
pasChunkInfo->dsy, pasChunkInfo->sx, pasChunkInfo->sy,
@@ -1150,13 +1157,13 @@ CPLErr GDALWarpOperation::ChunkAndWarpMulti(int nDstXOff, int nDstYOff,
11501157
/* information for each region. */
11511158
/* -------------------------------------------------------------------- */
11521159
ChunkThreadData volatile asThreadData[2] = {};
1153-
memset(reinterpret_cast<void *>(
1154-
const_cast<ChunkThreadData(*)[2]>(&asThreadData)),
1155-
0, sizeof(asThreadData));
1156-
asThreadData[0].poOperation = this;
1157-
asThreadData[0].hIOMutex = hIOMutex;
1158-
asThreadData[1].poOperation = this;
1159-
asThreadData[1].hIOMutex = hIOMutex;
1160+
CPLErrorAccumulator oErrorAccumulator;
1161+
for (int i = 0; i < 2; ++i)
1162+
{
1163+
asThreadData[i].poOperation = this;
1164+
asThreadData[i].hIOMutex = hIOMutex;
1165+
asThreadData[i].poErrorAccumulator = &oErrorAccumulator;
1166+
}
11601167

11611168
double dfPixelsProcessed = 0.0;
11621169
double dfTotalPixels = static_cast<double>(nDstXSize) * nDstYSize;
@@ -1260,6 +1267,8 @@ CPLErr GDALWarpOperation::ChunkAndWarpMulti(int nDstXOff, int nDstYOff,
12601267

12611268
WipeChunkList();
12621269

1270+
oErrorAccumulator.ReplayErrors();
1271+
12631272
psOptions->pfnProgress(1.0, "", psOptions->pProgressArg);
12641273

12651274
return eErr;

apps/gdalwarp_bin.cpp

+9-12
Original file line numberDiff line numberDiff line change
@@ -204,20 +204,17 @@ MAIN_START(argc, argv)
204204
}
205205
else
206206
{
207-
std::vector<CPLErrorHandlerAccumulatorStruct> aoErrors;
208-
CPLInstallErrorHandlerAccumulator(aoErrors);
209-
hDstDS = GDALOpenEx(
210-
sOptionsForBinary.osDstFilename.c_str(),
211-
GDAL_OF_RASTER | GDAL_OF_VERBOSE_ERROR | GDAL_OF_UPDATE, nullptr,
212-
sOptionsForBinary.aosDestOpenOptions.List(), nullptr);
213-
CPLUninstallErrorHandlerAccumulator();
207+
CPLErrorAccumulator oErrorAccumulator;
208+
{
209+
auto oAccumulator = oErrorAccumulator.InstallForCurrentScope();
210+
hDstDS = GDALOpenEx(
211+
sOptionsForBinary.osDstFilename.c_str(),
212+
GDAL_OF_RASTER | GDAL_OF_VERBOSE_ERROR | GDAL_OF_UPDATE,
213+
nullptr, sOptionsForBinary.aosDestOpenOptions.List(), nullptr);
214+
}
214215
if (hDstDS != nullptr)
215216
{
216-
for (size_t i = 0; i < aoErrors.size(); i++)
217-
{
218-
CPLError(aoErrors[i].type, aoErrors[i].no, "%s",
219-
aoErrors[i].msg.c_str());
220-
}
217+
oErrorAccumulator.ReplayErrors();
221218
}
222219
}
223220

autotest/alg/warp.py

+27
Original file line numberDiff line numberDiff line change
@@ -1964,3 +1964,30 @@ def test_warp_nodata_substitution(dt, expected_val, resampling):
19641964
struct.unpack("d", out_ds.ReadRaster(0, 0, 1, 1, buf_type=gdal.GDT_Float64))[0]
19651965
== expected_val
19661966
)
1967+
1968+
1969+
###############################################################################
1970+
# Test propagation of errors from I/O threads to main thread in multi-threaded reading
1971+
1972+
1973+
@gdaltest.enable_exceptions()
1974+
def test_warp_multi_threaded_errors(tmp_vsimem):
1975+
1976+
filename1 = str(tmp_vsimem / "tmp1.tif")
1977+
ds = gdal.GetDriverByName("GTiff").Create(filename1, 1, 1)
1978+
ds.SetGeoTransform([2, 1, 0, 49, 0, -1])
1979+
ds.Close()
1980+
1981+
filename2 = str(tmp_vsimem / "tmp2.tif")
1982+
ds = gdal.GetDriverByName("GTiff").Create(filename2, 1, 1)
1983+
ds.SetGeoTransform([3, 1, 0, 49, 0, -1])
1984+
ds.Close()
1985+
1986+
vrt_filename = str(tmp_vsimem / "tmp.vrt")
1987+
gdal.BuildVRT(vrt_filename, [filename1, filename2])
1988+
1989+
gdal.Unlink(filename2)
1990+
1991+
with gdal.Open(vrt_filename) as ds:
1992+
with pytest.raises(Exception):
1993+
gdal.Warp("", ds, format="MEM", multithread=True)

autotest/gcore/vrt_read.py

+27
Original file line numberDiff line numberDiff line change
@@ -2805,6 +2805,33 @@ def test_vrt_read_multi_threaded_disabled_since_overlapping_sources():
28052805
)
28062806

28072807

2808+
###############################################################################
2809+
# Test propagation of errors from threads to main thread in multi-threaded reading
2810+
2811+
2812+
@gdaltest.enable_exceptions()
2813+
def test_vrt_read_multi_threaded_errors(tmp_vsimem):
2814+
2815+
filename1 = str(tmp_vsimem / "tmp1.tif")
2816+
ds = gdal.GetDriverByName("GTiff").Create(filename1, 1, 1)
2817+
ds.SetGeoTransform([2, 1, 0, 49, 0, -1])
2818+
ds.Close()
2819+
2820+
filename2 = str(tmp_vsimem / "tmp2.tif")
2821+
ds = gdal.GetDriverByName("GTiff").Create(filename2, 1, 1)
2822+
ds.SetGeoTransform([3, 1, 0, 49, 0, -1])
2823+
ds.Close()
2824+
2825+
vrt_filename = str(tmp_vsimem / "tmp.vrt")
2826+
gdal.BuildVRT(vrt_filename, [filename1, filename2])
2827+
2828+
gdal.Unlink(filename2)
2829+
2830+
with gdal.Open(vrt_filename) as ds:
2831+
with pytest.raises(Exception):
2832+
ds.GetRasterBand(1).ReadRaster()
2833+
2834+
28082835
###############################################################################
28092836
# Test reading a VRT with a <VRTDataset> inside a <SimpleSource>
28102837

frmts/gti/gdaltileindexdataset.cpp

+7-25
Original file line numberDiff line numberDiff line change
@@ -374,11 +374,11 @@ class GDALTileIndexDataset final : public GDALPamDataset
374374
{
375375
std::atomic<int> *pnCompletedJobs = nullptr;
376376
std::atomic<bool> *pbSuccess = nullptr;
377+
CPLErrorAccumulator *poErrorAccumulator = nullptr;
377378
GDALTileIndexDataset *poDS = nullptr;
378379
GDALTileIndexDataset::QueueWorkingStates *poQueueWorkingStates =
379380
nullptr;
380381
int nBandNrMax = 0;
381-
std::string *posErrorMsg = nullptr;
382382

383383
int nXOff = 0;
384384
int nYOff = 0;
@@ -4624,6 +4624,7 @@ CPLErr GDALTileIndexDataset::IRasterIO(
46244624

46254625
if (m_bLastMustUseMultiThreading)
46264626
{
4627+
CPLErrorAccumulator oErrorAccumulator;
46274628
std::atomic<bool> bSuccess = true;
46284629
const int nContributingSources =
46294630
static_cast<int>(m_aoSourceDesc.size());
@@ -4654,16 +4655,15 @@ CPLErr GDALTileIndexDataset::IRasterIO(
46544655

46554656
auto oQueue = psThreadPool->CreateJobQueue();
46564657
std::atomic<int> nCompletedJobs = 0;
4657-
std::string osErrorMsg;
46584658
for (auto &oSourceDesc : m_aoSourceDesc)
46594659
{
46604660
auto psJob = new RasterIOJob();
46614661
psJob->poDS = this;
46624662
psJob->pbSuccess = &bSuccess;
4663+
psJob->poErrorAccumulator = &oErrorAccumulator;
46634664
psJob->pnCompletedJobs = &nCompletedJobs;
46644665
psJob->poQueueWorkingStates = &m_oQueueWorkingStates;
46654666
psJob->nBandNrMax = nBandNrMax;
4666-
psJob->posErrorMsg = &osErrorMsg;
46674667
psJob->nXOff = nXOff;
46684668
psJob->nYOff = nYOff;
46694669
psJob->nXSize = nXSize;
@@ -4702,10 +4702,7 @@ CPLErr GDALTileIndexDataset::IRasterIO(
47024702
}
47034703
}
47044704

4705-
if (!osErrorMsg.empty())
4706-
{
4707-
CPLError(CE_Failure, CPLE_AppDefined, "%s", osErrorMsg.c_str());
4708-
}
4705+
oErrorAccumulator.ReplayErrors();
47094706

47104707
if (bSuccess && psExtraArg->pfnProgress)
47114708
{
@@ -4754,22 +4751,16 @@ void GDALTileIndexDataset::RasterIOJob::Func(void *pData)
47544751

47554752
SourceDesc oSourceDesc;
47564753

4757-
std::vector<CPLErrorHandlerAccumulatorStruct> aoErrors;
4758-
CPLInstallErrorHandlerAccumulator(aoErrors);
4754+
auto oAccumulator = psJob->poErrorAccumulator->InstallForCurrentScope();
4755+
CPL_IGNORE_RET_VAL(oAccumulator);
4756+
47594757
const bool bCanOpenSource =
47604758
psJob->poDS->GetSourceDesc(osTileName, oSourceDesc,
47614759
&psJob->poQueueWorkingStates->oMutex) &&
47624760
oSourceDesc.poDS;
4763-
CPLUninstallErrorHandlerAccumulator();
47644761

47654762
if (!bCanOpenSource)
47664763
{
4767-
if (!aoErrors.empty())
4768-
{
4769-
std::lock_guard oLock(psJob->poQueueWorkingStates->oMutex);
4770-
if (psJob->posErrorMsg->empty())
4771-
*(psJob->posErrorMsg) = aoErrors.back().msg;
4772-
}
47734764
*psJob->pbSuccess = false;
47744765
}
47754766
else
@@ -4799,8 +4790,6 @@ void GDALTileIndexDataset::RasterIOJob::Func(void *pData)
47994790
dfYSize = psJob->psExtraArg->dfYSize;
48004791
}
48014792

4802-
aoErrors.clear();
4803-
CPLInstallErrorHandlerAccumulator(aoErrors);
48044793
const bool bRenderOK =
48054794
psJob->poDS->RenderSource(
48064795
oSourceDesc, /*bNeedInitBuffer = */ true, psJob->nBandNrMax,
@@ -4810,16 +4799,9 @@ void GDALTileIndexDataset::RasterIOJob::Func(void *pData)
48104799
psJob->nBandCount, psJob->panBandMap, psJob->nPixelSpace,
48114800
psJob->nLineSpace, psJob->nBandSpace, &sArg,
48124801
*(poWorkingState.get())) == CE_None;
4813-
CPLUninstallErrorHandlerAccumulator();
48144802

48154803
if (!bRenderOK)
48164804
{
4817-
if (!aoErrors.empty())
4818-
{
4819-
std::lock_guard oLock(psJob->poQueueWorkingStates->oMutex);
4820-
if (psJob->posErrorMsg->empty())
4821-
*(psJob->posErrorMsg) = aoErrors.back().msg;
4822-
}
48234805
*psJob->pbSuccess = false;
48244806
}
48254807

0 commit comments

Comments
 (0)