Commit b6f14797 by 魏建枢

代码提交

parent 93042482
...@@ -61,7 +61,7 @@ public class CommonConsumeBaseAchi extends SourceCommonBase implements Serializa ...@@ -61,7 +61,7 @@ public class CommonConsumeBaseAchi extends SourceCommonBase implements Serializa
} }
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_COLLECT_USER_BEHAVIOR.getTopic())) { if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_COLLECT_USER_BEHAVIOR.getTopic())) {
DataStreamSource<String> collectUserBehaviorStreamSource = kafkaDataSource.getDataStreamSource(); DataStreamSource<String> collectUserBehaviorStreamSource = kafkaDataSource.getDataStreamSource();
CollectUserBehaviorAchi.collectUserBehavior(collectUserBehaviorStreamSource); UserDailyActivityAchi.collectUserBehavior(collectUserBehaviorStreamSource);
} }
} }
}else { }else {
......
package com.flink.achieve.base;
import java.io.Serializable;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.flink.common.DorisConnector;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.CollectUserBehavior;
/**
* @author wjs
* @version 创建时间:2025-9-12 11:44:06 类说明
*/
public class UserDailyActivityAchi implements Serializable{
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(UserDailyActivityAchi.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
// 定义公共常量
private static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
private static final int DELETE_SIGN_VALUE = 0;
// 日期时间格式
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
public static void collectUserBehavior(DataStreamSource<String> dataStreamSource) {
//================= 配置入库字段 =================
String[] fields = {
"device_id",
"cid",
"phone",
"app_key",
"app_type",
"dt",
"device_id_v1",
"platform",
"country_code",
"nick",
"brand",
"device_model",
"os_release",
"app_version",
"ip_name",
"area_name",
"network_ip",
"network_area_name",
"phone_name",
"network_model",
"device_name",
"zone_name",
"zone_type",
"zone_code",
DORIS_DELETE_SIGN
};
DataType[] types = {
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.DATE(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.INT()
};
//================= 创建 Doris Sink =================
String tableName = "bi.user_daily_activity";
DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);
dataStreamSource
.map(value -> {
try {
CollectUserBehavior log = JSONObject.parseObject(value,new TypeReference<CollectUserBehavior>(){});
if(null == log) {
return null;
}
GenericRowData row = new GenericRowData(fields.length);
// 字段映射
row.setField(0, StringData.fromString(log.getDevice_id()));
row.setField(1, StringData.fromString(log.getCid()));
row.setField(2, StringData.fromString(log.getPhone()));
row.setField(3, StringData.fromString(log.getApp_key()));
row.setField(4, StringData.fromString(log.getApp_type()));
row.setField(5, TimeConvertUtil.convertToSqlDate(log.getCreate_time()));
row.setField(6, StringData.fromString(log.getDevice_id_v1()));
row.setField(7, StringData.fromString(log.getPlatform()));
row.setField(8, StringData.fromString(log.getCountry_code()));
row.setField(9, StringData.fromString(log.getNick()));
row.setField(10, StringData.fromString(log.getBrand()));
row.setField(11, StringData.fromString(log.getDevice_model()));
row.setField(12, StringData.fromString(log.getOs_release()));
row.setField(13, StringData.fromString(log.getApp_version()));
row.setField(14, StringData.fromString(log.getIp_name()));
row.setField(15, StringData.fromString(log.getArea_name()));
row.setField(16, StringData.fromString(log.getNetwork_ip()));
row.setField(17, StringData.fromString(log.getNetwork_area_name()));
row.setField(18, StringData.fromString(log.getPhone_name()));
row.setField(19, StringData.fromString(log.getNetwork_model()));
row.setField(20, StringData.fromString(log.getDevice_name()));
row.setField(21, StringData.fromString(log.getZone_name()));
row.setField(22, StringData.fromString(log.getZone_type()));
row.setField(23, StringData.fromString(log.getZone_code()));
row.setField(24, DELETE_SIGN_VALUE); // 删除标记(0=正常数据)
return (RowData)row;
} catch (Exception e) {
System.err.println("解析失败: "+e.toString());
return null;
}
})
.filter(Objects::nonNull)
// .print();
.sinkTo(dorisSink)
.name("Doris-DailyActivitySink");
}
}
package com.flink.achieve.doris;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.SourceCommonBase;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.function.LatestUserProcessFunction;
import com.flink.util.CompareUtils;
import com.flink.util.SqlLoader;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.DwdSysLog;
import com.flink.vo.EventList;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.OdsEventLog;
import com.flink.vo.PcEventInfo;
import com.flink.vo.PcOdsEventLog;
import com.flink.vo.PcProperties;
import com.flink.vo.SimiUserInfo;
import com.flink.vo.UserDailyActivityOutputLog;
import com.flink.vo.userDailyActivity.BulidDailyParams;
import com.flink.vo.userDailyActivity.DailyActivityCombinedLog;
import com.flink.vo.userDailyActivity.DailyActivityDeviceInfo;
import com.flink.vo.userDailyActivity.DailyActivityEnrichedLog;
import com.flink.vo.userDailyActivity.DailyActivityEventInfo;
/**
* @author wjs
* @version 创建时间:2025-7-31 11:02:52
* 类说明
*/
public class UserDailyActivityAchi extends SourceCommonBase implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(UserDailyActivityAchi.class);
@Override
public void parseMultipleSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
DataStreamSource<String> collectLogStreamSource = null;
DataStreamSource<String> eventStreamSource = null;
DataStreamSource<String> pcCollectLogStreamSource = null;
DataStreamSource<String> pcEventStreamSource = null;
DataStreamSource<String> userStreamSource = null;
DataStreamSource<String> abroadUserStreamSource = null;
DataStreamSource<String> sysLogStreamSource = null;
if(CollectionUtils.isNotEmpty(dataSourceList)) {
for(KafkaDataSource kafkaDataSource : dataSourceList) {
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.DWD_SYS_LOG.getTopic())) {
sysLogStreamSource = kafkaDataSource.getDataStreamSource();
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_NEW_COLLECT_LOG.getTopic())) {
collectLogStreamSource = kafkaDataSource.getDataStreamSource();
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_EVENT_LOG.getTopic())) {
eventStreamSource = kafkaDataSource.getDataStreamSource();
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.SIMI_USER_LIST_TOPIC.getTopic())) {
userStreamSource = kafkaDataSource.getDataStreamSource();
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ABROAD_SIMI_USER_LIST_TOPIC.getTopic())) {
abroadUserStreamSource = kafkaDataSource.getDataStreamSource();
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_PC_COLLECT_LOG.getTopic())) {
pcCollectLogStreamSource = kafkaDataSource.getDataStreamSource();
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_PC_EVENT_LOG.getTopic())) {
pcEventStreamSource = kafkaDataSource.getDataStreamSource();
}
}
}else {
return;
}
//设备信息合并
DataStream<DailyActivityDeviceInfo> mergedDeviceStream = mergedDeviceStream(collectLogStreamSource,pcCollectLogStreamSource,sysLogStreamSource);
//事件信息合并
DataStream<DailyActivityEventInfo> mergedEventStream = mergedEventStream(eventStreamSource,pcEventStreamSource);
// 用户数据合并
DataStream<SimiUserInfo> mergedAllUserStream = mergedUserDataStream(abroadUserStreamSource,userStreamSource);
//设备关联事件
DataStream<DailyActivityEnrichedLog> processedStream = deviceJoinEventStream(mergedDeviceStream,mergedEventStream);
//分组去重
DataStream<DailyActivityEnrichedLog> processedGroupDistinctStream = processedGroupDistinctStream(processedStream);
OutputTag<DailyActivityEnrichedLog> wulianTag = new OutputTag<DailyActivityEnrichedLog>("wulian"){private static final long serialVersionUID = 1L;};
OutputTag<DailyActivityEnrichedLog> simiTag = new OutputTag<DailyActivityEnrichedLog>("simi"){private static final long serialVersionUID = 1L;};
SingleOutputStreamOperator<DailyActivityEnrichedLog> splitStream = processedGroupDistinctStream
.process(new ProcessFunction<DailyActivityEnrichedLog, DailyActivityEnrichedLog>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void processElement(DailyActivityEnrichedLog log, Context ctx, Collector<DailyActivityEnrichedLog> out) {
if (isWulianApp(log.getAppKey())) {
ctx.output(wulianTag, log); // 无链业务
} else if (isSimiApp(log.getAppKey())) {
ctx.output(simiTag, log); // 私米业务
}
}
private boolean isWulianApp(String appKey) {
return Arrays.asList("8ooOvXJo276", "9JQ3A7GA420")
.contains(appKey);
}
private boolean isSimiApp(String appKey) {
return Arrays.asList(
"ptyzTPaV207",
"giHQ1YLp925",
"lOxLJYzx658",
"lcALJYzx932",
"pc1KPjmh951",
"pcrIjvC5805",
"pcUXtmMh356",
"pcrPGB1z531",
"pcRIhwh1380",
"pcQmdNl0952",
"pc1etTC6207",
"pcd9Sa8T989"
)
.contains(appKey);
}
});
DataStream<UserDailyActivityOutputLog> wulianStream = processWulian(splitStream.getSideOutput(wulianTag))
.filter(log -> log != null && log instanceof UserDailyActivityOutputLog);
DataStream<UserDailyActivityOutputLog> simiStream = processSimi(splitStream.getSideOutput(simiTag), mergedAllUserStream)
.filter(log -> log != null && log instanceof UserDailyActivityOutputLog);
sinkDoris(wulianStream);
sinkDoris(simiStream);
}
private DataStream<DailyActivityEnrichedLog> processedGroupDistinctStream(DataStream<DailyActivityEnrichedLog> processedStream) {
return processedStream.keyBy(new KeySelector<DailyActivityEnrichedLog, Tuple2<String, String>>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> getKey(DailyActivityEnrichedLog info) {
return Tuple2.of(info.getDeviceIdV1(), info.getDt());
}
})
.process(new KeyedProcessFunction<Tuple2<String, String>, DailyActivityEnrichedLog, DailyActivityEnrichedLog>() {
/**
*
*/
private static final long serialVersionUID = 1L;
private ValueState<DailyActivityEnrichedLog> bestState;
@Override
public void open(Configuration params) {
ValueStateDescriptor<DailyActivityEnrichedLog> descriptor = new ValueStateDescriptor<>("bestInfo", DailyActivityEnrichedLog.class);
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofMinutes(30)).build();
descriptor.enableTimeToLive(ttlConfig);
bestState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(DailyActivityEnrichedLog value,Context ctx, Collector<DailyActivityEnrichedLog> out) throws Exception {
DailyActivityEnrichedLog current = bestState.value();
if (current == null || isBetter(value, current)) {
bestState.update(value);
out.collect(value);
}
}
private boolean isBetter(DailyActivityEnrichedLog newInfo, DailyActivityEnrichedLog current) {
boolean newValid = isValidIp(newInfo.getIp());
boolean currentValid = isValidIp(current.getIp());
if (newValid && !currentValid) return true;
if (newValid == currentValid) {
return newInfo.getWaterMarkTime() > current.getWaterMarkTime();
}
return false;
}
private boolean isValidIp(String ip) {
return StringUtils.isNotEmpty(ip) &&
!ip.equals("[]") &&
!ip.equals("null");
}
});
}
public void sinkDoris(DataStream<UserDailyActivityOutputLog> outputStream) {
//================= 配置入库字段 =================
String[] fields = {
"device_id", "cid", "app_key", "platform", "app_type", "dt",
"country_code", "phone", "nick", "brand", "model", "os_release",
"app_version", "ip", "area_name", "network_ip", "network_area_name",
"frist_time", "latest_time", "phone_name", "network_model",
"device_name", "zone_name", "zone_type", "zone_code",
"__DORIS_DELETE_SIGN__"
};
DataType[] types = {
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP(3),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.INT()
};
//================= 创建 Doris Sink =================
String tableName = "bi.daily_activity";
DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);
//================= 数据处理流水线 =================
outputStream
.map(log -> {
try {
if(log == null) {
return null;
}
GenericRowData row = new GenericRowData(fields.length);
// 字段映射
row.setField(0, StringData.fromString(log.getDeviceId()));
row.setField(1, StringData.fromString(log.getCid()));
row.setField(2, StringData.fromString(log.getAppKey()));
row.setField(3, StringData.fromString(log.getPlatform()));
row.setField(4, StringData.fromString(log.getAppType()));
row.setField(5, StringData.fromString(log.getDt()));
row.setField(6, StringData.fromString(log.getCountryCode()));
row.setField(7, StringData.fromString(log.getPhone()));
row.setField(8, StringData.fromString(log.getNick()));
row.setField(9, StringData.fromString(log.getBrand()));
row.setField(10, StringData.fromString(log.getModel()));
row.setField(11, StringData.fromString(log.getOsRelease()));
row.setField(12, StringData.fromString(log.getAppVersion()));
row.setField(13, StringData.fromString(log.getIp()));
row.setField(14, StringData.fromString(log.getAreaName()));
row.setField(15, StringData.fromString(StringUtils.isNotEmpty(log.getNetworkIp()) ? log.getNetworkIp() : null));
row.setField(16, StringData.fromString(StringUtils.isNotEmpty(log.getNetworkAreaName()) ? log.getNetworkAreaName() : null));
row.setField(17, TimestampData.fromEpochMillis(log.getFirstTime()));
row.setField(18, TimestampData.fromEpochMillis(log.getLatestTime()));
row.setField(19, StringData.fromString(log.getPhoneName()));
row.setField(20, StringData.fromString(log.getNetworkModel()));
row.setField(21, StringData.fromString(log.getDeviceName()));
row.setField(22, StringData.fromString(log.getZoneName()));
row.setField(23, StringData.fromString(log.getZoneType()));
row.setField(24, StringData.fromString(log.getZoneCode()));
row.setField(25, 0); // 删除标记(0=正常数据)
return (RowData)row;
} catch (Exception e) {
logger.error("数据映射失败: device_id={}, error={}",
log.getDeviceId(), e.getMessage());
return null;
}
})
.filter(obj -> obj != null)
.sinkTo(dorisSink)
.name("Doris-DailyActivitySink");
}
//私米业务处理(需要关联用户)
private static DataStream<UserDailyActivityOutputLog> processSimi(
DataStream<DailyActivityEnrichedLog> simiStream,
DataStream<SimiUserInfo> userStream) {
return simiStream.coGroup(userStream)
.where(log -> log.getCid() + "|" + log.getPhone())
.equalTo(user -> user.getCid() + "|" + user.getPhone_number())
.window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
.apply(new CoGroupFunction<DailyActivityEnrichedLog, SimiUserInfo, UserDailyActivityOutputLog>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void coGroup(Iterable<DailyActivityEnrichedLog> logs, Iterable<SimiUserInfo> users,
Collector<UserDailyActivityOutputLog> out) {
// 获取用户信息(取第一个匹配项)
String countryCode = null;
for (SimiUserInfo user : users) {
countryCode = user.getCountry_code();
break;
}
// 处理日志
for (DailyActivityEnrichedLog log : logs) {
UserDailyActivityOutputLog output = new UserDailyActivityOutputLog();
// 基础字段映射
output.setDeviceId(log.getDeviceIdV1());
output.setCid(log.getCid());
output.setAppKey(log.getAppKey());
output.setPlatform(log.getPlatform());
output.setAppType(log.getAppType());
output.setDt(log.getDt());
output.setCountryCode(countryCode);
output.setPhone(log.getPhone());
output.setNick(log.getNick());
output.setBrand(log.getBrand());
output.setModel(log.getModel());
output.setOsRelease(log.getOsRelease());
output.setAppVersion(log.getAppVersion());
output.setIp(log.getIp());
output.setAreaName(log.getAreaName());
output.setNetworkIp(log.getNetwork_ip());
output.setNetworkAreaName(log.getNetwork_area_name());
output.setFirstTime(log.getFirstTime());
output.setLatestTime(log.getLatestTime());
output.setPhoneName(log.getPhoneName());
output.setNetworkModel(log.getNetworkModel());
output.setDeviceName(log.getBrand() + "-" + log.getPhoneName());
output.setZoneName(log.getZoneName());
output.setZoneType(log.getZoneType());
output.setZoneCode(log.getZoneCode());
out.collect(output);
}
}
});
}
//无链业务处理(无需关联用户)
private static DataStream<UserDailyActivityOutputLog> processWulian(DataStream<DailyActivityEnrichedLog> wulianStream) {
return wulianStream.map(log -> {
// 电话号码拆分
UserDailyActivityOutputLog output = new UserDailyActivityOutputLog();
// 基础字段映射
output.setDeviceId(log.getDeviceIdV1());
output.setCid(log.getCid());
output.setAppKey(log.getAppKey());
output.setPlatform(log.getPlatform());
output.setAppType(log.getAppType());
output.setDt(log.getDt());
output.setCountryCode(log.getCountryCode());
output.setPhone(log.getPhone());
output.setNick(log.getNick());
output.setBrand(log.getBrand());
output.setModel(log.getModel());
output.setOsRelease(log.getOsRelease());
output.setAppVersion(log.getAppVersion());
output.setIp(log.getIp());
output.setAreaName(log.getAreaName());
output.setNetworkIp(log.getNetwork_ip());
output.setNetworkAreaName(log.getNetwork_area_name());
output.setFirstTime(log.getFirstTime());
output.setLatestTime(log.getLatestTime());
output.setPhoneName(log.getPhoneName());
output.setNetworkModel(log.getNetworkModel());
output.setDeviceName(log.getBrand() + "-" + log.getPhoneName());
output.setZoneName(log.getZoneName());
output.setZoneType(log.getZoneType());
output.setZoneCode(log.getZoneCode());
return output;
});
}
//设备信息关联事件信息通过deviceId+uniqueId
private DataStream<DailyActivityEnrichedLog> deviceJoinEventStream(DataStream<DailyActivityDeviceInfo> deviceStream,
DataStream<DailyActivityEventInfo> eventStream) {
DataStream<DailyActivityCombinedLog> combinedStream = deviceStream
.keyBy(dev -> dev.getDeviceId() + "#_#" + dev.getUniqueId())
.intervalJoin(eventStream.keyBy(evt -> evt.getDeviceId() + "#_#" + evt.getUniqueId()))
.between(Duration.ofMinutes(-10), Duration.ofMinutes(5))
.process(new ProcessJoinFunction<DailyActivityDeviceInfo, DailyActivityEventInfo, DailyActivityCombinedLog>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void processElement(DailyActivityDeviceInfo dev, DailyActivityEventInfo evt, Context ctx, Collector<DailyActivityCombinedLog> out) {
String appKey = dev.getAppKey();
String countryCode = null;
String phone = null;
if(CompareUtils.stringExists(appKey,
"8ooOvXJo276",
"9JQ3A7GA420")) {
String wulianPhone = evt.getPhone();
if (StringUtils.isNotEmpty(wulianPhone) && wulianPhone.contains(" ")) {
String[] parts = wulianPhone.split(" ", 2);
countryCode = parts[0];
phone = parts[1];
}
}else {
phone = evt.getPhone();
}
logger.info(">>>>>>>>>>>>>>>>>>> deviceJoinEventStream dev:{},evt:{}",dev,evt);
out.collect(new DailyActivityCombinedLog(
dev.getDeviceIdV1(), appKey, dev.getAppType(), dev.getDt(),
dev.getModel(), dev.getBrand(), dev.getOsRelease(), dev.getAppVersion(),
evt.getCid(), phone, evt.getNick(), evt.getIp(), evt.getAreaName(),
dev.getWaterMarkTime(), dev.getZoneName(), dev.getZoneType(), dev.getZoneCode(),
0L, 0L,dev.getDeviceName(),dev.getPlatform(),countryCode,dev.getNetwork_ip(),dev.getNetwork_area_name()
));
}
});
// 2. 关联维度表
DataStream<DailyActivityEnrichedLog> enrichedStream = AsyncDataStream.orderedWait(
combinedStream,
new DatabaseAsyncFunction(),
10,
TimeUnit.SECONDS,
100
);
//3. 计算设备时间
return enrichedStream
.keyBy(log -> log.getDeviceIdV1())
.process(new KeyedProcessFunction<String, DailyActivityEnrichedLog, DailyActivityEnrichedLog>() {
/**
*
*/
private static final long serialVersionUID = 1L;
private ValueState<Tuple2<Long, Long>> timeState;
@Override
public void open(Configuration params) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>("deviceTimes", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofMinutes(10))
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
.cleanupInRocksdbCompactFilter(1000)
.build();
descriptor.enableTimeToLive(ttlConfig);
timeState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(DailyActivityEnrichedLog log, Context ctx, Collector<DailyActivityEnrichedLog> out) throws Exception {
Tuple2<Long, Long> times = timeState.value();
long eventTime = log.getWaterMarkTime();
if (times == null) {
times = Tuple2.of(eventTime, eventTime);
} else {
times = Tuple2.of(
Math.min(times.f0, eventTime),
Math.max(times.f1, eventTime)
);
}
timeState.update(times);
log.setFirstTime(times.f0);
log.setLatestTime(times.f1);
out.collect(log);
}
});
}
// 异步JDBC查询实现
private static class DatabaseAsyncFunction extends RichAsyncFunction<DailyActivityCombinedLog, DailyActivityEnrichedLog> {
/**
*
*/
private static final long serialVersionUID = 1L;
private transient Connection connection;
private transient com.google.common.cache.Cache<String, PhoneModel> cache;
@Override
public void open(Configuration parameters) throws Exception {
// 1. 初始化数据库连接
Class.forName("com.mysql.cj.jdbc.Driver");
connection = DriverManager.getConnection(
"jdbc:mysql://10.0.1.213:3306/spider?useSSL=false&serverTimezone=UTC",
"bigdata",
"Im#Social&20181*29#im"
);
// 2. 初始化本地缓存(最大1000条,10分钟过期)
cache = com.google.common.cache.CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();
}
@Override
public void asyncInvoke(DailyActivityCombinedLog input, ResultFuture<DailyActivityEnrichedLog> resultFuture) {
// 3. 先尝试从缓存获取
PhoneModel cachedModel = cache.getIfPresent(input.getModel());
if (cachedModel != null) {
resultFuture.complete(Collections.singleton(new DailyActivityEnrichedLog(input, cachedModel.phoneName, cachedModel.networkModel)));
return;
}
// 4. 异步数据库查询
CompletableFuture.supplyAsync(() -> {
try {
// 5. 缓存未命中则查询数据库
try (PreparedStatement stmt = connection.prepareStatement(
"SELECT phone_name, 入网型号 AS network_model FROM brand_phone WHERE 入网型号 = ?")) {
stmt.setString(1, input.getModel());
ResultSet rs = stmt.executeQuery();
if (rs.next()) {
PhoneModel model = new PhoneModel(
rs.getString("phone_name"),
rs.getString("network_model")
);
// 6. 更新缓存
cache.put(input.getModel(), model);
return model;
}
}
return new PhoneModel(null, null); // 空结果
} catch (Exception e) {
logger.error("Async query failed for model: {}", input.getModel(), e);
return new PhoneModel(null, null); // 降级处理
}
}).thenAccept(model -> {
// 7. 组装结果
resultFuture.complete(Collections.singleton(
new DailyActivityEnrichedLog(input, model.phoneName, model.networkModel)
));
});
}
@Override
public void close() {
if (connection != null) try {
connection.close();
} catch (Exception e) {
logger.error("DB connection close error", e);
}
}
// 缓存数据结构
private static class PhoneModel {
final String phoneName;
final String networkModel;
PhoneModel(String phoneName, String networkModel) {
this.phoneName = phoneName;
this.networkModel = networkModel;
}
}
}
//设备信息关联入网ip合并
private DataStream<DailyActivityDeviceInfo> mergedDeviceStream(DataStreamSource<String> collectLogStreamSource,
DataStreamSource<String> pcCollectLogStreamSource,DataStreamSource<String> sysLogStreamSource) {
SingleOutputStreamOperator<DwdSysLog> sysDataStream = sysLogStreamSource
.map(new MapFunction<String, DwdSysLog>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public DwdSysLog map(String value) throws Exception {
try {
DwdSysLog sysLog = JSON.parseObject(value, DwdSysLog.class);
logger.debug("sysLog原始数据: {}", sysLog);
if (sysLog == null || StringUtils.isEmpty(sysLog.getSend_time())) {
logger.warn("空值日志: {}", value);
return null;
}
// 时间戳转换(添加异常捕获)
Long waterMarkTime = TimeConvertUtil.convertToTimestamp(sysLog.getSend_time());
if (waterMarkTime <= 0) {
logger.error("时间转换失败: {}", sysLog.getSend_time());
return null;
}
sysLog.setWaterMarkTime(waterMarkTime);
return sysLog;
} catch (Exception e) {
logger.error("JSON解析失败: {} | 错误: {}", value, e.getMessage());
return null;
}
}
})
.filter(Objects::nonNull)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<DwdSysLog>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((sysLog, ts) -> sysLog.getWaterMarkTime())
);
SingleOutputStreamOperator<DailyActivityDeviceInfo> mergedDeviceStream = collectLogStreamSource.flatMap(new FlatMapFunction<String, DailyActivityDeviceInfo>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<DailyActivityDeviceInfo> out) throws Exception {
try {
// 解析 Kafka 数据
DailyActivityDeviceInfo device = BulidDailyParams.handleDeviceData(value);
if (device != null)
out.collect(device);
} catch (Exception e) {
logger.error("Error parsing ods_new_collect_log 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
}).union(
// PC设备信息数据流处理
pcCollectLogStreamSource.flatMap(new FlatMapFunction<String, DailyActivityDeviceInfo>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<DailyActivityDeviceInfo> out) throws Exception {
try {
// 解析 Kafka 数据
DailyActivityDeviceInfo device = BulidDailyParams.handlePcDeviceData(value);
if (device != null)
out.collect(device);
} catch (Exception e) {
logger.error("Error parsing ods_pc_collect_log 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
})
)
.assignTimestampsAndWatermarks(WatermarkStrategy.<DailyActivityDeviceInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((device, ts) -> device.getWaterMarkTime()));
return sysDataStream
.keyBy(new KeySelector<DwdSysLog, String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public String getKey(DwdSysLog sysLog) {
return sysLog.getDevice_id() + "#_#" +
sysLog.getUnique_id() + "#_#" +
sysLog.getDevice_id_v1();
}
})
.intervalJoin(
mergedDeviceStream.keyBy(new KeySelector<DailyActivityDeviceInfo, String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public String getKey(DailyActivityDeviceInfo dev) {
return dev.getDeviceId() + "#_#" +
dev.getUniqueId() + "#_#" +
dev.getDeviceIdV1(); // 注意字段名一致性!
}
})
)
.between(Duration.ofMinutes(-10), Duration.ofMinutes(5))
.process(new ProcessJoinFunction<DwdSysLog, DailyActivityDeviceInfo, DailyActivityDeviceInfo>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void processElement(DwdSysLog sysLog, DailyActivityDeviceInfo dev, Context ctx, Collector<DailyActivityDeviceInfo> out) {
out.collect(new DailyActivityDeviceInfo(dev.getDeviceId(), dev.getDeviceIdV1(), dev.getAppKey(), dev.getUniqueId(),
dev.getAppType(), dev.getDt(),dev.getModel(),dev.getBrand(),dev.getOsRelease(),dev.getAppVersion(),dev.getWaterMarkTime(),
dev.getZoneName(),dev.getZoneType(),dev.getZoneCode(),dev.getPlatform(),dev.getDeviceName(),sysLog.getNetwork_ip(),sysLog.getNetwork_area_name()));
}
});
}
// 用户数据合并
private DataStream<SimiUserInfo> mergedUserDataStream(DataStreamSource<String> abroadUserStreamSource,
DataStreamSource<String> userStreamSource) {
return abroadUserStreamSource.flatMap(new FlatMapFunction<String, SimiUserInfo>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<SimiUserInfo> out) throws Exception {
try {
// 解析 Kafka 数据
SimiUserInfo simiUserInfo = JSONObject.parseObject(value, new TypeReference<SimiUserInfo>() {
});
simiUserInfo.setUpdateTime(TimeConvertUtil.convertToTimestamp(simiUserInfo.getCreate_time()));
out.collect(simiUserInfo);
} catch (Exception e) {
logger.error("Error parsing abroad_simi_user_list 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
}).union(
userStreamSource.flatMap(new FlatMapFunction<String, SimiUserInfo>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<SimiUserInfo> out) throws Exception {
try {
// 解析 Kafka 数据
SimiUserInfo simiUserInfo = JSONObject.parseObject(value, new TypeReference<SimiUserInfo>() {
});
simiUserInfo.setUpdateTime(TimeConvertUtil.convertToTimestamp(simiUserInfo.getCreate_time()));
out.collect(simiUserInfo);
} catch (Exception e) {
logger.error("Error parsing simi_user_list 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
})
).filter(u -> StringUtils.isNoneEmpty(u.getCid(), u.getPhone_number()))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<SimiUserInfo>forBoundedOutOfOrderness(Duration.ofMinutes(5))
.withTimestampAssigner((user, ts) -> user.getUpdateTime()))
.keyBy(user -> user.getCid() + "#_#" + user.getPhone_number()).process(new LatestUserProcessFunction());
}
//事件信息合并
private DataStream<DailyActivityEventInfo> mergedEventStream(DataStreamSource<String> eventStreamSource,
DataStreamSource<String> pcEventStreamSource) {
return eventStreamSource.flatMap(new FlatMapFunction<String, DailyActivityEventInfo>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<DailyActivityEventInfo> out) throws Exception {
try {
// 解析 Kafka 数据
OdsEventLog odsEventLog = JSONObject.parseObject(value, new TypeReference<OdsEventLog>() {});
if (null == odsEventLog) {
return;
}
List<EventList> eventList = JSONObject.parseObject(odsEventLog.getEvent_list(), new TypeReference<List<EventList>>() {});
if (CollectionUtils.isEmpty(eventList)) {
return;
}
for(EventList event : eventList) {
DailyActivityEventInfo eventInfo = BulidDailyParams.handleEventData(odsEventLog,event);
if (eventInfo != null)
out.collect(eventInfo);
}
} catch (Exception e) {
logger.error("Error parsing ods_event_log 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
}).union(
// PC事件信息数据流处理
pcEventStreamSource.flatMap(new FlatMapFunction<String, DailyActivityEventInfo>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<DailyActivityEventInfo> out) throws Exception {
try {
// 解析 Kafka 数据
PcOdsEventLog pcOdsEventLog = JSONObject.parseObject(value, new TypeReference<PcOdsEventLog>() {});
String event_info = pcOdsEventLog.getEvent_info();
if(StringUtils.isEmpty(event_info)) {
return;
}
PcEventInfo pcEventInfo = JSONObject.parseObject(event_info, new TypeReference<PcEventInfo>() {});
if(null == pcEventInfo) {
return;
}
List<PcProperties> properties = pcEventInfo.getProperties();
if (CollectionUtils.isEmpty(properties)) {
return;
}
for(PcProperties pcProperties : properties) {
DailyActivityEventInfo eventInfo = BulidDailyParams.handlePcEventData(pcOdsEventLog,pcEventInfo,pcProperties);
if (eventInfo != null)
out.collect(eventInfo);
}
} catch (Exception e) {
logger.error("Error parsing ods_pc_event_log 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
})
)
.assignTimestampsAndWatermarks(WatermarkStrategy.<DailyActivityEventInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.getWaterMarkTime()));
}
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
// TODO Auto-generated method stub
}
@Override
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv,SqlLoader loader)
throws ParseException, Exception {
// TODO Auto-generated method stub
}
}
...@@ -19,7 +19,6 @@ import java.util.stream.Collectors; ...@@ -19,7 +19,6 @@ import java.util.stream.Collectors;
public enum JobTypeEnum { public enum JobTypeEnum {
EVENT_IP_CONVERT("JOB_01", "事件IP转换作业"), EVENT_IP_CONVERT("JOB_01", "事件IP转换作业"),
COMMON_CONSUME_BASE("JOB_02", "公共基础消费采集作业"), COMMON_CONSUME_BASE("JOB_02", "公共基础消费采集作业"),
USER_DAILY_ACTIVITY("JOB_03", "用户日活作业"),
COMMON_CONSUME_SQL_BASE("JOB_04", "公共基础消费采集SQL作业"), COMMON_CONSUME_SQL_BASE("JOB_04", "公共基础消费采集SQL作业"),
EVENT_IP_CONVERT_CID("JOB_07", "最新事件IP作业"), EVENT_IP_CONVERT_CID("JOB_07", "最新事件IP作业"),
......
...@@ -10,7 +10,6 @@ import com.flink.processor.impl.EventIpLatestProcessor; ...@@ -10,7 +10,6 @@ import com.flink.processor.impl.EventIpLatestProcessor;
import com.flink.processor.impl.RegistrationCheckProcessor; import com.flink.processor.impl.RegistrationCheckProcessor;
import com.flink.processor.impl.SimiFriendsProcessor; import com.flink.processor.impl.SimiFriendsProcessor;
import com.flink.processor.impl.SimiGroupstProcessor; import com.flink.processor.impl.SimiGroupstProcessor;
import com.flink.processor.impl.UserDailyActivityProcessor;
import com.flink.processor.impl.VectorAngleCalculationProcessor; import com.flink.processor.impl.VectorAngleCalculationProcessor;
/** /**
...@@ -28,8 +27,6 @@ public class JobProcessorFactory { ...@@ -28,8 +27,6 @@ public class JobProcessorFactory {
return new CommonConsumeBaseProcessor(); return new CommonConsumeBaseProcessor();
case COMMON_CONSUME_SQL_BASE: case COMMON_CONSUME_SQL_BASE:
return new CommonConsumeSqlBaseProcessor(); return new CommonConsumeSqlBaseProcessor();
case USER_DAILY_ACTIVITY:
return new UserDailyActivityProcessor();
case EVENT_IP_CONVERT_CID: case EVENT_IP_CONVERT_CID:
return new EventIpLatestProcessor(); return new EventIpLatestProcessor();
case DEVICE_ID_CID: case DEVICE_ID_CID:
......
...@@ -35,6 +35,7 @@ public class CommonConsumeBaseProcessor implements JobProcessor{ ...@@ -35,6 +35,7 @@ public class CommonConsumeBaseProcessor implements JobProcessor{
TopicTypeEnum.ODS_NEW_COLLECT_LOG, TopicTypeEnum.ODS_NEW_COLLECT_LOG,
TopicTypeEnum.OPEN_SIMI_API, TopicTypeEnum.OPEN_SIMI_API,
TopicTypeEnum.ODS_EXCEPTION_EVENT_TOPIC, TopicTypeEnum.ODS_EXCEPTION_EVENT_TOPIC,
TopicTypeEnum.ODS_COLLECT_USER_BEHAVIOR,
}).map(TopicTypeEnum::createKafkaTopic) }).map(TopicTypeEnum::createKafkaTopic)
.collect(Collectors.toList()); .collect(Collectors.toList());
......
package com.flink.processor.impl;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import com.flink.achieve.doris.UserDailyActivityAchi;
import com.flink.enums.JobTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.JobProcessor;
import com.flink.vo.KafkaTopic;
/**
* @author wjs
* @version 创建时间:2025-7-31 11:03:25
* 类说明
*/
public class UserDailyActivityProcessor implements JobProcessor{
@Override
public void process() throws Exception {
new UserDailyActivityAchi().multipleExecuteJob(
createTopicList(),
JobTypeEnum.USER_DAILY_ACTIVITY,
false,
true
);
}
private static List<KafkaTopic> createTopicList() {
return Arrays.stream(new TopicTypeEnum[]{
TopicTypeEnum.ODS_NEW_COLLECT_LOG,
TopicTypeEnum.ODS_EVENT_LOG,
TopicTypeEnum.ODS_PC_EVENT_LOG,
TopicTypeEnum.ODS_PC_COLLECT_LOG,
TopicTypeEnum.SIMI_USER_LIST_TOPIC,
TopicTypeEnum.ABROAD_SIMI_USER_LIST_TOPIC,
TopicTypeEnum.DWD_SYS_LOG
}).map(TopicTypeEnum::createKafkaTopic)
.collect(Collectors.toList());
}
}
package com.flink.util;
/**
* @author wjs
* @version 创建时间:2025-9-15 15:19:18
* 类说明
*/
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import java.io.IOException;
public class StringArrayToStringDeserializer extends JsonDeserializer<String> {
@Override
public String deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
// 如果当前令牌是数组的开始(即数据是 ["..."] 这种形式)
if (p.currentToken() == JsonToken.START_ARRAY) {
// 移动到数组内的第一个元素(即我们的目标字符串)
p.nextToken();
// 读取这个字符串的值
String value = p.getValueAsString();
// 确保消耗掉数组的结束标记
p.nextToken();
return value;
} else if (p.currentToken() == JsonToken.VALUE_STRING) {
// 如果本来就是字符串,直接返回(兼容性处理)
return p.getValueAsString();
} else {
// 如果既不是数组也不是字符串,可以根据需要抛出异常或返回null
return null;
}
}
}
\ No newline at end of file
package com.flink.vo;
import java.io.Serializable;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.flink.util.StringArrayToStringDeserializer;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-9-12 11:46:43
* 类说明
*/
@Data
@ToString
public class CollectUserBehavior implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String id;
private String device_id;
private String app_type;
private String app_key;
private String cid;
private String phone;
private String event_type;
private String event_time;
private String view_type;
private String view_id;
private String uid;
private String nick;
private String unique_id;
private String create_time;
private String strategy_group_id;
private String strategy_version;
private String send_time;
private String app_channel;
private String zone_code;
private String zone_name;
private String zone_type;
private String sdk_version;
private String user_agent;
private String user_properties;
private String route_ip;
private String ip_name;
private String area_name;
private String device_id_v1;
private String other_info;
private String device_info;
private String env_info;
private String network_ip;
private String network_area_name;
private String network_model;
private String phone_name;
private String device_name;
private String brand;
private String device_model;
private String os_release;
private String app_version;
private String platform;
private String third_id;
private String country_code;
private String register_time;
private String user_state;
private String user_head_url;
private String content;
private String screen_name;
private String touch_pressure;
@JsonDeserialize(using = StringArrayToStringDeserializer.class)
private String draw_point;
private String dt;
private String first_time;
private String latest_time;
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-8-1 18:17:36
* 类说明
*/
@Data
@ToString
public class UserDailyActivityOutputLog implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String deviceId;
private String cid;
private String appKey;
private String platform;
private String appType;
private String dt;
private String countryCode;
private String phone;
private String nick;
private String brand;
private String model;
private String osRelease;
private String appVersion;
private String ip;
private String areaName;
private String networkIp;
private String networkAreaName;
private Long firstTime;
private Long latestTime;
private String phoneName;
private String networkModel;
private String deviceName;
private String zoneName;
private String zoneType;
private String zoneCode;
}
package com.flink.vo.userDailyActivity;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.processor.function.GenDeviceIdProcessor;
import com.flink.processor.function.UserPropertiesProcessor;
import com.flink.util.CompareUtils;
import com.flink.util.TimeConvertUtil;
import com.flink.util.ip2region.SearcherUtil;
import com.flink.vo.DeviceIdInfo;
import com.flink.vo.EventList;
import com.flink.vo.OdsCollectLog;
import com.flink.vo.OdsEventLog;
import com.flink.vo.PcCollectLog;
import com.flink.vo.PcDeviceInfo;
import com.flink.vo.PcEventInfo;
import com.flink.vo.PcOdsEventLog;
import com.flink.vo.PcProperties;
import com.flink.vo.UserProperties;
/**
* @author wjs
* @version 创建时间:2025-8-13 17:37:32 类说明 构建日活参数
*/
public class BulidDailyParams {
// 处理APP设备ID数据
public static DailyActivityDeviceInfo handleDeviceData(String value) throws Exception {
OdsCollectLog log = JSON.parseObject(value, OdsCollectLog.class);
if (log == null)
return null;
String deviceId = log.getDevice_id();
String uniqueId = log.getUnique_id();
String appType = log.getApp_type();
String appKey = log.getApp_key();
String other_info = log.getOther_info();
String device_info = log.getDevice_info();
String env_info = log.getEnv_info();
String createTime = log.getCreate_time();
DeviceIdInfo deviceIdInfo = GenDeviceIdProcessor.genDeviceId(appType, appKey, other_info, device_info,
env_info);
UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(log.getUser_properties());
if (null == deviceIdInfo) {
return null;
}
if (StringUtils.isEmpty(deviceIdInfo.getDeviceIdV1())) {
return null;
}
if (null == userProperties) {
return null;
}
if (StringUtils.isEmpty(userProperties.getCid())) {
return null;
}
if (StringUtils.isEmpty(userProperties.getPhone())) {
return null;
}
return new DailyActivityDeviceInfo(deviceId, deviceIdInfo.getDeviceIdV1(), appKey, uniqueId, appType,
createTime.substring(0, 10), deviceIdInfo.getModel(), deviceIdInfo.getBrand(),
deviceIdInfo.getOsRelease(), deviceIdInfo.getAppVersion(),
TimeConvertUtil.convertToTimestamp(createTime), log.getZone_name(), log.getZone_type(),
log.getZone_code(),getPlatformByAppKey(appKey),null,null,null);
}
// 处理PC设备ID数据
public static DailyActivityDeviceInfo handlePcDeviceData(String value) throws Exception {
PcCollectLog log = JSONObject.parseObject(value, new TypeReference<PcCollectLog>() {
});
if (null == log) {
return null;
}
String appType = log.getApp_type();
String appKey = log.getApp_key();
String device_info = log.getDevice_info();
String createTime = log.getCreate_time();
if (StringUtils.isEmpty(device_info)) {
return null;
}
PcDeviceInfo pcDeviceInfo = JSONObject.parseObject(device_info, new TypeReference<PcDeviceInfo>() {
});
if (null == pcDeviceInfo) {
return null;
}
if (StringUtils.isEmpty(pcDeviceInfo.getCid())) {
return null;
}
if (StringUtils.isEmpty(pcDeviceInfo.getPhone())) {
return null;
}
String deviceId = pcDeviceInfo.getI8();
String deviceName = pcDeviceInfo.getB2() + "-" + pcDeviceInfo.getB3();
return new DailyActivityDeviceInfo(deviceId, deviceId, appKey, deviceId, appType, createTime.substring(0, 10),
pcDeviceInfo.getB3(), pcDeviceInfo.getB2(), pcDeviceInfo.getB4(), log.getApp_version(),
TimeConvertUtil.convertToTimestamp(createTime), log.getZone_name(), log.getZone_type(),
log.getZone_code(),getPlatformByAppKey(appKey),deviceName,null,null);
}
// 处理APP事件数据
public static DailyActivityEventInfo handleEventData(OdsEventLog odsEventLog, EventList event) throws Exception {
String deviceId = odsEventLog.getDevice_id();
String uniqueId = odsEventLog.getUnique_id();
String appType = odsEventLog.getApp_type();
String appKey = odsEventLog.getApp_key();
String createTime = odsEventLog.getCreate_time();
String routeIp = odsEventLog.getRoute_ip();
String userProperties = odsEventLog.getUser_properties();
if (StringUtils.isEmpty(appKey) || StringUtils.equals(appKey, "C7jias27jias2")) {
appKey = "8ooOvXJo276";
}
String cid = null;
String phone = null;
String nick = null;
if (StringUtils.isNotEmpty(userProperties)) {
List<UserProperties> userPropertiesList = JSONObject.parseObject(userProperties,
new TypeReference<List<UserProperties>>() {
});
if (userPropertiesList != null && userPropertiesList.size() > 0) {
for (UserProperties user : userPropertiesList) {
if (StringUtils.isNotEmpty(user.getCid())) {
cid = user.getCid();
} else if (StringUtils.isNotEmpty(user.getPhone())) {
phone = user.getPhone();
} else if (StringUtils.isNotEmpty(user.getId())) {
cid = user.getId();
} else if (StringUtils.isNotEmpty(user.getNick())) {
nick = user.getNick();
} else if (StringUtils.isNotEmpty(user.getEmail())) {
nick = user.getEmail();
}
}
}
}
List<String> ips = SearcherUtil.convertStringToList(routeIp);
if (CollectionUtils.isEmpty(ips)) {
return null;
}
String ip_name = null;
String area_name = null;
for (String ip : ips) {
if (!SearcherUtil.ipv6(ip)) {
area_name = SearcherUtil.getCityInfoByFile(ip);
if (!CompareUtils.stringExists(area_name, "0|0|0|内网IP|内网IP",
"0|0|0|内网IP|Finance-and-Promoting-Technology")) {
ip_name = ip;
break;
} else {
ip_name = null;
area_name = null;
}
}
}
if (StringUtils.isEmpty(ip_name)) {
return null;
}
return new DailyActivityEventInfo(deviceId, uniqueId, cid, phone, nick, ip_name, area_name,
TimeConvertUtil.parseToStringSSS(event.getR9()), TimeConvertUtil.convertToTimestamp(createTime), appKey,
appType, createTime, createTime.substring(0, 10),getPlatformByAppKey(appKey));
}
// 处理PC事件数据
public static DailyActivityEventInfo handlePcEventData(PcOdsEventLog pcOdsEventLog, PcEventInfo pcEventInfo,
PcProperties pcProperties) throws Exception {
String appKey = pcOdsEventLog.getApp_key();
String appType = pcOdsEventLog.getApp_type();
String createTime = pcOdsEventLog.getCreate_time();
String cid = pcEventInfo.getCid();
String phone = pcEventInfo.getPhone();
String nick = pcEventInfo.getNick();
if (StringUtils.isEmpty(cid)) {
return null;
}
if (StringUtils.isEmpty(phone)) {
return null;
}
String routeIp = pcEventInfo.getS1();
if (StringUtils.isEmpty(routeIp)) {
return null;
}
List<String> ips = SearcherUtil.convertStringToList(routeIp);
if (CollectionUtils.isEmpty(ips)) {
return null;
}
String ip_name = null;
String area_name = null;
for (String ip : ips) {
if (!SearcherUtil.ipv6(ip)) {
area_name = SearcherUtil.getCityInfoByFile(ip);
if (!CompareUtils.stringExists(area_name, "0|0|0|内网IP|内网IP",
"0|0|0|内网IP|Finance-and-Promoting-Technology", "Request timed out.", "*")) {
ip_name = ip;
break;
} else {
ip_name = null;
area_name = null;
}
}
}
if (StringUtils.isEmpty(ip_name)) {
return null;
}
return new DailyActivityEventInfo(pcEventInfo.getI8(), pcEventInfo.getI8(), cid, phone, nick, ip_name,
area_name, TimeConvertUtil.parseToStringSSS(pcProperties.getR9()),
TimeConvertUtil.convertToTimestamp(createTime), appKey, appType, createTime,
createTime.substring(0, 10),getPlatformByAppKey(appKey));
}
private static String getPlatformByAppKey(String appKey) {
switch (appKey) {
// 无链平台
case "8ooOvXJo276":
return "无链安卓国内版";
case "9JQ3A7GA420":
return "无链IOS海外版";
// 私米平台
case "ptyzTPaV207":
return "私米安卓国内版";
case "giHQ1YLp925":
return "私米IOS国内版";
case "lOxLJYzx658":
return "私米安卓海外版";
case "lcALJYzx932":
return "私米IOS海外版";
// pc 国内版
case "pc1KPjmh951":
return "Win国内版";
case "pcrIjvC5805":
return "Linux国内版";
case "pcUXtmMh356":
return "MacIntel国内版";
case "pcrPGB1z531":
return "MacArm国内版";
// pc 海外版
case "pcRIhwh1380":
return "Win海外版";
case "pcQmdNl0952":
return "Linux海外版";
case "pc1etTC6207":
return "MacIntel海外版";
case "pcd9Sa8T989":
return "MacArm海外版";
default:
return "未知平台";
}
}
}
package com.flink.vo.userDailyActivity;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-8-1 16:17:44 类说明
*/
@Data
@ToString
public class DailyActivityCombinedLog implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private String deviceIdV1;
private String appKey;
private String appType;
private String dt;
private String model;
private String brand;
private String osRelease;
private String appVersion;
private String cid;
private String phone;
private String nick;
private String ip;
private String areaName;
private long waterMarkTime;
private String zoneName;
private String zoneType;
private String zoneCode;
private long firstTime;
private long latestTime;
private String platform;
private String deviceName;
private String countryCode;
private String network_ip;
private String network_area_name;
public DailyActivityCombinedLog(String deviceIdV1, String appKey, String appType, String dt, String model,
String brand, String osRelease, String appVersion, String cid, String phone, String nick, String ip,
String areaName, long waterMarkTime, String zoneName, String zoneType, String zoneCode, long firstTime,
long latestTime, String deviceName, String platform,String countryCode,String network_ip,String network_area_name) {
super();
this.deviceIdV1 = deviceIdV1;
this.appKey = appKey;
this.appType = appType;
this.dt = dt;
this.model = model;
this.brand = brand;
this.osRelease = osRelease;
this.appVersion = appVersion;
this.cid = cid;
this.phone = phone;
this.nick = nick;
this.ip = ip;
this.areaName = areaName;
this.waterMarkTime = waterMarkTime;
this.zoneName = zoneName;
this.zoneType = zoneType;
this.zoneCode = zoneCode;
this.firstTime = firstTime;
this.latestTime = latestTime;
this.deviceName = deviceName;
this.platform = platform;
this.countryCode = countryCode;
this.network_ip = network_ip;
this.network_area_name = network_area_name;
}
}
package com.flink.vo.userDailyActivity;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-8-13 17:19:11
* 类说明 日活设备信息
*/
@Data
@ToString
public class DailyActivityDeviceInfo implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String deviceId;
private String deviceIdV1;
private String appKey;
private String uniqueId;
private String appType;
private String dt;
private String model;
private String brand;
private String osRelease;
private String appVersion;
private String zoneName;
private String zoneType;
private String zoneCode;
private Long waterMarkTime;
private String platform;
private String deviceName;
private String network_ip;
private String network_area_name;
public DailyActivityDeviceInfo(
String deviceId,
String deviceIdV1,
String appKey,
String uniqueId,
String appType,
String dt,
String model,
String brand,
String osRelease,
String appVersion,
Long waterMarkTime,
String zoneName,
String zoneType,
String zoneCode,
String platform,
String deviceName,
String network_ip,
String network_area_name
) {
this.deviceId = deviceId;
this.deviceIdV1 = deviceIdV1;
this.appKey = appKey;
this.uniqueId = uniqueId;
this.appType = appType;
this.dt = dt;
this.model = model;
this.brand = brand;
this.osRelease = osRelease;
this.appVersion = appVersion;
this.waterMarkTime = waterMarkTime;
this.zoneName = zoneName;
this.zoneType = zoneType;
this.zoneCode = zoneCode;
this.platform = platform;
this.deviceName = deviceName;
this.network_ip = network_ip;
this.network_area_name = network_area_name;
}
}
package com.flink.vo.userDailyActivity;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-8-1 16:24:00
* 类说明
*/
@Data
@ToString(callSuper = true)
public class DailyActivityEnrichedLog extends DailyActivityCombinedLog implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String phoneName;
private String networkModel;
/**
* 核心构造方法:基于原始日志对象和维度数据创建增强日志
* @param baseLog 原始日志对象
* @param phoneName 品牌手机名称(可空)
* @param networkModel 入网型号(可空)
*/
public DailyActivityEnrichedLog(DailyActivityCombinedLog baseLog, String phoneName, String networkModel) {
// 调用父类构造方法初始化基础字段
super(
baseLog.getDeviceIdV1(), baseLog.getAppKey(), baseLog.getAppType(),
baseLog.getDt(), baseLog.getModel(), baseLog.getBrand(),
baseLog.getOsRelease(), baseLog.getAppVersion(), baseLog.getCid(),
baseLog.getPhone(), baseLog.getNick(), baseLog.getIp(),
baseLog.getAreaName(), baseLog.getWaterMarkTime(),
baseLog.getZoneName(), baseLog.getZoneType(), baseLog.getZoneCode(),
baseLog.getFirstTime(), baseLog.getLatestTime(),baseLog.getDeviceName(), baseLog.getPlatform(), baseLog.getCountryCode(),
baseLog.getNetwork_ip(),baseLog.getNetwork_area_name()
);
this.phoneName = phoneName;
this.networkModel = networkModel;
}
}
package com.flink.vo.userDailyActivity;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-8-13 17:40:32
* 类说明
*/
@Data
@ToString
public class DailyActivityEventInfo implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String deviceId;
private String uniqueId;
private String cid;
private String phone;
private String nick;
private String ip;
private String areaName;
private String eventTime;
private Long waterMarkTime;
private String appKey;
private String appType;
private String createTime;
private String dt;
private String platform;
public DailyActivityEventInfo(String deviceId,String uniqueId, String cid, String phone, String nick, String ip, String areaName, String eventTime,Long waterMarkTime,
String appKey, String appType, String createTime,String dt,String platform) {
this.deviceId = deviceId;
this.uniqueId = uniqueId;
this.cid = cid;
this.phone = phone;
this.nick = nick;
this.ip = ip;
this.areaName = areaName;
this.eventTime = eventTime;
this.waterMarkTime = waterMarkTime;
this.appKey = appKey;
this.appType = appType;
this.createTime = createTime;
this.dt = dt;
this.platform = platform;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment