Last time I started writing about the material we covered in week two of Introduction to Apache Spark, but didn't finish. Continuing here.
Lab 2: Learning Apache Spark. ... you will learn about the Spark data model, transformations, and actions, and write a word counting program to count the words in all of Shakespeare's plays.
As a reminder, the labs in this course run on personal virtual machines brought up with Vagrant and VirtualBox.
vagrant up
http://localhost:8001/
The notebooks there look like this. And off we go.
Interesting topics in this lab:
– An introduction to using Apache Spark with the Python pySpark API running in the browser
– Using RDDs and chaining together transformations and actions
– Lambda functions
– Additional RDD actions
– Additional RDD transformations
– Caching RDDs and storage options
The following transformations will be covered:
- map()
- mapPartitions()
- mapPartitionsWithIndex()
- filter()
- flatMap()
- reduceByKey()
- groupByKey()
A transformation turns one RDD into another; it is distributed across the workers and evaluated lazily.
The following actions will be covered:
- first()
- take()
- takeSample()
- takeOrdered()
- collect()
- count()
- countByValue()
- reduce()
- top()
An action is what actually triggers the computation and delivers the result to the driver.
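To make that split concrete, here is a tiny sketch of my own (not from the lab): map() only records the lineage on the RDD, while collect() is the action that makes the workers compute and ship the result back to the driver.

# Hypothetical illustration: transformations are lazy, actions trigger the computation.
numbersRDD = sc.parallelize(range(1, 11), 2)  # distribute 1..10 over 2 partitions
doubledRDD = numbersRDD.map(lambda x: x * 2)  # transformation: nothing runs yet
print(doubledRDD.collect())                   # action: workers compute, driver receives [2, 4, ..., 20]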
Also covered: cache(), unpersist(), id(), setName()
SparkContext is, roughly speaking, the connection to the cluster; it carries parameters such as the number of nodes and so on. In the labs this context is created behind the scenes and is available through the sc variable.
At a high level, every
Spark application consists of a driver program that launches various
parallel operations on executor Java Virtual Machines (JVMs) running
either in a cluster or locally on the same machine.... When running
locally, "PySparkShell" is the driver program. In all
cases, this driver program contains the main loop for the program and
creates distributed datasets on the cluster, then applies operations
(transformations & actions) to those datasets
Driver programs access
Spark through a SparkContext object, which represents a connection to
a computing cluster. A Spark context object (sc) is the main entry
point for Spark functionality
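Outside the lab VM you would have to create that context yourself. A minimal sketch, assuming a local master; the app name and thread count here are my own arbitrary choices:

from pyspark import SparkConf, SparkContext

# Assumed setup, not part of the lab: four local worker threads, an arbitrary app name.
conf = SparkConf().setMaster('local[4]').setAppName('week2-lab-sketch')
sc = SparkContext(conf=conf)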
The Spark control panel
You can view the
details of your Spark application in the Spark web UI. When running
locally you'll find it at localhost:4040.
In the web UI, under the "Jobs" tab, you can see a list of
jobs that have been scheduled or run.
Let's create an RDD
data = xrange(1, 10001)
xrangeRDD = sc.parallelize(data, 8)
xrangeRDD.setName('My first RDD')
Dead simple.
Now let's look at transformations and actions.
First a transformation: create a new RDD by subtracting 1 from every element.
def sub(value):
    return (value - 1)

subRDD = xrangeRDD.map(sub)
And now some actions
print subRDD.collect()
print xrangeRDD.count()
The filter transformation
def ten(value):
    if (value < 10):
        return True
    else:
        return False

filteredRDD = subRDD.filter(ten)
print filteredRDD.collect()
Now with lambdas
lambdaRDD = subRDD.filter(lambda x: x < 10)
lambdaRDD.collect()
evenRDD = lambdaRDD.filter(lambda x: x % 2 == 0)
evenRDD.collect()
Other actions.
# Let's get the first element
print filteredRDD.first()
# The first 4
print filteredRDD.take(4)
# Note that it is ok to take more elements than the RDD has
print filteredRDD.take(12)
# Retrieve the three smallest elements
print filteredRDD.takeOrdered(3)
# Retrieve the five largest elements
print filteredRDD.top(5)
# Pass a lambda function to takeOrdered to reverse the order
filteredRDD.takeOrdered(4, lambda s: -s)
# Obtain Python's add function
from operator import add
# Efficiently sum the RDD using reduce
print filteredRDD.reduce(add)
# Sum using reduce with a lambda function
print filteredRDD.reduce(lambda a, b: a + b)
# Note that subtraction is not both associative and commutative
print filteredRDD.reduce(lambda a, b: a - b)
print filteredRDD.repartition(4).reduce(lambda a, b: a - b)
# While addition is
print filteredRDD.repartition(4).reduce(lambda a, b: a + b)
Here are two additional actions that are useful for retrieving information from an RDD: takeSample() and countByValue()
# takeSample reusing elements
print filteredRDD.takeSample(withReplacement=True, num=6)
# takeSample without reuse
print filteredRDD.takeSample(withReplacement=False, num=6)
# Set seed for predictability
print filteredRDD.takeSample(withReplacement=False, num=6, seed=500)
# Try rerunning this cell and the cell above -- the results from this cell will remain constant
# Create new base RDD to show countByValue
repetitiveRDD = sc.parallelize([1, 2, 3, 1, 2, 3, 1, 2, 1, 2, 3, 3, 3, 4, 5, 4, 6])
print repetitiveRDD.countByValue()
The flatMap transformation is needed when the input RDD holds collection-valued elements and we want a flat list instead.
The solution is to use a flatMap() transformation. flatMap() is similar to map(), except that with flatMap() each input item can be mapped to zero or more output elements.

# Let's create a new base RDD to work from
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
# Use map
singularAndPluralWordsRDDMap = wordsRDD.map(lambda x: (x, x + 's'))
# Use flatMap
singularAndPluralWordsRDD = wordsRDD.flatMap(lambda x: (x, x + 's'))
# View the results
print singularAndPluralWordsRDDMap.collect()
print singularAndPluralWordsRDD.collect()
# View the number of elements in the RDD
print singularAndPluralWordsRDDMap.count()
print singularAndPluralWordsRDD.count()

[('cat', 'cats'), ('elephant', 'elephants'), ('rat', 'rats'), ('rat', 'rats'), ('cat', 'cats')]
['cat', 'cats', 'elephant', 'elephants', 'rat', 'rats', 'rat', 'rats', 'cat', 'cats']
5
10
As is well known, the groupByKey transformation should be avoided, otherwise you'll bring the network down :)
Both of these transformations operate on pair RDDs. A pair RDD is an RDD where each element is a pair tuple (key, value). For example, sc.parallelize([('a', 1), ('a', 2), ('b', 1)]) would create a pair RDD where the keys are 'a', 'a', 'b' and the values are 1, 2, 1.
Here are more transformations to prefer over groupByKey():
- combineByKey() can be used when you are combining elements but your return type differs from your input value type (see the sketch after the next code block).
pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])
# mapValues only used to improve format for printing
print pairRDD.groupByKey().mapValues(lambda x: list(x)).collect()
# Different ways to sum by key
print pairRDD.groupByKey().map(lambda (k, v): (k, sum(v))).collect()
# Using mapValues, which is recommended when the key doesn't change
print pairRDD.groupByKey().mapValues(lambda x: sum(x)).collect()
# reduceByKey is more efficient / scalable
print pairRDD.reduceByKey(add).collect()

[('a', [1, 2]), ('b', [1])]
[('a', 3), ('b', 1)]
[('a', 3), ('b', 1)]
[('a', 3), ('b', 1)]
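The lab itself doesn't show combineByKey(), so here is my own hedged sketch of the classic per-key average, where the combined type (sum, count) differs from the input value type:

# Hypothetical example, not from the lab: average value per key with combineByKey.
# createCombiner: value -> (sum, count); mergeValue: fold a value into a combiner;
# mergeCombiners: merge partial (sum, count) pairs coming from different partitions.
sumCountRDD = pairRDD.combineByKey(
    lambda v: (v, 1),
    lambda acc, v: (acc[0] + v, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]))
print(sumCountRDD.mapValues(lambda sumCount: float(sumCount[0]) / sumCount[1]).collect())
# Expected: [('a', 1.5), ('b', 1.0)] (ordering may vary)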
I never quite figured out what these transformations are for
# mapPartitions takes a function that takes an iterator and returns an iterator
print wordsRDD.collect()
itemsRDD = wordsRDD.mapPartitions(lambda iterator: [','.join(iterator)])
print itemsRDD.collect()

['cat', 'elephant', 'rat', 'rat', 'cat']
['cat', 'elephant', 'rat', 'rat,cat']

itemsByPartRDD = wordsRDD.mapPartitionsWithIndex(lambda index, iterator: [(index, list(iterator))])
# We can see that three of the (partitions) workers have one element and the fourth worker has two
# elements, although things may not bode well for the rat...
print itemsByPartRDD.collect()
# Rerun without returning a list (acts more like flatMap)
itemsByPartRDD = wordsRDD.mapPartitionsWithIndex(lambda index, iterator: (index, list(iterator)))
print itemsByPartRDD.collect()

[(0, ['cat']), (1, ['elephant']), (2, ['rat']), (3, ['rat', 'cat'])]
[0, ['cat'], 1, ['elephant'], 2, ['rat'], 3, ['rat', 'cat']]
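For what it's worth, the usual motivation (my own hedged example, not from the lab) is doing per-partition work once instead of per element, for instance pre-aggregating a sum inside each partition:

# Hypothetical illustration: sum each partition locally, then add up the partial sums.
# The summing function runs once per partition instead of once per element.
numsRDD = sc.parallelize(range(1, 101), 4)
partialSums = numsRDD.mapPartitions(lambda iterator: [sum(iterator)])
print(partialSums.collect())                     # four partial sums, one per partition
print(partialSums.reduce(lambda a, b: a + b))    # 5050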
Caching RDDs
This part is simple: if you are going to reuse an RDD, cache it.
# Name the RDD
filteredRDD.setName('My Filtered RDD')
# Cache the RDD
filteredRDD.cache()
# Is it cached
print filteredRDD.is_cached

True
# Note that toDebugString also provides storage information
print filteredRDD.toDebugString()

(8) My Filtered RDD PythonRDD[79] at collect at <ipython-input-74-2e6525e1a0c2>:23 [Memory Serialized 1x Replicated]
 |  ParallelCollectionRDD[73] at parallelize at PythonRDD.scala:392 [Memory Serialized 1x Replicated]

# If we are done with the RDD we can unpersist it so that its memory can be reclaimed
filteredRDD.unpersist()
# Storage level for a non cached RDD
print filteredRDD.getStorageLevel()
filteredRDD.cache()
# Storage level for a cached RDD
print filteredRDD.getStorageLevel()

Serialized 1x Replicated
Memory Serialized 1x Replicated
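The "storage options" part of the topic list maps to persist() with an explicit StorageLevel. A hedged sketch of what that looks like (the chosen level is just an example):

from pyspark import StorageLevel

# Hypothetical illustration: keep the RDD in memory and spill to disk if it doesn't fit.
filteredRDD.unpersist()
filteredRDD.persist(StorageLevel.MEMORY_AND_DISK)
print(filteredRDD.getStorageLevel())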
Having skimmed through the basic techniques for working with RDDs in Spark, we can now do something resembling real work.
An app that counts words in the works of Shakespeare
In this lab, we will write code that calculates the most common words in the Complete Works of William Shakespeare retrieved from Project Gutenberg.
Let's create an RDD from the file
shakespeareRDD = (sc
                  .textFile(fileName, 8)
                  .map(removePunctuation))
print '\n'.join(shakespeareRDD
                .zipWithIndex()  # to (line, lineNum)
                .map(lambda (l, num): '{0}: {1}'.format(num, l))  # to 'lineNum: line'
                .take(15))

0: 1609
1:
2: the sonnets
3:
4: by william shakespeare
5:
6:
7:
8: 1
9: from fairest creatures we desire increase
10: that thereby beautys rose might never die
11: but as the riper should by time decease
12: his tender heir might bear his memory
13: but thou contracted to thine own bright eyes
14: feedst thy lights flame with selfsubstantial fuel
Oops, we need a removePunctuation function
import re

def removePunctuation(text):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only spaces, letters, and numbers should be retained. Other characters should
        be eliminated (e.g. it's becomes its). Leading and trailing spaces should be
        removed after punctuation is removed.

    Args:
        text (str): A string.

    Returns:
        str: The cleaned up string.
    """
    res = ...  # I'm not going to show the lab solution, am I?
    return res

print removePunctuation('Hi, you!')
print removePunctuation(' No under_score!')
print removePunctuation(' * Remove punctuation then spaces * ')

hi you
no underscore
remove punctuation then spaces
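Since the solution is deliberately withheld, here is only my own hedged sketch of one possible implementation (a simple regex approach; not necessarily what the lab's grader expects):

import re

def removePunctuationSketch(text):
    # Hypothetical stand-in for the hidden solution: lowercase, drop everything
    # except letters, digits and spaces, then strip leading/trailing spaces.
    return re.sub(r'[^a-z0-9 ]', '', text.lower()).strip()

print(removePunctuationSketch('Hi, you!'))  # hi you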
Let's create an RDD containing the individual words
shakespeareWordsRDD = shakespeareRDD.flatMap(lambda line: line.split(' '))
shakespeareWordCount = shakespeareWordsRDD.count()
print shakespeareWordsRDD.top(5)
print shakespeareWordCount

[u'zwaggerd', u'zounds', u'zounds', u'zounds', u'zounds']
927631
Now drop the empty elements
shakeWordsRDD = shakespeareWordsRDD.filter(lambda x: x)
shakeWordCount = shakeWordsRDD.count()
print shakeWordCount

882996
And now let's count the words Shakespeare used most often
top15WordsAndCounts = wordCount(shakeWordsRDD).takeOrdered(15, key=lambda (w, cnt): -cnt)
print '\n'.join(map(lambda (w, c): '{0}: {1}'.format(w, c), top15WordsAndCounts))

the: 27361
and: 26028
i: 20681
to: 19150
of: 17463
a: 14593
you: 13615
my: 12481
in: 10956
that: 10890
is: 9134
not: 8497
with: 7771
me: 7769
it: 7678
Oops, we need a wordCount function
def wordCount(wordListRDD):
    """Creates a pair RDD with word counts from an RDD of words.

    Args:
        wordListRDD (RDD of str): An RDD consisting of words.

    Returns:
        RDD of (str, int): An RDD consisting of (word, count) tuples.
    """
    res = ...  # no, I'm not going to hand the students the answer
    return res

print wordCount(wordsRDD).collect()

[('rat', 2), ('elephant', 1), ('cat', 2)]
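Again the solution is withheld on purpose; a hedged sketch of the textbook approach (map each word to a (word, 1) pair, then reduceByKey) would look something like this:

from operator import add

def wordCountSketch(wordListRDD):
    # Hypothetical stand-in for the hidden solution: classic map + reduceByKey word count.
    return wordListRDD.map(lambda w: (w, 1)).reduceByKey(add)

print(wordCountSketch(wordsRDD).collect())  # e.g. [('rat', 2), ('elephant', 1), ('cat', 2)]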
That's basically the whole lab.
We worked with transformations and actions and counted how many times each word appears in the text.
Got the hang of it, in short.
original post http://vasnake.blogspot.com/2015/08/week-2-lab.html