Fork/Join and More Java 5 Concurrency APIs

As I mentioned before, I am going through Programming Concurrency on the JVM by Venkat Subramaniam.

Chapter 3 was about design. He discussed immutability, shared mutability, and isolated mutability.

In Chapter 4 he went over some of the concurrency APIs that were introduced in Java 5.

The Fork/Join framework (which actually arrived in Java 7, not Java 5) was not as much of a beast as I thought (although I am not claiming to know the whole thing). It is a lot like dealing with Callables, Futures, ExecutorServices and thread pools. First, you create a ForkJoinPool. Then you make a class that extends a child class of ForkJoinTask. The two I looked at are RecursiveAction and RecursiveTask, and they each have a method called “compute” that actually does the work, sort of like Callable.call(). RecursiveAction.compute() returns void, and RecursiveTask.compute() returns an object.

Your grandchild class of ForkJoinTask might instantiate a few copies of itself in the compute() method. You build up a java.util.Collection of them, and call invoke() or invokeAll() to start the calculations. I guess this is sort of like ExecutorService.submit(). Then you call ForkJoinTask.join to bring the results together. This is all kicked off by ForkJoinPool.invoke() (which returns an object) or ForkJoinPool.invokeAll() (which returns a List of Future objects). ForkJoinPool implements the ExecutorService interface, and ForkJoinTask implements the Future interface. So in a way it is like Callables and Futures, although I am not too clear right now when you would use one versus the other.
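Here is a minimal sketch of that flow, not taken from the book. The class name SumTask, the threshold of 1,000 and the array-summing job are all made up for illustration. It extends RecursiveTask, splits itself in two inside compute(), starts the halves with invokeAll(), and combines them with join().

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example class: sums a slice of an array with Fork/Join.
class SumTask extends RecursiveTask<Long> {

    private static final int THRESHOLD = 1_000; // arbitrary cutoff for this sketch
    private final long[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    SumTask(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        // Small enough: do the work directly in this thread.
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        // Otherwise create two copies of this task, one per half.
        int mid = from + (to - from) / 2;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        invokeAll(left, right);            // runs both and waits for them
        return left.join() + right.join(); // bring the results together
    }

    // Kicks everything off through a ForkJoinPool.
    static long sum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(sum(data)); // 1 + 2 + ... + 10000 = 50005000
    }
}
```

A RecursiveAction version would look the same, except that compute() returns void and the tasks would have to write their results somewhere shared.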

He deals briefly with CountDownLatch and CyclicBarrier. He states that there are better ways to do concurrency than Threads and locking, so I did not spend a whole lot of time on these. “CyclicBarrier” sounds like something out of science fiction.

He touched on AtomicLong, and said that it is a good way to make a long variable thread-safe. There are also AtomicInteger and AtomicBoolean classes. I wonder why there is no AtomicFloat or AtomicDouble. The API documentation says that the atomic classes should not be used as general replacements for the java.lang.Integer and java.lang.Long classes. Yet if you look at the Really Big Tutorial, they kind of do exactly that.
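To see what AtomicLong buys you, here is a small sketch of a counter shared by several threads. The class and method names are my own, and it assumes Java 8 or later for the lambda. A plain long with count++ could lose updates here, while incrementAndGet() should not.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class: several threads increment one shared AtomicLong.
class AtomicCounterDemo {

    static long countWith(int threads, int incrementsPerThread) throws InterruptedException {
        AtomicLong counter = new AtomicLong();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.incrementAndGet(); // atomic read-modify-write
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("workers did not finish in time");
        }
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWith(4, 10_000)); // 4 threads x 10000 increments = 40000
    }
}
```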

There was also a section on concurrent collections. He pointed out that they have better performance than the older collection classes in addition to being thread-safe, although he did not spend a whole lot of time on them.
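As a rough illustration (my own, not from the book), here is a ConcurrentHashMap being updated from several threads at once. The class name and the "page" key are invented, and merge() assumes Java 8 or later.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: several threads bump the same key in a shared map.
class ConcurrentMapDemo {

    static int countHits(int threads, int hitsPerThread) throws InterruptedException {
        ConcurrentMap<String, Integer> hits = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < hitsPerThread; i++) {
                    // merge() on ConcurrentHashMap is atomic, so no update is lost
                    hits.merge("page", 1, Integer::sum);
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("workers did not finish in time");
        }
        return hits.get("page");
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countHits(4, 1_000)); // 4 threads x 1000 hits = 4000
    }
}
```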

There was a section on the classes in the java.util.concurrent.locks package. They look like a good substitute for the synchronized keyword, although they are pretty verbose. I made a few classes that set and manipulate a few variables, with one using “synchronized” and two others using locks. One used a separate ReentrantLock for each variable, and the other used one for the whole class. The main class just goes through a loop, calls some methods and sees how long it took. The class using the “synchronized” keyword was faster. I know that the Lock objects are better at preventing deadlocks, but it does seem like they sacrificed some speed. Perhaps I need to find a better comparison than the one I made.
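To show the difference in verbosity, here is a sketch of the same counter written both ways. These are not the classes from my timing test; the names are made up for this example. The tryIncrement method hints at why locks can help with deadlocks: tryLock with a timeout lets a thread give up instead of waiting forever.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical counter guarded by an explicit ReentrantLock.
class LockedCounter {
    private final Lock lock = new ReentrantLock();
    private int value;

    void increment() {
        lock.lock();
        try {
            value++;
        } finally {
            lock.unlock(); // always release, even if the body throws
        }
    }

    // Gives up after the timeout instead of blocking forever.
    boolean tryIncrement(long timeoutMillis) {
        try {
            if (!lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
        try {
            value++;
            return true;
        } finally {
            lock.unlock();
        }
    }

    int get() {
        lock.lock();
        try {
            return value;
        } finally {
            lock.unlock();
        }
    }
}

// The same counter using the synchronized keyword.
class SynchronizedCounter {
    private int value;

    synchronized void increment() {
        value++;
    }

    synchronized int get() {
        return value;
    }
}
```

A loop-and-stopwatch comparison of these two would probably be dominated by JIT warm-up and how much the threads actually contend, so I would not read much into timings from something this small.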

Dr S. will be here next week for the Austin JUG meeting. Hopefully by then I will be through chapter 6, Software Transactional Memory. It is the longest chapter in the book. I will have to set up some projects for Clojure and Scala. I am doing the Java stuff with Gradle, and I think I am learning as much about Gradle as I am about concurrency.

2013-05-18 Tweets

A Look At Java’s Callable Interface Part 002

As I mentioned before, I am going through Programming Concurrency on the JVM by Venkat Subramaniam. I am looking at the Callable interface. He uses it a few times in chapter 2. I will go over some code I posted last time.

The two files I will go over are CallableWorker and ForwardCallableRunner. There are a few things in the code that are not relevant to concurrency, but still have use. I put in the ForwardCallableRunner.setIterations which will set the number of iterations a loop will run by using a system property, or set a default value if none is provided. I also put in some logging statements and calls to Thread.sleep() to get a better idea of when things are called.

The class that does the concurrent magic is CallableWorker. It implements the java.util.concurrent.Callable interface, which is part of the concurrency APIs that were introduced in JDK 5. It is similar to the java.lang.Runnable interface that has been part of the JDK since the very beginning. The main difference is that Callable can return a value and Runnable cannot. With Runnable, developers had to write additional methods to check whether run() had completed and to get any result back. Dr S says that we should use the newer APIs and not bother with the older stuff.

The only method defined by Callable is the call() method. It takes no arguments, and returns an object. If you are returning a Java primitive (like an int or double), you must use one of the wrapper classes in java.lang, like java.lang.Double.

I added a few parameters to the constructor. A lot of examples online that I saw (including the original that I got this from by Lars Vogel) do not use a lot of data. They just increment a variable in a loop, do some simple math, and return the result. If you want to add data to your class to work on, you could either add it via the constructor, or another method, or create the Callable inline, as Dr S does in his examples. I would guess that adding it in the constructor would be better for thread safety. But ultimately, I just do some simple summing, like most other examples.

The class that creates and uses the Callables is ForwardCallableRunner. I also made a BackwardCallableRunner. They both create some Callables, which sum some numbers in loops. In ForwardCallableRunner, the numbers sent to the constructors for CallableWorker increase in the loop, and in BackwardCallableRunner they decrease.

The first thing you need to do when using a Callable is to create a thread pool. The JDK 5 API gives you some classes to do that.

Use java.util.concurrent.Executors to create a thread pool. I use the method Executors.newFixedThreadPool with an integer argument for the size of the thread pool. This will return an instance of java.util.concurrent.ExecutorService.

Executors.newFixedThreadPool is overloaded. You can call it with just the pool size, or with an additional argument: an object implementing the java.util.concurrent.ThreadFactory interface. There are also methods that make thread pools of dynamic size, such as Executors.newCachedThreadPool. A couple of other libraries provide ways to get a java.util.concurrent.ThreadFactory: BasicThreadFactory in Apache Commons Lang, and ThreadFactoryBuilder in the Google Guava library. I have not tried them, so I do not know what they give you beyond what the default implementation gives you.
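For what it is worth, here is a sketch of the two-argument overload with a hand-written ThreadFactory. NamedThreadFactory is a made-up class for this example (it is not the Commons Lang or Guava one); all it does is give the pool's threads recognizable names, which can make log output easier to follow.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical ThreadFactory that names its threads "<prefix>-1", "<prefix>-2", ...
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, prefix + "-" + counter.incrementAndGet());
    }

    // Runs one task on a pool built with this factory and reports the thread name it ran on.
    static String runOnNamedThread(String prefix) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2, new NamedThreadFactory(prefix));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            Future<String> name = pool.submit(task);
            return name.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runOnNamedThread("worker")); // first thread created: worker-1
    }
}
```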

The next step is to declare a java.util.concurrent.Future, or a list of them. Its type parameter must match that of the Callable you created before (here, Future< Double > for a Callable< Double >).

The next part of the example is in a loop. In each iteration, I created a Callable, and I sent it as an argument in a call to the method java.util.concurrent.ExecutorService.submit. This will return a java.util.concurrent.Future, which will hold the result of our Callable. The Callable.call method starts when the ExecutorService.submit method is called. (I inferred this from the examples, but I wanted some verification; this is what led me to put in the logging statements.) I then add the Future that was just created to my list of Future objects.

Lastly, I create a for loop to cycle through the Future objects. This shows the next step in our Callable/Future lifecycle. To get the return value of the Callable.call method, you call Future.get. The documentation says that it will wait if the result is not ready yet. This is why I made an example that counts upwards through the number of iterations and one that starts at the iteration number and counts to 0. Since my CallableWorker.call method also does some sleeping, the class with the highest number should take longer. I wanted to see if the for loop with the call to Future.get would actually wait if the next Callable.call was not done yet. Since I got the same result in both classes, I assume that it did.
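A more direct way to check the waiting behavior, as a sketch that is separate from my runner classes: hold a Callable back with a CountDownLatch, ask for the result with a short timeout, and see that the call times out while the task is still running. The class name and the 100 millisecond timeout are arbitrary choices for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: shows that Future.get waits for an unfinished Callable.
class FutureGetDemo {

    static String demo() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            // This task cannot finish until release.countDown() is called.
            Callable<Double> slow = () -> {
                release.await();
                return 42.0;
            };
            Future<Double> future = pool.submit(slow);

            boolean timedOut = false;
            try {
                future.get(100, TimeUnit.MILLISECONDS); // result is not ready yet
            } catch (TimeoutException expected) {
                timedOut = true;
            }

            release.countDown();          // let the task finish
            Double result = future.get(); // blocks until call() has returned
            return "timedOut=" + timedOut + ", result=" + result;
        } finally {
            pool.shutdownNow(); // no tasks are left on the normal path; also cleans up on failure
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // timedOut=true, result=42.0
    }
}
```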

The last step is to call ExecutorService.shutdown().

Most of the examples are like the one that I worked with: They call ExecutorService.submit for each Callable. Dr S creates a java.util.List of Callable objects, and starts them off by calling ExecutorService.invokeAll. He also creates his Callable objects inline, and does not have a separate Something.java file for them.

So, to summarize, here are the steps to using Callable:

1. Create a class that implements the java.util.concurrent.Callable interface. (This step could come later.)

2. Create a thread pool by calling java.util.concurrent.Executors.newFixedThreadPool to return an instance of java.util.concurrent.ExecutorService.

3. Create a list with type of java.util.concurrent.Future.

4. Call ExecutorService.submit, sending it a Callable object. This will create a Future object, which you add to your list of Future objects.

5. Get the result of all of your Callable.call methods by iterating through your list of Future objects, and calling Future.get on each one.

6. Clean up by calling ExecutorService.shutdown.
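The six steps above, boiled down to a sketch. TimesTen is a made-up stand-in for CallableWorker without the logging and sleeping, and the pool size of 4 is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Step 1: a hypothetical Callable that multiplies its number by ten.
class TimesTen implements Callable<Double> {
    private final int num;

    TimesTen(int num) {
        this.num = num;
    }

    @Override
    public Double call() {
        return num * 10.0;
    }
}

class SubmitSteps {

    static double sumWithSubmit(int tasks) throws InterruptedException, ExecutionException {
        // Step 2: create the thread pool.
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // Step 3: a list to hold the Futures.
            List<Future<Double>> futureList = new ArrayList<>();
            // Step 4: submit each Callable and keep the Future it returns.
            for (int i = 1; i <= tasks; i++) {
                futureList.add(executor.submit(new TimesTen(i)));
            }
            // Step 5: collect the results; get() waits if one is not ready.
            double sum = 0;
            for (Future<Double> future : futureList) {
                sum += future.get();
            }
            return sum;
        } finally {
            // Step 6: clean up.
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumWithSubmit(3)); // 10.0 + 20.0 + 30.0 = 60.0
    }
}
```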

Dr S does this slightly differently:

1. Create a list of inline objects that implement the java.util.concurrent.Callable interface.

2. Create a thread pool by calling java.util.concurrent.Executors.newFixedThreadPool to return an instance of java.util.concurrent.ExecutorService.

3. Create a list with type of java.util.concurrent.Future, and instantiate it with a call to ExecutorService.invokeAll. The parameter of ExecutorService.invokeAll is the list of Callable objects created in step 1.

4. Get the result of all of your Callable.call methods by iterating through your list of Future objects, and calling Future.get on each one.

5. Clean up by calling ExecutorService.shutdown.
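A sketch of this second approach, written by me in the style described above rather than copied from the book. The class name, the times-ten arithmetic and the pool size are invented for the example. Note that invokeAll does not return until every Callable has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: inline Callables started together with invokeAll.
class InvokeAllSteps {

    static double sumWithInvokeAll(int tasks) throws InterruptedException, ExecutionException {
        // Step 1: a list of Callables created inline as anonymous classes.
        List<Callable<Double>> callables = new ArrayList<>();
        for (int i = 1; i <= tasks; i++) {
            final int num = i;
            callables.add(new Callable<Double>() {
                @Override
                public Double call() {
                    return num * 10.0;
                }
            });
        }
        // Step 2: create the thread pool.
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // Step 3: invokeAll runs every Callable and returns their Futures.
            List<Future<Double>> futureList = executor.invokeAll(callables);
            // Step 4: collect the results.
            double sum = 0;
            for (Future<Double> future : futureList) {
                sum += future.get();
            }
            return sum;
        } finally {
            // Step 5: clean up.
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumWithInvokeAll(3)); // 10.0 + 20.0 + 30.0 = 60.0
    }
}
```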

His way has fewer steps, and fewer lines of code. But honestly I prefer the way most examples do it. I think it is clearer. Doing too much inline looks a bit noisy and crowded to me.

2013-05-11 Tweets

A Look At Java’s Callable Interface Part 001

As I mentioned, I am looking at Programming Concurrency on the JVM by Venkat Subramaniam. I got to an example that used the java.util.concurrent.Callable interface. I then went out looking for some more examples. I have put what I have done up on GitHub. I used a class by Lars Vogel in my code, and then I looked at a few other examples to see the pattern, and made a couple of my own.

Java has had an interface called java.lang.Runnable from the beginning. One issue with it is that its run() method does not return a value. I am not going to spend much time on Runnable or on all of the methods of java.lang.Thread. The general consensus is that the world should move beyond the old ways.

This post is pretty big with the code, so I will put the code in this post and talk about it tomorrow.

Here is a class that implements the java.util.concurrent.Callable interface.

package info.shelfunit.concurrency.callable;

import java.util.concurrent.Callable;
import info.shelfunit.util.ShelfLogger;
import org.apache.log4j.Logger;

public class CallableWorker implements Callable< Double > {

    private final int num;
    private final String idString;
    private static Logger logger;

    public CallableWorker(int num, String idString) {
	    logger = ShelfLogger.getInstance().getLogger();
	    this.num = num;
	    this.idString = idString;
	    logger.info("Starting CallableWorker " + num + ", " + idString);
    }

    @Override
    public Double call() throws Exception {
	    logger.info("Starting CallableWorker.call in " + num + ", " + idString);
	    this.hello();
	    double sum = 0;
	    Thread.sleep( num * 1000 );
	    for ( double i = 0; i <= 100; i++ ) {
	        sum += ( i * num );
	    }

	    Thread.sleep( num * 1000 );
	    logger.info("Ending CallableWorker.call in " + num + ", " + idString);
	    return sum;
    } // end method call

    private void hello() {
	    logger.info("in CallableWorker.hello in " + num + ", " + idString);
    }

} // end class info.shelfunit.concurrency.callable.CallableWorker

Here is a class that calls the class we just made.

 

package info.shelfunit.concurrency.callable;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import info.shelfunit.util.ShelfLogger;
import org.apache.log4j.Logger;

public class ForwardCallableRunner {

    private static final int NTHREDS = 10;
    private static int iterations;

    private static ShelfLogger shelfLogger;
    private static Logger logger;

    public static void setIterations() {
	    // Read the "iterations" system property; use 5 if it is missing or not a number.
	    iterations = Integer.getInteger( "iterations", 5 );
    }

    public static void main( String[] args ) throws InterruptedException {
	    logger = ShelfLogger.getInstance().getLogger();

	    setIterations();
	    String idString;
	    ExecutorService executor = Executors.newFixedThreadPool( NTHREDS );
	    List< Future< Double > > futureList = new ArrayList< Future< Double > >();

	    for ( int i = 0; i <= iterations; i++ ) {
	        idString = UUID.randomUUID().toString();

	        logger.info( "Starting a new MyCallable: " + i + ", " + idString );
	        Callable< Double > worker = new CallableWorker(i, idString);
	        Thread.sleep( 2 * 1000 );

	        logger.info( "About to submit MyCallable: " + i + ", " + idString );
	        Future< Double > submit = executor.submit( worker );

	        logger.info( "About to add to futureList MyCallable: " + i + ", " + idString );
	        logger.info( " ------ " );

	        futureList.add( submit );
	    }

	    double sum = 0; // future.get() returns a Double; a long would drop any fraction
	    logger.info( "The size of the list is: " + futureList.size() );

	    for ( Future< Double > future : futureList ) {
	        try {
		        sum += future.get();
	        } catch ( InterruptedException e ) {
		        e.printStackTrace();
	        } catch ( ExecutionException e ) {
		        e.printStackTrace();
	        }
	    } // for ( Future< Double > future : futureList )

	    logger.info( "The sum is: " + sum );
	    executor.shutdown();
    } // end method main

} // ForwardCallableRunner

I will go through the code in a couple of days.

2013-05-07 Update

I am pretty late, but here is an update.

I am still working on helping a local developer upgrade his Rails 2 app to Rails 3. I hit a wall. There is a lot of JavaScript. I do not know too much JavaScript. I asked someone for some help. We will see how things go.

I am kind of thinking about reviving RailsPress. A Rails user group had another thread complaining about WordPress. In addition, there was an article on Naked Capitalism saying that the site was having some issues with a WordPress host called WPEngine. WPEngine is in the Capital Factory, an incubator in Austin that hosts the Austin Ruby and Austin Rails groups. If I get RailsPress up and running, I doubt I will be talking about it at the Capital Factory.

I am also looking at Groovy and Grails. I am porting a DNS server written in Java to Groovy. I got the tests written in Spock. I have not started porting the server itself yet. Getting the tests from JUnit to Spock took a while.

I am also looking at Java concurrency. I am going through Programming Concurrency on the JVM by Venkat Subramaniam. I don’t know if I will ever use some of this stuff (I might use GPars in my Groovy DNS port), but this has been a big hole in my Java knowledge that I think it’s time I should fill. I might post about the Callable interface.
