Commit e691cc10 by 魏建枢

simi好友指标作业

parent 2ac13f87
package com.flink.achieve.doris;
import java.io.Serializable;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
......@@ -14,10 +17,18 @@ import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
......@@ -34,10 +45,17 @@ import com.flink.common.DorisConnector;
import com.flink.common.DynamicSqlBuilder;
import com.flink.common.DynamicSqlBuilder.SqlColumn;
import com.flink.common.DynamicSqlBuilder.SqlWithParams;
import com.flink.common.SourceCommonBase;
import com.flink.common.MultipleSourceCommonBase;
import com.flink.config.TableConfig;
import com.flink.enums.OpenSimiApiTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.function.LatestUserProcessFunction;
import com.flink.processor.function.SimiFriendsTempJoinProcessor;
import com.flink.processor.impl.OkHttpService;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.EventIp;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.SimiUserInfo;
import com.flink.vo.simi.FriendsStream;
import com.flink.vo.simi.InitiateFriendRequestReqDto;
import com.flink.vo.simi.SimiFriends;
......@@ -46,7 +64,7 @@ import com.flink.vo.simi.SimiFriends;
* @author wjs
* @version 创建时间:2025-5-29 10:53:48 类说明
*/
public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
public class SimiFriendsAchi extends MultipleSourceCommonBase implements Serializable {
/**
*
......@@ -55,7 +73,7 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(SimiFriendsAchi.class);
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
public void parseSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
// =================配置入库字段=========================================
// 明细表结构
TableConfig detailConfig = new TableConfig(
......@@ -113,17 +131,60 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
},
"bi.simi_friends_total"
);
// 汇总临时表结构
TableConfig totalConfigTemp = new TableConfig(
new String[]{
"cid",
"overlap_cid_total",
"ip",
"area_name",
"add_method",
"create_time",
"__DORIS_DELETE_SIGN__"
},
new DataType[]{
DataTypes.STRING(),
DataTypes.INT(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.TIMESTAMP(3),
DataTypes.INT()
},
"bi.simi_friends_total_temp"
);
// =================流式处理=========================================
DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(detailConfig.getFields(), detailConfig.getTypes(), detailConfig.getTableName());
DorisSink<RowData> dorisSinkTotal = DorisConnector.sinkDoris(totalConfig.getFields(), totalConfig.getTypes(), totalConfig.getTableName());
DorisSink<RowData> dorisSinkTotalTemp = DorisConnector.sinkDoris(totalConfigTemp.getFields(), totalConfigTemp.getTypes(), totalConfigTemp.getTableName());
SingleOutputStreamOperator<FriendsRecord> rowDataStream = dataStreamSource
DataStreamSource<String> openSimiApiStreamSource = null;
DataStreamSource<String> eventLogStreamSource = null;
DataStreamSource<String> userStreamSource = null;
if(CollectionUtils.isNotEmpty(dataSourceList)) {
for(KafkaDataSource kafkaDataSource : dataSourceList) {
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.OPEN_SIMI_API.getTopic())) {
openSimiApiStreamSource = kafkaDataSource.getDataStreamSource();
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_EVENT_LOG.getTopic())) {
eventLogStreamSource = kafkaDataSource.getDataStreamSource();
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.SIMI_USER_LIST_TOPIC.getTopic())) {
userStreamSource = kafkaDataSource.getDataStreamSource();
}
}
}else {
return;
}
SingleOutputStreamOperator<FriendsRecord> rowDataStream = openSimiApiStreamSource
.flatMap(new FlatMapFunction<String, FriendsRecord>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<FriendsRecord> out) throws Exception {
try {
try {
JSONObject jsonObj = JSON.parseObject(value);
String sendType = jsonObj.getString("send_type");
if (!StringUtils.equals(OpenSimiApiTypeEnum.INITIATE_FRIEND_REQUEST.getCode(), sendType)) {
......@@ -137,6 +198,9 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
String friendCid = jsonReqDto.getFriendCid();
String cidInfo = OkHttpService.friends(cid);
String friendCidInfo = OkHttpService.friends(friendCid);
String addMethod = jsonReqDto.getAddMethod();
String cidAddMethod = "主动添加_"+addMethod;
String friendCidAddMethod = "被动添加_"+addMethod;
if (StringUtils.isNotEmpty(cidInfo) && StringUtils.isNotEmpty(friendCidInfo)) {
List<SimiFriends> cidsList = JSONObject.parseObject(cidInfo,
new TypeReference<List<SimiFriends>>() {
......@@ -144,7 +208,7 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
List<SimiFriends> friendsList = JSONObject.parseObject(friendCidInfo,
new TypeReference<List<SimiFriends>>() {
});
// logger.info("统计条件 cid:{},total:{}", cid, friendsList.size());
// logger.info("统计条件 cid:{},total:{}", cid, friendsList.size());
List<FriendsStream> friendsStreamList = new ArrayList<>();
for (SimiFriends friend : cidsList) {
FriendsStream friendsStream = new FriendsStream(
......@@ -176,7 +240,9 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
cid,
friendCid,
cidsList,
friendsList
friendsList,
cidAddMethod,
friendCidAddMethod
);
out.collect(friendsRecord);
}
......@@ -187,6 +253,193 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
}
});
this.simiFriendTotalAnalysis(rowDataStream, dorisSink, dorisSinkTotal, detailConfig, totalConfig);
this.simiFriendTotalAnalysisTemp(rowDataStream, dorisSinkTotalTemp, totalConfigTemp,eventLogStreamSource,userStreamSource);
}
public void simiFriendTotalAnalysisTemp(
SingleOutputStreamOperator<FriendsRecord> rowDataStream,
DorisSink<RowData> dorisSinkTotalTemp,
TableConfig totalConfigTemp,
DataStreamSource<String> eventLogStreamSource,
DataStreamSource<String> userStreamSource
) {
// 事件数据流处理
DataStream<EventIp> eventDataStream = eventLogStreamSource.flatMap(new FlatMapFunction<String, EventIp>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<EventIp> out) throws Exception {
try {
// 解析 Kafka 数据
EventIp event = EventIpLatestAchi.handleData(value);
if (event != null)
out.collect(event);
} catch (Exception e) {
logger.error("Error parsing ods_event_log 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
})
.assignTimestampsAndWatermarks(
WatermarkStrategy.<EventIp>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.getEventTime())
).filter(event -> event != null && StringUtils.isNotEmpty(event.getCid()));
DataStream<TotalTemp> joinedStream = rowDataStream
.keyBy(record -> record.cid)
.connect(eventDataStream.keyBy(EventIp::getCid))
.process(new CoProcessFunction<FriendsRecord, EventIp, TotalTemp>() {
private static final long serialVersionUID = 1L;
// 修复3:使用MapState替代临时缓存
private transient MapState<String, EventIp> latestEventState;
private transient ValueState<Integer> overlapState; // 预存重叠计算结果
@Override
public void open(Configuration parameters) {
// 状态描述符初始化
MapStateDescriptor<String, EventIp> eventDesc =
new MapStateDescriptor<>("eventState", String.class, EventIp.class);
latestEventState = getRuntimeContext().getMapState(eventDesc);
ValueStateDescriptor<Integer> overlapDesc =
new ValueStateDescriptor<>("overlapState", Integer.class);
overlapState = getRuntimeContext().getState(overlapDesc);
}
@Override
public void processElement1(FriendsRecord record, Context ctx, Collector<TotalTemp> out) throws Exception {
// 预计算重叠好友数
int overlap = calculateOverlap(record);
overlapState.update(overlap); // 存储计算结果
// 获取事件数据(优先从状态获取)
EventIp cidEvent = latestEventState.get(record.cid);
EventIp friendEvent = latestEventState.get(record.friendCid);
List<SimiFriends> cidsList = record.getCidsList();
List<SimiFriends> friendsList = record.getFriendsList();
// 输出两条记录(复用计算结果)
emitTempRecord(cidsList,friendsList,record.cid, overlap, record.cidAddMethod, cidEvent, out);
emitTempRecord(cidsList,friendsList,record.friendCid, overlap, record.friendCidAddMethod, friendEvent, out);
}
@Override
public void processElement2(EventIp event, Context ctx, Collector<TotalTemp> out) throws Exception {
// 更新最新事件状态(事件时间戳处理)
latestEventState.put(event.getCid(), event);
}
private void emitTempRecord(List<SimiFriends> cidsList,List<SimiFriends> friendsList,String cid, int overlap, String addMethod,
EventIp event, Collector<TotalTemp> out) {
out.collect(new TotalTemp(
cid,
overlap,
event != null ? event.getIp() : null, // 空值防护
event != null ? event.getAreaName() : null,
addMethod,
event != null ? event.getCreateTime() : null, // 保留原始时间戳
cidsList,
friendsList,
System.currentTimeMillis()
));
}
// 优化:提取重叠计算逻辑
private int calculateOverlap(FriendsRecord record) {
Set<String> friendSet = record.getFriendsList().stream()
.map(SimiFriends::getCid)
.collect(Collectors.toSet());
return (int) record.getCidsList().stream()
.map(SimiFriends::getCid)
.filter(friendSet::contains)
.count();
}
});
// simi国内用户数据流处理(5分钟批量更新)
DataStream<SimiUserInfo> userDataStream = userStreamSource.flatMap(new FlatMapFunction<String, SimiUserInfo>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<SimiUserInfo> out) throws Exception {
try {
// 解析 Kafka 数据
SimiUserInfo simiUserInfo = JSONObject.parseObject(value, new TypeReference<SimiUserInfo>() {
});
simiUserInfo.setUpdateTime(TimeConvertUtil.convertToTimestamp(simiUserInfo.getCreate_time()));
out.collect(simiUserInfo);
} catch (Exception e) {
logger.error("Error parsing simi_user_list 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
})
.filter(u -> StringUtils.isNoneEmpty(u.getCid()))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<SimiUserInfo>forBoundedOutOfOrderness(Duration.ofMinutes(5))
.withTimestampAssigner((user, ts) -> user.getUpdateTime()))
.keyBy(user -> user.getCid()).process(new LatestUserProcessFunction());
//检查本人cid是否有ThirdId
DataStream<TotalTemp> joinTempDataStream = joinedStream.connect(userDataStream)
.keyBy(record -> record.getCid(),
user -> user.getCid())
.process(new SimiFriendsTempJoinProcessor() {
/**
*
*/
private static final long serialVersionUID = 1L;
});
SingleOutputStreamOperator<RowData> totalTempRowDataStream = joinTempDataStream
.flatMap(new FlatMapFunction<TotalTemp, RowData>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void flatMap(TotalTemp temp, Collector<RowData> out) {
try {
GenericRowData row = new GenericRowData(totalConfigTemp.getFields().length);
// 按表结构顺序设置字段(与totalConfigTemp定义一致)
row.setField(0, StringData.fromString(temp.getCid())); // cid
row.setField(1, temp.getOverlapCidTotal()); // overlap_cid_total
row.setField(2, temp.getIp() != null ? StringData.fromString(temp.getIp()) : null); // ip
row.setField(3, temp.getAreaName() != null ? StringData.fromString(temp.getAreaName()) : null); // area_name
row.setField(4, StringData.fromString(temp.getAddMethod())); // add_method
row.setField(5, TimeConvertUtil.currentTimestamp()); // create_time
row.setField(6, 0); // __DORIS_DELETE_SIGN__ (0表示有效数据)
out.collect(row);
} catch (Exception e) {
logger.error("转换TotalTemp为RowData时出错: {}", e.getMessage());
}
}
});
totalTempRowDataStream
.filter(Objects::nonNull)
.sinkTo(dorisSinkTotalTemp)
.name("Doris-SimiFriendsTotalTemp");
}
public static void main(String[] args) {
String formattedTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
System.out.println(formattedTime);
}
//simi好友统计分析
public void simiFriendTotalAnalysis(
SingleOutputStreamOperator<FriendsRecord> rowDataStream,
DorisSink<RowData> dorisSink,
DorisSink<RowData> dorisSinkTotal,
TableConfig detailConfig,
TableConfig totalConfig
) {
SingleOutputStreamOperator<RowData> friendsStream = rowDataStream
.flatMap(new FlatMapFunction<FriendsRecord, RowData>() {
/**
......@@ -215,7 +468,7 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
});
friendsStream
.filter(Objects::nonNull)
// .print()
// .print()
.sinkTo(dorisSink)
.name("Doris-SimiFriends");
......@@ -242,7 +495,7 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
long cidTotal = userGroupCounts.get(cid);
long friendCidTotal = userGroupCounts.get(friendCid);
// System.out.println("用户分组统计条数: " + userGroupCounts);
// System.out.println("用户分组统计条数: " + userGroupCounts);
// 2. 计算所有用户共享的friend交集数量
// 步骤:a. 按用户分组, 收集每个用户的去重friend集合
......@@ -291,7 +544,7 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
//查询重叠cid是否有白名单0
List<Map<String, Object>> overlapWhitelist = queryDorisWhitelist(overlapCids);
// String str = "用户分组统计条数: " + userGroupCounts+"交集数量: " + overlapCids.size()+"共享Friend交集: "+overlapCids+"共享Friend thirdId交集: " + validIdCount+"白名单cid条数:"+overlapWhitelist.size();
// String str = "用户分组统计条数: " + userGroupCounts+"交集数量: " + overlapCids.size()+"共享Friend交集: "+overlapCids+"共享Friend thirdId交集: " + validIdCount+"白名单cid条数:"+overlapWhitelist.size();
GenericRowData row = new GenericRowData(totalConfig.getFields().length);
row.setField(0, StringData.fromString(cid));//cid
row.setField(1, StringData.fromString(friendCid));//friend_cid
......@@ -312,10 +565,12 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
flatStream
.filter(Objects::nonNull)
.sinkTo(dorisSinkTotal)
// .print("统计结果>>>>>>>>>>>>>>>>>>:")
// .print("统计结果>>>>>>>>>>>>>>>>>>:")
.name("Doris-SimiFriendsTotal");
}
public List<Map<String, Object>> queryDorisWhitelist(Set<String> cidSet) {
// 定义类型安全列(防拼写错误)
SqlColumn<Integer> TYPE = new SqlColumn<>("type");
......@@ -353,18 +608,128 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
return flag;
}
public static class FriendsRecord {
public static class TotalTemp implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
public String cid;
public Integer overlapCidTotal;
public String ip;
public String areaName;
public String addMethod;
public String createTime;
public List<SimiFriends> cidsList;
public List<SimiFriends> friendsList;
public Long collectTime;
public TotalTemp(
String cid,
Integer overlapCidTotal,
String ip,
String areaName,
String addMethod,
String createTime,
List<SimiFriends> cidsList,
List<SimiFriends> friendsList,
Long collectTime
) {
this.cid = cid;
this.overlapCidTotal = overlapCidTotal;
this.ip = ip;
this.areaName = areaName;
this.addMethod = addMethod;
this.createTime = createTime;
this.cidsList = cidsList;
this.friendsList = friendsList;
this.collectTime = collectTime;
}
public Long getCollectTime() {
return collectTime;
}
public void setCollectTime(Long collectTime) {
this.collectTime = collectTime;
}
public List<SimiFriends> getCidsList() {
return cidsList;
}
public void setCidsList(List<SimiFriends> cidsList) {
this.cidsList = cidsList;
}
public List<SimiFriends> getFriendsList() {
return friendsList;
}
public void setFriendsList(List<SimiFriends> friendsList) {
this.friendsList = friendsList;
}
public String getCid() {
return cid;
}
public void setCid(String cid) {
this.cid = cid;
}
public Integer getOverlapCidTotal() {
return overlapCidTotal;
}
public void setOverlapCidTotal(Integer overlapCidTotal) {
this.overlapCidTotal = overlapCidTotal;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getAreaName() {
return areaName;
}
public void setAreaName(String areaName) {
this.areaName = areaName;
}
public String getAddMethod() {
return addMethod;
}
public void setAddMethod(String addMethod) {
this.addMethod = addMethod;
}
public String getCreateTime() {
return createTime;
}
public void setCreateTime(String createTime) {
this.createTime = createTime;
}
}
public static class FriendsRecord implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
public List<FriendsStream> friendsStreamList;
public List<SimiFriends> cidsList;
public List<SimiFriends> friendsList;
public String cid;
public String friendCid;
public FriendsRecord(List<FriendsStream> friendsStreamList, String cid,String friendCid,List<SimiFriends> cidsList,List<SimiFriends> friendsList) {
public String cidAddMethod;
public String friendCidAddMethod;
public FriendsRecord(
List<FriendsStream> friendsStreamList,
String cid,
String friendCid,
List<SimiFriends> cidsList,
List<SimiFriends> friendsList,
String cidAddMethod,
String friendCidAddMethod) {
this.friendsStreamList = friendsStreamList;
this.cid = cid;
this.friendCid = friendCid;
this.cidsList = cidsList;
this.friendsList = friendsList;
this.cidAddMethod = cidAddMethod;
this.friendCidAddMethod = friendCidAddMethod;
}
public List<FriendsStream> getFriendsStreamList() {
return friendsStreamList;
......@@ -396,12 +761,18 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
public void setFriendsList(List<SimiFriends> friendsList) {
this.friendsList = friendsList;
}
}
@Override
public void sendToSinkKafka(DataStreamSource<String> mStream) {
// TODO Auto-generated method stub
public String getCidAddMethod() {
return cidAddMethod;
}
public void setCidAddMethod(String cidAddMethod) {
this.cidAddMethod = cidAddMethod;
}
public String getFriendCidAddMethod() {
return friendCidAddMethod;
}
public void setFriendCidAddMethod(String friendCidAddMethod) {
this.friendCidAddMethod = friendCidAddMethod;
}
}
}
......@@ -12,6 +12,7 @@ import com.flink.processor.impl.RealTransactionProcessor;
import com.flink.processor.impl.RealUsersProcessor;
import com.flink.processor.impl.SimiFriendsProcessor;
import com.flink.processor.impl.SimiGroupstProcessor;
import com.flink.processor.impl.VectorAngleCalculationProcessor;
/**
* @author wjs
......@@ -42,6 +43,8 @@ public class JobProcessorFactory {
return new SimiFriendsProcessor();
case SIMI_GROUPS:
return new SimiGroupstProcessor();
case VECTOR_ANGLE_CALCULATION:
return new VectorAngleCalculationProcessor();
default:
throw new IllegalArgumentException("未知的Job类型: " + jobType);
}
......
package com.flink.processor.function;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import com.flink.achieve.doris.SimiFriendsAchi.TotalTemp;
import com.flink.vo.SimiUserInfo;
/**
* @author wjs
* @version 创建时间:2025-6-17 11:21:03
* 类说明
*/
public class SimiFriendsTempJoinProcessor extends CoProcessFunction<TotalTemp, SimiUserInfo, TotalTemp>{
/**
*
*/
private static final long serialVersionUID = 1L;
private ValueState<SimiUserInfo> userState;
private MapState<Long, TotalTemp> pendingTotalTemp;
@Override
public void open(Configuration parameters) {
userState = getRuntimeContext().getState(new ValueStateDescriptor<>("user-state", SimiUserInfo.class));
MapStateDescriptor<Long, TotalTemp> totalTempDesc = new MapStateDescriptor<>("pendingDevices", Long.class,
TotalTemp.class);
pendingTotalTemp = getRuntimeContext().getMapState(totalTempDesc);
}
@Override
public void processElement1(TotalTemp totalTemp, CoProcessFunction<TotalTemp, SimiUserInfo, TotalTemp>.Context ctx,
Collector<TotalTemp> out) throws Exception {
SimiUserInfo user = userState.value();
if (user != null && StringUtils.equals(user.getCid(), totalTemp.getCid()) && StringUtils.isEmpty(user.getThird_id())) {
out.collect(buildResult(user, totalTemp));
} else {
pendingTotalTemp.put(totalTemp.getCollectTime(), totalTemp);
ctx.timerService().registerEventTimeTimer(totalTemp.getCollectTime() + 60000); // 1分钟超时
}
}
@Override
public void processElement2(SimiUserInfo user, CoProcessFunction<TotalTemp, SimiUserInfo, TotalTemp>.Context ctx,
Collector<TotalTemp> out) throws Exception {
userState.update(user); // 更新最新用户状态
for (TotalTemp totalTemp : pendingTotalTemp.values()) {
if (user != null && StringUtils.equals(user.getCid(), totalTemp.getCid()) && StringUtils.isEmpty(user.getThird_id())) {
out.collect(buildResult(user, totalTemp));
}
}
pendingTotalTemp.clear();
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<TotalTemp> out) throws Exception {
pendingTotalTemp.remove(timestamp - 60000); // 清理超时事件
}
private TotalTemp buildResult(SimiUserInfo user, TotalTemp totalTemp) {
return new TotalTemp(
totalTemp.getCid(),
totalTemp.getOverlapCidTotal(),
totalTemp.getIp(),
totalTemp.getAreaName(),
totalTemp.getAddMethod(),
totalTemp.getCreateTime(),
totalTemp.getCidsList(),
totalTemp.getFriendsList(),
totalTemp.getCollectTime()
);
}
}
package com.flink.processor.impl;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import com.flink.achieve.doris.SimiFriendsAchi;
import com.flink.enums.JobTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.JobProcessor;
import com.flink.vo.KafkaTopic;
/**
* @author wjs
......@@ -15,9 +20,18 @@ public class SimiFriendsProcessor implements JobProcessor{
@Override
public void process() throws Exception {
new SimiFriendsAchi().handleDataStreamSource(
JobTypeEnum.SIMI_FRIENDS,
TopicTypeEnum.OPEN_SIMI_API
createTopicList(),
JobTypeEnum.SIMI_FRIENDS
);
}
private static List<KafkaTopic> createTopicList() {
return Arrays.stream(new TopicTypeEnum[]{
TopicTypeEnum.OPEN_SIMI_API,
TopicTypeEnum.ODS_EVENT_LOG,
TopicTypeEnum.SIMI_USER_LIST_TOPIC
}).map(TopicTypeEnum::createKafkaTopic)
.collect(Collectors.toList());
}
}
......@@ -2,24 +2,46 @@ package com.flink.util;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.TimeZone;
/**
* @author wjs
* @version 创建时间:2025-5-27 14:33:05
* 类说明
*/
import org.apache.flink.table.data.TimestampData;
/**
* @author wjs
* @version 创建时间:2025-5-27 14:33:05 类说明
*/
public class TimeConvertUtil {
public static long convertToTimestamp(String timeStr) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
return sdf.parse(timeStr).getTime();
}
public static int convertToSqlDate(String datetime) {
return (int) LocalDate.parse(datetime.split(" ")[0])
.toEpochDay(); // 转换为天数偏移量
}
return (int) LocalDate.parse(datetime.split(" ")[0]).toEpochDay(); // 转换为天数偏移量
}
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
private static final ZoneId DEFAULT_ZONE = ZoneId.systemDefault();
// 获取当前TimestampData(毫秒级)
public static TimestampData currentTimestamp() {
return TimestampData.fromEpochMillis(System.currentTimeMillis());
}
// 带时区的字符串转时间戳
public static TimestampData parseToTimestamp(String timeStr, ZoneId zone) {
LocalDateTime ldt = LocalDateTime.parse(timeStr, FORMATTER);
long epochMillis = ldt.atZone(zone).toInstant().toEpochMilli();
return TimestampData.fromEpochMillis(epochMillis);
}
// 格式化输出(用于日志/导出)
public static String format(TimestampData timestamp) {
return FORMATTER.format(timestamp.toLocalDateTime());
}
}
......@@ -19,6 +19,7 @@ public class SimiUserInfo implements Serializable {
private static final long serialVersionUID = 1L;
private String nick;
private String third_id;
private String country_code;
private String user_head_url;
private String create_time;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment