I recently finished the course Hadoop Platform and Application Framework by University of California, San Diego (San Diego Supercomputer Center).
Another course "for the little ones", in the vein of "Data Manipulation at Scale: Systems and Algorithms". Target audience: beginners who want to know what Hadoop and MapReduce are and what they are good for.
The lectures spend quite a lot of time explaining who stands on whom and what is what in the Apache Hadoop ecosystem. They also explain, in principle, how it all works. If you are too lazy to dig this out of the docs, listening to the lectures is a perfectly good option.
The hands-on part lets you touch HDFS, MapReduce and Spark with your own hands, through test assignments that process toy datasets on the Cloudera virtual machine (Cloudera QuickStart).
Overall, useful. But a shame about the time spent.
Silly bus
Hadoop Basics
Lesson 1: Big Data Hadoop Stack
Lesson 2: Hands-On Exploration of the Cloudera VM
Introduction to the Hadoop Stack
Lesson 1: Overview of the Hadoop Stack
Lesson 2: The Hadoop Execution Environment
Lesson 3: Overview of Hadoop based Applications and Services
Introduction to Hadoop Distributed File System (HDFS)
Lesson 1: HDFS Architecture and Configuration
Lesson 2: HDFS Performance and Tuning
Lesson 3: HDFS Access, Commands, APIs, and Applications
Introduction to Map/Reduce
Lesson 1: Introduction to Map/Reduce
Lesson 2: Map/Reduce Examples and Principles
Assignment: Running Wordcount with Hadoop streaming, using Python code
Assignment: Joining Data
Spark
Lesson 1: Introduction to Apache Spark
Lesson 2: Resilient Distributed Datasets and Transformations
Lesson 3: Job scheduling, Actions, Caching and Shared Variables
Assignment: Simple Join in Spark
Assignment: Advanced Join in Spark
Digest
Big Data Hadoop Stack
Storage and large-scale processing of data sets on clusters of commodity hardware.
A simple batch-processing framework that moves computations to the data.
Hardware failures are handled automatically (data replication, process tracking).
Schema-on-read approach. A big amount of data + simple analysis beats small data + complex analysis.
Scalability, Reliability, Flexibility, Cost.
Components:
– Hadoop Common (libs and utils)
– HDFS (Hadoop Distributed File System), very high aggregate bandwidth
– Hadoop Map-Reduce (later YARN + MR): resource management platform, scheduler (MR: programming model)
On top of that, other applications and tools, like:
– HCatalog
– PIG
– Hive
– HBase
– ...
Nodes and tasks:
– Master node, Slave nodes
– HDFS: Name node, Data nodes
– MR: Job tracker, Task trackers
HDFS
– Java, distributed, scalable, portable
– reliability via replicating data
– Namenode (+ secondary Namenode): metadata; heartbeats, replication, balancing
– Datanodes: blocks
MR as a scheduler:
– Job tracker on the Master server pushes work to available Task trackers on the Slave servers
– now we have YARN
YARN aka Hadoop 2 (NextGen):
– Hadoop1: (HDFS, MR) vs Hadoop2: (HDFS, YARN, MR)
– separates resource management and data processing components
– on master: resource manager; on slaves: node manager, app master, container
– supports other workloads (graph processing, iterative modeling)
– multiple HDFS access engines (batch, interactive, realtime/streaming)
Hadoop Zoo
– Google stack, Facebook stack, Yahoo, Cloudera: all like twins
– block data storage, coordinator/scheduler/manager, metadata, columns/records, query languages, warehousing, ...
Apache Sqoop (SQL-to-Hadoop)
– for transferring data between Hadoop and relational databases (SQL)
HBase (Google BigTable)
– column-oriented DBMS
– key-value store
– dynamic data model
– fast random data access
PIG, PIG Latin
– high level programming on top of MR
– data analysis as data flows
Apache Hive
– data warehousing, querying, managing
– can project structure on data
– SQL-like queries (HiveQL)
– allows using MR processing
Oozie
– workflow scheduler to manage Hadoop jobs
– supports MR, Pig, Hive, Sqoop, …
– an Oozie workflow job is a DAG
– Oozie Coordinator jobs are recurrent workflow jobs, triggered by frequency or data availability
Zookeeper
– operational services for a Hadoop cluster
– centralized service for maintaining config, naming, synchronization, …
Flume
– for log data, streaming: collect, aggregate, move, …
Impala
– MPP (massively parallel processing) SQL query engine
Apache Spark
– fast in-memory data processing
– works on top of YARN but has its own cluster manager for standalone mode
– can run on top of any storage
Hadoop stack
We have more than one distribution => building blocks can be somewhat different.
HDFS
– files are divided into blocks (a file is a list of blocks), blocks are stored on data nodes
– each block is replicated (3 copies by default)
– metadata lives on a name node
– HDFS2 added: HDFS federation, multiple name nodes, namespaces, block pools (performance, isolation)
– heterogeneous storage (archive, disk, ssd, ram_disk)
MapReduce
– parallel data processing framework
– an MR job splits data into chunks
– map tasks process data chunks (line: key, value)
– the framework sorts/shuffles map output by key
– reduce tasks use sorted map data as input
– compute and storage nodes are the same
Vanilla MR 1
– a single master JobTracker: schedules, monitors, re-executes tasks
– one slave TaskTracker per node
– TaskTracker executes tasks per JobTracker requests
YARN, MR 2
– separates resource management and job scheduling/monitoring
– global ResourceManager (gets job requests from clients; gets node status from NMs)
– RM has a scheduler that allocates resources (capacity queues, …) and an application manager that negotiates the first container for an application
– a NodeManager on each node
– ApplicationMaster: one per application
– the AM does the job using Containers
Additional YARN features
– high availability resource manager: stand-by RMs in case the RM goes down
– timeline server: historic information for storage and apps (history of resource usage, ...)
– use of Cgroups for container resource management
– secure containers, for particular users
– YARN REST web API
Hadoop resource scheduling
– resource management, different scheduling algorithms and parameters
– we want to control resource usage
– schedulers: FIFO (default); fairshare; capacity
– fairshare tries to balance resources across apps (memory pools want to be of equal size)
– capacity gives you a guarantee for the requested resources (resources are divided among queues; apps/users wait in a queue with the asked capacity)
capacity scheduler
– queues and subqueues
– capacity guarantee with elasticity (can expand resources)
– runtime changes (draining/waiting apps)
– ACL security
fairshare scheduler
– balances out resource allocation among apps over time
– organized into queues/subqueues
– guarantees minimum shares
– limits per user/app
– weighted app priorities
Limitations of classic MapReduce: interactive or iterative tasks are inefficient
– execution frameworks like YARN, Tez, Spark: support complex DAGs of tasks, RAM caching
– Tez, Spark run on top of YARN (Spark can work w/o YARN, standalone)
The Apache Tez project is aimed at building an application framework which allows for a complex directed-acyclic-graph of tasks for processing data
– dataflow graphs
– custom data types
– complex DAG of tasks, dynamic DAG changes
Tez example: a Hive SQL query (2 joins) on MR vs Hive on Tez
– MR: a chain of 4 map-reduce stages with intermediate writes to HDFS
– Tez: a chain w/o intermediate HDFS reads/writes, 4 reduce stages, only 2 map stages
– reusing containers and data, fewer map jobs
Spark, 100x faster
– advanced DAG execution engine
– supports cyclic data flow (iterations: for iter in iterations: gradient = pointsRDD.map(...).reduce(...)); see the sketch after this list
– in-memory computing, cache
– many languages: Java, Scala, Python, R
– optimized libs
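As an illustration of that kind of iterative, cached computation, here is a minimal PySpark sketch of gradient descent for a 1-D linear model. It assumes a live SparkContext sc; the data, learning rate and loop are invented for the example, not taken from the course.
import random

# toy points (x, y) with y ~ 3*x, kept in memory across iterations
points = sc.parallelize(
    [(float(x), 3.0 * float(x) + random.gauss(0.0, 0.1)) for x in range(100)]
).cache()
n = points.count()

w = 0.0  # single weight, model y ~ w * x
for step in range(20):
    # squared-error gradient, computed as a map + reduce over the cached RDD
    gradient = points.map(lambda p: (w * p[0] - p[1]) * p[0]).reduce(lambda a, b: a + b) / n
    w -= 0.0001 * gradient
print(w)  # drifts toward 3.0
Each iteration reuses the cached points instead of re-reading them from storage, which is exactly where the speedup over plain MR comes from.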
Databases/stores
– Avro: data structures for MR
– HBase: NoSQL database
– Cassandra: NoSQL database
Querying
– Pig, declarative language over MR
– Hive, manage and query large datasets
– Impala, low-latency SQL query
– Spark, general processing, in-memory
ML, graph processing
– Giraph, iterative graph processing
– Mahout, framework for ML apps over Spark, Hadoop
– Spark, general …
Pig, PigLatin
– platform for data processing
– PigLatin: high level language
– pig execution: local, MR, Tez
– extensible
– usage: ETL; manipulating, analysing raw data
– can run as scripts, embedded programs
Hive
– query, manage data using HiveQL (SQL-like)
– run interactively using beeline, other run options (Hcatalog, WebHcat)
– warehouse tools
– exec. environments: MR, Tez, Spark
– data in HDFS, HBase
– custom mappers/reducers
– usage: data mining, analytics; ML; ad hoc analysis
HBase
– non-relational distributed database on top of HDFS
– compression, in-memory operations (memstore, blockcache)
– Consistency, high Availability, Partitioning (auto sharding)
– replication, security
– SQL-like access (using Hive, Spark, Impala)
– HBase API, external API, MR
HDFS in details
– tons of nodes
– portability
– large datasets
– high throughput (aggregated)
design
– simplified coherency model: write once, read many
– data replication
– move computation to the data
– relaxed POSIX requirements (client-side buffering)
– name node: namespace, metadata, regulates access
– data nodes: manage storage, serve read/write requests from clients; block creation, deletion and replication based on NN instructions
HDFS block size
– default BS = 64 MB, good for large files
– small files = big problems
– they impact NN memory usage and the number of map tasks
– memory usage: about 150 bytes per object => ~300 GB for 1 billion blocks (each small file costs a file object plus a block object)
– network load: the number of checks is proportional to the number of blocks
– many small blocks is bad
– many small files is worse
a large number of small files is bad
– map tasks: the count depends on the number of blocks; 10 GB of data in 32 KB files => 327,680 map tasks
– lots of queued tasks; large overhead of starting/stopping tasks; inefficient I/O
– many map tasks is bad
– latency: starting/stopping Java processes
– disk I/O is poor for small data blocks
options for small files?
– merge/concatenate
– sequence files
– HBase, Hive configuration
– CombineFileInputFormat in the API
Write to HDFS
– data is cached on the client up to the block size (POSIX relaxation)
– once the cached data reaches block size, the client contacts the NameNode: where should the block go
– the NN responds with a list of DataNodes, rack aware: the second replica goes to a remote rack
– the first DN receives the data and saves it to a local block; it replicates the block to the second DN
– the second DN replicates the block to the third DN; replication is pipelined, the block goes to all nodes immediately
– the NN commits the written data into its persistent store; it receives heartbeats and block reports from the DNs
– if a failure occurs, the NN starts a recovery procedure
Read from HDFS
– client gets a data node list from the name node
– read from the closest replica
HDFS config: hdfs-site.xml
– dfs.blocksize (dfs.block.size is the deprecated name)
– dfs.replication, default 3 (lower replication => more space; higher replication => can make data local to more workers)
Failures
– data node failures, network failures, name node failures
– the name node receives heartbeats and block reports from data nodes
– a DN w/o a recent heartbeat is marked dead: no new IO goes to it, blocks below the replication factor are re-replicated
– checksums are computed on file creation and stored in the namespace
– checksums are used to verify retrieved data, re-read from another replica if needed
– multiple copies of metadata
– failover to a standby name node, manual by default
HDFS access
– bin/hdfs script:
– user commands, filesystem shell commands
– administrator commands
– debug commands
– native Java API, org.apache.hadoop.fs.FileSystem
– C API, libhdfs
– WebHDFS REST API
– NFS gateway
– other options: Flume, Sqoop, …
commands example
– hdfs dfs -ls /
– hdfs dfs -mkdir /user/test
dd if=/dev/zero of=sample.txt bs=64M count=16
sudo -u hdfs hdfs dfs -chown -R cloudera /user/test
sudo -u hdfs hdfs dfs -put sample.txt /user/test/
hdfs fsck /user/test/sample.txt
sudo -u hdfs hdfs dfsadmin -report
HDFS API
– org.apache.hadoop.fs.FileSystem
– FSDataInputStream, FSDataOutputStream
– methods: get, open, create
FileSystem fs = FileSystem.get(URI.create(uri), conf);
in = fs.open(new Path(uri));
IOUtils.copyBytes(in, System.out, 4096, false);
...
REST API for HDFS
– in hdfs-site.xml: dfs.webhdfs.enabled; dfs.web.authentication.kerberos.principal; ...kerberos.keytab
hdfs getconf -confKey dfs.webhdfs.enabled
– example
service hadoop-httpfs start
curl -i "http://quickstart.cloudera:14000/webhdfs/v1/user/cloudera?user.name=cloudera&op=GETFILESTATUS"
curl -i -X PUT "http://quickstart.cloudera:14000/webhdfs/v1/user/test?user.name=cloudera&op=MKDIRS&permission=755"
curl -i "http://quickstart.cloudera:14000/webhdfs/v1/user/test?user.name=cloudera&op=GETCONTENTSUMMARY"
MapReduce in details
– bring computations to the data
– hide all the messy details about cluster failures, communications, synchronisation, whatnot
– model: sweep through all the data
– user defines: (key, value) items; mapper; reducer
– Hadoop handles the logistics (distribution of code and data, execution), brings the mapper to the datanodes
– a mapper takes a data item (a line of text from a file/HDFS block) and outputs (key, value) pairs
– Hadoop shuffles pairs into partitions, sorts within the partitions and feeds the sorted pairs to reducers
– a reducer takes (key, value) pairs and outputs results (grouping, aggregating, calculating, …)
– trade-offs: programming complexity / Hadoop workload
– common mappers: filter, identity, splitter, …
running MR
– streaming (stdin/stdout pipes)
– API
word count example
– the mapper takes a line and produces (word, 1) pairs
– MR sorts the mapper output and feeds it to the reducer
– the reducer takes (word, 1) pairs; while the current word == the input word => word count += 1; it outputs (word, count) when the current word != the input word
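A minimal sketch of what the streaming scripts for this could look like in Python; the mapper.py/reducer.py file names are placeholders, not the course's own files.
# mapper.py: read lines from stdin, emit "word<TAB>1" for every word
import sys

for line in sys.stdin:
    for word in line.strip().split():
        print('%s\t%d' % (word, 1))

# reducer.py: input arrives sorted by key, so counts can be accumulated per word
import sys

current_word, current_count = None, 0
for line in sys.stdin:
    word, count = line.strip().split('\t', 1)
    if word == current_word:
        current_count += int(count)
    else:
        if current_word is not None:
            print('%s\t%d' % (current_word, current_count))
        current_word, current_count = word, int(count)
if current_word is not None:
    print('%s\t%d' % (current_word, current_count))
Both scripts would be passed to the hadoop-streaming jar via the -mapper, -reducer, -input and -output options; the exact jar path depends on the distribution.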
principles
– 1 mapper per data split
– 1 reducer per core (processing time, number of output files)
– key-value simplicity (shuffling & grouping)
– good task decomposition: simple and separable mapper; easy consolidation in reducers
– composite keys
– extra info in values
– cascade MR jobs
– bin keys into ranges
– aggregate map output (combiner option, grouping in shuffle phase?)
inner join
– the join field must be the key for the reducer => mapper output is (join field, data); see the sketch below
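A rough streaming-style sketch of such a reduce-side join. The input format is invented for illustration: the join key is the first tab-separated field, side A records have 2 fields and side B records have 3; the real assignment uses its own data layout.
# join_mapper.py: normalize both inputs to (join key, side tag, payload)
import sys

for line in sys.stdin:
    fields = line.rstrip('\n').split('\t')
    key, payload = fields[0], ','.join(fields[1:])
    tag = 'A' if len(fields) == 2 else 'B'  # invented tagging convention
    print('%s\t%s\t%s' % (key, tag, payload))

# join_reducer.py: records for one key arrive together; emit the A x B combinations
import sys
import itertools

def parse(stream):
    for line in stream:
        key, tag, data = line.rstrip('\n').split('\t', 2)
        yield key, tag, data

for key, group in itertools.groupby(parse(sys.stdin), key=lambda r: r[0]):
    a_side, b_side = [], []
    for _, tag, data in group:
        (a_side if tag == 'A' else b_side).append(data)
    for a, b in itertools.product(a_side, b_side):
        print('%s\t%s\t%s' % (key, a, b))
The whole trick is in the mapper: once the join field becomes the key, the shuffle brings matching records from both sides to the same reducer call.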
vectors (matrices) multiplication
– ab = (a1 * b1) + (a2 * b2) + … (a small sketch follows this list)
– mapper output key-values are (index, values)
– N groups are shuffled to the reducers, which may be slow
– you can use binning: R bins => map output (bin, index, data)
– you pay with reducer complexity
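A compact streaming-style sketch of the dot product under these assumptions: the (hypothetical) input lines are "vector_name index value", the mapper keys each component by its index, and a single reducer multiplies the pair that meets under each index and keeps a running sum. With several reducers each would emit a partial sum to be added in a trivial second pass.
# dotprod_mapper.py: "name index value" -> key by index
import sys

for line in sys.stdin:
    name, index, value = line.split()
    print('%s\t%s' % (index, value))

# dotprod_reducer.py: input sorted by index; multiply the two values per index
import sys
import itertools

def parse(stream):
    for line in stream:
        index, value = line.split('\t')
        yield index, float(value)

total = 0.0
for index, group in itertools.groupby(parse(sys.stdin), key=lambda r: r[0]):
    values = [v for _, v in group]
    if len(values) == 2:  # one component from each of the two vectors
        total += values[0] * values[1]
print(total)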
MR limitations
– not iterative
– not interactive
– the problem must fit in the MR model
Spark
Spark provides great performance advantages over Hadoop MapReduce, especially for iterative algorithms, thanks to in-memory caching. It also gives Data Scientists an easier way to write their analysis pipelines in Python and Scala, even providing interactive shells to play live with data.
features
– on top of HDFS, YARN but can work standalone on any storage
– fast (cache, no intermediate HDFS writes)
– interactive (Scala, Python, R shells)
– iterative
– any DAG workflow
cluster
– worker nodes with a Spark Executor (JVM) linked to HDFS data; some tasks will be sent to the HDFS nodes
– Cluster Manager (YARN/Standalone): provisions/restarts workers
– Driver program: DAG processing, action results
PySpark setup
sudo easy_install ipython==1.2.1
PYSPARK_DRIVER_PYTHON=ipython pyspark
Concepts
– RDD
– transformations (lazy)
– actions
– caching
– shared variables (one-way transfer)
RDD: resilient distributed dataset (from storage, from RDD transformations)
– divided into partitions (atomic chunks of data), immutable
– the history of each partition is tracked, re-run if necessary
a_RDD = sc.parallelize(range(10), 3)
b_RDD = a_RDD.map(lambda x: x + 1)  # transformation
b_RDD.glom().collect()  # action
transformations: lazy operations on RDDs
– worker → worker
– RDD = RDD.transform
– pipeline as a chain of transformations
– map, flatMap (one => many), filter, sample, coalesce: narrow
– narrow (local) vs wide (network) transformations
– groupByKey: wide transformation; also: reduceByKey, repartition
know the shuffle and avoid it: replace groupByKey with reduceByKey (see the sketch below)
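A small PySpark illustration of that advice, assuming a live SparkContext sc; the toy pairs are made up.
# word counting on a pair RDD: same result, very different shuffle
pairs = sc.parallelize([('a', 1), ('b', 1), ('a', 1), ('a', 1)], 2)

# groupByKey ships every (key, value) pair across the network, then sums on the reducer side
grouped = pairs.groupByKey().mapValues(sum)

# reduceByKey combines values inside each partition first, shuffling only partial sums
reduced = pairs.reduceByKey(lambda x, y: x + y)

print(grouped.collect())  # [('a', 3), ('b', 1)], order may vary
print(reduced.collect())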
Workflow DAG (directed acyclic graph)
– nodes: RDDs; edges: transformations
– dependency flow (an RDD is created from …)
– lineage (provenance)
– a partition depends on another partition
– tracking for recovery (when a node fails)
– go back through the DAG and re-execute what is needed
– narrow dependencies => easy to track
– wide dependencies are more complex
– detecting independent tasks => parallelism
actions: final stage of the workflow, worker → driver
– triggers execution of the DAG
– returns the result to the driver (or writes to HDFS)
– collect; take; reduce; saveAsTextFile
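A few of these actions on a toy RDD, again assuming a live SparkContext sc; the output path is a placeholder.
rdd = sc.parallelize(range(1, 6))  # [1, 2, 3, 4, 5]

print(rdd.collect())                      # all elements shipped to the driver
print(rdd.take(2))                        # [1, 2]
print(rdd.reduce(lambda x, y: x + y))     # 15, computed on the workers
rdd.saveAsTextFile('/user/cloudera/out')  # write partitions to storage (hypothetical path)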
caching
– mark RDD with .cache()
– lazy
– for iterative algorithms
– to memory, disk, both
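A minimal caching sketch for an iterative loop; the input path and the loop body are invented for illustration, and sc is again assumed.
lines = sc.textFile('hdfs:///user/cloudera/data.txt')  # hypothetical input, one number per line
numbers = lines.map(lambda s: float(s)).cache()        # mark for caching; nothing runs yet

total = 0.0
for i in range(10):
    # the first action materializes and caches the RDD; later iterations reuse memory
    total += numbers.map(lambda x: x * i).sum()
print(total)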
broadcast variables
– a large variable used on all nodes, read-only
– transferred just once per Executor
– torrent-like transfer
– good for configs, lookup tables, joins (if the table fits in memory)
conf = sc.broadcast({'a': 'b', 'c': 'd'})
…
conf.value
accumulators
– write-only on nodes
– collect data across the cluster
acc = sc.accumulator(0)
def test(x): acc.add(x)
rdd.foreach(test)
acc.value
I will post code examples tomorrow, in a separate post.
More details on Spark: my notes on the BerkeleyX course CS190.1x Scalable Machine Learning.
original post http://vasnake.blogspot.com/2016/02/hadoop-platform-and-application.html