Distributed Transactions in Microservices with Spring Boot
When I’m talking about microservices with other people they are often asking me about an approach to distributed transactions. My advice is always the same – try to completely avoid distributed transactions in your microservices architecture. It is a very complex process with a lot of moving parts that can fail. That’s why it does not fit the nature of microservices-based systems.
However, if for any reason you require to use distributed transactions, there are two popular approaches for that: Two Phase Commit Protocol and Eventual Consistency and Compensation also known as Saga pattern. You can read some interesting articles about it online. Most of them are discussing theoretical aspects related two those approaches, so in this article, I’m going to present the sample implementation in Spring Boot. It is worth mentioning that there are some ready implementations of Saga pattern like support for complex business transactions provided by Axon Framework. The documentation of this solution is available here: https://docs.axoniq.io/reference-guide/implementing-domain-logic/complex-business-transactions.
Example
The source code with sample applications is as usual available on GitHub in the repository: https://github.com/piomin/sample-spring-microservices-transactions.git.
Architecture
First, we need to add a new component to our system. It is responsible just for managing distributed transactions across microservices. That element is described as transaction-server on the diagram below. We also use another popular component in microservices-based architecture discovery-server
. There are three applications: order-service
, account-service
and product-service
. The application order-service
is communicating with account-service
and product-service
. All these applications are using Postgres database as a backend store. Just for simplification I have run a single database with multiple tables. In a normal situation we would have a single database per each microservice.
Now, we will consider the following situation (it is visualized on the diagram below). The application order-service
is creating an order, storing it in the database, and then starting a new distributed transaction (1). After that, it is communicating with application product-service
to update the current number of stored products and get their price (2). At the same time product-service
is sending information to transaction-server
that it is participating in the transaction (3). Then order-service
is trying to withdraw the required funds from the customer account and transfer them into another account related to a seller (4). Finally, we are rolling back the transaction by throwing an exception inside the transaction method from order-service
(6). This rollback should cause a rollback of the whole distributed transaction.
Building transaction server
We are starting implementation from transaction-server
. A transaction server is responsible for managing distributed transactions across all microservices in our sample system. It exposes REST API available for all other microservices for adding new transactions and updating their status. It also sends asynchronous broadcast events after receiving transaction confirmation or rollback from a source microservice. It uses RabbitMQ message broker for sending events to other microservices via topic exchange. All other microservices are listening for incoming events, and after receiving them they are committing or rolling back transactions. We can avoid using a message broker for exchanging events and use communication over HTTP endpoints, but that makes sense only if we have a single instance of every microservice. Here’s the picture that illustrates the currently described architecture.
Let’s take a look on the list of required dependencies. It would be pretty the same for other applications. We need spring-boot-starter-amqp
for integration with RabbitMQ, spring-boot-starter-web
for exposing REST API over HTTP, spring-cloud-starter-netflix-eureka-client
for integration with Eureka discovery server and some basic Kotlin libraries.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
In the main class we are defining a topic exchange for events sent to microservices. The name of exchange is trx-events
, and it is automatically created on RabbitMQ after application startup.
@SpringBootApplication
class TransactionServerApp {
@Bean
fun topic(): TopicExchange = TopicExchange("trx-events")
}
fun main(args: Array) {
runApplication(*args)
}
Here are domain model classes used by a transaction server. The same classes are used by the microservices during communication with transaction-server
.
data class DistributedTransaction(var id: String? = null,var status: DistributedTransactionStatus,
val participants: MutableList<DistributedTransactionParticipant> = mutableListOf())
class DistributedTransactionParticipant(val serviceId: String, var status: DistributedTransactionStatus)
enum class DistributedTransactionStatus {
NEW, CONFIRMED, ROLLBACK, TO_ROLLBACK
}
Here’s the controller class. It is using a simple in-memory implementation of repository and RabbitTemplate
for sending events to RabbitMQ. The HTTP API provides methods for adding new transaction, finishing existing transaction with a given status (CONFIRM
or ROLLBACK
), searching transaction by id
and adding participants (new services) into a transaction.
@RestController
@RequestMapping("/transactions")
class TransactionController(val repository: TransactionRepository,
val template: RabbitTemplate) {
@PostMapping
fun add(@RequestBody transaction: DistributedTransaction): DistributedTransaction =
repository.save(transaction)
@GetMapping("/{id}")
fun findById(@PathVariable id: String): DistributedTransaction? = repository.findById(id)
@PutMapping("/{id}/finish/{status}")
fun finish(@PathVariable id: String, @PathVariable status: DistributedTransactionStatus) {
val transaction: DistributedTransaction? = repository.findById(id)
if (transaction != null) {
transaction.status = status
repository.update(transaction)
template.convertAndSend("trx-events", DistributedTransaction(id, status))
}
}
@PutMapping("/{id}/participants")
fun addParticipant(@PathVariable id: String,
@RequestBody participant: DistributedTransactionParticipant) =
repository.findById(id)?.participants?.add(participant)
@PutMapping("/{id}/participants/{serviceId}/status/{status}")
fun updateParticipant(@PathVariable id: String,
@PathVariable serviceId: String,
@PathVariable status: DistributedTransactionStatus) {
val transaction: DistributedTransaction? = repository.findById(id)
if (transaction != null) {
val index = transaction.participants.indexOfFirst { it.serviceId == serviceId }
if (index != -1) {
transaction.participants[index].status = status
template.convertAndSend("trx-events", DistributedTransaction(id, status))
}
}
}
}
Handling transactions in downstream services
Let’s analyze how our microservices are handling transactions on the example of account
. Here’s the implementation of AccountService
that is called by the controller for transfering funds from/to account. All methods here are @Transactional
and here we need an attention – @Async
. It means that each method is running in a new thread and is processing asynchronously. Why? That’s a key concept here. We will block the transaction in order to wait for confirmation from transaction-server
, but the main thread used by the controller will not be blocked. It returns the response with the current state of Account
immediately.
@Service
@Transactional
@Async
class AccountService(val repository: AccountRepository,
var applicationEventPublisher: ApplicationEventPublisher) {
fun payment(id: Int, amount: Int, transactionId: String) =
transfer(id, amount, transactionId)
fun withdrawal(id: Int, amount: Int, transactionId: String) =
transfer(id, (-1) * amount, transactionId)
private fun transfer(id: Int, amount: Int, transactionId: String) {
val accountOpt: Optional<Account> = repository.findById(id)
if (accountOpt.isPresent) {
val account: Account = accountOpt.get()
account.balance += amount
applicationEventPublisher.publishEvent(AccountTransactionEvent(transactionId, account))
repository.save(account)
}
}
}
Here’s the implementation of @Controller
class. As you see it is calling methods from AccountService
, that are being processed asynchronously. The returned Account
object is taken from EventBus
bean. This bean is responsible for exchanging asynchronous events within the application scope. En event is sent by the AccountTransactionListener
bean responsible for handling Spring transaction events.
@RestController
@RequestMapping("/accounts")
class AccountController(val repository: AccountRepository,
val service: AccountService,
val eventBus: EventBus) {
@PostMapping
fun add(@RequestBody account: Account): Account = repository.save(account)
@GetMapping("/customer/{customerId}")
fun findByCustomerId(@PathVariable customerId: Int): List<Account> =
repository.findByCustomerId(customerId)
@PutMapping("/{id}/payment/{amount}")
fun payment(@PathVariable id: Int, @PathVariable amount: Int,
@RequestHeader("X-Transaction-ID") transactionId: String): Account {
service.payment(id, amount, transactionId)
return eventBus.receiveEvent(transactionId)!!.account
}
@PutMapping("/{id}/withdrawal/{amount}")
fun withdrawal(@PathVariable id: Int, @PathVariable amount: Int,
@RequestHeader("X-Transaction-ID") transactionId: String): Account {
service.withdrawal(id, amount, transactionId)
return eventBus.receiveEvent(transactionId)!!.account
}
}
The event object exchanged between bean is very simple. It contains an id of transaction and the current Account
object.
class AccountTransactionEvent(val transactionId: String, val account: Account)
Finally, let’s take a look at the implementation of AccountTransactionListener
bean responsible for handling transactional events. We are using Spring @TransactionalEventListener
for annotating methods that should handle incoming events. There are 4 possible event types to handle: BEFORE_COMMIT
, AFTER_COMMIT
, AFTER_ROLLBACK
and AFTER_COMPLETION
. There is one very important thing in @TransactionalEventListener
, which may be not very intuitive. It is being processed in the same thread as the transaction. So if you would do something that should not block the thread with transaction you should annotate it with @Async
. However, in our case this behaviour is required, since we need to block a transactional thread until we receive a confirmation or rollback from transaction-server
for a given transaction. These events are sent by transaction-server
through RabbitMQ, and they are also exchanged between beans using EventBus
. If the status of the received event is different than CONFIRMED
we are throwing the exception to rollback transaction.
The AccountTransactionListener
is also listening on AFTER_ROLLBACK
and AFTER_COMPLETION
. After receiving such an event type it is changing the status of the transaction by calling endpoint exposed by transaction-server
.
@Component
class AccountTransactionListener(val restTemplate: RestTemplate,
val eventBus: EventBus) {
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
@Throws(AccountProcessingException::class)
fun handleEvent(event: AccountTransactionEvent) {
eventBus.sendEvent(event)
var transaction: DistributedTransaction? = null
for (x in 0..100) {
transaction = eventBus.receiveTransaction(event.transactionId)
if (transaction == null)
Thread.sleep(100)
else break
}
if (transaction == null || transaction.status != DistributedTransactionStatus.CONFIRMED)
throw AccountProcessingException()
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
fun handleAfterRollback(event: AccountTransactionEvent) {
restTemplate.put("http://transaction-server/transactions/transactionId/participants/{serviceId}/status/{status}",
null, "account-service", "TO_ROLLBACK")
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
fun handleAfterCompletion(event: AccountTransactionEvent) {
restTemplate.put("http://transaction-server/transactions/transactionId/participants/{serviceId}/status/{status}",
null, "account-service", "CONFIRM")
}
}
Here’s the implementation of the bean responsible for receiving asynchronous events from a message broker. As you see after receiving such an event it is using EventBus
to forward that event to other beans.
@Component
class DistributedTransactionEventListener(val eventBus: EventBus) {
@RabbitListener(bindings = [
QueueBinding(exchange = Exchange(type = ExchangeTypes.TOPIC, name = "trx-events"),
value = Queue("trx-events-account"))
])
fun onMessage(transaction: DistributedTransaction) {
eventBus.sendTransaction(transaction)
}
}
Integration with database
Of course our application is using Postgres as a backend store, so we need to provide integration. In fact, that is the simplest step of our implementation. First we need to add the following 2 dependencies. We will use Spring Data JPA for integration with Postgres.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
Our entity is very simple. Besides the id
field it contains two fields: customerId
and balance
.
@Entity
data class Account(@Id @GeneratedValue(strategy = GenerationType.AUTO) val id: Int,
val customerId: Int,
var balance: Int)
We are using the well-known Spring Data repository pattern.
interface AccountRepository: CrudRepository<Account, Int> {
fun findByCustomerId(id: Int): List<Account>
}
Here’s the suggested list of configuration settings.
spring:
application:
name: account-service
datasource:
url: jdbc:postgresql://postgresql:5432/trx
username: trx
password: trx
hikari:
connection-timeout: 2000
initialization-fail-timeout: 0
jpa:
database-platform: org.hibernate.dialect.PostgreSQLDialect
hibernate:
ddl-auto: create
show-sql: true
properties:
hibernate:
format_sql: true
rabbitmq:
host: rabbitmq
port: 5672
connection-timeout: 2000
Building order-service
Ok, we have already finished the implementation of transaction-server
, and two microservices account-service
and product-service
. Since the implementation of product-service
is very similar to account-service
, I have explained everything on the example of account-service
. Now, we may proceed to the last part – the implementation of order-service
. It is responsible for starting a new transaction and marking it as finished. It also may finish it with rollback.Of course, rollback events may be sent by another two applications as well.
The implementation of @Controller
class is visible below. I’ll describe it step by step. We are starting a new distributed transaction by calling POST /transactions
endpoint exposed by transaction-server
(1). Then we are storing a new order in database (2). When we are calling a transactional method from downstream service we need to set HTTP header X-Transaction-ID
. The first transactional method that is called here is PUT /products/{id}/count/{count}
(3). It updates the number of products in the store and calculates a final price (4). In the step it is calling another transaction method – this time from account-service
(5). It is responsible for withdrawing money from customer accounts. We are enabling Spring transaction events processing (6). In the last step we are generating a random number, and then basing on its value application is throwing an exception to rollback transaction (7).
@RestController
@RequestMapping("/orders")
class OrderController(val repository: OrderRepository,
val restTemplate: RestTemplate,
var applicationEventPublisher: ApplicationEventPublisher) {
@PostMapping
@Transactional
@Throws(OrderProcessingException::class)
fun addAndRollback(@RequestBody order: Order) {
var transaction = restTemplate.postForObject("http://transaction-server/transactions",
DistributedTransaction(), DistributedTransaction::class.java) // (1)
val orderSaved = repository.save(order) // (2)
val product = updateProduct(transaction!!.id!!, order) // (3)
val totalPrice = product.price * product.count // (4)
val accounts = restTemplate.getForObject("http://account-service/accounts/customer/{customerId}",
Array<Account>::class.java, order.customerId)
val account = accounts!!.first { it.balance >= totalPrice}
updateAccount(transaction.id!!, account.id, totalPrice) // (5)
applicationEventPublisher.publishEvent(OrderTransactionEvent(transaction.id!!)) // (6)
val r = Random.nextInt(100) // (7)
if (r % 2 == 0)
throw OrderProcessingException()
}
fun updateProduct(transactionId: String, order: Order): Product {
val headers = HttpHeaders()
headers.set("X-Transaction-ID", transactionId)
val entity: HttpEntity<*> = HttpEntity<Any?>(headers)
val product = restTemplate.exchange("http://product-service/products/{id}/count/{count}",
HttpMethod.PUT, null, Product::class.java, order.id, order.count)
return product.body!!
}
fun updateAccount(transactionId: String, accountId: Int, totalPrice: Int): Account {
val headers = HttpHeaders()
headers.set("X-Transaction-ID", transactionId)
val entity: HttpEntity<*> = HttpEntity<Any?>(headers)
val account = restTemplate.exchange("http://account-service/accounts/{id}/withdrawal/{amount}",
HttpMethod.PUT, null, Account::class.java, accountId, totalPrice)
return account.body!!
}
}
Conclusion
Even a trivial implementation of distributed transactions in microservices, like the one, demonstrated in this article, can be complicated. As you see we need to add a new element to our architecture, transaction-server
, responsible only for distributed transaction management. We also have to add a message broker in order to exchange events between our applications and transaction-server
. However, many of you were asking me about distributed transactions in the microservices world, so I decided to build that simple demo. I’m waiting for your feedback and opinions.
19 COMMENTS