Skip to content

Commit effa353

Browse files
committed
Support for allowlist/blocklist that includes skipping decompression and string table load for columns not loaded.
1 parent 9603db4 commit effa353

File tree

1 file changed

+25
-25
lines changed

1 file changed

+25
-25
lines changed

src/tech/v3/libs/arrow.clj

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@
106106
[ham-fisted.api :as hamf]
107107
[ham-fisted.reduce :as hamf-rf]
108108
[ham-fisted.lazy-noncaching :as lznc]
109+
[ham-fisted.function :as hamf-fn]
109110
[ham-fisted.set :as set]
110111
[ham-fisted.protocols :as hamf-proto])
111112
(:import [ham_fisted ArrayLists]
@@ -1459,7 +1460,7 @@ Dependent block frames are not supported!!")
14591460
(fn parse-boolean-field
14601461
[decompressor]
14611462
;;nil subtype means null column
1462-
(if
1463+
(if nil-subtype?
14631464
(col-impl/new-column
14641465
(:name field)
14651466
(dtype/const-reader false n-elems)
@@ -1830,33 +1831,32 @@ Dependent block frames are not supported!!")
18301831
(deftype NextDatasetIter [schema ^Iterator messages fname
18311832
^{:unsynchronized-mutable true
18321833
:tag long} idx
1833-
^:unsynchronized-mutable dict-map
1834+
^Map dict-map
18341835
options]
18351836
Iterator
18361837
(hasNext [this] (.hasNext messages))
18371838
(next [this]
1838-
(loop [msg (.next messages)
1839-
dicts dict-map]
1839+
(loop [msg (.next messages)]
18401840
(if (identical? :dictionary-batch (get msg :message-type))
1841-
(recur (maybe-next messages)
1842-
(update dicts (get msg :id)
1843-
(fn [old-val]
1844-
(delay
1845-
(let [new-val (dictionary->strings msg)]
1846-
(if (and old-val (new-val :delta?))
1847-
(update new-val :strings
1848-
(fn [new-strs]
1849-
(let [old-strs (get @old-val :strings)
1850-
new-ec (+ (count new-strs) (count old-strs))
1851-
rv (hamf/wrap-array-growable
1852-
(make-array String new-ec)
1853-
0)]
1854-
(.addAllReducible rv old-strs)
1855-
(.addAllReducible rv new-strs)
1856-
rv))))
1857-
new-val)))))
1841+
(do
1842+
(.compute dict-map (get msg :id)
1843+
(hamf-fn/bi-function
1844+
k old-val
1845+
(delay
1846+
(let [new-val (dictionary->strings msg)]
1847+
(if (and old-val (new-val :delta?))
1848+
(let [old-strs (get @old-val :strings)
1849+
new-strs (get new-val :strings)
1850+
new-ec (+ (count new-strs) (count old-strs))
1851+
rv (hamf/wrap-array-growable
1852+
(make-array String new-ec)
1853+
0)]
1854+
(.addAllReducible rv old-strs)
1855+
(.addAllReducible rv new-strs)
1856+
(assoc new-val :strings rv))
1857+
new-val)))))
1858+
(recur (maybe-next messages)))
18581859
(let [cur-idx idx]
1859-
(set! dict-map dicts)
18601860
(set! idx (inc cur-idx))
18611861
(-> (records->ds schema dict-map msg options)
18621862
(ds-base/set-dataset-name (format "%s-%03d" fname cur-idx))))))))
@@ -1881,7 +1881,7 @@ Dependent block frames are not supported!!")
18811881
(get options :close-input-stream? true))
18821882
(.close ^InputStream input))
18831883
(throw (Exception. "Initial message is not a schema message.")))
1884-
(NextDatasetIter. schema messages fname 0 {} options)))
1884+
(NextDatasetIter. schema messages fname 0 (hamf/java-hashmap) options)))
18851885

18861886
(defn stream->dataset-iterable
18871887
"Loads data up to and including the first data record. Returns the a lazy
@@ -1924,9 +1924,9 @@ Dependent block frames are not supported!!")
19241924
:arrow-file
19251925
(next-dataset-iter (discard input 8) fname options)
19261926
:arrow-ipc
1927-
(next-dataset-iter input options)
1927+
(next-dataset-iter input fname options)
19281928
:feather-v1
1929-
[(feather->ds input options)])))))
1929+
(iter (hamf/vector (feather->ds input options))))))))
19301930

19311931
(defn ^:no-doc stream->dataset-seq
19321932
"see docs for [[stream->dataset-iterable]]"

0 commit comments

Comments
 (0)