A programmer's notes, about everything and nothing. But probably mostly professional.

2016-02-19

Code for 'Data Manipulation at Scale: Systems and Algorithms'

As promised, here are the code snippets for the post
http://vasnake.blogspot.com/2016/02/data-manipulation-at-scale-systems-and.html
about the course:

Data Manipulation at Scale: Systems and Algorithms
University of Washington



Assignment: Tweet sentiment analysis
– sentiment of each tweet: sum(sentiments for each word)
– sentiment of new word: numPositiveTweets - numNegativeTweets
– word frequency: wordCount / numWords
– happiest state: tweetsForState; max(sumSentiment / numTweets)
– top ten hash tags: sort(tagCount / totalTags)

# frequency.py
import sys
import json
import re

def parseTweetFile(fp):
    """Return iterator, item is a dict object
    """
    for line in fp:
        tw = json.loads(line)
        yield tw

def tweetText(tweet):
    """Return text from tweet or ''
    """
    return tweet.get('text', '')

def getTerms(text):
    """Return list of words.
    Each word is lowercase and cleared from non alphabet symbols.
    """
    if not text:
        return []

    pat = '[^A-Za-z]+'
    clean = re.sub(pat, ' ', text)
    lst = clean.split()
    res = [x.strip().lower() for x in lst]
    return res

def wordOccurences(tweets):
    """Return dict with records:
        term: number of occurrences
    """
    res = {}
    total = 0
    for tw in tweets:
        text = tweetText(tw)
        terms = getTerms(text)
        for term in terms:
            cnt = res.get(term, 0)
            res[term] = cnt + 1
            total += 1
    return res, total

def main():
    tweet_file = open(sys.argv[1])
    tweets = parseTweetFile(tweet_file)
    db, total = wordOccurences(tweets)
    for rec in db.items():
        term, occ = rec
        freq = float(occ) / float(total)
        print("%s %f" % (term, freq))

if __name__ == '__main__':
    main()

# happiest_state.py
import sys
import json
import re
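# parseTweetFile, tweetText, getTerms: same helpers as in frequency.py above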

def getUSStates():
    """Return (abr, states)
    """
    st = {
        'AK': 'Alaska',
        'AL': 'Alabama',
...
        'WV': 'West Virginia',
        'WY': 'Wyoming'
    }
    abr = [(key.lower(), key) for key, val in st.items()]
    abr = dict(abr)
    states = [(val.lower(), key) for key, val in st.items()]
    states = dict(states)
    return (abr, states)

ST_ABBR, ST_NAMES = getUSStates()

def parseSentFile(fp):
    """Return dictionary word: score
    """
    res = {}
    for line in fp:
        term, score = line.split('\t')
        res[term] = int(score)
    return res

def totalSentiment(terms, scores):
    """Return sentiment score for word list
    """
    res = 0
    for term in terms:
        score = scores.get(term, 0)
        res += score
    return res

def splitFullName(fn):
    """Return tuple (city, state) or ('', '')
    """
    res = ('', '')
    lst = fn.split(',')
    cleanLst = [x.strip() for x in lst if x.strip()]
    if len(cleanLst) == 2:
        res = (cleanLst[0], cleanLst[1])
    return res

def detectState(tweet):
    """Return two letter USA state name or ''
    """
    res = ''
    place = tweet.get('place', None)
    if place is not None:
        country = place.get('country', '')
        country_code = place.get('country_code', '')
        full_name = place.get('full_name', '')
        if country == 'United States' or country_code == 'US':
            fullName = splitFullName(full_name)
            city, state = fullName
            res = ST_ABBR.get(state.lower(), '')
            if not res:
                res = ST_NAMES.get(city.lower(), '')
    return res

def tweetsForStates(tweets, scores):
    """Return dict with records:
        state: (totalScore, numTweets)
    """
    res = {}
    for tw in tweets:
        text = tweetText(tw)
        if text:
            terms = getTerms(text)
            twScore = totalSentiment(terms, scores)
            state = detectState(tw)
            tot, num = res.get(state, (0, 0))
            res[state] = (tot + twScore, num + 1)
    return res

def main():
    sent_file = open(sys.argv[1])
    tweet_file = open(sys.argv[2])
    scores = parseSentFile(sent_file)
    tweets = parseTweetFile(tweet_file)
    db = tweetsForStates(tweets, scores)
    curr = float('-inf')  # sys.float_info.min is the smallest positive float and would miss all-negative averages
    happystate = 'undetected'
    for rec in db.items():
        state, data = rec
        score, num = data
        aver = float(score) / float(num)
        if aver > curr and state:
            curr = aver
            happystate = state
    print(happystate)

if __name__ == '__main__':
    main()

# term_sentiment.py
import sys
import json
import re
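# parseTweetFile, tweetText, getTerms: same helpers as in frequency.py above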

def parseSentFile(fp):
    """Return dictionary word: score
    """
    res = {}
    for line in fp:
        term, score = line.split('\t')
        res[term] = int(score)
    return res

def totalSentiment(terms, scores):
    """Return sentiment score for word list
    """
    res = 0
    for term in terms:
        score = scores.get(term, 0)
        res += score
    return res

def calcTermsSentiment(tweets, scores):
    """Return dict with records:
        term: (pos, neg, tot)
    """
    res = {}
    for tw in tweets:
        text = tweetText(tw)
        terms = getTerms(text)
        twScore = totalSentiment(terms, scores)
        for term in terms:
            score = scores.get(term, None)
            if score is None:
                pos, neg, tot = res.get(term, (0, 0, 0))
                if twScore > 0:
                    pos += 1
                elif twScore < 0:
                    neg +=1
                tot += twScore
                res[term] = (pos, neg, tot)
    return res

def termSentiment(pos, neg, tot):
    """Return float: term sentiment
    """
    return float(pos - neg)
    # alternative metric, left unused: ratio of positive to negative tweets
    # if neg == 0:
    #     neg = 1
    # return float(pos) / float(neg)

def main():
    sent_file = open(sys.argv[1])
    tweet_file = open(sys.argv[2])
    scores = parseSentFile(sent_file)
    tweets = parseTweetFile(tweet_file)
    db = calcTermsSentiment(tweets, scores)
    for rec in db.items():
        term, counts = rec
        pos, neg, tot = counts
        sent = termSentiment(pos, neg, tot)
        print("%s %f" % (term, sent))

if __name__ == '__main__':
    main()

# top_ten.py
import sys
import json
import re
import collections
import heapq
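# parseTweetFile: same helper as in frequency.py above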

def getTags(tweet):
    """Return list of hashtags or []
    """
    ents = tweet.get('entities', {})
    tags = ents.get('hashtags', [])
    res = [x['text'] for x in tags]
    return res

def hashtagOccurences(tweets):
    """Return dict with records:
        hashtag: number of occurrences
    """
    res = collections.defaultdict(int)
    total = 0
    for tw in tweets:
        tags = getTags(tw)
        for tag in tags:
            res[tag] += 1
            total += 1
    return res, total

def main():
    tweet_file = open(sys.argv[1])
    tweets = parseTweetFile(tweet_file)
    db, total = hashtagOccurences(tweets)
    # ranking by count gives the same top ten as ranking by count / total
    tagcounts = list(db.items())
    tt = heapq.nlargest(10, tagcounts, key=lambda rec: rec[1])
    for tag, cnt in tt:
        print("%s %d" % (tag, cnt))

if __name__ == '__main__':
    main()

# tweet_sentiment.py
import sys
import json
import re
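# parseTweetFile, getTerms, parseSentFile: same helpers as in the snippets above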

def getTermScore(term, db):
    """Return term sentiment score from db
    """
    res = db.get(term, 0)
    return res

def calcTweetSentiment(tweet, db):
    """Return int: tweet sentiment score.
    If tweet is not really a tweet (no text in it), return None
    """
    res = None
    if 'text' in tweet:
        res = 0
        text = tweet['text']
        terms = getTerms(text)
        for term in terms:
            score = getTermScore(term, db)
            res += score
    return res

def main():
    sent_file = open(sys.argv[1])
    tweet_file = open(sys.argv[2])
    scores = parseSentFile(sent_file)
    tweets = parseTweetFile(tweet_file)
    for tw in tweets:
        score = calcTweetSentiment(tw, scores)
        if score is not None:
            print(score)
        else:
            print(0)

if __name__ == '__main__':
    main()


SQL assignment
– SQL for the RA expression π_term(σ_{docid='10398_txt_earn' and count=1}(frequency))
select term from frequency where docid = '10398_txt_earn' and count = 1

– RA: π_term(σ_{docid='10398_txt_earn' and count=1}(frequency)) ∪ π_term(σ_{docid='925_txt_trade' and count=1}(frequency))
 select term from frequency where docid = '10398_txt_earn' and count = 1
 union 
 select term from frequency where docid = '925_txt_trade' and count = 1

– Write a SQL statement to count the number of unique documents containing the word "law" or containing the word "legal"
select count(*) from (select distinct docid from frequency where term = 'law' or term = 'legal') x

– Write a SQL statement to find all documents that have more than 300 total terms
select docid, count(term) as numterms, sum(count) numwords 
from frequency
group by docid
having numterms > 300

– count the number of unique documents that contain both the word 'transactions' and the word 'world'
select distinct docid
from frequency
where term = 'transactions'
INTERSECT
select distinct docid
from frequency
where term = 'world'

– Matrix multiplication in SQL (with sparse matrices this can be quite fast and efficient in some DB engines):
select A.row_num, B.col_num, sum(A.value * B.value)
from A, B
where A.col_num = B.row_num
group by A.row_num, B.col_num;

– Find the best matching document to the keyword query "washington taxes treasury"
compute document similarity as C = A dot A-transposed;
each row of the matrix is a document vector, with one column for every term in the entire corpus:
docid : rownum
term : colnum
count : value
create view corpus as
SELECT * FROM frequency
UNION
SELECT 'q' as docid, 'washington' as term, 1 as count 
UNION
SELECT 'q' as docid, 'taxes' as term, 1 as count
UNION 
SELECT 'q' as docid, 'treasury' as term, 1 as count;

# term-document matrix
create view A as
select docid rownum, term colnum, count value
from corpus;

# td matrix transposed
create view B as
select term rownum, docid colnum, count value
from corpus;

# matrix C = A dot B
create view C as
select A.rownum, B.colnum, sum(A.value * B.value) value
from A, B
where A.colnum = B.rownum and A.rownum < B.colnum
group by A.rownum, B.colnum;

# find max similarity score for 'q' doc 
select rownum, max(value)
from C
where rownum = 'q' or colnum = 'q';


MapReduce assignment
– example
import sys
import MapReduce

mr = MapReduce.MapReduce()

def mapper(record):
    # key: document identifier
    # value: document contents
    key = record[0]
    value = record[1]
    words = value.split()
    for w in words:
      mr.emit_intermediate(w, 1) # append 1 to the list stored under key 'w' in the intermediate dict

def reducer(key, list_of_values):
    # key: word
    # value: list of occurrence counts from intermediate res.
    total = 0
    for v in list_of_values:
      total += v
    mr.emit((key, total)) # append the tuple to the result list

if __name__ == '__main__':
  inputdata = open(sys.argv[1])
  mr.execute(inputdata, mapper, reducer)

– Create an Inverted index. Given a set of documents, an inverted index is a dictionary where each word is associated with a list of the document identifiers in which that word appears
def mapper((docid, text)) ...
words = text.split()
for w in words: mr.emit_intermediate(w, docid)
def reducer(word, values) ...
docs = distinct list(values)
mr.emit((word, docs))
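A runnable version of this sketch, assuming the same MapReduce.py stub (mr.emit_intermediate / mr.emit / mr.execute) and record format ([document id, document text]) as in the word-count example above; deduplicating the document ids with a set is my choice here:

# inverted_index.py (sketch)
import sys
import MapReduce

mr = MapReduce.MapReduce()

def mapper(record):
    docid, text = record[0], record[1]
    for w in text.split():
        mr.emit_intermediate(w, docid)

def reducer(word, docids):
    # keep each document id only once
    mr.emit((word, sorted(set(docids))))

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)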

– Implement a relational join as a MapReduce query
def mapper((tabname, joinkey, tabrow)) ...
mr.emit_intermediate(joinkey, (tabname, tabrow))
def reducer(key, values) ...
if len(values) >= 2: ...
masterrow = getMaster(values)
detailrows = getDetails(values)
for line in detailrows: mr.emit(masterrow + line)

– Consider a simple social network dataset consisting of a set of key-value pairs (person, friend) representing a friend relationship between two people. Count the number of friends for each person
def mapper((pers, frnd)) ...
mr.emit_intermediate(pers, frnd)
def reducer(pers, friends) ...
mr.emit(len distinct list(friends))

– Generate a list of all non-symmetric friend relationships
def mapper((pers, frnd)) ...
mr.emit_intermediate((pers, frnd), frnd)
mr.emit_intermediate((frnd, pers), '')
def reducer((name1, name2), values) ...
if name2 not in values: mr.emit((name1, name2))

– Consider a set of key-value pairs where each key is sequence id and each value is a string of nucleotides, e.g., GCTTCCGAAATGCTCGAA.... Remove the last 10 characters from each string of nucleotides, then remove any duplicates generated
def mapper((sid, nucs)) ...
mr.emit_intermediate(nucs[:-10], sid)
def reducer(key, val) ...
mr.emit(key)

– Design a MapReduce algorithm to compute the matrix multiplication A x B
def mapper((mname, rownum, colnum, val)) ...
if mname == 'a': ...
for col in 0..BCOLS: ...
mr.emit_intermediate((rownum, col), (mname, colnum, val))
if mname == 'b': ...
for row in 0..AROWS: ...
mr.emit_intermediate((row, colnum), (mname, rownum, val))
def reducer((row, col), values) ...
for item in values: ...
_, idx, value = item
res[idx].append(value)
for idx in res.keys(): ...
a, b = res[idx]
res[idx] = a * b
val = sum(res.values())
mr.emit((row, col, val))
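A runnable take on this sketch in the same MapReduce.py framework; the record format [matrix name, row, col, value] follows the sketch above, while the dimensions AROWS and BCOLS are assumed to be known constants:

# multiply.py (sketch; AROWS/BCOLS are assumed matrix dimensions)
import sys
import MapReduce

mr = MapReduce.MapReduce()
AROWS, BCOLS = 5, 5   # assumption: number of rows of A and columns of B

def mapper(record):
    mname, rownum, colnum, val = record
    if mname == 'a':
        # cell A[i, j] contributes to every result cell (i, k)
        for k in range(BCOLS):
            mr.emit_intermediate((rownum, k), (mname, colnum, val))
    elif mname == 'b':
        # cell B[j, k] contributes to every result cell (i, k)
        for i in range(AROWS):
            mr.emit_intermediate((i, colnum), (mname, rownum, val))

def reducer(key, values):
    row, col = key
    a_vals = {idx: v for name, idx, v in values if name == 'a'}
    b_vals = {idx: v for name, idx, v in values if name == 'b'}
    # dot product of row of A and column of B; missing (zero) cells are skipped
    total = sum(a_vals[j] * b_vals[j] for j in a_vals if j in b_vals)
    mr.emit((row, col, total))

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)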




original post http://vasnake.blogspot.com/2016/02/code-for-data-manipulation-at-scale.html

Data Manipulation at Scale: Systems and Algorithms

I recently finished the course:
Data Manipulation at Scale: Systems and Algorithms
University of Washington

My feelings are mixed: I did learn something, but not nearly enough for the time spent (and I begrudge the time).
The overwhelming majority of the lecture time is taken up by superficial accounts of
what is going on around Data Science and what it actually is, with a slant toward Big Data.
I don't know what target audience the course is aimed at; accountants with basic knowledge of databases and programming
who want to become analysts? With my programmer's background I was frankly bored 90% of the time.
And when I wasn't bored, I kept wanting a deeper treatment of the material.
The assignments are trivial too; you hardly have to think at all.
Overall: a very superficial introductory course on modern data analysis.

Silly bus

Data Science Context and Concepts:
– Examples and the Diversity of Data Science
– Working Definitions of Data Science
– Characterizing this Course
– Related Topics
– Course Logistics
Assignment: Twitter Sentiment Analysis

Relational Databases and the Relational Algebra:
– Principles of Data Manipulation and Management
– Relational Algebra
– SQL for Data Science
– Key Principles of Relational Databases
Assignment: SQL

MapReduce and Parallel Dataflow Programming:
– Reasoning about Scale
– The MapReduce Programming Model
– Algorithms in MapReduce
– Parallel Databases vs. MapReduce
Assignment: MapReduce

NoSQL: Systems and Concepts:
– What problems do NoSQL systems aim to solve?
– Early key-value systems and key concepts
– Document Stores and Extensible Record Stores
– Extended NoSQL Systems
– Pig: Programming with Relational Algebra
– Pig Analytics
– Spark
Graph Analytics:
– Structural Tasks
– Traversal Tasks
– Pattern Matching Tasks and Graph Query
– Recursive Queries
– Representations and Algorithms

Very interesting topics but, alas, covered at a gallop.


Digest

Data Science Context and Concepts

Data Science: intersect(Programming, Math/Statistics, Substantive expertise)

Simple methods + lots of not-so-good data vs sophisticated methods + not-so-many good and clean data.
First approach wins.

Describing/quantifying uncertainty, making a statistical argument;
conveying the results to the public:
critical to the success of the effort, and far from trivial.

Collecting data from web, etc; process data; make revealing presentations:
it's data science.

Graph analytics, databases, visualisation, large datasets, adhoc interactive analysis, repurposing:
what data scientist do.

Data scientist: find nuggets of truth in data and then explain them to the business leaders.
Strong mathematical background.

Skills: statistics, data munging, visualisation.
80% of analytics is sums and averages.

Concerned with the collection, preparation, analysis, visualisation, management and preservation of large collections of information.

Tasks:
– 80% of the work: preparing to run a model (data gathering, cleaning, integrating, restructuring, transforming, loading, filtering, deleting, combining, merging, verifying, extracting, shaping, massaging)
– running the model
– other 80% of the work: interpreting the results, communicating the results

Data products: data-driven apps (spellchecker, translator, ...)

If you're a DBA, you need to learn to deal with unstructured data;
If you're a statistician, learn to deal with big data.
if you're a software engineer, learn statistical modeling and communicating results.
If you're a business analyst, learn algorithms and tradeoffs at scale.

Abstractions: matrices, linear algebra, relations + relation algebra.

Barrier: Incompatible data formats, non-aligned data, inconsistent data semantics.
90% of time for 'handling' data (not doing science).

Working with large and heterogeneous datasets requires significant programming chops.

Science: empirical → theoretical → computational → eScience (massive datasets).
Sometimes possible only one-pass through dataset.

Data: Volume, Variety, Velocity, (Veracity, ...)
Big Data: hard data (1G social graph, build a recommender system; 10K spreadsheets to integrate).
– data exhaust from customers; new sensors; ability to keep everything

Assignment:
– sentiment of each tweet: sum(sentiments for each word)
– sentiment of new word: numPositiveTweets - numNegativeTweets
– word frequency: wordCount / numWords
– happiest state: tweetsForState; max(sumSentiment / numTweets)
– top ten hash tags: sort(tagCount / totalTags)

I'll publish the code snippets in a separate post.

Relational Algebra (algebra of tables):

– universal
– applicable to Big Data
– well known
– abstracted / optimized query plans
– SQL to express queries, operations

An RA expression does specify the order of operations; SQL does not.
SQL: a declarative language.

Data model is about:
– structures (rows/columns, key-value pairs),
– constraints (all rows must have the same number of columns, ...),
– operations (get next n bytes, find the rows where column 'a' is 'b').

Database: a collection of information organized to afford efficient retrieval.
What problem do they solve:
– sharing (concurrent access by multiple readers/writers)
– data model enforcement (data clean, organized)
– scale (work with datasets larger than RAM)
– flexibility (use the data in new, unanticipated ways)

Questions:
– how is the data physically organized on disk
– what kinds of queries are efficiently supported
– how hard is it to update/add the data
– what happens when I encounter new queries? Do I reorganize the data?

Relational DBMS were invented to let you use one set of data in multiple ways, including unforeseen ones.

Algebraic structure of the data, allowing reasoning and manipulating independently of physical data representation.

Relational algebra operations (closure: operation return relation):
– set operations (union [all], intersection-join, difference-except)
– selection (condition where ...)
– projection (select [distinct] a, b, ...)
– join (cross [cartesian] product, left/right/inner/outer join, theta join)
and extended (bag) ops
– duplicate elimination (distinct)
– grouping, aggregating (group by, sum, count, ...)
– sorting (order by)

Every paper will assume set semantics; every implementation – bag semantics.

Query optimization: apply laws/rules and find less expensive equivalent
– like arithmetic ((z*2)+((z*3)+0))/1 = (2+3)*z
– same logical expression → different physical algorithms (declarative query language)
– explain … mapping SQL to RA
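As a small illustration of the last point, SQLite (the engine behind the SQL assignment snippets above) can show the physical plan it picked for a declarative query; a minimal sketch, assuming the frequency(docid, term, count) table from that assignment:

# explain_plan.py (sketch; the frequency table layout is from the SQL assignment)
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("create table frequency (docid text, term text, count int)")
conn.execute("create index idx_term on frequency (term)")

# same logical query; the engine chooses the physical plan (an index search here)
for row in conn.execute("explain query plan "
                        "select docid from frequency where term = 'law'"):
    print(row)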

UDF: user defined functions:
– scalar
– aggregate
– table

Views: logical data independence (just a query with a name)
– abstraction and reuse

Indexes: for searching needle in a haystack.

SQL assignment:
I'll publish the code snippets in a separate post.

MapReduce, parallel dataflow

– abstraction for parallel manipulation of massive datasets

Shared nothing vs
– shared memory (multicore PC)
– shared disc (Oracle, …)
only Shared Nothing can be scaled out to 1000s of nodes.

Implementation:
– master node
– M splits — read to M map tasks, write to R regions – read to R reduce tasks
– DFS, fault tolerant, files by blocks, block size 64 MB
– YARN, task scheduler, fault tolerant

Scalable:
– can make use of 1000s of cheap computers
– for N data items you must do no more than (N^m)/k operations
– scale up vs scale out

MapReduce:
– map (inkey, invalue) => [(outkey, outvalue)], e.g.: (docname, listofwords) => [(word, frequency)]
– shuffle, group/aggregate outkeys
– reduce (outkey, [outvalues]) => [resvalues], e.g.: (word, [frequency]) => (word, totalfrequency)
– work with tuples, bags of tuples
– think backward: what result you have to produce, what reduce input you need to produce it, and what the map phase has to emit to generate that reduce input (see the sketch below).
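A minimal in-memory sketch of this map → shuffle → reduce flow (plain Python, no Hadoop), counting words; the toy documents are made up:

# mr_model.py (toy simulation of map / shuffle-group / reduce)
from collections import defaultdict

docs = [('doc1', 'deer bear river'), ('doc2', 'car car river')]

# map: (docname, text) => [(word, 1)]
mapped = [(w, 1) for _, text in docs for w in text.split()]

# shuffle: group values by key
groups = defaultdict(list)
for key, value in mapped:
    groups[key].append(value)

# reduce: (word, [1, 1, ...]) => (word, total)
result = [(word, sum(ones)) for word, ones in groups.items()]
print(sorted(result))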

Inverted index: «eat»: (tweet1, tweet3); «pancakes»: (tweet3, tweet2)
– map to (word, tweetnum)
– reduce input is the result.

Relational join in MR:
– lump two datasets into one
– map to (joinkey, datarow)
– reduce to (joinkey, tab1row, tab2row)

Counting friends in MR:
– record ('Jim', 'Sue') is an edge from digraph
– counting like in 'word count' example, key is a person name

Matrix multiplication in MR:
– reduce output must be in form ((i, j), val) → (i,j) is a key
– the mapper has to emit each cell of A numBcols times, and so on

Parallel databases vs MapReduce
– MR: cheap scale out, fault tolerance, easy to deploy
– PD: schema, low latency, indexes, consistency (transactions/ACID), SQL, but very expensive
– SQL, indexes, schemas, low latency, algebraic optimisation emerging in MR world (Pig, Hive, Impala, Spark, ...)
– relational algebra everywhere

For data that fits an RDBMS (not so big), the RDBMS may be faster than MR
– MR load data faster
– RDBMS process data faster (preprocessing done in loading phase)


MapReduce assignment:
I'll publish the code snippets in a separate post.

NoSQL: key-value, extensible record, document DB

NoSQL systems are purely about scale rather than analytics, and are arguably less relevant for the practicing data scientist.

Joins and a query language/algebra are barely present in NoSQL databases:
– CouchDB (no language)
– Pig, Hive, Spark, Impala
That is a very small fraction of all available databases. Scaling out writes and reads is what it is all about.

NoSQL features:
– scale horizontally
– replicate and partition data over many nodes
– simple API – no query language
– weak concurrency model (vs ACID transactions)
– efficient use of distributed indexes, RAM data storage
– dynamically add new attributes to data records

ACID:
– atomicity
– consistency (written data must be valid according to all defined rules)
– isolation (often relaxed)
– durability

Relaxing consistency guarantees: CAP theorem
– consistency, availability, partitioning – choose 2 of 3
– w/o partitioning no scalability => AP, no C
– big scale, frequent failures => transactions can't be finished fast enough
eventually consistent data.

Consistency:
– do all applications see all the same data?
Availability:
– can I interact with the system in the presence of failures?
Partitioning:
– if two sections of your system cannot talk to each other, can they make forward progress?
– If not => no Availability
– If yes => no Consistency

Conventional databases assume no partitioning.
NoSQL systems may sacrifice consistency.

Paxos protocol for solving consensus:
– distributed voting scheme

MVCC: multi-version concurrency control:
– creates new version on write
– checks timestamps to determine which version to read

Major impact systems:
– Memcached: in-memory indexes can be highly scalable (consistent hashing/hash ring)
– Dynamo: idea of using eventual consistency as a way to achieve higher availability and scalability
(SLA: respond within 300 ms for 99.9% of requests, i.e. at the 99.9th percentile); vector clocks
– BigTable: persistent record storage could be scaled to 1000s of nodes

BigTable/HBase:
– sparse, distributed, persistent multi-dimensional sorted map
– record is: (row, col, time): string
– data sorted by row key
– row key range broken into tablets (tablet: unit of distribution)
– column families (a family is the unit of access control, memory accounting, disk accounting)
– writes go through the memtable; SSTables (disk chunks) are immutable

RA and declarative query languages comes to MapReduce:
– Hive
– Pig
Take joins: to perform a join the system can use different methods/strategies (different query plans).
The system, not the programmer, should pick the right one => we need a declarative query language with an optimizer that builds the RA plan.

Enterprise needs:
– ACID
– declarative query language
– schema/standards

Pig: almost SQL on top of Hadoop (MapReduce), nested types:
– data model: atom, tuple, bag, dict (map)
– operators: load, filter, group/cogroup (join), distinct, foreach, flatten, store, union, cross, order, ...

Spark: iterative tasks, cached RDD:
– RDD: distributed collection of key-value pairs
– transformations → actions
– lazy evaluation (optimization)
– operations: filter, group, sort, union, cross, … (RA?)
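A tiny illustration of the transformations → actions flow (run inside the pyspark shell, where sc is already defined; the numbers are made up):

# rdd_sketch.py (sketch; run inside pyspark)
rdd = sc.parallelize(range(10))                # RDD from a local collection
evens = rdd.filter(lambda x: x % 2 == 0)       # transformation, lazy
doubled = evens.map(lambda x: x * 2).cache()   # chained transformation, cached
print(doubled.collect())                       # action: triggers evaluation
print(doubled.sum())                           # action: reuses the cached RDD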

Graphs: Structural tasks

Graph-structured data are increasingly common in data science contexts due to their ubiquity in modeling the communication between entities

graph:
– set of vertices V
– set of edges E
– edge is a pair (v, w), directed, undirected, weighted, capacitated, loaded with data
structural algorithms, traversal, pattern-matching

Structural:
– in-degree, out-degree
– degree histogram: for d = 0..maxOutDegree find n = count(vertices.outdegree == d)
– random graphs have an exponential distribution: n(d) = c*x^d, x < 1 (most vertices have a small outdegree)
– human-generated graphs have a Zipf distribution: n(d) = 1/d^x, x > 0 (long tail: some vertices have a very large outdegree)
– connectivity coefficient: the minimum number of vertices you need to remove to disconnect the graph
– centrality:
closeness centrality of a vertex: average length of all its shortest paths;
betweenness centrality of v: the fraction of all shortest paths that pass through v
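A small pure-Python sketch of the degree measures on a toy edge list (the edges are made up):

# degrees.py (sketch; toy directed edge list)
from collections import Counter

edges = [('a', 'b'), ('a', 'c'), ('b', 'c'), ('c', 'a'), ('d', 'a')]

out_degree = Counter(v for v, w in edges)
in_degree = Counter(w for v, w in edges)

# degree histogram: for each d, how many vertices have out-degree d
vertices = {v for e in edges for v in e}
histogram = Counter(out_degree.get(v, 0) for v in vertices)
print(sorted(histogram.items()))   # [(1, 3), (2, 1)] for this toy graph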

Traversal tasks:
– PageRank (eigenvector centrality): propagate node rank to connected nodes, divided by the number of outgoing links, damped by the damping factor
– minimum spanning tree (smallest subset of edges)
– Euler path: traverse every edge exactly once, easy
– Hamiltonian path: visit every vertex once, NP-complete
– maxflow/mincut

PageRank in details:
– formula: PR(v) = (1 - damp) / N + damp * sum over w pointing to v of ( PR(w) / outdegree(w) ), N = number of vertices
– meaning: probability of stepping into vertex walking randomly
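A direct sketch of that formula in pure Python (toy graph, fixed number of iterations, no dangling-node handling):

# pagerank.py (sketch of the formula above; toy graph, 20 iterations)
damp = 0.85
graph = {'a': ['b', 'c'], 'b': ['c'], 'c': ['a'], 'd': ['c']}   # vertex: out-links
vertices = list(graph)
pr = {v: 1.0 / len(vertices) for v in vertices}

for _ in range(20):
    new_pr = {}
    for v in vertices:
        incoming = sum(pr[w] / len(graph[w]) for w in vertices if v in graph[w])
        new_pr[v] = (1 - damp) / len(vertices) + damp * incoming
    pr = new_pr

print(pr)   # probability of a random walk ending up in each vertex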

Graph: Pattern matching tasks

– find all instances of a pattern (in graph G, pattern: configuration of vertices and edges)
– pattern example: two vertices connected to each other directly. Vertices and edges may be labeled.

Popular pattern: triangle A → B → C → A, a cycle with 3 vertices.
– brute force: for each edge (v, w) check every vertex u: if (v, u) and (w, u) are edges => triangle.
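That brute force, spelled out in Python on a tiny made-up undirected graph:

# triangles.py (sketch of the brute force above)
edges = {('a', 'b'), ('b', 'c'), ('c', 'a'), ('c', 'd')}
# store both directions so membership tests are order-independent
und = edges | {(w, v) for v, w in edges}
vertices = {v for e in und for v in e}

triangles = set()
for v, w in edges:
    for u in vertices:
        if (v, u) in und and (w, u) in und:
            triangles.add(frozenset((v, w, u)))

print(triangles)   # {frozenset({'a', 'b', 'c'})}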

RDF – resource description framework (subject, predicate, object):
– given a graph with edge labels
– pattern match example: A interferes with B regulates C associated with D
– need some kind of pattern expression language
– RDF: resource description framework, triples (subject, predicate, object)
– triple is just a labeled edge in a graph

Relational algebra, Datalog for graphs
– SQL query for the pattern:
select i.subject from R i, R r, R a where i.predicate = 'interferes with' and
r.predicate = 'regulates' and a.predicate = 'associated with' and
i.object = r.subject and r.object = a.subject
– Datalog query:
Ans(x) :-
R(x, interferes with, y), R(y, regulates, z), R(z, associated with, w)

Another example:
– relations: InterferesWith(drug1, drug2); Regulates(drug, gene); AssociatedWith(gene, disease);
– SQL: select i.drug1 from InterferesWith i, Regulates r, AssociatedWith a
where i.drug2 = r.drug and r.gene = a.gene
– Datalog: Ans(x) :-
InterferesWith(x, y), Regulates(y, z), AssociatedWith(z, w)

Recursion

– graph query example in Datalog:
knew(Person2) :- knew(Person1), email(Person1, Person2, Message, Time), Time < 'June 3'
– a sort of BFS algorithm: a loop and a queue
– beware: cycles in the graph
– MapReduce, Spark: loop, cache the invariants
– in graphs with a Zipf distribution (long tail) you have to break out of the loop: insignificant contributions keep circulating even though you already have everything you need

Pregel, Giraph, Hama:
– PageRank in MapReduce has problems: all graph data is shuffled on every iteration;
we need to shuffle only the new rank contributions; iterations are controlled outside of MR.
– Pregel: batch algorithms on large graphs
    while any vertex is active or max iterations not reached:
        for each vertex:  # parallelized
            process messages from neighbors from the previous iteration
            send messages to neighbors
            set active flag

Data representation

– edge table: edges(v, w [, data])
top 5 highest in-degree vertices:
select w, incount from (
    select w, count(v) as incount from edges group by w
) order by incount desc limit 5
– adjacency list: edges(v, [w, …]); good for MR
– adjacency matrix: good for finding a specific edge
Different algorithms, different properties.
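The three representations side by side in a small Python sketch (toy edge list, conversions only):

# graph_repr.py (sketch: edge table vs adjacency list vs adjacency matrix)
from collections import defaultdict

edges = [('a', 'b'), ('a', 'c'), ('b', 'c')]          # edge table: edges(v, w)

adj_list = defaultdict(list)                           # good fit for MapReduce
for v, w in edges:
    adj_list[v].append(w)

vertices = sorted({v for e in edges for v in e})
index = {v: i for i, v in enumerate(vertices)}
adj_matrix = [[0] * len(vertices) for _ in vertices]   # fast specific-edge lookup
for v, w in edges:
    adj_matrix[index[v]][index[w]] = 1

print(dict(adj_list))     # {'a': ['b', 'c'], 'b': ['c']}
print(adj_matrix)         # [[0, 1, 1], [0, 0, 1], [0, 0, 0]]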






original post http://vasnake.blogspot.com/2016/02/data-manipulation-at-scale-systems-and.html

Code for 'Hadoop Platform and Application Framework'

Yesterday I published a post
about the course
Hadoop Platform and Application Framework
by University of California, San Diego (San Diego Supercomputer Center)

I promised to publish the code snippets separately.
So here they are: illustrative material in the form of snippets.

Cloudera QuickStart VM, practice

username: cloudera, password: cloudera, hostname: quickstart

Grab the VM image (fully loaded), start it, log in to the desktop. Everything else is done through the web interface and a terminal. In short, there is a detailed cheat sheet inside.

Load data from SQL to HDFS, Sqoop:
sqoop import-all-tables \
    -m 1 \
    --connect jdbc:mysql://quickstart:3306/retail_db \
    --username=retail_dba \
    --password=cloudera \
    --compression-codec=snappy \
    --as-avrodatafile \
    --warehouse-dir=/user/hive/warehouse

hadoop fs -ls /user/hive/warehouse
Found 6 items
drwxr-xr-x   - cloudera hive          0 2016-01-04 11:50 /user/hive/warehouse/categories
drwxr-xr-x   - cloudera hive          0 2016-01-04 11:51 /user/hive/warehouse/customers
drwxr-xr-x   - cloudera hive          0 2016-01-04 11:51 /user/hive/warehouse/departments
drwxr-xr-x   - cloudera hive          0 2016-01-04 11:52 /user/hive/warehouse/order_items
drwxr-xr-x   - cloudera hive          0 2016-01-04 11:52 /user/hive/warehouse/orders
drwxr-xr-x   - cloudera hive          0 2016-01-04 11:52 /user/hive/warehouse/products

# schema 
ls -l *.avsc
-rw-rw-r-- 1 cloudera cloudera  541 Jan  4 11:50 sqoop_import_categories.avsc
-rw-rw-r-- 1 cloudera cloudera 1324 Jan  4 11:50 sqoop_import_customers.avsc

sudo -u hdfs hadoop fs -mkdir /user/examples
sudo -u hdfs hadoop fs -chmod +rw /user/examples
hadoop fs -copyFromLocal ~/*.avsc /user/examples/

Query Structured Data using Hue + Impala
CREATE EXTERNAL TABLE categories STORED AS AVRO
LOCATION 'hdfs:///user/hive/warehouse/categories'
TBLPROPERTIES ('avro.schema.url'='hdfs://quickstart/user/examples/sqoop_import_categories.avsc');
… 
invalidate metadata;
show tables;

-- Most popular product categories
select c.category_name, count(order_item_quantity) as count
from order_items oi
inner join products p on oi.order_item_product_id = p.product_id
inner join categories c on c.category_id = p.product_category_id
group by c.category_name
order by count desc
limit 10;

-- top 10 revenue generating products
select p.product_id, p.product_name, r.revenue
from products p inner join
(select oi.order_item_product_id, sum(cast(oi.order_item_subtotal as float)) as revenue
from order_items oi inner join orders o
on oi.order_item_order_id = o.order_id
where o.order_status <> 'CANCELED'
and o.order_status <> 'SUSPECTED_FRAUD'
group by order_item_product_id) r
on p.product_id = r.order_item_product_id
order by r.revenue desc
limit 10;

Correlate Structured Data with Unstructured Data (DB records + web logs)
Bulk upload, web log data
sudo -u hdfs hadoop fs -mkdir /user/hive/warehouse/original_access_logs
sudo -u hdfs hadoop fs -copyFromLocal /opt/examples/log_files/access.log.2 /user/hive/warehouse/original_access_logs
hadoop fs -ls /user/hive/warehouse/original_access_logs
Found 1 items
-rw-r--r--   1 hdfs hive   39593868 2016-01-05 08:06 /user/hive/warehouse/original_access_logs/access.log.2

Build a table in Hive (Beeline)
First, you'll take advantage of Hive's flexible SerDe (serializers / deserializers)
to parse the logs into individual fields using a regular expression
beeline -u jdbc:hive2://quickstart:10000/default -n admin -d org.apache.hive.jdbc.HiveDriver
...
jdbc:hive2://quickstart:10000/default> 
CREATE EXTERNAL TABLE intermediate_access_logs (
 ip STRING,
 date STRING,
 method STRING,
 url STRING,
 http_version STRING,
 code1 STRING,
 code2 STRING,
 dash STRING,
 user_agent STRING)
 ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
 WITH SERDEPROPERTIES (
 'input.regex' = '([^ ]*) - - \\[([^\\]]*)\\] "([^\ ]*) ([^\ ]*) ([^\ ]*)" (\\d*) (\\d*) "([^"]*)" "([^"]*)"',
 'output.format.string' = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s"
 )
 LOCATION '/user/hive/warehouse/original_access_logs';

Second, you'll transfer the data from this intermediate table to one that does not require any special SerDe
CREATE EXTERNAL TABLE tokenized_access_logs (
 ip STRING,
 date STRING,
 method STRING,
 url STRING,
 http_version STRING,
 code1 STRING,
 code2 STRING,
 dash STRING,
 user_agent STRING)
 ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
 LOCATION '/user/hive/warehouse/tokenized_access_logs';

ADD JAR /usr/lib/hive/lib/hive-contrib.jar;
INSERT OVERWRITE TABLE tokenized_access_logs SELECT * FROM intermediate_access_logs;
!quit

query in Hue + Impala
invalidate metadata;
show tables;

select count(*),url from tokenized_access_logs
where url like '%\/product\/%'
group by url order by count(*) desc;


Pig, PigLatin

– platform for data processing
– PigLatin: high level language
– pig execution: local, MR, Tez
– extensible
– usage: ETL; manipulating, analysing raw data
– can run as scripts, embedded programs

interactive shell example
hdfs dfs -put /etc/passwd /user/cloudera/
pig -x mapreduce
… >
grunt> A = load '/user/cloudera/passwd' using PigStorage(':');
grunt> B = foreach A generate $0, $4, $5 ;
grunt> dump B;
…
(root,root,/root)
(bin,bin,/bin)
(daemon,daemon,/sbin)
(adm,adm,/var/adm)

grunt> store B into 'userinfo.out';

[cloudera@quickstart ~]$ hdfs dfs -ls /user/cloudera/userinfo.out
Found 2 items
-rw-r--r--   1 cloudera cloudera          0 2016-01-07 09:00 /user/cloudera/userinfo.out/_SUCCESS
-rw-r--r--   1 cloudera cloudera       1459 2016-01-07 09:00 /user/cloudera/userinfo.out/part-m-00000

Hive

– query, manage data using HiveQL (SQL-like)
– run interactively using beeline, other run options (Hcatalog, WebHcat)
– warehouse tools
– exec.environments: MR, Tez, Spark
– data in HDFS, HBase
– custom mappers/reducers
– usage: data mining, analytics; ML; ad hoc analysis

interactive shell example
$ beeline -u jdbc:hive2://
> create table userinfo (uname STRING, pswd STRING, uid INT, gid INT, fullname STRING, hdir STRING, shell STRING)
 row format delimited fields terminated by ':' stored as textfile;
> load data inpath '/tmp/passwd' overwrite into table userinfo;
> select uname, fullname, hdir from userinfo order by uname;
…
+----------------+-------------------------------+-------------------------------+--+
|     uname      |           fullname            |             hdir              |
+----------------+-------------------------------+-------------------------------+--+
| abrt           |                               | /etc/abrt                     |
| adm            | adm                           | /var/adm                      |

HBase

– non-relational distributed database on top of HDFS
– compression, in-memory operations (memstore, blockcache)
– Consistency, high Availability, Partitioning (auto sharding)
– replication, security
– SQL-like access (using Hive, Spark, Impala)
– HBase API, external API, MR

example, interactively HBase shell
$ hbase shell
> 
create 'userinfotable', {NAME => 'username'}, {NAME => 'fullname'}, {NAME => 'homedir'} 
put 'userinfotable', 'r1', 'username', 'vcsa' 
put 'userinfotable', 'r1', 'fullname', 'VirtualMachine Admin'
put 'userinfotable', 'r2', 'username', 'sasuser'
put 'userinfotable', 'r3', 'username', 'postfix'

hbase(main):009:0> scan 'userinfotable'
ROW                                  COLUMN+CELL                                                                                              
 r1                                  column=fullname:, timestamp=1452193130567, value=VirtualMachine Admin                                    
 r1                                  column=username:, timestamp=1452193080935, value=vcsa                                                    
 r2                                  column=username:, timestamp=1452193088826, value=sasuser                                                 
 r3                                  column=username:, timestamp=1452193096044, value=postfix  

hbase(main):011:0> scan 'userinfotable', {COLUMNS => 'fullname'}
ROW                                  COLUMN+CELL                                                                                              
 r1                                  column=fullname:, timestamp=1452193130567, value=VirtualMachine Admin

HDFS 

– bin/hdfs script:
– user commands, filesystem shell commands
– administrator commands
– debug commands
– native Java API, org.apache.hadoop.fs.FileSystem
– C API, libhdfs
– WebHDFS REST API
– NFS gateway
– other options: Flume, Sqoop, …

commands example
– hdfs dfs -ls /
– hdfs dfs -mkdir /user/test
dd if=/dev/zero of=sample.txt bs=64M count=16
sudo -u hdfs hdfs dfs -chown -R cloudera /user/test
sudo -u hdfs hdfs dfs -put sample.txt /user/test/
hdfs fsck /user/test/sample.txt
sudo -u hdfs hdfs dfsadmin -report

HDFS API
– org.apache.hadoop.fs.FileSystem
– FSDataInputStream, FSDataOutputStream
– methods: get, open, create
FileSystem fs = FileSystem.get(URI.create(uri), conf);
in = fs.open(new Path(uri)); 
IOUtils.copyBytes(in, System.out, 4096, false);
...

REST API for HDFS
– in hdfs-site.xml: dfs.webhdfs.enabled; dfs.web.authentication.kerberos.principal; ...kerberos.keytab
hdfs getconf -confKey dfs.webhdfs.enabled
– example
service hadoop-httpfs start
curl -i "http://quickstart.cloudera:14000/webhdfs/v1/user/cloudera?user.name=cloudera&op=GETFILESTATUS" 
curl -i -X PUT "http://quickstart.cloudera:14000/webhdfs/v1/user/test?user.name=cloudera&op=MKDIRS&permission=755" 
curl -i "http://quickstart.cloudera:14000/webhdfs/v1/user/test?user.name=cloudera&op=GETCONTENTSUMMARY" 

MapReduce

word count example
– mapper take line; produce pairs (word, 1)
– MR sort mapper output, feed it to reducer
– reducer take (word, 1); if current word == input word => word count += 1; output (word, count) if curr. word != inp.word

mapper
for line in sys.stdin:  
    line = line.strip()
    keys = line.split()
    for key in keys:
        value = 1        
        print('{0}\t{1}'.format(key, value))

reducer
last_key      = None
running_total = 0

for input_line in sys.stdin:
    input_line = input_line.strip()
    this_key, value = input_line.split("\t", 1)
    value = int(value)

    if last_key == this_key:
        running_total += value
    else:
        if last_key is not None:
            print( "{0}\t{1}".format(last_key, running_total) )
        last_key = this_key
        running_total = value

if last_key == this_key:
    print( "{0}\t{1}".format(last_key, running_total)) 

test and run
> chmod +x wordcount_mapper.py
> chmod +x wordcount_reducer.py
> echo "A long time ago in a galaxy far far away" > /home/cloudera/testfile1
> echo "Another episode of Star Wars" > /home/cloudera/testfile2
> cat testfile* | ./<mapper.py> | sort | ./<reducer.py>
hdfs dfs -put /home/cloudera/testfile1 /user/cloudera/input
hdfs dfs -put /home/cloudera/testfile2 /user/cloudera/input
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
   -input /user/cloudera/input \
   -output /user/cloudera/output_new \
   -mapper /home/cloudera/wordcount_mapper.py \
   -reducer /home/cloudera/wordcount_reducer.py \
   -numReduceTasks 1
hdfs dfs -cat /user/cloudera/output_new/part-00000
> hdfs dfs -getmerge /user/cloudera/output_new_0/* ./wordcount_num0_output.txt

assignment: MR joining data 
(The show-to-channel relationship is Many-to-Many)
– find all ABC shows;
– group by show name;
– sum counts for each show
# A: show,channel
# B: show,count
# mapper
import sys

for line in sys.stdin:
    line = line.strip()
    show, channel = line.split(",")
    count = 'NULL'
    if channel.isdigit():
        count = int(channel)
        channel = 'NULL'
    # we are only interested in the ABC channel (and in count records)
    if count != 'NULL' or channel == 'ABC':
        print('%s\t%s\t%s' % (show, channel, count))

# reducer (globals and helper functions filled in to make the snippet self-contained)
import sys

currentShow = None
channels = []
counts = []
total = 0

def appendChannel(channel):
    if channel != 'NULL':
        channels.append(channel)

def appendCount(count):
    if count != 'NULL':
        counts.append(int(count))

def resetAccum():
    global channels, counts
    channels = []
    counts = []

def dumpAccum():
    global total
    if 'ABC' in channels:
        show = currentShow
        vnum = sum(counts)
        total += vnum
        print("{0} {1}".format(show, vnum))

for line in sys.stdin:
    line = line.strip()
    show, channel, count = line.split('\t')
    if show != currentShow and currentShow:
        dumpAccum()
        resetAccum()
    currentShow = show
    appendChannel(channel)
    appendCount(count)

dumpAccum()
# print total

Spark

features
– on top of HDFS, YARN but can work standalone on any storage
– fast (cache, no intermediate HDFS writes)
– interactive (Scala, Python, R shells)
– iterative
– any DAG workflow

PySpark setup
sudo easy_install ipython==1.2.1
export PYSPARK_DRIVER_PYTHON=ipython
pyspark

Concepts
– RDD
– transformations (lazy)
– actions
– caching
– shared variables (one-way transfer)

RDD: resilient distributed dataset (from storage, from RDD transformations)
– divided in partitions (atomic chunks of data), immutable;
– track history of each partition, re-run if necessary;
a_RDD = sc.parallelize(range(10), 3)
b_RDD = a_RDD.map(lambda x: x + 1) # transformation
b_RDD.glom().collect() # action

join example
fileA = sc.textFile("input/join1_FileA.txt")
fileB = sc.textFile("input/join1_FileB.txt")
def split_fileA(line):
    word, cnt = line.split(',')
    count = int(cnt)
    return (word, count)
fileA_data = fileA.map(split_fileA)
# collect(); take(2)
def split_fileB(line):
    date, val = line.split(' ')
    word, count_string = val.split(',')
    return (word, date + " " + count_string)
fileB_data = fileB.map(split_fileB)
fileB_joined_fileA = fileB_data.join(fileA_data)
fileB_joined_fileA.collect()

broadcast variables
– large variable used in all nodes read-only
– transfer just once per Executor
– torrent-like transfer
– good for config, lookup table, join (if table fits in memory)
conf = sc.broadcast({'a': 'b', 'c': 'd'})
…
conf.value 

accumulators
– write-only on nodes
– collect data across the cluster
acc = sc.accumulator(0) 
def test(x): acc.add(x) 
rdd.foreach(test)
acc.value







original post http://vasnake.blogspot.com/2016/02/code-for-hadoop-platform-and.html

Hadoop Platform and Application Framework

I recently finished the course
Hadoop Platform and Application Framework
by University of California, San Diego (San Diego Supercomputer Center)

Another course 'for beginners', in the same vein as 'Data Manipulation at Scale: Systems and Algorithms'.
Target audience: schoolkids who want to know what Hadoop and MapReduce are and what they are good for.
Quite a lot of lecture time is devoted to explaining which component sits on top of which and what is what in the Apache Hadoop ecosystem.
They explain how it all works, in principle. If you are too lazy to dig that out of the docs, the lectures are a reasonable substitute.
The hands-on part lets you touch HDFS, MapReduce and Spark by working through test assignments on toy datasets in a Cloudera VM (Cloudera QuickStart).
Useful overall, but I begrudge the time spent.

Silly bus

Hadoop Basics
Lesson 1: Big Data Hadoop Stack
Lesson 2: Hands-On Exploration of the Cloudera VM

Introduction to the Hadoop Stack
Lesson 1: Overview of the Hadoop Stack
Lesson 2: The Hadoop Execution Environment
Lesson 3: Overview of Hadoop based Applications and Services

Introduction to Hadoop Distributed File System (HDFS)
Lesson 1: HDFS Architecture and Configuration
Lesson 2: HDFS Performance and Tuning
Lesson 3: HDFS Access, Commands, APIs, and Applications

Introduction to Map/Reduce
Lesson 1: Introduction to Map/Reduce
Lesson 2: Map/Reduce Examples and Principles
Assignment: Running Wordcount with Hadoop streaming, using Python code
Assignment: Joining Data

Spark
Lesson 1: Introduction to Apache Spark
Lesson 2: Resilient Distributed Datasets and Transformations
Lesson 3: Job scheduling, Actions, Caching and Shared Variables
Assignment: Simple Join in Spark
Assignment: Advanced Join in Spark


Digest

Big Data Hadoop Stack

Apache Hadoop is an open source software framework for
storage and large scale processing of data-sets on clusters of commodity hardware.

Simple batch-processing framework, moving computations to data.

Hardware failures handled automatically (data replication, process tracking).

Schema-on-read approach. A big amount of data + simple analysis beats small data + complex analysis.

Scalability, Reliability, Flexibility, Cost

Components:
– Hadoop Common (libs and utils)
– HDFS (Hadoop Distributed File System), very high aggregate bandwidth
– Hadoop Map-Reduce (later YARN + MR): resource management platform, scheduler (MR: programming model)

On top of that other applications and tools, like:
– HCatalog
– PIG
– Hive
– HBase
– ...

Nodes and tasks:
– Master node, Slave nodes
– HDFS: Name node, Data nodes
– MR: Task manager, Job tracker, Task trackers

HDFS
– Java, distributed, scalable, portable
– reliability via replicating data
– Namenode (+ secondary Namenode): metadata; heartbeats, replication, balancing
– Datanodes: blocks

MR as a scheduler:
– the Job tracker on the Master server pushes work to available Task trackers on the Slave servers
– now we have YARN

YARN aka Hadoop 2 (NextGen):
– Hadoop1: (HDFS, MR) vs Hadoop2: (HDFS, YARN, MR)
– separates resource management and data processing components
– on master: resource manager; on slaves: node manager, app master, container
– supports other workloads (graph processing, iterative modeling)
– multiple HDFS access engines (batch, interactive, realtime/streaming)

Hadoop Zoo
– Google stack, Facebook stack, Yahoo, Cloudera: all like twins
– blocks data storage, coordinator/scheduler/manager, metadata, columns/records, query languages, warehousing, ...

Apache Sqoop (SQL-to-Hadoop)
– for transferring data between Hadoop and RDB (SQL)

HBase (Google BigTable)
– column-oriented DBMS
– key-value store
– dynamic data model
– fast random data access

PIG, PIG Latin
– high level programming on top of MR
– data analysis as data flows

Apache Hive
– data warehousing, quering, managing
– can project structure on data
– SQL-like queries (HiveQL)
– allow to use MR processing

Oozie
– workflow scheduler to manage Hadoop jobs
– supports MR, Pig, Hive, Sqoop, …
– Oozie workflow jobs is a DAG
– Oozie Coordinator jobs is a recurrent workflow jobs, triggered by frequency or data availability

Zookeeper
– operational services for a Hadoop cluster
– centralized service for maintaining config, naming, synchronization, …

Flume
– for log data, streaming: collect, aggregate, move, …

Impala
– MPP (massively parallel processing) SQL query engine

Apache Spark
– fast in-memory data processing
– works on top of YARN but have own cluster manager for standalone mode
– can be started on any storage

Hadoop stack

HDFS → YARN → Tez (exec.engine) under MR (batch), Pig, Hive, ...

We have more than one distribution => building blocks can be somewhat different

HDFS
– files divided to blocks (file as list of blocks), blocks stored on data nodes
– each block is replicated (3 copy)
– metadata on a name node
– HDFS2 added: HDFS federation, multiple name nodes, namespaces, block pools (performance, isolation)
– heterogeneous storage (archive, disk, ssd, ram_disk)

MapReduce
– parallel data processing framework
– MR job splits data into chunks
– map task process data chunks (line: key, value)
– framework sorts/shuffle map output by key
– reduce tasks use sorted map data as input
– compute and storage nodes are the same

Vanilla MR 1
– single master JobTracker, schedules, monitors, re-execute tasks
– one slave TaskTracker per node
– TaskTracker executes tasks per JobTracker requests

YARN, MR 2
– separate resource management and job scheduling/monitoring
– global ResourceManager (get job req. from client; get node status from NM)
– the RM has a scheduler, allocating resources (capacity queues, …), and an app manager, which negotiates the first container for an application
– NodeManager on each node
– ApplicationMaster: one for each application
– AM do the job using Containers

Additional YARN features
– high availability resource manager, stand-by RMs in case RM goes down
– timeline server, historic information for storage and apps (history of resource usage, ...)
– use of Cgroups, containers resources management
– secure containers, for particular users
– YARN REST web API

Hadoop resource scheduling
– resource management, different scheduling algorithms, parameters
– want to control resource usage
– schedulers: FIFO (default); fairshare; capacity
– fairshare tries to balance resources across apps (memory pools tend toward equal size)
– capacity gives you a guarantee for the requested resources (resources divided among queues; apps/users wait in a queue with the requested capacity)

capacity sheduler
– queues and subqueues
– capacity guarantee with elasticity (can expand resources)
– runtime changes (draining/waiting apps)
– ACL security

fairshare scheduler
– balances out resource allocation among apps over time
– organize into queues/subqueues
– guarantee min. shares
– limits per user/app
– weighted app priorities

Limitations of classic MapReduce: interactive or iterative tasks inefficient
– executions frameworks like YARN, Tez, Spark: support complex DAG of tasks, RAM caching
– Tez, Spark on top of YARN (Spark can work w/o YARN, standalone)

The Apache Tez project is aimed at building an application framework
which allows for a complex directed-acyclic-graph of tasks for processing data
– dataflow graphs
– custom data types
– complex DAG of tasks, dynamic DAG changes

Tez example: Hive SQL query (2 joins) on MR vs Hive on Tez
– MR chain with 4 map-reduce stages, writes on HDFS
– Tez chain w/o intermediate HDFS read/write, 4 reduce stages, only 2 map stages
– reusing containers and data, less map jobs

Spark, 100x faster
– advanced DAG exec. engine
– supports cyclic data flow (iterations: for iter in iterations: gradient = pointsRDD.map(...).reduce... )
– in-memory computing, cache
– many languages: Java, Scala, Python, R
– optimized libs

Databases/stores
– Avro: data structures for MR
– HBase: NoSQL database
– Cassandra: NoSQL database

Querying
– Pig, declarative language over MR
– Hive, manage and query large datasets
– Impala, low-latency SQL query
– Spark, general processing, in-memory

ML, graph processing
– Giraph, iterative graph processing
– Mahout, framework for ML apps over Spark, Hadoop
– Spark, general …

Pig, PigLatin
– platform for data processing
– PigLatin: high level language
– pig execution: local, MR, Tez
– extensible
– usage: ETL; manipulating, analysing raw data
– can run as scripts, embedded programs

Hive
– query, manage data using HiveQL (SQL-like)
– run interactively using beeline, other run options (Hcatalog, WebHcat)
– warehouse tools
– exec.environments: MR, Tez, Spark
– data in HDFS, HBase
– custom mappers/reducers
– usage: data mining, analytics; ML; ad hoc analysis

HBase
– non-relational distributed database on top of HDFS
– compression, in-memory operations (memstore, blockcache)
– Consistency, high Availability, Partitioning (auto sharding)
– replication, security
– SQL-like access (using Hive, Spark, Impala)
– HBase API, external API, MR

HDFS in details

– low cost commodity hardware
– tons of nodes
– portability
– large datasets
– high throughput (aggregated)

design
– simplified coherency model: write once read many
– data replication
– move computation to data
– relax POSIX requirements (client-side buffering)
– name node: namespace, metadata, regulates access
– data nodes: manage storage, serving read/write requests from clients, block creation, deletion, replication based on NN instructions

HDFS block size
– default BS = 64 MB, good for large files
– small files = big problems
– impact NN memory usage, number of map tasks
– memory usage: ~150 bytes per object (a small file = a file object + a block object) => ~300 GB for 1 billion files
– network load: number of checks proportional to number of blocks
– many small blocks is bad
– many small files is worse

large number of small files is bad
– map tasks: depend on #blocks; 10 GB of data in 32 KB files => 327680 map tasks.
lots of queued tasks; large overhead of start/stop tasks; inefficient I/O
– many map tasks is bad
– latency: start/stop java processes
– disk I/O low for small data blocks
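The arithmetic behind those two numbers, as a quick sanity check (assuming the usual ~150 bytes per namenode object and one block per small file):

# namenode_math.py (quick check of the numbers above)
files = 10**9                          # 1 billion small files, one block each
obj_bytes = 150                        # rough namenode memory per object (file or block)
print(files * 2 * obj_bytes / 2**30)   # ~280 GiB, i.e. roughly 300 GB of namenode RAM

data = 10 * 2**30                      # 10 GB of input
file_size = 32 * 2**10                 # split into 32 KB files
print(data // file_size)               # 327680 map tasks, one per file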

options for small files?
– merge/concatenate
– sequence
– HBase, HIVE configuration
– CombineFileInputFormat in API

Write to HDFS
– data cached on client up to block size (POSIX relaxation)
– size up to block size, contact to NameNode: where block should go
– NN responds with a list of DataNodes, rack aware: second replica goes to remote rack
– the first DN receives data and saves it to a local block; it replicates the block to the second DN
– the second DN replicates the block to the third DN; replication is pipelined, the block goes to all nodes immediately
– the NN commits the written data into its persistent store; it receives heartbeats and block reports from the DNs
– if a failure occurs, the NN starts a recovery procedure

Read from HDFS
– client gets data node list from name node
– read from closest replica

HDFS config: hdfs-site.xml
– dfs.blocksize; dfs.block.size
– dfs.replication, default 3 (lower repl => more space; higher repl => can make data local to more workers)

Failures
– data node failures, network f., name node f.
– the name node receives heartbeats and block reports from data nodes
– DN w/o recent heartbeat: marked dead, no new IO, blocks below repl.factor re-replicated
– checksum computed on file creation, stored in namespace
– checksum used to check retrieved data, re-read from replica if need
– multiple copies of metadata
– failover-to-standby name node, manual by default

HDFS access
– bin/hdfs script:
– user commands, filesystem shell commands
– administrator commands
– debug commands
– native Java API, org.apache.hadoop.fs.FileSystem
– C API, libhdfs
– WebHDFS REST API
– NFS gateway
– other options: Flume, Sqoop, …

commands example
– hdfs dfs -ls /
– hdfs dfs -mkdir /user/test
dd if=/dev/zero of=sample.txt bs=64M count=16
sudo -u hdfs hdfs dfs -chown -R cloudera /user/test
sudo -u hdfs hdfs dfs -put sample.txt /user/test/
hdfs fsck /user/test/sample.txt
sudo -u hdfs hdfs dfsadmin -report

HDFS API
– org.apache.hadoop.fs.FileSystem
– FSDataInputStream, FSDataOutputStream
– methods: get, open, create
FileSystem fs = FileSystem.get(URI.create(uri), conf);
in = fs.open(new Path(uri)); 
IOUtils.copyBytes(in, System.out, 4096, false);
...

REST API for HDFS
– in hdfs-site.xml: dfs.webhdfs.enabled; dfs.web.authentication.kerberos.principal; ...kerberos.keytab
hdfs getconf -confKey dfs.webhdfs.enabled
– example
service hadoop-httpfs start
curl -i "http://quickstart.cloudera:14000/webhdfs/v1/user/cloudera?user.name=cloudera&op=GETFILESTATUS" 
curl -i -X PUT "http://quickstart.cloudera:14000/webhdfs/v1/user/test?user.name=cloudera&op=MKDIRS&permission=755" 
curl -i "http://quickstart.cloudera:14000/webhdfs/v1/user/test?user.name=cloudera&op=GETCONTENTSUMMARY" 

MapReduce in details

– large number of internet documents that need to be indexed
– bring computations to data
– hide all messy details about cluster failures, communications, synchronisation, whatnot
– model: sweep through all data
– user defines: (key, value) items; mapper; reducer
– Hadoop handles the logistics (distribution (code, data), execution), brings mapper to datanodes
– mapper takes data item (line of text from file-HDFS block); outputs (key, value) pairs
– Hadoop shuffles pairs into partitions, sorts within the partitions and feed sorted pairs to reducers
– reducer take (key, value) and output result (grouping, aggregating, calculating, …)
– trade-offs: programming complexity / Hadoop workload
– common mappers: filter, identity, splitter, …

running MR
– streaming (stdin/stdout pipes)
– API

word count example
– mapper take line; produce pairs (word, 1)
– MR sort mapper output, feed it to reducer
– reducer take (word, 1); if current word == input word => word count += 1; output (word, count) if curr. word != inp.word

principles
– 1 mapper per data split
– 1 reducer per core (processing time, number of output files)
– key-value simplicity (shuffling & grouping)
– good task decomposition: simple and separable mapper; easy consolidation in reducers
– composite keys
– extra info in values
– cascade MR jobs
– bin keys into ranges
– aggregate map output (combiner option, grouping in shuffle phase?)

inner join
– join field must be the key for reducer => mapper output (join field, data)

vectors (matrices) multiplication
– ab = (a1 * b1) + (a2 * b2) …
– mapper output key-values is (index, values)
– N groups are shuffled to reducers, may be slow
– you can use binning: R bins => map output (bin, index, data)
– you pay by reducer complexity

MR limitations
– not iterative
– not interactive
– problem must fit in MR model

Spark

Spark provides great performance advantages over Hadoop MapReduce, especially for iterative algorithms,
thanks to in-memory caching. Also, gives Data Scientists an easier way to write their analysis pipeline in Python and Scala,
even providing interactive shells to play live with data.

features
– on top of HDFS, YARN but can work standalone on any storage
– fast (cache, no intermediate HDFS writes)
– interactive (Scala, Python, R shells)
– iterative
– any DAG workflow

cluster
– worker nodes with Spark Executor (JVM) linked to HDFS data;
some tasks will be send to HDFS nodes
– Cluster Manager (YARN/Standalone): provision/restart workers
– Driver program, DAG processing, action results

PySpark setup
sudo easy_install ipython==1.2.1
export PYSPARK_DRIVER_PYTHON=ipython
pyspark

Concepts
– RDD
– transformations (lazy)
– actions
– caching
– shared variables (one-way transfer)

RDD: resilient distributed dataset (from storage, from RDD transformations)
– divided in partitions (atomic chunks of data), immutable;
– track history of each partition, re-run if necessary;
a_RDD = sc.parallelize(range(10), 3)
b_RDD = a_RDD.map(lambda x: x + 1) # transformation
b_RDD.glom().collect() # action

transformations: lazy operations on RDD
– worker → worker
– RDD = RDD.transform
– pipeline as chain of transformations;
– map, flatMap(one => many), filter, sample, coalesce: narrow;
– narrow (local) vs wide (network) transformations;
– groupByKey: wide transformation; also: reduceByKey, repartition

know the shuffle and avoid it: replace groupByKey with reduceByKey (see the sketch below)
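A small sketch of the difference (run inside pyspark, where sc is defined): both give the same counts, but reduceByKey combines values on the map side before the shuffle.

# shuffle_sketch.py (sketch; same result, different shuffle cost)
pairs = sc.parallelize([('a', 1), ('b', 1), ('a', 1), ('a', 1)])

# wide: ships every ('a', 1) pair across the network, then sums
grouped = pairs.groupByKey().mapValues(sum)

# combines per partition first, ships only partial sums
reduced = pairs.reduceByKey(lambda x, y: x + y)

print(sorted(grouped.collect()))   # [('a', 3), ('b', 1)]
print(sorted(reduced.collect()))   # [('a', 3), ('b', 1)]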

Workflow DAG (directed acyclic graph)
– nodes: RDDs; edges: transformations
– dependency flow (RDD created from …)
– lineage (provenance)
– partition depends on another part.
– tracking for recovery (node fails)
– going back thru DAG and reexecuting what needed
– when narrow => easy to track
– when wide, dependency is more complex
– detecting independend tasks => parallelism

actions: final stage of workflow, worker → driver
– triggers execution of the DAG
– returns result to the driver (or writes to HDFS)
– collect; take; reduce; saveAsTextFile

caching
– mark RDD with .cache()
– lazy
– for iterative algorithm
– to memory, disk, both

broadcast variables
– large variable used in all nodes read-only
– transfer just once per Executor
– torrent-like transfer
– good for config, lookup table, join (if table fits in memory)
conf = sc.broadcast({'a': 'b', 'c': 'd'})
…
conf.value 

accumulators
– write-only on nodes
– collect data across the cluster
acc = sc.accumulator(0) 
def test(x): acc.add(x) 
rdd.foreach(test)
acc.value

I'll publish the code examples tomorrow, in a separate post.

More details on Spark:
my notes on the BerkeleyX course CS190.1x Scalable Machine Learning





original post http://vasnake.blogspot.com/2016/02/hadoop-platform-and-application.html
