Commit fe6bce9c by 魏建枢

Code extraction

parent da551c2d
......@@ -33,7 +33,7 @@
</filters>
<transformers>
<transformer>
<mainClass>com.flink.KafkaStreamingJob</mainClass>
<mainClass>com.flink.JobMain</mainClass>
</transformer>
<transformer />
</transformers>
......
......@@ -87,7 +87,6 @@
<artifactId>flink-connector-jdbc</artifactId>
<version>3.3.0-1.20</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
......@@ -211,7 +210,7 @@
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- Replace this with the main class of your job -->
<mainClass>com.flink.KafkaStreamingJob</mainClass>
<mainClass>com.flink.JobMain</mainClass>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
......
package com.flink;
import java.util.Arrays;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.achieve.doris.EventIpConvertAchi;
import com.flink.enums.JobTypeEnum;
/**
 * @author wjs
 * @version Created: 2024-12-18 14:28:31
 * Class description
 */
public class JobMain {
private static final Logger logger = LoggerFactory.getLogger(JobMain.class);
public static void main(String[] args) throws Exception {
try {
logger.info("启动Flink作业,参数列表: {}", Arrays.toString(args));
if (args.length < 1) {
throw new IllegalArgumentException("必须指定JobType参数,可选值:"+JobTypeEnum.getValidCodes());
}
String jobTypeCode = args[0];
if (StringUtils.isEmpty(jobTypeCode)) {
throw new IllegalArgumentException("JobType参数为空,合法值:"+JobTypeEnum.getValidCodes());
}
//作业类型路由
JobTypeEnum jobType = JobTypeEnum.fromCode(jobTypeCode);
switch (jobType) {
case EVENT_IP_CONVERT:
EventIpConvertAchi sourceEventLog = new EventIpConvertAchi();
sourceEventLog.handleDataStreamSource(JobTypeEnum.EVENT_IP_CONVERT.getDescription(), "ods_event_log",
"eventLogGroup", JobTypeEnum.EVENT_IP_CONVERT.getCode());
break;
default:
logger.error("未知的Job类型:{}", jobTypeCode);
}
} catch (IllegalArgumentException e) {
logger.error("Invalid argument: {}", e.getMessage());
throw e;
} catch (ArrayIndexOutOfBoundsException e) {
// defensive: unreachable for args[0] given the length check above
logger.error("Missing required argument: a JobType argument must be supplied");
throw e;
} catch (Exception e) {
logger.error("Job startup failed", e);
throw e;
}
}
}
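For reference, a minimal local smoke test of the new entry point. The class below is hypothetical (not part of this commit); it drives JobMain the same way the Flink CLI would when the shaded jar is submitted with JOB_01 as the program argument:

// Hypothetical smoke test for the routing entry point.
// JOB_01 is the only code registered in JobTypeEnum in this commit;
// any other value makes JobMain throw IllegalArgumentException.
public class JobMainSmokeTest {
    public static void main(String[] args) throws Exception {
        com.flink.JobMain.main(new String[] { "JOB_01" });
    }
}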
package com.flink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.flink.achieve.ods.OdsEventLogSourceAchi;
/**
 * @author wjs
 * @version Created: 2024-12-18 14:28:31
 * Class description
 */
public class KafkaStreamingJob {
private static final Logger logger = LoggerFactory.getLogger(KafkaStreamingJob.class);
public static void main(String[] args) throws Exception {
OdsEventLogSourceAchi sourceEventLog = new OdsEventLogSourceAchi();
sourceEventLog.handleDataStreamSource("事件日志聚合JOB", "ods_event_log", "eventLogGroup");
}
}
package com.flink.achieve.ods;
package com.flink.achieve.doris;
import java.io.Serializable;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.SourceCommonBase;
import com.flink.enums.AppTypeEnum;
import com.flink.util.ip2region.SearcherUtil;
......@@ -21,20 +32,63 @@ import com.flink.vo.UserProperties;
/**
 * @author wjs
 * @version Created: 2025-4-24 18:05:25
 * @version Created: 2025-5-6 16:01:23
 * Class description
 */
public class OdsEventLogSourceAchi extends SourceCommonBase implements Serializable{
public class EventIpConvertAchi extends SourceCommonBase implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(OdsEventLogSourceAchi.class);
private static final Logger logger = LoggerFactory.getLogger(EventIpConvertAchi.class);
@Override
public JSONObject parseSourceKafkaJson(String record) throws ParseException, Exception {
logger.info("OdsEventLogSourceAchi record:{}",record);
public JSONObject parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
//================= Configure sink columns =========================================
// NOTE: fields lists 10 columns (including __DORIS_DELETE_SIGN__) but types lists only 9;
// the two arrays handed to the sink serializer must be the same length.
String[] fields = {"id", "ip", "area_name", "device_id", "cid", "phone", "nick","create_time","dt","__DORIS_DELETE_SIGN__"};
DataType[] types = {DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),DataTypes.STRING(),DataTypes.TIMESTAMP()};
//================= Streaming setup =========================================
String tableName = "bi.event_ip_convert";
DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);
//================= Data processing pipeline =========================================
dataStreamSource
.map(value -> {
try {
JSONObject jsonObj = handleData(value);
if(null == jsonObj) {
return null;
}
GenericRowData row = new GenericRowData(9);
logger.debug("value: {}", value);
// Field mapping
row.setField(0, StringData.fromString((String)jsonObj.get("id")));
row.setField(1, StringData.fromString((String) jsonObj.get("ips")));
row.setField(2, StringData.fromString((String) jsonObj.get("areaNameList")));
row.setField(3, StringData.fromString((String) jsonObj.get("deviceId")));
row.setField(4, StringData.fromString((String) jsonObj.get("cid")));
row.setField(5, StringData.fromString((String) jsonObj.get("phone")));
row.setField(6, StringData.fromString((String) jsonObj.get("nick")));
row.setField(7, StringData.fromString((String) jsonObj.get("createTime")));
row.setField(8, TimestampData.fromInstant(Instant.now()));
return (RowData)row;
} catch (Exception e) {
System.err.println("解析失败: "+e.toString());
return null;
}
})
.filter(Objects::nonNull)
// .print(">>>>>>>>>>>>>>>");
.sinkTo(dorisSink);
return null;
}
public static JSONObject handleData(String record) throws ParseException, Exception {
logger.info("EventIpConvertAchi record:{}",record);
// TODO 数据的 ETL 处理
OdsEventLog odsEventLog = JSONObject.parseObject(record,new TypeReference<OdsEventLog>(){});
String id = odsEventLog.getId();
......@@ -117,6 +171,4 @@ public class OdsEventLogSourceAchi extends SourceCommonBase implements Serializa
}
}
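The column/type mismatch flagged above is easiest to avoid by keeping the two arrays in lockstep. A minimal sketch of one way to align them; typing the hidden __DORIS_DELETE_SIGN__ column as TINYINT is an assumption here (Doris models the delete sign as a 0/1 flag), not something this commit confirms:

// Hypothetical aligned declaration: one entry in `types` per name in `fields`.
String[] fields = {"id", "ip", "area_name", "device_id", "cid", "phone",
        "nick", "create_time", "dt", "__DORIS_DELETE_SIGN__"};
DataType[] types = {DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
        DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
        DataTypes.STRING(), DataTypes.STRING(), DataTypes.TIMESTAMP(),
        DataTypes.TINYINT()}; // assumed type for the delete-sign flag
// The row arity then matches too: new GenericRowData(10), with
// row.setField(9, (byte) 0) for upserts and (byte) 1 for deletes.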
package com.flink.achieve.ods;
import java.io.Serializable;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.SourceCommonBase;
import com.flink.enums.AppTypeEnum;
import com.flink.vo.OdsCollectLog;
import com.flink.vo.android.AndroidCollectionBody;
import com.flink.vo.android.deviceInfo.AndroidA1;
import com.flink.vo.android.envInfo.AndroidEnvInfo;
import com.flink.vo.android.otherInfo.OtherInfo;
import com.flink.vo.ios.IosCollectionBody;
import com.flink.vo.ios.IosDeviceInfo;
import com.flink.vo.ios.IosEnvInfo;
/**
 * @author wjs
 * @version Created: 2024-12-20 10:55:09
 * Class description
 */
public class OdsCollectLogSourceAchi extends SourceCommonBase implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(OdsCollectLogSourceAchi.class);
@Override
public JSONObject parseSourceKafkaJson(String record) throws Exception {
logger.info("record:{}",record);
// TODO 数据的 ETL 处理
OdsCollectLog odsCollectLog = JSONObject.parseObject(record,new TypeReference<OdsCollectLog>(){});
String id = odsCollectLog.getId();
String deviceId = odsCollectLog.getDevice_id();
String uid = odsCollectLog.getUid();
String strategyGroupId = odsCollectLog.getStrategy_group_id();
String strategyVersion = odsCollectLog.getStrategy_version();
String sendTime = odsCollectLog.getSend_time();
String createTime = odsCollectLog.getCreate_time();
String appKey = odsCollectLog.getApp_key();
String appType = odsCollectLog.getApp_type();
String appChannel = odsCollectLog.getApp_channel();
String zoneCode = odsCollectLog.getZone_code();
String zoneName = odsCollectLog.getZone_name();
String zoneType = odsCollectLog.getZone_type();
String sdkVersion = odsCollectLog.getSdk_version();
String userAgent = odsCollectLog.getUser_agent();
String otherInfo = odsCollectLog.getOther_info();
String deviceInfo = odsCollectLog.getDevice_info();
String envInfo = odsCollectLog.getEnv_info();
String userProperties = odsCollectLog.getUser_properties();
String uniqueId = odsCollectLog.getUnique_id();
if(StringUtils.equals(appType, AppTypeEnum.ANDROID.getCode())) {
AndroidA1 a1 = JSONObject.parseObject(deviceInfo,new TypeReference<AndroidA1>(){});
AndroidEnvInfo g1 = JSONObject.parseObject(envInfo,new TypeReference<AndroidEnvInfo>(){});
OtherInfo i1 = JSONObject.parseObject(otherInfo,new TypeReference<OtherInfo>(){});
AndroidCollectionBody androidBodyObj = new AndroidCollectionBody();
androidBodyObj.setA1(a1);
androidBodyObj.setG1(g1);
androidBodyObj.setI1(i1);
}else if(StringUtils.equals(appType, AppTypeEnum.IOS.getCode())) {
IosDeviceInfo a1 = JSONObject.parseObject(deviceInfo,new TypeReference<IosDeviceInfo>(){});
IosEnvInfo g1 = JSONObject.parseObject(envInfo,new TypeReference<IosEnvInfo>(){});
IosCollectionBody iosBodyObj = new IosCollectionBody();
iosBodyObj.setA1(a1);
iosBodyObj.setG1(g1);
}
return null;
}
@Override
public void sendToSinkKafka(DataStreamSource<String> mStream) {
// TODO Auto-generated method stub
}
}
......@@ -23,7 +23,7 @@ public class DorisConnector {
//================= Set stream-load properties =========================================
Properties streamLoadProps = new Properties();
streamLoadProps.setProperty("format", "json");
streamLoadProps.setProperty("format", LoadConstants.JSON);
streamLoadProps.setProperty("read_json_by_line", "true");
streamLoadProps.setProperty("strip_outer_array", "false");
streamLoadProps.setProperty("sink.enable-2pc", "true");
......
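For context, a hedged sketch of what a sinkDoris helper typically looks like, following the Doris Flink connector's documented RowData builder pattern. The FE address, credentials, and label prefix are placeholders, and the RowDataSerializer package path varies across connector versions; this is not the exact implementation in this commit:

import java.util.Properties;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

public class DorisSinkSketch {
    public static DorisSink<RowData> sinkDoris(String[] fields, DataType[] types, String tableName) {
        // Stream-load properties, mirroring the ones set in DorisConnector above.
        Properties streamLoadProps = new Properties();
        streamLoadProps.setProperty("format", "json");
        streamLoadProps.setProperty("read_json_by_line", "true");

        DorisOptions dorisOptions = DorisOptions.builder()
                .setFenodes("FE_HOST:8030")    // placeholder FE endpoint
                .setTableIdentifier(tableName) // e.g. "bi.event_ip_convert"
                .setUsername("USER")           // placeholder
                .setPassword("PASSWORD")       // placeholder
                .build();

        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
                .setLabelPrefix("label-" + tableName.replace('.', '_')) // labels must be unique per load
                .setStreamLoadProp(streamLoadProps)
                .build();

        return DorisSink.<RowData>builder()
                .setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(executionOptions)
                .setSerializer(RowDataSerializer.builder()
                        .setFieldNames(fields)
                        .setType("json")
                        .setFieldType(types)
                        .build())
                .setDorisOptions(dorisOptions)
                .build();
    }
}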
package com.flink.common;
import java.time.Duration;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
......@@ -11,45 +15,61 @@ import org.slf4j.LoggerFactory;
/**
 * @author wjs
 * @version Created: 2025-4-23 14:36:34
 * Class description
 * Class description: https://www.bookstack.cn/read/flink-1.20-zh/TryFlink.md
 */
public class EnvironmentSettings {
private static final Logger logger = LoggerFactory.getLogger(EnvironmentSettings.class);
// Environment setup
public static StreamExecutionEnvironment environmentSettings() {
public static StreamExecutionEnvironment environmentSettings(String jobType) {
Configuration config = new Configuration();
// config.setString("parallelism.default", "4");
// config.setString("taskmanager.numberOfTaskSlots", "16");
// config.setString("taskmanager.memory.flink.size", "8192m");
// config.setString("taskmanager.memory.jvm-metaspace.size", "4096m");
config.setString("state.backend", "filesystem");
// config.setString("state.backend", "filesystem");
// Checkpoint directory (must be durable storage, e.g. HDFS)
config.setString("state.checkpoints.dir", "hdfs://140.245.112.44:8020/user/ck");
// Optional: savepoint directory
config.setString("state.savepoints.dir", "hdfs://140.245.112.44:8020/user/savepoints");
// config.setString("state.checkpoints.dir", "hdfs://140.245.112.44:8020/user/ck");
// Use RocksDB as the state backend
config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
// Store checkpoints on a filesystem
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
// HDFS path, namespaced per job type
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs://140.245.112.44:8020/user/ck/"+jobType);
// Checkpoint interval (checkpointing is disabled by default)
config.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(5));
// Checkpoint timeout (default is 10 minutes)
config.set(CheckpointingOptions.CHECKPOINTING_TIMEOUT, Duration.ofMinutes(8));
// Max concurrent checkpoints (default 1)
config.set(CheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 2);
// Tolerable checkpoint failure count (default 0: no failures allowed)
config.set(CheckpointingOptions.TOLERABLE_FAILURE_NUMBER, 3);
// conf.setInteger("rest.port", 8081);
// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
// Optional: savepoint directory
// config.setString("state.savepoints.dir", "hdfs://140.245.112.44:8020/user/savepoints");
// config.setInteger("rest.port", 8081);
// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
env.getCheckpointConfig().disableCheckpointing();
// env.setParallelism(4); // adjust parallelism
//================= Start services =========================================
// Enable Flink checkpointing at the interval below (the checkpoint lifecycle)
// Exactly-once semantics; EXACTLY_ONCE and AT_LEAST_ONCE are currently supported
env.enableCheckpointing(180000L, CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(18000L, CheckpointingMode.EXACTLY_ONCE);
// env.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
// Advanced checkpoint options
// The checkpointing mode is exactly-once (also the default)
// Ensure at least 500 ms between checkpoints (minimum pause)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(50000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// A checkpoint must complete within 1 minute or it is discarded (checkpoint timeout)
env.getCheckpointConfig().setCheckpointInterval(30000); // 30-second interval; this overrides the value passed to enableCheckpointing above
env.getCheckpointConfig().setCheckpointTimeout(600000);
env.getCheckpointConfig().setCheckpointTimeout(60000);
// Only one checkpoint in flight at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Allow three consecutive checkpoint failures
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3); // allow 3 failures
// On top of that, configure snapshot retention
// Retain externalized checkpoints: the checkpoint data survives even after the job is cancelled, so the job can be restored from a chosen checkpoint
......@@ -58,7 +78,8 @@ public class EnvironmentSettings {
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// Enable unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
// env.disableOperatorChaining();
env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30));
env.disableOperatorChaining();
return env;
}
}
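One readability note: the checkpoint interval is set three times in this method (5 minutes via CheckpointingOptions.CHECKPOINTING_INTERVAL, 18 s via enableCheckpointing, then 30 s via setCheckpointInterval, which is the value that actually takes effect). A minimal consolidated sketch, assuming the 30-second interval is the intended one (jobType is the same method parameter as above):

// Hypothetical consolidated setup: each checkpoint knob is set exactly once.
Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs://140.245.112.44:8020/user/ck/" + jobType);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
env.enableCheckpointing(30_000L, CheckpointingMode.EXACTLY_ONCE); // single source of truth for the interval
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60_000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);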
package com.flink.common;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -29,52 +16,16 @@ import com.alibaba.fastjson.JSONObject;
public abstract class SourceCommonBase {
private static final Logger logger = LoggerFactory.getLogger(SourceCommonBase.class);
public void handleDataStreamSource(String jobName,String topic,String group) throws Exception {
public void handleDataStreamSource(String jobName,String topic,String group,String jobType) throws Exception {
//1. Environment setup
StreamExecutionEnvironment env = EnvironmentSettings.environmentSettings();
StreamExecutionEnvironment env = EnvironmentSettings.environmentSettings(jobType);
logger.info("1. Environment setup complete");
//2. Acquire resource configuration (Kafka source)
DataStreamSource<String> dataStreamSource = KafkaConnector.sourceKafka(env, topic, group);
//================= Configure sink columns =========================================
String[] fields = {"id", "ip", "area_name", "device_id", "cid", "phone", "nick","create_time","dt","__DORIS_DELETE_SIGN__"};
DataType[] types = {DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),DataTypes.STRING(),DataTypes.TIMESTAMP()};
//================= Streaming setup =========================================
String tableName = "bi.event_ip_convert";
DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);
// Data processing pipeline
dataStreamSource
.map(value -> {
try {
JSONObject jsonObj = parseSourceKafkaJson(value);
if(null == jsonObj) {
return null;
}
// String[] parts = parsed.split(",");
GenericRowData row = new GenericRowData(9);
DateTimeFormatter formatter = DateTimeFormatter.ISO_INSTANT;
System.out.println("value" + value);
// Field mapping
row.setField(0, StringData.fromString((String)jsonObj.get("id")));
row.setField(1, StringData.fromString((String) jsonObj.get("ips")));
row.setField(2, StringData.fromString((String) jsonObj.get("areaNameList")));
row.setField(3, StringData.fromString((String) jsonObj.get("deviceId")));
row.setField(4, StringData.fromString((String) jsonObj.get("cid")));
row.setField(5, StringData.fromString((String) jsonObj.get("phone")));
row.setField(6, StringData.fromString((String) jsonObj.get("nick")));
row.setField(7, StringData.fromString((String) jsonObj.get("createTime")));
row.setField(8, TimestampData.fromInstant(Instant.now()));
return (RowData)row;
} catch (Exception e) {
System.err.println("解析失败: "+e.toString());
return null;
}
})
.filter(Objects::nonNull)
// .print(">>>>>>>>>>>>>>>");
.sinkTo(dorisSink);
logger.info("2.资源配置文件信息的获取成功");
//3.Kafka资源ETL
parseSourceKafkaJson(dataStreamSource);
logger.info("3.Kafka资源ETL操作成功");
env.execute(jobName);
}
......@@ -85,7 +36,7 @@ public abstract class SourceCommonBase {
* @throws ParseException
* @throws Exception
*/
public abstract JSONObject parseSourceKafkaJson(String record) throws ParseException, Exception;
public abstract JSONObject parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception;
/**
* Send the processed data to a Kafka queue for downstream computation (abstract method)
......
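Under the refactored contract, each concrete job consumes the whole DataStreamSource instead of a single record, so the subclass owns its map/filter/sink chain end to end. A minimal hedged sketch of what a new subclass looks like (class name, package, and the trivial print sink are illustrative, not part of this commit):

package com.flink.achieve.example; // hypothetical package
import java.io.Serializable;
import java.util.Objects;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import com.alibaba.fastjson.JSONObject;
import com.flink.common.SourceCommonBase;
public class ExampleLogAchi extends SourceCommonBase implements Serializable {
    private static final long serialVersionUID = 1L;
    @Override
    public JSONObject parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws Exception {
        dataStreamSource
                .map(record -> JSONObject.parseObject(record)) // parse each Kafka record
                .filter(Objects::nonNull)
                .print(); // placeholder sink for the sketch
        return null; // the return value is unused under the new contract
    }
    @Override
    public void sendToSinkKafka(DataStreamSource<String> mStream) {
        // not needed for this sketch
    }
}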
package com.flink.enums;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Job type enum (thread-safe)
 * @author wjs
 * @version Created: 2025-5-6 16:48:12
 * <p>Usage example:
 * JobTypeEnum type = JobTypeEnum.fromCode("JOB_01");
 * </p>
 * Class description: job types
 */
public enum JobTypeEnum {
EVENT_IP_CONVERT("JOB_01", "Event IP conversion job")
;
private static final String JOB_PREFIX = "Job type - ";
private static final Map<String, JobTypeEnum> CODE_MAP =
Arrays.stream(values()).collect(Collectors.toMap(JobTypeEnum::getCode, Function.identity()));
private final String code;
private final String description;
JobTypeEnum(String code, String description) {
this.code = code;
this.description = JOB_PREFIX + description;
}
public String getCode() { return code; }
public String getDescription() { return description; }
public static JobTypeEnum fromCode(String code) {
JobTypeEnum type = CODE_MAP.get(code);
if (type == null) {
throw new IllegalArgumentException("无效的作业代码: " + code);
}
return type;
}
public static List<String> getValidCodes() {
return CODE_MAP.keySet().stream().collect(Collectors.toList());
}
public String toMetricName() {
return name().toLowerCase(Locale.ENGLISH) + "_job";
}
@Override
public String toString() {
return String.format("%s(%s)", description, code);
}
}
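A short usage sketch of the routing enum; the demo class is hypothetical, but the printed values follow directly from the methods above:

// Hypothetical demo of JobTypeEnum routing and failure behavior.
import com.flink.enums.JobTypeEnum;
public class JobTypeEnumDemo {
    public static void main(String[] args) {
        JobTypeEnum type = JobTypeEnum.fromCode("JOB_01");
        System.out.println(type); // Job type - Event IP conversion job(JOB_01)
        System.out.println(type.toMetricName()); // event_ip_convert_job
        System.out.println(JobTypeEnum.getValidCodes()); // [JOB_01]
        try {
            JobTypeEnum.fromCode("JOB_99"); // not registered
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Invalid job code: JOB_99
        }
    }
}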