Commit 2c66c15a by 魏建枢

统计好友相关数据

parent b51323f6
package com.flink.achieve.doris; package com.flink.achieve.doris;
import java.io.Serializable; import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink; import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FlatMapFunction;
...@@ -27,15 +40,16 @@ import com.flink.common.DorisConnector; ...@@ -27,15 +40,16 @@ import com.flink.common.DorisConnector;
import com.flink.common.SourceCommonBase; import com.flink.common.SourceCommonBase;
import com.flink.enums.OpenSimiApiTypeEnum; import com.flink.enums.OpenSimiApiTypeEnum;
import com.flink.processor.impl.OkHttpService; import com.flink.processor.impl.OkHttpService;
import com.flink.util.LoadPropertiesFile;
import com.flink.vo.simi.FriendsStream;
import com.flink.vo.simi.InitiateFriendRequestReqDto; import com.flink.vo.simi.InitiateFriendRequestReqDto;
import com.flink.vo.simi.SimiFriends; import com.flink.vo.simi.SimiFriends;
/** /**
* @author wjs * @author wjs
* @version 创建时间:2025-5-29 10:53:48 * @version 创建时间:2025-5-29 10:53:48 类说明
* 类说明 */
*/ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
public class SimiFriendsAchi extends SourceCommonBase implements Serializable{
/** /**
* *
...@@ -45,81 +59,313 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable{ ...@@ -45,81 +59,313 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable{
@Override @Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception { public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
//=================配置入库字段========================================= // =================配置入库字段=========================================
String[] fields = { String[] fields = {
"cid", "cid",
"friend_cid", "friend_cid",
"nick", "nick",
"add_time", "add_time",
"third_id", "third_id",
"remarks", "remarks",
"__DORIS_DELETE_SIGN__" "__DORIS_DELETE_SIGN__"
}; };
DataType[] types = { DataType[] types = {
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.INT() DataTypes.INT()
};
String[] fieldsTotal = {
"cid",
"friend_cid",
"cid_total",
"friend_cid_total",
"overlap_cid",
"overlap_total",
"overlap_third_id_total",
"overlap_whitelist_total",
"__DORIS_DELETE_SIGN__"
};
DataType[] typesTotal = {
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.STRING(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT()
}; };
//=================流式处理=========================================
String tableName = "bi.simi_friends";
DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);
SingleOutputStreamOperator<RowData> rowDataStream = dataStreamSource.flatMap( // =================流式处理=========================================
new FlatMapFunction<String, RowData>() { String tableName = "bi.simi_friends";
private static final long serialVersionUID = 1L; DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);
String tableNameTotal = "bi.simi_friends_total";
@Override DorisSink<RowData> dorisSinkTotal = DorisConnector.sinkDoris(fieldsTotal, typesTotal, tableNameTotal);
public void flatMap(String value, Collector<RowData> out) throws Exception {
try { SingleOutputStreamOperator<FriendsRecord> rowDataStream = dataStreamSource
JSONObject jsonObj = JSON.parseObject(value); .flatMap(new FlatMapFunction<String, FriendsRecord>() {
String sendType = jsonObj.getString("send_type"); private static final long serialVersionUID = 1L;
if(!StringUtils.equals(OpenSimiApiTypeEnum.INITIATE_FRIEND_REQUEST.getCode(), sendType)) {
return; @Override
} public void flatMap(String value, Collector<FriendsRecord> out) throws Exception {
String reqBody = jsonObj.getString("req_body"); try {
InitiateFriendRequestReqDto jsonReqDto = JSONObject.parseObject(reqBody,new TypeReference<InitiateFriendRequestReqDto>(){}); JSONObject jsonObj = JSON.parseObject(value);
List<String> cidList = Arrays.asList(jsonReqDto.getCid(), jsonReqDto.getFriendCid()); String sendType = jsonObj.getString("send_type");
if (!StringUtils.equals(OpenSimiApiTypeEnum.INITIATE_FRIEND_REQUEST.getCode(), sendType)) {
for(String cid : cidList) { return;
String friendsInfo = OkHttpService.friends(cid); }
if(StringUtils.isNotEmpty(friendsInfo)) { String reqBody = jsonObj.getString("req_body");
List<SimiFriends> friendsList = JSONObject.parseObject(friendsInfo,new TypeReference<List<SimiFriends>>(){}); InitiateFriendRequestReqDto jsonReqDto = JSONObject.parseObject(reqBody,
for (SimiFriends friend : friendsList) { new TypeReference<InitiateFriendRequestReqDto>() {
GenericRowData row = new GenericRowData(fields.length); });
row.setField(0, StringData.fromString(cid)); List<String> cidList = Arrays.asList(jsonReqDto.getCid(), jsonReqDto.getFriendCid());
row.setField(1, StringData.fromString(friend.getCid())); logger.info("统计条件 cid:{},friendCid:{}", jsonReqDto.getCid(), jsonReqDto.getFriendCid());
row.setField(2, StringData.fromString(friend.getNick()));
row.setField(3, StringData.fromString(friend.getAddTime())); String cid = jsonReqDto.getCid();
row.setField(4, StringData.fromString(friend.getThirdId())); String friendCid = jsonReqDto.getFriendCid();
row.setField(5, StringData.fromString(friend.getRemarks())); String cidInfo = OkHttpService.friends(cid);
row.setField(6, 0); String friendCidInfo = OkHttpService.friends(friendCid);
out.collect(row); if (StringUtils.isNotEmpty(cidInfo) && StringUtils.isNotEmpty(friendCidInfo)) {
} List<SimiFriends> cidsList = JSONObject.parseObject(cidInfo,
} new TypeReference<List<SimiFriends>>() {
} });
} catch (Exception e) { List<SimiFriends> friendsList = JSONObject.parseObject(friendCidInfo,
logger.error("SimiFriendsAchi 处理 Kafka 消息出错 | rawData:{} | error:{}", value, e.getMessage()); new TypeReference<List<SimiFriends>>() {
} });
} logger.info("统计条件 cid:{},total:{}", cid, friendsList.size());
}); List<FriendsStream> friendsStreamList = new ArrayList<>();
for (SimiFriends friend : cidsList) {
rowDataStream FriendsStream friendsStream = new FriendsStream(
.filter(Objects::nonNull) cid,
// .print() friend.getCid(),
.sinkTo(dorisSink) friend.getNick(),
.name("Doris-SimiFriends"); friend.getAddTime(),
friend.getThirdId(),
friend.getRemarks()
);
friendsStreamList.add(friendsStream);
}
for (SimiFriends friend : friendsList) {
FriendsStream friendsStream = new FriendsStream(
friendCid,
friend.getCid(),
friend.getNick(),
friend.getAddTime(),
friend.getThirdId(),
friend.getRemarks()
);
friendsStreamList.add(friendsStream);
}
if(CollectionUtils.isEmpty(friendsStreamList)) {
return;
}
FriendsRecord friendsRecord = new FriendsRecord(
friendsStreamList,
cid,
friendCid
);
out.collect(friendsRecord);
}
} catch (Exception e) {
logger.error("SimiFriendsAchi 处理 Kafka 消息出错 | rawData:{} | error:{}", value,
e.getMessage());
}
}
});
SingleOutputStreamOperator<RowData> friendsStream = rowDataStream
.flatMap(new FlatMapFunction<FriendsRecord, RowData>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void flatMap(FriendsRecord friendsRecord, Collector<RowData> out) {
List<FriendsStream> friendsStreamList = friendsRecord.getFriendsStreamList();
if(CollectionUtils.isEmpty(friendsStreamList)) {
return;
}
GenericRowData row = new GenericRowData(fieldsTotal.length);
for(FriendsStream friendsStream : friendsStreamList) {
row.setField(0, StringData.fromString(friendsStream.getCid()));//cid
row.setField(1, StringData.fromString(friendsStream.getFriendCid()));//friend_cid
row.setField(2, StringData.fromString(friendsStream.getNick()));//nick
row.setField(3, StringData.fromString(friendsStream.getAddTime()));//add_time
row.setField(4, StringData.fromString(friendsStream.getThirdId()));//third_id
row.setField(5, StringData.fromString(friendsStream.getRemarks()));//remarks
row.setField(6, 0);//__DORIS_DELETE_SIGN__
out.collect(row);
}
}
});
friendsStream
.filter(Objects::nonNull)
// .print()
.sinkTo(dorisSink)
.name("Doris-SimiFriends");
SingleOutputStreamOperator<RowData> flatStream = rowDataStream
.flatMap(new FlatMapFunction<FriendsRecord, RowData>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void flatMap(FriendsRecord friendsRecord, Collector<RowData> out) {
List<FriendsStream> friendsStreamList = friendsRecord.getFriendsStreamList();
String cid = friendsRecord.getCid();
String friendCid = friendsRecord.getFriendCid();
// 1. 按用户分组并统计各组条数
Map<String, Long> userGroupCounts = friendsStreamList.stream()
.collect(Collectors.groupingBy(
FriendsStream::getCid,
Collectors.counting()
));
long cidTotal = userGroupCounts.get(cid);
long friendCidTotal = userGroupCounts.get(friendCid);
// System.out.println("用户分组统计条数: " + userGroupCounts);
// 2. 计算所有用户共享的friend交集数量
// 步骤:a. 按用户分组, 收集每个用户的去重friend集合
Map<String, Set<String>> userFriendsMap = friendsStreamList.stream()
.collect(Collectors.groupingBy(
FriendsStream::getCid,
Collectors.mapping(
FriendsStream::getFriendCid,
Collectors.toSet()
)
));
//b. 提取所有用户的friend集合
Collection<Set<String>> friendSets = userFriendsMap.values();
//c. 求所有集合的交集
Set<String> overlapCids = friendSets.stream()
.reduce((set1, set2) -> {
Set<String> intersection = new HashSet<>(set1);
intersection.retainAll(set2);
return intersection;
})
.orElse(Collections.emptySet());
//d.共享Friend thirdId交集:
long validIdCount = overlapCids.stream()
.filter(friend ->
friendsStreamList.stream()
.anyMatch(fs ->
fs.getFriendCid().equals(friend) &&
fs.getThirdId() != null &&
!fs.getThirdId().isEmpty()
)
)
.count();
List<String> overlapWhitelist = queryDorisWhitelist(overlapCids);
// String str = "用户分组统计条数: " + userGroupCounts+"交集数量: " + overlapCids.size()+"共享Friend交集: "+overlapCids+"共享Friend thirdId交集: " + validIdCount+"白名单cid条数:"+overlapWhitelist.size();
GenericRowData row = new GenericRowData(fieldsTotal.length);
row.setField(0, StringData.fromString(cid));//cid
row.setField(1, StringData.fromString(friendCid));//friend_cid
row.setField(2, (int)cidTotal);//cid_total
row.setField(3, (int)friendCidTotal);//friend_cid_total
row.setField(4, String.join(", ", overlapCids));//overlap_cid
row.setField(5, overlapCids == null ? 0 : overlapCids.size());//overlap_total
row.setField(6, (int)validIdCount);//overlap_third_id_total
row.setField(7, overlapWhitelist == null ? 0 : overlapWhitelist.size());//overlap_whitelist_total
row.setField(8, 0);//__DORIS_DELETE_SIGN__
out.collect(row);
}
});
flatStream
.filter(Objects::nonNull)
.sinkTo(dorisSinkTotal)
// .print("统计结果>>>>>>>>>>>>>>>>>>:")
.name("Doris-SimiFriendsTotal");
}
public static class FriendsRecord {
public List<FriendsStream> friendsStreamList;
public String cid;
public String friendCid;
public FriendsRecord(List<FriendsStream> friendsStreamList, String cid,String friendCid) {
this.friendsStreamList = friendsStreamList;
this.cid = cid;
this.friendCid = friendCid;
}
public List<FriendsStream> getFriendsStreamList() {
return friendsStreamList;
}
public void setFriendsStreamList(List<FriendsStream> friendsStreamList) {
this.friendsStreamList = friendsStreamList;
}
public String getCid() {
return cid;
}
public void setCid(String cid) {
this.cid = cid;
}
public String getFriendCid() {
return friendCid;
}
public void setFriendCid(String friendCid) {
this.friendCid = friendCid;
}
}
public List<String> queryDorisWhitelist(Set<String> cidSet) {
List<List<String>> batches = partitionSet(cidSet, 1000); // 每批1000个CID
List<String> results = new ArrayList<>();
try (Connection conn = DriverManager.getConnection(LoadPropertiesFile.getPropertyFileValues("doris.jdbc_url"), LoadPropertiesFile.getPropertyFileValues("doris.username"), "")) {
// 开启 PreparedStatement
conn.setClientInfo("useServerPrepStmts", "true");
for (List<String> batch : batches) {
// 构建参数化 SQL
String placeholders = String.join(",", Collections.nCopies(batch.size(), "?"));
String sql = "SELECT * FROM bi.simi_user_antifraud_bw WHERE cid IN (" + placeholders + ") AND type = 1";
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
// 绑定参数值(自动转义)
for (int i = 0; i < batch.size(); i++) {
pstmt.setString(i + 1, batch.get(i));
}
// 执行查询
try (ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
results.add(rs.getString("cid"));
}
}
} catch (SQLException e) {
logger.error("queryDorisWhitelist 分批查询失败,参数: {}", batch, e);
}
}
} catch (Exception e) {
logger.error("queryDorisWhitelist 白名单查询失败", e);
}
return results;
}
// 辅助方法:将 Set 分批处理
private List<List<String>> partitionSet(Set<String> set, int batchSize) {
List<String> list = new ArrayList<>(set);
List<List<String>> batches = new ArrayList<>();
for (int i = 0; i < list.size(); i += batchSize) {
batches.add(list.subList(i, Math.min(i + batchSize, list.size())));
}
return batches;
} }
@Override @Override
public void sendToSinkKafka(DataStreamSource<String> mStream) { public void sendToSinkKafka(DataStreamSource<String> mStream) {
// TODO Auto-generated method stub // TODO Auto-generated method stub
} }
} }
package com.flink.vo.simi;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-6-6 16:12:24
* 类说明
*/
@Data
@ToString
public class FriendsStream implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String cid;
private String friendCid;
private String nick;
private String addTime;
private String thirdId;
private String remarks;
public FriendsStream(String cid, String friendCid, String nick, String addTime, String thirdId, String remarks) {
this.cid = cid;
this.friendCid = friendCid;
this.nick = nick;
this.addTime = addTime;
this.thirdId = thirdId;
this.remarks = remarks;
}
}
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
#kafka.bootstrapServers=168.138.185.142:9092,213.35.103.223:9092,129.150.49.247:9092 #kafka.bootstrapServers=168.138.185.142:9092,213.35.103.223:9092,129.150.49.247:9092
kafka.bootstrapServers=10.0.0.29:9092,10.0.0.87:9092,10.0.0.18:9092 kafka.bootstrapServers=10.0.0.29:9092,10.0.0.87:9092,10.0.0.18:9092
#doris.jdbc_url=jdbc:mysql://10.0.0.105 9030/bi?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true doris.jdbc_url=jdbc:mysql://10.0.0.105 9030/bi?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true
#doris.jdbc_url=jdbc:mysql://140.245.112.44:9030/bi?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true #doris.jdbc_url=jdbc:mysql://140.245.112.44:9030/bi?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true
#doris.fe=140.245.112.44:8030 #doris.fe=140.245.112.44:8030
doris.fe=10.0.0.105:8030 doris.fe=10.0.0.105:8030
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment