diff --git a/Framework/Core/include/Framework/Plugins.h b/Framework/Core/include/Framework/Plugins.h index 925943c6bffc3..3d320c6f2abb5 100644 --- a/Framework/Core/include/Framework/Plugins.h +++ b/Framework/Core/include/Framework/Plugins.h @@ -44,6 +44,9 @@ enum struct DplPluginKind : int { // using the arrow dataset API RootObjectReadingImplementation, + // A plugin which defines a whole workflow. This will be used to separate + // workflows in shared libraries and run them via a separate loader. + Workflow, // A plugin which was not initialised properly. Unknown }; diff --git a/Framework/Core/include/Framework/runDataProcessing.h b/Framework/Core/include/Framework/runDataProcessing.h index 07083314af12e..aa613d48fb86f 100644 --- a/Framework/Core/include/Framework/runDataProcessing.h +++ b/Framework/Core/include/Framework/runDataProcessing.h @@ -26,7 +26,9 @@ #include "Framework/CustomWorkflowTerminationHook.h" #include "Framework/CommonServices.h" #include "Framework/WorkflowCustomizationHelpers.h" +#include "Framework/WorkflowDefinitionContext.h" #include "Framework/Logger.h" +#include "Framework/Plugins.h" #include "Framework/CheckTypes.h" #include "Framework/StructToTuple.h" #include "ResourcePolicy.h" @@ -125,16 +127,7 @@ void overrideCloning(o2::framework::ConfigContext& ctx, std::vector& workflow); // This comes from the framework itself. This way we avoid code duplication. -int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& specs, - std::vector const& channelPolicies, - std::vector const& completionPolicies, - std::vector const& dispatchPolicies, - std::vector const& resourcePolicies, - std::vector const& callbacksPolicies, - std::vector const& sendingPolicies, - std::vector const& workflowOptions, - std::vector const& detectedOptions, - o2::framework::ConfigContext& configContext); +int doMain(int argc, char** argv, o2::framework::WorkflowDefinitionContext& context, o2::framework::ConfigContext& configContext); void doDefaultWorkflowTerminationHook(); @@ -167,55 +160,44 @@ void callWorkflowTermination(T&, char const* idstring) void overrideAll(o2::framework::ConfigContext& ctx, std::vector& workflow); -o2::framework::ConfigContext createConfigContext(std::unique_ptr& workflowOptionsRegistry, - o2::framework::ServiceRegistry& configRegistry, - std::vector& workflowOptions, - std::vector& extraOptions, int argc, char** argv); +std::unique_ptr createConfigContext(std::unique_ptr& workflowOptionsRegistry, + o2::framework::ServiceRegistry& configRegistry, + std::vector& workflowOptions, + std::vector& extraOptions, int argc, char** argv); std::unique_ptr createRegistry(); -int mainNoCatch(int argc, char** argv) -{ - using namespace o2::framework; +char* getIdString(int argc, char** argv); - std::vector workflowOptions; - UserCustomizationsHelper::userDefinedCustomization(workflowOptions); - auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions(); - workflowOptions.insert(std::end(workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions)); - - std::vector completionPolicies = injectCustomizations(); - std::vector dispatchPolicies = injectCustomizations(); - std::vector resourcePolicies = injectCustomizations(); - std::vector callbacksPolicies = injectCustomizations(); - std::vector sendingPolicies = injectCustomizations(); - - std::unique_ptr configRegistry = createRegistry(); - std::vector extraOptions; - std::unique_ptr workflowOptionsRegistry{nullptr}; - auto configContext = createConfigContext(workflowOptionsRegistry, *configRegistry, workflowOptions, extraOptions, argc, argv); - - o2::framework::WorkflowSpec specs = defineDataProcessing(configContext); - overrideAll(configContext, specs); - for (auto& spec : specs) { - UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices); - } - std::vector channelPolicies; - UserCustomizationsHelper::userDefinedCustomization(channelPolicies); - auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(configContext); - channelPolicies.insert(std::end(channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies)); - return doMain(argc, argv, specs, - channelPolicies, completionPolicies, dispatchPolicies, - resourcePolicies, callbacksPolicies, sendingPolicies, workflowOptions, extraOptions, configContext); +#define STRINGIZE_NX(A) #A +#define STRINGIZE(A) STRINGIZE_NX(A) + +// This is to allow the old "executable" based behavior +// Each executable will contain a plugin called InternalWorkflow +// In case one wants to use the new DSO based approach, the +// name of the plugin an the library name where it is located +// will have to be specified at build time. +#ifndef DPL_WORKFLOW_PLUGIN_NAME +#define DPL_WORKFLOW_PLUGIN_NAME InternalCustomWorkflow +#ifdef DPL_WORKFLOW_PLUGIN_LIBRARY +#error Missing DPL_WORKFLOW_PLUGIN_NAME +#endif +#define DPL_WORKFLOW_PLUGIN_LIBRARY "" +#endif + +consteval char const* pluginName() +{ + return DPL_WORKFLOW_PLUGIN_LIBRARY ":" STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME); } -int callMain(int argc, char** argv, int (*)(int, char**)); -char* getIdString(int argc, char** argv); +// Executables behave this way +int callMain(int argc, char** argv, char const* pluginName); int main(int argc, char** argv) { using namespace o2::framework; - int result = callMain(argc, argv, mainNoCatch); + int result = callMain(argc, argv, pluginName()); char* idstring = getIdString(argc, argv); o2::framework::OnWorkflowTerminationHook onWorkflowTerminationHook; @@ -223,4 +205,52 @@ int main(int argc, char** argv) return result; } + +struct WorkflowDefinition { + std::function defineWorkflow; +}; + +struct DPL_WORKFLOW_PLUGIN_NAME : o2::framework::WorkflowPlugin { + o2::framework::WorkflowDefinition* create() override + { + return new o2::framework::WorkflowDefinition{ + .defineWorkflow = [](int argc, char** argv) -> o2::framework::WorkflowDefinitionContext { + using namespace o2::framework; + WorkflowDefinitionContext workflowContext; + + UserCustomizationsHelper::userDefinedCustomization(workflowContext.workflowOptions); + auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions(); + workflowContext.workflowOptions.insert(std::end(workflowContext.workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions)); + + workflowContext.completionPolicies = injectCustomizations(); + workflowContext.dispatchPolicies = injectCustomizations(); + workflowContext.resourcePolicies = injectCustomizations(); + workflowContext.callbacksPolicies = injectCustomizations(); + workflowContext.sendingPolicies = injectCustomizations(); + + workflowContext.configRegistry = createRegistry(); + workflowContext.configContext = createConfigContext(workflowContext.workflowOptionsRegistry, *workflowContext.configRegistry, workflowContext.workflowOptions, workflowContext.extraOptions, argc, argv); + + workflowContext.specs = defineDataProcessing(*workflowContext.configContext); + overrideAll(*workflowContext.configContext, workflowContext.specs); + for (auto& spec : workflowContext.specs) { + UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices); + } + UserCustomizationsHelper::userDefinedCustomization(workflowContext.channelPolicies); + auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*workflowContext.configContext); + workflowContext.channelPolicies.insert(std::end(workflowContext.channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies)); + return workflowContext; + }}; + } +}; + +// This is like the plugin macros, we simply do it explicitly to avoid macro inside macro expansion +extern "C" { +DPLPluginHandle* dpl_plugin_callback(DPLPluginHandle* previous) +{ + previous = new DPLPluginHandle{new DPL_WORKFLOW_PLUGIN_NAME{}, strdup(STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME)), o2::framework::DplPluginKind::Workflow, previous}; + return previous; +} +} + #endif diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 66fc2c7b2c3df..660f8c27d5e4e 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -13,9 +13,7 @@ #include #include "Framework/BoostOptionsRetriever.h" #include "Framework/BacktraceHelpers.h" -#include "Framework/CallbacksPolicy.h" #include "Framework/ChannelConfigurationPolicy.h" -#include "Framework/ChannelMatching.h" #include "Framework/ConfigParamsHelper.h" #include "Framework/ConfigParamSpec.h" #include "Framework/ConfigContext.h" @@ -38,7 +36,6 @@ #include "Framework/ServiceRegistryHelpers.h" #include "Framework/DevicesManager.h" #include "Framework/DebugGUI.h" -#include "Framework/LocalRootFileService.h" #include "Framework/LogParsingHelpers.h" #include "Framework/Logger.h" #include "Framework/ParallelContext.h" @@ -62,6 +59,7 @@ #include "Framework/DeviceContext.h" #include "Framework/ServiceMetricsInfo.h" #include "Framework/DataTakingContext.h" +#include "Framework/WorkflowDefinitionContext.h" #include "Framework/CommonServices.h" #include "Framework/DefaultsHelpers.h" #include "ProcessingPoliciesHelpers.h" @@ -192,13 +190,28 @@ char* getIdString(int argc, char** argv) return nullptr; } -int callMain(int argc, char** argv, int (*mainNoCatch)(int, char**)) +int doMain(int argc, char** argv, o2::framework::WorkflowDefinitionContext& workflowContext); + +int callMain(int argc, char** argv, char const* pluginSpec) { + std::vector plugins; + auto morePlugins = PluginManager::parsePluginSpecString(pluginSpec); + for (auto& extra : morePlugins) { + plugins.push_back(extra); + } + // Only one for now + assert(plugins.size() == 1); + + std::vector availableWorkflows; + PluginManager::loadFromPlugin(plugins, availableWorkflows); + + assert(availableWorkflows.size() == 1); static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0"); int result = 1; + o2::framework::WorkflowDefinitionContext workflowContext = availableWorkflows.back().defineWorkflow(argc, argv); if (noCatch) { try { - result = mainNoCatch(argc, argv); + result = doMain(argc, argv, workflowContext); } catch (o2::framework::RuntimeErrorRef& ref) { doDPLException(ref, argv[0]); throw; @@ -209,7 +222,7 @@ int callMain(int argc, char** argv, int (*mainNoCatch)(int, char**)) // SFINAE expression above fit better the version which invokes user code over // the default one. // The default policy is a catch all pub/sub setup to be consistent with the past. - result = mainNoCatch(argc, argv); + result = doMain(argc, argv, workflowContext); } catch (boost::exception& e) { doBoostException(e, argv[0]); throw; @@ -2806,10 +2819,10 @@ void overrideAll(o2::framework::ConfigContext& ctx, std::vector& workflowOptionsRegistry, - o2::framework::ServiceRegistry& configRegistry, - std::vector& workflowOptions, - std::vector& extraOptions, int argc, char** argv) +std::unique_ptr createConfigContext(std::unique_ptr& workflowOptionsRegistry, + o2::framework::ServiceRegistry& configRegistry, + std::vector& workflowOptions, + std::vector& extraOptions, int argc, char** argv) { std::vector> retrievers; std::unique_ptr retriever{new o2::framework::BoostOptionsRetriever(true, argc, argv)}; @@ -2823,7 +2836,7 @@ o2::framework::ConfigContext createConfigContext(std::unique_ptr(*workflowOptionsRegistry, o2::framework::ServiceRegistryRef{configRegistry}, argc, argv); } std::unique_ptr createRegistry() @@ -2840,16 +2853,7 @@ std::unique_ptr createRegistry() // killing them all on ctrl-c). // - Child, pick the data-processor ID and start a O2DataProcessorDevice for // each DataProcessorSpec -int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, - std::vector const& channelPolicies, - std::vector const& completionPolicies, - std::vector const& dispatchPolicies, - std::vector const& resourcePolicies, - std::vector const& callbacksPolicies, - std::vector const& sendingPolicies, - std::vector const& currentWorkflowOptions, - std::vector const& detectedParams, - o2::framework::ConfigContext& configContext) +int doMain(int argc, char** argv, o2::framework::WorkflowDefinitionContext& workflowContext) { // Peek very early in the driver options and look for // signposts, so the we can enable it without going through the whole dance @@ -2868,7 +2872,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, WorkflowInfo currentWorkflow{ argv[0], currentArgs, - currentWorkflowOptions}; + workflowContext.workflowOptions}; ProcessingPolicies processingPolicies; enum LogParsingHelpers::LogLevel minFailureLevel; @@ -2918,7 +2922,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, bpo::options_description visibleOptions; visibleOptions.add(executorOptions); - auto physicalWorkflow = workflow; + auto physicalWorkflow = workflowContext.specs; std::map rankIndex; // We remove the duplicates because for the moment child get themself twice: // once from the actual definition in the child, a second time from the @@ -2929,11 +2933,11 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, size_t workflowHashA = 0; std::hash hash_fn; - for (auto& dp : workflow) { + for (auto& dp : workflowContext.specs) { workflowHashA += hash_fn(dp.name); } - for (auto& dp : workflow) { + for (auto& dp : workflowContext.specs) { rankIndex.insert(std::make_pair(dp.name, workflowHashA)); } @@ -2985,7 +2989,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, OverrideServiceSpecs driverServicesOverride = ServiceSpecHelpers::parseOverrides(getenv("DPL_DRIVER_OVERRIDE_SERVICES")); ServiceSpecs driverServices = ServiceSpecHelpers::filterDisabled(CommonDriverServices::defaultServices(), driverServicesOverride); // We insert the hash for the internal devices. - WorkflowHelpers::injectServiceDevices(physicalWorkflow, configContext); + WorkflowHelpers::injectServiceDevices(physicalWorkflow, *workflowContext.configContext); auto reader = std::find_if(physicalWorkflow.begin(), physicalWorkflow.end(), [](DataProcessorSpec& spec) { return spec.name == "internal-dpl-aod-reader"; }); if (reader != physicalWorkflow.end()) { driverServices.push_back(ArrowSupport::arrowBackendSpec()); @@ -2995,7 +2999,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, continue; } WorkflowSpecNode node{physicalWorkflow}; - service.injectTopology(node, configContext); + service.injectTopology(node, *workflowContext.configContext); } for (auto& dp : physicalWorkflow) { if (dp.name.rfind("internal-", 0) == 0) { @@ -3101,7 +3105,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, // Use the hidden options as veto, all config specs matching a definition // in the hidden options are skipped in order to avoid duplicate definitions // in the main parser. Note: all config specs are forwarded to devices - visibleOptions.add(ConfigParamsHelper::prepareOptionDescriptions(physicalWorkflow, currentWorkflowOptions, gHiddenDeviceOptions)); + visibleOptions.add(ConfigParamsHelper::prepareOptionDescriptions(physicalWorkflow, workflowContext.workflowOptions, gHiddenDeviceOptions)); bpo::options_description od; od.add(visibleOptions); @@ -3137,7 +3141,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, conflicting_options(varmap, "no-batch", "batch"); if (varmap.count("help")) { - printHelp(varmap, executorOptions, physicalWorkflow, currentWorkflowOptions); + printHelp(varmap, executorOptions, physicalWorkflow, workflowContext.workflowOptions); exit(0); } /// Set the fair::Logger severity to the one specified in the command line @@ -3185,16 +3189,16 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, return true; }; DriverInfo driverInfo{ - .sendingPolicies = sendingPolicies, + .sendingPolicies = workflowContext.sendingPolicies, .forwardingPolicies = forwardingPolicies, - .callbacksPolicies = callbacksPolicies}; + .callbacksPolicies = workflowContext.callbacksPolicies}; driverInfo.states.reserve(10); driverInfo.sigintRequested = false; driverInfo.sigchldRequested = false; - driverInfo.channelPolicies = channelPolicies; - driverInfo.completionPolicies = completionPolicies; - driverInfo.dispatchPolicies = dispatchPolicies; - driverInfo.resourcePolicies = resourcePolicies; + driverInfo.channelPolicies = workflowContext.channelPolicies; + driverInfo.completionPolicies = workflowContext.completionPolicies; + driverInfo.dispatchPolicies = workflowContext.dispatchPolicies; + driverInfo.resourcePolicies = workflowContext.resourcePolicies; driverInfo.argc = argc; driverInfo.argv = argv; driverInfo.noSHMCleanup = varmap["no-cleanup"].as(); @@ -3226,7 +3230,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, // FIXME: should use the whole dataProcessorInfos, actually... driverInfo.processorInfo = dataProcessorInfos; - driverInfo.configContext = &configContext; + driverInfo.configContext = workflowContext.configContext.get(); DriverControl driverControl; initialiseDriverControl(varmap, driverInfo, driverControl); @@ -3255,7 +3259,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, driverInfo, driverConfig, gDeviceMetricsInfos, - detectedParams, + workflowContext.extraOptions, varmap, driverServices, frameworkId);