lib task

Overview

The task extension defines a framework for running Axon on asynchronous background threads; it is based on the Fantom actor framework.

A task is an actor which processes messages asynchronously with an Axon function. When a message is sent to a task it is placed onto a queue and the caller is given back a future. Futures are used to access the asynchronous result.

The task extension uses a thread pool to manage background computation. By default the maximum number of threads in this pool is 50, but it may be tuned under Settings|Task. Tasks are scheduled to run by sending them messages. Each task manages its messages with its own message queue. Once a task has a message enqueued, it is assigned a background thread from the pool and processes the message asynchronously.

Tasks share the thread pool using cooperative multitasking. When more tasks have work than there are threads in the pool, some tasks are put into a pending state until a thread frees up. When this happens, running tasks attempt to yield their thread, but a task can only yield between messages. A task cannot yield its thread while it is blocking on an operation such as network I/O or taskSleep(). It is therefore better to design your tasks so that each message is processed quickly and blocking operations are kept to a minimum.

Record Tasks

The standard way to create a task is by creating a record in the project's Folio database with the task marker tag. The following tags are used to configure a task:

  • dis: appropriate display name
  • task: required marker tag
  • taskExpr: Axon expression for message processing
  • disabled: marker to set task into the disabled state

The taskExpr tag may be a top-level function name or a lambda expression. It should be a function which takes one parameter (the message to process). Some tasks do not need access to their message in which case any expression may be used.

Here are some examples:

dis: "Task with top level function"
task
taskExpr: "myTaskFunc"

dis: "Task with lambda function"
task
taskExpr: "(msg) => echo(msg)"

dis: "Task which does not use message"
task
taskExpr: "runInBackground()"
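
As a rough illustration, the source of a top-level function such as myTaskFunc might look like the following. The body is hypothetical, as is the "myTask" log name; the only requirement is that the function takes the message as its single parameter. The value it returns becomes the result made available through the sender's future.

(msg) => do
  // log the incoming message, then return it as the task's result
  logInfo("myTask", "received: " + msg)
  msg
end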

Ephemeral Tasks

An ephemeral task is a short-lived task used to run an expression on a background thread. Ephemeral tasks are spawned with the taskRun() function, which does the following:

  • creates a new task
  • schedules the expression to be run in the background using the standard task extension thread pool
  • runs the expression and returns result via future
  • the task is garbage collected

Here is a simple example which runs a computation asynchronously using the task extension's thread pool:

// parse a CSV file asynchronously on background thread
future: taskRun(ioReadCsv(`io/import-sites.csv`))

Note that the expr passed to taskRun will be executed on a different thread with a different Axon context. It will not have access to the local variables in scope when calling taskRun, so you cannot use any local variables in the taskRun expression. If you need to pass a variable to a task, you can send it via the message parameter:

// this will not work - the variable str is not accessible
str: "hello"
taskRun(echo(str)).futureGet

// this will work because we pass str as the message
str: "hello"
taskRun((msg)=>echo(msg), str).futureGet
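
If the background expression needs more than one value, one option is to bundle the values into a dict and pass that dict as the message. The sketch below is illustrative only: the tag names uri and label are hypothetical, and it assumes the CSV file exists in the project.

// bundle several values into one immutable dict message
opts: {uri: `io/import-sites.csv`, label: "sites"}
taskRun((msg) => do
  // only msg is visible here, not the caller's locals
  rows: ioReadCsv(msg->uri)
  msg->label + ": " + rows.size
end, opts).futureGet(1min)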

Lifecycle

Task records are mapped one-to-one to an actor by the task extension. If there is a configuration error then the task is put into a "fault" condition. Applying the disabled marker tag puts the task into the "disabled" state. Tasks in the fault and disabled state will not run, nor process messages.

Whenever the system detects a change to the task record, then the task is restarted. This means that the existing actor is killed and any messages in its queue are thrown away. A new actor is spawned and the task begins again with fresh state and no local variables. You can also restart a task with the taskRestart() function.

Security

Tasks are executed in an Axon context using the local user with the username "task". Use the userAllow function to grant specific superuser functions to your tasks. The task user account must not be configured with the "su" role or it will not be used.

If no user is defined by the username "task", then an inferred synthetic user with the "admin" role is used for all evaluations. The synthetic user has its projAccessFilter set to restrict access to the local project only. XQuery to other projects is not available.

Remember that by default all admin users can create/edit tasks, so it is important to restrict what can be performed by tasks using the user database.

If testing changes to the task user account, you can call taskRefreshUser() to force an immediate cache refresh.

Subscriptions

Tasks may subscribe to an observable. Subscribed tasks receive observations from the observable as their messages to process.

Tasks subscribe to an observable by applying that observable's marker tag to the task record, such as obsSchedule in the example below.

Additional observable configuration tags are configured as tags on the task itself. Use the Task|Debug view to check the current status of your subscription. A task may subscribe to only one observable.

Subscriptions are made only after the project reaches steady state. No messages are enqueued after startup until the steady state timer has elapsed.

Example task scheduled to run every 10sec:

dis: "Example task"
task
obsSchedule
obsScheduleFreq: 10sec
taskExpr: logInfo("test", "Run this expression every 10sec")

Messaging

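The following functions are used to send a message to a task:

  • taskSend(): enqueue a message immediately
  • taskSendLater(): enqueue a message after a delay
  • taskSendWhenComplete(): enqueue a message once another future completes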

All three functions return a future which is a proxy for the asynchronous result. The first parameter is always the task itself, which is typically the result of looking up a task via the task() function; alternatively, you can pass the id of a task record. Example:

// send a message to a task immediately
future: taskSend(taskId, msg)

// schedule a message to be enqueued after 10sec
future: taskSendLater(taskId, 10sec, msg)

// future chaining - send future1 as a message to task2 only
// after task1 completes processing of the message
future1: taskSend(task1, msg1)
future2: taskSendWhenComplete(task2, future1, future1)

Note that all messages and return results must be immutable. Most values in Axon are already immutable. However, some values used in Axon are mutable and will raise a NotImmutableErr if used as a message or return result.

Futures

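Futures are a proxy for the result of an asynchronous computation. A future is returned every time a message is sent to a task and enqueued. The following functions, each described in this section, are used to work with futures:

  • futureGet(): block until the result is ready and return it
  • futureState(): poll the current state of a future
  • futureWaitFor(): block until a future completes
  • futureWaitForAll(): block on a list of futures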

While the message is enqueued, it is said to be pending. A future becomes complete when one of the following occurs:

  • The message is processed and the task returns a result value
  • The message is processed and the task raises an exception
  • The future is cancelled

You can poll the current state of a future with futureState() which returns one of the following string constants:

  • "pending": the message is still enqueued or being processed
  • "ok": the message has been processed and result is ready
  • "err": the message has been processed and exception was raised
  • "cancelled": the future was cancelled

The simplest way to get the result of a future is to call futureGet(). Calling this function will block the current thread until the future enters one of the three completed states. By default the thread will block forever, or you may pass a timeout duration. The following are possible outcomes on the caller's thread:

  • if task completed ok, then the resulting value is returned
  • if task raised an exception, then it is raised to the caller
  • if future was cancelled, then a CancelledErr is raised
  • if timeout occurs, then TimeoutErr is raised

Here are some code examples which illustrate how to send a task a message and then block until the result is ready:

// block forever
result: taskSend(taskId, msg).futureGet

// block up to 10sec
result: taskSend(taskId, msg).futureGet(10sec)

You can also use the futureWaitFor() function to block until a future completes. It takes an optional timeout. Unlike futureGet, the futureWaitFor function returns the future itself not the result. This means you can block until the future is completed and then check its state:

f: taskSend(taskId, msg)
futureWaitFor(f, 1min)
if (f.futureState == "ok")
  echo("Future is ok, result = " + f.futureGet)
else
  echo("Future not ok")

The futureWaitForAll() function is used to block on a list of futures. It is used when performing multiple asynchronous tasks in parallel:

futures: messages.map m => taskSend(taskId, m)
futureWaitForAll(futures)
results: futures.map f => f.futureGet
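
The same pattern can be combined with ephemeral tasks to fan work out across the thread pool. The following is a minimal sketch under stated assumptions: the file names are hypothetical, and it assumes futureWaitForAll() accepts an optional timeout as its second argument in the same way futureWaitFor() does.

// hypothetical list of files to parse in parallel
uris: [`io/sites-a.csv`, `io/sites-b.csv`]
// spawn one ephemeral task per file; each returns its row count
futures: uris.map u => taskRun((msg) => ioReadCsv(msg).size, u)
// block until all futures complete (timeout argument is an assumption)
futureWaitForAll(futures, 1min)
// collect the row counts
counts: futures.map f => f.futureGet

Returning only the row count keeps each result small; the parsed grids themselves never need to cross threads.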

Locals

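A task local is a variable bound to the task and only available while the task is running. Task locals provide a mechanism to save state between message processing. These three functions may be used to manage your task's locals:

  • taskLocalGet(): get a local by name, with an optional default value
  • taskLocalSet(): set a local by name
  • taskLocalRemove(): remove a local by name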

Here is an example which shows using a local variable to count how many messages are processed:

(msg) => do
  count: taskLocalGet("count", 0)
  logInfo("test", "count = " + count)
  taskLocalSet("count", count+1)
end

Task local variable names are required to be valid tag names. These functions may only be used within the context of task message processing.
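
Locals can also hold accumulated values rather than simple counters. The sketch below keeps a running total across messages and returns it as the result of each message. It assumes every message sent to the task is a number; the local name "total" is hypothetical.

(msg) => do
  // add this message to the total saved by earlier messages
  total: taskLocalGet("total", 0) + msg
  taskLocalSet("total", total)
  // the running total becomes the result of the sender's future
  total
end

Because a restart gives the task fresh state, the total starts again from zero whenever the task record changes.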

Deadlocks

A deadlock is when two or more threads are blocking on each other. They occur due to programming errors. Deadlocks are severe errors because there is no safe recovery except to restart the server. The actor model used by the task framework minimizes the chance for deadlock compared to other asynchronous design patterns, but it does not eliminate them.

One simple way to avoid deadlocks is to ensure you always use timeouts in your blocking operations. Timeouts will ensure that eventually the code will unblock itself with a timeout exception.
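
A minimal sketch of this advice, assuming taskId and msg are already defined by the caller: the wait is bounded to 30sec and wrapped in try/catch so the caller can recover. Note that the catch clause also handles an exception raised by the task itself or a cancelled future, not only the timeout.

// wait at most 30sec; fall back to null if the wait fails
result: try taskSend(taskId, msg).futureGet(30sec) catch (err) do
  logInfo("test", "no result from task within timeout")
  null
end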

A task whose results are awaited by blocking client code should never perform a blocking operation itself.

To debug deadlocks, use the Debug|Threads view, which lists any deadlocked threads at the start of the thread dump.

Task|Debug

The Task|Debug view provides diagnostics for all your configured tasks. The columns are as follows:

The type column:

  • rec: record task with good configuration
  • fault: record task with invalid configuration
  • disabled: record task with the disabled marker tag
  • ephemeral: short lived task from taskRun() function

The status column:

  • fault: when type is fault
  • disabled: when type is disabled
  • idle: no messages queued, waiting for new messages
  • pending: messages are queued, waiting for a thread to process
  • running: actively processing messages on a background thread
  • killed: shutting down and no longer processing messages

The subscription column shows the task's current observable subscription. If there was a subscribe error due to invalid configuration, the error is shown here. This column will be blank for tasks which do not subscribe to an observable.

The progress column provides visibility to user data for debugging task activities. The progress is an arbitrary dict updated by the task with the taskProgress() function.
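
As an illustration, a task function might report progress around a slow step as sketched below. Since the progress dict is arbitrary, the tag names percent and cur are hypothetical choices, and the sketch assumes the message is the Uri of a CSV file.

(msg) => do
  // publish progress before the slow step
  taskProgress({percent: 0%, cur: "reading file"})
  rows: ioReadCsv(msg)
  // publish progress after the slow step
  taskProgress({percent: 100%, cur: "read " + rows.size + " rows"})
  rows.size
end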

The errNum column indicates the total number of messages which raised exceptions (versus returned a result value). You can use the Details action to see the last exception. Note that messages which raise an error do not affect the status. If you are looking for tasks throwing exceptions, then look for non-zero values in this column.

The queueSize indicates current number of messages in the queue. The queuePeak indicates the peak of the queue - high numbers indicate task queue backlogs.

The evalNum indicates the total number of messages processed. The evalTotalTime is the total time used to process all those messages. The evalAvgTime is the average time per message: evalTotalTime divided by evalNum.

The fault column is the error message if the task has a configuration error.

Note that all statistics such as errNum and evalNum are reset to zero when the task is restarted due to a configuration change.