Data Types - RDD-Based API

MLlib支持存储在单机上的本地向量和矩阵,和由一个或者多个RDDs组成的分布式矩阵。
本地向量和矩阵是很简单的数据模型,用来当作公共接口。底层的线性代数操作是由Breeze提供。
在MLlib中,用于有监督学习的训练样本称为 labeled point 。

Local vector

本地向量具有从0开始的integer类型的索引以及double类型的值,并且保存在单台机器上。
MLlib支持两种类型的本地向量:稠密型(dense)和稀疏型(spase)。
稠密向量通过double类型的数组来表示它的输入值(entry values);而稀疏向量是通过两个并列的数组(indices和values)来表示。例如,一个向量 (1.0, 0.0, 3.0)用稠密的形式可以表示为[1.0, 0.0, 3.0],或者用稀疏的形式表示为(3, [0, 2], [1.0, 3.0]),这里3表示向量的大小。

MLlib把下列类型当作密集型向量:

  • NumPy’s array
  • Python’s list, e.g.,[1, 2, 3]
import numpy as np

dv1 = np.array([1.0, 0.0, 3.0])  # 使用NumPy数组作为稠密型向量
dv2 = [1.0, 0.0, 3.0]  # 使用Python列表作为稠密型向量

把下列类型当作稀疏型向量:

import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors

sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])  #构建稀疏向量
sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])), shape = (3, 1))  #用单列的SciPy csc_matrix作为稀疏向量

基于效率考虑,推荐使用NumPy数组而不是python本身的列表,并且使用在Vectors中实现的工厂方法来创建稀疏向量。

Refer to the Vectors Python docs for more details on the API.

Labeled point

labeled point 是带有标签/response的本地向量,既可以是稠密的,也可以是稀疏的。在MLlib中,labeled points被用在有监督学习算法中。我们用double类型存储标签,这样就可以在回归和分类任务中都使用labeled points。对于二元分类,标签应该是0 (negative) 或者 1 (positive)。对于多元分类,标签应该是从0开始的类别索引:0,1,2,…

A labeled point is represented by LabeledPoint.
Refer to the LabeledPoint Python docs for more details on the API.

from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])   #用正向的标签和稠密的特征向量创建labeled point
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0])) # 用负向的标签和稀疏的特征向量创建labeled point

Sparse data
实际上,我们的训练数据通常都是稀疏的。MLlib支持读取用LIBSVM格式存储的训练样本,这种格式是LIBSVMLIBLINEAR使用的默认格式。它是一种文本格式,每一行是一个带标签的稀疏特征向量,具体格式如下:

label index1:value1 index2:value2 …

这里索引是从1开始,并且是升序排列。数据经过加载之后,特征的索引被转化为从0开始。

MLUtils.loadLibSVMFile reads training examples stored in LIBSVM format.
Refer to the MLUtils Python docs for more details on the API.

from pyspark.mllib.util import MLUtils

examples = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

Local matrix

本地矩阵具有integer类型的行列索引,和double类型的值,存储在单台机器上。MLlib支持稠密型的矩阵,它的输入值以列顺序被存在一个单独的double类型的数组中。也支持稀疏矩阵,非零的输入值以列顺序用压缩稀疏列Compressed Sparse Column (CSC)的格式来存储。
比如,下列的稠密型矩阵
$$ \left( \begin{matrix} 1.0 & 2.0 \\ 3.0 & 4.0 \\ 5.0 & 6.0 \end{matrix} \right) $$
是用一维数组[1.0, 3.0, 5.0, 2.0, 4.0, 6.0]和矩阵大小(3, 2)存储的。

上面所说的一个单独的double类型的数组指的就是[1.0, 3.0, 5.0, 2.0, 4.0, 6.0],也就是说矩阵里的所有元素都是存在一个数组中的,值的类型为double类型。存储的方式是先存矩阵的第1列的元素,然后再存第2列,以此类推。

本地矩阵的基类是Matrix,我们提供了两种实现:DenseMatrixSparseMatrix。我们推荐使用在Matrices中实现的工厂方法来创建本地矩阵。注意,MLlib中的本地矩阵是以列顺序存储的。

Refer to the Matrix Python docs and Matrices Python docs for more details on the API.

from pyspark.mllib.linalg import Matrix, Matrices

# Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
dm2 = Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])

# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])

Distributed matrix

分布式矩阵有long类型的行列索引,和double类型的值,分散地存储在一个或者多个RDDs里。选择正确的格式来存储分布式的矩阵是很重要的。将分布式矩阵转换成另一种不同的格式可能需要进行全局的shuffle,这个代价是昂贵的。目前已经实现了4种分布式矩阵。

基本的类型是 RowMatrixRowMatrix 是面向行的分布式矩阵,没有有意义的行索引(没有行索引)。比如,特征向量的集合。它被它的行的RDD支持,这里每一行都是一个本地向量。我们假设RowMatrix的列的数量不是太大,一个单个的本地向量可以被合理地传递给 driver,也可以用一个单节点来存储和操作。IndexedRowMatrixRowMatrix 很相似,但是有行索引,行索引可以被用来定位行,以及进行 join 操作。CoordinateMatrix 是以 coordinate list (COO) 格式存储的分布式矩阵,由它的输入的RDD所支持。BlockMatrix是一个分布式矩阵,由MatrixBlock的RDD所支持,MatrixBlock是一个(Int, Int, Matrix)元组。

注意: 分布式矩阵的内部RDDs必须是确定性的,因为我们会缓存矩阵的大小。通常使用非确定性RDDS可能会导致错误。

RowMatrix

RowMatrix是一个面向行的分布式矩阵,没有行索引,由一个所有行组成的RDD支持,这里每一行都是一个本地向量。因为每一行都由一个本地向量所表示,列的数量只能在整数范围以内,实际上,列的数目应该更小。

我们可以用向量的RDD来创建RowMatrix

Refer to the RowMatrix Python docs for more details on the API.

from pyspark.mllib.linalg.distributed import RowMatrix

rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])   # 创建一个向量组成的RDD。

mat = RowMatrix(rows)   # 从向量组成的RDD创建一个RowMatrix

m = mat.numRows()  # 4   # 获得它们的大小
n = mat.numCols()  # 3

rowsRDD = mat.rows    # 取出所有的行再生成一个由向量组成的RDD

IndexedRowMatrix

IndexedRowMatrixRowMatrix 很相似,但是它有有意义的行索引。由于IndexedRowMatrix就是一个行含有索引的RDD,每一行都由它的索引(long类型)和本地向量所表示。

我们可以用由IndexedRows组成的RDD创建IndexedRowMatrix,这里IndexedRow是对(long, vector)的一个包装。IndexedRowMatrixcan可以通过丢掉它的行索引转变成RowMatrix

Refer to the IndexedRowMatrix Python docs for more details on the API.

CoordinateMatrix

CoordinateMatrix是分布式矩阵,内部是它的输入值的RDD。 它的每个输入都是一个元组(i: Long, j: Long, value: Double),这里i是行索引,j是列索引,值是输入值。只有当矩阵的两个维度都很大,并且矩阵是非常稀疏的情况下,我们才使用CoordinateMatrix

我们可以通过MatrixEntry输入格式的RDD来创建一个CoordinateMatrix ,这里MatrixEntry是对(long, long, float)的包装。 CoordinateMatrix通过调用toRowMatrix方法,可以被转化为 RowMatrix,或者调用 toIndexedRowMatrix方法转化成具有稀疏的行的IndexedRowMatrix。因为整个矩阵是稀疏的,所以行也是稀疏的。

Refer to the CoordinateMatrix Python docs for more details on the API.

BlockMatrix

BlockMatrix是分布式矩阵,内部是MatrixBlocks格式的RDD,这里MatrixBlock是((Int, Int), Matrix)形式的元组,(Int, Int)是block的索引, Matrix是在给定索引位置上大小为rowsPerBlock x colsPerBlock的子矩阵。BlockMatrix支持和另一个矩阵做像加法和乘法的方法。BlockMatrix也有一个帮助函数validate,它可以用来检查BlockMatrix是否被设置正确。

我们可以通过子矩阵块的RDD来创建一个BlockMatrix,这里子矩阵块是一个((blockRowIndex, blockColIndex), sub-matrix)元组。

Refer to the BlockMatrix Python docs for more details on the API.