Commit 27bba8ff by 魏建枢

代码提交

parent ea89c502
......@@ -24,12 +24,8 @@ import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.config.TableConfig;
import com.flink.processor.function.GenDeviceIdProcessor;
import com.flink.processor.function.UserPropertiesProcessor;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.DeviceIdInfo;
import com.flink.vo.OdsCollectLog;
import com.flink.vo.UserProperties;
import com.flink.vo.CollectInfoLog;
/**
* @author wjs
......@@ -50,13 +46,20 @@ public class CollectLogAchi implements Serializable {
// 设备信息采集日志表配置
private static final String[] COLLECT_FIELDS = { "id", "dt", "device_id", "device_id_v1", "uid", "idfv", "app_key",
"app_type", "other_info", "device_info", "env_info", "cid", "phone", "nick", "unique_id", "create_time",
"app_type", "other_info", "device_info", "env_info", "cid", "phone", "nick", "unique_id", "create_time","network_ip",
"network_area_name","strategy_group_id","strategy_version","send_time","app_channel","zone_code","zone_name","zone_type",
"sdk_version","user_agent","user_properties","network_model","phone_name","device_name","brand","device_model","os_release",
"app_version","platform","third_id","country_code","register_time","user_state","user_head_url",
DORIS_DELETE_SIGN };
private static final DataType[] COLLECT_TYPES = { DataTypes.STRING(), DataTypes.DATE(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.TIMESTAMP(3), DataTypes.INT() };
DataTypes.STRING(), DataTypes.STRING(), DataTypes.TIMESTAMP(3), DataTypes.STRING(),
DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.TIMESTAMP(0),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),
DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),
DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),
DataTypes.INT() };
public static void collectLog(DataStreamSource<String> dataStreamSource) {
// 初始化表配置
......@@ -114,34 +117,61 @@ public class CollectLogAchi implements Serializable {
/**
 * Maps one raw device collect-log JSON string to a row for the Doris sink.
 *
 * The JSON is bound to a log value object with fastjson and its properties are copied
 * into a {@link GenericRowData} of {@code fieldCount} slots; the last slot written here
 * (index 41) carries the delete-sign value.
 *
 * NOTE(review): this span comes from a diff view without +/- markers, so the removed and
 * the added side of the change appear back to back (two {@code log} declarations, two
 * writes to indexes 3, 5, 6, 11-13 and 16). Confirm against the real file which lines
 * survive before relying on any single statement below.
 *
 * @param value      raw JSON text of one collect-log record
 * @param fieldCount number of slots to allocate in the row
 * @return the populated row
 */
private static RowData mapToCollectRow(String value, int fieldCount) {
OdsCollectLog log = JSON.parseObject(value, new TypeReference<OdsCollectLog>() {});
CollectInfoLog log = JSON.parseObject(value, new TypeReference<CollectInfoLog>() {});
String appType = log.getApp_type();
String appKey = log.getApp_key();
String other_info = log.getOther_info();
String device_info = log.getDevice_info();
String env_info = log.getEnv_info();
String createTime = log.getCreate_time();
DeviceIdInfo deviceIdInfo = GenDeviceIdProcessor.genDeviceId(appType, appKey, other_info, device_info,env_info);
UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(log.getUser_properties());
String sendTime = log.getSend_time();
// DeviceIdInfo deviceIdInfo = GenDeviceIdProcessor.genDeviceId(appType, appKey, other_info, device_info,env_info);
// UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(log.getUser_properties());
GenericRowData row = new GenericRowData(fieldCount);
row.setField(0, StringData.fromString(log.getId()));
// dt is derived from the first 10 characters (the date part) of create_time
row.setField(1, TimeConvertUtil.convertToSqlDate(createTime.substring(0, 10)));
row.setField(2, StringData.fromString(log.getDevice_id()));
row.setField(3, StringData.fromString(deviceIdInfo == null ? null : deviceIdInfo.getDeviceIdV1()));
row.setField(3, StringData.fromString(log.getDevice_id_v1()));
row.setField(4, StringData.fromString(log.getUid()));
row.setField(5, StringData.fromString(deviceIdInfo == null ? null : deviceIdInfo.getIdfv()));
row.setField(6, StringData.fromString(deviceIdInfo.getAppKey()));
row.setField(5, StringData.fromString(log.getIdfv()));
row.setField(6, StringData.fromString(appKey));
row.setField(7, StringData.fromString(appType));
row.setField(8, StringData.fromString(other_info));
row.setField(9, StringData.fromString(device_info));
row.setField(10, StringData.fromString(env_info));
row.setField(11, StringData.fromString(userProperties == null ? null : userProperties.getCid()));
row.setField(12, StringData.fromString(userProperties == null ? null : userProperties.getPhone()));
row.setField(13, StringData.fromString(userProperties == null ? null : userProperties.getNick()));
row.setField(11, StringData.fromString(log.getCid()));
row.setField(12, StringData.fromString(log.getPhone()));
row.setField(13, StringData.fromString(log.getNick()));
row.setField(14, StringData.fromString(log.getUnique_id()));
// create_time is parsed with a millisecond pattern
row.setField(15, TimestampData.fromLocalDateTime(
LocalDateTime.parse(createTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(16, DELETE_SIGN_VALUE);
row.setField(16, StringData.fromString(log.getNetwork_ip()));
row.setField(17, StringData.fromString(log.getNetworkAreaName()));
row.setField(18, StringData.fromString(log.getStrategy_group_id()));
row.setField(19, StringData.fromString(log.getStrategy_version()));
// send_time is parsed with a seconds-only pattern, unlike create_time above.
// NOTE(review): LocalDateTime.parse rejects a null text, so a record without send_time
// fails here — confirm the caller handles that the way it should.
row.setField(20, TimestampData.fromLocalDateTime(
LocalDateTime.parse(sendTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))));
row.setField(21, StringData.fromString(log.getApp_channel()));
row.setField(22, StringData.fromString(log.getZone_code()));
row.setField(23, StringData.fromString(log.getZone_name()));
row.setField(24, StringData.fromString(log.getZone_type()));
row.setField(25, StringData.fromString(log.getSdk_version()));
row.setField(26, StringData.fromString(log.getUser_agent()));
// user_properties is stored as the raw string, not parsed
row.setField(27, StringData.fromString(log.getUser_properties()));
row.setField(28, StringData.fromString(log.getNetwork_model()));
row.setField(29, StringData.fromString(log.getPhone_name()));
row.setField(30, StringData.fromString(log.getDevice_name()));
row.setField(31, StringData.fromString(log.getBrand()));
row.setField(32, StringData.fromString(log.getDevice_model()));
row.setField(33, StringData.fromString(log.getOs_release()));
row.setField(34, StringData.fromString(log.getApp_version()));
row.setField(35, StringData.fromString(log.getPlatform()));
row.setField(36, StringData.fromString(log.getThird_id()));
row.setField(37, StringData.fromString(log.getCountry_code()));
row.setField(38, StringData.fromString(log.getRegister_time()));
row.setField(39, StringData.fromString(log.getUser_state()));
row.setField(40, StringData.fromString(log.getUser_head_url()));
row.setField(41, DELETE_SIGN_VALUE);
return row;
}
......
......@@ -55,6 +55,10 @@ public class CommonConsumeBaseAchi extends SourceCommonBase implements Serializa
DataStreamSource<String> oepnSimiApiStreamSource = kafkaDataSource.getDataStreamSource();
OpenSimiApiAchi.openSimiApi(oepnSimiApiStreamSource);
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_EXCEPTION_EVENT_TOPIC.getTopic())) {
DataStreamSource<String> exceptionEventStreamSource = kafkaDataSource.getDataStreamSource();
ExceptionEventAchi.exceptionEvent(exceptionEventStreamSource);
}
}
}else {
return;
......
package com.flink.achieve.base;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.config.TableConfig;
import com.flink.processor.function.UserPropertiesProcessor;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.OdsEventLog;
import com.flink.vo.UserProperties;
/**
 * Writes exception-event records from a string stream into the Doris table
 * {@code bi.exception_event_log}.
 *
 * Each incoming element is expected to be a JSON object. Only elements whose
 * {@code flume_type} equals {@code "eventLog"} are mapped to a row; everything else,
 * and every element that fails to parse or map, is dropped before the sink.
 *
 * @author wjs
 * @version created 2025-9-10 17:22:28
 */
public class ExceptionEventAchi implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(ExceptionEventAchi.class);

    // Shared constants
    private static final String FLUME_TYPE_FIELD = "flume_type";
    private static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
    private static final int DELETE_SIGN_VALUE = 0;

    // DateTimeFormatter is immutable and thread-safe, so the two patterns are built once
    // per class load instead of once per record on the hot path.
    private static final DateTimeFormatter SEND_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final DateTimeFormatter CREATE_TIME_FORMATTER = DateTimeFormatter
            .ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Column layout of the exception event table; the position of each name is the row
    // index written in mapToExceptionEventRow and the position of its type below.
    private static final String[] EXCEPTION_EVENT_FIELDS = { "id", "dt", "send_time", "create_time", "strategy_group_id",
            "app_key", "app_type", "app_channel", "zone_code", "zone_name", "zone_type", "sdk_version", "user_agent",
            "device_id", "uid", "strategy_version", "event_list", "route_ip", "cid", "phone", "nick", "unique_id",
            "remarks", "network_ip",
            DORIS_DELETE_SIGN };
    private static final DataType[] EXCEPTION_EVENT_TYPES = { DataTypes.STRING(), DataTypes.DATE(), DataTypes.TIMESTAMP(3),
            DataTypes.TIMESTAMP(3), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
            DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
            DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
            DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
            DataTypes.STRING(), DataTypes.INT() };

    /**
     * Attaches the exception-event pipeline (filter, map, Doris sink) to the given source.
     *
     * @param dataStreamSource stream of raw JSON strings
     */
    public static void exceptionEvent(DataStreamSource<String> dataStreamSource) {
        TableConfig exceptionEventConfig = new TableConfig(EXCEPTION_EVENT_FIELDS, EXCEPTION_EVENT_TYPES,
                "bi.exception_event_log");
        // Create the Doris sink
        DorisSink<RowData> dorisExceptionEventSink = DorisConnector.sinkDoris(exceptionEventConfig.getFields(),
                exceptionEventConfig.getTypes(), exceptionEventConfig.getTableName());
        // Process the exception event records
        processDataStream(dataStreamSource, "eventLog", exceptionEventConfig, dorisExceptionEventSink,
                ExceptionEventAchi::mapToExceptionEventRow);
    }

    /**
     * Maps the stream to rows, drops the {@code null} results and sinks the rest to Doris.
     *
     * @param dataStream  stream of raw JSON strings
     * @param flumeType   value of {@code flume_type} a record must carry to be kept
     * @param tableConfig table layout; its field count sizes every produced row
     * @param dorisSink   destination sink
     * @param mapper      converts one raw record to a row
     */
    private static void processDataStream(DataStreamSource<String> dataStream, String flumeType,
            TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper mapper) {
        SingleOutputStreamOperator<RowData> processedStream = dataStream
                .map(new ElementProcessorWithMap(flumeType, mapper, tableConfig.getFields().length))
                .returns(TypeInformation.of(RowData.class))
                .filter(Objects::nonNull);
        processedStream.sinkTo(dorisSink).name("Doris-" + flumeType);
    }

    /**
     * Map function that keeps only records of one {@code flume_type} and delegates the
     * conversion to a {@link RowMapper}.
     */
    private static class ElementProcessorWithMap implements MapFunction<String, RowData>, Serializable {
        private static final long serialVersionUID = 1L;
        private final String flumeType;
        private final RowMapper mapper;
        private final int fieldCount;

        public ElementProcessorWithMap(String flumeType, RowMapper mapper, int fieldCount) {
            this.flumeType = flumeType;
            this.mapper = mapper;
            this.fieldCount = fieldCount;
        }

        /**
         * @param value raw JSON text of one record
         * @return the mapped row, or {@code null} when the record has another
         *         {@code flume_type} or any exception occurs while parsing or mapping
         */
        @Override
        public RowData map(String value) throws Exception {
            try {
                JSONObject jsonObj = JSON.parseObject(value);
                if (!flumeType.equals(jsonObj.getString(FLUME_TYPE_FIELD))) {
                    return null;
                }
                return mapper.map(value, fieldCount);
            } catch (Exception e) {
                // Best effort: a bad record is logged and skipped so it cannot fail the job.
                logger.error("ExceptionEventAchi 处理 {} 数据出错 | rawData:{} | error:{}", flumeType, value, e.getMessage(), e);
                return null;
            }
        }
    }

    /**
     * Converts one raw exception-event JSON string to a row laid out like
     * {@code EXCEPTION_EVENT_FIELDS}.
     *
     * A missing {@code app_key}, {@code create_time} or {@code send_time}, or a timestamp
     * that does not match its pattern, makes this method throw; the calling map function
     * catches that, logs it and drops the record.
     *
     * @param item       raw JSON text; must be a {@link String}
     * @param fieldCount number of slots to allocate in the row
     * @return the populated row
     */
    private static RowData mapToExceptionEventRow(Object item, int fieldCount) {
        String value = (String) item;
        OdsEventLog event = JSONObject.parseObject(value, new TypeReference<OdsEventLog>() {
        });
        String createTime = event.getCreate_time();
        String routeIp = event.getRoute_ip();
        String appKey = event.getApp_key().trim();
        String appType = event.getApp_type();
        String sendTime = event.getSend_time();
        UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(event.getUser_properties());

        GenericRowData row = new GenericRowData(fieldCount);
        row.setField(0, StringData.fromString(event.getId()));
        // dt is the date part (first 10 characters) of create_time
        row.setField(1, TimeConvertUtil.convertToSqlDate(createTime.substring(0, 10)));
        row.setField(2, TimestampData.fromLocalDateTime(LocalDateTime.parse(sendTime, SEND_TIME_FORMATTER)));
        row.setField(3, TimestampData.fromLocalDateTime(LocalDateTime.parse(createTime, CREATE_TIME_FORMATTER)));
        row.setField(4, StringData.fromString(event.getStrategy_group_id()));
        row.setField(5, StringData.fromString(appKey));
        row.setField(6, StringData.fromString(appType));
        row.setField(7, StringData.fromString(event.getApp_channel()));
        row.setField(8, StringData.fromString(event.getZone_code()));
        row.setField(9, StringData.fromString(event.getZone_name()));
        row.setField(10, StringData.fromString(event.getZone_type()));
        row.setField(11, StringData.fromString(event.getSdk_version()));
        row.setField(12, StringData.fromString(event.getUser_agent()));
        row.setField(13, StringData.fromString(event.getDevice_id()));
        row.setField(14, StringData.fromString(event.getUid()));
        row.setField(15, StringData.fromString(event.getStrategy_version()));
        row.setField(16, StringData.fromString(event.getEvent_list()));
        row.setField(17, StringData.fromString(routeIp));
        // cid / phone / nick come from the parsed user_properties and stay null without it
        row.setField(18, StringData.fromString(userProperties == null ? null : userProperties.getCid()));
        row.setField(19, StringData.fromString(userProperties == null ? null : userProperties.getPhone()));
        row.setField(20, StringData.fromString(userProperties == null ? null : userProperties.getNick()));
        row.setField(21, StringData.fromString(event.getUnique_id()));
        row.setField(22, StringData.fromString(event.getRemarks()));
        row.setField(23, StringData.fromString(event.getNetwork_ip()));
        row.setField(24, DELETE_SIGN_VALUE);
        return row;
    }

    /**
     * Serializable callback that converts one raw record into a row.
     */
    @FunctionalInterface
    private interface RowMapper extends Serializable {
        /**
         * @param item       raw record
         * @param fieldCount number of slots the produced row must have
         * @return the mapped row
         */
        RowData map(Object item, int fieldCount);
    }
}
......@@ -71,7 +71,7 @@ public class StreamEnvironmentSettings {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
env.getCheckpointConfig().disableCheckpointing();
// env.getCheckpointConfig().disableCheckpointing();
// env.setParallelism(4); // 调整并行度
//=================启动服务=========================================
//开启flink的checkpoint功能:每隔5000ms启动一个检查点(设置checkpoint的声明周期)
......
......@@ -31,6 +31,8 @@ public enum TopicTypeEnum {
ODS_PC_COLLECT_LOG("ods_pc_collect_log","odsPcCollectLog"),
ODS_COMMUNITY_HISTORY("ods_community_history","odsCommunityHistory"),
ODS_SYS_LOG("ods_sys_log","odsSysLog"),
ODS_COLLECT_USER_BEHAVIOR("ods_collect_user_behavior","odsCollectUserBehavior"),
ODS_EXCEPTION_EVENT_TOPIC("ods_exception_event_topic","odsExceptionEventTopic"),
;
......
......@@ -33,7 +33,8 @@ public class CommonConsumeBaseProcessor implements JobProcessor{
TopicTypeEnum.ODS_EVENT_LOG,
TopicTypeEnum.ODS_COMMUNITY_HISTORY,
TopicTypeEnum.ODS_NEW_COLLECT_LOG,
TopicTypeEnum.OPEN_SIMI_API
TopicTypeEnum.OPEN_SIMI_API,
TopicTypeEnum.ODS_EXCEPTION_EVENT_TOPIC,
}).map(TopicTypeEnum::createKafkaTopic)
.collect(Collectors.toList());
......
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
/**
 * Value object for one device collect-log record.
 *
 * Every property is kept as a plain string. Accessors are generated by Lombok's
 * {@code @Data}; the field names therefore determine the accessor names
 * (for example {@code getApp_type()}), so renaming a field is a breaking change.
 *
 * @author wjs
 * @version created 2025-9-9 14:31:54
 */
@Data
public class CollectInfoLog implements Serializable {
/**
 * Serialization version of this value object.
 */
private static final long serialVersionUID = 1L;
private String id;
private String flume_type;
private String device_id;
private String device_id_v1;
private String third_id;
private String app_type;
private String user_head_url;
private String device_model;
private String app_version;
// Kept as the raw string; this class does not parse it
private String user_properties;
private String send_type;
private String zone_name;
private String os_release;
private String env_info;
private String strategy_group_id;
private String platform;
private String register_time;
private String zone_code;
private String user_state;
private String nick;
private String uid;
private String device_name;
private String sdk_version;
private String zone_type;
private String brand;
private String user_agent;
private String other_info;
private String unique_id;
private String create_time;
private String network_ip;
private String app_channel;
private String country_code;
private String strategy_version;
private String send_time;
private String app_key;
private String device_info;
private String phone;
// NOTE(review): the only camelCase field in this class; presumably mirrors the key used
// in the incoming JSON — confirm before normalising it to snake_case.
private String networkAreaName;
private String cid;
private String network_model;
private String phone_name;
private String idfv;
}
......@@ -35,4 +35,7 @@ public class OdsEventLog implements Serializable{
private String route_ip;
private String user_properties;
private String unique_id;
private String remarks;
private String network_ip;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment