Archive for 六月, 2015

spark的调度分为资源的调度和任务的调度。前者目前支持standalone、yarn、mesos等,后者对标hadoop的MapReduce。本文介绍资源调度的流程,能力所限仅cover到最简单的standalone,无法对比各种调度框架的优劣。

Spark中与资源调度相关的角色包括driver、master、worker和executor,其中executor更多是与任务调度有关。其交互关系如下图所示,主要交互流如下:

  • driver->master:提交Application,并告知需要多少资源(cores)
  • master->driver:告知Application提交成功(我们只考虑最理想的情况)
  • driver->workers:发送appDescription,启动executor子进程
  • executor->driver:注册executor
  • driver->executor:提交Tasks,由后者执行任务

任务提交流程

上图的虚线代表Actor交互,spark基于Actor模式的Akka包进行进程、线程、网络间的交互,以避免同步、死锁等问题,也使代码相对简单。可以注意到,worker主进程是不跟driver交互的,只有executor子进程才跟driver交互。

Driver & Master

在SparkContext初始化的时候,在SparkContext class的一个大try catch中,就会完成Application注册,在standalone mode下,主要做了以下事情:

  • 启动心跳维持监听actor,因为executors需要同driver维持心跳
  • 启动Job进度监听actor,在UI和LOG里需要能够看到进度
  • 启动mapOutputTracker,因为作为reducer的executors会向driver询问map output的地址
  • 初始化task pool,确定FIFO or FAIR的调度方式
  • 根据masters uri的格式,确定schedulerBackend和taskScheduler的取值,其中schedulerBackend与executor端的executorBackend交互
  • 由AppClient/ClientActor代理,同masters交互,确定可用master,由后者代理获取并启动workers上的可用executors资源

SparkContext.createTaskScheduler方法会确定backend和scheduler的取值,并调用_taskScheduler.initialize(),初始化pool并确定schedulableBuilder:FIFO、FAIR。

masterschedulerBackendtaskScheduler
localLocalBackendTaskSchedulerImpl
local\[([0-9]+|\*)\]LocalBackendTaskSchedulerImpl
local\[([0-9]+|\*)\s*,\s*([0-9]+)\]LocalBackendTaskSchedulerImpl
spark://(.*)SparkDeploySchedulerBackendTaskSchedulerImpl
local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]SparkDeploySchedulerBackendTaskSchedulerImpl
"yarn-standalone" | "yarn-cluster"org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackendorg.apache.spark.scheduler.cluster.YarnClusterScheduler
"yarn-client"org.apache.spark.scheduler.cluster.YarnClientSchedulerBackendorg.apache.spark.scheduler.cluster.YarnScheduler
mesosUrl @ MESOS_REGEX(_)CoarseMesosSchedulerBackend or MesosSchedulerBackendTaskSchedulerImpl
simr://(.*)SimrSchedulerBackendTaskSchedulerImpl
确定两者取值之后,会立即触发与master的交互:

 XML |  copy code |? 
1
 // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
2
// constructor
3
_taskScheduler.start()   //  以standalone模式为例,间接调用 _schedulerBackend.start(),并阻塞等待初始化AppClient完成(即接收到master返回的RegisteredApplication消息)

AppClient就是一个wrapper,初始化了:

     

 Scala |  copy code |? 
1
actor = actorSystem.actorOf(Props(new ClientActor))

ClientActor代理与master的交互,在preStart()里向每一个master uri发送RegisterApplication消息,并注册了接收处理master发送消息的多个处理方法。

Worker & Driver

deploy/worker/Worker.scala 是worker主进程,以actor方式接收master发送的消息,例如LaunchExecutor。在该消息处理方法中,通过一个子线程启动真正的executor子进程。
deploy/worker/ExecutorRunner.scala -> fetchAndRunExecutor()是启动子进程的地方。
executor/CoarseGrainedExecutorBackend.scala -> onStart() 里会向driver发送RegisterExecutor消息,由driver的schedulerBackend接收并处理。在该消息里提交了executor的相关信息,driver会将executor信息存储在executorDataMap对象里,并触发makeOffers方法,分配pending的tasks。

Driver端的消息处理方法如下:

 Scala |  copy code |? 
01
    case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
02
        Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
03
        if (executorDataMap.contains(executorId)) {
04
          context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
05
        } else {
06
          logInfo("Registered executor: " + executorRef + " with ID " + executorId)
07
          context.reply(RegisteredExecutor)
08
          addressToExecutorId(executorRef.address) = executorId
09
          totalCoreCount.addAndGet(cores)
10
          totalRegisteredExecutors.addAndGet(1)
11
          val (host, _) = Utils.parseHostPort(hostPort)
12
          val data = new ExecutorData(executorRef, executorRef.address, host, cores, cores, logUrls)
13
          // This must be synchronized because variables mutated
14
          // in this block are read when requesting executors
15
          CoarseGrainedSchedulerBackend.this.synchronized {
16
            executorDataMap.put(executorId, data)
17
            if (numPendingExecutors > 0) {
18
              numPendingExecutors −= 1
19
              logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
20
            }
21
          }
22
          listenerBus.post(  // 通过bus,向所有监听SparkListenerExecutorAdded的线程发送通知,这里好像没人关注。
23
            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data)) 
24
          makeOffers()  // Make fake resource offers on all executors ,
25
        }

其中makerOffers方法会分配tasks,是把资源调度和Task调度融合的地方,调用该方法的地方还有:

  •      case StatusUpdate
  •      case ReviveOffers    submitTasks以及任务重试等时机时,会调用
  •      case RegisterExecutor   即一旦有executor注册,就看看有没有需要分配的任务
该方法调用scheduler.resourceOffers ,每次尽量调度更多的tasks,主要考虑的是:
  • 本地化 recomputeLocality,PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  • 负载均衡 round robin
  • 任务的重跑
  • 任务 + executorId的黑名单
  • 各个状态的任务列表

其中调用的scheduler.resourceOffers关键代码如下,针对每个taskSet,尽量多的调度task,在分配时,尽量分配locality的task:

 Scala |  copy code |? 
01
    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
02
    // of locality levels so that it gets a chance to launch local tasks on all of them.
03
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
04
    var launchedTask = false
05
    for (taskSet <− sortedTaskSets; maxLocality <− taskSet.myLocalityLevels) {
06
      do {
07
        launchedTask = resourceOfferSingleTaskSet(
08
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
09
      } while (launchedTask)
10
    }

在scheduler.resourceOffers子方法中,也会告知driver,有task状态的改变,可用通过listenerBus告知监听者:

 Scala |  copy code |? 
1
          sched.dagScheduler.taskStarted(task, info) 
2
               eventProcessLoop.post(BeginEvent(task, taskInfo))

在makeOffers的最后,调用launchTasks,这时才是真正发起任务。workder端的CoarseGrainedExecutorBackend.scala接收到消息后,会继续执行。

 Scala |  copy code |? 
1
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

     总结:
  • Worker主进程启动executor子进程后(独立JVM),后者通过actor方式向driver注册自己
  • driver接收到RegisterExecutor消息,会检查有没有pending的tasks,并且task的localityLevel与executorId匹配
  • 如果有合适的tasks(可能多个),则会再通过actor方式发送LaunchTask消息给executor,由后者真正执行任务

问题:

  • driver、master、worker里有哪些相关的子线程、子进程?
    • driver里至少有工作主进程、通过ClientActor与master通信的子线程们、通过SchedulerBackend与executors通信的子线程们、关注各种进度的listenerBus相关子线程、通过MapOutputTrackerMasterEndpoint管理mapOutput位置的子线程们、心跳维持的子线程,并且应该还有block、http等
    • standalone的master采用actor子线程维护可用workers列表,包括资源使用情况;并且接收driver发起的app注册请求等。如果是使用zk进行master选主,还得维持与zk之间的连接。另外,还有UI界面。
    • worker主进程需要维持与master之间的心跳,汇报资源使用情况。executors子进程需要同driver间维持心跳。主进程通过子线程启动executor子进程。另外还有block、shuffle output(不确定是否直接复用了block线程or进程)等。
  • 模块、进程、线程间如何通信的?数据结构体如何?
    • scala程序,网络、进程、线程间主要通过Actor方式进行交互,资源调度主要走这条流
    • python通过py4j调用java代理的scala程序
    • shuffle output等通过netty交互数据
    • broadcast通过p2p交互数据
  • master怎么知道有哪些可用worker的?
    • worker启动时,向master发送RegisterWorker消息,由Master类通过Actor方式处理
  • 任务调度的算法是什么?有没有优缺点?trait SchedulerBuilder , FairSchedulingAlgorithm/FIFOSchedulingAlgorithm
    • FIFO:
    • Fair:
  • 资源调度的依据,对可用executor 随机化,locality的原则,每个worker有cores限制,当前每个task只能占用1个core
  • jar、files、library是这时发送给executor的吗?driver还是master发送的呢?
    • driver提交给master的是appDescription,其中包含的command只是指定了executorBackend类,未涉及app具体的代码
    • driver在executor注册后,向其发送的launchTask消息才包含真正的task信息(files、jars名称),由TaskRunner中反序列化并拉取文件
  • standalone、yarn、mesos切换对driver、master、worker都有什么影响呢?使用到哪些类?
    • 目前只能看出有不同的backend类,实现功能,没有比较优劣。
  • 同一worker上的资源隔离做到哪一步?JVM可以控制memory,CPU、IO、DISK呢?
    • 我们当前的配置,一台worker上只能有一个executor,后者是在driver->master->Worker时生成的
    • 每个executor在调度时,可能会被分配多个tasks,只要availableCpus(i) >= CPUS_PER_TASK
    • executorBackend在接收到launchTask请求时,会生成TaskRunner对象,并由threadpool调度执行,即如果有多个tasks到达,就会多线程并发执行了
    • 这些属于同一个executor的tasks共享一个JVM,所以共享executor.memory等限制
  • JVM、python进程复用 是如何做到的?
  • standalone cluster manager type时,如何与zk配合?
    • ZooKeeperPersistenceEngine和ZooKeeperLeaderElectionAgent 在Master类里被初始化。后者在new时直接start了,基于apache curator.apache.org实现,其isLeader、notLeader方法会被curator的方法调用。

最后,放一张简化后的时序图:

spark-resource-schedule

IDEA Intellij小技巧和插件 一文中简单介绍了一下IdeaVim插件。在这里详细总结一下这个插件在日常编程中的一些常用小技巧。供有兴趣使用这个插件,但对Vim还不十分熟悉的朋友参考。当然基本的hjkl移动光标和几种常见模式等等基本概念就略过不提了。

为了确保只包含常用操作,这里提到的技巧都没有从现成文档里抄,而是凭记忆列出(不常用自然就不记得了)。估计会有所遗漏,慢慢再补充。

1. 切换Vim模拟器状态

这个插件允许设置一个快捷键一键开启或关闭,在切换模式时会同时自动切换keymap,十分方便。默认键位是Ctrl+Alt+V,但这个键位覆盖了很常用的“抽取局部变量”功能,建议重设,在setting->keymap中查找VIM Emulator即可。

由于开启和关闭状态分别使用两套keymap,因此两套都需要设定。可以把两套keymap下的都设为一样的键,也就是用同一个键切换。但个人建议设为不同的键,这样能清楚知道当前处于那种模式中。并且,如果在开启Vim的插入模式下关闭Vim模拟器,下次进入时仍然是插入模式,比较混乱(因为你关闭模拟器就是为了使用默认keymap输入大段代码,重新开启Vim模拟器就是为了使用普通模式下的命令)。因此建议把Vim keymap中的Exit Insert Mode设为与另一个keymap的Vim Emulator相同的键(也就是进入Vim模拟器的快捷键)。例如,我使用的设定是:
Default keymap -> Vim Emulator : Ctrl+;     (用Ctrl+分号开启Vim模拟器)
Vim keymap -> Vim Emulator : Ctrl+,    (用Ctrl+逗号关闭Vim模拟器)
Vim keymap -> Vim Emulator : Ctrl+;    (用Ctrl+分号退出插入模式,进入普通模式)
这样,在任何时候只要连按两下ctrl+分号,就能保证必定在Vim模拟器的普通模式中。


2. ScrollOff 参数

启动Intellij后在Vim模拟器下输入命令 :set so=5 可以令屏幕滚动时在光标上下方保留5行预览代码(也就是光标会在第5行触发向上滚动,或者在倒数第5行触发向下滚动)。在代码窗口比较狭小时(例如单步跟踪调试时)非常方便。可惜仅在Vim模拟器开启时有效。

3. 行号定位
普通模式下输入 行号G 或 :行号<回车> 都能快速定位到某一行。区别在于前者在输入行号时屏幕上没有任何提示,后者则在Vim命令输入框中可以看到输入过程。(题外话:Sublime Text 2也是用 :行号 来快速定位到某行,应该是沿用了Vim的习惯)

4. 进入修改
进入插入模式的方式有很多,直接选用合适的方式进入插入模式比进入后再用箭头键移动光标要好。常用的有:
o – 在当前行下方插入新行并自动缩进
O – 在当前行上方插入新行并自动缩进 (普通模式下的大写字母命令用 shift+字母键 输入,下同)
i – 在当前字符左方开始插入字符
a – 在当前字符右方开始插入字符
I – 光标移动到行首并进入插入模式
A – 光标移动到行尾并进入插入模式
s – 删除光标所在字符并进入插入模式
S – 删除光标所在行并进入插入模式
c<范围> – 删除光标所在位置周围某个范围的文本并进入插入模式。关于范围请看第5点,常用的组合有:caw – 删除一个单词包括它后面的空格并开始插入; ciw – 删除一个单词并开始插入; ci” – 删除一个字符串内部文本并开始插入; c$ – 从光标位置删除到行尾并开始插入; ct字符 – 从光标位置删除本行某个字符之前(保留该字符)并开始插入。等等。
C – 删除光标位置到行尾的内容并进入插入模式 (相当于c$)
r – 修改光标所在字符,然后返回普通模式
R – 进入覆盖模式

5. 范围操作
某些普通模式的动作命令后面可以追加一些表示范围的指令,表示该动作将作用在整个范围上。这类命令常用的有:
d<范围> – 删除一定范围内的文本
c<范围> – 删除一定范围内的文本并进入插入模式
y<范围> – 将范围内的文本放入0号和”号注册栏
v<范围> – 选择范围内的文本
=<范围> – 自动缩进范围内的文本
gU<范围> – 将范围内的字符转换为大写
gu<范围> – 将范围内的字符转换为小写
><范围> – 将范围中的内容缩进一格
<<范围> – 将范围中的内容取消缩进一格

常用的范围指令有:
空格 – 光标所在位置字符。(例如 gU空格 – 将光标位置字符转为大写)
重复某些动作命令 – 光标所在行。 (例如dd删除一行,yy复制一行,cc删除一行文本并开始插入,>> 当前行缩进一格,==自动缩进当前行)
$ – 从光标位置到行尾
^ – 从光标位置到行首,不包含缩进空白
0 – 从光标位置到行首,包含缩进空白
gg – 从光标位置到文件开头
G – 从光标位置到文件结尾
% – 从光标位置到另一边匹配的括号
f<字符> – 从光标位置到光标右边某个字符首次出现的位置,包括该字符
F<字符> – 从光标位置到光标左边某个字符首次出现的位置,包括该字符
t<字符> – 从光标位置到光标右边某个字符首次出现的位置,包括该字符
F<字符> – 从光标位置到光标左边某个字符首次出现的位置,包括该字符
/正则表达式 – 从光标位置到下一个匹配正则表达式的位置(跨行)
?正则表达式 – 从光标位置到上一个匹配正则表达式的位置(跨行)
aw – 一个单词加一个空格 (a可理解为“一个”,下同)
iw – 一个单词 (i可理解为in,下同)
a” – 一个字符串包括双引号
i” – 一个字符串内部文本
a< – 一组< >包含的文本,包括< >号本身
同理类推: i<, a[, i[, a(, i(
注意:真正vim中的it范围(一对xml标签内部)在ideaVim中不生效。

用/或?命令查找时,正则表达式默认大小写敏感,如果需要不敏感,可以在正则表达式开始处加上\c标志。例如 /\cabc 可以匹配到 ABC。下面提到的:s命令同样适用。

6. 选择文本
在Vim中,选择文本需要进入“可视模式”(Visual Mode),这个名称比较奇怪,它的来由据说是因为在Vim的前身Vi中,选择区域是不可见的。在Vim中选择区域会高亮显示,因此称为“可视模式”。
v – 进入字符选择模式, V – 进入行选择模式, Ctrl+v – 进入块选择模式。
进入相应模式后移动光标即可选中文本。过程中可按o键令光标在选区两端切换。
在块选择模式中选中多行,然后按I或A后输入文本,再退出插入模式,所输入的文本将自动加入到每一行的开头或结尾。

7. 复制粘贴
在Vim模式下,复制粘贴并不直接使用系统的剪贴板,而是使用Vim提供的多个“寄存器”,每个寄存器都以一个字符来表示。关于寄存器的详细说明可以看这里 http://blah.blogsome.com/2006/04/27/vim_tut_register/ (随便google的一个网页),这里简单列一些常用的操作技巧 (注意,vim使用双引号”来作为选择寄存器的命令,因此下文中的双引号均指在普通模式下按双引号键):

a)用y命令将文本存入寄存器后,如果想在别处替换原有内容,可以先用v命令选中原有内容,然后用p命令粘贴。但第一次粘贴后,默认的寄存器将被替换为刚刚删除的内容。如果要再次粘贴之前复制的内容,需要使用 “0p 命令组合来复制。也可以进入插入模式后用 Ctrl+r 0 来复制,例如 ciw<Ctrl+r>0 命令组合将用粘贴内容替换光标处的一个单词,并停留在插入模式。

b)在Windows下,寄存器 + 和 * 都代表系统剪贴板,可以互换使用,选一个顺手的即可。例如 “+yy 命令组合可将当前行复制到系统剪贴板。 ci”<Ctrl+r>* 命令组合则将系统剪贴板的内容替换字符串的内部文本。

c) 寄存器1至9记录之前九次的删除大段文本,每次超过一行的删除操作都会导致这9个寄存器的内容发生位移,最近删除的文本会存入寄存器1。但只有删除超过1行时才会影响寄存器1至9,行内的删除内容则会被存入寄存器-(减号)。如果用q命令录制宏时不涉及跨行删除,可以在宏中直接使用这9个寄存器来暂存文本。(在Vim中,复制内容与录制宏共享同一套寄存器,因此我习惯把字母寄存器留给宏使用)

d) 普通模式下小写p把寄存器内容复制到当前位置之后,大写P把寄存器内容复制到当前位置之前。

e) 使用 :regs 命令可以列出当前所有寄存器的内容

8.  一些插入模式下的常用快捷键
Ctrl+h – 删除光标左边字符
Ctrl+w – 删除光标左边的单词
Ctrl+y – 复制上方的一个字符
Ctrl+e – 复制下方的一个字符
Ctrl+r 0 – 插入前一次用y命令寄存的内容
Ctrl+r * – 插入系统剪贴板的内容
Ctrl+r <寄存器名称> – 插入指定寄存器的内容
Ctrl+a – 插入前一次插入模式所键入的内容
Ctrl+o – 执行一个普通模式下的命令然后返回插入模式。 例如 Ctrl+o A 相当于按 End键, Ctrl+o I相当于按Home键

9. 退出插入模式
退出插入模式可以用 ESC 键,但键位太远。其实也可以用 Ctrl+[ 键退出插入模式 。当然也可以用第1点自定义的Ctrl+;快捷键,但这不是标准vim按键,会养成不良习惯,不建议使用。

10. 重复操作
普通模式下按. (小数点)可重复上一次的修改操作
& – 重复上一次的:s替换命令
@@ – 重复上一次执行的宏

11. 跳转
Ctrl+] 跳转到当前标识符的定义位置 (相当于在当前光标位置的单词上按住ctrl用鼠标点击)
Ctrl+o 回退一步 (go back)
Ctrl+i 前进一步 (go forward)
`. 跳转到之前修改位置
“ 在前一次跳转位置与当前位置间切换
行号G 或 :行号<回车>  跳转到某一行
gg 跳转到文件开头
G  跳转到文件末尾
H  跳转到屏幕顶端(如果设置了set so=n,则跳转到第n行)
L  跳转到屏幕底端(如果设置了set so=n,则跳转到倒数第n行)
M  跳转到屏幕中间
f 或 F 跳转到本行某个字符,小写f向右查找,大写F向左查找。用;或,在匹配间切换
t 或 T 跳转到本行某个字符之前,小写t向右查找,大写T向左查找。用;或,在匹配间切换
/正则表达式  跳转到下一个匹配。用n或N在匹配间切换。
?正则表达式  跳转到上一个匹配。用n或N在匹配间切换。
(结合前面第5点,你也许注意到了,在指定范围时,使用跳转命令将指定一个从光标位置到跳转目标的区域)

12 书签
在普通模式下按 m<小写字母> 即可定义书签,按 `<字母> 则可跳转到某个书签的精确位置,按 ‘<字母>可跳转到某个书签所在行的行首(用来录制宏时比较有用)。最常用的自然是mm, mn, mj, mk, ml这几个顺手的键位。
真正的vim中的全局书签 m<大写字母> 在目前IdeaVim版本中不生效。需要定义全局书签可以使用Idea原本的 F11 + 数字 方式

13 文本替换
使用 :s/正则表达式/替换文本/ 可在本行内替换首次出现的匹配
使用 :s/正则表达式/替换文本/g 在本行内替换所有出现的匹配
使用 :%s/正则表达式/替换文本/g 在当前文件内替换所有出现的匹配

在可视模式下选中文本后,使用:'<,’>s/正则表达式/替换文本/g 命令可在选中区域中替换文本。其中'<,’>部分在可视模式下,按:冒号后自动加入,直接输入s命令即可。但有效区域只能以行为单位。真正Vim中的 \%V 标志在IdeaVim中不生效。

11 代码折叠
zo – 打开折叠
zc – 关闭折叠

14 宏定义
在IdeaVim中定义宏比Idea自带的宏功能要轻量许多。按在普通模式下 q<寄存器名称> 即可开始把后续按键序列录制到指定寄存器中(寄存器参考前面第7条)。录制完毕进入普通模式再按q键即可停止录制。之后用 @<寄存器名称> 即可重放。需要注意的是宏和复制粘贴共用一套寄存器,因此在录制宏时就注意不要把当前宏正在使用的寄存器用来复制了。寄存器内容是自动保存的,重启Idea仍然生效。但IdeaVim没有导出宏独立保存的功能。因此最好把用来保存宏的寄存器和用来复制粘贴的寄存器分开,不要同一个寄存器有时用来记录宏,有时用来复制粘贴。我的习惯是键盘左手区用来保存一些长期使用的宏(比如说我有一个宏专门用来把pom.xml中的版本号抽取到property区域,原来的位置则改用${property}引用)。右手区的hjklnm键用来保存一些临时宏。yuiop五个寄存器保留用来复制粘贴。如果录制的宏不涉及删除大段代码,寄存器1至9也可以用来进行复制粘贴。

执行一次宏后,可以用@@命令重复上一次执行的宏。

在Idea中录制宏时,如果触发了代码自动完成,在自动完成列表启动的状态输入的字符不会被记录。因此最好在Setting -> Code Completion -> Autopopup code completion中把延迟设为500ms以上或干脆关掉。在录制宏的过程中避免触发代码自动完成功能。

录制一些长期有效的宏时,开始录制后,最好先用0,^,T, F, $等命令把光标对齐到行首行末或某个特定起始位置(比如说用 F” 跳转到字符串的左边引号),再用一个f或/指令跳转到操作位置,这样的宏就不用必须把光标放在某个特定字符才能使用了。

15. 一些常用组合技
全选: ggvG
调换两个字符位置: xp
复制一行: yyp
调换两行位置: ddp
插入模式下到行尾继续输入(相当于End键): Ctrl+o A 或 Ctrl+[ A
插入模式下到行首继续输入(相当于Home键): Ctrl+o I 或 Ctrl+[ I
到类定义位置(适用于正确缩进的public,protected类) : ?^p回车

16. 一些在目前版本已知没有实现的一些常用Vim功能
(如果对Vim不熟悉可以跳过这节)
a)let命令 (没有let命令就无法导出/导入寄存器内容,也就是无法导入宏)
b):g命令 (在文本处理中很有用的一个命令,在编程中倒是不那么常用)
c)!命令 (执行shell命令)
d)大部分正则表达式标记 (例如 \%V, \v 等等)
e) 某些多键命令双击最后一个字符表示作用于当前行。例如在Vim中gUU可以把当前行转换为大写,在IdeaVim中无效,实现同样功能可以先用V命令选中当前行,再用gU转换为大写。
f)关于窗口操作的大部分命令 (Ctrl+w系列命令, :split等)
g)所有Vim脚本插件 (不过大部分可以用Idea自身的功能和插件来补偿)

zz from: http://kidneyball.iteye.com/blog/1828427

什么是知识图谱?

百度百科的定义:

知识图谱,也称为科学知识图谱,它通过将应用数学、图形学、信息可视化技术、信息科学等学科的理论与方法与计量学引文分析、共现分析等方法结合,并利用可视化的图谱形象地展示学科的核心结构、发展历史、前沿领域以及整体知识架构达到多学科融合目的的现代理论。为学科研究提供切实的、有价值的参考。

各公司技术现状

综述类

Google “Knowledge Graph”,主要提升PC和移动搜索结果质量,也用于其他产品线,行业至少涉及医疗、

微软 Satori

facebook,社交图谱

twitter,兴趣图谱

yahoo,“Yahoo Knowledge”

wikidata,以结构化的形式,向机器提供数据。All the data inside Wikidata is completely available, via REST APIs, data dumps, and as Linked Open Data in RDF.

搜狗,知立方

阿里,淘宝数据盛典

知识图谱架构

inside the knowledge graph (google)

How Google and Microsoft taught search to “understand” the Web

Build a small Knowledge Graph

searching and querying knowledge graphs with Solr/SIREn: A Reference Architecture

存储介质与数据结构

计算框架

google的pregel,是一个分布式图计算框架,不过多涉及存储。

spark graphx:

相关技术

在大型互联网项目中,服务、机器和网络的故障是很常见的。而由于一般使用普通服务器,硬件层面也没有什么可靠的容错机制,必须由软件应用层面予以解决。

这是一个典型的分层架构,PHP项目部署在多机房,并且对后端多个Services发起串行调用,这些services分属不同产品线,也都是分布式部署的。

以PHP项目的视角来看,网络交互的故障可能发生在service应用层、服务器、网络。service应用层面的故障可能由于对方代码、部署等问题造成,如果是功能性问题,我们是通过自动或手工的优雅降级解决。这里主要关注由于对方故障,导致的网络通信细节。

另外,需要注意的是,当前的多层架构,一般都引入了负载均衡中间层,或者zookeeper之类的可用性保障。zk不在本文讨论范围,如果使用中间层,在网络故障处理方面,可以简单地把它当成service。

以下的client端如无特殊说明,指PHP进程。

连接建立后的故障处理

service进程异常退出

如果仅是进程异常崩溃,所在服务器仍然可用的话,服务器内核会自动给所有已建立连接的peer端发送FIN分节,走类似正常关闭4步的流程。client端内核会响应以ACK,其read会读到EOF,一般会关闭连接以发送FIN,并等待ACK。

可以看到,如果client端没有等待到期望数据格式的响应,会摘除故障节点并retry或降级容错的话,那对client的业务流程是没有影响的。在retry的处理方式里,最多就是好使double了,也没有太大性能影响。

service服务器崩溃

假设这时的服务器崩溃,导致无法发出FIN分节。

如果client此时在write,是无法收到ACK的,内核会不断重试直至超时,可能有数分钟之久,然后返回ENETUNREACH之类的错误。

如果client此时在read,而且没有设置超时时间的话,那就永久block了。

所以,需要在PHP代码里合理的设置超时时间,例如通过stream_set_timeout等方法。

不论是否设置超时,可以看到这种服务器崩溃,对性能的影响还是比较大的,必须等到tcp或应用层的超时expire了之后,才可以继续下面的流程。

网络拥塞或路由器、交换机故障

这种情况与服务器崩溃的原理和影响都差不多。

但假如网络只是拥塞还不是完全不可用,而且client端又有超时、重试和故障摘除机制的话,可能影响更坏。想象下client端如果发现到某个service节点不可达了,它可能会重试几次,如果还不行就摘除并且切换到其他节点去,保障后面的交互是顺畅的。但假如内核层面或者应用层面的重试是断断续续成功的呢?那故障节点or网络路径还是会对外提供服务,却是质量受损的服务!

我厂hadoop client端里就是类似问题,是通过设置期望的网络读写速率来予以降低影响的。

连接建立过程中的故障处理

service服务器正常,但进程未启动

client发出SYN后,会很快接收到RST,从而得知故障。只要应用层有重试机制就没有影响。

service服务器故障或网络故障

client发出SYN后,很可能接收到的是路由不可达或主机故障,但协议规定的有重试机制,所以tcp层会持续重发几个SYN才会放弃,这时connect或fsockopen之类才会返回错误。所以,对服务质量有损,需要合理设置connect超时时间。

统筹故障处理

从上面可以看到,如果无法正常FIN或ACK,对服务质量是有损的,但从tcp协议层面又无法解决。而当前的多层服务器部署架构,经常是网状的,可以认为,故障服务、节点、链路会对上层多个节点和请求造成影响。

我们将client端的多个节点视为横向,多个请求视为纵向,期望是横纵都尽量降低影响。

所以,采用了:

  1. 单节点利用故障时间窗口,自动摘除和恢复底层故障节点
  2. 集群利用故障消息流,自动摘除底层故障service

这里是思路是,如果故障达到了阈值,那就从横纵两个方向扩散故障消息,告诉其他client端或其他请求不要使用故障服务。

开源大数据查询分析引擎现状

Dremel促使了实时计算系统的兴起,Pregel开辟了图数据计算这个新方向,Percolator使分布式增量索引更新成为文本检索领域的新标准,Spanner和F1向我们展现了跨数据中心数据库的可能

Dremel

Dremel: Interactive Analysis of WebScaleDatasets

Google Dremel 原理 – 如何能3秒分析1PB

Google BigQuery:https://cloud.google.com/bigquery/ dremel在google的最终实现

Apache Trevni : https://cloud.google.com/bigquery/ 对dremel中的列存储的实现的开源项目

Cloudera Impala : http://impala.io/ 很大一部分程度上借鉴了dremel query engine的设计思路

Apache Drill : http://drill.apache.org/ dremel开源项目

Open Dremel : http://code.google.com/p/dremel/ 山寨版dremel(从名字就能看出来)

Pregel

Pregel: A System for Large-Scale Graph Processing

图的分布式计算框架,不过多涉及图的存储。

Caffeine、Percolator

Large-scale Incremental Processing Using Distributed Transactions and Notifications

Spanner

Spanner: Google’s Globally-Distributed Database

F1

F1 – The Fault-Tolerant Distributed RDBMS Supporting Google’s Ad Business  (ppt)

F1: A Distributed SQL Database That Scales  (pdf)

在Google的第二波技术浪潮中,基于Hive和Dremel,新兴的大数据公司Cloudera开源了大数据查询分析引擎Impala,Hortonworks开源了Stinger,Fackbook开源了Presto。类似Pregel,UC Berkeley AMPLAB实验室开发了Spark图计算框架,并以Spark为核心开源了大数据查询分析引擎Shark

Storm

使用 Twitter Storm 处理实时的大数据

实时大数据分析

Real-Time Big Data Analytics: Emerging Architecture

大数据的实时分析与应用案例分享

大数据实时更新框架 (网易有道)

腾讯大数据之 –实时精准推荐

商业智能系统(BI系统)是利用数据分析技术来辅助商业决策的一套系统。它通常包括3种应用:Data Reporting、OLAP和Data Mining。这个大家可以在网上找到大量的材料,这里只做简要介绍。这3种应用内在本质呈现如下趋势:分析维度从少到多,计算复杂度从低到高,从以人为主转为以机器为主。

Data Reporting应用主要是那些静态报表,简单查询报表及Dashboard等,展现方式一般比较固定,使用频率比较高,要求响应比较及时,所以每个query涉及的维度会比较少,计算复杂度也较简单,但是并发要求一般比较高,同时要求响应及时。在Reporting上的分析主要以人为主,而计算机系统承担的Query复杂度比较简单,使用简单的数据库系统,甚至是NoSQL存储系统有时即可满足。

OLAP应用主要是指在线(交互式)数据分析,主要指多维度的adhoc分析。一般主要的操作是roll up、drill down和slice/dice。每个分析涉及的维度要多于Reporting,计算复杂度也就相应地提高了,但是并发要求不高,并且响应比报表的要求要低,在秒级。在OLAP上的分析主要以机器为主,人为辅,此时就要求底层是可以支持复杂查询的数据库系统,简单的存储系统已经无法胜任。

Data Mining主要指利用机器学习技术来对数据进行分类或者聚类等分析。分析涉及的维度远超于OLAP所涉及的维度数量,这也就造成分析的复杂度已经超越人类的极限,所以这类数据的分析基本上全部依赖机器运行相应的机器学习算法来完成。这种分析对响应要求不高。由于算法的复杂度异常高,所以对于底层支撑系统来说,更重要的是提供一个高性能的计算系统,对于存储系统基本没有太多要求。

统计类工具的实现原理

腾讯分析系统架构解析

Google Analytics为什么会这么快

Google Analytics & BigQuery: The Whys and Hows

商业智能系统

维度建模

星型模型和雪花模型

事实表与维度表

BIEE

物化视图

存储结构

RCFile: A Fast and Space-efficient Data Placement Structure in MapReduce-based Warehouse Systems

该项目基于Pyspark core,处理1.5TB input,期间涉及rightOuterJoin、reduceBy以及多媒体处理,执行耗时从最初的1hour+,到最终的10min+-。这里记录一些优化的方法。

调优一般3步走:

  • 找瓶颈,从各种log和统计数据,定位瓶颈在哪里,Job、stage、细节交互、function层层递推
  • 制定目标,通过对spark、业务、代码的理解,给出一个靠谱的目标
  • 优化
    • 针对stage,我们优化了Java GC、数据倾斜、并发度
    • 针对细节和function,我们调整shuffle等配置,合理设计代码逻辑,避免过多的网络IO和跨语言交互

数据本地化

Job的input来自HDFS,但由于公司的HDFS与Spark集群目前没有共享资源,所以无法做到local或rack local,只能尽量选择离Spark机房较近且专线的HDFS机房,避免网络延迟的过度干扰。

瓶颈定位方法

  1. driver端业务日志里,打印总耗时,所有的优化手段只有较大影响到该值,才认为是有效的
  2. 打开spark.python.profile,针对每个RDD分析python内部方法调用耗时
  3. 通过master UI界面(我们还是standalone mode),找到最耗时的stage、不均衡的task等
  4. executor stderr log,通过打开log4j的level = DEBUG,可以看到scala进程内部很多运行细节

提高并发度

一开始我们的输入是业务方提供的原始数据,HDFS block size=256MB,而整个文件才3.5GB,所以针对这个数据源只有14个并发任务,每个耗时数分钟。

通过将文件拆分为1000个小part,将并发提升至1000,每个耗时20s左右。(20s其实算短task,有可能稍微增大些性能反而更好)

同时,修改spark.cores.max=1000,使我们最多可以获得1000个可用cores,分布在64个executors上面(每个是16cores)。

由于中间有rightOuterJoin过程,我们也设置了numPartitions参数=1000(该值后续会优化)。

内存使用

pyspark场景,driver java进程、driver python进程、executor java进程、executor python进程都需要消耗内存。driver端还好,在我们当前的场景下,并没有使用太大内存。但executor端,python和scala/java都有shuffle需求,故需要分配足够内存。

我们的配置是:

 Python |  copy code |? 
1
spark.python.worker.memory 40g
2
spark.driver.memory 10G
3
spark.executor.memory 100G

其中,executor.memory代表java进程内存,fraction没有修改,使用默认的storage占60%、shuffle 20%、code 20%的比例。

但通过UI界面,发现某些tasks失败,通过speculation重试后又可以成功,出错日志里包含如下异常,重试几次失败后,task就退出了:

 Scala |  copy code |? 
1
OneForOneBlockFetcher: Failed while starting block fetches
2
     java.io.IOException: Connection from other−executor−ip closed

在other-executor-ip所在服务器看到如下异常,但稍后又可以正常提供服务:

 Scala |  copy code |? 
1
TransprotChannelHandler: Exception in connect from ip:port
2
    java.lang.OutOfMemory: requested array size exceeds VM limit

猜测other-executor-ip在提供shuffle output时内存不足,需GC释放足够后,才可以正常提供服务。故调整GC相关参数如下:

 Scala |  copy code |? 
1
          −−conf "spark.executor.extraJavaOptions=−XX:+UseParallelGC −XX:+UseParallelOldGC −XX:ParallelGCThreads=64 −XX:NewRatio=3 −XX:SurvivorRatio=3 −XX:+HeapDumpOnOutOfMemoryError −XX:HeapDumpPath=/home/spark/heapdump.bin −XX:+PrintHeapAtGC −XX:PermSize=256m −XX:MaxPermSize=256m  −XX:+PrintGCDetails −XX:+PrintGCTimeStamps −XX:+PrintGCDateStamps −XX:+PrintTenuringDistribution −XX:+PrintGCApplicationStoppedTime −XX:−OmitStackTraceInFastThrow  −verbose:gc −Xloggc:/tmp/spark.executor.gc.log −Dlog4j.configuration=ftp://.../tmp/chengyi02/log4j.properties"

调整后没有再看到相关错误。

数据倾斜

开始的时候,有一个数据输入源的文件分片较多(>1000),尝试过用默认的partition func对其repartition为1000个分片,结果导致了分片不均衡,有的分片256MB,有的只有几MB,这跟我们的数据key有关系。不repartition反而更好。

另外,在《spark 任务调优》也提到当出现少数慢tasks时的处理方法。

序列化方法与压缩

Kryo等优化方式,都是针对scala、java语言的。pyspark的数据本身就是在python进程里序列化后,才作为二进制流传递给scala进程的,所以开启kryo没有效果。

默认情况,spill、broadcast、shuffle等的compress都是开启的。另外,我们也将spark.rdd.compress设置为true了。

rightOuterJoin的numPartitions参数调整

rightjoin操作用来整合多个数据源的输入,产生最终被处理的数据(2TB+),并作为map-filter-reduceByKey stage的parent。将rightOuterJoin的numPartitions参数从1000下降为400,则reduceByKey stage的执行时间从7min下降到4min左右。

结论先放着:如果使用pyspark,产生shuffle操作时,可以尝试下numPartitions数量设置为cores.max的一半。(但由于我们的集群环境与其他任务混布,测试数据不稳定,无法给出确切结论)

查看executor的debug log发现,调整numPartitions前,单机并发16tasks;调整后,单机并发7tasks。每个scala task获取shuffle的耗时从2.3s左右,下降到500ms左右。但这个降幅,离分钟差太远,应该不是决定性的影响因素。

以优化后的index=0,TID=7802为例,分析其耗时:

 Javascript |  copy code |? 
1
15/06/01 13:52:50 INFO CoarseGrainedExecutorBackend: Got assigned task 7802
2
15/06/01 13:52:50 INFO Executor: Running task 0.0 in stage 6.0 (TID 7802)
3
15/06/01 13:52:54 DEBUG BlockStoreShuffleFetcher: Fetching outputs for shuffle 0, reduce 0
4
15/06/01 13:52:54 DEBUG BlockStoreShuffleFetcher: Fetching map output location for shuffle 0, reduce 0 took 514 ms
5
15/06/01 13:55:40 INFO PythonRDD: Times: total = 170763, boot = −43996, init = 48943, finish = 165816
6
15/06/01 13:55:40 INFO Executor: Finished task 0.0 in stage 6.0 (TID 7802)33362 bytes result sent to driver

可以看到scala的shuffle read耗时只有514ms,说明shuffle read本身在scala内部其实没有太大耗时。进一步分析python pstats输出,优化前,rdd_23.pstats耗时479039.008 seconds,平均tasks耗时=7.9min;优化后,66494.758 seconds,平均耗时=2.7min。

屏幕快照 2015-06-01 17.44.33

从上图可以看到,发生剧烈变化的就是{method ‘read’ of ‘file’ objects},相差约6倍!猜测的原因是,当并发度16时(实际有16个scala和16个python进程),由于单机cores=16,且物理内存也就100G+,故tasks间相互产生较大的竞争关系。而并发度7时,竞争降低,进程间通信的效率提高。

上图一个可能的疑问是,根据代码逻辑,每一条item都会调用feature、policy等方法,其调用次数是5kw,而cPickle.loads的调用次数只有1kw,不相符。如果调整numPartitions数目,会看到loads次数随之改变。这是由于pyspark.SparkContext初始化时,会生成一个批量序列化对象:

 Python |  copy code |? 
01
    def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
02
                 conf, jsc):
03
        self.environment = environment or {}
04
        self._conf = conf or SparkConf(_jvm=self._jvm)
05
        self._batchSize = batchSize  # −1 represents an unlimited batch size
06
        self._unbatched_serializer = serializer
07
        if batchSize == 0:
08
            self.serializer = AutoBatchedSerializer(self._unbatched_serializer)
09
        else:
10
            self.serializer = BatchedSerializer(self._unbatched_serializer,
11
                                                batchSize)

通过查看不同numPartitions cPickle.loads 在该stage的总耗时,波动不大,故没有调优余地,无需关注。(虽然percall变化大,但那是由于batch造成的)

以下列出executor log中一些与性能相关的关键词:

 Python |  copy code |? 
1
15/06/01 10:55:44 INFO MemoryStore: ensureFreeSpace(62854) called with curMem=9319, maxMem=55082955571
2
15/06/01 10:58:20 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
3
15/06/01 10:58:20 INFO ShuffleBlockFetcherIterator: Getting 7725 non−empty blocks out of 7747 blocks
4
15/06/01 11:04:59 INFO MemoryStore: Block rdd_27_89 stored as values in memory (estimated size 3.4 MB, free 51.3 GB)
5
15/06/01 10:56:00 INFO TorrentBroadcast: Reading broadcast variable 5 took 361 ms
6
15/06/01 10:57:56 INFO PythonRDD: Times: total = 9401, boot = −3692, init = 3791, finish = 9302
7
15/06/01 10:58:20 DEBUG BlockStoreShuffleFetcher: Fetching map output location for shuffle 0, reduce 627 took 2338 ms