Commit 6c32ea48 by 魏建枢

配置调整

parent 1c981f87
...@@ -49,7 +49,7 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable ...@@ -49,7 +49,7 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable
//=================配置入库字段========================================= //=================配置入库字段=========================================
String[] fields = {"id", "ip", "area_name", "device_id", "cid", "phone", "nick","create_time","dt","__DORIS_DELETE_SIGN__"}; String[] fields = {"id", "ip", "area_name", "device_id", "cid", "phone", "nick","create_time","dt","__DORIS_DELETE_SIGN__"};
DataType[] types = {DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataType[] types = {DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),DataTypes.STRING(),DataTypes.TIMESTAMP()}; DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),DataTypes.STRING(),DataTypes.TIMESTAMP(),DataTypes.INT()};
//=================流式处理========================================= //=================流式处理=========================================
String tableName = "bi.event_ip_convert"; String tableName = "bi.event_ip_convert";
DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName); DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);
...@@ -61,7 +61,7 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable ...@@ -61,7 +61,7 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable
if(null == jsonObj) { if(null == jsonObj) {
return null; return null;
} }
GenericRowData row = new GenericRowData(9); GenericRowData row = new GenericRowData(fields.length);
DateTimeFormatter formatter = DateTimeFormatter.ISO_INSTANT; DateTimeFormatter formatter = DateTimeFormatter.ISO_INSTANT;
System.out.println("value" + value); System.out.println("value" + value);
...@@ -75,6 +75,7 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable ...@@ -75,6 +75,7 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable
row.setField(6, StringData.fromString((String) jsonObj.get("nick"))); row.setField(6, StringData.fromString((String) jsonObj.get("nick")));
row.setField(7, StringData.fromString((String) jsonObj.get("createTime"))); row.setField(7, StringData.fromString((String) jsonObj.get("createTime")));
row.setField(8, TimestampData.fromInstant(Instant.now())); row.setField(8, TimestampData.fromInstant(Instant.now()));
row.setField(9, 0);
return (RowData)row; return (RowData)row;
} catch (Exception e) { } catch (Exception e) {
System.err.println("解析失败: "+e.toString()); System.err.println("解析失败: "+e.toString());
......
...@@ -49,7 +49,8 @@ public class RealBalanceAchi extends SourceCommonBase implements Serializable{ ...@@ -49,7 +49,8 @@ public class RealBalanceAchi extends SourceCommonBase implements Serializable{
"uid", "uid",
"symbol", "symbol",
"balance", "balance",
"updated_at" "updated_at",
"__DORIS_DELETE_SIGN__"
}; };
DataType[] types = { DataType[] types = {
DataTypes.STRING(), DataTypes.STRING(),
...@@ -57,7 +58,8 @@ public class RealBalanceAchi extends SourceCommonBase implements Serializable{ ...@@ -57,7 +58,8 @@ public class RealBalanceAchi extends SourceCommonBase implements Serializable{
DataTypes.INT(), DataTypes.INT(),
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.DOUBLE(), DataTypes.DOUBLE(),
DataTypes.STRING() DataTypes.STRING(),
DataTypes.INT()
}; };
//=================流式处理========================================= //=================流式处理=========================================
String tableName = "bi.real_balance"; String tableName = "bi.real_balance";
...@@ -84,6 +86,7 @@ public class RealBalanceAchi extends SourceCommonBase implements Serializable{ ...@@ -84,6 +86,7 @@ public class RealBalanceAchi extends SourceCommonBase implements Serializable{
row.setField(3, StringData.fromString(balance.getSymbol())); // symbol: STRING row.setField(3, StringData.fromString(balance.getSymbol())); // symbol: STRING
row.setField(4, balance.getBalance()); // balance: DOUBLE row.setField(4, balance.getBalance()); // balance: DOUBLE
row.setField(5, StringData.fromString(balance.getUpdated_at())); // updated_at: STRING row.setField(5, StringData.fromString(balance.getUpdated_at())); // updated_at: STRING
row.setField(6, 0);
out.collect(row); out.collect(row);
} }
} catch (Exception e) { } catch (Exception e) {
......
...@@ -52,7 +52,8 @@ public class RealKycAchi extends SourceCommonBase implements Serializable{ ...@@ -52,7 +52,8 @@ public class RealKycAchi extends SourceCommonBase implements Serializable{
"admin", "admin",
"memo", "memo",
"created_at", "created_at",
"updated_at" "updated_at",
"__DORIS_DELETE_SIGN__"
}; };
DataType[] types = { DataType[] types = {
DataTypes.STRING(), DataTypes.STRING(),
...@@ -62,7 +63,8 @@ public class RealKycAchi extends SourceCommonBase implements Serializable{ ...@@ -62,7 +63,8 @@ public class RealKycAchi extends SourceCommonBase implements Serializable{
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING() DataTypes.STRING(),
DataTypes.INT()
}; };
//=================流式处理========================================= //=================流式处理=========================================
String tableName = "bi.real_kyc"; String tableName = "bi.real_kyc";
...@@ -92,6 +94,7 @@ public class RealKycAchi extends SourceCommonBase implements Serializable{ ...@@ -92,6 +94,7 @@ public class RealKycAchi extends SourceCommonBase implements Serializable{
row.setField(5, StringData.fromString(kyc.getMemo())); // memo: STRING row.setField(5, StringData.fromString(kyc.getMemo())); // memo: STRING
row.setField(6, StringData.fromString(kyc.getCreated_at())); // created_at: STRING row.setField(6, StringData.fromString(kyc.getCreated_at())); // created_at: STRING
row.setField(7, StringData.fromString(kyc.getUpdated_at())); // updated_at: STRING row.setField(7, StringData.fromString(kyc.getUpdated_at())); // updated_at: STRING
row.setField(8, 0);
out.collect(row); out.collect(row);
} }
} catch (Exception e) { } catch (Exception e) {
......
...@@ -59,7 +59,8 @@ public class RealTransactionAchi extends SourceCommonBase implements Serializabl ...@@ -59,7 +59,8 @@ public class RealTransactionAchi extends SourceCommonBase implements Serializabl
"fee_mt", "fee_mt",
"fee_amount", "fee_amount",
"updated_at", "updated_at",
"created_at" "created_at",
"__DORIS_DELETE_SIGN__"
}; };
DataType[] types = { DataType[] types = {
DataTypes.STRING(), DataTypes.STRING(),
...@@ -76,7 +77,8 @@ public class RealTransactionAchi extends SourceCommonBase implements Serializabl ...@@ -76,7 +77,8 @@ public class RealTransactionAchi extends SourceCommonBase implements Serializabl
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING() DataTypes.STRING(),
DataTypes.INT()
}; };
//=================流式处理========================================= //=================流式处理=========================================
String tableName = "bi.real_transaction"; String tableName = "bi.real_transaction";
...@@ -113,6 +115,7 @@ public class RealTransactionAchi extends SourceCommonBase implements Serializabl ...@@ -113,6 +115,7 @@ public class RealTransactionAchi extends SourceCommonBase implements Serializabl
row.setField(12, StringData.fromString(transaction.getFee_amount())); // fee_amount: STRING row.setField(12, StringData.fromString(transaction.getFee_amount())); // fee_amount: STRING
row.setField(13, StringData.fromString(transaction.getUpdated_at())); // updated_at: STRING row.setField(13, StringData.fromString(transaction.getUpdated_at())); // updated_at: STRING
row.setField(14, StringData.fromString(transaction.getCreated_at())); // created_at: STRING row.setField(14, StringData.fromString(transaction.getCreated_at())); // created_at: STRING
row.setField(15, 0);
out.collect(row); out.collect(row);
} }
} catch (Exception e) { } catch (Exception e) {
......
...@@ -58,7 +58,8 @@ public class RealUsersAchi extends SourceCommonBase implements Serializable{ ...@@ -58,7 +58,8 @@ public class RealUsersAchi extends SourceCommonBase implements Serializable{
"state", "state",
"token_version", "token_version",
"updated_at", "updated_at",
"created_at" "created_at",
"__DORIS_DELETE_SIGN__"
}; };
DataType[] types = { DataType[] types = {
...@@ -75,7 +76,8 @@ public class RealUsersAchi extends SourceCommonBase implements Serializable{ ...@@ -75,7 +76,8 @@ public class RealUsersAchi extends SourceCommonBase implements Serializable{
DataTypes.STRING(), // state DataTypes.STRING(), // state
DataTypes.STRING(), // token_version DataTypes.STRING(), // token_version
DataTypes.STRING(), // updated_at DataTypes.STRING(), // updated_at
DataTypes.STRING() // created_at DataTypes.STRING(), // created_at
DataTypes.INT()
}; };
//=================流式处理========================================= //=================流式处理=========================================
String tableName = "bi.real_users"; String tableName = "bi.real_users";
...@@ -107,7 +109,7 @@ public class RealUsersAchi extends SourceCommonBase implements Serializable{ ...@@ -107,7 +109,7 @@ public class RealUsersAchi extends SourceCommonBase implements Serializable{
row.setField(11, StringData.fromString(user.getToken_version())); row.setField(11, StringData.fromString(user.getToken_version()));
row.setField(12, StringData.fromString(user.getUpdated_at())); row.setField(12, StringData.fromString(user.getUpdated_at()));
row.setField(13, StringData.fromString(user.getCreated_at())); row.setField(13, StringData.fromString(user.getCreated_at()));
row.setField(14, 0);
out.collect(row); out.collect(row);
} }
} catch (Exception e) { } catch (Exception e) {
......
...@@ -27,6 +27,7 @@ public class DorisConnector { ...@@ -27,6 +27,7 @@ public class DorisConnector {
streamLoadProps.setProperty("read_json_by_line", "true"); streamLoadProps.setProperty("read_json_by_line", "true");
streamLoadProps.setProperty("strip_outer_array", "false"); streamLoadProps.setProperty("strip_outer_array", "false");
streamLoadProps.setProperty("sink.enable-2pc", "true"); streamLoadProps.setProperty("sink.enable-2pc", "true");
streamLoadProps.setProperty("sink.enable-delete", "true");
//=================Doris Sink 配置========================================= //=================Doris Sink 配置=========================================
DorisOptions dorisOptions = DorisOptions.builder() DorisOptions dorisOptions = DorisOptions.builder()
.setFenodes(LoadPropertiesFile.getPropertyFileValues("doris.fe")) .setFenodes(LoadPropertiesFile.getPropertyFileValues("doris.fe"))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment