Записки программиста, обо всем и ни о чем. Но, наверное, больше профессионального.

2016-02-19

Code for 'Data Manipulation at Scale: Systems and Algorithms'

Как и обещал, выкладываю сниппеты к посту
http://vasnake.blogspot.com/2016/02/data-manipulation-at-scale-systems-and.html
про курс:

Data Manipulation at Scale: Systems and Algorithms
University of Washington



Assignment: Tweet sentiment analysis
– sentiment of each tweet: sum(sentiments for each word)
– sentiment of new word: numPositiveTweets - numNegativeTweets
– word frequency: wordCount / numWords
– happiest state: tweetsForState; max(sumSentiment / numTweets)
– top ten hash tags: sort(tagCount / totalTags)

# frequency.py
import sys
import json
import re

def parseTweetFile(fp):
    """Return iterator, item is a dict object

    fp: iterable of text lines, one JSON document per line.
    Blank (whitespace-only) lines are skipped instead of being handed to
    json.loads, which would raise on them.
    Malformed non-blank lines still raise the json decoding error.
    """
    for line in fp:
        if not line.strip():
            # nothing to decode on an empty line
            continue
        yield json.loads(line)

def tweetText(tweet):
    """Return the value stored under the 'text' key of the tweet,
    or an empty string when the tweet has no such key.
    """
    return tweet['text'] if 'text' in tweet else ''

def getTerms(text):
    """Split text into lowercase words made of ASCII letters only.

    Every run of characters outside A-Z/a-z acts as a separator.
    Empty or None text gives an empty list.
    """
    if not text:
        return []

    # replace each non-letter run with one space, then split on whitespace
    lettersOnly = re.sub('[^A-Za-z]+', ' ', text)
    return lettersOnly.lower().split()

def wordOccurences(tweets):
    """Count terms over all tweets.

    Return tuple (counts, total):
        counts: dict with records term: num of occurences
        total: number of terms seen in all tweets together
    """
    counts = {}
    grandTotal = 0
    for tweet in tweets:
        words = getTerms(tweetText(tweet))
        grandTotal += len(words)
        for word in words:
            counts[word] = counts.get(word, 0) + 1
    return counts, grandTotal

def main():
    """Print 'term frequency' for every term found in the tweet file.

    sys.argv[1]: path to a file with one JSON tweet per line.
    Frequency is the term's occurrence count divided by the total number
    of terms in all tweets.
    """
    # parseTweetFile is lazy, so the counting has to finish while the file
    # is still open; 'with' closes the file even if parsing fails.
    with open(sys.argv[1]) as tweet_file:
        db, total = wordOccurences(parseTweetFile(tweet_file))
    for term, occ in db.items():
        freq = float(occ) / float(total)
        print("%s %f" % (term, freq))

if __name__ == '__main__':
    main()

# happiest_state.py
import sys
import json
import re

def getUSStates():
    """Return (abr, states): two lookup dicts built from the states table.

    abr: lowercased two-letter abbreviation -> abbreviation as written in st
    states: lowercased full state name -> abbreviation as written in st
    """
    st = {
        'AK': 'Alaska',
        'AL': 'Alabama',
        # NOTE(review): the next line is a placeholder, the rest of the
        # table is elided in this listing
...
        'WV': 'West Virginia',
        'WY': 'Wyoming'
    }
    # keys are lowercased so that lookups can be case-insensitive
    abr = [(key.lower(), key) for key, val in st.items()]
    abr = dict(abr)
    states = [(val.lower(), key) for key, val in st.items()]
    states = dict(states)
    return (abr, states)

# module-level lookup tables, used by detectState
ST_ABBR, ST_NAMES = getUSStates()

def parseSentFile(fp):
    """Return dictionary word: score

    fp: iterable of lines, each one is 'term<TAB>integer score'.
    Blank lines (for example a trailing empty line) are skipped.
    A non-blank line without exactly one tab, or with a non-integer score,
    raises ValueError.
    """
    res = {}
    for line in fp:
        if not line.strip():
            # a blank line has no tab and would break the unpacking below
            continue
        term, score = line.split('\t')
        # int() ignores the trailing newline left in score
        res[term] = int(score)
    return res

def totalSentiment(terms, scores):
    """Sum the scores of all terms; a term missing from scores counts as 0.
    """
    return sum(scores.get(term, 0) for term in terms)

def splitFullName(fn):
    """Split 'first, second' into a tuple (first, second).

    Parts are stripped and empty parts are dropped; if anything other than
    exactly two parts remains, return ('', '').
    """
    stripped = (chunk.strip() for chunk in fn.split(','))
    parts = [chunk for chunk in stripped if chunk]
    if len(parts) != 2:
        return ('', '')
    first, second = parts
    return (first, second)

def detectState(tweet):
    """Return two letter USA state name or ''

    tweet: dict; its 'place' record (if any) supplies 'country',
    'country_code' and 'full_name'.
    '' is returned when there is no place, the place is not in the USA,
    or no state can be recognised in full_name.
    """
    place = tweet.get('place', None)
    if not place:
        return ''

    country = place.get('country', '')
    country_code = place.get('country_code', '')
    # the country name was misspelled before, so only the code could match
    if country != 'United States' and country_code != 'US':
        return ''

    # 'full_name' may be present but null; treat that like a missing name
    city, state = splitFullName(place.get('full_name') or '')
    # first try the second part as a state abbreviation, then fall back to
    # the first part as a full state name
    # (presumably names look like 'City, ST' or 'State, USA' - TODO confirm)
    return ST_ABBR.get(state.lower(), '') or ST_NAMES.get(city.lower(), '')

def tweetsForStates(tweets, scores):
    """Accumulate tweet sentiment per state.

    Return dict with records:
        state: (totalScore, numTweets)
    Tweets without text are ignored; tweets with no detected state are
    collected under the '' key.
    """
    stats = {}
    for tweet in tweets:
        text = tweetText(tweet)
        if not text:
            continue
        sentiment = totalSentiment(getTerms(text), scores)
        key = detectState(tweet)
        prevTotal, prevCount = stats.get(key, (0, 0))
        stats[key] = (prevTotal + sentiment, prevCount + 1)
    return stats

def main():
    """Print the state with the highest average tweet sentiment.

    sys.argv[1]: path to the sentiment file ('term<TAB>score' lines)
    sys.argv[2]: path to the tweet file (one JSON tweet per line)
    Prints 'undetected' when no tweet could be attributed to a state.
    """
    with open(sys.argv[1]) as sent_file:
        scores = parseSentFile(sent_file)
    # parseTweetFile is lazy, so aggregate while the file is still open
    with open(sys.argv[2]) as tweet_file:
        db = tweetsForStates(parseTweetFile(tweet_file), scores)

    # start from -inf: sys.float_info.min is the smallest POSITIVE float,
    # so states with a zero or negative average could never be selected
    best = float('-inf')
    happystate = 'undetected'
    for state, (score, num) in db.items():
        if not state:
            # '' collects tweets whose state was not detected
            continue
        aver = float(score) / float(num)
        if aver > best:
            best = aver
            happystate = state
    print(happystate)

if __name__ == '__main__':
    main()

# term_sentiment.py
import sys
import json
import re

def parseSentFile(fp):
    """Return dictionary word: score

    fp: iterable of lines, each one is 'term<TAB>integer score'.
    Blank lines (for example a trailing empty line) are skipped.
    A non-blank line without exactly one tab, or with a non-integer score,
    raises ValueError.
    """
    res = {}
    for line in fp:
        if not line.strip():
            # a blank line has no tab and would break the unpacking below
            continue
        term, score = line.split('\t')
        # int() ignores the trailing newline left in score
        res[term] = int(score)
    return res

def totalSentiment(terms, scores):
    """Sum the scores of all terms; a term missing from scores counts as 0.
    """
    return sum(scores.get(term, 0) for term in terms)

def calcTermsSentiment(tweets, scores):
    """Collect sentiment statistics for terms that have no score yet.

    Return dict with records:
        term: (pos, neg, tot)
    where pos / neg count occurrences of the term in tweets with a positive /
    negative total score, and tot is the sum of those tweet scores.
    """
    stats = {}
    for tweet in tweets:
        words = getTerms(tweetText(tweet))
        tweetScore = totalSentiment(words, scores)
        # the tweet score is the same for every word of the tweet
        posStep = 1 if tweetScore > 0 else 0
        negStep = 1 if tweetScore < 0 else 0
        for word in words:
            if scores.get(word, None) is not None:
                # already scored terms are not re-estimated
                continue
            pos, neg, tot = stats.get(word, (0, 0, 0))
            stats[word] = (pos + posStep, neg + negStep, tot + tweetScore)
    return stats

def termSentiment(pos, neg, tot):
    """Return float: term sentiment

    pos: number of occurrences in positive tweets
    neg: number of occurrences in negative tweets
    tot: accepted for interface compatibility, not used
    The sentiment is pos - neg; the unreachable pos / neg ratio code that
    followed the return statement has been removed.
    """
    return float(pos - neg)

def main():
    """Print 'term sentiment' for every term that is absent from the
    sentiment file.

    sys.argv[1]: path to the sentiment file ('term<TAB>score' lines)
    sys.argv[2]: path to the tweet file (one JSON tweet per line)
    """
    with open(sys.argv[1]) as sent_file:
        scores = parseSentFile(sent_file)
    # parseTweetFile is lazy, so aggregate while the file is still open
    with open(sys.argv[2]) as tweet_file:
        db = calcTermsSentiment(parseTweetFile(tweet_file), scores)
    for term, (pos, neg, tot) in db.items():
        sent = termSentiment(pos, neg, tot)
        print("%s %f" % (term, sent))

if __name__ == '__main__':
    main()

# top_ten.py
import sys
import json
import re
import collections
import heapq

def getTags(tweet):
    """Collect the 'text' of every entry in tweet['entities']['hashtags'].

    A missing 'entities' or 'hashtags' key gives an empty list.
    """
    entities = tweet.get('entities', {})
    return [hashtag['text'] for hashtag in entities.get('hashtags', [])]

def hashtagOccurences(tweets):
    """Count hashtags over all tweets.

    Return tuple (counts, total):
        counts: defaultdict with records hashtag: num of occurences
        total: number of hashtags seen in all tweets together
    """
    counts = collections.defaultdict(int)
    seen = 0
    for tweet in tweets:
        for hashtag in getTags(tweet):
            seen += 1
            counts[hashtag] += 1
    return counts, seen

def main():
    """Print the ten most frequent hashtags as 'tag count' lines.

    sys.argv[1]: path to the tweet file (one JSON tweet per line)
    """
    # parseTweetFile is lazy, so count while the file is still open
    with open(sys.argv[1]) as tweet_file:
        db, _total = hashtagOccurences(parseTweetFile(tweet_file))
    # rank by occurrence count; dividing every count by the same total would
    # not change the order, so the unused frequency computation is gone.
    # A plain one-argument lambda replaces the tuple-parameter lambda,
    # which is a syntax error on Python 3.
    tt = heapq.nlargest(10, db.items(), key=lambda rec: rec[1])
    for tag, occ in tt:
        print("%s %d" % (tag, occ))

if __name__ == '__main__':
    main()

# tweet_sentiment.py
import sys
import json
import re

def getTermScore(term, db):
    """Look the term up in db; a term that is not there scores 0.
    """
    return db.get(term, 0)

def calcTweetSentiment(tweet, db):
    """Sum the db scores of all terms in the tweet text.

    Return None for a record without a 'text' key (not really a tweet),
    otherwise the total score of its terms.
    """
    if 'text' not in tweet:
        return None
    words = getTerms(tweet['text'])
    return sum(getTermScore(word, db) for word in words)

def main():
    """Print one sentiment score per record of the tweet file.

    sys.argv[1]: path to the sentiment file ('term<TAB>score' lines)
    sys.argv[2]: path to the tweet file (one JSON tweet per line)
    Records without text are printed as 0, so the output has a line for
    every parsed record.
    """
    with open(sys.argv[1]) as sent_file:
        scores = parseSentFile(sent_file)
    with open(sys.argv[2]) as tweet_file:
        for tw in parseTweetFile(tweet_file):
            score = calcTweetSentiment(tw, scores)
            # print() with a single argument works on Python 2 and 3 alike
            print(score if score is not None else 0)

if __name__ == '__main__':
    main()


SQL assignment
– sql for RA expression '$\pi_{term}(\sigma_{docid = 10398\_txt\_earn \land count = 1}(frequency))$'
select term from frequency where docid = '10398_txt_earn' and count = 1

– RA '$\pi_{term}(\sigma_{docid = 10398\_txt\_earn \land count = 1}(frequency)) \cup \pi_{term}(\sigma_{docid = 925\_txt\_trade \land count = 1}(frequency))$'
 select term from frequency where docid = '10398_txt_earn' and count = 1
 union 
 select term from frequency where docid = '925_txt_trade' and count = 1

– Write a SQL statement to count the number of unique documents containing the word "law" or containing the word "legal"
select count(*) from (select distinct docid from frequency where term = 'law' or term = 'legal') x

– Write a SQL statement to find all documents that have more than 300 total terms
-- numterms = distinct terms in the document, numwords = total terms
-- (duplicates included); the task filters on the total
select docid, count(term) as numterms, sum(count) as numwords
from frequency
group by docid
having sum(count) > 300

– count the number of unique documents that contain both the word 'transactions' and the word 'world'
-- INTERSECT already removes duplicates; the outer count(*) turns the list
-- of matching docids into the number the task asks for
select count(*) from (
    select docid
    from frequency
    where term = 'transactions'
    INTERSECT
    select docid
    from frequency
    where term = 'world'
) x

– Matrix multiplication in SQL (sparse matrix, may be very fast and efficient in some DB engines):
select A.row_num, B.col_num, sum(A.value * B.value)
from A, B
where A.col_num = B.row_num
group by A.row_num, B.col_num;

– Find the best matching document to the keyword query "washington taxes treasury"
compute the similarity of documents B = A dot Atranspose
Each row of the matrix is a document vector, with one column for every term in the entire corpus
docid : rownum
term : colnum
count : value
create view corpus as
SELECT * FROM frequency
UNION
SELECT 'q' as docid, 'washington' as term, 1 as count 
UNION
SELECT 'q' as docid, 'taxes' as term, 1 as count
UNION 
SELECT 'q' as docid, 'treasury' as term, 1 as count;

# term-document matrix
create view A as
select docid rownum, term colnum, count value
from corpus;

# td matrix transposed
create view B as
select term rownum, docid colnum, count value
from corpus;

# matrix C = A dot B
create view C as
select A.rownum, B.colnum, sum(A.value * B.value) value
from A, B
where A.colnum = B.rownum and A.rownum < B.colnum
group by A.rownum, B.colnum;

# find max similarity score for 'q' doc 
select rownum, max(value)
from C
where rownum = 'q' or colnum = 'q';


MapReduce assignment
– example
import MapReduce
# shared MapReduce object: mapper and reducer emit through it,
# and the entry point runs the job with mr.execute
mr = MapReduce.MapReduce()

def mapper(record):
    # record[0]: document identifier (not used for word counting)
    # record[1]: document contents
    docid, text = record[0], record[1]
    for word in text.split():
        # append 1 to the list kept under the word's key in the
        # intermediate storage
        mr.emit_intermediate(word, 1)

def reducer(key, list_of_values):
    # key: word
    # list_of_values: occurrence counts collected for the word by the mapper
    # append the (word, total) tuple to the result list
    mr.emit((key, sum(list_of_values)))

if __name__ == '__main__':
    # this example imports only MapReduce at its top, so bring sys in here
    import sys
    # 'with' closes the input file once the job has finished or failed
    with open(sys.argv[1]) as inputdata:
        mr.execute(inputdata, mapper, reducer)

– Create an Inverted index. Given a set of documents, an inverted index is a dictionary where each word is associated with a list of the document identifiers in which that word appears
def mapper((docid, text)) ...
words = text.split()
for w in words: mr.emit_intermediate(w, docid)
def reducer(word, values) ...
docs = distinct list(values)
mr.emit((word, docs))

– Implement a relational join as a MapReduce query
def mapper((tabname, joinkey, tabrow)) ...
mr.emit_intermediate(joinkey, (tabname, tabrow))
def reducer(key, values) ...
if len(values) >= 2: ...
masterrow = getMaster(values)
detailrows = getDetails(values)
for line in detailrows: mr.emit(masterrow + line)

– Consider a simple social network dataset consisting of a set of key-value pairs (person, friend) representing a friend relationship between two people. Count the number of friends for each person
def mapper((pers, frnd)) ...
mr.emit_intermediate(pers, frnd)
def reducer(pers, friends) ...
mr.emit(len distinct list(friends))

– Generate a list of all non-symmetric friend relationships
def mapper((pers, frnd)) ...
mr.emit_intermediate((pers, frnd), frnd)
mr.emit_intermediate((frnd, pers), '')
def reducer((name1, name2), values) ...
if name2 not in values: mr.emit((name1, name2))

– Consider a set of key-value pairs where each key is sequence id and each value is a string of nucleotides, e.g., GCTTCCGAAATGCTCGAA.... Remove the last 10 characters from each string of nucleotides, then remove any duplicates generated
def mapper((sid, nucs)) ...
mr.emit_intermediate(nucs[:-10], sid)
def reducer(key, val) ...
mr.emit(key)

– Design a MapReduce algorithm to compute the matrix multiplication A x B
def mapper((mname, rownum, colnum, val)) ...
if mname == 'a': ...
for col in 0..BCOLS: ...
mr.emit_intermediate((rownum, col), (mname, colnum, val))
if mname == 'b': ...
for row in 0..AROWS: ...
mr.emit_intermediate((row, colnum), (mname, rownum, val))
def reducer((row, col), values) ...
for item in values: ...
_, idx, value = item
res[idx].append(value)
for idx in res.keys(): ...
a, b = res[idx]
res[idx] = a * b
val = sum(res.values())
mr.emit((row, col, val))




original post http://vasnake.blogspot.com/2016/02/code-for-data-manipulation-at-scale.html

Комментариев нет:

Отправить комментарий

Архив блога

Ярлыки

linux (241) python (191) citation (186) web-develop (170) gov.ru (159) video (124) бытовуха (115) sysadm (100) GIS (97) Zope(Plone) (88) бурчалки (84) Book (83) programming (82) грабли (77) Fun (76) development (73) windsurfing (72) Microsoft (64) hiload (62) internet provider (57) opensource (57) security (57) опыт (55) movie (52) Wisdom (51) ML (47) driving (45) hardware (45) language (45) money (42) JS (41) curse (40) bigdata (39) DBMS (38) ArcGIS (34) history (31) PDA (30) howto (30) holyday (29) Google (27) Oracle (27) tourism (27) virtbox (27) health (26) vacation (24) AI (23) Autodesk (23) SQL (23) humor (23) Java (22) knowledge (22) translate (20) CSS (19) cheatsheet (19) hack (19) Apache (16) Manager (15) web-browser (15) Никонов (15) Klaipeda (14) functional programming (14) happiness (14) music (14) todo (14) PHP (13) course (13) scala (13) weapon (13) HTTP. Apache (12) SSH (12) frameworks (12) hero (12) im (12) settings (12) HTML (11) SciTE (11) USA (11) crypto (11) game (11) map (11) HTTPD (9) ODF (9) Photo (9) купи/продай (9) benchmark (8) documentation (8) 3D (7) CS (7) DNS (7) NoSQL (7) cloud (7) django (7) gun (7) matroska (7) telephony (7) Microsoft Office (6) VCS (6) bluetooth (6) pidgin (6) proxy (6) Donald Knuth (5) ETL (5) NVIDIA (5) Palanga (5) REST (5) bash (5) flash (5) keyboard (5) price (5) samba (5) CGI (4) LISP (4) RoR (4) cache (4) car (4) display (4) holywar (4) nginx (4) pistol (4) spark (4) xml (4) Лебедев (4) IDE (3) IE8 (3) J2EE (3) NTFS (3) RDP (3) holiday (3) mount (3) Гоблин (3) кухня (3) урюк (3) AMQP (2) ERP (2) IE7 (2) NAS (2) Naudoc (2) PDF (2) address (2) air (2) british (2) coffee (2) fitness (2) font (2) ftp (2) fuckup (2) messaging (2) notify (2) sharepoint (2) ssl/tls (2) stardict (2) tests (2) tunnel (2) udev (2) APT (1) CRUD (1) Canyonlands (1) Cyprus (1) DVDShrink (1) Jabber (1) K9Copy (1) Matlab (1) Portugal (1) VBA (1) WD My Book (1) autoit (1) bike (1) cannabis (1) chat (1) concurrent (1) dbf (1) ext4 (1) idioten 
(1) join (1) krusader (1) license (1) life (1) migration (1) mindmap (1) navitel (1) pneumatic weapon (1) quiz (1) regexp (1) robot (1) science (1) serialization (1) spatial (1) tie (1) vim (1) Науру (1) крысы (1) налоги (1) пианино (1)