Commit 8775513f by 魏建枢

增加爬虫落库

parent 5e0e66f4
......@@ -67,6 +67,10 @@ public class CommonConsumeBaseAchi extends SourceCommonBase implements Serializa
DataStreamSource<String> collectUserBehaviorStreamSource = kafkaDataSource.getDataStreamSource();
UserDailyActivityAchi.collectUserBehavior(collectUserBehaviorStreamSource);
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.SPIDER.getTopic())) {
DataStreamSource<String> spiderStreamSource = kafkaDataSource.getDataStreamSource();
SpiderAchi.spider(spiderStreamSource);
}
}
}else {
return;
......
package com.flink.achieve.base;
import java.io.Serializable;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.flink.common.DorisConnector;
import com.flink.config.TableConfig;
/**
 * Sinks spider-crawled data from Kafka into Doris.
 *
 * <p>Currently handles the USDT rate crawl: records tagged with
 * {@code flume_type=ods_usdt} are parsed and written to the Doris table
 * {@code spider.ods_usdt}.
 *
 * @author wjs
 * @version created 2025-10-16 10:52:05
 */
public class SpiderAchi implements Serializable {

	private static final long serialVersionUID = 1L;

	private static final Logger logger = LoggerFactory.getLogger(SpiderAchi.class);

	// JSON field carrying the flume routing type, used to filter records per table.
	private static final String FLUME_TYPE_FIELD = "flume_type";
	// Doris hidden column marking logical deletes; 0 means "upsert, not delete".
	private static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
	private static final int DELETE_SIGN_VALUE = 0;

	// Columns of the USDT spider table (spider.ods_usdt).
	private static final String[] USDT_FIELDS = { "spider_time", "usdt", DORIS_DELETE_SIGN };
	// FIX: mapToUsdtRow() writes StringData into both the spider_time and usdt
	// slots, so both declared sink types must be STRING. The original array was
	// { STRING, DATE, INT }, which mismatched the row builder for "usdt".
	// NOTE(review): confirm against the actual spider.ods_usdt column types in Doris.
	private static final DataType[] USDT_TYPES = { DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT() };

	/**
	 * Wires the spider Kafka stream into the Doris table {@code spider.ods_usdt}.
	 *
	 * @param dataStreamSource raw JSON strings consumed from the spider topic
	 */
	public static void spider(DataStreamSource<String> dataStreamSource) {
		// Table configuration for the USDT crawl target.
		TableConfig usdtConfig = new TableConfig(USDT_FIELDS, USDT_TYPES, "spider.ods_usdt");
		// Create the Doris sink from the configured columns/types.
		DorisSink<RowData> dorisUsdtSink = DorisConnector.sinkDoris(usdtConfig.getFields(), usdtConfig.getTypes(),
				usdtConfig.getTableName());
		// Filter, map and sink the USDT records.
		processDataStream(dataStreamSource, "ods_usdt", usdtConfig, dorisUsdtSink, SpiderAchi::mapToUsdtRow);
	}

	/**
	 * Common pipeline: filter by flume_type, map to RowData, drop failures, sink to Doris.
	 *
	 * @param dataStream  raw JSON input stream
	 * @param flumeType   value of the {@code flume_type} field that selects this table
	 * @param tableConfig column configuration (supplies the field count)
	 * @param dorisSink   destination sink
	 * @param mapper      converts one raw record into a RowData, or null to drop it
	 */
	private static void processDataStream(DataStreamSource<String> dataStream, String flumeType,
			TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper mapper) {
		SingleOutputStreamOperator<RowData> processedStream = dataStream
				.map(new ElementProcessorWithMap(flumeType, mapper, tableConfig.getFields().length))
				.returns(TypeInformation.of(RowData.class)).filter(Objects::nonNull);
		processedStream.sinkTo(dorisSink).name("Doris-" + flumeType);
	}

	/**
	 * Map operator: keeps only records whose {@code flume_type} matches, delegates
	 * conversion to the configured {@link RowMapper}, and turns any parse/convert
	 * failure into a null (dropped downstream by the non-null filter).
	 */
	private static class ElementProcessorWithMap implements MapFunction<String, RowData>, Serializable {

		private static final long serialVersionUID = 1L;

		private final String flumeType;
		private final RowMapper mapper;
		private final int fieldCount;

		public ElementProcessorWithMap(String flumeType, RowMapper mapper, int fieldCount) {
			this.flumeType = flumeType;
			this.mapper = mapper;
			this.fieldCount = fieldCount;
		}

		@Override
		public RowData map(String value) throws Exception {
			try {
				JSONObject jsonObj = JSON.parseObject(value);
				// Records for other tables are dropped, not errors.
				if (!flumeType.equals(jsonObj.getString(FLUME_TYPE_FIELD))) {
					return null;
				}
				return mapper.map(value, fieldCount);
			} catch (Exception e) {
				// Best-effort ingestion: log the bad record and continue the stream.
				logger.error("SpiderAchi 处理 {} 数据出错 | rawData:{} | error:{}", flumeType, value, e.getMessage(), e);
				return null;
			}
		}
	}

	/**
	 * Converts one raw spider record into a RowData for spider.ods_usdt.
	 * Expected payload shape: {"flume_type":"ods_usdt","item":"{\"spider_time\":...,\"usdt\":...}"}
	 * (the "item" field is itself a JSON string). Returns null for empty/unusable input.
	 *
	 * @param item       raw JSON record (a String)
	 * @param fieldCount number of columns in the target table (>= 3)
	 * @return populated row, or null to drop the record
	 */
	private static RowData mapToUsdtRow(Object item, int fieldCount) {
		String value = (String) item;
		if (StringUtils.isEmpty(value)) {
			return null;
		}
		JSONObject jsonObj = JSON.parseObject(value);
		String usdtStrItem = jsonObj.getString("item");
		if (StringUtils.isEmpty(usdtStrItem)) {
			return null;
		}
		JSONObject itemJsonObj = JSON.parseObject(usdtStrItem);
		// FIX: getString instead of an unchecked (String) cast — a numeric JSON
		// value no longer throws ClassCastException (which silently dropped the
		// row via the catch-all above); it is converted to its string form.
		String spiderTime = itemJsonObj.getString("spider_time");
		String usdt = itemJsonObj.getString("usdt");
		GenericRowData row = new GenericRowData(fieldCount);
		row.setField(0, StringData.fromString(spiderTime));
		row.setField(1, StringData.fromString(usdt));
		row.setField(2, DELETE_SIGN_VALUE);
		return row;
	}

	/**
	 * Row data mapping contract: converts one raw record into a RowData,
	 * or returns null to drop it.
	 */
	@FunctionalInterface
	private interface RowMapper extends Serializable {
		RowData map(Object item, int fieldCount);
	}
}
......@@ -180,7 +180,7 @@ public class KafkaBaseSchema {
.column("flume_type", STRING())
.column("zone_code", STRING())
.columnByMetadata("topic", STRING(), "topic", true) // Kafka 元数据字段
.watermark("create_time", "create_time - INTERVAL '5' SECOND") // 水印策略
.watermark("create_time", "create_time - INTERVAL '180' SECOND") // 水印策略
.build();
createTableDescriptor(tableEnv, schema, topic, groupId, tableName);
}
......@@ -238,7 +238,7 @@ public class KafkaBaseSchema {
.column("touch_pressure", STRING())
.column("draw_point", STRING())
// .columnByMetadata("topic", STRING(), "topic", true) // Kafka 元数据字段
.watermark("create_time", "create_time - INTERVAL '5' SECOND") // 水印策略
.watermark("create_time", "create_time - INTERVAL '180' SECOND") // 水印策略
.build();
createTableDescriptor(tableEnv, schema, topic, groupId, tableName);
}
......
......@@ -31,6 +31,7 @@ public enum TopicTypeEnum {
ODS_COMMUNITY_HISTORY("ods_community_history","odsCommunityHistory"),
ODS_COLLECT_USER_BEHAVIOR("ods_collect_user_behavior","odsCollectUserBehavior"),
ODS_EXCEPTION_EVENT_TOPIC("ods_exception_event_topic","odsExceptionEventTopic"),
SPIDER("ods_spider","odsSpider"),
;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment