Commit 83a2b89d by 魏建枢

Code optimization

parent fe6bce9c
......@@ -6,48 +6,50 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.achieve.doris.EventIpConvertAchi;
import com.flink.enums.JobTypeEnum;
import com.flink.factory.JobProcessorFactory;
import com.flink.processor.JobProcessor;
/**
* @author wjs
* @version Created: 2024-12-18 14:28:31. Class description
*/
* @author wjs
* @version Created: 2025-5-7 14:52:45
* Class description
*/
public class JobMain {
private static final Logger logger = LoggerFactory.getLogger(JobMain.class);
public static void main(String[] args) throws Exception {
try {
logger.info("启动Flink作业,参数列表: {}", Arrays.toString(args));
// 参数解析与校验
JobTypeEnum jobType = parseJobType(args);
// Obtain the job processor and execute it
JobProcessor processor = JobProcessorFactory.getProcessor(jobType);
processor.process();
} catch (IllegalArgumentException e) {
logger.error("参数校验失败: {}", e.getMessage());
throw e;
} catch (Exception e) {
logger.error("作业执行异常: 参数列表[{}]", Arrays.toString(args), e);
throw e;
}
}
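// Typical launch (sketch; the launcher command, main-class package and jar name are assumptions):
//   flink run -c com.flink.JobMain eagleEye-flink.jar <jobTypeCode>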
private static JobTypeEnum parseJobType(String[] args) {
// Required argument check
if (args.length < 1) {
throw new IllegalArgumentException("A JobType argument must be specified, allowed values: " + JobTypeEnum.getValidCodes());
throw new IllegalArgumentException("Missing JobType argument, valid values: " + JobTypeEnum.getValidCodes());
}
String jobTypeCode = args[0];
if (StringUtils.isEmpty(jobTypeCode)) {
throw new IllegalArgumentException("JobType参数为空,合法值:"+JobTypeEnum.getValidCodes());
}
// Job type routing
JobTypeEnum jobType = JobTypeEnum.fromCode(jobTypeCode);
switch (jobType) {
case EVENT_IP_CONVERT:
EventIpConvertAchi sourceEventLog = new EventIpConvertAchi();
sourceEventLog.handleDataStreamSource(JobTypeEnum.EVENT_IP_CONVERT.getDescription(), "ods_event_log",
"eventLogGroup", JobTypeEnum.EVENT_IP_CONVERT.getCode());
break;
default:
logger.error("未知的Job类型:{}", jobTypeCode);
if (StringUtils.isBlank(jobTypeCode)) {
throw new IllegalArgumentException("JobType参数为空,合法值: " + JobTypeEnum.getValidCodes());
}
// Enum validity check
try {
return JobTypeEnum.fromCode(jobTypeCode.trim());
} catch (IllegalArgumentException e) {
logger.error("参数类型错误: {}", e.getMessage());
throw e;
} catch (ArrayIndexOutOfBoundsException e) {
logger.error("缺少必要参数: 必须指定JobType参数");
throw e;
} catch (Exception e) {
logger.error("作业启动异常", e);
throw e;
throw new IllegalArgumentException("非法的JobType参数[" + jobTypeCode + "],可用值: " + JobTypeEnum.getValidCodes());
}
}
}
......@@ -44,7 +44,7 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable
private static final Logger logger = LoggerFactory.getLogger(EventIpConvertAchi.class);
@Override
public JSONObject parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
//================= Configure the fields written to Doris =================
String[] fields = {"id", "ip", "area_name", "device_id", "cid", "phone", "nick","create_time","dt","__DORIS_DELETE_SIGN__"};
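// __DORIS_DELETE_SIGN__ is Doris's hidden delete-sign column; writing 1 to it marks the row for deletion in a Unique Key model table.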
......@@ -84,7 +84,6 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable
.filter(Objects::nonNull)
// .print(">>>>>>>>>>>>>>>");
.sinkTo(dorisSink);
return null;
}
public static JSONObject handleData(String record) throws ParseException, Exception {
......
......@@ -36,7 +36,7 @@ public abstract class SourceCommonBase {
* @throws ParseException
* @throws Exception
*/
public abstract JSONObject parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception;
public abstract void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception;
/**
* Send the processed data to the Kafka queue for downstream computation (abstract method declaration)
......
package com.flink.enums;
/**
* @author wjs
* @version Created: 2025-5-7 14:33:53
* Class description
*/
public enum TopicTypeEnum {
ODS_EVENT_LOG("ods_event_log","eventLogGroup"),
ODS_NEW_COLLECT_LOG("ods_new_collect_log","odsNewCollectLog"),
ODS_ZIPPER_STRATEGY("ods_zipper_strategy","odsZipperStrategy"),
ODS_ZIPPER_DEVICE_FINGERPRINT("ods_zipper_device_fingerprint","odsZipperDeviceFingerprint"),
ODS_DEVICE_ID_NODE("ods_device_id_node","odsDeviceIdNode"),
ODS_OPEN_API("ods_open_api","odsOpenApi"),
ODS_USER_FRIENDS("ods_user_friends","odsUserFriends"),
ODS_USER_GROUPS("ods_user_groups","odsUserGroups"),
ODS_USER_INFO("ods_user_info","odsUserInfo"),
ODS_USER_GROUP_CHAT_HISTORY("ods_user_group_chat_history","odsUserGroupChatHistory"),
ODS_USER_LIST("ods_user_list","odsUserList"),
ODS_DEVICE_OVERLAP("ods_device_overlap","odsDeviceOverlap"),
ODS_CID_GROUP_OVERLAP("ods_cid_group_overlap","odsCidGroupOverlap"),
ODS_EVENT_IP_CONVERT("ods_event_ip_convert","odsEventIpConvert"),
;
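// Kafka source topic and the consumer group used when reading from it.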
private String topic;
private String group;
private TopicTypeEnum(String topic, String group) {
this.topic = topic;
this.group = group;
}
public String getTopic() {
return topic;
}
public String getGroup() {
return group;
}
}
package com.flink.factory;
import com.flink.enums.JobTypeEnum;
import com.flink.processor.JobProcessor;
import com.flink.processor.impl.EventIpConvertProcessor;
/**
* @author wjs
* @version Created: 2025-5-7 14:46:56
* Class description: factory pattern implementation
*/
public class JobProcessorFactory {
public static JobProcessor getProcessor(JobTypeEnum jobType) {
switch (jobType) {
case EVENT_IP_CONVERT:
return new EventIpConvertProcessor();
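// Additional job types would follow the same pattern (hypothetical example, not part of this commit):
// case DEVICE_OVERLAP:
//     return new DeviceOverlapProcessor();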
default:
throw new IllegalArgumentException("未知的Job类型: " + jobType);
}
}
}
package com.flink.processor;
/**
* @author wjs
* @version Created: 2025-5-7 15:16:54
* Class description: job processor interface
*/
public interface JobProcessor {
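/** Builds and runs the Flink pipeline for a single job type (e.g. EventIpConvertProcessor). */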
void process() throws Exception;
}
package com.flink.processor.impl;
import com.flink.achieve.doris.EventIpConvertAchi;
import com.flink.enums.JobTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.JobProcessor;
/**
* @author wjs
* @version Created: 2025-5-7 15:17:46
* Class description: concrete implementation of the event IP conversion job
*/
public class EventIpConvertProcessor implements JobProcessor {
@Override
public void process() throws Exception {
new EventIpConvertAchi().handleDataStreamSource(
JobTypeEnum.EVENT_IP_CONVERT.getDescription(),
TopicTypeEnum.ODS_EVENT_LOG.getTopic(),
TopicTypeEnum.ODS_EVENT_LOG.getGroup(),
JobTypeEnum.EVENT_IP_CONVERT.getCode()
);
}
}
package com.flink.sink;
import java.util.ArrayList;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author wjs
* @version Created: 2024-12-16 15:48:34
* Class description: Kafka connector usage
*/
public class KafkaSinkStreamingJob {
// private static final Logger logger = LoggerFactory.getLogger(KafkaSinkStreamingJob.class);
//
// public static void main(String[] args) throws Exception {
//
// //=============== 1. Load parameters ==============================
// // Properties file path
// String propertiesFilePath = "D:\\gitEagleEyeFlink\\eagleEye-flink_kafka\\src\\main\\resources\\application.properties";
// // Option 1: use Flink's built-in ParameterTool
// ParameterTool paramsMap = ParameterTool.fromPropertiesFile(propertiesFilePath);
//
// //================ 2. Initialize Kafka parameters ==============================
// String bootstrapServers = paramsMap.get("kafka.bootstrapServers");
// String topic = paramsMap.get("kafka.topic");
//
// KafkaSink<String> sink = KafkaSink.<String>builder()
// // Set the Kafka bootstrap servers
// .setBootstrapServers(bootstrapServers)
// // Set the record serialization scheme
// .setRecordSerializer(KafkaRecordSerializationSchema.builder()
// .setTopic(topic)
// .setValueSerializationSchema(new SimpleStringSchema())
// .build()
// )
// // At-least-once delivery
// .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
// .build();
//
//
// //================= 4. Create the Flink execution environment =================
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ArrayList<String> listData = new ArrayList<>();
// listData.add("test");
// listData.add("java");
// listData.add("c++");
// DataStreamSource<String> dataStreamSource = env.fromCollection(listData);
//
// //================= 5. Simple data processing ======================
// SingleOutputStreamOperator<String> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
// @Override
// public void flatMap(String record, Collector<String> collector) throws Exception {
// logger.info("正在处理kafka数据:{}", record);
// collector.collect(record);
// }
// });
//
// // Sink operator
// flatMap.sinkTo(sink);
//
// //================= 6. Start the job =========================================
// // Enable Flink checkpointing: trigger a checkpoint every 1000 ms
// env.enableCheckpointing(1000);
// // Advanced checkpoint options
// // Set the checkpointing mode to exactly-once (the default)
// env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// // Require at least 500 ms between checkpoints (minimum pause between checkpoints)
// env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// // A checkpoint must complete within 1 minute or it is discarded (checkpoint timeout)
// env.getCheckpointConfig().setCheckpointTimeout(60000);
// // Allow only one checkpoint in flight at a time
// env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// // Retain checkpoint data after the job is cancelled, so the job can later be restored from a chosen checkpoint
// env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// // Configure the state backend, i.e. where state and checkpoint data are stored (checkpoints need durable storage)
// env.getCheckpointConfig().setCheckpointStorage("file:///E:/flink/checkPoint");
// env.execute();
// }
}
package com.flink.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Reference: https://gitee.com/shawsongyue/aurora/tree/develop/aurora_flink_connector_kafka
// Reference: https://blog.csdn.net/weixin_40736233/article/details/136002402
/**
* @author wjs
* @version Created: 2024-12-16 15:48:34
* Class description
*/
public class KafkaSourceStreamingJob {
// private static final Logger logger = LoggerFactory.getLogger(KafkaSourceStreamingJob.class);
//
// public static void main(String[] args) throws Exception {
//
// //=============== 1. Load parameters ==============================
// // Properties file path
// String propertiesFilePath = "D:\\gitEagleEyeFlink\\eagleEye-flink_kafka\\src\\main\\resources\\application.properties";
// // Option 1: use Flink's built-in ParameterTool
// ParameterTool paramsMap = ParameterTool.fromPropertiesFile(propertiesFilePath);
//
// //================ 2. Initialize Kafka parameters ==============================
// String bootstrapServers = paramsMap.get("kafka.bootstrapServers");
// String group = paramsMap.get("kafka.group");
//
//
// //================= 3. Create the Kafka source =============================
// KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource.builder();
// // (1) Set the Kafka bootstrap servers
// kafkaSourceBuilder.setBootstrapServers(bootstrapServers);
// // (2) Set the consumer group id
// kafkaSourceBuilder.setGroupId(group);
// // (3) Set the topics; several topic combinations are supported
// setTopic(kafkaSourceBuilder);
// // (4) Set the starting-offset mode; several consumption modes are supported
// setStartingOffsets(kafkaSourceBuilder);
// // (5) Set the deserializer
// setDeserializer(kafkaSourceBuilder);
// // (6) Build the source with all parameters
// KafkaSource<String> kafkaSource = kafkaSourceBuilder.build();
// // (7) Discover new partitions dynamically, checking every 10 seconds (note: this should be set on the builder before build())
// kafkaSourceBuilder.setProperty("partition.discovery.interval.ms", "10000");
//
// //================= 4. Create the Flink execution environment =================
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// DataStreamSource<String> dataStreamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
//
// //================= 5. Simple data processing ======================
// dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
// @Override
// public void flatMap(String record, Collector<String> collector) throws Exception {
// logger.info("正在预处理源数据:{}", record);
// }
// });
//
// //================= 6. Start the job =========================================
// env.execute();
// }
//
//
// /**
// *
// * @description Topic subscription modes
// * 1. Subscribe to a single topic
// * 2. Subscribe to multiple topics
// * 3. Subscribe to a list of topics
// * 4. Subscribe to topics matching a regular expression
// * 5. Subscribe to specific partitions
// *
// * @author 浅夏的猫
// * @datetime 21:18 2024/2/5
// * @param kafkaSourceBuilder
// */
// private static void setTopic(KafkaSourceBuilder<String> kafkaSourceBuilder) {
// // Option 1: a single topic
// kafkaSourceBuilder.setTopics("ods_collect_log");
// // Option 2: multiple topics
//// kafkaSourceBuilder.setTopics("topic_a", "topic_b");
// // Option 3: a list of topics
//// kafkaSourceBuilder.setTopics(Arrays.asList("topic_a", "topic_b"));
// // Option 4: topics matching a regular expression
//// kafkaSourceBuilder.setTopicPattern(Pattern.compile("topic_a.*"));
// // Option 5: subscribe to specific partitions; multiple partitions across multiple topics are also supported
//// final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(new TopicPartition("topic_a", 0), new TopicPartition("topic_b", 4)));
//// kafkaSourceBuilder.setPartitions(partitionSet);
// }
//
// /**
// * @description Starting-offset modes
// * 1. Start from the offsets committed by the consumer group, with no reset strategy
// * 2. Start from the committed offsets; if none exist, fall back to the earliest offset
// * 3. Start from records whose timestamp is greater than or equal to a given timestamp (milliseconds)
// * 4. Start from the earliest offset
// * 5. Start from the latest offset, i.e. consume only records arriving after subscription
// *
// * @author 浅夏的猫
// * @datetime 21:27 2024/2/5
// * @param kafkaSourceBuilder
// */
// private static void setStartingOffsets(KafkaSourceBuilder<String> kafkaSourceBuilder){
// // Mode 1: start from the committed offsets with no reset strategy; without committed offsets this throws: Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [topic_a-3]
//// kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets());
// // Mode 2: start from the committed offsets; if none exist, use the earliest offset
//// kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST));
// // Mode 3: start from records with timestamp >= the given timestamp (milliseconds)
// kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L));
// // Mode 4: start from the earliest offset
//// kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
// // Mode 5: start from the latest offset, i.e. from the moment of subscription
//// kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
// }
//
// /**
// * @description Deserializer setup; several deserialization options are supported
// * 1. Custom parsing of Kafka records
// * 2. Use a deserializer provided by Kafka
// * 3. Deserialize only the Kafka record value
// *
// * @author 浅夏的猫
// * @datetime 21:35 2024/2/5
// * @param kafkaSourceBuilder
// */
// private static void setDeserializer(KafkaSourceBuilder<String> kafkaSourceBuilder){
// // 1. Custom parsing of Kafka records
//// KafkaRecordDeserializationSchema<String> kafkaRecordDeserializationSchema = new KafkaRecordDeserializationSchema<>() {
//// @Override
//// public TypeInformation<String> getProducedType() {
//// return TypeInformation.of(String.class);
//// }
////
//// @Override
//// public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<String> collector) throws IOException {
// //// // Parse the record manually
//// byte[] valueByte = consumerRecord.value();
//// String value = new String(valueByte);
// //// // Emit the message downstream
//// collector.collect(value);
//// }
//// };
//// kafkaSourceBuilder.setDeserializer(kafkaRecordDeserializationSchema);
//
// // 2. Use a deserializer provided by Kafka
//// kafkaSourceBuilder.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
//
// // 3. Deserialize only the Kafka record value
// kafkaSourceBuilder.setValueOnlyDeserializer(new SimpleStringSchema());
// }
}