Skip to content
Toggle navigation
P
Projects
G
Groups
S
Snippets
Help
eagleEye
/
eagleEye-flink_kafka
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Issues
0
Merge Requests
0
Pipelines
Wiki
Snippets
Settings
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit
27c4fd22
authored
Sep 23, 2025
by
魏建枢
Browse files
Options
_('Browse Files')
Download
Email Patches
Plain Diff
simi好友12个类型落库
parent
510633ea
Show whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
348 additions
and
92 deletions
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/InitCreateTempKafkaSqlTable.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/schema/DorisBaseSchema.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/sql/BaseKafkaToSqlCreateTable.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/sql/OpenSimiApiTable.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/udf/ParseOpenSimiApiUDTF.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/CommonConsumeSqlBaseProcessor.java
eagleEye-flink_kafka/src/main/resources/sql-config.yaml
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/InitCreateTempKafkaSqlTable.java
View file @
27c4fd22
package
com
.
flink
.
achieve
.
table
;
import
java.io.Serializable
;
import
java.util.Arrays
;
import
java.util.List
;
import
org.apache.flink.api.common.io.ParseException
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.table.api.StatementSet
;
import
org.apache.flink.table.api.TableResult
;
import
org.apache.flink.table.api.bridge.java.StreamTableEnvironment
;
import
com.flink.common.SourceCommonBase
;
...
...
@@ -46,8 +49,28 @@ public class InitCreateTempKafkaSqlTable extends SourceCommonBase implements Ser
@Override
public
void
parseMultipleSourceKafkaToSqlTable
(
StreamExecutionEnvironment
env
,
StreamTableEnvironment
tableEnv
,
SqlLoader
loader
)
throws
ParseException
,
Exception
{
String
insertSql
=
loader
.
getSql
(
"collect_user_behavior"
);
tableEnv
.
executeSql
(
insertSql
);
// String insertSql = loader.getSql("collect_user_behavior");
// tableEnv.executeSql(insertSql);
// 创建 StatementSet
StatementSet
statementSet
=
tableEnv
.
createStatementSet
();
// 使用集合管理所有需要执行的SQL语句
List
<
String
>
sqlKeys
=
Arrays
.
asList
(
"collect_user_behavior"
,
"open_simi_api_1"
,
"open_simi_api_2"
,
"open_simi_api_3"
,
"open_simi_api_4"
,
"open_simi_api_5"
,
"open_simi_api_6"
,
"open_simi_api_7"
,
"open_simi_api_8"
,
"open_simi_api_9"
,
"open_simi_api_10"
,
"open_simi_api_11"
,
"open_simi_api_12"
);
// 批量加载并执行所有SQL语句
// 添加所有 INSERT 语句到 StatementSet
for
(
String
sqlId
:
sqlKeys
)
{
String
sql
=
loader
.
getSql
(
sqlId
);
if
(
sql
!=
null
&&
!
sql
.
trim
().
isEmpty
())
{
statementSet
.
addInsertSql
(
sql
);
}
}
// 一次性执行所有语句
TableResult
result
=
statementSet
.
execute
();
// 可选:等待作业完成
// result.await();
}
}
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/schema/DorisBaseSchema.java
View file @
27c4fd22
...
...
@@ -6,6 +6,7 @@ import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
import
java.util.UUID
;
import
org.apache.doris.flink.sink.writer.LoadConstants
;
import
org.apache.flink.table.api.DataTypes
;
import
org.apache.flink.table.api.Schema
;
import
org.apache.flink.table.api.TableDescriptor
;
import
org.apache.flink.table.api.bridge.java.StreamTableEnvironment
;
...
...
@@ -120,4 +121,149 @@ public class DorisBaseSchema {
.
build
();
createTableDescriptor
(
tableEnv
,
schema
,
tempTableName
,
dbTableName
);
}
public
static
void
openSimiApi1Schema
(
StreamTableEnvironment
tableEnv
,
String
tempTableName
,
String
dbTableName
)
{
Schema
schema
=
Schema
.
newBuilder
()
.
column
(
"id"
,
STRING
())
.
column
(
"cid"
,
STRING
())
.
column
(
"time"
,
TIMESTAMP
(
3
))
.
column
(
"country_code"
,
STRING
())
.
column
(
"phone_number"
,
STRING
())
.
column
(
"create_time"
,
TIMESTAMP
(
3
))
.
build
();
createTableDescriptor
(
tableEnv
,
schema
,
tempTableName
,
dbTableName
);
}
public
static
void
openSimiApi2Schema
(
StreamTableEnvironment
tableEnv
,
String
tempTableName
,
String
dbTableName
)
{
Schema
schema
=
Schema
.
newBuilder
()
.
column
(
"id"
,
STRING
())
.
column
(
"cid"
,
STRING
())
.
column
(
"time"
,
TIMESTAMP
(
3
))
.
column
(
"old_nick_name"
,
STRING
())
.
column
(
"new_nick_name"
,
STRING
())
.
column
(
"create_time"
,
TIMESTAMP
(
3
))
.
build
();
createTableDescriptor
(
tableEnv
,
schema
,
tempTableName
,
dbTableName
);
}
public
static
void
openSimiApi3Schema
(
StreamTableEnvironment
tableEnv
,
String
tempTableName
,
String
dbTableName
)
{
Schema
schema
=
Schema
.
newBuilder
()
.
column
(
"id"
,
STRING
())
.
column
(
"cid"
,
STRING
())
.
column
(
"time"
,
TIMESTAMP
(
3
))
.
column
(
"friend_cid"
,
STRING
())
.
column
(
"friend_nick_name"
,
STRING
())
.
column
(
"add_method"
,
STRING
())
.
column
(
"remark"
,
STRING
())
.
column
(
"create_time"
,
TIMESTAMP
(
3
))
.
build
();
createTableDescriptor
(
tableEnv
,
schema
,
tempTableName
,
dbTableName
);
}
public
static
void
openSimiApi4Schema
(
StreamTableEnvironment
tableEnv
,
String
tempTableName
,
String
dbTableName
)
{
Schema
schema
=
Schema
.
newBuilder
()
.
column
(
"id"
,
STRING
())
.
column
(
"cid"
,
STRING
())
.
column
(
"time"
,
TIMESTAMP
(
3
))
.
column
(
"friend_cid"
,
STRING
())
.
column
(
"friend_nick_name"
,
STRING
())
.
column
(
"add_method"
,
STRING
())
.
column
(
"remark"
,
STRING
())
.
column
(
"create_time"
,
TIMESTAMP
(
3
))
.
build
();
createTableDescriptor
(
tableEnv
,
schema
,
tempTableName
,
dbTableName
);
}
public
static
void
openSimiApi5Schema
(
StreamTableEnvironment
tableEnv
,
String
tempTableName
,
String
dbTableName
)
{
Schema
schema
=
Schema
.
newBuilder
()
.
column
(
"id"
,
STRING
())
.
column
(
"cid"
,
STRING
())
.
column
(
"time"
,
TIMESTAMP
(
3
))
.
column
(
"friend_cid"
,
STRING
())
.
column
(
"old_nick_name"
,
STRING
())
.
column
(
"new_nick_name"
,
STRING
())
.
column
(
"create_time"
,
TIMESTAMP
(
3
))
.
build
();
createTableDescriptor
(
tableEnv
,
schema
,
tempTableName
,
dbTableName
);
}
public
static
void
openSimiApi6Schema
(
StreamTableEnvironment
tableEnv
,
String
tempTableName
,
String
dbTableName
)
{
Schema
schema
=
Schema
.
newBuilder
()
.
column
(
"id"
,
STRING
())
.
column
(
"cid"
,
STRING
())
.
column
(
"time"
,
TIMESTAMP
(
3
))
.
column
(
"friend_cid"
,
STRING
())
.
column
(
"friend_nick_name"
,
STRING
())
.
column
(
"remark"
,
STRING
())
.
column
(
"create_time"
,
TIMESTAMP
(
3
))
.
build
();
createTableDescriptor
(
tableEnv
,
schema
,
tempTableName
,
dbTableName
);
}
public
static
void
openSimiApi7Schema
(
StreamTableEnvironment
tableEnv
,
String
tempTableName
,
String
dbTableName
)
{
Schema
schema
=
Schema
.
newBuilder
()
.
column
(
"id"
,
STRING
())
.
column
(
"cid"
,
STRING
())
.
column
(
"time"
,
TIMESTAMP
(
3
))
.
column
(
"group_id"
,
DataTypes
.
STRING
())
.
column
(
"create_time"
,
TIMESTAMP
(
3
))
.
build
();
createTableDescriptor
(
tableEnv
,
schema
,
tempTableName
,
dbTableName
);
}
public
static
void
openSimiApi8Schema
(
StreamTableEnvironment
tableEnv
,
String
tempTableName
,
String
dbTableName
)
{
Schema
schema
=
Schema
.
newBuilder
()
.
column
(
"id"
,
STRING
())
.
column
(
"cid"
,
STRING
())
.
column
(
"time"
,
TIMESTAMP
(
3
))
.
column
(
"group_id"
,
DataTypes
.
STRING
())
.
column
(
"group_name"
,
STRING
())
.
column
(
"join_type"
,
STRING
())
.
column
(
"create_time"
,
TIMESTAMP
(
3
))
.
build
();
createTableDescriptor
(
tableEnv
,
schema
,
tempTableName
,
dbTableName
);
}
public
static
void
openSimiApi9Schema
(
StreamTableEnvironment
tableEnv
,
String
tempTableName
,
String
dbTableName
)
{
Schema
schema
=
Schema
.
newBuilder
()
.
column
(
"id"
,
STRING
())
.
column
(
"cid"
,
STRING
())
.
column
(
"time"
,
TIMESTAMP
(
3
))
.
column
(
"group_id"
,
DataTypes
.
STRING
())
.
column
(
"old_group_name"
,
STRING
())
.
column
(
"new_group_name"
,
STRING
())
.
column
(
"create_time"
,
TIMESTAMP
(
3
))
.
build
();
createTableDescriptor
(
tableEnv
,
schema
,
tempTableName
,
dbTableName
);
}
public
static
void
openSimiApi10Schema
(
StreamTableEnvironment
tableEnv
,
String
tempTableName
,
String
dbTableName
)
{
Schema
schema
=
Schema
.
newBuilder
()
.
column
(
"id"
,
STRING
())
.
column
(
"cid"
,
STRING
())
.
column
(
"time"
,
TIMESTAMP
(
3
))
.
column
(
"group_id"
,
DataTypes
.
STRING
())
.
column
(
"group_name"
,
STRING
())
.
column
(
"exit_type"
,
STRING
())
.
column
(
"create_time"
,
TIMESTAMP
(
3
))
.
build
();
createTableDescriptor
(
tableEnv
,
schema
,
tempTableName
,
dbTableName
);
}
public
static
void
openSimiApi11Schema
(
StreamTableEnvironment
tableEnv
,
String
tempTableName
,
String
dbTableName
)
{
Schema
schema
=
Schema
.
newBuilder
()
.
column
(
"id"
,
STRING
())
.
column
(
"cid"
,
STRING
())
.
column
(
"time"
,
TIMESTAMP
(
3
))
.
column
(
"release_type"
,
STRING
())
.
column
(
"article_id"
,
STRING
())
.
column
(
"create_time"
,
TIMESTAMP
(
3
))
.
build
();
createTableDescriptor
(
tableEnv
,
schema
,
tempTableName
,
dbTableName
);
}
public
static
void
openSimiApi12Schema
(
StreamTableEnvironment
tableEnv
,
String
tempTableName
,
String
dbTableName
)
{
Schema
schema
=
Schema
.
newBuilder
()
.
column
(
"id"
,
STRING
())
.
column
(
"cid"
,
STRING
())
.
column
(
"time"
,
TIMESTAMP
(
3
))
.
column
(
"release_type"
,
STRING
())
.
column
(
"article_id"
,
STRING
())
.
column
(
"action_type"
,
STRING
())
.
column
(
"comment_content"
,
STRING
())
.
column
(
"comment_id"
,
STRING
())
.
column
(
"comment_level"
,
STRING
())
.
column
(
"create_time"
,
TIMESTAMP
(
3
))
.
build
();
createTableDescriptor
(
tableEnv
,
schema
,
tempTableName
,
dbTableName
);
}
}
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/sql/BaseKafkaToSqlCreateTable.java
View file @
27c4fd22
...
...
@@ -70,6 +70,18 @@ public class BaseKafkaToSqlCreateTable {
}
else
if
(
StringUtils
.
equals
(
kafkaTopic
.
getTopic
(),
TopicTypeEnum
.
OPEN_SIMI_API
.
getTopic
()))
{
KafkaBaseSchema
.
openSimiApiSchema
(
tableEnv
,
topic
,
groupId
,
"kafka_open_simi_api"
);
OpenSimiApiTable
.
buildOpenSimiApiQuery
(
tableEnv
);
DorisBaseSchema
.
openSimiApi1Schema
(
tableEnv
,
"doris_open_simi_api_1"
,
"bi.open_simi_api_1"
);
DorisBaseSchema
.
openSimiApi2Schema
(
tableEnv
,
"doris_open_simi_api_2"
,
"bi.open_simi_api_2"
);
DorisBaseSchema
.
openSimiApi3Schema
(
tableEnv
,
"doris_open_simi_api_3"
,
"bi.open_simi_api_3"
);
DorisBaseSchema
.
openSimiApi4Schema
(
tableEnv
,
"doris_open_simi_api_4"
,
"bi.open_simi_api_4"
);
DorisBaseSchema
.
openSimiApi5Schema
(
tableEnv
,
"doris_open_simi_api_5"
,
"bi.open_simi_api_5"
);
DorisBaseSchema
.
openSimiApi6Schema
(
tableEnv
,
"doris_open_simi_api_6"
,
"bi.open_simi_api_6"
);
DorisBaseSchema
.
openSimiApi7Schema
(
tableEnv
,
"doris_open_simi_api_7"
,
"bi.open_simi_api_7"
);
DorisBaseSchema
.
openSimiApi8Schema
(
tableEnv
,
"doris_open_simi_api_8"
,
"bi.open_simi_api_8"
);
DorisBaseSchema
.
openSimiApi9Schema
(
tableEnv
,
"doris_open_simi_api_9"
,
"bi.open_simi_api_9"
);
DorisBaseSchema
.
openSimiApi10Schema
(
tableEnv
,
"doris_open_simi_api_10"
,
"bi.open_simi_api_10"
);
DorisBaseSchema
.
openSimiApi11Schema
(
tableEnv
,
"doris_open_simi_api_11"
,
"bi.open_simi_api_11"
);
DorisBaseSchema
.
openSimiApi12Schema
(
tableEnv
,
"doris_open_simi_api_12"
,
"bi.open_simi_api_12"
);
}
else
if
(
StringUtils
.
equals
(
kafkaTopic
.
getTopic
(),
TopicTypeEnum
.
ODS_PC_EVENT_LOG
.
getTopic
()))
{
KafkaBaseSchema
.
pcEventLogSchema
(
tableEnv
,
topic
,
groupId
,
"kafka_pc_event_log"
);
PcEventLogTable
.
buildPcEventLogQuery
(
tableEnv
);
...
...
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/sql/OpenSimiApiTable.java
View file @
27c4fd22
...
...
@@ -13,173 +13,184 @@ public class OpenSimiApiTable {
public
static
void
buildOpenSimiApiQuery
(
StreamTableEnvironment
tableEnv
)
{
tableEnv
.
createTemporarySystemFunction
(
"ParseOpenSimiApi"
,
ParseOpenSimiApiUDTF
.
class
);
//查询sql数据 并执行
//查询sql数据 并执行
1
tableEnv
.
executeSql
(
"CREATE VIEW user_registration_view AS "
+
"SELECT "
+
" k.create_time AS create_time, "
+
" t.simi_api_info['id'] AS id, "
+
" t.simi_api_info['cid'] AS cid, "
+
" t.simi_api_info['time'] AS `time`, "
+
" t.simi_api_info['countryCode'] AS countryCode, "
+
" t.simi_api_info['phoneNumber'] AS phoneNumber "
+
" t.`time` AS `time`, "
+
" t.simi_api_info['countryCode'] AS country_code, "
+
" t.simi_api_info['phoneNumber'] AS phone_number, "
+
" k.create_time AS create_time "
+
"FROM kafka_open_simi_api AS k "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(simi_api_info) ON TRUE "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(
`time`,
simi_api_info) ON TRUE "
+
"WHERE send_type = '1'"
);
//2
tableEnv
.
executeSql
(
"CREATE VIEW update_nickname_view AS "
+
"SELECT "
+
" k.create_time AS create_time, "
+
" t.simi_api_info['id'] AS id, "
+
" t.simi_api_info['cid'] AS cid, "
+
" t.simi_api_info['time'] AS `time`, "
+
" t.simi_api_info['oldNickname'] AS oldNickname, "
+
" t.simi_api_info['newNickname'] AS newNickname "
+
" t.`time` AS `time`, "
+
" t.simi_api_info['oldNickname'] AS old_nick_name, "
+
" t.simi_api_info['newNickname'] AS new_nick_name, "
+
" k.create_time AS create_time "
+
"FROM kafka_open_simi_api AS k "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(simi_api_info) ON TRUE "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(
`time`,
simi_api_info) ON TRUE "
+
"WHERE send_type = '2'"
);
//3
tableEnv
.
executeSql
(
"CREATE VIEW initiate_friend_request_view AS "
+
"SELECT "
+
" k.create_time AS create_time, "
+
" t.simi_api_info['id'] AS id, "
+
" t.simi_api_info['cid'] AS cid, "
+
" t.simi_api_info['time'] AS `time`, "
+
" t.simi_api_info['friendCid'] AS friendCid, "
+
" t.simi_api_info['friendNickname'] AS friendNickname, "
+
" t.simi_api_info['addMethod'] AS addMethod, "
+
" t.simi_api_info['remark'] AS remark "
+
" t.`time` AS `time`, "
+
" t.simi_api_info['friendCid'] AS friend_cid, "
+
" t.simi_api_info['friendNickname'] AS friend_nick_name, "
+
" t.simi_api_info['addMethod'] AS add_method, "
+
" t.simi_api_info['remark'] AS remark, "
+
" k.create_time AS create_time "
+
"FROM kafka_open_simi_api AS k "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(simi_api_info) ON TRUE "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(
`time`,
simi_api_info) ON TRUE "
+
"WHERE send_type = '3'"
);
//4
tableEnv
.
executeSql
(
"CREATE VIEW friend_request_accepted_view AS "
+
"SELECT "
+
" k.create_time AS create_time, "
+
" t.simi_api_info['id'] AS id, "
+
" t.simi_api_info['cid'] AS cid, "
+
" t.simi_api_info['time'] AS `time`, "
+
" t.simi_api_info['friendCid'] AS friendCid, "
+
" t.simi_api_info['friendNickname'] AS friendNickname, "
+
" t.simi_api_info['addMethod'] AS addMethod, "
+
" t.simi_api_info['remark'] AS remark "
+
" t.`time` AS `time`, "
+
" t.simi_api_info['friendCid'] AS friend_cid, "
+
" t.simi_api_info['friendNickname'] AS friend_nick_name, "
+
" t.simi_api_info['addMethod'] AS add_method, "
+
" t.simi_api_info['remark'] AS remark, "
+
" k.create_time AS create_time "
+
"FROM kafka_open_simi_api AS k "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(simi_api_info) ON TRUE "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(
`time`,
simi_api_info) ON TRUE "
+
"WHERE send_type = '4'"
);
//5
tableEnv
.
executeSql
(
"CREATE VIEW update_friend_nickname_view AS "
+
"SELECT "
+
" k.create_time AS create_time, "
+
" t.simi_api_info['id'] AS id, "
+
" t.simi_api_info['cid'] AS cid, "
+
" t.simi_api_info['time'] AS `time`, "
+
" t.simi_api_info['friendCid'] AS friendCid, "
+
" t.simi_api_info['oldNickname'] AS oldNickname, "
+
" t.simi_api_info['newNickname'] AS newNickname "
+
" t.`time` AS `time`, "
+
" t.simi_api_info['friendCid'] AS friend_cid, "
+
" t.simi_api_info['oldNickname'] AS old_nick_name, "
+
" t.simi_api_info['newNickname'] AS new_nick_name, "
+
" k.create_time AS create_time "
+
"FROM kafka_open_simi_api AS k "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(simi_api_info) ON TRUE "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(
`time`,
simi_api_info) ON TRUE "
+
"WHERE send_type = '5'"
);
//6
tableEnv
.
executeSql
(
"CREATE VIEW delete_friend_view AS "
+
"SELECT "
+
" k.create_time AS create_time, "
+
" t.simi_api_info['id'] AS id, "
+
" t.simi_api_info['cid'] AS cid, "
+
" t.simi_api_info['time'] AS `time`, "
+
" t.simi_api_info['friendCid'] AS friendCid, "
+
" t.simi_api_info['friendNickname'] AS friendNickname, "
+
" t.simi_api_info['remark'] AS remark "
+
" t.`time` AS `time`, "
+
" t.simi_api_info['friendCid'] AS friend_cid, "
+
" t.simi_api_info['friendNickname'] AS friend_nick_name, "
+
" t.simi_api_info['remark'] AS remark, "
+
" k.create_time AS create_time "
+
"FROM kafka_open_simi_api AS k "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(simi_api_info) ON TRUE "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(
`time`,
simi_api_info) ON TRUE "
+
"WHERE send_type = '6'"
);
//7
tableEnv
.
executeSql
(
"CREATE VIEW create_group_view AS "
+
"SELECT "
+
" k.create_time AS create_time, "
+
" t.simi_api_info['id'] AS id, "
+
" t.simi_api_info['cid'] AS cid, "
+
" t.simi_api_info['time'] AS `time`, "
+
" t.simi_api_info['groupId'] AS groupId "
+
" t.`time` AS `time`, "
+
" t.simi_api_info['groupId'] AS group_id, "
+
" k.create_time AS create_time "
+
"FROM kafka_open_simi_api AS k "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(simi_api_info) ON TRUE "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(
`time`,
simi_api_info) ON TRUE "
+
"WHERE send_type = '7'"
);
//8
tableEnv
.
executeSql
(
"CREATE VIEW join_group_view AS "
+
"SELECT "
+
" k.create_time AS create_time, "
+
" t.simi_api_info['id'] AS id, "
+
" t.simi_api_info['cid'] AS cid, "
+
" t.simi_api_info['time'] AS `time`, "
+
" t.simi_api_info['groupId'] AS groupId, "
+
" t.simi_api_info['groupName'] AS groupName, "
+
" t.simi_api_info['joinType'] AS joinType "
+
" t.`time` AS `time`, "
+
" t.simi_api_info['groupId'] AS group_id, "
+
" t.simi_api_info['groupName'] AS group_name, "
+
" t.simi_api_info['joinType'] AS join_type, "
+
" k.create_time AS create_time "
+
"FROM kafka_open_simi_api AS k "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(simi_api_info) ON TRUE "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(
`time`,
simi_api_info) ON TRUE "
+
"WHERE send_type = '8'"
);
//9
tableEnv
.
executeSql
(
"CREATE VIEW update_group_name_view AS "
+
"SELECT "
+
" k.create_time AS create_time, "
+
" t.simi_api_info['id'] AS id, "
+
" t.simi_api_info['cid'] AS cid, "
+
" t.simi_api_info['time'] AS `time`, "
+
" t.simi_api_info['groupId'] AS groupId, "
+
" t.simi_api_info['oldGroupName'] AS oldGroupName, "
+
" t.simi_api_info['newGroupName'] AS newGroupName "
+
" t.`time` AS `time`, "
+
" t.simi_api_info['groupId'] AS group_id, "
+
" t.simi_api_info['oldGroupName'] AS old_group_name, "
+
" t.simi_api_info['newGroupName'] AS new_group_name, "
+
" k.create_time AS create_time "
+
"FROM kafka_open_simi_api AS k "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(simi_api_info) ON TRUE "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(
`time`,
simi_api_info) ON TRUE "
+
"WHERE send_type = '9'"
);
//10
tableEnv
.
executeSql
(
"CREATE VIEW leave_group_view AS "
+
"SELECT "
+
" k.create_time AS create_time, "
+
" t.simi_api_info['id'] AS id, "
+
" t.simi_api_info['cid'] AS cid, "
+
" t.simi_api_info['time'] AS `time`, "
+
" t.simi_api_info['groupId'] AS groupId, "
+
" t.simi_api_info['groupName'] AS groupName, "
+
" t.simi_api_info['exitType'] AS exitType "
+
" t.`time` AS `time`, "
+
" t.simi_api_info['groupId'] AS group_id, "
+
" t.simi_api_info['groupName'] AS group_name, "
+
" t.simi_api_info['exitType'] AS exit_type, "
+
" k.create_time AS create_time "
+
"FROM kafka_open_simi_api AS k "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(simi_api_info) ON TRUE "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(
`time`,
simi_api_info) ON TRUE "
+
"WHERE send_type = '10'"
);
//11
tableEnv
.
executeSql
(
"CREATE VIEW publish_content_view AS "
+
"SELECT "
+
" k.create_time AS create_time, "
+
" t.simi_api_info['id'] AS id, "
+
" t.simi_api_info['cid'] AS cid, "
+
" t.simi_api_info['time'] AS `time`, "
+
" t.simi_api_info['releaseType'] AS releaseType, "
+
" t.simi_api_info['articleId'] AS articleId "
+
" t.`time` AS `time`, "
+
" t.simi_api_info['releaseType'] AS release_type, "
+
" t.simi_api_info['articleId'] AS article_id, "
+
" k.create_time AS create_time "
+
"FROM kafka_open_simi_api AS k "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(simi_api_info) ON TRUE "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(
`time`,
simi_api_info) ON TRUE "
+
"WHERE send_type = '11'"
);
//12
tableEnv
.
executeSql
(
"CREATE VIEW content_interaction_view AS "
+
"SELECT "
+
" k.create_time AS create_time, "
+
" t.simi_api_info['id'] AS id, "
+
" t.simi_api_info['cid'] AS cid, "
+
" t.simi_api_info['time'] AS `time`, "
+
" t.simi_api_info['releaseType'] AS releaseType, "
+
" t.simi_api_info['articleId'] AS articleId, "
+
" t.simi_api_info['actionType'] AS actionType, "
+
" t.simi_api_info['commentContent'] AS commentContent, "
+
" t.simi_api_info['commentId'] AS commentId, "
+
" t.simi_api_info['commentLevel'] AS commentLevel "
+
" t.`time` AS `time`, "
+
" t.simi_api_info['releaseType'] AS release_type, "
+
" t.simi_api_info['articleId'] AS article_id, "
+
" t.simi_api_info['actionType'] AS action_type, "
+
" t.simi_api_info['commentContent'] AS comment_content, "
+
" t.simi_api_info['commentId'] AS comment_id, "
+
" t.simi_api_info['commentLevel'] AS comment_level, "
+
" k.create_time AS create_time "
+
"FROM kafka_open_simi_api AS k "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(simi_api_info) ON TRUE "
+
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(
`time`,
simi_api_info) ON TRUE "
+
"WHERE send_type = '12'"
);
}
...
...
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/udf/ParseOpenSimiApiUDTF.java
View file @
27c4fd22
package
com
.
flink
.
achieve
.
table
.
udf
;
import
java.text.ParseException
;
import
java.time.LocalDateTime
;
import
java.time.format.DateTimeFormatter
;
import
java.util.HashMap
;
import
java.util.Map
;
...
...
@@ -32,7 +35,7 @@ import com.flink.vo.simi.UserRegistrationReqDto;
* @version 创建时间:2025-8-26 18:19:05
* 类说明
*/
@FunctionHint
(
output
=
@DataTypeHint
(
"ROW<simi_api_info MAP<STRING,STRING>>"
))
@FunctionHint
(
output
=
@DataTypeHint
(
"ROW<
`time` TIMESTAMP(3),
simi_api_info MAP<STRING,STRING>>"
))
public
class
ParseOpenSimiApiUDTF
extends
TableFunction
<
Row
>{
/**
...
...
@@ -42,112 +45,123 @@ public class ParseOpenSimiApiUDTF extends TableFunction<Row>{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
ParseOpenSimiApiUDTF
.
class
);
public
void
eval
(
String
req_body
,
String
send_type
)
{
// 将字符串转换为 Date
// 使用 DateTimeFormatter(线程安全)
private
static
final
DateTimeFormatter
FORMATTER
=
DateTimeFormatter
.
ofPattern
(
"yyyy-MM-dd HH:mm:ss.SSS"
);
// 修改转换方法
private
LocalDateTime
convertToLocalDateTime
(
String
timeStr
)
{
return
LocalDateTime
.
parse
(
timeStr
,
FORMATTER
);
}
public
void
eval
(
String
req_body
,
String
send_type
)
throws
ParseException
{
if
(
StringUtils
.
isAllEmpty
(
send_type
,
req_body
))
{
return
;
}
Map
<
String
,
Object
>
params
=
new
HashMap
<>();
LocalDateTime
timestamp
=
null
;
if
(
StringUtils
.
equals
(
send_type
,
OpenSimiApiTypeEnum
.
USER_REGISTRATION
.
getCode
()))
{
//1
UserRegistrationReqDto
jsonReqDto
=
JSONObject
.
parseObject
(
req_body
,
new
TypeReference
<
UserRegistrationReqDto
>(){});
params
.
put
(
"id"
,
jsonReqDto
.
getId
());
params
.
put
(
"cid"
,
jsonReqDto
.
getCid
());
params
.
put
(
"time"
,
jsonReqDto
.
getTime
());
params
.
put
(
"countryCode"
,
jsonReqDto
.
getCountryCode
());
params
.
put
(
"phoneNumber"
,
jsonReqDto
.
getPhoneNumber
());
timestamp
=
convertToLocalDateTime
(
jsonReqDto
.
getTime
());
}
else
if
(
StringUtils
.
equals
(
send_type
,
OpenSimiApiTypeEnum
.
UPDATE_NICKNAME
.
getCode
()))
{
//2
UpdateNicknameReqDto
jsonReqDto
=
JSONObject
.
parseObject
(
req_body
,
new
TypeReference
<
UpdateNicknameReqDto
>(){});
params
.
put
(
"id"
,
jsonReqDto
.
getId
());
params
.
put
(
"cid"
,
jsonReqDto
.
getCid
());
params
.
put
(
"time"
,
jsonReqDto
.
getTime
());
params
.
put
(
"oldNickname"
,
jsonReqDto
.
getOldNickname
());
params
.
put
(
"newNickname"
,
jsonReqDto
.
getNewNickname
());
timestamp
=
convertToLocalDateTime
(
jsonReqDto
.
getTime
());
}
else
if
(
StringUtils
.
equals
(
send_type
,
OpenSimiApiTypeEnum
.
INITIATE_FRIEND_REQUEST
.
getCode
()))
{
//3
InitiateFriendRequestReqDto
jsonReqDto
=
JSONObject
.
parseObject
(
req_body
,
new
TypeReference
<
InitiateFriendRequestReqDto
>(){});
params
.
put
(
"id"
,
jsonReqDto
.
getId
());
params
.
put
(
"cid"
,
jsonReqDto
.
getCid
());
params
.
put
(
"time"
,
jsonReqDto
.
getTime
());
params
.
put
(
"friendCid"
,
jsonReqDto
.
getFriendCid
());
params
.
put
(
"friendNickname"
,
jsonReqDto
.
getFriendNickname
());
params
.
put
(
"addMethod"
,
jsonReqDto
.
getAddMethod
());
params
.
put
(
"remark"
,
jsonReqDto
.
getRemark
());
timestamp
=
convertToLocalDateTime
(
jsonReqDto
.
getTime
());
}
else
if
(
StringUtils
.
equals
(
send_type
,
OpenSimiApiTypeEnum
.
FRIEND_REQUEST_ACCEPTED
.
getCode
()))
{
//4
FriendRequestAcceptedReqDto
jsonReqDto
=
JSONObject
.
parseObject
(
req_body
,
new
TypeReference
<
FriendRequestAcceptedReqDto
>(){});
params
.
put
(
"id"
,
jsonReqDto
.
getId
());
params
.
put
(
"cid"
,
jsonReqDto
.
getCid
());
params
.
put
(
"time"
,
jsonReqDto
.
getTime
());
params
.
put
(
"friendCid"
,
jsonReqDto
.
getFriendCid
());
params
.
put
(
"friendNickname"
,
jsonReqDto
.
getFriendNickname
());
params
.
put
(
"addMethod"
,
jsonReqDto
.
getAddMethod
());
params
.
put
(
"remark"
,
jsonReqDto
.
getRemark
());
timestamp
=
convertToLocalDateTime
(
jsonReqDto
.
getTime
());
}
else
if
(
StringUtils
.
equals
(
send_type
,
OpenSimiApiTypeEnum
.
UPDATE_FRIEND_NICKNAME
.
getCode
()))
{
//5
UpdateFriendNicknameReqDto
jsonReqDto
=
JSONObject
.
parseObject
(
req_body
,
new
TypeReference
<
UpdateFriendNicknameReqDto
>(){});
params
.
put
(
"id"
,
jsonReqDto
.
getId
());
params
.
put
(
"cid"
,
jsonReqDto
.
getCid
());
params
.
put
(
"time"
,
jsonReqDto
.
getTime
());
params
.
put
(
"friendCid"
,
jsonReqDto
.
getFriendCid
());
params
.
put
(
"oldNickname"
,
jsonReqDto
.
getOldNickname
());
params
.
put
(
"newNickname"
,
jsonReqDto
.
getNewNickname
());
timestamp
=
convertToLocalDateTime
(
jsonReqDto
.
getTime
());
}
else
if
(
StringUtils
.
equals
(
send_type
,
OpenSimiApiTypeEnum
.
DELETE_FRIEND
.
getCode
()))
{
//6
DeleteFriendReqDto
jsonReqDto
=
JSONObject
.
parseObject
(
req_body
,
new
TypeReference
<
DeleteFriendReqDto
>(){});
params
.
put
(
"id"
,
jsonReqDto
.
getId
());
params
.
put
(
"cid"
,
jsonReqDto
.
getCid
());
params
.
put
(
"time"
,
jsonReqDto
.
getTime
());
params
.
put
(
"friendCid"
,
jsonReqDto
.
getFriendCid
());
params
.
put
(
"friendNickname"
,
jsonReqDto
.
getFriendNickname
());
params
.
put
(
"remark"
,
jsonReqDto
.
getRemark
());
timestamp
=
convertToLocalDateTime
(
jsonReqDto
.
getTime
());
}
else
if
(
StringUtils
.
equals
(
send_type
,
OpenSimiApiTypeEnum
.
CREATE_GROUP
.
getCode
()))
{
//7
CreateGroupReqDto
jsonReqDto
=
JSONObject
.
parseObject
(
req_body
,
new
TypeReference
<
CreateGroupReqDto
>(){});
params
.
put
(
"id"
,
jsonReqDto
.
getId
());
params
.
put
(
"cid"
,
jsonReqDto
.
getCid
());
params
.
put
(
"time"
,
jsonReqDto
.
getTime
());
params
.
put
(
"groupId"
,
jsonReqDto
.
getGroupId
());
timestamp
=
convertToLocalDateTime
(
jsonReqDto
.
getTime
());
}
else
if
(
StringUtils
.
equals
(
send_type
,
OpenSimiApiTypeEnum
.
JOIN_GROUP
.
getCode
()))
{
//8
JoinGroupReqDto
jsonReqDto
=
JSONObject
.
parseObject
(
req_body
,
new
TypeReference
<
JoinGroupReqDto
>(){});
params
.
put
(
"id"
,
jsonReqDto
.
getId
());
params
.
put
(
"cid"
,
jsonReqDto
.
getCid
());
params
.
put
(
"time"
,
jsonReqDto
.
getTime
());
params
.
put
(
"groupId"
,
jsonReqDto
.
getGroupId
());
params
.
put
(
"groupName"
,
jsonReqDto
.
getGroupName
());
params
.
put
(
"joinType"
,
jsonReqDto
.
getJoinType
());
timestamp
=
convertToLocalDateTime
(
jsonReqDto
.
getTime
());
}
else
if
(
StringUtils
.
equals
(
send_type
,
OpenSimiApiTypeEnum
.
UPDATE_GROUP_NAME
.
getCode
()))
{
//9
UpdateGroupNameReqDto
jsonReqDto
=
JSONObject
.
parseObject
(
req_body
,
new
TypeReference
<
UpdateGroupNameReqDto
>(){});
params
.
put
(
"id"
,
jsonReqDto
.
getId
());
params
.
put
(
"cid"
,
jsonReqDto
.
getCid
());
params
.
put
(
"time"
,
jsonReqDto
.
getTime
());
params
.
put
(
"groupId"
,
jsonReqDto
.
getGroupId
());
params
.
put
(
"oldGroupName"
,
jsonReqDto
.
getOldGroupName
());
params
.
put
(
"newGroupName"
,
jsonReqDto
.
getNewGroupName
());
timestamp
=
convertToLocalDateTime
(
jsonReqDto
.
getTime
());
}
else
if
(
StringUtils
.
equals
(
send_type
,
OpenSimiApiTypeEnum
.
LEAVE_GROUP
.
getCode
()))
{
//10
LeaveGroupReqDto
jsonReqDto
=
JSONObject
.
parseObject
(
req_body
,
new
TypeReference
<
LeaveGroupReqDto
>(){});
params
.
put
(
"id"
,
jsonReqDto
.
getId
());
params
.
put
(
"cid"
,
jsonReqDto
.
getCid
());
params
.
put
(
"time"
,
jsonReqDto
.
getTime
());
params
.
put
(
"groupId"
,
jsonReqDto
.
getGroupId
());
params
.
put
(
"groupName"
,
jsonReqDto
.
getGroupName
());
params
.
put
(
"exitType"
,
jsonReqDto
.
getExitType
());
timestamp
=
convertToLocalDateTime
(
jsonReqDto
.
getTime
());
}
else
if
(
StringUtils
.
equals
(
send_type
,
OpenSimiApiTypeEnum
.
PUBLISH_CONTENT
.
getCode
()))
{
//11
PublishContentReqDto
jsonReqDto
=
JSONObject
.
parseObject
(
req_body
,
new
TypeReference
<
PublishContentReqDto
>(){});
params
.
put
(
"id"
,
jsonReqDto
.
getId
());
params
.
put
(
"cid"
,
jsonReqDto
.
getCid
());
params
.
put
(
"time"
,
jsonReqDto
.
getTime
());
params
.
put
(
"releaseType"
,
jsonReqDto
.
getReleaseType
());
params
.
put
(
"articleId"
,
jsonReqDto
.
getArticleId
());
timestamp
=
convertToLocalDateTime
(
jsonReqDto
.
getTime
());
}
else
if
(
StringUtils
.
equals
(
send_type
,
OpenSimiApiTypeEnum
.
CONTENT_INTERACTION
.
getCode
()))
{
//12
ContentInteractionReqDto
jsonReqDto
=
JSONObject
.
parseObject
(
req_body
,
new
TypeReference
<
ContentInteractionReqDto
>(){});
params
.
put
(
"id"
,
jsonReqDto
.
getId
());
params
.
put
(
"cid"
,
jsonReqDto
.
getCid
());
params
.
put
(
"time"
,
jsonReqDto
.
getTime
());
params
.
put
(
"releaseType"
,
jsonReqDto
.
getReleaseType
());
params
.
put
(
"articleId"
,
jsonReqDto
.
getArticleId
());
params
.
put
(
"actionType"
,
jsonReqDto
.
getActionType
());
params
.
put
(
"commentContent"
,
jsonReqDto
.
getCommentContent
());
params
.
put
(
"commentId"
,
jsonReqDto
.
getCommentId
());
params
.
put
(
"commentLevel"
,
jsonReqDto
.
getCommentLevel
());
timestamp
=
convertToLocalDateTime
(
jsonReqDto
.
getTime
());
}
if
(
null
!=
params
)
{
collect
(
Row
.
of
(
params
));
collect
(
Row
.
of
(
timestamp
,
params
));
}
}
}
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/CommonConsumeSqlBaseProcessor.java
View file @
27c4fd22
...
...
@@ -33,7 +33,7 @@ public class CommonConsumeSqlBaseProcessor implements JobProcessor{
// TopicTypeEnum.ODS_NEW_COLLECT_LOG,
// TopicTypeEnum.SIMI_USER_LIST_TOPIC,
// TopicTypeEnum.ABROAD_SIMI_USER_LIST_TOPIC,
//
TopicTypeEnum.OPEN_SIMI_API,
TopicTypeEnum
.
OPEN_SIMI_API
,
// TopicTypeEnum.ODS_PC_EVENT_LOG,
// TopicTypeEnum.ODS_PC_COLLECT_LOG,
// TopicTypeEnum.ODS_COMMUNITY_HISTORY,
...
...
eagleEye-flink_kafka/src/main/resources/sql-config.yaml
View file @
27c4fd22
...
...
@@ -10,3 +10,52 @@ queries:
SELECT *
FROM kafka_collect_user_behavior
WHERE event_type NOT IN ('socket_event', 'socket_error', 'socket_time', 'refresh_token', 'all_time', 'enter_act', 'exit_act', 'show_act', 'activity_red_exposure')
open_simi_api_1
:
INSERT INTO doris_open_simi_api_1
SELECT *
FROM user_registration_view
open_simi_api_2
:
INSERT INTO doris_open_simi_api_2
SELECT *
FROM update_nickname_view
open_simi_api_3
:
INSERT INTO doris_open_simi_api_3
SELECT *
FROM initiate_friend_request_view
open_simi_api_4
:
INSERT INTO doris_open_simi_api_4
SELECT *
FROM friend_request_accepted_view
open_simi_api_5
:
INSERT INTO doris_open_simi_api_5
SELECT *
FROM update_friend_nickname_view
open_simi_api_6
:
INSERT INTO doris_open_simi_api_6
SELECT *
FROM delete_friend_view
open_simi_api_7
:
INSERT INTO doris_open_simi_api_7
SELECT *
FROM create_group_view
open_simi_api_8
:
INSERT INTO doris_open_simi_api_8
SELECT *
FROM join_group_view
open_simi_api_9
:
INSERT INTO doris_open_simi_api_9
SELECT *
FROM update_group_name_view
open_simi_api_10
:
INSERT INTO doris_open_simi_api_10
SELECT *
FROM leave_group_view
open_simi_api_11
:
INSERT INTO doris_open_simi_api_11
SELECT *
FROM publish_content_view
open_simi_api_12
:
INSERT INTO doris_open_simi_api_12
SELECT *
FROM content_interaction_view
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment