Background
[2021-09-01] See the following two articles for the updated content:
- Data simulator: Sensor Data Simulator [StreamFaker] [with GUI] [publishes to RabbitMQ] [built with Python 3 + PyQt5]
- Program source: How to Process and Store Sensor Data [Flink] [InfluxDB]
Goal: I have been working on sensor-monitoring projects for some time, covering earthquake, bridge, high-rise, stadium, high-speed-rail station, climbing-formwork, and wind-turbine-tower monitoring, among others. Sensor types include GPS, strain, displacement, temperature, vibration, cable force, hydrostatic levels, inclinometers, and weather stations, with sampling rates ranging from minute-level down to millisecond-level. The requirements are much the same across projects: store the raw monitoring data, compute in real time, and raise threshold alarms.
Design: InfluxDB stores this kind of time-series data, Flink does the real-time computation, and all sensor data is fed in through RabbitMQ (Kafka was the original plan, but the existing architecture was too far along to change).
Tool: Sharing a data simulator here (Sensor Data Simulator (To RabbitMQ)), extraction code: nxlu
[Disclaimer: this software was developed in-house by our company; without permission and authorization, no organization or individual may use it for commercial purposes.]
This is just a quick record for later reference; if there are errors or omissions, corrections are welcome.
First, a simple architecture diagram:
Data structures and formats stored in InfluxDB and Redis
The v1, v2, ... fields in InfluxDB and the q1, q2, ... fields in Redis represent the sensor's monitored quotas (metrics).
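To make those layouts concrete, here is a minimal, hypothetical sketch of how a single reading could be written to each store. The measurement, tag names, and v1/v2 field names follow the sink code further down; the Redis key pattern (projectId:typeSymbol:serialCode), the q1/q2 hash fields, and all concrete values are illustrative assumptions, not the project's actual configuration.

import java.util.concurrent.TimeUnit;

import org.influxdb.dto.Point;
import redis.clients.jedis.Jedis;

public class StorageLayoutExample {
    public static void main(String[] args) {
        long pickTimeMillis = System.currentTimeMillis();

        // InfluxDB: one point per sensor reading, one field per quota (v1, v2, ...)
        Point point = Point.measurement("value_raw")
                .tag("projectId", "P001")
                .tag("typeSymbol", "STR")
                .tag("serialCode", "S001")
                .addField("v1", 1.10)
                .addField("v2", 1.20)
                .time(pickTimeMillis, TimeUnit.MILLISECONDS)
                .build();
        System.out.println(point.lineProtocol());

        // Redis: latest values per sensor, one hash field per quota (q1, q2, ...)
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            jedis.hset("P001:STR:S001", "q1", "1.10");
            jedis.hset("P001:STR:S001", "q2", "1.20");
        }
    }
}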
Now, the code.
package com.cloudansys.api.rabbitmqsource;

import com.cloudansys.config.DefaultConfig;
import com.cloudansys.config.FlinkConfig;
import com.cloudansys.core.constant.Const;
import com.cloudansys.core.entity.MultiDataEntity;
import com.cloudansys.core.flink.function.CustomFilterFunc;
import com.cloudansys.core.flink.function.rabbitmq.*;
import com.cloudansys.core.flink.sink.RawValueInfluxSink;
import com.cloudansys.core.flink.sink.RedisBatchSink;
import com.cloudansys.core.flink.sink.SecMeanValueInfluxSink;
import com.cloudansys.core.util.FlinkUtil; // assumed package; FlinkUtil is used below but was missing from the original import list
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.util.List;

@Slf4j
public class Application {

    public Application() {
    }

    public static void main(String[] args) throws Exception {
        // On Windows, point HADOOP_HOME at the local Hadoop directory
        if (System.getProperty("os.name").startsWith("Win")) {
            log.info("------ 【windows env】 ------");
            System.setProperty(Const.HADOOP_HOME_DIR, FlinkConfig.getHadoopHomeDir());
        }

        log.info("");
        log.info("****************** Get the Flink execution environment");
        log.info("");
        // Get the Flink execution environment
        StreamExecutionEnvironment env = FlinkUtil.getFlinkEnv();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        log.info("");
        log.info("****************** Configure the RabbitMQ source and parse the data");
        log.info("");
        // Parse the raw messages, wrapping each stream element as a List<MultiDataEntity>
        SingleOutputStreamOperator<List<MultiDataEntity>> rawStream = env
                .setParallelism(1)
                // Configure the RabbitMQ source
                .addSource(DefaultConfig.getRMQSource())
                // Drop null and whitespace-only messages
                .filter(new CustomFilterFunc())
                // Parse the payload
                .map(new RabbitMQMapFunc());

        log.info("");
        log.info("****************** Store raw data [InfluxDB]");
        log.info("");
        // Write the raw data to InfluxDB
        rawStream
                .addSink(new RawValueInfluxSink());

        log.info("");
        log.info("****************** Split high- and low-frequency data");
        log.info("");
        // Side-output tag for the millisecond-level high-frequency stream
        final OutputTag<List<MultiDataEntity>> tagH = new OutputTag<List<MultiDataEntity>>(Const.TAG_HIGH_FREQUENCY) {
        };
        // Side-output tag for the low-frequency stream (second- or minute-level sampling)
        final OutputTag<List<MultiDataEntity>> tagL = new OutputTag<List<MultiDataEntity>>(Const.TAG_LOWER_FREQUENCY) {
        };
        // Split the stream, tagging data by sampling frequency
        SingleOutputStreamOperator<List<MultiDataEntity>> tagStream = rawStream
                .process(new RabbitMQSideOutputProcessFunc(tagH, tagL));
        // Low-frequency stream
        DataStream<List<MultiDataEntity>> lowFrequencyStream = tagStream.getSideOutput(tagL);
        // High-frequency stream
        DataStream<List<MultiDataEntity>> highFrequencyStream = tagStream.getSideOutput(tagH);

        log.info("");
        log.info("****************** Compute second-level means");
        log.info("");
        // Compute second-level means over the high-frequency stream
        SingleOutputStreamOperator<List<MultiDataEntity>> secMeanValueStream = highFrequencyStream
                // Assign timestamps and watermarks
                .assignTimestampsAndWatermarks(new RabbitMQWatermark())
                // Key by project id and sensor type
                .keyBy(new RabbitMQKeySelector())
                // One-second tumbling event-time windows
                .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                // Aggregate each window: compute the second-level mean
                .apply(new RabbitMQWindowFunc());

        log.info("");
        log.info("****************** Store second-level means [InfluxDB and Redis]");
        log.info("");
        // Write the second-level means to both InfluxDB and Redis
        secMeanValueStream.addSink(new SecMeanValueInfluxSink());
        secMeanValueStream.addSink(new RedisBatchSink());

        log.info("");
        log.info("****************** Job configured, stream computation starting . . . ");
        log.info("");
        // Flink builds the job graph lazily; nothing above runs until execute() is called
        env.execute(Const.JOB_ID);
    }
}
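The helper classes referenced above (RabbitMQSideOutputProcessFunc, RabbitMQWatermark, RabbitMQKeySelector, RabbitMQWindowFunc) are not listed in this post. Below is a minimal sketch of what the splitting and watermarking pieces could look like, assuming the type symbol alone identifies millisecond-level sensors (the "VIB" value is made up) and that pickTime is the 17-digit yyyyMMddHHmmssSSS string described in the parser section; the real routing rule and out-of-orderness bound may differ. Each class would live in its own file in the real project.

import java.text.SimpleDateFormat;
import java.util.List;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import com.cloudansys.core.entity.MultiDataEntity;

// Sketch 1: route each batch to the high- or low-frequency side output.
// Assumption: the sensor type decides the frequency class.
class RabbitMQSideOutputProcessFunc extends ProcessFunction<List<MultiDataEntity>, List<MultiDataEntity>> {

    private final OutputTag<List<MultiDataEntity>> tagH;
    private final OutputTag<List<MultiDataEntity>> tagL;

    RabbitMQSideOutputProcessFunc(OutputTag<List<MultiDataEntity>> tagH,
                                  OutputTag<List<MultiDataEntity>> tagL) {
        this.tagH = tagH;
        this.tagL = tagL;
    }

    @Override
    public void processElement(List<MultiDataEntity> element, Context ctx,
                               Collector<List<MultiDataEntity>> out) {
        // All entities in one batch share a type symbol, so checking the first is enough.
        // "VIB" standing for millisecond-level vibration data is a hypothetical example.
        if (!element.isEmpty() && "VIB".equals(element.get(0).getTypeSymbol())) {
            ctx.output(tagH, element);
        } else {
            ctx.output(tagL, element);
        }
    }
}

// Sketch 2: event-time extraction from the 17-digit pick time, with an
// assumed two-second out-of-orderness allowance.
class RabbitMQWatermark extends BoundedOutOfOrdernessTimestampExtractor<List<MultiDataEntity>> {

    RabbitMQWatermark() {
        super(Time.seconds(2));
    }

    @Override
    public long extractTimestamp(List<MultiDataEntity> element) {
        try {
            return new SimpleDateFormat("yyyyMMddHHmmssSSS")
                    .parse(element.get(0).getPickTime()).getTime();
        } catch (java.text.ParseException e) {
            // Fall back to processing time if the timestamp is malformed
            return System.currentTimeMillis();
        }
    }
}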
RabbitMQ source
/**
 * RabbitMQ source
 */
public static RMQSource<String> getRMQSource() {
    return new RMQSource<String>(
            DefaultConfig.getRabbitMQConfig(),
            props.getProperty(Const.RABBITMQ_QUEUE_NAME),
            new SimpleStringSchema()) {
        /**
         * If the queue in RabbitMQ was declared with a TTL,
         * the second argument of queueDeclare must be changed to true (durable)
         * and x-message-ttl must be configured to match.
         */
        @Override
        protected void setupQueue() throws IOException {
            if (queueName != null) {
                Map<String, Object> arguments = new HashMap<>();
                // 259200000 ms = 3 days
                arguments.put("x-message-ttl", 259200000);
                channel.queueDeclare(queueName, true, false, false, arguments);
            }
        }
    };
}
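The DefaultConfig.getRabbitMQConfig() call above is not shown in the post. For completeness, here is a minimal sketch of what it could look like using the connector's RMQConnectionConfig.Builder; the property keys are hypothetical and assume the same props object that supplies the queue name.

import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

public static RMQConnectionConfig getRabbitMQConfig() {
    // Property keys below are assumptions, not the project's actual config
    return new RMQConnectionConfig.Builder()
            .setHost(props.getProperty("rabbitmq.host"))
            .setPort(Integer.parseInt(props.getProperty("rabbitmq.port")))
            .setVirtualHost(props.getProperty("rabbitmq.vhost"))
            .setUserName(props.getProperty("rabbitmq.username"))
            .setPassword(props.getProperty("rabbitmq.password"))
            .build();
}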
Data parsing
package com.cloudansys.core.flink.function.rabbitmq;

import com.cloudansys.core.entity.MultiDataEntity;
import com.cloudansys.core.util.SensorPayloadParseUtil;
import org.apache.flink.api.common.functions.MapFunction;

import java.util.List;

public class RabbitMQMapFunc implements MapFunction<String, List<MultiDataEntity>> {

    @Override
    public List<MultiDataEntity> map(String element) throws Exception {
        return SensorPayloadParseUtil.parse(element);
    }
}
package com.cloudansys.core.util;

import com.cloudansys.core.constant.Const;
import com.cloudansys.core.entity.MultiDataEntity;
import lombok.extern.slf4j.Slf4j;

import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

@Slf4j
public class SensorPayloadParseUtil {

    /**
     * Raw stream element layout (comma-separated):
     * {projectId, pickTime (17-char string), functionCode, typeSymbol,
     *  reservedByte, sensorNum, quotaNum, sensorId1, value..., sensorId2, value..., ...}
     * i.e. seven header fields followed by one (sensorId, quota values) group per sensor.
     */
    public static List<MultiDataEntity> parse(String element) throws ParseException {
        List<MultiDataEntity> multiDataEntities = new ArrayList<>();
        int sensorNum, quotaNum;
        String projectId, typeSymbol, pickTime;
        String[] items = element.split(Const.COMMA);
        int index = 0;
        projectId = items[index++];
        pickTime = items[index++];
        index++; // function code, unused here
        typeSymbol = items[index++];
        index++; // reserved byte, unused here
        sensorNum = Integer.parseInt(items[index++]);
        quotaNum = Integer.parseInt(items[index]);
        // Copy the (id, values) section into a new array:
        // a1,aq1,aq2,aq3, b1,bq1,bq2,bq3, c1,cq1,cq2,cq3, ...
        String[] keyValues = Arrays.copyOfRange(items, 7, items.length);
        for (int i = 0; i < sensorNum; i++) {
            int indexOfSerialCode = i * (quotaNum + 1);
            MultiDataEntity multiDataEntity = new MultiDataEntity();
            multiDataEntity.setProjectId(projectId);
            multiDataEntity.setTypeSymbol(typeSymbol);
            multiDataEntity.setPickTime(pickTime);
            multiDataEntity.setSerialCode(keyValues[indexOfSerialCode]);
            Double[] values = new Double[quotaNum];
            int indexOfFirstQuota = indexOfSerialCode + 1;
            int indexOfLastQuota = indexOfSerialCode + quotaNum;
            for (int j = indexOfFirstQuota; j <= indexOfLastQuota; j++) {
                values[j - indexOfFirstQuota] = Double.parseDouble(keyValues[j]);
            }
            multiDataEntity.setValues(values);
            multiDataEntities.add(multiDataEntity);
        }
        return multiDataEntities;
    }
}
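As a quick sanity check, a made-up payload with two sensors and three quotas each parses like this (assuming Const.COMMA is a plain comma; all ids and values below are illustrative only):

public static void main(String[] args) throws Exception {
    // Header: project P001, 17-digit pick time, function code 01,
    // type STR, reserved byte 00, 2 sensors, 3 quotas per sensor
    String element = "P001,20210901120000000,01,STR,00,2,3,"
            + "S001,1.10,1.20,1.30,"
            + "S002,2.10,2.20,2.30";
    List<MultiDataEntity> entities = SensorPayloadParseUtil.parse(element);
    for (MultiDataEntity e : entities) {
        // Prints: S001 -> [1.1, 1.2, 1.3] then S002 -> [2.1, 2.2, 2.3]
        System.out.println(e.getSerialCode() + " -> " + Arrays.toString(e.getValues()));
    }
}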
Writing to InfluxDB
package com.cloudansys.core.flink.sink;

import com.cloudansys.config.DefaultConfig;
import com.cloudansys.core.constant.Const;
import com.cloudansys.core.entity.MultiDataEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Slf4j
public class RawValueInfluxSink extends RichSinkFunction<List<MultiDataEntity>> {

    private static InfluxDB influxDBClient;
    private static BatchPoints batchPoints;
    private static final String measurement = "value_raw";
    private static final String tagName_1 = "projectId";
    private static final String tagName_2 = "typeSymbol";
    private static final String tagName_3 = "serialCode";
    private static final String fieldName_base = "v";
    private static int count = 0;
    private static long lastTime = 0L;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        influxDBClient = DefaultConfig.getInfluxDB();
        batchPoints = DefaultConfig.getBatchPoints();
    }

    @Override
    public void invoke(List<MultiDataEntity> element, Context context) throws Exception {
        log.info("##### write in influx 【{}】#####", measurement);
        for (MultiDataEntity multiDataEntity : element) {
            String projectId = multiDataEntity.getProjectId();
            String typeSymbol = multiDataEntity.getTypeSymbol();
            String serialCode = multiDataEntity.getSerialCode();
            String pickTime = multiDataEntity.getPickTime();
            Point.Builder pointBuilder = getPointBuilder(projectId, typeSymbol, serialCode, pickTime);
            if (pointBuilder != null) {
                Double[] values = multiDataEntity.getValues();
                for (int j = 0; j < values.length; j++) {
                    String fieldName = fieldName_base + (j + 1);
                    // Field values are always stored as double
                    double fieldValue = values[j];