As promised, here are the code snippets for the post
http://vasnake.blogspot.com/2016/02/data-manipulation-at-scale-systems-and.html
about the course:
Data Manipulation at Scale: Systems and Algorithms
University of Washington
Assignment: Tweets sentiment analysis
– sentiment of each tweet: sum(sentiments for each word)
– sentiment of new word: numPositiveTweets - numNegativeTweets
– word frequency: wordCount / numWords
– happiest state: tweetsForState; max(sumSentiment / numTweets)
– top ten hash tags: sort(tagCount / totalTags)
# frequency.py
import sys
import json
import re
def parseTweetFile(fp):
    """Return iterator, item is a dict object
    """
    for line in fp:
        tw = json.loads(line)
        yield tw

def tweetText(tweet):
    """Return text from tweet or ''
    """
    return tweet.get('text', '')

def getTerms(text):
    """Return list of words.
    Each word is lowercased and stripped of non-alphabetic symbols.
    """
    if not text:
        return []
    pat = '[^A-Za-z]+'
    clean = re.sub(pat, ' ', text)
    lst = clean.split()
    res = [x.strip().lower() for x in lst]
    return res

def wordOccurences(tweets):
    """Return dict with records:
    term: num of occurrences
    """
    res = {}
    total = 0
    for tw in tweets:
        text = tweetText(tw)
        terms = getTerms(text)
        for term in terms:
            cnt = res.get(term, 0)
            res[term] = cnt + 1
            total += 1
    return res, total

def main():
    tweet_file = open(sys.argv[1])
    tweets = parseTweetFile(tweet_file)
    db, total = wordOccurences(tweets)
    for rec in db.items():
        term, occ = rec
        freq = float(occ) / float(total)
        print("%s %f" % (term, freq))

if __name__ == '__main__':
    main()
# happiest_state.py
import sys
import json
import re
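# parseTweetFile, tweetText and getTerms are the same helpers as in frequency.py above (omitted here)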
def getUSStates():
    """Return (abr, states)
    """
    st = {
        'AK': 'Alaska',
        'AL': 'Alabama',
        ...
        'WV': 'West Virginia',
        'WY': 'Wyoming'
    }
    abr = [(key.lower(), key) for key, val in st.items()]
    abr = dict(abr)
    states = [(val.lower(), key) for key, val in st.items()]
    states = dict(states)
    return (abr, states)
ST_ABBR, ST_NAMES = getUSStates()
def parseSentFile(fp):
    """Return dictionary word: score
    """
    res = {}
    for line in fp:
        term, score = line.split('\t')
        res[term] = int(score)
    return res

def totalSentiment(terms, scores):
    """Return sentiment score for word list
    """
    res = 0
    for term in terms:
        score = scores.get(term, 0)
        res += score
    return res

def splitFullName(fn):
    """Return tuple (city, state) or ('', '')
    """
    res = ('', '')
    lst = fn.split(',')
    cleanLst = [x.strip() for x in lst if x.strip()]
    if len(cleanLst) == 2:
        res = (cleanLst[0], cleanLst[1])
    return res

def detectState(tweet):
    """Return two letter USA state name or ''
    """
    res = ''
    place = tweet.get('place', None)
    if place is not None:
        country = place.get('country', '')
        country_code = place.get('country_code', '')
        full_name = place.get('full_name', '')
        if country == 'United States' or country_code == 'US':
            fullName = splitFullName(full_name)
            city, state = fullName
            res = ST_ABBR.get(state.lower(), '')
            if not res:
                res = ST_NAMES.get(city.lower(), '')
    return res
def tweetsForStates(tweets, scores):
    """Return dict with records:
    state: (totalScore, numTweets)
    """
    res = {}
    for tw in tweets:
        text = tweetText(tw)
        if text:
            terms = getTerms(text)
            twScore = totalSentiment(terms, scores)
            state = detectState(tw)
            tot, num = res.get(state, (0, 0))
            res[state] = (tot + twScore, num + 1)
    return res
def main():
    sent_file = open(sys.argv[1])
    tweet_file = open(sys.argv[2])
    scores = parseSentFile(sent_file)
    tweets = parseTweetFile(tweet_file)
    db = tweetsForStates(tweets, scores)
    curr = float('-inf')  # average sentiment can be negative
    happystate = 'undetected'
    for rec in db.items():
        state, data = rec
        score, num = data
        aver = float(score) / float(num)
        if aver > curr and state:
            curr = aver
            happystate = state
    print(happystate)

if __name__ == '__main__':
    main()
# term_sentiment.py
import sys
import json
import re
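# parseTweetFile, tweetText and getTerms are the same helpers as in frequency.py above (omitted here)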
def parseSentFile(fp):
    """Return dictionary word: score
    """
    res = {}
    for line in fp:
        term, score = line.split('\t')
        res[term] = int(score)
    return res

def totalSentiment(terms, scores):
    """Return sentiment score for word list
    """
    res = 0
    for term in terms:
        score = scores.get(term, 0)
        res += score
    return res
def calcTermsSentiment(tweets, scores):
    """Return dict with records:
    term: (pos, neg, tot)
    """
    res = {}
    for tw in tweets:
        text = tweetText(tw)
        terms = getTerms(text)
        twScore = totalSentiment(terms, scores)
        for term in terms:
            score = scores.get(term, None)
            if score is None:  # a new word, not in the sentiment dictionary
                pos, neg, tot = res.get(term, (0, 0, 0))
                if twScore > 0:
                    pos += 1
                elif twScore < 0:
                    neg += 1
                tot += twScore
                res[term] = (pos, neg, tot)
    return res

def termSentiment(pos, neg, tot):
    """Return float: term sentiment,
    numPositiveTweets - numNegativeTweets
    """
    return float(pos - neg)
def main():
    sent_file = open(sys.argv[1])
    tweet_file = open(sys.argv[2])
    scores = parseSentFile(sent_file)
    tweets = parseTweetFile(tweet_file)
    db = calcTermsSentiment(tweets, scores)
    for rec in db.items():
        term, counts = rec
        pos, neg, tot = counts
        sent = termSentiment(pos, neg, tot)
        print("%s %f" % (term, sent))

if __name__ == '__main__':
    main()
# top_ten.py
import sys
import json
import re
import collections
import heapq
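# parseTweetFile is the same helper as in frequency.py above (omitted here)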
def getTags(tweet):
    """Return list of hashtags or []
    """
    ents = tweet.get('entities', {})
    tags = ents.get('hashtags', [])
    res = [x['text'] for x in tags]
    return res

def hashtagOccurences(tweets):
    """Return dict with records:
    hashtag: num of occurrences
    """
    res = collections.defaultdict(int)
    total = 0
    for tw in tweets:
        tags = getTags(tw)
        for tag in tags:
            res[tag] += 1
            total += 1
    return res, total
def main():
    tweet_file = open(sys.argv[1])
    tweets = parseTweetFile(tweet_file)
    db, total = hashtagOccurences(tweets)
    tagfreq = []
    for rec in db.items():
        tag, occ = rec
        tagfreq.append((tag, occ))
    # top ten hashtags by occurrence count
    tt = heapq.nlargest(10, tagfreq, key=lambda rec: rec[1])
    for rec in tt:
        tag, occ = rec
        print("%s %d" % (tag, occ))

if __name__ == '__main__':
    main()
# tweet_sentiment.py
import sys
import json
import re
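# getTerms and parseTweetFile are the helpers from frequency.py, parseSentFile is the one from happiest_state.py (omitted here)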
def getTermScore(term, db):
    """Return term sentiment score from db
    """
    res = db.get(term, 0)
    return res

def calcTweetSentiment(tweet, db):
    """Return int: tweet sentiment score.
    If tweet is not really a tweet (no text in it), return None
    """
    res = None
    if 'text' in tweet:
        res = 0
        text = tweet['text']
        terms = getTerms(text)
        for term in terms:
            score = getTermScore(term, db)
            res += score
    return res
def main():
    sent_file = open(sys.argv[1])
    tweet_file = open(sys.argv[2])
    scores = parseSentFile(sent_file)
    tweets = parseTweetFile(tweet_file)
    for tw in tweets:
        score = calcTweetSentiment(tw, scores)
        if score is not None:
            print(score)
        else:
            print(0)

if __name__ == '__main__':
    main()
SQL assignment
– SQL for the RA expression 'π_term(σ_{docid=10398_txt_earn and count=1}(frequency))'
select term from frequency where docid = '10398_txt_earn' and count = 1
– RA expression 'π_term(σ_{docid=10398_txt_earn and count=1}(frequency)) ∪ π_term(σ_{docid=925_txt_trade and count=1}(frequency))'
select term from frequency where docid = '10398_txt_earn' and count = 1 union select term from frequency where docid = '925_txt_trade' and count = 1
– Write a SQL statement to count the number of unique documents containing the word "law" or containing the word "legal"
select count(*) from (select distinct docid from frequency where term = 'law' or term = 'legal') x
– Write a SQL statement to find all documents that have more than 300 total terms
select docid, count(term) as numterms, sum(count) numwords from frequency group by docid having numterms > 300
– Count the number of unique documents that contain both the word 'transactions' and the word 'world'
select distinct docid from frequency where term = 'transactions' INTERSECT select distinct docid from frequency where term = 'world'
– Matrix multiplication in SQL (sparse matrices; may be very fast and efficient in some DB engines):
select A.row_num, B.col_num, sum(A.value * B.value) from A, B where A.col_num = B.row_num group by A.row_num, B.col_num;
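As a quick sanity check, the query can be run against two tiny sparse matrices in an in-memory SQLite database; the table layout (row_num, col_num, value) just mirrors the query above, and the sample values are made up:

import sqlite3

conn = sqlite3.connect(':memory:')
cur = conn.cursor()
# two sparse matrices stored as (row, col, value) triples; zero cells are simply absent
cur.execute("create table A (row_num int, col_num int, value int)")
cur.execute("create table B (row_num int, col_num int, value int)")
# A = [[1, 2], [0, 3]], B = [[4, 0], [5, 6]]
cur.executemany("insert into A values (?, ?, ?)",
                [(0, 0, 1), (0, 1, 2), (1, 1, 3)])
cur.executemany("insert into B values (?, ?, ?)",
                [(0, 0, 4), (1, 0, 5), (1, 1, 6)])
cur.execute("""select A.row_num, B.col_num, sum(A.value * B.value)
               from A, B
               where A.col_num = B.row_num
               group by A.row_num, B.col_num""")
print(cur.fetchall())  # [(0, 0, 14), (0, 1, 12), (1, 0, 15), (1, 1, 18)], order not guaranteed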
– Find the best matching document to the keyword query "washington taxes treasury".
Compute the document similarity matrix B = A * A_transposed, where each row of A is a document vector with one column for every term in the entire corpus:
docid : rownum
term : colnum
count : value
create view corpus as
    SELECT * FROM frequency
    UNION SELECT 'q' as docid, 'washington' as term, 1 as count
    UNION SELECT 'q' as docid, 'taxes' as term, 1 as count
    UNION SELECT 'q' as docid, 'treasury' as term, 1 as count;

# term-document matrix
create view A as select docid rownum, term colnum, count value from corpus;

# td matrix transposed
create view B as select term rownum, docid colnum, count value from corpus;

# matrix C = A dot B
create view C as
    select A.rownum, B.colnum, sum(A.value * B.value) value
    from A, B
    where A.colnum = B.rownum and A.rownum < B.colnum
    group by A.rownum, B.colnum;

# find max similarity score for the 'q' document
select rownum, max(value) from C where rownum = 'q' or colnum = 'q';
MapReduce assignment
– example
import sys
import MapReduce

mr = MapReduce.MapReduce()

def mapper(record):
    # key: document identifier
    # value: document contents
    key = record[0]
    value = record[1]
    words = value.split()
    for w in words:
        mr.emit_intermediate(w, 1)  # append 1 to the list stored under key 'w' in the intermediate storage

def reducer(key, list_of_values):
    # key: word
    # value: list of occurrence counts from the intermediate results
    total = 0
    for v in list_of_values:
        total += v
    mr.emit((key, total))  # append tuple to the result list

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)
– Create an inverted index. Given a set of documents, an inverted index is a dictionary where each word is associated with a list of the document identifiers in which that word appears
def mapper((docid, text)):
    words = text.split()
    for w in words:
        mr.emit_intermediate(w, docid)
def reducer(word, values):
    docs = distinct list(values)
    mr.emit((word, docs))
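A runnable version of that sketch, assuming the MapReduce.py module from the word-count example above and input records shaped like [document_id, text]:

import sys
import MapReduce

mr = MapReduce.MapReduce()

def mapper(record):
    # record: [document_id, text]
    docid, text = record[0], record[1]
    for w in text.split():
        mr.emit_intermediate(w, docid)

def reducer(word, docids):
    # keep each document id once, preserving first-seen order
    seen = []
    for d in docids:
        if d not in seen:
            seen.append(d)
    mr.emit((word, seen))

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)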
– Implement a relational join as a MapReduce query
def mapper((tabname, joinkey, tabrow)):
    mr.emit_intermediate(joinkey, (tabname, tabrow))
def reducer(key, values):
    if len(values) >= 2:
        masterrow = getMaster(values)
        detailrows = getDetails(values)
        for line in detailrows:
            mr.emit(masterrow + line)
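A runnable sketch under the same assumptions; which table plays the "master" role is not spelled out above, so the table name 'order' below is a placeholder to adjust for the actual dataset:

import sys
import MapReduce

mr = MapReduce.MapReduce()

MASTER_TABLE = 'order'  # hypothetical master table name

def mapper(record):
    # record: full table row, record[0] = table name, record[1] = join key
    joinkey = record[1]
    mr.emit_intermediate(joinkey, record)

def reducer(key, rows):
    # join the master row with every detail row sharing the key
    masters = [r for r in rows if r[0] == MASTER_TABLE]
    details = [r for r in rows if r[0] != MASTER_TABLE]
    for m in masters:
        for d in details:
            mr.emit(m + d)

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)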
– Consider a simple social network dataset consisting of a set of key-value pairs (person, friend) representing a friend relationship between two people. Count the number of friends for each person
def mapper((pers, frnd)):
    mr.emit_intermediate(pers, frnd)
def reducer(pers, friends):
    mr.emit(len distinct list(friends))
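The same idea spelled out as a runnable sketch (records assumed to be [person, friend] pairs, as in the pseudocode):

import sys
import MapReduce

mr = MapReduce.MapReduce()

def mapper(record):
    # record: [person, friend]
    person, friend = record[0], record[1]
    mr.emit_intermediate(person, friend)

def reducer(person, friends):
    # count distinct friends per person
    mr.emit((person, len(set(friends))))

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)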
– Generate a list of all non-symmetric friend relationships
def mapper((pers, frnd)):
    mr.emit_intermediate((pers, frnd), frnd)
    mr.emit_intermediate((frnd, pers), '')
def reducer((name1, name2), values):
    if name2 not in values:
        mr.emit((name1, name2))
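A runnable version of that mapper/reducer pair. Note that, as written, it emits the direction that is missing from the data for every one-sided relationship; depending on the expected output format you may want to emit the reversed pair as well:

import sys
import MapReduce

mr = MapReduce.MapReduce()

def mapper(record):
    # record: [person, friend]
    person, friend = record[0], record[1]
    mr.emit_intermediate((person, friend), friend)  # direction that exists in the data
    mr.emit_intermediate((friend, person), '')      # placeholder for the reverse direction

def reducer(key, values):
    name1, name2 = key
    # (name1, name2) never appeared as a record, only its reverse did:
    # name2 lists name1 as a friend, but not vice versa
    if name2 not in values:
        mr.emit((name1, name2))

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)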
– Consider a set of key-value pairs where each key is a sequence id and each value is a string of nucleotides, e.g., GCTTCCGAAATGCTCGAA.... Remove the last 10 characters from each string of nucleotides, then remove any duplicates generated
def mapper((sid, nucs)):
    mr.emit_intermediate(nucs[:-10], sid)
def reducer(key, val):
    mr.emit(key)
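Runnable sketch, assuming records of the form [sequence_id, nucleotide_string]:

import sys
import MapReduce

mr = MapReduce.MapReduce()

def mapper(record):
    # record: [sequence_id, nucleotide_string]
    nucleotides = record[1]
    # group by the trimmed string so duplicates collapse into one key
    mr.emit_intermediate(nucleotides[:-10], record[0])

def reducer(trimmed, sequence_ids):
    # one output per distinct trimmed string
    mr.emit(trimmed)

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)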
– Design a MapReduce algorithm to compute the matrix multiplication A x B
def mapper((mname, rownum, colnum, val)):
    if mname == 'a':
        for col in 0..BCOLS:
            mr.emit_intermediate((rownum, col), (mname, colnum, val))
    if mname == 'b':
        for row in 0..AROWS:
            mr.emit_intermediate((row, colnum), (mname, rownum, val))
def reducer((row, col), values):
    for item in values:
        _, idx, value = item
        res[idx].append(value)
    for idx in res.keys():
        a, b = res[idx]
        res[idx] = a * b
    val = sum(res.values())
    mr.emit((row, col, val))
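A runnable sketch of the same algorithm; the matrix dimensions below are placeholders (the pseudocode assumes AROWS and BCOLS are known up front), the record layout follows the mapper signature above, and missing entries are treated as zeros so sparse input works too:

import sys
import collections
import MapReduce

mr = MapReduce.MapReduce()

# matrix dimensions are assumed to be known up front; these sizes are placeholders
A_ROWS = 5
B_COLS = 5

def mapper(record):
    # record: [matrix_name, row, col, value], matrix_name is 'a' or 'b'
    mname, row, col, value = record
    if mname == 'a':
        # a[row][col] contributes to every cell (row, j) of the result
        for j in range(B_COLS):
            mr.emit_intermediate((row, j), ('a', col, value))
    else:
        # b[row][col] contributes to every cell (i, col) of the result
        for i in range(A_ROWS):
            mr.emit_intermediate((i, col), ('b', row, value))

def reducer(cell, values):
    # pair up a[i][k] and b[k][j] by the shared index k, multiply, and sum
    factors = collections.defaultdict(list)
    for _name, k, value in values:
        factors[k].append(value)
    total = 0
    for k, pair in factors.items():
        if len(pair) == 2:  # both factors present (sparse zeros are simply absent)
            total += pair[0] * pair[1]
    row, col = cell
    mr.emit((row, col, total))

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)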
original post http://vasnake.blogspot.com/2016/02/code-for-data-manipulation-at-scale.html


