#Consistent Region Support for HDFS operators
#HDFS2FileSink
The HDFS file system has the following limitations when writing files:
- Random write access is not supported - this makes resetting the operator challenging, as we cannot move the file pointer back to a checkpoint location.
- Append - for append to work properly on a very small cluster (< 3 hosts), we need to configure the file replication count to 1. We need to set up a bigger cluster to test whether append works with the default configuration.
Due to these limitations, our goal for consistent region support is to achieve at-least-once processing, where we allow duplicate tuples to be written to the file. We must not accidentally overwrite data that has already been written to the file system.
##Case 1: Static filename
At the most basic level, HDFS2FileSink only needs a file with a static filename:
() as lineSink1 = HDFS2FileSink(Functor_3_out0 as inPort0Alias)
{
    param
        file : "consistent.txt" ;
}
- On Init: The operator does not actually create the file on init; it simply creates a handle. The operator opens and writes to the file when it receives tuples in the #process method. On initialization, the operator also checks whether it is restarting in a consistent region. If it is restarting, the operator discards tuples until reset is called. On reset, the operator opens the file again in append mode and continues writing. This ensures that data already stored in the file is not overwritten.
- On Drain: The operator flushes all tuples stored in its internal buffer to the HDFS file system. The operator does not close the file.
- On Checkpoint: The operator saves the current filename, the number of tuples written to the file, the file size as known by the operator, and fileNum.
- On Reset: The operator opens the file saved in the checkpoint, in append mode. The operator resets the state of its HdfsFile object (the internal object that stores the state of the file as known to the operator). The file state includes: filename, tupleCnt, and size.
- On ResetToInit: The operator closes the current file and restores itself to its initialization state. A new file is opened (based on the init state) in overwrite mode; existing content in the file is overwritten.
The general idea is that a file is opened in append mode if the operator has been reset. All other files are opened in overwrite mode (see the sketch after the list below).
With this approach, the user will get the following behavior:
- When a consistent state is being established, the operator flushes its internal buffer to the HDFS file system, reducing the chance of losing that data if the HDFS2FileSink operator crashes.
- If an upstream operator crashes, the reset method is called. When tuples are replayed, the HDFS2FileSink operator continues writing to the file in append mode.
- If the HDFS2FileSink operator crashes, it is restarted. Tuples received between allPortsReady and reset are discarded. After the operator is reset, tuples are written at the end of the file.
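To make the lifecycle above concrete, here is a minimal sketch of the drain/checkpoint/reset handlers using only the Hadoop FileSystem API. The class and field names (SinkHandlers, SinkState) are illustrative assumptions, not the toolkit's actual code; the checkpointed fields are the ones listed above.

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical container for the state saved on checkpoint.
class SinkState {
    String fileName;   // current file
    long tupleCnt;     // tuples written so far
    long size;         // file size as known to the operator
}

class SinkHandlers {
    private final FileSystem fs;
    private FSDataOutputStream out;
    private SinkState state = new SinkState();

    SinkHandlers(Configuration conf) throws IOException {
        fs = FileSystem.get(conf);
    }

    // On Drain: flush buffered data to HDFS, but keep the file open.
    void drain() throws IOException {
        out.hflush();
    }

    // On Checkpoint: save filename, tuple count, and size.
    SinkState checkpoint() {
        SinkState copy = new SinkState();
        copy.fileName = state.fileName;
        copy.tupleCnt = state.tupleCnt;
        copy.size = state.size;
        return copy;
    }

    // On Reset: reopen the checkpointed file in append mode so that data
    // already in the file is not overwritten.
    void reset(SinkState ckpt) throws IOException {
        if (out != null) out.close();
        state = ckpt;
        out = fs.append(new Path(state.fileName));
    }

    // On ResetToInit: reopen the initial file in overwrite mode.
    void resetToInit(String initialFileName) throws IOException {
        if (out != null) out.close();
        state = new SinkState();
        state.fileName = initialFileName;
        out = fs.create(new Path(state.fileName), true /* overwrite */);
    }
}
```

The real operator hands this state to the Streams checkpointing runtime; the sketch only shows how append vs. overwrite mode maps to the reset paths.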
##Case 2: Dynamically generated filename
In this case, the filename is generated dynamically using a combination of these variables (a sketch of the substitution follows the list):
- %HOST The host that is running the processing element (PE) of this operator.
- %FILENUM The file number, which starts at 0 and counts up as a new file is created for writing.
- %PROCID The process ID of the processing element.
- %PEID The processing element ID.
- %PELAUNCHNUM The PE launch count.
- %TIME The time when the file is created. If the timeFormat parameter is not specified, the default time format is yyyyMMdd_HHmmss.
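As an illustration of the substitution (not the toolkit's actual code; the expand helper and its signature are assumptions), the dynamic filename could be produced as follows. %PROCID, %PEID, and %PELAUNCHNUM come from the Streams runtime context and are omitted here.

```java
import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.Date;

class FileNamePattern {
    // Expands %HOST, %FILENUM, and %TIME in the given pattern. If timeFormat
    // is null, the default format yyyyMMdd_HHmmss is used.
    static String expand(String pattern, long fileNum, String timeFormat)
            throws Exception {
        SimpleDateFormat fmt = new SimpleDateFormat(
                timeFormat != null ? timeFormat : "yyyyMMdd_HHmmss");
        return pattern
                .replace("%HOST", InetAddress.getLocalHost().getHostName())
                .replace("%FILENUM", Long.toString(fileNum))
                .replace("%TIME", fmt.format(new Date()));
    }
}
```

For example, `expand("output_%FILENUM_%TIME.txt", 0, null)` might yield `output_0_20140321_153000.txt`.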
To support this case, we need to do the following:
- On Init: The operator regenerates the filename to be used for writing. The file is opened in overwrite mode.
- On Drain: The operator will flush all tuples stored in its internal buffer to the HDFS file system. The operator will not close the file.
- On Checkpoint: The operator saves the current filename, tupleCount, fileSize, and fileNum to checkpoint.
- On Reset: The operator closes the current file and opens the file from the checkpoint in append mode. When the next tuple arrives, the operator writes it to the file in append mode. The operator also restores the fileNum value from the checkpoint (see the sketch after this list).
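Continuing the hypothetical SinkHandlers sketch from Case 1, the Case 2 reset additionally restores fileNum, and a new file (when a close condition fires) is opened in overwrite mode:

```java
// Fragment continuing the SinkHandlers sketch above; fileNum is a new field.
private long fileNum;

// On Reset: reopen the checkpointed file in append mode and restore the
// file numbering so subsequent new files continue from the right number.
void reset(SinkState ckpt, long checkpointedFileNum) throws IOException {
    if (out != null) out.close();
    state = ckpt;
    fileNum = checkpointedFileNum;
    out = fs.append(new Path(state.fileName)); // next tuple appends at EOF
}

// When a close condition (tuplesPerFile, bytesPerFile, timePerFile) fires,
// the next filename is generated as usual and opened in overwrite mode.
void openNextFile(String pattern, String timeFormat) throws Exception {
    if (out != null) out.close();
    fileNum++;
    state = new SinkState();
    state.fileName = FileNamePattern.expand(pattern, fileNum, timeFormat);
    out = fs.create(new Path(state.fileName), true /* overwrite */);
}
```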
With this approach, the user will get the following behavior:
- When a consistent state is being established, the operator flushes its internal buffer to the HDFS file system, reducing the chance of losing that data if the HDFS2FileSink operator crashes.
- If an upstream operator crashes, the reset method is called. On reset, we reopen the file stored in the checkpoint. The file is opened in append mode, so content is appended to it. If close modes like tuplesPerFile or bytesPerFile are specified, the tuple count and file size are restored from the checkpoint. This ensures that tuples meant for a given file continue to be written to the same file. If the close mode is timePerFile, the timer is reset to 0.
- If the HDFS2FileSink operator crashes, it is restarted. On initialization, we detect whether the operator is restarting in a consistent region. Tuples received between allPortsReady and reset are discarded. When reset is called, we open the file from the checkpoint in append mode and start appending tuples to it.
- When a new file is needed, a new filename is generated as usual. The file is opened in overwrite mode.
- We guarantee that tuples are written to a file at least once. We do not guarantee that tuples are written to the same file they would have been written to had there been no failures.
- HDFS2FileSink can have an optional output port that provides the filename and file size. The tuple is submitted when the file is closed. On reset, we have to close the current file and reopen a file in append mode. Since we cannot submit tuples during reset, we must defer that submission until tuples are allowed to flow again. The end result is that we cannot guarantee that this tuple will not be lost if multiple resets or failures occur (see the sketch below).
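One way to implement the deferred submission is a simple pending queue, as sketched below; the queue and flush point are assumptions about one possible implementation, not toolkit code.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

class PendingSubmissions {
    static final class FileClosedEvent {
        final String fileName;
        final long size;
        FileClosedEvent(String fileName, long size) {
            this.fileName = fileName;
            this.size = size;
        }
    }

    private final Queue<FileClosedEvent> pending = new ArrayDeque<>();

    // Called from reset(): the file must be closed here, but we are not
    // allowed to submit tuples, so the event is only recorded.
    synchronized void deferSubmit(String fileName, long size) {
        pending.add(new FileClosedEvent(fileName, size));
    }

    // Called once tuples are allowed to flow again (e.g. on the next process
    // call). If another reset happens before this runs, queued events can be
    // lost - which is exactly the caveat described above.
    synchronized void flush(Consumer<FileClosedEvent> submit) {
        FileClosedEvent e;
        while ((e = pending.poll()) != null) {
            submit.accept(e);
        }
    }
}
```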
##Case 3: Filename from input attribute
The HDFS2FileSink operator also allows the user to provide the filename as an input attribute. The current behavior is that if the filename has changed from the one we are writing to, we close the current file and create the file specified by the input attribute. If the file already exists, it is overwritten.
To support this case, we follow the same design as Case 2.
There is an existing request to support append mode for this case, i.e., when we open a file with the filename from the input attribute, open it in append mode so the customer can write to multiple files. This feature is not targeted for 3.2.2 at the moment, even if we get append working.
#HDFS2FileSource
HDFS supports random reads. With the HDFS FileSystem API and the resulting input stream API, one can determine the position of the file pointer and seek back to it when the file is opened again.
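For example, with the Hadoop FileSystem API, getPos() reports the cursor position (which can be checkpointed) and seek() restores it after the file is reopened; the path below is just an example.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SeekDemo {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path file = new Path("/tmp/consistent.txt"); // example path

        FSDataInputStream in = fs.open(file);
        byte[] buf = new byte[4096];
        in.read(buf);             // read some data
        long pos = in.getPos();   // position to save in a checkpoint
        in.close();

        in = fs.open(file);       // reopen, e.g. after a reset
        in.seek(pos);             // resume where we left off
        in.close();
    }
}
```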
##Case 1: One file, no input port
In this case, the user provides a filename as a parameter. The operator opens the file and reads it line by line (in text mode) or as a blob (in binary mode).
If the operator has no input port, the operator can be at the start of a consistent region. The operator supports both periodic and operator-driven modes.
- On Init: Set up the HDFS client and create the process thread object. The thread is not started at this point.
- On allPortsReady: The process thread is started and we start reading the file.
- On ProcessFile: The operator acquires the consistent region permit before submitting each tuple (see the sketch after this list). After the operator has finished processing the file, it asks the runtime to establish a consistent cut. It is up to the runtime to determine the best time to establish the cut. (For operator-driven mode, I expect this to be immediate. For periodic mode, the runtime determines when it is a good time to establish the cut.)
- On Drain: No work to be done, as there is no buffer in the operator.
- On Checkpoint: The operator saves the current filename and the position of the file cursor.
- On Reset: The operator reads the filename and file cursor location from the checkpoint. The operator closes the current file, opens the file from the checkpoint, and positions the cursor back to the saved location.
- On ResetToInitialState: The operator closes the current file, restores its initial state, opens the file, and starts reading again from the beginning.
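A sketch of the ProcessFile loop described above. ConsistentRegionContext (com.ibm.streams.operator.state) provides acquirePermit/releasePermit/makeConsistent; the reader and submit helpers are hypothetical stubs.

```java
import com.ibm.streams.operator.state.ConsistentRegionContext;

class SourceReadLoop {
    private final ConsistentRegionContext crContext;

    SourceReadLoop(ConsistentRegionContext crContext) {
        this.crContext = crContext;
    }

    void processFile() throws Exception {
        String line;
        while ((line = readNextLine()) != null) {
            crContext.acquirePermit();   // take the permit per tuple
            try {
                submitLineAsTuple(line);
            } finally {
                crContext.releasePermit();
            }
        }
        // File fully read: request a consistent state. The runtime decides
        // when the cut is actually established (immediately for
        // operator-driven, at the period boundary for periodic).
        crContext.makeConsistent();
    }

    private String readNextLine() { return null; }    // hypothetical stub
    private void submitLineAsTuple(String line) { }   // hypothetical stub
}
```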
With this approach, the user will get the following behavior:
- The user can use either periodic or operator-driven mode.
- For operator-driven mode, a drain is performed when the operator has finished reading the file. If the application fails during the read of the file, the file is re-read from the beginning.
- For periodic mode, drains are performed based on the consistent region period settings. Multiple consistent states may be established before the operator has finished reading the entire file. If the application fails during the read of the file, the file is re-read from the end of the last good state (i.e., the last checkpoint).
##Case 2: Multiple files, with input port
In this mode, the operator gets the filename of the file to read from input tuples. On each tuple, the operator opens the file and reads the entire file. Tuples are handled sequentially, i.e., we finish reading the current file before the next tuple is processed and the next file is read.
In this mode, the HDFS2FileSource operator cannot be at the start of a consistent region, but it can participate in one.
- On Init: Set up the HDFS client; do not create a process thread.
- On allPortsReady: There is no process thread to start.
- On process(tuple): When a tuple is received, we get the filename from the tuple. At this point, we open the file for reading. We do not need to acquire the consistent region permit before we submit, as the permit is already acquired before the process method is called. This means that the application cannot drain until we have finished processing the entire file (see the sketch after this list).
- An alternative is to spawn off a background thread to read the file and return right away.
- The disadvantage is that the operator may spawn off many threads and start processing multiple files at the same time. To do this properly, we would need a thread pool to limit the number of threads the operator can spawn.
- The advantage of this approach is that it allows the application to establish a consistent state before the entire file is read. This can be beneficial if the file being read is big. (HDFS encourages big files; many small files are not recommended.)
- We are not taking this approach at the moment, to reduce the complexity of the design.
- On Drain: No work to be done, as there is no buffer in the operator.
- On Checkpoint: In this mode, the operator does not need to save anything to the checkpoint.
- On Reset: Close the current file. We do not need to spawn a thread to process a new file, as we expect tuples to be replayed, and we will read the file named by each replayed tuple at that time.
- On ResetToInitialState: The operator closes the current file and restores its initial state. The operator does not need to open any file, as we expect tuples to be replayed, and we will read the file named by each replayed tuple at that time.
With this approach, the user will get the following behavior:
- A consistent state can only be established after a file is fully read.
- If an application failure occurs, the operator depends on tuples being replayed, and we re-read the file from the beginning.
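A sketch of the process(tuple) flow for this case; the submit helper is an assumption. Note there is no acquirePermit() call: the runtime already holds the permit when process() is invoked, so the region cannot drain until the whole file has been read.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class FileSourceProcess {
    private final FileSystem fs;

    FileSourceProcess(Configuration conf) throws Exception {
        fs = FileSystem.get(conf);
    }

    // Called per input tuple; the permit is already held by the runtime.
    void process(String fileNameFromTuple) throws Exception {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(new Path(fileNameFromTuple))))) {
            String line;
            while ((line = reader.readLine()) != null) {
                submitLineAsTuple(line); // no extra permit handling needed
            }
        }
        // Nothing to checkpoint in this mode: a failed file is re-read when
        // the input tuple that named it is replayed.
    }

    private void submitLineAsTuple(String line) { }   // hypothetical stub
}
```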
#HDFS2DirectoryScan
Currently, the directory scan has the following logic:
- After allPortsReady, the operator starts up a processing thread.
- In the processing thread, the operator scans the directory for all files. Files are returned in no specific order.
- The operator goes through all the files; every time a new file is submitted, it updates a field called latestTimestamp.
- In the next scan, the operator gets all the files again. For each file that is newer than the latest timestamp, the operator submits the filename. All other files are ignored.
###Issues with consistent region
- Because the files are processed in no specific order, even if we remember the latest timestamp, there is no guarantee that the timestamp we gather is the latest of all the files we have found. As a result, checkpointing and resetting the latestTimestamp field is not sufficient to support consistent region.
- An alternative is to save every file we have seen into the checkpoint and, in subsequent scans, submit files that we have not seen. Based on our understanding of client applications, a client can have hundreds of thousands of files in a directory. This essentially means we would keep track of all files on the file system, which can take a lot of memory, with no logical point for cleanup.
- We can checkpoint and reset the scan time, but that would force files that have already been successfully processed to be processed again. We want to try to get to exactly-once processing.
###To support consistent region, the core logic of the directory scan needs to be redesigned as follows:
- Periodically, the operator scans the directory and finds all files.
- For example, we find the following files (letters are filenames, ts = timestamp):
E ts: 1000
A ts: 3000
C ts: 2000
B ts: 4000
D ts: 5000
- Sort the files by modification time, so files are processed in order from oldest to newest. For our example, the list is sorted as:
E ts: 1000
C ts: 2000
A ts: 3000
B ts: 4000
D ts: 5000
- For each file, when we submit the filename, we remember two things:
- the file modification time
- the filename
- With this example, if we get to the end of the list:
- lastSubmittedTs = 5000
- lastSubmittedFile = D
- On a subsequent scan, we process the files in order of modification time. If a file's modification time is newer than the lastSubmittedTs from the last scan, the filename is submitted. Otherwise, the file is ignored (see the sketch after this example).
- For example, if the next scan comes back with the following files:
E ts: 1000
C ts: 2000
A ts: 3000
B ts: 4000
D ts: 5000
G ts: 6000
H ts: 6000
F ts: 6000
I ts: 7000
J ts: 8000
- Then files {F, G, H, I, J} will be submitted.
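A sketch of this sorted-scan pass (before the equal-timestamp refinement described later). It uses the Hadoop FileSystem API; the submit helper is an assumption.

```java
import java.util.Arrays;
import java.util.Comparator;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class DirectoryScanPass {
    private long lastSubmittedTs;
    private String lastSubmittedFile;

    void scanOnce(FileSystem fs, Path dir) throws Exception {
        FileStatus[] files = fs.listStatus(dir);
        // Process files from oldest to newest.
        Arrays.sort(files,
                Comparator.comparingLong(FileStatus::getModificationTime));
        for (FileStatus f : files) {
            if (f.getModificationTime() > lastSubmittedTs) {
                submitFileName(f.getPath().toString()); // hypothetical
                lastSubmittedTs = f.getModificationTime();
                lastSubmittedFile = f.getPath().getName();
            }
        }
    }

    private void submitFileName(String name) { }
}
```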
To support consistent region with this new logic, we will do the following:
- The operator can be at the start of a consistent region; we support both periodic and operator-driven modes.
- If operator-driven, a consistent state is established after every tuple is submitted, allowing a consistent state to be established after a file is fully processed.
- If periodic, a consistent state is established based on the periodic setting.
- On Checkpoint: The operator saves:
- lastSubmittedTs - the timestamp of the file that was last submitted
- lastSubmittedFile - the name of the file that was last submitted
- On Reset: The operator restores:
- lastSubmittedTs
- lastSubmittedFile
- After the states are restored, the subsequent scan finds all files that are newer than the last submitted file and submits their filenames.
- For our example, if the following happens:
E ts: 1000
C ts: 2000
A ts: 3000 -> checkpoint
B ts: 4000 -> processed, but no checkpoint yet
D ts: 5000 -> crashed while processing D
G ts: 6000
H ts: 6000
F ts: 6000
I ts: 7000
J ts: 8000
- The operator will replay tuples {B, D, G, H, F, I, J}.
- On ResetToInit: The operator sets:
- lastSubmittedTs = 0
- lastSubmittedFile = null
With this design, the user will get the following behavior:
- The operator can be at the start of a consistent region; both periodic and operator-driven modes are supported.
- If operator-driven, a consistent state is established after every tuple is submitted, allowing a consistent state to be established after a file is fully processed.
- If periodic, a consistent state is established based on the periodic setting.
- Upon application failure, the operator re-submits filenames for files that are newer than the last submitted file from the checkpoint.
This design works if all files have different timestamps. It breaks down when multiple files share the same timestamp, as follows:
- For our example, suppose the following happens:
E ts: 1000
C ts: 2000
A ts: 3000 -> checkpoint
B ts: 4000
D ts: 5000
G ts: 6000 -> checkpoint
H ts: 6000 -> crashed while processing H
F ts: 6000
I ts: 7000
J ts: 8000
- In this case, the checkpoint has G, ts: 6000 saved.
- In the next scan after the reset, the operator finds that 6000 is the latest timestamp. As a result, the scan only submits files {I, J}. But files {H, F} were not yet fully processed and should be replayed.
To get around this problem, we modify the design a little:
- Periodically, the operator scans the directory and finds all files.
- For example, we find the following files (letters are filenames, ts = timestamp):
E ts: 1000
A ts: 3000
C ts: 2000
B ts: 4000
D ts: 5000
- Sort the files by modification time, so files are processed in order from oldest to newest.
- For each file, when we submit the filename, we remember two things:
- the file modification time
- the filename
- With this example, if we get to the end of the list:
- lastSubmittedTs = 5000
- lastSubmittedFile = D
- On a subsequent scan, we process the files in order of modification time.
- For each file we process, we also look ahead at the next file to be processed. If the next file has the same modification time as the current file, we collect all files with the same timestamp.
- In our example:
E ts: 1000
C ts: 2000
A ts: 3000
B ts: 4000
D ts: 5000
G ts: 6000 -> let's say this is the current file
H ts: 6000 -> we look ahead and find H with the same ts
F ts: 6000 -> we collect all files with the same timestamp -> {G, H, F}
I ts: 7000
J ts: 8000
- We will collect files {G, H, F}.
- Next, we sort the collected files alphabetically, which gives us this list:
F ts: 6000
G ts: 6000
H ts: 6000
- We process the files with the same timestamp in alphabetical order.
- In this design, if the operator encounters a file that is newer than the lastSubmittedTs from the last scan, we always submit the tuple.
- If the operator encounters a file whose timestamp is the same as the lastSubmittedTs from the last scan, we only submit the tuple if the filename is > lastSubmittedFile (comparing the actual file names). A sketch of this scan follows the walkthrough below.
- For our example, we now process the files in the following order - notice that the files with timestamp 6000 have changed order:
E ts: 1000
C ts: 2000
A ts: 3000 -> checkpoint
B ts: 4000
D ts: 5000
F ts: 6000 -> checkpoint
G ts: 6000 -> crashed while processing G
H ts: 6000
I ts: 7000
J ts: 8000
When an application failure happens:
- The checkpoint has lastSubmittedFile=F, lastSubmittedTs=6000.
- Reset restores lastSubmittedFile=F, lastSubmittedTs=6000.
- On the subsequent scan, the operator sees files {F, G, H} with the same timestamp as lastSubmittedTs. In this case, the operator only submits tuples {G, H} and does not replay {F}.
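A sketch of the refined scan: sort by (modification time, filename) so equal timestamps are processed alphabetically, and use lastSubmittedFile as the tiebreaker when deciding what to submit after a reset. Helper names are assumptions.

```java
import java.util.Arrays;
import java.util.Comparator;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class OrderedDirectoryScan {
    private long lastSubmittedTs;       // 0 on ResetToInit
    private String lastSubmittedFile;   // null => nothing submitted yet

    void scanOnce(FileSystem fs, Path dir) throws Exception {
        FileStatus[] files = fs.listStatus(dir);
        // Oldest to newest; equal timestamps fall back to alphabetical order.
        Arrays.sort(files, Comparator
                .comparingLong(FileStatus::getModificationTime)
                .thenComparing((FileStatus f) -> f.getPath().getName()));
        for (FileStatus f : files) {
            long ts = f.getModificationTime();
            String name = f.getPath().getName();
            boolean newer = ts > lastSubmittedTs;
            boolean sameTsLaterName = ts == lastSubmittedTs
                    && (lastSubmittedFile == null
                        || name.compareTo(lastSubmittedFile) > 0);
            if (newer || sameTsLaterName) {
                submitFileName(f.getPath().toString()); // hypothetical
                lastSubmittedTs = ts;
                lastSubmittedFile = name;
            }
        }
    }

    private void submitFileName(String name) { }
}
```

With the checkpoint at F (ts 6000), this filter submits G and H on the rescan and skips E through F, matching the walkthrough above.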
The operator allows the user to change the directory to scan via an input port. At this time, I am treating this as a control input port and will not support it in a consistent region.
The user will get an ERROR message if we detect that HDFS2DirectoryScan has an input port and is in a consistent region. The reason for an error message is that the threading model of the directory scan is complex. I am not confident at this time that we can tolerate having the directory come from an input stream; the concern is that it may cause deadlocks due to the various locks the operator has to manage.