Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly limit the collections that are read #290

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions k4FWCore/components/IOSvc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "podio/Frame.h"
#include "podio/FrameCategories.h"
#include "podio/Reader.h"
#include "podio/podioVersion.h"

#include "k4FWCore/FunctionalUtils.h"
#include "k4FWCore/KeepDropSwitch.h"
Expand Down Expand Up @@ -114,7 +115,13 @@ std::tuple<std::vector<podio::CollectionBase*>, std::vector<std::string>, podio:
{
std::lock_guard<std::mutex> lock(m_changeBufferLock);
if (m_nextEntry < m_entries) {
frame = podio::Frame(m_reader->readEvent(m_nextEntry));
debug() << "Reading event " << m_nextEntry << endmsg;
#if PODIO_BUILD_VERSION <= PODIO_VERSION(1, 2, 0)
frame = m_reader->readEvent(m_nextEntry);
#else
debug() << "Reading collections " << m_collectionNames.value() << endmsg;
frame = m_reader->readEvent(m_nextEntry, m_collectionNames);
#endif
} else {
return std::make_tuple(std::vector<podio::CollectionBase*>(), std::vector<std::string>(), std::move(frame));
}
Expand Down Expand Up @@ -187,10 +194,6 @@ void IOSvc::handle(const Incident& incident) {
}
}

void IOSvc::setReadingCollectionNames(const std::vector<std::string>& names) { m_collectionNames = names; }

void IOSvc::setReadingFileNames(const std::vector<std::string>& names) { m_readingFileNames = names; }

bool IOSvc::checkIfWriteCollection(const std::string& collName) { return m_switch.isOn(collName); }

DECLARE_COMPONENT(IOSvc)
3 changes: 0 additions & 3 deletions k4FWCore/components/IOSvc.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ class IOSvc : public extends<Service, IIOSvc, IIncidentListener> {
return std::make_shared<std::vector<std::string>>(m_collectionNames);
}

void setReadingCollectionNames(const std::vector<std::string>& names);
void setReadingFileNames(const std::vector<std::string>& names);

protected:
Gaudi::Property<std::vector<std::string>> m_collectionNames{
this, "CollectionNames", {}, "List of collections to read"};
Expand Down
3 changes: 3 additions & 0 deletions k4FWCore/components/PodioInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ StatusCode PodioInput::initialize() {
m_collectionNames.clear();
}

debug() << "Setting collections to read to: " << m_collectionNames.value() << endmsg;
m_podioDataSvc->setCollsToRead(m_collectionNames);

return StatusCode::SUCCESS;
}

Expand Down
16 changes: 9 additions & 7 deletions k4FWCore/components/Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ class CollectionPusher : public Gaudi::Functional::details::BaseClass_t<Gaudi::F

template <typename T>
using OutputHandle_t = Gaudi::Functional::details::OutputHandle_t<Traits_, std::remove_pointer_t<T>>;
std::vector<OutputHandle_t<Out>> m_outputs;
std::vector<OutputHandle_t<Out>> m_outputs;

protected:
Gaudi::Property<std::vector<std::string>> m_inputCollections{
this, "InputCollections", {"First collection"}, "List of input collections"};
// Gaudi::Property<std::string> m_input{this, "Input", "Event", "Input file"};
Expand Down Expand Up @@ -106,12 +108,12 @@ class Reader final : public CollectionPusher {
// Gaudi doesn't run the destructor of the Services so we have to
// manually ask for the reader to be deleted so it will call finish()
// See https://gitlab.cern.ch/gaudi/Gaudi/-/issues/169
~Reader() override { iosvc->deleteReader(); }
~Reader() override { m_iosvc->deleteReader(); }

ServiceHandle<IIOSvc> iosvc{this, "IOSvc", "IOSvc"};
ServiceHandle<IIOSvc> m_iosvc{this, "IOSvc", "IOSvc"};

StatusCode initialize() override {
if (!iosvc.isValid()) {
if (!m_iosvc.isValid()) {
error() << "Unable to locate IIOSvc interface" << endmsg;
return StatusCode::FAILURE;
}
Expand All @@ -120,8 +122,8 @@ class Reader final : public CollectionPusher {
}

StatusCode finalize() override {
if (iosvc) {
iosvc->deleteReader();
if (m_iosvc) {
m_iosvc->deleteReader();
}
return StatusCode::SUCCESS;
}
Expand All @@ -130,7 +132,7 @@ class Reader final : public CollectionPusher {
// By convention the Frame is pushed to the store
// so that it's deleted at the right time
std::tuple<std::vector<podio::CollectionBase*>, std::vector<std::string>> operator()() const override {
auto val = iosvc->next();
auto val = m_iosvc->next();

auto eds = eventSvc().as<IDataProviderSvc>();
auto frame = std::move(std::get<podio::Frame>(val));
Expand Down
11 changes: 9 additions & 2 deletions k4FWCore/components/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ class Writer final : public Gaudi::Functional::Consumer<void(const EventContext&
// info() << "Leaf " << pReg->name() << " has no object" << endmsg;
// continue;
// }
m_availableCollections.insert(pReg->name().substr(1, pReg->name().size() - 1));
auto collName = pReg->name().substr(1, pReg->name().size() - 1);
debug() << "Adding " << collName << " to the list of available collections" << endmsg;
m_availableCollections.insert(std::move(collName));
}
}

Expand Down Expand Up @@ -206,9 +208,12 @@ class Writer final : public Gaudi::Functional::Consumer<void(const EventContext&
// and cache them
getOutputCollections();
for (const auto& coll : m_availableCollections) {
if (iosvc->checkIfWriteCollection(coll)) {
const auto doWrite = iosvc->checkIfWriteCollection(coll);
debug() << "Checking if " << coll << " should be written: " << (doWrite ? "yes" : "no") << endmsg;
if (doWrite) {
m_collectionsToSave.push_back(coll);
if (std::find(frameCollections.begin(), frameCollections.end(), coll) == frameCollections.end()) {
debug() << coll << " has to be added to the Frame" << endmsg;
m_collectionsToAdd.push_back(coll);
}
}
Expand All @@ -220,6 +225,7 @@ class Writer final : public Gaudi::Functional::Consumer<void(const EventContext&
// deleted by the store (and later deleted by the Frame, triggering a double
// delete)
for (const auto& coll : frameCollections) {
debug() << "Taking ownership of collection " << coll << " from the IOSvc as it belongs to a Frame" << endmsg;
DataObject* storeCollection;
if (m_dataSvc->retrieveObject("/Event/" + coll, storeCollection).isFailure()) {
error() << "Failed to retrieve collection " << coll << endmsg;
Expand All @@ -240,6 +246,7 @@ class Writer final : public Gaudi::Functional::Consumer<void(const EventContext&

std::vector<std::string_view> collectionsToRemove;
for (const auto& coll : m_collectionsToAdd) {
debug() << "Adding collection " << coll << " to the IOSvc Frame" << endmsg;
DataObject* storeCollection;
if (m_dataSvc->retrieveObject("/Event/" + coll, storeCollection).isFailure()) {
error() << "Failed to retrieve collection " << coll << endmsg;
Expand Down
4 changes: 4 additions & 0 deletions k4FWCore/include/k4FWCore/PodioDataSvc.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ class PodioDataSvc : public DataSvc {
/// TODO: Make this private again after conversions have been properly solved
podio::Frame& getMetaDataFrame() { return m_metadataframe; }

void setCollsToRead(const std::vector<std::string>& collsToRead) { m_collsToRead = collsToRead; }

private:
/// PODIO reader for ROOT files
podio::ROOTReader m_reader;
Expand All @@ -109,6 +111,8 @@ class PodioDataSvc : public DataSvc {

// Registry of data wrappers; needed for memory management
std::vector<DataWrapperBase*> m_podio_datawrappers;
/// The names of the collections to read (set externally)
std::vector<std::string> m_collsToRead{};

protected:
/// ROOT file name the input is read from. Set by option filename
Expand Down
19 changes: 19 additions & 0 deletions k4FWCore/src/PodioDataSvc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "k4FWCore/DataWrapper.h"

#include "podio/CollectionBase.h"
#include "podio/podioVersion.h"

/// Service initialisation
StatusCode PodioDataSvc::initialize() {
Expand Down Expand Up @@ -103,7 +104,16 @@ StatusCode PodioDataSvc::clearStore() {
StatusCode PodioDataSvc::i_setRoot(std::string root_path, IOpaqueAddress* pRootAddr) {
// create a new frame
if (m_reading_from_file) {
debug() << "Reading event " << m_eventNum + m_1stEvtEntry << ", using collections: " << m_collsToRead << endmsg;
#if PODIO_BUILD_VERSION <= PODIO_VERSION(1, 2, 0)
if (!m_collsToRead.empty()) {
warning() << "Trying to limit collections that are read, but podio does only support this with version > 1.2"
<< endmsg;
}
m_eventframe = podio::Frame(m_reader.readEntry("events", m_eventNum + m_1stEvtEntry));
#else
m_eventframe = podio::Frame(m_reader.readEntry("events", m_eventNum + m_1stEvtEntry, m_collsToRead));
#endif
} else {
m_eventframe = podio::Frame();
}
Expand All @@ -113,7 +123,16 @@ StatusCode PodioDataSvc::i_setRoot(std::string root_path, IOpaqueAddress* pRootA
StatusCode PodioDataSvc::i_setRoot(std::string root_path, DataObject* pRootObj) {
// create a new frame
if (m_reading_from_file) {
debug() << "Reading event " << m_eventNum + m_1stEvtEntry << ", using collections: " << m_collsToRead << endmsg;
#if PODIO_BUILD_VERSION <= PODIO_VERSION(1, 2, 0)
if (!m_collsToRead.empty()) {
warning() << "Trying to limit collections that are read, but podio does only support this with version > 1.2"
<< endmsg;
}
m_eventframe = podio::Frame(m_reader.readEntry("events", m_eventNum + m_1stEvtEntry));
#else
m_eventframe = podio::Frame(m_reader.readEntry("events", m_eventNum + m_1stEvtEntry, m_collsToRead));
#endif
} else {
m_eventframe = podio::Frame();
}
Expand Down
94 changes: 68 additions & 26 deletions python/k4FWCore/ApplicationMgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import logging

from Configurables import ApplicationMgr as AppMgr
from Configurables import Reader, Writer, IOSvc, Gaudi__Sequencer, EventLoopMgr

logger = logging.getLogger()


class ApplicationMgr:
"""ApplicationMgr is a class that wraps the Gaudi ApplicationMgr class to
Expand All @@ -37,6 +40,67 @@ class ApplicationMgr:
def __init__(self, **kwargs):
self._mgr = AppMgr(**kwargs)

def _setup_reader(self, reader, iosvc_props):
"""Setup the reader consistently such that it has sane defaults

In some cases we have to peek into the files to obtain some information
to set sane default values if they are not set by the user. We need to
obtain
- The number of events in case EvtMax is set to -1
- The collection names in case the CollectionNames are not provided

Knowing the number of events is necessary to avoid errors when running
multithreaded since if we have, for example, 10 events and we are
running 9 at the same time, then (possibly) the first 9 complete and 9
more are scheduled, out of which only one will be finished without
errors. If we know the number of events in advance then we can just
schedule those.

We need the collection names to read to feed them to the Reader. Either
we take the user provided ones or we get them from the first event in
the file we peek into.
"""
# First we determine whether we need to peek at all
inp = None
if iosvc_props["input"][0]:
inp = "input"
elif iosvc_props["Input"][0]:
inp = "Input"

if not inp:
# We have got nothing to do here, since there is no input
return

collections = iosvc_props["CollectionNames"][0] or None
n_events = self._mgr.EvtMax

# Check if we can get by without peeking into the file
if collections:
logger.info(f"Setting the reader to read the collections: {collections}")
reader.InputCollections = collections
if n_events != -1:
# We know everything we need
logger.info(f"Initializing reader to read {n_events} events")
return

# We need to peek into the file because we lack information.
# Import here to avoid always importing ROOT which is slow
from podio.root_io import Reader as PodioReader

podio_reader = PodioReader(iosvc_props[inp][0])
if n_events == -1:
self._mgr.EvtMax = len(podio_reader.get("events"))
if not collections:
try:
frame = podio_reader.get("events")[0]
logger.debug("Using the first frame to determine collections to read")
collections = list(frame.getAvailableCollections())
except IndexError:
print("Warning, the events category wasn't found in the input file")
raise
logger.info(f"Passing {collections} as collections to read to the Reader")
reader.InputCollections = collections

def fix_properties(self):
# If there isn't an EventLoopMgr then it's the default
# This will suppress two warnings about not using external input
Expand Down Expand Up @@ -75,34 +139,12 @@ def fix_properties(self):
or (props["Output"][0] and props["Output"][0] != "<no value>")
):
writer = Writer("k4FWCore__Writer")

# Let's tell the Reader one of the input files so it can
# know which collections it's going to read
if reader is not None:
# Open the files and get the number of events. This is necessary to
# avoid errors when running multithreaded since if we have, for
# example, 10 events and we are running 9 at the same time, then
# (possibly) the first 9 complete and 9 more are scheduled, out of
# which only one will be finished without errors. If we know the
# number of events in advance then we can just schedule those.
inp = None
if props["input"][0]:
inp = "input"
elif props["Input"][0]:
inp = "Input"
if inp:
# Import here to avoid always importing ROOT which is slow
from podio.root_io import Reader as PodioReader

podio_reader = PodioReader(props[inp][0])
if self._mgr.EvtMax == -1:
self._mgr.EvtMax = podio_reader._reader.getEntries("events")
try:
frame = podio_reader.get("events")[0]
except IndexError:
print("Warning, the events category wasn't found in the input file")
raise
collections = list(frame.getAvailableCollections())
reader.InputCollections = collections
self._setup_reader(reader, props)

self._mgr.TopAlg = ([reader] if add_reader else []) + self._mgr.TopAlg
# Assume the writer is at the end
# Algorithms are wrapped with Sequential=False so that they can run in parallel
Expand Down
4 changes: 4 additions & 0 deletions test/k4FWCoreTest/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ add_test_with_env(CheckExampleEventData_unbounded options/checkExampleEventData.
add_test_with_env(ReadExampleEventData options/readExampleEventData.py)
set_property(TEST ReadExampleEventData APPEND PROPERTY FIXTURES_REQUIRED ExampleEventDataFile)
add_test_with_env(ReadExampleDataFromNthEvent options/readExampleDataFromNthEvent.py PROPERTIES FIXTURES_REQUIRED ExampleEventDataFile)
add_test_with_env(ReadLimitedInputsk4DataSvc options/readLimitedSetOfCollectionsk4DataSvc.py ADD_TO_CHECK_FILES PROPERTIES FIXTURES_REQUIRED ExampleEventDataFile)
add_test_with_env(ReadLimitedInputsAllEventsk4DataSvc options/readLimitedSetOfCollectionsk4DataSvc.py -n -1 --PodioOutput.filename "output_k4test_exampledata_limited_allevents.root" ADD_TO_CHECK_FILES PROPERTIES FIXTURES_REQUIRED ExampleEventDataFile)

add_test_with_env(AlgorithmWithTFile options/TestAlgorithmWithTFile.py)
set_property(TEST AlgorithmWithTFile PROPERTY WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR})
Expand Down Expand Up @@ -194,6 +196,8 @@ add_test_with_env(FunctionalReadNthEvent options/ExampleFunctionalReadNthEvent.p
add_test_with_env(FunctionalProducerRNTuple options/ExampleFunctionalProducerRNTuple.py ADD_TO_CHECK_FILES)
add_test_with_env(FunctionalTTreeToRNTuple options/ExampleFunctionalTTreeToRNTuple.py PROPERTIES FIXTURES_REQUIRED ProducerFile ADD_TO_CHECK_FILES)
add_test_with_env(GaudiFunctional options/ExampleGaudiFunctional.py PROPERTIES FIXTURES_REQUIRED ProducerFile ADD_TO_CHECK_FILES)
add_test_with_env(ReadLimitedInputsIOSvc options/ExampleIOSvcLimitInputCollections.py PROPERTIES FIXTURES_REQUIRED ExampleEventDataFile ADD_TO_CHECK_FILES)
add_test_with_env(ReadLimitedInputsAllEventsIOSvc options/ExampleIOSvcLimitInputCollections.py --IOSvc.Output "functional_limited_input_all_events.root" -n -1 PROPERTIES FIXTURES_REQUIRED ExampleEventData ADD_TO_CHECK_FILES)

add_test_with_env(ParticleIDMetadataFramework options/ExampleParticleIDMetadata.py)

Expand Down
39 changes: 39 additions & 0 deletions test/k4FWCoreTest/options/ExampleIOSvcLimitInputCollections.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#!/usr/bin/env python3
#
# Copyright (c) 2014-2024 Key4hep-Project.
#
# This file is part of Key4hep.
# See https://key4hep.github.io/key4hep-doc/ for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from Gaudi.Configuration import DEBUG
from Configurables import EventDataSvc
from k4FWCore import ApplicationMgr, IOSvc

svc = IOSvc("IOSvc")
svc.Input = "output_k4test_exampledata.root"
# This will limit the collections to be read to only these two collections and
# the output file contain them
svc.CollectionNames = ["MCParticles", "Links"]
svc.Output = "functional_limited_input.root"
svc.OutputLevel = DEBUG

mgr = ApplicationMgr(
TopAlg=[],
EvtSel="NONE",
EvtMax=2,
ExtSvc=[EventDataSvc("EventDataSvc")],
OutputLevel=DEBUG,
)
Loading
Loading