Записки программиста, обо всем и ни о чем. Но, наверное, больше профессионального.

2016-02-19

Code for 'Hadoop Platform and Application Framework'

Вчера вышла заметка 
про курс
Hadoop Platform and Application Framework
by University of California, San Diego (San Diego Supercomputer Center)

Я обещал оформить куски кода отдельно.
Вот, иллюстративный материал в виде сниппетов.

Cloudera QuickStart VM, practice

username: cloudera, password: cloudera, hostname: quickstart

Берем образ виртмашины (полный фарш), запускаем, логинимся на рабочий стол. Далее все операции  через веб-интерфейс и терминал. В общем, там внутре подробная шпаргалка.

Load data from SQL to HDFS, Sqoop:
sqoop import-all-tables \
    -m 1 \
    --connect jdbc:mysql://quickstart:3306/retail_db \
    --username=retail_dba \
    --password=cloudera \
    --compression-codec=snappy \
    --as-avrodatafile \
    --warehouse-dir=/user/hive/warehouse

hadoop fs -ls /user/hive/warehouse
Found 6 items
drwxr-xr-x   - cloudera hive          0 2016-01-04 11:50 /user/hive/warehouse/categories
drwxr-xr-x   - cloudera hive          0 2016-01-04 11:51 /user/hive/warehouse/customers
drwxr-xr-x   - cloudera hive          0 2016-01-04 11:51 /user/hive/warehouse/departments
drwxr-xr-x   - cloudera hive          0 2016-01-04 11:52 /user/hive/warehouse/order_items
drwxr-xr-x   - cloudera hive          0 2016-01-04 11:52 /user/hive/warehouse/orders
drwxr-xr-x   - cloudera hive          0 2016-01-04 11:52 /user/hive/warehouse/products

# schema 
ls -l *.avsc
-rw-rw-r-- 1 cloudera cloudera  541 Jan  4 11:50 sqoop_import_categories.avsc
-rw-rw-r-- 1 cloudera cloudera 1324 Jan  4 11:50 sqoop_import_customers.avsc

sudo -u hdfs hadoop fs -mkdir /user/examples
sudo -u hdfs hadoop fs -chmod +rw /user/examples
hadoop fs -copyFromLocal ~/*.avsc /user/examples/

Query Structured Data using Hue + Impala
CREATE EXTERNAL TABLE categories STORED AS AVRO
LOCATION 'hdfs:///user/hive/warehouse/categories'
TBLPROPERTIES ('avro.schema.url'='hdfs://quickstart/user/examples/sqoop_import_categories.avsc');
… 
invalidate metadata;
show tables;

-- Most popular product categories
select c.category_name, count(order_item_quantity) as count
from order_items oi
inner join products p on oi.order_item_product_id = p.product_id
inner join categories c on c.category_id = p.product_category_id
group by c.category_name
order by count desc
limit 10;

-- top 10 revenue generating products
select p.product_id, p.product_name, r.revenue
from products p inner join
(select oi.order_item_product_id, sum(cast(oi.order_item_subtotal as float)) as revenue
from order_items oi inner join orders o
on oi.order_item_order_id = o.order_id
where o.order_status <> 'CANCELED'
and o.order_status <> 'SUSPECTED_FRAUD'
group by order_item_product_id) r
on p.product_id = r.order_item_product_id
order by r.revenue desc
limit 10;

Correlate Structured Data with Unstructured Data (DB records + web logs)
Bulk upload, web log data
sudo -u hdfs hadoop fs -mkdir /user/hive/warehouse/original_access_logs
sudo -u hdfs hadoop fs -copyFromLocal /opt/examples/log_files/access.log.2 /user/hive/warehouse/original_access_logs
hadoop fs -ls /user/hive/warehouse/original_access_logs
Found 1 items
-rw-r--r--   1 hdfs hive   39593868 2016-01-05 08:06 /user/hive/warehouse/original_access_logs/access.log.2

Build a table in Hive (Beeline)
First, you'll take advantage of Hive's flexible SerDe (serializers / deserializers)
to parse the logs into individual fields using a regular expression
beeline -u jdbc:hive2://quickstart:10000/default -n admin -d org.apache.hive.jdbc.HiveDriver
...
jdbc:hive2://quickstart:10000/default> 
CREATE EXTERNAL TABLE intermediate_access_logs (
 ip STRING,
 date STRING,
 method STRING,
 url STRING,
 http_version STRING,
 code1 STRING,
 code2 STRING,
 dash STRING,
 user_agent STRING)
 ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
 WITH SERDEPROPERTIES (
 'input.regex' = '([^ ]*) - - \\[([^\\]]*)\\] "([^\ ]*) ([^\ ]*) ([^\ ]*)" (\\d*) (\\d*) "([^"]*)" "([^"]*)"',
 'output.format.string' = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s"
 )
 LOCATION '/user/hive/warehouse/original_access_logs';

Second, you'll transfer the data from this intermediate table to one that does not require any special SerDe
CREATE EXTERNAL TABLE tokenized_access_logs (
 ip STRING,
 date STRING,
 method STRING,
 url STRING,
 http_version STRING,
 code1 STRING,
 code2 STRING,
 dash STRING,
 user_agent STRING)
 ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
 LOCATION '/user/hive/warehouse/tokenized_access_logs';

ADD JAR /usr/lib/hive/lib/hive-contrib.jar;
INSERT OVERWRITE TABLE tokenized_access_logs SELECT * FROM intermediate_access_logs;
!quit

query in Hue + Impala
invalidate metadata;
show tables;

select count(*),url from tokenized_access_logs
where url like '%\/product\/%'
group by url order by count(*) desc;


Pig, PigLatin

– platform for data processing
– PigLatin: high level language
– pig execution: local, MR, Tez
– extensible
– usage: ETL; manipulating, analysing raw data
– can run as scripts, embedded programs

interactive shell example
hdfs dfs -put /etc/passwd /user/cloudera/
pig -x mapreduce
… >
grunt> A = load '/user/cloudera/passwd' using PigStorage(':');
grunt> B = foreach A generate $0, $4, $5 ;
grunt> dump B;
…
(root,root,/root)
(bin,bin,/bin)
(daemon,daemon,/sbin)
(adm,adm,/var/adm)

grunt> store B into 'userinfo.out';

[cloudera@quickstart ~]$ hdfs dfs -ls /user/cloudera/userinfo.out
Found 2 items
-rw-r--r--   1 cloudera cloudera          0 2016-01-07 09:00 /user/cloudera/userinfo.out/_SUCCESS
-rw-r--r--   1 cloudera cloudera       1459 2016-01-07 09:00 /user/cloudera/userinfo.out/part-m-00000

Hive

– query, manage data using HiveQL (SQL-like)
– run interactively using beeline, other run options (Hcatalog, WebHcat)
– warehouse tools
– exec.environments: MR, Tez, Spark
– data in HDFS, HBase
– custom mappers/reducers
– usage: data mining, analytics; ML; ad hoc analysis

interactive shell example
$ beeline -u jdbc:hive2://
> create table userinfo (uname STRING, pswd STRING, uid INT, gid INT, fullname STRING, hdir STRING, shell STRING)
 row format delimited fields terminated by ':' stored as textfile;
> load data inpath '/tmp/passwd' overwrite into table userinfo;
> select uname, fullname, hdir from userinfo order by uname;
…
+----------------+-------------------------------+-------------------------------+--+
|     uname      |           fullname            |             hdir              |
+----------------+-------------------------------+-------------------------------+--+
| abrt           |                               | /etc/abrt                     |
| adm            | adm                           | /var/adm                      |

HBase

– non-relational distributed database on top of HDFS
– compression, in-memory operations (memstore, blockcache)
– Consistency, high Availability, Partitioning (auto sharding)
– replication, security
– SQL-like access (using Hive, Spark, Impala)
– HBase API, external API, MR

example, interactively HBase shell
$ hbase shell
> 
create 'userinfotable', {NAME => 'username'}, {NAME => 'fullname'}, {NAME => 'homedir'} 
put 'userinfotable', 'r1', 'username', 'vcsa' 
put 'userinfotable', 'r1', 'fullname', 'VirtualMachine Admin'
put 'userinfotable', 'r2', 'username', 'sasuser'
put 'userinfotable', 'r3', 'username', 'postfix'

hbase(main):009:0> scan 'userinfotable'
ROW                                  COLUMN+CELL                                                                                              
 r1                                  column=fullname:, timestamp=1452193130567, value=VirtualMachine Admin                                    
 r1                                  column=username:, timestamp=1452193080935, value=vcsa                                                    
 r2                                  column=username:, timestamp=1452193088826, value=sasuser                                                 
 r3                                  column=username:, timestamp=1452193096044, value=postfix  

hbase(main):011:0> scan 'userinfotable', {COLUMNS => 'fullname'}
ROW                                  COLUMN+CELL                                                                                              
 r1                                  column=fullname:, timestamp=1452193130567, value=VirtualMachine Admin

HDFS 

– bin/hdfs script:
– user commands, filesystem shell commands
– administrator commands
– debug commands
– native Java API, org.apache.hadoop.fs.FileSystem
– C API, libhdfs
– WebHDFS REST API
– NFS gateway
– other options: Flume, Sqoop, …

commands example
– hdfs dfs -ls /
– hdfs dfs -mkdir /user/test
dd if=/dev/zero of=sample.txt bs=64M count=16
sudo -u hdfs hdfs dfs -chown -R cloudera /user/test
sudo -u hdfs hdfs dfs -put sample.txt /user/test/
hdfs fsck /user/test/sample.txt
sudo -u hdfs hdfs dfsadmin -report

HDFS API
– org.apache.hadoop.fs.FileSystem
– FSDataInputStream, FSDataOutputStream
– methods: get, open, create
FileSystem fs = FileSystem.get(URI.create(uri), conf);
in = fs.open(new Path(uri)); 
IOUtils.copyBytes(in, System.out, 4096, false);
...

REST API for HDFS
– in hdfs-site.xml: dfs.webhdfs.enabled; dfs.web.authentication.kerberos.principal; ...kerberos.keytab
hdfs getconf -confKey dfs.webhdfs.enabled
– example
service hadoop-httpfs start
curl -i "http://quickstart.cloudera:14000/webhdfs/v1/user/cloudera?user.name=cloudera&op=GETFILESTATUS" 
curl -i -X PUT "http://quickstart.cloudera:14000/webhdfs/v1/user/test?user.name=cloudera&op=MKDIRS&permission=755" 
curl -i "http://quickstart.cloudera:14000/webhdfs/v1/user/test?user.name=cloudera&op=GETCONTENTSUMMARY" 

MapReduce

word count example
– mapper take line; produce pairs (word, 1)
– MR sort mapper output, feed it to reducer
– reducer take (word, 1); if current word == input word => word count += 1; output (word, count) if curr. word != inp.word

mapper
for line in sys.stdin:  
    line = line.strip()
    keys = line.split()
    for key in keys:
        value = 1        
        print('{0}\t{1}'.format(key, value))

reducer
last_key      = None
running_total = 0

for input_line in sys.stdin:
    input_line = input_line.strip()
    this_key, value = input_line.split("\t", 1)
    value = int(value)

    if last_key == this_key:
        running_total += value
    else:
        if last_key is not None:
            print( "{0}\t{1}".format(last_key, running_total) )
        last_key = this_key
        running_total = value

if last_key == this_key:
    print( "{0}\t{1}".format(last_key, running_total)) 

test and run
> chmod +x wordcount_mapper.py
> chmod +x wordcount_reducer.py
> echo "A long time ago in a galaxy far far away" > /home/cloudera/testfile1
> echo "Another episode of Star Wars" > /home/cloudera/testfile2
> cat testfile* | ./<mapper.py> | sort | ./<reducer.py>
hdfs dfs -put /home/cloudera/testfile1 /user/cloudera/input
hdfs dfs -put /home/cloudera/testfile2 /user/cloudera/input
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
   -input /user/cloudera/input \
   -output /user/cloudera/output_new \
   -mapper /home/cloudera/wordcount_mapper.py \
   -reducer /home/cloudera/wordcount_reducer.py \
   -numReduceTasks 1
hdfs dfs -cat /user/cloudera/output_new/part-00000
> hdfs dfs -getmerge /user/cloudera/output_new_0/* ./wordcount_num0_output.txt

assignment: MR joining data 
(The show-to-channel relationship is Many-to-Many)
– find all ABC shows;
– group by show name;
– sum counts for each show
# A: show,cnannel
# B: show,count
# mapper 
for line in sys.stdin:
    line = line.strip()
    show, channel = line.split(",")
    count = 'NULL'
    if channel.isdigit():
        count = int(channel)
        channel = 'NULL'
    # we interested only in ABC channel
    if count != 'NULL' or channel == 'ABC':
        print('%s\t%s\t%s' % (show, channel, count))

# reducer 
def dumpAccum():
    global total
    if 'ABC' in channels:
        show = currentShow
        vnum = sum(counts)
        total += vnum
        print("{0} {1}".format(show, vnum))
    
for line in sys.stdin:
    line = line.strip()
    show, channel, count = line.split('\t')
    if show != currentShow and currentShow:
        dumpAccum()
        resetAccum()
    currentShow = show
    appendChannel(channel)
    appendCount(count)
    
dumpAccum()
# print total

Spark

features
– on top of HDFS, YARN but can work standalone on any storage
– fast (cache, no intermediate HDFS writes)
– interactive (Scala, Python, R shells)
– iterative
– any DAG workflow

PySpark setup
sudo easy_install ipython==1.2.1
PYSPARK_DRIVER_PYTHON=ipython
pyspark

Concepts
– RDD
– transformations (lazy)
– actions
– caching
– shared variables (one-way transfer)

RDD: resilient distributed dataset (from storage, from RDD transformations)
– divided in partitions (atomic chunks of data), immutable;
– track history of each partition, re-run if necessary;
a_RDD = sc.parallelize(range(10), 3) 
b_RDD = a_RDD.map(lambda x: x+1)) # transformation 
b_RDD.glom.collect() # action 

join example
fileA = sc.textFile("input/join1_FileA.txt")
fileB = sc.textFile("input/join1_FileB.txt")
def split_fileA(line):
    word, cnt = line.split(',')
    count = int(cnt)
    return (word, count)
fileA_data = fileA.map(split_fileA)
# collect(); take(2)
def split_fileB(line):
    date, val = line.split(' ')
    word, count_string = val.split(',')
    return (word, date + " " + count_string)
fileB_data = fileB.map(split_fileB)
fileB_joined_fileA = fileB_data.join(fileA_data)
fileB_joined_fileA.collect()

broadcast variables
– large variable used in all nodes read-only
– transfer just once per Executor
– torrent-like transfer
– good for config, lookup table, join (if table fits in memory)
conf = sc.broadcast({a: b, c: d})
…
conf.value 

accumulators
– write-only on nodes
– collect data across the cluster
acc = sc.accumulator(0) 
def test(x): acc.add(x) 
rdd.foreach(test)
acc.value







original post http://vasnake.blogspot.com/2016/02/code-for-hadoop-platform-and.html

Комментариев нет:

Отправить комментарий

Архив блога

Ярлыки

linux (241) python (191) citation (185) web-develop (170) gov.ru (157) video (123) бытовуха (112) sysadm (100) GIS (97) Zope(Plone) (88) Book (81) programming (81) бурчалки (81) грабли (77) development (73) Fun (72) windsurfing (72) Microsoft (64) hiload (62) opensource (58) internet provider (57) security (57) опыт (55) movie (52) Wisdom (51) ML (47) language (45) hardware (44) JS (41) curse (40) driving (40) money (40) DBMS (38) bigdata (38) ArcGIS (34) history (31) PDA (30) howto (30) holyday (29) Google (27) Oracle (27) virtbox (27) health (26) vacation (24) AI (23) Autodesk (23) SQL (23) Java (22) humor (22) knowledge (22) translate (20) CSS (19) cheatsheet (19) hack (19) tourism (19) Apache (16) Manager (15) web-browser (15) Никонов (15) happiness (14) music (14) todo (14) PHP (13) course (13) functional programming (13) weapon (13) HTTP. Apache (12) SSH (12) frameworks (12) hero (12) im (12) settings (12) HTML (11) SciTE (11) crypto (11) game (11) map (11) scala (11) HTTPD (9) ODF (9) купи/продай (9) benchmark (8) documentation (8) 3D (7) CS (7) DNS (7) NoSQL (7) Photo (7) cloud (7) django (7) gun (7) matroska (7) telephony (7) Microsoft Office (6) VCS (6) bluetooth (6) pidgin (6) proxy (6) Donald Knuth (5) ETL (5) NVIDIA (5) REST (5) bash (5) flash (5) keyboard (5) price (5) samba (5) CGI (4) LISP (4) RoR (4) cache (4) display (4) holywar (4) nginx (4) pistol (4) xml (4) Лебедев (4) IDE (3) IE8 (3) J2EE (3) NTFS (3) RDP (3) USA (3) holiday (3) mount (3) spark (3) Гоблин (3) кухня (3) урюк (3) AMQP (2) ERP (2) IE7 (2) NAS (2) Naudoc (2) PDF (2) address (2) air (2) british (2) coffee (2) font (2) ftp (2) messaging (2) notify (2) sharepoint (2) ssl/tls (2) stardict (2) tests (2) tunnel (2) udev (2) APT (1) CRUD (1) Canyonlands (1) Cyprus (1) DVDShrink (1) Jabber (1) K9Copy (1) Matlab (1) Palanga (1) Portugal (1) VBA (1) WD My Book (1) autoit (1) bike (1) cannabis (1) chat (1) concurrent (1) dbf (1) ext4 (1) idioten (1) krusader (1) license (1) mindmap (1) pneumatic weapon (1) quiz (1) regexp (1) robot (1) science (1) serialization (1) tie (1) vim (1) Науру (1) крысы (1) налоги (1) пианино (1)

Google+ Followers