Commit 667930aa by 魏建枢

代码提交

parent b5902e5d
package com.flink.common;
import java.time.Duration;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.util.LoadPropertiesFile;
/**
* @author wjs
* @version 创建时间:2025-4-23 14:36:34
* 类说明 https://www.bookstack.cn/read/flink-1.20-zh/TryFlink.md
*/
/**
 * Builds a pre-configured {@link StreamExecutionEnvironment} with a RocksDB state
 * backend, filesystem (HDFS) checkpoint storage, and exactly-once checkpointing.
 *
 * <p>Fix vs. previous revision: checkpointing was configured three times with
 * conflicting values (5 min via {@code Configuration}, 18 s via
 * {@code enableCheckpointing}, then overridden to 30 s via
 * {@code setCheckpointInterval}), and {@code disableCheckpointing()} was called
 * immediately before re-enabling it. Each setting is now declared exactly once
 * with the value that actually took effect before.
 */
public class EnvironmentSettings {
    private static final Logger logger = LoggerFactory.getLogger(EnvironmentSettings.class);

    /**
     * Creates and configures the streaming execution environment.
     *
     * @param jobType suffix appended to the HDFS base URL (property {@code hdfs.url})
     *                so each job gets its own checkpoint directory
     * @return a configured {@link StreamExecutionEnvironment}, ready for source wiring
     */
    public static StreamExecutionEnvironment environmentSettings(String jobType) {
        Configuration config = new Configuration();
        // State backend: RocksDB (large state, incremental snapshots).
        config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
        // Checkpoints are persisted to a filesystem (must be durable, e.g. HDFS).
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
        // Per-job checkpoint directory: <hdfs.url><jobType>.
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
                LoadPropertiesFile.getPropertyFileValues("hdfs.url") + jobType);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

        // Checkpoint every 30 s with exactly-once semantics.
        env.enableCheckpointing(30_000L, CheckpointingMode.EXACTLY_ONCE);
        // At least 500 ms between the end of one checkpoint and the start of the next.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // A checkpoint that takes longer than 60 s is discarded.
        env.getCheckpointConfig().setCheckpointTimeout(60_000L);
        // Only one checkpoint in flight at a time (required for unaligned checkpoints).
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Tolerate up to 3 consecutive checkpoint failures before failing the job.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
        // Keep externalized checkpoints after cancellation so the job can be
        // restored from the last checkpoint on demand.
        env.getCheckpointConfig().setExternalizedCheckpointRetention(
                ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
        // Unaligned checkpoints reduce checkpoint duration under backpressure;
        // fall back from aligned to unaligned after 30 s of alignment.
        env.getCheckpointConfig().enableUnalignedCheckpoints();
        env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30));

        // Explicit streaming mode; chaining disabled for per-operator observability.
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.disableOperatorChaining();
        return env;
    }
}
package com.flink.common;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.enums.JobTypeEnum;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.KafkaTopic;
/**
* @author wjs
* @version 创建时间:2024-12-20 10:43:56
* 类说明 抽象类对接kafka的数据,并解析关键字段
*/
/**
 * Template base class for jobs that consume one or more Kafka topics:
 * it builds the environment, wires a {@link DataStreamSource} per topic,
 * delegates parsing/ETL to the subclass, then submits the job.
 */
public abstract class MultipleSourceCommonBase {
    private static final Logger logger = LoggerFactory.getLogger(MultipleSourceCommonBase.class);

    /**
     * Wires the given Kafka topics into the execution environment and runs the job.
     *
     * @param kafkaTopicList topics (with consumer groups) to attach as sources;
     *                       may be null or empty, in which case a warning is logged
     *                       and the job is still submitted with no sources
     * @param jobName job identifier; {@code getCode()} names the checkpoint
     *                directory, {@code getDescription()} names the Flink job
     * @throws Exception from environment setup, source creation, or job execution
     */
    public void handleDataStreamSource(List<KafkaTopic> kafkaTopicList, JobTypeEnum jobName) throws Exception {
        // 1. Environment setup (checkpointing, state backend, etc.).
        StreamExecutionEnvironment env = EnvironmentSettings.environmentSettings(jobName.getCode());
        logger.info("1. 环境的设置成功");

        // 2. One Kafka source per configured topic/group pair.
        List<KafkaDataSource> dataSourceList =
                new ArrayList<>(kafkaTopicList == null ? 0 : kafkaTopicList.size());
        if (CollectionUtils.isNotEmpty(kafkaTopicList)) {
            for (KafkaTopic kafkaTopic : kafkaTopicList) {
                String topic = kafkaTopic.getTopic();
                String group = kafkaTopic.getGroup();
                DataStreamSource<String> dataStreamSource = KafkaConnector.sourceKafka(env, topic, group);
                KafkaDataSource kafkaDataSource = new KafkaDataSource();
                kafkaDataSource.setDataStreamSource(dataStreamSource);
                kafkaDataSource.setTopic(topic);
                dataSourceList.add(kafkaDataSource);
            }
        } else {
            // Previously this fell through silently and submitted a source-less job.
            logger.warn("kafkaTopicList is empty for job {}; no Kafka sources attached", jobName.getCode());
        }
        logger.info("2.资源配置文件信息的获取成功");

        // 3. Subclass-specific ETL over the attached sources.
        parseSourceKafkaJson(dataSourceList);
        logger.info("3.Kafka资源ETL操作成功");

        env.execute(jobName.getDescription());
    }

    /**
     * Parses the JSON payloads of the given Kafka sources (subclass-specific ETL).
     * Implementations may declare narrower throws clauses (e.g. ParseException).
     *
     * @param dataSourceList one entry per attached topic; never null, may be empty
     * @throws Exception on any parsing or pipeline-construction failure
     */
    public abstract void parseSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws Exception;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment