Commit 775278f7 by 魏建枢

增加社区id,增加海外版

parent 6c6db13e
...@@ -90,20 +90,20 @@ public class UserInvitationAchi implements Serializable { ...@@ -90,20 +90,20 @@ public class UserInvitationAchi implements Serializable {
DataTypes.STRING(), DataTypes.INT() }; DataTypes.STRING(), DataTypes.INT() };
//质押表配置 //质押表配置
private static final String[] REAL_STAKING_FIELDS = { "tx_index", "tx_hash", "block_height", "block_timestamp", "from_account_id", "to_account_id", // private static final String[] REAL_STAKING_FIELDS = { "tx_index", "tx_hash", "block_height", "block_timestamp", "from_account_id", "to_account_id",
"is_relayer", "amount", "symbol","post_time", DORIS_DELETE_SIGN }; // "is_relayer", "amount", "symbol","post_time", DORIS_DELETE_SIGN };
private static final DataType[] REAL_STAKING_TYPES = { DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), // private static final DataType[] REAL_STAKING_TYPES = { DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), // DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(),DataTypes.STRING(), DataTypes.INT() }; // DataTypes.STRING(),DataTypes.STRING(), DataTypes.INT() };
//质押事件表配置 //质押事件表配置
private static final String[] REAL_STAKING_EVENT_FIELDS = { "event_id", "tx_hash", "block_height","receipt_id", "block_timestamp", "from_account_id", "to_account_id", // private static final String[] REAL_STAKING_EVENT_FIELDS = { "event_id", "tx_hash", "block_height","receipt_id", "block_timestamp", "from_account_id", "to_account_id",
"standard", "event", "version","amount","claim_amount","fee_amount","create_time","vault_id","post_time", DORIS_DELETE_SIGN }; // "standard", "event", "version","amount","claim_amount","fee_amount","create_time","vault_id","post_time", DORIS_DELETE_SIGN };
private static final DataType[] REAL_STAKING_EVENT_TYPES = { DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(), // private static final DataType[] REAL_STAKING_EVENT_TYPES = { DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), // DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(), DataTypes.INT() }; // DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(), DataTypes.INT() };
public static void userInvitation(DataStreamSource<String> dataStreamSource) { public static void userInvitation(DataStreamSource<String> dataStreamSource) {
// 初始化表配置 // 初始化表配置
...@@ -112,8 +112,8 @@ public class UserInvitationAchi implements Serializable { ...@@ -112,8 +112,8 @@ public class UserInvitationAchi implements Serializable {
TableConfig keyConfig = new TableConfig(KYC_FIELDS, KYC_TYPES, "bi.real_kyc"); TableConfig keyConfig = new TableConfig(KYC_FIELDS, KYC_TYPES, "bi.real_kyc");
TableConfig balanceConfig = new TableConfig(BALANCE_FIELDS, BALANCE_TYPES, "bi.real_balance"); TableConfig balanceConfig = new TableConfig(BALANCE_FIELDS, BALANCE_TYPES, "bi.real_balance");
TableConfig leadConfig = new TableConfig(LEAD_FIELDS, LEAD_TYPES, "bi.real_lead_switch"); TableConfig leadConfig = new TableConfig(LEAD_FIELDS, LEAD_TYPES, "bi.real_lead_switch");
TableConfig realStakingConfig = new TableConfig(REAL_STAKING_FIELDS, REAL_STAKING_TYPES, "bi.real_staking"); // TableConfig realStakingConfig = new TableConfig(REAL_STAKING_FIELDS, REAL_STAKING_TYPES, "bi.real_staking");
TableConfig realStakingEventConfig = new TableConfig(REAL_STAKING_EVENT_FIELDS, REAL_STAKING_EVENT_TYPES, "bi.real_staking_event"); // TableConfig realStakingEventConfig = new TableConfig(REAL_STAKING_EVENT_FIELDS, REAL_STAKING_EVENT_TYPES, "bi.real_staking_event");
// 创建Doris Sink // 创建Doris Sink
DorisSink<RowData> dorisUsersSink = DorisConnector.sinkDoris(usersConfig.getFields(), usersConfig.getTypes(), DorisSink<RowData> dorisUsersSink = DorisConnector.sinkDoris(usersConfig.getFields(), usersConfig.getTypes(),
...@@ -126,10 +126,10 @@ public class UserInvitationAchi implements Serializable { ...@@ -126,10 +126,10 @@ public class UserInvitationAchi implements Serializable {
balanceConfig.getTypes(), balanceConfig.getTableName()); balanceConfig.getTypes(), balanceConfig.getTableName());
DorisSink<RowData> dorisLeadSink = DorisConnector.sinkDoris(leadConfig.getFields(), leadConfig.getTypes(), DorisSink<RowData> dorisLeadSink = DorisConnector.sinkDoris(leadConfig.getFields(), leadConfig.getTypes(),
leadConfig.getTableName()); leadConfig.getTableName());
DorisSink<RowData> dorisRealStakingSink = DorisConnector.sinkDoris(realStakingConfig.getFields(), realStakingConfig.getTypes(), // DorisSink<RowData> dorisRealStakingSink = DorisConnector.sinkDoris(realStakingConfig.getFields(), realStakingConfig.getTypes(),
realStakingConfig.getTableName()); // realStakingConfig.getTableName());
DorisSink<RowData> dorisRealStakingEventSink = DorisConnector.sinkDoris(realStakingEventConfig.getFields(), realStakingEventConfig.getTypes(), // DorisSink<RowData> dorisRealStakingEventSink = DorisConnector.sinkDoris(realStakingEventConfig.getFields(), realStakingEventConfig.getTypes(),
realStakingEventConfig.getTableName()); // realStakingEventConfig.getTableName());
// 处理用户数据 // 处理用户数据
// processDataStream(dataStreamSource, "realUsers", usersConfig, dorisUsersSink,(RowMapper<RealUsers>) UserInvitationAchi::mapToUsersRow); // processDataStream(dataStreamSource, "realUsers", usersConfig, dorisUsersSink,(RowMapper<RealUsers>) UserInvitationAchi::mapToUsersRow);
...@@ -144,9 +144,9 @@ public class UserInvitationAchi implements Serializable { ...@@ -144,9 +144,9 @@ public class UserInvitationAchi implements Serializable {
//处理真实上级数据 //处理真实上级数据
processDataStream(dataStreamSource, "realLead", leadConfig, dorisLeadSink,(RowMapper<RealLead>) UserInvitationAchi::mapToLeadRow); processDataStream(dataStreamSource, "realLead", leadConfig, dorisLeadSink,(RowMapper<RealLead>) UserInvitationAchi::mapToLeadRow);
//处理质押数据 //处理质押数据
processDataStream(dataStreamSource, "realStaking", realStakingConfig, dorisRealStakingSink,(RowMapper<RealStaking>) UserInvitationAchi::mapToStakingRow); // processDataStream(dataStreamSource, "realStaking", realStakingConfig, dorisRealStakingSink,(RowMapper<RealStaking>) UserInvitationAchi::mapToStakingRow);
//处理质押事件 //处理质押事件
processDataStream(dataStreamSource, "realStakingEvent", realStakingEventConfig, dorisRealStakingEventSink,(RowMapper<RealStakingEvent>) UserInvitationAchi::mapToStakingEventRow); // processDataStream(dataStreamSource, "realStakingEvent", realStakingEventConfig, dorisRealStakingEventSink,(RowMapper<RealStakingEvent>) UserInvitationAchi::mapToStakingEventRow);
} }
/** /**
...@@ -310,6 +310,7 @@ public class UserInvitationAchi implements Serializable { ...@@ -310,6 +310,7 @@ public class UserInvitationAchi implements Serializable {
return row; return row;
} }
// 质押 数据映射 // 质押 数据映射
@Deprecated
private static RowData mapToStakingRow(Object item, int fieldCount) { private static RowData mapToStakingRow(Object item, int fieldCount) {
RealStaking staking = (RealStaking) item; RealStaking staking = (RealStaking) item;
GenericRowData row = new GenericRowData(fieldCount); GenericRowData row = new GenericRowData(fieldCount);
...@@ -327,6 +328,7 @@ public class UserInvitationAchi implements Serializable { ...@@ -327,6 +328,7 @@ public class UserInvitationAchi implements Serializable {
return row; return row;
} }
// 质押事件 数据映射 // 质押事件 数据映射
@Deprecated
private static RowData mapToStakingEventRow(Object item, int fieldCount) { private static RowData mapToStakingEventRow(Object item, int fieldCount) {
RealStakingEvent staking = (RealStakingEvent) item; RealStakingEvent staking = (RealStakingEvent) item;
GenericRowData row = new GenericRowData(fieldCount); GenericRowData row = new GenericRowData(fieldCount);
......
...@@ -259,6 +259,7 @@ public class DorisBaseSchema { ...@@ -259,6 +259,7 @@ public class DorisBaseSchema {
.column("article_id", STRING()) .column("article_id", STRING())
.column("create_time", TIMESTAMP(3)) .column("create_time", TIMESTAMP(3))
.column("platform", STRING()) .column("platform", STRING())
.column("community_id", STRING())
.build(); .build();
createTableDescriptor(tableEnv, schema, tempTableName,dbTableName); createTableDescriptor(tableEnv, schema, tempTableName,dbTableName);
} }
......
...@@ -26,7 +26,7 @@ public class OpenSimiApiTable { ...@@ -26,7 +26,7 @@ public class OpenSimiApiTable {
" t.simi_api_info['platform'] AS platform " + " t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '1'" "WHERE send_type in ('1','101')"
); );
//2 //2
tableEnv.executeSql( tableEnv.executeSql(
...@@ -41,7 +41,7 @@ public class OpenSimiApiTable { ...@@ -41,7 +41,7 @@ public class OpenSimiApiTable {
" t.simi_api_info['platform'] AS platform " + " t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '2'" "WHERE send_type in ('2','102')"
); );
//3 //3
tableEnv.executeSql( tableEnv.executeSql(
...@@ -58,7 +58,7 @@ public class OpenSimiApiTable { ...@@ -58,7 +58,7 @@ public class OpenSimiApiTable {
" t.simi_api_info['platform'] AS platform " + " t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '3'" "WHERE send_type in ('3','103')"
); );
//4 //4
tableEnv.executeSql( tableEnv.executeSql(
...@@ -75,7 +75,7 @@ public class OpenSimiApiTable { ...@@ -75,7 +75,7 @@ public class OpenSimiApiTable {
" t.simi_api_info['platform'] AS platform " + " t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '4'" "WHERE send_type in ('4','104')"
); );
//5 //5
tableEnv.executeSql( tableEnv.executeSql(
...@@ -91,7 +91,7 @@ public class OpenSimiApiTable { ...@@ -91,7 +91,7 @@ public class OpenSimiApiTable {
" t.simi_api_info['platform'] AS platform " + " t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '5'" "WHERE send_type in ('5','105')"
); );
//6 //6
tableEnv.executeSql( tableEnv.executeSql(
...@@ -107,7 +107,7 @@ public class OpenSimiApiTable { ...@@ -107,7 +107,7 @@ public class OpenSimiApiTable {
" t.simi_api_info['platform'] AS platform " + " t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '6'" "WHERE send_type in ('6','106')"
); );
//7 //7
tableEnv.executeSql( tableEnv.executeSql(
...@@ -121,7 +121,7 @@ public class OpenSimiApiTable { ...@@ -121,7 +121,7 @@ public class OpenSimiApiTable {
" t.simi_api_info['platform'] AS platform " + " t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '7'" "WHERE send_type in ('7','107')"
); );
//8 //8
tableEnv.executeSql( tableEnv.executeSql(
...@@ -137,7 +137,7 @@ public class OpenSimiApiTable { ...@@ -137,7 +137,7 @@ public class OpenSimiApiTable {
" t.simi_api_info['platform'] AS platform " + " t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '8'" "WHERE send_type in ('8','108')"
); );
//9 //9
tableEnv.executeSql( tableEnv.executeSql(
...@@ -153,7 +153,7 @@ public class OpenSimiApiTable { ...@@ -153,7 +153,7 @@ public class OpenSimiApiTable {
" t.simi_api_info['platform'] AS platform " + " t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '9'" "WHERE send_type in ('9','109')"
); );
//10 //10
tableEnv.executeSql( tableEnv.executeSql(
...@@ -169,7 +169,7 @@ public class OpenSimiApiTable { ...@@ -169,7 +169,7 @@ public class OpenSimiApiTable {
" t.simi_api_info['platform'] AS platform " + " t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '10'" "WHERE send_type in ('10','110')"
); );
//11 //11
tableEnv.executeSql( tableEnv.executeSql(
...@@ -181,10 +181,11 @@ public class OpenSimiApiTable { ...@@ -181,10 +181,11 @@ public class OpenSimiApiTable {
" t.simi_api_info['releaseType'] AS release_type, " + " t.simi_api_info['releaseType'] AS release_type, " +
" t.simi_api_info['articleId'] AS article_id, " + " t.simi_api_info['articleId'] AS article_id, " +
" k.create_time AS create_time, " + " k.create_time AS create_time, " +
" t.simi_api_info['platform'] AS platform " + " t.simi_api_info['platform'] AS platform, " +
" t.simi_api_info['communityId'] AS community_id " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '11'" "WHERE send_type in ('11','111')"
); );
//12 //12
tableEnv.executeSql( tableEnv.executeSql(
...@@ -203,7 +204,7 @@ public class OpenSimiApiTable { ...@@ -203,7 +204,7 @@ public class OpenSimiApiTable {
" t.simi_api_info['platform'] AS platform " + " t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '12'" "WHERE send_type in ('12','112')"
); );
} }
......
...@@ -61,13 +61,13 @@ public class ParseOpenSimiApiUDTF extends TableFunction<Row>{ ...@@ -61,13 +61,13 @@ public class ParseOpenSimiApiUDTF extends TableFunction<Row>{
if(StringUtils.isAllEmpty(send_type,req_body,flume_type)) { if(StringUtils.isAllEmpty(send_type,req_body,flume_type)) {
return; return;
} }
if(!CompareUtils.stringExists(flume_type, "openSimiApi","openAbroadSimiApiTest")) { if(!CompareUtils.stringExists(flume_type, "openSimiApi","openAbroadSimiApi")) {
return; return;
} }
String platform = null; String platform = null;
if(StringUtils.equals(flume_type, "openSimiApi")) { if(StringUtils.equals(flume_type, "openSimiApi")) {
platform = "国内版"; platform = "国内版";
}else if(StringUtils.equals(flume_type, "openAbroadSimiApiTest")) { }else if(StringUtils.equals(flume_type, "openAbroadSimiApi")) {
platform = "海外版"; platform = "海外版";
} }
Map<String, Object> params = new HashMap<>(); Map<String, Object> params = new HashMap<>();
...@@ -157,6 +157,7 @@ public class ParseOpenSimiApiUDTF extends TableFunction<Row>{ ...@@ -157,6 +157,7 @@ public class ParseOpenSimiApiUDTF extends TableFunction<Row>{
params.put("cid", jsonReqDto.getCid()); params.put("cid", jsonReqDto.getCid());
params.put("releaseType", jsonReqDto.getReleaseType()); params.put("releaseType", jsonReqDto.getReleaseType());
params.put("articleId", jsonReqDto.getArticleId()); params.put("articleId", jsonReqDto.getArticleId());
params.put("communityId", jsonReqDto.getCommunityId());
timestamp = convertToLocalDateTime(jsonReqDto.getTime()); timestamp = convertToLocalDateTime(jsonReqDto.getTime());
}else if(StringUtils.equals(send_type, OpenSimiApiTypeEnum.CONTENT_INTERACTION.getCode())) {//12 }else if(StringUtils.equals(send_type, OpenSimiApiTypeEnum.CONTENT_INTERACTION.getCode())) {//12
ContentInteractionReqDto jsonReqDto = JSONObject.parseObject(req_body,new TypeReference<ContentInteractionReqDto>(){}); ContentInteractionReqDto jsonReqDto = JSONObject.parseObject(req_body,new TypeReference<ContentInteractionReqDto>(){});
......
...@@ -20,4 +20,5 @@ public class PublishContentReqDto extends OpenSimiApiBaseReqDto implements Seria ...@@ -20,4 +20,5 @@ public class PublishContentReqDto extends OpenSimiApiBaseReqDto implements Seria
private String releaseType;//发布类型 BLOG:博客 OTHER:其它 private String releaseType;//发布类型 BLOG:博客 OTHER:其它
private String articleId;//文章ID(关联具体内容) private String articleId;//文章ID(关联具体内容)
private String communityId; //社区ID
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment