Commit 9098a04c by 魏建枢

增加作业

parent d894f28f
Showing with 1510 additions and 118 deletions
package com.flink.achieve.doris;
import java.io.Serializable;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
......@@ -24,17 +21,12 @@ import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.SourceCommonBase;
import com.flink.enums.AppTypeEnum;
import com.flink.util.GenDeviceIdV1;
import com.flink.processor.function.GenDeviceIdProcessor;
import com.flink.processor.function.UserPropertiesProcessor;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.CollectLog;
import com.flink.vo.DeviceIdInfo;
import com.flink.vo.UserProperties;
import com.flink.vo.android.AndroidCollectionBody;
import com.flink.vo.android.deviceInfo.AndroidA1;
import com.flink.vo.android.envInfo.AndroidEnvInfo;
import com.flink.vo.android.otherInfo.OtherInfo;
import com.flink.vo.ios.IosCollectionBody;
import com.flink.vo.ios.IosDeviceInfo;
import com.flink.vo.ios.IosEnvInfo;
/**
* @author wjs
......@@ -110,58 +102,24 @@ public class CollectLogAchi extends SourceCommonBase implements Serializable{
String device_info = log.getDeviceInfo();
String env_info = log.getEnvInfo();
String createTime = log.getCreateTime();
String genDeviceId = genDeviceId(appType, other_info, device_info, env_info);
String idfv = null;
String device_id_v1 = null;
if(StringUtils.equals(appType, AppTypeEnum.ANDROID.getCode())) {
device_id_v1 = genDeviceId;
if(StringUtils.isEmpty(appKey) || StringUtils.equals(appKey, "C7jias27jias2")) {
appKey = "8ooOvXJo276";
}
}else if(StringUtils.equals(appType, AppTypeEnum.IOS.getCode())) {
String[] genDeviceIdList = genDeviceId.split(",");
device_id_v1 = (genDeviceIdList.length > 0) ? genDeviceIdList[0] : "";
idfv = (genDeviceIdList.length > 1) ? genDeviceIdList[1] : "";
}
String userProperties = log.getUserProperties();
String cid = null;
String phone = null;
String nick = null;
if(StringUtils.isNotEmpty(userProperties)) {
List<UserProperties> userPropertiesList = JSONObject.parseObject(userProperties,new TypeReference<List<UserProperties>>(){});
if(userPropertiesList != null && userPropertiesList.size() > 0) {
for(UserProperties user : userPropertiesList) {
if(StringUtils.isNotEmpty(user.getCid())) {
cid = user.getCid();
}else if(StringUtils.isNotEmpty(user.getPhone())) {
phone = user.getPhone();
}else if(StringUtils.isNotEmpty(user.getId())) {
cid = user.getId();
}else if(StringUtils.isNotEmpty(user.getNick())) {
nick = user.getNick();
}else if(StringUtils.isNotEmpty(user.getEmail())) {
nick = user.getEmail();
}
}
}
}
DeviceIdInfo deviceIdInfo = GenDeviceIdProcessor.genDeviceId(appType,appKey, other_info, device_info, env_info);
UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(log.getUserProperties());
// 转换为RowData
GenericRowData row = new GenericRowData(fields.length);
row.setField(0, StringData.fromString(log.getId())); // id
row.setField(1, convertToSqlDate(createTime.substring(0, 10))); // dt
row.setField(1, TimeConvertUtil.convertToSqlDate(createTime.substring(0, 10))); // dt
row.setField(2, StringData.fromString(log.getDeviceId())); // device_id
row.setField(3, StringData.fromString(device_id_v1)); // device_id_v1
row.setField(3, StringData.fromString(deviceIdInfo==null ? null : deviceIdInfo.getDeviceIdV1())); // device_id_v1
row.setField(4, StringData.fromString(log.getUid())); // uid
row.setField(5, StringData.fromString(idfv)); // idfv
row.setField(5, StringData.fromString(deviceIdInfo==null ? null : deviceIdInfo.getIdfv())); // idfv
row.setField(6, StringData.fromString(log.getAppKey())); // app_key
row.setField(7, StringData.fromString(log.getAppType())); // app_type
row.setField(8, StringData.fromString(other_info)); // other_info
row.setField(9, StringData.fromString(device_info)); // device_info
row.setField(10, StringData.fromString(env_info)); // env_info
row.setField(11, StringData.fromString(cid)); // cid
row.setField(12, StringData.fromString(phone)); // phone
row.setField(13, StringData.fromString(nick)); // nick
row.setField(11, StringData.fromString(userProperties==null ? null : userProperties.getCid())); // cid
row.setField(12, StringData.fromString(userProperties==null ? null : userProperties.getPhone())); // phone
row.setField(13, StringData.fromString(userProperties==null ? null : userProperties.getNick())); // nick
row.setField(14, StringData.fromString(log.getUniqueId())); // unique_id
row.setField(15, TimestampData.fromLocalDateTime(LocalDateTime.parse(createTime,DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")))); //
row.setField(16, 0); // __DORIS_DELETE_SIGN__
......@@ -177,33 +135,6 @@ public class CollectLogAchi extends SourceCommonBase implements Serializable{
.name("Doris-CollectLog");
}
private static int convertToSqlDate(String datetime) {
return (int) LocalDate.parse(datetime.split(" ")[0])
.toEpochDay(); // 转换为天数偏移量
}
private String genDeviceId(String appType,String other_info,String device_info,String env_info) {
String deviceId = null;
if(StringUtils.equals(appType, AppTypeEnum.ANDROID.getCode())) {
AndroidA1 a1 = JSONObject.parseObject(device_info,new TypeReference<AndroidA1>(){});
AndroidEnvInfo g1 = JSONObject.parseObject(env_info,new TypeReference<AndroidEnvInfo>(){});
OtherInfo i1 = JSONObject.parseObject(other_info,new TypeReference<OtherInfo>(){});
AndroidCollectionBody androidBodyObj = new AndroidCollectionBody();
androidBodyObj.setA1(a1);
androidBodyObj.setG1(g1);
androidBodyObj.setI1(i1);
deviceId = GenDeviceIdV1.genAndroidDeviceIdHashV1(androidBodyObj);
}else if(StringUtils.equals(appType, AppTypeEnum.IOS.getCode())) {
IosDeviceInfo a1 = JSONObject.parseObject(device_info,new TypeReference<IosDeviceInfo>(){});
IosEnvInfo g1 = JSONObject.parseObject(env_info,new TypeReference<IosEnvInfo>(){});
IosCollectionBody iosBodyObj = new IosCollectionBody();
iosBodyObj.setA1(a1);
iosBodyObj.setG1(g1);
deviceId = GenDeviceIdV1.genIosDeviceIdHash(iosBodyObj);
}
return deviceId;
}
@Override
public void sendToSinkKafka(DataStreamSource<String> mStream) {
// TODO Auto-generated method stub
......
package com.flink.achieve.doris;
import java.io.Serializable;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Objects;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.MultipleSourceCommonBase;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.function.CollectLogJoinProcessor;
import com.flink.processor.function.GenDeviceIdProcessor;
import com.flink.processor.function.LatestUserProcessFunction;
import com.flink.processor.function.UserPropertiesProcessor;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.CollectLog;
import com.flink.vo.DeviceId;
import com.flink.vo.DeviceIdInfo;
import com.flink.vo.KafkaDataSource;
import com.flink.vo.Result;
import com.flink.vo.SimiUserInfo;
import com.flink.vo.UserProperties;
/**
* @author wjs
* @version 创建时间:2025-5-28 10:44:56
* 类说明
*/
public class DeviceIdLatestAchi extends MultipleSourceCommonBase implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(DeviceIdLatestAchi.class);
@Override
public void parseSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
//=================配置入库字段=========================================
String[] fields = {
"cid",
"phone",
"app_key",
"device_id",
"app_type",
"country_code",
"user_state",
"nick",
"create_time",
"__DORIS_DELETE_SIGN__"
};
DataType[] types = {
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.TIMESTAMP(3),
DataTypes.INT()
};
//=================流式处理=========================================
String tableName = "bi.device_id_latest";
DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);
//=================数据处理流水线=========================================
operatorStream(dataSourceList).map(new MapFunction<Result, RowData>(){
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public RowData map(Result result) throws Exception {
if(null == result) {
return null;
}
GenericRowData row = new GenericRowData(fields.length);
row.setField(0, StringData.fromString(result.getCid()));
row.setField(1, StringData.fromString(result.getPhone()));
row.setField(2, StringData.fromString(result.getAppKey()));
row.setField(3, StringData.fromString(result.getDeviceId()));
row.setField(4, StringData.fromString(result.getAppType()));
row.setField(5, StringData.fromString(result.getCountryCode()));
row.setField(6, StringData.fromString(result.getUserState()));
row.setField(7, StringData.fromString(result.getNick()));
row.setField(8, TimestampData.fromLocalDateTime(LocalDateTime.parse(result.getCreateTime(),DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))));
row.setField(9, 0); // __DORIS_DELETE_SIGN__
return row;
}
})
.filter(Objects::nonNull)
// .print()
.sinkTo(dorisSink)
.name("Doris-DeviceIdLatest");
}
private DataStream<Result> operatorStream(List<KafkaDataSource> dataSourceList) {
DataStreamSource<String> userStreamSource = null;
DataStreamSource<String> collectLogStreamSource = null;
if(CollectionUtils.isNotEmpty(dataSourceList)) {
for(KafkaDataSource kafkaDataSource : dataSourceList) {
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.SIMI_USER_LIST_TOPIC.getTopic())) {
userStreamSource = kafkaDataSource.getDataStreamSource();
}
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_NEW_COLLECT_LOG.getTopic())) {
collectLogStreamSource = kafkaDataSource.getDataStreamSource();
}
}
}else {
return null;
}
// 用户数据流处理(5分钟批量更新)
DataStream<SimiUserInfo> userDataStream = userStreamSource.flatMap(new FlatMapFunction<String, SimiUserInfo>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<SimiUserInfo> out) throws Exception {
try {
// 解析 Kafka 数据
SimiUserInfo simiUserInfo = JSONObject.parseObject(value, new TypeReference<SimiUserInfo>() {
});
simiUserInfo.setUpdateTime(TimeConvertUtil.convertToTimestamp(simiUserInfo.getCreate_time()));
out.collect(simiUserInfo);
} catch (Exception e) {
logger.error("Error parsing simi_user_list 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
})
.filter(u -> StringUtils.isNoneEmpty(u.getCid(), u.getPhone_number()))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<SimiUserInfo>forBoundedOutOfOrderness(Duration.ofMinutes(5))
.withTimestampAssigner((user, ts) -> user.getUpdateTime()))
.keyBy(user -> user.getCid() + "#_#" + user.getPhone_number()).process(new LatestUserProcessFunction());
// 事件数据流处理
DataStream<DeviceId> deviceDataStream = collectLogStreamSource.flatMap(new FlatMapFunction<String, DeviceId>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<DeviceId> out) throws Exception {
try {
// 解析 Kafka 数据
DeviceId device = handleData(value);
if (device != null)
out.collect(device);
} catch (Exception e) {
logger.error("Error parsing ods_new_collect_log 处理 Kafka 消息出错 | data:{} | error:{}", value,e.getMessage());
}
}
private DeviceId handleData(String value) throws Exception {
CollectLog log = JSONObject.parseObject(value, new TypeReference<CollectLog>(){});
if(null == log) {
return null;
}
String appType = log.getAppType();
String appKey = log.getAppKey();
String other_info = log.getOtherInfo();
String device_info = log.getDeviceInfo();
String env_info = log.getEnvInfo();
String createTime = log.getCreateTime();
DeviceIdInfo deviceIdInfo = GenDeviceIdProcessor.genDeviceId(appType,appKey, other_info, device_info, env_info);
UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(log.getUserProperties());
DeviceId deviceId = new DeviceId();
deviceId.setDeviceId(deviceIdInfo == null ? null : deviceIdInfo.getDeviceIdV1());
deviceId.setCid(userProperties == null ? null : userProperties.getCid());
deviceId.setPhone(userProperties == null ? null : userProperties.getPhone());
deviceId.setNick(userProperties == null ? null : userProperties.getNick());
deviceId.setAppKey(appKey);
deviceId.setAppType(appType);
deviceId.setCreateTime(createTime);
deviceId.setCollectTime(TimeConvertUtil.convertToTimestamp(createTime));
return deviceId;
}
})
.assignTimestampsAndWatermarks(WatermarkStrategy.<DeviceId>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((device, ts) -> device.getCollectTime()));
return deviceDataStream.connect(userDataStream)
.keyBy(device -> device.getCid() + "#_#" + device.getPhone(),
user -> user.getCid() + "#_#" + user.getPhone_number())
.process(new CollectLogJoinProcessor() {
/**
*
*/
private static final long serialVersionUID = 1L;
});
}
}
......@@ -16,13 +16,11 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
......@@ -38,6 +36,8 @@ import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.MultipleSourceCommonBase;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.function.LatestUserProcessFunction;
import com.flink.util.CompareUtils;
import com.flink.util.TimeConvertUtil;
import com.flink.util.ip2region.SearcherUtil;
......@@ -137,10 +137,10 @@ public class EventIpLatestAchi extends MultipleSourceCommonBase implements Seria
DataStreamSource<String> eventStreamSource = null;
if(CollectionUtils.isNotEmpty(dataSourceList)) {
for(KafkaDataSource kafkaDataSource : dataSourceList) {
if(StringUtils.equals(kafkaDataSource.getTopic(),"simi_user_list")) {
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.SIMI_USER_LIST_TOPIC.getTopic())) {
userStreamSource = kafkaDataSource.getDataStreamSource();
}
if(StringUtils.equals(kafkaDataSource.getTopic(),"ods_event_log")) {
if(StringUtils.equals(kafkaDataSource.getTopic(),TopicTypeEnum.ODS_EVENT_LOG.getTopic())) {
eventStreamSource = kafkaDataSource.getDataStreamSource();
}
}
......@@ -262,38 +262,6 @@ public class EventIpLatestAchi extends MultipleSourceCommonBase implements Seria
}
}
// 使用状态维护每个用户的最新记录
private static class LatestUserProcessFunction extends KeyedProcessFunction<String, SimiUserInfo, SimiUserInfo> {
/**
*
*/
private static final long serialVersionUID = 1L;
private ValueState<SimiUserInfo> latestUserState;
@Override
public void open(Configuration parameters) {
// 初始化用户状态
ValueStateDescriptor<SimiUserInfo> descriptor = new ValueStateDescriptor<>("user-state",
SimiUserInfo.class);
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofMinutes(30))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.cleanupIncrementally(1000, true)//Heap StateBackend 使用增量清理
// .cleanupInRocksdbCompactFilter(1000) //RocksDB StateBackend 使用压缩清理
.build();
descriptor.enableTimeToLive(ttlConfig);
latestUserState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(SimiUserInfo user, Context ctx, Collector<SimiUserInfo> out) throws Exception {
SimiUserInfo currentLatest = latestUserState.value();
if (currentLatest == null || user.getUpdateTime() > currentLatest.getUpdateTime()) {
latestUserState.update(user);
out.collect(user);
}
}
}
public static EventIp handleData(String record) throws ParseException, Exception {
// TODO 数据的 ETL 处理
OdsEventLog odsEventLog = JSONObject.parseObject(record, new TypeReference<OdsEventLog>() {});
......
package com.flink.achieve.doris;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.SourceCommonBase;
import com.flink.enums.OpenSimiApiTypeEnum;
import com.flink.processor.impl.OkHttpService;
import com.flink.vo.simi.InitiateFriendRequestReqDto;
import com.flink.vo.simi.SimiFriends;
/**
* @author wjs
* @version 创建时间:2025-5-29 10:53:48
* 类说明
*/
public class SimiFriendsAchi extends SourceCommonBase implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(SimiFriendsAchi.class);
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
//=================配置入库字段=========================================
String[] fields = {
"cid",
"friend_cid",
"nick",
"add_time",
"third_id",
"remarks",
"__DORIS_DELETE_SIGN__"
};
DataType[] types = {
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.INT()
};
//=================流式处理=========================================
String tableName = "bi.simi_friends";
DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);
SingleOutputStreamOperator<RowData> rowDataStream = dataStreamSource.flatMap(
new FlatMapFunction<String, RowData>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<RowData> out) throws Exception {
try {
JSONObject jsonObj = JSON.parseObject(value);
String sendType = jsonObj.getString("send_type");
if(!StringUtils.equals(OpenSimiApiTypeEnum.INITIATE_FRIEND_REQUEST.getCode(), sendType)) {
return;
}
String reqBody = jsonObj.getString("req_body");
InitiateFriendRequestReqDto jsonReqDto = JSONObject.parseObject(reqBody,new TypeReference<InitiateFriendRequestReqDto>(){});
List<String> cidList = Arrays.asList(jsonReqDto.getCid(), jsonReqDto.getFriendCid());
for(String cid : cidList) {
String friendsInfo = OkHttpService.friends(cid);
if(StringUtils.isNotEmpty(friendsInfo)) {
List<SimiFriends> friendsList = JSONObject.parseObject(friendsInfo,new TypeReference<List<SimiFriends>>(){});
for (SimiFriends friend : friendsList) {
GenericRowData row = new GenericRowData(fields.length);
row.setField(0, StringData.fromString(cid));
row.setField(1, StringData.fromString(friend.getCid()));
row.setField(2, StringData.fromString(friend.getNick()));
row.setField(3, StringData.fromString(friend.getAddTime()));
row.setField(4, StringData.fromString(friend.getThirdId()));
row.setField(5, StringData.fromString(friend.getRemarks()));
row.setField(6, 0);
out.collect(row);
}
}
}
} catch (Exception e) {
logger.error("SimiFriendsAchi 处理 Kafka 消息出错 | rawData:{} | error:{}", value, e.getMessage());
}
}
});
rowDataStream
.filter(Objects::nonNull)
// .print()
.sinkTo(dorisSink)
.name("Doris-SimiFriends");
}
@Override
public void sendToSinkKafka(DataStreamSource<String> mStream) {
// TODO Auto-generated method stub
}
}
package com.flink.achieve.doris;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.SourceCommonBase;
import com.flink.enums.OpenSimiApiTypeEnum;
import com.flink.processor.impl.OkHttpService;
import com.flink.vo.simi.InitiateFriendRequestReqDto;
import com.flink.vo.simi.SimiGroups;
/**
* @author wjs
* @version 创建时间:2025-5-29 10:53:34
* 类说明
*/
public class SimiGroupstAchi extends SourceCommonBase implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(SimiGroupstAchi.class);
@Override
public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
//=================配置入库字段=========================================
String[] fields = {
"cid",
"group_id",
"group_name",
"add_time",
"__DORIS_DELETE_SIGN__"
};
DataType[] types = {
DataTypes.STRING(),
DataTypes.BIGINT(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.INT()
};
//=================流式处理=========================================
String tableName = "bi.simi_groups";
DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(fields, types, tableName);
SingleOutputStreamOperator<RowData> rowDataStream = dataStreamSource.flatMap(
new FlatMapFunction<String, RowData>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<RowData> out) throws Exception {
try {
JSONObject jsonObj = JSON.parseObject(value);
String sendType = jsonObj.getString("send_type");
if(!StringUtils.equals(OpenSimiApiTypeEnum.INITIATE_FRIEND_REQUEST.getCode(), sendType)) {
return;
}
// 2. 解析请求体
String reqBody = jsonObj.getString("req_body");
InitiateFriendRequestReqDto jsonReqDto = JSONObject.parseObject(reqBody,new TypeReference<InitiateFriendRequestReqDto>(){});
List<String> cidList = Arrays.asList(jsonReqDto.getCid(), jsonReqDto.getFriendCid());
for(String cid : cidList) {
String groupsInfo = OkHttpService.groups(cid);
if(StringUtils.isNotEmpty(groupsInfo)) {
List<SimiGroups> groupsList = JSONObject.parseObject(groupsInfo,new TypeReference<List<SimiGroups>>(){});
for (SimiGroups group : groupsList) {
GenericRowData row = new GenericRowData(fields.length);
row.setField(0, StringData.fromString(cid));
row.setField(1, group.getGroupId().longValue());
row.setField(2, StringData.fromString(group.getGroupName()));
row.setField(3, StringData.fromString(group.getAddTime()));
row.setField(4, 0);
out.collect(row);
}
}
}
} catch (Exception e) {
logger.error("SimiGroupsAchi 处理 Kafka 消息出错 | rawData:{} | error:{}", value, e.getMessage(), e);
}
}
});
rowDataStream
.filter(Objects::nonNull)
.sinkTo(dorisSink)
.name("Doris-SimiGroups");
}
@Override
public void sendToSinkKafka(DataStreamSource<String> mStream) {
// TODO Auto-generated method stub
}
}
......@@ -24,6 +24,9 @@ public enum JobTypeEnum {
REAL_BALANCE("JOB_05", "真实余额作业"),
COLLECT_LOG("JOB_06", "日志采集作业"),
EVENT_IP_CONVERT_CID("JOB_07", "最新事件IP作业"),
DEVICE_ID_CID("JOB_08", "最新设备ID作业"),
SIMI_FRIENDS("JOB_09", "SIMI好友作业"),
SIMI_GROUPS("JOB_10", "SIMI群组作业"),
;
......
package com.flink.enums;
/**
* @author wjs
* @version 创建时间:2025-5-29 10:37:10
* 类说明
*/
public enum OpenSimiApiTypeEnum {
/**
* 1、用户注册
*/
USER_REGISTRATION("1", "用户注册"),
/**
* 2、修改昵称
*/
UPDATE_NICKNAME("2", "修改昵称"),
/**
* 3、添加发起添加好友申请
*/
INITIATE_FRIEND_REQUEST("3", "添加发起添加好友申请"),
/**
* 4、成功添加好友
*/
FRIEND_REQUEST_ACCEPTED("4", "成功添加好友"),
/**
* 5、修改好友昵称
*/
UPDATE_FRIEND_NICKNAME("5", "修改好友昵称"),
/**
* 6、删除好友
*/
DELETE_FRIEND("6", "删除好友"),
/**
* 7、创建群组
*/
CREATE_GROUP("7", "创建群组"),
/**
* 8、加入群组
*/
JOIN_GROUP("8", "加入群组"),
/**
* 9、修改群组名称
*/
UPDATE_GROUP_NAME("9", "修改群组名称"),
/**
* 10、退出群组
*/
LEAVE_GROUP("10", "退出群组"),
/**
* 11、发布博客/文章
*/
PUBLISH_CONTENT("11", "发布博客/文章"),
/**
* 12、博客/文章点赞转发评论
*/
CONTENT_INTERACTION("12", "博客/文章点赞转发评论"),
;
private String code;
private String name;
private OpenSimiApiTypeEnum(String code, String name) {
this.code = code;
this.name = name;
}
public String getCode() {
return code;
}
public String getName() {
return name;
}
}
......@@ -25,6 +25,7 @@ public enum TopicTypeEnum {
ODS_EVENT_IP_CONVERT("ods_event_ip_convert","odsEventIpConvert"),
ODS_USER_INVITATION("ods_user_invitation","odsUserInvitation"),
SIMI_USER_LIST_TOPIC("simi_user_list","simiUserList"),
OPEN_SIMI_API("ods_open_simi_api","odsOpenSimiApi"),
;
private String topic;
......
......@@ -3,12 +3,15 @@ package com.flink.factory;
import com.flink.enums.JobTypeEnum;
import com.flink.processor.JobProcessor;
import com.flink.processor.impl.CollectLogProcessor;
import com.flink.processor.impl.EventIpLatestProcessor;
import com.flink.processor.impl.DeviceIdLatestProcessor;
import com.flink.processor.impl.EventIpConvertProcessor;
import com.flink.processor.impl.EventIpLatestProcessor;
import com.flink.processor.impl.RealBalanceProcessor;
import com.flink.processor.impl.RealKycProcessor;
import com.flink.processor.impl.RealTransactionProcessor;
import com.flink.processor.impl.RealUsersProcessor;
import com.flink.processor.impl.SimiFriendsProcessor;
import com.flink.processor.impl.SimiGroupstProcessor;
/**
* @author wjs
......@@ -33,6 +36,12 @@ public class JobProcessorFactory {
return new CollectLogProcessor();
case EVENT_IP_CONVERT_CID:
return new EventIpLatestProcessor();
case DEVICE_ID_CID:
return new DeviceIdLatestProcessor();
case SIMI_FRIENDS:
return new SimiFriendsProcessor();
case SIMI_GROUPS:
return new SimiGroupstProcessor();
default:
throw new IllegalArgumentException("未知的Job类型: " + jobType);
}
......
package com.flink.processor.function;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import com.flink.vo.DeviceId;
import com.flink.vo.Result;
import com.flink.vo.SimiUserInfo;
/**
* @author wjs
* @version 创建时间:2025-5-28 11:05:33
* 类说明
*/
public class CollectLogJoinProcessor extends CoProcessFunction<DeviceId, SimiUserInfo, Result> {
/**
*
*/
private static final long serialVersionUID = 1L;
private ValueState<SimiUserInfo> userState;
private MapState<Long, DeviceId> pendingDevices;
@Override
public void open(Configuration parameters) {
userState = getRuntimeContext().getState(new ValueStateDescriptor<>("user-state", SimiUserInfo.class));
MapStateDescriptor<Long, DeviceId> devicesDesc = new MapStateDescriptor<>("pendingDevices", Long.class,
DeviceId.class);
pendingDevices = getRuntimeContext().getMapState(devicesDesc);
}
@Override
public void processElement1(DeviceId device, CoProcessFunction<DeviceId, SimiUserInfo, Result>.Context ctx,
Collector<Result> out) throws Exception {
SimiUserInfo user = userState.value();
if (user != null) {
out.collect(buildResult(user, device));
} else {
pendingDevices.put(device.getCollectTime(), device);
ctx.timerService().registerEventTimeTimer(device.getCollectTime() + 60000); // 1分钟超时
}
}
@Override
public void processElement2(SimiUserInfo user, Context ctx, Collector<Result> out) throws Exception {
userState.update(user); // 更新最新用户状态
for (DeviceId device : pendingDevices.values()) {
out.collect(buildResult(user, device));
}
pendingDevices.clear();
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
pendingDevices.remove(timestamp - 60000); // 清理超时事件
}
private Result buildResult(SimiUserInfo user, DeviceId device) {
Result result = new Result();
result.setCid(user.getCid());
result.setPhone(user.getPhone_number());
result.setCountryCode(user.getCountry_code());
result.setUserState(user.getUser_state());
result.setDeviceId(device.getDeviceId());
result.setNick(device.getNick());
result.setAppKey(device.getAppKey());
result.setAppType(device.getAppType());
result.setCreateTime(device.getCreateTime());
return result;
}
}
package com.flink.processor.function;
import org.apache.commons.lang3.StringUtils;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.enums.AppTypeEnum;
import com.flink.util.GenDeviceIdV1;
import com.flink.vo.DeviceIdInfo;
import com.flink.vo.android.AndroidCollectionBody;
import com.flink.vo.android.deviceInfo.AndroidA1;
import com.flink.vo.android.envInfo.AndroidEnvInfo;
import com.flink.vo.android.otherInfo.OtherInfo;
import com.flink.vo.ios.IosCollectionBody;
import com.flink.vo.ios.IosDeviceInfo;
import com.flink.vo.ios.IosEnvInfo;
/**
* @author wjs
* @version 创建时间:2025-5-28 14:39:26
* 类说明
*/
public class GenDeviceIdProcessor {
public static DeviceIdInfo genDeviceId(String appType,String appKey,String other_info,String device_info,String env_info) {
DeviceIdInfo deviceIdInfo = new DeviceIdInfo();
String deviceId = null;
String idfv = null;
String deviceIdV1 = null;
if(StringUtils.equals(appType, AppTypeEnum.ANDROID.getCode())) {
AndroidA1 a1 = JSONObject.parseObject(device_info,new TypeReference<AndroidA1>(){});
AndroidEnvInfo g1 = JSONObject.parseObject(env_info,new TypeReference<AndroidEnvInfo>(){});
OtherInfo i1 = JSONObject.parseObject(other_info,new TypeReference<OtherInfo>(){});
AndroidCollectionBody androidBodyObj = new AndroidCollectionBody();
androidBodyObj.setA1(a1);
androidBodyObj.setG1(g1);
androidBodyObj.setI1(i1);
deviceId = GenDeviceIdV1.genAndroidDeviceIdHashV1(androidBodyObj);
}else if(StringUtils.equals(appType, AppTypeEnum.IOS.getCode())) {
IosDeviceInfo a1 = JSONObject.parseObject(device_info,new TypeReference<IosDeviceInfo>(){});
IosEnvInfo g1 = JSONObject.parseObject(env_info,new TypeReference<IosEnvInfo>(){});
IosCollectionBody iosBodyObj = new IosCollectionBody();
iosBodyObj.setA1(a1);
iosBodyObj.setG1(g1);
deviceId = GenDeviceIdV1.genIosDeviceIdHash(iosBodyObj);
}
if(StringUtils.equals(appType, AppTypeEnum.ANDROID.getCode())) {
deviceIdV1 = deviceId;
if(StringUtils.isEmpty(appKey) || StringUtils.equals(appKey, "C7jias27jias2")) {
appKey = "8ooOvXJo276";
}
}else if(StringUtils.equals(appType, AppTypeEnum.IOS.getCode())) {
String[] genDeviceIdList = deviceId.split(",");
deviceIdV1 = (genDeviceIdList.length > 0) ? genDeviceIdList[0] : "";
idfv = (genDeviceIdList.length > 1) ? genDeviceIdList[1] : "";
}
deviceIdInfo.setDeviceIdV1(deviceIdV1);
deviceIdInfo.setIdfv(idfv);
return deviceIdInfo;
}
}
package com.flink.processor.function;
import java.time.Duration;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import com.flink.vo.SimiUserInfo;
/**
* @author wjs
* @version 创建时间:2025-5-28 11:01:52
* 类说明 使用状态维护每个用户的最新记录
*/
public class LatestUserProcessFunction extends KeyedProcessFunction<String, SimiUserInfo, SimiUserInfo>{
/**
*
*/
private static final long serialVersionUID = 1L;
private ValueState<SimiUserInfo> latestUserState;
@Override
public void open(Configuration parameters) {
// 初始化用户状态
ValueStateDescriptor<SimiUserInfo> descriptor = new ValueStateDescriptor<>("user-state",
SimiUserInfo.class);
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofMinutes(30))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.cleanupIncrementally(1000, true)//Heap StateBackend 使用增量清理
// .cleanupInRocksdbCompactFilter(1000) //RocksDB StateBackend 使用压缩清理
.build();
descriptor.enableTimeToLive(ttlConfig);
latestUserState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(SimiUserInfo user, Context ctx, Collector<SimiUserInfo> out) throws Exception {
SimiUserInfo currentLatest = latestUserState.value();
if (currentLatest == null || user.getUpdateTime() > currentLatest.getUpdateTime()) {
latestUserState.update(user);
out.collect(user);
}
}
}
package com.flink.processor.function;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.vo.UserProperties;
/**
* @author wjs
* @version 创建时间:2025-5-28 14:42:21
* 类说明
*/
public class UserPropertiesProcessor {
public static UserProperties userPropertiesToJson(String value) {
UserProperties userProperties = new UserProperties();
String cid = null;
String phone = null;
String nick = null;
if(StringUtils.isNotEmpty(value)) {
List<UserProperties> userPropertiesList = JSONObject.parseObject(value,new TypeReference<List<UserProperties>>(){});
if(userPropertiesList != null && userPropertiesList.size() > 0) {
for(UserProperties user : userPropertiesList) {
if(StringUtils.isNotEmpty(user.getCid())) {
cid = user.getCid();
}else if(StringUtils.isNotEmpty(user.getPhone())) {
phone = user.getPhone();
}else if(StringUtils.isNotEmpty(user.getId())) {
cid = user.getId();
}else if(StringUtils.isNotEmpty(user.getNick())) {
nick = user.getNick();
}else if(StringUtils.isNotEmpty(user.getEmail())) {
nick = user.getEmail();
}
}
}
}
userProperties.setCid(cid);
userProperties.setNick(nick);
userProperties.setPhone(phone);
return userProperties;
}
}
package com.flink.processor.impl;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import com.flink.achieve.doris.DeviceIdLatestAchi;
import com.flink.enums.JobTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.JobProcessor;
import com.flink.vo.KafkaTopic;
/**
* @author wjs
* @version 创建时间:2025-5-28 10:44:27
* 类说明
*/
public class DeviceIdLatestProcessor implements JobProcessor{
@Override
public void process() throws Exception {
new DeviceIdLatestAchi().handleDataStreamSource(
createTopicList(),
JobTypeEnum.DEVICE_ID_CID
);
}
private static List<KafkaTopic> createTopicList() {
return Arrays.stream(new TopicTypeEnum[]{
TopicTypeEnum.ODS_NEW_COLLECT_LOG,
TopicTypeEnum.SIMI_USER_LIST_TOPIC
}).map(TopicTypeEnum::createKafkaTopic)
.collect(Collectors.toList());
}
}
package com.flink.processor.impl;
import java.io.IOException;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.util.LoadPropertiesFile;
import com.flink.vo.SimiInterfaceBase;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
/**
* @author wjs
* @version 创建时间:2025-5-29 11:03:31
* 类说明
*/
public class OkHttpService{
private static final Logger logger = LoggerFactory.getLogger(OkHttpService.class);
public static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
private final static String URL = LoadPropertiesFile.getPropertyFileValues("simiUserInfo.url");
private final static String AUTHORIZATION = LoadPropertiesFile.getPropertyFileValues("simiUserInfo.authorization");
private final static String KEY = LoadPropertiesFile.getPropertyFileValues("simiUserInfo.key");
public static void main(String[] args) {
// String str = friends("3333ilove");
String str = groups("3333ilove");
System.out.println(str);
}
public static String friends(String cid){
String timestamp = System.currentTimeMillis()+"";
String reqParams = StringBuilderParams.friends(cid, null);
String url = URL+"/dataApi/friends?"+reqParams;
String authorization = AUTHORIZATION;
String signature = DigestUtils.md5Hex(reqParams+"&"+authorization+"&"+timestamp+"&"+KEY);
return get(url, timestamp,authorization,signature,cid);
}
public static String groups(String cid) {
String timestamp = System.currentTimeMillis()+"";
String reqParams = StringBuilderParams.groups(cid, null);
String url = URL+"/dataApi/groups?"+reqParams;
String authorization = AUTHORIZATION;
String signature = DigestUtils.md5Hex(reqParams+"&"+authorization+"&"+timestamp+"&"+KEY);
return get(url, timestamp,authorization,signature,cid);
}
private static String get(String url, String timestamp, String authorization, String signature,String cid) {
System.out.println("get OkHttpService: 请求内容 : \n " + url);
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.header("timestamp", timestamp)
.addHeader("Authorization", authorization)
.addHeader("signature", signature)
.url(url)
.get()
.build();
String result = null;
try {
Response response = client.newCall(request).execute();
if (response.isSuccessful()) {
String resultStr = response.body().string();
logger.info("OkHttpService 响应内容 body:{},friendsType:{},cid:{}",resultStr,cid);
SimiInterfaceBase resBody = JSONObject.parseObject(resultStr,new TypeReference<SimiInterfaceBase>(){});
result = resBody.getData();
} else {
logger.info("OkHttpService error 响应内容friendsType:{},cid:{}",cid);
}
} catch (Exception e) {
logger.error("OkHttpService get e:{}",e.toString());
}
return result;
}
private static String post(String url,String reqBody, String timestamp, String authorization, String signature){
System.out.println("请求内容 : \n " + reqBody);
OkHttpClient client = new OkHttpClient();
RequestBody body = RequestBody.create(JSON, reqBody);
Request request = new Request.Builder()
.header("timestamp", timestamp)
.addHeader("Authorization", authorization)
.addHeader("signature", signature)
.url(url)
.post(body)
.build();
String resBodyStr = null;
try {
Response response = client.newCall(request).execute();
if (response.isSuccessful()) {
resBodyStr = response.body().string();
System.out.println("响应内容 : \n " + resBodyStr);
} else {
System.out.println("响应码: " + response.code());
System.out.println("请求内容: " + response.body().string());
throw new IOException("Unexpected code " + response);
}
} catch (Exception e) {
logger.error("OkHttpService post e:{}",e.toString());
}
return resBodyStr;
}
}
package com.flink.processor.impl;
import com.flink.achieve.doris.SimiFriendsAchi;
import com.flink.enums.JobTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.JobProcessor;
/**
* @author wjs
* @version 创建时间:2025-5-29 10:46:14
* 类说明
*/
public class SimiFriendsProcessor implements JobProcessor{
@Override
public void process() throws Exception {
new SimiFriendsAchi().handleDataStreamSource(
JobTypeEnum.SIMI_FRIENDS,
TopicTypeEnum.OPEN_SIMI_API
);
}
}
package com.flink.processor.impl;
import com.flink.achieve.doris.SimiGroupstAchi;
import com.flink.enums.JobTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.JobProcessor;
/**
* @author wjs
* @version 创建时间:2025-5-29 10:46:25
* 类说明
*/
public class SimiGroupstProcessor implements JobProcessor{
@Override
public void process() throws Exception {
new SimiGroupstAchi().handleDataStreamSource(
JobTypeEnum.SIMI_GROUPS,
TopicTypeEnum.OPEN_SIMI_API
);
}
}
package com.flink.processor.impl;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
/**
* @author wjs
* @version 创建时间:2025-2-14 17:37:45
* 类说明
*/
public class StringBuilderParams {
public static String friends(String cid,String phoneNumber) {
List<String> array = new ArrayList<>();
if(StringUtils.isNotEmpty(cid)) {
array.add("cid="+cid);
}
if(StringUtils.isNotEmpty(phoneNumber)) {
array.add("phoneNumber="+phoneNumber);
}
return String.join("&", array);
}
public static String groups(String cid,String phoneNumber) {
List<String> array = new ArrayList<>();
if(StringUtils.isNotEmpty(cid)) {
array.add("cid="+cid);
}
if(StringUtils.isNotEmpty(phoneNumber)) {
array.add("phoneNumber="+phoneNumber);
}
return String.join("&", array);
}
}
package com.flink.util;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.util.TimeZone;
/**
......@@ -16,4 +17,9 @@ public class TimeConvertUtil {
sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
return sdf.parse(timeStr).getTime();
}
public static int convertToSqlDate(String datetime) {
return (int) LocalDate.parse(datetime.split(" ")[0])
.toEpochDay(); // 转换为天数偏移量
}
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-5-26 17:47:06
* 类说明
*/
@Data
@ToString
public class DeviceId implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String deviceId;
private String cid;
private String phone;
private String nick;
private String appKey;
private String appType;
private String createTime;
private Long collectTime;
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-5-28 14:49:42
* 类说明
*/
@Data
@ToString
public class DeviceIdInfo implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String idfv;
private String deviceIdV1;
}
......@@ -23,6 +23,7 @@ public class Result implements Serializable{
private String phone;
private String ip;
private String areaName;
private String deviceId;
private String countryCode;
private String userState;
private String nick;
......
package com.flink.vo;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-2-17 11:16:32
* 类说明
*/
@Data
@ToString
public class SimiInterfaceBase {
private String errCode;
private String msg;
private Boolean success;
private String data;
}
package com.flink.vo.simi;
import java.io.Serializable;
import lombok.Data;
/**
* @author wjs
* @version 创建时间:2025-5-12 18:29:59
* 类说明 博客/文章点赞转发评论
*/
@Data
public class ContentInteractionReqDto extends OpenSimiApiBaseReqDto implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String releaseType;//发布类型 BLOG:博客 OTHER:其它
private String articleId;//文章ID(关联具体内容)
private String actionType;//动作类型(LIKE:点赞、COMMENT:评论、SHARE:转发)
private String commentContent;//评论内容
private String commentId;//评论id
private String commentLevel;//评论级别level1:一级 level2二级
}
package com.flink.vo.simi;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-5-12 18:24:52
* 类说明 创建群组
*/
@Data
@ToString
public class CreateGroupReqDto extends OpenSimiApiBaseReqDto implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String groupId;//群组id
}
package com.flink.vo.simi;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-5-12 18:24:01
* 类说明 删除好友
*/
@Data
@ToString
public class DeleteFriendReqDto extends OpenSimiApiBaseReqDto implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String friendCid;//用户cid
private String friendNickname;//好友昵称
private String remark; //添加备注
}
package com.flink.vo.simi;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-5-12 18:22:04
* 类说明 成功添加好友
*/
@Data
@ToString
public class FriendRequestAcceptedReqDto extends OpenSimiApiBaseReqDto implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String friendCid; //好友唯一标识(引用用户表CID)
private String friendNickname; // 好友昵称(添加时显示的名称)
private String addMethod; //添加方式枚举(ACTIVE:主动添加、PASSIVE:被动添加)
private String remark; //添加备注
}
package com.flink.vo.simi;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-5-12 18:20:22
* 类说明 添加发起添加好友申请
*/
@Data
@ToString
public class InitiateFriendRequestReqDto extends OpenSimiApiBaseReqDto implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String friendCid; // 好友唯一标识
private String friendNickname;// 好友昵称(添加时的名称)
private String addMethod; // 添加方式枚举: SEARCH_CID:通过CID搜索添加、SEARCH_PHONE:通过手机号搜索添加、GROUP_ADD:通过群组添加、SCAN_QR:扫码添加
private String remark; //添加备注
}
package com.flink.vo.simi;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-5-12 18:26:09
* 类说明 加入群组
*/
@Data
@ToString
public class JoinGroupReqDto extends OpenSimiApiBaseReqDto implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String groupId;//群组ID
private String groupName;//群名称
private String joinType;//加入类型(CREATE_GROUP:创建群、INVITED:被邀请、SCAN_JOIN:扫码加入、SEARCH_JOIN:搜索加入)
}
package com.flink.vo.simi;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-5-12 18:27:49
* 类说明 退出群组
*/
@Data
@ToString
public class LeaveGroupReqDto extends OpenSimiApiBaseReqDto implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String groupId;// 群组ID(退出时的ID)
private String groupName;//群名称(退出时的名称)
private String exitType;//退出类型(ACTIVE_EXIT:主动退出、KICKED_OUT:被踢出、GROUP_DISMISSED:群解散)
}
package com.flink.vo.simi;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-5-13 10:23:28
* 类说明
*/
@Data
@ToString
public class OpenSimiApiBaseReqDto implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
//主键ID
private String id;
//用户唯一标识符(Customer ID)
private String cid;
//操作时间,格式为:yyyy-MM-dd HH:mm:ss.SSS
private String time;
}
package com.flink.vo.simi;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-5-12 18:29:14
* 类说明
*/
@Data
@ToString
public class PublishContentReqDto extends OpenSimiApiBaseReqDto implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String releaseType;//发布类型 BLOG:博客 OTHER:其它
private String articleId;//文章ID(关联具体内容)
}
package com.flink.vo.simi;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-5-29 14:25:57
* 类说明
*/
@Data
@ToString
public class SimiFriends implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String nick;
private String addTime;
private String thirdId;
private String remarks;
private String cid;
}
package com.flink.vo.simi;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-5-29 14:25:57
* 类说明
*/
@Data
@ToString
public class SimiGroups implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String groupName;
private String addTime;
private Long groupId;
}
package com.flink.vo.simi;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-5-12 18:23:03
* 类说明 修改好友昵称
*/
@Data
@ToString
public class UpdateFriendNicknameReqDto extends OpenSimiApiBaseReqDto implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String friendCid; // 被修改好友唯一标识
private String oldNickname; // 原昵称(修改前)
private String newNickname; // 新昵称(修改后)
}
package com.flink.vo.simi;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-5-12 18:27:03
* 类说明 修改群组名称
*/
@Data
@ToString
public class UpdateGroupNameReqDto extends OpenSimiApiBaseReqDto implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String groupId;//群组id
private String oldGroupName;//旧的群名称(变更前)
private String newGroupName;//新的群名称(变更后)
}
package com.flink.vo.simi;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-5-12 18:17:38
* 类说明 修改昵称
*/
@Data
@ToString
public class UpdateNicknameReqDto extends OpenSimiApiBaseReqDto implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String oldNickname; // 旧昵称
private String newNickname; // 新昵称
}
package com.flink.vo.simi;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
* @author wjs
* @version 创建时间:2025-5-12 18:16:38
* 类说明 用户注册
*/
@Data
@ToString
public class UserRegistrationReqDto extends OpenSimiApiBaseReqDto implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
//国际电话区号(例如:+86 表示中国)
private String countryCode;
//手机号码,需符合E.164国际标准格式
private String phoneNumber;
}
......@@ -12,3 +12,7 @@ doris.driver_class_name=com.mysql.cj.jdbc.Driver
hdfs.url=hdfs://10.0.0.105:8020/user/ck/
#hdfs.url=hdfs://140.245.112.44:8020/user/ck/
simiUserInfo.url= https://admin.dw20.net/prod-api/
simiUserInfo.authorization= KhhZAQKaZkfd7p55
simiUserInfo.key= niiKpP4SXce2zCHZ
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment