Commit 8e4b37b0 by 魏建枢

增加处理类型

parent a4e4c796
...@@ -24,6 +24,7 @@ import com.alibaba.fastjson.JSONObject; ...@@ -24,6 +24,7 @@ import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference; import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector; import com.flink.common.DorisConnector;
import com.flink.config.TableConfig; import com.flink.config.TableConfig;
import com.flink.enums.ProcessTypeEnum;
import com.flink.processor.function.UserPropertiesProcessor; import com.flink.processor.function.UserPropertiesProcessor;
import com.flink.util.TimeConvertUtil; import com.flink.util.TimeConvertUtil;
import com.flink.vo.OdsEventLog; import com.flink.vo.OdsEventLog;
...@@ -47,14 +48,14 @@ public class ExceptionEventAchi implements Serializable { ...@@ -47,14 +48,14 @@ public class ExceptionEventAchi implements Serializable {
// 事件采集日志表配置 // 事件采集日志表配置
private static final String[] EXCEPTION_EVENT_FIELDS = { "id", "dt", "send_time", "create_time", "strategy_group_id", private static final String[] EXCEPTION_EVENT_FIELDS = { "id", "dt", "send_time", "create_time", "strategy_group_id",
"app_key", "app_type", "app_channel", "zone_code", "zone_name", "zone_type", "sdk_version", "user_agent", "app_key", "app_type", "app_channel", "zone_code", "zone_name", "zone_type", "sdk_version", "user_agent",
"device_id", "uid", "strategy_version", "event_list", "route_ip", "cid", "phone", "nick", "unique_id","remarks","network_ip", "device_id", "uid", "strategy_version", "event_list", "route_ip", "cid", "phone", "nick", "unique_id","remarks","network_ip","process_type",
DORIS_DELETE_SIGN }; DORIS_DELETE_SIGN };
private static final DataType[] EXCEPTION_EVENT_TYPES = { DataTypes.STRING(), DataTypes.DATE(), DataTypes.TIMESTAMP(3), private static final DataType[] EXCEPTION_EVENT_TYPES = { DataTypes.STRING(), DataTypes.DATE(), DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(3), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.TIMESTAMP(3), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),DataTypes.STRING(), DataTypes.INT() }; DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),DataTypes.STRING(), DataTypes.INT(),DataTypes.INT() };
public static void exceptionEvent(DataStreamSource<String> dataStreamSource) { public static void exceptionEvent(DataStreamSource<String> dataStreamSource) {
TableConfig exceptionEventConfig = new TableConfig(EXCEPTION_EVENT_FIELDS, EXCEPTION_EVENT_TYPES, "bi.exception_event_log"); TableConfig exceptionEventConfig = new TableConfig(EXCEPTION_EVENT_FIELDS, EXCEPTION_EVENT_TYPES, "bi.exception_event_log");
...@@ -146,7 +147,8 @@ public class ExceptionEventAchi implements Serializable { ...@@ -146,7 +147,8 @@ public class ExceptionEventAchi implements Serializable {
row.setField(21, StringData.fromString(event.getUnique_id())); row.setField(21, StringData.fromString(event.getUnique_id()));
row.setField(22, StringData.fromString(event.getRemarks())); row.setField(22, StringData.fromString(event.getRemarks()));
row.setField(23, StringData.fromString(event.getNetwork_ip())); row.setField(23, StringData.fromString(event.getNetwork_ip()));
row.setField(24, DELETE_SIGN_VALUE); row.setField(24, ProcessTypeEnum.NOT_HANDLE.getCode());
row.setField(25, DELETE_SIGN_VALUE);
return row; return row;
} }
/** /**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment