Commit f2a23b2c by 魏建枢

代码提交

parent 6e43c6af
......@@ -34,6 +34,7 @@ import com.flink.processor.function.UserPropertiesProcessor;
import com.flink.util.CompareUtils;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.EventList;
import com.flink.vo.EventListBak;
import com.flink.vo.OdsEventLog;
import com.flink.vo.Properties;
import com.flink.vo.UserProperties;
......@@ -67,6 +68,39 @@ public class EventLogAchi implements Serializable {
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT() };
/**
 * Column names of the community event log table (used with the
 * "bi.community_event_log" table configuration), in sink order.
 * The position of each name must match the corresponding entry in
 * {@code COMMUNITY_EVENT_TYPES} and the index written by the row mapper.
 */
private static final String[] COMMUNITY_EVENT_FIELDS = {
        "id",                 // 0
        "event_type",         // 1
        "event_time",         // 2
        "event_info",         // 3
        "send_time",          // 4
        "create_time",        // 5
        "strategy_group_id",  // 6
        "app_key",            // 7
        "app_type",           // 8
        "app_channel",        // 9
        "zone_code",          // 10
        "zone_name",          // 11
        "zone_type",          // 12
        "sdk_version",        // 13
        "user_agent",         // 14
        "device_id",          // 15
        "uid",                // 16
        "strategy_version",   // 17
        "route_ip",           // 18
        "cid",                // 19
        "phone",              // 20
        "nick",               // 21
        "unique_id",          // 22
        "community_id",       // 23
        "community_type",     // 24
        DORIS_DELETE_SIGN     // 25
};
/**
 * Column data types of the community event log table. Entries are positional:
 * index i here describes the column named at index i of COMMUNITY_EVENT_FIELDS
 * (26 entries in both arrays). The trailing comment on each line gives the
 * column name and its declared database type.
 */
private static final DataType[] COMMUNITY_EVENT_TYPES = { DataTypes.STRING(), // id VARCHAR(50)
DataTypes.STRING(), // event_type varchar(255)
DataTypes.TIMESTAMP(3), // event_time datetime(3)
DataTypes.STRING(), // event_info text
DataTypes.TIMESTAMP(3), // send_time DATETIME(3)
DataTypes.TIMESTAMP(3), // create_time DATETIME(3)
DataTypes.STRING(), // strategy_group_id varchar(128)
DataTypes.STRING(), // app_key varchar(20)
DataTypes.STRING(), // app_type varchar(10)
DataTypes.STRING(), // app_channel varchar(100)
DataTypes.STRING(), // zone_code varchar(100)
DataTypes.STRING(), // zone_name varchar(100)
DataTypes.STRING(), // zone_type varchar(100)
DataTypes.STRING(), // sdk_version varchar(10)
DataTypes.STRING(), // user_agent TEXT
DataTypes.STRING(), // device_id varchar(128)
DataTypes.STRING(), // uid varchar(128)
DataTypes.STRING(), // strategy_version varchar(128)
DataTypes.STRING(), // route_ip TEXT
DataTypes.STRING(), // cid varchar(20)
DataTypes.STRING(), // phone varchar(20)
DataTypes.STRING(), // nick varchar(100)
DataTypes.STRING(), // unique_id varchar(100)
DataTypes.STRING(), // community_id varchar(100)
DataTypes.STRING(), // community_type varchar(100)
DataTypes.INT() }; // DORIS_DELETE_SIGN column
// 事件采集错误日志表配置
private static final String[] EVENT_ERROR_FIELDS = { "id", "dt", "event_time", "create_time", "app_key", "app_type",
"cid", "phone", "nick", "event", "data", "startTime", "timeDifference", "endTime", "userId",
......@@ -79,20 +113,25 @@ public class EventLogAchi implements Serializable {
// 事件曝光表配置
private static final String[] EVENT_EXPOSURE_FIELDS = { "cid", "phone", "exposure_type", "time", "event_type",
"article_id","nick", "create_time", "send_time",DORIS_DELETE_SIGN };
"article_id", "nick", "create_time", "send_time", DORIS_DELETE_SIGN };
private static final DataType[] EVENT_EXPOSURE_TYPES = { DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.TIMESTAMP(3), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT() };
private static final DataType[] EVENT_EXPOSURE_TYPES = { DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.TIMESTAMP(3), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.INT() };
public static void eventLog(DataStreamSource<String> dataStreamSource) {
// 初始化表配置
TableConfig eventConfig = new TableConfig(EVENT_FIELDS, EVENT_TYPES, "bi.event_log");
TableConfig communityEventConfig = new TableConfig(COMMUNITY_EVENT_FIELDS, COMMUNITY_EVENT_TYPES, "bi.community_event_log");
TableConfig eventErrorConfig = new TableConfig(EVENT_ERROR_FIELDS, EVENT_ERROR_TYPES, "bi.event_log_error");
TableConfig eventExposureConfig = new TableConfig(EVENT_EXPOSURE_FIELDS, EVENT_EXPOSURE_TYPES, "ai.event_exposure");
TableConfig eventExposureConfig = new TableConfig(EVENT_EXPOSURE_FIELDS, EVENT_EXPOSURE_TYPES,
"ai.event_exposure");
// 创建Doris Sink
DorisSink<RowData> dorisEventSink = DorisConnector.sinkDoris(eventConfig.getFields(), eventConfig.getTypes(),
eventConfig.getTableName());
DorisSink<RowData> doriscommunityEventSink = DorisConnector.sinkDoris(communityEventConfig.getFields(), communityEventConfig.getTypes(),
communityEventConfig.getTableName());
DorisSink<RowData> dorisEventErrorSink = DorisConnector.sinkDoris(eventErrorConfig.getFields(),
eventErrorConfig.getTypes(), eventErrorConfig.getTableName());
DorisSink<RowData> dorisEventExposureSink = DorisConnector.sinkDoris(eventExposureConfig.getFields(),
......@@ -102,16 +141,19 @@ public class EventLogAchi implements Serializable {
processDataStream(dataStreamSource, "eventLog", eventConfig, dorisEventSink,
(item, fieldCount) -> mapToEventRow(item, fieldCount));
batchCommunityProcessDataStream(dataStreamSource, "eventLog", communityEventConfig, doriscommunityEventSink,
(item, fieldCount) -> {
return null;
}, EventLogAchi::mapToCommunityEventRow);
batchProcessDataStream(dataStreamSource, "eventLog", eventErrorConfig, dorisEventErrorSink,
(item, fieldCount) -> {
return null;
}, EventLogAchi::mapToEventErrorRow
);
}, EventLogAchi::mapToEventErrorRow);
batchExposureProcessDataStream(dataStreamSource, "eventLog", eventExposureConfig, dorisEventExposureSink,
(item, fieldCount) -> {
return null;
}, EventLogAchi::mapToEventExposureRow
);
}, EventLogAchi::mapToEventExposureRow);
}
private static void processDataStream(DataStreamSource<String> dataStream, String flumeType,
......@@ -123,6 +165,14 @@ public class EventLogAchi implements Serializable {
processedStream.sinkTo(dorisSink).name("Doris-" + flumeType);
}
/**
 * Wires the community event pipeline: raw messages are expanded by an
 * {@code ElementCommunityProcessor}, null rows are filtered out, and the
 * remaining rows are written to the given Doris sink.
 *
 * @param dataStream      source of raw JSON messages
 * @param flumeType       message type tag the processor accepts; also used in the sink name
 * @param tableConfig     table configuration; only the number of fields is read here
 * @param dorisSink       sink receiving the produced rows
 * @param mapper          not used by this method (kept for signature parity with the sibling methods)
 * @param communityMapper converts one community event into a row
 */
private static void batchCommunityProcessDataStream(DataStreamSource<String> dataStream, String flumeType,
        TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper mapper,
        EventCommunityMapper communityMapper) {
    final int columnCount = tableConfig.getFields().length;
    final ElementCommunityProcessor expander = new ElementCommunityProcessor(flumeType, communityMapper,
            columnCount);

    // NOTE(review): the sink name depends only on flumeType, so every pipeline
    // started with the same flumeType gets the same operator name.
    dataStream.flatMap(expander)
            .returns(TypeInformation.of(RowData.class))
            .filter(Objects::nonNull)
            .sinkTo(dorisSink)
            .name("Doris-" + flumeType);
}
private static void batchProcessDataStream(DataStreamSource<String> dataStream, String flumeType,
TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper mapper, EventErrorMapper errorMapper) {
......@@ -133,7 +183,8 @@ public class EventLogAchi implements Serializable {
}
private static void batchExposureProcessDataStream(DataStreamSource<String> dataStream, String flumeType,
TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper mapper, EventExposureMapper exposureMapper) {
TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper mapper,
EventExposureMapper exposureMapper) {
SingleOutputStreamOperator<RowData> processedStream = dataStream
.flatMap(new ElementExposureProcessor(flumeType, exposureMapper, tableConfig.getFields().length))
.returns(TypeInformation.of(RowData.class)).filter(Objects::nonNull);
......@@ -242,7 +293,75 @@ public class EventLogAchi implements Serializable {
// 事件类型过滤
private static boolean isTargetEventType(String eventType) {
return CompareUtils.stringExists(eventType, "socket_event", "socket_error", "socket_time", "refresh_token","all_time");
return CompareUtils.stringExists(eventType, "socket_event", "socket_error", "socket_time", "refresh_token",
"all_time");
}
}
private static class ElementCommunityProcessor implements FlatMapFunction<String, RowData>, Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private final String flumeType;
private final EventCommunityMapper communityMapper;;
private final int fieldCount;
public ElementCommunityProcessor(String flumeType, EventCommunityMapper communityMapper, int fieldCount) {
this.flumeType = flumeType;
this.communityMapper = communityMapper;
this.fieldCount = fieldCount;
}
@Override
public void flatMap(String value, Collector<RowData> out) throws Exception {
try {
JSONObject jsonObj = JSON.parseObject(value);
if (!flumeType.equals(jsonObj.getString(FLUME_TYPE_FIELD))) {
return;
}
processKafkaMessage(value, out);
} catch (Exception e) {
logger.error("EventLogAchi 处理 {} 数据出错 | rawData:{} | error:{}", flumeType, value, e.getMessage(),
e);
}
}
private void processKafkaMessage(String value, Collector<RowData> out) {
try {
OdsEventLog event = JSON.parseObject(value, OdsEventLog.class);
if (event == null)
return;
List<EventListBak> eventList = parseEventListSafely(event.getEvent_list());
if (CollectionUtils.isEmpty(eventList))
return;
int dt = TimeConvertUtil.convertToSqlDate(event.getCreate_time().substring(0, 10));
UserProperties userProps = UserPropertiesProcessor.userPropertiesToJson(event.getUser_properties());
for (EventListBak eventInfo : eventList) {
if (isTargetEventType(eventInfo.getR7())) {
out.collect(communityMapper.map(event, eventInfo, userProps, fieldCount));
}
}
} catch (Exception e) {
logger.error("Kafka消息处理失败 | data:{} | error:{}", value, e.getMessage());
}
}
private static List<EventListBak> parseEventListSafely(String jsonStr) {
try {
return JSON.parseArray(jsonStr, EventListBak.class);
} catch (Exception e) {
logger.warn("事件列表解析失败: {}", jsonStr);
return Collections.emptyList();
}
}
// 事件类型过滤
private static boolean isTargetEventType(String eventType) {
return CompareUtils.stringExists(eventType, "enter_act", "exit_act", "show_act");
}
}
......@@ -356,6 +475,56 @@ public class EventLogAchi implements Serializable {
return row;
}
/** Second-precision timestamp format used to parse {@code send_time}. Immutable and thread-safe. */
private static final DateTimeFormatter COMMUNITY_SECONDS_FORMAT = DateTimeFormatter
        .ofPattern("yyyy-MM-dd HH:mm:ss");

/** Millisecond-precision timestamp format used to parse {@code event_time} and {@code create_time}. */
private static final DateTimeFormatter COMMUNITY_MILLIS_FORMAT = DateTimeFormatter
        .ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

/**
 * Builds one community event log row from a message and one of its events.
 *
 * <p>{@code community_id} and {@code community_type} are extracted from the
 * JSON in {@code eventInfo.getR8()}: {@code community_type} is the {@code type}
 * property; {@code community_id} is the {@code community_id} property when
 * non-empty, otherwise the {@code id} property when non-empty, otherwise null.
 *
 * @param event          message-level data (ids, app/zone info, timestamps)
 * @param eventInfo      the single event: r7 = event type, r8 = event info JSON,
 *                       r9 = event time (converted by {@code TimeConvertUtil.parseToStringSSS})
 * @param userProperties user attributes, may be null (cid/phone/nick are then null)
 * @param fieldCount     number of fields of the row; indices 0..25 are written
 * @return the populated row
 * @throws java.time.format.DateTimeParseException if a timestamp string does not
 *         match its expected pattern
 * @throws NullPointerException if {@code send_time} or {@code create_time} is null
 */
private static RowData mapToCommunityEventRow(OdsEventLog event, EventListBak eventInfo,
        UserProperties userProperties, int fieldCount) {
    String communityId = null;
    String communityType = null;

    String eventInfoJson = eventInfo.getR8();
    if (StringUtils.isNotEmpty(eventInfoJson)) {
        JSONObject payload = JSON.parseObject(eventInfoJson);
        // The parser may return null for payloads that hold no object; treat that
        // as "no community info" instead of failing with a NullPointerException.
        if (payload != null) {
            communityType = payload.getString("type");
            String explicitCommunityId = payload.getString("community_id");
            String fallbackId = payload.getString("id");
            if (StringUtils.isNotEmpty(explicitCommunityId)) {
                communityId = explicitCommunityId;
            } else if (StringUtils.isNotEmpty(fallbackId)) {
                communityId = fallbackId;
            }
        }
    }

    String cid = userProperties == null ? null : userProperties.getCid();
    String phone = userProperties == null ? null : userProperties.getPhone();
    String nick = userProperties == null ? null : userProperties.getNick();

    LocalDateTime eventTime = LocalDateTime.parse(TimeConvertUtil.parseToStringSSS(eventInfo.getR9()),
            COMMUNITY_MILLIS_FORMAT);
    LocalDateTime sendTime = LocalDateTime.parse(event.getSend_time(), COMMUNITY_SECONDS_FORMAT);
    LocalDateTime createTime = LocalDateTime.parse(event.getCreate_time(), COMMUNITY_MILLIS_FORMAT);

    GenericRowData row = new GenericRowData(fieldCount);
    row.setField(0, StringData.fromString(event.getId()));
    row.setField(1, StringData.fromString(eventInfo.getR7())); // event_type
    row.setField(2, TimestampData.fromLocalDateTime(eventTime)); // event_time
    row.setField(3, StringData.fromString(eventInfoJson)); // event_info (raw JSON text)
    row.setField(4, TimestampData.fromLocalDateTime(sendTime)); // send_time
    row.setField(5, TimestampData.fromLocalDateTime(createTime)); // create_time
    row.setField(6, StringData.fromString(event.getStrategy_group_id()));
    row.setField(7, StringData.fromString(event.getApp_key()));
    row.setField(8, StringData.fromString(event.getApp_type()));
    row.setField(9, StringData.fromString(event.getApp_channel()));
    row.setField(10, StringData.fromString(event.getZone_code()));
    row.setField(11, StringData.fromString(event.getZone_name()));
    row.setField(12, StringData.fromString(event.getZone_type()));
    row.setField(13, StringData.fromString(event.getSdk_version()));
    row.setField(14, StringData.fromString(event.getUser_agent()));
    row.setField(15, StringData.fromString(event.getDevice_id()));
    row.setField(16, StringData.fromString(event.getUid()));
    row.setField(17, StringData.fromString(event.getStrategy_version()));
    row.setField(18, StringData.fromString(event.getRoute_ip()));
    row.setField(19, StringData.fromString(cid));
    row.setField(20, StringData.fromString(phone));
    row.setField(21, StringData.fromString(nick));
    row.setField(22, StringData.fromString(event.getUnique_id()));
    row.setField(23, StringData.fromString(communityId));
    row.setField(24, StringData.fromString(communityType));
    row.setField(25, DELETE_SIGN_VALUE);
    return row;
}
private static RowData mapToEventErrorRow(OdsEventLog event, EventList eventInfo, UserProperties userProps, int dt,
int fieldCount) {
GenericRowData row = new GenericRowData(fieldCount);
......@@ -386,8 +555,8 @@ public class EventLogAchi implements Serializable {
return row;
}
private static RowData mapToEventExposureRow(OdsEventLog event, EventList eventInfo, UserProperties userProps, int dt,
int fieldCount) {
private static RowData mapToEventExposureRow(OdsEventLog event, EventList eventInfo, UserProperties userProps,
int dt, int fieldCount) {
GenericRowData row = new GenericRowData(fieldCount);
row.setField(0, StringData.fromString(userProps == null ? null : userProps.getCid()));
row.setField(1, StringData.fromString(userProps == null ? null : userProps.getPhone()));
......@@ -399,11 +568,11 @@ public class EventLogAchi implements Serializable {
row.setField(4, StringData.fromString(r8.getType()));
String id = null;
if(r8 != null) {
if(StringUtils.isNotEmpty(r8.getId())) {
if(r8.getId().length() > 200) {
id = "测试异常数据:"+r8.getId().substring(0, 200);
}else {
if (r8 != null) {
if (StringUtils.isNotEmpty(r8.getId())) {
if (r8.getId().length() > 200) {
id = "测试异常数据:" + r8.getId().substring(0, 200);
} else {
id = r8.getId();
}
}
......@@ -435,4 +604,9 @@ public class EventLogAchi implements Serializable {
private interface EventExposureMapper extends Serializable {
/**
 * Converts one event of a message into an exposure row.
 *
 * @param event      message-level data
 * @param eventInfo  the single event being converted
 * @param userProps  user attributes; implementations in this file tolerate null
 * @param dt         integer date value supplied by the caller
 *                   (presumably derived from the message's create_time; confirm against the processor)
 * @param fieldCount number of fields the produced row must have
 * @return the populated row
 */
RowData map(OdsEventLog event, EventList eventInfo, UserProperties userProps, int dt, int fieldCount);
}
/**
 * Strategy for converting one community event into a sink row. Extends
 * Serializable, and is held as a field by the serializable
 * ElementCommunityProcessor.
 */
@FunctionalInterface
private interface EventCommunityMapper extends Serializable {
/**
 * @param event      message-level data
 * @param eventInfo  the single community event being converted
 * @param userProps  user attributes; may be null
 * @param fieldCount number of fields the produced row must have
 * @return the populated row
 */
RowData map(OdsEventLog event, EventListBak eventInfo, UserProperties userProps, int fieldCount);
}
}
......@@ -43,7 +43,7 @@ public class SpiderAchi implements Serializable{
// 爬虫表usdt
private static final String[] USDT_FIELDS = { "spider_time", "usdt",DORIS_DELETE_SIGN };
private static final DataType[] USDT_TYPES = { DataTypes.STRING(), DataTypes.DATE(), DataTypes.INT() };
private static final DataType[] USDT_TYPES = { DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT() };
public static void spider(DataStreamSource<String> dataStreamSource) {
// 初始化表配置
......
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
 * One entry of a message's event list, as consumed by the community event
 * pipeline in EventLogAchi. Accessors are generated by Lombok.
 *
 * @author wjs
 * @version created 2025-6-18 17:35:56
 */
@Data
@ToString
public class EventListBak implements Serializable{
/** Serialization version identifier. */
private static final long serialVersionUID = 1L;
// Event type; compared against the accepted community event types and written to the event_type column.
private String r7;
// Event info kept as raw JSON text; parsed for id / community_id / type and written to the event_info column.
private String r8;
// Event time; converted via TimeConvertUtil.parseToStringSSS for the event_time column
// (presumably epoch milliseconds - confirm against the producer).
private Long r9;
}
......@@ -34,4 +34,12 @@ public class Properties implements Serializable{
private String userId;
private String id;
private String type;
private String ids;
private String page_name;
private String community_id;
private String source_page;
private String source_cid;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment