Mr Bean's Blog

record something

Flume in Action(1)

| Comments

使用Flume Log4j Appender正确的姿势

我们使用Flume-ng的LoadBalancingLog4jAppender,将线上服务的日志实时传输到日志服务器,转交给告警系统和HDFS做存储。
FLume的Log4j Appender必须使用Log4j的异步加载器,否则一旦日志服务器挂掉,将会导致应用服务器宕机。

使用过程中的坑

问题1: Flume Log4j使用异步加载器,日志服务器宕机情况导致业务系统阻塞

在阅读了Flume的RPC源码以及LoadBalancingLog4jAppender的实现之后,发现问题原来在Log4j的异步加载器AsyncAppender。异步加载器的原理见这里
根本原因是、日志服务器宕机导致消费者消费能力不足,缓冲区满的情况下,AsyncAppender会阻塞程序。设置Blocking=false之后就可以了。

问题2:Flume Log4j发送大量链接异常日志

当其中一台日志服务器宕机,其他的日志服务器就会不停的接收到链接异常的日志。明显是重连的时间间隔太短。在LoadBalancingRpcClient中,

1
2
3
4
5
6
7
8
9
10
11
12
while (it.hasNext()) {
  HostInfo host = it.next();
  try {
    RpcClient client = getClient(host);
    client.append(event); 
    eventSent = true;
    break;
  } catch (Exception ex) {
    selector.informFailure(host); //宕机情况标志该主机异常
    LOGGER.warn("Failed to send event to host " + host, ex);
  }
}

Flume默认不启用back off,也就是说selector.informFailure(host)这行代码完全没用。简直坑爹。OrderSelector.java:

1
2
3
4
5
6
7
8
  public void informFailure(T failedObject) {
    //If there is no backoff this method is a no-op.
    if (!shouldBackOff) {
      return;
    }
    //将该主机暂时移除可用主机列表
    ...
    

所以解决办法:配置max back off

问题3:Flume Log4j失败重连策略异常

问题体现在,设置了max back off,重连时间居然一直是2000ms,看了一下它的算法,指数退避算法。在OrderSelector.java的informFailure函数中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
  public void informFailure(T failedObject) {
    //If there is no backoff this method is a no-op.
    if (!shouldBackOff) {
      return;
    }
    FailureState state = stateMap.get(failedObject);
    long now = System.currentTimeMillis();
    long delta = now - state.lastFail;
    long lastBackoffLength = Math.min(maxTimeout, 1000 * (1 << state.sequentialFails));
    long allowableDiff = lastBackoffLength + CONSIDER_SEQUENTIAL_RANGE;
    if (allowableDiff > delta) {
      if (state.sequentialFails < EXP_BACKOFF_COUNTER_LIMIT) {
        state.sequentialFails++;
      }
    } else {
      state.sequentialFails = 1;
    }
    state.lastFail = now;
    //Depending on the number of sequential failures this component had, delay
    //its restore time. Each time it fails, delay the restore by 1000 ms,
    //until the maxTimeOut is reached.
    state.restoreTime = now + Math.min(maxTimeout, 1000 * (1 << state.sequentialFails));
  }

最后生成的restoreTime即下一次进行重试的时间。我没有去设置avro connect time out 和request time out,默认都是20s,应该算是偏长了。根据他的算法,delta永远是大于40s,但是allowableDiff却一直是3s,4s.所以我直接改了判定条件,allowableDiff < delta,之后就正常。但是还存在一个问题,sequentialFails并不会在一段时间后reset.

问题4:Log4j异步加载器丢失日志数据

AsyncAppender默认缓冲区大小128,满了之后会丢失数据。调大缓冲区,avro connect time out 和request time out也得适当调一下

修改Flume Log4j Appender

| Comments

自定义Log4j Appender

要修改Flume Log4j Appender的实现,我们先了解一下Log4j Appender是如何自定义的。

自定义log4j appender需要继承log4j公共的基类:AppenderSkeleton

  • 打印日志核心方法:abstract protected void append(LoggingEvent event);
  • 初始化加载资源:public void activateOptions(),默认实现为空
  • 释放资源:public void close()
  • 是否需要按格式输出文本:public boolean requiresLayout()
  • 正常情况下我们只需要覆盖append方法即可。然后就可以在log4j中使用了

Demo:

1. import org.apache.log4j.AppenderSkeleton;  
2. import org.apache.log4j.spi.LoggingEvent;  
3.   
4. public class HelloAppender extends AppenderSkeleton {  
5.   
6.     private String account ;  
7.       
8.     @Override  
9.     protected void append(LoggingEvent event) {  
10.         System.out.println("Hello, " + account + " : "+ event.getMessage());  
11.     }  
12.   
13.     @Override  
14.     public void close() {  
15.         // TODO Auto-generated method stub  
16.   
17.     }  
18.   
19.     @Override  
20.     public boolean requiresLayout() {  
21.         // TODO Auto-generated method stub  
22.         return false;  
23.     }  
24.   
25.     public String getAccount() {  
26.         return account;  
27.     }  
28.   
29.     public void setAccount(String account) {  
30.         this.account = account;  
31.     }  
32. }  

1. public static void main(String[] args) {  
2.     Log log = LogFactory.getLog("helloLog") ;  
3.     log.info("I am ready.") ;  
4. }  

log4j.properties 配置

1
2
3
4
log4j.logger.helloLog=INFO, hello

log4j.appender.hello=HelloAppender
log4j.appender.hello.account=World

执行main函数,输出结果 Hello, World : I am ready.

修改FLume Log4j Appender

Event Header加入appname,hostname,logtype

由于hostname,logtype是固定不变的。所以直接写死在代码中。appname则用log4j.properties进行配置 – appname在log4j.properties中配置,添加:

1
2
#应用程序名
log4j.appender.flume.Appname = 91pc
  • 在Log4jAppender类中添加:
1
2
3
4
5
6
7
8
9
private String appname;

public String getAppname() {
  return appname;
}

public void setAppname(String appname) {
  this.appname = appname;
}
  • hostname,logtype在Log4jAppender类中添加
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//author:scy
try {
  hdrs.put("hostname", InetAddress.getLocalHost()+"");
} catch (UnknownHostException e1) {
      String msg = "Cant't get localhost IP";
      LogLog.error(msg);
      if (unsafeMode) {
        return;
      }
      throw new FlumeException(msg + " Exception follows.", e1);
}
//author:scy Need to alarm(>Info)
if(event.getLevel().toInt() > 20000){
  hdrs.put("logtype", "alarm");
}else{
  hdrs.put("logtype", "log4j");
}

Flume Log4j Appender添加log4j的异常详细信息

由于Flume Log4j Appender并没有将Log4j的错误异常栈详细信息封装到Event中,不利于我们的告警系统分析原因。 Log4jAppender.append()中添加如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Event flumeEvent;
Object message = event.getMessage();
if (message instanceof GenericRecord) {
..
} else {
  hdrs.put(Log4jAvroHeaders.MESSAGE_ENCODING.toString(), "UTF8");
  //按照log4j.properties配置格式化日志
  String msg = layout != null ? layout.format(event) : message.toString();
        
  //author:edwardsbean
  if(layout.ignoresThrowable()) {
      String[] s = event.getThrowableStrRep();
      if (s != null) {
  int len = s.length;
  for(int i = 0; i < len; i++) {
      msg += s[i];
      msg += Layout.LINE_SEP;
  }
    }
  }
  
  flumeEvent = EventBuilder.withBody(msg, Charset.forName("UTF8"), hdrs);
}
try {
  rpcClient.append(flumeEvent);

日志接收端就可以接受到日志的详细信息:

1
2
3
 [x] Received '2013-12-26 10:38:08 user log detail
 [nd-PC2600/192.168.253.126] FATAL [com.xx.test.Main] java.lang.Exception: error detail
  at com.xx.test.Main.main(Main.java:35)