Commit a4e4c796 by 魏建枢

simi好友代码提交

parent b6f14797
...@@ -21,20 +21,17 @@ import org.apache.commons.lang3.StringUtils; ...@@ -21,20 +21,17 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink; import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.ParseException; import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction; import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.GenericRowData;
...@@ -61,7 +58,7 @@ import com.flink.processor.function.SimiFriendsTempJoinProcessor; ...@@ -61,7 +58,7 @@ import com.flink.processor.function.SimiFriendsTempJoinProcessor;
import com.flink.processor.impl.OkHttpService; import com.flink.processor.impl.OkHttpService;
import com.flink.util.SqlLoader; import com.flink.util.SqlLoader;
import com.flink.util.TimeConvertUtil; import com.flink.util.TimeConvertUtil;
import com.flink.vo.DwdSysLog; import com.flink.vo.CollectUserBehavior;
import com.flink.vo.EventIp; import com.flink.vo.EventIp;
import com.flink.vo.KafkaDataSource; import com.flink.vo.KafkaDataSource;
import com.flink.vo.SimiUserInfo; import com.flink.vo.SimiUserInfo;
...@@ -179,19 +176,15 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable { ...@@ -179,19 +176,15 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
DorisSink<RowData> dorisSinkTotalTemp = DorisConnector.sinkDoris(totalConfigTemp.getFields(), totalConfigTemp.getTypes(), totalConfigTemp.getTableName()); DorisSink<RowData> dorisSinkTotalTemp = DorisConnector.sinkDoris(totalConfigTemp.getFields(), totalConfigTemp.getTypes(), totalConfigTemp.getTableName());
DataStreamSource<String> openSimiApiStreamSource = null; DataStreamSource<String> openSimiApiStreamSource = null;
DataStreamSource<String> eventLogStreamSource = null; DataStreamSource<String> collectUserBehaviorStreamSource = null;
DataStreamSource<String> userStreamSource = null; DataStreamSource<String> userStreamSource = null;
DataStreamSource<String> sysLogStreamSource = null;
if(CollectionUtils.isNotEmpty(dataSourceList)) { if(CollectionUtils.isNotEmpty(dataSourceList)) {
for(KafkaDataSource kafkaDataSource : dataSourceList) { for(KafkaDataSource kafkaDataSource : dataSourceList) {
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.DWD_SYS_LOG.getTopic())) {
sysLogStreamSource = kafkaDataSource.getDataStreamSource();
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.OPEN_SIMI_API.getTopic())) { if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.OPEN_SIMI_API.getTopic())) {
openSimiApiStreamSource = kafkaDataSource.getDataStreamSource(); openSimiApiStreamSource = kafkaDataSource.getDataStreamSource();
} }
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_EVENT_LOG.getTopic())) { if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_COLLECT_USER_BEHAVIOR.getTopic())) {
eventLogStreamSource = kafkaDataSource.getDataStreamSource(); collectUserBehaviorStreamSource = kafkaDataSource.getDataStreamSource();
} }
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.SIMI_USER_LIST_TOPIC.getTopic())) { if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.SIMI_USER_LIST_TOPIC.getTopic())) {
userStreamSource = kafkaDataSource.getDataStreamSource(); userStreamSource = kafkaDataSource.getDataStreamSource();
...@@ -201,52 +194,6 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable { ...@@ -201,52 +194,6 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
return; return;
} }
SingleOutputStreamOperator<DwdSysLog> sysDataStream = sysLogStreamSource
.map(new MapFunction<String, DwdSysLog>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public DwdSysLog map(String value) throws Exception {
try {
DwdSysLog sysLog = JSON.parseObject(value, DwdSysLog.class);
logger.debug("sysLog原始数据: {}", sysLog);
if (sysLog == null || StringUtils.isEmpty(sysLog.getSend_time())) {
logger.warn("空值日志: {}", value);
return null;
}
// 时间戳转换(添加异常捕获)
Long waterMarkTime = TimeConvertUtil.convertToTimestamp(sysLog.getSend_time());
if (waterMarkTime <= 0) {
logger.error("时间转换失败: {}", sysLog.getSend_time());
return null;
}
sysLog.setWaterMarkTime(waterMarkTime);
return sysLog;
} catch (Exception e) {
logger.error("JSON解析失败: {} | 错误: {}", value, e.getMessage());
return null;
}
}
})
.filter(obj -> {
if (obj == null) return false;
if (!(obj instanceof DwdSysLog)) {
logger.error("Unexpected type: {}", obj.getClass().getName());
return false;
}
return true;
})
.returns(DwdSysLog.class)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<DwdSysLog>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((sysLog, ts) -> sysLog.getWaterMarkTime())
);
SingleOutputStreamOperator<FriendsRecord> rowDataStream = openSimiApiStreamSource SingleOutputStreamOperator<FriendsRecord> rowDataStream = openSimiApiStreamSource
.flatMap(new FlatMapFunction<String, FriendsRecord>() { .flatMap(new FlatMapFunction<String, FriendsRecord>() {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
...@@ -323,7 +270,7 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable { ...@@ -323,7 +270,7 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
}); });
this.simiFriendTotalAnalysis(rowDataStream, dorisSink, dorisSinkTotal, detailConfig, totalConfig); this.simiFriendTotalAnalysis(rowDataStream, dorisSink, dorisSinkTotal, detailConfig, totalConfig);
this.simiFriendTotalAnalysisTemp(rowDataStream, dorisSinkTotalTemp, totalConfigTemp,eventLogStreamSource,userStreamSource,sysDataStream); this.simiFriendTotalAnalysisTemp(rowDataStream, dorisSinkTotalTemp, totalConfigTemp,collectUserBehaviorStreamSource,userStreamSource);
} }
...@@ -331,22 +278,37 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable { ...@@ -331,22 +278,37 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
SingleOutputStreamOperator<FriendsRecord> rowDataStream, SingleOutputStreamOperator<FriendsRecord> rowDataStream,
DorisSink<RowData> dorisSinkTotalTemp, DorisSink<RowData> dorisSinkTotalTemp,
TableConfig totalConfigTemp, TableConfig totalConfigTemp,
DataStreamSource<String> eventLogStreamSource, DataStreamSource<String> collectUserBehaviorStreamSource,
DataStreamSource<String> userStreamSource, DataStreamSource<String> userStreamSource
SingleOutputStreamOperator<DwdSysLog> sysDataStream
) { ) {
// 事件数据流处理 // 事件数据流处理
DataStream<EventIp> eventDataStream = eventLogStreamSource.flatMap(new FlatMapFunction<String, EventIp>() { DataStream<EventIp> eventDataStream = collectUserBehaviorStreamSource.flatMap(new FlatMapFunction<String, EventIp>() {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@Override @Override
public void flatMap(String value, Collector<EventIp> out) throws Exception { public void flatMap(String value, Collector<EventIp> out) throws Exception {
try { try {
// 解析 Kafka 数据 // 解析 Kafka 数据
EventIp event = EventIpLatestAchi.handleData(value); CollectUserBehavior log = JSONObject.parseObject(value,new TypeReference<CollectUserBehavior>(){});
if (event != null) if (log != null) {
out.collect(event); EventIp eventIp = new EventIp();
eventIp.setCid(log.getCid());
eventIp.setEventTime(TimeConvertUtil.convertToTimestamp(log.getEvent_time()));
eventIp.setPhone(log.getPhone());
eventIp.setIp(log.getIp_name());
eventIp.setAreaName(log.getArea_name());
eventIp.setNick(log.getNick());
eventIp.setAppKey(log.getApp_key());
eventIp.setAppType(log.getApp_type());
eventIp.setCreateTime(log.getCreate_time());
eventIp.setDeviceId(log.getDevice_id());
eventIp.setUniqueId(log.getUnique_id());
eventIp.setNetwork_ip(log.getNetwork_ip());
eventIp.setNetwork_area_name(log.getNetwork_area_name());
eventIp.setZone_name(log.getZone_name());
out.collect(eventIp);
}
} catch (Exception e) { } catch (Exception e) {
logger.error("Error parsing ods_event_log 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage()); logger.error("Error parsing ods_event_log 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
} }
...@@ -357,65 +319,9 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable { ...@@ -357,65 +319,9 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
.withTimestampAssigner((event, ts) -> event.getEventTime()) .withTimestampAssigner((event, ts) -> event.getEventTime())
).filter(event -> event != null && StringUtils.isNotEmpty(event.getCid())); ).filter(event -> event != null && StringUtils.isNotEmpty(event.getCid()));
DataStream<EventIp> mergedEventStream = sysDataStream.keyBy(new KeySelector<DwdSysLog, String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public String getKey(DwdSysLog sysLog) {
return sysLog.getDevice_id() + "#_#" +
sysLog.getUnique_id() + "#_#" +
sysLog.getCid();
}
})
.intervalJoin(
eventDataStream.keyBy(new KeySelector<EventIp, String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public String getKey(EventIp dev) {
return dev.getDeviceId() + "#_#" +
dev.getUniqueId() + "#_#" +
dev.getCid(); // 注意字段名一致性!
}
})
)
.between(Duration.ofMinutes(-10), Duration.ofMinutes(5))
.process(new ProcessJoinFunction<DwdSysLog, EventIp, EventIp>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void processElement(DwdSysLog sysLog, EventIp dev, Context ctx, Collector<EventIp> out) {
EventIp eventIp = new EventIp();
eventIp.setCid(dev.getCid());
eventIp.setEventTime(dev.getEventTime());
eventIp.setPhone(dev.getPhone());
eventIp.setIp(dev.getIp());
eventIp.setAreaName(dev.getAreaName());
eventIp.setNick(dev.getNick());
eventIp.setAppKey(dev.getAppKey());
eventIp.setAppType(dev.getAppType());
eventIp.setCreateTime(dev.getCreateTime());
eventIp.setNetwork_ip(sysLog.getNetwork_ip());
eventIp.setNetwork_area_name(sysLog.getNetwork_area_name());
eventIp.setZone_name(sysLog.getZone_name());
out.collect(eventIp);
}
});
DataStream<TotalTemp> joinedStream = rowDataStream DataStream<TotalTemp> joinedStream = rowDataStream
.keyBy(record -> record.cid) .keyBy(record -> record.cid)
.connect(mergedEventStream.keyBy(EventIp::getCid)) .connect(eventDataStream.keyBy(EventIp::getCid))
.process(new CoProcessFunction<FriendsRecord, EventIp, TotalTemp>() { .process(new CoProcessFunction<FriendsRecord, EventIp, TotalTemp>() {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
// 修复3:使用MapState替代临时缓存 // 修复3:使用MapState替代临时缓存
......
...@@ -8,7 +8,6 @@ import com.flink.vo.KafkaTopic; ...@@ -8,7 +8,6 @@ import com.flink.vo.KafkaTopic;
* 类说明 * 类说明
*/ */
public enum TopicTypeEnum { public enum TopicTypeEnum {
DWD_SYS_LOG("dwd_sys_log","dwdSysLog"),//弃用
ODS_EVENT_LOG("ods_event_log","eventLogGroup"), ODS_EVENT_LOG("ods_event_log","eventLogGroup"),
ODS_NEW_COLLECT_LOG("ods_new_collect_log","odsNewCollectLog"), ODS_NEW_COLLECT_LOG("ods_new_collect_log","odsNewCollectLog"),
ODS_ZIPPER_STRATEGY("ods_zipper_strategy","odsZipperStrategy"), ODS_ZIPPER_STRATEGY("ods_zipper_strategy","odsZipperStrategy"),
...@@ -30,7 +29,6 @@ public enum TopicTypeEnum { ...@@ -30,7 +29,6 @@ public enum TopicTypeEnum {
ODS_PC_EVENT_LOG("ods_pc_event_log","odsPcEventLog"), ODS_PC_EVENT_LOG("ods_pc_event_log","odsPcEventLog"),
ODS_PC_COLLECT_LOG("ods_pc_collect_log","odsPcCollectLog"), ODS_PC_COLLECT_LOG("ods_pc_collect_log","odsPcCollectLog"),
ODS_COMMUNITY_HISTORY("ods_community_history","odsCommunityHistory"), ODS_COMMUNITY_HISTORY("ods_community_history","odsCommunityHistory"),
ODS_SYS_LOG("ods_sys_log","odsSysLog"),//弃用
ODS_COLLECT_USER_BEHAVIOR("ods_collect_user_behavior","odsCollectUserBehavior"), ODS_COLLECT_USER_BEHAVIOR("ods_collect_user_behavior","odsCollectUserBehavior"),
ODS_EXCEPTION_EVENT_TOPIC("ods_exception_event_topic","odsExceptionEventTopic"), ODS_EXCEPTION_EVENT_TOPIC("ods_exception_event_topic","odsExceptionEventTopic"),
......
...@@ -30,9 +30,8 @@ public class SimiFriendsProcessor implements JobProcessor{ ...@@ -30,9 +30,8 @@ public class SimiFriendsProcessor implements JobProcessor{
private static List<KafkaTopic> createTopicList() { private static List<KafkaTopic> createTopicList() {
return Arrays.stream(new TopicTypeEnum[]{ return Arrays.stream(new TopicTypeEnum[]{
TopicTypeEnum.OPEN_SIMI_API, TopicTypeEnum.OPEN_SIMI_API,
TopicTypeEnum.ODS_EVENT_LOG, TopicTypeEnum.ODS_COLLECT_USER_BEHAVIOR,
TopicTypeEnum.SIMI_USER_LIST_TOPIC, TopicTypeEnum.SIMI_USER_LIST_TOPIC
TopicTypeEnum.DWD_SYS_LOG
}).map(TopicTypeEnum::createKafkaTopic) }).map(TopicTypeEnum::createKafkaTopic)
.collect(Collectors.toList()); .collect(Collectors.toList());
......
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-8-24 20:22:15
* 类说明
*/
@Data
@ToString
public class DwdSysLog implements Serializable{/**
*
*/
private static final long serialVersionUID = 1L;
private String network_ip;
private String network_area_name;
private String unique_id;
private String device_id;
private String device_id_v1;
private String send_type;
private String zone_name;
private String zone_code;
private String zone_type;
private String send_time;
private String app_key;
private String app_type;
private String cid;
private String phone;
private String nick;
private Long waterMarkTime;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment