Commit 510633ea by 魏建枢

代码提交

parent c889f22f
......@@ -5,6 +5,8 @@ import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -28,6 +30,8 @@ import com.flink.enums.ProcessTypeEnum;
import com.flink.processor.function.UserPropertiesProcessor;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.OdsEventLog;
import com.flink.vo.PcEventInfo;
import com.flink.vo.PcOdsEventLog;
import com.flink.vo.UserProperties;
/**
......@@ -67,6 +71,9 @@ public class ExceptionEventAchi implements Serializable {
// 处理设备信息采集日志数据
processDataStream(dataStreamSource, "eventLog", exceptionEventConfig, dorisExceptionEventSink,
(item, fieldCount) -> mapToExceptionEventRow(item, fieldCount));
processDataStream(dataStreamSource, "pcEventLog", exceptionEventConfig, dorisExceptionEventSink,
(item, fieldCount) -> mapToExceptionPcEventRow(item, fieldCount));
}
private static void processDataStream(DataStreamSource<String> dataStream, String flumeType,
......@@ -151,6 +158,64 @@ public class ExceptionEventAchi implements Serializable {
row.setField(25, DELETE_SIGN_VALUE);
return row;
}
private static RowData mapToExceptionPcEventRow(Object item, int fieldCount) {
String value = (String) item;
PcOdsEventLog event = JSONObject.parseObject(value, new TypeReference<PcOdsEventLog>() {});
String event_info = event.getEvent_info();
if(StringUtils.isEmpty(event_info)) {
return null;
}
PcEventInfo pcEventInfo = JSONObject.parseObject(event_info, new TypeReference<PcEventInfo>() {});
String id = event.getId();
String send_time = event.getSend_time();
String create_time = event.getCreate_time();
String app_key = event.getApp_key();
String app_type = event.getApp_type();
String app_channel = event.getApp_channel();
String zone_code = event.getZone_code();
String zone_name = event.getZone_name();
String zone_type = event.getZone_type();
String user_agent = event.getUser_agent();
String uid = pcEventInfo.getI7();
String device_id = pcEventInfo.getI8();
String unique_id = pcEventInfo.getI7();
String cid = pcEventInfo.getCid();
String phone = pcEventInfo.getPhone();
String nick = pcEventInfo.getNick();
String route_ip = pcEventInfo.getS1();
GenericRowData row = new GenericRowData(fieldCount);
row.setField(0, StringData.fromString(id));
row.setField(1, TimeConvertUtil.convertToSqlDate(create_time.substring(0, 10)));
row.setField(2, TimestampData
.fromLocalDateTime(LocalDateTime.parse(send_time, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(3, TimestampData.fromLocalDateTime(
LocalDateTime.parse(create_time, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(4, StringData.fromString(null));
row.setField(5, StringData.fromString(app_key));
row.setField(6, StringData.fromString(app_type));
row.setField(7, StringData.fromString(app_channel));
row.setField(8, StringData.fromString(zone_code));
row.setField(9, StringData.fromString(zone_name));
row.setField(10, StringData.fromString(zone_type));
row.setField(11, StringData.fromString(null));
row.setField(12, StringData.fromString(user_agent));
row.setField(13, StringData.fromString(device_id));
row.setField(14, StringData.fromString(uid));
row.setField(15, StringData.fromString(null));
row.setField(16, StringData.fromString(CollectionUtils.isNotEmpty(pcEventInfo.getProperties()) ? pcEventInfo.getProperties().toString() : null));
row.setField(17, StringData.fromString(route_ip));
row.setField(18, StringData.fromString(cid));
row.setField(19, StringData.fromString(phone));
row.setField(20, StringData.fromString(nick));
row.setField(21, StringData.fromString(unique_id));
row.setField(22, StringData.fromString(event.getRemarks()));
row.setField(23, StringData.fromString(event.getNetwork_ip()));
row.setField(24, ProcessTypeEnum.NOT_HANDLE.getCode());
row.setField(25, DELETE_SIGN_VALUE);
return row;
}
/**
* 行数据映射接口
*
......
......@@ -33,4 +33,7 @@ public class PcOdsEventLog implements Serializable{
private String zone_type;
private String id;
private String user_agent;
private String remarks;
private String network_ip;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment