Commit f0508aeb by 魏建枢

增加公共消费基础job

parent 0a7239f6
......@@ -177,6 +177,11 @@
<!-- 根据 JDK 版本选择路径 -->
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>3.4.5</version>
</dependency>
</dependencies>
......
......@@ -48,6 +48,10 @@ public class CommonConsumeBaseAchi extends MultipleSourceCommonBase implements S
DataStreamSource<String> collectLogStreamSource = kafkaDataSource.getDataStreamSource();
CollectLogAchi.collectLog(collectLogStreamSource);
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.OPEN_SIMI_API.getTopic())) {
DataStreamSource<String> oepnSimiApiStreamSource = kafkaDataSource.getDataStreamSource();
OpenSimiApiAchi.openSimiApi(oepnSimiApiStreamSource);
}
}
}else {
return;
......
package com.flink.achieve.base;
import java.io.Serializable;
import java.util.Objects;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.calcite.shaded.org.apache.commons.codec.binary.StringUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.config.TableConfig;
import com.flink.enums.OpenSimiApiTypeEnum;
import com.flink.vo.simi.CreateGroupReqDto;
import com.flink.vo.simi.JoinGroupReqDto;
import com.flink.vo.simi.LeaveGroupReqDto;
/**
* @author wjs
* @version 创建时间:2025-8-7 10:40:09 类说明
*/
public class OpenSimiApiAchi implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(OpenSimiApiAchi.class);
// 定义公共常量
private static final String FLUME_TYPE_FIELD = "flume_type";
private static final String DATA_FIELD = "req_body";
private static final String SEND_TYPE = "send_type";
private static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
private static final int DELETE_SIGN_VALUE = 0;
// 用户加群退群的记录表配置
private static final String[] USERS_JOIN_EXIT_GROUP_FIELDS = { "cid", "group_id", "time",
"operation_type", "behavior_type","group_name", DORIS_DELETE_SIGN };
private static final DataType[] USERS_JOIN_EXIT_GROUP_TYPES = { DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT() };
public static void openSimiApi(DataStreamSource<String> dataStreamSource) {
// 初始化表配置
TableConfig usersJoinExitGroupConfig = new TableConfig(USERS_JOIN_EXIT_GROUP_FIELDS,
USERS_JOIN_EXIT_GROUP_TYPES, "bi.user_join_exit_group_record");
// 创建Doris Sink
DorisSink<RowData> dorisUsersSink = DorisConnector.sinkDoris(usersJoinExitGroupConfig.getFields(),
usersJoinExitGroupConfig.getTypes(), usersJoinExitGroupConfig.getTableName());
// 处理用户数据
processDataStream(dataStreamSource, "openSimiApi", usersJoinExitGroupConfig, dorisUsersSink,
(sendType, item, fieldCount) -> mapToUsersJoinExitGroupRow(sendType, item, fieldCount));
}
/**
* 通用数据处理方法
*
* @param dataStream 数据流
* @param flumeType 数据类型
* @param tableConfig 表配置
* @param dorisSink Doris Sink
* @param mapper 行数据映射函数
*/
private static void processDataStream(DataStreamSource<String> dataStream, String flumeType,
TableConfig tableConfig, DorisSink<RowData> dorisSink, RowMapper mapper) {
SingleOutputStreamOperator<RowData> processedStream = dataStream
.flatMap(new ElementProcessor(flumeType, mapper, tableConfig.getFields().length))
.returns(TypeInformation.of(RowData.class)).filter(Objects::nonNull);
processedStream.sinkTo(dorisSink).name("Doris-" + flumeType);
}
private static class ElementProcessor implements FlatMapFunction<String, RowData>, Serializable {
private static final long serialVersionUID = 1L;
private final String flumeType;
private final RowMapper mapper;
private final int fieldCount;
public ElementProcessor(String flumeType, RowMapper mapper, int fieldCount) {
this.flumeType = flumeType;
this.mapper = mapper;
this.fieldCount = fieldCount;
}
@Override
public void flatMap(String value, Collector<RowData> out) throws Exception {
try {
JSONObject jsonObj = JSON.parseObject(value);
if (!flumeType.equals(jsonObj.getString(FLUME_TYPE_FIELD))) {
return;
}
String reqBody = jsonObj.getString(DATA_FIELD);
String sendType = jsonObj.getString(SEND_TYPE);
Object item = null;
if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.CREATE_GROUP.getCode())) {
item = JSON.parseObject(reqBody, new TypeReference<CreateGroupReqDto>() {
});
} else if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.JOIN_GROUP.getCode())) {
item = JSON.parseObject(reqBody, new TypeReference<JoinGroupReqDto>() {
});
} else if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.LEAVE_GROUP.getCode())) {
item = JSON.parseObject(reqBody, new TypeReference<LeaveGroupReqDto>() {
});
}
if (item != null) {
RowData row = mapper.map(sendType, item, fieldCount);
if (row != null) {
out.collect(row);
}
}
} catch (Exception e) {
logger.error("OpenSimiApiAchi 处理 {} 数据出错 | rawData:{} | error:{}", flumeType, value, e.getMessage(), e);
}
}
}
// 用户群数据映射
private static RowData mapToUsersJoinExitGroupRow(String sendType, Object item, int fieldCount) {
GenericRowData row = new GenericRowData(fieldCount);
if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.CREATE_GROUP.getCode())) {
CreateGroupReqDto createGroup = (CreateGroupReqDto) item;
row.setField(0, StringData.fromString(createGroup.getCid()));
row.setField(1, StringData.fromString(createGroup.getGroupId()));
row.setField(2, StringData.fromString(createGroup.getTime()));
row.setField(3, StringData.fromString("加群"));
row.setField(4, StringData.fromString("邀请加入"));
row.setField(5, null); // 群名称
} else if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.JOIN_GROUP.getCode())) {
JoinGroupReqDto joinGroup = (JoinGroupReqDto) item;
row.setField(0, StringData.fromString(joinGroup.getCid()));
row.setField(1, StringData.fromString(joinGroup.getGroupId()));
row.setField(2, StringData.fromString(joinGroup.getTime()));
row.setField(3, StringData.fromString("加群"));
row.setField(4, StringData.fromString(joinGroup.getJoinType()));
row.setField(5, StringData.fromString(joinGroup.getGroupName()));
} else if (StringUtils.equals(sendType, OpenSimiApiTypeEnum.LEAVE_GROUP.getCode())) {
LeaveGroupReqDto leaveGroup = (LeaveGroupReqDto) item;
row.setField(0, StringData.fromString(leaveGroup.getCid()));
row.setField(1, StringData.fromString(leaveGroup.getGroupId()));
row.setField(2, StringData.fromString(leaveGroup.getTime()));
row.setField(3, StringData.fromString("退群"));
row.setField(4, StringData.fromString(leaveGroup.getExitType()));
row.setField(5, StringData.fromString(leaveGroup.getGroupName()));
}
row.setField(6, DELETE_SIGN_VALUE);
return row;
}
/**
* 行数据映射接口
*
* @param <T> 数据类型
*/
@FunctionalInterface
private interface RowMapper extends Serializable {
RowData map(String sendType, Object item, int fieldCount);
}
}
......@@ -17,7 +17,7 @@ import com.flink.util.LoadPropertiesFile;
/**
* @author wjs
* @version 创建时间:2024-12-16 18:21:22
* 类说明 sink Kafka
* 类说明 sink Doris
*/
public class DorisConnector {
......
......@@ -19,6 +19,7 @@ import java.util.stream.Collectors;
public enum JobTypeEnum {
EVENT_IP_CONVERT("JOB_01", "事件IP转换作业"),
COMMON_CONSUME_BASE("JOB_02", "公共基础消费采集作业"),
USER_DAILY_ACTIVITY("JOB_03", "用户日活作业"),
EVENT_IP_CONVERT_CID("JOB_07", "最新事件IP作业"),
DEVICE_ID_CID("JOB_08", "最新设备ID作业"),
......
......@@ -9,6 +9,7 @@ import com.flink.processor.impl.EventIpLatestProcessor;
import com.flink.processor.impl.RegistrationCheckProcessor;
import com.flink.processor.impl.SimiFriendsProcessor;
import com.flink.processor.impl.SimiGroupstProcessor;
import com.flink.processor.impl.UserDailyActivityProcessor;
import com.flink.processor.impl.VectorAngleCalculationProcessor;
/**
......@@ -22,6 +23,10 @@ public class JobProcessorFactory {
switch (jobType) {
case EVENT_IP_CONVERT:
return new EventIpConvertProcessor();
case COMMON_CONSUME_BASE:
return new CommonConsumeBaseProcessor();
case USER_DAILY_ACTIVITY:
return new UserDailyActivityProcessor();
case EVENT_IP_CONVERT_CID:
return new EventIpLatestProcessor();
case DEVICE_ID_CID:
......@@ -34,8 +39,6 @@ public class JobProcessorFactory {
return new VectorAngleCalculationProcessor();
case REGISTRATION_CHECK:
return new RegistrationCheckProcessor();
case COMMON_CONSUME_BASE:
return new CommonConsumeBaseProcessor();
default:
throw new IllegalArgumentException("未知的Job类型: " + jobType);
}
......
......@@ -30,7 +30,8 @@ public class CommonConsumeBaseProcessor implements JobProcessor{
TopicTypeEnum.ODS_USER_INVITATION,
TopicTypeEnum.ODS_EVENT_LOG,
TopicTypeEnum.ODS_COMMUNITY_HISTORY,
TopicTypeEnum.ODS_NEW_COLLECT_LOG
TopicTypeEnum.ODS_NEW_COLLECT_LOG,
TopicTypeEnum.OPEN_SIMI_API
}).map(TopicTypeEnum::createKafkaTopic)
.collect(Collectors.toList());
......
package com.flink.processor.impl;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import com.flink.achieve.doris.UserDailyActivityAchi;
import com.flink.enums.JobTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.JobProcessor;
import com.flink.vo.KafkaTopic;
/**
* @author wjs
* @version 创建时间:2025-7-31 11:03:25
* 类说明
*/
public class UserDailyActivityProcessor implements JobProcessor{
@Override
public void process() throws Exception {
new UserDailyActivityAchi().handleDataStreamSource(
createTopicList(),
JobTypeEnum.USER_DAILY_ACTIVITY
);
}
private static List<KafkaTopic> createTopicList() {
return Arrays.stream(new TopicTypeEnum[]{
TopicTypeEnum.ODS_NEW_COLLECT_LOG,
TopicTypeEnum.ODS_EVENT_LOG,
TopicTypeEnum.ODS_PC_EVENT_LOG,
TopicTypeEnum.ODS_PC_COLLECT_LOG,
TopicTypeEnum.SIMI_USER_LIST_TOPIC,
TopicTypeEnum.ABROAD_SIMI_USER_LIST_TOPIC
}).map(TopicTypeEnum::createKafkaTopic)
.collect(Collectors.toList());
}
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-8-1 16:17:44
* 类说明
*/
@Data
@ToString
public class CombinedLog implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String deviceIdV1;
private String appKey;
private String appType;
private String dt;
private String model;
private String brand;
private String osRelease;
private String appVersion;
private String cid;
private String phone;
private String nick;
private String ip;
private String areaName;
private long waterMarkTime;
private String zoneName;
private String zoneType;
private String zoneCode;
private long firstTime;
private long latestTime;
public CombinedLog(String deviceIdV1, String appKey, String appType, String dt, String model, String brand,
String osRelease, String appVersion, String cid, String phone, String nick, String ip, String areaName,
long waterMarkTime, String zoneName, String zoneType, String zoneCode, long firstTime, long latestTime) {
super();
this.deviceIdV1 = deviceIdV1;
this.appKey = appKey;
this.appType = appType;
this.dt = dt;
this.model = model;
this.brand = brand;
this.osRelease = osRelease;
this.appVersion = appVersion;
this.cid = cid;
this.phone = phone;
this.nick = nick;
this.ip = ip;
this.areaName = areaName;
this.waterMarkTime = waterMarkTime;
this.zoneName = zoneName;
this.zoneType = zoneType;
this.zoneCode = zoneCode;
this.firstTime = firstTime;
this.latestTime = latestTime;
}
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-7-31 16:20:48
* 类说明
*/
@Data
@ToString
public class DeviceLog implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String deviceId;
private String deviceIdV1;
private String appKey;
private String uniqueId;
private String appType;
private String dt;
private String model;
private String brand;
private String osRelease;
private String appVersion;
private Long waterMarkTime;
private String zoneName;
private String zoneType;
private String zoneCode;
public DeviceLog(
String deviceId,
String deviceIdV1,
String appKey,
String uniqueId,
String appType,
String dt,
String model,
String brand,
String osRelease,
String appVersion,
Long waterMarkTime,
String zoneName,
String zoneType,
String zoneCode
) {
this.deviceId = deviceId;
this.deviceIdV1 = deviceIdV1;
this.appKey = appKey;
this.uniqueId = uniqueId;
this.appType = appType;
this.dt = dt;
this.model = model;
this.brand = brand;
this.osRelease = osRelease;
this.appVersion = appVersion;
this.waterMarkTime = waterMarkTime;
this.zoneName = zoneName;
this.zoneType = zoneType;
this.zoneCode = zoneCode;
}
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-8-1 16:24:00
* 类说明
*/
@Data
@ToString(callSuper = true)
public class EnrichedLog extends CombinedLog implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String phoneName;
private String networkModel;
/**
* 核心构造方法:基于原始日志对象和维度数据创建增强日志
* @param baseLog 原始日志对象
* @param phoneName 品牌手机名称(可空)
* @param networkModel 入网型号(可空)
*/
public EnrichedLog(CombinedLog baseLog, String phoneName, String networkModel) {
// 调用父类构造方法初始化基础字段
super(
baseLog.getDeviceIdV1(), baseLog.getAppKey(), baseLog.getAppType(),
baseLog.getDt(), baseLog.getModel(), baseLog.getBrand(),
baseLog.getOsRelease(), baseLog.getAppVersion(), baseLog.getCid(),
baseLog.getPhone(), baseLog.getNick(), baseLog.getIp(),
baseLog.getAreaName(), baseLog.getWaterMarkTime(),
baseLog.getZoneName(), baseLog.getZoneType(), baseLog.getZoneCode(),
baseLog.getFirstTime(), baseLog.getLatestTime()
);
this.phoneName = phoneName;
this.networkModel = networkModel;
}
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-8-1 15:07:12 类说明
*/
@Data
@ToString
public class EventLog implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private String deviceId;
private String uniqueId;
private String cid;
private String phone;
private String nick;
private String ip;
private String areaName;
private String eventTime;
private Long waterMarkTime;
private String appKey;
private String appType;
private String createTime;
private String dt;
public EventLog(String deviceId,String uniqueId, String cid, String phone, String nick, String ip, String areaName, String eventTime,Long waterMarkTime,
String appKey, String appType, String createTime,String dt) {
this.deviceId = deviceId;
this.uniqueId = uniqueId;
this.cid = cid;
this.phone = phone;
this.nick = nick;
this.ip = ip;
this.areaName = areaName;
this.eventTime = eventTime;
this.waterMarkTime = waterMarkTime;
this.appKey = appKey;
this.appType = appType;
this.createTime = createTime;
this.dt = dt;
}
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-8-1 18:17:36
* 类说明
*/
@Data
@ToString
public class UserDailyActivityOutputLog implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String deviceId;
private String cid;
private String appKey;
private String platform;
private String appType;
private String dt;
private String countryCode;
private String phone;
private String nick;
private String brand;
private String model;
private String osRelease;
private String appVersion;
private String ip;
private String areaName;
private String networkIp;
private String networkAreaName;
private Long firstTime;
private Long latestTime;
private String phoneName;
private String networkModel;
private String deviceName;
private String zoneName;
private String zoneType;
private String zoneCode;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment