DataRobot Machine Learning with AWS Athena and Parquet Data


Multiple 'big data' formats are becoming popular for offering different approaches to compressing large amounts of data for storage and analytics; some of these formats include ORC, Parquet, and Avro. Using and querying these datasets can present some challenges. This article demonstrates one method by which DataRobot can ingest data in Apache Parquet format sitting at rest in AWS S3. Similar techniques can be applied in other cloud environments.

What is Parquet, and why is it used?

Parquet is an open source columnar data storage format. It is often misunderstood to be used primarily, or even solely, for compression; but disk space is cheap and near-infinite object storage services such as S3 exist, so reduced storage space on its own is not an obvious, immediate benefit. Note also that compressed data carries a CPU cost to both compress and decompress, so there is no speed advantage when reading all of the data either. Snowflake has an article on this showing several approaches to loading 10TB of benchmark data.

[Screenshot: Snowflake benchmark results for loading 10TB of data in several formats]

The Snowflake benchmark clearly shows that load performance for full data records is far higher for the simple CSV format. So what is the advantage of Parquet?

A columnar format offers little to no advantage when one is interested in a full record. The more columns requested, the more work must be done to read and decompress them; this is why the truly full-data exercise (shown above) shows such high performance for basic CSV files. Selecting a subset of the data, however, is where columnar really shines. If there are 50 columns of data in a loan dataset and the only one of interest is loan_id, reading a CSV file requires reading 100% of the data file, while reading a Parquet file requires reading only 1 of the 50 columns. Assuming for simplicity's sake that all of the columns take up exactly the same space (unlikely in practice), this translates into needing to read only 2% of the data.
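
As a small local illustration of this column pruning (the file name here is hypothetical), a columnar reader such as pandas with the pyarrow engine can pull just that one column without decompressing the rest:

import pandas as pd

# reads only the loan_id column; the other 49 columns are never read or decompressed
loan_ids = pd.read_parquet('loans.parquet', columns=['loan_id'])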

Further read reductions can be made by partitioning the data. This involves creating a path structure based on the values of a field. The SQL engine's WHERE clause is applied to the folder path structure to decide whether the Parquet files inside a folder need to be read at all. For example, daily files could be partitioned and stored in a YYYY/MM/DD structure for a loans data source:

loans/2020/1/1/jan1file.parquet
loans/2020/1/2/jan2file.parquet
loans/2020/1/3/jan3file.parquet

The "hive style" of this would include the field name in the directory (partition):

loans/year=2020/month=1/day=1/jan1file.parquet
loans/year=2020/month=1/day=2/jan2file.parquet
loans/year=2020/month=1/day=3/jan3file.parquet

If the original program is interested in just loan_id, and specifically the loan_id values from January 2, 2020, then the 2% read is reduced further still. With the data evenly distributed across the three days, the read-and-decompress operation drops to just 0.67% of the data. The result is a faster read, a faster return of the data, and a lower bill for the resources required to retrieve it.
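
For example, a query restricted to a single day (a hypothetical query against a table defined over the hive-style layout above) lets the engine skip every folder except year=2020/month=1/day=2 entirely:

# only the Parquet file(s) under loans/year=2020/month=1/day=2/ are opened,
# and only the loan_id column is read from them
query = "select loan_id from loans where year = 2020 and month = 1 and day = 2"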

Data for Project Creation Exercise (AWS S3)

The data used for the remainder of this article can be found in GitHub and was made with this notebook. 20,000 records of (some repeated) loan data from LendingClub were used. The data was then uploaded to S3 using the AWS Command Line Interface (CLI).

$ aws --profile support s3 ls s3://engineering/athena --recursive | awk '{print $4}'
athena/
athena/loan_credit/20k_loans_credit_data.csv
athena/loan_history/year=2015/1/1020de8e664e4584836c3ec603c06786.parquet
athena/loan_history/year=2015/1/448262ad616e4c28b2fbd376284ae203.parquet
athena/loan_history/year=2015/2/5e956232d0d241558028fc893a90627b.parquet
athena/loan_history/year=2015/2/bd7153e175d7432eb5521608aca4fbbc.parquet
athena/loan_history/year=2016/1/d0220d318f8d4cfd9333526a8a1b6054.parquet
athena/loan_history/year=2016/1/de8da11ba02a4092a556ad93938c579b.parquet
athena/loan_history/year=2016/2/b961272f61544701b9780b2da84015d9.parquet
athena/loan_history/year=2016/2/ef93ffa9790c42978fa016ace8e4084d.parquet

20k_loans_credit_data.csv contains credit score and history information about all loans. Loans are partitioned by year and month, in partial hive format, to demonstrate the steps for working with either layout within AWS Athena. Multiple Parquet files sit within the YYYY/MM structure, potentially representing different days a loan was created. All .parquet files contain loan application and repayment data. This data is in a bucket in the AWS region US-East-1 (N. Virginia).

Introducing AWS Athena

What is Athena? Athena is a managed service on AWS that provides serverless (that is, on-demand) access to run ANSI SQL against S3 objects. It uses Apache Presto and, as of this writing, is able to read the following file formats: CSV, TSV, JSON, ORC, Apache Parquet, and Avro; it also supports data compressed in the Snappy, Zlib, LZO, and GZIP formats. Athena charges on a pay-per-query model based on the amount of data read.

AWS also provides an article on using Athena against both regular text files and Parquet; the amount of data read, time taken, and cost spent for a query against the large dataset used in their example is quite telling with regard to the advantages of Parquet.

[Screenshot: AWS comparison of data scanned, query time, and cost for text files vs. Parquet in Athena]

Introducing AWS Glue

What is Glue? Glue, an Extract Transform Load (ETL) tool, will be used in a supporting fashion for this process. ETL jobs will not be constructed and scheduled; rather, Glue will be used to discover the files and structure in the hosted S3 bucket and apply a high-level schema to the contents so that Athena understands how to read and query them. This schema is stored in a Hive-like metastore. (Note that Hive DDL could be written explicitly, but this article assumes a large number of potentially different files and leverages Glue's crawler to discover schema and define tables.)

In Glue, a crawler will be created and pointed at the S3 bucket containing the files above. The crawler will be set to write its output to an AWS Glue Data Catalog, which will be leveraged by Athena. The crawler should be created in the same region as the AWS S3 bucket; for this example that is US-East-1.
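
The walkthrough below uses the AWS console; the same crawler could also be created programmatically with boto3 (a minimal sketch, and the crawler name and IAM role shown here are hypothetical):

import boto3

session = boto3.Session(profile_name='support', region_name='us-east-1')
glue = session.client('glue')

# point a crawler at the demo bucket and write what it finds to the Glue Data Catalog
glue.create_crawler(
    Name='community-athena-demo-crawler',   # hypothetical crawler name
    Role='GlueCrawlerRole',                 # hypothetical IAM role with Glue and S3 access
    DatabaseName='community_athena_demo_db',
    Targets={'S3Targets': [{'Path': 's3://engineering/athena/'}]},
)
glue.start_crawler(Name='community-athena-demo-crawler')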

You can click Add crawler in the AWS Glue service in the AWS console to add a crawler job.

[Screenshot: Add crawler in the AWS Glue console]

Name the crawler.

[Screenshot: Crawler name]

Choose Data stores for type, then specify the bucket of interest.

[Screenshot: Crawler source type (Data stores)]

[Screenshot: Specifying the S3 bucket as the data store]

Choose or create an IAM role for the crawler to run as. Managing IAM is considered out of scope for the purposes of this article.  Note the AWS guidance for privileges.

[Screenshot: IAM role selection]

Set the frequency to run on demand, or update as necessary to meet requirements.

[Screenshot: Crawler schedule set to run on demand]

The metadata discovered by the crawler must be written somewhere. Choose an existing database for the catalog or create a new one.

[Screenshot: Configuring the crawler's output database]

Crawler creation can now be completed. A prompt will ask if the on-demand crawler should be run now; choose Yes.

In this example, you can see the crawler has discovered and created two tables for the paths: loan_credit and loan_history.

[Screenshot: Tables created by the crawler: loan_credit and loan_history]

The log shows the creation of the tables as well as the partitions for loan_history.

[Screenshot: Crawler log showing table and partition creation]

The year partition was left in Hive format while the month was not, simply to show what happens when this methodology has not been used. Glue will assign the month partition a generic name that can be updated. Navigate to tables and open loan_history.

[Screenshot: loan_history table details in the Glue Data Catalog]

Choose to edit the schema, click the column name to rename the secondary partition to month, and save.

[Screenshot: Renaming the partition column to month]

This table is now available for querying in Athena.  

Creating a Project in DataRobot

What follows are four examples of starting a project from data queried through Athena in different ways. All programmatic methods use the DataRobot Python SDK and some helper functions defined in these DataRobot Community GitHub examples.

  1. JDBC Driver
  2. Local Retrieval of SQL Results
  3. Local Retrieval of S3 CSV Results File
  4. AWS S3 Bucket (Signed URL)

1. JDBC Driver

JDBC drivers can be installed and used on DataRobot to ingest data. (Contact Support for installation assistance with the driver, which is out of scope for this article.) As of DataRobot 6.0 for the Managed AI Cloud offering, version 2.0 of the JDBC driver is available; specifically, 2.0.5 is installed and available on the cloud. A catalog item dataset can be constructed by navigating to AI Catalog > Add New Data and then selecting Data Connection > Add a new data connection.

For the purposes of this article, the Athena JDBC driver connection was set up to explicitly specify the address; the AwsRegion and (required) S3OutputLocation properties were also specified. Query results will be written to this location as a CSV file.

[Screenshot: Athena JDBC data connection configuration in DataRobot]

Authentication takes place with an AWSAccessKey and AWSSecretAccessKey as the user and password on the next, and last, step. As AWS users often have access to many services, including the ability to spin up many resources, a typical best practice is to create an IAM account within AWS with specific permissions for querying, limited to Athena and S3 only.
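
Depending on the driver release, the resulting connection address takes roughly the following form (a hedged example only; the exact host and property names vary by driver version, and the output location shown is this article's bucket):

jdbc:awsathena://athena.us-east-1.amazonaws.com:443;S3OutputLocation=s3://engineering/athena/output/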

Once the Data Connection is created, it can be selected in the Add New Data from Data Connection window and used to create a catalog item and project. This workflow can be seen in the JDBC Pull example from this community article on Snowflake Project Creation. The sections that follow show several programmatic approaches to project creation.

2. Local Retrieval of SQL Results

 

# the boto3 session, DataRobot client (dr), DR_APP_ENDPOINT, and the
# fetchall_athena_sql helper are set up in the community GitHub examples noted above
athena_client = session.client('athena')
database = 'community_athena_demo_db'
s3_out = 's3://engineering/athena/output/'
query = "select * from loan_history limit 100"

query_results = fetchall_athena_sql(query, athena_client, database, s3_out)

# convert to dataframe to view and manipulate
df = pd.DataFrame(query_results) 
df.head(2)

proj = dr.Project.create(sourcedata=df,
    project_name='athena load query')

# further work with project via the python API, or work in GUI (link to project printed below)
print(DR_APP_ENDPOINT[:-7] + 'projects/{}'.format(proj.id))

 

The snippet above sends a query to retrieve the first 100 records of loan history. The results are provided back in a dictionary after paginating through the result set from Athena and loading it into local memory. The results can then be loaded into a dataframe, manipulated to engineer new features, and pushed into DataRobot to create a new project. The s3_out variable is a required parameter for Athena; it is the location where Athena writes its CSV query results. That results file will be used in subsequent examples.

This method is only recommended for smaller datasets; it may be both easier and faster to simply download the data as a file rather than spool it back through paginated query results. A pandas dataframe is used here for convenience and ease of potential data manipulation and feature engineering; it is not required for working with the data or creating a DataRobot project. The machine this code runs on will also need adequate memory to hold a pandas dataframe of the dataset being used.
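
The fetchall_athena_sql helper itself lives in the DataRobot Community GitHub repository; a minimal sketch of the logic it implements (illustrative only, not the actual helper) looks roughly like this:

import time

def fetchall_athena_sql(query, athena_client, database, s3_output):
    # start the query; Athena also writes a CSV copy of the results to s3_output
    qid = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': s3_output},
    )['QueryExecutionId']

    # poll until the query reaches a terminal state
    while True:
        state = athena_client.get_query_execution(
            QueryExecutionId=qid)['QueryExecution']['Status']['State']
        if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
            break
        time.sleep(2)
    if state != 'SUCCEEDED':
        raise RuntimeError('Athena query ended in state ' + state)

    # page through the result set; the first row holds the column names
    results, columns, token = {}, None, None
    while True:
        kwargs = {'QueryExecutionId': qid}
        if token:
            kwargs['NextToken'] = token
        page = athena_client.get_query_results(**kwargs)
        for row in page['ResultSet']['Rows']:
            values = [col.get('VarCharValue') for col in row['Data']]
            if columns is None:
                columns = values
                results = {c: [] for c in columns}
            else:
                for c, v in zip(columns, values):
                    results[c].append(v)
        token = page.get('NextToken')
        if not token:
            break
    return results  # dictionary of column name -> list of values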

3. Local Retrieval of S3 CSV Results File 

 

# the boto3 session, DataRobot client (dr), DR_APP_ENDPOINT, and the
# fetch_athena_file helper are set up in the community GitHub examples noted above
athena_client = session.client('athena')
s3_client = session.client('s3')
database = 'community_athena_demo_db'
s3_out_bucket = 'engineering'
s3_out_path = 'athena/output/'
s3_out = 's3://' + s3_out_bucket + '/' + s3_out_path

# download destination: the current working directory (IPython shell magic)
local_path = !pwd
local_path = local_path[0]

query = "select lh.loan_id, " \
    "lh.loan_amnt, lh.term, lh.int_rate, lh.installment, lh.grade, lh.sub_grade, " \
    "lh.emp_title, lh.emp_length, lh.home_ownership, lh.annual_inc, lh.verification_status,  " \
    "lh.pymnt_plan, lh.purpose, lh.title, lh.zip_code, lh.addr_state, lh.dti,  " \
    "lh.installment / (lh.annual_inc / 12) as mnthly_paymt_to_income_ratio, " \
    "lh.is_bad, " \
    "lc.delinq_2yrs, lc.earliest_cr_line, lc.inq_last_6mths, lc.mths_since_last_delinq, lc.mths_since_last_record, " \
    "lc.open_acc, lc.pub_rec, lc.revol_bal, lc.revol_util, lc.total_acc, lc.mths_since_last_major_derog " \
    "from community_athena_demo_db.loan_credit lc " \
    "join community_athena_demo_db.loan_history lh on lc.loan_id = lh.loan_id"

s3_file = fetch_athena_file(query, athena_client, database, s3_out, s3_client, local_path)

# get results file from S3
s3_client.download_file(s3_out_bucket, s3_out_path + s3_file, local_path + '/' + s3_file)

proj = dr.Project.create(local_path + '/' + s3_file,
    project_name='athena load file')

# further work with project via the python API, or work in GUI (link to project printed below)
print(DR_APP_ENDPOINT[:-7] + 'projects/{}'.format(proj.id))

 

This snippet shows a more complicated query, where all loans are pulled and the CSV credit history data is joined with the Parquet loan history data. Upon completion, the S3 results file itself is downloaded to the local Python environment. From here additional processing could be done, or the file could be pushed directly to DataRobot for a new project, as shown.
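
As with the previous example, fetch_athena_file is defined in the DataRobot Community GitHub repository. The key detail it relies on is that Athena names its CSV results file after the query execution ID; a rough sketch (illustrative only, not the actual helper) is:

import time

def fetch_athena_file(query, athena_client, database, s3_output, s3_client, local_path):
    # s3_client and local_path mirror the real helper's signature; this sketch
    # leaves the actual download to the caller, as in the snippet above
    qid = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': s3_output},
    )['QueryExecutionId']

    # wait for the query to finish
    while True:
        state = athena_client.get_query_execution(
            QueryExecutionId=qid)['QueryExecution']['Status']['State']
        if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
            break
        time.sleep(2)
    if state != 'SUCCEEDED':
        raise RuntimeError('Athena query ended in state ' + state)

    # Athena writes the results to s3_output as <QueryExecutionId>.csv
    return qid + '.csv'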

4. AWS S3 Bucket (Signed URL)

Another method for creating a project in DataRobot is ingesting data from S3 via URL. There are several ways this can be done based on the data, environment, and configuration. This example leverages a private dataset on the cloud and creates a signed URL for use in DataRobot.

The options, by dataset visibility, DataRobot environment, and approach:

  Public dataset, local install or cloud (Public): If a dataset is in a public bucket, the direct HTTP link to the file object can be ingested.
  Private dataset, local install (Global IAM Role): DataRobot can be installed with an IAM role granted to the DataRobot service account with its own access privileges to S3. Any URL the DataRobot service account can see can be used to ingest data.
  Private dataset, local install (IAM Impersonation): Finer-grained security control can be implemented by having DataRobot assume the role and S3 privileges of a user. This requires LDAP authentication and LDAP fields containing the S3 information to be made available.
  Private dataset, local install or cloud (Signed S3 URL): AWS users can create a signed URL to an S3 object, providing a temporary link that expires after a specified amount of time.

 

# s3_client, s3_out_bucket, s3_out_path, and s3_file were defined in the previous example
response = s3_client.generate_presigned_url('get_object',
    Params={'Bucket': s3_out_bucket,
            'Key': s3_out_path + s3_file},
    ExpiresIn=3600)

proj = dr.Project.create(response,
    project_name='athena signed url')

# further work with project via the python API, or work in GUI (link to project printed below)
print(DR_APP_ENDPOINT[:-7] + 'projects/{}'.format(proj.id))

 

This snippet builds on the work presented in the prior example ("Local Retrieval of S3 CSV Results File"). Rather than download the file to the local environment, AWS credentials are used to sign the URL for temporary use. The response variable contains a link to the results file, with an authentication string good for 3600 seconds. Anyone with the complete URL string will be able to access the file for the requested duration. In this way, rather than downloading the results locally, a DataRobot project can be initiated by referencing the URL value.

Helper functions and the full code are available in the DataRobot Community GitHub.

Conclusion

Congratulations!  The data is now in DataRobot.  AWS Athena (and Apache Presto) has enabled SQL against varied data sources to produce results that can be used for DataRobot ingestion.  Similar approaches can be used to work with this type of input data in Azure and Google cloud services as well.

Time to update the resume.

