Skip to content

Commit

Permalink
Fix empty result set parsing in sadrill and add DB-API compliance tes…
Browse files Browse the repository at this point in the history
…t suite (#91)

* Fix handling of responses with an empty rows array.

* Rename deprecated dbapi() classmethod.

* Collect test code under the test/ directory.

* Replace logger.warn calls with logger.warning calls.

* Add Drill testcontainer pytest fixtures and test_sadrill.py with a test for empty results.

* Add DB-API 2.0 compliance test suite.

* Add dbapi20.py from https://github.com/baztian/dbapi-compliance/.

* test_Exceptions: import all exception types into _drilldbapi.

* test_setinputsizes: fix method signatures.

* Set no-op logging statements to DEBUG level.

* test_fetchmany: default arraysize = 1.

* test_execute: fix parameter substitution.

* test_description: use DBAPITypeObject in cursor.description.

* Add pytest + testcontainers run to GitHub CI.

* Add ijson to requirements/common.txt

* Bump version to 1.1.6.

* Remove debug logger level setting.
  • Loading branch information
jnturton authored Feb 25, 2025
1 parent 27c8aa8 commit 32e4d83
Show file tree
Hide file tree
Showing 19 changed files with 1,250 additions and 90 deletions.
3 changes: 1 addition & 2 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,4 @@ insert_final_newline = true

[*.py]
indent_size = 4
max_line_length = 80

max_line_length = 100
31 changes: 31 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# This workflow will install the test requirements and run the pytest suite on every push and pull request
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python#publishing-to-package-registries

# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.

name: Run test suite

on: [push, pull_request]

permissions:
contents: read

jobs:
test:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.x'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements/test.txt
- name: Run pytest
run: python -m pytest -vs test/test_dbapi_compliance.py test/test_sadrill.py
12 changes: 11 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
## [Unreleased]

## [1.1.6] - 2025-02-24

### Fixed

- Parsing of empty result set data in sadrill.

### Changed

- Added a DB-API compliance test suite running against a local Drill using testcontainers.
-
## [1.1.5] - 2024-06-04

### Fixed

- Fix a leaked StopIteration from a generator.
- Fix a leaked StopIteration from a generator in sadrill.

## [1.1.4] - 2023-10-23

Expand Down
1 change: 1 addition & 0 deletions requirements/common.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
ijson~=3.3.0
sqlalchemy
JayDeBeApi
pyodbc
3 changes: 3 additions & 0 deletions requirements/test.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
-r common.txt
nose
mock
pytest
coverage
testcontainers
dbapi-compliance
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
long_description = f.read()

setup(name='sqlalchemy_drill',
version='1.1.5',
version='1.1.6',
description="Apache Drill for SQLAlchemy",
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down Expand Up @@ -64,7 +64,7 @@
license='MIT',
url='https://github.com/JohnOmernik/sqlalchemy-drill',
download_url='https://github.com/JohnOmernik/sqlalchemy-drill/archive/'
'1.1.5.tar.gz',
'1.1.6.tar.gz',
packages=find_packages(),
include_package_data=True,
tests_require=['nose >= 0.11'],
Expand Down
2 changes: 1 addition & 1 deletion sqlalchemy_drill/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.

__version__ = '1.1.5'
__version__ = '1.1.6'
from sqlalchemy.dialects import registry

registry.register("drill", "sqlalchemy_drill.sadrill", "DrillDialect_sadrill")
Expand Down
78 changes: 48 additions & 30 deletions sqlalchemy_drill/drilldbapi/_drilldbapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
from datetime import date, time, datetime
from time import gmtime
from . import api_globals
from .api_exceptions import AuthError, DatabaseError, ProgrammingError, \
CursorClosedException, ConnectionClosedException
from .api_exceptions import (AuthError, ConnectionClosedException, CursorClosedException,
DatabaseError, Error, IntegrityError, InterfaceError, InternalError,
NotSupportedError, OperationalError, ProgrammingError, Warning)

apilevel = '2.0'
threadsafety = 3
Expand All @@ -30,10 +31,12 @@ def substitute_in_query(string_query, parameters):
query = string_query
try:
for param in parameters:
if type(param) == str:
if isinstance(param, str):
param = f"'{param}'"
else:
param = str(param)

query.replace('?', param, 1)
query = query.replace('?', param, 1)
logger.debug(f'set parameter value {param}')
except Exception as ex:
logger.error(f'query parameter substitution encountered {ex}.')
Expand All @@ -46,7 +49,7 @@ def substitute_in_query(string_query, parameters):

def __init__(self, conn):

self.arraysize: int = 200
self.arraysize: int = 1
self.description: tuple = None
self.connection = conn
self.rowcount: int = -1
Expand Down Expand Up @@ -76,10 +79,12 @@ def func_wrapper(self, *args, **kwargs):

def _gen_description(self, col_types):
blank = [None] * len(self.result_md['columns'])
dbapi_col_types = [DBAPITypeObject(col_type) for col_type in col_types]

self.description = tuple(
zip(
self.result_md['columns'], # name
col_types or blank, # type_code
dbapi_col_types or blank, # type_code
blank, # display_size
blank, # internal_size
blank, # precision
Expand Down Expand Up @@ -139,6 +144,9 @@ def _outer_parsing_loop(self) -> bool:
continue

if value == 'rows':
# discard the array node itself
next(self._result_event_stream)

self._row_stream = _items_once(
self._result_event_stream, 'rows.item'
)
Expand Down Expand Up @@ -220,12 +228,12 @@ def execute(self, operation, parameters=()):
self._gen_description(basic_coltypes)

self._typecaster_list = [
self.connection.typecasters.get(col, lambda v: v) for
self.connection.python_typecasters.get(col, lambda v: v) for
col in basic_coltypes
]
else:
self._gen_description(None)
logger.warn(
logger.warning(
'encountered data before metadata, typecasting during '
'streaming by this module will not take place. Upgrade '
'to Drill >= 1.19 or apply your own typecasting.'
Expand Down Expand Up @@ -293,13 +301,13 @@ def fetchall(self) -> List:
'''Fetch all (remaining) rows of a query result.'''
return self.fetchmany(-1)

def setinputsizes(sizes):
def setinputsizes(self, *sizes):
'''Not supported.'''
logger.warn('setinputsizes is a no-op in this driver.')
logger.debug('setinputsizes is a no-op in this driver.')

def setoutputsize(size, column=0):
def setoutputsize(self, size, column=0):
'''Not supported.'''
logger.warn('setoutputsize is a no-op in this driver.')
logger.debug('setoutputsize is a no-op in this driver.')

@is_open
def get_query_id(self) -> str:
Expand Down Expand Up @@ -354,10 +362,10 @@ def __init__(self,
logger.info(f'has connected to Drill version {self.drill_version}.')

if self.drill_version < '1.19':
self.typecasters = {}
self.python_typecasters = {}
else:
# Starting in 1.19 the Drill REST API returns UNIX times
self.typecasters = {
self.python_typecasters = {
'DATE': lambda v: DateFromTicks(v),
'TIME': lambda v: TimeFromTicks(v),
'TIMESTAMP': lambda v: TimestampFromTicks(v)
Expand All @@ -384,6 +392,9 @@ def submit_query(self, query: str):
stream=True
)

logger.debug('received an HTTP response with body:')
logger.debug(resp.text)

if resp.status_code == 200:
return resp
else:
Expand Down Expand Up @@ -414,12 +425,12 @@ def close(self):
self._session.close()
self._connected = False
except Exception as ex:
logger.warn(f'encountered {ex} when try to close connection.')
logger.warning(f'encountered {ex} when try to close connection.')
raise ConnectionClosedException('Failed to close connection')

@connected
def commit(self):
logger.info('A commit is a no-op in this driver.')
logger.debug('commit is a no-op in this driver.')

@connected
def cursor(self) -> Cursor:
Expand Down Expand Up @@ -502,21 +513,18 @@ def read(self, n):

def _items_once(event_stream, prefix):
'''
Generator dispatching native Python objects constructed from the ijson
events under the next occurrence of the given prefix. It is very
similar to ijson.items except that it will not consume the entire JSON
stream looking for occurrences of prefix, but rather stop after
completing the *first* encountered occurrence of prefix. The need for
this property is what precluded the use of ijson.items instead.
Generator dispatching native Python objects constructed from the ijson events under the next
occurrence of the given prefix. It is similar to ijson.items except that it will
not consume the entire JSON stream looking for occurrences of prefix, but rather stop after
completing the current occurrence of prefix. The need for this behaviour is what precluded
the use of ijson.items instead.
'''
current = None
while current != prefix:
try:
current, event, value = next(event_stream)
except StopIteration:
return # see PEP-479

logger.debug(f'found and will now parse an occurrence of {prefix}')
try:
current, event, value = next(event_stream)
except StopIteration:
return # see PEP-479

while current == prefix:
if event in ('start_map', 'start_array'):
object_depth = 1
Expand All @@ -536,7 +544,11 @@ def _items_once(event_stream, prefix):
else:
yield value

current, event, value = next(event_stream)
try:
current, event, value = next(event_stream)
except StopIteration:
return # see PEP-479

logger.debug(f'finished parsing one occurrence of {prefix}')


Expand All @@ -552,6 +564,12 @@ def __cmp__(self, other):
else:
return -1

def __eq__(self, other):
return self.values == other.values

def __hash__(self):
return hash(repr(self))


# Mandatory type objects defined by DB-API 2 specs.

Expand Down
60 changes: 60 additions & 0 deletions sqlalchemy_drill/drilldbapi/api_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,66 @@ def __str__(self):
)


class InterfaceError(Error):
    """``InterfaceError`` exception type of this DB-API module, derived from ``Error``."""

    def __str__(self):
        # NOTE(review): message and httperror are presumably set by the Error base class,
        # which is not visible here — confirm.
        text = f"{self.message} HTTP ERROR: {self.httperror}"
        return repr(text)


class OperationalError(Error):
    """``OperationalError`` exception type of this DB-API module, derived from ``Error``."""

    def __str__(self):
        # NOTE(review): message and httperror are presumably set by the Error base class,
        # which is not visible here — confirm.
        text = f"{self.message} HTTP ERROR: {self.httperror}"
        return repr(text)


class IntegrityError(Error):
    """``IntegrityError`` exception type of this DB-API module, derived from ``Error``."""

    def __str__(self):
        # NOTE(review): message and httperror are presumably set by the Error base class,
        # which is not visible here — confirm.
        text = f"{self.message} HTTP ERROR: {self.httperror}"
        return repr(text)


class InternalError(Error):
    """``InternalError`` exception type of this DB-API module, derived from ``Error``."""

    def __str__(self):
        # NOTE(review): message and httperror are presumably set by the Error base class,
        # which is not visible here — confirm.
        text = f"{self.message} HTTP ERROR: {self.httperror}"
        return repr(text)


class NotSupportedError(Error):
    """``NotSupportedError`` exception type of this DB-API module, derived from ``Error``."""

    def __str__(self):
        # NOTE(review): message and httperror are presumably set by the Error base class,
        # which is not visible here — confirm.
        text = f"{self.message} HTTP ERROR: {self.httperror}"
        return repr(text)


class CursorClosedException(Error):

def __init__(self, message):
Expand Down
8 changes: 6 additions & 2 deletions sqlalchemy_drill/sadrill.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ class DrillDialect_sadrill(DrillDialect):

name = 'drilldbapi'
driver = 'rest'
dbapi = ''
preparer = DrillIdentifierPreparer
statement_compiler = DrillCompiler_sadrill
poolclass = pool.SingletonThreadPool
Expand All @@ -65,10 +64,15 @@ def __init__(self, **kw):
self.supported_extensions = []

@classmethod
def dbapi(cls):
def import_dbapi(cls):
import sqlalchemy_drill.drilldbapi as module
return module

@classmethod
def dbapi(cls):
    """Return the DB-API module used by this dialect.

    Deprecated in SQLAlchemy in favour of ``import_dbapi``; retained for
    backwards compatibility with callers that still use the old hook.

    Returns:
        Whatever ``import_dbapi`` returns for this dialect class.
    """
    # Delegate to the new-style hook on this dialect class (not the statement
    # compiler) and hand its result back; the old hook must yield the module.
    return cls.import_dbapi()

def create_connect_args(self, url, **kwargs):
url_port = url.port or 8047
qargs = {'host': url.host, 'port': url_port}
Expand Down
Loading

0 comments on commit 32e4d83

Please sign in to comment.