Parallel StreamsJ8 Home « Parallel Streams
In this lesson we take a final look at streams by investigating parallelizing streams and how and when to use this technique effectively.
We mentioned way back in the Introducing Streams lesson that an interesting aspect of streams is that they can be processed in parallel without any of the overheads and additional coding associated with concurrency. We showed how easy it is to turn a stream into a parallel stream:
// Stream
long count2 = Employee.listOfStaff().stream()
.filter(e -> e.getAge() > 21)
.count();
// Parallel Stream
long count2 = Employee.listOfStaff().parallelStream()
.filter(e -> e.getAge() > 21)
.count();
All we do is change the stream()
method to the parallelStream()
method and voila our stream can now run the filter()
and count()
methods in parallel!
If something seems to good to be true it generally is and there are some guidelines that need to be followed when parallelizing streams or you could end up with a slower process or even a runtime exception!
- The provided data should be in memory.
- The processing done by the stream operations should be signicant or you may as well use a sequential stream.
- Under the bonnet parallel streams use the fork/join framework, so the stream should be easily split into subparts in an efficient manner.
- Some stream operations perform better sequentially than in parallel, for example the
findFirst()
andlimit()
intermediate operations rely on the order of the elements and so are not suitable for parallelizing. - The
iterate()
method is quintissentially sequential and so not suitable for parallelizing. - Beware of autoboxing during intermediate operations at this can have a dramatic effect on performance when running in parallel; use the primitive methods
DoubleStream
,IntStream
andLongStream
whenever posssible. - Stream operations should not block.
Parallel streams work best on sources that are easily split into subparts such as these collection types ArrayList
, ConcurrentHashMap
, HashMap
and HashSet
and ranges of the primitive int
and long
types.
Each subpart of a stream is traversed by a Spliterator
object (splittable iterator) which is designed to traverse the subparts of a stream in parallel as well as sequentailly. In the following examples we are using the default Spliterator
, provided by Java8 for all data structures, although you can build one for your own requirements when necessary.
There are two methods we can use to run in parallel, the parallel()
method of the BaseStream
interface and the parallelStream()
method of the Stream
interface. Lets look at parallelStream()
first.
Using parallelStream()
Top
The parallelStream
method of the Collection
interface, which returns a possibly parallel stream with this collection as its source, allows us to convert a collection to a parallel stream.
Following is a TestParallelStream
class to demonstrate usage of the parallelStream
method of the Collection
interface.
package info.java8;
import java.time.chrono.IsoChronology;
import java.util.*;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.mapping;
/* Parallel streams */
public class TestParallelStream {
static List<Employee> staff = new ArrayList<>();
public static void main(String[] args) {
// 12 cores
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");
// Create a large number of Employees
TestParallelStream tps = new TestParallelStream();
tps.CreateEmployees(4_000_000);
// Sequential
long start = System.nanoTime();
staff.stream()
.flatMap(employee -> employee.getRoles()
.stream()
.map(role -> new AbstractMap.SimpleEntry<>(role, employee.getName())))
.collect(Collectors.groupingBy(
Map.Entry::getKey,
mapping(Map.Entry::getValue, Collectors.toList())));
long end = (System.nanoTime() - start) / 1_000_000;
System.out.println(end + " msecs run sequentially");
// Parallel
start = System.nanoTime();
staff.parallelStream()
.flatMap(employee -> employee.getRoles()
.stream()
.map(role -> new AbstractMap.SimpleEntry<>(role, employee.getName())))
.collect(Collectors.groupingBy(
Map.Entry::getKey,
mapping(Map.Entry::getValue, Collectors.toList())));
end = (System.nanoTime() - start) / 1_000_000;
System.out.println(end + " msecs run in parallel");
}
public void CreateEmployees(int i) {
for (int j = 0; j < i; j++) {
staff.add(new Employee("Derek", Employee.Sex.NON_BINARY,
IsoChronology.INSTANCE.date(1987, 7, 17), 32499.00,
new ArrayList<>(Arrays.asList("Manager", "Sales"))));
staff.add(new Employee("Jane", Employee.Sex.FEMALE,
IsoChronology.INSTANCE.date(1960, 12, 3), 56999.00,
new ArrayList<>(Arrays.asList("Director", "Accountant"))));
staff.add(new Employee("Matthew", Employee.Sex.MALE,
IsoChronology.INSTANCE.date(2002, 4, 7), 12999.00,
new ArrayList<>(Arrays.asList("Personnel", "Receptionist"))));
}
}
}
Building and running the TestParallelStream
class produces the following output:
Lets go though the code and see what's going on!
Firstly we change the size of the ForkJoinPool
pool to 12 to match my processors.
We then run a large list of Employee
elements through the same stream sequentially and in parallel and display some rudimentary timings for each.
We can see that the parallel run is 8xfaster than the sequential run. The parallel run isn't 12xfaster as you might expect because of the 12 cores as there is quite an overhead setting up a parallel stream. Running the test repeatedly gives varying results but the parallel stream is always at least twice as fast as the sequential one.
The parallel stream is faster because of some of the guidelines mentioned above i.e. it's an ArrayList
, doesn't use iterate()
and there isn't tons of autoboxing.
Using parallel()
Top
The parallel
method of the BaseStream
interface returns an equivalent stream that is parallel and that we can add to our streams as an intermediate operation.
Following is a TestParallel
class to demonstrate usage of the parallel
method of the BaseStream
interface.
package info.java8;
import java.time.chrono.IsoChronology;
import java.util.*;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.mapping;
/* Parallel streams */
public class TestParallel {
static List<Employee> staff = new ArrayList<>();
public static void main(String[] args) {
// 12 cores
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");
// Create a large number of Employees
TestParallel tp = new TestParallel();
tp.CreateEmployees(4_000_000);
// Sequential
long start = System.nanoTime();
staff.stream()
.sequential()
.flatMap(employee -> employee.getRoles()
.stream()
.map(role -> new AbstractMap.SimpleEntry<>(role, employee.getName())))
.collect(Collectors.groupingBy(
Map.Entry::getKey,
mapping(Map.Entry::getValue, Collectors.toList())));
long end = (System.nanoTime() - start) / 1_000_000;
System.out.println(end + " msecs run sequentially");
// Parallel
start = System.nanoTime();
staff.stream()
.parallel()
.flatMap(employee -> employee.getRoles()
.stream()
.map(role -> new AbstractMap.SimpleEntry<>(role, employee.getName())))
.collect(Collectors.groupingBy(
Map.Entry::getKey,
mapping(Map.Entry::getValue, Collectors.toList())));
end = (System.nanoTime() - start) / 1_000_000;
System.out.println(end + " msecs run in parallel");
}
public void CreateEmployees(int i) {
for (int j = 0; j < i; j++) {
staff.add(new Employee("Jack", Employee.Sex.MALE,
IsoChronology.INSTANCE.date(1954, 12, 2), 79999.00,
new ArrayList<>(Arrays.asList("Owner", "Director"))));
staff.add(new Employee("Jill", Employee.Sex.FEMALE,
IsoChronology.INSTANCE.date(1995, 10, 25), 39999.00,
new ArrayList<>(Arrays.asList("Secretary", "Manager", "Personnel"))));
staff.add(new Employee("Dorothy", Employee.Sex.FEMALE,
IsoChronology.INSTANCE.date(1972, 4, 7), 21999.00,
new ArrayList<>(Arrays.asList("Secretary", "Receptionist"))));
}
}
}
Building and running the TestParallel
class produces the following output:
Once again we change the size of the ForkJoinPool
pool to 12 to match my processors.
We then run a large list of Employee
elements through the same stream sequentially and in parallel and display some rudimentary timings for each.
We stream the Employee
list using the sequential()
and parallel
methods respectively of the BaseStream
interface respectively.
We can see that the parallel run is 3xfaster than the sequential run. Running the test repeatedly gives varying results but the parallel stream is always at least twice as fast as the sequential one.
The parallel stream is faster because of some of the guidelines mentioned above i.e. it's an ArrayList
, doesn't use iterate()
and there isn't tons of autoboxing.
Related Quiz
Streams Quiz 12 - Parallel Streams Quiz
Lesson 12 Complete
In this lesson we looked at numeric streams and some of the operations associated with them.
What's Next?
That's it for the Streams section, in the next section we tackle the complex subject of concurrency.