Skip to content

Commit 7356f97

Browse files
committed
Fixed auto-read problem with result summary
All result stream should be buffered/consumed when result summary is requested or when session is closed. This means underlying channel should read everything that's available. It did not do so when summary was requested after number of buffered records exceeded high watermark. This commit makes fetching of summary or failure enable auto-read on the underlying channel. So SUCCESS and FAILURE messages will be read without waiting for all records to be consumed.
1 parent 9765378 commit 7356f97

File tree

3 files changed

+109
-2
lines changed

3 files changed

+109
-2
lines changed

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,10 @@ else if ( summary != null )
170170
{
171171
if ( summaryFuture == null )
172172
{
173+
// neither SUCCESS nor FAILURE message has arrived, register future to be notified when it arrives
174+
// future will be completed with summary on SUCCESS and completed exceptionally on FAILURE
175+
// enable auto-read, otherwise we might not read SUCCESS/FAILURE if records are not consumed
176+
connection.enableAutoRead();
173177
summaryFuture = new CompletableFuture<>();
174178
}
175179
return summaryFuture;
@@ -190,6 +194,10 @@ else if ( finished )
190194
{
191195
if ( failureFuture == null )
192196
{
197+
// neither SUCCESS nor FAILURE message has arrived, register future to be notified when it arrives
198+
// future will be completed with null on SUCCESS and completed with Throwable on FAILURE
199+
// enable auto-read, otherwise we might not read SUCCESS/FAILURE if records are not consumed
200+
connection.enableAutoRead();
193201
failureFuture = new CompletableFuture<>();
194202
}
195203
return failureFuture;

driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTest.java

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,6 @@ public void shouldNotDisableAutoReadWhenSummaryRequested()
708708
handler.onRecord( values( "a", "b" ) );
709709
}
710710

711-
verify( connection, never() ).enableAutoRead();
712711
verify( connection, never() ).disableAutoRead();
713712

714713
handler.onSuccess( emptyMap() );
@@ -745,7 +744,6 @@ public void shouldNotDisableAutoReadWhenFailureRequested()
745744
handler.onRecord( values( 123, 456 ) );
746745
}
747746

748-
verify( connection, never() ).enableAutoRead();
749747
verify( connection, never() ).disableAutoRead();
750748

751749
IllegalStateException error = new IllegalStateException( "Wrong config" );
@@ -766,6 +764,61 @@ public void shouldNotDisableAutoReadWhenFailureRequested()
766764
assertNull( await( handler.nextAsync() ) );
767765
}
768766

767+
@Test
768+
public void shouldEnableAutoReadOnConnectionWhenFailureRequestedButNotAvailable() throws Exception
769+
{
770+
Connection connection = connectionMock();
771+
PullAllResponseHandler handler = newHandler( asList( "key1", "key2" ), connection );
772+
773+
handler.onRecord( values( 1, 2 ) );
774+
handler.onRecord( values( 3, 4 ) );
775+
776+
verify( connection, never() ).enableAutoRead();
777+
verify( connection, never() ).disableAutoRead();
778+
779+
CompletableFuture<Throwable> failureFuture = handler.failureAsync().toCompletableFuture();
780+
assertFalse( failureFuture.isDone() );
781+
782+
verify( connection ).enableAutoRead();
783+
verify( connection, never() ).disableAutoRead();
784+
785+
assertNotNull( await( handler.nextAsync() ) );
786+
assertNotNull( await( handler.nextAsync() ) );
787+
788+
RuntimeException error = new RuntimeException( "Oh my!" );
789+
handler.onFailure( error );
790+
791+
assertTrue( failureFuture.isDone() );
792+
assertEquals( error, failureFuture.get() );
793+
}
794+
795+
@Test
796+
public void shouldEnableAutoReadOnConnectionWhenSummaryRequestedButNotAvailable() throws Exception
797+
{
798+
Connection connection = connectionMock();
799+
PullAllResponseHandler handler = newHandler( asList( "key1", "key2", "key3" ), connection );
800+
801+
handler.onRecord( values( 1, 2, 3 ) );
802+
handler.onRecord( values( 4, 5, 6 ) );
803+
804+
verify( connection, never() ).enableAutoRead();
805+
verify( connection, never() ).disableAutoRead();
806+
807+
CompletableFuture<ResultSummary> summaryFuture = handler.summaryAsync().toCompletableFuture();
808+
assertFalse( summaryFuture.isDone() );
809+
810+
verify( connection ).enableAutoRead();
811+
verify( connection, never() ).disableAutoRead();
812+
813+
assertNotNull( await( handler.nextAsync() ) );
814+
assertNotNull( await( handler.nextAsync() ) );
815+
816+
handler.onSuccess( emptyMap() );
817+
818+
assertTrue( summaryFuture.isDone() );
819+
assertNotNull( summaryFuture.get() );
820+
}
821+
769822
private static PullAllResponseHandler newHandler()
770823
{
771824
return newHandler( new Statement( "RETURN 1" ) );

driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1405,6 +1405,52 @@ public void shouldAllowAccessingRecordsAfterSessionClosed()
14051405
}
14061406
}
14071407

1408+
@Test
1409+
public void shouldAllowToConsumeRecordsSlowlyAndCloseSession() throws InterruptedException
1410+
{
1411+
Session session = neo4j.driver().session();
1412+
1413+
StatementResult result = session.run( "UNWIND range(10000, 0, -1) AS x RETURN 10 / x" );
1414+
1415+
// consume couple records slowly with a sleep in-between
1416+
for ( int i = 0; i < 10; i++ )
1417+
{
1418+
assertTrue( result.hasNext() );
1419+
assertNotNull( result.next() );
1420+
Thread.sleep( 50 );
1421+
}
1422+
1423+
try
1424+
{
1425+
session.close();
1426+
fail( "Exception expected" );
1427+
}
1428+
catch ( ClientException e )
1429+
{
1430+
assertThat( e, is( arithmeticError() ) );
1431+
}
1432+
}
1433+
1434+
@Test
1435+
public void shouldAllowToConsumeRecordsSlowlyAndRetrieveSummary() throws InterruptedException
1436+
{
1437+
try ( Session session = neo4j.driver().session() )
1438+
{
1439+
StatementResult result = session.run( "UNWIND range(8000, 1, -1) AS x RETURN 42 / x" );
1440+
1441+
// consume couple records slowly with a sleep in-between
1442+
for ( int i = 0; i < 12; i++ )
1443+
{
1444+
assertTrue( result.hasNext() );
1445+
assertNotNull( result.next() );
1446+
Thread.sleep( 50 );
1447+
}
1448+
1449+
ResultSummary summary = result.summary();
1450+
assertNotNull( summary );
1451+
}
1452+
}
1453+
14081454
private void assumeServerIs31OrLater()
14091455
{
14101456
ServerVersion serverVersion = ServerVersion.version( neo4j.driver() );

0 commit comments

Comments
 (0)