Springcloud+Seata+nacos 分布式事务项目搭建 AT模式

Springcloud+Seata+nacos 分布式事务项目搭建 AT模式

前言 本文要求对Springcloud有一定了解,对分布式事务本身有一定认知,如果不了解微服务,建议先看看Spring Cloud的基本认识和使用Spring Cloud的基本教程,再回头学习本文 为什么会出现分布式事务 开发过程中,随着项目模块的增加以及分库分表的出现,传统事务已经无...

前言

本文要求对Springcloud有一定了解,对分布式事务本身有一定认知,如果不了解微服务,建议先看看Spring Cloud的基本认识和使用Spring Cloud的基本教程,再回头学习本文

为什么会出现分布式事务

开发过程中,随着项目模块的增加以及分库分表的出现,传统事务已经无法满足业务需求,如分库,由于有多个数据源,而数据库事务又是基于数据库层,所以如果只用数据库原生事务,会导致数据库A成功提交,数据库B回滚,导致数据不一致,又比如多模块下,常见的订单流程,订单服务成功提交订单,调用库存服务扣减库存,由于是链式调用,库存成功扣减,然后回到订单服务时,出现异常,导致订单回滚,但是此时库存却未回滚,也会导致数据不一致,所以这些情况都需要分布式事务来解决这个问题(当然一般开发中,我们常用的做法是能避免就尽量避免,实在避免不了才使用分布式事务,因为分布式事务不管怎么样,性能,一致性,原子性等都会收到影响)

分布式事务目前的几种方案

2PC(二阶段提交)

3PC(三阶段提交)

TCC(Try – Confirm – Cancel)

最终一致性(消息队列等方式)

最大努力通知(数据不要求强一致)

每种方案都各有优劣,具体采用何种方案还需要根据实际业务场景来使用

初识Seata

Seata 是一款阿里巴巴开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。详情可以去查看seata官方文档

初始Nacos

Nacos 是一款阿里巴巴开源的服务注册中心,配置中心,管理中心,以下是nacos官网简介:

Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。

Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。 Nacos 是构建以“服务”为中心的现代应用架构 (例如微服务范式、云原生范式) 的服务基础设施。

我们可以把nacos当作Eureka +Srpingcloud config的结合体,详情可以查去查看nacos官方文档

下载并部署Nacos

Nacos跟Eureka 不一样,Eureka是创建一个Eureka 的项目即可,但是Nacos需要专门下载Nacos的服务端,可以从下载地址去下载最新的稳定版,目前编写本文时最新版为2.0.0-ALPHA.2,下载好之后解压,进入bin目录,并根据系统执行相关的脚本启动服务,记得脚本后面加上参数

-m standalone

standalone代表着单机模式运行,非集群模式,由于本文只讲解单机模式,所以不用集群模式,集群模式还要配置其他东西,启动成功后浏览器访问http://localhost:8848/nacos,能看到登录界面,输入默认账号密码nacos(账号密码都是nacos),能进入首页,如下图

nacos图片

下载并部署Seata

下载并配置

从 https://github.com/seata/seata/releases,下载服务器软件包,将其解压缩,本文编写时最新版为1.4.0,进入conf目录,编辑registry.conf,配置Seata的registry和config为nacos模式,同时配置nacos相关配置,首先修改registry(注册中心)节点下的type值为nacos,然后配置register节点下的nacos节点。

旧配置:

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = ""
    password = ""
  }

新配置

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = "public"
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }

application 为Seata-server注册到nacos的应用名称,serverAddr 为nacos的地址,group为注册到nacos的应用分组,要使用seata,必须要seata-server跟其他使用使用seata分布式事务的应用在同一分组才行,namespace 为命名空间,我们使用默认的public,当然也可以创建其他命名空间来使用,username和password对应的事nacos的账号密码

修改config(配置中心)节点下type为nacos,同时修改config节点下的nacos节点

修改前配置

nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
  }

修改后

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = "public"
    group = "SEATA_GROUP"
    username = "nacos"
    password = "nacos"
  }

添加Seata配置
进入nacos里面,添加一个事务分组的配置(常规来说要添加很多配置,不过只要添加了事务分组配置,就可以满足基本运行要求)

点击添加按钮新增一个配置

添加按钮

添加如下配置:

添加配置

其中配置名称

service.vgroupMapping.my_test_tx_group

中的my_test_tx_group是我们自定义的名称,到时候需要配置项目中,可以根据实际情况配置,Group一定要跟之前Seata中配置的group对应上,内容default要跟之前Seata中的register节点下的cluster对应上,点击发布保存配置

完整Seata配置(此步骤非必须)

以上配置只是实现分布式事务的最基本的配置,==如果想要配置所有支持的参数配置==,可以从github下载配置文件config.txt,或者创建config.txt文件并复制写入下面代码(下面代码不是实时更新,所以最好还是去github下载)

transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroupMapping.my_test_tx_group=default
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
store.mode=file
store.publicKey=
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=username
store.db.password=password
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.mode=single
store.redis.single.host=127.0.0.1
store.redis.single.port=6379
store.redis.maxConn=10
store.redis.minConn=1
store.redis.maxTotal=100
store.redis.database=0
store.redis.password=
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

以上配置含义建议百度或者查看官方文档确定每个配置的含义,创建好配置文件后,需要导入到nacos中,目前官方支持2种脚本导入方式,一种是python,另外一种是linux脚本,可以去github下载或者直接复制自己创建,下载地址如下:python导入脚本和 linux导入脚本

其中linux导入脚本命令如下

nacos-config.sh -h 127.0.0.1 -p 8848  -g SEATA_GROUP -u nacos -w nacos

nacos-config.sh为脚本名字,如果有修改需要修改成对应的名称,-h后面是nacos的服务器地址,-p为nacos的端口地址,-u后面为nacos的用户名,-w后面为nacos的用户名密码,-g后面为配置的分组,也就是我们之前手动创建配置时填的Group对应的值。

注意:分组必须要在同一个分组内,也就是说Seata配置的分组和需要分布式事务的程序的分组以及Seata-server的分组必须要在同一个分组内

以上是完整配置的,但是如果只是想实现基本的分布式事务,可以忽略上面一步

创建表

首先Seata要处理事务,需要创建一个Seata事务的日志表(UNDO_LOG),==所有库里面都需要这个表(如果每个项目连接的库不一致,那么每个项目连接的库中则都需要此表)==

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

其次创建我们的业务表,这里为了方便演示,我创建2个非常简单的表,一个用户表,一个订单表,我们保存用户时,就调用订单服务保存一个订单,当用户id为偶数时,用户服务抛异常,触发回滚(表结构非常简陋且不符合逻辑,只为演示作)

CREATE TABLE `user` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(32) NOT NULL,
  `passwords` varchar(32) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8;
CREATE TABLE `trade` (
  `id` int NOT NULL AUTO_INCREMENT,
  `userid` int NOT NULL,
  `value` int NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

创建项目

创建一个user和trade的maven项目,其中pom依赖一致

   1.8Hoxton.SR92.2.3.RELEASE1.1.02.2.0.RELEASEorg.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-jdbcorg.springframework.bootspring-boot-starter-aoporg.springframework.bootspring-boot-starter-actuatorcom.alibabadruid-spring-boot-starter1.1.10org.mybatis.spring.bootmybatis-spring-boot-starter1.3.2mysqlmysql-connector-java8.0.15com.alibaba.cloudspring-cloud-starter-alibaba-nacos-discovery${spring.cloud.alibaba.version}org.springframework.cloudspring-cloud-starter-openfeigncom.alibaba.cloudspring-cloud-alibaba-seata${spring-cloud-alibaba-seata.version}io.seataseata-spring-boot-starter1.4.0org.springframework.cloudspring-cloud-dependencies${spring-cloud.version}pomimportcom.alibaba.cloudspring-cloud-alibaba-dependencies${spring.cloud.alibaba.version}pomimport

配置trade项目application.yml


server:
  port: 8083
spring:
  application:
    name: trade
  datasource:
    druid:
      url: jdbc:mysql://localhost/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=CTT&allowPublicKeyRetrieval=true
      username: root
      password: rayewang.
      driver-class-name: com.mysql.cj.jdbc.Driver

      max-active: 20
      initial-size: 1
      min-idle: 3
      max-wait: 60000
      pool-prepared-statements: true
      test-while-idle: true
      time-between-eviction-runs-millis: 60000
      min-evictable-idle-time-millis: 300000
      filters: stat,wall,stat,slf4j,default
      web-stat-filter:
        enabled: true
        url-pattern: /*
        exclusions: "*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico,/druid/*"
      stat-view-servlet:
        enabled: true
        url-pattern: /druid/*
        reset-enable: true
        login-username: Raye
        login-password: 123456
      filter:
        slf4j:
          enabled: true
          statement-create-after-log-enabled: false
          statement-log-enabled: false
          statement-executable-sql-log-enable: true
          statement-log-error-enabled: true
          result-set-log-enabled: false
  servlet:
    multipart:
      max-file-size: 10MB
      max-request-size: 100MB
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
        group: SEATA_GROUP
feign:
  client:
    config:
      default:
        connect-timeout: 10000 #单位毫秒
        read-timeout : 10000 #单位毫秒
  okhttp:
    enabled: true
  httpclient:
    enabled: true
debug: false
logging:
  level:
    druid:
      sql:
        Statement: DEBUG
nacos:
  group: SEATA_GROUP
  namespace: public
  # 配置中心地址
  server-addr: 127.0.0.1:8848
  seata:
    application: seata-server
    tx-service-group: my_test_tx_group

seata:
  enabled: true
  application-id: ${spring.application.name}
  tx-service-group: ${nacos.seata.tx-service-group}
  enable-auto-data-source-proxy: true
  config:
    # 指明类型
    type: nacos
    nacos:
      server-addr: ${nacos.server-addr}
      namespace: ${nacos.namespace}
      group: ${nacos.group}
      username: "nacos"
      password: "nacos"
  registry:
    type: nacos
    nacos:
      application: ${nacos.seata.application}
      server-addr: ${nacos.server-addr}
      namespace: ${nacos.namespace}
      group: ${nacos.group}
      username: "nacos"
      password: "nacos"

其中分布式事务核心配置为nacos节点和seata节点的内容,seata节点内容配置大致跟seata-server的配置一致,其中tx-service-group的值就是我们之前手动添加的配置的节点最后一个名称,user项目的application.yml跟trade基本一致,除了spring.application.name和端口不同

Springcloud的核心配置为spring.cloud节点下的内容,主要是配置nacos为注册发现中心以及应用的分组,要跟Seata配置的分组一致

业务流程

项目创建好之后,我们先梳理整体流程,首先调用user项目的服务保存用户,然后在保存用户服务中开启分布式事务,保存用户后调用trade服务保存订单,订单保存完毕之后,判断用户的id是否是偶数,如果是偶数则抛出一个异常,看保存的用户和订单信息是否正常回滚,如果是奇数则看数据是否正常保存

编写trade项目相关代码

以下只列出逻辑相关代码

TradeMapper.java

package wang.raye.nacos;

import org.apache.ibatis.annotations.Insert;
public interface TradeMapper {

    @Insert({"insert into trade(userid,`value`) values(#{userid},#{value})"})
    int insertTrade(int userid,int value);
}

TradeService.java

package wang.raye.nacos;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class TradeService {

    @Autowired
    private TradeMapper mapper;

    @Transactional(rollbackFor = Exception.class)
    public boolean saveTrade(int userid,int value){
        return mapper.insertTrade(userid,value) > 0;
    }
}

TradeController.java

package wang.raye.nacos;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TradeController {

    @Autowired
    private TradeService tradeService;

    @RequestMapping("trade/save")
    public boolean save(int userid,int value){
        return tradeService.saveTrade(userid,value);
    }
}

编写user相关代码

UserMapper.java

package wang.raye.nacos;

import org.apache.ibatis.annotations.Insert;

public interface UserMapper {

    @Insert({"insert user(id,name,passwords) values(#{id},#{name},#{passwords})"})
    int insert(int id,String name,String passwords);
}

UserService.java

package wang.raye.nacos;

import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class UserService {
    @Autowired
    private TradeService service;
    @Autowired
    private UserMapper mapper;

    @GlobalTransactional
    @Transactional(rollbackFor = Exception.class)
    public boolean save(int userid,String name,String passwords,int value){
        mapper.insert(userid,name,passwords);
        service.save(userid,value);
        if(userid % 2 == 0){
            throw new RuntimeException("不给保存双数id");
        }
        return true;
    }
}

其中这里主要用注解@GlobalTransactional来开启分布式事务,代码

if(userid % 2 == 0){
    throw new RuntimeException("不给保存双数id");
}

只是为了模拟一下发生异常的情况,看是否会正常回滚,TradeService为Springcloud调用trade项目的

TradeService.java

package wang.raye.nacos;

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;

@FeignClient("trade")
public interface TradeService {

    @RequestMapping("trade/save")
    boolean save(@RequestParam("userid") int userid,@RequestParam("value") int value);
}

UserController.java

package wang.raye.nacos;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class UserController {
    @Autowired
    private UserService service;

    @GetMapping("save")
    public String save(int id,String name,String passwords,int value){
        try {
            if (service.save(id, name, passwords, value)) {
                return "保存成功";
            }
            return "保存失败";
        }catch (Exception e){
            return e.getMessage();
        }
    }
}

测试

至此业务代码已经完成,跟没用分布式事务的代码只有一个注解的差异,所以应该还是很好理解的,启动2个项目,我们来测试一下,访问http://localhost:8082/save?id=16&name=raye&passwords=1234566&value=100(其中8082为user项目的端口),会提示不给保存双数id,并且保存的订单数据也会被删除掉,可以跟踪执行的sql,发现trade是先插入,然后再删除的,而用户是插入后回滚的,利用了本地事务,而将id改成奇数,发现能正常保存,说明AT模式的分布式事务已经搭建成功

0

评论0

鱼翔浅底,鹰击长空,驼走大漠
没有账号? 注册  忘记密码?