并发编程实践部分源码使用github里面部分开源框架源码,重新整理,如有错误或版权问题,请指出指正。
欢迎star、fork,读书笔记系列会同步更新
git
模块j360-jdk-application
前言
平常开发过程中使用并发框架的场景在SSH的框架里面直接接触的机会并不是很多,基本上大量使用的就是部分原子类,然而在底层实现中,为了实现高并发的可能,jdk并发编程框架几乎所谓不在,每个并发编程类以及其参数都有各自所擅长的场景,并发编程框架不是一卡通,但是却很像搭积木,通过组合几乎很多的并发场景都不在话下,这里就是用批量日志管理接口的场景中,如何使用并发编程类实现高并发下的日志管理。
内容
业务日志在每个系统中都是不可或缺的功能点,业务日志的输出也有多种的输出形式,会话界面、文本、数据库等等,通常业务日志在系统中的埋点会作为一个日志单元,一个业务系统每天产生的日志数量=每个处理流程*埋点*pv(或者同单位request),在高并发系统中,单个集群节点处理日志也通常采用并发框架批量进行处理,这里分别设置两个条件来对收集的日志进行处理:
1:每nSeconds批量将收集的日志队列输出
2:日志队列占用的内存>设置的阈值(保护n秒内的队列过大)
类图
包结构
类说明
分别代表工厂类、日志方法接口类、代理类以及实现类的工厂类、方法实现类
先看接口类的方法:
public interface BizLogger { public void log(BizLogPo bizLogPo); public void log(ListbizLogPos);}
分别处理单条和多条日志
再看会话打印实现类:
private Logger LOGGER = LoggerFactory.getLogger(ConsoleBizLogger.class.getSimpleName());@Overridepublic void log(BizLogPo jobLogPo) { LOGGER.info(JSONUtils.toJSONString(jobLogPo));}@Overridepublic void log(ListjobLogPos) { for (BizLogPo jobLogPo : jobLogPos) { log(jobLogPo); }}
这里使用slf4j接口类作为会话打印的输出接口,实现类同样可以使用mysql、file等输出形式
通过工厂类配置获得实现类,通常会使用多个实现类提供接口,而多个实现类难以同时对实现过程进行控制,这里引入了代理类来调用具体的实现类,而并发编程框架通常在代理类中进行集中控制管理:
public class BizLoggerDelegate implements BizLogger { private static final Logger LOGGER = LoggerFactory.getLogger(BizLoggerDelegate.class); // 3S 检查输盘一次日志 private int flushPeriod; private BizLogger jobLogger; private boolean lazyLog = false; private ScheduledExecutorService executor; private ScheduledFuture scheduledFuture; private BlockingQueuememoryQueue; // 日志批量刷盘数量 private int batchFlushSize = 100; private int overflowSize = 10000; // 内存中最大的日志量阀值 private int maxMemoryLogSize; private AtomicBoolean flushing = new AtomicBoolean(false); public BizLoggerDelegate(Config config) { BizLoggerFactory jobLoggerFactory = new BizLoggerFactory() { @Override public BizLogger getJobLogger() { return new ConsoleBizLogger(); } }; jobLogger = jobLoggerFactory.getJobLogger(); lazyLog = config.getParameter(Constants.LAZY_JOB_LOGGER, false); if (lazyLog) { // 无界Queue memoryQueue = new LinkedBlockingQueue (); maxMemoryLogSize = config.getParameter(Constants.LAZY_JOB_LOGGER_MEM_SIZE, 1000); flushPeriod = config.getParameter(Constants.LAZY_JOB_LOGGER_CHECK_PERIOD, 3); executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LazyJobLogger")); scheduledFuture = executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { if (flushing.compareAndSet(false, true)) { checkAndFlush(); } } catch (Throwable t) { LOGGER.error("CheckAndFlush log error", t); } } }, flushPeriod, flushPeriod, TimeUnit.SECONDS); } } /** * 检查内存中是否有日志,如果有就批量刷盘 */ private void checkAndFlush() { try { int nowSize = memoryQueue.size(); if (nowSize == 0) { return; } List batch = new ArrayList (); for (int i = 0; i < nowSize; i++) { BizLogPo jobLogPo = memoryQueue.poll(); batch.add(jobLogPo); if (batch.size() >= batchFlushSize) { flush(batch); } } if (batch.size() > 0) { flush(batch); } } finally { flushing.compareAndSet(true, false); } } private void checkOverflowSize() { if (memoryQueue.size() > overflowSize) { throw new BizLogException("Memory Log size is " + memoryQueue.size() + " , please check the JobLogger is available"); } } private void flush(List batch) { boolean flushSuccess = false; try { jobLogger.log(batch); flushSuccess = true; } finally { if (!flushSuccess) { memoryQueue.addAll(batch); } batch.clear(); } } /** * 检查内存中的日志量是否超过阀值,如果超过需要批量刷盘日志 */ private void checkCapacity() { if (memoryQueue.size() > maxMemoryLogSize) { // 超过阀值,需要批量刷盘 if (flushing.compareAndSet(false, true)) { // 这里可以采用new Thread, 因为这里只会同时new一个 new Thread(new Runnable() { @Override public void run() { try { checkAndFlush(); } catch (Throwable t) { LOGGER.error("Capacity full flush error", t); } } }).start(); } } } @Override public void log(BizLogPo jobLogPo) { if (jobLogPo == null) { return; } if (lazyLog) { checkOverflowSize(); memoryQueue.offer(jobLogPo); checkCapacity(); } else { jobLogger.log(jobLogPo); } } @Override public void log(List jobLogPos) { if (CollectionUtils.isEmpty(jobLogPos)) { return; } if (lazyLog) { checkOverflowSize(); for (BizLogPo jobLogPo : jobLogPos) { memoryQueue.offer(jobLogPo); } // checkCapacity checkCapacity(); } else { jobLogger.log(jobLogPos); } }}
这里写一个测试类测试下结果:
@Testpublic void loggerTest() throws InterruptedException { Config config = new Config(); config.setParameter("biz.logger","console"); Listlist = new ArrayList (); for(int i =0;i<=10;i++){ BizLogPo jobLogPo = new BizLogPo(); jobLogPo.setMsg("hello" + i); list.add(jobLogPo); } TimeUnit.SECONDS.sleep(5); BizLoggerDelegate jobLoggerDelegate = new BizLoggerDelegate(config); jobLoggerDelegate.log(list);}
通常开发使用Spring环境时,新增Spring适配工厂类,通过Spring配置下:
public class BizLoggerFactoryBean implements FactoryBean, InitializingBean, DisposableBean { public BizLogger getBizLogger() { return bizLogger; } public void setBizLogger(BizLogger bizLogger) { this.bizLogger = bizLogger; } private BizLogger bizLogger; @Override public void destroy() throws Exception { } @Override public BizLogger getObject() throws Exception { return bizLogger; } @Override public Class getObjectType() { return bizLogger.getClass(); } @Override public boolean isSingleton() { return true; } @Override public void afterPropertiesSet() throws Exception { }}
新增Spring的配置类:
LoggerSpringConfig ApplicationContextAware { ApplicationContext (ApplicationContext applicationContext) BeansException { .= applicationContext} (=) BizLogger () Exception { BizLoggerFactoryBean bizLoggerFactoryBean = BizLoggerFactoryBean()bizLoggerFactoryBean.setBizLogger(ConsoleBizLogger())bizLoggerFactoryBean.getObject()}}
新增Spring的测试类:
@Testpublic void loggerSpringTest(){ ApplicationContext context = new AnnotationConfigApplicationContext(LoggerSpringConfig.class); BizLogger bizLogger = (BizLogger) context.getBean("bizLogger"); Listlist = new ArrayList (); for(int i =0;i<=10;i++){ BizLogPo jobLogPo = new BizLogPo(); jobLogPo.setMsg("hello" + i); list.add(jobLogPo); } bizLogger.log(list);}
实际使用中,把LoggerDelelate配置到Bean里面即可。
到这里一个简单的使用并发框架实现的日志处理接口完成了,当然实现类中可以使用批量SQL的形式进行处理,增强sql的吞吐量。