Commit 5333e716 by 魏建枢

代码提交table sql 入网ip处理

parent 067dc8d7
Showing with 846 additions and 34 deletions
......@@ -7,6 +7,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -73,7 +74,7 @@ public class CommonConsumeBaseAchi extends SourceCommonBase implements Serializa
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -17,6 +17,7 @@ import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.GenericRowData;
......@@ -513,7 +514,7 @@ public class DeviceIdLatestAchi extends SourceCommonBase implements Serializable
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -11,6 +11,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.GenericRowData;
......@@ -183,7 +184,7 @@ public class EventIpConvertAchi extends SourceCommonBase implements Serializable
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -21,6 +21,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
......@@ -433,7 +434,7 @@ public class EventIpLatestAchi extends SourceCommonBase implements Serializable{
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -13,6 +13,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
......@@ -219,7 +220,7 @@ public class RegistrationCheckAchi extends SourceCommonBase implements Serializa
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
......@@ -790,7 +791,7 @@ public class SimiFriendsAchi extends SourceCommonBase implements Serializable {
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -11,6 +11,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.GenericRowData;
......@@ -127,7 +128,7 @@ public class SimiGroupstAchi extends SourceCommonBase implements Serializable{
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
......@@ -739,7 +740,7 @@ public class UserDailyActivityAchi extends SourceCommonBase implements Serializa
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -19,6 +19,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.GenericRowData;
......@@ -846,7 +847,7 @@ public class VectorAngleCalculationAchi extends SourceCommonBase implements Seri
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv)
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv)
throws ParseException, Exception {
// TODO Auto-generated method stub
......
......@@ -5,7 +5,7 @@ import java.util.List;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -46,12 +46,20 @@ public class RegistrationCheckAchi extends SourceCommonBase implements Serializa
}
@Override
public void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv) throws Exception {
public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv) throws Exception {
Table result = tableEnv.sqlQuery("SELECT * FROM collect_log_view");
result.execute().print();
// Table result = tableEnv.sqlQuery("SELECT * FROM event_log_view");
// Table result = tableEnv.sqlQuery("SELECT * FROM collect_log_view");
// result.execute().print();
// Table result1 = tableEnv.sqlQuery("SELECT * FROM event_log_view");
// result1.execute().print();
// Table result1 = tableEnv.sqlQuery(
// "SELECT * " +
// "FROM event_log_view AS eventLog " +
// "LEFT JOIN collect_log_view AS collectLog " +
// "ON eventLog.device_id = collectLog.device_id " +
// "AND eventLog.unique_id = collectLog.unique_id"
// );
// result1.execute().print();
}
}
package com.flink.achieve.table;
import java.io.Serializable;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.flink.common.KafkaConnector;
import com.flink.common.SourceCommonBase;
import com.flink.enums.TopicTypeEnum;
import com.flink.vo.KafkaDataSource;
/**
* @author wjs
* @version 创建时间:2025-8-21 15:51:47
* 类说明
*/
public class TableSqlSinkKafkaAchi extends SourceCommonBase implements Serializable{

	private static final long serialVersionUID = 1L;

	/** Jackson's ObjectMapper is thread-safe; build it once instead of once per record. */
	private static final ObjectMapper MAPPER = new ObjectMapper();

	/** JSON field names, in the exact order of the SELECT column list below. */
	private static final String[] FIELDS = {
			"network_ip", "network_area_name", "unique_id", "device_id", "device_id_v1",
			"send_type", "zone_name", "zone_code", "zone_type", "send_time",
			"app_key", "app_type", "cid", "phone", "nick"
	};

	@Override
	public void parseSourceKafkaJson(DataStreamSource<String> dataStreamSource) throws ParseException, Exception {
		// Not used by this job.
	}

	@Override
	public void parseSourceKafkaToSqlTable(TopicTypeEnum topicTypeEnum, StreamTableEnvironment tableEnv)
			throws ParseException, Exception {
		// Not used by this job.
	}

	@Override
	public void parseMultipleSourceKafkaJson(List<KafkaDataSource> dataSourceList) throws ParseException, Exception {
		// Not used by this job.
	}

	/**
	 * Selects the flattened sys-log view, serializes each non-null column of
	 * every row into a JSON object, and sinks the resulting string stream to
	 * the "dwd_sys_log" Kafka topic.
	 *
	 * @param env      stream environment used by the Kafka sink
	 * @param tableEnv table environment where sys_log_view is registered
	 */
	@Override
	public void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv) throws ParseException, Exception {
		// 1. Project the columns to publish; order must match FIELDS above.
		Table resultTable = tableEnv.sqlQuery("SELECT "
				+ "network_ip,"
				+ "network_area_name,"
				+ "unique_id,"
				+ "device_id,"
				+ "device_id_v1,"
				+ "send_type,"
				+ "zone_name,"
				+ "zone_code,"
				+ "zone_type,"
				+ "send_time,"
				+ "app_key,"
				+ "app_type,"
				+ "cid,phone,"
				+ "nick "
				+ "FROM sys_log_view");
		// 2. The view comes from a LEFT JOIN, so consume it as a changelog stream.
		DataStream<Row> rowStream = tableEnv.toChangelogStream(resultTable);
		// 3. Row -> JSON string. The old code repeated 15 identical null-check
		// blocks and built a fresh ObjectMapper for every record; a field-name
		// table keeps the mapping in one place. Null columns are simply omitted.
		DataStream<String> jsonStream = rowStream.map(row -> {
			ObjectNode jsonNode = MAPPER.createObjectNode();
			for (int i = 0; i < FIELDS.length; i++) {
				Object value = row.getField(i);
				if (null != value) {
					jsonNode.put(FIELDS[i], value.toString());
				}
			}
			return jsonNode.toString();
		});
		// 4. Publish to Kafka.
		KafkaConnector.sinkKafka(env, "dwd_sys_log",jsonStream);
	}
}
......@@ -15,7 +15,19 @@ import com.flink.util.LoadPropertiesFile;
* 类说明 定义 Kafka源表结构(包含所有字段和水印定义)
*/
public class KafkaBaseSchema {
//-----------------------------------------------------创建TableDescriptor--------------------------------------------------------------------------
public static void createTableDescriptor(StreamTableEnvironment tableEnv,Schema schema,String topic,String groupId,String tableName) {
	// Resolve the broker list once, then register a Kafka-backed temporary table
	// that reads JSON from the latest offset and tolerates malformed records.
	String bootstrapServers = LoadPropertiesFile.getPropertyFileValues("kafka.bootstrapServers");
	TableDescriptor kafkaDescriptor = TableDescriptor.forConnector("kafka")
			.schema(schema)
			.option("topic", topic)
			.option("properties.bootstrap.servers", bootstrapServers)
			.option("properties.group.id", groupId)
			.option("scan.startup.mode", "latest-offset")
			.option("format", "json")
			.option("json.ignore-parse-errors", "true")
			.build();
	tableEnv.createTemporaryTable(tableName, kafkaDescriptor);
}
//-----------------------------------------------------定义 Kafka源表结构并创建Schema--------------------------------------------------------------------------
public static void eventLogSchema(StreamTableEnvironment tableEnv,String topic,String groupId,String tableName) {
......@@ -75,17 +87,27 @@ public class KafkaBaseSchema {
createTableDescriptor(tableEnv, schema, topic, groupId, tableName);
}
//-----------------------------------------------------创建TableDescriptor--------------------------------------------------------------------------
public static void createTableDescriptor(StreamTableEnvironment tableEnv,Schema schema,String topic,String groupId,String tableName) {
TableDescriptor descriptor = TableDescriptor.forConnector("kafka")
.schema(schema)
.option("topic", topic)
.option("properties.bootstrap.servers",LoadPropertiesFile.getPropertyFileValues("kafka.bootstrapServers"))
.option("properties.group.id", groupId)
.option("scan.startup.mode", "latest-offset")
.option("format", "json")
.option("json.ignore-parse-errors", "true")
public static void sysLogSchema(StreamTableEnvironment tableEnv, String topic, String groupId, String tableName) {
Schema schema = Schema.newBuilder()
.column("method", STRING())
.column("create_time", TIMESTAMP(3))
.column("description", STRING())
.column("request_method", STRING())
.column("title", STRING()) // 事件时间字段
.column("operate_url", STRING())
.column("operation_log_type", STRING())
.column("operate_name", STRING())
.column("request_body", STRING())
.column("success", STRING())
.column("operate_address", STRING())
.column("logging_type", STRING())
.column("browser", STRING())
.column("request_headers", STRING())
.column("system_os", STRING())
.column("id", STRING())
.columnByMetadata("topic", STRING(), "topic", true) // Kafka 元数据字段
.watermark("create_time", "create_time - INTERVAL '5' SECOND") // 水印策略
.build();
tableEnv.createTemporaryTable(tableName, descriptor);
createTableDescriptor(tableEnv, schema, topic, groupId, tableName);
}
}
......@@ -74,6 +74,9 @@ public class BaseKafkaToSqlCreateTable {
}else if(StringUtils.equals(kafkaTopic.getTopic(), TopicTypeEnum.ODS_COMMUNITY_HISTORY.getTopic())) {
}else if(StringUtils.equals(kafkaTopic.getTopic(), TopicTypeEnum.ODS_SYS_LOG.getTopic())) {
KafkaBaseSchema.sysLogSchema(tableEnv,topic,groupId,"kafka_sys_log");
SysLogTable.buildSysLogQuery(tableEnv);
}
}
}
......
......@@ -59,7 +59,7 @@ public class CollectLogTable {
" k.app_type, " +
" k.unique_id, " +
" k.device_id, " +
" t.device_v1, " +
" t.device_id_v1, " +
" k.create_time, " +
" k.send_type, " +
" k.zone_name, " +
......@@ -79,9 +79,9 @@ public class CollectLogTable {
" t.user_info['phone'] AS phone, " +
" t.user_info['nick'] AS nick " +
"FROM kafka_collect_log AS k " +
"LEFT JOIN LATERAL TABLE(ParseCollectList(app_type,app_key,other_info,env_info,device_info,user_properties)) AS t(device_v1, user_info) ON TRUE"
"LEFT JOIN LATERAL TABLE(ParseCollectList(app_type,app_key,other_info,env_info,device_info,user_properties)) AS t(device_id_v1, user_info) ON TRUE"
);
// Table result = tableEnv.sqlQuery("SELECT * FROM collect_log_view");
// result.execute().print();
Table result = tableEnv.sqlQuery("SELECT * FROM collect_log_view");
result.execute().print();
}
}
package com.flink.achieve.table.sql;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import com.flink.achieve.table.udf.ParseSysLogListUDTF;
/**
* @author wjs
* @version 创建时间:2025-8-19 14:07:56
* 类说明
*/
public class SysLogTable {

	/**
	 * Registers the ParseSysLogList UDTF and defines the sys_log_view view on
	 * top of the kafka_sys_log source table. The UDTF explodes each log's
	 * request body/headers into a MAP&lt;STRING,STRING&gt; of collect fields.
	 */
	public static void buildSysLogQuery(StreamTableEnvironment tableEnv) {
		tableEnv.createTemporarySystemFunction("ParseSysLogList", ParseSysLogListUDTF.class);
		// Assemble the view definition first, then execute it.
		String createViewSql =
				"CREATE VIEW sys_log_view AS " +
				"SELECT " +
				" k.operate_address AS network_ip, " +
				" t.collect_info['network_area_name'] AS network_area_name, " +
				" t.collect_info['unique_id'] AS unique_id, " +
				" t.collect_info['device_id'] AS device_id, " +
				" t.collect_info['device_id_v1'] AS device_id_v1, " +
				" t.collect_info['send_type'] AS send_type, " +
				" t.collect_info['zone_name'] AS zone_name, " +
				" t.collect_info['zone_code'] AS zone_code, " +
				" t.collect_info['zone_type'] AS zone_type, " +
				" t.collect_info['send_time'] AS send_time, " +
				" t.collect_info['app_key'] AS app_key, " +
				" t.collect_info['app_type'] AS app_type, " +
				" t.collect_info['cid'] AS cid, " +
				" t.collect_info['phone'] AS phone, " +
				" t.collect_info['nick'] AS nick " +
				"FROM kafka_sys_log AS k " +
				"LEFT JOIN LATERAL TABLE(ParseSysLogList(operate_url,operate_address,request_body,request_headers)) AS t(collect_info) ON TRUE";
		tableEnv.executeSql(createViewSql);
	}
}
......@@ -20,7 +20,7 @@ import com.flink.vo.UserProperties;
* @version 创建时间:2025-8-19 14:30:02
* 类说明
*/
@FunctionHint(output = @DataTypeHint("ROW<device_v1 STRING, user_info MAP<STRING,STRING>>"))
@FunctionHint(output = @DataTypeHint("ROW<device_id_v1 STRING, user_info MAP<STRING,STRING>>"))
public class ParseCollectListUDTF extends TableFunction<Row>{
/**
......
package com.flink.achieve.table.udf;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.flink.enums.SendTypeEnum;
import com.flink.processor.function.GenDeviceIdProcessor;
import com.flink.processor.function.UserPropertiesProcessor;
import com.flink.util.CompareUtils;
import com.flink.util.TimeConvertUtil;
import com.flink.util.des.EncryptUtil;
import com.flink.util.ip2region.SearcherUtil;
import com.flink.vo.DeviceIdInfo;
import com.flink.vo.PcDeviceInfo;
import com.flink.vo.PcEventInfo;
import com.flink.vo.UserProperties;
/**
* @author wjs
* @version 创建时间:2025-8-20 18:06:47
* 类说明
*/
@FunctionHint(output = @DataTypeHint("ROW<collect_info MAP<STRING,STRING>>"))
public class ParseSysLogListUDTF extends TableFunction<Row>{

	private static final long serialVersionUID = 1L;

	private static final Logger logger = LoggerFactory.getLogger(ParseSysLogListUDTF.class);

	/**
	 * Explodes one operation-log record into a MAP&lt;STRING,STRING&gt; of collect
	 * fields. Only requests to /api/v1/event and /api/v1/pcEvent are processed;
	 * everything else is skipped. Any parse/decrypt failure drops the record
	 * and is only logged.
	 *
	 * @param operate_url     request path of the logged call
	 * @param operate_address caller IP, resolved to a region name via ip2region
	 * @param request_body    JSON body; field "bd" holds the DES-encrypted payload
	 * @param request_headers JSON headers carrying key material and metadata
	 */
	public void eval(String operate_url,String operate_address, String request_body,String request_headers) {
		try {
			// Only the two collect endpoints are of interest.
			if(!CompareUtils.stringExists(operate_url,
					"/api/v1/event",
					"/api/v1/pcEvent")) {
				return;
			}
			if(StringUtils.isAllEmpty(request_body,request_headers)) {
				return;
			}
			JSONObject jsonObjBody = JSON.parseObject(request_body);
			JSONObject jsonObjHeaders = JSON.parseObject(request_headers);
			// Key material and metadata come from the request headers.
			String bdParams = jsonObjBody.getString("bd");
			// NOTE(review): NPEs (and drops the record via the outer catch) when
			// hk0 is absent or shorter than 48 chars — unchanged behavior.
			String hashDateKeyStr = jsonObjHeaders.getString("hk0").substring(0,48);
			String appKey = jsonObjHeaders.getString("ak0");
			String appType = jsonObjHeaders.getString("at0");
			String sendTime = TimeConvertUtil.parseToStringSSS(jsonObjHeaders.getLongValue("st0"));
			// Bug fix: the old code evaluated new String(null) when zn0 was empty,
			// which threw an NPE and silently dropped the whole record. Decode the
			// Base64 zone name only when it is present (and with an explicit charset).
			String zn0 = jsonObjHeaders.getString("zn0");
			String zoneName = StringUtils.isNotEmpty(zn0)
					? new String(Base64.getDecoder().decode(zn0), StandardCharsets.UTF_8)
					: null;
			String zoneCode = jsonObjHeaders.getString("zc0");
			String zoneType = jsonObjHeaders.getString("zt0");
			String sendType = jsonObjHeaders.getString("st1");
			// Decrypt the "bd" payload with the 48-char key slice.
			if(StringUtils.isAllEmpty(bdParams,hashDateKeyStr)) {
				return;
			}
			String reqData = EncryptUtil.desDecrypt(bdParams, hashDateKeyStr);
			if(StringUtils.isEmpty(reqData)) {
				return;
			}
			String networkAreaName = SearcherUtil.getCityInfoByFile(operate_address);
			// Fields shared by both endpoints.
			Map<String, Object> collectParams = new HashMap<>();
			collectParams.put("send_type", sendType);
			collectParams.put("zone_name", zoneName);
			collectParams.put("zone_code", zoneCode);
			collectParams.put("zone_type", zoneType);
			collectParams.put("send_time", sendTime);
			collectParams.put("app_key", appKey);
			collectParams.put("app_type", appType);
			collectParams.put("network_area_name", networkAreaName);
			if(StringUtils.equals(operate_url, "/api/v1/pcEvent")) {
				// PC client: payload is either a device-info or an event-info object.
				if(CompareUtils.stringExists(sendType,SendTypeEnum.DEVICE_INFO.getCode())) {
					PcDeviceInfo pcDeviceInfo = JSONObject.parseObject(reqData, new TypeReference<PcDeviceInfo>() {});
					String deviceId = pcDeviceInfo.getI8();
					collectParams.put("unique_id", deviceId);
					collectParams.put("device_id", deviceId);
					collectParams.put("device_id_v1", deviceId);
					collectParams.put("cid", pcDeviceInfo.getCid());
					collectParams.put("phone", pcDeviceInfo.getPhone());
					collectParams.put("nick", pcDeviceInfo.getNick());
				}
				if(CompareUtils.stringExists(sendType,SendTypeEnum.EVENT_INFO.getCode())) {
					PcEventInfo pcEventInfo = JSONObject.parseObject(reqData, new TypeReference<PcEventInfo>() {});
					if(null == pcEventInfo) {
						return;
					}
					String deviceId = pcEventInfo.getI8();
					collectParams.put("unique_id", deviceId);
					collectParams.put("device_id", deviceId);
					collectParams.put("device_id_v1", deviceId);
					collectParams.put("cid", pcEventInfo.getCid());
					collectParams.put("phone", pcEventInfo.getPhone());
					collectParams.put("nick", pcEventInfo.getNick());
				}
			}else if(StringUtils.equals(operate_url, "/api/v1/event")) {
				// App client: decrypted payload is a JSON envelope with coded keys.
				String deviceId = jsonObjHeaders.getString("di0");
				JSONObject jsonObj = JSON.parseObject(reqData);
				String other_info = jsonObj.getString("i1");
				String device_info = jsonObj.getString("a1");
				String env_info = jsonObj.getString("g1");
				String user_properties = jsonObj.getString("s2");
				String unique_id = jsonObj.getString("s3");
				if(CompareUtils.stringExists(sendType,
						SendTypeEnum.ALL.getCode(),
						SendTypeEnum.DEVICE_INFO.getCode()
						)) {
					if(!StringUtils.isAllEmpty(device_info,env_info)) {
						DeviceIdInfo deviceIdInfo = GenDeviceIdProcessor.genDeviceId(appType,appKey, other_info, device_info, env_info);
						UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(user_properties);
						collectParams.put("unique_id", unique_id);
						collectParams.put("device_id", deviceId);
						collectParams.put("device_id_v1", deviceIdInfo.getDeviceIdV1());
						collectParams.put("cid", userProperties.getCid());
						collectParams.put("phone", userProperties.getPhone());
						collectParams.put("nick", userProperties.getNick());
					}
				}
				if(CompareUtils.stringExists(sendType,
						SendTypeEnum.ALL.getCode(),
						SendTypeEnum.EVENT_INFO.getCode()
						)) {
					UserProperties userProperties = UserPropertiesProcessor.userPropertiesToJson(user_properties);
					collectParams.put("unique_id", unique_id);
					collectParams.put("device_id", deviceId);
					collectParams.put("device_id_v1", null);
					collectParams.put("cid", userProperties.getCid());
					collectParams.put("phone", userProperties.getPhone());
					collectParams.put("nick", userProperties.getNick());
				}
			}
			// collectParams is created locally and can never be null here;
			// the old "if(null != collectParams)" guard was dead code.
			collect(Row.of(collectParams));
		} catch (Exception e) {
			// Pass the throwable itself so the stack trace is logged.
			logger.error("ParseSysLogListUDTF eval error", e);
		}
	}
}
......@@ -7,6 +7,7 @@ import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
......@@ -38,7 +39,7 @@ public class KafkaConnector {
}
//发送kafka
public static void sinkKafka(StreamExecutionEnvironment env,String topic) throws Exception {
public static void sinkKafka(StreamExecutionEnvironment env,String topic,DataStream<String> dataStream) throws Exception {
String bootstrapServers = LoadPropertiesFile.getPropertyFileValues("kafka.bootstrapServers");
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(bootstrapServers)//设置kafka地址
......@@ -49,6 +50,7 @@ public class KafkaConnector {
)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)//至少一次
.build();
dataStream.sinkTo(sink);
}
}
......@@ -70,7 +70,7 @@ public abstract class SourceCommonBase {
StreamTableEnvironment tableEnv = StreamEnvironmentSettings.createTableEnv(env, jobName.getCode(),kafkaTopicList,false);
logger.info("2. table的环境设置成功");
//3.资源配置文件信息的获取
parseMultipleSourceKafkaToSqlTable(tableEnv);
parseMultipleSourceKafkaToSqlTable(env,tableEnv);
}
if(useStreamAPI){
//2.资源配置文件信息的获取
......@@ -126,6 +126,6 @@ public abstract class SourceCommonBase {
* @throws ParseException
* @throws Exception
*/
public abstract void parseMultipleSourceKafkaToSqlTable(StreamTableEnvironment tableEnv) throws ParseException, Exception;
public abstract void parseMultipleSourceKafkaToSqlTable(StreamExecutionEnvironment env,StreamTableEnvironment tableEnv) throws ParseException, Exception;
}
package com.flink.enums;
/**
* @author wjs
* @version 创建时间:2024-10-23 17:30:12
 * 类说明 发送类型枚举 (send-type enum; previous comment "删除枚举" was a copy-paste error)
*/
public enum SendTypeEnum {

	/** Full payload: device info plus events. */
	ALL("1", "全量 (设备信息+事件)"),
	/** Device information only. */
	DEVICE_INFO("2", "设备信息"),
	/** Event information only. */
	EVENT_INFO("3", "事件信息");

	/** Wire code carried in the st1 request header. */
	private final String code;
	/** Human-readable description. */
	private final String name;

	SendTypeEnum(String code, String name) {
		this.code = code;
		this.name = name;
	}

	public String getCode() {
		return code;
	}

	public String getName() {
		return name;
	}
}
......@@ -30,6 +30,7 @@ public enum TopicTypeEnum {
ODS_PC_EVENT_LOG("ods_pc_event_log","odsPcEventLog"),
ODS_PC_COLLECT_LOG("ods_pc_collect_log","odsPcCollectLog"),
ODS_COMMUNITY_HISTORY("ods_community_history","odsCommunityHistory"),
ODS_SYS_LOG("ods_sys_log","odsSysLog"),
;
private String topic;
......
package com.flink.util.des;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
public class ByteUtil {
private static final String[] CH = { "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "E", "F" };
private static final char[] ac = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
/**
* 转换16进制字符串为byte[]
* @param ss
* @return
*/
public static byte[] convertHexString(String ss) {
byte digest[] = new byte[ss.length() / 2];
for (int i = 0; i < digest.length; i++) {
String byteString = ss.substring(2 * i, 2 * i + 2);
int byteValue = Integer.parseInt(byteString, 16);
digest[i] = (byte) byteValue;
}
return digest;
}
/**
* 转换byte[]为16进制字符串
* @param b
* @return
*/
public static String toHexString(byte b[]) {
StringBuffer hexString = new StringBuffer();
for (int i = 0; i < b.length; i++) {
String plainText = Integer.toHexString(0xff & b[i]);
if (plainText.length() < 2)
plainText = "0" + plainText;
hexString.append(plainText);
}
return hexString.toString();
}
public static byte[] getRandomKey(int byteLen) {
StringBuffer sb = new StringBuffer();
for (int i = 0; i < byteLen; i++) {
sb.append(CH[((int) (15.0D * java.lang.Math.random()))]);
}
return hexStringToByte(sb.toString());
}
public static byte[] addBytes(byte[][] src) {
int length = 0;
for (int i = 0; i < src.length; i++) {
if (src[i] != null)
length += src[i].length;
}
byte[] score = new byte[length];
int index = 0;
for (int i = 0; i < src.length; i++) {
if (src[i] != null) {
System.arraycopy(src[i], 0, score, index, src[i].length);
index += src[i].length;
}
}
src = (byte[][]) null;
return score;
}
public static byte[] getMidBytes(byte[] src, int startIndex, int length) {
byte[] b = new byte[length];
System.arraycopy(src, startIndex, b, 0, length);
return b;
}
public static byte[] getBytes(byte[] data, int len, byte flag) {
byte[] b = new byte[len];
for (int i = 0; i < b.length; i++) {
if (i < len - data.length)
b[i] = flag;
else {
b[i] = data[(i - b.length + data.length)];
}
}
data = (byte[]) null;
return b;
}
public static byte[] getBytes(byte[] data, byte old_flag, byte new_flag) {
for (int i = 0; i < data.length; i++) {
if (data[i] != old_flag)
break;
data[i] = new_flag;
}
return data;
}
public static byte[] hexStringToByte(String hex) {
byte[] data = hex.getBytes();
int i = data.length;
byte[] result = new byte[i / 2];
for (int j = 0; j < i; j += 2) {
String s1 = new String(data, j, 2);
result[(j / 2)] = ((byte) Integer.parseInt(s1, 16));
}
data = (byte[]) null;
return result;
}
private static byte toByte(char c) {
byte b = (byte) "0123456789ABCDEF".indexOf(c);
return b;
}
public static final String bytesToHexString(byte[] bArray) {
StringBuffer sb = new StringBuffer(bArray.length);
for (int i = 0; i < bArray.length; i++) {
String sTemp = byteToHexString(bArray[i]);
sb.append(sTemp.toUpperCase());
}
return sb.toString();
}
private static String byteToHexString(byte b) {
char[] ch = new char[2];
ch[0] = ac[(b >>> 4 & 0xF)];
ch[1] = ac[(b & 0xF)];
String s = new String(ch);
return new String(ch);
}
public static final Object bytesToObject(byte[] bytes) throws IOException, ClassNotFoundException {
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ObjectInputStream oi = new ObjectInputStream(in);
Object o = oi.readObject();
oi.close();
return o;
}
public static final byte[] objectToBytes(Serializable s) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
ObjectOutputStream ot = new ObjectOutputStream(out);
ot.writeObject(s);
ot.flush();
ot.close();
return out.toByteArray();
}
public static final String objectToHexString(Serializable s) throws IOException {
return bytesToHexString(objectToBytes(s));
}
public static final Object hexStringToObject(String hex) throws IOException, ClassNotFoundException {
return bytesToObject(hexStringToByte(hex));
}
public static String bcd2Str(byte[] bytes) {
StringBuffer temp = new StringBuffer(bytes.length * 2);
for (int i = 0; i < bytes.length; i++) {
temp.append((byte) ((bytes[i] & 0xF0) >>> 4));
temp.append((byte) (bytes[i] & 0xF));
}
return temp.toString().substring(0, 1).equalsIgnoreCase("0") ? temp.toString().substring(1) : temp.toString();
}
/**
 * Packs an ASCII hex string into BCD bytes, two hex characters per byte.
 * Odd-length input is left-padded with '0'. Digits '0'-'9', and letters —
 * lowercase mapped via 'a', everything else via 'A' — are decoded exactly
 * as the original nested if-chains did.
 *
 * @param asc hex string (case-insensitive)
 * @return packed bytes; empty array for an empty string
 */
public static byte[] str2Bcd(String asc) {
    // Left-pad so every byte gets exactly two source characters.
    if (asc.length() % 2 != 0) {
        asc = "0" + asc;
    }
    byte[] chars = asc.getBytes();
    byte[] bcd = new byte[asc.length() / 2];
    for (int p = 0; p < bcd.length; p++) {
        int hi = hexNibble(chars[2 * p]);
        int lo = hexNibble(chars[2 * p + 1]);
        bcd[p] = (byte) ((hi << 4) + lo);
    }
    return bcd;
}

/**
 * Maps one ASCII character code to its nibble value, replicating the
 * original duplicated if-chains: digits, then lowercase 'a'-'z', then
 * everything else relative to 'A'.
 */
private static int hexNibble(byte ch) {
    if (ch >= '0' && ch <= '9') {
        return ch - '0';
    }
    if (ch >= 'a' && ch <= 'z') {
        return ch - 'a' + 10;
    }
    return ch - 'A' + 10;
}
/**
 * Pads {@code src} with filler byte {@code b} up to the next multiple of
 * {@code multiples}.
 *
 * @param src       source bytes
 * @param multiples block size to align to (must be &gt; 0)
 * @param b         filler byte for the padding tail
 * @return {@code src} itself when already aligned (no copy — preserved from
 *         the original contract), otherwise a new padded array
 */
public static byte[] getMultiples(byte[] src, int multiples, byte b) {
    int remnant = src.length % multiples;
    if (remnant == 0) {
        return src;
    }
    // copyOf zero-fills the tail; fill overwrites it with the requested pad.
    byte[] padded = java.util.Arrays.copyOf(src, src.length + (multiples - remnant));
    java.util.Arrays.fill(padded, src.length, padded.length, b);
    return padded;
}
}
package com.flink.util.des;
import java.security.MessageDigest;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
/**
 * Triple-DES (DESede/ECB/PKCS5Padding) encryption helpers plus an MD5 digest
 * helper. Keys are given as hex strings of 16/32/48 hex characters
 * (8/16/24 bytes) and are expanded to the 24-byte key DESede requires.
 *
 * <p>NOTE(review): ECB mode leaks plaintext patterns, and DES/3DES and MD5
 * are cryptographically weak; kept only for compatibility with existing data.
 *
 * @author wjs
 */
public class EncryptUtil {
    // Cipher transformation: 3DES, ECB mode, PKCS5 padding. (NoPadding would
    // disable automatic padding.)
    private static final String model = "DESede/ECB/PKCS5Padding";

    /**
     * Decrypts a hex-encoded 3DES ciphertext and decodes the plaintext as UTF-8.
     *
     * @param message ciphertext as a hex string
     * @param key     key as a hex string (16/32/48 hex chars)
     * @return decrypted plaintext, or {@code null} on any failure
     *         (best-effort contract kept from the original implementation)
     */
    public static String desDecrypt(String message, String key) throws Exception {
        try {
            SecretKey deskey = new SecretKeySpec(deriveKeyBytes(key), "DESede");
            Cipher c1 = Cipher.getInstance(model);
            c1.init(Cipher.DECRYPT_MODE, deskey); // named constant instead of magic 2
            byte[] retByte = c1.doFinal(ByteUtil.convertHexString(message));
            // Decode as UTF-8 to mirror desEncrypt(), which encodes plaintext as
            // UTF-8; the platform-default charset used before broke round-trips
            // on non-UTF-8 platforms.
            return new String(retByte, "UTF-8");
        } catch (Exception e) {
            System.err.println(e);
        }
        return null;
    }

    /**
     * Decrypts a hex-encoded 3DES ciphertext and returns the plaintext bytes
     * re-encoded as a hex string.
     *
     * @param message ciphertext as a hex string
     * @param key     key as a hex string (16/32/48 hex chars)
     * @return decrypted bytes as hex, or {@code null} on any failure
     */
    public static String desDecryptToHex(String message, String key) throws Exception {
        try {
            SecretKey deskey = new SecretKeySpec(deriveKeyBytes(key), "DESede");
            Cipher c1 = Cipher.getInstance(model);
            c1.init(Cipher.DECRYPT_MODE, deskey);
            byte[] retByte = c1.doFinal(ByteUtil.convertHexString(message));
            return ByteUtil.toHexString(retByte);
        } catch (Exception e) {
            System.err.println(e);
        }
        return null;
    }

    /**
     * Encrypts a UTF-8 plaintext string with 3DES.
     *
     * @param message plaintext (encoded as UTF-8 before encryption)
     * @param key     key as a hex string (16/32/48 hex chars)
     * @return ciphertext as a hex string
     * @throws Exception on any cipher or key failure (propagated to caller,
     *                   unlike the decrypt methods)
     */
    public static String desEncrypt(String message, String key) throws Exception {
        SecretKey deskey = new SecretKeySpec(deriveKeyBytes(key), "DESede");
        Cipher cipher = Cipher.getInstance(model);
        cipher.init(Cipher.ENCRYPT_MODE, deskey); // named constant instead of magic 1
        return ByteUtil.toHexString(cipher.doFinal(message.getBytes("UTF-8")));
    }

    /**
     * Encrypts a hex-encoded byte string with 3DES.
     *
     * @param message plaintext bytes given as a hex string
     * @param key     key as a hex string (16/32/48 hex chars)
     * @return ciphertext as a hex string
     * @throws Exception on any cipher or key failure
     */
    public static String desEncryptHexString(String message, String key) throws Exception {
        SecretKey deskey = new SecretKeySpec(deriveKeyBytes(key), "DESede");
        Cipher cipher = Cipher.getInstance(model);
        cipher.init(Cipher.ENCRYPT_MODE, deskey);
        return ByteUtil.toHexString(cipher.doFinal(ByteUtil.convertHexString(message)));
    }

    /**
     * MD5-digests a string and returns the 32-character hex digest.
     *
     * <p>NOTE(review): uses the platform-default charset for
     * {@code message.getBytes()} — kept as-is since changing it would alter
     * digests for non-ASCII input; confirm callers only pass ASCII.
     *
     * @param message text to hash
     * @return 32-char hex MD5, or "" if the MD5 algorithm is unavailable
     */
    public static String md5Encrypt(String message) {
        MessageDigest md5 = null;
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (Exception e) {
            System.out.println(e.toString());
            e.printStackTrace();
            return "";
        }
        byte[] md5Bytes = md5.digest(message.getBytes());
        return ByteUtil.toHexString(md5Bytes);
    }

    /**
     * Decodes the hex key and expands it to 24 bytes based on its length.
     * Centralizes the identical if-chain previously duplicated across all
     * four encrypt/decrypt methods.
     *
     * @param key key as hex: 16, 32 or 48 hex characters
     * @return 24-byte DESede key material, or {@code null} for any other
     *         length (SecretKeySpec then rejects it, as before)
     */
    private static byte[] deriveKeyBytes(String key) {
        byte[] keyBytes = null;
        if (key.length() == 16) {
            keyBytes = newInstance8Key(ByteUtil.convertHexString(key));
        } else if (key.length() == 32) {
            keyBytes = newInstance16Key(ByteUtil.convertHexString(key));
        } else if (key.length() == 48) {
            keyBytes = newInstance24Key(ByteUtil.convertHexString(key));
        }
        return keyBytes;
    }

    /**
     * Validates a 24-byte key (3-key 3DES): used verbatim.
     *
     * @return the key itself, or {@code null} if the length is wrong
     */
    private static byte[] newInstance24Key(byte[] key) {
        if ((key != null) && (key.length == 24)) {
            return key;
        }
        System.err.println("密钥长度有误,期望值[24]");
        return null;
    }

    /**
     * Expands a 16-byte key to 24 bytes (2-key 3DES: K3 = K1).
     *
     * @return the 24-byte key, or {@code null} if the length is wrong
     */
    private static byte[] newInstance16Key(byte[] key) {
        if ((key != null) && (key.length == 16)) {
            byte[] b = new byte[24];
            System.arraycopy(key, 0, b, 0, 16);
            // Third 8-byte block repeats the first (standard 2-key 3DES layout).
            System.arraycopy(key, 0, b, 16, 8);
            return b;
        }
        System.err.println("密钥长度有误,期望值[16]");
        return null;
    }

    /**
     * Expands an 8-byte key to 24 bytes (K1 = K2 = K3, i.e. single-DES strength).
     *
     * @return the 24-byte key, or {@code null} if the length is wrong
     */
    private static byte[] newInstance8Key(byte[] key) {
        if ((key != null) && (key.length == 8)) {
            byte[] b = new byte[24];
            System.arraycopy(key, 0, b, 0, 8);
            System.arraycopy(key, 0, b, 8, 8);
            System.arraycopy(key, 0, b, 16, 8);
            return b;
        }
        System.err.println("密钥长度有误,期望值[8]");
        return null;
    }
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment