Commit 6c6db13e by 魏建枢

simi上报增加海内外版

parent 36fe53e0
...@@ -72,11 +72,34 @@ public class EventLogAchi implements Serializable { ...@@ -72,11 +72,34 @@ public class EventLogAchi implements Serializable {
private static final String[] COMMUNITY_EVENT_FIELDS = { "id", "event_type", "event_time", "event_info", private static final String[] COMMUNITY_EVENT_FIELDS = { "id", "event_type", "event_time", "event_info",
"send_time", "create_time", "strategy_group_id", "app_key", "app_type", "app_channel", "zone_code", "send_time", "create_time", "strategy_group_id", "app_key", "app_type", "app_channel", "zone_code",
"zone_name", "zone_type", "sdk_version", "user_agent", "device_id", "uid", "strategy_version", "route_ip", "zone_name", "zone_type", "sdk_version", "user_agent", "device_id", "uid", "strategy_version", "route_ip",
"cid", "phone", "nick", "unique_id","community_id","community_type", DORIS_DELETE_SIGN }; "cid", "phone", "nick", "unique_id", "community_id", "community_type", DORIS_DELETE_SIGN };
private static final DataType[] COMMUNITY_EVENT_TYPES = { DataTypes.STRING(), // id VARCHAR(50) private static final DataType[] COMMUNITY_EVENT_TYPES = { DataTypes.STRING(), // id VARCHAR(50)
DataTypes.STRING(), DataTypes.TIMESTAMP(3), DataTypes.STRING(), DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(3), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.INT() };
private static final String[] PUSH_EVENT_LOG_FIELDS = { "id", "event_type", "event_time", "push_event_type",
"msg_id", "msg_time", "pace","type", "length", "title", "msg_id_list", "event_info", "send_time", "create_time",
"strategy_group_id", "app_key", "app_type", "app_channel", "zone_code", "zone_name", "zone_type",
"sdk_version", "user_agent", "device_id", "uid", "strategy_version", "route_ip", "cid", "phone", "nick",
"unique_id", DORIS_DELETE_SIGN };
private static final DataType[] PUSH_EVENT_LOG_TYPES = {
DataTypes.STRING(), // id VARCHAR(50)
DataTypes.STRING(), // event_type varchar(255) DataTypes.STRING(), // event_type varchar(255)
DataTypes.TIMESTAMP(3), // event_time datetime(3) DataTypes.TIMESTAMP(3), // event_time datetime(3)
DataTypes.STRING(), // push_event_type varchar(255)
DataTypes.STRING(), // msg_id varchar(255)
DataTypes.TIMESTAMP(3), // msg_time datetime(3)
DataTypes.INT(), // pace integer
DataTypes.STRING(), // pace integer
DataTypes.DOUBLE(), // length double
DataTypes.STRING(), // title text
DataTypes.STRING(), // msg_id_list text
DataTypes.STRING(), // event_info text DataTypes.STRING(), // event_info text
DataTypes.TIMESTAMP(3), // send_time DATETIME(3) DataTypes.TIMESTAMP(3), // send_time DATETIME(3)
DataTypes.TIMESTAMP(3), // create_time DATETIME(3) DataTypes.TIMESTAMP(3), // create_time DATETIME(3)
...@@ -88,19 +111,17 @@ public class EventLogAchi implements Serializable { ...@@ -88,19 +111,17 @@ public class EventLogAchi implements Serializable {
DataTypes.STRING(), // zone_name varchar(100) DataTypes.STRING(), // zone_name varchar(100)
DataTypes.STRING(), // zone_type varchar(100) DataTypes.STRING(), // zone_type varchar(100)
DataTypes.STRING(), // sdk_version varchar(10) DataTypes.STRING(), // sdk_version varchar(10)
DataTypes.STRING(), // user_agent TEXT DataTypes.STRING(), // user_agent TEXT NULL
DataTypes.STRING(), // device_id varchar(128) DataTypes.STRING(), // device_id varchar(128)
DataTypes.STRING(), // uid varchar(128) DataTypes.STRING(), // uid varchar(128)
DataTypes.STRING(), // strategy_version varchar(128) DataTypes.STRING(), // strategy_version varchar(128)
DataTypes.STRING(), // route_ip TEXT DataTypes.STRING(), // route_ip TEXT NULL
DataTypes.STRING(), // cid varchar(20) DataTypes.STRING(), // cid varchar(20)
DataTypes.STRING(), // phone varchar(20) DataTypes.STRING(), // phone varchar(20)
DataTypes.STRING(), // nick varchar(100) DataTypes.STRING(), // nick varchar(100)
DataTypes.STRING(), // unique_id varchar(100) DataTypes.STRING(), // unique_id varchar(100)
DataTypes.STRING(), // community_id varchar(100) DataTypes.INT()
DataTypes.STRING(), // community_type varchar(100) };
DataTypes.INT() };
// 事件采集错误日志表配置 // 事件采集错误日志表配置
private static final String[] EVENT_ERROR_FIELDS = { "id", "dt", "event_time", "create_time", "app_key", "app_type", private static final String[] EVENT_ERROR_FIELDS = { "id", "dt", "event_time", "create_time", "app_key", "app_type",
"cid", "phone", "nick", "event", "data", "startTime", "timeDifference", "endTime", "userId", "cid", "phone", "nick", "event", "data", "startTime", "timeDifference", "endTime", "userId",
...@@ -122,34 +143,37 @@ public class EventLogAchi implements Serializable { ...@@ -122,34 +143,37 @@ public class EventLogAchi implements Serializable {
public static void eventLog(DataStreamSource<String> dataStreamSource) { public static void eventLog(DataStreamSource<String> dataStreamSource) {
// 初始化表配置 // 初始化表配置
TableConfig eventConfig = new TableConfig(EVENT_FIELDS, EVENT_TYPES, "bi.event_log"); TableConfig eventConfig = new TableConfig(EVENT_FIELDS, EVENT_TYPES, "bi.event_log");
TableConfig communityEventConfig = new TableConfig(COMMUNITY_EVENT_FIELDS, COMMUNITY_EVENT_TYPES, "bi.community_event_log"); TableConfig communityEventConfig = new TableConfig(COMMUNITY_EVENT_FIELDS, COMMUNITY_EVENT_TYPES,"bi.community_event_log");
TableConfig pushEventConfig = new TableConfig(PUSH_EVENT_LOG_FIELDS, PUSH_EVENT_LOG_TYPES,"bi.push_event_log");
TableConfig eventErrorConfig = new TableConfig(EVENT_ERROR_FIELDS, EVENT_ERROR_TYPES, "bi.event_log_error"); TableConfig eventErrorConfig = new TableConfig(EVENT_ERROR_FIELDS, EVENT_ERROR_TYPES, "bi.event_log_error");
TableConfig eventExposureConfig = new TableConfig(EVENT_EXPOSURE_FIELDS, EVENT_EXPOSURE_TYPES, TableConfig eventExposureConfig = new TableConfig(EVENT_EXPOSURE_FIELDS, EVENT_EXPOSURE_TYPES,"ai.event_exposure");
"ai.event_exposure");
// 创建Doris Sink // 创建Doris Sink
DorisSink<RowData> dorisEventSink = DorisConnector.sinkDoris(eventConfig.getFields(), eventConfig.getTypes(), DorisSink<RowData> dorisEventSink = DorisConnector.sinkDoris(eventConfig.getFields(), eventConfig.getTypes(),eventConfig.getTableName());
eventConfig.getTableName()); DorisSink<RowData> dorisCommunityEventSink = DorisConnector.sinkDoris(communityEventConfig.getFields(),communityEventConfig.getTypes(), communityEventConfig.getTableName());
DorisSink<RowData> doriscommunityEventSink = DorisConnector.sinkDoris(communityEventConfig.getFields(), communityEventConfig.getTypes(), DorisSink<RowData> dorisPushEventSink = DorisConnector.sinkDoris(pushEventConfig.getFields(),pushEventConfig.getTypes(), pushEventConfig.getTableName());
communityEventConfig.getTableName()); DorisSink<RowData> dorisEventErrorSink = DorisConnector.sinkDoris(eventErrorConfig.getFields(),eventErrorConfig.getTypes(), eventErrorConfig.getTableName());
DorisSink<RowData> dorisEventErrorSink = DorisConnector.sinkDoris(eventErrorConfig.getFields(), DorisSink<RowData> dorisEventExposureSink = DorisConnector.sinkDoris(eventExposureConfig.getFields(),eventExposureConfig.getTypes(), eventExposureConfig.getTableName());
eventErrorConfig.getTypes(), eventErrorConfig.getTableName());
DorisSink<RowData> dorisEventExposureSink = DorisConnector.sinkDoris(eventExposureConfig.getFields(),
eventExposureConfig.getTypes(), eventExposureConfig.getTableName());
// 处理设备信息采集日志数据 // 处理设备信息采集日志数据
processDataStream(dataStreamSource, "eventLog", eventConfig, dorisEventSink, processDataStream(dataStreamSource, "eventLog", eventConfig, dorisEventSink,
(item, fieldCount) -> mapToEventRow(item, fieldCount)); (item, fieldCount) -> mapToEventRow(item, fieldCount));
batchCommunityProcessDataStream(dataStreamSource, "eventLog", communityEventConfig, doriscommunityEventSink, batchCommunityProcessDataStream(dataStreamSource, "eventLog", communityEventConfig, dorisCommunityEventSink,
(item, fieldCount) -> { (item, fieldCount) -> {
return null; return null;
}, EventLogAchi::mapToCommunityEventRow); }, EventLogAchi::mapToCommunityEventRow);
batchPushProcessDataStream(dataStreamSource, "eventLog", pushEventConfig, dorisPushEventSink,
(item, fieldCount) -> {
return null;
}, EventLogAchi::mapToPushEventRow);
batchProcessDataStream(dataStreamSource, "eventLog", eventErrorConfig, dorisEventErrorSink, batchProcessDataStream(dataStreamSource, "eventLog", eventErrorConfig, dorisEventErrorSink,
(item, fieldCount) -> { (item, fieldCount) -> {
return null; return null;
}, EventLogAchi::mapToEventErrorRow); }, EventLogAchi::mapToEventErrorRow);
batchExposureProcessDataStream(dataStreamSource, "eventLog", eventExposureConfig, dorisEventExposureSink, batchExposureProcessDataStream(dataStreamSource, "eventLog", eventExposureConfig, dorisEventExposureSink,
(item, fieldCount) -> { (item, fieldCount) -> {
return null; return null;
...@@ -166,13 +190,23 @@ public class EventLogAchi implements Serializable { ...@@ -166,13 +190,23 @@ public class EventLogAchi implements Serializable {
} }
private static void batchCommunityProcessDataStream(DataStreamSource<String> dataStream, String flumeType, private static void batchCommunityProcessDataStream(DataStreamSource<String> dataStream, String flumeType,
TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper mapper, EventCommunityMapper communityMapper) { TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper mapper,
EventCommunityMapper communityMapper) {
SingleOutputStreamOperator<RowData> processedStream = dataStream SingleOutputStreamOperator<RowData> processedStream = dataStream
.flatMap(new ElementCommunityProcessor(flumeType, communityMapper, tableConfig.getFields().length)) .flatMap(new ElementCommunityProcessor(flumeType, communityMapper, tableConfig.getFields().length))
.returns(TypeInformation.of(RowData.class)).filter(Objects::nonNull); .returns(TypeInformation.of(RowData.class)).filter(Objects::nonNull);
processedStream.sinkTo(dorisSink).name("Doris-" + flumeType); processedStream.sinkTo(dorisSink).name("Doris-" + flumeType);
} }
/**
 * Builds the push-event pipeline: flat-maps raw Kafka records through the push
 * processor (one record may yield several rows), drops nulls, and sinks the
 * resulting rows into Doris. The unused {@code mapper} parameter is kept for
 * signature parity with the other pipeline builders.
 */
private static void batchPushProcessDataStream(DataStreamSource<String> dataStream, String flumeType,
        TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper mapper,
        EventPushMapper pushMapper) {
    final int columnCount = tableConfig.getFields().length;
    SingleOutputStreamOperator<RowData> rows = dataStream
            .flatMap(new ElementPushProcessor(flumeType, pushMapper, columnCount))
            .returns(TypeInformation.of(RowData.class))
            .filter(Objects::nonNull);
    rows.sinkTo(dorisSink).name("Doris-" + flumeType);
}
private static void batchProcessDataStream(DataStreamSource<String> dataStream, String flumeType, private static void batchProcessDataStream(DataStreamSource<String> dataStream, String flumeType,
TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper mapper, EventErrorMapper errorMapper) { TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper mapper, EventErrorMapper errorMapper) {
...@@ -297,6 +331,7 @@ public class EventLogAchi implements Serializable { ...@@ -297,6 +331,7 @@ public class EventLogAchi implements Serializable {
"all_time"); "all_time");
} }
} }
private static class ElementCommunityProcessor implements FlatMapFunction<String, RowData>, Serializable { private static class ElementCommunityProcessor implements FlatMapFunction<String, RowData>, Serializable {
/** /**
...@@ -322,8 +357,7 @@ public class EventLogAchi implements Serializable { ...@@ -322,8 +357,7 @@ public class EventLogAchi implements Serializable {
} }
processKafkaMessage(value, out); processKafkaMessage(value, out);
} catch (Exception e) { } catch (Exception e) {
logger.error("EventLogAchi 处理 {} 数据出错 | rawData:{} | error:{}", flumeType, value, e.getMessage(), logger.error("EventLogAchi 处理 {} 数据出错 | rawData:{} | error:{}", flumeType, value, e.getMessage(), e);
e);
} }
} }
...@@ -365,6 +399,73 @@ public class EventLogAchi implements Serializable { ...@@ -365,6 +399,73 @@ public class EventLogAchi implements Serializable {
} }
} }
private static class ElementPushProcessor implements FlatMapFunction<String, RowData>, Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private final String flumeType;
private final EventPushMapper pushMapper;;
private final int fieldCount;
public ElementPushProcessor(String flumeType, EventPushMapper pushMapper, int fieldCount) {
this.flumeType = flumeType;
this.pushMapper = pushMapper;
this.fieldCount = fieldCount;
}
@Override
public void flatMap(String value, Collector<RowData> out) throws Exception {
try {
JSONObject jsonObj = JSON.parseObject(value);
if (!flumeType.equals(jsonObj.getString(FLUME_TYPE_FIELD))) {
return;
}
processKafkaMessage(value, out);
} catch (Exception e) {
logger.error("EventLogAchi 处理 {} 数据出错 | rawData:{} | error:{}", flumeType, value, e.getMessage(), e);
}
}
private void processKafkaMessage(String value, Collector<RowData> out) {
try {
OdsEventLog event = JSON.parseObject(value, OdsEventLog.class);
if (event == null)
return;
List<EventListBak> eventList = parseEventListSafely(event.getEvent_list());
if (CollectionUtils.isEmpty(eventList))
return;
// int dt = TimeConvertUtil.convertToSqlDate(event.getCreate_time().substring(0, 10));
UserProperties userProps = UserPropertiesProcessor.userPropertiesToJson(event.getUser_properties());
for (EventListBak eventInfo : eventList) {
if (isTargetEventType(eventInfo.getR7())) {
out.collect(pushMapper.map(event, eventInfo, userProps, fieldCount));
}
}
} catch (Exception e) {
logger.error("Kafka消息处理失败 | data:{} | error:{}", value, e.getMessage());
}
}
private static List<EventListBak> parseEventListSafely(String jsonStr) {
try {
return JSON.parseArray(jsonStr, EventListBak.class);
} catch (Exception e) {
logger.warn("事件列表解析失败: {}", jsonStr);
return Collections.emptyList();
}
}
// 事件类型过滤
private static boolean isTargetEventType(String eventType) {
return CompareUtils.stringExists(eventType, "push_video","push_all_know","push_know");
}
}
private static class ElementExposureProcessor implements FlatMapFunction<String, RowData>, Serializable { private static class ElementExposureProcessor implements FlatMapFunction<String, RowData>, Serializable {
/** /**
...@@ -475,31 +576,32 @@ public class EventLogAchi implements Serializable { ...@@ -475,31 +576,32 @@ public class EventLogAchi implements Serializable {
return row; return row;
} }
private static RowData mapToCommunityEventRow(OdsEventLog event, EventListBak eventInfo, UserProperties userProperties,int fieldCount) { private static RowData mapToCommunityEventRow(OdsEventLog event, EventListBak eventInfo,UserProperties userProperties, int fieldCount) {
String community_id = null; String community_id = null;
String community_type = null; String community_type = null;
if(StringUtils.isNotEmpty(eventInfo.getR8())) { if (StringUtils.isNotEmpty(eventInfo.getR8())) {
JSONObject jsonObj = JSON.parseObject(eventInfo.getR8()); JSONObject jsonObj = JSON.parseObject(eventInfo.getR8());
String id = jsonObj.getString("id"); String id = jsonObj.getString("id");
String communityId = jsonObj.getString("community_id"); String communityId = jsonObj.getString("community_id");
community_type = jsonObj.getString("type"); community_type = jsonObj.getString("type");
if(StringUtils.isNotEmpty(id)) { if (StringUtils.isNotEmpty(id)) {
community_id = id; community_id = id;
}if(StringUtils.isNotEmpty(communityId)) { }
if (StringUtils.isNotEmpty(communityId)) {
community_id = communityId; community_id = communityId;
} }
} }
GenericRowData row = new GenericRowData(fieldCount); GenericRowData row = new GenericRowData(fieldCount);
row.setField(0, StringData.fromString(event.getId())); row.setField(0, StringData.fromString(event.getId()));
row.setField(1, StringData.fromString(eventInfo.getR7())); // event_type - 需要补充数据来源 row.setField(1, StringData.fromString(eventInfo.getR7())); // event_type - 需要补充数据来源
row.setField(2, TimestampData.fromLocalDateTime(LocalDateTime.parse(TimeConvertUtil.parseToStringSSS(eventInfo.getR9()), row.setField(2,
TimestampData.fromLocalDateTime(LocalDateTime.parse(TimeConvertUtil.parseToStringSSS(eventInfo.getR9()),
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")))); // event_time - 需要补充数据来源 DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")))); // event_time - 需要补充数据来源
row.setField(3, StringData.fromString(eventInfo.getR8())); // event_info - 需要补充数据来源 row.setField(3, StringData.fromString(eventInfo.getR8())); // event_info - 需要补充数据来源
row.setField(4, TimestampData row.setField(4, TimestampData.fromLocalDateTime(
.fromLocalDateTime(LocalDateTime.parse(event.getSend_time(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")))); LocalDateTime.parse(event.getSend_time(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))));
row.setField(5, TimestampData.fromLocalDateTime( row.setField(5, TimestampData.fromLocalDateTime(
LocalDateTime.parse(event.getCreate_time(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")))); LocalDateTime.parse(event.getCreate_time(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(6, StringData.fromString(event.getStrategy_group_id())); row.setField(6, StringData.fromString(event.getStrategy_group_id()));
...@@ -525,6 +627,64 @@ public class EventLogAchi implements Serializable { ...@@ -525,6 +627,64 @@ public class EventLogAchi implements Serializable {
return row; return row;
} }
/**
 * Maps one push event entry to a RowData positionally aligned with
 * PUSH_EVENT_LOG_FIELDS (table bi.push_event_log). The push payload lives in
 * R8 as a JSON object; every payload field is optional and maps to null when
 * absent. R9 carries the event timestamp; R7 the event type.
 */
private static RowData mapToPushEventRow(OdsEventLog event, EventListBak eventInfo, UserProperties userProperties, int fieldCount) {
    String push_event_type = null;
    Integer pace = null;
    Double length = null;
    Long msg_time = null;
    String msg_id = null;
    String title = null;
    String msg_id_list = null;
    String type = null;
    if (StringUtils.isNotEmpty(eventInfo.getR8())) {
        JSONObject jsonObj = JSON.parseObject(eventInfo.getR8());
        push_event_type = jsonObj.getString("event_type");
        pace = jsonObj.getInteger("pace");
        length = jsonObj.getDouble("length");
        msg_time = jsonObj.getLong("msg_time");
        msg_id = jsonObj.getString("msg_id");
        title = jsonObj.getString("title");
        msg_id_list = jsonObj.getString("msg_id_list");
        type = jsonObj.getString("type");
    }
    GenericRowData row = new GenericRowData(fieldCount);
    row.setField(0, StringData.fromString(event.getId()));
    row.setField(1, StringData.fromString(eventInfo.getR7())); // event_type
    row.setField(2, TimestampData.fromLocalDateTime(LocalDateTime.parse(
            TimeConvertUtil.parseToStringSSS(eventInfo.getR9()),
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")))); // event_time
    row.setField(3, StringData.fromString(push_event_type)); // push_event_type
    row.setField(4, StringData.fromString(msg_id)); // msg_id
    // fix: previously re-parsed the event time (R9) here, so msg_time was never
    // actually stored; use the payload's msg_time value instead.
    // Assumes msg_time is epoch milliseconds — TODO confirm against the producer.
    row.setField(5, msg_time == null ? null : TimestampData.fromEpochMillis(msg_time)); // msg_time
    row.setField(6, pace); // pace (INT)
    row.setField(7, StringData.fromString(type)); // type
    row.setField(8, length); // length (DOUBLE)
    row.setField(9, StringData.fromString(title)); // title
    row.setField(10, StringData.fromString(msg_id_list)); // msg_id_list
    row.setField(11, StringData.fromString(eventInfo.getR8())); // event_info (raw payload)
    row.setField(12, TimestampData.fromLocalDateTime(
            LocalDateTime.parse(event.getSend_time(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")))); // send_time
    row.setField(13, TimestampData.fromLocalDateTime(
            LocalDateTime.parse(event.getCreate_time(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")))); // create_time
    row.setField(14, StringData.fromString(event.getStrategy_group_id()));
    row.setField(15, StringData.fromString(event.getApp_key()));
    row.setField(16, StringData.fromString(event.getApp_type()));
    row.setField(17, StringData.fromString(event.getApp_channel()));
    row.setField(18, StringData.fromString(event.getZone_code()));
    row.setField(19, StringData.fromString(event.getZone_name()));
    row.setField(20, StringData.fromString(event.getZone_type()));
    row.setField(21, StringData.fromString(event.getSdk_version()));
    row.setField(22, StringData.fromString(event.getUser_agent()));
    row.setField(23, StringData.fromString(event.getDevice_id()));
    row.setField(24, StringData.fromString(event.getUid()));
    row.setField(25, StringData.fromString(event.getStrategy_version()));
    row.setField(26, StringData.fromString(event.getRoute_ip()));
    // user identity comes from the decoded user_properties; tolerate its absence
    row.setField(27, StringData.fromString(userProperties == null ? null : userProperties.getCid()));
    row.setField(28, StringData.fromString(userProperties == null ? null : userProperties.getPhone()));
    row.setField(29, StringData.fromString(userProperties == null ? null : userProperties.getNick()));
    row.setField(30, StringData.fromString(event.getUnique_id()));
    row.setField(31, DELETE_SIGN_VALUE); // Doris delete-sign column
    return row;
}
private static RowData mapToEventErrorRow(OdsEventLog event, EventList eventInfo, UserProperties userProps, int dt, private static RowData mapToEventErrorRow(OdsEventLog event, EventList eventInfo, UserProperties userProps, int dt,
int fieldCount) { int fieldCount) {
GenericRowData row = new GenericRowData(fieldCount); GenericRowData row = new GenericRowData(fieldCount);
...@@ -609,4 +769,9 @@ public class EventLogAchi implements Serializable { ...@@ -609,4 +769,9 @@ public class EventLogAchi implements Serializable {
private interface EventCommunityMapper extends Serializable { private interface EventCommunityMapper extends Serializable {
RowData map(OdsEventLog event, EventListBak eventInfo, UserProperties userProps, int fieldCount); RowData map(OdsEventLog event, EventListBak eventInfo, UserProperties userProps, int fieldCount);
} }
@FunctionalInterface
private interface EventPushMapper extends Serializable {
    /**
     * Maps one push event entry (plus decoded user properties) to a Doris
     * RowData with exactly {@code fieldCount} fields.
     */
    RowData map(OdsEventLog event, EventListBak eventInfo, UserProperties userProps, int fieldCount);
}
} }
...@@ -3,10 +3,13 @@ package com.flink.achieve.base; ...@@ -3,10 +3,13 @@ package com.flink.achieve.base;
import java.io.Serializable; import java.io.Serializable;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink; import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
...@@ -17,6 +20,7 @@ import org.apache.flink.table.data.RowData; ...@@ -17,6 +20,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -25,17 +29,19 @@ import com.alibaba.fastjson.JSONObject; ...@@ -25,17 +29,19 @@ import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference; import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector; import com.flink.common.DorisConnector;
import com.flink.config.TableConfig; import com.flink.config.TableConfig;
import com.flink.util.CompareUtils;
import com.flink.util.TimeConvertUtil; import com.flink.util.TimeConvertUtil;
import com.flink.vo.PcEventInfo; import com.flink.vo.PcEventInfo;
import com.flink.vo.PcOdsEventLog; import com.flink.vo.PcOdsEventLog;
import com.flink.vo.PcProperties;
import com.flink.vo.UserProperties;
/** /**
* @author wjs * @author wjs
* @version 创建时间:2025-10-22 18:40:59 * @version 创建时间:2025-10-22 18:40:59 类说明
* 类说明 */
*/
public class PcEventLogAchi implements Serializable{ public class PcEventLogAchi implements Serializable {
/** /**
* *
...@@ -60,33 +66,165 @@ public class PcEventLogAchi implements Serializable{ ...@@ -60,33 +66,165 @@ public class PcEventLogAchi implements Serializable{
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT() }; DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT() };
// Column names for bi.push_event_log (PC push events); DORIS_DELETE_SIGN is the
// Doris logical-delete marker column appended last.
private static final String[] PUSH_PC_EVENT_LOG_FIELDS = { "id", "event_type", "event_time", "push_event_type",
        "msg_id", "msg_time", "pace", "type", "length", "title", "msg_id_list", "event_info", "send_time",
        "create_time", "strategy_group_id", "app_key", "app_type", "app_channel", "zone_code", "zone_name",
        "zone_type", "sdk_version", "user_agent", "device_id", "uid", "strategy_version", "route_ip", "cid",
        "phone", "nick", "unique_id", DORIS_DELETE_SIGN };

// Column types, positionally aligned with PUSH_PC_EVENT_LOG_FIELDS:
// TIMESTAMP(3) for event_time/msg_time/send_time/create_time, INT for pace and
// the trailing delete sign, DOUBLE for length, STRING for everything else.
private static final DataType[] PUSH_PC_EVENT_LOG_TYPES = { DataTypes.STRING(), DataTypes.STRING(),
        DataTypes.TIMESTAMP(3), DataTypes.STRING(), DataTypes.STRING(), DataTypes.TIMESTAMP(3), DataTypes.INT(),
        DataTypes.STRING(), DataTypes.DOUBLE(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
        DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP(3), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
        DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
        DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
        DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT() };
public static void pcEventLog(DataStreamSource<String> dataStreamSource) { public static void pcEventLog(DataStreamSource<String> dataStreamSource) {
// 初始化表配置 // 初始化表配置
TableConfig pcEventConfig = new TableConfig(PC_EVENT_FIELDS, PC_EVENT_TYPES, "bi.event_log"); TableConfig pcEventConfig = new TableConfig(PC_EVENT_FIELDS, PC_EVENT_TYPES, "bi.event_log");
TableConfig pushPcEventConfig = new TableConfig(PUSH_PC_EVENT_LOG_FIELDS, PUSH_PC_EVENT_LOG_TYPES,"bi.push_event_log");
// 创建Doris Sink // 创建Doris Sink
DorisSink<RowData> dorisPcEventSink = DorisConnector.sinkDoris(pcEventConfig.getFields(), pcEventConfig.getTypes(), DorisSink<RowData> dorisPcEventSink = DorisConnector.sinkDoris(pcEventConfig.getFields(),pcEventConfig.getTypes(), pcEventConfig.getTableName());
pcEventConfig.getTableName()); DorisSink<RowData> dorisPushEventSink = DorisConnector.sinkDoris(pushPcEventConfig.getFields(),pushPcEventConfig.getTypes(), pushPcEventConfig.getTableName());
// 处理设备信息采集日志数据 // 处理设备信息采集日志数据
processDataStream(dataStreamSource, "pcEventLog", pcEventConfig, dorisPcEventSink, processDataStream(dataStreamSource, "pcEventLog", pcEventConfig, dorisPcEventSink,
(item, fieldCount) -> mapToPcEventRow(item, fieldCount)); (item, fieldCount) -> mapToPcEventRow(item, fieldCount));
batchPushProcessDataStream(dataStreamSource, "pcEventLog", pushPcEventConfig, dorisPushEventSink,
(item, fieldCount) -> {
return null;
}, PcEventLogAchi::mapToPushPcEventRow);
}
/**
 * Builds the standard PC event pipeline: maps each raw record to a RowData,
 * drops nulls (filtered or failed records), and sinks the stream into Doris.
 */
private static void processDataStream(DataStreamSource<String> dataStream, String flumeType,
        TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper mapper) {
    final int columnCount = tableConfig.getFields().length;
    SingleOutputStreamOperator<RowData> rows = dataStream
            .map(new ElementProcessorWithMap(flumeType, mapper, columnCount))
            .returns(TypeInformation.of(RowData.class))
            .filter(Objects::nonNull);
    rows.sinkTo(dorisSink).name("Doris-" + flumeType);
}
/**
 * Builds the PC push-event pipeline: flat-maps raw records through the push
 * processor (a record may yield several rows), drops nulls, sinks to Doris.
 * {@code mapper} is unused but retained for signature parity with the other
 * pipeline builders.
 */
private static void batchPushProcessDataStream(DataStreamSource<String> dataStream, String flumeType,
        TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper mapper,
        EventPushMapper pushMapper) {
    final int fields = tableConfig.getFields().length;
    SingleOutputStreamOperator<RowData> pushRows = dataStream
            .flatMap(new ElementPushProcessor(flumeType, pushMapper, fields))
            .returns(TypeInformation.of(RowData.class))
            .filter(Objects::nonNull);
    pushRows.sinkTo(dorisSink).name("Doris-" + flumeType);
}
/**
 * Map-based processor: parses each raw JSON record, keeps only records whose
 * flume type matches the configured one, and delegates row construction to the
 * supplied {@link RowMapper}. Mismatched or failing records yield null, which
 * the downstream filter removes.
 */
private static class ElementProcessorWithMap implements MapFunction<String, RowData>, Serializable {

    private static final long serialVersionUID = 1L;

    private final String flumeType;
    private final RowMapper mapper;
    private final int fieldCount;

    public ElementProcessorWithMap(String flumeType, RowMapper mapper, int fieldCount) {
        this.flumeType = flumeType;
        this.mapper = mapper;
        this.fieldCount = fieldCount;
    }

    @Override
    public RowData map(String value) throws Exception {
        try {
            JSONObject record = JSON.parseObject(value);
            String recordType = record.getString(FLUME_TYPE_FIELD);
            if (!flumeType.equals(recordType)) {
                return null; // not ours — filtered out downstream
            }
            return mapper.map(value, fieldCount);
        } catch (Exception e) {
            logger.error("PcEventLogAchi 处理 {} 数据出错 | rawData:{} | error:{}", flumeType, value, e.getMessage(), e);
            return null;
        }
    }
}
private static class ElementPushProcessor implements FlatMapFunction<String, RowData>, Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private final String flumeType;
private final EventPushMapper pushMapper;;
private final int fieldCount;
public ElementPushProcessor(String flumeType, EventPushMapper pushMapper, int fieldCount) {
this.flumeType = flumeType;
this.pushMapper = pushMapper;
this.fieldCount = fieldCount;
}
@Override
public void flatMap(String value, Collector<RowData> out) throws Exception {
try {
JSONObject jsonObj = JSON.parseObject(value);
if (!flumeType.equals(jsonObj.getString(FLUME_TYPE_FIELD))) {
return;
}
processKafkaMessage(value, out);
} catch (Exception e) {
logger.error("PcEventLogAchi 处理 {} 数据出错 | rawData:{} | error:{}", flumeType, value, e.getMessage(), e);
}
}
private void processKafkaMessage(String value, Collector<RowData> out) {
try {
PcOdsEventLog pcOdsEventLog = JSON.parseObject(value, PcOdsEventLog.class);
if (pcOdsEventLog == null) {
return;
}
String event_info = pcOdsEventLog.getEvent_info();
PcEventInfo pcEventInfo = JSONObject.parseObject(event_info, new TypeReference<PcEventInfo>() {});
List<PcProperties> eventList = pcEventInfo.getProperties();
if (CollectionUtils.isEmpty(eventList))
return;
UserProperties userProps = new UserProperties();
userProps.setCid(pcEventInfo.getCid());
userProps.setPhone(pcEventInfo.getPhone());
userProps.setNick(pcEventInfo.getNick());
for (PcProperties eventInfo : eventList) {
if (isTargetEventType(eventInfo.getR4())) {
out.collect(pushMapper.map(pcOdsEventLog,pcEventInfo, eventInfo, userProps, fieldCount));
}
}
} catch (Exception e) {
logger.error("Kafka消息处理失败 | data:{} | error:{}", value, e.getMessage());
}
} }
// 事件类型过滤
private static boolean isTargetEventType(String eventType) {
return CompareUtils.stringExists(eventType, "push_video","push_all_know","push_know");
}
}
private static RowData mapToPcEventRow(Object item, int fieldCount) { private static RowData mapToPcEventRow(Object item, int fieldCount) {
String value = (String) item; String value = (String) item;
PcOdsEventLog pcOdsEventLog = JSONObject.parseObject(value, new TypeReference<PcOdsEventLog>() {}); PcOdsEventLog pcOdsEventLog = JSONObject.parseObject(value, new TypeReference<PcOdsEventLog>() {
});
if (null == pcOdsEventLog) { if (null == pcOdsEventLog) {
return null; return null;
} }
String event_info = pcOdsEventLog.getEvent_info(); String event_info = pcOdsEventLog.getEvent_info();
if(StringUtils.isEmpty(event_info)) { if (StringUtils.isEmpty(event_info)) {
return null; return null;
} }
PcEventInfo pcEventInfo = JSONObject.parseObject(event_info, new TypeReference<PcEventInfo>() {}); PcEventInfo pcEventInfo = JSONObject.parseObject(event_info, new TypeReference<PcEventInfo>() {
});
String id = pcOdsEventLog.getId(); String id = pcOdsEventLog.getId();
String send_time = pcOdsEventLog.getSend_time(); String send_time = pcOdsEventLog.getSend_time();
String create_time = pcOdsEventLog.getCreate_time(); String create_time = pcOdsEventLog.getCreate_time();
...@@ -111,8 +249,8 @@ public class PcEventLogAchi implements Serializable{ ...@@ -111,8 +249,8 @@ public class PcEventLogAchi implements Serializable{
GenericRowData row = new GenericRowData(fieldCount); GenericRowData row = new GenericRowData(fieldCount);
row.setField(0, StringData.fromString(id)); row.setField(0, StringData.fromString(id));
row.setField(1, TimeConvertUtil.convertToSqlDate(create_time.substring(0, 10))); row.setField(1, TimeConvertUtil.convertToSqlDate(create_time.substring(0, 10)));
row.setField(2, TimestampData row.setField(2, TimestampData.fromLocalDateTime(
.fromLocalDateTime(LocalDateTime.parse(send_time, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")))); LocalDateTime.parse(send_time, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(3, TimestampData.fromLocalDateTime( row.setField(3, TimestampData.fromLocalDateTime(
LocalDateTime.parse(create_time, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")))); LocalDateTime.parse(create_time, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(4, StringData.fromString(null)); row.setField(4, StringData.fromString(null));
...@@ -138,46 +276,68 @@ public class PcEventLogAchi implements Serializable{ ...@@ -138,46 +276,68 @@ public class PcEventLogAchi implements Serializable{
} }
private static void processDataStream(DataStreamSource<String> dataStream, String flumeType, private static RowData mapToPushPcEventRow(PcOdsEventLog event,PcEventInfo pcEventInfo, PcProperties eventInfo,UserProperties userProperties, int fieldCount) {
TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper mapper) { String push_event_type = null;
SingleOutputStreamOperator<RowData> processedStream = dataStream Integer pace = null;
.map(new ElementProcessorWithMap(flumeType, mapper, tableConfig.getFields().length)) Double length = null;
.returns(TypeInformation.of(RowData.class)).filter(Objects::nonNull); Long msg_time = null;
processedStream.sinkTo(dorisSink).name("Doris-" + flumeType); String msg_id = null;
String title = null;
String msg_id_list = null;
String type = null;
if (StringUtils.isNotEmpty(eventInfo.getR3())) {
JSONObject jsonObj = JSON.parseObject(eventInfo.getR3());
push_event_type = jsonObj.getString("event_type");
pace = jsonObj.getInteger("pace");
length = jsonObj.getDouble("length");
msg_time = jsonObj.getLong("msg_time");
msg_id = jsonObj.getString("msg_id");
title = jsonObj.getString("title");
msg_id_list = jsonObj.getString("msg_id_list");
type = jsonObj.getString("type");
} }
String device_id = pcEventInfo.getI8();
String unique_id = pcEventInfo.getI7();
String uid = pcEventInfo.getI7();
String route_ip = pcEventInfo.getS1();
/** GenericRowData row = new GenericRowData(fieldCount);
* 使用map算子的内部处理类 row.setField(0, StringData.fromString(event.getId()));
*/ row.setField(1, StringData.fromString(eventInfo.getR4())); // event_type - 需要补充数据来源
private static class ElementProcessorWithMap implements MapFunction<String, RowData>, Serializable { row.setField(2,TimestampData.fromLocalDateTime(LocalDateTime.parse(TimeConvertUtil.parseToStringSSS(eventInfo.getR9()),DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")))); // event_time - 需要补充数据来源
private static final long serialVersionUID = 1L;
private final String flumeType;
private final RowMapper mapper;
private final int fieldCount;
public ElementProcessorWithMap(String flumeType, RowMapper mapper, int fieldCount) { row.setField(3, StringData.fromString(push_event_type)); // push_event_type
this.flumeType = flumeType; row.setField(4, StringData.fromString(msg_id)); // msg_id
this.mapper = mapper; row.setField(5, msg_time == null ? null : TimestampData.fromLocalDateTime(LocalDateTime.parse(TimeConvertUtil.parseToStringSSS(eventInfo.getR9()),DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")))); // msg_time
this.fieldCount = fieldCount; row.setField(6, pace); // pace
} row.setField(7, StringData.fromString(type)); // type
row.setField(8, length); // length
row.setField(9, StringData.fromString(title)); // title
row.setField(10, StringData.fromString(msg_id_list)); // msg_id_list
@Override row.setField(11, StringData.fromString(eventInfo.getR3())); // event_info - 需要补充数据来源
public RowData map(String value) throws Exception { row.setField(12, TimestampData.fromLocalDateTime(LocalDateTime.parse(event.getSend_time(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
try { row.setField(13, TimestampData.fromLocalDateTime(LocalDateTime.parse(event.getCreate_time(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
JSONObject jsonObj = JSON.parseObject(value); row.setField(14, StringData.fromString(null));
if (!flumeType.equals(jsonObj.getString(FLUME_TYPE_FIELD))) { row.setField(15, StringData.fromString(event.getApp_key()));
return null; row.setField(16, StringData.fromString(event.getApp_type()));
} row.setField(17, StringData.fromString(event.getApp_channel()));
return mapper.map(value, fieldCount); row.setField(18, StringData.fromString(event.getZone_code()));
} catch (Exception e) { row.setField(19, StringData.fromString(event.getZone_name()));
logger.error("PcEventLogAchi 处理 {} 数据出错 | rawData:{} | error:{}", flumeType, value, e.getMessage(), e); row.setField(20, StringData.fromString(event.getZone_type()));
return null; row.setField(21, StringData.fromString(null));
} row.setField(22, StringData.fromString(event.getUser_agent()));
} row.setField(23, StringData.fromString(device_id));
row.setField(24, StringData.fromString(uid));
row.setField(25, StringData.fromString(null));
row.setField(26, StringData.fromString(route_ip));
row.setField(27, StringData.fromString(userProperties == null ? null : userProperties.getCid()));
row.setField(28, StringData.fromString(userProperties == null ? null : userProperties.getPhone()));
row.setField(29, StringData.fromString(userProperties == null ? null : userProperties.getNick()));
row.setField(30, StringData.fromString(unique_id));
row.setField(31, DELETE_SIGN_VALUE);
return row;
} }
/** /**
* 行数据映射接口 * 行数据映射接口
* *
...@@ -187,4 +347,9 @@ public class PcEventLogAchi implements Serializable{ ...@@ -187,4 +347,9 @@ public class PcEventLogAchi implements Serializable{
private interface RowMapper extends Serializable { private interface RowMapper extends Serializable {
RowData map(Object item, int fieldCount); RowData map(Object item, int fieldCount);
} }
@FunctionalInterface
private interface EventPushMapper extends Serializable {
RowData map(PcOdsEventLog event,PcEventInfo pcEventInfo, PcProperties eventInfo, UserProperties userProps, int fieldCount);
}
} }
...@@ -130,6 +130,7 @@ public class DorisBaseSchema { ...@@ -130,6 +130,7 @@ public class DorisBaseSchema {
.column("country_code", STRING()) .column("country_code", STRING())
.column("phone_number", STRING()) .column("phone_number", STRING())
.column("create_time", TIMESTAMP(3)) .column("create_time", TIMESTAMP(3))
.column("platform", STRING())
.build(); .build();
createTableDescriptor(tableEnv, schema, tempTableName,dbTableName); createTableDescriptor(tableEnv, schema, tempTableName,dbTableName);
} }
...@@ -141,6 +142,7 @@ public class DorisBaseSchema { ...@@ -141,6 +142,7 @@ public class DorisBaseSchema {
.column("old_nick_name", STRING()) .column("old_nick_name", STRING())
.column("new_nick_name", STRING()) .column("new_nick_name", STRING())
.column("create_time", TIMESTAMP(3)) .column("create_time", TIMESTAMP(3))
.column("platform", STRING())
.build(); .build();
createTableDescriptor(tableEnv, schema, tempTableName,dbTableName); createTableDescriptor(tableEnv, schema, tempTableName,dbTableName);
} }
...@@ -154,6 +156,7 @@ public class DorisBaseSchema { ...@@ -154,6 +156,7 @@ public class DorisBaseSchema {
.column("add_method", STRING()) .column("add_method", STRING())
.column("remark", STRING()) .column("remark", STRING())
.column("create_time", TIMESTAMP(3)) .column("create_time", TIMESTAMP(3))
.column("platform", STRING())
.build(); .build();
createTableDescriptor(tableEnv, schema, tempTableName,dbTableName); createTableDescriptor(tableEnv, schema, tempTableName,dbTableName);
} }
...@@ -167,6 +170,7 @@ public class DorisBaseSchema { ...@@ -167,6 +170,7 @@ public class DorisBaseSchema {
.column("add_method", STRING()) .column("add_method", STRING())
.column("remark", STRING()) .column("remark", STRING())
.column("create_time", TIMESTAMP(3)) .column("create_time", TIMESTAMP(3))
.column("platform", STRING())
.build(); .build();
createTableDescriptor(tableEnv, schema, tempTableName,dbTableName); createTableDescriptor(tableEnv, schema, tempTableName,dbTableName);
} }
...@@ -179,6 +183,7 @@ public class DorisBaseSchema { ...@@ -179,6 +183,7 @@ public class DorisBaseSchema {
.column("old_nick_name", STRING()) .column("old_nick_name", STRING())
.column("new_nick_name", STRING()) .column("new_nick_name", STRING())
.column("create_time", TIMESTAMP(3)) .column("create_time", TIMESTAMP(3))
.column("platform", STRING())
.build(); .build();
createTableDescriptor(tableEnv, schema, tempTableName,dbTableName); createTableDescriptor(tableEnv, schema, tempTableName,dbTableName);
} }
...@@ -191,6 +196,7 @@ public class DorisBaseSchema { ...@@ -191,6 +196,7 @@ public class DorisBaseSchema {
.column("friend_nick_name", STRING()) .column("friend_nick_name", STRING())
.column("remark", STRING()) .column("remark", STRING())
.column("create_time", TIMESTAMP(3)) .column("create_time", TIMESTAMP(3))
.column("platform", STRING())
.build(); .build();
createTableDescriptor(tableEnv, schema, tempTableName,dbTableName); createTableDescriptor(tableEnv, schema, tempTableName,dbTableName);
} }
...@@ -201,6 +207,7 @@ public class DorisBaseSchema { ...@@ -201,6 +207,7 @@ public class DorisBaseSchema {
.column("time", TIMESTAMP(3)) .column("time", TIMESTAMP(3))
.column("group_id", DataTypes.STRING()) .column("group_id", DataTypes.STRING())
.column("create_time", TIMESTAMP(3)) .column("create_time", TIMESTAMP(3))
.column("platform", STRING())
.build(); .build();
createTableDescriptor(tableEnv, schema, tempTableName,dbTableName); createTableDescriptor(tableEnv, schema, tempTableName,dbTableName);
} }
...@@ -213,6 +220,7 @@ public class DorisBaseSchema { ...@@ -213,6 +220,7 @@ public class DorisBaseSchema {
.column("group_name", STRING()) .column("group_name", STRING())
.column("join_type", STRING()) .column("join_type", STRING())
.column("create_time", TIMESTAMP(3)) .column("create_time", TIMESTAMP(3))
.column("platform", STRING())
.build(); .build();
createTableDescriptor(tableEnv, schema, tempTableName,dbTableName); createTableDescriptor(tableEnv, schema, tempTableName,dbTableName);
} }
...@@ -225,6 +233,7 @@ public class DorisBaseSchema { ...@@ -225,6 +233,7 @@ public class DorisBaseSchema {
.column("old_group_name", STRING()) .column("old_group_name", STRING())
.column("new_group_name", STRING()) .column("new_group_name", STRING())
.column("create_time", TIMESTAMP(3)) .column("create_time", TIMESTAMP(3))
.column("platform", STRING())
.build(); .build();
createTableDescriptor(tableEnv, schema, tempTableName,dbTableName); createTableDescriptor(tableEnv, schema, tempTableName,dbTableName);
} }
...@@ -237,6 +246,7 @@ public class DorisBaseSchema { ...@@ -237,6 +246,7 @@ public class DorisBaseSchema {
.column("group_name", STRING()) .column("group_name", STRING())
.column("exit_type", STRING()) .column("exit_type", STRING())
.column("create_time", TIMESTAMP(3)) .column("create_time", TIMESTAMP(3))
.column("platform", STRING())
.build(); .build();
createTableDescriptor(tableEnv, schema, tempTableName,dbTableName); createTableDescriptor(tableEnv, schema, tempTableName,dbTableName);
} }
...@@ -248,6 +258,7 @@ public class DorisBaseSchema { ...@@ -248,6 +258,7 @@ public class DorisBaseSchema {
.column("release_type", STRING()) .column("release_type", STRING())
.column("article_id", STRING()) .column("article_id", STRING())
.column("create_time", TIMESTAMP(3)) .column("create_time", TIMESTAMP(3))
.column("platform", STRING())
.build(); .build();
createTableDescriptor(tableEnv, schema, tempTableName,dbTableName); createTableDescriptor(tableEnv, schema, tempTableName,dbTableName);
} }
...@@ -263,6 +274,7 @@ public class DorisBaseSchema { ...@@ -263,6 +274,7 @@ public class DorisBaseSchema {
.column("comment_id", STRING()) .column("comment_id", STRING())
.column("comment_level", STRING()) .column("comment_level", STRING())
.column("create_time", TIMESTAMP(3)) .column("create_time", TIMESTAMP(3))
.column("platform", STRING())
.build(); .build();
createTableDescriptor(tableEnv, schema, tempTableName,dbTableName); createTableDescriptor(tableEnv, schema, tempTableName,dbTableName);
} }
......
...@@ -22,9 +22,10 @@ public class OpenSimiApiTable { ...@@ -22,9 +22,10 @@ public class OpenSimiApiTable {
" t.`time` AS `time`, " + " t.`time` AS `time`, " +
" t.simi_api_info['countryCode'] AS country_code, " + " t.simi_api_info['countryCode'] AS country_code, " +
" t.simi_api_info['phoneNumber'] AS phone_number, " + " t.simi_api_info['phoneNumber'] AS phone_number, " +
" k.create_time AS create_time " + " k.create_time AS create_time, " +
" t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '1'" "WHERE send_type = '1'"
); );
//2 //2
...@@ -36,9 +37,10 @@ public class OpenSimiApiTable { ...@@ -36,9 +37,10 @@ public class OpenSimiApiTable {
" t.`time` AS `time`, " + " t.`time` AS `time`, " +
" t.simi_api_info['oldNickname'] AS old_nick_name, " + " t.simi_api_info['oldNickname'] AS old_nick_name, " +
" t.simi_api_info['newNickname'] AS new_nick_name, " + " t.simi_api_info['newNickname'] AS new_nick_name, " +
" k.create_time AS create_time " + " k.create_time AS create_time, " +
" t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '2'" "WHERE send_type = '2'"
); );
//3 //3
...@@ -52,9 +54,10 @@ public class OpenSimiApiTable { ...@@ -52,9 +54,10 @@ public class OpenSimiApiTable {
" t.simi_api_info['friendNickname'] AS friend_nick_name, " + " t.simi_api_info['friendNickname'] AS friend_nick_name, " +
" t.simi_api_info['addMethod'] AS add_method, " + " t.simi_api_info['addMethod'] AS add_method, " +
" t.simi_api_info['remark'] AS remark, " + " t.simi_api_info['remark'] AS remark, " +
" k.create_time AS create_time " + " k.create_time AS create_time, " +
" t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '3'" "WHERE send_type = '3'"
); );
//4 //4
...@@ -68,9 +71,10 @@ public class OpenSimiApiTable { ...@@ -68,9 +71,10 @@ public class OpenSimiApiTable {
" t.simi_api_info['friendNickname'] AS friend_nick_name, " + " t.simi_api_info['friendNickname'] AS friend_nick_name, " +
" t.simi_api_info['addMethod'] AS add_method, " + " t.simi_api_info['addMethod'] AS add_method, " +
" t.simi_api_info['remark'] AS remark, " + " t.simi_api_info['remark'] AS remark, " +
" k.create_time AS create_time " + " k.create_time AS create_time, " +
" t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '4'" "WHERE send_type = '4'"
); );
//5 //5
...@@ -83,9 +87,10 @@ public class OpenSimiApiTable { ...@@ -83,9 +87,10 @@ public class OpenSimiApiTable {
" t.simi_api_info['friendCid'] AS friend_cid, " + " t.simi_api_info['friendCid'] AS friend_cid, " +
" t.simi_api_info['oldNickname'] AS old_nick_name, " + " t.simi_api_info['oldNickname'] AS old_nick_name, " +
" t.simi_api_info['newNickname'] AS new_nick_name, " + " t.simi_api_info['newNickname'] AS new_nick_name, " +
" k.create_time AS create_time " + " k.create_time AS create_time, " +
" t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '5'" "WHERE send_type = '5'"
); );
//6 //6
...@@ -98,9 +103,10 @@ public class OpenSimiApiTable { ...@@ -98,9 +103,10 @@ public class OpenSimiApiTable {
" t.simi_api_info['friendCid'] AS friend_cid, " + " t.simi_api_info['friendCid'] AS friend_cid, " +
" t.simi_api_info['friendNickname'] AS friend_nick_name, " + " t.simi_api_info['friendNickname'] AS friend_nick_name, " +
" t.simi_api_info['remark'] AS remark, " + " t.simi_api_info['remark'] AS remark, " +
" k.create_time AS create_time " + " k.create_time AS create_time, " +
" t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '6'" "WHERE send_type = '6'"
); );
//7 //7
...@@ -111,9 +117,10 @@ public class OpenSimiApiTable { ...@@ -111,9 +117,10 @@ public class OpenSimiApiTable {
" t.simi_api_info['cid'] AS cid, " + " t.simi_api_info['cid'] AS cid, " +
" t.`time` AS `time`, " + " t.`time` AS `time`, " +
" t.simi_api_info['groupId'] AS group_id, " + " t.simi_api_info['groupId'] AS group_id, " +
" k.create_time AS create_time " + " k.create_time AS create_time, " +
" t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '7'" "WHERE send_type = '7'"
); );
//8 //8
...@@ -126,9 +133,10 @@ public class OpenSimiApiTable { ...@@ -126,9 +133,10 @@ public class OpenSimiApiTable {
" t.simi_api_info['groupId'] AS group_id, " + " t.simi_api_info['groupId'] AS group_id, " +
" t.simi_api_info['groupName'] AS group_name, " + " t.simi_api_info['groupName'] AS group_name, " +
" t.simi_api_info['joinType'] AS join_type, " + " t.simi_api_info['joinType'] AS join_type, " +
" k.create_time AS create_time " + " k.create_time AS create_time, " +
" t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '8'" "WHERE send_type = '8'"
); );
//9 //9
...@@ -141,9 +149,10 @@ public class OpenSimiApiTable { ...@@ -141,9 +149,10 @@ public class OpenSimiApiTable {
" t.simi_api_info['groupId'] AS group_id, " + " t.simi_api_info['groupId'] AS group_id, " +
" t.simi_api_info['oldGroupName'] AS old_group_name, " + " t.simi_api_info['oldGroupName'] AS old_group_name, " +
" t.simi_api_info['newGroupName'] AS new_group_name, " + " t.simi_api_info['newGroupName'] AS new_group_name, " +
" k.create_time AS create_time " + " k.create_time AS create_time, " +
" t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '9'" "WHERE send_type = '9'"
); );
//10 //10
...@@ -156,9 +165,10 @@ public class OpenSimiApiTable { ...@@ -156,9 +165,10 @@ public class OpenSimiApiTable {
" t.simi_api_info['groupId'] AS group_id, " + " t.simi_api_info['groupId'] AS group_id, " +
" t.simi_api_info['groupName'] AS group_name, " + " t.simi_api_info['groupName'] AS group_name, " +
" t.simi_api_info['exitType'] AS exit_type, " + " t.simi_api_info['exitType'] AS exit_type, " +
" k.create_time AS create_time " + " k.create_time AS create_time, " +
" t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '10'" "WHERE send_type = '10'"
); );
//11 //11
...@@ -170,9 +180,10 @@ public class OpenSimiApiTable { ...@@ -170,9 +180,10 @@ public class OpenSimiApiTable {
" t.`time` AS `time`, " + " t.`time` AS `time`, " +
" t.simi_api_info['releaseType'] AS release_type, " + " t.simi_api_info['releaseType'] AS release_type, " +
" t.simi_api_info['articleId'] AS article_id, " + " t.simi_api_info['articleId'] AS article_id, " +
" k.create_time AS create_time " + " k.create_time AS create_time, " +
" t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '11'" "WHERE send_type = '11'"
); );
//12 //12
...@@ -188,9 +199,10 @@ public class OpenSimiApiTable { ...@@ -188,9 +199,10 @@ public class OpenSimiApiTable {
" t.simi_api_info['commentContent'] AS comment_content, " + " t.simi_api_info['commentContent'] AS comment_content, " +
" t.simi_api_info['commentId'] AS comment_id, " + " t.simi_api_info['commentId'] AS comment_id, " +
" t.simi_api_info['commentLevel'] AS comment_level, " + " t.simi_api_info['commentLevel'] AS comment_level, " +
" k.create_time AS create_time " + " k.create_time AS create_time, " +
" t.simi_api_info['platform'] AS platform " +
"FROM kafka_open_simi_api AS k " + "FROM kafka_open_simi_api AS k " +
"LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type)) AS t(`time`,simi_api_info) ON TRUE " + "LEFT JOIN LATERAL TABLE(ParseOpenSimiApi(req_body,send_type,flume_type)) AS t(`time`,simi_api_info) ON TRUE " +
"WHERE send_type = '12'" "WHERE send_type = '12'"
); );
} }
......
...@@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory; ...@@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference; import com.alibaba.fastjson.TypeReference;
import com.flink.enums.OpenSimiApiTypeEnum; import com.flink.enums.OpenSimiApiTypeEnum;
import com.flink.util.CompareUtils;
import com.flink.vo.simi.ContentInteractionReqDto; import com.flink.vo.simi.ContentInteractionReqDto;
import com.flink.vo.simi.CreateGroupReqDto; import com.flink.vo.simi.CreateGroupReqDto;
import com.flink.vo.simi.DeleteFriendReqDto; import com.flink.vo.simi.DeleteFriendReqDto;
...@@ -55,13 +56,23 @@ public class ParseOpenSimiApiUDTF extends TableFunction<Row>{ ...@@ -55,13 +56,23 @@ public class ParseOpenSimiApiUDTF extends TableFunction<Row>{
return LocalDateTime.parse(timeStr, FORMATTER); return LocalDateTime.parse(timeStr, FORMATTER);
} }
public void eval(String req_body,String send_type) throws ParseException { public void eval(String req_body,String send_type,String flume_type) throws ParseException {
if(StringUtils.isAllEmpty(send_type,req_body)) { if(StringUtils.isAllEmpty(send_type,req_body,flume_type)) {
return; return;
} }
if(!CompareUtils.stringExists(flume_type, "openSimiApi","openAbroadSimiApiTest")) {
return;
}
String platform = null;
if(StringUtils.equals(flume_type, "openSimiApi")) {
platform = "国内版";
}else if(StringUtils.equals(flume_type, "openAbroadSimiApiTest")) {
platform = "海外版";
}
Map<String, Object> params = new HashMap<>(); Map<String, Object> params = new HashMap<>();
LocalDateTime timestamp = null; LocalDateTime timestamp = null;
params.put("platform", platform);
if(StringUtils.equals(send_type, OpenSimiApiTypeEnum.USER_REGISTRATION.getCode())) {//1 if(StringUtils.equals(send_type, OpenSimiApiTypeEnum.USER_REGISTRATION.getCode())) {//1
UserRegistrationReqDto jsonReqDto = JSONObject.parseObject(req_body,new TypeReference<UserRegistrationReqDto>(){}); UserRegistrationReqDto jsonReqDto = JSONObject.parseObject(req_body,new TypeReference<UserRegistrationReqDto>(){});
params.put("id", jsonReqDto.getId()); params.put("id", jsonReqDto.getId());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment