Streams

OverviewTerminologyFunction IndexFunction Details

Overview

Streams are Axon data processing pipelines. Streams don't provide any in-memory storage such as the List or Grid collection types. Rather streams define a series of processing steps which process the data lazily.

Lets examine a simple example to illustrate:

// without streams
readAll(site)
  .map(s => s.set("area", s->area.to(1m²)))
  .findAll(s => s->area > 1000m²)

// with streams
readAllStream(site)
  .map(s => s.set("area", s->area.to(1m²)))
  .findAll(s => s->area > 1000m²)
  .collect(toGrid)

Both snippets of code run the following pipeline:

  1. read all the sites from Folio
  2. convert the area tag to m²
  3. filter out only sites with area greater than 1000m²
  4. return a Grid result

We can see that the transformation steps using map() and findAll() are identical. But in the first example we start with readAll() which reads all the sites into memory at once as a grid. Then we call map() which constructs another immutable in grid in-memory. And then we call findAll() which constructs another grid in-memory. All these intermediate grids are relatively expensive to create.

Alternatively the streaming approach is much more efficient. We start with readAllStream() which unlike readAll() does not construct an in-memory grid, but rather iterates the stream each time a new matching record is found. The calls to map() and findAll() don't create any intermediate grids, but rather just process the items as they are streamed from Folio. The last step explicitly collects all our mapped/found records into a grid - this is the only step which creates an in-memory grid.

Terminology

We use the following terminology in the stream APIs:

  • source: this represents the source of the data to stream. Typically the source is an in-memory collection or a call to readAllStream(). All source functions return an instance of stream.
  • transform: intermediate stream stages which perform data transformation. All transform functions take a stream as the first parameter and return a new stream instance.
  • terminal: last stage of a stream which performs a collection or reduction of the data stream. Terminal functions take a stream as the first parameter and always return a non-stream value.
  • upstream: all the stages to run before a given stage
  • downstream: all the stages to run after a given stage
  • bounded: a stream with a definitive end; for example any stream from an in-memory collection is bounded by the collection's size
  • unbounded: a stream with a potentially infinite flow of data items.
  • back pressure: streams which automatically throttle their processing when the source produces data faster than the terminal step can consume

Function Index

Stream creation functions:

Stream transformation functions:

  • limit: truncate stream after number of data items
  • skip: skip number of data items in stream
  • map: map each item to new item
  • flatMap: map each item to zero or more new items
  • filter: filter dicts using Haystack filter expression
  • findAll: filter items with predicate function
  • addMeta: add grid level meta
  • setMeta: set grid level meta
  • addColMeta: add column level meta
  • setColMeta: set column level meta
  • reorderCols: reorder grid column
  • removeCol: remove grid column
  • removeCols: remove grid columns
  • keepCols: keep grid columns

Stream termination functions:

  • collect: collect to in-memory list
  • each: iterate all items to callback function
  • eachWhile: iterate items to callback function with break
  • first: return first stream data item
  • last: return last stream data item
  • find: find first matching item
  • all: return if all items match predicate function
  • any: return if any item matches predicate function
  • reduce: reduce stream into a single accumulation value
  • fold: fold stream of numbers
  • commit: commit a stream of diffs to Folio database
  • feed: creates live data stream to populate view

Function Details

Details on each streaming function.

addColMeta

The addColMeta() function adds column level meta when collecting to a grid. The tags are merged with upstream grid meta. This function infers collect(toGrid).

readAllStream(site).addColMeta("geoAddr", {hidden}).collect

addMeta

The addMeta() function adds grid level meta when collecting to a grid. The tags are merged with upstream grid meta. This function infers collect(toGrid).

readAllStream(site).addMeta({title:"Sites"}).collect

all

The all() function terminates a stream and returns a Bool indicating if all items match a predicate function. The stream is short circuited on first false match.

[1, 2, 3].stream.all(v => v.isOdd)

any

The any() function terminates a stream and returns a Bool indicating if any items match a predicate function. The stream is short circuited on first true match.

[1, 2, 3].stream.any(v => v.isOdd)

collect

The collect() function terminates a stream by collecting the items into an in-memory collection. Collect will return either a List or Grid depending on the to parameter. To explicitly specify the resulting collection type use the toList() or toGrid() function as follows:

(0..10).stream.collect(toList)
(0..10).stream.collect(toGrid)

If the to function is omitted, then a default is chosen automatically using the following rules:

  1. If source is stream() of Grid, return Grid
  2. If any stream stage manipulates grid meta such as setMeta, return Grid
  3. If none of the conditions above are met, then return List

You can use the debugType() to check the return type of collect.

commit

The commit() function terminates a stream of diffs by writing them to the Folio database. The previous step must generate a stream of diff() objects. Nulls values are implicitly skipped.

This function will return the number of diffs committed, but this is not guaranteed behavior and might change in the future.

This function will automatically apply back pressure by blocking periodically to ensure the Folio write queues are fully processed before proceeding

(1..5).stream.map(n => diff(null, {dis:"C-"+n}, {add})).commit

each

The each() function terminates a stream by iterating every item to a callback function:

(0..10).stream.each(v => echo(v))

eachWhile

The eachWhile() function terminates a stream and iterates every item to a callback function. If the function returns non-null then the stream is immediately closed and the resulting object is returned. If the function returns null for every item, then the entire stream is iterated and null is returned.

(0..10).stream.eachWhile v => do
  echo(v)
  if (v > 5) "break" else null
end

feed

The feed() function terminates a stream and creates a live data feed for view consumption (SkySpark only):

readAllStream(point).limit(5).feed

filter

The filter() function transforms a stream by filtering items using a filter expression:

readAllStream(equip).filter(siteMeter and elec and meter).collect

find

The find() function terminates a stream with the first item matched by a predicate function:

(0..10).stream.find(v => v.isOdd)

findAll

The findAll() function transforms a stream by filtering items using a predicate function:

(0..10).stream.findAll(v => v.isOdd).collect

first

The first() function terminates a stream and returns the first data item streamed:

(0..10).stream.first

fold

The fold() function terminates a stream by folding all upstream items using a folding function:

(0..10).stream.fold(sum)

flatMap

The flatMap() function transforms a stream using a 1-to-N transform function. The transform function must return a list or grid of items to send downstream. You can return null to indicate no items (same as empty list).

readAllStream(site)
  .flatMap(s => s.toEquips)
  .collect

ioStreamCsv

The ioStreamCsv() creates a stream of Dicts by reading the rows of a comma separated value I/O handle.

ioStreamCsv(`io/import-points.csv`).limit(3).collect

ioStreamLines

The ioStreamLines() creates a stream to read lines from an I/O handle.

ioStreamLines(`io/import-points.csv`).limit(3).collect

keepCols

The keepCols() function specifies the list of column names to keep when collecting to a grid. This function infers collect(toGrid).

readAllStream(site).keepCols(["dis", "area"]).collect

last

The last() function terminates a stream and returns the last data item streamed:

(0..10).stream.last

limit

The limit() function truncates the stream after a given limit threshold is reached:

readAllStream(point).limit(5).collect

map

The map() function transforms a stream using a 1-to-1 transform function:

readAllStream(site)
  .map(s => s.set("area", s->area.to("m²")))
  .collect

readAllStream

The readAllStream() function creates a new stream of Dict records using a filter expression:

readAllStream(equip).collect

readByIdsStream

The readByIdsStream() function creates a new stream of Dict records using a list of ids:

readByIdsStream([id1, id2]).collect

removeCol

The removeCol() function removes one column by name when collecting to a grid. This function infers collect(toGrid).

readAllStream(site).removeCol("geoAddr").collect

removeCols

The removeCols() function removes a list of column names when collecting to a grid. This function infers collect(toGrid).

readAllStream(site).removeCols(["geoAddr", "geoStreet"]).collect

reorderCols

The reorderCols() function specifies an explicit column order when collecting to a grid. It takes a list of column names. Any columns not specified in the list are removed. This function infers collect(toGrid).

readAllStream(site).reorderCols(["dis", "area"]).collect

reduce

The reduce() function terminates a stream by reducing all upstream items using a custom reduction function:

(1..5).stream.reduce(1, (acc,val)=>acc*val)

setColMeta

The setColMeta() function replaces column level meta when collecting to a grid. Any column meta inferred from upstream steps is cleared. This function infers collect(toGrid).

readAllStream(site).setColMeta("geoAddr", {hidden}).collect

setMeta

The setMeta() function replaces grid level meta when collecting to a grid. Any grid meta inferred from upstream steps is cleared. This function infers collect(toGrid).

readAllStream(site).setMeta({title:"Sites"}).collect

skip

The skip() function discards a given number of items at the start of stream:

(1..10).stream.skip(3).collect

stream

The stream() function creates a new stream from an in-memory collection:

  • List: stream the items
  • Grid: stream the rows
  • Range: stream the inclusive range as Numbers

Examples:

["a", "b", "c"].stream.collect
(3..7).stream.collect

streamCol

The streamCol() function creates a new stream from a grid and column. It turns the column's cell into a sequence of the cells.

Examples:

readAll(site).streamCol("area").fold(sum)