一、flume限速拦截
flume架构图
ExecSource exec数据源--实时收集
限速拦截器 代码实现 拦截器 Interceptor 接口
根据上一次发送事件的字节数和配置的速率(字节/秒),计算出发送该事件本应花费的时间,再与距离上一次发送实际经过的时间做对比。如果实际经过的时间 < 本应花费的时间,说明超速了,此时休眠两者的差值后再放行当前事件。
package com.oldboy.umeng.flume.interceptor;

import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Rate-limiting interceptor.
 *
 * <p>For every event after the first, it computes how long the previous event
 * should have taken at the configured rate ({@code previousBodyLength / speed}
 * seconds). If less wall-clock time than that has elapsed since the previous
 * event was released, the calling thread sleeps for the difference before the
 * current event is let through. Events are never modified or dropped.
 *
 * <p>The interceptor keeps unsynchronized per-instance state (timestamp and size
 * of the previous event).
 */
public class SpeedInterceptor implements Interceptor {
    private static final Logger logger = LoggerFactory.getLogger(SpeedInterceptor.class);

    /** Target throughput in bytes per second; always positive. */
    private final int speed;

    /** Wall-clock time (ms) at which the previous event was released; 0 means no event seen yet. */
    private long prevTimeMs = 0;

    /** Body length in bytes of the previous event. */
    private int prevDataLen = 0;

    /**
     * @param speed target throughput in bytes per second
     * @throws IllegalArgumentException if {@code speed} is not positive; a zero
     *         rate would make the computed pause infinite and block the source forever
     */
    private SpeedInterceptor(int speed) {
        if (speed <= 0) {
            throw new IllegalArgumentException(
                    "speed must be a positive number of bytes per second, got " + speed);
        }
        this.speed = speed;
    }

    @Override
    public void initialize() {
    }

    /**
     * Throttles a single event.
     *
     * @param event the event to pass through
     * @return the same event, unchanged, possibly after a delay
     */
    @Override
    public Event intercept(Event event) {
        System.out.println("开始拦截!");
        byte[] data = event.getBody();
        // Treat a missing body as zero bytes instead of failing with a NullPointerException.
        int dataLen = (data == null) ? 0 : data.length;

        // First event: nothing to compare against, just record it.
        if (prevTimeMs == 0) {
            prevTimeMs = System.currentTimeMillis();
            prevDataLen = dataLen;
            System.out.println("首次发送! : " + prevDataLen);
            return event;
        }

        // Time actually elapsed since the previous event was released.
        long duration = System.currentTimeMillis() - prevTimeMs;

        // Time the previous event should have taken at the configured rate.
        long shouldBeTimeMs = (long) ((double) prevDataLen / speed * 1000);

        if (duration < shouldBeTimeMs) {
            long pauseMs = shouldBeTimeMs - duration;
            try {
                System.out.println("超速了 : 休眠 " + pauseMs);
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the owning thread can still observe the
                // shutdown request; the event is released without further delay.
                Thread.currentThread().interrupt();
                logger.warn("Interrupted while throttling; releasing event early", e);
            }
        }

        prevTimeMs = System.currentTimeMillis();
        prevDataLen = dataLen;
        return event;
    }

    /**
     * Throttles every event of a batch in order.
     *
     * @param events the batch to pass through
     * @return the same list; no event is removed
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    /**
     * Builder that creates the interceptor from its configuration context.
     */
    public static class Builder implements Interceptor.Builder {
        private int speed;

        @Override
        public Interceptor build() {
            System.out.println("拦截器已创建!!");
            return new SpeedInterceptor(speed);
        }

        /**
         * Reads the {@code speed} property (bytes per second); defaults to 1024.
         */
        @Override
        public void configure(Context context) {
            speed = context.getInteger("speed", 1024);
        }
    }
}
导成jar包 ---> 加到 /soft/flume/lib下
/soft/flume/lib
配置
/soft/flume/conf
umeng_nginx_to_kafka.conf
配置拦截器集合---flume配置文件(这个是完整版的---包括去重拦截器、自定义防丢失源、限速拦截器)
# Flume agent a1: custom exec source -> memory channel -> Kafka sink.
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source: custom exec source that tails the nginx access log.
a1.sources.r1.type = com.oldboy.umeng.flume.UmengExecSource
a1.sources.r1.command = tail -F /usr/local/openresty/nginx/logs/access.log
a1.sources.r1.spooldir = /usr/local/openresty/nginx/logs
a1.sources.r1.prefix = access.log.
a1.sources.r1.suffix = COMPLETED
a1.sources.r1.redisHost = s101
a1.sources.r1.redisPort = 6379

# Interceptor chain: i1 = de-duplication, i2 = rate limiting.
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.oldboy.umeng.flume.interceptor.DuplicateInterceptor$Builder
a1.sources.r1.interceptors.i1.redisHost = s101
a1.sources.r1.interceptors.i1.redisPort = 6379
a1.sources.r1.interceptors.i1.expire = 3600
a1.sources.r1.interceptors.i2.type = com.oldboy.umeng.flume.interceptor.SpeedInterceptor$Builder
# Target throughput in bytes per second. The comment must stay on its own line:
# the properties format has no trailing comments, so text after the value would
# become part of the value and break integer parsing.
a1.sources.r1.interceptors.i2.speed = 512

# Channel.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000

# Sink: Kafka.
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = big12-umeng-raw-logs
a1.sinks.k1.kafka.bootstrap.servers = s102:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 0

# Wiring.
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
启动flume
启动kafka
。。。。。。
实现拦截限速
二、flume自定义源防丢失---解决flume还未收集完日志便已经滚动,数据丢失问题(检查 access.log.* 滚动后的文件 依次比对key是否在redis中)
防重、防丢失
改造exec源 :监控目录、收集新文件---增加个守护线程不断监控目录
redis 3号库维护一个key ,防止重复收集。key期限:3天
改造ExecSource 防丢失
UmengExecSource---监控目录、收集新文件---增加个守护线程不断监控目录 自定义数据源UmengExecSource代码太多
去重拦截器---DuplicateInterceptor 防重复
配置redis中 key有效期三天---
redis.expire(key , expire) ;
expire = context.getInteger("expire", 3 * 24 * 60 * 60);
package com.oldboy.umeng.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * De-duplicating interceptor.
 *
 * <p>The de-duplication key of an event is the part of its body (decoded as UTF-8)
 * before the last {@code '#'}. Keys that have been seen are remembered in Redis for
 * {@code expire} seconds; an event whose key is already present is dropped.
 */
public class DuplicateInterceptor implements Interceptor {
    private static final Logger logger = LoggerFactory.getLogger(DuplicateInterceptor.class);

    /** Lifetime of a remembered key, in seconds. */
    private final int expire;
    private final String redisHost;
    private final int redisPort;
    private final int redisDB;

    /** Redis connection; opened in {@link #initialize()} and released in {@link #close()}. */
    private Jedis redis;

    /**
     * @param redisHost Redis host name
     * @param redisPort Redis port
     * @param redisDB   index of the Redis database to select
     * @param expire    lifetime of a remembered key, in seconds
     */
    private DuplicateInterceptor(String redisHost, int redisPort, int redisDB, int expire) {
        this.redisHost = redisHost;
        this.redisPort = redisPort;
        this.redisDB = redisDB;
        this.expire = expire;
    }

    /**
     * Opens the Redis connection and selects the configured database.
     */
    @Override
    public void initialize() {
        System.out.println("去重拦截器初始化!!");
        redis = new Jedis(redisHost, redisPort);
        redis.select(redisDB);
    }

    /**
     * Checks a single event against the keys remembered in Redis.
     *
     * @param event the event to check
     * @return the event if its key has not been seen yet, or {@code null} if it is a duplicate
     */
    @Override
    public Event intercept(Event event) {
        // Decode explicitly as UTF-8 so the key does not depend on the platform charset.
        String line = new String(event.getBody(), StandardCharsets.UTF_8);

        // A line without '#' has no suffix to strip; use the whole line as the key
        // rather than failing with StringIndexOutOfBoundsException.
        int sep = line.lastIndexOf('#');
        String key = (sep >= 0) ? line.substring(0, sep) : line;
        System.out.println("去重的key : " + key);

        if (redis.exists(key)) {
            System.out.println(key + "存在了!");
            return null;
        }

        // SETEX writes the value and its expiry in one command, so a failure between
        // two separate calls can no longer leave a key that never expires.
        redis.setex(key, expire, "x");
        System.out.println(key + " : 不存在!!");
        return event;
    }

    /**
     * Filters a batch, keeping only events whose key has not been seen yet.
     *
     * @param events the batch to filter
     * @return a new list with the surviving events in their original order
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> kept = new ArrayList<Event>(events.size());
        for (Event event : events) {
            Event result = intercept(event);
            // The single-event overload signals "duplicate" with null; those must be
            // left out of the batch, otherwise nothing is ever de-duplicated here.
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }

    /**
     * Releases the Redis connection, if one was opened.
     */
    @Override
    public void close() {
        if (redis != null) {
            redis.close();
            redis = null;
        }
    }

    /**
     * Builder that creates the interceptor from its configuration context.
     */
    public static class Builder implements Interceptor.Builder {
        private String redisHost;
        private int redisPort;
        private int redisDB;
        private int expire;

        @Override
        public Interceptor build() {
            System.out.println("拦截器已创建!!");
            return new DuplicateInterceptor(redisHost, redisPort, redisDB, expire);
        }

        /**
         * Reads {@code redisHost} (default "s101"), {@code redisPort} (default 6379),
         * {@code expire} in seconds (default three days) and {@code redisDB} (default 3).
         */
        @Override
        public void configure(Context context) {
            redisHost = context.getString("redisHost", "s101");
            redisPort = context.getInteger("redisPort", 6379);
            expire = context.getInteger("expire", 3 * 24 * 60 * 60);
            redisDB = context.getInteger("redisDB", 3);
        }
    }
}
导出jar包
jar包 内容
jar包放 /soft/flume/lib
配置flume配置文件
/soft/flume/conf
umeng_nginx_to_kafka.conf
配置拦截器集合---flume配置文件
见上面flume配置文件