A programmer's notes, about everything and nothing. But probably more on the professional side.

2015-08-12

Week 2, lab


Last time I started writing about the material we covered in week two, Introduction to Apache Spark, but I didn't finish. Here's the continuation.

Lab 2
Learning Apache Spark. ... you will learn about the Spark data model, transformations, and actions, and write a word counting program to count the words in all of Shakespeare's plays.

As you may recall, labs in this course run on personal virtual machines brought up via Vagrant and VirtualBox.
vagrant up
http://localhost:8001/

That's where the notebooks live.

And away we go.

The interesting topics in this lab:
– An introduction to using Apache Spark with the Python pySpark API running in the browser
– Using RDDs and chaining together transformations and actions
– Lambda functions
– Additional RDD actions
– Additional RDD transformations
– Caching RDDs and storage options

The following transformations will be covered:
  • map(),
  • mapPartitions(),
  • mapPartitionsWithIndex(),
  • filter(),
  • flatMap(),
  • reduceByKey(),
  • groupByKey()
A transformation turns one RDD into another, distributed and lazy, executing on the workers.

The following actions will be covered:
  • first(),
  • take(),
  • takeSample(),
  • takeOrdered(),
  • collect(),
  • count(),
  • countByValue(),
  • reduce(),
  • top()
An action brings the result of the computation back to the driver.
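To see the two halves in action, here's a minimal sketch (assuming the lab's sc, as everywhere below): transformations only record lineage, and nothing runs until an action fires.

# A transformation alone does nothing yet -- Spark only records the lineage
rdd = sc.parallelize([1, 2, 3, 4])
doubled = rdd.map(lambda x: x * 2)   # lazy: no work happens here
# The action is what actually triggers the distributed computation
print doubled.collect()              # [2, 4, 6, 8]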

Also covered:
cache(), unpersist(), id(), setName()

SparkContext is, roughly speaking, the connection to the cluster, carrying parameters such as the number of nodes and so on. In the labs this context is created behind the scenes and is available through the sc variable.
At a high level, every Spark application consists of a driver program that launches various parallel operations on executor Java Virtual Machines (JVMs) running either in a cluster or locally on the same machine. ... When running locally, "PySparkShell" is the driver program. In all cases, this driver program contains the main loop for the program and creates distributed datasets on the cluster, then applies operations (transformations & actions) to those datasets.

Driver programs access Spark through a SparkContext object, which represents a connection to a computing cluster. A Spark context object (sc) is the main entry point for Spark functionality
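In the lab sc already exists; outside the VM you would build it yourself. A minimal sketch (the master URL and app name are my placeholders):

# A standalone PySpark script would create its own context, roughly like this:
from pyspark import SparkContext
sc = SparkContext('local[4]', 'MyLabApp')  # 'local[4]' = 4 worker threads on this machine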

The Spark control panel
You can view the details of your Spark application in the Spark web UI. When running locally you'll find it at localhost:4040. In the web UI, under the "Jobs" tab, you can see a list of jobs that have been scheduled or run.

Let's create an RDD
data = xrange(1, 10001)              # a lazy Python 2 range, 1..10000
xrangeRDD = sc.parallelize(data, 8)  # distribute it across 8 partitions
xrangeRDD.setName('My first RDD')

Easy as pie.

Now let's look at transformations and actions.

First, a transformation: create a new RDD by decreasing each element by 1.
def sub(value):
    return (value - 1)
subRDD = xrangeRDD.map(sub)

And now for the actions
print subRDD.collect()
print xrangeRDD.count()

The filter transformation
def ten(value):
    # keep only values below 10
    if (value < 10):
        return True
    else:
        return False
filteredRDD = subRDD.filter(ten)
print filteredRDD.collect()

Now the same with lambdas
lambdaRDD = subRDD.filter(lambda x: x < 10)
lambdaRDD.collect()
evenRDD = lambdaRDD.filter(lambda x: x % 2 == 0)
evenRDD.collect()

Other actions.
Let's investigate the additional actions: first(), take(), top(), takeOrdered(), and reduce()
# Let's get the first element
print filteredRDD.first()
# The first 4
print filteredRDD.take(4)
# Note that it is ok to take more elements than the RDD has
print filteredRDD.take(12)
# Retrieve the three smallest elements
print filteredRDD.takeOrdered(3)
# Retrieve the five largest elements
print filteredRDD.top(5)
# Pass a lambda function to takeOrdered to reverse the order
filteredRDD.takeOrdered(4, lambda s: -s)
# Obtain Python's add function
from operator import add
# Efficiently sum the RDD using reduce
print filteredRDD.reduce(add)
# Sum using reduce with a lambda function
print filteredRDD.reduce(lambda a, b: a + b)
# Note that subtraction is neither associative nor commutative
print filteredRDD.reduce(lambda a, b: a - b)
print filteredRDD.repartition(4).reduce(lambda a, b: a - b)
# While addition is
print filteredRDD.repartition(4).reduce(lambda a, b: a + b)

Here are two additional actions that are useful for retrieving information from an RDD: takeSample() and countByValue()
# takeSample reusing elements
print filteredRDD.takeSample(withReplacement=True, num=6)
# takeSample without reuse
print filteredRDD.takeSample(withReplacement=False, num=6)
# Set seed for predictability
print filteredRDD.takeSample(withReplacement=False, num=6, seed=500)
# Try rerunning this cell and the cell above -- the results from this cell will remain constant

# Create new base RDD to show countByValue
repetitiveRDD = sc.parallelize([1, 2, 3, 1, 2, 3, 1, 2, 1, 2, 3, 3, 3, 4, 5, 4, 6])
print repetitiveRDD.countByValue()
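If I'm counting right, this should print something like the following (countByValue returns a plain dict-like object on the driver, not an RDD):

defaultdict(<type 'int'>, {1: 4, 2: 4, 3: 5, 4: 2, 5: 1, 6: 1})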

The flatMap transformation is for when the input RDD's elements are collections and what you want is a flat list.
The solution is to use a flatMap() transformation; flatMap() is similar to map(), except that with flatMap() each input item can be mapped to zero or more output elements.
# Let's create a new base RDD to work from
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)

# Use map
singularAndPluralWordsRDDMap = wordsRDD.map(lambda x: (x, x + 's'))
# Use flatMap
singularAndPluralWordsRDD = wordsRDD.flatMap(lambda x: (x, x + 's'))

# View the results
print singularAndPluralWordsRDDMap.collect()
print singularAndPluralWordsRDD.collect()
# View the number of elements in the RDD
print singularAndPluralWordsRDDMap.count()
print singularAndPluralWordsRDD.count()

[('cat', 'cats'), ('elephant', 'elephants'), ('rat', 'rats'), ('rat', 'rats'), ('cat', 'cats')]
['cat', 'cats', 'elephant', 'elephants', 'rat', 'rats', 'rat', 'rats', 'cat', 'cats']
5
10

As everyone knows, the groupByKey transformation is best avoided, or the network will keel over :) (unlike reduceByKey, it shuffles every value across the wire instead of combining values locally first).

Let's investigate the additional transformations: groupByKey() and reduceByKey().
Both of these transformations operate on pair RDDs. A pair RDD is an RDD where each element is a pair tuple (key, value). For example, sc.parallelize([('a', 1), ('a', 2), ('b', 1)]) would create a pair RDD where the keys are 'a', 'a', 'b' and the values are 1, 2, 1.
Here are more transformations to prefer over groupByKey():
        • combineByKey() can be used when you are combining elements but your return type differs from your input value type.
        • foldByKey() merges the values for each key using an associative function and a neutral "zero value" (a quick sketch of both follows the examples below)
pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])
# mapValues only used to improve format for printing
print pairRDD.groupByKey().mapValues(lambda x: list(x)).collect()

# Different ways to sum by key
print pairRDD.groupByKey().map(lambda (k, v): (k, sum(v))).collect()
# Using mapValues, which is recommended when the key doesn't change
print pairRDD.groupByKey().mapValues(lambda x: sum(x)).collect()
# reduceByKey is more efficient / scalable
print pairRDD.reduceByKey(add).collect()

[('a', [1, 2]), ('b', [1])]
[('a', 3), ('b', 1)]
[('a', 3), ('b', 1)]
[('a', 3), ('b', 1)]
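The lab doesn't actually exercise combineByKey() and foldByKey(), so here is a minimal sketch of both on the same pairRDD (my illustration, not the lab's code; collect() order may vary):

# foldByKey is reduceByKey with an explicit neutral "zero value"
print pairRDD.foldByKey(0, add).collect()            # [('a', 3), ('b', 1)]
# combineByKey lets the result type differ from the value type:
# build (sum, count) per key, then derive per-key averages
sumCount = pairRDD.combineByKey(lambda v: (v, 1),
                                lambda acc, v: (acc[0] + v, acc[1] + 1),
                                lambda a, b: (a[0] + b[0], a[1] + b[1]))
print sumCount.mapValues(lambda (s, c): s / float(c)).collect()  # [('a', 1.5), ('b', 1.0)]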

What these transformations are actually for, I never quite figured out (though see the guess after the examples below)
Let's investigate the advanced transformations: mapPartitions() and mapPartitionsWithIndex()
# mapPartitions takes a function that takes an iterator and returns an iterator
print wordsRDD.collect()
itemsRDD = wordsRDD.mapPartitions(lambda iterator: [','.join(iterator)])
print itemsRDD.collect()

['cat', 'elephant', 'rat', 'rat', 'cat']
['cat', 'elephant', 'rat', 'rat,cat']

itemsByPartRDD = wordsRDD.mapPartitionsWithIndex(lambda index, iterator: [(index, list(iterator))])
# We can see that three of the four partitions hold one element each and the fourth
# holds two, although things may not bode well for the rat...
print itemsByPartRDD.collect()
# Rerun without returning a list (acts more like flatMap)
itemsByPartRDD = wordsRDD.mapPartitionsWithIndex(lambda index, iterator: (index, list(iterator)))
print itemsByPartRDD.collect()

[(0, ['cat']), (1, ['elephant']), (2, ['rat']), (3, ['rat', 'cat'])]
[0, ['cat'], 1, ['elephant'], 2, ['rat'], 3, ['rat', 'cat']]
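For what it's worth, the textbook motivation for mapPartitions() is amortizing expensive setup: do it once per partition instead of once per element. A hypothetical sketch (the connection is imaginary):

def processPartition(iterator):
    # imagine opening one database connection here, once per partition,
    # instead of once per element inside a plain map()
    for word in iterator:
        yield word.upper()   # stand-in for real per-element work
print wordsRDD.mapPartitions(processPartition).collect()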

Caching RDDs
This one is simple: if you are going to reuse an RDD, cache it.
# Name the RDD
filteredRDD.setName('My Filtered RDD')
# Cache the RDD
filteredRDD.cache()
# Is it cached
print filteredRDD.is_cached
True

# Note that toDebugString also provides storage information
print filteredRDD.toDebugString()

(8) My Filtered RDD PythonRDD[79] at collect at <ipython-input-74-2e6525e1a0c2>:23 [Memory Serialized 1x Replicated]
 |  ParallelCollectionRDD[73] at parallelize at PythonRDD.scala:392 [Memory Serialized 1x Replicated]

# If we are done with the RDD we can unpersist it so that its memory can be reclaimed
filteredRDD.unpersist()
# Storage level for a non cached RDD
print filteredRDD.getStorageLevel()
filteredRDD.cache()
# Storage level for a cached RDD
print filteredRDD.getStorageLevel()

Serialized 1x Replicated
Memory Serialized 1x Replicated
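Storage options beyond the default are selected by passing a StorageLevel to persist() instead of calling cache(); a quick sketch (not part of the lab):

from pyspark import StorageLevel
filteredRDD.unpersist()                            # drop the current level first
filteredRDD.persist(StorageLevel.MEMORY_AND_DISK)  # spill to disk when memory runs out
print filteredRDD.getStorageLevel()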

Now, having skimmed through the basic techniques for working with RDDs in Spark, we can do something resembling real work.

A word-count app for the works of Shakespeare

In this lab, we will write code that calculates the most common words in the Complete Works of William Shakespeare retrieved from Project Gutenberg.

Let's create an RDD from the file
shakespeareRDD = (sc
                  .textFile(fileName, 8)
                  .map(removePunctuation))
print '\n'.join(shakespeareRDD
                .zipWithIndex()  # to (line, lineNum)
                .map(lambda (l, num): '{0}: {1}'.format(num, l))  # to 'lineNum: line'
                .take(15))

0: 1609
1: 
2: the sonnets
3: 
4: by william shakespeare
5: 
6: 
7: 
8: 1
9: from fairest creatures we desire increase
10: that thereby beautys rose might never die
11: but as the riper should by time decease
12: his tender heir might bear his memory
13: but thou contracted to thine own bright eyes
14: feedst thy lights flame with selfsubstantial fuel

Oops, we need the removePunctuation function first
import re
def removePunctuation(text):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only spaces, letters, and numbers should be retained.  Other characters should be
        eliminated (e.g. it's becomes its).  Leading and trailing spaces should be removed
        after punctuation is removed.

    Args:
        text (str): A string.

    Returns:
        str: The cleaned up string.
    """
    res = ...  # not going to give away the lab solution, am I?
    return res
print removePunctuation('Hi, you!')
print removePunctuation(' No under_score!')
print removePunctuation(' *      Remove punctuation then spaces  * ')

hi you
no underscore
remove punctuation then spaces
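Since the docstring and the test cases pin the behavior down, one plausible implementation (my guess, not necessarily the reference solution) is a single regex:

import re
def removePunctuation(text):
    # keep only lowercase letters, digits and spaces, then trim the ends
    return re.sub(r'[^a-z0-9 ]', '', text.lower()).strip()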

Let's create an RDD of individual words
shakespeareWordsRDD = shakespeareRDD.flatMap(lambda line: line.split(' '))
shakespeareWordCount = shakespeareWordsRDD.count()
print shakespeareWordsRDD.top(5)
print shakespeareWordCount

[u'zwaggerd', u'zounds', u'zounds', u'zounds', u'zounds']
927631

Let's drop the empty elements
shakeWordsRDD = shakespeareWordsRDD.filter(lambda x: x)
shakeWordCount = shakeWordsRDD.count()
print shakeWordCount

882996

And now let's count the words Shakespeare used most
top15WordsAndCounts = wordCount(shakeWordsRDD).takeOrdered(15, key=lambda (w, cnt): -cnt)
print '\n'.join(map(lambda (w, c): '{0}: {1}'.format(w, c), top15WordsAndCounts))

the: 27361
and: 26028
i: 20681
to: 19150
of: 17463
a: 14593
you: 13615
my: 12481
in: 10956
that: 10890
is: 9134
not: 8497
with: 7771
me: 7769
it: 7678

Oops, we need the wordCount function
def wordCount(wordListRDD):
    """Creates a pair RDD with word counts from an RDD of words.

    Args:
        wordListRDD (RDD of str): An RDD consisting of words.

    Returns:
        RDD of (str, int): An RDD consisting of (word, count) tuples.
    """
    res = ...  # no, I'm not going to spoon-feed the students
    return res
print wordCount(wordsRDD).collect()
[('rat', 2), ('elephant', 1), ('cat', 2)]
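The spec here is unambiguous too, so a plausible sketch (mine, not the grader's): map every word to a (word, 1) pair and sum per key.

from operator import add
def wordCount(wordListRDD):
    # (word, 1) pairs, then sum the ones for each key
    return wordListRDD.map(lambda w: (w, 1)).reduceByKey(add)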

And that, essentially, is the whole lab.

We worked with transformations and actions and counted how often each word occurs in the text.

In short, we got the hang of it.



original post http://vasnake.blogspot.com/2015/08/week-2-lab.html
