
Transformation System evolution

arrabito edited this page Mar 31, 2015 · 34 revisions

The goal is to make the TS a completely data-driven system and a 'workflow engine'. To achieve this goal, we have identified 4 steps in order of priority:

  1. Implement MetaFilters
  2. Support for chained Transformations
  3. Use MQ (Message Queue) complementary to polling
  4. DIRAC workflow 'machine' abstraction

Implement MetaFilters

To achieve a data-driven system, the TS must expose a Catalog Plugin, instrumenting 5 methods:

  • addFile
  • addReplica
  • setFileStatus
  • setReplicaStatus
  • setMetaData

An ApplyQuery utility (hereafter also called 'filter') is also necessary, to check whether the metadata dictionary of a given file is compatible with the metaqueries of the active Transformations. Similar functionality could be achieved by instrumenting the main File Catalog (DFC) with a message-emitting mechanism on the events mentioned above; this will be explored.
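A minimal sketch of such an ApplyQuery check, assuming equality-only metaqueries (the real metaquery language would be richer; all names here are illustrative):

```python
# Illustrative ApplyQuery sketch: a metaquery is a dict of required
# key/value pairs, and a file passes if its metadata satisfies all of them.
# Equality-only conditions are assumed for simplicity.

def apply_query(metaquery, file_metadata):
    """Return True if the file's metadata dictionary satisfies the metaquery."""
    return all(file_metadata.get(key) == value
               for key, value in metaquery.items())

print(apply_query({'key1': 'val1'},
                  {'key1': 'val1', 'Status': 'AprioriGood'}))   # True
print(apply_query({'key1': 'val1', 'Status': 'GoodQuality'},
                  {'key1': 'val1', 'Status': 'AprioriGood'}))   # False
```

The same check can be reused by every instrumented Catalog method, with the metaquery coming from the Transformation definition.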

addFile

The addFile method uses the ApplyQuery utility to filter files whose UserMetadata satisfies the query conditions of the active Transformations. In this way, newly registered data are 'automatically' added to the correct Transformations.
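As a sketch, the addFile hook could loop over the active Transformations and attach the file wherever the filter passes (names such as addFilesToTransformation and the dict-of-metaqueries shape are assumptions for illustration):

```python
# Hypothetical addFile hook: equality-only metaqueries are assumed,
# and active_transformations maps {transID: metaquery}.

def _passes(metaquery, file_metadata):
    return all(file_metadata.get(k) == v for k, v in metaquery.items())

def add_file_hook(lfn, file_metadata, active_transformations, ts_client):
    """Attach a newly registered file to every active Transformation
    whose metaquery its UserMetadata satisfies."""
    for trans_id, metaquery in active_transformations.items():
        if _passes(metaquery, file_metadata):
            ts_client.addFilesToTransformation(trans_id, [lfn])
```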

addReplica

The transformation metadata filter can in principle include the location of replicas, for example to process only data at a given site. Therefore, new tasks can be triggered by an addReplica call.
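One way to sketch this (purely illustrative: the 'SE' condition and the way replicas fold into the effective metadata are assumptions) is to treat an 'SE' condition as a membership test over the file's replica locations:

```python
# Hypothetical addReplica hook: an 'SE' key in a metaquery matches if the
# required SE is among the file's replicas; other keys are equality checks.

def _passes(metaquery, metadata, replica_ses):
    for key, value in metaquery.items():
        if key == 'SE':
            if value not in replica_ses:
                return False
        elif metadata.get(key) != value:
            return False
    return True

def add_replica_hook(lfn, se, metadata, replica_ses, active_transformations, ts_client):
    replica_ses.add(se)  # the file now has a replica at this SE
    for trans_id, metaquery in active_transformations.items():
        if _passes(metaquery, metadata, replica_ses):
            ts_client.addFilesToTransformation(trans_id, [lfn])
```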

setFileStatus

An example use case is the 'quality flag' of LHCb, which uses the File Status metadata. When a file is first registered, it has Status 'AprioriGood' in the FC and, by default, is added to the TS if it passes the filter. Then, when the quality flag of the file changes (e.g. during a validation procedure), the filter must be applied to the updated metadata dictionary of the file. This is achieved by instrumenting the setFileStatus method with the ApplyQuery utility. Below is an example illustrating the sequence:

  1. We have two Transformations:
     • Transformation1(key1=val1) # puts no condition on the quality flag
     • Transformation2(key1=val1 Status=GoodQuality) # requires a 'good' quality flag

  2. file1(key1=val1) is added (note that when no Status is specified, it is 'AprioriGood' in the FC by default) -> addFile applies the filter and the file is added to Transformation1 and to Transformation2?

  3. file1 changes status: key1=val1 Status=GoodQuality -> setFileStatus applies the filter and the file remains attached to Transformation1 and Transformation2?

  4. file1 changes status: key1=val1 Status=BadQuality -> setFileStatus applies the filter and the file is removed from Transformation2? Or the file status in the TS changes from 'Unused' to 'Problematic'? What about Transformation1?

To achieve this, setFileStatus should:

  • get the list of the Transformations having the file attached -> need to implement a getTransformationsForFile(lfn) method that returns the list of the TransIDs
  • for each of these Transformations, apply the filter Utility to take into account the updated Status of the File
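The two steps above can be sketched as follows. getTransformationsForFile is the method proposed here; setFileStatusForTransformation and the choice of flagging the file 'Problematic' rather than removing it are assumptions, since that question is still open:

```python
# Hypothetical setFileStatus hook. One possible policy for files that no
# longer pass a Transformation's filter: flag them 'Problematic' in the TS
# instead of detaching them (still an open design question).

def _passes(metaquery, metadata):
    # equality-only metaquery check, as assumed throughout these sketches
    return all(metadata.get(k) == v for k, v in metaquery.items())

def set_file_status_hook(lfn, new_status, metadata, queries, ts_client):
    metadata['Status'] = new_status  # the updated metadata dictionary
    for trans_id in ts_client.getTransformationsForFile(lfn):
        if not _passes(queries[trans_id], metadata):
            ts_client.setFileStatusForTransformation(trans_id, 'Problematic', [lfn])
```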

We have also discussed the possibility of defining a kind of 'File State machine' which maps the Statuses in the FC to the Statuses in the TS, e.g.:

File Status <-> Status in TS
AprioriGood <-> Unused
Trash <-> Problematic
BadQuality <-> Problematic
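Such a state machine would be little more than a lookup table; the default for unmapped FC Statuses is an assumption of this sketch:

```python
# Possible 'File State machine': mapping FC file Statuses to TS statuses.
FC_TO_TS_STATUS = {
    'AprioriGood': 'Unused',
    'Trash': 'Problematic',
    'BadQuality': 'Problematic',
}

def ts_status_for(fc_status):
    """TS status corresponding to a File Catalog status
    ('Unused' as the fallback is an assumption)."""
    return FC_TO_TS_STATUS.get(fc_status, 'Unused')
```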

setReplicaStatus

Since 'SE' is yet another metadata that could be used in a Transformation, setReplicaStatus should also be instrumented with the ApplyQuery utility, similarly to what is described for setFileStatus.

setMetaData

It can happen that new UserMetadata are added to a file after it has already been registered. In this case, the file could eventually become eligible to be attached to some Transformation. To cover this case, the setMetaData method should:

  • Get the UserMetaData of the File
  • Update the UserMetaData with the new metadata
  • Apply the filter Utility to the updated metadata dictionary of the file and, if it passes the filter, add the file to the new Transformations (for Transformations that already have the file attached, the add operation should have no effect)
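The three steps above can be sketched as follows (getFileUserMetadata and addFilesToTransformation stand in for the real catalog and TS calls; all names are illustrative):

```python
# Hypothetical setMetaData hook, following the three steps above:
# fetch current UserMetadata, merge the new keys, re-apply the filters.

def _passes(metaquery, metadata):
    # equality-only metaquery check, as assumed throughout these sketches
    return all(metadata.get(k) == v for k, v in metaquery.items())

def set_metadata_hook(lfn, new_metadata, catalog, active_transformations, ts_client):
    meta = catalog.getFileUserMetadata(lfn)      # 1. current UserMetadata
    meta.update(new_metadata)                    # 2. merge in the new keys
    for trans_id, metaquery in active_transformations.items():
        if _passes(metaquery, meta):
            # 3. adding an already-attached file is expected to be a no-op
            ts_client.addFilesToTransformation(trans_id, [lfn])
```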

Support for chained Transformations

In LHCb, chained Transformations, e.g. Re-processing -> Merging -> Removal, are handled by a dedicated Production System. The idea is to develop a generalized, lightweight system to support chained Transformations, starting from the experience of LHCb and the use cases of CTA, ILC, etc. This system would be much lighter than the LHCb Production System and won't cover all possible use cases. It would serve as a basis on which each community could build its own 'Production System'.
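One lightweight way to express such a chain, assuming the MetaFilters of step 1 are in place, is to let each step register its outputs with metadata that satisfies the next step's metaquery, so the chain is driven entirely by the data. The structure below is purely illustrative:

```python
# Hypothetical description of a Re-processing -> Merging -> Removal chain:
# each step's outputs carry metadata matching the next step's input
# metaquery, so the MetaFilters drive the chain with no extra glue.
chain = [
    {'name': 'Re-processing',
     'input_query': {'DataType': 'raw'},
     'output_meta': {'DataType': 'reprocessed'}},
    {'name': 'Merging',
     'input_query': {'DataType': 'reprocessed'},
     'output_meta': {'DataType': 'merged'}},
    {'name': 'Removal',
     'input_query': {'DataType': 'merged'},
     'output_meta': {}},
]

# Sanity check: each step consumes exactly what the previous one produces
for prev, nxt in zip(chain, chain[1:]):
    assert nxt['input_query'] == prev['output_meta']
```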

Use MQ (Message Queue) complementary to polling

The implementation of the 'MetaFilters' as described in 1. is not enough to achieve a completely data-driven system, since other components of the TS still work in polling mode. In particular, these components are the TransformationAgent, the WFTaskAgent and the RequestTaskAgent.

The idea is to use an MQ system, so that a signal is sent to these 3 Agents to trigger their execution. In particular, when a Transformation is created in the TransformationDB, a signal would be sent to the TransformationAgent, which prepares the Tasks. When a new Task is created, a signal is sent to the WFTaskAgent or to the RequestTaskAgent to trigger its execution. The signal can carry useful information, for example the Transformations touched by the event, so that the agents perform only well-focused operations, in contrast to their behavior in 'polling' mode. The 'polling' mode would also remain active to recover from possible failures of the MQ system (lost signals, etc.). Each agent should have 2 threads: one for the 'MQ mode', listening to the messages, and one for the 'polling mode', listening to the 'timer'. The idea is to explore the use of RabbitMQ as the MQ system.
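The two-thread design can be sketched as follows, using an in-process queue as a stand-in for a RabbitMQ subscription (a real implementation would consume from the broker instead; the class and attribute names are assumptions):

```python
import queue
import threading

class DualModeAgent:
    """Sketch of an agent with an MQ-listening thread plus a polling
    fallback thread that recovers from lost messages."""

    def __init__(self, poll_interval=60.0):
        self.inbox = queue.Queue()       # stand-in for a RabbitMQ subscription
        self.poll_interval = poll_interval
        self.processed = []
        self._stop = threading.Event()

    def execute(self, trans_ids=None):
        # trans_ids carried by a message lets the agent focus its work;
        # None means a full polling sweep over all Transformations
        self.processed.append(trans_ids)

    def _mq_loop(self):
        # 'MQ mode': react as soon as a message arrives
        while not self._stop.is_set():
            try:
                msg = self.inbox.get(timeout=0.1)
            except queue.Empty:
                continue
            self.execute(trans_ids=msg.get('transIDs'))

    def _poll_loop(self):
        # 'polling mode': periodic full sweep, kept as a safety net
        while not self._stop.wait(self.poll_interval):
            self.execute()

    def start(self):
        for target in (self._mq_loop, self._poll_loop):
            threading.Thread(target=target, daemon=True).start()

    def stop(self):
        self._stop.set()
```

A message such as `{'transIDs': [42]}` pushed to the inbox would trigger a focused execution for Transformation 42, while the timer thread keeps running full sweeps at its own pace.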

DIRAC workflow 'machine' abstraction

Once the support for 'chained Transformations' (hereafter 'workflows') is there, the next step is to prepare a generalized interface so that users can easily build their own workflows. This requires expressing, in the most general way, the human procedure of building workflows.
