Skip to content
Toggle navigation
P
Projects
G
Groups
S
Snippets
Help
eagleEye
/
eagleEye-flink_kafka
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Issues
0
Merge Requests
0
Pipelines
Wiki
Snippets
Settings
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit
5e0e66f4
authored
Oct 09, 2025
by
魏建枢
Browse files
Options
_('Browse Files')
Download
Email Patches
Plain Diff
空值异常提交
parent
27c4fd22
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
47 additions
and
8 deletions
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/SimiFriendsAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/SimiFriendsAchi.java
View file @
5e0e66f4
...
@@ -441,25 +441,35 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
...
@@ -441,25 +441,35 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
@Override
@Override
public
void
flatMap
(
TotalTemp
temp
,
Collector
<
RowData
>
out
)
{
public
void
flatMap
(
TotalTemp
temp
,
Collector
<
RowData
>
out
)
{
try
{
try
{
// 空值安全检查
if
(
temp
==
null
)
{
logger
.
warn
(
"TotalTemp对象为null,跳过处理"
);
return
;
}
GenericRowData
row
=
new
GenericRowData
(
totalConfigTemp
.
getFields
().
length
);
GenericRowData
row
=
new
GenericRowData
(
totalConfigTemp
.
getFields
().
length
);
String
areaName
=
temp
.
getAreaName
();
String
areaName
=
temp
.
getAreaName
();
String
networkAreaName
=
temp
.
getNetwork_area_name
();
String
networkAreaName
=
temp
.
getNetwork_area_name
();
String
zoneName
=
temp
.
getZone_name
();
String
zoneName
=
temp
.
getZone_name
();
String
cid
=
temp
.
getCid
();
String
cid
=
temp
.
getCid
();
// 添加CID空值检查
if
(
StringUtils
.
isEmpty
(
cid
))
{
logger
.
warn
(
"CID为空,跳过处理: {}"
,
temp
);
return
;
}
if
(
matchesCondition
(
areaName
,
networkAreaName
,
zoneName
))
{
if
(
matchesCondition
(
areaName
,
networkAreaName
,
zoneName
))
{
logger
.
info
(
"匹配成功 cid:{}"
,
cid
);
logger
.
info
(
"匹配成功 cid:{}"
,
cid
);
OkHttpService
.
reportRisks
(
cid
);
OkHttpService
.
reportRisks
(
cid
);
}
}
// 按表结构顺序设置字段(与totalConfigTemp定义一致)
// 按表结构顺序设置字段(与totalConfigTemp定义一致)
row
.
setField
(
0
,
String
Data
.
fromString
(
cid
)
);
// cid
row
.
setField
(
0
,
String
Utils
.
isNotEmpty
(
cid
)
?
StringData
.
fromString
(
cid
)
:
null
);
// cid
row
.
setField
(
1
,
temp
.
getOverlapCidTotal
());
// overlap_cid_total
row
.
setField
(
1
,
temp
.
getOverlapCidTotal
()
!=
null
?
temp
.
getOverlapCidTotal
()
:
0
);
// overlap_cid_total
row
.
setField
(
2
,
temp
.
getIp
()
!=
null
?
StringData
.
fromString
(
temp
.
getIp
())
:
null
);
// ip
row
.
setField
(
2
,
StringUtils
.
isNotEmpty
(
temp
.
getIp
())
?
StringData
.
fromString
(
temp
.
getIp
())
:
null
);
// ip
row
.
setField
(
3
,
temp
.
getAreaName
()
!=
null
?
StringData
.
fromString
(
temp
.
getAreaName
())
:
null
);
// area_name
row
.
setField
(
3
,
StringUtils
.
isNotEmpty
(
temp
.
getAreaName
())
?
StringData
.
fromString
(
temp
.
getAreaName
())
:
null
);
// area_name
row
.
setField
(
4
,
String
Data
.
fromString
(
temp
.
getAddMethod
()
));
// add_method
row
.
setField
(
4
,
String
Utils
.
isNotEmpty
(
temp
.
getAddMethod
())
?
StringData
.
fromString
(
temp
.
getAddMethod
())
:
StringData
.
fromString
(
"未知"
));
// add_method
row
.
setField
(
5
,
TimeConvertUtil
.
currentTimestamp
());
// create_time
row
.
setField
(
5
,
TimeConvertUtil
.
currentTimestamp
());
// create_time
row
.
setField
(
6
,
String
Data
.
fromString
(
temp
.
getNetwork_ip
())
);
// network_ip
row
.
setField
(
6
,
String
Utils
.
isNotEmpty
(
temp
.
getNetwork_ip
())
?
StringData
.
fromString
(
temp
.
getNetwork_ip
())
:
null
);
// network_ip
row
.
setField
(
7
,
String
Data
.
fromString
(
temp
.
getNetwork_area_name
())
);
// network_area_name
row
.
setField
(
7
,
String
Utils
.
isNotEmpty
(
temp
.
getNetwork_area_name
())
?
StringData
.
fromString
(
temp
.
getNetwork_area_name
())
:
null
);
// network_area_name
row
.
setField
(
8
,
String
Data
.
fromString
(
temp
.
getZone_name
())
);
// zone_name
row
.
setField
(
8
,
String
Utils
.
isNotEmpty
(
temp
.
getZone_name
())
?
StringData
.
fromString
(
temp
.
getZone_name
())
:
null
);
// zone_name
row
.
setField
(
9
,
0
);
// __DORIS_DELETE_SIGN__ (0表示有效数据)
row
.
setField
(
9
,
0
);
// __DORIS_DELETE_SIGN__ (0表示有效数据)
out
.
collect
(
row
);
out
.
collect
(
row
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
...
@@ -467,6 +477,9 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
...
@@ -467,6 +477,9 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
}
}
}
}
public
boolean
matchesCondition
(
String
areaName
,
String
networkAreaName
,
String
zoneName
)
{
public
boolean
matchesCondition
(
String
areaName
,
String
networkAreaName
,
String
zoneName
)
{
if
(
areaName
==
null
)
areaName
=
""
;
if
(
networkAreaName
==
null
)
networkAreaName
=
""
;
if
(
zoneName
==
null
)
zoneName
=
""
;
// 检查 area_name 或 network_area_name 是否匹配国家列表
// 检查 area_name 或 network_area_name 是否匹配国家列表
boolean
isCountryMatch
=
matchesPattern
(
areaName
,
COUNTRY_PATTERN
)
||
matchesPattern
(
networkAreaName
,
COUNTRY_PATTERN
);
boolean
isCountryMatch
=
matchesPattern
(
areaName
,
COUNTRY_PATTERN
)
||
matchesPattern
(
networkAreaName
,
COUNTRY_PATTERN
);
// 检查 zone_name 是否匹配时区
// 检查 zone_name 是否匹配时区
...
@@ -506,6 +519,11 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
...
@@ -506,6 +519,11 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
@Override
@Override
public
void
flatMap
(
FriendsRecord
friendsRecord
,
Collector
<
RowData
>
out
)
{
public
void
flatMap
(
FriendsRecord
friendsRecord
,
Collector
<
RowData
>
out
)
{
// 1. 检查 friendsRecord 是否为 null
if
(
friendsRecord
==
null
)
{
logger
.
warn
(
"friendsRecord is null, skipping..."
);
return
;
}
List
<
FriendsStream
>
friendsStreamList
=
friendsRecord
.
getFriendsStreamList
();
List
<
FriendsStream
>
friendsStreamList
=
friendsRecord
.
getFriendsStreamList
();
if
(
CollectionUtils
.
isEmpty
(
friendsStreamList
))
{
if
(
CollectionUtils
.
isEmpty
(
friendsStreamList
))
{
return
;
return
;
...
@@ -538,11 +556,32 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
...
@@ -538,11 +556,32 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
@Override
@Override
public
void
flatMap
(
FriendsRecord
friendsRecord
,
Collector
<
RowData
>
out
)
{
public
void
flatMap
(
FriendsRecord
friendsRecord
,
Collector
<
RowData
>
out
)
{
// 1. 检查 friendsRecord 是否为 null
if
(
friendsRecord
==
null
)
{
logger
.
warn
(
"friendsRecord is null, skipping..."
);
return
;
}
List
<
FriendsStream
>
friendsStreamList
=
friendsRecord
.
getFriendsStreamList
();
List
<
FriendsStream
>
friendsStreamList
=
friendsRecord
.
getFriendsStreamList
();
String
cid
=
friendsRecord
.
getCid
();
String
cid
=
friendsRecord
.
getCid
();
String
friendCid
=
friendsRecord
.
getFriendCid
();
String
friendCid
=
friendsRecord
.
getFriendCid
();
List
<
SimiFriends
>
cidsList
=
friendsRecord
.
getCidsList
();
List
<
SimiFriends
>
cidsList
=
friendsRecord
.
getCidsList
();
List
<
SimiFriends
>
friendsList
=
friendsRecord
.
getFriendsList
();
List
<
SimiFriends
>
friendsList
=
friendsRecord
.
getFriendsList
();
// 2. 检查 friendsStreamList 是否为空
if
(
CollectionUtils
.
isEmpty
(
friendsStreamList
))
{
return
;
}
// 3. 检查 cid 和 friendCid 是否为空
if
(
StringUtils
.
isEmpty
(
cid
)
||
StringUtils
.
isEmpty
(
friendCid
))
{
logger
.
warn
(
"cid or friendCid is empty, skipping..."
);
return
;
}
// 4. 检查 cidsList 和 friendsList 是否为空
if
(
CollectionUtils
.
isEmpty
(
cidsList
)
||
CollectionUtils
.
isEmpty
(
friendsList
))
{
logger
.
warn
(
"cidsList or friendsList is empty, skipping..."
);
return
;
}
// 1. 按用户分组并统计各组条数
// 1. 按用户分组并统计各组条数
Map
<
String
,
Long
>
userGroupCounts
=
friendsStreamList
.
stream
()
Map
<
String
,
Long
>
userGroupCounts
=
friendsStreamList
.
stream
()
.
collect
(
Collectors
.
groupingBy
(
.
collect
(
Collectors
.
groupingBy
(
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment