Transformation System evolution
- Authors: L.Arrabito, F.Stagni, A.Tsaregorodtsev
The goal is to make the TS a completely data-driven system and a 'workflow engine'. To achieve this goal, we have identified 4 steps in order of priority:
- Implement MetaFilters
- Support for chained Transformations
- Use MQ (Message Queue) complementary to polling
- DIRAC workflow 'machine' abstraction
To achieve a data-driven system, the TS must expose a Catalog Plugin, instrumenting 5 methods:
- addFile
- addReplica
- setFileStatus
- setReplicaStatus
- setMetaData
Also, an ApplyQuery utility (hereafter also called 'filter') is necessary to check whether the metadata dictionary of a given file is compatible with the metaqueries of the active Transformations. Similar functionality could be achieved by instrumenting the main File Catalog (DFC) with a message-emitting mechanism for the events mentioned above; this option will be explored.
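As an illustration, a minimal, flat-dictionary version of such a filter could look as follows (only equality and 'value in list' conditions are handled here; the real DIRAC metaquery language is richer, so this is a sketch rather than the actual implementation):

```python
def applyQuery(metaQuery, metadataDict):
    """Return True if the file metadata dictionary satisfies the metaquery (sketch).

    Only flat equality and 'value in list' conditions are handled; operators
    like '>', '<' or nested queries are left out of this illustration.
    """
    for key, condition in metaQuery.items():
        value = metadataDict.get(key)
        if value is None:
            return False
        if isinstance(condition, (list, tuple)):
            if value not in condition:
                return False
        elif value != condition:
            return False
    return True


def getMatchingTransformations(metadataDict, activeTransformations):
    """Return the IDs of the active Transformations whose metaquery matches the file."""
    return [transID for transID, metaQuery in activeTransformations.items()
            if applyQuery(metaQuery, metadataDict)]
```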
The addFile method uses the ApplyQuery utility to filter files whose UserMetaData satisfy the query conditions of the active Transformations. In this way, newly registered data are 'automatically' added to the correct Transformations.
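A hedged sketch of how the plugin's addFile could combine the filter with the TransformationClient follows. It reuses the getMatchingTransformations sketch above; the plugin class name, the lfns layout and the _getActiveTransformationsWithQueries helper are illustrative assumptions, while addFilesToTransformation is assumed to be available on the TransformationClient:

```python
from DIRAC import S_OK
from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient


class TSCatalogPlugin(object):
    """Sketch of the TS catalog plugin: newly registered files are matched against
    the metaqueries of the active Transformations and attached to those that match."""

    def __init__(self):
        self.transClient = TransformationClient()

    def _getActiveTransformationsWithQueries(self):
        """Hypothetical helper returning { transID : metaQuery } for the active Transformations."""
        return {}

    def addFile(self, lfns):
        """lfns: { lfn : metadataDict }, as assumed here for the catalog interface."""
        activeTransformations = self._getActiveTransformationsWithQueries()
        for lfn, metadataDict in lfns.items():
            for transID in getMatchingTransformations(metadataDict, activeTransformations):
                # assumed to be idempotent for files already attached to the Transformation
                self.transClient.addFilesToTransformation(transID, [lfn])
        return S_OK()
```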
The Transformation metadata filter can in principle include the location of replicas, for example to process only data located at a given site. Therefore, new tasks can also be triggered by an addReplica call.
An example use case is the 'quality flag' of LHCb, which uses the File Status metadata. The first time a file is registered, it has Status 'AprioriGood' in the FC and is by default added to the TS if it passes the filter. Then, when the quality flag of the file changes (e.g. during a validation procedure), the filter must be applied to the updated metadata dictionary of the file. This is achieved by instrumenting the setFileStatus method with the ApplyQuery utility. Below is an example illustrating the sequence:
- We have:
Transformation1(key1=val1) # Transformation which does not put any condition on the quality flag
Transformation2(key1=val1 Status=GoodQuality) # Transformation requiring a 'good' quality flag
- file1(key1=val1) is added
(note that when no Status is specified, the default is 'AprioriGood' in the FC)
-> addFile applies the filter and the file is added to Transformation1 and to Transformation2?
- file1 changes status:
key1=val1; Status=GoodQuality
-> setFileStatus applies the filter and the file remains attached to Transformation1 and Transformation2?
- file1 changes status:
key1=val1; Status=BadQuality
-> setFileStatus applies the filter and the file is removed from Transformation2? Or the file status in the TS changes from 'Unused' to 'Problematic'? What about Transformation1?
To achieve this, setFileStatus should:
- get the list of Transformations having the file attached -> this requires implementing a getTransformationsForFile(lfn) method that returns the list of TransIDs
- for each of these Transformations, apply the filter utility, taking into account the updated Status of the file
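Put together, a hedged sketch of the instrumented setFileStatus could look as follows (getTransformationsForFile is the method to be implemented mentioned above; _getFileMetadata and the choice of flagging the file 'Problematic' in the TS are illustrative assumptions answering one of the open questions):

```python
    def setFileStatus(self, lfn, newStatus):
        """Re-apply the Transformation filters when the Status of a file changes (sketch)."""
        metadataDict = self._getFileMetadata(lfn)   # hypothetical helper querying the DFC
        metadataDict['Status'] = newStatus          # updated metadata dictionary of the file
        attachedTransIDs = self.transClient.getTransformationsForFile(lfn)  # method to be implemented
        activeTransformations = self._getActiveTransformationsWithQueries()
        for transID in attachedTransIDs:
            metaQuery = activeTransformations.get(transID, {})
            if not applyQuery(metaQuery, metadataDict):
                # one possible answer to the open question above: flag the file in the TS
                self.transClient.setFileStatusForTransformation(transID, 'Problematic', [lfn])
```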
There could also be the possibility of defining a kind of 'File State machine' which associates the Statuses in the FC with the Statuses in the TS, e.g.:
File Status <-> Status in TS
AprioriGood <-> Unused
Trash <-> Problematic
BadQuality <-> Problematic
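Such a mapping could be kept as a simple dictionary, e.g.:

```python
# Possible 'File State machine': FC Status -> Status in the TS (values as listed above)
FC_TO_TS_STATUS = {
    'AprioriGood': 'Unused',
    'Trash': 'Problematic',
    'BadQuality': 'Problematic',
}
```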
Since 'SE' is yet another metadata field that could be used in a Transformation metaquery, the setReplicaStatus method should also be instrumented with the ApplyQuery utility, similarly to what is described for setFileStatus.
It can happen that new UserMetadata are added to a file once it is already registered. In this case, the file could eventually become eligible to be attached to some Transformation. To cover this case, the setMetaData method should:
- Get the UserMetaData of the File
- Update the UserMetaData with the new metadata
- Apply the filter utility to the updated metadata dictionary of the file and, if it passes the filter, add the file to the new Transformations (for the Transformations already having the file attached, the add operation should have no effect)
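A hedged sketch of the corresponding setMetaData logic, with the same illustrative helpers as above:

```python
    def setMetaData(self, lfn, newMetadata):
        """Re-apply the Transformation filters when new UserMetadata are set on a file (sketch)."""
        metadataDict = self._getFileMetadata(lfn)   # current UserMetaData of the file (hypothetical helper)
        metadataDict.update(newMetadata)            # merge in the newly set keys
        activeTransformations = self._getActiveTransformationsWithQueries()
        for transID in getMatchingTransformations(metadataDict, activeTransformations):
            # adding is assumed to have no effect for Transformations already holding the file
            self.transClient.addFilesToTransformation(transID, [lfn])
```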
In LHCb, chained Transformations, e.g. Re-processing -> Merging -> Removal, are handled by a dedicated Production System. The idea is to develop a generalized, lightweight system to support chained Transformations, starting from the experience of LHCb and the use cases of CTA, ILC, etc. This system would be much lighter than the LHCb Production System and would not cover all possible use cases. The idea is that it would serve as a basis on top of which each community could then build its own 'Production System'.
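As a purely illustrative sketch (none of these names correspond to an existing DIRAC API), such a chain could be described by letting the output metadata of one Transformation satisfy the input metaquery of the next:

```python
# Re-processing -> Merging -> Removal, expressed as data-driven links between steps:
# each step's 'outputMeta' is chosen so that it matches the 'inputQuery' of the next step.
reprocessingChain = [
    {'name': 'Re-processing',
     'inputQuery': {'DataType': 'RAW', 'RunQuality': 'GOOD'},
     'outputMeta': {'DataType': 'DST'}},
    {'name': 'Merging',
     'inputQuery': {'DataType': 'DST'},
     'outputMeta': {'DataType': 'MergedDST'}},
    {'name': 'Removal',
     'inputQuery': {'DataType': 'DST'}},   # clean up the intermediate unmerged files
]
```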
The implementation of the 'MetaFilters' as described in point 1 above is not enough to achieve a completely data-driven system, since other components of the TS still work in polling mode, in particular the TransformationAgent, the WFTaskAgent and the RequestTaskAgent.
The idea is to use an MQ system, so that a signal is sent to these three Agents to trigger their execution. In particular, when a Transformation is created in the TransformationDB, a signal would be sent to the TransformationAgent, which prepares the Tasks. When a new Task is created, a signal is sent to the WFTaskAgent or to the RequestTaskAgent to trigger their execution. The signal can carry useful information, for example the Transformations touched by the event, so that the Agents perform only well-focused operations, contrary to their behaviour in 'polling' mode. The 'polling' mode would also remain active to recover from eventual failures of the MQ system (lost signals, etc.). Each Agent should have two threads: one for the 'MQ mode', listening to the messages, and one for the 'polling mode', listening to the 'timer'. The idea is to explore the use of RabbitMQ as the MQ system.
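A minimal sketch of such a two-thread Agent, assuming RabbitMQ accessed via the pika client (the queue name, message format and execute signature are illustrative assumptions):

```python
import json
import threading
import time

import pika  # RabbitMQ client, one possible choice for the MQ layer


class DataDrivenAgent(object):
    """Sketch of an Agent combining an MQ-triggered thread with the usual polling loop."""

    def __init__(self, pollingTime=120):
        self.pollingTime = pollingTime

    def execute(self, transIDs=None):
        """Do the real work; when transIDs is given, only those Transformations are treated."""
        pass

    def _mqLoop(self):
        """Thread 1: react to messages carrying the Transformations touched by an event."""
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()
        channel.queue_declare(queue='TransformationEvents')

        def onMessage(ch, method, properties, body):
            event = json.loads(body)
            self.execute(transIDs=event.get('transIDs'))

        channel.basic_consume(queue='TransformationEvents', on_message_callback=onMessage, auto_ack=True)
        channel.start_consuming()

    def _pollingLoop(self):
        """Thread 2: the usual timer-driven mode, kept to recover lost messages."""
        while True:
            self.execute()
            time.sleep(self.pollingTime)

    def run(self):
        threading.Thread(target=self._mqLoop, daemon=True).start()
        self._pollingLoop()
```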
Once the support for 'chained Transformations' (hereafter 'workflows') is there, the next step is to prepare a generalized interface so that a user can easily build their own workflows. This requires expressing in the most general way the human procedure of building workflows.
A first implementation, in which the addFile and addReplica methods have been instrumented, is visible here: https://github.com/arrabito/DIRAC/tree/TSrelv6r13