Commit 14b52cf9 by 魏建枢

转化kafka为临时表

parent de18ac73
......@@ -63,7 +63,7 @@ public class KafkaBaseSchema {
.column("other_info", STRING())
.column("unique_id", STRING())
.column("device_id", STRING())
.column("create_time", TIMESTAMP(3)) // 事件时间字段
.column("create_time", TIMESTAMP(3))
.column("send_type", STRING())
.column("zone_name", STRING())
.column("env_info", STRING())
......@@ -93,7 +93,7 @@ public class KafkaBaseSchema {
.column("create_time", TIMESTAMP(3))
.column("description", STRING())
.column("request_method", STRING())
.column("title", STRING()) // 事件时间字段
.column("title", STRING())
.column("operate_url", STRING())
.column("operation_log_type", STRING())
.column("operate_name", STRING())
......@@ -110,4 +110,82 @@ public class KafkaBaseSchema {
.build();
createTableDescriptor(tableEnv, schema, topic, groupId, tableName);
}
public static void simiUserSchema(StreamTableEnvironment tableEnv, String topic, String groupId, String tableName) {
    // Assemble the schema with explicit builder statements; column order is
    // significant and mirrors the Kafka payload layout.
    Schema.Builder builder = Schema.newBuilder();
    builder.column("nick", STRING());
    builder.column("country_code", STRING());
    builder.column("user_head_url", STRING());
    builder.column("create_time", TIMESTAMP(3));
    builder.column("phone_number", STRING());
    builder.column("register_time", TIMESTAMP(3));
    builder.column("flume_type", STRING());
    builder.column("cid", STRING());
    builder.column("user_state", STRING());
    // Virtual Kafka metadata column exposing the record's source topic.
    builder.columnByMetadata("topic", STRING(), "topic", true);
    // Watermark strategy: tolerate events arriving up to 5 seconds late.
    builder.watermark("create_time", "create_time - INTERVAL '5' SECOND");
    Schema schema = builder.build();
    createTableDescriptor(tableEnv, schema, topic, groupId, tableName);
}
public static void abroadSimiUserSchema(StreamTableEnvironment tableEnv, String topic, String groupId,String tableName) {
    // The abroad user topic carries exactly the same payload layout as the
    // domestic one; the previous implementation was a byte-for-byte copy of
    // simiUserSchema that had to be kept in sync by hand. Delegate instead so
    // there is a single source of truth for the column list and watermark.
    simiUserSchema(tableEnv, topic, groupId, tableName);
}
public static void pcEventLogSchema(StreamTableEnvironment tableEnv, String topic, String groupId, String tableName) {
    // Build the PC event-log schema step by step; the column order mirrors
    // the Kafka message layout and must not be changed.
    Schema.Builder builder = Schema.newBuilder();
    builder.column("app_type", STRING());
    builder.column("app_version", STRING());
    builder.column("create_time", TIMESTAMP(3));
    builder.column("send_type", STRING());
    builder.column("zone_name", STRING());
    builder.column("zone_code", STRING());
    builder.column("flume_type", STRING());
    builder.column("app_channel", STRING());
    builder.column("app_key", STRING());
    builder.column("send_time", TIMESTAMP(3));
    builder.column("event_info", STRING());
    builder.column("zone_type", STRING());
    builder.column("id", STRING());
    builder.column("user_agent", STRING());
    // Virtual Kafka metadata column exposing the record's source topic.
    builder.columnByMetadata("topic", STRING(), "topic", true);
    // Watermark strategy: tolerate events arriving up to 5 seconds late.
    builder.watermark("create_time", "create_time - INTERVAL '5' SECOND");
    Schema schema = builder.build();
    createTableDescriptor(tableEnv, schema, topic, groupId, tableName);
}
public static void pcCollectLogSchema(StreamTableEnvironment tableEnv, String topic, String groupId,String tableName) {
    // Identical to pcEventLogSchema except that the raw payload column is
    // device_info rather than event_info. Column order follows the message.
    Schema.Builder builder = Schema.newBuilder();
    builder.column("app_type", STRING());
    builder.column("app_version", STRING());
    builder.column("create_time", TIMESTAMP(3));
    builder.column("send_type", STRING());
    builder.column("zone_name", STRING());
    builder.column("zone_code", STRING());
    builder.column("flume_type", STRING());
    builder.column("app_channel", STRING());
    builder.column("app_key", STRING());
    builder.column("send_time", TIMESTAMP(3));
    builder.column("device_info", STRING());
    builder.column("zone_type", STRING());
    builder.column("id", STRING());
    builder.column("user_agent", STRING());
    // Virtual Kafka metadata column exposing the record's source topic.
    builder.columnByMetadata("topic", STRING(), "topic", true);
    // Watermark strategy: tolerate events arriving up to 5 seconds late.
    builder.watermark("create_time", "create_time - INTERVAL '5' SECOND");
    Schema schema = builder.build();
    createTableDescriptor(tableEnv, schema, topic, groupId, tableName);
}
}
......@@ -63,15 +63,17 @@ public class BaseKafkaToSqlCreateTable {
}else if(StringUtils.equals(kafkaTopic.getTopic(), TopicTypeEnum.ODS_USER_INVITATION.getTopic())) {
}else if(StringUtils.equals(kafkaTopic.getTopic(), TopicTypeEnum.SIMI_USER_LIST_TOPIC.getTopic())) {
KafkaBaseSchema.simiUserSchema(tableEnv,topic,groupId,"kafka_simi_user");
}else if(StringUtils.equals(kafkaTopic.getTopic(), TopicTypeEnum.ABROAD_SIMI_USER_LIST_TOPIC.getTopic())) {
KafkaBaseSchema.abroadSimiUserSchema(tableEnv,topic,groupId,"kafka_abroad_simi_user");
}else if(StringUtils.equals(kafkaTopic.getTopic(), TopicTypeEnum.OPEN_SIMI_API.getTopic())) {
}else if(StringUtils.equals(kafkaTopic.getTopic(), TopicTypeEnum.ODS_PC_EVENT_LOG.getTopic())) {
KafkaBaseSchema.pcEventLogSchema(tableEnv,topic,groupId,"kafka_pc_event_log");
PcEventLogTable.buildPcEventLogQuery(tableEnv);
}else if(StringUtils.equals(kafkaTopic.getTopic(), TopicTypeEnum.ODS_PC_COLLECT_LOG.getTopic())) {
KafkaBaseSchema.pcCollectLogSchema(tableEnv,topic,groupId,"kafka_pc_collect_log");
PcCollectLogTable.buildPcCollectLogQuery(tableEnv);
}else if(StringUtils.equals(kafkaTopic.getTopic(), TopicTypeEnum.ODS_COMMUNITY_HISTORY.getTopic())) {
}else if(StringUtils.equals(kafkaTopic.getTopic(), TopicTypeEnum.ODS_SYS_LOG.getTopic())) {
......
......@@ -81,7 +81,7 @@ public class CollectLogTable {
"FROM kafka_collect_log AS k " +
"LEFT JOIN LATERAL TABLE(ParseCollectList(app_type,app_key,other_info,env_info,device_info,user_properties)) AS t(device_id_v1, user_info) ON TRUE"
);
Table result = tableEnv.sqlQuery("SELECT * FROM collect_log_view");
result.execute().print();
// Table result = tableEnv.sqlQuery("SELECT * FROM collect_log_view");
// result.execute().print();
}
}
package com.flink.achieve.table.sql;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import com.flink.achieve.table.udf.ParsePcDeviceInfoUDTF;
/**
* @author wjs
* @version 创建时间:2025-8-25 15:02:42
* 类说明
*/
public class PcCollectLogTable {
    /**
     * Registers the {@code ParsePcDeviceInfo} UDTF and creates the
     * {@code pc_collect_log_view} view over the {@code kafka_pc_collect_log}
     * temporary table.
     *
     * <p>The view carries the raw Kafka columns through and flattens the
     * {@code device_info} JSON payload (parsed by the UDTF into a
     * {@code MAP<STRING,STRING>}) into named columns such as cid, phone,
     * brand and device_id. {@code LEFT JOIN LATERAL ... ON TRUE} keeps the
     * Kafka row (with NULL map lookups) even when the UDTF emits no row.
     *
     * @param tableEnv the table environment in which the function and view
     *                 are registered; {@code kafka_pc_collect_log} must
     *                 already exist in it
     */
    public static void buildPcCollectLogQuery(StreamTableEnvironment tableEnv) {
        // Register the UDTF that explodes the device_info JSON into a map.
        tableEnv.createTemporarySystemFunction("ParsePcDeviceInfo", ParsePcDeviceInfoUDTF.class);
        // Create the view; downstream jobs query pc_collect_log_view.
        // (Removed the commented-out SELECT * debug print that was left here.)
        tableEnv.executeSql(
                "CREATE VIEW pc_collect_log_view AS " +
                "SELECT " +
                "  k.id, " +
                "  k.user_agent, " +
                "  k.zone_type, " +
                "  k.send_time, " +
                "  k.app_key, " +
                "  k.app_channel, " +
                "  k.zone_code, " +
                "  k.zone_name, " +
                "  k.send_type, " +
                "  k.create_time, " +
                "  k.app_version, " +
                "  k.app_type, " +
                "  t.pc_collect_info['cid'] AS cid, " +
                "  t.pc_collect_info['phone'] AS phone, " +
                "  t.pc_collect_info['nick'] AS nick, " +
                "  t.pc_collect_info['b2'] AS brand, " +
                "  t.pc_collect_info['b3'] AS `model`, " +
                "  t.pc_collect_info['i7'] AS serial_num, " +
                "  t.pc_collect_info['i8'] AS device_id " +
                "FROM kafka_pc_collect_log AS k " +
                "LEFT JOIN LATERAL TABLE(ParsePcDeviceInfo(device_info)) AS t(pc_collect_info) ON TRUE"
        );
    }
}
package com.flink.achieve.table.sql;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import com.flink.achieve.table.udf.ParsePcEventInfoUDTF;
/**
* @author wjs
* @version 创建时间:2025-8-25 15:02:28
* 类说明
*/
public class PcEventLogTable {
    /**
     * Registers the {@code ParsePcEventInfo} UDTF and creates the
     * {@code pc_event_log_view} view over the {@code kafka_pc_event_log}
     * temporary table.
     *
     * <p>The view carries the raw Kafka columns through and flattens the
     * {@code event_info} JSON payload (parsed by the UDTF into an
     * {@code r9} event-time string plus a {@code MAP<STRING,STRING>}) into
     * named columns such as cid, device_id, event_time, event_type and
     * event_value. {@code LEFT JOIN LATERAL ... ON TRUE} keeps the Kafka row
     * (with NULL lookups) even when the UDTF emits no row.
     *
     * @param tableEnv the table environment in which the function and view
     *                 are registered; {@code kafka_pc_event_log} must
     *                 already exist in it
     */
    public static void buildPcEventLogQuery(StreamTableEnvironment tableEnv) {
        // Register the UDTF that explodes the event_info JSON into rows.
        tableEnv.createTemporarySystemFunction("ParsePcEventInfo", ParsePcEventInfoUDTF.class);
        // Create the view; downstream jobs query pc_event_log_view.
        // (Removed the commented-out SELECT * debug print that was left here.)
        tableEnv.executeSql(
                "CREATE VIEW pc_event_log_view AS " +
                "SELECT " +
                "  k.id, " +
                "  k.user_agent, " +
                "  k.zone_type, " +
                "  k.send_time, " +
                "  k.app_key, " +
                "  k.app_channel, " +
                "  k.zone_code, " +
                "  k.zone_name, " +
                "  k.send_type, " +
                "  k.create_time, " +
                "  k.app_version, " +
                "  k.app_type, " +
                "  t.pc_event_info['cid'] AS cid, " +
                "  t.pc_event_info['phone'] AS phone, " +
                "  t.pc_event_info['nick'] AS nick, " +
                "  t.pc_event_info['s1'] AS route_ip, " +
                "  t.pc_event_info['i8'] AS device_id, " +
                "  t.pc_event_info['i7'] AS serial_num, " +
                "  t.r9 AS event_time, " +
                "  t.pc_event_info['r4'] AS event_type, " +
                "  t.pc_event_info['r3'] AS event_value " +
                "FROM kafka_pc_event_log AS k " +
                "LEFT JOIN LATERAL TABLE(ParsePcEventInfo(event_info)) AS t(r9,pc_event_info) ON TRUE"
        );
    }
}
package com.flink.achieve.table.udf;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.vo.PcDeviceInfo;
/**
* @author wjs
* @version 创建时间:2025-8-25 15:05:29
* 类说明
*/
@FunctionHint(output = @DataTypeHint("ROW<pc_collect_info MAP<STRING,STRING>>"))
public class ParsePcDeviceInfoUDTF extends TableFunction<Row>{
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(ParsePcDeviceInfoUDTF.class);

    /**
     * Parses the raw {@code device_info} JSON payload and emits a single row
     * holding a map of selected device/user fields (cid, phone, nick, brand
     * b2, model b3, serial i7, device id i8).
     *
     * <p>Emits nothing — dropping the lateral-join side — when the input is
     * empty or does not deserialize into a {@link PcDeviceInfo}.
     *
     * @param device_info JSON string matching the {@link PcDeviceInfo} shape
     */
    public void eval(String device_info) {
        if (StringUtils.isEmpty(device_info)) {
            return;
        }
        PcDeviceInfo pcDeviceInfo = JSONObject.parseObject(device_info, new TypeReference<PcDeviceInfo>() {
        });
        if (null == pcDeviceInfo) {
            return;
        }
        // NOTE(review): the @FunctionHint declares MAP<STRING,STRING>, so the
        // getters are presumably String-valued — confirm against PcDeviceInfo.
        Map<String, Object> collectParams = new HashMap<>();
        collectParams.put("cid", pcDeviceInfo.getCid());
        collectParams.put("phone", pcDeviceInfo.getPhone());
        collectParams.put("nick", pcDeviceInfo.getNick());
        collectParams.put("b2", pcDeviceInfo.getB2());
        collectParams.put("b3", pcDeviceInfo.getB3());
        collectParams.put("i7", pcDeviceInfo.getI7());
        collectParams.put("i8", pcDeviceInfo.getI8());
        // Removed the former `if (null != collectParams)` guard: the map is
        // constructed unconditionally just above, so the check was always true.
        collect(Row.of(collectParams));
    }
}
package com.flink.achieve.table.udf;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.PcEventInfo;
import com.flink.vo.PcProperties;
/**
* @author wjs
* @version 创建时间:2025-8-25 15:44:49
* 类说明
*/
@FunctionHint(output = @DataTypeHint("ROW<r9 STRING,pc_event_info MAP<STRING,STRING>>"))
public class ParsePcEventInfoUDTF extends TableFunction<Row>{
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(ParsePcEventInfoUDTF.class);

    /**
     * Parses the raw {@code event_info} JSON payload and emits one row per
     * entry in its {@code properties} list: the r9 timestamp converted to a
     * string via {@link TimeConvertUtil#parseToStringSSS}, plus a map of
     * user fields (cid, phone, nick, s1, i7, i8) and per-event fields
     * (r4, r3).
     *
     * <p>Emits nothing — dropping the lateral-join side — when the input is
     * empty, does not deserialize into a {@link PcEventInfo}, or has no
     * properties.
     *
     * @param event_info JSON string matching the {@link PcEventInfo} shape
     */
    public void eval(String event_info) {
        if (StringUtils.isEmpty(event_info)) {
            return;
        }
        PcEventInfo pcEventInfo = JSONObject.parseObject(event_info, new TypeReference<PcEventInfo>() {});
        if (null == pcEventInfo) {
            return;
        }
        List<PcProperties> properties = pcEventInfo.getProperties();
        if (CollectionUtils.isEmpty(properties)) {
            return;
        }
        for (PcProperties pcProperties : properties) {
            // Removed the former `if (null != eventParams)` guard: the map is
            // constructed unconditionally here, so the check was always true.
            Map<String, Object> eventParams = new HashMap<>();
            String eventTime = TimeConvertUtil.parseToStringSSS(pcProperties.getR9());
            // User-level fields repeated on every emitted event row.
            eventParams.put("cid", pcEventInfo.getCid());
            eventParams.put("phone", pcEventInfo.getPhone());
            eventParams.put("nick", pcEventInfo.getNick());
            eventParams.put("s1", pcEventInfo.getS1());
            eventParams.put("i8", pcEventInfo.getI8());
            eventParams.put("i7", pcEventInfo.getI7());
            // Per-event fields from the current properties entry.
            eventParams.put("r4", pcProperties.getR4());
            eventParams.put("r3", pcProperties.getR3());
            collect(Row.of(
                    eventTime,
                    eventParams
            ));
        }
    }
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment