在 Golang 中实现类似于 Spring 中的模板事务
事务(TRANSACTION),是指一组操作的集合,这些操作要么全部成功,要么全部失败。其目的是在出现错误、系统崩溃或其他意外情况下,保证数据的一致性和完整性。
事务通常具有以下四个重要的特性,这些特性被统称为 ACID 属性:
- Atomicity(原子性):
- 定义: 事务中的所有操作要么全部完成,要么全部不完成,任何一个操作失败都会导致整个事务的失败,并且事务的所有操作都会被回滚(撤销)。
- 示例: 银行转账操作,如果从一个账户扣款后无法在另一个账户中存款,那么整个操作将回滚,不会执行任何更改。
- Consistency(一致性):
- 定义: 事务只能把数据库从一种一致状态转换到另一种一致状态。在事务开始之前和结束之后,数据库的完整性约束没有被破坏。
- 示例: 在一个事务中插入数据时,如果插入的数据违反了数据库的完整性约束(例如唯一约束),那么这个事务将失败,数据库将保持一致状态。
- Isolation(隔离性):
- 定义: 事务的执行是隔离的,多个事务并发执行时,一个事务的执行不会受到其他事务的干扰。隔离性确保了并发事务的执行结果与按顺序执行的结果相同。
- 示例: 两个用户同时购买同一件商品,隔离性确保每个用户看到的库存是正确的,避免超卖的情况。
- Durability(持久性):
- 定义: 一旦事务提交,其结果将永久保存在数据库中,即使系统崩溃也不会丢失。
- 示例: 即使在事务提交后立即发生系统崩溃,事务的结果也会保存在数据库中,重启系统后数据依然存在。
以 MYSQL 关系型数据库为例,事务的使用如下:
-- 开始事务
BEGIN TRANSACTION;
-- TODO 执行业务 1
-- TODO 执行业务 2
-- TODO 执行业务 3
-- ....
-- 提交事务
COMMIT;
-- 或者,回滚事务
ROLLBACK;
其中,BEGIN TRANSACTION
、COMMIT
以及 ROLLBACK
都是事务固定的模板代码,当代的大多数框架都会自动帮我们进行处理。
Spring 对事务的支持
Spring 对关系型数据库中的事务提供强大的支持,包括声明式事务、TransactionTemplate
模板事务等等。
@Transactional 声明式事务
实际开发中,最常用的就是通过 @Transactional
注解来声明事务方法。事务方法会在执行开始前自动开始事务,在方法结束后自动提交事务,在执行过程中如果遇到异常则自动回滚事务。
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@Transactional // 为 public 方法开启事务
public class FooService {
public void service () {
// TODO 在事务中执行业务
}
}
在类上注解 @Transactional
,则会为当前类中所有 public
方法开启声明式事务。也可以单独注解在方法上,则会覆盖类上的 @Transactional
定义。
@Transactional
注解中还定义了其他的属性,例如:propagation
属性可以指定事务在多个方法之间的传播方式,isolation
属性则可以控制事务的隔离级别。更多的细节你可以参阅 “Spring Boot 应用中的事务处理”
通过 @Transactional
注解,可以把我们从事务的模板代码中解脱出来,专注于业务逻辑。
TransactionTemplate 事务模板
Spring 还提供了一个编程式事务模板类,TransactionTemplate
,用于在代码中显式地控制事务边界。
通过该模板类,我们可以在任何需要执行事务的地方执行事务方法,其核心的两个方法签名如下:
@Nullable
<T> T execute(TransactionCallback<T> action) throws TransactionException {}
void executeWithoutResult(Consumer<TransactionStatus> action) throws TransactionException;
如上,两个方法都是通过 回调 的方式来执行业务方法的。其中 execute
方法需要事务返回一个最终的结果,而 executeWithoutResult
方法则不需要任何返回值。
使用示例如下:
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;
@Service
public class FooService {
private final TransactionTemplate transactionTemplate;
// 构造函数注入 TransactionTemplate
public FooService(TransactionTemplate transactionTemplate) {
super();
this.transactionTemplate = transactionTemplate;
}
public void service () {
// 开始执行事务方法
this.transactionTemplate.execute((status) -> {
// TODO 在事务中执行业务,返回结果
return "Ok";
});
// 开始执行事务方法
this.transactionTemplate.executeWithoutResult(status -> {
// TODO 在事务中执行业务,不需要返回结果
});
// TODO 其他未在事务中的业务 ...
}
}
与声明式注解事务一样,TransactionTemplate
也会自动帮我们开启事务、提交事务,在遇到异常时自动回滚事务,并且更加灵活。
Spring 事务总结
Spring 的事务支持主要是解决了两个问题。
- 事务模板,通过 AOP 来自动处理事务的开启、提交和回滚,我们可以专心于业务逻辑。
- 事务传播,通过
ThreadLocal
来绑定当前线程的事务,因此,即使是在多个事务方法相互嵌套调用的情况下,仍然可以保证使用的是同一个事务。
Golang 实现事务模板
Golang 这些年火得一塌糊涂,它以简单、高性能著称。
在 Golang 中使用事务
Golang 为关系型数据库也提供了一流的支持,要在 Golang 中使用事务也比较简单:
package main
import (
"context"
"database/sql"
"errors"
"fmt"
_ "github.com/go-sql-driver/mysql" // 匿名导入数据库驱动
"log/slog"
)
func main() {
err := serviceDemo()
if err != nil {
slog.Error("service err:", slog.String("err", err.Error()))
}
}
func serviceDemo() (err error) {
// 打开连接
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", "root", "root", "localhost", 3306, "demo"))
if err != nil {
return
}
// 测试连接可用性
if err = db.Ping(); err != nil {
return
}
// 1,开启事务
tx, err := db.BeginTx(context.Background(), &sql.TxOptions{
Isolation: sql.LevelDefault, // 使用默认的隔离级别
ReadOnly: false, // 非只读事务
})
if err != nil {
return
}
// 4,方法结束后,始终回滚事务。在事务已提交的情况下,无任何影响。
defer func(t *sql.Tx) {
err = t.Rollback()
if errors.Is(err, sql.ErrTxDone) {
err = nil
}
}(tx)
// 2,执行业务,更改数据
result, err := tx.Exec("UPDATE `test` SET `title` = ? WHERE id = ?;", "springdoc.cn", 1)
if err != nil {
return
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return
}
slog.Info("Exec Success", slog.Int64("RowsAffected", rowsAffected))
// TODO 执行更多的业务
// 3,业务执行完毕,提交事务
err = tx.Commit()
return
}
同样地,在 Golang 中使用事务也遵循如下步骤:
- 开启事务,获取到
sql.Tx
事务对象。 - 使用事务对象来执行 N 个业务。
- 在没有错误的情况下,提交事务。
- 如果出现错误,则回滚事务。
本身事务相关的模板代码就比较烦琐,更加上 Golang 饱受诟病的异常处理方式,使得事务代码看起来更加的复杂,我们急需实现一种类似于 Spring 中的事务处理方式,重点是要实现两个目标:
- 自动地处理模板代码,自动开启事务、提交事务、回滚事务。
- 可以在多个事务方法之间传播事务,保证多个方法都在同一个事务之中。
实现事务模板
如上所述,Spring 使用 AOP + ThreadLocal
为事务提供了强大的支持。而,Golang 中没有 AOP 这种东西,而且 Golang 采用了 Goroutine(协程)的并发模型,没有线程,所以更不可能有 ThreadLocal
这种概念。
但是,我们可以借鉴 Spring TransactionTemplate
的思想,通过回调方法来执行事务方法。统一地开始、提交事务,在事务方法返回的 error
为 nil
时回滚事务,并通过 context.Context
上下文在多个事务方法之间传播事务。
创建 transaction 模块
transaction
包作为事务入口,自动控制事务的开启、提交和回滚,并且允许事务在多个事务方法之间传播!
package transaction
import (
"context"
"database/sql"
"errors"
"fmt"
"log/slog"
)
const (
// 在 Context 中存储 sql.Tx 的 Key
ctxKeyTx = "__current_tx"
)
// DB 数据源
var DB *sql.DB
// Option 事务配置方法
type Option func(*sql.TxOptions)
// TxReadOnly 开启只读事务
var TxReadOnly = func(options *sql.TxOptions) {
options.ReadOnly = true
}
// TxIsolationLevel 设置隔离级别
var TxIsolationLevel = func(level sql.IsolationLevel) Option {
return func(options *sql.TxOptions) {
options.Isolation = level
}
}
// Execute 执行事务方法
func Execute[R any](serviceCall func(ctx context.Context) (R, error), options ...Option) (ret R, err error) {
// 从数据库获取连接
conn, err := DB.Conn(context.Background())
slog.Info("[Transaction] Init DB Conn")
if err != nil {
return
}
defer func() {
if r := recover(); r != nil {
_ = conn.Close()
panic(r)
}
// 最后关闭数据库连接
if closeErr := conn.Close(); closeErr != nil {
slog.Error("[Transaction] Close DB Conn Err", slog.String("err", closeErr.Error()))
if err == nil {
err = closeErr
} else {
err = errors.Join(closeErr, err)
}
}
}()
// 事务配置
txOption := &sql.TxOptions{}
for _, option := range options {
option(txOption)
}
// 开启事务
tx, err := conn.BeginTx(context.Background(), txOption)
if err != nil {
return
}
// 执行事务方法
return run(serviceCall, tx)
}
// run 在事务中执行逻辑,自动提交。err != nil 或者 panic 自动回滚
func run[R any](service func(ctx context.Context) (R, error), tx *sql.Tx) (ret R, err error) {
defer func() {
if r := recover(); r != nil {
_ = rollback(tx)
panic(r)
}
if err != nil {
// 回滚事务
slog.Info("[Transaction] Rollback Tx")
if rollbackErr := rollback(tx); rollbackErr != nil {
err = errors.Join(rollbackErr, err)
}
} else {
// 提交事务
slog.Info("[Transaction] Commit Tx")
if commitErr := commit(tx); commitErr != nil {
err = commitErr
}
}
}()
// 存储事务到 context 中,执行业务
ret, err = service(context.WithValue(context.Background(), ctxKeyTx, tx))
return
}
// Tx 获取当前 Context 中绑定的事务对象
func Tx(ctx context.Context) *sql.Tx {
ret := ctx.Value(ctxKeyTx)
if ret == nil {
panic(errors.New("context 中没有事务,请在事务上下文中调用"))
}
tx, ok := ret.(*sql.Tx)
if !ok {
panic(errors.New(fmt.Sprintf("%v 不是合法的 *sql.Tx 对象", ret)))
}
slog.Info("[Transaction] Get Concurrent Tx")
return tx
}
// rollback 回滚事务
func rollback(tx *sql.Tx) error {
err := tx.Rollback()
if err != nil {
if errors.Is(err, sql.ErrConnDone) {
return nil
}
slog.Error("[Transaction] Rollback Tx Err", slog.String("err", err.Error()))
}
return err
}
// commit 提交事务
func commit(tx *sql.Tx) error {
err := tx.Commit()
if err != nil {
slog.Error("[Transaction] Commit Tx Err", slog.String("err", err.Error()))
}
return err
}
如上,只有 100 多行代码,很简单。核心方法 Execute
的逻辑如下:
transaction
模块的Execute
方法的第一个参数要求传入一个回调方法(serviceCall
),且回调方法的返回值,即最终Execute
方法的返回值。- 回调方法有一个
ctx
参数,两个返回值,第一个返回值是R
(泛型),即需要返回的业务数据,第二个返回值是error
,表示异常。 - 执行
Execute
方法,首先从DB
数据源中获取一个连接,开启事务,并且根据可选的Option
参数对事务进行配置。 - 开启事务后,把事务对象(
sql.Tx
)存储到context.Context
中,传递给回调方法,执行回调。 - 回调方法执行时,就可以从
ctx
参数中获取到绑定的事务对象,并且可以传递给任何需要执行事务的方法。
整个 Execute
方法就是事务的边界,它会自动开启事务、提交事务、回滚事务。所有嵌套执行的事务方法,都应该调用 transaction
模块的 Tx
方法来获取与 context.Context
绑定的事务对象。
创建 service 模块
现在,创建一个简单的 service
业务模块进行演示:
package service
import (
"app/transaction"
"context"
"log/slog"
)
type foo struct{}
var Foo = &foo{}
// Update 更新
func (f *foo) Update(ctx context.Context, id int64, title string) (err error) {
// 从上下文获取到 Tx,执行业务
result, err := transaction.Tx(ctx).Exec("UPDATE `test` SET `title` = ? WHERE id = ?;", title, id)
if err != nil {
return
}
rowsAffected, err := result.RowsAffected()
if err == nil {
slog.Info("Update Success", slog.Int64("RowsAffected", rowsAffected))
}
return
}
// Get 检索
func (f *foo) Get(ctx context.Context, id int64) (ret string, err error) {
// 从上下文获取到 Tx,执行业务
err = transaction.Tx(ctx).QueryRow("SELECT `title` FROM `test` WHERE id = ?;", id).Scan(&ret)
return
}
// UpdateAndGet 先更新,在获取
func (f *foo) UpdateAndGet(ctx context.Context, id int64, title string) (ret string, err error) {
// 先 Update
err = f.Update(ctx, id, title)
if err != nil {
return
}
// 再检索
return f.Get(ctx, id)
}
如上, Foo
有三个方法:
Update
,根据id
更新标题title
,返回error
。Get
,根据id
检索标题,返回title
值和error
。UpdateAndGet
,先更新再检索。它先调用Update
方法进行更新,然后调用Get
方法获取更新后的结果并返回。
其中,Update
和 Get
中凡是涉及到了数据库相关的操作,都是调用 transaction.Tx(ctx)
方法来获取到事务对象(sql.Tx
),从而保证了这些方法之间的调用都是在同一个事务中。
测试
创建一个 main
函数进行测试。
调用多个事务方法
package main
import (
"app/service"
"app/transaction"
"context"
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql" // 匿名导入数据库驱动
"log/slog"
"os"
)
func init() {
// 日志配置,输出到 stdout
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
AddSource: true, // 输出调用栈
Level: slog.LevelDebug,
})))
// 初始化数据库
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", "root", "root", "localhost", 3306, "demo"))
if err != nil {
panic(err.Error())
}
// 测试连接可用性
if err = db.Ping(); err != nil {
panic(err.Error())
}
// 初始化模板的数据源
transaction.DB = db
}
func main() {
id := int64(1)
ret, err := transaction.Execute(func(ctx context.Context) (ret string, err error) {
// 调用第一个事务方法,执行更新
err = service.Foo.Update(ctx, id, "Hi springdoc.cn")
if err != nil {
return
}
// 调用第二个事务方法,执行查询,并且返回结果
return service.Foo.Get(ctx, id)
}, transaction.TxIsolationLevel(sql.LevelReadCommitted)) // 设置隔离级别为读已提交
if err != nil {
slog.Error("Execute transaction error", slog.String("err", err.Error()))
return
}
slog.Info("Update Success", slog.String("title", ret))
}
如上,首先在 init
方法中初始化数据源,和日志配置。然后在 main
方法中调用 transaction.Execute
方法,传递回调,在回调中调用多个业务方法,并且这些方法使用的都是同一个事务。
输出的日志如下,Execute
方法自动开启、提交了事务,两个业务方法都在同一个事务中,且最终返回的结果也符合预期:
time=2024-06-25T16:48:09.355+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:38 msg="[Transaction] Init DB Conn"
time=2024-06-25T16:48:09.371+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:113 msg="[Transaction] Get Concurrent Tx"
time=2024-06-25T16:48:09.371+08:00 level=INFO source=D:/golang-app/app/service/foo.go:22 msg="Update Success" RowsAffected=0
time=2024-06-25T16:48:09.371+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:113 msg="[Transaction] Get Concurrent Tx"
time=2024-06-25T16:48:09.372+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:91 msg="[Transaction] Commit Tx"
time=2024-06-25T16:48:09.372+08:00 level=INFO source=D:/golang-app/app/main.go:56 msg="Update Success" title="Hi springdoc.cn"
嵌套调用事务方法
init
方法不变,main
方法如下。
func main() {
id := int64(1)
ret, err := transaction.Execute(func(ctx context.Context) (ret string, err error) {
// 调用 UpdateAndGet 方法,先更新,再检索
return service.Foo.UpdateAndGet(ctx, id, "Hello springdoc.cn")
}, transaction.TxIsolationLevel(sql.LevelReadCommitted)) // 设置隔离级别为读已提交
if err != nil {
slog.Error("Execute transaction error", slog.String("err", err.Error()))
return
}
slog.Info("Update Success", slog.String("title", ret))
}
在事务方法中,调用 UpdateAndGet
方法,它会在内部嵌套调用 Update
和 Get
方法。
执行测试,输出日志如下,没任何问题:
time=2024-06-25T16:50:37.896+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:38 msg="[Transaction] Init DB Conn"
time=2024-06-25T16:50:37.910+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:113 msg="[Transaction] Get Concurrent Tx"
time=2024-06-25T16:50:37.911+08:00 level=INFO source=D:/golang-app/app/service/foo.go:22 msg="Update Success" RowsAffected=1
time=2024-06-25T16:50:37.911+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:113 msg="[Transaction] Get Concurrent Tx"
time=2024-06-25T16:50:37.912+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:91 msg="[Transaction] Commit Tx"
time=2024-06-25T16:50:37.917+08:00 level=INFO source=D:/golang-app/app/main.go:50 msg="Update Success" title="Hello springdoc.cn"
异常的情况
最后来测试异常情况下,事务会不会自动地回滚。
在上个例子基础上进行小小的修改。通过 Execute
方法的第二个参数设置事务为只读,由于 UpdateAndGet
方法会调用 Update
方法执行 UPDATE
语句,这在只读事务中会导致异常。
func main() {
id := int64(1)
ret, err := transaction.Execute(func(ctx context.Context) (ret string, err error) {
// 调用 UpdateAndGet 方法,先更新,再检索
return service.Foo.UpdateAndGet(ctx, id, "Hello springdoc.cn")
}, transaction.TxReadOnly) // 设置事务为只读
if err != nil {
slog.Error("Execute transaction error", slog.String("err", err.Error()))
return
}
slog.Info("Update Success", slog.String("title", ret))
}
执行测试,输出的日志如下:
time=2024-06-25T16:56:33.356+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:38 msg="[Transaction] Init DB Conn"
time=2024-06-25T16:56:33.372+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:113 msg="[Transaction] Get Concurrent Tx"
time=2024-06-25T16:56:33.372+08:00 level=INFO source=D:/golang-app/app/transaction/transaction.go:85 msg="[Transaction] Rollback Tx"
time=2024-06-25T16:56:33.372+08:00 level=ERROR source=D:/golang-app/app/main.go:46 msg="Execute transaction error" err="Error 1792 (25006): Cannot execute statement in a READ ONLY transaction."
通过日志可以看到 Execute
自动地回滚事务,并且返回了原始的异常。
总结
通过一些简单的抽象,也可以在 Golang 中实现类似于 Spring 中的模板事务,这种方式可以减少事务相关的模板代码,并且还能保证多个业务方法在同一个事务中。
本文中使用的是原始的 sql
库,实际应用中大多数项目都会使用 ORM 框架。目前 Golang 流行的 ORM 框架都(应该)有 Session 的概念,理解了 transaction
模块中的设计思想后,只需要稍作修改就可以适配任何 ORM 框架了。