Commit c889f22f by 魏建枢

代码提交

parent 8e4b37b0
...@@ -51,6 +51,10 @@ public class CommonConsumeBaseAchi extends SourceCommonBase implements Serializa ...@@ -51,6 +51,10 @@ public class CommonConsumeBaseAchi extends SourceCommonBase implements Serializa
DataStreamSource<String> collectLogStreamSource = kafkaDataSource.getDataStreamSource(); DataStreamSource<String> collectLogStreamSource = kafkaDataSource.getDataStreamSource();
CollectLogAchi.collectLog(collectLogStreamSource); CollectLogAchi.collectLog(collectLogStreamSource);
} }
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_PC_COLLECT_LOG.getTopic())) {
DataStreamSource<String> pcCollectLogStreamSource = kafkaDataSource.getDataStreamSource();
PcCollectLogAchi.pccollectLog(pcCollectLogStreamSource);
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.OPEN_SIMI_API.getTopic())) { if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.OPEN_SIMI_API.getTopic())) {
DataStreamSource<String> oepnSimiApiStreamSource = kafkaDataSource.getDataStreamSource(); DataStreamSource<String> oepnSimiApiStreamSource = kafkaDataSource.getDataStreamSource();
OpenSimiApiAchi.openSimiApi(oepnSimiApiStreamSource); OpenSimiApiAchi.openSimiApi(oepnSimiApiStreamSource);
......
package com.flink.achieve.base;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.config.TableConfig;
import com.flink.processor.impl.OkHttpService;
import com.flink.processor.impl.StringBuilderParams;
import com.flink.util.CompareUtils;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.PcCollectInfoLog;
import com.flink.vo.PcDeviceInfo;
import com.flink.vo.UserInfoDto;
/**
* @author wjs
* @version 创建时间:2025-9-19 16:32:14 类说明
*/
public class PcCollectLogAchi implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(PcCollectLogAchi.class);
// 定义公共常量
private static final String FLUME_TYPE_FIELD = "flume_type";
private static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
private static final int DELETE_SIGN_VALUE = 0;
// 设备信息采集日志表配置
private static final String[] PC_COLLECT_FIELDS = { "id", "dt", "device_id", "device_id_v1", "uid", "app_key",
"app_type", "device_info", "cid", "phone", "nick", "unique_id", "create_time", "network_ip",
"network_area_name", "send_time", "app_channel", "zone_code", "zone_name", "zone_type", "user_agent",
"device_name", "brand", "device_model", "os_release", "app_version", "platform", "third_id", "country_code",
"register_time", "user_state", "user_head_url", DORIS_DELETE_SIGN };
private static final DataType[] PC_COLLECT_TYPES = { DataTypes.STRING(), DataTypes.DATE(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.TIMESTAMP(3),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.TIMESTAMP(3), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT() };
public static void pccollectLog(DataStreamSource<String> dataStreamSource) {
// 初始化表配置
TableConfig pcCollectConfig = new TableConfig(PC_COLLECT_FIELDS, PC_COLLECT_TYPES, "bi.pc_collect_log");
// 创建Doris Sink
DorisSink<RowData> dorisPcCollectSink = DorisConnector.sinkDoris(pcCollectConfig.getFields(),
pcCollectConfig.getTypes(), pcCollectConfig.getTableName());
// 处理设备信息采集日志数据
processDataStream(dataStreamSource, "pcCollectLog", pcCollectConfig, dorisPcCollectSink,
(RowMapper<String>) PcCollectLogAchi::mapToPcCollectRow);
}
private static <T> void processDataStream(DataStreamSource<String> dataStream, String flumeType,
TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper<String> rowMapper) {
SingleOutputStreamOperator<RowData> processedStream = dataStream
.map(new ElementProcessorWithMap<>(flumeType, rowMapper, tableConfig.getFields().length))
.returns(TypeInformation.of(RowData.class)).filter(Objects::nonNull);
processedStream.sinkTo(dorisSink).name("Doris-" + flumeType);
}
/**
* 使用map算子的内部处理类
*/
private static class ElementProcessorWithMap<T> implements MapFunction<String, RowData>, Serializable {
private static final long serialVersionUID = 1L;
private final String flumeType;
private final RowMapper<String> mapper;
private final int fieldCount;
public ElementProcessorWithMap(String flumeType, RowMapper<String> mapper, int fieldCount) {
this.flumeType = flumeType;
this.mapper = mapper;
this.fieldCount = fieldCount;
}
@Override
public RowData map(String value) throws Exception {
try {
JSONObject jsonObj = JSON.parseObject(value);
if (!flumeType.equals(jsonObj.getString(FLUME_TYPE_FIELD))) {
return null;
}
return mapper.map(value, fieldCount);
} catch (Exception e) {
logger.error("PcCollectLogAchi 处理 {} 数据出错 | rawData:{} | error:{}", flumeType, value, e.getMessage(),
e);
}
return null;
}
}
// 设备日志采集数据映射
private static RowData mapToPcCollectRow(String value, int fieldCount) {
PcCollectInfoLog log = JSON.parseObject(value, new TypeReference<PcCollectInfoLog>() {
});
String appType = log.getApp_type();
String appKey = log.getApp_key();
String device_info = log.getDevice_info();
String createTime = log.getCreate_time();
String sendTime = log.getSend_time();
String deviceId = log.getDevice_id();
String deviceIdV1 = log.getDevice_id_v1();
String uniqueId = log.getUnique_id();
String uid = log.getUid();
String brand = log.getBrand();
String deviceModel = log.getDevice_model();
String osRelease = log.getOs_release();
String deviceName = log.getDevice_name();
String cid = log.getCid();
String phone = log.getPhone();
String nick = log.getNick();
String platform = getPlatformByAppKey(appKey);
if (StringUtils.isEmpty(log.getDevice_id())) {
if (StringUtils.isNotEmpty(device_info)) {
PcDeviceInfo pcDeviceInfo = JSONObject.parseObject(device_info, new TypeReference<PcDeviceInfo>() {
});
deviceId = pcDeviceInfo.getI8();
deviceIdV1 = pcDeviceInfo.getI8();
uniqueId = pcDeviceInfo.getI8();
uid = pcDeviceInfo.getI7();
deviceModel = pcDeviceInfo.getB3();
brand = pcDeviceInfo.getB2();
osRelease = pcDeviceInfo.getB4();
deviceName = pcDeviceInfo.getB2() + "-" + pcDeviceInfo.getB3();
cid = pcDeviceInfo.getCid();
phone = pcDeviceInfo.getPhone();
nick = pcDeviceInfo.getNick();
// log = OkHttpService.getUserInfo(appKey, cid, log);
}
}
GenericRowData row = new GenericRowData(fieldCount);
row.setField(0, StringData.fromString(log.getId()));
row.setField(1, TimeConvertUtil.convertToSqlDate(createTime.substring(0, 10)));
row.setField(2, StringData.fromString(deviceId));
row.setField(3, StringData.fromString(deviceIdV1));
row.setField(4, StringData.fromString(uid));
row.setField(5, StringData.fromString(appKey));
row.setField(6, StringData.fromString(appType));
row.setField(7, StringData.fromString(device_info));
row.setField(8, StringData.fromString(cid));
row.setField(9, StringData.fromString(phone));
row.setField(10, StringData.fromString(nick));
row.setField(11, StringData.fromString(uniqueId));
row.setField(12, TimestampData.fromLocalDateTime(
LocalDateTime.parse(createTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(13, StringData.fromString(log.getNetwork_ip()));
row.setField(14, StringData.fromString(log.getNetworkAreaName()));
row.setField(15, TimestampData.fromLocalDateTime(
LocalDateTime.parse(sendTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(16, StringData.fromString(log.getApp_channel()));
row.setField(17, StringData.fromString(log.getZone_code()));
row.setField(18, StringData.fromString(log.getZone_name()));
row.setField(19, StringData.fromString(log.getZone_type()));
row.setField(20, StringData.fromString(log.getUser_agent()));
row.setField(21, StringData.fromString(deviceName));
row.setField(22, StringData.fromString(brand));
row.setField(23, StringData.fromString(deviceModel));
row.setField(24, StringData.fromString(osRelease));
row.setField(25, StringData.fromString(log.getApp_version()));
row.setField(26, StringData.fromString(platform));
row.setField(27, StringData.fromString(log.getThird_id()));
row.setField(28, StringData.fromString(log.getCountry_code()));
row.setField(29, StringData.fromString(log.getRegister_time()));
row.setField(30, StringData.fromString(log.getUser_state()));
row.setField(31, StringData.fromString(log.getUser_head_url()));
row.setField(32, DELETE_SIGN_VALUE);
return row;
}
/**
* 行数据映射接口
*
* @param <T> 数据类型
*/
@FunctionalInterface
private static interface RowMapper<T> extends Serializable {
RowData map(T item, int fieldCount);
}
private static String getPlatformByAppKey(String appKey) {
switch (appKey) {
// 无链平台
case "8ooOvXJo276":
return "无链安卓国内版";
case "9JQ3A7GA420":
return "无链IOS海外版";
// 私米平台
case "ptyzTPaV207":
return "私米安卓国内版";
case "giHQ1YLp925":
return "私米IOS国内版";
case "lOxLJYzx658":
return "私米安卓海外版";
case "lcALJYzx932":
return "私米IOS海外版";
// pc 国内版
case "pc1KPjmh951":
return "Win国内版";
case "pcrIjvC5805":
return "Linux国内版";
case "pcUXtmMh356":
return "MacIntel国内版";
case "pcrPGB1z531":
return "MacArm国内版";
// pc 海外版
case "pcRIhwh1380":
return "Win海外版";
case "pcQmdNl0952":
return "Linux海外版";
case "pc1etTC6207":
return "MacIntel海外版";
case "pcd9Sa8T989":
return "MacArm海外版";
// 私米无链测试版
case "tMC1YpTb451":
return "私米安卓国内测试版";
case "t4vyAoSP759":
return "私米IOS国内测试版";
case "t83W0uD5645":
return "私米安卓海外测试版";
case "trkxeF7j175":
return "私米IOS海外测试版";
case "tXPNHvy9327":
return "无链安卓国内测试版";
case "tdNfngBH705":
return "无链IOS国内测试版";
// pc 国内测试版
case "ts1KPjmh951":
return "Win国内测试版";
case "tsrIjvC5805":
return "Linux国内测试版";
case "tsUXtmMh356":
return "MacIntel国内测试版";
case "tsrPGB1z531":
return "MacArm国内测试版";
// pc 海外测试版
case "tsRIhwh1380":
return "Win海外测试版";
case "tsQmdNl0952":
return "Linux海外测试版";
case "ts1etTC6207":
return "MacIntel海外测试版";
case "tsd9Sa8T989":
return "MacArm海外测试版";
default:
return "未知平台";
}
}
}
package com.flink.enums;
/**
* @author wjs
* @version 创建时间:2025-9-16 16:21:26
* 类说明
*/
public enum ProcessTypeEnum {
/**
* 0、未处理
*/
NOT_HANDLE(0, "未处理"),
/**
* 1、无需处理
*/
NO_HANDLE(1, "无需处理"),
/**
* 2、已处理
*/
SUCCESS(2,"已处理")
;
private Integer code;
private String name;
private ProcessTypeEnum(Integer code, String name) {
this.code = code;
this.name = name;
}
public Integer getCode() {
return code;
}
public String getName() {
return name;
}
}
...@@ -33,6 +33,7 @@ public class CommonConsumeBaseProcessor implements JobProcessor{ ...@@ -33,6 +33,7 @@ public class CommonConsumeBaseProcessor implements JobProcessor{
TopicTypeEnum.ODS_EVENT_LOG, TopicTypeEnum.ODS_EVENT_LOG,
TopicTypeEnum.ODS_COMMUNITY_HISTORY, TopicTypeEnum.ODS_COMMUNITY_HISTORY,
TopicTypeEnum.ODS_NEW_COLLECT_LOG, TopicTypeEnum.ODS_NEW_COLLECT_LOG,
TopicTypeEnum.ODS_PC_COLLECT_LOG,
TopicTypeEnum.OPEN_SIMI_API, TopicTypeEnum.OPEN_SIMI_API,
TopicTypeEnum.ODS_EXCEPTION_EVENT_TOPIC, TopicTypeEnum.ODS_EXCEPTION_EVENT_TOPIC,
TopicTypeEnum.ODS_COLLECT_USER_BEHAVIOR, TopicTypeEnum.ODS_COLLECT_USER_BEHAVIOR,
......
...@@ -5,14 +5,18 @@ import java.time.LocalDate; ...@@ -5,14 +5,18 @@ import java.time.LocalDate;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference; import com.alibaba.fastjson.TypeReference;
import com.flink.util.CompareUtils;
import com.flink.util.LoadPropertiesFile; import com.flink.util.LoadPropertiesFile;
import com.flink.vo.PcCollectInfoLog;
import com.flink.vo.SimiInterfaceBase; import com.flink.vo.SimiInterfaceBase;
import com.flink.vo.UserInfoDto;
import okhttp3.MediaType; import okhttp3.MediaType;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
...@@ -32,6 +36,7 @@ public class OkHttpService{ ...@@ -32,6 +36,7 @@ public class OkHttpService{
public static final MediaType JSON = MediaType.parse("application/json; charset=utf-8"); public static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
private final static String URL = LoadPropertiesFile.getPropertyFileValues("simiUserInfo.url"); private final static String URL = LoadPropertiesFile.getPropertyFileValues("simiUserInfo.url");
private final static String ABROAD_URL = LoadPropertiesFile.getPropertyFileValues("simiUserInfo.abroadUrl");
private final static String AUTHORIZATION = LoadPropertiesFile.getPropertyFileValues("simiUserInfo.authorization"); private final static String AUTHORIZATION = LoadPropertiesFile.getPropertyFileValues("simiUserInfo.authorization");
private final static String KEY = LoadPropertiesFile.getPropertyFileValues("simiUserInfo.key"); private final static String KEY = LoadPropertiesFile.getPropertyFileValues("simiUserInfo.key");
...@@ -43,13 +48,60 @@ public class OkHttpService{ ...@@ -43,13 +48,60 @@ public class OkHttpService{
} }
public static PcCollectInfoLog getUserInfo(String appKey,String cid,PcCollectInfoLog pcCollectInfoLog) {
//通过cid或者phone去获取用户信息(调第三方接口)
String timestamp = System.currentTimeMillis()+"";
String reqParams = StringBuilderParams.userInfo(cid, null);
String url = null;
//国内
if(CompareUtils.stringExists(appKey,
"pc1KPjmh951",
"pcrIjvC5805",
"pcUXtmMh356",
"pcrPGB1z531",
"ts1KPjmh951",
"tsrIjvC5805",
"tsUXtmMh356",
"tsrPGB1z531"
)) {
url = URL+"/dataApi/userInfo?"+reqParams;
}
//海外
if(CompareUtils.stringExists(appKey,
"pcRIhwh1380",
"pcQmdNl0952",
"pc1etTC6207",
"pcd9Sa8T989",
"tsRIhwh1380",
"tsQmdNl0952",
"ts1etTC6207",
"tsd9Sa8T989"
)) {
url = ABROAD_URL+"/dataApi/userInfo?"+reqParams;
}
if(StringUtils.isNotEmpty(url)) {
String signature = DigestUtils.md5Hex(reqParams+"&"+AUTHORIZATION+"&"+timestamp+"&"+KEY);
String userStr = OkHttpService.get(url, timestamp,AUTHORIZATION,signature);
if(StringUtils.isNotEmpty(userStr)) {
UserInfoDto userInfo = JSONObject.parseObject(userStr,new TypeReference<UserInfoDto>(){});
pcCollectInfoLog.setThird_id(userInfo.getThirdId());
pcCollectInfoLog.setCountry_code(userInfo.getCountryCode());
pcCollectInfoLog.setRegister_time(userInfo.getCreateTime());
pcCollectInfoLog.setUser_state(userInfo.getUserState());
pcCollectInfoLog.setUser_head_url(userInfo.getUserHeadUrl());
}
}
return pcCollectInfoLog;
}
public static String friends(String cid){ public static String friends(String cid){
String timestamp = System.currentTimeMillis()+""; String timestamp = System.currentTimeMillis()+"";
String reqParams = StringBuilderParams.friends(cid, null); String reqParams = StringBuilderParams.friends(cid, null);
String url = URL+"/dataApi/friends?"+reqParams; String url = URL+"/dataApi/friends?"+reqParams;
String authorization = AUTHORIZATION; String authorization = AUTHORIZATION;
String signature = DigestUtils.md5Hex(reqParams+"&"+authorization+"&"+timestamp+"&"+KEY); String signature = DigestUtils.md5Hex(reqParams+"&"+authorization+"&"+timestamp+"&"+KEY);
return get(url, timestamp,authorization,signature,cid); return get(url, timestamp,authorization,signature);
} }
public static String groups(String cid) { public static String groups(String cid) {
...@@ -58,7 +110,7 @@ public class OkHttpService{ ...@@ -58,7 +110,7 @@ public class OkHttpService{
String url = URL+"/dataApi/groups?"+reqParams; String url = URL+"/dataApi/groups?"+reqParams;
String authorization = AUTHORIZATION; String authorization = AUTHORIZATION;
String signature = DigestUtils.md5Hex(reqParams+"&"+authorization+"&"+timestamp+"&"+KEY); String signature = DigestUtils.md5Hex(reqParams+"&"+authorization+"&"+timestamp+"&"+KEY);
return get(url, timestamp,authorization,signature,cid); return get(url, timestamp,authorization,signature);
} }
...@@ -80,7 +132,7 @@ public class OkHttpService{ ...@@ -80,7 +132,7 @@ public class OkHttpService{
post(url,jsonArray.toJSONString(),timestamp,authorization,signature); post(url,jsonArray.toJSONString(),timestamp,authorization,signature);
} }
private static String get(String url, String timestamp, String authorization, String signature,String cid) { private static String get(String url, String timestamp, String authorization, String signature) {
System.out.println("get OkHttpService: 请求内容 : \n " + url); System.out.println("get OkHttpService: 请求内容 : \n " + url);
OkHttpClient client = new OkHttpClient(); OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder() Request request = new Request.Builder()
......
...@@ -35,4 +35,15 @@ public class StringBuilderParams { ...@@ -35,4 +35,15 @@ public class StringBuilderParams {
return String.join("&", array); return String.join("&", array);
} }
public static String userInfo(String cid,String phoneNumber) {
List<String> array = new ArrayList<>();
if(StringUtils.isNotEmpty(cid)) {
array.add("cid="+cid);
}
if(StringUtils.isNotEmpty(phoneNumber)) {
array.add("phoneNumber="+phoneNumber);
}
return String.join("&", array);
}
} }
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
/**
* @author wjs
* @version 创建时间:2025-9-9 14:31:54 类说明
*/
@Data
public class PcCollectInfoLog implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private String id;
private String flume_type;
private String device_id;
private String device_id_v1;
private String third_id;
private String app_type;
private String user_head_url;
private String device_model;
private String app_version;
private String send_type;
private String zone_name;
private String os_release;
private String platform;
private String register_time;
private String zone_code;
private String user_state;
private String nick;
private String uid;
private String device_name;
private String zone_type;
private String brand;
private String user_agent;
private String unique_id;
private String create_time;
private String network_ip;
private String app_channel;
private String country_code;
private String send_time;
private String app_key;
private String device_info;
private String phone;
private String networkAreaName;
private String cid;
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
/**
* @author wjs
* @version 创建时间:2025-2-17 11:10:52
* 类说明
*/
@Data
public class UserInfoDto implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String cid;
private String countryCode;
private String phoneNumber;
private String nick;
private String createTime;
private String userState;
private String userHeadUrl;
private String thirdId;
}
...@@ -15,6 +15,7 @@ hdfs.url=hdfs://10.0.0.105:8020/user/ck/ ...@@ -15,6 +15,7 @@ hdfs.url=hdfs://10.0.0.105:8020/user/ck/
#simiUserInfo.url= https://imadmin.simitalk.com/prod-api/ #simiUserInfo.url= https://imadmin.simitalk.com/prod-api/
simiUserInfo.url= https://zterra.simitalk.com/prod-api/ simiUserInfo.url= https://zterra.simitalk.com/prod-api/
simiUserInfo.abroadUrl= https://zterra.simitalk.top/prod-api/
simiUserInfo.authorization= KhhZAQKaZkfd7p55 simiUserInfo.authorization= KhhZAQKaZkfd7p55
simiUserInfo.key= niiKpP4SXce2zCHZ simiUserInfo.key= niiKpP4SXce2zCHZ
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment