Commit 05aafffc by 魏建枢

事件ip处理代码提交

parent a4daf7f2
...@@ -52,6 +52,7 @@ public class KafkaBaseSchema { ...@@ -52,6 +52,7 @@ public class KafkaBaseSchema {
.column("strategy_version", STRING()) .column("strategy_version", STRING())
.column("sdk_version", STRING()) .column("sdk_version", STRING())
.column("zone_type", STRING()) .column("zone_type", STRING())
.column("route_ip", STRING())
.column("id", STRING()) .column("id", STRING())
.column("user_agent", STRING()) .column("user_agent", STRING())
.columnByMetadata("topic", STRING(), "topic", true) // 元数据字段 .columnByMetadata("topic", STRING(), "topic", true) // 元数据字段
......
...@@ -67,20 +67,27 @@ public class EventLogTable { ...@@ -67,20 +67,27 @@ public class EventLogTable {
" k.uid, " + " k.uid, " +
" k.send_time, " + " k.send_time, " +
" k.app_key, " + " k.app_key, " +
" k.route_ip, " +
" k.strategy_version, " + " k.strategy_version, " +
" k.sdk_version, " + " k.sdk_version, " +
" k.zone_type, " + " k.zone_type, " +
" k.id, " + " k.id, " +
" k.user_agent, " + " k.user_agent, " +
" t.r7 AS r7, " + " t.userParams['cid'] AS cid, " +
" t.r8_r2 AS r2, " + " t.userParams['phone'] AS phone, " +
" t.r8_r3 AS r3, " + " t.userParams['nick'] AS nick, " +
" t.r8_s2['cid'] AS cid, " + " t.userParams['view_type'] AS view_type, " +
" t.r8_s2['phone'] AS phone, " + " t.userParams['view_id'] AS view_id, " +
" t.r8_s2['nick'] AS nick, " + " t.userParams['content'] AS content, " +
" t.r9 AS r9 " + " t.userParams['screen_name'] AS screen_name, " +
" t.userParams['touch_pressure'] AS touch_pressure, " +
" t.userParams['draw_point'] AS draw_point, " +
" t.userParams['event_type'] AS event_type, " +
" t.userParams['event_time'] AS event_time, " +
" t.userParams['ip_name'] AS ip_name, " +
" t.userParams['area_name'] AS area_name " +
"FROM kafka_event_log AS k " + "FROM kafka_event_log AS k " +
"LEFT JOIN LATERAL TABLE(ParseEventList(event_list,user_properties)) AS t(r7, r8_r2, r8_r3, r8_r6, r8_s2, r9) ON TRUE" "LEFT JOIN LATERAL TABLE(ParseEventList(event_list,user_properties,route_ip)) AS t(userParams) ON TRUE"
); );
// Table result = tableEnv.sqlQuery("SELECT * FROM event_log_view"); // Table result = tableEnv.sqlQuery("SELECT * FROM event_log_view");
// result.execute().print(); // result.execute().print();
......
package com.flink.achieve.table.udf; package com.flink.achieve.table.udf;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint; import org.apache.flink.table.annotation.FunctionHint;
...@@ -15,6 +17,9 @@ import com.alibaba.fastjson.JSON; ...@@ -15,6 +17,9 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.flink.processor.function.UserPropertiesProcessor; import com.flink.processor.function.UserPropertiesProcessor;
import com.flink.util.CompareUtils;
import com.flink.util.TimeConvertUtil;
import com.flink.util.ip2region.SearcherUtil;
import com.flink.vo.UserProperties; import com.flink.vo.UserProperties;
/** /**
...@@ -22,7 +27,7 @@ import com.flink.vo.UserProperties; ...@@ -22,7 +27,7 @@ import com.flink.vo.UserProperties;
* @version 创建时间:2025-8-18 18:11:07 * @version 创建时间:2025-8-18 18:11:07
* 类说明 * 类说明
*/ */
@FunctionHint(output = @DataTypeHint("ROW<r7 STRING, r8_r2 INT, r8_r3 STRING, r8_r6 STRING, r8_s2 MAP<STRING,STRING>, r9 BIGINT>")) @FunctionHint(output = @DataTypeHint("ROW<userParams MAP<STRING,STRING>>"))
public class ParseEventListUDTF extends TableFunction<Row>{ public class ParseEventListUDTF extends TableFunction<Row>{
/** /**
...@@ -31,27 +36,63 @@ public class ParseEventListUDTF extends TableFunction<Row>{ ...@@ -31,27 +36,63 @@ public class ParseEventListUDTF extends TableFunction<Row>{
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(ParseEventListUDTF.class); private static final Logger logger = LoggerFactory.getLogger(ParseEventListUDTF.class);
public void eval(String jsonStr,String user_properties) { public void eval(String jsonStr,String user_properties,String route_ip) {
try { try {
if (StringUtils.isEmpty(jsonStr)) return; if (StringUtils.isEmpty(jsonStr)) return;
if(StringUtils.isEmpty(user_properties)) return; UserProperties userProps = new UserProperties();
UserProperties userProps = UserPropertiesProcessor.userPropertiesToJson(user_properties); if(StringUtils.isNotEmpty(user_properties)) {
userProps = UserPropertiesProcessor.userPropertiesToJson(user_properties);
}
String ip_name = null;
String area_name = null;
if(StringUtils.isNotEmpty(route_ip)) {
List<String> ips = SearcherUtil.convertStringToList(route_ip);
if(CollectionUtils.isNotEmpty(ips)) {
for(String ip:ips) {
if(!SearcherUtil.ipv6(ip)) {
area_name = SearcherUtil.getCityInfoByFile(ip);
if(!CompareUtils.stringExists(area_name,
"0|0|0|内网IP|内网IP",
"0|0|0|内网IP|Finance-and-Promoting-Technology")) {
ip_name = ip;
break;
}else {
ip_name = null;
area_name = null;
}
}
}
}
}
JSONArray eventArray = JSON.parseArray(jsonStr); JSONArray eventArray = JSON.parseArray(jsonStr);
for (int i = 0; i < eventArray.size(); i++) { for (int i = 0; i < eventArray.size(); i++) {
JSONObject event = eventArray.getJSONObject(i); JSONObject event = eventArray.getJSONObject(i);
String r7 = event.getString("r7"); String event_type = event.getString("r7");
long r9 = event.getLong("r9"); String event_time = TimeConvertUtil.parseToStringSSS(event.getLong("r9"));
JSONObject r8 = event.getJSONObject("r8"); JSONObject r8 = event.getJSONObject("r8");
int r8_r2 = r8.getIntValue("r2"); String view_type = r8.getString("r1");
String r8_r3 = r8.getString("r3"); String view_id = r8.getString("r2");
String content = r8.getString("r3");
String r6 = r8.getString("r6"); String screen_name = r8.getString("r4");
int touch_pressure = r8.getIntValue("r5");
String draw_point = r8.getString("r6");
Map<String, String> userParams = new HashMap<>(); Map<String, Object> userParams = new HashMap<>();
userParams.put("cid", userProps.getCid()); userParams.put("view_type", view_type);
userParams.put("phone", userProps.getPhone()); userParams.put("view_id", view_id);
userParams.put("nick", userProps.getNick()); userParams.put("content", content);
userParams.put("screen_name", screen_name);
userParams.put("touch_pressure", touch_pressure+"");
userParams.put("draw_point", draw_point);
userParams.put("event_type", event_type);
userParams.put("event_time", event_time);
userParams.put("ip_name", ip_name);
userParams.put("area_name", area_name);
if(null != userProps) {
userParams.put("cid", userProps.getCid());
userParams.put("phone", userProps.getPhone());
userParams.put("nick", userProps.getNick());
}
//TODO 数据为空只做测试不做真实操作 //TODO 数据为空只做测试不做真实操作
// Map<String, String> s2Map = new HashMap<>(); // Map<String, String> s2Map = new HashMap<>();
// JSONArray s2Array = r8.getJSONArray("s2"); // JSONArray s2Array = r8.getJSONArray("s2");
...@@ -66,16 +107,12 @@ public class ParseEventListUDTF extends TableFunction<Row>{ ...@@ -66,16 +107,12 @@ public class ParseEventListUDTF extends TableFunction<Row>{
// } // }
// 输出解析结果 // 输出解析结果
collect(Row.of( collect(Row.of(
r7, userParams
r8_r2,
r8_r3,
r6,
userParams,
r9
)); ));
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("ParseEventListUDTF eval e:{}",e.toString()); logger.error("ParseEventListUDTF eval e:{}",e.toString());
} }
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment