This was originally published on Wikimedia’s blog
Our three key players are Hadoop, the de facto distributed batch data processing platform; JSON, a ubiquitous data format; and Kafka, which is becoming the system of choice for transporting streams of data. However, much of the data that flows into Kafka is in JSON format, and there isn't good community support for importing JSON data from Kafka into Hadoop. This article surveys some common technologies and describes the approach used at Wikimedia to import our stream of incoming HTTP requests, which can peak at around 200,000 per second.
JSON is…not binary
JSON is awesome. It is both machine and human readable. It is concise (at least compared to XML), and is even more concise when represented as YAML. It is well supported in many programming languages. JSON is text, and works with standard CLI tools.
JSON sucks. It is verbose. Every value has a key in every single record. It is schema-less and fragile. If a JSON producer changes a field name, all downstream consumer code has to be ready. It is slow. Languages have to convert JSON strings to binary representations and back too often. JSON is ubiquitous. Because it is so easy for developers to work with, it is one of the most common data serialization formats used on the web [citation needed!]. Almost any web-based organization out there likely has to work with JSON in some capacity.
Kafka was originally developed by LinkedIn, and is now an open source Apache project with strong support from Confluent. Both of these organizations prefer to work with strongly typed and schema-ed data. Their serialization format of choice is Avro. Organizations like this have tight control over their data formats, as it rarely escapes outside of their internal networks. There are very good reasons Confluent is pushing Avro instead of JSON, but for many, like Wikimedia, it is impractical to transport data in a binary format that is unparseable without extra information (schemas) or special tools.
The Wikimedia Foundation lives openly on the web and has a commitment to work with volunteer open source contributors. MediaWiki is used by people of varying technical skill levels in different operating environments. Forcing volunteers and Wikimedia engineering teams to work with serialization formats other than JSON is just mean! Wikimedia wants our software and data to be easy.
For better or worse, we are stuck with JSON. This makes many things easy, but big data processing in Hadoop is not one of them. Hadoop runs in the JVM, and it works more smoothly if its data is schema-ed and strongly typed. Hive tables are schema-ed and strongly typed. They can be mapped onto JSON HDFS files using a JSON SerDe, but if the underlying data changes because someone renames a field, certain queries on that Hive table will break. Wikimedia imports the latest JSON data from Kafka into HDFS every 10 minutes, and then does a batch transform and load process on each fully imported hour.
Camus, Gobblin, Connect
LinkedIn created Camus to import Avro data from Kafka into HDFS. JSON support was added by Wikimedia. Camus’ shining feature is the ability to write data into HDFS directory hierarchies based on configurable time bucketing. You specify the granularity of the bucket and which field in your data should be used as the event timestamp. However, both LinkedIn and Confluent have dropped support for Camus. It is an end-of-life piece of software. Posited as replacements, LinkedIn has developed Gobblin, and Kafka ships with Kafka Connect.
Gobblin is a generic HDFS import tool. It should be used if you want to import data from a variety of sources into HDFS. It does not support timestamp bucketed JSON data out of the box. You’ll have to provide your own implementation to do this.
Kafka Connect is a generic Kafka import and export tool, and has an HDFS Connector that helps get data into HDFS. It has limited JSON support, and requires that your JSON data conform to a Kafka Connect specific envelope. If you don't want to reformat your JSON data to fit this envelope, you'll have difficulty using Kafka Connect.
That leaves us with Camus. For years, Wikimedia has successfully been using Camus to import JSON data from Kafka into HDFS. Unlike the newer solutions, Camus does not do streaming imports, so it must be scheduled in batches. We’d like to catch up with more current solutions and use something like Kafka Connect, but until JSON is better supported we will continue to use Camus.
Camus with JSON
Camus needs to be told how to read messages from Kafka, and in what format they should be written to HDFS. JSON should be serialized and produced to Kafka as UTF-8 byte strings, one JSON object per Kafka message. We want this data to be written as is with no transformation directly to HDFS. We'd also like to compress this data in HDFS, and still have it be usable by MapReduce. Hadoop's SequenceFile format will do nicely. (If we didn't care about compression, we could use the StringRecordWriterProvider to write the JSON records newline-delimited directly to HDFS text files.)
We’ll now create a camus.properties file that does what we need.
First, we need to tell Camus where to write our data, and where to keep execution metadata about this Camus job. Camus uses HDFS to store Kafka offsets so that it can keep track of topic partition offsets from which to start during each run:
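A sketch of this part of camus.properties — the property keys are standard Camus settings, but the HDFS paths here are placeholders:

```properties
# Where Camus should write imported data (placeholder path)
etl.destination.path=hdfs:///path/to/output/directory

# Where Camus keeps execution metadata, including topic-partition
# offsets from previous runs (placeholder paths)
etl.execution.base.path=hdfs:///path/to/camus/exec
etl.execution.history.path=hdfs:///path/to/camus/exec/history
```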
Next, we'll specify how Camus should read in messages from Kafka, and how it should look for event timestamps in each message. We'll use the JsonStringMessageDecoder, which expects each message to be a UTF-8 encoded JSON string. It will deserialize each message using the Gson JSON parser and look for a configured timestamp field.
camus.message.timestamp.field specifies which field in the JSON object should be used as the event timestamp, and camus.message.timestamp.format specifies the timestamp format of that field. Timestamp parsing is handled by Java's SimpleDateFormat, so you should set camus.message.timestamp.format to something that SimpleDateFormat understands, unless your timestamp is already an integer UNIX epoch timestamp. If it is, you should use either unix_seconds or unix_milliseconds, depending on the granularity of your UNIX epoch timestamp.
Wikimedia maintains a slight fork of JsonStringMessageDecoder that makes camus.message.timestamp.field slightly more flexible. In our fork, you can specify sub-objects using dotted notation, e.g. camus.message.timestamp.field=sub.object.timestamp. If you don't need this feature, then don't bother with our fork.
Here are a couple of examples:
Timestamp field is ‘dt’, format is an ISO-8601 string:
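A sketch of the corresponding properties — the decoder class is the one shipped with Camus; the SimpleDateFormat pattern shown is illustrative, and should match however your producers format the dt field:

```properties
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder
camus.message.timestamp.field=dt
# Illustrative pattern for timestamps like 2017-01-01T15:00:00Z
camus.message.timestamp.format=yyyy-MM-dd'T'HH:mm:ss'Z'
```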
Timestamp field is ‘meta.sub.object.ts’, format is a UNIX epoch timestamp integer in milliseconds:
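This might look like the following — note that the dotted field notation requires Wikimedia's fork of the decoder, as described above:

```properties
# Dotted sub-object notation requires Wikimedia's fork of
# JsonStringMessageDecoder; the stock decoder only reads top-level fields.
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder
camus.message.timestamp.field=meta.sub.object.ts
camus.message.timestamp.format=unix_milliseconds
```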
If the timestamp cannot be read out of the JSON object, JsonStringMessageDecoder will log a warning and fall back to using System.currentTimeMillis().
Now that we’ve told Camus how to read from Kafka, we need to tell it how to write to HDFS.
etl.output.file.time.partition.mins is important. It tells Camus the time bucketing granularity to use. Setting this to 60 minutes will cause Camus to write files into hourly bucket directories, e.g. 2017/01/01/15. Setting it to 1440 minutes will write daily buckets, etc.
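A minimal sketch of the time-partitioning settings, assuming hourly buckets and UTC timestamps:

```properties
# 60 = hourly bucket directories (e.g. 2017/01/01/15); 1440 = daily
etl.output.file.time.partition.mins=60
# Timezone used when computing bucket directories (assumption: UTC)
etl.default.timezone=UTC
```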
Use SequenceFileRecordWriterProvider if you want to compress data. To do so, set mapreduce.output.fileoutputformat.compress.codec=Snappy (or another splittable compression codec) either in your mapred-site.xml, or in this camus.properties file.
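Sketched out, assuming the record writer provider class lives where Camus' other common classes do:

```properties
# Write records into Hadoop SequenceFiles rather than plain text
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.SequenceFileRecordWriterProvider
# Compress SequenceFile contents with Snappy
mapreduce.output.fileoutputformat.compress=true
mapreduce.output.fileoutputformat.compress.codec=Snappy
```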
Finally, some basic Camus configs are needed:
1 2 3 4 5 6 7 8
There are various other camus properties you can tweak as well. You can see some of the ones Wikimedia uses here.
Once this camus.properties file is configured, we can launch a Camus Hadoop job to import from Kafka.
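The launch command looks something like this — the jar name and properties path are placeholders, and CamusJob is the job's main class in the camus-etl-kafka module:

```shell
hadoop jar camus-etl-kafka-X.jar com.linkedin.camus.etl.kafka.CamusJob \
  -P /path/to/camus.properties
```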
Note: replace X in the above command with your Camus version number, e.g. camus-etl-kafka-3.1.1.jar
The first time this job runs, it will import as much data from Kafka as it can, and write its finishing topic-partition offsets to HDFS. The next time you launch a Camus job with the same camus.properties file, it will read offsets from the configured etl.execution.base.path HDFS directory and start consuming from Kafka at those offsets. Wikimedia schedules regular Camus jobs using boring ol' cron, but you could use whatever newfangled job scheduler you like.
After several Camus runs, you should see time-bucketed directories containing Snappy-compressed SequenceFiles of JSON data in HDFS, stored in hdfs:///path/to/output/directory/topicA/2017/01/01/15/. You could access this data with custom MapReduce or Spark jobs, or use Hive's org.apache.hive.hcatalog.data.JsonSerDe and Hadoop's org.apache.hadoop.mapred.SequenceFileInputFormat. Wikimedia creates an external Hive table doing just that, and then batch processes this data into a more refined and useful schema stored as Parquet for faster querying.
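Such an external table might be sketched like this — the table name, columns, and partition layout are illustrative, not Wikimedia's actual schema; the SerDe and input format are the classes named above:

```sql
-- Illustrative external table over the Camus output directory.
-- Column names and partition scheme are assumptions, not Wikimedia's schema.
CREATE EXTERNAL TABLE topicA_json (
  dt STRING,
  uri_path STRING
)
PARTITIONED BY (year INT, month INT, day INT, hour INT)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS
  INPUTFORMAT 'org.apache.hadoop.mapred.SequenceFileInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 'hdfs:///path/to/output/directory/topicA';
```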
Here’s the camus.properties file in full:
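A consolidated sketch of the properties discussed above — a minimal file, not Wikimedia's exact production config; the paths, brokers, and topic names are placeholders:

```properties
# --- Job identity and Kafka connection (placeholder values) ---
camus.job.name=Camus JSON Import
kafka.brokers=kafka1001:9092,kafka1002:9092
kafka.whitelist.topics=topicA,topicB
kafka.client.name=camus-00

# --- HDFS locations (placeholder paths) ---
etl.destination.path=hdfs:///path/to/output/directory
etl.execution.base.path=hdfs:///path/to/camus/exec
etl.execution.history.path=hdfs:///path/to/camus/exec/history

# --- Message decoding: JSON with an event timestamp field ---
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder
camus.message.timestamp.field=dt
# Illustrative SimpleDateFormat pattern for ISO-8601 timestamps
camus.message.timestamp.format=yyyy-MM-dd'T'HH:mm:ss'Z'

# --- Output: hourly-bucketed, Snappy-compressed SequenceFiles ---
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.SequenceFileRecordWriterProvider
etl.output.file.time.partition.mins=60
etl.default.timezone=UTC
mapreduce.output.fileoutputformat.compress=true
mapreduce.output.fileoutputformat.compress.codec=Snappy
```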