Skip to content

Commit

Permalink
Merge 6064fb9 into sapling-pr-archive-ktf
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf authored Feb 13, 2025
2 parents 071b329 + 6064fb9 commit 95f780c
Show file tree
Hide file tree
Showing 13 changed files with 766 additions and 301 deletions.
16 changes: 12 additions & 4 deletions Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
// or submit itself to any jurisdiction.

#include "AODJAlienReaderHelpers.h"
#include <memory>
#include "Framework/TableTreeHelpers.h"
#include "Framework/AnalysisHelpers.h"
#include "Framework/DataProcessingStats.h"
#include "Framework/RootTableBuilderHelpers.h"
#include "Framework/RootArrowFilesystem.h"
#include "Framework/AlgorithmSpec.h"
#include "Framework/ConfigParamRegistry.h"
#include "Framework/ControlService.h"
Expand Down Expand Up @@ -41,6 +43,8 @@
#include <arrow/io/interfaces.h>
#include <arrow/table.h>
#include <arrow/util/key_value_metadata.h>
#include <arrow/dataset/dataset.h>
#include <arrow/dataset/file_base.h>

using namespace o2;
using namespace o2::aod;
Expand Down Expand Up @@ -272,11 +276,13 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
// Origin file name for derived output map
auto o2 = Output(TFFileNameHeader);
auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
std::string currentFilename(fileAndFolder.file->GetName());
if (strcmp(fileAndFolder.file->GetEndpointUrl()->GetProtocol(), "file") == 0 && fileAndFolder.file->GetEndpointUrl()->GetFile()[0] != '/') {
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(fileAndFolder.filesystem());
auto* f = dynamic_cast<TFile*>(rootFS->GetFile());
std::string currentFilename(f->GetFile()->GetName());
if (strcmp(f->GetEndpointUrl()->GetProtocol(), "file") == 0 && f->GetEndpointUrl()->GetFile()[0] != '/') {
// This is not an absolute local path. Make it absolute.
static std::string pwd = gSystem->pwd() + std::string("/");
currentFilename = pwd + std::string(fileAndFolder.file->GetName());
currentFilename = pwd + std::string(f->GetName());
}
outputs.make<std::string>(o2) = currentFilename;
}
Expand Down Expand Up @@ -312,7 +318,9 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
auto concrete = DataSpecUtils::asConcreteDataMatcher(firstRoute.matcher);
auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
if (!fileAndFolder.file) {

// In case the filesource is empty, move to the next one.
if (fileAndFolder.path().empty()) {
fcnt += 1;
ntf = 0;
if (didir->atEnd(fcnt)) {
Expand Down
193 changes: 129 additions & 64 deletions Framework/AnalysisSupport/src/DataInputDirector.cxx

Large diffs are not rendered by default.

20 changes: 9 additions & 11 deletions Framework/AnalysisSupport/src/DataInputDirector.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@

#include "Framework/DataDescriptorMatcher.h"
#include "Framework/DataAllocator.h"
#include "Framework/RootArrowFilesystem.h"

#include <arrow/filesystem/filesystem.h>
#include <arrow/dataset/dataset.h>

#include <regex>
#include "rapidjson/fwd.h"
Expand All @@ -31,16 +35,10 @@ struct FileNameHolder {
std::string fileName;
int numberOfTimeFrames = 0;
std::vector<uint64_t> listOfTimeFrameNumbers;
std::vector<std::string> listOfTimeFrameKeys;
std::vector<bool> alreadyRead;
};
FileNameHolder* makeFileNameHolder(std::string fileName);

struct FileAndFolder {
TFile* file = nullptr;
std::string folderName = "";
};

class DataInputDescriptor
{
/// Holds information concerning the reading of an aod table.
Expand All @@ -52,7 +50,6 @@ class DataInputDescriptor
std::string treename = "";
std::unique_ptr<data_matcher::DataDescriptorMatcher> matcher;

DataInputDescriptor() = default;
DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring = nullptr, int allowedParentLevel = 0, std::string parentFileReplacement = "");

void printOut();
Expand All @@ -78,7 +75,7 @@ class DataInputDescriptor
int findDFNumber(int file, std::string dfName);

uint64_t getTimeFrameNumber(int counter, int numTF);
FileAndFolder getFileFolder(int counter, int numTF);
arrow::dataset::FileSource getFileFolder(int counter, int numTF);
DataInputDescriptor* getParentFile(int counter, int numTF, std::string treename);
int getTimeFramesInFile(int counter);
int getReadTimeFramesInFile(int counter);
Expand All @@ -90,6 +87,7 @@ class DataInputDescriptor
bool isAlienSupportOn() { return mAlienSupport; }

private:
o2::framework::RootObjectReadingFactory mFactory;
std::string minputfilesFile = "";
std::string* minputfilesFilePtr = nullptr;
std::string mFilenameRegex = "";
Expand All @@ -98,7 +96,7 @@ class DataInputDescriptor
std::string mParentFileReplacement;
std::vector<FileNameHolder*> mfilenames;
std::vector<FileNameHolder*>* mdefaultFilenamesPtr = nullptr;
TFile* mcurrentFile = nullptr;
std::shared_ptr<arrow::fs::FileSystem> mCurrentFilesystem;
int mCurrentFileID = -1;
bool mAlienSupport = false;

Expand Down Expand Up @@ -127,7 +125,6 @@ class DataInputDirector
~DataInputDirector();

void reset();
void createDefaultDataInputDescriptor();
void printOut();
bool atEnd(int counter);

Expand All @@ -140,10 +137,11 @@ class DataInputDirector
// getters
DataInputDescriptor* getDataInputDescriptor(header::DataHeader dh);
int getNumberInputDescriptors() { return mdataInputDescriptors.size(); }
void createDefaultDataInputDescriptor();

bool readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed);
uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF);
FileAndFolder getFileFolder(header::DataHeader dh, int counter, int numTF);
arrow::dataset::FileSource getFileFolder(header::DataHeader dh, int counter, int numTF);
int getTimeFramesInFile(header::DataHeader dh, int counter);

uint64_t getTotalSizeCompressed();
Expand Down
40 changes: 34 additions & 6 deletions Framework/AnalysisSupport/src/Plugin.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,48 @@ std::vector<std::string> getListOfTables(std::unique_ptr<TFile>& f)
{
std::vector<std::string> r;
TList* keyList = f->GetListOfKeys();
// We should handle two cases, one where the list of tables in a TDirectory,
// the other one where the dataframe number is just a prefix
std::string first = "";

for (auto key : *keyList) {
if (!std::string_view(key->GetName()).starts_with("DF_")) {
if (!std::string_view(key->GetName()).starts_with("DF_") && !std::string_view(key->GetName()).starts_with("/DF_")) {
continue;
}
auto* d = (TDirectory*)f->Get(key->GetName());
TList* branchList = d->GetListOfKeys();
for (auto b : *branchList) {
r.emplace_back(b->GetName());
auto* d = (TDirectory*)f->GetObjectChecked(key->GetName(), TClass::GetClass("TDirectory"));
// Objects are in a folder, list it.
if (d) {
TList* branchList = d->GetListOfKeys();
for (auto b : *branchList) {
r.emplace_back(b->GetName());
}
break;
}

void* v = f->GetObjectChecked(key->GetName(), TClass::GetClass("ROOT::Experimental::RNTuple"));
if (v) {
std::string s = key->GetName();
size_t pos = s.find('-');
// Check if '-' is found
// Skip metaData and parentFiles
if (pos == std::string::npos) {
continue;
}
std::string t = s.substr(pos + 1);
// If we find a duplicate table name, it means we are in the next DF and we can stop.
if (t == first) {
break;
}
if (first.empty()) {
first = t;
}
// Create a new string starting after the '-'
r.emplace_back(t);
}
break;
}
return r;
}

auto readMetadata(std::unique_ptr<TFile>& currentFile) -> std::vector<ConfigParamSpec>
{
// Get the metadata, if any
Expand Down
6 changes: 5 additions & 1 deletion Framework/AnalysisSupport/src/RNTuplePlugin.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "Framework/RuntimeError.h"
#include "Framework/RootArrowFilesystem.h"
#include "Framework/Plugins.h"
#include "Framework/FairMQResizableBuffer.h"
#include <ROOT/RNTupleModel.hxx>
#include <ROOT/RNTupleWriteOptions.hxx>
#include <ROOT/RNTupleWriter.hxx>
Expand Down Expand Up @@ -852,7 +853,10 @@ struct RNTupleObjectReadingImplementation : public RootArrowFactoryPlugin {
return new RootArrowFactory{
.options = [context]() { return context->format->DefaultWriteOptions(); },
.format = [context]() { return context->format; },
};
.deferredOutputStreamer = [](std::shared_ptr<arrow::dataset::FileFragment> fragment, const std::shared_ptr<arrow::ResizableBuffer>& buffer) -> std::shared_ptr<arrow::io::OutputStream> {
auto treeFragment = std::dynamic_pointer_cast<RNTupleFileFragment>(fragment);
return std::make_shared<FairMQOutputStream>(buffer);
}};
}
};

Expand Down
Loading

0 comments on commit 95f780c

Please sign in to comment.