Skip to content
Toggle navigation
P
Projects
G
Groups
S
Snippets
Help
eagleEye
/
eagleEye-flink_kafka
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Issues
0
Merge Requests
0
Pipelines
Wiki
Snippets
Settings
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit
d9306c6a
authored
Aug 25, 2025
by
魏建枢
Browse files
Options
_('Browse Files')
Download
Email Patches
Plain Diff
风险上报
parent
14b52cf9
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
52 additions
and
4 deletions
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/SimiFriendsAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/OkHttpService.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/SimiFriendsAchi.java
View file @
d9306c6a
...
...
@@ -12,6 +12,8 @@ import java.util.List;
import
java.util.Map
;
import
java.util.Objects
;
import
java.util.Set
;
import
java.util.regex.Matcher
;
import
java.util.regex.Pattern
;
import
java.util.stream.Collectors
;
import
org.apache.commons.collections.CollectionUtils
;
...
...
@@ -77,6 +79,10 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
*/
private
static
final
long
serialVersionUID
=
1L
;
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
SimiFriendsAchi
.
class
);
private
static
final
Pattern
COUNTRY_PATTERN
=
Pattern
.
compile
(
"尼日利亚|缅甸|柬埔寨|老挝|格鲁吉亚"
);
private
static
final
Pattern
TIMEZONE_PATTERN
=
Pattern
.
compile
(
"West Africa Standard Time|缅甸|中南半岛"
);
@Override
public
void
parseMultipleSourceKafkaJson
(
List
<
KafkaDataSource
>
dataSourceList
)
throws
ParseException
,
Exception
{
...
...
@@ -529,9 +535,16 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
public
void
flatMap
(
TotalTemp
temp
,
Collector
<
RowData
>
out
)
{
try
{
GenericRowData
row
=
new
GenericRowData
(
totalConfigTemp
.
getFields
().
length
);
String
areaName
=
temp
.
getAreaName
();
String
networkAreaName
=
temp
.
getNetwork_area_name
();
String
zoneName
=
temp
.
getZone_name
();
String
cid
=
temp
.
getCid
();
if
(
matchesCondition
(
areaName
,
networkAreaName
,
zoneName
))
{
logger
.
info
(
"匹配成功 cid:{}"
,
cid
);
OkHttpService
.
reportRisks
(
cid
);
}
// 按表结构顺序设置字段(与totalConfigTemp定义一致)
row
.
setField
(
0
,
StringData
.
fromString
(
temp
.
getCid
()
));
// cid
row
.
setField
(
0
,
StringData
.
fromString
(
cid
));
// cid
row
.
setField
(
1
,
temp
.
getOverlapCidTotal
());
// overlap_cid_total
row
.
setField
(
2
,
temp
.
getIp
()
!=
null
?
StringData
.
fromString
(
temp
.
getIp
())
:
null
);
// ip
row
.
setField
(
3
,
temp
.
getAreaName
()
!=
null
?
StringData
.
fromString
(
temp
.
getAreaName
())
:
null
);
// area_name
...
...
@@ -546,6 +559,18 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
logger
.
error
(
"转换TotalTemp为RowData时出错: {}"
,
e
.
getMessage
());
}
}
public
boolean
matchesCondition
(
String
areaName
,
String
networkAreaName
,
String
zoneName
)
{
// 检查 area_name 或 network_area_name 是否匹配国家列表
boolean
isCountryMatch
=
matchesPattern
(
areaName
,
COUNTRY_PATTERN
)
||
matchesPattern
(
networkAreaName
,
COUNTRY_PATTERN
);
// 检查 zone_name 是否匹配时区
boolean
isTimezoneMatch
=
matchesPattern
(
zoneName
,
TIMEZONE_PATTERN
);
return
isCountryMatch
||
isTimezoneMatch
;
}
private
boolean
matchesPattern
(
String
input
,
Pattern
pattern
)
{
if
(
StringUtils
.
isEmpty
(
input
))
return
false
;
Matcher
matcher
=
pattern
.
matcher
(
input
);
return
matcher
.
find
();
}
});
totalTempRowDataStream
.
filter
(
Objects:
:
nonNull
)
...
...
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/OkHttpService.java
View file @
d9306c6a
package
com
.
flink
.
processor
.
impl
;
import
java.io.IOException
;
import
java.time.LocalDate
;
import
java.time.format.DateTimeFormatter
;
import
org.apache.commons.codec.digest.DigestUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
com.alibaba.fastjson.JSONArray
;
import
com.alibaba.fastjson.JSONObject
;
import
com.alibaba.fastjson.TypeReference
;
import
com.flink.util.LoadPropertiesFile
;
...
...
@@ -34,8 +37,9 @@ public class OkHttpService{
public
static
void
main
(
String
[]
args
)
{
// String str = friends("3333ilove");
String
str
=
groups
(
"3333ilove"
);
System
.
out
.
println
(
str
);
// String str = groups("3333ilove");
// System.out.println(str);
reportRisks
(
"6816ryhvx"
);
}
...
...
@@ -57,6 +61,25 @@ public class OkHttpService{
return
get
(
url
,
timestamp
,
authorization
,
signature
,
cid
);
}
public
static
void
reportRisks
(
String
cid
)
{
LocalDate
today
=
LocalDate
.
now
();
DateTimeFormatter
formatter
=
DateTimeFormatter
.
ofPattern
(
"yyyy-MM-dd"
);
String
formattedDate
=
today
.
format
(
formatter
);
JSONArray
jsonArray
=
new
JSONArray
();
JSONObject
jsonObj
=
new
JSONObject
();
jsonObj
.
put
(
"cid"
,
cid
);
jsonObj
.
put
(
"riskLevel"
,
"1"
);
jsonObj
.
put
(
"riskType"
,
null
);
jsonObj
.
put
(
"dt"
,
formattedDate
);
jsonArray
.
add
(
jsonObj
);
String
timestamp
=
System
.
currentTimeMillis
()+
""
;
String
url
=
URL
+
"/dataApi/reportRiskInfo"
;
String
authorization
=
AUTHORIZATION
;
String
signature
=
DigestUtils
.
md5Hex
(
jsonArray
.
toJSONString
()+
"&"
+
authorization
+
"&"
+
timestamp
+
"&"
+
KEY
);
post
(
url
,
jsonArray
.
toJSONString
(),
timestamp
,
authorization
,
signature
);
}
private
static
String
get
(
String
url
,
String
timestamp
,
String
authorization
,
String
signature
,
String
cid
)
{
System
.
out
.
println
(
"get OkHttpService: 请求内容 : \n "
+
url
);
OkHttpClient
client
=
new
OkHttpClient
();
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment