Skip to content

feat: Bring back Impala support #518

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 73 additions & 61 deletions drivers/drivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,88 +526,100 @@ func Copy(ctx context.Context, u *dburl.URL, stdout, stderr func() io.Writer, ro
return d.Copy(ctx, db, rows, table)
}

// CopyWithInsert builds a copy handler based on insert.
// CopyWithInsert builds a typical copy handler based on insert.
func CopyWithInsert(placeholder func(int) string) func(ctx context.Context, db *sql.DB, rows *sql.Rows, table string) (int64, error) {
if placeholder == nil {
placeholder = func(n int) string { return fmt.Sprintf("$%d", n) }
}
return func(ctx context.Context, db *sql.DB, rows *sql.Rows, table string) (int64, error) {
columns, err := rows.Columns()
if err != nil {
return 0, fmt.Errorf("failed to fetch source rows columns: %w", err)
}
clen := len(columns)
query := table
if !strings.HasPrefix(strings.ToLower(query), "insert into") {
leftParen := strings.IndexRune(table, '(')
if leftParen == -1 {
colRows, err := db.QueryContext(ctx, "SELECT * FROM "+table+" WHERE 1=0")
if err != nil {
return 0, fmt.Errorf("failed to execute query to determine target table columns: %w", err)
}
columns, err := colRows.Columns()
_ = colRows.Close()
if err != nil {
return 0, fmt.Errorf("failed to fetch target table columns: %w", err)
}
table += "(" + strings.Join(columns, ", ") + ")"
return FlexibleCopyWithInsert(ctx, db, rows, table, placeholder, true)
}
}

func FlexibleCopyWithInsert(ctx context.Context, db *sql.DB, rows *sql.Rows, table string, placeholder func(int) string, withTransaction bool) (int64, error) {
columns, err := rows.Columns()
if err != nil {
return 0, fmt.Errorf("failed to fetch source rows columns: %w", err)
}
clen := len(columns)
query := table
if !strings.HasPrefix(strings.ToLower(query), "insert into") {
leftParen := strings.IndexRune(table, '(')
if leftParen == -1 {
colRows, err := db.QueryContext(ctx, "SELECT * FROM "+table+" WHERE 1=0")
if err != nil {
return 0, fmt.Errorf("failed to execute query to determine target table columns: %w", err)
}
// TODO if the db supports multiple rows per insert, create batches of 100 rows
placeholders := make([]string, clen)
for i := 0; i < clen; i++ {
placeholders[i] = placeholder(i + 1)
columns, err := colRows.Columns()
_ = colRows.Close()
if err != nil {
return 0, fmt.Errorf("failed to fetch target table columns: %w", err)
}
query = "INSERT INTO " + table + " VALUES (" + strings.Join(placeholders, ", ") + ")"
table += "(" + strings.Join(columns, ", ") + ")"
}
// TODO if the db supports multiple rows per insert, create batches of 100 rows
placeholders := make([]string, clen)
for i := 0; i < clen; i++ {
placeholders[i] = placeholder(i + 1)
}
tx, err := db.BeginTx(ctx, nil)
query = "INSERT INTO " + table + " VALUES (" + strings.Join(placeholders, ", ") + ")"
}
var stmt *sql.Stmt
var tx *sql.Tx
if withTransaction {
tx, err = db.BeginTx(ctx, nil)
if err != nil {
return 0, fmt.Errorf("failed to begin transaction: %w", err)
}
stmt, err := tx.PrepareContext(ctx, query)
stmt, err = tx.PrepareContext(ctx, query)
} else {
stmt, err = db.PrepareContext(ctx, query)
}
if err != nil {
return 0, fmt.Errorf("failed to prepare insert query: %w", err)
}
defer stmt.Close()
columnTypes, err := rows.ColumnTypes()
if err != nil {
return 0, fmt.Errorf("failed to fetch source column types: %w", err)
}
values := make([]interface{}, clen)
valueRefs := make([]reflect.Value, clen)
actuals := make([]interface{}, clen)
for i := 0; i < len(columnTypes); i++ {
valueRefs[i] = reflect.New(columnTypes[i].ScanType())
values[i] = valueRefs[i].Interface()
}
var n int64
for rows.Next() {
err = rows.Scan(values...)
if err != nil {
return 0, fmt.Errorf("failed to prepare insert query: %w", err)
return n, fmt.Errorf("failed to scan row: %w", err)
}
defer stmt.Close()
columnTypes, err := rows.ColumnTypes()
if err != nil {
return 0, fmt.Errorf("failed to fetch source column types: %w", err)
//We can't use values... in Exec() below, because some drivers
//don't accept pointer to an argument instead of the arg itself.
for i := range values {
actuals[i] = valueRefs[i].Elem().Interface()
}
values := make([]interface{}, clen)
valueRefs := make([]reflect.Value, clen)
actuals := make([]interface{}, clen)
for i := 0; i < len(columnTypes); i++ {
valueRefs[i] = reflect.New(columnTypes[i].ScanType())
values[i] = valueRefs[i].Interface()
res, err := stmt.ExecContext(ctx, actuals...)
if err != nil {
return n, fmt.Errorf("failed to exec insert: %w", err)
}
var n int64
for rows.Next() {
err = rows.Scan(values...)
if err != nil {
return n, fmt.Errorf("failed to scan row: %w", err)
}
//We can't use values... in Exec() below, because some drivers
//don't accept pointer to an argument instead of the arg itself.
for i := range values {
actuals[i] = valueRefs[i].Elem().Interface()
}
res, err := stmt.ExecContext(ctx, actuals...)
if err != nil {
return n, fmt.Errorf("failed to exec insert: %w", err)
}
rn, err := res.RowsAffected()
if err != nil {
return n, fmt.Errorf("failed to check rows affected: %w", err)
}
n += rn
rn, err := res.RowsAffected()
if err != nil {
return n, fmt.Errorf("failed to check rows affected: %w", err)
}
// TODO if using batches, flush the last batch,
// TODO prepare another statement and count remaining rows
n += rn
}
// TODO if using batches, flush the last batch,
// TODO prepare another statement and count remaining rows
if tx != nil {
err = tx.Commit()
if err != nil {
return n, fmt.Errorf("failed to commit transaction: %w", err)
}
return n, rows.Err()
}
return n, rows.Err()
}

func init() {
Expand Down
22 changes: 19 additions & 3 deletions drivers/impala/impala.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,30 @@
// Package impala defines and registers usql's Apache Impala driver.
//
// See: https://github.com/bippio/go-impala
// Group: bad
package impala

import (
_ "github.com/bippio/go-impala" // DRIVER
"context"
"database/sql"
"errors"

"github.com/sclgo/impala-go" // DRIVER
"github.com/xo/usql/drivers"
meta "github.com/xo/usql/drivers/metadata/impala"
)

func init() {
drivers.Register("impala", drivers.Driver{})
drivers.Register("impala", drivers.Driver{
NewMetadataReader: meta.New,
Copy: func(ctx context.Context, db *sql.DB, rows *sql.Rows, table string) (int64, error) {
placeholder := func(int) string {
return "?"
}
return drivers.FlexibleCopyWithInsert(ctx, db, rows, table, placeholder, false)
},
IsPasswordErr: func(err error) bool {
var authError *impala.AuthError
return errors.As(err, &authError)
},
})
}
77 changes: 77 additions & 0 deletions drivers/metadata/impala/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package impala

import (
"context"
"database/sql"

"github.com/xo/usql/drivers"
"github.com/xo/usql/drivers/metadata"

driver "github.com/sclgo/impala-go"
)

// MetaReader reads Impala metadata (schemas, tables and columns) by
// delegating to the impala-go driver's Metadata helper.
type MetaReader struct {
// meta is the driver-level metadata accessor that every reader method queries.
meta *driver.Metadata
}

// Columns returns the columns selected by filter. The lookup is delegated to
// the driver metadata using filter.Schema, filter.Parent and filter.Name, and
// only the schema, table and column name of each match are carried over.
// Any error from the driver is returned unchanged.
func (r MetaReader) Columns(filter metadata.Filter) (*metadata.ColumnSet, error) {
	found, err := r.meta.GetColumns(context.Background(), filter.Schema, filter.Parent, filter.Name)
	if err != nil {
		return nil, err
	}
	result := make([]metadata.Column, 0, len(found))
	for _, col := range found {
		result = append(result, metadata.Column{
			Schema: col.Schema,
			Table:  col.TableName,
			Name:   col.ColumnName,
		})
	}
	return metadata.NewColumnSet(result), nil
}

// Schemas returns the schemas selected by filter.Name, as reported by the
// driver metadata. Each returned name becomes one metadata.Schema entry.
// Any error from the driver is returned unchanged.
func (r MetaReader) Schemas(filter metadata.Filter) (*metadata.SchemaSet, error) {
	names, err := r.meta.GetSchemas(context.Background(), filter.Name)
	if err != nil {
		return nil, err
	}
	result := make([]metadata.Schema, 0, len(names))
	for _, schemaName := range names {
		result = append(result, metadata.Schema{Schema: schemaName})
	}
	return metadata.NewSchemaSet(result), nil
}

// Tables returns the tables selected by filter.Schema and filter.Name, as
// reported by the driver metadata. The schema, name and type of each match
// are copied into the result set. Any error from the driver is returned
// unchanged.
func (r MetaReader) Tables(filter metadata.Filter) (*metadata.TableSet, error) {
	found, err := r.meta.GetTables(context.Background(), filter.Schema, filter.Name)
	if err != nil {
		return nil, err
	}
	result := make([]metadata.Table, 0, len(found))
	for _, tbl := range found {
		result = append(result, metadata.Table{
			Schema: tbl.Schema,
			Name:   tbl.Name,
			Type:   tbl.Type,
		})
	}
	return metadata.NewTableSet(result), nil
}

// Compile-time assertions that *MetaReader satisfies the schema, table and
// column reader interfaces; the blank identifiers keep them free at runtime.
var (
_ metadata.SchemaReader = (*MetaReader)(nil)
_ metadata.TableReader = (*MetaReader)(nil)
_ metadata.ColumnReader = (*MetaReader)(nil)
)

// New creates the Impala metadata reader for db. Reader options are accepted
// for signature compatibility but ignored.
//
// When db is a *sql.DB the result is a *MetaReader backed by the driver's
// metadata helper. For any other drivers.DB implementation an empty struct is
// returned, i.e. a reader with no capabilities.
func New(db drivers.DB, _ ...metadata.ReaderOption) metadata.Reader {
	sqlDB, ok := db.(*sql.DB)
	if !ok {
		return struct{}{} // reader with no capabilities
	}
	return &MetaReader{
		meta: driver.NewMetadata(sqlDB),
	}
}
17 changes: 9 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ require (
github.com/amsokol/ignite-go-client v0.12.2
github.com/apache/arrow/go/v17 v17.0.0
github.com/apache/calcite-avatica-go/v5 v5.4.0
github.com/bippio/go-impala v2.1.0+incompatible
github.com/btnguyen2k/gocosmos v1.1.0
github.com/btnguyen2k/godynamo v1.3.0
github.com/chaisql/chai v0.16.1-0.20240218103834-23e406360fd2
Expand Down Expand Up @@ -50,6 +49,7 @@ require (
github.com/ory/dockertest/v3 v3.11.0
github.com/prestodb/presto-go-client v0.0.0-20240426182841-905ac40a1783
github.com/proullon/ramsql v0.1.4
github.com/sclgo/impala-go v1.1.0
github.com/sijms/go-ora/v2 v2.8.24
github.com/snowflakedb/gosnowflake v1.13.2
github.com/spf13/cobra v1.9.1
Expand Down Expand Up @@ -84,15 +84,15 @@ require (
cloud.google.com/go/longrunning v0.6.6 // indirect
cloud.google.com/go/monitoring v1.24.1 // indirect
cloud.google.com/go/spanner v1.78.0 // indirect
dario.cat/mergo v1.0.0 // indirect
dario.cat/mergo v1.0.1 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.18.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2 // indirect
github.com/BurntSushi/toml v1.5.0 // indirect
github.com/ClickHouse/ch-go v0.65.1 // indirect
Expand Down Expand Up @@ -236,7 +236,7 @@ require (
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/magiconair/properties v1.8.9 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/mattn/go-sixel v0.0.5 // indirect
Expand All @@ -249,17 +249,17 @@ require (
github.com/mithrandie/ternary v1.1.1 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/patternmatcher v0.6.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/sys/user v0.1.0 // indirect
github.com/moby/sys/sequential v0.6.0 // indirect
github.com/moby/sys/user v0.3.0 // indirect
github.com/moby/sys/userns v0.1.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/moby/term v0.5.2 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nakagami/chacha20 v0.1.0 // indirect
github.com/nathan-fiscaletti/consolesize-go v0.0.0-20220204101620-317176b6684d // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/opencontainers/image-spec v1.1.1 // indirect
github.com/opencontainers/runc v1.1.13 // indirect
github.com/paulmach/orb v0.11.1 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
Expand All @@ -278,6 +278,7 @@ require (
github.com/rs/zerolog v1.34.0 // indirect
github.com/sagikazarmark/locafero v0.6.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/samber/lo v1.49.1 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
Expand Down
Loading
Loading