Linear Regression and
Distributed Machine Learning Principles
Теперь посмотрим
на лабораторку lab3
Lab 3: Millionsong
Regression Pipeline. Develop an end-to-end linear regression pipeline
to predict the release year of a song given a set of audio features.
You will implement a gradient descent solver for linear regression,
use Spark's machine Learning library ( mllib) to train additional
models, tune models via grid search, improve accuracy using quadratic
features, and visualize various intermediate results to build
intuition
Построим модель
(линейная регрессия, gradient descent), потренируем
ее и будем угадывать год выхода песни
по ее свойствам. Это в принципе. А в
кожухе, в лабе будет еще много всякого,
типа извлечения дополнительный фичей,
построения альтернативных моделей,
поиск и тюнинг гиперпараметров для
ridge regression и проч.
The
notebook can be found on GitHub at the following links:
ОК, скачиваем
нотебук, запускаем виртмашину
valik@snafu:~$ pushd ~/sparkvagrant/
valik@snafu:~/sparkvagrant$ vagrant up
Part 1: Read and parse
the initial dataset
Visualization 1:
Features
Visualization 2:
Shifting labels
Part 2: Create and
evaluate a baseline model
Visualization 3:
Predicted vs. actual
Part 3: Train (via
gradient descent) and evaluate a linear regression model
Visualization 4:
Training error
Part 4: Train using
MLlib and tune hyperparameters via grid search
Visualization 5: Best
model's predictions
Visualization 6:
Hyperparameter heat map
Part 5: Add
interactions between features
Документация
к используемым API:
1. Загрузка
и парсинг датасета.
На входе у нас
текстовый файл, CSV, где в первом поле
указан год (label) и остальные поля содержат
свойства музыкального трека, в виде
чисел.
numPartitions = 2
rawData = sc.textFile(fileName, numPartitions)
numPoints = rawData.count()
print numPoints
samplePoints = rawData.take(5)
print samplePoints
6724
[u'2001.0,0.884123733793,0.610454259079,0.600498416968,0.474669212493,0.247232680947,0.357306088914,0.344136412234,0.339641227335,0.600858840135,0.425704689024,0.60491501652,0.419193351817', u'2001.0,0.854411946129,0.604124786151,0.593634078776,0.495885413963,0.266307830936,0.261472105188,0.506387076327,0.464453565511,0.665798573683,0.542968988766,0.58044428577,0.445219373624', ...
rawData это будет
RDD с сырыми данными. Почти 7 тыщ записей.
Распарсим
данные и создадим RDD содержащий
объекты LabeledPoint.
Сначала напишем
функцию конвертации строки из исходного
датасета в LabeledPoint
from pyspark.mllib.regression import LabeledPoint
import numpy as np
def parsePoint(line):
"""Converts a comma separated unicode string into a `LabeledPoint`.
Args:
line (unicode): Comma separated unicode string where the first element is the label and the
remaining elements are features.
Returns:
LabeledPoint: The line is converted into a `LabeledPoint`, which consists of a label and
features.
"""
label = подсказывать нельзя
features = ???
res = LabeledPoint(label, features)
return res
parsedSamplePoints = [parsePoint(x) for x in samplePoints]
firstPointFeatures = parsedSamplePoints[0].features
firstPointLabel = parsedSamplePoints[0].label
print firstPointFeatures, firstPointLabel
|
Для осознания,
какие данные у нас в датасете, был
построен график:
Then we will look at
the raw features for 50 data points by generating a heatmap that
visualizes each feature on a grey-scale and shows the variation of
each feature across the 50 sample data points. The features are all
between 0 and 1, with values closer to 1 represented via darker
shades of grey.
Продолжаем
играться с данными. Найдем диапазон лет
(лабелей) для песен из датасета. Применим
функцию parsePoint для создания нового RDD,
выцепим оттуда labels и возьмем min, max:
parsedDataInit = rawData.map(<FILL IN>)
onlyLabels = parsedDataInit.map(<FILL IN>)
minYear = <FILL IN>
maxYear = <FILL IN>
print maxYear, minYear
2011.0 1922.0
Диапазон от
1922 года до 2011. Для обучения модели с этим
надо что-то сделать. Нормализовать,
чтобы минимальное значение было 0.
parsedData = parsedDataInit.<FILL IN>
# Should be a LabeledPoint
print type(parsedData.take(1)[0])
# View the first point
print '\n{0}'.format(parsedData.take(1))
Блин, с этим договором не давать подсказок,
фак! Невозможно ничего показать.
В общем, тут
создается новый RDD (через map), где из
каждой лабели вычтено минимальное
значение.
Теперь наш
датасет надо разделить на сеты:
тренировочный, кросс-валидации и
тестовый. Для разделения используем
Датасеты
закешируем в оперативке, ибо обращаться
к ним мы будем постоянно.
weights = [.8, .1, .1]
seed = 42
parsedTrainData, parsedValData, parsedTestData = parsedData.randomSplit(weights, seed)
parsedTrainData.cache()
parsedValData.cache()
parsedTestData.cache()
nTrain = parsedTrainData.count()
nVal = parsedValData.count()
nTest = parsedTestData.count()
print nTrain, nVal, nTest, nTrain + nVal + nTest
print parsedData.count()
5371 682 671 6724
6724
2. Создание
и оценка референсной модели.
Референсная
(baseline) модель, это модель самая простая,
нужная для того, чтобы иметь ориентир
хоть какой-то.
Самым простым
вариантом была признана константа,
равная среднему арифметическому
averageTrainYear = (parsedTrainData
.map(lambda x: x.label)
.mean()
)
print averageTrainYear
53.9316700801
Напишем пару
функций для оценки модели методом
среднеквадратичной ошибки
def squaredError(label, prediction):
"""Calculates the the squared error for a single prediction.
Args:
label (float): The correct value for this observation.
prediction (float): The predicted value for this observation.
Returns:
float: The difference between the `label` and `prediction` squared.
"""
import math
res = math.pow(label - prediction, 2)
return res
def calcRMSE(labelsAndPreds):
"""Calculates the root mean squared error for an `RDD` of (label, prediction) tuples.
Args:
labelsAndPred (RDD of (float, float)): An `RDD` consisting of (label, prediction) tuples.
Returns:
float: The square root of the mean of the squared errors.
"""
res = (labelsAndPreds
.map(lambda (x, y): квадрат ошибки)
.reduce(lambda x, y: просуммировать)
)
import math
return math.sqrt(поделить сумму на количество)
labelsAndPreds = sc.parallelize([(3., 1.), (1., 2.), (2., 2.)])
# RMSE = sqrt[((3-1)^2 + (1-2)^2 + (2-2)^2) / 3] = 1.291
exampleRMSE = calcRMSE(labelsAndPreds)
print exampleRMSE
|
1.29099444874
Посмотрим
теперь, какую ошибку в годах дает наша
базовая модель для песенного датасета
(трех: тренировочного, кросс-валидации
и тестового).
labelsAndPredsTrain = parsedTrainData.map(lambda x: (актуальная лабель и предсказание))
rmseTrainBase = calcRMSE(labelsAndPredsTrain)
labelsAndPredsVal = parsedValData.map(lambda x: (лабель и предсказание))
rmseValBase = calcRMSE(labelsAndPredsVal)
labelsAndPredsTest = parsedTestData.map(lambda x: (лабель и предсказание))
rmseTestBase = calcRMSE(labelsAndPredsTest)
print 'Baseline Train RMSE = {0:.3f}'.format(rmseTrainBase)
print 'Baseline Validation RMSE = {0:.3f}'.format(rmseValBase)
print 'Baseline Test RMSE = {0:.3f}'.format(rmseTestBase)
Baseline Train RMSE = 21.306
Baseline Validation
RMSE = 21.586
Baseline Test RMSE =
22.137
3. Тренировка
и оценка самопальной модели линейной
регресии.
Размялись?
Теперь пора занятся настоящей моделью,
линейная регрессия обученная через
gradient descent.
Напишем функцию
вычисления апдейта для одной итерации
def gradientSummand(weights, lp):
"""Calculates the gradient summand for a given weight and `LabeledPoint`.
Note:
`DenseVector` behaves similarly to a `numpy.ndarray` and they can be used interchangably
within this function. For example, they both implement the `dot` method.
Args:
weights (DenseVector): An array of model weights (betas).
lp (LabeledPoint): The `LabeledPoint` for a single observation.
Returns:
DenseVector: An array of values the same length as `weights`. The gradient summand.
"""
w = транспонированные веса
x = фичи лабельпойнта
y = лабель лабельпойнта
res = (w.dot(x) - y) * x
return res
exampleW = DenseVector([1, 1, 1])
exampleLP = LabeledPoint(2.0, [3, 1, 4])
# gradientSummand = (dot([1 1 1], [3 1 4]) - 2) * [3 1 4] = (8 - 2) * [3 1 4] = [18 6 24]
summandOne = gradientSummand(exampleW, exampleLP)
print summandOne
|
[18.0,6.0,24.0]
Теперь нужна
функция вычисляющая предсказание. Нужно
просто умножить два вектора – веса и
фичи, dot product
def getLabeledPrediction(weights, observation):
"""Calculates predictions and returns a (label, prediction) tuple.
Note:
The labels should remain unchanged as we'll use this information to calculate prediction
error later.
Args:
weights (np.ndarray): An array with one weight for each features in `trainData`.
observation (LabeledPoint): A `LabeledPoint` that contain the correct label and the
features for the data point.
Returns:
tuple: A (label, prediction) tuple.
"""
label = лабель наблюдения
w = транспонированные веса
x = фичи наблюдения
prediction = веса дотпродакт фичи
res = (label, prediction)
return res
weights = np.array([1.0, 1.5])
predictionExample = sc.parallelize([LabeledPoint(2, np.array([1.0, .5])),
LabeledPoint(1.5, np.array([.5, .5]))])
labelsAndPredsExample = predictionExample.map(lambda lp: getLabeledPrediction(weights, lp))
print labelsAndPredsExample.collect()
|
[(2.0, 1.75), (1.5,
1.25)]
Теперь самое
вкусное, gradient descent
def linregGradientDescent(trainData, numIters):
"""Calculates the weights and error for a linear regression model trained with gradient descent.
Note:
`DenseVector` behaves similarly to a `numpy.ndarray` and they can be used interchangably
within this function. For example, they both implement the `dot` method.
Args:
trainData (RDD of LabeledPoint): The labeled data for use in training the model.
numIters (int): The number of iterations of gradient descent to perform.
Returns:
(np.ndarray, np.ndarray): A tuple of (weights, training errors). Weights will be the
final weights (one weight per feature) for the model, and training errors will contain
an error (RMSE) for each iteration of the algorithm.
"""
# The length of the training data
n = trainData.count()
# The number of features in the training data
d = len(trainData.take(1)[0].features)
w = np.zeros(d)
alpha = 1.0
# We will compute and store the training error after each iteration
errorTrain = np.zeros(numIters)
for i in range(numIters):
# Use getLabeledPrediction from (3b) with trainData to obtain an RDD of (label, prediction)
# tuples. Note that the weights all equal 0 for the first iteration, so the predictions will
# have large errors to start.
labelsAndPredsTrain = trainData.map getLabeledPrediction
errorTrain[i] = calcRMSE(labelsAndPredsTrain)
# Calculate the `gradient`. Make use of the `gradientSummand` function you wrote in (3a).
# Note that `gradient` sould be a `DenseVector` of length `d`.
gradient = trainData.map gradientSummand reduce просуммировать
# Update the weights
alpha_i = alpha / (n * np.sqrt(i+1))
w -= альфа помноженная на градиент
return w, errorTrain
# create a toy dataset with n = 10, d = 3, and then run 5 iterations of gradient descent
# note: the resulting model will not be useful; the goal here is to verify that
# linregGradientDescent is working properly
exampleN = 10
exampleD = 3
exampleData = (sc
.parallelize(parsedTrainData.take(exampleN))
.map(lambda lp: LabeledPoint(lp.label, lp.features[0:exampleD])))
print exampleData.take(2)
exampleNumIters = 5
exampleWeights, exampleErrorTrain = linregGradientDescent(exampleData, exampleNumIters)
print exampleWeights
|
[LabeledPoint(79.0,
[0.884123733793,0.610454259079,0.600498416968]), LabeledPoint(79.0,
[0.854411946129,0.604124786151,0.593634078776])]
[ 48.88110449
36.01144093 30.25350092]
Теперь применим
эти функции для тренировки линрег
модели, получим веса. И проверим точность
модели на датасете кросс-валидации.
numIters = 50
weightsLR0, errorTrainLR0 = linregGradientDescent(распарсенные трен.данные, numIters)
labelsAndPreds = parsedValData.map(lambda x: getLabeledPrediction(weightsLR0, x))
rmseValLR0 = calcRMSE(labelsAndPreds)
print 'Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}'.format(rmseValBase,
rmseValLR0)
Validation RMSE:
Baseline = 21.586
LR0 = 19.192
Фиговенькая
точность, хотя и лучше чем у базовой
модели.
4. Тренировка
моделей в/из MLlib, поиск гиперпараметров
через grid search.
Теперь
воспользуемся библиотечной функциональностью
для построения
модели линейной регрессии с регуляризацией
и доп.коэффициентом/смещением (intercept).
from pyspark.mllib.regression import LinearRegressionWithSGD
# Values to use when training the linear regression model
numIters = 500 # iterations
alpha = 1.0 # step
miniBatchFrac = 1.0 # miniBatchFraction
reg = 1e-1 # regParam
regType = 'l2' # regType
useIntercept = True # intercept
firstModel = LinearRegressionWithSGD.тренировать(
распарсенные трен.данные,
iterations=numIters,
step=alpha,
miniBatchFraction=miniBatchFrac,
regParam=reg,
regType=regType,
intercept=useIntercept
)
# weightsLR1 stores the model weights; interceptLR1 stores the model intercept
weightsLR1 = firstModel.weights
interceptLR1 = firstModel.intercept
print weightsLR1, interceptLR1
[16.682292427,14.7439059559,-0.0935105608897,6.22080088829,4.01454261926,-3.30214858535,11.0403027232,2.67190962854,7.18925791279,4.46093254586,8.14950409475,2.75135810882]
13.3335907631
Воспользуемся
для предсказания библиотечной функцией
samplePoint = parsedTrainData.take(1)[0]
samplePrediction = firstModel.предсказать(samplePoint.features)
print samplePrediction
56.8013380112
Модель есть,
натренирована. Проверим, насколько она
ошибается.
labelsAndPreds = распарсенные валид.данные map(lambda x: (x.label, firstModel.предсказать(x.features)))
rmseValLR1 = calcRMSE(labelsAndPreds)
print ('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}' +
'\n\tLR1 = {2:.3f}').format(rmseValBase, rmseValLR0, rmseValLR1)
Validation RMSE:
Baseline = 21.586
LR0 = 19.192
LR1 = 19.691
Забавно, ошибка
больше, чем у модели обученной на меньшем
количестве итераций. Очевидно,
регуляризация была подобрана неудачно.
Поэтому что?
Правильно, надо занятся подбором
гиперпараметров.
Методом Grid
Search подберем размер регуляризатора
bestRMSE = rmseValLR1
bestRegParam = reg
bestModel = firstModel
numIters = 500
alpha = 1.0
miniBatchFrac = 1.0
for reg in (1e-10, 1e-5, 1):
model = LinearRegressionWithSGD.train(parsedTrainData, numIters, alpha,
miniBatchFrac, regParam=reg,
regType='l2', intercept=True)
labelsAndPreds = parsedValData.map(lambda lp: (lp.label, model.predict(lp.features)))
rmseValGrid = calcRMSE(labelsAndPreds)
print rmseValGrid
if rmseValGrid < bestRMSE:
bestRMSE = rmseValGrid
bestRegParam = reg
bestModel = model
rmseValLRGrid = bestRMSE
print ('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}\n\tLR1 = {2:.3f}\n' +
'\tLRGrid = {3:.3f}').format(rmseValBase, rmseValLR0, rmseValLR1, rmseValLRGrid)
|
17.0171700716
17.0175981807
23.8007746698
Validation RMSE:
Baseline = 21.586
LR0 = 19.192
LR1 = 19.691
LRGrid = 17.017
Вот, другое
дело. Теперь предсказания точнее на два
с лихуем года. А по сравнению с базовой
моделью, так вообще, 4+
Теперь проверим,
что будет при других значениях альфа
(шаг gradient descent)
reg = bestRegParam
modelRMSEs = []
for alpha in (1e-5, 10):
for numIters in (5, 500):
model = LinearRegressionWithSGD.train(parsedTrainData, numIters, alpha,
miniBatchFrac, regParam=reg,
regType='l2', intercept=True)
labelsAndPreds = parsedValData.map(lambda lp: (lp.label, model.predict(lp.features)))
rmseVal = calcRMSE(labelsAndPreds)
print 'alpha = {0:.0e}, numIters = {1}, RMSE = {2:.3f}'.format(alpha, numIters, rmseVal)
modelRMSEs.append(rmseVal)
|
alpha = 1e-05, numIters
= 5, RMSE = 56.970
alpha = 1e-05, numIters
= 500, RMSE = 56.893
alpha = 1e+01, numIters
= 5, RMSE = 355124752.221
alpha = 1e+01, numIters
= 500, RMSE =
33110728225678989137251839534941311488306376280786874812632607716020409015644103457295574688229365257796099669135380709376.000
Ну нихрена ж
себе! Видали, что будет если шаг слишком
большой?
5. Добавим в
модель производные фичи, попарно связав
исходные.
Now, we will add
features that capture the two-way interactions between our existing
features. Write a function twoWayInteractions that takes in a
LabeledPoint and generates a new LabeledPoint that contains the old
features and the two-way interactions between them. Note that a
dataset with three features would have nine ( 3^2 ) two-way
interactions
import itertools
def twoWayInteractions(lp):
"""Creates a new `LabeledPoint` that includes two-way interactions.
Note:
For features [x, y] the two-way interactions would be [x^2, x*y, y*x, y^2] and these
would be appended to the original [x, y] feature list.
Args:
lp (LabeledPoint): The label and features for this observation.
Returns:
LabeledPoint: The new `LabeledPoint` should have the same label as `lp`. Its features
should include the features from `lp` followed by the two-way interaction features.
"""
newfeatures = itertools.перемножить(lp.features, repeat=2)
newfeatures = [x*y for x,y in newfeatures]
label = lp.label
features = np.горизонтальный стек((lp.features, newfeatures))
res = LabeledPoint(label, features)
return res
print twoWayInteractions(LabeledPoint(0.0, [2, 3]))
# Transform the existing train, validation, and test sets to include two-way interactions.
trainDataInteract = parsedTrainData.для каждого lp(lambda x: twoWayInteractions(x))
valDataInteract = parsedValData.для каждого lp(lambda x: twoWayInteractions(x))
testDataInteract = parsedTestData.для каждого lp(lambda x: twoWayInteractions(x))
|
(0.0,[2.0,3.0,4.0,6.0,6.0,9.0])
Посмотрим, что
даст тренировка модели на нашем раздутом
датасете. Предварительный поиск
гиперпараметров был сделан за кадром.
numIters = 500
alpha = 1.0
miniBatchFrac = 1.0
reg = 1e-10
modelInteract = LinearRegressionWithSGD.train(trainDataInteract, numIters, alpha,
miniBatchFrac, regParam=reg,
regType='l2', intercept=True)
labelsAndPredsInteract = valDataInteract.map(lambda lp: (lp.label, modelInteract.predict(lp.features)))
rmseValInteract = calcRMSE(labelsAndPredsInteract)
print ('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}\n\tLR1 = {2:.3f}\n\tLRGrid = ' +
'{3:.3f}\n\tLRInteract = {4:.3f}').format(rmseValBase, rmseValLR0, rmseValLR1,
rmseValLRGrid, rmseValInteract)
|
Validation RMSE:
Baseline = 21.586
LR0 = 19.192
LR1 = 19.691
LRGrid = 17.017
LRInteract = 15.690
Опаньки,
отыграли еще два года точности, более
10%. Отлично!
Остается только
проверить последнюю, самую точную модель
на тестовом наборе данных
labelsAndPredsTest = testDataInteract.map(lambda x: (x.label, modelInteract.predict(x.features)))
rmseTestInteract = calcRMSE(labelsAndPredsTest)
print ('Test RMSE:\n\tBaseline = {0:.3f}\n\tLRInteract = {1:.3f}'
.format(rmseTestBase, rmseTestInteract))
|
Test RMSE:
Baseline = 22.137
LRInteract = 16.327
Видно, что
ошибка больше, чем на кросс-валидационном
наборе. Догадываетесь почему? Правильно,
кросс-валидационный набор был использован
в подборе гиперпараметров, что привело
к некоторому оверфиттингу для этого
набора.
Вот и вся третья
лабораторка. Энджой.
original post http://vasnake.blogspot.com/2015/10/week-3-lab.html