Giraph 编程实践及源码编译调试

前言

本文主要总结了如何利用 Giraph 提供的 API 实现图计算编程,并说明了将 Giraph 源码导入 IDEA 进行调试的过程。

编程实践

本部分通过实现最短路径算法说明 Giraph 的编程流程

  1. 创建 Maven 工程

  2. 添加相关依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    <dependencies>
    <!--添加 Giraph 依赖-->>
    <dependency>
    <groupId>org.apache.giraph</groupId>
    <artifactId>giraph-core</artifactId>
    <version>1.2.0-hadoop2</version>
    </dependency>

    <!--添加 Hadoop 依赖-->>
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.5.1</version>
    </dependency>

    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.5.1</version>
    </dependency>
    </dependencies>
  3. 创建 ShortestPathComputation 类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    import org.apache.giraph.edge.Edge;
    import org.apache.giraph.graph.BasicComputation;
    import org.apache.giraph.graph.Vertex;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.LongWritable;

    import java.io.IOException;

    public class ShortestPathComputation extends BasicComputation<LongWritable, DoubleWritable,
    FloatWritable, DoubleWritable> {
    /**
    * 需要计算最短路径的源顶点
    */
    private static final int SOURCE_VERTEX = 0;
    /**
    * 表示节点不可达
    */
    private static final double UNREACHABLE = Double.MAX_VALUE;

    /**
    * @param vertex 待处理的顶点
    * @param messages vertex 接收到的来自其余顶点的 message
    */
    @Override
    public void compute(Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
    Iterable<DoubleWritable> messages) throws IOException {
    if (getSuperstep() == 0) {
    //超步 0 时源顶点最短路径设置为 0,其余顶点设置为不可达,并且源顶点需要向其它顶点发送最短距离message
    if (vertex.getId().get() == SOURCE_VERTEX) {
    vertex.setValue(new DoubleWritable(0));
    sendDist(vertex);
    } else {
    vertex.setValue(new DoubleWritable(UNREACHABLE));
    }
    } else {
    //遍历处理从其余顶点收到的 message,
    // 查看 message 中传递的最短距离是否小于当前的最短距离,如果是则进行更新
    for (DoubleWritable message : messages) {
    if (message.get() < vertex.getValue().get()) {
    vertex.setValue(message);
    sendDist(vertex);
    }
    }
    }
    //主动将顶点置于不活跃状态,如果顶点收到 message,系统会将顶点再度激活
    vertex.voteToHalt();
    }

    /**
    * 发送顶点 vertex 到其邻接顶点的最短距离
    */
    private void sendDist(Vertex<LongWritable,
    DoubleWritable, FloatWritable> vertex) {
    for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
    double distance = vertex.getValue().get() + edge.getValue().get();
    sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
    }
    }
    }

    ShortestPathComputation 继承了 BasicComputation,对于 BasicComputation 中指定的四个类型参数,其含义依次为

    • Vertex id

    • Vertex data

    • Edge data

    • Message

    整个计算的过程可以总结为:

    1. Superstep 0 的时候进行初始化,然后源头顶点向邻接顶点发送可能的最短路径 message
    2. 下一 Superstep 邻接顶点处理接收到的 message 然后和顶点值进行比较,如果 message 小于顶点值则对顶点值进行更新,并向邻接顶点发送可能的最短路径 message。
    3. 重复 2 直到所有的顶点处于不活跃状态,最后结束计算进行输出。
  4. 提交验证

    验证部分基于 Pseudo-Distributed 模式的 Hadoop 进行

    • 上传测试文件到 HDFS

      1
      2
      3
      4
      5
      6
      7
      8
      $HADOOP_HOME/bin/hdfs dfs -put giraph_data.txt <your_input_path>/shortestpath
      #giraph_data 中的数据,格式为 [source_id,source_value,[[dest_id, edge_value],...]],
      # 如 [0,0,[[1,1],[3,3]]] 表示顶点序号为 0,顶点值为 0,存在序号 0 到 序号 1 的边,值为 1,存在序号 0 到序号 3 的边,值为 3。
      [0,100,[[1,1],[3,3]]]
      [1,20,[[0,1],[2,2],[3,1]]]
      [2,90,[[1,2],[4,4]]]
      [3,50,[[0,3],[1,1],[4,4]]]
      [4,80,[[3,4],[2,4]]]
    • 将工程打包为 jar 文件,并提交到 Hadoop

      1
      $HADOOP_HOME/bin/hadoop jar Examples.jar org.apache.giraph.GiraphRunner com.ikroal.shortestpath.ShortestPathComputation -vif org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat -vip <your_input_path>/giraph_data.txt -vof org.apache.giraph.io.formats.IdWithValueTextOutputFormat -op <your_output>/shortestpaths -w 1

      提交任务的过程基本与一般的提交过程一致,只是还额外指定了 Giraph 运行所需要的参数,对于相关参数的解释请参考 Giraph 环境搭建

    • 查看结果

      运行之后将会在输出路径下看到输出文件,其内容为

      1
      2
      3
      4
      5
      0  0.0
      1 1.0
      2 3.0
      3 2.0
      4 6.0

总结

Giraph 的编程过程可以总结为:

  • 继承 BasicComputation,实现 compute 方法

    compute 方法主要完成顶点的计算更新和必要的消息发送

  • 命令行提交 Job

    由于系统已经提供了 GiraphRunner 的主类,所以大部分情况下用户不需要编写 Job 的配置和提交过程,但是如果 GiraphRunner 不满足用户需求,用户也可以自定义主类然后命令行提交的时候进行指定

以上仅仅说明了最基本的 Giraph 编程过程,但如果想要实现性能最优的图计算过程,则还需要考虑编程过程中利用 AggregatorsCombiners 机制,相关示例可以参考 giraph 源码的 giraph-examples 部分。

源码编译调试

环境要求

Giraph 源码的编译调试要求 Java 1.8、Maven 3 以上版本以及 Hadoop2.5.1

导入源码

  1. 下载源码

    1
    git clone https://github.com/apache/giraph.git
  2. 编译源码

    1
    mvn -Phadoop_2 -DskipTests clean package

    成功后将会输出

  3. 使用 IDEA 打开 giraph 源码目录

    在 Maven 的 Profies 页面选择 hadoop2 ( 默认是 hadoop1 )

增加自定义入口

Giraph 提供了一个位于 giraph-core/src/main/java/org/apache/giraph 下的入口类 GiraphRunner。但该类比较繁琐并且不能自动删除输出文件,不太利于本地调试阅读 Giraph 的源码。因此最好是增加一个自定义的入口类。

  • 在 org.apache.giraph 包下创建 custom 包

  • 在 custom 包下创建用于测试的 Shortestpath 类(内容与编程实践部分一致)

  • 在 custom 包下创建自定义入口类 CustomRunner

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    import org.apache.giraph.conf.GiraphConfiguration;
    import org.apache.giraph.conf.GiraphConstants;
    import org.apache.giraph.io.formats.*;
    import org.apache.giraph.job.GiraphJob;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    import java.io.File;

    public class CustomRunner {

    /**
    * 输入路径
    */
    private static final String INPUT_PATH = "giraph-core/src/main/resources/input/graph_data.txt";

    /**
    * 输出路径
    */
    private static final String OUTPUT_PATH = "giraph-core/src/main/resources/output/shortestPath";

    public static void main(String[] args) throws Exception {
    GiraphConfiguration conf = new GiraphConfiguration(new Configuration());
    conf.setComputationClass(Shortestpath.class);
    //设置输入和输出格式
    conf.setVertexInputFormatClass(JsonLongDoubleFloatDoubleVertexInputFormat.class);
    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
    //设置本地运行模式
    conf.setLocalTestMode(true);
    //设置 worker 配置
    conf.setWorkerConfiguration(1, 1, 100);
    //可选,如果要学习 Checkpoint 机制应该设置
    conf.setCheckpointFrequency(4);
    GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);

    GiraphJob job = new GiraphJob(conf, Shortestpath.class.getSimpleName());
    //设置输入和输出路径
    GiraphTextInputFormat.setVertexInputPath(conf, new Path(INPUT_PATH));
    GiraphTextOutputFormat.setOutputPath(job.getInternalJob(), new Path(OUTPUT_PATH));
    //删除之前的输出
    deletePath(OUTPUT_PATH, true);
    job.run(true);
    }

    /**
    * 用于删除输出目录
    *
    * @param path 目录路径
    */
    public static void deletePath(String path, boolean isDirectory) {
    File file = new File(path);
    if (file.exists()) {
    //本地目录递归删除
    if (isDirectory) {
    File[] subFiles = file.listFiles();
    for (File subFile : subFiles) {
    if (subFile.isFile()) {
    subFile.delete();
    } else {
    deletePath(subFile.getPath(), true);
    }
    }
    }
    file.delete();
    }
    }
    }
  • 在 resources 文件夹创建 input 文件夹并放入编程实践中的测试数据 graph_data.txt

  • 验证

    修改配置并运行如果在 resources 文件夹下看到输出文件,证明添加自定义入口成功,此时可以进行断点调试

问题

  1. 运行时提示 TestYarnJob 中的 MiniYARNCluster 缺失问题

    对于 Test 部分的内容因为不影响源码阅读,可以将出错部分注释掉

Thanks

  1. Introduction to Apache Giraph
  2. Building and Testing