Streams
Record Stream
A RecordStream is a Stream<Record> - a stream of Record instances. All operations defined in the Java 8 Stream interface are supported for RecordStream. Obtained using the DatasetReader.records() method, a RecordStream is the primary means of performing a query against a TCStore Dataset.
As with a java.util.stream.Stream, a RecordStream may be used only once. Unlike a Java Stream, a RecordStream closes itself when the stream is fully consumed through a terminal operation other than iterator or spliterator. (Even so, it is good practice to close a RecordStream using a try-with-resources block or RecordStream.close.) There are no provisions for concatenating two RecordStream instances while retaining RecordStream capabilities.
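The single-use and closing rules mirror those of a plain java.util.stream.Stream. The following sketch uses only plain Java (no TCStore types) to show what reuse looks like and the recommended try-with-resources pattern:

```java
import java.util.stream.Stream;

public class StreamReuse {
    public static void main(String[] args) {
        // A Stream (like a RecordStream) may be consumed only once.
        Stream<String> names = Stream.of("alice", "bob");
        System.out.println(names.count());       // terminal operation consumes the stream; prints 2
        try {
            names.count();                       // a second traversal fails
        } catch (IllegalStateException e) {
            System.out.println("stream already consumed");
        }

        // Closing explicitly with try-with-resources is good practice,
        // as recommended above for RecordStream.
        try (Stream<String> s = Stream.of("carol", "dave")) {
            System.out.println(s.filter(n -> n.startsWith("c")).count()); // prints 1
        }
    }
}
```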
Most RecordStream intermediate operations return a RecordStream. However, operations which perform a transformation on a stream element may return a Stream<Record> which is not a RecordStream. For example, map(identity()) returns a Stream<Record> which is not a RecordStream.
In addition to being composed of intermediate and terminal operations (as described in the Java 8 package java.util.stream), a RecordStream comprises a server-side and a client-side pipeline segment. Every RecordStream originates in the server. As each operation is added during pipeline construction, an evaluation is made as to whether the operation and its arguments can be run in the server (extending the server-side pipeline); many pipeline operations can be run in the server. The first operation which cannot be run in the server begins the client-side pipeline. A stream pipeline may have both server-side and client-side pipeline segments, only a server-side segment, or only a client-side segment (other than the stream source). Each Record or element passing through the stream pipeline is processed first by the server-side pipeline segment (if any) and is then passed to the client-side pipeline segment (if one exists) to complete processing.
The following code creates a RecordStream and performs a few operations on the records of the stream:
long numMaleEmployees = employeeReader.records() // <1>
.filter(GENDER.value().is('M')) // <2>
.count(); // <3>
| 1 | The DatasetReader.records() method returns a RecordStream delivering Record instances from the Dataset referred to by the DatasetReader. |
| 2 | Stream intermediate operations on a RecordStream return a stream whose type is determined by the operation and its parameters. In this example, filter provides a RecordStream. |
| 3 | A Stream terminal operation on a RecordStream produces a value or a side-effect. In this case, count returns the number of Record instances passing the filter above. |
Additional operations supported on RecordStream
Optional<Record<Integer>> record = employeeReader.records()
.explain(System.out::println) // <1>
.batch(2) // <2>
.peek(RecordStream.log("{} from {}", NAME.valueOr(""),
COUNTRY.valueOr(""))) // <3>
.filter(COUNTRY.value().is("USA"))
.findAny();
long count = employeeReader.records()
.inline() // <4>
.count();
| 1 | The RecordStream.explain operation observes the stream pipeline and provides the pre-execution analysis information for this stream pipeline. It takes, as a parameter, a Consumer which is passed an explanation of the stream execution plan. RecordStream.explain returns a RecordStream for further pipeline construction. For explain to be effective, the pipeline must be terminated - the plan is not determined until the pipeline begins execution. The explain Consumer is called once the pipeline is closed. For a RecordStream against a clustered TCStore configuration, the explanation identifies the operations in each of the server-side and client-side pipeline segments. |
| 2 | In a clustered configuration, the RecordStream.batch operation supplies a hint for the number of stream elements batched together when elements are transferred from the server-side pipeline segment to the client-side pipeline segment. RecordStream.batch returns a RecordStream for further pipeline construction. Note: When batching is not disabled, multiple elements may complete processing in the server-side pipeline segment before any elements are presented to the client-side pipeline segment. If one-at-a-time element processing is required, the inline operation (described below) must be used to disable batching. |
| 3 | The RecordStream.log method produces a Consumer for use in Stream.peek to log a message according to the specified format and arguments. The first argument provides a message format like that used in the SLF4J MessageFormatter.arrayFormat method. Each subsequent argument supplies a value to be substituted into the message text and is a mapping function that maps the stream element to the value to be substituted. The formatted message is logged using the logging implementation discovered by SLF4J (the logging abstraction used in TCStore). If the peek(log(…)) operation is in the server-side pipeline segment, the formatted message is logged on the TCStore server. If the peek(log(…)) operation is in the client-side segment, the formatted message is logged in the client. |
| 4 | The RecordStream.inline operation disables the element batching discussed above. When inline is used, each stream element is processed by both the server-side and client-side pipeline segments before the next element is processed. RecordStream.inline returns a RecordStream for further pipeline construction. |
Mutable Record Stream
Obtained from the DatasetWriterReader.records() method, a MutableRecordStream extends RecordStream to provide operations through which Record instances in the RecordStream may be changed. No more than one of the mutating operations may be used in a pipeline. The changes in a Record from a MutableRecordStream mutation operation affect only the Dataset from which the MutableRecordStream was obtained (and to which the Record belongs).
The following are the operations added in MutableRecordStream:
mutateThen
The MutableRecordStream.mutateThen operation is an intermediate operation that accepts an UpdateOperation instance describing a mutation to perform on every Record passing through the mutateThen operation. The output of mutateThen is a Stream<Tuple<Record, Record>> where the Tuple holds the before (Tuple.first()) and after (Tuple.second()) versions of the Record.
double sum = employeeWriterReader.records() // <1>
    .mutateThen(UpdateOperation.write(SALARY).doubleResultOf(
        SALARY.doubleValueOrFail().increment())) // <2>
    .map(Tuple::getSecond) // <3>
    .mapToDouble(SALARY.doubleValueOrFail())
    .sum();
| 1 | The DatasetWriterReader.records() method, not DatasetReader.records(), returns a MutableRecordStream, which is a Stream of the Record instances of the Dataset referred to by the DatasetWriterReader. |
| 2 | MutableRecordStream.mutateThen() is an intermediate operation; it takes an UpdateOperation as a parameter and performs the update transformation against the Record instances in the stream. |
| 3 | MutableRecordStream.mutateThen() returns a Stream of new Tuple instances holding the before and after Record instances. Note that it does not return a RecordStream or a MutableRecordStream. |
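The before/after Tuple shape produced by mutateThen can be illustrated with plain java.util.stream types (this is an analogy, not the TCStore API; SimpleEntry stands in for Tuple, with getKey/getValue playing the roles of first/second):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.stream.Stream;

public class MutateThenShape {
    public static void main(String[] args) {
        // Each element is "mutated" into a (before, after) pair, then the
        // after values are summed - mirroring the salary-increment example.
        double sum = Stream.of(100.0, 200.0, 300.0)
                .map(before -> new SimpleEntry<>(before, before + 1)) // the mutation: increment
                .mapToDouble(Map.Entry::getValue)                     // after values, like Tuple::getSecond
                .sum();
        System.out.println(sum); // prints 603.0
    }
}
```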
deleteThen
The MutableRecordStream.deleteThen operation is an intermediate operation that deletes all Record instances passing through the deleteThen operation. The output of deleteThen is a Stream<Record> where each element is a deleted Record. (Note that the output is neither a RecordStream nor a MutableRecordStream.)
employeeWriterReader.records()
.filter(BIRTH_YEAR.value().isGreaterThan(1985))
.deleteThen() // <1>
.map(NAME.valueOrFail()) // <2>
.forEach(name -> System.out.println("Deleted record of " + name));
| 1 | MutableRecordStream.deleteThen() is an intermediate operation and deletes every Record in the stream. |
| 2 | MutableRecordStream.deleteThen() returns a Stream of the deleted Record instances. Note that it does not return a RecordStream or a MutableRecordStream. |
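The delete-then-keep-streaming pattern can be sketched in plain Java against an in-memory list (an analogy, not the TCStore API; the list stands in for the Dataset):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class DeleteThenShape {
    public static void main(String[] args) {
        // Select the matching elements, remove them from the source, and
        // keep processing the removed elements - like filter(...).deleteThen().
        List<Integer> birthYears = new ArrayList<>(List.of(1950, 1980, 1990, 2000));

        List<Integer> deleted = birthYears.stream()
                .filter(y -> y > 1985)          // the filter before deleteThen()
                .collect(Collectors.toList());
        birthYears.removeAll(deleted);          // the "delete" part

        deleted.forEach(y -> System.out.println("Deleted record born in " + y));
        System.out.println(birthYears);         // prints [1950, 1980]
    }
}
```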
mutate
The MutableRecordStream.mutate operation is a terminal operation that accepts an UpdateOperation instance describing a mutation to perform on every Record reaching the mutate operation. The return type of the mutate operation is void.
employeeWriterReader.records()
.filter(GENDER.value().is('M'))
.mutate(UpdateOperation.write(SALARY)
        .doubleResultOf(SALARY.doubleValueOrFail().decrement())); // <1>
| 1 | MutableRecordStream.mutate() is a terminal operation; it applies the given UpdateOperation to every Record in the stream and returns no value. |
delete
The MutableRecordStream.delete operation is a terminal operation that deletes every Record reaching the delete operation. The return type of the delete operation is void.
employeeWriterReader.records()
.filter(BIRTH_YEAR.value().isLessThan(1945))
.delete(); // <1>
| 1 | MutableRecordStream.delete() is a terminal operation; it deletes every Record in the stream and returns no value. |
Stream pipeline execution and concurrent record mutations
During stream pipeline execution on a Dataset, concurrent mutation of the records in it is allowed. Pipeline execution does not iterate over a point-in-time snapshot of a Dataset - changes by concurrent mutations on a Dataset may or may not be visible to a pipeline execution depending on the position of its underlying Record iterator.
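This "weakly consistent" visibility is analogous to iterating a java.util.concurrent.ConcurrentHashMap while it is being mutated (an analogy in plain Java, not the TCStore API): the iteration neither fails nor guarantees a snapshot view.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WeaklyConsistent {
    public static void main(String[] args) {
        Map<Integer, String> map = new ConcurrentHashMap<>();
        for (int i = 0; i < 4; i++) map.put(i, "v" + i);

        // Mutating while iterating is allowed; an ArrayList iterator would
        // throw ConcurrentModificationException here, a ConcurrentHashMap
        // iterator does not - updates may or may not be seen mid-iteration.
        for (Integer key : map.keySet()) {
            map.put(key, "updated-" + key);
        }
        System.out.println("no exception; size=" + map.size()); // prints no exception; size=4
    }
}
```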
Stream pipeline portability
In a clustered configuration, the Record instances accessed through a RecordStream are sourced from one or more Terracotta servers. For large datasets, this can involve an enormous amount of data transfer. There are RecordStream capabilities and optimization strategies that can be applied to significantly reduce the amount of data transferred. One of these capabilities is enabled through the use of portable pipeline operations. This capability and others are described in the section Streams.