Commit 2ac7a1b0 by 魏建枢

代码提交

parent 652a8b37
......@@ -59,16 +59,25 @@ public class CommunityHistoryAchi implements Serializable {
// 初始化表配置
TableConfig communityHistoryConfig = new TableConfig(COMMUNITY_HISTORY_FIELDS, COMMUNITY_HISTORY_TYPES,
"bi.simi_community_history");
TableConfig aiCommunityHistoryConfig = new TableConfig(COMMUNITY_HISTORY_FIELDS, COMMUNITY_HISTORY_TYPES,
"ai.simi_community_history");
// 创建Doris Sink
DorisSink<RowData> dorisCommunityHistorySink = DorisConnector.sinkDoris(communityHistoryConfig.getFields(),
communityHistoryConfig.getTypes(), communityHistoryConfig.getTableName());
DorisSink<RowData> dorisAiCommunityHistorySink = DorisConnector.sinkDoris(aiCommunityHistoryConfig.getFields(),
aiCommunityHistoryConfig.getTypes(), aiCommunityHistoryConfig.getTableName());
processDataStream(dataStreamSource, "communityHistory", communityHistoryConfig, dorisCommunityHistorySink,
(RowMapper<CommunityHistory>) CommunityHistoryAchi::mapToCommunityHistoryRow);
processDataStream(dataStreamSource, "communityHistoryAbroad", communityHistoryConfig, dorisCommunityHistorySink,
(RowMapper<CommunityHistory>) CommunityHistoryAchi::mapToCommunityHistoryAbroadRow);
processDataStream(dataStreamSource, "communityHistory", aiCommunityHistoryConfig, dorisAiCommunityHistorySink,
(RowMapper<CommunityHistory>) CommunityHistoryAchi::mapToCommunityHistoryRow);
processDataStream(dataStreamSource, "communityHistoryAbroad", aiCommunityHistoryConfig, dorisAiCommunityHistorySink,
(RowMapper<CommunityHistory>) CommunityHistoryAchi::mapToCommunityHistoryAbroadRow);
}
/**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment