发布时间:2025-12-10 11:50:32 浏览次数:7
地理围栏是一个虚拟的空间围栏,可以帮助开发者检测人或物何时进入或离开预定义区域,并支持实时报警功能。
电子围栏的定义
电子围栏规则数据结构
数据样本示例
电子围栏分析结果数据结构
电子围栏分析任务设置、原始数据json解析、过滤异常数据
读取已存在电子围栏中的车辆与电子围栏信息(广播流临时结果数据)
原始车辆数据与电子围栏广播流进行合并,生成电子围栏规则模型流数据(DStream)
创建90秒翻滚窗口,计算电子围栏信息(ElectricFenceModel中的值根据车辆是否在围栏内进行设置)
读取电子围栏分析结果表数据并广播
翻滚窗口电子围栏对象模型流数据与电子围栏分析结果数据广播流进行connect
对电子围栏对象模型,添加uuid和inMySQL(车辆是否已存在mysql表中)
电子围栏分析结果数据落地mysql,也可以选择落地mongo
电子栅栏分析的逻辑图
电子围栏分析主类:ElectricFenceTask
简化 ItcastDataObj 对象:ItcastDataPartObj.java
简化解析 ItcastParseUtil 对象: JsonParsePartUtil.java
测试工具类对象
广播变量就是将变量广播到各个 taskmanager的内存中,可以共享数据,一般情况下广播变量的类型是 map 类型 key->value
广播变量的数据格式是——map类型state
如何使用广播变量
HashMap<String,ElectriFenceResultTmp> ,其中String:vin
自定义 source 读取 MySQL 的数据源并广播
定义读取电子围栏规则类——MysqlElectricFenceSouce
返回类型为 HashMap<String,ElectricFenceResultTmp>
读取数据库中配置信息
select vins.vin,setting.id,setting.name,setting.address,setting.radius,setting.longitude,setting.latitude,setting.start_time,setting.end_time from vehicle_networking.electronic_fence_setting setting inner join vehicle_networking.electronic_fence_vins vins on setting.id=vins.setting_id where setting.status=1导入工具jar包坐标
<!-- geodesy地址位置查询依赖 --><dependency><groupId>org.gavaghan</groupId><artifactId>geodesy</artifactId><version>${geodesy.version}</version></dependency>两点之间球面距离的计算工具类
/*** TODO 球面距离计算工具类;根据两个点的经纬度,计算出距离*/public class DistanceCaculateUtil {/*** @desc:计算地址位置方法,坐标系、经纬度用于计算距离(直线距离)* @param gpsFrom* @param gpsTo* @param ellipsoid* @return 计算距离*/private static Double getDistanceMeter(GlobalCoordinates gpsFrom, GlobalCoordinates gpsTo, Ellipsoid ellipsoid) {//GeodeticCurve geodeticCurve = new GeodeticCalculator().calculateGeodeticCurve(ellipsoid, gpsFrom, gpsTo);return geodeticCurve.getEllipsoidalDistance();}/*** @desc:使用传入的ellipsoidsphere方法计算距离* @param latitude 位置1经度* @param longitude 位置1维度* @param latitude2 位置2经度* @param longitude2 位置2维度* @param ellipsoid 椭圆计算算法* @return*/private static Double ellipsoidMethodDistance(Double latitude, Double longitude, Double latitude2, Double longitude2, Ellipsoid ellipsoid){// todo 位置点经度、维度不为空 位置点2经度、维度不为空 椭圆算法Objects.requireNonNull(latitude, "latitude is not null");Objects.requireNonNull(longitude, "longitude is not null");Objects.requireNonNull(latitude2, "latitude2 is not null");Objects.requireNonNull(longitude2, "longitude2 is not null");Objects.requireNonNull(ellipsoid, "ellipsoid method is not null");// todo 地球坐标对象:封装经度维度坐标对象GlobalCoordinates source = new GlobalCoordinates(latitude, longitude);GlobalCoordinates target = new GlobalCoordinates(latitude2, longitude2);// todo 椭圆范围计算方法return getDistanceMeter(source, target, ellipsoid);}/*** @desc:使用ellipsoidsphere方法计算距离* @param latitude* @param longitude* @param latitude2* @param longitude2* @return distance 单位:m*/public static Double getDistance(Double latitude,Double longitude,Double latitude2,Double longitude2) {// 椭圆范围计算方法:Ellipsoid.Spherereturn ellipsoidMethodDistance(latitude, longitude, latitude2, longitude2, Ellipsoid.Sphere);}}通过关联两个数据流后CoFlatMap 后生成实体类—— ElectricFenceModel
/*** 电子围栏规则计算模型*/@Data@AllArgsConstructor@NoArgsConstructorpublic class ElectricFenceModel implements Comparable<ElectricFenceModel> {//车架号private String vin = "";//电子围栏结果表UUIDprivate Long uuid = -999999L;//上次状态 0 里面 1 外面private int lastStatus = -999999;//当前状态 0 里面 1 外面private int nowStatus = -999999;//位置时间 yyyy-MM-dd HH:mm:ssprivate String gpsTime = "";//位置纬度--private Double lat = -999999D;//位置经度--private Double lng = -999999D;//电子围栏IDprivate int eleId = -999999;//电子围栏名称private String eleName = "";//中心点地址private String address = "";//中心点纬度private Double latitude;//中心点经度private Double longitude = -999999D;//电子围栏半径private Float radius = -999999F;//出围栏时间private String outEleTime = null;//进围栏时间private String inEleTime = null;//是否在mysql结果表中private Boolean inMysql = false;//状态报警 0:出围栏 1:进围栏private int statusAlarm = -999999;//报警信息private String statusAlarmMsg = "";//终端时间private String terminalTime = "";// 扩展字段 终端时间private Long terminalTimestamp = -999999L;@Overridepublic int compareTo(ElectricFenceModel o) {if(this.getTerminalTimestamp() > o.getTerminalTimestamp()){return 1;}else if(this.getTerminalTimestamp() < o.getTerminalTimestamp()){return -1;}else{return 0;}}}实现将两个流合并CoFlatMapFunction接口—— ElectricFenceRulesFuntion
//1.定义返回的 ElectricFenceModel//2.判断如果流数据数据质量(车辆的经纬度不能为0或-999999,车辆GpsTime不能为空)//2.1.获取当前车辆的 vin//2.2.通过vin获取电子围栏的配置信息//2.3.如果电子围栏配置信息不为空//2.3.1.说明当前车辆关联了电子围栏规则,需要判断当前上报的数据是否在电子围栏规则的生效时间内,先获取上报地理位置时间gpsTimestamp//2.3.2.如果当前gpsTimestamp>=开始时间戳并且gpsTimestamp<=结束时间戳,以下内容存入到 ElectricFenceModel//2.3.2.1.上报车辆的数据在电子围栏生效期内 vin gpstime lng lat 终端时间和终端时间戳//2.3.2.2.电子围栏id,电子围栏名称,地址,半径//2.3.2.3.电子围栏经纬度//2.3.2.4.计算经纬度和电子围栏经纬度距离距离,如果两点之间大于半径(单位是千米)的距离,就是存在于圆外,否则反之//2.3.2.5.收集结果数据设置水印机制
根据 vin 进行分组
创建 90 秒翻滚窗口
自定义电子围栏窗口实现类:ElectricFenceWindowFunction
//对电子围栏进行自定义窗口操作,处理电子围栏判断逻辑//继承 RichWindowFunction<ElectricFenceModel, ElectricFenceModel, String, TimeWindow>//1.定义存储历史电子围栏数据的state,<vin,是否在电子围栏内0:内,1:外> MapState<String, Integer>//2.重写open方法//2.1 定义mapState的描述器(相当于表结构) <String,Integer>//2.2 获取 parameterTool,用来读取配置文件参数//2.3 读取状态的超时时间 "vehicle.state.last.period" ,构建ttl设置更新类型和状态可见//2.4 设置状态描述 StateTtlConfig,开启生命周期时间//2.5 获取map状态apply 方法步骤如下
//1.创建返回对象//2.对窗口内的数据进行排序//3.从 state 中获取车辆vin对应的上一次窗口电子围栏lastStateValue标记(车辆上一次窗口是否在电子围栏中)0:电子围栏内 1:电子围栏外//4.如果上次状态为空,初始化赋值//5.判断当前处于电子围栏内还是电子围栏外//5.1.定义当前车辆电子围栏内出现的次数//5.2.定义当前车辆电子围栏外出现的次数//6.定义当前窗口的电子围栏状态//7. 90s内车辆出现在电子围栏内的次数多于出现在电子围栏外的次数,则认为当前处于电子围栏内//8. 将当前窗口的电子围栏状态写入到 state 中,供下次判断//9.如果当前电子围栏状态与上一次电子围栏状态不同//9.1.如果上一次窗口处于电子围栏外,而本次是电子围栏内,则将进入电子围栏的时间写入到数据库中//9.1.1.过滤出来状态为0的第一条数据//9.1.2.拷贝属性给 electricFenceModel 并将进入终端时间赋值,并且将状态告警字段赋值为1 0:出围栏 1:进围栏,将数据collect返回//9.2.如果上一次窗口处于电子围栏内,而本次是电子围栏外,则将出电子围栏的时间写入到数据库中//9.2.1.过滤出来状态倒序为1的第一条数据//9.2.2.拷贝属性给 electricFenceModel 并将出终端时间赋值,并且将状态告警 0:出围栏 1:进围栏,将数据collect返回如果判断为进入到电子围栏,进入到电子围栏的第一条数据的时间会被记录下来
读取mysql的电子围栏结果表的数据——MysqlElectricFenceResultSource
//读取电子围栏分析结果表的数据,并进行广播//继承自 RichSourceFunction<HashMap<String, Long>>//1.重写 open 方法,初始化连接//1.1 编写sql "select vin, min(id) id from vehicle_networking.electric_fence where inTime is not null and outTime is null GROUP BY vin;"//2.重写 close 方法//3.重写 run 方法 获取出来vin 和 id 封装成map并返回//4.重写 cancel 方法读取电子栅栏的 vin 和 最近id
select vin,min(id) id from vehicle_networking.electric_fence where inTime is not null and outTime is null group by vin将读取的电子栅栏信息数据流广播出去
将电子栅栏模型数据流和电子栅栏 获取的<vin,id>流进行关联,并进行 flatMap
实现电子围栏分析结果模型添加 uuid 和 inMysql 字段 —— ElectricFenceModelFunction
//实现 CoFlatMapFunction<ElectricFenceModel, HashMap<String, Long>, ElectricFenceModel>//1.重写flatMap1方法//1.1.通过getvin获取配置流中是否存在值//2.如果不为 null//2.1.设置为当前时间戳//2.2.设置库中InMysql是否存在为 true//3.否则//3.1.设置 uuid 为最大值-当前时间戳//3.2 设置库中是否存在为 false//4.收集数据//5.重写 flatMap2 方法//5.1.读取配置数据将电子围栏分析结果数据写入到 mysql 数据库中 —— ElectricFenceMysqlSink
//继承于 RichSinkFunction<ElectricFenceModel>//1. 重写 open 方法,获取参数,创建连接//2. 重写 invoke 方法,//2.1 出围栏(且能获取到进围栏状态的)则修改进围栏的状态, 否则 进入围栏,转换ElectricFenceModel对象,插入结构数据到电子围栏结果表//3. 重写 close 方法