Preface
This article analyzes the Job submission and initialization process of Giraph 1.3-SNAPSHOT. The submission part is analyzed against Hadoop running in Standalone mode and covers only the code paths executed locally; the initialization part is analyzed mainly against cluster mode.
Example Job
This part is not Giraph source code but an example Job used to drive the analysis; its configuration and execution are described in the article 《Giraph 编程实践及源码调试》. A typical Job submission looks like this:
```java
GiraphConfiguration conf = new GiraphConfiguration(new Configuration());
conf.setComputationClass(Shortestpath.class);
conf.setVertexInputFormatClass(JsonLongDoubleFloatDoubleVertexInputFormat.class);
conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
conf.setLocalTestMode(true);
conf.setWorkerConfiguration(1, 1, 100);
GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);

GiraphJob job = new GiraphJob(conf, Shortestpath.class.getSimpleName());
GiraphTextInputFormat.setVertexInputPath(conf, new Path(INPUT_PATH));
GiraphTextOutputFormat.setOutputPath(job.getInternalJob(), new Path(OUTPUT_PATH));
// ......
job.run(true);
```
The code first sets a series of parameters and then calls job.run(true) to submit the Job to Giraph.
Giraph Submits the Job to Hadoop
Giraph is built on top of Hadoop, so after a Job is submitted to Giraph, Giraph in turn submits a Job to Hadoop internally. This part analyzes how that happens. First, look at the run method:
org.apache.giraph.job.GiraphJob#run
```java
public final boolean run(boolean verbose)
    throws IOException, InterruptedException, ClassNotFoundException {
  setIntConfIfDefault("mapreduce.job.counters.limit", 512);
  setIntConfIfDefault("mapred.job.map.memory.mb", 1024);
  setIntConfIfDefault("mapred.job.reduce.memory.mb", 0);

  giraphConfiguration.setBoolean(
      "mapred.map.tasks.speculative.execution", false);
  Client.setPingInterval(giraphConfiguration, 60000 * 5);
  giraphConfiguration.setBoolean("mapreduce.user.classpath.first", true);
  giraphConfiguration.setBoolean("mapreduce.job.user.classpath.first", true);

  if (giraphConfiguration.getCheckpointFrequency() == 0) {
    int oldMaxTaskAttempts = giraphConfiguration.getMaxTaskAttempts();
    giraphConfiguration.setMaxTaskAttempts(1);
    // ......
  }

  ImmutableClassesGiraphConfiguration conf =
      new ImmutableClassesGiraphConfiguration(giraphConfiguration);
  checkLocalJobRunnerConfiguration(conf);

  int tryCount = 0;
  GiraphJobRetryChecker retryChecker = conf.getJobRetryChecker();
  while (true) {
    // ......
    tryCount++;

    Job submittedJob = new Job(conf, jobName);
    if (submittedJob.getJar() == null) {
      submittedJob.setJarByClass(getClass());
    }
    submittedJob.setNumReduceTasks(0);
    submittedJob.setMapperClass(GraphMapper.class);
    submittedJob.setInputFormatClass(BspInputFormat.class);
    submittedJob.setOutputFormatClass(
        GiraphConstants.HADOOP_OUTPUT_FORMAT_CLASS.get(conf));
    // ......

    submittedJob.submit();
    // ......
    boolean passed = submittedJob.waitForCompletion(verbose);
    // ......
    if (!passed) {
      String restartFrom = retryChecker.shouldRestartCheckpoint(submittedJob);
      if (restartFrom != null) {
        GiraphConstants.RESTART_JOB_ID.set(conf, restartFrom);
        continue;
      }
    }

    if (passed || !retryChecker.shouldRetry(submittedJob, tryCount)) {
      return passed;
    }
    // ......
  }
}
```
run first configures Hadoop and Giraph, then creates a Hadoop Job object. After the MapperClass, input/output formats, and other settings are in place, it calls submit to hand the Job to Hadoop. Note that the number of reduce tasks is set to 0: a Giraph Job runs as a map-only Hadoop Job, and the whole BSP computation happens inside the map tasks. Apart from that, the process is essentially the same as submitting an ordinary Hadoop Job.
Execution Inside Hadoop
After Giraph calls submit, execution moves into Hadoop itself. The main thing to understand here is how Hadoop starts the MapTasks that host Giraph.
Submitting the Job Internally
org.apache.hadoop.mapreduce.Job#submit
```java
public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
        ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  // ......
}
```
submit creates a JobSubmitter object and then submits the Job one level deeper through submitJobInternal.
org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal
```java
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {
  // ......
  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);

  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  // ......
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  // ......
  status = submitClient.submitJob(
      jobId, submitJobDir.toString(), job.getCredentials());
  // ......
}
```
In submitJobInternal, Hadoop actually submits the Job through submitClient. submitClient is a ClientProtocol, an interface with two implementations (the other being YARNRunner); because the Job here was submitted with Hadoop running in Standalone mode, the implementation is LocalJobRunner.
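For reference, which implementation the submitter gets is governed by the mapreduce.framework.name property; a minimal sketch (Hadoop 2.x property names):

```java
import org.apache.hadoop.conf.Configuration;

// Sketch: the property that decides which ClientProtocol implementation the
// submitter receives. "local" maps to LocalJobRunner (Standalone mode);
// "yarn" maps to YARNRunner.
public class FrameworkNameSketch {
  public static Configuration localConf() {
    Configuration conf = new Configuration();
    conf.set("mapreduce.framework.name", "local"); // the Standalone-mode default
    return conf;
  }
}
```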
Starting the MapTask
org.apache.hadoop.mapred.LocalJobRunner#submitJob
```java
public org.apache.hadoop.mapreduce.JobStatus submitJob(
    org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
    Credentials credentials) throws IOException {
  Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);
  job.job.setCredentials(credentials);
  return job.status;
}
```
org.apache.hadoop.mapred.LocalJobRunner.Job#Job
```java
public Job(JobID jobid, String jobSubmitDir) throws IOException {
  // ......
  this.start();
}
```
submitJob creates a Job object; this Job is an inner class of LocalJobRunner that extends Thread. As the constructor shows, creating the Job also starts its thread, so the next place to look is Job#run.
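The pattern is easy to miss: the constructor starts the thread itself, so merely constructing the object kicks off run() asynchronously. A minimal illustration of the same pattern (not Giraph/Hadoop code):

```java
// A Thread subclass that starts itself from its constructor, as
// LocalJobRunner.Job does: new SelfStarting() alone triggers run().
public class SelfStarting extends Thread {
  public SelfStarting() {
    // ... field initialization ...
    this.start(); // run() begins on a new thread before the constructor returns
  }

  @Override
  public void run() {
    System.out.println("running on " + Thread.currentThread().getName());
  }

  public static void main(String[] args) {
    new SelfStarting(); // prints from a thread other than main
  }
}
```

Starting a thread from a constructor is usually discouraged (the object escapes before construction completes), but it keeps LocalJobRunner's submission path compact.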
org.apache.hadoop.mapred.LocalJobRunner.Job#run
```java
@Override
public void run() {
  JobID jobId = profile.getJobID();
  JobContext jContext = new JobContextImpl(job, jobId);
  // ......
  Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
      Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());

  List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(
      taskSplitMetaInfos, jobId, mapOutputFiles);

  initCounters(mapRunnables.size(), numReduceTasks);
  ExecutorService mapService = createMapExecutor();
  runTasks(mapRunnables, mapService, "map");
  // ......
  outputCommitter.commitJob(jContext);
  status.setCleanupProgress(1.0f);
  // ......
}
```
org.apache.hadoop.mapred.LocalJobRunner.Job#getMapTaskRunnables
```java
protected List<RunnableWithThrowable> getMapTaskRunnables(
    TaskSplitMetaInfo[] taskInfo, JobID jobId,
    Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {

  int numTasks = 0;
  ArrayList<RunnableWithThrowable> list =
      new ArrayList<RunnableWithThrowable>();
  for (TaskSplitMetaInfo task : taskInfo) {
    list.add(new MapTaskRunnable(task, numTasks++, jobId, mapOutputFiles));
  }
  return list;
}
```
org.apache.hadoop.mapred.LocalJobRunner.Job#runTasks
```java
private void runTasks(List<RunnableWithThrowable> runnables,
    ExecutorService service, String taskType) throws Exception {
  for (Runnable r : runnables) {
    service.submit(r);
  }

  try {
    service.shutdown();
    LOG.info("Waiting for " + taskType + " tasks");
    service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
  } catch (InterruptedException ie) {
    service.shutdownNow();
    throw ie;
  }
  // ......
}
```
In Job#run the parts to focus on are how the MapTaskRunnables are created and executed: Hadoop uses getMapTaskRunnables to build one MapTaskRunnable per assigned task, and runTasks then submits them to a thread pool.
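runTasks relies on the standard submit / shutdown / awaitTermination idiom; a minimal, self-contained sketch of the same idiom:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// The executor idiom used by runTasks: submit everything, stop accepting
// new work, then block until all submitted tasks have finished.
public final class RunAll {
  public static void runAll(List<Runnable> tasks) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is arbitrary here
    for (Runnable t : tasks) {
      pool.submit(t);
    }
    pool.shutdown(); // no new tasks; already-submitted ones keep running
    pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); // wait for completion
  }
}
```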
Once the MapTaskRunnables are in the pool, the next stop is MapTaskRunnable#run.
org.apache.hadoop.mapred.LocalJobRunner.Job.MapTaskRunnable#run
```java
public void run() {
  try {
    // ......
    MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,
        info.getSplitIndex(), 1);
    // ......
    map.run(localConf, Job.this);
    // ......
  } catch (Throwable e) {
    this.storedException = e;
  }
}
```
MapTaskRunnable#run creates a MapTask object and calls MapTask#run.
org.apache.hadoop.mapred.MapTask#run
```java
@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
  // ......
  boolean useNewApi = job.getUseNewMapper();
  // ......
  if (useNewApi) {
    runNewMapper(job, splitMetaInfo, umbilical, reporter);
  } else {
    runOldMapper(job, splitMetaInfo, umbilical, reporter);
  }
  done(umbilical, reporter);
}
```
Because setUseNewAPI() was invoked during submission, useNewApi is true here, so MapTask#run calls runNewMapper; continue with that method.
org.apache.hadoop.mapred.MapTask#runNewMapper
```java
private <INKEY, INVALUE, OUTKEY, OUTVALUE>
void runNewMapper(final JobConf job, final TaskSplitIndex splitIndex,
    final TaskUmbilicalProtocol umbilical, TaskReporter reporter)
    throws IOException, ClassNotFoundException, InterruptedException {

  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(
          job, getTaskID(), reporter);

  org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>)
          ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  // ...... (input, output, committer, and split are created here)
  org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
      mapContext = new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
          job, getTaskID(), input, output, committer, reporter, split);

  org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context
      mapperContext = new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>()
          .getMapContext(mapContext);

  try {
    // ......
    mapper.run(mapperContext);
    // ......
  } finally {
    // ......
  }
}
```
runNewMapper instantiates the configured MapperClass via reflection, i.e. the GraphMapper class set in org.apache.giraph.job.GiraphJob#run. Once the GraphMapper object exists, its run method is called, and execution crosses into Giraph.
Giraph Executes the Job
org.apache.giraph.graph.GraphMapper#run
```java
@Override
public void run(Context context) throws IOException, InterruptedException {
  try {
    setup(context);
    while (context.nextKeyValue()) {
      graphTaskManager.execute();
    }
    cleanup(context);
  } catch (RuntimeException e) {
    // ......
  }
}
```
GraphMapper#run shows that a Giraph Job executes in three phases: initialization (setup), computation (the graphTaskManager.execute() loop), and cleanup (cleanup).
The following sections analyze the initialization phase.
Initialization
org.apache.giraph.graph.GraphMapper#setup
```java
@Override
public void setup(Context context) throws IOException, InterruptedException {
  graphTaskManager = new GraphTaskManager<I, V, E>(context);
  graphTaskManager.setup(
      DistributedCache.getLocalCacheArchives(context.getConfiguration()));
}
```
GraphMapper#setup creates a GraphTaskManager object and calls its setup method.
org.apache.giraph.graph.GraphTaskManager#setup
```java
public void setup(Path[] zkPathList)
    throws IOException, InterruptedException {
  Configuration hadoopConf = context.getConfiguration();
  conf = new ImmutableClassesGiraphConfiguration<I, V, E>(hadoopConf);
  // ......
  String serverPortList = conf.getZookeeperList();
  if (serverPortList.isEmpty()) {
    if (startZooKeeperManager()) {
      return;
    }
  } else {
    createZooKeeperCounter(serverPortList);
  }
  // ......
  this.graphFunctions = determineGraphFunctions(conf, zkManager);
  if (zkManager != null && this.graphFunctions.isMaster()) {
    zkManager.cleanupOnExit();
  }
  try {
    instantiateBspService();
  } catch (IOException e) {
    // ......
  }
}
```
GraphTaskManager#setup mainly does three things:
Obtain the ZooKeeper connection information
Determine the role of each process
Initialize the BSP service
Obtaining the ZooKeeper Connection Information
GraphTaskManager#setup obtains the ZooKeeper connection string via conf.getZookeeperList(). If an external ZooKeeper was configured, the connection string is returned directly; otherwise getZookeeperList() returns an empty value, and GraphTaskManager#setup calls startZooKeeperManager to start ZooKeeper on one of the Tasks.
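For completeness, a sketch of providing an external ZooKeeper quorum, assuming GiraphConstants.ZOOKEEPER_LIST (the giraph.zkList option) is the setting read back by getZookeeperList(); with it set, no Task starts its own ZooKeeper:

```java
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.hadoop.conf.Configuration;

// Hypothetical quorum addresses; the option follows the
// GiraphConstants.X.set(conf, value) pattern used in the example job above.
public class ExternalZkSketch {
  public static GiraphConfiguration withExternalZk() {
    GiraphConfiguration conf = new GiraphConfiguration(new Configuration());
    GiraphConstants.ZOOKEEPER_LIST.set(conf, "zk1:2181,zk2:2181,zk3:2181");
    return conf;
  }
}
```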
org.apache.giraph.graph.GraphTaskManager#startZooKeeperManager
```java
private boolean startZooKeeperManager()
    throws IOException, InterruptedException {
  zkManager = new ZooKeeperManager(context, conf);
  context.setStatus("setup: Setting up Zookeeper manager.");
  zkManager.setup();
  if (zkManager.computationDone()) {
    done = true;
    return true;
  }
  zkManager.onlineZooKeeperServer();
  String serverPortList = zkManager.getZooKeeperServerPortString();
  conf.setZookeeperList(serverPortList);
  createZooKeeperCounter(serverPortList);
  return false;
}
```
startZooKeeperManager first creates a ZooKeeperManager object and then calls its setup method.
org.apache.giraph.zk.ZooKeeperManager#setup
```java
public void setup() throws IOException, InterruptedException {
  createCandidateStamp();
  getZooKeeperServerList();
}
```
ZooKeeperManager#setup first calls createCandidateStamp.
org.apache.giraph.zk.ZooKeeperManager#createCandidateStamp
```java
public void createCandidateStamp() {
  // ......
  fs.mkdirs(baseDirectory);
  // ......
  fs.mkdirs(serverDirectory);
  // ......
  if (!fs.getFileStatus(baseDirectory).isDir()) {
    throw new IllegalArgumentException(
        "createCandidateStamp: " + baseDirectory +
        " is not a directory, but should be.");
  }
  // ......
  Path myCandidacyPath = new Path(
      taskDirectory, myHostname +
      HOSTNAME_TASK_SEPARATOR + taskPartition);
  try {
    // ......
    fs.createNewFile(myCandidacyPath);
  } catch (IOException e) {
    LOG.error("createCandidateStamp: Failed (maybe previous task " +
        "failed) to create filestamp " + myCandidacyPath, e);
  }
}
```
In createCandidateStamp, each Task creates a file named after its own hostname and taskPartition under _bsp/_defaultZkManagerDir/_task; these files are used later when the system chooses the Task that will run the ZooKeeper service. The result looks like the figure below:
The hostname in the figure is localhost because the source was run with Hadoop in Standalone mode.
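A sketch of how each Task's candidacy file path is assembled; the directory layout is taken from the text above, while the separator literal stands in for HOSTNAME_TASK_SEPARATOR and is an assumption here:

```java
import org.apache.hadoop.fs.Path;

// Each task advertises itself as a ZooKeeper candidate by creating an empty
// file "<hostname><separator><taskPartition>" in the shared task directory.
public class CandidacyPathSketch {
  public static Path candidacyPath(String myHostname, int taskPartition) {
    String separator = " "; // stands in for HOSTNAME_TASK_SEPARATOR (assumed)
    Path taskDirectory = new Path("_bsp/_defaultZkManagerDir/_task");
    // fs.createNewFile(...) on the returned path publishes the candidacy.
    return new Path(taskDirectory, myHostname + separator + taskPartition);
  }
}
```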
After createCandidateStamp completes, ZooKeeperManager#setup goes on to call getZooKeeperServerList.
org.apache.giraph.zk.ZooKeeperManager#getZooKeeperServerList
```java
private void getZooKeeperServerList()
    throws IOException, InterruptedException {
  String serverListFile;

  if (taskPartition == 0) {
    serverListFile = getServerListFile();
    if (serverListFile == null) {
      createZooKeeperServerList();
    }
  }

  while (true) {
    serverListFile = getServerListFile();
    // ......
    if (serverListFile != null) {
      break;
    }
    try {
      Thread.sleep(pollMsecs);
    } catch (InterruptedException e) {
      LOG.warn("getZooKeeperServerList: Strange interrupted " +
          "exception " + e.getMessage());
    }
  }

  String[] serverHostList = serverListFile.substring(
      ZOOKEEPER_SERVER_LIST_FILE_PREFIX.length()).split(
      HOSTNAME_TASK_SEPARATOR);
  // ......
  zkServerHost = serverHostList[0];
  zkServerTask = Integer.parseInt(serverHostList[1]);
  updateZkPortString();
}
```
getZooKeeperServerList branches on taskPartition: Task 0 first checks for an existing serverListFile and calls createZooKeeperServerList to create one if needed (the file's name encodes the hostname and taskPartition of the Task that will run ZooKeeper). Every Task then polls until the serverListFile appears and parses its name to set zkServerHost, zkServerTask, and zkServerPortString.
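To make the naming convention concrete, a sketch; the prefix and separator literals are assumptions standing in for ZOOKEEPER_SERVER_LIST_FILE_PREFIX and HOSTNAME_TASK_SEPARATOR:

```java
// The server-list file carries its payload in its *name*, not its contents:
// "<prefix><host><sep><task>" identifies the task chosen to run ZooKeeper.
public class ServerListNameSketch {
  public static void main(String[] args) {
    String prefix = "zkServerList_";               // assumed literal
    String sep = " ";                              // assumed literal
    String serverListFile = prefix + "localhost" + sep + "0";

    String[] parts = serverListFile.substring(prefix.length()).split(sep);
    String zkServerHost = parts[0];                // "localhost"
    int zkServerTask = Integer.parseInt(parts[1]); // 0
    System.out.println(zkServerHost + " / " + zkServerTask);
  }
}
```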
Next, createZooKeeperServerList and getServerListFile are examined to better understand how the Task that starts the ZooKeeper service is chosen.
org.apache.giraph.zk.ZooKeeperManager#createZooKeeperServerList
```java
private void createZooKeeperServerList()
    throws IOException, InterruptedException {
  String host;
  String task;

  while (true) {
    FileStatus[] fileStatusArray = fs.listStatus(taskDirectory);
    if (fileStatusArray.length > 0) {
      FileStatus fileStatus = fileStatusArray[0];
      String[] hostnameTaskArray =
          fileStatus.getPath().getName().split(
              HOSTNAME_TASK_SEPARATOR);
      // ......
      host = hostnameTaskArray[0];
      task = hostnameTaskArray[1];
      break;
    }
    Thread.sleep(pollMsecs);
  }

  String serverListFile =
      ZOOKEEPER_SERVER_LIST_FILE_PREFIX + host +
      HOSTNAME_TASK_SEPARATOR + task;
  Path serverListPath =
      new Path(baseDirectory, serverListFile);
  // ......
  fs.createNewFile(serverListPath);
}
```
createZooKeeperServerList lists the files created by the Tasks in createCandidateStamp, takes the first element of the returned array, and uses the Task information encoded in that file's name to create the serverListFile.
org.apache.giraph.zk.ZooKeeperManager#getServerListFile
```java
private String getServerListFile() throws IOException {
  String serverListFile = null;
  FileStatus[] fileStatusArray = fs.listStatus(baseDirectory);
  for (FileStatus fileStatus : fileStatusArray) {
    if (fileStatus.getPath().getName().startsWith(
        ZOOKEEPER_SERVER_LIST_FILE_PREFIX)) {
      serverListFile = fileStatus.getPath().getName();
      break;
    }
  }
  return serverListFile;
}
```
getServerListFile lists the file metadata under baseDirectory, filters for the serverListFile by its prefix, and returns its name.
Back in startZooKeeperManager: after the Task that will run ZooKeeper has been chosen, the system first checks whether the computation is already done, in which case there is nothing left to run. Otherwise it calls onlineZooKeeperServer to bring the ZooKeeper service online.
org.apache.giraph.zk.ZooKeeperManager#onlineZooKeeperServer
```java
public void onlineZooKeeperServer() throws IOException {
  if (zkServerTask == taskPartition) {
    File zkDirFile = new File(this.zkDir);
    try {
      // ......
      FileUtils.deleteDirectory(zkDirFile);
    } catch (IOException e) {
      // ......
    }
    generateZooKeeperConfig();
    synchronized (this) {
      zkRunner = createRunner();
      int port = zkRunner.start(zkDir, config);
      if (port > 0) {
        zkBasePort = port;
        updateZkPortString();
      }
    }

    int connectAttempts = 0;
    final int maxConnectAttempts =
        conf.getZookeeperConnectionAttempts();
    while (connectAttempts < maxConnectAttempts) {
      try {
        // ......
        InetSocketAddress zkServerAddress =
            new InetSocketAddress(myHostname, zkBasePort);
        Socket testServerSock = new Socket();
        testServerSock.connect(zkServerAddress, 5000);
        // ......
        break;
      } catch (SocketTimeoutException e) {
        LOG.warn("onlineZooKeeperServers: Got " +
            "SocketTimeoutException", e);
      } catch (ConnectException e) {
        LOG.warn("onlineZooKeeperServers: Got " +
            "ConnectException", e);
      } catch (IOException e) {
        LOG.warn("onlineZooKeeperServers: Got " +
            "IOException", e);
      }

      ++connectAttempts;
      try {
        Thread.sleep(pollMsecs);
      } catch (InterruptedException e) {
        LOG.warn("onlineZooKeeperServers: Sleep of " + pollMsecs +
            " interrupted - " + e.getMessage());
      }
    }
    if (connectAttempts == maxConnectAttempts) {
      throw new IllegalStateException(
          "onlineZooKeeperServers: Failed to connect in " +
          connectAttempts + " tries!");
    }

    Path myReadyPath = new Path(
        serverDirectory, myHostname +
        HOSTNAME_TASK_SEPARATOR + taskPartition +
        HOSTNAME_TASK_SEPARATOR + zkBasePort);
    try {
      // ......
      fs.createNewFile(myReadyPath);
    } catch (IOException e) {
      // ......
    }
  } else {
    int readyRetrievalAttempt = 0;
    String foundServer = null;
    while (true) {
      try {
        FileStatus[] fileStatusArray =
            fs.listStatus(serverDirectory);
        if ((fileStatusArray != null) &&
            (fileStatusArray.length > 0)) {
          for (int i = 0; i < fileStatusArray.length; ++i) {
            String[] hostnameTaskArray =
                fileStatusArray[i].getPath().getName().split(
                    HOSTNAME_TASK_SEPARATOR);
            if (hostnameTaskArray.length != 3) {
              throw new RuntimeException(
                  "getZooKeeperServerList: Task 0 failed " +
                  "to parse " +
                  fileStatusArray[i].getPath().getName());
            }
            foundServer = hostnameTaskArray[0];
            zkBasePort = Integer.parseInt(hostnameTaskArray[2]);
            updateZkPortString();
          }
          // ......
          if (zkServerHost.equals(foundServer)) {
            break;
          }
        } else {
          // ......
        }
        Thread.sleep(pollMsecs);
        ++readyRetrievalAttempt;
      } catch (IOException e) {
        throw new RuntimeException(e);
      } catch (InterruptedException e) {
        // ......
      }
    }
  }
}
```
onlineZooKeeperServer effectively does two things: the chosen Task starts the ZooKeeper service and creates a file announcing that the service is ready, while every other Task polls for that file and updates its ZooKeeper connection information from it.
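The probe the serving Task uses to confirm ZooKeeper is up is an ordinary TCP connect with retries; a self-contained sketch of that loop (host, port, and retry values are illustrative):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Sketch of the readiness probe in onlineZooKeeperServer: attempt a TCP
// connection to the ZooKeeper port until it succeeds or attempts run out.
public final class ZkProbe {
  public static boolean waitForZooKeeper(String host, int port,
      int maxAttempts, long pollMsecs) throws InterruptedException {
    InetSocketAddress addr = new InetSocketAddress(host, port);
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      try (Socket probe = new Socket()) {
        probe.connect(addr, 5000); // 5-second connect timeout, as in Giraph
        return true;               // connected: the server is serving
      } catch (IOException e) {
        Thread.sleep(pollMsecs);   // not reachable yet; poll again
      }
    }
    return false;
  }
}
```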
Assigning Roles
After the ZooKeeper service is up, the system updates the ZooKeeper-related configuration and returns to org.apache.giraph.graph.GraphTaskManager#setup, which then calls determineGraphFunctions.
org.apache.giraph.graph.GraphTaskManager#determineGraphFunctions
```java
private static GraphFunctions determineGraphFunctions(
    ImmutableClassesGiraphConfiguration conf,
    ZooKeeperManager zkManager) {
  boolean splitMasterWorker = conf.getSplitMasterWorker();
  int taskPartition = conf.getTaskPartition();
  boolean zkAlreadyProvided = conf.isZookeeperExternal();
  GraphFunctions functions = GraphFunctions.UNKNOWN;

  if (!splitMasterWorker) {
    if ((zkManager != null) && zkManager.runsZooKeeper()) {
      functions = GraphFunctions.ALL;
    } else {
      functions = GraphFunctions.ALL_EXCEPT_ZOOKEEPER;
    }
  } else {
    if (zkAlreadyProvided) {
      if (taskPartition == 0) {
        functions = GraphFunctions.MASTER_ONLY;
      } else {
        functions = GraphFunctions.WORKER_ONLY;
      }
    } else {
      if ((zkManager != null) && zkManager.runsZooKeeper()) {
        functions = GraphFunctions.MASTER_ZOOKEEPER_ONLY;
      } else {
        functions = GraphFunctions.WORKER_ONLY;
      }
    }
  }
  return functions;
}
```
determineGraphFunctions decides each Task's role. The system defines six roles:
UNKNOWN
the Task's role has not been determined yet
MASTER_ONLY
the Task is only a Master
MASTER_ZOOKEEPER_ONLY
the Task is both a Master and a ZooKeeper
WORKER_ONLY
the Task is only a Worker
ALL
the Task is a Master, a Worker, and a ZooKeeper
ALL_EXCEPT_ZOOKEEPER
the Task is both a Master and a Worker
For the example Job at the top (SPLIT_MASTER_WORKER set to false and no external ZooKeeper), this logic assigns ALL to the Task that runs ZooKeeper and ALL_EXCEPT_ZOOKEEPER to every other Task.
Initializing the BSP Service
After each Task's role has been decided, the system calls instantiateBspService to initialize the BSP service.
org.apache.giraph.graph.GraphTaskManager#instantiateBspService
```java
private void instantiateBspService()
    throws IOException, InterruptedException {
  if (graphFunctions.isMaster()) {
    // ......
    serviceMaster = new BspServiceMaster<I, V, E>(context, this);
    masterThread = new MasterThread<I, V, E>(serviceMaster, context);
    masterThread.setUncaughtExceptionHandler(
        createUncaughtExceptionHandler());
    masterThread.start();
  }
  if (graphFunctions.isWorker()) {
    // ......
    serviceWorker = new BspServiceWorker<I, V, E>(context, this);
    installGCMonitoring();
    // ......
  }
}
```
For a Master, instantiateBspService creates the serviceMaster object and starts the MasterThread; for a Worker it creates the serviceWorker object. Note that the two branches are not exclusive: a Task whose role is ALL or ALL_EXCEPT_ZOOKEEPER passes both checks and hosts both services.
Summary
Overall, Giraph Job submission and initialization follow this flow:
The user submits the Job to Giraph
Giraph submits the Job to Hadoop
Hadoop starts the MapTasks and runs GraphMapper's run method
GraphMapper creates a GraphTaskManager object and initializes it
Initialization first obtains the ZooKeeper connection information; without an external ZooKeeper, one Task is chosen from all the MapTasks to start the ZooKeeper service
With the ZooKeeper connection information in hand, determineGraphFunctions assigns roles, distinguishing Master and Worker among the MapTasks
Finally, instantiateBspService initializes the BSP service, which completes initialization