
01

Introduction

01

What is the pipeline pattern

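The pipeline pattern is not one of the 23 classic design patterns; it can be seen as a variant of the Chain of Responsibility pattern. In technical terms, the data is handed to a task queue, and the tasks in that queue process the data one after another, in order.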
[Image]
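To make the idea concrete, here is a minimal sketch of "an ordered queue of tasks applied to one piece of data". The names `SimplePipeline`, `addLast`, and `run` are made up for illustration and are not part of the article's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical minimal pipeline: an ordered list of steps applied to one value.
class SimplePipeline<T> {
    private final List<UnaryOperator<T>> steps = new ArrayList<>();

    // Append a step to the end of the queue and return this for chaining.
    SimplePipeline<T> addLast(UnaryOperator<T> step) {
        steps.add(step);
        return this;
    }

    // Feed the input through every step in insertion order.
    T run(T input) {
        T current = input;
        for (UnaryOperator<T> step : steps) {
            current = step.apply(current);
        }
        return current;
    }
}

class SimplePipelineDemo {
    public static void main(String[] args) {
        String result = new SimplePipeline<String>()
                .addLast(String::trim)
                .addLast(String::toUpperCase)
                .addLast(s -> s + "!")
                .run("  hello ");
        System.out.println(result); // HELLO!
    }
}
```

Each step only sees the output of the previous one, which is what lets steps be developed and tested in isolation.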

02

When the pipeline pattern is a good fit

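It fits when a business flow is complex enough to be split into several sub-steps, and those sub-steps need to be freely combined, replaced, added, or removed.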
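As a rough illustration of "combine, replace, add, remove", the sketch below keeps steps under names so they can be swapped without touching the others. `NamedStepPipeline` and its methods are hypothetical and exist only for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical named-step pipeline; LinkedHashMap keeps insertion order.
class NamedStepPipeline {
    private final Map<String, UnaryOperator<Integer>> steps = new LinkedHashMap<>();

    // Adding under an existing name replaces that step and keeps its position.
    NamedStepPipeline add(String name, UnaryOperator<Integer> step) {
        steps.put(name, step);
        return this;
    }

    NamedStepPipeline remove(String name) {
        steps.remove(name);
        return this;
    }

    int run(int input) {
        int current = input;
        for (UnaryOperator<Integer> step : steps.values()) {
            current = step.apply(current);
        }
        return current;
    }
}

class NamedStepPipelineDemo {
    public static void main(String[] args) {
        NamedStepPipeline pipeline = new NamedStepPipeline()
                .add("double", x -> x * 2)
                .add("plusOne", x -> x + 1);
        System.out.println(pipeline.run(3)); // 7

        pipeline.add("double", x -> x * 10); // replace a step in place
        System.out.println(pipeline.run(3)); // 31

        pipeline.remove("plusOne");          // remove a step
        System.out.println(pipeline.run(3)); // 30
    }
}
```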

02

The general recipe for implementing a pipeline

01

Encapsulate a context that carries data through the pipeline

```java
import com.alibaba.ttl.TransmittableThreadLocal;

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class ChannelHandlerContext extends ConcurrentHashMap<String, Object> {

    public static final String CHANNEL_HANDLER_REQUEST_KEY = "channelHandlerRequest";

    protected static Class<? extends ChannelHandlerContext> contextClass = ChannelHandlerContext.class;

    // One context per thread, created lazily from contextClass.
    protected static final TransmittableThreadLocal<? extends ChannelHandlerContext> CHAIN_CONTEXT =
            new TransmittableThreadLocal<ChannelHandlerContext>() {
                @Override
                protected ChannelHandlerContext initialValue() {
                    try {
                        return contextClass.getDeclaredConstructor().newInstance();
                    } catch (Throwable e) {
                        throw new RuntimeException(e);
                    }
                }
            };

    /**
     * Overrides the default pipeline context class.
     *
     * @param clazz the context class to instantiate for each thread
     */
    public static void setContextClass(Class<? extends ChannelHandlerContext> clazz) {
        contextClass = clazz;
    }

    /**
     * Returns the pipeline context bound to the current thread.
     */
    public static final ChannelHandlerContext getCurrentContext() {
        return CHAIN_CONTEXT.get();
    }

    /**
     * Releases the context: clears its entries and removes the thread-local value.
     */
    public void release() {
        this.clear();
        CHAIN_CONTEXT.remove();
    }

    /**
     * Returns the value stored under key, or defaultValue if there is none.
     *
     * @param key          the context key
     * @param defaultValue the fallback value
     * @return the stored value or the fallback
     */
    public Object getDefault(String key, Object defaultValue) {
        return Optional.ofNullable(get(key)).orElse(defaultValue);
    }

    public ChannelHandlerRequest getChannelHandlerRequest() {
        return (ChannelHandlerRequest) this.getDefault(
                CHANNEL_HANDLER_REQUEST_KEY, ChannelHandlerRequest.builder().build());
    }
}
```
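The core of the context above is "one map per thread, explicitly released when the pipeline finishes". The sketch below shows that idea with the JDK's plain `ThreadLocal` instead of `TransmittableThreadLocal` (which additionally targets handing values over to pooled threads); `SimpleContext` is a hypothetical stand-in, not the article's class.

```java
import java.util.HashMap;

// Hypothetical stand-in for the article's context: one map per thread.
class SimpleContext extends HashMap<String, Object> {
    private static final ThreadLocal<SimpleContext> CURRENT =
            ThreadLocal.withInitial(SimpleContext::new);

    static SimpleContext current() {
        return CURRENT.get();
    }

    // Clear the data and drop the thread-local slot so pooled threads do not keep stale state.
    void release() {
        clear();
        CURRENT.remove();
    }
}

class SimpleContextDemo {
    public static void main(String[] args) throws InterruptedException {
        SimpleContext.current().put("requestId", "r-1");

        // Another thread gets its own, empty context.
        Thread other = new Thread(() ->
                System.out.println(SimpleContext.current().containsKey("requestId"))); // false
        other.start();
        other.join();

        System.out.println(SimpleContext.current().get("requestId")); // r-1
        SimpleContext.current().release();
        System.out.println(SimpleContext.current().isEmpty()); // true
    }
}
```

Releasing in a `finally` block, as the pipeline in the next step does, matters most when threads are reused by a pool.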

02

Define the abstract pipeline handler

```java
public abstract class AbstactChannelHandler {

    private String channelHandlerName;

    public String getChannelHandlerName() {
        return channelHandlerName;
    }

    public void setChannelHandlerName(String channelHandlerName) {
        this.channelHandlerName = channelHandlerName;
    }

    // Returns true to let the pipeline continue, false to stop it.
    public abstract boolean handler(ChannelHandlerContext chx);
}
```

03

Define the pipeline

```java
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.concurrent.LinkedBlockingDeque;

@Slf4j
public class ChannelPipeline {

    private LinkedBlockingDeque<AbstactChannelHandler> channelHandlers = new LinkedBlockingDeque<>();

    // Must be set before start() is called (presumably by ChannelPipelineExecutor, which is not shown here).
    private ChannelHandlerContext handlerContext;

    public ChannelPipeline addFirst(AbstactChannelHandler channelHandler) {
        return addFirst(null, channelHandler);
    }

    public ChannelPipeline addLast(AbstactChannelHandler channelHandler) {
        return addLast(null, channelHandler);
    }

    public ChannelPipeline addFirst(String channelHandlerName, AbstactChannelHandler channelHandler) {
        if (StringUtils.isNotBlank(channelHandlerName)) {
            channelHandler.setChannelHandlerName(channelHandlerName);
        }
        channelHandlers.addFirst(channelHandler);
        return this;
    }

    public ChannelPipeline addLast(String channelHandlerName, AbstactChannelHandler channelHandler) {
        if (StringUtils.isNotBlank(channelHandlerName)) {
            channelHandler.setChannelHandlerName(channelHandlerName);
        }
        channelHandlers.addLast(channelHandler);
        return this;
    }

    public void setChannelHandlers(LinkedBlockingDeque<AbstactChannelHandler> channelHandlers) {
        this.channelHandlers = channelHandlers;
    }

    public ChannelHandlerContext getHandlerContext() {
        return handlerContext;
    }

    public void setHandlerContext(ChannelHandlerContext handlerContext) {
        this.handlerContext = handlerContext;
    }

    public boolean start(ChannelHandlerRequest channelHandlerRequest) {
        if (channelHandlers.isEmpty()) {
            log.warn("channelHandlers is empty");
            return false;
        }
        return handler(channelHandlerRequest);
    }

    private boolean handler(ChannelHandlerRequest channelHandlerRequest) {
        // Assign a request id if the caller did not provide one.
        if (StringUtils.isBlank(channelHandlerRequest.getRequestId())) {
            channelHandlerRequest.setRequestId(String.valueOf(SnowflakeUtils.getNextId()));
        }
        handlerContext.put(ChannelHandlerContext.CHANNEL_HANDLER_REQUEST_KEY, channelHandlerRequest);
        boolean isSuccess = true;
        try {
            // Run the handlers in order and stop at the first one that returns false.
            for (AbstactChannelHandler channelHandler : channelHandlers) {
                isSuccess = channelHandler.handler(handlerContext);
                if (!isSuccess) {
                    break;
                }
            }
            if (!isSuccess) {
                channelHandlers.clear();
            }
        } catch (Exception e) {
            // Log with the stack trace; an exception counts as a failed run.
            log.error("pipeline execution failed", e);
            isSuccess = false;
        } finally {
            handlerContext.release();
        }
        return isSuccess;
    }
}
```
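The execution semantics of the pipeline above (run in order, stop at the first `false`, treat an exception as failure, always clean up the context) can be reproduced with JDK types only. The sketch below is a simplified model under those assumptions; `ShortCircuitPipeline` is a hypothetical name, and a plain `Map` stands in for the context.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical model of the pipeline's control flow, using a plain Map as the context.
class ShortCircuitPipeline {
    private final Deque<Predicate<Map<String, Object>>> handlers = new ArrayDeque<>();

    ShortCircuitPipeline addFirst(Predicate<Map<String, Object>> handler) {
        handlers.addFirst(handler);
        return this;
    }

    ShortCircuitPipeline addLast(Predicate<Map<String, Object>> handler) {
        handlers.addLast(handler);
        return this;
    }

    boolean start(Map<String, Object> context) {
        if (handlers.isEmpty()) {
            return false;
        }
        try {
            for (Predicate<Map<String, Object>> handler : handlers) {
                if (!handler.test(context)) {
                    return false; // later handlers are skipped
                }
            }
            return true;
        } catch (RuntimeException e) {
            return false; // an exception is reported as a failed run
        } finally {
            context.clear(); // always release the shared state
        }
    }
}

class ShortCircuitPipelineDemo {
    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        boolean ok = new ShortCircuitPipeline()
                .addLast(ctx -> { trace.add("check"); return true; })
                .addLast(ctx -> { trace.add("fill"); return false; })
                .addLast(ctx -> { trace.add("save"); return true; })
                .start(new HashMap<>());
        System.out.println(ok);    // false
        System.out.println(trace); // [check, fill]
    }
}
```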

04

Split the business logic into sub-task handlers according to its complexity

```java
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

@Slf4j
public class UserCheckChannelHandler extends AbstactChannelHandler {

    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------Step 1: validate user data ["
                + channelHandlerRequest.getRequestId() + "]");
        Object params = channelHandlerRequest.getParams();
        if (params instanceof User) {
            User user = (User) params;
            if (StringUtils.isBlank(user.getFullname())) {
                log.error("Full name must not be blank");
                return false;
            }
            return true;
        }
        return false;
    }
}
```
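```java
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
// Assumed library: pinyin4j (the article does not show its imports).
import net.sourceforge.pinyin4j.PinyinHelper;
import net.sourceforge.pinyin4j.format.HanyuPinyinOutputFormat;
import net.sourceforge.pinyin4j.format.HanyuPinyinToneType;

@Slf4j
public class UserFillUsernameAndEmailChannelHandler extends AbstactChannelHandler {

    @SneakyThrows
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------Step 2: fill in username and email "
                + "[converted from Chinese to pinyin] [" + channelHandlerRequest.getRequestId() + "]");
        Object params = channelHandlerRequest.getParams();
        if (params instanceof User) {
            User user = (User) params;
            String fullname = user.getFullname();
            // Convert the Chinese full name to pinyin without tone marks.
            HanyuPinyinOutputFormat hanyuPinyinOutputFormat = new HanyuPinyinOutputFormat();
            hanyuPinyinOutputFormat.setToneType(HanyuPinyinToneType.WITHOUT_TONE);
            String username = PinyinHelper.toHanYuPinyinString(fullname, hanyuPinyinOutputFormat);
            user.setUsername(username);
            user.setEmail(username + "@qq.com");
            return true;
        }
        return false;
    }
}
```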
```java
// Assumed library: Hutool (the article does not show its imports).
import cn.hutool.crypto.digest.DigestUtil;

public class UserPwdEncryptChannelHandler extends AbstactChannelHandler {

    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------Step 3: convert the plaintext password to a hash ["
                + channelHandlerRequest.getRequestId() + "]");
        Object params = channelHandlerRequest.getParams();
        if (params instanceof User) {
            User user = (User) params;
            // Replace the plaintext password with its SHA-256 hex digest.
            String encryptPwd = DigestUtil.sha256Hex(user.getPassword());
            user.setPassword(encryptPwd);
            return true;
        }
        return false;
    }
}
```
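For readers without the utility library at hand, the sketch below shows what a SHA-256 hex helper roughly computes, using only the JDK. `Sha256Sketch` is a hypothetical name, and UTF-8 encoding with lowercase hex output are assumptions of this example. Note that a plain unsalted SHA-256 digest is fine for a demo but is generally not considered adequate for storing real passwords.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical JDK-only helper: SHA-256 digest of a string, as lowercase hex.
class Sha256Sketch {
    static String sha256Hex(String text) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b)); // two hex digits per byte
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required to be available on every Java platform.
            throw new IllegalStateException(e);
        }
    }
}

class Sha256SketchDemo {
    public static void main(String[] args) {
        String hashed = Sha256Sketch.sha256Hex("123456");
        System.out.println(hashed.length()); // 64
    }
}
```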
```java
import java.util.HashMap;
import java.util.Map;

public class UserMockSaveChannelHandler extends AbstactChannelHandler {

    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------Step 4: simulate saving the user to the database ["
                + channelHandlerRequest.getRequestId() + "]");
        Object params = channelHandlerRequest.getParams();
        if (params instanceof User) {
            User user = (User) params;
            // Use an in-memory map as the "database" and share it through the context.
            Map<String, User> userMap = new HashMap<>();
            userMap.put(user.getUsername(), user);
            chx.put("userMap", userMap);
            return true;
        }
        return false;
    }
}
```
```java
import java.util.Map;

public class UserPrintChannleHandler extends AbstactChannelHandler {

    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------Step 5: print the user data ["
                + channelHandlerRequest.getRequestId() + "]");
        Object params = channelHandlerRequest.getParams();
        if (params instanceof User) {
            // Read the map that the previous handler stored in the context.
            Object userMap = chx.get("userMap");
            if (userMap instanceof Map) {
                Map<?, ?> map = (Map<?, ?>) userMap;
                String username = ((User) params).getUsername();
                if (map.containsKey(username)) {
                    System.out.println(map.get(username));
                    return true;
                }
            }
        }
        return false;
    }
}
```

05

Orchestrate the sub-tasks

```java
import org.springframework.stereotype.Service;

@Service
public class UserServiceImpl implements UserService {

    @Override
    public boolean save(User user) {
        // The handlers run in the order in which they are added.
        return ChannelPipelineExecutor.pipeline()
                .addLast(new UserCheckChannelHandler())
                .addLast(new UserFillUsernameAndEmailChannelHandler())
                .addLast(new UserPwdEncryptChannelHandler())
                .addLast(new UserMockSaveChannelHandler())
                .addLast(new UserPrintChannleHandler())
                .start(ChannelHandlerRequest.builder().params(user).build());
    }
}
```

06

Test

```java
Faker faker = Faker.instance(Locale.CHINA);
User user = User.builder().age(20)
        .fullname(faker.name().fullName())
        .mobile(faker.phoneNumber().phoneNumber())
        .password("123456").build();
userService.save(user);
```

Check the console output

[Image: console output]

Think about it: is there room to optimize the pipeline implementation above?

In step 05, where the sub-tasks are orchestrated, a flow with N steps needs N calls to addLast, which feels rather hard-coded. So we can refactor it as follows.

03

Refactoring

01

Define the pipeline annotation

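```java
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.Role;
import org.springframework.stereotype.Component;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Component
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public @interface Pipeline {

    // The service interface whose method this handler belongs to.
    Class<?> consumePipelinesService();

    // The name of the service method that triggers the pipeline.
    String consumePipelinesMethod();

    // The parameter types of that method.
    Class<?>[] args() default {};

    // The position of this handler within the pipeline.
    int order();
}
```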
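The `order` attribute only becomes useful once something reads it at runtime and sorts the handlers. The sketch below shows that mechanism in isolation with a hypothetical `@Step` annotation and placeholder classes; it does not use Spring and is not the demo's actual lookup code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified counterpart of @Pipeline that carries only the order.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Step {
    int order();
}

// Placeholder handler classes, deliberately declared out of order.
@Step(order = 2) class EncryptStep {}
@Step(order = 1) class CheckStep {}
@Step(order = 3) class SaveStep {}

class StepSorter {
    // Read the runtime annotation from each class and sort by its order value.
    static List<Class<?>> sortByOrder(List<Class<?>> stepClasses) {
        List<Class<?>> sorted = new ArrayList<>(stepClasses);
        sorted.sort(Comparator.comparingInt((Class<?> c) -> c.getAnnotation(Step.class).order()));
        return sorted;
    }
}

class StepSorterDemo {
    public static void main(String[] args) {
        List<Class<?>> input = Arrays.asList(EncryptStep.class, CheckStep.class, SaveStep.class);
        for (Class<?> step : StepSorter.sortByOrder(input)) {
            System.out.println(step.getSimpleName());
        }
        // CheckStep
        // EncryptStep
        // SaveStep
    }
}
```

`RetentionPolicy.RUNTIME` is what makes `getAnnotation` return a value here; with the default retention the annotation would not be visible through reflection.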

02

Define the pipeline scanner

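```java
import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.context.annotation.ClassPathBeanDefinitionScanner;

import java.util.Set;

public class PipelineClassPathBeanDefinitionScanner extends ClassPathBeanDefinitionScanner {

    public PipelineClassPathBeanDefinitionScanner(BeanDefinitionRegistry registry) {
        super(registry);
    }

    @Override
    protected Set<BeanDefinitionHolder> doScan(String... basePackages) {
        Set<BeanDefinitionHolder> beanDefinitionHolders = super.doScan(basePackages);
        for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
            GenericBeanDefinition beanDefinition =
                    (GenericBeanDefinition) beanDefinitionHolder.getBeanDefinition();
            // Remember the scanned interface name, then let the factory bean create the actual instance.
            String className = beanDefinition.getBeanClassName();
            beanDefinition.getPropertyValues().addPropertyValue("pipelineServiceClz", className);
            beanDefinition.setBeanClass(ComsumePipelineFactoryBean.class);
        }
        return beanDefinitionHolders;
    }

    @Override
    protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
        // Only interfaces are candidates; the default implementation would reject them.
        return beanDefinition.getMetadata().isInterface();
    }
}
```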

03

Define the pipeline registrar

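```java
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PipelineImportBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar {

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata,
                                        BeanDefinitionRegistry registry) {
        PipelineClassPathBeanDefinitionScanner scanner = new PipelineClassPathBeanDefinitionScanner(registry);
        // Pick up interfaces annotated with @FunctionalInterface.
        scanner.addIncludeFilter(new AnnotationTypeFilter(FunctionalInterface.class));
        Set<String> basePackages = getBasePackages(importingClassMetadata);
        scanner.scan(basePackages.toArray(new String[0]));
    }

    protected Set<String> getBasePackages(AnnotationMetadata importingClassMetadata) {
        Map<String, Object> attributes =
                importingClassMetadata.getAnnotationAttributes(EnabledPipeline.class.getCanonicalName());
        Set<String> basePackages = new HashSet<>();
        for (String pkg : (String[]) attributes.get("basePackages")) {
            if (StringUtils.hasText(pkg)) {
                basePackages.add(pkg);
            }
        }
        // Fall back to the package of the class that carries @EnabledPipeline.
        if (basePackages.isEmpty()) {
            basePackages.add(ClassUtils.getPackageName(importingClassMetadata.getClassName()));
        }
        return basePackages;
    }
}
```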

04

Define the EnableXXX annotation

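```java
import org.springframework.context.annotation.Import;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(PipelineImportBeanDefinitionRegistrar.class)
public @interface EnabledPipeline {

    // Packages to scan; defaults to the package of the annotated class.
    String[] basePackages() default {};
}
```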

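Note: a pipeline proxy and a pipeline FactoryBean also need to be defined; they are omitted here for brevity. Interested readers can check the demo link at the end of the article.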
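Since the proxy is not shown, the following is only a guess at the general technique, not the demo's code: a JDK dynamic proxy can implement a service interface and route each call through an ordered list of handlers. All names here (`SaveService`, `PipelineProxyFactory`) are hypothetical, and handlers are modelled as plain `Predicate<Object>` to keep the example self-contained.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical service interface, mirroring the shape of UserService in the article.
interface SaveService {
    boolean save(String name);
}

class PipelineProxyFactory {
    // Assumes every interface method returns boolean and takes at most one argument of interest.
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> serviceInterface, List<Predicate<Object>> orderedHandlers) {
        InvocationHandler dispatcher = (proxy, method, args) -> {
            // Delegate toString/hashCode/equals to the handler list so they stay usable.
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(orderedHandlers, args);
            }
            Object param = (args == null || args.length == 0) ? null : args[0];
            for (Predicate<Object> handler : orderedHandlers) {
                if (!handler.test(param)) {
                    return false; // stop at the first failing handler
                }
            }
            return true;
        };
        return (T) Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[] {serviceInterface},
                dispatcher);
    }
}

class PipelineProxyDemo {
    public static void main(String[] args) {
        List<Predicate<Object>> handlers = new ArrayList<>();
        handlers.add(p -> p instanceof String);
        handlers.add(p -> !((String) p).trim().isEmpty());

        SaveService service = PipelineProxyFactory.create(SaveService.class, handlers);
        System.out.println(service.save("alice")); // true
        System.out.println(service.save("   "));   // false
    }
}
```

In a Spring setup, a FactoryBean would typically return such a proxy from `getObject()`, which is how an interface without an implementation class can still be injected.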

05

The original pipeline handlers are changed as follows

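```java
// Assumed libraries: fastjson (JSON) and Hutool (CollectionUtil); the article does not show its imports.
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.List;

@Slf4j
@Pipeline(consumePipelinesService = UserService.class, consumePipelinesMethod = "save",
        args = {User.class}, order = 1)
public class UserCheckChannelHandler extends AbstactChannelHandler {

    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------Step 1: validate user data ["
                + channelHandlerRequest.getRequestId() + "]");
        // The params are now parsed as a JSON array of the method arguments.
        String json = JSON.toJSONString(channelHandlerRequest.getParams());
        List<User> users = JSON.parseArray(json, User.class);
        if (CollectionUtil.isEmpty(users) || StringUtils.isBlank(users.get(0).getFullname())) {
            log.error("Full name must not be blank");
            return false;
        }
        return true;
    }
}
```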
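```java
// Assumed libraries: fastjson, Hutool, and pinyin4j; the article does not show its imports.
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSON;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import net.sourceforge.pinyin4j.PinyinHelper;
import net.sourceforge.pinyin4j.format.HanyuPinyinOutputFormat;
import net.sourceforge.pinyin4j.format.HanyuPinyinToneType;

import java.util.List;

@Slf4j
@Pipeline(consumePipelinesService = UserService.class, consumePipelinesMethod = "save",
        args = {User.class}, order = 2)
public class UserFillUsernameAndEmailChannelHandler extends AbstactChannelHandler {

    @SneakyThrows
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------Step 2: fill in username and email "
                + "[converted from Chinese to pinyin] [" + channelHandlerRequest.getRequestId() + "]");
        String json = JSON.toJSONString(channelHandlerRequest.getParams());
        List<User> users = JSON.parseArray(json, User.class);
        if (CollectionUtil.isNotEmpty(users)) {
            // Note: as shown here, `user` is a deserialized copy, so these changes reach later
            // handlers only if they are written back to the request or context.
            User user = users.get(0);
            String fullname = user.getFullname();
            HanyuPinyinOutputFormat hanyuPinyinOutputFormat = new HanyuPinyinOutputFormat();
            hanyuPinyinOutputFormat.setToneType(HanyuPinyinToneType.WITHOUT_TONE);
            String username = PinyinHelper.toHanYuPinyinString(fullname, hanyuPinyinOutputFormat);
            user.setUsername(username);
            user.setEmail(username + "@qq.com");
            return true;
        }
        return false;
    }
}
```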

... (the remaining pipeline handlers are omitted)

06

The original step orchestration now only requires writing an interface

```java
@FunctionalInterface
public interface UserService {

    boolean save(User user);
}
```

That is all that is needed for the orchestration.

07

Test

Add the @EnabledPipeline annotation to the startup class, for example:

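```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnabledPipeline(basePackages = "com.github.lybgeek.pipeline.spring.test")
public class SpringPipelineApplication {

    public static void main(String[] args) {
        // Pass the command-line arguments through to Spring Boot.
        SpringApplication.run(SpringPipelineApplication.class, args);
    }
}
```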
```java
@Test
public void testPipeline() {
    boolean isOk = userService.save(user);
    Assert.assertTrue(isOk);
}
```

[Image]
The orchestration result is the same as before.

04

Summary

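This article implements the pipeline pattern in two forms. The first is annotation-based: the orchestration is declared directly on the handlers, and each handler's annotation points to the business method it serves. In the second, the business method assembles and invokes the handlers itself. The annotation approach spares the business method from orchestrating handlers, but once there are many handlers you have to open every handler class to see its order value, so a handler can end up in the wrong position and the intended composition is not achieved. To address this, the next article will introduce two other implementations.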
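Until then, one possible way to soften the ordering problem is to resolve the order values in one place at startup, fail on duplicates, and print the resulting sequence. The sketch below assumes the handler names and order values have already been collected (for example from the annotations) into a map; `OrderReport` is a hypothetical helper and not part of the demo.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical startup check: reject duplicate order values and report the final sequence.
class OrderReport {
    static List<String> resolve(Map<String, Integer> orderByHandler) {
        Set<Integer> seen = new HashSet<>();
        for (Map.Entry<String, Integer> entry : orderByHandler.entrySet()) {
            if (!seen.add(entry.getValue())) {
                throw new IllegalStateException(
                        "duplicate order " + entry.getValue() + " on " + entry.getKey());
            }
        }
        List<String> names = new ArrayList<>(orderByHandler.keySet());
        names.sort(Comparator.comparingInt((String name) -> orderByHandler.get(name)));
        return names;
    }
}

class OrderReportDemo {
    public static void main(String[] args) {
        Map<String, Integer> orders = new LinkedHashMap<>();
        orders.put("UserPwdEncryptChannelHandler", 3);
        orders.put("UserCheckChannelHandler", 1);
        orders.put("UserFillUsernameAndEmailChannelHandler", 2);
        System.out.println(OrderReport.resolve(orders));
        // [UserCheckChannelHandler, UserFillUsernameAndEmailChannelHandler, UserPwdEncryptChannelHandler]
    }
}
```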

05

Demo link

https://github.com/lyb-geek/springboot-learning/tree/master/springboot-pipeline

Released under the MIT License.