From 08302513c5ea7e571b59d40d15e4af92ccce222e Mon Sep 17 00:00:00 2001
From: Giulio Eulisse <10544+ktf@users.noreply.github.com>
Date: Fri, 7 Feb 2025 14:07:41 +0100
Subject: [PATCH 1/3] DPL: improve flat file support for RNTuple

---
 Framework/AnalysisSupport/src/Plugin.cxx | 40 ++++++++++++++++++++----
 1 file changed, 34 insertions(+), 6 deletions(-)

diff --git a/Framework/AnalysisSupport/src/Plugin.cxx b/Framework/AnalysisSupport/src/Plugin.cxx
index e3a39761e8049..033adc461c600 100644
--- a/Framework/AnalysisSupport/src/Plugin.cxx
+++ b/Framework/AnalysisSupport/src/Plugin.cxx
@@ -85,20 +85,48 @@ std::vector<std::string> getListOfTables(std::unique_ptr<TFile>& f)
 {
   std::vector<std::string> r;
   TList* keyList = f->GetListOfKeys();
+  // We should handle two cases: one where the list of tables is in a TDirectory,
+  // the other one where the dataframe number is just a prefix
+  std::string first = "";
 
   for (auto key : *keyList) {
-    if (!std::string_view(key->GetName()).starts_with("DF_")) {
+    if (!std::string_view(key->GetName()).starts_with("DF_") && !std::string_view(key->GetName()).starts_with("/DF_")) {
       continue;
     }
-    auto* d = (TDirectory*)f->Get(key->GetName());
-    TList* branchList = d->GetListOfKeys();
-    for (auto b : *branchList) {
-      r.emplace_back(b->GetName());
+    auto* d = (TDirectory*)f->GetObjectChecked(key->GetName(), TClass::GetClass("TDirectory"));
+    // Objects are in a folder, list it.
+    if (d) {
+      TList* branchList = d->GetListOfKeys();
+      for (auto b : *branchList) {
+        r.emplace_back(b->GetName());
+      }
+      break;
+    }
+
+    void* v = f->GetObjectChecked(key->GetName(), TClass::GetClass("ROOT::Experimental::RNTuple"));
+    if (v) {
+      std::string s = key->GetName();
+      size_t pos = s.find('-');
+      // Check if '-' is found
+      // Skip metaData and parentFiles
+      if (pos == std::string::npos) {
+        continue;
+      }
+      // Create a new string starting after the '-'
+      std::string t = s.substr(pos + 1);
+      // If we find a duplicate table name, it means we are in the next DF and we can stop.
+      if (t == first) {
+        break;
+      }
+      if (first.empty()) {
+        first = t;
+      }
+      r.emplace_back(t);
     }
-    break;
   }
   return r;
 }
+
 auto readMetadata(std::unique_ptr<TFile>& currentFile) -> std::vector
 {
   // Get the metadata, if any

From 89a56b3ba96bd7d5317607f0559b72f33e518738 Mon Sep 17 00:00:00 2001
From: Giulio Eulisse <10544+ktf@users.noreply.github.com>
Date: Fri, 7 Feb 2025 14:07:41 +0100
Subject: [PATCH 2/3] DPL: Move DataInputDirector to arrow::Dataset API

---
 .../src/AODJAlienReaderHelpers.cxx            |  16 +-
 .../AnalysisSupport/src/DataInputDirector.cxx | 192 ++++++++++++------
 .../AnalysisSupport/src/DataInputDirector.h   |  20 +-
 Framework/AnalysisSupport/src/TTreePlugin.cxx |  40 +++-
 .../include/Framework/RootArrowFilesystem.h   |   2 +
 Framework/Core/src/RootArrowFilesystem.cxx    |   6 +
 .../TestWorkflows/src/o2TestHistograms.cxx    |   4 +-
 7 files changed, 192 insertions(+), 88 deletions(-)

diff --git a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx
index 9c19de85739ce..f8a9705e4eb62 100644
--- a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx
+++ b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx
@@ -10,10 +10,12 @@
 // or submit itself to any jurisdiction.
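// A short illustration (editor's sketch, not part of the patch) of the two
// AOD layouts which getListOfTables() in PATCH 1/3 above distinguishes: the
// folder layout keeps one TDirectory per dataframe, while the flat layout
// used for RNTuple encodes the dataframe number as a key prefix. Key names
// are hypothetical examples.
//
//   Folder layout:                 Flat layout:
//     DF_2305895949078495/           DF_2305895949078495-O2track
//       O2track                      DF_2305895949078495-O2collision
//       O2collision                  DF_2305895949078496-O2track
//
//   std::string s = "DF_2305895949078495-O2track"; // a flat key
//   size_t pos = s.find('-');                      // separator position
//   std::string table = s.substr(pos + 1);         // "O2track"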
#include "AODJAlienReaderHelpers.h" +#include #include "Framework/TableTreeHelpers.h" #include "Framework/AnalysisHelpers.h" #include "Framework/DataProcessingStats.h" #include "Framework/RootTableBuilderHelpers.h" +#include "Framework/RootArrowFilesystem.h" #include "Framework/AlgorithmSpec.h" #include "Framework/ConfigParamRegistry.h" #include "Framework/ControlService.h" @@ -41,6 +43,8 @@ #include #include #include +#include +#include using namespace o2; using namespace o2::aod; @@ -272,11 +276,13 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const // Origin file name for derived output map auto o2 = Output(TFFileNameHeader); auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf); - std::string currentFilename(fileAndFolder.file->GetName()); - if (strcmp(fileAndFolder.file->GetEndpointUrl()->GetProtocol(), "file") == 0 && fileAndFolder.file->GetEndpointUrl()->GetFile()[0] != '/') { + auto rootFS = std::dynamic_pointer_cast(fileAndFolder.filesystem()); + auto* f = dynamic_cast(rootFS->GetFile()); + std::string currentFilename(f->GetFile()->GetName()); + if (strcmp(f->GetEndpointUrl()->GetProtocol(), "file") == 0 && f->GetEndpointUrl()->GetFile()[0] != '/') { // This is not an absolute local path. Make it absolute. static std::string pwd = gSystem->pwd() + std::string("/"); - currentFilename = pwd + std::string(fileAndFolder.file->GetName()); + currentFilename = pwd + std::string(f->GetName()); } outputs.make(o2) = currentFilename; } @@ -312,7 +318,9 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const auto concrete = DataSpecUtils::asConcreteDataMatcher(firstRoute.matcher); auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec); auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf); - if (!fileAndFolder.file) { + + // In case the filesource is empty, move to the next one. 
+ if (fileAndFolder.path().empty()) { fcnt += 1; ntf = 0; if (didir->atEnd(fcnt)) { diff --git a/Framework/AnalysisSupport/src/DataInputDirector.cxx b/Framework/AnalysisSupport/src/DataInputDirector.cxx index 172ecd66c0e64..dd0238af8ddc0 100644 --- a/Framework/AnalysisSupport/src/DataInputDirector.cxx +++ b/Framework/AnalysisSupport/src/DataInputDirector.cxx @@ -11,6 +11,8 @@ #include "DataInputDirector.h" #include "Framework/DataDescriptorQueryBuilder.h" #include "Framework/Logger.h" +#include "Framework/PluginManager.h" +#include "Framework/RootArrowFilesystem.h" #include "Framework/AnalysisDataModelHelpers.h" #include "Framework/Output.h" #include "Headers/DataHeader.h" @@ -26,8 +28,12 @@ #include "TGrid.h" #include "TObjString.h" #include "TMap.h" +#include "TFile.h" +#include +#include #include +#include #if __has_include() #include @@ -47,12 +53,27 @@ FileNameHolder* makeFileNameHolder(std::string fileName) return fileNameHolder; } -DataInputDescriptor::DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) : mAlienSupport(alienSupport), - mMonitoring(monitoring), - mAllowedParentLevel(allowedParentLevel), - mParentFileReplacement(std::move(parentFileReplacement)), - mLevel(level) +DataInputDescriptor::DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) + : mAlienSupport(alienSupport), + mMonitoring(monitoring), + mAllowedParentLevel(allowedParentLevel), + mParentFileReplacement(std::move(parentFileReplacement)), + mLevel(level) { + std::vector capabilitiesSpecs = { + "O2Framework:RNTupleObjectReadingCapability", + "O2Framework:TTreeObjectReadingCapability", + }; + + std::vector plugins; + for (auto spec : capabilitiesSpecs) { + auto morePlugins = PluginManager::parsePluginSpecString(spec); + for (auto& extra : morePlugins) { + plugins.push_back(extra); + } + } + + PluginManager::loadFromPlugin(plugins, mFactory.capabilities); } void DataInputDescriptor::printOut() @@ -108,20 +129,22 @@ bool DataInputDescriptor::setFile(int counter) // open file auto filename = mfilenames[counter]->fileName; - if (mcurrentFile) { - if (mcurrentFile->GetName() == filename) { + auto rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); + if (rootFS.get()) { + if (rootFS->GetFile()->GetName() == filename) { return true; } closeInputFile(); } - mcurrentFile = TFile::Open(filename.c_str()); - if (!mcurrentFile) { + + mCurrentFilesystem = std::make_shared(TFile::Open(filename.c_str()), 50 * 1024 * 1024, mFactory); + if (!mCurrentFilesystem.get()) { throw std::runtime_error(fmt::format("Couldn't open file \"{}\"!", filename)); } - mcurrentFile->SetReadaheadSize(50 * 1024 * 1024); + rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); // get the parent file map if exists - mParentFileMap = (TMap*)mcurrentFile->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path) + mParentFileMap = (TMap*)rootFS->GetFile()->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path) if (mParentFileMap && !mParentFileReplacement.empty()) { auto pos = mParentFileReplacement.find(';'); if (pos == std::string::npos) { @@ -140,16 +163,28 @@ bool DataInputDescriptor::setFile(int counter) // get the directory names if (mfilenames[counter]->numberOfTimeFrames <= 0) { - std::regex TFRegex = std::regex("DF_[0-9]+"); - TList* keyList = mcurrentFile->GetListOfKeys(); + const std::regex TFRegex = 
std::regex("/?DF_([0-9]+)(|-.*)$"); + TList* keyList = rootFS->GetFile()->GetListOfKeys(); + std::vector finalList; // extract TF numbers and sort accordingly + // We use an extra seen set to make sure we preserve the order in which + // we instert things in the final list and to make sure we do not have duplicates. + // Multiple folder numbers can happen if we use a flat structure /DF_- + std::unordered_set seen; for (auto key : *keyList) { - if (std::regex_match(((TObjString*)key)->GetString().Data(), TFRegex)) { - auto folderNumber = std::stoul(std::string(((TObjString*)key)->GetString().Data()).substr(3)); - mfilenames[counter]->listOfTimeFrameNumbers.emplace_back(folderNumber); + std::smatch matchResult; + std::string keyName = ((TObjString*)key)->GetString().Data(); + bool match = std::regex_match(keyName, matchResult, TFRegex); + if (match) { + auto folderNumber = std::stoul(matchResult[1].str()); + if (seen.find(folderNumber) == seen.end()) { + seen.insert(folderNumber); + mfilenames[counter]->listOfTimeFrameNumbers.emplace_back(folderNumber); + } } } + if (mParentFileMap != nullptr) { // If we have a parent map, we should not process in DF alphabetical order but according to parent file to avoid swapping between files std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end(), @@ -162,12 +197,8 @@ bool DataInputDescriptor::setFile(int counter) std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end()); } - for (auto folderNumber : mfilenames[counter]->listOfTimeFrameNumbers) { - auto folderName = "DF_" + std::to_string(folderNumber); - mfilenames[counter]->listOfTimeFrameKeys.emplace_back(folderName); - mfilenames[counter]->alreadyRead.emplace_back(false); - } - mfilenames[counter]->numberOfTimeFrames = mfilenames[counter]->listOfTimeFrameKeys.size(); + mfilenames[counter]->alreadyRead.resize(mfilenames[counter]->alreadyRead.size() + mfilenames[counter]->listOfTimeFrameNumbers.size(), false); + mfilenames[counter]->numberOfTimeFrames = mfilenames[counter]->listOfTimeFrameNumbers.size(); } mCurrentFileID = counter; @@ -193,26 +224,21 @@ uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF) return (mfilenames[counter]->listOfTimeFrameNumbers)[numTF]; } -FileAndFolder DataInputDescriptor::getFileFolder(int counter, int numTF) +arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF) { - FileAndFolder fileAndFolder; - // open file if (!setFile(counter)) { - return fileAndFolder; + return {}; } // no TF left if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) { - return fileAndFolder; + return {}; } - fileAndFolder.file = mcurrentFile; - fileAndFolder.folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF]; - mfilenames[counter]->alreadyRead[numTF] = true; - return fileAndFolder; + return {fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]), mCurrentFilesystem}; } DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename) @@ -221,17 +247,19 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, // This file has no parent map return nullptr; } - auto folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF]; + auto folderName = fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]); auto parentFileName = (TObjString*)mParentFileMap->GetValue(folderName.c_str()); + 
+  auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
   if (!parentFileName) {
-    // The current DF is not found in the parent map (this should not happen and is a fatal error)
-    throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), mcurrentFile->GetName()));
+    throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), rootFS->GetFile()->GetName()));
     return nullptr;
   }
 
   if (mParentFile) {
     // Is this still the corresponding to the correct file?
-    if (parentFileName->GetString().CompareTo(mParentFile->mcurrentFile->GetName()) == 0) {
+    auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(mParentFile->mCurrentFilesystem);
+    if (parentFileName->GetString().CompareTo(parentRootFS->GetFile()->GetName()) == 0) {
       return mParentFile;
     } else {
       mParentFile->closeInputFile();
@@ -241,7 +269,8 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF,
   }
 
   if (mLevel == mAllowedParentLevel) {
-    throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(), mcurrentFile->GetName()));
+    throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(),
+                             rootFS->GetFile()->GetName()));
   }
 
   LOGP(info, "Opening parent file {} for DF {}", parentFileName->GetString().Data(), folderName.c_str());
@@ -270,11 +299,13 @@ void DataInputDescriptor::printFileStatistics()
   if (wait_time < 0) {
     wait_time = 0;
   }
-  std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", mcurrentFile->GetName(),
-                                         mcurrentFile->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), mcurrentFile->GetBytesRead(), mcurrentFile->GetReadCalls(),
+  auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
+  auto f = dynamic_cast<TFile*>(rootFS->GetFile());
+  std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", f->GetName(),
+                                         f->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), f->GetBytesRead(), f->GetReadCalls(),
                                          ((float)mIOTime / 1e9), ((float)wait_time / 1e9), mLevel));
 #if __has_include(<TJAlienFile.h>)
-  auto alienFile = dynamic_cast<TJAlienFile*>(mcurrentFile);
+  auto alienFile = dynamic_cast<TJAlienFile*>(f);
   if (alienFile) {
     monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
   }
@@ -285,7 +316,7 @@
 
 void DataInputDescriptor::closeInputFile()
 {
-  if (mcurrentFile) {
+  if (mCurrentFilesystem.get()) {
     if (mParentFile) {
       mParentFile->closeInputFile();
       delete mParentFile;
@@ -296,9 +327,7 @@ void DataInputDescriptor::closeInputFile()
     mParentFileMap = nullptr;
 
     printFileStatistics();
-    mcurrentFile->Close();
-    delete mcurrentFile;
-    mcurrentFile = nullptr;
+    mCurrentFilesystem.reset();
   }
 }
@@ -346,8 +375,8 @@ int DataInputDescriptor::fillInputfiles()
 
 int DataInputDescriptor::findDFNumber(int
file, std::string dfName) { - auto dfList = mfilenames[file]->listOfTimeFrameKeys; - auto it = std::find(dfList.begin(), dfList.end(), dfName); + auto dfList = mfilenames[file]->listOfTimeFrameNumbers; + auto it = std::find_if(dfList.begin(), dfList.end(), [dfName](size_t i) { return fmt::format("DF_{}", i) == dfName; }); if (it == dfList.end()) { return -1; } @@ -358,40 +387,75 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh { auto ioStart = uv_hrtime(); - auto fileAndFolder = getFileFolder(counter, numTF); - if (!fileAndFolder.file) { + auto folder = getFileFolder(counter, numTF); + if (!folder.filesystem()) { return false; } - auto fullpath = fileAndFolder.folderName + "/" + treename; - auto tree = (TTree*)fileAndFolder.file->Get(fullpath.c_str()); + auto rootFS = std::dynamic_pointer_cast(folder.filesystem()); + + if (!rootFS) { + throw std::runtime_error(fmt::format(R"(Not a TFile filesystem!)")); + } + // FIXME: Ugly. We should detect the format from the treename, good enough for now. + std::shared_ptr format; + + auto fullpath = arrow::dataset::FileSource{folder.path() + "/" + treename, folder.filesystem()}; + + for (auto& capability : mFactory.capabilities) { + auto objectPath = capability.lfn2objectPath(fullpath.path()); + void* handle = capability.getHandle(rootFS, objectPath); + if (handle) { + format = capability.factory().format(); + break; + } + } + + if (!format) { + throw std::runtime_error(fmt::format(R"(Cannot find a viable format for object {}!)", fullpath.path())); + } + + auto schemaOpt = format->Inspect(fullpath); + auto physicalSchema = schemaOpt; + std::vector> fields; + for (auto& original : (*schemaOpt)->fields()) { + if (original->name().ends_with("_size")) { + continue; + } + fields.push_back(original); + } + auto datasetSchema = std::make_shared(fields); + + auto fragment = format->MakeFragment(fullpath, {}, *physicalSchema); - if (!tree) { - LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.c_str()); + if (!fragment.ok()) { + LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.path()); auto parentFile = getParentFile(counter, numTF, treename); if (parentFile != nullptr) { - int parentNumTF = parentFile->findDFNumber(0, fileAndFolder.folderName); + int parentNumTF = parentFile->findDFNumber(0, folder.path()); if (parentNumTF == -1) { - throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", fileAndFolder.folderName, parentFile->mcurrentFile->GetName())); + auto parentRootFS = std::dynamic_pointer_cast(parentFile->mCurrentFilesystem); + throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", folder.path(), parentRootFS->GetFile()->GetName())); } // first argument is 0 as the parent file object contains only 1 file return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed); } - throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fileAndFolder.folderName + "/" + treename, fileAndFolder.file->GetName())); + auto rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); + throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". 
Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fullpath.path(), rootFS->GetFile()->GetName())); } // create table output auto o = Output(dh); - auto t2t = outputs.make(o); - // add branches to read - // fill the table - t2t->setLabel(tree->GetName()); - totalSizeCompressed += tree->GetZipBytes(); - totalSizeUncompressed += tree->GetTotBytes(); - t2t->addAllColumns(tree); - t2t->fill(tree); - delete tree; + // FIXME: This should allow me to create a memory pool + // which I can then use to scan the dataset. + // + auto f2b = outputs.make(o); + + //// add branches to read + //// fill the table + f2b->setLabel(treename.c_str()); + f2b->fill(*fragment, datasetSchema, format); mIOTime += (uv_hrtime() - ioStart); @@ -693,7 +757,7 @@ DataInputDescriptor* DataInputDirector::getDataInputDescriptor(header::DataHeade return result; } -FileAndFolder DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF) +arrow::dataset::FileSource DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF) { auto didesc = getDataInputDescriptor(dh); // if NOT match then use defaultDataInputDescriptor diff --git a/Framework/AnalysisSupport/src/DataInputDirector.h b/Framework/AnalysisSupport/src/DataInputDirector.h index eca0ef195d111..9bab29db3ff24 100644 --- a/Framework/AnalysisSupport/src/DataInputDirector.h +++ b/Framework/AnalysisSupport/src/DataInputDirector.h @@ -15,6 +15,10 @@ #include "Framework/DataDescriptorMatcher.h" #include "Framework/DataAllocator.h" +#include "Framework/RootArrowFilesystem.h" + +#include +#include #include #include "rapidjson/fwd.h" @@ -31,16 +35,10 @@ struct FileNameHolder { std::string fileName; int numberOfTimeFrames = 0; std::vector listOfTimeFrameNumbers; - std::vector listOfTimeFrameKeys; std::vector alreadyRead; }; FileNameHolder* makeFileNameHolder(std::string fileName); -struct FileAndFolder { - TFile* file = nullptr; - std::string folderName = ""; -}; - class DataInputDescriptor { /// Holds information concerning the reading of an aod table. 
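// A minimal consumer-side sketch (editor's illustration, not part of the
// patch) of the arrow::dataset::FileSource which replaces the FileAndFolder
// struct removed above. TFileFileSystem, getFileFolder() and the DF_ naming
// come from this patch; didir, dh, fcnt, ntf and treename are hypothetical
// call-site variables.
//
//   arrow::dataset::FileSource source = didir->getFileFolder(dh, fcnt, ntf);
//   if (source.path().empty()) {
//     // no time frame left in this file: move to the next one
//   }
//   auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(source.filesystem());
//   TFile* f = dynamic_cast<TFile*>(rootFS->GetFile());
//   std::string fullpath = source.path() + "/" + treename; // e.g. "DF_123/O2track"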
@@ -52,7 +50,6 @@ class DataInputDescriptor std::string treename = ""; std::unique_ptr matcher; - DataInputDescriptor() = default; DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring = nullptr, int allowedParentLevel = 0, std::string parentFileReplacement = ""); void printOut(); @@ -78,7 +75,7 @@ class DataInputDescriptor int findDFNumber(int file, std::string dfName); uint64_t getTimeFrameNumber(int counter, int numTF); - FileAndFolder getFileFolder(int counter, int numTF); + arrow::dataset::FileSource getFileFolder(int counter, int numTF); DataInputDescriptor* getParentFile(int counter, int numTF, std::string treename); int getTimeFramesInFile(int counter); int getReadTimeFramesInFile(int counter); @@ -90,6 +87,7 @@ class DataInputDescriptor bool isAlienSupportOn() { return mAlienSupport; } private: + o2::framework::RootObjectReadingFactory mFactory; std::string minputfilesFile = ""; std::string* minputfilesFilePtr = nullptr; std::string mFilenameRegex = ""; @@ -98,7 +96,7 @@ class DataInputDescriptor std::string mParentFileReplacement; std::vector mfilenames; std::vector* mdefaultFilenamesPtr = nullptr; - TFile* mcurrentFile = nullptr; + std::shared_ptr mCurrentFilesystem; int mCurrentFileID = -1; bool mAlienSupport = false; @@ -127,7 +125,6 @@ class DataInputDirector ~DataInputDirector(); void reset(); - void createDefaultDataInputDescriptor(); void printOut(); bool atEnd(int counter); @@ -140,10 +137,11 @@ class DataInputDirector // getters DataInputDescriptor* getDataInputDescriptor(header::DataHeader dh); int getNumberInputDescriptors() { return mdataInputDescriptors.size(); } + void createDefaultDataInputDescriptor(); bool readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed); uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF); - FileAndFolder getFileFolder(header::DataHeader dh, int counter, int numTF); + arrow::dataset::FileSource getFileFolder(header::DataHeader dh, int counter, int numTF); int getTimeFramesInFile(header::DataHeader dh, int counter); uint64_t getTotalSizeCompressed(); diff --git a/Framework/AnalysisSupport/src/TTreePlugin.cxx b/Framework/AnalysisSupport/src/TTreePlugin.cxx index 4b130a2144253..b6df8c969b12d 100644 --- a/Framework/AnalysisSupport/src/TTreePlugin.cxx +++ b/Framework/AnalysisSupport/src/TTreePlugin.cxx @@ -13,6 +13,7 @@ #include "Framework/Plugins.h" #include "Framework/Signpost.h" #include "Framework/Endian.h" +#include #include #include #include @@ -286,6 +287,8 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( auto generator = [pool = options->pool, treeFragment, dataset_schema, &totalCompressedSize = mTotCompressedSize, &totalUncompressedSize = mTotUncompressedSize]() -> arrow::Future> { + O2_SIGNPOST_ID_FROM_POINTER(tid, root_arrow_fs, treeFragment->GetTree()); + O2_SIGNPOST_START(root_arrow_fs, tid, "Generator", "Creating batch for tree %{public}s", treeFragment->GetTree()->GetName()); std::vector> columns; std::vector> fields = dataset_schema->fields(); auto physical_schema = *treeFragment->ReadPhysicalSchema(); @@ -299,27 +302,48 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( for (int fi = 0; fi < dataset_schema->num_fields(); ++fi) { auto dataset_field = dataset_schema->field(fi); + // This is needed because for now the dataset_field + // is actually the schema of the ttree + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Processing dataset field %{public}s.", 
dataset_field->name().c_str()); int physicalFieldIdx = physical_schema->GetFieldIndex(dataset_field->name()); if (physicalFieldIdx < 0) { throw runtime_error_f("Cannot find physical field associated to %s", dataset_field->name().c_str()); } if (physicalFieldIdx > 1 && physical_schema->field(physicalFieldIdx - 1)->name().ends_with("_size")) { + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Field %{public}s has sizes in %{public}s.", dataset_field->name().c_str(), + physical_schema->field(physicalFieldIdx - 1)->name().c_str()); mappings.push_back({physicalFieldIdx, physicalFieldIdx - 1, fi}); } else { + if (physicalFieldIdx > 1) { + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Field %{public}s previous field is %{public}s.", dataset_field->name().c_str(), + physical_schema->field(physicalFieldIdx - 1)->name().c_str()); + } mappings.push_back({physicalFieldIdx, -1, fi}); } } auto* tree = treeFragment->GetTree(); - tree->SetCacheSize(25000000); auto branches = tree->GetListOfBranches(); + size_t totalTreeSize = 0; + std::vector selectedBranches; for (auto& mapping : mappings) { - tree->AddBranchToCache((TBranch*)branches->At(mapping.mainBranchIdx), false); + selectedBranches.push_back((TBranch*)branches->At(mapping.mainBranchIdx)); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Adding branch %{public}s to stream.", selectedBranches.back()->GetName()); + totalTreeSize += selectedBranches.back()->GetTotalSize(); if (mapping.vlaIdx != -1) { - tree->AddBranchToCache((TBranch*)branches->At(mapping.vlaIdx), false); + selectedBranches.push_back((TBranch*)branches->At(mapping.vlaIdx)); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Adding branch %{public}s to stream.", selectedBranches.back()->GetName()); + totalTreeSize += selectedBranches.back()->GetTotalSize(); } } + + size_t cacheSize = std::max(std::min(totalTreeSize, 25000000UL), 1000000UL); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Resizing cache to %zu.", cacheSize); + tree->SetCacheSize(cacheSize); + for (auto* branch : selectedBranches) { + tree->AddBranchToCache(branch, false); + } tree->StopCacheLearningPhase(); static TBufferFile buffer{TBuffer::EMode::kWrite, 4 * 1024 * 1024}; @@ -400,9 +424,7 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( } } else { // This is needed for branches which have not been persisted. - auto bytes = branch->GetTotBytes(); - auto branchSize = bytes ? 
bytes : 1000000; - auto&& result = arrow::AllocateResizableBuffer(branchSize, pool); + auto&& result = arrow::AllocateBuffer(branch->GetTotalSize(), pool); if (!result.ok()) { throw runtime_error("Cannot allocate values buffer"); } @@ -423,7 +445,7 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( if (mapping.vlaIdx != -1) { auto* mSizeBranch = (TBranch*)branches->At(mapping.vlaIdx); offsetBuffer = std::make_unique(TBuffer::EMode::kWrite, 4 * 1024 * 1024); - result = arrow::AllocateResizableBuffer((totalEntries + 1) * (int64_t)sizeof(int), pool); + result = arrow::AllocateBuffer((totalEntries + 1) * (int64_t)sizeof(int), pool); if (!result.ok()) { throw runtime_error("Cannot allocate offset buffer"); } @@ -435,6 +457,9 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( // read sizes first while (readEntries < totalEntries) { auto readLast = mSizeBranch->GetBulkRead().GetEntriesSerialized(readEntries, *offsetBuffer); + if (readLast == -1) { + throw runtime_error_f("Unable to read from branch %s.", mSizeBranch->GetName()); + } readEntries += readLast; for (auto i = 0; i < readLast; ++i) { offsets[count++] = (int)offset; @@ -492,6 +517,7 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( auto batch = arrow::RecordBatch::Make(dataset_schema, rows, columns); totalCompressedSize += tree->GetZipBytes(); totalUncompressedSize += tree->GetTotBytes(); + O2_SIGNPOST_END(root_arrow_fs, tid, "Generator", "Done creating batch compressed:%zu uncompressed:%zu", totalCompressedSize, totalUncompressedSize); return batch; }; return generator; diff --git a/Framework/Core/include/Framework/RootArrowFilesystem.h b/Framework/Core/include/Framework/RootArrowFilesystem.h index 441b43aeca331..6f331ddbbca4e 100644 --- a/Framework/Core/include/Framework/RootArrowFilesystem.h +++ b/Framework/Core/include/Framework/RootArrowFilesystem.h @@ -144,6 +144,8 @@ class TFileFileSystem : public VirtualRootFileSystemBase TFileFileSystem(TDirectoryFile* f, size_t readahead, RootObjectReadingFactory&); + ~TFileFileSystem() override; + std::string type_name() const override { return "TDirectoryFile"; diff --git a/Framework/Core/src/RootArrowFilesystem.cxx b/Framework/Core/src/RootArrowFilesystem.cxx index c563866e802bb..403e393ec6090 100644 --- a/Framework/Core/src/RootArrowFilesystem.cxx +++ b/Framework/Core/src/RootArrowFilesystem.cxx @@ -42,6 +42,12 @@ TFileFileSystem::TFileFileSystem(TDirectoryFile* f, size_t readahead, RootObject ((TFile*)mFile)->SetReadaheadSize(50 * 1024 * 1024); } +TFileFileSystem::~TFileFileSystem() +{ + mFile->Close(); + delete mFile; +} + std::shared_ptr TFileFileSystem::GetObjectHandler(arrow::dataset::FileSource source) { // We use a plugin to create the actual objects inside the diff --git a/Framework/TestWorkflows/src/o2TestHistograms.cxx b/Framework/TestWorkflows/src/o2TestHistograms.cxx index efac16f6da4f0..2ec268130267b 100644 --- a/Framework/TestWorkflows/src/o2TestHistograms.cxx +++ b/Framework/TestWorkflows/src/o2TestHistograms.cxx @@ -40,7 +40,7 @@ struct EtaAndClsHistogramsSimple { OutputObj etaClsH{TH2F("eta_vs_pt", "#eta vs pT", 102, -2.01, 2.01, 100, 0, 10)}; Produces skimEx; - void process(aod::Tracks const& tracks) + void process(aod::Tracks const& tracks, aod::FT0s const&) { LOGP(info, "Invoking the simple one"); for (auto& track : tracks) { @@ -54,7 +54,7 @@ struct EtaAndClsHistogramsIUSimple { OutputObj etaClsH{TH2F("eta_vs_pt", "#eta vs pT", 102, -2.01, 2.01, 100, 0, 10)}; Produces skimEx; - void process(aod::TracksIU const& tracks) + void process(aod::TracksIU const& tracks, 
aod::FT0s const&) { LOGP(info, "Invoking the simple one"); for (auto& track : tracks) { From 6064fb9c6315c694f26a81ede3c8e4098d3a2f00 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Thu, 13 Feb 2025 13:18:54 +0100 Subject: [PATCH 3/3] DPL: add support for decompressing directly to shared memory This PR postpones the read operations which would usually populate an intermediate RecordBatch and it performs them directly on its subsequent shared memory serialization. Doing so avoids having the intermediate representation allocate most of the memory. For the moment this is only done for TTree. RNtuple support will come in a subsequent PR. --- .../AnalysisSupport/src/DataInputDirector.cxx | 7 +- .../AnalysisSupport/src/RNTuplePlugin.cxx | 6 +- Framework/AnalysisSupport/src/TTreePlugin.cxx | 603 +++++++++++++----- .../include/Framework/RootArrowFilesystem.h | 4 + .../Core/include/Framework/TableTreeHelpers.h | 16 +- Framework/Core/src/DataAllocator.cxx | 57 +- Framework/Core/src/TableTreeHelpers.cxx | 14 +- Framework/Core/test/test_Root2ArrowTable.cxx | 64 +- 8 files changed, 552 insertions(+), 219 deletions(-) diff --git a/Framework/AnalysisSupport/src/DataInputDirector.cxx b/Framework/AnalysisSupport/src/DataInputDirector.cxx index dd0238af8ddc0..1daab029b3e8e 100644 --- a/Framework/AnalysisSupport/src/DataInputDirector.cxx +++ b/Framework/AnalysisSupport/src/DataInputDirector.cxx @@ -399,6 +399,7 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh } // FIXME: Ugly. We should detect the format from the treename, good enough for now. std::shared_ptr format; + FragmentToBatch::StreamerCreator creator = nullptr; auto fullpath = arrow::dataset::FileSource{folder.path() + "/" + treename, folder.filesystem()}; @@ -407,6 +408,7 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh void* handle = capability.getHandle(rootFS, objectPath); if (handle) { format = capability.factory().format(); + creator = capability.factory().deferredOutputStreamer; break; } } @@ -449,13 +451,12 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh // FIXME: This should allow me to create a memory pool // which I can then use to scan the dataset. 
- // - auto f2b = outputs.make(o); + auto f2b = outputs.make(o, creator, *fragment); //// add branches to read //// fill the table f2b->setLabel(treename.c_str()); - f2b->fill(*fragment, datasetSchema, format); + f2b->fill(datasetSchema, format); mIOTime += (uv_hrtime() - ioStart); diff --git a/Framework/AnalysisSupport/src/RNTuplePlugin.cxx b/Framework/AnalysisSupport/src/RNTuplePlugin.cxx index 51b585d0714bb..a910964e6527c 100644 --- a/Framework/AnalysisSupport/src/RNTuplePlugin.cxx +++ b/Framework/AnalysisSupport/src/RNTuplePlugin.cxx @@ -12,6 +12,7 @@ #include "Framework/RuntimeError.h" #include "Framework/RootArrowFilesystem.h" #include "Framework/Plugins.h" +#include "Framework/FairMQResizableBuffer.h" #include #include #include @@ -852,7 +853,10 @@ struct RNTupleObjectReadingImplementation : public RootArrowFactoryPlugin { return new RootArrowFactory{ .options = [context]() { return context->format->DefaultWriteOptions(); }, .format = [context]() { return context->format; }, - }; + .deferredOutputStreamer = [](std::shared_ptr fragment, const std::shared_ptr& buffer) -> std::shared_ptr { + auto treeFragment = std::dynamic_pointer_cast(fragment); + return std::make_shared(buffer); + }}; } }; diff --git a/Framework/AnalysisSupport/src/TTreePlugin.cxx b/Framework/AnalysisSupport/src/TTreePlugin.cxx index b6df8c969b12d..5c51e47511cec 100644 --- a/Framework/AnalysisSupport/src/TTreePlugin.cxx +++ b/Framework/AnalysisSupport/src/TTreePlugin.cxx @@ -13,11 +13,15 @@ #include "Framework/Plugins.h" #include "Framework/Signpost.h" #include "Framework/Endian.h" +#include +#include #include #include #include +#include #include #include +#include #include #include #include @@ -27,13 +31,269 @@ #include #include #include +#include +#include #include +#include +#include O2_DECLARE_DYNAMIC_LOG(root_arrow_fs); namespace o2::framework { +enum struct ReadOpKind { + Unknown, + Offsets, + Values, + Booleans, + VLA +}; + +struct ReadOps { + TBranch* branch = nullptr; + std::shared_ptr targetBuffer = nullptr; + int64_t rootBranchEntries = 0; + size_t typeSize = 0; + size_t listSize = 0; + // If this is an offset reading op, keep track of the actual + // range for the offsets, not only how many VLAs are there. + int64_t offsetCount = 0; + ReadOpKind kind = ReadOpKind::Unknown; +}; + +/// An OutputStream which does the reading of the input buffers directly +/// on writing, if needed. Each deferred operation is encoded in the source +/// buffer by an incremental number which can be used to lookup in the @a ops +/// vector the operation to perform. +class TTreeDeferredReadOutputStream : public arrow::io::OutputStream +{ + public: + explicit TTreeDeferredReadOutputStream(std::vector& ops, + const std::shared_ptr& buffer); + + /// \brief Create in-memory output stream with indicated capacity using a + /// memory pool + /// \param[in] initial_capacity the initial allocated internal capacity of + /// the OutputStream + /// \param[in,out] pool a MemoryPool to use for allocations + /// \return the created stream + static arrow::Result> Create( + std::vector& ops, + int64_t initial_capacity = 4096, + arrow::MemoryPool* pool = arrow::default_memory_pool()); + + // By the time we call the destructor, the contents + // of the buffer are already moved to fairmq + // for being sent. + ~TTreeDeferredReadOutputStream() override = default; + + // Implement the OutputStream interface + + /// Close the stream, preserving the buffer (retrieve it with Finish()). 
+  arrow::Status Close() override;
+  [[nodiscard]] bool closed() const override;
+  [[nodiscard]] arrow::Result<int64_t> Tell() const override;
+  arrow::Status Write(const void* data, int64_t nbytes) override;
+
+  /// \cond FALSE
+  using OutputStream::Write;
+  /// \endcond
+
+  /// Close the stream and return the buffer
+  arrow::Result<std::shared_ptr<arrow::Buffer>> Finish();
+
+  /// \brief Initialize state of OutputStream with newly allocated memory and
+  /// set position to 0
+  /// \param[in] initial_capacity the starting allocated capacity
+  /// \param[in,out] pool the memory pool to use for allocations
+  /// \return Status
+  arrow::Status Reset(std::vector<ReadOps> ops,
+                      int64_t initial_capacity, arrow::MemoryPool* pool);
+
+  [[nodiscard]] int64_t capacity() const { return capacity_; }
+
+ private:
+  TTreeDeferredReadOutputStream();
+  std::vector<ReadOps> ops_;
+
+  // Ensures there is sufficient space available to write nbytes
+  arrow::Status Reserve(int64_t nbytes);
+
+  std::shared_ptr<arrow::ResizableBuffer> buffer_;
+  bool is_open_;
+  int64_t capacity_;
+  int64_t position_;
+  uint8_t* mutable_data_;
+};
+
+static constexpr int64_t kBufferMinimumSize = 256;
+
+TTreeDeferredReadOutputStream::TTreeDeferredReadOutputStream()
+  : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {}
+
+TTreeDeferredReadOutputStream::TTreeDeferredReadOutputStream(std::vector<ReadOps>& ops,
+                                                             const std::shared_ptr<arrow::ResizableBuffer>& buffer)
+  : ops_(ops),
+    buffer_(buffer),
+    is_open_(true),
+    capacity_(buffer->size()),
+    position_(0),
+    mutable_data_(buffer->mutable_data()) {}
+
+arrow::Result<std::shared_ptr<TTreeDeferredReadOutputStream>> TTreeDeferredReadOutputStream::Create(
+  std::vector<ReadOps>& ops,
+  int64_t initial_capacity, arrow::MemoryPool* pool)
+{
+  // ctor is private, so cannot use make_shared
+  auto ptr = std::shared_ptr<TTreeDeferredReadOutputStream>(new TTreeDeferredReadOutputStream);
+  RETURN_NOT_OK(ptr->Reset(ops, initial_capacity, pool));
+  return ptr;
+}
+
+arrow::Status TTreeDeferredReadOutputStream::Reset(std::vector<ReadOps> ops,
+                                                   int64_t initial_capacity, arrow::MemoryPool* pool)
+{
+  ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(initial_capacity, pool));
+  ops_ = ops;
+  is_open_ = true;
+  capacity_ = initial_capacity;
+  position_ = 0;
+  mutable_data_ = buffer_->mutable_data();
+  return arrow::Status::OK();
+}
+
+arrow::Status TTreeDeferredReadOutputStream::Close()
+{
+  if (is_open_) {
+    is_open_ = false;
+    if (position_ < capacity_) {
+      RETURN_NOT_OK(buffer_->Resize(position_, false));
+    }
+  }
+  return arrow::Status::OK();
+}
+
+bool TTreeDeferredReadOutputStream::closed() const { return !is_open_; }
+
+arrow::Result<std::shared_ptr<arrow::Buffer>> TTreeDeferredReadOutputStream::Finish()
+{
+  RETURN_NOT_OK(Close());
+  buffer_->ZeroPadding();
+  is_open_ = false;
+  return std::move(buffer_);
+}
+
+arrow::Result<int64_t> TTreeDeferredReadOutputStream::Tell() const { return position_; }
+
+auto readValues = [](uint8_t* target, ReadOps& op, TBufferFile& rootBuffer) {
+  int readEntries = 0;
+  rootBuffer.Reset();
+  while (readEntries < op.rootBranchEntries) {
+    auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer);
+    int size = readLast * op.listSize;
+    readEntries += readLast;
+    swapCopy(target, rootBuffer.GetCurrent(), size, op.typeSize);
+    target += (ptrdiff_t)(size * op.typeSize);
+  }
+};
+
+auto readBoolValues = [](uint8_t* target, ReadOps& op, TBufferFile& rootBuffer) {
+  int readEntries = 0;
+  rootBuffer.Reset();
+  // Set to 0
+  memset(target, 0, op.targetBuffer->size());
+  while (readEntries < op.rootBranchEntries) {
+    // Absolute index of the first value produced by this bulk read.
+    int beginValue = readEntries * (int)op.listSize;
+    auto readLast = op.branch->GetBulkRead().GetBulkEntries(readEntries, rootBuffer);
+    int size = readLast * op.listSize;
+    readEntries += readLast;
+    for (int i = beginValue; i < beginValue + size; ++i) {
+      auto value = static_cast<uint8_t>(rootBuffer.GetCurrent()[i - beginValue] << (i % 8));
+      target[i / 8] |= value;
+    }
+  }
+};
+
+auto readVLAValues = [](uint8_t* target, ReadOps& op, ReadOps const& offsetOp, TBufferFile& rootBuffer) {
+  int readEntries = 0;
+  auto* tPtrOffset = reinterpret_cast<int const*>(offsetOp.targetBuffer->data());
+  std::span<int const> const offsets{tPtrOffset, tPtrOffset + offsetOp.rootBranchEntries + 1};
+
+  rootBuffer.Reset();
+  while (readEntries < op.rootBranchEntries) {
+    auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer);
+    int size = offsets[readEntries + readLast] - offsets[readEntries];
+    readEntries += readLast;
+    swapCopy(target, rootBuffer.GetCurrent(), size, op.typeSize);
+    target += (ptrdiff_t)(size * op.typeSize);
+  }
+};
+
+arrow::Status TTreeDeferredReadOutputStream::Write(const void* data, int64_t nbytes)
+{
+  if (ARROW_PREDICT_FALSE(!is_open_)) {
+    return arrow::Status::IOError("OutputStream is closed");
+  }
+  if (ARROW_PREDICT_TRUE(nbytes == 0)) {
+    return arrow::Status::OK();
+  }
+  if (ARROW_PREDICT_FALSE(position_ + nbytes >= capacity_)) {
+    RETURN_NOT_OK(Reserve(nbytes));
+  }
+  auto ref = (int64_t)data;
+  if (ref >= ops_.size()) {
+    // This is a real address which needs to be copied. Do it!
+    memcpy(mutable_data_ + position_, data, nbytes);
+    position_ += nbytes;
+    return arrow::Status::OK();
+  }
+  auto& op = ops_[ref];
+  static TBufferFile rootBuffer{TBuffer::EMode::kWrite, 4 * 1024 * 1024};
+
+  switch (op.kind) {
+    // Offsets need to be read in advance because we need to know
+    // how many elements are there in total (since TTree does not allow discovering such information)
+    case ReadOpKind::Offsets:
+      break;
+    case ReadOpKind::Values:
+      readValues(mutable_data_ + position_, op, rootBuffer);
+      break;
+    case ReadOpKind::VLA:
+      readVLAValues(mutable_data_ + position_, op, ops_[ref - 1], rootBuffer);
+      break;
+    case ReadOpKind::Booleans:
+      readBoolValues(mutable_data_ + position_, op, rootBuffer);
+      break;
+    case ReadOpKind::Unknown:
+      throw runtime_error("Unknown Op");
+  }
+  op.branch->SetStatus(false);
+  op.branch->DropBaskets("all");
+  op.branch->Reset();
+  op.branch->GetTransientBuffer(0)->Expand(0);
+
+  position_ += nbytes;
+  return arrow::Status::OK();
+}
+
+arrow::Status TTreeDeferredReadOutputStream::Reserve(int64_t nbytes)
+{
+  // Always overallocate by doubling. It seems that it is a better growth
+  // strategy, at least for memory_benchmark.cc.
+  // This may be because it helps match the allocator's allocation buckets
+  // more exactly. Or perhaps it hits a sweet spot in jemalloc.
+  int64_t new_capacity = std::max(kBufferMinimumSize, capacity_);
+  new_capacity = position_ + nbytes;
+  if (new_capacity > capacity_) {
+    RETURN_NOT_OK(buffer_->Resize(new_capacity));
+    capacity_ = new_capacity;
+    mutable_data_ = buffer_->mutable_data();
+  }
+  return arrow::Status::OK();
+}
+
 class TTreeFileWriteOptions : public arrow::dataset::FileWriteOptions
 {
  public:
@@ -175,8 +435,21 @@ class TTreeFileFragment : public arrow::dataset::FileFragment
     return mTree.get();
   }
 
+  std::vector<ReadOps>& ops()
+  {
+    return mOps;
+  }
+
+  /// The pointer to each allocation is an incremental number, indexing a collection to track
+  /// the size of each allocation.
+ std::shared_ptr GetPlaceholderForOp(size_t size) + { + return std::make_shared((uint8_t*)(mOps.size() - 1), size); + } + private: std::unique_ptr mTree; + std::vector mOps; }; // An arrow outputstream which allows to write to a TTree. Eventually @@ -247,6 +520,9 @@ bool TTreeOutputStream::closed() const TBranch* TTreeOutputStream::CreateBranch(char const* branchName, char const* sizeBranch) { + if (mBranchPrefix.empty() == true) { + return mTree->Branch(branchName, (char*)nullptr, sizeBranch); + } return mTree->Branch((mBranchPrefix + "/" + branchName).c_str(), (char*)nullptr, (mBranchPrefix + sizeBranch).c_str()); } @@ -264,7 +540,10 @@ struct TTreeObjectReadingImplementation : public RootArrowFactoryPlugin { return new RootArrowFactory{ .options = [context]() { return context->format->DefaultWriteOptions(); }, .format = [context]() { return context->format; }, - }; + .deferredOutputStreamer = [](std::shared_ptr fragment, const std::shared_ptr& buffer) -> std::shared_ptr { + auto treeFragment = std::dynamic_pointer_cast(fragment); + return std::make_shared(treeFragment->ops(), buffer); + }}; } }; @@ -274,10 +553,36 @@ struct BranchFieldMapping { int datasetFieldIdx; }; +auto readOffsets = [](ReadOps& op, TBufferFile& rootBuffer) { + uint32_t offset = 0; + std::span offsets; + int readEntries = 0; + int count = 0; + auto* tPtrOffset = reinterpret_cast(op.targetBuffer->mutable_data()); + offsets = std::span{tPtrOffset, tPtrOffset + op.rootBranchEntries + 1}; + + // read sizes first + rootBuffer.Reset(); + while (readEntries < op.rootBranchEntries) { + auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer); + if (readLast == -1) { + throw runtime_error_f("Unable to read from branch %s.", op.branch->GetName()); + } + readEntries += readLast; + for (auto i = 0; i < readLast; ++i) { + offsets[count++] = (int)offset; + offset += swap32_(reinterpret_cast(rootBuffer.GetCurrent())[i]); + } + } + offsets[count] = (int)offset; + op.offsetCount = offset; +}; + arrow::Result TTreeFileFormat::ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& fragment) const { + assert(options->dataset_schema != nullptr); // This is the schema we want to read auto dataset_schema = options->dataset_schema; auto treeFragment = std::dynamic_pointer_cast(fragment); @@ -300,6 +605,8 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( // Register physical fields into the cache std::vector mappings; + // We need to count the number of readops to avoid moving the vector. + int opsCount = 0; for (int fi = 0; fi < dataset_schema->num_fields(); ++fi) { auto dataset_field = dataset_schema->field(fi); // This is needed because for now the dataset_field @@ -308,18 +615,17 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( int physicalFieldIdx = physical_schema->GetFieldIndex(dataset_field->name()); if (physicalFieldIdx < 0) { - throw runtime_error_f("Cannot find physical field associated to %s", dataset_field->name().c_str()); + throw runtime_error_f("Cannot find physical field associated to %s. 
Possible fields: %s", + dataset_field->name().c_str(), physical_schema->ToString().c_str()); } if (physicalFieldIdx > 1 && physical_schema->field(physicalFieldIdx - 1)->name().ends_with("_size")) { O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Field %{public}s has sizes in %{public}s.", dataset_field->name().c_str(), physical_schema->field(physicalFieldIdx - 1)->name().c_str()); mappings.push_back({physicalFieldIdx, physicalFieldIdx - 1, fi}); + opsCount += 2; } else { - if (physicalFieldIdx > 1) { - O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Field %{public}s previous field is %{public}s.", dataset_field->name().c_str(), - physical_schema->field(physicalFieldIdx - 1)->name().c_str()); - } mappings.push_back({physicalFieldIdx, -1, fi}); + opsCount++; } } @@ -346,174 +652,120 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( } tree->StopCacheLearningPhase(); - static TBufferFile buffer{TBuffer::EMode::kWrite, 4 * 1024 * 1024}; - - int64_t rows = -1; + // Intermediate buffer to bulk read. Two for now + static TBufferFile rootBuffer{TBuffer::EMode::kWrite, 4 * 1024 * 1024}; + std::vector& ops = treeFragment->ops(); + ops.clear(); + ops.reserve(opsCount); for (size_t mi = 0; mi < mappings.size(); ++mi) { BranchFieldMapping mapping = mappings[mi]; // The field actually on disk auto datasetField = dataset_schema->field(mapping.datasetFieldIdx); auto physicalField = physical_schema->field(mapping.mainBranchIdx); - auto* branch = (TBranch*)branches->At(mapping.mainBranchIdx); - assert(branch); - buffer.Reset(); - auto totalEntries = branch->GetEntries(); - if (rows == -1) { - rows = totalEntries; + + if (mapping.vlaIdx != -1) { + auto* branch = (TBranch*)branches->At(mapping.vlaIdx); + ops.emplace_back(ReadOps{ + .branch = branch, + .rootBranchEntries = branch->GetEntries(), + .typeSize = 4, + .listSize = 1, + .kind = ReadOpKind::Offsets, + }); + auto& op = ops.back(); + ARROW_ASSIGN_OR_RAISE(op.targetBuffer, arrow::AllocateBuffer((op.rootBranchEntries + 1) * op.typeSize, pool)); + // Offsets need to be read immediately to know how many values are there + readOffsets(op, rootBuffer); } - if (rows != totalEntries) { - throw runtime_error_f("Unmatching number of rows for branch %s", branch->GetName()); + ops.push_back({}); + auto& valueOp = ops.back(); + valueOp.branch = (TBranch*)branches->At(mapping.mainBranchIdx); + valueOp.rootBranchEntries = valueOp.branch->GetEntries(); + // In case this is a vla, we set the offsetCount as totalEntries + // In case we read booleans we need a special coversion from bytes to bits. + auto listType = std::dynamic_pointer_cast(datasetField->type()); + valueOp.typeSize = physicalField->type()->byte_width(); + // Notice how we are not (yet) allocating buffers at this point. We merely + // create placeholders to subsequently fill. 
+ if ((datasetField->type() == arrow::boolean())) { + valueOp.kind = ReadOpKind::Booleans; + valueOp.listSize = 1; + valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries) / 8 + 1); + } else if (listType && datasetField->type()->field(0)->type() == arrow::boolean()) { + valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width(); + valueOp.listSize = listType->list_size(); + valueOp.kind = ReadOpKind::Booleans; + valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries * valueOp.listSize) / 8 + 1); + } else if (mapping.vlaIdx != -1) { + valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width(); + valueOp.listSize = -1; + // -1 is the current one, -2 is the one with for the offsets + valueOp.kind = ReadOpKind::VLA; + valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(ops[ops.size() - 2].offsetCount * valueOp.typeSize); + } else if (listType) { + valueOp.kind = ReadOpKind::Values; + valueOp.listSize = listType->list_size(); + valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width(); + valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize * valueOp.listSize); + } else { + valueOp.typeSize = physicalField->type()->byte_width(); + valueOp.kind = ReadOpKind::Values; + valueOp.listSize = 1; + valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize); } arrow::Status status; - int readEntries = 0; std::shared_ptr array; - auto listType = std::dynamic_pointer_cast(datasetField->type()); - if (datasetField->type() == arrow::boolean() || - (listType && datasetField->type()->field(0)->type() == arrow::boolean())) { - if (listType) { - std::unique_ptr builder = nullptr; - auto status = arrow::MakeBuilder(pool, datasetField->type()->field(0)->type(), &builder); - if (!status.ok()) { - throw runtime_error("Cannot create value builder"); - } - auto listBuilder = std::make_unique(pool, std::move(builder), listType->list_size()); - auto valueBuilder = listBuilder.get()->value_builder(); - // boolean array special case: we need to use builder to create the bitmap - status = valueBuilder->Reserve(totalEntries * listType->list_size()); - status &= listBuilder->Reserve(totalEntries); - if (!status.ok()) { - throw runtime_error("Failed to reserve memory for array builder"); - } - while (readEntries < totalEntries) { - auto readLast = branch->GetBulkRead().GetBulkEntries(readEntries, buffer); - readEntries += readLast; - status &= static_cast(valueBuilder)->AppendValues(reinterpret_cast(buffer.GetCurrent()), readLast * listType->list_size()); - } - status &= static_cast(listBuilder.get())->AppendValues(readEntries); - if (!status.ok()) { - throw runtime_error("Failed to append values to array"); - } - status &= listBuilder->Finish(&array); - if (!status.ok()) { - throw runtime_error("Failed to create array"); - } - } else if (listType == nullptr) { - std::unique_ptr builder = nullptr; - auto status = arrow::MakeBuilder(pool, datasetField->type(), &builder); - if (!status.ok()) { - throw runtime_error("Cannot create builder"); - } - auto valueBuilder = static_cast(builder.get()); - // boolean array special case: we need to use builder to create the bitmap - status = valueBuilder->Reserve(totalEntries); - if (!status.ok()) { - throw runtime_error("Failed to reserve memory for array builder"); - } - while (readEntries < totalEntries) { - auto readLast = branch->GetBulkRead().GetBulkEntries(readEntries, buffer); - 
readEntries += readLast; - status &= valueBuilder->AppendValues(reinterpret_cast(buffer.GetCurrent()), readLast); - } - if (!status.ok()) { - throw runtime_error("Failed to append values to array"); - } - status &= valueBuilder->Finish(&array); - if (!status.ok()) { - throw runtime_error("Failed to create array"); - } - } - } else { - // This is needed for branches which have not been persisted. - auto&& result = arrow::AllocateBuffer(branch->GetTotalSize(), pool); - if (!result.ok()) { - throw runtime_error("Cannot allocate values buffer"); - } - std::shared_ptr arrowValuesBuffer = result.MoveValueUnsafe(); - auto ptr = arrowValuesBuffer->mutable_data(); - if (ptr == nullptr) { - throw runtime_error("Invalid buffer"); - } - - std::unique_ptr offsetBuffer = nullptr; - - uint32_t offset = 0; - int count = 0; - std::shared_ptr arrowOffsetBuffer; - std::span offsets; - int size = 0; - uint32_t totalSize = 0; - if (mapping.vlaIdx != -1) { - auto* mSizeBranch = (TBranch*)branches->At(mapping.vlaIdx); - offsetBuffer = std::make_unique(TBuffer::EMode::kWrite, 4 * 1024 * 1024); - result = arrow::AllocateBuffer((totalEntries + 1) * (int64_t)sizeof(int), pool); - if (!result.ok()) { - throw runtime_error("Cannot allocate offset buffer"); - } - arrowOffsetBuffer = result.MoveValueUnsafe(); - unsigned char* ptrOffset = arrowOffsetBuffer->mutable_data(); - auto* tPtrOffset = reinterpret_cast(ptrOffset); - offsets = std::span{tPtrOffset, tPtrOffset + totalEntries + 1}; - - // read sizes first - while (readEntries < totalEntries) { - auto readLast = mSizeBranch->GetBulkRead().GetEntriesSerialized(readEntries, *offsetBuffer); - if (readLast == -1) { - throw runtime_error_f("Unable to read from branch %s.", mSizeBranch->GetName()); - } - readEntries += readLast; - for (auto i = 0; i < readLast; ++i) { - offsets[count++] = (int)offset; - offset += swap32_(reinterpret_cast(offsetBuffer->GetCurrent())[i]); - } - } - offsets[count] = (int)offset; - totalSize = offset; - readEntries = 0; - } - - int typeSize = physicalField->type()->byte_width(); - int64_t listSize = 1; - if (auto fixedSizeList = std::dynamic_pointer_cast(datasetField->type())) { - listSize = fixedSizeList->list_size(); - typeSize = physicalField->type()->field(0)->type()->byte_width(); - } else if (mapping.vlaIdx != -1) { - typeSize = physicalField->type()->field(0)->type()->byte_width(); - listSize = -1; - } - while (readEntries < totalEntries) { - auto readLast = branch->GetBulkRead().GetEntriesSerialized(readEntries, buffer); - if (mapping.vlaIdx != -1) { - size = offsets[readEntries + readLast] - offsets[readEntries]; - } else { - size = readLast * listSize; - } - readEntries += readLast; - swapCopy(ptr, buffer.GetCurrent(), size, typeSize); - ptr += (ptrdiff_t)(size * typeSize); - } - if (listSize >= 1) { - totalSize = readEntries * listSize; - } - if (listSize == 1) { - array = std::make_shared(datasetField->type(), readEntries, arrowValuesBuffer); - } else { - auto varray = std::make_shared(datasetField->type()->field(0)->type(), totalSize, arrowValuesBuffer); - if (mapping.vlaIdx != -1) { - array = std::make_shared(datasetField->type(), readEntries, arrowOffsetBuffer, varray); - } else { - array = std::make_shared(datasetField->type(), readEntries, varray); - } - } + if (listType) { + auto varray = std::make_shared(datasetField->type()->field(0)->type(), valueOp.rootBranchEntries * valueOp.listSize, valueOp.targetBuffer); + array = std::make_shared(datasetField->type(), valueOp.rootBranchEntries, varray); + // This is a vla, there is 
also an offset op
+      O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
+                             valueOp.branch->GetName(),
+                             valueOp.rootBranchEntries,
+                             valueOp.targetBuffer->size());
+    } else if (mapping.vlaIdx != -1) {
+      auto& offsetOp = ops[ops.size() - 2];
+      auto varray = std::make_shared<arrow::PrimitiveArray>(datasetField->type()->field(0)->type(), offsetOp.offsetCount, valueOp.targetBuffer);
+      // We have pushed an offset op if this was the case.
+      array = std::make_shared<arrow::ListArray>(datasetField->type(), offsetOp.rootBranchEntries, offsetOp.targetBuffer, varray);
+      O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
+                             offsetOp.branch->GetName(), offsetOp.rootBranchEntries, offsetOp.targetBuffer->size());
+      O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
+                             valueOp.branch->GetName(),
+                             offsetOp.offsetCount,
+                             valueOp.targetBuffer->size());
+    } else {
+      array = std::make_shared<arrow::PrimitiveArray>(datasetField->type(), valueOp.rootBranchEntries, valueOp.targetBuffer);
+      O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
+                             valueOp.branch->GetName(),
+                             valueOp.rootBranchEntries,
+                             valueOp.targetBuffer->size());
     }
-    branch->SetStatus(false);
-    branch->DropBaskets("all");
-    branch->Reset();
-    branch->GetTransientBuffer(0)->Expand(0);
-
     columns.push_back(array);
   }
+
+  // Do the actual filling of the buffers. This happens after we have created the whole structure
+  // so that we can read directly in shared memory.
+  int64_t rows = -1;
+  for (size_t i = 0; i < ops.size(); ++i) {
+    auto& op = ops[i];
+    if (rows == -1 && op.kind != ReadOpKind::VLA) {
+      rows = op.rootBranchEntries;
+    }
+    if (rows == -1 && op.kind == ReadOpKind::VLA) {
+      auto& offsetOp = ops[i - 1];
+      rows = offsetOp.rootBranchEntries;
+    }
+    if (op.kind != ReadOpKind::VLA && rows != op.rootBranchEntries) {
+      throw runtime_error_f("Mismatched number of rows for branch %s. Expected %lli, found %lli", op.branch->GetName(), rows, op.rootBranchEntries);
+    }
+    if (op.kind == ReadOpKind::VLA && rows != ops[i - 1].rootBranchEntries) {
+      throw runtime_error_f("Mismatched number of rows for branch %s. Expected %lli, found %lli", op.branch->GetName(), rows, ops[i - 1].rootBranchEntries);
+    }
+  }
+
   auto batch = arrow::RecordBatch::Make(dataset_schema, rows, columns);
   totalCompressedSize += tree->GetZipBytes();
   totalUncompressedSize += tree->GetTotBytes();
@@ -843,11 +1095,31 @@ class TTreeFileWriter : public arrow::dataset::FileWriter
       switch (field->type()->id()) {
         case arrow::Type::FIXED_SIZE_LIST: {
           auto list = std::static_pointer_cast<arrow::FixedSizeListArray>(column);
-          valueArrays.back() = list->values();
+          if (list->list_type()->field(0)->type()->id() == arrow::Type::BOOL) {
+            int64_t length = list->length() * list->list_type()->list_size();
+            arrow::UInt8Builder builder;
+            auto ok = builder.Reserve(length);
+            // I need to build an array of uint8_t for the conversion to ROOT which uses
+            // bytes for booleans.
+            auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(list->values());
+            for (int64_t i = 0; i < length; ++i) {
+              if (boolArray->IsValid(i)) {
+                // Expand each boolean value (true/false) to uint8 (1/0)
+                uint8_t value = boolArray->Value(i) ?
@@ -843,11 +1095,31 @@ class TTreeFileWriter : public arrow::dataset::FileWriter
       switch (field->type()->id()) {
         case arrow::Type::FIXED_SIZE_LIST: {
           auto list = std::static_pointer_cast<arrow::FixedSizeListArray>(column);
-          valueArrays.back() = list->values();
+          if (list->list_type()->field(0)->type()->id() == arrow::Type::BOOL) {
+            int64_t length = list->length() * list->list_type()->list_size();
+            arrow::UInt8Builder builder;
+            auto ok = builder.Reserve(length);
+            // We need to build an array of uint8_t for the conversion to ROOT, which uses
+            // bytes for booleans.
+            auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(list->values());
+            for (int64_t i = 0; i < length; ++i) {
+              if (boolArray->IsValid(i)) {
+                // Expand each boolean value (true/false) to uint8 (1/0)
+                uint8_t value = boolArray->Value(i) ? 1 : 0;
+                auto ok = builder.Append(value);
+              } else {
+                // Append null for invalid entries
+                auto ok = builder.AppendNull();
+              }
+            }
+            valueArrays.back() = *builder.Finish();
+          } else {
+            valueArrays.back() = list->values();
+          }
         } break;
         case arrow::Type::LIST: {
           auto list = std::static_pointer_cast<arrow::ListArray>(column);
-          valueArrays.back() = list;
+          valueArrays.back() = list->values();
         } break;
         case arrow::Type::BOOL: {
           // In case of arrays of booleans, we need to go back to their
@@ -893,11 +1165,12 @@ class TTreeFileWriter : public arrow::dataset::FileWriter
           uint8_t const* buffer = std::static_pointer_cast<arrow::PrimitiveArray>(valueArray)->values()->data() + array->offset() + list->value_offset(pos) * valueType->byte_width();
           branch->SetAddress((void*)buffer);
           sizeBranch->SetAddress(&listSize);
-        };
-        break;
+        } break;
         case arrow::Type::FIXED_SIZE_LIST:
         default: {
-          uint8_t const* buffer = std::static_pointer_cast(valueArray)->values()->data() + array->offset() + pos * listSize * valueType->byte_width();
+          // Needed for the boolean case, where byte_width() is zero. We should probably cache this.
+          auto byteWidth = valueType->byte_width() ? valueType->byte_width() : 1;
+          uint8_t const* buffer = std::static_pointer_cast<arrow::PrimitiveArray>(valueArray)->values()->data() + array->offset() + pos * listSize * byteWidth;
           branch->SetAddress((void*)buffer);
         };
       }
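The special-casing exists because Arrow bit-packs booleans while ROOT's 'O' leaves store one byte per value, so boolean columns must be widened before a TBranch can consume them. A hypothetical expandBools helper showing just the conversion, using standard Arrow builder APIs:

    #include <arrow/api.h>
    #include <memory>

    // Widen a bit-packed Arrow boolean array to one uint8_t per value (1/0),
    // which is the in-memory layout ROOT expects for bool branches.
    std::shared_ptr<arrow::Array> expandBools(const std::shared_ptr<arrow::BooleanArray>& bits)
    {
      arrow::UInt8Builder builder;
      if (!builder.Reserve(bits->length()).ok()) {
        return nullptr;
      }
      for (int64_t i = 0; i < bits->length(); ++i) {
        if (bits->IsValid(i)) {
          builder.UnsafeAppend(bits->Value(i) ? 1 : 0);
        } else {
          builder.UnsafeAppendNull();
        }
      }
      std::shared_ptr<arrow::Array> out;
      if (!builder.Finish(&out).ok()) {
        return nullptr;
      }
      return out;
    }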
diff --git a/Framework/Core/include/Framework/RootArrowFilesystem.h b/Framework/Core/include/Framework/RootArrowFilesystem.h
index 6f331ddbbca4e..5aceaed077001 100644
--- a/Framework/Core/include/Framework/RootArrowFilesystem.h
+++ b/Framework/Core/include/Framework/RootArrowFilesystem.h
@@ -12,6 +12,7 @@
 #define O2_FRAMEWORK_ROOT_ARROW_FILESYSTEM_H_

 #include
+#include
 #include
 #include
 #include
@@ -96,6 +97,9 @@ class VirtualRootFileSystemBase : public arrow::fs::FileSystem
 struct RootArrowFactory final {
   std::function()> options = nullptr;
   std::function()> format = nullptr;
+  // Builds an output streamer which is able to read from the source fragment
+  // in a deferred way.
+  std::function(std::shared_ptr, const std::shared_ptr& buffer)> deferredOutputStreamer = nullptr;
 };

 struct RootArrowFactoryPlugin {
diff --git a/Framework/Core/include/Framework/TableTreeHelpers.h b/Framework/Core/include/Framework/TableTreeHelpers.h
index c6a769e579fb7..92725d186ee33 100644
--- a/Framework/Core/include/Framework/TableTreeHelpers.h
+++ b/Framework/Core/include/Framework/TableTreeHelpers.h
@@ -11,6 +11,8 @@
 #ifndef O2_FRAMEWORK_TABLETREEHELPERS_H_
 #define O2_FRAMEWORK_TABLETREEHELPERS_H_

+#include
+#include
 #include
 #include "TFile.h"
 #include "TTreeReader.h"
@@ -146,15 +148,25 @@ class TreeToTable
 class FragmentToBatch
 {
  public:
-  FragmentToBatch(arrow::MemoryPool* pool = arrow::default_memory_pool());
+  // The function to be used to create the required stream.
+  using StreamerCreator = std::function(std::shared_ptr, const std::shared_ptr& buffer)>;
+
+  FragmentToBatch(StreamerCreator, std::shared_ptr, arrow::MemoryPool* pool = arrow::default_memory_pool());
   void setLabel(const char* label);
-  void fill(std::shared_ptr, std::shared_ptr dataSetSchema, std::shared_ptr);
+  void fill(std::shared_ptr dataSetSchema, std::shared_ptr);
   std::shared_ptr finalize();

+  std::shared_ptr streamer(std::shared_ptr buffer)
+  {
+    return mCreator(mFragment, buffer);
+  }
+
  private:
+  std::shared_ptr mFragment;
   arrow::MemoryPool* mArrowMemoryPool = nullptr;
   std::string mTableLabel;
   std::shared_ptr mRecordBatch;
+  StreamerCreator mCreator;
 };

 // -----------------------------------------------------------------------------
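The template arguments of StreamerCreator and deferredOutputStreamer did not survive the rendering of this patch. Judging from how streamer() and DataAllocator::adopt() use them, the alias presumably has a shape like the sketch below; treat the exact types as an assumption, not the verbatim declaration:

    #include <arrow/buffer.h>
    #include <arrow/dataset/file_base.h>
    #include <arrow/io/interfaces.h>
    #include <functional>
    #include <memory>

    // Presumed shape (assumption): given the fragment to stream and the
    // target buffer, produce the output stream that defers the actual read.
    using StreamerCreator =
      std::function<std::shared_ptr<arrow::io::OutputStream>(
        std::shared_ptr<arrow::dataset::FileFragment>,
        const std::shared_ptr<arrow::ResizableBuffer>&)>;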
diff --git a/Framework/Core/src/DataAllocator.cxx b/Framework/Core/src/DataAllocator.cxx
index c310892c4c490..b735eee1f3308 100644
--- a/Framework/Core/src/DataAllocator.cxx
+++ b/Framework/Core/src/DataAllocator.cxx
@@ -211,34 +211,6 @@ void doWriteTable(std::shared_ptr b, arrow::Table* table)
   }
 }

-void doWriteBatch(std::shared_ptr b, arrow::RecordBatch* batch)
-{
-  auto mock = std::make_shared();
-  int64_t expectedSize = 0;
-  auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), batch->schema());
-  arrow::Status outStatus = mockWriter.ValueOrDie()->WriteRecordBatch(*batch);
-
-  expectedSize = mock->Tell().ValueOrDie();
-  auto reserve = b->Reserve(expectedSize);
-  if (reserve.ok() == false) {
-    throw std::runtime_error("Unable to reserve memory for table");
-  }
-
-  auto stream = std::make_shared(b);
-  // This is a copy maybe we can finally get rid of it by having using the
-  // dataset API?
-  auto outBatch = arrow::ipc::MakeStreamWriter(stream.get(), batch->schema());
-  if (outBatch.ok() == false) {
-    throw ::std::runtime_error("Unable to create batch writer");
-  }
-
-  outStatus = outBatch.ValueOrDie()->WriteRecordBatch(*batch);
-
-  if (outStatus.ok() == false) {
-    throw std::runtime_error("Unable to Write batch");
-  }
-}
-
 void DataAllocator::adopt(const Output& spec, LifetimeHolder& tb)
 {
   auto& timingInfo = mRegistry.get();
@@ -318,16 +290,35 @@ void DataAllocator::adopt(const Output& spec, LifetimeHolder& f
     // Serialization happens in here, so that we can
     // get rid of the intermediate tree 2 table object, saving memory.
     auto batch = source.finalize();
-    doWriteBatch(buffer, batch.get());
+    auto mock = std::make_shared<arrow::io::MockOutputStream>();
+    int64_t expectedSize = 0;
+    auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), batch->schema());
+    arrow::Status outStatus = mockWriter.ValueOrDie()->WriteRecordBatch(*batch);
+
+    expectedSize = mock->Tell().ValueOrDie();
+    auto reserve = buffer->Reserve(expectedSize);
+    if (reserve.ok() == false) {
+      throw std::runtime_error("Unable to reserve memory for table");
+    }
+
+    auto deferredWriterStream = source.streamer(buffer);
+
+    auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream, batch->schema());
+    if (outBatch.ok() == false) {
+      throw ::std::runtime_error("Unable to create batch writer");
+    }
+
+    outStatus = outBatch.ValueOrDie()->WriteRecordBatch(*batch);
+
+    if (outStatus.ok() == false) {
+      throw std::runtime_error("Unable to write batch");
+    }
     // deletion happens in the caller
   };
-  /// To finalise this we write the table to the buffer.
-  /// FIXME: most likely not a great idea. We should probably write to the buffer
-  /// directly in the TableBuilder, incrementally.
   auto finalizer = [](std::shared_ptr b) -> void {
     // This is empty because we already serialised the object when
-    // the LifetimeHolder goes out of scope.
+    // the LifetimeHolder goes out of scope. See code above.
   };

   context.addBuffer(std::move(header), buffer, std::move(finalizer), routeIndex);
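adopt() sizes the shared-memory message with a dry run: it serialises the batch once into a MockOutputStream, which only counts bytes, reserves exactly that much, and then serialises again through the deferred stream. The probing step in isolation (illustrative ipcStreamSize name, standard Arrow APIs):

    #include <arrow/io/memory.h>
    #include <arrow/ipc/writer.h>
    #include <arrow/record_batch.h>
    #include <memory>

    // Count the bytes an IPC-stream serialisation of `batch` would produce,
    // without actually storing them anywhere.
    int64_t ipcStreamSize(const std::shared_ptr<arrow::RecordBatch>& batch)
    {
      auto mock = std::make_shared<arrow::io::MockOutputStream>();
      auto writer = arrow::ipc::MakeStreamWriter(mock, batch->schema()).ValueOrDie();
      (void)writer->WriteRecordBatch(*batch);
      // Measured before Close(), i.e. without the end-of-stream marker,
      // mirroring how adopt() probes the size above.
      return mock->Tell().ValueOrDie();
    }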
diff --git a/Framework/Core/src/TableTreeHelpers.cxx b/Framework/Core/src/TableTreeHelpers.cxx
index 2f23c07aea451..84d4ff171bc39 100644
--- a/Framework/Core/src/TableTreeHelpers.cxx
+++ b/Framework/Core/src/TableTreeHelpers.cxx
@@ -13,7 +13,6 @@
 #include "Framework/Endian.h"
 #include "Framework/Signpost.h"

-#include "arrow/type_traits.h"
 #include
 #include
 #include
@@ -533,7 +532,7 @@ void TreeToTable::setLabel(const char* label)
   mTableLabel = label;
 }

-void TreeToTable::fill(TTree*tree)
+void TreeToTable::fill(TTree* tree)
 {
   std::vector> columns;
   std::vector> fields;
@@ -569,8 +568,10 @@ std::shared_ptr TreeToTable::finalize()
   return mTable;
 }

-FragmentToBatch::FragmentToBatch(arrow::MemoryPool* pool)
-  : mArrowMemoryPool{pool}
+FragmentToBatch::FragmentToBatch(StreamerCreator creator, std::shared_ptr fragment, arrow::MemoryPool* pool)
+  : mFragment{std::move(fragment)},
+    mArrowMemoryPool{pool},
+    mCreator{std::move(creator)}
 {
 }

@@ -579,13 +580,14 @@ void FragmentToBatch::setLabel(const char* label)
   mTableLabel = label;
 }

-void FragmentToBatch::fill(std::shared_ptr fragment, std::shared_ptr schema, std::shared_ptr format)
+void FragmentToBatch::fill(std::shared_ptr schema, std::shared_ptr format)
 {
   auto options = std::make_shared<arrow::dataset::ScanOptions>();
   options->dataset_schema = schema;
-  auto scanner = format->ScanBatchesAsync(options, fragment);
+  auto scanner = format->ScanBatchesAsync(options, mFragment);
   auto batch = (*scanner)();
   mRecordBatch = *batch.result();
+  // Note that up to this point the buffer has not been filled yet.
 }

 std::shared_ptr FragmentToBatch::finalize()
diff --git a/Framework/Core/test/test_Root2ArrowTable.cxx b/Framework/Core/test/test_Root2ArrowTable.cxx
index 438f388ec86b5..f87c3014eb0f4 100644
--- a/Framework/Core/test/test_Root2ArrowTable.cxx
+++ b/Framework/Core/test/test_Root2ArrowTable.cxx
@@ -38,6 +38,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -388,6 +389,7 @@ bool validatePhysicalSchema(std::shared_ptr schema)
 {
   REQUIRE(schema->num_fields() == 12);
   REQUIRE(schema->field(0)->type()->id() == arrow::float32()->id());
+  REQUIRE(schema->field(0)->name() == "px");
   REQUIRE(schema->field(1)->type()->id() == arrow::float32()->id());
   REQUIRE(schema->field(2)->type()->id() == arrow::float32()->id());
   REQUIRE(schema->field(3)->type()->id() == arrow::float64()->id());
@@ -541,12 +543,28 @@ TEST_CASE("RootTree2Dataset")
   options->dataset_schema = schema;
   auto scanner = format->ScanBatchesAsync(options, *fragment);
   REQUIRE(scanner.ok());
+
+  // This batch has deferred contents. Therefore we need to use a DeferredOutputStream to
+  // write it to a real one and read it back with the BufferReader, which is hopefully zero-copy.
+  std::shared_ptr<arrow::RecordBatch> batch;
+
   auto batches = (*scanner)();
   auto result = batches.result();
   REQUIRE(result.ok());
   REQUIRE((*result)->columns().size() == 11);
   REQUIRE((*result)->num_rows() == 100);
-  validateContents(*result);
+  std::shared_ptr<arrow::ResizableBuffer> buffer = *arrow::AllocateResizableBuffer(1000, 64);
+  auto deferredWriterStream = factory.capabilities[1].factory().deferredOutputStreamer(*fragment, buffer);
+  auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream.get(), schema);
+  auto status = outBatch.ValueOrDie()->WriteRecordBatch(**result);
+  std::shared_ptr<arrow::io::BufferReader> bufferReader = std::make_shared<arrow::io::BufferReader>(buffer);
+  auto readerResult = arrow::ipc::RecordBatchStreamReader::Open(bufferReader);
+  auto batchReader = readerResult.ValueOrDie();
+
+  auto next = batchReader->ReadNext(&batch);
+  REQUIRE(batch != nullptr);
+
+  validateContents(batch);

   auto* output = new TMemFile("foo", "RECREATE");
   auto outFs = std::make_shared<TFileFileSystem>(output, 0, factory);
@@ -558,7 +576,8 @@ TEST_CASE("RootTree2Dataset")
   // Write to the /DF_3 tree at top level
   arrow::fs::FileLocator locator{outFs, "/DF_3"};
   auto writer = format->MakeWriter(*destination, schema, {}, locator);
-  auto success = writer->get()->Write(*result);
+  auto success = writer->get()->Write(batch);
+  REQUIRE(batch->schema()->field(0)->name() == "px");
   auto rootDestination = std::dynamic_pointer_cast(*destination);

   SECTION("Read tree")
@@ -568,7 +587,11 @@ TEST_CASE("RootTree2Dataset")
     auto tfileFs = std::dynamic_pointer_cast<TFileFileSystem>(outFs);
     REQUIRE(tfileFs.get());
     REQUIRE(tfileFs->GetFile());
-    REQUIRE(tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree")));
+    auto* tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"));
+    REQUIRE(tree != nullptr);
+    REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100);
+    REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetName() == std::string("px"));
+
     arrow::dataset::FileSource source2("/DF_3", outFs);

     REQUIRE(format->IsSupported(source2) == true);
@@ -577,6 +600,10 @@ TEST_CASE("RootTree2Dataset")
     REQUIRE(tfileFs->GetFile());
     REQUIRE(tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree")));

+    tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"));
+    REQUIRE(tree != nullptr);
+    REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100);
+
     auto schemaOptWritten = format->Inspect(source2);
     tfileFs = std::dynamic_pointer_cast<TFileFileSystem>(source2.filesystem());
     REQUIRE(tfileFs.get());
@@ -585,6 +612,10 @@ TEST_CASE("RootTree2Dataset")
     REQUIRE(schemaOptWritten.ok());
     auto schemaWritten = *schemaOptWritten;

+    tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"));
+    REQUIRE(tree != nullptr);
+    REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100);
+
     REQUIRE(validatePhysicalSchema(schemaWritten));
     std::vector<std::shared_ptr<arrow::Field>> fields;
     for (auto& field : schemaWritten->fields()) {
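What the next hunk exercises is essentially an IPC round trip: serialise the batch into a buffer through a stream writer, then read it back with a BufferReader. The same round trip with a plain BufferOutputStream instead of the deferred streamer (a sketch; roundTrip is a hypothetical name, not test code):

    #include <arrow/api.h>
    #include <arrow/io/memory.h>
    #include <arrow/ipc/reader.h>
    #include <arrow/ipc/writer.h>
    #include <memory>

    // Write `in` to an in-memory buffer and read it back again.
    std::shared_ptr<arrow::RecordBatch> roundTrip(const std::shared_ptr<arrow::RecordBatch>& in)
    {
      auto sink = arrow::io::BufferOutputStream::Create().ValueOrDie();
      auto writer = arrow::ipc::MakeStreamWriter(sink, in->schema()).ValueOrDie();
      (void)writer->WriteRecordBatch(*in);
      (void)writer->Close();
      auto buffer = sink->Finish().ValueOrDie();

      auto reader = arrow::ipc::RecordBatchStreamReader::Open(
                      std::make_shared<arrow::io::BufferReader>(buffer))
                      .ValueOrDie();
      std::shared_ptr<arrow::RecordBatch> out;
      (void)reader->ReadNext(&out);
      return out; // equal in content to `in`
    }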
@@ -599,23 +630,38 @@ TEST_CASE("RootTree2Dataset")
     auto fragmentWritten = format->MakeFragment(source2, {}, *physicalSchema);
     REQUIRE(fragmentWritten.ok());
     auto optionsWritten = std::make_shared<arrow::dataset::ScanOptions>();
-    options->dataset_schema = schema;
-    auto scannerWritten = format->ScanBatchesAsync(optionsWritten, *fragment);
+    optionsWritten->dataset_schema = schema;
+    auto scannerWritten = format->ScanBatchesAsync(optionsWritten, *fragmentWritten);
     REQUIRE(scannerWritten.ok());
-    auto batchesWritten = (*scanner)();
-    auto resultWritten = batches.result();
+
+    tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"));
+    REQUIRE(tree != nullptr);
+    REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100);
+
+    auto batchesWritten = (*scannerWritten)();
+    auto resultWritten = batchesWritten.result();
     REQUIRE(resultWritten.ok());
     REQUIRE((*resultWritten)->columns().size() == 11);
     REQUIRE((*resultWritten)->num_rows() == 100);
-    validateContents(*resultWritten);
+
+    std::shared_ptr<arrow::ResizableBuffer> buffer = *arrow::AllocateResizableBuffer(1000, 64);
+    auto deferredWriterStream = factory.capabilities[1].factory().deferredOutputStreamer(*fragmentWritten, buffer);
+    auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream.get(), schema);
+    auto status = outBatch.ValueOrDie()->WriteRecordBatch(**resultWritten);
+    std::shared_ptr<arrow::io::BufferReader> bufferReader = std::make_shared<arrow::io::BufferReader>(buffer);
+    auto readerResult = arrow::ipc::RecordBatchStreamReader::Open(bufferReader);
+    auto batchReader = readerResult.ValueOrDie();
+
+    auto next = batchReader->ReadNext(&batch);
+    REQUIRE(batch != nullptr);
+    validateContents(batch);
   }
+
   arrow::fs::FileLocator rnTupleLocator{outFs, "/rntuple"};
   // We write an RNTuple in the same TMemFile, using /rntuple as a location
   auto rntupleDestination = std::dynamic_pointer_cast(*destination);
   {
     auto rNtupleWriter = rNtupleFormat->MakeWriter(*destination, schema, {}, rnTupleLocator);
-    auto rNtupleSuccess = rNtupleWriter->get()->Write(*result);
+    auto rNtupleSuccess = rNtupleWriter->get()->Write(batch);
     REQUIRE(rNtupleSuccess.ok());
   }