Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sync transaction/connection propagation #3336

Open
wants to merge 3 commits into
base: 4.11.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,12 @@ public void complete(@NonNull ConnectionStatus<C> status) {
private DefaultConnectionStatus<C> openNewConnectionInternal(@NonNull ConnectionDefinition definition) {
C connection = openConnection(definition);
DefaultConnectionStatus<C> status = new DefaultConnectionStatus<>(connection, definition, true);
PropagatedContext propagatedContext = PropagatedContext.getOrEmpty().plus(new ConnectionPropagatedContextElement<>(this, status));
PropagatedContext.Scope scope = propagatedContext.propagate();
ConnectionPropagatedContextElement<C> newConnectionContextElement = new ConnectionPropagatedContextElement<>(this, status);
PropagatedContext.getOrEmpty().plus(newConnectionContextElement).propagate();
status.registerSynchronization(new ConnectionSynchronization() {
@Override
public void executionComplete() {
scope.close();
PropagatedContext.getOrEmpty().minus(newConnectionContextElement).propagate();
}
});
return status;
Expand All @@ -235,26 +235,27 @@ private DefaultConnectionStatus<C> reuseExistingConnectionInternal(@NonNull Conn
existingContextElement.status.getDefinition(),
false);
setupConnection(status);
PropagatedContext.Scope scope = PropagatedContext.getOrEmpty()
.replace(existingContextElement, new ConnectionPropagatedContextElement<>(this, status))
ConnectionPropagatedContextElement<C> newConnectionElement = new ConnectionPropagatedContextElement<>(this, status);
PropagatedContext.getOrEmpty()
.replace(existingContextElement, newConnectionElement)
.propagate();
status.registerSynchronization(new ConnectionSynchronization() {
@Override
public void executionComplete() {
scope.close();
PropagatedContext.getOrEmpty().minus(newConnectionElement).plus(existingContextElement).propagate();
}
});
return status;
}

private DefaultConnectionStatus<C> suspendOpenConnection(ConnectionPropagatedContextElement<C> existingConnectionContextElement,
@NonNull Supplier<DefaultConnectionStatus<C>> newStatusSupplier) {
PropagatedContext.Scope scope = PropagatedContext.getOrEmpty().minus(existingConnectionContextElement).propagate();
PropagatedContext.getOrEmpty().minus(existingConnectionContextElement).propagate();
DefaultConnectionStatus<C> newStatus = newStatusSupplier.get();
newStatus.registerSynchronization(new ConnectionSynchronization() {
@Override
public void executionComplete() {
scope.close();
PropagatedContext.getOrEmpty().plus(existingConnectionContextElement).propagate();
}
});
return newStatus;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.micronaut.data.jdbc

import io.micronaut.data.connection.ConnectionOperations
import io.micronaut.data.connection.jdbc.operations.DataSourceConnectionOperations
import io.micronaut.data.connection.jdbc.operations.DefaultDataSourceConnectionOperations
import io.micronaut.data.tck.tests.AbstractTransactionSpec
import io.micronaut.transaction.TransactionOperations
Expand All @@ -14,6 +16,11 @@ abstract class AbstractJdbcTransactionSpec extends AbstractTransactionSpec {
return context.getBean(DataSourceTransactionManager)
}

@Override
protected ConnectionOperations getConnectionOperations() {
return context.getBean(DataSourceConnectionOperations)
}

@Override
protected Runnable getNoTxCheck() {
DefaultDataSourceConnectionOperations connectionOperations = context.getBean(DefaultDataSourceConnectionOperations)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.micronaut.data.spring.jdbc


import io.micronaut.data.connection.ConnectionOperations
import io.micronaut.data.spring.jdbc.micronaut.H2BookRepository
import io.micronaut.data.tck.repositories.BookRepository
import io.micronaut.data.tck.tests.AbstractTransactionSpec
Expand Down Expand Up @@ -45,6 +45,11 @@ class SpringJdbcTransactionSpec extends AbstractTransactionSpec {
return context.getBean(SpringJdbcTransactionOperations)
}

@Override
protected ConnectionOperations getConnectionOperations() {
return context.getBean(SpringJdbcConnectionOperations)
}

@Override
protected Runnable getNoTxCheck() {
return new Runnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.micronaut.data.spring.jdbc


import io.micronaut.data.connection.ConnectionOperations
import io.micronaut.data.model.query.builder.sql.Dialect
import io.micronaut.data.spring.jdbc.micronaut.PostgresBookRepository
import io.micronaut.data.tck.repositories.BookRepository
Expand Down Expand Up @@ -44,6 +44,11 @@ class SpringPostgresJdbcTransactionSpec extends AbstractTransactionSpec implemen
return context.getBean(SpringJdbcTransactionOperations)
}

@Override
protected ConnectionOperations getConnectionOperations() {
return context.getBean(SpringJdbcConnectionOperations)
}

@Override
protected Runnable getNoTxCheck() {
return new Runnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package io.micronaut.data.spring.hibernate

import io.micronaut.data.connection.ConnectionOperations
import io.micronaut.data.model.query.builder.sql.Dialect
import io.micronaut.data.spring.hibernate.micronaut.HibernateBookRepository
import io.micronaut.data.spring.hibernate.micronaut.ReadOnlyTest
import io.micronaut.data.spring.jpa.hibernate.SpringHibernateConnectionOperations
import io.micronaut.data.spring.jpa.hibernate.SpringHibernateTransactionOperations
import io.micronaut.data.tck.repositories.BookRepository
import io.micronaut.data.tck.tests.AbstractTransactionSpec
Expand Down Expand Up @@ -55,6 +57,11 @@ class SpringHibernateTransactionSpec extends AbstractTransactionSpec implements
return context.getBean(SpringHibernateTransactionOperations)
}

@Override
protected ConnectionOperations getConnectionOperations() {
return context.getBean(SpringHibernateConnectionOperations)
}

@Override
protected Runnable getNoTxCheck() {
return new Runnable() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.micronaut.data.tck.tests

import io.micronaut.context.ApplicationContext
import io.micronaut.data.connection.ConnectionOperations
import io.micronaut.data.tck.repositories.BookRepository
import io.micronaut.data.tck.services.TxBookService
import io.micronaut.data.tck.services.TxEventsService
Expand Down Expand Up @@ -29,10 +30,15 @@ abstract class AbstractTransactionSpec extends Specification implements TestProp

protected abstract TransactionOperations getTransactionOperations();

protected abstract ConnectionOperations getConnectionOperations();

protected abstract Runnable getNoTxCheck();

TxBookService getBookService() {
return context.getBean(TxBookService)
def service = context.getBean(TxBookService)
service.transactionManager = getTransactionOperations()
service.connectionOperations = getConnectionOperations()
return service
}

void cleanup() {
Expand Down Expand Up @@ -67,6 +73,13 @@ abstract class AbstractTransactionSpec extends Specification implements TestProp
return false
}

void "connectable with nested transaction"() {
when:
bookService.bookAddedInConnectableNestedTransaction()
then:
bookService.countBooksTransactional() == 1
}

void "custom name transaction"() {
when:
bookService.bookAddedCustomNamedTransaction(new Runnable() {
Expand All @@ -78,7 +91,7 @@ abstract class AbstractTransactionSpec extends Specification implements TestProp
}
})
then:
assert bookService.countBooksTransactional() == 1
bookService.countBooksTransactional() == 1
}

void "test book added in read only transaction throws error"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,53 @@

import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Requires;
import io.micronaut.data.connection.ConnectionOperations;
import io.micronaut.data.connection.ConnectionStatus;
import io.micronaut.transaction.SynchronousTransactionManager;
import io.micronaut.transaction.TransactionDefinition;
import io.micronaut.transaction.TransactionStatus;
import io.micronaut.transaction.annotation.Transactional;
import io.micronaut.transaction.support.DefaultTransactionDefinition;
import jakarta.inject.Singleton;

import java.util.Optional;

@Requires(property = AbstractBookService.BOOK_REPOSITORY_CLASS_PROPERTY)
@Singleton
public class TxBookService extends AbstractBookService {

private final SynchronousTransactionManager<Object> transactionManager;
public SynchronousTransactionManager<Object> transactionManager;
public ConnectionOperations<Object> connectionOperations;

public TxBookService(ApplicationContext beanContext,
SynchronousTransactionManager<Object> transactionManager) {
public TxBookService(ApplicationContext beanContext) {
super(beanContext);
this.transactionManager = transactionManager;
}

public void bookAddedInConnectableNestedTransaction() {
connectionOperations.executeWrite(connectionStatus -> {
ConnectionStatus<Object> outerConnection = getConnection();
TransactionDefinition definition = new DefaultTransactionDefinition(TransactionDefinition.Propagation.NESTED);
TransactionStatus<Object> status = transactionManager.getTransaction(definition);
ConnectionStatus<Object> txConnection = getConnection();
if (!txConnection.equals(outerConnection)) {
throw new IllegalStateException("Connection is not the same as the outer connection");
}
bookRepository.save(newBook("MandatoryBook"));
transactionManager.commit(status);
ConnectionStatus<Object> afterTxConnection = getConnection();
if (!afterTxConnection.equals(outerConnection)) {
throw new IllegalStateException("Connection is not the same as the outer connection");
}
return null;
});
}

private ConnectionStatus<Object> getConnection() {
Optional<ConnectionStatus<Object>> optionalConnection = connectionOperations.findConnectionStatus();
if (optionalConnection.isEmpty()) {
throw new IllegalStateException("No connection status available");
}
return optionalConnection.get();
}

@Transactional(name = "MyTx")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package io.micronaut.transaction.hibernate6

import io.micronaut.core.type.Argument
import io.micronaut.data.connection.ConnectionOperations
import io.micronaut.data.hibernate.connection.HibernateConnectionOperations
import io.micronaut.data.model.query.builder.sql.Dialect
import io.micronaut.data.tck.repositories.BookRepository
import io.micronaut.data.tck.tests.AbstractTransactionSpec
Expand Down Expand Up @@ -54,6 +55,11 @@ class HibernateTransactionSpec extends AbstractTransactionSpec implements TestRe
return context.getBean(HibernateTransactionManager)
}

@Override
protected ConnectionOperations getConnectionOperations() {
return context.getBean(HibernateConnectionOperations)
}

@Override
protected Runnable getNoTxCheck() {
ConnectionOperations<Session> connectionOperations = context.getBean(Argument.of(ConnectionOperations.class, Session.class))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,14 @@ public TransactionStatus<C> getTransaction(TransactionDefinition definition) thr
throw new TransactionUsageException("Existing transaction found for transaction marked with propagation 'never'");
};
} else {
if (connectionStatus != null) {
return switch (definition.getPropagationBehavior()) {
case REQUIRED, REQUIRES_NEW, NESTED -> openNewTransaction(connectionStatus, definition); // Nested propagation applies only for the existing TX
case SUPPORTS, NEVER, NOT_SUPPORTED ->
withNoTransactionStatus(connectionStatus, definition);
case MANDATORY -> throw newMandatoryTx();
};
}
return switch (definition.getPropagationBehavior()) {
case REQUIRED, REQUIRES_NEW, NESTED -> openNewConnectionAndTransaction(definition); // Nested propagation applies only for the existing TX
case SUPPORTS, NEVER, NOT_SUPPORTED ->
Expand Down Expand Up @@ -552,6 +560,20 @@ public void afterCompletion(Status status) {
return transactionStatus;
}

@NonNull
private T openNewTransaction(ConnectionStatus<C> connectionStatus, TransactionDefinition definition) {
T transactionStatus = createNewTransactionStatus(connectionStatus, definition);
PropagatedContext.Scope scope = extendCurrentPropagatedContext(transactionStatus).propagate();
transactionStatus.registerInvocationSynchronization(new TransactionSynchronization() {
@Override
public void afterCompletion(Status status) {
scope.close();
}
});
begin(transactionStatus);
return transactionStatus;
}

@NonNull
private T withNoTransactionStatus(ConnectionStatus<C> connectionStatus, TransactionDefinition definition) {
T transactionStatus = createNoTxTransactionStatus(connectionStatus, definition);
Expand Down
Loading