Commit 2ac13f87 by 魏建枢

sql生成器代码

parent 9c42b51f
package com.flink.achieve.doris;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
......@@ -36,10 +31,13 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.DynamicSqlBuilder;
import com.flink.common.DynamicSqlBuilder.SqlColumn;
import com.flink.common.DynamicSqlBuilder.SqlWithParams;
import com.flink.common.SourceCommonBase;
import com.flink.config.TableConfig;
import com.flink.enums.OpenSimiApiTypeEnum;
import com.flink.processor.impl.OkHttpService;
import com.flink.util.LoadPropertiesFile;
import com.flink.vo.simi.FriendsStream;
import com.flink.vo.simi.InitiateFriendRequestReqDto;
import com.flink.vo.simi.SimiFriends;
......@@ -59,55 +57,65 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
// =================配置入库字段=========================================
String[] fields = {
"cid",
// 明细表结构
TableConfig detailConfig = new TableConfig(
new String[]{
"cid",
"friend_cid",
"nick",
"add_time",
"third_id",
"remarks",
"__DORIS_DELETE_SIGN__"
},
new DataType[]{
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.INT()
},
"bi.simi_friends"
);
// 汇总表结构
TableConfig totalConfig = new TableConfig(
new String[]{
"cid",
"friend_cid",
"nick",
"add_time",
"third_id",
"remarks",
"cid_total",
"friend_cid_total",
"overlap_cid",
"overlap_total",
"overlap_third_id_total",
"overlap_whitelist_total",
"self_cid_thirdid_total",
"self_friend_cid_thirdid_total",
"friends_of_friend_cid_thirdid_total",
"friend_cid_of_friend_cid_thirdid_total",
"__DORIS_DELETE_SIGN__"
};
DataType[] types = {
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
},
new DataType[]{
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.STRING(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT()
};
String[] fieldsTotal = {
"cid",
"friend_cid",
"cid_total",
"friend_cid_total",
"overlap_cid",
"overlap_total",
"overlap_third_id_total",
"overlap_whitelist_total",
"__DORIS_DELETE_SIGN__"
};
DataType[] typesTotal = {
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.STRING(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT()
};
},
"bi.simi_friends_total"
);
// =================流式处理=========================================
String tableName = "bi.simi_friends";
DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);
String tableNameTotal = "bi.simi_friends_total";
DorisSink<RowData> dorisSinkTotal = DorisConnector.sinkDoris(fieldsTotal, typesTotal, tableNameTotal);
DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(detailConfig.getFields(), detailConfig.getTypes(), detailConfig.getTableName());
DorisSink<RowData> dorisSinkTotal = DorisConnector.sinkDoris(totalConfig.getFields(), totalConfig.getTypes(), totalConfig.getTableName());
SingleOutputStreamOperator<FriendsRecord> rowDataStream = dataStreamSource
.flatMap(new FlatMapFunction<String, FriendsRecord>() {
......@@ -125,9 +133,6 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
InitiateFriendRequestReqDto jsonReqDto = JSONObject.parseObject(reqBody,
new TypeReference<InitiateFriendRequestReqDto>() {
});
// List<String> cidList = Arrays.asList(jsonReqDto.getCid(), jsonReqDto.getFriendCid());
// logger.info("统计条件 cid:{},friendCid:{}", jsonReqDto.getCid(), jsonReqDto.getFriendCid());
String cid = jsonReqDto.getCid();
String friendCid = jsonReqDto.getFriendCid();
String cidInfo = OkHttpService.friends(cid);
......@@ -195,7 +200,7 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
if(CollectionUtils.isEmpty(friendsStreamList)) {
return;
}
GenericRowData row = new GenericRowData(fieldsTotal.length);
GenericRowData row = new GenericRowData(detailConfig.getFields().length);
for(FriendsStream friendsStream : friendsStreamList) {
row.setField(0, StringData.fromString(friendsStream.getCid()));//cid
row.setField(1, StringData.fromString(friendsStream.getFriendCid()));//friend_cid
......@@ -226,6 +231,8 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
List<FriendsStream> friendsStreamList = friendsRecord.getFriendsStreamList();
String cid = friendsRecord.getCid();
String friendCid = friendsRecord.getFriendCid();
List<SimiFriends> cidsList = friendsRecord.getCidsList();
List<SimiFriends> friendsList = friendsRecord.getFriendsList();
// 1. 按用户分组并统计各组条数
Map<String, Long> userGroupCounts = friendsStreamList.stream()
.collect(Collectors.groupingBy(
......@@ -269,12 +276,23 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
)
)
.count();
//查询重叠cid是否有白名单
List<String> overlapWhitelist = queryDorisWhitelist(overlapCids);
//查询
String cidThirdId = friendsStreamList.stream()
.filter(f -> StringUtils.equals(f.getCid(), cid) && !"simitalk".equals(f.getThirdId()))
.findFirst()
.map(FriendsStream::getThirdId)
.orElse(null);
String friendCidThirdId = friendsStreamList.stream()
.filter(f -> StringUtils.equals(f.getCid(), friendCid) && !"simitalk".equals(f.getThirdId()))
.findFirst()
.map(FriendsStream::getThirdId)
.orElse(null);
//查询重叠cid是否有白名单0
List<Map<String, Object>> overlapWhitelist = queryDorisWhitelist(overlapCids);
// String str = "用户分组统计条数: " + userGroupCounts+"交集数量: " + overlapCids.size()+"共享Friend交集: "+overlapCids+"共享Friend thirdId交集: " + validIdCount+"白名单cid条数:"+overlapWhitelist.size();
GenericRowData row = new GenericRowData(fieldsTotal.length);
GenericRowData row = new GenericRowData(totalConfig.getFields().length);
row.setField(0, StringData.fromString(cid));//cid
row.setField(1, StringData.fromString(friendCid));//friend_cid
row.setField(2, (int)cidTotal);//cid_total
......@@ -283,7 +301,11 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
row.setField(5, overlapCids == null ? 0 : overlapCids.size());//overlap_total
row.setField(6, (int)validIdCount);//overlap_third_id_total
row.setField(7, overlapWhitelist == null ? 0 : overlapWhitelist.size());//overlap_whitelist_total
row.setField(8, 0);//__DORIS_DELETE_SIGN__
row.setField(8, StringUtils.isNotEmpty(cidThirdId) ? 1 : 0);//self_cid_thirdid_total
row.setField(9, StringUtils.isNotEmpty(friendCidThirdId) ? 1 : 0);//self_friend_cid_thirdid_total
row.setField(10, checkThirdIds(cidsList));//friends_of_friend_cid_thirdid__total
row.setField(11, checkThirdIds(friendsList));//friend_cid_of_friend_cid_thirdid_total
row.setField(12, 0);//__DORIS_DELETE_SIGN__
out.collect(row);
}
});
......@@ -294,6 +316,43 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
.name("Doris-SimiFriendsTotal");
}
public List<Map<String, Object>> queryDorisWhitelist(Set<String> cidSet) {
// 定义类型安全列(防拼写错误)
SqlColumn<Integer> TYPE = new SqlColumn<>("type");
SqlColumn<String> CID = new SqlColumn<>("cid");
// 生成动态SQL(自动分批)
List<SqlWithParams> queries = new DynamicSqlBuilder("SELECT cid FROM bi.simi_user_antifraud_bw")
.allowColumns("type", "cid") // 防SQL注入
.andCondition(TYPE, "=", 1)
.andIn(CID, new ArrayList<>(cidSet))
.batchSize(3) // 每批2个参数
.build();
List<Map<String, Object>> params = DynamicSqlBuilder.queryDoris(queries);
logger.info(">>>>>>>>>>>params:{}",params);
return params;
}
public static int checkThirdIds(List<SimiFriends> simiFriendList) {
int flag = 0;
if (simiFriendList == null || simiFriendList.isEmpty()) {
return flag;
}
boolean allNull = simiFriendList.stream()
.map(SimiFriends::getThirdId)
.allMatch(StringUtils::isBlank); // 同时检查 null、空字符串、纯空格 [4,9](@ref)
boolean anyNonNull = simiFriendList.stream()
.filter(Objects::nonNull) // 过滤掉null对象
.filter(friend -> !"simitalk".equals(friend.getThirdId()))
.map(SimiFriends::getThirdId)
.anyMatch(id -> !StringUtils.isBlank(id)); // 存在有效值
if (allNull) {
return flag;
} else if (anyNonNull) {
flag = 1;
}
return flag;
}
public static class FriendsRecord {
public List<FriendsStream> friendsStreamList;
public List<SimiFriends> cidsList;
......@@ -304,6 +363,8 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
this.friendsStreamList = friendsStreamList;
this.cid = cid;
this.friendCid = friendCid;
this.cidsList = cidsList;
this.friendsList = friendsList;
}
public List<FriendsStream> getFriendsStreamList() {
return friendsStreamList;
......@@ -336,47 +397,6 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
this.friendsList = friendsList;
}
}
public List<String> queryDorisWhitelist(Set<String> cidSet) {
List<List<String>> batches = partitionSet(cidSet, 1000); // 每批1000个CID
List<String> results = new ArrayList<>();
try (Connection conn = DriverManager.getConnection(LoadPropertiesFile.getPropertyFileValues("doris.jdbc_url"), LoadPropertiesFile.getPropertyFileValues("doris.username"), "")) {
// 开启 PreparedStatement
conn.setClientInfo("useServerPrepStmts", "true");
for (List<String> batch : batches) {
// 构建参数化 SQL
String placeholders = String.join(",", Collections.nCopies(batch.size(), "?"));
String sql = "SELECT * FROM bi.simi_user_antifraud_bw WHERE cid IN (" + placeholders + ") AND type = 1";
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
// 绑定参数值(自动转义)
for (int i = 0; i < batch.size(); i++) {
pstmt.setString(i + 1, batch.get(i));
}
// 执行查询
try (ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
results.add(rs.getString("cid"));
}
}
} catch (SQLException e) {
logger.error("queryDorisWhitelist 分批查询失败,参数: {}", batch, e);
}
}
} catch (Exception e) {
logger.error("queryDorisWhitelist 白名单查询失败", e);
}
return results;
}
// 辅助方法:将 Set 分批处理
private List<List<String>> partitionSet(Set<String> set, int batchSize) {
List<String> list = new ArrayList<>(set);
List<List<String>> batches = new ArrayList<>();
for (int i = 0; i < list.size(); i += batchSize) {
batches.add(list.subList(i, Math.min(i + batchSize, list.size())));
}
return batches;
}
@Override
public void sendToSinkKafka(DataStreamSource<String> mStream) {
......
......@@ -9,6 +9,8 @@ import org.apache.doris.flink.sink.writer.LoadConstants;
import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.util.LoadPropertiesFile;
......@@ -19,6 +21,8 @@ import com.flink.util.LoadPropertiesFile;
*/
public class DorisConnector {
private static final Logger logger = LoggerFactory.getLogger(DorisConnector.class);
public static DorisSink<RowData> sinkDoris(String[] fields,DataType[] types,String tableName) {
//=================设置属性=========================================
......
package com.flink.common;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.util.LoadPropertiesFile;
/**
* 动态sql生成器
* @author wjs
*
*/
public class DynamicSqlBuilder {
private static final Logger logger = LoggerFactory.getLogger(DynamicSqlBuilder.class);
// ==================== 核心数据结构 ====================
public static class SqlColumn<T> {
private final String name;
public SqlColumn(String name) { this.name = name; }
public String getName() { return name; }
}
public static class ConditionGroup {
private final String logicOp;
private final List<String> conditions = new ArrayList<>();
private final List<Object> parameters = new ArrayList<>();
public ConditionGroup(String logicOp) {
this.logicOp = logicOp;
}
public ConditionGroup addCondition(String condition, Object value) {
if (value != null) {
conditions.add(condition);
parameters.add(value);
}
return this;
}
public String build() {
return conditions.isEmpty() ? "" :
"(" + String.join(" " + logicOp + " ", conditions) + ")";
}
public List<Object> getParameters() {
return parameters;
}
}
// ==================== 构建器主体 ====================
private final String baseSql;
private final List<String> conditions = new ArrayList<>();
private final List<Object> parameters = new ArrayList<>();
private final Map<SqlColumn<?>, List<Object>> inConditions = new HashMap<>();
private final Set<String> allowedColumns = new HashSet<>();
private int batchSize = 1000;
public DynamicSqlBuilder(String baseSql) {
this.baseSql = baseSql;
}
// ==================== 链式API方法 ====================
public DynamicSqlBuilder allowColumns(String... columns) {
Collections.addAll(allowedColumns, columns);
return this;
}
public DynamicSqlBuilder andGroup(ConditionGroup group) {
if (!group.getParameters().isEmpty()) {
conditions.add(group.build());
parameters.addAll(group.getParameters());
}
return this;
}
public <T> DynamicSqlBuilder andCondition(SqlColumn<T> column, String operator, T value) {
validateColumn(column.getName());
if (value != null) {
conditions.add(column.getName() + " " + operator + " ?");
parameters.add(value);
}
return this;
}
public DynamicSqlBuilder andLike(SqlColumn<String> column, String value) {
return andCondition(column, "LIKE", "%" + value + "%");
}
public <T> DynamicSqlBuilder andBetween(SqlColumn<T> column, T start, T end) {
validateColumn(column.getName());
if (start != null && end != null) {
conditions.add(column.getName() + " BETWEEN ? AND ?");
parameters.add(start);
parameters.add(end);
}
return this;
}
public <T> DynamicSqlBuilder andIn(SqlColumn<T> column, Collection<T> values) {
validateColumn(column.getName());
if (values != null && !values.isEmpty()) {
inConditions.put(column, new ArrayList<>(values));
}
return this;
}
public DynamicSqlBuilder batchSize(int batchSize) {
this.batchSize = batchSize;
return this;
}
// ==================== 核心构建逻辑 ====================
public List<SqlWithParams> build() {
// 无IN条件时直接返回
if (inConditions.isEmpty()) {
return Collections.singletonList(
new SqlWithParams(buildFinalSql(conditions), parameters)
);
}
List<SqlWithParams> result = new ArrayList<>();
List<Map.Entry<SqlColumn<?>, List<Object>>> inEntries = new ArrayList<>(inConditions.entrySet());
// 计算最大批次(每列独立处理)
int maxBatches = inEntries.stream()
.mapToInt(e -> (int) Math.ceil((double) e.getValue().size() / batchSize))
.max().orElse(1);
// 生成批次SQL
for (int batchIdx = 0; batchIdx < maxBatches; batchIdx++) {
List<String> batchConditions = new ArrayList<>(conditions);
List<Object> batchParams = new ArrayList<>(parameters);
List<String> inClauses = new ArrayList<>();
// 为每列添加当前批次
for (Map.Entry<SqlColumn<?>, List<Object>> entry : inEntries) {
List<Object> values = entry.getValue();
int fromIdx = batchIdx * batchSize;
if (fromIdx >= values.size()) continue;
int toIdx = Math.min(fromIdx + batchSize, values.size());
List<Object> batchValues = values.subList(fromIdx, toIdx);
String placeholders = Collections.nCopies(batchValues.size(), "?")
.stream().collect(Collectors.joining(","));
inClauses.add(entry.getKey().getName() + " IN (" + placeholders + ")");
batchParams.addAll(batchValues);
}
batchConditions.addAll(inClauses);
result.add(new SqlWithParams(
buildFinalSql(batchConditions),
batchParams
));
}
return result;
}
// ==================== 辅助方法 ====================
private void validateColumn(String column) {
if (!allowedColumns.isEmpty() && !allowedColumns.contains(column)) {
throw new IllegalArgumentException("非法列名: " + column);
}
}
private String buildFinalSql(List<String> conditions) {
return conditions.isEmpty() ? baseSql :
baseSql + " WHERE " + String.join(" AND ", conditions);
}
// ==================== 结果封装 ====================
public static class SqlWithParams {
public final String sql;
public final List<Object> params;
public SqlWithParams(String sql, List<Object> params) {
this.sql = sql;
this.params = params;
}
}
//查询doris
public static List<Map<String, Object>> queryDoris(List<SqlWithParams> queries){
List<Map<String, Object>> results = new ArrayList<>(); // 改用 Map 存储动态字段
try (Connection conn = DriverManager.getConnection(
LoadPropertiesFile.getPropertyFileValues("doris.jdbc_url"),
LoadPropertiesFile.getPropertyFileValues("doris.username"),
"")) {
conn.setClientInfo("useServerPrepStmts", "true");
for (SqlWithParams query : queries) {
try (PreparedStatement pstmt = conn.prepareStatement(query.sql)) {
// 参数化绑定防注入
for (int i = 0; i < query.params.size(); i++) {
pstmt.setObject(i + 1, query.params.get(i));
}
try (ResultSet rs = pstmt.executeQuery()) {
ResultSetMetaData metaData = rs.getMetaData(); // 获取结果集元数据
int columnCount = metaData.getColumnCount(); // 获取列数量
while (rs.next()) {
Map<String, Object> row = new LinkedHashMap<>(); // 按列顺序存储
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnName(i); // 动态获取列名
Object value = rs.getObject(i); // 动态获取值
row.put(columnName, value);
}
results.add(row); // 添加当前行数据
}
}
}
}
logger.info("查询结果: {}",results);
} catch (SQLException e) {
logger.error("DynamicSqlBuilder queryDoris SQL执行失败", e);
}
return results;
}
// ==================== 使用示例 ====================
public static void main(String[] args) throws SQLException {
// 1. 定义类型安全列(防拼写错误)
SqlColumn<Integer> TYPE = new SqlColumn<>("type");
SqlColumn<String> CID = new SqlColumn<>("cid");
SqlColumn<String> PHONE = new SqlColumn<>("phone");
// 2. 构建复杂条件组(支持OR逻辑)
// ConditionGroup riskGroup = new ConditionGroup("OR")
// .addCondition("risk_level > ?", 3)
// .addCondition("is_blacklist = ?", 1);
// 3. 生成动态SQL(自动分批)
List<SqlWithParams> queries = new DynamicSqlBuilder("SELECT cid, phone FROM bi.simi_user_antifraud_bw")
.allowColumns("type", "cid", "phone") // 防SQL注入
.andCondition(TYPE, "=", 1)
.andLike(PHONE, "177") // 模糊查询
// .andGroup(riskGroup) // 嵌套条件组
.andIn(CID, Arrays.asList("8555hotzh", "7418ltmhp"))
.andIn(PHONE, Arrays.asList("17710498555", "18765167418"))
.batchSize(2) // 每批2个参数
.build();
// 4. 执行查询
List<Map<String, Object>> results = new ArrayList<>(); // 改用 Map 存储动态字段
try (Connection conn = DriverManager.getConnection(
LoadPropertiesFile.getPropertyFileValues("doris.jdbc_url"),
LoadPropertiesFile.getPropertyFileValues("doris.username"),
"")) {
for (SqlWithParams query : queries) {
try (PreparedStatement pstmt = conn.prepareStatement(query.sql)) {
// 参数化绑定防注入
for (int i = 0; i < query.params.size(); i++) {
pstmt.setObject(i + 1, query.params.get(i));
}
try (ResultSet rs = pstmt.executeQuery()) {
ResultSetMetaData metaData = rs.getMetaData(); // 获取结果集元数据
int columnCount = metaData.getColumnCount(); // 获取列数量
while (rs.next()) {
Map<String, Object> row = new LinkedHashMap<>(); // 按列顺序存储
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnName(i); // 动态获取列名
Object value = rs.getObject(i); // 动态获取值
row.put(columnName, value);
}
results.add(row); // 添加当前行数据
// results.add(String.format(
// "CID: %s | Phone: %s",
// rs.getString("cid"),
// rs.getString("phone")
// ));
}
}
}
}
System.out.println("查询结果: " + results);
}
}
}
\ No newline at end of file
package com.flink.config;
import java.io.Serializable;
import org.apache.flink.table.types.DataType;
import lombok.Data;
/**
* @author wjs
* @version 创建时间:2025-6-11 11:31:01
* 类说明 表配置
*/
@Data
public class TableConfig implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
final String[] fields;
final DataType[] types;
final String tableName;
public TableConfig(String[] fields, DataType[] types, String tableName) {
this.fields = fields;
this.types = types;
this.tableName = tableName;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment