Commit 31a40451 by 魏建枢

JOB_06日志采集作业

parent 448f91cc
package com.flink.achieve.doris;
import java.io.Serializable;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.SourceCommonBase;
import com.flink.enums.AppTypeEnum;
import com.flink.util.GenDeviceIdV1;
import com.flink.vo.CollectLog;
import com.flink.vo.UserProperties;
import com.flink.vo.android.AndroidCollectionBody;
import com.flink.vo.android.deviceInfo.AndroidA1;
import com.flink.vo.android.envInfo.AndroidEnvInfo;
import com.flink.vo.android.otherInfo.OtherInfo;
import com.flink.vo.ios.IosCollectionBody;
import com.flink.vo.ios.IosDeviceInfo;
import com.flink.vo.ios.IosEnvInfo;
/**
 * Flink job that parses collect-log JSON records from a Kafka-backed stream and writes
 * them to the Doris table {@code bi.collect_log}.
 *
 * <p>Each incoming string is deserialized into a {@link CollectLog}, enriched with a
 * generated device id (and, for iOS, an idfv) plus the user properties found in the
 * record, and converted into a {@link RowData} matching the sink schema declared in
 * {@link #parseSourceKafkaJson(DataStreamSource)}. Records that cannot be converted are
 * logged and dropped.
 *
 * @author wjs
 * @version created 2025-5-21 11:42:51
 */
public class CollectLogAchi extends SourceCommonBase implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final Logger logger = LoggerFactory.getLogger(CollectLogAchi.class);

    /** Format of {@code CollectLog.createTime}; built once because DateTimeFormatter is immutable and thread-safe. */
    private static final DateTimeFormatter CREATE_TIME_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /** Android app key that is rewritten to {@link #DEFAULT_ANDROID_APP_KEY}. */
    private static final String REPLACED_ANDROID_APP_KEY = "C7jias27jias2";

    /** App key stored for Android records whose key is empty or equals {@link #REPLACED_ANDROID_APP_KEY}. */
    private static final String DEFAULT_ANDROID_APP_KEY = "8ooOvXJo276";

    /**
     * Builds the pipeline: source stream -> RowData conversion -> null filter -> Doris sink.
     *
     * @param dataStreamSource stream of raw JSON strings, one collect-log record per element
     * @throws ParseException declared by the overridden method
     * @throws Exception      declared by the overridden method
     */
    @Override
    public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
        // ================= sink columns =========================================
        String[] fields = {
            "id",
            "dt",
            "device_id",
            "device_id_v1",
            "uid",
            "idfv",
            "app_key",
            "app_type",
            "other_info",
            "device_info",
            "env_info",
            "cid",
            "phone",
            "nick",
            "unique_id",
            "create_time",
            "__DORIS_DELETE_SIGN__"
        };
        // Column types, index-aligned with `fields`.
        DataType[] types = {
            DataTypes.STRING(),
            DataTypes.DATE(),
            DataTypes.STRING(),
            DataTypes.STRING(),
            DataTypes.STRING(),
            DataTypes.STRING(),
            DataTypes.STRING(),
            DataTypes.STRING(),
            DataTypes.STRING(),
            DataTypes.STRING(),
            DataTypes.STRING(),
            DataTypes.STRING(),
            DataTypes.STRING(),
            DataTypes.STRING(),
            DataTypes.STRING(),
            DataTypes.TIMESTAMP(3),
            DataTypes.INT()
        };
        // ================= sink =========================================
        String tableName = "bi.collect_log";
        DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);
        // ================= processing pipeline =========================================
        // Only the column count is captured by the lambda; the conversion itself is a static
        // helper, so the enclosing job instance is not pulled into the function closure.
        final int fieldCount = fields.length;
        // NOTE(review): the map step signals "drop this record" by returning null and relies on
        // the following filter to discard it. A flatMap that simply emits nothing would avoid
        // passing null between operators — consider switching.
        dataStreamSource
            .map(value -> toRowData(value, fieldCount))
            .filter(Objects::nonNull)
            .sinkTo(dorisSink)
            .name("Doris-CollectLog");
    }

    /**
     * Converts one raw JSON record into a sink row.
     *
     * @param value      raw JSON string of a {@link CollectLog}
     * @param fieldCount number of columns in the sink schema
     * @return the populated row, or {@code null} when the record is empty, lacks a create
     *         time, yields no iOS device id, or fails to parse
     */
    private static RowData toRowData(String value, int fieldCount) {
        try {
            // Parse the Kafka payload.
            CollectLog collectLog = JSONObject.parseObject(value, new TypeReference<CollectLog>() {});
            if (collectLog == null) {
                return null;
            }
            String appType = collectLog.getAppType();
            String appKey = collectLog.getAppKey();
            String otherInfo = collectLog.getOtherInfo();
            String deviceInfo = collectLog.getDeviceInfo();
            String envInfo = collectLog.getEnvInfo();
            String createTime = collectLog.getCreateTime();
            if (StringUtils.isEmpty(createTime)) {
                // Both dt and create_time are derived from this value; without it no row can be built.
                logger.warn("Dropping collect log {}: createTime is missing", collectLog.getId());
                return null;
            }

            String generatedId = genDeviceId(appType, otherInfo, deviceInfo, envInfo);
            String idfv = null;
            String deviceIdV1 = null;
            if (StringUtils.equals(appType, AppTypeEnum.ANDROID.getCode())) {
                deviceIdV1 = generatedId;
                if (StringUtils.isEmpty(appKey) || StringUtils.equals(appKey, REPLACED_ANDROID_APP_KEY)) {
                    appKey = DEFAULT_ANDROID_APP_KEY;
                }
            } else if (StringUtils.equals(appType, AppTypeEnum.IOS.getCode())) {
                if (generatedId == null) {
                    // Previously this case surfaced as a NullPointerException swallowed by the
                    // catch below; the record is still dropped, but now deliberately.
                    logger.warn("Dropping collect log {}: no iOS device id could be generated", collectLog.getId());
                    return null;
                }
                // The generated value is split on ',': first part -> device_id_v1, second -> idfv.
                String[] idParts = generatedId.split(",");
                deviceIdV1 = (idParts.length > 0) ? idParts[0] : "";
                idfv = (idParts.length > 1) ? idParts[1] : "";
            }

            String userProperties = collectLog.getUserProperties();
            String cid = null;
            String phone = null;
            String nick = null;
            if (StringUtils.isNotEmpty(userProperties)) {
                List<UserProperties> userPropertiesList =
                        JSONObject.parseObject(userProperties, new TypeReference<List<UserProperties>>() {});
                if (userPropertiesList != null && !userPropertiesList.isEmpty()) {
                    // NOTE(review): the else-if chain takes at most ONE attribute per entry
                    // (cid > phone > id > nick > email). An entry carrying e.g. both cid and
                    // phone loses the phone. Kept as-is; confirm whether entries are expected
                    // to hold a single attribute each.
                    for (UserProperties user : userPropertiesList) {
                        if (StringUtils.isNotEmpty(user.getCid())) {
                            cid = user.getCid();
                        } else if (StringUtils.isNotEmpty(user.getPhone())) {
                            phone = user.getPhone();
                        } else if (StringUtils.isNotEmpty(user.getId())) {
                            cid = user.getId();
                        } else if (StringUtils.isNotEmpty(user.getNick())) {
                            nick = user.getNick();
                        } else if (StringUtils.isNotEmpty(user.getEmail())) {
                            nick = user.getEmail();
                        }
                    }
                }
            }

            LocalDateTime createdAt = LocalDateTime.parse(createTime, CREATE_TIME_FORMATTER);

            // Build the RowData; indexes follow the column order of the sink schema.
            GenericRowData row = new GenericRowData(fieldCount);
            row.setField(0, StringData.fromString(collectLog.getId()));        // id
            row.setField(1, convertToSqlDate(createTime));                     // dt
            row.setField(2, StringData.fromString(collectLog.getDeviceId()));  // device_id
            row.setField(3, StringData.fromString(deviceIdV1));                // device_id_v1
            row.setField(4, StringData.fromString(collectLog.getUid()));       // uid
            row.setField(5, StringData.fromString(idfv));                      // idfv
            // Store the normalized key. The raw collectLog.getAppKey() used to be written here,
            // which made the Android key normalization above dead code.
            row.setField(6, StringData.fromString(appKey));                    // app_key
            row.setField(7, StringData.fromString(appType));                   // app_type
            row.setField(8, StringData.fromString(otherInfo));                 // other_info
            row.setField(9, StringData.fromString(deviceInfo));                // device_info
            row.setField(10, StringData.fromString(envInfo));                  // env_info
            row.setField(11, StringData.fromString(cid));                      // cid
            row.setField(12, StringData.fromString(phone));                    // phone
            row.setField(13, StringData.fromString(nick));                     // nick
            row.setField(14, StringData.fromString(collectLog.getUniqueId())); // unique_id
            row.setField(15, TimestampData.fromLocalDateTime(createdAt));      // create_time
            row.setField(16, 0);                                               // __DORIS_DELETE_SIGN__
            return row;
        } catch (Exception e) {
            // Best effort: one malformed record must not fail the job. The raw payload is not
            // logged because it can contain user data such as phone numbers.
            logger.error("解析失败: {}", e.toString(), e);
            return null;
        }
    }

    /**
     * Converts the date part of a {@code "yyyy-MM-dd HH:mm:ss..."} string to the number of
     * days since 1970-01-01, the value stored in the {@code dt} column.
     *
     * @param datetime date-time string whose first space-separated token is an ISO date
     * @return epoch-day of that date
     * @throws java.time.format.DateTimeParseException if the date part is not an ISO date
     * @throws ArithmeticException                     if the epoch-day does not fit in an int
     */
    private static int convertToSqlDate(String datetime) {
        long epochDay = LocalDate.parse(datetime.split(" ")[0]).toEpochDay();
        // toIntExact fails loudly instead of silently truncating like a plain (int) cast.
        return Math.toIntExact(epochDay);
    }

    /**
     * Generates the device id for a record from its JSON info sections.
     *
     * @param appType    app type code, compared against {@link AppTypeEnum#ANDROID} and {@link AppTypeEnum#IOS}
     * @param otherInfo  JSON of the "other info" section (used for Android only)
     * @param deviceInfo JSON of the device info section
     * @param envInfo    JSON of the environment info section
     * @return the value produced by {@link GenDeviceIdV1} for the platform, or {@code null}
     *         when {@code appType} is neither Android nor iOS
     */
    private static String genDeviceId(String appType, String otherInfo, String deviceInfo, String envInfo) {
        String deviceId = null;
        if (StringUtils.equals(appType, AppTypeEnum.ANDROID.getCode())) {
            AndroidA1 a1 = JSONObject.parseObject(deviceInfo, new TypeReference<AndroidA1>() {});
            AndroidEnvInfo g1 = JSONObject.parseObject(envInfo, new TypeReference<AndroidEnvInfo>() {});
            OtherInfo i1 = JSONObject.parseObject(otherInfo, new TypeReference<OtherInfo>() {});
            AndroidCollectionBody androidBodyObj = new AndroidCollectionBody();
            androidBodyObj.setA1(a1);
            androidBodyObj.setG1(g1);
            androidBodyObj.setI1(i1);
            deviceId = GenDeviceIdV1.genAndroidDeviceIdHashV1(androidBodyObj);
        } else if (StringUtils.equals(appType, AppTypeEnum.IOS.getCode())) {
            IosDeviceInfo a1 = JSONObject.parseObject(deviceInfo, new TypeReference<IosDeviceInfo>() {});
            IosEnvInfo g1 = JSONObject.parseObject(envInfo, new TypeReference<IosEnvInfo>() {});
            IosCollectionBody iosBodyObj = new IosCollectionBody();
            iosBodyObj.setA1(a1);
            iosBodyObj.setG1(g1);
            deviceId = GenDeviceIdV1.genIosDeviceIdHash(iosBodyObj);
        }
        return deviceId;
    }

    /**
     * Not used by this job: the stream is written to Doris only, so this is a no-op.
     *
     * @param mStream source stream (ignored)
     */
    @Override
    public void sendToSinkKafka(DataStreamSource<String> mStream) {
        // Intentionally empty.
    }
}
......@@ -22,6 +22,7 @@ public enum JobTypeEnum {
REAL_KYC("JOB_03", "真实KYC作业"),
REAL_USERS("JOB_04", "真实用户作业"),
REAL_BALANCE("JOB_05", "真实余额作业"),
COLLECT_LOG("JOB_06", "日志采集作业"),
;
......
......@@ -2,6 +2,7 @@ package com.flink.factory;
import com.flink.enums.JobTypeEnum;
import com.flink.processor.JobProcessor;
import com.flink.processor.impl.CollectLogProcessor;
import com.flink.processor.impl.EventIpConvertProcessor;
import com.flink.processor.impl.RealBalanceProcessor;
import com.flink.processor.impl.RealKycProcessor;
......@@ -27,6 +28,8 @@ public class JobProcessorFactory {
return new RealUsersProcessor();
case REAL_BALANCE:
return new RealBalanceProcessor();
case COLLECT_LOG:
return new CollectLogProcessor();
default:
throw new IllegalArgumentException("未知的Job类型: " + jobType);
}
......
package com.flink.processor.impl;
import com.flink.achieve.doris.CollectLogAchi;
import com.flink.enums.JobTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.JobProcessor;
/**
 * {@link JobProcessor} for the {@link JobTypeEnum#COLLECT_LOG} job: starts a
 * {@link CollectLogAchi} against the {@link TopicTypeEnum#ODS_NEW_COLLECT_LOG} topic.
 *
 * @author wjs
 * @version created 2025-5-21 11:42:10
 */
public class CollectLogProcessor implements JobProcessor {

    /**
     * Creates the collect-log job and hands it the job description, the source topic name,
     * the consumer group and the job code, in that order.
     *
     * @throws Exception whatever {@code handleDataStreamSource} throws
     */
    @Override
    public void process() throws Exception {
        // The job instance is created first, before any enum getter runs.
        final CollectLogAchi collectLogJob = new CollectLogAchi();
        final JobTypeEnum jobType = JobTypeEnum.COLLECT_LOG;
        final TopicTypeEnum sourceTopic = TopicTypeEnum.ODS_NEW_COLLECT_LOG;

        collectLogJob.handleDataStreamSource(
                jobType.getDescription(),
                sourceTopic.getTopic(),
                sourceTopic.getGroup(),
                jobType.getCode());
    }
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
 * Value object for one collect-log record as it arrives from Kafka.
 * <p>
 * {@code CollectLogAchi} deserializes the raw JSON message into this class with fastjson
 * and copies a subset of the fields into the Doris row. All fields are plain strings;
 * several of them carry nested JSON that is parsed separately by the consumer.
 *
 * @author wjs
 * @version created 2025-5-21 11:57:34
 */
// NOTE(review): Lombok's @Data already generates toString(), so the explicit @ToString
// below appears redundant — confirm and consider removing it.
@Data
@ToString
public class CollectLog implements Serializable{
/** Serialization version of this class. */
private static final long serialVersionUID = 1L;
/** Record id; written to the {@code id} column by CollectLogAchi. */
private String id;
/** Device id as reported in the record; written to the {@code device_id} column. */
private String deviceId;
/** Written to the {@code uid} column. */
private String uid;
// The following three fields are not read anywhere in the code visible here.
private String strategyGroupId;
private String strategyVersion;
private String sendTime;
/**
 * Creation time of the record. CollectLogAchi parses it with the pattern
 * {@code yyyy-MM-dd HH:mm:ss.SSS} and also derives the {@code dt} date column from it.
 */
private String createTime;
/** Application key of the record. */
private String appKey;
/** Application type code; CollectLogAchi compares it with the Android and iOS codes of AppTypeEnum. */
private String appType;
// appChannel .. userAgent are not read anywhere in the code visible here.
private String appChannel;
private String zoneCode;
private String zoneName;
private String zoneType;
private String sdkVersion;
private String userAgent;
/** JSON string; parsed into OtherInfo for Android records when generating the device id. */
private String otherInfo;
/** JSON string; parsed into AndroidA1 or IosDeviceInfo depending on the app type. */
private String deviceInfo;
/** JSON string; parsed into AndroidEnvInfo or IosEnvInfo depending on the app type. */
private String envInfo;
/** JSON array string; parsed into a list of UserProperties to extract cid, phone and nick. */
private String userProperties;
/** Written to the {@code unique_id} column. */
private String uniqueId;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment