Zhu Wu's Blog

The world is a fine place and worth fighting for.

Effective Spark Map-like Transformation with External Resource

Suppose we have a Spark RDD, and we want to transform each record by fetching a value from an external resource/API/service. How can we do this efficiently?
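Throughout this post, `Resource` stands for a client to some external service. The snippets below assume a hypothetical class along these lines (a real one would wrap an HTTP client, a database connection, and so on):

```scala
// A hypothetical stand-in for an external resource/API/service client.
// In practice this would wrap e.g. an HTTP connection pool or a DB client.
class Resource {
  // Expensive setup (allocating memory, opening connections) would happen here.

  // Look up the value for a record via the external service.
  def process(record: String): String = s"processed-$record"

  // Release connections, memory, etc.
  def close(): Unit = ()
}
```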

The most naive way to achieve it can be this:

keyedRDD.map {
  case (key, record) => {
    // Init a new resource.
    val resource = new Resource()

    // Call the external resource for value.
    val result = resource.process(record)

    // Close the resource.
    resource.close()

    // Return the result key value pair.
    (key, result)
  }
}

You may notice that the external resource is initialized for every record in the RDD. If the initialization overhead is large, this can hurt the performance of the whole job. For example, if the external resource requires a large amount of memory or takes a long time to initialize, the Spark job may run into memory pressure or be slowed down significantly.

A better way is to initialize the resource once per RDD partition, using the mapPartitions API. This API provides an iterator over the records of a partition. Thus, we can use a single initialized external resource to process every record in the partition, and return a new iterator at the end:

keyedRDD.mapPartitions(iter => {
  // Init a new resource once for the whole partition.
  val resource = new Resource()

  // Requires: import scala.collection.mutable.ListBuffer
  val buffer = new ListBuffer[(String, String)]
  while (iter.hasNext) {
    val (key, record) = iter.next()

    // Call the external resource for value.
    val result = resource.process(record)

    // Put the result key-value pair into the buffer.
    // Note the double parentheses: `buffer += (key, result)` would not compile,
    // because `+=` would treat `key` and `result` as two separate elements.
    buffer += ((key, result))
  }

  // The whole partition is processed. Close the resource.
  resource.close()

  // Return the result iterator.
  buffer.iterator
})

This solves the resource initialization problem. However, we now use a ListBuffer to hold the result of every record in a partition. If the partition is large, an executor may not have enough memory to hold the entire partition's results, which leads to job failure. To solve this, we can apply the custom iterator pattern:

// The custom iterator is a wrapper of the original one
class CustomIterator(iter: Iterator[(String, String)]) extends Iterator[(String, String)] {
  // Init a new resource.
  // This statement is executed when a new `CustomIterator` object is created.
  val resource = new Resource()

  // It should return the same value as the underlying iterator's `hasNext`.
  override def hasNext: Boolean = {
    val iterHasNext = iter.hasNext
    if (!iterHasNext) {
      // When the underlying iterator is exhausted, close the external resource.
      resource.close()
    }
    iterHasNext
  }

  // Process data in the `next` method.
  override def next(): (String, String) = {
    // Get the current key-value pair from the underlying iterator.
    val (key, record) = iter.next()

    // Call the external resource for value.
    val result = resource.process(record)

    // Return the result key-value pair.
    (key, result)
  }
}

// Use the custom iterator to perform the map transformation.
keyedRDD.mapPartitions(new CustomIterator(_))

The trick here is to wrap the original iterator provided by mapPartitions in the custom iterator. The CustomIterator overrides the hasNext and next methods, as described in the code comments above. During execution, the external resource is initialized when a new CustomIterator object is created. Then mapPartitions internally calls hasNext and next on the CustomIterator object as it consumes the partition, which streams the data processing without any buffer in between. Finally, when the iterator reaches the end, hasNext returns false and the external resource is closed.
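The pattern is easy to test outside Spark, since mapPartitions simply consumes the iterator we return. Here is a minimal self-contained sketch, using a hypothetical Resource stub and a plain Scala iterator standing in for a partition:

```scala
// Hypothetical Resource stub with a flag so we can observe the close call.
class Resource {
  var closed = false
  def process(record: String): String = s"processed-$record"
  def close(): Unit = { closed = true }
}

// Same custom iterator pattern as in the post.
class CustomIterator(iter: Iterator[(String, String)]) extends Iterator[(String, String)] {
  val resource = new Resource()

  override def hasNext: Boolean = {
    val iterHasNext = iter.hasNext
    if (!iterHasNext) resource.close()
    iterHasNext
  }

  override def next(): (String, String) = {
    val (key, record) = iter.next()
    (key, resource.process(record))
  }
}

// Simulate one partition: records are processed lazily, one at a time,
// and the resource is closed once the iterator is exhausted.
val partition = Iterator(("k1", "a"), ("k2", "b"))
val it = new CustomIterator(partition)
val results = it.toList  // List(("k1", "processed-a"), ("k2", "processed-b"))
```

Because no intermediate buffer is materialized, memory usage stays constant regardless of partition size.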