Commit d81d9d04 by 魏建枢

用户邀请相关ETL处理

parent 39eaeab9
package com.flink.achieve.doris;
import java.io.Serializable;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.SourceCommonBase;
import com.flink.vo.RealBalance;
/**
* @author wjs
......@@ -24,10 +41,73 @@ public class RealBalanceAchi extends SourceCommonBase implements Serializable{
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
    // ================= sink column configuration =================
    // Column names/types must match the Doris table bi.real_balance, in order.
    String[] fields = {
            "id",
            "account_id",
            "uid",
            "symbol",
            "balance",
            "updated_at"
    };
    DataType[] types = {
            DataTypes.INT(),    // id
            DataTypes.STRING(), // account_id
            DataTypes.INT(),    // uid
            DataTypes.STRING(), // symbol
            DataTypes.DOUBLE(), // balance
            DataTypes.STRING()  // updated_at
    };
    // ================= streaming pipeline =================
    String tableName = "bi.real_balance";
    DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);
    SingleOutputStreamOperator<RowData> rowDataStream = dataStreamSource.flatMap(
            new FlatMapFunction<String, RowData>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void flatMap(String value, Collector<RowData> out) throws Exception {
                    try {
                        // Parse the raw Kafka message; null/empty means "not for this job".
                        List<RealBalance> recordList = handleData(value);
                        if (CollectionUtils.isEmpty(recordList)) {
                            return;
                        }
                        // Convert each record to RowData in the configured column order.
                        for (RealBalance balance : recordList) {
                            GenericRowData row = new GenericRowData(fields.length);
                            row.setField(0, balance.getId());                                   // id: INT
                            row.setField(1, StringData.fromString(balance.getAccount_id()));    // account_id: STRING
                            row.setField(2, balance.getUid());                                  // uid: INT
                            row.setField(3, StringData.fromString(balance.getSymbol()));        // symbol: STRING
                            row.setField(4, balance.getBalance());                              // balance: DOUBLE
                            row.setField(5, StringData.fromString(balance.getUpdated_at()));    // updated_at: STRING
                            out.collect(row);
                        }
                    } catch (Exception e) {
                        // Pass the throwable as the last SLF4J argument so the full stack
                        // trace is logged; e.getMessage() alone may be null and drops the cause.
                        logger.error("RealBalanceAchi 处理 Kafka 消息出错 | rawData:{}", value, e);
                    }
                }
            });
    rowDataStream.sinkTo(dorisSink).name("Doris-RealBalance");
}
/**
 * ETL step: parse a raw Kafka/Flume envelope and extract RealBalance records.
 *
 * @param record raw JSON message; expected shape {"flume_type": "...", "data": "[...]"}
 * @return the parsed record list, or {@code null} when the message is not of type
 *         "realBalance" (callers use CollectionUtils.isEmpty, which is null-safe)
 */
public static List<RealBalance> handleData(String record) throws ParseException, Exception {
    // Fixed copy-paste log tag: this class is RealBalanceAchi, not RealKycAchi.
    logger.info("RealBalanceAchi record:{}", record);
    JSONObject jsonObj = JSON.parseObject(record);
    String flumeType = jsonObj.getString("flume_type");
    String bodyStr = jsonObj.getString("data");
    // Only messages tagged "realBalance" belong to this pipeline.
    if (!StringUtils.equals(flumeType, "realBalance")) {
        return null;
    }
    logger.info("组装数据 body:{}", bodyStr);
    return JSONObject.parseObject(bodyStr, new TypeReference<List<RealBalance>>() {});
}
@Override
public void sendToSinkKafka(DataStreamSource<String> mStream) {
// TODO Auto-generated method stub
......
package com.flink.achieve.doris;
import java.io.Serializable;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.SourceCommonBase;
import com.flink.vo.RealKyc;
/**
* @author wjs
......@@ -25,8 +42,79 @@ public class RealKycAchi extends SourceCommonBase implements Serializable{
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
    // ================= sink column configuration =================
    // Column names/types must match the target Doris table, in order.
    String[] fields = {
            "id",
            "kind",
            "procedure_verdict",
            "admin_verdict",
            "admin",
            "memo",
            "created_at",
            "updated_at"
    };
    DataType[] types = {
            DataTypes.INT(),    // id
            DataTypes.STRING(), // kind
            DataTypes.STRING(), // procedure_verdict
            DataTypes.STRING(), // admin_verdict
            DataTypes.STRING(), // admin
            DataTypes.STRING(), // memo
            DataTypes.STRING(), // created_at
            DataTypes.STRING()  // updated_at
    };
    // ================= streaming pipeline =================
    // NOTE(review): this is the KYC job but it targets "bi.real_transaction";
    // the column list above is KYC-shaped. Looks like a copy-paste from
    // RealTransactionAchi — confirm the intended table (bi.real_kyc?).
    String tableName = "bi.real_transaction";
    DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);
    SingleOutputStreamOperator<RowData> rowDataStream = dataStreamSource.flatMap(
            new FlatMapFunction<String, RowData>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void flatMap(String value, Collector<RowData> out) throws Exception {
                    try {
                        // Parse the raw Kafka message; null/empty means "not for this job".
                        List<RealKyc> recordList = handleData(value);
                        if (CollectionUtils.isEmpty(recordList)) {
                            return;
                        }
                        // Convert each RealKyc to RowData in the configured column order.
                        for (RealKyc kyc : recordList) {
                            GenericRowData row = new GenericRowData(fields.length);
                            row.setField(0, kyc.getId());                                           // id: INT
                            row.setField(1, StringData.fromString(kyc.getKind()));                  // kind: STRING
                            row.setField(2, StringData.fromString(kyc.getProcedure_verdict()));     // procedure_verdict: STRING
                            row.setField(3, StringData.fromString(kyc.getAdmin_verdict()));         // admin_verdict: STRING
                            row.setField(4, StringData.fromString(kyc.getAdmin()));                 // admin: STRING
                            row.setField(5, StringData.fromString(kyc.getMemo()));                  // memo: STRING
                            row.setField(6, StringData.fromString(kyc.getCreated_at()));            // created_at: STRING
                            row.setField(7, StringData.fromString(kyc.getUpdated_at()));            // updated_at: STRING
                            out.collect(row);
                        }
                    } catch (Exception e) {
                        // Pass the throwable as the last SLF4J argument so the full stack
                        // trace is logged; e.getMessage() alone may be null and drops the cause.
                        logger.error("RealKycAchi 处理 Kafka 消息出错 | rawData:{}", value, e);
                    }
                }
            });
    rowDataStream.sinkTo(dorisSink).name("Doris-RealKyc");
}
/**
 * ETL step: unwrap a Kafka/Flume envelope into a list of RealKyc records.
 * Returns null for messages whose flume_type is not "realKyc"
 * (callers treat null and empty lists alike via CollectionUtils.isEmpty).
 */
public static List<RealKyc> handleData(String record) throws ParseException, Exception {
    logger.info("RealKycAchi record:{}", record);
    final JSONObject envelope = JSON.parseObject(record);
    final String messageType = envelope.getString("flume_type");
    final String payload = envelope.getString("data");
    if (StringUtils.equals(messageType, "realKyc")) {
        logger.info("组装数据 body:{}", payload);
        return JSONObject.parseObject(payload, new TypeReference<List<RealKyc>>() {});
    }
    return null;
}
@Override
......
......@@ -115,16 +115,16 @@ public class RealTransactionAchi extends SourceCommonBase implements Serializabl
out.collect(row);
}
} catch (Exception e) {
logger.error("处理 Kafka 消息出错:{}", value, e);
logger.error("RealTransactionAchi 处理 Kafka 消息出错 | rawData:{} | error:{}", value, e.getMessage());
}
}
});
rowDataStream.sinkTo(dorisSink);
rowDataStream.sinkTo(dorisSink).name("Doris-RealTransaction");
}
public static List<RealTransaction> handleData(String record) throws ParseException, Exception {
logger.info("RealTransactionAchi record:{}",record);
// TODO 数据的 ETL 处理
//数据的 ETL 处理
JSONObject jsonObj = JSON.parseObject(record);
String flumeType = jsonObj.getString("flume_type");
String bodyStr = jsonObj.getString("data");
......
package com.flink.achieve.doris;
import java.io.Serializable;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.SourceCommonBase;
import com.flink.vo.RealUsers;
/**
* @author wjs
......@@ -25,10 +42,96 @@ public class RealUsersAchi extends SourceCommonBase implements Serializable{
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
    // ================= sink column configuration =================
    // Column names/types must match the Doris table bi.real_users, in order.
    String[] fields = {
            "id",
            "phone_number",
            "email",
            "leader",
            "leader_id",
            "kind",
            "login_pwd_hash",
            "answer_indexes",
            "main_account",
            "device_manage_state",
            "state",
            "token_version",
            "updated_at",
            "created_at"
    };
    DataType[] types = {
            DataTypes.INT(),    // id
            DataTypes.STRING(), // phone_number
            DataTypes.STRING(), // email
            DataTypes.STRING(), // leader
            DataTypes.INT(),    // leader_id
            DataTypes.STRING(), // kind
            DataTypes.STRING(), // login_pwd_hash
            DataTypes.STRING(), // answer_indexes
            DataTypes.STRING(), // main_account
            DataTypes.STRING(), // device_manage_state
            DataTypes.STRING(), // state
            DataTypes.STRING(), // token_version
            DataTypes.STRING(), // updated_at
            DataTypes.STRING()  // created_at
    };
    // ================= streaming pipeline =================
    String tableName = "bi.real_users";
    DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);
    SingleOutputStreamOperator<RowData> rowDataStream = dataStreamSource.flatMap(
            new FlatMapFunction<String, RowData>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void flatMap(String value, Collector<RowData> out) throws Exception {
                    try {
                        // Parse the raw Kafka message; null/empty means "not for this job".
                        List<RealUsers> usersList = handleData(value);
                        if (CollectionUtils.isEmpty(usersList)) {
                            return;
                        }
                        // Convert each RealUsers to RowData in the configured column order.
                        for (RealUsers user : usersList) {
                            GenericRowData row = new GenericRowData(fields.length);
                            row.setField(0, user.getId());
                            row.setField(1, StringData.fromString(user.getPhone_number()));
                            row.setField(2, StringData.fromString(user.getEmail()));
                            row.setField(3, StringData.fromString(user.getLeader()));
                            row.setField(4, user.getLeader_id());
                            row.setField(5, StringData.fromString(user.getKind()));
                            row.setField(6, StringData.fromString(user.getLogin_pwd_hash()));
                            row.setField(7, StringData.fromString(user.getAnswer_indexes()));
                            row.setField(8, StringData.fromString(user.getMain_account()));
                            row.setField(9, StringData.fromString(user.getDevice_manage_state()));
                            row.setField(10, StringData.fromString(user.getState()));
                            row.setField(11, StringData.fromString(user.getToken_version()));
                            row.setField(12, StringData.fromString(user.getUpdated_at()));
                            row.setField(13, StringData.fromString(user.getCreated_at()));
                            out.collect(row);
                        }
                    } catch (Exception e) {
                        // Pass the throwable as the last SLF4J argument so the full stack
                        // trace is logged; e.getMessage() alone may be null and drops the cause.
                        logger.error("RealUsersAchi 处理 Kafka 消息出错 | rawData:{}", value, e);
                    }
                }
            });
    // Renamed from "Doris-UserSink" to follow the "Doris-Real<Entity>" convention
    // used by the sibling jobs (Doris-RealBalance, Doris-RealKyc, Doris-RealTransaction).
    rowDataStream.sinkTo(dorisSink).name("Doris-RealUsers");
}
/**
 * ETL step: unwrap a Kafka/Flume envelope into a list of RealUsers records.
 * Returns null for messages whose flume_type is not "realUsers"
 * (callers treat null and empty lists alike via CollectionUtils.isEmpty).
 */
public static List<RealUsers> handleData(String record) throws ParseException, Exception {
    logger.info("RealUsersAchi record:{}", record);
    final JSONObject envelope = JSON.parseObject(record);
    final String messageType = envelope.getString("flume_type");
    final String payload = envelope.getString("data");
    if (StringUtils.equals(messageType, "realUsers")) {
        logger.info("组装数据 body:{}", payload);
        return JSONObject.parseObject(payload, new TypeReference<List<RealUsers>>() {});
    }
    return null;
}
@Override
public void sendToSinkKafka(DataStreamSource<String> mStream) {
// TODO Auto-generated method stub
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment