Commit 6e4c1e19 by 魏建枢

代码优化

parent d81d9d04
...@@ -83,7 +83,8 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable ...@@ -83,7 +83,8 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable
}) })
.filter(Objects::nonNull) .filter(Objects::nonNull)
// .print(">>>>>>>>>>>>>>>"); // .print(">>>>>>>>>>>>>>>");
.sinkTo(dorisSink); .sinkTo(dorisSink)
.name("Doris-EventIpConvert");
} }
public static JSONObject handleData(String record) throws ParseException, Exception { public static JSONObject handleData(String record) throws ParseException, Exception {
......
...@@ -2,6 +2,7 @@ package com.flink.achieve.doris; ...@@ -2,6 +2,7 @@ package com.flink.achieve.doris;
import java.io.Serializable; import java.io.Serializable;
import java.util.List; import java.util.List;
import java.util.Objects;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
...@@ -91,7 +92,10 @@ public class RealBalanceAchi extends SourceCommonBase implements Serializable{ ...@@ -91,7 +92,10 @@ public class RealBalanceAchi extends SourceCommonBase implements Serializable{
} }
}); });
rowDataStream.sinkTo(dorisSink).name("Doris-RealBalance"); rowDataStream
.filter(Objects::nonNull)
.sinkTo(dorisSink)
.name("Doris-RealBalance");
} }
public static List<RealBalance> handleData(String record) throws ParseException, Exception { public static List<RealBalance> handleData(String record) throws ParseException, Exception {
......
...@@ -2,6 +2,7 @@ package com.flink.achieve.doris; ...@@ -2,6 +2,7 @@ package com.flink.achieve.doris;
import java.io.Serializable; import java.io.Serializable;
import java.util.List; import java.util.List;
import java.util.Objects;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
...@@ -99,7 +100,10 @@ public class RealKycAchi extends SourceCommonBase implements Serializable{ ...@@ -99,7 +100,10 @@ public class RealKycAchi extends SourceCommonBase implements Serializable{
} }
}); });
rowDataStream.sinkTo(dorisSink).name("Doris-RealKyc"); rowDataStream
.filter(Objects::nonNull)
.sinkTo(dorisSink)
.name("Doris-RealKyc");
} }
......
...@@ -2,6 +2,7 @@ package com.flink.achieve.doris; ...@@ -2,6 +2,7 @@ package com.flink.achieve.doris;
import java.io.Serializable; import java.io.Serializable;
import java.util.List; import java.util.List;
import java.util.Objects;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
...@@ -119,7 +120,10 @@ public class RealTransactionAchi extends SourceCommonBase implements Serializabl ...@@ -119,7 +120,10 @@ public class RealTransactionAchi extends SourceCommonBase implements Serializabl
} }
} }
}); });
rowDataStream.sinkTo(dorisSink).name("Doris-RealTransaction"); rowDataStream
.filter(Objects::nonNull)
.sinkTo(dorisSink)
.name("Doris-RealTransaction");
} }
public static List<RealTransaction> handleData(String record) throws ParseException, Exception { public static List<RealTransaction> handleData(String record) throws ParseException, Exception {
......
...@@ -2,6 +2,7 @@ package com.flink.achieve.doris; ...@@ -2,6 +2,7 @@ package com.flink.achieve.doris;
import java.io.Serializable; import java.io.Serializable;
import java.util.List; import java.util.List;
import java.util.Objects;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
...@@ -115,7 +116,10 @@ public class RealUsersAchi extends SourceCommonBase implements Serializable{ ...@@ -115,7 +116,10 @@ public class RealUsersAchi extends SourceCommonBase implements Serializable{
} }
}); });
rowDataStream.sinkTo(dorisSink).name("Doris-UserSink"); rowDataStream
.filter(Objects::nonNull)
.sinkTo(dorisSink)
.name("Doris-UserSink");
} }
public static List<RealUsers> handleData(String record) throws ParseException, Exception { public static List<RealUsers> handleData(String record) throws ParseException, Exception {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment