为什么

目前自己采购了35块树莓派4core4G用来计算Rosetta@home,等到COVID-2019过去之后,会用这些开发板测试TF项目,因为TF的生态链齐全,所以在工业上会考虑使用TF来做为最终的产品技术使用方案。

未来的趋势会面向边缘计算领域,像自动驾驶,智能家居,家庭医疗辅助系统,农业生产,制造业零部件质量检测,工业机械磨损检测等等,都会考虑到数据的实时接受和传输,还有计算成本,在机器学习上,如果依靠云计算平台,去辅助上述这些项目,那么就需要考虑本地到服务中心的网络,带宽延迟,数据安全性,计算实时性的问题。如果依托边缘计算,采用工业的微控制器,在微控制器上部署模型以及Tensorflow Lite用于模型的推演,在本地解决计算高可用问题,无需将数据传递到公网上,减少带宽的消耗,从而降低计算成本。

Tensorflow aarch64 源码构建

安装依赖

apt-get install libatlas3-base libopenblas-dev libopenblas-base libblas-dev gcc gfortran python3-dev libgfortran5 g++ libhdf5-dev libfreetype-dev build-essential openjdk-11-jdk zip unzip python3-h5py python3-numpy python3-pip

sudo pip3 install keras_preprocessing keras_applications

安装 Bazel

install-compile-bootstrap-unix

bazel 官方未给出arm64架构的二进制文件,所以需要自己手工编译

下载 bazel-2.0.0-dist.zip

运行 EXTRA_BAZEL_ARGS="--host_javabase=@local_jdk//:jdk" bash ./compile.sh

拷贝 output/bazel 到 /usr/local/bin/hazel

编译 Tensorflow

Raspberry pi 4B 上安装的系统是Ubuntu 20.04 ARM64架构,Python Version: 3.8,Tensorflow官方未给出对应版本的python whl安装包,所以需要自己手动从源码构建

git clone https://github.com/tensorflow/tensorflow.git

git checkout v2.2.0

./configure 配置选项

编译操作之前需要增加swap分区,4G系统内存编译是完全不够的,建议swap设置6G
最好能够单独增加一块USB3转SATA的移动硬盘用来单独增加SWAP分区

fallocate -l 6G /swapfile
chmod 0600 /swapfile
mkswap /swapfile
swapon /swapfile

执行编译操作 bazel  build --config=noaws --config=nogcp --config=nohdfs --config=nonccl --config=monolithic --config=v2 --local_cpu_resources=3 //tensorflow/tools/pip_package:build_pip_package

由于是直接在4核4G的Raspberry pi 上构建,所以需要耐心等待,编译时间大概在 15 - 25 小时之间
☕️☕️☕️

编译完成之后执行构建pip安装包 bazel-bin/tensorflow/tools/pip_package/build_pip_package /tmp/tensorflow_pkg

tensorflow-2.2.0-cp38-cp38-linux_aarch64.whl 安装包构建完成

最后执行 pip install tensorflow-2.2.0-cp38-cp38-linux_aarch64.whl

TF 测试

使用官方的最简MNIST教程中的代码

import tensorflow as tf
mnist = tf.keras.datasets.mnist
(x_train, y_train),(x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0
model = tf.keras.models.Sequential([
  tf.keras.layers.Flatten(input_shape=(28, 28)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(10, activation='softmax')
])
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
model.fit(x_train, y_train, epochs=5)
model.evaluate(x_test, y_test)
Epoch 1/5
1875/1875 [==============================] - 10s 6ms/step - loss: 0.2953 - accuracy: 0.9140
Epoch 2/5
1875/1875 [==============================] - 10s 6ms/step - loss: 0.1426 - accuracy: 0.9578
Epoch 3/5
1875/1875 [==============================] - 10s 6ms/step - loss: 0.1074 - accuracy: 0.9674
Epoch 4/5
1875/1875 [==============================] - 11s 6ms/step - loss: 0.0878 - accuracy: 0.9728
Epoch 5/5
1875/1875 [==============================] - 10s 6ms/step - loss: 0.0746 - accuracy: 0.9766
313/313 [==============================] - 1s 3ms/step - loss: 0.0690 - accuracy: 0.9783
Out[1]: [0.06903273612260818, 0.9782999753952026]

TF 分布式训练测试

分布式训练模式

  1. 模型并行 -> 模型分散到每台raspberry pi的CPU中,每个CPU独立计算和更新,各个设备之间的数据输入都是一致的。

  2. 数据并行 -> 数据分散到每台raspberry pi的CPU中,每个CPU获取到的模型都是一个副本,而输入的数据却不一样,相当于增加训练样本数据,加快模型学习速度。

分布式训练策略

  1. MirroredStrategy
  2. TPUStrategy
  3. MultiWorkerMirroredStrategy
  4. CentralStorageStrategy
  5. ParameterServerStrategy

下午主要会采用MultiWorkerMirroredStrategy的策略模式来训练模型

分布式训练 - MNIST

host 1 worker1.py 代码:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jun  4 14:16:43 2020

@author: alexchen
"""


import tensorflow as tf
import numpy as np
import json
import os

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["192.168.1.142:12345", "192.168.1.190:23456"]
    },
    'task': {'type': 'worker', 'index': 1}
})

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # We need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset


def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model

num_workers = 2

# Here the batch size scales up by number of workers since
# `tf.data.Dataset.batch` expects the global batch size. Previously we used 64,
# and now this becomes 128.
per_worker_batch_size = 2000
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_dataset(global_batch_size)

with strategy.scope():
  multi_worker_model = build_and_compile_cnn_model()

multi_worker_model.fit(multi_worker_dataset, epochs=20, steps_per_epoch=100)

keras_model_path = "/tmp/keras_save"
multi_worker_model.save(keras_model_path)

host 2 worker2.py 代码:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jun  4 14:16:43 2020

@author: alexchen
"""


import tensorflow as tf
import numpy as np
import json
import os

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["192.168.1.142:12345", "192.168.1.190:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # We need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset


def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model

num_workers = 2

# Here the batch size scales up by number of workers since
# `tf.data.Dataset.batch` expects the global batch size. Previously we used 64,
# and now this becomes 128.
per_worker_batch_size = 2000
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_dataset(global_batch_size)

with strategy.scope():
  multi_worker_model = build_and_compile_cnn_model()

multi_worker_model.fit(multi_worker_dataset, epochs=20, steps_per_epoch=100)
keras_model_path = "/tmp/keras_save"
multi_worker_model.save(keras_model_path)

两者的区别只在于TF_CONFIG变量的设定

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["192.168.1.142:12345", "192.168.1.190:23456"]
    },
    'task': {'type': 'worker', 'index': 1}
})

分布式训练 - dog&cat

cnn-distributed.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Jun  5 14:23:53 2020

@author: alexchen
"""
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dense
import tensorflow_datasets as tfds
import json
import os

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["192.168.1.142:12345", "192.168.1.190:23456"]
    },
    'task': {'type': 'worker', 'index': 1}
})

def create_model():
    model = Sequential([
        Conv2D(32,(3,3),input_shape=(64,64,3),activation='relu'),
        MaxPooling2D(pool_size=(2,2)),
        Conv2D(32,(3,3),activation='relu'),
        MaxPooling2D(pool_size=(2,2)),
        Flatten(),
        Dense(units=128,activation='relu'),
        Dense(units=1, activation='sigmoid')
        ])
    model.compile(optimizer='adam',loss='binary_crossentropy',metrics = ['accuracy'])
    return model

per_worker_batch_size = 32
workes = 2
batch_size = per_worker_batch_size * workes

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

dataset = tfds.load("cats_vs_dogs", split=tfds.Split.TRAIN, as_supervised=True)
def resize(image, label):
    image = tf.image.resize(image, [64, 64]) / 255.0
    return image, label

multi_worker_dataset = dataset.repeat().map(resize).shuffle(1024).batch(batch_size)
with strategy.scope():
    model = create_model()

model.fit(multi_worker_dataset,
          steps_per_epoch = 50,
          epochs = 10)

最后输出正确率为:

Epoch 10/10
 6/50 [==>...........................] - ETA: 25s - loss: 0.5148 - accuracy: 0.7682Corrupt JPEG data: 99 extraneous bytes before marker 0xd9
24/50 [=============>................] - ETA: 17s - loss: 0.4916 - accuracy: 0.7721Warning: unknown JFIF revision number 0.00
30/50 [=================>............] - ETA: 13s - loss: 0.4956 - accuracy: 0.7693Corrupt JPEG data: 396 extraneous bytes before marker 0xd9
50/50 [==============================] - 35s 694ms/step - loss: 0.4803 - accuracy: 0.7794

在单台raspberry pi上基于相同的数据集训练相同的模型,得到的正确率为:

50/50 [==============================] - 18s 351ms/step - loss: 0.5901 - accuracy: 0.6938
Epoch 9/10
50/50 [==============================] - 18s 356ms/step - loss: 0.5774 - accuracy: 0.7038
Epoch 10/10
50/50 [==============================] - 18s 355ms/step - loss: 0.5643 - accuracy: 0.7063

因为分布式训练中采用的是数据并行,两份相同的模型基于不同的数据进行训练,因此分布式训练损失函数的收敛相较于单机训练更快。

分布式训练还需要考虑集群内IO和Network的吞吐量性能。

TF 分布式训练计算实验项目

搭建完成集群之后,可以在自己运行的树莓派集群上对分布式训练进行测试,受限于内存和IO的制约,对于训练深度网络会有大量的瓶颈存在,不过可以用来练手。

比如:

  • 对加密货币的价格预测

  • 量化金融

  • 训练VGG16图像分类深度网络