Commit 93042482 by 魏建枢

采集用户行为宽表

parent 27bba8ff
...@@ -59,6 +59,10 @@ public class CommonConsumeBaseAchi extends SourceCommonBase implements Serializa ...@@ -59,6 +59,10 @@ public class CommonConsumeBaseAchi extends SourceCommonBase implements Serializa
DataStreamSource<String> exceptionEventStreamSource = kafkaDataSource.getDataStreamSource(); DataStreamSource<String> exceptionEventStreamSource = kafkaDataSource.getDataStreamSource();
ExceptionEventAchi.exceptionEvent(exceptionEventStreamSource); ExceptionEventAchi.exceptionEvent(exceptionEventStreamSource);
} }
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_COLLECT_USER_BEHAVIOR.getTopic())) {
DataStreamSource<String> collectUserBehaviorStreamSource = kafkaDataSource.getDataStreamSource();
CollectUserBehaviorAchi.collectUserBehavior(collectUserBehaviorStreamSource);
}
} }
}else { }else {
return; return;
......
package com.flink.achieve.table;
import java.io.Serializable;
import java.util.List;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import com.flink.common.SourceCommonBase;
import com.flink.enums.TopicTypeEnum;
import com.flink.util.SqlLoader;
import com.flink.vo.KafkaDataSource;
/**
* @author wjs
* @version 创建时间:2025-9-11 14:34:19
* 类说明
*/
public class InitCreateTempKafkaSqlTable extends SourceCommonBase implements Serializable{
/**
 * SQL-only consume job: the pipeline is driven entirely by
 * {@link #parseMultipleSourceKafkaToSqlTable}, which submits the
 * collect_user_behavior INSERT. Required because Flink serializes this class.
 */
private static final long serialVersionUID = 1L;
/** Unused for this SQL-only job — single-source stream parsing is handled elsewhere. */
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
// intentionally empty
}
/** Unused for this job — only the multi-source SQL variant below is wired in. */
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv,
SqlLoader loader) throws ParseException, Exception {
// intentionally empty
}
/** Unused for this job — the stream-API hook is not needed when useTableAPI is set. */
@Override
public void parseMultipleSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
// intentionally empty
}
/**
 * Loads the "collect_user_behavior" INSERT statement from the SQL yaml and submits it.
 * executeSql on an INSERT launches the streaming job
 * (kafka_collect_user_behavior -> doris_collect_user_behavior).
 */
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv,SqlLoader loader) throws ParseException, Exception {
String insertSql = loader.getSql("collect_user_behavior");
tableEnv.executeSql(insertSql);
}
}
...@@ -65,9 +65,11 @@ public class RegistrationCheckAchi extends SourceCommonBase implements Serializa ...@@ -65,9 +65,11 @@ public class RegistrationCheckAchi extends SourceCommonBase implements Serializa
// String sql = loader.getSql("registrationCheck"); // String sql = loader.getSql("registrationCheck");
// System.out.println("Join SQL:\n" + sql); // System.out.println("Join SQL:\n" + sql);
//执行SQL //执行SQL
Table result = tableEnv.sqlQuery("SELECT * FROM content_interaction_view"); // Table result = tableEnv.sqlQuery("SELECT * FROM kafka_collect_user_behavior");
result.execute().print(); Table kafkaTable = tableEnv.from("kafka_collect_user_behavior");
kafkaTable.execute().print();
} }
} }
package com.flink.achieve.table.schema;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
import java.util.UUID;
import org.apache.doris.flink.sink.writer.LoadConstants;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import com.flink.util.LoadPropertiesFile;
/**
* @author wjs
* @version 创建时间:2025-9-11 15:08:07
* 类说明
*/
public class DorisBaseSchema {
	//----------------------------------------------------- 创建 TableDescriptor -----------------------------------------------------
	/**
	 * Builds a Doris sink {@link TableDescriptor} with the project-wide stream-load
	 * options and registers it as a temporary table.
	 *
	 * @param tableEnv      table environment to register the temporary table in
	 * @param schema        sink table schema
	 * @param tempTableName temporary (catalog) table name, e.g. doris_collect_user_behavior
	 * @param dbTableName   target Doris table identifier, e.g. bi.collect_user_behavior
	 */
	public static void createTableDescriptor(StreamTableEnvironment tableEnv, Schema schema, String tempTableName, String dbTableName) {
		// Fresh label prefix per descriptor so Doris 2PC stream-load labels never collide on job restart.
		String uniqueLabelPrefix = "doris_label_" + UUID.randomUUID().toString();
		TableDescriptor descriptor = TableDescriptor.forConnector("doris") // forConnector already sets the "connector" option
				.schema(schema)
				.option("fenodes", LoadPropertiesFile.getPropertyFileValues("doris.fe"))
				.option("table.identifier", dbTableName)
				.option("username", LoadPropertiesFile.getPropertyFileValues("doris.username"))
				// NOTE(review): password hard-coded empty while username comes from the
				// properties file — confirm, or load "doris.password" the same way.
				.option("password", "")
				.option("sink.properties.format", LoadConstants.JSON)
				.option("sink.properties.read_json_by_line", "true")
				.option("sink.properties.strip_outer_array", "false")
				.option("sink.enable-2pc", "true")
				.option("sink.enable-delete", "true")
				// Fix: was "doris_label_" + uniqueLabelPrefix, which doubled the prefix
				// ("doris_label_doris_label_<uuid>"); the variable already carries it.
				.option("sink.label-prefix", uniqueLabelPrefix)
				// 性能调优相关参数,可根据实际情况调整
				.option("sink.buffer-flush.max-rows", "10000") // connector requires >= 10000
				.option("sink.buffer-flush.interval", "10s")
				.option("sink.buffer-size", "4194304") // 4 MB
				.option("sink.buffer-count", "3")
				.option("sink.enable.batch-mode", "true") // keep enabled for throughput
				.option("sink.max-retries", "3")
				.build();
		tableEnv.createTemporaryTable(tempTableName, descriptor);
	}

	//----------------------------------------------------- 定义 Doris 源表结构并创建 Schema -----------------------------------------------------
	/**
	 * Declares the collect_user_behavior sink schema (mirrors the Kafka source
	 * schema in KafkaBaseSchema so that "INSERT ... SELECT *" maps 1:1) and
	 * registers it via {@link #createTableDescriptor}.
	 *
	 * @param tableEnv      table environment
	 * @param tempTableName temporary table name
	 * @param dbTableName   Doris table identifier (db.table)
	 */
	public static void collectUserBehaviorSchema(StreamTableEnvironment tableEnv, String tempTableName, String dbTableName) {
		Schema schema = Schema.newBuilder()
				.column("id", STRING())
				.column("device_id", STRING())
				.column("app_type", STRING())
				.column("app_key", STRING())
				.column("cid", STRING())
				.column("phone", STRING())
				.column("event_type", STRING())
				.column("event_time", TIMESTAMP(3))
				.column("view_type", STRING())
				.column("view_id", STRING())
				.column("uid", STRING())
				.column("nick", STRING())
				.column("unique_id", STRING())
				.column("create_time", TIMESTAMP(3))
				.column("strategy_group_id", STRING())
				.column("strategy_version", STRING())
				.column("send_time", TIMESTAMP(3))
				.column("app_channel", STRING())
				.column("zone_code", STRING())
				.column("zone_name", STRING())
				.column("zone_type", STRING())
				.column("sdk_version", STRING())
				.column("user_agent", STRING())
				.column("user_properties", STRING())
				.column("route_ip", STRING())
				.column("ip_name", STRING())
				.column("area_name", STRING())
				.column("device_id_v1", STRING())
				.column("other_info", STRING())
				.column("device_info", STRING())
				.column("env_info", STRING())
				.column("network_ip", STRING())
				.column("network_area_name", STRING())
				.column("network_model", STRING())
				.column("phone_name", STRING())
				.column("device_name", STRING())
				.column("brand", STRING())
				.column("device_model", STRING())
				.column("os_release", STRING())
				.column("app_version", STRING())
				.column("platform", STRING())
				.column("third_id", STRING())
				.column("country_code", STRING())
				.column("register_time", TIMESTAMP(3))
				.column("user_state", STRING())
				.column("user_head_url", STRING())
				.column("content", STRING())
				.column("screen_name", STRING())
				.column("touch_pressure", STRING())
				.column("draw_point", STRING())
				// NOTE(review): "topic" is a Kafka metadata key; the Doris connector does
				// not expose Kafka metadata, so the column is removed from this sink
				// schema (the Kafka source schema also keeps it commented out).
				// .columnByMetadata("topic", STRING(), "topic", true)
				.watermark("create_time", "create_time - INTERVAL '5' SECOND") // kept for parity with the source schema
				.build();
		createTableDescriptor(tableEnv, schema, tempTableName, dbTableName);
	}
}
...@@ -91,30 +91,6 @@ public class KafkaBaseSchema { ...@@ -91,30 +91,6 @@ public class KafkaBaseSchema {
createTableDescriptor(tableEnv, schema, topic, groupId, tableName); createTableDescriptor(tableEnv, schema, topic, groupId, tableName);
} }
/**
 * Declares the kafka_sys_log source schema (operation-log fields plus the
 * Kafka "topic" metadata column) and registers it via createTableDescriptor.
 * NOTE(review): ODS_SYS_LOG is marked 弃用 (deprecated) in TopicTypeEnum and
 * this schema is no longer wired up — confirm before reusing.
 */
public static void sysLogSchema(StreamTableEnvironment tableEnv, String topic, String groupId, String tableName) {
Schema schema = Schema.newBuilder()
.column("method", STRING())
.column("create_time", TIMESTAMP(3))
.column("description", STRING())
.column("request_method", STRING())
.column("title", STRING())
.column("operate_url", STRING())
.column("operation_log_type", STRING())
.column("operate_name", STRING())
.column("request_body", STRING())
.column("success", STRING())
.column("operate_address", STRING())
.column("logging_type", STRING())
.column("browser", STRING())
.column("request_headers", STRING())
.column("system_os", STRING())
.column("id", STRING())
.columnByMetadata("topic", STRING(), "topic", true) // Kafka metadata column (virtual)
.watermark("create_time", "create_time - INTERVAL '5' SECOND") // 5s bounded-out-of-orderness watermark
.build();
createTableDescriptor(tableEnv, schema, topic, groupId, tableName);
}
public static void simiUserSchema(StreamTableEnvironment tableEnv, String topic, String groupId, String tableName) { public static void simiUserSchema(StreamTableEnvironment tableEnv, String topic, String groupId, String tableName) {
Schema schema = Schema.newBuilder() Schema schema = Schema.newBuilder()
.column("nick", STRING()) .column("nick", STRING())
...@@ -208,4 +184,62 @@ public class KafkaBaseSchema { ...@@ -208,4 +184,62 @@ public class KafkaBaseSchema {
.build(); .build();
createTableDescriptor(tableEnv, schema, topic, groupId, tableName); createTableDescriptor(tableEnv, schema, topic, groupId, tableName);
} }
/**
 * Declares the kafka_collect_user_behavior source schema (user-behavior wide
 * table) and registers it via createTableDescriptor. The column list must stay
 * in sync with DorisBaseSchema.collectUserBehaviorSchema because the pipeline
 * inserts with SELECT *.
 */
public static void collectUserBehaviorSchema(StreamTableEnvironment tableEnv, String topic, String groupId, String tableName) {
Schema schema = Schema.newBuilder()
.column("id", STRING())
.column("device_id", STRING())
.column("app_type", STRING())
.column("app_key", STRING())
.column("cid", STRING())
.column("phone", STRING())
.column("event_type", STRING())
.column("event_time", TIMESTAMP(3))
.column("view_type", STRING())
.column("view_id", STRING())
.column("uid", STRING())
.column("nick", STRING())
.column("unique_id", STRING())
.column("create_time", TIMESTAMP(3))
.column("strategy_group_id", STRING())
.column("strategy_version", STRING())
.column("send_time", TIMESTAMP(3))
.column("app_channel", STRING())
.column("zone_code", STRING())
.column("zone_name", STRING())
.column("zone_type", STRING())
.column("sdk_version", STRING())
.column("user_agent", STRING())
.column("user_properties", STRING())
.column("route_ip", STRING())
.column("ip_name", STRING())
.column("area_name", STRING())
.column("device_id_v1", STRING())
.column("other_info", STRING())
.column("device_info", STRING())
.column("env_info", STRING())
.column("network_ip", STRING())
.column("network_area_name", STRING())
.column("network_model", STRING())
.column("phone_name", STRING())
.column("device_name", STRING())
.column("brand", STRING())
.column("device_model", STRING())
.column("os_release", STRING())
.column("app_version", STRING())
.column("platform", STRING())
.column("third_id", STRING())
.column("country_code", STRING())
.column("register_time", TIMESTAMP(3))
.column("user_state", STRING())
.column("user_head_url", STRING())
.column("content", STRING())
.column("screen_name", STRING())
.column("touch_pressure", STRING())
.column("draw_point", STRING())
// Kafka "topic" metadata column deliberately left out so the column list
// matches the Doris sink table used by "INSERT ... SELECT *".
// .columnByMetadata("topic", STRING(), "topic", true)
.watermark("create_time", "create_time - INTERVAL '5' SECOND") // 5s bounded-out-of-orderness watermark
.build();
createTableDescriptor(tableEnv, schema, topic, groupId, tableName);
}
} }
...@@ -6,6 +6,7 @@ import org.apache.commons.collections.CollectionUtils; ...@@ -6,6 +6,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import com.flink.achieve.table.schema.DorisBaseSchema;
import com.flink.achieve.table.schema.KafkaBaseSchema; import com.flink.achieve.table.schema.KafkaBaseSchema;
import com.flink.enums.TopicTypeEnum; import com.flink.enums.TopicTypeEnum;
import com.flink.vo.KafkaTopic; import com.flink.vo.KafkaTopic;
...@@ -19,7 +20,7 @@ public class BaseKafkaToSqlCreateTable { ...@@ -19,7 +20,7 @@ public class BaseKafkaToSqlCreateTable {
public static void kafkaToSqlCreateTable(List<KafkaTopic> kafkaTopicList, StreamTableEnvironment tableEnv) { public static void kafkaToSqlCreateTable(List<KafkaTopic> kafkaTopicList, StreamTableEnvironment tableEnv) {
//1. 时区与配置优化 //1. 时区与配置优化
tableEnv.getConfig().set("table.local-time-zone", "Asia/Shanghai").set("parallelism.default", "4"); tableEnv.getConfig().set("table.local-time-zone", "Asia/Shanghai").set("parallelism.default", "1");
//2.定义 Kafka源表结构 //2.定义 Kafka源表结构
if(CollectionUtils.isNotEmpty(kafkaTopicList)) { if(CollectionUtils.isNotEmpty(kafkaTopicList)) {
...@@ -77,9 +78,9 @@ public class BaseKafkaToSqlCreateTable { ...@@ -77,9 +78,9 @@ public class BaseKafkaToSqlCreateTable {
PcCollectLogTable.buildPcCollectLogQuery(tableEnv); PcCollectLogTable.buildPcCollectLogQuery(tableEnv);
}else if(StringUtils.equals(kafkaTopic.getTopic(), TopicTypeEnum.ODS_COMMUNITY_HISTORY.getTopic())) { }else if(StringUtils.equals(kafkaTopic.getTopic(), TopicTypeEnum.ODS_COMMUNITY_HISTORY.getTopic())) {
}else if(StringUtils.equals(kafkaTopic.getTopic(), TopicTypeEnum.ODS_SYS_LOG.getTopic())) { }else if(StringUtils.equals(kafkaTopic.getTopic(), TopicTypeEnum.ODS_COLLECT_USER_BEHAVIOR.getTopic())) {
KafkaBaseSchema.sysLogSchema(tableEnv,topic,groupId,"kafka_sys_log"); KafkaBaseSchema.collectUserBehaviorSchema(tableEnv,topic,groupId,"kafka_collect_user_behavior");
SysLogTable.buildSysLogQuery(tableEnv); DorisBaseSchema.collectUserBehaviorSchema(tableEnv,"doris_collect_user_behavior","bi.collect_user_behavior");
} }
} }
} }
......
package com.flink.achieve.table.sql;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import com.flink.achieve.table.udf.ParseSysLogListUDTF;
/**
* @author wjs
* @version 创建时间:2025-8-19 14:07:56
* 类说明
*/
public class SysLogTable {
/**
 * Registers the ParseSysLogList UDTF and creates the sys_log_view view, which
 * flattens each kafka_sys_log row into collected user/device attributes by
 * lateral-joining the UDTF output map.
 * NOTE(review): ODS_SYS_LOG is marked 弃用 (deprecated) in TopicTypeEnum and this
 * builder is no longer referenced by BaseKafkaToSqlCreateTable — confirm before reuse.
 *
 * @param tableEnv table environment the view is created in
 */
public static void buildSysLogQuery(StreamTableEnvironment tableEnv) {
tableEnv.createTemporarySystemFunction("ParseSysLogList", ParseSysLogListUDTF.class);
// Build and execute the view-creation SQL. LEFT JOIN ... ON TRUE keeps the
// source row even when the UDTF emits nothing for it.
tableEnv.executeSql(
"CREATE VIEW sys_log_view AS " +
"SELECT " +
" k.operate_address AS network_ip, " +
" t.collect_info['network_area_name'] AS network_area_name, " +
" t.collect_info['unique_id'] AS unique_id, " +
" t.collect_info['device_id'] AS device_id, " +
" t.collect_info['device_id_v1'] AS device_id_v1, " +
" t.collect_info['send_type'] AS send_type, " +
" t.collect_info['zone_name'] AS zone_name, " +
" t.collect_info['zone_code'] AS zone_code, " +
" t.collect_info['zone_type'] AS zone_type, " +
" t.collect_info['send_time'] AS send_time, " +
" t.collect_info['app_key'] AS app_key, " +
" t.collect_info['app_type'] AS app_type, " +
" t.collect_info['cid'] AS cid, " +
" t.collect_info['phone'] AS phone, " +
" t.collect_info['nick'] AS nick " +
"FROM kafka_sys_log AS k " +
"LEFT JOIN LATERAL TABLE(ParseSysLogList(operate_url,operate_address,request_body,request_headers)) AS t(collect_info) ON TRUE"
);
}
}
package com.flink.achieve.table.udf;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.enums.SendTypeEnum;
import com.flink.processor.function.GenDeviceIdProcessor;
import com.flink.processor.function.UserPropertiesProcessor;
import com.flink.util.CompareUtils;
import com.flink.util.TimeConvertUtil;
import com.flink.util.des.EncryptUtil;
import com.flink.util.ip2region.SearcherUtil;
import com.flink.vo.DeviceIdInfo;
import com.flink.vo.PcDeviceInfo;
import com.flink.vo.PcEventInfo;
import com.flink.vo.UserProperties;
/**
* @author wjs
* @version 创建时间:2025-8-20 18:06:47
* 类说明
*/
@FunctionHint(output = @DataTypeHint("ROW<collect_info MAP<STRING,STRING>>"))
public class ParseSysLogListUDTF extends TableFunction<Row> {

	/** Required because Flink serializes UDF instances to the task managers. */
	private static final long serialVersionUID = 1L;

	private static final Logger logger = LoggerFactory.getLogger(ParseSysLogListUDTF.class);

	/**
	 * Parses one operation-log row from the sys-log stream and emits a single
	 * MAP&lt;STRING,STRING&gt; of collected user/device attributes. Rows that are
	 * not collect endpoints, or that lack the material needed for decryption,
	 * are silently skipped (no row emitted).
	 *
	 * @param operate_url     request path; only /api/v1/event and /api/v1/pcEvent are handled
	 * @param operate_address client address, used for region lookup
	 * @param request_body    JSON body carrying the encrypted "bd" payload
	 * @param request_headers JSON headers carrying key material and meta fields (hk0, ak0, ...)
	 */
	public void eval(String operate_url, String operate_address, String request_body, String request_headers) {
		try {
			// Only the two collect endpoints are relevant.
			if (!CompareUtils.stringExists(operate_url,
					"/api/v1/event",
					"/api/v1/pcEvent")) {
				return;
			}
			// Both body AND headers are required below; isAnyEmpty (was isAllEmpty)
			// avoids an NPE when exactly one of them is missing.
			if (StringUtils.isAnyEmpty(request_body, request_headers)) {
				return;
			}
			JSONObject jsonObjBody = JSON.parseObject(request_body);
			JSONObject jsonObjHeaders = JSON.parseObject(request_headers);
			// 解密: hk0 carries the DES key material. Guard presence and length before
			// substring to avoid NPE / StringIndexOutOfBoundsException on malformed headers
			// (was an unguarded substring that relied on the catch-all below).
			String bdParams = jsonObjBody.getString("bd");
			String rawHashKey = jsonObjHeaders.getString("hk0");
			if (StringUtils.isAnyEmpty(bdParams, rawHashKey) || rawHashKey.length() < 48) {
				return;
			}
			String hashDateKeyStr = rawHashKey.substring(0, 48);
			String appKey = jsonObjHeaders.getString("ak0");
			String appType = jsonObjHeaders.getString("at0");
			String sendTime = TimeConvertUtil.parseToStringSSS(jsonObjHeaders.getLongValue("st0"));
			// zn0 is Base64-encoded; decode only when present. The original wrapped the
			// ternary in new String(...), so a missing zn0 produced new String(null) -> NPE
			// and the whole row was dropped by the catch block.
			String rawZoneName = jsonObjHeaders.getString("zn0");
			String zoneName = StringUtils.isNotEmpty(rawZoneName)
					? new String(Base64.getDecoder().decode(rawZoneName))
					: null;
			String zoneCode = jsonObjHeaders.getString("zc0");
			String zoneType = jsonObjHeaders.getString("zt0");
			String sendType = jsonObjHeaders.getString("st1");
			// bd解密
			String reqData = EncryptUtil.desDecrypt(bdParams, hashDateKeyStr);
			if (StringUtils.isEmpty(reqData)) {
				return;
			}
			String networkAreaName = SearcherUtil.getCityInfoByFile(operate_address);
			// Attributes common to both endpoints.
			Map<String, Object> collectParams = new HashMap<>();
			collectParams.put("send_type", sendType);
			collectParams.put("zone_name", zoneName);
			collectParams.put("zone_code", zoneCode);
			collectParams.put("zone_type", zoneType);
			collectParams.put("send_time", sendTime);
			collectParams.put("app_key", appKey);
			collectParams.put("app_type", appType);
			collectParams.put("network_area_name", networkAreaName);
			if (StringUtils.equals(operate_url, "/api/v1/pcEvent")) {
				if (CompareUtils.stringExists(sendType, SendTypeEnum.DEVICE_INFO.getCode())) {
					PcDeviceInfo pcDeviceInfo = JSONObject.parseObject(reqData, new TypeReference<PcDeviceInfo>() {});
					// Null check added for parity with the EVENT_INFO branch below.
					if (null == pcDeviceInfo) {
						return;
					}
					String deviceId = pcDeviceInfo.getI8();
					collectParams.put("unique_id", deviceId);
					collectParams.put("device_id", deviceId);
					collectParams.put("device_id_v1", deviceId);
					collectParams.put("cid", pcDeviceInfo.getCid());
					collectParams.put("phone", pcDeviceInfo.getPhone());
					collectParams.put("nick", pcDeviceInfo.getNick());
				}
				if (CompareUtils.stringExists(sendType, SendTypeEnum.EVENT_INFO.getCode())) {
					PcEventInfo pcEventInfo = JSONObject.parseObject(reqData, new TypeReference<PcEventInfo>() {});
					if (null == pcEventInfo) {
						return;
					}
					String deviceId = pcEventInfo.getI8();
					collectParams.put("unique_id", deviceId);
					collectParams.put("device_id", deviceId);
					collectParams.put("device_id_v1", deviceId);
					collectParams.put("cid", pcEventInfo.getCid());
					collectParams.put("phone", pcEventInfo.getPhone());
					collectParams.put("nick", pcEventInfo.getNick());
				}
			} else if (StringUtils.equals(operate_url, "/api/v1/event")) {
				String deviceId = jsonObjHeaders.getString("di0");
				JSONObject jsonObj = JSON.parseObject(reqData);
				String other_info = jsonObj.getString("i1");
				String device_info = jsonObj.getString("a1");
				String env_info = jsonObj.getString("g1");
				String user_properties = jsonObj.getString("s2");
				String unique_id = jsonObj.getString("s3");
				if (CompareUtils.stringExists(sendType,
						SendTypeEnum.ALL.getCode(),
						SendTypeEnum.DEVICE_INFO.getCode())) {
					// isAllEmpty kept as-is here: the device-id can be derived when at
					// least one of device_info / env_info is present.
					if (!StringUtils.isAllEmpty(device_info, env_info)) {
						DeviceIdInfo deviceIdInfo = GenDeviceIdProcessor.genDeviceId(appType, appKey, other_info, device_info, env_info);
						UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(user_properties);
						collectParams.put("unique_id", unique_id);
						collectParams.put("device_id", deviceId);
						collectParams.put("device_id_v1", deviceIdInfo.getDeviceIdV1());
						collectParams.put("cid", userProperties.getCid());
						collectParams.put("phone", userProperties.getPhone());
						collectParams.put("nick", userProperties.getNick());
					}
				}
				if (CompareUtils.stringExists(sendType,
						SendTypeEnum.ALL.getCode(),
						SendTypeEnum.EVENT_INFO.getCode())) {
					UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(user_properties);
					collectParams.put("unique_id", unique_id);
					collectParams.put("device_id", deviceId);
					// NOTE(review): when sendType == ALL this overwrites the device_id_v1
					// computed in the DEVICE_INFO block above with null — preserved from
					// the original; confirm it is intentional.
					collectParams.put("device_id_v1", null);
					collectParams.put("cid", userProperties.getCid());
					collectParams.put("phone", userProperties.getPhone());
					collectParams.put("nick", userProperties.getNick());
				}
			}
			// collectParams is always non-null here; the original guarded it redundantly.
			collect(Row.of(collectParams));
		} catch (Exception e) {
			// Per-row parse failures must not kill the job; log with the full stack
			// trace (was e.toString(), which discarded it) and drop the row.
			logger.error("ParseSysLogListUDTF eval failed", e);
		}
	}
}
...@@ -40,7 +40,7 @@ public abstract class SourceCommonBase { ...@@ -40,7 +40,7 @@ public abstract class SourceCommonBase {
List<KafkaTopic> kafkaTopicList = Arrays.asList( List<KafkaTopic> kafkaTopicList = Arrays.asList(
new KafkaTopic(topicTypeEnum.getGroup(), topicTypeEnum.getTopic()) new KafkaTopic(topicTypeEnum.getGroup(), topicTypeEnum.getTopic())
); );
StreamTableEnvironment tableEnv = StreamEnvironmentSettings.createTableEnv(env, jobTypeEnum.getCode(),kafkaTopicList,false); StreamTableEnvironment tableEnv = StreamEnvironmentSettings.createTableEnv(env, jobTypeEnum,kafkaTopicList,false);
logger.info("2. table的环境设置成功"); logger.info("2. table的环境设置成功");
//3.资源配置文件信息的获取 //3.资源配置文件信息的获取
SqlLoader loader = new SqlLoader(); SqlLoader loader = new SqlLoader();
...@@ -65,13 +65,13 @@ public abstract class SourceCommonBase { ...@@ -65,13 +65,13 @@ public abstract class SourceCommonBase {
* @param jobType * @param jobType
* @param useTableAPI * @param useTableAPI
*/ */
public void multipleExecuteJob(List<KafkaTopic> kafkaTopicList,JobTypeEnum jobName,boolean useTableAPI,boolean useStreamAPI) throws Exception { public void multipleExecuteJob(List<KafkaTopic> kafkaTopicList,JobTypeEnum jobTypeEnum,boolean useTableAPI,boolean useStreamAPI) throws Exception {
//1. 环境的设置 //1. 环境的设置
StreamExecutionEnvironment env = StreamEnvironmentSettings.createStreamEnv(jobName.getCode()); StreamExecutionEnvironment env = StreamEnvironmentSettings.createStreamEnv(jobTypeEnum.getCode());
logger.info("1. 环境的设置成功"); logger.info("1. 环境的设置成功");
if(useTableAPI) { if(useTableAPI) {
//2.table环境设置 //2.table环境设置
StreamTableEnvironment tableEnv = StreamEnvironmentSettings.createTableEnv(env, jobName.getCode(),kafkaTopicList,false); StreamTableEnvironment tableEnv = StreamEnvironmentSettings.createTableEnv(env, jobTypeEnum,kafkaTopicList,false);
logger.info("2. table的环境设置成功"); logger.info("2. table的环境设置成功");
//3.资源配置文件信息的获取 //3.资源配置文件信息的获取
//初始化加载器 加载配置文件 //初始化加载器 加载配置文件
...@@ -97,8 +97,9 @@ public abstract class SourceCommonBase { ...@@ -97,8 +97,9 @@ public abstract class SourceCommonBase {
//3.Kafka资源ETL //3.Kafka资源ETL
parseMultipleSourceKafkaJson(dataSourceList); parseMultipleSourceKafkaJson(dataSourceList);
logger.info("3.Kafka资源ETL操作成功"); logger.info("3.Kafka资源ETL操作成功");
env.execute(jobTypeEnum.getDescription());
} }
env.execute(jobName.getDescription());
} }
/** /**
......
...@@ -21,6 +21,7 @@ import org.slf4j.Logger; ...@@ -21,6 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.flink.achieve.table.sql.BaseKafkaToSqlCreateTable; import com.flink.achieve.table.sql.BaseKafkaToSqlCreateTable;
import com.flink.enums.JobTypeEnum;
import com.flink.util.LoadPropertiesFile; import com.flink.util.LoadPropertiesFile;
import com.flink.vo.KafkaTopic; import com.flink.vo.KafkaTopic;
...@@ -103,8 +104,8 @@ public class StreamEnvironmentSettings { ...@@ -103,8 +104,8 @@ public class StreamEnvironmentSettings {
} }
//创建支持嵌套JSON的Table环境 //创建支持嵌套JSON的Table环境
public static StreamTableEnvironment createTableEnv(StreamExecutionEnvironment env, String jobType,List<KafkaTopic> kafkaTopicList, boolean enableHiveCatalog) { public static StreamTableEnvironment createTableEnv(StreamExecutionEnvironment env, JobTypeEnum jobTypeEnum,List<KafkaTopic> kafkaTopicList, boolean enableHiveCatalog) {
logger.info("StreamEnvironmentSettings createTableEnv start jobType:{}",jobType); logger.info("StreamEnvironmentSettings createTableEnv start jobType:{}",jobTypeEnum.getCode());
// Blink Planner配置 // Blink Planner配置
EnvironmentSettings settings = EnvironmentSettings EnvironmentSettings settings = EnvironmentSettings
.newInstance() // 使用 Builder 模式 .newInstance() // 使用 Builder 模式
...@@ -122,6 +123,7 @@ public class StreamEnvironmentSettings { ...@@ -122,6 +123,7 @@ public class StreamEnvironmentSettings {
tableConfig.set(TableConfigOptions.TABLE_SQL_DIALECT, "DEFAULT"); // 或 "HIVE" tableConfig.set(TableConfigOptions.TABLE_SQL_DIALECT, "DEFAULT"); // 或 "HIVE"
//动态加载生产配置 //动态加载生产配置
tableConfig.addConfiguration(getProductionConfig()); tableConfig.addConfiguration(getProductionConfig());
tableConfig.set("pipeline.name", jobTypeEnum.getDescription());
//按需注册 Hive Catalog(SQL方式)暂时用不上 //按需注册 Hive Catalog(SQL方式)暂时用不上
// if (enableHiveCatalog) { // if (enableHiveCatalog) {
// tableEnv.executeSql( // tableEnv.executeSql(
...@@ -136,7 +138,7 @@ public class StreamEnvironmentSettings { ...@@ -136,7 +138,7 @@ public class StreamEnvironmentSettings {
//获取kafka数据转为table表 //获取kafka数据转为table表
BaseKafkaToSqlCreateTable.kafkaToSqlCreateTable(kafkaTopicList, tableEnv); BaseKafkaToSqlCreateTable.kafkaToSqlCreateTable(kafkaTopicList, tableEnv);
logger.info("StreamEnvironmentSettings createTableEnv end jobType:{}",jobType); logger.info("StreamEnvironmentSettings createTableEnv end jobType:{}",jobTypeEnum.getCode());
return tableEnv; return tableEnv;
} }
...@@ -148,7 +150,7 @@ public class StreamEnvironmentSettings { ...@@ -148,7 +150,7 @@ public class StreamEnvironmentSettings {
prodConfig.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("512mb")); prodConfig.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("512mb"));
prodConfig.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("512mb")); prodConfig.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("512mb"));
prodConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(4096L)); prodConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(4096L));
prodConfig.set(TaskManagerOptions.NUM_TASK_SLOTS, 4); prodConfig.set(TaskManagerOptions.NUM_TASK_SLOTS, 1);
return prodConfig; return prodConfig;
} }
} }
\ No newline at end of file
...@@ -8,7 +8,7 @@ import com.flink.vo.KafkaTopic; ...@@ -8,7 +8,7 @@ import com.flink.vo.KafkaTopic;
* 类说明 * 类说明
*/ */
public enum TopicTypeEnum { public enum TopicTypeEnum {
DWD_SYS_LOG("dwd_sys_log","dwdSysLog"), DWD_SYS_LOG("dwd_sys_log","dwdSysLog"),//弃用
ODS_EVENT_LOG("ods_event_log","eventLogGroup"), ODS_EVENT_LOG("ods_event_log","eventLogGroup"),
ODS_NEW_COLLECT_LOG("ods_new_collect_log","odsNewCollectLog"), ODS_NEW_COLLECT_LOG("ods_new_collect_log","odsNewCollectLog"),
ODS_ZIPPER_STRATEGY("ods_zipper_strategy","odsZipperStrategy"), ODS_ZIPPER_STRATEGY("ods_zipper_strategy","odsZipperStrategy"),
...@@ -30,7 +30,7 @@ public enum TopicTypeEnum { ...@@ -30,7 +30,7 @@ public enum TopicTypeEnum {
ODS_PC_EVENT_LOG("ods_pc_event_log","odsPcEventLog"), ODS_PC_EVENT_LOG("ods_pc_event_log","odsPcEventLog"),
ODS_PC_COLLECT_LOG("ods_pc_collect_log","odsPcCollectLog"), ODS_PC_COLLECT_LOG("ods_pc_collect_log","odsPcCollectLog"),
ODS_COMMUNITY_HISTORY("ods_community_history","odsCommunityHistory"), ODS_COMMUNITY_HISTORY("ods_community_history","odsCommunityHistory"),
ODS_SYS_LOG("ods_sys_log","odsSysLog"), ODS_SYS_LOG("ods_sys_log","odsSysLog"),//弃用
ODS_COLLECT_USER_BEHAVIOR("ods_collect_user_behavior","odsCollectUserBehavior"), ODS_COLLECT_USER_BEHAVIOR("ods_collect_user_behavior","odsCollectUserBehavior"),
ODS_EXCEPTION_EVENT_TOPIC("ods_exception_event_topic","odsExceptionEventTopic"), ODS_EXCEPTION_EVENT_TOPIC("ods_exception_event_topic","odsExceptionEventTopic"),
......
...@@ -4,7 +4,7 @@ import java.util.Arrays; ...@@ -4,7 +4,7 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import com.flink.achieve.table.TableSqlSinkKafkaAchi; import com.flink.achieve.table.InitCreateTempKafkaSqlTable;
import com.flink.enums.JobTypeEnum; import com.flink.enums.JobTypeEnum;
import com.flink.enums.TopicTypeEnum; import com.flink.enums.TopicTypeEnum;
import com.flink.processor.JobProcessor; import com.flink.processor.JobProcessor;
...@@ -19,7 +19,7 @@ public class CommonConsumeSqlBaseProcessor implements JobProcessor{ ...@@ -19,7 +19,7 @@ public class CommonConsumeSqlBaseProcessor implements JobProcessor{
@Override @Override
public void process() throws Exception { public void process() throws Exception {
new TableSqlSinkKafkaAchi().multipleExecuteJob( new InitCreateTempKafkaSqlTable().multipleExecuteJob(
createTopicList(), createTopicList(),
JobTypeEnum.COMMON_CONSUME_SQL_BASE, JobTypeEnum.COMMON_CONSUME_SQL_BASE,
true, true,
...@@ -29,7 +29,15 @@ public class CommonConsumeSqlBaseProcessor implements JobProcessor{ ...@@ -29,7 +29,15 @@ public class CommonConsumeSqlBaseProcessor implements JobProcessor{
private static List<KafkaTopic> createTopicList() { private static List<KafkaTopic> createTopicList() {
return Arrays.stream(new TopicTypeEnum[]{ return Arrays.stream(new TopicTypeEnum[]{
TopicTypeEnum.ODS_SYS_LOG // TopicTypeEnum.ODS_EVENT_LOG,
// TopicTypeEnum.ODS_NEW_COLLECT_LOG,
// TopicTypeEnum.SIMI_USER_LIST_TOPIC,
// TopicTypeEnum.ABROAD_SIMI_USER_LIST_TOPIC,
// TopicTypeEnum.OPEN_SIMI_API,
// TopicTypeEnum.ODS_PC_EVENT_LOG,
// TopicTypeEnum.ODS_PC_COLLECT_LOG,
// TopicTypeEnum.ODS_COMMUNITY_HISTORY,
TopicTypeEnum.ODS_COLLECT_USER_BEHAVIOR
}).map(TopicTypeEnum::createKafkaTopic) }).map(TopicTypeEnum::createKafkaTopic)
.collect(Collectors.toList()); .collect(Collectors.toList());
......
package com.flink.util; package com.flink.util;
import java.io.InputStream; import java.io.InputStream;
import java.io.Serializable;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
...@@ -11,7 +12,11 @@ import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.Yaml; ...@@ -11,7 +12,11 @@ import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.Yaml;
* @version 创建时间:2025-8-26 14:55:56 * @version 创建时间:2025-8-26 14:55:56
* 类说明 * 类说明
*/ */
public class SqlLoader { public class SqlLoader implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private final Map<String, String> sqlMap = new HashMap<>(); private final Map<String, String> sqlMap = new HashMap<>();
/** /**
......
...@@ -5,3 +5,8 @@ queries: ...@@ -5,3 +5,8 @@ queries:
event_log_view: event_log_view:
SELECT * FROM event_log_view SELECT * FROM event_log_view
collect_user_behavior:
INSERT INTO doris_collect_user_behavior
SELECT *
FROM kafka_collect_user_behavior
WHERE event_type NOT IN ('socket_event', 'socket_error', 'socket_time', 'refresh_token', 'all_time', 'enter_act', 'exit_act', 'show_act', 'activity_red_exposure')
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment