Skip to content
Toggle navigation
P
Projects
G
Groups
S
Snippets
Help
eagleEye
/
eagleEye-flink_kafka
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Issues
0
Merge Requests
0
Pipelines
Wiki
Snippets
Settings
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit
a4daf7f2
authored
Aug 27, 2025
by
魏建枢
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
新增opensimiapi临时表
parent
a0818dc7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
436 additions
and
6 deletions
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/RegistrationCheckAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/schema/KafkaBaseSchema.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/sql/BaseKafkaToSqlCreateTable.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/sql/OpenSimiApiTable.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/udf/ParseOpenSimiApiUDTF.java
eagleEye-flink_kafka/src/main/java/com/flink/common/SourceCommonBase.java
eagleEye-flink_kafka/src/main/java/com/flink/util/SqlLoader.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/RegistrationCheckAchi.java
View file @
a4daf7f2
...
...
@@ -6,12 +6,14 @@ import java.util.List;
import
org.apache.flink.api.common.io.ParseException
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.table.api.Table
;
import
org.apache.flink.table.api.bridge.java.StreamTableEnvironment
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
com.flink.common.SourceCommonBase
;
import
com.flink.enums.TopicTypeEnum
;
import
com.flink.util.SqlLoader
;
import
com.flink.vo.KafkaDataSource
;
/**
* @author wjs
...
...
@@ -33,8 +35,7 @@ public class RegistrationCheckAchi extends SourceCommonBase implements Serializa
}
@Override
public
void
parseSourceKafkaToSqlTable
(
TopicTypeEnum
topicTypeEnum
,
StreamTableEnvironment
tableEnv
)
throws
ParseException
,
Exception
{
/**
 * Parses a single Kafka source topic into a SQL table on the given table
 * environment.
 *
 * NOTE(review): still an auto-generated stub — this implementation appears to
 * rely on the multi-source path (parseMultipleSourceKafkaToSqlTable) instead;
 * confirm before wiring callers to this method.
 *
 * @param topicTypeEnum topic/consumer-group descriptor of the Kafka source
 * @param tableEnv      Flink table environment the table would be registered on
 * @param loader        SQL statement loader (newly added parameter; unused in this stub)
 * @throws ParseException declared by the interface contract
 * @throws Exception      declared by the interface contract
 */
public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv,
        SqlLoader loader) throws ParseException, Exception {
    // TODO Auto-generated method stub
}
...
...
@@ -46,7 +47,7 @@ public class RegistrationCheckAchi extends SourceCommonBase implements Serializa
}
@Override
public
void
parseMultipleSourceKafkaToSqlTable
(
StreamExecutionEnvironment
env
,
StreamTableEnvironment
tableEnv
)
throws
Exception
{
public
void
parseMultipleSourceKafkaToSqlTable
(
StreamExecutionEnvironment
env
,
StreamTableEnvironment
tableEnv
,
SqlLoader
loader
)
throws
Exception
{
// Table result = tableEnv.sqlQuery("SELECT * FROM collect_log_view");
// result.execute().print();
...
...
@@ -59,7 +60,14 @@ public class RegistrationCheckAchi extends SourceCommonBase implements Serializa
// "ON eventLog.device_id = collectLog.device_id " +
// "AND eventLog.unique_id = collectLog.unique_id"
// );
// result1.execute().print();
// 获取SQL
// String sql = loader.getSql("registrationCheck");
// System.out.println("Join SQL:\n" + sql);
//执行SQL
Table
result
=
tableEnv
.
sqlQuery
(
"SELECT * FROM content_interaction_view"
);
result
.
execute
().
print
();
}
}
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/schema/KafkaBaseSchema.java
View file @
a4daf7f2
...
...
@@ -9,6 +9,8 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import
com.flink.util.LoadPropertiesFile
;
import
scala.math.BigInt
;
/**
* @author wjs
* @version 创建时间:2025-8-19 16:55:12
...
...
@@ -22,7 +24,8 @@ public class KafkaBaseSchema {
.
option
(
"topic"
,
topic
)
.
option
(
"properties.bootstrap.servers"
,
LoadPropertiesFile
.
getPropertyFileValues
(
"kafka.bootstrapServers"
))
.
option
(
"properties.group.id"
,
groupId
)
.
option
(
"scan.startup.mode"
,
"latest-offset"
)
.
option
(
"scan.startup.mode"
,
"earliest-offset"
)
.
option
(
"properties.auto.offset.reset"
,
"latest"
)
.
option
(
"format"
,
"json"
)
.
option
(
"json.ignore-parse-errors"
,
"true"
)
.
build
();
...
...
@@ -188,4 +191,20 @@ public class KafkaBaseSchema {
.
build
();
createTableDescriptor
(
tableEnv
,
schema
,
topic
,
groupId
,
tableName
);
}
/**
 * Registers a Kafka-backed table schema for the openSimiApi topic.
 *
 * Declares the raw message payload columns (all STRING except the event-time
 * column create_time), exposes the source Kafka topic name as a virtual
 * metadata column, attaches a watermark on create_time that tolerates events
 * up to 5 seconds out of order, and finally delegates to
 * createTableDescriptor to create the table.
 *
 * @param tableEnv  table environment to register the table on
 * @param topic     Kafka topic to read from
 * @param groupId   Kafka consumer group id
 * @param tableName name under which the table is registered
 */
public static void openSimiApiSchema(StreamTableEnvironment tableEnv, String topic, String groupId, String tableName) {
    Schema.Builder schemaBuilder = Schema.newBuilder();
    // Payload columns, declared in message order.
    schemaBuilder.column("send_time", STRING());
    schemaBuilder.column("req_body", STRING());
    schemaBuilder.column("create_time", TIMESTAMP(3));
    schemaBuilder.column("send_type", STRING());
    schemaBuilder.column("zone_time", STRING());
    schemaBuilder.column("ua", STRING());
    schemaBuilder.column("flume_type", STRING());
    schemaBuilder.column("zone_code", STRING());
    // Kafka metadata column: the topic the record was read from.
    schemaBuilder.columnByMetadata("topic", STRING(), "topic", true);
    // Watermark strategy: event time minus a 5-second lateness bound.
    schemaBuilder.watermark("create_time", "create_time - INTERVAL '5' SECOND");
    createTableDescriptor(tableEnv, schemaBuilder.build(), topic, groupId, tableName);
}
}
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/sql/BaseKafkaToSqlCreateTable.java
View file @
a4daf7f2
...
...
@@ -67,7 +67,8 @@ public class BaseKafkaToSqlCreateTable {
}
else
if
(
StringUtils
.
equals
(
kafkaTopic
.
getTopic
(),
TopicTypeEnum
.
ABROAD_SIMI_USER_LIST_TOPIC
.
getTopic
()))
{
KafkaBaseSchema
.
abroadSimiUserSchema
(
tableEnv
,
topic
,
groupId
,
"kafka_abroad_simi_user"
);
}
else
if
(
StringUtils
.
equals
(
kafkaTopic
.
getTopic
(),
TopicTypeEnum
.
OPEN_SIMI_API
.
getTopic
()))
{
KafkaBaseSchema
.
openSimiApiSchema
(
tableEnv
,
topic
,
groupId
,
"kafka_open_simi_api"
);
OpenSimiApiTable
.
buildOpenSimiApiQuery
(
tableEnv
);
}
else
if
(
StringUtils
.
equals
(
kafkaTopic
.
getTopic
(),
TopicTypeEnum
.
ODS_PC_EVENT_LOG
.
getTopic
()))
{
KafkaBaseSchema
.
pcEventLogSchema
(
tableEnv
,
topic
,
groupId
,
"kafka_pc_event_log"
);
PcEventLogTable
.
buildPcEventLogQuery
(
tableEnv
);
...
...
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/sql/OpenSimiApiTable.java
0 → 100644
View file @
a4daf7f2
package com.flink.achieve.table.sql;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import com.flink.achieve.table.udf.ParseOpenSimiApiUDTF;

/**
 * Registers the ParseOpenSimiApi UDTF and creates one temporary view per
 * openSimiApi event type (send_type '1' through '12') on top of the
 * kafka_open_simi_api source table.
 *
 * Improvement over the original: the twelve nearly identical CREATE VIEW
 * statements are generated by a single private helper instead of being
 * copy-pasted, so adding a new event type is a one-line change.
 *
 * @author wjs
 * @version 创建时间:2025-8-26 18:15:10
 */
public class OpenSimiApiTable {

    /**
     * Registers the parsing UDTF and creates all per-event-type views.
     *
     * @param tableEnv table environment the UDTF and views are registered on
     */
    public static void buildOpenSimiApiQuery(StreamTableEnvironment tableEnv) {
        tableEnv.createTemporarySystemFunction("ParseOpenSimiApi", ParseOpenSimiApiUDTF.class);
        // One view per send_type; the extra field lists mirror the map keys
        // produced by ParseOpenSimiApiUDTF for each event type.
        createSendTypeView(tableEnv, "user_registration_view", "1", "countryCode", "phoneNumber");
        createSendTypeView(tableEnv, "update_nickname_view", "2", "oldNickname", "newNickname");
        createSendTypeView(tableEnv, "initiate_friend_request_view", "3",
                "friendCid", "friendNickname", "addMethod", "remark");
        createSendTypeView(tableEnv, "friend_request_accepted_view", "4",
                "friendCid", "friendNickname", "addMethod", "remark");
        createSendTypeView(tableEnv, "update_friend_nickname_view", "5",
                "friendCid", "oldNickname", "newNickname");
        createSendTypeView(tableEnv, "delete_friend_view", "6",
                "friendCid", "friendNickname", "remark");
        createSendTypeView(tableEnv, "create_group_view", "7", "groupId");
        createSendTypeView(tableEnv, "join_group_view", "8", "groupId", "groupName", "joinType");
        createSendTypeView(tableEnv, "update_group_name_view", "9",
                "groupId", "oldGroupName", "newGroupName");
        createSendTypeView(tableEnv, "leave_group_view", "10", "groupId", "groupName", "exitType");
        createSendTypeView(tableEnv, "publish_content_view", "11", "releaseType", "articleId");
        createSendTypeView(tableEnv, "content_interaction_view", "12",
                "releaseType", "articleId", "actionType", "commentContent", "commentId", "commentLevel");
    }

    /**
     * Creates a view exposing create_time, the common fields (id, cid, time)
     * and the given type-specific fields extracted from the simi_api_info map
     * emitted by the ParseOpenSimiApi UDTF, filtered to one send_type.
     *
     * @param tableEnv    table environment to execute the DDL on
     * @param viewName    name of the view to create
     * @param sendType    send_type value this view selects ('1'..'12')
     * @param extraFields type-specific map keys to project as columns
     */
    private static void createSendTypeView(StreamTableEnvironment tableEnv, String viewName,
            String sendType, String... extraFields) {
        StringBuilder columns = new StringBuilder()
                .append("k.create_time AS create_time")
                .append(", t.simi_api_info['id'] AS id")
                .append(", t.simi_api_info['cid'] AS cid")
                // `time` is a reserved word in Flink SQL, so the alias is back-quoted.
                .append(", t.simi_api_info['time'] AS `time`");
        for (String field : extraFields) {
            columns.append(", t.simi_api_info['").append(field).append("'] AS ").append(field);
        }
        tableEnv.executeSql(
                "CREATE VIEW " + viewName + " AS "
                + "SELECT " + columns + " "
                + "FROM kafka_open_simi_api AS k "
                + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(simi_api_info) ON TRUE "
                + "WHERE send_type = '" + sendType + "'");
    }
}
eagleEye-flink_kafka/src/main/java/com/flink/achieve/table/udf/ParseOpenSimiApiUDTF.java
0 → 100644
View file @
a4daf7f2
package com.flink.achieve.table.udf;

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.enums.OpenSimiApiTypeEnum;
import com.flink.vo.simi.ContentInteractionReqDto;
import com.flink.vo.simi.CreateGroupReqDto;
import com.flink.vo.simi.DeleteFriendReqDto;
import com.flink.vo.simi.FriendRequestAcceptedReqDto;
import com.flink.vo.simi.InitiateFriendRequestReqDto;
import com.flink.vo.simi.JoinGroupReqDto;
import com.flink.vo.simi.LeaveGroupReqDto;
import com.flink.vo.simi.PublishContentReqDto;
import com.flink.vo.simi.UpdateFriendNicknameReqDto;
import com.flink.vo.simi.UpdateGroupNameReqDto;
import com.flink.vo.simi.UpdateNicknameReqDto;
import com.flink.vo.simi.UserRegistrationReqDto;

/**
 * UDTF that parses an openSimiApi JSON request body into a flat
 * MAP&lt;STRING,STRING&gt; of event fields, dispatching on send_type ('1'..'12')
 * to the matching request DTO.
 *
 * Fixes over the original:
 * - isAllEmpty -&gt; isAnyEmpty: the old guard only returned when BOTH arguments
 *   were empty, so a blank req_body with a valid send_type reached
 *   JSONObject.parseObject, got null back, and NPE'd on the first getter.
 * - JSON parse failures are now caught and logged instead of escaping eval()
 *   (one malformed record could previously fail the whole job); the declared
 *   logger was never used before.
 * - The `null != params` guard was always true (params is freshly allocated),
 *   so an empty map row was emitted for unknown send_type values; now a row is
 *   emitted only when at least one field was extracted. All downstream views
 *   filter on send_type 1..12, so known types are unaffected.
 *
 * NOTE(review): map values come from the DTO getters (held as Object here)
 * while the hint declares MAP&lt;STRING,STRING&gt; — presumably the getters all
 * return String; confirm against the DTO definitions.
 *
 * @author wjs
 * @version 创建时间:2025-8-26 18:19:05
 */
@FunctionHint(output = @DataTypeHint("ROW<simi_api_info MAP<STRING,STRING>>"))
public class ParseOpenSimiApiUDTF extends TableFunction<Row> {

    private static final long serialVersionUID = 1L;

    private static final Logger logger = LoggerFactory.getLogger(ParseOpenSimiApiUDTF.class);

    /**
     * Parses one record and collects a single-map row of extracted fields.
     *
     * @param req_body  JSON request body of the openSimiApi call
     * @param send_type event type code ('1'..'12'); unknown codes emit nothing
     */
    public void eval(String req_body, String send_type) {
        if (StringUtils.isAnyEmpty(send_type, req_body)) {
            return;
        }
        Map<String, Object> params = new HashMap<>();
        try {
            if (matches(send_type, OpenSimiApiTypeEnum.USER_REGISTRATION)) { // 1
                UserRegistrationReqDto dto =
                        JSONObject.parseObject(req_body, new TypeReference<UserRegistrationReqDto>() {});
                putCommon(params, dto.getId(), dto.getCid(), dto.getTime());
                params.put("countryCode", dto.getCountryCode());
                params.put("phoneNumber", dto.getPhoneNumber());
            } else if (matches(send_type, OpenSimiApiTypeEnum.UPDATE_NICKNAME)) { // 2
                UpdateNicknameReqDto dto =
                        JSONObject.parseObject(req_body, new TypeReference<UpdateNicknameReqDto>() {});
                putCommon(params, dto.getId(), dto.getCid(), dto.getTime());
                params.put("oldNickname", dto.getOldNickname());
                params.put("newNickname", dto.getNewNickname());
            } else if (matches(send_type, OpenSimiApiTypeEnum.INITIATE_FRIEND_REQUEST)) { // 3
                InitiateFriendRequestReqDto dto =
                        JSONObject.parseObject(req_body, new TypeReference<InitiateFriendRequestReqDto>() {});
                putCommon(params, dto.getId(), dto.getCid(), dto.getTime());
                params.put("friendCid", dto.getFriendCid());
                params.put("friendNickname", dto.getFriendNickname());
                params.put("addMethod", dto.getAddMethod());
                params.put("remark", dto.getRemark());
            } else if (matches(send_type, OpenSimiApiTypeEnum.FRIEND_REQUEST_ACCEPTED)) { // 4
                FriendRequestAcceptedReqDto dto =
                        JSONObject.parseObject(req_body, new TypeReference<FriendRequestAcceptedReqDto>() {});
                putCommon(params, dto.getId(), dto.getCid(), dto.getTime());
                params.put("friendCid", dto.getFriendCid());
                params.put("friendNickname", dto.getFriendNickname());
                params.put("addMethod", dto.getAddMethod());
                params.put("remark", dto.getRemark());
            } else if (matches(send_type, OpenSimiApiTypeEnum.UPDATE_FRIEND_NICKNAME)) { // 5
                UpdateFriendNicknameReqDto dto =
                        JSONObject.parseObject(req_body, new TypeReference<UpdateFriendNicknameReqDto>() {});
                putCommon(params, dto.getId(), dto.getCid(), dto.getTime());
                params.put("friendCid", dto.getFriendCid());
                params.put("oldNickname", dto.getOldNickname());
                params.put("newNickname", dto.getNewNickname());
            } else if (matches(send_type, OpenSimiApiTypeEnum.DELETE_FRIEND)) { // 6
                DeleteFriendReqDto dto =
                        JSONObject.parseObject(req_body, new TypeReference<DeleteFriendReqDto>() {});
                putCommon(params, dto.getId(), dto.getCid(), dto.getTime());
                params.put("friendCid", dto.getFriendCid());
                params.put("friendNickname", dto.getFriendNickname());
                params.put("remark", dto.getRemark());
            } else if (matches(send_type, OpenSimiApiTypeEnum.CREATE_GROUP)) { // 7
                CreateGroupReqDto dto =
                        JSONObject.parseObject(req_body, new TypeReference<CreateGroupReqDto>() {});
                putCommon(params, dto.getId(), dto.getCid(), dto.getTime());
                params.put("groupId", dto.getGroupId());
            } else if (matches(send_type, OpenSimiApiTypeEnum.JOIN_GROUP)) { // 8
                JoinGroupReqDto dto =
                        JSONObject.parseObject(req_body, new TypeReference<JoinGroupReqDto>() {});
                putCommon(params, dto.getId(), dto.getCid(), dto.getTime());
                params.put("groupId", dto.getGroupId());
                params.put("groupName", dto.getGroupName());
                params.put("joinType", dto.getJoinType());
            } else if (matches(send_type, OpenSimiApiTypeEnum.UPDATE_GROUP_NAME)) { // 9
                UpdateGroupNameReqDto dto =
                        JSONObject.parseObject(req_body, new TypeReference<UpdateGroupNameReqDto>() {});
                putCommon(params, dto.getId(), dto.getCid(), dto.getTime());
                params.put("groupId", dto.getGroupId());
                params.put("oldGroupName", dto.getOldGroupName());
                params.put("newGroupName", dto.getNewGroupName());
            } else if (matches(send_type, OpenSimiApiTypeEnum.LEAVE_GROUP)) { // 10
                LeaveGroupReqDto dto =
                        JSONObject.parseObject(req_body, new TypeReference<LeaveGroupReqDto>() {});
                putCommon(params, dto.getId(), dto.getCid(), dto.getTime());
                params.put("groupId", dto.getGroupId());
                params.put("groupName", dto.getGroupName());
                params.put("exitType", dto.getExitType());
            } else if (matches(send_type, OpenSimiApiTypeEnum.PUBLISH_CONTENT)) { // 11
                PublishContentReqDto dto =
                        JSONObject.parseObject(req_body, new TypeReference<PublishContentReqDto>() {});
                putCommon(params, dto.getId(), dto.getCid(), dto.getTime());
                params.put("releaseType", dto.getReleaseType());
                params.put("articleId", dto.getArticleId());
            } else if (matches(send_type, OpenSimiApiTypeEnum.CONTENT_INTERACTION)) { // 12
                ContentInteractionReqDto dto =
                        JSONObject.parseObject(req_body, new TypeReference<ContentInteractionReqDto>() {});
                putCommon(params, dto.getId(), dto.getCid(), dto.getTime());
                params.put("releaseType", dto.getReleaseType());
                params.put("articleId", dto.getArticleId());
                params.put("actionType", dto.getActionType());
                params.put("commentContent", dto.getCommentContent());
                params.put("commentId", dto.getCommentId());
                params.put("commentLevel", dto.getCommentLevel());
            }
        } catch (Exception e) {
            // Log and drop the record instead of failing the job on bad input.
            logger.error("Failed to parse openSimiApi message, send_type={}", send_type, e);
            return;
        }
        if (!params.isEmpty()) {
            collect(Row.of(params));
        }
    }

    /** True when the incoming send_type equals the enum constant's code. */
    private static boolean matches(String sendType, OpenSimiApiTypeEnum type) {
        return StringUtils.equals(sendType, type.getCode());
    }

    /** Copies the fields shared by every request type into the output map. */
    private static void putCommon(Map<String, Object> params, Object id, Object cid, Object time) {
        params.put("id", id);
        params.put("cid", cid);
        params.put("time", time);
    }
}
eagleEye-flink_kafka/src/main/java/com/flink/common/SourceCommonBase.java
View file @
a4daf7f2
...
...
@@ -50,6 +50,7 @@ public abstract class SourceCommonBase {
}
if
(
useStreamAPI
){
//2.资源配置文件信息的获取
//初始化加载器 加载配置文件
DataStreamSource
<
String
>
dataStreamSource
=
KafkaConnector
.
sourceKafka
(
env
,
topicTypeEnum
.
getTopic
(),
topicTypeEnum
.
getGroup
());
logger
.
info
(
"2.资源配置文件信息的获取成功"
);
//3.Kafka资源ETL
...
...
@@ -73,6 +74,7 @@ public abstract class SourceCommonBase {
StreamTableEnvironment
tableEnv
=
StreamEnvironmentSettings
.
createTableEnv
(
env
,
jobName
.
getCode
(),
kafkaTopicList
,
false
);
logger
.
info
(
"2. table的环境设置成功"
);
//3.资源配置文件信息的获取
//初始化加载器 加载配置文件
SqlLoader
loader
=
new
SqlLoader
();
loader
.
load
(
"sql-config.yaml"
);
parseMultipleSourceKafkaToSqlTable
(
env
,
tableEnv
,
loader
);
...
...
eagleEye-flink_kafka/src/main/java/com/flink/util/SqlLoader.java
View file @
a4daf7f2
...
...
@@ -39,6 +39,15 @@ public class SqlLoader {
}
/**
 * Looks up the raw SQL statement registered under the given key.
 *
 * @param key unique identifier of the SQL entry in the loaded config
 * @return the SQL string, or {@code null} when the key is not present
 */
public String getSql(String key) {
    String sql = sqlMap.get(key);
    return sql;
}
/**
* 获取带参数替换的SQL
* @param key SQL配置的唯一标识符
* @param params 参数键值对(如{"date":"2025-08-26"})
...
...
@@ -54,4 +63,55 @@ public class SqlLoader {
}
return
sql
;
}
/**
 * Static convenience accessor for a value from the YAML SQL config.
 *
 * Fix: the original created a loader but never called load(), so the backing
 * sqlMap was never populated and every lookup missed. The config file name
 * matches the one used by main() and SourceCommonBase ("sql-config.yaml").
 *
 * NOTE(review): this re-reads the YAML on every call; cache the loader if
 * this is used on a hot path.
 *
 * @param proKey configuration key name
 * @return the configured SQL string, or {@code null} when the key is unknown
 */
public static String getYamlFileValues(String proKey) {
    SqlLoader loader = new SqlLoader();
    loader.load("sql-config.yaml");
    return loader.getSql(proKey);
}
/**
 * Manual smoke test for SqlLoader: loads sql-config.yaml and prints a plain
 * lookup, a parameterized lookup, and the static-accessor lookup.
 */
public static void main(String[] args) {
    // Example YAML layout expected by the loader:
    // queries:
    //   user_stats: |
    //     SELECT user_id, COUNT(*) AS behavior_count
    //     FROM user_behavior
    //     WHERE dt = '${date}'
    //     GROUP BY user_id
    //
    //   join_logs: |
    //     SELECT *
    //     FROM event_log AS e
    //     LEFT JOIN collect_log AS c
    //     ON e.device_id = c.device_id
    //     AND e.unique_id = c.unique_id
    //
    //   latest_orders: >
    //     SELECT order_id, product_name, amount
    //     FROM orders
    //     ORDER BY order_time DESC
    //     LIMIT 10
    // Initialize the loader
    SqlLoader loader = new SqlLoader();
    loader.load("sql-config.yaml"); // load the config file
    // Fetch a plain SQL statement by key
    String joinSql = loader.getSql("join_logs");
    System.out.println("Join SQL:\n" + joinSql);
    // Fetch a SQL statement with ${...} parameter substitution
    Map<String, String> params = new HashMap<>();
    params.put("date", "2023-10-01");
    String userStatsSql = loader.getSql("user_stats", params);
    System.out.println("User Stats SQL:\n" + userStatsSql);
    // Fetch via the static accessor
    String sql = SqlLoader.getYamlFileValues("latest_orders");
    System.out.println("Latest Orders SQL:\n" + sql);
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment