Commit 0a7239f6 by 魏建枢

代码整合提交

parent d8f3af8a
...@@ -40,10 +40,10 @@ public class CommonConsumeBaseAchi extends MultipleSourceCommonBase implements S ...@@ -40,10 +40,10 @@ public class CommonConsumeBaseAchi extends MultipleSourceCommonBase implements S
DataStreamSource<String> eventLogStreamSource = kafkaDataSource.getDataStreamSource(); DataStreamSource<String> eventLogStreamSource = kafkaDataSource.getDataStreamSource();
EventLogAchi.eventLog(eventLogStreamSource); EventLogAchi.eventLog(eventLogStreamSource);
} }
// if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_COMMUNITY_HISTORY.getTopic())) { if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_COMMUNITY_HISTORY.getTopic())) {
// DataStreamSource<String> communityHistoryStreamSource = kafkaDataSource.getDataStreamSource(); DataStreamSource<String> communityHistoryStreamSource = kafkaDataSource.getDataStreamSource();
// CommunityHistoryAchi.communityHistory(communityHistoryStreamSource); CommunityHistoryAchi.communityHistory(communityHistoryStreamSource);
// } }
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_NEW_COLLECT_LOG.getTopic())) { if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_NEW_COLLECT_LOG.getTopic())) {
DataStreamSource<String> collectLogStreamSource = kafkaDataSource.getDataStreamSource(); DataStreamSource<String> collectLogStreamSource = kafkaDataSource.getDataStreamSource();
CollectLogAchi.collectLog(collectLogStreamSource); CollectLogAchi.collectLog(collectLogStreamSource);
......
package com.flink.achieve.base; package com.flink.achieve.base;
import java.io.Serializable; import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** import com.alibaba.fastjson.JSON;
* @author wjs import com.alibaba.fastjson.JSONObject;
* @version 创建时间:2025-7-28 10:40:20 import com.flink.common.DorisConnector;
* 类说明 import com.flink.config.TableConfig;
*/ import com.flink.util.TimeConvertUtil;
public class CommunityHistoryAchi implements Serializable{ import com.flink.vo.CommunityHistory;
import com.flink.vo.RiskInfo;
/**
* @author wjs
* @version 创建时间:2025-7-28 10:40:20 类说明
*/
public class CommunityHistoryAchi implements Serializable {
/** /**
* *
*/ */
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(CommunityHistoryAchi.class);
// 定义公共常量
private static final String FLUME_TYPE_FIELD = "flume_type";
private static final String DATA_FIELD = "list";
private static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
private static final int DELETE_SIGN_VALUE = 0;
private static final String[] COMMUNITY_HISTORY_FIELDS = { "article_id", "source_type", "article_type",
"article_state", "create_time", "article_user_cid", "article_text", "images", "label", "score", "law",
"law_image", "politics", "politics_image", "ad", "ad_image", "hot_search_title", DORIS_DELETE_SIGN };
private static final DataType[] COMMUNITY_HISTORY_TYPES = { DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.INT() };
public static void communityHistory(DataStreamSource<String> dataStreamSource) {
	// Both the domestic and the abroad feed land in the same Doris table,
	// so one table config and one sink are shared by the two pipelines.
	final TableConfig config = new TableConfig(COMMUNITY_HISTORY_FIELDS, COMMUNITY_HISTORY_TYPES,
			"bi.simi_community_history");
	final DorisSink<RowData> sink = DorisConnector.sinkDoris(config.getFields(), config.getTypes(),
			config.getTableName());

	// Domestic community-history records.
	processDataStream(dataStreamSource, "communityHistory", config, sink,
			(RowMapper<CommunityHistory>) CommunityHistoryAchi::mapToCommunityHistoryRow);
	// Abroad community-history records.
	processDataStream(dataStreamSource, "communityHistoryAbroad", config, sink,
			(RowMapper<CommunityHistory>) CommunityHistoryAchi::mapToCommunityHistoryAbroadRow);
}
/**
 * Wires one flume-typed slice of the raw stream into the Doris sink.
 *
 * @param dataStream  raw JSON stream read from Kafka
 * @param flumeType   value of the envelope's "flume_type" field this pipeline accepts
 * @param tableConfig target table configuration (supplies the column count)
 * @param dorisSink   sink that writes the mapped rows to Doris
 * @param mapper      converts one parsed payload element into a RowData row
 */
private static <T> void processDataStream(DataStreamSource<String> dataStream, String flumeType,
		TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper<T> mapper) {
	final int fieldCount = tableConfig.getFields().length;
	SingleOutputStreamOperator<RowData> rows = dataStream
			.flatMap(new ElementProcessor<>(flumeType, mapper, fieldCount))
			.returns(TypeInformation.of(RowData.class))
			.filter(Objects::nonNull);
	rows.sinkTo(dorisSink).name("Doris-" + flumeType);
}
public static void communityHistory(DataStreamSource<String> communityHistoryStreamSource) { private static class ElementProcessor<T> implements FlatMapFunction<String, RowData>, Serializable {
// TODO Auto-generated method stub
/**
*
*/
private static final long serialVersionUID = 1L;
private final String flumeType;
private final RowMapper<T> mapper;
private final int fieldCount;
public ElementProcessor(String flumeType, RowMapper<T> mapper, int fieldCount) {
this.flumeType = flumeType;
this.mapper = mapper;
this.fieldCount = fieldCount;
}
@Override
public void flatMap(String value, Collector<RowData> out) throws Exception {
try {
JSONObject jsonObj = JSON.parseObject(value);
if (!flumeType.equals(jsonObj.getString(FLUME_TYPE_FIELD))) {
return;
}
String bodyStr = jsonObj.getString(DATA_FIELD);
List<?> dataList = JSON.parseArray(bodyStr, getTargetClass(flumeType));
if (CollectionUtils.isNotEmpty(dataList)) {
for (Object item : dataList) {
RowData row = mapper.map(item, fieldCount);
if (row != null)
out.collect(row);
}
}
} catch (Exception e) {
logger.error("CommunityHistoryAchi 处理 {} 数据出错 | rawData:{} | error:{}", flumeType, value,
e.getMessage(), e);
}
}
private Class<?> getTargetClass(String type) {
switch (type) {
case "communityHistory":
return CommunityHistory.class;
case "communityHistoryAbroad":
return CommunityHistory.class;
default:
throw new IllegalArgumentException("未知类型: " + type);
}
}
}
// Domestic community-history record mapping (国内社区记录数据映射).
private static RowData mapToCommunityHistoryRow(Object item, int fieldCount) {
	return buildCommunityHistoryRow(item, fieldCount, "国内版");
}

// Abroad community-history record mapping (海外社区记录数据映射).
private static RowData mapToCommunityHistoryAbroadRow(Object item, int fieldCount) {
	return buildCommunityHistoryRow(item, fieldCount, "海外版");
}

/**
 * Shared mapping: copies a CommunityHistory VO into a GenericRowData laid out
 * per COMMUNITY_HISTORY_FIELDS. The two public mappers above differ only in
 * the source_type label written to column 1.
 *
 * @param item       a CommunityHistory instance (explicit cast here)
 * @param fieldCount total column count of the produced row
 * @param sourceType label stored in the source_type column
 * @return the populated row
 */
private static RowData buildCommunityHistoryRow(Object item, int fieldCount, String sourceType) {
	CommunityHistory communityHistory = (CommunityHistory) item;
	// Risk block may be absent; all risk columns are then written as null.
	RiskInfo riskInfo = communityHistory.getRisk();
	GenericRowData row = new GenericRowData(fieldCount);
	row.setField(0, StringData.fromString(communityHistory.getArticleId()));
	row.setField(1, StringData.fromString(sourceType));
	row.setField(2, StringData.fromString(communityHistory.getArticleType()));
	row.setField(3, StringData.fromString(communityHistory.getArticleState()));
	row.setField(4, StringData.fromString(TimeConvertUtil.parseToStringSSS(communityHistory.getCreateTime())));
	row.setField(5, StringData.fromString(communityHistory.getArticleUserCid()));
	row.setField(6, StringData.fromString(communityHistory.getArticleText()));
	row.setField(7, StringData.fromString(communityHistory.getImages()));
	row.setField(8, StringData.fromString(communityHistory.getLabel()));
	row.setField(9, StringData.fromString(riskInfo == null ? null : riskInfo.getScore()));
	row.setField(10, StringData.fromString(riskInfo == null ? null : riskInfo.getLaw()));
	row.setField(11, StringData.fromString(riskInfo == null ? null : riskInfo.getLawImage()));
	row.setField(12, StringData.fromString(riskInfo == null ? null : riskInfo.getPolitics()));
	row.setField(13, StringData.fromString(riskInfo == null ? null : riskInfo.getPoliticsImage()));
	row.setField(14, StringData.fromString(riskInfo == null ? null : riskInfo.getAd()));
	row.setField(15, StringData.fromString(riskInfo == null ? null : riskInfo.getAdImage()));
	row.setField(16, StringData.fromString(communityHistory.getHotSearchTitle()));
	row.setField(17, DELETE_SIGN_VALUE);
	return row;
}
/**
 * Row-mapping interface (行数据映射接口): maps one parsed payload element
 * to a Doris row.
 *
 * @param <T> payload element type
 */
@FunctionalInterface
private interface RowMapper<T> extends Serializable {
	/**
	 * @param item       parsed VO instance
	 * @param fieldCount column count of the produced row
	 * @return the mapped row, or null to drop the element
	 */
	RowData map(Object item, int fieldCount);
}
}
...@@ -25,7 +25,6 @@ public enum JobTypeEnum { ...@@ -25,7 +25,6 @@ public enum JobTypeEnum {
SIMI_FRIENDS("JOB_09", "SIMI好友作业"), SIMI_FRIENDS("JOB_09", "SIMI好友作业"),
SIMI_GROUPS("JOB_10", "SIMI群组作业"), SIMI_GROUPS("JOB_10", "SIMI群组作业"),
VECTOR_ANGLE_CALCULATION("JOB_11", "矢量角度计算作业"), VECTOR_ANGLE_CALCULATION("JOB_11", "矢量角度计算作业"),
EVENT_LOG("JOB_12", "事件采集作业"),
REGISTRATION_CHECK("JOB_13", "注册检验采集作业"), REGISTRATION_CHECK("JOB_13", "注册检验采集作业"),
; ;
......
...@@ -6,7 +6,6 @@ import com.flink.processor.impl.CommonConsumeBaseProcessor; ...@@ -6,7 +6,6 @@ import com.flink.processor.impl.CommonConsumeBaseProcessor;
import com.flink.processor.impl.DeviceIdLatestProcessor; import com.flink.processor.impl.DeviceIdLatestProcessor;
import com.flink.processor.impl.EventIpConvertProcessor; import com.flink.processor.impl.EventIpConvertProcessor;
import com.flink.processor.impl.EventIpLatestProcessor; import com.flink.processor.impl.EventIpLatestProcessor;
import com.flink.processor.impl.EventLogProcessor;
import com.flink.processor.impl.RegistrationCheckProcessor; import com.flink.processor.impl.RegistrationCheckProcessor;
import com.flink.processor.impl.SimiFriendsProcessor; import com.flink.processor.impl.SimiFriendsProcessor;
import com.flink.processor.impl.SimiGroupstProcessor; import com.flink.processor.impl.SimiGroupstProcessor;
...@@ -33,8 +32,6 @@ public class JobProcessorFactory { ...@@ -33,8 +32,6 @@ public class JobProcessorFactory {
return new SimiGroupstProcessor(); return new SimiGroupstProcessor();
case VECTOR_ANGLE_CALCULATION: case VECTOR_ANGLE_CALCULATION:
return new VectorAngleCalculationProcessor(); return new VectorAngleCalculationProcessor();
case EVENT_LOG:
return new EventLogProcessor();
case REGISTRATION_CHECK: case REGISTRATION_CHECK:
return new RegistrationCheckProcessor(); return new RegistrationCheckProcessor();
case COMMON_CONSUME_BASE: case COMMON_CONSUME_BASE:
......
package com.flink.processor.impl;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import com.flink.achieve.doris.EventLogAchi;
import com.flink.enums.JobTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.JobProcessor;
import com.flink.vo.KafkaTopic;
/**
 * Job processor for the event-log collection job: assembles the Kafka topic
 * list and hands it to EventLogAchi together with the job type.
 *
 * @author wjs
 * @version 2025-6-20 23:39:38
 */
public class EventLogProcessor implements JobProcessor {

	@Override
	public void process() throws Exception {
		new EventLogAchi().handleDataStreamSource(createTopicList(), JobTypeEnum.EVENT_LOG);
	}

	/**
	 * Topics consumed by this job. Currently only the community-history topic
	 * is active (ODS_EVENT_LOG is disabled).
	 */
	private static List<KafkaTopic> createTopicList() {
		return Arrays.asList(TopicTypeEnum.ODS_COMMUNITY_HISTORY)
				.stream()
				.map(TopicTypeEnum::createKafkaTopic)
				.collect(Collectors.toList());
	}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment