Курс Scalable
Machine Learning. Hadoop, Apache Spark, Python, ML -- вот это
конспектировать пройденный курс. Неделя
В прошлый
раз начали делать лабораторку №4, но
не закончили.
Part 3: Parse CTR
data and generate OHE features
Visualization 1:
Feature frequency
работа с датасетом от Criteo.
Загрузка сырых
данных в RDD
if os.path.isfile(fileName): rawData = (sc .textFile(fileName, 2) .map(lambda x: x.replace('\t', ','))) # work with either ',' or '\t' separated data print rawData.take(1) |
Много фич,
некоторые пустые, есть числа, есть
странный текст, вроде хешей каких-то.
Но это пока не важно. Мы тут занимаемся
экстрагированием фич.
(3a) Loading and
splitting the data
датасет на три части (3 RDD): тренировочный,
тестовый и кросс-валидационный наборы.
И закешируем их в оперативке.
weights = [.8, .1, .1] seed = 42 rawTrainData, rawValidationData, rawTestData = rawData.randomSplit(weights, seed) rawTrainData.cache() rawValidationData.cache() rawTestData.cache() nTrain = rawTrainData.count() nVal = rawValidationData.count() nTest = rawTestData.count() print nTrain, nVal, nTest, nTrain + nVal + nTest |
79911 10075 10014
Всего 100
(3b) Extract features
каждую строку датасета в список туплей.
Первый элемент тупля это порядковый
номер фичи, второй элемент – значение
Итоговый RDD
будет списком списков.
def parsePoint(point):
"""Converts a comma separated string into a list of (featureID, value) tuples.
featureIDs should start at 0 and increase to the number of features - 1.
point (str): A comma separated string where the first value is the label and the rest
are features.
list: A list of (featureID, value) tuples.
res = []
rowFeats = разбить по запятым и обрубить пробелы
for idx, x in enumerate(rowFeats):
пропустить первую фичу, это label
res.append((idx-1, x))
return res
parsedTrainFeat = rawTrainData.map(parsePoint)
numCategories = (parsedTrainFeat
.flatMap(lambda x: x)
.map(lambda x: (x[0], 1))
.reduceByKey(lambda x, y: x + y)
numCategories это
количества уникальных значений для
каждой фичи
[(0, 144), (1, 2467),
(2, 855), (3, 129), (4, 20311), (5, 1890), (6, 567), (7, 142), (8,
1796), (9, 8), (10, 81), (11, 62), (12, 252), (13, 471), (14, 492),
(15, 36044), (16, 21331), (17, 131), (18, 12), (19, 7221), (20, 233),
(21, 3), (22, 9905), (23, 3678), (24, 33988), (25, 2741), (26, 25),
(27, 4844), (28, 28762), (29, 10), (30, 2379), (31, 1223), (32, 4),
(33, 31887), (34, 11), (35, 14), (36, 10799), (37, 49), (38, 8325)]
(3c) Create an OHE
dictionary from the dataset
Это мы уже
проходили на игрушечном датасете.
ctrOHEDict = createOneHotDict(parsedTrainFeat) numCtrOHEFeats = len(ctrOHEDict.keys()) print numCtrOHEFeats
(3d) Apply OHE to the
Теперь используем
OHE словарь для создания нового датасета
из OHE фичей (кстати, можно было энкодить
не все фичи, некоторые уже были числовыми).
Подобную работу мы делали для игрушечного
def parseOHEPoint(point, OHEDict, numOHEFeats): """Obtain the label and feature vector for this raw observation. Note: You must use the function `oneHotEncoding` in this implementation or later portions of this lab may not function as expected. e.g. oneHotEncoding([(1, 'black'), (0, 'mouse')], sampleOHEDictManual, numSampleOHEFeats) Args: point (str): A comma separated string where the first value is the label and the rest are features. OHEDict (dict of (int, str) to int): Mapping of (featureID, value) to unique integer. numOHEFeats (int): The number of unique features in the training dataset. Returns: LabeledPoint: Contains the label for the observation and the one-hot-encoding of the raw features based on the provided OHE dictionary. """ features = [] rowFeats = разбить по запятым и обрубить пробелы for idx, x in enumerate(rowFeats): первая фича это лабель: label = х continue features.append((индекс, x)) oheFeats = закодировать фичи в OHE res = LabeledPoint(label, oheFeats) return res OHETrainData = rawTrainData.map(lambda point: parseOHEPoint(point, ctrOHEDict, numCtrOHEFeats)) OHETrainData.cache() print OHETrainData.take(1) |
Видно как
устроена OHE запись нового датасета:
лабель, длина вектора и SparseVector из индексов
и значений по этим индексам.
А визуализацию
мы пропустим, она малоинтересна.
(3e) Handling unseen
В функции
oneHotEncoding надо обратить внимание на
ситуацию, когда данные фичи пропущены.
В таком случае пропуск игнорируется (в
SparseVector остается нолик)
def oneHotEncoding(rawFeats, OHEDict, numOHEFeats): ??? key = OHEDict.get(x, None) if key is not None: ??? |
Part 4: CTR
prediction and logloss evaluation
Visualization 2: ROC
(4a) Logistic
Ну вот, добрались
до создания и тренировки модели.
from pyspark.mllib.classification import LogisticRegressionWithSGD # fixed hyperparameters numIters = 50 stepSize = 10. regParam = 1e-6 regType = 'l2' includeIntercept = True model0 = LogisticRegressionWithSGD.train( OHETrainData, iterations=итераций, step=размер шага, regParam=регуляризация, regType=тип регуляризации, intercept=использовать смещение ) sortedWeights = sorted(model0.weights) print sortedWeights[:5], model0.intercept |
-0.37973707648623972, -0.3699655826675331, -0.36934962879928285,
-0.32697945415010637] 0.56455084025
Как мы помним,
сумма произведений коэффициентов
(весов) на значения фич дает нам число,
засунув которое в функцию сигмоид, мы
получаем вероятность клика.
(4b) Log loss
we will use log loss to evaluate the quality of models
from math import log def computeLogLoss(p, y): """Calculates the value of log loss for a given probabilty and label. Note: log(0) is undefined, so when p is 0 we need to add a small value (epsilon) to it and when p is 1 we need to subtract a small value (epsilon) from it. Args: p (float): A probabilty between 0 and 1. y (int): A label. Takes on the values 0 and 1. Returns: float: The log loss value. """ epsilon = 10e-12 p = max(epsilon, p) p = min(1-epsilon, p) if y == 1: res = log(p) else: res = log(1 - p) return -1.0 * res print computeLogLoss(.5, 1) print computeLogLoss(.5, 0) print computeLogLoss(.99, 1) print computeLogLoss(.99, 0) print computeLogLoss(.01, 1) print computeLogLoss(.01, 0) print computeLogLoss(0, 1) print computeLogLoss(1, 1) print computeLogLoss(1, 0) |
Видно, как
штраф увеличивается при удалении от
правильного ответа.
(4c) Baseline log loss
Чтобы оценивать
модели, надо иметь точку опоры. Базовая
модель – дает вероятность клика равную
среднему значению лабелей.
classOneFracTrain = (OHETrainData .map(lambda x: лабель записи) .reduce(lambda x, y: сумма лабелей) ) / OHETrainData.количество записей print classOneFracTrain logLossTrBase = (OHETrainData .map(lambda x: computeLogLoss(найденная константа, лабель записи)) .reduce(lambda x, y: сумма штрафа) ) / OHETrainData.количество записей print 'Baseline Train Logloss = {0:.3f}\n'.format(logLossTrBase) |
Baseline Train Logloss
= 0.536
Точность модели
на уровне 50% – либо мы встретим динозавра,
либо нет.
(4d) Predicted
Напишем функцию,
выдающую предсказание по посчитанной
from math import exp # exp(-t) = e^-t import math def getP(x, w, intercept): """Calculate the probability for an observation given a set of weights and intercept. Note: We'll bound our raw prediction between 20 and -20 for numerical purposes. Args: x (SparseVector): A vector with values of 1.0 for features that exist in this observation and 0.0 otherwise. w (DenseVector): A vector of weights (betas) for the model. intercept (float): The model's intercept. Returns: float: A probability between 0 and 1. """ rawPrediction = интерсепт + фичи . веса # Bound the raw prediction value rawPrediction = min(rawPrediction, 20) rawPrediction = max(rawPrediction, -20) return math.pow(1.0 + exp(-1 * rawPrediction), -1) trainingPredictions = (OHETrainData .map(lambda x: getP(фичи, веса, интерсепт)) ) print trainingPredictions.take(5) |
0.10362661997434075, 0.283634247838756, 0.17846102057880114,
(4e) Evaluate the model
Оценим точность
модели и сравним с базовой
def evaluateResults(model, data): """Calculates the log loss for the data given the model. Args: model (LogisticRegressionModel): A trained logistic regression model. data (RDD of LabeledPoint): Labels and features for each observation. Returns: float: Log loss for the data. """ res = (data .map(lambda x: computeLogLoss(getP(???), лабель)) .reduce(lambda x, y: x + y) ) / data.count() return res logLossTrLR0 = evaluateResults(model0, OHETrainData) print ('OHE Features Train Logloss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}' .format(logLossTrBase, logLossTrLR0)) |
OHE Features Train
Baseline = 0.536
LogReg = 0.457
Неплохо для
(4f) Validation log
Теперь заценим
результат на валидационном датасете
logLossValBase = (OHEValidationData .map(lambda x: computeLogLoss(classOneFracTrain, x.label)) .reduce(lambda x, y: x + y) ) / OHEValidationData.count() logLossValLR0 = evaluateResults(model0, OHEValidationData) print ('OHE Features Validation Logloss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}' .format(logLossValBase, logLossValLR0)) |
OHE Features Validation
Baseline = 0.528
LogReg = 0.457
Лучше, чем у
базовой модели.
Visualization 2: ROC
We will now visualize
how well the model predicts our target. To do this we generate a plot
of the ROC curve. The ROC curve shows us the trade-off between the
false positive rate and true positive rate, as we liberalize the
threshold required to predict a positive outcome. A random model is
represented by the dashed line.
Пожалуйста, выбирайте пороговое значение какое устраивает.
Part 5: Reduce
feature dimension via feature hashing
Visualization 3:
Hyperparameter heat map
И теперь
быстренько пробежимся по методу
хеширования фич.
(5a) Hash function
from collections import defaultdict import hashlib def hashFunction(numBuckets, rawFeats, printMapping=False): """Calculate a feature dictionary for an observation's features based on hashing. Note: Use printMapping=True for debug purposes and to better understand how the hashing works. Args: numBuckets (int): Number of buckets to use as features. rawFeats (list of (int, str)): A list of features for an observation. Represented as (featureID, value) tuples. printMapping (bool, optional): If true, the mappings of featureString to index will be printed. Returns: dict of int to float: The keys will be integers which represent the buckets that the features have been hashed to. The value for a given key will contain the count of the (featureID, value) tuples that have hashed to that key. """ mapping = {} for ind, category in rawFeats: featureString = category + str(ind) mapping[featureString] = int(int(hashlib.md5(featureString).hexdigest(), 16) % numBuckets) if(printMapping): print mapping sparseFeatures = defaultdict(float) for bucket in mapping.values(): sparseFeatures[bucket] += 1.0 return dict(sparseFeatures) # Reminder of the sample values: # sampleOne = [(0, 'mouse'), (1, 'black')] # sampleTwo = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')] # sampleThree = [(0, 'bear'), (1, 'black'), (2, 'salmon')] |
sampOneFourBuckets = hashFunction(4, sampleOne, True) sampTwoFourBuckets = hashFunction(4, sampleTwo, True) sampThreeFourBuckets = hashFunction(4, sampleThree, True) # Use one hundred buckets sampOneHundredBuckets = hashFunction(100, sampleOne, True) sampTwoHundredBuckets = hashFunction(100, sampleTwo, True) sampThreeHundredBuckets = hashFunction(100, sampleThree, True) print '\t\t 4 Buckets \t\t\t 100 Buckets' print 'SampleOne:\t {0}\t\t {1}'.format(sampOneFourBuckets, sampOneHundredBuckets) print 'SampleTwo:\t {0}\t\t {1}'.format(sampTwoFourBuckets, sampTwoHundredBuckets) print 'SampleThree:\t {0}\t {1}'.format(sampThreeFourBuckets, sampThreeHundredBuckets) |
{'black1': 2, 'mouse0':
{'cat0': 0, 'tabby1':
0, 'mouse2': 2}
{'bear0': 0, 'black1':
2, 'salmon2': 1}
{'black1': 14,
'mouse0': 31}
{'cat0': 40, 'tabby1':
16, 'mouse2': 62}
{'bear0': 72, 'black1':
14, 'salmon2': 5}
4 Buckets 100
SampleOne: {2: 1.0, 3:
1.0} {14: 1.0, 31: 1.0}
SampleTwo: {0: 2.0, 2:
1.0} {40: 1.0, 16: 1.0, 62: 1.0}
SampleThree: {0: 1.0,
1: 1.0, 2: 1.0} {72: 1.0, 5: 1.0, 14: 1.0}
На игрушечном
примере показано, как работает хеш-функция.
выдает нам параметр для SparseVector.
(5b) Creating hashed
хеш-функцию для подготовки датасетов
под задачу.
def parseHashPoint(point, numBuckets): """Create a LabeledPoint for this observation using hashing. Args: point (str): A comma separated string where the first value is the label and the rest are features. numBuckets: The number of buckets to hash to. Returns: LabeledPoint: A LabeledPoint with a label (0.0 or 1.0) and a SparseVector of hashed features. """ rowFeats = строку разбить по запятым и обрубить пробелы features = [] # list of tuples like [(0, 'mouse'), (1, 'black')] label = None for idx, x in enumerate(rowFeats): для первой фичи label = x continue features.append((индекс с 0, x)) dictHashed = hashFunction(количество корзин, фичи, False) hashedFeats = SparseVector(количество корзин, результат хеширования) res = LabeledPoint(label, hashedFeats) return res numBucketsCTR = 2 ** 15 hashTrainData = rawTrainData.map(lambda x: parseHashPoint(x, numBucketsCTR)) hashTrainData.cache() hashValidationData = rawValidationData.map ... hashValidationData.cache() hashTestData = rawTestData.map ... hashTestData.cache() print hashTrainData.take(1) |
Лабель и
SparseVector составляют запись датасета.
(5c) Sparsity
разреженности данных. Кому это интересно?
Пропустим, выдав сразу количественную
Average OHE Sparsity:
Average Hash Sparsity:
(5d) Logistic model
with hashed features
Будем тренировать
логистическую регрессию, подбирая
гиперпараметры через Grid Search.
numIters = 500 regType = 'l2' includeIntercept = True # Initialize variables using values from initial model training bestModel = None bestLogLoss = 1e10 stepSizes = (1, 10) regParams = (1e-6, 1e-3) for stepSize in stepSizes: for regParam in regParams: model = (LogisticRegressionWithSGD .train(hashTrainData, numIters, stepSize, regParam=regParam, regType=regType, intercept=includeIntercept)) logLossVa = evaluateResults(model, hashValidationData) print ('\tstepSize = {0:.1f}, regParam = {1:.0e}: logloss = {2:.3f}' .format(stepSize, regParam, logLossVa)) if (logLossVa < bestLogLoss): bestModel = model bestLogLoss = logLossVa print ('Hashed Features Validation Logloss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}' .format(logLossValBase, bestLogLoss)) |
stepSize = 1.0,
regParam = 1e-06: logloss = 0.470
stepSize = 1.0,
regParam = 1e-03: logloss = 0.470
stepSize = 10.0,
regParam = 1e-06: logloss = 0.448
stepSize = 10.0,
regParam = 1e-03: logloss = 0.450
Hashed Features
Validation Logloss:
Baseline = 0.528
LogReg = 0.448
Visualization 3:
Hyperparameter heat map
We will now perform a
visualization of an extensive hyperparameter search. Specifically, we
will create a heat map where the brighter colors correspond to lower
values of logLoss
(5e) Evaluate on the
test set
Проверим нашу
лучшую модель на тестовом наборе данных.
logLossTest = evaluateResults(bestModel, hashTestData) # Log loss for the baseline model logLossTestBaseline = (hashTestData .map(lambda x: computeLogLoss(classOneFracTrain, x.label)) .sum() ) / hashTestData.count() print ('Hashed Features Test Log Loss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}' .format(logLossTestBaseline, logLossTest)) |
Hashed Features Test
Log Loss:
Baseline = 0.537
LogReg = 0.456
Вот, четвертая
лабораторка закончена.
Мы трансформировали
датасеты, применяя технику Features
Extraction, используя кодирование
One-hot-encoding and Hashing.
Обучали модели
логистической регрессии, оценивали их
точность и делали предсказания.
Идея, в целом,
В следующий
раз будут материалы пятой недели курса.
WEEK 5: Principal
Component Analysis and Neuroimaging.
Topics: Introduction
to neuroscience and neuroimaging data, exploratory data analysis,
principal component analysis (PCA) formulations and solution,
distributed PCA
original post http://vasnake.blogspot.com/2015/11/week-4-lab-4-part-3.html
