Commit de18ac73 by 魏建枢

日活,好友汇总代码提交

parent 5333e716
Showing with 383 additions and 22 deletions
...@@ -64,6 +64,50 @@ ...@@ -64,6 +64,50 @@
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.20.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>value</artifactId>
<groupId>org.immutables</groupId>
</exclusion>
<exclusion>
<artifactId>value-annotations</artifactId>
<groupId>org.immutables</groupId>
</exclusion>
<exclusion>
<artifactId>commons-compiler</artifactId>
<groupId>org.codehaus.janino</groupId>
</exclusion>
<exclusion>
<artifactId>janino</artifactId>
<groupId>org.codehaus.janino</groupId>
</exclusion>
<exclusion>
<artifactId>flink-scala_2.12</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-table-runtime</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>1.20.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>flink-table-common</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId> <artifactId>flink-table-api-java-bridge</artifactId>
<version>1.20.0</version> <version>1.20.0</version>
<scope>provided</scope> <scope>provided</scope>
...@@ -76,9 +120,115 @@ ...@@ -76,9 +120,115 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.20.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>flink-core</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>chill-java</artifactId>
<groupId>com.twitter</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.20.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>scala-reflect</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
<exclusion>
<artifactId>scala-library</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
<exclusion>
<artifactId>scala-compiler</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
<exclusion>
<artifactId>flink-scala_2.12</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.20.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>flink-runtime</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-optimizer</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-datastream</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-core</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.20.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>flink-shaded-guava</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-connector-datagen</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-runtime</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-core</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId> <artifactId>flink-runtime-web</artifactId>
<version>1.20.0</version> <version>1.20.0</version>
<scope>provided</scope> <scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>flink-shaded-netty</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-shaded-jackson</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-shaded-guava</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-runtime</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>jdk.tools</groupId> <groupId>jdk.tools</groupId>
...@@ -91,7 +241,7 @@ ...@@ -91,7 +241,7 @@
<properties> <properties>
<maven.plugin.version>3.8.1</maven.plugin.version> <maven.plugin.version>3.8.1</maven.plugin.version>
<flink.version>1.20.0</flink.version> <flink.version>1.20.0</flink.version>
<scala.binary.version>2.11</scala.binary.version> <scala.binary.version>2.12</scala.binary.version>
<maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.target>1.8</maven.compiler.target>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<log4j.version>2.17.1</log4j.version> <log4j.version>2.17.1</log4j.version>
......
...@@ -26,22 +26,23 @@ ...@@ -26,22 +26,23 @@
<!--flink版本 --> <!--flink版本 -->
<flink.version>1.20.0</flink.version> <flink.version>1.20.0</flink.version>
<!--scala版本 --> <!--scala版本 -->
<scala.binary.version>2.11</scala.binary.version> <scala.binary.version>2.12</scala.binary.version>
</properties> </properties>
<!--通用依赖 --> <!--通用依赖 -->
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId> <artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version> <version>${flink.version}</version>
<scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId> <artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version> <version>${flink.version}</version>
<scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
...@@ -61,12 +62,14 @@ ...@@ -61,12 +62,14 @@
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId> <artifactId>flink-java</artifactId>
<version>${flink.version}</version> <version>${flink.version}</version>
<scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId> <artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version> <version>${flink.version}</version>
<scope>provided</scope>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
...@@ -74,12 +77,14 @@ ...@@ -74,12 +77,14 @@
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId> <artifactId>flink-clients</artifactId>
<version>${flink.version}</version> <version>${flink.version}</version>
<scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId> <artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version> <version>${flink.version}</version>
<scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
...@@ -154,6 +159,12 @@ ...@@ -154,6 +159,12 @@
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId> <artifactId>hadoop-common</artifactId>
<version>3.3.6</version> <version>3.3.6</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
......
...@@ -354,6 +354,8 @@ public class EventIpLatestAchi extends SourceCommonBase implements Serializable{ ...@@ -354,6 +354,8 @@ public class EventIpLatestAchi extends SourceCommonBase implements Serializable{
eventIp.setAppKey(appKey); eventIp.setAppKey(appKey);
eventIp.setAppType(appType); eventIp.setAppType(appType);
eventIp.setCreateTime(createTime); eventIp.setCreateTime(createTime);
eventIp.setDeviceId(odsEventLog.getDevice_id());
eventIp.setUniqueId(odsEventLog.getUnique_id());
return eventIp; return eventIp;
} }
...@@ -417,6 +419,8 @@ public class EventIpLatestAchi extends SourceCommonBase implements Serializable{ ...@@ -417,6 +419,8 @@ public class EventIpLatestAchi extends SourceCommonBase implements Serializable{
eventIp.setAppKey(appKey); eventIp.setAppKey(appKey);
eventIp.setAppType(appType); eventIp.setAppType(appType);
eventIp.setCreateTime(createTime); eventIp.setCreateTime(createTime);
eventIp.setDeviceId(pcEventInfo.getI8());
eventIp.setUniqueId(pcEventInfo.getI8());
return eventIp; return eventIp;
} }
......
...@@ -9,6 +9,7 @@ import java.time.Duration; ...@@ -9,6 +9,7 @@ import java.time.Duration;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -18,6 +19,7 @@ import org.apache.doris.flink.sink.DorisSink; ...@@ -18,6 +19,7 @@ import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.ParseException; import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueState;
...@@ -50,6 +52,7 @@ import org.apache.flink.util.OutputTag; ...@@ -50,6 +52,7 @@ import org.apache.flink.util.OutputTag;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference; import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector; import com.flink.common.DorisConnector;
...@@ -58,6 +61,7 @@ import com.flink.enums.TopicTypeEnum; ...@@ -58,6 +61,7 @@ import com.flink.enums.TopicTypeEnum;
import com.flink.processor.function.LatestUserProcessFunction; import com.flink.processor.function.LatestUserProcessFunction;
import com.flink.util.CompareUtils; import com.flink.util.CompareUtils;
import com.flink.util.TimeConvertUtil; import com.flink.util.TimeConvertUtil;
import com.flink.vo.DwdSysLog;
import com.flink.vo.EventList; import com.flink.vo.EventList;
import com.flink.vo.KafkaDataSource; import com.flink.vo.KafkaDataSource;
import com.flink.vo.OdsEventLog; import com.flink.vo.OdsEventLog;
...@@ -93,8 +97,12 @@ public class UserDailyActivityAchi extends SourceCommonBase implements Serializa ...@@ -93,8 +97,12 @@ public class UserDailyActivityAchi extends SourceCommonBase implements Serializa
DataStreamSource<String> pcEventStreamSource = null; DataStreamSource<String> pcEventStreamSource = null;
DataStreamSource<String> userStreamSource = null; DataStreamSource<String> userStreamSource = null;
DataStreamSource<String> abroadUserStreamSource = null; DataStreamSource<String> abroadUserStreamSource = null;
DataStreamSource<String> sysLogStreamSource = null;
if(CollectionUtils.isNotEmpty(dataSourceList)) { if(CollectionUtils.isNotEmpty(dataSourceList)) {
for(KafkaDataSource kafkaDataSource : dataSourceList) { for(KafkaDataSource kafkaDataSource : dataSourceList) {
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.DWD_SYS_LOG.getTopic())) {
sysLogStreamSource = kafkaDataSource.getDataStreamSource();
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_NEW_COLLECT_LOG.getTopic())) { if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_NEW_COLLECT_LOG.getTopic())) {
collectLogStreamSource = kafkaDataSource.getDataStreamSource(); collectLogStreamSource = kafkaDataSource.getDataStreamSource();
} }
...@@ -119,7 +127,7 @@ public class UserDailyActivityAchi extends SourceCommonBase implements Serializa ...@@ -119,7 +127,7 @@ public class UserDailyActivityAchi extends SourceCommonBase implements Serializa
} }
//设备信息合并 //设备信息合并
DataStream<DailyActivityDeviceInfo> mergedDeviceStream = mergedDeviceStream(collectLogStreamSource,pcCollectLogStreamSource); DataStream<DailyActivityDeviceInfo> mergedDeviceStream = mergedDeviceStream(collectLogStreamSource,pcCollectLogStreamSource,sysLogStreamSource);
//事件信息合并 //事件信息合并
DataStream<DailyActivityEventInfo> mergedEventStream = mergedEventStream(eventStreamSource,pcEventStreamSource); DataStream<DailyActivityEventInfo> mergedEventStream = mergedEventStream(eventStreamSource,pcEventStreamSource);
// 用户数据合并 // 用户数据合并
...@@ -352,6 +360,8 @@ public class UserDailyActivityAchi extends SourceCommonBase implements Serializa ...@@ -352,6 +360,8 @@ public class UserDailyActivityAchi extends SourceCommonBase implements Serializa
output.setAppVersion(log.getAppVersion()); output.setAppVersion(log.getAppVersion());
output.setIp(log.getIp()); output.setIp(log.getIp());
output.setAreaName(log.getAreaName()); output.setAreaName(log.getAreaName());
output.setNetworkIp(log.getNetwork_ip());
output.setNetworkAreaName(log.getNetwork_area_name());
output.setFirstTime(log.getFirstTime()); output.setFirstTime(log.getFirstTime());
output.setLatestTime(log.getLatestTime()); output.setLatestTime(log.getLatestTime());
output.setPhoneName(log.getPhoneName()); output.setPhoneName(log.getPhoneName());
...@@ -387,6 +397,8 @@ public class UserDailyActivityAchi extends SourceCommonBase implements Serializa ...@@ -387,6 +397,8 @@ public class UserDailyActivityAchi extends SourceCommonBase implements Serializa
output.setAppVersion(log.getAppVersion()); output.setAppVersion(log.getAppVersion());
output.setIp(log.getIp()); output.setIp(log.getIp());
output.setAreaName(log.getAreaName()); output.setAreaName(log.getAreaName());
output.setNetworkIp(log.getNetwork_ip());
output.setNetworkAreaName(log.getNetwork_area_name());
output.setFirstTime(log.getFirstTime()); output.setFirstTime(log.getFirstTime());
output.setLatestTime(log.getLatestTime()); output.setLatestTime(log.getLatestTime());
output.setPhoneName(log.getPhoneName()); output.setPhoneName(log.getPhoneName());
...@@ -436,7 +448,7 @@ public class UserDailyActivityAchi extends SourceCommonBase implements Serializa ...@@ -436,7 +448,7 @@ public class UserDailyActivityAchi extends SourceCommonBase implements Serializa
dev.getModel(), dev.getBrand(), dev.getOsRelease(), dev.getAppVersion(), dev.getModel(), dev.getBrand(), dev.getOsRelease(), dev.getAppVersion(),
evt.getCid(), phone, evt.getNick(), evt.getIp(), evt.getAreaName(), evt.getCid(), phone, evt.getNick(), evt.getIp(), evt.getAreaName(),
dev.getWaterMarkTime(), dev.getZoneName(), dev.getZoneType(), dev.getZoneCode(), dev.getWaterMarkTime(), dev.getZoneName(), dev.getZoneType(), dev.getZoneCode(),
0L, 0L,dev.getDeviceName(),dev.getPlatform(),countryCode 0L, 0L,dev.getDeviceName(),dev.getPlatform(),countryCode,dev.getNetwork_ip(),dev.getNetwork_area_name()
)); ));
} }
}); });
...@@ -582,10 +594,49 @@ public class UserDailyActivityAchi extends SourceCommonBase implements Serializa ...@@ -582,10 +594,49 @@ public class UserDailyActivityAchi extends SourceCommonBase implements Serializa
} }
} }
//设备信息合并 //设备信息关联入网ip合并
private DataStream<DailyActivityDeviceInfo> mergedDeviceStream(DataStreamSource<String> collectLogStreamSource, private DataStream<DailyActivityDeviceInfo> mergedDeviceStream(DataStreamSource<String> collectLogStreamSource,
DataStreamSource<String> pcCollectLogStreamSource) { DataStreamSource<String> pcCollectLogStreamSource,DataStreamSource<String> sysLogStreamSource) {
return collectLogStreamSource.flatMap(new FlatMapFunction<String, DailyActivityDeviceInfo>() {
SingleOutputStreamOperator<DwdSysLog> sysDataStream = sysLogStreamSource
.map(new MapFunction<String, DwdSysLog>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public DwdSysLog map(String value) throws Exception {
try {
DwdSysLog sysLog = JSON.parseObject(value, DwdSysLog.class);
logger.debug("sysLog原始数据: {}", sysLog);
if (sysLog == null || StringUtils.isEmpty(sysLog.getSend_time())) {
logger.warn("空值日志: {}", value);
return null;
}
// 时间戳转换(添加异常捕获)
Long waterMarkTime = TimeConvertUtil.convertToTimestamp(sysLog.getSend_time());
if (waterMarkTime <= 0) {
logger.error("时间转换失败: {}", sysLog.getSend_time());
return null;
}
sysLog.setWaterMarkTime(waterMarkTime);
return sysLog;
} catch (Exception e) {
logger.error("JSON解析失败: {} | 错误: {}", value, e.getMessage());
return null;
}
}
})
.filter(Objects::nonNull)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<DwdSysLog>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((sysLog, ts) -> sysLog.getWaterMarkTime())
);
SingleOutputStreamOperator<DailyActivityDeviceInfo> mergedDeviceStream = collectLogStreamSource.flatMap(new FlatMapFunction<String, DailyActivityDeviceInfo>() {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@Override @Override
...@@ -619,6 +670,51 @@ public class UserDailyActivityAchi extends SourceCommonBase implements Serializa ...@@ -619,6 +670,51 @@ public class UserDailyActivityAchi extends SourceCommonBase implements Serializa
) )
.assignTimestampsAndWatermarks(WatermarkStrategy.<DailyActivityDeviceInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .assignTimestampsAndWatermarks(WatermarkStrategy.<DailyActivityDeviceInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((device, ts) -> device.getWaterMarkTime())); .withTimestampAssigner((device, ts) -> device.getWaterMarkTime()));
return sysDataStream
.keyBy(new KeySelector<DwdSysLog, String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public String getKey(DwdSysLog sysLog) {
return sysLog.getDevice_id() + "#_#" +
sysLog.getUnique_id() + "#_#" +
sysLog.getDevice_id_v1();
}
})
.intervalJoin(
mergedDeviceStream.keyBy(new KeySelector<DailyActivityDeviceInfo, String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public String getKey(DailyActivityDeviceInfo dev) {
return dev.getDeviceId() + "#_#" +
dev.getUniqueId() + "#_#" +
dev.getDeviceIdV1(); // 注意字段名一致性!
}
})
)
.between(Duration.ofMinutes(-10), Duration.ofMinutes(5))
.process(new ProcessJoinFunction<DwdSysLog, DailyActivityDeviceInfo, DailyActivityDeviceInfo>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void processElement(DwdSysLog sysLog, DailyActivityDeviceInfo dev, Context ctx, Collector<DailyActivityDeviceInfo> out) {
out.collect(new DailyActivityDeviceInfo(dev.getDeviceId(), dev.getDeviceIdV1(), dev.getAppKey(), dev.getUniqueId(),
dev.getAppType(), dev.getDt(),dev.getModel(),dev.getBrand(),dev.getOsRelease(),dev.getAppVersion(),dev.getWaterMarkTime(),
dev.getZoneName(),dev.getZoneType(),dev.getZoneCode(),dev.getPlatform(),dev.getDeviceName(),sysLog.getNetwork_ip(),sysLog.getNetwork_area_name()));
}
});
} }
// 用户数据合并 // 用户数据合并
......
...@@ -3,7 +3,6 @@ package com.flink.achieve.table; ...@@ -3,7 +3,6 @@ package com.flink.achieve.table;
import java.io.Serializable; import java.io.Serializable;
import java.util.List; import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.io.ParseException; import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
......
...@@ -3,7 +3,7 @@ package com.flink.achieve.table.sql; ...@@ -3,7 +3,7 @@ package com.flink.achieve.table.sql;
import java.util.List; import java.util.List;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.calcite.shaded.org.apache.commons.codec.binary.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import com.flink.achieve.table.schema.KafkaBaseSchema; import com.flink.achieve.table.schema.KafkaBaseSchema;
......
package com.flink.common; package com.flink.common;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.DeliveryGuarantee;
......
...@@ -20,6 +20,7 @@ public enum JobTypeEnum { ...@@ -20,6 +20,7 @@ public enum JobTypeEnum {
EVENT_IP_CONVERT("JOB_01", "事件IP转换作业"), EVENT_IP_CONVERT("JOB_01", "事件IP转换作业"),
COMMON_CONSUME_BASE("JOB_02", "公共基础消费采集作业"), COMMON_CONSUME_BASE("JOB_02", "公共基础消费采集作业"),
USER_DAILY_ACTIVITY("JOB_03", "用户日活作业"), USER_DAILY_ACTIVITY("JOB_03", "用户日活作业"),
COMMON_CONSUME_SQL_BASE("JOB_04", "公共基础消费采集SQL作业"),
EVENT_IP_CONVERT_CID("JOB_07", "最新事件IP作业"), EVENT_IP_CONVERT_CID("JOB_07", "最新事件IP作业"),
DEVICE_ID_CID("JOB_08", "最新设备ID作业"), DEVICE_ID_CID("JOB_08", "最新设备ID作业"),
......
...@@ -8,7 +8,7 @@ import com.flink.vo.KafkaTopic; ...@@ -8,7 +8,7 @@ import com.flink.vo.KafkaTopic;
* 类说明 * 类说明
*/ */
public enum TopicTypeEnum { public enum TopicTypeEnum {
DWD_SYS_LOG("dwd_sys_log","dwdSysLog"),
ODS_EVENT_LOG("ods_event_log","eventLogGroup"), ODS_EVENT_LOG("ods_event_log","eventLogGroup"),
ODS_NEW_COLLECT_LOG("ods_new_collect_log","odsNewCollectLog"), ODS_NEW_COLLECT_LOG("ods_new_collect_log","odsNewCollectLog"),
ODS_ZIPPER_STRATEGY("ods_zipper_strategy","odsZipperStrategy"), ODS_ZIPPER_STRATEGY("ods_zipper_strategy","odsZipperStrategy"),
...@@ -31,6 +31,7 @@ public enum TopicTypeEnum { ...@@ -31,6 +31,7 @@ public enum TopicTypeEnum {
ODS_PC_COLLECT_LOG("ods_pc_collect_log","odsPcCollectLog"), ODS_PC_COLLECT_LOG("ods_pc_collect_log","odsPcCollectLog"),
ODS_COMMUNITY_HISTORY("ods_community_history","odsCommunityHistory"), ODS_COMMUNITY_HISTORY("ods_community_history","odsCommunityHistory"),
ODS_SYS_LOG("ods_sys_log","odsSysLog"), ODS_SYS_LOG("ods_sys_log","odsSysLog"),
; ;
private String topic; private String topic;
......
...@@ -3,6 +3,7 @@ package com.flink.factory; ...@@ -3,6 +3,7 @@ package com.flink.factory;
import com.flink.enums.JobTypeEnum; import com.flink.enums.JobTypeEnum;
import com.flink.processor.JobProcessor; import com.flink.processor.JobProcessor;
import com.flink.processor.impl.CommonConsumeBaseProcessor; import com.flink.processor.impl.CommonConsumeBaseProcessor;
import com.flink.processor.impl.CommonConsumeSqlBaseProcessor;
import com.flink.processor.impl.DeviceIdLatestProcessor; import com.flink.processor.impl.DeviceIdLatestProcessor;
import com.flink.processor.impl.EventIpConvertProcessor; import com.flink.processor.impl.EventIpConvertProcessor;
import com.flink.processor.impl.EventIpLatestProcessor; import com.flink.processor.impl.EventIpLatestProcessor;
...@@ -25,6 +26,8 @@ public class JobProcessorFactory { ...@@ -25,6 +26,8 @@ public class JobProcessorFactory {
return new EventIpConvertProcessor(); return new EventIpConvertProcessor();
case COMMON_CONSUME_BASE: case COMMON_CONSUME_BASE:
return new CommonConsumeBaseProcessor(); return new CommonConsumeBaseProcessor();
case COMMON_CONSUME_SQL_BASE:
return new CommonConsumeSqlBaseProcessor();
case USER_DAILY_ACTIVITY: case USER_DAILY_ACTIVITY:
return new UserDailyActivityProcessor(); return new UserDailyActivityProcessor();
case EVENT_IP_CONVERT_CID: case EVENT_IP_CONVERT_CID:
......
...@@ -75,7 +75,10 @@ public class SimiFriendsTempJoinProcessor extends CoProcessFunction<TotalTemp, S ...@@ -75,7 +75,10 @@ public class SimiFriendsTempJoinProcessor extends CoProcessFunction<TotalTemp, S
totalTemp.getCreateTime(), totalTemp.getCreateTime(),
totalTemp.getCidsList(), totalTemp.getCidsList(),
totalTemp.getFriendsList(), totalTemp.getFriendsList(),
totalTemp.getCollectTime() totalTemp.getCollectTime(),
StringUtils.isNotEmpty(totalTemp.getNetwork_ip())?totalTemp.getNetwork_ip() : null,
StringUtils.isNotEmpty(totalTemp.getNetwork_area_name())?totalTemp.getNetwork_area_name() : null,
StringUtils.isNotEmpty(totalTemp.getZone_name())?totalTemp.getZone_name() : null
); );
} }
......
package com.flink.processor.impl;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import com.flink.achieve.table.TableSqlSinkKafkaAchi;
import com.flink.enums.JobTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.JobProcessor;
import com.flink.vo.KafkaTopic;
/**
* @author wjs
* @version 创建时间:2025-8-24 17:37:39
* 类说明
*/
public class CommonConsumeSqlBaseProcessor implements JobProcessor{
@Override
public void process() throws Exception {
new TableSqlSinkKafkaAchi().multipleExecuteJob(
createTopicList(),
JobTypeEnum.COMMON_CONSUME_SQL_BASE,
true,
false
);
}
private static List<KafkaTopic> createTopicList() {
return Arrays.stream(new TopicTypeEnum[]{
TopicTypeEnum.ODS_SYS_LOG
}).map(TopicTypeEnum::createKafkaTopic)
.collect(Collectors.toList());
}
}
...@@ -31,7 +31,8 @@ public class SimiFriendsProcessor implements JobProcessor{ ...@@ -31,7 +31,8 @@ public class SimiFriendsProcessor implements JobProcessor{
return Arrays.stream(new TopicTypeEnum[]{ return Arrays.stream(new TopicTypeEnum[]{
TopicTypeEnum.OPEN_SIMI_API, TopicTypeEnum.OPEN_SIMI_API,
TopicTypeEnum.ODS_EVENT_LOG, TopicTypeEnum.ODS_EVENT_LOG,
TopicTypeEnum.SIMI_USER_LIST_TOPIC TopicTypeEnum.SIMI_USER_LIST_TOPIC,
TopicTypeEnum.DWD_SYS_LOG
}).map(TopicTypeEnum::createKafkaTopic) }).map(TopicTypeEnum::createKafkaTopic)
.collect(Collectors.toList()); .collect(Collectors.toList());
......
...@@ -34,7 +34,8 @@ public class UserDailyActivityProcessor implements JobProcessor{ ...@@ -34,7 +34,8 @@ public class UserDailyActivityProcessor implements JobProcessor{
TopicTypeEnum.ODS_PC_EVENT_LOG, TopicTypeEnum.ODS_PC_EVENT_LOG,
TopicTypeEnum.ODS_PC_COLLECT_LOG, TopicTypeEnum.ODS_PC_COLLECT_LOG,
TopicTypeEnum.SIMI_USER_LIST_TOPIC, TopicTypeEnum.SIMI_USER_LIST_TOPIC,
TopicTypeEnum.ABROAD_SIMI_USER_LIST_TOPIC TopicTypeEnum.ABROAD_SIMI_USER_LIST_TOPIC,
TopicTypeEnum.DWD_SYS_LOG
}).map(TopicTypeEnum::createKafkaTopic) }).map(TopicTypeEnum::createKafkaTopic)
.collect(Collectors.toList()); .collect(Collectors.toList());
......
...@@ -71,8 +71,9 @@ public class TimeConvertUtil { ...@@ -71,8 +71,9 @@ public class TimeConvertUtil {
return dateTime.format(formatter); return dateTime.format(formatter);
} }
public static void main(String[] args) { public static void main(String[] args) throws Exception {
String aa = parseToStringSSS(1752039403729L); // String aa = parseToStringSSS(1752039403729L);
Long aa =convertToTimestamp("2025-08-24 12:14:06.107");
System.out.println(aa); System.out.println(aa);
} }
} }
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-8-24 20:22:15
* 类说明
*/
@Data
@ToString
public class DwdSysLog implements Serializable{/**
*
*/
private static final long serialVersionUID = 1L;
private String network_ip;
private String network_area_name;
private String unique_id;
private String device_id;
private String device_id_v1;
private String send_type;
private String zone_name;
private String zone_code;
private String zone_type;
private String send_time;
private String app_key;
private String app_type;
private String cid;
private String phone;
private String nick;
private Long waterMarkTime;
}
...@@ -27,5 +27,9 @@ public class EventIp implements Serializable{ ...@@ -27,5 +27,9 @@ public class EventIp implements Serializable{
private String appType; private String appType;
private Long eventTime; private Long eventTime;
private String createTime; private String createTime;
private String deviceId;
private String uniqueId;
private String network_ip;
private String network_area_name;
private String zone_name;
} }
...@@ -65,7 +65,7 @@ public class BulidDailyParams { ...@@ -65,7 +65,7 @@ public class BulidDailyParams {
createTime.substring(0, 10), deviceIdInfo.getModel(), deviceIdInfo.getBrand(), createTime.substring(0, 10), deviceIdInfo.getModel(), deviceIdInfo.getBrand(),
deviceIdInfo.getOsRelease(), deviceIdInfo.getAppVersion(), deviceIdInfo.getOsRelease(), deviceIdInfo.getAppVersion(),
TimeConvertUtil.convertToTimestamp(createTime), log.getZone_name(), log.getZone_type(), TimeConvertUtil.convertToTimestamp(createTime), log.getZone_name(), log.getZone_type(),
log.getZone_code(),getPlatformByAppKey(appKey),null); log.getZone_code(),getPlatformByAppKey(appKey),null,null,null);
} }
// 处理PC设备ID数据 // 处理PC设备ID数据
...@@ -99,7 +99,7 @@ public class BulidDailyParams { ...@@ -99,7 +99,7 @@ public class BulidDailyParams {
return new DailyActivityDeviceInfo(deviceId, deviceId, appKey, deviceId, appType, createTime.substring(0, 10), return new DailyActivityDeviceInfo(deviceId, deviceId, appKey, deviceId, appType, createTime.substring(0, 10),
pcDeviceInfo.getB3(), pcDeviceInfo.getB2(), pcDeviceInfo.getB4(), log.getApp_version(), pcDeviceInfo.getB3(), pcDeviceInfo.getB2(), pcDeviceInfo.getB4(), log.getApp_version(),
TimeConvertUtil.convertToTimestamp(createTime), log.getZone_name(), log.getZone_type(), TimeConvertUtil.convertToTimestamp(createTime), log.getZone_name(), log.getZone_type(),
log.getZone_code(),getPlatformByAppKey(appKey),deviceName); log.getZone_code(),getPlatformByAppKey(appKey),deviceName,null,null);
} }
// 处理APP事件数据 // 处理APP事件数据
......
...@@ -39,11 +39,13 @@ public class DailyActivityCombinedLog implements Serializable { ...@@ -39,11 +39,13 @@ public class DailyActivityCombinedLog implements Serializable {
private String platform; private String platform;
private String deviceName; private String deviceName;
private String countryCode; private String countryCode;
private String network_ip;
private String network_area_name;
public DailyActivityCombinedLog(String deviceIdV1, String appKey, String appType, String dt, String model, public DailyActivityCombinedLog(String deviceIdV1, String appKey, String appType, String dt, String model,
String brand, String osRelease, String appVersion, String cid, String phone, String nick, String ip, String brand, String osRelease, String appVersion, String cid, String phone, String nick, String ip,
String areaName, long waterMarkTime, String zoneName, String zoneType, String zoneCode, long firstTime, String areaName, long waterMarkTime, String zoneName, String zoneType, String zoneCode, long firstTime,
long latestTime, String deviceName, String platform,String countryCode) { long latestTime, String deviceName, String platform,String countryCode,String network_ip,String network_area_name) {
super(); super();
this.deviceIdV1 = deviceIdV1; this.deviceIdV1 = deviceIdV1;
this.appKey = appKey; this.appKey = appKey;
...@@ -67,5 +69,7 @@ public class DailyActivityCombinedLog implements Serializable { ...@@ -67,5 +69,7 @@ public class DailyActivityCombinedLog implements Serializable {
this.deviceName = deviceName; this.deviceName = deviceName;
this.platform = platform; this.platform = platform;
this.countryCode = countryCode; this.countryCode = countryCode;
this.network_ip = network_ip;
this.network_area_name = network_area_name;
} }
} }
...@@ -35,6 +35,8 @@ public class DailyActivityDeviceInfo implements Serializable{ ...@@ -35,6 +35,8 @@ public class DailyActivityDeviceInfo implements Serializable{
private Long waterMarkTime; private Long waterMarkTime;
private String platform; private String platform;
private String deviceName; private String deviceName;
private String network_ip;
private String network_area_name;
public DailyActivityDeviceInfo( public DailyActivityDeviceInfo(
String deviceId, String deviceId,
...@@ -52,7 +54,9 @@ public class DailyActivityDeviceInfo implements Serializable{ ...@@ -52,7 +54,9 @@ public class DailyActivityDeviceInfo implements Serializable{
String zoneType, String zoneType,
String zoneCode, String zoneCode,
String platform, String platform,
String deviceName String deviceName,
String network_ip,
String network_area_name
) { ) {
this.deviceId = deviceId; this.deviceId = deviceId;
this.deviceIdV1 = deviceIdV1; this.deviceIdV1 = deviceIdV1;
...@@ -70,5 +74,7 @@ public class DailyActivityDeviceInfo implements Serializable{ ...@@ -70,5 +74,7 @@ public class DailyActivityDeviceInfo implements Serializable{
this.zoneCode = zoneCode; this.zoneCode = zoneCode;
this.platform = platform; this.platform = platform;
this.deviceName = deviceName; this.deviceName = deviceName;
this.network_ip = network_ip;
this.network_area_name = network_area_name;
} }
} }
...@@ -36,7 +36,8 @@ public class DailyActivityEnrichedLog extends DailyActivityCombinedLog implement ...@@ -36,7 +36,8 @@ public class DailyActivityEnrichedLog extends DailyActivityCombinedLog implement
baseLog.getPhone(), baseLog.getNick(), baseLog.getIp(), baseLog.getPhone(), baseLog.getNick(), baseLog.getIp(),
baseLog.getAreaName(), baseLog.getWaterMarkTime(), baseLog.getAreaName(), baseLog.getWaterMarkTime(),
baseLog.getZoneName(), baseLog.getZoneType(), baseLog.getZoneCode(), baseLog.getZoneName(), baseLog.getZoneType(), baseLog.getZoneCode(),
baseLog.getFirstTime(), baseLog.getLatestTime(),baseLog.getDeviceName(), baseLog.getPlatform(), baseLog.getCountryCode() baseLog.getFirstTime(), baseLog.getLatestTime(),baseLog.getDeviceName(), baseLog.getPlatform(), baseLog.getCountryCode(),
baseLog.getNetwork_ip(),baseLog.getNetwork_area_name()
); );
this.phoneName = phoneName; this.phoneName = phoneName;
this.networkModel = networkModel; this.networkModel = networkModel;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment