Commit 067dc8d7 by 魏建枢

代码提交,整合flink sql

parent 667930aa
Showing with 439 additions and 106 deletions
......@@ -14,7 +14,6 @@ import org.slf4j.LoggerFactory;
import com.flink.common.SourceCommonBase;
import com.flink.enums.TopicTypeEnum;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
/**
* @author wjs
......@@ -74,7 +73,7 @@ public class CommonConsumeBaseAchi extends SourceCommonBase implements Serializa
}
@Override
public void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -45,7 +45,6 @@ import com.flink.vo.CollectLog;
import com.flink.vo.DeviceId;
import com.flink.vo.DeviceIdInfo;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
import com.flink.vo.PcCollectLog;
import com.flink.vo.PcDeviceInfo;
import com.flink.vo.Result;
......@@ -514,7 +513,7 @@ public class DeviceIdLatestAchi extends SourceCommonBase implements Serializable
}
@Override
public void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -29,7 +29,6 @@ import com.flink.enums.TopicTypeEnum;
import com.flink.util.CompareUtils;
import com.flink.util.ip2region.SearcherUtil;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
import com.flink.vo.OdsEventLog;
import com.flink.vo.UserProperties;
......@@ -184,7 +183,7 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable
}
@Override
public void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -44,7 +44,6 @@ import com.flink.util.TimeConvertUtil;
import com.flink.util.ip2region.SearcherUtil;
import com.flink.vo.EventIp;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
import com.flink.vo.OdsEventLog;
import com.flink.vo.PcEventInfo;
import com.flink.vo.PcOdsEventLog;
......@@ -434,7 +433,7 @@ public class EventIpLatestAchi extends SourceCommonBase implements Serializable{
}
@Override
public void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -35,7 +35,6 @@ import com.flink.util.TimeConvertUtil;
import com.flink.vo.DeviceId;
import com.flink.vo.DeviceRegistrationResult;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
import com.flink.vo.SimiUserInfo;
import com.flink.vo.simi.UserRegistrationReqDto;
......@@ -220,7 +219,7 @@ public class RegistrationCheckAchi extends SourceCommonBase implements Serializa
}
@Override
public void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -56,7 +56,6 @@ import com.flink.processor.impl.OkHttpService;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.EventIp;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
import com.flink.vo.SimiUserInfo;
import com.flink.vo.simi.FriendsStream;
import com.flink.vo.simi.InitiateFriendRequestReqDto;
......@@ -791,7 +790,7 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
}
@Override
public void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -30,7 +30,6 @@ import com.flink.enums.OpenSimiApiTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.impl.OkHttpService;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
import com.flink.vo.simi.InitiateFriendRequestReqDto;
import com.flink.vo.simi.SimiGroups;
......@@ -128,7 +127,7 @@ public class SimiGroupstAchi extends SourceCommonBase implements Serializable{
}
@Override
public void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -59,7 +59,6 @@ import com.flink.util.CompareUtils;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.EventList;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
import com.flink.vo.OdsEventLog;
import com.flink.vo.PcEventInfo;
import com.flink.vo.PcOdsEventLog;
......@@ -740,7 +739,7 @@ public class UserDailyActivityAchi extends SourceCommonBase implements Serializa
}
@Override
public void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -52,7 +52,6 @@ import com.flink.vo.CollectLogToJsonSource;
import com.flink.vo.EventList;
import com.flink.vo.EventLogToJsonSource;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
import com.flink.vo.OdsEventLog;
import com.flink.vo.UserProperties;
import com.flink.vo.android.deviceInfo.AndroidA1;
......@@ -847,7 +846,7 @@ public class VectorAngleCalculationAchi extends SourceCommonBase implements Seri
}
@Override
public void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -12,9 +12,7 @@ import org.slf4j.LoggerFactory;
import com.flink.common.SourceCommonBase;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.impl.ParseEventListUDTF;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
/**
* @author wjs
* @version 创建时间:2025-8-18 15:04:49
......@@ -48,79 +46,12 @@ public class RegistrationCheckAchi extends SourceCommonBase implements Serializa
}
@Override
public void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv) throws Exception {
// 1. 时区与配置优化
tableEnv.getConfig()
.set("table.local-time-zone", "Asia/Shanghai")
.set("parallelism.default", "4");
public void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv) throws Exception {
tableEnv.createTemporarySystemFunction("ParseEventList", ParseEventListUDTF.class);
// 2. 正确定义 Kafka 源表结构
tableEnv.executeSql(
"CREATE TABLE kafka_event_log (" +
" `app_type` STRING, " +
" `unique_id` STRING, " +
" `create_time` TIMESTAMP(3), " +
" `device_id` STRING, " +
" `user_properties` STRING, " +
" `event_list` STRING," +
" `zone_name` STRING, " +
" `strategy_group_id` STRING, " +
" `flume_type` STRING, " +
" `zone_code` STRING, " +
" `app_channel` STRING, " +
" `uid` STRING, " +
" `send_time` TIMESTAMP(3), " +
" `app_key` STRING, " +
" `strategy_version` STRING, " +
" `sdk_version` STRING, " +
" `zone_type` STRING, " +
" `id` STRING, " +
" `user_agent` STRING, " +
" `topic` STRING METADATA FROM 'topic' VIRTUAL, " +
" WATERMARK FOR create_time AS create_time - INTERVAL '5' SECOND" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'ods_event_log'," +
" 'properties.bootstrap.servers' = '168.138.185.142:9092,213.35.103.223:9092,129.150.49.247:9092'," +
" 'properties.group.id' = 'odsEventLog'," +
" 'scan.startup.mode' = 'latest-offset'," +
" 'format' = 'json'," +
" 'json.ignore-parse-errors' = 'true'" +
")"
);
Table result = tableEnv.sqlQuery(
"SELECT " +
" k.app_type, " +
" k.unique_id, " +
" k.create_time, " +
" k.device_id, " +
" k.zone_name, " +
" k.strategy_group_id, " +
" k.flume_type, " +
" k.zone_code, " +
" k.app_channel, " +
" k.uid, " +
" k.send_time, " +
" k.app_key, " +
" k.strategy_version, " +
" k.sdk_version, " +
" k.zone_type, " +
" k.id, " +
" k.user_agent, " +
" t.r7 AS r7, " +
" t.r8_r2 AS r2, " +
" t.r8_r3 AS r3, " +
" t.r8_s2['cid'] AS cid, " + // 直接访问Map
" t.r8_s2['phone'] AS phone, " +
" t.r8_s2['nick'] AS nick, " +
" t.r9 AS r9 " +
"FROM kafka_event_log AS k " +
"LEFT JOIN LATERAL TABLE(ParseEventList(event_list,user_properties)) AS t(r7, r8_r2, r8_r3, r8_r6, r8_s2, r9) ON TRUE"
);
Table result = tableEnv.sqlQuery("SELECT * FROM collect_log_view");
result.execute().print();
// Table result = tableEnv.sqlQuery("SELECT * FROM event_log_view");
// result.execute().print();
}
}
package com.flink.achieve.table.schema;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import com.flink.util.LoadPropertiesFile;
/**
* @author wjs
* @version 创建时间:2025-8-19 16:55:12
* 类说明 定义 Kafka源表结构(包含所有字段和水印定义)
*/
public class KafkaBaseSchema {
    //----------------------------------------------------- Kafka source-table schemas (columns + watermark) --------------------------------------------------------------------------

    /**
     * Registers a Kafka-backed source table for the event-log stream.
     * Columns mirror the ods_event_log JSON payload; column order defines the table schema.
     *
     * @param tableEnv  table environment the temporary table is registered in
     * @param topic     Kafka topic to read from
     * @param groupId   Kafka consumer group id
     * @param tableName name the temporary table is registered under
     */
    public static void eventLogSchema(StreamTableEnvironment tableEnv,String topic,String groupId,String tableName) {
        Schema schema = Schema.newBuilder()
            .column("app_type", STRING())
            .column("unique_id", STRING())
            .column("create_time", TIMESTAMP(3))
            .column("device_id", STRING())
            .column("user_properties", STRING())
            .column("event_list", STRING())
            .column("zone_name", STRING())
            .column("strategy_group_id", STRING())
            .column("flume_type", STRING())
            .column("zone_code", STRING())
            .column("app_channel", STRING())
            .column("uid", STRING())
            .column("send_time", TIMESTAMP(3))
            .column("app_key", STRING())
            .column("strategy_version", STRING())
            .column("sdk_version", STRING())
            .column("zone_type", STRING())
            .column("id", STRING())
            .column("user_agent", STRING())
            .columnByMetadata("topic", STRING(), "topic", true) // Kafka metadata column (source topic), virtual
            .watermark("create_time", "create_time - INTERVAL '5' SECOND") // event-time watermark: 5s out-of-orderness
            .build();
        createTableDescriptor(tableEnv, schema, topic, groupId, tableName);
    }

    /**
     * Registers a Kafka-backed source table for the collect-log stream.
     * Columns mirror the ods_new_collect_log JSON payload; column order defines the table schema.
     *
     * @param tableEnv  table environment the temporary table is registered in
     * @param topic     Kafka topic to read from
     * @param groupId   Kafka consumer group id
     * @param tableName name the temporary table is registered under
     */
    public static void collectLogSchema(StreamTableEnvironment tableEnv,String topic,String groupId,String tableName) {
        Schema schema = Schema.newBuilder()
            .column("app_type", STRING())
            .column("other_info", STRING())
            .column("unique_id", STRING())
            .column("device_id", STRING())
            .column("create_time", TIMESTAMP(3)) // event-time column
            .column("send_type", STRING())
            .column("zone_name", STRING())
            .column("env_info", STRING())
            .column("strategy_group_id", STRING())
            .column("flume_type", STRING())
            .column("zone_code", STRING())
            .column("uid", STRING())
            .column("app_channel", STRING())
            .column("strategy_version", STRING())
            .column("send_time", TIMESTAMP(3))
            .column("app_key", STRING())
            .column("device_info", STRING())
            .column("sdk_version", STRING())
            .column("zone_type", STRING())
            .column("id", STRING())
            .column("user_agent", STRING())
            .column("user_properties", STRING())
            .columnByMetadata("topic", STRING(), "topic", true) // Kafka metadata column (source topic), virtual
            .watermark("create_time", "create_time - INTERVAL '5' SECOND") // event-time watermark: 5s out-of-orderness
            .build();
        createTableDescriptor(tableEnv, schema, topic, groupId, tableName);
    }

    //----------------------------------------------------- Build the TableDescriptor and register the table --------------------------------------------------------------------------

    /**
     * Builds a Kafka connector descriptor for the given schema and registers it
     * as a temporary table. Bootstrap servers come from the properties file
     * (key "kafka.bootstrapServers"); consumption starts at the latest offset
     * and malformed JSON records are silently skipped.
     *
     * @param tableEnv  table environment the temporary table is registered in
     * @param schema    column/watermark definition built by the callers above
     * @param topic     Kafka topic to read from
     * @param groupId   Kafka consumer group id
     * @param tableName name the temporary table is registered under
     */
    public static void createTableDescriptor(StreamTableEnvironment tableEnv,Schema schema,String topic,String groupId,String tableName) {
        TableDescriptor descriptor = TableDescriptor.forConnector("kafka")
            .schema(schema)
            .option("topic", topic)
            .option("properties.bootstrap.servers",LoadPropertiesFile.getPropertyFileValues("kafka.bootstrapServers"))
            .option("properties.group.id", groupId)
            .option("scan.startup.mode", "latest-offset")
            .option("format", "json")
            .option("json.ignore-parse-errors", "true") // drop unparsable records instead of failing the job
            .build();
        tableEnv.createTemporaryTable(tableName, descriptor);
    }
}
package com.flink.achieve.table.sql;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.calcite.shaded.org.apache.commons.codec.binary.StringUtils;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import com.flink.achieve.table.schema.KafkaBaseSchema;
import com.flink.enums.TopicTypeEnum;
import com.flink.vo.KafkaTopic;
/**
* @author wjs
* @version 创建时间:2025-8-19 13:38:02
* 类说明 创建 创建kafka转sqlTable表
*/
public class BaseKafkaToSqlCreateTable {

    /**
     * Registers a SQL source table (and its derived view) for each configured Kafka topic.
     * <p>
     * Applies session-wide table config (local time zone, default parallelism), then maps
     * each known topic to its schema in {@link KafkaBaseSchema} and its view builder.
     * Topics without a mapping yet are ignored, which matches the previous behavior of
     * the empty {@code else if} placeholder branches.
     *
     * @param kafkaTopicList topics to register; a null or empty list is a no-op
     * @param tableEnv       target table environment
     */
    public static void kafkaToSqlCreateTable(List<KafkaTopic> kafkaTopicList, StreamTableEnvironment tableEnv) {
        // 1. Time zone and parallelism defaults for the table environment.
        tableEnv.getConfig().set("table.local-time-zone", "Asia/Shanghai").set("parallelism.default", "4");
        // 2. Define one Kafka source table per configured topic.
        if (CollectionUtils.isNotEmpty(kafkaTopicList)) {
            for (KafkaTopic kafkaTopic : kafkaTopicList) {
                String topic = kafkaTopic.getTopic();
                String groupId = kafkaTopic.getGroup();
                // NOTE(review): StringUtils comes from Flink's shaded Calcite package; prefer
                // java.util.Objects.equals or commons-lang to avoid depending on shaded internals.
                if (StringUtils.equals(topic, TopicTypeEnum.ODS_EVENT_LOG.getTopic())) {
                    KafkaBaseSchema.eventLogSchema(tableEnv, topic, groupId, "kafka_event_log");
                    EventLogTable.buildEventQuery(tableEnv);
                } else if (StringUtils.equals(topic, TopicTypeEnum.ODS_NEW_COLLECT_LOG.getTopic())) {
                    KafkaBaseSchema.collectLogSchema(tableEnv, topic, groupId, "kafka_collect_log");
                    CollectLogTable.buildCollectQuery(tableEnv);
                }
                // TODO: map the remaining TopicTypeEnum topics (ODS_ZIPPER_STRATEGY,
                // ODS_DEVICE_ID_NODE, ODS_OPEN_API, ODS_USER_*, SIMI_*, ODS_PC_*, ...)
                // to their schemas/views once they are implemented. The former chain of
                // empty else-if branches performed no work, so dropping it is behavior-neutral.
            }
        }
    }
}
package com.flink.achieve.table.sql;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import com.flink.achieve.table.udf.ParseCollectListUDTF;
/**
* @author wjs
* @version 创建时间:2025-8-19 14:07:56
* 类说明
*/
public class CollectLogTable {

    /**
     * Builds the {@code collect_log_view} on top of the {@code kafka_collect_log} source table.
     * <p>
     * The source table itself is created by {@code KafkaBaseSchema.collectLogSchema(...)};
     * the previously commented-out inline DDL duplicating that definition has been removed.
     * <p>
     * Registers the {@code ParseCollectList} UDTF, which derives the v1 device id and a
     * user-info map from the raw payload columns, then creates a view joining each source
     * row with the UDTF output. {@code LEFT JOIN LATERAL ... ON TRUE} keeps source rows
     * even when the UDTF emits nothing for them.
     *
     * @param tableEnv table environment in which the UDTF and view are registered;
     *                 the {@code kafka_collect_log} table must already exist
     */
    public static void buildCollectQuery(StreamTableEnvironment tableEnv) {
        // UDTF producing (device_v1, user_info MAP<STRING,STRING>) per input row.
        tableEnv.createTemporarySystemFunction("ParseCollectList", ParseCollectListUDTF.class);
        // Create the enriched view over the Kafka source table.
        tableEnv.executeSql(
            "CREATE VIEW collect_log_view AS " +
            "SELECT " +
            " k.app_type, " +
            " k.unique_id, " +
            " k.device_id, " +
            " t.device_v1, " +
            " k.create_time, " +
            " k.send_type, " +
            " k.zone_name, " +
            " k.strategy_group_id, " +
            " k.flume_type, " +
            " k.zone_code, " +
            " k.uid, " +
            " k.app_channel, " +
            " k.strategy_version, " +
            " k.send_time, " +
            " k.app_key, " +
            " k.sdk_version, " +
            " k.zone_type, " +
            " k.id, " +
            " k.user_agent, " +
            " t.user_info['cid'] AS cid, " +
            " t.user_info['phone'] AS phone, " +
            " t.user_info['nick'] AS nick " +
            "FROM kafka_collect_log AS k " +
            "LEFT JOIN LATERAL TABLE(ParseCollectList(app_type,app_key,other_info,env_info,device_info,user_properties)) AS t(device_v1, user_info) ON TRUE"
        );
    }
}
package com.flink.achieve.table.sql;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import com.flink.achieve.table.udf.ParseEventListUDTF;
/**
* @author wjs
* @version 创建时间:2025-8-19 13:44:47
* 类说明 事件表
*/
public class EventLogTable {
public static void buildEventQuery(StreamTableEnvironment tableEnv) {
//定义 Kafka源表结构
// tableEnv.executeSql(
// "CREATE TABLE kafka_event_log (" +
// " `app_type` STRING, " +
// " `unique_id` STRING, " +
// " `create_time` TIMESTAMP(3), " +
// " `device_id` STRING, " +
// " `user_properties` STRING, " +
// " `event_list` STRING," +
// " `zone_name` STRING, " +
// " `strategy_group_id` STRING, " +
// " `flume_type` STRING, " +
// " `zone_code` STRING, " +
// " `app_channel` STRING, " +
// " `uid` STRING, " +
// " `send_time` TIMESTAMP(3), " +
// " `app_key` STRING, " +
// " `strategy_version` STRING, " +
// " `sdk_version` STRING, " +
// " `zone_type` STRING, " +
// " `id` STRING, " +
// " `user_agent` STRING, " +
// " `topic` STRING METADATA FROM 'topic' VIRTUAL, " +
// " WATERMARK FOR create_time AS create_time - INTERVAL '5' SECOND" +
// ") WITH (" +
// " 'connector' = 'kafka'," +
// " 'topic' = 'ods_event_log'," +
// " 'properties.bootstrap.servers' = '168.138.185.142:9092,213.35.103.223:9092,129.150.49.247:9092'," +
// " 'properties.group.id' = 'odsEventLog'," +
// " 'scan.startup.mode' = 'latest-offset'," +
// " 'format' = 'json'," +
// " 'json.ignore-parse-errors' = 'true'" +
// ")"
// );
//添加UDF
tableEnv.createTemporarySystemFunction("ParseEventList", ParseEventListUDTF.class);
//查询sql数据 并执行
tableEnv.executeSql(
"CREATE VIEW event_log_view AS " +
"SELECT " +
" k.app_type, " +
" k.unique_id, " +
" k.create_time, " +
" k.device_id, " +
" k.zone_name, " +
" k.strategy_group_id, " +
" k.flume_type, " +
" k.zone_code, " +
" k.app_channel, " +
" k.uid, " +
" k.send_time, " +
" k.app_key, " +
" k.strategy_version, " +
" k.sdk_version, " +
" k.zone_type, " +
" k.id, " +
" k.user_agent, " +
" t.r7 AS r7, " +
" t.r8_r2 AS r2, " +
" t.r8_r3 AS r3, " +
" t.r8_s2['cid'] AS cid, " +
" t.r8_s2['phone'] AS phone, " +
" t.r8_s2['nick'] AS nick, " +
" t.r9 AS r9 " +
"FROM kafka_event_log AS k " +
"LEFT JOIN LATERAL TABLE(ParseEventList(event_list,user_properties)) AS t(r7, r8_r2, r8_r3, r8_r6, r8_s2, r9) ON TRUE"
);
// Table result = tableEnv.sqlQuery("SELECT * FROM event_log_view");
// result.execute().print();
}
}
package com.flink.achieve.table.udf;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.processor.function.GenDeviceIdProcessor;
import com.flink.processor.function.UserPropertiesProcessor;
import com.flink.vo.DeviceIdInfo;
import com.flink.vo.UserProperties;
/**
* @author wjs
* @version 创建时间:2025-8-19 14:30:02
* 类说明
*/
@FunctionHint(output = @DataTypeHint("ROW<device_v1 STRING, user_info MAP<STRING,STRING>>"))
public class ParseCollectListUDTF extends TableFunction<Row>{

    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(ParseCollectListUDTF.class);

    /**
     * Parses one collect-log record into a single output row of
     * {@code (device_v1, user_info MAP<STRING,STRING>)}.
     * <p>
     * Derives the v1 device id from the raw device/env payloads and extracts
     * cid/phone/nick from the user-properties JSON. On any parse failure the
     * error is logged and no row is emitted (the record is dropped), which is
     * the intended best-effort behavior for malformed input.
     *
     * @param appType     application type discriminator
     * @param appKey      application key
     * @param other_info  raw "other_info" payload
     * @param env_info    raw environment payload
     * @param device_info raw device payload
     * @param userInfo    raw user-properties JSON
     */
    public void eval(String appType, String appKey,String other_info,String env_info,String device_info,String userInfo) {
        try {
            DeviceIdInfo deviceIdInfo = GenDeviceIdProcessor.genDeviceId(appType, appKey, other_info, device_info,env_info);
            UserProperties userProps = UserPropertiesProcessor.userPropertiesToJson(userInfo);
            Map<String, String> userParams = new HashMap<>();
            userParams.put("cid", userProps.getCid());
            userParams.put("phone", userProps.getPhone());
            userParams.put("nick", userProps.getNick());
            collect(Row.of(
                deviceIdInfo.getDeviceIdV1(),
                userParams
            ));
        } catch (Exception e) {
            // Pass the Throwable itself so SLF4J logs the full stack trace;
            // the previous e.toString() form discarded it.
            logger.error("ParseCollectListUDTF eval failed", e);
        }
    }
}
package com.flink.common;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
......@@ -35,7 +36,10 @@ public abstract class SourceCommonBase {
logger.info("1. 环境的设置成功");
if(useTableAPI) {
//2.table环境设置
StreamTableEnvironment tableEnv = StreamEnvironmentSettings.createTableEnv(env, jobTypeEnum.getCode(),false);
List<KafkaTopic> kafkaTopicList = Arrays.asList(
new KafkaTopic(topicTypeEnum.getGroup(), topicTypeEnum.getTopic())
);
StreamTableEnvironment tableEnv = StreamEnvironmentSettings.createTableEnv(env, jobTypeEnum.getCode(),kafkaTopicList,false);
logger.info("2. table的环境设置成功");
//3.资源配置文件信息的获取
parseSourceKafkaToSqlTable(topicTypeEnum, tableEnv);
......@@ -63,10 +67,10 @@ public abstract class SourceCommonBase {
logger.info("1. 环境的设置成功");
if(useTableAPI) {
//2.table环境设置
StreamTableEnvironment tableEnv = StreamEnvironmentSettings.createTableEnv(env, jobName.getCode(),false);
StreamTableEnvironment tableEnv = StreamEnvironmentSettings.createTableEnv(env, jobName.getCode(),kafkaTopicList,false);
logger.info("2. table的环境设置成功");
//3.资源配置文件信息的获取
parseMultipleSourceKafkaToSqlTable(kafkaTopicList,tableEnv);
parseMultipleSourceKafkaToSqlTable(tableEnv);
}
if(useStreamAPI){
//2.资源配置文件信息的获取
......@@ -122,6 +126,6 @@ public abstract class SourceCommonBase {
* @throws ParseException
* @throws Exception
*/
public abstract void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv) throws ParseException, Exception;
public abstract void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv) throws ParseException, Exception;
}
......@@ -2,6 +2,7 @@ package com.flink.common;
import java.time.Duration;
import java.time.ZoneId;
import java.util.List;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.CheckpointingOptions;
......@@ -19,7 +20,9 @@ import org.apache.flink.table.api.config.TableConfigOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.achieve.table.sql.BaseKafkaToSqlCreateTable;
import com.flink.util.LoadPropertiesFile;
import com.flink.vo.KafkaTopic;
/**
* @author wjs
......@@ -43,8 +46,6 @@ public class StreamEnvironmentSettings {
// 指定检查点目录(必须是持久化存储路径,如 HDFS)
// config.setString("state.checkpoints.dir", "hdfs://140.245.112.44:8020/user/ck");
// 状态后端设为 RocksDB
config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
// 检查点存储为文件系统
......@@ -62,7 +63,6 @@ public class StreamEnvironmentSettings {
// 非对齐检查点加速Barrier传播
config.set(CheckpointingOptions.ENABLE_UNALIGNED, true);
config.set(CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, Duration.ofSeconds(30));
// 可选:指定保存点目录
// config.setString("state.savepoints.dir", "hdfs://140.245.112.44:8020/user/savepoints");
......@@ -103,7 +103,7 @@ public class StreamEnvironmentSettings {
}
//创建支持嵌套JSON的Table环境
public static StreamTableEnvironment createTableEnv(StreamExecutionEnvironment env, String jobType, boolean enableHiveCatalog) {
public static StreamTableEnvironment createTableEnv(StreamExecutionEnvironment env, String jobType,List<KafkaTopic> kafkaTopicList, boolean enableHiveCatalog) {
logger.info("StreamEnvironmentSettings createTableEnv start jobType:{}",jobType);
// Blink Planner配置
EnvironmentSettings settings = EnvironmentSettings
......@@ -132,6 +132,9 @@ public class StreamEnvironmentSettings {
// );
// tableEnv.useCatalog("hive_catalog");
// }
//获取kafka数据转为table表
BaseKafkaToSqlCreateTable.kafkaToSqlCreateTable(kafkaTopicList, tableEnv);
logger.info("StreamEnvironmentSettings createTableEnv end jobType:{}",jobType);
return tableEnv;
}
......
......@@ -5,14 +5,13 @@ import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-5-27 15:37:52
* 类说明
*/
/**
* @author wjs
* @version 创建时间:2025-5-27 15:37:52 类说明
*/
@Data
@ToString
public class KafkaTopic implements Serializable{
public class KafkaTopic implements Serializable {
/**
*
......@@ -21,4 +20,13 @@ public class KafkaTopic implements Serializable{
private String topic;
private String group;
public KafkaTopic(String topic, String group) {
super();
this.topic = topic;
this.group = group;
}
public KafkaTopic() {
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment