Using Avro in MapReduce Jobs With Hadoop, Pig, Hive

Apache Avro is a very popular data serialization format in the Hadoop technology stack. In this article I show code examples of MapReduce jobs in Java, Hadoop Streaming, Pig and Hive that read and/or write data in Avro format. We will use a small, Twitter-like data set as input for our example MapReduce jobs.

The latest version of this article and the corresponding code examples are available at avro-hadoop-starter on GitHub.

Requirements

The examples require the following software versions:

  • Gradle 1.3+ (only for the Java examples)
  • Java JDK 7 (only for the Java examples)
    • It is easy to switch to JDK 6: in most cases you only need to change the sourceCompatibility and targetCompatibility parameters in build.gradle from 1.7 to 1.6 (see the sketch after this list). But since there are a couple of JDK 7 related gotchas (e.g. problems with its new bytecode verifier) that the Java example code already works around, I decided to stick with JDK 7 as the default.
  • Hadoop 2.x with MRv1 (not MRv2/YARN)
    • Tested with Cloudera CDH 4.3
  • Pig 0.11
    • Tested with Pig 0.11.0-cdh4.3.0
  • Hive 0.10
    • Tested with Hive 0.10.0-cdh4.3.0
  • Avro 1.7.4
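
For example, switching to JDK 6 would roughly amount to the following change in build.gradle (a sketch; where exactly these properties are set in the actual build file may differ):

    // build.gradle (excerpt): compile the Java examples for JDK 6 instead of JDK 7
    sourceCompatibility = 1.6   // project default: 1.7
    targetCompatibility = 1.6   // project default: 1.7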

Prerequisites

First you must clone my avro-hadoop-starter repository on GitHub.
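
For example (the exact clone URL is an assumption here; use the one shown on the repository's GitHub page):

    $ git clone https://github.com/miguno/avro-hadoop-starter.git
    $ cd avro-hadoop-starter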

Example data
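
The examples work on a small, Twitter-like data set: each record contains a username, the text of the message and a Unix timestamp. A minimal Avro schema for such records could look like the sketch below; the field names match the Hive table described later in this article, while the record name and namespace are assumptions (check twitter.avsc in the repository for the real schema).

    {
      "type": "record",
      "name": "Tweet",
      "namespace": "com.miguno.avro",
      "fields": [
        { "name": "username",  "type": "string", "doc": "Name of the user account on Twitter.com" },
        { "name": "tweet",     "type": "string", "doc": "The content of the user's Twitter message" },
        { "name": "timestamp", "type": "long",   "doc": "Unix epoch time in seconds" }
      ],
      "doc": "A basic schema for storing Twitter messages"
    }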

Examples

TweetCount

TweetCount implements a MapReduce job that counts the number of tweets created by Twitter users.
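
The heart of such a job, written against Avro's MRv1 API (org.apache.avro.mapred), could look roughly like this. The field name username comes from the example schema above; the class names are made up for this sketch, and the actual TweetCount code in the repository may differ in detail:

    import java.io.IOException;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.mapred.AvroCollector;
    import org.apache.avro.mapred.AvroMapper;
    import org.apache.avro.mapred.AvroReducer;
    import org.apache.avro.mapred.Pair;
    import org.apache.avro.util.Utf8;
    import org.apache.hadoop.mapred.Reporter;

    // Counts how many tweets each user created.
    public class TweetCountSketch {

      // Map: emit (username, 1) for every tweet record.
      public static class CountMapper extends AvroMapper<GenericRecord, Pair<Utf8, Long>> {
        @Override
        public void map(GenericRecord tweet, AvroCollector<Pair<Utf8, Long>> collector,
                        Reporter reporter) throws IOException {
          collector.collect(
              new Pair<Utf8, Long>(new Utf8(tweet.get("username").toString()), 1L));
        }
      }

      // Reduce: sum the per-user counts.
      public static class CountReducer extends AvroReducer<Utf8, Long, Pair<Utf8, Long>> {
        @Override
        public void reduce(Utf8 username, Iterable<Long> counts,
                           AvroCollector<Pair<Utf8, Long>> collector,
                           Reporter reporter) throws IOException {
          long total = 0L;
          for (long count : counts) {
            total += count;
          }
          collector.collect(new Pair<Utf8, Long>(username, total));
        }
      }
    }

A driver then wires the two classes together with AvroJob (setInputSchema, setMapperClass, setReducerClass, setOutputSchema) against a JobConf, as described in the org.apache.avro.mapred package documentation.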

TweetCountTest

TweetCountTest is very similar to TweetCount. It uses twitter.avro as its input and runs a unit test with the same MapReduce job as TweetCount. The unit test includes comparing the actual MapReduce output (in Snappy-compressed Avro format) with the expected output. TweetCountTest extends ClusterMapReduceTestCase (MRv1), which means that the corresponding MapReduce job is launched in-memory via MiniMRCluster.

MiniMRCluster and Hadoop MRv2

The MiniMRCluster that is used by ClusterMapReduceTestCase in MRv1 is deprecated in Hadoop MRv2. When using MRv2 you should switch to MiniMRClientClusterFactory, which provides a wrapper interface called MiniMRClientCluster around MiniMRYarnCluster (MRv2):

MiniMRClientClusterFactory: A MiniMRCluster factory. In MR2, it provides a wrapper MiniMRClientCluster interface around the MiniMRYarnCluster. While in MR1, it provides such wrapper around MiniMRCluster. This factory should be used in tests to provide an easy migration of tests across MR1 and MR2.
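
A test could obtain such an in-process cluster roughly as sketched below; a single node is assumed and the actual job submission is elided:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.MiniMRClientCluster;
    import org.apache.hadoop.mapred.MiniMRClientClusterFactory;

    public class MiniClusterSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Start a single-node, in-process MapReduce cluster; the factory picks the
        // MR1 or MR2 implementation depending on what is on the classpath.
        MiniMRClientCluster cluster =
            MiniMRClientClusterFactory.create(MiniClusterSketch.class, 1, conf);
        try {
          Configuration jobConf = new Configuration(cluster.getConfig());
          // ... configure and submit the MapReduce job under test with jobConf ...
        } finally {
          cluster.stop();
        }
      }
    }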

Further readings on Java

  • Package Documentation for org.apache.avro.mapred – Run Hadoop MapReduce jobs over Avro data, with map and reduce functions written in Java. This document provides detailed information on how you should use the Avro Java API to implement MapReduce jobs that read and/or write data in Avro format.
  • Java MapReduce and Avro – Cloudera CDH4 documentation

Hadoop Streaming

Preliminaries

Important: The examples below assume you have access to a running Hadoop cluster.

How Streaming sees data when reading via AvroAsTextInputFormat

When using AvroAsTextInputFormat as the input format, your streaming code will receive the data in JSON format, one record (“datum” in Avro parlance) per line. Note that Avro will also add a trailing TAB (\t) at the end of each line.

Here is the basic data flow from your input data in binary Avro format to our streaming mapper:
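
    [binary Avro file in HDFS]
              |
              v
    AvroAsTextInputFormat          (deserializes each datum)
              |
              v
    [streaming mapper via stdin]   (one JSON-formatted datum per line, plus a trailing TAB)

For example, a single tweet record (with made-up field values that follow the example schema) would arrive on the mapper's standard input as a line similar to:

    {"username": "alice", "tweet": "Trying out Avro with Hadoop Streaming", "timestamp": 1365427200}<TAB>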

Examples

Prerequisites

The example commands below use the Hadoop Streaming jar for MRv1 shipped with Cloudera CDH4; the path and version suffix shown are typical for CDH 4.3, so adjust them to your release:
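
    /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.3.0.jar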

If you are not using Cloudera CDH4 or are using a newer version of CDH4, just replace the jar file with the one included in your Hadoop installation.

The Avro jar files are straight from the Avro project:
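
    avro-1.7.4.jar
    avro-mapred-1.7.4-hadoop1.jar    # MRv1-compatible build; the exact file name/classifier may differ for your download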

Reading Avro, writing plain-text

The following command reads Avro data from the relative HDFS directory examples/input/ (which normally resolves to /user/<your-unix-username>/examples/input/). It writes the deserialized version of each data record (see section How Streaming sees data when reading via AvroAsTextInputFormat above) as is to the output HDFS directory streaming/output/. For this simple demonstration we use the IdentityMapper as a naive map step implementation – it outputs its input data unmodified (equivalently we could use the Unix tool cat here). We do not need a reduce phase here, which is why we disable the reduce step via the option -D mapred.reduce.tasks=0 (see Specifying Map-Only Jobs in the Hadoop Streaming documentation).
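
A command along the following lines should do it; the jar names and paths follow the CDH 4.3 setup above, so adjust them to your installation:

    hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.3.0.jar \
        -D mapred.job.name="avro-streaming" \
        -D mapred.reduce.tasks=0 \
        -files avro-1.7.4.jar,avro-mapred-1.7.4-hadoop1.jar \
        -libjars avro-1.7.4.jar,avro-mapred-1.7.4-hadoop1.jar \
        -input  examples/input/ \
        -output streaming/output/ \
        -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
        -inputformat org.apache.avro.mapred.AvroAsTextInputFormat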

Custom Avro output schema

This does not appear to be supported by stock Avro at the moment. A related JIRA ticket, AVRO-1067, created in April 2012, is still unresolved as of July 2013.

For a workaround take a look at the section Avro output for Hadoop Streaming at avro-utils, a third-party library for Avro.

Enabling compression of Avro output data (Snappy or Deflate)

If you want to enable compression for the Avro output data, you must add the following parameters to the streaming job:
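
    # For Snappy-compressed Avro output data
    -D mapred.output.compress=true -D avro.output.codec=snappy

    # For Deflate-compressed Avro output data
    -D mapred.output.compress=true -D avro.output.codec=deflate

The avro.output.codec property is read by Avro's output formats (e.g. AvroTextOutputFormat), which write the compressed data inside standard Avro container files.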

Be aware that if you enable compression with mapred.output.compress but are NOT specifying an Avro output format (such as AvroTextOutputFormat) your cluster’s configured default compression codec will determine the final format of the output data. For instance, if mapred.output.compression.codec is set to com.hadoop.compression.lzo.LzopCodec then the job’s output files would be compressed with LZO (e.g. you would see part-00000.lzo output files instead of uncompressed part-00000 files).

See also Compression and Avro in the CDH4 documentation.

Further readings on Hadoop Streaming

Hive

Preliminaries

Important: The examples below assume you have access to a running Hadoop cluster.

Examples

In this section we demonstrate how to create a Hive table backed by Avro data, followed by running a few simple Hive queries against that data.

Defining a Hive table backed by Avro data

Using avro.schema.url to point to a remote Avro schema file

The following CREATE TABLE statement creates an external Hive table named tweets for storing Twitter messages in a very basic data structure that consists of username, content of the message and a timestamp.
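
A statement along these lines should work; the HDFS locations are placeholders to adapt, while the SerDe and container input/output format classes are the ones that ship with Hive's Avro support:

    CREATE EXTERNAL TABLE tweets
        COMMENT 'A table backed by Avro data with the Avro schema stored in HDFS'
        ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
        STORED AS
            INPUTFORMAT  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
            OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
        LOCATION '/user/YOURUSER/examples/input/'
        TBLPROPERTIES (
            'avro.schema.url' = 'hdfs:///user/YOURUSER/examples/schema/twitter.avsc'
        );

Once the table is defined, ordinary Hive queries work against the Avro data, for example:

    SELECT username, COUNT(*) AS tweet_count
    FROM tweets
    GROUP BY username;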

Where to go from here
