Astarte.Flow.Blocks.DeviceEventsProducer (astarte_flow v0.1.0)

This is a producer block that generates messages by consuming from an AMQP queue that contains a stream of Astarte.Core.Triggers.SimpleEvents.SimpleEvent generated by Data Updater Plant.

To make Data Updater Plant populate the queue, a Trigger has to be installed, using the same routing_key that will be passed in the start_link options to this block.

If the Event can't be decoded, no message is produced and the event is rejected from the queue.

If the event is decoded successfully, DeviceEventsProducer produces an %Astarte.Flow.Message{} containing these fields:

  • key contains a key in the format REALM/DEVICE_ID/INTERFACE/PATH.
  • data contains the value obtained by extracting the bson_value of the trigger.
  • type is inferred from the bson_value of the trigger.
  • timestamp contains the timestamp (in microseconds) contained in the trigger or, if it didn't contain one, the reception timestamp of the event.
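
As an illustration of the key format described above, the following minimal sketch splits a key back into its four components. KeyExample and split_key/1 are hypothetical names and are not part of astarte_flow. The sketch assumes that the realm, device ID and interface name never contain a "/" character, so everything after the third separator is treated as the path.

```elixir
defmodule KeyExample do
  # Hypothetical helper, not part of astarte_flow.
  # Assumes realm, device ID and interface name contain no "/".
  def split_key(key) when is_binary(key) do
    # parts: 4 keeps any further "/" characters inside the path component
    case String.split(key, "/", parts: 4) do
      [realm, device_id, interface, path] ->
        {:ok, %{realm: realm, device_id: device_id, interface: interface, path: path}}

      _ ->
        :error
    end
  end
end
```

For example, under these assumptions, KeyExample.split_key("test/f0VMRgIBAQAAAAAAAAAAAA/org.example.Sensors/room/temperature") returns a map whose path is "room/temperature", while a key with fewer than four components returns :error.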

Summary

Functions

Starts the DeviceEventsProducer.

Functions

start_link(opts)

@spec start_link(options) :: GenServer.on_start()
when options: [option],
     option:
       {:routing_key, String.t()}
       | {:realm, String.t()}
       | {:target_devices, [String.t()]}
       | {:queue, String.t()}
       | {:connection, keyword()}
       | {:exchange, String.t()}
       | {:prefetch_count, non_neg_integer()}
       | {:client, module()}

Starts the DeviceEventsProducer.

Options

  • :exchange (required) - The name of the exchange the queue will be bound to.
  • :routing_key (required) - The routing key used to bind the queue to the exchange.
  • :realm (required) - The realm the producer will listen to.
  • :target_devices - A list of device IDs. If provided, only events coming from these devices will be processed.
  • :queue - The name of the queue that gets declared by the block. Defaults to an autogenerated name.
  • :connection - A keyword list containing the options that will be passed to AMQP.Connection.open/1. Defaults to [].
  • :prefetch_count - The prefetch count of the AMQP channel. Defaults to 100.
  • :client - A module that implements the Astarte.Flow.Blocks.DeviceEventsProducer.AMQPClient behaviour and that will be used to connect to AMQP. Defaults to Astarte.Flow.Blocks.DeviceEventsProducer.RabbitMQClient.
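
The options above could be assembled as in the following sketch. All values (exchange name, routing key, realm, device ID, queue name and connection settings) are hypothetical placeholders and must match your own deployment and the installed Trigger. The call to start_link/1 is left as a comment because it requires astarte_flow to be available and an AMQP broker to be reachable.

```elixir
# All values below are placeholders chosen for illustration only.
opts = [
  # Required options
  exchange: "astarte_events",
  routing_key: "my_flow_routing_key",
  realm: "test",
  # Optional: only process events coming from these devices
  target_devices: ["f0VMRgIBAQAAAAAAAAAAAA"],
  # Optional: omit to let the block use an autogenerated queue name
  queue: "my_flow_queue",
  # Optional: passed as-is to AMQP.Connection.open/1
  connection: [host: "localhost", port: 5672, username: "guest", password: "guest"],
  # Optional: same value as the documented default
  prefetch_count: 100
]

# With astarte_flow loaded and the broker reachable:
# {:ok, producer} = Astarte.Flow.Blocks.DeviceEventsProducer.start_link(opts)
```

The :client option is left out here, so the default client module is used.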