This article quickly describes how we handled the transit of external-generated customers events toward our internal Kafka cluster and how we built a reliable failover system using Flume-NG.

We started to work with Apache Flume on its 0.9 version since the beginning, because it was fitting well our need to make internet events landing into a first dumb backet, before being processed.

Flume and the old structure

In brief, the architecture of the old Flume centralizes all the logical agents configuration into a Master Flume server, which is constantly able to communicate and push changes. A new Flume agent hence, needs to be initialized against this master and a master/agent communication must be open all the time.

That said, it’s easy to consider that such architecture is not the best for scaling up in general and can be a bottleneck if you think about autoscaling or multi-region setup.

So, with the need of opening new countries, we started to upgrade to Flume-NG, attracted by its masterless architecture.

From the beginning, Flume-NG was easier to setup, even from a provisioning prospective, compared to its predecessor. A source/sink setup now is living on each agent independently.

In our case and as example for this article, we are working with Flume-NG 1.6, using THRIFT as source and KAFKA as remote sink:

flumeng_node_example

Flume 0.x and Disk Failover

However, a very special out-of-the-box feature shipped with Flume 0.x was apparently lost in Flume-NG, the “diskFailover”, which simply stores events on a local disk when a remote sink is down.

It took me some time to understand that the sink-on-disk feature was not actually missing, but rather, with Flume-NG we could have relied on more options for auto-failing over. So, instead of a single sink-to-disk option, we can now configure on a pool of real sinks that will take place in the failover system following a specific priority.

However, despite the many choices, the idea to keep unsent events to the disk was still the best suitable option for us. The following, shows how we achieved this feature using Flume-NG and which kind of problems we eventually solved.

Analysis of the new sink pool

So, as previously said, in order to achieve pretty much the same type of “DiskFailover” feature, we configured what, in Flume-NG, is called the Failover Group of sinks. In our case, it will only include Kafka (main sink) and Fileroll sink (failover), used to write on the disk in case Kafka is unavailable.

The Flume-NG agent sink group is defined as g1 here since you can have as many groups as you need. The last line is utilized to determine the type of the group, in our case failover (it can also be load balancer, for instance):

Now we assign a prio among the sinks:

If the highest prio sink is down, let the lower prio sink take over. When the highest prio is available again, get back to it:

If the highest prio sink is down, let the lower prio sink take over. When the highest prio is available again, get back to it:

A spool directory

After a failover episode, we end up having some events collected locally that we still want to send over the main sink in a reasonable time. However, in our specific case, the arrival order of the events on Kafka is not important.

To accomplish this task, we just set up what Flume-NG calls SpoolDir, which is also connected to the Kafka cluster:

So, basically, as soon as a event is moved in this directory, it will be automatically sent to Kafka:

So, basically, all the events moved in this directory will be automatically sent to Kafka:

Issues using SpoolDir source

By design, in case of Kafka, SpoolDir will work as long as you write a Kafka topic’s name in the Flume-NG agent configuration file:

Unfortunately, this fixed configuration is not suitable  in case you’re collecting varioius events aimed at different topics.

In theory, the SpoolDir should read the topic name from some event metadata stored on the event itself. In practice, in our case, this never worked.

When a developer from our core team did the magic (fixed the deserialization procedure, thanks Sebastian Alfers!) , we were able to make the SpooDir sends each event to the proper destination topic.

Processing SpoolDir regularly

The last step of this implementation was to put a trivial cronjob which periodically:

  • Check the presence of rolled files (unsent) events in Flume-NG/failover/Events
  • Check whether those events are still “useful” for our data processing (in our case, only events younger than n days; in case of older events, just archive them somewhere for a further analysis; you’ll never know.)
  • Check once again whether or not the remote sink is available
  • Move the events to the SpoolDir

 

AUTHOR: SIMONE ROSELLI