Why
I currently bought 35 Raspberry Pi 4 boards (4-core, 4 GB) to crunch Rosetta@home work units. Once COVID-19 has passed, these boards will be used to test TensorFlow projects; because the TF ecosystem is so complete, TF is the technology stack I would consider for the final product in an industrial setting.
The future trend points toward edge computing: autonomous driving, smart homes, home health-care assistance, agriculture, quality inspection of manufactured parts, wear detection for industrial machinery, and so on. All of these have to weigh real-time data ingestion and transmission against computing cost. If such projects rely on a cloud platform for their machine learning, they must account for the link between the site and the data center: bandwidth, latency, data security, and real-time computation. With edge computing, an industrial microcontroller can host the model and run TensorFlow Lite for inference, solving the high-availability problem locally: no data has to travel over the public Internet, bandwidth consumption drops, and so does the computing cost.
Building TensorFlow from source for aarch64
Install dependencies
sudo apt-get install libatlas3-base libopenblas-dev libopenblas-base libblas-dev gcc gfortran python3-dev libgfortran5 g++ libhdf5-dev libfreetype-dev build-essential openjdk-11-jdk zip unzip python3-h5py python3-numpy python3-pip
sudo pip3 install keras_preprocessing keras_applications
Install Bazel
Reference: install-compile-bootstrap-unix (Bazel's documentation on bootstrapping Bazel from source)
Bazel does not publish official binaries for the arm64 architecture, so it has to be compiled by hand:
Download bazel-2.0.0-dist.zip
Run EXTRA_BAZEL_ARGS="--host_javabase=@local_jdk//:jdk" bash ./compile.sh
Copy output/bazel to /usr/local/bin/bazel
Build TensorFlow
The Raspberry Pi 4B runs Ubuntu 20.04 ARM64 with Python 3.8. TensorFlow does not publish an official Python wheel for this combination, so it has to be built from source.
git clone https://github.com/tensorflow/tensorflow.git
cd tensorflow
git checkout v2.2.0
Run ./configure and answer the configuration prompts (accepting the defaults gives a CPU-only build).
Before compiling, add swap space; 4 GB of system RAM is nowhere near enough for the build, so 6 GB of swap is recommended.
Ideally, attach a separate USB 3-to-SATA external disk and put the swap file on it.
sudo fallocate -l 6G /swapfile
sudo chmod 0600 /swapfile
sudo mkswap /swapfile
sudo swapon /swapfile
Run the build: bazel build --config=noaws --config=nogcp --config=nohdfs --config=nonccl --config=monolithic --config=v2 --local_cpu_resources=3 //tensorflow/tools/pip_package:build_pip_package
Because this builds directly on a 4-core/4 GB Raspberry Pi, be patient: the compile takes roughly 15 to 25 hours.
☕️☕️☕️
After the build finishes, generate the pip package: bazel-bin/tensorflow/tools/pip_package/build_pip_package /tmp/tensorflow_pkg
This produces tensorflow-2.2.0-cp38-cp38-linux_aarch64.whl in /tmp/tensorflow_pkg.
Finally, run pip install tensorflow-2.2.0-cp38-cp38-linux_aarch64.whl
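A quick sanity check (a minimal Python sketch; it only assumes the wheel just installed) confirms that the interpreter picks up the new build:

import tensorflow as tf

print(tf.__version__)                          # expected: 2.2.0
print(tf.config.list_physical_devices("CPU"))  # the Pi's CPU should be listed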
Testing TF
Use the code from TensorFlow's minimal official MNIST tutorial:
import tensorflow as tf
mnist = tf.keras.datasets.mnist
(x_train, y_train),(x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0
model = tf.keras.models.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(10, activation='softmax')
])
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
model.fit(x_train, y_train, epochs=5)
model.evaluate(x_test, y_test)
Epoch 1/5
1875/1875 [==============================] - 10s 6ms/step - loss: 0.2953 - accuracy: 0.9140
Epoch 2/5
1875/1875 [==============================] - 10s 6ms/step - loss: 0.1426 - accuracy: 0.9578
Epoch 3/5
1875/1875 [==============================] - 10s 6ms/step - loss: 0.1074 - accuracy: 0.9674
Epoch 4/5
1875/1875 [==============================] - 11s 6ms/step - loss: 0.0878 - accuracy: 0.9728
Epoch 5/5
1875/1875 [==============================] - 10s 6ms/step - loss: 0.0746 - accuracy: 0.9766
313/313 [==============================] - 1s 3ms/step - loss: 0.0690 - accuracy: 0.9783
Out[1]: [0.06903273612260818, 0.9782999753952026]
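Since the stated goal is eventually edge inference with TensorFlow Lite, a natural follow-up (a sketch; the output path is only an example) is to convert the Keras model trained above into a .tflite flatbuffer:

import tensorflow as tf

# `model` is the trained Keras model from the MNIST test above.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
with open("/tmp/mnist.tflite", "wb") as f:   # example output path
    f.write(tflite_model)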
TF distributed training test
Distributed training modes
- Model parallelism -> the model is split across the CPUs of the Raspberry Pis; each CPU computes and updates its own part independently, while every device receives the same input data.
- Data parallelism -> the data is split across the CPUs of the Raspberry Pis; each CPU holds a full replica of the model but sees different input data, which effectively increases the number of training samples processed per step and speeds up learning (see the sketch after this list).
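As a rough illustration of the data-parallel bookkeeping (toy numbers only, no TensorFlow API involved):

# Two Pis, each holding a full model replica and its own slice of every batch.
num_workers = 2
per_worker_batch_size = 2000
global_batch_size = per_worker_batch_size * num_workers   # 4000 samples per step

# Each worker computes gradients on its own slice; the gradients are then
# averaged (all-reduced) so both replicas apply the identical weight update.
grads_worker_0 = [0.10, -0.40]   # toy gradients from worker 0's slice
grads_worker_1 = [0.30,  0.20]   # toy gradients from worker 1's slice
averaged = [(g0 + g1) / num_workers
            for g0, g1 in zip(grads_worker_0, grads_worker_1)]
print(global_batch_size, averaged)   # 4000 [0.2, -0.1] (up to float rounding)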
Distributed training strategies
- MirroredStrategy
- TPUStrategy
- MultiWorkerMirroredStrategy
- CentralStorageStrategy
- ParameterServerStrategy
The sections below use the MultiWorkerMirroredStrategy to train the models.
Distributed training - MNIST
worker1.py on host 1:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jun 4 14:16:43 2020
@author: alexchen
"""
import tensorflow as tf
import numpy as np
import json
import os

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["192.168.1.142:12345", "192.168.1.190:23456"]
    },
    'task': {'type': 'worker', 'index': 1}
})

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

def mnist_dataset(batch_size):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the range [0, 255].
    # We need to convert them to float32 with values in the range [0, 1].
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = tf.data.Dataset.from_tensor_slices(
        (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
    return train_dataset

def build_and_compile_cnn_model():
    model = tf.keras.Sequential([
        tf.keras.Input(shape=(28, 28)),
        tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
        metrics=['accuracy'])
    return model

num_workers = 2
# `tf.data.Dataset.batch` expects the global batch size, so the per-worker
# batch size is scaled by the number of workers: 2000 * 2 = 4000.
per_worker_batch_size = 2000
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_dataset(global_batch_size)

with strategy.scope():
    multi_worker_model = build_and_compile_cnn_model()

multi_worker_model.fit(multi_worker_dataset, epochs=20, steps_per_epoch=100)

keras_model_path = "/tmp/keras_save"
multi_worker_model.save(keras_model_path)
worker2.py on host 2:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jun 4 14:16:43 2020
@author: alexchen
"""
import tensorflow as tf
import numpy as np
import json
import os

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["192.168.1.142:12345", "192.168.1.190:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

def mnist_dataset(batch_size):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the range [0, 255].
    # We need to convert them to float32 with values in the range [0, 1].
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = tf.data.Dataset.from_tensor_slices(
        (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
    return train_dataset

def build_and_compile_cnn_model():
    model = tf.keras.Sequential([
        tf.keras.Input(shape=(28, 28)),
        tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
        metrics=['accuracy'])
    return model

num_workers = 2
# `tf.data.Dataset.batch` expects the global batch size, so the per-worker
# batch size is scaled by the number of workers: 2000 * 2 = 4000.
per_worker_batch_size = 2000
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_dataset(global_batch_size)

with strategy.scope():
    multi_worker_model = build_and_compile_cnn_model()

multi_worker_model.fit(multi_worker_dataset, epochs=20, steps_per_epoch=100)

keras_model_path = "/tmp/keras_save"
multi_worker_model.save(keras_model_path)
The only difference between the two scripts is the TF_CONFIG environment variable:
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["192.168.1.142:12345", "192.168.1.190:23456"]
    },
    'task': {'type': 'worker', 'index': 1}
})
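Since everything else is identical, one possible refinement (a sketch, not part of the original setup) is to deploy a single worker.py to every Pi and pass the task index on the command line:

import json
import os
import sys

# Hypothetical launch: `python3 worker.py 0` on 192.168.1.142 and
# `python3 worker.py 1` on 192.168.1.190; index 0 must correspond to the
# first address in the `worker` list.
worker_index = int(sys.argv[1])

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["192.168.1.142:12345", "192.168.1.190:23456"]
    },
    'task': {'type': 'worker', 'index': worker_index}
})

Either way, start the script on both hosts at roughly the same time: the worker that starts first blocks while MultiWorkerMirroredStrategy waits for the rest of the cluster to join.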
Distributed training - dogs vs. cats
cnn-distributed.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Jun 5 14:23:53 2020
@author: alexchen
"""
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dense
import tensorflow_datasets as tfds
import json
import os

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["192.168.1.142:12345", "192.168.1.190:23456"]
    },
    'task': {'type': 'worker', 'index': 1}
})

def create_model():
    model = Sequential([
        Conv2D(32, (3, 3), input_shape=(64, 64, 3), activation='relu'),
        MaxPooling2D(pool_size=(2, 2)),
        Conv2D(32, (3, 3), activation='relu'),
        MaxPooling2D(pool_size=(2, 2)),
        Flatten(),
        Dense(units=128, activation='relu'),
        Dense(units=1, activation='sigmoid')
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

per_worker_batch_size = 32
num_workers = 2
batch_size = per_worker_batch_size * num_workers

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

dataset = tfds.load("cats_vs_dogs", split=tfds.Split.TRAIN, as_supervised=True)

def resize(image, label):
    image = tf.image.resize(image, [64, 64]) / 255.0
    return image, label

multi_worker_dataset = dataset.repeat().map(resize).shuffle(1024).batch(batch_size)

with strategy.scope():
    model = create_model()

model.fit(multi_worker_dataset,
          steps_per_epoch=50,
          epochs=10)
The accuracy at the end of training is:
Epoch 10/10
6/50 [==>...........................] - ETA: 25s - loss: 0.5148 - accuracy: 0.7682Corrupt JPEG data: 99 extraneous bytes before marker 0xd9
24/50 [=============>................] - ETA: 17s - loss: 0.4916 - accuracy: 0.7721Warning: unknown JFIF revision number 0.00
30/50 [=================>............] - ETA: 13s - loss: 0.4956 - accuracy: 0.7693Corrupt JPEG data: 396 extraneous bytes before marker 0xd9
50/50 [==============================] - 35s 694ms/step - loss: 0.4803 - accuracy: 0.7794
Training the same model on the same dataset on a single Raspberry Pi yields:
50/50 [==============================] - 18s 351ms/step - loss: 0.5901 - accuracy: 0.6938
Epoch 9/10
50/50 [==============================] - 18s 356ms/step - loss: 0.5774 - accuracy: 0.7038
Epoch 10/10
50/50 [==============================] - 18s 355ms/step - loss: 0.5643 - accuracy: 0.7063
Because the distributed run uses data parallelism, the two identical model replicas train on different slices of the data, so each optimization step effectively covers twice as many samples; that is why the loss converges faster than in the single-machine run.
Distributed training also has to account for disk I/O and network throughput within the cluster: the per-step time here roughly doubles (694 ms vs. 355 ms), largely the cost of loading data and synchronizing gradients over the network.
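One way to relieve the I/O side (a sketch, assuming TF 2.2, where AUTOTUNE still lives under tf.data.experimental; the cache path is only an example) is to parallelize and cache the preprocessing in the input pipeline:

import tensorflow as tf
import tensorflow_datasets as tfds

AUTOTUNE = tf.data.experimental.AUTOTUNE

def resize(image, label):
    image = tf.image.resize(image, [64, 64]) / 255.0
    return image, label

dataset = tfds.load("cats_vs_dogs", split=tfds.Split.TRAIN, as_supervised=True)
pipeline = (dataset
            .map(resize, num_parallel_calls=AUTOTUNE)   # decode/resize on several cores
            .cache("/tmp/cats_vs_dogs_64.cache")        # reuse decoded images after the first pass
            .shuffle(1024)
            .repeat()
            .batch(64)
            .prefetch(AUTOTUNE))                        # overlap input preparation with training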
TF distributed training: experiment ideas
Once the cluster is up, you can experiment with distributed training on your own Raspberry Pi cluster. Memory and I/O limits create plenty of bottlenecks for training deep networks, but it is a good playground for practice.
For example:
- Cryptocurrency price prediction
- Quantitative finance
- Training a VGG16 image-classification network