Stream PipelinesS2C Home « Stream Pipelines

The Introducing Streams lesson gave us an insight into the capability of streams that were introduced in Java8. In this lesson we look at stream processes (pipelines) in much more detail.

We talked briefly about pipelines in the last lesson but just to reiterate:

  1. Create a stream from a source.
  2. Specify zero or more intermediate operations that return a stream to be processed by the next operation.
  3. Specify zero or one terminal operation that will invoke the lazy operations preceding it (the intermediate operations) culminating in either a void or non-stream result.

All streams follow this pipeline pattern where a stream is created from a source, transformed into new streams in each subsequent intermediate step, if there are any, and is then terminated. The subsequent result depends on the terminal operation used and can be an array, collection, object type, primitive type or void.

Although you can specify a stream with intermediate operations and no terminal operation this is pointless as intermediate operations are lazy and so the stream will never be processed.

Stream Pipeline Slideshow Top

The following slideshow should help with understanding of stream pipelines and we will code it up after the show. Just press the buttons to step through it:

pipeline 1







The first thing we do is create a stream from an input source.

pipeline 2





Here we are using the intermediate filter() method to create a new stream.

pipeline 3





Here we use the intermediate skip() method on the filtered stream.

pipeline 4





Here we use the intermediate limit() method on the skipped stream.

pipeline 5



Here we use the terminal forEach() method on the limited stream.

pipeline 6

The result. In the case of the terminal forEach() operation this is void().

Ok, lets code up the slideshow.

Following is a TestPipelineStream class that tests against the Employee class we created in the Introducing Streams lesson to demonstrate the slideshow:


package com.server2client;

/* Test our pipeline */
public class TestPipelineStream {

    public static void main(String[] args) {

        Employee.listOfStaff().stream()
            .filter(e -> e.getAge() > 21)
            .skip(2)
            .limit(2)
            .forEach(System.out::println);
    }
}

Building and running the TestEmployeeAge class produces the following output:

Run TestPipelineStream class
Screenshot 1. Running the TestPipelineStream class.

As you can see the from the screenshot 2 employees are printed. Lets go through what we did:

  1. We filtered the source reducing the stream to five
  2. We skipped the first two values reducing the stream to three.
  3. We set a limit of 2 on values.
  4. We printed out each employees details.

We used the skip() intermediate operator for the first time which returns a stream consisting of the remaining elements of this stream after discarding the first n elements of the stream.

We also used the limit() intermediate operator for the first time which returns a stream consisting of the elements of this stream, truncated to be no longer than maxSize in length.

We also used the forEach() terminal operator for the first time to process each value within our streams.

Using peek() Top

So now we have seen a stream pipeline in operation, wouldn't it be nice to see the streams evolving as we go through each intermediate operation so we can really get a handle on pipeline processing. Luckily for us there is the peek() method of the Stream<E> interface which returns a stream consisting of the elements of this stream, additionally performing the provided action on each element as elements are consumed from the resulting stream.

From the official documentation: This method exists mainly to support debugging, where you want to see the elements as they flow past a certain point in a pipeline:

Lets modify our TestPipelineStream class we used above to use the peek() method after each intermediate operation:


package com.server2client;

/* Test our pipeline */
public class TestPipelineStream {

    public static void main(String[] args) {

        /* Take a peek after each intermediate action */
        Employee.listOfStaff().stream()
                .filter(e -> e.getAge() > 21)
                .peek(e -> System.out.println("After Filtering value: " + e))
                .skip(2)
                .peek(e -> System.out.println("After Skipping value: " + e))
                .limit(2)
                .peek(e -> System.out.println("After Limiting value: " + e))
                .forEach(System.out::println);
    }
}

Building and rerunning the TestEmployeeAge class produces the following output:

Run TestPipelineStream class 2
Screenshot 2. Rerunning the TestPipelineStream class.

Although at first the screenshot might be seem confusing we have to remember that the intermediate operations don't run sequentially, so the skip() method can process a filtered element before all elements are filtered and so on.

  1. We can see that four elements were filteredby the over 21 predicate which is correct when you examine the Employee class.
  2. We skipped the first two values reducing the stream to three and we see only two skipped values in the screenshot.
  3. We set a limit of 2 on values.
  4. We printed out each employees details.

The peek() method also allows us to alter the inner state of an element such as capitalising text rather than using the map() method when we don't want to replace elements.

Related Quiz

Streams Quiz 2 - Stream Pipelines

Lesson 2 Complete

In this lesson we looked at stream pipelining.

What's Next?

In the next lesson we take a high level overview of streams including tables listing all the intermediate and terminal operations.