How to use Python with Hive to handle Big Data?

Over the past few years, we have been hearing more and more about the wealth of data we humans generate. This has progressively grown into the idea that if you have enough of this data and can piece together some meaning from it, you can achieve everything from predicting the future to curing all human ills. However, if you ask the average person on the street what BigData is, you might find that to them it is just a comparative term. Yet you don't hear people talking about LittleData, its polar opposite. This leads many in the industry to believe that the whole concept was fabricated by marketers to give a cool-sounding name to something they can't even define.

But marketing slang aside, BigData has grown into what we perceive as a technological revolution that has been years in the making. Stripped down to its core, BigData is a system that’s too big, too complex, or too resource-hungry to be handled using conventional database strategies. That still leaves us with a problem (what exactly is conventional or traditional?), but it does give us a clearer picture of the three vectors by which it is defined: volume, variety and velocity.

Volume, obviously, refers to the size of the storage required. Velocity can be taken as the speed at which data is acquired, processed and used, which depends on both networking and CPU choices. Finally, we have variety. This is a little more complex, as it describes the mix of data types to be analysed. The core concept of big data is recognising patterns in a given chunk of data, and the more varied the sources of data, the more powerful that analysis becomes.

Why regular DBMS solutions are not a “first choice solution” in the BigData world

Let's start with size. Once the data source has no size limit, regular DBMSs have trouble processing it; we are talking about the petabyte range. You could try adding more CPUs or more memory to the database management system to scale it up vertically, but this mostly increases the cost of software and hardware infrastructure without solving the underlying problem.

Next, we have the problem of the variety of forms big data comes in, most of which are semi-structured or unstructured. Regular DBMSs run into issues here, as most of them cannot cope with huge amounts of unstructured data. They are designed for handling structured data sets and for supporting ACID transactions.

Finally, with so much data needing to be processed and handled very quickly, an RDBMS lacks the required velocity because it is designed for steady data retention rather than rapid growth. As a direct result, the inability of relational databases to handle “big data” led to the emergence of new technologies.

What is Hadoop?

Now that we know regular DBMS won't work for big data, we need to look into some other alternatives. Hadoop just happens to be one of these.

Hadoop, in simple terms, is an open-source software framework for storing large amounts of data and running applications on clusters of commodity hardware. It provides massive storage for any kind of data, enormous processing power and the ability to handle virtually limitless concurrent tasks or jobs.

But why is Hadoop so important for BigData?

There are a few reasons why it makes sense to use something like Hadoop for big data. The first is its ability to store and process all kinds of data very quickly, which matters as data volumes and varieties continually increase, particularly from social media and the Internet of Things (IoT). Other reasons include:

  • Computing power: Its distributed computing model processes big data quickly. Basically, the more computing nodes you have, the more processing power you can make use of.

  • Fault tolerance: Data and application processing are protected against hardware failure. If a node goes down, jobs are automatically redirected to other nodes to make sure the distributed computing does not fail. Multiple copies of all data are stored automatically.

  • Flexibility: It lets you store data first and decide how to use it later down the line. This is where it shines over its RDBMS counterparts.

  • Low cost: The open-source framework is free and uses commodity hardware to store large quantities of data.

  • Scalability: With the addition of more nodes, scaling up your projects is quick and easy, without any tedious administrative hurdles.

What is HDFS?

By definition, the Hadoop Distributed File System or HDFS for short is the primary data storage system used by Hadoop applications. It makes use of a NameNode and DataNode architecture to implement a distributed file system that provides high-performance access to data across highly scalable Hadoop clusters.

It provides a reliable means of managing pools of big data and supporting related big data analytics applications. Hence it is considered a core part of the Hadoop ecosystem.

HDFS supports the rapid transfer of data between compute nodes. From the outset, it was closely coupled with MapReduce, a programmatic framework for data processing, but it is no longer limited to that pairing.
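
To give a feel for the MapReduce model mentioned here, below is a minimal single-process sketch of a word count split into map, shuffle and reduce steps. It is only an illustration of the idea: the function names are hypothetical, and a real Hadoop job would run these phases in parallel across many nodes rather than in one Python process.

```python
from collections import defaultdict


def map_phase(line):
    """Emit a (word, 1) pair for every word in one line of input."""
    return [(word.lower(), 1) for word in line.split()]


def shuffle(pairs):
    """Group the emitted values by key, as the framework does between phases."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reduce_phase(key, values):
    """Combine all values seen for one key into a single result."""
    return key, sum(values)


def word_count(lines):
    """Run map, shuffle and reduce one after another on a list of lines."""
    pairs = [pair for line in lines for pair in map_phase(line)]
    return dict(reduce_phase(key, values) for key, values in shuffle(pairs).items())


counts = word_count(["big data big cluster", "data node"])
```

Because each line is mapped independently and each key is reduced independently, both phases can be spread over as many nodes as are available.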

The data that HDFS takes in is broken down into separate blocks, which are distributed to different nodes in a cluster, thus facilitating highly efficient parallel processing.

Being highly fault-tolerant, the file system replicates each piece of data multiple times and distributes the copies to individual nodes, placing at least one copy on a different server rack than the others. This way, even if a particular node crashes or its data is lost, the data can be found elsewhere within the cluster, so processing can continue while data is recovered.
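
The splitting and replication described above can be illustrated with a small, self-contained simulation. This is only a sketch of the idea, not HDFS's actual placement policy: the block size, the node and rack names and both helper functions are hypothetical, and the sketch assumes at least two racks with at least two nodes each.

```python
def split_into_blocks(data, block_size):
    """Cut a byte string into fixed-size blocks (the last one may be shorter)."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]


def place_blocks(num_blocks, nodes_by_rack, replication=3):
    """Assign each block to `replication` distinct nodes spread over two racks.

    nodes_by_rack maps a rack name to a list of node names (hypothetical layout).
    """
    racks = sorted(nodes_by_rack)
    placement = {}
    for block_id in range(num_blocks):
        first_nodes = nodes_by_rack[racks[block_id % len(racks)]]
        other_nodes = nodes_by_rack[racks[(block_id + 1) % len(racks)]]
        # One replica on the first rack, the remaining ones on a different rack.
        chosen = [first_nodes[block_id % len(first_nodes)]]
        for k in range(replication - 1):
            chosen.append(other_nodes[(block_id + k) % len(other_nodes)])
        placement[block_id] = chosen
    return placement


cluster = {"rack1": ["node1", "node2"], "rack2": ["node3", "node4"]}
blocks = split_into_blocks(b"x" * 10, block_size=4)
layout = place_blocks(len(blocks), cluster)

# Simulate losing node1: every block should still have a replica elsewhere.
survivors = {b: [n for n in nodes if n != "node1"] for b, nodes in layout.items()}
```

With this layout, the loss of any single node still leaves at least two replicas of every block, which is what allows processing to continue during recovery.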

What is Hive?

And now we have made our way down to Hive. Hive is an ETL and data warehousing tool built on top of the Hadoop Distributed File System. Hive simplifies operations such as:

  • Data encapsulation
  • Ad-hoc queries
  • Analysis of huge datasets

But what makes Hive stand out?

To begin with, in Hive, tables and databases can be created beforehand, and you can then load data into them.

It is also a data warehouse built for managing and querying only structured data that is stored in tables. When dealing with structured data, the Hive framework has features that support optimization and usability, such as UDFs (user-defined functions). Query optimization here refers to executing queries efficiently in terms of performance.

Newer versions of Hive have limited support for unstructured data that is encapsulated in special Hive table fields.

One important component of Hive is the Metastore. It stores schema information and typically resides in a relational database. For single-user metadata storage, Hive uses a Derby database; for multi-user or shared metadata, Hive can use MySQL or PostgreSQL.

This brings us to the focal point of this article.

Because Hive is one of the major tools in the Hadoop ecosystem, it is worth being able to use it with one of the most popular programming languages: Python.

Connecting to Hive from Python and creating internal Hive tables

Now at this point, we are going to go into practical examples of blending Python with Hive. You can try out the following snippets to get started.

One of the major Hive concepts is the distinction between internal and external tables.

Choosing an appropriate table type will affect how data is loaded, controlled, and managed.

Use EXTERNAL tables when:

  1. The data is also used outside of Hive. For example, the data files are read and processed by an existing program that doesn't lock the files.

  2. Data needs to remain in the underlying location even after a DROP TABLE. This can apply if you are pointing multiple schemas (tables or views) at a single data set or if you are iterating through various possible schemas.

  3. You want to use a custom location (Local storage, S3 etc.).

  4. Hive should not own the data or control settings, directories, etc., because another process will handle those things.

  5. You are not creating a table based on an existing table (AS SELECT).

Use INTERNAL tables when:

  1. The data is temporary.

  2. You want Hive to completely manage the lifecycle of the table and data.
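
In terms of the statement sent to Hive, the choice mostly comes down to the EXTERNAL keyword and an optional LOCATION clause. The helper below is a hypothetical sketch (`build_create_table` is not part of Hive or of any library) that only builds the DDL string; the database, table and path names are placeholders.

```python
def build_create_table(db_name, table_name, columns, stored_as=None,
                       external=False, location=None):
    """Build a Hive CREATE TABLE statement as a string.

    columns is a list of (column_name, hive_type) tuples.
    """
    cols = ", ".join("`%s` %s" % (name, hive_type) for name, hive_type in columns)
    parts = ["CREATE %sTABLE IF NOT EXISTS `%s`.`%s` (%s)" % (
        "EXTERNAL " if external else "", db_name, table_name, cols)]
    if stored_as:
        parts.append("STORED AS %s" % stored_as)
    if location:
        # For an external table this is where the existing data files live.
        parts.append("LOCATION '%s'" % location)
    return " ".join(parts)


managed_ddl = build_create_table("my_db", "events", [("id", "BIGINT")],
                                 stored_as="PARQUET")
external_ddl = build_create_table("my_db", "raw_events", [("line", "STRING")],
                                  external=True, location="/data/raw_events")
```

Dropping the first table removes its data along with the metadata, while dropping the second leaves the files under its location untouched.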

Let's connect to Hive from Python and create two internal tables in different formats:

import logging

import pyhs2

if __name__ == "__main__":
    try:
        hive_con = pyhs2.connect(host='',  # HiveServer2 IP or host
                                 port=10000,
                                 authMechanism="NOSASL",
                                 user='',  # Username
                                 password='',  # User password
                                 database='default')
        hive_cur = hive_con.cursor()
        table_body = '(`Id` BIGINT, `some_field_1` STRING, `some_field_2` STRING) '
        db_name = "my_db"
        table_formats = ("PARQUET", "TEXTFILE", "AVRO")

        # Creating internal Parquet table
        table_name = "my_first_parquet_table"
        create_tb = ('CREATE TABLE IF NOT EXISTS `%s`.`%s` %s STORED AS %s') % (db_name, table_name, table_body, table_formats[0])
        hive_cur.execute(create_tb)

        # Creating internal Textfile table
        # (it needs its own name, otherwise IF NOT EXISTS would silently skip it)
        table_name = "my_first_textfile_table"
        create_tb = ('CREATE TABLE IF NOT EXISTS `%s`.`%s` %s STORED AS %s') % (db_name, table_name, table_body, table_formats[1])
        hive_cur.execute(create_tb)
        hive_cur.close()
        hive_con.close()
    except Exception as e:
        logging.error(str(e))

Since we want to be able to use S3 for storing our external table data, let’s connect to S3 and store a file in it:

import logging
import os
import time

import boto3

s3_creds = dict(
    access="",           # AWS access key
    secret="",           # AWS secret key
    region="us-east-1",  # AWS region name (adjust to your bucket's region)
    bucket="",           # AWS S3 bucket name
    prefix="",           # Target S3 "folder" (key prefix) to store the file
)

#############################################################
# Store local file to S3 bucket
#############################################################
def put_file_to_s3(s3_creds, local_file):
    start_time = time.time()
    # Object key = prefix + file name (without the local directory part)
    object_key = s3_creds["prefix"] + os.path.basename(local_file)
    target_object = "s3://%s/%s" % (s3_creds["bucket"], object_key)
    try:
        s3_client = boto3.client('s3',
                                 aws_access_key_id=s3_creds['access'],
                                 aws_secret_access_key=s3_creds['secret'],
                                 region_name=s3_creds["region"])
        s3_client.upload_file(local_file, s3_creds['bucket'], object_key)
        logging.info("File %s has been stored to S3 in %s sec." % (target_object, round(time.time() - start_time, 3)))
    except Exception as e:
        logging.error("Failed to store file %s to S3: %s" % (target_object, e))

#############################################################
if __name__ == "__main__":
    table_content = "row1_column1\trow1_column_2\nrow2_column1\trow2_column2"
    file_name = "./my_temp_folder/my_firs_s3_table.tsv"
    # Make sure the local folder exists before writing the file
    if not os.path.isdir(os.path.dirname(file_name)):
        os.makedirs(os.path.dirname(file_name))
    with open(file_name, "w") as local_file:
        local_file.write(table_content)
    put_file_to_s3(s3_creds, file_name)

Now let’s do the same thing with HDFS:

import logging
import os
import subprocess
from subprocess import PIPE

import pyarrow as pa
#############################################################
# Do some initial magic - set the path to the JAVA libraries
# Set JAVA env variables
#############################################################
def set_env():
  # libhdfs.so path
  cmd = ["locate", "-l", "1", "libhdfs.so"]
  libhdfsso_path = subprocess.check_output(cmd).strip()
  os.environ["ARROW_LIBHDFS_DIR"] = os.path.dirname(libhdfsso_path)

  # JAVA_HOME path
  os.environ["JAVA_HOME"] = '/usr/lib/jvm/java-7-oracle-cloudera'

  # classpath
  cmd = ["/usr/bin/hadoop", "classpath", "--glob"]
  hadoop_cp = subprocess.check_output(cmd).strip()

  if "CLASSPATH" in os.environ:
    os.environ["CLASSPATH"] = os.environ["CLASSPATH"] + ":" + hadoop_cp
  else:
    os.environ["CLASSPATH"] = hadoop_cp
###############################################################
# Get HDFS connector
###############################################################
def hdfs_connect():
  # 192.168.250.15 is the IP address of one of the Hadoop cluster's NameNodes
  return pa.hdfs.connect("192.168.250.15", 8020, user='hdfs', driver='libhdfs')

###############################################################
# Write some text to the HDFS file
###############################################################
def write_hdfs(content, file_name):
  try:
    hdfs = hdfs_connect()
    of = hdfs.open(file_name, "wb")
    of.write(content)
    of.close()
    hdfs.close()
    logging.info('Data has been successfully written')
    return True
  except Exception as err:
    logging.error('Failed when writing content: %s' % err)
    return False

###############################################################
# Set required permission to HDFS path
# It should be done for using HDFS file as Impala table source
###############################################################
def set_hive_permissions(base_dir):
  try:
    logging.info('Changing HDFS files permissions')
    # check_call waits for each command to finish and raises on a non-zero exit code
    cmd = ["sudo", "-u", "hdfs", "hdfs", "dfs", "-chown", "-R", "hive:supergroup", base_dir]
    subprocess.check_call(cmd)
    cmd = ["sudo", "-u", "hdfs", "hdfs", "dfs", "-chmod", "-R", "777", base_dir]
    subprocess.check_call(cmd)
  except Exception as err:
    logging.error('Can\'t change HDFS files permissions: %s' % err)

###############################################################
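if __name__ == "__main__":
  set_env()
  table_content = "row1_column1\trow1_column_2\nrow2_column1\trow2_column2"
  # Hive expects a table LOCATION to be a directory, so the file goes into its own folder
  base_dir = "/my_firs_hdfs_table"
  if write_hdfs(table_content, base_dir + "/my_firs_hdfs_table.tsv"):
    # Permissions are changed after the write, once the path exists
    set_hive_permissions(base_dir)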

Creating external Hive tables from the files uploaded above

  # Reuses hive_cur and s3_creds from the snippets above
  table_body = '(`column_1` STRING, `column_2` STRING) '
  db_name = "my_db"

  # Creating external S3 based Hive table.
  # LOCATION must point to the directory ("folder") that holds the data files, not to a single file,
  # so every file under this prefix is treated as table data.
  table_name = "my_first_s3_based_external_table"
  s3_loc = "s3://%s/%s" % (s3_creds["bucket"], s3_creds["prefix"])
  create_tb = ('CREATE EXTERNAL TABLE IF NOT EXISTS `%s`.`%s` %s '
               'ROW FORMAT DELIMITED FIELDS TERMINATED BY \'\\t\' '
               'LOCATION \'%s\'') % (db_name, table_name, table_body, s3_loc)
  hive_cur.execute(create_tb)

  # Creating external HDFS based Hive table
  # (assumes the TSV file was written into this HDFS directory)
  table_name = "my_first_hdfs_based_external_table"
  hdfs_loc = "/my_firs_hdfs_table"
  create_tb = ('CREATE EXTERNAL TABLE IF NOT EXISTS `%s`.`%s` %s '
               'ROW FORMAT DELIMITED FIELDS TERMINATED BY \'\\t\' '
               'LOCATION \'%s\'') % (db_name, table_name, table_body, hdfs_loc)
  hive_cur.execute(create_tb)

Querying Hive using Python

  query = 'SELECT * FROM `my_db`.`my_first_hdfs_based_external_table`'

  # Option 1: fetch ALL table records at once (could be extremely slow for large tables)
  hive_cur.execute(query)
  records = hive_cur.fetchall()
  for record in records:
      # Do smth with record
      pass

  # Option 2: fetch records ONE by one, like an iterator.
  # The query is executed again because fetchall() has already consumed the previous result set.
  hive_cur.execute(query)
  record = hive_cur.fetchone()
  while record is not None:
      # Do smth with record
      record = hive_cur.fetchone()
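
A middle ground between fetching everything and fetching row by row is to pull rows in batches. The generator below is a sketch that assumes the cursor exposes a DB-API style `fetchmany(size)` method; `iter_rows` is a hypothetical helper, and an in-memory SQLite cursor stands in for the Hive cursor so that the example runs without a cluster.

```python
import sqlite3


def iter_rows(cursor, batch_size=1000):
    """Yield rows one by one while fetching them from the cursor in batches."""
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:
            break
        for row in batch:
            yield row


# Stand-in for a Hive cursor (assumption: same fetchmany behaviour).
demo_con = sqlite3.connect(":memory:")
demo_cur = demo_con.cursor()
demo_cur.execute("CREATE TABLE t (column_1 TEXT, column_2 TEXT)")
demo_cur.executemany(
    "INSERT INTO t VALUES (?, ?)",
    [("row%d_column1" % i, "row%d_column2" % i) for i in range(1, 6)])
demo_cur.execute("SELECT * FROM t ORDER BY column_1")
rows = list(iter_rows(demo_cur, batch_size=2))
demo_con.close()
```

The calling code still sees a plain iterator, while memory use is bounded by the batch size rather than by the size of the table.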

Converting Hive table formats

If you would like to convert a text-based (or other format) Hive table to another format, you could use a trick like this:

  # Text file-based external Hive table (LOCATION is the HDFS directory that holds the TSV file)
  hdfs_loc = "/my_firs_hdfs_table"
  table_name = "my_hdfs_external_table"
  table_body = '(`column_1` STRING, `column_2` STRING) '
  db_name = "my_db"

  # Creating external HDFS based Hive table
  create_tb = ('CREATE EXTERNAL TABLE IF NOT EXISTS `%s`.`%s` %s '
               'ROW FORMAT DELIMITED FIELDS TERMINATED BY \'\\t\' '
               'LOCATION \'%s\'') % (db_name, table_name, table_body, hdfs_loc)
  hive_cur.execute(create_tb)

  # Creating internal Parquet table with the same structure
  parquet_table_name = "my_parquete_table"
  table_format = "PARQUET"
  create_tb = ('CREATE TABLE IF NOT EXISTS `%s`.`%s` %s STORED AS %s') % (db_name, parquet_table_name, table_body, table_format)
  hive_cur.execute(create_tb)

  # Converting tables: copy all rows from the text table into the Parquet table
  query = ("INSERT OVERWRITE TABLE `%s`.`%s` SELECT * FROM `%s`.`%s`") % (db_name, parquet_table_name, db_name, table_name)
  hive_cur.execute(query)

Quick notes:

  • All code snippets were tested with Python 2.7 and Cloudera CDH.
  • pyhs2 is no longer supported; we suggest using PyHive instead, as it has a similar syntax.
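
For reference, the connection step might look roughly like this with PyHive. This is a sketch under assumptions rather than a tested replacement: it presumes PyHive is installed with its Hive dependencies and that HiveServer2 accepts NOSASL authentication as in the pyhs2 example; `open_hive_cursor` and `select_all` are hypothetical helpers, and the host and user names in the usage comment are placeholders.

```python
def open_hive_cursor(host, port=10000, username=None, database="default",
                     auth="NOSASL"):
    """Open a PyHive connection and return (connection, cursor)."""
    # Imported lazily so this module still loads where PyHive is not installed.
    from pyhive import hive
    connection = hive.connect(host=host, port=port, username=username,
                              database=database, auth=auth)
    return connection, connection.cursor()


def select_all(db_name, table_name, limit=None):
    """Build a SELECT * query, optionally capped with LIMIT."""
    query = "SELECT * FROM `%s`.`%s`" % (db_name, table_name)
    if limit is not None:
        query += " LIMIT %d" % int(limit)
    return query


# Usage (requires a reachable HiveServer2; names are placeholders):
# con, cur = open_hive_cursor("hive-host.example.com", username="hive_user")
# cur.execute(select_all("my_db", "my_first_hdfs_based_external_table", limit=10))
# print(cur.fetchall())
# con.close()
```

The cursor returned here follows the same execute and fetch pattern used throughout the snippets above, so the rest of the code should need little change.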

Summary

In the era of BigData, Hive is an invaluable tool, and using it in conjunction with a flexible programming language like Python gives you an extra layer of manoeuvrability when building your next BigData project.

Separately, both are invaluable tools, but together there is a spark of magic. And as we move forward in the ever-growing realization of what we can accomplish with BigData, we will need such pairings to develop further.