Skip to content
Toggle navigation
P
Projects
G
Groups
S
Snippets
Help
eagleEye
/
eagleEye-flink_kafka
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Issues
0
Merge Requests
0
Pipelines
Wiki
Snippets
Settings
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit
448e98a5
authored
Aug 28, 2025
by
魏建枢
Browse files
Options
_('Browse Files')
Download
Email Patches
Plain Diff
代码提交
parent
05aafffc
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
16 additions
and
1 deletions
eagleEye-flink_kafka/src/main/java/com/flink/achieve/base/EventLogAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/udf/ParseEventListUDTF.java
eagleEye-flink_kafka/src/main/java/com/flink/common/StreamEnvironmentSettings.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/base/EventLogAchi.java
View file @
448e98a5
...
@@ -397,7 +397,18 @@ public class EventLogAchi implements Serializable {
...
@@ -397,7 +397,18 @@ public class EventLogAchi implements Serializable {
DateTimeFormatter
.
ofPattern
(
"yyyy-MM-dd HH:mm:ss.SSS"
))));
DateTimeFormatter
.
ofPattern
(
"yyyy-MM-dd HH:mm:ss.SSS"
))));
Properties
r8
=
eventInfo
.
getR8
();
Properties
r8
=
eventInfo
.
getR8
();
row
.
setField
(
4
,
StringData
.
fromString
(
r8
.
getType
()));
row
.
setField
(
4
,
StringData
.
fromString
(
r8
.
getType
()));
row
.
setField
(
5
,
StringData
.
fromString
(
r8
.
getId
()));
String
id
=
null
;
if
(
r8
!=
null
)
{
if
(
StringUtils
.
isNotEmpty
(
r8
.
getId
()))
{
if
(
r8
.
getId
().
length
()
>
200
)
{
id
=
"测试异常数据:"
+
r8
.
getId
().
substring
(
0
,
200
);
}
else
{
id
=
r8
.
getId
();
}
}
}
row
.
setField
(
5
,
StringData
.
fromString
(
id
));
row
.
setField
(
6
,
StringData
.
fromString
(
userProps
==
null
?
null
:
userProps
.
getNick
()));
row
.
setField
(
6
,
StringData
.
fromString
(
userProps
==
null
?
null
:
userProps
.
getNick
()));
row
.
setField
(
7
,
StringData
.
fromString
(
event
.
getCreate_time
()));
row
.
setField
(
7
,
StringData
.
fromString
(
event
.
getCreate_time
()));
row
.
setField
(
8
,
StringData
.
fromString
(
event
.
getSend_time
()));
row
.
setField
(
8
,
StringData
.
fromString
(
event
.
getSend_time
()));
...
...
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/udf/ParseEventListUDTF.java
View file @
448e98a5
...
@@ -60,6 +60,9 @@ public class ParseEventListUDTF extends TableFunction<Row>{
...
@@ -60,6 +60,9 @@ public class ParseEventListUDTF extends TableFunction<Row>{
ip_name
=
null
;
ip_name
=
null
;
area_name
=
null
;
area_name
=
null
;
}
}
}
else
{
ip_name
=
ip
;
break
;
}
}
}
}
}
}
...
...
eagleEye-flink_kafka/src/main/java/com/flink/common/StreamEnvironmentSettings.java
View file @
448e98a5
...
@@ -112,6 +112,7 @@ public class StreamEnvironmentSettings {
...
@@ -112,6 +112,7 @@ public class StreamEnvironmentSettings {
.
build
();
.
build
();
// TableEnvironment tEnv = TableEnvironment.create(settings);//代码中同时创建了 TableEnvironment和 StreamTableEnvironment,但两者功能重叠,通常只需保留 StreamTableEnvironment。
// TableEnvironment tEnv = TableEnvironment.create(settings);//代码中同时创建了 TableEnvironment和 StreamTableEnvironment,但两者功能重叠,通常只需保留 StreamTableEnvironment。
StreamTableEnvironment
tableEnv
=
StreamTableEnvironment
.
create
(
env
,
settings
);
StreamTableEnvironment
tableEnv
=
StreamTableEnvironment
.
create
(
env
,
settings
);
tableEnv
.
getConfig
().
set
(
"table.display.max-column-width"
,
"100"
);
TableConfig
tableConfig
=
tableEnv
.
getConfig
();
TableConfig
tableConfig
=
tableEnv
.
getConfig
();
//时区与状态管理
//时区与状态管理
tableConfig
.
setLocalTimeZone
(
ZoneId
.
of
(
"UTC"
));
tableConfig
.
setLocalTimeZone
(
ZoneId
.
of
(
"UTC"
));
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment