Zhu Wu's Blog

The world is a fine place and worth fighting for.

Understand Flink BucketingSink Implementation

Flink provides us the BucketingSink to sink data to HDFS. This article analyzes the BucketingSink implementation and demonstrates how it supports exactly-once semantics.

The BucketingSink writes files to HDFS. The destination directory of a piece of data in HDFS is /<base_path>/<bucket_path>. A file can be in one of three states: in-progress, pending, or finished.

  • in-progress: The file is currently being written by a sink task. Each task can have at most one in-progress file per bucket. The file name format is <in_progress_prefix><finished_file_name><in_progress_suffix>.
  • pending: The file is closed and waiting for a checkpoint. A file is closed either because it reaches the file size limit or because it has been inactive for some time. The file name format is <pending_prefix><finished_file_name><pending_suffix>.
  • finished: After the checkpoint succeeds, the pending files are moved to finished state. The file name format is <part_prefix>-<task_index>-<rolling_counter><part_suffix>.

The path name segments that appear above are explained below:

  • base_path: The HDFS base path specified when calling BucketingSink constructor.
  • bucket_path: BucketingSink can write data into many buckets. For every new element, the Bucketer determines which bucket_path the element should be written into. The default Bucketer determines bucket_path by system time, and we can implement our own bucketing strategy by providing a customized Bucketer.
  • task_index: Multiple sink tasks can run in parallel, and task_index denotes which task wrote the file. By design, a task writes to at most one file per bucket at any time.
  • rolling_counter: BucketingSink performs file rolling. It closes the current file if the file size exceeds a threshold (default 384 MiB) or the file has been idle for some time (default 1 minute). After a roll, data is written to a new file and the rolling counter is incremented by 1.
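As an illustration of the naming scheme, the sketch below builds the three file names for a given task index and rolling counter. The prefix and suffix values are, to the best of my knowledge, Flink's defaults; all of them are configurable on the sink.

```java
// Sketch of the BucketingSink file naming scheme described above.
// Prefix/suffix constants are assumed defaults, not read from Flink.
public class BucketFileNames {
    static final String IN_PROGRESS_PREFIX = "_";             // assumed default
    static final String IN_PROGRESS_SUFFIX = ".in-progress";  // assumed default
    static final String PENDING_PREFIX = "_";                 // assumed default
    static final String PENDING_SUFFIX = ".pending";          // assumed default
    static final String PART_PREFIX = "part";                 // assumed default
    static final String PART_SUFFIX = "";                     // assumed default (empty)

    // finished file: <part_prefix>-<task_index>-<rolling_counter><part_suffix>
    static String finishedName(int taskIndex, int rollingCounter) {
        return PART_PREFIX + "-" + taskIndex + "-" + rollingCounter + PART_SUFFIX;
    }

    // in-progress file: <in_progress_prefix><finished_file_name><in_progress_suffix>
    static String inProgressName(int taskIndex, int rollingCounter) {
        return IN_PROGRESS_PREFIX + finishedName(taskIndex, rollingCounter) + IN_PROGRESS_SUFFIX;
    }

    // pending file: <pending_prefix><finished_file_name><pending_suffix>
    static String pendingName(int taskIndex, int rollingCounter) {
        return PENDING_PREFIX + finishedName(taskIndex, rollingCounter) + PENDING_SUFFIX;
    }

    public static void main(String[] args) {
        // task 2, third roll
        System.out.println(inProgressName(2, 3)); // _part-2-3.in-progress
        System.out.println(pendingName(2, 3));    // _part-2-3.pending
        System.out.println(finishedName(2, 3));   // part-2-3
    }
}
```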

A Flink program is compiled to a JobGraph during execution and then deployed as several StreamTasks which run in parallel. Each StreamTask executes an operator chain. The BucketingSink is executed by the StreamSink operator. When a StreamTask runs a BucketingSink, it calls BucketingSink#initializeState and BucketingSink#open to initialize it, and then calls BucketingSink#invoke to run it. Upon a checkpoint, BucketingSink#snapshotState is called. After the checkpoint succeeds, Flink sends a notification and BucketingSink#notifyCheckpointComplete is called. Let's look into these methods and drill into the details.

1. BucketingSink#initializeState

This method performs the following two actions:

  • Initialize HDFS file system by calling BucketingSink#initFileSystem.
  • Restore from a checkpoint by calling BucketingSink#handleRestoredBucketState (we will look into this method later in this article).

2. BucketingSink#open

A timer is registered here to check periodically whether in-progress files have been inactive for some time (default is 1 minute).

3. BucketingSink#onProcessingTime

When the timer times out, this method is called. It checks the last modified time of each in-progress file and calls BucketingSink#closeCurrentPartFile (this method will be described later) if the file's inactive time reaches the threshold. Finally, a new timer is registered, and this method will be called again when it times out.
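The inactivity test itself boils down to a single comparison. A minimal sketch (the method and parameter names here are illustrative, not Flink's actual field names):

```java
public class InactivityCheck {
    // Sketch of the inactivity test in BucketingSink#onProcessingTime:
    // a part file is closed if it has not been written to for longer
    // than the inactivity threshold (default 60 seconds).
    static boolean shouldClose(long nowMillis, long lastWrittenToTime, long thresholdMillis) {
        return nowMillis - lastWrittenToTime > thresholdMillis;
    }

    public static void main(String[] args) {
        long threshold = 60_000L; // 1 minute, the default
        System.out.println(shouldClose(100_000L, 30_000L, threshold)); // true: idle for 70s
        System.out.println(shouldClose(100_000L, 90_000L, threshold)); // false: idle for 10s
    }
}
```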

4. BucketingSink#invoke

The following actions are performed in this method:

4.1. Get bucket path from bucketer
Path bucketPath = bucketer.getBucketPath(clock, new Path(basePath), value);
4.1.1. Bucketer
A Bucketer implementation needs to implement Path getBucketPath(Clock clock, Path basePath, T element). It receives the current system time, the base path of the BucketingSink, and the element read from the BucketingSink input, and returns the bucket path.
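As a sketch of the idea (using plain Java types in place of Flink's Clock and org.apache.hadoop.fs.Path so the snippet stays self-contained), a time-based bucketer similar in spirit to the default DateTimeBucketer could look like:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Simplified analog of Flink's Bucketer interface; the real interface
// uses Flink's Clock and Hadoop's Path types instead of long/String.
interface SimpleBucketer<T> {
    String getBucketPath(long currentTimeMillis, String basePath, T element);
}

// Buckets elements by wall-clock hour, similar in spirit to the default
// DateTimeBucketer; the element itself is ignored here.
class HourlyBucketer<T> implements SimpleBucketer<T> {
    private static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    @Override
    public String getBucketPath(long currentTimeMillis, String basePath, T element) {
        return basePath + "/" + FMT.format(Instant.ofEpochMilli(currentTimeMillis));
    }
}

public class BucketerDemo {
    public static void main(String[] args) {
        SimpleBucketer<String> bucketer = new HourlyBucketer<>();
        // 1609459200000L is 2021-01-01T00:00:00Z
        System.out.println(bucketer.getBucketPath(1609459200000L, "/base", "some-record"));
        // /base/2021-01-01--00
    }
}
```

A custom Bucketer could just as well key on a field of the element (e.g. a user ID) instead of the clock.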
4.2. Get bucket state of the bucket. Create a new one if not available.
A task can write into many buckets at a time, but into at most one file per bucket. A BucketState instance keeps track of the internal state of one bucket the task writes into.
BucketState bucketState = state.getBucketState(bucketPath);
if (bucketState == null) {
    bucketState = new BucketState<>(currentProcessingTime);
    state.addBucketState(bucketPath, bucketState);
}
4.2.1. BucketingSink.BucketState
This is used for keeping track of the current in-progress file, closed files waiting for checkpoint, and files waiting for checkpoint success notification for a bucket. The following are kept in BucketState:
  • currentFile: The file currently being written, i.e., the in-progress file.
  • currentFileValidLength: The in-progress file's length at the time of the last checkpoint.
  • lastWrittenToTime: The time that the in-progress file was last written.
  • partCounter: The rolling counter of the in-progress file in the bucket.
  • pendingFiles: Pending files since the last checkpoint.
  • pendingFilesPerCheckpoint: The files waiting for checkpoint success notification.
  • writer: The writer that writes data to file.
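The fields above can be pictured as a plain data holder. This is a simplified sketch, not the real class, which also deals with state serialization and holds the live writer:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of BucketingSink.BucketState: one instance per bucket,
// tracking the in-progress file and the files moving toward "finished".
class SimpleBucketState {
    String currentFile;                 // in-progress file path, null if none
    long currentFileValidLength = -1;   // valid length at the last checkpoint
    long lastWrittenToTime;             // used by the inactivity check
    int partCounter;                    // rolling counter for this bucket
    List<String> pendingFiles = new ArrayList<>();
    // checkpoint id -> files waiting for that checkpoint's success notification
    Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
    // the writer field is omitted here: it is a live object, not persisted state
}

public class BucketStateDemo {
    public static void main(String[] args) {
        SimpleBucketState s = new SimpleBucketState();
        System.out.println(s.currentFileValidLength); // -1: no checkpointed length yet
    }
}
```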
4.3. Check whether the current in-progress file has reached the file size limit. Perform file rolling if it has.
if (shouldRoll(bucketState)) {
    openNewPartFile(bucketPath, bucketState);
}
4.3.1. BucketingSink#shouldRoll
It checks whether a new part file should be created. A new file is needed if the writer was closed (because the file was inactive) or the file size reached the limit.
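The decision reduces to two checks. A sketch (parameter names are illustrative):

```java
// Sketch of the BucketingSink#shouldRoll logic: roll when there is no open
// writer (e.g. it was closed by the inactivity timer) or the part file has
// grown past the batch size limit.
public class RollCheck {
    static boolean shouldRoll(boolean writerOpen, long writePosition, long batchSize) {
        if (!writerOpen) {
            return true;                  // writer was closed, need a fresh file
        }
        return writePosition > batchSize; // file size limit reached
    }

    public static void main(String[] args) {
        long batchSize = 384L * 1024 * 1024; // default 384 MiB
        System.out.println(shouldRoll(false, 0, batchSize));            // true
        System.out.println(shouldRoll(true, batchSize + 1, batchSize)); // true
        System.out.println(shouldRoll(true, 1024, batchSize));          // false
    }
}
```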
4.3.2. BucketingSink#openNewPartFile
It closes the current file and opens a new one.
  • Call BucketingSink#closeCurrentPartFile. During this call, the writer is closed and the in-progress file is converted to a pending file.
  • Then, it constructs the path for the next in-progress file, prepares the corresponding writer, and updates currentFile and writer in the bucket state.
4.4. Write data into the file
bucketState.writer.write(value);

5. BucketingSink#snapshotState

During the state snapshot, the following actions are carried out:

  • Flush the writer of each bucket and record the resulting in-progress file length as the valid length in its bucket state.
  • Record pending files of the bucket for this checkpoint in bucketState.pendingFilesPerCheckpoint, and then empty bucketState.pendingFiles.
  • Save the bucket state to the operator state for further persisting and processing.
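The bookkeeping behind these steps can be sketched as follows (a simplified model of a single bucket; flushing the actual writer and persisting the operator state are elided):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SnapshotSketch {
    // Minimal per-bucket state for the sketch.
    static class Bucket {
        long currentFileValidLength = -1;
        List<String> pendingFiles = new ArrayList<>();
        Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
    }

    // Sketch of BucketingSink#snapshotState for one bucket: after the writer
    // is flushed, record the flushed length as the valid length, park the
    // pending files under this checkpoint's id, and clear the pending list.
    static void snapshotBucket(Bucket b, long checkpointId, long flushedLength) {
        b.currentFileValidLength = flushedLength;
        b.pendingFilesPerCheckpoint.put(checkpointId, new ArrayList<>(b.pendingFiles));
        b.pendingFiles.clear(); // they now await the checkpoint success notification
    }

    public static void main(String[] args) {
        Bucket b = new Bucket();
        b.pendingFiles.add("_part-0-1.pending");
        snapshotBucket(b, 7L, 1234L);
        System.out.println(b.currentFileValidLength);            // 1234
        System.out.println(b.pendingFiles.isEmpty());            // true
        System.out.println(b.pendingFilesPerCheckpoint.get(7L)); // [_part-0-1.pending]
    }
}
```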

6. BucketingSink#notifyCheckpointComplete

After a checkpoint completes, all the pending files in bucketState.pendingFilesPerCheckpoint whose corresponding checkpoint ID is less than or equal to the notified one are converted to finished, and their entries in bucketState.pendingFilesPerCheckpoint are removed. Finally, the bucket state itself is removed if its writer is closed and both pendingFiles and pendingFilesPerCheckpoint are empty.
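A sketch of this bookkeeping, where the actual rename from the pending name to the finished name is represented by collecting the paths:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class NotifySketch {
    // Sketch of BucketingSink#notifyCheckpointComplete: every pending file
    // recorded under a checkpoint id <= the notified one is moved to the
    // finished state (here: collected), and those entries are removed.
    static List<String> onCheckpointComplete(
            Map<Long, List<String>> pendingFilesPerCheckpoint,
            long notifiedCheckpointId) {
        List<String> finished = new ArrayList<>();
        Iterator<Map.Entry<Long, List<String>>> it =
            pendingFilesPerCheckpoint.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> e = it.next();
            if (e.getKey() <= notifiedCheckpointId) {
                finished.addAll(e.getValue()); // the real code renames the files here
                it.remove();
            }
        }
        return finished;
    }

    public static void main(String[] args) {
        Map<Long, List<String>> perCheckpoint = new HashMap<>();
        perCheckpoint.put(5L, List.of("_part-0-1.pending"));
        perCheckpoint.put(6L, List.of("_part-0-2.pending"));
        perCheckpoint.put(7L, List.of("_part-0-3.pending"));
        List<String> finished = onCheckpointComplete(perCheckpoint, 6L);
        System.out.println(finished.size());        // 2: checkpoints 5 and 6
        System.out.println(perCheckpoint.keySet()); // [7]
    }
}
```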

7. BucketingSink#handleRestoredBucketState

Now, let's go back to this method which is called in BucketingSink#initializeState to understand how it guarantees exactly-once semantics when recovering from a checkpoint.

7.1. Clear pendingFiles of the bucket state. Since we are restarting from a successful checkpoint, this field should be empty.
restoredState.pendingFiles.clear();
7.2. Deal with the in-progress file at the checkpoint.
For an in-progress file, only the bytes from the beginning of the file up to the checkpointed length (which is recorded in currentFileValidLength of the bucket state) are valid, so we need to remove the invalid bytes.
handlePendingInProgressFile(restoredState.currentFile, restoredState.currentFileValidLength);
7.2.1. BucketingSink#handlePendingInProgressFile
  • Find the current path of the file. It can now be in in-progress, pending, or finished state; move it to finished state.
  • Use reflection to determine whether the file system is capable of truncating files. If the truncate function is available (Hadoop 2.7+), the file is truncated to its valid length. Otherwise, a valid-length file following the same naming pattern as the states above (<valid_length_prefix><finished_file_name><valid_length_suffix>) is written to the bucket path, and its content is the currentFileValidLength in the bucket state. When processing the file in the future, only the bytes up to the valid length should be read to preserve exactly-once semantics.
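The two recovery strategies can be sketched on the local file system (the real implementation targets HDFS; the valid-length marker naming below assumes a "_" prefix and ".valid-length" suffix, which I believe are Flink's defaults):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TruncateSketch {
    // Strategy 1 (Hadoop 2.7+ path): physically truncate the file so only
    // the checkpointed bytes remain.
    static void truncateTo(Path file, long validLength) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.WRITE)) {
            ch.truncate(validLength); // discard bytes written after the checkpoint
        }
    }

    // Strategy 2 (fallback): leave the file as-is and write a side file
    // recording the valid length; readers must stop at that offset.
    static Path writeValidLengthFile(Path file, long validLength) throws IOException {
        Path marker = file.resolveSibling("_" + file.getFileName() + ".valid-length");
        Files.write(marker, String.valueOf(validLength).getBytes(StandardCharsets.UTF_8));
        return marker;
    }

    public static void main(String[] args) throws IOException {
        Path f = Files.createTempFile("part-0-0", null);
        Files.write(f, "valid-bytes|garbage-after-crash".getBytes(StandardCharsets.UTF_8));
        truncateTo(f, 11); // keep only "valid-bytes"
        System.out.println(Files.size(f)); // 11
        Path marker = writeValidLengthFile(f, 11);
        System.out.println(Files.readString(marker)); // 11
    }
}
```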
7.3. Reset the flag for current file in bucket state.
restoredState.currentFile = null;
restoredState.currentFileValidLength = -1;
7.4. Process pendingFilesPerCheckpoint of the bucket state.
handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
restoredState.pendingFilesPerCheckpoint.clear();

The field pendingFilesPerCheckpoint contains all the files waiting for the completion notification of the restored checkpoint, and possibly of earlier ones. The notification might be lost if a failure happens just after a checkpoint completes, so pendingFilesPerCheckpoint might not be empty and we need to process it. Since we are restoring from a successful checkpoint, all the files in pendingFilesPerCheckpoint are guaranteed to have been written successfully. Thus, they are all moved to finished state, and pendingFilesPerCheckpoint is cleared.

We have now gone through the major components of BucketingSink and their implementations, and discussed how it ensures exactly-once semantics by snapshotting its state and recovering from it.