Skip to content
Toggle navigation
P
Projects
G
Groups
S
Snippets
Help
eagleEye
/
eagleEye-flink_kafka
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Issues
0
Merge Requests
0
Pipelines
Wiki
Snippets
Settings
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit
d894f28f
authored
May 27, 2025
by
魏建枢
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
双流join
parent
db8a7d63
Hide whitespace changes
Inline
Side-by-side
Showing
21 changed files
with
676 additions
and
49 deletions
eagleEye-flink_kafka/pom.xml
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/EventIpConvertAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/EventIpLatestAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/common/MultipleSourceCommonBase.java
eagleEye-flink_kafka/src/main/java/com/flink/common/SourceCommonBase.java
eagleEye-flink_kafka/src/main/java/com/flink/enums/JobTypeEnum.java
eagleEye-flink_kafka/src/main/java/com/flink/enums/TopicTypeEnum.java
eagleEye-flink_kafka/src/main/java/com/flink/factory/JobProcessorFactory.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/CollectLogProcessor.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/EventIpConvertProcessor.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/EventIpLatestProcessor.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/RealBalanceProcessor.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/RealKycProcessor.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/RealTransactionProcessor.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/RealUsersProcessor.java
eagleEye-flink_kafka/src/main/java/com/flink/util/TimeConvertUtil.java
eagleEye-flink_kafka/src/main/java/com/flink/vo/EventIp.java
eagleEye-flink_kafka/src/main/java/com/flink/vo/KafkaDataSource.java
eagleEye-flink_kafka/src/main/java/com/flink/vo/KafkaTopic.java
eagleEye-flink_kafka/src/main/java/com/flink/vo/Result.java
eagleEye-flink_kafka/src/main/java/com/flink/vo/SimiUserInfo.java
eagleEye-flink_kafka/pom.xml
View file @
d894f28f
...
...
@@ -32,17 +32,17 @@
<!--通用依赖 -->
<dependencies>
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-table-planner_2.12
</artifactId>
<version>
1.20.0
</version>
</dependency>
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-table-api-java
</artifactId>
<version>
1.20.0
</version>
</dependency>
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-table-planner_2.12
</artifactId>
<version>
${flink.version}
</version>
</dependency>
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-table-api-java
</artifactId>
<version>
${flink.version}
</version>
</dependency>
<!-- json -->
<dependency>
<groupId>
com.alibaba
</groupId>
...
...
@@ -81,7 +81,11 @@
<artifactId>
flink-connector-files
</artifactId>
<version>
${flink.version}
</version>
</dependency>
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-json
</artifactId>
<version>
${flink.version}
</version>
</dependency>
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-connector-jdbc
</artifactId>
...
...
@@ -127,7 +131,7 @@
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-runtime-web
</artifactId>
<version>
1.20.0
</version>
<version>
${flink.version}
</version>
<scope>
provided
</scope>
</dependency>
<dependency>
...
...
@@ -166,12 +170,12 @@
<version>
2.6.4
</version>
</dependency>
<dependency>
<groupId>
jdk.tools
</groupId>
<artifactId>
jdk.tools
</artifactId>
<version>
1.8
</version>
<scope>
system
</scope>
<!-- 根据 JDK 版本选择路径 -->
<systemPath>
${JAVA_HOME}/lib/tools.jar
</systemPath>
<groupId>
jdk.tools
</groupId>
<artifactId>
jdk.tools
</artifactId>
<version>
1.8
</version>
<scope>
system
</scope>
<!-- 根据 JDK 版本选择路径 -->
<systemPath>
${JAVA_HOME}/lib/tools.jar
</systemPath>
</dependency>
</dependencies>
...
...
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/EventIpConvertAchi.java
View file @
d894f28f
...
...
@@ -32,7 +32,7 @@ import com.flink.vo.UserProperties;
/**
* @author wjs
* @version 创建时间:2025-5-6 16:01:23
* 类说明
* 类说明
https://www.bookstack.cn/read/flink-1.20-zh/60d184d31c0a61a5.md
*/
public
class
EventIpConvertAchi
extends
SourceCommonBase
implements
Serializable
{
...
...
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/EventIpLatestAchi.java
0 → 100644
View file @
d894f28f
package
com
.
flink
.
achieve
.
doris
;
import
java.io.Serializable
;
import
java.time.Duration
;
import
java.time.LocalDateTime
;
import
java.time.format.DateTimeFormatter
;
import
java.util.List
;
import
java.util.Objects
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.doris.flink.sink.DorisSink
;
import
org.apache.flink.api.common.eventtime.WatermarkStrategy
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
import
org.apache.flink.api.common.functions.MapFunction
;
import
org.apache.flink.api.common.io.ParseException
;
import
org.apache.flink.api.common.state.MapState
;
import
org.apache.flink.api.common.state.MapStateDescriptor
;
import
org.apache.flink.api.common.state.StateTtlConfig
;
import
org.apache.flink.api.common.state.ValueState
;
import
org.apache.flink.api.common.state.ValueStateDescriptor
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.datastream.DataStream
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.functions.KeyedProcessFunction
;
import
org.apache.flink.streaming.api.functions.co.CoProcessFunction
;
import
org.apache.flink.table.api.DataTypes
;
import
org.apache.flink.table.data.GenericRowData
;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.data.StringData
;
import
org.apache.flink.table.data.TimestampData
;
import
org.apache.flink.table.types.DataType
;
import
org.apache.flink.util.Collector
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
com.alibaba.fastjson.JSONObject
;
import
com.alibaba.fastjson.TypeReference
;
import
com.flink.common.DorisConnector
;
import
com.flink.common.MultipleSourceCommonBase
;
import
com.flink.util.CompareUtils
;
import
com.flink.util.TimeConvertUtil
;
import
com.flink.util.ip2region.SearcherUtil
;
import
com.flink.vo.EventIp
;
import
com.flink.vo.KafkaDataSource
;
import
com.flink.vo.OdsEventLog
;
import
com.flink.vo.Result
;
import
com.flink.vo.SimiUserInfo
;
import
com.flink.vo.UserProperties
;
/**
* @author wjs
* @version 创建时间:2025-5-26 14:40:49
* 类说明
*/
public
class
EventIpLatestAchi
extends
MultipleSourceCommonBase
implements
Serializable
{
/**
*
*/
private
static
final
long
serialVersionUID
=
1L
;
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
EventIpConvertAchi
.
class
);
@Override
public
void
parseSourceKafkaJson
(
List
<
KafkaDataSource
>
dataSourceList
)
throws
ParseException
,
Exception
{
//=================配置入库字段=========================================
String
[]
fields
=
{
"cid"
,
"phone"
,
"app_key"
,
"ip"
,
"area_name"
,
"app_type"
,
"country_code"
,
"user_state"
,
"nick"
,
"create_time"
,
"__DORIS_DELETE_SIGN__"
};
DataType
[]
types
=
{
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
TIMESTAMP
(
3
),
DataTypes
.
INT
()
};
//=================流式处理=========================================
String
tableName
=
"bi.event_ip_latest"
;
DorisSink
<
RowData
>
dorisSink
=
DorisConnector
.
sinkDoris
(
fields
,
types
,
tableName
);
//=================数据处理流水线=========================================
operatorStream
(
dataSourceList
).
map
(
new
MapFunction
<
Result
,
RowData
>(){
/**
*
*/
private
static
final
long
serialVersionUID
=
1L
;
@Override
public
RowData
map
(
Result
result
)
throws
Exception
{
if
(
null
==
result
)
{
return
null
;
}
GenericRowData
row
=
new
GenericRowData
(
fields
.
length
);
row
.
setField
(
0
,
StringData
.
fromString
(
result
.
getCid
()));
row
.
setField
(
1
,
StringData
.
fromString
(
result
.
getPhone
()));
row
.
setField
(
2
,
StringData
.
fromString
(
result
.
getAppKey
()));
row
.
setField
(
3
,
StringData
.
fromString
(
result
.
getIp
()));
row
.
setField
(
4
,
StringData
.
fromString
(
result
.
getAreaName
()));
row
.
setField
(
5
,
StringData
.
fromString
(
result
.
getAppType
()));
row
.
setField
(
6
,
StringData
.
fromString
(
result
.
getCountryCode
()));
row
.
setField
(
7
,
StringData
.
fromString
(
result
.
getUserState
()));
row
.
setField
(
8
,
StringData
.
fromString
(
result
.
getNick
()));
row
.
setField
(
9
,
TimestampData
.
fromLocalDateTime
(
LocalDateTime
.
parse
(
result
.
getCreateTime
(),
DateTimeFormatter
.
ofPattern
(
"yyyy-MM-dd HH:mm:ss.SSS"
))));
row
.
setField
(
10
,
0
);
// __DORIS_DELETE_SIGN__
return
row
;
}
})
.
filter
(
Objects:
:
nonNull
)
// .print()
.
sinkTo
(
dorisSink
)
.
name
(
"Doris-EventIpLatest"
);
}
private
DataStream
<
Result
>
operatorStream
(
List
<
KafkaDataSource
>
dataSourceList
)
{
DataStreamSource
<
String
>
userStreamSource
=
null
;
DataStreamSource
<
String
>
eventStreamSource
=
null
;
if
(
CollectionUtils
.
isNotEmpty
(
dataSourceList
))
{
for
(
KafkaDataSource
kafkaDataSource
:
dataSourceList
)
{
if
(
StringUtils
.
equals
(
kafkaDataSource
.
getTopic
(),
"simi_user_list"
))
{
userStreamSource
=
kafkaDataSource
.
getDataStreamSource
();
}
if
(
StringUtils
.
equals
(
kafkaDataSource
.
getTopic
(),
"ods_event_log"
))
{
eventStreamSource
=
kafkaDataSource
.
getDataStreamSource
();
}
}
}
else
{
return
null
;
}
// 用户数据流处理(5分钟批量更新)
DataStream
<
SimiUserInfo
>
userDataStream
=
userStreamSource
.
flatMap
(
new
FlatMapFunction
<
String
,
SimiUserInfo
>()
{
private
static
final
long
serialVersionUID
=
1L
;
@Override
public
void
flatMap
(
String
value
,
Collector
<
SimiUserInfo
>
out
)
throws
Exception
{
try
{
// 解析 Kafka 数据
SimiUserInfo
simiUserInfo
=
JSONObject
.
parseObject
(
value
,
new
TypeReference
<
SimiUserInfo
>()
{
});
simiUserInfo
.
setUpdateTime
(
TimeConvertUtil
.
convertToTimestamp
(
simiUserInfo
.
getCreate_time
()));
out
.
collect
(
simiUserInfo
);
}
catch
(
Exception
e
)
{
logger
.
error
(
"Error parsing simi_user_list 处理 Kafka 消息出错 | data:{} | error:{}"
,
value
,
e
.
getMessage
());
}
}
})
.
filter
(
u
->
StringUtils
.
isNoneEmpty
(
u
.
getCid
(),
u
.
getPhone_number
()))
.
assignTimestampsAndWatermarks
(
WatermarkStrategy
.<
SimiUserInfo
>
forBoundedOutOfOrderness
(
Duration
.
ofMinutes
(
5
))
.
withTimestampAssigner
((
user
,
ts
)
->
user
.
getUpdateTime
()))
.
keyBy
(
user
->
user
.
getCid
()
+
"#_#"
+
user
.
getPhone_number
()).
process
(
new
LatestUserProcessFunction
());
// 事件数据流处理
DataStream
<
EventIp
>
eventDataStream
=
eventStreamSource
.
flatMap
(
new
FlatMapFunction
<
String
,
EventIp
>()
{
private
static
final
long
serialVersionUID
=
1L
;
@Override
public
void
flatMap
(
String
value
,
Collector
<
EventIp
>
out
)
throws
Exception
{
try
{
// 解析 Kafka 数据
EventIp
event
=
handleData
(
value
);
if
(
event
!=
null
)
out
.
collect
(
event
);
}
catch
(
Exception
e
)
{
logger
.
error
(
"Error parsing ods_event_log 处理 Kafka 消息出错 | data:{} | error:{}"
,
value
,
e
.
getMessage
());
}
}
})
.
assignTimestampsAndWatermarks
(
WatermarkStrategy
.<
EventIp
>
forBoundedOutOfOrderness
(
Duration
.
ofSeconds
(
5
))
.
withTimestampAssigner
((
event
,
ts
)
->
event
.
getEventTime
()));
return
eventDataStream
.
connect
(
userDataStream
)
.
keyBy
(
event
->
event
.
getCid
()
+
"#_#"
+
event
.
getPhone
(),
user
->
user
.
getCid
()
+
"#_#"
+
user
.
getPhone_number
())
.
process
(
new
JoinProcessor
()
{
/**
*
*/
private
static
final
long
serialVersionUID
=
1L
;
});
}
// 双流连接处理器(带缓存机制)
private
static
class
JoinProcessor
extends
CoProcessFunction
<
EventIp
,
SimiUserInfo
,
Result
>
{
/**
*
*/
private
static
final
long
serialVersionUID
=
1L
;
private
ValueState
<
SimiUserInfo
>
userState
;
private
MapState
<
Long
,
EventIp
>
pendingEvents
;
@Override
public
void
open
(
Configuration
parameters
)
{
userState
=
getRuntimeContext
().
getState
(
new
ValueStateDescriptor
<>(
"user-state"
,
SimiUserInfo
.
class
));
MapStateDescriptor
<
Long
,
EventIp
>
eventsDesc
=
new
MapStateDescriptor
<>(
"pendingEvents"
,
Long
.
class
,
EventIp
.
class
);
pendingEvents
=
getRuntimeContext
().
getMapState
(
eventsDesc
);
}
@Override
public
void
processElement1
(
EventIp
event
,
Context
ctx
,
Collector
<
Result
>
out
)
throws
Exception
{
SimiUserInfo
user
=
userState
.
value
();
if
(
user
!=
null
)
{
out
.
collect
(
buildResult
(
user
,
event
));
}
else
{
pendingEvents
.
put
(
event
.
getEventTime
(),
event
);
ctx
.
timerService
().
registerEventTimeTimer
(
event
.
getEventTime
()
+
60000
);
// 1分钟超时
}
}
@Override
public
void
processElement2
(
SimiUserInfo
user
,
Context
ctx
,
Collector
<
Result
>
out
)
throws
Exception
{
userState
.
update
(
user
);
// 更新最新用户状态
for
(
EventIp
event
:
pendingEvents
.
values
())
{
out
.
collect
(
buildResult
(
user
,
event
));
}
pendingEvents
.
clear
();
}
@Override
public
void
onTimer
(
long
timestamp
,
OnTimerContext
ctx
,
Collector
<
Result
>
out
)
throws
Exception
{
pendingEvents
.
remove
(
timestamp
-
60000
);
// 清理超时事件
}
private
Result
buildResult
(
SimiUserInfo
user
,
EventIp
event
)
{
Result
result
=
new
Result
();
result
.
setCid
(
user
.
getCid
());
result
.
setPhone
(
user
.
getPhone_number
());
result
.
setCountryCode
(
user
.
getCountry_code
());
result
.
setUserState
(
user
.
getUser_state
());
result
.
setIp
(
event
.
getIp
());
result
.
setAreaName
(
event
.
getAreaName
());
result
.
setNick
(
event
.
getNick
());
result
.
setAppKey
(
event
.
getAppKey
());
result
.
setAppType
(
event
.
getAppType
());
result
.
setCreateTime
(
event
.
getCreateTime
());
return
result
;
}
}
// 使用状态维护每个用户的最新记录
private
static
class
LatestUserProcessFunction
extends
KeyedProcessFunction
<
String
,
SimiUserInfo
,
SimiUserInfo
>
{
/**
*
*/
private
static
final
long
serialVersionUID
=
1L
;
private
ValueState
<
SimiUserInfo
>
latestUserState
;
@Override
public
void
open
(
Configuration
parameters
)
{
// 初始化用户状态
ValueStateDescriptor
<
SimiUserInfo
>
descriptor
=
new
ValueStateDescriptor
<>(
"user-state"
,
SimiUserInfo
.
class
);
StateTtlConfig
ttlConfig
=
StateTtlConfig
.
newBuilder
(
Duration
.
ofMinutes
(
30
))
.
setUpdateType
(
StateTtlConfig
.
UpdateType
.
OnCreateAndWrite
)
.
cleanupIncrementally
(
1000
,
true
)
//Heap StateBackend 使用增量清理
// .cleanupInRocksdbCompactFilter(1000) //RocksDB StateBackend 使用压缩清理
.
build
();
descriptor
.
enableTimeToLive
(
ttlConfig
);
latestUserState
=
getRuntimeContext
().
getState
(
descriptor
);
}
@Override
public
void
processElement
(
SimiUserInfo
user
,
Context
ctx
,
Collector
<
SimiUserInfo
>
out
)
throws
Exception
{
SimiUserInfo
currentLatest
=
latestUserState
.
value
();
if
(
currentLatest
==
null
||
user
.
getUpdateTime
()
>
currentLatest
.
getUpdateTime
())
{
latestUserState
.
update
(
user
);
out
.
collect
(
user
);
}
}
}
public
static
EventIp
handleData
(
String
record
)
throws
ParseException
,
Exception
{
// TODO 数据的 ETL 处理
OdsEventLog
odsEventLog
=
JSONObject
.
parseObject
(
record
,
new
TypeReference
<
OdsEventLog
>()
{});
String
createTime
=
odsEventLog
.
getCreate_time
();
String
routeIp
=
odsEventLog
.
getRoute_ip
();
String
userProperties
=
odsEventLog
.
getUser_properties
();
String
appKey
=
odsEventLog
.
getApp_key
();
String
appType
=
odsEventLog
.
getApp_type
();
if
(
StringUtils
.
isEmpty
(
appKey
)
||
StringUtils
.
equals
(
appKey
,
"C7jias27jias2"
))
{
appKey
=
"8ooOvXJo276"
;
}
String
cid
=
null
;
String
phone
=
null
;
String
nick
=
null
;
if
(
StringUtils
.
isNotEmpty
(
userProperties
))
{
List
<
UserProperties
>
userPropertiesList
=
JSONObject
.
parseObject
(
userProperties
,
new
TypeReference
<
List
<
UserProperties
>>()
{
});
if
(
userPropertiesList
!=
null
&&
userPropertiesList
.
size
()
>
0
)
{
for
(
UserProperties
user
:
userPropertiesList
)
{
if
(
StringUtils
.
isNotEmpty
(
user
.
getCid
()))
{
cid
=
user
.
getCid
();
}
else
if
(
StringUtils
.
isNotEmpty
(
user
.
getPhone
()))
{
phone
=
user
.
getPhone
();
}
else
if
(
StringUtils
.
isNotEmpty
(
user
.
getId
()))
{
cid
=
user
.
getId
();
}
else
if
(
StringUtils
.
isNotEmpty
(
user
.
getNick
()))
{
nick
=
user
.
getNick
();
}
else
if
(
StringUtils
.
isNotEmpty
(
user
.
getEmail
()))
{
nick
=
user
.
getEmail
();
}
}
}
}
List
<
String
>
ips
=
SearcherUtil
.
convertStringToList
(
routeIp
);
if
(
CollectionUtils
.
isEmpty
(
ips
))
{
return
null
;
}
String
ip_name
=
null
;
String
area_name
=
null
;
for
(
String
ip
:
ips
)
{
if
(!
SearcherUtil
.
ipv6
(
ip
))
{
area_name
=
SearcherUtil
.
getCityInfoByFile
(
ip
);
if
(!
CompareUtils
.
stringExists
(
area_name
,
"0|0|0|内网IP|内网IP"
,
"0|0|0|内网IP|Finance-and-Promoting-Technology"
))
{
ip_name
=
ip
;
break
;
}
else
{
ip_name
=
null
;
area_name
=
null
;
}
}
}
if
(
StringUtils
.
isEmpty
(
ip_name
))
{
return
null
;
}
EventIp
eventIp
=
new
EventIp
();
eventIp
.
setCid
(
cid
);
eventIp
.
setEventTime
(
TimeConvertUtil
.
convertToTimestamp
(
createTime
));
eventIp
.
setPhone
(
phone
);
eventIp
.
setIp
(
ip_name
);
eventIp
.
setAreaName
(
area_name
);
eventIp
.
setNick
(
nick
);
eventIp
.
setAppKey
(
appKey
);
eventIp
.
setAppType
(
appType
);
eventIp
.
setCreateTime
(
createTime
);
return
eventIp
;
}
}
eagleEye-flink_kafka/src/main/java/com/flink/common/MultipleSourceCommonBase.java
0 → 100644
View file @
d894f28f
package
com
.
flink
.
common
;
import
java.util.ArrayList
;
import
java.util.List
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.flink.api.common.io.ParseException
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
com.flink.enums.JobTypeEnum
;
import
com.flink.vo.KafkaDataSource
;
import
com.flink.vo.KafkaTopic
;
/**
* @author wjs
* @version 创建时间:2024-12-20 10:43:56
* 类说明 抽象类对接kafka的数据,并解析关键字段
*/
/**
 * Base class for jobs that consume several Kafka topics at once: sets up the
 * execution environment, builds one source per topic, hands all sources to the
 * subclass for parsing, then submits the job.
 *
 * @author wjs
 * @version created 2024-12-20 10:43:56
 */
public abstract class MultipleSourceCommonBase {

    private static final Logger logger = LoggerFactory.getLogger(MultipleSourceCommonBase.class);

    /**
     * Builds the Flink job around the given topics and executes it.
     *
     * @param kafkaTopicList topic / consumer-group pairs to subscribe to
     * @param jobName        job type; its code configures the env, its description names the job
     * @throws Exception on setup or execution failure
     */
    public void handleDataStreamSource(List<KafkaTopic> kafkaTopicList, JobTypeEnum jobName)
            throws Exception {
        // 1. execution environment
        final StreamExecutionEnvironment env =
                EnvironmentSettings.environmentSettings(jobName.getCode());
        logger.info("1. 环境的设置成功");
        // 2. one Kafka source per configured topic
        final List<KafkaDataSource> dataSourceList = new ArrayList<>();
        if (CollectionUtils.isNotEmpty(kafkaTopicList)) {
            for (KafkaTopic topicConfig : kafkaTopicList) {
                final String topicName = topicConfig.getTopic();
                final String groupId = topicConfig.getGroup();
                final DataStreamSource<String> source =
                        KafkaConnector.sourceKafka(env, topicName, groupId);
                final KafkaDataSource dataSource = new KafkaDataSource();
                dataSource.setDataStreamSource(source);
                dataSource.setTopic(topicName);
                dataSourceList.add(dataSource);
            }
        }
        logger.info("2.资源配置文件信息的获取成功");
        // 3. subclass-specific ETL over all sources
        parseSourceKafkaJson(dataSourceList);
        logger.info("3.Kafka资源ETL操作成功");
        env.execute(jobName.getDescription());
    }

    /**
     * Parses the JSON payloads of the given Kafka sources (implemented per job).
     *
     * @param dataSourceList one entry per subscribed topic
     * @throws ParseException if a payload cannot be parsed
     * @throws Exception      on any other processing failure
     */
    public abstract void parseSourceKafkaJson(List<KafkaDataSource> dataSourceList)
            throws ParseException, Exception;
}
eagleEye-flink_kafka/src/main/java/com/flink/common/SourceCommonBase.java
View file @
d894f28f
...
...
@@ -6,7 +6,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
com.alibaba.fastjson.JSONObject
;
import
com.flink.enums.JobTypeEnum
;
import
com.flink.enums.TopicTypeEnum
;
/**
* @author wjs
...
...
@@ -16,17 +17,17 @@ import com.alibaba.fastjson.JSONObject;
public
abstract
class
SourceCommonBase
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
SourceCommonBase
.
class
);
public
void
handleDataStreamSource
(
String
jobName
,
String
topic
,
String
group
,
String
jobType
)
throws
Exception
{
public
void
handleDataStreamSource
(
JobTypeEnum
jobTypeEnum
,
TopicTypeEnum
topicTypeEnum
)
throws
Exception
{
//1. 环境的设置
StreamExecutionEnvironment
env
=
EnvironmentSettings
.
environmentSettings
(
jobType
);
StreamExecutionEnvironment
env
=
EnvironmentSettings
.
environmentSettings
(
jobType
Enum
.
getCode
()
);
logger
.
info
(
"1. 环境的设置成功"
);
//2.资源配置文件信息的获取
DataStreamSource
<
String
>
dataStreamSource
=
KafkaConnector
.
sourceKafka
(
env
,
topic
,
group
);
DataStreamSource
<
String
>
dataStreamSource
=
KafkaConnector
.
sourceKafka
(
env
,
topic
TypeEnum
.
getTopic
(),
topicTypeEnum
.
getGroup
()
);
logger
.
info
(
"2.资源配置文件信息的获取成功"
);
//3.Kafka资源ETL
parseSourceKafkaJson
(
dataStreamSource
);
logger
.
info
(
"3.Kafka资源ETL操作成功"
);
env
.
execute
(
job
Name
);
env
.
execute
(
job
TypeEnum
.
getDescription
()
);
}
/**
...
...
eagleEye-flink_kafka/src/main/java/com/flink/enums/JobTypeEnum.java
View file @
d894f28f
...
...
@@ -23,6 +23,7 @@ public enum JobTypeEnum {
REAL_USERS
(
"JOB_04"
,
"真实用户作业"
),
REAL_BALANCE
(
"JOB_05"
,
"真实余额作业"
),
COLLECT_LOG
(
"JOB_06"
,
"日志采集作业"
),
EVENT_IP_CONVERT_CID
(
"JOB_07"
,
"最新事件IP作业"
),
;
...
...
eagleEye-flink_kafka/src/main/java/com/flink/enums/TopicTypeEnum.java
View file @
d894f28f
package
com
.
flink
.
enums
;
import
com.flink.vo.KafkaTopic
;
/**
* @author wjs
...
...
@@ -23,6 +24,7 @@ public enum TopicTypeEnum {
ODS_CID_GROUP_OVERLAP
(
"ods_cid_group_overlap"
,
"odsCidGroupOverlap"
),
ODS_EVENT_IP_CONVERT
(
"ods_event_ip_convert"
,
"odsEventIpConvert"
),
ODS_USER_INVITATION
(
"ods_user_invitation"
,
"odsUserInvitation"
),
SIMI_USER_LIST_TOPIC
(
"simi_user_list"
,
"simiUserList"
),
;
private
String
topic
;
...
...
@@ -34,6 +36,13 @@ public enum TopicTypeEnum {
this
.
group
=
group
;
}
/** Builds a {@link KafkaTopic} config carrying this constant's topic and group. */
public KafkaTopic createKafkaTopic() {
    final KafkaTopic kafkaTopic = new KafkaTopic();
    kafkaTopic.setTopic(this.topic);
    kafkaTopic.setGroup(this.group);
    return kafkaTopic;
}
/** @return the Kafka topic name of this constant */
public String getTopic() {
    return this.topic;
}
...
...
eagleEye-flink_kafka/src/main/java/com/flink/factory/JobProcessorFactory.java
View file @
d894f28f
...
...
@@ -3,6 +3,7 @@ package com.flink.factory;
import
com.flink.enums.JobTypeEnum
;
import
com.flink.processor.JobProcessor
;
import
com.flink.processor.impl.CollectLogProcessor
;
import
com.flink.processor.impl.EventIpLatestProcessor
;
import
com.flink.processor.impl.EventIpConvertProcessor
;
import
com.flink.processor.impl.RealBalanceProcessor
;
import
com.flink.processor.impl.RealKycProcessor
;
...
...
@@ -30,6 +31,8 @@ public class JobProcessorFactory {
return
new
RealBalanceProcessor
();
case
COLLECT_LOG:
return
new
CollectLogProcessor
();
case
EVENT_IP_CONVERT_CID:
return
new
EventIpLatestProcessor
();
default
:
throw
new
IllegalArgumentException
(
"未知的Job类型: "
+
jobType
);
}
...
...
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/CollectLogProcessor.java
View file @
d894f28f
...
...
@@ -15,10 +15,8 @@ public class CollectLogProcessor implements JobProcessor{
@Override
public
void
process
()
throws
Exception
{
new
CollectLogAchi
().
handleDataStreamSource
(
JobTypeEnum
.
COLLECT_LOG
.
getDescription
(),
TopicTypeEnum
.
ODS_NEW_COLLECT_LOG
.
getTopic
(),
TopicTypeEnum
.
ODS_NEW_COLLECT_LOG
.
getGroup
(),
JobTypeEnum
.
COLLECT_LOG
.
getCode
()
JobTypeEnum
.
COLLECT_LOG
,
TopicTypeEnum
.
ODS_NEW_COLLECT_LOG
);
}
...
...
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/EventIpConvertProcessor.java
View file @
d894f28f
...
...
@@ -15,10 +15,8 @@ public class EventIpConvertProcessor implements JobProcessor{
@Override
public
void
process
()
throws
Exception
{
new
EventIpConvertAchi
().
handleDataStreamSource
(
JobTypeEnum
.
EVENT_IP_CONVERT
.
getDescription
(),
TopicTypeEnum
.
ODS_EVENT_LOG
.
getTopic
(),
TopicTypeEnum
.
ODS_EVENT_LOG
.
getGroup
(),
JobTypeEnum
.
EVENT_IP_CONVERT
.
getCode
()
JobTypeEnum
.
EVENT_IP_CONVERT
,
TopicTypeEnum
.
ODS_EVENT_LOG
);
}
...
...
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/EventIpLatestProcessor.java
0 → 100644
View file @
d894f28f
package
com
.
flink
.
processor
.
impl
;
import
java.util.Arrays
;
import
java.util.List
;
import
java.util.stream.Collectors
;
import
com.flink.achieve.doris.EventIpLatestAchi
;
import
com.flink.enums.JobTypeEnum
;
import
com.flink.enums.TopicTypeEnum
;
import
com.flink.processor.JobProcessor
;
import
com.flink.vo.KafkaTopic
;
/**
* @author wjs
* @version 创建时间:2025-5-26 14:39:44
* 类说明
*/
public
class
EventIpLatestProcessor
implements
JobProcessor
{
@Override
public
void
process
()
throws
Exception
{
new
EventIpLatestAchi
().
handleDataStreamSource
(
createTopicList
(),
JobTypeEnum
.
EVENT_IP_CONVERT_CID
);
}
private
static
List
<
KafkaTopic
>
createTopicList
()
{
return
Arrays
.
stream
(
new
TopicTypeEnum
[]{
TopicTypeEnum
.
ODS_EVENT_LOG
,
TopicTypeEnum
.
SIMI_USER_LIST_TOPIC
}).
map
(
TopicTypeEnum:
:
createKafkaTopic
)
.
collect
(
Collectors
.
toList
());
}
}
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/RealBalanceProcessor.java
View file @
d894f28f
...
...
@@ -15,10 +15,8 @@ public class RealBalanceProcessor implements JobProcessor{
@Override
public
void
process
()
throws
Exception
{
new
RealBalanceAchi
().
handleDataStreamSource
(
JobTypeEnum
.
REAL_BALANCE
.
getDescription
(),
TopicTypeEnum
.
ODS_USER_INVITATION
.
getTopic
(),
TopicTypeEnum
.
ODS_USER_INVITATION
.
getGroup
(),
JobTypeEnum
.
REAL_BALANCE
.
getCode
()
JobTypeEnum
.
REAL_BALANCE
,
TopicTypeEnum
.
ODS_USER_INVITATION
);
}
...
...
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/RealKycProcessor.java
View file @
d894f28f
...
...
@@ -15,10 +15,8 @@ public class RealKycProcessor implements JobProcessor{
@Override
public
void
process
()
throws
Exception
{
new
RealKycAchi
().
handleDataStreamSource
(
JobTypeEnum
.
REAL_KYC
.
getDescription
(),
TopicTypeEnum
.
ODS_USER_INVITATION
.
getTopic
(),
TopicTypeEnum
.
ODS_USER_INVITATION
.
getGroup
(),
JobTypeEnum
.
REAL_KYC
.
getCode
()
JobTypeEnum
.
REAL_KYC
,
TopicTypeEnum
.
ODS_USER_INVITATION
);
}
...
...
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/RealTransactionProcessor.java
View file @
d894f28f
...
...
@@ -15,10 +15,8 @@ public class RealTransactionProcessor implements JobProcessor{
@Override
public
void
process
()
throws
Exception
{
new
RealTransactionAchi
().
handleDataStreamSource
(
JobTypeEnum
.
REAL_TRANSACTION
.
getDescription
(),
TopicTypeEnum
.
ODS_USER_INVITATION
.
getTopic
(),
TopicTypeEnum
.
ODS_USER_INVITATION
.
getGroup
(),
JobTypeEnum
.
REAL_TRANSACTION
.
getCode
()
JobTypeEnum
.
REAL_TRANSACTION
,
TopicTypeEnum
.
ODS_USER_INVITATION
);
}
...
...
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/RealUsersProcessor.java
View file @
d894f28f
...
...
@@ -15,10 +15,8 @@ public class RealUsersProcessor implements JobProcessor{
@Override
public
void
process
()
throws
Exception
{
new
RealUsersAchi
().
handleDataStreamSource
(
JobTypeEnum
.
REAL_USERS
.
getDescription
(),
TopicTypeEnum
.
ODS_USER_INVITATION
.
getTopic
(),
TopicTypeEnum
.
ODS_USER_INVITATION
.
getGroup
(),
JobTypeEnum
.
REAL_USERS
.
getCode
()
JobTypeEnum
.
REAL_USERS
,
TopicTypeEnum
.
ODS_USER_INVITATION
);
}
...
...
eagleEye-flink_kafka/src/main/java/com/flink/util/TimeConvertUtil.java
0 → 100644
View file @
d894f28f
package
com
.
flink
.
util
;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.TimeZone;
/**
* @author wjs
* @version 创建时间:2025-5-27 14:33:05
* 类说明
*/
/**
 * Time conversion helpers.
 *
 * @author wjs
 * @version created 2025-5-27 14:33:05
 */
public class TimeConvertUtil {

    /**
     * Cached, thread-safe formatter (replaces a per-call SimpleDateFormat).
     * Strict by design: malformed inputs now fail instead of being silently
     * rolled over by SimpleDateFormat's lenient parsing.
     */
    private static final DateTimeFormatter UTC_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Parses a "yyyy-MM-dd HH:mm:ss" string as a UTC wall-clock time.
     *
     * @param timeStr timestamp text, e.g. "2025-05-27 14:33:05"
     * @return epoch milliseconds of the given UTC time
     * @throws Exception if the string does not match the expected pattern
     */
    public static long convertToTimestamp(String timeStr) throws Exception {
        return LocalDateTime.parse(timeStr, UTC_FORMATTER)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }
}
eagleEye-flink_kafka/src/main/java/com/flink/vo/EventIp.java
0 → 100644
View file @
d894f28f
package
com
.
flink
.
vo
;
import
java.io.Serializable
;
import
lombok.Data
;
import
lombok.ToString
;
/**
* @author wjs
* @version 创建时间:2025-5-26 17:47:06
* 类说明
*/
/**
 * One ETL'd event-log record: the public IP (and resolved area) observed for a
 * user event, plus the user / app identifiers carried along for the join.
 *
 * @author wjs
 * @version created 2025-5-26 17:47:06
 */
@Data
@ToString
public class EventIp implements Serializable {

    private static final long serialVersionUID = 1L;

    // public IPv4 address selected from the route
    private String ip;
    // area info resolved from the IP
    private String areaName;
    private String cid;
    private String phone;
    private String nick;
    private String appKey;
    private String appType;
    // event time as epoch milliseconds (used for watermark assignment)
    private Long eventTime;
    // raw create-time string copied from the source record
    private String createTime;
}
eagleEye-flink_kafka/src/main/java/com/flink/vo/KafkaDataSource.java
0 → 100644
View file @
d894f28f
package
com
.
flink
.
vo
;
import
java.io.Serializable
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
lombok.Data
;
import
lombok.ToString
;
/**
* @author wjs
* @version 创建时间:2025-5-27 15:43:13
* 类说明
*/
/**
 * Pairs a Kafka topic name with the Flink source stream built for it, so a job
 * can look up its streams by topic.
 *
 * @author wjs
 * @version created 2025-5-27 15:43:13
 */
@Data
@ToString
public class KafkaDataSource implements Serializable {

    private static final long serialVersionUID = 1L;

    // topic this source reads from
    private String topic;
    // raw string stream produced by the Kafka connector
    private DataStreamSource<String> dataStreamSource;
}
eagleEye-flink_kafka/src/main/java/com/flink/vo/KafkaTopic.java
0 → 100644
View file @
d894f28f
package
com
.
flink
.
vo
;
import
java.io.Serializable
;
import
lombok.Data
;
import
lombok.ToString
;
/**
* @author wjs
* @version 创建时间:2025-5-27 15:37:52
* 类说明
*/
/**
 * Subscription config for one Kafka topic: the topic name and the consumer
 * group to read it with.
 *
 * @author wjs
 * @version created 2025-5-27 15:37:52
 */
@Data
@ToString
public class KafkaTopic implements Serializable {

    private static final long serialVersionUID = 1L;

    private String topic;
    private String group;
}
eagleEye-flink_kafka/src/main/java/com/flink/vo/Result.java
0 → 100644
View file @
d894f28f
package
com
.
flink
.
vo
;
import
java.io.Serializable
;
import
lombok.Data
;
import
lombok.ToString
;
/**
* @author wjs
* @version 创建时间:2025-5-26 17:54:45
* 类说明
*/
/**
 * Flat output record of the event/user join, one per row written to the Doris
 * table bi.event_ip_latest.
 *
 * @author wjs
 * @version created 2025-5-26 17:54:45
 */
@Data
@ToString
public class Result implements Serializable {

    private static final long serialVersionUID = 1L;

    private String cid;
    private String phone;
    private String ip;
    private String areaName;
    private String countryCode;
    private String userState;
    private String nick;
    private String appKey;
    private String appType;
    // create-time string carried from the event side of the join
    private String createTime;
}
eagleEye-flink_kafka/src/main/java/com/flink/vo/SimiUserInfo.java
0 → 100644
View file @
d894f28f
package
com
.
flink
.
vo
;
import
java.io.Serializable
;
import
lombok.Data
;
import
lombok.ToString
;
/**
* @author wjs
* @version 创建时间:2025-5-26 16:44:29 类说明
*/
/**
 * User record from the simi_user_list topic. Field names are snake_case to
 * match the JSON keys of the source messages — do not rename them.
 *
 * @author wjs
 * @version created 2025-5-26 16:44:29
 */
@Data
@ToString
public class SimiUserInfo implements Serializable {

    private static final long serialVersionUID = 1L;

    private String nick;
    private String country_code;
    private String user_head_url;
    private String create_time;
    private String phone_number;
    private String register_time;
    private String flume_type;
    private String cid;
    private String user_state;
    // FIX: was public — @Data already generates getUpdateTime()/setUpdateTime(),
    // and all visible callers use the accessors; keep the field encapsulated.
    // Epoch-millis timestamp derived from create_time; used for watermarks.
    private Long updateTime;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment