执行

1. 交互式

/path/to/spark/bin/pyspack

2. 批处理:

/path/to/spark/bin/spark-submit ~/spark-resource/spark-training/spark/examples/src/main/python/pi.py

 

Basic RDDs

element-wise transformations

  • map
  • filter
  • flatMap

pseudo set operations transformations

要求所操作的RDDs含有相同类型的元素。

  • distinct,expensive!
  • union,不去重
  • intersection,会去重,expensive!
  • subtract,expensive!
  • cartesion,expensive!

actions

  • reduce
  • fold
  • aggregate
  • collect,会将所有结果返回到driver,所以结果集需要能保存在一台服务器的内存里。可以作用于list返回list,dict返回keys list。
  • take
  • top
  • takeSample
  • foreach,针对每个element应用func,没有返回值
  • count
  • countByValue
  • takeOrdered
需要注意:不像Scala和Java,除了Base RDD,还有DoubleRDD之类,Python只有base rdd。所有的rdd 方法都作用于base rdd,但如果其内含的数据类型不正确,就直接挂掉了。

 persistence

  • persist = cache。对应的level是 pyspark.StorageLevel.MEMORY_ONLY_SER and so on
  • unpersist
  • is_cached,属性,不是方法

 Key/Value pairs = pair RDDs

注意,python spark里的pair RDD是指 list(tuples),而非直接处理dict。例如 [ (‘k1′, ‘v1′), (‘k2′, ‘v2′) ]。

create pair RDDs

  • pairs = lines.map(lambda line: (line.split(” “)[0], line)) ,从一个文本里生成,以第一个word为key
  • data = sc.parallelize( [ (‘k1′, ‘v1′), (‘k2′, ‘v2′) ] ),从in-memory的dict里parallelize生成
  • 直接从文件里读取(TODO

transformations on pair RDDs

  • 上面列举的,可以针对base RDDs的transformations都可以用在pair RDDs上面。Pair RDD每个item是一个tuple(key, val),所以t[0]是key,t[1]是value。
transformations on one pair RDD
  • reduceByKey
  • groupByKey
  • combineByKey, createCombiner是在每个partition首次遇到一个key时被调用的,所以在整个数据集上会多次调用
  • mapValues
  • flatMapValues
  • keys
  • values
  • sortByKey
  • foldByKey

transformations on two pair RDDs

  • subtractByKey
  • join, inner join, only keys that are present in both pair RDDs are output
  • rightOuterJoin
  • leftOuterJoin 
  • cogroup

 actions on pair RDDs

  • 上面列举的可以针对base RDD的actions,也都可以应用于pair RDD
  • countByKey
  • collectAsMap,可以作用于 list( tuple, tuple ), 返回 dict。 不可作用于dict、非tuple的list
  • lookup

针对pari RDDs join操作的调优

  • partitionBy,将大数据集partition到“固定”的服务器上 ,之后再与小数据集join的时候,就不用分发大数据集了。spark会将小数据集按照相同的方式分发过去。是一个transformation,第二个参数partitionFunc可以用来控制如何进行分片。
  • 对于pair RDDs,当不改变key时,尽量使用mapValues和flatMapValues,以保持partition(虽然也可以用map模拟这些操作,但spark不会分析map的func,所以就无法维持partition了)
  • partition会极大影响性能,等在实战中积累经验之后,再来补充(TODO

 文件读取

  • python的saveAsSequenceFile,可以处理的是 tuple的list,例如:sc.parallelize([ (‘k1′, ‘v1′), (‘k2′, ‘v2′) ]).saveAsSequenceFile(‘…’)。如果处理dict、非tuple的list,则会报错:RDD element of type java.lang.String cannot be used。另外,如果传入的keyClass、valueClass与类型不匹配,则会默认被当做string处理。
Output a Python RDD of key-value pairs (of form RDD[(K, V)]) to any Hadoop file system
……
  1. A Java RDD is created from the SequenceFile or other InputFormat, and the key and value Writable classes
  2. Serialization is attempted via Pyrolite pickling
  3. If this fails, the fallback is to call ‘toString’ on each key and value
  4. PickleSerializer is used to deserialize pickled objects on the Python side
  •  saveAsNewAPIHadoopFile,也是处理list(tuples)。可以用于protobuf格式的数据。

shared variables

  • 可以broadcast list、dict等类型的数据,用b_var.value读取其值
  • action阶段的accumulator 可以保证fault tolerant时只计算一次,transformation阶段的不保证

spark transformations:

  • map(func) , and func is:  mixed func(x)

example: distData.map(lambda x: (x, 1))  是用python的lambda构造了一个匿名函数,接受map的每一行作为输入x,并返回(x, 1)。所以,这条语句的作用是,针对每一行x,转换为(x, 1)。

  • filter(func), and func is: bool func(x)

example:  f.filter(lambda x: x.find(“spark”) != -1) 是在f对应的RDD里,找包含spark这个单词的行。

  • flatMap(func), and func is: Seq func(x)

example:  f.flatMap(lambda x: x.split(” “)).collect() 是用空格符分隔输入的每一行,这样一个flatMap输入行会对应到多个输出行,所以返回的是Seq。注意,以上语句的output每一行是一个单词,如果改为f.map(lambda x: x.split(” “)).collect() 则output每一行是一个seq(word)。

  • sample(withReplacement, fraction, seed) 该method接受3个参数,分别是bool、float和int型。

example: f.flatMap(lambda x: x.split(” “)).sample(withReplacement = True, fraction = 0.01, seed = 181341).collect()

  • union(otherDataSet), 貌似otherDataSet也需要是一个RDD,而不能是一个普通的array

example:

new_edges = tc.join(edges).map(lambda (_, (a, b)): (b, a))
tc = tc.union(new_edges).distinct().cache()

  • distinct([numTasks]), 去重

example: f.flatMap(lambda x: x.split(” “)).distinct()

  • groupByKey([numTasks]),类似sql的group by,将(k, v) => (k, seq(v))
  • reduceByKey(func, [numTasks]),类似MR的reduce,针对相同key的所有val,循环调用func方法

example:

contribs.reduceByKey(add) // 直接用python add作为func使用了

f.reduceByKey(lambda x, _: x) // 生成了一个匿名函数,作为func使用

pointStats = closest.reduceByKey(lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2)) // 生成了一个匿名函数,作为func使用

  • sortByKey([ascending], [numTasks])
  • join(otherDataSet, [numTasks])
  • cogroup(otherDataSet, [numTasks])
  • cartesian(otherDataSet),类似php的array_combine

Spark Actions:

  • reduce(func), func is: mixed  func(mixed first, mixed second)
  • collect()
  • count()
  • first(), 等同于调用take(1)
  • take(n),注意:当前非并行处理,而是由driver program自行计算的
  • takeSample(withReplacement, fraction, seed)
  • saveAsTextFile(path),path是directory路径,针对每一个element会调用toString转化为字符串进行写入
  • saveAsSequenceFile(path),path是directory路径,Only available on RDDs of key-value pairs that either implement Hadoop’s Writable interface or are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
  • countByKey()
  • foreach(func)

pyspark package

  • parallelize(c, numSlices=None),numSlices控制启动几个tasks并行计算

example:count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add)

Python help

>>> help (‘operator’)

>>> help(‘operator.add’)

 

Leave a Reply