Astarte.Flow.Blocks.DeviceEventsProducer (astarte_flow v0.1.0)

This is a producer block that generates messages by consuming from an AMQP queue that contains a stream of Astarte.Core.Triggers.SimpleEvents.SimpleEvent generated by Data Updater Plant.

To make Data Updater Plant populate the queue, a Trigger has to be installed, using the same routing_key that will be passed in the start_link options to this block.

If the event can't be decoded, no message is produced and the event is rejected from the queue.

If the event is decoded successfully, DeviceEventsProducer produces an %Astarte.Flow.Message{} containing these fields:

  • key contains a key in the format REALM/DEVICE_ID/INTERFACE/PATH.
  • data contains the value obtained by extracting the bson_value of the trigger.
  • type is inferred from the bson_value of the trigger.
  • timestamp contains the timestamp (in microseconds) contained in the trigger or, if the trigger doesn't contain one, the reception timestamp of the event.
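
As a rough illustration of the fields listed above, the sketch below uses a plain map as a stand-in for %Astarte.Flow.Message{} so that it runs without astarte_flow loaded. The realm, device ID, interface, path, value and type atom are all hypothetical, and the exact way the path is joined into the key is an assumption based only on the REALM/DEVICE_ID/INTERFACE/PATH format described here.

```elixir
# Plain map standing in for %Astarte.Flow.Message{}; every value is hypothetical.
message = %{
  key: "test/f0VMRgIBAQAAAAAAAAAAAA/com.example.Sensors/temperature/value",
  data: 21.5,
  # Assumed type atom for a floating point value
  type: :real,
  # Microseconds since the Unix epoch
  timestamp: 1_600_000_000_000_000
}

# The path may itself contain slashes, so split into at most 4 parts
# and keep everything after the interface name as the path.
[realm, device_id, interface, path] = String.split(message.key, "/", parts: 4)

IO.inspect({realm, device_id, interface, path})
```

Limiting the split to four parts keeps a multi-level path such as temperature/value together instead of scattering it across extra list elements.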

Starts the DeviceEventsProducer.

@spec start_link(options) :: GenServer.on_start()
      when options: [option],
           option:
             {:routing_key, String.t()}
             | {:realm, String.t()}
             | {:target_devices, [String.t()]}
             | {:queue, String.t()}
             | {:connection, keyword()}
             | {:exchange, String.t()}
             | {:prefetch_count, non_neg_integer()}
             | {:client, module()}

  • :exchange (required) - The name of the exchange the queue will be bound to.
  • :routing_key (required) - The routing key used to bind the queue to the exchange.
  • :realm (required) - The realm the producer will listen to.
  • :target_devices - A list of device IDs. If provided, only events coming from these devices will be processed.
  • :queue - The name of the queue that gets declared by the block. Defaults to an autogenerated name.
  • :connection - A keyword list containing the options used to open the AMQP connection. Defaults to [].
  • :prefetch_count - The prefetch count of the AMQP channel. Defaults to 100.
  • :client - A module that implements the Astarte.Flow.Blocks.DeviceEventsProducer.AMQPClient behaviour and that will be used to connect to AMQP. Defaults to Astarte.Flow.Blocks.DeviceEventsProducer.RabbitMQClient.
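
A minimal sketch of how the options above might be assembled and passed to start_link/1. The exchange name, routing key, realm and device ID are hypothetical placeholders. The producer is started only if the module is actually loaded, so the snippet also runs outside an astarte_flow project; with the module loaded, a reachable AMQP broker is assumed.

```elixir
# Hypothetical option values; only :exchange, :routing_key and :realm are required.
opts = [
  exchange: "astarte_events",
  routing_key: "flow_events",
  realm: "test",
  target_devices: ["f0VMRgIBAQAAAAAAAAAAAA"],
  prefetch_count: 100
]

# Check the required options before starting the block.
required = [:exchange, :routing_key, :realm]
missing = Enum.reject(required, &Keyword.has_key?(opts, &1))

producer = Astarte.Flow.Blocks.DeviceEventsProducer

# Start the producer only when astarte_flow is available in this runtime.
if missing == [] and Code.ensure_loaded?(producer) do
  {:ok, _pid} = producer.start_link(opts)
end
```

The :routing_key value must match the one used by the installed Trigger, otherwise Data Updater Plant will not route events to the queue bound by this block.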