@lijiang

Sculpting in time

Do one thing and do it well.
Every story has a beginning and an end.

边缘领域的ML

TinyML在工业领域的探索

7-Minute Read

本文用到的Mathematica notebook源码可以在这里下载

为什么写这篇文章

从目前的市场环境看,有很多大型公司都在提倡人工智能的发展,然而绝大部分我们能看到的AI的应用就是智能小车自动驾驶图像识别行为预测医疗辅助智能推荐系统语音识别图像与语音生成,在这些应用中,很少会与大规模工业领域有交集,原因就在于机器学习运算的成本。

首先像工业领域的机器学习的推演预测,第一个要保证就是数据安全性和实时性,在介绍Coral Dev Board本地机器学习的文章中,我们可以看到本地机器学习的应用范围,在工业范围内的AI应用中,我们可以看出占比很少,像自动驾驶医疗辅助可以算是在工业领域的初探,大部分的人工智能应用都是停留在一个概念上,很少出现有能应用在工业领域并且具有商业性质的项目,原因就是在安全性和低成本。

试想你开发了一个ML应用,如果要应用到工业领域,比如检测设备故障,预测机器运行的下一个状态,公司盈利与设备故障率及工人流动情况的关系,在部署这些模型时,我们要考虑在工业方面的苛刻要求,比如这个工厂基本上没有覆盖网络,那么你要去部署检测设备故障模型的这一个环节就会遇到问题,是采用本地集群部署,还是部署一个终端,然后终端的数据由人为来进行录入,在采用集群部署时,因为会有成千上万台机器设备,所以部署的节点也会有很大的成本,像部署模型在一台Jetson Nano的设备上,虽然成本已经很低,但是部署成千上万个Jetson Nano,你就要考虑功耗,考虑nano模块的故障率以及模块更换的成本,还需要考虑模块与模块之间的高可用网络成本。那么能否将模型部署到更低廉的设备上,比如像微控制器的单片机,这样可以降低很多成本,并且功耗很低,对于大型公司采购此方案的成功率就会有很大的提升。

接下来的5-10年,或许就是机器学习在工业领域的兴起,TinyML意指在小型的微控制器上运算和推演模型,它可以运行在像我们平时接触的家电设备的微控制器上,可以运行在很小的集成电路里面和无处不在的小型设备上。

ML在工业领域的展望,比如像电影领域,通过小型穿戴设备进行演员的动作捕捉计算,目前购买一套动作捕捉的设备是相当昂贵的,还有电影中故事板的制作,对于独立电影人来说将是福利,通过运行有生成模型的小型设备,独立电影人可以通过所处的环境,来生成三维结构的环境,然后导入模型人物来进行预演,最后生成一张张故事板,提高了整个独立创作的效率,也让演员能够更好的理解现场的氛围和如何更好的表演。当然要让ML去创作出好听的小提琴音乐,那是相当难的,更不用说拉小提琴🎻,因为有灵魂注入到了琴中😄。

TinyML的工作流程

  1. 确定目标
  2. 收集数据
  3. 针对实际场景设计神经网络架构
  4. 训练模型
  5. 转换和部署模型
  6. 排查运行过程中的错误问题

实例

拟合Sin(x)函数,在给定x的情况下预测出Sin(x)的值

Mathematica 原型设计:

data = Table[
   x -> Sin[x] + RandomVariate[NormalDistribution[0, .15]] , {x, 0, 
    2 \[Pi], .001}];
    
ListPlot[List @@@ data, PlotStyle -> Dashed]

data = RandomSample[data];
trainData = Take[data, {1, Floor[0.6*Length[data]]}];
validationData = 
  Take[data, {Floor[0.6*Length[data]] + 1, Floor[0.8*Length[data]]}];
testData = Take[data, {Floor[0.8*Length[data]] + 1, Length[data]}];
Total[Length /@ {trainData, validationData, testData}] - Length[data]
Length /@ {trainData, validationData, testData}

模型1:

model1 = NetChain[{
   LinearLayer[16],
   Ramp,
   LinearLayer[1]}]

训练:

history1 = 
 NetTrain[model1, trainData, All, ValidationSet -> validationData, 
  MaxTrainingRounds -> 1000, BatchSize -> 16]
  
history1["FinalPlots"]
trainedNet1 = history1["TrainedNet"]

验证集和训练集的损失函数图:

在测试集中验证上述模型:

testDataX = Keys[testData];
testDataY = Values[testData];
predictedTestY = trainedNet1[testDataX];
predictedTestData = MapThread[List, {testDataX, predictedTestY}];
ListPlot[{predictedTestData, List @@@ testData}, 
 PlotLegends -> {"predicted value", "actual"}]

从上面的Test实际与预测图中可以看出,模型还不能很好的拟合Sinx函数,所以需要改进模型,增加模型的深度。

model2 = NetChain[{
   LinearLayer[16],
   Ramp,
   LinearLayer[16],
   Ramp,
   LinearLayer[1]}]
history2 = 
 NetTrain[model2, trainData, All, ValidationSet -> validationData, 
  MaxTrainingRounds -> 1000, BatchSize -> 16]
trainedNet2 = history2["TrainedNet"]

得到改进模型之后的预测对比:

从图表中可以发现,模型中拟合Sinx的非线性特性已经有比前一次更好的表现。

继续增加模型中的层:

model3 = NetChain[{
   LinearLayer[16],
   Ramp,
   LinearLayer[16],
   Ramp,
   LinearLayer[16],
   Ramp,
   LinearLayer[1]}]

模型拟合的最终表现图:

Tensorflow 版本:

数据生成:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Jun 16 13:38:31 2020

@author: alexchen
"""

SAMPLES = 1000

SEED = 1337

import math
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

TRAIN_SPLIT = int(0.6 * SAMPLES)
TEST_SPLIT = int(0.2 * SAMPLES + TRAIN_SPLIT)
x_values = np.random.uniform(low=0, high=2*math.pi, size=SAMPLES)
np.random.shuffle(x_values)
y_values = np.sin(x_values)

x_train, x_valid, x_test = np.split(x_values, [TRAIN_SPLIT, TEST_SPLIT])
y_train, y_valid, y_test = np.split(y_values, [TRAIN_SPLIT, TEST_SPLIT])

模型1:

model = tf.keras.Sequential()

model.add(tf.keras.layers.Dense(16, activation="relu",input_shape=(1,)))
model.add(tf.keras.layers.Dense(1))

model.compile(optimizer='rmsprop',loss='mse',metrics=['mae'])
model.summary()

history_1 = model.fit(x_train, y_train, batch_size=16, epochs=1000,
          validation_data=(x_valid, y_valid))

获得损失函数对比图:

loss = history_1.history['loss']
epochs = range(1,len(loss) + 1)
val_loss = history_1.history['val_loss']
plt.plot(epochs,loss, 'g.', label='Training Loss')
plt.plot(epochs,val_loss, 'b.', label='Validation loss')
plt.title('Model1 Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

SKIP = 100
plt.plot(epochs[SKIP:], loss[SKIP:],'g.',label='Training loss')
plt.plot(epochs[SKIP:], val_loss[SKIP:], 'b.', label = 'Validation loss')
plt.title("Model1 SKIP 100 Training and Validation Loss")
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()


mae = history_1.history["mae"]
val_mae = history_1.history["val_mae"]
plt.plot(epochs[SKIP:], mae[SKIP:],'g.',label='Training MAE')
plt.plot(epochs[SKIP:], val_mae[SKIP:], 'b.', label = 'Validation MAE')
plt.title("Model1 Training and Validation MAE")
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

predictions = model.predict(x_test)
plt.clf()
plt.title("data prediction vs actual data")
plt.plot(x_test, y_test, 'b.', label='Actual')
plt.plot(x_test,predictions, 'r.', label='Predicted')
plt.legend()
plt.show()

TensorBoard 损失函数图:

从最后对Test数据的预测对比图中可以看出,模型并不能很好的拟合Sinx非线性的特性,增加模型的层数。

Model2:

model_2 = tf.keras.Sequential()
model_2.add(tf.keras.layers.Dense(16,activation='relu',input_shape=(1,)))
model_2.add(tf.keras.layers.Dense(16,activation='relu'))
model_2.add(tf.keras.layers.Dense(1))
model_2.compile(optimizer='rmsprop',loss='mse',metrics=['mae'])
model_2.summary()

history_2 = model_2.fit(x_train, y_train, batch_size=16, epochs=1000,
          validation_data=(x_valid, y_valid))
Epoch 998/1000
38/38 [==============================] - 0s 2ms/step - loss: 0.0224 - mae: 0.1188 - val_loss: 0.0238 - val_mae: 0.1210
Epoch 999/1000
38/38 [==============================] - 0s 2ms/step - loss: 0.0222 - mae: 0.1195 - val_loss: 0.0294 - val_mae: 0.1355
Epoch 1000/1000
38/38 [==============================] - 0s 1ms/step - loss: 0.0228 - mae: 0.1207 - val_loss: 0.0248 - val_mae: 0.1225

TensorBoard Loss && MAE:

loss = history_2.history['loss']
epochs = range(1,len(loss) + 1)
val_loss = history_2.history['val_loss']
plt.plot(epochs,loss, 'g.', label='Training Loss')
plt.plot(epochs,val_loss, 'b.', label='Validation loss')
plt.title('Model2 Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()


SKIP = 100
plt.plot(epochs[SKIP:], loss[SKIP:],'g.',label='Training loss')
plt.plot(epochs[SKIP:], val_loss[SKIP:], 'b.', label = 'Validation loss')
plt.title("Model2 SKIP 100 Training and Validation Loss")
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

mae = history_2.history["mae"]
val_mae = history_2.history["val_mae"]
plt.plot(epochs[SKIP:], mae[SKIP:],'g.',label='Training MAE')
plt.plot(epochs[SKIP:], val_mae[SKIP:], 'b.', label = 'Validation MAE')
plt.title("Model2 Training and Validation MAE")
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

loss = model_2.evaluate(x_test,y_test)

predictions = model_2.predict(x_test)
plt.clf()
plt.title("Model2 data prediction vs actual data")
plt.plot(x_test, y_test, 'b.', label='Actual')
plt.plot(x_test,predictions, 'r.', label='Predicted')
plt.legend()
plt.show()

最后得到的分析对比图:

根据最后对Test数据集的实际和预测值的比较,模型2的拟合要优于模型1,模型2中对Sinx的拟合已经非常接近。

转化模型到TFLite格式

## convert model2 to tflite model ##
converter = tf.lite.TFLiteConverter.from_keras_model(model_2)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
def representative_dataset_generator():
    for val in x_test:
        yield [np.array(val,dtype=np.float32,ndmin=2)]
converter.representative_dataset = representative_dataset_generator
tflite_quant_model2 = converter.convert()      
with open("sinx_model2_quantized.tflite","wb") as f:
    f.write(tflite_quant_model2)

导入tflite模型进行数值预测

## check tflite model ##
sinx_model = tf.lite.Interpreter(model_path="sinx_model2_quantized.tflite")
sinx_model.allocate_tensors()
inputs_index = sinx_model.get_input_details()[0]["index"]
output_index = sinx_model.get_output_details()[0]["index"]

sinx_model_predictions = []

for xvalue in x_test:
    x_value_tensor = tf.convert_to_tensor([[xvalue]],dtype=np.float32)
    sinx_model.set_tensor(inputs_index,x_value_tensor)
    sinx_model.invoke()
    sinx_model_predictions.append(sinx_model.get_tensor(output_index)[0])
    
plt.clf()
plt.title("tfLite Model2 data prediction vs actual data")
plt.plot(x_test, y_test, 'b.', label='Actual')
plt.plot(x_test, sinx_model_predictions, 'r.', label='Predicted')
plt.legend()
plt.show()

预测对比结果:

转换成C File

因为很多的微控制没有本地文件系统,那么就需要将模型转换到C文件数组中,然后在微控制器程序中使用该模型数组。

使用命令 xxd -i sinx_model2_quantized.tflite > sinx_model2_quantized.cc将TFlite模型文件转化到C文件:

unsigned char sinx_model2_quantized_tflite[] = {
  0x20, 0x00, 0x00, 0x00, 0x54, 0x46, 0x4c, 0x33, 0x00, 0x00, 0x00, 0x00,
  0x00, 0x00, 0x12, 0x00, 0x1c, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0c, 0x00,
  0x10, 0x00, 0x14, 0x00, 0x00, 0x00, 0x18, 0x00, 0x12, 0x00, 0x00, 0x00,
  0x03, 0x00, 0x00, 0x00, 0x2c, 0x0a, 0x00, 0x00, 0x0c, 0x03, 0x00, 0x00,
  0xf4, 0x02, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00,
  0x01, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00,
  0x04, 0x00, 0x08, 0x00, 0x08, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00,
  0x0b, 0x00, 0x00, 0x00, 0x13, 0x00, 0x00, 0x..................};

unsigned int sinx_model2_quantized_tflite_len = 2720;

TinyML开发流程

  1. 模型设计和训练

  2. 将训练好的模型转换成Tensorflow Lite FlatBuffer格式

  3. 将FlatBuffer格式的lite文件转换成C byte array

  4. 将模型代码与TF lite for micro c++ 库链接

  5. 部署模型程序到微控制器上

Arduino Nano 33 BLE 设备

需要将模型部署到该设备上,并让其LED灯的亮度随按照模型计算输出的值进行改变。

Arduino使用TFLite C File

Tensorflow github

Path: tensorflow/lite/micro/examples/hello_world

main.pp

#include <TensorFlowLite.h>
#include "main_functions.h"
#include "tensorflow/lite/micro/all_ops_resolver.h"
#include "constants.h"
#include "model.h" // 上述模型C code的变量定义头文件
#include "output_handler.h"
#include "tensorflow/lite/micro/micro_error_reporter.h"
#include "tensorflow/lite/micro/micro_interpreter.h"
#include "tensorflow/lite/schema/schema_generated.h"
#include "tensorflow/lite/version.h"

// 定义全局变量
namespace {

	  tflite::ErrorReporter* error_reporter = nullptr;
	  const tflite::Model* model = nullptr;
	  tflite::MicroInterpreter* interpreter = nullptr;
	  TfLiteTensor* input = nullptr;
	  TfLiteTensor* output = nullptr;
	  int inference_count = 0;

	  const int kTensorArenaSize = 2 * 1024;
	  uint8_t tensor_arena[kTensorArenaSize];
}

// 栽入模型
void setup() {
  static tflite::MicroErrorReporter micro_error_reporter;
  error_reporter = &micro_error_reporter;

  model = tflite::GetModel(g_model);
  if (model->version() != TFLITE_SCHEMA_VERSION) {
    TF_LITE_REPORT_ERROR(error_reporter,
                         "Model provided is schema version %d not equal "
                         "to supported version %d.",
                         model->version(), TFLITE_SCHEMA_VERSION);
    return;
  }
  
  static tflite::AllOpsResolver resolver;
  static tflite::MicroInterpreter static_interpreter(
      model, resolver, tensor_arena, kTensorArenaSize, error_reporter);
  interpreter = &static_interpreter;

  TfLiteStatus allocate_status = interpreter->AllocateTensors();
  if (allocate_status != kTfLiteOk) {
    TF_LITE_REPORT_ERROR(error_reporter, "AllocateTensors() failed");
    return;
  }
  input = interpreter->input(0);
  output = interpreter->output(0);
  inference_count = 0;
}

void loop() {
  float position = static_cast<float>(inference_count) /
                   static_cast<float>(kInferencesPerCycle);
  float x_val = position * kXrange;

  input->data.f[0] = x_val;

  //进行前向推演计算
  TfLiteStatus invoke_status = interpreter->Invoke();
  if (invoke_status != kTfLiteOk) {
    TF_LITE_REPORT_ERROR(error_reporter, "Invoke failed on x_val: %f\n",
                         static_cast<double>(x_val));
    return;
  }

  float y_val = output->data.f[0]; // 得到推演之后的y值

  HandleOutput(error_reporter, x_val, y_val); // 得到输出y值之后,应用于Arduino的LED亮度调节

  inference_count += 1;
  if (inference_count >= kInferencesPerCycle)
     inference_count = 0;
}

hander.cpp

#include "output_handler.h"
#include "Arduino.h"
#include "constants.h"

int led = LED_BUILTIN;

bool initialized = false;

void HandleOutput(tflite::ErrorReporter* error_reporter, float x_value,
                  float y_value) {
   if (!initialized) {
      pinMode(led, OUTPUT);
      initialized = true;
   }

  int brightness = (int)(127.5f * (y_value + 1));
  analogWrite(led, brightness);
  TF_LITE_REPORT_ERROR(error_reporter, "%d\n", brightness);
}

最后的输出的结果,模拟信号呈现正弦波动状输出

使用 Arduino CLI

Arduino CLI是面向命令行的工具。

命令行比较直观并且易于管理源代码,所以以下例子全部以Arduino CLI来进行编译上传二进制到Arduino Nano 33 BLE 控制板上。

初始化安装:

arduino-cli core update-index
arduino-cli core install arduino:mbed
arduino-cli lib install [email protected]

训练拟合 x Sin[x] + Cos[x]

mathematica 原型设计代码:

data2 = Table[
   x -> x Sin[x] +  Cos[x] + RandomVariate[NormalDistribution[0, .15]],
   {x, 0, 8 \[Pi], .001}];
ListPlot[List @@@ data2, PlotStyle -> Dashed]

data2 = RandomSample[data2];
trainData2 = Take[data2, {1, Floor[0.6*Length[data2]]}];
validationData2 = 
  Take[data2, {Floor[0.6*Length[data2]] + 1, 
    Floor[0.8*Length[data2]]}];
testData2 = Take[data2, {Floor[0.8*Length[data2]] + 1, Length[data2]}];
Total[Length /@ {trainData2, validationData2, testData2}] - 
 Length[data2]
Length /@ {trainData2, validationData2, testData2}
modelSinCos = NetChain[{
   LinearLayer[16],
   Ramp,
   LinearLayer[16],
   Ramp,
   LinearLayer[16],
   Ramp,
   LinearLayer[16],
   Ramp,
   LinearLayer[16],
   Ramp,
   LinearLayer[16],
   Ramp,
   LinearLayer[16],
   Ramp,
   LinearLayer[16],
   Ramp,
   LinearLayer[1]}];
   
fitSinCos2 = 
 NetTrain[modelSinCos, trainData2, All, 
  ValidationSet -> validationData2, 
  LossFunction -> MeanSquaredLossLayer[],
  MaxTrainingRounds -> 3000, BatchSize -> 64]

fitSinCos2["RoundMeasurements"]

模型的损失率在<|“Loss” -> 0.0394896|>

最后用测试数据测试该模型:

trained = fitSinCos2["TrainedNet"];
testData2X = Keys[testData2];
testData2Y = Values[testData2];
predictedTestY = trained[testData2X ];
predictedTestData = MapThread[List, {testData2X, predictedTestY}];
ListPlot[{predictedTestData, List @@@ RandomSample[testData2, 500]}, 
 PlotLegends -> {"predicted value", "actual"}]

蓝色线条为预测的值,从最后的测试数据看出该模型能够有效的拟合函数x Sin[x] + Cos[x]。

Tensorflow 版本:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jul  2 14:30:43 2020

@author: alexchen
"""

## prepare datasets ##

SAMPLES = 20000
SEED = 1337
import math
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
import matplotlib.pyplot as plt

TRAIN_SPLIT = int(0.6 * SAMPLES)
TEST_SPLIT = int(0.2 * SAMPLES + TRAIN_SPLIT)
x_values = np.random.uniform(low=0, high=8*math.pi, size=SAMPLES)
np.random.shuffle(x_values)

y_values = x_values * np.sin(x_values) + np.cos(x_values)

x_train, x_valid, x_test = np.split(x_values, [TRAIN_SPLIT, TEST_SPLIT])
y_train, y_valid, y_test = np.split(y_values, [TRAIN_SPLIT, TEST_SPLIT])

plt.plot(x_values,y_values,'bo')

## build ANN ##

model = tf.keras.Sequential(
    [layers.Dense(16,activation='relu',input_shape=(1,)),
     layers.Dense(16,activation='relu'),
     layers.Dense(16,activation='relu'),
     layers.Dense(16,activation='relu'),
     layers.Dense(16,activation='relu'),
     layers.Dense(16,activation='relu'),
     layers.Dense(16,activation='relu'),
     layers.Dense(16,activation='relu'),
     layers.Dense(1)
     ])


model.compile(optimizer='Adam',loss='mse',metrics=["mae", "acc"])

## Training ##

model.fit(x_train,y_train,batch_size=32,epochs=1000,steps_per_epoch=100,
          validation_data=(x_valid,y_valid))


## Plot ##

predictions = model.predict(x_test)
plt.title("data prediction vs actual data")
plt.plot(x_test, y_test, 'b.', label='Actual')
plt.plot(x_test,predictions, 'r.', label='Predicted')
plt.legend()
plt.show()


## Convert model ##

converter = tf.lite.TFLiteConverter.from_keras_model(model)

tflite_model = converter.convert()

with tf.io.gfile.GFile('mode1.tflite','wb') as f:
    f.write(tflite_model)


### 8 bits CPU model size ###

converter.optimizations = [tf.lite.Optimize.DEFAULT]

tflite_quantize_model1 = converter.convert()

with tf.io.gfile.GFile('mode1-default-quant.tflite','wb') as f:
    f.write(tflite_quantize_model1)

### Fuil integer model ###

def representative_dataset_gen():
    for val in x_test:
        yield [np.array(val,dtype=np.float16,ndim=2)]
        
converter.representative_dataset = representative_dataset_gen
tflite_quantize_model2 = converter.convert()

with tf.io.gfile.GFile('mode1-default-quant-integer.tflite','wb') as f:
    f.write(tflite_quantize_model2)

预测值与实际值的对比图:

使用xxd将tflite转换成C源码文件,然后与Arduino源码结合,使用arduino-cli进行编译和上传代码到nano BLE上。

最后将完成的Arduino源码项目直接在Arduino IDE 中打开(文件为fitfunctions.ino),或者使用arduino-cli编译上传代码到BLE控制器中即可,LED指示灯会随着函数的波动忽明忽暗,并且在一个周期内(0~8pi)亮度会随着时间呈现差异化的波动。

Recent Posts

Categories

About

Keep thinking, Stay curious
Always be sensitive to new things