Using Apache Hive or Apache Spark v2
You can use the Hadoop Foreign Data Wrapper with either Apache Hive or Apache Spark. Both Hive and Spark store their metadata in the configured metastore, where you create databases and tables using HiveQL.
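For example, the metastore objects that the data wrapper later reads are created with ordinary HiveQL statements. This is only an illustrative sketch; the sales database and events table are made-up names and aren't used in the walkthroughs below:

```sql
-- Hypothetical HiveQL, run from a beeline session; names are examples only.
CREATE DATABASE IF NOT EXISTS sales;

CREATE TABLE sales.events (
  event_id   INT,
  event_name STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
```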
Using HDFS FDW with Apache Hive on top of Hadoop
Apache Hive data warehouse software helps with querying and managing large datasets residing in distributed storage. Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called HiveQL. At the same time, this language allows traditional map/reduce programmers to plug in their custom mappers and reducers when it's inconvenient or inefficient to express this logic in HiveQL.
Two versions of the Hive server, HiveServer1 and HiveServer2, are available from the Apache Hive website.
Note
The Hadoop Foreign Data Wrapper supports only HiveServer2.
To use HDFS FDW with Apache Hive on top of Hadoop:
Download the weblogs_parse sample data and follow the instructions on the Pentaho Wiki website.
Upload the weblogs_parse.txt file using these commands:

```shell
hadoop fs -mkdir /weblogs
hadoop fs -mkdir /weblogs/parse
hadoop fs -put weblogs_parse.txt /weblogs/parse/part-00000
```
Start HiveServer2, if it isn't already running, using one of the following commands:

```shell
$HIVE_HOME/bin/hiveserver2
```

or

```shell
$HIVE_HOME/bin/hive --service hiveserver2
```
Connect to HiveServer2 using the hive beeline client. For example:
```shell
$ beeline
Beeline version 1.0.1 by Apache Hive
beeline> !connect jdbc:hive2://localhost:10000/default;auth=noSasl
```
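Once the connection succeeds, you can optionally run a quick HiveQL statement from the beeline prompt to confirm the session works before creating any tables. This sanity check isn't part of the original steps:

```sql
-- List the databases visible to this session.
SHOW DATABASES;

-- List the tables in the default database (empty until you create one).
SHOW TABLES;
```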
Create a table in Hive. The example creates a table named weblogs:

```sql
CREATE TABLE weblogs (
    client_ip           STRING,
    full_request_date   STRING,
    day                 STRING,
    month               STRING,
    month_num           INT,
    year                STRING,
    hour                STRING,
    minute              STRING,
    second              STRING,
    timezone            STRING,
    http_verb           STRING,
    uri                 STRING,
    http_status_code    STRING,
    bytes_returned      STRING,
    referrer            STRING,
    user_agent          STRING)
row format delimited
fields terminated by '\t';
```
Load data into the table:

```shell
hadoop fs -cp /weblogs/parse/part-00000 /user/hive/warehouse/weblogs/
```
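At this point you can optionally verify the load from the beeline session before moving to Postgres. This check isn't part of the original walkthrough; it just confirms that Hive can read the copied file:

```sql
-- The count should match the number of rows in weblogs_parse.txt.
SELECT count(*) FROM weblogs;

-- Peek at a few rows to confirm the tab-delimited fields were parsed.
SELECT client_ip, uri, http_status_code FROM weblogs LIMIT 5;
```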
Access your data from Postgres. You can now use the weblogs table. Once you're connected using psql, follow these steps:

```sql
-- set the GUC variables appropriately, e.g. :
hdfs_fdw.jvmpath='/home/edb/Projects/hadoop_fdw/jdk1.8.0_111/jre/lib/amd64/server/'
hdfs_fdw.classpath='/usr/local/edbas/lib/postgresql/HiveJdbcClient-1.0.jar:/home/edb/Projects/hadoop_fdw/hadoop/share/hadoop/common/hadoop-common-2.6.4.jar:/home/edb/Projects/hadoop_fdw/apache-hive-1.0.1-bin/lib/hive-jdbc-1.0.1-standalone.jar'

-- load extension first time after install
CREATE EXTENSION hdfs_fdw;

-- create server object
CREATE SERVER hdfs_server
    FOREIGN DATA WRAPPER hdfs_fdw
    OPTIONS (host '127.0.0.1');

-- create user mapping
CREATE USER MAPPING FOR postgres
    SERVER hdfs_server
    OPTIONS (username 'hive_username', password 'hive_password');

-- create foreign table
CREATE FOREIGN TABLE weblogs
(
    client_ip         TEXT,
    full_request_date TEXT,
    day               TEXT,
    month             TEXT,
    month_num         INTEGER,
    year              TEXT,
    hour              TEXT,
    minute            TEXT,
    second            TEXT,
    timezone          TEXT,
    http_verb         TEXT,
    uri               TEXT,
    http_status_code  TEXT,
    bytes_returned    TEXT,
    referrer          TEXT,
    user_agent        TEXT
)
SERVER hdfs_server
OPTIONS (dbname 'default', table_name 'weblogs');

-- select from table
postgres=# SELECT DISTINCT client_ip IP, count(*)
           FROM weblogs GROUP BY IP HAVING count(*) > 5000 ORDER BY 1;
       ip        | count
-----------------+-------
 13.53.52.13     |  5494
 14.323.74.653   | 16194
 322.6.648.325   | 13242
 325.87.75.336   |  6500
 325.87.75.36    |  6498
 361.631.17.30   | 64979
 363.652.18.65   | 10561
 683.615.622.618 | 13505
(8 rows)

-- EXPLAIN output showing WHERE clause being pushed down to remote server.
EXPLAIN (VERBOSE, COSTS OFF)
    SELECT client_ip, full_request_date, uri
    FROM weblogs
    WHERE http_status_code = 200;
                                                   QUERY PLAN
----------------------------------------------------------------------------------------------------------------
 Foreign Scan on public.weblogs
   Output: client_ip, full_request_date, uri
   Remote SQL: SELECT client_ip, full_request_date, uri FROM default.weblogs WHERE ((http_status_code = '200'))
(3 rows)
```
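Because weblogs now behaves like any other foreign table, you can also combine it with local Postgres objects. The snippet below is an illustrative sketch rather than part of the original example; the weblogs_errors_local table name is made up:

```sql
-- Materialize a local snapshot of the non-200 traffic so repeated analysis
-- doesn't have to go back to Hive each time. The table name is hypothetical.
CREATE TABLE weblogs_errors_local AS
    SELECT client_ip, uri, http_status_code
    FROM weblogs
    WHERE http_status_code <> '200';

-- Query the local copy like any other Postgres table.
SELECT count(*) FROM weblogs_errors_local;
```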
Using HDFS FDW with Apache Spark on top of Hadoop
Apache Spark is a general-purpose distributed computing framework that supports a wide variety of use cases. It provides real-time stream processing as well as batch processing, combining speed, ease of use, and sophisticated analytics. Spark doesn't provide a storage layer; it relies on third-party storage providers such as Hadoop, HBase, Cassandra, and S3. Spark integrates seamlessly with Hadoop and can process data that's already stored there. Spark SQL is highly compatible with HiveQL, so you can use Spark Thrift Server in place of HiveServer2.
To use HDFS FDW with Apache Spark on top of Hadoop:
Download and install Apache Spark in local mode.
In the $SPARK_HOME/conf folder, create a file named spark-defaults.conf containing the following line:

```
spark.sql.warehouse.dir hdfs://localhost:9000/user/hive/warehouse
```
By default, Spark uses Derby for both the metadata and the data itself (called the warehouse in Spark). Adding this property makes Spark use Hadoop as its warehouse instead.

Start Spark Thrift Server:

```shell
./start-thriftserver.sh
```
Make sure Spark Thrift Server is running and writing to a log file.
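Once the server is up, you can also confirm from a Spark SQL session (for example, through beeline in the steps that follow) that the warehouse location was picked up. This optional check isn't part of the original steps:

```sql
-- Querying a property key with SET returns its current value in Spark SQL.
SET spark.sql.warehouse.dir;
```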
Create a local file (names.txt) that contains the following entries:

```shell
$ cat /tmp/names.txt
1,abcd
2,pqrs
3,wxyz
4,a_b_c
5,p_q_r
,
```
Connect to Spark Thrift Server using the Spark beeline client. For example:

```shell
$ beeline
Beeline version 1.2.1.spark2 by Apache Hive
beeline> !connect jdbc:hive2://localhost:10000/default;auth=noSasl org.apache.hive.jdbc.HiveDriver
```
Prepare the sample data on Spark. Run the following commands in the beeline command line tool:

```shell
./beeline
Beeline version 1.2.1.spark2 by Apache Hive
beeline> !connect jdbc:hive2://localhost:10000/default;auth=noSasl org.apache.hive.jdbc.HiveDriver
Connecting to jdbc:hive2://localhost:10000/default;auth=noSasl
Enter password for jdbc:hive2://localhost:10000/default;auth=noSasl:
Connected to: Spark SQL (version 2.1.1)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10000> create database my_test_db;
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (0.379 seconds)

0: jdbc:hive2://localhost:10000> use my_test_db;
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (0.03 seconds)

0: jdbc:hive2://localhost:10000> create table my_names_tab(a int, name string) row format delimited fields terminated by ',';
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (0.11 seconds)

0: jdbc:hive2://localhost:10000> load data local inpath '/tmp/names.txt' into table my_names_tab;
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (0.33 seconds)

0: jdbc:hive2://localhost:10000> select * from my_names_tab;
+-------+---------+--+
|   a   |  name   |
+-------+---------+--+
| 1     | abcd    |
| 2     | pqrs    |
| 3     | wxyz    |
| 4     | a_b_c   |
| 5     | p_q_r   |
| NULL  | NULL    |
+-------+---------+--+
```
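While you're still connected through beeline, you can optionally run the same kind of filtered query that the foreign data wrapper later pushes down from Postgres. This check isn't in the original walkthrough; it simply confirms the Spark-side data before you define the foreign table:

```sql
-- Run directly against Spark SQL; the FDW pushes down an equivalent predicate later.
SELECT name FROM my_test_db.my_names_tab WHERE a > 3;
```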
The following commands list the corresponding files in Hadoop:
```shell
$ hadoop fs -ls /user/hive/warehouse/
Found 1 items
drwxrwxrwx   - org.apache.hive.jdbc.HiveDriver supergroup          0 2020-06-12 17:03 /user/hive/warehouse/my_test_db.db

$ hadoop fs -ls /user/hive/warehouse/my_test_db.db/
Found 1 items
drwxrwxrwx   - org.apache.hive.jdbc.HiveDriver supergroup          0 2020-06-12 17:03 /user/hive/warehouse/my_test_db.db/my_names_tab
```
Access your data from Postgres using psql:

```sql
-- set the GUC variables appropriately, e.g. :
hdfs_fdw.jvmpath='/home/edb/Projects/hadoop_fdw/jdk1.8.0_111/jre/lib/amd64/server/'
hdfs_fdw.classpath='/usr/local/edbas/lib/postgresql/HiveJdbcClient-1.0.jar:/home/edb/Projects/hadoop_fdw/hadoop/share/hadoop/common/hadoop-common-2.6.4.jar:/home/edb/Projects/hadoop_fdw/apache-hive-1.0.1-bin/lib/hive-jdbc-1.0.1-standalone.jar'

-- load extension first time after install
CREATE EXTENSION hdfs_fdw;

-- create server object
CREATE SERVER hdfs_server
    FOREIGN DATA WRAPPER hdfs_fdw
    OPTIONS (host '127.0.0.1', port '10000', client_type 'spark', auth_type 'NOSASL');

-- create user mapping
CREATE USER MAPPING FOR postgres
    SERVER hdfs_server
    OPTIONS (username 'spark_username', password 'spark_password');

-- create foreign table
CREATE FOREIGN TABLE f_names_tab (a INT, name VARCHAR(255))
    SERVER hdfs_server
    OPTIONS (dbname 'my_test_db', table_name 'my_names_tab');

-- select the data from foreign server
select * from f_names_tab;
 a | name
---+--------
 1 | abcd
 2 | pqrs
 3 | wxyz
 4 | a_b_c
 5 | p_q_r
 0 |
(6 rows)

-- EXPLAIN output showing WHERE clause being pushed down to remote server.
EXPLAIN (verbose, costs off) SELECT name FROM f_names_tab WHERE a > 3;
                                QUERY PLAN
--------------------------------------------------------------------------
 Foreign Scan on public.f_names_tab
   Output: name
   Remote SQL: SELECT name FROM my_test_db.my_names_tab WHERE ((a > '3'))
(3 rows)
```
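As with the Hive example, the foreign table can be combined with ordinary local tables in a single query. The sketch below is illustrative only; the local_teams table and its contents are hypothetical and not part of the original example:

```sql
-- Hypothetical local table joined against the Spark-backed foreign table.
CREATE TABLE local_teams (id INT, team TEXT);
INSERT INTO local_teams VALUES (1, 'red'), (2, 'blue'), (3, 'green');

SELECT f.a, f.name, l.team
FROM f_names_tab f
JOIN local_teams l ON l.id = f.a
ORDER BY f.a;
```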
Note
This example uses the same port while creating the foreign server because Spark Thrift Server is compatible with Hive Thrift Server. Applications using HiveServer2 work with Spark, except for the behavior of the ANALYZE command and the connection string in the case of NOSASL. We recommend using ALTER SERVER and changing the client_type option if you replace Hive with Spark.
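For example, if the Hive-oriented server object from the first walkthrough is still in place, switching it to Spark could look roughly like the following. This is a sketch based on the note above; it assumes client_type and auth_type weren't specified when the server was created (use SET instead of ADD for an option that already exists):

```sql
-- Point the existing server at Spark Thrift Server instead of HiveServer2.
ALTER SERVER hdfs_server OPTIONS (ADD client_type 'spark');

-- If Spark Thrift Server was started with NOSASL, adjust the authentication type as well.
ALTER SERVER hdfs_server OPTIONS (ADD auth_type 'NOSASL');
```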