Commit 81e56705 by 魏建枢

代码抽取

parent 9098a04c
......@@ -112,7 +112,7 @@ public class CollectLogAchi extends SourceCommonBase implements Serializable{
row.setField(3, StringData.fromString(deviceIdInfo==null ? null : deviceIdInfo.getDeviceIdV1())); // device_id_v1
row.setField(4, StringData.fromString(log.getUid())); // uid
row.setField(5, StringData.fromString(deviceIdInfo==null ? null : deviceIdInfo.getIdfv())); // idfv
row.setField(6, StringData.fromString(log.getAppKey())); // app_key
// deviceIdInfo may be null (the device_id_v1 and idfv fields guard for it), so fall back to the
// app key carried by the raw log instead of dereferencing a null DeviceIdInfo.
row.setField(6, StringData.fromString(deviceIdInfo == null ? log.getAppKey() : deviceIdInfo.getAppKey())); // app_key
row.setField(7, StringData.fromString(log.getAppType())); // app_type
row.setField(8, StringData.fromString(other_info)); // other_info
row.setField(9, StringData.fromString(device_info)); // device_info
......
......@@ -16,6 +16,7 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
......@@ -35,6 +36,8 @@ import com.flink.processor.function.CollectLogJoinProcessor;
import com.flink.processor.function.GenDeviceIdProcessor;
import com.flink.processor.function.LatestUserProcessFunction;
import com.flink.processor.function.UserPropertiesProcessor;
import com.flink.util.CompareUtils;
import com.flink.util.LoadPropertiesFile;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.CollectLog;
import com.flink.vo.DeviceId;
......@@ -86,11 +89,36 @@ public class DeviceIdLatestAchi extends MultipleSourceCommonBase implements Seri
DataTypes.TIMESTAMP(3),
DataTypes.INT()
};
String[] chainlessFields = {
"cid",
"app_key",
"device_id",
"phone",
"app_type",
"nick",
"create_time",
"__DORIS_DELETE_SIGN__"
};
DataType[] chainlessTypes = {
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.TIMESTAMP(3),
DataTypes.INT()
};
//=================流式处理=========================================
String tableName = "bi.device_id_latest";
DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);
String tableNameChainless = "bi.chainless_device_id_latest";
DorisSink<RowData> chainlessDorisSink = DorisConnector.sinkDoris(chainlessFields, chainlessTypes, tableNameChainless);
//=================数据处理流水线=========================================
operatorStream(dataSourceList).map(new MapFunction<Result, RowData>(){
//simi国内
operatorSimiUserStream(dataSourceList).map(new MapFunction<Result, RowData>(){
/**
*
......@@ -99,31 +127,87 @@ public class DeviceIdLatestAchi extends MultipleSourceCommonBase implements Seri
@Override
public RowData map(Result result) throws Exception {
if(null == result) {
if(!CompareUtils.stringExists(result.getAppKey().trim(),
LoadPropertiesFile.getPropertyFileValues("simi.android.appKe"),
LoadPropertiesFile.getPropertyFileValues("simi.ios.appKey"))) {
return null;
}
GenericRowData row = new GenericRowData(fields.length);
row.setField(0, StringData.fromString(result.getCid()));
row.setField(1, StringData.fromString(result.getPhone()));
row.setField(2, StringData.fromString(result.getAppKey()));
row.setField(3, StringData.fromString(result.getDeviceId()));
row.setField(4, StringData.fromString(result.getAppType()));
row.setField(5, StringData.fromString(result.getCountryCode()));
row.setField(6, StringData.fromString(result.getUserState()));
row.setField(7, StringData.fromString(result.getNick()));
row.setField(8, TimestampData.fromLocalDateTime(LocalDateTime.parse(result.getCreateTime(),DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(9, 0); // __DORIS_DELETE_SIGN__
return row;
return exeRowData(result, fields);
}
})
.filter(Objects::nonNull)
// .print()
.sinkTo(dorisSink)
.name("Doris-DeviceIdLatest");
.name("Doris-DeviceIdLatest simi");
//simi海外
operatorAbroadSimiUserStream(dataSourceList).map(new MapFunction<Result, RowData>(){
/**
*
*/
private static final long serialVersionUID = 1L;
/**
 * Converts a joined record of the overseas simi stream into a Doris row.
 *
 * @param result joined record; may be null
 * @return the row built by {@code exeRowData}, or {@code null} when the record is null,
 *         carries no app key, or its app key is not one of the configured
 *         {@code abroadSimi.*.appKey} values (null rows are dropped by the downstream filter)
 * @throws Exception declared by the {@code MapFunction} contract
 */
@Override
public RowData map(Result result) throws Exception {
    // Guard before dereferencing: a null record or a null app key cannot match any
    // configured key and must not fail the operator with a NullPointerException.
    if (result == null || result.getAppKey() == null) {
        return null;
    }
    boolean abroadSimiApp = CompareUtils.stringExists(result.getAppKey().trim(),
            LoadPropertiesFile.getPropertyFileValues("abroadSimi.android.appKey"),
            LoadPropertiesFile.getPropertyFileValues("abroadSimi.ios.appKey"));
    if (!abroadSimiApp) {
        return null;
    }
    return exeRowData(result, fields);
}
})
.filter(Objects::nonNull)
// .print()
.sinkTo(dorisSink)
.name("Doris-DeviceIdLatest abroadSimi");
//处理无链用户
if(CollectionUtils.isNotEmpty(dataSourceList)) {
DataStreamSource<String> collectLogStreamSource = null;
for(KafkaDataSource kafkaDataSource : dataSourceList) {
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_NEW_COLLECT_LOG.getTopic())) {
collectLogStreamSource = kafkaDataSource.getDataStreamSource();
}
}
SingleOutputStreamOperator<RowData> rowDataStream = collectLogStreamSource.flatMap(new FlatMapFunction<String, RowData>() {
private static final long serialVersionUID = 1L;
/**
 * Parses one raw collect-log message and emits a row for the chainless Doris table when the
 * record's app key is one of the configured {@code chainless.*.appKey} values.
 *
 * Records that {@code handleData} rejects, records without an app key, and records whose app
 * key does not match are skipped. Any exception raised while handling a message is logged and
 * the message is dropped.
 *
 * @param value raw Kafka message
 * @param out   collector receiving the assembled row
 * @throws Exception declared by the {@code FlatMapFunction} contract
 */
@Override
public void flatMap(String value, Collector<RowData> out) throws Exception {
    try {
        // Parse the Kafka payload.
        DeviceId result = handleData(value);
        // A record without an app key cannot belong to the chainless apps; skip it explicitly
        // rather than letting trim() throw and be reported as a parsing error.
        if (result == null || result.getAppKey() == null) {
            return;
        }
        if (!CompareUtils.stringExists(result.getAppKey().trim(),
                LoadPropertiesFile.getPropertyFileValues("chainless.android.appKey"),
                LoadPropertiesFile.getPropertyFileValues("chainless.ios.appKey"))) {
            return;
        }
        // Field positions follow the order of chainlessFields.
        GenericRowData row = new GenericRowData(chainlessFields.length);
        row.setField(0, StringData.fromString(result.getCid()));
        row.setField(1, StringData.fromString(result.getAppKey()));
        row.setField(2, StringData.fromString(result.getDeviceId()));
        row.setField(3, StringData.fromString(result.getPhone()));
        row.setField(4, StringData.fromString(result.getAppType()));
        row.setField(5, StringData.fromString(result.getNick()));
        row.setField(6, TimestampData.fromLocalDateTime(LocalDateTime.parse(result.getCreateTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
        row.setField(7, 0); // __DORIS_DELETE_SIGN__
        out.collect(row);
    } catch (Exception e) {
        logger.error("Error parsing ods_new_collect_log 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
    }
}
});
rowDataStream
.filter(Objects::nonNull)
.sinkTo(chainlessDorisSink)
.name("Doris-DeviceIdLatest chainless");
}
}
private DataStream<Result> operatorStream(List<KafkaDataSource> dataSourceList) {
private DataStream<Result> operatorSimiUserStream(List<KafkaDataSource> dataSourceList) {
DataStreamSource<String> userStreamSource = null;
DataStreamSource<String> collectLogStreamSource = null;
if(CollectionUtils.isNotEmpty(dataSourceList)) {
......@@ -139,7 +223,7 @@ public class DeviceIdLatestAchi extends MultipleSourceCommonBase implements Seri
return null;
}
// 用户数据流处理(5分钟批量更新)
// simi国内用户数据流处理(5分钟批量更新)
DataStream<SimiUserInfo> userDataStream = userStreamSource.flatMap(new FlatMapFunction<String, SimiUserInfo>() {
private static final long serialVersionUID = 1L;
......@@ -177,31 +261,6 @@ public class DeviceIdLatestAchi extends MultipleSourceCommonBase implements Seri
logger.error("Error parsing ods_new_collect_log 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
private DeviceId handleData(String value) throws Exception {
CollectLog log = JSONObject.parseObject(value, new TypeReference<CollectLog>(){});
if(null == log) {
return null;
}
String appType = log.getAppType();
String appKey = log.getAppKey();
String other_info = log.getOtherInfo();
String device_info = log.getDeviceInfo();
String env_info = log.getEnvInfo();
String createTime = log.getCreateTime();
DeviceIdInfo deviceIdInfo = GenDeviceIdProcessor.genDeviceId(appType,appKey, other_info, device_info, env_info);
UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(log.getUserProperties());
DeviceId deviceId = new DeviceId();
deviceId.setDeviceId(deviceIdInfo == null ? null : deviceIdInfo.getDeviceIdV1());
deviceId.setCid(userProperties == null ? null : userProperties.getCid());
deviceId.setPhone(userProperties == null ? null : userProperties.getPhone());
deviceId.setNick(userProperties == null ? null : userProperties.getNick());
deviceId.setAppKey(appKey);
deviceId.setAppType(appType);
deviceId.setCreateTime(createTime);
deviceId.setCollectTime(TimeConvertUtil.convertToTimestamp(createTime));
return deviceId;
}
})
.assignTimestampsAndWatermarks(WatermarkStrategy.<DeviceId>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((device, ts) -> device.getCollectTime()));
......@@ -218,4 +277,135 @@ public class DeviceIdLatestAchi extends MultipleSourceCommonBase implements Seri
});
}
/**
 * Builds the overseas simi stream by connecting device records parsed from the collect-log
 * topic with user records read from the overseas simi user-list topic.
 *
 * @param dataSourceList Kafka sources available to the job
 * @return the joined stream produced by {@code CollectLogJoinProcessor}, or {@code null} when
 *         {@code dataSourceList} is empty
 * @throws IllegalStateException if the overseas user-list topic or the collect-log topic is
 *         not present in {@code dataSourceList}
 */
private DataStream<Result> operatorAbroadSimiUserStream(List<KafkaDataSource> dataSourceList) {
    DataStreamSource<String> userAbroadStreamSource = null;
    DataStreamSource<String> collectLogStreamSource = null;
    if(CollectionUtils.isNotEmpty(dataSourceList)) {
        for(KafkaDataSource kafkaDataSource : dataSourceList) {
            if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ABROAD_SIMI_USER_LIST_TOPIC.getTopic())) {
                userAbroadStreamSource = kafkaDataSource.getDataStreamSource();
            }
            if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_NEW_COLLECT_LOG.getTopic())) {
                collectLogStreamSource = kafkaDataSource.getDataStreamSource();
            }
        }
    }else {
        return null;
    }
    // Fail fast with a clear message instead of an anonymous NullPointerException further
    // down when one of the two required topics is missing from the source list.
    if (userAbroadStreamSource == null || collectLogStreamSource == null) {
        throw new IllegalStateException("Missing Kafka source for topic "
                + TopicTypeEnum.ABROAD_SIMI_USER_LIST_TOPIC.getTopic() + " or "
                + TopicTypeEnum.ODS_NEW_COLLECT_LOG.getTopic());
    }

    // Overseas simi user stream: event time is the record's updateTime (derived from
    // create_time), with a watermark that tolerates 5 minutes of out-of-order data.
    DataStream<SimiUserInfo> abroadUserDataStream = userAbroadStreamSource.flatMap(new FlatMapFunction<String, SimiUserInfo>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String value, Collector<SimiUserInfo> out) throws Exception {
            try {
                // Parse the Kafka payload.
                SimiUserInfo simiUserInfo = JSONObject.parseObject(value, new TypeReference<SimiUserInfo>() {
                });
                simiUserInfo.setUpdateTime(TimeConvertUtil.convertToTimestamp(simiUserInfo.getCreate_time()));
                out.collect(simiUserInfo);
            } catch (Exception e) {
                logger.error("Error parsing abroad_simi_user_list 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
            }
        }
    })
    // Keep only users that have both a cid and a phone number, since both form the join key.
    .filter(u -> StringUtils.isNoneEmpty(u.getCid(), u.getPhone_number()))
    .assignTimestampsAndWatermarks(
            WatermarkStrategy.<SimiUserInfo>forBoundedOutOfOrderness(Duration.ofMinutes(5))
                    .withTimestampAssigner((user, ts) -> user.getUpdateTime()))
    .keyBy(user -> user.getCid() + "#_#" + user.getPhone_number()).process(new LatestUserProcessFunction());

    // Device event stream: event time is collectTime, with 5 seconds of tolerated out-of-orderness.
    DataStream<DeviceId> deviceDataStream = collectLogStreamSource.flatMap(new FlatMapFunction<String, DeviceId>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String value, Collector<DeviceId> out) throws Exception {
            try {
                // Parse the Kafka payload; records rejected by handleData come back as null.
                DeviceId device = handleData(value);
                if (device != null)
                    out.collect(device);
            } catch (Exception e) {
                logger.error("Error parsing ods_new_collect_log 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
            }
        }
    })
    .assignTimestampsAndWatermarks(WatermarkStrategy.<DeviceId>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((device, ts) -> device.getCollectTime()));

    // Both sides are keyed by "cid#_#phone" so matching device and user records meet in the
    // same CollectLogJoinProcessor instance.
    return deviceDataStream.connect(abroadUserDataStream)
            .keyBy(device -> device.getCid() + "#_#" + device.getPhone(),
                    user -> user.getCid() + "#_#" + user.getPhone_number())
            .process(new CollectLogJoinProcessor() {
                private static final long serialVersionUID = 1L;
            });
}
/**
 * Parses a raw collect-log JSON message and derives the device record for it.
 *
 * @param value raw JSON text of a {@code CollectLog} message
 * @return the populated {@code DeviceId}, or {@code null} when the message parses to null, no
 *         device id (v1) could be generated, or the user properties lack a cid or a phone
 * @throws Exception declared for the parsing and conversion helpers called here
 */
public DeviceId handleData(String value) throws Exception {
    CollectLog log = JSONObject.parseObject(value, new TypeReference<CollectLog>(){});
    if (null == log) {
        return null;
    }
    String appType = log.getAppType();
    String appKey = log.getAppKey();
    String other_info = log.getOtherInfo();
    String device_info = log.getDeviceInfo();
    String env_info = log.getEnvInfo();
    String createTime = log.getCreateTime();
    DeviceIdInfo deviceIdInfo = GenDeviceIdProcessor.genDeviceId(appType,appKey, other_info, device_info, env_info);
    UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(log.getUserProperties());

    // A usable record needs a generated device id plus the cid and phone of the user.
    if (null == deviceIdInfo || StringUtils.isEmpty(deviceIdInfo.getDeviceIdV1())) {
        return null;
    }
    if (null == userProperties
            || StringUtils.isEmpty(userProperties.getCid())
            || StringUtils.isEmpty(userProperties.getPhone())) {
        return null;
    }

    DeviceId deviceId = new DeviceId();
    deviceId.setDeviceId(deviceIdInfo.getDeviceIdV1());
    deviceId.setCid(userProperties.getCid());
    deviceId.setPhone(userProperties.getPhone());
    // userProperties is known to be non-null here, so no null fallback is needed for the nick.
    deviceId.setNick(userProperties.getNick());
    // Take the app key from DeviceIdInfo rather than the raw log value.
    deviceId.setAppKey(deviceIdInfo.getAppKey());
    deviceId.setAppType(appType);
    deviceId.setCreateTime(createTime);
    deviceId.setCollectTime(TimeConvertUtil.convertToTimestamp(createTime));
    return deviceId;
}
/**
 * Shared parser for the {@code yyyy-MM-dd HH:mm:ss.SSS} create-time text. A DateTimeFormatter
 * is immutable and thread-safe, so one instance replaces building a new one per record.
 */
private static final DateTimeFormatter CREATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

/**
 * Assembles the Doris row for a joined result.
 *
 * @param result joined record; may be null
 * @param fields column names of the target table; only the length is used, to size the row
 * @return the populated row, or {@code null} when {@code result} is null
 */
public RowData exeRowData(Result result,String[] fields){
    if (null == result) {
        return null;
    }
    GenericRowData row = new GenericRowData(fields.length);
    row.setField(0, StringData.fromString(result.getCid()));
    row.setField(1, StringData.fromString(result.getPhone()));
    row.setField(2, StringData.fromString(result.getAppKey()));
    row.setField(3, StringData.fromString(result.getDeviceId()));
    row.setField(4, StringData.fromString(result.getAppType()));
    row.setField(5, StringData.fromString(result.getCountryCode()));
    row.setField(6, StringData.fromString(result.getUserState()));
    row.setField(7, StringData.fromString(result.getNick()));
    row.setField(8, TimestampData.fromLocalDateTime(LocalDateTime.parse(result.getCreateTime(), CREATE_TIME_FORMATTER)));
    row.setField(9, 0); // __DORIS_DELETE_SIGN__
    return row;
}
}
......@@ -25,6 +25,7 @@ public enum TopicTypeEnum {
ODS_EVENT_IP_CONVERT("ods_event_ip_convert","odsEventIpConvert"),
ODS_USER_INVITATION("ods_user_invitation","odsUserInvitation"),
SIMI_USER_LIST_TOPIC("simi_user_list","simiUserList"),
ABROAD_SIMI_USER_LIST_TOPIC("abroad_simi_user_list","abroadSimiUserList"),
OPEN_SIMI_API("ods_open_simi_api","odsOpenSimiApi"),
;
......
......@@ -21,7 +21,7 @@ import com.flink.vo.ios.IosEnvInfo;
* 类说明
*/
public class GenDeviceIdProcessor {
public static DeviceIdInfo genDeviceId(String appType,String appKey,String other_info,String device_info,String env_info) {
DeviceIdInfo deviceIdInfo = new DeviceIdInfo();
String deviceId = null;
......@@ -47,7 +47,7 @@ public class GenDeviceIdProcessor {
if(StringUtils.equals(appType, AppTypeEnum.ANDROID.getCode())) {
deviceIdV1 = deviceId;
if(StringUtils.isEmpty(appKey) || StringUtils.equals(appKey, "C7jias27jias2")) {
if(StringUtils.isEmpty(appKey) || StringUtils.isEmpty(appKey.trim()) || StringUtils.equals(appKey, "C7jias27jias2")) {
appKey = "8ooOvXJo276";
}
}else if(StringUtils.equals(appType, AppTypeEnum.IOS.getCode())) {
......@@ -57,6 +57,9 @@ public class GenDeviceIdProcessor {
}
deviceIdInfo.setDeviceIdV1(deviceIdV1);
deviceIdInfo.setIdfv(idfv);
deviceIdInfo.setAppKey(appKey);
return deviceIdInfo;
}
}
......@@ -28,7 +28,9 @@ public class DeviceIdLatestProcessor implements JobProcessor{
private static List<KafkaTopic> createTopicList() {
return Arrays.stream(new TopicTypeEnum[]{
TopicTypeEnum.ODS_NEW_COLLECT_LOG,
TopicTypeEnum.SIMI_USER_LIST_TOPIC
TopicTypeEnum.SIMI_USER_LIST_TOPIC,
TopicTypeEnum.ABROAD_SIMI_USER_LIST_TOPIC,
}).map(TopicTypeEnum::createKafkaTopic)
.collect(Collectors.toList());
}
......
......@@ -20,4 +20,5 @@ public class DeviceIdInfo implements Serializable{
private static final long serialVersionUID = 1L;
private String idfv;
private String deviceIdV1;
private String appKey;
}
......@@ -15,4 +15,16 @@ hdfs.url=hdfs://10.0.0.105:8020/user/ck/
simiUserInfo.url= https://admin.dw20.net/prod-api/
simiUserInfo.authorization= KhhZAQKaZkfd7p55
simiUserInfo.key= niiKpP4SXce2zCHZ
\ No newline at end of file
simiUserInfo.key= niiKpP4SXce2zCHZ
simi.android.appKe= ptyzTPaV207
simi.ios.appKey= giHQ1YLp925
abroadSimi.android.appKey= lOxLJYzx658
abroadSimi.ios.appKey= lcALJYzx932
chainless.android.appKey= 8ooOvXJo276
chainless.ios.appKey= 9JQ3A7GA420
abroadChainless.android.appKey= 672OvXJo236
abroadChainless.ios.appKey= KJa3A7GA410
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment