Understand Flink BucketingSink Implementation
Flink provides the BucketingSink to write data to HDFS. This article analyzes the BucketingSink implementation and demonstrates how it supports exactly-once semantics.
The BucketingSink writes files to HDFS. The destination directory of a piece of data in HDFS is `/<base_path>/<bucket_path>`. A file can be in one of three states: in-progress, pending, or finished.

- in-progress: The file is currently being written into by a sink task. Each task can have at most one in-progress file per bucket. The file name format is `<in_progress_prefix><finished_file_name><in_progress_suffix>`.
- pending: The file is closed and waiting for a checkpoint. A file is closed either because it reached the file size limit or because it was inactive for some time. The file name format is `<pending_prefix><finished_file_name><pending_suffix>`.
- finished: After the checkpoint succeeds, pending files are moved to the finished state. The file name format is `<part_prefix>-<task_index>-<rolling_counter><part_suffix>`.
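The three states map directly to three naming patterns built around the same finished file name. The sketch below illustrates this; the prefix/suffix values shown are assumptions based on the library's defaults (`_`, `.in-progress`, `.pending`, `part`, and an empty part suffix), and all of them are configurable on the real sink.

```java
// Sketch of how BucketingSink-style file names are derived per state.
// The concrete prefix/suffix constants below are assumed defaults, not
// taken from this article; all are configurable in the real sink.
public class PartFileNames {
    static final String IN_PROGRESS_PREFIX = "_";
    static final String IN_PROGRESS_SUFFIX = ".in-progress";
    static final String PENDING_PREFIX = "_";
    static final String PENDING_SUFFIX = ".pending";
    static final String PART_PREFIX = "part";
    static final String PART_SUFFIX = ""; // empty by default

    // finished: <part_prefix>-<task_index>-<rolling_counter><part_suffix>
    static String finished(int taskIndex, int rollingCounter) {
        return PART_PREFIX + "-" + taskIndex + "-" + rollingCounter + PART_SUFFIX;
    }

    // in-progress: <in_progress_prefix><finished_file_name><in_progress_suffix>
    static String inProgress(int taskIndex, int rollingCounter) {
        return IN_PROGRESS_PREFIX + finished(taskIndex, rollingCounter) + IN_PROGRESS_SUFFIX;
    }

    // pending: <pending_prefix><finished_file_name><pending_suffix>
    static String pending(int taskIndex, int rollingCounter) {
        return PENDING_PREFIX + finished(taskIndex, rollingCounter) + PENDING_SUFFIX;
    }
}
```

Because the pending and in-progress names are pure decorations of the finished name, moving a file between states is just a rename.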
The path name segments that appear above are:

- `base_path`: The HDFS base path specified in the BucketingSink constructor.
- `bucket_path`: A BucketingSink can write data into many buckets. For every new element, the Bucketer determines which `bucket_path` the element should be written into. The default Bucketer derives the `bucket_path` from the system time, and we can implement our own bucketing strategy by providing a custom Bucketer.
- `task_index`: Multiple sink tasks can run in parallel; this index denotes which task wrote the file. By design, a task writes to at most one file per bucket at any time.
- `rolling_counter`: BucketingSink performs file rolling: it closes the current file when the file size exceeds a threshold (default 384 MiB) or the file has been idle for some time (default 1 minute). After rolling, data is written to a new file and the rolling counter is incremented by 1.
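To make the time-based default concrete, here is a simplified sketch of a Bucketer-style strategy that derives the bucket path from the current processing time, so all elements arriving within the same hour land in the same directory under the base path. The `"yyyy-MM-dd--HH"` pattern is an assumption mirroring the default DateTimeBucketer; the class and method names are illustrative only.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Illustrative sketch of a time-based bucketing strategy: the bucket path
// is derived from processing time, in the spirit of the default Bucketer.
public class TimeBucket {
    // Assumed format string, modelled after the default DateTimeBucketer.
    static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    // Returns /<base_path>/<bucket_path> for the given processing time.
    static String bucketPath(String basePath, long processingTimeMillis) {
        return basePath + "/" + FMT.format(Instant.ofEpochMilli(processingTimeMillis));
    }
}
```

Any deterministic function of the element (or of time) works as a bucketing strategy, e.g. bucketing by a key field instead of the clock.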
A Flink program is compiled into a JobGraph and then deployed as several StreamTasks that run in parallel. Each StreamTask executes an operator chain, and the BucketingSink is executed by the StreamSink operator. When a StreamTask runs a BucketingSink, it calls `BucketingSink#initializeState` and `BucketingSink#open` to initialize it, and then calls `BucketingSink#invoke` to process data. Upon a checkpoint, `BucketingSink#snapshotState` is called. After the checkpoint succeeds, Flink sends a notification and `BucketingSink#notifyCheckpointComplete` is called. Let's look into each of these methods in detail.
1. BucketingSink#initializeState
This method performs the following two actions:
- Initialize the HDFS file system by calling `BucketingSink#initFileSystem`.
- Restore state from a checkpoint by calling `BucketingSink#handleRestoredBucketState` (we will look into this method later in this article).
2. BucketingSink#open
A timer is registered here to check whether in-progress files are inactive after a period of time (default is 1 minute).
3. BucketingSink#onProcessingTime
When the timer fires, this method is called. It checks the last modified time of each in-progress file and calls `BucketingSink#closeCurrentPartFile` (described later) for every file whose inactive time has reached the threshold. At the end, a new timer is registered, so this method is called again on the next timeout.
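The inactivity check above can be sketched as a simple scan over the buckets' last-write timestamps. All names here are illustrative (not the actual BucketingSink fields), and the threshold is the 1-minute default mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of the periodic inactivity check: any bucket whose in-progress
// file has not been written to within the threshold is reported so its
// part file can be closed; the real sink then re-registers the timer.
public class InactivityCheck {
    static final long INACTIVE_THRESHOLD_MS = 60_000; // default: 1 minute

    // lastWrittenToTime: bucket path -> last write time of its in-progress file
    static List<String> bucketsToClose(Map<String, Long> lastWrittenToTime, long now) {
        List<String> toClose = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastWrittenToTime.entrySet()) {
            if (now - e.getValue() >= INACTIVE_THRESHOLD_MS) {
                toClose.add(e.getKey()); // the real sink calls closeCurrentPartFile here
            }
        }
        return toClose;
    }
}
```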
4. BucketingSink#invoke
The following actions are performed in this method:
- 4.1. Get the bucket path from the Bucketer:

  `Path bucketPath = bucketer.getBucketPath(clock, new Path(basePath), value);`

  - 4.1.1. Bucketer: A Bucketer implementation needs to implement `Path getBucketPath(Clock clock, Path basePath, T element)`. It receives the current system time, the base path of the BucketingSink, and the value of the element read from the BucketingSink input. The return value is the bucket path.
- 4.2. Get the bucket state of the bucket, creating a new one if not present. A task can write into many buckets at a time but writes to at most one file in each bucket. A BucketState instance keeps track of the internal state of a bucket the task writes into.

  ```java
  BucketState bucketState = state.getBucketState(bucketPath);
  if (bucketState == null) {
      bucketState = new BucketState<>(currentProcessingTime);
      state.addBucketState(bucketPath, bucketState);
  }
  ```
  - 4.2.1. BucketingSink.BucketState: For one bucket, this keeps track of the current in-progress file, closed files waiting for a checkpoint, and files waiting for the checkpoint success notification. The following fields are kept in BucketState:
    - `currentFile`: The file currently being written into, i.e., the in-progress file.
    - `currentFileValidLength`: The in-progress file's valid length as of the last checkpoint.
    - `lastWrittenToTime`: The time the in-progress file was last written to.
    - `partCounter`: The rolling counter of the in-progress file in the bucket.
    - `pendingFiles`: Pending files accumulated since the last checkpoint.
    - `pendingFilesPerCheckpoint`: The files waiting for checkpoint success notification.
    - `writer`: The writer that writes data to the file.
- 4.3. Check whether the file currently being written into has reached the file size limit, and perform file rolling if so.

  ```java
  if (shouldRoll(bucketState)) {
      openNewPartFile(bucketPath, bucketState);
  }
  ```

  - 4.3.1. BucketingSink#shouldRoll: Checks whether a new file should be created. A new file is needed if the writer was closed (due to file inactivity) or the file size has reached the limit.
  - 4.3.2. BucketingSink#openNewPartFile: Closes the current file and opens a new one. It first calls `BucketingSink#closeCurrentPartFile`, which closes the writer and converts the in-progress file into a pending file. Then it constructs the path for the next in-progress file, prepares the corresponding writer, and updates `currentFile` and `writer` in the bucket state.
- 4.4. Write the data into the file:

  `bucketState.writer.write(value);`
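The per-element flow of steps 4.2 through 4.4 can be sketched as follows, with plain strings standing in for HDFS paths and writers. All names and the simplified BucketState are illustrative; only the control flow (get-or-create state, roll on closed writer or size limit, then write) mirrors the description above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of invoke's per-element flow. The caller has already
// resolved the bucket path via the Bucketer (step 4.1).
public class InvokeSketch {
    static final long BATCH_SIZE_BYTES = 384L * 1024 * 1024; // default size limit

    static class BucketState {
        String currentFile;   // in-progress file, null if none is open
        long currentBytes;    // bytes written to the current file
        int partCounter;      // rolling counter
        List<String> pendingFiles = new ArrayList<>();
    }

    final Map<String, BucketState> state = new HashMap<>();
    final int taskIndex;

    InvokeSketch(int taskIndex) { this.taskIndex = taskIndex; }

    void invoke(String bucketPath, byte[] value) {
        // 4.2: get the bucket state, creating it if absent
        BucketState bs = state.computeIfAbsent(bucketPath, p -> new BucketState());
        // 4.3: roll if the writer is closed or the size limit is reached
        if (bs.currentFile == null || bs.currentBytes >= BATCH_SIZE_BYTES) {
            if (bs.currentFile != null) {
                bs.pendingFiles.add(bs.currentFile); // closed file becomes pending
            }
            bs.currentFile = bucketPath + "/part-" + taskIndex + "-" + bs.partCounter++;
            bs.currentBytes = 0;
        }
        // 4.4: write the element to the current in-progress file
        bs.currentBytes += value.length;
    }
}
```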
5. BucketingSink#snapshotState
During the state snapshot, the following actions are carried out:

- Flush the writer for each bucket and record the in-progress file's current valid length in its bucket state.
- Record the bucket's pending files for this checkpoint in `bucketState.pendingFilesPerCheckpoint`, and then empty `bucketState.pendingFiles`.
- Save the bucket state into the operator state for persisting and later processing.
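The per-bucket bookkeeping of a snapshot can be sketched as below: the flushed length becomes the valid length, and the pending files are filed under the checkpoint ID. The `Bucket` class and field names are a simplified stand-in for the real BucketState, not its actual definition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of snapshotState's bookkeeping for one bucket.
public class SnapshotSketch {
    static class Bucket {
        long currentFileValidLength = -1;
        List<String> pendingFiles = new ArrayList<>();
        Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
    }

    static void snapshot(Bucket b, long checkpointId, long flushedLength) {
        // Record the in-progress file's valid length (after writer.flush()).
        b.currentFileValidLength = flushedLength;
        // Move the pending files under this checkpoint's ID and reset the list.
        b.pendingFilesPerCheckpoint.put(checkpointId, new ArrayList<>(b.pendingFiles));
        b.pendingFiles.clear();
    }
}
```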
6. BucketingSink#notifyCheckpointComplete
After a checkpoint is completed, all the pending files in `bucketState.pendingFilesPerCheckpoint` whose checkpoint ID is less than or equal to the notified one are converted to the finished state, and the converted entries are removed from `bucketState.pendingFilesPerCheckpoint`. Finally, the bucket state itself is removed if its `writer` is closed and both `pendingFiles` and `pendingFilesPerCheckpoint` are empty.
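The notification handling for one bucket can be sketched as an iteration over the per-checkpoint map. In the real sink, "finalizing" a file means renaming it from its pending name to its finished name; here the rename is only noted in a comment and the names are illustrative.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Sketch of notifyCheckpointComplete for one bucket: finalize every set of
// pending files recorded under a checkpoint ID up to the completed one.
public class NotifySketch {
    static List<String> finalizeUpTo(Map<Long, List<String>> pendingFilesPerCheckpoint,
                                     long completedCheckpointId) {
        List<String> finished = new ArrayList<>();
        Iterator<Map.Entry<Long, List<String>>> it =
            pendingFilesPerCheckpoint.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> e = it.next();
            if (e.getKey() <= completedCheckpointId) {
                finished.addAll(e.getValue()); // rename pending -> finished here
                it.remove();                   // drop the processed entry
            }
        }
        return finished;
    }
}
```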
7. BucketingSink#handleRestoredBucketState
Now, let's go back to this method which is called in BucketingSink#initializeState
to understand how it guarantees exactly-once semantics when recovering from a checkpoint.
- 7.1. Clear `pendingFiles` of the bucket state. Since we are restarting from a successful checkpoint, this field should be empty anyway.

  `restoredState.pendingFiles.clear();`
- 7.2. Deal with the file that was in progress at the checkpoint. For an in-progress file, only the bytes from the beginning of the file up to the checkpointed length (recorded in `currentFileValidLength` of the bucket state) are valid, so the invalid trailing bytes must be removed.

  `handlePendingInProgressFile(restoredState.currentFile, restoredState.currentFileValidLength);`

  - 7.2.1. BucketingSink#handlePendingInProgressFile:
    - Find the path of the file. The file may be in the in-progress, pending, or finished state by now. Move the file to the finished state.
    - Use reflection to determine whether the file system is capable of truncating files. If the truncate function is available (Hadoop 2.7+), the file is truncated to its valid length. Otherwise, a separate valid-length file is written to the bucket path whose content is the `currentFileValidLength` from the bucket state. When processing the file later, downstream readers must only consume bytes up to this valid length to preserve exactly-once semantics.
- 7.3. Reset the current-file fields in the bucket state.

  ```java
  restoredState.currentFile = null;
  restoredState.currentFileValidLength = -1;
  ```

- 7.4. Process `pendingFilesPerCheckpoint` of the bucket state.

  ```java
  handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
  restoredState.pendingFilesPerCheckpoint.clear();
  ```
The field `pendingFilesPerCheckpoint` contains all the files waiting for the checkpoint-complete notification of the restored checkpoint, and possibly of earlier ones. The notification might be lost if a failure happens just after a checkpoint completes, so `pendingFilesPerCheckpoint` may not be empty and its entries need to be processed. Since we are restoring from a successful checkpoint, all the files in `pendingFilesPerCheckpoint` are guaranteed to have been written successfully. Thus, they are all moved to the finished state, and `pendingFilesPerCheckpoint` is cleared.
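The truncate-versus-valid-length decision from step 7.2.1 can be illustrated by modelling the file as a byte array. This is only a sketch of the recovery rule, not the HDFS code: with truncate support the file itself is cut back to the checkpointed valid length; without it, the file is left untouched and a side marker records how many leading bytes readers may trust.

```java
import java.util.Arrays;

// Sketch of the two recovery strategies for an in-progress file.
public class RecoverInProgress {
    // Truncate path (Hadoop 2.7+): keep only the first validLength bytes.
    static byte[] truncateTo(byte[] file, int validLength) {
        return Arrays.copyOfRange(file, 0, validLength);
    }

    // Fallback path: the valid-length marker file's content is just the number;
    // readers must stop after that many bytes of the original file.
    static String validLengthMarker(long validLength) {
        return String.valueOf(validLength);
    }
}
```

Either way, bytes written after the checkpoint are discarded or ignored on replay, which is what makes the recovery exactly-once rather than at-least-once.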
We have now gone through the major components of BucketingSink and their implementations, and discussed how it ensures exactly-once semantics by snapshotting its state and recovering from it.