From b9d553e551471ebee09a9093e1db06bfbc72577b Mon Sep 17 00:00:00 2001 From: Marcin Tolysz Date: Fri, 23 Dec 2016 00:36:03 +0000 Subject: [PATCH] Allow to select flushing strategy `NoFlush` is the default, but the `SyncFlush` is useful with the incremental compression eg. websockets --- Codec/Compression/Zlib/Internal.hs | 25 ++++++++++++-------- Codec/Compression/Zlib/Stream.hsc | 1 + test/Test/Codec/Compression/Zlib/Internal.hs | 2 ++ test/Test/Codec/Compression/Zlib/Stream.hs | 3 +++ 4 files changed, 21 insertions(+), 10 deletions(-) diff --git a/Codec/Compression/Zlib/Internal.hs b/Codec/Compression/Zlib/Internal.hs index 510d427..7005746 100644 --- a/Codec/Compression/Zlib/Internal.hs +++ b/Codec/Compression/Zlib/Internal.hs @@ -45,6 +45,7 @@ module Codec.Compression.Zlib.Internal ( defaultCompressParams, DecompressParams(..), defaultDecompressParams, + Stream.Flush(..), Stream.Format(..), Stream.gzipFormat, Stream.zlibFormat, @@ -111,7 +112,8 @@ data CompressParams = CompressParams { compressMemoryLevel :: !Stream.MemoryLevel, compressStrategy :: !Stream.CompressionStrategy, compressBufferSize :: !Int, - compressDictionary :: Maybe S.ByteString + compressDictionary :: Maybe S.ByteString, + compressFlush :: !Stream.Flush } deriving Show -- | The full set of parameters for decompression. The defaults are @@ -137,7 +139,8 @@ data DecompressParams = DecompressParams { decompressWindowBits :: !Stream.WindowBits, decompressBufferSize :: !Int, decompressDictionary :: Maybe S.ByteString, - decompressAllMembers :: Bool + decompressAllMembers :: Bool, + decompressFlush :: !Stream.Flush } deriving Show -- | The default set of parameters for compression. This is typically used with @@ -151,7 +154,8 @@ defaultCompressParams = CompressParams { compressMemoryLevel = Stream.defaultMemoryLevel, compressStrategy = Stream.defaultStrategy, compressBufferSize = defaultCompressBufferSize, - compressDictionary = Nothing + compressDictionary = Nothing, + compressFlush = Stream.NoFlush } -- | The default set of parameters for decompression. This is typically used with @@ -162,7 +166,8 @@ defaultDecompressParams = DecompressParams { decompressWindowBits = Stream.defaultWindowBits, decompressBufferSize = defaultDecompressBufferSize, decompressDictionary = Nothing, - decompressAllMembers = True + decompressAllMembers = True, + decompressFlush = Stream.NoFlush } -- | The default chunk sizes for the output of compression and decompression @@ -466,7 +471,7 @@ compressIO format params = compressStreamIO format params compressStream :: Stream.Format -> CompressParams -> S.ByteString -> Stream (CompressStream Stream) compressStream format (CompressParams compLevel method bits memLevel - strategy initChunkSize mdict) = + strategy initChunkSize mdict flushStrategy) = \chunk -> do Stream.deflateInit format compLevel method bits memLevel strategy @@ -526,13 +531,13 @@ compressStream format (CompressParams compLevel method bits memLevel -- this invariant guarantees we can always make forward progress -- and that therefore a BufferError is impossible - let flush = if lastChunk then Stream.Finish else Stream.NoFlush + let flush = if lastChunk then Stream.Finish else flushStrategy status <- Stream.deflate flush case status of Stream.Ok -> do outputBufferFull <- Stream.outputBufferFull - if outputBufferFull + if outputBufferFull || flushStrategy /= Stream.NoFlush then do (outFPtr, offset, length) <- Stream.popOutputBuffer let chunk = S.PS outFPtr offset length return $ CompressOutputAvailable chunk $ do @@ -596,7 +601,7 @@ decompressIO format params = decompressStreamIO format params decompressStream :: Stream.Format -> DecompressParams -> Bool -> S.ByteString -> Stream (DecompressStream Stream) -decompressStream format (DecompressParams bits initChunkSize mdict allMembers) +decompressStream format (DecompressParams bits initChunkSize mdict allMembers flushStrategy) resume = \chunk -> do @@ -675,12 +680,12 @@ decompressStream format (DecompressParams bits initChunkSize mdict allMembers) -- this invariant guarantees we can always make forward progress or at -- least if a BufferError does occur that it must be due to a premature EOF - status <- Stream.inflate Stream.NoFlush + status <- Stream.inflate flushStrategy case status of Stream.Ok -> do outputBufferFull <- Stream.outputBufferFull - if outputBufferFull + if outputBufferFull || flushStrategy /= Stream.NoFlush then do (outFPtr, offset, length) <- Stream.popOutputBuffer let chunk = S.PS outFPtr offset length return $ DecompressOutputAvailable chunk $ do diff --git a/Codec/Compression/Zlib/Stream.hsc b/Codec/Compression/Zlib/Stream.hsc index d47ed92..eafc78a 100644 --- a/Codec/Compression/Zlib/Stream.hsc +++ b/Codec/Compression/Zlib/Stream.hsc @@ -568,6 +568,7 @@ data Flush = | FullFlush | Finish -- | Block -- only available in zlib 1.2 and later, uncomment if you need it. + deriving (Show, Eq) fromFlush :: Flush -> CInt fromFlush NoFlush = #{const Z_NO_FLUSH} diff --git a/test/Test/Codec/Compression/Zlib/Internal.hs b/test/Test/Codec/Compression/Zlib/Internal.hs index 521a020..70bb132 100644 --- a/test/Test/Codec/Compression/Zlib/Internal.hs +++ b/test/Test/Codec/Compression/Zlib/Internal.hs @@ -16,6 +16,7 @@ instance Arbitrary CompressParams where `ap` arbitrary `ap` arbitrary `ap` arbitrary `ap` arbitraryBufferSize `ap` return Nothing + `ap` arbitrary arbitraryBufferSize :: Gen Int arbitraryBufferSize = frequency $ [(10, return n) | n <- [1..1024]] ++ @@ -29,4 +30,5 @@ instance Arbitrary DecompressParams where `ap` arbitraryBufferSize `ap` return Nothing `ap` arbitrary + `ap` arbitrary diff --git a/test/Test/Codec/Compression/Zlib/Stream.hs b/test/Test/Codec/Compression/Zlib/Stream.hs index 881ee45..49d4a59 100644 --- a/test/Test/Codec/Compression/Zlib/Stream.hs +++ b/test/Test/Codec/Compression/Zlib/Stream.hs @@ -16,6 +16,9 @@ instance Arbitrary Format where instance Arbitrary Method where arbitrary = return deflateMethod +instance Arbitrary Flush where + arbitrary = elements [NoFlush] + -- SyncFlush, Finish, FullFlush instance Arbitrary CompressionLevel where arbitrary = elements $ [defaultCompression, noCompression,