Implementing GenStages Under Umbrella

GenStage Under Umbrella — Part 3

iacobson
3 min readSep 29, 2017

With the tests in place, we can start the actual implementation of our application. We will implement two parallel GenStages information flows. They closely follow the tests we wrote in the last article.

Receive GenStages

First, we need to add GenStage as a dependency. We will put it in the Shared mix.exs: :gen_stage, “~> 0.12”. Then we start with the producer.

ReceiveProducer

./apps/ger_market/lib/ger_market/receive_producer.ex

We use a basic GenStage producer implementation, heavily inspired by the official docs. As said before, the main topic of the article is not the GenStage itself. But using it as a way of communication between Umbrella Apps. This is why we do not insist on optimizations, demand management, etc. We let the default GenStage implementation to handle the things for us.

We start the server with :no_state, as we will not need one for our demo app. The receive_info/1 function will be the entry point to our Receive GenStage. When we receive a new stock market message, the handle_call callback will either:

  • pass it to the Converter.ReceiveProducerConsumer, if there is any demand
  • or queue it in the GenStage buffer, waiting for demand

Do not forget to start the server in the supervision tree. We will repeat this step for all GenStages we create in our example, so I will not post this step again in the article.

./apps/ger_market/lib/ger_market/application.ex

I’m using Elixir 1.5 for our example. Please check Streamlined Child Specs if you are not familiar with the syntax above.

The implementation of UsaMarket.ReceiveProducer is similar to GerMarket, so we skip it. In the next (and last) article of the series, once we will put everything in place, you will find the link to the Github repository, with the full code of the demo.

ReceiveProducerConsumer

The ReceiveProducerConsumer inside Converter will ask the producers for stock market information (events).

./apps/converter/lib/converter/receive_producer_consumer.ex

The code is quite simple. We receive events, and we map them to GBP, using pattern matching to find USD and EUR.

If you used GenStage before, you are right to ask why we do not subscribe the producer_consumer to the producers in the init/1, using subscribe_to. In a “monolith” application you can control the start order of the GenStage processes. Not in a “flat umbrella”. You cannot guarantee that the GerMarket.ReceiveProducer starts before Converter.ReceiveProducerConsumer tries to subscribe to it. That’s why we manually subscribe the consumers to the producers in the tests. We will come back to this issue in the next article. We will see how we can handle the subscriptions automatically.

ReceiveConsumer

The last stage of the receive flow is the MyUkApp.ReceiveConsumer.

./apps/my_uk_app/lib/my_uk_app/receive_consumer.ex

The ReceiveConsumer asks the Converter.ReceiveProducerConsumer for events. When it receives any, it calls the Shared.Interface.process_info/2. As discussed in the last article, we implemented this Interface, just to send the received info to the Test process and to be able to assert them.

At this point, the ReceiveInfoTest will pass. We have a fully functional GenStage communication between different apps in the umbrella.

Send GenStage

The send information flow is very similar to the receive, from the implementation point of view. The only major difference is the fact that we have two information consumers this time.

SendProducer

This time MyUkApp will have the producer role.

./apps/my_uk_app/lib/my_uk_app/send_producer.ex

Nothing new here. The implementation is similar to the other producer above.

SendProducerConsumer

This time the Converter will exchange GBP info to USD and EUR.

Note the dispatcher: GenStage.BroadcastDispatcher option. This ensures that the SendProducerConsumer will send the events to all the subscribed consumers. Not only to the first that issued the demand.

For each message received from the producer, the Converter will create two of them, one for each converted currency.

SendConsumer

The consumer again is identical to the one above, so no point to insist on this code. The UsaMarket.SendConsumer is similar as well.

The SendInfoTest will pass and we have a fully working 2-way communication between “flat umbrella” apps.

One thing remains to be done: automatically handle the subscriptions between consumers and producers. We will take care of this in the next article.

--

--

No responses yet