本文由 简悦 SimpRead 转码, 原文地址 mp.weixin.qq.com
前言
我们经常会接到这样的需求:给你一个文本文件,每行文本对应一条数据,需要对每条数据做一些业务判断,符合条件的存到数据库的业务表;不符合条件的要将原始信息保存到异常表,并记录行标,以及异常原因。
我们先来一个最基本的实现
/**
* 主方法
* @param fileName
* @throws IOException
*/
public void importer(String fileName) throws IOException {
lines(fileName).forEach(it -> {
String line = it.v1;
Long lineAt = it.v2;
ValueBox<EntityData> box = handle(line);
ResultType resultType = box.getResultType();
if(ResultType.FAILED == resultType){
onFailed(line, lineAt, box.getException());
}
});
}
/**
*
* 通过文件名,将文件转化成带有行标的Seq
* 顺便安利一下jool,强大且好用~
* @param fileName 文件名全路径
* @return Seq<内容, 行标>
* @throws IOException
*/
private Seq<Tuple2<String, Long>> lines(String fileName) throws IOException {
return Seq.zipWithIndex(Files.lines(Paths.get(fileName)));
}
/**
* 处理单行数据
* @param line 行内容
* @return
*/
private ValueBox<EntityData> handle(String line){
return ValueOperator.start(line)
.map(Util::convert)
//.filter(...)
//.peek(...)
.peek(Util::saveEntity)
.end();
}
/**
* 异常处理
* @param line 行内容
* @param lineAt 行标
* @param e 异常
*/
private void onFailed(String line, Long lineAt, Exception e) {
//TODO
}
但需要思考:
内存问题:文件过大怎么办?
性能问题:假设一条数据从解析到校验再到入库需耗时 100ms,那么处理一个包含 10 万条数据的文件就要耗时 2.8 小时。
第一个问题比较轻松,文件过大么就拆分成固定大小的若干个文件好了。网上随便搜一下就有 N 多的现成代码,这里不做冗述。我们着重关注第二个问题。
数据库的批处理
大多数的性能优化会在最后写库的时候进行,因为数据库的 IO 耗时和内存操作不是一个数量级的,所以我们往往会采取数据库的批处理来进行性能优化,即攒够足够多的数据,一次性入库。
但数据库的批处理有着自身的缺陷。比如事务性,批量的数据会因为其中的一条入库失败而全部失败。而由于历史数据的存在,和数据表的唯一索引,这种情况发生的概率是很大的。
另外,我们还需要对异常情况做额外的业务处理,这种异常当然包括入库时的异常。你可能需要通过二分法慢慢揪出这一堆数据中的异常,过程中可能还会再揪出其他的异常数据,所以采用数据库批处理不是说不行,只是说会把事情复杂化,当你真这么做的时候,你会发现你大部分的设计、编码都是在处理回滚的问题。而真正专注在业务上的精力反而不多。
而且,数据库连接池的广泛应用、以及现代 ORM 的各种优化,实际上数据库的写操作已经不再是真正的瓶颈,或者说没你想象的那么弱。
再假设业务逻辑的其它操作本身就很耗时,那么数据库的批操作可能就不是银弹了。因为我们大部分时间是用来排队,而真实的内存利用率是极低的。想象一下火车站的售票窗口,即便出票那个过程耗时为 0,也经不住人多,因为售票员要问询、查票、调度、再问询。所以,瓶颈在哪里?我们又该怎样优化性能?
SimpleBatch
多开几个窗口不就完事儿了嘛!我们不但要处理数据,还要拿到处理结果,所以我们可以使用 Executor 和 FutureTask。
这东西的处理过程很有意思,简言之就是我们可以把一个业务过程当作一个任务扔到任务池里面,在任务未完成的时候会新建任务;当池被填满的时候,会排队等待其他任务完成。当然,你可以介入调度,细粒度地观察和决策任务的执行过程和状态,但在大部分的情景下,我们使用默认就已足够。
所以,我们需要一个算法结构,简单地帮我们优化性能。它应该具有以下特性:
它的使用应该足够简单,不能大幅度修改已有代码;
它的功能也应该足够单一,能帮我们调度批量数据的执行足矣;
它应该是可复用的,这意味着它不应该有任何具体的业务逻辑;
SimpleBatch.java
/**
* 批量工具
* @author lucifer.chan
**/
public class SimpleBatch {
private final ExecutorService executorService;
public SimpleBatch(int nThreads){
executorService = Executors.newFixedThreadPool(nThreads);
}
/**
* 批量处理
* @param iterable 数据集
* @param handler 对单条数据的处理
* @param onFailed 失败处理
* @param onComplete 完成后的处理
* @return
*/
public <T, R> List<R> execute(Iterable<T> iterable
, LcFunction<T, R> handler
, BiConsumer<T, Exception> onFailed
, Consumer<T> onComplete){
List<Future<R>> futures = StreamSupport
.stream(iterable.spliterator(), false)
.map(t -> singleTask(t, handler, onFailed, onComplete))
.collect(Collectors.toList());
return futures.stream()
.map(Unchecked.function(Future::get))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
/**
* 批量处理
* @param iterable 数据集
* @param handler 对单条数据的处理
* @param onFailed 失败处理
* @return
*/
public <T, R> List<R> execute(Iterable<T> iterable
, LcFunction<T, R> handler
, BiConsumer<T, Exception> onFailed){
return execute(iterable, handler, onFailed, t->{});
}
/**
* 批量处理
* @param iterable 数据集
* @param handler 对单条数据的处理
* @return
*/
public <T, R> List<R> execute(Iterable<T> iterable
, LcFunction<T, R> handler){
return execute(iterable
, handler
, (t, e)-> log.error(e.getMessage())
, t-> {});
}
private <T, R> FutureTask<R> singleTask(T data
, LcFunction<T, R> handler
, BiConsumer<T, Exception> onFailed
, Consumer<T> onComplete){
FutureTask<R> task = new FutureTask<>(()-> {
try {
return handler.apply(data);
} catch (Exception e){
onFailed.accept(data, e);
return null;
} finally {
onComplete.accept(data);
}
});
executorService.submit(task);
return task;
}
}
解释一下代码:
构造方法来初始化任务池大小;
用 LcFunction 来包装业务方法;
用 BiConsumer 来包装异常处理;
用 Consumer 来包装业务处理完成后的清理工作;
两个重载方法用来消费异常和清理;
最后来看一下最初需求经过改造后的实现,在未修改主业务代码的情况下,效率提高了 1000 倍。
/**
* 主方法
* @param fileName
* @throws IOException
*/
public void importe(String fileName) throws IOException {
new SimpleBatch(1000).execute(lines(fileName)
, tuple2 -> {
ValueBox<EntityData> box = handle(tuple2.v1);
ResultType resultType = box.getResultType();
if(ResultType.FAILED == resultType){
throw new RuntimeException(box.getException());
}
return box;
}
, (tuple2, e) -> onFailed(tuple2.v1, tuple2.v2, e)
);
}
全文(完)