Commit db8a7d63 by 魏建枢

代码提交

parent 31a40451
package com.flink.achieve.doris;
import java.io.Serializable;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
......
package com.flink.achieve.doris;
import java.io.Serializable;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
......@@ -25,7 +24,7 @@ import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.SourceCommonBase;
import com.flink.enums.AppTypeEnum;
import com.flink.util.CompareUtils;
import com.flink.util.ip2region.SearcherUtil;
import com.flink.vo.OdsEventLog;
import com.flink.vo.UserProperties;
......@@ -47,9 +46,27 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
//=================配置入库字段=========================================
String[] fields = {"id", "ip", "area_name", "device_id", "cid", "phone", "nick","create_time","dt","__DORIS_DELETE_SIGN__"};
DataType[] types = {DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),DataTypes.STRING(),DataTypes.TIMESTAMP(),DataTypes.INT()};
String[] fields = {
"id",
"ip",
"area_name",
"device_id",
"cid",
"phone",
"nick",
"create_time",
"__DORIS_DELETE_SIGN__"
};
DataType[] types = {
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.TIMESTAMP(3),
DataTypes.INT()};
//=================流式处理=========================================
String tableName = "bi.event_ip_convert";
DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);
......@@ -62,9 +79,7 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable
return null;
}
GenericRowData row = new GenericRowData(fields.length);
DateTimeFormatter formatter = DateTimeFormatter.ISO_INSTANT;
System.out.println("value" + value);
String createTime = (String) jsonObj.get("createTime");
// 字段映射
row.setField(0, StringData.fromString((String)jsonObj.get("id")));
row.setField(1, StringData.fromString((String) jsonObj.get("ips")));
......@@ -73,9 +88,8 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable
row.setField(4, StringData.fromString((String) jsonObj.get("cid")));
row.setField(5, StringData.fromString((String) jsonObj.get("phone")));
row.setField(6, StringData.fromString((String) jsonObj.get("nick")));
row.setField(7, StringData.fromString((String) jsonObj.get("createTime")));
row.setField(8, TimestampData.fromInstant(Instant.now()));
row.setField(9, 0);
row.setField(7, TimestampData.fromLocalDateTime(LocalDateTime.parse(createTime,DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(8, 0);
return (RowData)row;
} catch (Exception e) {
System.err.println("解析失败: "+e.toString());
......@@ -83,7 +97,7 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable
}
})
.filter(Objects::nonNull)
// .print(">>>>>>>>>>>>>>>");
// .print(">>>>>>>>>>>>>>>")
.sinkTo(dorisSink)
.name("Doris-EventIpConvert");
}
......@@ -93,24 +107,10 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable
// TODO 数据的 ETL 处理
OdsEventLog odsEventLog = JSONObject.parseObject(record,new TypeReference<OdsEventLog>(){});
String id = odsEventLog.getId();
String sendTime = odsEventLog.getSend_time();
String createTime = odsEventLog.getCreate_time();
String strategyGroup_id = odsEventLog.getStrategy_group_id();
String appKey =odsEventLog.getApp_key();
String appType = odsEventLog.getApp_type();
String appChannel = odsEventLog.getApp_channel();
String zoneCode = odsEventLog.getZone_code();
String zoneName = odsEventLog.getZone_name();
String zoneType = odsEventLog.getZone_type();
String sdkVersion = odsEventLog.getSdk_version();
String userAgent = odsEventLog.getUser_agent();
String deviceId = odsEventLog.getDevice_id();
String uid = odsEventLog.getUid();
String strategyVersion = odsEventLog.getStrategy_version();
String eventList = odsEventLog.getEvent_list();
String routeIp = odsEventLog.getRoute_ip();
String userProperties = odsEventLog.getUser_properties();
String uniqueId = odsEventLog.getUnique_id();
logger.info("组装数据 body:{}",odsEventLog.toString());
String cid = null;
String phone = null;
......@@ -137,32 +137,35 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable
if(CollectionUtils.isEmpty(ips)) {
return null;
}
List<String> ipList = new ArrayList<>();
List<String> areaNameList = new ArrayList<>();
String ip_name = null;
String area_name = null;
for(String ip:ips) {
if(!SearcherUtil.ipv6(ip)) {
String area_name = SearcherUtil.getCityInfoByFile(ip);
ipList.add(ip);
areaNameList.add(area_name);
area_name = SearcherUtil.getCityInfoByFile(ip);
if(!CompareUtils.stringExists(area_name,
"0|0|0|内网IP|内网IP",
"0|0|0|内网IP|Finance-and-Promoting-Technology")) {
ip_name = ip;
break;
}else {
ip_name = null;
area_name = null;
}
}
}
if(StringUtils.isEmpty(ip_name)) {
return null;
}
logger.info("组装数据开始");
JSONObject jsonObj = new JSONObject();
jsonObj.put("id", id);
jsonObj.put("ips", ips.toString());
jsonObj.put("areaNameList", areaNameList.toString());
jsonObj.put("ips", ip_name);
jsonObj.put("areaNameList", area_name);
jsonObj.put("deviceId", deviceId);
jsonObj.put("cid", cid);
jsonObj.put("phone", phone);
jsonObj.put("nick", nick);
jsonObj.put("createTime", createTime);
if(StringUtils.equals(appType, AppTypeEnum.ANDROID.getCode())) {
}else if(StringUtils.equals(appType, AppTypeEnum.IOS.getCode())) {
}
return jsonObj;
}
......
package com.flink.util;
/**
* 比较工具
* @author wjs
*
*/
public class CompareUtils {
/**
* 对象比较相等
* @param a
* @param b
* @return
*/
public static boolean objectEquals(Object a,Object b){
if(a == null && b == null){
return true;
}
if((a == null && b != null) || (a != null && b == null)){
return false;
}
if(a.equals(b)){
return true;
} else {
return false;
}
}
/**
* 对象比较不相等
* @param a
* @param b
* @return
*/
public static boolean objectNotEquals(Object a,Object b){
return !objectEquals(a, b);
}
/**
* Long比较相等
* @param a
* @param b
* @return
*/
public static boolean longEquals(Long a,Long b){
if(a == null && b == null){
return true;
}
if((a == null && b != null) || (a != null && b == null)){
return false;
}
if(a.longValue() == b.longValue()){
return true;
} else {
return false;
}
}
/**
* Long比较不相等
* @param a
* @param b
* @return
*/
public static boolean longNotEquals(Long a,Long b){
return !longEquals(a, b);
}
/**
* Integer比较相等
* @param a
* @param b
* @return
*/
public static boolean integerEquals(Integer a,Integer b){
if(a == null && b == null){
return true;
}
if((a == null && b != null) || (a != null && b == null)){
return false;
}
if(a.intValue() == b.intValue()){
return true;
} else {
return false;
}
}
/**
* Integer比较不相等
* @param a
* @param b
* @return
*/
public static boolean integerNotEquals(Integer a,Integer b){
return !integerEquals(a, b);
}
/**
* 判断值是否存在列表中
* @param a
* @param compareValues
* @return
*/
public static boolean integerExists(Integer a,Integer... compareValues){
if(a == null || compareValues == null){
return false;
}
for(Integer compareValue : compareValues){
if(compareValue != null && a.intValue() == compareValue.intValue()){
return true;
}
}
return false;
}
/**
* 判断值是否存在列表中
* @param a
* @param compareValues
* @return
*/
public static boolean stringExists(String a,String... compareValues){
if(a == null || compareValues == null){
return false;
}
for(String compareValue : compareValues){
if(compareValue != null && a.equals(compareValue)){
return true;
}
}
return false;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment