Parallel Streams

In this lesson we take a final look at streams by investigating parallelizing streams and how and when to use this technique effectively.

We mentioned way back in the Introducing Streams lesson that an interesting aspect of streams is that they can be processed in parallel without the additional coding usually associated with concurrency. We showed how easy it is to turn a stream into a parallel stream:



// Stream
long count1 = Employee.listOfStaff().stream()
        .filter(e -> e.getAge() > 21)
        .count();


// Parallel Stream
long count2 = Employee.listOfStaff().parallelStream()
        .filter(e -> e.getAge() > 21)
        .count();


All we do is change the stream() method to the parallelStream() method and voilà, our stream can now run the filter() and count() operations in parallel!

If something seems too good to be true it generally is, and there are some guidelines that need to be followed when parallelizing streams or you could end up with a slower process or even a runtime exception!

  • The provided data should be in memory.
  • The processing done by the stream operations should be significant or you may as well use a sequential stream.
  • Under the bonnet parallel streams use the fork/join framework, so the stream should be easily split into subparts in an efficient manner.
  • Some stream operations perform better sequentially than in parallel, for example the findFirst() terminal operation and the limit() intermediate operation rely on the order of the elements and so are not well suited to parallelizing.
  • The iterate() method is quintessentially sequential and so not suitable for parallelizing.
  • Beware of autoboxing during intermediate operations as this can have a dramatic effect on performance when running in parallel; use the primitive streams DoubleStream, IntStream and LongStream whenever possible.
  • Stream operations should not block.

Parallel streams work best on sources that are easily split into subparts, such as the collection types ArrayList, ConcurrentHashMap, HashMap and HashSet, and ranges of the primitive int and long types.

Each subpart of a stream is traversed by a Spliterator object (splittable iterator) which is designed to traverse the subparts of a stream in parallel as well as sequentially. In the following examples we are using the default Spliterator, provided by Java 8 for all data structures, although you can build one for your own requirements when necessary.

There are two methods we can use to run in parallel, the parallel() method of the BaseStream interface and the parallelStream() method of the Collection interface. Let's look at parallelStream() first.
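To make the splitting idea concrete, here is a minimal sketch that asks the default Spliterator of an ArrayList to split once. The class name SpliteratorSketch and the helper numbers() are hypothetical; trySplit(), estimateSize() and hasCharacteristics() are standard Spliterator methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;

// Hypothetical class, for illustration only
class SpliteratorSketch {

    // Builds a list holding the numbers 0 to size - 1
    static List<Integer> numbers(int size) {
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            list.add(i);
        }
        return list;
    }

    public static void main(String[] args) {
        Spliterator<Integer> right = numbers(1_000).spliterator();

        // trySplit() hands part of the elements to a new Spliterator,
        // or returns null when this Spliterator cannot be split
        Spliterator<Integer> left = right.trySplit();

        System.out.println("left:  " + left.estimateSize());
        System.out.println("right: " + right.estimateSize());
        // SIZED means the element count is known up front, which helps balanced splitting
        System.out.println("SIZED: " + right.hasCharacteristics(Spliterator.SIZED));
    }
}
```

A parallel stream repeats this kind of split until the subparts are small enough to hand to worker threads.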

Using parallelStream()

The parallelStream method of the Collection interface, which returns a possibly parallel stream with this collection as its source, allows us to convert a collection to a parallel stream.
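Before the larger example, a minimal sketch of the same idea on a small list may help. The class name ParallelStreamSketch and the method countLongerThan() are hypothetical, and a plain list of names stands in for the lesson's Employee class. The isParallel() method of BaseStream reports which mode a stream is in.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical class, for illustration only
class ParallelStreamSketch {

    // Counts the names longer than the given length, sequentially or in parallel
    static long countLongerThan(List<String> names, int length, boolean parallel) {
        return (parallel ? names.parallelStream() : names.stream())
                .filter(name -> name.length() > length)
                .count();
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Derek", "Jane", "Matthew", "Jack", "Jill", "Dorothy");

        // Report the mode of each kind of stream
        System.out.println(names.stream().isParallel());
        System.out.println(names.parallelStream().isParallel());

        // The result is the same either way; only the execution strategy differs
        System.out.println(countLongerThan(names, 4, false));
        System.out.println(countLongerThan(names, 4, true));
    }
}
```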

Following is a TestParallelStream class to demonstrate usage of the parallelStream method of the Collection interface.


package info.java8;

import java.time.chrono.IsoChronology;
import java.util.*;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.mapping;

/* Parallel streams */
public class TestParallelStream {

    static List<Employee> staff = new ArrayList<>();

    public static void main(String[] args) {

        // 12 cores
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");
        
        // Create a large number of Employees
        TestParallelStream tps = new TestParallelStream();
        tps.CreateEmployees(4_000_000);
        
        // Sequential  
        long start = System.nanoTime();
        staff.stream()
                .flatMap(employee -> employee.getRoles()
                        .stream()
                        .map(role -> new AbstractMap.SimpleEntry<>(role, employee.getName())))
                .collect(Collectors.groupingBy(
                        Map.Entry::getKey,
                        mapping(Map.Entry::getValue, Collectors.toList())));
        long end = (System.nanoTime() - start) / 1_000_000;
        System.out.println(end + " msecs run sequentially");

        // Parallel
        start = System.nanoTime();
        staff.parallelStream()
                .flatMap(employee -> employee.getRoles()
                        .stream()
                        .map(role -> new AbstractMap.SimpleEntry<>(role, employee.getName())))
                .collect(Collectors.groupingBy(
                        Map.Entry::getKey,
                        mapping(Map.Entry::getValue, Collectors.toList())));
        end = (System.nanoTime() - start) / 1_000_000;
        System.out.println(end + " msecs run in parallel");
    }

    public void CreateEmployees(int i) {
        for (int j = 0; j < i; j++) {
            staff.add(new Employee("Derek", Employee.Sex.NON_BINARY,
                    IsoChronology.INSTANCE.date(1987, 7, 17), 32499.00,
                    new ArrayList<>(Arrays.asList("Manager", "Sales"))));
            staff.add(new Employee("Jane", Employee.Sex.FEMALE,
                    IsoChronology.INSTANCE.date(1960, 12, 3), 56999.00,
                    new ArrayList<>(Arrays.asList("Director", "Accountant"))));
            staff.add(new Employee("Matthew", Employee.Sex.MALE,
                    IsoChronology.INSTANCE.date(2002, 4, 7), 12999.00,
                    new ArrayList<>(Arrays.asList("Personnel", "Receptionist"))));
        }
    }
}

Building and running the TestParallelStream class produces the following output:

Screenshot 1. Running the TestParallelStream class.

Let's go through the code and see what's going on!

Firstly we set the parallelism of the common ForkJoinPool, which parallel streams use by default, to 12 to match the number of processors on my machine.
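If you want to match the setting to your own machine rather than hard-coding 12, something along these lines could be used. This is a sketch, not the lesson's code: the class name CommonPoolSketch and the method availableCores() are hypothetical, and using the full processor count as the parallelism is an assumption carried over from the example above.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical class, for illustration only
class CommonPoolSketch {

    // Number of processors the JVM reports for this machine
    static int availableCores() {
        return Runtime.getRuntime().availableProcessors();
    }

    public static void main(String[] args) {
        int cores = availableCores();

        // The property is read when the common pool is first initialised,
        // so it has to be set before any parallel stream is run
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",
                String.valueOf(cores));

        System.out.println("Available processors: " + cores);
        System.out.println("Common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
    }
}
```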

We then run a large list of Employee elements through the same stream sequentially and in parallel and display some rudimentary timings for each.

We can see that the parallel run is 8x faster than the sequential run. It isn't 12x faster, as you might expect from the 12 cores, because there is quite an overhead in setting up a parallel stream: the source has to be split, the subtasks scheduled and the partial results merged. Running the test repeatedly gives varying results but the parallel stream is always at least twice as fast as the sequential one.

The parallel stream is faster because the example follows some of the guidelines mentioned above: the source is an ArrayList, it doesn't use iterate() and there isn't much autoboxing.
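Because single timings vary so much from run to run, one option is to repeat each measurement and keep the fastest time. The following is a rough sketch of that idea and not a proper benchmark: the class name TimingSketch and the methods bestOfMillis() and countMultiplesOfThree() are hypothetical, and a simple numeric range replaces the Employee list to keep the example self-contained.

```java
import java.util.function.LongSupplier;
import java.util.stream.LongStream;

// Hypothetical class, for illustration only
class TimingSketch {

    // Runs the task several times and returns the fastest elapsed time in milliseconds
    static long bestOfMillis(int runs, LongSupplier task) {
        long best = Long.MAX_VALUE;
        for (int i = 0; i < runs; i++) {
            long start = System.nanoTime();
            task.getAsLong();
            best = Math.min(best, (System.nanoTime() - start) / 1_000_000);
        }
        return best;
    }

    // Counts the multiples of three in 1..n, sequentially or in parallel
    static long countMultiplesOfThree(long n, boolean parallel) {
        LongStream range = LongStream.rangeClosed(1, n);
        return (parallel ? range.parallel() : range)
                .filter(i -> i % 3 == 0)
                .count();
    }

    public static void main(String[] args) {
        long n = 50_000_000;
        System.out.println(bestOfMillis(5, () -> countMultiplesOfThree(n, false))
                + " msecs run sequentially (best of 5)");
        System.out.println(bestOfMillis(5, () -> countMultiplesOfThree(n, true))
                + " msecs run in parallel (best of 5)");
    }
}
```

Taking the best of several runs reduces the effect of one-off costs such as class loading on the first pass, although the figures are still only indicative.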

Using parallel()

The parallel method of the BaseStream interface returns an equivalent stream that is parallel and that we can add to our streams as an intermediate operation.
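One detail worth knowing is that parallel() and sequential() set a mode for the whole pipeline rather than for the operations that follow them, so the last call before the terminal operation decides how the stream runs. A minimal sketch, using the hypothetical class name ParallelFlagSketch and hypothetical method names, with isParallel() reporting the current mode:

```java
import java.util.stream.IntStream;

// Hypothetical class, for illustration only
class ParallelFlagSketch {

    // Mode of the pipeline after a single parallel() call
    static boolean afterParallel() {
        return IntStream.range(0, 100)
                .parallel()
                .isParallel();
    }

    // Mode of the pipeline when sequential() is called after parallel()
    static boolean afterParallelThenSequential() {
        return IntStream.range(0, 100)
                .parallel()
                .map(i -> i * 2)
                .sequential()
                .isParallel();
    }

    public static void main(String[] args) {
        System.out.println(afterParallel());
        System.out.println(afterParallelThenSequential());
    }
}
```

The second pipeline runs entirely sequentially, including the map() step that sits after the parallel() call.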

Following is a TestParallel class to demonstrate usage of the parallel method of the BaseStream interface.


package info.java8;

import java.time.chrono.IsoChronology;
import java.util.*;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.mapping;

/* Parallel streams */
public class TestParallel {

    static List<Employee> staff = new ArrayList<>();

    public static void main(String[] args) {

        // 12 cores
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");
        
        // Create a large number of Employees
        TestParallel tp = new TestParallel();
        tp.CreateEmployees(4_000_000);
        
        // Sequential  
        long start = System.nanoTime();
        staff.stream()
                .sequential()
                .flatMap(employee -> employee.getRoles()
                        .stream()
                        .map(role -> new AbstractMap.SimpleEntry<>(role, employee.getName())))
                .collect(Collectors.groupingBy(
                        Map.Entry::getKey,
                        mapping(Map.Entry::getValue, Collectors.toList())));
        long end = (System.nanoTime() - start) / 1_000_000;
        System.out.println(end + " msecs run sequentially");

        // Parallel
        start = System.nanoTime();
        staff.stream()
                .parallel()
                .flatMap(employee -> employee.getRoles()
                        .stream()
                        .map(role -> new AbstractMap.SimpleEntry<>(role, employee.getName())))
                .collect(Collectors.groupingBy(
                        Map.Entry::getKey,
                        mapping(Map.Entry::getValue, Collectors.toList())));
        end = (System.nanoTime() - start) / 1_000_000;
        System.out.println(end + " msecs run in parallel");
    }

    public void CreateEmployees(int i) {
        for (int j = 0; j < i; j++) {
            staff.add(new Employee("Jack", Employee.Sex.MALE,
                    IsoChronology.INSTANCE.date(1954, 12, 2), 79999.00,
                    new ArrayList<>(Arrays.asList("Owner", "Director"))));
            staff.add(new Employee("Jill", Employee.Sex.FEMALE,
                    IsoChronology.INSTANCE.date(1995, 10, 25), 39999.00,
                    new ArrayList<>(Arrays.asList("Secretary", "Manager", "Personnel"))));
            staff.add(new Employee("Dorothy", Employee.Sex.FEMALE,
                    IsoChronology.INSTANCE.date(1972, 4, 7), 21999.00,
                    new ArrayList<>(Arrays.asList("Secretary", "Receptionist"))));
        }
    }
}

Building and running the TestParallel class produces the following output:

Screenshot 2. Running the TestParallel class.

Once again we set the parallelism of the common ForkJoinPool to 12 to match the number of processors on my machine.

We then run a large list of Employee elements through the same stream sequentially and in parallel and display some rudimentary timings for each.

We stream the Employee list using the sequential() and parallel() methods of the BaseStream interface respectively.

We can see that the parallel run is 3x faster than the sequential run. Running the test repeatedly gives varying results but the parallel stream is always at least twice as fast as the sequential one.

The parallel stream is faster because the example follows some of the guidelines mentioned above: the source is an ArrayList, it doesn't use iterate() and there isn't much autoboxing.

Related Quiz

Streams Quiz 12 - Parallel Streams Quiz

Lesson 12 Complete

In this lesson we looked at parallel streams and how and when to use them effectively.

What's Next?

That's it for the Streams section. In the next section we tackle the complex subject of concurrency.