Skip to content
Toggle navigation
P
Projects
G
Groups
S
Snippets
Help
eagleEye
/
eagleEye-flink_kafka
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Issues
0
Merge Requests
0
Pipelines
Wiki
Snippets
Settings
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit
98c426fc
authored
Jul 29, 2025
by
魏建枢
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
流式处理整合
parent
316d7cd3
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
194 additions
and
210 deletions
eagleEye-flink_kafka/src/main/java/com/flink/achieve/base/CollectLogAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/base/CommonConsumeBaseAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/base/UserInvitationAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/CollectLogAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/enums/JobTypeEnum.java
eagleEye-flink_kafka/src/main/java/com/flink/factory/JobProcessorFactory.java
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/CollectLogProcessor.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/base/CollectLogAchi.java
View file @
98c426fc
package
com
.
flink
.
achieve
.
base
;
import
java.io.Serializable
;
import
java.time.LocalDateTime
;
import
java.time.format.DateTimeFormatter
;
import
java.util.Objects
;
import
org.apache.doris.flink.sink.DorisSink
;
import
org.apache.flink.api.common.functions.MapFunction
;
import
org.apache.flink.api.common.typeinfo.TypeInformation
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
;
import
org.apache.flink.table.api.DataTypes
;
import
org.apache.flink.table.data.GenericRowData
;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.data.StringData
;
import
org.apache.flink.table.data.TimestampData
;
import
org.apache.flink.table.types.DataType
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* @author wjs
* @version 创建时间:2025-7-28 10:38:56
* 类说明
*/
public
class
CollectLogAchi
implements
Serializable
{
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONObject
;
import
com.alibaba.fastjson.TypeReference
;
import
com.flink.common.DorisConnector
;
import
com.flink.config.TableConfig
;
import
com.flink.processor.function.GenDeviceIdProcessor
;
import
com.flink.processor.function.UserPropertiesProcessor
;
import
com.flink.util.TimeConvertUtil
;
import
com.flink.vo.DeviceIdInfo
;
import
com.flink.vo.OdsCollectLog
;
import
com.flink.vo.UserProperties
;
/**
* @author wjs
* @version 创建时间:2025-7-28 10:38:56 类说明
*/
public
class
CollectLogAchi
implements
Serializable
{
/**
 * Serialization version id, required because the class implements {@link Serializable}
 * and instances of its nested function classes are shipped to Flink workers.
 */
private static final long serialVersionUID = 1L;

private static final Logger logger = LoggerFactory.getLogger(CollectLogAchi.class);

// Shared constants.
// JSON field whose value tags which flume pipeline a record belongs to.
private static final String FLUME_TYPE_FIELD = "flume_type";
// Hidden Doris column driving upsert-vs-delete semantics for each row.
private static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
// 0 = upsert (not a delete) — written into every produced row.
private static final int DELETE_SIGN_VALUE = 0;

// Collect-log table configuration: column names of bi.collect_log, in sink order.
private static final String[] COLLECT_FIELDS = { "id", "dt", "device_id", "device_id_v1", "uid", "idfv",
        "app_key", "app_type", "other_info", "device_info", "env_info", "cid", "phone", "nick", "unique_id",
        "create_time", DORIS_DELETE_SIGN };

// Column types matching COLLECT_FIELDS index-for-index (17 entries).
private static final DataType[] COLLECT_TYPES = { DataTypes.STRING(), DataTypes.DATE(), DataTypes.STRING(),
        DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
        DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
        DataTypes.STRING(), DataTypes.STRING(), DataTypes.TIMESTAMP(3), DataTypes.INT() };
public
static
void
collectLog
(
DataStreamSource
<
String
>
dataStreamSource
)
{
// 初始化表配置
TableConfig
collectConfig
=
new
TableConfig
(
COLLECT_FIELDS
,
COLLECT_TYPES
,
"bi.collect_log"
);
// 创建Doris Sink
DorisSink
<
RowData
>
dorisCollectSink
=
DorisConnector
.
sinkDoris
(
collectConfig
.
getFields
(),
collectConfig
.
getTypes
(),
collectConfig
.
getTableName
());
// 处理用户数据
processDataStream
(
dataStreamSource
,
"newCollectLog"
,
collectConfig
,
dorisCollectSink
,
(
RowMapper
<
String
>)
CollectLogAchi:
:
mapToCollectRow
);
}
private
static
<
T
>
void
processDataStream
(
DataStreamSource
<
String
>
dataStream
,
String
flumeType
,
TableConfig
tableConfig
,
DorisSink
<
RowData
>
dorisSink
,
RowMapper
<
String
>
rowMapper
)
{
SingleOutputStreamOperator
<
RowData
>
processedStream
=
dataStream
.
map
(
new
ElementProcessorWithMap
<>(
flumeType
,
rowMapper
,
tableConfig
.
getFields
().
length
))
.
returns
(
TypeInformation
.
of
(
RowData
.
class
)).
filter
(
Objects:
:
nonNull
);
processedStream
.
sinkTo
(
dorisSink
).
name
(
"Doris-"
+
flumeType
);
}
public
static
void
collectLog
(
DataStreamSource
<
String
>
collectLogStreamSource
)
{
// TODO Auto-generated method stub
/**
 * Inner processing class used by the map operator.
 *
 * <p>Parses each raw JSON record, keeps only records whose {@code flume_type} field
 * matches the configured tag, and delegates row construction to the supplied
 * {@link RowMapper}. Any parse/mapping failure is logged and the record is dropped
 * by returning {@code null} (filtered out downstream).
 */
private static class ElementProcessorWithMap<T> implements MapFunction<String, RowData>, Serializable {
    private static final long serialVersionUID = 1L;
    // Flume type tag this operator instance accepts.
    private final String flumeType;
    // Converts an accepted raw record into a RowData row.
    private final RowMapper<String> mapper;
    // Column count of the target table; sizes each produced row.
    private final int fieldCount;

    public ElementProcessorWithMap(String flumeType, RowMapper<String> mapper, int fieldCount) {
        this.flumeType = flumeType;
        this.mapper = mapper;
        this.fieldCount = fieldCount;
    }

    /**
     * @param value raw JSON record from Kafka
     * @return the mapped row, or {@code null} when the record belongs to another
     *         flume type or cannot be processed
     */
    @Override
    public RowData map(String value) throws Exception {
        try {
            JSONObject jsonObj = JSON.parseObject(value);
            // Skip records belonging to a different pipeline.
            if (!flumeType.equals(jsonObj.getString(FLUME_TYPE_FIELD))) {
                return null;
            }
            return mapper.map(value, fieldCount);
        } catch (Exception e) {
            logger.error("CollectLogAchi 处理 {} 数据出错 | rawData:{} | error:{}", flumeType, value, e.getMessage(), e);
        }
        // Failures fall through to null so the downstream filter drops them.
        return null;
    }
}
/**
 * Maps one raw collect-log JSON record onto a Doris {@link RowData} row laid out
 * per {@code COLLECT_FIELDS}/{@code COLLECT_TYPES}.
 *
 * @param value      raw JSON payload of one collect-log record
 * @param fieldCount number of columns in the target row ({@code COLLECT_FIELDS.length})
 * @return the populated row; throws on malformed input, which the calling map
 *         operator catches and turns into a dropped record
 */
private static RowData mapToCollectRow(String value, int fieldCount) {
    // Per-record trace: DEBUG, not ERROR — the original logged every record at
    // ERROR level, flooding the log and burying real failures.
    logger.debug("CollectLogAchi mapToCollectRow 数据打印 | rawData:{} | fieldCount:{}", value, fieldCount);
    OdsCollectLog log = JSON.parseObject(value, new TypeReference<OdsCollectLog>() {});
    String appType = log.getApp_type();
    String appKey = log.getApp_key();
    String other_info = log.getOther_info();
    String device_info = log.getDevice_info();
    String env_info = log.getEnv_info();
    String createTime = log.getCreate_time();
    DeviceIdInfo deviceIdInfo = GenDeviceIdProcessor.genDeviceId(appType, appKey, other_info, device_info, env_info);
    UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(log.getUser_properties());
    GenericRowData row = new GenericRowData(fieldCount);
    row.setField(0, StringData.fromString(log.getId()));
    // dt = date part of create_time; a null/short createTime throws here and the
    // record is dropped by the caller's catch — NOTE(review): confirm that is intended.
    row.setField(1, TimeConvertUtil.convertToSqlDate(createTime.substring(0, 10)));
    row.setField(2, StringData.fromString(log.getDevice_id()));
    row.setField(3, StringData.fromString(deviceIdInfo == null ? null : deviceIdInfo.getDeviceIdV1()));
    row.setField(4, StringData.fromString(log.getUid()));
    row.setField(5, StringData.fromString(deviceIdInfo == null ? null : deviceIdInfo.getIdfv()));
    // app_key: null guard added — fields 3 and 5 already guard deviceIdInfo, but
    // this call previously dereferenced it unconditionally and could NPE.
    row.setField(6, StringData.fromString(deviceIdInfo == null ? null : deviceIdInfo.getAppKey()));
    row.setField(7, StringData.fromString(appType));
    row.setField(8, StringData.fromString(other_info));
    row.setField(9, StringData.fromString(device_info));
    row.setField(10, StringData.fromString(env_info));
    row.setField(11, StringData.fromString(userProperties == null ? null : userProperties.getCid()));
    row.setField(12, StringData.fromString(userProperties == null ? null : userProperties.getPhone()));
    row.setField(13, StringData.fromString(userProperties == null ? null : userProperties.getNick()));
    row.setField(14, StringData.fromString(log.getUnique_id()));
    row.setField(15, TimestampData.fromLocalDateTime(
            LocalDateTime.parse(createTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
    // __DORIS_DELETE_SIGN__: 0 = upsert, not a delete.
    row.setField(16, DELETE_SIGN_VALUE);
    return row;
}
/**
 * Row-data mapping interface: converts one accepted record into a Doris row.
 *
 * <p>Extends {@link Serializable} so lambdas/method references implementing it can
 * be shipped to Flink task managers.
 *
 * @param <T> input record type
 */
@FunctionalInterface
private static interface RowMapper<T> extends Serializable {
    /**
     * @param item       input record
     * @param fieldCount number of columns the produced row must carry
     */
    RowData map(T item, int fieldCount);
}
}
eagleEye-flink_kafka/src/main/java/com/flink/achieve/base/CommonConsumeBaseAchi.java
View file @
98c426fc
...
...
@@ -36,14 +36,14 @@ public class CommonConsumeBaseAchi extends MultipleSourceCommonBase implements S
DataStreamSource
<
String
>
userInvitationStreamSource
=
kafkaDataSource
.
getDataStreamSource
();
UserInvitationAchi
.
userInvitation
(
userInvitationStreamSource
);
}
if
(
StringUtils
.
equals
(
kafkaDataSource
.
getTopic
(),
TopicTypeEnum
.
ODS_EVENT_LOG
.
getTopic
()))
{
DataStreamSource
<
String
>
eventLogStreamSource
=
kafkaDataSource
.
getDataStreamSource
();
EventLogAchi
.
eventLog
(
eventLogStreamSource
);
}
if
(
StringUtils
.
equals
(
kafkaDataSource
.
getTopic
(),
TopicTypeEnum
.
ODS_COMMUNITY_HISTORY
.
getTopic
()))
{
DataStreamSource
<
String
>
communityHistoryStreamSource
=
kafkaDataSource
.
getDataStreamSource
();
CommunityHistoryAchi
.
communityHistory
(
communityHistoryStreamSource
);
}
//
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_EVENT_LOG.getTopic())) {
//
DataStreamSource<String> eventLogStreamSource = kafkaDataSource.getDataStreamSource();
//
EventLogAchi.eventLog(eventLogStreamSource);
//
}
//
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_COMMUNITY_HISTORY.getTopic())) {
//
DataStreamSource<String> communityHistoryStreamSource = kafkaDataSource.getDataStreamSource();
//
CommunityHistoryAchi.communityHistory(communityHistoryStreamSource);
//
}
if
(
StringUtils
.
equals
(
kafkaDataSource
.
getTopic
(),
TopicTypeEnum
.
ODS_NEW_COLLECT_LOG
.
getTopic
()))
{
DataStreamSource
<
String
>
collectLogStreamSource
=
kafkaDataSource
.
getDataStreamSource
();
CollectLogAchi
.
collectLog
(
collectLogStreamSource
);
...
...
eagleEye-flink_kafka/src/main/java/com/flink/achieve/base/UserInvitationAchi.java
View file @
98c426fc
...
...
@@ -21,7 +21,6 @@ import org.slf4j.LoggerFactory;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONObject
;
import
com.alibaba.fastjson.TypeReference
;
import
com.flink.common.DorisConnector
;
import
com.flink.config.TableConfig
;
import
com.flink.vo.RealBalance
;
...
...
@@ -39,13 +38,13 @@ public class UserInvitationAchi implements Serializable {
private
static
final
long
serialVersionUID
=
1L
;
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
UserInvitationAchi
.
class
);
//
定义公共常量
//定义公共常量
private
static
final
String
FLUME_TYPE_FIELD
=
"flume_type"
;
private
static
final
String
DATA_FIELD
=
"data"
;
private
static
final
String
DORIS_DELETE_SIGN
=
"__DORIS_DELETE_SIGN__"
;
private
static
final
int
DELETE_SIGN_VALUE
=
0
;
//
用户表配置
//用户表配置
private
static
final
String
[]
USERS_FIELDS
=
{
"id"
,
"phone_number"
,
"email"
,
"leader"
,
"leader_id"
,
"kind"
,
"login_pwd_hash"
,
"answer_indexes"
,
"main_account"
,
"device_manage_state"
,
"state"
,
"token_version"
,
"updated_at"
,
"created_at"
,
DORIS_DELETE_SIGN
};
...
...
@@ -55,7 +54,7 @@ public class UserInvitationAchi implements Serializable {
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
INT
()
};
//
交易表配置
//交易表配置
private
static
final
String
[]
TRANSACTION_FIELDS
=
{
"id"
,
"sender"
,
"receiver"
,
"sender_id"
,
"receiver_id"
,
"symbol"
,
"amount"
,
"memo"
,
"stage"
,
"tx_type"
,
"receiver_contact"
,
"fee_mt"
,
"fee_amount"
,
"updated_at"
,
"created_at"
,
DORIS_DELETE_SIGN
};
...
...
@@ -65,7 +64,7 @@ public class UserInvitationAchi implements Serializable {
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
INT
()
};
//
KYC表配置
//KYC表配置
private
static
final
String
[]
KYC_FIELDS
=
{
"id"
,
"kind"
,
"procedure_verdict"
,
"admin_verdict"
,
"admin"
,
"memo"
,
"created_at"
,
"updated_at"
,
DORIS_DELETE_SIGN
};
...
...
@@ -73,14 +72,14 @@ public class UserInvitationAchi implements Serializable {
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
INT
()
};
//
余额表配置
//余额表配置
private
static
final
String
[]
BALANCE_FIELDS
=
{
"id"
,
"account_id"
,
"uid"
,
"symbol"
,
"balance"
,
"updated_at"
,
DORIS_DELETE_SIGN
};
private
static
final
DataType
[]
BALANCE_TYPES
=
{
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
BIGINT
(),
DataTypes
.
STRING
(),
DataTypes
.
DOUBLE
(),
DataTypes
.
STRING
(),
DataTypes
.
INT
()
};
//
真实上级表配置
//真实上级表配置
private
static
final
String
[]
LEAD_FIELDS
=
{
"event_id"
,
"account_id"
,
"leader_id"
,
"prev_leader_id"
,
"uid"
,
"lid"
,
"prev_lid"
,
"updated_at"
,
"created_at"
,
DORIS_DELETE_SIGN
};
...
...
@@ -109,9 +108,11 @@ public class UserInvitationAchi implements Serializable {
leadConfig
.
getTableName
());
// 处理用户数据
processDataStream
(
dataStreamSource
,
"realUsers"
,
usersConfig
,
dorisUsersSink
,(
RowMapper
<
RealUsers
>)
UserInvitationAchi:
:
mapToUsersRow
);
// processDataStream(dataStreamSource, "realUsers", usersConfig, dorisUsersSink,(RowMapper<RealUsers>) UserInvitationAchi::mapToUsersRow);
processDataStream
(
dataStreamSource
,
"realUsers"
,
usersConfig
,
dorisUsersSink
,(
item
,
fieldCount
)
->
mapToUsersRow
((
RealUsers
)
item
,
fieldCount
));
// 处理交易数据
processDataStream
(
dataStreamSource
,
"realTransaction"
,
transactionConfig
,
dorisTransactionSink
,(
RowMapper
<
RealTransaction
>)
UserInvitationAchi:
:
mapToTransactionRow
);
// processDataStream(dataStreamSource, "realTransaction", transactionConfig, dorisTransactionSink,(RowMapper<RealTransaction>) UserInvitationAchi::mapToTransactionRow);
processDataStream
(
dataStreamSource
,
"realTransaction"
,
transactionConfig
,
dorisTransactionSink
,(
item
,
fieldCount
)
->
mapToTransactionRow
((
RealTransaction
)
item
,
fieldCount
));
//处理KYC数据
processDataStream
(
dataStreamSource
,
"realKyc"
,
keyConfig
,
dorisKeySink
,(
RowMapper
<
RealKyc
>)
UserInvitationAchi:
:
mapToKycRow
);
//处理余额数据
...
...
@@ -163,21 +164,35 @@ public class UserInvitationAchi implements Serializable {
return
;
}
String
bodyStr
=
jsonObj
.
getString
(
DATA_FIELD
);
List
<
T
>
dataList
=
JSON
.
parseObject
(
bodyStr
,
new
TypeReference
<
List
<
T
>>()
{}
);
List
<
?>
dataList
=
JSON
.
parseArray
(
bodyStr
,
getTargetClass
(
flumeType
)
);
if
(
CollectionUtils
.
isNotEmpty
(
dataList
))
{
for
(
T
item
:
dataList
)
{
out
.
collect
(
mapper
.
map
(
item
,
fieldCount
));
}
}
for
(
Object
item
:
dataList
)
{
RowData
row
=
mapper
.
map
(
item
,
fieldCount
);
if
(
row
!=
null
)
out
.
collect
(
row
);
}
}
}
catch
(
Exception
e
)
{
logger
.
error
(
"处理 {} 数据出错 | rawData:{} | error:{}"
,
flumeType
,
value
,
e
.
getMessage
(),
e
);
logger
.
error
(
"
UserInvitationAchi
处理 {} 数据出错 | rawData:{} | error:{}"
,
flumeType
,
value
,
e
.
getMessage
(),
e
);
}
}
/**
 * Resolves a flume type tag to the VO class that the record's "data" array
 * deserializes into.
 *
 * @param type flume type tag of the incoming record
 * @return the matching VO class
 * @throws IllegalArgumentException when the tag is not recognised
 */
private Class<?> getTargetClass(String type) {
    if (type == null) {
        // Preserve the original switch-on-null behavior.
        throw new NullPointerException();
    }
    if ("realUsers".equals(type)) {
        return RealUsers.class;
    }
    if ("realTransaction".equals(type)) {
        return RealTransaction.class;
    }
    if ("realKyc".equals(type)) {
        return RealKyc.class;
    }
    if ("realBalance".equals(type)) {
        return RealBalance.class;
    }
    if ("realLead".equals(type)) {
        return RealLead.class;
    }
    throw new IllegalArgumentException("未知类型: " + type);
}
}
// 用户数据映射
private
static
RowData
mapToUsersRow
(
RealUsers
user
,
int
fieldCount
)
{
private
static
RowData
mapToUsersRow
(
Object
item
,
int
fieldCount
)
{
RealUsers
user
=
(
RealUsers
)
item
;
// 显式类型转换
GenericRowData
row
=
new
GenericRowData
(
fieldCount
);
row
.
setField
(
0
,
StringData
.
fromString
(
user
.
getId
()));
row
.
setField
(
1
,
StringData
.
fromString
(
user
.
getPhone_number
()));
...
...
@@ -198,7 +213,8 @@ public class UserInvitationAchi implements Serializable {
}
// 交易数据映射
private
static
RowData
mapToTransactionRow
(
RealTransaction
transaction
,
int
fieldCount
)
{
private
static
RowData
mapToTransactionRow
(
Object
item
,
int
fieldCount
)
{
RealTransaction
transaction
=
(
RealTransaction
)
item
;
// 显式类型转换
GenericRowData
row
=
new
GenericRowData
(
fieldCount
);
row
.
setField
(
0
,
StringData
.
fromString
(
transaction
.
getId
()));
row
.
setField
(
1
,
StringData
.
fromString
(
transaction
.
getSender
()));
...
...
@@ -220,7 +236,8 @@ public class UserInvitationAchi implements Serializable {
}
// KYC数据映射
private
static
RowData
mapToKycRow
(
RealKyc
kyc
,
int
fieldCount
)
{
private
static
RowData
mapToKycRow
(
Object
item
,
int
fieldCount
)
{
RealKyc
kyc
=
(
RealKyc
)
item
;
// 显式类型转换
GenericRowData
row
=
new
GenericRowData
(
fieldCount
);
row
.
setField
(
0
,
StringData
.
fromString
(
kyc
.
getId
()));
row
.
setField
(
1
,
StringData
.
fromString
(
kyc
.
getKind
()));
...
...
@@ -235,7 +252,8 @@ public class UserInvitationAchi implements Serializable {
}
// 余额数据映射
private
static
RowData
mapToBalanceRow
(
RealBalance
balance
,
int
fieldCount
)
{
private
static
RowData
mapToBalanceRow
(
Object
item
,
int
fieldCount
)
{
RealBalance
balance
=
(
RealBalance
)
item
;
GenericRowData
row
=
new
GenericRowData
(
fieldCount
);
row
.
setField
(
0
,
StringData
.
fromString
(
balance
.
getId
()));
row
.
setField
(
1
,
StringData
.
fromString
(
balance
.
getAccount_id
()));
...
...
@@ -248,7 +266,8 @@ public class UserInvitationAchi implements Serializable {
}
// 真实上级 数据映射
private
static
RowData
mapToLeadRow
(
RealLead
lead
,
int
fieldCount
)
{
private
static
RowData
mapToLeadRow
(
Object
item
,
int
fieldCount
)
{
RealLead
lead
=
(
RealLead
)
item
;
GenericRowData
row
=
new
GenericRowData
(
fieldCount
);
row
.
setField
(
0
,
lead
.
getEvent_id
());
row
.
setField
(
1
,
StringData
.
fromString
(
lead
.
getAccount_id
()));
...
...
@@ -270,6 +289,6 @@ public class UserInvitationAchi implements Serializable {
*/
@FunctionalInterface
private
static
interface
RowMapper
<
T
>
extends
Serializable
{
RowData
map
(
T
item
,
int
fieldCount
);
RowData
map
(
Object
item
,
int
fieldCount
);
}
}
\ No newline at end of file
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/CollectLogAchi.java
deleted
100644 → 0
View file @
316d7cd3
package
com
.
flink
.
achieve
.
doris
;
import
java.io.Serializable
;
import
java.time.LocalDateTime
;
import
java.time.format.DateTimeFormatter
;
import
java.util.Objects
;
import
org.apache.doris.flink.sink.DorisSink
;
import
org.apache.flink.api.common.io.ParseException
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.table.api.DataTypes
;
import
org.apache.flink.table.data.GenericRowData
;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.data.StringData
;
import
org.apache.flink.table.data.TimestampData
;
import
org.apache.flink.table.types.DataType
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
com.alibaba.fastjson.JSONObject
;
import
com.alibaba.fastjson.TypeReference
;
import
com.flink.common.DorisConnector
;
import
com.flink.common.SourceCommonBase
;
import
com.flink.processor.function.GenDeviceIdProcessor
;
import
com.flink.processor.function.UserPropertiesProcessor
;
import
com.flink.util.TimeConvertUtil
;
import
com.flink.vo.CollectLog
;
import
com.flink.vo.DeviceIdInfo
;
import
com.flink.vo.UserProperties
;
/**
* @author wjs
* @version 创建时间:2025-5-21 11:42:51
* 类说明
*/
public
class
CollectLogAchi
extends
SourceCommonBase
implements
Serializable
{
/**
*
*/
private
static
final
long
serialVersionUID
=
1L
;
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
CollectLogAchi
.
class
);
@Override
public
void
parseSourceKafkaJson
(
DataStreamSource
<
String
>
dataStreamSource
)
throws
ParseException
,
Exception
{
//=================配置入库字段=========================================
String
[]
fields
=
{
"id"
,
"dt"
,
"device_id"
,
"device_id_v1"
,
"uid"
,
"idfv"
,
"app_key"
,
"app_type"
,
"other_info"
,
"device_info"
,
"env_info"
,
"cid"
,
"phone"
,
"nick"
,
"unique_id"
,
"create_time"
,
"__DORIS_DELETE_SIGN__"
};
DataType
[]
types
=
{
DataTypes
.
STRING
(),
DataTypes
.
DATE
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
TIMESTAMP
(
3
),
DataTypes
.
INT
()
};
//=================流式处理=========================================
String
tableName
=
"bi.collect_log"
;
DorisSink
<
RowData
>
dorisSink
=
DorisConnector
.
sinkDoris
(
fields
,
types
,
tableName
);
//=================数据处理流水线=========================================
dataStreamSource
.
map
(
value
->{
try
{
// 解析 Kafka 数据
CollectLog
log
=
JSONObject
.
parseObject
(
value
,
new
TypeReference
<
CollectLog
>(){});
if
(
null
==
log
)
{
return
null
;
}
String
appType
=
log
.
getAppType
();
String
appKey
=
log
.
getAppKey
();
String
other_info
=
log
.
getOtherInfo
();
String
device_info
=
log
.
getDeviceInfo
();
String
env_info
=
log
.
getEnvInfo
();
String
createTime
=
log
.
getCreateTime
();
DeviceIdInfo
deviceIdInfo
=
GenDeviceIdProcessor
.
genDeviceId
(
appType
,
appKey
,
other_info
,
device_info
,
env_info
);
UserProperties
userProperties
=
UserPropertiesProcessor
.
userPropertiesToJson
(
log
.
getUserProperties
());
// 转换为RowData
GenericRowData
row
=
new
GenericRowData
(
fields
.
length
);
row
.
setField
(
0
,
StringData
.
fromString
(
log
.
getId
()));
// id
row
.
setField
(
1
,
TimeConvertUtil
.
convertToSqlDate
(
createTime
.
substring
(
0
,
10
)));
// dt
row
.
setField
(
2
,
StringData
.
fromString
(
log
.
getDeviceId
()));
// device_id
row
.
setField
(
3
,
StringData
.
fromString
(
deviceIdInfo
==
null
?
null
:
deviceIdInfo
.
getDeviceIdV1
()));
// device_id_v1
row
.
setField
(
4
,
StringData
.
fromString
(
log
.
getUid
()));
// uid
row
.
setField
(
5
,
StringData
.
fromString
(
deviceIdInfo
==
null
?
null
:
deviceIdInfo
.
getIdfv
()));
// idfv
row
.
setField
(
6
,
StringData
.
fromString
(
deviceIdInfo
.
getAppKey
()));
// app_key
row
.
setField
(
7
,
StringData
.
fromString
(
log
.
getAppType
()));
// app_type
row
.
setField
(
8
,
StringData
.
fromString
(
other_info
));
// other_info
row
.
setField
(
9
,
StringData
.
fromString
(
device_info
));
// device_info
row
.
setField
(
10
,
StringData
.
fromString
(
env_info
));
// env_info
row
.
setField
(
11
,
StringData
.
fromString
(
userProperties
==
null
?
null
:
userProperties
.
getCid
()));
// cid
row
.
setField
(
12
,
StringData
.
fromString
(
userProperties
==
null
?
null
:
userProperties
.
getPhone
()));
// phone
row
.
setField
(
13
,
StringData
.
fromString
(
userProperties
==
null
?
null
:
userProperties
.
getNick
()));
// nick
row
.
setField
(
14
,
StringData
.
fromString
(
log
.
getUniqueId
()));
// unique_id
row
.
setField
(
15
,
TimestampData
.
fromLocalDateTime
(
LocalDateTime
.
parse
(
createTime
,
DateTimeFormatter
.
ofPattern
(
"yyyy-MM-dd HH:mm:ss.SSS"
))));
//
row
.
setField
(
16
,
0
);
// __DORIS_DELETE_SIGN__
return
(
RowData
)
row
;
}
catch
(
Exception
e
)
{
System
.
err
.
println
(
"解析失败: "
+
e
.
toString
());
return
null
;
}
})
.
filter
(
Objects:
:
nonNull
)
// .print()
.
sinkTo
(
dorisSink
)
.
name
(
"Doris-CollectLog"
);
}
@Override
public
void
sendToSinkKafka
(
DataStreamSource
<
String
>
mStream
)
{
// TODO Auto-generated method stub
}
}
eagleEye-flink_kafka/src/main/java/com/flink/enums/JobTypeEnum.java
View file @
98c426fc
...
...
@@ -20,7 +20,6 @@ public enum JobTypeEnum {
EVENT_IP_CONVERT
(
"JOB_01"
,
"事件IP转换作业"
),
COMMON_CONSUME_BASE
(
"JOB_02"
,
"公共基础消费采集作业"
),
COLLECT_LOG
(
"JOB_06"
,
"日志采集作业"
),
EVENT_IP_CONVERT_CID
(
"JOB_07"
,
"最新事件IP作业"
),
DEVICE_ID_CID
(
"JOB_08"
,
"最新设备ID作业"
),
SIMI_FRIENDS
(
"JOB_09"
,
"SIMI好友作业"
),
...
...
eagleEye-flink_kafka/src/main/java/com/flink/factory/JobProcessorFactory.java
View file @
98c426fc
...
...
@@ -2,7 +2,6 @@ package com.flink.factory;
import
com.flink.enums.JobTypeEnum
;
import
com.flink.processor.JobProcessor
;
import
com.flink.processor.impl.CollectLogProcessor
;
import
com.flink.processor.impl.CommonConsumeBaseProcessor
;
import
com.flink.processor.impl.DeviceIdLatestProcessor
;
import
com.flink.processor.impl.EventIpConvertProcessor
;
...
...
@@ -24,8 +23,6 @@ public class JobProcessorFactory {
switch
(
jobType
)
{
case
EVENT_IP_CONVERT:
return
new
EventIpConvertProcessor
();
case
COLLECT_LOG:
return
new
CollectLogProcessor
();
case
EVENT_IP_CONVERT_CID:
return
new
EventIpLatestProcessor
();
case
DEVICE_ID_CID:
...
...
eagleEye-flink_kafka/src/main/java/com/flink/processor/impl/CollectLogProcessor.java
deleted
100644 → 0
View file @
316d7cd3
package
com
.
flink
.
processor
.
impl
;
import
com.flink.achieve.doris.CollectLogAchi
;
import
com.flink.enums.JobTypeEnum
;
import
com.flink.enums.TopicTypeEnum
;
import
com.flink.processor.JobProcessor
;
/**
 * @author wjs
 * @version 创建时间:2025-5-21 11:42:10
 *
 * Job processor binding the collect-log job (JOB_06) to the
 * ODS_NEW_COLLECT_LOG Kafka topic.
 */
public class CollectLogProcessor implements JobProcessor {

    /** Builds and runs the collect-log streaming pipeline. */
    @Override
    public void process() throws Exception {
        CollectLogAchi achi = new CollectLogAchi();
        achi.handleDataStreamSource(JobTypeEnum.COLLECT_LOG, TopicTypeEnum.ODS_NEW_COLLECT_LOG);
    }
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment