Python and Impala — Quick Overview and Samples

Python and Impala — Quick Overview and Samples

When last we dove into the world of Big Data we spoke about its implementation with Python and Hive. Plus how that combination could be used to develop really to bust the plications.

Today we would like to switch gears a bit and get our feet wet with another BigData combo of Python and Impala. The reason for this is because there are some limitations that exist when using Hive that might prove a deal-breaker for your specific solution. Impala might be a better route to take instead.

To begin we have to understand a few core concepts Map-Reduce all the way to an overview of impala before we take a quick glance at some python-Impala coding.

Map-reduce concept overview

First on our list of concepts is Map Reduce. Simply stated, it is a software framework and programming model used for processing huge amounts of data. MapReduce programs work in two phases, namely, Map and Reduce. Map tasks deal with splitting and mapping of data while Reduce tasks shuffle and reduce the data.

Hadoop is capable of running MapReduce programs written in various languages: Java, Ruby, Python, and C++. MapReduce programs are parallel in nature, thus are very useful for performing large-scale data analysis using multiple machines in the cluster.

The input to each phase is key-value pairs. Plus, every programmer needs to specify two functions: map function and reduce function.

The whole process goes through four phases of execution:

Input Splits:

  • An input to a MapReduce job is divided into fixed-size pieces called input splits Input split is a chunk of the input that is consumed by a single map

Mapping:

  • This is the very first phase in the execution of the map-reduce program. In this phase data in each split is passed to a mapping function to produce output values. In our example, a job of mapping phase is to count a number of occurrences of each word from input splits (more details about input-split is given below) and prepare a list in the form of <word, frequency>

Shuffling:

  • This phase consumes the output of the mapping phase. Its task is to consolidate the relevant records from the Mapping phase output. In our example, the same words are clubbed together along with their respective frequency.

Reducing:

  • Here, output values from the Shuffling phase are aggregated. This phase combines values from the Shuffling phase and returns a single output value. In short, this phrase summarizes the complete dataset.

This is further divided into tasks. There are two types of tasks:

Map tasks (Splits & Mapping) Reduce tasks (Shuffling, Reducing)

The complete execution process (execution of Map and Reduce tasks, both) is controlled by two types of entities called a

Jobtracker: Acts like a master (responsible for the complete execution of submitted job)

Multiple Task Trackers: Acts like slaves, each of them performing the job For every job submitted for execution in the system, there is one Jobtracker that resides on Namenode and there are multiple task trackers which reside on Datanode.

Tez execution engine concept

Next on our list is Tez. What it does is to generalize the MapReduce paradigm to a more powerful framework based on expressing computations as a dataflow graph. Tez is not meant directly for end-users – in fact, it enables developers to build end-user applications with much better performance and flexibility. Hadoop has traditionally been a batch-processing platform for large amounts of data. However, there are a lot of use cases for the near-real-time performance of query processing. Tez helps Hadoop address cases which are not well suited for Map-Reduce.

Massive Parallel processing concept overview

Now we need to have a brief look into MPP (massively parallel processing). It can be stated to be the coordinated processing of a program by multiple processors that work on different parts of the program, with each processor using its own operating system and memory. Generally, MPP processors communicate using some messaging interface. In some executions, up to 200 or more processors can work on the same application. An "interconnect" arrangement of data paths allows messages to be sent between processors. Generally, the setup for MPP is more tricky, requiring thought about how to partition a common database among processors and how to assign work among the processors. An MPP system is also known as a "loosely coupled" or "shared nothing" system.

An MPP system is considered better than an asymmetrically parallel system ( SMP ) for applications that allow a number of databases to be searched in parallel. These include decision support systems and data warehouse applications.

What is Impala?

Now to the crux of our content. Impala is an MPP (Massive Parallel Processing) SQL query engine for processing huge volumes of data that is stored in a Hadoop cluster. It is an open-source software which is written in C++ and Java. It provides high performance and low latency compared to other SQL engines for Hadoop.

In other words, Impala is the highest performing SQL engine (giving RDBMS-like experience) which provides the fastest way to access data that is stored in the Hadoop Distributed File System.

Impala combines the SQL support and multi-user performance of a traditional analytic database with the scalability and flexibility of Apache Hadoop, by utilizing standard components such as HDFS, HBase, Metastore, YARN, and Sentry.

With Impala, users can communicate with HDFS or HBase using SQL queries in a faster way compared to other SQL engines like Hive.

Impala can read almost all the file formats such as Parquet, Avro, RCFile used by Hadoop.

Impala uses the same metadata, SQL syntax (Hive SQL), ODBC driver, and user interface (Hue Beeswax) as Apache Hive, providing a familiar and unified platform for batch-oriented or real-time queries.

Unlike Apache Hive, Impala is not based on MapReduce algorithms. It implements a distributed architecture based on daemon processes that are responsible for all the aspects of query execution that run on the same machines.

Thus, it reduces the latency of utilizing MapReduce and this makes Impala faster than Apache Hive.

Impala vs Hive - Comparison

Understanding what your solution requires is a critical criterion for selecting either to go with a Hive implementation or one that is dependent on Impala. However, there are differences to note between the two.

Impala has been shown to have a performance lead over Hive by benchmarks of both Cloudera (Impala’s vendor) and AMPLab. Benchmarks have been observed to be notorious about biasing due to minor software tricks and hardware settings. However, it is worthwhile to take a deeper look at this constantly observed difference. The following reasons come to the fore as possible causes:

Cloudera Impala being a native query language avoids startup overhead which is commonly seen in MapReduce/Tez based jobs (MapReduce programs take time before all nodes are running at full capacity). In Hive, every query has this problem of “cold start” whereas Impala daemon processes are started at boot time itself, always being ready to process a query.

Hadoop reuses JVM instances to reduce startup overhead partially but introduces another problem when large haps are in use. Cloudera benchmark has 384 GB memory which is a big challenge for the garbage collector of the reused JVM instances.

MapReduce materializes all intermediate results, which enables better scalability and fault tolerance (while slowing down data processing). Impala streams intermediate results between executors (trading off scalability).

Hive generates query expressions at compile time whereas Impala does runtime code generation for “big loops”. Apache Hive might not be ideal for interactive computing whereas Impala is meant for interactive computing.

Hive is batch-based Hadoop MapReduce whereas Impala is more like an MPP database.

Hive supports complex types but Impala does not.

Apache Hive is fault-tolerant whereas Impala does not support fault tolerance. When a hive query is run and if the DataNode goes down while the query is being executed, the output of the query will be produced as Hive is fault-tolerant. However, that is not the case with Impala. If a query execution fails in Impala it has to be started all over again.

Hive transforms SQL queries into Apache Spark or Apache Hadoop jobs making it a good choice for long-running ETL jobs for which it is desirable to have fault tolerance because developers do not want to re-run a long-running job after executing it for several hours

Impala pros & cons

Here is a list of some noted advantages of Cloudera Impala.

  • Using impala, you can process data that is stored in HDFS at lightning-fast speed with traditional SQL knowledge.

  • Since the data processing is carried where the data resides (on Hadoop cluster), data transformation and data movement are not required for data stored on Hadoop, while working with Impala.

  • Using Impala, you can access the data that is stored in HDFS, HBase, and Amazon s3 without the knowledge of Java (MapReduce jobs). You can access them with a basic idea of SQL queries.

  • To write queries in business tools, the data has to be gone through a complicated extract-transform-load (ETL) cycle. But, with Impala, this procedure is shortened. The time-consuming stages of loading & reorganizing are overcome with the new techniques such as exploratory data analysis & data discovery making the process faster.

  • Impala is pioneering the use of the Parquet file format, a columnar storage layout that is optimized for large-scale queries typical in data warehouse scenarios.

But as to everything, there are some notable downsides:

  • Impala does not provide any support for Serialization and Deserialization.

  • Whenever new records/files are added to the data directory in HDFS, the table needs to be refreshed.

Limitation of HDFS based tables

Hadoop distributed filesystem works well for many large datasets as the distributed filesystem. But we should know there are some limitations of HDFS which makes it a bad fit for some applications.

Low latency data access If an application requires low latency data access, in the range of milliseconds, it would not work well with HDFS. It is designed to provide high throughput at the expense of low latency.

Lots of small files Namenode holds data about file location in the HDFS cluster. If there are too many files, Namenode will not have enough memory to store such metadata about each file. We will learn about HDFS architecture in the next tutorial.

Arbitrary data modification Hadoop distributed file system does not support updating data once it is written. We can append data to the end of files but modifying arbitrary data is not possible in HDFS. Hence application which needs data

KUDU storage engine concept overview

Kudu is a columnar storage manager developed for the Apache Hadoop platform. Kudu shares the common technical properties of Hadoop ecosystem applications: it runs on commodity hardware, is horizontally scalable, and supports the highly available operation.

It's feature set include:

  • Fast processing of OLAP workloads.

  • Integration with MapReduce, Spark and other Hadoop ecosystem components.

  • Tight integration with Apache Impala, making it a good, mutable alternative to using HDFS with Apache Parquet.

  • Strong but flexible consistency model, allowing you to choose consistency requirements on a per-request basis, including the option for strict-serializable consistency.

  • Strong performance for running sequential and random workloads simultaneously.

  • Easy to administer and manage.

  • High availability. Tablet Servers and Masters use the Raft Consensus Algorithm, which ensures that as long as more than half the total number of replicas are available, the tablet is available for reads and writes. For instance, if 2 out of 3 replicas or 3 out of 5 replicas are available, the tablet is available.

  • Reads can be serviced by read-only follower tablets, even in the event of a leader tablet failure.

  • Structured data model.

  • By combining all of these properties, Kudu targets support for families of applications that are difficult or impossible to implement on current generation Hadoop storage technologies. A few examples of applications for which Kudu is a great solution are:

  • Reporting applications where newly-arrived data needs to be immediately available for end-users

  • Time-series applications that must simultaneously support

  • queries across large amounts of historic data

  • granular queries about an individual entity that must return very quickly

  • Applications that use predictive models to make real-time decisions with periodic refreshes of the predictive model based on all historic data

Impala + KUDU fast RDBMS like a solution for Big-Data analytics

Kudu is well integration with Impala, such that it allows you to use Impala to insert, query, update, and delete data from Kudu tablets using Impala's SQL syntax, as an alternative to using the Kudu APIs to build a custom Kudu application. Plus, you can use JDBC or ODBC to connect existing or new applications written in other languages, frameworks, or business intelligence tools to your Kudu data, using Impala as the broker.

  • CREATE/ALTER/DROP TABLE - Impala supports creating, altering, and dropping tables using Kudu as the persistence layer. The tables follow the same internal/external approach as other tables in Impala, allowing for flexible data ingestion and querying.

  • INSERT - Data can be inserted into Kudu tables from Impala using the same mechanisms as any other table with HDFS or HBase persistence.

  • UPDATE/DELETE - Impala supports the UPDATE and DELETE SQL commands to modify existing data in a Kudu table row-by-row or as a batch. The syntax of the SQL commands is designed to be as compatible as possible with existing solutions. In addition to simple DELETE or UPDATE commands, you can specify complex joins in the FROM clause of the query, using the same syntax as a regular SELECT statement.

  • Flexible Partitioning - Similar to the partitioning of tables in Hive, Kudu allows you to dynamically pre-split tables by hash or range into a predefined number of tablets, in order to distribute writes and queries evenly across your cluster. You can partition by any number of primary key columns, with any number of hashes, a list of split rows, or a combination of these. A partition scheme is required.

  • Parallel Scan - To achieve the highest possible performance on modern hardware, the Kudu client used by Impala parallelizes scans across multiple tablets.

  • High-efficiency queries - Where possible, Impala pushes down predicate evaluation to Kudu so that predicates are evaluated as close as possible to the data. Query performance is comparable to Parquet in many workloads.

Using Impala with Python - Python and Impala Samples

Connect to impala

We use the Impyla package to manage Impala connections.

(pip install impyla)

#
  from impala.dbapi import connect
  import logging
# Connect to Impala and execute the query

def execute_query(query, cursor=None):
  try:
    impala_con = connect(host='192.168.250.10')

    # If you have a Kerberos auth on your Impala, you could use connection string like:
    # impala_con = connect(host='192.168.250.10', port=21050, use_ssl=True,
    # database='default', user='username', kerberos_service_name='impala',
    # auth_mechanism = 'GSSAPI')
    # NOTE: You may need to install additional OS related packages like:
    # libsasl2-modules-gssapi-mit, thrift_sasl

    impala_cur = impala_con.cursor()
    impala_cur.execute(query)
    result = impala_cur if cursor else impala_cur.fetchall()
    logging.info('Query has been successfully executed')
    impala_cur.close()
    impala_con.close().
    return result
  except Exception as err:
    logging.error('Query execution failed!')
    logging.error(err)
    return None

Creating KUDU tables for using in Impala

Unlike HDFS tables, KUDU tables allow us to have primary key constraint and establish effective data distribution across partitions / buckets.

The primary key fields must be defined first in the table and in the order in which the PRIMARY KEY () is specified in the expression. Primary key fields are implicitly marked NOT NULL.

Proper partitioning can significantly speed up access / search data, so you should pay attention to the correct partitioning /bucketing strategy

create_expression = (
'CREATE TABLE kudu_table '
'(app_id BIGINT,'
            'code STRING,'
            'description STRING,'
            'events_count BIGINT,'
            'PRIMARY KEY(app_id, code)) '
'PARTITION BY HASH(app_id) PARTITIONS 64 STORED AS KUDU ')

execute_query(create_expression)

Export query results into CSV file:

import csv

query = 'SELECT * FROM kudu_table LIMIT 500'
cursor = execute_query(query, cursor=True)
fields = [filed[0] for field in cursor.description]
csv_file = '/csvs/export.csv'

with open(csv_file, 'w', newline='') as out_csv:
    csv_writer = csv.writer(out_csv, delimiter=',', quotechar='"', quoting=csv.QUOTE_ALL, lineterminator='\n')
    csv_writer.writerow(fields)
    for row in cursor:
        csv_writer.writerow(row)

Export query results as Pandas DataFrame object:

from impala.util import as_pandas

query = 'SELECT * FROM kudu_table LIMIT 500'
cursor = execute_query(query, cursor=True)

data_frame = as_pandas(cursor)

Using Ibis with Impala and Kudu:

pip install ibis-framework

import ibis
ibis.options.interactive = True

host = '192.168.250.10'

hdfs = ibis.hdfs_connect(host, port=50070)
ii = ibis.impala.connect(host, port=21050, hdfs_client=hdfs)
ii.kudu.connect(host, 7051)

# List KUDU tables
ii.kudu.list_tables()

# The table method on ic.kudu automatically creates an Impala table whose metadata references the existing data in Kudu:

some_table = ic.kudu.table('some_table')

You can issue SELECT, INSERT, DELETE, and UPDATE queries on data in Kudu tables via Impala, but for now only SELECT and INSERT operations are available from Ibis.

Creating new Kudu-backed Impala tables:

The Impala client's Kudu interface has a method create_table which enables more flexible Impala table creation with data stored in Kudu. This includes:

Creating empty tables with a particular schema Creating tables from an Ibis table expression (i.e. a "CTAS" in database speak) Creating tables from pandas DataFrame objects

schema = ibis.schema([('one', 'int32'), ('two', 'string'), ('three', 'double')])

if ii.exists_database('kudu_tables'):
  ii.drop_database('kudu_tables', force=True)

  ii.create_database('kudu_tables')
  impala_name = 'example1'
  kudu_name = 'example1-kudu-table'
  ic.kudu.create_table(impala_name, kudu_name, schema=schema,
  primary_keys=['one'],
  database='kudu_tables')

  db = ii.database('kudu_tables')
  impala_table = db.table(impala_name)