博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
24 友盟项目--优化-flume限速拦截、flume自定义源防丢失--改造exec源守护线程监控目录(防丢失)redis维护key(去重)...
阅读量:5371 次
发布时间:2019-06-15

本文共 7381 字,大约阅读时间需要 24 分钟。

 

一、flume限速拦截

 flume架构图

 

ExecSource   exec数据源--实时收集

 

 

 

限速拦截器:代码实现 Flume 的拦截器 Interceptor 接口

根据上一次发送的时间和数据长度,计算出发送上一条数据本应花费的时间(上一条数据的字节数 / 每秒限速字节数 × 1000 毫秒),再与两次发送之间实际经过的时间做对比。如果实际经过的时间 < 本应花费的时间,说明超速了,此时让线程休眠两者的差值。

1 package com.oldboy.umeng.flume.interceptor; 2  3 import java.util.List; 4  5 import org.apache.flume.Context; 6 import org.apache.flume.Event; 7 import org.apache.flume.interceptor.Interceptor; 8 import org.slf4j.Logger; 9 import org.slf4j.LoggerFactory;10 11 /**12  * 限速拦截器13  */14 public class SpeedInterceptor implements Interceptor {15     private static final Logger logger = LoggerFactory.getLogger(org.apache.flume.interceptor.HostInterceptor.class);16     //每秒发送的字节数17     private int speed  = 1024;18 19     /**20      */21     private SpeedInterceptor(int speed) {22         this.speed = speed ;23     }24 25     public void initialize() {26     }27 28     /**29      * 拦截事件30      */31     //上次发送记录的毫秒数32     long prevTimeMs = 0 ;33     int prevDataLen = 0 ;34     public Event intercept(Event event) {35         System.out.println("开始拦截!");36         byte[] data = event.getBody();37         //第一次发送38         if(prevTimeMs == 0){39             prevTimeMs = System.currentTimeMillis() ;40             prevDataLen = data.length ;41             System.out.println("首次发送! : " + prevDataLen);42             return event ;43         }44         else{45             long now = System.currentTimeMillis() ;46 47             //实际消耗的时间48             long duration = now - prevTimeMs;49 50             //本应该花费的时间51             long shouldBeTimeMs = (long)((double)prevDataLen / speed * 1000) ;52 53             if(duration < shouldBeTimeMs ){54                 try {55                     System.out.println("超速了 : 休眠 " + (shouldBeTimeMs - duration));56                     Thread.sleep(shouldBeTimeMs - duration);57                 } catch (InterruptedException e) {58                     e.printStackTrace();59                 }60             }61             prevTimeMs = System.currentTimeMillis() ;62             prevDataLen = data.length ;63             return event;64         }65     }66 67     /**68      * 拦截事件集合69      */70     public List
intercept(List
events) {71 for (Event event : events) {72 intercept(event);73 }74 return events;75 }76 77 public void close() {78 }79 80 /**81 *82 */83 public static class Builder implements Interceptor.Builder {84 private int speed ;85 86 public Interceptor build() {87 System.out.println("拦截器已创建!!");88 return new SpeedInterceptor(speed);89 }90 91 //从配置文件得参数92 public void configure(Context context) {93 speed = context.getInteger("speed", 1024);94 }95 }96 }

 导成jar包   --->   加到 /soft/flume/lib下

/soft/flume/lib

配置

/soft/flume/conf

umeng_nginx_to_kafka.conf

配置拦截器集合---flume配置文件(这个是完整版的---包括去重拦截器、自定义防丢失源、限速拦截器)

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = com.oldboy.umeng.flume.UmengExecSource
a1.sources.r1.command = tail -F /usr/local/openresty/nginx/logs/access.log
a1.sources.r1.spooldir = /usr/local/openresty/nginx/logs
a1.sources.r1.prefix = access.log.
a1.sources.r1.suffix = COMPLETED
a1.sources.r1.redisHost = s101
a1.sources.r1.redisPort = 6379

a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.oldboy.umeng.flume.interceptor.DuplicateInterceptor$Builder
a1.sources.r1.interceptors.i1.redisHost = s101
a1.sources.r1.interceptors.i1.redisPort = 6379
a1.sources.r1.interceptors.i1.expire = 3600
a1.sources.r1.interceptors.i2.type = com.oldboy.umeng.flume.interceptor.SpeedInterceptor$Builder
# 限速:每秒最多发送 512 个字节(注释必须单独成行,写在值后面会被当成值的一部分)
a1.sources.r1.interceptors.i2.speed = 512

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = big12-umeng-raw-logs
a1.sinks.k1.kafka.bootstrap.servers = s102:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 0

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动flume

启动kafka

 

。。。。。。

 实现拦截限速

 

 

二、flume自定义源防丢失——解决 flume 还未收集完日志、日志文件便已经滚动所导致的数据丢失问题(检查滚动后的 access.log.* 文件,依次比对每条日志的 key 是否已在 redis 中)

防重、防丢失

 

改造exec源 :监控目录、收集新文件---增加个守护线程不断监控目录

在 redis 的 3 号库中为每条日志维护一个 key,防止重复收集。key 的默认期限为 3 天(可通过 expire 参数修改,上面的配置文件中设置为 3600 秒)。

 

 改造ExecSource  防丢失

自定义数据源 UmengExecSource:监控目录、收集新文件——增加一个守护线程不断监控目录。(UmengExecSource 代码较多,此处未贴出。)
 

 

 

去重拦截器---DuplicateInterceptor 防重复

配置 redis 中 key 的有效期(默认三天):

redis.expire(key , expire) ;
expire = context.getInteger("expire", 3 * 24 * 60 * 60);

 

1 package com.oldboy.umeng.flume.interceptor; 2  3 import org.apache.flume.Context; 4 import org.apache.flume.Event; 5 import org.apache.flume.interceptor.Interceptor; 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 import redis.clients.jedis.Jedis; 9 10 import java.util.List;11 12 /**13  * 去重拦截器14  */15 public class DuplicateInterceptor implements Interceptor {16     private static final Logger logger = LoggerFactory.getLogger(org.apache.flume.interceptor.HostInterceptor.class);17     //过期秒数18     private int expire ;19     private String redisHost ;20     private  int redisPort ;21     private int redisDB ;22 23     private Jedis redis ;24 25     /**26      * 构造27      */28     private DuplicateInterceptor(String redisHost ,int redisPort ,int redisDB, int expire) {29         this.redisHost = redisHost ;30         this.redisPort = redisPort ;31         this.redisDB = redisDB ;32         this.expire = expire ;33     }34 35     public void initialize() {36         System.out.println("去重拦截器初始化!!");37         redis = new Jedis(redisHost ,redisPort) ;38         redis.select(redisDB) ;39     }40 41     /**42      * 拦截事件43      */44     public Event intercept(Event event) {45         String line = new String(event.getBody()) ;46         String key = line.substring(0,line.lastIndexOf("#"));47         System.out.println("去重的key : " + key);48         if(redis.exists(key)){49             System.out.println(key + "存在了!");50             return null ;51         }52         else{53             redis.set(key , "x") ;54             redis.expire(key , expire) ;55             System.out.println(key + " : 不存在!!");56             return event ;57         }58     }59 60     /**61      * 拦截事件集合62      */63     public List
intercept(List
events) {64 for (Event event : events) {65 intercept(event);66 }67 return events;68 }69 70 public void close() {71 }72 73 /**74 *75 */76 public static class Builder implements Interceptor.Builder {77 private String redisHost ;78 private int redisPort ;79 private int redisDB ;80 private int expire ;81 82 public Interceptor build() {83 System.out.println("拦截器已创建!!");84 return new DuplicateInterceptor(redisHost , redisPort ,redisDB,expire);85 }86 87 //从配置文件得参数88 public void configure(Context context) {89 redisHost = context.getString("redisHost", "s101");90 redisPort = context.getInteger("redisPort", 6379);91 expire = context.getInteger("expire", 3 * 24 * 60 * 60);92 redisDB = context.getInteger("redisDB", 3);93 }94 }95 }

 导出jar包

jar包 内容

 

 

 

 

 

jar包放  /soft/flume/lib

配置flume配置文件

/soft/flume/conf

umeng_nginx_to_kafka.conf

配置拦截器集合---flume配置文件

见上面flume配置文件

转载于:https://www.cnblogs.com/star521/p/10003530.html

你可能感兴趣的文章
cs231n spring 2017 lecture11 Detection and Segmentation
查看>>
安装docker
查看>>
NSMutableString
查看>>
[SDOI2010]地精部落 题解
查看>>
beta冲刺6
查看>>
PHP 5.2、5.3、5.4、5.5、5.6 版本区别对比以及新功能详解
查看>>
Oracle数据库连接字符串
查看>>
FLASH动画
查看>>
四则运算加强版
查看>>
1. Storm介绍
查看>>
今天网站wordpress所有页面全部都404了,把固定链接重新“确定下”就好了
查看>>
Luogu 3665 [USACO17OPEN]Switch Grass 切换牧草
查看>>
[NOIP2014]无线网站发射器选址
查看>>
设计模式:单例(Singleton)
查看>>
[BZOJ3751] [NOIP2014] 解方程 (数学)
查看>>
阻止冒泡和阻止默认事件的兼容写法
查看>>
js jQuery函数 $.ajax()
查看>>
CAT Build Status(转存)
查看>>
BI7.0 IDES虚拟机安装
查看>>
django中间件
查看>>