Чистые транзакции в гексагональном Go

В современной микросервисной разработке очень популярна чистая архитектура (она же луковая). Этот подход ясно отвечает на много архитектурных вопросов, а также хорошо подходит для сервисов с небольшой кодовой базой. Другая приятная особенность чистой архитектуры состоит в том, что она отлично сочетается с Domain Driven Development - они отлично дополняют друг друга.

Одной из прикладных реализаций чистой архитектуры является гексагональная архитектура - подход, явно выделяющей слои, адаптеры и прочее. Данный подход заслуженно сыскал любовь среди разработчиков на Go - он не требует сложных абстракций или зубодробительных паттернов, а также почти ни в чем не противоречит сложной идиоматике языка - так называемому Go way.

Но есть проблема, которую я часто вижу во многих командах, адаптирующих гексагоны, и с которой я сам столкнулся и успешно решил - реализация транзакций базы данных в рамках DDD и пресловутого гексагона. Что у меня вышло я и расскажу в этой заметке.

Проблема высоких абстракций

Гексагональная архитектура предполагает инверсию зависимостей следующим образом: в центре всего находится модель данных, вокруг нее строится (и зависит от нее) доменная логика, на нее накладывается слой логики сервиса, а дальше идут адаптеры, скрытые за интерфейсами, называемыми портами. Это может варьироваться, но основная идея в том, что зависимости расходятся от центра к периферии, остается.

Для примера на минуточку представим, что мы делаем микросервис, реализующий продажу б/у автомобилей.

Представим, что одним из адаптеров является модуль взаимодействия с базой данных. Но не какой-нибудь случайной, а базой, поддерживающей ACID транзакции. Взаимодействие с базой с точки зрения кода реализовать довольно легко - оборачиваем доменные модели в репозитории, каждый репозиторий прячем за интерфейс (порт), а внутри адаптера его реализуем. Выглядеть такой порт может как-то так:

package port

import ...

// CarRepository car persistence repository
type CarRepository interface {
    UpsertCar(ctx context.Context, car *model.Car) error
    GetCar(ctx context.Context, id string) (*model.Car, error)
    GetCarsBatch(ctx context.Context, ids []string) ([]model.Car, error)
    GetCarsByTimePeriod(ctx context.Context, from, to time.Time) ([]model.Car, error)
    GetCarsByModel(ctx context.Context, model string) ([]model.Car, error)
    DeleteCar(ctx context.Context, id string) error
}

Со стороны доменной логики этот адаптер будет передаваться как DI через интерфейс.

package domain

import ...

type Car struct {
    carRepo port.CarRepository
}

func NewCar(carRepo port.CarRepository) &Car {
    return &Car{
        carRepo: carRepo,
    }
}

Для примера логика поиска авто по году выпуска будет такой.

func (c *Car) GetNewCars(ctx context.Context, from time.Time) ([]model.Car, error) {
   
    if err := c.someValidation(); err := nil {
        return nil, fmt.Errorf("invalid request: %v", err)
    }

    // read latest cars
    cars, err := c.carRepo.GetCarsByTimePeriod(ctx, from, time.Now())
    if err := nil {
        return nil, fmt.Errorf("newest cars read: %v", err)
    }

    return c.filterCars(cars), nil
}

Это довольно хорошо работает с простыми атомарными операциями, как создание, удаление или чтение. Но довольно часто возникает необходимость выполнить сложную логику в рамках одной транзакции БД. Я не буду расписывать примеры, вы и так их отлично знаете.

Проблема тут в том, что с точки зрения архитектуры транзакция является частью адаптера по работе с базой данных - она открывается и закрывается определенными командами (BEGIN, COMMIT или ROLLBACK в SQL), и имеет привязку к порожденной сущности - транзакции. Транзакция сама по себе обычно тоже не витает в облаках глобального скоупа программы, а явно привязана к сессии подключения к базе данных поверх TCP соединения. Поэтому мы не можем (да и не хотим) абстрактно объявить “начало транзакции” и “конец транзакции” в бизнесовом коде - внутри домена. При открытии транзакции у нас появляется некая сущность - транзакция - которую в дальнейшем нужно передать в адаптер для выполнения операций БД непосредственно в этой транзакции.

Здесь возникает проблема курицы и яйца. С одной стороны, адаптер требует, чтобы для каждого запроса в рамках транзакции была передана информация об этой самой транзакции - обычно используемая библиотека реализует транзакцию как некий объект, через который можно делать запросы. С другой стороны, слой домена или сервиса не может знать про реализацию адаптера в парадигме гексагона. Можно завернуть транзакцию в какой-нибудь интерфейс, этот интерфейс окажется монструозно огромным (с методами вроде Select, Insert, Delete и прочими - откройте любимую SQL библиотеку и посмотрите сколько там методов). Причем он не будет иметь никакого смысла для домена - эти методы будут использоваться внутри адаптера, где есть доступ к “незаабстракченой” транзакции.

Можно пойти иначе, и передать транзакцию как interface{}, а потом в адаптере через рефлексию привести к нужному типу, но я считаю такой подход несерьезным и негодным для продуктивного кода. Кроме того, очень не хочется замусоривать сигнатуру методов передачей дополнительно транзакции - ведь она не имеет прямого отношения к самому методу, а указывает на особенности всего процесса работы с бд в рамках операции. Что же делать?

Решение предметных реализаций

Теперь пару слов о контексте нашего решения. В поиске элегантной реализации я несколько раз сталкивался с решениями вроде UnitOfWork, представляющих транзакцию как некоторую бизнес-сущность (о которой знает ядро гексагона с бизнес-логикой). Действительно, транзакцию можно представить как некую бизнес-сущность - ведь бизнес логика может требовать атомарного и неконкурентного выполнения операции. Но проблема элегантных идей в неэлегантной реализации - абстрактные фабрики, рефлексия и некрасивая работа с методами самого адаптера.

Часто эти изощрения продиктованы желанием работать с несколькими базами данных, или иметь возможность переключаться с одной БД на другую, изменив лишь код адаптера (и не меняя бизнес логики).

Поняв, что это слишком “абстрактно”, да и в целом не отвечает go way, я вывел несколько ограничений нашего проекта, которые должны были упростить эту задачу.

Сервис работает только с одной БД

И это PosgreSQL. Ну действительно, часто ли вы переключаетесь между БД? Многие стараются писать некий обобщенный код для работы с generic SQL базой данных, однако есть ли в этом смысл? Практика показывает, что переход с одной SQL базы данных на другую все равно заставит вас перелопатить весь проект, а про переход с SQL на NoSQL или наоборот даже говорить не приходится.

Мы используем конкретную библиотеку для работы с БД

И это go-pg. Лично мне она очень нравится, как билдер запросов (нежели как ORM), и отличается хорошей производительностью. У нее есть одна особенность, о которой я скажу дальше, без которой мне пришлось бы повозиться для реализации задуманного. Но такой функционал есть и в других библиотеках, так что не спешите переписывать свой код.

К чему пришли в итоге

Поэтому я с чистым сердцем взял за основу транзакции в go-pg. Мне хотелось с одной стороны оставить сигнатуры методов репозитория чистыми (только контекст и параметры вызова метода), но при этом сделать решение идиоматичным с точки зрения Go.

В го есть прекрасный инструмент, который позволяет передавать утилитарные данные, которые не касаются вызова конкретного метода, но касаются контекста операции - context.Context. Часто туда попадает телеметрия, логгеры, идентификаторы идемпотентности и прочее. С моей точки зрения информация о транзакции отлично подходит под определение “утилитарных данных” - это некий модификатор процесса, который не влияет на логику напрямую, но оказывает косвенное влияние. От слов - к делу!

package postgres

import ...

type txKey struct{}

// injectTx injects transaction to context
func injectTx(ctx context.Context, tx *pg.Tx) context.Context {
	return context.WithValue(ctx, txKey{}, tx)
}

// extractTx extracts transaction from context
func extractTx(ctx context.Context) *pg.Tx {
	if tx, ok := ctx.Value(txKey{}).(*pg.Tx); ok {
		return tx
	}
	return nil
}

Первый шаг - добавляем методы для добавления транзакции в контекст и извлечения транзакции из контекста. Методы неэкспортируемые, то есть вызывать их можно только внутри адаптера. Обратите внимание - здесь используется транзакция из пакета go-pg безо всяких оберток или абстракций. Можем себе позволить это внутри адаптера!

Далее, нам нужно научить сам адаптер (репозиторий) работать с транзакцией. И вот тут нам понадобится возможность, которая есть в go-pg, но нет в некоторых других библиотеках, например, в sqlx. Это - единый интерфейс для методов запросов, выполняемых библиотекой как в транзакции, так и без нее. Это Select, Insert, Delete и прочие - у них должна быть одинаковая сигнатура для транзакции и без, чтобы можно было вынести за интерфейс. Если нет - придется написать обертку. В случае go-pg и у объекта подключения к БД, и у транзакции есть метод ModelContext(c context.Context, model ...interface{}) *Query, который мы и использовали.

Получилась небольшая оберточка, которая проверяет, есть ли в контексте транзакция. Если есть - возвращает Query из транзакции, а если нет - возвращает Query из коннекта к БД.

package postgres

import ...

// model returns query model with context with or without transaction extracted from context
func (db *Database) model(ctx context.Context, model ...interface{}) *orm.Query {
	tx := extractTx(ctx)
	if tx != nil {
		return tx.ModelContext(ctx, model...)
	}
	return db.conn.ModelContext(ctx, model...)
}

Здесь Database - это непосредственно структура, реализующая CarRepository, в методах которой содержатся SQL запросы к PostgreSQL, а также коннект (пул конектов) к базе данных. Она может реализовывать и больше репозиториев, если у вас их много.

В итоге реализация метода, читающего машины из БД, будет выглядеть так:

package postgres

import ...

func (db *Database) GetCarsByTimePeriod(ctx context.Context, from, to time.Time) ([]model.Car, error) {
	var m []model.Car

	err := db.model(ctx, &m).
		Where("manufacture_date BETWEEN ? AND ?", from, to).
		Order("model").
		Select()
	if err != nil {
		return nil, err
	}

	return m, nil
}

При этом метод можно использовать как в транзакции, так и без нее - ни сигнатура, ни сам метод от этого не меняется. Причем решение, использовать транзакцию, или нет, принимает именно часть сервиса с бизнес-логикой. Давайте же посмотрим, как это сделано.

Транзакции в бизнесе и бизнес в транзакциях

Дело осталось за малым - реализовать метод создания транзакции внутри адаптера, который будет возвращать “заряженный” транзакцией контекст, добавить этот метод в интерфейс, вызывать его в бизнес логике и передавать во все вызовы репозитория, а в конце делать коммит или роллбэк.

Звучит логично, но как-то некрасиво. Может быть, в Go есть более элегантный инструмент?

И он есть! Это - замыкание. Реализуем метод, который позволит нам реализовать всю транзакцию, не отходя от кассы:

package postgres

import ...

// WithinTransaction runs function within transaction
//
// The transaction commits when function were finished without error
func (db *Database) WithinTransaction(ctx context.Context, tFunc func(ctx context.Context) error) error {
	// begin transaction
	tx, err := db.conn.Begin()
	if err != nil {
		return fmt.Errorf("begin transaction: %w", err)
	}

	defer func() {
		if errTx := tx.Close(); errTx != nil {
           log.Printf("close transaction: %v", err)
		}
	}()

	// run callback
	err = tFunc(injectTx(ctx, tx))
	if err != nil {
		// if error, rollback
		if errRollback := tx.Rollback(); errRollback != nil {
            log.Printf("rollback transaction: %v", errRollback)
		}
		return err
	}
	// if no error, commit
	if errCommit := tx.Commit(); errCommit != nil {
        log.Printf("commit transaction: %v", errCommit)
	}
	return nil
}

Метод принимает контекст и функцию, которую нужно выполнить в транзакции. На основе контекста создается контекст с транзакцией, и передается в функцию. Это позволяет также прервать выполнение функции при отмене родительского контекста - например, при graceful shutdown.

Далее если функция выполнена без ошибок, выполняется commit, в противном случае выполняется rollback, а ошибка возвращается из метода.

Этот метод выведем в отдельный порт - изоляцию нужно соблюдать!

package port

import ...

// Transactor runs logic inside a single database transaction
type Transactor interface {
	// WithinTransaction runs a function within a database transaction.
	//
	// Transaction is propagated in the context,
	// so it is important to propagate it to underlying repositories.
	// Function commits if error is nil, and rollbacks if not.
	// It returns the same error.
	WithinTransaction(context.Context, func(ctx context.Context) error) error
}

Добавим его через DI в домен:

package domain

import ...

type Car struct {
    carRepo port.CarRepository
    transactor port.Transactor
}

func NewCar(transactor port.Transactor, carRepo port.CarRepository) &Car {
    return &Car{
        carRepo: carRepo,
        transactor: transactor,
    }
}

Это позволяет нам совсем не заморачиваться по поводу транзакций внутри бизнес логики, и упростить транзакционные операции до следующего:

package domain

import ...

func (c *Car) BuyCar(ctx context.Context, id string, price int, owner model.Owner) error {

   if err := c.validateBuyer(ctx, price, owner); err != nil {
        return err
    }

    return c.transactor.WithinTransaction(ctx, func(txCtx context.Context) error {
		car, err := c.carRepo.GetCar(txCtx, id)
		if err != nil {
			return err
		}

		if err := c.validatePurchase(txCtx, car, owner); err != nil {
            return err
        }

        car.Owner = owner
        car.SellPrice = price
        car.SellDate = time.Now()

		if err := c.carRepo.UpsertCar(txCtx, car); err != nil {
			return err
		}

        log.Printf("car %s model % sold to %s for %d", 
            car.Id, car.Model, owner.Name, price)

		return nil
	})

}

Утешительные итоги

В результате мы получили:

  • простой с точки зрения слоя бизнес-логики механизм выполнения операций в транзакции;
  • изоляция уровней, абстракции не протекают;
  • отсутствие рефлексии, вся работа с транзакцией типизирована и отказоустойчива;
  • чистые методы репозиториев, нет нужды пробрасывать транзакцию в сигнатуру;
  • методы с запросами агностичны к наличию транзакции - если она есть, выполнятся в ней, если нет - напрямую в БД;
  • commit и rollback выполняются автоматически по результату выполнения функции. Никаких defer.
  • при панике выполнится rollback внутри tx.Close().

Этот подход применим к любой базе данных, поддерживающий ACID транзакции, при условии общего интерфейса для запросов как в транзакции, так и без него. При желании можно дописать свою обертку, если в любимой библиотеке этого нет.

Этот подход не применим в ситуации, когда вы работаете с несколькими БД в одном сервисе, и вам нужно связать две транзакции в одну. В этом случае я вам не завидую.

Возможно, где-то я отошел от принципов DDD или пренебрег концепциями гексагональной архитектуры, однако результат вышел простым, красивым и читабельным.

А как бы сделали вы? Приглашаю в комментарии для обсуждения идей и критики!

comments