astarte_devices_source
- type: producer
- output: any type of message
Producer block that generates messages by consuming events coming from Astarte devices. This block must be combined with Astarte AMQP Triggers which allow Device events to be routed to an AMQP exchange, while the block takes care of consuming those events and transforming them to Flow messages.
Note: currently only incoming_data
triggers on scalar interfaces (i.e. no arrays) are supported
Properties
realm
: the Realm which is generating the events (required, string)amqp_exchange
: the exchange where the events are being published (required, string)amqp_routing_key
: the routing key used to bind the consumer queue to the exchange (required, string)target_devices
: a list of Device IDs. If present, only events coming from these devices will generate a message (optional, array of strings)
realm
realm
The Realm that is generating the events. This is also used to enforce the exchange name format (see below)
amqp_exchange
amqp_exchange
The exchange where the events are being published. This must match the exchange that was used while
setting up the AMQP trigger, as this block will bind a queue to that exchange (possibly using the
configured amqp_routing_key
).
This exchange name must follow the same restrictions that are present in Astarte triggers, namely it must match this regular expression
^astarte_events_$realm_[a-zA-Z0-9_\.\:]+$
where $realm
must be substituted with the provided realm
.
amqp_routing_key
amqp_routing_key
The routing key used to bind the queue to the amqp_exchange
. This must match the
amqp_routing_key
used in the AMQP trigger.
target_devices
target_devices
A list of target Device IDs (e.g. ["BWh_mWEATIumVX8asPgxag", "A4QR-S9FTrahKiJihYAetA"]
). If it's
provided, only events coming from the target devices will be considered and generate messages, all
the others will be discarded.
Keep in mind that filtering at this level will still generate the events on the Astarte side and publish them on RabbitMQ, so if it's possible it's better filtering at the trigger level instead, since Astarte supports specifying a Device ID or a group when creating a trigger.
Output message
Each consumed event will produce a Flow Message with the fields constructed in the following way:
key
: the key format will berealm/device_id/interface/path
. For example, an event coming from the device with Device IDEA8lpOrkR8OJ-bp9dSVYQA
in thetest
realm on theorg.astarte-platform.genericsensors.Values
interface with/mysensor/value
path will produce a message with keytest/EA8lpOrkR8OJ-bp9dSVYQA/org.astarte-platform.genericsensors.Values/mysensor/value
data
: the data sent from the Device. If a device sends42.3
on the above mentioned interface,data
will contain42.3
.type
: thetype
of the data. Currently this is not explicitly sent inside the event, so the type is deduced using an heuristic approach. This must be handled with care especially with numeric types, where there can be some instances where adouble
interface generates messages withinteger
type.timestamp
: contains the timestamp (in microseconds) of the event. If the source interface supportsexplicit_timestamp
, the timestamp is the one explicitly sent from the device, otherwise it's the reception timestamp.
Examples
The following example uses astarte_devices_source
block to generate messages coming from the
test
Realm on the org.astarte-platform.genericsensors.Values
interface.
astarte_devices_source
.realm("test")
.amqp_exchange("astarte_events_test_1")
[...]
To make the example work, the following trigger must be installed in Astarte in the test
Realm:
{
"name": "example_trigger",
"action": {
"amqp_exchange": "astarte_events_test_1",
"amqp_message_expiration_ms": 10000,
"amqp_message_persistent": false
},
"simple_triggers": [
{
"type": "data_trigger",
"on": "incoming_data",
"interface_name": "org.astarte-platform.genericsensors.Values",
"interface_major": 1,
"match_path": "/*"
}
]
}