As promised, here are the code snippets for the post
http://vasnake.blogspot.com/2016/02/data-manipulation-at-scale-systems-and.html
about the course: Data Manipulation at Scale: Systems and Algorithms (University of Washington).
Assignment: Tweet sentiment analysis
– sentiment of each tweet: sum(sentiments for each word)
– sentiment of new word: numPositiveTweets - numNegativeTweets
– word frequency: wordCount / numWords
– happiest state: tweetsForState; max(sumSentiment / numTweets)
– top ten hash tags: sort(tagCount / totalTags)
# frequency.py
import sys
import json
import re

def parseTweetFile(fp):
    """Return iterator, item is a dict object
    """
    for line in fp:
        tw = json.loads(line)
        yield tw

def tweetText(tweet):
    """Return text from tweet or ''
    """
    return tweet.get('text', '')

def getTerms(text):
    """Return list of words. Each word is lowercase
    and cleared from non alphabet symbols.
    """
    if not text:
        return []
    pat = '[^A-Za-z]+'
    clean = re.sub(pat, ' ', text)
    lst = clean.split()
    res = [x.strip().lower() for x in lst]
    return res

def wordOccurences(tweets):
    """Return dict with records: term: num of occurences
    """
    res = {}
    total = 0
    for tw in tweets:
        text = tweetText(tw)
        terms = getTerms(text)
        for term in terms:
            cnt = res.get(term, 0)
            res[term] = cnt + 1
            total += 1
    return res, total

def main():
    tweet_file = open(sys.argv[1])
    tweets = parseTweetFile(tweet_file)
    db, total = wordOccurences(tweets)
    for rec in db.items():
        term, occ = rec
        freq = float(occ) / float(total)
        print("%s %f" % (term, freq))

if __name__ == '__main__':
    main()
# happiest_state.py
import sys
import json
import re
from frequency import parseTweetFile, tweetText, getTerms  # helpers defined in frequency.py above

def getUSStates():
    """Return (abr, states)
    """
    st = {
        'AK': 'Alaska',
        'AL': 'Alabama',
        # ... (remaining states elided in the original post)
        'WV': 'West Virginia',
        'WY': 'Wyoming'
    }
    abr = [(key.lower(), key) for key, val in st.items()]
    abr = dict(abr)
    states = [(val.lower(), key) for key, val in st.items()]
    states = dict(states)
    return (abr, states)

ST_ABBR, ST_NAMES = getUSStates()

def parseSentFile(fp):
    """Return dictionary word: score
    """
    res = {}
    for line in fp:
        term, score = line.split('\t')
        res[term] = int(score)
    return res

def totalSentiment(terms, scores):
    """Return sentiment score for word list
    """
    res = 0
    for term in terms:
        score = scores.get(term, 0)
        res += score
    return res

def splitFullName(fn):
    """Return tuple (city, state) or ('', '')
    """
    res = ('', '')
    lst = fn.split(',')
    cleanLst = [x.strip() for x in lst if x.strip()]
    if len(cleanLst) == 2:
        res = (cleanLst[0], cleanLst[1])
    return res

def detectState(tweet):
    """Return two letter USA state name or ''
    """
    res = ''
    place = tweet.get('place', None)
    if place is not None:
        country = place.get('country', '')
        country_code = place.get('country_code', '')
        full_name = place.get('full_name', '')
        if country == 'United States' or country_code == 'US':
            fullName = splitFullName(full_name)
            city, state = fullName
            res = ST_ABBR.get(state.lower(), '')
            if not res:
                res = ST_NAMES.get(city.lower(), '')
    return res

def tweetsForStates(tweets, scores):
    """Return dict with records: state: (totalScore, numTweets)
    """
    res = {}
    for tw in tweets:
        text = tweetText(tw)
        if text:
            terms = getTerms(text)
            twScore = totalSentiment(terms, scores)
            state = detectState(tw)
            tot, num = res.get(state, (0, 0))
            res[state] = (tot + twScore, num + 1)
    return res

def main():
    sent_file = open(sys.argv[1])
    tweet_file = open(sys.argv[2])
    scores = parseSentFile(sent_file)
    tweets = parseTweetFile(tweet_file)
    db = tweetsForStates(tweets, scores)
    curr = -sys.float_info.max  # average sentiment can be negative
    happystate = 'undetected'
    for rec in db.items():
        state, data = rec
        score, num = data
        aver = float(score) / float(num)
        if aver > curr and state:
            curr = aver
            happystate = state
    print happystate

if __name__ == '__main__':
    main()
# term_sentiment.py
import sys
import json
import re
from frequency import parseTweetFile, tweetText, getTerms  # helpers defined in frequency.py above

def parseSentFile(fp):
    """Return dictionary word: score
    """
    res = {}
    for line in fp:
        term, score = line.split('\t')
        res[term] = int(score)
    return res

def totalSentiment(terms, scores):
    """Return sentiment score for word list
    """
    res = 0
    for term in terms:
        score = scores.get(term, 0)
        res += score
    return res

def calcTermsSentiment(tweets, scores):
    """Return dict with records: term: (pos, neg, tot)
    """
    res = {}
    for tw in tweets:
        text = tweetText(tw)
        terms = getTerms(text)
        twScore = totalSentiment(terms, scores)
        for term in terms:
            score = scores.get(term, None)
            if score is None:  # only terms missing from the sentiment file
                pos, neg, tot = res.get(term, (0, 0, 0))
                if twScore > 0:
                    pos += 1
                elif twScore < 0:
                    neg += 1
                tot += twScore
                res[term] = (pos, neg, tot)
    return res

def termSentiment(pos, neg, tot):
    """Return float: term sentiment,
    number of positive tweets minus number of negative tweets
    """
    return float(pos - neg)

def main():
    sent_file = open(sys.argv[1])
    tweet_file = open(sys.argv[2])
    scores = parseSentFile(sent_file)
    tweets = parseTweetFile(tweet_file)
    db = calcTermsSentiment(tweets, scores)
    for rec in db.items():
        term, counts = rec
        pos, neg, tot = counts
        sent = termSentiment(pos, neg, tot)
        print("%s %f" % (term, sent))

if __name__ == '__main__':
    main()
# top_ten.py
import sys
import json
import re
import collections
import heapq
from frequency import parseTweetFile  # helper defined in frequency.py above

def getTags(tweet):
    """Return list of hashtags or []
    """
    ents = tweet.get('entities', {})
    tags = ents.get('hashtags', [])
    res = [x['text'] for x in tags]
    return res

def hashtagOccurences(tweets):
    """Return dict with records: hashtag: num of occurences
    """
    res = collections.defaultdict(int)
    total = 0
    for tw in tweets:
        tags = getTags(tw)
        for tag in tags:
            res[tag] += 1
            total += 1
    return res, total

def main():
    tweet_file = open(sys.argv[1])
    tweets = parseTweetFile(tweet_file)
    db, total = hashtagOccurences(tweets)
    tagcount = []
    for rec in db.items():
        tag, occ = rec
        tagcount.append((tag, occ))
    # ranking by occurrence count gives the same top ten as ranking by frequency
    tt = heapq.nlargest(10, tagcount, key=lambda tag_count: tag_count[1])
    for rec in tt:
        tag, occ = rec
        print("%s %d" % (tag, occ))

if __name__ == '__main__':
    main()
# tweet_sentiment.py
import sys
import json
import re
from frequency import parseTweetFile, getTerms  # helpers defined in frequency.py above
from term_sentiment import parseSentFile        # defined in term_sentiment.py above

def getTermScore(term, db):
    """Return term sentiment score from db
    """
    res = db.get(term, 0)
    return res

def calcTweetSentiment(tweet, db):
    """Return int: tweet sentiment score.
    If tweet is not really a tweet (no text in it), return None
    """
    res = None
    if 'text' in tweet:
        res = 0
        text = tweet['text']
        terms = getTerms(text)
        for term in terms:
            score = getTermScore(term, db)
            res += score
    return res

def main():
    sent_file = open(sys.argv[1])
    tweet_file = open(sys.argv[2])
    scores = parseSentFile(sent_file)
    tweets = parseTweetFile(tweet_file)
    for tw in tweets:
        score = calcTweetSentiment(tw, scores)
        if score is not None:
            print score
        else:
            print 0

if __name__ == '__main__':
    main()
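For reference, each script takes the sentiment lexicon and/or the tweet dump as command-line arguments. The file names below are placeholders (the assignment uses the AFINN-111 word list and whatever file the Twitter streaming script saved the tweets to):

python tweet_sentiment.py AFINN-111.txt output.txt
python term_sentiment.py AFINN-111.txt output.txt
python frequency.py output.txt
python happiest_state.py AFINN-111.txt output.txt
python top_ten.py output.txt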
SQL assignment
– SQL for the RA expression 'πterm(σdocid=10398_txt_earn and count=1(frequency))'
select term from frequency where docid = '10398_txt_earn' and count = 1
– SQL for the RA expression 'πterm(σdocid=10398_txt_earn and count=1(frequency)) U πterm(σdocid=925_txt_trade and count=1(frequency))'
select term from frequency where docid = '10398_txt_earn' and count = 1
union
select term from frequency where docid = '925_txt_trade' and count = 1
– Write a SQL statement to count the number of unique documents containing the word "law" or the word "legal"
select count(*) from (select distinct docid from frequency where term = 'law' or term = 'legal') x
– Write a SQL statement to find all documents that have more than 300 total terms
select docid, count(term) as numterms, sum(count) numwords from frequency group by docid having numterms > 300
– Count the number of unique documents that contain both the word 'transactions' and the word 'world'
select distinct docid from frequency where term = 'transactions'
INTERSECT
select distinct docid from frequency where term = 'world'
– Matrix multiplication in SQL (sparse matrix, may be very fast and efficient in some DB engines):
select A.row_num, B.col_num, sum(A.value * B.value) from A, B where A.col_num = B.row_num group by A.row_num, B.col_num;
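For context, a minimal sketch of the tables this query assumes (the names A, B, row_num, col_num, value are taken from the query above; only non-zero cells are stored, which is what makes the representation sparse):

create table A (row_num integer, col_num integer, value integer);
create table B (row_num integer, col_num integer, value integer);
insert into A values (0, 0, 1), (0, 1, 2);
insert into B values (0, 0, 3), (1, 0, 4);
-- the join-and-group query above then yields result cell (0, 0) = 1*3 + 2*4 = 11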
– Find the best matching document to the keyword query "washington taxes treasury".
Compute the similarity of documents: B = A dot A-transpose.
Each row of the matrix is a document vector, with one column for every term in the entire corpus:
docid : rownum
term : colnum
count : value
create view corpus as
    SELECT * FROM frequency
    UNION SELECT 'q' as docid, 'washington' as term, 1 as count
    UNION SELECT 'q' as docid, 'taxes' as term, 1 as count
    UNION SELECT 'q' as docid, 'treasury' as term, 1 as count;

-- term-document matrix
create view A as
    select docid rownum, term colnum, count value from corpus;

-- td matrix transposed
create view B as
    select term rownum, docid colnum, count value from corpus;

-- matrix C = A dot B
create view C as
    select A.rownum, B.colnum, sum(A.value * B.value) value
    from A, B
    where A.colnum = B.rownum and A.rownum < B.colnum
    group by A.rownum, B.colnum;

-- find max similarity score for the 'q' doc
select rownum, max(value) from C where rownum = 'q' or colnum = 'q';
MapReduce assignment
– Example: word count
import sys
import MapReduce

mr = MapReduce.MapReduce()

def mapper(record):
    # key: document identifier
    # value: document contents
    key = record[0]
    value = record[1]
    words = value.split()
    for w in words:
        # append '1' to list, list is dict value under 'w' key in storage
        mr.emit_intermediate(w, 1)

def reducer(key, list_of_values):
    # key: word
    # value: list of occurrence counts from intermediate res.
    total = 0
    for v in list_of_values:
        total += v
    # append tuple to result list
    mr.emit((key, total))

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)
– Create an inverted index. Given a set of documents, an inverted index is a dictionary where each word is associated with a list of the document identifiers in which that word appears.
def mapper((docid, text)):
    words = text.split()
    for w in words:
        mr.emit_intermediate(w, docid)

def reducer(word, values):
    docs = distinct list(values)
    mr.emit((word, docs))
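A runnable version of that sketch under the course's MapReduce.py stub (same emit_intermediate / emit / execute calls as in the word-count example above); the [docid, text] record layout is an assumption based on that example:

import sys
import MapReduce

mr = MapReduce.MapReduce()

def mapper(record):
    # record: [document id, document text]
    docid, text = record[0], record[1]
    for w in text.split():
        mr.emit_intermediate(w, docid)

def reducer(word, docids):
    # keep every document id once, preserving first-seen order
    seen = []
    for d in docids:
        if d not in seen:
            seen.append(d)
    mr.emit((word, seen))

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)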
– Implement a relational join as a MapReduce query.
def mapper((tabname, joinkey, tabrow)):
    mr.emit_intermediate(joinkey, (tabname, tabrow))

def reducer(key, values):
    if len(values) >= 2:
        masterrow = getMaster(values)
        detailrows = getDetails(values)
        for line in detailrows:
            mr.emit(masterrow + line)
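A runnable sketch of the join with the same stub. I assume each record is a whole table row whose first field is the table name and second field is the join key; the table names 'order' (master) and 'line_item' (detail) are placeholders for whatever the input data uses:

import sys
import MapReduce

mr = MapReduce.MapReduce()

def mapper(record):
    # record: full table row; record[0] is the table name, record[1] the join key
    joinkey = record[1]
    mr.emit_intermediate(joinkey, record)

def reducer(key, rows):
    # per join key: one master row, zero or more detail rows
    masters = [r for r in rows if r[0] == 'order']      # assumed master table name
    details = [r for r in rows if r[0] == 'line_item']  # assumed detail table name
    for m in masters:
        for d in details:
            mr.emit(m + d)

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)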
– Consider a simple social network dataset consisting of a set of key-value pairs (person, friend) representing a friend relationship between two people. Count the number of friends for each person.
def mapper((pers, frnd)):
    mr.emit_intermediate(pers, frnd)

def reducer(pers, friends):
    mr.emit(len distinct list(friends))
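A runnable version, assuming each input record is a [person, friend] pair:

import sys
import MapReduce

mr = MapReduce.MapReduce()

def mapper(record):
    # record: [person, friend]
    person, friend = record[0], record[1]
    mr.emit_intermediate(person, friend)

def reducer(person, friends):
    # count distinct friends of this person
    mr.emit((person, len(set(friends))))

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)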
– Generate a list of all non-symmetric friend relationships.
def mapper((pers, frnd)):
    mr.emit_intermediate((pers, frnd), frnd)
    mr.emit_intermediate((frnd, pers), '')

def reducer((name1, name2), values):
    if name2 not in values:
        mr.emit((name1, name2))
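An alternative, runnable take on the same task: group both directions of a pair under one order-independent key, so a pair seen in only one direction is non-symmetric. Emitting both orderings of each such pair is an assumption about the expected output format:

import sys
import MapReduce

mr = MapReduce.MapReduce()

def mapper(record):
    # record: [person, friend]; key is the same for (A, B) and (B, A)
    person, friend = record[0], record[1]
    key = tuple(sorted((person, friend)))
    mr.emit_intermediate(key, (person, friend))

def reducer(key, pairs):
    # only one direction seen -> the relationship is non-symmetric
    if len(set(pairs)) == 1:
        person, friend = pairs[0]
        mr.emit((person, friend))
        mr.emit((friend, person))

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)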
– Consider a set of key-value pairs where each key is a sequence id and each value is a string of nucleotides, e.g., GCTTCCGAAATGCTCGAA.... Remove the last 10 characters from each string of nucleotides, then remove any duplicates generated.
def mapper((sid, nucs)):
    mr.emit_intermediate(nucs[:-10], sid)

def reducer(key, val):
    mr.emit(key)
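A runnable version: the trimmed string itself is the intermediate key, so the reducer sees each distinct trimmed sequence exactly once and duplicates collapse automatically:

import sys
import MapReduce

mr = MapReduce.MapReduce()

def mapper(record):
    # record: [sequence id, nucleotide string]
    nucs = record[1]
    mr.emit_intermediate(nucs[:-10], 1)

def reducer(trimmed, values):
    # one output per distinct trimmed sequence
    mr.emit(trimmed)

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)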
– Design a MapReduce algorithm to compute the matrix multiplication A x B.
def mapper((mname, rownum, colnum, val)):
    if mname == 'a':
        for col in 0..BCOLS:
            mr.emit_intermediate((rownum, col), (mname, colnum, val))
    if mname == 'b':
        for row in 0..AROWS:
            mr.emit_intermediate((row, colnum), (mname, rownum, val))

def reducer((row, col), values):
    for item in values:
        _, idx, value = item
        res[idx].append(value)
    for idx in res.keys():
        a, b = res[idx]
        res[idx] = a * b
    val = sum(res.values())
    mr.emit((row, col, val))
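A runnable version of that matrix-multiplication sketch. The record layout [matrix_name, row, col, value] with names 'a' and 'b', and the hard-coded result dimensions, are assumptions (the assignment data uses small fixed-size matrices; adjust A_ROWS and B_COLS to the input):

import sys
import MapReduce

mr = MapReduce.MapReduce()

# result dimensions: rows of A x columns of B, assumed known up front
A_ROWS = 5
B_COLS = 5

def mapper(record):
    # record: [matrix name, row, col, value]
    name, row, col, val = record
    if name == 'a':
        # A[row, col] contributes to every result cell in its row
        for k in range(B_COLS):
            mr.emit_intermediate((row, k), ('a', col, val))
    else:
        # B[row, col] contributes to every result cell in its column
        for k in range(A_ROWS):
            mr.emit_intermediate((k, col), ('b', row, val))

def reducer(key, values):
    # pair up A and B entries that share the inner index, multiply, then sum;
    # missing entries are zero cells of the sparse matrices and contribute nothing
    a = {}
    b = {}
    for name, idx, val in values:
        if name == 'a':
            a[idx] = val
        else:
            b[idx] = val
    total = sum(a[i] * b[i] for i in a if i in b)
    row, col = key
    mr.emit((row, col, total))

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)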
original post http://vasnake.blogspot.com/2016/02/code-for-data-manipulation-at-scale.html