在 Golang 中实现类似于 Spring 中的模板事务

事务(TRANSACTION),是指一组操作的集合,这些操作要么全部成功,要么全部失败。其目的是在出现错误、系统崩溃或其他意外情况下,保证数据的一致性和完整性。

事务通常具有以下四个重要的特性,这些特性被统称为 ACID 属性:

  1. Atomicity(原子性):
    • 定义: 事务中的所有操作要么全部完成,要么全部不完成,任何一个操作失败都会导致整个事务的失败,并且事务的所有操作都会被回滚(撤销)。
    • 示例: 银行转账操作,如果从一个账户扣款后无法在另一个账户中存款,那么整个操作将回滚,不会执行任何更改。
  2. Consistency(一致性):
    • 定义: 事务只能把数据库从一种一致状态转换到另一种一致状态。在事务开始之前和结束之后,数据库的完整性约束没有被破坏。
    • 示例: 在一个事务中插入数据时,如果插入的数据违反了数据库的完整性约束(例如唯一约束),那么这个事务将失败,数据库将保持一致状态。
  3. Isolation(隔离性):
    • 定义: 事务的执行是隔离的,多个事务并发执行时,一个事务的执行不会受到其他事务的干扰。隔离性确保了并发事务的执行结果与按顺序执行的结果相同。
    • 示例: 两个用户同时购买同一件商品,隔离性确保每个用户看到的库存是正确的,避免超卖的情况。
  4. Durability(持久性):
    • 定义: 一旦事务提交,其结果将永久保存在数据库中,即使系统崩溃也不会丢失。
    • 示例: 即使在事务提交后立即发生系统崩溃,事务的结果也会保存在数据库中,重启系统后数据依然存在。

MYSQL 关系型数据库为例,事务的使用如下:

-- 开始事务
BEGIN TRANSACTION;

-- TODO 执行业务 1
-- TODO 执行业务 2
-- TODO 执行业务 3
-- ....

-- 提交事务
COMMIT;

-- 或者,回滚事务
ROLLBACK;

其中,BEGIN TRANSACTIONCOMMIT 以及 ROLLBACK 都是事务固定的模板代码,当代的大多数框架都会自动帮我们进行处理。

Spring 对事务的支持

Spring 对关系型数据库中的事务提供强大的支持,包括声明式事务、TransactionTemplate 模板事务等等。

@Transactional 声明式事务

实际开发中,最常用的就是通过 @Transactional 注解来声明事务方法。事务方法会在执行开始前自动开始事务,在方法结束后自动提交事务,在执行过程中如果遇到异常则自动回滚事务。

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Transactional // 为 public 方法开启事务
public class FooService {

    public void service () {
        // TODO 在事务中执行业务
    }

}

在类上注解 @Transactional,则会为当前类中所有 public 方法开启声明式事务。也可以单独注解在方法上,则会覆盖类上的 @Transactional 定义。

@Transactional 注解中还定义了其他的属性,例如:propagation 属性可以指定事务在多个方法之间的传播方式,isolation 属性则可以控制事务的隔离级别。更多的细节你可以参阅 “Spring Boot 应用中的事务处理

通过 @Transactional 注解,可以把我们从事务的模板代码中解脱出来,专注于业务逻辑。

TransactionTemplate 事务模板

Spring 还提供了一个编程式事务模板类,TransactionTemplate,用于在代码中显式地控制事务边界。

通过该模板类,我们可以在任何需要执行事务的地方执行事务方法,其核心的两个方法签名如下:

@Nullable
<T> T execute(TransactionCallback<T> action) throws TransactionException {}
void executeWithoutResult(Consumer<TransactionStatus> action) throws TransactionException;

如上,两个方法都是通过 回调 的方式来执行业务方法的。其中 execute 方法需要事务返回一个最终的结果,而 executeWithoutResult 方法则不需要任何返回值。

使用示例如下:

import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;

@Service
public class FooService {

    private final TransactionTemplate transactionTemplate;

    // 构造函数注入 TransactionTemplate
    public FooService(TransactionTemplate transactionTemplate) {
        super();
        this.transactionTemplate = transactionTemplate;
    }


    public void service () {
        
        // 开始执行事务方法
        this.transactionTemplate.execute((status) -> {
            // TODO 在事务中执行业务,返回结果
            return "Ok";
        });
        

        // 开始执行事务方法
        this.transactionTemplate.executeWithoutResult(status -> {
            // TODO 在事务中执行业务,不需要返回结果
        });

        // TODO 其他未在事务中的业务 ...
    }
}

与声明式注解事务一样,TransactionTemplate 也会自动帮我们开启事务、提交事务,在遇到异常时自动回滚事务,并且更加灵活。

Spring 事务总结

Spring 的事务支持主要是解决了两个问题。

  1. 事务模板,通过 AOP 来自动处理事务的开启、提交和回滚,我们可以专心于业务逻辑。
  2. 事务传播,通过 ThreadLocal 来绑定当前线程的事务,因此,即使是在多个事务方法相互嵌套调用的情况下,仍然可以保证使用的是同一个事务。

Golang 实现事务模板

Golang 这些年火得一塌糊涂,它以简单、高性能著称。

在 Golang 中使用事务

Golang 为关系型数据库也提供了一流的支持,要在 Golang 中使用事务也比较简单:

package main

import (
    "context"
    "database/sql"
    "errors"
    "fmt"
    _ "github.com/go-sql-driver/mysql" // 匿名导入数据库驱动
    "log/slog"
)

func main() {
    err := serviceDemo()
    if err != nil {
        slog.Error("service err:", slog.String("err", err.Error()))
    }
}

func serviceDemo() (err error) {
    // 打开连接
    db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", "root", "root", "localhost", 3306, "demo"))
    if err != nil {
        return
    }

    // 测试连接可用性
    if err = db.Ping(); err != nil {
        return
    }

    // 1,开启事务
    tx, err := db.BeginTx(context.Background(), &sql.TxOptions{
        Isolation: sql.LevelDefault, // 使用默认的隔离级别
        ReadOnly:  false,            // 非只读事务
    })
    if err != nil {
        return
    }
    // 4,方法结束后,始终回滚事务。在事务已提交的情况下,无任何影响。
    defer func(t *sql.Tx) {
        err = t.Rollback()
        if errors.Is(err, sql.ErrTxDone) {
            err = nil
        }
    }(tx)

    // 2,执行业务,更改数据
    result, err := tx.Exec("UPDATE `test` SET `title` = ? WHERE id = ?;", "springdoc.cn", 1)
    if err != nil {
        return
    }
    rowsAffected, err := result.RowsAffected()
    if err != nil {
        return
    }
    slog.Info("Exec Success", slog.Int64("RowsAffected", rowsAffected))

    // TODO 执行更多的业务

    // 3,业务执行完毕,提交事务
    err = tx.Commit()
    return
}

同样地,在 Golang 中使用事务也遵循如下步骤:

  1. 开启事务,获取到 sql.Tx 事务对象。
  2. 使用事务对象来执行 N 个业务。
  3. 在没有错误的情况下,提交事务。
  4. 如果出现错误,则回滚事务。

本身事务相关的模板代码就比较烦琐,更加上 Golang 饱受诟病的异常处理方式,使得事务代码看起来更加的复杂,我们急需实现一种类似于 Spring 中的事务处理方式,重点是要实现两个目标:

  1. 自动地处理模板代码,自动开启事务、提交事务、回滚事务。
  2. 可以在多个事务方法之间传播事务,保证多个方法都在同一个事务之中。

实现事务模板

如上所述,Spring 使用 AOP + ThreadLocal 为事务提供了强大的支持。而,Golang 中没有 AOP 这种东西,而且 Golang 采用了 Goroutine(协程)的并发模型,没有线程,所以更不可能有 ThreadLocal 这种概念。

但是,我们可以借鉴 Spring TransactionTemplate 的思想,通过回调方法来执行事务方法。统一地开始、提交事务,在事务方法返回的 errornil 时回滚事务,并通过 context.Context 上下文在多个事务方法之间传播事务。

创建 transaction 模块

transaction 包作为事务入口,自动控制事务的开启、提交和回滚,并且允许事务在多个事务方法之间传播!

package transaction

import (
    "context"
    "database/sql"
    "errors"
    "fmt"
    "log/slog"
)

const (
    // 在 Context 中存储 sql.Tx 的 Key
    ctxKeyTx = "__current_tx"
)

// DB 数据源
var DB *sql.DB

// Option 事务配置方法
type Option func(*sql.TxOptions)

// TxReadOnly 开启只读事务
var TxReadOnly = func(options *sql.TxOptions) {
    options.ReadOnly = true
}

// TxIsolationLevel 设置隔离级别
var TxIsolationLevel = func(level sql.IsolationLevel) Option {
    return func(options *sql.TxOptions) {
        options.Isolation = level
    }
}

// Execute 执行事务方法
func Execute[R any](serviceCall func(ctx context.Context) (R, error), options ...Option) (ret R, err error) {
    // 从数据库获取连接
    conn, err := DB.Conn(context.Background())
    slog.Info("[Transaction] Init DB Conn")
    if err != nil {
        return
    }
    defer func() {
        if r := recover(); r != nil {
            _ = conn.Close()
            panic(r)
        }

        // 最后关闭数据库连接
        if closeErr := conn.Close(); closeErr != nil {
            slog.Error("[Transaction] Close DB Conn Err", slog.String("err", closeErr.Error()))
            if err == nil {
                err = closeErr
            } else {
                err = errors.Join(closeErr, err)
            }
        }
    }()

    // 事务配置
    txOption := &sql.TxOptions{}
    for _, option := range options {
        option(txOption)
    }

    // 开启事务
    tx, err := conn.BeginTx(context.Background(), txOption)
    if err != nil {
        return
    }

    // 执行事务方法
    return run(serviceCall, tx)
}

// run 在事务中执行逻辑,自动提交。err != nil 或者 panic 自动回滚
func run[R any](service func(ctx context.Context) (R, error), tx *sql.Tx) (ret R, err error) {
    defer func() {
        if r := recover(); r != nil {
            _ = rollback(tx)
            panic(r)
        }

        if err != nil {
            // 回滚事务
            slog.Info("[Transaction] Rollback Tx")
            if rollbackErr := rollback(tx); rollbackErr != nil {
                err = errors.Join(rollbackErr, err)
            }
        } else {
            // 提交事务
            slog.Info("[Transaction] Commit Tx")
            if commitErr := commit(tx); commitErr != nil {
                err = commitErr
            }
        }
    }()

    // 存储事务到 context 中,执行业务
    ret, err = service(context.WithValue(context.Background(), ctxKeyTx, tx))

    return
}

// Tx 获取当前 Context 中绑定的事务对象
func Tx(ctx context.Context) *sql.Tx {
    ret := ctx.Value(ctxKeyTx)
    if ret == nil {
        panic(errors.New("context 中没有事务,请在事务上下文中调用"))
    }
    tx, ok := ret.(*sql.Tx)
    if !ok {
        panic(errors.New(fmt.Sprintf("%v 不是合法的 *sql.Tx 对象", ret)))
    }
    slog.Info("[Transaction] Get Concurrent Tx")
    return tx
}

// rollback 回滚事务
func rollback(tx *sql.Tx) error {
    err := tx.Rollback()
    if err != nil {
        if errors.Is(err, sql.ErrConnDone) {
            return nil
        }
        slog.Error("[Transaction] Rollback Tx Err", slog.String("err", err.Error()))
    }
    return err
}

// commit 提交事务
func commit(tx *sql.Tx) error {
    err := tx.Commit()
    if err != nil {
        slog.Error("[Transaction] Commit Tx Err", slog.String("err", err.Error()))
    }
    return err
}

如上,只有 100 多行代码,很简单。核心方法 Execute 的逻辑如下:

  1. transaction 模块的 Execute 方法的第一个参数要求传入一个回调方法(serviceCall),且回调方法的返回值,即最终 Execute 方法的返回值。
  2. 回调方法有一个 ctx 参数,两个返回值,第一个返回值是 R(泛型),即需要返回的业务数据,第二个返回值是 error,表示异常。
  3. 执行 Execute 方法,首先从 DB 数据源中获取一个连接,开启事务,并且根据可选的 Option 参数对事务进行配置。
  4. 开启事务后,把事务对象(sql.Tx)存储到 context.Context 中,传递给回调方法,执行回调。
  5. 回调方法执行时,就可以从 ctx 参数中获取到绑定的事务对象,并且可以传递给任何需要执行事务的方法。

整个 Execute 方法就是事务的边界,它会自动开启事务、提交事务、回滚事务。所有嵌套执行的事务方法,都应该调用 transaction 模块的 Tx 方法来获取与 context.Context 绑定的事务对象。

创建 service 模块

现在,创建一个简单的 service 业务模块进行演示:

package service

import (
    "app/transaction"
    "context"
    "log/slog"
)

type foo struct{}

var Foo = &foo{}

// Update 更新
func (f *foo) Update(ctx context.Context, id int64, title string) (err error) {
    // 从上下文获取到 Tx,执行业务
    result, err := transaction.Tx(ctx).Exec("UPDATE `test` SET `title` = ? WHERE id = ?;", title, id)
    if err != nil {
        return
    }
    rowsAffected, err := result.RowsAffected()
    if err == nil {
        slog.Info("Update Success", slog.Int64("RowsAffected", rowsAffected))
    }
    return
}

// Get 检索
func (f *foo) Get(ctx context.Context, id int64) (ret string, err error) {
    // 从上下文获取到 Tx,执行业务
    err = transaction.Tx(ctx).QueryRow("SELECT `title` FROM `test` WHERE id = ?;", id).Scan(&ret)
    return
}

// UpdateAndGet 先更新,在获取
func (f *foo) UpdateAndGet(ctx context.Context, id int64, title string) (ret string, err error) {
    // 先 Update
    err = f.Update(ctx, id, title)
    if err != nil {
        return
    }
    // 再检索
    return f.Get(ctx, id)
}

如上, Foo 有三个方法:

  1. Update,根据 id 更新标题 title,返回 error
  2. Get,根据 id 检索标题,返回 title 值和 error
  3. UpdateAndGet,先更新再检索。它先调用 Update 方法进行更新,然后调用 Get 方法获取更新后的结果并返回。

其中,UpdateGet 中凡是涉及到了数据库相关的操作,都是调用 transaction.Tx(ctx) 方法来获取到事务对象(sql.Tx),从而保证了这些方法之间的调用都是在同一个事务中。

测试

创建一个 main 函数进行测试。

调用多个事务方法

package main

import (
    "app/service"
    "app/transaction"
    "context"
    "database/sql"
    "fmt"
    _ "github.com/go-sql-driver/mysql" // 匿名导入数据库驱动
    "log/slog"
    "os"
)

func init() {
    // 日志配置,输出到 stdout
    slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
        AddSource: true, // 输出调用栈
        Level: slog.LevelDebug,
    })))

    // 初始化数据库
    db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", "root", "root", "localhost", 3306, "demo"))
    if err != nil {
        panic(err.Error())
    }

    // 测试连接可用性
    if err = db.Ping(); err != nil {
        panic(err.Error())
    }

    // 初始化模板的数据源
    transaction.DB = db
}

func main() {

    id := int64(1)

    ret, err := transaction.Execute(func(ctx context.Context) (ret string, err error) {
        // 调用第一个事务方法,执行更新
        err = service.Foo.Update(ctx, id, "Hi springdoc.cn")
        if err != nil {
            return
        }

        // 调用第二个事务方法,执行查询,并且返回结果
        return service.Foo.Get(ctx, id)
    }, transaction.TxIsolationLevel(sql.LevelReadCommitted)) // 设置隔离级别为读已提交

    if err != nil {
        slog.Error("Execute transaction error", slog.String("err", err.Error()))
        return
    }

    slog.Info("Update Success", slog.String("title", ret))
}

如上,首先在 init 方法中初始化数据源,和日志配置。然后在 main 方法中调用 transaction.Execute 方法,传递回调,在回调中调用多个业务方法,并且这些方法使用的都是同一个事务。

输出的日志如下,Execute 方法自动开启、提交了事务,两个业务方法都在同一个事务中,且最终返回的结果也符合预期:

time=2024-06-25T16:48:09.355+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:38 msg="[Transaction] Init DB Conn"
time=2024-06-25T16:48:09.371+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:113 msg="[Transaction] Get Concurrent Tx"
time=2024-06-25T16:48:09.371+08:00 level=INFO source=D:/golang-app/app/service/foo.go:22 msg="Update Success" RowsAffected=0
time=2024-06-25T16:48:09.371+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:113 msg="[Transaction] Get Concurrent Tx"
time=2024-06-25T16:48:09.372+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:91 msg="[Transaction] Commit Tx"
time=2024-06-25T16:48:09.372+08:00 level=INFO source=D:/golang-app/app/main.go:56 msg="Update Success" title="Hi springdoc.cn"

嵌套调用事务方法

init 方法不变,main 方法如下。

func main() {

    id := int64(1)

    ret, err := transaction.Execute(func(ctx context.Context) (ret string, err error) {
        // 调用 UpdateAndGet 方法,先更新,再检索
        return service.Foo.UpdateAndGet(ctx, id, "Hello springdoc.cn")
    }, transaction.TxIsolationLevel(sql.LevelReadCommitted)) // 设置隔离级别为读已提交

    if err != nil {
        slog.Error("Execute transaction error", slog.String("err", err.Error()))
        return
    }

    slog.Info("Update Success", slog.String("title", ret))
}

在事务方法中,调用 UpdateAndGet 方法,它会在内部嵌套调用 UpdateGet 方法。

执行测试,输出日志如下,没任何问题:

time=2024-06-25T16:50:37.896+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:38 msg="[Transaction] Init DB Conn"
time=2024-06-25T16:50:37.910+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:113 msg="[Transaction] Get Concurrent Tx"
time=2024-06-25T16:50:37.911+08:00 level=INFO source=D:/golang-app/app/service/foo.go:22 msg="Update Success" RowsAffected=1
time=2024-06-25T16:50:37.911+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:113 msg="[Transaction] Get Concurrent Tx"
time=2024-06-25T16:50:37.912+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:91 msg="[Transaction] Commit Tx"
time=2024-06-25T16:50:37.917+08:00 level=INFO source=D:/golang-app/app/main.go:50 msg="Update Success" title="Hello springdoc.cn"

异常的情况

最后来测试异常情况下,事务会不会自动地回滚。

在上个例子基础上进行小小的修改。通过 Execute 方法的第二个参数设置事务为只读,由于 UpdateAndGet 方法会调用 Update 方法执行 UPDATE 语句,这在只读事务中会导致异常。

func main() {

    id := int64(1)

    ret, err := transaction.Execute(func(ctx context.Context) (ret string, err error) {
        // 调用 UpdateAndGet 方法,先更新,再检索
        return service.Foo.UpdateAndGet(ctx, id, "Hello springdoc.cn")
    }, transaction.TxReadOnly) // 设置事务为只读

    if err != nil {
        slog.Error("Execute transaction error", slog.String("err", err.Error()))
        return
    }
    slog.Info("Update Success", slog.String("title", ret))
}

执行测试,输出的日志如下:

time=2024-06-25T16:56:33.356+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:38 msg="[Transaction] Init DB Conn"
time=2024-06-25T16:56:33.372+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:113 msg="[Transaction] Get Concurrent Tx"
time=2024-06-25T16:56:33.372+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:85 msg="[Transaction] Rollback Tx"
time=2024-06-25T16:56:33.372+08:00 level=ERROR source=D:/golang-app/app/main.go:46 msg="Execute transaction error" err="Error 1792 (25006): Cannot execute statement in a READ ONLY transaction."

通过日志可以看到 Execute 自动地回滚事务,并且返回了原始的异常。

总结

通过一些简单的抽象,也可以在 Golang 中实现类似于 Spring 中的模板事务,这种方式可以减少事务相关的模板代码,并且还能保证多个业务方法在同一个事务中。

本文中使用的是原始的 sql 库,实际应用中大多数项目都会使用 ORM 框架。目前 Golang 流行的 ORM 框架都(应该)有 Session 的概念,理解了 transaction 模块中的设计思想后,只需要稍作修改就可以适配任何 ORM 框架了。