博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
并发编程源码分析一之Log接口
阅读量:5874 次
发布时间:2019-06-19

本文共 7887 字,大约阅读时间需要 26 分钟。

hot3.png

并发编程实践部分源码使用github里面部分开源框架源码,重新整理,如有错误或版权问题,请指出指正。

欢迎star、fork,读书笔记系列会同步更新

git

模块j360-jdk-application

前言

平常开发过程中使用并发框架的场景在SSH的框架里面直接接触的机会并不是很多,基本上大量使用的就是部分原子类,然而在底层实现中,为了实现高并发的可能,jdk并发编程框架几乎所谓不在,每个并发编程类以及其参数都有各自所擅长的场景,并发编程框架不是一卡通,但是却很像搭积木,通过组合几乎很多的并发场景都不在话下,这里就是用批量日志管理接口的场景中,如何使用并发编程类实现高并发下的日志管理。

内容

业务日志在每个系统中都是不可或缺的功能点,业务日志的输出也有多种的输出形式,会话界面、文本、数据库等等,通常业务日志在系统中的埋点会作为一个日志单元,一个业务系统每天产生的日志数量=每个处理流程*埋点*pv(或者同单位request),在高并发系统中,单个集群节点处理日志也通常采用并发框架批量进行处理,这里分别设置两个条件来对收集的日志进行处理:

1:每nSeconds批量将收集的日志队列输出

2:日志队列占用的内存>设置的阈值(保护n秒内的队列过大)

类图

143247_6UYx_1026123.png

包结构

143331_NqQn_1026123.png

类说明

分别代表工厂类、日志方法接口类、代理类以及实现类的工厂类、方法实现类

先看接口类的方法:

public interface BizLogger {    public void log(BizLogPo bizLogPo);    public void log(List
 bizLogPos);}

分别处理单条和多条日志

再看会话打印实现类:

private Logger LOGGER = LoggerFactory.getLogger(ConsoleBizLogger.class.getSimpleName());@Overridepublic void log(BizLogPo jobLogPo) {    LOGGER.info(JSONUtils.toJSONString(jobLogPo));}@Overridepublic void log(List
 jobLogPos) {    for (BizLogPo jobLogPo : jobLogPos) {        log(jobLogPo);    }}

这里使用slf4j接口类作为会话打印的输出接口,实现类同样可以使用mysql、file等输出形式

通过工厂类配置获得实现类,通常会使用多个实现类提供接口,而多个实现类难以同时对实现过程进行控制,这里引入了代理类来调用具体的实现类,而并发编程框架通常在代理类中进行集中控制管理:

public class BizLoggerDelegate implements BizLogger {    private static final Logger LOGGER = LoggerFactory.getLogger(BizLoggerDelegate.class);    // 3S 检查输盘一次日志    private int flushPeriod;    private BizLogger jobLogger;    private boolean lazyLog = false;    private ScheduledExecutorService executor;    private ScheduledFuture scheduledFuture;    private BlockingQueue
 memoryQueue;    // 日志批量刷盘数量    private int batchFlushSize = 100;    private int overflowSize = 10000;    // 内存中最大的日志量阀值    private int maxMemoryLogSize;    private AtomicBoolean flushing = new AtomicBoolean(false);    public BizLoggerDelegate(Config config) {        BizLoggerFactory jobLoggerFactory = new BizLoggerFactory() {            @Override            public BizLogger getJobLogger() {                return new ConsoleBizLogger();            }        };        jobLogger = jobLoggerFactory.getJobLogger();        lazyLog = config.getParameter(Constants.LAZY_JOB_LOGGER, false);        if (lazyLog) {            // 无界Queue            memoryQueue = new LinkedBlockingQueue
();            maxMemoryLogSize = config.getParameter(Constants.LAZY_JOB_LOGGER_MEM_SIZE, 1000);            flushPeriod = config.getParameter(Constants.LAZY_JOB_LOGGER_CHECK_PERIOD, 3);            executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LazyJobLogger"));            scheduledFuture = executor.scheduleWithFixedDelay(new Runnable() {                @Override                public void run() {                    try {                        if (flushing.compareAndSet(false, true)) {                            checkAndFlush();                        }                    } catch (Throwable t) {                        LOGGER.error("CheckAndFlush log error", t);                    }                }            }, flushPeriod, flushPeriod, TimeUnit.SECONDS);        }    }    /**     * 检查内存中是否有日志,如果有就批量刷盘     */    private void checkAndFlush() {        try {            int nowSize = memoryQueue.size();            if (nowSize == 0) {                return;            }            List
 batch = new ArrayList
();            for (int i = 0; i < nowSize; i++) {                BizLogPo jobLogPo = memoryQueue.poll();                batch.add(jobLogPo);                if (batch.size() >= batchFlushSize) {                    flush(batch);                }            }            if (batch.size() > 0) {                flush(batch);            }        } finally {            flushing.compareAndSet(true, false);        }    }    private void checkOverflowSize() {        if (memoryQueue.size() > overflowSize) {            throw new BizLogException("Memory Log size is " + memoryQueue.size() + " , please check the JobLogger is available");        }    }    private void flush(List
 batch) {        boolean flushSuccess = false;        try {            jobLogger.log(batch);            flushSuccess = true;        } finally {            if (!flushSuccess) {                memoryQueue.addAll(batch);            }            batch.clear();        }    }    /**     * 检查内存中的日志量是否超过阀值,如果超过需要批量刷盘日志     */    private void checkCapacity() {        if (memoryQueue.size() > maxMemoryLogSize) {            // 超过阀值,需要批量刷盘            if (flushing.compareAndSet(false, true)) {                // 这里可以采用new Thread, 因为这里只会同时new一个                new Thread(new Runnable() {                    @Override                    public void run() {                        try {                            checkAndFlush();                        } catch (Throwable t) {                            LOGGER.error("Capacity full flush error", t);                        }                    }                }).start();            }        }    }    @Override    public void log(BizLogPo jobLogPo) {        if (jobLogPo == null) {            return;        }        if (lazyLog) {            checkOverflowSize();            memoryQueue.offer(jobLogPo);            checkCapacity();        } else {            jobLogger.log(jobLogPo);        }    }    @Override    public void log(List
 jobLogPos) {        if (CollectionUtils.isEmpty(jobLogPos)) {            return;        }        if (lazyLog) {            checkOverflowSize();            for (BizLogPo jobLogPo : jobLogPos) {                memoryQueue.offer(jobLogPo);            }            // checkCapacity            checkCapacity();        } else {            jobLogger.log(jobLogPos);        }    }}

这里写一个测试类测试下结果:

@Testpublic void loggerTest() throws InterruptedException {    Config config = new Config();    config.setParameter("biz.logger","console");    List
 list = new ArrayList
();    for(int i =0;i<=10;i++){        BizLogPo jobLogPo = new BizLogPo();        jobLogPo.setMsg("hello" + i);        list.add(jobLogPo);    }    TimeUnit.SECONDS.sleep(5);    BizLoggerDelegate jobLoggerDelegate = new BizLoggerDelegate(config);    jobLoggerDelegate.log(list);}

通常开发使用Spring环境时,新增Spring适配工厂类,通过Spring配置下:

public class BizLoggerFactoryBean implements FactoryBean
,        InitializingBean, DisposableBean {        public BizLogger getBizLogger() {                return bizLogger;        }        public void setBizLogger(BizLogger bizLogger) {                this.bizLogger = bizLogger;        }        private BizLogger bizLogger;        @Override        public void destroy() throws Exception {        }        @Override        public BizLogger getObject() throws Exception {                return bizLogger;        }        @Override        public Class
 getObjectType() {                return bizLogger.getClass();        }        @Override        public boolean isSingleton() {                return true;        }        @Override        public void afterPropertiesSet() throws Exception {        }}

新增Spring的配置类:

LoggerSpringConfig ApplicationContextAware {    ApplicationContext (ApplicationContext applicationContext) BeansException {        .= applicationContext}    (=)    BizLogger () Exception {        BizLoggerFactoryBean bizLoggerFactoryBean = BizLoggerFactoryBean()bizLoggerFactoryBean.setBizLogger(ConsoleBizLogger())bizLoggerFactoryBean.getObject()}}
 

新增Spring的测试类:

@Testpublic void loggerSpringTest(){    ApplicationContext context = new AnnotationConfigApplicationContext(LoggerSpringConfig.class);    BizLogger bizLogger = (BizLogger) context.getBean("bizLogger");    List
 list = new ArrayList
();    for(int i =0;i<=10;i++){        BizLogPo jobLogPo = new BizLogPo();        jobLogPo.setMsg("hello" + i);        list.add(jobLogPo);    }    bizLogger.log(list);}

实际使用中,把LoggerDelelate配置到Bean里面即可。

到这里一个简单的使用并发框架实现的日志处理接口完成了,当然实现类中可以使用批量SQL的形式进行处理,增强sql的吞吐量。

转载于:https://my.oschina.net/smartsales/blog/537575

你可能感兴趣的文章
Mongodb中Sharding集群
查看>>
使用TortoiseSVN新建及合并分支图文教程
查看>>
SpringMVC 使用JSR-303进行校验 @Valid
查看>>
[改善Java代码]减少HashMap中元素的数量
查看>>
JUnit4的使用2
查看>>
SQL is null函数
查看>>
HTML5 video 视频标签 常用属性
查看>>
一张图看懂开源许可协议,开源许可证GPL、BSD、MIT、Mozilla、Apache和LGPL的区别...
查看>>
深入理解javascript对象系列第一篇——初识对象
查看>>
CentOS 7 设置中文环境
查看>>
javascript中关于数组的一些鄙视题
查看>>
C#语法之特性
查看>>
C#中使用aria2c进行下载并显示进度条
查看>>
鞋业管理系统定期执行任务
查看>>
2016教师节微信祝福语大全
查看>>
【转】随机函数的rand、srand用法
查看>>
Nginx: could not build the server_names_hash 解决办法
查看>>
P4factory <Integration with Mininet>
查看>>
Ubuntu16.04下搭建Go语言环境
查看>>
.NetCore~Linux环境下部署
查看>>