Yesterday I posted a note about the course Hadoop Platform and Application Framework by University of California, San Diego (San Diego Supercomputer Center). I promised to publish the code fragments separately, so here is the illustrative material in the form of snippets.
Cloudera QuickStart VM, practice
username: cloudera, password: cloudera, hostname: quickstart
Grab the VM image (fully loaded), start it, and log in to the desktop. From there everything is done through the web interface and the terminal. A detailed cheat sheet is included inside the VM.
Load data from SQL to HDFS, Sqoop:
sqoop import-all-tables \
-m 1 \
--connect jdbc:mysql://quickstart:3306/retail_db \
--username=retail_dba \
--password=cloudera \
--compression-codec=snappy \
--as-avrodatafile \
--warehouse-dir=/user/hive/warehouse
hadoop fs -ls /user/hive/warehouse
Found 6 items
drwxr-xr-x - cloudera hive 0 2016-01-04 11:50 /user/hive/warehouse/categories
drwxr-xr-x - cloudera hive 0 2016-01-04 11:51 /user/hive/warehouse/customers
drwxr-xr-x - cloudera hive 0 2016-01-04 11:51 /user/hive/warehouse/departments
drwxr-xr-x - cloudera hive 0 2016-01-04 11:52 /user/hive/warehouse/order_items
drwxr-xr-x - cloudera hive 0 2016-01-04 11:52 /user/hive/warehouse/orders
drwxr-xr-x - cloudera hive 0 2016-01-04 11:52 /user/hive/warehouse/products
# schema
ls -l *.avsc
-rw-rw-r-- 1 cloudera cloudera 541 Jan 4 11:50 sqoop_import_categories.avsc
-rw-rw-r-- 1 cloudera cloudera 1324 Jan 4 11:50 sqoop_import_customers.avsc
sudo -u hdfs hadoop fs -mkdir /user/examples
sudo -u hdfs hadoop fs -chmod +rw /user/examples
hadoop fs -copyFromLocal ~/*.avsc /user/examples/
Query Structured Data using Hue + Impala
CREATE EXTERNAL TABLE categories STORED AS AVRO
LOCATION 'hdfs:///user/hive/warehouse/categories'
TBLPROPERTIES ('avro.schema.url'='hdfs://quickstart/user/examples/sqoop_import_categories.avsc');
…
invalidate metadata;
show tables;
-- Most popular product categories
select c.category_name, count(order_item_quantity) as count
from order_items oi
inner join products p on oi.order_item_product_id = p.product_id
inner join categories c on c.category_id = p.product_category_id
group by c.category_name
order by count desc
limit 10;
-- top 10 revenue generating products
select p.product_id, p.product_name, r.revenue
from products p inner join
(select oi.order_item_product_id, sum(cast(oi.order_item_subtotal as float)) as revenue
from order_items oi inner join orders o
on oi.order_item_order_id = o.order_id
where o.order_status <> 'CANCELED'
and o.order_status <> 'SUSPECTED_FRAUD'
group by order_item_product_id) r
on p.product_id = r.order_item_product_id
order by r.revenue desc
limit 10;
Correlate Structured Data with Unstructured Data (DB records + web logs)
– Bulk upload, web log data
sudo -u hdfs hadoop fs -mkdir /user/hive/warehouse/original_access_logs
sudo -u hdfs hadoop fs -copyFromLocal /opt/examples/log_files/access.log.2 /user/hive/warehouse/original_access_logs
hadoop fs -ls /user/hive/warehouse/original_access_logs
Found 1 items
-rw-r--r-- 1 hdfs hive 39593868 2016-01-05 08:06 /user/hive/warehouse/original_access_logs/access.log.2
– Build a table in Hive (Beeline)
First, you'll take advantage of Hive's flexible SerDe (serializers / deserializers) to parse the logs into individual fields using a regular expression.
beeline -u jdbc:hive2://quickstart:10000/default -n admin -d org.apache.hive.jdbc.HiveDriver
...
jdbc:hive2://quickstart:10000/default> CREATE EXTERNAL TABLE intermediate_access_logs (
    ip STRING,
    date STRING,
    method STRING,
    url STRING,
    http_version STRING,
    code1 STRING,
    code2 STRING,
    dash STRING,
    user_agent STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
    'input.regex' = '([^ ]*) - - \\[([^\\]]*)\\] "([^\ ]*) ([^\ ]*) ([^\ ]*)" (\\d*) (\\d*) "([^"]*)" "([^"]*)"',
    'output.format.string' = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s"
)
LOCATION '/user/hive/warehouse/original_access_logs';
Second, you'll transfer the data from this intermediate table to one that does not require any special SerDe.
CREATE EXTERNAL TABLE tokenized_access_logs (
    ip STRING,
    date STRING,
    method STRING,
    url STRING,
    http_version STRING,
    code1 STRING,
    code2 STRING,
    dash STRING,
    user_agent STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/hive/warehouse/tokenized_access_logs';

ADD JAR /usr/lib/hive/lib/hive-contrib.jar;

INSERT OVERWRITE TABLE tokenized_access_logs SELECT * FROM intermediate_access_logs;

!quit
– Query in Hue + Impala
invalidate metadata;
show tables;

select count(*), url from tokenized_access_logs
where url like '%\/product\/%'
group by url
order by count(*) desc;
Pig, PigLatin
– platform for data processing
– Pig Latin: a high-level language
– Pig execution: local, MR, Tez
– extensible
– usage: ETL; manipulating, analysing raw data
– can run as scripts, embedded programs
interactive shell example
hdfs dfs -put /etc/passwd /user/cloudera/
pig -x mapreduce
… >
grunt> A = load '/user/cloudera/passwd' using PigStorage(':');
grunt> B = foreach A generate $0, $4, $5 ;
grunt> dump B;
…
(root,root,/root)
(bin,bin,/bin)
(daemon,daemon,/sbin)
(adm,adm,/var/adm)
grunt> store B into 'userinfo.out';
[cloudera@quickstart ~]$ hdfs dfs -ls /user/cloudera/userinfo.out
Found 2 items
-rw-r--r-- 1 cloudera cloudera 0 2016-01-07 09:00 /user/cloudera/userinfo.out/_SUCCESS
-rw-r--r-- 1 cloudera cloudera 1459 2016-01-07 09:00 /user/cloudera/userinfo.out/part-m-00000
Hive
– query, manage data using HiveQL (SQL-like)
– run interactively using Beeline; other run options (HCatalog, WebHCat)
– warehouse tools
– execution environments: MR, Tez, Spark
– data in HDFS, HBase
– custom mappers/reducers
– usage: data mining, analytics; ML; ad hoc analysis
interactive shell example
$ beeline -u jdbc:hive2://
> create table userinfo (uname STRING, pswd STRING, uid INT, gid INT, fullname STRING, hdir STRING, shell STRING)
  row format delimited fields terminated by ':' stored as textfile;
> load data inpath '/tmp/passwd' overwrite into table userinfo;
> select uname, fullname, hdir from userinfo order by uname;
…
+----------------+-------------------------------+-------------------------------+--+
|     uname      |           fullname            |             hdir              |
+----------------+-------------------------------+-------------------------------+--+
| abrt           |                               | /etc/abrt                     |
| adm            | adm                           | /var/adm                      |
HBase
– non-relational distributed database on top of HDFS
– compression, in-memory operations (memstore, blockcache)
– consistency, high availability, partitioning (auto sharding)
– replication, security
– SQL-like access (using Hive, Spark, Impala)
– HBase API, external API, MR
example, interactive HBase shell
$ hbase shell
>
create 'userinfotable', {NAME => 'username'}, {NAME => 'fullname'}, {NAME => 'homedir'}
put 'userinfotable', 'r1', 'username', 'vcsa'
put 'userinfotable', 'r1', 'fullname', 'VirtualMachine Admin'
put 'userinfotable', 'r2', 'username', 'sasuser'
put 'userinfotable', 'r3', 'username', 'postfix'
hbase(main):009:0> scan 'userinfotable'
ROW COLUMN+CELL
r1 column=fullname:, timestamp=1452193130567, value=VirtualMachine Admin
r1 column=username:, timestamp=1452193080935, value=vcsa
r2 column=username:, timestamp=1452193088826, value=sasuser
r3 column=username:, timestamp=1452193096044, value=postfix
hbase(main):011:0> scan 'userinfotable', {COLUMNS => 'fullname'}
ROW COLUMN+CELL
r1 column=fullname:, timestamp=1452193130567, value=VirtualMachine Admin
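As a side note (not from the course), the "external API" item above can also be illustrated from Python via the happybase client. This is a minimal sketch, assuming the HBase Thrift server is running on the quickstart host and the happybase package is installed; neither is set up by the shell session above.

import happybase

# assumption: HBase Thrift server is running on quickstart and happybase is installed
connection = happybase.Connection('quickstart')
table = connection.table('userinfotable')

# roughly: put 'userinfotable', 'r4', 'username', 'cloudera'
table.put('r4', {'username:': 'cloudera', 'fullname:': 'Cloudera User'})

# roughly: scan 'userinfotable', {COLUMNS => 'fullname'}
for row_key, data in table.scan(columns=['fullname']):
    print(row_key, data)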
HDFS
– bin/hdfs script:
– user commands, filesystem shell commands
– administrator commands
– debug commands
– native Java API, org.apache.hadoop.fs.FileSystem
– C API, libhdfs
– WebHDFS REST API
– NFS gateway
– other options: Flume, Sqoop, …
commands example
– hdfs dfs -ls /
– hdfs dfs -mkdir /user/test
dd if=/dev/zero of=sample.txt bs=64M count=16
sudo -u hdfs hdfs dfs -chown -R cloudera /user/test
sudo -u hdfs hdfs dfs -put sample.txt /user/test/
hdfs fsck /user/test/sample.txt
sudo -u hdfs hdfs dfsadmin -report
HDFS API
– org.apache.hadoop.fs.FileSystem
– FSDataInputStream, FSDataOutputStream
– methods: get, open, create
FileSystem fs = FileSystem.get(URI.create(uri), conf);
in = fs.open(new Path(uri));
IOUtils.copyBytes(in, System.out, 4096, false);
...
REST API for HDFS
– in hdfs-site.xml: dfs.webhdfs.enabled; dfs.web.authentication.kerberos.principal; ...kerberos.keytab
hdfs getconf -confKey dfs.webhdfs.enabled
– example
service hadoop-httpfs start
curl -i "http://quickstart.cloudera:14000/webhdfs/v1/user/cloudera?user.name=cloudera&op=GETFILESTATUS"
curl -i -X PUT "http://quickstart.cloudera:14000/webhdfs/v1/user/test?user.name=cloudera&op=MKDIRS&permission=755"
curl -i "http://quickstart.cloudera:14000/webhdfs/v1/user/test?user.name=cloudera&op=GETCONTENTSUMMARY"
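The same WebHDFS calls are easy to script. Here is a minimal Python sketch under the same assumptions (hadoop-httpfs running on port 14000, user cloudera), plus the extra assumption that the requests package is available; the webhdfs_get helper is made up for illustration.

import requests

BASE = 'http://quickstart.cloudera:14000/webhdfs/v1'

def webhdfs_get(path, op, user='cloudera', **params):
    # issue a WebHDFS GET operation and return the parsed JSON body
    params.update({'user.name': user, 'op': op})
    resp = requests.get(BASE + path, params=params)
    resp.raise_for_status()
    return resp.json()

status = webhdfs_get('/user/cloudera', 'GETFILESTATUS')
print(status['FileStatus']['owner'], status['FileStatus']['permission'])

summary = webhdfs_get('/user/test', 'GETCONTENTSUMMARY')
print(summary['ContentSummary']['length'], summary['ContentSummary']['fileCount'])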
MapReduce
word count example
– the mapper takes a line and produces (word, 1) pairs
– MR sorts the mapper output and feeds it to the reducer
– the reducer takes (word, 1) pairs; while the current word == the input word, it adds 1 to the word count; when the current word != the input word, it outputs (word, count)
mapper
import sys

# emit a (word, 1) pair for every word on every input line
for line in sys.stdin:
    line = line.strip()
    keys = line.split()
    for key in keys:
        value = 1
        print('{0}\t{1}'.format(key, value))
reducer
import sys

last_key = None
running_total = 0

for input_line in sys.stdin:
    input_line = input_line.strip()
    this_key, value = input_line.split("\t", 1)
    value = int(value)
    if last_key == this_key:
        running_total += value
    else:
        if last_key is not None:
            print("{0}\t{1}".format(last_key, running_total))
        last_key = this_key
        running_total = value

# flush the last key
if last_key is not None:
    print("{0}\t{1}".format(last_key, running_total))
test and run
> chmod +x wordcount_mapper.py
> chmod +x wordcount_reducer.py
> echo "A long time ago in a galaxy far far away" > /home/cloudera/testfile1
> echo "Another episode of Star Wars" > /home/cloudera/testfile2
> cat testfile* | ./<mapper.py> | sort | ./<reducer.py>

hdfs dfs -put /home/cloudera/testfile1 /user/cloudera/input
hdfs dfs -put /home/cloudera/testfile2 /user/cloudera/input

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -input /user/cloudera/input \
    -output /user/cloudera/output_new \
    -mapper /home/cloudera/wordcount_mapper.py \
    -reducer /home/cloudera/wordcount_reducer.py \
    -numReduceTasks 1

hdfs dfs -cat /user/cloudera/output_new/part-00000
> hdfs dfs -getmerge /user/cloudera/output_new_0/* ./wordcount_num0_output.txt
assignment: MR joining data
(The show-to-channel relationship is Many-to-Many)
– find all ABC shows;
– group by show name;
– sum counts for each show
# A: show,channel
# B: show,count
# mapper
import sys

# input is a mix of A (show,channel) and B (show,count) records
for line in sys.stdin:
    line = line.strip()
    show, channel = line.split(",")
    count = 'NULL'
    if channel.isdigit():
        # second field is numeric: this is a (show, count) record from B
        count = int(channel)
        channel = 'NULL'
    # we are only interested in the ABC channel
    if count != 'NULL' or channel == 'ABC':
        print('%s\t%s\t%s' % (show, channel, count))
# reducer
import sys

# note: the helper functions below are a plausible reconstruction, inferred from how they are called
currentShow = None
channels, counts = [], []
total = 0

def resetAccum():
    global channels, counts
    channels, counts = [], []

def appendChannel(channel):
    if channel != 'NULL':
        channels.append(channel)

def appendCount(count):
    if count != 'NULL':
        counts.append(int(count))

def dumpAccum():
    global total
    if 'ABC' in channels:  # emit only shows aired on the ABC channel
        show = currentShow
        vnum = sum(counts)
        total += vnum
        print("{0} {1}".format(show, vnum))

for line in sys.stdin:
    line = line.strip()
    show, channel, count = line.split('\t')
    if show != currentShow and currentShow:
        dumpAccum()
        resetAccum()
    currentShow = show
    appendChannel(channel)
    appendCount(count)

dumpAccum()
# print total
Spark
features
– on top of HDFS, YARN, but can work standalone on any storage
– fast (cache, no intermediate HDFS writes)
– interactive (Scala, Python, R shells)
– iterative
– any DAG workflow
PySpark setup
sudo easy_install ipython==1.2.1
PYSPARK_DRIVER_PYTHON=ipython pyspark
Concepts
– RDD
– transformations (lazy)
– actions
– caching
– shared variables (one-way transfer)
RDD: resilient distributed dataset (from storage, from RDD transformations)
– divided into partitions (atomic chunks of data), immutable;
– the history of each partition is tracked, so it can be re-run if necessary;
a_RDD = sc.parallelize(range(10), 3)
b_RDD = a_RDD.map(lambda x: x + 1)  # transformation
b_RDD.glom().collect()  # action
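Caching from the concepts list is not shown above, so here is a tiny illustrative sketch (made-up numbers, same pyspark shell):

squares = sc.parallelize(range(1000)).map(lambda x: x * x)
squares.cache()    # lazy: marks the RDD to be kept in memory after the first computation
squares.count()    # first action computes the partitions and caches them
squares.sum()      # later actions reuse the cached partitions instead of recomputing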
join example
fileA = sc.textFile("input/join1_FileA.txt")
fileB = sc.textFile("input/join1_FileB.txt")

def split_fileA(line):
    word, cnt = line.split(',')
    count = int(cnt)
    return (word, count)

fileA_data = fileA.map(split_fileA)
# collect(); take(2)

def split_fileB(line):
    date, val = line.split(' ')
    word, count_string = val.split(',')
    return (word, date + " " + count_string)

fileB_data = fileB.map(split_fileB)

fileB_joined_fileA = fileB_data.join(fileA_data)
fileB_joined_fileA.collect()
broadcast variables
– a large read-only variable used on all nodes
– transferred just once per executor
– torrent-like transfer
– good for config, lookup tables, joins (if the table fits in memory)
conf = sc.broadcast({'a': 'b', 'c': 'd'})
…
conf.value
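And a small sketch of the lookup-table join case from the list above (made-up data, same pyspark shell):

# broadcast a small product_id -> name lookup table and join against it map-side
product_names = sc.broadcast({1: 'sprocket', 2: 'widget'})
sales = sc.parallelize([(1, 10.0), (2, 3.5), (1, 7.25)])  # (product_id, amount)
named = sales.map(lambda kv: (product_names.value.get(kv[0], 'unknown'), kv[1]))
named.collect()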
accumulators
– write-only on nodes
– collect data across the cluster
acc = sc.accumulator(0)

def test(x):
    acc.add(x)

rdd.foreach(test)
acc.value
original post http://vasnake.blogspot.com/2016/02/code-for-hadoop-platform-and.html
