Defining a Pipeline

Astarte Flow has its own DSL for pipeline definition.

Pipe Operator

The core construct behind Astarte Flow pipelines is the pipe operator | which connects two blocks.

In the following example messages flowing out from block1 (the source) will be processed from block2 and they will be processes from block3 (the sink):

    block1 | block2 | block3

New lines are allowed (and suggested) for readabilty reasons.

With Operator

Most of the times blocks are not usable alone without any kind of configuration.

Astarte Flow DSL defines a . operator that can be read as "with". Several . can be chained together.

Syntax for . operator is block.property1(value1).property2(value2).

The following example can be read as: "Pipe messages from astarte_devices_source block with realm "test" to filter block with script "return message.value > 0".

astarte_devices_source
  .realm("test")
| filter
  .script("return message.value > 0")

Literals

In Astarte Flow DSL literals are meant to be used with . operator.

integers

Integers

Base 10 integers are allowed, such as -1, 0 and 89.

booleans

Booleans

true and false boolean values are allowed.

strings

Strings

Strings are marked with "", such as "a string". In some situations it might be convenient using multi-line strings such as:

"""
A
"multi-line"
string
"""

Multi-line strings are convenient for Lua scripting.

objects

Objects

JSON like objects are allowed, however there is a major difference: "" around keys are not required (and should not be used).

{
  a: 1,
  b: 2,
  c: 3
}

jsonpath

JSONPath

JSONPath expressions such as ${ $.config.something } play a special role in Astarte Flow DSL, since they allow queries to any configuration object.

Following example uses a JSON Path expressions instead of harcoding the realm name.

astarte_devices_source
    .realm(${$.config.realm})

Note: JSONPath expressions must always appear as bare JSONPath expressions and must not be surrounded by double quotes, even if they represent string variables.

Pipeline Example

astarte_devices_source
    .realm(${$.config.realm})
| filter
    .script("""
            return string.find(message.key, "org.astarte%-platform.genericsensors.Values") ~= nil;
            """)
| lua_map
    .script("""
            tokens = string_utils.split(message.key, "/");
            sensor_id = tokens[4];
            new_device_id = uuid.get_v5_base64("b25aa4c6-a55b-4231-a080-a5af629d05a3", sensor_id);
            new_message = {};
            new_message.key = config.realm .. "/" .. new_device_id .. "/org.astarte-platform.ComputedValues/value";
            new_message.data = (message.data - 32) / 1.8;
            return new_message;
            """)
    .config(${config})
| virtual_device_pool
    .pairing_url("https://example.com/pairing/v1")
    .realms([{realm: ${$.config.realm}, jwt: ${$.config.pairing_jwt}}])
    .interfaces(${$.config.interfaces})

Installing a Pipeline

Pipelines can be installed using Astarte Flow REST API or using Astarte Dashboard.

Running a Pipeline

Pipelines are just blueprints and they must be instantiated so they can start processing messages, once instantiated they are called Flows. Pipelines must be installed in order to be instantiated, a pipeline which has not been instantiated does not consume any resource (and it does not process any message).