Commit 25c910d7 by 魏建枢

事件采集作业

parent 18db721c
package com.flink.achieve.doris;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.common.DorisConnector;
import com.flink.common.SourceCommonBase;
import com.flink.config.TableConfig;
import com.flink.processor.function.UserPropertiesProcessor;
import com.flink.util.TimeConvertUtil;
import com.flink.vo.OdsEventLog;
import com.flink.vo.UserProperties;
/**
 * Event-collection job: parses raw event-log JSON arriving from Kafka, maps
 * each record onto the Doris {@code bi.event_log} table schema and sinks it
 * through the Doris Flink connector. Malformed records are logged and dropped.
 *
 * @author wjs
 * @version created 2025-6-20 23:40:33
 */
public class EventLogAchi extends SourceCommonBase implements Serializable {

    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(EventLogAchi.class);

    // DateTimeFormatter is immutable and thread-safe: cache instead of
    // rebuilding the pattern for every record inside the map function.
    private static final DateTimeFormatter SEND_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final DateTimeFormatter CREATE_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /**
     * Builds the Kafka-to-Doris pipeline: JSON parse, field mapping to
     * {@link RowData}, null-filter for unparsable records, Doris sink.
     *
     * @param dataStreamSource raw JSON strings read from the source topic
     */
    @Override
    public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
        // ================= Sink schema: bi.event_log column layout =================
        TableConfig tableConfig = new TableConfig(
                new String[]{
                        "id",
                        "dt",
                        "send_time",
                        "create_time",
                        "strategy_group_id",
                        "app_key",
                        "app_type",
                        "app_channel",
                        "zone_code",
                        "zone_name",
                        "zone_type",
                        "sdk_version",
                        "user_agent",
                        "device_id",
                        "uid",
                        "strategy_version",
                        "event_list",
                        "route_ip",
                        "cid",
                        "phone",
                        "nick",
                        "unique_id",
                        "__DORIS_DELETE_SIGN__"
                },
                new DataType[]{
                        DataTypes.STRING(),
                        DataTypes.DATE(),
                        DataTypes.TIMESTAMP(3),
                        DataTypes.TIMESTAMP(3),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.INT()
                },
                "bi.event_log"
        );
        // ================= Sink construction =================
        DorisSink<RowData> dorisSink = DorisConnector.sinkDoris(
                tableConfig.getFields(), tableConfig.getTypes(), tableConfig.getTableName());
        // ================= Processing pipeline =================
        dataStreamSource
                .map(value -> {
                    try {
                        // Parse the raw Kafka JSON payload.
                        OdsEventLog event = JSONObject.parseObject(value, new TypeReference<OdsEventLog>() {});
                        String createTime = event.getCreate_time();
                        String sendTime = event.getSend_time();
                        // Normalize the app key: blank or the legacy placeholder
                        // key is remapped to the default key.
                        String appKey = event.getApp_key().trim();
                        if (StringUtils.isEmpty(appKey) || StringUtils.equals(appKey, "C7jias27jias2")) {
                            appKey = "8ooOvXJo276";
                        }
                        UserProperties userProperties =
                                UserPropertiesProcessor.userPropertiesToJson(event.getUser_properties());
                        // Map onto the Doris row layout (same order as tableConfig).
                        GenericRowData row = new GenericRowData(tableConfig.getFields().length);
                        row.setField(0, StringData.fromString(event.getId()));
                        // dt: partition date taken from the "yyyy-MM-dd" prefix of create_time.
                        row.setField(1, TimeConvertUtil.convertToSqlDate(createTime.substring(0, 10)));
                        row.setField(2, TimestampData.fromLocalDateTime(
                                LocalDateTime.parse(sendTime, SEND_TIME_FORMAT)));
                        row.setField(3, TimestampData.fromLocalDateTime(
                                LocalDateTime.parse(createTime, CREATE_TIME_FORMAT)));
                        row.setField(4, StringData.fromString(event.getStrategy_group_id()));
                        // Fix: write the sanitized appKey (previously the raw value was
                        // written, leaving the normalization above as dead code).
                        row.setField(5, StringData.fromString(appKey));
                        row.setField(6, StringData.fromString(event.getApp_type()));
                        row.setField(7, StringData.fromString(event.getApp_channel()));
                        row.setField(8, StringData.fromString(event.getZone_code()));
                        row.setField(9, StringData.fromString(event.getZone_name()));
                        row.setField(10, StringData.fromString(event.getZone_type()));
                        row.setField(11, StringData.fromString(event.getSdk_version()));
                        row.setField(12, StringData.fromString(event.getUser_agent()));
                        row.setField(13, StringData.fromString(event.getDevice_id()));
                        row.setField(14, StringData.fromString(event.getUid()));
                        row.setField(15, StringData.fromString(event.getStrategy_version()));
                        row.setField(16, StringData.fromString(event.getEvent_list()));
                        row.setField(17, StringData.fromString(event.getRoute_ip()));
                        row.setField(18, StringData.fromString(userProperties == null ? null : userProperties.getCid()));
                        row.setField(19, StringData.fromString(userProperties == null ? null : userProperties.getPhone()));
                        row.setField(20, StringData.fromString(userProperties == null ? null : userProperties.getNick()));
                        row.setField(21, StringData.fromString(event.getUnique_id()));
                        // __DORIS_DELETE_SIGN__: 0 = upsert (not a delete marker).
                        row.setField(22, 0);
                        return (RowData) row;
                    } catch (Exception e) {
                        // Log and drop malformed records; the filter below removes the nulls.
                        logger.error("Failed to parse event log record: {}", value, e);
                        return null;
                    }
                })
                .filter(Objects::nonNull)
                .sinkTo(dorisSink)
                .name("Doris-CollectLog");
    }

    /**
     * Not used by this job: data only flows Kafka -> Doris, never back to Kafka.
     */
    @Override
    public void sendToSinkKafka(DataStreamSource<String> mStream) {
        // Intentionally empty.
    }
}
...@@ -28,6 +28,7 @@ public enum JobTypeEnum { ...@@ -28,6 +28,7 @@ public enum JobTypeEnum {
SIMI_FRIENDS("JOB_09", "SIMI好友作业"), SIMI_FRIENDS("JOB_09", "SIMI好友作业"),
SIMI_GROUPS("JOB_10", "SIMI群组作业"), SIMI_GROUPS("JOB_10", "SIMI群组作业"),
VECTOR_ANGLE_CALCULATION("JOB_11", "矢量角度计算作业"), VECTOR_ANGLE_CALCULATION("JOB_11", "矢量角度计算作业"),
EVENT_LOG("JOB_12", "事件采集作业"),
; ;
......
...@@ -6,6 +6,7 @@ import com.flink.processor.impl.CollectLogProcessor; ...@@ -6,6 +6,7 @@ import com.flink.processor.impl.CollectLogProcessor;
import com.flink.processor.impl.DeviceIdLatestProcessor; import com.flink.processor.impl.DeviceIdLatestProcessor;
import com.flink.processor.impl.EventIpConvertProcessor; import com.flink.processor.impl.EventIpConvertProcessor;
import com.flink.processor.impl.EventIpLatestProcessor; import com.flink.processor.impl.EventIpLatestProcessor;
import com.flink.processor.impl.EventLogProcessor;
import com.flink.processor.impl.RealBalanceProcessor; import com.flink.processor.impl.RealBalanceProcessor;
import com.flink.processor.impl.RealKycProcessor; import com.flink.processor.impl.RealKycProcessor;
import com.flink.processor.impl.RealTransactionProcessor; import com.flink.processor.impl.RealTransactionProcessor;
...@@ -45,6 +46,8 @@ public class JobProcessorFactory { ...@@ -45,6 +46,8 @@ public class JobProcessorFactory {
return new SimiGroupstProcessor(); return new SimiGroupstProcessor();
case VECTOR_ANGLE_CALCULATION: case VECTOR_ANGLE_CALCULATION:
return new VectorAngleCalculationProcessor(); return new VectorAngleCalculationProcessor();
case EVENT_LOG:
return new EventLogProcessor();
default: default:
throw new IllegalArgumentException("未知的Job类型: " + jobType); throw new IllegalArgumentException("未知的Job类型: " + jobType);
} }
......
package com.flink.processor.function;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import com.flink.achieve.doris.VectorAngleCalculationAchi.PointRecord;
import com.flink.achieve.doris.VectorAngleCalculationAchi.ResultRecord;
/**
 * Keyed process function that, for each incoming point, computes the
 * displacement vector from the previous point of the same key, plus the
 * angle (cosine / radians / degrees) between that vector and the previous
 * displacement vector. The first point of a key yields a zero vector and
 * an angle of 0.
 *
 * @author wjs
 * @version created 2025-6-20 16:06:54
 */
public class VectorAngleProcessor extends KeyedProcessFunction<Tuple2<String, Long>, PointRecord, ResultRecord> {

    private static final long serialVersionUID = 1L;

    // Per-key state: previous position and previous displacement vector.
    private transient ValueState<VectorState> vectorState;

    @Override
    public void open(Configuration parameters) {
        vectorState = getRuntimeContext().getState(
                new ValueStateDescriptor<>(
                        "vectorState",
                        TypeInformation.of(new TypeHint<VectorState>() {})));
    }

    @Override
    public void processElement(PointRecord record,
            KeyedProcessFunction<Tuple2<String, Long>, PointRecord, ResultRecord>.Context ctx,
            Collector<ResultRecord> out) throws Exception {
        final VectorState previous = vectorState.value();

        final double dx;
        final double dy;
        final double magnitude;
        final double dotProduct;
        if (previous == null) {
            // First record for this key: no displacement yet, dot product is 0.
            dx = 0;
            dy = 0;
            magnitude = 0;
            dotProduct = 0;
        } else {
            // Displacement from the previous point.
            dx = record.positionX - previous.prevPositionX;
            dy = record.positionY - previous.prevPositionY;
            // Euclidean length of the displacement vector.
            magnitude = Math.sqrt(dx * dx + dy * dy);
            // Dot product with the previous displacement vector.
            dotProduct = dx * previous.prevVectorX + dy * previous.prevVectorY;
        }

        // Previous vector's length, used in the cosine denominator.
        double previousMagnitude = (previous == null) ? magnitude : previous.prevVectorM;
        // Guard against division by zero: a degenerate pair counts as aligned.
        double denominator = magnitude * previousMagnitude;
        double cosine = (denominator == 0) ? 1.0 : dotProduct / denominator;
        // Clamp into [-1, 1] before acos to absorb floating-point drift.
        cosine = Math.min(1.0, Math.max(-1.0, cosine));

        double radians = Math.acos(cosine);
        double degrees = Math.toDegrees(radians);

        out.collect(new ResultRecord(
                record.id,
                record.eventTime,
                record.rowNum,
                record.positionX,
                record.positionY,
                dx,
                dy,
                magnitude,
                dotProduct,
                cosine,
                degrees,
                radians,
                record.resolutionX,
                record.resolutionY));

        // The current record becomes the "previous" one for the next element.
        vectorState.update(new VectorState(
                record.positionX,
                record.positionY,
                dx,
                dy,
                magnitude));
    }

    /** Snapshot of the previous point and its displacement vector. */
    public static class VectorState {
        public double prevPositionX;
        public double prevPositionY;
        public double prevVectorX;
        public double prevVectorY;
        public double prevVectorM;

        public VectorState(double x, double y, double vx, double vy, double vm) {
            this.prevPositionX = x;
            this.prevPositionY = y;
            this.prevVectorX = vx;
            this.prevVectorY = vy;
            this.prevVectorM = vm;
        }
    }
}
package com.flink.processor.impl;
import com.flink.achieve.doris.EventLogAchi;
import com.flink.enums.JobTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.JobProcessor;
/**
 * Job processor for the event-collection job: wires the ODS event-log topic
 * into an {@link EventLogAchi} streaming pipeline.
 *
 * @author wjs
 * @version created 2025-6-20 23:39:38
 */
public class EventLogProcessor implements JobProcessor {

    @Override
    public void process() throws Exception {
        // Delegate the whole pipeline setup to the job implementation.
        EventLogAchi job = new EventLogAchi();
        job.handleDataStreamSource(JobTypeEnum.EVENT_LOG, TopicTypeEnum.ODS_EVENT_LOG);
    }
}
package com.flink.processor.impl;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import com.flink.achieve.doris.VectorAngleCalculationAchi;
import com.flink.enums.JobTypeEnum;
import com.flink.enums.TopicTypeEnum;
import com.flink.processor.JobProcessor;
import com.flink.vo.KafkaTopic;
/**
 * Job processor for the vector-angle calculation job: consumes the event-log
 * and collect-log topics and delegates to {@link VectorAngleCalculationAchi}.
 *
 * @author wjs
 * @version created 2025-6-18 11:07:02
 */
public class VectorAngleCalculationProcessor implements JobProcessor {

    @Override
    public void process() throws Exception {
        new VectorAngleCalculationAchi().handleDataStreamSource(
                createTopicList(),
                JobTypeEnum.VECTOR_ANGLE_CALCULATION);
    }

    /** Source topics consumed by the vector-angle job. */
    private static List<KafkaTopic> createTopicList() {
        return Arrays
                .asList(TopicTypeEnum.ODS_EVENT_LOG, TopicTypeEnum.ODS_NEW_COLLECT_LOG)
                .stream()
                .map(TopicTypeEnum::createKafkaTopic)
                .collect(Collectors.toList());
    }
}
package com.flink.util; package com.flink.util;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
...@@ -44,4 +45,14 @@ public class TimeConvertUtil { ...@@ -44,4 +45,14 @@ public class TimeConvertUtil {
public static String format(TimestampData timestamp) { public static String format(TimestampData timestamp) {
return FORMATTER.format(timestamp.toLocalDateTime()); return FORMATTER.format(timestamp.toLocalDateTime());
} }
/**
 * Formats an epoch-millisecond timestamp as "yyyy-MM-dd HH:mm:ss"
 * in the JVM's default time zone.
 *
 * @param timestamp epoch milliseconds
 * @return formatted local date-time string
 */
public static String parseToString(Long timestamp) {
    LocalDateTime localDateTime =
            LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault());
    return localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}
} }
package com.flink.vo;
import java.io.Serializable;
import com.flink.achieve.doris.VectorAngleCalculationAchi.JoinKey;
import lombok.Data;
import lombok.ToString;
/**
 * Flattened collect-log record used as one side of the stream join;
 * carries identity fields plus screen resolution and the collect time.
 *
 * @author wjs
 * @version created 2025-6-19 11:09:31
 */
@Data
@ToString
public class CollectLogToJsonSource implements Serializable {

    private static final long serialVersionUID = 1L;

    private String deviceId;
    private String uniqueId;
    private String cid;
    private String phone;
    private String nick;
    private int resolution_x;
    private int resolution_y;
    private Long collectTime;
    // Derived join key; transient so it is rebuilt after deserialization.
    private transient JoinKey joinKey;

    /** Lazily builds and caches the join key from the identity fields. */
    public JoinKey getJoinKey() {
        if (joinKey != null) {
            return joinKey;
        }
        joinKey = new JoinKey(uniqueId, deviceId, cid, phone);
        return joinKey;
    }

    public CollectLogToJsonSource(
            String deviceId,
            String uniqueId,
            String cid,
            String phone,
            String nick,
            int resolution_x,
            int resolution_y,
            Long collectTime) {
        this.deviceId = deviceId;
        this.uniqueId = uniqueId;
        this.cid = cid;
        this.phone = phone;
        this.nick = nick;
        this.resolution_x = resolution_x;
        this.resolution_y = resolution_y;
        this.collectTime = collectTime;
    }
}
package com.flink.vo;
import java.io.Serializable;
import lombok.Data;
import lombok.ToString;
/**
 * Single element of an event payload list. Field names mirror the raw wire
 * format. NOTE(review): r7/r8/r9 are opaque wire-protocol names — presumably
 * event name, event properties, and an epoch-millis timestamp; confirm
 * against the producer's JSON schema.
 *
 * @author wjs
 * @version created 2025-6-18 17:35:56
 */
@Data
@ToString
public class EventList implements Serializable{
/**
* Serial version for Java serialization compatibility.
*/
private static final long serialVersionUID = 1L;
// NOTE(review): presumably the event name/type — verify against producer.
private String r7;
// NOTE(review): presumably the event's property bag — verify against producer.
private Properties r8;
// NOTE(review): presumably an epoch-millis timestamp — verify against producer.
private Long r9;
}
package com.flink.vo;
import java.io.Serializable;
import java.util.List;
import com.flink.achieve.doris.VectorAngleCalculationAchi.JoinKey;
import lombok.Data;
import lombok.ToString;
/**
 * Flattened event-log record used as one side of the stream join;
 * carries identity fields, the parsed event list and the create time.
 *
 * @author wjs
 * @version created 2025-6-19 11:09:03
 */
@Data
@ToString
public class EventLogToJsonSource implements Serializable {

    private static final long serialVersionUID = 1L;

    private String id;
    private String uniqueId;
    private String deviceId;
    private String cid;
    private String phone;
    private String nick;
    private List<EventList> eventList;
    private Long createTime;
    // Derived join key; transient so it is rebuilt after deserialization.
    private transient JoinKey joinKey;

    /** Lazily builds and caches the join key from the identity fields. */
    public JoinKey getJoinKey() {
        if (joinKey != null) {
            return joinKey;
        }
        joinKey = new JoinKey(uniqueId, deviceId, cid, phone);
        return joinKey;
    }

    public EventLogToJsonSource(String id, String uniqueId, String deviceId, String cid,
            String phone, String nick, List<EventList> eventList, Long createTime) {
        this.id = id;
        this.uniqueId = uniqueId;
        this.deviceId = deviceId;
        this.cid = cid;
        this.phone = phone;
        this.nick = nick;
        this.eventList = eventList;
        this.createTime = createTime;
    }
}
package com.flink.vo;
import java.io.Serializable;
import java.util.List;
/**
 * Generic event-property holder; field names (r1..r6) mirror the raw wire
 * format. NOTE(review): the semantics of each rN field are not visible from
 * this file — confirm against the producer's JSON schema before relying on
 * any particular meaning.
 *
 * @author wjs
 * @version created 2025-2-26 17:00:37
 */
public class Properties implements Serializable{
/**
* Serial version for Java serialization compatibility.
*/
private static final long serialVersionUID = 1L;
// Opaque wire-format fields, presumably populated by fastjson from the raw payload.
private String r1;
private String r2;
private String r3;
private String r4;
private String r5;
private List<String> r6;
public String getR1() {
return r1;
}
public void setR1(String r1) {
this.r1 = r1;
}
public String getR2() {
return r2;
}
public void setR2(String r2) {
this.r2 = r2;
}
public String getR3() {
return r3;
}
public void setR3(String r3) {
this.r3 = r3;
}
public String getR4() {
return r4;
}
public void setR4(String r4) {
this.r4 = r4;
}
public String getR5() {
return r5;
}
public void setR5(String r5) {
this.r5 = r5;
}
public List<String> getR6() {
return r6;
}
public void setR6(List<String> r6) {
this.r6 = r6;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment