Skip to content
Toggle navigation
P
Projects
G
Groups
S
Snippets
Help
eagleEye
/
eagleEye-flink_kafka
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Issues
0
Merge Requests
0
Pipelines
Wiki
Snippets
Settings
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit
448f91cc
authored
May 21, 2025
by
魏建枢
Browse files
Options
_('Browse Files')
Download
Email Patches
Plain Diff
代码提交
parent
6c32ea48
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
24 additions
and
12 deletions
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/RealBalanceAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/RealTransactionAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/RealUsersAchi.java
eagleEye-flink_kafka/src/main/java/com/flink/vo/RealBalance.java
eagleEye-flink_kafka/src/main/java/com/flink/vo/RealTransaction.java
eagleEye-flink_kafka/src/main/java/com/flink/vo/RealUsers.java
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/RealBalanceAchi.java
View file @
448f91cc
...
...
@@ -55,7 +55,7 @@ public class RealBalanceAchi extends SourceCommonBase implements Serializable{
DataType
[]
types
=
{
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
INT
(),
DataTypes
.
BIG
INT
(),
DataTypes
.
STRING
(),
DataTypes
.
DOUBLE
(),
DataTypes
.
STRING
(),
...
...
@@ -82,7 +82,7 @@ public class RealBalanceAchi extends SourceCommonBase implements Serializable{
GenericRowData
row
=
new
GenericRowData
(
fields
.
length
);
row
.
setField
(
0
,
StringData
.
fromString
(
balance
.
getId
()));
// id: INT
row
.
setField
(
1
,
StringData
.
fromString
(
balance
.
getAccount_id
()));
// account_id: STRING
row
.
setField
(
2
,
balance
.
getUid
());
// uid: INT
row
.
setField
(
2
,
balance
.
getUid
()
.
longValue
()
);
// uid: INT
row
.
setField
(
3
,
StringData
.
fromString
(
balance
.
getSymbol
()));
// symbol: STRING
row
.
setField
(
4
,
balance
.
getBalance
());
// balance: DOUBLE
row
.
setField
(
5
,
StringData
.
fromString
(
balance
.
getUpdated_at
()));
// updated_at: STRING
...
...
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/RealTransactionAchi.java
View file @
448f91cc
...
...
@@ -66,8 +66,8 @@ public class RealTransactionAchi extends SourceCommonBase implements Serializabl
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
STRING
(),
DataTypes
.
INT
(),
DataTypes
.
INT
(),
DataTypes
.
BIG
INT
(),
DataTypes
.
BIG
INT
(),
DataTypes
.
STRING
(),
DataTypes
.
DOUBLE
(),
DataTypes
.
STRING
(),
...
...
@@ -103,8 +103,8 @@ public class RealTransactionAchi extends SourceCommonBase implements Serializabl
row
.
setField
(
0
,
StringData
.
fromString
(
transaction
.
getId
()));
// id: INT
row
.
setField
(
1
,
StringData
.
fromString
(
transaction
.
getSender
()));
// sender: STRING
row
.
setField
(
2
,
StringData
.
fromString
(
transaction
.
getReceiver
()));
// receiver: STRING
row
.
setField
(
3
,
transaction
.
getSender_id
());
// sender_id: INT
row
.
setField
(
4
,
transaction
.
getReceiver_id
());
// receiver_id: INT
row
.
setField
(
3
,
transaction
.
getSender_id
()
.
longValue
()
);
// sender_id: INT
row
.
setField
(
4
,
transaction
.
getReceiver_id
()
.
longValue
()
);
// receiver_id: INT
row
.
setField
(
5
,
StringData
.
fromString
(
transaction
.
getSymbol
()));
// symbol: STRING
row
.
setField
(
6
,
transaction
.
getAmount
());
// amount: DOUBLE
row
.
setField
(
7
,
StringData
.
fromString
(
transaction
.
getMemo
()));
// memo: STRING
...
...
@@ -148,5 +148,17 @@ public class RealTransactionAchi extends SourceCommonBase implements Serializabl
// TODO Auto-generated method stub
}
public
static
void
main
(
String
[]
args
)
{
String
record
=
"{\"flume_type\": \"realTransaction\", \"data\": [{\"id\": \"116075\", \"sender\": \"1002063153.user\", \"receiver\": \"15581239495.user\", \"sender_id\": 1002063153, \"receiver_id\": 15581239495, \"symbol\": \"TDW20\", \"amount\": 30000.0, \"memo\": null, \"stage\": \"SenderReconfirmed\", \"tx_type\": \"Forced\", \"receiver_contact\": \"15581239495.user\", \"fee_mt\": \"TDW20\", \"fee_amount\": 20.000000000000004, \"updated_at\": \"2025-04-03 06:39:58\", \"created_at\": \"2025-04-03 06:39:53\"}]}"
;
JSONObject
jsonObj
=
JSON
.
parseObject
(
record
);
String
flumeType
=
jsonObj
.
getString
(
"flume_type"
);
String
bodyStr
=
jsonObj
.
getString
(
"data"
);
List
<
RealTransaction
>
list
=
JSONObject
.
parseObject
(
bodyStr
,
new
TypeReference
<
List
<
RealTransaction
>>(){});
System
.
out
.
println
(
">>>>>>>>>>>>>flumeType:"
+
flumeType
+
"/n"
);
System
.
out
.
println
(
">>>>>>>>>>>>>bodyStr:"
+
bodyStr
+
"/n"
);
System
.
out
.
println
(
">>>>>>>>>>>>>list:"
+
list
+
"/n"
);
}
}
eagleEye-flink_kafka/src/main/java/com/flink/achieve/doris/RealUsersAchi.java
View file @
448f91cc
...
...
@@ -67,7 +67,7 @@ public class RealUsersAchi extends SourceCommonBase implements Serializable{
DataTypes
.
STRING
(),
// phone_number
DataTypes
.
STRING
(),
// email
DataTypes
.
STRING
(),
// leader
DataTypes
.
INT
(),
// leader_id
DataTypes
.
BIG
INT
(),
// leader_id
DataTypes
.
STRING
(),
// kind
DataTypes
.
STRING
(),
// login_pwd_hash
DataTypes
.
STRING
(),
// answer_indexes
...
...
@@ -99,7 +99,7 @@ public class RealUsersAchi extends SourceCommonBase implements Serializable{
row
.
setField
(
1
,
StringData
.
fromString
(
user
.
getPhone_number
()));
row
.
setField
(
2
,
StringData
.
fromString
(
user
.
getEmail
()));
row
.
setField
(
3
,
StringData
.
fromString
(
user
.
getLeader
()));
row
.
setField
(
4
,
user
.
getLeader_id
());
row
.
setField
(
4
,
user
.
getLeader_id
()
.
longValue
()
);
row
.
setField
(
5
,
StringData
.
fromString
(
user
.
getKind
()));
row
.
setField
(
6
,
StringData
.
fromString
(
user
.
getLogin_pwd_hash
()));
row
.
setField
(
7
,
StringData
.
fromString
(
user
.
getAnswer_indexes
()));
...
...
eagleEye-flink_kafka/src/main/java/com/flink/vo/RealBalance.java
View file @
448f91cc
...
...
@@ -18,7 +18,7 @@ public class RealBalance implements Serializable {
private
String
id
;
private
String
account_id
;
private
Integer
uid
;
private
Long
uid
;
private
String
symbol
;
private
Double
balance
;
private
String
updated_at
;
...
...
eagleEye-flink_kafka/src/main/java/com/flink/vo/RealTransaction.java
View file @
448f91cc
...
...
@@ -19,8 +19,8 @@ public class RealTransaction implements Serializable{
private
String
id
;
private
String
sender
;
private
String
receiver
;
private
Integer
sender_id
;
private
Integer
receiver_id
;
private
Long
sender_id
;
private
Long
receiver_id
;
private
String
symbol
;
private
Double
amount
;
private
String
memo
;
...
...
eagleEye-flink_kafka/src/main/java/com/flink/vo/RealUsers.java
View file @
448f91cc
...
...
@@ -20,7 +20,7 @@ public class RealUsers implements Serializable{
private
String
phone_number
;
private
String
email
;
private
String
leader
;
private
Integer
leader_id
;
private
Long
leader_id
;
private
String
kind
;
private
String
login_pwd_hash
;
private
String
answer_indexes
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment