Apache Hadoop is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from a single server to thousands of machines, each offering local computation and storage. Rather than relying on hardware to deliver high availability, it is designed to detect and handle failures at the application layer. It can merge thousands of servers into a Hadoop cluster that behaves like one big virtual server. Hadoop does two core things:
1> Store the files
2> Run applications on top of the files.
Difference between RDBMS and Hadoop
1> Schema-on-Write (RDBMS):
i>Schema must be created before any data can be loaded
ii> An explicit load operation has to take place which transforms the data into the database's internal structure
iii> New columns must be added explicitly before new data for such columns can be loaded into the database
2> Schema-on-Read (Hadoop)
i> Data is simply copied to the file store, no transformation is needed
ii> A SerDe (Serializer/Deserializer) is applied at read time to extract the required columns
iii> New data can start flowing anytime and will appear retroactively once the SerDe is updated to parse it.
Notes: Serialization is used when writing data: the structured data is serialized into a bit/byte stream for storage. On read, the data is deserialized from the bit/byte storage format back into the structure required by the reader.
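As a rough illustration of this idea in Hadoop's own Java API (a Writable record type rather than a Hive SerDe), write serializes the structured fields into a byte stream and readFields rebuilds them on read; the PageView class and its fields are hypothetical:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Hypothetical record type: serialized to a byte stream on write,
// reconstructed from that stream on read.
public class PageView implements Writable {
    private long timestamp;
    private int statusCode;

    @Override
    public void write(DataOutput out) throws IOException {    // serialize
        out.writeLong(timestamp);
        out.writeInt(statusCode);
    }

    @Override
    public void readFields(DataInput in) throws IOException { // deserialize
        timestamp = in.readLong();
        statusCode = in.readInt();
    }
}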
3> Different roles for RDBMS and Hadoop
i> RDBMS is used when:
Interactive OLAP Analytics (< 1sec)
Multistep ACID transactions
100% SQL compliance
ii> Hadoop is used when:
Structured or Not (Flexibility)
Scalability of Compute/Storage
Complex data Processing
Hadoop is composed of many projects that offer various features for processing large data sets. The two core projects are HDFS and MapReduce:
HDFS is a distributed, highly fault-tolerant file system designed to be deployed on low-cost hardware. HDFS provides high-throughput access to application data and is suitable for applications that have large data sets of structured or unstructured data.
HDFS has a master/slave architecture. An HDFS cluster consists of a single NameNode, a master server that manages the file system namespace and regulates access to files by clients. In addition, there are a number of DataNodes. HDFS exposes a file system namespace and allows user data to be stored in files. Internally, a file is split into one or more blocks, and these blocks are stored in a set of DataNodes.
The NameNode executes file system namespace operations like opening, closing, and renaming files and directories. It also determines the mapping of blocks to DataNodes. The DataNodes are responsible for serving read and write requests from the file system's clients. The DataNodes also perform block creation, deletion, and replication upon instruction from the NameNode. The NameNode is the repository for all HDFS metadata.
1> File System Namespace
HDFS supports a traditional hierarchical file organization. A user or application can create directories and store files inside them. The file system namespace hierarchy is similar to that of traditional file systems: you can create and remove files, move a file from one directory to another, or rename a file. HDFS does not support hard links or soft links.
The NameNode maintains the file system namespace. Any change to the file system namespace or its properties is recorded by the NameNode. An application can specify the number of replicas of a file that should be maintained by HDFS. The number of copies of a file is called the replication factor of the file. This information is stored by the NameNode.
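A minimal sketch of such namespace operations through the Java FileSystem client; the paths used here are purely illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class NamespaceOps {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        fs.mkdirs(new Path("/user/demo/raw"));                                  // create a directory
        fs.rename(new Path("/user/demo/raw"), new Path("/user/demo/staged"));   // move/rename
        fs.setReplication(new Path("/user/demo/staged/events.log"), (short) 3); // change a file's replication factor
        fs.delete(new Path("/user/demo/tmp"), true);                            // remove recursively

        fs.close();
    }
}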
2> Data Replication:
HDFS is designed to reliably store very large files (GBs or TBs) across machines in a large cluster. It stores each file as a sequence of blocks. The blocks of a file are replicated across multiple DataNodes for fault tolerance. The block size and replication factor are configurable per file.
All blocks in a file except the last block are the same size. After support for variable-length blocks was added to append and hsync, users can start a new block without filling the last block up to the configured block size.
An application can specify the number of replicas of a file. The replication factor can be specified at file creation time and can be changed later. Files in HDFS are write-once (except for appends and truncates), and only one writer can exist at any time.
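A minimal sketch, assuming illustrative paths and values, of creating a write-once file with an explicit replication factor and block size via the FileSystem API:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateWithReplication {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        // create(path, overwrite, bufferSize, replication, blockSize):
        // the replication factor can be changed later with setReplication;
        // the block size is fixed once the file is created.
        FSDataOutputStream out = fs.create(
                new Path("/user/demo/events.log"),
                true,               // overwrite if it exists
                4096,               // I/O buffer size
                (short) 3,          // replication factor
                128 * 1024 * 1024); // block size: 128 MB
        out.writeBytes("first record\n");
        out.close();                // only one writer may hold the file open at a time

        fs.close();
    }
}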
The NameNode makes all decisions regarding replication of blocks. It periodically receives a Heartbeat and a Blockreport from each of the DataNodes in the cluster. A Heartbeat indicates that the DataNode is functioning properly. A Blockreport contains a list of all blocks on a DataNode.
3> Persistence of File System Metadata
Two files are used to store the metadata of the Hadoop file system on the NameNode. A file named EditLog is a transaction log that persistently records every change to the file system metadata. A file named FsImage stores the entire file system namespace, including the mapping of blocks to files and file system properties.
The NameNode keeps an image of the entire file system namespace and file Blockmap in memory. When the NameNode starts up, or a checkpoint is triggered by a configurable threshold, it reads the files named FSImage and EditLog from disk, applies all the transactions from the EditLog to the in-memory representation of the FSImage, and flushes out this new version into a new FSImage on disk. It can then truncate the old EditLog because its transactions have been applied to the persistent FSImage. This process is called a checkpoint. The purpose of a checkpoint is to make sure that HDFS has a consistent view of the file system metadata by taking a snapshot of the file system metadata and saving it to FSImage.
The DataNode stores HDFS data in files in its local file system. The DataNode has no knowledge of HDFS files: it stores each block of HDFS data in a separate file in its local file system. It does not create all files in the same directory; instead, it uses a heuristic to determine the optimal number of files per directory and creates subdirectories appropriately.
4> Data Organization:
i> Data Blocks:
Hadoop is designed to support very large files. The data is written only once but is read many times. A typical block size is 128 MB. Thus, an HDFS file is chopped up into 128 MB chunks, and in general each chunk will reside on a different DataNode.
ii> Staging:
A client request to create a file does not reach the NameNode immediately. Instead, the data is cached in a local buffer by the HDFS client. When the local buffer accumulates enough data to fill one chunk, the client contacts the NameNode. The NameNode inserts the file name into the file system namespace and allocates a data block for it. The NameNode responds to the client request with the identity of the DataNode and the destination data block. The client then flushes the chunk of data from the local buffer to the specified DataNode. When a file is closed, the remaining un-flushed data in the local buffer is transferred to the DataNode. The client then tells the NameNode that the file is closed. At this point, the NameNode commits the file creation operation into a persistent store. If the NameNode dies before the file is closed, the file is lost.
iii> Replication Pipelining
The data is pipelined among the DataNodes. When the local buffer reaches the chunk size, the client retrieves a list of DataNodes from the NameNode. The list contains the DataNodes that will host a replica of that block. The client then flushes the data chunk to the first DataNode. The first DataNode starts receiving the data in small portions, writes each portion to its local repository, and transfers that portion to the second DataNode in the list. The second DataNode, in turn, starts receiving each portion, writes it to its local repository, and transfers it to the next DataNode in the list. In this way, the data is pipelined from one DataNode to the next until all the DataNodes in the list have written the block to their local repositories.
Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data in parallel on large clusters in a reliable, fault-tolerant manner.
A MapReduce job usually splits the input data set into independent chunks which are processed by the map tasks in parallel. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in HDFS. The framework takes care of scheduling tasks, monitoring them, and re-executing failed tasks.
The MapReduce framework consists of a single master ResourceManager, one worker NodeManager per cluster node, and one MRAppMaster per application.
Minimally, applications define the input/output locations and provide map and reduce functions by implementing the appropriate interfaces and/or abstract classes. These and other job parameters comprise the job configuration.
The Hadoop client then submits the job and configuration to the ResourceManager, which then distributes the software/configuration to the workers, schedules and monitors tasks, and provides status and diagnostic information to the job client.
Inputs and Outputs
The MapReduce framework operates exclusively on <key, value> pairs. The framework views the input to the job as a set of <key, value> pairs and produces a set of <key, value> pairs as the output of the job. For example, the types flow as (input) <k1, v1> -> map -> <k2, v2> -> reduce -> <k3, v3> (output).
MapReduce -- User Interfaces
1> Payload
Applications typically implement the Mapper and Reducer interfaces to provide the map and reduce methods. These form the core of the job.
Mapper maps input key/value pairs to a set of intermediate key/value pairs. Maps are the individual tasks that transform input records into intermediate records. The output pairs do not need to be of the same types as the input pairs. Applications can use the Counter to report their statistics. The Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. The intermediate, sorted outputs are stored in a simple (key-len, key, value-len, value) format. Applications can control if, and how, the intermediate outputs are to be compressed.
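A common illustration of a Mapper is the word-count style tokenizer sketched below; the Stats enum counter is a hypothetical addition showing how a map task can report a statistic:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    enum Stats { RECORDS }                      // hypothetical Counter

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.getCounter(Stats.RECORDS).increment(1);   // report a statistic
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, ONE);                      // emit intermediate <word, 1>
        }
    }
}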
The number of maps is usually determined by the total size of the inputs. In general it is the total number of blocks of the input files.
Reducer reduces a set of intermediate values which share a key to a smaller set of values. The number of reduces for the job can be defined by the application. Reduce has the following three primary phases (a reducer sketch follows the phase descriptions below):
Shuffle: Input to the Reducer is the sorted output of the mappers. In this phase the framework fetches the relevant partition of the output of all the mappers via HTTP.
Sort: The framework groups Reducer inputs by key (since different mappers may have output the same key) in this stage. The shuffle and sort phases occur concurrently; while map outputs are being fetched they are merged.
Reduce: In this phase the reduce method is called for each <key, (list of values)> pair in the grouped inputs. Applications can use the Counter to report their statistics. The output of the Reducer is not sorted.
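The sketch referenced above: a word-count style Reducer that sums all values grouped under a key (the class name and types are illustrative and pair with the mapper sketch earlier):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {   // all values grouped under this key
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);        // reducer output; it is not sorted
    }
}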
The right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * <no. of maximum containers per node>). For example, a cluster of 10 nodes with 8 containers each suggests roughly 0.95 * 80 = 76 reduces. Increasing the number of reduces increases the framework overhead, but it improves load balancing and lowers the cost of failures.
The number of reduces can be set to zero if no reduction is desired. In this case the outputs of the map tasks go directly to HDFS. The framework does not sort the map outputs before writing them to HDFS.
Partitioner partitions the key space. It controls the partitioning of the keys of the intermediate map-outputs. The key is used to derive the partition, typically by a hash function. The total number of partitions is the same as the number of reduce tasks for the job.
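A minimal sketch of a hash-based Partitioner; the class name is hypothetical, and since this mirrors the default hash behaviour, a custom subclass is only needed when a different key-to-reducer mapping is wanted:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Keys with the same hash always land in the same reduce partition.
public class WordPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // mask off the sign bit, then map the hash onto [0, numPartitions)
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}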
Counter is a facility for MapReduce applications to report their statistics. Mapper and Reducer implementations can use the Counter to report statistics.
2> Job Configuration
Job represents a MapReduce job configuration. Job is the primary interface for a user to describe a MapReduce job to the Hadoop framework for execution. Job is typically used to specify the Mapper, Combiner, Partitioner, Reducer, InputFormat, and OutputFormat implementations. Optionally, Job is used to specify other advanced facets of the job, and it can set/get arbitrary parameters needed by applications.
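A minimal driver sketch wiring these pieces together; it reuses the hypothetical TokenizerMapper and IntSumReducer from the sketches above, and the input/output paths come from command-line arguments:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountDriver.class);

        job.setMapperClass(TokenizerMapper.class);    // map phase
        job.setCombinerClass(IntSumReducer.class);    // optional local aggregation
        job.setReducerClass(IntSumReducer.class);     // reduce phase
        job.setNumReduceTasks(2);                     // number of reduces (0 = map-only job)

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));   // input location in HDFS
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output location in HDFS

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}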
3> Task Execution & Environment
The MRAppMaster executes the Mapper/Reducer tasks as child processes in separate JVMs. The child task inherits the environment of the parent MRAppMaster. Additional options can be specified via configuration parameters.
i> Memory management:
Users/Admins can specify the maximum virtual memory of the child task and any sub-process it launches recursively. The value set is per process.
The memory available to some parts of the framework is also configurable. In map and reduce tasks, performance can be influenced by the concurrency of operations and the frequency with which data hits disk. Monitoring the filesystem counters for a specific job, relative to the byte counts coming from the map and going into the reduce, is important for tuning these parameters.
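A hedged sketch of the per-task memory settings commonly tuned here; the property names are the standard Hadoop 2+ ones, and the values are purely illustrative:

import org.apache.hadoop.conf.Configuration;

public class MemorySettings {
    public static Configuration tuned() {
        Configuration conf = new Configuration();
        // Container size requested from YARN for each map/reduce task (MB).
        conf.setInt("mapreduce.map.memory.mb", 2048);
        conf.setInt("mapreduce.reduce.memory.mb", 4096);
        // JVM options for the child task processes; the heap should fit inside the container.
        conf.set("mapreduce.map.java.opts", "-Xmx1638m");
        conf.set("mapreduce.reduce.java.opts", "-Xmx3276m");
        return conf;
    }
}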
ii> Map parameters:
Each record emitted by a map is serialized into a buffer, and its metadata is stored in accounting buffers. When either the serialization buffer or the metadata buffer exceeds a threshold, the contents of the buffers are sorted and written to disk in the background while the map continues to output records. If either buffer fills completely while the spill is in progress, the map thread will block. When the map is finished, any remaining records are written to disk and all on-disk segments are merged into a single file. Minimizing the number of spills to disk can decrease map time, but a larger buffer also decreases the memory available to the mapper.
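A sketch of the map-side spill settings this paragraph refers to; the property names are the standard Hadoop 2+ ones, and the values are illustrative only:

import org.apache.hadoop.conf.Configuration;

public class MapSpillSettings {
    public static Configuration tuned() {
        Configuration conf = new Configuration();
        // Size of the in-memory serialization/accounting buffer (MB);
        // a larger buffer means fewer spills but less heap for the mapper.
        conf.setInt("mapreduce.task.io.sort.mb", 256);
        // Fraction of the buffer that triggers a background spill to disk.
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f);
        // Number of spill segments merged at once when building the final map output.
        conf.setInt("mapreduce.task.io.sort.factor", 64);
        return conf;
    }
}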
iii> Shuffle/Reduce parameters:
As described previously, each reduce fetches the output assigned to it by the Partitioner via HTTP into memory and periodically merges these outputs to disk. If intermediate compression of map outputs is turned on, each output is decompressed into memory. The configuration can be tuned to affect the frequency of these merges to disk prior to the reduce and the memory allocated to holding map output during the reduce.
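A sketch of the shuffle-side settings involved; again the property names are standard Hadoop 2+ ones and the values are only illustrative:

import org.apache.hadoop.conf.Configuration;

public class ShuffleSettings {
    public static Configuration tuned() {
        Configuration conf = new Configuration();
        // Fraction of reducer heap used to buffer fetched map outputs.
        conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.70f);
        // Usage threshold at which buffered map outputs are merged and spilled to disk.
        conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.66f);
        // Fraction of heap that may retain map outputs in memory during the reduce itself.
        conf.setFloat("mapreduce.reduce.input.buffer.percent", 0.0f);
        return conf;
    }
}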
Other components in Hadoop
YARN is the architectural center of Hadoop that allows multiple data processing engines such as interactive SQL, real-time streaming, data science and batch processing to handle data stored in a single platform, unlocking an entirely new approach to analytics. YARN provides resource management and a central platform to deliver consistent operations, security, and data governance tools across Hadoop clusters.
Ambari is a web-based tool for provisioning, managing, and monitoring Apache Hadoop clusters, with support for Hadoop HDFS, Hadoop MapReduce, Hive, HCatalog, HBase, ZooKeeper, Oozie, Pig, and Sqoop. Ambari also provides a dashboard for viewing cluster health, such as heatmaps, and the ability to view MapReduce, Pig, and Hive applications visually, along with features to diagnose their performance characteristics in a user-friendly manner.
HBase is the Hadoop database, a distributed, scalable, big data store. Use HBase when you need random, real-time read/write access to your Big Data. This project's goal is the hosting of very large tables -- billions of rows X millions of columns -- atop clusters of commodity hardware. Apache HBase is an open-source, distributed, versioned, non-relational database modeled after Google's Bigtable: A Distributed Storage System for Structured Data by Chang et al. Just as Bigtable leverages the distributed data storage provided by the Google File System, Apache HBase provides Bigtable-like capabilities on top of Hadoop and HDFS.
Hive is a data warehouse infrastructure that provides data summarization and ad hoc querying. Hive facilitates reading, writing, and managing large datasets residing in distributed storage using SQL. Structure can be projected onto data already in storage. A command line tool and JDBC driver are provided to connect users to Hive.
Spark is a fast and general compute engine for Hadoop data. Spark provides a simple and expressive programming model that supports a wide range of applications, including ETL, machine learning, stream processing, and graph computation.
The Knox API Gateway is designed as a reverse proxy with consideration for pluggability in the areas of policy enforcement, through providers and the backend services for which it proxies requests. Knox provides perimeter security so that the enterprise can confidently extend Hadoop access to more of those new users while also maintaining compliance with enterprise security policies.
Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. The salient property of Pig programs is that their structure is amenable to substantial parallelization, which in turn enables them to handle very large data sets.
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.