diff --git a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx index 9c19de85739ce..f8a9705e4eb62 100644 --- a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx +++ b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx @@ -10,10 +10,12 @@ // or submit itself to any jurisdiction. #include "AODJAlienReaderHelpers.h" +#include #include "Framework/TableTreeHelpers.h" #include "Framework/AnalysisHelpers.h" #include "Framework/DataProcessingStats.h" #include "Framework/RootTableBuilderHelpers.h" +#include "Framework/RootArrowFilesystem.h" #include "Framework/AlgorithmSpec.h" #include "Framework/ConfigParamRegistry.h" #include "Framework/ControlService.h" @@ -41,6 +43,8 @@ #include #include #include +#include +#include using namespace o2; using namespace o2::aod; @@ -272,11 +276,13 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const // Origin file name for derived output map auto o2 = Output(TFFileNameHeader); auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf); - std::string currentFilename(fileAndFolder.file->GetName()); - if (strcmp(fileAndFolder.file->GetEndpointUrl()->GetProtocol(), "file") == 0 && fileAndFolder.file->GetEndpointUrl()->GetFile()[0] != '/') { + auto rootFS = std::dynamic_pointer_cast(fileAndFolder.filesystem()); + auto* f = dynamic_cast(rootFS->GetFile()); + std::string currentFilename(f->GetFile()->GetName()); + if (strcmp(f->GetEndpointUrl()->GetProtocol(), "file") == 0 && f->GetEndpointUrl()->GetFile()[0] != '/') { // This is not an absolute local path. Make it absolute. 
static std::string pwd = gSystem->pwd() + std::string("/"); - currentFilename = pwd + std::string(fileAndFolder.file->GetName()); + currentFilename = pwd + std::string(f->GetName()); } outputs.make(o2) = currentFilename; } @@ -312,7 +318,9 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const auto concrete = DataSpecUtils::asConcreteDataMatcher(firstRoute.matcher); auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec); auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf); - if (!fileAndFolder.file) { + + // In case the filesource is empty, move to the next one. + if (fileAndFolder.path().empty()) { fcnt += 1; ntf = 0; if (didir->atEnd(fcnt)) { diff --git a/Framework/AnalysisSupport/src/DataInputDirector.cxx b/Framework/AnalysisSupport/src/DataInputDirector.cxx index 172ecd66c0e64..1daab029b3e8e 100644 --- a/Framework/AnalysisSupport/src/DataInputDirector.cxx +++ b/Framework/AnalysisSupport/src/DataInputDirector.cxx @@ -11,6 +11,8 @@ #include "DataInputDirector.h" #include "Framework/DataDescriptorQueryBuilder.h" #include "Framework/Logger.h" +#include "Framework/PluginManager.h" +#include "Framework/RootArrowFilesystem.h" #include "Framework/AnalysisDataModelHelpers.h" #include "Framework/Output.h" #include "Headers/DataHeader.h" @@ -26,8 +28,12 @@ #include "TGrid.h" #include "TObjString.h" #include "TMap.h" +#include "TFile.h" +#include +#include #include +#include #if __has_include() #include @@ -47,12 +53,27 @@ FileNameHolder* makeFileNameHolder(std::string fileName) return fileNameHolder; } -DataInputDescriptor::DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) : mAlienSupport(alienSupport), - mMonitoring(monitoring), - mAllowedParentLevel(allowedParentLevel), - mParentFileReplacement(std::move(parentFileReplacement)), - mLevel(level) +DataInputDescriptor::DataInputDescriptor(bool alienSupport, int 
level, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) + : mAlienSupport(alienSupport), + mMonitoring(monitoring), + mAllowedParentLevel(allowedParentLevel), + mParentFileReplacement(std::move(parentFileReplacement)), + mLevel(level) { + std::vector capabilitiesSpecs = { + "O2Framework:RNTupleObjectReadingCapability", + "O2Framework:TTreeObjectReadingCapability", + }; + + std::vector plugins; + for (auto spec : capabilitiesSpecs) { + auto morePlugins = PluginManager::parsePluginSpecString(spec); + for (auto& extra : morePlugins) { + plugins.push_back(extra); + } + } + + PluginManager::loadFromPlugin(plugins, mFactory.capabilities); } void DataInputDescriptor::printOut() @@ -108,20 +129,22 @@ bool DataInputDescriptor::setFile(int counter) // open file auto filename = mfilenames[counter]->fileName; - if (mcurrentFile) { - if (mcurrentFile->GetName() == filename) { + auto rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); + if (rootFS.get()) { + if (rootFS->GetFile()->GetName() == filename) { return true; } closeInputFile(); } - mcurrentFile = TFile::Open(filename.c_str()); - if (!mcurrentFile) { + + mCurrentFilesystem = std::make_shared(TFile::Open(filename.c_str()), 50 * 1024 * 1024, mFactory); + if (!mCurrentFilesystem.get()) { throw std::runtime_error(fmt::format("Couldn't open file \"{}\"!", filename)); } - mcurrentFile->SetReadaheadSize(50 * 1024 * 1024); + rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); // get the parent file map if exists - mParentFileMap = (TMap*)mcurrentFile->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path) + mParentFileMap = (TMap*)rootFS->GetFile()->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path) if (mParentFileMap && !mParentFileReplacement.empty()) { auto pos = mParentFileReplacement.find(';'); if (pos == std::string::npos) { @@ -140,16 +163,28 @@ bool DataInputDescriptor::setFile(int counter) // get the directory names 
if (mfilenames[counter]->numberOfTimeFrames <= 0) { - std::regex TFRegex = std::regex("DF_[0-9]+"); - TList* keyList = mcurrentFile->GetListOfKeys(); + const std::regex TFRegex = std::regex("/?DF_([0-9]+)(|-.*)$"); + TList* keyList = rootFS->GetFile()->GetListOfKeys(); + std::vector finalList; // extract TF numbers and sort accordingly + // We use an extra seen set to make sure we preserve the order in which + // we instert things in the final list and to make sure we do not have duplicates. + // Multiple folder numbers can happen if we use a flat structure /DF_- + std::unordered_set seen; for (auto key : *keyList) { - if (std::regex_match(((TObjString*)key)->GetString().Data(), TFRegex)) { - auto folderNumber = std::stoul(std::string(((TObjString*)key)->GetString().Data()).substr(3)); - mfilenames[counter]->listOfTimeFrameNumbers.emplace_back(folderNumber); + std::smatch matchResult; + std::string keyName = ((TObjString*)key)->GetString().Data(); + bool match = std::regex_match(keyName, matchResult, TFRegex); + if (match) { + auto folderNumber = std::stoul(matchResult[1].str()); + if (seen.find(folderNumber) == seen.end()) { + seen.insert(folderNumber); + mfilenames[counter]->listOfTimeFrameNumbers.emplace_back(folderNumber); + } } } + if (mParentFileMap != nullptr) { // If we have a parent map, we should not process in DF alphabetical order but according to parent file to avoid swapping between files std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end(), @@ -162,12 +197,8 @@ bool DataInputDescriptor::setFile(int counter) std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end()); } - for (auto folderNumber : mfilenames[counter]->listOfTimeFrameNumbers) { - auto folderName = "DF_" + std::to_string(folderNumber); - mfilenames[counter]->listOfTimeFrameKeys.emplace_back(folderName); - mfilenames[counter]->alreadyRead.emplace_back(false); - } - 
mfilenames[counter]->numberOfTimeFrames = mfilenames[counter]->listOfTimeFrameKeys.size(); + mfilenames[counter]->alreadyRead.resize(mfilenames[counter]->alreadyRead.size() + mfilenames[counter]->listOfTimeFrameNumbers.size(), false); + mfilenames[counter]->numberOfTimeFrames = mfilenames[counter]->listOfTimeFrameNumbers.size(); } mCurrentFileID = counter; @@ -193,26 +224,21 @@ uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF) return (mfilenames[counter]->listOfTimeFrameNumbers)[numTF]; } -FileAndFolder DataInputDescriptor::getFileFolder(int counter, int numTF) +arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF) { - FileAndFolder fileAndFolder; - // open file if (!setFile(counter)) { - return fileAndFolder; + return {}; } // no TF left if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) { - return fileAndFolder; + return {}; } - fileAndFolder.file = mcurrentFile; - fileAndFolder.folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF]; - mfilenames[counter]->alreadyRead[numTF] = true; - return fileAndFolder; + return {fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]), mCurrentFilesystem}; } DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename) @@ -221,17 +247,19 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, // This file has no parent map return nullptr; } - auto folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF]; + auto folderName = fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]); auto parentFileName = (TObjString*)mParentFileMap->GetValue(folderName.c_str()); + // The current DF is not found in the parent map (this should not happen and is a fatal error) + auto rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); if (!parentFileName) { - // The current DF is not found in the parent map (this should not 
happen and is a fatal error) - throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), mcurrentFile->GetName())); + throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), rootFS->GetFile()->GetName())); return nullptr; } if (mParentFile) { // Is this still the corresponding to the correct file? - if (parentFileName->GetString().CompareTo(mParentFile->mcurrentFile->GetName()) == 0) { + auto parentRootFS = std::dynamic_pointer_cast(mParentFile->mCurrentFilesystem); + if (parentFileName->GetString().CompareTo(parentRootFS->GetFile()->GetName()) == 0) { return mParentFile; } else { mParentFile->closeInputFile(); @@ -241,7 +269,8 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, } if (mLevel == mAllowedParentLevel) { - throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(), mcurrentFile->GetName())); + throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(), + rootFS->GetFile()->GetName())); } LOGP(info, "Opening parent file {} for DF {}", parentFileName->GetString().Data(), folderName.c_str()); @@ -270,11 +299,13 @@ void DataInputDescriptor::printFileStatistics() if (wait_time < 0) { wait_time = 0; } - std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", mcurrentFile->GetName(), - mcurrentFile->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), 
mcurrentFile->GetBytesRead(), mcurrentFile->GetReadCalls(), + auto rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); + auto f = dynamic_cast(rootFS->GetFile()); + std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", f->GetName(), + f->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), f->GetBytesRead(), f->GetReadCalls(), ((float)mIOTime / 1e9), ((float)wait_time / 1e9), mLevel)); #if __has_include() - auto alienFile = dynamic_cast(mcurrentFile); + auto alienFile = dynamic_cast(f); if (alienFile) { monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed()); } @@ -285,7 +316,7 @@ void DataInputDescriptor::printFileStatistics() void DataInputDescriptor::closeInputFile() { - if (mcurrentFile) { + if (mCurrentFilesystem.get()) { if (mParentFile) { mParentFile->closeInputFile(); delete mParentFile; @@ -296,9 +327,7 @@ void DataInputDescriptor::closeInputFile() mParentFileMap = nullptr; printFileStatistics(); - mcurrentFile->Close(); - delete mcurrentFile; - mcurrentFile = nullptr; + mCurrentFilesystem.reset(); } } @@ -346,8 +375,8 @@ int DataInputDescriptor::fillInputfiles() int DataInputDescriptor::findDFNumber(int file, std::string dfName) { - auto dfList = mfilenames[file]->listOfTimeFrameKeys; - auto it = std::find(dfList.begin(), dfList.end(), dfName); + auto dfList = mfilenames[file]->listOfTimeFrameNumbers; + auto it = std::find_if(dfList.begin(), dfList.end(), [dfName](size_t i) { return fmt::format("DF_{}", i) == dfName; }); if (it == dfList.end()) { return -1; } @@ -358,40 +387,76 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh { auto ioStart = uv_hrtime(); - auto fileAndFolder = getFileFolder(counter, numTF); - if (!fileAndFolder.file) { + auto folder = getFileFolder(counter, numTF); + if (!folder.filesystem()) { return false; } - auto fullpath 
= fileAndFolder.folderName + "/" + treename; - auto tree = (TTree*)fileAndFolder.file->Get(fullpath.c_str()); + auto rootFS = std::dynamic_pointer_cast(folder.filesystem()); + + if (!rootFS) { + throw std::runtime_error(fmt::format(R"(Not a TFile filesystem!)")); + } + // FIXME: Ugly. We should detect the format from the treename, good enough for now. + std::shared_ptr format; + FragmentToBatch::StreamerCreator creator = nullptr; + + auto fullpath = arrow::dataset::FileSource{folder.path() + "/" + treename, folder.filesystem()}; + + for (auto& capability : mFactory.capabilities) { + auto objectPath = capability.lfn2objectPath(fullpath.path()); + void* handle = capability.getHandle(rootFS, objectPath); + if (handle) { + format = capability.factory().format(); + creator = capability.factory().deferredOutputStreamer; + break; + } + } + + if (!format) { + throw std::runtime_error(fmt::format(R"(Cannot find a viable format for object {}!)", fullpath.path())); + } + + auto schemaOpt = format->Inspect(fullpath); + auto physicalSchema = schemaOpt; + std::vector> fields; + for (auto& original : (*schemaOpt)->fields()) { + if (original->name().ends_with("_size")) { + continue; + } + fields.push_back(original); + } + auto datasetSchema = std::make_shared(fields); + + auto fragment = format->MakeFragment(fullpath, {}, *physicalSchema); - if (!tree) { - LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.c_str()); + if (!fragment.ok()) { + LOGP(debug, "Could not find tree {}. 
Trying in parent file.", fullpath.path()); auto parentFile = getParentFile(counter, numTF, treename); if (parentFile != nullptr) { - int parentNumTF = parentFile->findDFNumber(0, fileAndFolder.folderName); + int parentNumTF = parentFile->findDFNumber(0, folder.path()); if (parentNumTF == -1) { - throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", fileAndFolder.folderName, parentFile->mcurrentFile->GetName())); + auto parentRootFS = std::dynamic_pointer_cast(parentFile->mCurrentFilesystem); + throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", folder.path(), parentRootFS->GetFile()->GetName())); } // first argument is 0 as the parent file object contains only 1 file return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed); } - throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fileAndFolder.folderName + "/" + treename, fileAndFolder.file->GetName())); + auto rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); + throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fullpath.path(), rootFS->GetFile()->GetName())); } // create table output auto o = Output(dh); - auto t2t = outputs.make(o); - // add branches to read - // fill the table - t2t->setLabel(tree->GetName()); - totalSizeCompressed += tree->GetZipBytes(); - totalSizeUncompressed += tree->GetTotBytes(); - t2t->addAllColumns(tree); - t2t->fill(tree); - delete tree; + // FIXME: This should allow me to create a memory pool + // which I can then use to scan the dataset. 
+ auto f2b = outputs.make(o, creator, *fragment); + + //// add branches to read + //// fill the table + f2b->setLabel(treename.c_str()); + f2b->fill(datasetSchema, format); mIOTime += (uv_hrtime() - ioStart); @@ -693,7 +758,7 @@ DataInputDescriptor* DataInputDirector::getDataInputDescriptor(header::DataHeade return result; } -FileAndFolder DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF) +arrow::dataset::FileSource DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF) { auto didesc = getDataInputDescriptor(dh); // if NOT match then use defaultDataInputDescriptor diff --git a/Framework/AnalysisSupport/src/DataInputDirector.h b/Framework/AnalysisSupport/src/DataInputDirector.h index eca0ef195d111..9bab29db3ff24 100644 --- a/Framework/AnalysisSupport/src/DataInputDirector.h +++ b/Framework/AnalysisSupport/src/DataInputDirector.h @@ -15,6 +15,10 @@ #include "Framework/DataDescriptorMatcher.h" #include "Framework/DataAllocator.h" +#include "Framework/RootArrowFilesystem.h" + +#include +#include #include #include "rapidjson/fwd.h" @@ -31,16 +35,10 @@ struct FileNameHolder { std::string fileName; int numberOfTimeFrames = 0; std::vector listOfTimeFrameNumbers; - std::vector listOfTimeFrameKeys; std::vector alreadyRead; }; FileNameHolder* makeFileNameHolder(std::string fileName); -struct FileAndFolder { - TFile* file = nullptr; - std::string folderName = ""; -}; - class DataInputDescriptor { /// Holds information concerning the reading of an aod table. 
@@ -52,7 +50,6 @@ class DataInputDescriptor std::string treename = ""; std::unique_ptr matcher; - DataInputDescriptor() = default; DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring = nullptr, int allowedParentLevel = 0, std::string parentFileReplacement = ""); void printOut(); @@ -78,7 +75,7 @@ class DataInputDescriptor int findDFNumber(int file, std::string dfName); uint64_t getTimeFrameNumber(int counter, int numTF); - FileAndFolder getFileFolder(int counter, int numTF); + arrow::dataset::FileSource getFileFolder(int counter, int numTF); DataInputDescriptor* getParentFile(int counter, int numTF, std::string treename); int getTimeFramesInFile(int counter); int getReadTimeFramesInFile(int counter); @@ -90,6 +87,7 @@ class DataInputDescriptor bool isAlienSupportOn() { return mAlienSupport; } private: + o2::framework::RootObjectReadingFactory mFactory; std::string minputfilesFile = ""; std::string* minputfilesFilePtr = nullptr; std::string mFilenameRegex = ""; @@ -98,7 +96,7 @@ class DataInputDescriptor std::string mParentFileReplacement; std::vector mfilenames; std::vector* mdefaultFilenamesPtr = nullptr; - TFile* mcurrentFile = nullptr; + std::shared_ptr mCurrentFilesystem; int mCurrentFileID = -1; bool mAlienSupport = false; @@ -127,7 +125,6 @@ class DataInputDirector ~DataInputDirector(); void reset(); - void createDefaultDataInputDescriptor(); void printOut(); bool atEnd(int counter); @@ -140,10 +137,11 @@ class DataInputDirector // getters DataInputDescriptor* getDataInputDescriptor(header::DataHeader dh); int getNumberInputDescriptors() { return mdataInputDescriptors.size(); } + void createDefaultDataInputDescriptor(); bool readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed); uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF); - FileAndFolder getFileFolder(header::DataHeader dh, int counter, int numTF); + 
arrow::dataset::FileSource getFileFolder(header::DataHeader dh, int counter, int numTF); int getTimeFramesInFile(header::DataHeader dh, int counter); uint64_t getTotalSizeCompressed(); diff --git a/Framework/AnalysisSupport/src/Plugin.cxx b/Framework/AnalysisSupport/src/Plugin.cxx index e3a39761e8049..033adc461c600 100644 --- a/Framework/AnalysisSupport/src/Plugin.cxx +++ b/Framework/AnalysisSupport/src/Plugin.cxx @@ -85,20 +85,48 @@ std::vector getListOfTables(std::unique_ptr& f) { std::vector r; TList* keyList = f->GetListOfKeys(); + // We should handle two cases, one where the list of tables in a TDirectory, + // the other one where the dataframe number is just a prefix + std::string first = ""; for (auto key : *keyList) { - if (!std::string_view(key->GetName()).starts_with("DF_")) { + if (!std::string_view(key->GetName()).starts_with("DF_") && !std::string_view(key->GetName()).starts_with("/DF_")) { continue; } - auto* d = (TDirectory*)f->Get(key->GetName()); - TList* branchList = d->GetListOfKeys(); - for (auto b : *branchList) { - r.emplace_back(b->GetName()); + auto* d = (TDirectory*)f->GetObjectChecked(key->GetName(), TClass::GetClass("TDirectory")); + // Objects are in a folder, list it. + if (d) { + TList* branchList = d->GetListOfKeys(); + for (auto b : *branchList) { + r.emplace_back(b->GetName()); + } + break; + } + + void* v = f->GetObjectChecked(key->GetName(), TClass::GetClass("ROOT::Experimental::RNTuple")); + if (v) { + std::string s = key->GetName(); + size_t pos = s.find('-'); + // Check if '-' is found + // Skip metaData and parentFiles + if (pos == std::string::npos) { + continue; + } + std::string t = s.substr(pos + 1); + // If we find a duplicate table name, it means we are in the next DF and we can stop. 
+ if (t == first) { + break; + } + if (first.empty()) { + first = t; + } + // Create a new string starting after the '-' + r.emplace_back(t); } - break; } return r; } + auto readMetadata(std::unique_ptr& currentFile) -> std::vector { // Get the metadata, if any diff --git a/Framework/AnalysisSupport/src/RNTuplePlugin.cxx b/Framework/AnalysisSupport/src/RNTuplePlugin.cxx index 51b585d0714bb..a910964e6527c 100644 --- a/Framework/AnalysisSupport/src/RNTuplePlugin.cxx +++ b/Framework/AnalysisSupport/src/RNTuplePlugin.cxx @@ -12,6 +12,7 @@ #include "Framework/RuntimeError.h" #include "Framework/RootArrowFilesystem.h" #include "Framework/Plugins.h" +#include "Framework/FairMQResizableBuffer.h" #include #include #include @@ -852,7 +853,10 @@ struct RNTupleObjectReadingImplementation : public RootArrowFactoryPlugin { return new RootArrowFactory{ .options = [context]() { return context->format->DefaultWriteOptions(); }, .format = [context]() { return context->format; }, - }; + .deferredOutputStreamer = [](std::shared_ptr fragment, const std::shared_ptr& buffer) -> std::shared_ptr { + auto treeFragment = std::dynamic_pointer_cast(fragment); + return std::make_shared(buffer); + }}; } }; diff --git a/Framework/AnalysisSupport/src/TTreePlugin.cxx b/Framework/AnalysisSupport/src/TTreePlugin.cxx index 4b130a2144253..5c51e47511cec 100644 --- a/Framework/AnalysisSupport/src/TTreePlugin.cxx +++ b/Framework/AnalysisSupport/src/TTreePlugin.cxx @@ -13,10 +13,15 @@ #include "Framework/Plugins.h" #include "Framework/Signpost.h" #include "Framework/Endian.h" +#include +#include +#include #include #include +#include #include #include +#include #include #include #include @@ -26,13 +31,269 @@ #include #include #include +#include +#include #include +#include +#include O2_DECLARE_DYNAMIC_LOG(root_arrow_fs); namespace o2::framework { +enum struct ReadOpKind { + Unknown, + Offsets, + Values, + Booleans, + VLA +}; + +struct ReadOps { + TBranch* branch = nullptr; + std::shared_ptr targetBuffer = 
nullptr; + int64_t rootBranchEntries = 0; + size_t typeSize = 0; + size_t listSize = 0; + // If this is an offset reading op, keep track of the actual + // range for the offsets, not only how many VLAs are there. + int64_t offsetCount = 0; + ReadOpKind kind = ReadOpKind::Unknown; +}; + +/// An OutputStream which does the reading of the input buffers directly +/// on writing, if needed. Each deferred operation is encoded in the source +/// buffer by an incremental number which can be used to lookup in the @a ops +/// vector the operation to perform. +class TTreeDeferredReadOutputStream : public arrow::io::OutputStream +{ + public: + explicit TTreeDeferredReadOutputStream(std::vector& ops, + const std::shared_ptr& buffer); + + /// \brief Create in-memory output stream with indicated capacity using a + /// memory pool + /// \param[in] initial_capacity the initial allocated internal capacity of + /// the OutputStream + /// \param[in,out] pool a MemoryPool to use for allocations + /// \return the created stream + static arrow::Result> Create( + std::vector& ops, + int64_t initial_capacity = 4096, + arrow::MemoryPool* pool = arrow::default_memory_pool()); + + // By the time we call the destructor, the contents + // of the buffer are already moved to fairmq + // for being sent. + ~TTreeDeferredReadOutputStream() override = default; + + // Implement the OutputStream interface + + /// Close the stream, preserving the buffer (retrieve it with Finish()). 
+ arrow::Status Close() override; + [[nodiscard]] bool closed() const override; + [[nodiscard]] arrow::Result Tell() const override; + arrow::Status Write(const void* data, int64_t nbytes) override; + + /// \cond FALSE + using OutputStream::Write; + /// \endcond + + /// Close the stream and return the buffer + arrow::Result> Finish(); + + /// \brief Initialize state of OutputStream with newly allocated memory and + /// set position to 0 + /// \param[in] initial_capacity the starting allocated capacity + /// \param[in,out] pool the memory pool to use for allocations + /// \return Status + arrow::Status Reset(std::vector ops, + int64_t initial_capacity, arrow::MemoryPool* pool); + + [[nodiscard]] int64_t capacity() const { return capacity_; } + + private: + TTreeDeferredReadOutputStream(); + std::vector ops_; + + // Ensures there is sufficient space available to write nbytes + arrow::Status Reserve(int64_t nbytes); + + std::shared_ptr buffer_; + bool is_open_; + int64_t capacity_; + int64_t position_; + uint8_t* mutable_data_; +}; + +static constexpr int64_t kBufferMinimumSize = 256; + +TTreeDeferredReadOutputStream::TTreeDeferredReadOutputStream() + : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {} + +TTreeDeferredReadOutputStream::TTreeDeferredReadOutputStream(std::vector& ops, + const std::shared_ptr& buffer) + : ops_(ops), + buffer_(buffer), + is_open_(true), + capacity_(buffer->size()), + position_(0), + mutable_data_(buffer->mutable_data()) {} + +arrow::Result> TTreeDeferredReadOutputStream::Create( + std::vector& ops, + int64_t initial_capacity, arrow::MemoryPool* pool) +{ + // ctor is private, so cannot use make_shared + auto ptr = std::shared_ptr(new TTreeDeferredReadOutputStream); + RETURN_NOT_OK(ptr->Reset(ops, initial_capacity, pool)); + return ptr; +} + +arrow::Status TTreeDeferredReadOutputStream::Reset(std::vector ops, + int64_t initial_capacity, arrow::MemoryPool* pool) +{ + ARROW_ASSIGN_OR_RAISE(buffer_, 
AllocateResizableBuffer(initial_capacity, pool)); + ops_ = ops; + is_open_ = true; + capacity_ = initial_capacity; + position_ = 0; + mutable_data_ = buffer_->mutable_data(); + return arrow::Status::OK(); +} + +arrow::Status TTreeDeferredReadOutputStream::Close() +{ + if (is_open_) { + is_open_ = false; + if (position_ < capacity_) { + RETURN_NOT_OK(buffer_->Resize(position_, false)); + } + } + return arrow::Status::OK(); +} + +bool TTreeDeferredReadOutputStream::closed() const { return !is_open_; } + +arrow::Result> TTreeDeferredReadOutputStream::Finish() +{ + RETURN_NOT_OK(Close()); + buffer_->ZeroPadding(); + is_open_ = false; + return std::move(buffer_); +} + +arrow::Result TTreeDeferredReadOutputStream::Tell() const { return position_; } + +auto readValues = [](uint8_t* target, ReadOps& op, TBufferFile& rootBuffer) { + int readEntries = 0; + rootBuffer.Reset(); + while (readEntries < op.rootBranchEntries) { + auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer); + int size = readLast * op.listSize; + readEntries += readLast; + swapCopy(target, rootBuffer.GetCurrent(), size, op.typeSize); + target += (ptrdiff_t)(size * op.typeSize); + } +}; + +auto readBoolValues = [](uint8_t* target, ReadOps& op, TBufferFile& rootBuffer) { + int readEntries = 0; + rootBuffer.Reset(); + // Set to 0 + memset(target, 0, op.targetBuffer->size()); + int readLast = 0; + while (readEntries < op.rootBranchEntries) { + auto beginValue = readLast; + auto readLast = op.branch->GetBulkRead().GetBulkEntries(readEntries, rootBuffer); + int size = readLast * op.listSize; + readEntries += readLast; + for (int i = beginValue; i < beginValue + size; ++i) { + auto value = static_cast(rootBuffer.GetCurrent()[i - beginValue] << (i % 8)); + target[i / 8] |= value; + } + } +}; + +auto readVLAValues = [](uint8_t* target, ReadOps& op, ReadOps const& offsetOp, TBufferFile& rootBuffer) { + int readEntries = 0; + auto* tPtrOffset = 
reinterpret_cast(offsetOp.targetBuffer->data()); + std::span const offsets{tPtrOffset, tPtrOffset + offsetOp.rootBranchEntries + 1}; + + rootBuffer.Reset(); + while (readEntries < op.rootBranchEntries) { + auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer); + int size = offsets[readEntries + readLast] - offsets[readEntries]; + readEntries += readLast; + swapCopy(target, rootBuffer.GetCurrent(), size, op.typeSize); + target += (ptrdiff_t)(size * op.typeSize); + } +}; + +arrow::Status TTreeDeferredReadOutputStream::Write(const void* data, int64_t nbytes) +{ + if (ARROW_PREDICT_FALSE(!is_open_)) { + return arrow::Status::IOError("OutputStream is closed"); + } + if (ARROW_PREDICT_TRUE(nbytes == 0)) { + return arrow::Status::OK(); + } + if (ARROW_PREDICT_FALSE(position_ + nbytes >= capacity_)) { + RETURN_NOT_OK(Reserve(nbytes)); + } + // This is a real address which needs to be copied. Do it! + auto ref = (int64_t)data; + if (ref >= ops_.size()) { + memcpy(mutable_data_ + position_, data, nbytes); + position_ += nbytes; + return arrow::Status::OK(); + } + auto& op = ops_[ref]; + static TBufferFile rootBuffer{TBuffer::EMode::kWrite, 4 * 1024 * 1024}; + + switch (op.kind) { + // Offsets need to be read in advance because we need to know + // how many elements are there in total (since TTree does not allow discovering such informantion) + case ReadOpKind::Offsets: + break; + case ReadOpKind::Values: + readValues(mutable_data_ + position_, op, rootBuffer); + break; + case ReadOpKind::VLA: + readVLAValues(mutable_data_ + position_, op, ops_[ref - 1], rootBuffer); + break; + case ReadOpKind::Booleans: + readBoolValues(mutable_data_ + position_, op, rootBuffer); + break; + case ReadOpKind::Unknown: + throw runtime_error("Unknown Op"); + } + op.branch->SetStatus(false); + op.branch->DropBaskets("all"); + op.branch->Reset(); + op.branch->GetTransientBuffer(0)->Expand(0); + + position_ += nbytes; + return arrow::Status::OK(); +} + 
+arrow::Status TTreeDeferredReadOutputStream::Reserve(int64_t nbytes) +{ + // Always overallocate by doubling. It seems that it is a better growth + // strategy, at least for memory_benchmark.cc. + // This may be because it helps match the allocator's allocation buckets + // more exactly. Or perhaps it hits a sweet spot in jemalloc. + int64_t new_capacity = std::max(kBufferMinimumSize, capacity_); + new_capacity = position_ + nbytes; + if (new_capacity > capacity_) { + RETURN_NOT_OK(buffer_->Resize(new_capacity)); + capacity_ = new_capacity; + mutable_data_ = buffer_->mutable_data(); + } + return arrow::Status::OK(); +} + class TTreeFileWriteOptions : public arrow::dataset::FileWriteOptions { public: @@ -174,8 +435,21 @@ class TTreeFileFragment : public arrow::dataset::FileFragment return mTree.get(); } + std::vector& ops() + { + return mOps; + } + + /// The pointer to each allocation is an incremental number, indexing a collection to track + /// the size of each allocation. + std::shared_ptr GetPlaceholderForOp(size_t size) + { + return std::make_shared((uint8_t*)(mOps.size() - 1), size); + } + private: std::unique_ptr mTree; + std::vector mOps; }; // An arrow outputstream which allows to write to a TTree. 
Eventually @@ -246,6 +520,9 @@ bool TTreeOutputStream::closed() const TBranch* TTreeOutputStream::CreateBranch(char const* branchName, char const* sizeBranch) { + if (mBranchPrefix.empty() == true) { + return mTree->Branch(branchName, (char*)nullptr, sizeBranch); + } return mTree->Branch((mBranchPrefix + "/" + branchName).c_str(), (char*)nullptr, (mBranchPrefix + sizeBranch).c_str()); } @@ -263,7 +540,10 @@ struct TTreeObjectReadingImplementation : public RootArrowFactoryPlugin { return new RootArrowFactory{ .options = [context]() { return context->format->DefaultWriteOptions(); }, .format = [context]() { return context->format; }, - }; + .deferredOutputStreamer = [](std::shared_ptr fragment, const std::shared_ptr& buffer) -> std::shared_ptr { + auto treeFragment = std::dynamic_pointer_cast(fragment); + return std::make_shared(treeFragment->ops(), buffer); + }}; } }; @@ -273,10 +553,36 @@ struct BranchFieldMapping { int datasetFieldIdx; }; +auto readOffsets = [](ReadOps& op, TBufferFile& rootBuffer) { + uint32_t offset = 0; + std::span offsets; + int readEntries = 0; + int count = 0; + auto* tPtrOffset = reinterpret_cast(op.targetBuffer->mutable_data()); + offsets = std::span{tPtrOffset, tPtrOffset + op.rootBranchEntries + 1}; + + // read sizes first + rootBuffer.Reset(); + while (readEntries < op.rootBranchEntries) { + auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer); + if (readLast == -1) { + throw runtime_error_f("Unable to read from branch %s.", op.branch->GetName()); + } + readEntries += readLast; + for (auto i = 0; i < readLast; ++i) { + offsets[count++] = (int)offset; + offset += swap32_(reinterpret_cast(rootBuffer.GetCurrent())[i]); + } + } + offsets[count] = (int)offset; + op.offsetCount = offset; +}; + arrow::Result TTreeFileFormat::ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& fragment) const { + assert(options->dataset_schema != nullptr); // This is the schema we want to read auto 
dataset_schema = options->dataset_schema; auto treeFragment = std::dynamic_pointer_cast(fragment); @@ -286,6 +592,8 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( auto generator = [pool = options->pool, treeFragment, dataset_schema, &totalCompressedSize = mTotCompressedSize, &totalUncompressedSize = mTotUncompressedSize]() -> arrow::Future> { + O2_SIGNPOST_ID_FROM_POINTER(tid, root_arrow_fs, treeFragment->GetTree()); + O2_SIGNPOST_START(root_arrow_fs, tid, "Generator", "Creating batch for tree %{public}s", treeFragment->GetTree()->GetName()); std::vector> columns; std::vector> fields = dataset_schema->fields(); auto physical_schema = *treeFragment->ReadPhysicalSchema(); @@ -297,201 +605,171 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( // Register physical fields into the cache std::vector mappings; + // We need to count the number of readops to avoid moving the vector. + int opsCount = 0; for (int fi = 0; fi < dataset_schema->num_fields(); ++fi) { auto dataset_field = dataset_schema->field(fi); + // This is needed because for now the dataset_field + // is actually the schema of the ttree + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Processing dataset field %{public}s.", dataset_field->name().c_str()); int physicalFieldIdx = physical_schema->GetFieldIndex(dataset_field->name()); if (physicalFieldIdx < 0) { - throw runtime_error_f("Cannot find physical field associated to %s", dataset_field->name().c_str()); + throw runtime_error_f("Cannot find physical field associated to %s. 
Possible fields: %s", + dataset_field->name().c_str(), physical_schema->ToString().c_str()); } if (physicalFieldIdx > 1 && physical_schema->field(physicalFieldIdx - 1)->name().ends_with("_size")) { + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Field %{public}s has sizes in %{public}s.", dataset_field->name().c_str(), + physical_schema->field(physicalFieldIdx - 1)->name().c_str()); mappings.push_back({physicalFieldIdx, physicalFieldIdx - 1, fi}); + opsCount += 2; } else { mappings.push_back({physicalFieldIdx, -1, fi}); + opsCount++; } } auto* tree = treeFragment->GetTree(); - tree->SetCacheSize(25000000); auto branches = tree->GetListOfBranches(); + size_t totalTreeSize = 0; + std::vector selectedBranches; for (auto& mapping : mappings) { - tree->AddBranchToCache((TBranch*)branches->At(mapping.mainBranchIdx), false); + selectedBranches.push_back((TBranch*)branches->At(mapping.mainBranchIdx)); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Adding branch %{public}s to stream.", selectedBranches.back()->GetName()); + totalTreeSize += selectedBranches.back()->GetTotalSize(); if (mapping.vlaIdx != -1) { - tree->AddBranchToCache((TBranch*)branches->At(mapping.vlaIdx), false); + selectedBranches.push_back((TBranch*)branches->At(mapping.vlaIdx)); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Adding branch %{public}s to stream.", selectedBranches.back()->GetName()); + totalTreeSize += selectedBranches.back()->GetTotalSize(); } } - tree->StopCacheLearningPhase(); - static TBufferFile buffer{TBuffer::EMode::kWrite, 4 * 1024 * 1024}; + size_t cacheSize = std::max(std::min(totalTreeSize, 25000000UL), 1000000UL); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Resizing cache to %zu.", cacheSize); + tree->SetCacheSize(cacheSize); + for (auto* branch : selectedBranches) { + tree->AddBranchToCache(branch, false); + } + tree->StopCacheLearningPhase(); - int64_t rows = -1; + // Intermediate buffer to bulk read. 
Two for now + static TBufferFile rootBuffer{TBuffer::EMode::kWrite, 4 * 1024 * 1024}; + std::vector& ops = treeFragment->ops(); + ops.clear(); + ops.reserve(opsCount); for (size_t mi = 0; mi < mappings.size(); ++mi) { BranchFieldMapping mapping = mappings[mi]; // The field actually on disk auto datasetField = dataset_schema->field(mapping.datasetFieldIdx); auto physicalField = physical_schema->field(mapping.mainBranchIdx); - auto* branch = (TBranch*)branches->At(mapping.mainBranchIdx); - assert(branch); - buffer.Reset(); - auto totalEntries = branch->GetEntries(); - if (rows == -1) { - rows = totalEntries; + + if (mapping.vlaIdx != -1) { + auto* branch = (TBranch*)branches->At(mapping.vlaIdx); + ops.emplace_back(ReadOps{ + .branch = branch, + .rootBranchEntries = branch->GetEntries(), + .typeSize = 4, + .listSize = 1, + .kind = ReadOpKind::Offsets, + }); + auto& op = ops.back(); + ARROW_ASSIGN_OR_RAISE(op.targetBuffer, arrow::AllocateBuffer((op.rootBranchEntries + 1) * op.typeSize, pool)); + // Offsets need to be read immediately to know how many values are there + readOffsets(op, rootBuffer); } - if (rows != totalEntries) { - throw runtime_error_f("Unmatching number of rows for branch %s", branch->GetName()); + ops.push_back({}); + auto& valueOp = ops.back(); + valueOp.branch = (TBranch*)branches->At(mapping.mainBranchIdx); + valueOp.rootBranchEntries = valueOp.branch->GetEntries(); + // In case this is a vla, we set the offsetCount as totalEntries + // In case we read booleans we need a special coversion from bytes to bits. + auto listType = std::dynamic_pointer_cast(datasetField->type()); + valueOp.typeSize = physicalField->type()->byte_width(); + // Notice how we are not (yet) allocating buffers at this point. We merely + // create placeholders to subsequently fill. 
+ if ((datasetField->type() == arrow::boolean())) { + valueOp.kind = ReadOpKind::Booleans; + valueOp.listSize = 1; + valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries) / 8 + 1); + } else if (listType && datasetField->type()->field(0)->type() == arrow::boolean()) { + valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width(); + valueOp.listSize = listType->list_size(); + valueOp.kind = ReadOpKind::Booleans; + valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries * valueOp.listSize) / 8 + 1); + } else if (mapping.vlaIdx != -1) { + valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width(); + valueOp.listSize = -1; + // -1 is the current one, -2 is the one with for the offsets + valueOp.kind = ReadOpKind::VLA; + valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(ops[ops.size() - 2].offsetCount * valueOp.typeSize); + } else if (listType) { + valueOp.kind = ReadOpKind::Values; + valueOp.listSize = listType->list_size(); + valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width(); + valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize * valueOp.listSize); + } else { + valueOp.typeSize = physicalField->type()->byte_width(); + valueOp.kind = ReadOpKind::Values; + valueOp.listSize = 1; + valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize); } arrow::Status status; - int readEntries = 0; std::shared_ptr array; - auto listType = std::dynamic_pointer_cast(datasetField->type()); - if (datasetField->type() == arrow::boolean() || - (listType && datasetField->type()->field(0)->type() == arrow::boolean())) { - if (listType) { - std::unique_ptr builder = nullptr; - auto status = arrow::MakeBuilder(pool, datasetField->type()->field(0)->type(), &builder); - if (!status.ok()) { - throw runtime_error("Cannot create value builder"); - } - auto listBuilder = std::make_unique(pool, 
std::move(builder), listType->list_size()); - auto valueBuilder = listBuilder.get()->value_builder(); - // boolean array special case: we need to use builder to create the bitmap - status = valueBuilder->Reserve(totalEntries * listType->list_size()); - status &= listBuilder->Reserve(totalEntries); - if (!status.ok()) { - throw runtime_error("Failed to reserve memory for array builder"); - } - while (readEntries < totalEntries) { - auto readLast = branch->GetBulkRead().GetBulkEntries(readEntries, buffer); - readEntries += readLast; - status &= static_cast(valueBuilder)->AppendValues(reinterpret_cast(buffer.GetCurrent()), readLast * listType->list_size()); - } - status &= static_cast(listBuilder.get())->AppendValues(readEntries); - if (!status.ok()) { - throw runtime_error("Failed to append values to array"); - } - status &= listBuilder->Finish(&array); - if (!status.ok()) { - throw runtime_error("Failed to create array"); - } - } else if (listType == nullptr) { - std::unique_ptr builder = nullptr; - auto status = arrow::MakeBuilder(pool, datasetField->type(), &builder); - if (!status.ok()) { - throw runtime_error("Cannot create builder"); - } - auto valueBuilder = static_cast(builder.get()); - // boolean array special case: we need to use builder to create the bitmap - status = valueBuilder->Reserve(totalEntries); - if (!status.ok()) { - throw runtime_error("Failed to reserve memory for array builder"); - } - while (readEntries < totalEntries) { - auto readLast = branch->GetBulkRead().GetBulkEntries(readEntries, buffer); - readEntries += readLast; - status &= valueBuilder->AppendValues(reinterpret_cast(buffer.GetCurrent()), readLast); - } - if (!status.ok()) { - throw runtime_error("Failed to append values to array"); - } - status &= valueBuilder->Finish(&array); - if (!status.ok()) { - throw runtime_error("Failed to create array"); - } - } - } else { - // This is needed for branches which have not been persisted. 
- auto bytes = branch->GetTotBytes(); - auto branchSize = bytes ? bytes : 1000000; - auto&& result = arrow::AllocateResizableBuffer(branchSize, pool); - if (!result.ok()) { - throw runtime_error("Cannot allocate values buffer"); - } - std::shared_ptr arrowValuesBuffer = result.MoveValueUnsafe(); - auto ptr = arrowValuesBuffer->mutable_data(); - if (ptr == nullptr) { - throw runtime_error("Invalid buffer"); - } - - std::unique_ptr offsetBuffer = nullptr; - - uint32_t offset = 0; - int count = 0; - std::shared_ptr arrowOffsetBuffer; - std::span offsets; - int size = 0; - uint32_t totalSize = 0; - if (mapping.vlaIdx != -1) { - auto* mSizeBranch = (TBranch*)branches->At(mapping.vlaIdx); - offsetBuffer = std::make_unique(TBuffer::EMode::kWrite, 4 * 1024 * 1024); - result = arrow::AllocateResizableBuffer((totalEntries + 1) * (int64_t)sizeof(int), pool); - if (!result.ok()) { - throw runtime_error("Cannot allocate offset buffer"); - } - arrowOffsetBuffer = result.MoveValueUnsafe(); - unsigned char* ptrOffset = arrowOffsetBuffer->mutable_data(); - auto* tPtrOffset = reinterpret_cast(ptrOffset); - offsets = std::span{tPtrOffset, tPtrOffset + totalEntries + 1}; - - // read sizes first - while (readEntries < totalEntries) { - auto readLast = mSizeBranch->GetBulkRead().GetEntriesSerialized(readEntries, *offsetBuffer); - readEntries += readLast; - for (auto i = 0; i < readLast; ++i) { - offsets[count++] = (int)offset; - offset += swap32_(reinterpret_cast(offsetBuffer->GetCurrent())[i]); - } - } - offsets[count] = (int)offset; - totalSize = offset; - readEntries = 0; - } - - int typeSize = physicalField->type()->byte_width(); - int64_t listSize = 1; - if (auto fixedSizeList = std::dynamic_pointer_cast(datasetField->type())) { - listSize = fixedSizeList->list_size(); - typeSize = physicalField->type()->field(0)->type()->byte_width(); - } else if (mapping.vlaIdx != -1) { - typeSize = physicalField->type()->field(0)->type()->byte_width(); - listSize = -1; - } - while (readEntries < 
totalEntries) { - auto readLast = branch->GetBulkRead().GetEntriesSerialized(readEntries, buffer); - if (mapping.vlaIdx != -1) { - size = offsets[readEntries + readLast] - offsets[readEntries]; - } else { - size = readLast * listSize; - } - readEntries += readLast; - swapCopy(ptr, buffer.GetCurrent(), size, typeSize); - ptr += (ptrdiff_t)(size * typeSize); - } - if (listSize >= 1) { - totalSize = readEntries * listSize; - } - if (listSize == 1) { - array = std::make_shared(datasetField->type(), readEntries, arrowValuesBuffer); - } else { - auto varray = std::make_shared(datasetField->type()->field(0)->type(), totalSize, arrowValuesBuffer); - if (mapping.vlaIdx != -1) { - array = std::make_shared(datasetField->type(), readEntries, arrowOffsetBuffer, varray); - } else { - array = std::make_shared(datasetField->type(), readEntries, varray); - } - } + if (listType) { + auto varray = std::make_shared(datasetField->type()->field(0)->type(), valueOp.rootBranchEntries * valueOp.listSize, valueOp.targetBuffer); + array = std::make_shared(datasetField->type(), valueOp.rootBranchEntries, varray); + // This is a vla, there is also an offset op + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.", + valueOp.branch->GetName(), + valueOp.rootBranchEntries, + valueOp.targetBuffer->size()); + } else if (mapping.vlaIdx != -1) { + auto& offsetOp = ops[ops.size() - 2]; + auto varray = std::make_shared(datasetField->type()->field(0)->type(), offsetOp.offsetCount, valueOp.targetBuffer); + // We have pushed an offset op if this was the case. 
+ array = std::make_shared(datasetField->type(), offsetOp.rootBranchEntries, offsetOp.targetBuffer, varray); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.", + offsetOp.branch->GetName(), offsetOp.rootBranchEntries, offsetOp.targetBuffer->size()); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.", + valueOp.branch->GetName(), + offsetOp.offsetCount, + valueOp.targetBuffer->size()); + } else { + array = std::make_shared(datasetField->type(), valueOp.rootBranchEntries, valueOp.targetBuffer); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.", + valueOp.branch->GetName(), + valueOp.rootBranchEntries, + valueOp.targetBuffer->size()); } - branch->SetStatus(false); - branch->DropBaskets("all"); - branch->Reset(); - branch->GetTransientBuffer(0)->Expand(0); - columns.push_back(array); } + + // Do the actual filling of the buffers. This happens after we have created the whole structure + // so that we can read directly in shared memory. + int64_t rows = -1; + for (size_t i = 0; i < ops.size(); ++i) { + auto& op = ops[i]; + if (rows == -1 && op.kind != ReadOpKind::VLA) { + rows = op.rootBranchEntries; + } + if (rows == -1 && op.kind == ReadOpKind::VLA) { + auto& offsetOp = ops[i - 1]; + rows = offsetOp.rootBranchEntries; + } + if (op.kind != ReadOpKind::VLA && rows != op.rootBranchEntries) { + throw runtime_error_f("Unmatching number of rows for branch %s. Expected %lli, found %lli", op.branch->GetName(), rows, op.rootBranchEntries); + } + if (op.kind == ReadOpKind::VLA && rows != ops[i - 1].rootBranchEntries) { + throw runtime_error_f("Unmatching number of rows for branch %s. 
Expected %lli, found %lli", op.branch->GetName(), rows, ops[i - 1].offsetCount); + } + } + auto batch = arrow::RecordBatch::Make(dataset_schema, rows, columns); totalCompressedSize += tree->GetZipBytes(); totalUncompressedSize += tree->GetTotBytes(); + O2_SIGNPOST_END(root_arrow_fs, tid, "Generator", "Done creating batch compressed:%zu uncompressed:%zu", totalCompressedSize, totalUncompressedSize); return batch; }; return generator; @@ -817,11 +1095,31 @@ class TTreeFileWriter : public arrow::dataset::FileWriter switch (field->type()->id()) { case arrow::Type::FIXED_SIZE_LIST: { auto list = std::static_pointer_cast(column); - valueArrays.back() = list->values(); + if (list->list_type()->field(0)->type()->id() == arrow::Type::BOOL) { + int64_t length = list->length() * list->list_type()->list_size(); + arrow::UInt8Builder builder; + auto ok = builder.Reserve(length); + // I need to build an array of uint8_t for the conversion to ROOT which uses + bytes for booleans. + auto boolArray = std::static_pointer_cast(list->values()); + for (int64_t i = 0; i < length; ++i) { + if (boolArray->IsValid(i)) { + // Expand each boolean value (true/false) to uint8 (1/0) + uint8_t value = boolArray->Value(i) ?
1 : 0; + auto ok = builder.Append(value); + } else { + // Append null for invalid entries + auto ok = builder.AppendNull(); + } + } + valueArrays.back() = *builder.Finish(); + } else { + valueArrays.back() = list->values(); + } } break; case arrow::Type::LIST: { auto list = std::static_pointer_cast(column); - valueArrays.back() = list; + valueArrays.back() = list->values(); } break; case arrow::Type::BOOL: { // In case of arrays of booleans, we need to go back to their @@ -867,11 +1165,12 @@ class TTreeFileWriter : public arrow::dataset::FileWriter uint8_t const* buffer = std::static_pointer_cast(valueArray)->values()->data() + array->offset() + list->value_offset(pos) * valueType->byte_width(); branch->SetAddress((void*)buffer); sizeBranch->SetAddress(&listSize); - }; - break; + } break; case arrow::Type::FIXED_SIZE_LIST: default: { - uint8_t const* buffer = std::static_pointer_cast(valueArray)->values()->data() + array->offset() + pos * listSize * valueType->byte_width(); + // needed for the boolean case, I should probably cache this. + auto byteWidth = valueType->byte_width() ? valueType->byte_width() : 1; + uint8_t const* buffer = std::static_pointer_cast(valueArray)->values()->data() + array->offset() + pos * listSize * byteWidth; branch->SetAddress((void*)buffer); }; } diff --git a/Framework/Core/include/Framework/RootArrowFilesystem.h b/Framework/Core/include/Framework/RootArrowFilesystem.h index 441b43aeca331..5aceaed077001 100644 --- a/Framework/Core/include/Framework/RootArrowFilesystem.h +++ b/Framework/Core/include/Framework/RootArrowFilesystem.h @@ -12,6 +12,7 @@ #define O2_FRAMEWORK_ROOT_ARROW_FILESYSTEM_H_ #include +#include #include #include #include @@ -96,6 +97,9 @@ class VirtualRootFileSystemBase : public arrow::fs::FileSystem struct RootArrowFactory final { std::function()> options = nullptr; std::function()> format = nullptr; + // Builds an output streamer which is able to read from the source fragment + // in a deferred way. 
+ std::function(std::shared_ptr, const std::shared_ptr& buffer)> deferredOutputStreamer = nullptr; }; struct RootArrowFactoryPlugin { @@ -144,6 +148,8 @@ class TFileFileSystem : public VirtualRootFileSystemBase TFileFileSystem(TDirectoryFile* f, size_t readahead, RootObjectReadingFactory&); + ~TFileFileSystem() override; + std::string type_name() const override { return "TDirectoryFile"; diff --git a/Framework/Core/include/Framework/TableTreeHelpers.h b/Framework/Core/include/Framework/TableTreeHelpers.h index c6a769e579fb7..92725d186ee33 100644 --- a/Framework/Core/include/Framework/TableTreeHelpers.h +++ b/Framework/Core/include/Framework/TableTreeHelpers.h @@ -11,6 +11,8 @@ #ifndef O2_FRAMEWORK_TABLETREEHELPERS_H_ #define O2_FRAMEWORK_TABLETREEHELPERS_H_ +#include +#include #include #include "TFile.h" #include "TTreeReader.h" @@ -146,15 +148,25 @@ class TreeToTable class FragmentToBatch { public: - FragmentToBatch(arrow::MemoryPool* pool = arrow::default_memory_pool()); + // The function to be used to create the required stream. 
+ using StreamerCreator = std::function(std::shared_ptr, const std::shared_ptr& buffer)>; + + FragmentToBatch(StreamerCreator, std::shared_ptr, arrow::MemoryPool* pool = arrow::default_memory_pool()); void setLabel(const char* label); - void fill(std::shared_ptr, std::shared_ptr dataSetSchema, std::shared_ptr); + void fill(std::shared_ptr dataSetSchema, std::shared_ptr); std::shared_ptr finalize(); + std::shared_ptr streamer(std::shared_ptr buffer) + { + return mCreator(mFragment, buffer); + } + private: + std::shared_ptr mFragment; arrow::MemoryPool* mArrowMemoryPool = nullptr; std::string mTableLabel; std::shared_ptr mRecordBatch; + StreamerCreator mCreator; }; // ----------------------------------------------------------------------------- diff --git a/Framework/Core/src/DataAllocator.cxx b/Framework/Core/src/DataAllocator.cxx index c310892c4c490..b735eee1f3308 100644 --- a/Framework/Core/src/DataAllocator.cxx +++ b/Framework/Core/src/DataAllocator.cxx @@ -211,34 +211,6 @@ void doWriteTable(std::shared_ptr b, arrow::Table* table) } } -void doWriteBatch(std::shared_ptr b, arrow::RecordBatch* batch) -{ - auto mock = std::make_shared(); - int64_t expectedSize = 0; - auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), batch->schema()); - arrow::Status outStatus = mockWriter.ValueOrDie()->WriteRecordBatch(*batch); - - expectedSize = mock->Tell().ValueOrDie(); - auto reserve = b->Reserve(expectedSize); - if (reserve.ok() == false) { - throw std::runtime_error("Unable to reserve memory for table"); - } - - auto stream = std::make_shared(b); - // This is a copy maybe we can finally get rid of it by having using the - // dataset API? 
- auto outBatch = arrow::ipc::MakeStreamWriter(stream.get(), batch->schema()); - if (outBatch.ok() == false) { - throw ::std::runtime_error("Unable to create batch writer"); - } - - outStatus = outBatch.ValueOrDie()->WriteRecordBatch(*batch); - - if (outStatus.ok() == false) { - throw std::runtime_error("Unable to Write batch"); - } -} - void DataAllocator::adopt(const Output& spec, LifetimeHolder& tb) { auto& timingInfo = mRegistry.get(); @@ -318,16 +290,35 @@ void DataAllocator::adopt(const Output& spec, LifetimeHolder& f // Serialization happens in here, so that we can // get rid of the intermediate tree 2 table object, saving memory. auto batch = source.finalize(); - doWriteBatch(buffer, batch.get()); + auto mock = std::make_shared(); + int64_t expectedSize = 0; + auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), batch->schema()); + arrow::Status outStatus = mockWriter.ValueOrDie()->WriteRecordBatch(*batch); + + expectedSize = mock->Tell().ValueOrDie(); + auto reserve = buffer->Reserve(expectedSize); + if (reserve.ok() == false) { + throw std::runtime_error("Unable to reserve memory for table"); + } + + auto deferredWriterStream = source.streamer(buffer); + + auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream, batch->schema()); + if (outBatch.ok() == false) { + throw ::std::runtime_error("Unable to create batch writer"); + } + + outStatus = outBatch.ValueOrDie()->WriteRecordBatch(*batch); + + if (outStatus.ok() == false) { + throw std::runtime_error("Unable to Write batch"); + } // deletion happens in the caller }; - /// To finalise this we write the table to the buffer. - /// FIXME: most likely not a great idea. We should probably write to the buffer - /// directly in the TableBuilder, incrementally. auto finalizer = [](std::shared_ptr b) -> void { // This is empty because we already serialised the object when - // the LifetimeHolder goes out of scope. + // the LifetimeHolder goes out of scope. See code above. 
}; context.addBuffer(std::move(header), buffer, std::move(finalizer), routeIndex); diff --git a/Framework/Core/src/RootArrowFilesystem.cxx b/Framework/Core/src/RootArrowFilesystem.cxx index c563866e802bb..403e393ec6090 100644 --- a/Framework/Core/src/RootArrowFilesystem.cxx +++ b/Framework/Core/src/RootArrowFilesystem.cxx @@ -42,6 +42,12 @@ TFileFileSystem::TFileFileSystem(TDirectoryFile* f, size_t readahead, RootObject ((TFile*)mFile)->SetReadaheadSize(50 * 1024 * 1024); } +TFileFileSystem::~TFileFileSystem() +{ + mFile->Close(); + delete mFile; +} + std::shared_ptr TFileFileSystem::GetObjectHandler(arrow::dataset::FileSource source) { // We use a plugin to create the actual objects inside the diff --git a/Framework/Core/src/TableTreeHelpers.cxx b/Framework/Core/src/TableTreeHelpers.cxx index 2f23c07aea451..84d4ff171bc39 100644 --- a/Framework/Core/src/TableTreeHelpers.cxx +++ b/Framework/Core/src/TableTreeHelpers.cxx @@ -13,7 +13,6 @@ #include "Framework/Endian.h" #include "Framework/Signpost.h" -#include "arrow/type_traits.h" #include #include #include @@ -533,7 +532,7 @@ void TreeToTable::setLabel(const char* label) mTableLabel = label; } -void TreeToTable::fill(TTree*tree) +void TreeToTable::fill(TTree* tree) { std::vector> columns; std::vector> fields; @@ -569,8 +568,10 @@ std::shared_ptr TreeToTable::finalize() return mTable; } -FragmentToBatch::FragmentToBatch(arrow::MemoryPool* pool) - : mArrowMemoryPool{pool} +FragmentToBatch::FragmentToBatch(StreamerCreator creator, std::shared_ptr fragment, arrow::MemoryPool* pool) + : mFragment{std::move(fragment)}, + mArrowMemoryPool{pool}, + mCreator{std::move(creator)} { } @@ -579,13 +580,14 @@ void FragmentToBatch::setLabel(const char* label) mTableLabel = label; } -void FragmentToBatch::fill(std::shared_ptr fragment, std::shared_ptr schema, std::shared_ptr format) +void FragmentToBatch::fill(std::shared_ptr schema, std::shared_ptr format) { auto options = std::make_shared(); options->dataset_schema = schema; - 
auto scanner = format->ScanBatchesAsync(options, fragment); + auto scanner = format->ScanBatchesAsync(options, mFragment); auto batch = (*scanner)(); mRecordBatch = *batch.result(); + // Notice that up to here the buffer was not yet filled. } std::shared_ptr FragmentToBatch::finalize() diff --git a/Framework/Core/test/test_Root2ArrowTable.cxx b/Framework/Core/test/test_Root2ArrowTable.cxx index 438f388ec86b5..f87c3014eb0f4 100644 --- a/Framework/Core/test/test_Root2ArrowTable.cxx +++ b/Framework/Core/test/test_Root2ArrowTable.cxx @@ -38,6 +38,7 @@ #include #include +#include #include #include #include @@ -388,6 +389,7 @@ bool validatePhysicalSchema(std::shared_ptr schema) { REQUIRE(schema->num_fields() == 12); REQUIRE(schema->field(0)->type()->id() == arrow::float32()->id()); + REQUIRE(schema->field(0)->name() == "px"); REQUIRE(schema->field(1)->type()->id() == arrow::float32()->id()); REQUIRE(schema->field(2)->type()->id() == arrow::float32()->id()); REQUIRE(schema->field(3)->type()->id() == arrow::float64()->id()); @@ -541,12 +543,28 @@ TEST_CASE("RootTree2Dataset") options->dataset_schema = schema; auto scanner = format->ScanBatchesAsync(options, *fragment); REQUIRE(scanner.ok()); + + // This is batch has deferred contents. 
Therefore we need to use a DeferredOutputStream to + // write it to a real one and read it back with the BufferReader, which is hopefully zero copy + std::shared_ptr batch; + auto batches = (*scanner)(); auto result = batches.result(); REQUIRE(result.ok()); REQUIRE((*result)->columns().size() == 11); REQUIRE((*result)->num_rows() == 100); - validateContents(*result); + std::shared_ptr buffer = *arrow::AllocateResizableBuffer(1000, 64); + auto deferredWriterStream = factory.capabilities[1].factory().deferredOutputStreamer(*fragment, buffer); + auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream.get(), schema); + auto status = outBatch.ValueOrDie()->WriteRecordBatch(**result); + std::shared_ptr bufferReader = std::make_shared(buffer); + auto readerResult = arrow::ipc::RecordBatchStreamReader::Open(bufferReader); + auto batchReader = readerResult.ValueOrDie(); + + auto next = batchReader->ReadNext(&batch); + REQUIRE(batch != nullptr); + + validateContents(batch); auto* output = new TMemFile("foo", "RECREATE"); auto outFs = std::make_shared(output, 0, factory); @@ -558,7 +576,8 @@ TEST_CASE("RootTree2Dataset") // Write to the /DF_3 tree at top level arrow::fs::FileLocator locator{outFs, "/DF_3"}; auto writer = format->MakeWriter(*destination, schema, {}, locator); - auto success = writer->get()->Write(*result); + auto success = writer->get()->Write(batch); + REQUIRE(batch->schema()->field(0)->name() == "px"); auto rootDestination = std::dynamic_pointer_cast(*destination); SECTION("Read tree") @@ -568,7 +587,11 @@ TEST_CASE("RootTree2Dataset") auto tfileFs = std::dynamic_pointer_cast(outFs); REQUIRE(tfileFs.get()); REQUIRE(tfileFs->GetFile()); - REQUIRE(tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"))); + auto* tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree")); + REQUIRE(tree != nullptr); + REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100); + 
REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetName() == std::string("px")); + arrow::dataset::FileSource source2("/DF_3", outFs); REQUIRE(format->IsSupported(source2) == true); @@ -577,6 +600,10 @@ TEST_CASE("RootTree2Dataset") REQUIRE(tfileFs->GetFile()); REQUIRE(tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"))); + tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree")); + REQUIRE(tree != nullptr); + REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100); + auto schemaOptWritten = format->Inspect(source2); tfileFs = std::dynamic_pointer_cast(source2.filesystem()); REQUIRE(tfileFs.get()); @@ -585,6 +612,10 @@ TEST_CASE("RootTree2Dataset") REQUIRE(schemaOptWritten.ok()); auto schemaWritten = *schemaOptWritten; + tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree")); + REQUIRE(tree != nullptr); + REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100); + REQUIRE(validatePhysicalSchema(schemaWritten)); std::vector> fields; for (auto& field : schemaWritten->fields()) { @@ -599,23 +630,38 @@ TEST_CASE("RootTree2Dataset") auto fragmentWritten = format->MakeFragment(source2, {}, *physicalSchema); REQUIRE(fragmentWritten.ok()); auto optionsWritten = std::make_shared(); - options->dataset_schema = schema; - auto scannerWritten = format->ScanBatchesAsync(optionsWritten, *fragment); + optionsWritten->dataset_schema = schema; + auto scannerWritten = format->ScanBatchesAsync(optionsWritten, *fragmentWritten); REQUIRE(scannerWritten.ok()); - auto batchesWritten = (*scanner)(); - auto resultWritten = batches.result(); + tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree")); + REQUIRE(tree != nullptr); + REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100); + auto batchesWritten = (*scannerWritten)(); + auto resultWritten = batchesWritten.result(); REQUIRE(resultWritten.ok()); 
REQUIRE((*resultWritten)->columns().size() == 11); REQUIRE((*resultWritten)->num_rows() == 100); - validateContents(*resultWritten); + + std::shared_ptr buffer = *arrow::AllocateResizableBuffer(1000, 64); + auto deferredWriterStream = factory.capabilities[1].factory().deferredOutputStreamer(*fragmentWritten, buffer); + auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream.get(), schema); + auto status = outBatch.ValueOrDie()->WriteRecordBatch(**resultWritten); + std::shared_ptr bufferReader = std::make_shared(buffer); + auto readerResult = arrow::ipc::RecordBatchStreamReader::Open(bufferReader); + auto batchReader = readerResult.ValueOrDie(); + + auto next = batchReader->ReadNext(&batch); + REQUIRE(batch != nullptr); + validateContents(batch); } + arrow::fs::FileLocator rnTupleLocator{outFs, "/rntuple"}; // We write an RNTuple in the same TMemFile, using /rntuple as a location auto rntupleDestination = std::dynamic_pointer_cast(*destination); { auto rNtupleWriter = rNtupleFormat->MakeWriter(*destination, schema, {}, rnTupleLocator); - auto rNtupleSuccess = rNtupleWriter->get()->Write(*result); + auto rNtupleSuccess = rNtupleWriter->get()->Write(batch); + REQUIRE(rNtupleSuccess.ok()); } diff --git a/Framework/TestWorkflows/src/o2TestHistograms.cxx b/Framework/TestWorkflows/src/o2TestHistograms.cxx index efac16f6da4f0..2ec268130267b 100644 --- a/Framework/TestWorkflows/src/o2TestHistograms.cxx +++ b/Framework/TestWorkflows/src/o2TestHistograms.cxx @@ -40,7 +40,7 @@ struct EtaAndClsHistogramsSimple { OutputObj etaClsH{TH2F("eta_vs_pt", "#eta vs pT", 102, -2.01, 2.01, 100, 0, 10)}; Produces skimEx; - void process(aod::Tracks const& tracks) + void process(aod::Tracks const& tracks, aod::FT0s const&) { LOGP(info, "Invoking the simple one"); for (auto& track : tracks) { @@ -54,7 +54,7 @@ struct EtaAndClsHistogramsIUSimple { OutputObj etaClsH{TH2F("eta_vs_pt", "#eta vs pT", 102, -2.01, 2.01, 100, 0, 10)}; Produces skimEx; - void process(aod::TracksIU const&
tracks) + void process(aod::TracksIU const& tracks, aod::FT0s const&) { LOGP(info, "Invoking the simple one"); for (auto& track : tracks) {