自定义Interceptor是Flume中一个非常有用的功能,它允许用户在Flume中添加自己的拦截器,以实现自定义的日志处理逻辑。
要创建一个自定义的Interceptor,需要实现Flume的Interceptor
接口,并实现接口中定义的所有方法。然后,可以在Flume的配置文件中添加该拦截器,并指定它的位置。
拦截器将在Flume中的数据流中执行,并可以对数据进行处理、修改、过滤或转换。拦截器的处理逻辑取决于实现。
比如 拦截 超过100K 的消息
新建一个 maven 项目 在 pom.xml
中添加如下依赖
1 2 3 4 5 6 7
| <dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.9.0</version> </dependency> </dependencies>
|
然后 实现 Interceptor 接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public class MyInterceptor implements Interceptor { private static final int MAX_MESSAGE_SIZE = 100 * 1024; @Override public void initialize() { } @Override public Event intercept(Event event) { if (event.getBody().length > MAX_MESSAGE_SIZE) { return null; } return event; } @Override public List<Event> intercept(List<Event> events) { List<Event> intercepted = new ArrayList<>(); for (Event event : events) { Event interceptedEvent = intercept(event); if (interceptedEvent != null) { intercepted.add(interceptedEvent); } } return intercepted; } @Override public void close() { } public static class Builder implements Interceptor.Builder { @Override public Interceptor build() { return new MyInterceptor(); } @Override public void configure(Context context) { } } }
|
将 项目打成 jar 包以后 上传到 flume 的 lib 目录下
如果想使用自定义的 拦截器 ,只需要在配置中设置即可
1 2 3
| a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.example.MyInterceptor$Builder
|