Category Archives: 并行计算

基于MPI的分布式LR、FM、FFM模型

        MPI版本的分布式LR模型是最先实现的,我在读研一时就有这个打算,当时就想着用自己实现的分布式模型做CTR预估,直到去年才真正实现。将代码实现的一些细节简单记录一下,方便以后继续优化。代码地址在:https://github.com/xswang/Field-aware-Factorization-Machine-mpi

       程序启动后会根据参数启动对应的进程数,每个进程功能是等价的,都要负责计算,不过会指定某个节点做参数的allreduce, 在代码中我指定了rank 0。

       hosts文件中是集群IP地址。

       n2n_config.h是配置文件,用来设置是否online,FTRL的各种参数等等。

       main.cpp主要就是MPI rank设置,程序启动过程。

       param.h是参数的设置,基本不需要改动。

       predict.h是计算AUC的,是个分布式来算的。基于MPI实现的可分布式计算AUC的代码在这里:https://github.com/xswang/AUC-caculate-mpi , 就不再单独用一篇笔记解释代码,一般每人关注AUC的算法细节,只需要记得在算ROC曲线下面积时按纵横轴scale就行了,剩下的就很好理解。

       learner中提供了几种优化算法,主要是FtrlLearner,在这个算法实现中,通过一些方式可以在实现LR模型的同时,也实现了FM和FFM模型。OWLQN没在这里实现,可以看这里:https://github.com/xswang/logistic-regression-owlqn-mpi/blob/master/src/owlqn.h,不过这个并行的OWLQN并没有有来的及做正确性测试,所以它目前是不能用的。

       在ftrl_learner.h中update_w()函数和update_v()函数,其中update_w()函数就是标准的参数w的更新算法,update_v是针对FM和FFM模型的更新算法,在第98行,如果用户配置只是用LR模型,直接break。第102行,f是FFM模型中每个特征拥有的latnet factor vector个数,如果用户只是用FM模型,则将f设置为0,也就是每个特征只对应一个latent factor vector,这针对这一个vector更行,否则将更新特征对应的所有的latent factor vector.

       在ftrl_learner.cpp中,run()函数是入口,根据配置文件选择是哪种训练方式。无论哪种方式,都使用了线程池,多线程之间需要同步的地方通过加锁实现。allreduce_gradient()函数主要负责梯度的收集,allreduce_weight负责模型更新和发送。整个过程体现了星型拓扑结构:rank 0作为主节点,收集梯度并更新模型之后,将参数发送给其它节点。

       在代码实现时用到了openblas科学计算库已经一个修改过的线程池。

       在代码写完之后,我使用了三台机器对LR和FM模型进行了测试,根据测试集AUC变化情况,FM的ROC曲线在最开始更陡峭,但是最终和LR差不多,也就是FM能更快的学习,但是最终结果并不明显比LR好,这可能也和当时没有充分调参有关。不过至少说明这个分布式FM模型代码是可用的。FFM模型并没有测,主要是速度太慢。

       比较遗憾的是并没有对MPI版本的LR和ps-lite版本的LR进行对比过。

       目前Parameter Server框架比较流行,也许没人会再用MPI同步训练模型了。LR模型基本上都已经在各个公司上线,FM也有不少公司开始探索,基于ps-lite的FM和FFM模型是我的下一个计划。2014时用DNN做CTR预估还是个很神秘的事情,然而现在也被打破了神秘感,不少公司都在探索。不过目前DNN模型还没有一个很好的离散特征值输入的分布式训练系统,线下训练可能会比较慢,上线也比较困难。

       

分布式FTRL优化算法的实现

之前实现过Parameter Server框架下的分布式FTRL优化算法,用的是DMLC的ps-lite。在PS架构下,集群分为worker、server、scheduler三种线程。其中worker负责梯度的计算,server负责参数的更新和分布式存储,scheduler负责集群节点的管理和状态监控。将代码实现简单介绍如下:

        1.  在main.cpp中完成启三个线程的启动过程。

         2. 在worker.cpp中完成梯度的计算并向server 推送算得的梯度。其中,(1)oneline_learning_threadpool是做在线学习用的,数据流主要是从kafka中读取,所有数据只训练一遍;batch_learning_threadpool是做批量学习用的,数据主要是从磁盘或者HDFS中读取,训练数据可以反复训练多次。(2)calculate_batch_gradient_threadpool负责根据样本计算梯度,在计算过程中worker和server通过ps-lite提供的pull/push接口拉取模型参数和推送样本梯度值。针对梯度计算,过程如下:首先,读取一批训练数据,代码328行至339行得到该批次数据的所有特征ID:all_keys,340行至343行进行过滤得到唯一的特征ID,348行从server中拉取所需权重,且权重是按照特征ID值大小排序的。352行至363行,根据pull得到的特征权重,计算该批次中每条样本的w*x乘积,这里有个巧妙的方法可以在O(m)时间复杂度完成计算,最开始的做法是把pull回来的数据放到map中,这样需要m*O(logn)时间复杂度。经过试验对比,O(m)时间复杂度的算法速度快了很多。我曾经尝试过其它的hashmap来替代std::unordered_map,效果都没有明显提升。

        3. 在server.cpp中完成模型的更新,根据FTRL公式,server收到worker推送过来的梯度之后进行一系列的计算更新z和w,从而完成模型的更新。156行的Push函数是ps-lite会调用的回调函数,也是用户实现模型更新逻辑的地方。

        4. dump.cpp函数将二进制的模型文件转成明文。

        5. 在io文件夹中,io.h是所有io接口的声明。load_data_from_local.cc从磁盘读取数据,load_data_from_kafka.cc从kafka中读取流式数据。其中load_minibatch_hash_data_fread函数采用fread函数读取数据,比用std::cin读取一行之后再分割的方法,在速度上快了近8倍。

        代码地址在:https://github.com/xswang/Field-aware-Factorization-Machine-ps ,之前是在三台机器上测试过分布式版本的,目前修改成单机版,如果要改成多机版也非常方便。

TensorFlow的op placementce

关于op在device上的placement策略,目前还没有比较好的方法,TensorFlow目前也只是用了比较简单的策略,细节如下:

   1,用户明确指定了设备号,就尊重用户的意愿,例如:当代码中有with tf.device(“/job:local/task:0/gpu:0”):或者with tf.device(“/job:local/task:0/cpu:0”):

时,就把对应op放到用户明确指定的设备上去。

   2,除了source和sink的其它节点,优先分配设备等级较高的,其中GPU等级高于CPU,因此一般都会被分配到GPU:0。但是有以下三种例外:

2.1,generator节点,也就是0个输入且只有1个输出的节点,这样的节点会被放置到和输出节点相同的设备上。

2.2,元操作节点,例如reshape,该元操作节点放置在被操作的节点上。

2.3,明显需要在CPU上执行的op。

 

   以上就是TensorFlow的placement算法,从以上可以看出,如果想用机器上的多个GPU卡,需要用户明确指出,不然会默认用编号为0的GPU卡。

   在第二步中,在处理2.1和2.2两种情况时,用并查集算法找到整个计算图中的连通分量,节点之间的关联关系就是根据2.1和2.2得到的。找到连通分量之后,有两种处理方式,(1)已知连通分量中某个节点放到哪个设备,就将该连通分量中的所有节点都放置到该设备;(2)连通分量中所有节点都没有被指定设备,则选择设备级别最高的设备放置。

TensorFlow与mxnet分布式框架对比

Mxnet和TensorFlow是目前国内最流行的两个开源分布式深度学习训练系统, 两者在设计上有些不同.

mxnet是Parameter Server架构, server和worker是两个最主要的进程,另外还有个负责集群管理的scheduler进程。server负责分布式存储模型参数,worker负责计算,且worker之间不能直接通信,只能通过server互相影响,一般来说,mxnet常用来做数据并行,每个GPU设备上都训练完整的DL模型。TensorFlow主要由client,server,worker三种进程,它虽然也可以实现parameter server的功能,但它计算节点的通信方式并不是只能靠server来完成,TensorFlow的woker是可以互相通信的,可以根据op的依赖关系主动收发数据。

另一个不同点是关于op和device的对应,一般情况下,mxnet常被用来做数据并行,每个GPU设备上都包含了计算图中所有的op。而TensorFlow是可以由用户指定op的放置的,大部分情况下,一个GPU设备只负责某个或者某几个op的训练任务,因此,也催生了关于op placement算法的研究,Google最近发表了一篇使用LSTM模型进行op placement优化的论文,我也在知乎上回答过该论文的详细内容:https://www.zhihu.com/question/61210638/answer/188682910,那个抱着双手的老虎就是我。

这篇笔记主要对比mxnet和TensorFlow在分布式框架下的一些不同,主要包括:进程类型、如何在分布式环境下启集群、如何做初始化。

1.进程类型

mxnet tensorflow
worker client
server server
scheduler worker

2.网络通信库

Mxnet tensorflow
zmq grpc

3.进程间通信模式

Mxnet tensorflow
Worker与scheduler,控制信息 Client与server,控制信息
Server与scheduler,控制信息 Server与worker,控制信息
Worker与server,数据 Worker与worker,数据

4.通信模式

Mxnet tensorflow
消息(队列),PostOffice负责多机之间消息收发,每个机器节点上都有唯一的postoffice单例对象。 消息(std::map),Rendezvous负责多机之间消息收发。每个机器节点上都有唯一的Rendezvous对象。

5,分布式启动

Mxnet tensorflow
各节点执行相同脚本。

根据集群host配置在每个节点上分别启动worker,server,scheduler进程,并进行集群初始化例如worker和server向scheduler的注册等等。

此时集群各个节点之间通信已ready,必要的节点之间可以互相收发数据。

各节点执行相同脚本。

根据集群host配置在每个节点上分别启动client,server,worker进程。完成集群初始化例如GRPC的service启动等。

此时集群各个节点通信已ready,必要的节点之间可以互相收发数据。

  1. 初始化流程6.1 Mxnet:系统初始化是由ps-lite这个模块完成的。每个节点都有个postoffice对象,该对象在系统初始化时进行一系列操作:
    1.     在构造函数中,根据环境变量设置当前节点的
    2.     根据配置文件给worker,server,scheduler进程分配编号。
    3.     调用Van对象初始化各个节点之间的通信,包含以下几个步骤:
      • 获取role为scheduler的ip和端口地址,并建立本节点与scheduler的通信。
      • 启动一个receiveing线程用来接收remote节点的消息。
      • 发送一个message给scheduler报告自己的情况。
      • 启动一个线程用来向scheduler发送heartbeat。

    6.2 TensorFlow:

    1. 启动GRPC Server
    2. 在各个节点上分别启动一个WorkerService线程和一个MasterService线程。
  2. 消息处理

7.1 Mxnet

worker和server以及worker->scheduler、server->scheduler之间通过发送message进行通信。这些消息都会被postoffice这个单例对象负责处理。

在6.3.2中,zmq收到的消息会被放到customer的ThreadsafeQueue<Message>队列中,在customer的Receiving函数中,从消息队列中取出消息并交给recv_handle_处理。而recv_handle_函数就是KVWorker和KVServer的Process函数。

KVWorker::Process函数是worker进程的函数,只是在收到消息后执行回调,例如对pull到的数据进行计算。

KVServer::Process函数是server进程的函数,收到消息之后,调用request_handle_进行处理。request_handle_是一个函数指针,其指向的函数在用户程序中,由用户负责实现。

7.2 Tensorflow

Tensorflow中数据的发送和接收是由send Op和recv Op分别实现的, 发送的消息由Rendezvous类负责处理。

对于send Op, 首先给要发送的消息构造一个unique key,并且交给Rendezvous处理Rendezvous会根据key在自己的消息map中查找是否有对应的recv请求,如果有,就执行recv注册的回调函数;如果没有,就把<key,message>对存放到自己的消息map中。

对于recv Op, 首先给要接收的消息构造一个unique key,并交由Rendezvous处理,Rendezvous根据key在自己的消息map中查找是否已经存在所需的消息,如果已存在,就直接copy消息;如果不存在,就调用GRPC的接口向remote节点发起异步远程调用。

以上所有,都只是关于分布式框架部分的流程,不涉及深度学习模型分布式训练的业务需求,这些以后会记录下来。

DMLC和Petuum并行计算框架的搭建与使用

        DMLC的ps-lite以及Petuum的bosen两者都是目前比较流行的Parameter Server计算框架的实现,PS(Parameter Server)计算框架在并行机器学习系统中被广泛使用,之前并行机器学习领域比较流行的是基于MPI的All Reduce计算框架,AR(All Reduce)计算框架下,算法需要严格同步,节点之间需要等待; 而PS不需要严格同步,通过一些方法在一定的Error Bound条件下完成异步训练。
        由于是在实际中用到并行模型进行CTR预估训练,因此选择基于ps-lite的difacto以及基于bosen实现的mlr,前者是并行版本的factorization machine, 这个模型也是我在2014年参加Cretio在kaggle上举办的广告CTR预估竞赛时用过的,取得了17名的成绩,现在也被越来越多的公司在实际中使用;后者是并行版本的logistics regression模型,这个是最普遍被使用的模型。
     1 difacto
     编译dmlc-core,编译之前要将difacto_dmlc/dmlc-core/make/config.mk中关于使用HDFS的选项设置为:USE_HDFS = 1
      编译ps-lite, ps-lite依赖6个第三方库:gflags glog protobuf zmq lz4 cityhash。 由于所使用的机器不能连接外网,因此我先将这六个库先下载到ps-lite文件夹下,并且修改difacto_dmlc/ps-lite/make/deps.mk文件,避免在编译之前将已经下载好的第三方库给删除了。
      编译FM模型,进入文件夹:difacto_dmlc/src/difacto,执行命令:make即可,得到的可执行文件在build文件夹中。
    使用yarn提交job, run_yarn.sh内容为:
    ../../dmlc-core/tracker/dmlc_yarn.py –jobname dmlc_wxs –vcores 1 -mem 512 -n 2 -s 1 build/difacto.dmlc guide/dmlc.fm.conf –log_dir=log –sync_timeout 500 -alsologtostderr -v 10
     其中dmlc.fm.conf中内容为:
     train_data = “hdfs:///dmlc/data/agaricus.txt.train”
     val_data = “hdfs:///dmlc/data/agaricus.txt.test”
     model_out = “hdfs:///dmlc/data/out/”
     max_data_pass = 3
    在执行sh run_yarn.sh之前,记得将训练数据和测试数据放到dmlc.fm.conf中指定的路径下。
     过程中遇到过以下几个问题:

     (1)重新编译dmlc-core会遇到错误提示:
    In file included from src/io.cc:17:0: src/io/hdfs_filesys.h:10:18: 致命错误:hdfs.h:没有那个文件或目录 #include <hdfs.h>
  解决方法:设置HADOOP_HDFS_HOME的环境变量为:export HADOOP_HDFS_HOME=/home/worker/xiaoshu/hadoop/hadoop-2.7.1即可。
    2 mlr:
    第一次使用petuum是在2014年的6月份,当时在360搜索广告算法组实习做广告CTR预估。可以用三种方式使用petuum提供的算法:1,Local: 数据放在local机器上,单机训练;2,HDFS + ssh:将数据放到hdfs上,使用ssh提交job并行训练;3,HDFS + YARN,将数据放到hdfs上,使用yarn提交job并行训练。
     2.1  local模式:
        修改launch.py中train_file, test_file, output_file_prefix(输出路径),执行./launch.py
     2.2 HDFS+ssh模式:
        进入文件夹:petuum/bosen, 将defns.mk.template文件名修改为defns.mk,并将其中关于HDFS选项uncomment:  HAS_HDFS = -DHAS_HADOOP # Uncomment this line to enable hadoop。然后make 即可。
        随后进入:petuum/bosen/app/mlr文件夹,执行make命令编译mlr模型。
        进入petuum/bosen/app/mlr/script文件夹,修改launch.py中的几项配置:
   “train_file”: “hdfs://10.101.2.88:9000/user/worker/petuum/mlr/covtype.scale.train.small”
       “test_file”: “hdfs://10.101.2.88:9000/user/worker/petuum/mlr/covtype.scale.test.small”
        “output_file_prefix”: “hdfs://10.101.2.88:9000/user/worker/petuum/mlr/out”
     2.3 HDFS+YARN模式:
        需要用到launch_on_yarn.py和run_local.py这两个文件,只需要修改 run_local.py中的某些配置就可以:
   “train_file”: “hdfs://10.101.2.88:9000/user/worker/petuum/mlr/covtype.scale.train.small”
    “test_file”: “hdfs://10.101.2.88:9000/user/worker/petuum/mlr/covtype.scale.test.small”
    “output_file_prefix”: “hdfs://10.101.2.88:9000/user/worker/petuum/mlr/out”
        然后执行:./launch_on_yarn.py
       遇到的一些问题:
    (1)copy到新机器上直接运行./launch.py,会出现:
        /home/worker/xiaoshu/petuum_local/bosen/app/mlr/bin/mlr_main: error while loading shared libraries: libboost_thread.so.1.58.0: cannot open shared object file: No such file or directory
        重新make就行了。
    (2)执行./launch_on_local_with_hdfs.py
      重新在boson中make,成功;然后在mL中make,提示/usr/bin/ld: cannot find -lhdfs:
      进入/home/worker/xiaoshu/hadoop/hadoop-2.7.1/lib/native,里面有libhadoop.a libhadoop.so    libhadooputils.a  libhdfs.so libhadooppipes.a  libhadoop.so.1.0.0  libhdfs.a         libhdfs.so.0.0.0。
     执行:sudo cp * /usr/lib, 并执行sudo config生效
    (3)在执行./launch_on_local_with_hdfs.py,提示:Environment variable CLASSPATH not set! getJNIEnv: getGlobalJNIEnv failed F0810 04:20:17.638351 39658 hdfs.hpp:185] Cannot connect to HDFS. Host: 10.101.2.88, port: 9000
      执行:export CLASSPATH=`hadoop classpath –glob`:$CLASSPATH;然后即可成功执行./launch_on_local_with_hdfs.py

Hadoop 集群安装过程

        最近研究并行机器学习并将在工作中用来训练CTR模型,因此申请了3台机器(ReadHeat系统),准备把目前比较流行的Parameter server并行计算框架搭建起来,由于会用到HDFS和YARN,因此先安装Hadoop集群,其中一台机器作为master,其余两台机器作为slave。。
    这里初步记录一下搭建Hadoop环境的过程,也欢迎大家提出意见和问题进行交流。
一,环境设置
    1,安装Java,执行命令:yum install java,然后通过whereis java命令可以看到idk所在的路径。
    对三台机器分别进行环境变量的设置:
    有两种方式:
        对所有用户生效:sudo vi /etc/profile,添加
        export JAVA_HOME=/usr/local/jdk1.7/
        export PATH=$JAVA_HOME/bin:$PATH
        然后:  source /etc/profile
        只对当前用户有效:sudo vi ~/.bashrc,添加:
        export JAVA_HOME=/usr/local/jdk1.7/
        export PATH=$JAVA_HOME/bin:$PATH
        然后source ~/.bashrc
    2,修改hosts文件(在三台机器上都进行下面的操作)
        sudo vi /etc/hosts
        添加(或者修改):
        10.101.2.88 master
        10.101.2.89 slave1
        10.101.2.90 slave2
        IP和hostname之间空格分隔。
      3,修改hostname文件(在三台机器上都进行下面的操作)
        sudo vi /etc/hostname
        在master主机上此文件中内容修改为:master
        在slave1主机上此文件中内容修改为:slave1
         在slave2主机上此文件中内容修改为:slave2
      4,ssh免密码登录:
        说明:master必须能免密码登录slave1, slave2, 并且要能免密码登录本机。
        ssh-keygen
        ssh-copy-id worker@10.101.2.89(slave1)
        ssh-copy-id worker@10.101.2.90(slave2)
        cat ~/.ssh/id_rsa.pub>>~/.ssh/authorized_keys(如果authorized_keys中已经有master的rsa,一定要先删除)
二, 安装Hadoop
    1, 下载hadoop-2.7.2, 解压到master机器的任意一个文件夹中
        cd hadoop-2.7.2
        mkdir hdfs;
        mkdir hdfs/name;
        mkdir hdfs/data;
        mkdir tmp
    2, 修改配置文件:
        2.1 ,进入hadoop-2.7.1/etc/hadoop文件夹,core-site配置文件如下:
    <configuration>
    <property>
         <name>hadoop.tmp.dir</name>
         <value>/home/worker/xiaoshu/hadoop/hadoop-2.7.1/tmp</value>
     </property>
     <property>
         <name>fs.defaultFS</name>
         <value>hdfs://master:9000</value>
     </property>
     <property>
        <name>fs.hdfs.impl</name>
        <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
        <description>The FileSystem for hdfs: uris.</description>
     </property>
</configuration>
    2.2,hdfs-site.xml配置内容如下:
<configuration>
    <property>
            <name>dfs.namenode.name.dir</name>
            <value>/home/worker/xiaoshu/hadoop/hadoop-2.7.1/hdfs/name</value>
    </property>
    <property>
            <name>dfs.datanode.data.dir</name>
            <value>/home/worker/xiaoshu/hadoop/hadoop-2.7.1/hdfs/data</value>
    </property>
    <property>
            <name>dfs.replication</name>
            <value>2</value>
    </property>
    <property>
        <name>dfs.permissions</name>
        <value>false</value>
    </property>
</configuration>
    2.3,yarn-site.xml配置内容如下:
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.shuffleHandler</value>
</property>
<property>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>100</value>
</property>
<property>
<name>yarn.resourcemanager.address</name>
<value>master:8032</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>master:8030</value>
</property>
<property>
        <name>yarn.resourcemanager.resource-tracker.address</name>
                <value>master:8031</value>
                    </property>
<property>
        <name>yarn.resourcemanager.admin.address</name>
                <value>master:8033</value>
                    </property>
<property>
        <name>yarn.resourcemanager.webapp.address</name>
                <value>master:8088</value>
                    </property>
</configuration>
    2.4,mapred-site.xml配置内容如下:
<configuration>
    <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
    </property>
</configuration>
    配置完毕之后,将hadoop-2.7.2文件夹分别copy到slave1和slave2机器上,文件夹所在路径最好路径相同。
    3, 格式化namenode:
    只在master机器上执行:
    (格式化之前:要在hadoop-2.7.1/重新创建hdfs文件夹)
    ssh master ‘/home/worker/xiaoshu/hadoop/hadoop-2.7.1/bin/hdfs namenode -format -clusterId cluster1’
    4,启动namenode,datanode和resourceManager、nodeManager
    4.1 启动namenode:
    在master上执行:
    ssh master ‘/home/worker/xiaoshu/hadoop/hadoop-2.7.1/sbin/hadoop-daemon.sh start namenode’
    4.2 启动slave1上的datanode:
    在master上执行:
    ssh slave1 ‘/home/worker/xiaoshu/hadoop/hadoop-2.7.1/sbin/hadoop-daemon.sh start datanode’
    4.3 启动slave2上的datanode:
    ssh slave2 ‘/home/worker/xiaoshu/hadoop/hadoop-2.7.1/sbin/hadoop-daemon.sh start datanode’
    4.4 启动YARN:
    在master上执行:
    ssh master ‘/home/worker/xiaoshu/hadoop/hadoop-2.7.1/sbin/start-yarn.sh’
    最后,在master上输入jps,看到:
    5863 ResourceManager
    5657 NameNode
    5545 Jps
    在两个slave上输入jps看到:
    37136 NodeManager
    37009 DataNode
    86469 Jps
    出现以上结果表明hadoop集群已经安装成功。
    master节点上有NameNode,slave节点上有DataNode说明HDFS已经正常启动;master节点上有ResourceManager,slave节点上有NodeManager,说明YARN已经正常启动。