Commit 39eaeab9 by 魏建枢

Add tests

parent 83a2b89d
package com.flink.achieve.doris;
import java.io.Serializable;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.common.SourceCommonBase;
/**
* @author wjs
* @version Created: 2025-5-8 11:51:13
* Class description
*/
public class RealBalanceAchi extends SourceCommonBase implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(RealBalanceAchi.class);
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
// TODO Auto-generated method stub
}
@Override
public void sendToSinkKafka(DataStreamSource<String> mStream) {
// TODO Auto-generated method stub
}
}
package com.flink.achieve.doris;
import java.io.Serializable;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.common.SourceCommonBase;
/**
* @author wjs
* @version Created: 2025-5-8 11:37:18
* Class description
*/
public class RealKycAchi extends SourceCommonBase implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(RealKycAchi.class);
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
// TODO Auto-generated method stub
}
@Override
public void sendToSinkKafka(DataStreamSource<String> mStream) {
// TODO Auto-generated method stub
}
}
package com.flink.achieve.doris;
import java.io.Serializable;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.SourceCommonBase;
import com.flink.vo.RealTransaction;
/**
* @author wjs
* @version Created: 2025-5-7 16:13:05
* Class description
*/
public class RealTransactionAchi extends SourceCommonBase implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(RealTransactionAchi.class);
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
//================= Configure the sink fields =========================================
String[] fields = {
"id",
"sender",
"receiver",
"sender_id",
"receiver_id",
"symbol",
"amount",
"memo",
"stage",
"tx_type",
"receiver_contact",
"fee_mt",
"fee_amount",
"updated_at",
"created_at"
};
DataType[] types = {
DataTypes.INT(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.STRING(),
DataTypes.DOUBLE(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING()
};
//================= Stream processing =========================================
String tableName = "bi.real_transaction";
DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);
SingleOutputStreamOperator<RowData> rowDataStream = dataStreamSource.flatMap(
new FlatMapFunction<String, RowData>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<RowData> out) throws Exception {
try {
// Parse the Kafka message
List<RealTransaction> recordList = handleData(value);
if (CollectionUtils.isEmpty(recordList)) {
return;
}
// Convert each RealTransaction to RowData and emit it
for (RealTransaction transaction : recordList) {
GenericRowData row = new GenericRowData(fields.length);
row.setField(0, transaction.getId()); // id: INT
row.setField(1, StringData.fromString(transaction.getSender())); // sender: STRING
row.setField(2, StringData.fromString(transaction.getReceiver())); // receiver: STRING
row.setField(3, transaction.getSender_id()); // sender_id: INT
row.setField(4, transaction.getReceiver_id()); // receiver_id: INT
row.setField(5, StringData.fromString(transaction.getSymbol())); // symbol: STRING
row.setField(6, transaction.getAmount()); // amount: DOUBLE
row.setField(7, StringData.fromString(transaction.getMemo())); // memo: STRING
row.setField(8, StringData.fromString(transaction.getStage())); // stage: STRING
row.setField(9, StringData.fromString(transaction.getTx_type())); // tx_type: STRING
row.setField(10, StringData.fromString(transaction.getReceiver_contact())); // receiver_contact: STRING
row.setField(11, StringData.fromString(transaction.getFee_mt())); // fee_mt: STRING
row.setField(12, StringData.fromString(transaction.getFee_amount())); // fee_amount: STRING
row.setField(13, StringData.fromString(transaction.getUpdated_at())); // updated_at: STRING
row.setField(14, StringData.fromString(transaction.getCreated_at())); // created_at: STRING
out.collect(row);
}
} catch (Exception e) {
logger.error("处理 Kafka 消息出错:{}", value, e);
}
}
});
rowDataStream.sinkTo(dorisSink);
}
public static List<RealTransaction> handleData(String record) throws ParseException, Exception {
logger.info("RealTransactionAchi record:{}",record);
// TODO 数据的 ETL 处理
JSONObject jsonObj = JSON.parseObject(record);
String flumeType = jsonObj.getString("flume_type");
String bodyStr = jsonObj.getString("data");
if(!StringUtils.equals(flumeType, "realTransaction")) {
return null;
}
logger.info("组装数据 body:{}",bodyStr);
return JSONObject.parseObject(bodyStr,new TypeReference<List<RealTransaction>>(){});
}
@Override
public void sendToSinkKafka(DataStreamSource<String> mStream) {
// TODO Auto-generated method stub
}
}
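The commit message says tests were added, but no test class appears in this diff. Below is a minimal sketch of a unit test for RealTransactionAchi.handleData, assuming JUnit 5 is available on the test classpath; the class name and sample payloads are illustrative, not part of the commit.
package com.flink.achieve.doris;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.List;
import org.junit.jupiter.api.Test;
import com.flink.vo.RealTransaction;

// Illustrative sketch: exercises the envelope parsing in RealTransactionAchi.handleData.
public class RealTransactionAchiTest {

    @Test
    public void parsesRealTransactionEnvelope() throws Exception {
        // handleData expects an envelope whose "flume_type" acts as a discriminator
        // and whose "data" field holds a JSON array of transactions.
        String message = "{\"flume_type\":\"realTransaction\","
                + "\"data\":[{\"id\":1,\"sender\":\"alice\",\"receiver\":\"bob\","
                + "\"symbol\":\"USD\",\"amount\":10.5}]}";
        List<RealTransaction> result = RealTransactionAchi.handleData(message);
        assertEquals(1, result.size());
        assertEquals("alice", result.get(0).getSender());
    }

    @Test
    public void returnsNullForOtherFlumeTypes() throws Exception {
        // Messages with a different flume_type are dropped: handleData returns null.
        assertNull(RealTransactionAchi.handleData("{\"flume_type\":\"other\",\"data\":[]}"));
    }
}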
package com.flink.achieve.doris;
import java.io.Serializable;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.common.SourceCommonBase;
/**
* @author wjs
* @version Created: 2025-5-8 11:39:07
* Class description
*/
public class RealUsersAchi extends SourceCommonBase implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(RealUsersAchi.class);
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
// TODO Auto-generated method stub
}
@Override
public void sendToSinkKafka(DataStreamSource<String> mStream) {
// TODO Auto-generated method stub
}
}
......@@ -12,6 +12,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.util.LoadPropertiesFile;
/**
* @author wjs
* @version Created: 2025-4-23 14:36:34
......@@ -37,7 +39,7 @@ public class EnvironmentSettings {
// Use the filesystem for checkpoint storage
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
// HDFS path
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs://140.245.112.44:8020/user/ck/"+jobType);
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, LoadPropertiesFile.getPropertyFileValues("hdfs.url")+jobType);
// Checkpoint interval (not enabled by default)
config.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(5));
// Checkpoint timeout (defaults to 10 minutes)
......
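The change above replaces a hardcoded HDFS checkpoint directory with the new hdfs.url property (added to the properties file at the end of this commit). For reference, a minimal sketch of how such a configuration is typically handed to a Flink environment, reusing the same CheckpointingOptions calls shown in the diff; the full EnvironmentSettings class is not visible here, so the class name, placeholder path, and wiring are assumptions.
import java.time.Duration;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Illustrative sketch: filesystem checkpoint storage with a per-job checkpoint directory.
public class CheckpointConfigSketch {

    public static StreamExecutionEnvironment create(String jobType) {
        Configuration config = new Configuration();
        // Store checkpoints on a filesystem (HDFS in this setup).
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
        // In the actual code the base path comes from LoadPropertiesFile.getPropertyFileValues("hdfs.url").
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs://namenode:8020/user/ck/" + jobType);
        // Take a checkpoint every five minutes, as in the diff above.
        config.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(5));
        return StreamExecutionEnvironment.getExecutionEnvironment(config);
    }
}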
......@@ -17,7 +17,11 @@ import java.util.stream.Collectors;
* Class description: job types
*/
public enum JobTypeEnum {
EVENT_IP_CONVERT("JOB_01", "事件IP转换作业")
EVENT_IP_CONVERT("JOB_01", "事件IP转换作业"),
REAL_TRANSACTION("JOB_02", "实际交易作业"),
REAL_KYC("JOB_03", "真实KYC作业"),
REAL_USERS("JOB_04", "真实用户作业"),
REAL_BALANCE("JOB_05", "真实余额作业"),
;
......
......@@ -22,6 +22,7 @@ public enum TopicTypeEnum {
ODS_DEVICE_OVERLAP("ods_device_overlap","odsDeviceOverlap"),
ODS_CID_GROUP_OVERLAP("ods_cid_group_overlap","odsCidGroupOverlap"),
ODS_EVENT_IP_CONVERT("ods_event_ip_convert","odsEventIpConvert"),
ODS_USER_INVITATION("ods_user_invitation","odsUserInvitation"),
;
private String topic;
......
......@@ -3,6 +3,10 @@ package com.flink.factory;
import com.flink.enums.JobTypeEnum;
import com.flink.processor.JobProcessor;
import com.flink.processor.impl.EventIpConvertProcessor;
import com.flink.processor.impl.RealBalanceProcessor;
import com.flink.processor.impl.RealKycProcessor;
import com.flink.processor.impl.RealTransactionProcessor;
import com.flink.processor.impl.RealUsersProcessor;
/**
* @author wjs
......@@ -13,10 +17,18 @@ public class JobProcessorFactory {
public static JobProcessor getProcessor(JobTypeEnum jobType) {
switch (jobType) {
case EVENT_IP_CONVERT:
return new EventIpConvertProcessor();
default:
throw new IllegalArgumentException("未知的Job类型: " + jobType);
case EVENT_IP_CONVERT:
return new EventIpConvertProcessor();
case REAL_TRANSACTION:
return new RealTransactionProcessor();
case REAL_KYC:
return new RealKycProcessor();
case REAL_USERS:
return new RealUsersProcessor();
case REAL_BALANCE:
return new RealBalanceProcessor();
default:
throw new IllegalArgumentException("未知的Job类型: " + jobType);
}
}
......
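For context, a minimal sketch of how the extended factory might be driven from a job entry point; the driver class below is illustrative and not part of this commit.
package com.flink.factory;

import com.flink.enums.JobTypeEnum;
import com.flink.processor.JobProcessor;

// Illustrative sketch: resolve a processor for one of the newly registered job types and run it.
public class JobDriverSketch {

    public static void main(String[] args) throws Exception {
        JobTypeEnum jobType = JobTypeEnum.REAL_TRANSACTION; // e.g. chosen from program arguments
        JobProcessor processor = JobProcessorFactory.getProcessor(jobType);
        processor.process();
    }
}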
package com.flink.processor.impl;
import com.flink.achieve.doris.RealBalanceAchi;
import com.flink.enums.JobTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.JobProcessor;
/**
* @author wjs
* @version Created: 2025-5-8 11:36:20
* Class description
*/
public class RealBalanceProcessor implements JobProcessor{
@Override
public void process() throws Exception {
new RealBalanceAchi().handleDataStreamSource(
JobTypeEnum.REAL_BALANCE.getDescription(),
TopicTypeEnum.ODS_USER_INVITATION.getTopic(),
TopicTypeEnum.ODS_USER_INVITATION.getGroup(),
JobTypeEnum.REAL_BALANCE.getCode()
);
}
}
package com.flink.processor.impl;
import com.flink.achieve.doris.RealKycAchi;
import com.flink.enums.JobTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.JobProcessor;
/**
* @author wjs
* @version Created: 2025-5-8 11:36:02
* Class description
*/
public class RealKycProcessor implements JobProcessor{
@Override
public void process() throws Exception {
new RealKycAchi().handleDataStreamSource(
JobTypeEnum.REAL_KYC.getDescription(),
TopicTypeEnum.ODS_USER_INVITATION.getTopic(),
TopicTypeEnum.ODS_USER_INVITATION.getGroup(),
JobTypeEnum.REAL_KYC.getCode()
);
}
}
package com.flink.processor.impl;
import com.flink.achieve.doris.RealTransactionAchi;
import com.flink.enums.JobTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.JobProcessor;
/**
* @author wjs
* @version Created: 2025-5-7 16:12:34
* Class description
*/
public class RealTransactionProcessor implements JobProcessor{
@Override
public void process() throws Exception {
new RealTransactionAchi().handleDataStreamSource(
JobTypeEnum.REAL_TRANSACTION.getDescription(),
TopicTypeEnum.ODS_USER_INVITATION.getTopic(),
TopicTypeEnum.ODS_USER_INVITATION.getGroup(),
JobTypeEnum.REAL_TRANSACTION.getCode()
);
}
}
package com.flink.processor.impl;
import com.flink.achieve.doris.RealUsersAchi;
import com.flink.enums.JobTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.JobProcessor;
/**
* @author wjs
* @version Created: 2025-5-8 11:36:11
* Class description
*/
public class RealUsersProcessor implements JobProcessor{
@Override
public void process() throws Exception {
new RealUsersAchi().handleDataStreamSource(
JobTypeEnum.REAL_USERS.getDescription(),
TopicTypeEnum.ODS_USER_INVITATION.getTopic(),
TopicTypeEnum.ODS_USER_INVITATION.getGroup(),
JobTypeEnum.REAL_USERS.getCode()
);
}
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
/**
* @author wjs
* @version Created: 2025-5-8 14:15:42
* Class description
*/
@Data
public class RealBalance implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private Integer id;
private String account_id;
private Integer uid;
private String symbol;
private Double balance;
private String updated_at;
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
/**
* @author wjs
* @version Created: 2025-5-8 12:00:10
* Class description
*/
@Data
public class RealKyc implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private Integer id;
private String kind;
private String procedure_verdict;
private String admin_verdict;
private String admin;
private String memo;
private String created_at;
private String updated_at;
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
/**
* @author wjs
* @version Created: 2025-5-8 11:54:40
* Class description
*/
@Data
public class RealTransaction implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private Integer id;
private String sender;
private String receiver;
private Integer sender_id;
private Integer receiver_id;
private String symbol;
private Double amount;
private String memo;
private String stage;
private String tx_type;
private String receiver_contact;
private String fee_mt;
private String fee_amount;
private String updated_at;
private String created_at;
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
/**
* @author wjs
* @version Created: 2025-5-8 12:01:48
* Class description
*/
@Data
public class RealUsers implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private Integer id;
private String phone_number;
private String email;
private String leader;
private Integer leader_id;
private String kind;
private String login_pwd_hash;
private String answer_indexes;
private String main_account;
private String device_manage_state;
private String state;
private String token_version;
private String updated_at;
private String created_at;
}
# Kafka cluster addresses
#kafka.bootstrapServers=140.245.125.203:9092
kafka.bootstrapServers=168.138.185.142:9092,213.35.103.223:9092,129.150.49.247:9092
# Kafka topic
kafka.topic=ods_collect_log
# Kafka consumer group
kafka.group=collectGroup
#kafka.bootstrapServers=10.0.0.29:9092,10.0.0.87:9092,10.0.0.18:9092
#doris.jdbc_url=jdbc:mysql://10.0.0.105:9030/bi?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true
#doris.jdbc_url=jdbc:mysql://140.245.112.44:9030/bi?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true
#doris.fe=140.245.112.44:8030
doris.fe=10.0.0.105:8030
doris.username=root
doris.driver_class_name=com.mysql.cj.jdbc.Driver
\ No newline at end of file
doris.driver_class_name=com.mysql.cj.jdbc.Driver
hdfs.url=hdfs://140.245.112.44:8020/user/ck/
\ No newline at end of file