技术开发文档

全面讲解Biz-SIP业务中台开发,包括原理、配置和代码开发。

1 Biz-SIP介绍

1.1 概述

Biz-SIP是一套基于领域驱动设计(DDD),用于快速构建金融级云原生架构的服务整合中间件,包含了在金融场景里锤炼出来的最佳实践。
主要功能有:

  • 支持服务接入和服务调用:金融应用随着前后端分离架构、微服务架构、中台战略、产业互联互通的实施必将产生大量的各种协议业务系统的整合,系统之间的相互调用和依赖情况也随之越来越多和复杂,Biz-SIP中间件支持全面整合企业内外部应用和接口,打造业务中台;
  • 支持标准接口接入和非标接口接入:标准接口采用标准的微服务接口接入,非标接口支持开箱即用的常用接口连接器,并支持个性化服务接入端的二次开发;
  • 服务接入和服务调用,支持常用的开箱即用、可配置的消息格式适配能力和通讯协议适配能力;
    • 支持可配置的转换器:支持JSON、XML、定长、有分隔符、8583等报文等报文的解包和打包,并支持二次开发扩展;
    • 支持可配置的连接器:支持WebService、RESTful、消息中间件、TCP等通讯协议的连接,并支持二次开发扩展;
  • 支持多种服务整合和编排方式:业务系统与业务系统之间、关联合作伙伴之间的系统调用都相应存在大量的服务相互调用和逻辑重组需求,目前支持脚本和Java代码来进行服务的整合和编排;
  • 丰富的内部服务集成:还可以在服务整合和编排中,加入数据库、内容存储、加密计算、AI计算、分布式事务等内部服务节点,从而进一步丰富服务整合的能力。
  • 其它功能:支持服务校验、分布式事务处理、延迟服务、交易日志监控等功能。

适合业务场景

  • 业务中台:作为企业中台架构中的前置服务化平台的服务管理、发布平台
  • 开放平台:将企业内部服务以OpenAPI方式对公众市场开放,共同打造应用场景
  • 遗留系统:作为集成企业遗留系统的统一集成平台
  • 协议转换:作为多个服务开发体系(SpringCloud、Dubbo、.Net)的统一服务转换和管理平台
  • 逻辑重组:作为企业大量的API服务与服务之间的逻辑重组平台
  • 系统解藕:解决系统服务之间相互藕合的问题
  • 敏捷交付:可快速重组业务逻辑、敏捷交付业务应用,比传统代码模式交付速度提升80%

Biz-SIP中间件整体架构如下图所示:

image.png

  • Source层:负责聚合服务的接入,包括通讯协议适配和消息格式转换,并统一接入App层的App服务。
  • App层:负责服务的整合和编排,对Source层接入的服务,进行服务编排,并通过Sink模块接入要编排的服务。
  • Sink层:统一被App服务所调用,实现业务逻辑的处理,或接入外部第三方服务,包括和外部服务对接时的通讯协议适配和消息格式转换。

1.2 运行机制

Biz-SIP中间件调用服务的一个完整运行流程如下:

Biz-SIP中间件从左到右分别是:

  • Source层(对应于DDD架构中的适配接入层):负责聚合服务的接入,包括通讯协议适配和消息格式转换,并统一调用App层的App服务。
  • App层(对应于DDD架构中的应用层):负责服务的整合和编排,对Source层接入的服务,进行服务编排,并通过Sink模块调用要编排的服务。
  • Sink层(对应于DDD架构中的领域层):统一被App服务所调用,实现业务逻辑的处理,或接入外部第三方服务,包括和外部服务对接时的通讯协议适配和消息格式转换。

1.3 文件目录结构

1.3.1 Biz-SIP源码目录结构

Biz-SIP源码目录结构,如下所示:

├── api-gateway  开放平台网关子模块
├── common       公共包子模块
├── app-spring-boot-starter				app层Spring Boot Starter子模块
├── source-spring-boot-starter   	Source层Spring Boot Starter子模块
├── sink-spring-boot-starter    	Sink层Sprint Boot Starter子模块
├── log-spring-boot-starter    		Log日志Sprint Boot Starter子模块
├── redis-spring-boot-starter  		Redis Sprint Boot Starter子模块
├── sample          Sample子模块
│   ├── config      Sample配置文件主目录
│   ├── sample-app          app层Sample子模块
│   ├── sample-sink         Sink层Sample子模块
│   ├── sample-source       Sink层Sample子模块
│   └── sample-app-log      交易日志监控Sample子模块
└── source      Source服务接入子模块
    ├── netty-source        Netty接入Source子模块
    └── rest-source         Restful接入Source子模块

1.3.2 Biz-SIP项目目录结构

基于Biz-SIP中间件开发的项目,以xBank为例,如下所示:

image.png

xbank项目分为以下模块:

  • xbank-source:适配层模块,存放外部适配接入的适配层实现。
    • xbank-openapi-source:OPENAPI接入的适配层子模块
  • xbank-app:应用层模块,存放应用层的服务实现。
  • xbank-sink:领域层模块,存放按领域划分的领域服务实现。
    • xbank-account-sink:账户域子模块
    • xbank-customer-sink:客户域子模块
    • xbank-payment1-sink:缴费域子模块(存储转发)
    • xbank-payment1-sink:缴费域子模块(交易补偿)
  • xbank-infrastructure:基础设施层模块,存放对数据库、内容存储、HSM等基础设施的访问能力实现。
    • xbank-account-db:账户数据库访问子模块
    • xbank-customre-db:客户数据库访问子模块
  • xbank-client:接口模块,存放各层之间的服务接口,以及相关数据DTO。
    • xbank-account-sink-client:账户域服务接口子模块
    • xbank-customer-sink-client:客户域服务接口子模块
    • xbank-app-client:应用服务接口子模块

各层之间调用关系如下所示:

1.3.3 配置文件目录结构

Biz-SIP中间件的配置文件,一般都统一存放在配置目录下(由application.yml文件中的bizsip.config-path配置项所约定),配置目录中的文件如下例所示:

config
|____sink.yml
|____source.yml
|____app.yml
|____converter
| |____server3
| | |____woman.vm
| | |____error.vm
| | |____man.vm
|____service
| |____client1
| | |____error.script
| |____openapi
| | |____sample1.script
| | |____sample5.script
| | |____sample2.script
| | |____sample4.script
|____check-rule
| |____client1
| | |____sample1.yaml

  • sink.yml:领域层sink模块的配置文件
  • source.yml:渠道层source模块的配置文件
  • app.yml:应用层app聚合服务的的配置文件(非脚本的服务)
  • service目录:存放所有的应用层app聚合服务编排脚本
  • converter目录:存放消息转换器converter相关的消息格式转换配置文件
  • check-rule目录:存放聚合服务校验规则配置文件

2 Source层

2.1 Source模块介绍

2.1.1 功能介绍

适配层Source模块主要是对接外部和系统内部其它系统的服务接入。
主要功能主要有:

  • 通讯协议的对接:服务接入模块(Source)负责外部调用方服务的通讯接入;
  • 消息转换:把外部多种消息格式,转换成平台内部通用的JSON标准消息报文类型;
  • 调用应用层的App服务进行服务编排。

2.1.2 运行流程

Source层采用Source Connector,运行流程如下图所示:

Source层不采用Source Connector,直接开发Source模块来进行通讯接入,运行流程如下图所示:

Source层不调用转换器,开发Source模块来进行通讯接入和格式转换,运行流程如下图所示:
Source层直接通过OpenAPI网关接入,运行流程如下图所示:

2.2 Source模块代码

2.2.1 目录结构

以xBank项目中的xbank-xml-source模块为例,这个Source模块是接受以HTTP POST方式提交的XML报文,l转换成平台标准JSON格式后,调用App层的App服务,具体代码目录结构如下所示:

image.png

可以看到,xbank-xml-source模块目录中包括以下主要文件:

  • 启动类:XmlSourceApplication.java
  • 接收HTTP请求的Controller类:XmlController.java
  • 应用配置文件:application.yml、application-*.yml
  • 模块Maven配置文件:pom.xml

2.2.2 pom.xml

在pom.xml文件中添加source-spring-boot-starter依赖,以及app服务相关联的client接口相关包:

        <dependency>
            <groupId>com.bizmda.bizsip</groupId>
            <artifactId>source-spring-boot-starter</artifactId>
        </dependency>

2.2.3 application.yml

Source模块的application.yml中的主要相关参数:

配置项 配置说明
bizsip.config-path Biz-SIP中间件配置文件根目录位置
bizsip.integrator-url App层服务的接口url地址
bizsip.rest.* 调用App层服务的相关超时时间配置

application.yml配置文件例子:

server:
  port: 8080

spring:
  application:
    name: bizsip-sample-source

  cloud:
    nacos:
      discovery:
        server-addr: bizsip-nacos:8848

bizsip:
  config-path: /var/bizsip/config
  integrator-url: http://bizsip-integrator/api
  rest:
    connection:
      # 指从连接池获取连接的timeout
      connection-request-timeout: 3000
      # 指客户端和服务器建立连接的timeout,
      # 就是http请求的三个阶段,一:建立连接;二:数据传送;三,断开连接。
      # 超时后会ConnectionTimeOutException
      connect-timeout: 3000
      # 指客户端从服务器读取数据的timeout,超出后会抛出SocketTimeOutException
      read-timeout: 3000
      
logging:
  level:
    com.bizmda.bizsip: debug

2.2.3 Application启动类

Source模块的Application启动类中,要通过“@Import”注册SpringUtil类:

@SpringBootApplication
@Import(cn.hutool.extra.spring.SpringUtil.class)
public class SampleRestSourceApplication {
    public static void main(String[] args) {
        SpringApplication.run(SampleRestSourceApplication.class, args);
    }
}

2.3 配置

source.yml定义了客户端适配器的配置参数,并不是所有适配层的接入模块都需要在source.yml中定义,只有用到可配置的Converter(消息转换器)时,才需要定义:

配置项 配置说明
[].id Source端口ID,要全局唯一
[].converter 参见:转换器配置
[].service 【一般无需配置】App服务相关配置(Source模块根据报文定位App Service ID,为高级选项)
[].service-rules[] 聚合服务定位断言规则
[].service-rules[].predicate 断言规则,返回true选择当前规则rule作为聚合服务ID,空条件为true,支持EL表达式
[].service-rules[].rule 当前断言规则对应的聚合服务ID,支持EL表达式

例子:

- id: source1
  converter:
    type: simple-json
  service:
    service-rules:
      - predicate: '#{#data[serviceId] != null}'
        rule: '#{#data[serviceId]}'
      - predicate:
        rule: source1/error

2.4 开发

2.4.1 调用Converter

Source模块要求支持消息的解包和打包:对传入的消息解包成平台标准JSON消息格式,并对返回的消息按接入消息格式进行打包。
Source模块的解包和打包功能,可能通过Java编码来实现,也可以通过采用内置的Converter(消息转换器),通过配置来实现,无需编码。
平台提供JSON、XML、定长、有分隔符、ISO8583的Converter,如果采用这些平台内置的Converter,就需要在source.yml中配置对应Source ID,以及对应的converter参数。
首先,应在在source.yml中配置相应的converter:

- id: xml-source
  converter:
    type: simple-xml

然后,就可以在Source模块代码中通过“Converter.getSourceConverter()”来获取converter调用句柄,进行相应的解包和打包操作:

@RestController
@RequestMapping("/personal")
public class XmlController {
    ......
    // 获取source层消息转换句柄
    private Converter converter = Converter.getSourceConverter("xml-source");

    @PostMapping(value = "/getCustomerAndAccountList", consumes = "application/xml", produces = "application/xml")
    public String getCustomerAndAccountList(@RequestBody String inMessage) throws BizException {
        // 消息解包操作
        JSONObject jsonObject = this.converter.unpack(inMessage.getBytes());
        String customerId = (String)jsonObject.get("customerId");
        CustomerAndAccountList customerAndAccountList = this.personalAppInterface.getCustomerAndAccountList(customerId);
        jsonObject = JSONUtil.parseObj(customerAndAccountList);
        // 消息打包并返回
        return new String(this.converter.pack(jsonObject));
    }
  	...
 }

2.4.2 调用App服务(RESTful接口)

Source模块是作为服务的接入模块,最主要的功能是要调用App层的App服务,调用接口主要有二种:RESTful接口和RabbitMQ接口,这一节主要讲普遍使用的RESTful接口。

2.4.2.1 App服务RESTful接口规范

App服务RESTful接口,对外是以Spring Cloud微服务的形式提供的,内部调用是采用以下接口规范,同时接入非标接口的Source模块也是采用这个接口进行接入的:

URL http://sip-integrator/api
注:根据App服务模块的服务暴露接口来定。
HTTP请求头

- Content-Type
application/json

- Biz-Service-Id
调用的聚合服务ID

- (可选)Biz-Trace-Id
预先设置的App服务的TraceId,不设置则由App服务自动生成TraceId。
请求包 JSON报文
响应包 Biz-SIP内部标准消息(参见“Biz-SIP内部标准消息规范”)

2.4.2.2 RESTful接口App服务调用方式

Source模块中,通过“SourceClientFactory.getAppServiceClient(Class tClass,String appServiceId)”来获取调用App层服务(tClass必须是接口类)句柄,二种调用方式:

  • 指定接口类调用:由App层定义接口调用的Interface接口类,Source模块会引用此接口,如下例所示:
@RestController
@RequestMapping("/personal")
public class PersonalController  {
    private PersonalAppInterface personalAppInterface = SourceClientFactory
            .getAppServiceClient(PersonalAppInterface.class,"app/personal");
	...

    @GetMapping(value ="/getCustomerAndAccountList")
    public CustomerAndAccountList getCustomerAndAccountList(String customerId) {
        return this.personalAppInterface.getCustomerAndAccountList(customerId);
    }

    @GetMapping(value ="/getAccountListByCustomerId")
    public List<Account> getAccountListByCustomerId(String customerId) {
        return this.personalAppInterface.getAccountListByCustomerId(customerId);
    }

    @GetMapping(value ="/getCustomer")
    public Customer getCustomer(String customerId) {
        return this.personalAppInterface.getCustomer(customerId);
    }
	...
}

  • 平台JSON接口调用:采用平台通用JSON格式消息接口,Source模块通过BizMessageInterface接口的call()方法,来实现通过平台通用JSON格式来调用App服务。如下例所示:
@RestController
@RequestMapping("/personal")
public class PersonalController  {
    private BizMessageInterface payment1SinkInterface = SourceClientFactory
            .getAppServiceClient(BizMessageInterface.class,"sink/payment1");
    ...
        
    @GetMapping(value ="/send2Payment")
    public BizMessage<JSONObject> send2Payment(String message) throws BizException {
        JSONObject jsonObject = new JSONObject();
        jsonObject.set("message",message);
        return this.payment1SinkInterface.call(jsonObject);
    }
}

2.4.3 调用App服务(RabbitMQ接口)

调用App层的App服务,除了上节介绍的RESTful接口外,还支持RabbitMQ接口,RabbitMQ接口和RESTful接口相比,除了支持同步调用(采用RabbitMQ RPC调用模式)外,还支持异步调用。

2.4.3.1 App服务RabbitMQ接口规范

App服务RabbitMQ接口,是通过RabbitMQ消息中间件来提供的,发送时相关参数:

Exchange exchange.direct.bizsip.app
代码中可以使用:BizConstant.APP_SERVICE_EXCHANGE
RoutingKey key.bizsip.app
代码中可以使用:BizConstant.APP_SERVICE_ROUTING_KEY

发送的Message消息,约定如下:

body 为调用JSON报文的byte[]字节流(UTF-8码制),通过message.withBody()设置,调用JSON报文包括:
- Biz-Service-Id:调用的App服务ID
- Biz-Trace-id:预先设置的App服务的TraceId,不设置则由App服务自动生成TraceId。
- data:调用的请求JSON报文
replyTo(可选) App服务执行完成后,响应报文回调的队列,通过setReplyTo()设置(注:如采用RPC调用模式,则无需设置)。
correlationId(可选) App服务执行完成后的回调消息,会自动带回请求时设置的correlationId,通过setCorrelationId()设置(注:如采用RPC调用模式,则无需设置)。

2.4.3.2 RabbitMQ接口App服务调用方式

Source模块中,可以通过RabbitMQ分别开发生产者(Producer)和消息者(Consumer)模块来进行App服务的调用申请发起和调用响应回调,从而实现异步调用流程:

  • 生产者模块,如下例所示:
    @PostMapping(value = "/source5", consumes = "application/json", produces = "application/json")
    public Object doService(@RequestBody String inMessage, HttpServletResponse response) {
        Message message = MessageBuilder.withBody(inMessage.getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setReplyTo("queue.bizsip.source5").build();
        this.rabbitTemplate.send(BizConstant.APP_SERVICE_EXCHANGE, BizConstant.APP_SERVICE_ROUTING_KEY, message);
        return "success";
    }
  • 消费者模块,如下例所示:
    @RabbitListener(queuesToDeclare = @Queue(value = "queue.bizsip.source5", durable = "true", autoDelete = "false"))
    public void process(Message message) {
        JSONObject jsonObject = null;
        try {
            String str = new String(message.getBody(), "UTF-8");
            jsonObject = JSONUtil.parseObj(str);
        } catch (UnsupportedEncodingException e) {
            log.error("不支持的编码!",e);
            return;
        }
        BizMessage bizMessage = new BizMessage(jsonObject);
        log.debug("bizMessage:\n{}",BizUtils.buildBizMessageLog(bizMessage));
        return;
    }

Source模块中,也可以通过RabbitMQ中间件的RPC调用方式,实现对App服务的同步调用流程。

2.5 Source模块实施指南

Source模块会涉及格式转换、通讯连接的实现,涉及到配置和编码,在实施落地时,可以参考以下指南:

3 App层

3.1 App模块介绍

3.1.1 功能介绍

App层主要是通过App服务来进行对于内部服务和外部服务的编排处理,系统架构上支持多种模式的服务整合器,目前支持服务聚合编排模式包括:

  • 基于Script脚本的服务编排:通过编写类JavaScript的脚本文件,来实现领域层服务的聚合和编排;
  • 基于Java代码开发的服务编排:开发人员通过Java代码的编写,实现领域层服务的编排。基于代码的App服务类,支持2种类型的Java服务类:
    • app-bean-service接口服务类:app-bean-service通过实现统一的平台标准JSON接口来实现服务编排。
    • bean-service接口服务类:bean-service提供更为灵活的、约定好的接口类实现来进行服务编排。
  • 领域层服务的直接透传:通过配置sink-service,可以快速把领域层sink服务透传发布到前端的适配层source和OpenAPI接口。

3.1.2 运行流程

App服务类型为bean-service和app-bean-service时,运行流程如下图所示:

App服务类型为采用sink-service的Sink服务直通模式,运行流程如下图所示:

3.2 App模块代码结构

3.2.1 目录结构

在一个项目中,App模块有且只能有一个,以xBank项目中的xbank-app模块为例,目录结构如下所示:

image.png

xbank-app模块目录中包括:

  • Biz-SIP配置目录:config,一般建议把Biz-SIP的config配置目录统一放在项目的app模块目录中,以方便统一管理。
  • 启动类:XbankAppApplication.java
  • App服务类:PersonalAppService.java等
  • 应用配置文件:application.yml、application-*.yml
  • 模块Maven配置文件:pom.xml

3.2.2 pom.xml

pom.xml文件中应添加app-spring-boot-starter依赖(1.0.0.Beta8版本之前是依赖integrator-spring-boot-starter),以及领域层sink模块相关联的client接口相关包:

        <dependency>
            <groupId>com.bizmda.bizsip</groupId>
            <artifactId>app-spring-boot-starter</artifactId>
        </dependency>

3.2.3 application.yml

app模块中的application.yml中的主要相关参数:

配置项 配置说明
bizsip.config-path Biz-SIP中间件配置文件根目录位置
bizsip.rabbitmq-log success:会把交易日志(成功)发送给rabbitMQ
suspend:会把交易日志(成功、挂起)发送给rabbitMQ
fail:会把交易日志(成功、挂起、失败)发送给rabbitMQ
交易日志的rabbitMQ相关发送参数:
- exchange:exchange.direct.bizsip.log
- RoutingKey:queue.log.routing.key
bizsip.rest.* 调用Sink服务的相关超时时间配置
server.port App服务的微服务端口,建议用8888端口,避免和其它端口相冲突
spring.cloud.nacos.discovery.server-addr Nacos服务端口
spring.datasource.* 数据库连接配置(用于服务脚本中db对象)
spring.redis.* redis连接配置(用于服务脚本中redis对象)
spring.rabbitmq.* RabbitMQ配置(用于事务管理器)

例子:

server:
  port: 8888

spring:
  application:
    name: bizsip-integrator

  cloud:
    nacos:
      discovery:
        server-addr: bizsip-nacos:8848
#  以下配置在Istio部署中打开,以不采用NACOS注册中心,而采用etcd注册机制
#  cloud:
#    service-registry:
#      auto-registration:
#        enabled: false  #禁用注册中心服务

  datasource:
    url: jdbc:mysql://bizsip-mysql/sip
    username: root
    password: root
    driver-class-name: com.mysql.jdbc.Driver

  redis:
    redisson:
      enable: true
    host: bizsip-redis
    port: 6379
    timeout: 6000
    database: 0
    lettuce:
      pool:
        max-active: 10 # 连接池最大连接数(使用负值表示没有限制),如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)
        max-idle: 8   # 连接池中的最大空闲连接 ,默认值也是8
        max-wait: 100 # # 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException
        min-idle: 2    # 连接池中的最小空闲连接 ,默认值也是0
      shutdown-timeout: 100ms

  rabbitmq:
    virtual-host: /
    host: bizsip-rabbitmq
    port: 5672
    username: springcloud
    password: springcloud
    listener:
      simple:
        concurrency: 5
        max-concurrency: 15
        prefetch: 10

bizsip:
  config-path: /var/bizsip/config
	rabbitmq-log: fail
  rest:
    connection:
      # 指从连接池获取连接的timeout
      connection-request-timeout: 3000
      # 指客户端和服务器建立连接的timeout,
      # 就是http请求的三个阶段,一:建立连接;二:数据传送;三,断开连接。
      # 超时后会ConnectionTimeOutException
      connect-timeout: 3000
      # 指客户端从服务器读取数据的timeout,超出后会抛出SocketTimeOutException
      read-timeout: 3000
      
logging:
  level:
    com.bizmda.bizsip: debug

3.2.4 Application启动类

App模块的Application启动类中,要通过“@Import”注册SpringUtil类,以及通过“@CompontScan”来扫描AppController和Sink服务类装配到Spring容器中:

@SpringBootApplication
@ComponentScan(basePackages={"com.sample.app","com.bizmda.bizsip.app","cn.hutool.extra.spring"})
@Import(cn.hutool.extra.spring.SpringUtil.class)
public class BizSipAppApplication {
    public static void main(String[] args) {
        SpringApplication.run(BizSipAppApplication.class, args);
    }
}

3.3 配置

3.3.1 app.yml

app.yml定义了所有非脚本类App服务的配置参数:

配置项 配置说明
[].app-service-id 应用服务ID,要全局唯一
[].type 应用服务执行调用方式,目前支持“app-bean-service”、“bean-service”、“sink-service”三种方式:

3.3.1.1 app-bean-service

该类型执行继承BizServiceInterface接口的SpringBean,并调用约定的doBizService()方法,配置参数如下:

配置项 配置说明
[].class-name 要执行的SpringBean类名(从AppBeanInterface接口继承)

app.yml配置例子如下:

- app-service-id: /sample/service4
  type: app-bean-service
  class-name: com.bizmda.bizsip.sample.app.IntegratorService4

class-name是服务绑定的类,这个类需要从AppBeanInterface接口继承,接口如下:

public interface AppBeanInterface {
    /**
     * 执行聚合服务
     * @param message 传入的消息
     * @return 返回的消息
     */
    public abstract JSONObject process(JSONObject message) throws BizException;
}

3.3.1.2 bean-service

该类型执行约定的SpringBean,调用的方法和参数通过消息传递过来,方法名为消息体JSON数据中的“methodName”域,参数为消息体JSON数据中的“params”域(应为JSON数组),配置参数如下:

配置项 配置说明
[].class-name 要执行的SpringBean类名,方法由开发者自由定义。

app.yml配置例子如下:

- app-service-id: /springbean
  type: bean-service
  class-name: com.bizmda.bizsip.sample.app.SinkClientInterface1Impl

class-name是服务绑定的类,这个类的接口需要由开发者约定Interface并实现。

3.3.1.3 sink-service

sink-service是直接把后端的sink服务,通过此接口通过聚合服务暴露出来。

配置项 配置说明
[].sink-id 该聚合服务所对应的sink id,此聚合服务的数据会直接透传给sink id所对应的Sink后端。

app.yml配置例子如下:

- app-service-id: /sinkService
  type: sink-service
  sink-id: sink1

3.3.2 service目录

service目录位置配置文件根目录下的/service目录,目录下存放了聚合服务脚本:

config
|____sink.yml
|____source.yml
|____converter
|____service
| |____client1
| | |____error.script
| |____openapi
| | |____sample1.script
| | |____sample5.script
| | |____sample2.script
| | |____sample4.script

聚合服务ID为“目录+文件前缀”的方案,例如“service/openapi/sample1.script”文件,对应的聚合服务ID为“/openapi/sample1”,文件的后续名表示是支持不同的服务聚合器类型,目前支持script服务聚合器(.script)。
Script服务整合器是构建在magic-script脚本解析器基础上的服务编排引擎,通过解释执行service目录下的
.script文件,实现内外部服务整合和编排运行。
service目录下所有的“*.script”文件,都是Script服务整合器脚本。脚本语言采用magic-script(https://gitee.com/ssssssss-team/magic-script),这是一款基于JVM的脚本语言。
脚本中涉及的相关对象详细介绍,请参见附录1。

3.3.3 check-rule目录

Biz-SIP中间件支持App服务消息域校验规则和服务阻断校验规则,App服务消息域校验规则能对上送App服务中域的域值进行合法性检验,对于不符合检验规则的消息,会直接在入口处拒绝。服务阻断检验规则能对App服务上送数据进行检查,对不符合规则的App服务,也会在入口处拒绝。
App服务检验规则,是配置在config/check-rule目录下,配置文件的目录和文件名和对应的服务脚本位置应一致,配置文件后缀为“.yml”,配置相关参数:

配置项 配置说明
field-check-rules 定义域校验规则
field-check-rules.[].field 域的JSON取值Path,例如email、account[1].balance
field-check-rules.[].rule 域校验规则,具体参见域校验规则一节
field-check-rules.[].args[] 域检验规则的参数,具体参见域检验规则一节
field-check-rules.[].message 检验出错后,显示给调用方的错误消息。message可以嵌入域值,例如“不是邮箱地址:{}”。
field-check-mode 域检验模式,主要有“one”和“all”这二种模式,“one”表示有一个域不满足,即返回错误,返回的错误只包括第一个不满足的原因信息;“all”表示返回错误会包括所有不满足的原因信息。
service-check-rules 定义服务校验规则
service-check-rules.[].script 服务检验脚本,检验出错后,返回错误信息,没有出错直接return即可。
service-check-mode 服务校验模式,主要有“one”和“all”这二种模式,“one”表示有一个域不满足,即返回错误,返回的错误只包括第一个不满足的原因信息;“all”表示返回错误会包括所有不满足的原因信息。

具体案例如下:
config/check-rule/openapi/sample1.yml

field-check-rules:
  - field: email
    rule: isEmail
    message: '不是邮箱地址:{}'
  - field: sex
    rule: notEmpty
    message: '不能为空'
  - field: mobile
    rule: isMatchRegex
    args:
      - '^[1][3,4,5,6,7,8,9][0-9]{9}$'
    message: '不是手机号{}'
field-check-mode: one
service-check-rules:
  - script: if(data.sex == '1')
            {return '性别不符合!';}
    message: '额度超限'
service-check-mode: one

域检验规则:

域校验规则 说明 参数
isCitizenId 验证是否为身份证号码(支持18位、15位和港澳台的10位) (无)
isEmail 验证是否为可用邮箱地址 (无)
notEmpty 验证是否为空 (无)
isMatchRegex 通过正则表达式验证 参数1:正则表达式
isMoney 验证是否为金额,单位元2位小数
isCent 验证是否为金额,单位分支持
isUrl 验证是否为URL
isNumber 验证该字符串是否是数字
isMobile 验证是否为手机号码(中国)
isBetween 验证给定的数字是否在指定范围内 参数1:下限,参数2:上限
checkTimeFormatter 验证时间格式
isLengthBetween 验证给定的字符传长度是否在指定范围内 参数1:最小长度,参数2:最大长度

3.4 开发

3.4.1 二种App服务类

聚合应用服务和延迟服务,有二种服务类型:bean-service和app-bean-service。

3.4.1.1 基于接口类的bean-service

bean-service:基于Java接口类调用的服务类型,绑定的服务类是基于预先约定好的Java接口。
bean-service在App层服务配置文件(app.yml)中,如下例所示:

app.yml:
- app-service-id: app/sample-bean-service
  type: bean-service
  class-name: com.sample.app.service.SampleBeanService

具体的bean-service代码,要求是一个Spring Service容器类,实现一个约定的App层服务接口,如下例所示:

@Service
public class SampleBeanService implements SampleBeanServiceInterface {
    private BizMessageInterface sampleSinkBeankSinkInterface = AppClientFactory
            .getSinkClient(BizMessageInterface.class,"sample-sink-bean-sink");
    private HelloInterface helloInterface = AppClientFactory
            .getSinkClient(HelloInterface.class,"sample-bean-sink");
    @Override
    public String callSampleSinkBeanSink(String message) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.set("message",message);
        BizMessage<JSONObject> bizMessage;
        try {
            bizMessage = sampleSinkBeankSinkInterface.call(jsonObject);
        } catch (BizException e) {
            e.printStackTrace();
            return null;
        }
        return (String)bizMessage.getData().get("message");
    }

    @Override
    public String callSampleBeanSink(String message) {
        return this.helloInterface.hello(message);
    }
}

3.4.1.2 基于平台标准消息接口的app-bean-service

app-bean-service:基于平台标准JSON接口调用的服务类型,绑定的服务类是继承AppBeanInterface接口。
app-bean-service在App层服务配置文件(app.yml)中,如下例所示:

app.yml:
- app-service-id: app/sample-app-bean-service
  type: app-bean-service
  class-name: com.sample.app.service.SampleAppBeanService

具体的app-bean-service代码,要求是一个Spring Service容器类,实现AppBeanInterface接口,如下例所示:

@Service
public class SampleAppBeanService implements AppBeanInterface {
    private HelloInterface helloInterface = AppClientFactory
            .getSinkClient(HelloInterface.class,"sample-bean-sink");

    @Override
    public JSONObject process(JSONObject jsonObject) throws BizException {
        String message = (String)jsonObject.get("message");
        jsonObject.set("message","sample-app-bean-service: Hello,"+message+";"
                + this.helloInterface.hello(message));
        return jsonObject;
    }
}

3.4.2 调用Sink服务

Sink接口,是App模块调用所有Sink模块的接口,规范如下:

Content-Type application/json
请求包 Biz-SIP内部标准消息(参见“Biz-SIP内部标准消息规范”)
响应包 Biz-SIP内部标准消息(参见“Biz-SIP内部标准消息规范”)

在App服务类中,是统一采用“AppClientFactory.getSinkClient(Class tClass,String sinkId)”来调用Sink服务(tClass必须是接口类),有二种调用方式:

  • 接口类调用:由app层定义接口调用的Interface接口类,由渠道接入模块所引用。如下例所示:
@Service
public class PersonalAppService implements PersonalAppInterface {
    private AccountSinkInterface accountSinkInterface = AppClientFactory
            .getSinkClient(AccountSinkInterface.class,"account-sink");
    private CustomerSinkInterface customerSinkInterface = AppClientFactory
            .getSinkClient(CustomerSinkInterface.class,"customer-sink");
	...
        
    @Override
    public CustomerAndAccountList getCustomerAndAccountList(String customerId) {
        Customer customer = this.customerSinkInterface.getCustomer(customerId);
        List<Account> accountList = this.accountSinkInterface.getAccountListByCustomerId(customerId);
        CustomerAndAccountList customerAndAccountList = new CustomerAndAccountList();
        customerAndAccountList.setCustomer(customer);
        customerAndAccountList.setAccountList(accountList);
        return customerAndAccountList;
    }
	...
}
  • 平台标准接口调用:采用平台通用JSONObject类型,Interface接口类统一采用BizMessageInterface接口类,并统一用call()来调用app层聚合服务。如下例所示:
@Service
public class PersonalAppService implements PersonalAppInterface {
	...
    private BizMessageInterface payment1SinkInterface = AppClientFactory
            .getSinkClient(BizMessageInterface.class,"payment1-sink");
    private BizMessageInterface payment2SinkInterface = AppClientFactory
            .getSinkClient(BizMessageInterface.class,"payment2-sink");
	...
    @Override
    public BizMessage<JSONObject> send2Payment1(Object message) throws BizException {
        JSONObject jsonObject = new JSONObject();
        jsonObject.set("message",message);
        return this.payment1SinkInterface.call(jsonObject);
    }

    @Override
    public BizMessage send2Payment2(String tranMode, String tranCode, Object message) throws BizException {
        JSONObject jsonObject = new JSONObject();
        jsonObject.set("tranCode",tranCode);
        jsonObject.set("tranMode",tranMode);
        jsonObject.set("message",message);
        return this.payment2SinkInterface.call(jsonObject);
    }
    ...
}

3.5 App服务实施指南

App服务涉及到配置和编码,同时会支持app-bean-service、bean-service、sink-service等多种类型的App服务,在进行App服务实施落地时,可以参考以下指南:

3.6 延迟服务

分布式事务是分布式系统架构设计中的一个技术难点,特别是在这几年越来越火的微服务架构中,服务拆分所带来的跨服务数据一致性问题亟待解决,而在金融系统中,这个问题更是备受关注。
在Biz-SIP中间件中,是利用基于队列的延迟服务,来提供对分布式事务的支持,主要场景有:
1. 重试通知:通知对方系统,直到对方给出明确的回复响应。
2. 向前补偿:调用第三方服务后,超时没有响应,系统会后续发起查询上笔交易状态的查询交易,根据查询交易状态来决定继续完成后续服务步骤还是对以前服务步骤发起补偿(冲正)操作。
3. 向后补偿:调用第三方服务后,超时没有响应,系统会立即发起针对此交易的补偿交易,补偿成功后,会对以前服务步骤依次发起补偿(冲正)操作;如果补偿失败,会置交易失败,由人工介入处理。

3.6.1 运行机制

延迟服务处理运行流程图,如下所示:

延迟服务可以用于消息多次重试通知、调用异常后的向前补偿、向后补偿,一般会在调用Sink服务异常后,由App服务触发的另一个App服务(一般称为App延迟服务),App延迟服务会重试发送、查询交易状态或者直接进行交易冲正补偿,失败后可以重试App延迟服务多次,直到最终调用成功。
基于队列的延迟服务的内部处理机制,如下图所示:
image.png

  1. 适配层调用App模块后,会执行指定的App服务;
  2. 在Java编写的聚合服务中,会首先约定一个聚合服务调用的接口xxxDelayInterface,会声明接口类型、调用延迟间隔时间和调用次数;
  3. 在Java编写的服务中,可以通过前面声明的xxxDelayInterface接口,发起延迟服务的调用;
  4. 发起延迟消息后,在平台内部实现机制上,是通过发送RabbitMQ延迟消息的方式来实现的;
  5. 事务管理器在延迟服务接口约定的延迟时间后,会收到RabbitMQ消息,调起聚合服务(子服务)的处理;
  6. 如果执行成功,则延迟服务顺利完成,父服务和子服务都标记为成功;
  7. 如果在执行中抛出BizResultEnum.RETRY_DELAY_APP_SERVICE枚举(App服务重试)的BizException异常,则会结束当前延迟服务,等待延迟服务的再次唤起;
  8. 如果在执行中抛出其它BizException异常,则会结束当前延迟服务,标记父服务和子服务为失败,等待人工后续处理;
  9. 延迟服务抛出App服务重试异常超过最大重试次数,也会会结束当前延迟服务,标记父服务和子服务为失败,等待人工后续处理。

3.6.2 调用延迟服务

延迟服务也是位于App层的App服务,延迟服务只能由App层的App服务来进行嵌套调用,不能从适配层直接调用延迟服务。
App调用延迟服务,统一采用“AppClientFactory.getDelayAppServiceClient(Class tClass, String bizServiceId, int… delayMilliseconds)”来调用(tClass必须是接口类),同样有自定义接口调用和平台标准接口调用二种方式,其中平台标准接口调用是采用BizMessageInterface接口类,并统一用call()来调用app层聚合服务,如下例所示:

@Service
public class PersonalAppService implements PersonalAppInterface {
    private AccountSinkInterface accountSinkInterface = AppClientFactory
            .getSinkClient(AccountSinkInterface.class,"account-sink");
    private CustomerSinkInterface customerSinkInterface = AppClientFactory
            .getSinkClient(CustomerSinkInterface.class,"customer-sink");
    private BizMessageInterface payment1SinkInterface = AppClientFactory
            .getSinkClient(BizMessageInterface.class,"payment1-sink");
    private BizMessageInterface payment2SinkInterface = AppClientFactory
            .getSinkClient(BizMessageInterface.class,"payment2-sink");
    private PersonalAppInterface personalAppDelayInterface = AppClientFactory
            .getDelayAppServiceClient(PersonalAppInterface.class,"app/personal",
                    0,1000,2000,4000,8000,16000,32000);
	...
    @Override
    public void payoutForward(String tranMode,String accountId, long amount) throws BizException {
        log.info("account出金:{},{}",accountId,amount);
        this.accountSinkInterface.payout(accountId,amount);
        JSONObject jsonObject = new JSONObject();
        jsonObject.set("tranCode","pay");
        jsonObject.set("tranMode",tranMode);
        jsonObject.set("accountId",accountId);
        jsonObject.set("tranAmount",amount);
        BizMessage<JSONObject> bizMessage = null;
        try {
            log.info("payment缴费...");
            bizMessage = this.payment2SinkInterface.call(jsonObject);
        } catch (BizException e) {
            if (e.isTimeOutException()) {
                log.info("payment交易超时,开始payout补偿...");
                this.personalAppDelayInterface.payoutForwardCompensate(jsonObject);
                return;
            }
            else {
                throw e;
            }
        }
        log.info("payment缴费成功!");
        log.info("payout成功!");
    }
}

3.6.3 延迟服务例子

下面以具体的向前补偿和向后补偿二个场景案例,来介绍如何来开发支持分布式事务的代码。
sampleCompensate服务是一个app服务,依次调用了sink53、sink51、sink54这三个sink服务,其中sink51服务是调用第三方系统的sink服务,sink53和sink54是纯内部交易的sink服务:

image.png

3.6.3.1 向前补偿

我们来看一下向前补偿的场景,向前补偿是在第3步sink51服务调用外部第三方系统时,发现消息超时没有返回后,app服务会根据约定的时延和重试次数发起向前补偿的延迟服务(第4步),首先,会发起查询上笔交易状态的查询交易(第5步),然后根据查询交易状态,来决定继续完成后续服务步骤(左边的第6步),还是对以前服务步骤发起补偿(冲正)操作(右边的第6步)。:

image.png

下面是app服务的相关代码:

public class SampleCompensate implements SampleCompensateInterface {
    // sink51为调用第三方外部系统的sink服务
    private Sink51Interface sink51Interface = IntegratorClientFactory
            .getSinkClient(Sink51Interface.class, "sink51");
    // sink53为内部交易处理的sink服务
    private Sink53Interface sink53Interface = IntegratorClientFactory
            .getSinkClient(Sink53Interface.class, "sink53");
    // sink54为内部交易处理的sink服务
    private Sink54Interface sink54Interface = IntegratorClientFactory
            .getSinkClient(Sink54Interface.class, "sink54");
    // sampleCompensate作为app服务,既能被source服务所调用,
    // 也能申明为延迟服务接口,被延迟服务所调用
    private SampleCompensateInterface delaySampleCompensateInterface
            = IntegratorClientFactory.getDelayBizServiceClient(
                    SampleCompensateInterface.class, "sampleCompensate",
                    1000, 2000, 4000, 8000, 16000, 32000);

    @Override
    public String tx01(String tranId,String data) throws BizException {
        // 调用sink53(内部交易处理)
        this.sink53Interface.tx01(tranId,data);
        try {
            // 调用sink51(调用第三方系统)
            this.sink51Interface.tx01(tranId, data);
        } catch (BizException e) {
            if (e.isRetryDelayAppService()) {
                // 超时异常后,调用延迟服务进行向前补偿
                this.delaySampleCompensateInterface.forwardCompensateTx01(tranId, data);
                return "tx01交易向前补偿中...";
            }
            throw e;
        }
        // 成功后调用sink54(内部交易处理)
        this.sink54Interface.tx01(tranId,data);
        return "tx01交易成功";
    }

    @Override
    public void forwardCompensateTx01(String tranId,String data) throws BizException {
        // 调用sink51的查询交易状态接口(调用第三方系统),超时直接抛出BizTimeOutException异常
        boolean successFlag = this.sink51Interface.queryTx01(tranId);
        if (successFlag) {
            // 返回原调用第三方系统交易成功,继续后续调用sink54
            this.sink54Interface.tx01(tranId,data);
            return;
        } else {
            // 返回原调用第三方系统交易失败,对上步sink53的交易进行冲正
            this.sink53Interface.compensateTx01(tranId,data);
            return;
        }
    }
    ......
}

首先,在app服务代码类中,申明了sink服务和延迟服务的调用接口:

    // sink51为调用第三方外部系统的sink服务
    private Sink51Interface sink51Interface = IntegratorClientFactory
            .getSinkClient(Sink51Interface.class, "sink51");
    // sink53为内部交易处理的sink服务
    private Sink53Interface sink53Interface = IntegratorClientFactory
            .getSinkClient(Sink53Interface.class, "sink53");
    // sink54为内部交易处理的sink服务
    private Sink54Interface sink54Interface = IntegratorClientFactory
            .getSinkClient(Sink54Interface.class, "sink54");
    // sampleCompensate作为app服务,既能被source服务所调用,
    // 也能申明为延迟服务接口,被延迟服务所调用
    private SampleCompensateInterface delaySampleCompensateInterface
            = IntegratorClientFactory.getDelayBizServiceClient(
                    SampleCompensateInterface.class, "sampleCompensate",
                    1000, 2000, 4000, 8000, 16000, 32000);

其中延迟服务接口delaySampleCompensateInterface在申明中,约定了如果调用超时,会最多调用6次,每次间隔分别是1秒、2秒、4秒、8秒、16秒、32秒。
其次,tx01()方法是提供给source层调用的app服务接口,是依次调用sink53、sink51、sink54的sink接口服务,其中在调用sink51超时(抛出BizException异常错误码为超时重试)后,会立即触发当前app服务封装的forwardCompensateTx01()延迟服务接口(此接口是严格按照前面申明的延迟服务接口delaySampleCompensateInterface调用延时和频次来调用的):

    @Override
    public String tx01(String tranId,String data) throws BizException {
        // 调用sink53(内部交易处理)
        this.sink53Interface.tx01(tranId,data);
        try {
            // 调用sink51(调用第三方系统)
            this.sink51Interface.tx01(tranId, data);
        } catch (BizException e) {
            if (e.isRetryDelayAppService()) {
                // 超时异常后,调用延迟服务进行向前补偿
                this.delaySampleCompensateInterface.forwardCompensateTx01(tranId, data);
                return "tx01交易向前补偿中...";
            }
            throw e;
        }
        // 成功后调用sink54(内部交易处理)
        this.sink54Interface.tx01(tranId,data);
        return "tx01交易成功";
    }

在超时异常后会调用延迟服务forwardCompensateTx01()接口,forwardCompensateTx01()也是封装在sampleCompensate的app服务代码类中,但是通过延迟服务调用的,具体逻辑如下:

    @Override
    public void forwardCompensateTx01(String tranId,String data) throws BizException {
        // 调用sink51的查询交易状态接口(调用第三方系统),超时直接抛出BizTimeOutException异常
        boolean successFlag = this.sink51Interface.queryTx01(tranId);
        if (successFlag) {
            // 返回原调用第三方系统交易成功,继续后续调用sink54
            this.sink54Interface.tx01(tranId,data);
            return;
        } else {
            // 返回原调用第三方系统交易失败,对上步sink53的交易进行冲正
            this.sink53Interface.compensateTx01(tranId,data);
            return;
        }
    }

在forwardCompensateTx01()方法中封装了在调用sink51服务超时异常后,所触发的补偿交易逻辑,会根据原tranId去sink51连接的第三方系统查询原超时交易的最终状态,成功后会继续调用sink54服务,失败后会对sink53触发补偿冲正服务。

3.6.3.2 向后补偿

我们再来看一下向后补偿的场景,向后补偿是在第3步sink51服务调用外部第三方系统时,发现消息超时没有返回后,app服务会根据约定的时延和重试次数发起向前补偿的延迟服务(第4步),首先,会向sink51发起冲正交易(第5步),如果冲正交易成功,则继续完成sink53的冲正交易(第6步),如果对sink51的冲正交易失败,则抛出异常,系统会告警并提示人工介入处理。:

image.png

下面是app服务的相关代码:

    @Override
    public String tx02(String tranId,String data) throws BizException {
        // 调用sink53(内部交易处理)
        this.sink53Interface.tx02(tranId, data);
        try {
            // 调用sink51(调用第三方系统)
            this.sink51Interface.tx02(tranId, data);
        } catch (BizException e) {
            if (e.isRetryDelayAppService()) {
                // 超时异常后,调用延迟服务进行向后补偿
                this.delaySampleCompensateInterface.backwardCompensateTx02(tranId, data);
                return "tx02交易向后补偿中...";
            }
            throw e;
        }
        // 成功后调用sink54(内部交易处理)
        this.sink54Interface.tx02(tranId, data);
        return "tx02交易成功";
    }

    @Override
    public void backwardCompensateTx02(String tranId,String data) throws BizException {
        // 调用sink51的冲正交易(调用第三方系统),并返回成功失败结果
        boolean successFlag = this.sink51Interface.compensateTx02(tranId, data);
        if (successFlag) {
            // 冲正交易成功,继续对上一步sink53进行冲正
            this.sink53Interface.compensateTx02(tranId, data);
            return;
        }
        else {
            // 冲正交易失败,抛出异常,提醒人工介入处理
            throw new BizException(BizResultEnum.OTHER_ERROR
                    ,"this.sink51Interface.compensateTx02()补偿失败");
        }
    }

tx02()方法是提供给source层调用的app服务接口,是依次调用sink53、sink51、sink54的sink接口服务,其中在调用sink51超时(抛出BizException异常错误码为超时重试)后,会立即触发当前app服务封装的backwardCompensateTx02()延迟服务接口(此接口是严格按照前面申明的延迟服务接口delaySampleCompensateInterface调用延时和频次来调用的)。
backwardCompensateTx02()也是封装在sampleCompensate的app服务代码类中,封装了在调用sink51服务超时异常后,所触发的补偿交易逻辑,会通过sink51向连接的第三方系统发起冲正交易,成功后会继续对sink53触发补偿冲正服务,而失败后会抛出BizException异常,此异常可以通过交易日志捕捉并触发告警。

4 Sink层

4.1 Sink模块介绍

Sink服务功能主要有二种:

  • 一种是对接第三方外部应用;
  • 另一种是内部的交易处理模块。

Sink服务的调用类型也主要是二种:

  • rest:采用微服务RESTful接口,是采用同步调用的方式;
  • rabbitmq:采用RabbitMQ中间件异步调用的方式。

Sink服务的功能,特别是对接第三方外部应用的Sink服务,主要是二个:

  • 通讯协议的对接:Sink Connect负责和调用系统的通讯对接,支持微服务、消息中间件、TCP长短连接等多种接入方式;
  • 消息格式的转换:Converter负责调用服务消息的格式转换,支持XML、JSON、定长、有分隔符、8583等报文。

Sink服务的处理方式主要有三种:

  • default:采用缺省的、无代码开发的的处理方式;
  • sink-bean:通过编写继承平台约定sink-bean接口(SinkBeanInterface)的Java类来进行处理;
  • bean:通过编写自定义接口的Java类来进行处理。

4.2 Sink模块代码结构

4.2.1 目录结构

以xBank项目中的xbank-account-sink模块为例,这个Sink模块是模拟账户系统服务,被App层所调用,具体代码目录结构如下所示:

image.png

可以看到,xbank-account-sink模块目录中包括:

  • 启动类:AccountSinkApplication.java
  • Sink服务类:AccountSinkService.java
  • 应用配置文件:application.yml、application-*.yml
  • 模块Maven配置文件:pom.xml

4.2.2 pom.xml

在pom.xml文件中添加sink-spring-boot-starter依赖:

        <dependency>
            <groupId>com.bizmda.bizsip</groupId>
            <artifactId>sink-spring-boot-starter</artifactId>
        </dependency>

4.2.3 application.yml

Sink模块中的application.yml中的主要参数:

配置项 配置说明
bizsip.config-path Biz-SIP中间件配置文件根目录位置
bizsip.sink-id(可选) sink-id可以配置多个以“,”号分隔的id,应用会自动加载根据sink.yml中对应sink id的sink服务(包括rest同步sink服务调用、rabbitmq异步sink服务调用)
bizsip.rabbitmq-log success:会把交易日志(成功)发送给rabbitMQ
suspend:会把交易日志(成功、挂起)发送给rabbitMQ
fail:会把交易日志(成功、挂起、失败)发送给rabbitMQ
交易日志的rabbitMQ相关发送参数:
- exchange:exchange.direct.bizsip.log
- RoutingKey:queue.log.routing.key

如下例:

server:
  port: 8001
spring:
  application:
    name: customer-sink

  cloud:
    nacos:
      discovery:
        server-addr: bizsip-nacos:8848

  datasource:
    url: jdbc:mysql://bizsip-mysql/xbank?autoReconnect=true
    username: root
    password: bizsip123456
    driver-class-name: com.mysql.cj.jdbc.Driver

bizsip:
  config-path: /var/bizsip/config
  sink-id: sink1,sink2,sink3,sink4,sink5,sink6,sink7,sink9,sink10,sink11,sink12,sink13,sink14,netty,sink15,sink22,sink23,sink24,sink25
  rabbitmq-log: success

logging:
  level:
    com.bizmda.bizsip: debug
    com.xbank: trace

4.2.4 Application启动类

Sink模块的Application启动类中,要通过“@Import”注册SpringUtil类,以及通过“@CompontScan”来扫描Sink服务类装配到Spring容器中:

@SpringBootApplication
@ComponentScan(basePackages={"cn.hutool.extra.spring", "com.sample.sink.samplebean"})
@Import(cn.hutool.extra.spring.SpringUtil.class)
public class SampleBeanSinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(SampleBeanSinkApplication.class, args);
    }
}

4.3 配置(sink.yml)

sink.yml定义了所有接入Sink端口的配置参数:

配置项 配置说明
[].id Sink的ID,要全局唯一
[].type Sink调用方式,目前支持rest、rabbitmq:
- rest为RESTful同步调用方式
- rabbitmq:为基于RabbitMQ消息中间件的异步调用方式
[].processor 支持3种Sink处理方式:
- default:为缺省的Sink服务处理方式,会根据配置的converter和connector进行处理;
- sink-bean:为调用sink-bean类处理方式,sink-bean是基于SinkBeanInterface统一接口的类;
- bean:为调用基于开发者自定义接口实现的类。
[].url [条件:type为rest]:服务端适配器调用方式为RESTful方式时,该属性约定RESTful调用的url地址
[].exchange [条件:type为rabbitmq]:服务端适配器调用方式为rabbitmq方式时,该属性约定异步调用时,投递到RabbitMQ中间件的exchange。
缺省值为“exchange.dircect.bizsip.sink”
[].routing-key [条件:type为rabbitmq]:服务端适配器调用方式为rabbitmq方式时,该属性约定异步调用时,投递到RabbitMQ中间件的routing key的值。
缺省值为对应sink id的值。
[].queue [条件:type为rabbitmq]:
服务端适配器调用方式为rabbitmq方式时,该属性约定异步调用时,投递到RabbitMQ中间件的queue的值。
缺省值为“queue.bizsip.sink.{sink id}”值。
[].converter 参见:消息处理器配置
[].connector 服务端协议处理器相关配置
[].connector.type 约定服务端协议处理器的类型(具体参见服务端协议处理器配置)
[].connector.* 约定服务端协议处理器的其它参数(具体参见服务端协议处理器配置)

sink.yml例子:

- id: sink1
  type: rest
  processor: sink-bean
  url: http://bizsip-sample-sink/sink1
  class-name: com.bizmda.bizsip.sample.sink.controller.CrmServer

- id: sink3
  type: rest
  url: http://bizsip-sample-sink/sink3
  converter:
    type: simple-xml
  connector:
    type: service
    class-name: com.bizmda.bizsip.sample.sink.controller.EchoService

- id: sink4
  type: rest
  url: http://bizsip-sample-sink/sink4
  converter:
    type: velocity-json
    pack-rules:
      - predicate: '#{#data[sex]=="0"}'
        rule: sink4/woman.vm
      - predicate: '#{#data[sex]=="1"}'
        rule: sink4/man.vm
      - predicate:
        rule: sink4/error.vm
    unpack-rules:
  connector:
    type: service
    class-name: com.bizmda.bizsip.sample.sink.controller.EchoService

- id: netty
  type: rest
  url: http://bizsip-sample-sink/netty
  converter:
    type: simple-json
  connector:
    type: netty
    host: 127.0.0.1
    port: 10001

- id: sink9
  type: rest
  url: http://bizsip-sample-sink/sink9
  converter:
    type: simple-json
  connector:
    type: rabbitmq
    rpc-mode: true
    exchange: exchange.dircect.bizsip.sink
    routing-key: key.bizsip.sink9

- id: sink10
  type: rest
  url: http://bizsip-sample-sink/sink10
  converter:
    type: simple-json
  connector:
    type: service
    class-name: com.bizmda.bizsip.sample.sink.controller.EchoService

- id: sink12
  type: rest
  processor: bean
  url: http://bizsip-sample-sink/sink12

- id: sink15
  type: rabbitmq
  converter:
    type: simple-json
  connector:
    type: service
    class-name: com.bizmda.bizsip.sample.sink.controller.EchoService

4.4 Sink调用方式(type)

在sink.yml中,每个sink都需要定义type属性,约定针对此sink服务的调用方式,type属性包括:

  • rest:通过RESTful同步调用Sink服务;
  • rabbitmq:通过RabbitMQ异步调用Sink服务。

4.4.1 rest(同步调用Sink服务)

Sink模块是通过RESTful接口来接收app层发来的同步服务请求,主要有二种方式:

  • 采用Sink层内置的RESTful同步接口
  • 采用个性化开发接口

4.4.1.1 内置的RESTful同步接口

直接在application.yml中配置bizsip.sink-id属性,在application启动时扫描sink.yml中配置的sink,对于type属性为“rest”的sink,会根据sink.yml中对应sink的url属性自动接入path点。
application.yml中应配置bizsip.sink-id属性:

bizsip:
  sink-id: sink1,sink2,sink3,sink4,sink5,sink6

Sink层rest服务处理的运行流程如下:

4.4.1.2 个性化开发Sink模块

可以直接写RestController,以获得更灵活的定制:

个性化接口模块代码,如下例所示:

@RestController
public class Sink2Controller {
    private Converter converter = Converter.getSinkConverter("sink2");
    private Connector connector = Connector.getSinkConnector("sink2");

    @PostMapping(value = "/sink2", consumes = "application/json", produces = "application/json")
    public BizMessage<JSONObject> doService(@RequestBody BizMessage<JSONObject> inMessage, HttpServletResponse response) {
        log.debug("inMessage:{}", inMessage);
        try {
            byte[] packedMessage = this.converter.pack(inMessage.getData());
            byte[] returnMessage = this.connector.process(packedMessage);
            JSONObject jsonObject = this.converter.unpack(returnMessage);
            return BizMessage.buildSuccessMessage(inMessage,jsonObject);
        } catch (BizException e) {
            log.error("服务端适配器执行出错",e);
            return BizMessage.buildFailMessage(inMessage,e);
        }
    }
}

从上面的例子可以看出,在开发的RestController中,可以灵活地进行消息解包、调用配置的connector,最后消息打包返回。

4.4.2 rabbitmq(异步调用Sink服务)

type属性为rabbitmq的Sink模块,是通过RabbitMQ消息队列,来接收app层发来的异步服务请求,主要有二种方式:

  • 采用Sink层内置的RabbitMQ异步接口
  • 采用个性化开发接口

4.4.2.1 内置的RabbitMQ异步接口

直接在application.yml中配置bizsip.sink-id属性,在application启动时扫描sink.yml中配置的sink,对于rabbbitmq型的sink,会根据sink.yml中对应sink的exchange、routing-key、queue来约定Rabbitmq消息传递的exchange、routing key,以及绑定的接收队列:

  • exchange缺省为exchange.dircect.bizsip.sink,
  • routing key缺省为对应的sink id,
  • 绑定queue缺省为queue.bizsip.sink.{sink id})。

运行流程如下:

4.4.2.2 个性化开发Sink模块

可以自己开发RabbitmqListener,以获得更灵活的定制:

具体写法可以参考SinkRabbitmqListener源码:

@Service
@ConditionalOnProperty(name = "bizsip.rabbitmq.queue",matchIfMissing = false)
public class SinkRabbitmqListener {
    @Value("${bizsip.sink-id}")
    private String sinkId;
    @Value("${bizsip.rabbitmq.queue}")
    private String queue;
    @Value("${bizsip.rabbitmq.exchange}")
    private String exchange;
    @Value("${bizsip.rabbitmq.routing-key}")
    private String routingKey;

    @Value("${bizsip.rabbitmq-log:false}")
    private boolean rabbitmqLog;

    private Converter converter = null;
    private Connector connector = null;

    @Autowired
    private RabbitTemplate rabbitTemplate;
    private Jackson2JsonMessageConverter jackson2JsonMessageConverter =new Jackson2JsonMessageConverter();

    @PostConstruct
    public void init() {
        if (this.exchange == null) {
            log.error("配置文件中bizsip.rabbitmq.exchange没有配置,SinkRabbitmqListener初始化失败!");
        }
        if (this.queue == null) {
            log.error("配置文件中bizsip.rabbitmq.queue没有配置,SinkRabbitmqListener初始化失败!");
        }
        if (this.routingKey == null) {
            log.error("配置文件中bizsip.rabbitmq.routing-key没有配置,SinkRabbitmqListener初始化失败!");
        }
        if (this.sinkId == null) {
            log.error("配置文件中bizsip.sink-id没有配置,SinkRabbitmqListener初始化失败!");
            return;
        }
        this.converter = Converter.getSinkConverter(this.sinkId);
        this.connector = Connector.getSinkConnector(this.sinkId);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${bizsip.rabbitmq.queue}", durable = "true", autoDelete = "false"),
            exchange = @Exchange(value = "${bizsip.rabbitmq.exchange}", type = ExchangeTypes.DIRECT),
            key = "${bizsip.rabbitmq.routing-key}"))
    public void doService(Message message) {
        BizMessage<JSONObject> inMessage;
        inMessage = (BizMessage) jackson2JsonMessageConverter.fromMessage(message);
        if (!(inMessage.getData() instanceof JSONObject)) {
            inMessage.setData(JSONUtil.parseObj(inMessage.getData()));
        }
        JSONObject jsonObject = inMessage.getData();
        try {
            jsonObject = this.process(jsonObject);
            this.sendSuccessLog(inMessage,BizMessage.buildSuccessMessage(inMessage,jsonObject));
        } catch (BizException e) {
            this.sendFailLog(inMessage,BizMessage.buildFailMessage(inMessage,e));
        }
    }

    ......
}

4.5 Sink处理器类型(processor)

sink.yml中可以约定每个sink的服务处理方式,这是通过sink的processor属性来设置的,processor包括以下属性值:

  • default:采用缺省的、无代码开发的的处理方式;
  • sink-bean:通过编写平台标准接口的Spring容器类(实现SinkBeanInterface接口)来进行处理;
  • bean:通过编写实现自定义接口的Spring容器类来进行处理。

4.5.1 default

在sink.yml中sink的processor设置为default时,sink服务是采用默认的缺省Sink服务流程来处理的,无需任何代码开发,调用的运行流程如下:

sink.yml配置类似如下例:

- id: netty
  type: rest
  url: http://bizsip-sample-sink/netty
  converter:
    type: simple-json
  connector:
    type: netty
    host: 127.0.0.1
    port: 10001

4.5.2 sink-bean

处理器类型为sink-bean和bean的处理流程类似,如下所示:

在sink.yml中sink的processor设置sink-bean时,sink服务是会调用实现sink-bean接口(SinkBeanInterface接口)的sink-bean服务类(要求为Spring容器类),具体类名是由sink.yml中当前sink的class-name来指定的。
在sink-bean服务类中,可以调用sink.yml中配置的connector和converter,如果sink-bean服务类是继承AbstractSinkService类,当前sink的connector和converter是会自动注入的。
如需要调用connector和converter,处理流程如下:
sink-bean服务类如下例所示:

@Service
public class Payment1SinkService extends AbstractSinkService implements SinkBeanInterface {
    @Override
    public JSONObject process(JSONObject jsonObject) throws BizException {
        log.info("传入消息:\n{}", BizUtils.buildJsonLog(jsonObject));
        byte[] inBytes = this.converter.pack(jsonObject);
        log.info("打包后消息:\n{}", BizUtils.buildHexLog(inBytes));
        JSONObject outJsonObject = this.converter.unpack(inBytes);
        log.info("解包后消息:\n{}", BizUtils.buildJsonLog(outJsonObject));
        return outJsonObject;
    }
}

如果sink-bean和bean服务类是继承AbstractSinkService类,当前sink的connector和converter是会自动注入的。
sink.yml配置如下:

- id: payment1-sink
  type: rest
  processor: sink-bean
  url: http://payment1-sink/sink
  class-name: com.xbank.sink.payment.service.Payment1SinkService
  converter:
    type: simple-xml

采用sink-bean服务,接口不用提前约定,扩展性强,但缺点也是很明显,调用双方缺乏接口约定的机制。

扩展

在交易类型超过5个时,建议采用Command Executor(命令处理程序),分成多个xxxCmdExe类来进行处理,这些类应统一继承AbstractSinkBeanCmdExe类,并统一在execute()中实现业务逻辑处理。

@Service
public class Payment2SinkService implements SinkBeanInterface {
    @Autowired
    private TimeoutCmdExe timeoutCmdExe;
    @Autowired
    private TimeoutAndFailCmdExe timeoutAndFailCmdExe;
    @Autowired
    private TimeoutAndSuccessCmdExe timeoutAndSuccessCmdExe;
    @Autowired
    private SuccessCmdExe successCmdExe;

    @Override
    public JSONObject process(JSONObject jsonObject) throws BizException {
        log.info("传入消息:\n{}", jsonObject.toString());
        AbstractSinkBeanCmdExe sinkBeanCmdExe;
        String tranMode = (String)jsonObject.get("tranMode");
        switch (tranMode) {
            case "timeout":
                // 收到交易后,永远返回超时
                return timeoutCmdExe.execute(jsonObject);
            case "3timeout-fail":
                // 收到交易后,前3次返回超时,第4次返回失败码
                return timeoutAndFailCmdExe.execute(jsonObject);
            case "3timeout-success":
                // 收到交易后,前3次返回超时,第4次成功返回原报文
                return timeoutAndSuccessCmdExe.execute(jsonObject);
            default:
                //其它情况,成功返回原报文
                return successCmdExe.execute(jsonObject);
        }
    }
}

Command Executor(命令处理程序)如下例:
@Service
public class SuccessCmdExe extends AbstractSinkBeanCmdExe {
    @Override
    public JSONObject execute(JSONObject jsonObject) throws BizException {
        String tranCode = (String)jsonObject.get("tranCode");
        log.info("交易:{},返回交易成功!",tranCode);
        return jsonObject;
    }
}

4.5.3 bean

在sink.yml中sink的processor设置为bean时,sink服务是会调用实现基于开发者自己约定接口实现的bean服务类(要求为Spring容器类),具体类名是由sink.yml中当前sink的class-name来指定的。
在bean服务类中,可以调用sink.yml中配置的connector和converter,如果bean服务类是继承AbstractSinkService类,当前sink的connector和converter是会自动注入的。
bean服务代码如下例所示:

@Service
public class SampleBeanService implements HelloInterface {
    @Override
    public String hello(String message) {
        return "sample-bean-sink: Hello," + message;
    }
}

bean服务接口明确清晰,调用方一目了然,但需要调用双方提前约定Interface接口类。
如果sink-bean和bean服务类是继承AbstractSinkService类,当前sink的connector和converter是会自动注入的。
sink.yml配置如下:

- id: sample-bean-sink
  type: rest
  url: http://sample-bean-sink/sink
  processor: bean
  class-name: com.sample.sink.samplebean.service.SampleBeanService

扩展

在交易类型超过5个时,建议采用Command Executor(命令处理程序),分成多个xxxCmdExe类来进行处理,这些类应统一继承AbstractBeanCmdExe类。

4.6 Converter和Sink Connector的使用

Converter和Sink Connector,Sink层可以在以下场景中使用:

  • Sink服务类:包括sink-bean服务类和bean服务类,这种类为Spring容器类,会直接被Sink层内置接口所调用。
  • 个性化的Sink模块:不采用Sink层内置的标准接口模块,开发者可以自己开发RestController类和RabbitmqListener类开进行自定义处理。

对Sink服务类,如果sink-bean和bean服务类是继承AbstractSinkService类,当前sink的converter是会自动注入的,可以直接在Sink服务类中进行使用。
而对于没有继承AbstractSinkService的Sink服务类,以及个性化的Sink模块,就需要通过代码来获取Converter和Sink Connector的调用句柄。
下例是开发者自己写的RestController,包含了获取Converter和Sink Connector句柄及后续调用的方法:

@RestController
public class Sink3Controller {
    private Converter converter = Converter.getSinkConverter("sink3");
    private Connector connector = Connector.getSinkConnector("sink3");

    @PostMapping(value = "/sink3", consumes = "application/json", produces = "application/json")
    public BizMessage<JSONObject> doService(@RequestBody BizMessage<JSONObject> inMessage, HttpServletResponse response) {
        log.debug("inMessage:{}", inMessage);
        try {
            // 消息打包
            byte[] packedMessage = this.converter.pack(inMessage.getData());
            // 调用connector处理
            byte[] returnMessage = this.connector.process(packedMessage);
            // 消息解包
            JSONObject jsonObject = this.converter.unpack(returnMessage);
            return BizMessage.buildSuccessMessage(inMessage,jsonObject);
        } catch (BizException e) {
            log.error("服务端适配器执行出错",e);
            return BizMessage.buildFailMessage(inMessage,e);
        }
    }
}

在上例中,converter会根据sink2的消息格式配置,调用Converter进行消息打包和解包,调用Connector进行外部连接调用。当然,应在sink.yml中配置指定sink的converter和connector:

- id: sink3
  type: rest
  url: http://bizsip-sample-sink/sink3
  converter:
    type: simple-xml
  connector:
    type: service
    class-name: com.bizmda.bizsip.sample.sink.controller.EchoService

4.7 Sink服务实施指南

Sink服务涉及到配置和编码,会支持default、bean、sink-bean等多种处理类型的Sink服务,并且支持同步和异步调用方式。
在进行Sink服务实施落地时,主要有以下二种实施方案:

  • 采用在Sink模块的application.yml中配置sink-id列表,在Sink模块应用启动时,会根据sink.yml配置,自动加载指定的Sink服务,实施过程如下:

  • 在Sink模块中自行开发RestController和RabbitmqListener类,分别实现同步和异步的调用接入,并根据sink.yml配置,加载配置Sink的格式转换器(converter)和通讯连接器(connector),从而实现个性化编码处理控制,实施过程如下:

5 Connector

5.1 Connector介绍

Connector(连接器)是支持外部应用接入和调用的,主要处理通讯协议的对接,主要分为二类:

  • Source Connector:主要用在Source层,作为外部应用的接入,会适配外部应用接入的通讯方式,接收到消息后,Source Connector会触发和调用App层的App服务。
  • Sink Connector:主要用在Sink层,作为对外部应用的调用,会适配外部应用调用的通讯方式,会主动调用外部应用。

5.2 Source Connector

5.2.1 基于Source Connector的Source模块架构

Source模块负责外部应用的接入,外部应用的Connector通讯接入模块,理论上是可复用的,Source Connector接入框架,是把通用的Source Connector和个性化的Source服务分开,如下所示:

image.png

基于Source Connector的Source模块框架中包括Source Connector和Source服务二块:

  • Source Connector:主要负责和外部应用的通讯连接,根据不同的通讯方式可以选择不同的通讯适配应用来对接,通讯适配应用后续会不断扩展,可以选择已有的Source通讯适配应用,也可以在此框架上,开发自己的Source通讯适配应用。
  • Source服务:是被Source通讯适配应用调用的Spring容器类,一般是实现SourceServiceInterface接口的doService()方法,在此方法中完成消息解包、消息打包、App服务调用等步骤。

相关运行流程如下图所示:

Source服务,应实现SourceServiceInterface接口,约定如下:

/**
 * Source服务接口
 */
public interface SourceServiceInterface {
    /**
     *
     * @param data 传入Source服务的数据
     * @return Source服务返回数据
     * @throws BizException
     */
    Object doService(Object data) throws BizException;
}

在可复用的Source模块中,可以通过@Autowired进行注入引用,从而实现通用的Source通讯适配应用和个性化的Source服务代码分离的目标:

@RestController
public class RestSourceController {
    @Autowired
    private SourceBeanInterface restSourceService;

所以,一般建议在基于Source Connector的Source模块应用中,应该有且只有一个Source服务,以避免多个Source服务冲突。

5.2.2 典型的Source Connector介绍

在source模块下,有Netty和RESTful接入的Source Connector模块,可以参考:

├── source      Source服务接入子模块
│   ├── netty-source		Netty接入Source子模块
│   ├── rest-source			Restful接入Source

5.2.2.1 rest-source模块(基于JSON格式POST请求的RESTful服务)

rest-source模块是提交JSON格式的POST请求,模块获取POST的JSON报文和HTTP请求头的信息,提交给Source服务进行处理。

5.2.2.1.1 目录结构

rest-source模块目录如下所示:

image.png

rest-source模块目录中包括:

  • 启动类:RestSourceApplication.java
  • rest-source模块相关类:com.bizmda.bizsip.source.rest.controller.*
  • Source服务类:RestSourceBean.java
  • 应用配置文件:application.yml、application-*.yml
  • 模块Maven配置文件:pom.xml

要启用rest-source模块,开发者可以把该模块拷贝到自己的项目中,重写Source服务类即可。

5.2.2.1.2 application.yml

直接采用Source模块的配置参数即可:

配置项 配置说明
bizsip.config-path Biz-SIP中间件配置文件根目录位置
bizsip.integrator-url App层服务的接口url地址
5.2.2.1.3 Source服务接口约定

doService()接口描述:

  • 参数:RestSourceDTO对象
public class RestSourceDTO {
    // HTTP请求头Map
    private Map<String,String> headerMap;
    // 上送的JSON报文
    private JSONObject jsonObjectData;
}
  • 返回:JSONObject

5.2.2.2 netty-source模块(基于Netty的同步短连接)

netty-source模块是基于Netty的TCP服务端,客户端通过同步短连接来接入。

5.2.2.2.1 目录结构

netty-source模块目录如下所示:

image.png

netty-source模块目录中包括:

  • 启动类:NettySourceApplication.java
  • netty-source模块其它相关类:com.bizmda.bizsip.source.netty.*
  • Source服务类:NettySourceBean.java
  • 应用配置文件:application.yml、application-*.yml
  • 模块Maven配置文件:pom.xml

要启用netty-source模块,开发者可以把该模块拷贝到自己的项目中,重写Source服务类即可。

5.2.2.2.2 application.yml

netty-source模块除了Source模块涉及的配置参数外,还需要配置以下参数:

netty:
  port: 10002

netty.port为TCP服务端的侦听端口。

5.2.2.2.3 Source服务接口约定

doService()接口描述:

  • 参数:byte[]
  • 返回:byte[]

5.2.3 Source Connector使用方法

如果要使用现成的Source Connector模块,可以按以下步骤操作:

  1. 开发者可以把要使用的Source Connector模块拷贝到自己的项目中,作为一个单独的模块;
  2. 重写Source服务类;
  3. 根据具体情况修改application.yml、application-*.yml配置参数;
  4. 启动Source Connector模块对应的启动类。

5.3 Sink Connector

5.3.1 运行机制

Sink Connector是处于Sink层,主要用于调用外部应用:

有多种现成的Sink Connector可供选择,也可以开发新的Sink Connector进行接入。
Sink Connector主要实现功能:

  • 初始化:根据sink.yml中的connector项下参数,进行Connector初始化;
  • 与外部应用进行通讯交互:App层的App服务调用Sink服务后,会通过Sink接口模块调用Sink Connector类的方法,一般是调用出入参数都为byte[]的process()方法。

5.3.2 典型Sink Connector介绍

5.3.2.1 Netty Connector

通过Netty实现TCP短连接调用。
相关配置参数:

配置项 配置说明
[].connector.type 约定为“netty”
[].connector.host 约定调用的TCP主机地址
[].connector.port 约定调用的TCP端口地址

例子:

- id: netty-server
  type: rest
  url: http://sip-sample-server/netty
  converter:
    type: simple-json
  connector:
    type: netty
    host: 127.0.0.1
    port: 10002

5.3.2.2 RabbitMQ Connector

通过RabbitMQ进行RPC同步调用。
相关配置参数:

配置项 配置说明
[].connector.type 约定为“rabbitmq”
[].connector.exchange 投递消息的exchange目标
[].connector.routing-key 投递消息时的Routing Key

例子:

- id: sink9
  type: rest
  url: http://bizsip-sample-sink/sink9
  converter:
    type: simple-json
  connector:
    type: rabbitmq
    exchange: exchange.dircect.bizsip.sink
    routing-key: key.bizsip.sink9

5.3.2.3 Spring Service Connector

调用Spring Service类,该类应该实现ByteProcessInterface接口。
相关配置参数:

配置项 配置说明
[].connector.type 约定为“service”
[].connector.class-name Spring Service的实现类

例子:

- id: sink3
  type: rest
  url: http://bizsip-sample-sink/sink3
  converter:
    type: simple-xml
  connector:
    type: service
    class-name: com.bizmda.bizsip.sample.sink.controller.EchoService

Spring Service类要求必须实现ByteProcessInterface接口:

public interface ByteProcessInterface {
    /**
     * service类型Connector类,要实现的基于字节流byte[]作为出入参数的接口
     * @param inMessage
     * @return
     * @throws BizException
     */
    byte[] process(byte[] inMessage) throws BizException;
}

5.3.2.4 http-post Connector

通过HTTP方式进行数据内容的提供和返回,提交方式约定统一采用POST方式。
相关配置参数:

配置项 配置说明
[].connector.type 约定为“http-post”
[].connector.url HTTP连接所对应的URL地址
[].connector.headers 指定提交请求的HTTP headers
[].connector.charset 指定发送和接收数据的码制,缺省为“utf-8”

例子:

- id: http-connector-sink
  type: rest
  url: http://bizsip-sample-sink/http-connector-sink
  converter:
    type: simple-json
  connector:
    type: http-post
    url: http://httpbin.org/post
    headers:
      Content-Type: application/json

5.3.3 Sink Connector使用方法

Sink Connector可以让Sink服务快速对接外部应用,要使用Sink Connector:
首先需要在sink.yml中在对应id的sink项下配置connector属性,其中type属性为要使用的Sink Connector。
Sink Connector有以下几个使用场景:

  1. Sink处理器为default类型:

Sink服务类会自己调用Sink Connector,完成和外部应用的通讯交互。

  1. Sink处理器为bean/sink-bean类型:
  • 显式声明及调用:需要在bean/sink-bean服务类中,显式声明(“Connector.getSinkConnector(sink-id)”)并调用Sink Connector(“connector.process()”)。
  • 隐式声明及调用:bean/sin-bean服务类如果继承AbstractSinkService类,当前sink的connectorr是会自动注入的,无需显式进行获取调用句柄,直接可以在代码中调用(“this.connector.process()”)。

6 Converter

6.1 配置文件(source.yml、sink.yml)

source.yml和sink.yml文件中定义了消息转换器相关配置:

配置项 配置说明
[].converter Source模块/Sink模块的消息格式相关配置
[].converter.type Source模块/Sink模块的消息转换类型,目前支持:simple-json、simple-xml、velocity-json、velocity-xml、fixed-length、velocity-split等
[].converter.pack-rules[](可选) 消息打包规则,选择打包格式文件
[].converter.pack-rules[].predicate 断言规则,返回true选择当前规则rule作为打包格式文件,空条件为true,支持EL表达式
[].converter.pack-rules[].rule 消息格式文件,支持EL表达式
[].converter.unpack-rules[](可选) 消息解包规则
[].converter.unpack-rules[].predicate 断言规则(同上)
[].converter.unpack-rules[].rule 消息格式文件(同上)

消息转换类型,目前支持simple-json、simple-xml、velocity-json、velocity-xml、fixed-length、velocity-split、iso-8583等7种方式,这7种消息转换类型的说明和针对打包、解包规则说明如下:

消息转换类型 消息转换说明 消息转换描述 消息打包规则 消息解包规则
simple-json 支持快速简单的JSON报文适配 打包:直接输出内部标准JSON报文
解包:直接对传入JSON进行解包
simple-xml 支持快速简单的XML报文适配 打包:直接把内部标准JSON报文转换成XML格式输出
解包:对传入XML直接解包成内部标准JSON报文
velocity-json 支持基于Velocity模板的JSON报文适配 打包:根据断言规则定位的Velocity模板文件进行打包
解包:直接对传入JSON进行解包
velocity-xml 支持基于Velocity模板的XML报文适配 打包:根据断言规则定位的Velocity模板文件,进行打包
解包:对传入XML直接解包成内部标准JSON报文
fixed-length 支持基于配置的定长报文适配 打包:根据断言规则定位的打包文件,文件是基于域级打包配置参数定制的
解包:基于预解包域级配置,并根据断言规则 定位的解包文件,文件是基于域级配置参数配置的
velocity-split 支持基于配置和Velocity模板的有分隔符报文适配 打包:根据断言规则定位的Velocity模板文件进行打包
解包:基于多级分隔符配置的解包
iso-8583 支持基于配置的ISO8583报文适配 打包:根据sink.yml中的fields、destination-id、source-id配置进行打包;
解包:根据sink.yml中的fields配置进行解包

6.2 converter目录

“config/converter”目录,存放了各种消息的格式转换配置文件,目前支持的文件类型有“.vm”和“.yml”。
对于消息格式转换类型有打包和解包规则 的,会根据断言规则定位,到message目录下用对应的文件来加载对应的格式转换配置,如下例所示:

config/converter
├── server3
│   ├── error.vm
│   ├── man.vm
│   └── woman.vm
├── server4
│   ├── man.yml
│   └── woman.yml
└── server5
    ├── error.vm
    ├── man.vm
    └── woman.vm

文件类型和消息转换类型的关系如下:

  • *.vm:Velocity模板文件
    • velocity-json消息转换类型中的打包模板
    • velocity-xml消息转换类型中的打包模板
    • velocity-split消息转换类型中的打包模板
  • *.yml:YML配置文件
    • fix-length消息转换类型中的打包和解包参数配置文件

6.2.1 velocity-json/velocity-xml(*.vm)

对于消息转换类型为velocity-json、velocity-xml的,会根据断言规则定位,到converter目录下用对应的vm文件进行消息打包,如下例所示:

config
|____converter
| |____sink4
| | |____woman.vm
| | |____error.vm
| | |____man.vm

vm文件支持velocity模板语言进行处理,模板中注入变量有:

变量名 变量说明
data BizMessage.data:内部标准消息的数据体,为JSONObject对象格式

6.2.2 fixed-length(*.yml)

对于消息转换类型为fixed-length,会根据断言规则 定位,到converter目录下用对应的yml文件进行打包和解包,如下例所示:

config
|____converter
| |____sink6
| | |____woman.yml
| | |____error.yml
| | |____man.yml

yml配置文件如下例所示:

- name: sex
  length: 1
- name: accountNo
  length: 8
  unpack-functions:
    - name: trim
- name: accountName
  length: 10
  unpack-functions:
    - name: trim
- name: balance
  length: 10
  pack-functions:
    - name: decimalFormat
      args:
        - "###,###.00"

相关配置域属性如下:

配置域 域说明
name 对应内部标准消息格式中的JSON域名。
length 域值所占长度
unpack-functions 解包函数,支持多个函数,多个函数会按序执行。
pack-functions 打包函数,同样支持多个函数

6.3 Converter类型

6.3.1 simple-json(简单JSON格式适配)

支持快速、简单的JSON消息适配,直接把Biz-SIP内部标准消息,直接打包成外部的JSON格式;同时也直接把外部的JSON格式消息,快速直接解包成Biz-SIP内部标准消息。
可以在客户端适配器和服务端适配器中启用,配置如下例所示:

- id: sink1
  type: rest
  url: http://bizsip-sample-sink/sink1
  converter:
    type: simple-json
  connector:
    type: sink-bean
    class-name: com.bizmda.bizsip.sample.sink.controller.CrmServer

6.3.2 simple-xml(简单XML格式适配)

支持快速、简单的XML消息适配,直接把Biz-SIP内部标准消息,直接打包成外部的XML格式;同时也直接把外部的XML格式消息,快速直接解包成Biz-SIP内部标准消息。
可以在客户端适配器和服务端适配器中启用,配置如下例所示:

- id: server3
  type: rest
  url: http://sip-sample-server/server3
  converter:
    type: simple-xml
  connector:
    type: sink-bean
    class-name: com.bizmda.bizsip.sample.sink.controller.EchoServer

6.3.3 velocity-json(基于模板的JSON格式适配)

支持基于Velocity模板的JSON报文适配,在对外部的JSON报文打包,是采用Velocity模板文件,这个文件是通过打包断言规则来进行匹配的;外部JSON报文的解包,是直接解包成Biz-SIP内部标准消息的。
配置如下例所示,含有pack-rules:

- id: server4
  type: rest
  url: http://sip-sample-server/server4
  converter:
    type: velocity-json
    pack-rules:
      - predicate: '#{#data[sex]=="0"}'
        rule: server3/woman.vm
      - predicate: '#{#data[sex]=="1"}'
        rule: server3/man.vm
      - predicate:
        rule: server3/error.vm
  connector:
    type: sink-bean
    class-name: com.bizmda.bizsip.sample.sink.controller.EchoServer

Velocity模板中注入变量有:

变量名 变量说明
data BizMessage.data:内部标准消息的数据体,为JSONObject对象格式

6.3.4 velocity-xml(基于模板的XML适配)

支持基于Velocity模板的XML报文适配,在对外部的XML报文打包,是采用Velocity模板文件,这个文件是通过打包断言规则来进行匹配的;外部XML报文的解包,是直接解包成Biz-SIP内部标准消息的。
配置如下例所示,含有pack-rules:

- id: server5
  type: rest
  url: http://sip-sample-server/server5
  converter:
    type: velocity-xml
    pack-rules:
      - predicate: '#{#data[sex]=="0"}'
        rule: server3/woman.vm
      - predicate: '#{#data[sex]=="1"}'
        rule: server3/man.vm
      - predicate:
        rule: server3/error.vm
    unpack-rules:
  connector:
    type: sink-bean
    class-name: com.bizmda.bizsip.sample.sink.controller.EchoServer

Velocity模板中注入变量有:

变量名 变量说明
data BizMessage.data:内部标准消息的数据体,为JSONObject对象格式

6.3.5 fixed-length(定长消息适配)

支持基于配置的定长报文适配:

  • 打包:根据断言规则定位的打包文件,文件是基于域级打包配置参数定制的
  • 解包:先基于域级配置进行消息报文的预解包,并根据断言规则 定位的解包文件,文件是基于域级配置参数配置的。

配置如下例所示,含有pre-unpack、pack-rules和unpack-rules:

- id: server6
  type: rest
  url: http://sip-sample-server/server6
  converter:
    type: fixed-length
    pre-unpack:
      - name: sex
        length: 1
        functions:
          - name: trim
    pack-rules:
      - predicate: '#{#data[sex]=="0"}'
        rule: server4/woman.yml
      - predicate: '#{#data[sex]=="1"}'
        rule: server4/man.yml
      - predicate:
        rule: server4/error.yml
    unpack-rules:
      - predicate: '#{#data[sex]=="0"}'
        rule: server4/woman.yml
      - predicate: '#{#data[sex]=="1"}'
        rule: server4/man.yml
      - predicate:
        rule: server4/error.yml
  connector:
    type: sink-bean
    class-name: com.bizmda.bizsip.sample.sink.controller.EchoServer

根据解包和打包断言规则,定位报文对应的解包或打包定义yml定义文件,解包和打包定义文件如下例所示:

- name: sex
  length: 1
- name: accountNo
  length: 8
  unpack-functions:
    - name: trim
- name: accountName
  length: 10
  unpack-functions:
    - name: trim
- name: balance
  length: 10
  pack-functions:
    - name: decimalFormat
      args:
        - "###,###.00"

主要的配置定义如下:

配置项 配置说明
[].name 域名,对应Biz-SIP内部标准消息中的域名。
解包:表示当前数据从些域名中取得数据;
打包:表示数据对置入该域名对应的数据域。
[].length 当前域的长度
[].unpack-functions[](可选) 解包时,对当前域域值进行处理的域处理函数。
[].pack-functions[](可选) 打包时,对当前域域值进行处理的域处理函数。

其中,unpack-functions、pack-functions,以及pre-unpack项下的functions,都是域处理函数,具体请参见“6.4 域处理函数”一节,相关配置定义如下:

配置项 配置说明
name 域处理函数,具体请参见域处理函数一节
args[] 域处理函数的参数,具体参数约定请参见域处理函数一节。

fixed-length格式转换的解包流程为:

  1. 根据“pre-unpack”中的设置,首先对传入的定长消息进行预解包,先解出关键域的域值;
  2. 根据“unpack-rule”,用上一步解出的关键域域值,根据解包断言规则,选择指定的解包yml定义文件;
  3. 根据上一步选择的解包yml定义文件,对传入的定长消息进行解包,解包成Biz-SIP内部标准消息。

打包流程为:

  1. 根据“pack-rule”,根据打包断言规则,选择指定的打包yml定义文件;
  2. 根据上一步选择的打包yml定义文件,对Biz-SIP内部标准消息进行打包,打包成定长消息报文。

6.3.6 velocity-split(基于模板的有分隔符消息适配)

velocity-split消息适配,是针对有分隔符消息的转换和适配,打包是基于Velocity模板方式来进行转换的,解包是基于客户端适配器和服务端适配器的消息配置来转换的。
velocity-split消息适配的打包,是支持基于Velocity模板的适配,模板文件是通过打包断言规则来进行匹配的。
source.yml/sink.yml文件的配置如下例所示,含有separators和pack-rules的定义:

- id: server7
  type: rest
  url: http://sip-sample-server/server7
  converter:
    type: velocity-split
    separators:
      - "*"
      - ","
    pack-rules:
      - predicate: '#{#data[sex]=="0"}'
        rule: server5/woman.vm
      - predicate: '#{#data[sex]=="1"}'
        rule: server5/man.vm
      - predicate:
        rule: server5/error.vm
  connector:
    type: sink-bean
    class-name: com.bizmda.bizsip.sample.sink.controller.EchoServer

主要的配置定义如下:

配置项 配置说明
[].converter.separators 约定分隔符,可以设置多个分隔符。

velocity-split格式转换的打包流程为:

  1. 根据“pack-rule”,执行打包断言规则,选择指定的打包vm定义文件;
  2. 根据上一步选择的解包yml定义文件,对Biz-SIP内部标准消息进行打包,打包成有分隔符消息报文。

解包流程为:

  1. 根据“unpack-rule”,根据打包断言规则,选择指定的打包yml定义文件;
  2. 根据source.yml/sink.yml文件中的separators设置,将有分隔符消息报文,直接解包到Biz-SIP内部标准消息报文。

6.3.7 iso-8583(ISO8583消息适配)

支持基于配置的ISO8583消息适配,打包和解包,都是根据sink.yml文件中对应sink的iso-8583报文和域配置信息,进行ISO8583报文的打包和解包。
sink.yml对应ISO8583报文配置,如下例所示:

- id: sink13
  type: rest
  url: http://bizsip-sample-sink/sink13
  converter:
    type: iso-8583
    destination-id: '00010344'
    source-id: '42430440'
    fields:
      - index: 2
        length: 19
        unfixed: 2
        name: accNo2
      - index: 3
        length: 6
        name: processingCode3
      - index: 4
        length: 12
        name: amt_trans4
      - index: 5
        length: 12
        name: amt_settlmt5
      - index: 6
        length: 12
        name: transactionAmount6
      - index: 7
        length: 10
        name: transmsn_date_time7
      - index: 9
        length: 8
        name: transmsn_date_time9
      - index: 10
        length: 8
        name: transmsn_date_time10
      - index: 11
        length: 6
        name: systemTraceAuditNumber11
      - index: 12
        length: 6
        name: time_local_trans12
      - index: 13
        length: 4
        name: date_local_trans13
      - index: 14
        length: 4
        name: date_expr14
      - index: 15
        length: 4
        name: date_settlmt15
      - index: 16
        length: 4
        name: date_conv16
      - index: 18
        length: 4
        name: mchnt_type18
      - index: 19
        length: 3
        name: mchnt_cntry_code19
      - index: 22
        length: 3
        name: pos_entry_mode_code22
      - index: 23
        length: 3
        name: card_seq_num23
      - index: 25
        length: 2
        name: pos_cond_code25
      - index: 26
        length: 2
        name: pos_pin_cap_code26
      - index: 28
        length: 9
        name: amt_fee28
      - index: 32
        length: 11
        unfixed: 2
        name: acq_inst_id_code32
      - index: 33
        length: 11
        unfixed: 2
        name: fwd_inst_id_code33
      - index: 35
        length: 37
        unfixed: 2
        name: track2_data35
      - index: 36
        length: 104
        unfixed: 3
        name: track3_data36
      - index: 37
        length: 12
        name: retrivl_ref_num37
      - index: 38
        length: 6
        name: authr_id_resp38
      - index: 39
        length: 2
        name: resp_code39
      - index: 41
        length: 8
        name: card_accptr_termnl_id41
      - index: 42
        length: 15
        name: card_accptr_id42
      - index: 43
        length: 40
        name: card_accptr_name_loc43
      - index: 44
        length: 25
        unfixed: 2
        name: addtnl_resp_code44
      - index: 45
        length: 76
        unfixed: 2
        name: track1_data45
      - index: 48
        length: 512
        unfixed: 3
        name: addtnl_data_private48
      - index: 49
        length: 3
        name: currcy_code_trans49
      - index: 50
        length: 3
        name: currcy_code_settlmt50
      - index: 51
        length: 3
        name: currcy_code_cdhldr_bil51
      - index: 52
        length: 8
        name: pin_data52
      - index: 53
        length: 16
        name: security_control_info53
      - index: 54
        length: 40
        unfixed: 3
        name: addtnl_amt54
      - index: 55
        length: 255
        unfixed: 3
        name: ICC_data55
      - index: 56
        length: 255
        unfixed: 3
        name: token_par56
      - index: 57
        length: 100
        unfixed: 3
        name: issr_addtnl_data57
      - index: 59
        length: 600
        unfixed: 3
        name: detai_inquiring59
      - index: 60
        length: 100
        unfixed: 3
        name: reserved60
      - index: 61
        length: 200
        unfixed: 3
        name: ch_auth_info61
      - index: 62
        length: 200
        unfixed: 3
        name: switching_data62
      - index: 63
        length: 512
        unfixed: 3
        name: fin_net_data63
      - index: 70
        length: 3
        name: net_mgr_info_code70
      - index: 90
        length: 42
        name: ori_data_element90
      - index: 96
        length: 8
        name: msg_security_code96
      - index: 100
        length: 11
        unfixed: 2
        name: rcvg_inst_id_code100
      - index: 102
        length: 28
        unfixed: 2
        name: acct_id_1_102
      - index: 103
        length: 28
        unfixed: 2
        name: acct_id_2_103
      - index: 104
        length: 512
        unfixed: 3
        name: trans_industry_app_inf104
      - index: 113
        length: 512
        unfixed: 3
        name: add_data113
      - index: 117
        length: 256
        unfixed: 3
        name: addtnl_data117
      - index: 121
        length: 100
        unfixed: 3
        name: national_sw_resved121
      - index: 122
        length: 100
        unfixed: 3
        name: acq_inst_resvd122
      - index: 123
        length: 100
        unfixed: 3
        name: issr_inst_resvd123
      - index: 125
        length: 256
        unfixed: 3
        name: addtnl_data125
      - index: 128
        length: 8
        name: msg_authn_code128
  connector:
    type: sink-bean
    class-name: com.bizmda.bizsip.sample.sink.controller.EchoServer

主要的配置定义如下:

配置项 配置说明
[].type 约定为“iso-8583”
[].destination-id 配置ISO8583报文中的目的ID
[].source-id 配置ISO8583报文中的源ID
[].fields ISO8583中各域的配置
[].fields[].index 对应ISO8583报文中的域索引号。
0:报文头,缺省域名为“msgHead”
1:报文类型标识,缺省域名为“msgType”
2…128:ISO8583域
[].fields[].name 域名,主要在生成的JSON报文中用,报文头、报文类型标识缺省为msgHead、msgType,2…128域缺省为f2…f128
[].fields[].length 域长度,可变长度域为最大域长度
[].fields[].unfixed 可变长度域类型,可以取值为2和3,代表长度位分别是2位和3位;如果不配置则为定长域。

6.4 Converter使用方法

Converter可以对消息报进行解包和打包,其中解包是把消息报文从外部各种消息格式转换成平台内部消息格式,打包是把消息报文从平台内部消息格式转换成外部各种消息格式,Source模块和Sink服务都能使用Converter,要使用Converter:
首先,需要在source.yml/sink.yml中在对应id的source/sink项下配置converter属性,其中type属性为要使用的Converter类型。
然后,Source模块和Sink服务就可以声明和调用Converter:

  • 显式声明及调用:

需要在Source模块、Source服务类、bean/sink-bean服务类中,显式声明(“Converter.getSinkConverter(sink-id)”)并调用Converter(“converter.pack()、converter.unpack()”)。

  • 隐式声明及调用:

bean/sin-bean服务类如果继承AbstractSinkService类,当前sink的converter是会自动注入的,无需显式进行获取调用句柄,直接可以在代码中调用(“this.converter.pack()、this.converter.unpack()”)。

  • Sink处理器为default类型:

Sink服务类会自己调用Sink Connector,完成和外部应用的通讯交互,无需代码实现。

6.5 域处理函数

域处理函数可以用在以下几个地方:

  1. 预解包:一般定义在source.yml/sink.yml文件中的pre-unpack设置下的functions;
  2. 解包:一般定义在解包yml文件中的unpack-functions;
  3. 打包:一般定义在打包yml文件中的pack-functions。

支持的域处理函数如下(可以自行扩展):

域处理函数名 说明 参数
fill 填充 参数1(必选):填充方式,left/right分别代表左填充和右填充
参数2(必选):填充字符
参数3(可选):填充长度,缺省为当前域长度
trim 去除空格 参数1(可选):left/right分别代表去左边空格和去右边空格,缺省为左右空格都去掉。
decimalFormat 对数字进行格式化 参数1(必选):根据Java中的DecimalFormat类格式,对数字进行格式化

6.6 扩展开发

所有Converter的实现类,有一个公共的抽象父类——AbstractConverter类,解包和打包,分别这调用AbstractConverter类的pack()和unpack()方法。
解包(AbstractConverter类的unpack()方法)流程如下:

  1. 将外部报文进行预解包,这是调用AbstractConverter类的adaptor2json()方法,该方法是根据消息类型配置和相关参数,对外部消息预解成中间的JSON格式的消息,以进行后续进一步处理;
  2. 对预解包进行最终的解包,这是调用AbstractConverter类的json2biz()方法,该方法是用上一步的预解包消息,组织Biz-SIP内部标准JSON消息;在解包时,可能会涉及到格式转换文件的断言规则判断(调用AbstractConverter类的matchMessagePredicateRule()方法),这是根据消息类型来定的。打包(AbstractConverter类的pack()方法)流程如下:
  3. 根据内部标准JSON消息进行预打包,这是调用AbstractConverter类的biz2json()方法;在预打包时,可能会涉及到格式转换文件的断言规则判断(调用AbstractConverter类的matchMessagePredicateRule()方法),这是根据消息类型来定的。
  4. 实现最终的打包,这是调用AbstractConverter类的json2adaptor()方法,这是根据适配器所设置的参数来定的。

目前Biz-SIP中间件支持simple-json、simple-xml、velocity-json等消息格式,具体的实现类在AbstractConverter类中进行关联:

	public static final Map<String,Object> CONVERTER_TYPE_MAP = new HashMap<>();
    static {
        CONVERTER_TYPE_MAP.put("simple-json", SimpleJsonConverter.class);
        CONVERTER_TYPE_MAP.put("simple-xml", SimpleXmlConverter.class);
        CONVERTER_TYPE_MAP.put("velocity-json", VelocityJsonConverter.class);
        CONVERTER_TYPE_MAP.put("velocity-xml", VelocityXmlConverter.class);
        CONVERTER_TYPE_MAP.put("fixed-length", FixedLengthConverter.class);
        CONVERTER_TYPE_MAP.put("velocity-split", VelocitySplitConverter.class);
    }

要实现新的消息处理器,主要开发流程如下:

  1. 继承AbstractConverter,实现biz2json()、json2adaptor()、adaptor2json()、json2biz()这4个方法;
  2. 将实现的消息处理类,在AbstractConverter类的CONVERTER_TYPE_MAP中添加。

7 交易日志监控

7.1 交易日志监控机制

Biz-SIP支持交易日志监控功能,开发者能打开交易日志服务,App服务(正常和延迟服务)、Sink服务(仅限异步)在运行时,根据设置的日志级别,会将交易成功、交易失败、交易挂起的交易事件,通过RabbitMQ队列发送给开发者开发的交易日志监控应用,统一进行各种针对交易状态的事后处理:

image.png

7.2 交易日志监控配置

当app层在application.yml中设置bizsip.rabbitmq-log属性后,平台会把相关的交易日志发送给rabbitMQ消息中间件,发送队列参数:

  • exchange:exchange.direct.bizsip.log
  • routing key:key.bizsip.log

一般需要在以下二类应用启动时,配置application.yml中的bizsip.rabbitmq-log属性:

  • App层的应用:所有的应用都是通过App层应用来进行服务编排的,需要设置rabbitmq-log交易日志监控级别,来对App层的App服务和App延迟服务进行全生命周期的监控;
  • Sink层包括RabbitMQ异步Sink服务的应用:对于涉及RabbitMQ异步Sink服务的应用,也需要单独实现全生命周期的监控,也是需要设置rabbitmq-log交易日志监控级别的。

交易日志包括成功、失败、挂起三种状态,发送交易日志的时机和相关状态如下表:

交易日志状态 发送交易日志时机 收到后建议处理方式
0-App服务成功 1、app层服务执行成功后;
2、app层延迟服务执行成功后。
一般应进行交易成功的后续处理,但是应考虑到App延迟服务和Sink异步服务的情况,延迟服务是作为主服务的子交易(子交易的
parentTraceId为父交易的traceId),而且子交易的交易日志有可能会先于父交易的日志被接收到,应根据先期到达的子交易,进行父交易的后续处理。
1-App服务失败 1、app层服务执行失败后;
2、app层延迟服务执行失败后(不包括服务出现超时错误,并没有超过最大重试次数的情况);
3、app层延迟服务出现超时错误,但超过最大重试次数;
一般应进行交易失败的后续处理。
2-App服务挂起 app层延迟服务放入等待队列,直到被触发执行完成前,都为挂起状态。 一般不做处理,等待交易成功或失败后再行处理。
3-Sink服务成功 sink异步服务执行成功后。 一般应进行交易成功的后续处理,Sink异步服务的交易日志有可能会先于App服务的交易日志被接收到。
4-Sink服务失败 sink异步服务执行失败后。 一般应进行交易失败的后续处理。

bizsip.rabbitmq-log的设置属性包括:

  • success:发送成功、挂起、失败类型的交易日志
  • suspend:发送挂起、失败类型的交易日志
  • fail:发送失败类型的交易日志
  • 不设置或其它:不发送交易日志

接收到的交易日志,为Map数据类型,约定如下:

KEY键 值类型 值说明
type int 0-App服务成功,1-App服务失败,2-App服务挂起,3-异步Sink服务成功,4-异步Sink服务失败
request BizMessage App服务的最初请求报文
response BizMessage App服务的最终响应报文

7.3 交易日志监控应用开发

Biz-SIP平台在配置好后,会把相关的交易日志发送给rabbitMQ消息中间件,开发者可以开发RabbitMQ中间件客户端侦听应用,对收到的交易日志进行处理,相关队列侦听参数:

  • exchange:exchange.direct.bizsip.log
  • routing key:key.bizsip.log

交易日志监控应用例子,如下所示:

@Service
public class AppLogQueueListener {
    public static final String APP_LOG_QUEUE = "queue.bizsip.applog";
    private Jackson2JsonMessageConverter jackson2JsonMessageConverter =new Jackson2JsonMessageConverter();

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = APP_LOG_QUEUE, durable = "true", autoDelete = "false"),
            exchange = @Exchange(value = BizConstant.BIZSIP_LOG_EXCHANGE, type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false"),
            key = BizConstant.BIZSIP_LOG_ROUTING_KEY))
    public void process(Message message) {
        Map<String,Object> map = (Map<String,Object>)jackson2JsonMessageConverter.fromMessage(message);
        int type = (int)map.get("type");
        BizMessage<JSONObject> inBizMessage = new BizMessage<>((Map) map.get("request"));
        BizMessage<JSONObject> outBizMessage = new BizMessage<>((Map) map.get("response"));
        log.info("\ntype:{}\nrequest:\n{}\nresponse:\n{}",type,
                BizUtils.buildBizMessageLog(inBizMessage),
                BizUtils.buildBizMessageLog(outBizMessage));
        return;
    }
}

8 风控规则

8.1 风控概述

  • 风控的定义

在金融系统中,涉及到用户注册、用户登录、提现、充值、兑换等交易,所谓风控就是对这些交易(各种交易行为的统称)非正常程度的评估。

  • 风险控制

在金融系统中,是如何处理风险的?
以“用户注册”为例:用户提交注册请求的时候,我们会检测当前设备或者IP 在当前时间段的注册数,如果达到某一数量,限制注册一段时间。
以“用户登录”为例, 很典型的处理方式,累计密码错误次数(通常3次以上),我们会延长其尝试时间。
以“提现”为例, 限制小额提现的交易次数, 大额交易需要提前预约等。

  • 风险的量化

所谓量化,就是以用户的行为特征为依据,然后通过科学的方式进行计算,得出一个综合的分数,通过这个风险的分数可以直观的判断交易的风险程度。
建议采用综合累计积分来对风险进行量化, 简单的来说:
f(r) = (a*x +n1)+ (b*y + n2) + (c*z + n3)
f(r)为风险总分, x、y、z 为 特征,, a、b、c 为权重 , n 为偏移量。
对于量化后的分数,可以设置警戒值和拒绝值,对于低于拒绝值的交易,直接拒绝返回,对于介于警戒值和拒绝值之间的交易,可以移交由运营人员进行跟进处理。

  • 风险的集中管理

我们知道企业做大后,会有很多产品线,而几乎每一个产品都需要做风险控制,通常我们都是把风险控制的逻辑写在相应的业务功能代码里,
大量重复的风控逻辑代码耦合在我们的业务逻辑里面,随着时间的累积,代码会变得异常复杂,会给后期的维护造成巨大的人力成本和风险。
所以风险的集中管理势在必行,只有通过一个统一的风险管控,使用规则引擎,采用配置的形式,平台化管理不同产品风控策略才是一种更好的方式,这正是Biz-SIP风控模块所要解决的问题。

8.2 风控规则处理机制

风控规则是在App层进行处理的,如果当前App服务配置了风控规则,就会在App服务处理前,自动进行风控评价,根据评价结果决定是否继续执行App服务,并在App服务执行成功后更新风控相关指标数据。
风控规则处理流程如下图所示:

8.3 配置

8.3.1 control-rule.yml

control-rule.yml定义了风控指标的配置参数:

配置项 配置说明
metrics.[].name 指标名称,要求唯一。
metrics.[].desc 指标描述
metrics.[].fliding-window-time 滑动时间窗口,单位为ms,表示当前指标只汇总当前时间往前单位ms的指标个数和指标值汇总。
metrics.[].fixed-window-time 固定时间窗口,表示当前指标在同一个时间窗口中指标个数和指标值汇总,如为天,则为当天的0:00-24:00之间的指标汇总,约定如下:
y:年
M:月
d:天
H:小时

具体案例如下所示:

metrics:
  - name: metric-1
    desc: 每分钟提现交易度量
    fliding-window-time: 60000
  - name: metric-2
    desc: 每日充值交易度量
    fixed-window-time: d

8.3.2 control-rule目录

Biz-SIP中间件能对App服务进行风控规则处理,预先定义各项风控指标,App服务风控规则能根据上送App服务中报文的消息域,进行预处理,根据风控指标进行风控规则计算,并把风控处理建议传送给App服务进行后续处理。
App服务风控规则,是通过风控规则脚本来设置的,风控规则脚本是配置在config/control-rule目录下,配置文件的目录和文件名和对应的服务脚本位置应一致,配置文件后缀为“.yml”,配置相关参数:

配置项 配置说明
pre-rating-script 风控评价前处理脚本
【注入参数】
request【可修改】:请求JSON对象(不含报文头)
【返回值】
(无)
rules.[].name 风控规则名称
rules.[].script 风控规则计算脚本
【注入参数】
request【不可修改】:请求JSON对象(不含报文头)
【返回值】
风控计算结果
rating-script 风控评价脚本
【注入参数】
request【可修改】:请求JSON对象(不含报文头)
rules[]【不可修改】:风控计算结果集
control【可修改】:风控处理建议,control.action为处理动作,control.message为处理信息。
【返回值】
(无)
updating-script App服务结束后更新指标脚本
【注入参数】
request【不可修改】:请求JSON对象(不含报文头)
response【可修改】:响应BizMessage对象(含报文头),可以修改。
control【不可修改】:风控处理建议,control.action为处理动作,control.message为处理信息,control.rules[]为风控计算结果集。
【返回值】
(无)
sub-control-rules 可配置多个子风控规则,子风控规则是通过SpEL表达式来配置的,并顺序依次执行的。
其中SpEL表达式注入变量:
- request:请求报文的JSONObject对象

具体案例如下:
config/control-rule/bean/sample1-control-rule.yml

pre-rating-script:
rules:
  - name: '获取每分钟交易次数'
    script: |
      var count = sip.getMetric("metric-sample1-1","count",request.account);
      log.info("[rules]每分钟交易次数: {}",count);
      return count;
  - name: '获取每分钟交易金额累计'
    script: |
      var score = sip.getMetric("metric-sample1-1","total-score",request.account);
      log.info("[rules]每分钟交易金额: {}",score);
      return score;
  - name: '获取每分钟交易对手数量'
    script: |
      var count = sip.getMetric("metric-sample1-2","z-card",request.account);
      log.info("[rules]每分钟交易对手数量: {}",count);
      return count;
rating-script: |
  control.message = "";
  if (rules[0].result >= 10) {
    control.action = "error";
    control.message = control.message + "每分钟交易次数超过10次:" + rules[0].result + ",";
  }
  if (rules[1].result >= 10000) {
    control.action = "error";
    control.message = control.message + "每分钟交易金额超过10000元:" + rules[1].result + ",";
  }
  if (rules[2].result >= 2) {
    control.action = "warn";
    control.message = control.message + "每分钟交易对手数量超过2个:" + rules[2].result + ",";
  }
  return;
updating-script: |
  sip.addRecord("metric-sample1-1",request.account,request.amount);
  sip.addRecord("metric-sample1-2",request.account,request.other_account,request.amount);
  return;
sub-control-rules:
  - /bean/sample1-control-rule/#{#request[tran_code]}

8.3.3 风控规则函数

风控规则脚本中,可以通过风控规则函数来进行风控指标的获取、风控交易的增加。

8.3.3.1 sip.getMetric(参数1,参数2,参数3)

  • 传入参数

    参数 类型 描述
    1 String 指标名称(在control-rule.yml中定义)
    2 String 指标结果类型:
    - count:风控交易个数
    - total-score:风控交易指标值汇总数
    - z-card:第二主键键值的个数
    3 String 主键值
  • 返回值:获取指定指标的统计结果

// 获取每分钟交易次数
var count = sip.getMetric("metric-sample1-1","count",request.account);

8.3.3.2 sip.addRecord(参数1,参数2,参数3)

  • 传入参数

    参数 类型 描述
    1 String 指标名称(在control-rule.yml中定义)
    2 String 主键值
    3 Number 指标值(基于主键值项下),整数、浮点数都可以,字符串会强制转换成数字
  • 返回值:无

// 增加指标“metric-sample-1”项下,关联到request.account项下的风控记录
sip.addRecord("metric-sample1-1",request.account,request.amount);

8.3.3.3 sip.addRecord(参数1,参数2,参数3,参数4)

  • 传入参数

    参数 类型 描述
    1 String 指标名称(在control-rule.yml中定义)
    2 String 主键值
    3 String 第二主键值
    4 Number 指标值(基于主键值项、第二主键值项下),整数、浮点数都可以,字符串会强制转换成数字
  • 返回值:无

// 增加指标“metric-sample-2”项下,关联到request.account(主键)、request.other_account(第二主键)项下的风控记录
sip.addRecord("metric-sample1-2",request.account,request.other_account,request.amount);

8.4 开发

在App服务类中,可以通过代码实现对风控结果的查询,并且能控制是否执行后续的处理后更新指标脚本(
updating-script)。

  • BizUtils.getControlRule():获取当前交易的控制规则(仅支持App层调用)
public static cn.hutool.json.JSONObject getControlRule()
返回:
当前交易的控制规则

返回的控制规则为JSONObject类,主要包括以下键值:

键值 描述
action 在rating-script脚本中设置的action,为“error”值时App服务自动返回异常,其它值会继续调用App服务,但在App服务类中,可以通过getControlRule()获取。
message 在规则配置文件“rating-script”段中设置的message
rules 为JSONArray,和配置文件“rules”段中配置相一致:
- name:规则名称
- result:规则执行结果
如果涉及子风控规则,则rules会根据执行顺序,逐步增加执行过的风控规则所产生的rules。
  • BizUtils.setControlRuleUpdatingFlag():设置控制规则后更新标识(true为更新,flase为不更新,缺省为true)
public void setControlRuleUpdatingFlag(boolean flag)
参数:
flag - 更新标识(true为更新,flase为不更新)

附录1:Biz-SIP内部标准消息规范

Biz-SIP中间件的内部消息规范如下:

参数 类型 是否必填 描述
code int Y 返回码,0为成功,非0为失败
message String N 返回消息
extMessage String N 返回扩展消息
appServiceId String Y 对应App服务ID
traceId char(32) Y 由Biz-SIP统一生成的唯一跟踪ID,每个聚合服务中traceId相同
parentTraceId char(32) N 父交易服务的traceId,父交易服务一般调用延迟服务,会产生子交易服务
timestamp long Y 由Biz-SIP统一生成的时间戳,为聚合服务的最初发起时间,为1970年1月1日零点整至发起时间的毫秒数
data String(JSON格式) N 传送的数据,一般为JSON格式

附录2:App层接口(OpenAPI接口)规范

App层接口和OpenAPI接口,是Biz-SIP对Source模块和平台开放接口提供的标准接口接入,包括RESTful同步调用接口和RabbitMQ异步调用接口。

1、RESTful同步调用接口

规范如下:

URL http://{地址}:{端口}/api
HTTP请求头

- Content-Type
application/json

- Biz-Service-Id
调用的App服务ID

- (可选)Biz-Trace-Id
预先设置的App服务的TraceId,不设置则由App服务自动生成TraceId。
请求包 JSON报文
响应包 Biz-SIP内部标准消息(参见“Biz-SIP内部标准消息规范”)

2、RabbitMQ异步调用接口

App服务RabbitMQ接口,是通过RabbitMQ消息中间件来提供的,发送时相关参数:

Exchange exchange.direct.bizsip.app
代码中可以使用:BizConstant.APP_SERVICE_EXCHANGE
RoutingKey key.bizsip.app
代码中可以使用:BizConstant.APP_SERVICE_ROUTING_KEY

发送的Message消息,约定如下:

body 为调用JSON报文的byte[]字节流(UTF-8码制),通过message.withBody()设置,调用JSON报文包括:
- Biz-Service-Id:调用的App服务ID
- Biz-Trace-id:预先设置的App服务的TraceId,不设置则由App服务自动生成TraceId。
- data:调用的请求JSON报文
replyTo(可选) App服务执行完成后,响应报文回调的队列,通过setReplyTo()设置(注:如采用RPC调用模式,则无需设置)。
correlationId(可选) App服务执行完成后的回调消息,会自动带回请求时设置的correlationId,通过setCorrelationId()设置(注:如采用RPC调用模式,则无需设置)。

附录3:Sink层接口规范

Sink层接口,是App模块调用所有Sink模块的接口,规范如下:

Content-Type application/json
请求包 Biz-SIP内部标准消息(参见“Biz-SIP内部标准消息规范”)
响应包 Biz-SIP内部标准消息(参见“Biz-SIP内部标准消息规范”)

附录4:接口类方法调用的平台内部消息约定

在App层App服务和Sink层Sink服务的调用时,都是通过Biz-SIP内部消息来进行调用的。
以下二种的服务调用时,会涉及接口类方法的调用:

  • “bean-service”类型的App服务调用
  • “bean”类型的Sink服务调用

在调用时,会转换成平台内部消息,转换时采用以下约定:

域名 说明
className 关联到调用接口类的类名
methodName 关联到调用接口类调用方法的方法名
params 调用的参数,为JSONArray类型,支持多个参数,每个参数可以为JSONObject或JSONArray

例如,要调用HelloInterface接口的hello()方法,HelloInterface接口如下:

public interface HelloInterface {
    String hello(String message);
}

传入参数为"world“,即调用hello(“world”),调用时转换成的平台内部消息数据域(data)应为:

{
    "className": "com.bizmda.bizsip.sample.sink.api.HelloInterface",
    "methodName": "hello",
    "params": [
        "world"
    ]
}

附录5:服务Script脚本相关对象

在Script服务整合器脚本中,主要使用的特殊对象包括:sip对象、log对象等。

1 sip对象

sip.callSink(sinkId,inData)

执行适配器服务调用
参数:

  • sinkId:Sink ID,在sink.yml配置文件中约定。
  • inData:传送给服务端的数据

返回:

  • 服务端返回的数据,是BizMessage内部消息类型。
sip.doDelayService(serviceId,inData,delayMilliseconds1,delayMilliseconds2…)

执行SAF服务调用
参数:

  • serviceId:SAF服务ID,为配置在config/script中的服务脚本。
  • inData:传送给服务的数据,在脚本中以“bizmessage”进行存取。
  • delayMilliseconds…:延迟毫秒数,可以有多个,分别代表第1次、第2次…..的服务延迟毫秒数

返回:

  • SAF子交易的BizMessage内部消息类型。
sip.getServiceRetryCount()

获取SAF服务的当前重试次数
参数:

  • (无)

返回:

  • SAF服务的当前重试次数
sip.saveAsyncContext(transactionKey,context,timeout)

保存异步服务上下文
参数:

  • transactionKey:异步回调的全局唯一交易索引键
  • context:注入回调聚合服务的上下文变量
  • timeout:异步服务超时时间,单位(秒)

返回:

  • (无)
sip.loadAsyncContext(transactionKey)

保存异步服务上下文
参数:

  • transactionKey:异步回调的全局唯一交易索引键

返回:

  • 异步服务上下文

2 log对象

// 使用方法与SLF4J完全一致
log.info('Hello');
log.info('Hello {}','MagicAPI');
log.debug('test');

3 db对象

select
  • 入参:sql:String
  • 返回值:List<Map<String,Object>>
  • 函数说明:查询List结果
return db.select('select * from sys_user');
selectInt
  • 入参:sql:String
  • 返回值:Integer
  • 函数说明:查询int结果
//需要保证结果返回一行一列
return db.selectInt('select count(*) from sys_user');
selectOne
  • 入参:sql:String
  • 返回值:Map<String,Object>
  • 函数说明:查询单个对象
return db.selectOne('select * from sys_user limit 1');
selectValue
  • 入参:sql:String
  • 返回值:Object
  • 函数说明:查询单个值
//需要保证结果返回一行一列 
return db.selectValue('select user_name from sys_user limit 1');
page
  • 入参:sql:String
  • 入参:limit : long 可省略
  • 入参:offset : long 可省略
  • 返回值:Object 默认返回为Object,如果自定义了分页结果,则返回自定义结果
  • 函数说明:分页查询
//需要保证结果返回一行一列 
return db.page('select * from sys_user');
update
  • 入参:sql:String
  • 返回值:Integer
  • 函数说明:执行增删改操作
return db.update('delete from sys_user');
cache
  • 入参:cacheName:String
  • 入参:ttl:long 缓存有效期,单位毫秒,可省略,默认为配置的值
  • 返回值:db //返回当前实例,即可以链式调用
  • 函数说明:使用缓存
//使用缓存名为user的查询
return db.cache('user').select('select * from sys_user');
transaction
  • 入参:callback:Function,回调函数,可省略
  • 返回值:Object
  • 函数说明:开启事务
  • 自动事务
var val = db.transaction(()=>{
    var v1 = db.update('...');
    var v2 = db.update('....');
    return v2;
});
return val;
  • 手动开启事务
var tx = db.transaction();  //开启事务
try{
    var value = db.update('...');
    db.commit();    // 提交事务
    return value;
}catch(e){
    db.rollback();  // 回滚事务
}

4 redis对象

var data = bizmessage.data;
var value = data.accountNo;
//通过redis.命令名(命令参数,命令参数,.....,命令参数) 进行调用,其中命令名不区分大小写
redis.set('key',value); //调用set命令
//redis.setex('key',10,value);    //调用setex命令
data.accountName = redis.get('key');    //调用get命令
return data;

附录6:内部运行原理图

image.png

上图中:

  • 蓝色虚框所呈现的部分,主要是Biz-SIP中间件的配置文件。
  • 蓝底黑字所呈现的部分,表示是需要Java代码开发的,涉及适配层、应用层和领域层。
  • 蓝色虚箭头线所涉及的接口,就是不同层次之间需要互相依赖的接口定义。
上一页
下一页