Commit a0818dc7 by 魏建枢

代码提交支持sql在yaml里获取

parent d9306c6a
......@@ -199,6 +199,11 @@
<artifactId>HikariCP</artifactId>
<version>3.4.5</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.30</version>
</dependency>
</dependencies>
......
......@@ -14,6 +14,7 @@ import org.slf4j.LoggerFactory;
import com.flink.common.SourceCommonBase;
import com.flink.enums.TopicTypeEnum;
import com.flink.util.SqlLoader;
import com.flink.vo.KafkaDataSource;
/**
......@@ -67,14 +68,14 @@ public class CommonConsumeBaseAchi extends SourceCommonBase implements Serializa
}
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -41,6 +41,7 @@ import com.flink.processor.function.LatestUserProcessFunction;
import com.flink.processor.function.UserPropertiesProcessor;
import com.flink.util.CompareUtils;
import com.flink.util.LoadPropertiesFile;
import com.flink.util.SqlLoader;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.CollectLog;
import com.flink.vo.DeviceId;
......@@ -507,14 +508,14 @@ public class DeviceIdLatestAchi extends SourceCommonBase implements Serializable
}
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -28,6 +28,7 @@ import com.flink.common.DorisConnector;
import com.flink.common.SourceCommonBase;
import com.flink.enums.TopicTypeEnum;
import com.flink.util.CompareUtils;
import com.flink.util.SqlLoader;
import com.flink.util.ip2region.SearcherUtil;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.OdsEventLog;
......@@ -177,14 +178,14 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable
}
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -41,6 +41,7 @@ import com.flink.common.SourceCommonBase;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.function.LatestUserProcessFunction;
import com.flink.util.CompareUtils;
import com.flink.util.SqlLoader;
import com.flink.util.TimeConvertUtil;
import com.flink.util.ip2region.SearcherUtil;
import com.flink.vo.EventIp;
......@@ -431,14 +432,14 @@ public class EventIpLatestAchi extends SourceCommonBase implements Serializable{
}
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -32,6 +32,7 @@ import com.flink.enums.TopicTypeEnum;
import com.flink.processor.function.DistinctUserAggregator;
import com.flink.processor.function.LatestUserProcessFunction;
import com.flink.processor.function.WindowResultFunction;
import com.flink.util.SqlLoader;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.DeviceId;
import com.flink.vo.DeviceRegistrationResult;
......@@ -213,14 +214,14 @@ public class RegistrationCheckAchi extends SourceCommonBase implements Serializa
}
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -59,6 +59,7 @@ import com.flink.enums.TopicTypeEnum;
import com.flink.processor.function.LatestUserProcessFunction;
import com.flink.processor.function.SimiFriendsTempJoinProcessor;
import com.flink.processor.impl.OkHttpService;
import com.flink.util.SqlLoader;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.DwdSysLog;
import com.flink.vo.EventIp;
......@@ -959,14 +960,14 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
}
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -30,6 +30,7 @@ import com.flink.common.SourceCommonBase;
import com.flink.enums.OpenSimiApiTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.impl.OkHttpService;
import com.flink.util.SqlLoader;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.simi.InitiateFriendRequestReqDto;
import com.flink.vo.simi.SimiGroups;
......@@ -121,14 +122,14 @@ public class SimiGroupstAchi extends SourceCommonBase implements Serializable{
}
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -60,6 +60,7 @@ import com.flink.common.SourceCommonBase;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.function.LatestUserProcessFunction;
import com.flink.util.CompareUtils;
import com.flink.util.SqlLoader;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.DwdSysLog;
import com.flink.vo.EventList;
......@@ -829,14 +830,14 @@ public class UserDailyActivityAchi extends SourceCommonBase implements Serializa
}
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -47,6 +47,7 @@ import com.flink.processor.function.VectorAngleProcessor;
import com.flink.processor.function.VectorSimilarityProcessor;
import com.flink.processor.function.VectorSimilarityProcessor.Point;
import com.flink.processor.function.VectorSimilarityProcessor.VectorPair;
import com.flink.util.SqlLoader;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.CollectLog;
import com.flink.vo.CollectLogToJsonSource;
......@@ -840,14 +841,14 @@ public class VectorAngleCalculationAchi extends SourceCommonBase implements Seri
}
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -16,6 +16,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.flink.common.KafkaConnector;
import com.flink.common.SourceCommonBase;
import com.flink.enums.TopicTypeEnum;
import com.flink.util.SqlLoader;
import com.flink.vo.KafkaDataSource;
/**
* @author wjs
......@@ -36,7 +37,7 @@ public class TableSqlSinkKafkaAchi extends SourceCommonBase implements Serializa
}
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
......@@ -49,7 +50,7 @@ public class TableSqlSinkKafkaAchi extends SourceCommonBase implements Serializa
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv) throws ParseException, Exception {
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv,SqlLoader loader) throws ParseException, Exception {
Table resultTable = tableEnv.sqlQuery("SELECT "
+ "network_ip,"
+ "network_area_name,"
......
......@@ -14,6 +14,7 @@ import org.slf4j.LoggerFactory;
import com.flink.enums.JobTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.util.SqlLoader;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
......@@ -42,7 +43,9 @@ public abstract class SourceCommonBase {
StreamTableEnvironment tableEnv = StreamEnvironmentSettings.createTableEnv(env, jobTypeEnum.getCode(),kafkaTopicList,false);
logger.info("2. table的环境设置成功");
//3.资源配置文件信息的获取
parseSourceKafkaToSqlTable(topicTypeEnum, tableEnv);
SqlLoader loader = new SqlLoader();
loader.load("sql-config.yaml");
parseSourceKafkaToSqlTable(topicTypeEnum, tableEnv,loader);
logger.info("3.Kafka资源ETL操作成功");
}
if(useStreamAPI){
......@@ -70,7 +73,9 @@ public abstract class SourceCommonBase {
StreamTableEnvironment tableEnv = StreamEnvironmentSettings.createTableEnv(env, jobName.getCode(),kafkaTopicList,false);
logger.info("2. table的环境设置成功");
//3.资源配置文件信息的获取
parseMultipleSourceKafkaToSqlTable(env,tableEnv);
SqlLoader loader = new SqlLoader();
loader.load("sql-config.yaml");
parseMultipleSourceKafkaToSqlTable(env,tableEnv,loader);
}
if(useStreamAPI){
//2.资源配置文件信息的获取
......@@ -109,7 +114,7 @@ public abstract class SourceCommonBase {
* @throws ParseException
* @throws Exception
*/
public abstract void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv) throws ParseException, Exception;
public abstract void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv,SqlLoader loader) throws ParseException, Exception;
/**
* 解析JSON数据(抽象方法的设置)
......@@ -126,6 +131,6 @@ public abstract class SourceCommonBase {
* @throws ParseException
* @throws Exception
*/
public abstract void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv) throws ParseException, Exception;
public abstract void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv,SqlLoader loader) throws ParseException, Exception;
}
package com.flink.util;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.Yaml;
/**
* @author wjs
* @version 创建时间:2025-8-26 14:55:56
* 类说明
*/
public class SqlLoader {
private final Map<String, String> sqlMap = new HashMap<>();
/**
* 加载并解析YAML配置文件
* @param configPath YAML配置文件的类路径(如:"sql-config.yaml")
*/
public void load(String configPath) {
Yaml yaml = new Yaml();
try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream(configPath)) {
if (inputStream == null) {
throw new RuntimeException("配置文件未找到: " + configPath);
}
// 解析YAML为Map结构
Map<String, Object> yamlMap = yaml.load(inputStream);
// 提取SQL配置部分
if (yamlMap.containsKey("queries")) {
Map<String, String> queries = (Map<String, String>) yamlMap.get("queries");
if (queries != null) {
sqlMap.putAll(queries);
}
}
} catch (Exception e) {
throw new RuntimeException("YAML解析失败", e);
}
}
/**
* 获取带参数替换的SQL
* @param key SQL配置的唯一标识符
* @param params 参数键值对(如{"date":"2025-08-26"})
* @return 参数替换后的SQL字符串
*/
public String getSql(String key, Map<String, String> params) {
String sql = sqlMap.get(key);
if (sql == null) return null;
for (Map.Entry<String, String> entry : params.entrySet()) {
String placeholder = "${" + entry.getKey() + "}";
sql = sql.replace(placeholder, entry.getValue());
}
return sql;
}
}
# SQL配置中心
queries:
collect_log_view:
SELECT * FROM collect_log_view where dt='${date}'
event_log_view:
SELECT * FROM event_log_view
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment