Commit 1c981f87 by 魏建枢

代码提交

parent 6e4c1e19
...@@ -52,7 +52,7 @@ public class RealBalanceAchi extends SourceCommonBase implements Serializable{ ...@@ -52,7 +52,7 @@ public class RealBalanceAchi extends SourceCommonBase implements Serializable{
"updated_at" "updated_at"
}; };
DataType[] types = { DataType[] types = {
DataTypes.INT(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.INT(), DataTypes.INT(),
DataTypes.STRING(), DataTypes.STRING(),
...@@ -78,7 +78,7 @@ public class RealBalanceAchi extends SourceCommonBase implements Serializable{ ...@@ -78,7 +78,7 @@ public class RealBalanceAchi extends SourceCommonBase implements Serializable{
// 转换为RowData // 转换为RowData
for (RealBalance balance : recordList) { for (RealBalance balance : recordList) {
GenericRowData row = new GenericRowData(fields.length); GenericRowData row = new GenericRowData(fields.length);
row.setField(0, balance.getId()); // id: INT row.setField(0, StringData.fromString(balance.getId())); // id: INT
row.setField(1, StringData.fromString(balance.getAccount_id())); // account_id: STRING row.setField(1, StringData.fromString(balance.getAccount_id())); // account_id: STRING
row.setField(2, balance.getUid()); // uid: INT row.setField(2, balance.getUid()); // uid: INT
row.setField(3, StringData.fromString(balance.getSymbol())); // symbol: STRING row.setField(3, StringData.fromString(balance.getSymbol())); // symbol: STRING
......
...@@ -55,7 +55,7 @@ public class RealKycAchi extends SourceCommonBase implements Serializable{ ...@@ -55,7 +55,7 @@ public class RealKycAchi extends SourceCommonBase implements Serializable{
"updated_at" "updated_at"
}; };
DataType[] types = { DataType[] types = {
DataTypes.INT(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
...@@ -65,7 +65,7 @@ public class RealKycAchi extends SourceCommonBase implements Serializable{ ...@@ -65,7 +65,7 @@ public class RealKycAchi extends SourceCommonBase implements Serializable{
DataTypes.STRING() DataTypes.STRING()
}; };
//=================流式处理========================================= //=================流式处理=========================================
String tableName = "bi.real_transaction"; String tableName = "bi.real_kyc";
DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName); DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);
SingleOutputStreamOperator<RowData> rowDataStream = dataStreamSource.flatMap( SingleOutputStreamOperator<RowData> rowDataStream = dataStreamSource.flatMap(
...@@ -84,7 +84,7 @@ public class RealKycAchi extends SourceCommonBase implements Serializable{ ...@@ -84,7 +84,7 @@ public class RealKycAchi extends SourceCommonBase implements Serializable{
// 将每个 RealKyc 转换为 RowData 并输出 // 将每个 RealKyc 转换为 RowData 并输出
for (RealKyc kyc : recordList) { for (RealKyc kyc : recordList) {
GenericRowData row = new GenericRowData(fields.length); GenericRowData row = new GenericRowData(fields.length);
row.setField(0, kyc.getId()); // id: INT row.setField(0, StringData.fromString(kyc.getId())); // id: INT
row.setField(1, StringData.fromString(kyc.getKind())); // kind: STRING row.setField(1, StringData.fromString(kyc.getKind())); // kind: STRING
row.setField(2, StringData.fromString(kyc.getProcedure_verdict())); // procedure_verdict: STRING row.setField(2, StringData.fromString(kyc.getProcedure_verdict())); // procedure_verdict: STRING
row.setField(3, StringData.fromString(kyc.getAdmin_verdict())); // admin_verdict: STRING row.setField(3, StringData.fromString(kyc.getAdmin_verdict())); // admin_verdict: STRING
......
...@@ -62,7 +62,7 @@ public class RealTransactionAchi extends SourceCommonBase implements Serializabl ...@@ -62,7 +62,7 @@ public class RealTransactionAchi extends SourceCommonBase implements Serializabl
"created_at" "created_at"
}; };
DataType[] types = { DataType[] types = {
DataTypes.INT(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.INT(), DataTypes.INT(),
...@@ -98,7 +98,7 @@ public class RealTransactionAchi extends SourceCommonBase implements Serializabl ...@@ -98,7 +98,7 @@ public class RealTransactionAchi extends SourceCommonBase implements Serializabl
// 将每个 RealTransaction 转换为 RowData 并输出 // 将每个 RealTransaction 转换为 RowData 并输出
for (RealTransaction transaction : recordList) { for (RealTransaction transaction : recordList) {
GenericRowData row = new GenericRowData(fields.length); GenericRowData row = new GenericRowData(fields.length);
row.setField(0, transaction.getId()); // id: INT row.setField(0, StringData.fromString(transaction.getId())); // id: INT
row.setField(1, StringData.fromString(transaction.getSender())); // sender: STRING row.setField(1, StringData.fromString(transaction.getSender())); // sender: STRING
row.setField(2, StringData.fromString(transaction.getReceiver())); // receiver: STRING row.setField(2, StringData.fromString(transaction.getReceiver())); // receiver: STRING
row.setField(3, transaction.getSender_id()); // sender_id: INT row.setField(3, transaction.getSender_id()); // sender_id: INT
......
...@@ -62,7 +62,7 @@ public class RealUsersAchi extends SourceCommonBase implements Serializable{ ...@@ -62,7 +62,7 @@ public class RealUsersAchi extends SourceCommonBase implements Serializable{
}; };
DataType[] types = { DataType[] types = {
DataTypes.INT(), // id DataTypes.STRING(), // id
DataTypes.STRING(), // phone_number DataTypes.STRING(), // phone_number
DataTypes.STRING(), // email DataTypes.STRING(), // email
DataTypes.STRING(), // leader DataTypes.STRING(), // leader
...@@ -93,7 +93,7 @@ public class RealUsersAchi extends SourceCommonBase implements Serializable{ ...@@ -93,7 +93,7 @@ public class RealUsersAchi extends SourceCommonBase implements Serializable{
for (RealUsers user : usersList) { for (RealUsers user : usersList) {
GenericRowData row = new GenericRowData(fields.length); GenericRowData row = new GenericRowData(fields.length);
row.setField(0, user.getId()); row.setField(0, StringData.fromString(user.getId()));
row.setField(1, StringData.fromString(user.getPhone_number())); row.setField(1, StringData.fromString(user.getPhone_number()));
row.setField(2, StringData.fromString(user.getEmail())); row.setField(2, StringData.fromString(user.getEmail()));
row.setField(3, StringData.fromString(user.getLeader())); row.setField(3, StringData.fromString(user.getLeader()));
......
...@@ -16,7 +16,7 @@ public class RealBalance implements Serializable { ...@@ -16,7 +16,7 @@ public class RealBalance implements Serializable {
*/ */
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private Integer id; private String id;
private String account_id; private String account_id;
private Integer uid; private Integer uid;
private String symbol; private String symbol;
......
...@@ -16,7 +16,7 @@ public class RealKyc implements Serializable{ ...@@ -16,7 +16,7 @@ public class RealKyc implements Serializable{
*/ */
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private Integer id; private String id;
private String kind; private String kind;
private String procedure_verdict; private String procedure_verdict;
private String admin_verdict; private String admin_verdict;
......
...@@ -16,7 +16,7 @@ public class RealTransaction implements Serializable{ ...@@ -16,7 +16,7 @@ public class RealTransaction implements Serializable{
*/ */
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private Integer id; private String id;
private String sender; private String sender;
private String receiver; private String receiver;
private Integer sender_id; private Integer sender_id;
......
...@@ -16,7 +16,7 @@ public class RealUsers implements Serializable{ ...@@ -16,7 +16,7 @@ public class RealUsers implements Serializable{
*/ */
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private Integer id; private String id;
private String phone_number; private String phone_number;
private String email; private String email;
private String leader; private String leader;
......
#kafka集群地址 #kafka集群地址
#kafka.bootstrapServers=140.245.125.203:9092 #kafka.bootstrapServers=140.245.125.203:9092
kafka.bootstrapServers=168.138.185.142:9092,213.35.103.223:9092,129.150.49.247:9092 #kafka.bootstrapServers=168.138.185.142:9092,213.35.103.223:9092,129.150.49.247:9092
#kafka.bootstrapServers=10.0.0.29:9092,10.0.0.87:9092,10.0.0.18:9092 kafka.bootstrapServers=10.0.0.29:9092,10.0.0.87:9092,10.0.0.18:9092
#doris.jdbc_url=jdbc:mysql://10.0.0.105 9030/bi?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true #doris.jdbc_url=jdbc:mysql://10.0.0.105 9030/bi?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true
#doris.jdbc_url=jdbc:mysql://140.245.112.44:9030/bi?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true #doris.jdbc_url=jdbc:mysql://140.245.112.44:9030/bi?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true
...@@ -10,4 +10,5 @@ doris.fe=10.0.0.105:8030 ...@@ -10,4 +10,5 @@ doris.fe=10.0.0.105:8030
doris.username=root doris.username=root
doris.driver_class_name=com.mysql.cj.jdbc.Driver doris.driver_class_name=com.mysql.cj.jdbc.Driver
hdfs.url=hdfs://140.245.112.44:8020/user/ck/ hdfs.url=hdfs://10.0.0.105:8020/user/ck/
\ No newline at end of file #hdfs.url=hdfs://140.245.112.44:8020/user/ck/
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment