Skip to content
Toggle navigation
P
Projects
G
Groups
S
Snippets
Help
eagleEye
/
eagleEye-flink_kafka
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Issues
0
Merge Requests
0
Pipelines
Wiki
Snippets
Settings
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit
b79af8cf
authored
Jul 28, 2025
by
魏建枢
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
代码提交
parent
708c9999
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
35 changed files
with
968 additions
and
663 deletions
eagleEye-flink_kafka/src/main/java/com/flink/achieve/base/CollectLogAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/base/CommonConsumeBaseAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/base/CommunityHistoryAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/base/EventLogAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/base/UserInvitationAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/EventLogAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/RealBalanceAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/RealKycAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/RealTransactionAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/RealUsersAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/RegistrationCheckAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/enums/JobTypeEnum.java
eagleEye-flink_kafka/src/main/java/com/flink/enums/TopicTypeEnum.java
eagleEye-flink_kafka/src/main/java/com/flink/factory/JobProcessorFactory.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/function/CollectLogJoinProcessor.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/function/DistinctUserAggregator.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/function/GenDeviceIdProcessor.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/function/JoinDeviceWithRegistrationProcessFunction.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/function/LatestUserProcessFunction.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/function/WindowResultFunction.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/RealKycProcessor.java → eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/CommonConsumeBaseProcessor.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/EventLogProcessor.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/RealTransactionProcessor.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/RealUsersProcessor.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/RealBalanceProcessor.java → eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/RegistrationCheckProcessor.java
eagleEye-flink_kafka/src/main/java/com/flink/util/TimeConvertUtil.java
eagleEye-flink_kafka/src/main/java/com/flink/vo/CommunityHistory.java
eagleEye-flink_kafka/src/main/java/com/flink/vo/DeviceId.java
eagleEye-flink_kafka/src/main/java/com/flink/vo/DeviceIdInfo.java
eagleEye-flink_kafka/src/main/java/com/flink/vo/DeviceRegistrationResult.java
eagleEye-flink_kafka/src/main/java/com/flink/vo/PcEventInfo.java
eagleEye-flink_kafka/src/main/java/com/flink/vo/RealLead.java
eagleEye-flink_kafka/src/main/java/com/flink/vo/Result.java
eagleEye-flink_kafka/src/main/java/com/flink/vo/RiskInfo.java
eagleEye-flink_kafka/src/main/java/com/flink/vo/android/deviceInfo/D6.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/base/CollectLogAchi.java
0 → 100644
View file @
b79af8cf
package com.flink.achieve.base;

import java.io.Serializable;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

/**
 * Handler for the collect-log Kafka stream.
 *
 * <p>Currently a placeholder: {@link #collectLog(DataStreamSource)} performs no
 * processing yet. Dispatched from {@code CommonConsumeBaseAchi} for the
 * ODS_NEW_COLLECT_LOG topic.
 *
 * @author wjs
 * @version created 2025-7-28 10:38:56
 */
public class CollectLogAchi implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Entry point for the collect-log stream.
     *
     * @param collectLogStreamSource raw JSON messages from the collect-log topic
     */
    public static void collectLog(DataStreamSource<String> collectLogStreamSource) {
        // TODO Auto-generated method stub — processing pipeline not implemented yet.
    }
}
eagleEye-flink_kafka/src/main/java/com/flink/achieve/base/CommonConsumeBaseAchi.java
0 → 100644
View file @
b79af8cf
package com.flink.achieve.base;

import java.io.Serializable;
import java.util.List;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.flink.common.MultipleSourceCommonBase;
import com.flink.enums.TopicTypeEnum;
import com.flink.vo.KafkaDataSource;

/**
 * Dispatcher that routes each consumed Kafka source to its topic-specific handler
 * (user invitation, event log, community history, collect log).
 *
 * @author wjs
 * @version created 2025-7-25 10:29:23
 */
public class CommonConsumeBaseAchi extends MultipleSourceCommonBase implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final Logger logger = LoggerFactory.getLogger(CommonConsumeBaseAchi.class);

    /**
     * Routes every {@link KafkaDataSource} in {@code dataSourceList} to the handler
     * registered for its topic. Sources whose topic matches none of the known
     * {@link TopicTypeEnum} entries are logged and skipped.
     *
     * @param dataSourceList consumed Kafka sources, one per topic; may be null/empty
     * @throws ParseException if a downstream handler fails to parse a payload
     * @throws Exception      propagated from downstream handlers
     */
    @Override
    public void parseSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
        // Guard clause: nothing to dispatch (replaces the former dead trailing `else { return; }`).
        if (CollectionUtils.isEmpty(dataSourceList)) {
            return;
        }
        for (KafkaDataSource kafkaDataSource : dataSourceList) {
            String topic = kafkaDataSource.getTopic();
            // Topics are assumed mutually exclusive per source, so an else-if chain
            // is equivalent to the former independent ifs — TODO confirm no two
            // TopicTypeEnum entries share a topic string.
            if (StringUtils.equals(topic, TopicTypeEnum.ODS_USER_INVITATION.getTopic())) {
                DataStreamSource<String> userInvitationStreamSource = kafkaDataSource.getDataStreamSource();
                UserInvitationAchi.userInvitation(userInvitationStreamSource);
            } else if (StringUtils.equals(topic, TopicTypeEnum.ODS_EVENT_LOG.getTopic())) {
                DataStreamSource<String> eventLogStreamSource = kafkaDataSource.getDataStreamSource();
                EventLogAchi.eventLog(eventLogStreamSource);
            } else if (StringUtils.equals(topic, TopicTypeEnum.ODS_COMMUNITY_HISTORY.getTopic())) {
                DataStreamSource<String> communityHistoryStreamSource = kafkaDataSource.getDataStreamSource();
                CommunityHistoryAchi.communityHistory(communityHistoryStreamSource);
            } else if (StringUtils.equals(topic, TopicTypeEnum.ODS_NEW_COLLECT_LOG.getTopic())) {
                DataStreamSource<String> collectLogStreamSource = kafkaDataSource.getDataStreamSource();
                CollectLogAchi.collectLog(collectLogStreamSource);
            } else {
                // Previously silently ignored; surface unrouted topics for operability.
                logger.warn("No handler registered for topic: {}", topic);
            }
        }
    }
}
eagleEye-flink_kafka/src/main/java/com/flink/achieve/base/CommunityHistoryAchi.java
0 → 100644
View file @
b79af8cf
package com.flink.achieve.base;

import java.io.Serializable;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

/**
 * Handler for the community-history Kafka stream.
 *
 * <p>Currently a placeholder: {@link #communityHistory(DataStreamSource)} performs
 * no processing yet. Dispatched from {@code CommonConsumeBaseAchi} for the
 * ODS_COMMUNITY_HISTORY topic.
 *
 * @author wjs
 * @version created 2025-7-28 10:40:20
 */
public class CommunityHistoryAchi implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Entry point for the community-history stream.
     *
     * @param communityHistoryStreamSource raw JSON messages from the community-history topic
     */
    public static void communityHistory(DataStreamSource<String> communityHistoryStreamSource) {
        // TODO Auto-generated method stub — processing pipeline not implemented yet.
    }
}
eagleEye-flink_kafka/src/main/java/com/flink/achieve/base/EventLogAchi.java
0 → 100644
View file @
b79af8cf
package com.flink.achieve.base;

import java.io.Serializable;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

/**
 * Handler for the event-log Kafka stream (base package variant).
 *
 * <p>Currently a placeholder: {@link #eventLog(DataStreamSource)} performs no
 * processing yet. Dispatched from {@code CommonConsumeBaseAchi} for the
 * ODS_EVENT_LOG topic.
 *
 * @author wjs
 * @version created 2025-7-28 10:37:25
 */
public class EventLogAchi implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Entry point for the event-log stream.
     *
     * @param eventLogStreamSource raw JSON messages from the event-log topic
     */
    public static void eventLog(DataStreamSource<String> eventLogStreamSource) {
        // TODO Auto-generated method stub — processing pipeline not implemented yet.
    }
}
eagleEye-flink_kafka/src/main/java/com/flink/achieve/base/UserInvitationAchi.java
0 → 100644
View file @
b79af8cf
This diff is collapsed.
Click to expand it.
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/EventLogAchi.java
View file @
b79af8cf
...
...
@@ -23,17 +23,22 @@ import org.apache.flink.util.Collector;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONObject
;
import
com.alibaba.fastjson.TypeReference
;
import
com.flink.common.DorisConnector
;
import
com.flink.common.SourceCommonBase
;
import
com.flink.common.
Multiple
SourceCommonBase
;
import
com.flink.config.TableConfig
;
import
com.flink.enums.TopicTypeEnum
;
import
com.flink.processor.function.UserPropertiesProcessor
;
import
com.flink.util.CompareUtils
;
import
com.flink.util.TimeConvertUtil
;
import
com.flink.vo.CommunityHistory
;
import
com.flink.vo.EventList
;
import
com.flink.vo.KafkaDataSource
;
import
com.flink.vo.OdsEventLog
;
import
com.flink.vo.Properties
;
import
com.flink.vo.RiskInfo
;
import
com.flink.vo.UserProperties
;
/**
...
...
@@ -41,7 +46,7 @@ import com.flink.vo.UserProperties;
* @version 创建时间:2025-6-20 23:40:33
* 类说明
*/
public
class
EventLogAchi
extends
SourceCommonBase
implements
Serializable
{
public
class
EventLogAchi
extends
Multiple
SourceCommonBase
implements
Serializable
{
/**
*
...
...
@@ -50,7 +55,7 @@ public class EventLogAchi extends SourceCommonBase implements Serializable{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
EventLogAchi
.
class
);
@Override
public
void
parseSourceKafkaJson
(
DataStreamSource
<
String
>
dataStreamSource
)
throws
ParseException
,
Exception
{
public
void
parseSourceKafkaJson
(
List
<
KafkaDataSource
>
dataSourceList
)
throws
ParseException
,
Exception
{
// =================配置入库字段=========================================
// 事件明细表结构
TableConfig
tableConfig
=
new
TableConfig
(
...
...
@@ -107,7 +112,6 @@ public class EventLogAchi extends SourceCommonBase implements Serializable{
"bi.event_log"
);
TableConfig
tableErrorConfig
=
new
TableConfig
(
new
String
[]{
"id"
,
...
...
@@ -148,12 +152,71 @@ public class EventLogAchi extends SourceCommonBase implements Serializable{
"bi.event_log_error"
);
TableConfig
tableCommunityHistoryConfig
=
new
TableConfig
(
new
String
[]{
"article_id"
,
"source_type"
,
"article_type"
,
"article_state"
,
"create_time"
,
"article_user_cid"
,
"article_text"
,
"images"
,
"label"
,
"score"
,
"law"
,
"law_image"
,
"politics"
,
"politics_image"
,
"ad"
,
"ad_image"
,
"hot_search_title"
,
"__DORIS_DELETE_SIGN__"
},
new
DataType
[]{
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
INT
()
},
"bi.simi_community_history"
);
//=================流式处理=========================================
DorisSink
<
RowData
>
dorisSink
=
DorisConnector
.
sinkDoris
(
tableConfig
.
getFields
(),
tableConfig
.
getTypes
(),
tableConfig
.
getTableName
());
DorisSink
<
RowData
>
dorisErrorSink
=
DorisConnector
.
sinkDoris
(
tableErrorConfig
.
getFields
(),
tableErrorConfig
.
getTypes
(),
tableErrorConfig
.
getTableName
());
DorisSink
<
RowData
>
dorisCommunityHistorySink
=
DorisConnector
.
sinkDoris
(
tableCommunityHistoryConfig
.
getFields
(),
tableCommunityHistoryConfig
.
getTypes
(),
tableCommunityHistoryConfig
.
getTableName
());
//=================数据处理流水线=========================================
dataStreamSource
DataStreamSource
<
String
>
eventLogStreamSource
=
null
;
DataStreamSource
<
String
>
communityHistoryStreamSource
=
null
;
if
(
CollectionUtils
.
isNotEmpty
(
dataSourceList
))
{
for
(
KafkaDataSource
kafkaDataSource
:
dataSourceList
)
{
if
(
StringUtils
.
equals
(
kafkaDataSource
.
getTopic
(),
TopicTypeEnum
.
ODS_EVENT_LOG
.
getTopic
()))
{
eventLogStreamSource
=
kafkaDataSource
.
getDataStreamSource
();
}
if
(
StringUtils
.
equals
(
kafkaDataSource
.
getTopic
(),
TopicTypeEnum
.
ODS_COMMUNITY_HISTORY
.
getTopic
()))
{
communityHistoryStreamSource
=
kafkaDataSource
.
getDataStreamSource
();
}
}
}
else
{
return
;
}
eventLogStreamSource
.
map
(
value
->{
try
{
// 解析 Kafka 数据
...
...
@@ -204,9 +267,8 @@ public class EventLogAchi extends SourceCommonBase implements Serializable{
// .print()
.
sinkTo
(
dorisSink
)
.
name
(
"Doris-EventLog"
);
SingleOutputStreamOperator
<
RowData
>
rowDataStream
=
dataStreamSource
.
flatMap
(
SingleOutputStreamOperator
<
RowData
>
rowDataStream
=
eventLogStreamSource
.
flatMap
(
new
FlatMapFunction
<
String
,
RowData
>()
{
private
static
final
long
serialVersionUID
=
1L
;
...
...
@@ -271,12 +333,70 @@ public class EventLogAchi extends SourceCommonBase implements Serializable{
// .print()
.
sinkTo
(
dorisErrorSink
)
.
name
(
"Doris-EventLogError"
);
}
SingleOutputStreamOperator
<
RowData
>
communityHistoryRowDataStream
=
communityHistoryStreamSource
.
flatMap
(
new
FlatMapFunction
<
String
,
RowData
>(){
@Override
public
void
sendToSinkKafka
(
DataStreamSource
<
String
>
mStream
)
{
// TODO Auto-generated method stub
/**
*
*/
private
static
final
long
serialVersionUID
=
1L
;
@Override
public
void
flatMap
(
String
value
,
Collector
<
RowData
>
out
)
throws
Exception
{
try
{
// 解析 Kafka 数据
JSONObject
jsonObj
=
JSON
.
parseObject
(
value
);
String
list
=
jsonObj
.
getString
(
"list"
);
String
flumeType
=
jsonObj
.
getString
(
"flume_type"
);
if
(
StringUtils
.
isEmpty
(
list
))
{
return
;
}
String
sourceType
=
null
;
if
(
StringUtils
.
equals
(
flumeType
,
"communityHistory"
))
{
sourceType
=
"国内版"
;
}
else
if
(
StringUtils
.
equals
(
flumeType
,
"communityHistoryAbroad"
))
{
sourceType
=
"海外版"
;
}
List
<
CommunityHistory
>
records
=
JSONObject
.
parseObject
(
list
,
new
TypeReference
<
List
<
CommunityHistory
>>()
{});
if
(
CollectionUtils
.
isEmpty
(
records
))
{
return
;
}
for
(
CommunityHistory
communityHistory
:
records
)
{
RiskInfo
riskInfo
=
communityHistory
.
getRisk
();
GenericRowData
row
=
new
GenericRowData
(
tableCommunityHistoryConfig
.
getFields
().
length
);
row
.
setField
(
0
,
StringData
.
fromString
(
communityHistory
.
getArticleId
()));
row
.
setField
(
1
,
StringData
.
fromString
(
sourceType
));
row
.
setField
(
2
,
StringData
.
fromString
(
communityHistory
.
getArticleType
()));
row
.
setField
(
3
,
StringData
.
fromString
(
communityHistory
.
getArticleState
()));
row
.
setField
(
4
,
StringData
.
fromString
(
TimeConvertUtil
.
parseToStringSSS
(
communityHistory
.
getCreateTime
())));
row
.
setField
(
5
,
StringData
.
fromString
(
communityHistory
.
getArticleUserCid
()));
row
.
setField
(
6
,
StringData
.
fromString
(
communityHistory
.
getArticleText
()));
row
.
setField
(
7
,
StringData
.
fromString
(
communityHistory
.
getImages
()));
row
.
setField
(
8
,
StringData
.
fromString
(
communityHistory
.
getLabel
()));
row
.
setField
(
9
,
StringData
.
fromString
(
riskInfo
==
null
?
null
:
riskInfo
.
getScore
()));
row
.
setField
(
10
,
StringData
.
fromString
(
riskInfo
==
null
?
null
:
riskInfo
.
getLaw
()));
row
.
setField
(
11
,
StringData
.
fromString
(
riskInfo
==
null
?
null
:
riskInfo
.
getLawImage
()));
row
.
setField
(
12
,
StringData
.
fromString
(
riskInfo
==
null
?
null
:
riskInfo
.
getPolitics
()));
row
.
setField
(
13
,
StringData
.
fromString
(
riskInfo
==
null
?
null
:
riskInfo
.
getPoliticsImage
()));
row
.
setField
(
14
,
StringData
.
fromString
(
riskInfo
==
null
?
null
:
riskInfo
.
getAd
()));
row
.
setField
(
15
,
StringData
.
fromString
(
riskInfo
==
null
?
null
:
riskInfo
.
getAdImage
()));
row
.
setField
(
16
,
StringData
.
fromString
(
communityHistory
.
getHotSearchTitle
()));
row
.
setField
(
17
,
0
);
out
.
collect
(
row
);
}
}
catch
(
Exception
e
)
{
logger
.
error
(
"EventLogAchi communityHistory处理 Kafka 消息出错 | rawData:{} | error:{}"
,
value
,
e
.
getMessage
());
System
.
err
.
println
(
"解析失败: "
+
e
.
toString
());
}
}
});
communityHistoryRowDataStream
.
filter
(
Objects:
:
nonNull
)
// .print()
.
sinkTo
(
dorisCommunityHistorySink
)
.
name
(
"Doris-EventLogCommunityHistory"
);
}
}
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/RealBalanceAchi.java
deleted
100644 → 0
View file @
708c9999
package com.flink.achieve.doris;

import java.io.Serializable;
import java.util.List;
import java.util.Objects;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.SourceCommonBase;
import com.flink.vo.RealBalance;

/**
 * Streams real-balance change records from Kafka into the Doris table
 * {@code bi.real_balance}.
 *
 * <p>Each Kafka message is a JSON envelope with a {@code flume_type} discriminator
 * and a {@code data} array of balance records; only messages whose
 * {@code flume_type} is {@code "realBalance"} are ingested.
 *
 * @author wjs
 * @version created 2025-5-8 11:51:13
 */
public class RealBalanceAchi extends SourceCommonBase implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final Logger logger = LoggerFactory.getLogger(RealBalanceAchi.class);

    /**
     * Builds the Doris sink for {@code bi.real_balance} and wires the Kafka
     * source through a JSON-to-RowData flatMap into it.
     *
     * @param dataStreamSource raw JSON envelopes from the balance topic
     * @throws ParseException on envelope parse failures (declared; parsing errors
     *                        are actually caught and logged per record)
     * @throws Exception      propagated from pipeline construction
     */
    @Override
    public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
        // ================= Sink column configuration =========================================
        String[] fields = {"id", "account_id", "uid", "symbol", "balance", "updated_at",
                "__DORIS_DELETE_SIGN__"};
        DataType[] types = {DataTypes.STRING(), DataTypes.STRING(), DataTypes.BIGINT(),
                DataTypes.STRING(), DataTypes.DOUBLE(), DataTypes.STRING(), DataTypes.INT()};

        // ================= Stream processing =========================================
        String tableName = "bi.real_balance";
        DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);

        SingleOutputStreamOperator<RowData> rowDataStream = dataStreamSource.flatMap(
                new FlatMapFunction<String, RowData>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void flatMap(String value, Collector<RowData> out) throws Exception {
                        try {
                            // Parse the Kafka envelope into balance records.
                            List<RealBalance> recordList = handleData(value);
                            if (CollectionUtils.isEmpty(recordList)) {
                                return;
                            }
                            // Convert each record to RowData.
                            for (RealBalance balance : recordList) {
                                GenericRowData row = new GenericRowData(fields.length);
                                row.setField(0, StringData.fromString(balance.getId()));            // id
                                row.setField(1, StringData.fromString(balance.getAccount_id()));    // account_id
                                // NOTE(review): NPEs on a null uid are caught below and the
                                // record is dropped with a log entry — confirm that is intended.
                                row.setField(2, balance.getUid().longValue());                      // uid: BIGINT
                                row.setField(3, StringData.fromString(balance.getSymbol()));        // symbol
                                row.setField(4, balance.getBalance());                              // balance: DOUBLE
                                row.setField(5, StringData.fromString(balance.getUpdated_at()));    // updated_at
                                row.setField(6, 0);                                                 // __DORIS_DELETE_SIGN__ (upsert)
                                out.collect(row);
                            }
                        } catch (Exception e) {
                            // Fix: pass the throwable so the stack trace is retained
                            // (previously only e.getMessage() was logged).
                            logger.error("RealBalanceAchi 处理 Kafka 消息出错 | rawData:{}", value, e);
                        }
                    }
                });

        rowDataStream
                .filter(Objects::nonNull)
                .sinkTo(dorisSink)
                .name("Doris-RealBalance");
    }

    /**
     * Parses one Kafka envelope and returns its balance records, or {@code null}
     * when the envelope's {@code flume_type} is not {@code "realBalance"}.
     *
     * @param record raw JSON envelope
     * @return parsed records, or null if the message is for another flume type
     * @throws ParseException declared for interface symmetry
     * @throws Exception      on JSON parse failures
     */
    public static List<RealBalance> handleData(String record) throws ParseException, Exception {
        // Fix: log tag previously said "RealKycAchi" (copy-paste from RealKycAchi).
        logger.info("RealBalanceAchi record:{}", record);
        // ETL of the raw payload.
        JSONObject jsonObj = JSON.parseObject(record);
        String flumeType = jsonObj.getString("flume_type");
        String bodyStr = jsonObj.getString("data");
        if (!StringUtils.equals(flumeType, "realBalance")) {
            return null;
        }
        logger.info("组装数据 body:{}", bodyStr);
        return JSONObject.parseObject(bodyStr, new TypeReference<List<RealBalance>>() {});
    }

    @Override
    public void sendToSinkKafka(DataStreamSource<String> mStream) {
        // TODO Auto-generated method stub — no Kafka sink for this job.
    }
}
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/RealKycAchi.java
deleted
100644 → 0
View file @
708c9999
package com.flink.achieve.doris;

import java.io.Serializable;
import java.util.List;
import java.util.Objects;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.SourceCommonBase;
import com.flink.vo.RealKyc;

/**
 * Streams KYC review records from Kafka into the Doris table {@code bi.real_kyc}.
 *
 * <p>Each Kafka message is a JSON envelope with a {@code flume_type} discriminator
 * and a {@code data} array of KYC records; only messages whose {@code flume_type}
 * is {@code "realKyc"} are ingested.
 *
 * @author wjs
 * @version created 2025-5-8 11:37:18
 */
public class RealKycAchi extends SourceCommonBase implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final Logger logger = LoggerFactory.getLogger(RealKycAchi.class);

    /**
     * Builds the Doris sink for {@code bi.real_kyc} and wires the Kafka source
     * through a JSON-to-RowData flatMap into it.
     *
     * @param dataStreamSource raw JSON envelopes from the KYC topic
     * @throws ParseException on envelope parse failures (declared; parsing errors
     *                        are actually caught and logged per record)
     * @throws Exception      propagated from pipeline construction
     */
    @Override
    public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
        // ================= Sink column configuration =========================================
        String[] fields = {"id", "kind", "procedure_verdict", "admin_verdict", "admin", "memo",
                "created_at", "updated_at", "__DORIS_DELETE_SIGN__"};
        DataType[] types = {DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
                DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
                DataTypes.STRING(), DataTypes.INT()};

        // ================= Stream processing =========================================
        String tableName = "bi.real_kyc";
        DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);

        SingleOutputStreamOperator<RowData> rowDataStream = dataStreamSource.flatMap(
                new FlatMapFunction<String, RowData>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void flatMap(String value, Collector<RowData> out) throws Exception {
                        try {
                            // Parse the Kafka envelope into KYC records.
                            List<RealKyc> recordList = handleData(value);
                            if (CollectionUtils.isEmpty(recordList)) {
                                return;
                            }
                            // Convert each RealKyc to RowData and emit it.
                            for (RealKyc kyc : recordList) {
                                GenericRowData row = new GenericRowData(fields.length);
                                row.setField(0, StringData.fromString(kyc.getId()));                 // id
                                row.setField(1, StringData.fromString(kyc.getKind()));               // kind
                                row.setField(2, StringData.fromString(kyc.getProcedure_verdict()));  // procedure_verdict
                                row.setField(3, StringData.fromString(kyc.getAdmin_verdict()));      // admin_verdict
                                row.setField(4, StringData.fromString(kyc.getAdmin()));              // admin
                                row.setField(5, StringData.fromString(kyc.getMemo()));               // memo
                                row.setField(6, StringData.fromString(kyc.getCreated_at()));         // created_at
                                row.setField(7, StringData.fromString(kyc.getUpdated_at()));         // updated_at
                                row.setField(8, 0);                                                  // __DORIS_DELETE_SIGN__ (upsert)
                                out.collect(row);
                            }
                        } catch (Exception e) {
                            // Fix: pass the throwable so the stack trace is retained
                            // (previously only e.getMessage() was logged).
                            logger.error("RealKycAchi 处理 Kafka 消息出错 | rawData:{}", value, e);
                        }
                    }
                });

        rowDataStream
                .filter(Objects::nonNull)
                .sinkTo(dorisSink)
                .name("Doris-RealKyc");
    }

    /**
     * Parses one Kafka envelope and returns its KYC records, or {@code null}
     * when the envelope's {@code flume_type} is not {@code "realKyc"}.
     *
     * @param record raw JSON envelope
     * @return parsed records, or null if the message is for another flume type
     * @throws ParseException declared for interface symmetry
     * @throws Exception      on JSON parse failures
     */
    public static List<RealKyc> handleData(String record) throws ParseException, Exception {
        logger.info("RealKycAchi record:{}", record);
        // ETL of the raw payload.
        JSONObject jsonObj = JSON.parseObject(record);
        String flumeType = jsonObj.getString("flume_type");
        String bodyStr = jsonObj.getString("data");
        if (!StringUtils.equals(flumeType, "realKyc")) {
            return null;
        }
        logger.info("组装数据 body:{}", bodyStr);
        return JSONObject.parseObject(bodyStr, new TypeReference<List<RealKyc>>() {});
    }

    @Override
    public void sendToSinkKafka(DataStreamSource<String> mStream) {
        // TODO Auto-generated method stub — no Kafka sink for this job.
    }
}
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/RealTransactionAchi.java
deleted
100644 → 0
View file @
708c9999
package com.flink.achieve.doris;

import java.io.Serializable;
import java.util.List;
import java.util.Objects;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.SourceCommonBase;
import com.flink.vo.RealTransaction;

/**
 * Streams transaction records from Kafka into the Doris table
 * {@code bi.real_transaction}.
 *
 * <p>Each Kafka message is a JSON envelope with a {@code flume_type} discriminator
 * and a {@code data} array of transactions; only messages whose {@code flume_type}
 * is {@code "realTransaction"} are ingested.
 *
 * @author wjs
 * @version created 2025-5-7 16:13:05
 */
public class RealTransactionAchi extends SourceCommonBase implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final Logger logger = LoggerFactory.getLogger(RealTransactionAchi.class);

    /**
     * Builds the Doris sink for {@code bi.real_transaction} and wires the Kafka
     * source through a JSON-to-RowData flatMap into it.
     *
     * @param dataStreamSource raw JSON envelopes from the transaction topic
     * @throws ParseException on envelope parse failures (declared; parsing errors
     *                        are actually caught and logged per record)
     * @throws Exception      propagated from pipeline construction
     */
    @Override
    public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
        // ================= Sink column configuration =========================================
        String[] fields = {"id", "sender", "receiver", "sender_id", "receiver_id", "symbol",
                "amount", "memo", "stage", "tx_type", "receiver_contact", "fee_mt", "fee_amount",
                "updated_at", "created_at", "__DORIS_DELETE_SIGN__"};
        DataType[] types = {DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
                DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.DOUBLE(),
                DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
                DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
                DataTypes.INT()};

        // ================= Stream processing =========================================
        String tableName = "bi.real_transaction";
        DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);

        SingleOutputStreamOperator<RowData> rowDataStream = dataStreamSource.flatMap(
                new FlatMapFunction<String, RowData>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void flatMap(String value, Collector<RowData> out) throws Exception {
                        try {
                            // Parse the Kafka envelope into transaction records.
                            List<RealTransaction> recordList = handleData(value);
                            if (CollectionUtils.isEmpty(recordList)) {
                                return;
                            }
                            // Convert each RealTransaction to RowData and emit it.
                            for (RealTransaction transaction : recordList) {
                                GenericRowData row = new GenericRowData(fields.length);
                                row.setField(0, StringData.fromString(transaction.getId()));                // id
                                row.setField(1, StringData.fromString(transaction.getSender()));            // sender
                                row.setField(2, StringData.fromString(transaction.getReceiver()));          // receiver
                                // NOTE(review): NPEs on null sender_id/receiver_id are caught
                                // below and the record is dropped — confirm that is intended.
                                row.setField(3, transaction.getSender_id().longValue());                    // sender_id: BIGINT
                                row.setField(4, transaction.getReceiver_id().longValue());                  // receiver_id: BIGINT
                                row.setField(5, StringData.fromString(transaction.getSymbol()));            // symbol
                                row.setField(6, transaction.getAmount());                                   // amount: DOUBLE
                                row.setField(7, StringData.fromString(transaction.getMemo()));              // memo
                                row.setField(8, StringData.fromString(transaction.getStage()));             // stage
                                row.setField(9, StringData.fromString(transaction.getTx_type()));           // tx_type
                                row.setField(10, StringData.fromString(transaction.getReceiver_contact())); // receiver_contact
                                row.setField(11, StringData.fromString(transaction.getFee_mt()));           // fee_mt
                                row.setField(12, StringData.fromString(transaction.getFee_amount()));       // fee_amount
                                row.setField(13, StringData.fromString(transaction.getUpdated_at()));       // updated_at
                                row.setField(14, StringData.fromString(transaction.getCreated_at()));       // created_at
                                row.setField(15, 0);                                                        // __DORIS_DELETE_SIGN__ (upsert)
                                out.collect(row);
                            }
                        } catch (Exception e) {
                            // Fix: pass the throwable so the stack trace is retained
                            // (previously only e.getMessage() was logged).
                            logger.error("RealTransactionAchi 处理 Kafka 消息出错 | rawData:{}", value, e);
                        }
                    }
                });

        rowDataStream
                .filter(Objects::nonNull)
                .sinkTo(dorisSink)
                .name("Doris-RealTransaction");
    }

    /**
     * Parses one Kafka envelope and returns its transaction records, or
     * {@code null} when the envelope's {@code flume_type} is not
     * {@code "realTransaction"}.
     *
     * @param record raw JSON envelope
     * @return parsed records, or null if the message is for another flume type
     * @throws ParseException declared for interface symmetry
     * @throws Exception      on JSON parse failures
     */
    public static List<RealTransaction> handleData(String record) throws ParseException, Exception {
        logger.info("RealTransactionAchi record:{}", record);
        // ETL of the raw payload.
        JSONObject jsonObj = JSON.parseObject(record);
        String flumeType = jsonObj.getString("flume_type");
        String bodyStr = jsonObj.getString("data");
        if (!StringUtils.equals(flumeType, "realTransaction")) {
            return null;
        }
        logger.info("组装数据 body:{}", bodyStr);
        return JSONObject.parseObject(bodyStr, new TypeReference<List<RealTransaction>>() {});
    }

    @Override
    public void sendToSinkKafka(DataStreamSource<String> mStream) {
        // TODO Auto-generated method stub — no Kafka sink for this job.
    }

    /** Ad-hoc manual check of the envelope parsing; not part of the pipeline. */
    public static void main(String[] args) {
        String record = "{\"flume_type\": \"realTransaction\", \"data\": [{\"id\": \"116075\", \"sender\": \"1002063153.user\", \"receiver\": \"15581239495.user\", \"sender_id\": 1002063153, \"receiver_id\": 15581239495, \"symbol\": \"TDW20\", \"amount\": 30000.0, \"memo\": null, \"stage\": \"SenderReconfirmed\", \"tx_type\": \"Forced\", \"receiver_contact\": \"15581239495.user\", \"fee_mt\": \"TDW20\", \"fee_amount\": 20.000000000000004, \"updated_at\": \"2025-04-03 06:39:58\", \"created_at\": \"2025-04-03 06:39:53\"}]}";
        JSONObject jsonObj = JSON.parseObject(record);
        String flumeType = jsonObj.getString("flume_type");
        String bodyStr = jsonObj.getString("data");
        List<RealTransaction> list =
                JSONObject.parseObject(bodyStr, new TypeReference<List<RealTransaction>>() {});
        // Fix: newline escape was written as "/n" (a literal slash-n), not "\n".
        System.out.println(">>>>>>>>>>>>>flumeType:" + flumeType + "\n");
        System.out.println(">>>>>>>>>>>>>bodyStr:" + bodyStr + "\n");
        System.out.println(">>>>>>>>>>>>>list:" + list + "\n");
    }
}
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/RealUsersAchi.java
deleted
100644 → 0
View file @
708c9999
package
com
.
flink
.
achieve
.
doris
;
import
java.io.Serializable
;
import
java.util.List
;
import
java.util.Objects
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.doris.flink.sink.DorisSink
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
import
org.apache.flink.api.common.io.ParseException
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
;
import
org.apache.flink.table.api.DataTypes
;
import
org.apache.flink.table.data.GenericRowData
;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.data.StringData
;
import
org.apache.flink.table.types.DataType
;
import
org.apache.flink.util.Collector
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONObject
;
import
com.alibaba.fastjson.TypeReference
;
import
com.flink.common.DorisConnector
;
import
com.flink.common.SourceCommonBase
;
import
com.flink.vo.RealUsers
;
/**
* @author wjs
* @version 创建时间:2025-5-8 11:39:07
* 类说明
*/
public
class
RealUsersAchi
extends
SourceCommonBase
implements
Serializable
{
/**
*
*/
private
static
final
long
serialVersionUID
=
1L
;
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
RealUsersAchi
.
class
);
@Override
public
void
parseSourceKafkaJson
(
DataStreamSource
<
String
>
dataStreamSource
)
throws
ParseException
,
Exception
{
//=================配置入库字段=========================================
String
[]
fields
=
{
"id"
,
"phone_number"
,
"email"
,
"leader"
,
"leader_id"
,
"kind"
,
"login_pwd_hash"
,
"answer_indexes"
,
"main_account"
,
"device_manage_state"
,
"state"
,
"token_version"
,
"updated_at"
,
"created_at"
,
"__DORIS_DELETE_SIGN__"
};
DataType
[]
types
=
{
DataTypes
.
STRING
(),
// id
DataTypes
.
STRING
(),
// phone_number
DataTypes
.
STRING
(),
// email
DataTypes
.
STRING
(),
// leader
DataTypes
.
BIGINT
(),
// leader_id
DataTypes
.
STRING
(),
// kind
DataTypes
.
STRING
(),
// login_pwd_hash
DataTypes
.
STRING
(),
// answer_indexes
DataTypes
.
STRING
(),
// main_account
DataTypes
.
STRING
(),
// device_manage_state
DataTypes
.
STRING
(),
// state
DataTypes
.
STRING
(),
// token_version
DataTypes
.
STRING
(),
// updated_at
DataTypes
.
STRING
(),
// created_at
DataTypes
.
INT
()
};
//=================流式处理=========================================
String
tableName
=
"bi.real_users"
;
DorisSink
<
RowData
>
dorisSink
=
DorisConnector
.
sinkDoris
(
fields
,
types
,
tableName
);
SingleOutputStreamOperator
<
RowData
>
rowDataStream
=
dataStreamSource
.
flatMap
(
new
FlatMapFunction
<
String
,
RowData
>()
{
private
static
final
long
serialVersionUID
=
1L
;
@Override
public
void
flatMap
(
String
value
,
Collector
<
RowData
>
out
)
throws
Exception
{
try
{
List
<
RealUsers
>
usersList
=
handleData
(
value
);
if
(
CollectionUtils
.
isEmpty
(
usersList
))
return
;
for
(
RealUsers
user
:
usersList
)
{
GenericRowData
row
=
new
GenericRowData
(
fields
.
length
);
row
.
setField
(
0
,
StringData
.
fromString
(
user
.
getId
()));
row
.
setField
(
1
,
StringData
.
fromString
(
user
.
getPhone_number
()));
row
.
setField
(
2
,
StringData
.
fromString
(
user
.
getEmail
()));
row
.
setField
(
3
,
StringData
.
fromString
(
user
.
getLeader
()));
row
.
setField
(
4
,
user
.
getLeader_id
().
longValue
());
row
.
setField
(
5
,
StringData
.
fromString
(
user
.
getKind
()));
row
.
setField
(
6
,
StringData
.
fromString
(
user
.
getLogin_pwd_hash
()));
row
.
setField
(
7
,
StringData
.
fromString
(
user
.
getAnswer_indexes
()));
row
.
setField
(
8
,
StringData
.
fromString
(
user
.
getMain_account
()));
row
.
setField
(
9
,
StringData
.
fromString
(
user
.
getDevice_manage_state
()));
row
.
setField
(
10
,
StringData
.
fromString
(
user
.
getState
()));
row
.
setField
(
11
,
StringData
.
fromString
(
user
.
getToken_version
()));
row
.
setField
(
12
,
StringData
.
fromString
(
user
.
getUpdated_at
()));
row
.
setField
(
13
,
StringData
.
fromString
(
user
.
getCreated_at
()));
row
.
setField
(
14
,
0
);
out
.
collect
(
row
);
}
}
catch
(
Exception
e
)
{
logger
.
error
(
"RealUsersAchi 处理 Kafka 消息出错 | rawData:{} | error:{}"
,
value
,
e
.
getMessage
());
}
}
});
rowDataStream
.
filter
(
Objects:
:
nonNull
)
.
sinkTo
(
dorisSink
)
.
name
(
"Doris-UserSink"
);
}
public
static
List
<
RealUsers
>
handleData
(
String
record
)
throws
ParseException
,
Exception
{
logger
.
info
(
"RealUsersAchi record:{}"
,
record
);
// 数据的 ETL 处理
JSONObject
jsonObj
=
JSON
.
parseObject
(
record
);
String
flumeType
=
jsonObj
.
getString
(
"flume_type"
);
String
bodyStr
=
jsonObj
.
getString
(
"data"
);
if
(!
StringUtils
.
equals
(
flumeType
,
"realUsers"
))
{
return
null
;
}
logger
.
info
(
"组装数据 body:{}"
,
bodyStr
);
return
JSONObject
.
parseObject
(
bodyStr
,
new
TypeReference
<
List
<
RealUsers
>>(){});
}
@Override
public
void
sendToSinkKafka
(
DataStreamSource
<
String
>
mStream
)
{
// TODO Auto-generated method stub
}
}
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/RegistrationCheckAchi.java
0 → 100644
View file @
b79af8cf
package
com
.
flink
.
achieve
.
doris
;
import
java.io.Serializable
;
import
java.time.Duration
;
import
java.util.List
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.api.common.eventtime.WatermarkStrategy
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
import
org.apache.flink.api.common.io.ParseException
;
import
org.apache.flink.api.java.tuple.Tuple3
;
import
org.apache.flink.streaming.api.datastream.DataStream
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
;
import
org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
;
import
org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
;
import
org.apache.flink.streaming.api.windowing.evictors.TimeEvictor
;
import
org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
;
import
org.apache.flink.util.Collector
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONObject
;
import
com.alibaba.fastjson.TypeReference
;
import
com.flink.common.MultipleSourceCommonBase
;
import
com.flink.enums.OpenSimiApiTypeEnum
;
import
com.flink.enums.TopicTypeEnum
;
import
com.flink.processor.function.DistinctUserAggregator
;
import
com.flink.processor.function.LatestUserProcessFunction
;
import
com.flink.processor.function.WindowResultFunction
;
import
com.flink.util.TimeConvertUtil
;
import
com.flink.vo.DeviceId
;
import
com.flink.vo.DeviceRegistrationResult
;
import
com.flink.vo.KafkaDataSource
;
import
com.flink.vo.SimiUserInfo
;
import
com.flink.vo.simi.UserRegistrationReqDto
;
/**
* @author wjs
* @version 创建时间:2025-7-1 16:14:00
* 类说明 注册检验
*/
//2. 主处理逻辑
public class RegistrationCheckAchi extends MultipleSourceCommonBase implements Serializable {

    /**
     *
     */
    private static final long serialVersionUID = 1L;

    private static final Logger logger = LoggerFactory.getLogger(RegistrationCheckAchi.class);

    /**
     * Registration-check topology: joins SIMI user-registration events with
     * APP/PC device-collect events on cid within a +/-12h interval, then counts
     * distinct registered users per device over a rolling 24h window
     * (global window + 5s processing-time trigger + 24h evictor).
     *
     * @param dataSourceList Kafka sources; must contain the OPEN_SIMI_API,
     *                       ODS_NEW_COLLECT_LOG and ODS_PC_COLLECT_LOG topics
     */
    @Override
    public void parseSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
        DataStreamSource<String> openSimiApiStreamSource = null;
        DataStreamSource<String> collectLogStreamSource = null;
        DataStreamSource<String> pcCollectLogStreamSource = null;
        // Pick the three expected topic streams out of the supplied sources.
        if (CollectionUtils.isNotEmpty(dataSourceList)) {
            for (KafkaDataSource kafkaDataSource : dataSourceList) {
                if (StringUtils.equals(kafkaDataSource.getTopic(), TopicTypeEnum.OPEN_SIMI_API.getTopic())) {
                    openSimiApiStreamSource = kafkaDataSource.getDataStreamSource();
                }
                if (StringUtils.equals(kafkaDataSource.getTopic(), TopicTypeEnum.ODS_NEW_COLLECT_LOG.getTopic())) {
                    collectLogStreamSource = kafkaDataSource.getDataStreamSource();
                }
                if (StringUtils.equals(kafkaDataSource.getTopic(), TopicTypeEnum.ODS_PC_COLLECT_LOG.getTopic())) {
                    pcCollectLogStreamSource = kafkaDataSource.getDataStreamSource();
                }
            }
        } else {
            return;
        }
        // 1. Registration stream watermark strategy (allows 5 minutes out-of-orderness).
        // SIMI domestic user registration stream: parse, filter to USER_REGISTRATION
        // events, keep the latest record per cid.
        DataStream<SimiUserInfo> registerDataStream = openSimiApiStreamSource.flatMap(new FlatMapFunction<String, SimiUserInfo>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void flatMap(String value, Collector<SimiUserInfo> out) throws Exception {
                try {
                    // Parse the raw Kafka record.
                    JSONObject jsonObj = JSON.parseObject(value);
                    String sendType = jsonObj.getString("send_type");
                    // String createTime = jsonObj.getString("create_time");
                    // Only USER_REGISTRATION events are of interest here.
                    if (!StringUtils.equals(OpenSimiApiTypeEnum.USER_REGISTRATION.getCode(), sendType)) {
                        return;
                    }
                    String reqBody = jsonObj.getString("req_body");
                    UserRegistrationReqDto userRegistrationReqDto = JSONObject.parseObject(reqBody, new TypeReference<UserRegistrationReqDto>() {});
                    SimiUserInfo simiUserInfo = new SimiUserInfo();
                    simiUserInfo.setCid(userRegistrationReqDto.getCid());
                    simiUserInfo.setCountry_code(userRegistrationReqDto.getCountryCode());
                    simiUserInfo.setPhone_number(userRegistrationReqDto.getPhoneNumber());
                    // Registration time string -> epoch millis (UTC, per TimeConvertUtil).
                    simiUserInfo.setUpdateTime(TimeConvertUtil.convertToTimestampSSS(userRegistrationReqDto.getTime()));
                    logger.info(">>>>>>>>>>registerDataStream cid:{},Country_code:{},Phone_number:{},UpdateTime:{}",
                            userRegistrationReqDto.getCid(),
                            userRegistrationReqDto.getCountryCode(),
                            userRegistrationReqDto.getPhoneNumber(),
                            TimeConvertUtil.convertToTimestampSSS(userRegistrationReqDto.getTime()));
                    out.collect(simiUserInfo);
                } catch (Exception e) {
                    logger.error("Error parsing simi_user_list 处理 Kafka 消息出错 | data:{} | error:{}", value, e.getMessage());
                }
            }
        })
        .filter(u -> u != null && StringUtils.isNoneEmpty(u.getCid()))
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<SimiUserInfo>forBoundedOutOfOrderness(Duration.ofMinutes(5))
                        .withTimestampAssigner((user, ts) -> user.getUpdateTime()))
        .keyBy(user -> user.getCid())
        .process(new LatestUserProcessFunction());
        // 2. Device stream watermark strategy; merges the APP and PC device streams.
        DataStream<DeviceId> mergedDeviceStream = collectLogStreamSource.flatMap(new FlatMapFunction<String, DeviceId>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void flatMap(String value, Collector<DeviceId> out) throws Exception {
                try {
                    // Parse the APP collect-log record into a DeviceId.
                    DeviceId device = DeviceIdLatestAchi.handleData(value);
                    if (device != null) {
                        logger.info(">>>>>>>>>>mergedDeviceStream cid:{},CollectTime:{},CreateTime:{}",
                                device.getCid(), device.getCollectTime(), device.getCreateTime());
                        out.collect(device);
                    }
                } catch (Exception e) {
                    logger.error("Error parsing ods_new_collect_log 处理 Kafka 消息出错 | data:{} | error:{}", value, e.getMessage());
                }
            }
        }).union(
                // PC device-info stream, parsed with the PC-specific handler.
                pcCollectLogStreamSource.flatMap(new FlatMapFunction<String, DeviceId>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void flatMap(String value, Collector<DeviceId> out) throws Exception {
                        try {
                            // Parse the PC collect-log record into a DeviceId.
                            DeviceId device = DeviceIdLatestAchi.handlePcData(value);
                            if (device != null) {
                                logger.info(">>>>>>>>>>mergedDeviceStreamPc cid:{},CollectTime:{},CreateTime:{}",
                                        device.getCid(), device.getCollectTime(), device.getCreateTime());
                                out.collect(device);
                            }
                        } catch (Exception e) {
                            logger.error("Error parsing ods_pc_collect_log 处理 Kafka 消息出错 | data:{} | error:{}", value, e.getMessage());
                        }
                    }
                })
        ).filter(device -> device != null && StringUtils.isNoneEmpty(device.getCid()))
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<DeviceId>forBoundedOutOfOrderness(Duration.ofMinutes(5))
                        .withTimestampAssigner((device, ts) -> device.getCollectTime()));
        // Interval-join registrations with devices on cid: +/-12h around the
        // registration event time. Output: (deviceId, cid, registration time).
        DataStream<Tuple3<String, String, Long>> deviceUserStream = registerDataStream.keyBy(SimiUserInfo::getCid)
                .intervalJoin(mergedDeviceStream.keyBy(DeviceId::getCid))
                .between(Duration.ofHours(-12), Duration.ofHours(12)) // time-interval association
                .process(new ProcessJoinFunction<SimiUserInfo, DeviceId, Tuple3<String, String, Long>>() {

                    /**
                     *
                     */
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(SimiUserInfo user, DeviceId device, Context ctx, Collector<Tuple3<String, String, Long>> out) {
                        // Emit: (device id, user id, registration time).
                        logger.info(">>>>>>>>>>deviceUserStream deviceId:{},cid:{},UpdateTime:{}",
                                device.getDeviceId(), user.getCid(), user.getUpdateTime());
                        // Ensure cid is present and equal (keyBy already guarantees this;
                        // kept as a redundant safety check).
                        if (user.getCid().equals(device.getCid())) {
                            out.collect(Tuple3.of(device.getDeviceId(), user.getCid(), user.getUpdateTime()));
                        }
                    }
                });
        // Per-device distinct-user count over a rolling 24h horizon, re-emitted
        // every 5 seconds of processing time.
        SingleOutputStreamOperator<DeviceRegistrationResult> resultStream = deviceUserStream
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ofMinutes(5))
                                .withTimestampAssigner((event, ts) -> event.f2) // use the event's own timestamp
                )
                .keyBy(event -> event.f0) // key by device id
                .window(GlobalWindows.create()) // global window retains all data
                .trigger(ContinuousProcessingTimeTrigger.of(Duration.ofSeconds(5))) // fire every 5 seconds
                .evictor(TimeEvictor.of(Duration.ofHours(24))) // evict data older than 24 hours
                .aggregate(new DistinctUserAggregator(), new WindowResultFunction());
        resultStream.print("同设备24小时注册人数统计(实时更新)");
        // Alternative: tumbling 24h event-time window (kept for reference).
        // deviceUserStream.assignTimestampsAndWatermarks(
        // WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ofMinutes(5))
        // .withTimestampAssigner((event, ts) -> event.f2) // 使用注册时间戳
        // ).keyBy(event -> event.f0) // 按设备ID分组
        // .window(TumblingEventTimeWindows.of(Duration.ofDays(1))) // 24小时滚动窗口
        // .aggregate(new DistinctUserAggregator(), new WindowResultFunction())
        // .disableChaining()
        // .print("同设备24小时注册人数统计");
    }
}
\ No newline at end of file
eagleEye-flink_kafka/src/main/java/com/flink/enums/JobTypeEnum.java
View file @
b79af8cf
...
...
@@ -18,10 +18,8 @@ import java.util.stream.Collectors;
*/
public
enum
JobTypeEnum
{
EVENT_IP_CONVERT
(
"JOB_01"
,
"事件IP转换作业"
),
REAL_TRANSACTION
(
"JOB_02"
,
"实际交易作业"
),
REAL_KYC
(
"JOB_03"
,
"真实KYC作业"
),
REAL_USERS
(
"JOB_04"
,
"真实用户作业"
),
REAL_BALANCE
(
"JOB_05"
,
"真实余额作业"
),
COMMON_CONSUME_BASE
(
"JOB_02"
,
"公共基础消费采集作业"
),
COLLECT_LOG
(
"JOB_06"
,
"日志采集作业"
),
EVENT_IP_CONVERT_CID
(
"JOB_07"
,
"最新事件IP作业"
),
DEVICE_ID_CID
(
"JOB_08"
,
"最新设备ID作业"
),
...
...
@@ -29,7 +27,7 @@ public enum JobTypeEnum {
SIMI_GROUPS
(
"JOB_10"
,
"SIMI群组作业"
),
VECTOR_ANGLE_CALCULATION
(
"JOB_11"
,
"矢量角度计算作业"
),
EVENT_LOG
(
"JOB_12"
,
"事件采集作业"
),
REGISTRATION_CHECK
(
"JOB_13"
,
"注册检验采集作业"
),
;
private
static
final
String
JOB_PREFIX
=
"作业类型-"
;
...
...
eagleEye-flink_kafka/src/main/java/com/flink/enums/TopicTypeEnum.java
View file @
b79af8cf
...
...
@@ -29,6 +29,7 @@ public enum TopicTypeEnum {
OPEN_SIMI_API
(
"ods_open_simi_api"
,
"odsOpenSimiApi"
),
ODS_PC_EVENT_LOG
(
"ods_pc_event_log"
,
"odsPcEventLog"
),
ODS_PC_COLLECT_LOG
(
"ods_pc_collect_log"
,
"odsPcCollectLog"
),
ODS_COMMUNITY_HISTORY
(
"ods_community_history"
,
"odsCommunityHistory"
),
;
private
String
topic
;
...
...
eagleEye-flink_kafka/src/main/java/com/flink/factory/JobProcessorFactory.java
View file @
b79af8cf
...
...
@@ -3,14 +3,12 @@ package com.flink.factory;
import
com.flink.enums.JobTypeEnum
;
import
com.flink.processor.JobProcessor
;
import
com.flink.processor.impl.CollectLogProcessor
;
import
com.flink.processor.impl.CommonConsumeBaseProcessor
;
import
com.flink.processor.impl.DeviceIdLatestProcessor
;
import
com.flink.processor.impl.EventIpConvertProcessor
;
import
com.flink.processor.impl.EventIpLatestProcessor
;
import
com.flink.processor.impl.EventLogProcessor
;
import
com.flink.processor.impl.RealBalanceProcessor
;
import
com.flink.processor.impl.RealKycProcessor
;
import
com.flink.processor.impl.RealTransactionProcessor
;
import
com.flink.processor.impl.RealUsersProcessor
;
import
com.flink.processor.impl.RegistrationCheckProcessor
;
import
com.flink.processor.impl.SimiFriendsProcessor
;
import
com.flink.processor.impl.SimiGroupstProcessor
;
import
com.flink.processor.impl.VectorAngleCalculationProcessor
;
...
...
@@ -26,14 +24,6 @@ public class JobProcessorFactory {
switch
(
jobType
)
{
case
EVENT_IP_CONVERT:
return
new
EventIpConvertProcessor
();
case
REAL_TRANSACTION:
return
new
RealTransactionProcessor
();
case
REAL_KYC:
return
new
RealKycProcessor
();
case
REAL_USERS:
return
new
RealUsersProcessor
();
case
REAL_BALANCE:
return
new
RealBalanceProcessor
();
case
COLLECT_LOG:
return
new
CollectLogProcessor
();
case
EVENT_IP_CONVERT_CID:
...
...
@@ -48,9 +38,12 @@ public class JobProcessorFactory {
return
new
VectorAngleCalculationProcessor
();
case
EVENT_LOG:
return
new
EventLogProcessor
();
case
REGISTRATION_CHECK:
return
new
RegistrationCheckProcessor
();
case
COMMON_CONSUME_BASE:
return
new
CommonConsumeBaseProcessor
();
default
:
throw
new
IllegalArgumentException
(
"未知的Job类型: "
+
jobType
);
}
}
}
eagleEye-flink_kafka/src/main/java/com/flink/processor/function/CollectLogJoinProcessor.java
View file @
b79af8cf
...
...
@@ -71,6 +71,10 @@ public class CollectLogJoinProcessor extends CoProcessFunction<DeviceId, SimiUse
result
.
setAppKey
(
device
.
getAppKey
());
result
.
setAppType
(
device
.
getAppType
());
result
.
setCreateTime
(
device
.
getCreateTime
());
result
.
setBrand
(
device
.
getBrand
());
result
.
setModel
(
device
.
getModel
());
result
.
setOsRelease
(
device
.
getOsRelease
());
result
.
setAppVersion
(
device
.
getAppVersion
());
return
result
;
}
...
...
eagleEye-flink_kafka/src/main/java/com/flink/processor/function/DistinctUserAggregator.java
0 → 100644
View file @
b79af8cf
package
com
.
flink
.
processor
.
function
;
import
java.util.HashSet
;
import
java.util.Set
;
import
org.apache.flink.api.common.functions.AggregateFunction
;
import
org.apache.flink.api.java.tuple.Tuple3
;
/**
* @author wjs
* @version 创建时间:2025-7-8 11:08:15
* 类说明
*/
/**
 * Counts distinct user ids per key. The input tuples are
 * (deviceId, userCid, registerTime); the accumulator is the set of user cids
 * seen so far and the result is that set's size.
 */
public class DistinctUserAggregator implements AggregateFunction<Tuple3<String, String, Long>, Set<String>, Integer> {

    private static final long serialVersionUID = 1L;

    /** Each key starts with an empty set of user ids. */
    @Override
    public Set<String> createAccumulator() {
        return new HashSet<>();
    }

    /** Records the user id (field f1); duplicates collapse via set semantics. */
    @Override
    public Set<String> add(Tuple3<String, String, Long> record, Set<String> seenUsers) {
        seenUsers.add(record.f1);
        return seenUsers;
    }

    /** The aggregate value is the number of distinct user ids observed. */
    @Override
    public Integer getResult(Set<String> seenUsers) {
        return seenUsers.size();
    }

    /** Combines two partial accumulators by set union. */
    @Override
    public Set<String> merge(Set<String> left, Set<String> right) {
        left.addAll(right);
        return left;
    }
}
eagleEye-flink_kafka/src/main/java/com/flink/processor/function/GenDeviceIdProcessor.java
View file @
b79af8cf
...
...
@@ -27,6 +27,10 @@ public class GenDeviceIdProcessor {
String
deviceId
=
null
;
String
idfv
=
null
;
String
deviceIdV1
=
null
;
String
brand
=
null
;
String
model
=
null
;
String
osRelease
=
null
;
String
appVersion
=
null
;
if
(
StringUtils
.
equals
(
appType
,
AppTypeEnum
.
ANDROID
.
getCode
()))
{
AndroidA1
a1
=
JSONObject
.
parseObject
(
device_info
,
new
TypeReference
<
AndroidA1
>(){});
AndroidEnvInfo
g1
=
JSONObject
.
parseObject
(
env_info
,
new
TypeReference
<
AndroidEnvInfo
>(){});
...
...
@@ -36,6 +40,10 @@ public class GenDeviceIdProcessor {
androidBodyObj
.
setG1
(
g1
);
androidBodyObj
.
setI1
(
i1
);
deviceId
=
GenDeviceIdV1
.
genAndroidDeviceIdHashV1
(
androidBodyObj
);
brand
=
a1
.
getB2
();
model
=
a1
.
getB1
();
osRelease
=
a1
.
getB4
();
appVersion
=
a1
.
getD6
().
getL2
();
}
else
if
(
StringUtils
.
equals
(
appType
,
AppTypeEnum
.
IOS
.
getCode
()))
{
IosDeviceInfo
a1
=
JSONObject
.
parseObject
(
device_info
,
new
TypeReference
<
IosDeviceInfo
>(){});
IosEnvInfo
g1
=
JSONObject
.
parseObject
(
env_info
,
new
TypeReference
<
IosEnvInfo
>(){});
...
...
@@ -43,6 +51,10 @@ public class GenDeviceIdProcessor {
iosBodyObj
.
setA1
(
a1
);
iosBodyObj
.
setG1
(
g1
);
deviceId
=
GenDeviceIdV1
.
genIosDeviceIdHash
(
iosBodyObj
);
brand
=
a1
.
getB2
();
model
=
a1
.
getB1
();
osRelease
=
a1
.
getB4
();
appVersion
=
a1
.
getL2
();
}
if
(
StringUtils
.
equals
(
appType
,
AppTypeEnum
.
ANDROID
.
getCode
()))
{
...
...
@@ -58,6 +70,10 @@ public class GenDeviceIdProcessor {
deviceIdInfo
.
setDeviceIdV1
(
deviceIdV1
);
deviceIdInfo
.
setIdfv
(
idfv
);
deviceIdInfo
.
setAppKey
(
appKey
);
deviceIdInfo
.
setBrand
(
brand
);
deviceIdInfo
.
setModel
(
model
);
deviceIdInfo
.
setOsRelease
(
osRelease
);
deviceIdInfo
.
setAppVersion
(
appVersion
);
return
deviceIdInfo
;
}
...
...
eagleEye-flink_kafka/src/main/java/com/flink/processor/function/JoinDeviceWithRegistrationProcessFunction.java
0 → 100644
View file @
b79af8cf
package
com
.
flink
.
processor
.
function
;
import
java.io.IOException
;
import
org.apache.flink.api.common.state.ValueState
;
import
org.apache.flink.api.common.state.ValueStateDescriptor
;
import
org.apache.flink.api.java.tuple.Tuple3
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
;
import
org.apache.flink.util.Collector
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
com.flink.vo.DeviceId
;
import
com.flink.vo.SimiUserInfo
;
/**
* @author wjs
* @version 创建时间:2025-7-8 11:03:06
* 类说明
*/
/**
 * Keyed two-stream join: buffers the latest device per key (cid) in state and,
 * for every registration event, emits (deviceId, userCid, registerTime) when a
 * device has already been seen for that cid.
 */
public class JoinDeviceWithRegistrationProcessFunction extends KeyedCoProcessFunction<String, SimiUserInfo, DeviceId, Tuple3<String, String, Long>> {

    private static final long serialVersionUID = 1L;

    private static final Logger logger = LoggerFactory.getLogger(JoinDeviceWithRegistrationProcessFunction.class);

    /** Latest device observed for the current key; null until one arrives. */
    private transient ValueState<DeviceId> deviceState;

    @Override
    public void open(Configuration parameters) {
        deviceState = getRuntimeContext()
                .getState(new ValueStateDescriptor<>("deviceState", DeviceId.class));
    }

    /** Registration event: join it against the buffered device, if any. */
    @Override
    public void processElement1(SimiUserInfo user, Context ctx, Collector<Tuple3<String, String, Long>> out) throws Exception {
        DeviceId buffered = deviceState.value();
        if (buffered == null) {
            return; // no device seen yet for this cid — nothing to emit
        }
        // Emit the triple: (device id, user id, registration time).
        logger.info("输出三元组 设备ID:{}, 用户ID:{}, 注册时间:{}",
                buffered.getDeviceId(), user.getCid(), user.getUpdateTime());
        out.collect(Tuple3.of(buffered.getDeviceId(), user.getCid(), user.getUpdateTime()));
    }

    /** Device event: remember the most recent device for this cid. */
    @Override
    public void processElement2(DeviceId device, Context ctx, Collector<Tuple3<String, String, Long>> out) throws IOException {
        deviceState.update(device);
    }
}
eagleEye-flink_kafka/src/main/java/com/flink/processor/function/LatestUserProcessFunction.java
View file @
b79af8cf
...
...
@@ -23,7 +23,7 @@ public class LatestUserProcessFunction extends KeyedProcessFunction<String, Simi
*/
private
static
final
long
serialVersionUID
=
1L
;
private
ValueState
<
SimiUserInfo
>
latestUserState
;
private
transient
ValueState
<
SimiUserInfo
>
latestUserState
;
@Override
public
void
open
(
Configuration
parameters
)
{
...
...
eagleEye-flink_kafka/src/main/java/com/flink/processor/function/WindowResultFunction.java
0 → 100644
View file @
b79af8cf
package
com
.
flink
.
processor
.
function
;
import
java.time.Duration
;
import
org.apache.flink.api.common.state.MapState
;
import
org.apache.flink.api.common.state.MapStateDescriptor
;
import
org.apache.flink.api.common.state.StateTtlConfig
;
import
org.apache.flink.api.common.typeinfo.TypeHint
;
import
org.apache.flink.api.common.typeinfo.TypeInformation
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
;
import
org.apache.flink.streaming.api.windowing.windows.GlobalWindow
;
import
org.apache.flink.util.Collector
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
com.flink.vo.DeviceRegistrationResult
;
/**
* @author wjs
* @version 创建时间:2025-7-8 11:10:12
* 类说明
*/
public class WindowResultFunction extends ProcessWindowFunction<Integer, DeviceRegistrationResult, String, GlobalWindow> {

    /**
     *
     */
    private static final long serialVersionUID = 1L;

    private static final Logger logger = LoggerFactory.getLogger(WindowResultFunction.class);

    // Per-key bookkeeping of devices already processed, with a 24h TTL.
    // NOTE(review): this state is written below but never read to produce
    // output — confirm whether it is still needed.
    private transient MapState<String, Boolean> state;

    @Override
    public void open(Configuration parameters) {
        // Configure state with a 24-hour TTL.
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofHours(24))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .cleanupInRocksdbCompactFilter(1000) // RocksDB-specific cleanup optimization
                .build();
        MapStateDescriptor<String, Boolean> descriptor = new MapStateDescriptor<>(
                "distinct-users",
                TypeInformation.of(new TypeHint<String>() {}),
                TypeInformation.of(new TypeHint<Boolean>() {})
        );
        descriptor.enableTimeToLive(ttlConfig); // enable TTL
        state = getRuntimeContext().getMapState(descriptor);
    }

    /**
     * Emits the pre-aggregated distinct-user count for this device key.
     *
     * @param deviceId the window key (device id)
     * @param context  window context
     * @param counts   the aggregate result; assumes exactly one element is
     *                 present (supplied by DistinctUserAggregator) — iterator()
     *                 .next() would throw otherwise
     * @param out      collector for the result records
     */
    @Override
    public void process(String deviceId, Context context, Iterable<Integer> counts, Collector<DeviceRegistrationResult> out) throws Exception {
        Integer userCount = counts.iterator().next();
        // Track devices already processed in global state.
        if (!state.contains(deviceId)) {
            state.put(deviceId, true);
        }
        logger.info("窗口窗口结果处理》》》 deviceId:{},windowEnd:{},count:{}", deviceId, System.currentTimeMillis(), userCount);
        // Emit (device id, emission time, user count).
        // NOTE(review): uses the current wall-clock time rather than
        // context.window().getEnd() — a GlobalWindow has no meaningful end, but
        // confirm downstream consumers expect emission time here.
        out.collect(new DeviceRegistrationResult(
                deviceId,
                // context.window().getEnd(),
                System.currentTimeMillis(),
                userCount));
    }
}
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/
RealKyc
Processor.java
→
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/
CommonConsumeBase
Processor.java
View file @
b79af8cf
package
com
.
flink
.
processor
.
impl
;
import
com.flink.achieve.doris.RealKycAchi
;
import
java.util.Arrays
;
import
java.util.List
;
import
java.util.stream.Collectors
;
import
com.flink.achieve.base.CommonConsumeBaseAchi
;
import
com.flink.enums.JobTypeEnum
;
import
com.flink.enums.TopicTypeEnum
;
import
com.flink.processor.JobProcessor
;
import
com.flink.vo.KafkaTopic
;
/**
* @author wjs
* @version 创建时间:2025-
5-8 11:36:02
* @version 创建时间:2025-
7-25 10:34:20
* 类说明
*/
public
class
RealKyc
Processor
implements
JobProcessor
{
public
class
CommonConsumeBase
Processor
implements
JobProcessor
{
@Override
public
void
process
()
throws
Exception
{
new
RealKyc
Achi
().
handleDataStreamSource
(
JobTypeEnum
.
REAL_KYC
,
TopicTypeEnum
.
ODS_USER_INVITATION
new
CommonConsumeBase
Achi
().
handleDataStreamSource
(
createTopicList
()
,
JobTypeEnum
.
COMMON_CONSUME_BASE
);
}
private
static
List
<
KafkaTopic
>
createTopicList
()
{
return
Arrays
.
stream
(
new
TopicTypeEnum
[]{
TopicTypeEnum
.
ODS_USER_INVITATION
,
TopicTypeEnum
.
ODS_EVENT_LOG
,
TopicTypeEnum
.
ODS_COMMUNITY_HISTORY
,
TopicTypeEnum
.
ODS_NEW_COLLECT_LOG
}).
map
(
TopicTypeEnum:
:
createKafkaTopic
)
.
collect
(
Collectors
.
toList
());
}
}
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/EventLogProcessor.java
View file @
b79af8cf
package
com
.
flink
.
processor
.
impl
;
import
java.util.Arrays
;
import
java.util.List
;
import
java.util.stream.Collectors
;
import
com.flink.achieve.doris.EventLogAchi
;
import
com.flink.enums.JobTypeEnum
;
import
com.flink.enums.TopicTypeEnum
;
import
com.flink.processor.JobProcessor
;
import
com.flink.vo.KafkaTopic
;
/**
* @author wjs
...
...
@@ -14,9 +19,17 @@ public class EventLogProcessor implements JobProcessor{
@Override
public
void
process
()
throws
Exception
{
new
EventLogAchi
().
handleDataStreamSource
(
JobTypeEnum
.
EVENT_LOG
,
TopicTypeEnum
.
ODS_
EVENT_LOG
createTopicList
()
,
JobTypeEnum
.
EVENT_LOG
);
}
private
static
List
<
KafkaTopic
>
createTopicList
()
{
return
Arrays
.
stream
(
new
TopicTypeEnum
[]{
TopicTypeEnum
.
ODS_EVENT_LOG
,
TopicTypeEnum
.
ODS_COMMUNITY_HISTORY
}).
map
(
TopicTypeEnum:
:
createKafkaTopic
)
.
collect
(
Collectors
.
toList
());
}
}
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/RealTransactionProcessor.java
deleted
100644 → 0
View file @
708c9999
package
com
.
flink
.
processor
.
impl
;
import
com.flink.achieve.doris.RealTransactionAchi
;
import
com.flink.enums.JobTypeEnum
;
import
com.flink.enums.TopicTypeEnum
;
import
com.flink.processor.JobProcessor
;
/**
* @author wjs
* @version 创建时间:2025-5-7 16:12:34
* 类说明
*/
public class RealTransactionProcessor implements JobProcessor {

    /**
     * Launches the real-transaction job: delegates stream construction and
     * sinking to {@link RealTransactionAchi}.
     *
     * @throws Exception if the Flink job cannot be built or submitted
     */
    @Override
    public void process() throws Exception {
        // NOTE(review): the source topic is ODS_USER_INVITATION for a
        // transaction job — looks copy-pasted; confirm this is intentional.
        new RealTransactionAchi().handleDataStreamSource(JobTypeEnum.REAL_TRANSACTION, TopicTypeEnum.ODS_USER_INVITATION);
    }
}
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/RealUsersProcessor.java
deleted
100644 → 0
View file @
708c9999
package
com
.
flink
.
processor
.
impl
;
import
com.flink.achieve.doris.RealUsersAchi
;
import
com.flink.enums.JobTypeEnum
;
import
com.flink.enums.TopicTypeEnum
;
import
com.flink.processor.JobProcessor
;
/**
* @author wjs
* @version 创建时间:2025-5-8 11:36:11
* 类说明
*/
public class RealUsersProcessor implements JobProcessor {

    /**
     * Launches the real-users job: delegates stream construction and sinking
     * to {@link RealUsersAchi}.
     *
     * @throws Exception if the Flink job cannot be built or submitted
     */
    @Override
    public void process() throws Exception {
        // NOTE(review): the source topic is ODS_USER_INVITATION for a users
        // job — looks copy-pasted; confirm this is intentional.
        new RealUsersAchi().handleDataStreamSource(JobTypeEnum.REAL_USERS, TopicTypeEnum.ODS_USER_INVITATION);
    }
}
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/Re
alBalance
Processor.java
→
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/Re
gistrationCheck
Processor.java
View file @
b79af8cf
package
com
.
flink
.
processor
.
impl
;
import
com.flink.achieve.doris.RealBalanceAchi
;
import
java.util.Arrays
;
import
java.util.List
;
import
java.util.stream.Collectors
;
import
com.flink.achieve.doris.RegistrationCheckAchi
;
import
com.flink.enums.JobTypeEnum
;
import
com.flink.enums.TopicTypeEnum
;
import
com.flink.processor.JobProcessor
;
import
com.flink.vo.KafkaTopic
;
/**
* @author wjs
* @version 创建时间:2025-
5-8 11:36:2
0
* @version 创建时间:2025-
7-7 15:50:0
0
* 类说明
*/
public
class
Re
alBalance
Processor
implements
JobProcessor
{
public
class
Re
gistrationCheck
Processor
implements
JobProcessor
{
@Override
public
void
process
()
throws
Exception
{
new
Re
alBalance
Achi
().
handleDataStreamSource
(
JobTypeEnum
.
REAL_BALANCE
,
TopicTypeEnum
.
ODS_USER_INVITATION
new
Re
gistrationCheck
Achi
().
handleDataStreamSource
(
createTopicList
()
,
JobTypeEnum
.
REGISTRATION_CHECK
);
}
private
static
List
<
KafkaTopic
>
createTopicList
()
{
return
Arrays
.
stream
(
new
TopicTypeEnum
[]{
TopicTypeEnum
.
OPEN_SIMI_API
,
TopicTypeEnum
.
ODS_NEW_COLLECT_LOG
,
TopicTypeEnum
.
ODS_PC_COLLECT_LOG
}).
map
(
TopicTypeEnum:
:
createKafkaTopic
)
.
collect
(
Collectors
.
toList
());
}
}
eagleEye-flink_kafka/src/main/java/com/flink/util/TimeConvertUtil.java
View file @
b79af8cf
...
...
@@ -9,7 +9,6 @@ import java.time.format.DateTimeFormatter;
import
java.util.TimeZone
;
import
org.apache.flink.table.data.TimestampData
;
import
org.apache.kerby.util.SysUtil
;
/**
* @author wjs
...
...
@@ -22,6 +21,12 @@ public class TimeConvertUtil {
sdf
.
setTimeZone
(
TimeZone
.
getTimeZone
(
"UTC"
));
return
sdf
.
parse
(
timeStr
).
getTime
();
}
public
static
long
convertToTimestampSSS
(
String
timeStr
)
{
DateTimeFormatter
formatter
=
DateTimeFormatter
.
ofPattern
(
"yyyy-MM-dd HH:mm:ss.SSS"
);
LocalDateTime
dateTime
=
LocalDateTime
.
parse
(
timeStr
,
formatter
);
return
dateTime
.
atZone
(
ZoneId
.
of
(
"UTC"
)).
toInstant
().
toEpochMilli
();
}
public
static
int
convertToSqlDate
(
String
datetime
)
{
return
(
int
)
LocalDate
.
parse
(
datetime
.
split
(
" "
)[
0
]).
toEpochDay
();
// 转换为天数偏移量
...
...
@@ -67,7 +72,7 @@ public class TimeConvertUtil {
}
public
static
void
main
(
String
[]
args
)
{
String
aa
=
parseToStringSSS
(
175
0739369000
L
);
String
aa
=
parseToStringSSS
(
175
2039403729
L
);
System
.
out
.
println
(
aa
);
}
}
eagleEye-flink_kafka/src/main/java/com/flink/vo/CommunityHistory.java
0 → 100644
View file @
b79af8cf
package
com
.
flink
.
vo
;
import
java.io.Serializable
;
import
lombok.Data
;
import
lombok.ToString
;
/**
* @author wjs
* @version 创建时间:2025-7-10 16:24:25
* 类说明
*/
@Data
@ToString
public class CommunityHistory implements Serializable {

    /**
     *
     */
    private static final long serialVersionUID = 1L;

    // Article identifier.
    private String articleId;
    // Article type code — semantics defined by the producer; confirm values upstream.
    private String articleType;
    // Article state code — semantics defined by the producer; confirm values upstream.
    private String articleState;
    // Creation time; presumably epoch milliseconds — TODO confirm with the producer.
    private Long createTime;
    // cid of the user who authored the article.
    private String articleUserCid;
    // Article text body.
    private String articleText;
    // Image payload; stored as a single string — presumably serialized JSON/CSV, confirm format.
    private String images;
    // Article label/tag.
    private String label;
    // Associated risk information.
    private RiskInfo risk;
    // Hot-search title, if the article is linked to one.
    private String hotSearchTitle;
}
eagleEye-flink_kafka/src/main/java/com/flink/vo/DeviceId.java
View file @
b79af8cf
...
...
@@ -26,6 +26,11 @@ public class DeviceId implements Serializable{
private
String
appType
;
private
String
createTime
;
private
String
brand
;
private
String
model
;
private
String
osRelease
;
private
String
appVersion
;
private
Long
collectTime
;
}
eagleEye-flink_kafka/src/main/java/com/flink/vo/DeviceIdInfo.java
View file @
b79af8cf
...
...
@@ -21,4 +21,8 @@ public class DeviceIdInfo implements Serializable{
private
String
idfv
;
private
String
deviceIdV1
;
private
String
appKey
;
private
String
brand
;
private
String
model
;
private
String
osRelease
;
private
String
appVersion
;
}
eagleEye-flink_kafka/src/main/java/com/flink/vo/DeviceRegistrationResult.java
0 → 100644
View file @
b79af8cf
package
com
.
flink
.
vo
;
import
java.io.Serializable
;
import
lombok.Data
;
import
lombok.ToString
;
/**
* @author wjs
* @version 创建时间:2025-7-8 11:11:48
* 类说明
*/
@Data
@ToString
public
class
DeviceRegistrationResult
implements
Serializable
{
/**
*
*/
private
static
final
long
serialVersionUID
=
1L
;
private
String
deviceId
;
private
Long
windowEnd
;
private
Integer
userCount
;
// 构造函数
public
DeviceRegistrationResult
()
{}
public
DeviceRegistrationResult
(
String
deviceId
,
Long
windowEnd
,
Integer
userCount
)
{
this
.
deviceId
=
deviceId
;
this
.
windowEnd
=
windowEnd
;
this
.
userCount
=
userCount
;
}
}
eagleEye-flink_kafka/src/main/java/com/flink/vo/PcEventInfo.java
View file @
b79af8cf
...
...
@@ -25,5 +25,6 @@ public class PcEventInfo implements Serializable {
// NOTE(review): partial view — the PcEventInfo class header is outside this chunk;
// only this run of fields and the closing brace are visible here.
private String phone;                  // user phone number
private String nick;                   // user nickname
private String s1;                     // opaque event attribute — semantics unclear; TODO confirm
private String ap2;                    // opaque event attribute — semantics unclear; TODO confirm
private List<PcProperties> properties; // event property list (project type PcProperties)
}
eagleEye-flink_kafka/src/main/java/com/flink/vo/RealLead.java
0 → 100644
View file @
b79af8cf
package
com
.
flink
.
vo
;
import
java.io.Serializable
;
import
lombok.Data
;
/**
 * Leader-relationship change event record ("real lead") consumed from an
 * upstream source.
 *
 * <p>NOTE(review): field names are intentionally snake_case — they presumably
 * mirror the upstream DB/JSON column names, and renaming them would change
 * the Lombok-generated accessor names and break mapping; keep as-is.
 *
 * @author wjs
 * @version created 2025-7-28 15:37:43
 */
@Data
public class RealLead implements Serializable {

    private static final long serialVersionUID = 1L;

    private Integer event_id;      // event type id — TODO confirm value set
    private String account_id;     // account identifier
    private String leader_id;      // current leader identifier
    private String prev_leader_id; // previous leader identifier
    private Long uid;              // numeric user id
    private Long lid;              // numeric leader id
    private Long prev_lid;         // numeric previous leader id
    private String updated_at;     // update timestamp, carried as a string
    private String created_at;     // creation timestamp, carried as a string
}
eagleEye-flink_kafka/src/main/java/com/flink/vo/Result.java
View file @
b79af8cf
...
...
@@ -30,4 +30,8 @@ public class Result implements Serializable{
// NOTE(review): partial view — the Result class header is outside this chunk;
// only this run of fields and the closing brace are visible here.
private String appKey;      // application key
private String appType;     // app platform/type identifier — TODO confirm values
private String createTime;  // creation timestamp, carried as a string
private String brand;       // device brand
private String model;       // device model
private String osRelease;   // OS release version string
private String appVersion;  // application version string
}
eagleEye-flink_kafka/src/main/java/com/flink/vo/RiskInfo.java
0 → 100644
View file @
b79af8cf
package
com
.
flink
.
vo
;
import
java.io.Serializable
;
import
lombok.Data
;
import
lombok.ToString
;
/**
* @author wjs
* @version 创建时间:2025-7-10 16:27:05
* 类说明
*/
@Data
@ToString
public
class
RiskInfo
implements
Serializable
{
/**
*
*/
private
static
final
long
serialVersionUID
=
1L
;
private
String
score
;
private
String
law
;
private
String
lawImage
;
private
String
politics
;
private
String
politicsImage
;
private
String
ad
;
private
String
adImage
;
}
eagleEye-flink_kafka/src/main/java/com/flink/vo/android/deviceInfo/D6.java
0 → 100644
View file @
b79af8cf
package
com
.
flink
.
vo
.
android
.
deviceInfo
;
import
java.io.Serializable
;
/**
 * Android device-info field group "D6": eleven opaque string slots
 * {@code l1}..{@code l11}, each defaulting to twelve zeros.
 *
 * <p>Improvement: the default literal {@code "000000000000"} was repeated
 * eleven times; it is now hoisted into a single {@code static final}
 * constant (DRY), with behavior unchanged.
 *
 * @author wjs
 * @version created 2025-7-15 16:54:53
 */
public class D6 implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Shared default: twelve zeros, the placeholder for an unset slot. */
    private static final String DEFAULT_VALUE = "000000000000";

    // Slot semantics are upstream-defined — TODO confirm what l1..l11 encode.
    private String l1 = DEFAULT_VALUE;
    private String l2 = DEFAULT_VALUE;
    private String l3 = DEFAULT_VALUE;
    private String l4 = DEFAULT_VALUE;
    private String l5 = DEFAULT_VALUE;
    private String l6 = DEFAULT_VALUE;
    private String l7 = DEFAULT_VALUE;
    private String l8 = DEFAULT_VALUE;
    private String l9 = DEFAULT_VALUE;
    private String l10 = DEFAULT_VALUE;
    private String l11 = DEFAULT_VALUE;

    public String getL1() {
        return l1;
    }

    public void setL1(String l1) {
        this.l1 = l1;
    }

    public String getL2() {
        return l2;
    }

    public void setL2(String l2) {
        this.l2 = l2;
    }

    public String getL3() {
        return l3;
    }

    public void setL3(String l3) {
        this.l3 = l3;
    }

    public String getL4() {
        return l4;
    }

    public void setL4(String l4) {
        this.l4 = l4;
    }

    public String getL5() {
        return l5;
    }

    public void setL5(String l5) {
        this.l5 = l5;
    }

    public String getL6() {
        return l6;
    }

    public void setL6(String l6) {
        this.l6 = l6;
    }

    public String getL7() {
        return l7;
    }

    public void setL7(String l7) {
        this.l7 = l7;
    }

    public String getL8() {
        return l8;
    }

    public void setL8(String l8) {
        this.l8 = l8;
    }

    public String getL9() {
        return l9;
    }

    public void setL9(String l9) {
        this.l9 = l9;
    }

    public String getL10() {
        return l10;
    }

    public void setL10(String l10) {
        this.l10 = l10;
    }

    public String getL11() {
        return l11;
    }

    public void setL11(String l11) {
        this.l11 = l11;
    }
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment