Calling ML Models via Snowflake External Functions & Streams

In this article, a Snowflake scoring pipeline will be constructed, taking advantage of Snowflake’s ability to call external API functions. The pipeline will leverage Snowflake Streams and Tasks to create a streaming micro-batch ingestion flow which incorporates a DataRobot hosted machine learning model.

Snowflake recently announced the ability to call out to external APIs from User-Defined Functions (UDFs), as noted in this announcement. Note that there are a fair number of requirements and considerations when exploring this approach. Any API must be fronted by the trusted cloud-native API service; in the case of AWS, that is the AWS API Gateway. There are also considerations around scaling, concurrency, and reliability. Limits vary by cloud provider, but note that the maximum payload size for API Gateway is 10MB, and for Lambda it is 6MB for synchronous requests. There is much to consider here: how does the total infrastructure react when scoring 10 rows vs. 10,000 rows or 10 million rows? What kind of load is sent when a small 2-node cluster is vertically scaled to a large 8-node cluster? Or when it is scaled horizontally to 2 or 3 instances? What happens if a request times out or a resource is unavailable?

See my other articles (such as DataRobot & Snowflake: Client-Request Model Scoring, DataRobot & Snowflake: Server-Side Model Scoring, or Scoring Snowflake Data via DataRobot Models on AWS EMR Spark) for multiple ways in which typical database jobs can be executed on Snowflake with greater simplicity and efficiency when scoring large batches of data. Generally speaking, these are best done as part of ETL or ELT pipelines. A good potential use of a UDF leveraging external functions is a low-volume streaming ingest process that uses Snowflake's internal streaming capabilities. This article will demonstrate such a pipeline using Snowpipe, Streams, and Tasks within Snowflake.

An example will be constructed below to score records through a DataRobot-hosted model. The Titanic dataset from Kaggle will be used. Data will be ingested via a streaming pipeline with objects in a STG schema, scored against the model, and then loaded to the PUBLIC schema presentation layer.

Technologies Used

Snowflake—Storage Integration, Stage, Snowpipe, Streams, Tasks, Tables and External Function UDF objects will be used to assemble a streaming scoring pipeline for data as it is ingested.

AWS—A Python Lambda will be used as a serverless compute service to act as the intermediary between Snowflake and DataRobot (currently a requirement for using an External Function). API Gateway will provide an endpoint to front the Lambda function. IAM policies will also be leveraged to grant roles and privileges to the necessary components. Incoming data will be placed into an S3 object store bucket, and an SQS queue will be leveraged for Snowpipe as well.

DataRobot—The model has been built and deployed on the AutoML platform and is available for scoring requests via the DataRobot Prediction API.  The model in this case is being served on horizontally scalable DataRobot cluster member hardware dedicated solely to serving these requests.

Snowflake External API UDF Architecture

[Figure: Snowflake External Function architecture]

Although a native UDF in Snowflake is written in SQL or JavaScript, an External Function is executed remotely and can be coded in any language the remote infrastructure supports. It is coupled with an API integration in Snowflake to expose it as an external UDF. The integration sends the payload to be operated on to an API proxy service; in this instance, that is an AWS API Gateway. The gateway then satisfies the request through a remote service behind it, which could be a microservice backed by a container or a piece of Lambda code.
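To make the request/response contract concrete, the sketch below (plain Python with illustrative values, mirroring the test event used later in this article) shows the shape of the payload Snowflake POSTs to the proxy service and the response shape the remote service must return. Each inner list is prefixed with a 0-based row number so results can be matched back to input rows.

import json

# Payload Snowflake POSTs to the proxy service (illustrative values):
# one inner list per input row, prefixed with its 0-based row number.
request_body = json.dumps({
    "data": [
        [0, "test one", "male", 3, 7.8292, None, 0, "Q", 0, 34.5],
        [1, "test two", "female", 3, 7.0, None, 1, "S", 0, 47.0]
    ]
})

# Response shape the remote service must return: the same row numbers,
# each followed by the value produced for that row (scores are placeholders).
response_body = json.dumps({
    "data": [
        [0, 0.123],
        [1, 0.456]
    ]
})
print(request_body)
print(response_body)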

Create the Remote Service (AWS Lambda)

Oleksandr has some great articles on bringing a model out of DataRobot and into AWS to be hosted via Lambda, with both DataRobot Prime and Scoring Code.  (DataRobot also supports exporting a model outside of DataRobot as a Docker container, as explained in this article.) These are all great options to host the model inside AWS and to take advantage of AWS scalability features.

This article will provide an example of simply treating the gateway as a proxy for a complete pass through and sending the scoring request to a DataRobot-hosted prediction engine.  Note that in this approach scalability would also include horizontally scaling prediction engines on the DataRobot cluster.

See the above-mentioned articles for additional Lambda-creation workflows to gain familiarity with the environment and process.  Create a Lambda named proxy_titanic with a Python 3.7 runtime environment.  Leverage an existing IAM role or create a new one with default execution permissions.

[Screenshot: creating the proxy_titanic Lambda function]

Some sensitive information will be required to connect to the DataRobot cluster: the load-balancing hostname in front of the DataRobot Prediction Engine (DPE) cluster, the API token for the user to be used, the deployment to score against, and the (cloud-only) DataRobot key. These values can be stored in the Lambda's Environment variables section. (For security purposes, the sensitive information included in this article is fake.)

[Screenshot: Lambda environment variables]
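As an alternative to the console, these variables can also be set programmatically. The following is a minimal boto3 sketch; the variable names match those read by the Lambda code below, and all values shown are placeholders to substitute with your own.

import boto3

# Minimal sketch: set the Lambda environment variables outside the console.
# All values are placeholders -- substitute your own host, user, token, deployment ID, and key.
lambda_client = boto3.client("lambda")
lambda_client.update_function_configuration(
    FunctionName="proxy_titanic",
    Environment={
        "Variables": {
            "dr_dpe_host": "https://dpe.example.com",
            "dr_user": "user@example.com",
            "dr_token": "FAKE_TOKEN",
            "dr_deployment": "FAKE_DEPLOYMENT_ID",
            "dr_key": "FAKE_DATAROBOT_KEY"
        }
    }
)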

Lambda layers make it possible to build Lambda code on top of libraries and keep those libraries separate from the code delivery package. Using layers is not required, but it simplifies bringing in necessary packages and maintaining code. This demo requires the requests and pandas libraries, which are not part of the base Amazon Linux image and must be added via a layer. This can be done by creating a virtual environment. In this example, the environment used is an Amazon Linux EC2 box. Instructions to install Python 3 on Amazon Linux are here.

Creating a ZIP file for a layer can then be done as follows:

 

python3 -m venv ~/my_app/env
source ~/my_app/env/bin/activate
pip install requests
pip install pandas
deactivate

 

Per the Amazon documentation, the libraries must be placed in a python (or python/lib/python3.7/site-packages) directory within the ZIP; layer contents are expanded under /opt at runtime.

 

cd ~/my_app/env
mkdir -p python/lib/python3.7/site-packages
cp -r lib/python3.7/site-packages/* python/lib/python3.7/site-packages/.
zip -r9 ~/layer.zip python

 

Copy the layer.zip file to a location on S3; this is required if the Lambda layer is > 10MB.

 

aws s3 cp layer.zip s3://datarobot-bucket/layers/layer.zip

 

Navigate to the Lambda service > Layers > Create Layer.  Provide a name and the link to the file in S3; note that this will be the Object URL of the uploaded ZIP. It is recommended, though not necessary, to set compatible runtimes; this makes the layer more easily accessible in a dropdown menu when adding it to a Lambda. Save the layer and note its ARN.

[Screenshot: creating the Lambda layer]
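This step can also be scripted. Below is a hedged boto3 sketch that publishes the layer from the ZIP copied to S3 above (bucket and key follow the earlier copy command; the layer name is arbitrary) and prints the ARN used in the next step.

import boto3

# Publish the layer from the ZIP previously copied to S3 and print its ARN.
# Bucket/key follow the earlier "aws s3 cp" command; adjust to your own locations.
lambda_client = boto3.client("lambda")
response = lambda_client.publish_layer_version(
    LayerName="titanic_requests_pandas",
    Content={"S3Bucket": "datarobot-bucket", "S3Key": "layers/layer.zip"},
    CompatibleRuntimes=["python3.7"]
)
print(response["LayerVersionArn"])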

Navigate back to the Lambda and click Layers under the Lambda title; add a layer and provide the ARN from the previous step.

Navigate back to the Lambda code.  The following Python code will accept a payload from Snowflake, pass it to DataRobot's Prediction API for scoring, and return a Snowflake-compatible response.

 

import os
import json
import requests
import pandas as pd
import csv

def lambda_handler(event, context):

    # set default status to OK, no DR API error
    status_code = 200
    dr_error = ""

    # The return value will contain an array of arrays (one inner array per input row).
    array_of_rows_to_return = [ ]

    try:
        # obtain secure environment variables to reach out to DataRobot API
        DR_DPE_HOST = os.environ['dr_dpe_host']
        DR_USER = os.environ['dr_user']
        DR_TOKEN = os.environ['dr_token']
        DR_DEPLOYMENT = os.environ['dr_deployment']
        DR_KEY = os.environ['dr_key']
        
        # retrieve body containing input rows
        event_body = event["body"]

        # retrieve payload from body
        payload = json.loads(event_body)

        # retrieve row data from payload
        payload_data = payload["data"]

        # map list of lists to expected inputs
        cols = ['row', 'NAME', 'SEX', 'PCLASS', 'FARE', 'CABIN', 'SIBSP', 'EMBARKED', 'PARCH', 'AGE']
        df = pd.DataFrame(payload_data, columns=cols)
        
        print("record count is: " + str(len(df.index)))
        
        # assemble and send scoring request
        headers = {'Content-Type': 'text/csv; charset=UTF-8', 'Accept': 'text/csv', 'datarobot-key': DR_KEY}
        response = requests.post(DR_DPE_HOST + '/predApi/v1.0/deployments/%s/predictions' % (DR_DEPLOYMENT),
            auth=(DR_USER, DR_TOKEN), data=df.to_csv(), headers=headers)
        
        # bail if anything other than a successful response occurred
        if response.status_code != 200:
            dr_error = str(response.status_code) + " - " + str(response.content)
            print("dr_error: " + dr_error)
            raise RuntimeError("DataRobot API error: " + dr_error)
        
        array_of_rows_to_return = []
        
        row = 0
        wrapper = csv.reader(response.text.strip().split('\n'))
        header = next(wrapper)
        idx = header.index('SURVIVED_1_PREDICTION')
        for record in wrapper:
            array_of_rows_to_return.append([row, record[idx]])
            row += 1
        
        # send data back in required snowflake format
        json_compatible_string_to_return = json.dumps({"data" : array_of_rows_to_return})

    except Exception as err:
        # 400 implies some type of error.
        status_code = 400
        # Tell caller what this function could not handle.
        json_compatible_string_to_return = 'failed'
        
        # if the API call failed, update the error message with what happened
        if len(dr_error) > 0:
            print("error")
            json_compatible_string_to_return = 'failed; DataRobot API call request error: ' + dr_error

    # Return the return value and HTTP status code.
    return {
        'statusCode': status_code,
        'body': json_compatible_string_to_return
    }

 

Lambda code for this article is also hosted in GitHub. A test event can be configured to make sure the Lambda acts as expected.  The payload Snowflake sends for this model can be represented with a couple of records in the following format.

{
"body": "{\"data\": [[0, \"test one\", \"male\", 3, 7.8292, null, 0, \"Q\", 0, 34.5 ], [1, \"test two\", \"female\", 3, 7, null, 1, \"S\", 0, 47 ] ] }"
}

Once this event has been created, select it from the test drop down and hit the Test button. This should produce the expected results: a 200-level success response with a JSON-encapsulated list of lists, containing the 0-based row number and the returned model value. In this case, that model value will be a score towards the positive class of label 1, e.g., Titanic passenger survivability from a binary classifier model.

[Screenshot: Lambda test execution results]
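For quick iteration outside the console, the handler can also be exercised locally with the same event. This is a minimal sketch, assuming the code above is saved as lambda_function.py, the requests and pandas libraries are installed, and the DataRobot environment variables are set in the shell.

import json
from lambda_function import lambda_handler  # assumes the handler above is saved as lambda_function.py

# Reproduce the console test event locally; the DataRobot environment variables must be set first.
test_event = {
    "body": json.dumps({"data": [
        [0, "test one", "male", 3, 7.8292, None, 0, "Q", 0, 34.5]
    ]})
}

result = lambda_handler(test_event, None)
print(result["statusCode"])  # expect 200
print(result["body"])        # expect {"data": [[0, <score>]]}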

Additional Lambda configuration can be set under Basic Settings. Lambda serverless costs are based on RAM used * seconds of duration. The more RAM allowed, the more virtual CPU is allocated; this allows larger input loads to be handled and processing inside the Lambda to occur more quickly. Note that this Lambda defers the heavier work to DataRobot; it just needs to accommodate the data movement. If a Lambda is exiting prematurely due to exceeding resources, these values may need to be raised. The timeout default is 3 seconds; if the response from DataRobot for the micro-batch of records the Lambda is responsible for takes longer than this, the invocation is terminated before it can return. The following values proved safe and conservative in our testing: 256 MB and a 10-second timeout. Actual usage for each executed Lambda can be found in the associated CloudWatch logs, available under the Monitoring tab of the Lambda.

Configure the Proxy Service (AWS API Gateway)

Create IAM Role

A role must be created for a Snowflake-owned IAM user to be granted permission. This user will then be able to assume the role within the AWS account. In the console, navigate to IAM -> Roles -> Create role. When asked to Select type of trusted entity, choose Another AWS account and fill in the Account ID box with the AWS Account ID for the currently logged in account. This can be found in the ARN of other roles, from the My Account menu or from various other places. A Snowflake external ID for the account will be applied here later.

Proceed through the next screens and save this role as snowflake_external_function_role. Note the role's Amazon Resource Name (ARN).

Create API Gateway Entry

Navigate to the API Gateway service console and choose Create API. Choose to build a REST API and select the REST protocol. Select to Create a New API. Create a friendly readable name as appropriate and click Create API. On the next screen, choose Actions -> Create Resource. Set the resource name and path to score.

[Screenshot: creating the score resource in API Gateway]

Next choose Actions -> Create Method. In the dropdown menu under the endpoint, choose POST. Select the checkbox to Use Lambda Proxy Integration, select the previously created Lambda, and save.

[Screenshot: POST method with Lambda proxy integration]

Lastly, choose Actions -> Deploy API. A stage must be created, such as test. Complete the form and click Deploy. Note the Invoke URL on the subsequent editor page; it will be used later when creating the API integration in Snowflake.

Secure the API Gateway Endpoint

Navigate back to the Resources of the created API; this is in the left menu above Stages. Click on the POST under the endpoint to bring up the Method Execution. Click the Method Request, toggle the Authorization dropdown to AWS_IAM, and then click the checkmark to save. Navigate back to the Method Execution and note the ARN within the Method Request.

Navigate to Resource Policy in the left menu. Add a policy populated with the AWS account number and the name of the previously created IAM role, as per the Snowflake documentation.

[Screenshot: API Gateway resource policy]
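For reference, the sketch below renders the rough shape of that resource policy (written as a Python dict for consistency with the other examples). The account ID, role name, and method ARN are placeholders, and the exact template, including the assumed-role principal format, should be taken from the Snowflake external functions documentation.

import json

# Sketch of the API Gateway resource policy shape (placeholders throughout);
# follow the Snowflake external functions documentation for the authoritative template.
resource_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            # Assumed-role principal for the Snowflake-owned user, per the Snowflake docs template.
            "Principal": {"AWS": "arn:aws:sts::123456789012:assumed-role/snowflake_external_function_role/snowflake"},
            "Action": "execute-api:Invoke",
            # Use the Method Request ARN noted earlier.
            "Resource": "<method_request_ARN>"
        }
    ]
}
print(json.dumps(resource_policy, indent=4))  # paste the output into the Resource Policy editor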

Create an API Integration Object in Snowflake

A privilege level of accountadmin is required to create an API Integration. This object will map Snowflake to the AWS Account role. Provide the Role ARN and set the allowed prefixes to include the Invoke URL from the stage referenced above.

 

use role accountadmin;

create or replace api integration titanic_external_api_integration
    api_provider=aws_api_gateway
    api_aws_role_arn='arn:aws:iam::123456789012:role/snowflake_external_function_role'
    api_allowed_prefixes=('https://76abcdefg.execute-api.us-east-1.amazonaws.com/test/')
    enabled=true;

 

Describe the integration:

 

describe integration titanic_external_api_integration;

 

Copy out the values for API_AWS_IAM_USER_ARN and API_AWS_EXTERNAL_ID.

Configure the Trust Relationship between Snowflake and the IAM Role

Navigate back to the AWS IAM service > Roles, and to the snowflake_external_function_role role. 

At the bottom of the Summary page, choose the Trust relationships tab and click the Edit trust relationship button. This will open a policy document to edit. As per the Snowflake documentation, edit the Principal attribute AWS key by replacing the existing value with the API_AWS_IAM_USER_ARN from Snowflake. Next to the sts:AssumeRole action, there will be a Condition key with an empty value between curly braces. Inside the braces, paste the following, replacing the API_AWS_EXTERNAL_ID with the value from Snowflake:

 

"StringEquals": { "sts:ExternalId": "API_AWS_EXTERNAL_ID" }

 

Click Update Trust Policy to save out of this screen.

Create the External Function

The external function can now be created inside Snowflake, referencing the trusted endpoint to invoke via the previously built API integration.  Be sure the parameters in the function definition match what the Lambda expects.

 

create or replace external function 
udf_titanic_score(name string, sex string, pclass int, fare numeric(10,5), 
   cabin string, sibsp int, embarked string, parch int, age numeric(5,2))
   returns variant
   api_integration = titanic_external_api_integration
   as 'https://76abcdefg.execute-api.us-east-1.amazonaws.com/test/score';

 

 

The function is now ready for use!

Calling the External Function and Performance Considerations

The function can now simply be called like any other UDF. This code scores 100,000 Titanic passenger records:

 

select passengerid
, udf_titanic_score(name, sex, pclass, fare, cabin, sibsp, embarked, parch, age) as score
from passengers_100k;

 

[Screenshot: scored query results]

In the above prediction, Passenger 7254024 has an 84.4% chance of Titanic survivability.

Some of my own observations (at the time of this writing):

  • Full received payloads in this case contained ~1860 records. Payloads were about 0.029 MB in size. I assume Snowflake is limiting them to 0.03 MB.
  • Whether scoring from an extra-small, small, or medium-sized compute warehouse on Snowflake, the Lambda concurrency CloudWatch metrics dashboard always showed a concurrent execution peak of 8. Prior to testing, I speculated it might be 8 per node. Overall, this represents a rather gentle load on scoring infrastructure.
  • Performance should be rather satisfactory whether the model is run in the Lambda itself or offloaded to a DataRobot Prediction Engine. Note that for larger batch jobs and maximum throughput, other methods are still more efficient with time and resources.
  • Testing against an r4.xlarge Dedicated Prediction Engine on DataRobot produced a rate of around 13,800 records/s for this particular dataset and model.
  • Snowflake determines payload size and concurrency based on a number of factors. As of this writing, a payload ceiling can be specified with a MAX_BATCH_ROWS value during external function creation. Future options may allow developers greater control over payload size, concurrency, and scaling with warehouse upsizing.

Snowflake Streaming Ingest with Streams and Tasks

There are multiple options for bringing data into Snowflake in a streaming fashion. One option is to use Snowflake's native periodic data-loading capabilities with Snowpipe. As these records arrive, it would be nice to do something with them without an external ETL/ELT tool driving the process; this is where Snowflake Streams and Tasks come in. A set of tasks can be constructed to handle new records upon arrival.

Ingest Pipeline Architecture

[Figure: Snowpipe, Streams, and Tasks ingest pipeline architecture]

Create Staging and Presentation Tables

Tables need to be created to hold the newly arrived records loaded from Snowpipe, and to hold the processed and scored records for reporting. In this example, a raw passengers table will be created in a STG schema and a scored passengers table will be presented in the PUBLIC schema.

 

create or replace TABLE TITANIC.STG.PASSENGERS (
	PASSENGERID int,
	PCLASS int,
	NAME VARCHAR(100),
	SEX VARCHAR(10),
	AGE NUMBER(5,2),
	SIBSP int,
	PARCH int,
	TICKET VARCHAR(30),
	FARE NUMBER(10,5),
	CABIN VARCHAR(25),
	EMBARKED VARCHAR(5)
);

create or replace TABLE TITANIC.PUBLIC.PASSENGERS_SCORED (
	PASSENGERID int,
	PCLASS int,
	NAME VARCHAR(100),
	SEX VARCHAR(10),
	AGE NUMBER(5,2),
	SIBSP int,
	PARCH int,
	TICKET VARCHAR(30),
	FARE NUMBER(10,5),
	CABIN VARCHAR(25),
	EMBARKED VARCHAR(5),
	SURVIVAL_SCORE NUMBER(11,10)
);

 

Create the Snowpipe

Snowflake will need to be connected to an external stage object. Snowflake documentation provides directions explaining how to set up a storage integration with AWS and IAM; some of that is referenced below.

 

use role accountadmin;

--note a replace will break all existing associated stage objects!
create or replace storage integration SNOWPIPE_INTEGRATION
type = EXTERNAL_STAGE
STORAGE_PROVIDER = S3
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789:role/snowflake_lc_role'
enabled = true
STORAGE_ALLOWED_LOCATIONS = ('s3://bucket');

 

Once the integration is available, it can be used to create a stage that will map to S3 and use the integration to apply security.

 

CREATE or replace STAGE titanic.stg.snowpipe_passengers 
URL = 's3://bucket/snowpipe/input/passengers' 
storage_integration = SNOWPIPE_INTEGRATION;

 

Lastly, the Snowpipe itself is created to map this stage to a table.  A file format is created for it below as well.

 

CREATE OR REPLACE FILE FORMAT TITANIC.STG.DEFAULT_CSV TYPE = 'CSV' COMPRESSION = 'AUTO' FIELD_DELIMITER = ',' 
RECORD_DELIMITER = '\n' SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '\042' TRIM_SPACE = FALSE 
ERROR_ON_COLUMN_COUNT_MISMATCH = TRUE ESCAPE = 'NONE' ESCAPE_UNENCLOSED_FIELD = '\134' DATE_FORMAT = 'AUTO' 
TIMESTAMP_FORMAT = 'AUTO' NULL_IF = ('');

create or replace pipe titanic.stg.snowpipe auto_ingest=true as 
copy into titanic.stg.passengers
from @titanic.stg.snowpipe_passengers
file_format = TITANIC.STG.DEFAULT_CSV;

 

Automate the Snowpipe Loading

Snowflake provides a couple of options for loading new data as it arrives; Option 1 from the Snowflake documentation, which uses a Snowflake-managed SQS queue directly, will be utilized here. In particular, Step 4, creating new-file event notifications, is required. Navigate to the S3 bucket, choose the Properties tab > Events tile, and choose Add notification. Create a notification that adds a message to the pipe's SQS queue (retrieved from the Snowflake pipe definition) for every new file arrival.

[Screenshot: S3 event notification configuration]
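If the bucket notification is to be scripted rather than clicked through, a hedged boto3 sketch follows. The queue ARN is the notification_channel value shown by SHOW PIPES in Snowflake, and note that this call replaces any existing notification configuration on the bucket.

import boto3

# Queue ARN comes from the pipe's notification_channel column (SHOW PIPES in Snowflake).
sqs_queue_arn = "<notification_channel from SHOW PIPES>"

s3 = boto3.client("s3")
# Caution: this call replaces the bucket's entire existing notification configuration.
s3.put_bucket_notification_configuration(
    Bucket="bucket",
    NotificationConfiguration={
        "QueueConfigurations": [
            {
                "QueueArn": sqs_queue_arn,
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {"Key": {"FilterRules": [
                    {"Name": "prefix", "Value": "snowpipe/input/passengers/"}
                ]}}
            }
        ]
    }
)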

The pipe is now ready to accept and load data.

Create the Stream

There are two types of stream objects that can be created in Snowflake: standard and append-only. Standard streams capture any type of change to a table, while append-only streams capture only inserted rows; the former supports general Change Data Capture (CDC) processing, while the latter fits nicely for simple new-row ingest processing, so it will be used in this article.

Logically, the stream can be thought of in this approach as a table, but one that contains only the records that have arrived since the stream was last consumed. Once a DML statement sources the stream, the rows returned are considered consumed and the stream becomes empty; this is similar to a queue in programming terms.

 

create or replace stream TITANIC.STG.new_passengers_stream 
on table TITANIC.STG.PASSENGERS append_only=true;

 

Create the Task

A task is a step, or a series/tree of cascading steps, constructed to perform an ELT operation. Tasks can be scheduled similarly to cron jobs, set to run on specific days, at specific times, or at periodic intervals.

The following simple task will score the Titanic passengers through the UDF and load the scored data to the presentation layer. Every 5 minutes it will check whether new records exist in the stream; if records are found, the task will run. The task is created in a suspended state; enable it by resuming it.

 

CREATE or replace TASK TITANIC.STG.score_passengers_task
    WAREHOUSE = COMPUTE_WH
    SCHEDULE = '5 minute'
WHEN
    SYSTEM$STREAM_HAS_DATA('TITANIC.STG.NEW_PASSENGERS_STREAM')
AS
    INSERT INTO TITANIC.PUBLIC.PASSENGERS_SCORED
    select passengerid, pclass, name, sex, age, sibsp, parch, ticket, fare, cabin, embarked,
    udf_titanic_score(name, sex, pclass, fare, cabin, sibsp, embarked, parch, age) as score
    from TITANIC.STG.new_passengers_stream;

ALTER TASK score_passengers_task RESUME;

 

Ingest and Scoring Pipeline Complete!

The end-to-end pipeline is now complete. Copying a PASSENGERS.csv file (available in GitHub) into the watched bucket will run the pipeline. The file prefix will result in the data being ingested into the staging schema, scored through a DataRobot model, and then loaded into the presentation schema—all without any external ETL tooling!

aws s3 cp PASSENGERS.csv s3://bucket/snowpipe/input/passengers/PASSENGERS.csv
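To confirm the end-to-end flow, a short sketch using the Snowflake Python connector (placeholder credentials) can check recent runs of the scoring task and count the scored rows that landed in the presentation table.

import snowflake.connector

# Placeholder credentials -- substitute your own account, user, and authentication method.
conn = snowflake.connector.connect(account="<account>", user="<user>", password="<password>",
                                   warehouse="COMPUTE_WH", database="TITANIC")
cur = conn.cursor()

# Recent runs of the scoring task.
cur.execute("""
    select name, state, scheduled_time, completed_time
    from table(information_schema.task_history())
    where name = 'SCORE_PASSENGERS_TASK'
    order by scheduled_time desc
    limit 10
""")
for row in cur.fetchall():
    print(row)

# Scored rows that reached the presentation layer.
cur.execute("select count(*), avg(survival_score) from TITANIC.PUBLIC.PASSENGERS_SCORED")
print(cur.fetchone())

cur.close()
conn.close()

If task runs appear and scored rows are counted, the streaming scoring pipeline is working end to end.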
