Bucketing in Spark

πŸ€”How Bucketing Organizes Your Apache Spark Universe⚑

Bucketing πŸͺ£

  1. Bucketing is a way to assign rows of a dataset to specific buckets and co-locate them on disk.

  2. An explicit bucket count, along with the clustering column(s), is provided up front, and the data is distributed across that fixed number of buckets.

Ideal situation to use πŸ‘

Bucketing is done when the cardinality of the column is high.
[High Cardinality: a term used when the values in a column are unique or rarely repeated]
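
A quick way to gauge cardinality before bucketing is to compare the column's distinct count with the total row count. A minimal sketch, assuming a DataFrame df with a candidate bucketing column ID (both hypothetical here):

from pyspark.sql.functions import approx_count_distinct, count

# A column whose distinct count is close to the total row count is a
# good bucketing candidate (high cardinality).
df.select(
    count('*').alias('rows'),
    approx_count_distinct('ID').alias('distinct_ids')
).show()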

Working βš™οΈ

To visualize how bucketing works, let us take some sample data. Below is the data of an organization with a few employees and two details per employee: ID and Name.

employeeDf = spark.createDataFrame([
    (10001,'Deepankar'),
    (10002,'Deepansh'),
    (10003,'Janani'),
    (10004,'Rishab')], schema = 'ID integer,Name String')

employeeDf.show()

+-----+---------+
|   ID|     Name|
+-----+---------+
|10001|Deepankar|
|10002| Deepansh|
|10003|   Janani|
|10004|   Rishab|
+-----+---------+


  1. HASHING ✨
    While no single hashing technique is exposed for you to choose, Spark relies on MurmurHash3 as the default hashing algorithm for bucketing.
    MurmurHash3 can take input of various types and produces an unsigned integer of 32 or 128 bits (whichever variant is chosen).

     (employeeDf.write
          .format("parquet")
          .mode("overwrite")
          .bucketBy(2, "ID")
          .saveAsTable("myDataBase.colleagues"))
    
     '''
     Once the bucketBy() function is triggered, Spark starts hashing the
     column data passed as the parameter [in our case it is the ID column].
     MurmurHash3 hash value of 10001 = 2546479776
     '''
     +------+-----------+-------------+
     | ID   | Name      | MurmurHash3 |
     +------+-----------+-------------+
     | 10001| Deepankar | 2546479776  |
     | 10002| Deepansh  | 4269264863  |
     | 10003| Janani    | 3357256574  |
     | 10004| Rishab    | 3687225167  |
     +------+-----------+-------------+
    

    All the values in the column are hashed one by one; after hashing comes bucket assignment, which we will discuss in the next step. A small sketch for inspecting these hashes from PySpark is shown below.

    You can compute the MurmurHash3 value of any input at https://murmurhash.shorelabs.com/
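
    As a rough way to peek at the hash each row gets, here is a minimal sketch using Spark's built-in hash() function, which implements Murmur3. Note that it returns a signed 32-bit integer, so the values will not match the unsigned ones shown in the table above.

     from pyspark.sql.functions import col, hash as spark_hash

     # Inspect the Murmur3 hash Spark computes for the bucketing column.
     # spark_hash() returns a signed 32-bit integer, unlike the unsigned
     # values shown in the table above.
     employeeDf.withColumn("hash", spark_hash(col("ID"))).show()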


  2. BUCKETING πŸͺ£
    When storing data, the bucket for each record is chosen by applying the modulo function to its hash value.

    h % b β†’ (hash value of the key % number of buckets defined)

     +------+-----------+-------------+-----------------+--------------+
     | ID   | Name      | MurmurHash3 | Modulo (h % b)  | BucketNumber |
     +------+-----------+-------------+-----------------+--------------+
     | 10001| Deepankar | 2546479776  |  2546479776 % 2 |     0        |
     | 10002| Deepansh  | 4269264863  |  4269264863 % 2 |     1        |
     | 10003| Janani    | 3357256574  |  3357256574 % 2 |     0        |
     | 10004| Rishab    | 3687225167  |  3687225167 % 2 |     1        |
     +------+-----------+-------------+-----------------+--------------+
    

    Once Spark completes the hashing, it applies the modulo function to get the bucket number where that particular record will be saved. In our case we defined the bucket count as 2, so each hash value is taken modulo 2 and the result is its bucket number. A small sketch of this calculation is shown below.
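
    Here is a minimal sketch of that bucket assignment using Spark SQL's hash() and pmod() functions via expr(); it approximates what Spark does internally when it writes the bucketed table.

     from pyspark.sql.functions import expr

     # Approximate bucket assignment: bucket = pmod(murmur3_hash(ID), numBuckets).
     # pmod keeps the result non-negative even though hash() returns a signed integer.
     employeeDf.withColumn("bucket", expr("pmod(hash(ID), 2)")).show()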


  3. READING πŸ“–

    Reading data applies the same hash-and-modulo computation.
    For example, when retrieving the record with ID 10003, (hash of 10003) % 2 = 0, which indicates the data is in Bucket 0; only Bucket 0 is scanned and the other buckets are skipped.

     from pyspark.sql.functions import col

     readDf = (spark.read.table("myDataBase.colleagues")
                   .filter(col('ID') == 10003))
    
     readDf.show()
    
     +-----+---------+
     |   ID|     Name|
     +-----+---------+
     |10003|   Janani|
     +-----+---------+
    

    While reading, we explicitly provided ID = 10003 in the filter condition, which Spark uses to work out the bucket number that holds the record, so only that bucket needs to be scanned. You can confirm this from the physical plan, as sketched below.
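
    To check that bucket pruning actually happened, one option is to print the physical plan; for a bucketed scan it typically reports how many buckets were selected (the exact plan text can vary across Spark versions).

     # Print the physical plan; with bucket pruning in effect the FileScan node
     # typically reports something like "SelectedBucketsCount: 1 out of 2".
     readDf.explain()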
