Commit 2bd5e5be by 魏建枢

flink初始化提交

parents
Showing with 2935 additions and 0 deletions
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.flink</groupId>
  <artifactId>eagleEye-flink_kafka</artifactId>
  <version>1.0-SNAPSHOT</version>
  <build>
    <resources>
      <resource>
        <directory>src/main/resources</directory>
      </resource>
      <!-- also package XML files that live alongside the Java sources -->
      <resource>
        <directory>src/main/java</directory>
        <includes>
          <include>**/*.xml</include>
        </includes>
      </resource>
    </resources>
    <!-- NOTE(review): this POM declares no <name> element, so ${project.name}
         may not resolve as intended; ${project.artifactId} is the safer choice. -->
    <finalName>${project.name}</finalName>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-maven-plugin</artifactId>
          <!-- NOTE(review): ${spring.boot.version} is referenced but never defined
               in <properties>; define it before activating this managed plugin. -->
          <version>${spring.boot.version}</version>
          <executions>
            <execution>
              <goals>
                <goal>repackage</goal>
              </goals>
            </execution>
          </executions>
          <configuration>
            <fork>true</fork>
            <finalName>${project.build.finalName}</finalName>
          </configuration>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>${maven.plugin.version}</version>
          <configuration>
            <source>${java.version}</source>
            <target>${java.version}</target>
            <encoding>UTF-8</encoding>
            <compilerArgs>
              <!-- keep parameter names in bytecode (reflection-friendly) -->
              <arg>-parameters</arg>
            </compilerArgs>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
    <plugins>
      <plugin>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                  <!-- FIX: coordinates were misspelled as org.google.code.flindbugs:jar305;
                       the correct artifact is com.google.code.findbugs:jsr305 -->
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <!-- FIX: tag was misspelled <excluder>, so log4j was never excluded -->
                  <exclude>org.apache.logging.log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- strip jar signature files from all shaded dependencies -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <!-- FIX: the implementation attribute was missing; without it the
                     shade plugin cannot instantiate the transformer and the
                     Main-Class manifest entry was never written -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.flink.KafkaStreamingJob</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  <repositories>
    <repository>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
      <id>aliyun-repos</id>
      <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
      <id>aliyun-plugin</id>
      <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
    </pluginRepository>
  </pluginRepositories>
  <dependencies>
    <!-- log4j2 SLF4J binding + API + core (versions unified via ${log4j.version}) -->
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>${log4j.version}</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>${log4j.version}</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>${log4j.version}</version>
      <scope>compile</scope>
    </dependency>
    <!-- Flink web UI; provided by the cluster at runtime -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
  <properties>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>8</java.version>
    <!-- NOTE(review): declared as 2.11 but unused; kept for compatibility -->
    <scala.binary.version>2.11</scala.binary.version>
    <log4j.version>2.17.1</log4j.version>
    <flink.version>1.20.0</flink.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <fastjson.version>1.2.75</fastjson.version>
    <maven.plugin.version>3.8.1</maven.plugin.version>
  </properties>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.flink</groupId>
  <artifactId>eagleEye-flink_kafka</artifactId>
  <version>1.0-SNAPSHOT</version>
  <!-- Property settings -->
  <properties>
    <!-- Java JDK version -->
    <java.version>8</java.version>
    <!-- maven compiler plugin version -->
    <maven.plugin.version>3.8.1</maven.plugin.version>
    <!-- compile with UTF-8 encoding -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- report output with UTF-8 encoding -->
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <!-- JSON processing library -->
    <fastjson.version>1.2.75</fastjson.version>
    <!-- log4j version -->
    <log4j.version>2.17.1</log4j.version>
    <!-- Flink version -->
    <flink.version>1.20.0</flink.version>
    <!-- Scala binary version.
         NOTE(review): declared as 2.11 but the flink-streaming-scala dependency
         below is hard-coded to _2.12 — align the property and the artifact. -->
    <scala.binary.version>2.11</scala.binary.version>
  </properties>
  <!-- Common dependencies -->
  <dependencies>
    <!-- JSON -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>${fastjson.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- JDBC connector is versioned separately from Flink core (x.y.z-flinkVersion) -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc</artifactId>
      <version>3.3.0-1.20</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.27</version>
    </dependency>
    <!--================================ external integrations ========================================== -->
    <!-- logging framework start -->
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>${log4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>${log4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>${log4j.version}</version>
    </dependency>
    <!-- logging framework end -->
    <!-- Kafka connector start -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>3.3.0-1.20</version>
    </dependency>
    <!-- Kafka connector end -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-base</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- NOTE(review): this Doris connector targets Flink 1.16 while flink.version
         is 1.20 — verify compatibility or switch to the 1.20 connector artifact. -->
    <dependency>
      <groupId>org.apache.doris</groupId>
      <artifactId>flink-doris-connector-1.16</artifactId>
      <version>25.0.0</version>
    </dependency>
    <!-- <dependency> -->
    <!-- <groupId>org.apache.hadoop</groupId> -->
    <!-- <artifactId>hadoop-client</artifactId> -->
    <!-- <version>3.4.0</version> -->
    <!-- </dependency> -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.36</version>
    </dependency>
    <!-- offline IP-to-region lookup -->
    <dependency>
      <groupId>org.lionsoul</groupId>
      <artifactId>ip2region</artifactId>
      <version>2.6.4</version>
    </dependency>
  </dependencies>
  <!-- Build and packaging -->
  <build>
    <!-- NOTE(review): no <name> element is declared, so ${project.name} may not
         resolve as intended; ${project.artifactId} is the safer choice. -->
    <finalName>${project.name}</finalName>
    <!-- Resource packaging -->
    <resources>
      <resource>
        <directory>src/main/resources</directory>
      </resource>
      <resource>
        <directory>src/main/java</directory>
        <includes>
          <include>**/*.xml</include>
        </includes>
      </resource>
    </resources>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                  <!-- FIX: coordinates were misspelled as org.google.code.flindbugs:jar305;
                       the correct artifact is com.google.code.findbugs:jsr305 -->
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <!-- FIX: tag was misspelled <excluder>, so log4j was never excluded -->
                  <exclude>org.apache.logging.log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- strip jar signature files from all shaded dependencies -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.flink.KafkaStreamingJob</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
    <!-- Managed plugin versions -->
    <pluginManagement>
      <plugins>
        <!-- Spring Boot repackage plugin -->
        <plugin>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-maven-plugin</artifactId>
          <!-- NOTE(review): ${spring.boot.version} is referenced but never defined
               in <properties>; define it before activating this managed plugin. -->
          <version>${spring.boot.version}</version>
          <configuration>
            <fork>true</fork>
            <finalName>${project.build.finalName}</finalName>
          </configuration>
          <executions>
            <execution>
              <goals>
                <goal>repackage</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
        <!-- Compiler plugin -->
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>${maven.plugin.version}</version>
          <configuration>
            <source>${java.version}</source>
            <target>${java.version}</target>
            <encoding>UTF-8</encoding>
            <compilerArgs>
              <!-- keep parameter names in bytecode (reflection-friendly) -->
              <arg>-parameters</arg>
            </compilerArgs>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
  <!-- Remote artifact repositories -->
  <repositories>
    <repository>
      <id>aliyun-repos</id>
      <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>
  <!-- Remote plugin repositories -->
  <pluginRepositories>
    <pluginRepository>
      <id>aliyun-plugin</id>
      <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </pluginRepository>
  </pluginRepositories>
</project>
\ No newline at end of file
package com.flink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.achieve.ods.OdsEventLogSourceAchi;
import com.flink.common.KafkaSourceConnector;
/**
* @author wjs
* @version 创建时间:2024-12-18 14:28:31
* 类说明
*/
public class KafkaStreamingJob {
    // FIX: the logger was previously created with KafkaSourceConnector.class,
    // which mislabels every log line emitted from this entry point.
    private static final Logger logger = LoggerFactory.getLogger(KafkaStreamingJob.class);

    /**
     * Job entry point: wires the ODS event-log Kafka topic into the
     * event-log aggregation pipeline and blocks until the job terminates.
     *
     * @param args unused command-line arguments
     * @throws Exception any failure during job setup or execution
     */
    public static void main(String[] args) throws Exception {
        // Earlier collect-log experiment, kept for reference:
        // OdsCollectLogSourceAchi sourceCommonBase = new OdsCollectLogSourceAchi();
        // sourceCommonBase.handleDataStreamSource("测试", "ods_collect_log", "collectGroup");
        OdsEventLogSourceAchi sourceEventLog = new OdsEventLogSourceAchi();
        sourceEventLog.handleDataStreamSource("事件日志聚合JOB", "ods_event_log", "eventLogGroup");
    }
}
package com.flink.achieve.ods;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.SourceCommonBase;
import com.flink.enums.AppTypeEnum;
import com.flink.util.LoadPropertiesFile;
import com.flink.vo.OdsCollectLog;
import com.flink.vo.EventIpConvert;
import com.flink.vo.android.AndroidCollectionBody;
import com.flink.vo.android.deviceInfo.AndroidA1;
import com.flink.vo.android.envInfo.AndroidEnvInfo;
import com.flink.vo.android.otherInfo.OtherInfo;
import com.flink.vo.ios.IosCollectionBody;
import com.flink.vo.ios.IosDeviceInfo;
import com.flink.vo.ios.IosEnvInfo;
/**
* @author wjs
* @version 创建时间:2024-12-20 10:55:09
* 类说明
*/
public class OdsCollectLogSourceAchi extends SourceCommonBase implements Serializable{
    /**
     * Serial version id; Flink ships operator instances by Java serialization.
     */
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(OdsCollectLogSourceAchi.class);

    /**
     * Parses one raw collect-log JSON record and unpacks the platform-specific
     * device/env/other sections for Android or iOS payloads.
     * NOTE(review): the extracted fields and the assembled Android/iOS body
     * objects are built and then discarded — this ETL step is unfinished.
     *
     * @param record raw JSON string consumed from the ods_collect_log topic
     * @throws Exception on any JSON parse failure
     */
    @Override
    public void parseSourceKafkaJson(String record) throws Exception {
        logger.info("record:{}",record);
        // TODO: ETL processing of the record
        OdsCollectLog odsCollectLog = JSONObject.parseObject(record,new TypeReference<OdsCollectLog>(){});
        String id = odsCollectLog.getId();
        String deviceId = odsCollectLog.getDevice_id();
        String uid = odsCollectLog.getUid();
        String strategyGroupId = odsCollectLog.getStrategy_group_id();
        String strategyVersion = odsCollectLog.getStrategy_version();
        String sendTime = odsCollectLog.getSend_time();
        String createTime = odsCollectLog.getCreate_time();
        String appKey = odsCollectLog.getApp_key();
        String appType = odsCollectLog.getApp_type();
        String appChannel = odsCollectLog.getApp_channel();
        String zoneCode = odsCollectLog.getZone_code();
        String zoneName = odsCollectLog.getZone_name();
        String zoneType = odsCollectLog.getZone_type();
        String sdkVersion = odsCollectLog.getSdk_version();
        String userAgent = odsCollectLog.getUser_agent();
        String otherInfo = odsCollectLog.getOther_info();
        String deviceInfo = odsCollectLog.getDevice_info();
        String envInfo = odsCollectLog.getEnv_info();
        String userProperties = odsCollectLog.getUser_properties();
        String uniqueId = odsCollectLog.getUnique_id();
        // app_type discriminates the payload schema: "1" = Android, "2" = iOS (see AppTypeEnum)
        if(StringUtils.equals(appType, AppTypeEnum.ANDROID.getCode())) {
            AndroidA1 a1 = JSONObject.parseObject(deviceInfo,new TypeReference<AndroidA1>(){});
            AndroidEnvInfo g1 = JSONObject.parseObject(envInfo,new TypeReference<AndroidEnvInfo>(){});
            OtherInfo i1 = JSONObject.parseObject(otherInfo,new TypeReference<OtherInfo>(){});
            AndroidCollectionBody androidBodyObj = new AndroidCollectionBody();
            androidBodyObj.setA1(a1);
            androidBodyObj.setG1(g1);
            androidBodyObj.setI1(i1);
        }else if(StringUtils.equals(appType, AppTypeEnum.IOS.getCode())) {
            IosDeviceInfo a1 = JSONObject.parseObject(deviceInfo,new TypeReference<IosDeviceInfo>(){});
            IosEnvInfo g1 = JSONObject.parseObject(envInfo,new TypeReference<IosEnvInfo>(){});
            IosCollectionBody iosBodyObj = new IosCollectionBody();
            iosBodyObj.setA1(a1);
            iosBodyObj.setG1(g1);
        }
    }

    /**
     * Downstream forwarding hook from SourceCommonBase; not implemented yet.
     */
    @Override
    public void sendToSinkKafka(DataStreamSource<String> mStream) {
        // TODO Auto-generated method stub
    }
}
package com.flink.achieve.ods;
import java.io.Serializable;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.SourceCommonBase;
import com.flink.enums.AppTypeEnum;
import com.flink.util.LoadPropertiesFile;
import com.flink.util.ip2region.SearcherUtil;
import com.flink.vo.EventIpConvert;
import com.flink.vo.OdsEventLog;
import com.flink.vo.UserProperties;
/**
* @author wjs
* @version 创建时间:2025-4-24 18:05:25
* 类说明
*/
public class OdsEventLogSourceAchi extends SourceCommonBase implements Serializable{
    /**
     * Serial version id; Flink ships operator instances by Java serialization.
     */
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(OdsEventLogSourceAchi.class);

    /**
     * ETL for one raw event-log record from Kafka: parses the JSON payload,
     * extracts user identity fields from user_properties, resolves each non-IPv6
     * route hop to a region name via ip2region, and writes the assembled row to
     * the Doris table bi.event_ip_convert.
     *
     * @param record raw JSON string consumed from the ods_event_log topic
     * @throws ParseException declared by the base-class contract
     * @throws Exception on any parse or sink failure
     */
    @Override
    public void parseSourceKafkaJson(String record) throws ParseException, Exception {
        logger.info("OdsEventLogSourceAchi record:{}",record);
        // TODO: ETL processing of the data
        OdsEventLog odsEventLog = JSONObject.parseObject(record,new TypeReference<OdsEventLog>(){});
        String id = odsEventLog.getId();
        String sendTime = odsEventLog.getSend_time();
        String createTime = odsEventLog.getCreate_time();
        String strategyGroup_id = odsEventLog.getStrategy_group_id();
        String appKey =odsEventLog.getApp_key();
        String appType = odsEventLog.getApp_type();
        String appChannel = odsEventLog.getApp_channel();
        String zoneCode = odsEventLog.getZone_code();
        String zoneName = odsEventLog.getZone_name();
        String zoneType = odsEventLog.getZone_type();
        String sdkVersion = odsEventLog.getSdk_version();
        String userAgent = odsEventLog.getUser_agent();
        String deviceId = odsEventLog.getDevice_id();
        String uid = odsEventLog.getUid();
        String strategyVersion = odsEventLog.getStrategy_version();
        String eventList = odsEventLog.getEvent_list();
        String routeIp = odsEventLog.getRoute_ip();
        String userProperties = odsEventLog.getUser_properties();
        String uniqueId = odsEventLog.getUnique_id();
        String cid = null;
        String phone = null;
        String nick = null;
        if(StringUtils.isNotEmpty(userProperties)) {
            List<UserProperties> userPropertiesList = JSONObject.parseObject(userProperties,new TypeReference<List<UserProperties>>(){});
            if(userPropertiesList != null && userPropertiesList.size() > 0) {
                for(UserProperties user : userPropertiesList) {
                    // NOTE(review): this is an else-if chain, so at most ONE field is taken
                    // from each entry — an entry carrying both cid and phone contributes
                    // only cid. Confirm whether independent ifs were intended.
                    if(StringUtils.isNotEmpty(user.getCid())) {
                        cid = user.getCid();
                    }else if(StringUtils.isNotEmpty(user.getPhone())) {
                        phone = user.getPhone();
                    }else if(StringUtils.isNotEmpty(user.getId())) {
                        cid = user.getId();
                    }else if(StringUtils.isNotEmpty(user.getNick())) {
                        nick = user.getNick();
                    }else if(StringUtils.isNotEmpty(user.getEmail())) {
                        nick = user.getEmail();
                    }
                }
            }
        }
        // route_ip arrives as a bracketed, comma-separated string, e.g. "[ip1, ip2]"
        List<String> ips = convertStringToList(routeIp);
        if(CollectionUtils.isEmpty(ips)) {
            return;
        }
        List<String> ipList = new ArrayList<>();
        List<String> areaNameList = new ArrayList<>();
        for(String ip:ips) {
            // only non-IPv6 hops are geo-resolved; IPv6 addresses are skipped entirely
            if(!ipv6(ip)) {
                String area_name = SearcherUtil.getCityInfoByFile(ip);
                ipList.add(ip);
                areaNameList.add(area_name);
            }
        }
        logger.info("组装数据开始");
        List<EventIpConvert> eventIpConvertList = new ArrayList<>();
        EventIpConvert eventIpConvert = new EventIpConvert();
        eventIpConvert.setId(id);
        // stored in List.toString() form, e.g. "[ip1, ip2]", as a single string column
        eventIpConvert.setIp(ipList.toString());
        eventIpConvert.setAreaName(areaNameList.toString());
        eventIpConvert.setDeviceId(deviceId);
        eventIpConvert.setCid(cid);
        eventIpConvert.setPhone(phone);
        eventIpConvert.setNick(nick);
        eventIpConvert.setCreateTime(createTime);
        // dt partition column mirrors create_time
        eventIpConvert.setDt(createTime);
        eventIpConvertList.add(eventIpConvert);
        logger.info("组装数据结束");
        this.insert(eventIpConvertList);
        // platform-specific handling not implemented yet
        if(StringUtils.equals(appType, AppTypeEnum.ANDROID.getCode())) {
        }else if(StringUtils.equals(appType, AppTypeEnum.IOS.getCode())) {
        }
    }

    /**
     * Downstream forwarding hook from SourceCommonBase; not implemented yet.
     */
    @Override
    public void sendToSinkKafka(DataStreamSource<String> mStream) {
        // TODO Auto-generated method stub
    }

    /**
     * Splits a bracketed, comma-separated string such as "[a, b, c]" into a
     * trimmed list; returns an empty list for null/blank/"[]" input.
     */
    public static List<String> convertStringToList(String str) {
        if (StringUtils.isEmpty(str) || str.trim().isEmpty()) {
            return new ArrayList<>(); // return an empty list
        }
        // strip the surrounding brackets and outer whitespace
        String trimmedStr = str.replaceAll("^\\[|\\]$", "").trim();
        // nothing left once the brackets are removed
        if (trimmedStr.isEmpty()) {
            return new ArrayList<>();
        }
        // split on commas and trim each element
        return Arrays.stream(trimmedStr.split(","))
            .map(String::trim)
            .collect(Collectors.toList());
    }

    /**
     * Returns true when the string resolves to an IPv6 address.
     * NOTE(review): InetAddress.getByName can perform a DNS lookup for
     * non-literal input, so this may block — confirm inputs are always
     * address literals. Also logs to stdout rather than the class logger.
     */
    private static Boolean ipv6(String ip) {
        Boolean flag = false;
        try {
            Inet6Address address = (Inet6Address) InetAddress.getByName(ip);
            System.out.println(ip + " is an IPv6 address.");
            flag = true;
        } catch (UnknownHostException e) {
            System.out.println(ip + " is not a valid IPv6 address.");
        } catch (ClassCastException e) {
            System.out.println(ip + " is not an IPv6 address.");
        }
        return flag;
    }

    /**
     * Writes the assembled rows into Doris table bi.event_ip_convert via JDBC.
     * NOTE(review): this builds a brand-new execution environment and runs a
     * blocking env.execute() on every call (i.e. per consumed record), which is
     * very expensive; a long-lived JdbcSink attached to the main pipeline is the
     * usual pattern. Also the password is hard-coded empty while the remaining
     * connection settings come from properties — confirm this is intentional.
     *
     * @param eventIpConvertList rows to insert
     * @throws Exception on job submission/execution failure
     */
    public static void insert(List<EventIpConvert> eventIpConvertList) throws Exception {
        logger.info("insert eventIpConvertList:{}",eventIpConvertList.toString());
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.fromData(eventIpConvertList)
            .addSink(JdbcSink.sink(
                "INSERT INTO `bi`.`event_ip_convert` " +
                "(`id`, `ip`, `area_name`, `device_id`, `cid`, `phone`, `nick`, `create_time`, `dt`) " +
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);",
                (ps, data) -> {
                    ps.setString(1, data.getId());
                    ps.setString(2, data.getIp());
                    ps.setString(3, data.getAreaName());
                    ps.setString(4, data.getDeviceId());
                    ps.setString(5, data.getCid());
                    ps.setString(6, data.getPhone());
                    ps.setString(7, data.getNick());
                    ps.setString(8, data.getCreateTime());
                    ps.setString(9, data.getDt());
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUsername(LoadPropertiesFile.getPropertyFileValues("doris.username"))
                    .withPassword("")
                    .withDriverName(LoadPropertiesFile.getPropertyFileValues("doris.driver_class_name"))
                    .withUrl(LoadPropertiesFile.getPropertyFileValues("doris.jdbc_url"))
                    .build()
            ));
        env.execute();
    }
}
package com.flink.common;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author wjs
* @version 创建时间:2025-4-23 14:36:34
* 类说明
*/
public class EnvironmentSettings {
    private static final Logger logger = LoggerFactory.getLogger(EnvironmentSettings.class);

    /**
     * Builds the shared stream execution environment used by every job:
     * exactly-once checkpointing every 60s, bounded checkpoint concurrency,
     * checkpoints retained on cancellation, streaming runtime mode and
     * unaligned checkpoints enabled.
     *
     * @return a fully configured StreamExecutionEnvironment
     */
    public static StreamExecutionEnvironment environmentSettings() {
        Configuration restConfig = new Configuration();
        restConfig.setInteger("rest.port", 8081);
        // NOTE(review): restConfig is not handed to getExecutionEnvironment();
        // it only takes effect with the commented local web-UI variant below.
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(restConfig);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Take a checkpoint every 60 seconds with exactly-once semantics.
        env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
        // env.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
        // Leave at least 500ms between the end of one checkpoint and the start of the next.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // Abort any checkpoint that takes longer than one minute.
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Never run two checkpoints concurrently.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Keep externalized checkpoint data after the job is cancelled so the job
        // can later be restored from a chosen checkpoint.
        env.getCheckpointConfig().setExternalizedCheckpointRetention(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
        // Unbounded streaming execution mode.
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        // Allow unaligned checkpoints so barriers stay cheap under backpressure.
        env.getCheckpointConfig().enableUnalignedCheckpoints();
        return env;
    }
}
package com.flink.common;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.flink.util.LoadPropertiesFile;
/**
* @author wjs
* @version 创建时间:2024-12-16 18:21:22
* 类说明 sink Kafka
*/
public class KafkaSinkConnector {
    /**
     * Builds a Kafka string sink for the given topic using the broker list from
     * application properties.
     *
     * FIX: the original constructed the KafkaSink and discarded it (the method
     * returned void), so callers could never attach it to a stream. The sink is
     * now returned; existing callers that ignored the void result still compile.
     *
     * @param env   the stream environment (kept for interface compatibility; unused here)
     * @param topic target Kafka topic
     * @return a configured at-least-once string sink
     */
    public static KafkaSink<String> sinkKafka(StreamExecutionEnvironment env, String topic) {
        String bootstrapServers = LoadPropertiesFile.getPropertyFileValues("kafka.bootstrapServers");
        KafkaSink<String> sink = KafkaSink.<String>builder()
                // Kafka broker addresses
                .setBootstrapServers(bootstrapServers)
                // record value serialization
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                // at-least-once delivery guarantee
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
        return sink;
    }
}
package com.flink.common;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.util.LoadPropertiesFile;
//https://www.jianshu.com/p/fbda25d993df
/**
* @author wjs
* @version 创建时间:2024-12-16 15:57:41
* 类说明 source Kafka
*/
public class KafkaSourceConnector {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSourceConnector.class);

    /**
     * Builds a Kafka string source from application properties and attaches it
     * to the environment.
     *
     * @param env   the stream environment to attach the source to
     * @param topic Kafka topic(s) to consume
     * @param group consumer group id
     * @return the attached DataStreamSource of raw record strings
     */
    public static DataStreamSource<String> sourceKafka(StreamExecutionEnvironment env, String topic, String group) {
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers(LoadPropertiesFile.getPropertyFileValues("kafka.bootstrapServers")) // broker list
                .setTopics(topic)                                   // topic(s); multiple supported
                .setGroupId(group)                                  // consumer group id
                .setStartingOffsets(OffsetsInitializer.earliest())  // start from the earliest offsets
                .setValueOnlyDeserializer(new SimpleStringSchema()) // plain-string value deserialization
                .setProperty("partition.discovery.interval.ms", "10000") // re-scan for new partitions every 10s
                .build();
        // NOTE(review): the operator name is hard-coded to "odsCollectLog" even when a
        // different topic is consumed; left unchanged because renaming a stateful
        // operator can break checkpoint/savepoint restore.
        return env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "odsCollectLog");
    }

    /**
     * Builds a Kafka string sink for the given topic.
     *
     * FIX: the original constructed the KafkaSink and discarded it (void return),
     * making the method a no-op for callers. The sink is now returned; callers
     * that ignored the void result still compile.
     *
     * @param env   the stream environment (kept for interface compatibility; unused here)
     * @param topic target Kafka topic
     * @return a configured at-least-once string sink
     * @throws Exception declared for interface compatibility
     */
    public static KafkaSink<String> sinkKafka(StreamExecutionEnvironment env, String topic) throws Exception {
        String bootstrapServers = LoadPropertiesFile.getPropertyFileValues("kafka.bootstrapServers");
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(bootstrapServers)              // broker list
                .setRecordSerializer(KafkaRecordSerializationSchema.builder() // value serialization
                        .setTopic(topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // at-least-once
                .build();
        return sink;
    }
}
package com.flink.common;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author wjs
* @version 创建时间:2024-12-20 10:43:56
* 类说明 抽象类对接kafka的数据,并解析关键字段
*/
public abstract class SourceCommonBase {
    private static final Logger logger = LoggerFactory.getLogger(SourceCommonBase.class);

    /**
     * Template method: builds the execution environment, attaches the Kafka
     * source for the given topic/group, registers the JSON-parsing flat-map
     * and starts the job (blocking until it terminates).
     *
     * @param jobName display name passed to env.execute
     * @param topic   Kafka topic to consume
     * @param group   Kafka consumer group id
     * @throws Exception any failure during setup or job execution
     */
    public void handleDataStreamSource(String jobName,String topic,String group) throws Exception {
        // 1. environment setup (checkpointing, runtime mode, ...)
        StreamExecutionEnvironment env = EnvironmentSettings.environmentSettings();
        // 2. build the Kafka source from configuration
        DataStreamSource<String> dataStreamSource = KafkaSourceConnector.sourceKafka(env, topic, group);
        // 3. coarse-grained JSON parsing of consumed records
        //    NOTE(review): praseJson currently always returns null — see below.
        DataStreamSource<String> resultDataStreamSource = praseJson(dataStreamSource);
        // 4. forward processed data to the downstream Kafka topic (currently disabled)
        // sendToSinkKafka(resultDataStreamSource);
        //================= start the job =========================================
        env.execute(jobName);
    }

    /**
     * Registers a flat-map over the source stream that delegates each record to
     * parseSourceKafkaJson. The transformation is attached to the environment as
     * a side effect; the operator produced by flatMap is discarded and this
     * method returns null.
     * NOTE(review): the name is misspelled ("praseJson") — kept as-is because it
     * is public and callers/subclasses may reference it; rename in a coordinated
     * change.
     *
     * @return always null in the current implementation
     */
    public DataStreamSource<String> praseJson(DataStreamSource<String> dataStreamSource) {
        //================= simple record pre-processing ======================
        dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            /**
             * Serial version id for Flink operator serialization.
             */
            private static final long serialVersionUID = 1L;
            @Override
            public void flatMap(String record, Collector<String> collector) throws Exception {
                logger.info("正在预处理源数据:{}", record);
                // delegate to the subclass-specific parser; nothing is collected
                parseSourceKafkaJson(record);
            }
        });
        return null;
    }

    /**
     * Parses one raw JSON record (subclass hook).
     * @param record raw JSON string from Kafka
     * @throws ParseException on malformed input
     * @throws Exception on any other processing failure
     */
    public abstract void parseSourceKafkaJson(String record) throws ParseException, Exception;

    /**
     * Sends processed data to a downstream Kafka topic for further computation
     * (subclass hook).
     * @param mStream the processed stream
     */
    public abstract void sendToSinkKafka(DataStreamSource<String> mStream);
}
package com.flink.enums;
/**
* @author wjs
* @version 创建时间:2025-3-13 10:14:42
* 类说明
*/
/**
 * Client platform type carried in collected log records.
 */
public enum AppTypeEnum {
    /** Android client, wire code "1". */
    ANDROID("1", "安卓"),
    /** iOS client, wire code "2". */
    IOS("2", "IOS");

    /** Wire-format code as it appears in the app_type field. */
    private final String code;
    /** Human-readable display name. */
    private final String name;

    AppTypeEnum(String code, String name) {
        this.code = code;
        this.name = name;
    }

    public String getCode() {
        return code;
    }

    public String getName() {
        return name;
    }
}
package com.flink.sink;
import java.util.ArrayList;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author wjs
* @version 创建时间:2024-12-16 15:48:34
* 类说明 kafka 连接器使用
*/
public class KafkaSinkStreamingJob {
    // NOTE(review): this class is an inactive scaffold — the entire former main()
    // below is commented out and the class currently has no executable members.
    // It is kept as a worked example of wiring a KafkaSink with checkpointing;
    // consider deleting it or moving the example to documentation.
// private static final Logger logger = LoggerFactory.getLogger(KafkaSinkStreamingJob.class);
//
// public static void main(String[] args) throws Exception {
//
// //===============1.获取参数==============================
// //定义文件路径
// String propertiesFilePath = "D:\\gitEagleEyeFlink\\eagleEye-flink_kafka\\src\\main\\resources\\application.properties";
// //方式一:直接使用内置工具类
// ParameterTool paramsMap = ParameterTool.fromPropertiesFile(propertiesFilePath);
//
// //================2.初始化kafka参数==============================
// String bootstrapServers = paramsMap.get("kafka.bootstrapServers");
// String topic = paramsMap.get("kafka.topic");
//
// KafkaSink<String> sink = KafkaSink.<String>builder()
// //设置kafka地址
// .setBootstrapServers(bootstrapServers)
// //设置消息序列号方式
// .setRecordSerializer(KafkaRecordSerializationSchema.builder()
// .setTopic(topic)
// .setValueSerializationSchema(new SimpleStringSchema())
// .build()
// )
// //至少一次
// .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
// .build();
//
//
// //=================4.创建Flink运行环境=================
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ArrayList<String> listData = new ArrayList<>();
// listData.add("test");
// listData.add("java");
// listData.add("c++");
// DataStreamSource<String> dataStreamSource = env.fromCollection(listData);
//
// //=================5.数据简单处理======================
// SingleOutputStreamOperator<String> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
// @Override
// public void flatMap(String record, Collector<String> collector) throws Exception {
// logger.info("正在处理kafka数据:{}", record);
// collector.collect(record);
// }
// });
//
// //数据输出算子
// flatMap.sinkTo(sink);
//
// //=================6.启动服务=========================================
// //开启flink的checkpoint功能:每隔1000ms启动一个检查点(设置checkpoint的声明周期)
// env.enableCheckpointing(1000);
// //checkpoint高级选项设置
// //设置checkpoint的模式为exactly-once(这也是默认值)
// env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// //确保检查点之间至少有500ms间隔(即checkpoint的最小间隔)
// env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// //确保检查必须在1min之内完成,否则就会被丢弃掉(即checkpoint的超时时间)
// env.getCheckpointConfig().setCheckpointTimeout(60000);
// //同一时间只允许操作一个检查点
// env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// //程序即使被cancel后,也会保留checkpoint数据,以便根据实际需要恢复到指定的checkpoint
// env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// //设置statebackend,指定state和checkpoint的数据存储位置(checkpoint的数据必须得有一个可以持久化存储的地方)
// env.getCheckpointConfig().setCheckpointStorage("file:///E:/flink/checkPoint");
// env.execute();
// }
}
package com.flink.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// 网址https://gitee.com/shawsongyue/aurora/tree/develop/aurora_flink_connector_kafka
// 网址https://blog.csdn.net/weixin_40736233/article/details/136002402
/**
* @author wjs
* @version 创建时间:2024-12-16 15:48:34
* 类说明
*/
public class KafkaSourceStreamingJob {
// NOTE(review): the entire job body below is commented out and kept only as a reference
// template for building a KafkaSource-based streaming job.
// NOTE(review): step (7) below sets "partition.discovery.interval.ms" on the builder
// AFTER build() has already been called, so it would have no effect on the built source.
// NOTE(review): the flatMap in step 5 logs but never calls collector.collect(), so it
// would emit nothing downstream.
// private static final Logger logger = LoggerFactory.getLogger(KafkaSourceStreamingJob.class);
//
// public static void main(String[] args) throws Exception {
//
// //===============1. load parameters==============================
// //properties file path (hard-coded Windows dev path)
// String propertiesFilePath = "D:\\gitEagleEyeFlink\\eagleEye-flink_kafka\\src\\main\\resources\\application.properties";
// //option 1: use Flink's built-in ParameterTool directly
// ParameterTool paramsMap = ParameterTool.fromPropertiesFile(propertiesFilePath);
//
// //================2. initialise Kafka parameters==============================
// String bootstrapServers = paramsMap.get("kafka.bootstrapServers");
// String group = paramsMap.get("kafka.group");
//
//
// //=================3. build the Kafka source=============================
// KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource.builder();
// //(1) set the Kafka broker address
// kafkaSourceBuilder.setBootstrapServers(bootstrapServers);
// //(2) set the consumer group id
// kafkaSourceBuilder.setGroupId(group);
// //(3) set topics; several subscription styles are supported
// setTopic(kafkaSourceBuilder);
// //(4) set the starting-offset strategy; several modes are supported
// setStartingOffsets(kafkaSourceBuilder);
// //(5) set the deserializer
// setDeserializer(kafkaSourceBuilder);
// //(6) build the source from all configured parameters
// KafkaSource<String> kafkaSource = kafkaSourceBuilder.build();
// //(7) dynamically discover new partitions, checking every 10 seconds
// kafkaSourceBuilder.setProperty("partition.discovery.interval.ms", "10000");
//
// //=================4. create the Flink execution environment=================
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// DataStreamSource<String> dataStreamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
//
// //=================5. simple data processing======================
// dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
// @Override
// public void flatMap(String record, Collector<String> collector) throws Exception {
// logger.info("正在预处理源数据:{}", record);
// }
// });
//
// //=================6. start the job=========================================
// env.execute();
// }
//
//
// /**
// *
// * @description topic subscription modes
// * 1. a single topic
// * 2. multiple topics
// * 3. a list of topics
// * 4. a regex pattern matching topics
// * 5. explicit topic partitions
// *
// * @author 浅夏的猫
// * @datetime 21:18 2024/2/5
// * @param kafkaSourceBuilder
// */
// private static void setTopic(KafkaSourceBuilder<String> kafkaSourceBuilder) {
// //mode 1: a single topic
// kafkaSourceBuilder.setTopics("ods_collect_log");
// //mode 2: multiple topics
//// kafkaSourceBuilder.setTopics("topic_a", "topic_b");
// //mode 3: a list of topics
//// kafkaSourceBuilder.setTopics(Arrays.asList("topic_a", "topic_b"));
// //mode 4: a regex pattern matching topics
//// kafkaSourceBuilder.setTopicPattern(Pattern.compile("topic_a.*"));
// //mode 5: subscribe to explicit partitions; multiple partitions of multiple topics are supported
//// final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(new TopicPartition("topic_a", 0), new TopicPartition("topic_b", 4)));
//// kafkaSourceBuilder.setPartitions(partitionSet);
// }
//
// /**
// * @description starting-offset modes
// * 1. from the committed group offsets, with no reset strategy
// * 2. from the committed group offsets, falling back to earliest if none exist
// * 3. from the first record whose timestamp is >= the given epoch millis
// * 4. from the earliest offset
// * 5. from the latest offset, i.e. only records produced after registration
// *
// * @author 浅夏的猫
// * @datetime 21:27 2024/2/5
// * @param kafkaSourceBuilder
// */
// private static void setStartingOffsets(KafkaSourceBuilder<String> kafkaSourceBuilder){
// //mode 1: committed offsets with no reset strategy; throws if no snapshot/auto-commit exists: Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [topic_a-3]
//// kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets());
// //mode 2: committed offsets, falling back to earliest when none are committed
//// kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST));
// //mode 3: from records with timestamp >= the given epoch millis
// kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L));
// //mode 4: from the earliest offset
//// kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
// //mode 5: from the latest offset, i.e. from the moment of registration
//// kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
// }
//
// /**
// * @description deserializer setup; several options are supported
// * 1. a custom KafkaRecordDeserializationSchema
// * 2. a Kafka-provided Deserializer class
// * 3. a value-only deserializer
// *
// * @author 浅夏的猫
// * @datetime 21:35 2024/2/5
// * @param kafkaSourceBuilder
// */
// private static void setDeserializer(KafkaSourceBuilder<String> kafkaSourceBuilder){
// //option 1: custom parsing of the raw Kafka record
//// KafkaRecordDeserializationSchema<String> kafkaRecordDeserializationSchema = new KafkaRecordDeserializationSchema<>() {
//// @Override
//// public TypeInformation<String> getProducedType() {
//// return TypeInformation.of(String.class);
//// }
////
//// @Override
//// public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<String> collector) throws IOException {
//// //custom parsing of the record value
//// byte[] valueByte = consumerRecord.value();
//// String value = new String(valueByte);
//// //emit the parsed value
//// collector.collect(value);
//// }
//// };
//// kafkaSourceBuilder.setDeserializer(kafkaRecordDeserializationSchema);
//
// //option 2: use a Kafka-provided Deserializer class
//// kafkaSourceBuilder.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
//
// //option 3: deserialize only the record value
// kafkaSourceBuilder.setValueOnlyDeserializer(new SimpleStringSchema());
// }
}
package com.flink.util;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import com.flink.vo.android.AndroidCollectionBody;
import com.flink.vo.android.deviceInfo.AndroidA1;
import com.flink.vo.android.deviceInfo.D3;
import com.flink.vo.ios.IosCollectionBody;
import com.flink.vo.ios.IosDeviceInfo;
/**
* @author wjs
* @version 创建时间:2025-3-25 17:53:21
* 类说明
*/
public class GenDeviceIdV0 {
private final static String SALT = "yylmfy6iehjzmq";
/**
 * Builds the legacy (V0) Android device-id: an MD5 over the literal prefix
 * "null", the dump of the sensor list (D3) and selected device attributes
 * (b6/b2/b15/b1/b3/b14/b8, the first "&"-segment of d4, and c3).
 * A null body or section falls back to a default-valued {@link AndroidA1},
 * so the method never NPEs on missing input.
 *
 * @param androidBodyObj collected Android payload; may be null
 * @return 32-char hex MD5 device id
 */
public static String genAndroidDeviceIdHashV1(AndroidCollectionBody androidBodyObj) {
System.out.println("GenDeviceIdV1 genAndroidDeviceIdHashV1 start!");
AndroidA1 deviceInfo = (androidBodyObj == null) ? null : androidBodyObj.getA1();
if (deviceInfo == null) {
deviceInfo = new AndroidA1();
}
List<D3> sensorList = deviceInfo.getD3();
if (CollectionUtils.isEmpty(sensorList)) {
sensorList = new ArrayList<>();
}
// ":"-separated dump of every sensor descriptor
StringBuilder sensorDump = new StringBuilder();
for (D3 sensor : sensorList) {
sensorDump.append(sensor.toString()).append(":");
}
// "/"-separated dump of the selected device attributes (order is significant)
StringBuilder idDump = new StringBuilder();
idDump.append(deviceInfo.getB6()).append("/")
.append(deviceInfo.getB2()).append("/")
.append(deviceInfo.getB15()).append("/")
.append(deviceInfo.getB1()).append("/")
.append(deviceInfo.getB3()).append("/")
.append(deviceInfo.getB14()).append("/")
.append(deviceInfo.getB8()).append("/")
// Build.FINGERPRINT intentionally excluded (was commented out upstream)
.append(deviceInfo.getD4().split("&")[0]).append("/")
.append(deviceInfo.getC3()).append("/");
// the V0 scheme hashes a literal "null" prefix in place of the OtherInfo ids
String fingerprint = null + ":" + sensorDump.toString() + ":" + idDump.toString();
System.out.println("GenDeviceIdV1 genAndroidDeviceIdHashV1 end! body:" + fingerprint);
return Md5Utils.string2MD5(fingerprint);
}
/**
 * Builds the legacy (V0) iOS device-id: SHA-256 of ap1+ap5, suffixed with the
 * salt and lower-cased. Null body/section falls back to a default-valued
 * {@link IosDeviceInfo}.
 *
 * @param iosBodyObj collected iOS payload; may be null
 * @return lower-cased salted hash
 */
public static String genIosDeviceIdHash(IosCollectionBody iosBodyObj) {
System.out.println("GenDeviceIdV1 genIosDeviceIdHash start!");
IosDeviceInfo deviceInfo = (iosBodyObj == null) ? null : iosBodyObj.getA1();
if (deviceInfo == null) {
deviceInfo = new IosDeviceInfo();
}
String seed = deviceInfo.getAp1() + deviceInfo.getAp5();
System.out.println("GenDeviceIdV1 genIosDeviceIdHash end! body:" + seed);
return (HashUtil.genKeyHash(seed) + SALT).toLowerCase();
}
}
package com.flink.util;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import com.flink.vo.android.AndroidCollectionBody;
import com.flink.vo.android.deviceInfo.AndroidA1;
import com.flink.vo.android.deviceInfo.D3;
import com.flink.vo.android.otherInfo.OtherInfo;
import com.flink.vo.ios.IosCollectionBody;
import com.flink.vo.ios.IosDeviceInfo;
/**
* @author wjs
* @version 创建时间:2025-3-12 18:31:28
* 类说明
*/
public class GenDeviceIdV1 {
private final static String SALT = "yylmfy6iehjzmq";
/**
 * Builds the V1 Android device-id: an MD5 over the concatenated OtherInfo
 * identifiers (i3+i4+i5), the dump of the sensor list (D3) and selected
 * device attributes (b6/b2/b15/b1/b3/b14/b8, the first "&"-segment of d4,
 * and c3). Null body/sections fall back to default-valued objects, so the
 * method never NPEs on missing input.
 *
 * @param androidBodyObj collected Android payload; may be null
 * @return 32-char hex MD5 device id
 */
public static String genAndroidDeviceIdHashV1(AndroidCollectionBody androidBodyObj) {
System.out.println("GenDeviceIdV1 genAndroidDeviceIdHashV1 start!");
AndroidA1 deviceInfo = (androidBodyObj == null) ? null : androidBodyObj.getA1();
OtherInfo otherInfo = (androidBodyObj == null) ? null : androidBodyObj.getI1();
if (deviceInfo == null) {
deviceInfo = new AndroidA1();
}
if (otherInfo == null) {
otherInfo = new OtherInfo();
}
List<D3> sensorList = deviceInfo.getD3();
if (CollectionUtils.isEmpty(sensorList)) {
sensorList = new ArrayList<>();
}
// ":"-separated dump of every sensor descriptor
StringBuilder sensorDump = new StringBuilder();
for (D3 sensor : sensorList) {
sensorDump.append(sensor.toString()).append(":");
}
// "/"-separated dump of the selected device attributes (order is significant)
StringBuilder idDump = new StringBuilder();
idDump.append(deviceInfo.getB6()).append("/")
.append(deviceInfo.getB2()).append("/")
.append(deviceInfo.getB15()).append("/")
.append(deviceInfo.getB1()).append("/")
.append(deviceInfo.getB3()).append("/")
.append(deviceInfo.getB14()).append("/")
.append(deviceInfo.getB8()).append("/")
// Build.FINGERPRINT intentionally excluded (was commented out upstream)
.append(deviceInfo.getD4().split("&")[0]).append("/")
.append(deviceInfo.getC3()).append("/");
String fingerprint = otherInfo.getI3() + otherInfo.getI4() + otherInfo.getI5()
+ ":" + sensorDump.toString() + ":" + idDump.toString();
System.out.println("GenDeviceIdV1 genAndroidDeviceIdHashV1 end! body:" + fingerprint);
return Md5Utils.string2MD5(fingerprint);
}
/**
 * Builds the V1 iOS device-id: SHA-256 of ap1+ap2+b1+b2+c3+ap5+ap6, suffixed
 * with the salt, lower-cased, then "," and the raw ap2 value appended. Null
 * body/section falls back to a default-valued {@link IosDeviceInfo}.
 *
 * @param iosBodyObj collected iOS payload; may be null
 * @return "&lt;lower-cased salted hash&gt;,&lt;ap2&gt;"
 */
public static String genIosDeviceIdHash(IosCollectionBody iosBodyObj) {
System.out.println("GenDeviceIdV1 genIosDeviceIdHash start!");
IosDeviceInfo deviceInfo = (iosBodyObj == null) ? null : iosBodyObj.getA1();
if (deviceInfo == null) {
deviceInfo = new IosDeviceInfo();
}
String seed = deviceInfo.getAp1() + deviceInfo.getAp2()
+ deviceInfo.getB1()
+ deviceInfo.getB2()
+ deviceInfo.getC3()
+ deviceInfo.getAp5()
+ deviceInfo.getAp6();
String deviceId = (HashUtil.genKeyHash(seed) + SALT).toLowerCase()
+ ","
+ deviceInfo.getAp2();
System.out.println("GenDeviceIdV1 genIosDeviceIdHash end! body:" + seed + ">>>ios:" + deviceId);
return deviceId;
}
}
package com.flink.util;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
/**
* 生成哈希值
* @author wjs
*
*/
public class HashUtil {
/**
 * Returns the SHA-256 digest of {@code inputString} as a lower-case hex string.
 *
 * Fix: the bytes are now taken with an explicit UTF-8 charset. The previous
 * {@code getBytes()} used the JVM's platform default encoding, so the same
 * input could hash differently on machines with different default charsets
 * (ASCII-only inputs are unaffected).
 *
 * @param inputString text to hash; must not be null
 * @return 64-char lower-case hex digest, or null if SHA-256 is unavailable
 */
public static String genKeyHash(String inputString) {
try {
// create the SHA-256 digest instance
MessageDigest digest = MessageDigest.getInstance("SHA-256");
// feed the input bytes using an explicit, platform-independent charset
digest.update(inputString.getBytes(java.nio.charset.StandardCharsets.UTF_8));
// finish the hash
byte[] hashBytes = digest.digest();
// convert to a lower-case hex string, zero-padding each byte to two digits
StringBuilder hashString = new StringBuilder(hashBytes.length * 2);
for (byte b : hashBytes) {
String hex = Integer.toHexString(0xff & b);
if (hex.length() == 1) {
hashString.append('0');
}
hashString.append(hex);
}
return hashString.toString();
} catch (NoSuchAlgorithmException e) {
// SHA-256 is mandatory on every conformant JRE, so this is effectively unreachable
e.printStackTrace();
}
return null;
}
}
package com.flink.util;
import java.io.InputStream;
import java.util.Properties;
/**
* @author wjs
* @version 创建时间:2024-12-19 17:10:09 类说明 读取文件信息
*/
public class LoadPropertiesFile {
/**
 * Reads the value of {@code proKey} from {@code application.properties} on the
 * classpath.
 *
 * Fixes: the InputStream is now closed via try-with-resources (it was leaked
 * before), and a missing resource now fails with a descriptive exception
 * instead of an NPE inside {@link Properties#load}.
 *
 * @param proKey property name to look up
 * @return the property value, or null when the key is absent
 * @throws RuntimeException when the file is missing or cannot be read
 */
public static String getPropertyFileValues(String proKey) {
try (InputStream is = LoadPropertiesFile.class.getClassLoader()
.getResourceAsStream("application.properties")) {
if (is == null) {
throw new IllegalStateException("application.properties not found on classpath");
}
Properties properties = new Properties();
properties.load(is);
return properties.getProperty(proKey);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
package com.flink.util;
import java.security.MessageDigest;
/**
* @author wjs
* @version 创建时间:2025-1-8 16:37:18
* 类说明
*/
public class Md5Utils {
/**
 * Returns the 32-char lower-case hex MD5 of {@code inStr}, or "" when the MD5
 * algorithm cannot be obtained.
 *
 * NOTE(review): each char is narrowed to a byte with a plain (byte) cast
 * instead of a charset encode, so characters above U+00FF are truncated.
 * Kept as-is deliberately — existing stored device-id hashes depend on this
 * exact byte mapping; do not "fix" it silently.
 *
 * @param inStr text to hash; must not be null
 * @return hex MD5 digest, or "" on algorithm failure
 */
public static String string2MD5(String inStr) {
MessageDigest md5 = null;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (Exception e) {
System.out.println(e.toString());
e.printStackTrace();
return "";
}
// legacy char -> byte narrowing (see NOTE above)
char[] charArray = inStr.toCharArray();
byte[] byteArray = new byte[charArray.length];
for (int i = 0; i < charArray.length; i++) {
byteArray[i] = (byte) charArray[i];
}
byte[] md5Bytes = md5.digest(byteArray);
// StringBuilder instead of StringBuffer: no shared state here, so the
// synchronized StringBuffer was needless overhead
StringBuilder hexValue = new StringBuilder(md5Bytes.length * 2);
for (byte md5Byte : md5Bytes) {
int val = md5Byte & 0xff;
if (val < 16) {
hexValue.append("0");
}
hexValue.append(Integer.toHexString(val));
}
return hexValue.toString();
}
}
package com.flink.util.ip2region;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.lionsoul.ip2region.xdb.Searcher;
import lombok.extern.log4j.Log4j2;
/**
* @author wjs
* @version 创建时间:2025-2-14 14:38:41
* 类说明 https://blog.csdn.net/qq_37284798/article/details/130005988
*/
@Log4j2
public class SearcherUtil {
// Looks up the region string for an IP address using a local ip2region .xdb database.
// NOTE(review): the db path is hard-coded to a Windows dev path — should come from
// configuration before this runs anywhere else.
public static String getCityInfoByFile(String ip) {
// 1. create the searcher object
String dbPath = "D:\\gitEagleEye\\eagleEye-service\\src\\main\\resources\\ip2region.xdb";
// String dbPath = "/home/opc/ip2region.xdb";
Searcher searcher;
try {
searcher = Searcher.newWithFileOnly(dbPath);
} catch (IOException e) {
log.error("failed to create searcher with `{}`: ", dbPath, e);
return null;
}
// 2. run the lookup, timing it for the log
try {
long sTime = System.nanoTime();
String region = searcher.search(ip);
long cost = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - sTime);
log.info("{region: {}, ioCount: {}, took: {} μs}", region, searcher.getIOCount(), cost);
return region;
} catch (Exception e) {
log.info("failed to search({}): ", ip, e);
}finally {
// always release the searcher's file handle
try {
searcher.close();
} catch (IOException e) {
log.info("failed to close({}): ", ip, e);
}
}
return null;
// 3. note: for concurrent use, each thread must create its own independent Searcher instance.
}
// Ad-hoc manual smoke test.
public static void main(String[] args) throws Exception {
getCityInfoByFile("1.9.241.214");
}
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
/**
* @author wjs
* @version 创建时间:2025-4-24 13:06:19
* 类说明
*/
@Data
public class EventIpConvert implements Serializable{
// Flat transport record pairing an event's IP with its resolved area name plus
// user/device identifiers; getters/setters/equals/hashCode generated by Lombok @Data.
// All fields are kept as plain strings (presumably to match the upstream log format —
// TODO confirm against the producing job).
/**
*
*/
private static final long serialVersionUID = 1L;
private String id;
private String ip;
// human-readable region resolved from `ip` (see SearcherUtil)
private String areaName;
private String deviceId;
private String cid;
private String phone;
private String nick;
private String createTime;
// partition date string, presumably yyyy-MM-dd — TODO confirm
private String dt;
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
/**
* @author wjs
* @version 创建时间:2025-4-24 15:21:04
* 类说明
*/
@Data
public class OdsCollectLog implements Serializable{
// Raw ODS-layer collect-log row; snake_case field names presumably mirror the source
// table/Kafka message columns (TODO confirm schema). Lombok @Data generates accessors.
// The *_info / user_properties fields hold nested payloads as raw strings (likely JSON —
// verify against the producer before parsing).
/**
*
*/
private static final long serialVersionUID = 1L;
private String id;
private String device_id;
private String uid;
private String strategy_group_id;
private String strategy_version;
private String send_time;
private String create_time;
private String app_key;
private String app_type;
private String app_channel;
private String zone_code;
private String zone_name;
private String zone_type;
private String sdk_version;
private String user_agent;
private String other_info;
private String device_info;
private String env_info;
private String user_properties;
private String unique_id;
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
/**
* @author wjs
* @version 创建时间:2025-4-24 15:24:21
* 类说明
*/
@Data
public class OdsEventLog implements Serializable{
// Raw ODS-layer event-log row; snake_case field names presumably mirror the source
// table/Kafka message columns (TODO confirm schema). Lombok @Data generates accessors.
// event_list and user_properties carry nested payloads as raw strings (likely JSON —
// verify against the producer before parsing).
/**
*
*/
private static final long serialVersionUID = 1L;
private String id;
private String send_time;
private String create_time;
private String strategy_group_id;
private String app_key;
private String app_type;
private String app_channel;
private String zone_code;
private String zone_name;
private String zone_type;
private String sdk_version;
private String user_agent;
private String device_id;
private String uid;
private String strategy_version;
private String event_list;
private String route_ip;
private String user_properties;
private String unique_id;
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-3-18 11:15:27
* 类说明
*/
@Data
@ToString
public class UserProperties implements Serializable{
// User identity attributes attached to collected events (id/cid/phone/email/nick);
// accessors and toString generated by Lombok.
/**
*
*/
private static final long serialVersionUID = 1L;
private String cid;
private String id;
private String phone;
private String email;
private String nick;
}
package com.flink.vo.android;
import java.io.Serializable;
import com.flink.vo.android.deviceInfo.AndroidA1;
import com.flink.vo.android.envInfo.AndroidEnvInfo;
import com.flink.vo.android.otherInfo.OtherInfo;
import lombok.Data;
/**
* @author wjs
* @version 创建时间:2024-11-21 17:46:27
* 类说明
*/
@Data
public class AndroidCollectionBody implements Serializable{
// Top-level Android collection payload: a1 = device info, g1 = environment info,
// i1 = other identifiers. The single-letter names are SDK wire codes — keep them
// stable; they must match the JSON sent by the client SDK.
/**
*
*/
private static final long serialVersionUID = 1L;
private AndroidA1 a1;
private AndroidEnvInfo g1;
private OtherInfo i1;
}
package com.flink.vo.android.deviceInfo;
import java.io.Serializable;
import java.util.List;
/**
* @author wjs
* @version 创建时间:2024-11-21 17:43:44
* 类说明
*/
public class AndroidA1 implements Serializable{
// Android device-info section of the collection payload. Field names are opaque SDK
// wire codes (b* / c* / d* / e* / f*); their exact meanings are not visible here —
// TODO confirm against the client SDK's field map before renaming anything.
// Every string field defaults to the sentinel "000000000000" (c3 to "0*0", b8 to
// "unknown") so device-id hashing never sees null (see GenDeviceIdV0/V1).
/**
*
*/
private static final long serialVersionUID = 1L;
// nested sections
private E2 e2;
private C7 c7;
private D5 d5;
// scalar attributes (sentinel-defaulted, see class comment)
private String d12= "000000000000";
private String f2= "000000000000";
private String d11= "000000000000";
private String d14= "000000000000";
private String d15= "000000000000";
private String f1= "000000000000";
// d4 is "&"-separated; device-id hashing uses only its first segment
private String d4= "000000000000";
private String d9= "000000000000";
private String d13= "000000000000";
// c3 looks like a WxH dimension string ("0*0" default) — TODO confirm
private String c3= "0*0";
private String c5= "000000000000";
private String c6= "000000000000";
private String b11= "000000000000";
private String b6= "000000000000";
private String b5= "000000000000";
private String b7= "000000000000";
private String b15= "000000000000";
private String b4= "000000000000";
private String b13= "000000000000";
private String b17= "000000000000";
private String b1= "000000000000";
private String b10= "000000000000";
private String b2= "000000000000";
private String b3= "000000000000";
private String b8= "unknown";
private String build_finger_print= "000000000000";
private String b19= "000000000000";
private String b9= "000000000000";
private String b14= "000000000000";
private D2 d2;
// sensor descriptors (see D3); may be null when the SDK sent none
private List<D3> d3;
private String c4= "000000000000";
// plain accessors below — no logic
public E2 getE2() {
return e2;
}
public void setE2(E2 e2) {
this.e2 = e2;
}
public C7 getC7() {
return c7;
}
public void setC7(C7 c7) {
this.c7 = c7;
}
public D5 getD5() {
return d5;
}
public void setD5(D5 d5) {
this.d5 = d5;
}
public String getD12() {
return d12;
}
public void setD12(String d12) {
this.d12 = d12;
}
public String getF2() {
return f2;
}
public void setF2(String f2) {
this.f2 = f2;
}
public String getD11() {
return d11;
}
public void setD11(String d11) {
this.d11 = d11;
}
public String getD14() {
return d14;
}
public void setD14(String d14) {
this.d14 = d14;
}
public String getD15() {
return d15;
}
public void setD15(String d15) {
this.d15 = d15;
}
public String getF1() {
return f1;
}
public void setF1(String f1) {
this.f1 = f1;
}
public String getD4() {
return d4;
}
public void setD4(String d4) {
this.d4 = d4;
}
public String getD9() {
return d9;
}
public void setD9(String d9) {
this.d9 = d9;
}
public String getD13() {
return d13;
}
public void setD13(String d13) {
this.d13 = d13;
}
public String getC3() {
return c3;
}
public void setC3(String c3) {
this.c3 = c3;
}
public String getC5() {
return c5;
}
public void setC5(String c5) {
this.c5 = c5;
}
public String getC6() {
return c6;
}
public void setC6(String c6) {
this.c6 = c6;
}
public String getB11() {
return b11;
}
public void setB11(String b11) {
this.b11 = b11;
}
public String getB6() {
return b6;
}
public void setB6(String b6) {
this.b6 = b6;
}
public String getB5() {
return b5;
}
public void setB5(String b5) {
this.b5 = b5;
}
public String getB7() {
return b7;
}
public void setB7(String b7) {
this.b7 = b7;
}
public String getB15() {
return b15;
}
public void setB15(String b15) {
this.b15 = b15;
}
public String getB4() {
return b4;
}
public void setB4(String b4) {
this.b4 = b4;
}
public String getB13() {
return b13;
}
public void setB13(String b13) {
this.b13 = b13;
}
public String getB17() {
return b17;
}
public void setB17(String b17) {
this.b17 = b17;
}
public String getB1() {
return b1;
}
public void setB1(String b1) {
this.b1 = b1;
}
public String getB10() {
return b10;
}
public void setB10(String b10) {
this.b10 = b10;
}
public String getB2() {
return b2;
}
public void setB2(String b2) {
this.b2 = b2;
}
public String getB3() {
return b3;
}
public void setB3(String b3) {
this.b3 = b3;
}
public String getB8() {
return b8;
}
public void setB8(String b8) {
this.b8 = b8;
}
public String getBuild_finger_print() {
return build_finger_print;
}
public void setBuild_finger_print(String build_finger_print) {
this.build_finger_print = build_finger_print;
}
public String getB19() {
return b19;
}
public void setB19(String b19) {
this.b19 = b19;
}
public String getB9() {
return b9;
}
public void setB9(String b9) {
this.b9 = b9;
}
public String getB14() {
return b14;
}
public void setB14(String b14) {
this.b14 = b14;
}
public D2 getD2() {
return d2;
}
public void setD2(D2 d2) {
this.d2 = d2;
}
public List<D3> getD3() {
return d3;
}
public void setD3(List<D3> d3) {
this.d3 = d3;
}
public String getC4() {
return c4;
}
public void setC4(String c4) {
this.c4 = c4;
}
}
package com.flink.vo.android.deviceInfo;
import java.io.Serializable;
/**
* @author wjs
* @version 创建时间:2024-11-21 17:54:43
* 类说明
*/
public class C7 implements Serializable{
// Sub-section of AndroidA1 holding two user-agent-like strings (ua1/ua2).
// Field names are SDK wire codes; exact semantics not visible here — TODO confirm.
// Sentinel default "000000000000" keeps hashing null-safe.
/**
*
*/
private static final long serialVersionUID = 1L;
private String ua1= "000000000000";
private String ua2= "000000000000";
public String getUa1() {
return ua1;
}
public void setUa1(String ua1) {
this.ua1 = ua1;
}
public String getUa2() {
return ua2;
}
public void setUa2(String ua2) {
this.ua2 = ua2;
}
}
package com.flink.vo.android.deviceInfo;
import java.io.Serializable;
/**
* @author wjs
* @version 创建时间:2024-11-21 18:14:23
* 类说明
*/
public class D2 implements Serializable{
// Sub-section of AndroidA1 carrying the device's total memory, kept as a string
// (units not visible here — TODO confirm bytes vs MB with the SDK).
/**
*
*/
private static final long serialVersionUID = 1L;
private String totalMemory= "000000000000";
public String getTotalMemory() {
return totalMemory;
}
public void setTotalMemory(String totalMemory) {
this.totalMemory = totalMemory;
}
}
package com.flink.vo.android.deviceInfo;
import java.io.Serializable;
/**
* @author wjs
* @version 创建时间:2025-1-8 15:53:13
* 类说明
*/
public class D3 implements Serializable{
// One hardware-sensor descriptor. Per the toString() labels below, the wire codes
// map as: j1=type, j2=name, j3=version, j4=vendor, j5=max_range, j6=min_delay,
// j7=power, j8=resolution.
/**
*
*/
private static final long serialVersionUID = 1L;
private int j1 = 0;
private String j2 = "000000000000";
private int j3 = 0;
private String j4 = "000000000000";
private float j5 = 0;
private int j6 = 0;
private float j7 = 0;
private float j8 = 0;
// This rendering is part of the device-id fingerprint (GenDeviceIdV0/V1 hash it) —
// changing the format changes every generated device id.
@Override
public String toString() {
return "SensorDevice{" +
"type=" + j1 +
", name='" + j2 + '\'' +
", version=" + j3 +
", vendor='" + j4 + '\'' +
", max_range=" + j5 +
", min_delay=" + j6 +
", power=" + j7 +
", resolution=" + j8 +
'}';
}
public int getJ1() {
return j1;
}
public void setJ1(int j1) {
this.j1 = j1;
}
public String getJ2() {
return j2;
}
public void setJ2(String j2) {
this.j2 = j2;
}
public int getJ3() {
return j3;
}
public void setJ3(int j3) {
this.j3 = j3;
}
public String getJ4() {
return j4;
}
public void setJ4(String j4) {
this.j4 = j4;
}
public float getJ5() {
return j5;
}
public void setJ5(float j5) {
this.j5 = j5;
}
public int getJ6() {
return j6;
}
public void setJ6(int j6) {
this.j6 = j6;
}
public float getJ7() {
return j7;
}
public void setJ7(float j7) {
this.j7 = j7;
}
public float getJ8() {
return j8;
}
public void setJ8(float j8) {
this.j8 = j8;
}
}
package com.flink.vo.android.deviceInfo;
import java.io.Serializable;
/**
* @author wjs
* @version 创建时间:2024-11-21 17:56:41
* 类说明
*/
public class D5 implements Serializable{
// Sub-section of AndroidA1 with four string attributes (k1..k4). Wire codes;
// semantics not visible here — TODO confirm with the SDK field map.
// Sentinel default "000000000000" keeps downstream use null-safe.
/**
*
*/
private static final long serialVersionUID = 1L;
private String k1= "000000000000";
private String k3= "000000000000";
private String k4= "000000000000";
private String k2= "000000000000";
public String getK1() {
return k1;
}
public void setK1(String k1) {
this.k1 = k1;
}
public String getK3() {
return k3;
}
public void setK3(String k3) {
this.k3 = k3;
}
public String getK4() {
return k4;
}
public void setK4(String k4) {
this.k4 = k4;
}
public String getK2() {
return k2;
}
public void setK2(String k2) {
this.k2 = k2;
}
}
package com.flink.vo.android.deviceInfo;
import java.io.Serializable;
/**
* @author wjs
* @version 创建时间:2024-11-21 17:51:01
* 类说明
*/
public class E2 implements Serializable{
// Sub-section of AndroidA1 with two string attributes (n4/n1). Wire codes;
// semantics not visible here — TODO confirm with the SDK field map.
/**
*
*/
private static final long serialVersionUID = 1L;
private String n4= "000000000000";
private String n1= "000000000000";
public String getN4() {
return n4;
}
public void setN4(String n4) {
this.n4 = n4;
}
public String getN1() {
return n1;
}
public void setN1(String n1) {
this.n1 = n1;
}
}
package com.flink.vo.android.envInfo;
import java.io.Serializable;
import java.util.List;
import com.flink.vo.android.otherInfo.H1;
/**
* @author wjs
* @version 创建时间:2024-11-21 17:43:34
* 类说明
*/
public class AndroidEnvInfo implements Serializable{
// Android runtime-environment section of the collection payload: nested h2/g8
// sections, a list of h1 entries (installed-app descriptors, per H1's fields),
// and scalar g2..g6 attributes. Wire-code names; semantics not visible here —
// TODO confirm with the SDK field map.
/**
*
*/
private static final long serialVersionUID = 1L;
private H2 h2;
private G8 g8;
private List<H1> h1;
private String g3= "000000000000";
private String g2= "000000000000";
private String g5= "000000000000";
private String g6= "000000000000";
private String g4= "000000000000";
public H2 getH2() {
return h2;
}
public void setH2(H2 h2) {
this.h2 = h2;
}
public G8 getG8() {
return g8;
}
public void setG8(G8 g8) {
this.g8 = g8;
}
public List<H1> getH1() {
return h1;
}
public void setH1(List<H1> h1) {
this.h1 = h1;
}
public String getG3() {
return g3;
}
public void setG3(String g3) {
this.g3 = g3;
}
public String getG2() {
return g2;
}
public void setG2(String g2) {
this.g2 = g2;
}
public String getG5() {
return g5;
}
public void setG5(String g5) {
this.g5 = g5;
}
public String getG6() {
return g6;
}
public void setG6(String g6) {
this.g6 = g6;
}
public String getG4() {
return g4;
}
public void setG4(String g4) {
this.g4 = g4;
}
}
package com.flink.vo.android.envInfo;
import java.io.Serializable;
/**
* @author wjs
* @version 创建时间:2024-11-21 18:59:05
* 类说明
*/
public class G8 implements Serializable{
// Environment fingerprints keyed by system directory (field names suggest
// /system/lib, /system/lib64, /vendor/lib, /vendor/lib64 and /system/fonts —
// presumably hashes or listings of those paths; TODO confirm what the SDK stores).
/**
*
*/
private static final long serialVersionUID = 1L;
private String system_lib64= "000000000000";
private String system_lib= "000000000000";
private String vendor_lib64= "000000000000";
private String system_fonts= "000000000000";
private String vendor_lib= "000000000000";
public String getSystem_lib64() {
return system_lib64;
}
public void setSystem_lib64(String system_lib64) {
this.system_lib64 = system_lib64;
}
public String getSystem_lib() {
return system_lib;
}
public void setSystem_lib(String system_lib) {
this.system_lib = system_lib;
}
public String getVendor_lib64() {
return vendor_lib64;
}
public void setVendor_lib64(String vendor_lib64) {
this.vendor_lib64 = vendor_lib64;
}
public String getSystem_fonts() {
return system_fonts;
}
public void setSystem_fonts(String system_fonts) {
this.system_fonts = system_fonts;
}
public String getVendor_lib() {
return vendor_lib;
}
public void setVendor_lib(String vendor_lib) {
this.vendor_lib = vendor_lib;
}
}
package com.flink.vo.android.envInfo;
import java.io.Serializable;
/**
* @author wjs
* @version 创建时间:2024-11-21 18:25:41
* 类说明
*/
public class H2 implements Serializable{
// Sub-section of AndroidEnvInfo with three string attributes (p3/p2/p5).
// Wire codes; semantics not visible here — TODO confirm with the SDK field map.
/**
*
*/
private static final long serialVersionUID = 1L;
private String p3= "000000000000";
private String p2= "000000000000";
private String p5= "000000000000";
public String getP3() {
return p3;
}
public void setP3(String p3) {
this.p3 = p3;
}
public String getP2() {
return p2;
}
public void setP2(String p2) {
this.p2 = p2;
}
public String getP5() {
return p5;
}
public void setP5(String p5) {
this.p5 = p5;
}
}
package com.flink.vo.android.otherInfo;
import java.io.Serializable;
/**
* @author wjs
* @version 创建时间:2024-11-21 18:27:52
* 类说明
*/
public class H1 implements Serializable{
// One installed-application entry (field names: package_name, version_name/code,
// install/update times, APK source dir). o1 is a wire code with unknown meaning —
// TODO confirm. Time fields are strings; format/units not visible here.
/**
*
*/
private static final long serialVersionUID = 1L;
private String o1= "000000000000";
private String in_time= "000000000000";
private String version_name= "000000000000";
private String package_name= "000000000000";
private String version_code= "000000000000";
private String up_time= "000000000000";
private String source_dir= "000000000000";
public String getO1() {
return o1;
}
public void setO1(String o1) {
this.o1 = o1;
}
public String getIn_time() {
return in_time;
}
public void setIn_time(String in_time) {
this.in_time = in_time;
}
public String getVersion_name() {
return version_name;
}
public void setVersion_name(String version_name) {
this.version_name = version_name;
}
public String getPackage_name() {
return package_name;
}
public void setPackage_name(String package_name) {
this.package_name = package_name;
}
public String getVersion_code() {
return version_code;
}
public void setVersion_code(String version_code) {
this.version_code = version_code;
}
public String getUp_time() {
return up_time;
}
public void setUp_time(String up_time) {
this.up_time = up_time;
}
public String getSource_dir() {
return source_dir;
}
public void setSource_dir(String source_dir) {
this.source_dir = source_dir;
}
}
package com.flink.vo.android.otherInfo;
import java.io.Serializable;
/**
* @author wjs
* @version 创建时间:2024-11-21 18:19:10
* 类说明
*/
public class I6 implements Serializable{
// Boot-id / UUID values, apparently read via three different mechanisms (sys_*,
// cat_*, f_* prefixes — presumably sysfs API, `cat`, and direct file read; TODO
// confirm with the SDK). Used for device identity correlation.
/**
*
*/
private static final long serialVersionUID = 1L;
private String sys_boot_id= "000000000000";
private String sys_uuid= "000000000000";
private String cat_boot_id= "000000000000";
private String f_boot_id= "000000000000";
private String cat_uuid= "000000000000";
private String f_uuid= "000000000000";
public String getSys_boot_id() {
return sys_boot_id;
}
public void setSys_boot_id(String sys_boot_id) {
this.sys_boot_id = sys_boot_id;
}
public String getSys_uuid() {
return sys_uuid;
}
public void setSys_uuid(String sys_uuid) {
this.sys_uuid = sys_uuid;
}
public String getCat_boot_id() {
return cat_boot_id;
}
public void setCat_boot_id(String cat_boot_id) {
this.cat_boot_id = cat_boot_id;
}
public String getF_boot_id() {
return f_boot_id;
}
public void setF_boot_id(String f_boot_id) {
this.f_boot_id = f_boot_id;
}
public String getCat_uuid() {
return cat_uuid;
}
public void setCat_uuid(String cat_uuid) {
this.cat_uuid = cat_uuid;
}
public String getF_uuid() {
return f_uuid;
}
public void setF_uuid(String f_uuid) {
this.f_uuid = f_uuid;
}
}
package com.flink.vo.android.otherInfo;
import java.io.Serializable;
/**
* @author wjs
* @version 创建时间:2024-11-21 18:13:10
* 类说明
*/
public class I7 implements Serializable{
// Single-field section holding a MAC address obtained via an API call
// (per the field name; exact source API not visible here — TODO confirm).
/**
*
*/
private static final long serialVersionUID = 1L;
private String api_mac= "000000000000";
public String getApi_mac() {
return api_mac;
}
public void setApi_mac(String api_mac) {
this.api_mac = api_mac;
}
}
package com.flink.vo.android.otherInfo;
import java.io.Serializable;
/**
* @author wjs
* @version 创建时间:2024-11-21 17:43:24
* 类说明
*/
public class OtherInfo implements Serializable{
// "Other identifiers" section of the Android payload: nested i7 (MAC) and i6
// (boot ids/UUIDs) plus scalar ids i3/i4/i5. i3+i4+i5 form the prefix of the V1
// device-id hash (see GenDeviceIdV1), so their sentinel defaults keep hashing
// null-safe. Exact meanings of i3/i4/i5 are not visible here — TODO confirm.
/**
*
*/
private static final long serialVersionUID = 1L;
private I7 i7;
private I6 i6;
private String i3= "000000000000";
private String i4= "000000000000";
private String i5= "000000000000";
public I7 getI7() {
return i7;
}
public void setI7(I7 i7) {
this.i7 = i7;
}
public I6 getI6() {
return i6;
}
public void setI6(I6 i6) {
this.i6 = i6;
}
public String getI3() {
return i3;
}
public void setI3(String i3) {
this.i3 = i3;
}
public String getI4() {
return i4;
}
public void setI4(String i4) {
this.i4 = i4;
}
public String getI5() {
return i5;
}
public void setI5(String i5) {
this.i5 = i5;
}
}
package com.flink.vo.ios;

import java.io.Serializable;

/**
 * @author wjs
 * @version Created: 2024-12-10 15:37:00
 * Top-level iOS collection payload: pairs an environment-info block (g1)
 * with a device-info block (a1).
 */
public class IosCollectionBody implements Serializable {

    // Fix: every other VO in this package (IosEnvInfo, IosDeviceInfo, the
    // android otherInfo classes) implements Serializable with an explicit
    // serialVersionUID; this aggregate did not, which would make Java
    // serialization of the whole payload fail with a
    // NotSerializableException. Adding the interface is backward-compatible.
    private static final long serialVersionUID = 1L;

    /** Environment info (obfuscated key "g1"). */
    private IosEnvInfo g1;

    /** Device info (obfuscated key "a1"). */
    private IosDeviceInfo a1;

    public IosEnvInfo getG1() {
        return g1;
    }

    public void setG1(IosEnvInfo g1) {
        this.g1 = g1;
    }

    public IosDeviceInfo getA1() {
        return a1;
    }

    public void setA1(IosDeviceInfo a1) {
        this.a1 = a1;
    }
}
package com.flink.vo.ios;
import java.io.Serializable;
/**
* @author wjs
* @version 创建时间:2024-12-10 15:38:25 类说明
*/
public class IosDeviceInfo implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Placeholder used until a real value is collected. */
    private static final String UNSET = "000000000000";

    // Obfuscated device-info keys, grouped by prefix for readability.
    // Their exact semantics are not determinable from this file alone.
    private String ap1 = UNSET;
    private String ap2 = UNSET;
    private String ap4 = UNSET;
    private String ap5 = UNSET;
    private String ap6 = UNSET;
    private String ap7 = UNSET;

    private String b1 = UNSET;
    private String b2 = UNSET;
    private String b4 = UNSET;

    // c3 defaults to "0*0" — looks like a WxH screen-resolution placeholder
    // (unconfirmed), unlike every other field's "000000000000" default.
    private String c3 = "0*0";
    private String c7 = UNSET;

    private String d7 = UNSET;
    private String d8 = UNSET;
    private String d9 = UNSET;
    private String d10 = UNSET;
    private String d11 = UNSET;
    private String d12 = UNSET;

    private String f1 = UNSET;
    private String f2 = UNSET;
    private String f3 = UNSET;
    private String f8 = UNSET;

    private String l1 = UNSET;
    private String l2 = UNSET;
    private String l3 = UNSET;
    private String l4 = UNSET;
    private String l10 = UNSET;

    public String getAp1() { return ap1; }

    public void setAp1(String ap1) { this.ap1 = ap1; }

    public String getAp2() { return ap2; }

    public void setAp2(String ap2) { this.ap2 = ap2; }

    public String getAp4() { return ap4; }

    public void setAp4(String ap4) { this.ap4 = ap4; }

    public String getAp5() { return ap5; }

    public void setAp5(String ap5) { this.ap5 = ap5; }

    public String getAp6() { return ap6; }

    public void setAp6(String ap6) { this.ap6 = ap6; }

    public String getAp7() { return ap7; }

    public void setAp7(String ap7) { this.ap7 = ap7; }

    public String getB1() { return b1; }

    public void setB1(String b1) { this.b1 = b1; }

    public String getB2() { return b2; }

    public void setB2(String b2) { this.b2 = b2; }

    public String getB4() { return b4; }

    public void setB4(String b4) { this.b4 = b4; }

    public String getC3() { return c3; }

    public void setC3(String c3) { this.c3 = c3; }

    public String getC7() { return c7; }

    public void setC7(String c7) { this.c7 = c7; }

    public String getD7() { return d7; }

    public void setD7(String d7) { this.d7 = d7; }

    public String getD8() { return d8; }

    public void setD8(String d8) { this.d8 = d8; }

    public String getD9() { return d9; }

    public void setD9(String d9) { this.d9 = d9; }

    public String getD10() { return d10; }

    public void setD10(String d10) { this.d10 = d10; }

    public String getD11() { return d11; }

    public void setD11(String d11) { this.d11 = d11; }

    public String getD12() { return d12; }

    public void setD12(String d12) { this.d12 = d12; }

    public String getF1() { return f1; }

    public void setF1(String f1) { this.f1 = f1; }

    public String getF2() { return f2; }

    public void setF2(String f2) { this.f2 = f2; }

    public String getF3() { return f3; }

    public void setF3(String f3) { this.f3 = f3; }

    public String getF8() { return f8; }

    public void setF8(String f8) { this.f8 = f8; }

    public String getL1() { return l1; }

    public void setL1(String l1) { this.l1 = l1; }

    public String getL2() { return l2; }

    public void setL2(String l2) { this.l2 = l2; }

    public String getL3() { return l3; }

    public void setL3(String l3) { this.l3 = l3; }

    public String getL4() { return l4; }

    public void setL4(String l4) { this.l4 = l4; }

    public String getL10() { return l10; }

    public void setL10(String l10) { this.l10 = l10; }
}
package com.flink.vo.ios;
import java.io.Serializable;
/**
* @author wjs
* @version 创建时间:2024-12-10 15:38:36
* 类说明
*/
public class IosEnvInfo implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Placeholder used until a real value is collected. */
    private static final String UNSET = "000000000000";

    // Mostly obfuscated environment keys; "speed" and "course" suggest
    // location/motion data (unconfirmed from this file alone).
    private String g2 = UNSET;
    private String g3 = UNSET;
    private String p2 = UNSET;
    private String q2 = UNSET;
    private String q3 = UNSET;
    private String ap8 = UNSET;
    private String ap9 = UNSET;
    private String speed = UNSET;
    private String course = UNSET;

    public String getG2() { return g2; }

    public void setG2(String g2) { this.g2 = g2; }

    public String getG3() { return g3; }

    public void setG3(String g3) { this.g3 = g3; }

    public String getP2() { return p2; }

    public void setP2(String p2) { this.p2 = p2; }

    public String getQ2() { return q2; }

    public void setQ2(String q2) { this.q2 = q2; }

    public String getQ3() { return q3; }

    public void setQ3(String q3) { this.q3 = q3; }

    public String getAp8() { return ap8; }

    public void setAp8(String ap8) { this.ap8 = ap8; }

    public String getAp9() { return ap9; }

    public void setAp9(String ap9) { this.ap9 = ap9; }

    public String getSpeed() { return speed; }

    public void setSpeed(String speed) { this.speed = speed; }

    public String getCourse() { return course; }

    public void setCourse(String course) { this.course = course; }
}
# Kafka cluster bootstrap servers
#kafka.bootstrapServers=140.245.125.203:9092
kafka.bootstrapServers=168.138.185.142:9092,213.35.103.223:9092,129.150.49.247:9092
# Kafka topic
kafka.topic=ods_collect_log
# Kafka consumer group
kafka.group=collectGroup
doris.jdbc_url=jdbc:mysql://140.245.112.44:9030/bi?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true
doris.username=root
doris.driver_class_name=com.mysql.cj.jdbc.Driver
\ No newline at end of file
rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp
rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment