
Applying Kafka Streams for internal message delivery pipeline

by LINE Engineer on 2016.8.19


Introduction

Hello, my name is Yuto Kawamura. I’m a LINE server engineer in charge of developing and operating LINE’s core storage facilities such as HBase and Kafka.

Since the latter half of last year, I’ve been working on a new project called IMF, which stands for Internal Message Flow (or Fund). IMF has two main goals:

  • Develop a data pipeline which provides a unified way of delivering events between our systems.
  • Replace talk-dispatcher, a component in the LINE server system responsible for background task processing.

The two goals may seem unrelated, but we’re actually trying to adopt the same technologies for both: Apache Kafka and stream processing. Apache Kafka is a high-throughput distributed messaging system that was originally developed and used at LinkedIn. Although Kafka has various unique features, the most important ones are the following:

  • Disk-based persistence with high throughput, close to that of in-memory systems, achieved through use of the OS page cache
  • The ability for multiple consumers to consume messages from a topic (similar to a queue) multiple times, because each client manages “offsets” that represent the position in the queue up to which it has consumed (see the sketch below)
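
For example (a hedged sketch with a hypothetical topic and group name), two applications can each read the full stream of the same topic simply by using different consumer groups, because each group tracks its own offsets:

Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker:9092");
// Each group.id gets its own set of offsets; a second application using another
// group.id would receive every message independently of this one.
props.put("group.id", "contact-sync");
props.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("talk-operations"));
while (true) {
    ConsumerRecords<Long, String> records = consumer.poll(1000);
    for (ConsumerRecord<Long, String> record : records) {
        System.out.printf("offset=%d key=%d%n", record.offset(), record.key());
    }
}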

There are many interesting things that I could tell you about our work, such as the practices we’ve defined while working on Kafka engineering or the bigger picture of what the IMF project is about. But today, I’d like to focus on just one thing: How we implement stream processing.

Stream processing frameworks

There are several well-known stream processing frameworks available such as Apache Storm, Apache Spark, Apache Flink and Apache Samza.

Initially, we tried adopting Apache Samza, which, like Kafka, was originally developed by engineers at LinkedIn. Because Samza was designed around Kafka, it has a high affinity with it as well as built-in integration support. It worked fine in principle, but I soon realized that there would be issues when applying it to our core infrastructure (the key components that handle business logic which can affect service reliability). I had the following concerns about adopting Samza:

  • It relies on YARN, a well-known and well-developed distributed resource allocation framework. These were the reasons why I was concerned about YARN:
    • It was originally designed for batch processing rather than stream processing. Although it can still be used for stream processing, there were a few minor issues which made me feel a little uncomfortable using it.
    • It wasn’t compatible with our internal deployment system, Project Management Console (PMC).
    • It would make our architecture more complex, and I wanted to keep our architecture as simple as possible. We didn’t need resource isolation or allocation for our service-sensitive, indefinitely running applications, for the following reasons:
      • Our servers are allocated separately for different services.
      • The JVM has an option to limit the maximum heap size, which is the part that consumes the most physical memory.
      • CPU usage isn’t a problem because of our application characteristics.
      • Network traffic could be an issue, but YARN doesn’t provide a network I/O throttling function anyway.
  • It would be difficult to let YARN decide which hosts should run which jobs, because our systems have strictly defined interpretations of “host”. (For example, IMON, an internal system that monitors various services, shows metrics per host and issues alerts per host. Kibana stores log lines per host.)
  • The development community for Samza didn’t seem as active as other communities.

Needless to say, YARN is very useful as it allows for multi-tenancy in running jobs or apps in a resource pool. We use it to run different kinds of jobs such as jobs for stats and ad-hoc jobs which are executed when investigating something.

Kafka Streams

On March 10, 2016, Confluent (the company started by the members who originally wrote Apache Kafka at LinkedIn) published a new blog post titled Introducing Kafka Streams: Stream Processing Made Simple. Once I read through the post, I realized that this was exactly what we were looking for. (Before I discovered this post, I was seriously considering writing a new framework on my own.) Unlike other stream processing frameworks, it was designed as a “library” rather than an “execution framework.” Some concepts were inherited from Apache Samza, but there are also some significant differences, which I’ll describe here:

  • It’s a library, not an execution framework, meaning that users run it themselves. It’s up to developers whether to run it on a framework or to simply write public static void main() (see the sketch after this list).
  • Its core functionality is very thin and simple because it fully utilizes Kafka primitives.
  • It supports a straightforward DSL for defining processing topology.
  • Its development is officially supported and recommended by the Kafka community.
  • It supports rolling restarts, which is useful for checking whether an updated app works as expected on a single instance with production traffic.
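
Since it’s just a library, a Streams application can be deployed as a plain Java process. A minimal sketch (the topology definition and property loading are omitted here) might look like this:

public static void main(String[] args) {
    KStreamBuilder builder = new KStreamBuilder();
    // ... define the processing topology on the builder here ...

    KafkaStreams streams = new KafkaStreams(builder, loadStreamsProps());
    streams.start();

    // Close the instance gracefully on shutdown, e.g. during a rolling restart.
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}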

Kafka Streams was first released with Kafka version 0.10.0.0. However, when we first tried it prior to that release, we had to build the artifact manually from the source in the repository. It also requires Kafka brokers running version 0.10.0.0 or higher, but our cluster was running 0.9.0.1 at the time, so we had to do some dirty work like manually patching the client library to force it to speak the older, officially incompatible protocol. (And of course we will upgrade our cluster as soon as we can.) Still, it was easier than writing a new implementation from scratch on my own.

Next, I’ll talk about some of the interesting features that Kafka Streams has to offer.

Masterless

Kafka Streams has no notion of a master that takes care of detecting failures, coordinating workers, and assigning partitions, as most common distributed systems do. Instead, it relies solely on Kafka’s built-in coordination mechanism, so there’s no inter-worker communication. When you run a new instance of KafkaStreams with a given applicationId, the instance joins the consumer group for that applicationId on the Kafka brokers. When assignments are rebalanced or failovers occur, the brokers coordinate the reassignment, so communication between workers isn’t necessary.
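
For example, the only coordination-related configuration an instance needs is its applicationId and the broker list; every process started with the same applicationId joins the same group, and the brokers rebalance partitions among whichever instances are alive. (The id and broker address below are hypothetical.)

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "imf-loopback-replicator");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker:9092");

// "builder" is a topology defined elsewhere. Starting a second identical process
// scales out; killing one triggers a rebalance so the survivors take over its partitions.
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();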

High-level DSL API and low-level API

Kafka Streams supports two kinds of APIs for programming stream processing: a high-level DSL API and a low-level API.

High-level DSL API

In most cases, stream processing is about transforming, filtering, joining and aggregating streams and storing the results. For these cases, the high-level DSL API is the appropriate interface. It lets users write operations such as collection transformations in a style quite similar to Scala’s collection API. Here’s an example from IMF’s loopback topic replicator, which consumes messages from the original topic and produces a smaller, filtered set of messages to derived topics by category.

KStreamBuilder builder = new KStreamBuilder();
KStream<Long, OperationLog> stream =
        builder.stream(sourceTopic.keySerde(), sourceTopic.valSerde(), sourceTopic.name());

Map<String, Set<OpType>> categories = loadCategories();
for (Map.Entry<String, Set<OpType>> entry : categories.entrySet()) {
    String topic = entry.getKey();
    Set<OpType> opTypes = entry.getValue();
    TalkOperationLogV1 destTopic =
            InternalMessageTopics.getTopic(topic, TalkOperationLogV1.class);

    stream.filter((key, value) -> {
              TalkOperation op = value.getOperation();
              return op != null && opTypes.contains(op.getOpType());
          })
          .to(destTopic.keySerde(), destTopic.valSerde(), destTopic.name());
}

Properties props = loadStreamsProps();
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

The purpose, constructing derived topics from the original topic by opType category, is expressed in the following part of the snippet.

  1. Create a new KStream by specifying a topic and its key and value serdes
  2. Apply a filter to each element
  3. Store the results to the topic corresponding to the category

KStream<Long, OperationLog> stream =
        builder.stream(sourceTopic.keySerde(), sourceTopic.valSerde(), sourceTopic.name());

... 

stream.filter((key, value) -> {
          TalkOperation op = value.getOperation();
          return op != null && opTypes.contains(op.getOpType());
      })
      // Store the resulting messages to the topic pointed to by destTopic
      .to(destTopic.keySerde(), destTopic.valSerde(), destTopic.name());

Low-level API

The low-level API should be used for niche cases that require uncommon processing, such as dispatching messages to specific downstream topics based on their content. The Processor API is quite straightforward and easy to understand. You can see examples of how to use the low-level API in the examples directory of the official repository. In short, the high-level DSL API should be your first choice in most situations, but there is nothing preventing you from using the low-level API when necessary.
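
Here is a hedged sketch of such content-based dispatching using the Processor API. The topic, processor, and sink names are hypothetical, and it assumes a Streams version in which ProcessorContext.forward() can target a named child sink:

TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source", "decaton-tasks")
       .addProcessor("dispatcher", () -> new AbstractProcessor<Long, String>() {
           @Override
           public void process(Long key, String task) {
               // Route each task to a downstream sink chosen from its content.
               if (task.startsWith("storage:")) {
                   context().forward(key, task, "storage-sink");
               } else {
                   context().forward(key, task, "default-sink");
               }
           }
       }, "source")
       .addSink("storage-sink", "storage-tasks", "dispatcher")
       .addSink("default-sink", "generic-tasks", "dispatcher");

KafkaStreams streams = new KafkaStreams(builder, loadStreamsProps());
streams.start();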

Fault-tolerant local state database

While implementing stream processing, it’s often necessary to keep state for various purposes. Local state is commonly used to implement aggregations, joins and windowing, but those aren’t the only cases. In Kafka Streams, each processor can have its own state store. When a failover occurs and the processor assignment moves to another host, the state database migrates to the new processor thanks to the changelog mechanism of Kafka Streams. Whenever Kafka Streams updates the physical store backing the state database (the store has a pluggable interface, so you can choose an in-memory store, RocksDB, or anything else you want), it also produces a message to a special topic called the “changelog”. The changelog topic can be considered the WAL (Write-Ahead Log) of the local state. Since Kafka topics can be consumed an arbitrary number of times, every time a processor fails over, the new processor can restore its local state database by replaying mutation logs from the changelog topic. In other words, you don’t need to prepare external storage to hold a processor’s state. Kafka and Kafka Streams are all you need.
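
As a hedged sketch (with hypothetical topic and store names), a processor can count events per key in a persistent local store; because the store is persistent, its mutations are also written to the changelog topic and can be replayed after a failover:

TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source", "user-events")
       .addProcessor("counter", () -> new AbstractProcessor<String, String>() {
           private KeyValueStore<String, Long> store;

           @Override
           @SuppressWarnings("unchecked")
           public void init(ProcessorContext context) {
               super.init(context);
               // Local state store; writes are mirrored to the changelog topic.
               store = (KeyValueStore<String, Long>) context.getStateStore("counts");
           }

           @Override
           public void process(String key, String value) {
               Long current = store.get(key);
               store.put(key, current == null ? 1L : current + 1);
           }
       }, "source")
       .addStateStore(Stores.create("counts")
                            .withStringKeys()
                            .withLongValues()
                            .persistent()  // RocksDB-backed by default
                            .build(),
                      "counter");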

What we’ve implemented using Kafka Streams

Loopback Replicator

We implemented a Kafka topic replicator with Kafka Streams. It’s not for replicating topics across clusters, but for replicating topics while applying map/filter operations to messages. Right now, it’s mainly used to derive topics that contain only a categorized subset of the original topic’s messages, so consumers can consume fewer messages and thereby save network traffic and machine resources. For example, we produce TalkOperation logs. TalkOperation is a core data structure which enables communication between LINE clients and the talk-server. At peak times, the number of incoming operations to a topic can reach 1 million messages per second. Some consumers may be interested in all operations, but some are not. If consumers only want to consume operations related to the contact functions of LINE, such as ADD_CONTACT and BLOCK_CONTACT, they don’t need to consume the entire stream. In such cases, we use the loopback replicator to provide derived topics which only contain the operations related to contact functions. Currently, the loopback replicator is deployed as a single Java application without doing anything special, and it has worked well so far, even when we artificially triggered failovers for testing purposes.

Decaton

Decaton is the background task processing system that is intended to replace talk-dispatcher as I mentioned at the beginning of this post. Here, I’ll describe some of the problems that we have with talk-dispatcher:

  • The processing doesn’t scale. All tasks spawned by the talk-server are queued into a local Redis queue running on the same host. Talk-dispatcher also runs on the same server and consumes tasks from the local Redis instance. If a burst happens on a server, the queue can grow to an insane size because only a single instance consumes it.
  • It uses an in-memory queue. Because we’re using Redis as the queue server, the contents of the queue can be lost if an instance dies for any reason. Also, the maximum number of tasks that can be held in a queue is severely limited by the amount of physical memory. In fact, we’ve lost a huge number of tasks due to this limitation on several occasions.
  • Forced out-of-order processing. Since we’re not routing requests based on userId, any talk-server can receive requests from a user. Tasks created by that user’s requests are put into the local queue, while other tasks may be put into queues on other hosts. These tasks are consumed independently by different talk-dispatcher instances and may be processed out of order.

Kafka provides:

  1. Scalable partitioned queues
  2. Disk-based persistent queues that are still quite fast
  3. In-order task consumption by partitioning messages on a key (which key depends on the use case; let’s use userId as an example, as shown in the sketch below)
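
For example, keying task messages by userId is enough to preserve per-user ordering, because Kafka assigns all records with the same key to the same partition and consumers read each partition in order. (The topic name and task payload below are hypothetical.)

Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<Long, String> producer = new KafkaProducer<>(props);
long userId = 12345L;
// All tasks for userId 12345 land in the same partition and keep their order.
producer.send(new ProducerRecord<>("decaton-tasks", userId, "ADD_CONTACT"));
producer.close();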

Here’s a rough sketch showing an example of how Decaton works. It solves the three main problems that we currently face with talk-dispatcher, and it aims to isolate processing as much as possible. Thanks to Kafka’s features, different processors that perform different processing for the same task can consume tasks independently, because Kafka topics aren’t volatile. By isolating process execution, a failure in one processor no longer blocks unrelated processing. In the example above, TaskProcessorA and TaskProcessorB are not affected by StorageMutationProcessor even if an HBase server fails and its processing is blocked indefinitely. Tasks will pile up in the queue, but that isn’t a problem as Kafka topics are persistent and nearly unbounded.

Decaton is also an example of using the low-level API, in order to dispatch outgoing messages to arbitrary topics. In fact, KAFKA-3497 is an API enhancement we requested to make this possible.

Contributing to Kafka

While developing our own software with Kafka Streams, I reported several bugs and made some improvements to the Kafka Streams project. Some of the issues and patches I contributed have already been merged.

As Kafka Streams is still in its early stages of development, the community is quite responsive to requests from users. Backed by engineers from Confluent, the community responds very quickly, which makes contributing quite comfortable. Also, the Kafka community is completely open, with no closed discussions (as far as I can tell).

You can make an impact by contributing to newly created software. You’re involved in deciding the development direction of the software and you can develop the software according to your own needs while being empowered by the community. And to top it off, it’s a fun thing to do!

Conclusion

Kafka Streams is still early in its development, but I expect it to have a bright future. As far as I know, it’s the first stream processing framework that was designed from the ground up to be integrated with primary applications which directly affect service performance and reliability. Kafka itself is also well-designed, reliable middleware. If you’re thinking of building a distributed log infrastructure with Kafka, you should also try Kafka Streams.

If you’re interested and would like to learn more, take a look at these useful resources:

Thanks, Yuto