A programmer's notes, about everything and nothing. But probably more about professional things.


Hadoop Platform and Application Framework

I recently finished the course
Hadoop Platform and Application Framework
by University of California, San Diego (San Diego Supercomputer Center)

Another course "for the very young", in the style of "Data Manipulation at Scale: Systems and Algorithms".
Target audience: schoolkids who want to know "what Hadoop and MapReduce are and what they're good for".
The lectures spend quite a lot of time on who stands on whose shoulders and what's what in the Apache Hadoop ecosystem.
They explain how it all works, in principle. If you're too lazy to dig through the docs and extract it from there, the lectures are worth a listen.
The practical part lets you get hands-on with HDFS, MapReduce and Spark by doing test assignments on toy datasets in a Cloudera virtual machine (Cloudera QuickStart).
Overall, useful. But a pity about the time.

Silly bus

Hadoop Basics
Lesson 1: Big Data Hadoop Stack
Lesson 2: Hands-On Exploration of the Cloudera VM

Introduction to the Hadoop Stack
Lesson 1: Overview of the Hadoop Stack
Lesson 2: The Hadoop Execution Environment
Lesson 3: Overview of Hadoop based Applications and Services

Introduction to Hadoop Distributed File System (HDFS)
Lesson 1: HDFS Architecture and Configuration
Lesson 2: HDFS Performance and Tuning
Lesson 3: HDFS Access, Commands, APIs, and Applications

Introduction to Map/Reduce
Lesson 1: Introduction to Map/Reduce
Lesson 2: Map/Reduce Examples and Principles
Assignment: Running Wordcount with Hadoop streaming, using Python code
Assignment: Joining Data

Introduction to Apache Spark
Lesson 1: Introduction to Apache Spark
Lesson 2: Resilient Distributed Datasets and Transformations
Lesson 3: Job scheduling, Actions, Caching and Shared Variables
Assignment: Simple Join in Spark
Assignment: Advanced Join in Spark


Big Data Hadoop Stack

Apache Hadoop is an open source software framework for
storage and large scale processing of data-sets on clusters of commodity hardware.

Simple batch-processing framework, moving computations to data.

Hardware failures handled automatically (data replication, process tracking).

Schema-on-read approach. A large amount of data + simple analysis beats small data + complex analysis.

Scalability, Reliability, Flexibility, Cost

– Hadoop Common (libs and utils)
– HDFS (Hadoop Distributed File System), very high aggregate bandwidth
– Hadoop Map-Reduce (later YARN + MR): resource management platform, scheduler (MR: programming model)

On top of that other applications and tools, like:
– HCatalog
– Hive
– HBase
– ...

Nodes and tasks:
– Master node, Slave nodes
– HDFS: Name node, Data nodes
– MR: Task manager, Job tracker, Task trackers

HDFS
– Java, distributed, scalable, portable
– reliability via replicating data
– Namenode (+ secondary Namenode): metadata; heartbeats, replication, balancing
– Datanodes: blocks

MR as a scheduler:
– Job tracker on the Master server pushes work to available Task trackers on Slave servers
– now we have YARN

YARN aka Hadoop 2 (NextGen):
– Hadoop1: (HDFS, MR) vs Hadoop2: (HDFS, YARN, MR)
– separates resource management and data processing components
– on master: resource manager; on slaves: node manager, app master, container
– supports other workloads (graph processing, iterative modeling)
– multiple HDFS access engines (batch, interactive, realtime/streaming)

Hadoop Zoo
– Google stack, Facebook stack, Yahoo, Cloudera: all like twins
– blocks data storage, coordinator/scheduler/manager, metadata, columns/records, query languages, warehousing, ...

Apache Sqoop (SQL-to-Hadoop)
– for transferring data between Hadoop and relational databases (SQL)

HBase (modeled after Google BigTable)
– column-oriented DBMS
– key-value store
– dynamic data model
– fast random data access

Pig, Pig Latin
– high level programming on top of MR
– data analysis as data flows

Apache Hive
– data warehousing, querying, managing
– can project structure on data
– SQL-like queries (HiveQL)
– can use MR for processing (HiveQL queries are compiled into MR jobs)

Apache Oozie
– workflow scheduler to manage Hadoop jobs
– supports MR, Pig, Hive, Sqoop, …
– Oozie workflow jobs are DAGs of actions
– Oozie Coordinator jobs are recurrent workflow jobs, triggered by time (frequency) or data availability

Apache ZooKeeper
– operational services for a Hadoop cluster
– centralized service for maintaining configuration, naming, synchronization, …

Apache Flume
– for log data, streaming: collect, aggregate, move, …

Impala
– MPP (massively parallel processing) SQL query engine

Apache Spark
– fast in-memory data processing
– works on top of YARN, but has its own cluster manager for standalone mode
– can work with various storage systems, not only HDFS

Hadoop stack

HDFS → YARN → Tez (exec.engine) under MR (batch), Pig, Hive, ...

We have more than one distribution => building blocks can be somewhat different

HDFS
– files are divided into blocks (a file is a list of blocks); blocks are stored on data nodes
– each block is replicated (3 copies by default)
– metadata on a name node
– HDFS2 added: HDFS federation, multiple name nodes, namespaces, block pools (performance, isolation)
– heterogeneous storage (archive, disk, ssd, ram_disk)
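A small arithmetic sketch of "file as a list of blocks": how a file is cut into fixed-size blocks and how much raw disk replication costs. The 64 MB block size and 3 replicas are the figures quoted in these notes; the function names are hypothetical, and HDFS does not pad a partial last block.

```python
MB = 1024 * 1024

def split_into_blocks(file_size, block_size=64 * MB):
    """Sizes of the blocks a file of `file_size` bytes is cut into.

    All blocks are full except possibly the last; a partial last block
    only takes as much disk as the data it actually holds.
    """
    full, rest = divmod(file_size, block_size)
    return [block_size] * full + ([rest] if rest else [])

def raw_storage(file_size, replication=3):
    """Raw cluster disk used when every block is stored `replication` times."""
    return file_size * replication

one_gb = 16 * 64 * MB  # same size as a dd-generated file with bs=64M count=16
print(len(split_into_blocks(one_gb)), raw_storage(one_gb) // MB)  # 16 3072
print([b // MB for b in split_into_blocks(100 * MB)])             # [64, 36]
```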

MapReduce
– parallel data processing framework
– an MR job splits data into chunks
– map tasks process data chunks (records as key, value pairs; for text files, one line per record)
– the framework sorts/shuffles map output by key
– reduce tasks use sorted map data as input
– compute and storage nodes are the same

Vanilla MR 1
– single master JobTracker: schedules, monitors, re-executes tasks
– one slave TaskTracker per node
– TaskTracker executes tasks per JobTracker requests

YARN
– separates resource management from job scheduling/monitoring
– global ResourceManager (gets job requests from clients; gets node status from NodeManagers)
– RM has a scheduler, which allocates resources (capacity queues, …), and an application manager, which negotiates the first container for an application
– NodeManager on each node
– ApplicationMaster: one per application
– the AM does the work using Containers

Additional YARN features
– high availability resource manager, stand-by RMs in case RM goes down
– timeline server, historic information for storage and apps (history of resource usage, ...)
– use of Cgroups, containers resources management
– secure containers, for particular users

Hadoop resource scheduling
– resource management, different scheduling algorithms, parameters
– want to control resource usage
– schedulers: FIFO (default in Hadoop 1; Apache Hadoop 2 YARN defaults to capacity); fair share; capacity
– fair share tries to balance resources across apps (memory pools tend toward equal size)
– capacity gives you a guarantee for the requested resources (resources are divided among queues; apps/users wait in a queue with the requested capacity)

capacity scheduler
– queues and subqueues
– capacity guarantee with elasticity (can expand resources)
– runtime changes (draining/waiting apps)
– ACL security

fairshare scheduler
– balances out resource allocation among apps over time
– organize into queues/subqueues
– guarantee min. shares
– limits per user/app
– weighted app priorities
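The fair share idea can be illustrated with weighted max-min fairness ("water-filling"), the principle behind fair sharing. This is a simplified sketch, not the YARN FairScheduler code: queues, minimum shares and preemption are left out, and the function name and numbers are made up for illustration.

```python
def max_min_fair_share(capacity, demands, weights=None):
    """Weighted max-min fair allocation by water-filling.

    capacity: total resource (e.g. GB of memory); demands: app -> amount asked.
    Apps asking for less than their share get exactly their demand; what they
    leave over is split among the others in proportion to their weights.
    """
    weights = weights or {app: 1.0 for app in demands}
    alloc = {app: 0.0 for app in demands}
    active = {app for app, d in demands.items() if d > 0}
    remaining = float(capacity)
    while active and remaining > 1e-9:
        total_w = sum(weights[a] for a in active)
        share = {a: remaining * weights[a] / total_w for a in active}
        satisfied = {a for a in active if demands[a] <= share[a]}
        if not satisfied:
            # everyone wants more than their slice: hand out the slices and stop
            for a in active:
                alloc[a] = share[a]
            break
        for a in satisfied:
            alloc[a] = float(demands[a])
            remaining -= demands[a]
        active -= satisfied
    return alloc

print(max_min_fair_share(100, {"A": 10, "B": 50, "C": 60}))
# {'A': 10.0, 'B': 45.0, 'C': 45.0}
```

App A asks for less than a third, so it is fully served, and the 90 units it leaves are split evenly between B and C.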

Limitations of classic MapReduce: interactive or iterative tasks inefficient
– executions frameworks like YARN, Tez, Spark: support complex DAG of tasks, RAM caching
– Tez, Spark on top of YARN (Spark can work w/o YARN, standalone)

The Apache Tez project is aimed at building an application framework
which allows for a complex directed-acyclic-graph of tasks for processing data
– dataflow graphs
– custom data types
– complex DAG of tasks, dynamic DAG changes

Tez example: Hive SQL query (2 joins) on MR vs Hive on Tez
– MR: a chain of 4 map-reduce stages, writing intermediate results to HDFS
– Tez: a chain without intermediate HDFS reads/writes, 4 reduce stages but only 2 map stages
– reuses containers and data, fewer map jobs

Spark, claimed to be up to 100x faster than MR (for in-memory workloads)
– advanced DAG exec.engine
– supports cyclic data flow (iterations: for iter in iterations: gradient = pointsRDD.map(...).reduce... )
– in-memory computing, cache
– many languages: Java, Scala, Python, R
– optimized libs

– Avro: data structures for MR
– HBase: NoSQL database
– Cassandra: NoSQL database

– Pig, declarative language over MR
– Hive, manage and query large datasets
– Impala, low-latency SQL query
– Spark, general processing, in-memory

ML, graph processing
– Giraph, iterative graph processing
– Mahout, framework for ML apps over Spark, Hadoop
– Spark, general …

Pig, PigLatin
– platform for data processing
– PigLatin: high level language
– pig execution: local, MR, Tez
– extensible
– usage: ETL; manipulating, analysing raw data
– can run as scripts, embedded programs

Hive
– query, manage data using HiveQL (SQL-like)
– run interactively using beeline; other run options (HCatalog, WebHCat)
– warehouse tools
– exec.environments: MR, Tez, Spark
– data in HDFS, HBase
– custom mappers/reducers
– usage: data mining, analytics; ML; ad hoc analysis

HBase
– non-relational distributed database on top of HDFS
– compression, in-memory operations (memstore, blockcache)
– Consistency, high Availability, Partitioning (auto sharding)
– replication, security
– SQL-like access (using Hive, Spark, Impala)
– HBase API, external API, MR

HDFS in details

– low cost commodity hardware
– tons of nodes
– portability
– large datasets
– high throughput (aggregated)

– simplified coherency model: write once read many
– data replication
– move computation to data
– relax POSIX requirements (client-side buffering)
– name node: namespace, metadata, regulates access
– data nodes: manage storage, serving read/write requests from clients, block creation, deletion, replication based on NN instructions

HDFS block size
– default BS = 64 MB (128 MB in Hadoop 2), good for large files
– small files = big problems
– impact NN memory usage, number of map tasks
– memory usage: ~150 bytes per object (file, block) => ~300 GB for 1 billion single-block files
– network load: number of checks proportional to number of blocks
– many small blocks is bad
– many small files is worse

large number of small files is bad
– number of map tasks depends on #blocks: 10 GB of data in 32 KB files => 327680 map tasks;
lots of queued tasks, large overhead of starting/stopping tasks, inefficient I/O
– many map tasks is bad
– latency: start/stop java processes
– disk I/O is inefficient for small data blocks
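Both numbers above come from back-of-the-envelope arithmetic. A sketch, assuming the ~150 bytes per NameNode object rule of thumb and that a small file costs two objects (the file and its single block), which is how 1 billion small files ends up near 300 GB; the helper names are hypothetical.

```python
KB = 1024
GB = 1024 ** 3

def namenode_memory_bytes(num_files, blocks_per_file=1, bytes_per_object=150):
    """Rough NameNode heap estimate: each file and each block is an
    in-memory object of ~150 bytes (rule of thumb, not an exact figure)."""
    return num_files * (1 + blocks_per_file) * bytes_per_object

def map_tasks(total_bytes, file_size, block_size=64 * 1024 * KB):
    """With a standard input format each file yields at least one split,
    i.e. one map task per block of every file."""
    blocks_per_file = -(-file_size // block_size)  # ceiling division
    return (total_bytes // file_size) * blocks_per_file

print(namenode_memory_bytes(10 ** 9) / 10 ** 9)  # 300.0 (GB)
print(map_tasks(10 * GB, 32 * KB))                # 327680
print(map_tasks(10 * GB, 10 * GB))                # 160: one 10 GB file, 64 MB blocks
```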

options for small files?
– merge/concatenate
– SequenceFiles
– HBase, Hive configuration
– CombineFileInputFormat in API
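The idea behind CombineFileInputFormat is to let one map task read many small files. A plain-Python sketch of greedy packing into splits; the real class also groups files by node and rack locality, which is ignored here, and the function name is hypothetical.

```python
def combine_small_files(files, max_split_size):
    """Greedily pack (name, size) pairs into splits of at most
    `max_split_size` bytes, so one map task reads many small files.
    A file bigger than the limit still gets a split of its own."""
    splits, current, current_size = [], [], 0
    for name, size in files:
        if current and current_size + size > max_split_size:
            splits.append(current)
            current, current_size = [], 0
        current.append(name)
        current_size += size
    if current:
        splits.append(current)
    return splits

small = [(f"part-{i:05d}", 32 * 1024) for i in range(10)]  # ten 32 KB files
print([len(s) for s in combine_small_files(small, 128 * 1024)])  # [4, 4, 2]
```

Ten map tasks become three, at the cost of each task reading several files.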

Write to HDFS
– data is cached on the client up to block size (POSIX relaxation)
– once a block's worth of data is accumulated, the client asks the NameNode where the block should go
– NN responds with a list of DataNodes, rack aware: the second replica goes to a remote rack
– the first DN receives data, saves it to a local block and replicates the block to the second DN
– the second DN replicates the block to the third DN; replication is pipelined, so the block reaches all nodes almost immediately
– NN commits written data into persistent store; receives heartbeats and block reports from DNs
– if a failure occurs, NN starts a recovery procedure
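The replica placement step can be sketched as follows, assuming the default HDFS policy for replication factor 3 (writer's node, then a node on a remote rack, then another node on that same remote rack). Node and rack names are hypothetical; the real NameNode picks candidates randomly and filters out unsuitable nodes.

```python
def place_replicas(topology, writer_node):
    """Rack-aware placement of 3 replicas, following the default HDFS policy:
    1st on the writer's node, 2nd on a node in a different rack,
    3rd on a different node in the same rack as the 2nd.
    topology: rack -> list of nodes; assumes a remote rack with >= 2 nodes.
    Picks the first candidate instead of a random one, for readability."""
    rack_of = {node: rack for rack, nodes in topology.items() for node in nodes}
    first = writer_node
    remote_rack = next(r for r in topology if r != rack_of[first])
    second = topology[remote_rack][0]
    third = next(n for n in topology[remote_rack] if n != second)
    return [first, second, third]

topology = {"rack1": ["dn1", "dn2"], "rack2": ["dn3", "dn4"]}
targets = place_replicas(topology, "dn1")
# write pipeline: the client streams to the 1st DN, which forwards to the 2nd, ...
print(" -> ".join(["client"] + targets))  # client -> dn1 -> dn3 -> dn4
```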

Read from HDFS
– client gets data node list from name node
– read from closest replica

HDFS config: hdfs-site.xml
– dfs.blocksize (dfs.block.size is the older, deprecated name)
– dfs.replication, default 3 (lower repl => more space; higher repl => can make data local to more workers)

Fault tolerance
– data node failures, network failures, name node failures
– name node receives heartbeats and block reports from data nodes
– DN w/o recent heartbeat: marked dead, no new IO, blocks below repl.factor re-replicated
– checksum computed on file creation, stored in namespace
– checksum used to check retrieved data, re-read from another replica if needed
– multiple copies of metadata
– failover-to-standby name node, manual by default

HDFS access
– bin/hdfs script:
– user commands, filesystem shell commands
– administrator commands
– debug commands
– native Java API, org.apache.hadoop.fs.FileSystem
– C API, libhdfs
– NFS gateway
– other options: Flume, Sqoop, …

commands example
hdfs dfs -ls /
hdfs dfs -mkdir /user/test
dd if=/dev/zero of=sample.txt bs=64M count=16
sudo -u hdfs hdfs dfs -chown -R cloudera /user/test
sudo -u hdfs hdfs dfs -put sample.txt /user/test/
hdfs fsck /user/test/sample.txt
sudo -u hdfs hdfs dfsadmin -report

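Java API
– org.apache.hadoop.fs.FileSystem
– FSDataInputStream, FSDataOutputStream
– methods: get, open, create
Configuration conf = new Configuration();  // org.apache.hadoop.conf.Configuration
FileSystem fs = FileSystem.get(URI.create(uri), conf);
FSDataInputStream in = fs.open(new Path(uri));
try { IOUtils.copyBytes(in, System.out, 4096, false); }  // org.apache.hadoop.io.IOUtils
finally { IOUtils.closeStream(in); }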

WebHDFS (REST API)
– in hdfs-site.xml: dfs.webhdfs.enabled; dfs.web.authentication.kerberos.principal; ...kerberos.keytab
hdfs getconf -confKey dfs.webhdfs.enabled
– example, via the HttpFS gateway (port 14000), which serves the same REST API
service hadoop-httpfs start
curl -i "http://quickstart.cloudera:14000/webhdfs/v1/user/cloudera?user.name=cloudera&op=GETFILESTATUS" 
curl -i -X PUT "http://quickstart.cloudera:14000/webhdfs/v1/user/test?user.name=cloudera&op=MKDIRS&permission=755" 
curl -i "http://quickstart.cloudera:14000/webhdfs/v1/user/test?user.name=cloudera&op=GETCONTENTSUMMARY" 
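The same REST calls as the curl commands, made from the Python standard library. A sketch: host, port and user are the QuickStart values from the examples above, the helper names are hypothetical, and only the URL builder works without a live HttpFS/WebHDFS endpoint.

```python
import json
from urllib.parse import urlencode
from urllib.request import Request, urlopen

def webhdfs_url(host, port, path, op, user, **params):
    """Build a WebHDFS REST URL like the ones in the curl examples."""
    query = urlencode({"user.name": user, "op": op, **params})
    return f"http://{host}:{port}/webhdfs/v1{path}?{query}"

def webhdfs_call(url, method="GET"):
    """Send the request and decode the JSON reply (needs a live endpoint)."""
    with urlopen(Request(url, method=method)) as resp:
        return json.load(resp)

url = webhdfs_url("quickstart.cloudera", 14000, "/user/test", "MKDIRS",
                  "cloudera", permission="755")
print(url)
# webhdfs_call(url, method="PUT")  # uncomment on the QuickStart VM
```

As with curl, MKDIRS needs PUT, while GETFILESTATUS and GETCONTENTSUMMARY are plain GETs.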

MapReduce in details

– large number of internet documents that need to be indexed
– bring computations to data
– hide all messy details about cluster failures, communications, synchronisation, whatnot
– model: sweep through all data
– user defines: (key, value) items; mapper; reducer
– Hadoop handles the logistics (distribution (code, data), execution), brings mapper to datanodes
– mapper takes a data item (a line of text from a file's HDFS block); outputs (key, value) pairs
– Hadoop shuffles pairs into partitions, sorts within the partitions and feeds sorted pairs to reducers
– reducer takes (key, values) and outputs a result (grouping, aggregating, calculating, …)
– trade-offs: programming complexity / Hadoop workload
– common mappers: filter, identity, splitter, …
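How "Hadoop shuffles pairs into partitions" decides where a key goes: the default HashPartitioner computes (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks. A plain-Python imitation with hypothetical helper names, using zlib.crc32 as the hash because Python's str hash is salted per process; in real Hadoop the sorting happens as map output is spilled and merged, not in one sorted() call.

```python
import zlib

def partition_for(key, num_reducers):
    """Mimics Hadoop's default HashPartitioner:
    (hash(key) & Integer.MAX_VALUE) % numReduceTasks.
    zlib.crc32 stands in for Java's hashCode."""
    return (zlib.crc32(key.encode("utf-8")) & 0x7FFFFFFF) % num_reducers

def shuffle(pairs, num_reducers):
    """Route mapper output to reducers, then sort each partition by key."""
    partitions = [[] for _ in range(num_reducers)]
    for key, value in pairs:
        partitions[partition_for(key, num_reducers)].append((key, value))
    return [sorted(p) for p in partitions]

mapped = [("the", 1), ("cat", 1), ("the", 1), ("dog", 1)]
for reducer_id, part in enumerate(shuffle(mapped, 2)):
    print(reducer_id, part)  # both ("the", 1) pairs land in the same partition
```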

running MR
– streaming (stdin/stdout pipes)

word count example
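– mapper takes a line; produces pairs (word, 1)
– MR sorts mapper output and feeds it to the reducer
– reducer reads sorted (word, 1) pairs: while the input word equals the current word, count += 1; when the word changes, output (current word, count) and start counting the new word; output the last word at the end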
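The word count logic as a Hadoop streaming script in Python: mapper and reducer read lines on stdin and write tab-separated key/value lines on stdout. A sketch: the file name wordcount.py and the map/reduce command-line switch are hypothetical conventions, not something the course prescribes.

```python
import sys

def mapper(lines):
    """Emit 'word<TAB>1' for every word: the Hadoop streaming text format."""
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"

def reducer(sorted_lines):
    """Input is sorted by key, so equal words are adjacent: keep a running
    count and emit it whenever the word changes (and once at the end)."""
    current, count = None, 0
    for line in sorted_lines:
        word, value = line.rstrip("\n").split("\t", 1)
        if word == current:
            count += int(value)
        else:
            if current is not None:
                yield f"{current}\t{count}"
            current, count = word, int(value)
    if current is not None:
        yield f"{current}\t{count}"

if __name__ == "__main__" and sys.argv[1:2] in (["map"], ["reduce"]):
    # python wordcount.py map   or   python wordcount.py reduce
    step = mapper if sys.argv[1] == "map" else reducer
    for out in step(sys.stdin):
        print(out)
```

Locally, the shuffle can be imitated with sort: `cat input.txt | python wordcount.py map | sort | python wordcount.py reduce`. On the cluster the same script is passed to the Hadoop streaming jar via its -mapper and -reducer options (the jar's location depends on the distribution).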

– 1 mapper per data split
– 1 reducer per core (processing time, number of output files)
– key-value simplicity (shuffling & grouping)
– good task decomposition: simple and separable mapper; easy consolidation in reducers
– composite keys
– extra info in values
– cascade MR jobs
– bin keys into ranges
– aggregate map output (combiner option, grouping in shuffle phase?)

inner join
– join field must be the key for reducer => mapper output (join field, data)
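A reduce-side inner join in the same spirit, simulated locally: mappers key each record by the join field and tag it with its source, the shuffle groups by key, and the reducer pairs up left and right records. The datasets, CSV layout and function names are hypothetical; a dict stands in for Hadoop's shuffle.

```python
from collections import defaultdict

def join_mapper(tag, lines):
    """Key every CSV record by its join field (1st column), tagging its source."""
    for line in lines:
        key, rest = line.split(",", 1)
        yield key, (tag, rest)

def join_reducer(key, tagged_values):
    """All records sharing a key meet in one reducer: emit left x right pairs.
    Keys present on only one side produce nothing (inner join)."""
    left = [v for tag, v in tagged_values if tag == "L"]
    right = [v for tag, v in tagged_values if tag == "R"]
    for lv in left:
        for rv in right:
            yield key, lv, rv

def inner_join(left_lines, right_lines):
    groups = defaultdict(list)  # stands in for shuffle + group by key
    for key, value in [*join_mapper("L", left_lines), *join_mapper("R", right_lines)]:
        groups[key].append(value)
    return [row for key in sorted(groups) for row in join_reducer(key, groups[key])]

users = ["1,alice", "2,bob", "3,carol"]   # hypothetical: id,name
visits = ["1,home", "3,home", "1,cart"]   # hypothetical: id,page
print(inner_join(users, visits))
# [('1', 'alice', 'home'), ('1', 'alice', 'cart'), ('3', 'carol', 'home')]
```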

vectors (matrices) multiplication
– a · b = (a1 * b1) + (a2 * b2) + …
– mapper outputs key-value pairs (index, value)
– N groups (one per index) are shuffled to reducers, which may be slow
– you can use binning: R bins => map output (bin, index, data)
– you pay with reducer complexity (it must match a and b values by index inside each bin)
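The binned dot product can be sketched in plain Python, assuming bin = index % num_bins (any deterministic binning would do); the function name is hypothetical and a dict stands in for the shuffle.

```python
from collections import defaultdict

def dot_product_mr(a, b, num_bins):
    """Dot product of two equal-length vectors, map/reduce style.

    Map: emit (bin, (index, source, value)) with bin = index % num_bins,
    so only num_bins groups are shuffled instead of one per index.
    Reduce: inside a bin, match a[i] with b[i] by index and sum the products.
    A final step adds up the per-bin partial sums."""
    bins = defaultdict(list)
    for source, vector in (("a", a), ("b", b)):
        for i, x in enumerate(vector):
            bins[i % num_bins].append((i, source, x))
    partial_sums = []
    for items in bins.values():  # one reducer call per bin
        by_index = defaultdict(dict)
        for i, source, x in items:
            by_index[i][source] = x
        partial_sums.append(sum(p["a"] * p["b"] for p in by_index.values()))
    return sum(partial_sums)

print(dot_product_mr([1, 2, 3, 4], [5, 6, 7, 8], num_bins=2))  # 70
```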

MR limitations
– not iterative
– not interactive
– problem must fit in MR model


Spark provides great performance advantages over Hadoop MapReduce, especially for iterative algorithms,
thanks to in-memory caching. It also gives data scientists an easier way to write their analysis pipelines in Python and Scala,
even providing interactive shells to play with data live.

– on top of HDFS, YARN but can work standalone on any storage
– fast (cache, no intermediate HDFS writes)
– interactive (Scala, Python, R shells)
– iterative
– any DAG workflow

– worker nodes run Spark Executors (JVMs) next to the HDFS data;
tasks are sent to the nodes that hold the data (data locality)
– Cluster Manager (YARN/Standalone): provision/restart workers
– Driver program, DAG processing, action results

PySpark setup
sudo easy_install ipython==1.2.1

– transformations (lazy)
– actions
– caching
– shared variables (one-way transfer)

RDD: resilient distributed dataset (from storage, from RDD transformations)
– divided into partitions (atomic chunks of data), immutable;
– track history of each partition, re-run if necessary;
a_RDD = sc.parallelize(range(10), 3)  # sc: SparkContext, predefined in the pyspark shell
b_RDD = a_RDD.map(lambda x: x + 1)  # transformation
b_RDD.glom().collect()  # action: returns a list per partition

transformations: lazy operations on RDD
– worker → worker
– RDD = RDD.transform
– pipeline as chain of transformations;
– map, flatMap(one => many), filter, sample, coalesce: narrow;
– narrow (local) vs wide (network) transformations;
– groupByKey: wide transformation; also: reduceByKey, repartition

know the shuffle and avoid it: replace groupByKey with reduceByKey (it combines values within each partition before shuffling)
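Why reduceByKey shuffles less than groupByKey, shown as a plain-Python simulation rather than PySpark: reduceByKey first combines values inside each partition, groupByKey ships every pair. In PySpark the two versions would be rdd.reduceByKey(add) versus rdd.groupByKey().mapValues(sum); the partitions and helper name here are made up.

```python
from collections import Counter

def sum_by_key(partitions, map_side_combine):
    """Per-key sum; returns (records shuffled, totals).

    map_side_combine=True  ~ reduceByKey: pre-aggregate inside each partition
    map_side_combine=False ~ groupByKey: every (key, value) pair crosses the network
    """
    shuffled = []
    for part in partitions:
        if map_side_combine:
            local = Counter()
            for key, value in part:
                local[key] += value
            shuffled.extend(local.items())
        else:
            shuffled.extend(part)
    totals = Counter()
    for key, value in shuffled:  # reduce side, after the shuffle
        totals[key] += value
    return len(shuffled), dict(totals)

partitions = [[("a", 1), ("b", 1), ("a", 1)], [("a", 1), ("a", 1), ("b", 1)]]
print(sum_by_key(partitions, map_side_combine=False))  # (6, {'a': 4, 'b': 2})
print(sum_by_key(partitions, map_side_combine=True))   # (4, {'a': 4, 'b': 2})
```

Same result, fewer records over the network; the gap grows with the number of repeated keys per partition.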

Workflow DAG (directed acyclic graph)
– nodes: RDDs; edges: transformations
– dependency flow (RDD created from …)
– lineage (provenance)
– a partition depends on other partitions
– tracking for recovery (node fails)
– going back through the DAG and re-executing what is needed
– when narrow => easy to track
– when wide, dependency is more complex
– detecting independent tasks => parallelism

actions: final stage of workflow, worker → driver
– triggers execution of the DAG
– returns result to the driver (or writes to HDFS)
– collect; take; reduce; saveAsTextFile

Caching
– mark RDD with .cache()
– lazy: the RDD is cached when an action first computes it
– for iterative algorithm
– to memory, disk, both

broadcast variables
– large read-only variable used on all nodes
– transfer just once per Executor
– torrent-like transfer
– good for config, lookup table, join (if table fits in memory)
conf = sc.broadcast({'a': 'b', 'c': 'd'})  # workers read it via conf.value

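Accumulators
– write-only (add-only) on worker nodes; the value is read on the driver
– collect data across the cluster
acc = sc.accumulator(0)
def test(x): acc.add(x)
a_RDD.foreach(test)  # runs on the workers
acc.value  # read on the driver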

I'll post code examples tomorrow, in a separate post.

More about Spark:
My notes on the BerkeleyX course CS190.1x Scalable Machine Learning

original post http://vasnake.blogspot.com/2016/02/hadoop-platform-and-application.html
