Deploying and managing scalable web services with Flume

Machine-generated log data is valuable for locating the causes of various hardware and software failures. The information derived from it can provide feedback for improving system architecture, reducing system degradation, and improving uptime. More recently, businesses have started using this log data to derive business insight. Flume is a distributed service with a fault-tolerant architecture for efficiently collecting, aggregating, and moving large amounts of log data. In this article, we will learn how to deploy and use Flume with a Hadoop cluster and a simple distributed web service.

Zafar Gilani (z.gilani@tunacode.com), Research Intern, Telefonica Research

Zafar Gilani is a research intern at Telefonica Research with a background in distributed computing and big data processing and analytics. His previous experience includes working on InfiniBand support for MPJ Express, and he was a visiting scientist at the SLAC National Accelerator Laboratory.



13 August 2013


Flume architecture

Flume is a distributed, reliable, and available service used to collect, aggregate, and move large amounts of streaming event data from many sources to a centralized data store.

Figure 1. Flume architecture

A Flume event is a unit of data flow consisting of a byte payload and an optional set of string attributes (headers). A Flume agent is a JVM process that hosts the components through which events flow from an external source to the next destination (hop).
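For concreteness, here is a minimal sketch of what an event looks like programmatically, using the EventBuilder helper from the Flume NG SDK. The Flume libraries are assumed to be on the classpath, and the feed header is a hypothetical attribute added only for illustration.

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class EventSketch {
  public static void main(String[] args) {
    // Optional string attributes travel with the event as headers.
    Map<String, String> headers = new HashMap<String, String>();
    headers.put("feed", "bbc-world");

    // The payload is simply bytes; here it is a single RSS headline.
    Event event = EventBuilder.withBody(
        "Example headline".getBytes(Charset.forName("UTF-8")), headers);

    System.out.println("Body bytes: " + event.getBody().length);
    System.out.println("Headers: " + event.getHeaders());
  }
}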

InfoSphere® BigInsights™ enables the continuous analysis and storage of streaming data with low latency. InfoSphere Streams can be used to configure the agent and collector processes described above (see Resources). Alternatively, Flume can be used to collect data at a remote location, with a collector configured on an InfoSphere BigInsights server to store the data on the distributed file system (DFS). In this article, however, we use Flume for both the agent and collector processes, together with a Hadoop Distributed File System (HDFS) cluster as storage.


Data flow model

A Flume agent has three main components: source, channel, and sink. The source consumes events delivered to it by an external source, such as a web service, which sends events to Flume in a format the source recognizes. When a Flume source receives events, it stores them in one or more channels. A channel is a passive store that keeps an event until it is consumed by a Flume sink; a file channel, for example, is backed by the local file system. The sink removes the event from the channel and either puts it into an external repository, such as HDFS, or forwards it to the Flume source of the next Flume agent (the next hop) in the flow. The source and sink within a given agent run asynchronously with the events staged in the channel.

Different source types accept events in different formats, depending on the purpose. For example, an Avro Flume source can be used to receive Avro events from Avro clients. The Avro source forms half of Flume's tiered collection support: internally, it uses Avro's NettyTransceiver to listen for and handle events, and it can be paired with the built-in AvroSink to create tiered collection topologies. Other network sources that Flume supports include Thrift, syslog, and netcat.
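As an illustration of the client side of tiered collection, the following sketch uses the RpcClient from the Flume NG SDK to send a single event to an Avro source. The host and port below match the collector used later in this article, but they are placeholders, and the message body is invented for the example.

import java.nio.charset.Charset;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class AvroClientSketch {
  public static void main(String[] args) throws EventDeliveryException {
    // Connect to an Avro source listening on this host and port.
    RpcClient client = RpcClientFactory.getDefaultInstance("10.0.0.3", 60000);
    try {
      Event event = EventBuilder.withBody("hello from an Avro client",
          Charset.forName("UTF-8"));
      client.append(event);  // the Avro source stores the event in its channel(s)
    } finally {
      client.close();
    }
  }
}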


Avro

Apache Avro is a data serialization framework with remote procedure call (RPC) support, used widely by Apache projects such as Flume and Hadoop for data storage and communication (see Resources). Avro aims to provide rich data structures, a compact and fast binary data format, and simple integration with many languages, including C++, Java™, Perl, and Python. Avro uses JSON to specify data types and protocols.

Avro relies on schemas that are stored with the data. Because the schema is available whenever the data is read, values carry no per-value overhead, which makes serialization fast and compact. During a remote procedure call (RPC), the client and server exchange schemas in the connection handshake, and because both schemas are fully specified, the correspondence between fields can be resolved easily.
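To make this concrete, here is a small sketch of Avro serialization using the Avro Java library (assumed to be on the classpath). The NewsItem record and its fields are invented for this example, and its schema is declared in JSON.

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class AvroSerializationSketch {
  public static void main(String[] args) throws Exception {
    // The schema is declared in JSON; here it describes a single news item.
    String schemaJson = "{\"type\":\"record\",\"name\":\"NewsItem\","
        + "\"fields\":[{\"name\":\"title\",\"type\":\"string\"},"
        + "{\"name\":\"link\",\"type\":\"string\"}]}";
    Schema schema = new Schema.Parser().parse(schemaJson);

    GenericRecord item = new GenericData.Record(schema);
    item.put("title", "Example headline");
    item.put("link", "http://example.com/story");

    // Serialize the record to Avro's compact binary format.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema).write(item, encoder);
    encoder.flush();
    System.out.println("Serialized " + out.size() + " bytes");
  }
}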


Reliability, recoverability, and multi-hop flows

Flume uses a transactional design to guarantee reliable event delivery. Events are staged in a channel on each agent and delivered from there to the next agent in the flow or to a terminal repository such as HDFS. An event is removed from a channel only after it has been stored in the channel of the next agent or in the terminal repository, so each channel effectively holds a queue of in-flight events until storage is confirmed. The source and the sink encapsulate the storage and retrieval of events in transactions provided by the channel, which gives the flow end-to-end reliability with single-hop message delivery semantics.
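The channel's transaction interface is what makes this possible. The following sketch is illustrative only (it is not part of this article's example) and shows how a component might stage an event in a channel inside a transaction, using the Channel and Transaction interfaces from the Flume NG core API.

import java.nio.charset.Charset;
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.event.EventBuilder;

public class PutSideSketch {
  // Stage one event in the channel; the event becomes visible to a sink
  // only after the transaction commits.
  public static void stage(Channel channel) {
    Transaction tx = channel.getTransaction();
    tx.begin();
    try {
      Event event = EventBuilder.withBody("one news item", Charset.forName("UTF-8"));
      channel.put(event);   // staged but not yet committed
      tx.commit();          // now the event is safely in the channel
    } catch (Exception e) {
      tx.rollback();        // on failure, nothing is left half-written in the channel
      throw new RuntimeException(e);
    } finally {
      tx.close();
    }
  }
}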

Recoverability comes from staging events in the channel. Flume provides a durable file channel that is backed by the local file system and therefore keeps its state on permanent storage; if an agent crashes or the system fails, events still sitting in a file channel can be recovered when the agent restarts. There is also a memory channel, which stores events in an in-memory queue and is faster, but any events still in the memory channel when an agent process dies cannot be recovered.

Flume also allows a user to build multi-hop flows in which events travel through multiple agents before reaching their final destination. In a multi-hop flow, the sink of the previous hop and the source of the next hop each run their own transactions to ensure that the data is safely stored in the channel of the next hop.
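To make the handoff concrete, the following sketch (again illustrative, not part of the article's example) shows the sink side of that transaction: the sink takes an event from its channel, attempts delivery to the next hop, and commits only on success, so a failed delivery leaves the event in the channel. The RpcClient here stands in for the connection to the next hop's Avro source.

import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.api.RpcClient;

public class TakeSideSketch {
  // Drain one event from the channel and forward it to the next hop.
  public static void forwardOne(Channel channel, RpcClient nextHop) {
    Transaction tx = channel.getTransaction();
    tx.begin();
    try {
      Event event = channel.take();  // may be null if the channel is empty
      if (event != null) {
        nextHop.append(event);       // returns only after the next hop has the event
      }
      tx.commit();                   // only now is the event removed from this channel
    } catch (Exception e) {
      tx.rollback();                 // delivery failed: the event stays in this channel
      throw new RuntimeException(e);
    } finally {
      tx.close();
    }
  }
}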

Figure 2. Multi-hop flows

System architecture

In this section, we will discuss how to set up a scalable web service using Flume. For this purpose, we will need code to read RSS feeds. We also need to configure Flume agents and collectors to receive RSS data and store it in the HDFS.


Flume agent configuration is stored in a local configuration file, a text file that follows the Java properties file format. Configurations for one or more agents can be specified in the same configuration file. The configuration file describes each source, sink, and channel in an agent and how they are wired together to form data flows.

An Avro source needs a hostname (or IP address) and a port number on which to receive data. A memory channel can have a maximum queue size (its capacity), and an HDFS sink needs to know the file system URI and the path at which to create files. An Avro sink (named avro-forward-sink in the configurations below) forwards events to the Avro source of the next Flume agent.

The idea is to create a miniature Flume distributed feed (log events) collection system. We will use agents as nodes, which get data (RSS feeds in this case) from an RSS feed reader. These agents will pass on these feeds to a collector node that will be responsible for storing these feeds into an HDFS cluster. In this example, we will use two Flume agent nodes, one Flume collector node, and a three-node HDFS cluster. Table 1 describes sources and sinks for the agent and collector nodes.

Table 1. Sources and sinks for agent and collector nodes
Node             Source      Sink
Agent node       RSS feed    Collector
Collector node   Agents      HDFS

Figure 3 shows the architectural overview of our multi-hop system with two agent nodes, one collector node, and an HDFS cluster. The RSS feed reader (see the code below) writes feed entries to a local file on each agent node, where the agent's source picks them up and stores them in a memory channel. As the feeds pile up in the memory channels of the two agents, the Avro sinks send these events to the collector node's Avro source. The collector also uses a memory channel, and its HDFS sink dumps the feeds into the HDFS cluster. See below for the agent and collector configurations.

Figure 3. Architectural overview of multi-hop system

Let's look at how we can spin up a simple news reader service using Flume. The following Java code describes an RSS reader that reads RSS web sources from the BBC. As you may already know, RSS is a family of web feed formats used to publish frequently updated works, such as blog entries, news headlines, audio, and video, in a standardized format. RSS uses a publish-subscribe model to check the subscribed feeds regularly for updates.

The Java code uses Java's networking and XML APIs to read the contents of a URL source into a W3C Document, extracts the items of interest, and writes them to standard output. When the program's output is redirected to a file (as shown in Listing 5), the Flume exec source on each agent picks the entries up from there.

Listing 1. Java code (RSSReader.java)
import java.net.URL;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.CharacterData;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

public class RSSReader {
  private static RSSReader instance = null;
  private RSSReader() {
  }
  public static RSSReader getInstance() {
    if(instance == null) {
      instance = new RSSReader();
    }
    return instance;
  }
  public void writeNews() {
    try {
      DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
      URL u = new URL("http://feeds.bbci.co.uk/news/world/rss.xml?edition=uk#");
      Document doc = builder.parse(u.openStream());
      NodeList nodes = doc.getElementsByTagName("item");
      for(int i=0;i<nodes.getLength();i++) {
        Element element = (Element)nodes.item(i);
        System.out.println("Title: " + getElementValue(element,"title"));
        System.out.println("Link: " + getElementValue(element,"link"));
        System.out.println("Publish Date: " + getElementValue(element,"pubDate"));
        System.out.println("author: " + getElementValue(element,"dc:creator"));
        System.out.println("comments: " + getElementValue(element,"wfw:comment"));
        System.out.println("description: " + getElementValue(element,"description"));
        System.out.println();
      }
    } catch(Exception ex) {
      ex.printStackTrace();
    }
  }
  private String getCharacterDataFromElement(Element e) {
    try {
      Node child = e.getFirstChild();
      if(child instanceof CharacterData) {
        CharacterData cd = (CharacterData) child;
        return cd.getData();
      }
    } catch(Exception ex) {
    }
    return "";
  }
  protected float getFloat(String value) {
    if(value != null && !value.equals("")) {
      return Float.parseFloat(value);
    }
    return 0;
  }
  protected String getElementValue(Element parent,String label) {
    return getCharacterDataFromElement((Element)parent.getElementsByTagName(label).item(0));
  }
  public static void main(String[] args) {
    RSSReader reader = RSSReader.getInstance();
    reader.writeNews();
  }
}

The following code listings show sample configuration files for the agents (10.0.0.1 and 10.0.0.2) and the collector (10.0.0.3). The configuration files define the semantics of each source, channel, and sink. For the exec source, we define its type, the command it runs, how its standard error is handled, and whether it is restarted on failure. For each channel, we define its type, its capacity (the maximum number of events stored in the channel), and its transaction capacity (the maximum number of events the channel will take from a source or give to a sink per transaction). Similarly, for the Avro sink we define its type, hostname (the IP address of the recipient of the events), and port; for the HDFS sink, the URI and directory path on the HDFS name node are provided.

Listing 2 shows the sample configuration file for agent 1 (10.0.0.1).

Listing 2. Agent 1 configuration (flume-conf.properties on 10.0.0.1)
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
agent.sources = reader
agent.channels = memoryChannel
agent.sinks = avro-forward-sink

# For each one of the sources, the type is defined
agent.sources.reader.type = exec
agent.sources.reader.command = tail -f /var/log/flume-ng/source.txt
# stderr is simply discarded, unless logStdErr=true 
# If the process exits for any reason, the source also exits and will produce no 
# further data.
agent.sources.reader.logStdErr = true
agent.sources.reader.restart = true
 
# The channel can be defined as follows.
agent.sources.reader.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.avro-forward-sink.type = avro
agent.sinks.avro-forward-sink.hostname = 10.0.0.3
agent.sinks.avro-forward-sink.port = 60000

#Specify the channel the sink should use
agent.sinks.avro-forward-sink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 100

Listing 3 shows the sample configuration file for agent 2 (10.0.0.2).

Listing 3. Agent 2 configuration (flume-conf.properties on 10.0.0.2)
agent.sources = reader
agent.channels = memoryChannel
agent.sinks = avro-forward-sink

# For each one of the sources, the type is defined
agent.sources.reader.type = exec
agent.sources.reader.command = tail -f /var/log/flume-ng/source.txt
# stderr is simply discarded, unless logStdErr=true 
# If the process exits for any reason, the source also exits and will produce
# no further data.
agent.sources.reader.logStdErr = true
agent.sources.reader.restart = true
 
# The channel can be defined as follows.
agent.sources.reader.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.avro-forward-sink.type = avro
agent.sinks.avro-forward-sink.hostname = 10.0.0.3
agent.sinks.avro-forward-sink.port = 60000

#Specify the channel the sink should use
agent.sinks.avro-forward-sink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 100

Listing 4 shows the collector configuration file for 10.0.0.3.

Listing 4. Collector configuration (flume-conf.properties on 10.0.0.3)
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'

agent.sources = avro-collection-source
agent.channels = memoryChannel
agent.sinks = hdfs-sink

# For each one of the sources, the type is defined
agent.sources.avro-collection-source.type = avro
agent.sources.avro-collection-source.bind = 10.0.0.3
agent.sources.avro-collection-source.port = 60000

# The channel can be defined as follows.
agent.sources.avro-collection-source.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.hdfs-sink.type = hdfs
agent.sinks.hdfs-sink.hdfs.path = hdfs://10.0.10.1:8020/flume

#Specify the channel the sink should use
agent.sinks.hdfs-sink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 10000

Next steps

Now that we have the code to read RSS feeds and we know how to configure Flume agents and a collector, we can set up the whole system in three steps.

Step 1

Compile the Java code and run it as a background process, redirecting its output to the file that the Flume exec source tails. Note that RSSReader fetches the feed once and then exits, so rerun it periodically (from a loop or a cron job, for example) to keep new entries flowing into the file.

Listing 5. Compiling and running the Java code
$ javac RSSReader.java
$ java -cp /root/RSSReader RSSReader > /var/log/flume-ng/source.txt &

Step 2

Before starting the agents, you need to modify the configuration file using the template provided under the $FLUME_HOME/conf/ directory. Once the configuration files are modified, the agents can be started with the following commands. Note that the name passed with the -n option must match the agent name used as the property prefix in the configuration file, which is agent in the listings above.

Listing 6 shows how to start the agent on node 1 (10.0.0.1).

Listing 6. Starting the agent on node 1
$ $FLUME_HOME/bin/flume-ng agent -n agent -c conf -f $FLUME_HOME/conf/flume-conf.properties

Listing 7 shows how to start the agent on node 2 (10.0.0.2).

Listing 7. Starting the agent on node 2
$ $FLUME_HOME/bin/flume-ng agent -n agent -c conf -f $FLUME_HOME/conf/flume-conf.properties

Here, $FLUME_HOME is an environment variable (set in .bashrc, for example) that points to the Flume home directory, such as /home/user/flume-1.4/.

Step 3

Listing 8 starts the collector. It is worth noting that the configuration file determines how a node behaves, that is, whether it acts as an agent or a collector.

Listing 8. Collector node (on 10.0.0.3)
$ $FLUME_HOME/bin/flume-ng agent -n agent -c conf -f $FLUME_HOME/conf/flume-conf.properties

Conclusion

In this article, we introduced Flume, a distributed and reliable service for efficiently collecting large amounts of log data, and we described how it can be used to deploy single-hop and multi-hop flows, depending on need. We also walked through a detailed example in which we deployed a multi-hop news aggregator: Flume agents picked up RSS feed entries and forwarded them over Avro to a collector, which stored the newsfeeds in HDFS. Flume can be used to build scalable distributed systems that collect large streams of data.

