Skip to content

Commit

Permalink
DPL: account for the case in which outputs are not user created
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed Jan 17, 2024
1 parent b33072f commit 4d5d8b9
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 8 deletions.
2 changes: 1 addition & 1 deletion Framework/Core/include/Framework/StreamContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ struct StreamContext {
// for a given iteration.
// This is in the stream context to allow tracking data creation on a per thread
// basis.
std::vector<bool> routeCreated;
std::vector<bool> routeUserCreated;
};

} // namespace o2::framework
Expand Down
17 changes: 11 additions & 6 deletions Framework/Core/src/CommonServices.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@ o2::framework::ServiceSpec CommonServices::streamContextSpec()
// the same stream might be referring to different data processors.
// We should probably have a context which is per stream of a specific
// data processor.
stream->routeCreated.resize(routes.size());
// Reset the routeCreated at every processing step
std::fill(stream->routeCreated.begin(), stream->routeCreated.end(), false); },
stream->routeUserCreated.resize(routes.size());
// Reset the routeUserCreated at every processing step
std::fill(stream->routeUserCreated.begin(), stream->routeUserCreated.end(), false); },
.postProcessing = [](ProcessingContext& processingContext, void* service) {
auto* stream = (StreamContext*)service;
auto& routes = processingContext.services().get<DeviceSpec const>().outputs;
Expand All @@ -182,7 +182,7 @@ o2::framework::ServiceSpec CommonServices::streamContextSpec()
// it means it was created out of band.
bool didCreate = false;
for (size_t ri = 0; ri < routes.size(); ++ri) {
if (stream->routeCreated[ri] == true) {
if (stream->routeUserCreated[ri] == true) {
didCreate = true;
break;
}
Expand All @@ -192,7 +192,7 @@ o2::framework::ServiceSpec CommonServices::streamContextSpec()
return;
}
for (size_t ri = 0; ri < routes.size(); ++ri) {
if (stream->routeCreated[ri] == true) {
if (stream->routeUserCreated[ri] == true) {
continue;
}
auto &route = routes[ri];
Expand Down Expand Up @@ -458,7 +458,9 @@ o2::framework::ServiceSpec CommonServices::ccdbSupportSpec()
// For any output that is a FLP/DISTSUBTIMEFRAME with subspec != 0,
// we create a new message.
InputSpec matcher{"matcher", ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"}};
for (auto& output : pc.services().get<DeviceSpec const>().outputs) {
auto& streamContext = pc.services().get<StreamContext>();
for (size_t oi = 0; oi < pc.services().get<DeviceSpec const>().outputs.size(); ++oi) {
OutputRoute const& output = pc.services().get<DeviceSpec const>().outputs[oi];
if ((output.timeslice % output.maxTimeslices) != 0) {
continue;
}
Expand All @@ -471,6 +473,9 @@ o2::framework::ServiceSpec CommonServices::ccdbSupportSpec()
stfDist.id = timingInfo.timeslice;
stfDist.firstOrbit = timingInfo.firstTForbit;
stfDist.runNumber = timingInfo.runNumber;
// We mark it as not created, because we do should not account for it when
// checking if we created all the data for a timeslice.
streamContext.routeUserCreated[oi] = false;
}
} },
.kind = ServiceKind::Global};
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/src/DataAllocator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ RouteIndex DataAllocator::matchDataHeader(const Output& spec, size_t timeslice)
for (auto ri = 0; ri < allowedOutputRoutes.size(); ++ri) {
auto& route = allowedOutputRoutes[ri];
if (DataSpecUtils::match(route.matcher, spec.origin, spec.description, spec.subSpec) && ((timeslice % route.maxTimeslices) == route.timeslice)) {
stream.routeCreated[ri] = true;
stream.routeUserCreated[ri] = true;
return RouteIndex{ri};
}
}
Expand Down

0 comments on commit 4d5d8b9

Please sign in to comment.