Commit 44507015 by 魏建枢

日活,事件曝光,质押代码提交

parent 2ac7a1b0
Showing with 1135 additions and 683 deletions
......@@ -71,22 +71,32 @@ public class EventLogAchi implements Serializable {
private static final String[] EVENT_ERROR_FIELDS = { "id", "dt", "event_time", "create_time", "app_key", "app_type",
"cid", "phone", "nick", "event", "data", "startTime", "timeDifference", "endTime", "userId",
DORIS_DELETE_SIGN };
private static final DataType[] EVENT_ERROR_TYPES = { DataTypes.STRING(), DataTypes.DATE(), DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(3), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.TIMESTAMP(3), DataTypes.STRING(),
DataTypes.TIMESTAMP(3), DataTypes.STRING(), DataTypes.INT() };
// 事件曝光表配置
private static final String[] EVENT_EXPOSURE_FIELDS = { "cid", "phone", "exposure_type", "time", "event_type",
"article_id","nick", "create_time", "send_time",DORIS_DELETE_SIGN };
private static final DataType[] EVENT_EXPOSURE_TYPES = { DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.TIMESTAMP(3), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT() };
public static void eventLog(DataStreamSource<String> dataStreamSource) {
// 初始化表配置
TableConfig eventConfig = new TableConfig(EVENT_FIELDS, EVENT_TYPES, "bi.event_log");
TableConfig eventErrorConfig = new TableConfig(EVENT_ERROR_FIELDS, EVENT_ERROR_TYPES, "bi.event_log_error");
TableConfig eventExposureConfig = new TableConfig(EVENT_EXPOSURE_FIELDS, EVENT_EXPOSURE_TYPES, "ai.event_exposure");
// 创建Doris Sink
DorisSink<RowData> dorisEventSink = DorisConnector.sinkDoris(eventConfig.getFields(), eventConfig.getTypes(),
eventConfig.getTableName());
DorisSink<RowData> dorisEventErrorSink = DorisConnector.sinkDoris(eventErrorConfig.getFields(),
eventErrorConfig.getTypes(), eventErrorConfig.getTableName());
DorisSink<RowData> dorisEventExposureSink = DorisConnector.sinkDoris(eventExposureConfig.getFields(),
eventExposureConfig.getTypes(), eventExposureConfig.getTableName());
// 处理设备信息采集日志数据
processDataStream(dataStreamSource, "eventLog", eventConfig, dorisEventSink,
......@@ -97,6 +107,11 @@ public class EventLogAchi implements Serializable {
return null;
}, EventLogAchi::mapToEventErrorRow
);
batchExposureProcessDataStream(dataStreamSource, "eventLog", eventExposureConfig, dorisEventExposureSink,
(item, fieldCount) -> {
return null;
}, EventLogAchi::mapToEventExposureRow
);
}
private static void processDataStream(DataStreamSource<String> dataStream, String flumeType,
......@@ -116,6 +131,14 @@ public class EventLogAchi implements Serializable {
.returns(TypeInformation.of(RowData.class)).filter(Objects::nonNull);
processedStream.sinkTo(dorisSink).name("Doris-" + flumeType);
}
private static void batchExposureProcessDataStream(DataStreamSource<String> dataStream, String flumeType,
TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper mapper, EventExposureMapper exposureMapper) {
SingleOutputStreamOperator<RowData> processedStream = dataStream
.flatMap(new ElementExposureProcessor(flumeType, exposureMapper, tableConfig.getFields().length))
.returns(TypeInformation.of(RowData.class)).filter(Objects::nonNull);
processedStream.sinkTo(dorisSink).name("Doris-" + flumeType);
}
/**
* 使用map算子的内部处理类
......@@ -222,6 +245,74 @@ public class EventLogAchi implements Serializable {
return CompareUtils.stringExists(eventType, "socket_event", "socket_error", "socket_time", "refresh_token","all_time");
}
}
private static class ElementExposureProcessor implements FlatMapFunction<String, RowData>, Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private final String flumeType;
private final EventExposureMapper exposureMapper;
private final int fieldCount;
public ElementExposureProcessor(String flumeType, EventExposureMapper exposureMapper, int fieldCount) {
this.flumeType = flumeType;
this.exposureMapper = exposureMapper;
this.fieldCount = fieldCount;
}
@Override
public void flatMap(String value, Collector<RowData> out) throws Exception {
try {
JSONObject jsonObj = JSON.parseObject(value);
if (!flumeType.equals(jsonObj.getString(FLUME_TYPE_FIELD))) {
return;
}
processKafkaMessage(value, out);
} catch (Exception e) {
logger.error("UserInvitationAchi 处理 {} 数据出错 | rawData:{} | error:{}", flumeType, value, e.getMessage(),
e);
}
}
private void processKafkaMessage(String value, Collector<RowData> out) {
try {
OdsEventLog event = JSON.parseObject(value, OdsEventLog.class);
if (event == null)
return;
List<EventList> eventList = parseEventListSafely(event.getEvent_list());
if (CollectionUtils.isEmpty(eventList))
return;
int dt = TimeConvertUtil.convertToSqlDate(event.getCreate_time().substring(0, 10));
UserProperties userProps = UserPropertiesProcessor.userPropertiesToJson(event.getUser_properties());
for (EventList eventInfo : eventList) {
if (isTargetEventType(eventInfo.getR7())) {
out.collect(exposureMapper.map(event, eventInfo, userProps, dt, fieldCount));
}
}
} catch (Exception e) {
logger.error("Kafka消息处理失败 | data:{} | error:{}", value, e.getMessage());
}
}
private static List<EventList> parseEventListSafely(String jsonStr) {
try {
return JSON.parseArray(jsonStr, EventList.class);
} catch (Exception e) {
logger.warn("事件列表解析失败: {}", jsonStr);
return Collections.emptyList();
}
}
// 事件类型过滤
private static boolean isTargetEventType(String eventType) {
return CompareUtils.stringExists(eventType, "enter_act", "exit_act", "show_act");
}
}
private static RowData mapToEventRow(Object item, int fieldCount) {
String value = (String) item;
......@@ -294,6 +385,25 @@ public class EventLogAchi implements Serializable {
row.setField(15, DELETE_SIGN_VALUE);
return row;
}
private static RowData mapToEventExposureRow(OdsEventLog event, EventList eventInfo, UserProperties userProps, int dt,
int fieldCount) {
GenericRowData row = new GenericRowData(fieldCount);
row.setField(0, StringData.fromString(userProps == null ? null : userProps.getCid()));
row.setField(1, StringData.fromString(userProps == null ? null : userProps.getPhone()));
row.setField(2, StringData.fromString(eventInfo.getR7()));
row.setField(3,
TimestampData.fromLocalDateTime(LocalDateTime.parse(TimeConvertUtil.parseToStringSSS(eventInfo.getR9()),
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
Properties r8 = eventInfo.getR8();
row.setField(4, StringData.fromString(r8.getType()));
row.setField(5, StringData.fromString(r8.getId()));
row.setField(6, StringData.fromString(userProps == null ? null : userProps.getNick()));
row.setField(7, StringData.fromString(event.getCreate_time()));
row.setField(8, StringData.fromString(event.getSend_time()));
row.setField(9, DELETE_SIGN_VALUE);
return row;
}
/**
* 行数据映射接口
......@@ -309,4 +419,9 @@ public class EventLogAchi implements Serializable {
private interface EventErrorMapper extends Serializable {
RowData map(OdsEventLog event, EventList eventInfo, UserProperties userProps, int dt, int fieldCount);
}
@FunctionalInterface
private interface EventExposureMapper extends Serializable {
RowData map(OdsEventLog event, EventList eventInfo, UserProperties userProps, int dt, int fieldCount);
}
}
......@@ -26,6 +26,7 @@ import com.flink.config.TableConfig;
import com.flink.vo.RealBalance;
import com.flink.vo.RealKyc;
import com.flink.vo.RealLead;
import com.flink.vo.RealStaking;
import com.flink.vo.RealTransaction;
import com.flink.vo.RealUsers;
......@@ -86,6 +87,14 @@ public class UserInvitationAchi implements Serializable {
private static final DataType[] LEAD_TYPES = { DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.INT() };
//质押表配置
private static final String[] REAL_STAKING_FIELDS = { "tx_index", "tx_hash", "block_height", "block_timestamp", "from_account_id", "to_account_id",
"is_relayer", "amount", "symbol","post_time", DORIS_DELETE_SIGN };
private static final DataType[] REAL_STAKING_TYPES = { DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(),DataTypes.STRING(), DataTypes.INT() };
public static void userInvitation(DataStreamSource<String> dataStreamSource) {
// 初始化表配置
......@@ -94,6 +103,7 @@ public class UserInvitationAchi implements Serializable {
TableConfig keyConfig = new TableConfig(KYC_FIELDS, KYC_TYPES, "bi.real_kyc");
TableConfig balanceConfig = new TableConfig(BALANCE_FIELDS, BALANCE_TYPES, "bi.real_balance");
TableConfig leadConfig = new TableConfig(LEAD_FIELDS, LEAD_TYPES, "bi.real_lead_switch");
TableConfig realStakingConfig = new TableConfig(REAL_STAKING_FIELDS, REAL_STAKING_TYPES, "bi.real_staking");
// 创建Doris Sink
DorisSink<RowData> dorisUsersSink = DorisConnector.sinkDoris(usersConfig.getFields(), usersConfig.getTypes(),
......@@ -106,6 +116,8 @@ public class UserInvitationAchi implements Serializable {
balanceConfig.getTypes(), balanceConfig.getTableName());
DorisSink<RowData> dorisLeadSink = DorisConnector.sinkDoris(leadConfig.getFields(), leadConfig.getTypes(),
leadConfig.getTableName());
DorisSink<RowData> dorisRealStakingSink = DorisConnector.sinkDoris(realStakingConfig.getFields(), realStakingConfig.getTypes(),
realStakingConfig.getTableName());
// 处理用户数据
// processDataStream(dataStreamSource, "realUsers", usersConfig, dorisUsersSink,(RowMapper<RealUsers>) UserInvitationAchi::mapToUsersRow);
......@@ -119,6 +131,8 @@ public class UserInvitationAchi implements Serializable {
processDataStream(dataStreamSource, "realBalance", balanceConfig, dorisBalanceSink,(RowMapper<RealBalance>) UserInvitationAchi::mapToBalanceRow);
//处理真实上级数据
processDataStream(dataStreamSource, "realLead", leadConfig, dorisLeadSink,(RowMapper<RealLead>) UserInvitationAchi::mapToLeadRow);
//处理质押数据
processDataStream(dataStreamSource, "realStaking", realStakingConfig, dorisRealStakingSink,(RowMapper<RealStaking>) UserInvitationAchi::mapToStakingRow);
}
/**
......@@ -183,12 +197,11 @@ public class UserInvitationAchi implements Serializable {
case "realKyc": return RealKyc.class;
case "realBalance": return RealBalance.class;
case "realLead": return RealLead.class;
case "realStaking": return RealStaking.class;
default: throw new IllegalArgumentException("未知类型: " + type);
}
}
}
// 用户数据映射
private static RowData mapToUsersRow(Object item, int fieldCount) {
......@@ -281,6 +294,23 @@ public class UserInvitationAchi implements Serializable {
row.setField(9, DELETE_SIGN_VALUE);
return row;
}
// 质押 数据映射
private static RowData mapToStakingRow(Object item, int fieldCount) {
RealStaking staking = (RealStaking) item;
GenericRowData row = new GenericRowData(fieldCount);
row.setField(0, StringData.fromString(staking.getTx_index()));
row.setField(1, StringData.fromString(staking.getTx_hash()));
row.setField(2, StringData.fromString(staking.getBlock_height()));
row.setField(3, StringData.fromString(staking.getBlock_timestamp()));
row.setField(4, StringData.fromString(staking.getFrom_account_id()));
row.setField(5, StringData.fromString(staking.getTo_account_id()));
row.setField(6, StringData.fromString(staking.getIs_relayer()));
row.setField(7, StringData.fromString(staking.getAmount()));
row.setField(8, StringData.fromString(staking.getSymbol()));
row.setField(9, StringData.fromString(staking.getPost_time()));
row.setField(10, DELETE_SIGN_VALUE);
return row;
}
/**
* 行数据映射接口
......
......@@ -103,7 +103,6 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable
}
public static JSONObject handleData(String record) throws ParseException, Exception {
logger.info("EventIpConvertAchi record:{}",record);
// TODO 数据的 ETL 处理
OdsEventLog odsEventLog = JSONObject.parseObject(record,new TypeReference<OdsEventLog>(){});
String id = odsEventLog.getId();
......@@ -111,7 +110,6 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable
String deviceId = odsEventLog.getDevice_id();
String routeIp = odsEventLog.getRoute_ip();
String userProperties = odsEventLog.getUser_properties();
logger.info("组装数据 body:{}",odsEventLog.toString());
String cid = null;
String phone = null;
String nick = null;
......@@ -156,7 +154,6 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable
if(StringUtils.isEmpty(ip_name)) {
return null;
}
logger.info("组装数据开始");
JSONObject jsonObj = new JSONObject();
jsonObj.put("id", id);
jsonObj.put("ips", ip_name);
......
......@@ -97,8 +97,8 @@ public class RegistrationCheckAchi extends MultipleSourceCommonBase implements S
simiUserInfo.setPhone_number(userRegistrationReqDto.getPhoneNumber());
simiUserInfo.setUpdateTime(TimeConvertUtil.convertToTimestampSSS(userRegistrationReqDto.getTime()));
logger.info(">>>>>>>>>>registerDataStream cid:{},Country_code:{},Phone_number:{},UpdateTime:{}", userRegistrationReqDto.getCid()
,userRegistrationReqDto.getCountryCode(),userRegistrationReqDto.getPhoneNumber(),TimeConvertUtil.convertToTimestampSSS(userRegistrationReqDto.getTime()));
// logger.info(">>>>>>>>>>registerDataStream cid:{},Country_code:{},Phone_number:{},UpdateTime:{}", userRegistrationReqDto.getCid()
// ,userRegistrationReqDto.getCountryCode(),userRegistrationReqDto.getPhoneNumber(),TimeConvertUtil.convertToTimestampSSS(userRegistrationReqDto.getTime()));
out.collect(simiUserInfo);
} catch (Exception e) {
logger.error("Error parsing simi_user_list 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
......@@ -123,7 +123,7 @@ public class RegistrationCheckAchi extends MultipleSourceCommonBase implements S
//解析 Kafka 数据
DeviceId device = DeviceIdLatestAchi.handleData(value);
if (device != null) {
logger.info(">>>>>>>>>>mergedDeviceStream cid:{},CollectTime:{},CreateTime:{}", device.getCid(),device.getCollectTime(),device.getCreateTime());
// logger.info(">>>>>>>>>>mergedDeviceStream cid:{},CollectTime:{},CreateTime:{}", device.getCid(),device.getCollectTime(),device.getCreateTime());
out.collect(device);
}
} catch (Exception e) {
......@@ -141,7 +141,7 @@ public class RegistrationCheckAchi extends MultipleSourceCommonBase implements S
//解析 Kafka 数据
DeviceId device = DeviceIdLatestAchi.handlePcData(value);
if (device != null) {
logger.info(">>>>>>>>>>mergedDeviceStreamPc cid:{},CollectTime:{},CreateTime:{}", device.getCid(),device.getCollectTime(),device.getCreateTime());
// logger.info(">>>>>>>>>>mergedDeviceStreamPc cid:{},CollectTime:{},CreateTime:{}", device.getCid(),device.getCollectTime(),device.getCreateTime());
out.collect(device);
}
} catch (Exception e) {
......@@ -168,7 +168,7 @@ public class RegistrationCheckAchi extends MultipleSourceCommonBase implements S
public void processElement(SimiUserInfo user, DeviceId device, Context ctx,
Collector<Tuple3<String, String, Long>> out) {
// 输出: (设备ID, 用户ID, 注册时间)
logger.info(">>>>>>>>>>deviceUserStream deviceId:{},cid:{},UpdateTime:{}", device.getDeviceId(), user.getCid(), user.getUpdateTime());
// logger.info(">>>>>>>>>>deviceUserStream deviceId:{},cid:{},UpdateTime:{}", device.getDeviceId(), user.getCid(), user.getUpdateTime());
// 确保cid不为空且相等(实际上keyBy已经保证,此处为冗余校验)
if (user.getCid().equals(device.getCid())) {
out.collect(Tuple3.of(device.getDeviceId(), user.getCid(), user.getUpdateTime()));
......
......@@ -583,7 +583,7 @@ public class SimiFriendsAchi extends MultipleSourceCommonBase implements Seriali
.batchSize(3) // 每批2个参数
.build();
List<Map<String, Object>> params = DynamicSqlBuilder.queryDoris(queries);
logger.info(">>>>>>>>>>>params:{}",params);
// logger.info(">>>>>>>>>>>params:{}",params);
return params;
}
......
......@@ -6,12 +6,11 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
......@@ -19,23 +18,24 @@ import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
......@@ -44,32 +44,31 @@ import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.MultipleSourceCommonBase;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.function.GenDeviceIdProcessor;
import com.flink.processor.function.LatestUserProcessFunction;
import com.flink.util.CompareUtils;
import com.flink.util.TimeConvertUtil;
import com.flink.util.ip2region.SearcherUtil;
import com.flink.vo.CombinedLog;
import com.flink.vo.DeviceIdInfo;
import com.flink.vo.DeviceLog;
import com.flink.vo.EnrichedLog;
import com.flink.vo.EventList;
import com.flink.vo.EventLog;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.OdsCollectLog;
import com.flink.vo.OdsEventLog;
import com.flink.vo.PcEventInfo;
import com.flink.vo.PcOdsEventLog;
import com.flink.vo.PcProperties;
import com.flink.vo.SimiUserInfo;
import com.flink.vo.UserDailyActivityOutputLog;
import com.flink.vo.UserProperties;
import com.flink.vo.userDailyActivity.BulidDailyParams;
import com.flink.vo.userDailyActivity.DailyActivityCombinedLog;
import com.flink.vo.userDailyActivity.DailyActivityDeviceInfo;
import com.flink.vo.userDailyActivity.DailyActivityEnrichedLog;
import com.flink.vo.userDailyActivity.DailyActivityEventInfo;
/**
* @author wjs
......@@ -87,9 +86,9 @@ public class UserDailyActivityAchi extends MultipleSourceCommonBase implements S
@Override
public void parseSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
DataStreamSource<String> collectLogStreamSource = null;
DataStreamSource<String> eventLogStreamSource = null;
DataStreamSource<String> eventStreamSource = null;
DataStreamSource<String> pcCollectLogStreamSource = null;
DataStreamSource<String> pcEventLogStreamSource = null;
DataStreamSource<String> pcEventStreamSource = null;
DataStreamSource<String> userStreamSource = null;
DataStreamSource<String> abroadUserStreamSource = null;
if(CollectionUtils.isNotEmpty(dataSourceList)) {
......@@ -98,7 +97,7 @@ public class UserDailyActivityAchi extends MultipleSourceCommonBase implements S
collectLogStreamSource = kafkaDataSource.getDataStreamSource();
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_EVENT_LOG.getTopic())) {
eventLogStreamSource = kafkaDataSource.getDataStreamSource();
eventStreamSource = kafkaDataSource.getDataStreamSource();
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.SIMI_USER_LIST_TOPIC.getTopic())) {
userStreamSource = kafkaDataSource.getDataStreamSource();
......@@ -110,310 +109,129 @@ public class UserDailyActivityAchi extends MultipleSourceCommonBase implements S
pcCollectLogStreamSource = kafkaDataSource.getDataStreamSource();
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_PC_EVENT_LOG.getTopic())) {
pcEventLogStreamSource = kafkaDataSource.getDataStreamSource();
pcEventStreamSource = kafkaDataSource.getDataStreamSource();
}
}
}else {
return;
}
// 1. 设备日志流处理
DataStream<DeviceLog> deviceLogStream = collectLogStreamSource
.flatMap(new FlatMapFunction<String, DeviceLog>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<DeviceLog> out) throws Exception {
try {
// 解析 Kafka 数据
OdsCollectLog log = JSON.parseObject(value, new TypeReference<OdsCollectLog>() {});
if (null == log) {
return;
}
String deviceId = log.getDevice_id();
String uniqueId = log.getUnique_id();
String appType = log.getApp_type();
String appKey = log.getApp_key();
String other_info = log.getOther_info();
String device_info = log.getDevice_info();
String env_info = log.getEnv_info();
String createTime = log.getCreate_time();
String sendTime = log.getSend_time();
DeviceIdInfo deviceIdInfo = GenDeviceIdProcessor.genDeviceId(appType, appKey, other_info, device_info,env_info);
DeviceLog deviceLog = new DeviceLog(
deviceId,
deviceIdInfo.getDeviceIdV1(),
deviceIdInfo.getAppKey(),
uniqueId,
appType,
createTime.substring(0, 10),
deviceIdInfo.getModel(),
deviceIdInfo.getBrand(),
deviceIdInfo.getOsRelease(),
deviceIdInfo.getAppVersion(),
TimeConvertUtil.convertToTimestamp(sendTime),
log.getZone_name(),
log.getZone_type(),
log.getZone_code()
);
if (deviceLog != null)
out.collect(deviceLog);
} catch (Exception e) {
logger.error("Error parsing ods_new_collect_log 处理 Kafka 消息出错 | data:{} | error:{}", value,
e.getMessage());
}
}
})
.filter(collectLog -> StringUtils.isNoneEmpty(collectLog.getDeviceId(),collectLog.getUniqueId()))
.assignTimestampsAndWatermarks( // 原有水印逻辑
WatermarkStrategy.<DeviceLog>forBoundedOutOfOrderness(Duration.ofMinutes(10))
.withTimestampAssigner((collectLog, ts) -> collectLog.getWaterMarkTime())
)
.keyBy(collectLog -> collectLog.getDeviceId() + "#_#" + collectLog.getUniqueId())
.process(new KeyedProcessFunction<String, DeviceLog, DeviceLog>() {
/**
*
*/
private static final long serialVersionUID = 1L;
private ValueState<DeviceLog> latestDeviceInfoState;
@Override
public void open(Configuration parameters) {
// 1. 状态描述符配置
ValueStateDescriptor<DeviceLog> descriptor = new ValueStateDescriptor<>(
"deviceInfo-state",
DeviceLog.class
);
// 2. TTL配置 (30分钟过期)
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofMinutes(30))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.cleanupIncrementally(1000, true)
.build();
descriptor.enableTimeToLive(ttlConfig);
latestDeviceInfoState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(DeviceLog deviceLog, Context ctx, Collector<DeviceLog> out) throws Exception {
DeviceLog currentLatest = latestDeviceInfoState.value();
// 3. 状态更新逻辑:仅保留最新数据
if (currentLatest == null || deviceLog.getWaterMarkTime() > currentLatest.getWaterMarkTime()) {
latestDeviceInfoState.update(deviceLog);
out.collect(deviceLog);
}
}
});
//设备信息合并
DataStream<DailyActivityDeviceInfo> mergedDeviceStream = mergedDeviceStream(collectLogStreamSource,pcCollectLogStreamSource);
//事件信息合并
DataStream<DailyActivityEventInfo> mergedEventStream = mergedEventStream(eventStreamSource,pcEventStreamSource);
// 用户数据合并
DataStream<SimiUserInfo> mergedAllUserStream = mergedUserDataStream(abroadUserStreamSource,userStreamSource);
//设备关联事件
DataStream<DailyActivityEnrichedLog> processedStream = deviceJoinEventStream(mergedDeviceStream,mergedEventStream);
//分组去重
DataStream<DailyActivityEnrichedLog> processedGroupDistinctStream = processedGroupDistinctStream(processedStream);
// 2. 事件日志流处理
DataStream<EventLog> eventLogStream = eventLogStreamSource
.flatMap(new FlatMapFunction<String, EventLog>(){
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<EventLog> out) throws Exception {
try {
// 解析 Kafka 数据
OdsEventLog odsEventLog = JSONObject.parseObject(value, new TypeReference<OdsEventLog>() {});
if (null == odsEventLog) {
return;
}
List<EventList> eventList = JSONObject.parseObject(odsEventLog.getEvent_list(), new TypeReference<List<EventList>>() {});
if (CollectionUtils.isEmpty(eventList)) {
return;
}
String deviceId = odsEventLog.getDevice_id();
String uniqueId = odsEventLog.getUnique_id();
String appType = odsEventLog.getApp_type();
String appKey = odsEventLog.getApp_key();
String createTime = odsEventLog.getCreate_time();
String sendTime = odsEventLog.getSend_time();
String routeIp = odsEventLog.getRoute_ip();
String userProperties = odsEventLog.getUser_properties();
if(StringUtils.isEmpty(appKey) || StringUtils.equals(appKey, "C7jias27jias2")) {
appKey = "8ooOvXJo276";
}
String cid = null;
String phone = null;
String nick = null;
if (StringUtils.isNotEmpty(userProperties)) {
List<UserProperties> userPropertiesList = JSONObject.parseObject(userProperties,new TypeReference<List<UserProperties>>() {});
if (userPropertiesList != null && userPropertiesList.size() > 0) {
for (UserProperties user : userPropertiesList) {
if (StringUtils.isNotEmpty(user.getCid())) {
cid = user.getCid();
} else if (StringUtils.isNotEmpty(user.getPhone())) {
phone = user.getPhone();
} else if (StringUtils.isNotEmpty(user.getId())) {
cid = user.getId();
} else if (StringUtils.isNotEmpty(user.getNick())) {
nick = user.getNick();
} else if (StringUtils.isNotEmpty(user.getEmail())) {
nick = user.getEmail();
}
}
}
}
List<String> ips = SearcherUtil.convertStringToList(routeIp);
if (CollectionUtils.isEmpty(ips)) {
return;
}
String ip_name = null;
String area_name = null;
for (String ip : ips) {
if (!SearcherUtil.ipv6(ip)) {
area_name = SearcherUtil.getCityInfoByFile(ip);
if (!CompareUtils.stringExists(area_name, "0|0|0|内网IP|内网IP",
"0|0|0|内网IP|Finance-and-Promoting-Technology")) {
ip_name = ip;
break;
} else {
ip_name = null;
area_name = null;
}
}
}
if (StringUtils.isEmpty(ip_name)) {
return;
}
for(EventList event : eventList) {
EventLog eventLog = new EventLog(
deviceId,
uniqueId,
cid,
phone,
nick,
ip_name,
area_name,
TimeConvertUtil.parseToStringSSS(event.getR9()),
TimeConvertUtil.convertToTimestamp(sendTime),
appKey,
appType,
createTime,
createTime.substring(0, 10)
);
if (eventLog != null) {
out.collect(eventLog);
}
}
} catch (Exception e) {
logger.error("Error parsing ods_event_log 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
})
.filter(eventLog -> StringUtils.isNoneEmpty(eventLog.getDeviceId(),eventLog.getUniqueId()))
.assignTimestampsAndWatermarks( // 原有水印逻辑
WatermarkStrategy.<EventLog>forBoundedOutOfOrderness(Duration.ofMinutes(10))
.withTimestampAssigner((eventLog, ts) -> eventLog.getWaterMarkTime())
)
.keyBy(eventLog -> eventLog.getDeviceId() + "#_#" + eventLog.getUniqueId())
.process(new KeyedProcessFunction<String, EventLog, EventLog>() {
/**
OutputTag<DailyActivityEnrichedLog> wulianTag = new OutputTag<DailyActivityEnrichedLog>("wulian"){private static final long serialVersionUID = 1L;};
OutputTag<DailyActivityEnrichedLog> simiTag = new OutputTag<DailyActivityEnrichedLog>("simi"){private static final long serialVersionUID = 1L;};
SingleOutputStreamOperator<DailyActivityEnrichedLog> splitStream = processedGroupDistinctStream
.process(new ProcessFunction<DailyActivityEnrichedLog, DailyActivityEnrichedLog>() {
/**
*
*/
private static final long serialVersionUID = 1L;
private ValueState<EventLog> latestEventInfoState;
@Override
public void open(Configuration parameters) {
// 1. 状态描述符配置
ValueStateDescriptor<EventLog> descriptor = new ValueStateDescriptor<>(
"deviceInfo-state",
EventLog.class
);
// 2. TTL配置 (30分钟过期)
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofMinutes(30))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.cleanupIncrementally(1000, true)
.build();
descriptor.enableTimeToLive(ttlConfig);
latestEventInfoState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(EventLog deviceLog, Context ctx, Collector<EventLog> out) throws Exception {
EventLog currentLatest = latestEventInfoState.value();
// 3. 状态更新逻辑:仅保留最新数据
if (currentLatest == null || deviceLog.getWaterMarkTime() > currentLatest.getWaterMarkTime()) {
latestEventInfoState.update(deviceLog);
out.collect(deviceLog);
}
}
});
// 3. 国内用户数据流处理(5分钟批量更新)
DataStream<SimiUserInfo> userDataStream = userStreamSource.flatMap(new FlatMapFunction<String, SimiUserInfo>() {
private static final long serialVersionUID = 1L;
public void processElement(DailyActivityEnrichedLog log, Context ctx, Collector<DailyActivityEnrichedLog> out) {
if (isWulianApp(log.getAppKey())) {
ctx.output(wulianTag, log); // 无链业务
} else if (isSimiApp(log.getAppKey())) {
ctx.output(simiTag, log); // 私米业务
}
}
private boolean isWulianApp(String appKey) {
return Arrays.asList("8ooOvXJo276", "9JQ3A7GA420")
.contains(appKey);
}
private boolean isSimiApp(String appKey) {
return Arrays.asList(
"ptyzTPaV207",
"giHQ1YLp925",
"lOxLJYzx658",
"lcALJYzx932",
"pc1KPjmh951",
"pcrIjvC5805",
"pcUXtmMh356",
"pcrPGB1z531",
"pcRIhwh1380",
"pcQmdNl0952",
"pc1etTC6207",
"pcd9Sa8T989"
)
.contains(appKey);
}
});
DataStream<UserDailyActivityOutputLog> wulianStream = processWulian(splitStream.getSideOutput(wulianTag))
.filter(log -> log != null && log instanceof UserDailyActivityOutputLog);
DataStream<UserDailyActivityOutputLog> simiStream = processSimi(splitStream.getSideOutput(simiTag), mergedAllUserStream)
.filter(log -> log != null && log instanceof UserDailyActivityOutputLog);
sinkDoris(wulianStream);
sinkDoris(simiStream);
}
private DataStream<DailyActivityEnrichedLog> processedGroupDistinctStream(DataStream<DailyActivityEnrichedLog> processedStream) {
return processedStream.keyBy(new KeySelector<DailyActivityEnrichedLog, Tuple2<String, String>>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<SimiUserInfo> out) throws Exception {
try {
// 解析 Kafka 数据
SimiUserInfo simiUserInfo = JSONObject.parseObject(value, new TypeReference<SimiUserInfo>() {
});
simiUserInfo.setUpdateTime(TimeConvertUtil.convertToTimestamp(simiUserInfo.getCreate_time()));
out.collect(simiUserInfo);
} catch (Exception e) {
logger.error("Error parsing simi_user_list 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
})
.filter(u -> StringUtils.isNoneEmpty(u.getCid(), u.getPhone_number()))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<SimiUserInfo>forBoundedOutOfOrderness(Duration.ofMinutes(5))
.withTimestampAssigner((user, ts) -> user.getUpdateTime()))
.keyBy(user -> user.getCid() + "#_#" + user.getPhone_number()).process(new LatestUserProcessFunction());
// 3. 海外用户数据流处理(5分钟批量更新)
DataStream<SimiUserInfo> abroadUserDataStream = abroadUserStreamSource.flatMap(new FlatMapFunction<String, SimiUserInfo>() {
@Override
public Tuple2<String, String> getKey(DailyActivityEnrichedLog info) {
return Tuple2.of(info.getDeviceIdV1(), info.getDt());
}
})
.process(new KeyedProcessFunction<Tuple2<String, String>, DailyActivityEnrichedLog, DailyActivityEnrichedLog>() {
/**
*
*/
private static final long serialVersionUID = 1L;
private ValueState<DailyActivityEnrichedLog> bestState;
@Override
public void flatMap(String value, Collector<SimiUserInfo> out) throws Exception {
try {
// 解析 Kafka 数据
SimiUserInfo simiUserInfo = JSONObject.parseObject(value, new TypeReference<SimiUserInfo>() {
});
simiUserInfo.setUpdateTime(TimeConvertUtil.convertToTimestamp(simiUserInfo.getCreate_time()));
out.collect(simiUserInfo);
} catch (Exception e) {
logger.error("Error parsing abroad_simi_user_list 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
public void open(Configuration params) {
ValueStateDescriptor<DailyActivityEnrichedLog> descriptor = new ValueStateDescriptor<>("bestInfo", DailyActivityEnrichedLog.class);
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofMinutes(30)).build();
descriptor.enableTimeToLive(ttlConfig);
bestState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(DailyActivityEnrichedLog value,Context ctx, Collector<DailyActivityEnrichedLog> out) throws Exception {
DailyActivityEnrichedLog current = bestState.value();
if (current == null || isBetter(value, current)) {
bestState.update(value);
out.collect(value);
}
}
})
.filter(u -> StringUtils.isNoneEmpty(u.getCid(), u.getPhone_number()))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<SimiUserInfo>forBoundedOutOfOrderness(Duration.ofMinutes(5))
.withTimestampAssigner((user, ts) -> user.getUpdateTime()))
.keyBy(user -> user.getCid() + "#_#" + user.getPhone_number()).process(new LatestUserProcessFunction());
// 4. 关联设备日志和事件日志
DataStream<CombinedLog> combinedLogStream = joinDeviceAndEventLogs(deviceLogStream, eventLogStream);
// 5. 关联品牌手机维度表
DataStream<EnrichedLog> enrichedLogStream = enrichWithPhoneModel(combinedLogStream);
DataStream<EnrichedLog> timeEnrichedStream = enrichedLogStream
.keyBy(log -> log.getDeviceIdV1())
.process(new DeviceTimeCalculator());
// 6. 按设备ID和日期分组,取最优记录
DataStream<EnrichedLog> rankedLogStream = rankAndFilter(timeEnrichedStream);
// 8. 关联用户信息
sinkDoris(joinUserInfo(rankedLogStream, userDataStream));
sinkDoris(joinUserInfo(rankedLogStream, abroadUserDataStream ));
// DataStream<UserDailyActivityOutputLog> outputAppSimiStream = joinUserInfo(rankedLogStream, userDataStream);
// DataStream<UserDailyActivityOutputLog> outputAppAbroadSimiStream = joinUserInfo(rankedLogStream, abroadUserDataStream );
private boolean isBetter(DailyActivityEnrichedLog newInfo, DailyActivityEnrichedLog current) {
boolean newValid = isValidIp(newInfo.getIp());
boolean currentValid = isValidIp(current.getIp());
if (newValid && !currentValid) return true;
if (newValid == currentValid) {
return newInfo.getWaterMarkTime() > current.getWaterMarkTime();
}
return false;
}
private boolean isValidIp(String ip) {
return StringUtils.isNotEmpty(ip) &&
!ip.equals("[]") &&
!ip.equals("null");
}
});
}
public void sinkDoris(DataStream<UserDailyActivityOutputLog> outputStream) {
//================= 配置入库字段 =================
String[] fields = {
......@@ -477,7 +295,6 @@ public class UserDailyActivityAchi extends MultipleSourceCommonBase implements S
row.setField(23, StringData.fromString(log.getZoneType()));
row.setField(24, StringData.fromString(log.getZoneCode()));
row.setField(25, 0); // 删除标记(0=正常数据)
logger.info(">>>>>>>>>>>>>sinkDoris row:{}",row.toString());
return (RowData)row;
} catch (Exception e) {
logger.error("数据映射失败: device_id={}, error={}",
......@@ -485,78 +302,197 @@ public class UserDailyActivityAchi extends MultipleSourceCommonBase implements S
return null;
}
})
.filter(Objects::nonNull)
.filter(obj -> obj != null)
.sinkTo(dorisSink)
.name("Doris-DailyActivitySink");
}
/**
* 关联设备日志和事件日志
*/
private DataStream<CombinedLog> joinDeviceAndEventLogs(
DataStream<DeviceLog> deviceLogStream,
DataStream<EventLog> eventLogStream) {
return deviceLogStream
.join(eventLogStream)
.where(device -> device.getDeviceId() + ":" + device.getDt())
.equalTo(event -> event.getDeviceId() + ":" + event.getDt())
.window(TumblingEventTimeWindows.of(Duration.ofDays(1)))
.apply(new JoinFunction<DeviceLog, EventLog, CombinedLog>() {
//私米业务处理(需要关联用户)
private static DataStream<UserDailyActivityOutputLog> processSimi(
DataStream<DailyActivityEnrichedLog> simiStream,
DataStream<SimiUserInfo> userStream) {
return simiStream.coGroup(userStream)
.where(log -> log.getCid() + "|" + log.getPhone())
.equalTo(user -> user.getCid() + "|" + user.getPhone_number())
.window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
.apply(new CoGroupFunction<DailyActivityEnrichedLog, SimiUserInfo, UserDailyActivityOutputLog>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public CombinedLog join(DeviceLog device, EventLog event) {
long firstTime =0L;
long latestTime =0L;
logger.info(">>>>>>>>joinDeviceAndEventLogs device:{},event:{}",device.toString(),event.toString());
return new CombinedLog(
device.getDeviceIdV1(),
device.getAppKey(),
device.getAppType(),
device.getDt(),
device.getModel(),
device.getBrand(),
device.getOsRelease(),
device.getAppVersion(),
event.getCid(),
event.getPhone(),
event.getNick(),
event.getIp(),
event.getAreaName(),
device.getWaterMarkTime(),
device.getZoneName(),
device.getZoneType(),
device.getZoneCode(),
firstTime,
latestTime
);
public void coGroup(Iterable<DailyActivityEnrichedLog> logs, Iterable<SimiUserInfo> users,
Collector<UserDailyActivityOutputLog> out) {
// 获取用户信息(取第一个匹配项)
String countryCode = null;
for (SimiUserInfo user : users) {
countryCode = user.getCountry_code();
break;
}
// 处理日志
for (DailyActivityEnrichedLog log : logs) {
UserDailyActivityOutputLog output = new UserDailyActivityOutputLog();
// 基础字段映射
output.setDeviceId(log.getDeviceIdV1());
output.setCid(log.getCid());
output.setAppKey(log.getAppKey());
output.setPlatform(log.getPlatform());
output.setAppType(log.getAppType());
output.setDt(log.getDt());
output.setCountryCode(countryCode);
output.setPhone(log.getPhone());
output.setNick(log.getNick());
output.setBrand(log.getBrand());
output.setModel(log.getModel());
output.setOsRelease(log.getOsRelease());
output.setAppVersion(log.getAppVersion());
output.setIp(log.getIp());
output.setAreaName(log.getAreaName());
output.setFirstTime(log.getFirstTime());
output.setLatestTime(log.getLatestTime());
output.setPhoneName(log.getPhoneName());
output.setNetworkModel(log.getNetworkModel());
output.setDeviceName(log.getBrand() + "-" + log.getPhoneName());
output.setZoneName(log.getZoneName());
output.setZoneType(log.getZoneType());
output.setZoneCode(log.getZoneCode());
out.collect(output);
}
}
})
.assignTimestampsAndWatermarks(
WatermarkStrategy.<CombinedLog>forBoundedOutOfOrderness(Duration.ofMinutes(5))
.withTimestampAssigner((event, timestamp) -> event.getWaterMarkTime()));
});
}
//无链业务处理(无需关联用户)
private static DataStream<UserDailyActivityOutputLog> processWulian(DataStream<DailyActivityEnrichedLog> wulianStream) {
return wulianStream.map(log -> {
// 电话号码拆分
UserDailyActivityOutputLog output = new UserDailyActivityOutputLog();
// 基础字段映射
output.setDeviceId(log.getDeviceIdV1());
output.setCid(log.getCid());
output.setAppKey(log.getAppKey());
output.setPlatform(log.getPlatform());
output.setAppType(log.getAppType());
output.setDt(log.getDt());
output.setCountryCode(log.getCountryCode());
output.setPhone(log.getPhone());
output.setNick(log.getNick());
output.setBrand(log.getBrand());
output.setModel(log.getModel());
output.setOsRelease(log.getOsRelease());
output.setAppVersion(log.getAppVersion());
output.setIp(log.getIp());
output.setAreaName(log.getAreaName());
output.setFirstTime(log.getFirstTime());
output.setLatestTime(log.getLatestTime());
output.setPhoneName(log.getPhoneName());
output.setNetworkModel(log.getNetworkModel());
output.setDeviceName(log.getBrand() + "-" + log.getPhoneName());
output.setZoneName(log.getZoneName());
output.setZoneType(log.getZoneType());
output.setZoneCode(log.getZoneCode());
return output;
});
}
/**
* 关联品牌手机维度表
*/
public DataStream<EnrichedLog> enrichWithPhoneModel(DataStream<CombinedLog> combinedLogStream) {
return AsyncDataStream.orderedWait(
combinedLogStream,
//设备信息关联事件信息通过deviceId+uniqueId
private DataStream<DailyActivityEnrichedLog> deviceJoinEventStream(DataStream<DailyActivityDeviceInfo> deviceStream,
DataStream<DailyActivityEventInfo> eventStream) {
DataStream<DailyActivityCombinedLog> combinedStream = deviceStream
.keyBy(dev -> dev.getDeviceId() + "#_#" + dev.getUniqueId())
.intervalJoin(eventStream.keyBy(evt -> evt.getDeviceId() + "#_#" + evt.getUniqueId()))
.between(Duration.ofMinutes(-10), Duration.ofMinutes(5))
.process(new ProcessJoinFunction<DailyActivityDeviceInfo, DailyActivityEventInfo, DailyActivityCombinedLog>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void processElement(DailyActivityDeviceInfo dev, DailyActivityEventInfo evt, Context ctx, Collector<DailyActivityCombinedLog> out) {
String appKey = dev.getAppKey();
String countryCode = null;
String phone = null;
if(CompareUtils.stringExists(appKey,
"8ooOvXJo276",
"9JQ3A7GA420")) {
String wulianPhone = evt.getPhone();
if (StringUtils.isNotEmpty(wulianPhone) && wulianPhone.contains(" ")) {
String[] parts = wulianPhone.split(" ", 2);
countryCode = parts[0];
phone = parts[1];
}
}else {
phone = evt.getPhone();
}
logger.info(">>>>>>>>>>>>>>>>>>> deviceJoinEventStream dev:{},evt:{}",dev,evt);
out.collect(new DailyActivityCombinedLog(
dev.getDeviceIdV1(), appKey, dev.getAppType(), dev.getDt(),
dev.getModel(), dev.getBrand(), dev.getOsRelease(), dev.getAppVersion(),
evt.getCid(), phone, evt.getNick(), evt.getIp(), evt.getAreaName(),
dev.getWaterMarkTime(), dev.getZoneName(), dev.getZoneType(), dev.getZoneCode(),
0L, 0L,dev.getDeviceName(),dev.getPlatform(),countryCode
));
}
});
// 2. 关联维度表
DataStream<DailyActivityEnrichedLog> enrichedStream = AsyncDataStream.orderedWait(
combinedStream,
new DatabaseAsyncFunction(),
30, // 超时时间30秒
TimeUnit.SECONDS,
100 // 最大并行请求数
10,
TimeUnit.SECONDS,
100
);
}
// 异步JDBC查询实现
private static class DatabaseAsyncFunction extends RichAsyncFunction<CombinedLog, EnrichedLog> {
//3. 计算设备时间
return enrichedStream
.keyBy(log -> log.getDeviceIdV1())
.process(new KeyedProcessFunction<String, DailyActivityEnrichedLog, DailyActivityEnrichedLog>() {
/**
*
*/
private static final long serialVersionUID = 1L;
private ValueState<Tuple2<Long, Long>> timeState;
@Override
public void open(Configuration params) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>("deviceTimes", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofMinutes(10))
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
.cleanupInRocksdbCompactFilter(1000)
.build();
descriptor.enableTimeToLive(ttlConfig);
timeState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(DailyActivityEnrichedLog log, Context ctx, Collector<DailyActivityEnrichedLog> out) throws Exception {
Tuple2<Long, Long> times = timeState.value();
long eventTime = log.getWaterMarkTime();
if (times == null) {
times = Tuple2.of(eventTime, eventTime);
} else {
times = Tuple2.of(
Math.min(times.f0, eventTime),
Math.max(times.f1, eventTime)
);
}
timeState.update(times);
log.setFirstTime(times.f0);
log.setLatestTime(times.f1);
out.collect(log);
}
});
}
// 异步JDBC查询实现
private static class DatabaseAsyncFunction extends RichAsyncFunction<DailyActivityCombinedLog, DailyActivityEnrichedLog> {
/**
*
......@@ -583,13 +519,11 @@ public class UserDailyActivityAchi extends MultipleSourceCommonBase implements S
}
@Override
public void asyncInvoke(CombinedLog input, ResultFuture<EnrichedLog> resultFuture) {
public void asyncInvoke(DailyActivityCombinedLog input, ResultFuture<DailyActivityEnrichedLog> resultFuture) {
// 3. 先尝试从缓存获取
PhoneModel cachedModel = cache.getIfPresent(input.getModel());
if (cachedModel != null) {
resultFuture.complete(Collections.singleton(
new EnrichedLog(input, cachedModel.phoneName, cachedModel.networkModel)
));
resultFuture.complete(Collections.singleton(new DailyActivityEnrichedLog(input, cachedModel.phoneName, cachedModel.networkModel)));
return;
}
......@@ -601,11 +535,8 @@ public class UserDailyActivityAchi extends MultipleSourceCommonBase implements S
"SELECT phone_name, 入网型号 AS network_model FROM brand_phone WHERE 入网型号 = ?")) {
stmt.setString(1, input.getModel());
stmt.setString(2, input.getModel());
ResultSet rs = stmt.executeQuery();
if (rs.next()) {
logger.info(">>>>>enrichWithPhoneModel phone_name:{},network_model:{}",rs.getString("phone_name"),rs.getString("network_model"));
PhoneModel model = new PhoneModel(
rs.getString("phone_name"),
rs.getString("network_model")
......@@ -623,7 +554,7 @@ public class UserDailyActivityAchi extends MultipleSourceCommonBase implements S
}).thenAccept(model -> {
// 7. 组装结果
resultFuture.complete(Collections.singleton(
new EnrichedLog(input, model.phoneName, model.networkModel)
new DailyActivityEnrichedLog(input, model.phoneName, model.networkModel)
));
});
}
......@@ -648,269 +579,149 @@ public class UserDailyActivityAchi extends MultipleSourceCommonBase implements S
}
}
}
// private DataStream<EnrichedLog> enrichWithPhoneModel(DataStream<CombinedLog> combinedLogStream) {
// return AsyncDataStream.orderedWait(
// combinedLogStream,
// new RichAsyncFunction<CombinedLog, EnrichedLog>() {
// /**
// *
// */
// private static final long serialVersionUID = 1L;
// private transient Connection connection;
//
// @Override
// public void open(Configuration parameters) throws Exception {
// Class.forName("com.mysql.cj.jdbc.Driver");
// connection = DriverManager.getConnection(
// "jdbc:mysql://10.0.1.213:3306/spider?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true",
// "bigdata", "Im#Social&20181*29#im");
// }
//
// @Override
// public void asyncInvoke(CombinedLog input, ResultFuture<EnrichedLog> resultFuture) {
// CompletableFuture.supplyAsync((Supplier<Void>) () -> {
// try (PreparedStatement stmt = connection.prepareStatement(
// "SELECT phone_name, 入网型号 AS network_model FROM brand_phone WHERE 入网型号 = ?")) {
//
// stmt.setString(1, input.getModel());
// ResultSet rs = stmt.executeQuery();
//
// if (rs.next()) {
// logger.info(">>>>>enrichWithPhoneModel phone_name:{},network_model:{}",rs.getString("phone_name"),rs.getString("network_model"));
// resultFuture.complete(Collections.singleton(
// new EnrichedLog(input,
// rs.getString("phone_name"),
// rs.getString("network_model"))
// ));
// } else {
// resultFuture.complete(Collections.singleton(
// new EnrichedLog(input, null, null)
// ));
// }
// } catch (Exception e) {
// resultFuture.completeExceptionally(e);
//// resultFuture.complete(Collections.singleton(
//// new EnrichedLog(input, null, null)
//// ));
// }
// return null;
// });
// }
//
// @Override
// public void close() {
// if (connection != null) {
// try { connection.close(); }
// catch (Exception e) { logger.error("MySQL connection close error", e); }
// }
// }
// },
// 10, // 超时时间5秒
// TimeUnit.SECONDS,
// 50 // 最大并行请求数
// );
// }
/**
* 按设备ID和日期分组,取最优记录
*/
private DataStream<EnrichedLog> rankAndFilter(DataStream<EnrichedLog> enrichedLogStream) {
return enrichedLogStream
.keyBy(log -> log.getDeviceIdV1() + ":" + log.getDt())
.process(new KeyedProcessFunction<String, EnrichedLog, EnrichedLog>() {
/**
*
*/
private static final long serialVersionUID = 1L;
// 状态1:存储每个设备的最优记录(IP+时间策略)
private MapState<String, EnrichedLog> bestRecordState;
// 状态2:存储设备的首次/最新时间(跨日期持续更新)
private MapState<String, Tuple2<Long, Long>> deviceTimeState;
@Override
public void open(Configuration parameters) {
// 初始化最优记录状态
MapStateDescriptor<String, EnrichedLog> bestRecordDesc =
new MapStateDescriptor<>(
"bestRecords",
TypeInformation.of(String.class),
TypeInformation.of(EnrichedLog.class)
);
StateTtlConfig ttl = StateTtlConfig.newBuilder(Duration.ofDays(2)).build();
bestRecordDesc.enableTimeToLive(ttl);
bestRecordState = getRuntimeContext().getMapState(bestRecordDesc);
// 初始化时间状态(永久存储设备生命周期时间)
MapStateDescriptor<String, Tuple2<Long, Long>> timeDesc =
new MapStateDescriptor<>(
"deviceTimeState",
TypeInformation.of(String.class),
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})
);
deviceTimeState = getRuntimeContext().getMapState(timeDesc);
}
@Override
public void processElement(EnrichedLog log, Context ctx, Collector<EnrichedLog> out) throws Exception {
String key = log.getDeviceIdV1() + ":" + log.getDt();
String deviceKey = log.getDeviceIdV1();
//设备信息合并
private DataStream<DailyActivityDeviceInfo> mergedDeviceStream(DataStreamSource<String> collectLogStreamSource,
DataStreamSource<String> pcCollectLogStreamSource) {
return collectLogStreamSource.flatMap(new FlatMapFunction<String, DailyActivityDeviceInfo>() {
private static final long serialVersionUID = 1L;
// 更新首次/最新时间
Tuple2<Long, Long> times = deviceTimeState.get(deviceKey);
if (times == null) {
times = Tuple2.of(log.getWaterMarkTime(), log.getWaterMarkTime());
} else {
times = Tuple2.of(
Math.min(times.f0, log.getWaterMarkTime()),
Math.max(times.f1, log.getWaterMarkTime())
);
}
deviceTimeState.put(deviceKey, times); // 更新设备时间状态
log.setFirstTime(times.f0); // 注入首次时间
log.setLatestTime(times.f1); // 注入最新时间
@Override
public void flatMap(String value, Collector<DailyActivityDeviceInfo> out) throws Exception {
try {
// 解析 Kafka 数据
DailyActivityDeviceInfo device = BulidDailyParams.handleDeviceData(value);
if (device != null)
out.collect(device);
} catch (Exception e) {
logger.error("Error parsing ods_new_collect_log 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
}).union(
// PC设备信息数据流处理
pcCollectLogStreamSource.flatMap(new FlatMapFunction<String, DailyActivityDeviceInfo>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<DailyActivityDeviceInfo> out) throws Exception {
try {
// 解析 Kafka 数据
DailyActivityDeviceInfo device = BulidDailyParams.handlePcDeviceData(value);
if (device != null)
out.collect(device);
} catch (Exception e) {
logger.error("Error parsing ods_pc_collect_log 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
})
)
.assignTimestampsAndWatermarks(WatermarkStrategy.<DailyActivityDeviceInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((device, ts) -> device.getWaterMarkTime()));
}
// 用户数据合并
private DataStream<SimiUserInfo> mergedUserDataStream(DataStreamSource<String> abroadUserStreamSource,
DataStreamSource<String> userStreamSource) {
return abroadUserStreamSource.flatMap(new FlatMapFunction<String, SimiUserInfo>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<SimiUserInfo> out) throws Exception {
try {
// 解析 Kafka 数据
SimiUserInfo simiUserInfo = JSONObject.parseObject(value, new TypeReference<SimiUserInfo>() {
});
simiUserInfo.setUpdateTime(TimeConvertUtil.convertToTimestamp(simiUserInfo.getCreate_time()));
out.collect(simiUserInfo);
} catch (Exception e) {
logger.error("Error parsing abroad_simi_user_list 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
}).union(
userStreamSource.flatMap(new FlatMapFunction<String, SimiUserInfo>() {
private static final long serialVersionUID = 1L;
// IP优先级策略筛选最优记录
EnrichedLog currentBest = bestRecordState.get(key);
if (currentBest == null || isBetter(log, currentBest)) {
logger.info(">>>>>>>>>>>>>>rankAndFilter key:{},log:{}",key,log.toString());
bestRecordState.put(key, log);
out.collect(log); // 输出当前最优记录
}
}
private boolean isBetter(EnrichedLog newLog, EnrichedLog currentBest) {
boolean newValid = isValidIp(newLog.getIp());
boolean currentValid = isValidIp(currentBest.getIp());
if (newValid && !currentValid) return true;
if (newValid == currentValid) {
return newLog.getWaterMarkTime() > currentBest.getWaterMarkTime();
}
return false;
}
private boolean isValidIp(String ip) {
return StringUtils.isNotEmpty(ip)
&& !ip.equals("[]")
&& !ip.equals("null");
}
});
}
/**
* 关联用户信息
*/
private DataStream<UserDailyActivityOutputLog> joinUserInfo(
DataStream<EnrichedLog> finalLogStream,
DataStream<SimiUserInfo> userDataStream) {
return finalLogStream
.coGroup(userDataStream)
.where(log -> log.getCid() + "|" + log.getPhone())
.equalTo(user -> user.getCid() + "|" + user.getPhone_number())
.window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
.apply(new CoGroupFunction<EnrichedLog, SimiUserInfo, UserDailyActivityOutputLog>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<SimiUserInfo> out) throws Exception {
try {
// 解析 Kafka 数据
SimiUserInfo simiUserInfo = JSONObject.parseObject(value, new TypeReference<SimiUserInfo>() {
});
simiUserInfo.setUpdateTime(TimeConvertUtil.convertToTimestamp(simiUserInfo.getCreate_time()));
out.collect(simiUserInfo);
} catch (Exception e) {
logger.error("Error parsing simi_user_list 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
})
).filter(u -> StringUtils.isNoneEmpty(u.getCid(), u.getPhone_number()))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<SimiUserInfo>forBoundedOutOfOrderness(Duration.ofMinutes(5))
.withTimestampAssigner((user, ts) -> user.getUpdateTime()))
.keyBy(user -> user.getCid() + "#_#" + user.getPhone_number()).process(new LatestUserProcessFunction());
}
//事件信息合并
private DataStream<DailyActivityEventInfo> mergedEventStream(DataStreamSource<String> eventStreamSource,
DataStreamSource<String> pcEventStreamSource) {
return eventStreamSource.flatMap(new FlatMapFunction<String, DailyActivityEventInfo>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<DailyActivityEventInfo> out) throws Exception {
try {
// 解析 Kafka 数据
OdsEventLog odsEventLog = JSONObject.parseObject(value, new TypeReference<OdsEventLog>() {});
if (null == odsEventLog) {
return;
}
List<EventList> eventList = JSONObject.parseObject(odsEventLog.getEvent_list(), new TypeReference<List<EventList>>() {});
if (CollectionUtils.isEmpty(eventList)) {
return;
}
for(EventList event : eventList) {
DailyActivityEventInfo eventInfo = BulidDailyParams.handleEventData(odsEventLog,event);
if (eventInfo != null)
out.collect(eventInfo);
}
} catch (Exception e) {
logger.error("Error parsing ods_event_log 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
}).union(
// PC事件信息数据流处理
pcEventStreamSource.flatMap(new FlatMapFunction<String, DailyActivityEventInfo>() {
private static final long serialVersionUID = 1L;
@Override
public void coGroup(Iterable<EnrichedLog> logs, Iterable<SimiUserInfo> users, Collector<UserDailyActivityOutputLog> out) {
// 1. 预处理用户数据(取最新或第一条)
String countryCode = null;
for (SimiUserInfo user : users) {
if (user.getCountry_code() != null) {
countryCode = user.getCountry_code();
break; // 取第一个有效值
}
}
// 处理主数据流
for (EnrichedLog log : logs) {
UserDailyActivityOutputLog output = new UserDailyActivityOutputLog();
// 基础字段映射
output.setDeviceId(log.getDeviceIdV1());
output.setCid(log.getCid());
output.setAppKey(log.getAppKey());
output.setPlatform(getPlatformName(log.getAppKey()));
output.setAppType(log.getAppType());
output.setDt(log.getDt());
output.setCountryCode(countryCode);
output.setPhone(log.getPhone());
output.setNick(log.getNick());
output.setBrand(log.getBrand());
output.setModel(log.getModel());
output.setOsRelease(log.getOsRelease());
output.setAppVersion(log.getAppVersion());
output.setIp(log.getIp());
output.setAreaName(log.getAreaName());
output.setFirstTime(log.getFirstTime());
output.setLatestTime(log.getLatestTime());
output.setPhoneName(log.getPhoneName());
output.setNetworkModel(log.getNetworkModel());
output.setDeviceName(log.getBrand() + "-" + log.getPhoneName());
output.setZoneName(log.getZoneName());
output.setZoneType(log.getZoneType());
output.setZoneCode(log.getZoneCode());
// 关联用户维度数据
// for (SimiUserInfo user : users) {
// output.setCountryCode(user.getCountry_code());
// break; // 取第一个匹配项
// }
logger.info(">>>>>>>>>>joinUserInfo output:{}",output.toString());
out.collect(output);
}
}
});
}
private String getPlatformName(String appKey) {
switch (appKey) {
case "ptyzTPaV207": return "私米安卓国内版";
case "giHQ1YLp925": return "私米IOS国内版";
default: return "未知平台";
}
}
// 设备时间计算器
public static class DeviceTimeCalculator extends KeyedProcessFunction<String, EnrichedLog, EnrichedLog> {
/**
*
*/
private static final long serialVersionUID = 1L;
private ValueState<Tuple2<Long, Long>> timeState;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>("deviceTimes", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofDays(30)).build();
descriptor.enableTimeToLive(ttlConfig);
timeState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(EnrichedLog log, Context ctx, Collector<EnrichedLog> out) throws Exception {
Tuple2<Long, Long> times = timeState.value();
long eventTime = log.getWaterMarkTime();
if (times == null) {
times = Tuple2.of(eventTime, eventTime);
} else {
times.f0 = Math.min(times.f0, eventTime);
times.f1 = Math.max(times.f1, eventTime);
}
timeState.update(times);
logger.info(">>>>>>>>>DeviceTimeCalculator FirstTime:{},LatestTime:{}",times.f0,times.f1);
log.setFirstTime(times.f0);
log.setLatestTime(times.f1);
out.collect(log);
}
}
@Override
public void flatMap(String value, Collector<DailyActivityEventInfo> out) throws Exception {
try {
// 解析 Kafka 数据
PcOdsEventLog pcOdsEventLog = JSONObject.parseObject(value, new TypeReference<PcOdsEventLog>() {});
String event_info = pcOdsEventLog.getEvent_info();
if(StringUtils.isEmpty(event_info)) {
return;
}
PcEventInfo pcEventInfo = JSONObject.parseObject(event_info, new TypeReference<PcEventInfo>() {});
if(null == pcEventInfo) {
return;
}
List<PcProperties> properties = pcEventInfo.getProperties();
if (CollectionUtils.isEmpty(properties)) {
return;
}
for(PcProperties pcProperties : properties) {
DailyActivityEventInfo eventInfo = BulidDailyParams.handlePcEventData(pcOdsEventLog,pcEventInfo,pcProperties);
if (eventInfo != null)
out.collect(eventInfo);
}
} catch (Exception e) {
logger.error("Error parsing ods_pc_event_log 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
})
)
.assignTimestampsAndWatermarks(WatermarkStrategy.<DailyActivityEventInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.getWaterMarkTime()));
}
}
......@@ -247,7 +247,7 @@ public class VectorAngleCalculationAchi extends MultipleSourceCommonBase impleme
@Override
public Tuple2<String, Long> getKey(PointRecord r) {
logger.info("KeyedStream >> id:{},eventTime:{}",r.id,r.eventTime);
// logger.info("KeyedStream >> id:{},eventTime:{}",r.id,r.eventTime);
return Tuple2.of(r.id, r.eventTime);
}
......
......@@ -41,7 +41,7 @@ public class JoinDeviceWithRegistrationProcessFunction extends KeyedCoProcessFun
DeviceId device = deviceState.value();
if (device != null) {
//输出三元组: (设备ID, 用户ID, 注册时间)
logger.info("输出三元组 设备ID:{}, 用户ID:{}, 注册时间:{}",device.getDeviceId(), user.getCid(), user.getUpdateTime());
// logger.info("输出三元组 设备ID:{}, 用户ID:{}, 注册时间:{}",device.getDeviceId(), user.getCid(), user.getUpdateTime());
out.collect(new Tuple3<>(device.getDeviceId(), user.getCid(), user.getUpdateTime()));
}
}
......
......@@ -78,9 +78,9 @@ public class KeyPointSelector extends KeyedProcessFunction<Tuple2<String, Long>,
// 3. 生成坐标数组字符串
String vectorArray = generateVectorArray(keyPoints);
logger.info(">>>>>>>>KeyPointSelector id:{},eventTime:{},vectorArray:{}",ctx.getCurrentKey().f0,
ctx.getCurrentKey().f1,
vectorArray);
// logger.info(">>>>>>>>KeyPointSelector id:{},eventTime:{},vectorArray:{}",ctx.getCurrentKey().f0,
// ctx.getCurrentKey().f1,
// vectorArray);
// 4. 输出结果
out.collect(new ResultOutput(
......
......@@ -154,7 +154,7 @@ public class PointRecordJoinProcessor extends CoProcessFunction<EventLogToJsonSo
public void onTimer(long timestamp, OnTimerContext ctx, Collector<PointRecord> out) {
try {
pendingEventLog.remove(timestamp - 60000);
logger.info("Cleaned expired events at {}", timestamp);
// logger.info("Cleaned expired events at {}", timestamp);
} catch (Exception e) {
logger.error("Timer error: {}", e.getMessage(), e);
}
......
......@@ -42,7 +42,7 @@ public class VectorAngleProcessor extends KeyedProcessFunction<Tuple2<String, Lo
public void processElement(PointRecord record,
KeyedProcessFunction<Tuple2<String, Long>, PointRecord, ResultRecord>.Context ctx,
Collector<ResultRecord> out) throws Exception {
logger.info("processElement >>>start!");
// logger.info("processElement >>>start!");
VectorState state = vectorState.value();
double vectorX, vectorY, vectorM, pointV;
// 处理第一条记录
......@@ -87,24 +87,24 @@ public class VectorAngleProcessor extends KeyedProcessFunction<Tuple2<String, Lo
record.resolutionX,
record.resolutionY
);
logger.info("VectorAngleProcessor processElement >>>end! id:{},eventTime:{},rowNum:{},"
+ "positionX:{},positionY:{},vectorX:{},vectorY:{},vectorM:{},"
+ "pointV:{},cosV:{},angleV:{},radianV:{},resolutionX:{},resolutionY:{}",
record.id,
record.eventTime,
record.rowNum,
record.positionX,
record.positionY,
vectorX,
vectorY,
vectorM,
pointV,
cosV,
angleV,
radianV,
record.resolutionX,
record.resolutionY
);
// logger.info("VectorAngleProcessor processElement >>>end! id:{},eventTime:{},rowNum:{},"
// + "positionX:{},positionY:{},vectorX:{},vectorY:{},vectorM:{},"
// + "pointV:{},cosV:{},angleV:{},radianV:{},resolutionX:{},resolutionY:{}",
// record.id,
// record.eventTime,
// record.rowNum,
// record.positionX,
// record.positionY,
// vectorX,
// vectorY,
// vectorM,
// pointV,
// cosV,
// angleV,
// radianV,
// record.resolutionX,
// record.resolutionY
// );
out.collect(result);
// 更新状态(当前记录成为下一条的"前一条")
......
......@@ -93,7 +93,7 @@ public class VectorDifferenceProcessor extends KeyedProcessFunction<Tuple2<Strin
// 创建包含两个元素的数组[x, y]
double[] coordinate = {convertedX, convertedY};
coordinateList.add(coordinate);
logger.info(">>>>>>>>准备输出要计算的数据: vectorDiffX:{},resolutionX:{},vectorDiffY:{},resolutionY:{}",point.vectorDiffX,point.resolutionX,point.vectorDiffY,point.resolutionY);
// logger.info(">>>>>>>>准备输出要计算的数据: vectorDiffX:{},resolutionX:{},vectorDiffY:{},resolutionY:{}",point.vectorDiffX,point.resolutionX,point.vectorDiffY,point.resolutionY);
}
// 生成[[x1,y1],[x2,y2],...]格式的二维数组
// 注意:实际输出时可以直接使用coordinateList.toString(),但格式需要微调
......@@ -114,7 +114,7 @@ public class VectorDifferenceProcessor extends KeyedProcessFunction<Tuple2<Strin
Long eventTime = ctx.getCurrentKey().f1;
// String dt = String.format("%tF", eventTime); // yyyy-MM-dd格式
logger.info(">>>>>>>>准备输出 最终结果: id:{},eventTime:{},vectorArray:{}",ctx.getCurrentKey().f0,eventTime,resultBuilder.toString());
// logger.info(">>>>>>>>准备输出 最终结果: id:{},eventTime:{},vectorArray:{}",ctx.getCurrentKey().f0,eventTime,resultBuilder.toString());
out.collect(new ResultOutput(
ctx.getCurrentKey().f0, // id
......
......@@ -90,7 +90,7 @@ public class VectorSimilarityProcessor extends KeyedProcessFunction<String, Vect
// 3. 输出结果
for (SimilarityResult result : results) {
logger.info("VectorSimilarityProcessor 结果输入>>>>>>>>>>>>>> pairId:{},isSimilar:{},avgDistance:{} ",result.pairId,result.isSimilar,result.avgDistance);
// logger.info("VectorSimilarityProcessor 结果输入>>>>>>>>>>>>>> pairId:{},isSimilar:{},avgDistance:{} ",result.pairId,result.isSimilar,result.avgDistance);
out.collect(result);
}
......
......@@ -57,7 +57,7 @@ public class WindowResultFunction extends ProcessWindowFunction<Integer, DeviceR
if (!state.contains(deviceId)) {
state.put(deviceId, true);
}
logger.info("窗口窗口结果处理》》》 deviceId:{},windowEnd:{},count:{}",deviceId,System.currentTimeMillis(),userCount);
// logger.info("窗口窗口结果处理》》》 deviceId:{},windowEnd:{},count:{}",deviceId,System.currentTimeMillis(),userCount);
//输出窗口结果(设备ID, 窗口结束时间, 用户数)
out.collect(new DeviceRegistrationResult(
deviceId,
......
......@@ -72,11 +72,11 @@ public class OkHttpService{
Response response = client.newCall(request).execute();
if (response.isSuccessful()) {
String resultStr = response.body().string();
logger.info("OkHttpService 响应内容 body:{},friendsType:{},cid:{}",resultStr,cid);
// logger.info("OkHttpService 响应内容 body:{},friendsType:{},cid:{}",resultStr,cid);
SimiInterfaceBase resBody = JSONObject.parseObject(resultStr,new TypeReference<SimiInterfaceBase>(){});
result = resBody.getData();
} else {
logger.info("OkHttpService error 响应内容friendsType:{},cid:{}",cid);
// logger.info("OkHttpService error 响应内容friendsType:{},cid:{}",cid);
}
} catch (Exception e) {
logger.error("OkHttpService get e:{}",e.toString());
......
......@@ -89,7 +89,7 @@ public class SearcherUtil {
public static void main(String[] args) throws Exception {
// getCityInfoByFile("1.9.241.214");
String str = "1.1.1.1,2.2.2.2,5.5.5.5";
String str = "192.168.1.1,111.193.48.161,61.48.41.5,124.65.242.25,221.222.117.237,27.221.85.218";
System.out.println(convertStringToList(str));
}
}
......@@ -18,7 +18,7 @@ public class PcProperties implements Serializable{
*/
private static final long serialVersionUID = 1L;
private String r9;
private Long r9;
private String r4;
private String r3;
}
......@@ -26,6 +26,8 @@ public class Properties implements Serializable{
private String timeDifference;
private Long endTime;
private String userId;
private String id;
private String type;
public String getR1() {
......@@ -94,5 +96,16 @@ public class Properties implements Serializable{
public void setUserId(String userId) {
this.userId = userId;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-8-14 14:32:26
* 类说明
*/
@Data
@ToString
public class RealStaking implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String tx_index;
private String tx_hash;
private String block_height;
private String block_timestamp;
private String from_account_id;
private String to_account_id;
private String is_relayer;
private String amount;
private String symbol;
private String post_time;
}
package com.flink.vo.userDailyActivity;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.processor.function.GenDeviceIdProcessor;
import com.flink.processor.function.UserPropertiesProcessor;
import com.flink.util.CompareUtils;
import com.flink.util.TimeConvertUtil;
import com.flink.util.ip2region.SearcherUtil;
import com.flink.vo.DeviceIdInfo;
import com.flink.vo.EventList;
import com.flink.vo.OdsCollectLog;
import com.flink.vo.OdsEventLog;
import com.flink.vo.PcCollectLog;
import com.flink.vo.PcDeviceInfo;
import com.flink.vo.PcEventInfo;
import com.flink.vo.PcOdsEventLog;
import com.flink.vo.PcProperties;
import com.flink.vo.UserProperties;
/**
* @author wjs
* @version 创建时间:2025-8-13 17:37:32 类说明 构建日活参数
*/
public class BulidDailyParams {
// 处理APP设备ID数据
public static DailyActivityDeviceInfo handleDeviceData(String value) throws Exception {
OdsCollectLog log = JSON.parseObject(value, OdsCollectLog.class);
if (log == null)
return null;
String deviceId = log.getDevice_id();
String uniqueId = log.getUnique_id();
String appType = log.getApp_type();
String appKey = log.getApp_key();
String other_info = log.getOther_info();
String device_info = log.getDevice_info();
String env_info = log.getEnv_info();
String createTime = log.getCreate_time();
DeviceIdInfo deviceIdInfo = GenDeviceIdProcessor.genDeviceId(appType, appKey, other_info, device_info,
env_info);
UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(log.getUser_properties());
if (null == deviceIdInfo) {
return null;
}
if (StringUtils.isEmpty(deviceIdInfo.getDeviceIdV1())) {
return null;
}
if (null == userProperties) {
return null;
}
if (StringUtils.isEmpty(userProperties.getCid())) {
return null;
}
if (StringUtils.isEmpty(userProperties.getPhone())) {
return null;
}
return new DailyActivityDeviceInfo(deviceId, deviceIdInfo.getDeviceIdV1(), appKey, uniqueId, appType,
createTime.substring(0, 10), deviceIdInfo.getModel(), deviceIdInfo.getBrand(),
deviceIdInfo.getOsRelease(), deviceIdInfo.getAppVersion(),
TimeConvertUtil.convertToTimestamp(createTime), log.getZone_name(), log.getZone_type(),
log.getZone_code(),getPlatformByAppKey(appKey),null);
}
// 处理PC设备ID数据
public static DailyActivityDeviceInfo handlePcDeviceData(String value) throws Exception {
PcCollectLog log = JSONObject.parseObject(value, new TypeReference<PcCollectLog>() {
});
if (null == log) {
return null;
}
String appType = log.getApp_type();
String appKey = log.getApp_key();
String device_info = log.getDevice_info();
String createTime = log.getCreate_time();
if (StringUtils.isEmpty(device_info)) {
return null;
}
PcDeviceInfo pcDeviceInfo = JSONObject.parseObject(device_info, new TypeReference<PcDeviceInfo>() {
});
if (null == pcDeviceInfo) {
return null;
}
if (StringUtils.isEmpty(pcDeviceInfo.getCid())) {
return null;
}
if (StringUtils.isEmpty(pcDeviceInfo.getPhone())) {
return null;
}
String deviceId = pcDeviceInfo.getI8();
String deviceName = pcDeviceInfo.getB2() + "-" + pcDeviceInfo.getB3();
return new DailyActivityDeviceInfo(deviceId, deviceId, appKey, deviceId, appType, createTime.substring(0, 10),
pcDeviceInfo.getB3(), pcDeviceInfo.getB2(), pcDeviceInfo.getB4(), log.getApp_version(),
TimeConvertUtil.convertToTimestamp(createTime), log.getZone_name(), log.getZone_type(),
log.getZone_code(),getPlatformByAppKey(appKey),deviceName);
}
// 处理APP事件数据
public static DailyActivityEventInfo handleEventData(OdsEventLog odsEventLog, EventList event) throws Exception {
String deviceId = odsEventLog.getDevice_id();
String uniqueId = odsEventLog.getUnique_id();
String appType = odsEventLog.getApp_type();
String appKey = odsEventLog.getApp_key();
String createTime = odsEventLog.getCreate_time();
String routeIp = odsEventLog.getRoute_ip();
String userProperties = odsEventLog.getUser_properties();
if (StringUtils.isEmpty(appKey) || StringUtils.equals(appKey, "C7jias27jias2")) {
appKey = "8ooOvXJo276";
}
String cid = null;
String phone = null;
String nick = null;
if (StringUtils.isNotEmpty(userProperties)) {
List<UserProperties> userPropertiesList = JSONObject.parseObject(userProperties,
new TypeReference<List<UserProperties>>() {
});
if (userPropertiesList != null && userPropertiesList.size() > 0) {
for (UserProperties user : userPropertiesList) {
if (StringUtils.isNotEmpty(user.getCid())) {
cid = user.getCid();
} else if (StringUtils.isNotEmpty(user.getPhone())) {
phone = user.getPhone();
} else if (StringUtils.isNotEmpty(user.getId())) {
cid = user.getId();
} else if (StringUtils.isNotEmpty(user.getNick())) {
nick = user.getNick();
} else if (StringUtils.isNotEmpty(user.getEmail())) {
nick = user.getEmail();
}
}
}
}
List<String> ips = SearcherUtil.convertStringToList(routeIp);
if (CollectionUtils.isEmpty(ips)) {
return null;
}
String ip_name = null;
String area_name = null;
for (String ip : ips) {
if (!SearcherUtil.ipv6(ip)) {
area_name = SearcherUtil.getCityInfoByFile(ip);
if (!CompareUtils.stringExists(area_name, "0|0|0|内网IP|内网IP",
"0|0|0|内网IP|Finance-and-Promoting-Technology")) {
ip_name = ip;
break;
} else {
ip_name = null;
area_name = null;
}
}
}
if (StringUtils.isEmpty(ip_name)) {
return null;
}
return new DailyActivityEventInfo(deviceId, uniqueId, cid, phone, nick, ip_name, area_name,
TimeConvertUtil.parseToStringSSS(event.getR9()), TimeConvertUtil.convertToTimestamp(createTime), appKey,
appType, createTime, createTime.substring(0, 10),getPlatformByAppKey(appKey));
}
// 处理PC事件数据
public static DailyActivityEventInfo handlePcEventData(PcOdsEventLog pcOdsEventLog, PcEventInfo pcEventInfo,
PcProperties pcProperties) throws Exception {
String appKey = pcOdsEventLog.getApp_key();
String appType = pcOdsEventLog.getApp_type();
String createTime = pcOdsEventLog.getCreate_time();
String cid = pcEventInfo.getCid();
String phone = pcEventInfo.getPhone();
String nick = pcEventInfo.getNick();
if (StringUtils.isEmpty(cid)) {
return null;
}
if (StringUtils.isEmpty(phone)) {
return null;
}
String routeIp = pcEventInfo.getS1();
if (StringUtils.isEmpty(routeIp)) {
return null;
}
List<String> ips = SearcherUtil.convertStringToList(routeIp);
if (CollectionUtils.isEmpty(ips)) {
return null;
}
String ip_name = null;
String area_name = null;
for (String ip : ips) {
if (!SearcherUtil.ipv6(ip)) {
area_name = SearcherUtil.getCityInfoByFile(ip);
if (!CompareUtils.stringExists(area_name, "0|0|0|内网IP|内网IP",
"0|0|0|内网IP|Finance-and-Promoting-Technology", "Request timed out.", "*")) {
ip_name = ip;
break;
} else {
ip_name = null;
area_name = null;
}
}
}
if (StringUtils.isEmpty(ip_name)) {
return null;
}
return new DailyActivityEventInfo(pcEventInfo.getI8(), pcEventInfo.getI8(), cid, phone, nick, ip_name,
area_name, TimeConvertUtil.parseToStringSSS(pcProperties.getR9()),
TimeConvertUtil.convertToTimestamp(createTime), appKey, appType, createTime,
createTime.substring(0, 10),getPlatformByAppKey(appKey));
}
private static String getPlatformByAppKey(String appKey) {
switch (appKey) {
// 无链平台
case "8ooOvXJo276":
return "无链安卓国内版";
case "9JQ3A7GA420":
return "无链IOS海外版";
// 私米平台
case "ptyzTPaV207":
return "私米安卓国内版";
case "giHQ1YLp925":
return "私米IOS国内版";
case "lOxLJYzx658":
return "私米安卓海外版";
case "lcALJYzx932":
return "私米IOS海外版";
// pc 国内版
case "pc1KPjmh951":
return "Win国内版";
case "pcrIjvC5805":
return "Linux国内版";
case "pcUXtmMh356":
return "MacIntel国内版";
case "pcrPGB1z531":
return "MacArm国内版";
// pc 海外版
case "pcRIhwh1380":
return "Win海外版";
case "pcQmdNl0952":
return "Linux海外版";
case "pc1etTC6207":
return "MacIntel海外版";
case "pcd9Sa8T989":
return "MacArm海外版";
default:
return "未知平台";
}
}
}
package com.flink.vo.userDailyActivity;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-8-1 16:17:44 类说明
*/
@Data
@ToString
public class DailyActivityCombinedLog implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private String deviceIdV1;
private String appKey;
private String appType;
private String dt;
private String model;
private String brand;
private String osRelease;
private String appVersion;
private String cid;
private String phone;
private String nick;
private String ip;
private String areaName;
private long waterMarkTime;
private String zoneName;
private String zoneType;
private String zoneCode;
private long firstTime;
private long latestTime;
private String platform;
private String deviceName;
private String countryCode;
public DailyActivityCombinedLog(String deviceIdV1, String appKey, String appType, String dt, String model,
String brand, String osRelease, String appVersion, String cid, String phone, String nick, String ip,
String areaName, long waterMarkTime, String zoneName, String zoneType, String zoneCode, long firstTime,
long latestTime, String deviceName, String platform,String countryCode) {
super();
this.deviceIdV1 = deviceIdV1;
this.appKey = appKey;
this.appType = appType;
this.dt = dt;
this.model = model;
this.brand = brand;
this.osRelease = osRelease;
this.appVersion = appVersion;
this.cid = cid;
this.phone = phone;
this.nick = nick;
this.ip = ip;
this.areaName = areaName;
this.waterMarkTime = waterMarkTime;
this.zoneName = zoneName;
this.zoneType = zoneType;
this.zoneCode = zoneCode;
this.firstTime = firstTime;
this.latestTime = latestTime;
this.deviceName = deviceName;
this.platform = platform;
this.countryCode = countryCode;
}
}
package com.flink.vo.userDailyActivity;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-8-13 17:19:11
* 类说明 日活设备信息
*/
@Data
@ToString
public class DailyActivityDeviceInfo implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String deviceId;
private String deviceIdV1;
private String appKey;
private String uniqueId;
private String appType;
private String dt;
private String model;
private String brand;
private String osRelease;
private String appVersion;
private String zoneName;
private String zoneType;
private String zoneCode;
private Long waterMarkTime;
private String platform;
private String deviceName;
public DailyActivityDeviceInfo(
String deviceId,
String deviceIdV1,
String appKey,
String uniqueId,
String appType,
String dt,
String model,
String brand,
String osRelease,
String appVersion,
Long waterMarkTime,
String zoneName,
String zoneType,
String zoneCode,
String platform,
String deviceName
) {
this.deviceId = deviceId;
this.deviceIdV1 = deviceIdV1;
this.appKey = appKey;
this.uniqueId = uniqueId;
this.appType = appType;
this.dt = dt;
this.model = model;
this.brand = brand;
this.osRelease = osRelease;
this.appVersion = appVersion;
this.waterMarkTime = waterMarkTime;
this.zoneName = zoneName;
this.zoneType = zoneType;
this.zoneCode = zoneCode;
this.platform = platform;
this.deviceName = deviceName;
}
}
package com.flink.vo;
package com.flink.vo.userDailyActivity;
import java.io.Serializable;
......@@ -12,7 +12,7 @@ import lombok.ToString;
*/
@Data
@ToString(callSuper = true)
public class EnrichedLog extends CombinedLog implements Serializable{
public class DailyActivityEnrichedLog extends DailyActivityCombinedLog implements Serializable{
/**
*
......@@ -27,7 +27,7 @@ public class EnrichedLog extends CombinedLog implements Serializable{
* @param phoneName 品牌手机名称(可空)
* @param networkModel 入网型号(可空)
*/
public EnrichedLog(CombinedLog baseLog, String phoneName, String networkModel) {
public DailyActivityEnrichedLog(DailyActivityCombinedLog baseLog, String phoneName, String networkModel) {
// 调用父类构造方法初始化基础字段
super(
baseLog.getDeviceIdV1(), baseLog.getAppKey(), baseLog.getAppType(),
......@@ -36,7 +36,7 @@ public class EnrichedLog extends CombinedLog implements Serializable{
baseLog.getPhone(), baseLog.getNick(), baseLog.getIp(),
baseLog.getAreaName(), baseLog.getWaterMarkTime(),
baseLog.getZoneName(), baseLog.getZoneType(), baseLog.getZoneCode(),
baseLog.getFirstTime(), baseLog.getLatestTime()
baseLog.getFirstTime(), baseLog.getLatestTime(),baseLog.getDeviceName(), baseLog.getPlatform(), baseLog.getCountryCode()
);
this.phoneName = phoneName;
this.networkModel = networkModel;
......
package com.flink.vo.userDailyActivity;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-8-13 17:40:32
* 类说明
*/
@Data
@ToString
public class DailyActivityEventInfo implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String deviceId;
private String uniqueId;
private String cid;
private String phone;
private String nick;
private String ip;
private String areaName;
private String eventTime;
private Long waterMarkTime;
private String appKey;
private String appType;
private String createTime;
private String dt;
private String platform;
public DailyActivityEventInfo(String deviceId,String uniqueId, String cid, String phone, String nick, String ip, String areaName, String eventTime,Long waterMarkTime,
String appKey, String appType, String createTime,String dt,String platform) {
this.deviceId = deviceId;
this.uniqueId = uniqueId;
this.cid = cid;
this.phone = phone;
this.nick = nick;
this.ip = ip;
this.areaName = areaName;
this.eventTime = eventTime;
this.waterMarkTime = waterMarkTime;
this.appKey = appKey;
this.appType = appType;
this.createTime = createTime;
this.dt = dt;
this.platform = platform;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment