Commit 3daa838d by 魏建枢

udtf

parent 105020b9
package com.cloud.udf;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hive.com.esotericsoftware.minlog.Log;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.cloud.udf.vo.jsonToRow.EventLog;
import com.cloud.udf.vo.jsonToRow.UserProperties;
/**
 * Hive UDTF that expands one Android collect-log record into one output row per event.
 *
 * <p>Expected arguments, in order: id, device_id, unique_id, uid, app_key, event_list,
 * route_ip, send_time, dt, user_properties, zone_code. {@code event_list} and
 * {@code user_properties} are JSON arrays; every other argument is copied to the output
 * through {@code toString()}.
 *
 * <p>All output columns are declared as Java strings, so every forwarded value is a
 * {@code String} or {@code null}.
 *
 * @author wjs
 * @version created 2025-3-18 10:58:29
 */
public class AndoridJsonToRow extends GenericUDTF {

    /** Number of arguments read by {@link #process(Object[])} (indices 0..10). */
    private static final int EXPECTED_ARG_COUNT = 11;

    /** Prefix of the argument-count error; the actual count is appended by the caller. */
    private static final String ARG_COUNT_MESSAGE =
            "需要十一个输入参数:id,device_id,unique_id,uid,app_key,event_list,route_ip,send_time,dt,user_properties,zone_code >>>>>:";

    /** Output column names, in the order the values are forwarded. */
    private static final String[] OUTPUT_FIELDS = {
        "id", "device_id", "unique_id", "uid", "app_key", "view_type", "view_id", "content",
        "screen_name", "touch_pressure", "draw_point", "cid", "phone", "nick",
        "route_ip", "send_time", "event", "event_time", "zone_code", "dt"
    };

    /** Parse target for the event_list argument; created once instead of once per row. */
    private static final TypeReference<List<EventLog>> EVENT_LIST_TYPE =
            new TypeReference<List<EventLog>>() {};

    /** Parse target for the user_properties argument; created once instead of once per row. */
    private static final TypeReference<List<UserProperties>> USER_PROPERTIES_TYPE =
            new TypeReference<List<UserProperties>>() {};

    /**
     * Validates the argument count and declares the output schema.
     *
     * @param argOIs inspectors of the call-site arguments
     * @return a struct inspector with one string column per entry of {@link #OUTPUT_FIELDS}
     * @throws UDFArgumentException when fewer than {@value #EXPECTED_ARG_COUNT} arguments are given
     */
    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        int actual = argOIs == null ? 0 : argOIs.length;
        // Only a lower bound is enforced: process() never reads beyond index 10, and calls that
        // passed extra arguments were accepted before this check existed.
        if (actual < EXPECTED_ARG_COUNT) {
            throw new UDFArgumentException(ARG_COUNT_MESSAGE + actual);
        }

        List<String> fieldNames = new ArrayList<String>(OUTPUT_FIELDS.length);
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(OUTPUT_FIELDS.length);
        for (String field : OUTPUT_FIELDS) {
            fieldNames.add(field);
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }
        Log.info("AndoridJsonToRow initialize end! >>>>>>> fieldNames:" + fieldNames);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    /**
     * Forwards one row per element of the event_list JSON array.
     *
     * <p>Nothing is forwarded when event_list is null, empty, or parses to an empty list.
     * Null elements of the list are skipped. An event without an {@code r8} object is still
     * forwarded, with null view columns.
     *
     * @param args the eleven input values described on the class
     * @throws HiveException {@link UDFArgumentException} when too few arguments are passed or
     *         when parsing/conversion fails (the original failure is attached as the cause);
     *         any {@code HiveException} raised by {@code forward} is propagated unchanged
     */
    @Override
    public void process(Object[] args) throws HiveException {
        int actual = args == null ? 0 : args.length;
        if (actual < EXPECTED_ARG_COUNT) {
            throw new UDFArgumentException(ARG_COUNT_MESSAGE + actual);
        }

        try {
            String id = asString(args[0]);
            String deviceId = asString(args[1]);
            String uniqueId = asString(args[2]);
            String uid = asString(args[3]);
            String appKey = asString(args[4]);
            String eventListJson = asString(args[5]);
            String routeIp = asString(args[6]);
            String sendTime = asString(args[7]);
            String dt = asString(args[8]);
            String userPropertiesJson = asString(args[9]);
            String zoneCode = asString(args[10]);

            String[] identity = extractIdentity(userPropertiesJson);
            String cid = identity[0];
            String phone = identity[1];
            String nick = identity[2];

            if (StringUtils.isEmpty(eventListJson)) {
                return;
            }
            List<EventLog> eventList = JSONObject.parseObject(eventListJson, EVENT_LIST_TYPE);
            if (eventList == null || eventList.isEmpty()) {
                return;
            }

            for (EventLog eventLog : eventList) {
                if (eventLog == null) {
                    continue;
                }
                String event = eventLog.getR7();
                Long eventTimeValue = eventLog.getR9();
                // The column is declared as a string, so the numeric timestamp is stringified
                // here instead of being forwarded as a Long.
                String eventTime = eventTimeValue == null ? null : eventTimeValue.toString();

                // r8 carries the view details; tolerate events that do not have it.
                boolean hasView = eventLog.getR8() != null;
                String viewType = hasView ? eventLog.getR8().getR1() : null;
                String viewId = hasView ? eventLog.getR8().getR2() : null;
                String content = hasView ? eventLog.getR8().getR3() : null;
                String screenName = hasView ? eventLog.getR8().getR4() : null;
                String touchPressure = hasView ? eventLog.getR8().getR5() : null;
                String drawPoint = hasView && eventLog.getR8().getR6() != null
                        ? eventLog.getR8().getR6().toString()
                        : null;

                forward(new Object[] {
                    id, deviceId, uniqueId, uid, appKey, viewType, viewId, content, screenName,
                    touchPressure, drawPoint, cid, phone, nick, routeIp, sendTime, event,
                    eventTime, zoneCode, dt
                });
            }
            if (Log.DEBUG) {
                Log.debug("AndoridJsonToRow process end! events:" + eventList.size());
            }
        } catch (HiveException e) {
            // Already the declared type (e.g. raised by forward); do not re-label it.
            throw e;
        } catch (Exception e) {
            // Same exception type as before, but with the real reason and the cause preserved.
            UDFArgumentException wrapped =
                    new UDFArgumentException("AndoridJsonToRow process failed: " + e);
            wrapped.initCause(e);
            throw wrapped;
        }
    }

    /** No buffered state: rows are forwarded directly from {@link #process(Object[])}. */
    @Override
    public void close() throws HiveException {
        // Nothing to flush or release.
    }

    /** Returns {@code value.toString()}, or {@code null} for a null value. */
    private static String asString(Object value) {
        return value == null ? null : value.toString();
    }

    /**
     * Extracts cid, phone and nick from the user_properties JSON array.
     *
     * <p>Each entry may contribute to any of the three values: {@code cid} falls back to
     * {@code id}, and {@code nick} falls back to {@code email}. When several entries supply
     * the same value, the last one wins. Unlike a single else-if chain, an entry that carries
     * several attributes (for example both cid and phone) contributes all of them.
     *
     * @param userPropertiesJson JSON array text, may be null or empty
     * @return a three-element array {@code {cid, phone, nick}}; elements may be null
     */
    private static String[] extractIdentity(String userPropertiesJson) {
        String cid = null;
        String phone = null;
        String nick = null;

        if (StringUtils.isNotEmpty(userPropertiesJson)) {
            List<UserProperties> entries =
                    JSONObject.parseObject(userPropertiesJson, USER_PROPERTIES_TYPE);
            if (entries != null) {
                for (UserProperties user : entries) {
                    if (user == null) {
                        continue;
                    }
                    if (StringUtils.isNotEmpty(user.getCid())) {
                        cid = user.getCid();
                    } else if (StringUtils.isNotEmpty(user.getId())) {
                        cid = user.getId();
                    }
                    if (StringUtils.isNotEmpty(user.getPhone())) {
                        phone = user.getPhone();
                    }
                    if (StringUtils.isNotEmpty(user.getNick())) {
                        nick = user.getNick();
                    } else if (StringUtils.isNotEmpty(user.getEmail())) {
                        nick = user.getEmail();
                    }
                }
            }
        }
        return new String[] {cid, phone, nick};
    }

    /** Manual smoke check of the user_properties extraction; prints {@code cid>>phone>>nick}. */
    public static void main(String[] args) {
        String user_properties = "[{\"id\":\"5676jypto\"}, {\"phone\":\"18177755676\"}, {\"nick\":\"DW20戴维支持\"}]";
        String[] identity = extractIdentity(user_properties);
        System.out.println(identity[0] + ">>" + identity[1] + ">>" + identity[2]);
    }
}
...@@ -23,80 +23,47 @@ import com.cloud.udf.util.SearcherUtil; ...@@ -23,80 +23,47 @@ import com.cloud.udf.util.SearcherUtil;
*/ */
public class IpToConvert extends GenericUDTF{ public class IpToConvert extends GenericUDTF{
private ArrayList<Object[]> result = new ArrayList<Object[]>();
@Override @Override
public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException { public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
// 检测参数数量 // 检测参数数量
if (argOIs.length != 6) {
throw new UDFArgumentException("需要六个输入参数:id,ip,cid,phone,nick,dt");
}
ArrayList<String> fieldNames = new ArrayList<String>(); ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(); ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
try {
fieldNames.add("id"); // 初始化输出字段(所有字段类型为String)
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); String[] fields = {
fieldNames.add("ip"); "id","ip","area_name","cid","phone","nick","dt"
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); };
fieldNames.add("area_name"); for (String field : fields) {
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); fieldNames.add(field);
fieldNames.add("cid"); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); }
fieldNames.add("phone"); } catch (Exception e) {
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); throw new UDFArgumentException("需要六个输入参数:id,ip,cid,phone,nick,dt");
fieldNames.add("nick"); }
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldNames.add("dt");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
} }
@Override @Override
public void process(Object[] args) throws HiveException { public void process(Object[] args) throws HiveException {
System.out.println("IpToConvert process start! args:"+args !=null ? args.length:0); System.out.println("IpToConvert process start! args:"+args !=null ? args.length:0);
if (args.length != 6) { try {
throw new HiveException("需要六个输入参数:id,ip,cid,phone,nick,dt"); String id = args[0] != null ? args[0].toString() : null;
} String ip = args[1] != null ? args[1].toString() : null;
String cid = args[2] != null ? args[2].toString() : null;
String id = null; String phone = args[3] != null ? args[3].toString() : null;
if(args[0] != null) { String nick = args[4] != null ? args[4].toString() : null;
id = args[0].toString(); String dt = args[5] != null ? args[5].toString() : null;
} String area_name = SearcherUtil.getCityInfoByFile(ip);
String ip = args[1].toString(); forward(new Object[]{id,ip,area_name,cid,phone,nick,dt});
String cid = null;
if(args[2] != null) {
cid = args[2].toString();
}
String phone = null;
if(args[3] != null) {
phone = args[3].toString();
}
String nick = null;
if(args[4] != null) {
nick = args[4].toString();
}
String dt = null;
if(args[5] != null) {
dt = args[5].toString();
}
String area_name = null;
try {
area_name = SearcherUtil.getCityInfoByFile(ip);
} catch (Exception e) { } catch (Exception e) {
System.out.println("ip:"+ip+"e:"+e.toString()); System.out.println("e:"+e.toString());
throw new HiveException("需要六个输入参数:id,ip,cid,phone,nick,dt");
} }
result.add(new Object[]{id,ip,area_name,cid,phone,nick,dt});
System.out.println("IpToConvert process end!"); System.out.println("IpToConvert process end!");
} }
@Override @Override
public void close() throws HiveException { public void close() throws HiveException {
for (Object[] r : result) {
forward(r);
}
result.clear();
} }
private Boolean ipv6(String ip) { private Boolean ipv6(String ip) {
......
package com.cloud.udf.vo.jsonToRow;
import java.io.Serializable;
/**
 * Serializable bean with five optional string attributes of a user:
 * {@code cid}, {@code id}, {@code phone}, {@code email} and {@code nick}.
 * Every attribute starts as {@code null} and is exposed through a plain getter/setter pair.
 *
 * @author wjs
 * @version created 2025-3-18 11:15:27
 */
public class UserProperties implements Serializable {

    private static final long serialVersionUID = 1L;

    // ---- cid ----

    private String cid;

    /** @return the cid value, or {@code null} when unset */
    public String getCid() {
        return this.cid;
    }

    /** @param cid the new cid value */
    public void setCid(String cid) {
        this.cid = cid;
    }

    // ---- id ----

    private String id;

    /** @return the id value, or {@code null} when unset */
    public String getId() {
        return this.id;
    }

    /** @param id the new id value */
    public void setId(String id) {
        this.id = id;
    }

    // ---- phone ----

    private String phone;

    /** @return the phone value, or {@code null} when unset */
    public String getPhone() {
        return this.phone;
    }

    /** @param phone the new phone value */
    public void setPhone(String phone) {
        this.phone = phone;
    }

    // ---- email ----

    private String email;

    /** @return the email value, or {@code null} when unset */
    public String getEmail() {
        return this.email;
    }

    /** @param email the new email value */
    public void setEmail(String email) {
        this.email = email;
    }

    // ---- nick ----

    private String nick;

    /** @return the nick value, or {@code null} when unset */
    public String getNick() {
        return this.nick;
    }

    /** @param nick the new nick value */
    public void setNick(String nick) {
        this.nick = nick;
    }
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment