Commit 652a8b37 by 魏建枢

提交代码

parent f0508aeb
package com.flink.achieve.base; package com.flink.achieve.base;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink; import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.calcite.shaded.org.apache.commons.codec.binary.StringUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.DataTypes;
...@@ -20,13 +22,14 @@ import org.slf4j.LoggerFactory; ...@@ -20,13 +22,14 @@ import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector; import com.flink.common.DorisConnector;
import com.flink.config.TableConfig; import com.flink.config.TableConfig;
import com.flink.enums.OpenSimiApiTypeEnum; import com.flink.enums.OpenSimiApiTypeEnum;
import com.flink.vo.simi.ContentInteractionReqDto;
import com.flink.vo.simi.CreateGroupReqDto; import com.flink.vo.simi.CreateGroupReqDto;
import com.flink.vo.simi.JoinGroupReqDto; import com.flink.vo.simi.JoinGroupReqDto;
import com.flink.vo.simi.LeaveGroupReqDto; import com.flink.vo.simi.LeaveGroupReqDto;
import com.flink.vo.simi.PublishContentReqDto;
/** /**
* @author wjs * @author wjs
...@@ -53,18 +56,36 @@ public class OpenSimiApiAchi implements Serializable { ...@@ -53,18 +56,36 @@ public class OpenSimiApiAchi implements Serializable {
private static final DataType[] USERS_JOIN_EXIT_GROUP_TYPES = { DataTypes.STRING(), DataTypes.STRING(), private static final DataType[] USERS_JOIN_EXIT_GROUP_TYPES = { DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT() }; DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT() };
// 用户发帖记录表配置
private static final String[] USERS_COMMUNITY_POSTING_FIELDS = { "cid","time",
"behavior_type","article_id", DORIS_DELETE_SIGN };
private static final DataType[] USERS_COMMUNITY_POSTING_TYPES = { DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT() };
public static void openSimiApi(DataStreamSource<String> dataStreamSource) { public static void openSimiApi(DataStreamSource<String> dataStreamSource) {
// 初始化表配置 // 初始化表配置
TableConfig usersJoinExitGroupConfig = new TableConfig(USERS_JOIN_EXIT_GROUP_FIELDS, TableConfig usersJoinExitGroupConfig = new TableConfig(USERS_JOIN_EXIT_GROUP_FIELDS,
USERS_JOIN_EXIT_GROUP_TYPES, "bi.user_join_exit_group_record"); USERS_JOIN_EXIT_GROUP_TYPES, "ai.user_join_exit_group_record");
// 创建Doris Sink
DorisSink<RowData> dorisUsersSink = DorisConnector.sinkDoris(usersJoinExitGroupConfig.getFields(), TableConfig usersCommunityPostingConfig = new TableConfig(USERS_COMMUNITY_POSTING_FIELDS,
USERS_COMMUNITY_POSTING_TYPES, "ai.user_community_posting_record");
//创建Doris Sink
DorisSink<RowData> dorisUserJoinExitGroupSink = DorisConnector.sinkDoris(usersJoinExitGroupConfig.getFields(),
usersJoinExitGroupConfig.getTypes(), usersJoinExitGroupConfig.getTableName()); usersJoinExitGroupConfig.getTypes(), usersJoinExitGroupConfig.getTableName());
DorisSink<RowData> dorisUserCommunityPostingSink = DorisConnector.sinkDoris(usersCommunityPostingConfig.getFields(),
usersCommunityPostingConfig.getTypes(), usersCommunityPostingConfig.getTableName());
// 处理用户数据 // 处理数据
processDataStream(dataStreamSource, "openSimiApi", usersJoinExitGroupConfig, dorisUsersSink, processDataStream(dataStreamSource, "openSimiApi", usersJoinExitGroupConfig, dorisUserJoinExitGroupSink,
(sendType, item, fieldCount) -> mapToUsersJoinExitGroupRow(sendType, item, fieldCount)); (sendType, item, fieldCount) -> mapToUsersJoinExitGroupRow(sendType, item, fieldCount));
processDataStream(dataStreamSource, "openSimiApi", usersCommunityPostingConfig, dorisUserCommunityPostingSink,
(sendType, item, fieldCount) -> mapToUsersCommunityPostingRow(sendType, item, fieldCount));
} }
...@@ -99,54 +120,102 @@ public class OpenSimiApiAchi implements Serializable { ...@@ -99,54 +120,102 @@ public class OpenSimiApiAchi implements Serializable {
} }
@Override @Override
public void flatMap(String value, Collector<RowData> out) throws Exception { public void flatMap(String value, Collector<RowData> out){
try {
JSONObject jsonObj = JSON.parseObject(value);
if (!flumeType.equals(jsonObj.getString(FLUME_TYPE_FIELD))) return;
String reqBody = jsonObj.getString(DATA_FIELD);
String sendType = jsonObj.getString(SEND_TYPE);
Object item = parseReqBody(sendType, reqBody); // 解析请求体
if (item != null) {
List<RowData> rows = mapper.map(sendType, item, fieldCount);
for (RowData row : rows) {
if (row != null) out.collect(row); // 逐条输出拆分后的记录
}
}
} catch (Exception e) {
logger.error("数据处理异常 | rawData:{} | error:{}", value, e.getMessage(), e);
}
}
private Object parseReqBody(String sendType, String reqBody) {
try { try {
JSONObject jsonObj = JSON.parseObject(value);
if (!flumeType.equals(jsonObj.getString(FLUME_TYPE_FIELD))) {
return;
}
String reqBody = jsonObj.getString(DATA_FIELD);
String sendType = jsonObj.getString(SEND_TYPE);
Object item = null;
if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.CREATE_GROUP.getCode())) { if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.CREATE_GROUP.getCode())) {
item = JSON.parseObject(reqBody, new TypeReference<CreateGroupReqDto>() { return JSON.parseObject(reqBody, CreateGroupReqDto.class);
}); } else if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.JOIN_GROUP.getCode())) {
} else if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.JOIN_GROUP.getCode())) { return JSON.parseObject(reqBody, JoinGroupReqDto.class);
item = JSON.parseObject(reqBody, new TypeReference<JoinGroupReqDto>() { } else if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.LEAVE_GROUP.getCode())) {
}); return JSON.parseObject(reqBody, LeaveGroupReqDto.class);
} else if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.LEAVE_GROUP.getCode())) { } else if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.PUBLISH_CONTENT.getCode())) {
item = JSON.parseObject(reqBody, new TypeReference<LeaveGroupReqDto>() { return JSON.parseObject(reqBody, PublishContentReqDto.class);
}); } else if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.CONTENT_INTERACTION.getCode())) {
} return JSON.parseObject(reqBody, ContentInteractionReqDto.class);
}
if (item != null) { }catch (Exception e) {
RowData row = mapper.map(sendType, item, fieldCount); logger.error("JSON解析失败 | sendType:{} | reqBody:{}", sendType, reqBody, e);
if (row != null) { }
out.collect(row); return null;
}
}
} catch (Exception e) {
logger.error("OpenSimiApiAchi 处理 {} 数据出错 | rawData:{} | error:{}", flumeType, value, e.getMessage(), e);
}
} }
} }
// 用户群数据映射 private static List<RowData> mapToUsersJoinExitGroupRow(String sendType, Object item, int fieldCount) {
private static RowData mapToUsersJoinExitGroupRow(String sendType, Object item, int fieldCount) { List<RowData> rows = new ArrayList<>();
GenericRowData row = new GenericRowData(fieldCount); String[] cidArray = extractCids(sendType, item); // 提取并拆分 cid
if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.CREATE_GROUP.getCode())) { if (cidArray != null) {
CreateGroupReqDto createGroup = (CreateGroupReqDto) item; for (String cid : cidArray) {
row.setField(0, StringData.fromString(createGroup.getCid())); GenericRowData row = new GenericRowData(fieldCount);
row.setField(1, StringData.fromString(createGroup.getGroupId())); populateRowFields(sendType, item, row, cid.trim()); // 填充行数据
row.setField(2, StringData.fromString(createGroup.getTime())); row.setField(6, DELETE_SIGN_VALUE);
row.setField(3, StringData.fromString("加群")); rows.add(row);
row.setField(4, StringData.fromString("邀请加入")); }
row.setField(5, null); // 群名称 }
} else if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.JOIN_GROUP.getCode())) { return rows;
}
private static List<RowData> mapToUsersCommunityPostingRow(String sendType, Object item, int fieldCount) {
List<RowData> rows = new ArrayList<>();
String cid = null;
if (OpenSimiApiTypeEnum.PUBLISH_CONTENT.getCode().equals(sendType)) {
cid = ((PublishContentReqDto) item).getCid();
} else if (OpenSimiApiTypeEnum.CONTENT_INTERACTION.getCode().equals(sendType)) {
cid = ((ContentInteractionReqDto) item).getCid();
}
if (StringUtils.isNotEmpty(cid)) {
GenericRowData row = new GenericRowData(fieldCount);
populateRowFields(sendType, item, row, cid.trim()); // 填充行数据
row.setField(4, DELETE_SIGN_VALUE);
rows.add(row);
}
return rows;
}
private static String[] extractCids(String sendType, Object item) {
String cids = null;
if (OpenSimiApiTypeEnum.CREATE_GROUP.getCode().equals(sendType)) {
cids = ((CreateGroupReqDto) item).getCid();
} else if (OpenSimiApiTypeEnum.JOIN_GROUP.getCode().equals(sendType)) {
cids = ((JoinGroupReqDto) item).getCid();
} else if (OpenSimiApiTypeEnum.LEAVE_GROUP.getCode().equals(sendType)) {
cids = ((LeaveGroupReqDto) item).getCid();
}
return (cids != null) ? cids.split(",") : null;
}
private static void populateRowFields(String sendType, Object item, GenericRowData row, String singleCid) {
if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.CREATE_GROUP.getCode())) {
CreateGroupReqDto dto = (CreateGroupReqDto) item;
row.setField(0, StringData.fromString(singleCid));
row.setField(1, StringData.fromString(dto.getGroupId()));
row.setField(2, StringData.fromString(dto.getTime()));
row.setField(3, StringData.fromString("加群"));
row.setField(4, StringData.fromString("邀请加入"));
row.setField(5, null);
}else if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.JOIN_GROUP.getCode())) {
JoinGroupReqDto joinGroup = (JoinGroupReqDto) item; JoinGroupReqDto joinGroup = (JoinGroupReqDto) item;
row.setField(0, StringData.fromString(joinGroup.getCid())); row.setField(0, StringData.fromString(singleCid));
row.setField(1, StringData.fromString(joinGroup.getGroupId())); row.setField(1, StringData.fromString(joinGroup.getGroupId()));
row.setField(2, StringData.fromString(joinGroup.getTime())); row.setField(2, StringData.fromString(joinGroup.getTime()));
row.setField(3, StringData.fromString("加群")); row.setField(3, StringData.fromString("加群"));
...@@ -154,16 +223,25 @@ public class OpenSimiApiAchi implements Serializable { ...@@ -154,16 +223,25 @@ public class OpenSimiApiAchi implements Serializable {
row.setField(5, StringData.fromString(joinGroup.getGroupName())); row.setField(5, StringData.fromString(joinGroup.getGroupName()));
} else if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.LEAVE_GROUP.getCode())) { } else if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.LEAVE_GROUP.getCode())) {
LeaveGroupReqDto leaveGroup = (LeaveGroupReqDto) item; LeaveGroupReqDto leaveGroup = (LeaveGroupReqDto) item;
row.setField(0, StringData.fromString(leaveGroup.getCid())); row.setField(0, StringData.fromString(singleCid));
row.setField(1, StringData.fromString(leaveGroup.getGroupId())); row.setField(1, StringData.fromString(leaveGroup.getGroupId()));
row.setField(2, StringData.fromString(leaveGroup.getTime())); row.setField(2, StringData.fromString(leaveGroup.getTime()));
row.setField(3, StringData.fromString("退群")); row.setField(3, StringData.fromString("退群"));
row.setField(4, StringData.fromString(leaveGroup.getExitType())); row.setField(4, StringData.fromString(leaveGroup.getExitType()));
row.setField(5, StringData.fromString(leaveGroup.getGroupName())); row.setField(5, StringData.fromString(leaveGroup.getGroupName()));
} else if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.PUBLISH_CONTENT.getCode())) {
PublishContentReqDto publishContent = (PublishContentReqDto) item;
row.setField(0, StringData.fromString(singleCid));
row.setField(1, StringData.fromString(publishContent.getTime()));
row.setField(2, StringData.fromString("发帖"));
row.setField(3, StringData.fromString(publishContent.getArticleId()));
} else if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.CONTENT_INTERACTION.getCode())) {
ContentInteractionReqDto contentInteraction = (ContentInteractionReqDto) item;
row.setField(0, StringData.fromString(singleCid));
row.setField(1, StringData.fromString(contentInteraction.getTime()));
row.setField(2, StringData.fromString(contentInteraction.getActionType()));
row.setField(3, StringData.fromString(contentInteraction.getArticleId()));
} }
row.setField(6, DELETE_SIGN_VALUE);
return row;
} }
/** /**
...@@ -173,7 +251,7 @@ public class OpenSimiApiAchi implements Serializable { ...@@ -173,7 +251,7 @@ public class OpenSimiApiAchi implements Serializable {
*/ */
@FunctionalInterface @FunctionalInterface
private interface RowMapper extends Serializable { private interface RowMapper extends Serializable {
RowData map(String sendType, Object item, int fieldCount); List<RowData> map(String sendType, Object item, int fieldCount); // 返回列表而非单个对象
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment