@@ -223,6 +223,8 @@ static class BloscCompressor extends Compressor {
223223 public final static int defaultShuffle = BYTESHUFFLE ;
224224 public final static String keyBlocksize = "blocksize" ;
225225 public final static int defaultBlocksize = 0 ;
226+ public final static String keyNumThreads = "nthreads" ;
227+ public final static int defaultNumThreads = 1 ;
226228 public final static int [] supportedShuffle = new int []{/*AUTOSHUFFLE, */ NOSHUFFLE , BYTESHUFFLE , BITSHUFFLE };
227229 public final static String [] supportedCnames = new String []{"zstd" , "blosclz" , defaultCname , "lz4hc" , "zlib" /*, "snappy"*/ };
228230
@@ -232,12 +234,14 @@ static class BloscCompressor extends Compressor {
232234 put (keyClevel , defaultCLevel );
233235 put (keyShuffle , defaultShuffle );
234236 put (keyBlocksize , defaultBlocksize );
237+ put (keyNumThreads , defaultNumThreads );
235238 }});
236239
237240 private final int clevel ;
238241 private final int blocksize ;
239242 private final int shuffle ;
240243 private final String cname ;
244+ private final int nthreads ;
241245
242246 private BloscCompressor (Map <String , Object > map ) {
243247 final Object cnameObj = map .get (keyCname );
@@ -285,6 +289,15 @@ private BloscCompressor(Map<String, Object> map) {
285289 } else {
286290 this .blocksize = ((Number ) blocksizeObj ).intValue ();
287291 }
292+
293+ final Object nthreadsObj = map .get (keyNumThreads );
294+ if (nthreadsObj == null ) {
295+ this .nthreads = defaultNumThreads ;
296+ } else if (nthreadsObj instanceof String ) {
297+ this .nthreads = Integer .parseInt ((String ) nthreadsObj );
298+ } else {
299+ this .nthreads = ((Number ) nthreadsObj ).intValue ();
300+ }
288301 }
289302
290303 @ Override
@@ -308,6 +321,10 @@ public String getCname() {
308321 return cname ;
309322 }
310323
324+ public int getNumThreads () {
325+ return nthreads ;
326+ }
327+
311328 @ Override
312329 public String toString () {
313330 return "compressor=" + getId ()
@@ -324,7 +341,7 @@ public void compress(InputStream is, OutputStream os) throws IOException {
324341 final int outputSize = inputSize + JBlosc .OVERHEAD ;
325342 final ByteBuffer inputBuffer = ByteBuffer .wrap (inputBytes );
326343 final ByteBuffer outBuffer = ByteBuffer .allocate (outputSize );
327- final int i = JBlosc .compressCtx (clevel , shuffle , 1 , inputBuffer , inputSize , outBuffer , outputSize , cname , blocksize , 1 );
344+ final int i = JBlosc .compressCtx (clevel , shuffle , 1 , inputBuffer , inputSize , outBuffer , outputSize , cname , blocksize , nthreads );
328345 final BufferSizes bs = cbufferSizes (outBuffer );
329346 byte [] compressedChunk = Arrays .copyOfRange (outBuffer .array (), 0 , (int ) bs .getCbytes ());
330347 os .write (compressedChunk );
@@ -341,7 +358,7 @@ public void uncompress(InputStream is, OutputStream os) throws IOException {
341358 byte [] inBytes = Arrays .copyOf (header , compressedSize );
342359 di .readFully (inBytes , header .length , compressedSize - header .length );
343360 ByteBuffer outBuffer = ByteBuffer .allocate (uncompressedSize );
344- JBlosc .decompressCtx (ByteBuffer .wrap (inBytes ), outBuffer , outBuffer .limit (), 1 );
361+ JBlosc .decompressCtx (ByteBuffer .wrap (inBytes ), outBuffer , outBuffer .limit (), nthreads );
345362 os .write (outBuffer .array ());
346363 }
347364
0 commit comments