Commit 19f3245f by 魏建枢

代码提交增加事件错误事件

parent 25c910d7
......@@ -3,18 +3,23 @@ package com.flink.achieve.doris;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Objects;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -24,8 +29,11 @@ import com.flink.common.DorisConnector;
import com.flink.common.SourceCommonBase;
import com.flink.config.TableConfig;
import com.flink.processor.function.UserPropertiesProcessor;
import com.flink.util.CompareUtils;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.EventList;
import com.flink.vo.OdsEventLog;
import com.flink.vo.Properties;
import com.flink.vo.UserProperties;
/**
......@@ -99,8 +107,51 @@ public class EventLogAchi extends SourceCommonBase implements Serializable{
"bi.event_log"
);
TableConfig tableErrorConfig = new TableConfig(
new String[]{
"id",
"dt",
"event_time",
"create_time",
"app_key",
"app_type",
"cid",
"phone",
"nick",
"event",
"data",
"startTime",
"timeDifference",
"endTime",
"userId",
"__DORIS_DELETE_SIGN__"
},
new DataType[]{
DataTypes.STRING(),
DataTypes.DATE(),
DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(3),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.TIMESTAMP(3),
DataTypes.STRING(),
DataTypes.TIMESTAMP(3),
DataTypes.STRING(),
DataTypes.INT()
},
"bi.event_log_error"
);
//=================流式处理=========================================
DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(tableConfig.getFields(), tableConfig.getTypes(), tableConfig.getTableName());
DorisSink<RowData> dorisErrorSink = DorisConnector.sinkDoris(tableErrorConfig.getFields(), tableErrorConfig.getTypes(), tableErrorConfig.getTableName());
//=================数据处理流水线=========================================
dataStreamSource
.map(value->{
......@@ -125,8 +176,8 @@ public class EventLogAchi extends SourceCommonBase implements Serializable{
row.setField(2, TimestampData.fromLocalDateTime(LocalDateTime.parse(sendTime,DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))));
row.setField(3, TimestampData.fromLocalDateTime(LocalDateTime.parse(createTime,DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(4, StringData.fromString(event.getStrategy_group_id()));
row.setField(5, StringData.fromString(event.getApp_key()));
row.setField(6, StringData.fromString(event.getApp_type()));
row.setField(5, StringData.fromString(appKey));
row.setField(6, StringData.fromString(appType));
row.setField(7, StringData.fromString(event.getApp_channel()));
row.setField(8, StringData.fromString(event.getZone_code()));
row.setField(9, StringData.fromString(event.getZone_name()));
......@@ -137,7 +188,7 @@ public class EventLogAchi extends SourceCommonBase implements Serializable{
row.setField(14, StringData.fromString(event.getUid()));
row.setField(15, StringData.fromString(event.getStrategy_version()));
row.setField(16, StringData.fromString(event.getEvent_list()));
row.setField(17, StringData.fromString(event.getRoute_ip()));
row.setField(17, StringData.fromString(routeIp));
row.setField(18, StringData.fromString(userProperties==null ? null : userProperties.getCid()));
row.setField(19, StringData.fromString(userProperties==null ? null : userProperties.getPhone()));
row.setField(20, StringData.fromString(userProperties==null ? null : userProperties.getNick()));
......@@ -152,7 +203,74 @@ public class EventLogAchi extends SourceCommonBase implements Serializable{
.filter(Objects::nonNull)
// .print()
.sinkTo(dorisSink)
.name("Doris-CollectLog");
.name("Doris-EventLog");
SingleOutputStreamOperator<RowData> rowDataStream = dataStreamSource.flatMap(
        new FlatMapFunction<String, RowData>() {
            private static final long serialVersionUID = 1L;

            /**
             * Parses a raw Kafka JSON payload into rows for the bi.event_log_error table.
             * Only events whose r7 matches a known error/timing event name are emitted;
             * malformed records are logged and dropped so the job keeps running.
             */
            @Override
            public void flatMap(String value, Collector<RowData> out) throws Exception {
                try {
                    // Parse the Kafka payload.
                    OdsEventLog event = JSONObject.parseObject(value, new TypeReference<OdsEventLog>() {});
                    if (null == event) {
                        return;
                    }
                    String id = event.getId();
                    String createTime = event.getCreate_time();
                    // dt partition column derived from the date part of create_time.
                    int dt = TimeConvertUtil.convertToSqlDate(createTime.substring(0, 10));
                    String appKey = event.getApp_key().trim();
                    String appType = event.getApp_type();
                    UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(event.getUser_properties());
                    List<EventList> eventList = JSONObject.parseObject(event.getEvent_list(), new TypeReference<List<EventList>>() {});
                    if (CollectionUtils.isNotEmpty(eventList)) {
                        for (EventList eventListInfo : eventList) {
                            String r7 = eventListInfo.getR7();
                            Long r9 = eventListInfo.getR9();
                            Properties r8 = eventListInfo.getR8();
                            if (CompareUtils.stringExists(r7,
                                    "socket_event",
                                    "socket_error",
                                    "socket_time",
                                    "refresh_token",
                                    "all_time")) {
                                if (r8 == null || r9 == null) {
                                    // Row cannot be built without r8/r9; skip explicitly
                                    // instead of relying on an NPE being swallowed below.
                                    logger.warn("EventLogAchi skip error-event without r8/r9 | id:{} | r7:{}", id, r7);
                                    continue;
                                }
                                // Map onto the bi.event_log_error column layout (see tableErrorConfig).
                                GenericRowData row = new GenericRowData(tableErrorConfig.getFields().length);
                                row.setField(0, StringData.fromString(id)); //id
                                row.setField(1, dt); //dt
                                row.setField(2, TimestampData.fromLocalDateTime(LocalDateTime.parse(TimeConvertUtil.parseToStringSSS(r9), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
                                row.setField(3, TimestampData.fromLocalDateTime(LocalDateTime.parse(createTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
                                row.setField(4, StringData.fromString(appKey));
                                row.setField(5, StringData.fromString(appType));
                                row.setField(6, StringData.fromString(userProperties == null ? null : userProperties.getCid()));
                                row.setField(7, StringData.fromString(userProperties == null ? null : userProperties.getPhone()));
                                row.setField(8, StringData.fromString(userProperties == null ? null : userProperties.getNick()));
                                row.setField(9, StringData.fromString(r7));
                                row.setField(10, StringData.fromString(r8.getData()));
                                row.setField(11, TimestampData.fromLocalDateTime(LocalDateTime.parse(TimeConvertUtil.parseToStringSSS(r8.getStartTime()), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
                                row.setField(12, StringData.fromString(r8.getTimeDifference()));
                                row.setField(13, TimestampData.fromLocalDateTime(LocalDateTime.parse(TimeConvertUtil.parseToStringSSS(r8.getEndTime()), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
                                row.setField(14, StringData.fromString(r8.getUserId()));
                                row.setField(15, 0); // __DORIS_DELETE_SIGN__: 0 = upsert
                                out.collect(row);
                            }
                        }
                    }
                } catch (Exception e) {
                    // Pass the exception as the last argument so the full stack trace is
                    // logged (e.getMessage() alone loses it); drop the System.err duplicate.
                    logger.error("EventLogAchi 处理 Kafka 消息出错 | rawData:{} | error:{}", value, e.getMessage(), e);
                }
            }
        });
rowDataStream
.filter(Objects::nonNull)
// .print()
.sinkTo(dorisErrorSink)
.name("Doris-EventLogError");
}
@Override
......
......@@ -3,28 +3,19 @@ package com.flink.achieve.doris;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -34,6 +25,8 @@ import com.alibaba.fastjson.TypeReference;
import com.flink.common.MultipleSourceCommonBase;
import com.flink.enums.AppTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.function.CollectLogProcessFunction;
import com.flink.processor.function.PointRecordJoinProcessor;
import com.flink.processor.function.UserPropertiesProcessor;
import com.flink.processor.function.VectorAngleProcessor;
import com.flink.util.TimeConvertUtil;
......@@ -43,7 +36,6 @@ import com.flink.vo.EventList;
import com.flink.vo.EventLogToJsonSource;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.OdsEventLog;
import com.flink.vo.Properties;
import com.flink.vo.UserProperties;
import com.flink.vo.android.deviceInfo.AndroidA1;
import com.flink.vo.ios.IosDeviceInfo;
......@@ -80,70 +72,13 @@ public class VectorAngleCalculationAchi extends MultipleSourceCommonBase impleme
return;
}
// 事件数据流处理
DataStream<EventLogToJsonSource> eventDataStream = eventLogStreamSource.flatMap(new FlatMapFunction<String, EventLogToJsonSource>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<EventLogToJsonSource> out) throws Exception {
// logger.info("eventLogStreamSource flatMap start!");
try {
// 解析 Kafka 数据
OdsEventLog odsEventLog = JSONObject.parseObject(value, new TypeReference<OdsEventLog>() {
});
if (null == odsEventLog) {
return;
}
String id = odsEventLog.getId();
String uniqueId = odsEventLog.getUnique_id();
String deviceId = odsEventLog.getDevice_id();
String event_list = odsEventLog.getEvent_list();
String createTime = odsEventLog.getCreate_time();
UserProperties userProperties = UserPropertiesProcessor
.userPropertiesToJson(odsEventLog.getUser_properties());
if (StringUtils.isEmpty(odsEventLog.getEvent_list())) {
return;
}
List<EventList> eventList = JSONObject.parseObject(event_list,
new TypeReference<List<EventList>>() {
});
EventLogToJsonSource eventLogToJsonSource = new EventLogToJsonSource(
id,
uniqueId,
deviceId,
userProperties.getCid(),
userProperties.getPhone(),
userProperties.getNick(),
eventList,
TimeConvertUtil.convertToTimestamp(createTime)
);
if (eventLogToJsonSource != null)
out.collect(eventLogToJsonSource);
// logger.info("eventLogStreamSource flatMap end!");
} catch (Exception e) {
logger.error("Error parsing ods_event_log 处理 Kafka 消息出错 | data:{} | error:{}", value,
e.getMessage());
}
}
}).assignTimestampsAndWatermarks(
WatermarkStrategy.<EventLogToJsonSource>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.getCreateTime()))
.filter(event -> event != null && StringUtils.isNotEmpty(event.getUniqueId())
&& StringUtils.isNotEmpty(event.getDeviceId()) && StringUtils.isNotEmpty(event.getCid())
&& StringUtils.isNotEmpty(event.getPhone())
&& CollectionUtils.isNotEmpty(event.getEventList()))
.keyBy(EventLogToJsonSource::getJoinKey);
// 设备信息数据流处理
DataStream<CollectLogToJsonSource> collectDataStream = collectLogStreamSource
DataStream<CollectLogToJsonSource> deduplicatedCollectStream = collectLogStreamSource
.flatMap(new FlatMapFunction<String, CollectLogToJsonSource>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<CollectLogToJsonSource> out) throws Exception {
// logger.info("collectLogStreamSource flatMap start!");
try {
// 解析 Kafka 数据
CollectLog log = JSONObject.parseObject(value, new TypeReference<CollectLog>() {
......@@ -155,7 +90,8 @@ public class VectorAngleCalculationAchi extends MultipleSourceCommonBase impleme
String appType = log.getAppType();
String device_info = log.getDeviceInfo();
String uniqueId = log.getUniqueId();
String createTime = log.getCreateTime();
String sendTime = log.getSendTime();
// String createTime = log.getCreateTime();
if (StringUtils.isEmpty(device_info)) {
return;
}
......@@ -181,120 +117,84 @@ public class VectorAngleCalculationAchi extends MultipleSourceCommonBase impleme
userProperties.getNick(),
Integer.valueOf(resolution.split("\\*")[0]),
Integer.valueOf(resolution.split("\\*")[1]),
TimeConvertUtil.convertToTimestamp(createTime)
TimeConvertUtil.convertToTimestamp(sendTime)
);
if (collectLogToJsonSource != null)
out.collect(collectLogToJsonSource);
// logger.info("collectLogStreamSource flatMap end!");
} catch (Exception e) {
logger.error("Error parsing ods_new_collect_log 处理 Kafka 消息出错 | data:{} | error:{}", value,
e.getMessage());
}
}
})
.assignTimestampsAndWatermarks(
WatermarkStrategy.<CollectLogToJsonSource>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((collectLog, ts) -> collectLog.getCollectTime()))
.filter(collectLog -> collectLog != null && StringUtils.isNotEmpty(collectLog.getUniqueId())
&& StringUtils.isNotEmpty(collectLog.getDeviceId())
&& StringUtils.isNotEmpty(collectLog.getPhone())
&& StringUtils.isNotEmpty(collectLog.getCid()))
.keyBy(CollectLogToJsonSource::getJoinKey);
.filter(collectLog -> StringUtils.isNoneEmpty(collectLog.getUniqueId(),collectLog.getDeviceId(),collectLog.getPhone(),collectLog.getCid()))
.assignTimestampsAndWatermarks( // 原有水印逻辑
WatermarkStrategy.<CollectLogToJsonSource>forBoundedOutOfOrderness(Duration.ofMinutes(10))
.withTimestampAssigner((collectLog, ts) -> collectLog.getCollectTime())
)
.keyBy(collectLog -> collectLog.getCid() + "#_#" + collectLog.getDeviceId() + "#_#" + collectLog.getPhone() + "#_#" + collectLog.getUniqueId())
.process(new CollectLogProcessFunction());
// 步骤1: 展开draw_point字段
DataStream<PointRecord> pointRecordStream = eventDataStream
.coGroup(collectDataStream)
.where(EventLogToJsonSource::getJoinKey)
.equalTo(CollectLogToJsonSource::getJoinKey)
.window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
.apply(new RichCoGroupFunction<EventLogToJsonSource, CollectLogToJsonSource, PointRecord>() {
/**
*
*/
// 事件数据流处理
DataStream<EventLogToJsonSource> eventDataStream = eventLogStreamSource.flatMap(new FlatMapFunction<String, EventLogToJsonSource>() {
private static final long serialVersionUID = 1L;
private transient ListState<CollectLogToJsonSource> collectState;
@Override
public void open(Configuration parameters) {
// 初始化状态存储
ListStateDescriptor<CollectLogToJsonSource> stateDescriptor =
new ListStateDescriptor<>("collect-state", CollectLogToJsonSource.class);
collectState = getRuntimeContext().getListState(stateDescriptor);
}
@Override
public void coGroup(
Iterable<EventLogToJsonSource> events,
Iterable<CollectLogToJsonSource> collects,
Collector<PointRecord> out) throws Exception {
// logger.info("coGroup start!");
// 缓存collect数据到状态
collects.forEach(collect -> {
public void flatMap(String value, Collector<EventLogToJsonSource> out) throws Exception {
try {
collectState.add(collect);
} catch (Exception e) {
logger.error("coGroup Error parsing CollectLogToJsonSource 处理 Kafka 消息出错 | data:{} | error:{}", collect,
e.getMessage());
}
// 解析 Kafka 数据
OdsEventLog odsEventLog = JSONObject.parseObject(value, new TypeReference<OdsEventLog>() {
});
// 创建本地缓存Map提高查询性能
Map<JoinKey, CollectLogToJsonSource> collectMap = new HashMap<>();
collectState.get().forEach(collect ->
collectMap.put(collect.getJoinKey(), collect));
for (EventLogToJsonSource event : events) {
CollectLogToJsonSource matchedCollect = collectMap.get(event.getJoinKey());
if (matchedCollect == null) continue;
for (EventList eventLog : event.getEventList()) {
List<String> pointList = Optional.ofNullable(eventLog.getR8())
.map(Properties::getR6)
.orElse(Collections.emptyList());
for (int i = 0; i < pointList.size(); i++) {
String pointStr = pointList.get(i);
String[] xy = pointStr.split("&", 2);
// 格式校验
if (xy.length != 2) {
logger.error("Invalid point format:{},in event ID:{} ", pointStr, event.getId());
continue;
}
// 坐标解析(兼容异常)
try {
// long x = Long.parseLong(xy[0].trim());
// long y = Long.parseLong(xy[1].trim());
PointRecord pointRecord = new PointRecord(
event.getId(),
eventLog.getR9(),
i,
Double.parseDouble(xy[0].trim()),
Double.parseDouble(xy[1].trim()),
matchedCollect.getResolution_x(),
matchedCollect.getResolution_y()
);
out.collect(pointRecord);
// logger.info("coGroup end! id:{},r9Time:{},rowNum:{},xy0:{},xy1:{},resolution_x:{},resolution_y:{}",
// event.getId(),
// eventLog.getR9(),
// i,
// xy[0].trim(),
// xy[1].trim(),
// matchedCollect.getResolution_x(),
// matchedCollect.getResolution_y());
} catch (NumberFormatException e) {
logger.error("Coordinate parsing error:{},in event ID:{},e:{} ", pointStr, event.getId(),e.getMessage());
}
if (null == odsEventLog) {
return;
}
String id = odsEventLog.getId();
String uniqueId = odsEventLog.getUnique_id();
String deviceId = odsEventLog.getDevice_id();
String event_list = odsEventLog.getEvent_list();
String sendTime = odsEventLog.getSend_time();
UserProperties userProperties = UserPropertiesProcessor
.userPropertiesToJson(odsEventLog.getUser_properties());
if (StringUtils.isEmpty(odsEventLog.getEvent_list())) {
return;
}
List<EventList> eventList = JSONObject.parseObject(event_list,
new TypeReference<List<EventList>>() {
});
EventLogToJsonSource eventLogToJsonSource = new EventLogToJsonSource(
id,
uniqueId,
deviceId,
userProperties.getCid(),
userProperties.getPhone(),
userProperties.getNick(),
eventList,
TimeConvertUtil.convertToTimestamp(sendTime)
);
if (eventLogToJsonSource != null)
out.collect(eventLogToJsonSource);
} catch (Exception e) {
logger.error("Error parsing ods_event_log 处理 Kafka 消息出错 | data:{} | error:{}", value,
e.getMessage());
}
// 清理当前窗口的状态
collectState.clear();
}
}).assignTimestampsAndWatermarks(WatermarkStrategy.<EventLogToJsonSource>forBoundedOutOfOrderness(Duration.ofMinutes(10))
.withTimestampAssigner((device, ts) -> device.getCreateTime()));;
// 步骤1: 展开draw_point字段
DataStream<PointRecord> pointRecordStream = eventDataStream
.connect(deduplicatedCollectStream)
.keyBy(collectLog -> collectLog.getCid() + "#_#" + collectLog.getDeviceId() + "#_#" + collectLog.getPhone() + "#_#" + collectLog.getUniqueId(),
eventLog -> eventLog.getCid() + "#_#" + eventLog.getDeviceId() + "#_#" + eventLog.getPhone() + "#_#" + eventLog.getUniqueId())
.process(new PointRecordJoinProcessor(){
/**
*
*/
private static final long serialVersionUID = 1L;
});
KeyedStream<PointRecord, Tuple2<String, Long>> keyedStream = pointRecordStream
......@@ -306,6 +206,9 @@ public class VectorAngleCalculationAchi extends MultipleSourceCommonBase impleme
@Override
public Tuple2<String, Long> getKey(PointRecord r) {
logger.info("KeyedStream >> id:{},eventTime:{}",r.id,r.eventTime);
return Tuple2.of(r.id, r.eventTime);
}
});
......
package com.flink.processor.function;
import java.time.Duration;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import com.flink.vo.CollectLogToJsonSource;
/**
* @author wjs
* @version 创建时间:2025-6-23 13:50:36
* 类说明
*/
public class CollectLogProcessFunction extends KeyedProcessFunction<String, CollectLogToJsonSource, CollectLogToJsonSource>{

    private static final long serialVersionUID = 1L;

    /** Most recent collect-log record observed for the current key. */
    private ValueState<CollectLogToJsonSource> latestUserState;

    /**
     * Initializes the per-key state with a 30-minute TTL so stale keys are
     * cleaned up automatically.
     */
    @Override
    public void open(Configuration parameters) {
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofMinutes(30))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .cleanupIncrementally(1000, true) // incremental cleanup for the heap state backend
                // .cleanupInRocksdbCompactFilter(1000) // use compaction cleanup on RocksDB backend
                .build();
        ValueStateDescriptor<CollectLogToJsonSource> descriptor = new ValueStateDescriptor<>("collectLog-state",
                CollectLogToJsonSource.class);
        descriptor.enableTimeToLive(ttlConfig);
        latestUserState = getRuntimeContext().getState(descriptor);
    }

    /**
     * Forwards a record only when it is the first one for this key or strictly
     * newer (by collectTime) than the stored one — i.e. deduplicates to the
     * latest record per key.
     */
    @Override
    public void processElement(CollectLogToJsonSource collectLog,Context ctx,Collector<CollectLogToJsonSource> out) throws Exception {
        CollectLogToJsonSource previous = latestUserState.value();
        boolean isNewer = previous == null || collectLog.getCollectTime() > previous.getCollectTime();
        if (isNewer) {
            latestUserState.update(collectLog);
            out.collect(collectLog);
        }
    }
}
package com.flink.processor.function;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.achieve.doris.VectorAngleCalculationAchi.PointRecord;
import com.flink.vo.CollectLogToJsonSource;
import com.flink.vo.EventList;
import com.flink.vo.EventLogToJsonSource;
import com.flink.vo.Properties;
/**
* @author wjs
* @version 创建时间:2025-6-23 14:05:21
* 类说明
*/
public class PointRecordJoinProcessor extends CoProcessFunction<EventLogToJsonSource, CollectLogToJsonSource, PointRecord>{
private static final Logger logger = LoggerFactory.getLogger(PointRecordJoinProcessor.class);
/**
*
*/
private static final long serialVersionUID = 1L;
private transient ValueState<CollectLogToJsonSource> userState;
private transient MapState<Long, EventLogToJsonSource> pendingEventLog;
@Override
public void open(Configuration parameters) {
userState = getRuntimeContext().getState(new ValueStateDescriptor<>("collectLog-state", CollectLogToJsonSource.class));
pendingEventLog = getRuntimeContext().getMapState(new MapStateDescriptor<>("pendingEventLog", Long.class, EventLogToJsonSource.class));
}
@Override
public void processElement1(EventLogToJsonSource eventLog,Context ctx,Collector<PointRecord> out) throws Exception {try {
CollectLogToJsonSource collectLog = userState.value();
if (collectLog != null) {
processEventWithCollectLog(eventLog, collectLog, out); // 直接处理
} else {
// 存储并设置超时计时器
pendingEventLog.put(ctx.timestamp(), eventLog);
ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60000);
}
} catch (Exception e) {
logger.error("ProcessElement1 error: {}", e.getMessage(), e);
}}
@Override
public void processElement2(CollectLogToJsonSource collectLog,Context ctx,Collector<PointRecord> out) throws Exception {try {
userState.update(collectLog); // 更新最新状态
// 主动处理滞留事件(解决数据滞留问题)
Iterator<Map.Entry<Long, EventLogToJsonSource>> it = pendingEventLog.iterator();
while (it.hasNext()) {
EventLogToJsonSource pendingEvent = it.next().getValue();
processEventWithCollectLog(pendingEvent, collectLog, out);
it.remove();
}
} catch (Exception e) {
logger.error("ProcessElement2 error: {}", e.getMessage(), e);
}}
// 统一事件处理逻辑
private void processEventWithCollectLog(EventLogToJsonSource event,
CollectLogToJsonSource collectLog,
Collector<PointRecord> out) {
for (EventList eventLogInfo : event.getEventList()) {
List<String> pointList = Optional.ofNullable(eventLogInfo.getR8())
.map(Properties::getR6).orElse(Collections.emptyList());
for(String pointStr : pointList) {
String points = cleanPointString(pointStr);
if (points.isEmpty()) continue;
// 增强解析健壮性
parseAndEmitPoints(points, event.getId(), eventLogInfo.getR9(),collectLog, out);
}
}
}
// 坐标解析(含异常处理)
private void parseAndEmitPoints(String pointStr, String eventId, long timestamp,CollectLogToJsonSource collectLog,
Collector<PointRecord> out) {
String[] points = pointStr.split(",");
for (int i = 0; i < points.length; i++) {
String trimmed = points[i].trim();
if (!isValidPointFormat(trimmed)) continue;
String[] xy = splitPoint(trimmed);
if (xy.length != 2) continue;
try {
double x = Double.parseDouble(xy[0]);
double y = Double.parseDouble(xy[1]);
logger.info("parseAndEmitPoints params id:{},r9:{},i:{},xy0:{},xy1:{},Resolution_x:{},Resolution_y:{}",eventId,
timestamp,
i,
Double.parseDouble(xy[0].trim()),
Double.parseDouble(xy[1].trim()),
collectLog.getResolution_x(),
collectLog.getResolution_y());
out.collect(new PointRecord(eventId, timestamp, i,
x, y,
collectLog.getResolution_x(),
collectLog.getResolution_y()));
} catch (NumberFormatException e) {
logger.warn("Coordinate parse failed: {} | Error: {}", pointStr, e.getMessage());
}
}
}
// 辅助方法:坐标格式校验
private boolean isValidPointFormat(String point) {
int firstIdx = point.indexOf('&');
int lastIdx = point.lastIndexOf('&');
return !point.isEmpty() &&
firstIdx != -1 &&
firstIdx == lastIdx &&
!point.startsWith(".") &&
!point.endsWith(".");
}
// 辅助方法:坐标拆分
private String[] splitPoint(String point) {
return point.split("&", 2); // 限制分割次数
}
// 辅助方法:数据清洗
private String cleanPointString(String raw) {
return Optional.ofNullable(raw)
.map(s -> s.replace("[", "")
.replace("]", "")
.replace("\"", "")
.trim())
.orElse("");
}
// 定时器清理滞留事件
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<PointRecord> out) {
try {
pendingEventLog.remove(timestamp - 60000);
logger.info("Cleaned expired events at {}", timestamp);
} catch (Exception e) {
logger.error("Timer error: {}", e.getMessage(), e);
}
}
}
......@@ -8,6 +8,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.achieve.doris.VectorAngleCalculationAchi.PointRecord;
import com.flink.achieve.doris.VectorAngleCalculationAchi.ResultRecord;
......@@ -23,6 +25,7 @@ public class VectorAngleProcessor extends KeyedProcessFunction<Tuple2<String, Lo
*
*/
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(VectorAngleProcessor.class);
private transient ValueState<VectorState> vectorState;
......@@ -39,6 +42,7 @@ public class VectorAngleProcessor extends KeyedProcessFunction<Tuple2<String, Lo
public void processElement(PointRecord record,
KeyedProcessFunction<Tuple2<String, Long>, PointRecord, ResultRecord>.Context ctx,
Collector<ResultRecord> out) throws Exception {
logger.info("processElement >>>start!");
VectorState state = vectorState.value();
double vectorX, vectorY, vectorM, pointV;
// 处理第一条记录
......@@ -83,6 +87,24 @@ public class VectorAngleProcessor extends KeyedProcessFunction<Tuple2<String, Lo
record.resolutionX,
record.resolutionY
);
logger.info("VectorAngleProcessor processElement >>>end! id:{},eventTime:{},rowNum:{},"
+ "positionX:{},positionY:{},vectorX:{},vectorY:{},vectorM:{},"
+ "pointV:{},cosV:{},angleV:{},radianV:{},resolutionX:{},resolutionY:{}",
record.id,
record.eventTime,
record.rowNum,
record.positionX,
record.positionY,
vectorX,
vectorY,
vectorM,
pointV,
cosV,
angleV,
radianV,
record.resolutionX,
record.resolutionY
);
out.collect(result);
// 更新状态(当前记录成为下一条的"前一条")
......
......@@ -9,6 +9,7 @@ import java.time.format.DateTimeFormatter;
import java.util.TimeZone;
import org.apache.flink.table.data.TimestampData;
import org.apache.kerby.util.SysUtil;
/**
* @author wjs
......@@ -55,4 +56,18 @@ public class TimeConvertUtil {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
return dateTime.format(formatter);
}
/**
 * Formats an epoch-millisecond timestamp as "yyyy-MM-dd HH:mm:ss.SSS"
 * in the JVM's default time zone.
 */
public static String parseToStringSSS (Long timestamp) {
    // epoch millis -> Instant -> LocalDateTime (system zone) -> formatted string
    LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault());
    return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(dateTime);
}
/** Quick manual check of the millisecond-precision formatter. */
public static void main(String[] args) {
    System.out.println(parseToStringSSS(1750739369000L));
}
}
......@@ -21,6 +21,12 @@ public class Properties implements Serializable{
private String r5;
private List<String> r6;
private String data;
private Long startTime;
private String timeDifference;
private Long endTime;
private String userId;
public String getR1() {
return r1;
......@@ -58,5 +64,35 @@ public class Properties implements Serializable{
// Setter for r6 — presumably the draw-point list ("x&y" strings); confirm against producers.
public void setR6(List<String> r6) {
this.r6 = r6;
}
// Accessors for the error-event payload fields added with the bi.event_log_error sink.
// NOTE(review): field semantics below are inferred from the sink column names
// (data / startTime / timeDifference / endTime / userId) — confirm against the producer.

// Raw payload carried by the error event.
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
// Start of the measured interval — presumably epoch millis; TODO confirm.
public Long getStartTime() {
return startTime;
}
public void setStartTime(Long startTime) {
this.startTime = startTime;
}
// Elapsed-time value reported by the client, kept as a string.
public String getTimeDifference() {
return timeDifference;
}
public void setTimeDifference(String timeDifference) {
this.timeDifference = timeDifference;
}
// End of the measured interval — presumably epoch millis; TODO confirm.
public Long getEndTime() {
return endTime;
}
public void setEndTime(Long endTime) {
this.endTime = endTime;
}
// Identifier of the user that produced the event.
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment