Yesterday I posted a note about the course Hadoop Platform and Application Framework by University of California, San Diego (San Diego Supercomputer Center).
I promised to publish the code fragments separately.
So here they are: illustrative material in the form of snippets.
Cloudera QuickStart VM, practice
username: cloudera, password: cloudera, hostname: quickstart
Grab the VM image (fully loaded), start it, and log in to the desktop. All further operations go through the web interface and the terminal. A detailed cheat sheet is included inside.
Load data from SQL to HDFS, Sqoop:
sqoop import-all-tables \
  -m 1 \
  --connect jdbc:mysql://quickstart:3306/retail_db \
  --username=retail_dba \
  --password=cloudera \
  --compression-codec=snappy \
  --as-avrodatafile \
  --warehouse-dir=/user/hive/warehouse

hadoop fs -ls /user/hive/warehouse
Found 6 items
drwxr-xr-x - cloudera hive 0 2016-01-04 11:50 /user/hive/warehouse/categories
drwxr-xr-x - cloudera hive 0 2016-01-04 11:51 /user/hive/warehouse/customers
drwxr-xr-x - cloudera hive 0 2016-01-04 11:51 /user/hive/warehouse/departments
drwxr-xr-x - cloudera hive 0 2016-01-04 11:52 /user/hive/warehouse/order_items
drwxr-xr-x - cloudera hive 0 2016-01-04 11:52 /user/hive/warehouse/orders
drwxr-xr-x - cloudera hive 0 2016-01-04 11:52 /user/hive/warehouse/products

# schema
ls -l *.avsc
-rw-rw-r-- 1 cloudera cloudera  541 Jan 4 11:50 sqoop_import_categories.avsc
-rw-rw-r-- 1 cloudera cloudera 1324 Jan 4 11:50 sqoop_import_customers.avsc

sudo -u hdfs hadoop fs -mkdir /user/examples
sudo -u hdfs hadoop fs -chmod +rw /user/examples
hadoop fs -copyFromLocal ~/*.avsc /user/examples/
Query Structured Data using Hue + Impala
CREATE EXTERNAL TABLE categories
STORED AS AVRO
LOCATION 'hdfs:///user/hive/warehouse/categories'
TBLPROPERTIES ('avro.schema.url'='hdfs://quickstart/user/examples/sqoop_import_categories.avsc');
…

invalidate metadata;
show tables;

-- Most popular product categories
select c.category_name, count(order_item_quantity) as count
from order_items oi
inner join products p on oi.order_item_product_id = p.product_id
inner join categories c on c.category_id = p.product_category_id
group by c.category_name
order by count desc
limit 10;

-- top 10 revenue generating products
select p.product_id, p.product_name, r.revenue
from products p
inner join
  (select oi.order_item_product_id, sum(cast(oi.order_item_subtotal as float)) as revenue
   from order_items oi
   inner join orders o on oi.order_item_order_id = o.order_id
   where o.order_status <> 'CANCELED' and o.order_status <> 'SUSPECTED_FRAUD'
   group by order_item_product_id) r
on p.product_id = r.order_item_product_id
order by r.revenue desc
limit 10;
Correlate Structured Data with Unstructured Data (DB records + web logs)
– Bulk upload, web log data
sudo -u hdfs hadoop fs -mkdir /user/hive/warehouse/original_access_logs
sudo -u hdfs hadoop fs -copyFromLocal /opt/examples/log_files/access.log.2 /user/hive/warehouse/original_access_logs

hadoop fs -ls /user/hive/warehouse/original_access_logs
Found 1 items
-rw-r--r-- 1 hdfs hive 39593868 2016-01-05 08:06 /user/hive/warehouse/original_access_logs/access.log.2
– Build a table in Hive (Beeline)
First, you'll take advantage of Hive's flexible SerDes (serializers / deserializers) to parse the logs into individual fields using a regular expression.
beeline -u jdbc:hive2://quickstart:10000/default -n admin -d org.apache.hive.jdbc.HiveDriver
...
jdbc:hive2://quickstart:10000/default>
CREATE EXTERNAL TABLE intermediate_access_logs (
    ip STRING,
    date STRING,
    method STRING,
    url STRING,
    http_version STRING,
    code1 STRING,
    code2 STRING,
    dash STRING,
    user_agent STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
    'input.regex' = '([^ ]*) - - \\[([^\\]]*)\\] "([^\ ]*) ([^\ ]*) ([^\ ]*)" (\\d*) (\\d*) "([^"]*)" "([^"]*)"',
    'output.format.string' = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s"
)
LOCATION '/user/hive/warehouse/original_access_logs';
Second, you'll transfer the data from this intermediate table to one that does not require any special SerDe.
CREATE EXTERNAL TABLE tokenized_access_logs (
    ip STRING,
    date STRING,
    method STRING,
    url STRING,
    http_version STRING,
    code1 STRING,
    code2 STRING,
    dash STRING,
    user_agent STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/hive/warehouse/tokenized_access_logs';

ADD JAR /usr/lib/hive/lib/hive-contrib.jar;

INSERT OVERWRITE TABLE tokenized_access_logs SELECT * FROM intermediate_access_logs;

!quit
– Query in Hue + Impala
invalidate metadata;
show tables;

select count(*), url
from tokenized_access_logs
where url like '%\/product\/%'
group by url
order by count(*) desc;
Pig, Pig Latin
– platform for data processing
– Pig Latin: high-level language
– Pig execution modes: local, MR, Tez
– extensible
– usage: ETL; manipulating, analysing raw data
– can run as scripts, embedded programs, interactive shell

example
hdfs dfs -put /etc/passwd /user/cloudera/
pig -x mapreduce
…
grunt> A = load '/user/cloudera/passwd' using PigStorage(':');
grunt> B = foreach A generate $0, $4, $5;
grunt> dump B;
…
(root,root,/root)
(bin,bin,/bin)
(daemon,daemon,/sbin)
(adm,adm,/var/adm)

grunt> store B into 'userinfo.out';

[cloudera@quickstart ~]$ hdfs dfs -ls /user/cloudera/userinfo.out
Found 2 items
-rw-r--r-- 1 cloudera cloudera    0 2016-01-07 09:00 /user/cloudera/userinfo.out/_SUCCESS
-rw-r--r-- 1 cloudera cloudera 1459 2016-01-07 09:00 /user/cloudera/userinfo.out/part-m-00000
Hive
– query, manage data using HiveQL (SQL-like)
– run interactively using Beeline; other run options (HCatalog, WebHCat)
– warehouse tools
– exec. environments: MR, Tez, Spark
– data in HDFS, HBase
– custom mappers/reducers
– usage: data mining, analytics; ML; ad hoc analysis

interactive shell example
$ beeline -u jdbc:hive2://
> create table userinfo (uname STRING, pswd STRING, uid INT, gid INT, fullname STRING, hdir STRING, shell STRING)
  row format delimited fields terminated by ':' stored as textfile;
> load data inpath '/tmp/passwd' overwrite into table userinfo;
> select uname, fullname, hdir from userinfo order by uname;
…
+----------------+-------------------------------+-------------------------------+--+
|     uname      |           fullname            |             hdir              |
+----------------+-------------------------------+-------------------------------+--+
| abrt           |                               | /etc/abrt                     |
| adm            | adm                           | /var/adm                      |
HBase
– non-relational distributed database on top of HDFS
– compression, in-memory operations (memstore, blockcache)
– Consistency, high Availability, Partitioning (auto sharding)
– replication, security
– SQL-like access (via Hive, Spark, Impala)
– HBase API, external API, MR (a Python sketch follows the shell example below)

example, interactive HBase shell
$ hbase shell
> create 'userinfotable', {NAME => 'username'}, {NAME => 'fullname'}, {NAME => 'homedir'}
put 'userinfotable', 'r1', 'username', 'vcsa'
put 'userinfotable', 'r1', 'fullname', 'VirtualMachine Admin'
put 'userinfotable', 'r2', 'username', 'sasuser'
put 'userinfotable', 'r3', 'username', 'postfix'

hbase(main):009:0> scan 'userinfotable'
ROW    COLUMN+CELL
 r1    column=fullname:, timestamp=1452193130567, value=VirtualMachine Admin
 r1    column=username:, timestamp=1452193080935, value=vcsa
 r2    column=username:, timestamp=1452193088826, value=sasuser
 r3    column=username:, timestamp=1452193096044, value=postfix

hbase(main):011:0> scan 'userinfotable', {COLUMNS => 'fullname'}
ROW    COLUMN+CELL
 r1    column=fullname:, timestamp=1452193130567, value=VirtualMachine Admin
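For the "HBase API, external API" bullet, here is a minimal sketch of the same table driven from Python. It assumes the happybase package is installed and the HBase Thrift server is running on the VM; neither is part of the original notes.

import happybase

# connect to the HBase Thrift server (assumed to listen on the default port)
connection = happybase.Connection('quickstart')
table = connection.table('userinfotable')

# column name is 'family:qualifier'; the qualifier is empty here, as in the shell example
table.put(b'r4', {b'username:': b'hdfs', b'fullname:': b'HDFS superuser'})

# full scan, then a scan restricted to the 'fullname' column family
for key, data in table.scan():
    print(key, data)
for key, data in table.scan(columns=[b'fullname']):
    print(key, data)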
HDFS
– bin/hdfs script:
  – user commands, filesystem shell commands
  – administrator commands
  – debug commands
– native Java API, org.apache.hadoop.fs.FileSystem
– C API, libhdfs
– WebHDFS REST API
– NFS gateway
– other options: Flume, Sqoop, …

commands example
– hdfs dfs -ls /
– hdfs dfs -mkdir /user/test
dd if=/dev/zero of=sample.txt bs=64M count=16
sudo -u hdfs hdfs dfs -chown -R cloudera /user/test
sudo -u hdfs hdfs dfs -put sample.txt /user/test/
hdfs fsck /user/test/sample.txt
sudo -u hdfs hdfs dfsadmin -report
HDFS API
– org.apache.hadoop.fs.FileSystem
– FSDataInputStream, FSDataOutputStream
– methods: get, open, create
// read a file from HDFS using the native Java API
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
FSDataInputStream in = fs.open(new Path(uri));
IOUtils.copyBytes(in, System.out, 4096, false);
...
REST API for HDFS
– in hdfs-site.xml: dfs.webhdfs.enabled; dfs.web.authentication.kerberos.principal; ...kerberos.keytab
  hdfs getconf -confKey dfs.webhdfs.enabled
– example
service hadoop-httpfs start

curl -i "http://quickstart.cloudera:14000/webhdfs/v1/user/cloudera?user.name=cloudera&op=GETFILESTATUS"
curl -i -X PUT "http://quickstart.cloudera:14000/webhdfs/v1/user/test?user.name=cloudera&op=MKDIRS&permission=755"
curl -i "http://quickstart.cloudera:14000/webhdfs/v1/user/test?user.name=cloudera&op=GETCONTENTSUMMARY"
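The same WebHDFS operations can be called from Python. A minimal sketch using the requests package (the package itself is my addition; the URLs and parameters mirror the curl calls above):

import requests

# HttpFS endpoint on the QuickStart VM, same base URL as in the curl examples
BASE = 'http://quickstart.cloudera:14000/webhdfs/v1'

# GETFILESTATUS for /user/cloudera
r = requests.get(BASE + '/user/cloudera',
                 params={'user.name': 'cloudera', 'op': 'GETFILESTATUS'})
print(r.status_code, r.json())

# MKDIRS for /user/test with permission 755
r = requests.put(BASE + '/user/test',
                 params={'user.name': 'cloudera', 'op': 'MKDIRS', 'permission': '755'})
print(r.json())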
MapReduce
word count example
– mapper takes a line and produces (word, 1) pairs
– MR sorts the mapper output and feeds it to the reducer
– reducer takes (word, 1) pairs; while the current word equals the input word it adds 1 to the running count; when the input word changes it emits (word, count)
mapper
import sys

for line in sys.stdin:
    line = line.strip()
    keys = line.split()
    for key in keys:
        value = 1
        print('{0}\t{1}'.format(key, value))
reducer
import sys

last_key = None
running_total = 0

for input_line in sys.stdin:
    input_line = input_line.strip()
    this_key, value = input_line.split("\t", 1)
    value = int(value)
    if last_key == this_key:
        running_total += value
    else:
        if last_key is not None:
            print("{0}\t{1}".format(last_key, running_total))
        last_key = this_key
        running_total = value

if last_key == this_key:
    print("{0}\t{1}".format(last_key, running_total))
test and run
> chmod +x wordcount_mapper.py
> chmod +x wordcount_reducer.py
> echo "A long time ago in a galaxy far far away" > /home/cloudera/testfile1
> echo "Another episode of Star Wars" > /home/cloudera/testfile2
> cat testfile* | ./wordcount_mapper.py | sort | ./wordcount_reducer.py

hdfs dfs -put /home/cloudera/testfile1 /user/cloudera/input
hdfs dfs -put /home/cloudera/testfile2 /user/cloudera/input

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -input /user/cloudera/input \
  -output /user/cloudera/output_new \
  -mapper /home/cloudera/wordcount_mapper.py \
  -reducer /home/cloudera/wordcount_reducer.py \
  -numReduceTasks 1

hdfs dfs -cat /user/cloudera/output_new/part-00000

> hdfs dfs -getmerge /user/cloudera/output_new_0/* ./wordcount_num0_output.txt
assignment: MR joining data
(the show-to-channel relationship is many-to-many)
– find all ABC shows;
– group by show name;
– sum counts for each show
# A: show,channel
# B: show,count

# mapper
import sys

for line in sys.stdin:
    line = line.strip()
    show, channel = line.split(",")
    count = 'NULL'
    if channel.isdigit():
        count = int(channel)
        channel = 'NULL'
    # we are interested only in the ABC channel
    if count != 'NULL' or channel == 'ABC':
        print('%s\t%s\t%s' % (show, channel, count))

# reducer
import sys

currentShow = None
channels = []
counts = []
total = 0

def dumpAccum():
    global total
    if 'ABC' in channels:
        show = currentShow
        vnum = sum(counts)
        total += vnum
        print("{0} {1}".format(show, vnum))

# helpers the reducer relies on; minimal versions inferred from the mapper output format
def resetAccum():
    global channels, counts
    channels = []
    counts = []

def appendChannel(channel):
    if channel != 'NULL':
        channels.append(channel)

def appendCount(count):
    if count != 'NULL':
        counts.append(int(count))

for line in sys.stdin:
    line = line.strip()
    show, channel, count = line.split('\t')
    if show != currentShow and currentShow:
        dumpAccum()
        resetAccum()
    currentShow = show
    appendChannel(channel)
    appendCount(count)

dumpAccum()
# print(total)
Spark
features
– on top of HDFS, YARN, but can work standalone on any storage
– fast (cache, no intermediate HDFS writes)
– interactive (Scala, Python, R shells)
– iterative
– any DAG workflow

PySpark setup
sudo easy_install ipython==1.2.1
PYSPARK_DRIVER_PYTHON=ipython pyspark
Concepts
– RDD
– transformations (lazy)
– actions
– caching (see the sketch after the RDD example below)
– shared variables (one-way transfer)
RDD: resilient distributed dataset (created from storage or from RDD transformations)
– divided into partitions (atomic chunks of data), immutable;
– the history of each partition is tracked, partitions are re-computed if necessary;
a_RDD = sc.parallelize(range(10), 3)
b_RDD = a_RDD.map(lambda x: x + 1)  # transformation
b_RDD.glom().collect()  # action
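To illustrate caching and a couple more actions from the concept list, a minimal continuation of the same snippet (run in the same pyspark shell):

# keep the computed partitions in memory after the first action
b_RDD.cache()

b_RDD.count()    # action: triggers the computation and populates the cache
b_RDD.take(2)    # action: reuses the cached partitions, no recomputation
b_RDD.collect()  # [1, 2, ..., 10]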
join example
fileA = sc.textFile("input/join1_FileA.txt")
fileB = sc.textFile("input/join1_FileB.txt")

def split_fileA(line):
    word, cnt = line.split(',')
    count = int(cnt)
    return (word, count)

fileA_data = fileA.map(split_fileA)  # collect(); take(2)

def split_fileB(line):
    date, val = line.split(' ')
    word, count_string = val.split(',')
    return (word, date + " " + count_string)

fileB_data = fileB.map(split_fileB)

fileB_joined_fileA = fileB_data.join(fileA_data)
fileB_joined_fileA.collect()
broadcast variables
– a large read-only variable used on all nodes
– transferred just once per executor
– torrent-like transfer
– good for configs, lookup tables, joins (if the table fits in memory); see the lookup-table sketch below
conf = sc.broadcast({'a': 'b', 'c': 'd'})  # placeholder key/value pairs
…
conf.value
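A sketch of the lookup-table / map-side-join use case mentioned above; the lookup contents and the variable names are made up for illustration:

# small lookup table: product_id -> category (hypothetical data)
category_by_product = sc.broadcast({'p1': 'books', 'p2': 'music'})

# larger RDD of (product_id, quantity) pairs
orders = sc.parallelize([('p1', 3), ('p2', 1), ('p1', 7)])

# map-side join: no shuffle, each executor reads the broadcast value locally
with_category = orders.map(
    lambda kv: (kv[0], category_by_product.value.get(kv[0], 'unknown'), kv[1]))

with_category.collect()
# [('p1', 'books', 3), ('p2', 'music', 1), ('p1', 'books', 7)]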
accumulators
– write-only on nodes
– collect data across the cluster
acc = sc.accumulator(0)

def test(x):
    acc.add(x)

rdd.foreach(test)
acc.value
original post http://vasnake.blogspot.com/2016/02/code-for-hadoop-platform-and.html