Skip to content

本文由 简悦 SimpRead 转码, 原文地址 mp.weixin.qq.com

前言

我们经常会接到这样的需求:给你一个文本文件,每行文本对应一条数据,需要对每条数据做一些业务判断,符合条件的存到数据库的业务表;不符合条件的要将原始信息保存到异常表,并记录行标,以及异常原因。

我们先来一个最基本的实现

/**
 * 主方法
 * @param fileName
 * @throws IOException
 */
public void importer(String fileName) throws IOException {
    lines(fileName).forEach(it -> {
        String line = it.v1;
        Long lineAt = it.v2;
        ValueBox<EntityData> box = handle(line);
        ResultType resultType = box.getResultType();
        if(ResultType.FAILED == resultType){
            onFailed(line, lineAt, box.getException());
        }
    });
}

/**
 *
 * 通过文件名,将文件转化成带有行标的Seq
 * 顺便安利一下jool,强大且好用~
 * @param fileName 文件名全路径
 * @return Seq<内容, 行标>
 * @throws IOException
 */
private Seq<Tuple2<String, Long>> lines(String fileName) throws IOException {
    return Seq.zipWithIndex(Files.lines(Paths.get(fileName)));
}

/**
 * 处理单行数据
 * @param line 行内容
 * @return
 */
private ValueBox<EntityData> handle(String line){
    return ValueOperator.start(line)
            .map(Util::convert)
            //.filter(...)
            //.peek(...)
            .peek(Util::saveEntity)
            .end();
}

/**
 * 异常处理
 * @param line 行内容
 * @param lineAt 行标
 * @param e 异常
 */
private void onFailed(String line, Long lineAt, Exception e) {
    //TODO
}

但需要思考:

  1. 内存问题:文件过大怎么办?

  2. 性能问题:假设一条数据从解析到校验再到入库需耗时 100ms,那么处理一个包含 10 万条数据的文件就要耗时 2.8 小时。

第一个问题比较轻松,文件过大么就拆分成固定大小的若干个文件好了。网上随便搜一下就有 N 多的现成代码,这里不做冗述。我们着重关注第二个问题。

数据库的批处理

大多数的性能优化会在最后写库的时候进行,因为数据库的 IO 耗时和内存操作不是一个数量级的,所以我们往往会采取数据库的批处理来进行性能优化,即攒够足够多的数据,一次性入库。

但数据库的批处理有着自身的缺陷。比如事务性,批量的数据会因为其中的一条入库失败而全部失败。而由于历史数据的存在,和数据表的唯一索引,这种情况发生的概率是很大的。

另外,我们还需要对异常情况做额外的业务处理,这种异常当然包括入库时的异常。你可能需要通过二分法慢慢揪出这一堆数据中的异常,过程中可能还会再揪出其他的异常数据,所以采用数据库批处理不是说不行,只是说会把事情复杂化,当你真这么做的时候,你会发现你大部分的设计、编码都是在处理回滚的问题。而真正专注在业务上的精力反而不多。

而且,数据库连接池的广泛应用、以及现代 ORM 的各种优化,实际上数据库的写操作已经不再是真正的瓶颈,或者说没你想象的那么弱。

再假设业务逻辑的其它操作本身就很耗时,那么数据库的批操作可能就不是银弹了。因为我们大部分时间是用来排队,而真实的内存利用率是极低的。想象一下火车站的售票窗口,即便出票那个过程耗时为 0,也经不住人多,因为售票员要问询、查票、调度、再问询。所以,瓶颈在哪里?我们又该怎样优化性能?

SimpleBatch

多开几个窗口不就完事儿了嘛!我们不但要处理数据,还要拿到处理结果,所以我们可以使用 Executor 和 FutureTask。

这东西的处理过程很有意思,简言之就是我们可以把一个业务过程当作一个任务扔到任务池里面,在任务未完成的时候会新建任务;当池被填满的时候,会排队等待其他任务完成。当然,你可以介入调度,细粒度地观察和决策任务的执行过程和状态,但在大部分的情景下,我们使用默认就已足够。

所以,我们需要一个算法结构,简单地帮我们优化性能。它应该具有以下特性:

  1. 它的使用应该足够简单,不能大幅度修改已有代码;

  2. 它的功能也应该足够单一,能帮我们调度批量数据的执行足矣;

  3. 它应该是可复用的,这意味着它不应该有任何具体的业务逻辑;

SimpleBatch.java

/**
 * 批量工具
 * @author lucifer.chan
 **/
public class SimpleBatch {

    private final ExecutorService executorService;

    public SimpleBatch(int nThreads){
        executorService = Executors.newFixedThreadPool(nThreads);
    }
    
    /**
     * 批量处理
     * @param iterable 数据集
     * @param handler 对单条数据的处理
     * @param onFailed 失败处理
     * @param onComplete 完成后的处理
     * @return
     */
    public <T, R> List<R> execute(Iterable<T> iterable
            , LcFunction<T, R> handler
            , BiConsumer<T, Exception> onFailed
            , Consumer<T> onComplete){
        List<Future<R>> futures = StreamSupport
                .stream(iterable.spliterator(), false)
                .map(t -> singleTask(t, handler, onFailed, onComplete))
                .collect(Collectors.toList());
        return futures.stream()
                .map(Unchecked.function(Future::get))
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }
    
    /**
     * 批量处理
     * @param iterable 数据集
     * @param handler 对单条数据的处理
     * @param onFailed 失败处理
     * @return
     */
    public <T, R> List<R> execute(Iterable<T> iterable
            , LcFunction<T, R> handler
            , BiConsumer<T, Exception> onFailed){
        return execute(iterable, handler, onFailed, t->{});
    }
    
    /**
     * 批量处理
     * @param iterable 数据集
     * @param handler 对单条数据的处理
     * @return
     */
    public <T, R> List<R> execute(Iterable<T> iterable
            , LcFunction<T, R> handler){
        return execute(iterable
            , handler
            , (t, e)-> log.error(e.getMessage())
            , t-> {});
    }

    private <T, R> FutureTask<R> singleTask(T data
        , LcFunction<T, R> handler
        , BiConsumer<T, Exception> onFailed
        , Consumer<T> onComplete){
        FutureTask<R> task = new FutureTask<>(()-> {
            try {
                return handler.apply(data);
            } catch (Exception e){
                onFailed.accept(data, e);
                return null;
            } finally {
                onComplete.accept(data);
            }
        });

        executorService.submit(task);
        return task;
    }
}

解释一下代码:

  1. 构造方法来初始化任务池大小;

  2. 用 LcFunction 来包装业务方法;

  3. 用 BiConsumer 来包装异常处理;

  4. 用 Consumer 来包装业务处理完成后的清理工作;

  5. 两个重载方法用来消费异常和清理;

最后来看一下最初需求经过改造后的实现,在未修改主业务代码的情况下,效率提高了 1000 倍。

/**
 * 主方法
 * @param fileName
 * @throws IOException
 */
public void importe(String fileName) throws IOException {
    new SimpleBatch(1000).execute(lines(fileName)
        , tuple2 -> {
            ValueBox<EntityData> box = handle(tuple2.v1);
            ResultType resultType = box.getResultType();
            if(ResultType.FAILED == resultType){
                throw new RuntimeException(box.getException());
            }
            return box;
        }
        , (tuple2, e) -> onFailed(tuple2.v1, tuple2.v2, e)
    );
}

全文(完)

Released under the MIT License.