From 4d5d8b9f5895514b621f7fb4495aa90a1a5b41ff Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Wed, 17 Jan 2024 14:58:33 +0100 Subject: [PATCH] DPL: account for the case in which outputs are not user created --- .../Core/include/Framework/StreamContext.h | 2 +- Framework/Core/src/CommonServices.cxx | 17 +++++++++++------ Framework/Core/src/DataAllocator.cxx | 2 +- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/Framework/Core/include/Framework/StreamContext.h b/Framework/Core/include/Framework/StreamContext.h index 198802179a419..8300ce4ce7ac8 100644 --- a/Framework/Core/include/Framework/StreamContext.h +++ b/Framework/Core/include/Framework/StreamContext.h @@ -69,7 +69,7 @@ struct StreamContext { // for a given iteration. // This is in the stream context to allow tracking data creation on a per thread // basis. - std::vector routeCreated; + std::vector routeUserCreated; }; } // namespace o2::framework diff --git a/Framework/Core/src/CommonServices.cxx b/Framework/Core/src/CommonServices.cxx index 4656ddeb12e63..fd02a4c9ccb88 100644 --- a/Framework/Core/src/CommonServices.cxx +++ b/Framework/Core/src/CommonServices.cxx @@ -169,9 +169,9 @@ o2::framework::ServiceSpec CommonServices::streamContextSpec() // the same stream might be referring to different data processors. // We should probably have a context which is per stream of a specific // data processor. - stream->routeCreated.resize(routes.size()); - // Reset the routeCreated at every processing step - std::fill(stream->routeCreated.begin(), stream->routeCreated.end(), false); }, + stream->routeUserCreated.resize(routes.size()); + // Reset the routeUserCreated at every processing step + std::fill(stream->routeUserCreated.begin(), stream->routeUserCreated.end(), false); }, .postProcessing = [](ProcessingContext& processingContext, void* service) { auto* stream = (StreamContext*)service; auto& routes = processingContext.services().get().outputs; @@ -182,7 +182,7 @@ o2::framework::ServiceSpec CommonServices::streamContextSpec() // it means it was created out of band. bool didCreate = false; for (size_t ri = 0; ri < routes.size(); ++ri) { - if (stream->routeCreated[ri] == true) { + if (stream->routeUserCreated[ri] == true) { didCreate = true; break; } @@ -192,7 +192,7 @@ o2::framework::ServiceSpec CommonServices::streamContextSpec() return; } for (size_t ri = 0; ri < routes.size(); ++ri) { - if (stream->routeCreated[ri] == true) { + if (stream->routeUserCreated[ri] == true) { continue; } auto &route = routes[ri]; @@ -458,7 +458,9 @@ o2::framework::ServiceSpec CommonServices::ccdbSupportSpec() // For any output that is a FLP/DISTSUBTIMEFRAME with subspec != 0, // we create a new message. InputSpec matcher{"matcher", ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"}}; - for (auto& output : pc.services().get().outputs) { + auto& streamContext = pc.services().get(); + for (size_t oi = 0; oi < pc.services().get().outputs.size(); ++oi) { + OutputRoute const& output = pc.services().get().outputs[oi]; if ((output.timeslice % output.maxTimeslices) != 0) { continue; } @@ -471,6 +473,9 @@ o2::framework::ServiceSpec CommonServices::ccdbSupportSpec() stfDist.id = timingInfo.timeslice; stfDist.firstOrbit = timingInfo.firstTForbit; stfDist.runNumber = timingInfo.runNumber; + // We mark it as not created, because we do should not account for it when + // checking if we created all the data for a timeslice. + streamContext.routeUserCreated[oi] = false; } } }, .kind = ServiceKind::Global}; diff --git a/Framework/Core/src/DataAllocator.cxx b/Framework/Core/src/DataAllocator.cxx index 9ce72aed8c1cf..a6b13ceeda101 100644 --- a/Framework/Core/src/DataAllocator.cxx +++ b/Framework/Core/src/DataAllocator.cxx @@ -53,7 +53,7 @@ RouteIndex DataAllocator::matchDataHeader(const Output& spec, size_t timeslice) for (auto ri = 0; ri < allowedOutputRoutes.size(); ++ri) { auto& route = allowedOutputRoutes[ri]; if (DataSpecUtils::match(route.matcher, spec.origin, spec.description, spec.subSpec) && ((timeslice % route.maxTimeslices) == route.timeslice)) { - stream.routeCreated[ri] = true; + stream.routeUserCreated[ri] = true; return RouteIndex{ri}; } }