
Live updates to Wikimedia projects with EventStreams


This was originally published on Wikimedia’s blog

Wikimedia’s new public service that exposes live streams of Wikimedia projects is already powering several visualizations, like DataWaltz.

Photo by Mikey Tnasuttimonkol, CC BY-SA 4.0.

We are happy to announce EventStreams, a new public service that exposes live streams of Wikimedia events. And we don’t mean the next big calendar event like the Winter Olympics or Wikimania. Here, an ‘event’ is defined to be a small piece of data usually representing a state change. An edit of a Wikipedia page that adds some new information is an ‘event’, and could be described like the following:

{
    "event-type": "edit",
    "page": "Special Olympics",
    "project": "English Wikipedia",
    "time": "2017-03-07 09:31",
    "user": "TheBestEditor"
}

This means: “a user named ‘TheBestEditor’ added some content to the English Wikipedia’s Special Olympics page on March 7, 2017 at 9:31am”. While composing this blog post, we sought visualizations that use EventStreams, and found some awesome examples.

Open now in Los Angeles, DataWaltz is a physical installation that “creates a spatial feedback system for engaging with Wikipedia live updates, allowing visitors to follow and produce content from their interactions with the gallery’s physical environment.” You can see a photo of it at the top, and a 360 video of it over on Vimeo.

Sacha Saint-Leger sent us this display of real-time edits on a rotating globe, showing off where they are made.

EventStreams globe

Ethan Jewett created a really nice continuously updating chart of edit statistics.

EventStreams charts

A little background—why EventStreams?

EventStreams is not the first service from Wikimedia to expose RecentChange events as a stream. irc.wikimedia.org and RCStream have existed for years. These all serve the same data: RecentChange events. So why add a third stream service?

Both irc.wikimedia.org and RCStream suffer from similar design flaws. Neither service can be restarted without interrupting client subscriptions, which makes it difficult to build comprehensive tools that must not miss an event, and makes the services hard for WMF engineers to maintain. Neither is easy to use: both require several programming setup steps just to start subscribing to the stream. Perhaps more importantly, these services are RecentChanges specific, meaning that they are not able to serve different types of events. EventStreams addresses all of these issues.

EventStreams is built on the W3C standard Server-Sent Events (SSE). SSE is simply a streaming HTTP connection with event data in a particular text format. Client libraries, usually called EventSource, assist with building responsive tools, but because SSE is really just HTTP, you can use any HTTP client (even curl!) to consume it.

The SSE standard defines a Last-Event-ID HTTP header, which allows clients to tell servers about the last event that they've consumed. EventStreams uses this header to begin streaming to a client from a point in the past. If an EventSource client is disconnected from the server (due to network issues or an EventStreams service restart), it will automatically reconnect, send this header, and resume from where it left off.
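Because SSE is plain HTTP, you can consume the stream without an EventSource library at all. Here is a minimal Python sketch (our own illustration, not an official client) that reads the RecentChange stream with the requests library and shows how a saved event id could be sent back as Last-Event-ID to resume. A real tool should use a proper SSE client library, and the fields printed are just examples from the RecentChange schema.

import json
import requests

# The EventStreams RecentChange SSE endpoint.
url = 'https://stream.wikimedia.org/v2/stream/recentchange'

def consume(last_event_id=None):
    """Stream SSE events over plain HTTP, optionally resuming from an offset."""
    headers = {'Accept': 'text/event-stream'}
    if last_event_id:
        # Ask the server to start streaming from just after this event.
        headers['Last-Event-ID'] = last_event_id

    with requests.get(url, headers=headers, stream=True) as response:
        for line in response.iter_lines(decode_unicode=True):
            # SSE is line oriented: 'id:' lines carry the offset to save,
            # 'data:' lines carry the event payload.
            if line.startswith('id: '):
                last_event_id = line[len('id: '):]
            elif line.startswith('data: '):
                event = json.loads(line[len('data: '):])
                print(event.get('title'), event.get('user'))

consume()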

EventStreams can be used to expose any useful streams of events, not just RecentChanges. If there's a stream you'd like to have, we want to know about it. For example, soon ORES revision score events may be exposed in their own stream. The service API docs have an up-to-date list of the (currently limited) available stream endpoints.

We'd like all RecentChange stream clients to switch to EventStreams, but we recognize that there are valuable bots out there running on irc.wikimedia.org whose maintainers we might not be able to find. We commit to supporting irc.wikimedia.org for the foreseeable future. However, we believe the list of (really important) RCStream clients is small enough that we can convince or help folks switch to EventStreams. We've chosen an official RCStream decommission date of July 7 this year. If you run an RCStream client and are reading this and want help migrating, please reach out to us!

Quickstart

EventStreams is really easy to use, as shown by this quickstart example in JavaScript. Navigate to http://wikimedia.org/ in your browser and open the developer console (in Google Chrome: More Tools > Developer Tools, then select the 'Console' tab in the panel that opens below the page). Then paste the following:

// This is the EventStreams RecentChange stream endpoint.
var url = 'https://stream.wikimedia.org/v2/stream/recentchange';

// Use EventSource (available in most browsers, or as an
// npm module: https://www.npmjs.com/package/eventsource)
// to subscribe to the stream.
var recentChangeStream = new EventSource(url);

// Print each event to the console.
recentChangeStream.onmessage = function(message) {
    // Parse the message.data string as JSON.
    var event = JSON.parse(message.data);
    console.log(event);
};

You should see RecentChange events fly by in your console.

That's it! The EventStreams documentation has in-depth information and usage examples in other languages.

If you build something, please tell us, or add yourself to the Powered By EventStreams wiki page. There are already some amazing uses there!

Importing JSON into Hadoop via Kafka


This was originally published on Wikimedia’s blog

Our three key players are Hadoop, the de facto distributed batch data processing platform; JSON, a ubiquitous data format; and Kafka, which is becoming the system of choice for transporting streams of data. However, much of the data that flows into Kafka is in JSON format, and there isn't good community support for importing JSON data from Kafka into Hadoop. This article summarizes some common technologies and describes the approach used at Wikimedia to import our stream of incoming HTTP requests, which can peak at around 200,000 per second.

Photo by Eric Kilby, CC BY-SA 2.0.

JSON is…not binary

JSON is awesome. It is both machine and human readable. It is concise (at least compared to XML), and is even more concise when represented as YAML. It is well supported in many programming languages. JSON is text, and works with standard CLI tools.

JSON sucks. It is verbose. Every value has a key in every single record. It is schema-less and fragile. If a JSON producer changes a field name, all downstream consumer code has to be ready. It is slow. Languages have to convert JSON strings to binary representations and back too often. JSON is ubiquitous. Because it is so easy for developers to work with, it is one of the most common data serialization formats used on the web [citation needed!]. Almost any web based organization out there likely has to work with JSON in some capacity.

Kafka was originally developed by LinkedIn, and is now an open source Apache project with strong support from Confluent. Both of these organizations prefer to work with strongly typed and schema-ed data. Their serialization format of choice is Avro. Organizations like these have tight control over their data formats, as their data rarely escapes outside of their internal networks. There are very good reasons Confluent is pushing Avro instead of JSON, but for many, like Wikimedia, it is impractical to transport data in a binary format that is unparseable without extra information (schemas) or special tools.

The Wikimedia Foundation lives openly on the web and has a commitment to work with volunteer open source contributors. MediaWiki is used by people of varying technical skill levels in different operating environments. Forcing volunteers and Wikimedia engineering teams to work with serialization formats other than JSON is just mean! Wikimedia wants our software and data to be easy to work with.

For better or worse, we are stuck with JSON. This makes many things easy, but big data processing in Hadoop is not one of them. Hadoop runs in the JVM, and it works more smoothly if its data is schema-ed and strongly typed. Hive tables are schema-ed and strongly typed. They can be mapped onto JSON HDFS files using a JSON SerDe, but if the underlying data changes because someone renames a field, certain queries on that Hive table will break. Wikimedia imports the latest JSON data from Kafka into HDFS every 10 minutes, and then does a batch transform and load process on each fully imported hour.

Camus, Gobblin, Connect

LinkedIn created Camus to import Avro data from Kafka into HDFS. JSON support was added by Wikimedia. Camus' shining feature is the ability to write data into HDFS directory hierarchies based on configurable time bucketing. You specify the granularity of the bucket and which field in your data should be used as the event timestamp. However, both LinkedIn and Confluent have dropped support for Camus; it is an end-of-life piece of software. As replacements, LinkedIn has developed Gobblin, and Kafka now ships with Kafka Connect.

Gobblin is a generic HDFS import tool. It should be used if you want to import data from a variety of sources into HDFS. It does not support timestamp bucketed JSON data out of the box. You’ll have to provide your own implementation to do this.

Kafka Connect is a generic Kafka import and export tool, and has an HDFS Connector that helps get data into HDFS. It has limited JSON support, and requires that your JSON data conform to a Kafka Connect specific envelope. If you don't want to reformat your JSON data to fit this envelope, you'll have difficulty using Kafka Connect.

That leaves us with Camus. For years, Wikimedia has successfully been using Camus to import JSON data from Kafka into HDFS. Unlike the newer solutions, Camus does not do streaming imports, so it must be scheduled in batches. We’d like to catch up with more current solutions and use something like Kafka Connect, but until JSON is better supported we will continue to use Camus.

So, how is it done? This question appears often enough on Kafka-related mailing lists that we decided to write this blog post.

Camus with JSON

Camus needs to be told how to read messages from Kafka, and in what format they should be written to HDFS. JSON should be serialized and produced to Kafka as UTF-8 byte strings, one JSON object per Kafka message. We want this data to be written as-is, with no transformation, directly to HDFS. We'd also like to compress this data in HDFS and still have it be usable by MapReduce. Hadoop's SequenceFile format will do nicely. (If we didn't care about compression, we could use the StringRecordWriterProvider to write the JSON records \n delimited directly to HDFS text files.)
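For example, the producer side might look something like this sketch. We're assuming the kafka-python library here (any Kafka client that lets you control serialization will do), and the broker list, topic, and fields are placeholders:

import json
from kafka import KafkaProducer  # pip install kafka-python

# Serialize each record as a UTF-8 JSON string, one JSON object per Kafka message.
producer = KafkaProducer(
    bootstrap_servers=['kafka1001:9092', 'kafka1002:9092'],
    value_serializer=lambda record: json.dumps(record).encode('utf-8'),
)

producer.send('topicA', {
    'dt': '2017-01-01T15:40:17',  # the event timestamp Camus will bucket on
    'page': 'Special Olympics',
    'user': 'TheBestEditor',
})
producer.flush()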

We’ll now create a camus.properties file that does what we need.

First, we need to tell Camus where to write our data, and where to keep execution metadata about this Camus job. Camus stores its Kafka topic-partition offsets in HDFS so that each run knows where to start consuming:

# Final top-level HDFS data output directory. A sub-directory
# will be dynamically created for each consumed topic.
etl.destination.path=hdfs:///path/to/output/directory
# HDFS location where you want to keep execution files,
# i.e. offsets, error logs, and count files.
etl.execution.base.path=hdfs:///path/to/camus/metadata
# Where completed Camus job output directories are kept,
# usually a sub-dir in the etl.execution.base.path
etl.execution.history.path=hdfs:///path/to/camus/metadata/history

Next, we'll specify how Camus should read messages from Kafka, and how it should look for event timestamps in each message. We'll use the JsonStringMessageDecoder, which expects each message to be a UTF-8 encoded JSON string. It will deserialize each message using the Gson JSON parser and look for a configured timestamp field.

# Use the JsonStringMessageDecoder to deserialize JSON messages from Kafka.
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder

camus.message.timestamp.field specifies which field in the JSON object should be used as the event timestamp, and camus.message.timestamp.format specifies the timestamp format of that field. Timestamp parsing is handled by Java's SimpleDateFormat, so you should set camus.message.timestamp.format to something that SimpleDateFormat understands, unless your timestamp is already an integer UNIX epoch timestamp. If it is, use unix_seconds or unix_milliseconds, depending on the granularity of your UNIX epoch timestamp.

Wikimedia maintains a slight fork of JsonStringMessageDecoder that makes camus.message.timestamp.field more flexible. In our fork, you can specify sub-objects using dotted notation, e.g. camus.message.timestamp.field=sub.object.timestamp. If you don't need this feature, then don't bother with our fork.

Here are a couple of examples:

Timestamp field is ‘dt’, format is an ISO-8601 string:

# Specify which field in the JSON object will contain our event timestamp.
camus.message.timestamp.field=dt
# Timestamp values look like 2017-01-01T15:40:17
camus.message.timestamp.format=yyyy-MM-dd'T'HH:mm:ss

Timestamp field is ‘meta.sub.object.ts’, format is a UNIX epoch timestamp integer in milliseconds:

# Specify which field in the JSON object will contain our event timestamp.
# E.g. { "meta": { "sub": { "object": { "ts": 1482871710123 } } } }
# Note that this will only work with Wikimedia’s fork of Camus.
camus.message.timestamp.field=meta.sub.object.ts
# Timestamp values are in milliseconds since UNIX epoch.
camus.message.timestamp.format=unix_milliseconds

If the timestamp cannot be read out of the JSON object, JsonStringMessageDecoder will log a warning and fall back to using System.currentTimeMillis().
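Putting those pieces together, the decoder's timestamp handling amounts to roughly the following. This is a Python illustration of the logic only (the real implementation is Java, and the dotted-path lookup is specific to Wikimedia's fork); time zone handling is omitted for brevity.

import json
import time
from datetime import datetime

def extract_timestamp_millis(message_bytes, ts_field='dt', ts_format='iso8601'):
    """Return the event timestamp in milliseconds since the UNIX epoch."""
    event = json.loads(message_bytes.decode('utf-8'))

    # Dotted notation (Wikimedia fork): walk down into sub-objects.
    value = event
    for key in ts_field.split('.'):
        value = value.get(key) if isinstance(value, dict) else None

    if value is None:
        # Same fallback as JsonStringMessageDecoder: warn and use "now".
        print('WARNING: no timestamp found in message, using current time')
        return int(time.time() * 1000)

    if ts_format == 'unix_milliseconds':
        return int(value)
    if ts_format == 'unix_seconds':
        return int(value) * 1000

    # Otherwise the format is a SimpleDateFormat pattern; this sketch only
    # handles the ISO-8601 example used above, e.g. 2017-01-01T15:40:17.
    return int(datetime.strptime(value, '%Y-%m-%dT%H:%M:%S').timestamp() * 1000)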

Now that we’ve told Camus how to read from Kafka, we need to tell it how to write to HDFS. etl.output.file.time.partition.mins is important. It tells Camus the time bucketing granularity to use. Setting this to 60 minutes will cause Camus to write files into hourly bucket directories, e.g. 2017/01/01/15. Setting it to 1440 minutes will write daily buckets, etc.

# Store output into hourly buckets.
etl.output.file.time.partition.mins=60
# Use UTC as the default timezone.
etl.default.timezone=UTC
# Delimit records by newline.  This is important for MapReduce to be able to split JSON records.
etl.output.record.delimiter=\n
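For intuition, here's a rough Python sketch (not Camus code) of how an event timestamp maps to an hourly bucket directory when etl.output.file.time.partition.mins=60:

from datetime import datetime, timezone

def hourly_bucket_path(topic, timestamp_millis):
    """Build an hourly bucket directory like topicA/2017/01/01/15."""
    dt = datetime.fromtimestamp(timestamp_millis / 1000.0, tz=timezone.utc)
    return '%s/%04d/%02d/%02d/%02d' % (topic, dt.year, dt.month, dt.day, dt.hour)

# 2017-01-01T15:40:17 UTC lands in the 2017/01/01/15 bucket.
print(hourly_bucket_path('topicA', 1483285217000))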

Use SequenceFileRecordWriterProvider if you want to compress data. To do so, set mapreduce.output.fileoutputformat.compress.codec=SnappyCodec (or another splittable compression codec) either in your mapred-site.xml, or in this camus.properties file.

# SequenceFileRecordWriterProvider writes the records as Hadoop Sequence files
# so that they can be split even if they are compressed.
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.SequenceFileRecordWriterProvider
# Use Snappy to compress output records.
mapreduce.output.fileoutputformat.compress.codec=SnappyCodec

Finally, some basic Camus configs are needed:

# Replace this with your list of Kafka brokers from which to bootstrap.
kafka.brokers=kafka1001:9092,kafka1002:9092,kafka1003:9092
# These are the kafka topics camus brings to HDFS.
# Replace this with the topics you want to pull,
# or alternatively use kafka.blacklist.topics.
kafka.whitelist.topics=topicA,topicB,topicC
# If whitelist has values, only whitelisted topics are pulled.
kafka.blacklist.topics=

There are various other camus properties you can tweak as well. You can see some of the ones Wikimedia uses here.

Once this camus.properties file is configured, we can launch a Camus Hadoop job to import from Kafka.

hadoop jar camus-etl-kafka-X.jar com.linkedin.camus.etl.kafka.CamusJob -P /path/to/camus.properties -Dcamus.job.name="my-camus-job"

Note: replace X in the above command with your Camus version number, e.g. camus-etl-kafka-3.1.1.jar

The first time this job runs, it will import as much data from Kafka as it can, and write its finishing topic-partition offsets to HDFS. The next time you launch a Camus job with the same camus.properties file, it will read offsets from the configured etl.execution.base.path HDFS directory and start consuming from Kafka at those offsets. Wikimedia schedules regular Camus jobs using boring ol' cron, but you could use whatever newfangled job scheduler you like.

After several Camus runs, you should see time bucketed directories containing Snappy compressed SequenceFiles of JSON data in HDFS stored in etl.destination.path, e.g. hdfs:///path/to/output/directory/topicA/2017/01/01/15/. You could access this data with custom MapReduce or Spark jobs, or use Hive’s org.apache.hive.hcatalog.data.JsonSerDe and Hadoop’s org.apache.hadoop.mapred.SequenceFileInputFormat. Wikimedia creates an external Hive table doing just that, and then batch processes this data into a more refined and useful schema stored as Parquet for faster querying.
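For example, a rough PySpark sketch for reading an hour of imported data back might look like this. The path and field names are illustrative, and we're assuming PySpark can convert the SequenceFile's Writable key and value types, with the value holding the JSON string:

import json
from pyspark import SparkContext

sc = SparkContext(appName='read-camus-json')

# Each SequenceFile record is a (key, value) pair; the value is the JSON string.
records = sc.sequenceFile('hdfs:///path/to/output/directory/topicA/2017/01/01/15')
events = records.map(lambda kv: json.loads(kv[1]))

# E.g. count the events per user for that hour.
counts = events.map(lambda e: (e.get('user'), 1)).reduceByKey(lambda a, b: a + b)
print(counts.take(10))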

Here’s the camus.properties file in full:

#
# Camus properties file for consuming Kafka topics into HDFS.
#
# Final top-level HDFS data output directory. A sub-directory
# will be dynamically created for each consumed topic.
etl.destination.path=hdfs:///path/to/output/directory
# HDFS location where you want to keep execution files,
# i.e. offsets, error logs, and count files.
etl.execution.base.path=hdfs:///path/to/camus/metadata
# Where completed Camus job output directories are kept,
# usually a sub-dir in the etl.execution.base.path
etl.execution.history.path=hdfs:///path/to/camus/metadata/history
# Use the JsonStringMessageDecoder to deserialize JSON messages from Kafka.
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder
# Specify which field in the JSON object will contain our event timestamp.
camus.message.timestamp.field=dt
# Timestamp values look like 2017-01-01T15:40:17
camus.message.timestamp.format=yyyy-MM-dd'T'HH:mm:ss
# Store output into hourly buckets.
etl.output.file.time.partition.mins=60
# Use UTC as the default timezone.
etl.default.timezone=UTC
# Delimit records by newline.  This is important for MapReduce to be able to split JSON records.
etl.output.record.delimiter=\n
# SequenceFileRecordWriterProvider writes the records as Hadoop Sequence files
# so that they can be split even if they are compressed.
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.SequenceFileRecordWriterProvider
# Use Snappy to compress output records.
mapreduce.output.fileoutputformat.compress.codec=SnappyCodec
# Max hadoop tasks to use, each task can pull multiple topic partitions.
mapred.map.tasks=24
# Connection parameters.
# Replace this with your list of Kafka brokers from which to bootstrap.
kafka.brokers=kafka1001:9092,kafka1002:9092,kafka1003:9092
# These are the kafka topics camus brings to HDFS.
# Replace this with the topics you want to pull,
# or alternatively use kafka.blacklist.topics.
kafka.whitelist.topics=topicA,topicB,topicC
# If whitelist has values, only whitelisted topics are pulled.
kafka.blacklist.topics=
# max historical time that will be pulled from each partition based on event timestamp
#  Note:  max.pull.hrs doesn't quite seem to be respected here.
#  This will take some more sleuthing to figure out why, but in our case
#  here it’s ok, as we hope to never be this far behind in Kafka messages to
#  consume.
kafka.max.pull.hrs=168
# events with a timestamp older than this will be discarded.
kafka.max.historical.days=7
# Max minutes for each mapper to pull messages (-1 means no limit)
# Let each mapper run for no more than 9 minutes.
# Camus creates hourly directories, and we don't want a single
# long-running mapper to keep other Camus jobs from being launched.
# We run Camus every 10 minutes, so limiting it to 9 should keep
# runs fresh.
kafka.max.pull.minutes.per.task=9
# Name of the client as seen by kafka
kafka.client.name=camus-00
# Fetch Request Parameters
#kafka.fetch.buffer.size=
#kafka.fetch.request.correlationid=
#kafka.fetch.request.max.wait=
#kafka.fetch.request.min.bytes=
kafka.client.buffer.size=20971520
kafka.client.so.timeout=60000
# Controls the submitting of counts to Kafka
# Default value set to true
post.tracking.counts.to.kafka=false
# Stops the mapper from getting inundated with Decoder exceptions for the same topic
# Default value is set to 10
max.decoder.exceptions.to.print=5
log4j.configuration=false
##########################
# Everything below this point can be ignored for the time being,
# will provide more documentation down the road. (LinkedIn/Camus never did! :/ )
##########################
etl.run.tracking.post=false
#kafka.monitor.tier=
kafka.monitor.time.granularity=10
etl.hourly=hourly
etl.daily=daily
etl.ignore.schema.errors=false
etl.keep.count.files=false
#etl.counts.path=
etl.execution.history.max.of.quota=.8