Commit 63bf39b8 by 魏建枢

udf自定义函数初始化

parents
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hive</groupId>
<artifactId>eagleEye-udf</artifactId>
<version>0.0.1-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<configuration>
<transformers>
<!-- NOTE(review): these <transformer> entries are empty — the "implementation"
     attribute appears to have been lost. maven-shade-plugin transformers are
     no-ops without an implementation class, e.g.
     implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer".
     Restore the intended transformer classes. -->
<transformer />
<transformer />
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.6</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>hadoop-hdfs-client</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-yarn-client</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
</dependencies>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hive</groupId>
<artifactId>eagleEye-udf</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
<!--添加hive依赖 -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
<!-- hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.6</version>
</dependency>
<!-- Json转换 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<!-- <dependency> -->
<!-- <groupId>org.apache.hadoop</groupId> -->
<!-- <artifactId>hadoop-client</artifactId> -->
<!-- <version>3.3.6</version> -->
<!-- <scope>provided</scope> -->
<!-- </dependency> -->
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
<dependency>
<groupId>org.lionsoul</groupId>
<artifactId>ip2region</artifactId>
<version>2.6.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<!--用于编译Java代码,将源代码编译成目标字节码,并生成class文件。这里使用的版本是2.3.2,指定了编译器的源版本和目标版本都是1.8。 -->
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<!--用于将当前模块及其所有依赖打包成一个可执行的JAR文件, 其中使用了descriptorRef为"jar-with-dependencies"的描述符来实现依赖包的合并,它在Maven打包期间会自动将相关的依赖项打包进去。 -->
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.cloud.udf;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import com.cloud.udf.util.SearcherUtil;
/**
 * Hive UDTF: takes six string columns (id, ip, cid, phone, nick, dt), resolves
 * the ip to a region name via ip2region ({@code SearcherUtil}), and emits one
 * row of seven string columns (id, ip, area_name, cid, phone, nick, dt).
 *
 * @author wjs
 * @version created 2025-3-7 15:58:05
 */
public class IpToConvert extends GenericUDTF {

    /**
     * Declares the output schema: the six input columns plus the derived
     * area_name column, all strings.
     *
     * @throws UDFArgumentException when the caller does not supply exactly six arguments
     */
    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        // Expect exactly six input columns.
        if (argOIs.length != 6) {
            throw new UDFArgumentException("需要六个输入参数:id,ip,cid,phone,nick,dt");
        }
        String[] outputColumns = {"id", "ip", "area_name", "cid", "phone", "nick", "dt"};
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        for (String column : outputColumns) {
            fieldNames.add(column);
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    /**
     * Converts one input row: looks up the region for the ip column and
     * forwards the enriched row. A failed lookup leaves area_name null
     * instead of failing the row.
     */
    @Override
    public void process(Object[] args) throws HiveException {
        // BUG FIX: the original expression "..." + args != null ? args.length : 0
        // bound as (("..." + args) != null) ? args.length : 0 — it printed only the
        // length (never the message) and threw NPE when args was null.
        System.out.println("IpToConvert process start! args:" + (args == null ? 0 : args.length));
        // BUG FIX: validate the whole argument array and its arity, not only
        // args[0]; the original indexed args[0] before any length check.
        if (args == null || args.length != 6 || args[0] == null) {
            throw new HiveException("需要六个输入参数:id,ip,cid,phone,nick,dt");
        }
        String id = asString(args[0]);
        String ip = asString(args[1]);
        String cid = asString(args[2]);
        String phone = asString(args[3]);
        String nick = asString(args[4]);
        String dt = asString(args[5]);
        String area_name = null;
        try {
            if (StringUtils.isNotEmpty(ip)) {
                area_name = SearcherUtil.getCityInfoByFile(ip);
            }
        } catch (Exception e) {
            // Best-effort enrichment: a bad ip must not fail the whole row.
            System.out.println("ip:"+ip+"e:"+e.toString());
        }
        // Forward each row as it is produced. The original buffered every row in
        // an instance field and forwarded only in close(), which risks OOM when a
        // split contains many rows.
        forward(new Object[]{id, ip, area_name, cid, phone, nick, dt});
        System.out.println("IpToConvert process end!");
    }

    /** Null-safe Object-to-String conversion for Hive column values. */
    private static String asString(Object value) {
        return value == null ? null : value.toString();
    }

    @Override
    public void close() throws HiveException {
        // Rows are forwarded eagerly in process(); nothing is buffered here.
    }

    /**
     * Returns true when {@code ip} parses as an IPv6 address.
     * NOTE(review): InetAddress.getByName may perform a DNS lookup for host
     * names; callers should pass literal addresses only.
     */
    private boolean ipv6(String ip) {
        return isIpv6(ip);
    }

    /**
     * Shared IPv6 check (the original had two identical copies of this logic,
     * one instance-level and one static).
     */
    private static boolean isIpv6(String ip) {
        try {
            Inet6Address address = (Inet6Address) InetAddress.getByName(ip);
            System.out.println(ip + " is an IPv6 address.");
            return true;
        } catch (UnknownHostException e) {
            System.out.println(ip + " is not a valid IPv6 address.");
        } catch (ClassCastException e) {
            // getByName succeeded but returned an IPv4 (Inet4Address) instance.
            System.out.println(ip + " is not an IPv6 address.");
        }
        return false;
    }

    /** Ad-hoc manual check: prints 1 for a non-empty, non-IPv6 address. */
    public static void main(String[] args) {
        String ip = "127.0.0.1";
        if (StringUtils.isNotEmpty(ip) && !isIpv6(ip)) {
            System.out.println(1);
        } else {
            System.out.println(2);
        }
    }
}
package com.cloud.udf;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hive.com.esotericsoftware.minlog.Log;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.cloud.udf.vo.jsonToRow.EventLog;
/**
 * Hive UDTF: explodes the JSON array in the event_list column (a list of
 * {@code EventLog} objects) into one output row per event, carrying the other
 * nineteen input columns through unchanged. Each emitted event_list cell is the
 * compact {r7, r8, r9} JSON form of a single event.
 *
 * @author wjs
 * @version created 2025-2-26 15:45:42
 */
//https://blog.csdn.net/qq_22973811/article/details/126304005
//https://blog.51cto.com/u_16175470/8961128 udtf
public class JsonToRow extends GenericUDTF {

    /** Output column names, in the exact order rows are emitted. */
    private static final String[] OUTPUT_COLUMNS = {
            "id", "send_time", "create_time", "strategy_group_id", "app_key",
            "app_type", "app_channel", "zone_code", "zone_name", "zone_type",
            "sdk_version", "user_agent", "device_id", "uid", "strategy_version",
            "event_list", "route_ip", "user_properties", "unique_id", "dt"
    };

    /**
     * Declares the output schema: twenty string columns mirroring the inputs.
     *
     * @throws UDFArgumentException when the caller does not supply exactly twenty arguments
     */
    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        // Expect exactly twenty input columns, in a fixed order.
        if (argOIs.length != 20) {
            throw new UDFArgumentException("需要按顺序输入二十个参数:id,send_time,create_time,strategy_group_id,app_key,app_type,app_channel,zone_code,zone_name,zone_type,sdk_version,user_agent,device_id,uid,strategy_version,event_list,route_ip,user_properties,unique_id,dt");
        }
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        for (String column : OUTPUT_COLUMNS) {
            fieldNames.add(column);
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    /**
     * Converts one input row: parses the event_list JSON and forwards one row
     * per parsed event. Malformed JSON is logged and produces no rows.
     */
    @Override
    public void process(Object[] args) throws HiveException {
        // BUG FIX: the original "..." + args != null ? args.length : 0 bound as
        // (("..." + args) != null) ? args.length : 0 — it printed only the length
        // and threw NPE when args was null.
        System.out.println("JsonToRow process start! args:" + (args == null ? 0 : args.length));
        if (args == null || args.length != 20) {
            throw new HiveException("需要按顺序输入二十个参数:id,send_time,create_time,strategy_group_id,app_key,app_type,app_channel,zone_code,zone_name,zone_type,sdk_version,user_agent,device_id,uid,strategy_version,event_list,route_ip,user_properties,unique_id,dt");
        }
        // BUG FIX: null-safe conversion; the original called toString() directly
        // and threw NPE whenever any input column was SQL NULL.
        String id = asString(args[0]);
        String send_time = asString(args[1]);
        String create_time = asString(args[2]);
        String strategy_group_id = asString(args[3]);
        String app_key = asString(args[4]);
        String app_type = asString(args[5]);
        String app_channel = asString(args[6]);
        String zone_code = asString(args[7]);
        String zone_name = asString(args[8]);
        String zone_type = asString(args[9]);
        String sdk_version = asString(args[10]);
        String user_agent = asString(args[11]);
        String device_id = asString(args[12]);
        String uid = asString(args[13]);
        String strategy_version = asString(args[14]);
        String jsonStr = asString(args[15]);
        String route_ip = asString(args[16]);
        String user_properties = asString(args[17]);
        String unique_id = asString(args[18]);
        String dt = asString(args[19]);
        try {
            List<EventLog> eventList = jsonStr == null
                    ? null
                    : JSONObject.parseObject(jsonStr, new TypeReference<List<EventLog>>() {});
            // BUG FIX: the original ternary bound as (("..." + eventList) != null)
            // ? eventList.size() : 0, so it threw NPE when the parse returned null.
            System.out.println("JsonToRow process eventList:" + (eventList == null ? 0 : eventList.size()));
            if (eventList != null && !eventList.isEmpty()) {
                for (EventLog eventLog : eventList) {
                    String event_list = toEventJson(eventLog);
                    // Forward each row as it is produced. The original buffered all
                    // rows in a field and forwarded only in close() — OOM risk.
                    forward(new Object[]{id, send_time, create_time, strategy_group_id, app_key,
                            app_type, app_channel, zone_code, zone_name, zone_type, sdk_version,
                            user_agent, device_id, uid, strategy_version, event_list, route_ip,
                            user_properties, unique_id, dt});
                }
            }
            Log.info("JsonToRow process eventList end!");
        } catch (JSONException e) {
            e.printStackTrace();
            System.out.println("JsonToRow process e:"+e.toString());
        } finally {
            System.out.println("JsonToRow process end!");
        }
    }

    /** Null-safe Object-to-String conversion for Hive column values. */
    private static String asString(Object value) {
        return value == null ? null : value.toString();
    }

    /**
     * Serializes one event back to the compact {r7, r8, r9} JSON form
     * (shared by process() and the manual main() check).
     */
    private static String toEventJson(EventLog eventLog) {
        JSONObject jsonObj = new JSONObject();
        jsonObj.put("r7", eventLog.getR7());
        jsonObj.put("r8", eventLog.getR8());
        jsonObj.put("r9", eventLog.getR9());
        return jsonObj.toJSONString();
    }

    @Override
    public void close() throws HiveException {
        // Rows are forwarded eagerly in process(); nothing is buffered here.
    }

    /** Ad-hoc manual check with a captured sample payload. */
    public static void main(String[] args) {
        String jsonStr = "[{\"r7\":\"r10\",\"r8\":{\"r2\":12299,\"r3\":\"\",\"r6\":[\"303.00&.2603.333328\",\"300.67&631.00\",\"300.67&631.00\",\"299.67&664.67\",\"299.67&703.33\",\"299.67&703.33\"],\"s2\":[{\"cid\":\"7647hcmut\"},{\"phone\":\"16631557647\"},{\"nick\":\"共享母猪\"}],\"r1\":\"RCTView\"},\"r9\":1740355286000},{\"r7\":\"r10\",\"r8\":{\"r2\":12299,\"r3\":\"\",\"r6\":[\"282.33&.2594.000000\",\"282.33&596.33\",\"282.33&596.33\",\"282.33&599.00\",\"283.00&601.00\",\"283.67&602.00\",\"283.67&603.00\",\"283.67&603.33\",\"283.67&603.00\",\"283.67&603.00\"],\"s2\":[{\"cid\":\"7647hcmut\"},{\"phone\":\"16631557647\"},{\"nick\":\"共享母猪\"}],\"r1\":\"RCTView\"},\"r9\":1740355287000},{\"r7\":\"r10\",\"r8\":{\"r2\":12299,\"r3\":\"\",\"r6\":[\"39.67&.2681.666656\",\"47.00&681.67\",\"47.00&681.67\",\"54.67&681.67\",\"63.67&681.67\",\"75.33&679.33\",\"92.67&675.00\",\"116.00&667.67\",\"128.67&663.33\",\"165.00&651.67\",\"191.00&640.67\",\"206.67&634.33\",\"206.67&634.33\"],\"s2\":[{\"cid\":\"7647hcmut\"},{\"phone\":\"16631557647\"},{\"nick\":\"共享母猪\"}],\"r1\":\"RCTView\"},\"r9\":1740355288000}]";
        List<EventLog> eventList = JSONObject.parseObject(jsonStr, new TypeReference<List<EventLog>>() {});
        // BUG FIX: same ternary-precedence problem as in process().
        System.out.println("JsonToRow process eventList:" + (eventList == null ? 0 : eventList.size()));
        ArrayList<Object[]> result = new ArrayList<Object[]>();
        if (eventList != null && !eventList.isEmpty()) {
            for (EventLog eventLog : eventList) {
                result.add(new Object[]{toEventJson(eventLog)});
            }
        }
        System.out.println(">>>>>>>>>>>>>>:"+result);
    }
}
package com.cloud.udf.util;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.lionsoul.ip2region.xdb.Searcher;
/**
 * Thin wrapper around the ip2region xdb {@link Searcher} for ip-to-region
 * lookups.
 *
 * @author wjs
 * @version created 2025-2-14 14:38:41; reference: https://blog.csdn.net/qq_37284798/article/details/130005988
 */
public class SearcherUtil {

    /**
     * Default location of the ip2region database file (the Windows dev-box
     * path the original hard-coded; the Linux path was left commented out).
     * Override at runtime with -Dip2region.db.path=/path/to/ip2region.xdb so
     * the same jar works on every host.
     */
    private static final String DEFAULT_DB_PATH =
            "D:\\gitEagleEye\\eagleEye-service\\src\\main\\resources\\ip2region.xdb";

    /**
     * Resolves {@code ip} to its ip2region region string using a file-backed
     * searcher created per call.
     *
     * @param ip the address to look up; null/empty returns null immediately
     * @return the region string, or null when the database cannot be opened
     *         or the lookup fails
     */
    public static String getCityInfoByFile(String ip) {
        System.out.println("SearcherUtil start! ip:"+ip);
        // BUG FIX: bail out early on null/empty input instead of opening the
        // database only to fail inside search().
        if (ip == null || ip.isEmpty()) {
            return null;
        }
        // 1. Create the searcher object.
        String dbPath = System.getProperty("ip2region.db.path", DEFAULT_DB_PATH);
        Searcher searcher;
        try {
            System.out.println("dbPath:"+dbPath);
            searcher = Searcher.newWithFileOnly(dbPath);
        } catch (IOException e) {
            // BUG FIX: the original message mixed an SLF4J-style "{}" placeholder
            // into plain string concatenation.
            System.out.println("failed to create searcher dbPath:" + dbPath + "e:" + e);
            return null;
        }
        // 2. Run the lookup, always closing the searcher afterwards.
        try {
            long sTime = System.nanoTime();
            String region = searcher.search(ip);
            long cost = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - sTime);
            System.out.println("region:"+region+"ioCount:"+ searcher.getIOCount()+"took:"+cost);
            return region;
        } catch (Exception e) {
            System.out.println("failed to search:"+ip+"e:"+e);
        } finally {
            try {
                searcher.close();
            } catch (IOException e) {
                System.out.println("failed to close:"+ip+"e:"+e);
            }
        }
        return null;
        // 3. Note: for concurrent use, each thread must create its own
        // searcher object (ip2region file-only searchers are not shared).
    }

    /** Manual smoke test; with the null guard this prints null without I/O. */
    public static void main(String[] args) throws Exception {
        String aa = getCityInfoByFile(null);
        System.out.println(aa);
    }
}
package com.cloud.udf.vo.jsonToRow;
import java.io.Serializable;
/**
 * One element of the "event_list" JSON array consumed by JsonToRow.
 * Property names must remain r7/r8/r9: fastjson binds the JSON keys to this
 * bean through its getter/setter names, so renaming anything here changes the
 * wire contract.
 *
 * @author wjs
 * @version created 2025-2-26 16:59:27
 */
public class EventLog implements Serializable {

    private static final long serialVersionUID = 1L;

    // Event type code (JSON key "r7").
    private String r7;
    // Nested event attributes (JSON key "r8").
    private Properties r8;
    // Numeric event field (JSON key "r9"); sample payloads carry
    // millisecond-scale values — presumably an epoch timestamp, confirm.
    private Long r9;

    public String getR7() {
        return this.r7;
    }

    public void setR7(String r7) {
        this.r7 = r7;
    }

    public Properties getR8() {
        return this.r8;
    }

    public void setR8(Properties r8) {
        this.r8 = r8;
    }

    public Long getR9() {
        return this.r9;
    }

    public void setR9(Long r9) {
        this.r9 = r9;
    }
}
package com.cloud.udf.vo.jsonToRow;
import java.io.Serializable;
import java.util.List;
/**
 * Nested attribute bag of an EventLog entry (JSON key "r8").
 * Property names must remain r1..r6: fastjson binds the JSON keys to this
 * bean through its getter/setter names, so renaming anything here changes
 * the wire contract.
 *
 * @author wjs
 * @version created 2025-2-26 17:00:37
 */
public class Properties implements Serializable {

    private static final long serialVersionUID = 1L;

    // Scalar attributes r1..r5 (JSON keys "r1".."r5"); semantics are not
    // visible here — confirm against the event producer.
    private String r1;
    private String r2;
    private String r3;
    private String r4;
    private String r5;
    // List attribute (JSON key "r6"); sample payloads carry "x&y"
    // coordinate-looking strings — confirm against the event producer.
    private List<String> r6;

    public String getR1() {
        return this.r1;
    }

    public void setR1(String r1) {
        this.r1 = r1;
    }

    public String getR2() {
        return this.r2;
    }

    public void setR2(String r2) {
        this.r2 = r2;
    }

    public String getR3() {
        return this.r3;
    }

    public void setR3(String r3) {
        this.r3 = r3;
    }

    public String getR4() {
        return this.r4;
    }

    public void setR4(String r4) {
        this.r4 = r4;
    }

    public String getR5() {
        return this.r5;
    }

    public void setR5(String r5) {
        this.r5 = r5;
    }

    public List<String> getR6() {
        return this.r6;
    }

    public void setR6(List<String> r6) {
        this.r6 = r6;
    }
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment