Commit da551c2d by 魏建枢

Code adjustments

parent 2bd5e5be
@@ -5,50 +5,6 @@
<artifactId>eagleEye-flink_kafka</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.xml</include>
</includes>
</resource>
</resources>
<finalName>${project.name}</finalName>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
<configuration>
<fork>true</fork>
<finalName>${project.build.finalName}</finalName>
</configuration>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.plugin.version}</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>UTF-8</encoding>
<compilerArgs>
<arg>-parameters</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
@@ -62,10 +18,7 @@
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>org.google.code.flindbugs:jar305</exclude>
<exclude>org.slf4j:*</exclude>
<excluder>org.apache.logging.log4j:*</excluder>
<exclude>com.google.code.findbugs:jsr305</exclude>
</excludes>
</artifactSet>
<filters>
@@ -82,6 +35,7 @@
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.flink.KafkaStreamingJob</mainClass>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
</transformers>
</configuration>
</execution>
@@ -109,38 +63,28 @@
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.17.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>1.20.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
</dependencies>
<properties>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>8</java.version>
<maven.plugin.version>3.8.1</maven.plugin.version>
<flink.version>1.20.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.target>1.8</maven.compiler.target>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<log4j.version>2.17.1</log4j.version>
<flink.version>1.20.0</flink.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<fastjson.version>1.2.75</fastjson.version>
<maven.plugin.version>3.8.1</maven.plugin.version>
</properties>
</project>
@@ -11,7 +11,8 @@
<!-- Property settings -->
<properties>
<!-- Java JDK version -->
<java.version>8</java.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<!-- Maven packaging plugin -->
<maven.plugin.version>3.8.1</maven.plugin.version>
<!-- Compile encoding: UTF-8 -->
@@ -28,8 +29,20 @@
<scala.binary.version>2.11</scala.binary.version>
</properties>
<!-- Common dependencies -->
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.20.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>1.20.0</version>
</dependency>
<!-- json -->
<dependency>
<groupId>com.alibaba</groupId>
@@ -80,7 +93,6 @@
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>
<!--================================ External dependencies ================================ -->
<!-- Logging framework: start -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
@@ -99,9 +111,7 @@
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<!-- Logging framework: end -->
<!--================================ External dependencies ================================ -->
<!-- Kafka dependencies: start -->
<dependency>
<groupId>org.apache.flink</groupId>
@@ -123,14 +133,29 @@
</dependency>
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.16</artifactId>
<artifactId>flink-doris-connector-1.20</artifactId>
<version>25.0.0</version>
</dependency>
<!-- <dependency> -->
<!-- <groupId>org.apache.hadoop</groupId> -->
<!-- <artifactId>hadoop-client</artifactId> -->
<!-- <version>3.4.0</version> -->
<!-- </dependency> -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.6</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.3.6</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.3.6</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
@@ -141,24 +166,18 @@
<artifactId>ip2region</artifactId>
<version>2.6.4</version>
</dependency>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<!-- Choose the path according to the JDK version -->
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
</dependencies>
<!-- Build and packaging -->
<build>
<finalName>${project.name}</finalName>
<!-- Resource packaging -->
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.xml</include>
</includes>
</resource>
</resources>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -173,14 +192,13 @@
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>org.google.code.flindbugs:jar305</exclude>
<exclude>org.slf4j:*</exclude>
<excluder>org.apache.logging.log4j:*</excluder>
<exclude>com.google.code.findbugs:jsr305</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder. Otherwise,
this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
@@ -192,51 +210,17 @@
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- Replace this with the main class of your job -->
<mainClass>com.flink.KafkaStreamingJob</mainClass>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<!-- Centralized plugin management -->
<pluginManagement>
<plugins>
<!-- Maven packaging plugin -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
<configuration>
<fork>true</fork>
<finalName>${project.build.finalName}</finalName>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.plugin.version}</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>UTF-8</encoding>
<compilerArgs>
<arg>-parameters</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<!-- Remote repositories used by this Maven project -->
......
@@ -4,7 +4,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.achieve.ods.OdsEventLogSourceAchi;
import com.flink.common.KafkaSourceConnector;
/**
* @author wjs
@@ -13,28 +12,10 @@ import com.flink.common.KafkaSourceConnector;
*/
public class KafkaStreamingJob {
private static final Logger logger = LoggerFactory.getLogger(KafkaSourceConnector.class);
private static final Logger logger = LoggerFactory.getLogger(KafkaStreamingJob.class);
public static void main(String[] args) throws Exception {
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// DataStreamSource<String> dataStreamSource = KafkaSourceConnector.sourceKafka(env, "ods_collect_log", "collectGroup");
// //================= 5. Simple data processing ======================
// dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
// /**
// *
// */
// private static final long serialVersionUID = 1L;
// @Override
// public void flatMap(String record, Collector<String> collector) throws Exception {
// logger.info("Preprocessing source record: {}", record);
// }
// });
// //================= 6. Start the job =========================================
// env.execute("Aggregation Job");
// OdsCollectLogSourceAchi sourceCommonBase = new OdsCollectLogSourceAchi();
// sourceCommonBase.handleDataStreamSource("test", "ods_collect_log", "collectGroup");
OdsEventLogSourceAchi sourceEventLog = new OdsEventLogSourceAchi();
sourceEventLog.handleDataStreamSource("Event Log Aggregation Job", "ods_event_log", "eventLogGroup");
}
......
package com.flink.achieve.ods;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -16,9 +11,7 @@ import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.SourceCommonBase;
import com.flink.enums.AppTypeEnum;
import com.flink.util.LoadPropertiesFile;
import com.flink.vo.OdsCollectLog;
import com.flink.vo.EventIpConvert;
import com.flink.vo.android.AndroidCollectionBody;
import com.flink.vo.android.deviceInfo.AndroidA1;
import com.flink.vo.android.envInfo.AndroidEnvInfo;
@@ -41,7 +34,7 @@ public class OdsCollectLogSourceAchi extends SourceCommonBase implements Seriali
private static final Logger logger = LoggerFactory.getLogger(OdsCollectLogSourceAchi.class);
@Override
public void parseSourceKafkaJson(String record) throws Exception {
public JSONObject parseSourceKafkaJson(String record) throws Exception {
logger.info("record:{}",record);
// TODO ETL processing of the record
OdsCollectLog odsCollectLog = JSONObject.parseObject(record,new TypeReference<OdsCollectLog>(){});
@@ -82,6 +75,7 @@ public class OdsCollectLogSourceAchi extends SourceCommonBase implements Seriali
iosBodyObj.setA1(a1);
iosBodyObj.setG1(g1);
}
return null;
}
@Override
......
package com.flink.achieve.ods;
import java.io.Serializable;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -23,9 +15,7 @@ import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.SourceCommonBase;
import com.flink.enums.AppTypeEnum;
import com.flink.util.LoadPropertiesFile;
import com.flink.util.ip2region.SearcherUtil;
import com.flink.vo.EventIpConvert;
import com.flink.vo.OdsEventLog;
import com.flink.vo.UserProperties;
@@ -43,7 +33,7 @@ public class OdsEventLogSourceAchi extends SourceCommonBase implements Serializa
private static final Logger logger = LoggerFactory.getLogger(OdsEventLogSourceAchi.class);
@Override
public void parseSourceKafkaJson(String record) throws ParseException, Exception {
public JSONObject parseSourceKafkaJson(String record) throws ParseException, Exception {
logger.info("OdsEventLogSourceAchi record:{}",record);
// TODO ETL processing of the record
OdsEventLog odsEventLog = JSONObject.parseObject(record,new TypeReference<OdsEventLog>(){});
@@ -66,7 +56,7 @@ public class OdsEventLogSourceAchi extends SourceCommonBase implements Serializa
String routeIp = odsEventLog.getRoute_ip();
String userProperties = odsEventLog.getUser_properties();
String uniqueId = odsEventLog.getUnique_id();
logger.info("组装数据 body:{}",odsEventLog.toString());
String cid = null;
String phone = null;
String nick = null;
@@ -88,40 +78,37 @@ public class OdsEventLogSourceAchi extends SourceCommonBase implements Serializa
}
}
}
List<String> ips = convertStringToList(routeIp);
List<String> ips = SearcherUtil.convertStringToList(routeIp);
if(CollectionUtils.isEmpty(ips)) {
return;
return null;
}
List<String> ipList = new ArrayList<>();
List<String> areaNameList = new ArrayList<>();
for(String ip:ips) {
if(!ipv6(ip)) {
if(!SearcherUtil.ipv6(ip)) {
String area_name = SearcherUtil.getCityInfoByFile(ip);
ipList.add(ip);
areaNameList.add(area_name);
}
}
logger.info("组装数据开始");
List<EventIpConvert> eventIpConvertList = new ArrayList<>();
EventIpConvert eventIpConvert = new EventIpConvert();
eventIpConvert.setId(id);
eventIpConvert.setIp(ipList.toString());
eventIpConvert.setAreaName(areaNameList.toString());
eventIpConvert.setDeviceId(deviceId);
eventIpConvert.setCid(cid);
eventIpConvert.setPhone(phone);
eventIpConvert.setNick(nick);
eventIpConvert.setCreateTime(createTime);
eventIpConvert.setDt(createTime);
eventIpConvertList.add(eventIpConvert);
logger.info("组装数据结束");
this.insert(eventIpConvertList);
JSONObject jsonObj = new JSONObject();
jsonObj.put("id", id);
jsonObj.put("ips", ips.toString());
jsonObj.put("areaNameList", areaNameList.toString());
jsonObj.put("deviceId", deviceId);
jsonObj.put("cid", cid);
jsonObj.put("phone", phone);
jsonObj.put("nick", nick);
jsonObj.put("createTime", createTime);
if(StringUtils.equals(appType, AppTypeEnum.ANDROID.getCode())) {
}else if(StringUtils.equals(appType, AppTypeEnum.IOS.getCode())) {
}
return jsonObj;
}
@Override
@@ -130,66 +117,6 @@ public class OdsEventLogSourceAchi extends SourceCommonBase implements Serializa
}
public static List<String> convertStringToList(String str) {
if (StringUtils.isEmpty(str) || str.trim().isEmpty()) {
return new ArrayList<>(); // return an empty list
}
// strip the square brackets and surrounding whitespace
String trimmedStr = str.replaceAll("^\\[|\\]$", "").trim();
// if nothing remains after stripping the brackets, return an empty list
if (trimmedStr.isEmpty()) {
return new ArrayList<>();
}
// split on commas and trim each entry
return Arrays.stream(trimmedStr.split(","))
.map(String::trim)
.collect(Collectors.toList());
}
private static Boolean ipv6(String ip) {
Boolean flag = false;
try {
Inet6Address address = (Inet6Address) InetAddress.getByName(ip);
System.out.println(ip + " is an IPv6 address.");
flag = true;
} catch (UnknownHostException e) {
System.out.println(ip + " is not a valid IPv6 address.");
} catch (ClassCastException e) {
System.out.println(ip + " is not an IPv6 address.");
}
return flag;
}
public static void insert(List<EventIpConvert> eventIpConvertList) throws Exception {
logger.info("insert eventIpConvertList:{}",eventIpConvertList.toString());
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.fromData(eventIpConvertList)
.addSink(JdbcSink.sink(
"INSERT INTO `bi`.`event_ip_convert` " +
"(`id`, `ip`, `area_name`, `device_id`, `cid`, `phone`, `nick`, `create_time`, `dt`) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);",
(ps, data) -> {
ps.setString(1, data.getId());
ps.setString(2, data.getIp());
ps.setString(3, data.getAreaName());
ps.setString(4, data.getDeviceId());
ps.setString(5, data.getCid());
ps.setString(6, data.getPhone());
ps.setString(7, data.getNick());
ps.setString(8, data.getCreateTime());
ps.setString(9, data.getDt());
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUsername(LoadPropertiesFile.getPropertyFileValues("doris.username"))
.withPassword("")
.withDriverName(LoadPropertiesFile.getPropertyFileValues("doris.driver_class_name"))
.withUrl(LoadPropertiesFile.getPropertyFileValues("doris.jdbc_url"))
.build()
));
env.execute();
}
}
package com.flink.common;
import java.util.Properties;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.LoadConstants;
import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import com.flink.util.LoadPropertiesFile;
/**
* @author wjs
* @version Created: 2024-12-16 18:21:22
* Description: Doris sink
*/
public class DorisConnector {
public static DorisSink<RowData> sinkDoris(String[] fields,DataType[] types,String tableName) {
//================= Set properties =========================================
Properties streamLoadProps = new Properties();
streamLoadProps.setProperty("format", "json");
streamLoadProps.setProperty("read_json_by_line", "true");
streamLoadProps.setProperty("strip_outer_array", "false");
streamLoadProps.setProperty("sink.enable-2pc", "true");
//================= Doris sink configuration =========================================
DorisOptions dorisOptions = DorisOptions.builder()
.setFenodes(LoadPropertiesFile.getPropertyFileValues("doris.fe"))
.setTableIdentifier(tableName)
.setUsername(LoadPropertiesFile.getPropertyFileValues("doris.username"))
.setPassword("")
.build();
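// The table identifier passed in must be fully qualified as "db.table", e.g. "bi.event_ip_convert" as used by SourceCommonBase.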
//================= Stream processing =========================================
DorisSink<RowData> dorisSink = DorisSink.<RowData>builder()
.setDorisOptions(dorisOptions)
.setDorisExecutionOptions(DorisExecutionOptions.builder()
.setLabelPrefix("label-doris")
.setStreamLoadProp(streamLoadProps)
.build())
.setSerializer(RowDataSerializer.builder()
.setFieldNames(fields)
.setFieldType(types)
.setType(LoadConstants.JSON)
.build())
.build();
return dorisSink;
}
}
@@ -19,23 +19,38 @@ public class EnvironmentSettings {
// Environment settings
public static StreamExecutionEnvironment environmentSettings() {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 8081);
Configuration config = new Configuration();
// config.setString("parallelism.default", "4");
// config.setString("taskmanager.numberOfTaskSlots", "16");
// config.setString("taskmanager.memory.flink.size", "8192m");
// config.setString("taskmanager.memory.jvm-metaspace.size", "4096m");
config.setString("state.backend", "filesystem");
// Checkpoint directory (must be a durable store such as HDFS)
config.setString("state.checkpoints.dir", "hdfs://140.245.112.44:8020/user/ck");
// Optional: savepoint directory
config.setString("state.savepoints.dir", "hdfs://140.245.112.44:8020/user/savepoints");
// conf.setInteger("rest.port", 8081);
// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
env.getCheckpointConfig().disableCheckpointing();
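// Checkpointing is disabled here and then re-enabled below with an explicit interval; the last setting wins.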
// env.setParallelism(4); // adjust parallelism
//================= Start the service =========================================
// Enable Flink checkpointing: start a checkpoint at the configured interval (sets the checkpoint lifecycle)
// Processing-guarantee mode; EXACTLY_ONCE and AT_LEAST_ONCE are currently supported
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(180000L, CheckpointingMode.EXACTLY_ONCE);
// env.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
// Advanced checkpoint options
// Set the checkpoint mode to exactly-once (this is also the default)
// Enforce a minimum pause between checkpoints (the checkpoint min-pause)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(50000);
// A checkpoint must complete within the timeout or it is discarded (the checkpoint timeout)
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setCheckpointInterval(30000); // 30 s checkpoint interval
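// NOTE: setCheckpointInterval(30000) overrides the 180000 ms interval passed to enableCheckpointing above.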
env.getCheckpointConfig().setCheckpointTimeout(600000);
// Allow only one in-flight checkpoint at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3); // tolerate 3 checkpoint failures
// On top of this, retain the snapshots
// Externalized checkpoints: checkpoint data is kept even after the job is cancelled, so the job can be restored from a chosen checkpoint when needed
env.getCheckpointConfig().setExternalizedCheckpointRetention(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
@@ -43,6 +58,7 @@ public class EnvironmentSettings {
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// Enable unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
// env.disableOperatorChaining();
return env;
}
}
@@ -21,8 +21,8 @@ import com.flink.util.LoadPropertiesFile;
* @version Created: 2024-12-16 15:57:41
* Description: Kafka source
*/
public class KafkaSourceConnector {
private static final Logger logger = LoggerFactory.getLogger(KafkaSourceConnector.class);
public class KafkaConnector {
private static final Logger logger = LoggerFactory.getLogger(KafkaConnector.class);
// Kafka connection settings are read from the properties file
public static DataStreamSource<String> sourceKafka(StreamExecutionEnvironment env,String topic,String group) {
@@ -30,11 +30,11 @@ public class KafkaSourceConnector {
.setBootstrapServers(LoadPropertiesFile.getPropertyFileValues("kafka.bootstrapServers"))// Kafka broker addresses
.setTopics(topic)// topics to consume; multiple topics are supported
.setGroupId(group)// consumer group id
.setValueOnlyDeserializer(new SafeStringDeserializer()) // custom fault-tolerant deserializer
.setStartingOffsets(OffsetsInitializer.earliest())// starting-offset strategy; several modes are supported
.setValueOnlyDeserializer(new SimpleStringSchema())// deserializer
.setProperty("partition.discovery.interval.ms", "10000")// dynamically discover new partitions, checking every 10 seconds
.setProperty("partition.discovery.interval.ms", "10000")// dynamically discover new partitions every 10 s
.build();// build all configured options
return env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "odsCollectLog");
return env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), topic);
}
// Send to Kafka
......
package com.flink.common;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.flink.util.LoadPropertiesFile;
/**
* @author wjs
* @version Created: 2024-12-16 18:21:22
* Description: Kafka sink
*/
public class KafkaSinkConnector {
public static void sinkKafka(StreamExecutionEnvironment env,String topic) {
String bootstrapServers = LoadPropertiesFile.getPropertyFileValues("kafka.bootstrapServers");
KafkaSink<String> sink = KafkaSink.<String>builder()
// Kafka broker addresses
.setBootstrapServers(bootstrapServers)
// record serialization
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(topic)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
// at-least-once delivery
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
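// NOTE: the sink is built but never attached to a stream or returned; callers would still need stream.sinkTo(sink).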
}
}
package com.flink.common;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
/**
* @author wjs
* @version Created: 2025-4-25 19:59:53
* Description: string deserializer that tolerates malformed records
*/
public class SafeStringDeserializer extends SimpleStringSchema {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public String deserialize(byte[] message) {
try {
return super.deserialize(message);
} catch (Exception e) {
// TODO: record the dead letter to a dedicated topic; returning null causes the record to be skipped
return null;
}
}
}
package com.flink.common;
import org.apache.flink.api.common.functions.FlatMapFunction;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
/**
* @author wjs
* @version Created: 2024-12-20 10:43:56
@@ -20,35 +33,50 @@ public abstract class SourceCommonBase {
// 1. Environment setup
StreamExecutionEnvironment env = EnvironmentSettings.environmentSettings();
// 2. Read connection settings from the properties file
DataStreamSource<String> dataStreamSource = KafkaSourceConnector.sourceKafka(env, topic, group);
// 3. Consume the records and do coarse-grained JSON parsing
DataStreamSource<String> resultDataStreamSource = praseJson(dataStreamSource);
// 4. Send the processed data on to a Kafka queue
// sendToSinkKafka(resultDataStreamSource);
//================= 6. Start the job =========================================
env.execute(jobName);
}
DataStreamSource<String> dataStreamSource = KafkaConnector.sourceKafka(env, topic, group);
/**
* ETL processing of the data
* @return
*/
public DataStreamSource<String> praseJson(DataStreamSource<String> dataStreamSource) {
//================= 5. Simple data processing ======================
dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String record, Collector<String> collector) throws Exception {
logger.info("正在预处理源数据:{}", record);
// 抽象方法的设置
parseSourceKafkaJson(record);
//================= Configure the columns to write =========================================
String[] fields = {"id", "ip", "area_name", "device_id", "cid", "phone", "nick","create_time","dt","__DORIS_DELETE_SIGN__"};
DataType[] types = {DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),DataTypes.STRING(),DataTypes.TIMESTAMP()};
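// NOTE: fields lists 10 column names (including __DORIS_DELETE_SIGN__) while types and the 9-field rows below supply only 9 values; the delete-sign column gets no value.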
//================= Stream processing =========================================
String tableName = "bi.event_ip_convert";
DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);
// Data-processing pipeline
dataStreamSource
.map(value -> {
try {
JSONObject jsonObj = parseSourceKafkaJson(value);
if(null == jsonObj) {
return null;
}
});
// String[] parts = parsed.split(",");
GenericRowData row = new GenericRowData(9);
DateTimeFormatter formatter = DateTimeFormatter.ISO_INSTANT;
System.out.println("value" + value);
// Field mapping
row.setField(0, StringData.fromString((String)jsonObj.get("id")));
row.setField(1, StringData.fromString((String) jsonObj.get("ips")));
row.setField(2, StringData.fromString((String) jsonObj.get("areaNameList")));
row.setField(3, StringData.fromString((String) jsonObj.get("deviceId")));
row.setField(4, StringData.fromString((String) jsonObj.get("cid")));
row.setField(5, StringData.fromString((String) jsonObj.get("phone")));
row.setField(6, StringData.fromString((String) jsonObj.get("nick")));
row.setField(7, StringData.fromString((String) jsonObj.get("createTime")));
row.setField(8, TimestampData.fromInstant(Instant.now()));
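// dt (field 8) is stamped with processing time (Instant.now()), not the event's createTime.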
return (RowData)row;
} catch (Exception e) {
System.err.println("解析失败: "+e.toString());
return null;
}
})
.filter(Objects::nonNull)
// .print(">>>>>>>>>>>>>>>");
.sinkTo(dorisSink);
env.execute(jobName);
}
/**
* Parse the JSON record (abstract hook)
@@ -57,7 +85,7 @@
* @throws ParseException
* @throws Exception
*/
public abstract void parseSourceKafkaJson(String record) throws ParseException, Exception;
public abstract JSONObject parseSourceKafkaJson(String record) throws ParseException, Exception;
/**
* Send the processed data to a Kafka queue for downstream computation (abstract hook)
......
package com.flink.util.ip2region;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.lionsoul.ip2region.xdb.Searcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.log4j.Log4j2;
/**
* @author wjs
* @version Created: 2025-2-14 14:38:41
* Description: https://blog.csdn.net/qq_37284798/article/details/130005988
*/
@Log4j2
public class SearcherUtil {
private static final Logger logger = LoggerFactory.getLogger(SearcherUtil.class);
public static String getCityInfoByFile(String ip) {
// 1. Create the searcher object
String dbPath = "D:\\gitEagleEye\\eagleEye-service\\src\\main\\resources\\ip2region.xdb";
// String dbPath = "/home/opc/ip2region.xdb";
// String dbPath = "D:\\gitEagleEye\\eagleEye-service\\src\\main\\resources\\ip2region.xdb";
String dbPath = "/home/opc/ip2region.xdb";
Searcher searcher;
try {
searcher = Searcher.newWithFileOnly(dbPath);
} catch (IOException e) {
log.error("failed to create searcher with `{}`: ", dbPath, e);
logger.error("failed to create searcher with `{}`: ", dbPath, e);
return null;
}
@@ -32,21 +42,51 @@ public class SearcherUtil {
long sTime = System.nanoTime();
String region = searcher.search(ip);
long cost = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - sTime);
log.info("{region: {}, ioCount: {}, took: {} μs}", region, searcher.getIOCount(), cost);
logger.info("{region: {}, ioCount: {}, took: {} μs}", region, searcher.getIOCount(), cost);
return region;
} catch (Exception e) {
log.info("failed to search({}): ", ip, e);
logger.info("failed to search({}): ", ip, e);
}finally {
try {
searcher.close();
} catch (IOException e) {
log.info("failed to close({}): ", ip, e);
logger.info("failed to close({}): ", ip, e);
}
}
return null;
// 3. Note: for concurrent use, each thread must create and use its own searcher instance.
}
public static List<String> convertStringToList(String str) {
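// e.g. convertStringToList("[1.2.3.4, 5.6.7.8]") -> ["1.2.3.4", "5.6.7.8"]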
if (StringUtils.isEmpty(str) || str.trim().isEmpty()) {
return new ArrayList<>(); // return an empty list
}
// strip the square brackets and surrounding whitespace
String trimmedStr = str.replaceAll("^\\[|\\]$", "").trim();
// if nothing remains after stripping the brackets, return an empty list
if (trimmedStr.isEmpty()) {
return new ArrayList<>();
}
// split on commas and trim each entry
return Arrays.stream(trimmedStr.split(","))
.map(String::trim)
.collect(Collectors.toList());
}
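// Detects IPv6 by attempting an Inet6Address cast; note InetAddress.getByName may perform a DNS lookup for non-literal input.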
public static Boolean ipv6(String ip) {
Boolean flag = false;
try {
Inet6Address address = (Inet6Address) InetAddress.getByName(ip);
System.out.println(ip + " is an IPv6 address.");
flag = true;
} catch (UnknownHostException e) {
System.out.println(ip + " is not a valid IPv6 address.");
} catch (ClassCastException e) {
System.out.println(ip + " is not an IPv6 address.");
}
return flag;
}
public static void main(String[] args) throws Exception {
getCityInfoByFile("1.9.241.214");
}
......
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
/**
* @author wjs
* @version Created: 2025-4-24 13:06:19
* Description: VO for event IP-conversion records
*/
@Data
public class EventIpConvert implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String id;
private String ip;
private String areaName;
private String deviceId;
private String cid;
private String phone;
private String nick;
private String createTime;
private String dt;
}
@@ -6,6 +6,9 @@ kafka.topic=ods_collect_log
# Kafka consumer group
kafka.group=collectGroup
doris.jdbc_url=jdbc:mysql://140.245.112.44:9030/bi?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true
#doris.jdbc_url=jdbc:mysql://10.0.0.105:9030/bi?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true
#doris.jdbc_url=jdbc:mysql://140.245.112.44:9030/bi?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true
#doris.fe=140.245.112.44:8030
doris.fe=10.0.0.105:8030
doris.username=root
doris.driver_class_name=com.mysql.cj.jdbc.Driver
\ No newline at end of file