Commit d8f3af8a by 魏建枢

事件流处理抽取代码提交

parent 98c426fc
...@@ -48,7 +48,7 @@ public class CollectLogAchi implements Serializable { ...@@ -48,7 +48,7 @@ public class CollectLogAchi implements Serializable {
private static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__"; private static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
private static final int DELETE_SIGN_VALUE = 0; private static final int DELETE_SIGN_VALUE = 0;
// 用户表配置 // 设备信息采集日志表配置
private static final String[] COLLECT_FIELDS = { "id", "dt", "device_id", "device_id_v1", "uid", "idfv", "app_key", private static final String[] COLLECT_FIELDS = { "id", "dt", "device_id", "device_id_v1", "uid", "idfv", "app_key",
"app_type", "other_info", "device_info", "env_info", "cid", "phone", "nick", "unique_id", "create_time", "app_type", "other_info", "device_info", "env_info", "cid", "phone", "nick", "unique_id", "create_time",
DORIS_DELETE_SIGN }; DORIS_DELETE_SIGN };
...@@ -66,7 +66,7 @@ public class CollectLogAchi implements Serializable { ...@@ -66,7 +66,7 @@ public class CollectLogAchi implements Serializable {
DorisSink<RowData> dorisCollectSink = DorisConnector.sinkDoris(collectConfig.getFields(), DorisSink<RowData> dorisCollectSink = DorisConnector.sinkDoris(collectConfig.getFields(),
collectConfig.getTypes(), collectConfig.getTableName()); collectConfig.getTypes(), collectConfig.getTableName());
// 处理用户数据 // 处理设备信息采集日志数据
processDataStream(dataStreamSource, "newCollectLog", collectConfig, dorisCollectSink, processDataStream(dataStreamSource, "newCollectLog", collectConfig, dorisCollectSink,
(RowMapper<String>) CollectLogAchi::mapToCollectRow); (RowMapper<String>) CollectLogAchi::mapToCollectRow);
} }
...@@ -114,7 +114,6 @@ public class CollectLogAchi implements Serializable { ...@@ -114,7 +114,6 @@ public class CollectLogAchi implements Serializable {
// 设备日志采集数据映射 // 设备日志采集数据映射
private static RowData mapToCollectRow(String value, int fieldCount) { private static RowData mapToCollectRow(String value, int fieldCount) {
logger.error("CollectLogAchi mapToCollectRow 数据打印 | rawData:{} | fieldCount:{}", value, fieldCount);
OdsCollectLog log = JSON.parseObject(value, new TypeReference<OdsCollectLog>() {}); OdsCollectLog log = JSON.parseObject(value, new TypeReference<OdsCollectLog>() {});
String appType = log.getApp_type(); String appType = log.getApp_type();
String appKey = log.getApp_key(); String appKey = log.getApp_key();
...@@ -122,8 +121,7 @@ public class CollectLogAchi implements Serializable { ...@@ -122,8 +121,7 @@ public class CollectLogAchi implements Serializable {
String device_info = log.getDevice_info(); String device_info = log.getDevice_info();
String env_info = log.getEnv_info(); String env_info = log.getEnv_info();
String createTime = log.getCreate_time(); String createTime = log.getCreate_time();
DeviceIdInfo deviceIdInfo = GenDeviceIdProcessor.genDeviceId(appType, appKey, other_info, device_info, DeviceIdInfo deviceIdInfo = GenDeviceIdProcessor.genDeviceId(appType, appKey, other_info, device_info,env_info);
env_info);
UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(log.getUser_properties()); UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(log.getUser_properties());
GenericRowData row = new GenericRowData(fieldCount); GenericRowData row = new GenericRowData(fieldCount);
row.setField(0, StringData.fromString(log.getId())); row.setField(0, StringData.fromString(log.getId()));
......
...@@ -36,10 +36,10 @@ public class CommonConsumeBaseAchi extends MultipleSourceCommonBase implements S ...@@ -36,10 +36,10 @@ public class CommonConsumeBaseAchi extends MultipleSourceCommonBase implements S
DataStreamSource<String> userInvitationStreamSource = kafkaDataSource.getDataStreamSource(); DataStreamSource<String> userInvitationStreamSource = kafkaDataSource.getDataStreamSource();
UserInvitationAchi.userInvitation(userInvitationStreamSource); UserInvitationAchi.userInvitation(userInvitationStreamSource);
} }
// if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_EVENT_LOG.getTopic())) { if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_EVENT_LOG.getTopic())) {
// DataStreamSource<String> eventLogStreamSource = kafkaDataSource.getDataStreamSource(); DataStreamSource<String> eventLogStreamSource = kafkaDataSource.getDataStreamSource();
// EventLogAchi.eventLog(eventLogStreamSource); EventLogAchi.eventLog(eventLogStreamSource);
// } }
// if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_COMMUNITY_HISTORY.getTopic())) { // if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_COMMUNITY_HISTORY.getTopic())) {
// DataStreamSource<String> communityHistoryStreamSource = kafkaDataSource.getDataStreamSource(); // DataStreamSource<String> communityHistoryStreamSource = kafkaDataSource.getDataStreamSource();
// CommunityHistoryAchi.communityHistory(communityHistoryStreamSource); // CommunityHistoryAchi.communityHistory(communityHistoryStreamSource);
......
package com.flink.achieve.base; package com.flink.achieve.base;
import java.io.Serializable; import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** import com.alibaba.fastjson.JSON;
* @author wjs import com.alibaba.fastjson.JSONObject;
* @version 创建时间:2025-7-28 10:37:25 import com.alibaba.fastjson.TypeReference;
* 类说明 import com.flink.common.DorisConnector;
*/ import com.flink.config.TableConfig;
public class EventLogAchi implements Serializable{ import com.flink.processor.function.UserPropertiesProcessor;
import com.flink.util.CompareUtils;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.EventList;
import com.flink.vo.OdsEventLog;
import com.flink.vo.Properties;
import com.flink.vo.UserProperties;
/**
* @author wjs
* @version 创建时间:2025-7-28 10:37:25 类说明
*/
public class EventLogAchi implements Serializable {
/** /**
* *
*/ */
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(EventLogAchi.class);
// 定义公共常量
private static final String FLUME_TYPE_FIELD = "flume_type";
private static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
private static final int DELETE_SIGN_VALUE = 0;
// 事件采集日志表配置
private static final String[] EVENT_FIELDS = { "id", "dt", "send_time", "create_time", "strategy_group_id",
"app_key", "app_type", "app_channel", "zone_code", "zone_name", "zone_type", "sdk_version", "user_agent",
"device_id", "uid", "strategy_version", "event_list", "route_ip", "cid", "phone", "nick", "unique_id",
DORIS_DELETE_SIGN };
private static final DataType[] EVENT_TYPES = { DataTypes.STRING(), DataTypes.DATE(), DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(3), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT() };
// 事件采集错误日志表配置
private static final String[] EVENT_ERROR_FIELDS = { "id", "dt", "event_time", "create_time", "app_key", "app_type",
"cid", "phone", "nick", "event", "data", "startTime", "timeDifference", "endTime", "userId",
DORIS_DELETE_SIGN };
private static final DataType[] EVENT_ERROR_TYPES = { DataTypes.STRING(), DataTypes.DATE(), DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(3), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.TIMESTAMP(3), DataTypes.STRING(),
DataTypes.TIMESTAMP(3), DataTypes.STRING(), DataTypes.INT() };
public static void eventLog(DataStreamSource<String> dataStreamSource) {
// 初始化表配置
TableConfig eventConfig = new TableConfig(EVENT_FIELDS, EVENT_TYPES, "bi.event_log");
TableConfig eventErrorConfig = new TableConfig(EVENT_ERROR_FIELDS, EVENT_ERROR_TYPES, "bi.event_log_error");
// 创建Doris Sink
DorisSink<RowData> dorisEventSink = DorisConnector.sinkDoris(eventConfig.getFields(), eventConfig.getTypes(),
eventConfig.getTableName());
DorisSink<RowData> dorisEventErrorSink = DorisConnector.sinkDoris(eventErrorConfig.getFields(),
eventErrorConfig.getTypes(), eventErrorConfig.getTableName());
// 处理设备信息采集日志数据
processDataStream(dataStreamSource, "eventLog", eventConfig, dorisEventSink,
(item, fieldCount) -> mapToEventRow(item, fieldCount));
batchProcessDataStream(dataStreamSource, "eventLog", eventErrorConfig, dorisEventErrorSink,
(item, fieldCount) -> {
return null;
}, EventLogAchi::mapToEventErrorRow
);
}
private static void processDataStream(DataStreamSource<String> dataStream, String flumeType,
TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper mapper) {
SingleOutputStreamOperator<RowData> processedStream = dataStream
.map(new ElementProcessorWithMap(flumeType, mapper, tableConfig.getFields().length))
.returns(TypeInformation.of(RowData.class)).filter(Objects::nonNull);
processedStream.sinkTo(dorisSink).name("Doris-" + flumeType);
}
private static void batchProcessDataStream(DataStreamSource<String> dataStream, String flumeType,
TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper mapper, EventErrorMapper errorMapper) {
SingleOutputStreamOperator<RowData> processedStream = dataStream
.flatMap(new ElementProcessor(flumeType, errorMapper, tableConfig.getFields().length))
.returns(TypeInformation.of(RowData.class)).filter(Objects::nonNull);
processedStream.sinkTo(dorisSink).name("Doris-" + flumeType);
}
/**
* 使用map算子的内部处理类
*/
private static class ElementProcessorWithMap implements MapFunction<String, RowData>, Serializable {
private static final long serialVersionUID = 1L;
private final String flumeType;
private final RowMapper mapper;
private final int fieldCount;
public ElementProcessorWithMap(String flumeType, RowMapper mapper, int fieldCount) {
this.flumeType = flumeType;
this.mapper = mapper;
this.fieldCount = fieldCount;
}
@Override
public RowData map(String value) throws Exception {
try {
JSONObject jsonObj = JSON.parseObject(value);
if (!flumeType.equals(jsonObj.getString(FLUME_TYPE_FIELD))) {
return null;
}
return mapper.map(value, fieldCount);
} catch (Exception e) {
logger.error("EventLogAchi 处理 {} 数据出错 | rawData:{} | error:{}", flumeType, value, e.getMessage(), e);
return null;
}
}
}
/**
* 使用flatMap算子的内部处理类
*
* @author wjs
*
* @param <T>
*/
private static class ElementProcessor implements FlatMapFunction<String, RowData>, Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private final String flumeType;
private final EventErrorMapper errorMapper;;
private final int fieldCount;
public ElementProcessor(String flumeType, EventErrorMapper errorMapper, int fieldCount) {
this.flumeType = flumeType;
this.errorMapper = errorMapper;
this.fieldCount = fieldCount;
}
@Override
public void flatMap(String value, Collector<RowData> out) throws Exception {
try {
JSONObject jsonObj = JSON.parseObject(value);
if (!flumeType.equals(jsonObj.getString(FLUME_TYPE_FIELD))) {
return;
}
processKafkaMessage(value, out);
} catch (Exception e) {
logger.error("UserInvitationAchi 处理 {} 数据出错 | rawData:{} | error:{}", flumeType, value, e.getMessage(),
e);
}
}
private void processKafkaMessage(String value, Collector<RowData> out) {
try {
OdsEventLog event = JSON.parseObject(value, OdsEventLog.class);
if (event == null)
return;
List<EventList> eventList = parseEventListSafely(event.getEvent_list());
if (CollectionUtils.isEmpty(eventList))
return;
int dt = TimeConvertUtil.convertToSqlDate(event.getCreate_time().substring(0, 10));
UserProperties userProps = UserPropertiesProcessor.userPropertiesToJson(event.getUser_properties());
for (EventList eventInfo : eventList) {
if (isTargetEventType(eventInfo.getR7())) {
out.collect(errorMapper.map(event, eventInfo, userProps, dt, fieldCount));
}
}
} catch (Exception e) {
logger.error("Kafka消息处理失败 | data:{} | error:{}", value, e.getMessage());
}
}
public static void eventLog(DataStreamSource<String> eventLogStreamSource) { private static List<EventList> parseEventListSafely(String jsonStr) {
// TODO Auto-generated method stub try {
return JSON.parseArray(jsonStr, EventList.class);
} catch (Exception e) {
logger.warn("事件列表解析失败: {}", jsonStr);
return Collections.emptyList();
}
}
// 事件类型过滤
private static boolean isTargetEventType(String eventType) {
return CompareUtils.stringExists(eventType, "socket_event", "socket_error", "socket_time", "refresh_token","all_time");
}
}
private static RowData mapToEventRow(Object item, int fieldCount) {
String value = (String) item;
OdsEventLog event = JSONObject.parseObject(value, new TypeReference<OdsEventLog>() {
});
String createTime = event.getCreate_time();
String routeIp = event.getRoute_ip();
String appKey = event.getApp_key().trim();
String appType = event.getApp_type();
String sendTime = event.getSend_time();
if (StringUtils.isEmpty(appKey) || StringUtils.equals(appKey, "C7jias27jias2")) {
appKey = "8ooOvXJo276";
}
UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(event.getUser_properties());
GenericRowData row = new GenericRowData(fieldCount);
row.setField(0, StringData.fromString(event.getId()));
row.setField(1, TimeConvertUtil.convertToSqlDate(createTime.substring(0, 10)));
row.setField(2, TimestampData
.fromLocalDateTime(LocalDateTime.parse(sendTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))));
row.setField(3, TimestampData.fromLocalDateTime(
LocalDateTime.parse(createTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(4, StringData.fromString(event.getStrategy_group_id()));
row.setField(5, StringData.fromString(appKey));
row.setField(6, StringData.fromString(appType));
row.setField(7, StringData.fromString(event.getApp_channel()));
row.setField(8, StringData.fromString(event.getZone_code()));
row.setField(9, StringData.fromString(event.getZone_name()));
row.setField(10, StringData.fromString(event.getZone_type()));
row.setField(11, StringData.fromString(event.getSdk_version()));
row.setField(12, StringData.fromString(event.getUser_agent()));
row.setField(13, StringData.fromString(event.getDevice_id()));
row.setField(14, StringData.fromString(event.getUid()));
row.setField(15, StringData.fromString(event.getStrategy_version()));
row.setField(16, StringData.fromString(event.getEvent_list()));
row.setField(17, StringData.fromString(routeIp));
row.setField(18, StringData.fromString(userProperties == null ? null : userProperties.getCid()));
row.setField(19, StringData.fromString(userProperties == null ? null : userProperties.getPhone()));
row.setField(20, StringData.fromString(userProperties == null ? null : userProperties.getNick()));
row.setField(21, StringData.fromString(event.getUnique_id()));
row.setField(22, DELETE_SIGN_VALUE);
return row;
} }
private static RowData mapToEventErrorRow(OdsEventLog event, EventList eventInfo, UserProperties userProps, int dt,
int fieldCount) {
GenericRowData row = new GenericRowData(fieldCount);
row.setField(0, StringData.fromString(event.getId()));
row.setField(1, dt);
row.setField(2,
TimestampData.fromLocalDateTime(LocalDateTime.parse(TimeConvertUtil.parseToStringSSS(eventInfo.getR9()),
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(3, TimestampData.fromLocalDateTime(
LocalDateTime.parse(event.getCreate_time(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(4, StringData.fromString(event.getApp_key()));
row.setField(5, StringData.fromString(event.getApp_type()));
row.setField(6, StringData.fromString(userProps == null ? null : userProps.getCid()));
row.setField(7, StringData.fromString(userProps == null ? null : userProps.getPhone()));
row.setField(8, StringData.fromString(userProps == null ? null : userProps.getNick()));
Properties r8 = eventInfo.getR8();
row.setField(9, StringData.fromString(eventInfo.getR7()));
row.setField(10, StringData.fromString(r8.getData()));
row.setField(11,
TimestampData.fromLocalDateTime(LocalDateTime.parse(TimeConvertUtil.parseToStringSSS(r8.getStartTime()),
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(12, StringData.fromString(r8.getTimeDifference()));
row.setField(13,
TimestampData.fromLocalDateTime(LocalDateTime.parse(TimeConvertUtil.parseToStringSSS(r8.getEndTime()),
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(14, StringData.fromString(r8.getUserId()));
row.setField(15, DELETE_SIGN_VALUE);
return row;
}
/**
* 行数据映射接口
*
* @param <T> 数据类型
*/
@FunctionalInterface
private interface RowMapper extends Serializable {
RowData map(Object item, int fieldCount);
}
@FunctionalInterface
private interface EventErrorMapper extends Serializable {
RowData map(OdsEventLog event, EventList eventInfo, UserProperties userProps, int dt, int fieldCount);
}
} }
...@@ -58,100 +58,100 @@ public class EventLogAchi extends MultipleSourceCommonBase implements Serializab ...@@ -58,100 +58,100 @@ public class EventLogAchi extends MultipleSourceCommonBase implements Serializab
public void parseSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception { public void parseSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
// =================配置入库字段========================================= // =================配置入库字段=========================================
// 事件明细表结构 // 事件明细表结构
TableConfig tableConfig = new TableConfig( // TableConfig tableConfig = new TableConfig(
new String[]{ // new String[]{
"id", // "id",
"dt", // "dt",
"send_time", // "send_time",
"create_time", // "create_time",
"strategy_group_id", // "strategy_group_id",
"app_key", // "app_key",
"app_type", // "app_type",
"app_channel", // "app_channel",
"zone_code", // "zone_code",
"zone_name", // "zone_name",
"zone_type", // "zone_type",
"sdk_version", // "sdk_version",
"user_agent", // "user_agent",
"device_id", // "device_id",
"uid", // "uid",
"strategy_version", // "strategy_version",
"event_list", // "event_list",
"route_ip", // "route_ip",
"cid", // "cid",
"phone", // "phone",
"nick", // "nick",
"unique_id", // "unique_id",
"__DORIS_DELETE_SIGN__" // "__DORIS_DELETE_SIGN__"
}, // },
new DataType[]{ // new DataType[]{
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.DATE(), // DataTypes.DATE(),
DataTypes.TIMESTAMP(3), // DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(3), // DataTypes.TIMESTAMP(3),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.INT() // DataTypes.INT()
}, // },
"bi.event_log" // "bi.event_log"
); // );
//
TableConfig tableErrorConfig = new TableConfig( // TableConfig tableErrorConfig = new TableConfig(
new String[]{ // new String[]{
"id", // "id",
"dt", // "dt",
"event_time", // "event_time",
"create_time", // "create_time",
"app_key", // "app_key",
"app_type", // "app_type",
"cid", // "cid",
"phone", // "phone",
"nick", // "nick",
"event", // "event",
"data", // "data",
"startTime", // "startTime",
"timeDifference", // "timeDifference",
"endTime", // "endTime",
"userId", // "userId",
"__DORIS_DELETE_SIGN__" // "__DORIS_DELETE_SIGN__"
}, // },
new DataType[]{ // new DataType[]{
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.DATE(), // DataTypes.DATE(),
DataTypes.TIMESTAMP(3), // DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(3), // DataTypes.TIMESTAMP(3),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.TIMESTAMP(3), // DataTypes.TIMESTAMP(3),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.TIMESTAMP(3), // DataTypes.TIMESTAMP(3),
DataTypes.STRING(), // DataTypes.STRING(),
DataTypes.INT() // DataTypes.INT()
}, // },
"bi.event_log_error" // "bi.event_log_error"
); // );
//
TableConfig tableCommunityHistoryConfig = new TableConfig( TableConfig tableCommunityHistoryConfig = new TableConfig(
new String[]{ new String[]{
"article_id", "article_id",
...@@ -197,17 +197,17 @@ public class EventLogAchi extends MultipleSourceCommonBase implements Serializab ...@@ -197,17 +197,17 @@ public class EventLogAchi extends MultipleSourceCommonBase implements Serializab
); );
//=================流式处理========================================= //=================流式处理=========================================
DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(tableConfig.getFields(), tableConfig.getTypes(), tableConfig.getTableName()); // DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(tableConfig.getFields(), tableConfig.getTypes(), tableConfig.getTableName());
DorisSink<RowData> dorisErrorSink = DorisConnector.sinkDoris(tableErrorConfig.getFields(), tableErrorConfig.getTypes(), tableErrorConfig.getTableName()); // DorisSink<RowData> dorisErrorSink = DorisConnector.sinkDoris(tableErrorConfig.getFields(), tableErrorConfig.getTypes(), tableErrorConfig.getTableName());
DorisSink<RowData> dorisCommunityHistorySink = DorisConnector.sinkDoris(tableCommunityHistoryConfig.getFields(), tableCommunityHistoryConfig.getTypes(), tableCommunityHistoryConfig.getTableName()); DorisSink<RowData> dorisCommunityHistorySink = DorisConnector.sinkDoris(tableCommunityHistoryConfig.getFields(), tableCommunityHistoryConfig.getTypes(), tableCommunityHistoryConfig.getTableName());
//=================数据处理流水线========================================= //=================数据处理流水线=========================================
DataStreamSource<String> eventLogStreamSource = null; // DataStreamSource<String> eventLogStreamSource = null;
DataStreamSource<String> communityHistoryStreamSource = null; DataStreamSource<String> communityHistoryStreamSource = null;
if(CollectionUtils.isNotEmpty(dataSourceList)) { if(CollectionUtils.isNotEmpty(dataSourceList)) {
for(KafkaDataSource kafkaDataSource : dataSourceList) { for(KafkaDataSource kafkaDataSource : dataSourceList) {
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_EVENT_LOG.getTopic())) { // if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_EVENT_LOG.getTopic())) {
eventLogStreamSource = kafkaDataSource.getDataStreamSource(); // eventLogStreamSource = kafkaDataSource.getDataStreamSource();
} // }
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_COMMUNITY_HISTORY.getTopic())) { if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_COMMUNITY_HISTORY.getTopic())) {
communityHistoryStreamSource = kafkaDataSource.getDataStreamSource(); communityHistoryStreamSource = kafkaDataSource.getDataStreamSource();
} }
...@@ -216,123 +216,123 @@ public class EventLogAchi extends MultipleSourceCommonBase implements Serializab ...@@ -216,123 +216,123 @@ public class EventLogAchi extends MultipleSourceCommonBase implements Serializab
return; return;
} }
eventLogStreamSource // eventLogStreamSource
.map(value->{ // .map(value->{
try { // try {
// 解析 Kafka 数据 // // 解析 Kafka 数据
OdsEventLog event = JSONObject.parseObject(value, new TypeReference<OdsEventLog>() {}); // OdsEventLog event = JSONObject.parseObject(value, new TypeReference<OdsEventLog>() {});
String createTime = event.getCreate_time(); // String createTime = event.getCreate_time();
String routeIp = event.getRoute_ip(); // String routeIp = event.getRoute_ip();
String appKey = event.getApp_key().trim(); // String appKey = event.getApp_key().trim();
String appType = event.getApp_type(); // String appType = event.getApp_type();
String sendTime = event.getSend_time(); // String sendTime = event.getSend_time();
if(StringUtils.isEmpty(appKey) || StringUtils.equals(appKey, "C7jias27jias2")) { // if(StringUtils.isEmpty(appKey) || StringUtils.equals(appKey, "C7jias27jias2")) {
appKey = "8ooOvXJo276"; // appKey = "8ooOvXJo276";
} // }
UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(event.getUser_properties()); // UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(event.getUser_properties());
//
//
// 转换为RowData // // 转换为RowData
GenericRowData row = new GenericRowData(tableConfig.getFields().length); // GenericRowData row = new GenericRowData(tableConfig.getFields().length);
row.setField(0, StringData.fromString(event.getId())); // row.setField(0, StringData.fromString(event.getId()));
row.setField(1, TimeConvertUtil.convertToSqlDate(createTime.substring(0, 10))); // row.setField(1, TimeConvertUtil.convertToSqlDate(createTime.substring(0, 10)));
row.setField(2, TimestampData.fromLocalDateTime(LocalDateTime.parse(sendTime,DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")))); // row.setField(2, TimestampData.fromLocalDateTime(LocalDateTime.parse(sendTime,DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))));
row.setField(3, TimestampData.fromLocalDateTime(LocalDateTime.parse(createTime,DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")))); // row.setField(3, TimestampData.fromLocalDateTime(LocalDateTime.parse(createTime,DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(4, StringData.fromString(event.getStrategy_group_id())); // row.setField(4, StringData.fromString(event.getStrategy_group_id()));
row.setField(5, StringData.fromString(appKey)); // row.setField(5, StringData.fromString(appKey));
row.setField(6, StringData.fromString(appType)); // row.setField(6, StringData.fromString(appType));
row.setField(7, StringData.fromString(event.getApp_channel())); // row.setField(7, StringData.fromString(event.getApp_channel()));
row.setField(8, StringData.fromString(event.getZone_code())); // row.setField(8, StringData.fromString(event.getZone_code()));
row.setField(9, StringData.fromString(event.getZone_name())); // row.setField(9, StringData.fromString(event.getZone_name()));
row.setField(10, StringData.fromString(event.getZone_type())); // row.setField(10, StringData.fromString(event.getZone_type()));
row.setField(11, StringData.fromString(event.getSdk_version())); // row.setField(11, StringData.fromString(event.getSdk_version()));
row.setField(12, StringData.fromString(event.getUser_agent())); // row.setField(12, StringData.fromString(event.getUser_agent()));
row.setField(13, StringData.fromString(event.getDevice_id())); // row.setField(13, StringData.fromString(event.getDevice_id()));
row.setField(14, StringData.fromString(event.getUid())); // row.setField(14, StringData.fromString(event.getUid()));
row.setField(15, StringData.fromString(event.getStrategy_version())); // row.setField(15, StringData.fromString(event.getStrategy_version()));
row.setField(16, StringData.fromString(event.getEvent_list())); // row.setField(16, StringData.fromString(event.getEvent_list()));
row.setField(17, StringData.fromString(routeIp)); // row.setField(17, StringData.fromString(routeIp));
row.setField(18, StringData.fromString(userProperties==null ? null : userProperties.getCid())); // row.setField(18, StringData.fromString(userProperties==null ? null : userProperties.getCid()));
row.setField(19, StringData.fromString(userProperties==null ? null : userProperties.getPhone())); // row.setField(19, StringData.fromString(userProperties==null ? null : userProperties.getPhone()));
row.setField(20, StringData.fromString(userProperties==null ? null : userProperties.getNick())); // row.setField(20, StringData.fromString(userProperties==null ? null : userProperties.getNick()));
row.setField(21, StringData.fromString(event.getUnique_id())); // row.setField(21, StringData.fromString(event.getUnique_id()));
row.setField(22, 0); // row.setField(22, 0);
return (RowData)row; // return (RowData)row;
} catch (Exception e) { // } catch (Exception e) {
System.err.println("解析失败: "+e.toString()); // System.err.println("解析失败: "+e.toString());
return null; // return null;
} // }
}) // })
.filter(Objects::nonNull) // .filter(Objects::nonNull)
// .print() //// .print()
.sinkTo(dorisSink) // .sinkTo(dorisSink)
.name("Doris-EventLog"); // .name("Doris-EventLog");
//
SingleOutputStreamOperator<RowData> rowDataStream = eventLogStreamSource.flatMap( // SingleOutputStreamOperator<RowData> rowDataStream = eventLogStreamSource.flatMap(
new FlatMapFunction<String, RowData>() { // new FlatMapFunction<String, RowData>() {
private static final long serialVersionUID = 1L; // private static final long serialVersionUID = 1L;
//
@Override // @Override
public void flatMap(String value, Collector<RowData> out) throws Exception { // public void flatMap(String value, Collector<RowData> out) throws Exception {
try { // try {
// 解析 Kafka 数据 // // 解析 Kafka 数据
OdsEventLog event = JSONObject.parseObject(value, new TypeReference<OdsEventLog>() {}); // OdsEventLog event = JSONObject.parseObject(value, new TypeReference<OdsEventLog>() {});
if (null == event) { // if (null == event) {
return; // return;
} // }
//
String id = event.getId(); // String id = event.getId();
String createTime = event.getCreate_time(); // String createTime = event.getCreate_time();
int dt = TimeConvertUtil.convertToSqlDate(createTime.substring(0, 10)); // int dt = TimeConvertUtil.convertToSqlDate(createTime.substring(0, 10));
String appKey = event.getApp_key().trim(); // String appKey = event.getApp_key().trim();
String appType = event.getApp_type(); // String appType = event.getApp_type();
UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(event.getUser_properties()); // UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(event.getUser_properties());
List<EventList> eventList = JSONObject.parseObject(event.getEvent_list(), new TypeReference<List<EventList>>() {}); // List<EventList> eventList = JSONObject.parseObject(event.getEvent_list(), new TypeReference<List<EventList>>() {});
if(CollectionUtils.isNotEmpty(eventList)){ // if(CollectionUtils.isNotEmpty(eventList)){
for(EventList eventListInfo : eventList) { // for(EventList eventListInfo : eventList) {
String r7 = eventListInfo.getR7(); // String r7 = eventListInfo.getR7();
Long r9 = eventListInfo.getR9(); // Long r9 = eventListInfo.getR9();
Properties r8 = eventListInfo.getR8(); // Properties r8 = eventListInfo.getR8();
if(CompareUtils.stringExists(r7, // if(CompareUtils.stringExists(r7,
"socket_event", // "socket_event",
"socket_error", // "socket_error",
"socket_time", // "socket_time",
"refresh_token", // "refresh_token",
"all_time")) { // "all_time")) {
// 转换为RowData // // 转换为RowData
GenericRowData row = new GenericRowData(tableErrorConfig.getFields().length); // GenericRowData row = new GenericRowData(tableErrorConfig.getFields().length);
row.setField(0, StringData.fromString(id)); //id // row.setField(0, StringData.fromString(id)); //id
row.setField(1, dt); //dt // row.setField(1, dt); //dt
row.setField(2, TimestampData.fromLocalDateTime(LocalDateTime.parse(TimeConvertUtil.parseToStringSSS(r9),DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")))); // row.setField(2, TimestampData.fromLocalDateTime(LocalDateTime.parse(TimeConvertUtil.parseToStringSSS(r9),DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(3, TimestampData.fromLocalDateTime(LocalDateTime.parse(createTime,DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")))); // row.setField(3, TimestampData.fromLocalDateTime(LocalDateTime.parse(createTime,DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(4, StringData.fromString(appKey)); // row.setField(4, StringData.fromString(appKey));
row.setField(5, StringData.fromString(appType)); // row.setField(5, StringData.fromString(appType));
row.setField(6, StringData.fromString(userProperties==null ? null : userProperties.getCid())); // row.setField(6, StringData.fromString(userProperties==null ? null : userProperties.getCid()));
row.setField(7, StringData.fromString(userProperties==null ? null : userProperties.getPhone())); // row.setField(7, StringData.fromString(userProperties==null ? null : userProperties.getPhone()));
row.setField(8, StringData.fromString(userProperties==null ? null : userProperties.getNick())); // row.setField(8, StringData.fromString(userProperties==null ? null : userProperties.getNick()));
row.setField(9, StringData.fromString(r7)); // row.setField(9, StringData.fromString(r7));
row.setField(10, StringData.fromString(r8.getData())); // row.setField(10, StringData.fromString(r8.getData()));
row.setField(11, TimestampData.fromLocalDateTime(LocalDateTime.parse(TimeConvertUtil.parseToStringSSS(r8.getStartTime()),DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")))); // row.setField(11, TimestampData.fromLocalDateTime(LocalDateTime.parse(TimeConvertUtil.parseToStringSSS(r8.getStartTime()),DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(12, StringData.fromString(r8.getTimeDifference())); // row.setField(12, StringData.fromString(r8.getTimeDifference()));
row.setField(13, TimestampData.fromLocalDateTime(LocalDateTime.parse(TimeConvertUtil.parseToStringSSS(r8.getEndTime()),DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")))); // row.setField(13, TimestampData.fromLocalDateTime(LocalDateTime.parse(TimeConvertUtil.parseToStringSSS(r8.getEndTime()),DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(14, StringData.fromString(r8.getUserId())); // row.setField(14, StringData.fromString(r8.getUserId()));
row.setField(15, 0); // row.setField(15, 0);
out.collect(row); // out.collect(row);
} // }
} // }
} // }
} catch (Exception e) { // } catch (Exception e) {
logger.error("EventLogAchi 处理 Kafka 消息出错 | rawData:{} | error:{}", value, e.getMessage()); // logger.error("EventLogAchi 处理 Kafka 消息出错 | rawData:{} | error:{}", value, e.getMessage());
System.err.println("解析失败: "+e.toString()); // System.err.println("解析失败: "+e.toString());
} // }
} // }
}); // });
//
rowDataStream // rowDataStream
.filter(Objects::nonNull) // .filter(Objects::nonNull)
// .print() //// .print()
.sinkTo(dorisErrorSink) // .sinkTo(dorisErrorSink)
.name("Doris-EventLogError"); // .name("Doris-EventLogError");
SingleOutputStreamOperator<RowData> communityHistoryRowDataStream = communityHistoryStreamSource.flatMap(new FlatMapFunction<String, RowData>(){ SingleOutputStreamOperator<RowData> communityHistoryRowDataStream = communityHistoryStreamSource.flatMap(new FlatMapFunction<String, RowData>(){
......
...@@ -26,7 +26,7 @@ public class EventLogProcessor implements JobProcessor{ ...@@ -26,7 +26,7 @@ public class EventLogProcessor implements JobProcessor{
private static List<KafkaTopic> createTopicList() { private static List<KafkaTopic> createTopicList() {
return Arrays.stream(new TopicTypeEnum[]{ return Arrays.stream(new TopicTypeEnum[]{
TopicTypeEnum.ODS_EVENT_LOG, // TopicTypeEnum.ODS_EVENT_LOG,
TopicTypeEnum.ODS_COMMUNITY_HISTORY TopicTypeEnum.ODS_COMMUNITY_HISTORY
}).map(TopicTypeEnum::createKafkaTopic) }).map(TopicTypeEnum::createKafkaTopic)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment