Commit 708c9999 by 魏建枢

代码提交

parent 4ba81497
......@@ -44,6 +44,8 @@ import com.flink.util.ip2region.SearcherUtil;
import com.flink.vo.EventIp;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.OdsEventLog;
import com.flink.vo.PcEventInfo;
import com.flink.vo.PcOdsEventLog;
import com.flink.vo.Result;
import com.flink.vo.SimiUserInfo;
import com.flink.vo.UserProperties;
......@@ -135,6 +137,7 @@ public class EventIpLatestAchi extends MultipleSourceCommonBase implements Seria
private DataStream<Result> operatorStream(List<KafkaDataSource> dataSourceList) {
DataStreamSource<String> userStreamSource = null;
DataStreamSource<String> eventStreamSource = null;
DataStreamSource<String> pcEventStreamSource = null;
if(CollectionUtils.isNotEmpty(dataSourceList)) {
for(KafkaDataSource kafkaDataSource : dataSourceList) {
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.SIMI_USER_LIST_TOPIC.getTopic())) {
......@@ -143,6 +146,9 @@ public class EventIpLatestAchi extends MultipleSourceCommonBase implements Seria
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_EVENT_LOG.getTopic())) {
eventStreamSource = kafkaDataSource.getDataStreamSource();
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_PC_EVENT_LOG.getTopic())) {
pcEventStreamSource = kafkaDataSource.getDataStreamSource();
}
}
}else {
return null;
......@@ -171,7 +177,7 @@ public class EventIpLatestAchi extends MultipleSourceCommonBase implements Seria
.withTimestampAssigner((user, ts) -> user.getUpdateTime()))
.keyBy(user -> user.getCid() + "#_#" + user.getPhone_number()).process(new LatestUserProcessFunction());
// 事件数据流处理
//合并APP/PC事件流
DataStream<EventIp> eventDataStream = eventStreamSource.flatMap(new FlatMapFunction<String, EventIp>() {
private static final long serialVersionUID = 1L;
......@@ -186,7 +192,24 @@ public class EventIpLatestAchi extends MultipleSourceCommonBase implements Seria
logger.error("Error parsing ods_event_log 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
})
}).union(
// PC事件信息数据流处理
pcEventStreamSource.flatMap(new FlatMapFunction<String, EventIp>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<EventIp> out) throws Exception {
try {
// 解析 Kafka 数据
EventIp event = handlePcData(value);
if (event != null)
out.collect(event);
} catch (Exception e) {
logger.error("Error parsing ods_pc_event_log 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
})
)
.assignTimestampsAndWatermarks(WatermarkStrategy.<EventIp>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.getEventTime()));
......@@ -331,4 +354,67 @@ public class EventIpLatestAchi extends MultipleSourceCommonBase implements Seria
eventIp.setCreateTime(createTime);
return eventIp;
}
public static EventIp handlePcData(String record) throws Exception {
PcOdsEventLog pcOdsEventLog = JSONObject.parseObject(record, new TypeReference<PcOdsEventLog>() {});
String appKey = pcOdsEventLog.getApp_key();
String appType = pcOdsEventLog.getApp_type();
String createTime = pcOdsEventLog.getCreate_time();
String event_info = pcOdsEventLog.getEvent_info();
if(StringUtils.isEmpty(event_info)) {
return null;
}
PcEventInfo pcEventInfo = JSONObject.parseObject(event_info, new TypeReference<PcEventInfo>() {});
if(null == pcEventInfo) {
return null;
}
if(StringUtils.isEmpty(pcEventInfo.getCid())) {
return null;
}
if(StringUtils.isEmpty(pcEventInfo.getPhone())) {
return null;
}
String routeIp = pcEventInfo.getS1();
if(StringUtils.isEmpty(routeIp)) {
return null;
}
List<String> ips = SearcherUtil.convertStringToList(routeIp);
if (CollectionUtils.isEmpty(ips)) {
return null;
}
String ip_name = null;
String area_name = null;
for (String ip : ips) {
if (!SearcherUtil.ipv6(ip)) {
area_name = SearcherUtil.getCityInfoByFile(ip);
if (!CompareUtils.stringExists(area_name, "0|0|0|内网IP|内网IP",
"0|0|0|内网IP|Finance-and-Promoting-Technology",
"Request timed out.",
"*")) {
ip_name = ip;
break;
} else {
ip_name = null;
area_name = null;
}
}
}
if (StringUtils.isEmpty(ip_name)) {
return null;
}
EventIp eventIp = new EventIp();
eventIp.setCid(pcEventInfo.getCid());
eventIp.setEventTime(TimeConvertUtil.convertToTimestamp(createTime));
eventIp.setPhone(pcEventInfo.getPhone());
eventIp.setIp(ip_name);
eventIp.setAreaName(area_name);
eventIp.setNick(pcEventInfo.getNick());
eventIp.setAppKey(appKey);
eventIp.setAppType(appType);
eventIp.setCreateTime(createTime);
return eventIp;
}
}
......@@ -27,6 +27,8 @@ public enum TopicTypeEnum {
SIMI_USER_LIST_TOPIC("simi_user_list","simiUserList"),
ABROAD_SIMI_USER_LIST_TOPIC("abroad_simi_user_list","abroadSimiUserList"),
OPEN_SIMI_API("ods_open_simi_api","odsOpenSimiApi"),
ODS_PC_EVENT_LOG("ods_pc_event_log","odsPcEventLog"),
ODS_PC_COLLECT_LOG("ods_pc_collect_log","odsPcCollectLog"),
;
private String topic;
......
......@@ -30,6 +30,7 @@ public class DeviceIdLatestProcessor implements JobProcessor{
TopicTypeEnum.ODS_NEW_COLLECT_LOG,
TopicTypeEnum.SIMI_USER_LIST_TOPIC,
TopicTypeEnum.ABROAD_SIMI_USER_LIST_TOPIC,
TopicTypeEnum.ODS_PC_COLLECT_LOG,
}).map(TopicTypeEnum::createKafkaTopic)
.collect(Collectors.toList());
......
......@@ -28,7 +28,8 @@ public class EventIpLatestProcessor implements JobProcessor{
private static List<KafkaTopic> createTopicList() {
return Arrays.stream(new TopicTypeEnum[]{
TopicTypeEnum.ODS_EVENT_LOG,
TopicTypeEnum.SIMI_USER_LIST_TOPIC
TopicTypeEnum.SIMI_USER_LIST_TOPIC,
TopicTypeEnum.ODS_PC_EVENT_LOG,
}).map(TopicTypeEnum::createKafkaTopic)
.collect(Collectors.toList());
}
......
......@@ -88,6 +88,8 @@ public class SearcherUtil {
}
public static void main(String[] args) throws Exception {
getCityInfoByFile("1.9.241.214");
// getCityInfoByFile("1.9.241.214");
String str = "1.1.1.1,2.2.2.2,5.5.5.5";
System.out.println(convertStringToList(str));
}
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-7-3 17:21:25 类说明
*/
@Data
@ToString
public class PcCollectLog implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private String app_type;
private String app_version;
private String create_time;
private String send_type;
private String zone_name;
private String zone_code;
private String flume_type;
private String app_channel;
private String app_key;
private String send_time;
private String device_info;
private String zone_type;
private String id;
private String user_agent;
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-7-3 17:32:24 类说明
*/
@Data
@ToString
public class PcDeviceInfo implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private String b2;
private String b3;
private String i7;
private String i8;
private String k1;
private String k3;
private String k4;
private String k6;
private String c5;
private String c6;
private String k10;
private String b4;
private String ap2;
private String b1;
private String d1;
private String d2;
private String d13;
private String d14;
private String p10;
private String p14;
private String cid;
private String phone;
private String nick;
}
package com.flink.vo;
import java.io.Serializable;
import java.util.List;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-7-4 14:40:15 类说明
*/
@Data
@ToString
public class PcEventInfo implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private String i7;
private String i8;
private String cid;
private String phone;
private String nick;
private String s1;
private List<PcProperties> properties;
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-7-4 14:35:01
* 类说明
*/
@Data
@ToString
public class PcOdsEventLog implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String app_type;
private String app_version;
private String create_time;
private String send_type;
private String zone_name;
private String zone_code;
private String flume_type;
private String app_channel;
private String app_key;
private String send_time;
private String event_info;
private String zone_type;
private String id;
private String user_agent;
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-7-4 14:42:15
* 类说明
*/
@Data
@ToString
public class PcProperties implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String r9;
private String r4;
private String r3;
}
package com.flink.vo;
import java.util.Set;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author wjs
* @version 创建时间:2025-7-1 18:19:55
* 类说明 定义注册事件数据结构
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RegisterEvent {
private String userId; // 用户ID
private String deviceId; // 设备ID
private String ip; // IP地址
private String phone; // 手机号
private String nickname; // 昵称
private Set<String> friends; // 添加的好友列表
private Set<String> groups; // 加入的群组列表
private Long timestamp; // 事件时间戳
}
......@@ -27,4 +27,9 @@ chainless.android.appKey= 8ooOvXJo276
chainless.ios.appKey= 9JQ3A7GA420
abroadChainless.android.appKey= 672OvXJo236
abroadChainless.ios.appKey= KJa3A7GA410
\ No newline at end of file
abroadChainless.ios.appKey= KJa3A7GA410
simi.pc.winAppKey: pc1KPjmh951
simi.pc.linuxAppKey: pcrIjvC5805
simi.pc.macINterAppKey: pcUXtmMh356
simi.pc.macArmAppKey: pcrPGB1z531
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment