Streams
Overview
Streams are Axon data processing pipelines. Streams don't provide any in-memory storage such as the List or Grid collection types. Rather streams define a series of processing steps which process the data lazily.
Lets examine a simple example to illustrate:
// without streams readAll(site) .map(s => s.set("area", s->area.to(1m²))) .findAll(s => s->area > 1000m²) // with streams readAllStream(site) .map(s => s.set("area", s->area.to(1m²))) .findAll(s => s->area > 1000m²) .collect(toGrid)
Both snippets of code run the following pipeline:
- read all the sites from Folio
- convert the
area
tag to m² - filter out only sites with area greater than 1000m²
- return a Grid result
We can see that the transformation steps using map()
and findAll()
are identical. But in the first example we start with readAll()
which reads all the sites into memory at once as a grid. Then we call map()
which constructs another immutable in grid in-memory. And then we call findAll()
which constructs another grid in-memory. All these intermediate grids are relatively expensive to create.
Alternatively the streaming approach is much more efficient. We start with readAllStream()
which unlike readAll()
does not construct an in-memory grid, but rather iterates the stream each time a new matching record is found. The calls to map()
and findAll()
don't create any intermediate grids, but rather just process the items as they are streamed from Folio. The last step explicitly collects all our mapped/found records into a grid - this is the only step which creates an in-memory grid.
Terminology
We use the following terminology in the stream APIs:
- source: this represents the source of the data to stream. Typically the source is an in-memory collection or a call to
readAllStream()
. All source functions return an instance of stream. - transform: intermediate stream stages which perform data transformation. All transform functions take a stream as the first parameter and return a new stream instance.
- terminal: last stage of a stream which performs a collection or reduction of the data stream. Terminal functions take a stream as the first parameter and always return a non-stream value.
- upstream: all the stages to run before a given stage
- downstream: all the stages to run after a given stage
- bounded: a stream with a definitive end; for example any stream from an in-memory collection is bounded by the collection's size
- unbounded: a stream with a potentially infinite flow of data items.
- back pressure: streams which automatically throttle their processing when the source produces data faster than the terminal step can consume
Function Index
Stream creation functions:
- stream: create stream from in-memory collection
- streamCol: stream the cells of a grid column
- readAllStream: stream records from Folio with filter
- readByIdsStream: stream records from Folio by id
- ioStreamLines: read a stream of Str lines
- ioStreamCsv: read a stream of Dicts from CSV
Stream transformation functions:
- limit: truncate stream after number of data items
- skip: skip number of data items in stream
- map: map each item to new item
- flatMap: map each item to zero or more new items
- filter: filter dicts using Haystack filter expression
- findAll: filter items with predicate function
- addMeta: add grid level meta
- setMeta: set grid level meta
- addColMeta: add column level meta
- setColMeta: set column level meta
- reorderCols: reorder grid column
- removeCol: remove grid column
- removeCols: remove grid columns
- keepCols: keep grid columns
Stream termination functions:
- collect: collect to in-memory list
- each: iterate all items to callback function
- eachWhile: iterate items to callback function with break
- first: return first stream data item
- last: return last stream data item
- find: find first matching item
- all: return if all items match predicate function
- any: return if any item matches predicate function
- reduce: reduce stream into a single accumulation value
- fold: fold stream of numbers
- commit: commit a stream of diffs to Folio database
- feed: creates live data stream to populate view
Function Details
Details on each streaming function.
addColMeta
The addColMeta()
function adds column level meta when collecting to a grid. The tags are merged with upstream grid meta. This function infers collect(toGrid)
.
readAllStream(site).addColMeta("geoAddr", {hidden}).collect
addMeta
The addMeta()
function adds grid level meta when collecting to a grid. The tags are merged with upstream grid meta. This function infers collect(toGrid)
.
readAllStream(site).addMeta({title:"Sites"}).collect
all
The all()
function terminates a stream and returns a Bool indicating if all items match a predicate function. The stream is short circuited on first false match.
[1, 2, 3].stream.all(v => v.isOdd)
any
The any()
function terminates a stream and returns a Bool indicating if any items match a predicate function. The stream is short circuited on first true match.
[1, 2, 3].stream.any(v => v.isOdd)
collect
The collect()
function terminates a stream by collecting the items into an in-memory collection. Collect will return either a List or Grid depending on the to
parameter. To explicitly specify the resulting collection type use the toList()
or toGrid()
function as follows:
(0..10).stream.collect(toList) (0..10).stream.collect(toGrid)
If the to
function is omitted, then a default is chosen automatically using the following rules:
- If source is
stream()
of Grid, return Grid - If any stream stage manipulates grid meta such as
setMeta
, return Grid - If none of the conditions above are met, then return List
You can use the debugType()
to check the return type of collect.
commit
The commit()
function terminates a stream of diffs by writing them to the Folio database. The previous step must generate a stream of diff()
objects. Nulls values are implicitly skipped.
This function will return the number of diffs committed, but this is not guaranteed behavior and might change in the future.
This function will automatically apply back pressure by blocking periodically to ensure the Folio write queues are fully processed before proceeding
(1..5).stream.map(n => diff(null, {dis:"C-"+n}, {add})).commit
each
The each()
function terminates a stream by iterating every item to a callback function:
(0..10).stream.each(v => echo(v))
eachWhile
The eachWhile()
function terminates a stream and iterates every item to a callback function. If the function returns non-null then the stream is immediately closed and the resulting object is returned. If the function returns null for every item, then the entire stream is iterated and null is returned.
(0..10).stream.eachWhile v => do echo(v) if (v > 5) "break" else null end
feed
The feed() function terminates a stream and creates a live data feed for view consumption (SkySpark only):
readAllStream(point).limit(5).feed
filter
The filter()
function transforms a stream by filtering items using a filter expression:
readAllStream(equip).filter(siteMeter and elec and meter).collect
find
The find()
function terminates a stream with the first item matched by a predicate function:
(0..10).stream.find(v => v.isOdd)
findAll
The findAll()
function transforms a stream by filtering items using a predicate function:
(0..10).stream.findAll(v => v.isOdd).collect
first
The first()
function terminates a stream and returns the first data item streamed:
(0..10).stream.first
fold
The fold()
function terminates a stream by folding all upstream items using a folding function:
(0..10).stream.fold(sum)
flatMap
The flatMap()
function transforms a stream using a 1-to-N transform function. The transform function must return a list or grid of items to send downstream. You can return null to indicate no items (same as empty list).
readAllStream(site) .flatMap(s => s.toEquips) .collect
ioStreamCsv
The ioStreamCsv()
creates a stream of Dicts by reading the rows of a comma separated value I/O handle.
ioStreamCsv(`io/import-points.csv`).limit(3).collect
ioStreamLines
The ioStreamLines()
creates a stream to read lines from an I/O handle.
ioStreamLines(`io/import-points.csv`).limit(3).collect
keepCols
The keepCols()
function specifies the list of column names to keep when collecting to a grid. This function infers collect(toGrid)
.
readAllStream(site).keepCols(["dis", "area"]).collect
last
The last()
function terminates a stream and returns the last data item streamed:
(0..10).stream.last
limit
The limit()
function truncates the stream after a given limit threshold is reached:
readAllStream(point).limit(5).collect
map
The map()
function transforms a stream using a 1-to-1 transform function:
readAllStream(site) .map(s => s.set("area", s->area.to("m²"))) .collect
readAllStream
The readAllStream()
function creates a new stream of Dict records using a filter expression:
readAllStream(equip).collect
readByIdsStream
The readByIdsStream()
function creates a new stream of Dict records using a list of ids:
readByIdsStream([id1, id2]).collect
removeCol
The removeCol()
function removes one column by name when collecting to a grid. This function infers collect(toGrid)
.
readAllStream(site).removeCol("geoAddr").collect
removeCols
The removeCols()
function removes a list of column names when collecting to a grid. This function infers collect(toGrid)
.
readAllStream(site).removeCols(["geoAddr", "geoStreet"]).collect
reorderCols
The reorderCols()
function specifies an explicit column order when collecting to a grid. It takes a list of column names. Any columns not specified in the list are removed. This function infers collect(toGrid)
.
readAllStream(site).reorderCols(["dis", "area"]).collect
reduce
The reduce()
function terminates a stream by reducing all upstream items using a custom reduction function:
(1..5).stream.reduce(1, (acc,val)=>acc*val)
setColMeta
The setColMeta()
function replaces column level meta when collecting to a grid. Any column meta inferred from upstream steps is cleared. This function infers collect(toGrid)
.
readAllStream(site).setColMeta("geoAddr", {hidden}).collect
setMeta
The setMeta()
function replaces grid level meta when collecting to a grid. Any grid meta inferred from upstream steps is cleared. This function infers collect(toGrid)
.
readAllStream(site).setMeta({title:"Sites"}).collect
skip
The skip()
function discards a given number of items at the start of stream:
(1..10).stream.skip(3).collect
stream
The stream()
function creates a new stream from an in-memory collection:
- List: stream the items
- Grid: stream the rows
- Range: stream the inclusive range as Numbers
Examples:
["a", "b", "c"].stream.collect (3..7).stream.collect
streamCol
The streamCol()
function creates a new stream from a grid and column. It turns the column's cell into a sequence of the cells.
Examples:
readAll(site).streamCol("area").fold(sum)