Masoud Kalali's Blog

My thoughts on software engineering and beyond…


A walkthrough for the fork/join framework introduced in Java SE 7

Java SE 7 brought some neat features to the table for Java developers. One of them is the fork/join framework, a new parallel programming framework that we can use to easily implement divide-and-conquer solutions. To use the fork/join framework effectively, a solution to a problem should be devised with the following characteristics:

  • The problem domain, whether it is a file, a list, etc. to be processed or a computation, should be divisible into smaller subtasks.
  • It should be possible to process each chunk without requiring the results of other chunks.

To summarize, solving or processing the problem domain should require no self-feedback for the framework to be usable. For example, if you want to process a list and processing each element requires the result of processing the previous element, you cannot simply split the list into independent chunks and process them in parallel. Likewise, if the FFT-based processing you apply over a sound stream requires feedback from the previous pulses to process each pulse, you cannot speed it up with the fork/join framework.
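To make the "no self-feedback" requirement concrete, here is a small sketch; the class and method names are hypothetical and not part of any API. Squaring each element is independent, so processing two halves separately and concatenating them gives the same result as processing the whole array. A running sum makes each output depend on the previous one, so the same naive chunking gives a wrong answer (specialised parallel-prefix algorithms exist for this case, but plain chunking does not work).

```java
import java.util.Arrays;

// Hypothetical illustration: independent vs. feedback-dependent processing.
public class FeedbackDemo {

    // Independent: out[i] depends only on in[i], so any chunk can be processed on its own.
    static long[] squares(long[] in, int from, int to) {
        long[] out = new long[to - from];
        for (int i = from; i < to; i++) {
            out[i - from] = in[i] * in[i];
        }
        return out;
    }

    // Feedback: out[i] depends on out[i - 1] (a running sum), so a chunk
    // cannot be finished correctly without the result of the chunk before it.
    static long[] runningSum(long[] in, int from, int to) {
        long[] out = new long[to - from];
        long acc = 0; // a chunk processed on its own has to start from 0
        for (int i = from; i < to; i++) {
            acc += in[i];
            out[i - from] = acc;
        }
        return out;
    }

    // Concatenates two arrays, as a "combine" step would.
    static long[] concat(long[] a, long[] b) {
        long[] r = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, r, a.length, b.length);
        return r;
    }

    public static void main(String[] args) {
        long[] data = {1, 2, 3, 4, 5, 6};
        int mid = data.length / 2;

        // Process the two halves independently, then combine them.
        long[] sqSplit = concat(squares(data, 0, mid), squares(data, mid, data.length));
        long[] sumSplit = concat(runningSum(data, 0, mid), runningSum(data, mid, data.length));

        System.out.println(Arrays.equals(sqSplit, squares(data, 0, data.length)));     // true
        System.out.println(Arrays.equals(sumSplit, runningSum(data, 0, data.length))); // false
    }
}
```

The split running sum yields 1, 3, 6, 4, 9, 15 instead of 1, 3, 6, 10, 15, 21: the second half never saw the total of the first half.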

Before we start learning the fork/join framework, we had better know what it is and what it is not.

What the fork/join framework is:

  • A parallel programming framework for Java
  • Part of Java SE 7
  • Suitable for implementing parallel processing solutions, mostly data-intensive ones, with few or no shared resources between the workers that process the data chunks.
  • Suitable when no synchronization is required between the workers

What the fork/join framework is not:

  • It is not magic that makes your code run fast on machines with multiple processors; you need to think about and implement your solutions in a parallel manner.
  • It is not hard and obscure like some other frameworks (MPI, for example). Using the framework is way easier than anything I have used before.

If you want to learn the mechanics behind the fork/join framework, you can read the original article by Doug Lea, which explains the motivation and the design. The article is available at http://gee.cs.oswego.edu/dl/papers/fj.pdf. If you want to see how to use the framework, read on.

First, let’s see which classes one needs to know to implement a divide-and-conquer solution using the fork/join framework; then we will start using them.

  • The ForkJoinPool: the pool of worker threads to which you submit your ForkJoinTask instances for execution. By default, its parallelism level is the number of processors available to the runtime.
  • The RecursiveTask<V>: a subclass of ForkJoinTask whose compute() method returns a value of type V, for example processing a list of DTOs and returning the processed results.
  • The RecursiveAction: another subclass of ForkJoinTask, one without any return value, for example processing an array…
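As a minimal sketch of how these three classes fit together: the class names SumTask and DoubleAction and the threshold of 1000 are illustrative choices, not part of the API. The RecursiveAction doubles an array in place and returns nothing; the RecursiveTask sums an array and returns a Long. Both split their range in half until it is small enough to process sequentially, and both are run through a ForkJoinPool created with the default parallelism.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

public class ForkJoinBasics {

    // Illustrative cutoff below which a range is processed sequentially.
    static final int THRESHOLD = 1000;

    // RecursiveTask<V>: returns a value (here, the sum of data[from, to)).
    static class SumTask extends RecursiveTask<Long> {
        final long[] data;
        final int from, to;

        SumTask(long[] data, int from, int to) {
            this.data = data; this.from = from; this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) sum += data[i];
                return sum;
            }
            int mid = from + (to - from) / 2;
            SumTask left = new SumTask(data, from, mid);
            left.fork();                                       // schedule the left half asynchronously
            long right = new SumTask(data, mid, to).compute(); // do the right half in this thread
            return left.join() + right;                        // wait for the left half and combine
        }
    }

    // RecursiveAction: returns nothing (here, it doubles every element in place).
    static class DoubleAction extends RecursiveAction {
        final long[] data;
        final int from, to;

        DoubleAction(long[] data, int from, int to) {
            this.data = data; this.from = from; this.to = to;
        }

        @Override
        protected void compute() {
            if (to - from <= THRESHOLD) {
                for (int i = from; i < to; i++) data[i] *= 2;
                return;
            }
            int mid = from + (to - from) / 2;
            // ForkJoinTask.invokeAll forks both halves and waits until both are done.
            invokeAll(new DoubleAction(data, from, mid), new DoubleAction(data, mid, to));
        }
    }

    public static void main(String[] args) {
        long[] data = new long[10000];
        for (int i = 0; i < data.length; i++) data[i] = i;

        ForkJoinPool pool = new ForkJoinPool(); // parallelism = available processors
        pool.invoke(new DoubleAction(data, 0, data.length));
        long sum = pool.invoke(new SumTask(data, 0, data.length));
        System.out.println(sum); // 2 * (0 + 1 + ... + 9999) = 99990000
    }
}
```

Calling compute() directly on one half while the other half is forked keeps the current worker busy instead of leaving it idle while it waits in join().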

I looked at this new API mainly for data pipelining, in which I need to process a pretty huge list of objects and turn it into another format, so that the processing result of one library is consumable by the next one in the data flow. I am happy with the result: it is pretty easy and straightforward.

Following is a small sample showing how to process a list of Row objects and convert them into a list of Entity objects. In my case it was something similar: processing Row objects and turning them into OData OEntity objects.


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/**
 *
 * @author Masoud Kalali <mkalali>
 */
class RowConverter extends RecursiveTask<List<Entity>> {

    // ranges with more than 5000 rows are split and processed in parallel
    static final int SINGLE_THREAD_TOP = 5000;
    int begin;
    int end;
    List<Row> rows;

    public RowConverter(int begin, int end, List<Row> rows) {
        this.begin = begin;
        this.end = end;
        this.rows = rows;
    }

    @Override
    protected List<Entity> compute() {

        if (end - begin <= SINGLE_THREAD_TOP) {
            // small enough: the actual conversion happens here, sequentially
            List<Entity> preparedEntities = new ArrayList<Entity>(end - begin);
            System.out.println("  begin: " + begin + " end: " + end);
            for (int i = begin; i < end; ++i) {
                preparedEntities.add(convertRow(rows.get(i)));
            }
            return preparedEntities;
        } else {
            // too big: split this task's own range [begin, end) in half and combine the results.
            // The split point must be relative to begin/end; a fixed index such as 5000 only
            // works while the list has at most 10000 rows and recurses forever for larger ones.
            // One could also derive the threshold from the list size and
            // Runtime.getRuntime().availableProcessors(); lower SINGLE_THREAD_TOP and examine the changes.
            int middle = begin + (end - begin) / 2;

            RowConverter left = new RowConverter(begin, middle, rows);
            RowConverter right = new RowConverter(middle, end, rows);
            left.fork();                                 // schedule the left half for another worker
            List<Entity> rightResult = right.compute();  // convert the right half in this thread
            List<Entity> leftResult = left.join();       // wait for (or help finish) the left half
            leftResult.addAll(rightResult);              // left first, so the row order is preserved
            return leftResult;
        }
    }

    // dummy converter method converting one DTO to another
    private Entity convertRow(Row row) {

        return new Entity(row.getId());
    }
}

// the driver class which owns the pool
public class Fjf {

    public static void main(String[] args) {

        List<Row> rawData = initDummyList(10000);
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println("number of worker threads: " + pool.getParallelism());


        List<Entity> res = pool.invoke(new RowConverter(0, rawData.size(), rawData));

        // Print the pool statistics (or add a breakpoint here and examine the pool object).
        // getStealCount() estimates how many subtasks were stolen by otherwise idle workers;
        // check how it changes when you use a smaller threshold and thus produce more tasks.
        System.out.println("steal count: " + pool.getStealCount());
        System.out.println("processed list: " + res.size());

    }

    /**
     * creates a dummy list of rows
     * 
     * @param size number of rows in the list
     * @return the list of {@link Row} objects
     */
    private static List<Row> initDummyList(int size) {

        List<Row> rows = new ArrayList<Row>(size);

        for (int i = 0; i < size; i++) {
            rows.add(new Row(i));
        }
        return rows;
    }
}

//dummy classes which should be converted from one form to another
class Row {

    int id;

    public Row(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }
}

class Entity {

    int id;

    public Entity(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }
}

Just copy and paste the code into your IDE, then run and examine it to get a deeper understanding of how the framework can be used. Post any comments and questions you may have here and I will try to help you out with them.
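The RowConverter above is tied to Row and Entity. As a sketch, the same fork/compute/join pattern can be written once for any element-wise conversion of a list. The Converter interface and the ListMapper and ListMapperDemo classes below are hypothetical names introduced only for this illustration; Java 7 has no standard function interface (java.util.function arrived in Java 8), so a small one is declared here and implemented with an anonymous class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical element-wise conversion function.
interface Converter<T, R> {
    R convert(T input);
}

// Hypothetical generic variant of RowConverter: converts source[begin, end).
class ListMapper<T, R> extends RecursiveTask<List<R>> {
    private final List<T> source;
    private final int begin, end, threshold;
    private final Converter<T, R> converter;

    ListMapper(List<T> source, int begin, int end, int threshold, Converter<T, R> converter) {
        this.source = source;
        this.begin = begin;
        this.end = end;
        this.threshold = Math.max(1, threshold); // a threshold below 1 would never stop splitting
        this.converter = converter;
    }

    @Override
    protected List<R> compute() {
        if (end - begin <= threshold) {
            List<R> out = new ArrayList<R>(end - begin);
            for (int i = begin; i < end; i++) {
                out.add(converter.convert(source.get(i)));
            }
            return out;
        }
        int mid = begin + (end - begin) / 2; // split relative to this task's own range
        ListMapper<T, R> left = new ListMapper<T, R>(source, begin, mid, threshold, converter);
        ListMapper<T, R> right = new ListMapper<T, R>(source, mid, end, threshold, converter);
        left.fork();
        List<R> rightResult = right.compute();
        List<R> leftResult = left.join();
        leftResult.addAll(rightResult); // left first, so the output keeps the input order
        return leftResult;
    }
}

public class ListMapperDemo {
    public static void main(String[] args) {
        List<Integer> input = new ArrayList<Integer>();
        for (int i = 0; i < 23456; i++) {
            input.add(i);
        }
        Converter<Integer, String> toText = new Converter<Integer, String>() {
            @Override
            public String convert(Integer value) {
                return "row-" + value;
            }
        };
        List<String> out = new ForkJoinPool().invoke(
                new ListMapper<Integer, String>(input, 0, input.size(), 5000, toText));
        System.out.println(out.size() + " " + out.get(0) + " " + out.get(23455));
        // prints: 23456 row-0 row-23455
    }
}
```

Because every split is computed from the task's own bounds, the list size no longer needs any particular relationship to the threshold.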

One Response to A walkthrough for the fork/join framework introduced in Java SE 7

  1. shreyas nimbkar says:

    What is the best possible example of this framework?
