Giraph Execution Flow (Part 1)

Preface

This article analyzes the Job submission and initialization process of Giraph 1.3-SNAPSHOT. The Job submission part is analyzed against Hadoop running in Standalone mode, so it only covers the code paths executed when running locally; the initialization part is analyzed mainly in terms of cluster mode.

Example Job

This part is not Giraph source code; it is an example Job used to drive the analysis. Its configuration and how to run it were already described in the earlier article "Giraph Programming Practice and Source Debugging". A typical Job submission looks like this:

GiraphConfiguration conf = new GiraphConfiguration(new Configuration());
// Specify the computation class
conf.setComputationClass(Shortestpath.class);
// Set the input and output formats
conf.setVertexInputFormatClass(JsonLongDoubleFloatDoubleVertexInputFormat.class);
conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
// Use local test mode, which makes it easy to debug and step through the source
conf.setLocalTestMode(true);
// Configure the workers
conf.setWorkerConfiguration(1, 1, 100);
// In local mode there is no split between Master and Worker
GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);

GiraphJob job = new GiraphJob(conf, Shortestpath.class.getSimpleName());
// Set the input and output paths
GiraphTextInputFormat.setVertexInputPath(conf, new Path(INPUT_PATH));
GiraphTextOutputFormat.setOutputPath(job.getInternalJob(), new Path(OUTPUT_PATH));
••••••
// Submit the Job to Giraph
job.run(true);

The code first sets a series of parameters and then calls job.run(true) to submit the Job to Giraph.
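
For reference, the same job can usually also be launched through org.apache.giraph.GiraphRunner instead of hard-coding the configuration. The sketch below is only an illustration: the computation class name and the input/output paths are placeholders and must be adapted to the actual project.

import org.apache.giraph.GiraphRunner;
import org.apache.hadoop.util.ToolRunner;

// A rough command-line style equivalent of the example above; -w 1 matches
// conf.setWorkerConfiguration(1, 1, 100). Paths and the computation class
// name are placeholders.
public class RunShortestpath {
  public static void main(String[] args) throws Exception {
    String[] giraphArgs = {
      "my.package.Shortestpath",   // fully qualified computation class (placeholder)
      "-vif", "org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat",
      "-vip", "/tmp/shortestpath/input",    // placeholder input path
      "-vof", "org.apache.giraph.io.formats.IdWithValueTextOutputFormat",
      "-op",  "/tmp/shortestpath/output",   // placeholder output path
      "-w",   "1"                           // number of workers
    };
    System.exit(ToolRunner.run(new GiraphRunner(), giraphArgs));
  }
}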

Giraph Submits the Job to Hadoop

Giraph is built on top of Hadoop, so after a Job is submitted to Giraph, Giraph in turn submits a Job to Hadoop. This section analyzes how Giraph does that, starting with the run method:

org.apache.giraph.job.GiraphJob#run

/**
 * Runs the actual graph application through Hadoop Map-Reduce.
 *
 * @param verbose If true, provide verbose output, false otherwise
 * @return True if success, false otherwise
 * @throws ClassNotFoundException
 * @throws InterruptedException
 * @throws IOException
 */
public final boolean run(boolean verbose)
    throws IOException, InterruptedException, ClassNotFoundException {
  // Raise the limit on the number of Job counters
  setIntConfIfDefault("mapreduce.job.counters.limit", 512);

  // Set the memory limit for a Giraph Worker or Master
  setIntConfIfDefault("mapred.job.map.memory.mb", 1024);
  setIntConfIfDefault("mapred.job.reduce.memory.mb", 0);

  // Speculative execution doesn't make sense for Giraph
  giraphConfiguration.setBoolean(
      "mapred.map.tasks.speculative.execution", false);

  // Set the ping interval to 5 minutes instead of one minute
  Client.setPingInterval(giraphConfiguration, 60000 * 5);

  // Prefer classes from the user-supplied jar
  giraphConfiguration.setBoolean("mapreduce.user.classpath.first", true);
  giraphConfiguration.setBoolean("mapreduce.job.user.classpath.first", true);

  // Without checkpointing, limit the task attempts to 1 so that
  // unrecoverable jobs fail faster
  if (giraphConfiguration.getCheckpointFrequency() == 0) {
    int oldMaxTaskAttempts = giraphConfiguration.getMaxTaskAttempts();
    giraphConfiguration.setMaxTaskAttempts(1);

    ••••••
  }

  ImmutableClassesGiraphConfiguration conf =
      new ImmutableClassesGiraphConfiguration(giraphConfiguration);
  checkLocalJobRunnerConfiguration(conf);

  int tryCount = 0;
  // Defaults to org.apache.giraph.job.DefaultGiraphJobRetryChecker
  GiraphJobRetryChecker retryChecker = conf.getJobRetryChecker();
  while (true) {
    ••••••

    tryCount++;
    // Create a Hadoop Job
    Job submittedJob = new Job(conf, jobName);
    if (submittedJob.getJar() == null) {
      submittedJob.setJarByClass(getClass());
    }
    // Giraph does not run any Reduce tasks
    submittedJob.setNumReduceTasks(0);
    // Set the Mapper
    submittedJob.setMapperClass(GraphMapper.class);
    // Set the input format
    submittedJob.setInputFormatClass(BspInputFormat.class);
    // Set the output format, org.apache.giraph.bsp.BspOutputFormat by default
    submittedJob.setOutputFormatClass(
        GiraphConstants.HADOOP_OUTPUT_FORMAT_CLASS.get(conf));
    ••••••
    // Submit the Job
    submittedJob.submit();

    ••••••
    // Wait for the Job and obtain its result
    boolean passed = submittedJob.waitForCompletion(verbose);

    ••••••

    // If the Job failed, it may be restarted from a checkpoint
    if (!passed) {
      // By default (no custom JobRetryChecker) this returns null,
      // i.e. the Job is never restarted
      String restartFrom = retryChecker.shouldRestartCheckpoint(submittedJob);
      if (restartFrom != null) {
        GiraphConstants.RESTART_JOB_ID.set(conf, restartFrom);
        continue;
      }
    }

    // Return if the Job succeeded, or if it failed and should not be retried
    // (by default it is never retried)
    if (passed || !retryChecker.shouldRetry(submittedJob, tryCount)) {
      return passed;
    }
    ••••••
  }
}

The run method first configures Hadoop and Giraph and then creates a Hadoop Job object. After setting the Hadoop Job's MapperClass, input/output formats and other related settings, it calls submit to hand the Job over to Hadoop. As the code shows, the whole process is essentially the same as submitting an ordinary Hadoop Job.
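
To make the comparison concrete, here is a minimal sketch of an ordinary map-only Hadoop job built with the same calls (setMapperClass, setNumReduceTasks(0), waitForCompletion). The class and path names are placeholders and have nothing to do with Giraph.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// A plain map-only Hadoop job, for comparison with GiraphJob#run.
public class PlainMapOnlyJob {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "plain-map-only");
    job.setJarByClass(PlainMapOnlyJob.class);
    job.setMapperClass(Mapper.class);   // identity mapper
    job.setNumReduceTasks(0);           // map-only, just like Giraph
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path("/tmp/in"));    // placeholder
    FileOutputFormat.setOutputPath(job, new Path("/tmp/out")); // placeholder
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}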

Execution Inside Hadoop

Once Giraph calls submit, execution moves into Hadoop itself. For this part, the main thing to understand is how Hadoop launches Giraph's MapTasks.

Internal Job Submission

org.apache.hadoop.mapreduce.Job#submit

public void submit() throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  // Use the new API
  setUseNewAPI();
  connect();
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  // Submit the Job to the system
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
        ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  ••••••
}

Inside submit, a JobSubmitter object is created and the Job is submitted further via its submitJobInternal method.

org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal

JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {

  ••••••

  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);

  // Get the staging directory, by default under /tmp/hadoop/mapred/staging
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  ••••••
  // Generate the Job ID
  JobID jobId = submitClient.getNewJobID();
  // Set the Job ID
  job.setJobID(jobId);
  // Get the Job submission directory
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  ••••••

  ••••••
  // Actually submit the Job
  status = submitClient.submitJob(
      jobId, submitJobDir.toString(), job.getCredentials());
  ••••••
}

In submitJobInternal, Hadoop actually submits the Job through submitClient. submitClient is of the ClientProtocol interface type, which has two implementations; since Hadoop runs in Standalone mode when the Job is submitted, the implementation used here is LocalJobRunner.
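
How that implementation gets chosen is worth a short detour: when Job#connect builds the Cluster object, Hadoop asks the available ClientProtocolProvider implementations (discovered via ServiceLoader) for a client, and they key off the mapreduce.framework.name property, which defaults to local. The following is only a simplified sketch of that decision, not the actual Hadoop source:

import org.apache.hadoop.conf.Configuration;

// Simplified sketch of how the submitClient implementation is selected.
public class SubmitClientChoice {
  static String pickSubmitClient(Configuration conf) {
    // Defaults to "local" when no cluster is configured (Standalone mode)
    String framework = conf.get("mapreduce.framework.name", "local");
    if ("local".equals(framework)) {
      return "LocalJobRunner";   // supplied by LocalClientProtocolProvider
    }
    if ("yarn".equals(framework)) {
      return "YARNRunner";       // supplied by YarnClientProtocolProvider
    }
    return "unsupported: " + framework;
  }

  public static void main(String[] args) {
    System.out.println(pickSubmitClient(new Configuration()));
  }
}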

Launching the MapTask

org.apache.hadoop.mapred.LocalJobRunner#submitJob

public org.apache.hadoop.mapreduce.JobStatus submitJob(
    org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
    Credentials credentials) throws IOException {
  Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);
  job.job.setCredentials(credentials);
  return job.status;
}

org.apache.hadoop.mapred.LocalJobRunner.Job#Job

public Job(JobID jobid, String jobSubmitDir) throws IOException {
  ••••••

  this.start();
}

submitJob creates a Job object, where Job here is an inner class of LocalJobRunner that extends Thread. As the constructor shows, creating the Job also starts the thread, so the next method to look at is Job#run.
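
The pattern is easy to miss because the thread is started from the constructor. A minimal self-contained sketch of the same idea (names here are illustrative only):

// A Thread subclass whose constructor starts itself, so "submitting" a job
// immediately begins asynchronous execution, mirroring LocalJobRunner.Job.
public class SelfStartingJob extends Thread {
  private final String jobId;

  public SelfStartingJob(String jobId) {
    this.jobId = jobId;
    this.start();               // begin running as soon as the object is created
  }

  @Override
  public void run() {
    System.out.println("running job " + jobId + " asynchronously");
  }

  public static void main(String[] args) throws InterruptedException {
    SelfStartingJob job = new SelfStartingJob("job_local_0001");
    job.join();                 // submitJob itself returns without waiting
  }
}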

org.apache.hadoop.mapred.LocalJobRunner.Job#run

@Override
public void run() {
  JobID jobId = profile.getJobID();
  JobContext jContext = new JobContextImpl(job, jobId);

  ••••••

  Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
      Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());

  // Build the list of tasks to run
  List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(
      taskSplitMetaInfos, jobId, mapOutputFiles);

  initCounters(mapRunnables.size(), numReduceTasks);
  ExecutorService mapService = createMapExecutor();
  // Run the tasks
  runTasks(mapRunnables, mapService, "map");

  ••••••
  // delete the temporary directory in output directory
  outputCommitter.commitJob(jContext);
  status.setCleanupProgress(1.0f);

  ••••••
}

org.apache.hadoop.mapred.LocalJobRunner.Job#getMapTaskRunnables

protected List<RunnableWithThrowable> getMapTaskRunnables(
    TaskSplitMetaInfo [] taskInfo, JobID jobId,
    Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {

  int numTasks = 0;
  ArrayList<RunnableWithThrowable> list =
      new ArrayList<RunnableWithThrowable>();
  // Create one MapTaskRunnable per task split
  for (TaskSplitMetaInfo task : taskInfo) {
    list.add(new MapTaskRunnable(task, numTasks++, jobId,
        mapOutputFiles));
  }

  return list;
}

org.apache.hadoop.mapred.LocalJobRunner.Job#runTasks

private void runTasks(List<RunnableWithThrowable> runnables,
    ExecutorService service, String taskType) throws Exception {
  // Submit the tasks
  for (Runnable r : runnables) {
    service.submit(r);
  }

  try {
    service.shutdown(); // Instructs queue to drain.

    // Wait for tasks to finish; do not use a time-based timeout.
    // (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
    LOG.info("Waiting for " + taskType + " tasks");
    service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
  } catch (InterruptedException ie) {
    // Cancel all threads.
    service.shutdownNow();
    throw ie;
  }
  ••••••
}

In Job#run, the main thing to pay attention to is how the MapTaskRunnables are created and executed: Hadoop uses getMapTaskRunnables to create one MapTaskRunnable per assigned task, and then calls runTasks to submit them to a thread pool.
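
The executor usage in runTasks is a common pattern: submit everything, call shutdown() so the pool drains, then wait effectively forever for termination. A stand-alone sketch of the same pattern (not the Hadoop source):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Submit all runnables, stop accepting new work, then block until everything finishes.
public class RunTasksPattern {
  public static void main(String[] args) throws InterruptedException {
    List<Runnable> runnables = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
      final int id = i;
      runnables.add(() -> System.out.println("map task " + id));
    }

    ExecutorService service = Executors.newFixedThreadPool(2);
    for (Runnable r : runnables) {
      service.submit(r);
    }
    service.shutdown();                                             // drain the queue
    service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); // no time-based timeout
  }
}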

Once the MapTaskRunnables have been submitted to the thread pool, the next method to look at is MapTaskRunnable#run.

org.apache.hadoop.mapred.LocalJobRunner.Job.MapTaskRunnable#run

public void run() {
  try {
    ••••••
    MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,
        info.getSplitIndex(), 1);
    ••••••
    map.run(localConf, Job.this);
    ••••••
  } catch (Throwable e) {
    this.storedException = e;
  }
}

MapTaskRunnable#run creates a MapTask object and calls its run method.

org.apache.hadoop.mapred.MapTask#run

@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {

  ••••••
  // org.apache.hadoop.mapreduce.Job#submit called setUseNewAPI(), so this returns true
  boolean useNewApi = job.getUseNewMapper();

  ••••••

  if (useNewApi) {
    runNewMapper(job, splitMetaInfo, umbilical, reporter);
  } else {
    runOldMapper(job, splitMetaInfo, umbilical, reporter);
  }
  done(umbilical, reporter);
}

MapTask#run calls runNewMapper, so that method is examined next.

org.apache.hadoop.mapred.MapTask#runNewMapper

private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewMapper(final JobConf job,
    final TaskSplitIndex splitIndex,
    final TaskUmbilicalProtocol umbilical,
    TaskReporter reporter
    ) throws IOException, ClassNotFoundException,
             InterruptedException {
  // make a task context so we can get the classes
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
          getTaskID(),
          reporter);
  // Instantiate the configured MapperClass via reflection
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
          ReflectionUtils.newInstance(taskContext.getMapperClass(), job);

  ••••••

  // Create the Context
  org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
      mapContext =
          new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
              input, output,
              committer,
              reporter, split);

  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
      mapperContext =
          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
              mapContext);

  try {
    ••••••
    mapper.run(mapperContext);
    ••••••
  } finally {
    ••••••
  }
}

MapTask#runNewMapper uses reflection to create an instance of the configured MapperClass, i.e. the GraphMapper class set in org.apache.giraph.job.GiraphJob#run. Once the GraphMapper instance is obtained, its run method is invoked, and execution enters the Giraph side.
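
In isolation, the reflection step looks roughly like the sketch below: setMapperClass stores the class under the mapreduce.job.map.class property, and runNewMapper later materializes an instance of it with ReflectionUtils. Here the plain Mapper class stands in for GraphMapper so the snippet runs without Giraph on the classpath.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.ReflectionUtils;

// Sketch of how the configured mapper class is resolved and instantiated.
public class MapperReflectionSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // What submittedJob.setMapperClass(GraphMapper.class) effectively does:
    conf.setClass("mapreduce.job.map.class", Mapper.class, Mapper.class);

    Class<? extends Mapper> mapperClass =
        conf.getClass("mapreduce.job.map.class", Mapper.class, Mapper.class);
    Mapper<?, ?, ?, ?> mapper = ReflectionUtils.newInstance(mapperClass, conf);
    System.out.println("instantiated " + mapper.getClass().getName());
  }
}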

Giraph Executes the Job

org.apache.giraph.graph.GraphMapper#run

@Override
public void run(Context context) throws IOException, InterruptedException {
  // Notify the master quicker if there is Worker failure rather than
  // waiting for ZooKeeper to timeout and delete the ephemeral znodes
  try {
    // Initialization
    setup(context);
    // Run the computation
    while (context.nextKeyValue()) {
      graphTaskManager.execute();
    }
    // Cleanup
    cleanup(context);
  } catch (RuntimeException e) {
    ••••••
  }
}

GraphMapper#run shows that the execution of a Giraph Job can be divided into three phases:

  • Initialization
  • Computation
  • Cleanup

The initialization phase is analyzed below.

Initialization

org.apache.giraph.graph.GraphMapper#setup

@Override
public void setup(Context context)
    throws IOException, InterruptedException {
  // Execute all Giraph-related role(s) assigned to this compute node.
  // Roles can include "Master," "Worker," "zookeeper," or . . . ?
  graphTaskManager = new GraphTaskManager<I, V, E>(context);
  graphTaskManager.setup(
      DistributedCache.getLocalCacheArchives(context.getConfiguration()));
}

GraphMapper#setup creates a GraphTaskManager object and calls its setup method.

org.apache.giraph.graph.GraphTaskManager#setup

public void setup(Path[] zkPathList) throws IOException, InterruptedException {
  Configuration hadoopConf = context.getConfiguration();
  // Initialize the configuration
  conf = new ImmutableClassesGiraphConfiguration<I, V, E>(hadoopConf);
  ••••••
  // Read the ZooKeeper connection string from the configuration;
  // it is empty when no external ZooKeeper is provided
  String serverPortList = conf.getZookeeperList();
  // Without an external ZooKeeper, Giraph has to start its own
  if (serverPortList.isEmpty()) {
    if (startZooKeeperManager()) {
      return; // ZK connect/startup failed
    }
  } else {
    createZooKeeperCounter(serverPortList);
  }
  ••••••
  this.graphFunctions = determineGraphFunctions(conf, zkManager);
  if (zkManager != null && this.graphFunctions.isMaster()) {
    // Mark the directories created by the Master for deletion;
    // they are removed when the file system is closed
    zkManager.cleanupOnExit();
  }
  try {
    // Initialize the BSP service
    instantiateBspService();
  } catch (IOException e) {
    ••••••
  }
}

GraphTaskManager#setup does three main things:

  • Obtain the ZooKeeper connection information
  • Determine the role of the process
  • Initialize the BSP service

Obtaining the ZooKeeper Connection Information

GraphTaskManager#setup obtains the ZooKeeper connection information through conf.getZookeeperList(). If an external ZooKeeper is provided, the connection string is returned directly; otherwise getZookeeperList() returns an empty value, and GraphTaskManager#setup calls startZooKeeperManager to start ZooKeeper on one of the Tasks.
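
For comparison, the following is a minimal sketch of the external-ZooKeeper case, assuming the GiraphConstants.ZOOKEEPER_LIST option is used to supply the quorum (the host:port values are placeholders). With this set, getZookeeperList() is non-empty and startZooKeeperManager is never called.

import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.hadoop.conf.Configuration;

// Point the job at an existing ZooKeeper quorum instead of letting Giraph start one.
public class ExternalZooKeeperExample {
  public static void main(String[] args) {
    GiraphConfiguration conf = new GiraphConfiguration(new Configuration());
    GiraphConstants.ZOOKEEPER_LIST.set(conf,
        "zk1.example.com:2181,zk2.example.com:2181");   // placeholder quorum
    System.out.println(GiraphConstants.ZOOKEEPER_LIST.get(conf));
  }
}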

org.apache.giraph.graph.GraphTaskManager#startZooKeeperManager

/**
 * Instantiate and configure ZooKeeperManager for this job. This will
 * result in a Giraph-owned Zookeeper instance, a connection to an
 * existing quorum as specified in the job configuration, or task failure
 * @return true if this task should terminate
 */
private boolean startZooKeeperManager() throws IOException, InterruptedException {
  zkManager = new ZooKeeperManager(context, conf);
  context.setStatus("setup: Setting up Zookeeper manager.");
  zkManager.setup();
  // If the computation is already done there is no need to start ZooKeeper again;
  // this mostly matters when a Task is restarted and no external ZooKeeper is provided
  if (zkManager.computationDone()) {
    done = true;
    return true;
  }
  zkManager.onlineZooKeeperServer();
  // Update the ZooKeeper connection string and create the counter
  String serverPortList = zkManager.getZooKeeperServerPortString();
  conf.setZookeeperList(serverPortList);
  createZooKeeperCounter(serverPortList);
  return false;
}

startZooKeeperManager first creates a ZooKeeperManager object and then calls its setup method.

org.apache.giraph.zk.ZooKeeperManager#setup

public void setup() throws IOException, InterruptedException {
  createCandidateStamp();
  getZooKeeperServerList();
}

ZooKeeperManager#setup first calls createCandidateStamp.

org.apache.giraph.zk.ZooKeeperManager#createCandidateStamp

/**
 * Create a HDFS stamp for this task. If another task already
 * created it, then this one will fail, which is fine.
 */
public void createCandidateStamp() {
  ••••••
  fs.mkdirs(baseDirectory);
  ••••••
  fs.mkdirs(serverDirectory);
  ••••••
  if (!fs.getFileStatus(baseDirectory).isDir()) {
    throw new IllegalArgumentException(
        "createCandidateStamp: " + baseDirectory +
        " is not a directory, but should be.");
  }

  ••••••
  // Build the file name from the hostname and taskPartition
  Path myCandidacyPath = new Path(
      taskDirectory, myHostname +
      HOSTNAME_TASK_SEPARATOR + taskPartition);
  try {
    ••••••
    fs.createNewFile(myCandidacyPath);
  } catch (IOException e) {
    LOG.error("createCandidateStamp: Failed (maybe previous task " +
        "failed) to create filestamp " + myCandidacyPath, e);
  }
}

In createCandidateStamp, each Task creates a file named after its own hostname and taskPartition under _bsp/_defaultZkManagerDir/_task. These files are later used when the system selects the Task that will start the ZooKeeper service.

When stepping through the source, the hostname in these file names is localhost because Hadoop is running in Standalone mode.
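
As a rough illustration of the file names involved (not Giraph source; the separator between hostname and task partition is assumed here to be a single space, see HOSTNAME_TASK_SEPARATOR in org.apache.giraph.zk.ZooKeeperManager for the actual value):

import org.apache.hadoop.fs.Path;

// Sketch of the candidate stamp path each task creates.
public class CandidateStampName {
  public static void main(String[] args) {
    String myHostname = "localhost";    // Standalone mode resolves to localhost
    int taskPartition = 0;
    String hostnameTaskSeparator = " "; // assumption, see ZooKeeperManager
    Path taskDirectory = new Path("_bsp/_defaultZkManagerDir/_task");
    Path myCandidacyPath =
        new Path(taskDirectory, myHostname + hostnameTaskSeparator + taskPartition);
    System.out.println(myCandidacyPath); // e.g. _bsp/_defaultZkManagerDir/_task/localhost 0
  }
}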

After createCandidateStamp completes, ZooKeeperManager#setup goes on to call getZooKeeperServerList.

org.apache.giraph.zk.ZooKeeperManager#getZooKeeperServerList

private void getZooKeeperServerList() throws IOException,
    InterruptedException {
  String serverListFile;

  // The Task with taskPartition 0 creates the zooKeeperServerList
  if (taskPartition == 0) {
    // If Task 0 was restarted and a serverList already exists, it is not recreated
    serverListFile = getServerListFile();
    if (serverListFile == null) {
      // Create the serverList
      createZooKeeperServerList();
    }
  }

  while (true) {
    // The other Tasks wait for the serverList to be created
    serverListFile = getServerListFile();
    ••••••
    if (serverListFile != null) {
      break;
    }
    // Sleep to reduce CPU usage
    try {
      Thread.sleep(pollMsecs);
    } catch (InterruptedException e) {
      LOG.warn("getZooKeeperServerList: Strange interrupted " +
          "exception " + e.getMessage());
    }

  }

  // Parse the information encoded in the serverList file name
  String[] serverHostList = serverListFile.substring(
      ZOOKEEPER_SERVER_LIST_FILE_PREFIX.length()).split(
          HOSTNAME_TASK_SEPARATOR);
  ••••••

  // Hostname of the node that will run the ZooKeeper service
  zkServerHost = serverHostList[0];
  // taskPartition of the Task that should start the ZooKeeper service
  zkServerTask = Integer.parseInt(serverHostList[1]);

  // Each Task updates its own zkServerPortString
  updateZkPortString();
}

getZooKeeperServerList branches on the taskPartition: Task 0 first calls createZooKeeperServerList to create the serverListFile (whose name encodes the hostname and taskPartition of the Task that will host the ZooKeeper service), while all other Tasks poll until the serverListFile appears. Once the file name is obtained, it is parsed to update zkServerHost, zkServerTask and zkServerPortString.

Next, createZooKeeperServerList and getServerListFile are analyzed to better understand how the system selects the Task that starts the ZooKeeper service.

  • org.apache.giraph.zk.ZooKeeperManager#createZooKeeperServerList

    private void createZooKeeperServerList() throws IOException, InterruptedException {
      String host;
      String task;
      while (true) {
        // List the files under taskDirectory; the listing filters out
        // names starting with "." or ending with "crc"
        FileStatus [] fileStatusArray = fs.listStatus(taskDirectory);
        if (fileStatusArray.length > 0) {
          // The Task identified by the first entry will start the ZooKeeper service
          FileStatus fileStatus = fileStatusArray[0];
          // Parse the information
          String[] hostnameTaskArray =
              fileStatus.getPath().getName().split(
                  HOSTNAME_TASK_SEPARATOR);
          ••••••
          host = hostnameTaskArray[0];
          task = hostnameTaskArray[1];
          break;
        }
        Thread.sleep(pollMsecs);
      }
      // Build the serverListFile name from the parsed information
      String serverListFile =
          ZOOKEEPER_SERVER_LIST_FILE_PREFIX + host +
          HOSTNAME_TASK_SEPARATOR + task;
      Path serverListPath =
          new Path(baseDirectory, serverListFile);
      ••••••
      // Create the file
      fs.createNewFile(serverListPath);
    }

    createZooKeeperServerList lists the files that all Tasks created in createCandidateStamp, and uses the Task identified by the first element of the returned array to create the serverListFile.

  • org.apache.giraph.zk.ZooKeeperManager#getServerListFile

    private String getServerListFile() throws IOException {
      String serverListFile = null;
      // baseDirectory is _bsp/_defaultZkManagerDir; list the files under it
      FileStatus [] fileStatusArray = fs.listStatus(baseDirectory);
      for (FileStatus fileStatus : fileStatusArray) {
        // Look for the file whose name starts with zkServerList_,
        // i.e. the serverListFile created by the Task with taskPartition 0
        if (fileStatus.getPath().getName().startsWith(
            ZOOKEEPER_SERVER_LIST_FILE_PREFIX)) {
          serverListFile = fileStatus.getPath().getName();
          break;
        }
      }
      return serverListFile;
    }

    getServerListFile lists the file statuses under baseDirectory, filters out the serverListFile, and returns its file name.

Back in startZooKeeperManager, after the Task that will run the ZooKeeper service has been selected, the system first checks whether the computation is already done; if so, there is no need to keep running. Otherwise it calls onlineZooKeeperServer to start the ZooKeeper service.

org.apache.giraph.zk.ZooKeeperManager#onlineZooKeeperServer

public void onlineZooKeeperServer() throws IOException {
  // If this Task's taskPartition equals zkServerTask, it starts the ZooKeeper service
  if (zkServerTask == taskPartition) {
    File zkDirFile = new File(this.zkDir);
    try {
      // Delete the old directory
      ••••••
      FileUtils.deleteDirectory(zkDirFile);
    } catch (IOException e) {
      ••••••
    }
    // Generate the ZooKeeper configuration
    generateZooKeeperConfig();
    synchronized (this) {
      zkRunner = createRunner();
      // Start the ZooKeeper service
      int port = zkRunner.start(zkDir, config);
      if (port > 0) {
        zkBasePort = port;
        updateZkPortString();
      }
    }

    // Once the server is up and running, notify that this server is up
    // and running by dropping a ready stamp.
    int connectAttempts = 0;
    final int maxConnectAttempts =
        conf.getZookeeperConnectionAttempts();
    while (connectAttempts < maxConnectAttempts) {
      try {
        ••••••
        // Try to connect to the ZooKeeper service
        InetSocketAddress zkServerAddress =
            new InetSocketAddress(myHostname, zkBasePort);
        Socket testServerSock = new Socket();
        testServerSock.connect(zkServerAddress, 5000);
        ••••••
        break;
      } catch (SocketTimeoutException e) {
        LOG.warn("onlineZooKeeperServers: Got " +
            "SocketTimeoutException", e);
      } catch (ConnectException e) {
        LOG.warn("onlineZooKeeperServers: Got " +
            "ConnectException", e);
      } catch (IOException e) {
        LOG.warn("onlineZooKeeperServers: Got " +
            "IOException", e);
      }

      ++connectAttempts;
      try {
        Thread.sleep(pollMsecs);
      } catch (InterruptedException e) {
        LOG.warn("onlineZooKeeperServers: Sleep of " + pollMsecs +
            " interrupted - " + e.getMessage());
      }
    }
    // Connection failed after the maximum number of attempts
    if (connectAttempts == maxConnectAttempts) {
      throw new IllegalStateException(
          "onlineZooKeeperServers: Failed to connect in " +
          connectAttempts + " tries!");
    }
    Path myReadyPath = new Path(
        serverDirectory, myHostname +
        HOSTNAME_TASK_SEPARATOR + taskPartition +
        HOSTNAME_TASK_SEPARATOR + zkBasePort);
    try {
      ••••••
      // Create a file announcing that the ZooKeeper service is ready
      // and publishing its connection information
      fs.createNewFile(myReadyPath);
    } catch (IOException e) {
      ••••••
    }
  } else {
    // The other Tasks wait for the ZooKeeper service to start
    int readyRetrievalAttempt = 0;
    String foundServer = null;
    while (true) {
      try {
        FileStatus [] fileStatusArray =
            fs.listStatus(serverDirectory);
        // Check whether the ZooKeeper connection-info file has appeared
        // under serverDirectory
        if ((fileStatusArray != null) &&
            (fileStatusArray.length > 0)) {
          // Parse the connection information from the file name
          for (int i = 0; i < fileStatusArray.length; ++i) {
            String[] hostnameTaskArray =
                fileStatusArray[i].getPath().getName().split(
                    HOSTNAME_TASK_SEPARATOR);
            if (hostnameTaskArray.length != 3) {
              throw new RuntimeException(
                  "getZooKeeperServerList: Task 0 failed " +
                  "to parse " +
                  fileStatusArray[i].getPath().getName());
            }
            // Host the ZooKeeper service runs on
            foundServer = hostnameTaskArray[0];
            // Port the ZooKeeper service listens on
            zkBasePort = Integer.parseInt(hostnameTaskArray[2]);
            // Update the ZooKeeper connection information
            updateZkPortString();
          }
          ••••••
          // Stop waiting once the hostname matches the expected server host
          // (a scenario where it would not match is hard to come up with)
          if (zkServerHost.equals(foundServer)) {
            break;
          }
        } else {
          ••••••
        }
        Thread.sleep(pollMsecs);
        ++readyRetrievalAttempt;
      } catch (IOException e) {
        throw new RuntimeException(e);
      } catch (InterruptedException e) {
        ••••••
      }
    }
  }
}

onlineZooKeeperServer effectively does two things: on the selected Task it starts the ZooKeeper service and creates a file announcing that the service is ready; on every other Task it waits for that file and updates the ZooKeeper connection information from it.
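
The readiness check on the server side is essentially a bounded retry loop around a TCP connect with a timeout. A stand-alone sketch of that probe (host and port are placeholders):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Retry a TCP connect with a timeout until the server accepts connections
// or the attempt budget is exhausted, mirroring the probe in onlineZooKeeperServer.
public class ZkPortProbe {
  public static boolean probe(String host, int port, int attempts, int sleepMs)
      throws InterruptedException {
    for (int i = 0; i < attempts; i++) {
      try (Socket socket = new Socket()) {
        socket.connect(new InetSocketAddress(host, port), 5000);
        return true;               // server is up and accepting connections
      } catch (IOException e) {
        Thread.sleep(sleepMs);     // not ready yet, wait and retry
      }
    }
    return false;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(probe("localhost", 22181, 3, 1000));
  }
}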

Assigning Roles

After the ZooKeeper service has been started, the system updates the ZooKeeper-related configuration and returns to org.apache.giraph.graph.GraphTaskManager#setup, which then calls determineGraphFunctions.

org.apache.giraph.graph.GraphTaskManager#determineGraphFunctions

private static GraphFunctions determineGraphFunctions(
    ImmutableClassesGiraphConfiguration conf,
    ZooKeeperManager zkManager) {
  // Whether Master and Worker are split into separate Tasks;
  // in local mode only one Task is started
  boolean splitMasterWorker = conf.getSplitMasterWorker();
  // taskPartition of the current Task
  int taskPartition = conf.getTaskPartition();
  // Whether an external ZooKeeper is provided
  boolean zkAlreadyProvided = conf.isZookeeperExternal();
  // Initial role of the Task
  GraphFunctions functions = GraphFunctions.UNKNOWN;

  if (!splitMasterWorker) {
    // In local mode the Task plays all roles if it also runs ZooKeeper internally,
    // otherwise it acts as both Master and Worker
    if ((zkManager != null) && zkManager.runsZooKeeper()) {
      functions = GraphFunctions.ALL;
    } else {
      functions = GraphFunctions.ALL_EXCEPT_ZOOKEEPER;
    }
  } else {
    if (zkAlreadyProvided) {
      // With an external ZooKeeper, Task 0 is the Master and all others are Workers
      if (taskPartition == 0) {
        functions = GraphFunctions.MASTER_ONLY;
      } else {
        functions = GraphFunctions.WORKER_ONLY;
      }
    } else {
      // With an internally started ZooKeeper service, the Task running it
      // acts as Master and ZooKeeper, while all the others are Workers
      if ((zkManager != null) && zkManager.runsZooKeeper()) {
        functions = GraphFunctions.MASTER_ZOOKEEPER_ONLY;
      } else {
        functions = GraphFunctions.WORKER_ONLY;
      }
    }
  }
  return functions;
}

determineGraphFunctions decides the role each Task plays. Six roles are defined:

  • UNKNOWN

    The Task's role is not yet known

  • MASTER_ONLY

    The Task is only a Master

  • MASTER_ZOOKEEPER_ONLY

    The Task is both a Master and a ZooKeeper

  • WORKER_ONLY

    The Task is only a Worker

  • ALL

    The Task is a Master, a Worker and a ZooKeeper

  • ALL_EXCEPT_ZOOKEEPER

    The Task is both a Master and a Worker

Initializing the BSP Service

After the role of each Task has been decided, the system calls instantiateBspService to initialize the BSP service.

org.apache.giraph.graph.GraphTaskManager#instantiateBspService

private void instantiateBspService()
    throws IOException, InterruptedException {
  if (graphFunctions.isMaster()) {
    ••••••
    // Create the Master service object
    serviceMaster = new BspServiceMaster<I, V, E>(context, this);
    // The Master runs in its own thread
    masterThread = new MasterThread<I, V, E>(serviceMaster, context);
    masterThread.setUncaughtExceptionHandler(
        createUncaughtExceptionHandler());
    masterThread.start();
  }
  if (graphFunctions.isWorker()) {
    ••••••
    // Create the Worker service object
    serviceWorker = new BspServiceWorker<I, V, E>(context, this);
    installGCMonitoring();
    ••••••
  }
}

In instantiateBspService, a Task acting as Master creates the serviceMaster object and starts a MasterThread, while a Task acting as Worker creates the serviceWorker object.

Summary

Overall, Giraph's Job submission and initialization follows this flow:

  1. The user submits a Job to Giraph
  2. Giraph submits a Job to Hadoop
  3. Hadoop starts the MapTasks, which execute GraphMapper's run method
  4. GraphMapper creates a GraphTaskManager object and initializes it
  5. Initialization first obtains the ZooKeeper connection information; if there is no external ZooKeeper, one of the MapTasks is selected to start the ZooKeeper service
  6. With the ZooKeeper connection information in hand, determineGraphFunctions assigns roles, distinguishing Master and Worker among the MapTasks
  7. After the roles are assigned, instantiateBspService initializes the BSP service, which concludes the initialization process