Записки программиста, обо всем и ни о чем. Но, наверное, больше профессионального.

2016-08-17

BerkeleyX: CS190.1x Scalable Machine Learning, digest

Курс
BerkeleyX: CS190.1x Scalable Machine Learning

и мои подробные конспекты

экстра

Пришла пора обновить память и скомпилять это все в дайджест.

Content

In this course, you will learn how to write and debug Python Spark (pySpark) programs in five weekly lab exercises

WEEK 0: Setup Course Software Environment
Topics: Step-by-step instructions for installing / using the course software environment, and submitting assignments to the course autograder.

WEEK 1: Course Overview and Introduction to Machine Learning
Topics: Course goals, Apache Spark overview, basic machine learning concepts, steps of typical supervised learning pipelines, linear algebra review, computational complexity / big O notation review.
Lab 1: NumPy, Linear Algebra, and Lambda Function Review.

WEEK 2: Introduction to Apache Spark
Topics: Big data and hardware trends, history of Apache Spark, Spark's Resilient Distributed Datasets (RDDs), transformations, and actions.
Lab 2: Learning Apache Spark. Perform your first course lab where you will learn about the Spark data model, transformations, and actions, and write a word counting program to count the words in all of Shakespeare's plays.

WEEK 3: Linear Regression and Distributed Machine Learning Principles
Topics: Linear regression formulation and closed-form solution, distributed machine learning principles (related to computation, storage, and communication), gradient descent, quadratic features, grid search.
Lab 3: Millionsong Regression Pipeline. Develop an end-to-end linear regression pipeline to predict the release year of a song given a set of audio features. You will implement a gradient descent solver for linear regression, use Spark's machine Learning library ( mllib) to train additional models, tune models via grid search, improve accuracy using quadratic features, and visualize various intermediate results to build intuition.

WEEK 4: Logistic Regression and Click-through Rate Prediction
Topics: Online advertising, linear classification, logistic regression, working with probabilistic predictions, categorical data and one-hot-encoding, feature hashing for dimensionality reduction.
Lab 4: Click-through Rate Prediction Pipeline. Construct a logistic regression pipeline to predict click-through rate using data from a recent Kaggle competition. You will extract numerical features from the raw categorical data using one-hot-encoding, reduce the dimensionality of these features via hashing, train logistic regression models using mllib, tune hyperparameter via grid search, and interpret probabilistic predictions via a ROC plot.

WEEK 5: Principal Component Analysis and Neuroimaging
Topics: Introduction to neuroscience and neuroimaging data, exploratory data analysis, principal component analysis (PCA) formulations and solution, distributed PCA.
Lab 5: Neuroimaging Analysis via PCA - Identify patterns of brain activity in larval zebrafish. You will work with time-varying images (generated using a technique called light-sheet microscopy) that capture a zebrafish's neural activity as it is presented with a moving visual pattern. After implementing distributed PCA from scratch and gaining intuition by working with synthetic data, you will use PCA to identify distinct patterns across the zebrafish brain that are induced by different types of stimuli.

Digest...


WEEK 0: Setup Course Software Environment

Topics: Step-by-step instructions for installing / using the course software environment, and submitting assignments to the course autograder.

Information for setting up for the Spark MOOC, and lab assignments for the course.

Install VirtualBox, Vagrant;
Put Vagrantfile into work dir, run: $ vagrant up --provider=virtualbox
Open url http://localhost:8001/ in browser
Upload lab*.ipynb into Jupiter

WEEK 1: Course Overview and Introduction to Machine Learning

Topics: Course goals, Apache Spark overview, basic machine learning concepts, steps of typical supervised learning pipelines, linear algebra review, computational complexity / big O notation review.
Lab 1: NumPy, Linear Algebra, and Lambda Function Review.

Apache Spark: cluster computing engine
– fast iterative computations
– efficient communication primitives
– APIs in Scala, Java, Python, R
– interactive shell
– integrated higher-level libraries

ML: methods that learn from and make predictions on data
– tools and ideas from: computer science, probability and statistics, optimization, linear algebra

Supervised learning: labeled observations
– learn mapping from observations to labels
– classification: assign a category to each item, categories are discrete
– regression: predict a real value for each item, e.g. stock prices

Unsupervised learning: unlabeled observations
– find latent structure from features
– can be goal in itself; can be means for preprocessing
– clustering: partition observations into homogenous regions, e.g. identify 'communities' within lagre grous of people
– dimensionality reduction: transform an initial feature representation into a more concise representation

typical supervised ML pipeline
– obtain raw data
– feature extraction: represent observations, numeric features, incorporate domain knowledge
unsupervised learning may be used as preprocessor;
spam/not spam classifier: bag of words, if word n in document, set feature n=1
– train a model: use cross-validation dataset
classifiers: logistic regression, SVM, decision tree, random forest, etc.
training involves iterative computations, e.g. gradient descent
goal: find decision boundary: model parameters = feature weights and offset
– evaluate: on test dataset; evaluation metrics: accuracy, precision, recall, F1-score
detect overfitting/high bias
– iterate: goto feature extraction
– predict

linear algebra review
– matrix A: 2-dimensional array, n rows, m columns; A(i,j): entry in i-th row, j-th column
– vector: a: matrix with n rows, 1 column
– transpose: A(i,j) = A.T(j,i); rotate vector couter-clock-wise
scalar product = dot product = inner product for vectors: two vectors => scalar // 1,m * m,1 = 1,1
vector_a dot vector_b = sum a(i)*b(i)
it's a special case of marix multiplication: A.T * B = A dot B for A,B: vectors size m,1
– matrix-vector multiplication: n,m * m,1 = n,1
result: n scalar products fo n row vectors in matrix
– matrix-matrix multiplication: A * B = C // n,m * m,p = n,p
C(i,j) = scalar product of row A(i) and column B(j)
associative, not commutative
outer product: two vectors => matrix // n,1 * 1,m = n,m
identity matrix: I * A = A // ones in diagonal
inverse matrix: A * A.inv = I // only a square matrix can have an inverse
– euclidean norm: norm(x) = ||x||2 = sqrt(x1^2 + x2^2) // vector 'length'

Big O notation
– describes how algorithms respond to changes in input size
– processing time: proportional to number of 'basic' operations
– space: units of storage, 8 bytes for float
– notation: f(x) = O(g(x)) // f does not grow faster than g
– formal definition: |f(x)| <= C|g(x)|
– ignores constants and lower-order terms
– matrix-matrix multiply n,m * m,p = n,p
n*p entries, each entry = dot product between 2 m-dimentional vectors
time O(npm); space o(np)

lab: Gain hands on experience using Python's scientific computing library to manipulate matrices and vectors
import numpy as np 
arr = np.array([1, 2, 3])
elementWise = arr * brr 
dotProd = np.dot(arr, brr) 
A = np.matrix([[1,2,3,4], [5,6,7,8]])
AAt = A * A.T
AAInv = np.linalg.inv(AAt)
lastThree = arr[-3:]
zeros = np.zeros(8)
ones = np.ones(8) 
zerosThenOnes = np.hstack((zeros,ones))
zerosAboveOnes = np.vstack((zeros,ones))
from pyspark.mllib.linalg import DenseVector 
numpyVector = np.array([-3,-4,5])
myDenseVector = DenseVector([3.0, 4.0, 5.0])
denseProduct = DenseVector.dot(DenseVector(numpyVector), myDenseVector)
multiplyByTen = lambda x: x * 10 
mapResult = dataset.map(lambda x: x*5)
filterResult = dataset.filter(lambda x: not(x % 2)) # keep even elems 
reduceResult = dataset.reduce(lambda x,y: x+y)
(dataset 
 .map(lambda x: x+2)
 .reduce(lambda x,y: x*y))

WEEK 2: Introduction to Apache Spark

Topics: Big data and hardware trends, history of Apache Spark, Spark's Resilient Distributed Datasets (RDDs), transformations, and actions.
Lab 2: Learning Apache Spark. Perform your first course lab where you will learn about the Spark data model, transformations, and actions, and write a word counting program to count the words in all of Shakespeare's plays.

the Big Data problem
– data growing faster than computation speed
– storage getting cheaper
– you can't process all data on machine-you-can-afford: big data
– Google web index: 10+ PB
– time to read 1 TB from disk: 3 hours
– solution: distribute data over large clusters

problems with cheap hardware
– failures: 1-5% hard drives/year
0.2% DIMMs/year
– network speed, latency vs. shared memory
– uneven performance

what's hard about cluster computing
– how do we split work across machines? hash table, map-reduce
must consider network, data locality
moving data may be very expensive
– how to deal with failures?
10K nodes, 10 faults a day
stragglers, slow nodes

failures, slow tasks
– re-launch the task on other machine
requirements: individual task in job are idempotent; have no side effects

map-reduce, iterative jobs
– involve a lot of disk I/O for each repetition
– each stage passes data through disk: processing speed limited by HDD speed
– I/O, serialization/deserialization

Apache Spark keeps intermediate results in RAM
– up to 100 times faster
– storage: in-memory, on disk
– operations: map, reduce, join, sample, …
– execution model: batch, interactive, streaming
– programming env: Scala, Java, R, Python
– lazy evaluation: better pipelining, reduces 'wait states'
– lower overhead for starting jobs
– less expensive shuffles

Apache Spark cornerstone: RDD
– resilient distributed dataset: a fault-tolerant abstraction for in-memory cluster computing
– write programs in terms of ops on distributed datasets
– partitioned collections of objects spread across a cluster, stored in memory or on disk
– RDDs built and manipulated through a diverse set of parallel transformations (map, filter, join)
and actions (count, collect, save)
– RDDs automatically rebuilt on machine failure

Spark computing framework
– abstractions and parallel runtime to hide complexities of fault-tolerance and slow machines
– here's an operation, run it on all the data
I don't care where it runs
– Spark tools: SQL, Streaming, MLlib, GraphX, ...

Spark essentials
– programming Spark (pySpark)
– RDDs, creating RDD
– transformations, actions
– Spark programming model

Driver and workers
driver, driver machine, SparkContext: one program
worker, executor on cluster nodes (or local threads): another program
RDDs are distributed across workers
SparkContext object: how and where to access a cluster
parameter: master, type and size of cluster (local[K], spark://host:port, mesos://host:port)

RDD: immutable, track lineage information for recovery; enable ops in parallel
– construct RDD: from files in HDFS, …
by parallelizing existing Python collections
by transforming an existing RDDs
– specify number of partitions in constructor
rdd = sc.parallelize(data, 4) # 4 partitions
rdd2 = sc.textFile('readme.md', 4) # lines of input

two types of operations
transformations (recipe for creating result)
actions
– transformations are lazy
– transformed RDD is executed when action runs on it
– you can persist (cache) RDD in memory or disk

transformations: create a new rdd from existing one
– map, filter, distinct, flatMap, …
rdd.map(lambda x: x*2) # 'lambda: …' functon literals are closures, automatically passed to workers;
– lazy evaluation: nothing executes, Spark saves recipe for transforming source.

actions
– cause Spark to execute recipe to transform source
– mechanism for getting results out of Spark
reduce, take, collect, takeOrdered, …
rdd.reduce(lambda a,b: a*b)

caching
lines = sc.textFile('filename', 4)
lines.cache() # read file and form partitions only once 
comments = lines.filter(isComment)
print lines.count(), comments.count() # second time calls to lines, cached

key-value RDDs
– each element of a Pair RDD is a pair tuple
rdd = sc.parallelize([(1,2), (3,4)])
– key-value transformations
sortByKey, groupByKey, reduceByKey, …
groupByKey can cause massive data shuffle and overload some workers

closures
– Spark automatically creates closures for functions that run on RDD at workers;
any global variables used by those workers
– one closure per worker: sent for every task
no communications between workers
changes to global vars at workers are not sent to driver

shared variables
broadcast variables
efficiently send (large) read-only value to all workers
saved at workers for use in one or more operators
useful for shared read-only data (maps)
accumulators
aggregate values from workers back to driver
only driver can access value of accumulator
for tasks, accum. are write-only
use to count errors seen in RDD across workers
can be 'added' to by associative op, counters, sums
#on driver 
signPrefixes = sc.broadcast(loadCallSignTable()) # on driver 
def processSignCount(sign_count, signPrefixes):
 country = lookupCountry(sign_count[0], signPrefixes.value) # on worker
 …
counts = (contacts.map(processSignCount).reduceByKey(lambda x,y: x+y))
# accum:
accum = sc.accumulator(0) # on driver
rdd = … 
def f(x):
 global accum # on worker
 accum += x
rdd.foreach(f) 
accum.value # on driver 
– using accum. in actions: guaranteed to be applied only once
no such guarantee for accum. in transformations (might have to be run multiple times)

Lab 2: Learning Apache Spark

– At a high level, every Spark application consists of a driver program that launches various parallel operations on executor JVM
– this driver program contains the main loop for the program and creates distributed datasets on the cluster, then applies operations (transformations & actions) to those datasets
– Driver programs access Spark through a SparkContext object, which represents a connection to a computing cluster.
– When you run map() on a dataset, a single stage of tasks is launched.
– A stage is a group of tasks that all perform the same computation, but on different input data.
– One task is launched for each partitition. A task is a unit of execution that runs on a single machine.
– to create a new list on the driver from the the data distributed in the executor nodes. To do this we call the collect() method on our RDD
– the data returned to the driver must fit into the driver's available memory. If not, the driver will crash.
– the dataset is broken into n partitions, so n collect() tasks are launched. Each task collects the entries in its partition and sends the result to the SparkContext, which creates a list of the values
– The filter(f) method is a transformation operation that creates a new RDD from the input RDD...Elements that do not return True will be dropped.
– The key advantage of using takeOrdered() instead of first() or take() is that takeOrdered() returns a deterministic result.
– The top() action is similar to takeOrdered() except that it returns the list in descending order
– The reduce() action reduces the elements of a RDD to a single value by applying a function that takes two parameters and returns a single value. The function should be commutative and associative, as reduce() is applied at the partition level and then again to aggregate results from partitions.
– The takeSample() action returns an array with a random sample of elements from the dataset
– The countByValue() action returns the count of each unique value in the RDD as a dictionary that maps values to counts
flatMap() is similar to map(), except that with flatMap() each input item can be mapped to zero or more output elements
– The reduceByKey() transformation gathers together pairs that have the same key and applies a function to two associated values at a time. reduceByKey() operates by applying the function first within each partition on a per-key basis and then across the partitions
– when using the groupByKey() transformation - all the key-value pairs are shuffled around, causing a lot of unnecessary data to being transferred over the network
combineByKey() can be used when you are combining elements but your return type differs from your input value type
foldByKey() merges the values for each key using an associative function and a neutral "zero value".
– The mapPartitions() transformation uses a function that takes in an iterator (to the items in that specific partition) and returns an iterator. The function is applied on a partition by partition – basis
– The mapPartitionsWithIndex() transformation uses a function that takes in a partition index (think of this like the partition number) and an iterator (to the items in that specific partition). For every partition (index, iterator) pair, the function returns a tuple of the same partition index number and an iterator of the transformed items in that partition

if you try to keep too many RDDs in memory, Spark will automatically delete RDDs from memory to make space for new RDDs. If you later refer to one of the RDDs, Spark will automatically recreate the RDD for you, but that takes time
– You can use the cache() operation to keep the RDD in memory
– once you are finished using an RDD, you can optionally tell Spark to stop caching it in memory by using the RDD's unpersist() method

pySpark runs Python code in a JVM using Py4J. Py4J enables Python programs running in a Python interpreter to dynamically access Java objects in a Java Virtual Machine. Methods are called as if the Java objects resided in the Python interpreter and Java collections can be accessed through standard Python collection methods. Py4J also enables Java programs to call back Python objects

Word Count Lab: Building a word count application
#Part 1: Creating a base RDD and pair RDDs
#Part 2: Counting with pair RDDs
#Part 3: Finding unique words and a mean value
#Part 4: Apply word count to a file

wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
def makePlural(word):
    return word + 's'
pluralRDD = wordsRDD.map(makePlural)
pluralLambdaRDD = wordsRDD.map(lambda x: x + 's')
pluralLengths = (pluralRDD
                 .map(lambda x: len(x))
                 .collect())
wordPairs = wordsRDD.map(lambda x: (x, 1))
wordsGrouped = wordPairs.groupByKey()
wordCountsGrouped = wordsGrouped.map(lambda (w, itr): (w, sum(itr)))
wordCounts = wordPairs.reduceByKey(lambda num1, num2: num1 + num2)
wordCountsCollected = (wordsRDD
                       .map(lambda x: (x, 1))
                       .reduceByKey(lambda x, y: x + y)
                       .collect())
uniqueWords = len(wordCountsCollected)
from operator import add
totalCount = (wordCounts
              .map(lambda (w, num): num)
              .reduce(lambda x, y: x + y))
average = totalCount / float(uniqueWords)
def wordCount(wordListRDD):
    res = (wordListRDD
           .map(lambda x: (x, 1))
           .reduceByKey(lambda x, y: x + y)
    )
    return res
def removePunctuation(text):
    res = text.lower()
    res = re.sub(r'[^\w\s]', '', res)
    res = re.sub(r'[_]', '', res)
    res = res.strip()
    return res

shakespeareRDD = (sc
                  .textFile(fileName, 8)
                  .map(removePunctuation))
print '\n'.join(shakespeareRDD
                .zipWithIndex()  # to (line, lineNum)
                .map(lambda (l, num): '{0}: {1}'.format(num, l))  # to 'lineNum: line'
                .take(15))
shakespeareWordsRDD = shakespeareRDD.flatMap(lambda line: line.split(' '))
shakespeareWordCount = shakespeareWordsRDD.count()
shakeWordsRDD = shakespeareWordsRDD.filter(lambda x: x)
shakeWordCount = shakeWordsRDD.count()
top15WordsAndCounts = wordCount(shakeWordsRDD).takeOrdered(15, key=lambda (w, cnt): -cnt)

WEEK 3: Linear Regression and Distributed Machine Learning Principles

Topics: Linear regression formulation and closed-form solution, distributed machine learning principles (related to computation, storage, and communication), gradient descent, quadratic features, grid search.
Lab 3: Millionsong Regression Pipeline. Develop an end-to-end linear regression pipeline to predict the release year of a song given a set of audio features. You will implement a gradient descent solver for linear regression, use Spark's machine Learning library ( mllib) to train additional models, tune models via grid search, improve accuracy using quadratic features, and visualize various intermediate results to build intuition.

linear regression: supervised learning problem (least squares regression)
– learn a mapping from observations to continuos labels given a training set
– example: height, gender, weight => shoe size
for each observation we have a feature vector: X.T = [x1, x2, x3]
we assume a linear mapping between features and label: y ~ w0 + w1x1 + w2x2 + w3x3
feature vector to incorporate offset: X.T = [1, x1, x2, x3]
then: y ~ scalar product = W.T * X
– simple, often works well in practice, can introduce complexity via feature extraction
– loss function: (y — yhat)^2, has nice math. properties
– find W that minimizes squared loss over training poins:
min(w) norm (X*w - y)^2
X: n,d matrix storing points
y: n vector, real values, labels
yhat: n vector, predicted labels, yhat = X*w
w: d vector, regression parameters / model to learn
– find solution by setting derivative to zero
df/dw = 2* (w*X.T*X — X.T*y) = 0
# solve it for w (closed form solution)
w = inv(X.T*X) * X.T*y // if inverse exits

ridge regression : (linear regression + regularization)
– penalize model complexity (overfitting)
– add regularization, keep model parameters small
min(w) norm (X*w — y)^2 + (lambda * norm (w)^2)
– closed-form solution:
w = inv(X.T*X + lambda*Id)*X.T*y // Id: identity matrix size d,d
– all other things being equal, models with smaller weights will be selected over models with larger weights.
– trades off between training error and model complexity

millionsong regression pipeline
– First we must obtain our raw dataset
– Next, we must split the data into training and test datasets
– We then perform feature extraction to find meaningful features,
and typically to represent our data numerically.
– We then train our model using the training data,
and subsequently evaluate it on the test data.
– we iterate over this process until we are satisfied with our results
– we can use the final model that we've created to make predictions
– In terms of feature extraction, we will use quadratic features,
which enable us to incorporate pairwise feature interactions into our model.
In other words, these quadratic features allow us to capture the covariance of the initial timbre features,
thus leading to a non-linear model
– given 2D data, quadratic features are:
x = [x1, x2] => F(x) = [x1^2, x1x2, x2x1, x2^2] 
F'(x) = [x1^2, sqrt(2)x1x2, x2^2]
F(x).T * F(z) = F'(x).T*F'(z) 
– Our goal is to map audio features and, in particular, our quadratic features to song year.
– We'll be using a ridge regression model to learn this mapping.

how should we go about choosing a value for this free parameter lambda?
And in fact, most methods have free parameters, which are often called hyperparameters, that need to be tuned.
validation set for searching hyperparameters
– We now use the training data to train potentially multiple models, each with different hyperparameter settings.
The validation set to evaluate the various models, often using grid search;
and the test set to evaluate the final model's accuracy.

Grid search is a standard method for searching over hyperparameters,
exhaustive search
– define and discretize the search space, either on a linear or log scale
– we use the training data to train models at each of these grid points
using each of the hyperparameter settings defined by our grid
– evaluate points via validation error
– Since least squares linear regression uses the squared loss, it seems reasonable to consider the
mean squared error to evaluate model quality
mse = 1/n sum (for i in 1..n: (yhat — y)^2)
– more natural to use: root-mean-square error: RMSE = sqrt(mse)

Once we've identified our preferred model, we can then use the test set to evaluate its accuracy
And finally, once we have our final model, we can use it to make predictions on future observations

distributed machine learning principles related to computation and storage
– time and space complexity for closed-form solution
w = inv(X.T*X + lambda*Id)*X.T*y
Since matrix multiplication is an associative operation
temp = X.T * y // O(n*d)
inv(...)*temp // O(d^2)
in summary, the two computational bottlenecks:
X.T*X: O(nd^2); its inverse: O(d^3)
overall: time O(nd^2 + d^3)
– storage: X.T*X and inverse: O(d^2) floats
X: O(nd) floats

big n and small d: O(d^3) computations, O(d^2) storage is feasible on a single machine
– well suited for a distributed computation
– store rows across cluster (distributed n)
– compute X.T*X in parallel: as sum of outer products (column * row = matrix)
column = i-th row.T => row(i).T * row(i) // supereasy
– O(nd) distributed storage; O(nd^2) distributed computation;
O(d^2) local storage; O(d^3) local computation
X.map(computeOuterProduct).reduce(sumAndInvert)

big n and big d: d^3 computations; d^2 storage can't be performed locally
rule of thumb: we need complexity at most linear in (n, d) // records, dimensions
– idea: exploit sparsity
sparse data is prevalent
– can reduce dimensions: PCA, low-rank approximation
X can be represented by product of two skinny matrices n,d = n,r * r,d; r « n or d
– use different algorithms: gradient descent O(nd) computations, O(d) local storage per iteration

gradient descent (linear regression optimization)
– objective function (loss) is convex (least squares, ridge regression, logistic regression: all convex)
f(w) = norm (X*w — y)^2
find w that minimize f(w)
– start at random point; repeat until converge:
determine a descent direction (gradient)
choose a step size
update w
– stop when max. #iteration have been performed, or updates become small enough
descent direction: negative slope
 w_next = w — alpha * df/dw 
alpha: hyperparameter = step size, reduce over time ~ alpha/(n*sqrt(iteration))
df/dw: delta = gradient (derivative)

we'll note that f(w) is a summation.
So we can compute the derivative by taking the derivative of each of the summands separately and summing the results.
Moreover, we can take the derivative of each summand using the chain rule.
df/dw = sum (for j in 1..n: (w.T*x(j) — y(j))*x(j)

GD can be done with map-reduce
– if we share w(i) with all workers
for i in range(numIters):
 alpha_i = alpha / (n * sqrt(i+1))
 gradient = X.map(lambda point: gradientSummand(w, point)).sum()
 w -= alpha_i * gradient 
return w 

GD summary:
– pros: easily parallelized; cheap at each iteration; stochastic variants even cheaper
– cons: slow convergence (compared with closed-form); requires communication across nodes!

Communication hierarchy
– CPU to RAM 50 GB/s, HDD 0.1 GB/s
– network, same rack 1 GB/s; other racks 0.3 GB/s
– 50x gap memory/network (+latency)

distributed machine learning principles: communication principles
– reduce communication cost
rule of thumb: perform parallel and in-memory computation
X.cache()
for i in range(numIters):
 alpha_i = alpha / (n * sqrt(i+1))
 gradient = X.map(lambda point: gradientSummand(w, point)).sum()
– scale-up (multicore monsters) is not good enough, expensive
– scale-out (distributed): scales good, but need to deal with network communication
for i in range(numIters):
 ...
 gradient = X.map(lambda point: gradientSummand(w, point)).sum()
 w -= alpha_i * gradient 
need to communicate parameters w in each iteration
rule of thumb: minimize network communication, stay local, reduce iterations
– keep large objects local
– example: linear regression, big n and small d, closed form solution
communicate O(d^2) intermediate data
– example: linear regression, big n and big d, gradient descent
communicate O(d) parameters w on each iteration: OK for fairly large d
– example: hyperparameter tuning, small n, small d, grid search
communicate data O(nd), train each model locally (model parallel)
– example: linear regression, big n and huge d, gradient descent
O(d) communications slow, rely on sparsity to reduce communication
distribute data and model

we can reduce communication by reducing the number of iterations
– in Bulk Synchronous Parallel (BSP) systems, e.g. Spark, we strictly alternate between
computations and communications
– idea: compute more, communicate less
– extreme: divide-and-conquer, fully process each partition locally,
communicate final result
problem: we obtain approximate result, in general
w = X.mapPartitions(localLinearRegression).reduce(combineRegressionResults)
– less extreme option: mini-batching
for i in range(fewerIters):
 update = X.mapPartitions(localGradientUpdates).reduce(combineLocalUpdates)
 w += update 
increase the amount of local computation;
if perform too big batches, returns can be diminishing

throughput/latency
– bytes per second / cost to sent message
– send larger messages, batch communications
– e.g. train multiple models together

millionsong lab
– implement a regression pipeline for predicting a song's release year from audio features
– raw data
explore features
shift labels to start at 0
visualize data
– split to training, cross-validation, test sets
– feature extraction: initially use raw features
compare with quadratic features
– supervised learning: least squares regression
gradient descent from scratch
use MLlib implementation (ridge regression)
visualize performance by iteration
– evaluation
hyperparameter tuning, grid search for stepsize and regularization
evaluate using RMSE, visualize grid search
evaluate final model: using RMSE, compare to baseline model
from pyspark.mllib.regression import LabeledPoint
import numpy as np
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.regression import LinearRegressionWithSGD
numPartitions = 2
rawData = sc.textFile(fileName, numPartitions)
numPoints = rawData.count()
samplePoints = rawData.take(5)
def parsePoint(line):
    arr = line.split(',')
    label = arr[0]
    features = arr[1:]
    return LabeledPoint(label, features)
parsedDataInit = rawData.map(parsePoint)
onlyLabels = parsedDataInit.map(lambda x: x.label)
minYear = onlyLabels.min()
maxYear = onlyLabels.max()
parsedData = parsedDataInit.map(lambda x: LabeledPoint(x.label - minYear, x.features))
weights = [.8, .1, .1]
seed = 42
parsedTrainData, parsedValData, parsedTestData = parsedData.randomSplit(weights, seed)
parsedTrainData.cache()
parsedValData.cache()
parsedTestData.cache()
nTrain = parsedTrainData.count()
nVal = parsedValData.count()
nTest = parsedTestData.count()
averageTrainYear = (parsedTrainData
    .map(lambda x: x.label)
    .mean()
)
def squaredError(label, prediction):
    return math.pow(label - prediction, 2)
def calcRMSE(labelsAndPreds):
    sumErr = (labelsAndPreds
           .map(lambda (x, y): squaredError(x, y))
           .reduce(lambda x, y: x + y)
    )
    return math.sqrt(sumErr / labelsAndPreds.count())
labelsAndPredsTrain = parsedTrainData.map(lambda x: (x.label, averageTrainYear))
rmseTrainBase = calcRMSE(labelsAndPredsTrain)
def gradientSummand(weights, lp):
    w = weights.T
    x = lp.features
    y = lp.label
    return (w.dot(x) - y) * x
def getLabeledPrediction(weights, observation):
    label = observation.label
    w = weights.T
    x = observation.features
    prediction = w.dot(x)
    return (label, prediction)
def linregGradientDescent(trainData, numIters):
    # The length of the training data
    n = trainData.count()
    # The number of features in the training data
    d = len(trainData.take(1)[0].features)
    w = np.zeros(d)
    alpha = 1.0
    # We will compute and store the training error after each iteration
    errorTrain = np.zeros(numIters)
    for i in range(numIters):
        # Use getLabeledPrediction from (3b) with trainData to obtain an RDD of (label, prediction)
        # tuples.  Note that the weights all equal 0 for the first iteration, so the predictions will
        # have large errors to start.
        labelsAndPredsTrain = trainData.map(lambda x: getLabeledPrediction(w, x))
        errorTrain[i] = calcRMSE(labelsAndPredsTrain)
        # Calculate the `gradient`.  Make use of the `gradientSummand` function you wrote in (3a).
        # Note that `gradient` sould be a `DenseVector` of length `d`.
        gradient = trainData.map(lambda x: gradientSummand(w, x)).reduce(lambda x,y: x+y)
        # Update the weights
        alpha_i = alpha / (n * np.sqrt(i+1))
        w -= alpha_i * gradient
    return w, errorTrain
numIters = 50
weightsLR0, errorTrainLR0 = linregGradientDescent(parsedTrainData, numIters)
labelsAndPreds = parsedValData.map(lambda x: getLabeledPrediction(weightsLR0, x))
rmseValLR0 = calcRMSE(labelsAndPreds)
# LinearRegressionWithSGD
numIters = 500  # iterations
alpha = 1.0  # step
miniBatchFrac = 1.0  # miniBatchFraction
reg = 1e-1  # regParam
regType = 'l2'  # regType
useIntercept = True  # intercept
firstModel = LinearRegressionWithSGD.train(
    parsedTrainData,
    iterations=numIters,
    step=alpha,
    miniBatchFraction=miniBatchFrac,
    regParam=reg,
    regType=regType,
    intercept=useIntercept)
weightsLR1 = firstModel.weights
interceptLR1 = firstModel.intercept
labelsAndPreds = parsedValData.map(lambda x: (x.label, firstModel.predict(x.features)))
rmseValLR1 = calcRMSE(labelsAndPreds)
# grid search, reg
bestRMSE = rmseValLR1
bestRegParam = reg
bestModel = firstModel
numIters = 500
alpha = 1.0
miniBatchFrac = 1.0
for reg in (1e-10, 1e-5, 1):
    model = LinearRegressionWithSGD.train(parsedTrainData, numIters, alpha,
                                          miniBatchFrac, regParam=reg,
                                          regType='l2', intercept=True)
    labelsAndPreds = parsedValData.map(lambda lp: (lp.label, model.predict(lp.features)))
    rmseValGrid = calcRMSE(labelsAndPreds)
    if rmseValGrid < bestRMSE:
        bestRMSE = rmseValGrid
        bestRegParam = reg
        bestModel = model
rmseValLRGrid = bestRMSE
# grid search, alpha
reg = bestRegParam
modelRMSEs = []
for alpha in (1e-5, 10):
    for numIters in (5, 500):
        model = LinearRegressionWithSGD.train(parsedTrainData, numIters, alpha,
                                              miniBatchFrac, regParam=reg,
                                              regType='l2', intercept=True)
        labelsAndPreds = parsedValData.map(lambda lp: (lp.label, model.predict(lp.features)))
        rmseVal = calcRMSE(labelsAndPreds)
        print 'alpha = {0:.0e}, numIters = {1}, RMSE = {2:.3f}'.format(alpha, numIters, rmseVal)
        modelRMSEs.append(rmseVal)
# Add interactions between features
def twoWayInteractions(lp):
    newfeatures = itertools.product(lp.features, repeat=2)
    newfeatures = [x*y for x,y in newfeatures]
    label = lp.label
    features = np.hstack((lp.features, newfeatures))
    return LabeledPoint(label, features)
trainDataInteract = parsedTrainData.map(lambda x: twoWayInteractions(x))
valDataInteract = parsedValData.map(lambda x: twoWayInteractions(x))
testDataInteract = parsedTestData.map(lambda x: twoWayInteractions(x))
numIters = 500
alpha = 1.0
miniBatchFrac = 1.0
reg = 1e-10
modelInteract = LinearRegressionWithSGD.train(trainDataInteract, numIters, alpha,
                                              miniBatchFrac, regParam=reg,
                                              regType='l2', intercept=True)
labelsAndPredsInteract = valDataInteract.map(lambda lp: (lp.label, modelInteract.predict(lp.features)))
rmseValInteract = calcRMSE(labelsAndPredsInteract)

WEEK 4: Logistic Regression and Click-through Rate Prediction

Topics: Online advertising, linear classification, logistic regression, working with probabilistic predictions, categorical data and one-hot-encoding, feature hashing for dimensionality reduction.
Lab 4: Click-through Rate Prediction Pipeline. Construct a logistic regression pipeline to predict click-through rate using data from a recent Kaggle competition. You will extract numerical features from the raw categorical data using one-hot-encoding, reduce the dimensionality of these features via hashing, train logistic regression models using mllib, tune hyperparameter via grid search, and interpret probabilistic predictions via a ROC plot.

canonical scalable ML problem : online advertising
– huge amount of data: online human activity, ad properties, etc
– skewed classes; data is sparse, high-dimensional
– heavily biased label training set
players:
– publishers
– advertisers
– matchmakers (in real time)
metric for success: click-through rate
– get user to do something, click on ad
– how likely it is for a user to click on an ad?
– choose ads to maximize probability
conditional probability: probability given predictive features
– goal: estimape P(click | user, ad, publisher) given: massive amounts of labeled data

predictive features
– ad's historical performance
– advertiser, ad content info
– publisher info
– user info (search/click history)

linear classification, relation to linear regression; logistic regression
– supervised learning
– goal: learn a mapping from observations to discrete labels given a set of training examples
– example: spam classification, emails as observations, labels: spam/not spam
– example: click-through rate prediction; observations: user,ad,publisher triples;
labels: click/not-click
– linear classifier: yhat = sign(W.T*X)
decision boundary: W.T*X = 0
– how can we evaluate predictions?
0-1 loss: penalty is 0 for correct prediction, 1 otherwise
let y = {-1, 1); z = y * W.T * X
then: loss = 1 if z < 0; loss = 0 otherwise
– idea: find W that minimizes 0-1 loss over training poins
min sum (for i in 1..n: loss(y(i) * W.T * X(i)))
– this is not a convex function, alas.
– solution: approximate 0-1 loss with convex loss: SVM, logistic regression, Adaboost

logloss: logistic loss
logloss = log(1 + e^-z)
– logistic regression: learn parameters W that minimizes logloss on training data
min sum (for i in 1..n: logloss(y(i) * W.T * X(i)))
– convex, closed form solution doesn't exist
– can solve via gradient descent
– update rule: w_next = w — alpha * gradient
gradient = sum (for j in 1..n: 1 — (1 / (1 + exp(-y(j)*W.T*X(j)))))*(-y(j)*X(j))
– adding regularization (trading off logloss over model complexity)
min sum (for i in 1..n: logloss(y(i) * W.T * X(i))) + lambda * norm(W)^2

probabilistic interpretation of logistic regression
– we want more granular information, not just 0/1
– goal: conditional probability P(y=1 | x)
– example: P(y=rain | temperature=a, clouds=b, humidity=c) = 0.7
– example: P(y=click | … ) = 0.1
10% chance that a click event occur
0.1 is a relatively high probability
– if we want to work with a linear model, we need to squash its output
to 0..1 range
– logistic, or sigmoid function
P(y=1 | x) = sigma(W.T*X)
– domain (x-axis) is all real numbers; range of function (y-axis) is 0..1
sigma(z) = 1 / (1 + exp(-z))
– let y = {0, 1}
P(y=1 | x) = sigma(W.T*X)
P(y=0 | x) = 1 - sigma(W.T*X)
– to make class predictions, we need a threshold
default threshold = 0.5
P(y=1 |x) > 0.5 => yhat = 1
– decision boundary: sigma(0) = 0.5
– with default threshold, the decision boundaries are identical to linear classification

how we can use probabilistic predictions
– evaluation metrics, threshold
– with spam detector, false positive is more harmful than false negative
– you can yse a threshold greater 0.5 to be more 'conservative'
– how can we determine the best choice?
– use a ROC plot : false positive rate vs true positive rate for different thresholds
in ideal world TPR = 100%, FPR = 0%
at random: 50% - 50%
establishing these baselines, you can use a ROC plot to evaluate a classifier
– assuming you might be willing to tolerate at most 10% FPR => threshold

you may want to use probabilities directly, w/o making class predictions
– click-through prediction: click events are rare, probabilities will be uniformly low
– in these cases, it make sense to use the logloss
logloss(p,y) = -log(p) if y=1; logloss = -log(1-p) if y=0 
when y = 1, we want p =1
– no penalty at 1; increasing penalty away from 1
– similar when y=0
– in comparison to the 0-1 loss, which either returns a penalty of 0 or 1,
the log loss is a more fine-grained metric for evaluating the accuracy of probabilistic predictions

categorical data, one-hot-encoding
– one-hot encoding: a standard technique for converting categorical features into numerical ones
– in many other cases, the raw data is not numeric (gender, nationality, occupation, …)
– methods: Decision Trees, Naive Bayes, … naturally support non-numeric features (this limits our options)
– how we can effectively perform this conversion to numeric data

non-numerical features
– categorical (gender, language, …), no intrinsic ordering
– ordinal (is your health: poor, reasonable, good, excellent?), intrinsic ordering, relative (not absolute)

can't convert by just assigning numerical values to categories
– introduce a degree of closeness that didn't previously exist

one-hot-encoding (used in bag-of-words)
– we create a separate dummy variable for each category
sparse matrix: only one variable in a row would be = 1

how to compute one-hot encoded features
– features:
animal: bear, cat, mouse // 3 categories
color: black, tabby // 2 categories
diet: mouse, salmon // 2 categories, optional
– points (animal, color, diet):
a1 = mouse, black
a2 = cat, tabby, mouse
a3 = bear, black, salmon
– in total: 7 categories across 3 features
– step 1: map each category to a dummy feature, assign a number (position in array)
(animal, bear) => 0 # first feature
(animal, cat) => 1 # second
(diet, mouse) => 7
– step 2: map categorical features to numerical
a1: mouse, black, n/a // animal, color, diet // => (animal, mouse) = #2; (color, black) => #3
a1 = (0, 0, 1, 1, 0, 0, 0) // array of dummy features

how to efficiently store OHE features
– our OHE features are sparse
– store the indices and values for all non-zero entries and assume that all other entries equal 0
a1 = ((2,1), (3,1))
– can lead to a dramatic savings, both in terms of storage and computation

feature hashing, which can be used to reduce the dimensionality of OHE features
– hash features are a reasonable alternative to OHE features
– a consequence of OHE features is that they drastically increase the dimensionality of our data
– example: bag-of-words, over a million words in a dictionary
bigrams make it even larger
– large d: need bigger n; increased communication, larger storage
– By using hashing principles, we can create a lower dimensional feature representation
– feature hashing can be interpreted as an unsupervised learning method, which is performed as a pre-processing step
– hash functions map an object to one of m buckets
each feature category is an object, and we have fewer buckets than feature categories
different categories are bound to collide or map to the same bucket
– bucket indices are what we define as our hashed features
– hash function H => 0..3 // m = 4 number of buckets
H(animal, mouse) = 3
H(color, black) = 2
H(animal, cat) = 0
H(color, tabby) = 0
H(diet, mouse) = 2
– hashed features:
a1 = (0, 0, 1, 1)
a2 = (2, 0, 1, 0) // 2: tabby cat, both categories in bucket #0
– hashed features have been used in practice and
have resulted in good practical performance on various text classification tasks
– And it turns out that under certain conditions, hashed features lead to good approximations
of these inner products, theoretically
pairwise inner products between the data points
X_hash = X.map(applyHashFunction).map(createSparseVector)

lab: click-through rate prediction pipeline
– goal: estimate P(click | user, ad, publisher)
– raw data includes 39 features describing users, ads, and publishers.
Many of these features contain a large number of categories
– split this data set into training, validation, and test data sets
– feature extraction is the main focus of this lab
You'll create OHE features, as well as hashed features,
and store these features using a sparse representation
– you will use MLlib to train logistic regression models
– You will then perform Hyperparameter tuning
to search for a good regularization parameter,
evaluating the results via log loss
– You will then evaluate the final model, again, using log loss
and compare its accuracy to that of a simple baseline
import numpy as np
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
from math import log
from math import exp #  exp(-t) = e^-t
import math
from collections import defaultdict
import hashlib
sampleOne = [(0, 'mouse'), (1, 'black')]
sampleTwo = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')]
sampleThree =  [(0, 'bear'), (1, 'black'), (2, 'salmon')]
sampleDataRDD = sc.parallelize([sampleOne, sampleTwo, sampleThree])
# OHE encoding
sampleOHEDictManual = {}
sampleOHEDictManual[(0,'bear')] = 0
sampleOHEDictManual[(0,'cat')] = 1
sampleOHEDictManual[(0,'mouse')] = 2
sampleOHEDictManual[(1,'black')] = 3
sampleOHEDictManual[(1,'tabby')] = 4
sampleOHEDictManual[(2,'mouse')] = 5
sampleOHEDictManual[(2,'salmon')] = 6
# sparse vector
bDense = np.array([0., 0., 0., 1.])
bSparse = SparseVector(len(bDense), enumerate(bDense))
sampleOneOHEFeatManual = SparseVector(7, {2: 1.0, 3: 1.0})
sampleTwoOHEFeatManual = SparseVector(7, {1: 1.0, 4: 1.0, 5: 1.0})
sampleThreeOHEFeatManual = SparseVector(7, {0: 1.0, 3: 1.0, 6: 1.0})
def oneHotEncoding(rawFeats, OHEDict, numOHEFeats):
    spDict = {}
    for x in rawFeats:
        key = OHEDict.get(x, None)
        if key is not None:
            spDict[key] = 1.0
    return SparseVector(numOHEFeats, spDict)
numSampleOHEFeats = len(sampleOHEDictManual)
sampleOHEData = sampleDataRDD.map(lambda x: oneHotEncoding(x, sampleOHEDictManual, numSampleOHEFeats))
# OHE dict creation
sampleDistinctFeats = (sampleDataRDD
                       .flatMap(lambda x: x)
                       .distinct())
sampleOHEDict = (sampleDistinctFeats
    .zipWithIndex()
    .collectAsMap())
def createOneHotDict(inputData):
    distinctFeats = (inputData
        .flatMap(lambda x: x)
        .distinct())
    return (distinctFeats
        .zipWithIndex()
        .collectAsMap())
sampleOHEDictAuto = createOneHotDict(sampleDataRDD)
# load data
rawData = (sc
    .textFile(fileName, 2)
    .map(lambda x: x.replace('\t', ',')))
weights = [.8, .1, .1]
seed = 42
rawTrainData, rawValidationData, rawTestData = rawData.randomSplit(weights, seed)
rawTrainData.cache()
rawValidationData.cache()
rawTestData.cache()
nTrain = rawTrainData.count()
nVal = rawValidationData.count()
nTest = rawTestData.count()
# parse data
def parsePoint(point):
    res = []
    rowFeats = map(lambda x: x.strip(), point.split(','))
    for idx, x in enumerate(rowFeats):
        if idx == 0: continue
        res.append((idx-1, x))
    return res
parsedTrainFeat = rawTrainData.map(parsePoint)
# OHE
numCategories = (parsedTrainFeat
                 .flatMap(lambda x: x)
                 .distinct()
                 .map(lambda x: (x[0], 1))
                 .reduceByKey(lambda x, y: x + y)
                 .sortByKey()
                 .collect())
ctrOHEDict = createOneHotDict(parsedTrainFeat)
numCtrOHEFeats = len(ctrOHEDict.keys())
def parseOHEPoint(point, OHEDict, numOHEFeats):
    features = []
    rowFeats = map(lambda x: x.strip(), point.split(','))
    for idx, x in enumerate(rowFeats):
        if idx == 0:
            label = x
            continue
        features.append((idx-1, x))
    oheFeats = oneHotEncoding(features, OHEDict, numOHEFeats)
    return LabeledPoint(label, oheFeats)
OHETrainData = rawTrainData.map(lambda point: parseOHEPoint(point, ctrOHEDict, numCtrOHEFeats))
OHETrainData.cache()
# ignore tuples w/o key
def oneHotEncoding(rawFeats, OHEDict, numOHEFeats):
    spDict = {}
    for x in rawFeats:
        key = OHEDict.get(x, None)
        if key is not None:
            spDict[key] = 1.0
    return SparseVector(numOHEFeats, spDict)
OHEValidationData = rawValidationData.map(lambda point: parseOHEPoint(point, ctrOHEDict, numCtrOHEFeats))
OHEValidationData.cache()
# CTR prediction, logloss evaluation
numIters = 50
stepSize = 10.
regParam = 1e-6
regType = 'l2'
includeIntercept = True
model0 = LogisticRegressionWithSGD.train(
    OHETrainData,
    iterations=numIters,
    step=stepSize,
    regParam=regParam,
    regType=regType,
    intercept=includeIntercept)
sortedWeights = sorted(model0.weights)
def computeLogLoss(p, y):
    epsilon = 10e-12
    p = max(epsilon, p)
    p = min(1-epsilon, p)
    if y == 1:
        res = log(p)
    else:
        res = log(1 - p)
    return -1.0 * res
# baseline logloss
classOneFracTrain = (OHETrainData
    .map(lambda x: x.label)
    .reduce(lambda x, y: x + y)
) / OHETrainData.count()
logLossTrBase = (OHETrainData
    .map(lambda x: computeLogLoss(classOneFracTrain, x.label))
    .reduce(lambda x, y: x + y)
) / OHETrainData.count()
# Calculate the probability for an observation given a set of weights and intercept.
def getP(x, w, intercept):
    rawPrediction = intercept + x.dot(w)
    # Bound the raw prediction value
    rawPrediction = min(rawPrediction, 20)
    rawPrediction = max(rawPrediction, -20)
    return math.pow(1.0 + exp(-1 * rawPrediction), -1)
trainingPredictions = (OHETrainData
    .map(lambda x: getP(x.features, model0.weights, model0.intercept))
)
# Calculates the log loss for the data given the model.
def evaluateResults(model, data):
    res = (data
        .map(lambda x: computeLogLoss(getP(x.features, model.weights, model.intercept), x.label))
        .reduce(lambda x, y: x + y)
    ) / data.count()
    return res
logLossTrLR0 = evaluateResults(model0, OHETrainData)
# validation logloss
logLossValBase = (OHEValidationData
    .map(lambda x: computeLogLoss(classOneFracTrain, x.label))
    .reduce(lambda x, y: x + y)
) / OHEValidationData.count()
logLossValLR0 = evaluateResults(model0, OHEValidationData)
# Reduce feature dimension via feature hashing
def hashFunction(numBuckets, rawFeats, printMapping=False):
    """Calculate a feature dictionary for an observation's features based on hashing.
    Returns:
        dict of int to float:  The keys will be integers which represent the buckets that the
            features have been hashed to.  The value for a given key will contain the count of the
            (featureID, value) tuples that have hashed to that key.
    """
    mapping = {}
    for ind, category in rawFeats:
        featureString = category + str(ind)
        mapping[featureString] = int(int(hashlib.md5(featureString).hexdigest(), 16) % numBuckets)
    if(printMapping): print mapping
    sparseFeatures = defaultdict(float)
    for bucket in mapping.values():
        sparseFeatures[bucket] += 1.0
    return dict(sparseFeatures)
def parseHashPoint(point, numBuckets):
    """Create a LabeledPoint for this observation using hashing.
    Returns:
        LabeledPoint: A LabeledPoint with a label (0.0 or 1.0) and a SparseVector of hashed
            features.
    """
    rowFeats = map(lambda x: x.strip(), point.split(','))
    features = [] # list of tuples like [(0, 'mouse'), (1, 'black')]
    label = None
    for idx, x in enumerate(rowFeats):
        if idx == 0:
            label = x
            continue
        features.append((idx-1, x))
    dictHashed = hashFunction(numBuckets, features, False) # dict like {2: 1.0, 3: 1.0}
    hashedFeats = SparseVector(numBuckets, dictHashed)
    return LabeledPoint(label, hashedFeats)
numBucketsCTR = 2 ** 15
hashTrainData = rawTrainData.map(lambda x: parseHashPoint(x, numBucketsCTR))
hashTrainData.cache()
hashValidationData = rawValidationData.map(lambda x: parseHashPoint(x, numBucketsCTR))
hashValidationData.cache()
hashTestData = rawTestData.map(lambda x: parseHashPoint(x, numBucketsCTR))
hashTestData.cache()
# check sparsity // Since we have 33K hashed features versus 233K OHE features, we should expect OHE features to be sparser
def computeSparsity(data, d, n):
    """Calculates the average sparsity for the features in an RDD of LabeledPoints.
    Returns:
        float: The average of the ratio of features in a point to total features.
    """
    total = (data
        .map(lambda x: len(x.features.indices))
        .sum()
    )
    return float(total) / float(d * n)
averageSparsityHash = computeSparsity(hashTrainData, numBucketsCTR, nTrain)
averageSparsityOHE = computeSparsity(OHETrainData, numCtrOHEFeats, nTrain)
# Logistic model with hashed features
numIters = 500
regType = 'l2'
includeIntercept = True
stepSizes = (1, 10)
regParams = (1e-6, 1e-3)
bestModel = None
bestLogLoss = 1e10
for stepSize in stepSizes:
    for regParam in regParams:
        model = (LogisticRegressionWithSGD
            .train(hashTrainData, numIters, stepSize, regParam=regParam, regType=regType,
            intercept=includeIntercept))
        logLossVa = evaluateResults(model, hashValidationData)
        print ('\tstepSize = {0:.1f}, regParam = {1:.0e}: logloss = {2:.3f}'
               .format(stepSize, regParam, logLossVa))
        if (logLossVa < bestLogLoss):
            bestModel = model
            bestLogLoss = logLossVa
# Evaluate on the test set 
logLossTest = evaluateResults(bestModel, hashTestData)
logLossTestBaseline = (hashTestData
    .map(lambda x: computeLogLoss(classOneFracTrain, x.label))
    .sum()
) / hashTestData.count()

WEEK 5: Principal Component Analysis and Neuroimaging

Topics: Introduction to neuroscience and neuroimaging data, exploratory data analysis, principal component analysis (PCA) formulations and solution, distributed PCA.
Lab 5: Neuroimaging Analysis via PCA - Identify patterns of brain activity in larval zebrafish. You will work with time-varying images (generated using a technique called light-sheet microscopy) that capture a zebrafish's neural activity as it is presented with a moving visual pattern. After implementing distributed PCA from scratch and gaining intuition by working with synthetic data, you will use PCA to identify distinct patterns across the zebrafish brain that are induced by different types of stimuli.

Exploratory analysis is a broad field covering many topics in both machine learning and visualization
– process the data to extract simpler, more compact representations
– perform analyses to identify patterns in the data
– visualize and explore them, and hopefully derive insight which we can share
– we really don't know ahead of time exactly what analyses to perform
– almost all methods in data analysis fall into one of two categories
supervised
unsupervised
– we're going to focus on dimensionality reduction and clustering

clustering
– take a complex collection of many data points and reduce it into a simpler representation

dimensionality reduction
– describes data using a simpler representation; may make interesting patterns more clear
– lower-dimensional space: state space of a system

PCA, the most commonly used method for dimensionality reduction
– in practice we often measure redundant signals (US and EU shoe size)
– we represent data via method by which it was gathered (pixels in images of brain activity)
– goal: find a 'better' representation for data
to visualize and discover patterns;
preprocessing for other tasks (feature hashing)
– intuition: US and EU shoe size, project points to the line
projected point close to original, Euclidean distance, norm
– min dist: our goal is to minimize the Euclidean distances between our original points and their projections
unlike (yhat — y)^2 as in linear regression
– max variance: in order to identify patterns in our data, we often look for variation across observations
find a succinct representation that best captures variation in our initial data
– PCA solution represents the original data in terms of it's directions of maximal variation.

PCA optimization problem
– data: matrix X size n,d (n rows, d columns)
– we aim to find a k dimensional representation for each of our n data points,
where k is much smaller than d
– matrix Z stores the k dimensional representations of our data points (n,k)
– PCA scores: Z = X*P // linear combination of the original data
– matrix P has k columns: principal components (and d rows)
– PCA is an optimization problem: we are searching over all d,k matrices
to find the right P
– we impose variance and covariance constraints on P
off diagonal entries of the covariance matrix Cz are all equal to zero;
the variance of the first feature in the reduced dimension is the largest …;
– Formulation
X(i,j): i-th observation, j-th feature
mu(j): mean of j-th feature
variance of j-th feature:
var(j)^2 = 1/n sum (for i in 1..n: (X(i,j) — mu(j))^2)
assuming zero mean (normalized data):
var(j)^2 = 1/n sum (for i in 1..n: X(i,j)^2)

covariance: quantifies the relationship between two features
var(j,l) = 1/n sum (for i in 1..n: X(i,j)*X(i,l))
– The covariance is symmetric in that sigma 1,2 equals sigma 2,1
– a large positive covariance indicates that the two features are highly correlated
– large negative covariance indicates that the two features are highly anticorrelated
– covariance of zero means that the two features are uncorrelated
– if the covariance between the two features equals each of their squared variances,
then the two features are the same
– The covariance matrix is symmetric (d,d) matrix
where each entry stores pairwise covariance information about the d features
Cx = 1/n * X.T * X
– diagonal entries of this matrix equal the sample variances of the features

constraints
we want to avoid redundancy
– and to address this, PCA requires that the features in our reduced dimensions have no correlation
– off diagonal entries of the covariance matrix Cz are all equal to zero
features in the reduced dimension maximize variance
– the variance of the first feature in the reduced dimension is the largest,
followed by the variance in the second feature, and so on

all covariance matrices have an eigendecomposition,
– which means they can be written as the product of three matrices, namely U, lambda, and U transpose
(U * Lambda * U.T)
– with each column of U being an eigenvector
– Eigenvectors have corresponding eigenvalues, and the eigenvectors are sorted by their eigenvalues

PCA solution can be derived from the sample covariance matrix of X
– P = top k eigenvectors of Cx
– These vectors are stored by a matrix U, with each column of U being an eigenvector
In the context of PCA
– the d eigenvectors of the covariance matrix are the d orthonormal directions of maximal variance in the data
orthonormal vectors are simply perpendicular vectors with Euclidean norm equal to one

how to choose k, or the dimension of the new representation
– choose k to equal 2 or 3 for plotting purposes
– or, choose k that captures most of the variance in our data
sum (i in 1..k: lambda(i)) / sum (j in 1..d: lambda(j)) > 95%

limitations and assumptions
– First, PCA makes linearity assumptions,
– and also assumes that the principal components are orthogonal.
– we've assumed that our data is centered, or, in other words, that our features have zero mean
– since PCA chooses directions of max variance, it depends on the relative scale of each feature
normalize and scale features! (x — mean) / (max - min)

algorithmic interpretation of PCA
– Orthogonal vectors are simply perpendicular vectors,
and one nice property of orthogonal vectors is that their dot product always equals 0
– a unit vector is simply a vector whose Euclidean norm equals one
– orthonormal: These are vectors that are orthogonal and also have unit norm

interpretation of PCA as an iterative algorithm
– At the i-th iteration we aim to find the direction of maximal variance in the data,
subject to the constraint that this direction is of unit norm
and that it is orthogonal to all directions we chose in previous iterations.
– We can then project our data onto this direction,
and the locations along this direction become the i-th feature in our new representation

distributed PCA
– the first step in PCA involves centering our data
– centered data is stored in an n,d matrix which we'll call X
– we need to compute the sample covariance matrix, or the scatter matrix X.T*X
scatter matrix is simply the sample covariance matrix without dividing by n
PCA returns the same solution in either case
– in the third step, we compute these eigenvectors by performing an eigendecomposition
– in order to obtain our k dimensional representation,
we need to multiply our original data by the top k eigenvectors to compute the PCA scores

In the first setting, the big n and small d setting,
– we assume that quadratic local storage, cubic local computation,
and O(dk) communication are all feasible
– These are similar assumptions we make when performing closed-form linear regression at scale
– since n is large, we store it in a data parallel fashion, which requires O of nd distributed storage
– center our data. And to do this, we must compute the mean of each feature m = sum/n
vector m to be the d dimensional vector whose i-th component is the mean of the i-th feature
compute the mean vector via a simple reduce operation whereby we sum all of the data points together
After computing m on the driver, we must send it back to the workers so that each data point can be centered
we can then perform a map operation to create the centered data points, which simply involves subtracting m from each original data point
– we next need to compute the scatter matrix X.T * X
we can efficiently perform this computation in a distributed fashion by using outer products
we can express this matrix multiplication as a sum of outer products where each outer product
involves only a single row of x, or a single data point
as a simple MapReduce operation
In the map step, we take each point and compute its outer product with itself // bottleneck O(d^2)
In the reduce step, we simply sum over all of these outer products
– we need to perform its eigendecomposition
using lib func eigh (generally requires cubic time and quadratic space)
As a result of eigendecomposition, we have access to the top k eigenvectors on the driver
– now we need to communicate them to the workers
so that they can compute these k dimensional representations for the data points // bottleneck O(dk)
– now we can compute the k dimensional representation for each point via a simple matrix vector multiply
O(dk) map operation

second large-scale setting, we assume that both n and d are large.
– And therefore, we can only afford storage, computation, and communication that are linear in both n and d
O(dk+n) local storage, computation, communication
iterative algorithm
iterative approach relies on performing a sequence of matrix vector products to compute the top K eigenvectors
top K principal components of our data, and we will denote them by the matrix P
The most common methods for doing this are Krylov subspace and random projection based methods
– iteratively compute X.T*Xv for some v size d; O(k) passes over the data, O(dk) local storage
– the driver provides the workers with a D dimensional vector on each iteration i,
and requires that the workers left multiply this vector by the scatter matrix
– The first step involves the algorithm communicating the D dimensional vector, vi, to all workers
vector vi communicated to the workers in step one changes on each iteration
– Next, we need to multiply the scatter matrix by the vector vi in a distributed fashion
qi = X.T * X*vi = X.T * (X*vi) // n.b. first calc bi size n = X*vi
 b = np.array(X.map(dotProduct).collect())
q = X.map(rescaleByBi).reduce(sumVectors)
each component of bi equal to the dot product between a row of X and the vector vi
each worker will need to store bi locally in order to compute the overall result, qi, in the next step.
So we're going to need to communicate bi to each worker
X.T * bi = sum of rescaled data points
– The driver then uses qi to update its estimate of P

PCA lab
we'll work with time-varying images containing patterns of the zebrafish's neural activity
as it is presented with a moving visual pattern.
Different stimuli induce different patterns across the brain, and we can use exploratory analyses
to identify these patterns
– we start with a collection of many neural signals, each containing the average response to the stimulus
– We then perform PCA to reduce these high dimensional signals into a two dimensional space
– What we now have for every neuron is a pair of numbers that describes the temporal properties of its response

Part 1: Work through the steps of PCA on a sample dataset
Part 2: Write a PCA function and evaluate PCA on sample datasets
Part 3: Parse, inspect, and preprocess neuroscience data then perform PCA
Part 4: Perform feature-based aggregation followed by PCA

the non-spherical data is amenable to dimensionality reduction via PCA, while the spherical data is not
– When the covariance is zero, the two dimensions are uncorrelated, and hence the data looks spherical
– In contrast, when the covariance is 0.9, the two dimensions are strongly (positively) correlated and thus the data is non-spherical
import numpy as np
from numpy.linalg import eigh
correlatedData = sc.parallelize(dataCorrelated)
meanCorrelated = correlatedData.sum()
meanCorrelated = [meanCorrelated[0] / correlatedData.count(), meanCorrelated[1] / correlatedData.count()]
correlatedDataZeroMean = correlatedData.map(lambda x: (x[0] - meanCorrelated[0], x[1] - meanCorrelated[1]))
# Sample covariance matrix
correlatedCov = (correlatedDataZeroMean
    .map(lambda x: np.outer(x, x))
    .sum()
) / correlatedDataZeroMean.count()
def estimateCovariance(data):
    """Compute the covariance matrix for a given rdd.
    Returns:
        np.ndarray: A multi-dimensional array where the number of rows and columns both equal the
        length of the arrays in the input `RDD`.
    """
    count = data.count()
    mean = data.sum()
    mean = map(lambda x: x / count, mean)
    centered =data.map(lambda x: x - mean)
    res = (centered
           .map(lambda x: np.outer(x, x))
           .sum()
    ) / count
    return res
correlatedCovAuto= estimateCovariance(correlatedData)
# Eigendecomposition
eigVals, eigVecs = eigh(correlatedCovAuto)
# find the top eigenvector based on the largest eigenvalue
inds = np.argsort(eigVals)[::-1]
topComponent = eigVecs[:, inds[0]]
# PCA scores
correlatedDataScores = correlatedData.map(lambda x: x.dot(topComponent))
# PCA function
def pca(data, k=2):
    """Computes the top `k` principal components, corresponding scores, and all eigenvalues.
    Returns:
        tuple of (np.ndarray, RDD of np.ndarray, np.ndarray): A tuple of (eigenvectors, `RDD` of
        scores, eigenvalues).
    """
    covMatrix = estimateCovariance(data)
    eigValues, eigVecs = eigh(covMatrix)
    inds = np.argsort(eigValues)
    revInds = inds[::-1]
    d = len(eigValues)
    topComponents = np.zeros((d, k))
    for idx in range(k): # insert columns
        topComponents[:, idx] = eigVecs[:, revInds[idx]]
    dataScores = data.map(lambda x: x.dot(topComponents))
    # Return the `k` principal components, `k` scores, and all eigenvalues
    return (topComponents, dataScores, eigValues[revInds])
# Run pca on correlatedData with k = 2
topComponentsCorrelated, correlatedDataScoresAuto, eigenvaluesCorrelated = pca(correlatedData, k=2)
# Variance explained
def varianceExplained(data, k=1):
    """Calculate the fraction of variance explained by the top `k` eigenvectors.
    Returns:
        float: A number between 0 and 1 representing the percentage of variance explained
        by the top `k` eigenvectors.
    """
    components, scores, eigenvalues = pca(data, k)
    topEV = eigenvalues[:k]
    return sum(topEV) / sum(eigenvalues)
varianceRandom1 = varianceExplained(randomData, 1)
varianceCorrelated1 = varianceExplained(correlatedData, 1)
# Parse, inspect, and preprocess neuroscience data then perform PCA
lines = sc.textFile(inputFile)
def parse(line):
    """Parse the raw data into a (`tuple`, `np.ndarray`) pair.
    Returns:
        tuple of tuple, np.ndarray: A (coordinate, pixel intensity array) `tuple` where coordinate is
        a `tuple` containing two values and the pixel intensity is stored in an NumPy array
        which contains 240 values.
    """
    lst = line.split(' ')
    lst = map(lambda x: float(x.strip()), lst)
    coords = (int(lst[0]), int(lst[1]))
    pixValues = lst[2:]
    return (coords, np.asarray(pixValues))
rawData = lines.map(parse)
rawData.cache()
entry = rawData.first()
# Min and max flouresence
mn = (rawData
      .map(lambda (xy, vals): min(vals))
      .min())
mx = (rawData
      .map(lambda (xy, vals): max(vals))
      .max())
# Fractional signal change
def rescale(ts):
    """Take a np.ndarray and return the standardized array by subtracting and dividing by the mean.
    Returns:
        np.ndarray: The times series adjusted by subtracting the mean and dividing by the mean.
    """
    mn = np.mean(ts)
    res = (ts - mn) / mn
    return np.asarray(res)
scaledData = rawData.mapValues(lambda v: rescale(v))
mnScaled = scaledData.map(lambda (k, v): v).map(lambda v: min(v)).min()
mxScaled = scaledData.map(lambda (k, v): v).map(lambda v: max(v)).max()
# PCA on the scaled data
componentsScaled, scaledScores, eigenvaluesScaled = pca(
    scaledData.map(lambda (k,v): v),
    k=3)
# Feature-based aggregation and PCA
# during those 240 seconds, the zebrafish was presented with 12 different direction-specific visual patterns, with each one lasting for 20 seconds
# For this exercise, you'll create several arrays that perform different types of aggregation
vector = np.array([0., 1., 2., 3., 4., 5.])
# Create a multi-dimensional array that when multiplied (using .dot) against vector, results in
# a two element array where the first element is the sum of the 0, 2, and 4 indexed elements of
# vector and the second element is the sum of the 1, 3, and 5 indexed elements of vector.
# This should be a 2 row by 6 column array
sumEveryOther = np.array([[1, 0, 1, 0, 1, 0],
                          [0, 1, 0, 1, 0, 1]])
# Create a multi-dimensional array that when multiplied (using .dot) against vector, results in a
# three element array where the first element is the sum of the 0 and 3 indexed elements of vector,
# the second element is the sum of the 1 and 4 indexed elements of vector, and the third element is
# the sum of the 2 and 5 indexed elements of vector.
# This should be a 3 row by 6 column array
sumEveryThird = np.array([[1, 0, 0, 1, 0, 0],
                          [0, 1, 0, 0, 1, 0],
                          [0, 0, 1, 0, 0, 1]])
# Create a multi-dimensional array that can be used to sum the first three elements of vector and
# the last three elements of vector, which returns a two element array with those values when dotted
# with vector.
# This should be a 2 row by 6 column array
sumByThree = np.array([[1, 1, 1, 0, 0, 0],
                       [0, 0, 0, 1, 1, 1]])
# Create a multi-dimensional array that sums the first two elements, second two elements, and
# last two elements of vector, which returns a three element array with those values when dotted
# with vector.
# This should be a 3 row by 6 column array
sumByTwo = np.array([ [1, 1, 0, 0, 0, 0],
                      [0, 0, 1, 1, 0, 0],
                      [0, 0, 0, 0, 1, 1]])
# Recreate with `np.tile` and `np.eye`
sumEveryOtherTile = np.tile(np.eye(2),3)
sumEveryThirdTile = np.tile(np.eye(3),2)
# Recreate with `np.kron`
sumByThreeKron = np.kron(
    np.eye(2),
    np.ones((1,3)))
sumByTwoKron = np.kron(
    np.eye(3),
    np.ones((1,2)))
# Aggregate by time
# Create a multi-dimensional array to perform the aggregation
T = np.tile(
    np.kron(
        np.eye(20),
        np.ones((1,1))
    ), 12)
# Transform scaledData using T.  Make sure to retain the keys.
timeData = scaledData.map(lambda (k,v): (k, T.dot(v)))
timeData.cache()
# Obtain a compact representation
componentsTime, timeScores, eigenvaluesTime = pca(
    timeData.map(lambda (k,v): v),
    k=3)
# Aggregate by direction
# we want to see how different pixels (and the underlying neurons captured in these pixels) react when the
# zebrafish is presented with 12 direction-specific patterns, ignoring the temporal aspect of the reaction
# Create a multi-dimensional array to perform the aggregation
D = np.kron(
    np.eye(12),
    np.ones((1,20)))
# Transform scaledData using D.  Make sure to retain the keys.
directionData = scaledData.map(lambda (k,v): (k, D.dot(v)))
directionData.cache()
print directionData.count()
print directionData.first()
# Compact representation of direction data
componentsDirection, directionScores, eigenvaluesDirection = pca(
    directionData.map(lambda (k,v): v),
    k=3)






original post http://vasnake.blogspot.com/2016/08/berkeleyx-cs1901x-scalable-machine.html

2 комментария:

Архив блога

Ярлыки

linux (241) python (191) citation (186) web-develop (170) gov.ru (159) video (124) бытовуха (115) sysadm (100) GIS (97) Zope(Plone) (88) бурчалки (84) Book (83) programming (82) грабли (77) Fun (76) development (73) windsurfing (72) Microsoft (64) hiload (62) internet provider (57) opensource (57) security (57) опыт (55) movie (52) Wisdom (51) ML (47) driving (45) hardware (45) language (45) money (42) JS (41) curse (40) bigdata (39) DBMS (38) ArcGIS (34) history (31) PDA (30) howto (30) holyday (29) Google (27) Oracle (27) tourism (27) virtbox (27) health (26) vacation (24) AI (23) Autodesk (23) SQL (23) Java (22) humor (22) knowledge (22) translate (20) CSS (19) cheatsheet (19) hack (19) Apache (16) Manager (15) web-browser (15) Никонов (15) functional programming (14) happiness (14) music (14) todo (14) Klaipeda (13) PHP (13) course (13) scala (13) weapon (13) HTTP. Apache (12) SSH (12) frameworks (12) hero (12) im (12) settings (12) HTML (11) SciTE (11) USA (11) crypto (11) game (11) map (11) HTTPD (9) ODF (9) Photo (9) купи/продай (9) benchmark (8) documentation (8) 3D (7) CS (7) DNS (7) NoSQL (7) cloud (7) django (7) gun (7) matroska (7) telephony (7) Microsoft Office (6) VCS (6) bluetooth (6) pidgin (6) proxy (6) Donald Knuth (5) ETL (5) NVIDIA (5) Palanga (5) REST (5) bash (5) flash (5) keyboard (5) price (5) samba (5) CGI (4) LISP (4) RoR (4) cache (4) car (4) display (4) holywar (4) nginx (4) pistol (4) spark (4) xml (4) Лебедев (4) IDE (3) IE8 (3) J2EE (3) NTFS (3) RDP (3) holiday (3) mount (3) Гоблин (3) кухня (3) урюк (3) AMQP (2) ERP (2) IE7 (2) NAS (2) Naudoc (2) PDF (2) address (2) air (2) british (2) coffee (2) fitness (2) font (2) ftp (2) fuckup (2) messaging (2) notify (2) sharepoint (2) ssl/tls (2) stardict (2) tests (2) tunnel (2) udev (2) APT (1) CRUD (1) Canyonlands (1) Cyprus (1) DVDShrink (1) Jabber (1) K9Copy (1) Matlab (1) Portugal (1) VBA (1) WD My Book (1) autoit (1) bike (1) cannabis (1) chat (1) concurrent (1) dbf (1) ext4 (1) idioten (1) join (1) krusader (1) license (1) life (1) migration (1) mindmap (1) navitel (1) pneumatic weapon (1) quiz (1) regexp (1) robot (1) science (1) serialization (1) spatial (1) tie (1) vim (1) Науру (1) крысы (1) налоги (1) пианино (1)