@@ -35,10 +35,14 @@ class Neo4jExtractor(Extractor):
35
35
"""NEO4J_ENCRYPTED is a boolean indicating whether to use SSL/TLS when connecting."""
36
36
NEO4J_VALIDATE_SSL = 'neo4j_validate_ssl'
37
37
"""NEO4J_VALIDATE_SSL is a boolean indicating whether to validate the server's SSL/TLS cert against system CAs."""
38
+ NEO4J_USE_IMPLICIT_TRANSACTIONS = 'neo4j_use_implicit_transactions'
39
+ """NEO4J_USE_IMPLICIT_TRANSACTIONS is a boolean indicating whether to use implicit or explicit transactions. This
40
+ is only needed when implicit transactions are required, such as for CALL {} IN TRANSACTIONS queries."""
38
41
39
42
DEFAULT_CONFIG = ConfigFactory .from_dict ({
40
43
NEO4J_MAX_CONN_LIFE_TIME_SEC : 50 ,
41
44
NEO4J_DATABASE_NAME : neo4j .DEFAULT_DATABASE ,
45
+ NEO4J_USE_IMPLICIT_TRANSACTIONS : False ,
42
46
})
43
47
44
48
def init (self , conf : ConfigTree ) -> None :
@@ -50,6 +54,7 @@ def init(self, conf: ConfigTree) -> None:
50
54
self .graph_url = self .conf .get_string (Neo4jExtractor .GRAPH_URL_CONFIG_KEY )
51
55
self .cypher_query = self .conf .get_string (Neo4jExtractor .CYPHER_QUERY_CONFIG_KEY )
52
56
self .db_name = self .conf .get_string (Neo4jExtractor .NEO4J_DATABASE_NAME )
57
+ self .use_implicit_transactions = self .conf .get (Neo4jExtractor .NEO4J_USE_IMPLICIT_TRANSACTIONS )
53
58
54
59
uri = self .conf .get_string (Neo4jExtractor .GRAPH_URL_CONFIG_KEY )
55
60
driver_args = {
@@ -107,10 +112,15 @@ def _get_extract_iter(self) -> Iterator[Any]:
107
112
Execute {cypher_query} and yield result one at a time
108
113
"""
109
114
with self .driver .session (
110
- database = self .db_name
115
+ database = self .db_name ,
116
+ default_access_mode = neo4j .READ_ACCESS
111
117
) as session :
112
118
if not hasattr (self , 'results' ):
113
- self .results = session .read_transaction (self ._execute_query )
119
+ if not self .use_implicit_transactions :
120
+ self .results = session .read_transaction (self ._execute_query )
121
+ else :
122
+ LOGGER .info ('Executing query in implicit transaction %s' , self .cypher_query )
123
+ self .results = session .run (self .cypher_query ).data ()
114
124
115
125
for result in self .results :
116
126
if hasattr (self , 'model_class' ):
0 commit comments