В прошлый раз
я рассказал, что входило в лекции
по теории, третья неделя:
Linear Regression and
Distributed Machine Learning Principles
Теперь посмотрим
на лабораторку lab3
This
lab covers a common supervised learning pipeline, using a subset of
the Million
Song Dataset from
theUCI
Machine Learning Repository.
Our goal is to train a linear regression model to predict the release
year of a song given a set of audio features.
Lab 3: Millionsong
Regression Pipeline. Develop an end-to-end linear regression pipeline
to predict the release year of a song given a set of audio features.
You will implement a gradient descent solver for linear regression,
use Spark's machine Learning library ( mllib) to train additional
models, tune models via grid search, improve accuracy using quadratic
features, and visualize various intermediate results to build
intuition
Построим модель
(линейная регрессия, gradient descent), потренируем
ее и будем угадывать год выхода песни
по ее свойствам. Это в принципе. А в
кожухе, в лабе будет еще много всякого,
типа извлечения дополнительный фичей,
построения альтернативных моделей,
поиск и тюнинг гиперпараметров для
ridge regression и проч.
The
notebook can be found on GitHub at the following links:
ОК, скачиваем
нотебук, запускаем виртмашину
valik@snafu:~$ pushd ~/sparkvagrant/ valik@snafu:~/sparkvagrant$ vagrant up
И понеслась http://localhost:8001/
Part 1: Read and parse
the initial dataset
Visualization 1:
Features
Visualization 2:
Shifting labels
Part 2: Create and
evaluate a baseline model
Visualization 3:
Predicted vs. actual
Part 3: Train (via
gradient descent) and evaluate a linear regression model
Visualization 4:
Training error
Part 4: Train using
MLlib and tune hyperparameters via grid search
Visualization 5: Best
model's predictions
Visualization 6:
Hyperparameter heat map
Part 5: Add
interactions between features
Документация
к используемым API:
1. Загрузка
и парсинг датасета.
На входе у нас
текстовый файл, CSV, где в первом поле
указан год (label) и остальные поля содержат
свойства музыкального трека, в виде
чисел.
numPartitions = 2 rawData = sc.textFile(fileName, numPartitions) numPoints = rawData.count() print numPoints samplePoints = rawData.take(5) print samplePoints 6724 [u'2001.0,0.884123733793,0.610454259079,0.600498416968,0.474669212493,0.247232680947,0.357306088914,0.344136412234,0.339641227335,0.600858840135,0.425704689024,0.60491501652,0.419193351817', u'2001.0,0.854411946129,0.604124786151,0.593634078776,0.495885413963,0.266307830936,0.261472105188,0.506387076327,0.464453565511,0.665798573683,0.542968988766,0.58044428577,0.445219373624', ...
rawData это будет
RDD с сырыми данными. Почти 7 тыщ записей.
Распарсим
данные и создадим RDD содержащий
объекты LabeledPoint.
Сначала напишем
функцию конвертации строки из исходного
датасета в LabeledPoint
from pyspark.mllib.regression import LabeledPoint import numpy as np def parsePoint(line): """Converts a comma separated unicode string into a `LabeledPoint`. Args: line (unicode): Comma separated unicode string where the first element is the label and the remaining elements are features. Returns: LabeledPoint: The line is converted into a `LabeledPoint`, which consists of a label and features. """ label = подсказывать нельзя features = ??? res = LabeledPoint(label, features) return res parsedSamplePoints = [parsePoint(x) for x in samplePoints] firstPointFeatures = parsedSamplePoints[0].features firstPointLabel = parsedSamplePoints[0].label print firstPointFeatures, firstPointLabel |
Для осознания,
какие данные у нас в датасете, был
построен график:
Then we will look at
the raw features for 50 data points by generating a heatmap that
visualizes each feature on a grey-scale and shows the variation of
each feature across the 50 sample data points. The features are all
between 0 and 1, with values closer to 1 represented via darker
shades of grey.
Продолжаем
играться с данными. Найдем диапазон лет
(лабелей) для песен из датасета. Применим
функцию parsePoint для создания нового RDD,
выцепим оттуда labels и возьмем min, max:
parsedDataInit = rawData.map(<FILL IN>) onlyLabels = parsedDataInit.map(<FILL IN>) minYear = <FILL IN> maxYear = <FILL IN> print maxYear, minYear 2011.0 1922.0
Диапазон от
1922 года до 2011. Для обучения модели с этим
надо что-то сделать. Нормализовать,
чтобы минимальное значение было 0.
parsedData = parsedDataInit.<FILL IN> # Should be a LabeledPoint print type(parsedData.take(1)[0]) # View the first point print '\n{0}'.format(parsedData.take(1))
Блин, с этим договором не давать подсказок,
фак! Невозможно ничего показать.
В общем, тут
создается новый RDD (через map), где из
каждой лабели вычтено минимальное
значение.
Теперь наш
датасет надо разделить на сеты:
тренировочный, кросс-валидации и
тестовый. Для разделения используем
Датасеты
закешируем в оперативке, ибо обращаться
к ним мы будем постоянно.
weights = [.8, .1, .1] seed = 42 parsedTrainData, parsedValData, parsedTestData = parsedData.randomSplit(weights, seed) parsedTrainData.cache() parsedValData.cache() parsedTestData.cache() nTrain = parsedTrainData.count() nVal = parsedValData.count() nTest = parsedTestData.count() print nTrain, nVal, nTest, nTrain + nVal + nTest print parsedData.count()
5371 682 671 6724
6724
2. Создание
и оценка референсной модели.
Референсная
(baseline) модель, это модель самая простая,
нужная для того, чтобы иметь ориентир
хоть какой-то.
Самым простым
вариантом была признана константа,
равная среднему арифметическому
averageTrainYear = (parsedTrainData .map(lambda x: x.label) .mean() ) print averageTrainYear
53.9316700801
Напишем пару
функций для оценки модели методом
среднеквадратичной ошибки
def squaredError(label, prediction): """Calculates the the squared error for a single prediction. Args: label (float): The correct value for this observation. prediction (float): The predicted value for this observation. Returns: float: The difference between the `label` and `prediction` squared. """ import math res = math.pow(label - prediction, 2) return res def calcRMSE(labelsAndPreds): """Calculates the root mean squared error for an `RDD` of (label, prediction) tuples. Args: labelsAndPred (RDD of (float, float)): An `RDD` consisting of (label, prediction) tuples. Returns: float: The square root of the mean of the squared errors. """ res = (labelsAndPreds .map(lambda (x, y): квадрат ошибки) .reduce(lambda x, y: просуммировать) ) import math return math.sqrt(поделить сумму на количество) labelsAndPreds = sc.parallelize([(3., 1.), (1., 2.), (2., 2.)]) # RMSE = sqrt[((3-1)^2 + (1-2)^2 + (2-2)^2) / 3] = 1.291 exampleRMSE = calcRMSE(labelsAndPreds) print exampleRMSE |
1.29099444874
Посмотрим
теперь, какую ошибку в годах дает наша
базовая модель для песенного датасета
(трех: тренировочного, кросс-валидации
и тестового).
labelsAndPredsTrain = parsedTrainData.map(lambda x: (актуальная лабель и предсказание)) rmseTrainBase = calcRMSE(labelsAndPredsTrain) labelsAndPredsVal = parsedValData.map(lambda x: (лабель и предсказание)) rmseValBase = calcRMSE(labelsAndPredsVal) labelsAndPredsTest = parsedTestData.map(lambda x: (лабель и предсказание)) rmseTestBase = calcRMSE(labelsAndPredsTest) print 'Baseline Train RMSE = {0:.3f}'.format(rmseTrainBase) print 'Baseline Validation RMSE = {0:.3f}'.format(rmseValBase) print 'Baseline Test RMSE = {0:.3f}'.format(rmseTestBase)
Baseline Train RMSE = 21.306
Baseline Validation
RMSE = 21.586
Baseline Test RMSE =
22.137
3. Тренировка
и оценка самопальной модели линейной
регресии.
Размялись?
Теперь пора занятся настоящей моделью,
линейная регрессия обученная через
gradient descent.
Напишем функцию
вычисления апдейта для одной итерации
def gradientSummand(weights, lp): """Calculates the gradient summand for a given weight and `LabeledPoint`. Note: `DenseVector` behaves similarly to a `numpy.ndarray` and they can be used interchangably within this function. For example, they both implement the `dot` method. Args: weights (DenseVector): An array of model weights (betas). lp (LabeledPoint): The `LabeledPoint` for a single observation. Returns: DenseVector: An array of values the same length as `weights`. The gradient summand. """ w = транспонированные веса x = фичи лабельпойнта y = лабель лабельпойнта res = (w.dot(x) - y) * x return res exampleW = DenseVector([1, 1, 1]) exampleLP = LabeledPoint(2.0, [3, 1, 4]) # gradientSummand = (dot([1 1 1], [3 1 4]) - 2) * [3 1 4] = (8 - 2) * [3 1 4] = [18 6 24] summandOne = gradientSummand(exampleW, exampleLP) print summandOne |
[18.0,6.0,24.0]
Теперь нужна
функция вычисляющая предсказание. Нужно
просто умножить два вектора – веса и
фичи, dot product
def getLabeledPrediction(weights, observation): """Calculates predictions and returns a (label, prediction) tuple. Note: The labels should remain unchanged as we'll use this information to calculate prediction error later. Args: weights (np.ndarray): An array with one weight for each features in `trainData`. observation (LabeledPoint): A `LabeledPoint` that contain the correct label and the features for the data point. Returns: tuple: A (label, prediction) tuple. """ label = лабель наблюдения w = транспонированные веса x = фичи наблюдения prediction = веса дотпродакт фичи res = (label, prediction) return res weights = np.array([1.0, 1.5]) predictionExample = sc.parallelize([LabeledPoint(2, np.array([1.0, .5])), LabeledPoint(1.5, np.array([.5, .5]))]) labelsAndPredsExample = predictionExample.map(lambda lp: getLabeledPrediction(weights, lp)) print labelsAndPredsExample.collect() |
[(2.0, 1.75), (1.5,
1.25)]
Теперь самое
вкусное, gradient descent
def linregGradientDescent(trainData, numIters): """Calculates the weights and error for a linear regression model trained with gradient descent. Note: `DenseVector` behaves similarly to a `numpy.ndarray` and they can be used interchangably within this function. For example, they both implement the `dot` method. Args: trainData (RDD of LabeledPoint): The labeled data for use in training the model. numIters (int): The number of iterations of gradient descent to perform. Returns: (np.ndarray, np.ndarray): A tuple of (weights, training errors). Weights will be the final weights (one weight per feature) for the model, and training errors will contain an error (RMSE) for each iteration of the algorithm. """ # The length of the training data n = trainData.count() # The number of features in the training data d = len(trainData.take(1)[0].features) w = np.zeros(d) alpha = 1.0 # We will compute and store the training error after each iteration errorTrain = np.zeros(numIters) for i in range(numIters): # Use getLabeledPrediction from (3b) with trainData to obtain an RDD of (label, prediction) # tuples. Note that the weights all equal 0 for the first iteration, so the predictions will # have large errors to start. labelsAndPredsTrain = trainData.map getLabeledPrediction errorTrain[i] = calcRMSE(labelsAndPredsTrain) # Calculate the `gradient`. Make use of the `gradientSummand` function you wrote in (3a). # Note that `gradient` sould be a `DenseVector` of length `d`. gradient = trainData.map gradientSummand reduce просуммировать # Update the weights alpha_i = alpha / (n * np.sqrt(i+1)) w -= альфа помноженная на градиент return w, errorTrain # create a toy dataset with n = 10, d = 3, and then run 5 iterations of gradient descent # note: the resulting model will not be useful; the goal here is to verify that # linregGradientDescent is working properly exampleN = 10 exampleD = 3 exampleData = (sc .parallelize(parsedTrainData.take(exampleN)) .map(lambda lp: LabeledPoint(lp.label, lp.features[0:exampleD]))) print exampleData.take(2) exampleNumIters = 5 exampleWeights, exampleErrorTrain = linregGradientDescent(exampleData, exampleNumIters) print exampleWeights |
[LabeledPoint(79.0,
[0.884123733793,0.610454259079,0.600498416968]), LabeledPoint(79.0,
[0.854411946129,0.604124786151,0.593634078776])]
[ 48.88110449
36.01144093 30.25350092]
Теперь применим
эти функции для тренировки линрег
модели, получим веса. И проверим точность
модели на датасете кросс-валидации.
numIters = 50 weightsLR0, errorTrainLR0 = linregGradientDescent(распарсенные трен.данные, numIters) labelsAndPreds = parsedValData.map(lambda x: getLabeledPrediction(weightsLR0, x)) rmseValLR0 = calcRMSE(labelsAndPreds) print 'Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}'.format(rmseValBase, rmseValLR0)
Validation RMSE:
Baseline = 21.586
LR0 = 19.192
Фиговенькая
точность, хотя и лучше чем у базовой
модели.
4. Тренировка
моделей в/из MLlib, поиск гиперпараметров
через grid search.
Теперь
воспользуемся библиотечной функциональностью
для построения
модели линейной регрессии с регуляризацией
и доп.коэффициентом/смещением (intercept).
from pyspark.mllib.regression import LinearRegressionWithSGD # Values to use when training the linear regression model numIters = 500 # iterations alpha = 1.0 # step miniBatchFrac = 1.0 # miniBatchFraction reg = 1e-1 # regParam regType = 'l2' # regType useIntercept = True # intercept firstModel = LinearRegressionWithSGD.тренировать( распарсенные трен.данные, iterations=numIters, step=alpha, miniBatchFraction=miniBatchFrac, regParam=reg, regType=regType, intercept=useIntercept ) # weightsLR1 stores the model weights; interceptLR1 stores the model intercept weightsLR1 = firstModel.weights interceptLR1 = firstModel.intercept print weightsLR1, interceptLR1
[16.682292427,14.7439059559,-0.0935105608897,6.22080088829,4.01454261926,-3.30214858535,11.0403027232,2.67190962854,7.18925791279,4.46093254586,8.14950409475,2.75135810882]
13.3335907631
Воспользуемся
для предсказания библиотечной функцией
samplePoint = parsedTrainData.take(1)[0] samplePrediction = firstModel.предсказать(samplePoint.features) print samplePrediction
56.8013380112
Модель есть,
натренирована. Проверим, насколько она
ошибается.
labelsAndPreds = распарсенные валид.данные map(lambda x: (x.label, firstModel.предсказать(x.features))) rmseValLR1 = calcRMSE(labelsAndPreds) print ('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}' + '\n\tLR1 = {2:.3f}').format(rmseValBase, rmseValLR0, rmseValLR1)
Validation RMSE:
Baseline = 21.586
LR0 = 19.192
LR1 = 19.691
Забавно, ошибка
больше, чем у модели обученной на меньшем
количестве итераций. Очевидно,
регуляризация была подобрана неудачно.
Поэтому что?
Правильно, надо занятся подбором
гиперпараметров.
Методом Grid
Search подберем размер регуляризатора
bestRMSE = rmseValLR1 bestRegParam = reg bestModel = firstModel numIters = 500 alpha = 1.0 miniBatchFrac = 1.0 for reg in (1e-10, 1e-5, 1): model = LinearRegressionWithSGD.train(parsedTrainData, numIters, alpha, miniBatchFrac, regParam=reg, regType='l2', intercept=True) labelsAndPreds = parsedValData.map(lambda lp: (lp.label, model.predict(lp.features))) rmseValGrid = calcRMSE(labelsAndPreds) print rmseValGrid if rmseValGrid < bestRMSE: bestRMSE = rmseValGrid bestRegParam = reg bestModel = model rmseValLRGrid = bestRMSE print ('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}\n\tLR1 = {2:.3f}\n' + '\tLRGrid = {3:.3f}').format(rmseValBase, rmseValLR0, rmseValLR1, rmseValLRGrid) |
17.0171700716
17.0175981807
23.8007746698
Validation RMSE:
Baseline = 21.586
LR0 = 19.192
LR1 = 19.691
LRGrid = 17.017
Вот, другое
дело. Теперь предсказания точнее на два
с лихуем года. А по сравнению с базовой
моделью, так вообще, 4+
Теперь проверим,
что будет при других значениях альфа
(шаг gradient descent)
reg = bestRegParam modelRMSEs = [] for alpha in (1e-5, 10): for numIters in (5, 500): model = LinearRegressionWithSGD.train(parsedTrainData, numIters, alpha, miniBatchFrac, regParam=reg, regType='l2', intercept=True) labelsAndPreds = parsedValData.map(lambda lp: (lp.label, model.predict(lp.features))) rmseVal = calcRMSE(labelsAndPreds) print 'alpha = {0:.0e}, numIters = {1}, RMSE = {2:.3f}'.format(alpha, numIters, rmseVal) modelRMSEs.append(rmseVal) |
alpha = 1e-05, numIters
= 5, RMSE = 56.970
alpha = 1e-05, numIters
= 500, RMSE = 56.893
alpha = 1e+01, numIters
= 5, RMSE = 355124752.221
alpha = 1e+01, numIters
= 500, RMSE =
33110728225678989137251839534941311488306376280786874812632607716020409015644103457295574688229365257796099669135380709376.000
Ну нихрена ж
себе! Видали, что будет если шаг слишком
большой?
5. Добавим в
модель производные фичи, попарно связав
исходные.
Now, we will add
features that capture the two-way interactions between our existing
features. Write a function twoWayInteractions that takes in a
LabeledPoint and generates a new LabeledPoint that contains the old
features and the two-way interactions between them. Note that a
dataset with three features would have nine ( 3^2 ) two-way
interactions
import itertools def twoWayInteractions(lp): """Creates a new `LabeledPoint` that includes two-way interactions. Note: For features [x, y] the two-way interactions would be [x^2, x*y, y*x, y^2] and these would be appended to the original [x, y] feature list. Args: lp (LabeledPoint): The label and features for this observation. Returns: LabeledPoint: The new `LabeledPoint` should have the same label as `lp`. Its features should include the features from `lp` followed by the two-way interaction features. """ newfeatures = itertools.перемножить(lp.features, repeat=2) newfeatures = [x*y for x,y in newfeatures] label = lp.label features = np.горизонтальный стек((lp.features, newfeatures)) res = LabeledPoint(label, features) return res print twoWayInteractions(LabeledPoint(0.0, [2, 3])) # Transform the existing train, validation, and test sets to include two-way interactions. trainDataInteract = parsedTrainData.для каждого lp(lambda x: twoWayInteractions(x)) valDataInteract = parsedValData.для каждого lp(lambda x: twoWayInteractions(x)) testDataInteract = parsedTestData.для каждого lp(lambda x: twoWayInteractions(x)) |
(0.0,[2.0,3.0,4.0,6.0,6.0,9.0])
Посмотрим, что
даст тренировка модели на нашем раздутом
датасете. Предварительный поиск
гиперпараметров был сделан за кадром.
numIters = 500 alpha = 1.0 miniBatchFrac = 1.0 reg = 1e-10 modelInteract = LinearRegressionWithSGD.train(trainDataInteract, numIters, alpha, miniBatchFrac, regParam=reg, regType='l2', intercept=True) labelsAndPredsInteract = valDataInteract.map(lambda lp: (lp.label, modelInteract.predict(lp.features))) rmseValInteract = calcRMSE(labelsAndPredsInteract) print ('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}\n\tLR1 = {2:.3f}\n\tLRGrid = ' + '{3:.3f}\n\tLRInteract = {4:.3f}').format(rmseValBase, rmseValLR0, rmseValLR1, rmseValLRGrid, rmseValInteract) |
Validation RMSE:
Baseline = 21.586
LR0 = 19.192
LR1 = 19.691
LRGrid = 17.017
LRInteract = 15.690
Опаньки,
отыграли еще два года точности, более
10%. Отлично!
Остается только
проверить последнюю, самую точную модель
на тестовом наборе данных
labelsAndPredsTest = testDataInteract.map(lambda x: (x.label, modelInteract.predict(x.features))) rmseTestInteract = calcRMSE(labelsAndPredsTest) print ('Test RMSE:\n\tBaseline = {0:.3f}\n\tLRInteract = {1:.3f}' .format(rmseTestBase, rmseTestInteract)) |
Test RMSE:
Baseline = 22.137
LRInteract = 16.327
Видно, что
ошибка больше, чем на кросс-валидационном
наборе. Догадываетесь почему? Правильно,
кросс-валидационный набор был использован
в подборе гиперпараметров, что привело
к некоторому оверфиттингу для этого
набора.
Вот и вся третья
лабораторка. Энджой.
original post http://vasnake.blogspot.com/2015/10/week-3-lab.html
Комментариев нет:
Отправить комментарий