系统开发

介绍使用Java代码来开发基于Biz-SIP中间件的系统。

一、概述

Biz-SIP中间件在开发应用时,主要涉及配置文件编写和Java代码开发,系统架构如下所示:

image.png
上图中蓝底黑字所呈现的部分,就是需要Java代码开发的,涉及适配(source)层、应用(app)层和领域(sink)层。蓝色虚箭头线所涉及的接口,就是不同层次之间需要互相依赖的接口定义。

二、source层

Source服务位于适配层,主要功能是接入外部发起的服务,主要功能有:

  • 通讯适配:和外部接入通讯方式的适配,接收外部发起的请求消息,并把处理过后的响应消息送回。
  • 消息转换:把外部发来的请求消息,转换成Biz-SIP平台内部消息格式;并把平台的响应消息,打包成外部的消息格式。
  • app服务调用:往后调用app层服务。
  • 渠道接入的安全处理,包括报文加解密、加签解签;
  • 渠道接入的报文校验;
  • 渠道接入的交易处理,包括流水记录、风控处理、终端管理等;

1. pom.xml配置

在pom.xml文件中添加source-spring-boot-starter依赖,以及app层Integrator模块和领域层sink模块相关联的client接口相关包:

        <dependency>
            <groupId>com.bizmda.bizsip</groupId>
            <artifactId>source-spring-boot-starter</artifactId>
          	<version>1.0-SNAPSHOT</version>
        </dependency>

2. application.yml配置

Source模块的application.yml中的主要相关参数:

配置项 配置说明
bizsip.config-path Biz-SIP中间件配置文件根目录位置
bizsip.integrator-url 服务聚合器Integrator的接口url地址

例子:

server:
  port: 8080

spring:
  application:
    name: bizsip-sample-source

  cloud:
    nacos:
      discovery:
        server-addr: bizsip-nacos:8848
  #  以下配置在Istio部署中打开,以不采用NACOS注册中心,而采用etcd注册机制
  #  cloud:
  #    service-registry:
  #      auto-registration:
  #        enabled: false  #禁用注册中心服务

bizsip:
  config-path: /var/bizsip/config
  integrator-url: http://bizsip-integrator/api

logging:
  level:
    com.bizmda.bizsip: debug

3. 通讯接入的适配

针对不同的通讯接入方式,source层会采用不同的接入方式,主要有:

  • Controller:接入http的请求,包括RESTful报文请求接入;
  • Netty:接入TCP协议的请求;
  • RabbitMQ:实现异步转换成同步的接入;
  • 等等…

在source模块下,有Netty和RESTful接入的source模块,可以参考:

├── source      Source服务接入子模块
│   ├── netty-source		Netty接入Source子模块
│   ├── rest-source			Restful接入Source

4. source模块打包和解包

平台提供JSON、XML、定长、有分隔符、ISO8583的格式转换器(converter),如果采用这些平台提供的converter,就需要约定Source ID。 首先,应在在source.yml中配置相应的converter:

- id: xml-source
  converter:
    type: simple-xml

然后,就可以在代码中通过Converter.getSourceConverter()绑定converter,进行相应的解包和打包操作:

@RestController
@RequestMapping("/personal")
public class XmlController {
    ......
    // 获取source层消息转换句柄
    private Converter converter = Converter.getSourceConverter("xml-source");

    @PostMapping(value = "/getCustomerAndAccountList", consumes = "application/xml", produces = "application/xml")
    public String getCustomerAndAccountList(@RequestBody String inMessage) throws BizException {
        // 消息解包操作
        JSONObject jsonObject = this.converter.unpack(inMessage.getBytes());
        String customerId = (String)jsonObject.get("customerId");
        CustomerAndAccountList customerAndAccountList = this.personalAppInterface.getCustomerAndAccountList(customerId);
        jsonObject = JSONUtil.parseObj(customerAndAccountList);
        // 消息打包并返回
        return new String(this.converter.pack(jsonObject));
    }
  	...
 }

如果采用Java编程实现报文打解包,就可以不考虑Source ID的约定。

5. 报文校验

在app层中,有对app层聚合服务的校验机制,包括域级校验和服务级校验,可以考虑统一在app层进行报文检验配置:

config/check-rule/openapi/sample1.yml

field-check-rules:
  - field: email
    rule: isEmail
    message: '不是邮箱地址:{}'
  - field: sex
    rule: notEmpty
    message: '不能为空'
  - field: mobile
    rule: isMatchRegex
    args:
      - '^[1][3,4,5,6,7,8,9][0-9]{9}$'
    message: '不是手机号{}'
field-check-mode: one
service-check-rules:
  - script: if(data.sex == '1')
            {return '性别不符合!';}
    message: '额度超限'
service-check-mode: one

但如果是仅针对于该特定渠道的报文校验,就只能考虑在渠道接入模块做个性化处理。

6. 交易处理

对于简单的交易处理,可以考虑用Spring Service来做交易处理。 在编码时注意,调用app层聚合服务可能会导致时延较长,应避免在有数据库操作的Spring Service类中调用app层聚合服务,并在有数据库操作的Spring Service类上尽量加@Transactional,以避免长时间锁表。

7. 调用app层服务

统一采用“SourceClientFactory.getBizServiceClient(Class tClass,String bizServiceId)”来调用app层聚合服务(tClass必须是接口类),二种调用方式:

  • 接口约定调用:由app层定义接口调用的Interface接口类,由渠道接入模块所引用。如下例所示:
@RestController
@RequestMapping("/personal")
public class PersonalController  {
    private PersonalAppInterface personalAppInterface = SourceClientFactory
            .getBizServiceClient(PersonalAppInterface.class,"app/personal");
	...

    @GetMapping(value ="/getCustomerAndAccountList")
    public CustomerAndAccountList getCustomerAndAccountList(String customerId) {
        return this.personalAppInterface.getCustomerAndAccountList(customerId);
    }

    @GetMapping(value ="/getAccountListByCustomerId")
    public List<Account> getAccountListByCustomerId(String customerId) {
        return this.personalAppInterface.getAccountListByCustomerId(customerId);
    }

    @GetMapping(value ="/getCustomer")
    public Customer getCustomer(String customerId) {
        return this.personalAppInterface.getCustomer(customerId);
    }
	...
}

  • 非接口约定调用:采用平台通用JSONObject类型,Interface接口类统一采用BizMessageInterface接口类,并统一用call()来调用app层聚合服务。如下例所示:
@RestController
@RequestMapping("/personal")
public class PersonalController  {
    private BizMessageInterface payment1SinkInterface = SourceClientFactory
            .getBizServiceClient(BizMessageInterface.class,"sink/payment1");
    ...
        
    @GetMapping(value ="/send2Payment")
    public BizMessage<JSONObject> send2Payment(String message) throws BizException {
        JSONObject jsonObject = new JSONObject();
        jsonObject.set("message",message);
        return this.payment1SinkInterface.call(jsonObject);
    }
}

8. OpenAPI接口模块

OpenAPI接口模块主要功能:

  • 敏感数据加解密及报文加签验签;
  • RESTful接口封装;
  • OpenAPI接口文档的生成;
  • Sandbox;
  • 聚合服务的调用

OpenAPI接口模块如下例:

@RestController
@RequestMapping("/personal")
public class OpenapiController {
    private PersonalAppInterface personalAppInterface = SourceClientFactory
            .getBizServiceClient(PersonalAppInterface.class,"app/personal");
    private BizMessageInterface payment1SinkInterface = SourceClientFactory
            .getBizServiceClient(BizMessageInterface.class,"sink/payment1");

    @GetMapping(value ="/getCustomerAndAccountList")
    public CustomerAndAccountList getCustomerAndAccountList(String customerId) {
        return this.personalAppInterface.getCustomerAndAccountList(customerId);
    }

    @GetMapping(value ="/getAccountListByCustomerId")
    public List<Account> getAccountListByCustomerId(String customerId) {
        return this.personalAppInterface.getAccountListByCustomerId(customerId);
    }

    @GetMapping(value ="/getCustomer")
    public Customer getCustomer(String customerId) {
        return this.personalAppInterface.getCustomer(customerId);
    }

    @GetMapping(value ="/getCustomerAndSaf2Payment2")
    public Customer getCustomerAndSaf2Payment2(String tranCode, String customerId) throws BizException {
        return this.personalAppInterface.getCustomerAndSaf2Payment2(tranCode,customerId);
    }

    @GetMapping(value ="/send2Payment1")
    public BizMessage<JSONObject> send2Payment1(String message) throws BizException {
        return this.personalAppInterface.send2Payment1(message);
    }

    @GetMapping(value ="/send2Payment")
    public BizMessage<JSONObject> send2Payment(String message) throws BizException {
        JSONObject jsonObject = new JSONObject();
        jsonObject.set("message",message);
        return this.payment1SinkInterface.call(jsonObject);
    }
}

OpenAPI接口模块可以根据接口要求开发controller类,并加上API注释,自动生成Swagger/Knife4j文档。

三、app层

app层服务功能:

  • 领域层服务(Sink服务)的编排
  • 领域层服务(Sink服务)透传到适配层服务(Source服务和OpenAPI)
  • 存储转发服务的封装
  • 补偿交易的封装

1. pom.xml配置

在pom.xml文件中添加integrator-spring-boot-starter依赖,以及领域层sink模块相关联的client接口相关包:

        <dependency>
            <groupId>com.bizmda.bizsip</groupId>
            <artifactId>integrator-spring-boot-starter</artifactId>
          	<version>1.0-SNAPSHOT</version>
        </dependency>

2. application.yml配置

app层Integrator模块中的application.yml中的主要相关参数:

配置项 配置说明
bizsip.config-path Biz-SIP中间件配置文件根目录位置
bizsip.rabbitmq-log true:会把交易日志(成功、失败、挂起)发送给rabbitMQ
false(默认):不发送服务处理日志
发送rabbitMQ相关参数:
- exchange:exchange.direct.bizsip.log
- RoutingKey:queue.log.routing.key
server.port 服务整合器(Integrator)的微服务端口,建议用8888端口,避免和其它端口相冲突
spring.cloud.nacos.discovery.server-addr Nacos服务端口
spring.datasource.* 数据库连接配置(用于服务脚本中db对象)
spring.redis.* redis连接配置(用于服务脚本中redis对象)
spring.rabbitmq.* RabbitMQ配置(用于事务管理器)

例子:

server:
  port: 8888

spring:
  application:
    name: bizsip-integrator

  cloud:
    nacos:
      discovery:
        server-addr: bizsip-nacos:8848
#  以下配置在Istio部署中打开,以不采用NACOS注册中心,而采用etcd注册机制
#  cloud:
#    service-registry:
#      auto-registration:
#        enabled: false  #禁用注册中心服务

  datasource:
    url: jdbc:mysql://bizsip-mysql/sip
    username: root
    password: bizsip123456
    driver-class-name: com.mysql.jdbc.Driver

  redis:
    redisson:
      enable: true
    host: bizsip-redis
    port: 6379
    timeout: 6000
    database: 0
    lettuce:
      pool:
        max-active: 10 # 连接池最大连接数(使用负值表示没有限制),如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)
        max-idle: 8   # 连接池中的最大空闲连接 ,默认值也是8
        max-wait: 100 # # 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException
        min-idle: 2    # 连接池中的最小空闲连接 ,默认值也是0
      shutdown-timeout: 100ms

  rabbitmq:
    virtual-host: /
    host: bizsip-rabbitmq
    port: 5672
    username: springcloud
    password: springcloud
    listener:
      simple:
        concurrency: 5
        max-concurrency: 15
        prefetch: 10

bizsip:
  config-path: /var/bizsip/config

logging:
  level:
    com.bizmda.bizsip: debug

3. 二种app服务类型

聚合应用服务和延迟服务,有二种服务类型:bean-service和app-bean-service。

3.1 基于Java接口类的bean-service

bean-service:基于Java接口类调用的服务类型,绑定的服务类是基于预先约定好的Java接口。

service.yml:
- bizServiceId: app/sample-bean-service
  type: bean-service
  className: com.sample.app.service.SampleBeanService
  
服务代码例子:
@Service
public class SampleBeanService implements SampleBeanServiceInterface {
    private BizMessageInterface sampleSinkBeankSinkInterface = IntegratorClientFactory
            .getSinkClient(BizMessageInterface.class,"sample-sink-bean-sink");
    private HelloInterface helloInterface = IntegratorClientFactory
            .getSinkClient(HelloInterface.class,"sample-bean-sink");
    @Override
    public String callSampleSinkBeanSink(String message) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.set("message",message);
        BizMessage<JSONObject> bizMessage;
        try {
            bizMessage = sampleSinkBeankSinkInterface.call(jsonObject);
        } catch (BizException e) {
            e.printStackTrace();
            return null;
        }
        return (String)bizMessage.getData().get("message");
    }

    @Override
    public String callSampleBeanSink(String message) {
        return this.helloInterface.hello(message);
    }
}

3.2 基于JSON的AppBeanInterface

app-bean-service:基于平台标准JSON接口调用的服务类型,绑定的服务类是继承AppBeanInterface接口。

service.yml:
- bizServiceId: app/sample-app-bean-service
  type: app-bean-service
  className: com.sample.app.service.SampleAppBeanService
  
服务代码例子:
@Service
public class SampleAppBeanService implements AppBeanInterface {
    private HelloInterface helloInterface = IntegratorClientFactory
            .getSinkClient(HelloInterface.class,"sample-bean-sink");

    @Override
    public JSONObject process(JSONObject jsonObject) throws BizException {
        String message = (String)jsonObject.get("message");
        jsonObject.set("message","sample-app-bean-service: Hello,"+message+";"
                + this.helloInterface.hello(message));
        return jsonObject;
    }
}

4. 调用Sink服务

统一采用“IntegratorClientFactory.getSinkClient(Class tClass,String sinkId)”来调用领域层Sink服务(tClass必须是接口类),有二种调用方式:

  • 接口约定调用:由app层定义接口调用的Interface接口类,由渠道接入模块所引用。如下例所示:
@Service
public class PersonalAppService implements PersonalAppInterface {
    private AccountSinkInterface accountSinkInterface = IntegratorClientFactory
            .getSinkClient(AccountSinkInterface.class,"account-sink");
    private CustomerSinkInterface customerSinkInterface = IntegratorClientFactory
            .getSinkClient(CustomerSinkInterface.class,"customer-sink");
	...
        
    @Override
    public CustomerAndAccountList getCustomerAndAccountList(String customerId) {
        Customer customer = this.customerSinkInterface.getCustomer(customerId);
        List<Account> accountList = this.accountSinkInterface.getAccountListByCustomerId(customerId);
        CustomerAndAccountList customerAndAccountList = new CustomerAndAccountList();
        customerAndAccountList.setCustomer(customer);
        customerAndAccountList.setAccountList(accountList);
        return customerAndAccountList;
    }
	...
}
  • 非接口约定调用:采用平台通用JSONObject类型,Interface接口类统一采用BizMessageInterface接口类,并统一用call()来调用app层聚合服务。如下例所示:
@Service
public class PersonalAppService implements PersonalAppInterface {
	...
    private BizMessageInterface payment1SinkInterface = IntegratorClientFactory
            .getSinkClient(BizMessageInterface.class,"payment1-sink");
    private BizMessageInterface payment2SinkInterface = IntegratorClientFactory
            .getSinkClient(BizMessageInterface.class,"payment2-sink");
	...
    @Override
    public BizMessage<JSONObject> send2Payment1(Object message) throws BizException {
        JSONObject jsonObject = new JSONObject();
        jsonObject.set("message",message);
        return this.payment1SinkInterface.call(jsonObject);
    }

    @Override
    public BizMessage send2Payment2(String tranMode, String tranCode, Object message) throws BizException {
        JSONObject jsonObject = new JSONObject();
        jsonObject.set("tranCode",tranCode);
        jsonObject.set("tranMode",tranMode);
        jsonObject.set("message",message);
        return this.payment2SinkInterface.call(jsonObject);
    }
    ...
}

5. 调用延迟服务

延迟服务也是位于app层的聚合服务,延迟服务只能由app层的聚合服务来进行嵌套调用,不能从适配层直接调用延迟服务。 App调用延迟服务,统一采用“IntegratorClientFactory.getDelayBizServiceClient(Class tClass, String bizServiceId, int… delayMilliseconds)”来调用(tClass必须是接口类),同样有接口约定调用和非接口约定调用二种方式,其中非接口约定调用是采用BizMessageInterface接口类,并统一用call()来调用app层聚合服务,如下例所示:

@Service
public class PersonalAppService implements PersonalAppInterface {
    private AccountSinkInterface accountSinkInterface = IntegratorClientFactory
            .getSinkClient(AccountSinkInterface.class,"account-sink");
    private CustomerSinkInterface customerSinkInterface = IntegratorClientFactory
            .getSinkClient(CustomerSinkInterface.class,"customer-sink");
    private BizMessageInterface payment1SinkInterface = IntegratorClientFactory
            .getSinkClient(BizMessageInterface.class,"payment1-sink");
    private BizMessageInterface payment2SinkInterface = IntegratorClientFactory
            .getSinkClient(BizMessageInterface.class,"payment2-sink");
    private PersonalAppInterface personalAppDelayInterface = IntegratorClientFactory
            .getDelayBizServiceClient(PersonalAppInterface.class,"app/personal",
                    0,1000,2000,4000,8000,16000,32000);
	...
    @Override
    public void payoutForward(String tranMode,String accountId, long amount) throws BizException {
        log.info("account出金:{},{}",accountId,amount);
        this.accountSinkInterface.payout(accountId,amount);
        JSONObject jsonObject = new JSONObject();
        jsonObject.set("tranCode","pay");
        jsonObject.set("tranMode",tranMode);
        jsonObject.set("accountId",accountId);
        jsonObject.set("tranAmount",amount);
        BizMessage<JSONObject> bizMessage = null;
        try {
            log.info("payment缴费...");
            bizMessage = this.payment2SinkInterface.call(jsonObject);
        } catch (BizException e) {
            if (e.isTimeOutException()) {
                log.info("payment交易超时,开始payout补偿...");
                this.personalAppDelayInterface.payoutForwardCompensate(jsonObject);
                return;
            }
            else {
                throw e;
            }
        }
        log.info("payment缴费成功!");
        log.info("payout成功!");
    }
}

6. 交易日志的接收与处理

当app层在application.yml中设置bizsip.rabbitmq-log为true时,平台会把交易日志发送给rabbitMQ消息中间件,发送队列参数:

  • exchange:exchange.direct.bizsip.log
  • RoutingKey:queue.log.routing.key

交易日志包括成功、失败、挂起三种状态,发送交易日志的时机和相关状态如下表:

交易日志状态 发送交易日志时机 收到后建议处理方式
成功 1、app层聚合服务执行成功后;
2、app层延迟服务执行成功后。
一般应进行交易成功的后续处理,但是应考虑到延迟服务的情况,延迟交易是作为主服务的子交易(子交易的parentTraceId为父交易的traceId),而且子交易的交易日志会先于父交易的日志被接收到,应根据先期到达的子交易,进行父交易的后续处理。
失败 1、app层聚合服务执行失败后;
2、app层延迟服务执行失败后(服务出现超时错误,并没有超过最大重试次数除外);
3、app层延迟服务出现超时错误,但超过最大重试次数。
一般应进行交易失败的后续处理。
挂起 app层延迟服务放入等待队列,直到被触发期间,都为挂起状态。 一般不做处理,等待交易成功或失败后再行处理。

接收到的交易日志,为Map数据类型,约定如下:

KEY键 值类型 值说明
type int 0-成功,1-失败,2-挂起
request BizMessage 聚合服务的最初请求报文
response BizMessage 聚合服务的最终响应报文

四、sink层

sink层主要包括二类模块:第三方接入模块和交易处理模块:

  • 第三方系统接入模块,会涉及第三方应用的调用,包括同步调用和异步调用。
  • 交易处理模块就是一种特殊的Sink模块,是通过connector接收RestController发来的服务请求,并进行一系列内部交易处理后返回,不涉及第三方应用的调用。

1. pom.xml配置

在pom.xml文件中添加sink-spring-boot-starter依赖:

        <dependency>
            <groupId>com.bizmda.bizsip</groupId>
            <artifactId>sink-spring-boot-starter</artifactId>
          	<version>1.0-SNAPSHOT</version>
        </dependency>

2. application.yml配置

领域层Sink应用的application.yml中的主要相关参数:

配置项 配置说明
bizsip.config-path Biz-SIP中间件配置文件根目录位置
bizsip.sink-id(可选) sink-spring-boot-starter中自带通用SinkController对应的sink-id参数。
只有启用SinkController才会配置该参数

SinkController为通用的Sink处理模块,能接收来自Integrator发来服务,并根据sink.yml中Sink的配置自动进行相应处理,如果没有特殊的处理需求,一般建议直接采用SinkController即可。 要启用SinkController,需要在应用启动类中,加上对应的Spring类扫描路径(“com.bizmda.bizsip.sink.controller”)​:

@SpringBootApplication
@MapperScan("com.xbank.infrastructure.db.customer.mapper")
@ComponentScan(basePackages={"cn.hutool.extra.spring",
        "com.bizmda.bizsip.sink.controller", "com.xbank"})
@Import(cn.hutool.extra.spring.SpringUtil.class)
public class CustomerSinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(CustomerSinkApplication.class, args);
    }
}

例子:

server:
  port: 8001
spring:
  application:
    name: customer-sink

  cloud:
    nacos:
      discovery:
        server-addr: bizsip-nacos:8848

  datasource:
    url: jdbc:mysql://bizsip-mysql/xbank?autoReconnect=true
    username: root
    password: bizsip123456
    driver-class-name: com.mysql.cj.jdbc.Driver

bizsip:
  config-path: /Users/shizhengye/IdeaProjects/xbank/xbank-app/config
  sink-id: customer-sink

logging:
  level:
    com.bizmda.bizsip: debug
    com.xbank: trace

3. 接收app层服务请求

Sink模块是通过RestController,来接收app层发来的服务请求,主要有二种方式:

  • 一是可以直接用sink-spring-boot-starter包中提供的内置RestController,在application启动时扫描com.bizmda.bizsip.sink.controller类,并直接在sink.yml中的url配置“服务名/sink”:
- id: customer-sink
  type: rest
  url: http://customer-sink/sink
  converter:
    type: simple-json
  connector:
    type: bean
    class-name: com.xbank.sink.customer.service.CustomerSinkService
    spring-bean: true

application.yml中应配置bizsip.sink-id属性:

bizsip:
  sink-id: sample-sink-bean-sink

内置的RestController会根据配置的sink-id,自动选择进行消息打包,调用配置的connector,并对返回的消息进行解包并返回app层服务。

  • 二是可以直接写RestController,以获得更灵活的定制:
@RestController
public class Sink2Controller {
    private Converter converter = Converter.getSinkConverter("sink2");
    private Connector connector = Connector.getSinkConnector("sink2");

    @PostMapping(value = "/sink2", consumes = "application/json", produces = "application/json")
    public BizMessage<JSONObject> doService(@RequestBody BizMessage<JSONObject> inMessage, HttpServletResponse response) {
        log.debug("inMessage:{}", inMessage);
        try {
            byte[] packedMessage = this.converter.pack(inMessage.getData());
            byte[] returnMessage = this.connector.process(packedMessage);
            JSONObject jsonObject = this.converter.unpack(returnMessage);
            return BizMessage.buildSuccessMessage(inMessage,jsonObject);
        } catch (BizException e) {
            log.error("服务端适配器执行出错",e);
            return BizMessage.buildFailMessage(inMessage,e);
        }
    }
}

从上面的例子可以看出,在开发的RestController中,可以灵活地进行消息解包、调用配置的connector,最后消息打包返回。

4. sink模块打包和解包

不管采用平台内置的RestControlller,还是开发者自己写的RestController,都会进行报文消息的打包和解包。 下例是开发者自己写的RestController,包含了消息打包和消息解包操作:

@RestController
public class Sink2Controller {
    private Converter converter = Converter.getSinkConverter("sink2");
    private Connector connector = Connector.getSinkConnector("sink2");

    @PostMapping(value = "/sink2", consumes = "application/json", produces = "application/json")
    public BizMessage<JSONObject> doService(@RequestBody BizMessage<JSONObject> inMessage, HttpServletResponse response) {
        log.debug("inMessage:{}", inMessage);
        try {
            // 消息打包
            byte[] packedMessage = this.converter.pack(inMessage.getData());
            // 调用connector处理
            byte[] returnMessage = this.connector.process(packedMessage);
            // 消息解包
            JSONObject jsonObject = this.converter.unpack(returnMessage);
            return BizMessage.buildSuccessMessage(inMessage,jsonObject);
        } catch (BizException e) {
            log.error("服务端适配器执行出错",e);
            return BizMessage.buildFailMessage(inMessage,e);
        }
    }
}

在上例中,converter会根据sink2的消息格式配置,调用接口进行消息打包和解包。当然,应在sink.yml中配置指定sink的converter:

- id: sink2
  type: rest
  url: http://bizsip-sample-sink/sink2
  converter:
    type: simple-json
  connector:
    type: sink-bean
    class-name: com.bizmda.bizsip.sample.sink.controller.ActServer

如果开发者想在connector中进行报文处理,可以配置和实现JSONObjectSinkBeanInterface接口的connector,这样就可以直接把RestController中收到的平台JSON报文,不在RestController中进行消息格式转换处理,直接通过connector的process()传入平台JSON报文进行处理。

5. 支持Java开发的二种connector类型

Sink模块接收RestController发来的服务请求后,能调用connector进行第三方通讯和交易处理,connector类型包括netty、rabbitmq、bean、sink-bean等类型,其中支持Java开发的connector主要有以下二种:

  • bean:需要定义Interface接口类;
  • sink-bean:采用Biz-SIP标准的JSONObject来做入参、出参,交易处理模块需要实现JSONObjectSinkBeanInterface接口。

5.1 bean(Interface接口类交易处理)

采用bean类型connector,接口明确清晰,调用方一目了然,但需要调用双方提前约定Interface接口类。 在交易类型超过5个时,建议采用Command Executor(命令处理程序),分成多个xxxCmdExe类来进行处理,这些类应统一继承AbstractBeanCmdExe类。

@Service
public class AccountSinkService implements AccountSinkInterface {
    @Autowired
    private PayoutCmdExe payoutCmdExe;
    @Autowired
    private  PayoutCompensationCmdExe payoutCompensationCmdExe;
    @Autowired
    private GetAccountListByCustomerIdCmdExe getAccountListByCustomerIdCmdExe;

    @Override
    public List<Account> getAccountListByCustomerId(String customerId) {
        return this.getAccountListByCustomerIdCmdExe.getAccountListByCustomerId(customerId);
    }

    @Override
    public Account payout(String accountId, long amount) {
        return this.payoutCmdExe.payout(accountId,amount);
    }

    @Override
    public Account payoutCompensation(String accountId, long amount) {
        return this.payoutCompensationCmdExe.payoutCompensation(accountId,amount);
    }
}

Command Executor(命令处理程序)如下例:
@Service
public class GetAccountListByCustomerIdCmdExe extends AbstractBeanCmdExe {
    @Autowired
    private AccountService accountService;

    public List<Account> getAccountListByCustomerId(String customerId) {
        QueryWrapper<Account> queryWrapper=new QueryWrapper();
        queryWrapper.eq("customer_id",customerId);
        return this.accountService.list(queryWrapper);
    }
}

5.2 sink-bean(JSONObject出入参交易处理)

采用sink-bean类型connector,接口不用提前约定,扩展性强,但缺点也是很明显,调用双方缺乏接口约定的机制。 在交易类型超过5个时,建议采用Command Executor(命令处理程序),分成多个xxxCmdExe类来进行处理,这些类应统一继承AbstractSinkBeanCmdExe类,并统一在execute()中实现业务逻辑处理。

@Service
public class Payment2SinkService implements JSONObjectSinkBeanInterface {
    @Autowired
    private TimeoutCmdExe timeoutCmdExe;
    @Autowired
    private TimeoutAndFailCmdExe timeoutAndFailCmdExe;
    @Autowired
    private TimeoutAndSuccessCmdExe timeoutAndSuccessCmdExe;
    @Autowired
    private SuccessCmdExe successCmdExe;

    @Override
    public JSONObject process(JSONObject jsonObject) throws BizException {
        log.info("传入消息:\n{}", jsonObject.toString());
        AbstractSinkBeanCmdExe sinkBeanCmdExe;
        String tranMode = (String)jsonObject.get("tranMode");
        switch (tranMode) {
            case "timeout":
                // 收到交易后,永远返回超时
                return timeoutCmdExe.execute(jsonObject);
            case "3timeout-fail":
                // 收到交易后,前3次返回超时,第4次返回失败码
                return timeoutAndFailCmdExe.execute(jsonObject);
            case "3timeout-success":
                // 收到交易后,前3次返回超时,第4次成功返回原报文
                return timeoutAndSuccessCmdExe.execute(jsonObject);
            default:
                //其它情况,成功返回原报文
                return successCmdExe.execute(jsonObject);
        }
    }
}

Command Executor(命令处理程序)如下例:
@Service
public class SuccessCmdExe extends AbstractSinkBeanCmdExe {
    @Override
    public JSONObject execute(JSONObject jsonObject) throws BizException {
        String tranCode = (String)jsonObject.get("tranCode");
        log.info("交易:{},返回交易成功!",tranCode);
        return jsonObject;
    }
}

6. sink-bean的二种接口类型

sink-bean也有二种类型的接口,应根据场景来选择使用。

6.1 基于JSON的JSONObjectSinkBeanInterface接口

采用Biz-SIP标准的JSONObject来做入参、出参,一般不涉及到报文格式转换或报文格式转换由开发者自己编码完成,JSONObjectSinkBeanInterface接口约定如下:

public interface JSONObjectSinkBeanInterface {
    /**
     * JSONObject SinkBean服务调用接口
     * @param packMessage 传入的消息
     * @return 返回值
     * @throws BizException
     */
    public JSONObject process(JSONObject packMessage) throws BizException;
}

例子:
@Service
public class SampleSinkBeanService implements JSONObjectSinkBeanInterface {
    @Override
    public JSONObject process(JSONObject packMessage) throws BizException {
        String message = (String)packMessage.get("message");
        packMessage.set("message","sample-sink-bean-sink: Hello,"+message);
        return packMessage;
    }
}

6.2 基于byte[]的SinkBeanInterface接口

采用字节流byte[]来做入参、出参,一般是在RestController中进行消息类型的转换,把转换后的消息通过字节流byte[]传入,SinkBeanInterface接口约定如下:

public interface SinkBeanInterface {
    /**
     * Java服务调用接口
     * @param packMessage 传入的消息
     * @return 返回值
     * @throws BizException
     */
    public byte[] process(byte[] packMessage) throws BizException;
}

例子:
@Service
public class Payment1SinkService implements SinkBeanInterface {
    @Override
    public byte[] process(byte[] inMessage) throws BizException {
        log.info("传入消息:\n{}", BizUtils.buildHexLog(inMessage));
        return inMessage;
    }
}

附录:命名规则

用途 规范 解释
领域层服务(sink) xxx-sink
应用层服务(service) /sink/xxx 通过sink-service透传到领域层的sink服务
/app/yyy/… 封装的某一app层聚合服务
/delay/xxx/… 封装的延迟服务
适配层服务(source) xxx-source 涉及到消息格式转换配置时,才需要配置Source ID
上一页
下一页