Commit f57f8f42 by 魏建枢

拦截器初始化提交

parents
#!groovy
/* groovylint-disable-next-line CompileStatic, NoDef, VariableName, VariableTypeRequired test taggie*/
@Library('jenkins-ops-pipeline-library@main') _
eagleEyeHomeCIPipeline codeType: 'eaglejava',
packagePath: 'eagleEye-service/target/eagleEye-service.jar'
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.flume</groupId>
<artifactId>flume-interceptor</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
<!-- Json转换 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.11.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<!--用于编译Java代码,将源代码编译成目标字节码,并生成class文件。这里使用的版本是2.3.2,指定了编译器的源版本和目标版本都是1.8。 -->
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<!--用于将当前模块及其所有依赖打包成一个可执行的JAR文件, 其中使用了descriptorRef为"jar-with-dependencies"的描述符来实现依赖包的合并,它在Maven打包期间会自动将相关的依赖项打包进去。 -->
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.org.apache.flume.interceptor;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;
import com.alibaba.fastjson.JSONObject;
import com.org.apache.flume.interceptor.interceptor.InterceptorSelector;
/**
* @author wjs
* @version 创建时间:2024-11-5 10:55:31
* 类说明
*/
public class TimestampeInterceptor implements Interceptor{
private ArrayList<Event> events = new ArrayList<>();
@Override
public void initialize() {
// TODO Auto-generated method stub
}
@Override
public Event intercept(Event event) {
String log = new String(event.getBody(),StandardCharsets.UTF_8);
System.out.println("TimestampeInterceptor>>>>>>>>>>>>>>>>>>>>>开始"+log);
JSONObject jsonObj = new JSONObject();
try {
jsonObj = JSONObject.parseObject(log);
} catch (Exception e) {
// TODO: handle exception
}
if(null != jsonObj) {
System.out.println("TimestampeInterceptorjson json解析成功开始"+jsonObj.toJSONString());
return EventBuilder.withBody(InterceptorSelector.selector(jsonObj),StandardCharsets.UTF_8);
}
System.out.println("TimestampeInterceptor>>>>>>>>>>>>>>>>>>>>>结束");
return null;
}
@Override
public List<Event> intercept(List<Event> list) {
events.clear();
for (Event event:list){
events.add(intercept(event));
}
return events;
}
@Override
public void close() {
// TODO Auto-generated method stub
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new TimestampeInterceptor();
}
@Override
public void configure(Context context) {
// TODO Auto-generated method stub
}
}
}
package com.org.apache.flume.interceptor.interceptor;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
/**
* @author wjs
* @version 创建时间:2024-11-18 18:26:52
* 类说明
*/
public class InterceptorDataFormatUtil {
public static List<String> collectLogDataFormat(JSONObject jsonObj) {
List<String> arrayList = new ArrayList<>();
arrayList.add(jsonObj.getString("device_id"));
arrayList.add(jsonObj.getString("strategy_group_id"));
arrayList.add(jsonObj.getString("strategy_version"));
arrayList.add(jsonObj.getString("send_time"));
arrayList.add(jsonObj.getString("create_time"));
arrayList.add(jsonObj.getString("app_key"));
arrayList.add(jsonObj.getString("app_type"));
arrayList.add(jsonObj.getString("app_channel"));
arrayList.add(jsonObj.getString("zone_code"));
arrayList.add(jsonObj.getString("zone_name"));
arrayList.add(jsonObj.getString("zone_type"));
arrayList.add(jsonObj.getString("sdk_version"));
arrayList.add(jsonObj.getString("collect_log"));
arrayList.add(jsonObj.getString("uesr_agent"));
return arrayList;
}
public static List<String> zipperStrategyDataFormat(JSONObject jsonObj) {
List<String> arrayList = new ArrayList<>();
arrayList.add(jsonObj.getString("group_id"));
arrayList.add(jsonObj.getString("strategy_group_time"));
arrayList.add(jsonObj.getString("strategy_time"));
arrayList.add(jsonObj.getString("group_version"));
arrayList.add(jsonObj.getString("zipper_version"));
arrayList.add(jsonObj.getString("strategy_group"));
arrayList.add(jsonObj.getString("strategy"));
return arrayList;
}
}
package com.org.apache.flume.interceptor.interceptor;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import com.alibaba.fastjson.JSONObject;
/**
* @author wjs
* @version 创建时间:2024-11-18 18:36:48
* 类说明
*/
public class InterceptorSelector {
public static String selector(JSONObject jsonObj) {
String resultStr = null;
String flume_type = jsonObj.getString("flume_type");
if(StringUtils.equals(flume_type, "zipperStrategy")) {
return dataSplit(InterceptorDataFormatUtil.zipperStrategyDataFormat(jsonObj));
}else if(StringUtils.equals(flume_type, "collectLog")) {
return dataSplit(InterceptorDataFormatUtil.collectLogDataFormat(jsonObj));
}
return resultStr;
}
private static String dataSplit(List<String> resultList) {
System.out.println("InterceptorSelector>>>>>>>>>>>>>>>>>>>>>解析成功"+resultList==null?0:resultList);
return StringUtils.join(resultList,"\001");
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment