Implementing GenStages Under Umbrella
With the tests in place, we can start the actual implementation of our application. We will implement two parallel GenStage information flows. They closely follow the tests we wrote in the last article.
Receive GenStages
First, we need to add GenStage as a dependency. We will put it in the Shared mix.exs: {:gen_stage, "~> 0.12"}. Then we start with the producer.
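For reference, the dependency entry could look like the fragment below. This is only a sketch: the file path assumes the usual umbrella layout and is not taken from the demo repository.

```elixir
# apps/shared/mix.exs (path is an assumption based on the usual umbrella layout)
defp deps do
  [
    # Version requirement as stated in the article.
    {:gen_stage, "~> 0.12"}
  ]
end
```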
ReceiveProducer
We use a basic GenStage producer implementation, heavily inspired by the official docs. As said before, the main topic of the article is not GenStage itself, but its use as a way of communication between umbrella apps. This is why we do not dwell on optimizations, demand management, etc. We let the default GenStage implementation handle these things for us.
We start the server with :no_state, as we will not need any state for our demo app. The receive_info/1 function will be the entry point to our Receive GenStage. When we receive a new stock market message, the handle_call callback will either:
- pass it to the Converter.ReceiveProducerConsumer, if there is any demand
- or queue it in the GenStage buffer, waiting for demand
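The producer described above could be sketched roughly as follows. This is a minimal sketch modeled on the pattern from the GenStage docs, not necessarily the exact code of the demo. The {:receive_info, info} message shape and the :ok reply are assumptions.

```elixir
defmodule GerMarket.ReceiveProducer do
  use GenStage

  def start_link(_args) do
    # Register under the module name so other apps can reach the stage by name.
    GenStage.start_link(__MODULE__, :no_state, name: __MODULE__)
  end

  # Entry point of the receive flow.
  def receive_info(info) do
    GenStage.call(__MODULE__, {:receive_info, info})
  end

  def init(:no_state) do
    {:producer, :no_state}
  end

  # Emitting the event from handle_call lets GenStage dispatch it right away
  # if there is pending demand, or keep it in its internal buffer otherwise.
  # The message tuple and the :ok reply are assumptions of this sketch.
  def handle_call({:receive_info, info}, _from, state) do
    {:reply, :ok, [info], state}
  end

  # Events are pushed in through receive_info/1, so there is nothing to
  # produce on demand; GenStage serves consumers from the buffer.
  def handle_demand(_demand, state) do
    {:noreply, [], state}
  end
end
```

With the stage running, GerMarket.ReceiveProducer.receive_info(info) is all a caller needs to push a message into the flow.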
Do not forget to start the server in the supervision tree. We will repeat this step for all GenStages we create in our example, so I will not post this step again in the article.
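A minimal sketch of this step, using the Elixir 1.5 child spec syntax, might look like this. The module name GerMarket.Application and the supervisor name are assumptions. The sketch also assumes a GenStage version in which use GenStage defines child_spec/1 for the stage module.

```elixir
defmodule GerMarket.Application do
  use Application

  def start(_type, _args) do
    children = [
      # Listing the bare module relies on GerMarket.ReceiveProducer.child_spec/1,
      # which is assumed to be provided by `use GenStage`.
      GerMarket.ReceiveProducer
    ]

    # GerMarket.Supervisor is a hypothetical name for this app's supervisor.
    Supervisor.start_link(children, strategy: :one_for_one, name: GerMarket.Supervisor)
  end
end
```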
I’m using Elixir 1.5 for our example. Please check Streamlined Child Specs if you are not familiar with the syntax above.
The implementation of UsaMarket.ReceiveProducer is similar to GerMarket.ReceiveProducer, so we skip it. In the next (and last) article of the series, once everything is in place, you will find the link to the GitHub repository with the full code of the demo.
ReceiveProducerConsumer
The ReceiveProducerConsumer inside Converter will ask the producers for stock market information (events).
The code is quite simple. We receive events, and we map them to GBP, using pattern matching to find USD and EUR.
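As an illustration, such a producer_consumer could be sketched as below. The event shape (a map with :currency and :price keys) and the exchange rates are assumptions made up for this sketch. They are placeholders, not real market data and not necessarily what the demo uses.

```elixir
defmodule Converter.ReceiveProducerConsumer do
  use GenStage

  # Placeholder rates, for illustration only.
  @usd_to_gbp 0.5
  @eur_to_gbp 0.75

  def start_link(_args) do
    GenStage.start_link(__MODULE__, :no_state, name: __MODULE__)
  end

  def init(:no_state) do
    # No :subscribe_to here; subscriptions are made from the outside.
    {:producer_consumer, :no_state}
  end

  # Every incoming event is converted and passed downstream.
  def handle_events(events, _from, state) do
    {:noreply, Enum.map(events, &to_gbp/1), state}
  end

  # Pattern matching on the currency selects the conversion.
  defp to_gbp(%{currency: "USD", price: price} = info) do
    %{info | currency: "GBP", price: price * @usd_to_gbp}
  end

  defp to_gbp(%{currency: "EUR", price: price} = info) do
    %{info | currency: "GBP", price: price * @eur_to_gbp}
  end
end
```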
If you used GenStage before, you are right to ask why we do not subscribe the producer_consumer to the producers in init/1, using subscribe_to. In a "monolith" application you can control the start order of the GenStage processes. In a "flat umbrella" you cannot: there is no guarantee that GerMarket.ReceiveProducer starts before Converter.ReceiveProducerConsumer tries to subscribe to it. That's why we manually subscribe the consumers to the producers in the tests. We will come back to this issue in the next article, where we will see how we can handle the subscriptions automatically.
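The manual subscription can be done with GenStage.sync_subscribe/2 once all stages are running. The helper below is a hypothetical sketch (the module name ReceiveFlowSubscriptions is invented for illustration). It assumes every stage is registered under its module name.

```elixir
defmodule ReceiveFlowSubscriptions do
  # {consumer, producer} pairs of the receive flow, by registered name.
  def pairs do
    [
      {Converter.ReceiveProducerConsumer, GerMarket.ReceiveProducer},
      {Converter.ReceiveProducerConsumer, UsaMarket.ReceiveProducer},
      {MyUkApp.ReceiveConsumer, Converter.ReceiveProducerConsumer}
    ]
  end

  # Call this only after all stages have been started, e.g. in a test setup.
  def subscribe_all do
    Enum.each(pairs(), fn {consumer, producer} ->
      {:ok, _subscription_tag} = GenStage.sync_subscribe(consumer, to: producer)
    end)
  end
end
```

Because the subscriptions are created after startup, the start order of the apps no longer matters for this step.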
ReceiveConsumer
The last stage of the receive flow is the MyUkApp.ReceiveConsumer.
The ReceiveConsumer asks the Converter.ReceiveProducerConsumer for events. When it receives any, it calls Shared.Interface.process_info/2. As discussed in the last article, we implemented this interface just to send the received info to the test process, so that we can assert on it.
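A rough sketch of such a consumer follows. The Shared.Interface module shown here is only a stand-in so the example is self-contained. The real module from the previous article may differ. The argument order of process_info/2, the message it sends, and the :test_process name are assumptions.

```elixir
# Stand-in for the interface from the previous article (assumption).
defmodule Shared.Interface do
  def process_info(info, source) do
    # :test_process is a hypothetical registered name for the test process.
    if pid = Process.whereis(:test_process) do
      send(pid, {:info_processed, source, info})
    end

    :ok
  end
end

defmodule MyUkApp.ReceiveConsumer do
  use GenStage

  def start_link(_args) do
    GenStage.start_link(__MODULE__, :no_state, name: __MODULE__)
  end

  def init(:no_state) do
    {:consumer, :no_state}
  end

  def handle_events(events, _from, state) do
    # Hand every received event over to the shared interface.
    Enum.each(events, &Shared.Interface.process_info(&1, __MODULE__))
    # Consumers never emit events, so the event list is always empty.
    {:noreply, [], state}
  end
end
```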
At this point, the ReceiveInfoTest will pass. We have a fully functional GenStage communication between different apps in the umbrella.
Send GenStages
The send information flow is very similar to the receive, from the implementation point of view. The only major difference is the fact that we have two information consumers this time.
SendProducer
This time MyUkApp will have the producer role.
Nothing new here. The implementation is similar to the other producer above.
SendProducerConsumer
This time the Converter will convert GBP info to USD and EUR.
Note the dispatcher: GenStage.BroadcastDispatcher option. This ensures that the SendProducerConsumer sends the events to all the subscribed consumers, not only to the first one that issued demand.
For each message received from the producer, the Converter will emit two messages, one for each converted currency.
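The broadcast setup and the one-to-two conversion could be sketched like this. The full module name Converter.SendProducerConsumer, the event shape, and the rates are assumptions. The rates are placeholders, not real market data.

```elixir
defmodule Converter.SendProducerConsumer do
  use GenStage

  # Placeholder rates, for illustration only.
  @gbp_to_usd 2.0
  @gbp_to_eur 1.5

  def start_link(_args) do
    GenStage.start_link(__MODULE__, :no_state, name: __MODULE__)
  end

  def init(:no_state) do
    # BroadcastDispatcher delivers every event to all subscribed consumers.
    {:producer_consumer, :no_state, dispatcher: GenStage.BroadcastDispatcher}
  end

  def handle_events(events, _from, state) do
    # Each GBP event yields two events: one in USD and one in EUR.
    converted =
      Enum.flat_map(events, fn info ->
        [convert(info, "USD", @gbp_to_usd), convert(info, "EUR", @gbp_to_eur)]
      end)

    {:noreply, converted, state}
  end

  defp convert(%{currency: "GBP", price: price} = info, currency, rate) do
    %{info | currency: currency, price: price * rate}
  end
end
```

In this sketch every subscribed consumer receives both the USD and the EUR event for each GBP message.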
SendConsumer
The consumer is again identical to the one above, so there is no point in dwelling on this code. The UsaMarket.SendConsumer is similar as well.
The SendInfoTest will pass and we have a fully working 2-way communication between "flat umbrella" apps.
One thing remains to be done: automatically handle the subscriptions between consumers and producers. We will take care of this in the next article.