The BerkeleyX course CS190.1x, Scalable Machine Learning, plus my detailed notes.
Time to refresh my memory and compile it all into a digest.
Content
In this course, you
will learn how to write and debug Python Spark (pySpark) programs in
five weekly lab exercises
WEEK 0: Setup Course
Software Environment
Topics:
Step-by-step instructions for installing / using the course software
environment, and submitting assignments to the course autograder.
WEEK 1: Course Overview
and Introduction to Machine Learning
Topics: Course
goals, Apache Spark overview, basic machine learning concepts, steps
of typical supervised learning pipelines, linear algebra review,
computational complexity / big O notation review.
Lab 1: NumPy,
Linear Algebra, and Lambda Function Review.
WEEK 2: Introduction to
Apache Spark
Topics: Big data
and hardware trends, history of Apache Spark, Spark's Resilient
Distributed Datasets (RDDs), transformations, and actions.
Lab 2: Learning
Apache Spark. Perform your first course lab where you will learn
about the Spark data model, transformations, and actions, and write a
word counting program to count the words in all of Shakespeare's
plays.
WEEK 3: Linear
Regression and Distributed Machine Learning Principles
Topics: Linear
regression formulation and closed-form solution, distributed machine
learning principles (related to computation, storage, and
communication), gradient descent, quadratic features, grid search.
Lab 3: Millionsong
Regression Pipeline. Develop an end-to-end linear regression pipeline
to predict the release year of a song given a set of audio features.
You will implement a gradient descent solver for linear regression,
use Spark's machine learning library (MLlib) to train additional
models, tune models via grid search, improve accuracy using quadratic
features, and visualize various intermediate results to build
intuition.
WEEK 4: Logistic
Regression and Click-through Rate Prediction
Topics: Online
advertising, linear classification, logistic regression, working with
probabilistic predictions, categorical data and one-hot-encoding,
feature hashing for dimensionality reduction.
Lab 4:
Click-through Rate Prediction Pipeline. Construct a logistic
regression pipeline to predict click-through rate using data from a
recent Kaggle competition. You will extract numerical features from
the raw categorical data using one-hot-encoding, reduce the
dimensionality of these features via hashing, train logistic
regression models using MLlib, tune hyperparameters via grid search,
and interpret probabilistic predictions via a ROC plot.
WEEK 5: Principal
Component Analysis and Neuroimaging
Topics:
Introduction to neuroscience and neuroimaging data, exploratory data
analysis, principal component analysis (PCA) formulations and
solution, distributed PCA.
Lab 5: Neuroimaging
Analysis via PCA - Identify patterns of brain activity in larval
zebrafish. You will work with time-varying images (generated using a
technique called light-sheet microscopy) that capture a zebrafish's
neural activity as it is presented with a moving visual pattern.
After implementing distributed PCA from scratch and gaining intuition
by working with synthetic data, you will use PCA to identify distinct
patterns across the zebrafish brain that are induced by different
types of stimuli.
Digest...
WEEK 0: Setup Course Software Environment
Topics:
Step-by-step instructions for installing / using the course software
environment, and submitting assignments to the course autograder.
Information on setting up the environment for the Spark MOOC and its lab assignments.
Install VirtualBox,
Vagrant;
Put the Vagrantfile into your working dir, run: $ vagrant up --provider=virtualbox
Open url
http://localhost:8001/
in browser
Upload lab*.ipynb into
Jupyter
WEEK 1: Course Overview and Introduction to Machine Learning
Topics: Course
goals, Apache Spark overview, basic machine learning concepts, steps
of typical supervised learning pipelines, linear algebra review,
computational complexity / big O notation review.
Lab 1: NumPy,
Linear Algebra, and Lambda Function Review.
Apache Spark: cluster
computing engine
– fast iterative
computations
– efficient
communication primitives
– APIs in Scala,
Java, Python, R
– interactive shell
– integrated
higher-level libraries
ML: methods that learn
from and make predictions on data
– tools and ideas
from: computer science, probability and statistics, optimization,
linear algebra
Supervised
learning: labeled observations
– learn mapping from
observations to labels
– classification:
assign a category to each item, categories are discrete
– regression: predict
a real value for each item, e.g. stock prices
Unsupervised
learning: unlabeled observations
– find latent
structure from features
– can be goal in
itself; can be means for preprocessing
– clustering:
partition observations into homogeneous regions, e.g. identify
'communities' within large groups of people
– dimensionality
reduction: transform an initial feature representation into a more
concise representation
typical supervised ML
pipeline
– obtain raw data
– feature extraction:
represent observations, numeric features, incorporate domain
knowledge
unsupervised learning
may be used as preprocessor;
spam/not spam
classifier: bag of words, if word n in document, set feature n=1
– train a model: use
cross-validation dataset
classifiers: logistic
regression, SVM, decision tree, random forest, etc.
training involves
iterative computations, e.g. gradient descent
goal: find decision
boundary: model parameters = feature weights and offset
– evaluate: on test
dataset; evaluation metrics: accuracy, precision, recall, F1-score
detect
overfitting/high bias
– iterate: goto
feature extraction
– predict
linear algebra review
– matrix A:
2-dimensional array, n rows, m columns; A(i,j): entry in i-th row,
j-th column
– vector: a: matrix
with n rows, 1 column
– transpose: A(i,j) =
A.T(j,i); rotate vector counter-clockwise
– scalar product
= dot product = inner product for vectors: two vectors =>
scalar // 1,m * m,1 = 1,1
vector_a dot
vector_b = sum a(i)*b(i)
it's a special case of
matrix multiplication: A.T * B = A dot B for A,B: vectors size m,1
– matrix-vector
multiplication: n,m * m,1 = n,1
result: n scalar
products for the n row vectors in the matrix
– matrix-matrix
multiplication: A * B = C // n,m * m,p = n,p
C(i,j) = scalar
product of row A(i) and column B(j)
associative, not
commutative
– outer
product: two vectors => matrix // n,1 * 1,m = n,m
– identity
matrix: I * A = A // ones in diagonal
– inverse
matrix: A * A.inv = I // only a square matrix can have an inverse
– euclidean norm:
norm(x) = ||x||2 = sqrt(x1^2 + x2^2 + ...) // vector 'length'
Big O notation
– describes how
algorithms respond to changes in input size
– processing time:
proportional to number of 'basic' operations
– space: units of
storage, 8 bytes for float
– notation: f(x) =
O(g(x)) // f does not grow faster than g
– formal definition:
|f(x)| <= C|g(x)| for all sufficiently large x
– ignores constants
and lower-order terms
– matrix-matrix
multiply n,m * m,p = n,p
n*p entries, each entry = dot product between 2 m-dimensional vectors
time O(npm); space O(np)
lab: Gain hands-on experience using Python's scientific computing library to manipulate matrices and vectors
import numpy as np

arr = np.array([1, 2, 3])
brr = np.array([4, 5, 6])  # second vector (undefined in the original snippet)
elementWise = arr * brr
dotProd = np.dot(arr, brr)

A = np.matrix([[1,2,3,4], [5,6,7,8]])
AAt = A * A.T
AAInv = np.linalg.inv(AAt)

lastThree = arr[-3:]
zeros = np.zeros(8)
ones = np.ones(8)
zerosThenOnes = np.hstack((zeros, ones))
zerosAboveOnes = np.vstack((zeros, ones))

from pyspark.mllib.linalg import DenseVector
numpyVector = np.array([-3, -4, 5])
myDenseVector = DenseVector([3.0, 4.0, 5.0])
denseProduct = DenseVector.dot(DenseVector(numpyVector), myDenseVector)

multiplyByTen = lambda x: x * 10
mapResult = dataset.map(lambda x: x*5)               # dataset: an RDD
filterResult = dataset.filter(lambda x: not(x % 2))  # keep even elems
reduceResult = dataset.reduce(lambda x,y: x+y)
(dataset
 .map(lambda x: x+2)
 .reduce(lambda x,y: x*y))
WEEK 2: Introduction to Apache Spark
Topics: Big data
and hardware trends, history of Apache Spark, Spark's Resilient
Distributed Datasets (RDDs), transformations, and actions.
Lab 2: Learning
Apache Spark. Perform your first course lab where you will learn
about the Spark data model, transformations, and actions, and write a
word counting program to count the words in all of Shakespeare's
plays.
the Big Data problem
– data growing faster
than computation speed
– storage getting
cheaper
– you can't process all the data on a machine you can afford: that's big data
– Google web index:
10+ PB
– time to read 1 TB
from disk: 3 hours
– solution:
distribute data over large clusters
problems with cheap
hardware
– failures: 1-5% hard
drives/year
0.2% DIMMs/year
– network speed,
latency vs. shared memory
– uneven performance
what's hard about
cluster computing
– how do we split
work across machines? hash table, map-reduce
must consider network,
data locality
moving data may be
very expensive
– how to deal with
failures?
10K nodes, 10 faults a
day
stragglers, slow nodes
failures, slow tasks
– re-launch the task
on other machine
– requirements: individual tasks in a job are idempotent and have no side effects
map-reduce, iterative
jobs
– involve a lot of
disk I/O for each repetition
– each stage passes
data through disk: processing speed limited by HDD speed
– I/O,
serialization/deserialization
Apache Spark keeps
intermediate results in RAM
– up to 100 times
faster
– storage: in-memory,
on disk
– operations: map,
reduce, join, sample, …
– execution model:
batch, interactive, streaming
– programming env:
Scala, Java, R, Python
– lazy evaluation:
better pipelining, reduces 'wait states'
– lower overhead for
starting jobs
– less expensive
shuffles
Apache Spark
cornerstone: RDD
– resilient
distributed dataset: a fault-tolerant abstraction for in-memory
cluster computing
– write programs in
terms of ops on distributed datasets
– partitioned
collections of objects spread across a cluster, stored in memory or
on disk
– RDDs built and
manipulated through a diverse set of parallel transformations (map,
filter, join)
and actions (count,
collect, save)
– RDDs automatically
rebuilt on machine failure
Spark computing
framework
– abstractions and
parallel runtime to hide complexities of fault-tolerance and slow
machines
– here's an
operation, run it on all the data
I don't care where it
runs
– Spark tools: SQL,
Streaming, MLlib, GraphX, ...
Spark essentials
– programming Spark
(pySpark)
– RDDs, creating RDD
– transformations,
actions
– Spark programming
model
Driver and workers
– driver,
driver machine, SparkContext: one program
– worker,
executor on cluster nodes (or local threads): another program
RDDs are distributed
across workers
– SparkContext
object: how and where to access a cluster
parameter: master,
type and size of cluster (local[K], spark://host:port,
mesos://host:port)
RDD: immutable,
track lineage information for recovery; enable ops in parallel
– construct RDD: from
files in HDFS, …
by parallelizing
existing Python collections
by transforming existing RDDs
– specify number of
partitions in constructor
rdd = sc.parallelize(data, 4)       # 4 partitions
rdd2 = sc.textFile('readme.md', 4)  # lines of input
two types of operations
– transformations
(recipe for creating result)
– actions
– transformations are
lazy
– transformed RDD is
executed when action runs on it
– you can persist
(cache) RDD in memory or disk
transformations: create
a new rdd from existing one
– map, filter,
distinct, flatMap, …
rdd.map(lambda x: x*2) # 'lambda: …' function literals are closures, automatically passed to workers;
– lazy evaluation:
nothing executes, Spark saves recipe for transforming source.
actions
– cause Spark to
execute recipe to transform source
– mechanism for
getting results out of Spark
reduce, take, collect,
takeOrdered, …
rdd.reduce(lambda a,b: a*b)
caching
lines = sc.textFile('filename', 4)
lines.cache()  # read file and form partitions only once
comments = lines.filter(isComment)
print lines.count(), comments.count()  # second time calls to lines, cached
key-value RDDs
– each element of a
Pair RDD is a pair tuple
rdd = sc.parallelize([(1,2), (3,4)])
– key-value transformations
sortByKey, groupByKey,
reduceByKey, …
groupByKey can cause
massive data shuffle and overload some workers
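A tiny sketch of the difference (assuming an existing SparkContext sc); reduceByKey combines values within each partition before anything crosses the network:

pairs = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])
# partial sums are computed per partition; only small results are shuffled
counts = pairs.reduceByKey(lambda x, y: x + y)
# groupByKey ships every (key, value) pair across the network first,
# then we still have to sum the grouped values ourselves
counts2 = pairs.groupByKey().map(lambda (k, vals): (k, sum(vals)))
print counts.collect()  # e.g. [('a', 2), ('b', 1)]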
closures
– Spark automatically
creates closures for functions that run on RDD at workers;
capturing any global variables used by those functions
– one closure per
worker: sent for every task
no communications
between workers
changes to global vars
at workers are not sent to driver
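A minimal sketch of the pitfall (hypothetical RDD; on a cluster each worker mutates only its own copy of the closure):

counter = 0
rdd = sc.parallelize(range(10))

def increment(x):
    global counter
    counter += 1  # updates the worker's copy only

rdd.foreach(increment)
print counter  # still 0 on the driver; use an accumulator instead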
shared variables
– broadcast
variables
efficiently send
(large) read-only value to all workers
saved at workers for
use in one or more operators
useful for shared
read-only data (maps)
– accumulators
aggregate values from
workers back to driver
only driver can access
value of accumulator
for tasks, accum. are
write-only
use to count errors
seen in RDD across workers
can be 'added' to by
associative op, counters, sums
# broadcast:
signPrefixes = sc.broadcast(loadCallSignTable())  # on driver

def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes.value)  # on worker
    …

counts = (contacts
          .map(processSignCount)
          .reduceByKey(lambda x, y: x + y))

# accum:
accum = sc.accumulator(0)  # on driver
rdd = …

def f(x):
    global accum  # on worker
    accum += x

rdd.foreach(f)
accum.value  # on driver
– using accum. in actions: guaranteed to be applied only once
no such guarantee for
accum. in transformations (might have to be run multiple times)
Lab 2: Learning
Apache Spark
– At a high level,
every Spark application consists of a driver program that
launches various parallel operations on executor JVMs
– this driver program
contains the main loop for the program and creates distributed
datasets on the cluster, then applies operations (transformations &
actions) to those datasets
– Driver programs
access Spark through a SparkContext object, which represents a
connection to a computing cluster.
– When you run map()
on a dataset, a single stage of tasks is launched.
– A stage is a group
of tasks that all perform the same computation, but on different
input data.
– One task is
launched for each partition. A task is a unit of execution that
runs on a single machine.
– to create a new
list on the driver from the data distributed in the executor
nodes. To do this we call the collect() method on our RDD
– the data returned
to the driver must fit into the driver's available memory. If not,
the driver will crash.
– the dataset is
broken into n partitions, so n collect() tasks are launched.
Each task collects the entries in its partition and sends the result
to the SparkContext, which creates a list of the values
– The filter(f)
method is a transformation operation that creates a new RDD from the
input RDD...Elements that do not return True will be dropped.
– The key advantage
of using takeOrdered() instead of first() or take()
is that takeOrdered() returns a deterministic result.
– The top()
action is similar to takeOrdered() except that it returns the list in
descending order
– The reduce()
action reduces the elements of a RDD to a single value by applying a
function that takes two parameters and returns a single value. The
function should be commutative and associative, as
reduce() is applied at the partition level and then again to
aggregate results from partitions.
– The takeSample()
action returns an array with a random sample of elements from the
dataset
– The countByValue()
action returns the count of each unique value in the RDD as a
dictionary that maps values to counts
– flatMap() is
similar to map(), except that with flatMap() each input item can be
mapped to zero or more output elements
– The reduceByKey()
transformation gathers together pairs that have the same key and
applies a function to two associated values at a time. reduceByKey()
operates by applying the function first within each partition on a
per-key basis and then across the partitions
– when using the
groupByKey() transformation - all the key-value pairs are
shuffled around, causing a lot of unnecessary data to be
transferred over the network
– combineByKey() can
be used when you are combining elements but your return type differs
from your input value type
– foldByKey() merges
the values for each key using an associative function and a neutral
"zero value".
– The mapPartitions()
transformation uses a function that takes in an iterator (to the
items in that specific partition) and returns an iterator. The
function is applied on a partition-by-partition basis
– The
mapPartitionsWithIndex() transformation uses a function that
takes in a partition index (think of this like the partition number)
and an iterator (to the items in that specific partition). For every
partition (index, iterator) pair, the function returns a tuple of the
same partition index number and an iterator of the transformed items
in that partition
if you try to keep too
many RDDs in memory, Spark will automatically delete RDDs from memory
to make space for new RDDs. If you later refer to one of the RDDs,
Spark will automatically recreate the RDD for you, but that takes
time
– You can use the
cache() operation to keep the RDD in memory
– once you are
finished using an RDD, you can optionally tell Spark to stop caching
it in memory by using the RDD's unpersist() method
pySpark runs Python
code in a JVM using Py4J. Py4J enables Python programs
running in a Python interpreter to dynamically access Java objects in
a Java Virtual Machine. Methods are called as if the Java objects
resided in the Python interpreter and Java collections can be
accessed through standard Python collection methods. Py4J also
enables Java programs to call back Python objects
Word Count Lab:
Building a word count application
#Part 1: Creating a base RDD and pair RDDs
#Part 2: Counting with pair RDDs
#Part 3: Finding unique words and a mean value
#Part 4: Apply word count to a file

wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)

def makePlural(word):
    return word + 's'

pluralRDD = wordsRDD.map(makePlural)
pluralLambdaRDD = wordsRDD.map(lambda x: x + 's')
pluralLengths = (pluralRDD
                 .map(lambda x: len(x))
                 .collect())

wordPairs = wordsRDD.map(lambda x: (x, 1))
wordsGrouped = wordPairs.groupByKey()
wordCountsGrouped = wordsGrouped.map(lambda (w, itr): (w, sum(itr)))
wordCounts = wordPairs.reduceByKey(lambda num1, num2: num1 + num2)
wordCountsCollected = (wordsRDD
                       .map(lambda x: (x, 1))
                       .reduceByKey(lambda x, y: x + y)
                       .collect())
uniqueWords = len(wordCountsCollected)

from operator import add
totalCount = (wordCounts
              .map(lambda (w, num): num)
              .reduce(lambda x, y: x + y))
average = totalCount / float(uniqueWords)

def wordCount(wordListRDD):
    res = (wordListRDD
           .map(lambda x: (x, 1))
           .reduceByKey(lambda x, y: x + y))
    return res

import re  # needed by removePunctuation (missing in the original snippet)
def removePunctuation(text):
    res = text.lower()
    res = re.sub(r'[^\w\s]', '', res)
    res = re.sub(r'[_]', '', res)
    res = res.strip()
    return res

shakespeareRDD = (sc
                  .textFile(fileName, 8)
                  .map(removePunctuation))
print '\n'.join(shakespeareRDD
                .zipWithIndex()  # to (line, lineNum)
                .map(lambda (l, num): '{0}: {1}'.format(num, l))  # to 'lineNum: line'
                .take(15))

shakespeareWordsRDD = shakespeareRDD.flatMap(lambda line: line.split(' '))
shakespeareWordCount = shakespeareWordsRDD.count()
shakeWordsRDD = shakespeareWordsRDD.filter(lambda x: x)  # drop empty strings
shakeWordCount = shakeWordsRDD.count()

top15WordsAndCounts = wordCount(shakeWordsRDD).takeOrdered(15, key=lambda (w, cnt): -cnt)
WEEK 3: Linear Regression and Distributed Machine Learning Principles
Topics: Linear
regression formulation and closed-form solution, distributed machine
learning principles (related to computation, storage, and
communication), gradient descent, quadratic features, grid search.
Lab 3: Millionsong
Regression Pipeline. Develop an end-to-end linear regression pipeline
to predict the release year of a song given a set of audio features.
You will implement a gradient descent solver for linear regression,
use Spark's machine learning library (MLlib) to train additional
models, tune models via grid search, improve accuracy using quadratic
features, and visualize various intermediate results to build
intuition.
linear regression:
supervised learning problem (least squares regression)
– learn a mapping
from observations to continuous labels given a training set
– example: height,
gender, weight => shoe size
for each observation we
have a feature vector: X.T = [x1, x2, x3]
we assume a linear
mapping between features and label: y ~ w0 + w1x1 + w2x2 + w3x3
feature vector to
incorporate offset: X.T = [1, x1, x2, x3]
then: y ~ scalar
product = W.T * X
– simple, often works
well in practice, can introduce complexity via feature extraction
– loss function: (y - yhat)^2, has nice mathematical properties
– find W that
minimizes squared loss over training points:
min(w) norm (X*w - y)^2
X: n,d matrix storing
points
y: n vector, real
values, labels
yhat: n vector,
predicted labels, yhat = X*w
w: d vector, regression
parameters / model to learn
– find solution by
setting derivative to zero
df/dw = 2*(X.T*X*w - X.T*y) = 0
# solve for w (closed-form solution)
w = inv(X.T*X) * X.T*y // if the inverse exists
ridge regression
: (linear regression + regularization)
– penalize model
complexity (overfitting)
– add regularization,
keep model parameters small
min(w) norm(X*w - y)^2 + lambda * norm(w)^2
– closed-form solution:
w = inv(X.T*X + lambda*Id)*X.T*y // Id: identity matrix size d,d
– all other things
being equal, models with smaller weights will be selected over models
with larger weights.
– trades off between
training error and model complexity
millionsong regression
pipeline
– First we must
obtain our raw dataset
– Next, we must split
the data into training and test datasets
– We then perform
feature extraction to find meaningful features,
and typically to
represent our data numerically.
– We then train
our model using the training data,
and subsequently
evaluate it on the test data.
– we iterate
over this process until we are satisfied with our results
– we can use the
final model that we've created to make predictions
– In terms of feature
extraction, we will use quadratic features,
which enable us to
incorporate pairwise feature interactions into our model.
In other words, these
quadratic features allow us to capture the covariance of the initial
timbre features,
thus leading to a
non-linear model
– given 2D data,
quadratic features are:
x = [x1, x2] => F(x) = [x1^2, x1x2, x2x1, x2^2]
F'(x) = [x1^2, sqrt(2)x1x2, x2^2]
F(x).T * F(z) = F'(x).T * F'(z)
– Our goal is to map audio features and, in particular, our
quadratic features to song year.
– We'll be using a
ridge regression model to learn this mapping.
how should we go about
choosing a value for this free parameter lambda?
And in fact, most
methods have free parameters, which are often called hyperparameters,
that need to be tuned.
– validation set
for searching hyperparameters
– We now use the
training data to train potentially multiple models, each with
different hyperparameter settings.
The validation
set to evaluate the various models, often using grid search;
and the test
set to evaluate the final model's accuracy.
Grid search is a
standard method for searching over hyperparameters,
exhaustive search
– define and
discretize the search space, either on a linear or log scale
– we use the training
data to train models at each of these grid points
using each of the
hyperparameter settings defined by our grid
– evaluate points via
validation error
– Since least squares
linear regression uses the squared loss, it seems reasonable to
consider the
mean squared error
to evaluate model quality
mse = 1/n * sum (for i in 1..n: (yhat(i) - y(i))^2)
– more natural to use: root-mean-square error: RMSE =
sqrt(mse)
Once we've identified
our preferred model, we can then use the test set to evaluate its
accuracy
And finally, once we
have our final model, we can use it to make predictions on future
observations
distributed machine
learning principles related to computation and storage
– time and space
complexity for closed-form solution
w = inv(X.T*X + lambda*Id)*X.T*y
Since matrix multiplication is associative, compute the cheaper product first:
temp = X.T * y // O(nd)
inv(X.T*X + lambda*Id) * temp // O(d^2)
in summary, the two
computational bottlenecks:
X.T*X: O(nd^2); its
inverse: O(d^3)
overall: time O(nd^2 +
d^3)
– storage: X.T*X and
inverse: O(d^2) floats
X: O(nd) floats
big n and small d:
O(d^3) computations, O(d^2) storage is feasible on a single machine
– well suited for a
distributed computation
– store rows across
cluster (distributed n)
– compute X.T*X in
parallel: as sum of outer products (column * row = matrix)
the i-th column of X.T is the i-th row of X transposed =>
X.T*X = sum over i of row(i).T * row(i) // easy to parallelize
– O(nd) distributed
storage; O(nd^2) distributed computation;
O(d^2) local storage;
O(d^3) local computation
X.map(computeOuterProduct).reduce(sumAndInvert)
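Spelled out slightly, assuming X is an RDD of d-dimensional NumPy row vectors:

import numpy as np

# each row contributes a d,d outer product; the reduce sums them, so only
# the final d,d matrix (not the n,d data) ever reaches the driver
XtX = X.map(lambda row: np.outer(row, row)).reduce(lambda a, b: a + b)
XtXInv = np.linalg.inv(XtX)  # O(d^3), done locally on the driver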
big n and big d:
O(d^3) computation and O(d^2) storage are no longer feasible locally
– rule of thumb:
we need complexity at most linear in (n, d) // records, dimensions
– idea: exploit
sparsity
sparse data is
prevalent
– can reduce
dimensions: PCA, low-rank approximation
X can be represented by the product of two skinny matrices:
(n,d) = (n,r) * (r,d), with r << n, d
– use different
algorithms: gradient descent O(nd) computations, O(d) local
storage per iteration
gradient descent
(linear regression optimization)
– objective function
(loss) is convex (least squares, ridge regression, logistic
regression: all convex)
f(w) = norm(X*w - y)^2
find w that minimize
f(w)
– start at random
point; repeat until converge:
determine a descent
direction (gradient)
choose a step size
update w
– stop when the max number of iterations has been performed, or updates become small enough
– descent
direction: negative slope
w_next = w - alpha * df/dw
alpha: hyperparameter = step size, reduce over time ~
alpha/(n*sqrt(iteration))
df/dw: delta =
gradient (derivative)
we'll note that f(w) is
a summation.
So we can compute the
derivative by taking the derivative of each of the summands
separately and summing the results.
Moreover, we can take
the derivative of each summand using the chain rule.
df/dw = sum (for j in 1..n: (w.T*x(j) - y(j)) * x(j))
GD can be done with
map-reduce
– if we share w(i)
with all workers
for i in range(numIters):
    alpha_i = alpha / (n * sqrt(i+1))
    gradient = X.map(lambda point: gradientSummand(w, point)).sum()
    w -= alpha_i * gradient
return w
GD summary:
– pros: easily
parallelized; cheap at each iteration; stochastic variants even
cheaper
– cons: slow
convergence (compared with closed-form); requires communication
across nodes!
Communication
hierarchy
– CPU to RAM 50 GB/s,
HDD 0.1 GB/s
– network, same rack
1 GB/s; other racks 0.3 GB/s
– 50x gap
memory/network (+latency)
distributed machine
learning principles: communication principles
– reduce
communication cost
– rule of thumb:
perform parallel and in-memory computation
X.cache()
for i in range(numIters):
    alpha_i = alpha / (n * sqrt(i+1))
    gradient = X.map(lambda point: gradientSummand(w, point)).sum()
– scale-up (multicore monsters) is not good enough, expensive
– scale-out
(distributed): scales good, but need to deal with network
communication
for i in range(numIters):
    ...
    gradient = X.map(lambda point: gradientSummand(w, point)).sum()
    w -= alpha_i * gradient
need to communicate parameters w in each iteration
– rule of thumb:
minimize network communication, stay local, reduce iterations
– keep large objects
local
– example: linear
regression, big n and small d, closed form solution
communicate O(d^2)
intermediate data
– example: linear
regression, big n and big d, gradient descent
communicate O(d)
parameters w on each iteration: OK for fairly large d
– example:
hyperparameter tuning, small n, small d, grid search
communicate data
O(nd), train each model locally (model parallel)
– example: linear
regression, big n and huge d, gradient descent
O(d) communications
slow, rely on sparsity to reduce communication
distribute data and
model
we can reduce
communication by reducing the number of iterations
– in Bulk Synchronous
Parallel (BSP) systems, e.g. Spark, we strictly alternate between
computations and
communications
– idea: compute
more, communicate less
– extreme:
divide-and-conquer, fully process each partition locally,
communicate final
result
problem: we obtain
approximate result, in general
w = X.mapPartitions(localLinearRegression).reduce(combineRegressionResults)
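One way to flesh out the hypothetical helpers above, fitting a local least-squares model per partition and averaging the per-partition weights (an approximation, as noted; assumes an RDD data of (features, label) pairs):

import numpy as np

def localLinearRegression(points):
    # fit ordinary least squares on this partition's points only
    pts = list(points)
    X = np.array([p[0] for p in pts])
    y = np.array([p[1] for p in pts])
    w = np.linalg.lstsq(X, y)[0]
    yield (w, len(pts))

def combineRegressionResults(a, b):
    # weighted average of the per-partition weight vectors
    (wA, nA), (wB, nB) = a, b
    return ((wA * nA + wB * nB) / float(nA + nB), nA + nB)

w, _ = (data.mapPartitions(localLinearRegression)
            .reduce(combineRegressionResults))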
– less extreme option: mini-batching
for i in range(fewerIters):
    update = X.mapPartitions(localGradientUpdates).reduce(combineLocalUpdates)
    w += update
increase the amount of local computation;
if the batches get too big, the returns diminish
throughput/latency
– bytes per second /
cost to send a message
– send larger
messages, batch communications
– e.g. train multiple
models together
millionsong lab
– implement a
regression pipeline for predicting a song's release year from audio
features
– raw data
explore features
shift labels to start
at 0
visualize data
– split to training,
cross-validation, test sets
– feature extraction:
initially use raw features
compare with quadratic
features
– supervised
learning: least squares regression
gradient descent from
scratch
use MLlib
implementation (ridge regression)
visualize performance
by iteration
– evaluation
hyperparameter tuning,
grid search for stepsize and regularization
evaluate using RMSE,
visualize grid search
evaluate final model:
using RMSE, compare to baseline model
from pyspark.mllib.regression import LabeledPoint
import numpy as np
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.regression import LinearRegressionWithSGD
import math       # needed by squaredError/calcRMSE (missing in the original snippet)
import itertools  # needed by twoWayInteractions (missing in the original snippet)

numPartitions = 2
rawData = sc.textFile(fileName, numPartitions)
numPoints = rawData.count()
samplePoints = rawData.take(5)

def parsePoint(line):
    arr = line.split(',')
    label = arr[0]
    features = arr[1:]
    return LabeledPoint(label, features)

parsedDataInit = rawData.map(parsePoint)
onlyLabels = parsedDataInit.map(lambda x: x.label)
minYear = onlyLabels.min()
maxYear = onlyLabels.max()
parsedData = parsedDataInit.map(lambda x: LabeledPoint(x.label - minYear, x.features))

weights = [.8, .1, .1]
seed = 42
parsedTrainData, parsedValData, parsedTestData = parsedData.randomSplit(weights, seed)
parsedTrainData.cache()
parsedValData.cache()
parsedTestData.cache()
nTrain = parsedTrainData.count()
nVal = parsedValData.count()
nTest = parsedTestData.count()

averageTrainYear = (parsedTrainData
                    .map(lambda x: x.label)
                    .mean())

def squaredError(label, prediction):
    return math.pow(label - prediction, 2)

def calcRMSE(labelsAndPreds):
    sumErr = (labelsAndPreds
              .map(lambda (x, y): squaredError(x, y))
              .reduce(lambda x, y: x + y))
    return math.sqrt(sumErr / labelsAndPreds.count())

labelsAndPredsTrain = parsedTrainData.map(lambda x: (x.label, averageTrainYear))
rmseTrainBase = calcRMSE(labelsAndPredsTrain)

def gradientSummand(weights, lp):
    w = weights.T
    x = lp.features
    y = lp.label
    return (w.dot(x) - y) * x

def getLabeledPrediction(weights, observation):
    label = observation.label
    w = weights.T
    x = observation.features
    prediction = w.dot(x)
    return (label, prediction)

def linregGradientDescent(trainData, numIters):
    # The length of the training data
    n = trainData.count()
    # The number of features in the training data
    d = len(trainData.take(1)[0].features)
    w = np.zeros(d)
    alpha = 1.0
    # We will compute and store the training error after each iteration
    errorTrain = np.zeros(numIters)
    for i in range(numIters):
        # Use getLabeledPrediction from (3b) with trainData to obtain an RDD of
        # (label, prediction) tuples. Note that the weights all equal 0 for the first
        # iteration, so the predictions will have large errors to start.
        labelsAndPredsTrain = trainData.map(lambda x: getLabeledPrediction(w, x))
        errorTrain[i] = calcRMSE(labelsAndPredsTrain)
        # Calculate the `gradient`. Make use of the `gradientSummand` function you wrote
        # in (3a). Note that `gradient` should be a `DenseVector` of length `d`.
        gradient = trainData.map(lambda x: gradientSummand(w, x)).reduce(lambda x, y: x + y)
        # Update the weights
        alpha_i = alpha / (n * np.sqrt(i+1))
        w -= alpha_i * gradient
    return w, errorTrain

numIters = 50
weightsLR0, errorTrainLR0 = linregGradientDescent(parsedTrainData, numIters)
labelsAndPreds = parsedValData.map(lambda x: getLabeledPrediction(weightsLR0, x))
rmseValLR0 = calcRMSE(labelsAndPreds)

# LinearRegressionWithSGD
numIters = 500       # iterations
alpha = 1.0          # step
miniBatchFrac = 1.0  # miniBatchFraction
reg = 1e-1           # regParam
regType = 'l2'       # regType
useIntercept = True  # intercept
firstModel = LinearRegressionWithSGD.train(parsedTrainData,
                                           iterations=numIters,
                                           step=alpha,
                                           miniBatchFraction=miniBatchFrac,
                                           regParam=reg,
                                           regType=regType,
                                           intercept=useIntercept)
weightsLR1 = firstModel.weights
interceptLR1 = firstModel.intercept
labelsAndPreds = parsedValData.map(lambda x: (x.label, firstModel.predict(x.features)))
rmseValLR1 = calcRMSE(labelsAndPreds)

# grid search, reg
bestRMSE = rmseValLR1
bestRegParam = reg
bestModel = firstModel
numIters = 500
alpha = 1.0
miniBatchFrac = 1.0
for reg in (1e-10, 1e-5, 1):
    model = LinearRegressionWithSGD.train(parsedTrainData, numIters, alpha,
                                          miniBatchFrac, regParam=reg,
                                          regType='l2', intercept=True)
    labelsAndPreds = parsedValData.map(lambda lp: (lp.label, model.predict(lp.features)))
    rmseValGrid = calcRMSE(labelsAndPreds)
    if rmseValGrid < bestRMSE:
        bestRMSE = rmseValGrid
        bestRegParam = reg
        bestModel = model
rmseValLRGrid = bestRMSE

# grid search, alpha and numIters
reg = bestRegParam
modelRMSEs = []
for alpha in (1e-5, 10):
    for numIters in (5, 500):
        model = LinearRegressionWithSGD.train(parsedTrainData, numIters, alpha,
                                              miniBatchFrac, regParam=reg,
                                              regType='l2', intercept=True)
        labelsAndPreds = parsedValData.map(lambda lp: (lp.label, model.predict(lp.features)))
        rmseVal = calcRMSE(labelsAndPreds)
        print 'alpha = {0:.0e}, numIters = {1}, RMSE = {2:.3f}'.format(alpha, numIters, rmseVal)
        modelRMSEs.append(rmseVal)

# Add interactions between features
def twoWayInteractions(lp):
    newfeatures = itertools.product(lp.features, repeat=2)
    newfeatures = [x*y for x, y in newfeatures]
    label = lp.label
    features = np.hstack((lp.features, newfeatures))
    return LabeledPoint(label, features)

trainDataInteract = parsedTrainData.map(lambda x: twoWayInteractions(x))
valDataInteract = parsedValData.map(lambda x: twoWayInteractions(x))
testDataInteract = parsedTestData.map(lambda x: twoWayInteractions(x))

numIters = 500
alpha = 1.0
miniBatchFrac = 1.0
reg = 1e-10
modelInteract = LinearRegressionWithSGD.train(trainDataInteract, numIters, alpha,
                                              miniBatchFrac, regParam=reg,
                                              regType='l2', intercept=True)
labelsAndPredsInteract = valDataInteract.map(lambda lp: (lp.label, modelInteract.predict(lp.features)))
rmseValInteract = calcRMSE(labelsAndPredsInteract)
WEEK 4: Logistic Regression and Click-through Rate Prediction
Topics: Online
advertising, linear classification, logistic regression, working with
probabilistic predictions, categorical data and one-hot-encoding,
feature hashing for dimensionality reduction.
Lab 4:
Click-through Rate Prediction Pipeline. Construct a logistic
regression pipeline to predict click-through rate using data from a
recent Kaggle competition. You will extract numerical features from
the raw categorical data using one-hot-encoding, reduce the
dimensionality of these features via hashing, train logistic
regression models using MLlib, tune hyperparameters via grid search,
and interpret probabilistic predictions via a ROC plot.
canonical scalable ML
problem : online advertising
– huge amount of
data: online human activity, ad properties, etc
– skewed classes;
data is sparse, high-dimensional
– heavily biased training labels (click events are rare)
players:
– publishers
– advertisers
– matchmakers (in
real time)
metric for success:
click-through rate
– get user to do
something, click on ad
– how likely it is
for a user to click on an ad?
– choose ads to
maximize probability
– conditional
probability: probability given predictive features
– goal: estimate
P(click | user, ad, publisher) given: massive amounts of labeled data
predictive features
– ad's historical
performance
– advertiser, ad
content info
– publisher info
– user info
(search/click history)
linear
classification, relation to linear regression; logistic
regression
– supervised learning
– goal: learn a
mapping from observations to discrete labels given a set of training
examples
– example: spam
classification, emails as observations, labels: spam/not spam
– example:
click-through rate prediction; observations: user,ad,publisher
triples;
labels:
click/not-click
– linear classifier:
yhat = sign(W.T*X)
– decision
boundary: W.T*X = 0
– how can we evaluate
predictions?
– 0-1 loss:
penalty is 0 for correct prediction, 1 otherwise
let y = {-1, 1}; z = y * W.T * X
then: loss = 1 if z <
0; loss = 0 otherwise
– idea: find W that
minimizes 0-1 loss over training points
min sum (for i in 1..n:
loss(y(i) * W.T * X(i)))
– this is not a
convex function, alas.
– solution:
approximate 0-1 loss with convex loss: SVM, logistic
regression, Adaboost
logloss: logistic
loss
logloss = log(1 + e^-z)
– logistic
regression: learn parameters W that minimizes logloss on training
data
min sum (for i in 1..n:
logloss(y(i) * W.T * X(i)))
– convex, but a closed-form solution doesn't exist
– can solve via
gradient descent
– update rule: w_next = w - alpha * gradient
gradient = sum (for j in 1..n: (1 - 1/(1 + exp(-y(j)*W.T*X(j)))) * (-y(j)*X(j)))
– adding regularization (trading off logloss over model complexity)
min sum (for i in 1..n: logloss(y(i) * W.T * X(i))) + lambda * norm(W)^2
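A minimal gradient-descent sketch for the regularized logloss, assuming an RDD data of (y, x) pairs with y in {-1, 1} and NumPy feature vectors x (d, numIters, alpha, lam are illustrative hyperparameters):

import numpy as np

def logLossGradSummand(w, y, x):
    # one summand: (1 - 1/(1 + exp(-y*w.T*x))) * (-y*x)
    return (1.0 - 1.0 / (1.0 + np.exp(-y * w.dot(x)))) * (-y * x)

w = np.zeros(d)
for i in range(numIters):
    gradient = (data
                .map(lambda p: logLossGradSummand(w, p[0], p[1]))
                .reduce(lambda a, b: a + b))
    w -= alpha * (gradient + 2 * lam * w)  # lambda*norm(w)^2 contributes 2*lambda*w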
probabilistic
interpretation of logistic regression
– we want more
granular information, not just 0/1
– goal: conditional
probability P(y=1 | x)
– example: P(y=rain |
temperature=a, clouds=b, humidity=c) = 0.7
– example: P(y=click
| … ) = 0.1
10% chance that a
click event occurs
0.1 is a relatively
high probability
– if we want to work with a linear model, we need to squash its output to the 0..1 range
– logistic, or
sigmoid function
P(y=1 | x) =
sigma(W.T*X)
– domain (x-axis) is
all real numbers; range of function (y-axis) is 0..1
sigma(z) = 1 / (1 + exp(-z))
– let y = {0, 1}
P(y=1 | x) =
sigma(W.T*X)
P(y=0 | x) = 1 -
sigma(W.T*X)
– to make class
predictions, we need a threshold
default threshold =
0.5
P(y=1 |x) > 0.5 =>
yhat = 1
– decision boundary:
sigma(0) = 0.5
– with default
threshold, the decision boundaries are identical to linear
classification
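In code, the probabilistic prediction and the default threshold are just (hypothetical weight vector w and feature vector x):

import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

prob = sigmoid(w.dot(x))       # P(y=1 | x)
yhat = 1 if prob > 0.5 else 0  # threshold 0.5 <=> sign(w.T*x)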
how we can use
probabilistic predictions
– evaluation metrics,
threshold
– with a spam detector, a false positive is more harmful than a false negative
– you can use a threshold greater than 0.5 to be more 'conservative'
– how can we
determine the best choice?
– use a ROC plot
: false positive rate vs true positive rate for different thresholds
in ideal world TPR =
100%, FPR = 0%
at random: 50% - 50%
establishing these
baselines, you can use a ROC plot to evaluate a classifier
– assuming you might
be willing to tolerate at most 10% FPR => threshold
you may want to use
probabilities directly, w/o making class predictions
– click-through
prediction: click events are rare, probabilities will be uniformly
low
– in these cases, it
makes sense to use the logloss
logloss(p,y) = -log(p) if y=1; logloss = -log(1-p) if y=0
when y = 1, we want p = 1
– no penalty at 1;
increasing penalty away from 1
– similar when y=0
– in comparison to
the 0-1 loss, which either returns a penalty of 0 or 1,
the log loss is a more
fine-grained metric for evaluating the accuracy of probabilistic
predictions
categorical data,
one-hot-encoding
– one-hot encoding: a
standard technique for converting categorical features into numerical
ones
– in many other
cases, the raw data is not numeric (gender, nationality, occupation,
…)
– methods: Decision
Trees, Naive Bayes, … naturally support non-numeric features (this
limits our options)
– how can we effectively perform this conversion to numeric data?
non-numerical features
– categorical
(gender, language, …), no intrinsic ordering
– ordinal (is your
health: poor, reasonable, good, excellent?), intrinsic ordering,
relative (not absolute)
can't convert by just
assigning numerical values to categories
– introduce a degree
of closeness that didn't previously exist
one-hot-encoding (used
in bag-of-words)
– we create a
separate dummy variable for each category
sparse representation: for each original feature, only one of its dummy variables equals 1
how to compute one-hot
encoded features
– features:
animal: bear, cat,
mouse // 3 categories
color: black, tabby //
2 categories
diet: mouse, salmon //
2 categories, optional
– points (animal,
color, diet):
a1 = mouse, black
a2 = cat, tabby, mouse
a3 = bear, black,
salmon
– in total: 7
categories across 3 features
– step 1: map each
category to a dummy feature, assign a number (position in array)
(animal, bear) => 0
# first feature
(animal, cat) => 1
# second
…
(diet, mouse) => 5 # indices run 0..6 for the 7 categories
– step 2: map
categorical features to numerical
a1: mouse, black, n/a // animal, color, diet // => (animal, mouse) => #2; (color, black) => #3
a1 = (0, 0, 1, 1, 0,
0, 0) // array of dummy features
how to efficiently
store OHE features
– our OHE features
are sparse
– store the indices
and values for all non-zero entries and assume that all other entries
equal 0
a1 = ((2,1), (3,1))
– can lead to a
dramatic savings, both in terms of storage and computation
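A compact sketch of both steps on the toy animal data, previewing the lab code below (SparseVector stores only the non-zero entries):

from pyspark.mllib.linalg import SparseVector

# step 1: one dummy feature per (featureID, category) pair
oheDict = {(0, 'bear'): 0, (0, 'cat'): 1, (0, 'mouse'): 2, (1, 'black'): 3,
           (1, 'tabby'): 4, (2, 'mouse'): 5, (2, 'salmon'): 6}

# step 2: a1 = mouse, black -> indices 2 and 3 are set, everything else is 0
a1 = [(0, 'mouse'), (1, 'black')]
a1OHE = SparseVector(len(oheDict), dict((oheDict[f], 1.0) for f in a1))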
feature hashing,
which can be used to reduce the dimensionality of OHE features
– hashed features are a reasonable alternative to OHE features
– a consequence of
OHE features is that they drastically increase the dimensionality of
our data
– example:
bag-of-words, over a million words in a dictionary
bigrams make it even
larger
– large d: need
bigger n; increased communication, larger storage
– By using hashing
principles, we can create a lower dimensional feature representation
– feature hashing can
be interpreted as an unsupervised learning method, which is performed
as a pre-processing step
– hash functions map
an object to one of m buckets
each feature category
is an object, and we have fewer buckets than feature categories
different categories
are bound to collide or map to the same bucket
– bucket indices are
what we define as our hashed features
– hash function H maps each (feature, category) pair to 0..3 // m = 4 buckets
H(animal, mouse) = 3
H(color, black) = 2
H(animal, cat) = 0
H(color, tabby) = 0
H(diet, mouse) = 2
– hashed features:
a1 = (0, 0, 1, 1)
a2 = (2, 0, 1, 0) //
2: tabby cat, both categories in bucket #0
– hashed features
have been used in practice and
have resulted in good
practical performance on various text classification tasks
– and it turns out that, under certain conditions, hashed features
theoretically lead to good approximations of the pairwise inner
products between the data points
X_hash = X.map(applyHashFunction).map(createSparseVector)
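A sketch along the lines of the lab's hash function, reproducing the m = 4 bucket example (the exact hash used here is illustrative):

from collections import defaultdict
from pyspark.mllib.linalg import SparseVector
import hashlib

def hashedFeatures(rawFeats, numBuckets):
    # colliding categories simply add up in the same bucket
    buckets = defaultdict(float)
    for featureID, category in rawFeats:
        h = int(hashlib.md5(category + str(featureID)).hexdigest(), 16)
        buckets[h % numBuckets] += 1.0
    return SparseVector(numBuckets, buckets)

a2 = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')]
print hashedFeatures(a2, 4)  # cf. the worked example above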
lab: click-through
rate prediction pipeline
– goal: estimate
P(click | user, ad, publisher)
– raw data includes
39 features describing users, ads, and publishers.
Many of these features
contain a large number of categories
– split this data set
into training, validation, and test data sets
– feature extraction
is the main focus of this lab
You'll create OHE
features, as well as hashed features,
and store these
features using a sparse representation
– you will use MLlib
to train logistic regression models
– You will then
perform hyperparameter tuning
to search for a good
regularization parameter,
evaluating the results
via log loss
– You will then
evaluate the final model, again, using log loss
and compare its
accuracy to that of a simple baseline
import numpy as np
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
from math import log
from math import exp  # exp(-t) = e^-t
import math
from collections import defaultdict
import hashlib

sampleOne = [(0, 'mouse'), (1, 'black')]
sampleTwo = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')]
sampleThree = [(0, 'bear'), (1, 'black'), (2, 'salmon')]
sampleDataRDD = sc.parallelize([sampleOne, sampleTwo, sampleThree])

# OHE encoding
sampleOHEDictManual = {}
sampleOHEDictManual[(0,'bear')] = 0
sampleOHEDictManual[(0,'cat')] = 1
sampleOHEDictManual[(0,'mouse')] = 2
sampleOHEDictManual[(1,'black')] = 3
sampleOHEDictManual[(1,'tabby')] = 4
sampleOHEDictManual[(2,'mouse')] = 5
sampleOHEDictManual[(2,'salmon')] = 6

# sparse vector
bDense = np.array([0., 0., 0., 1.])
bSparse = SparseVector(len(bDense), enumerate(bDense))
sampleOneOHEFeatManual = SparseVector(7, {2: 1.0, 3: 1.0})
sampleTwoOHEFeatManual = SparseVector(7, {1: 1.0, 4: 1.0, 5: 1.0})
sampleThreeOHEFeatManual = SparseVector(7, {0: 1.0, 3: 1.0, 6: 1.0})

def oneHotEncoding(rawFeats, OHEDict, numOHEFeats):
    spDict = {}
    for x in rawFeats:
        key = OHEDict.get(x, None)
        if key is not None:
            spDict[key] = 1.0
    return SparseVector(numOHEFeats, spDict)

numSampleOHEFeats = len(sampleOHEDictManual)
sampleOHEData = sampleDataRDD.map(lambda x: oneHotEncoding(x, sampleOHEDictManual, numSampleOHEFeats))

# OHE dict creation
sampleDistinctFeats = (sampleDataRDD
                       .flatMap(lambda x: x)
                       .distinct())
sampleOHEDict = (sampleDistinctFeats
                 .zipWithIndex()
                 .collectAsMap())

def createOneHotDict(inputData):
    distinctFeats = (inputData
                     .flatMap(lambda x: x)
                     .distinct())
    return (distinctFeats
            .zipWithIndex()
            .collectAsMap())

sampleOHEDictAuto = createOneHotDict(sampleDataRDD)

# load data
rawData = (sc
           .textFile(fileName, 2)
           .map(lambda x: x.replace('\t', ',')))
weights = [.8, .1, .1]
seed = 42
rawTrainData, rawValidationData, rawTestData = rawData.randomSplit(weights, seed)
rawTrainData.cache()
rawValidationData.cache()
rawTestData.cache()
nTrain = rawTrainData.count()
nVal = rawValidationData.count()
nTest = rawTestData.count()

# parse data
def parsePoint(point):
    res = []
    rowFeats = map(lambda x: x.strip(), point.split(','))
    for idx, x in enumerate(rowFeats):
        if idx == 0:
            continue
        res.append((idx-1, x))
    return res

parsedTrainFeat = rawTrainData.map(parsePoint)

# OHE
numCategories = (parsedTrainFeat
                 .flatMap(lambda x: x)
                 .distinct()
                 .map(lambda x: (x[0], 1))
                 .reduceByKey(lambda x, y: x + y)
                 .sortByKey()
                 .collect())
ctrOHEDict = createOneHotDict(parsedTrainFeat)
numCtrOHEFeats = len(ctrOHEDict.keys())

def parseOHEPoint(point, OHEDict, numOHEFeats):
    features = []
    rowFeats = map(lambda x: x.strip(), point.split(','))
    for idx, x in enumerate(rowFeats):
        if idx == 0:
            label = x
            continue
        features.append((idx-1, x))
    oheFeats = oneHotEncoding(features, OHEDict, numOHEFeats)
    return LabeledPoint(label, oheFeats)

OHETrainData = rawTrainData.map(lambda point: parseOHEPoint(point, ctrOHEDict, numCtrOHEFeats))
OHETrainData.cache()

# ignore tuples w/o key
def oneHotEncoding(rawFeats, OHEDict, numOHEFeats):
    spDict = {}
    for x in rawFeats:
        key = OHEDict.get(x, None)
        if key is not None:
            spDict[key] = 1.0
    return SparseVector(numOHEFeats, spDict)

OHEValidationData = rawValidationData.map(lambda point: parseOHEPoint(point, ctrOHEDict, numCtrOHEFeats))
OHEValidationData.cache()

# CTR prediction, logloss evaluation
numIters = 50
stepSize = 10.
regParam = 1e-6
regType = 'l2'
includeIntercept = True
model0 = LogisticRegressionWithSGD.train(OHETrainData,
                                         iterations=numIters,
                                         step=stepSize,
                                         regParam=regParam,
                                         regType=regType,
                                         intercept=includeIntercept)
sortedWeights = sorted(model0.weights)

def computeLogLoss(p, y):
    epsilon = 10e-12
    p = max(epsilon, p)
    p = min(1-epsilon, p)
    if y == 1:
        res = log(p)
    else:
        res = log(1 - p)
    return -1.0 * res

# baseline logloss
classOneFracTrain = (OHETrainData
                     .map(lambda x: x.label)
                     .reduce(lambda x, y: x + y)) / OHETrainData.count()
logLossTrBase = (OHETrainData
                 .map(lambda x: computeLogLoss(classOneFracTrain, x.label))
                 .reduce(lambda x, y: x + y)) / OHETrainData.count()

# Calculate the probability for an observation given a set of weights and intercept.
def getP(x, w, intercept):
    rawPrediction = intercept + x.dot(w)
    # Bound the raw prediction value
    rawPrediction = min(rawPrediction, 20)
    rawPrediction = max(rawPrediction, -20)
    return math.pow(1.0 + exp(-1 * rawPrediction), -1)

trainingPredictions = (OHETrainData
                       .map(lambda x: getP(x.features, model0.weights, model0.intercept)))

# Calculates the log loss for the data given the model.
def evaluateResults(model, data):
    res = (data
           .map(lambda x: computeLogLoss(getP(x.features, model.weights, model.intercept), x.label))
           .reduce(lambda x, y: x + y)) / data.count()
    return res

logLossTrLR0 = evaluateResults(model0, OHETrainData)

# validation logloss
logLossValBase = (OHEValidationData
                  .map(lambda x: computeLogLoss(classOneFracTrain, x.label))
                  .reduce(lambda x, y: x + y)) / OHEValidationData.count()
logLossValLR0 = evaluateResults(model0, OHEValidationData)

# Reduce feature dimension via feature hashing
def hashFunction(numBuckets, rawFeats, printMapping=False):
    """Calculate a feature dictionary for an observation's features based on hashing.

    Returns:
        dict of int to float: The keys will be integers which represent the buckets
            that the features have been hashed to. The value for a given key will
            contain the count of the (featureID, value) tuples that have hashed to
            that key.
    """
    mapping = {}
    for ind, category in rawFeats:
        featureString = category + str(ind)
        mapping[featureString] = int(int(hashlib.md5(featureString).hexdigest(), 16) % numBuckets)
    if printMapping:
        print mapping
    sparseFeatures = defaultdict(float)
    for bucket in mapping.values():
        sparseFeatures[bucket] += 1.0
    return dict(sparseFeatures)

def parseHashPoint(point, numBuckets):
    """Create a LabeledPoint for this observation using hashing.

    Returns:
        LabeledPoint: A LabeledPoint with a label (0.0 or 1.0) and a SparseVector
            of hashed features.
    """
    rowFeats = map(lambda x: x.strip(), point.split(','))
    features = []  # list of tuples like [(0, 'mouse'), (1, 'black')]
    label = None
    for idx, x in enumerate(rowFeats):
        if idx == 0:
            label = x
            continue
        features.append((idx-1, x))
    dictHashed = hashFunction(numBuckets, features, False)  # dict like {2: 1.0, 3: 1.0}
    hashedFeats = SparseVector(numBuckets, dictHashed)
    return LabeledPoint(label, hashedFeats)

numBucketsCTR = 2 ** 15
hashTrainData = rawTrainData.map(lambda x: parseHashPoint(x, numBucketsCTR))
hashTrainData.cache()
hashValidationData = rawValidationData.map(lambda x: parseHashPoint(x, numBucketsCTR))
hashValidationData.cache()
hashTestData = rawTestData.map(lambda x: parseHashPoint(x, numBucketsCTR))
hashTestData.cache()

# check sparsity // Since we have 33K hashed features versus 233K OHE features,
# we should expect OHE features to be sparser
def computeSparsity(data, d, n):
    """Calculates the average sparsity for the features in an RDD of LabeledPoints.

    Returns:
        float: The average of the ratio of features in a point to total features.
    """
    total = (data
             .map(lambda x: len(x.features.indices))
             .sum())
    return float(total) / float(d * n)

averageSparsityHash = computeSparsity(hashTrainData, numBucketsCTR, nTrain)
averageSparsityOHE = computeSparsity(OHETrainData, numCtrOHEFeats, nTrain)

# Logistic model with hashed features
numIters = 500
regType = 'l2'
includeIntercept = True
stepSizes = (1, 10)
regParams = (1e-6, 1e-3)
bestModel = None
bestLogLoss = 1e10
for stepSize in stepSizes:
    for regParam in regParams:
        model = (LogisticRegressionWithSGD
                 .train(hashTrainData, numIters, stepSize,
                        regParam=regParam, regType=regType,
                        intercept=includeIntercept))
        logLossVa = evaluateResults(model, hashValidationData)
        print ('\tstepSize = {0:.1f}, regParam = {1:.0e}: logloss = {2:.3f}'
               .format(stepSize, regParam, logLossVa))
        if logLossVa < bestLogLoss:
            bestModel = model
            bestLogLoss = logLossVa

# Evaluate on the test set
logLossTest = evaluateResults(bestModel, hashTestData)
logLossTestBaseline = (hashTestData
                       .map(lambda x: computeLogLoss(classOneFracTrain, x.label))
                       .sum()) / hashTestData.count()
WEEK 5: Principal Component Analysis and Neuroimaging
Topics:
Introduction to neuroscience and neuroimaging data, exploratory data
analysis, principal component analysis (PCA) formulations and
solution, distributed PCA.
Lab 5: Neuroimaging
Analysis via PCA - Identify patterns of brain activity in larval
zebrafish. You will work with time-varying images (generated using a
technique called light-sheet microscopy) that capture a zebrafish's
neural activity as it is presented with a moving visual pattern.
After implementing distributed PCA from scratch and gaining intuition
by working with synthetic data, you will use PCA to identify distinct
patterns across the zebrafish brain that are induced by different
types of stimuli.
Exploratory analysis
is a broad field covering many topics in both machine learning and
visualization
– process the data to
extract simpler, more compact representations
– perform analyses to
identify patterns in the data
– visualize and
explore them, and hopefully derive insight which we can share
– we really don't
know ahead of time exactly what analyses to perform
– almost all methods
in data analysis fall into one of two categories
supervised
unsupervised
– we're going to
focus on dimensionality reduction and clustering
clustering
– take a complex
collection of many data points and reduce it into a simpler
representation
dimensionality
reduction
– describes data
using a simpler representation; may make interesting patterns more
clear
– lower-dimensional
space: state space of a system
PCA, the most
commonly used method for dimensionality reduction
– in practice we
often measure redundant signals (US and EU shoe size)
– we represent data
via method by which it was gathered (pixels in images of brain
activity)
– goal: find a
'better' representation for data
to visualize and
discover patterns;
preprocessing for
other tasks (feature hashing)
– intuition: US and
EU shoe size, project points to the line
projected point close
to original, Euclidean distance, norm
– min dist: our goal
is to minimize the Euclidean distances between our original points
and their projections
unlike (yhat - y)^2 as in linear regression
– max variance: in
order to identify patterns in our data, we often look for variation
across observations
find a succinct
representation that best captures variation in our initial data
– PCA solution
represents the original data in terms of its directions of
maximal variation.
PCA optimization
problem
– data: matrix X size
n,d (n rows, d columns)
– we aim to find a k
dimensional representation for each of our n data points,
where k is much
smaller than d
– matrix Z stores the
k dimensional representations of our data points (n,k)
– PCA scores: Z = X*P
// linear combination of the original data
– matrix P has k
columns: principal components (and d rows)
– PCA is an
optimization problem: we are searching over all d,k matrices
to find the right P
– we impose variance
and covariance constraints on P
off diagonal entries
of the covariance matrix Cz are all equal to zero;
the variance of the
first feature in the reduced dimension is the largest …;
– Formulation
X(i,j): i-th
observation, j-th feature
mu(j): mean of j-th
feature
– variance of the j-th feature:
sigma(j)^2 = 1/n * sum (for i in 1..n: (X(i,j) - mu(j))^2)
– assuming zero mean (normalized data):
sigma(j)^2 = 1/n * sum (for i in 1..n: X(i,j)^2)
covariance: quantifies the relationship between two features
sigma(j,l) = 1/n * sum (for i in 1..n: X(i,j)*X(i,l))
– The covariance is symmetric in that sigma 1,2 equals sigma 2,1
– a large positive
covariance indicates that the two features are highly correlated
– large negative
covariance indicates that the two features are highly anticorrelated
– covariance of zero
means that the two features are uncorrelated
– if the covariance between two features equals each of their variances,
then the two features are identical
– The covariance
matrix is symmetric (d,d) matrix
where each entry
stores pairwise covariance information about the d features
Cx = 1/n * X.T * X
– diagonal entries of this matrix equal the sample variances
of the features
constraints
we want to avoid
redundancy
– and to address
this, PCA requires that the features in our reduced dimensions have
no correlation
– off diagonal
entries of the covariance matrix Cz are all equal to zero
features in the reduced
dimension maximize variance
– the variance of the
first feature in the reduced dimension is the largest,
followed by the
variance in the second feature, and so on
all covariance matrices
have an eigendecomposition,
– which means they
can be written as the product of three matrices, namely U, lambda,
and U transpose
(U * Lambda * U.T)
– with each column of
U being an eigenvector
– Eigenvectors have
corresponding eigenvalues, and the eigenvectors are sorted by their
eigenvalues
PCA solution can
be derived from the sample covariance matrix of X
– P = top k
eigenvectors of Cx
– These vectors are
stored by a matrix U, with each column of U being an eigenvector
In the context of PCA
– the d eigenvectors
of the covariance matrix are the d orthonormal directions of maximal
variance in the data
orthonormal vectors
are simply perpendicular vectors with Euclidean norm equal to one
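Continuing the covariance sketch above, the PCA solution in NumPy (a sketch, not the lab's exact code; note that eigh returns eigenvalues in ascending order, hence the reversal):

import numpy as np
from numpy.linalg import eigh

# Cx and X come from the covariance sketch above
eigVals, eigVecs = eigh(Cx)          # eigh is for symmetric matrices
order = np.argsort(eigVals)[::-1]    # sort eigenvalues in descending order
k = 2
P = eigVecs[:, order[:k]]            # (d, k): top-k principal components

Z = X.dot(P)                         # (n, k) matrix of PCA scores

assert np.allclose(P.T.dot(P), np.eye(k))   # columns of P are orthonormal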
how to choose k,
or the dimension of the new representation
– choose k to equal 2
or 3 for plotting purposes
– or, choose k that
captures most of the variance in our data
sum (i in 1..k: lambda(i)) / sum (j in 1..d: lambda(j)) > 95%
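Continuing the same sketch, this rule in code (picking the smallest k whose cumulative eigenvalue ratio exceeds 95%):

# eigVals and order come from the eigendecomposition sketch above
lam = eigVals[order]                                 # eigenvalues, descending
k95 = np.searchsorted(np.cumsum(lam) / lam.sum(), 0.95) + 1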
limitations and
assumptions
– First, PCA makes
linearity assumptions,
– and also assumes
that the principal components are orthogonal.
– we've assumed that
our data is centered, or, in other words, that our features have zero
mean
– since PCA chooses
directions of max variance, it depends on the relative scale of each
feature
normalize and scale
features! (x - mean) / (max - min)
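A minimal sketch of that normalization rule, applied per feature (toy numbers):

import numpy as np

# two features on very different scales (hypothetical numbers)
X = np.array([[8., 410.], [9., 420.], [10., 430.], [11., 445.]])
Xscaled = (X - X.mean(axis=0)) / (X.max(axis=0) - X.min(axis=0))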
algorithmic
interpretation of PCA
– Orthogonal vectors
are simply perpendicular vectors,
and one nice property
of orthogonal vectors is that their dot product always equals 0
– a unit vector is
simply a vector whose Euclidean norm equals one
– orthonormal: These
are vectors that are orthogonal and also have unit norm
interpretation of PCA
as an iterative algorithm
– At the i-th
iteration we aim to find the direction of maximal variance in the
data,
subject to the
constraint that this direction is of unit norm
and that it is
orthogonal to all directions we chose in previous iterations.
– We can then project
our data onto this direction,
and the locations
along this direction become the i-th feature in our new
representation
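A hedged NumPy sketch of this iterative view (power iteration plus deflation; a simplification of mine, not the course's exact algorithm):

import numpy as np

def greedyPca(X, k, iters=100):
    # at step i, power iteration finds the unit-norm direction of maximal
    # variance; deflation then removes that direction from the data, so
    # later directions come out orthogonal to all earlier ones
    n, d = X.shape
    X = X - X.mean(axis=0)
    P = np.zeros((d, k))
    for i in range(k):
        C = X.T.dot(X) / n
        v = np.random.randn(d)
        for _ in range(iters):          # power iteration
            v = C.dot(v)
            v = v / np.linalg.norm(v)
        P[:, i] = v
        X = X - np.outer(X.dot(v), v)   # project out the found direction
    return P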
distributed PCA
– the first step in
PCA involves centering our data
– centered data is
stored in an n,d matrix which we'll call X
– we need to compute
the sample covariance matrix, or the scatter matrix X.T*X
– scatter
matrix is simply the sample covariance matrix without dividing by n
PCA returns the same
solution in either case
– in the third step,
we compute these eigenvectors by performing an eigendecomposition
– in order to obtain
our k dimensional representation,
we need to multiply
our original data by the top k eigenvectors to compute the PCA scores
In the first setting,
the big n and small d setting,
– we assume that
quadratic local storage, cubic local computation,
and O(dk)
communication are all feasible
– These are similar
assumptions we make when performing closed-form linear regression at
scale
– since n is large,
we store X in a data-parallel fashion, which requires O(nd)
distributed storage
– center our data: to do this, we must compute the mean of each feature, m = sum/n
define m to be the d-dimensional vector whose i-th component is the mean
of the i-th feature
compute the mean
vector via a simple reduce operation whereby we sum all of the data
points together
After computing m on
the driver, we must send it back to the workers so that each data
point can be centered
we can then perform a
map operation to create the centered data points, which simply
involves subtracting m from each original data point
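A hedged pySpark sketch of this centering step (assuming an RDD named rows holding d-dimensional np.ndarray points; names are mine):

# mean vector via reduce, then centering via map
n = rows.count()
m = rows.reduce(lambda a, b: a + b) / float(n)   # computed on the driver
centered = rows.map(lambda x: x - m)             # m is shipped back to the workers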
– we next need to
compute the scatter matrix X.T * X
we can efficiently
perform this computation in a distributed fashion by using outer
products
we can express this
matrix multiplication as a sum of outer products where each outer
product
involves only a single
row of x, or a single data point
as a simple MapReduce
operation
In the map step, we
take each point and compute its outer product with itself //
bottleneck O(d^2)
In the reduce step, we
simply sum over all of these outer products
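The same MapReduce operation as a one-line sketch (continuing with the centered RDD from above):

import numpy as np

# each map emits a (d, d) outer product -- the O(d^2) bottleneck per point;
# the reduce sums them into the scatter matrix X.T * X
scatter = centered.map(lambda x: np.outer(x, x)).reduce(lambda a, b: a + b)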
– we need to perform
its eigendecomposition
using the library function eigh
(eigendecomposition generally requires cubic time and quadratic space)
As a result of
eigendecomposition, we have access to the top k eigenvectors on the
driver
– now we need to
communicate them to the workers
so that they can
compute these k dimensional representations for the data points //
bottleneck O(dk)
– now we can compute
the k dimensional representation for each point via a simple matrix
vector multiply
O(dk) map operation
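A sketch of these last steps (k is an example value; scatter and centered come from the sketches above):

import numpy as np
from numpy.linalg import eigh

k = 2                                            # target dimension (example value)
eigVals, eigVecs = eigh(scatter)                 # cubic time, quadratic space, on the driver
P = eigVecs[:, np.argsort(eigVals)[::-1][:k]]    # top-k eigenvectors, shape (d, k)
scores = centered.map(lambda x: x.dot(P))        # O(dk) map per data point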
in the second large-scale
setting, we assume that both n and d are large.
– And therefore, we
can only afford storage, computation, and communication that are
linear in both n and d
O(dk+n) local storage,
computation, communication
iterative algorithm
iterative approach
relies on performing a sequence of matrix vector products to compute
the top K eigenvectors
– top K principal
components of our data, and we will denote them by the matrix
P
The most common methods
for doing this are Krylov subspace and random projection based
methods
– iteratively compute
X.T*X*v for some vector v of size d; O(k) passes over the data, O(dk) local
storage
– the driver provides
the workers with a d-dimensional vector on each iteration i,
and requires that the
workers left-multiply this vector by the scatter matrix
– The first step
involves the algorithm communicating the d-dimensional vector, vi, to
all workers
the vector vi communicated
to the workers in step one changes on each iteration
– Next, we need to
multiply the scatter matrix by the vector vi in a distributed fashion
qi = X.T * X * vi = X.T * (X * vi)
// n.b. first compute bi = X * vi, a vector of size n:
// b = np.array(X.map(dotProduct).collect())
// q = X.map(rescaleByBi).reduce(sumVectors)
each component of bi is equal to the dot product between a row of X and
the vector vi
each worker will need
to store bi locally in order to compute the overall result, qi, in
the next step.
So we're going to need
to communicate bi to each worker
X.T * bi = sum of
rescaled data points
– The driver then
uses qi to update its estimate of P
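Filling in the dotProduct / rescaleByBi / sumVectors placeholders from the formula above, a hedged pySpark sketch of one iteration (zipWithIndex is my way of lining rows up with their entries of bi, not necessarily the lab's):

import numpy as np

# rows: RDD of the n rows of X; v: the current d-dimensional vector vi
# step 1: workers compute bi = X * vi; its n entries are collected on the driver
b = np.array(rows.map(lambda x: x.dot(v)).collect())

# step 2: bi goes back to the workers; each row is rescaled by its entry of bi,
# and the rescaled rows are summed to give qi = X.T * bi (a d-dimensional vector)
q = (rows.zipWithIndex()
         .map(lambda (x, i): x * b[i])
         .reduce(lambda a, c: a + c))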
PCA lab
we'll work with
time-varying images containing patterns of the zebrafish's neural
activity
as it is presented with
a moving visual pattern.
Different stimuli
induce different patterns across the brain, and we can use
exploratory analyses
to identify these
patterns
– we start with a
collection of many neural signals, each containing the average
response to the stimulus
– We then perform PCA
to reduce these high dimensional signals into a two dimensional space
– What we now have
for every neuron is a pair of numbers that describes the temporal
properties of its response
Part 1: Work through
the steps of PCA on a sample dataset
Part 2: Write a PCA
function and evaluate PCA on sample datasets
Part 3: Parse, inspect,
and preprocess neuroscience data then perform PCA
Part 4: Perform
feature-based aggregation followed by PCA
the non-spherical data
is amenable to dimensionality reduction via PCA, while the spherical
data is not
– When the covariance
is zero, the two dimensions are uncorrelated, and hence the data
looks spherical
– In contrast, when
the covariance is 0.9, the two dimensions are strongly (positively)
correlated and thus the data is non-spherical
# (Python 2 / Spark 1.x style, as in the course labs)
import numpy as np
from numpy.linalg import eigh

correlatedData = sc.parallelize(dataCorrelated)

meanCorrelated = correlatedData.sum()
meanCorrelated = [meanCorrelated[0] / correlatedData.count(),
                  meanCorrelated[1] / correlatedData.count()]
correlatedDataZeroMean = correlatedData.map(
    lambda x: (x[0] - meanCorrelated[0], x[1] - meanCorrelated[1]))

# Sample covariance matrix
correlatedCov = (correlatedDataZeroMean
                 .map(lambda x: np.outer(x, x))
                 .sum()
                 ) / correlatedDataZeroMean.count()

def estimateCovariance(data):
    """Compute the covariance matrix for a given rdd.

    Returns:
        np.ndarray: A multi-dimensional array where the number of rows and
            columns both equal the length of the arrays in the input `RDD`.
    """
    count = data.count()
    mean = data.sum()
    mean = map(lambda x: x / count, mean)
    centered = data.map(lambda x: x - mean)
    res = (centered
           .map(lambda x: np.outer(x, x))
           .sum()
           ) / count
    return res

correlatedCovAuto = estimateCovariance(correlatedData)

# Eigendecomposition
eigVals, eigVecs = eigh(correlatedCovAuto)
# find the top eigenvector based on the largest eigenvalue
inds = np.argsort(eigVals)[::-1]
topComponent = eigVecs[:, inds[0]]

# PCA scores
correlatedDataScores = correlatedData.map(lambda x: x.dot(topComponent))

# PCA function
def pca(data, k=2):
    """Computes the top `k` principal components, corresponding scores,
    and all eigenvalues.

    Returns:
        tuple of (np.ndarray, RDD of np.ndarray, np.ndarray): A tuple of
            (eigenvectors, `RDD` of scores, eigenvalues).
    """
    covMatrix = estimateCovariance(data)
    eigValues, eigVecs = eigh(covMatrix)
    inds = np.argsort(eigValues)
    revInds = inds[::-1]
    d = len(eigValues)
    topComponents = np.zeros((d, k))
    for idx in range(k):
        # insert columns
        topComponents[:, idx] = eigVecs[:, revInds[idx]]
    dataScores = data.map(lambda x: x.dot(topComponents))
    # Return the `k` principal components, `k` scores, and all eigenvalues
    return (topComponents, dataScores, eigValues[revInds])

# Run pca on correlatedData with k = 2
topComponentsCorrelated, correlatedDataScoresAuto, eigenvaluesCorrelated = pca(
    correlatedData, k=2)

# Variance explained
def varianceExplained(data, k=1):
    """Calculate the fraction of variance explained by the top `k` eigenvectors.

    Returns:
        float: A number between 0 and 1 representing the percentage of
            variance explained by the top `k` eigenvectors.
    """
    components, scores, eigenvalues = pca(data, k)
    topEV = eigenvalues[:k]
    return sum(topEV) / sum(eigenvalues)

varianceRandom1 = varianceExplained(randomData, 1)
varianceCorrelated1 = varianceExplained(correlatedData, 1)

# Parse, inspect, and preprocess neuroscience data then perform PCA
lines = sc.textFile(inputFile)

def parse(line):
    """Parse the raw data into a (`tuple`, `np.ndarray`) pair.

    Returns:
        tuple of tuple, np.ndarray: A (coordinate, pixel intensity array)
            `tuple` where coordinate is a `tuple` containing two values and
            the pixel intensity is stored in a NumPy array which contains
            240 values.
    """
    lst = line.split(' ')
    lst = map(lambda x: float(x.strip()), lst)
    coords = (int(lst[0]), int(lst[1]))
    pixValues = lst[2:]
    return (coords, np.asarray(pixValues))

rawData = lines.map(parse)
rawData.cache()
entry = rawData.first()

# Min and max fluorescence
mn = (rawData
      .map(lambda (xy, vals): min(vals))
      .min())
mx = (rawData
      .map(lambda (xy, vals): max(vals))
      .max())

# Fractional signal change
def rescale(ts):
    """Take an np.ndarray and return the standardized array by subtracting
    and dividing by the mean.

    Returns:
        np.ndarray: The time series adjusted by subtracting the mean and
            dividing by the mean.
    """
    mn = np.mean(ts)
    res = (ts - mn) / mn
    return np.asarray(res)

scaledData = rawData.mapValues(lambda v: rescale(v))
mnScaled = scaledData.map(lambda (k, v): v).map(lambda v: min(v)).min()
mxScaled = scaledData.map(lambda (k, v): v).map(lambda v: max(v)).max()

# PCA on the scaled data
componentsScaled, scaledScores, eigenvaluesScaled = pca(
    scaledData.map(lambda (k, v): v), k=3)

# Feature-based aggregation and PCA
# During those 240 seconds, the zebrafish was presented with 12 different
# direction-specific visual patterns, with each one lasting for 20 seconds.
# For this exercise, you'll create several arrays that perform different
# types of aggregation.
vector = np.array([0., 1., 2., 3., 4., 5.])

# Create a multi-dimensional array that when multiplied (using .dot) against
# vector, results in a two element array where the first element is the sum
# of the 0, 2, and 4 indexed elements of vector and the second element is
# the sum of the 1, 3, and 5 indexed elements of vector.
# This should be a 2 row by 6 column array.
sumEveryOther = np.array([[1, 0, 1, 0, 1, 0],
                          [0, 1, 0, 1, 0, 1]])

# Create a multi-dimensional array that when multiplied (using .dot) against
# vector, results in a three element array where the first element is the
# sum of the 0 and 3 indexed elements of vector, the second element is the
# sum of the 1 and 4 indexed elements of vector, and the third element is
# the sum of the 2 and 5 indexed elements of vector.
# This should be a 3 row by 6 column array.
sumEveryThird = np.array([[1, 0, 0, 1, 0, 0],
                          [0, 1, 0, 0, 1, 0],
                          [0, 0, 1, 0, 0, 1]])

# Create a multi-dimensional array that can be used to sum the first three
# elements of vector and the last three elements of vector, which returns a
# two element array with those values when dotted with vector.
# This should be a 2 row by 6 column array.
sumByThree = np.array([[1, 1, 1, 0, 0, 0],
                       [0, 0, 0, 1, 1, 1]])

# Create a multi-dimensional array that sums the first two elements, second
# two elements, and last two elements of vector, which returns a three
# element array with those values when dotted with vector.
# This should be a 3 row by 6 column array.
sumByTwo = np.array([[1, 1, 0, 0, 0, 0],
                     [0, 0, 1, 1, 0, 0],
                     [0, 0, 0, 0, 1, 1]])

# Recreate with `np.tile` and `np.eye`
sumEveryOtherTile = np.tile(np.eye(2), 3)
sumEveryThirdTile = np.tile(np.eye(3), 2)

# Recreate with `np.kron`
sumByThreeKron = np.kron(np.eye(2), np.ones((1, 3)))
sumByTwoKron = np.kron(np.eye(3), np.ones((1, 2)))

# Aggregate by time
# Create a multi-dimensional array to perform the aggregation
T = np.tile(np.kron(np.eye(20), np.ones((1, 1))), 12)

# Transform scaledData using T. Make sure to retain the keys.
timeData = scaledData.map(lambda (k, v): (k, T.dot(v)))
timeData.cache()

# Obtain a compact representation
componentsTime, timeScores, eigenvaluesTime = pca(
    timeData.map(lambda (k, v): v), k=3)

# Aggregate by direction
# We want to see how different pixels (and the underlying neurons captured
# in these pixels) react when the zebrafish is presented with 12
# direction-specific patterns, ignoring the temporal aspect of the reaction.
# Create a multi-dimensional array to perform the aggregation
D = np.kron(np.eye(12), np.ones((1, 20)))

# Transform scaledData using D. Make sure to retain the keys.
directionData = scaledData.map(lambda (k, v): (k, D.dot(v)))
directionData.cache()
print directionData.count()
print directionData.first()

# Compact representation of direction data
componentsDirection, directionScores, eigenvaluesDirection = pca(
    directionData.map(lambda (k, v): v), k=3)
original post http://vasnake.blogspot.com/2016/08/berkeleyx-cs1901x-scalable-machine.html