Commit 60033268 by 魏建枢

解析pc事件信息

parent f2a23b2c
......@@ -43,6 +43,10 @@ public class CommonConsumeBaseAchi extends SourceCommonBase implements Serializa
DataStreamSource<String> eventLogStreamSource = kafkaDataSource.getDataStreamSource();
EventLogAchi.eventLog(eventLogStreamSource);
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_PC_EVENT_LOG.getTopic())) {
DataStreamSource<String> pcEventLogStreamSource = kafkaDataSource.getDataStreamSource();
PcEventLogAchi.pcEventLog(pcEventLogStreamSource);
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_COMMUNITY_HISTORY.getTopic())) {
DataStreamSource<String> communityHistoryStreamSource = kafkaDataSource.getDataStreamSource();
CommunityHistoryAchi.communityHistory(communityHistoryStreamSource);
......
package com.flink.achieve.base;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.config.TableConfig;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.PcEventInfo;
import com.flink.vo.PcOdsEventLog;
/**
 * Parses PC-side event-collection log messages and sinks them into Doris.
 *
 * <p>Each incoming Kafka record is a JSON envelope tagged with a
 * {@code flume_type} field; records whose tag is {@code "pcEventLog"} are
 * deserialized into {@link PcOdsEventLog}, the nested {@code event_info}
 * JSON is parsed into {@link PcEventInfo}, and the combined fields are
 * mapped onto a Doris {@link RowData} row for table {@code bi.event_log}.
 *
 * @author wjs
 * @version created 2025-10-22 18:40:59
 */
public class PcEventLogAchi implements Serializable {

	private static final long serialVersionUID = 1L;

	// FIX: the logger was created with EventLogAchi.class (copy-paste from the
	// sibling class), which mis-attributed every log line from this class.
	private static final Logger logger = LoggerFactory.getLogger(PcEventLogAchi.class);

	// Shared constants for the flume envelope and Doris soft-delete column.
	private static final String FLUME_TYPE_FIELD = "flume_type";
	private static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
	private static final int DELETE_SIGN_VALUE = 0;

	// DateTimeFormatter is immutable and thread-safe; cache it instead of
	// rebuilding the pattern for every record in the hot map path.
	private static final DateTimeFormatter TS_FORMATTER =
			DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

	// Column layout of the target Doris table. NOTE(review): the table name is
	// "bi.event_log" — the same table the app-event path writes to; confirm PC
	// events are intentionally co-located rather than in a dedicated table.
	private static final String[] PC_EVENT_FIELDS = { "id", "dt", "send_time", "create_time", "strategy_group_id",
			"app_key", "app_type", "app_channel", "zone_code", "zone_name", "zone_type", "sdk_version", "user_agent",
			"device_id", "uid", "strategy_version", "event_list", "route_ip", "cid", "phone", "nick", "unique_id",
			DORIS_DELETE_SIGN };

	private static final DataType[] PC_EVENT_TYPES = { DataTypes.STRING(), DataTypes.DATE(), DataTypes.TIMESTAMP(3),
			DataTypes.TIMESTAMP(3), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
			DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
			DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
			DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT() };

	/**
	 * Wires the PC event log pipeline: filters the source stream down to
	 * {@code flume_type == "pcEventLog"} records, maps each one to a Doris row
	 * and attaches the Doris sink.
	 *
	 * @param dataStreamSource raw JSON string stream from the Kafka source
	 */
	public static void pcEventLog(DataStreamSource<String> dataStreamSource) {
		// Table configuration (columns, types, target table name).
		TableConfig pcEventConfig = new TableConfig(PC_EVENT_FIELDS, PC_EVENT_TYPES, "bi.event_log");
		// Build the Doris sink for that table.
		DorisSink<RowData> dorisPcEventSink = DorisConnector.sinkDoris(pcEventConfig.getFields(),
				pcEventConfig.getTypes(), pcEventConfig.getTableName());
		// Filter + map + sink the PC event records.
		processDataStream(dataStreamSource, "pcEventLog", pcEventConfig, dorisPcEventSink,
				PcEventLogAchi::mapToPcEventRow);
	}

	/**
	 * Converts one raw JSON record into a {@link RowData} row matching
	 * {@link #PC_EVENT_FIELDS}, or {@code null} when the record cannot be
	 * mapped (unparseable envelope, missing or unparseable {@code event_info}).
	 *
	 * @param item       raw JSON string (passed as Object via {@link RowMapper})
	 * @param fieldCount arity of the output row (length of PC_EVENT_FIELDS)
	 * @return populated row, or {@code null} to drop the record
	 */
	private static RowData mapToPcEventRow(Object item, int fieldCount) {
		String value = (String) item;
		PcOdsEventLog pcOdsEventLog = JSONObject.parseObject(value, new TypeReference<PcOdsEventLog>() {});
		if (null == pcOdsEventLog) {
			return null;
		}
		String event_info = pcOdsEventLog.getEvent_info();
		if (StringUtils.isEmpty(event_info)) {
			return null;
		}
		PcEventInfo pcEventInfo = JSONObject.parseObject(event_info, new TypeReference<PcEventInfo>() {});
		// FIX: parseObject returns null for the literal "null" payload; the
		// original dereferenced pcEventInfo unguarded and relied on the generic
		// catch in ElementProcessorWithMap to swallow the resulting NPE.
		if (null == pcEventInfo) {
			return null;
		}
		String id = pcOdsEventLog.getId();
		String send_time = pcOdsEventLog.getSend_time();
		String create_time = pcOdsEventLog.getCreate_time();
		String app_key = pcOdsEventLog.getApp_key();
		String app_type = pcOdsEventLog.getApp_type();
		String app_channel = pcOdsEventLog.getApp_channel();
		String zone_code = pcOdsEventLog.getZone_code();
		String zone_name = pcOdsEventLog.getZone_name();
		String zone_type = pcOdsEventLog.getZone_type();
		String user_agent = pcOdsEventLog.getUser_agent();
		// NOTE(review): uid and unique_id are BOTH taken from getI7() —
		// presumably intentional for PC traffic (no separate unique id), but
		// confirm this is not a copy-paste of the i7 accessor.
		String uid = pcEventInfo.getI7();
		String device_id = pcEventInfo.getI8();
		String unique_id = pcEventInfo.getI7();
		String cid = pcEventInfo.getCid();
		String phone = pcEventInfo.getPhone();
		String nick = pcEventInfo.getNick();
		String route_ip = pcEventInfo.getS1();

		GenericRowData row = new GenericRowData(fieldCount);
		row.setField(0, StringData.fromString(id));
		// dt column: the yyyy-MM-dd prefix of create_time. Assumes create_time
		// is non-null and at least 10 chars; a malformed value throws and the
		// record is dropped (and logged) by the caller's catch block.
		row.setField(1, TimeConvertUtil.convertToSqlDate(create_time.substring(0, 10)));
		row.setField(2, TimestampData.fromLocalDateTime(LocalDateTime.parse(send_time, TS_FORMATTER)));
		row.setField(3, TimestampData.fromLocalDateTime(LocalDateTime.parse(create_time, TS_FORMATTER)));
		// strategy_group_id: not provided by the PC payload — written as null.
		row.setField(4, StringData.fromString(null));
		row.setField(5, StringData.fromString(app_key));
		row.setField(6, StringData.fromString(app_type));
		row.setField(7, StringData.fromString(app_channel));
		row.setField(8, StringData.fromString(zone_code));
		row.setField(9, StringData.fromString(zone_name));
		row.setField(10, StringData.fromString(zone_type));
		// sdk_version: not provided by the PC payload — written as null.
		row.setField(11, StringData.fromString(null));
		row.setField(12, StringData.fromString(user_agent));
		row.setField(13, StringData.fromString(device_id));
		row.setField(14, StringData.fromString(uid));
		// strategy_version: not provided by the PC payload — written as null.
		row.setField(15, StringData.fromString(null));
		// event_list column stores the raw event_info JSON string.
		row.setField(16, StringData.fromString(event_info));
		row.setField(17, StringData.fromString(route_ip));
		row.setField(18, StringData.fromString(cid));
		row.setField(19, StringData.fromString(phone));
		row.setField(20, StringData.fromString(nick));
		row.setField(21, StringData.fromString(unique_id));
		// Soft-delete sign: 0 = upsert (not a delete).
		row.setField(22, DELETE_SIGN_VALUE);
		return row;
	}

	/**
	 * Generic filter-map-sink wiring: keeps records whose flume_type matches,
	 * maps them via {@code mapper}, drops nulls, and attaches the Doris sink.
	 */
	private static void processDataStream(DataStreamSource<String> dataStream, String flumeType,
			TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper mapper) {
		SingleOutputStreamOperator<RowData> processedStream = dataStream
				.map(new ElementProcessorWithMap(flumeType, mapper, tableConfig.getFields().length))
				.returns(TypeInformation.of(RowData.class)).filter(Objects::nonNull);
		processedStream.sinkTo(dorisSink).name("Doris-" + flumeType);
	}

	/**
	 * MapFunction that filters by flume_type and delegates row construction to
	 * a {@link RowMapper}. Any exception drops the record (returns null) after
	 * logging the raw payload.
	 */
	private static class ElementProcessorWithMap implements MapFunction<String, RowData>, Serializable {
		private static final long serialVersionUID = 1L;
		private final String flumeType;
		private final RowMapper mapper;
		private final int fieldCount;

		public ElementProcessorWithMap(String flumeType, RowMapper mapper, int fieldCount) {
			this.flumeType = flumeType;
			this.mapper = mapper;
			this.fieldCount = fieldCount;
		}

		@Override
		public RowData map(String value) throws Exception {
			try {
				JSONObject jsonObj = JSON.parseObject(value);
				if (!flumeType.equals(jsonObj.getString(FLUME_TYPE_FIELD))) {
					return null;
				}
				return mapper.map(value, fieldCount);
			} catch (Exception e) {
				logger.error("PcEventLogAchi 处理 {} 数据出错 | rawData:{} | error:{}", flumeType, value, e.getMessage(), e);
				return null;
			}
		}
	}

	/**
	 * Row-mapping strategy: converts one raw record into a Doris row.
	 */
	@FunctionalInterface
	private interface RowMapper extends Serializable {
		RowData map(Object item, int fieldCount);
	}
}
......@@ -497,10 +497,6 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
.sinkTo(dorisSinkTotalTemp)
.name("Doris-SimiFriendsTotalTemp");
}
public static void main(String[] args) {
String formattedTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
System.out.println(formattedTime);
}
//simi好友统计分析
public void simiFriendTotalAnalysis(
......
......@@ -180,7 +180,7 @@ public class KafkaBaseSchema {
.column("flume_type", STRING())
.column("zone_code", STRING())
.columnByMetadata("topic", STRING(), "topic", true) // Kafka 元数据字段
.watermark("create_time", "create_time - INTERVAL '180' SECOND") // 水印策略
.watermark("create_time", "create_time - INTERVAL '3' MINUTE") // 水印策略
.build();
createTableDescriptor(tableEnv, schema, topic, groupId, tableName);
}
......@@ -238,7 +238,7 @@ public class KafkaBaseSchema {
.column("touch_pressure", STRING())
.column("draw_point", STRING())
// .columnByMetadata("topic", STRING(), "topic", true) // Kafka 元数据字段
.watermark("create_time", "create_time - INTERVAL '180' SECOND") // 水印策略
.watermark("create_time", "create_time - INTERVAL '3' MINUTE") // 水印策略
.build();
createTableDescriptor(tableEnv, schema, topic, groupId, tableName);
}
......
......@@ -38,6 +38,7 @@ public class CommonConsumeBaseProcessor implements JobProcessor{
TopicTypeEnum.ODS_EXCEPTION_EVENT_TOPIC,
TopicTypeEnum.ODS_COLLECT_USER_BEHAVIOR,
TopicTypeEnum.SPIDER,
TopicTypeEnum.ODS_PC_EVENT_LOG,
}).map(TopicTypeEnum::createKafkaTopic)
.collect(Collectors.toList());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment