Commit b5902e5d by 魏建枢

支持 flink table sql 语法

parent 44507015
Showing with 806 additions and 143 deletions
......@@ -64,6 +64,18 @@
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>1.20.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>flink-table-api-bridge-base</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>1.20.0</version>
<scope>provided</scope>
......
......@@ -43,6 +43,12 @@
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- json -->
<dependency>
<groupId>com.alibaba</groupId>
......
......@@ -7,19 +7,21 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.common.MultipleSourceCommonBase;
import com.flink.common.SourceCommonBase;
import com.flink.enums.TopicTypeEnum;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
/**
* @author wjs
* @version 创建时间:2025-7-25 10:29:23
* 类说明
*/
public class CommonConsumeBaseAchi extends MultipleSourceCommonBase implements Serializable{
public class CommonConsumeBaseAchi extends SourceCommonBase implements Serializable{
/**
*
......@@ -29,7 +31,7 @@ public class CommonConsumeBaseAchi extends MultipleSourceCommonBase implements S
private static final Logger logger = LoggerFactory.getLogger(CommonConsumeBaseAchi.class);
@Override
public void parseSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
public void parseMultipleSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
if(CollectionUtils.isNotEmpty(dataSourceList)) {
for(KafkaDataSource kafkaDataSource : dataSourceList) {
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_USER_INVITATION.getTopic())) {
......@@ -58,6 +60,25 @@ public class CommonConsumeBaseAchi extends MultipleSourceCommonBase implements S
}
}
// No-op stub required by the SourceCommonBase contract; this hook is not
// used by this job (only the multi-topic DataStream path is implemented).
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
// TODO Auto-generated method stub
}
// No-op stub required by the SourceCommonBase contract; the single-topic
// Table-API path is not used by this job.
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
// No-op stub required by the SourceCommonBase contract; the multi-topic
// Table-API path is not used by this job.
@Override
public void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
}
......
......@@ -27,6 +27,7 @@ import com.flink.vo.RealBalance;
import com.flink.vo.RealKyc;
import com.flink.vo.RealLead;
import com.flink.vo.RealStaking;
import com.flink.vo.RealStakingEvent;
import com.flink.vo.RealTransaction;
import com.flink.vo.RealUsers;
......@@ -95,6 +96,14 @@ public class UserInvitationAchi implements Serializable {
private static final DataType[] REAL_STAKING_TYPES = { DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(),DataTypes.STRING(), DataTypes.INT() };
//质押事件表配置
private static final String[] REAL_STAKING_EVENT_FIELDS = { "event_id", "tx_hash", "block_height","receipt_id", "block_timestamp", "from_account_id", "to_account_id",
"standard", "event", "version","amount","claim_amount","fee_amount","create_time","vault_id","post_time", DORIS_DELETE_SIGN };
private static final DataType[] REAL_STAKING_EVENT_TYPES = { DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(), DataTypes.INT() };
public static void userInvitation(DataStreamSource<String> dataStreamSource) {
// 初始化表配置
......@@ -104,6 +113,7 @@ public class UserInvitationAchi implements Serializable {
TableConfig balanceConfig = new TableConfig(BALANCE_FIELDS, BALANCE_TYPES, "bi.real_balance");
TableConfig leadConfig = new TableConfig(LEAD_FIELDS, LEAD_TYPES, "bi.real_lead_switch");
TableConfig realStakingConfig = new TableConfig(REAL_STAKING_FIELDS, REAL_STAKING_TYPES, "bi.real_staking");
TableConfig realStakingEventConfig = new TableConfig(REAL_STAKING_EVENT_FIELDS, REAL_STAKING_EVENT_TYPES, "bi.real_staking_event");
// 创建Doris Sink
DorisSink<RowData> dorisUsersSink = DorisConnector.sinkDoris(usersConfig.getFields(), usersConfig.getTypes(),
......@@ -118,6 +128,8 @@ public class UserInvitationAchi implements Serializable {
leadConfig.getTableName());
DorisSink<RowData> dorisRealStakingSink = DorisConnector.sinkDoris(realStakingConfig.getFields(), realStakingConfig.getTypes(),
realStakingConfig.getTableName());
DorisSink<RowData> dorisRealStakingEventSink = DorisConnector.sinkDoris(realStakingEventConfig.getFields(), realStakingEventConfig.getTypes(),
realStakingEventConfig.getTableName());
// 处理用户数据
// processDataStream(dataStreamSource, "realUsers", usersConfig, dorisUsersSink,(RowMapper<RealUsers>) UserInvitationAchi::mapToUsersRow);
......@@ -133,6 +145,8 @@ public class UserInvitationAchi implements Serializable {
processDataStream(dataStreamSource, "realLead", leadConfig, dorisLeadSink,(RowMapper<RealLead>) UserInvitationAchi::mapToLeadRow);
//处理质押数据
processDataStream(dataStreamSource, "realStaking", realStakingConfig, dorisRealStakingSink,(RowMapper<RealStaking>) UserInvitationAchi::mapToStakingRow);
//处理质押事件
processDataStream(dataStreamSource, "realStakingEvent", realStakingEventConfig, dorisRealStakingEventSink,(RowMapper<RealStakingEvent>) UserInvitationAchi::mapToStakingEventRow);
}
/**
......@@ -198,6 +212,7 @@ public class UserInvitationAchi implements Serializable {
case "realBalance": return RealBalance.class;
case "realLead": return RealLead.class;
case "realStaking": return RealStaking.class;
case "realStakingEvent": return RealStakingEvent.class;
default: throw new IllegalArgumentException("未知类型: " + type);
}
}
......@@ -311,6 +326,29 @@ public class UserInvitationAchi implements Serializable {
row.setField(10, DELETE_SIGN_VALUE);
return row;
}
// Staking-event record mapper: converts a RealStakingEvent into a Doris row.
// Column order must stay in sync with REAL_STAKING_EVENT_FIELDS
// (event_id .. post_time, then the __DORIS_DELETE_SIGN__ flag last).
// NOTE(review): assumes every getter returns non-null — StringData.fromString
// may fail on null input; confirm upstream parsing guarantees this.
private static RowData mapToStakingEventRow(Object item, int fieldCount) {
    RealStakingEvent event = (RealStakingEvent) item;
    GenericRowData row = new GenericRowData(fieldCount);
    int col = 0;
    row.setField(col++, StringData.fromString(event.getEvent_id()));
    row.setField(col++, StringData.fromString(event.getTx_hash()));
    row.setField(col++, StringData.fromString(event.getBlock_height()));
    row.setField(col++, StringData.fromString(event.getReceipt_id()));
    row.setField(col++, StringData.fromString(event.getBlock_timestamp()));
    row.setField(col++, StringData.fromString(event.getFrom_account_id()));
    row.setField(col++, StringData.fromString(event.getTo_account_id()));
    row.setField(col++, StringData.fromString(event.getStandard()));
    row.setField(col++, StringData.fromString(event.getEvent()));
    row.setField(col++, StringData.fromString(event.getVersion()));
    row.setField(col++, StringData.fromString(event.getAmount()));
    row.setField(col++, StringData.fromString(event.getClaim_amount()));
    row.setField(col++, StringData.fromString(event.getFee_amount()));
    row.setField(col++, StringData.fromString(event.getCreate_time()));
    row.setField(col++, StringData.fromString(event.getVault_id()));
    row.setField(col++, StringData.fromString(event.getPost_time()));
    // Soft-delete marker column is always the final field.
    row.setField(col, DELETE_SIGN_VALUE);
    return row;
}
/**
* 行数据映射接口
......
......@@ -18,6 +18,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
......@@ -30,7 +31,7 @@ import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.MultipleSourceCommonBase;
import com.flink.common.SourceCommonBase;
import com.flink.config.TableConfig;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.function.CollectLogJoinProcessor;
......@@ -44,6 +45,7 @@ import com.flink.vo.CollectLog;
import com.flink.vo.DeviceId;
import com.flink.vo.DeviceIdInfo;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
import com.flink.vo.PcCollectLog;
import com.flink.vo.PcDeviceInfo;
import com.flink.vo.Result;
......@@ -55,7 +57,7 @@ import com.flink.vo.UserProperties;
* @version 创建时间:2025-5-28 10:44:56
* 类说明
*/
public class DeviceIdLatestAchi extends MultipleSourceCommonBase implements Serializable{
public class DeviceIdLatestAchi extends SourceCommonBase implements Serializable{
/**
*
......@@ -65,7 +67,7 @@ public class DeviceIdLatestAchi extends MultipleSourceCommonBase implements Seri
private static final Logger logger = LoggerFactory.getLogger(DeviceIdLatestAchi.class);
@Override
public void parseSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
public void parseMultipleSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
//=================配置入库字段=========================================
TableConfig detailConfig = new TableConfig(
......@@ -497,4 +499,25 @@ public class DeviceIdLatestAchi extends MultipleSourceCommonBase implements Seri
row.setField(13, 0); // __DORIS_DELETE_SIGN__
return row;
}
// No-op stub required by the SourceCommonBase contract; this hook is not
// used by this job (only the multi-topic DataStream path is implemented).
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
// TODO Auto-generated method stub
}
// No-op stub required by the SourceCommonBase contract; the single-topic
// Table-API path is not used by this job.
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
// No-op stub required by the SourceCommonBase contract; the multi-topic
// Table-API path is not used by this job.
@Override
public void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
}
......@@ -12,6 +12,7 @@ import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
......@@ -24,8 +25,11 @@ import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.SourceCommonBase;
import com.flink.enums.TopicTypeEnum;
import com.flink.util.CompareUtils;
import com.flink.util.ip2region.SearcherUtil;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
import com.flink.vo.OdsEventLog;
import com.flink.vo.UserProperties;
......@@ -167,7 +171,21 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable
}
@Override
public void sendToSinkKafka(DataStreamSource<String> mStream) {
public void parseMultipleSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
// TODO Auto-generated method stub
}
// No-op stub required by the SourceCommonBase contract; the single-topic
// Table-API path is not used by this job.
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
// No-op stub required by the SourceCommonBase contract; the multi-topic
// Table-API path is not used by this job.
@Override
public void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
......
......@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
......@@ -35,7 +36,7 @@ import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.MultipleSourceCommonBase;
import com.flink.common.SourceCommonBase;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.function.LatestUserProcessFunction;
import com.flink.util.CompareUtils;
......@@ -43,6 +44,7 @@ import com.flink.util.TimeConvertUtil;
import com.flink.util.ip2region.SearcherUtil;
import com.flink.vo.EventIp;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
import com.flink.vo.OdsEventLog;
import com.flink.vo.PcEventInfo;
import com.flink.vo.PcOdsEventLog;
......@@ -55,7 +57,7 @@ import com.flink.vo.UserProperties;
* @version 创建时间:2025-5-26 14:40:49
* 类说明
*/
public class EventIpLatestAchi extends MultipleSourceCommonBase implements Serializable{
public class EventIpLatestAchi extends SourceCommonBase implements Serializable{
......@@ -67,7 +69,7 @@ public class EventIpLatestAchi extends MultipleSourceCommonBase implements Seria
private static final Logger logger = LoggerFactory.getLogger(EventIpConvertAchi.class);
@Override
public void parseSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
public void parseMultipleSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
//=================配置入库字段=========================================
String[] fields = {
......@@ -417,4 +419,25 @@ public class EventIpLatestAchi extends MultipleSourceCommonBase implements Seria
eventIp.setCreateTime(createTime);
return eventIp;
}
// No-op stub required by the SourceCommonBase contract; this hook is not
// used by this job (only the multi-topic DataStream path is implemented).
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
// TODO Auto-generated method stub
}
// No-op stub required by the SourceCommonBase contract; the single-topic
// Table-API path is not used by this job.
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
// No-op stub required by the SourceCommonBase contract; the multi-topic
// Table-API path is not used by this job.
@Override
public void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
}
......@@ -17,6 +17,7 @@ import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -24,7 +25,7 @@ import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.MultipleSourceCommonBase;
import com.flink.common.SourceCommonBase;
import com.flink.enums.OpenSimiApiTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.function.DistinctUserAggregator;
......@@ -34,6 +35,7 @@ import com.flink.util.TimeConvertUtil;
import com.flink.vo.DeviceId;
import com.flink.vo.DeviceRegistrationResult;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
import com.flink.vo.SimiUserInfo;
import com.flink.vo.simi.UserRegistrationReqDto;
......@@ -43,7 +45,7 @@ import com.flink.vo.simi.UserRegistrationReqDto;
* 类说明 注册检验
*/
//2. 主处理逻辑
public class RegistrationCheckAchi extends MultipleSourceCommonBase implements Serializable{
public class RegistrationCheckAchi extends SourceCommonBase implements Serializable{
/**
*
......@@ -53,7 +55,7 @@ public class RegistrationCheckAchi extends MultipleSourceCommonBase implements S
private static final Logger logger = LoggerFactory.getLogger(RegistrationCheckAchi.class);
@Override
public void parseSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
public void parseMultipleSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
DataStreamSource<String> openSimiApiStreamSource = null;
DataStreamSource<String> collectLogStreamSource = null;
DataStreamSource<String> pcCollectLogStreamSource = null;
......@@ -203,8 +205,25 @@ public class RegistrationCheckAchi extends MultipleSourceCommonBase implements S
// .print("同设备24小时注册人数统计");
}
// No-op stub required by the SourceCommonBase contract; this hook is not
// used by this job (only the multi-topic DataStream path is implemented).
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
// TODO Auto-generated method stub
}
// No-op stub required by the SourceCommonBase contract; the single-topic
// Table-API path is not used by this job.
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
// No-op stub required by the SourceCommonBase contract; the multi-topic
// Table-API path is not used by this job.
@Override
public void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
}
\ No newline at end of file
......@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
......@@ -45,7 +46,7 @@ import com.flink.common.DorisConnector;
import com.flink.common.DynamicSqlBuilder;
import com.flink.common.DynamicSqlBuilder.SqlColumn;
import com.flink.common.DynamicSqlBuilder.SqlWithParams;
import com.flink.common.MultipleSourceCommonBase;
import com.flink.common.SourceCommonBase;
import com.flink.config.TableConfig;
import com.flink.enums.OpenSimiApiTypeEnum;
import com.flink.enums.TopicTypeEnum;
......@@ -55,6 +56,7 @@ import com.flink.processor.impl.OkHttpService;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.EventIp;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
import com.flink.vo.SimiUserInfo;
import com.flink.vo.simi.FriendsStream;
import com.flink.vo.simi.InitiateFriendRequestReqDto;
......@@ -64,7 +66,7 @@ import com.flink.vo.simi.SimiFriends;
* @author wjs
* @version 创建时间:2025-5-29 10:53:48 类说明
*/
public class SimiFriendsAchi extends MultipleSourceCommonBase implements Serializable {
public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
/**
*
......@@ -73,7 +75,7 @@ public class SimiFriendsAchi extends MultipleSourceCommonBase implements Seriali
private static final Logger logger = LoggerFactory.getLogger(SimiFriendsAchi.class);
@Override
public void parseSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
public void parseMultipleSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
// =================配置入库字段=========================================
// 明细表结构
TableConfig detailConfig = new TableConfig(
......@@ -775,4 +777,24 @@ public class SimiFriendsAchi extends MultipleSourceCommonBase implements Seriali
}
}
// No-op stub required by the SourceCommonBase contract; this hook is not
// used by this job (only the multi-topic DataStream path is implemented).
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
// TODO Auto-generated method stub
}
// No-op stub required by the SourceCommonBase contract; the single-topic
// Table-API path is not used by this job.
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
// No-op stub required by the SourceCommonBase contract; the multi-topic
// Table-API path is not used by this job.
@Override
public void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
}
......@@ -12,6 +12,7 @@ import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
......@@ -26,7 +27,10 @@ import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.SourceCommonBase;
import com.flink.enums.OpenSimiApiTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.impl.OkHttpService;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
import com.flink.vo.simi.InitiateFriendRequestReqDto;
import com.flink.vo.simi.SimiGroups;
......@@ -111,7 +115,21 @@ public class SimiGroupstAchi extends SourceCommonBase implements Serializable{
}
@Override
public void sendToSinkKafka(DataStreamSource<String> mStream) {
public void parseMultipleSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
// TODO Auto-generated method stub
}
// No-op stub required by the SourceCommonBase contract; the single-topic
// Table-API path is not used by this job.
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
// No-op stub required by the SourceCommonBase contract; the multi-topic
// Table-API path is not used by this job.
@Override
public void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
......
......@@ -38,6 +38,7 @@ import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
......@@ -51,13 +52,14 @@ import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.MultipleSourceCommonBase;
import com.flink.common.SourceCommonBase;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.function.LatestUserProcessFunction;
import com.flink.util.CompareUtils;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.EventList;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
import com.flink.vo.OdsEventLog;
import com.flink.vo.PcEventInfo;
import com.flink.vo.PcOdsEventLog;
......@@ -75,7 +77,7 @@ import com.flink.vo.userDailyActivity.DailyActivityEventInfo;
* @version 创建时间:2025-7-31 11:02:52
* 类说明
*/
public class UserDailyActivityAchi extends MultipleSourceCommonBase implements Serializable{
public class UserDailyActivityAchi extends SourceCommonBase implements Serializable{
/**
*
......@@ -84,7 +86,7 @@ public class UserDailyActivityAchi extends MultipleSourceCommonBase implements S
private static final Logger logger = LoggerFactory.getLogger(UserDailyActivityAchi.class);
@Override
public void parseSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
public void parseMultipleSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
DataStreamSource<String> collectLogStreamSource = null;
DataStreamSource<String> eventStreamSource = null;
DataStreamSource<String> pcCollectLogStreamSource = null;
......@@ -723,5 +725,25 @@ public class UserDailyActivityAchi extends MultipleSourceCommonBase implements S
.assignTimestampsAndWatermarks(WatermarkStrategy.<DailyActivityEventInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.getWaterMarkTime()));
}
// No-op stub required by the SourceCommonBase contract; this hook is not
// used by this job (only the multi-topic DataStream path is implemented).
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
// TODO Auto-generated method stub
}
// No-op stub required by the SourceCommonBase contract; the single-topic
// Table-API path is not used by this job.
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
// No-op stub required by the SourceCommonBase contract; the multi-topic
// Table-API path is not used by this job.
@Override
public void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
}
......@@ -20,6 +20,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
......@@ -33,7 +34,7 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.MultipleSourceCommonBase;
import com.flink.common.SourceCommonBase;
import com.flink.config.TableConfig;
import com.flink.enums.AppTypeEnum;
import com.flink.enums.TopicTypeEnum;
......@@ -51,6 +52,7 @@ import com.flink.vo.CollectLogToJsonSource;
import com.flink.vo.EventList;
import com.flink.vo.EventLogToJsonSource;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
import com.flink.vo.OdsEventLog;
import com.flink.vo.UserProperties;
import com.flink.vo.android.deviceInfo.AndroidA1;
......@@ -61,7 +63,7 @@ import com.flink.vo.ios.IosDeviceInfo;
* @version 创建时间:2025-6-5 10:49:50
* 类说明
*/
public class VectorAngleCalculationAchi extends MultipleSourceCommonBase implements Serializable{
public class VectorAngleCalculationAchi extends SourceCommonBase implements Serializable{
/**
......@@ -72,7 +74,7 @@ public class VectorAngleCalculationAchi extends MultipleSourceCommonBase impleme
private static final Logger logger = LoggerFactory.getLogger(VectorAngleCalculationAchi.class);
@Override
public void parseSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
public void parseMultipleSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
DataStreamSource<String> collectLogStreamSource = null;
DataStreamSource<String> eventLogStreamSource = null;
if(CollectionUtils.isNotEmpty(dataSourceList)) {
......@@ -830,4 +832,24 @@ public class VectorAngleCalculationAchi extends MultipleSourceCommonBase impleme
}
}
// No-op stub required by the SourceCommonBase contract; this hook is not
// used by this job (only the multi-topic DataStream path is implemented).
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
// TODO Auto-generated method stub
}
// No-op stub required by the SourceCommonBase contract; the single-topic
// Table-API path is not used by this job.
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
// No-op stub required by the SourceCommonBase contract; the multi-topic
// Table-API path is not used by this job.
@Override
public void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
}
\ No newline at end of file
package com.flink.achieve.table;
import java.io.Serializable;
import java.util.List;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.common.SourceCommonBase;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.impl.ParseEventListUDTF;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
/**
 * Table-API variant of the registration-check job: registers the
 * ods_event_log Kafka topic as a Flink SQL table, flattens the JSON
 * event_list / user_properties columns through the ParseEventList UDTF,
 * and prints the joined rows.
 *
 * @author wjs
 * @version created 2025-8-18 15:04:49
 */
public class RegistrationCheckAchi extends SourceCommonBase implements Serializable{
/**
 * Serialization id required by the Serializable contract.
 */
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(RegistrationCheckAchi.class);
// No-op stub: only the multi-topic Table-API path below is implemented.
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
// TODO Auto-generated method stub
}
// No-op stub: the single-topic Table-API path is not used by this job.
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
// No-op stub: the multi-topic DataStream path is not used by this job.
@Override
public void parseMultipleSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
// TODO Auto-generated method stub
}
/**
 * Builds and runs the SQL pipeline.
 * NOTE(review): dataSourceList is ignored — the topic, group id and
 * bootstrap servers are hard-coded in the DDL below; confirm this is
 * intentional before reusing the job.
 */
@Override
public void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv) throws Exception {
// 1. Session configuration: local time zone and default parallelism.
tableEnv.getConfig()
.set("table.local-time-zone", "Asia/Shanghai")
.set("parallelism.default", "4");
// Register the UDTF used in the LATERAL join below.
tableEnv.createTemporarySystemFunction("ParseEventList", ParseEventListUDTF.class);
// 2. Define the Kafka source table: JSON format, 5-second event-time
// watermark on create_time, parse errors silently skipped.
tableEnv.executeSql(
"CREATE TABLE kafka_event_log (" +
" `app_type` STRING, " +
" `unique_id` STRING, " +
" `create_time` TIMESTAMP(3), " +
" `device_id` STRING, " +
" `user_properties` STRING, " +
" `event_list` STRING," +
" `zone_name` STRING, " +
" `strategy_group_id` STRING, " +
" `flume_type` STRING, " +
" `zone_code` STRING, " +
" `app_channel` STRING, " +
" `uid` STRING, " +
" `send_time` TIMESTAMP(3), " +
" `app_key` STRING, " +
" `strategy_version` STRING, " +
" `sdk_version` STRING, " +
" `zone_type` STRING, " +
" `id` STRING, " +
" `user_agent` STRING, " +
" `topic` STRING METADATA FROM 'topic' VIRTUAL, " +
" WATERMARK FOR create_time AS create_time - INTERVAL '5' SECOND" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'ods_event_log'," +
" 'properties.bootstrap.servers' = '168.138.185.142:9092,213.35.103.223:9092,129.150.49.247:9092'," +
" 'properties.group.id' = 'odsEventLog'," +
" 'scan.startup.mode' = 'latest-offset'," +
" 'format' = 'json'," +
" 'json.ignore-parse-errors' = 'true'" +
")"
);
// 3. Flatten event_list/user_properties via the UDTF; the LEFT LATERAL
// join keeps source rows even when the UDTF emits nothing for them.
Table result = tableEnv.sqlQuery(
"SELECT " +
" k.app_type, " +
" k.unique_id, " +
" k.create_time, " +
" k.device_id, " +
" k.zone_name, " +
" k.strategy_group_id, " +
" k.flume_type, " +
" k.zone_code, " +
" k.app_channel, " +
" k.uid, " +
" k.send_time, " +
" k.app_key, " +
" k.strategy_version, " +
" k.sdk_version, " +
" k.zone_type, " +
" k.id, " +
" k.user_agent, " +
" t.r7 AS r7, " +
" t.r8_r2 AS r2, " +
" t.r8_r3 AS r3, " +
" t.r8_s2['cid'] AS cid, " + // direct access into the UDTF's map column
" t.r8_s2['phone'] AS phone, " +
" t.r8_s2['nick'] AS nick, " +
" t.r9 AS r9 " +
"FROM kafka_event_log AS k " +
"LEFT JOIN LATERAL TABLE(ParseEventList(event_list,user_properties)) AS t(r7, r8_r2, r8_r3, r8_r6, r8_s2, r9) ON TRUE"
);
// NOTE(review): print() is a blocking debug sink writing to stdout —
// presumably temporary; replace with a real sink before production use.
result.execute().print();
}
}
package com.flink.common;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.enums.JobTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
/**
* @author wjs
......@@ -17,32 +24,104 @@ import com.flink.enums.TopicTypeEnum;
/**
 * Template base class for Flink jobs that consume from Kafka.
 *
 * <p>Subclasses implement the abstract {@code parse*} hooks; this class owns the
 * environment setup, source wiring and job submission.
 *
 * <p>NOTE(review): this file appears to render a diff in which both the legacy
 * entry point ({@link #handleDataStreamSource}) and its replacements
 * ({@link #executeJob}, {@link #multipleExecuteJob}) coexist, as do
 * {@code sendToSinkKafka} and {@code parseSourceKafkaToSqlTable} — confirm which
 * members survive the commit before relying on this documentation.
 */
public abstract class SourceCommonBase {

    private static final Logger logger = LoggerFactory.getLogger(SourceCommonBase.class);

    /**
     * Legacy single-topic entry point (DataStream API only).
     *
     * <p>NOTE(review): uses {@code EnvironmentSettings.environmentSettings(...)},
     * while the newer entry points below use
     * {@code StreamEnvironmentSettings.createStreamEnv(...)} — presumably this
     * method is superseded; verify callers.
     *
     * @param jobTypeEnum   job identity; its code names the environment, its
     *                      description names the submitted job
     * @param topicTypeEnum Kafka topic + consumer group to read from
     * @throws Exception propagated from environment setup, parsing or execution
     */
    public void handleDataStreamSource(JobTypeEnum jobTypeEnum,TopicTypeEnum topicTypeEnum) throws Exception {
        // 1. Build the stream execution environment.
        StreamExecutionEnvironment env = EnvironmentSettings.environmentSettings(jobTypeEnum.getCode());
        logger.info("1. 环境的设置成功");
        // 2. Wire the Kafka source for the configured topic/group.
        DataStreamSource<String> dataStreamSource = KafkaConnector.sourceKafka(env, topicTypeEnum.getTopic(), topicTypeEnum.getGroup());
        logger.info("2.资源配置文件信息的获取成功");
        // 3. Delegate the ETL on the raw Kafka records to the subclass.
        parseSourceKafkaJson(dataStreamSource);
        logger.info("3.Kafka资源ETL操作成功");
        env.execute(jobTypeEnum.getDescription());
    }

    /**
     * Unified single-topic entry point supporting both the Table API and the
     * DataStream API (either, both, or — NOTE(review) — neither, in which case
     * {@code env.execute} is still called on an empty topology; confirm that is
     * intended).
     *
     * @param jobTypeEnum   job identity (code → environment name, description → job name)
     * @param topicTypeEnum Kafka topic + consumer group to read from
     * @param useTableAPI   when true, create a StreamTableEnvironment and call
     *                      {@link #parseSourceKafkaToSqlTable}
     * @param useStreamAPI  when true, wire a DataStream source and call
     *                      {@link #parseSourceKafkaJson}
     * @throws Exception propagated from setup, parsing or execution
     */
    public void executeJob(JobTypeEnum jobTypeEnum,TopicTypeEnum topicTypeEnum,boolean useTableAPI,boolean useStreamAPI) throws Exception {
        // 1. Build the stream execution environment.
        StreamExecutionEnvironment env = StreamEnvironmentSettings.createStreamEnv(jobTypeEnum.getCode());
        logger.info("1. 环境的设置成功");
        if(useTableAPI) {
            // 2. Table environment on top of the stream environment (Hive catalog disabled).
            StreamTableEnvironment tableEnv = StreamEnvironmentSettings.createTableEnv(env, jobTypeEnum.getCode(),false);
            logger.info("2. table的环境设置成功");
            // 3. Subclass registers the Kafka topic as a SQL table and runs its queries.
            parseSourceKafkaToSqlTable(topicTypeEnum, tableEnv);
            logger.info("3.Kafka资源ETL操作成功");
        }
        if(useStreamAPI){
            // 2. Wire the Kafka source for the configured topic/group.
            DataStreamSource<String> dataStreamSource = KafkaConnector.sourceKafka(env, topicTypeEnum.getTopic(), topicTypeEnum.getGroup());
            logger.info("2.资源配置文件信息的获取成功");
            // 3. Delegate the ETL on the raw Kafka records to the subclass.
            parseSourceKafkaJson(dataStreamSource);
            logger.info("3.Kafka资源ETL操作成功");
        }
        env.execute(jobTypeEnum.getDescription());
    }

    /**
     * Unified multi-topic entry point supporting both the Table API and the
     * DataStream API.
     *
     * @param kafkaTopicList topics (topic + group pairs) to consume; may be
     *                       empty, in which case the stream branch wires nothing
     * @param jobName        job identity (code → environment name, description → job name)
     * @param useTableAPI    when true, call {@link #parseMultipleSourceKafkaToSqlTable}
     * @param useStreamAPI   when true, build one DataStream source per topic and
     *                       call {@link #parseMultipleSourceKafkaJson}
     * @throws Exception propagated from setup, parsing or execution
     */
    public void multipleExecuteJob(List<KafkaTopic> kafkaTopicList,JobTypeEnum jobName,boolean useTableAPI,boolean useStreamAPI) throws Exception {
        // 1. Build the stream execution environment.
        StreamExecutionEnvironment env = StreamEnvironmentSettings.createStreamEnv(jobName.getCode());
        logger.info("1. 环境的设置成功");
        if(useTableAPI) {
            // 2. Table environment on top of the stream environment (Hive catalog disabled).
            StreamTableEnvironment tableEnv = StreamEnvironmentSettings.createTableEnv(env, jobName.getCode(),false);
            logger.info("2. table的环境设置成功");
            // 3. Subclass registers all topics as SQL tables.
            parseMultipleSourceKafkaToSqlTable(kafkaTopicList,tableEnv);
        }
        if(useStreamAPI){
            // 2. One Kafka DataStream source per configured topic, each tagged with its topic name.
            List<KafkaDataSource> dataSourceList = new ArrayList<>();
            if(CollectionUtils.isNotEmpty(kafkaTopicList)) {
                for(KafkaTopic kafkaTopic : kafkaTopicList) {
                    KafkaDataSource kafkaDataSource = new KafkaDataSource();
                    String topic = kafkaTopic.getTopic();
                    String group = kafkaTopic.getGroup();
                    DataStreamSource<String> dataStreamSource = KafkaConnector.sourceKafka(env, topic, group);
                    kafkaDataSource.setDataStreamSource(dataStreamSource);
                    kafkaDataSource.setTopic(topic);
                    dataSourceList.add(kafkaDataSource);
                }
            }
            logger.info("2.资源配置文件信息的获取成功");
            // 3. Delegate the multi-source ETL to the subclass.
            parseMultipleSourceKafkaJson(dataSourceList);
            logger.info("3.Kafka资源ETL操作成功");
        }
        env.execute(jobName.getDescription());
    }

    /**
     * Subclass hook: ETL over a single raw Kafka DataStream (DataStream API path).
     *
     * @param dataStreamSource raw String records from Kafka
     * @throws ParseException on record parse failure
     * @throws Exception      on any other processing failure
     */
    public abstract void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception;

    /**
     * Subclass hook: forward processed data to a downstream Kafka topic.
     *
     * <p>NOTE(review): appears to be the pre-commit sibling of
     * {@link #parseSourceKafkaToSqlTable}; confirm it is still required.
     *
     * @param mStream processed stream to publish
     */
    public abstract void sendToSinkKafka(DataStreamSource<String> mStream);

    /**
     * Subclass hook: register the topic as a Flink SQL table and run queries
     * (Table API path).
     *
     * @param topicTypeEnum topic + group to expose as a table
     * @param tableEnv      table environment to register against
     * @throws ParseException on record parse failure
     * @throws Exception      on any other processing failure
     */
    public abstract void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv) throws ParseException, Exception;

    /**
     * Subclass hook: ETL over multiple Kafka DataStreams (DataStream API path).
     *
     * @param dataSourceList one entry per topic, each carrying its stream and topic name
     * @throws ParseException on record parse failure
     * @throws Exception      on any other processing failure
     */
    public abstract void parseMultipleSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception;

    /**
     * Subclass hook: register multiple topics as Flink SQL tables (Table API path).
     *
     * @param dataSourceList topics to expose as tables
     * @param tableEnv       table environment to register against
     * @throws ParseException on record parse failure
     * @throws Exception      on any other processing failure
     */
    public abstract void parseMultipleSourceKafkaToSqlTable(List<KafkaTopic> dataSourceList, StreamTableEnvironment tableEnv) throws ParseException, Exception;
}
package com.flink.common;
import java.time.Duration;
import java.time.ZoneId;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.util.LoadPropertiesFile;
/**
* @author wjs
* @version 创建时间:2025-4-23 14:36:34
* 类说明 https://www.bookstack.cn/read/flink-1.20-zh/TryFlink.md
*/
/**
 * Factory for the Flink stream and table execution environments used by all jobs.
 *
 * <p>NOTE(review): checkpointing is configured twice — once via {@link Configuration}
 * before the environment is created, and again imperatively on
 * {@code env.getCheckpointConfig()} — with conflicting values (see inline notes).
 * The imperative calls run last and presumably win; this should be consolidated.
 */
public class StreamEnvironmentSettings {

    private static final Logger logger = LoggerFactory.getLogger(StreamEnvironmentSettings.class);

    /**
     * Creates the base DataStream execution environment with RocksDB state
     * backend and filesystem (HDFS) checkpoint storage.
     *
     * @param jobType job code; appended to the configured HDFS base path so each
     *                job checkpoints into its own directory
     * @return a streaming-mode environment with checkpointing enabled
     */
    public static StreamExecutionEnvironment createStreamEnv(String jobType) {
        logger.info("StreamEnvironmentSettings createStreamEnv start jobType:{}",jobType);
        Configuration config = new Configuration();
        // config.setString("parallelism.default", "4");
        // config.setString("taskmanager.numberOfTaskSlots", "16");
        // config.setString("taskmanager.memory.flink.size", "8192m");
        // config.setString("taskmanager.memory.jvm-metaspace.size", "4096m");
        // config.setString("state.backend", "filesystem");
        // Checkpoint directory must be durable storage (e.g. HDFS).
        // config.setString("state.checkpoints.dir", "hdfs://140.245.112.44:8020/user/ck");
        // State backend: RocksDB.
        config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
        // Checkpoint storage: filesystem.
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
        // HDFS base path + per-job suffix.
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, LoadPropertiesFile.getPropertyFileValues("hdfs.url")+jobType);
        // Checkpoint interval — NOTE(review): overridden below by enableCheckpointing(18000)
        // and then setCheckpointInterval(30000); the last call presumably wins. Confirm
        // the intended interval and drop the other two settings.
        config.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(5));
        // Checkpoint timeout (default is 10 minutes) — also overridden below (60s).
        config.set(CheckpointingOptions.CHECKPOINTING_TIMEOUT, Duration.ofMinutes(8));
        // Max concurrent checkpoints — NOTE(review): conflicts with
        // setMaxConcurrentCheckpoints(1) below.
        config.set(CheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 2);
        // Tolerated checkpoint failures (default 0 = none).
        config.set(CheckpointingOptions.TOLERABLE_FAILURE_NUMBER, 3);
        // Unaligned checkpoints speed up barrier propagation under backpressure.
        config.set(CheckpointingOptions.ENABLE_UNALIGNED, true);
        config.set(CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, Duration.ofSeconds(30));
        // Optional: savepoint directory.
        // config.setString("state.savepoints.dir", "hdfs://140.245.112.44:8020/user/savepoints");
        // config.setInteger("rest.port", 8081);
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
        // NOTE(review): disabled here, then re-enabled a few lines down — the
        // disable call looks like leftover and should probably be removed.
        env.getCheckpointConfig().disableCheckpointing();
        // env.setParallelism(4); // adjust parallelism
        //================= runtime checkpoint settings =========================================
        // Enable checkpointing every 18000 ms in EXACTLY_ONCE mode
        // (AT_LEAST_ONCE is the alternative).
        env.enableCheckpointing(18000L, CheckpointingMode.EXACTLY_ONCE);
        // env.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
        // Advanced checkpoint options.
        // Require at least 500 ms between the end of one checkpoint and the start of the next.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // Effective interval/timeout — these override the values set above.
        env.getCheckpointConfig().setCheckpointInterval(30000); // 30 s checkpoint interval
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Only one checkpoint in flight at a time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Tolerate three consecutive checkpoint failures.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3); // allow 3 failures
        // Retain externalized checkpoints after the job is cancelled so the job
        // can later be restored from a chosen checkpoint.
        env.getCheckpointConfig().setExternalizedCheckpointRetention(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
        // Streaming runtime mode.
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        // Unaligned checkpoints (runtime toggle, matching the config flag above).
        env.getCheckpointConfig().enableUnalignedCheckpoints();
        env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30));
        env.disableOperatorChaining();
        logger.info("StreamEnvironmentSettings createStreamEnv end jobType:{}",jobType);
        return env;
    }

    /**
     * Creates a StreamTableEnvironment (streaming mode) on top of the given
     * stream environment, configured for nested-JSON handling.
     *
     * @param env               underlying stream environment
     * @param jobType           job code, used only for logging
     * @param enableHiveCatalog currently unused — the Hive catalog registration
     *                          below is commented out; kept for future use
     * @return configured table environment
     */
    public static StreamTableEnvironment createTableEnv(StreamExecutionEnvironment env, String jobType, boolean enableHiveCatalog) {
        logger.info("StreamEnvironmentSettings createTableEnv start jobType:{}",jobType);
        // Planner settings: streaming mode (inBatchMode for batch).
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        // TableEnvironment tEnv = TableEnvironment.create(settings); // redundant next to StreamTableEnvironment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        TableConfig tableConfig = tableEnv.getConfig();
        // Time zone and state retention.
        // NOTE(review): local time zone is UTC — confirm this matches the event
        // timestamps produced upstream.
        tableConfig.setLocalTimeZone(ZoneId.of("UTC"));
        tableConfig.setIdleStateRetention(Duration.ofHours(48)); // state TTL
        tableConfig.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
        // SQL dialect for nested-JSON support ("HIVE" is the alternative).
        tableConfig.set(TableConfigOptions.TABLE_SQL_DIALECT, "DEFAULT");
        // Merge in production tuning.
        tableConfig.addConfiguration(getProductionConfig());
        // Hive catalog registration (SQL form) — not needed yet.
        // if (enableHiveCatalog) {
        //     tableEnv.executeSql(
        //             "CREATE CATALOG hive_catalog WITH (" +
        //                     " 'type'='hive'," +
        //                     " 'default-database'='default'," +
        //                     " 'hive-conf-dir'='/path/to/hive-conf'" +
        //                     ")"
        //     );
        //     tableEnv.useCatalog("hive_catalog");
        // }
        logger.info("StreamEnvironmentSettings createTableEnv end jobType:{}",jobType);
        return tableEnv;
    }

    /**
     * Production-only tuning: network/process memory sizes and slot count.
     *
     * <p>NOTE(review): these are TaskManager-level options; whether they take
     * effect when merged into a TableConfig (vs. cluster config) should be
     * verified against the deployment mode.
     */
    private static Configuration getProductionConfig() {
        Configuration prodConfig = new Configuration();
        // Memory options use the MemorySize type.
        prodConfig.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("32kb"));
        prodConfig.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("512mb"));
        prodConfig.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("512mb"));
        prodConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(4096L));
        prodConfig.set(TaskManagerOptions.NUM_TASK_SLOTS, 4);
        return prodConfig;
    }
}
\ No newline at end of file
......@@ -19,9 +19,11 @@ public class CommonConsumeBaseProcessor implements JobProcessor{
@Override
public void process() throws Exception {
new CommonConsumeBaseAchi().handleDataStreamSource(
new CommonConsumeBaseAchi().multipleExecuteJob(
createTopicList(),
JobTypeEnum.COMMON_CONSUME_BASE
JobTypeEnum.COMMON_CONSUME_BASE,
false,
true
);
}
......
......@@ -19,9 +19,11 @@ public class DeviceIdLatestProcessor implements JobProcessor{
@Override
public void process() throws Exception {
new DeviceIdLatestAchi().handleDataStreamSource(
new DeviceIdLatestAchi().multipleExecuteJob(
createTopicList(),
JobTypeEnum.DEVICE_ID_CID
JobTypeEnum.DEVICE_ID_CID,
false,
true
);
}
......
......@@ -14,9 +14,11 @@ public class EventIpConvertProcessor implements JobProcessor{
@Override
public void process() throws Exception {
new EventIpConvertAchi().handleDataStreamSource(
new EventIpConvertAchi().executeJob(
JobTypeEnum.EVENT_IP_CONVERT,
TopicTypeEnum.ODS_EVENT_LOG
TopicTypeEnum.ODS_EVENT_LOG,
false,
true
);
}
......
......@@ -19,9 +19,11 @@ public class EventIpLatestProcessor implements JobProcessor{
@Override
public void process() throws Exception {
new EventIpLatestAchi().handleDataStreamSource(
new EventIpLatestAchi().multipleExecuteJob(
createTopicList(),
JobTypeEnum.EVENT_IP_CONVERT_CID
JobTypeEnum.EVENT_IP_CONVERT_CID,
false,
true
);
}
......
package com.flink.processor.impl;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.flink.processor.function.UserPropertiesProcessor;
import com.flink.vo.UserProperties;
/**
* @author wjs
* @version 创建时间:2025-8-18 18:11:07
* 类说明
*/
@FunctionHint(output = @DataTypeHint("ROW<r7 STRING, r8_r2 INT, r8_r3 STRING, r8_r6 STRING, r8_s2 MAP<STRING,STRING>, r9 BIGINT>"))
public class ParseEventListUDTF extends TableFunction<Row>{

    /**
     * Serial version UID (Flink serializes function instances).
     */
    private static final long serialVersionUID = 1L;

    private static final Logger logger = LoggerFactory.getLogger(ParseEventListUDTF.class);

    /**
     * Explodes an event-list JSON array into one output row per event, enriching
     * each row with selected user properties (cid / phone / nick).
     *
     * <p>Fixes vs. previous version: the try/catch now wraps each array element
     * instead of the whole loop, so one malformed event no longer silently drops
     * every event after it; elements missing "r8" or "r9" are skipped instead of
     * triggering an NPE; exceptions are logged with their stack trace rather than
     * only {@code e.toString()}.
     *
     * @param jsonStr         JSON array string; each element is expected to carry
     *                        "r7", "r9" and a nested "r8" object with "r2", "r3",
     *                        "r6" — TODO confirm against the producer's schema
     * @param user_properties raw user-properties payload, parsed via
     *                        {@code UserPropertiesProcessor.userPropertiesToJson}
     */
    public void eval(String jsonStr,String user_properties) {
        // Nothing to emit when either input is absent (UDTF simply yields no rows).
        if (StringUtils.isEmpty(jsonStr) || StringUtils.isEmpty(user_properties)) {
            return;
        }
        UserProperties userProps;
        JSONArray eventArray;
        try {
            userProps = UserPropertiesProcessor.userPropertiesToJson(user_properties);
            eventArray = JSON.parseArray(jsonStr);
        } catch (Exception e) {
            logger.error("ParseEventListUDTF eval failed to parse inputs", e);
            return;
        }
        if (userProps == null || eventArray == null) {
            return;
        }
        for (int i = 0; i < eventArray.size(); i++) {
            try {
                JSONObject event = eventArray.getJSONObject(i);
                if (event == null) {
                    continue;
                }
                JSONObject r8 = event.getJSONObject("r8");
                Long r9 = event.getLong("r9");
                // Skip elements missing mandatory parts instead of NPE-ing
                // (previously this aborted the remaining elements too).
                if (r8 == null || r9 == null) {
                    logger.warn("ParseEventListUDTF eval skipping element {}: missing r8/r9", i);
                    continue;
                }
                String r7 = event.getString("r7");
                int r8_r2 = r8.getIntValue("r2");
                String r8_r3 = r8.getString("r3");
                String r6 = r8.getString("r6");
                // Fresh map per row — downstream operators may retain the Row.
                Map<String, String> userParams = new HashMap<>();
                userParams.put("cid", userProps.getCid());
                userParams.put("phone", userProps.getPhone());
                userParams.put("nick", userProps.getNick());
                // TODO real parsing of r8.s2 (array of single-key objects) is still
                // disabled for testing; userParams above is the stand-in payload.
                collect(Row.of(
                        r7,
                        r8_r2,
                        r8_r3,
                        r6,
                        userParams,
                        r9.longValue()
                ));
            } catch (Exception e) {
                logger.error("ParseEventListUDTF eval failed on element {}", i, e);
            }
        }
    }
}
......@@ -19,9 +19,11 @@ public class RegistrationCheckProcessor implements JobProcessor{
@Override
public void process() throws Exception {
new RegistrationCheckAchi().handleDataStreamSource(
new RegistrationCheckAchi().multipleExecuteJob(
createTopicList(),
JobTypeEnum.REGISTRATION_CHECK
JobTypeEnum.REGISTRATION_CHECK,
false,
true
);
}
......
......@@ -19,9 +19,11 @@ public class SimiFriendsProcessor implements JobProcessor{
@Override
public void process() throws Exception {
new SimiFriendsAchi().handleDataStreamSource(
new SimiFriendsAchi().multipleExecuteJob(
createTopicList(),
JobTypeEnum.SIMI_FRIENDS
JobTypeEnum.SIMI_FRIENDS,
false,
true
);
}
......
......@@ -14,9 +14,11 @@ public class SimiGroupstProcessor implements JobProcessor{
@Override
public void process() throws Exception {
new SimiGroupstAchi().handleDataStreamSource(
new SimiGroupstAchi().executeJob(
JobTypeEnum.SIMI_GROUPS,
TopicTypeEnum.OPEN_SIMI_API
TopicTypeEnum.OPEN_SIMI_API,
false,
true
);
}
......
......@@ -19,9 +19,11 @@ public class UserDailyActivityProcessor implements JobProcessor{
@Override
public void process() throws Exception {
new UserDailyActivityAchi().handleDataStreamSource(
new UserDailyActivityAchi().multipleExecuteJob(
createTopicList(),
JobTypeEnum.USER_DAILY_ACTIVITY
JobTypeEnum.USER_DAILY_ACTIVITY,
false,
true
);
}
......
......@@ -19,9 +19,11 @@ public class VectorAngleCalculationProcessor implements JobProcessor{
@Override
public void process() throws Exception {
new VectorAngleCalculationAchi().handleDataStreamSource(
new VectorAngleCalculationAchi().multipleExecuteJob(
createTopicList(),
JobTypeEnum.VECTOR_ANGLE_CALCULATION
JobTypeEnum.VECTOR_ANGLE_CALCULATION,
false,
true
);
}
......
......@@ -3,11 +3,16 @@ package com.flink.vo;
import java.io.Serializable;
import java.util.List;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-2-26 17:00:37
* 类说明
*/
@Data
@ToString
public class Properties implements Serializable{
/**
......@@ -20,6 +25,7 @@ public class Properties implements Serializable{
private String r4;
private String r5;
private List<String> r6;
private String s2;
private String data;
private Long startTime;
......@@ -28,84 +34,4 @@ public class Properties implements Serializable{
private String userId;
private String id;
private String type;
public String getR1() {
return r1;
}
public void setR1(String r1) {
this.r1 = r1;
}
public String getR2() {
return r2;
}
public void setR2(String r2) {
this.r2 = r2;
}
public String getR3() {
return r3;
}
public void setR3(String r3) {
this.r3 = r3;
}
public String getR4() {
return r4;
}
public void setR4(String r4) {
this.r4 = r4;
}
public String getR5() {
return r5;
}
public void setR5(String r5) {
this.r5 = r5;
}
public List<String> getR6() {
return r6;
}
public void setR6(List<String> r6) {
this.r6 = r6;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
public Long getStartTime() {
return startTime;
}
public void setStartTime(Long startTime) {
this.startTime = startTime;
}
public String getTimeDifference() {
return timeDifference;
}
public void setTimeDifference(String timeDifference) {
this.timeDifference = timeDifference;
}
public Long getEndTime() {
return endTime;
}
public void setEndTime(Long endTime) {
this.endTime = endTime;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-8-14 17:35:07
* 类说明
*/
@Data
@ToString
public class RealStakingEvent implements Serializable{
    /**
     * Serial version UID — the VO is Serializable so it can travel through
     * Flink operators/state.
     */
    private static final long serialVersionUID = 1L;
    // NOTE(review): every field is kept as String — presumably the raw values as
    // delivered by the upstream chain-event feed (block_height, timestamps and
    // amounts included); confirm before introducing numeric parsing.
    // Field names are snake_case, most likely to match the source JSON keys —
    // do not rename without adjusting the (de)serialization mapping.
    private String event_id;         // event identifier
    private String tx_hash;          // transaction hash
    private String block_height;     // block height (string-encoded)
    private String receipt_id;       // receipt identifier
    private String block_timestamp;  // block timestamp (string-encoded)
    private String from_account_id;  // sender account
    private String to_account_id;    // receiver account
    private String standard;         // event standard identifier
    private String event;            // event name/type
    private String version;          // standard/event version
    private String amount;           // staked amount (string-encoded)
    private String claim_amount;     // claimed amount (string-encoded)
    private String fee_amount;       // fee amount (string-encoded)
    private String create_time;      // record creation time
    private String vault_id;         // vault identifier
    private String post_time;        // posting time
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment