Real-Time Storage, Computation, and Display of Sensor Monitoring Data (RabbitMQ-Flink-InfluxDB)

2023-03-27 15:58:12

Background


[2021-09-01] For updated content, see the two follow-up articles:


Goal: We have been working on sensor-monitoring projects for some time, covering earthquakes, bridges, high-rise buildings, stadiums, high-speed rail station buildings, climbing formwork, wind-turbine towers, and more. Sensor types include GPS, strain, displacement, temperature, vibration, cable force, hydrostatic levels, inclinometers, and weather stations. Acquisition frequencies range from minute level down to millisecond level. The requirements are much the same across projects: store the raw monitoring data, compute on it in real time, and raise threshold alarms.
Design: This kind of time-series data is stored in InfluxDB, real-time computation is handled by Flink, and all sensor data is ingested through RabbitMQ (Kafka was the original choice, but the existing architecture was too costly to change).
Tool: Here we share a data-simulation tool (Sensor Data Simulator (To RabbitMQ)), extraction code: nxlu.
Disclaimer: this software was developed in-house by our company; no organization or individual may use it for commercial purposes without permission and authorization.
This post is just a quick record for later reference; corrections of any errors or omissions are welcome.

First, a simple architecture diagram:

[Figure: architecture diagram]

Data structures and formats in InfluxDB and Redis

The v1, v2, ... fields in InfluxDB and the q1, q2, ... fields in Redis are the sensors' monitoring quotas (measured metrics).

[Figures: sample records in InfluxDB and Redis]
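As a concrete illustration of the InfluxDB side (tag and field names are taken from the RawValueInfluxSink code below; the values are made up), a single raw reading becomes a point like this with the influxdb-java client; the Redis records are analogous, with quota fields q1, q2, ... instead:

Point point = Point.measurement("value_raw")
        .time(1616832000000L, TimeUnit.MILLISECONDS) // pickTime as epoch millis
        .tag("projectId", "P001")   // made-up example values
        .tag("typeSymbol", "GPS")
        .tag("serialCode", "S001")
        .addField("v1", 1.23)       // one field per quota: v1, v2, ...
        .addField("v2", 4.56)
        .build();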

On to the code.

package com.cloudansys.api.rabbitmqsource;

import com.cloudansys.config.DefaultConfig;
import com.cloudansys.config.FlinkConfig;
import com.cloudansys.core.constant.Const;
import com.cloudansys.core.entity.MultiDataEntity;
import com.cloudansys.core.flink.function.CustomFilterFunc;
import com.cloudansys.core.flink.function.rabbitmq.*;
import com.cloudansys.core.flink.sink.RawValueInfluxSink;
import com.cloudansys.core.flink.sink.RedisBatchSink;
import com.cloudansys.core.flink.sink.SecMeanValueInfluxSink;
import com.cloudansys.core.util.FlinkUtil; // assumed location of this helper; not shown in the post
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.util.List;

@Slf4j
public class Application {

    public Application() {
    }

    public static void main(String[] args) throws Exception {

        // set the Hadoop home directory when running on Windows
        if (System.getProperty("os.name").startsWith("Win")) {
            log.info("------ 【windows env】 ------");
            System.setProperty(Const.HADOOP_HOME_DIR, FlinkConfig.getHadoopHomeDir());
        }

        log.info("");
        log.info("****************** 获取 Flink 执行环境");
        log.info("");

        // obtain the Flink execution environment
        StreamExecutionEnvironment env = FlinkUtil.getFlinkEnv();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        log.info("");
        log.info("****************** 配置 RabbitMQ 数据源并解析");
        log.info("");

        // parse the raw data, wrapping each stream element into a List<MultiDataEntity>
        SingleOutputStreamOperator<List<MultiDataEntity>> rawStream = env
                .setParallelism(1)
                // the RabbitMQ source
                .addSource(DefaultConfig.getRMQSource())
                // drop null and whitespace-only values
                .filter(new CustomFilterFunc())
                // parse the payload
                .map(new RabbitMQMapFunc());

        log.info("");
        log.info("****************** 原始数据存储【influx】");
        log.info("");

        // write the raw data to InfluxDB
        rawStream
                .addSink(new RawValueInfluxSink());

        log.info("");
        log.info("****************** 高低频数据分流");
        log.info("");

        // tag for the millisecond-level high-frequency side stream
        final OutputTag<List<MultiDataEntity>> tagH = new OutputTag<List<MultiDataEntity>>(Const.TAG_HIGH_FREQUENCY) {
        };
        // tag for the low-frequency side stream (data sampled at second or minute level)
        final OutputTag<List<MultiDataEntity>> tagL = new OutputTag<List<MultiDataEntity>>(Const.TAG_LOWER_FREQUENCY) {
        };

        // split the stream, tagging data by acquisition frequency
        SingleOutputStreamOperator<List<MultiDataEntity>> tagStream = rawStream
                .process(new RabbitMQSideOutputProcessFunc(tagH, tagL));

        // the low-frequency stream
        DataStream<List<MultiDataEntity>> lowFrequencyStream = tagStream.getSideOutput(tagL);
        // the high-frequency stream
        DataStream<List<MultiDataEntity>> highFrequencyStream = tagStream.getSideOutput(tagH);

        log.info("");
        log.info("****************** 秒级均值计算");
        log.info("");

        // compute per-second means over the high-frequency stream
        SingleOutputStreamOperator<List<MultiDataEntity>> secMeanValueStream = highFrequencyStream
                // assign event-time timestamps and watermarks
                .assignTimestampsAndWatermarks(new RabbitMQWatermark())
                // key by project id and sensor type
                .keyBy(new RabbitMQKeySelector())
                // one-second tumbling event-time windows
                .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                // aggregate each window into its per-second mean
                .apply(new RabbitMQWindowFunc());

        log.info("");
        log.info("****************** 秒级均值计算结果存储【influxDB 和 redis】");
        log.info("");

        // write the per-second mean results to InfluxDB and Redis
        secMeanValueStream.addSink(new SecMeanValueInfluxSink());
        secMeanValueStream.addSink(new RedisBatchSink());

        log.info("");
        log.info("****************** 任务配置完毕,流计算开始 . . . ");
        log.info("");

        // Flink builds the job lazily: execute() must be called for the pipeline above to run
        env.execute(Const.JOB_ID);
    }

}
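The custom functions referenced above (RabbitMQWatermark, RabbitMQKeySelector, RabbitMQSideOutputProcessFunc, RabbitMQWindowFunc) are not all reproduced in this post. As one plausible sketch of the watermark assigner only, assuming event time is taken from the 17-char pickTime (yyyyMMddHHmmssSSS) and allowing two seconds of out-of-orderness:

package com.cloudansys.core.flink.function.rabbitmq;

import com.cloudansys.core.entity.MultiDataEntity;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.List;

// hypothetical reconstruction, not the original implementation
public class RabbitMQWatermark extends BoundedOutOfOrdernessTimestampExtractor<List<MultiDataEntity>> {

    public RabbitMQWatermark() {
        // tolerate up to 2 seconds of out-of-order events
        super(Time.seconds(2));
    }

    @Override
    public long extractTimestamp(List<MultiDataEntity> element) {
        try {
            // all entities in one message share the same pickTime
            return new SimpleDateFormat("yyyyMMddHHmmssSSS")
                    .parse(element.get(0).getPickTime()).getTime();
        } catch (ParseException e) {
            return System.currentTimeMillis();
        }
    }
}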

The RabbitMQ source

    /**
     * RabbitMQ source
     */
    public static RMQSource<String> getRMQSource() {
        return new RMQSource<String>(
                DefaultConfig.getRabbitMQConfig(),
                props.getProperty(Const.RABBITMQ_QUEUE_NAME),
                new SimpleStringSchema()) {
            /**
             * If the queue in RabbitMQ was declared with a TTL, the second
             * argument of queueDeclare must be changed to true (durable) here,
             * and x-message-ttl must be configured to match the existing declaration.
             */
            @Override
            protected void setupQueue() throws IOException {
                if (queueName != null) {
                    Map<String, Object> arguments = new HashMap<>();
                    // queue TTL: 259200000 ms = 3 days
                    arguments.put("x-message-ttl", 259200000);
                    channel.queueDeclare(queueName, true, false, false, arguments);
                }
            }
        };
    }
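getRMQSource() relies on DefaultConfig.getRabbitMQConfig(), which the post does not show. A minimal sketch of what it could look like, using Flink's RMQConnectionConfig.Builder (host, port, and credentials below are placeholders, not values from the original project):

    public static RMQConnectionConfig getRabbitMQConfig() {
        // hypothetical reconstruction; the real values come from the project's properties
        return new RMQConnectionConfig.Builder()
                .setHost("localhost")   // placeholder
                .setPort(5672)          // default AMQP port
                .setVirtualHost("/")
                .setUserName("guest")   // placeholder
                .setPassword("guest")   // placeholder
                .build();
    }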

Data parsing

package com.cloudansys.core.flink.function.rabbitmq;

import com.cloudansys.core.entity.MultiDataEntity;
import com.cloudansys.core.util.SensorPayloadParseUtil;
import org.apache.flink.api.common.functions.MapFunction;

import java.util.List;

public class RabbitMQMapFunc implements MapFunction<String, List<MultiDataEntity>> {

    @Override
    public List<MultiDataEntity> map(String element) throws Exception {
        return SensorPayloadParseUtil.parse(element);
    }

}
package com.cloudansys.core.util;

import com.cloudansys.core.constant.Const;
import com.cloudansys.core.entity.MultiDataEntity;
import lombok.extern.slf4j.Slf4j;

import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

@Slf4j
public class SensorPayloadParseUtil {

    /**
     * Raw stream element layout (comma-separated):
     * {projectId, time (17-char string), functionCode, measureType, reservedByte, sensorNum, quotaNum, sensorId, value, ...}
     * After the 7-field header, each sensor contributes its id followed by quotaNum values:
     * projectId, timestamp, funcCode, typeSymbol, reserved, sensorNum, quotaNum, sensorId1, values1..., sensorId2, values2..., ...
     */
    public static List<MultiDataEntity> parse(String element) throws ParseException {
        List<MultiDataEntity> multiDataEntities = new ArrayList<>();
        int sensorNum, quotaNum;
        String projectId, typeSymbol, pickTime;
        String[] items = element.split(Const.COMMA);

        int index = 0;
        projectId = items[index++];
        pickTime = items[index++];
        index++; // skip the function code
        typeSymbol = items[index++];
        index++; // skip the reserved byte
        sensorNum = Integer.parseInt(items[index++]);
        quotaNum = Integer.parseInt(items[index]);

        // copy the id/value section into a new array: a1,aq1,aq2,aq3,b1,bq1,bq2,bq3,c1,cq1,cq2,cq3,...
        String[] keyValues = Arrays.copyOfRange(items, 7, items.length);
        for (int i = 0; i < sensorNum; i++) {
            int indexOfSerialCode = i * (quotaNum + 1);
            MultiDataEntity multiDataEntity = new MultiDataEntity();
            multiDataEntity.setProjectId(projectId);
            multiDataEntity.setTypeSymbol(typeSymbol);
            multiDataEntity.setPickTime(pickTime);
            multiDataEntity.setSerialCode(keyValues[indexOfSerialCode]);
            Double[] values = new Double[quotaNum];
            int indexOfFirstQuota = indexOfSerialCode + 1;
            int indexOfLastQuota = indexOfSerialCode + quotaNum;
            for (int j = indexOfFirstQuota; j <= indexOfLastQuota; j++) {
                values[j - indexOfFirstQuota] = Double.parseDouble(keyValues[j]);
            }
            multiDataEntity.setValues(values);
            multiDataEntities.add(multiDataEntity);
        }

        return multiDataEntities;
    }
}
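As a quick worked example of this format (all values are made up, and Const.COMMA is assumed to be a plain comma), a message carrying two GPS sensors with three quotas each parses like this:

        // header: projectId, 17-char time, funcCode, type, reserved, sensorNum, quotaNum
        String payload = "P001,20210901120000000,01,GPS,00,2,3,"
                + "S001,1.10,1.20,1.30,S002,2.10,2.20,2.30";
        List<MultiDataEntity> entities = SensorPayloadParseUtil.parse(payload);
        // entities.get(0): serialCode "S001", values [1.1, 1.2, 1.3]
        // entities.get(1): serialCode "S002", values [2.1, 2.2, 2.3]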

Writing to InfluxDB

package com.cloudansys.core.flink.sink;

import com.cloudansys.config.DefaultConfig;
import com.cloudansys.core.constant.Const;
import com.cloudansys.core.entity.MultiDataEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Slf4j
public class RawValueInfluxSink extends RichSinkFunction<List<MultiDataEntity>> {

    private static InfluxDB influxDBClient;
    private static BatchPoints batchPoints;
    private static final String measurement = "value_raw";
    private static final String tagName_1 = "projectId";
    private static final String tagName_2 = "typeSymbol";
    private static final String tagName_3 = "serialCode";
    private static final String fieldName_base = "v";
    private static int count = 0;
    private static long lastTime = 0L;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        influxDBClient = DefaultConfig.getInfluxDB();
        batchPoints = DefaultConfig.getBatchPoints();
    }

    @Override
    public void invoke(List<MultiDataEntity> element, Context context) throws Exception {
        log.info("##### write in influx 【{}】#####", measurement);
        for (MultiDataEntity multiDataEntity : element) {
            String projectId = multiDataEntity.getProjectId();
            String typeSymbol = multiDataEntity.getTypeSymbol();
            String serialCode = multiDataEntity.getSerialCode();
            String pickTime = multiDataEntity.getPickTime();
            Point.Builder pointBuilder = getPointBuilder(projectId, typeSymbol, serialCode, pickTime);
            if (pointBuilder != null) {
                Double[] values = multiDataEntity.getValues();
                for (int j = 0; j < values.length; j++) {

                    String fieldName = fieldName_base + (j + 1);

                    // values are always stored as double
                    double fieldValue = values[j];
                    // the original post is truncated here; the remainder of this
                    // method is a minimal reconstruction: add the field, collect
                    // the point, and write the batch
                    pointBuilder.addField(fieldName, fieldValue);
                }
                batchPoints.point(pointBuilder.build());
            }
        }
        influxDBClient.write(batchPoints);
    }
}
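getPointBuilder() also falls in the truncated part. A plausible sketch, assuming the 17-char pickTime is parsed to epoch milliseconds and null is returned on bad input (matching the null check in invoke()):

    /**
     * Hypothetical reconstruction: a point builder carrying the three tags
     * and an event timestamp parsed from the 17-char pickTime.
     */
    private static Point.Builder getPointBuilder(String projectId, String typeSymbol,
                                                 String serialCode, String pickTime) {
        try {
            long ts = new SimpleDateFormat("yyyyMMddHHmmssSSS").parse(pickTime).getTime();
            return Point.measurement(measurement)
                    .time(ts, TimeUnit.MILLISECONDS)
                    .tag(tagName_1, projectId)
                    .tag(tagName_2, typeSymbol)
                    .tag(tagName_3, serialCode);
        } catch (ParseException e) {
            log.error("unparseable pickTime: {}", pickTime, e);
            return null;
        }
    }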
Author: WaiSaa
Original: https://blog.csdn.net/qq_42761569/article/details/108599902