Skip to content

Commit 0ff0d8d

Browse files
authored
Merge pull request OSGeo#11870 from rouault/zarr_v2_add_shuffle_codec
Zarr V2: add read/update support for 'shuffle' filter
2 parents a4de9c0 + 000cced commit 0ff0d8d

File tree

5 files changed

+256
-12
lines changed

5 files changed

+256
-12
lines changed

autotest/gdrivers/data/zarr/generate_test_files.py

+12-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import numpy as np
1818
import zarr
19-
from numcodecs import LZ4, LZMA, Blosc, GZip, Zlib, Zstd
19+
from numcodecs import LZ4, LZMA, Blosc, GZip, Shuffle, Zlib, Zstd
2020

2121
os.chdir(os.path.dirname(__file__))
2222

@@ -69,6 +69,17 @@
6969
)
7070
z[:] = [1, 2]
7171

72+
# Fixture "shuffle.zarr": a 2-element uint16 array in a single chunk, stored
# without any compressor, whose only filter is a byte shuffle on 2-byte
# elements.
z = zarr.open(
    "shuffle.zarr",
    mode="w",
    shape=(2,),
    chunks=(2,),
    dtype="u2",
    filters=[Shuffle(elementsize=2)],
    compressor=None,
)
z[:] = [1, 2]
82+
7283

7384
z = zarr.open(
7485
"order_f_u1.zarr",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"chunks": [
3+
2
4+
],
5+
"compressor": null,
6+
"dtype": "<u2",
7+
"fill_value": 0,
8+
"filters": [
9+
{
10+
"elementsize": 2,
11+
"id": "shuffle"
12+
}
13+
],
14+
"order": "C",
15+
"shape": [
16+
2
17+
],
18+
"zarr_format": 2
19+
}
4 Bytes
Binary file not shown.

autotest/gdrivers/zarr_driver.py

+31
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import json
1717
import math
1818
import os
19+
import shutil
1920
import struct
2021
import sys
2122

@@ -535,6 +536,36 @@ def test_zarr_read_compression_methods(datasetname, compressor):
535536
assert ar.Read() == array.array("b", [1, 2])
536537

537538

539+
def test_zarr_read_shuffle_filter():
    """Read the first array of the shuffle-filtered fixture and check its values."""

    dataset = gdal.OpenEx("data/zarr/shuffle.zarr", gdal.OF_MULTIDIM_RASTER)
    root_group = dataset.GetRootGroup()
    assert root_group
    first_array_name = root_group.GetMDArrayNames()[0]
    mdarray = root_group.OpenMDArray(first_array_name)
    assert mdarray
    expected = array.array("h", [1, 2])
    assert mdarray.Read() == expected
548+
549+
550+
def test_zarr_read_shuffle_filter_update(tmp_path):
    """Overwrite a copy of the shuffle-filtered fixture, reopen it and check the new values."""

    copy_path = tmp_path / "filter_update.zarr"
    shutil.copytree("data/zarr/shuffle.zarr", copy_path)

    def overwrite_values():
        # The update-mode handles are locals of this helper, so they do not
        # outlive the call (presumably so the write is flushed before the
        # dataset is reopened below).
        update_ds = gdal.OpenEx(copy_path, gdal.OF_MULTIDIM_RASTER | gdal.OF_UPDATE)
        update_root = update_ds.GetRootGroup()
        update_array = update_root.OpenMDArray(update_root.GetMDArrayNames()[0])
        update_array.Write([3, 4])

    overwrite_values()

    reopened_ds = gdal.OpenEx(copy_path, gdal.OF_MULTIDIM_RASTER)
    reopened_root = reopened_ds.GetRootGroup()
    reopened_array = reopened_root.OpenMDArray(reopened_root.GetMDArrayNames()[0])
    assert reopened_array.Read() == array.array("h", [3, 4])
567+
568+
538569
@pytest.mark.parametrize("name", ["u1", "u2", "u4", "u8"])
539570
def test_zarr_read_fortran_order(name):
540571

frmts/zarr/zarr_v2_array.cpp

+194-11
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,181 @@ bool ZarrV2Array::AllocateWorkingBuffers(
396396
#undef m_abyDecodedTileData
397397
}
398398

399+
/************************************************************************/
400+
/* ZarrShuffleCompressor() */
401+
/************************************************************************/
402+
403+
static bool ZarrShuffleCompressor(const void *input_data, size_t input_size,
404+
void **output_data, size_t *output_size,
405+
CSLConstList options,
406+
void * /* compressor_user_data */)
407+
{
408+
// 4 is the default of the shuffle numcodecs:
409+
// https://numcodecs.readthedocs.io/en/v0.10.0/shuffle.html
410+
const int eltSize = atoi(CSLFetchNameValueDef(options, "ELEMENTSIZE", "4"));
411+
if (eltSize != 2 && eltSize != 4 && eltSize != 8)
412+
{
413+
CPLError(CE_Failure, CPLE_AppDefined,
414+
"Only ELEMENTSIZE=2,4,8 is supported");
415+
if (output_size)
416+
*output_size = 0;
417+
return false;
418+
}
419+
if ((input_size % eltSize) != 0)
420+
{
421+
CPLError(CE_Failure, CPLE_AppDefined,
422+
"input_size should be a multiple of ELEMENTSIZE");
423+
if (output_size)
424+
*output_size = 0;
425+
return false;
426+
}
427+
if (output_data != nullptr && *output_data != nullptr &&
428+
output_size != nullptr && *output_size != 0)
429+
{
430+
if (*output_size < input_size)
431+
{
432+
CPLError(CE_Failure, CPLE_AppDefined, "Too small output size");
433+
*output_size = input_size;
434+
return false;
435+
}
436+
437+
const size_t nElts = input_size / eltSize;
438+
// Put at the front of the output buffer all the least significant
439+
// bytes of each word, then then 2nd least significant byte, etc.
440+
for (size_t i = 0; i < nElts; ++i)
441+
{
442+
for (int j = 0; j < eltSize; j++)
443+
{
444+
(static_cast<uint8_t *>(*output_data))[j * nElts + i] =
445+
(static_cast<const uint8_t *>(input_data))[i * eltSize + j];
446+
}
447+
}
448+
449+
*output_size = input_size;
450+
return true;
451+
}
452+
453+
if (output_data == nullptr && output_size != nullptr)
454+
{
455+
*output_size = input_size;
456+
return true;
457+
}
458+
459+
if (output_data != nullptr && *output_data == nullptr &&
460+
output_size != nullptr)
461+
{
462+
*output_data = VSI_MALLOC_VERBOSE(input_size);
463+
*output_size = input_size;
464+
if (*output_data == nullptr)
465+
return false;
466+
bool ret = ZarrShuffleCompressor(input_data, input_size, output_data,
467+
output_size, options, nullptr);
468+
if (!ret)
469+
{
470+
VSIFree(*output_data);
471+
*output_data = nullptr;
472+
}
473+
return ret;
474+
}
475+
476+
CPLError(CE_Failure, CPLE_AppDefined, "Invalid use of API");
477+
return false;
478+
}
479+
480+
/************************************************************************/
481+
/* ZarrShuffleDecompressor() */
482+
/************************************************************************/
483+
484+
static bool ZarrShuffleDecompressor(const void *input_data, size_t input_size,
485+
void **output_data, size_t *output_size,
486+
CSLConstList options,
487+
void * /* compressor_user_data */)
488+
{
489+
// 4 is the default of the shuffle numcodecs:
490+
// https://numcodecs.readthedocs.io/en/v0.10.0/shuffle.html
491+
const int eltSize = atoi(CSLFetchNameValueDef(options, "ELEMENTSIZE", "4"));
492+
if (eltSize != 2 && eltSize != 4 && eltSize != 8)
493+
{
494+
CPLError(CE_Failure, CPLE_AppDefined,
495+
"Only ELEMENTSIZE=2,4,8 is supported");
496+
if (output_size)
497+
*output_size = 0;
498+
return false;
499+
}
500+
if ((input_size % eltSize) != 0)
501+
{
502+
CPLError(CE_Failure, CPLE_AppDefined,
503+
"input_size should be a multiple of ELEMENTSIZE");
504+
if (output_size)
505+
*output_size = 0;
506+
return false;
507+
}
508+
if (output_data != nullptr && *output_data != nullptr &&
509+
output_size != nullptr && *output_size != 0)
510+
{
511+
if (*output_size < input_size)
512+
{
513+
CPLError(CE_Failure, CPLE_AppDefined, "Too small output size");
514+
*output_size = input_size;
515+
return false;
516+
}
517+
518+
// Reverse of what is done in the compressor function.
519+
const size_t nElts = input_size / eltSize;
520+
for (size_t i = 0; i < nElts; ++i)
521+
{
522+
for (int j = 0; j < eltSize; j++)
523+
{
524+
(static_cast<uint8_t *>(*output_data))[i * eltSize + j] =
525+
(static_cast<const uint8_t *>(input_data))[j * nElts + i];
526+
}
527+
}
528+
529+
*output_size = input_size;
530+
return true;
531+
}
532+
533+
if (output_data == nullptr && output_size != nullptr)
534+
{
535+
*output_size = input_size;
536+
return true;
537+
}
538+
539+
if (output_data != nullptr && *output_data == nullptr &&
540+
output_size != nullptr)
541+
{
542+
*output_data = VSI_MALLOC_VERBOSE(input_size);
543+
*output_size = input_size;
544+
if (*output_data == nullptr)
545+
return false;
546+
bool ret = ZarrShuffleDecompressor(input_data, input_size, output_data,
547+
output_size, options, nullptr);
548+
if (!ret)
549+
{
550+
VSIFree(*output_data);
551+
*output_data = nullptr;
552+
}
553+
return ret;
554+
}
555+
556+
CPLError(CE_Failure, CPLE_AppDefined, "Invalid use of API");
557+
return false;
558+
}
559+
560+
/** Descriptor binding the "shuffle" filter id to ZarrShuffleCompressor(). */
static const CPLCompressor gShuffleCompressor = {
    /* nStructVersion = */ 1,
    /* pszId = */ "shuffle",
    /* eType = */ CCT_FILTER,
    /* papszMetadata = */ nullptr,
    /* pfnFunc = */ ZarrShuffleCompressor,
    /* user_data = */ nullptr};

/** Descriptor binding the "shuffle" filter id to ZarrShuffleDecompressor(). */
static const CPLCompressor gShuffleDecompressor = {
    /* nStructVersion = */ 1,
    /* pszId = */ "shuffle",
    /* eType = */ CCT_FILTER,
    /* papszMetadata = */ nullptr,
    /* pfnFunc = */ ZarrShuffleDecompressor,
    /* user_data = */ nullptr};
573+
399574
/************************************************************************/
400575
/* ZarrV2Array::LoadTileData() */
401576
/************************************************************************/
@@ -563,7 +738,9 @@ bool ZarrV2Array::LoadTileData(const uint64_t *tileIndices, bool bUseMutex,
563738
const auto &oFilter = m_oFiltersArray[i];
564739
const auto osFilterId = oFilter["id"].ToString();
565740
const auto psFilterDecompressor =
566-
CPLGetDecompressor(osFilterId.c_str());
741+
EQUAL(osFilterId.c_str(), "shuffle")
742+
? &gShuffleDecompressor
743+
: CPLGetDecompressor(osFilterId.c_str());
567744
CPLAssert(psFilterDecompressor);
568745

569746
CPLStringList aosOptions;
@@ -846,7 +1023,10 @@ bool ZarrV2Array::FlushDirtyTile() const
8461023
for (const auto &oFilter : m_oFiltersArray)
8471024
{
8481025
const auto osFilterId = oFilter["id"].ToString();
849-
const auto psFilterCompressor = CPLGetCompressor(osFilterId.c_str());
1026+
const auto psFilterCompressor =
1027+
EQUAL(osFilterId.c_str(), "shuffle")
1028+
? &gShuffleCompressor
1029+
: CPLGetCompressor(osFilterId.c_str());
8501030
CPLAssert(psFilterCompressor);
8511031

8521032
CPLStringList aosOptions;
@@ -1865,16 +2045,19 @@ ZarrV2Group::LoadArray(const std::string &osArrayName,
18652045
CPLError(CE_Failure, CPLE_AppDefined, "Missing filter id");
18662046
return nullptr;
18672047
}
1868-
const auto psFilterCompressor =
1869-
CPLGetCompressor(osFilterId.c_str());
1870-
const auto psFilterDecompressor =
1871-
CPLGetDecompressor(osFilterId.c_str());
1872-
if (psFilterCompressor == nullptr ||
1873-
psFilterDecompressor == nullptr)
2048+
if (!EQUAL(osFilterId.c_str(), "shuffle"))
18742049
{
1875-
CPLError(CE_Failure, CPLE_AppDefined, "Filter %s not handled",
1876-
osFilterId.c_str());
1877-
return nullptr;
2050+
const auto psFilterCompressor =
2051+
CPLGetCompressor(osFilterId.c_str());
2052+
const auto psFilterDecompressor =
2053+
CPLGetDecompressor(osFilterId.c_str());
2054+
if (psFilterCompressor == nullptr ||
2055+
psFilterDecompressor == nullptr)
2056+
{
2057+
CPLError(CE_Failure, CPLE_AppDefined,
2058+
"Filter %s not handled", osFilterId.c_str());
2059+
return nullptr;
2060+
}
18782061
}
18792062
}
18802063
}

0 commit comments

Comments
 (0)