DataRobot & Snowflake: Client-Request Model Scoring

This is the second in a series of articles leveraging the DataRobot platform with the cloud database Snowflake. See the previous article and introduction to the series.

DataRobot provides several options for scoring data through a model; the most integrated and feature-rich method is scoring through the Prediction API. The API can be scaled horizontally to support real-time scoring requests as well as batch scoring approaches. A single API request can be sent with a data payload of one or more records, and many requests can be sent concurrently. DataRobot also tracks the data coming in with scoring requests and compares it to the training data used to build the model. Through model management, technical performance statistics for the API endpoint are delivered along with data drift and model drift metrics (associated with the health of the model itself).

The information required to construct a successful API request can be collected from several places within DataRobot, although the quickest way to capture all required values is from the Deployments > Integrations tab within DataRobot. A sample Python script is provided there, which includes everything of interest.

[Image: the deployment's Integrations tab showing the sample Python scoring script]

The API endpoint accepts CSV and JSON data. The Content-Type header value must be set appropriately for the type of data being sent (text/plain or application/json, respectively); the API itself always responds with JSON.
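
As a quick reference, a minimal sketch of the two corresponding header variants is shown below; the datarobot-key value is a placeholder, and the full request example appears later in this article.

# illustrative header values only; the complete request example appears further below
csv_headers = {'Content-Type': 'text/plain; charset=UTF-8', 'datarobot-key': 'YOUR DR KEY'}
json_headers = {'Content-Type': 'application/json', 'datarobot-key': 'YOUR DR KEY'}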

The following methods provide several examples of making client-side scoring requests against data in Snowflake, issued from a local machine or a standing server environment.

Using the Prediction API

Values of interest in the integration script are as follows:

 

USERNAME = 'doyouevendatadoyouimeanreallybro@datarobot.com'
API_KEY = 'YOUR API KEY'
DEPLOYMENT_ID = 'YOUR DEPLOYMENT ID'
DATAROBOT_KEY = 'YOUR DR KEY'
# 'Content-Type' header value
# url domain hostname

 

Deployment access and privileges are at a different level than projects in DataRobot; an account may have the ability to score through a model, but not to see the project or data that went into creating it. As a best practice, production workflows should be associated with some kind of service account so that no individual employee is tied to a production scoring pipeline. Typically, this is an email address for a group distribution list of administrators and interested parties.

  • Authentication and privileges are established via the USERNAME and API_KEY. 
  • DEPLOYMENT_ID is the unique ID of the deployment in DataRobot, which sits in front of the model (the model behind the deployment can be swapped without changing the ID, if desired).
  • DATAROBOT_KEY is an additional engine access key that is required only for DataRobot's Managed AI Cloud. Customers with their own installation can remove this option.
  • As noted above, the appropriate Content-Type header needs to be specified for the input data.
  • The URL contains the hostname for scoring data. Typically, this is a load balancer in front of one or more prediction engines.

The following script snippet shows how to extract some data from Snowflake via the Python connector and send it to DataRobot for scoring. It will create a single thread with a single request. Maximizing speed involves creating parallel request threads with appropriately sized data payloads to handle input of any size. 

Consider this simple example of creating a scoring request and working with results.

 

import snowflake.connector
import datetime
import sys
import pandas as pd
import requests
from pandas.io.json import json_normalize

# snowflake parameters
SNOW_ACCOUNT = 'dp12345.us-east-1'
SNOW_USER = 'your user'
SNOW_PASS = 'your pass'
SNOW_DB = 'TITANIC'
SNOW_SCHEMA = 'PUBLIC'

# create a connection
ctx = snowflake.connector.connect(
          user=SNOW_USER,
          password=SNOW_PASS,
          account=SNOW_ACCOUNT,
          database=SNOW_DB,
          schema=SNOW_SCHEMA,
          protocol='https'
)

# create a cursor
cur = ctx.cursor()

# execute sql
sql = "select passengerid, pclass, name, sex, age, sibsp, parch, fare, cabin, embarked " \
    + " from titanic.public.passengers"
cur.execute(sql)

# fetch results into dataframe
df = cur.fetch_pandas_all()

 

[Image: Snowflake query results loaded into a pandas dataframe]

Note that the fields are all capitalized as a result of following ANSI standard SQL; the greater takeaway is that DataRobot is case-sensitive with respect to feature names. These fields must match between the data being provided and what the model is expecting. Depending on the model-building workflow used, this may mean that database extractions via SQL require aliasing of the columns to match the model's feature case. At this point, the data is available in the Python script. Any pre-processing that occurred outside of DataRobot before building the model could be applied now as well. Once that is complete, it is time to score the data.
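
If the model was trained on mixed-case feature names, the mismatch can be resolved either by aliasing the columns in the SQL query or by renaming the dataframe columns before scoring. The sketch below is illustrative only; the mixed-case names shown are hypothetical and should be replaced with the feature names from your own project.

# option 1: alias columns in SQL so they match the case the model expects
# (double-quoted aliases preserve case in Snowflake)
sql_aliased = 'select passengerid as "PassengerId", pclass as "Pclass", sex as "Sex" ' \
    + ' from titanic.public.passengers'

# option 2: rename the dataframe columns after extraction
# the mapping below is hypothetical; use your own model's feature names
rename_map = {'PASSENGERID': 'PassengerId', 'PCLASS': 'Pclass', 'SEX': 'Sex'}
df = df.rename(columns=rename_map)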

 

# datarobot parameters
API_KEY = 'YOUR API KEY'
USERNAME = 'mike.t@datarobot.com'
DEPLOYMENT_ID = 'YOUR DEPLOYMENT ID'
DATAROBOT_KEY = 'YOUR DR KEY'
# replace with the load balancer for your prediction instance(s)
DR_PREDICTION_HOST = 'https://YOURSERVER.com'

headers = {'Content-Type': 'text/plain; charset=UTF-8', 'datarobot-key': DATAROBOT_KEY}
url = '{dr_prediction_host}/predApi/v1.0/deployments/{deployment_id}/'\
          'predictions'.format(dr_prediction_host=DR_PREDICTION_HOST, deployment_id=DEPLOYMENT_ID)

predictions_response = requests.post(
        url,
        auth=(USERNAME, API_KEY),
        data=df.to_csv(),
        headers=headers,
        # business key passed through
        params={'passthroughColumns' : 'PASSENGERID'}
    )

if predictions_response.status_code != 200:
    print("error {status_code}: {content}".format(status_code=predictions_response.status_code, content=predictions_response.content))
    sys.exit(-1)

# first 3 records json structure
predictions_response.json()['data'][0:3]

 

[Image: JSON structure of the first three scored records in the response]

The above is a simple, straightforward call with little error handling; it is intended only as an example for reference. The request includes the passthroughColumns parameter so that the logical or business key is returned along with the labels and scores. As stated previously, the API always returns records in JSON format.
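
As noted earlier, throughput is maximized by splitting the input into appropriately sized payloads and sending multiple requests concurrently. The sketch below illustrates that pattern using the connection details already defined; the chunk size and worker count are arbitrary values for illustration and should be tuned for the prediction environment.

import numpy as np
from concurrent.futures import ThreadPoolExecutor

def score_chunk(chunk):
    # send one chunk of rows as a single CSV payload
    resp = requests.post(
        url,
        auth=(USERNAME, API_KEY),
        data=chunk.to_csv(index=False),
        headers=headers,
        params={'passthroughColumns': 'PASSENGERID'}
    )
    resp.raise_for_status()
    return resp.json()['data']

# split the dataframe into roughly 1,000-row chunks and score 4 chunks at a time
chunks = np.array_split(df, max(1, len(df) // 1000))
with ThreadPoolExecutor(max_workers=4) as pool:
    scored_records = [rec for batch in pool.map(score_chunk, chunks) for rec in batch]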

Because Snowflake is flexible when working with JSON, the option to simply load the response into the database is available.

 

df_response = pd.DataFrame.from_dict(predictions_response.json())
df_response.head()

 

[Image: the prediction response loaded into a pandas dataframe]

The following code creates a table and inserts the raw JSON. Note that it does so with only an abbreviated set of five records. For the sake of this demonstration, the records are inserted one at a time via the Python Snowflake connector. This is not a best practice and is shown for demonstration purposes only; in a real workflow, Snowflake should ingest data via flat files and stage objects.

 

ctx.cursor().execute('create or replace table passenger_scored_json(json_rec variant)')

df5 =  df_response.head()

# this is not the proper way to insert data into snowflake, but is used for quick demo convenience.
# snowflake ingest should be done via snowflake stage objects.
for ind, row in df5.iterrows():
    escaped = str(row['data']).replace("'", "''")
    ctx.cursor().execute("insert into passenger_scored_json select parse_json('{rec}')".format(rec=escaped))
    print(row['data'])

 

Snowflake's native JSON functions can be used to parse and flatten the data. The code below retrieves all scores toward the positive class label 1 (survival) from the binary classification model.

 

select json_rec:passthroughValues.PASSENGERID::int as passengerid 
, json_rec:prediction::int as prediction 
, json_rec:predictionThreshold::numeric(10,9) as prediction_threshold 
, f.value:label as prediction_label 
, f.value:value as prediction_score 
from titanic.public.passenger_scored_json
, table(flatten(json_rec:predictionValues)) f 
where f.value:label = 1;

 

[Image: query results showing the flattened prediction scores in Snowflake]

The raw score can be used directly, and it is also evaluated against the prediction threshold. In this example, passenger 892's chance of survival (11.69%) was below the 50% threshold; thus, the prediction toward the positive class survival label 1 was 0 (i.e., non-survival).
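
The same comparison can be reproduced in Python directly from the API response. A minimal sketch using the first record of the earlier response (the comparison operator is assumed here to be >=):

rec = predictions_response.json()['data'][0]
# pull the score for the positive class label 1
positive_score = [v['value'] for v in rec['predictionValues'] if v['label'] == 1][0]
# the predicted label is the score compared against the deployment's threshold
predicted_label = 1 if positive_score >= rec['predictionThreshold'] else 0
print(rec['passthroughValues']['PASSENGERID'], positive_score, predicted_label)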

The original response in Python can be flattened within Python as well.

 

df_results = json_normalize(data=predictions_response.json()['data'], record_path='predictionValues',
    meta = [['passthroughValues', 'PASSENGERID'], 'prediction', 'predictionThreshold'])
df_results = df_results[df_results['label'] == 1]
df_results.rename(columns={"passthroughValues.PASSENGERID": "PASSENGERID"}, inplace=True)
df_results.head()

 

[Image: the flattened prediction results in a pandas dataframe]

The above dataframe can be written to one or more CSV or compressed files and placed in a Snowflake stage for ingestion into the database.
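
A minimal sketch of that stage-based approach using the existing connection is shown below; the table, stage, and file path names are illustrative only.

# write the flattened results locally, then bulk load via a named stage
df_results.to_csv('/tmp/passenger_scores.csv', index=False)

ctx.cursor().execute(
    'create or replace table passenger_scored_flat '
    '(label int, value float, passengerid int, prediction float, prediction_threshold float)'
)
ctx.cursor().execute(
    "create or replace stage passenger_score_stage file_format = (type = 'csv' skip_header = 1)"
)
# PUT uploads (and by default compresses) the local file into the stage
ctx.cursor().execute("put file:///tmp/passenger_scores.csv @passenger_score_stage")
# COPY INTO performs the bulk load from the stage into the table
ctx.cursor().execute('copy into passenger_scored_flat from @passenger_score_stage')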

Using the Client-Side Batch Scoring Script

DataRobot offers a client-side batch scoring script, available on GitHub. This script is typically installed as a Python utility, but it has some alternative installation methods as well, including native command-line binaries for both Linux and Windows. Unlike the raw API endpoint, this script manages requests itself: it expects all input data to be in CSV format and produces only CSV output.

Many options are available with this script. It can work with input of any size, will shred the input file into individual scoring requests and batched payloads, and can manage running concurrent scoring streams as well. The example below hits the predictionExplanations endpoint rather than the plain prediction endpoint; this provides not only the score, but also the top three reasons that impacted the score for each record scored.

To keep this demo simple and straightforward, the input CSV file is created from the dataframe of SQL results already in hand. Production pipelines may instead use code, ETL tools, or direct data exports: to flat files in the case of many traditional databases, or to external tables and object storage in the case of a database like Snowflake. From this point the batch scoring script can be leveraged. The script is typically called from the command line, although it can also be invoked via a dynamically built string and wrapper (as shown in the Python example below).

 

import os

df.to_csv('input.csv', index=False)
os.system('rm output.csv')

batch_script_string = 'batch_scoring_deployment_aware \
--host="{host}" \
--user="{user}" \
--api_token="{api_token}" \
--out="output.csv" \
--datarobot_key="{datarobot_key}" \
--keep_cols="PASSENGERID" \
--max_prediction_explanations=3 \
{deployment_id} \
input.csv'.format(host=DR_PREDICTION_HOST, user=USERNAME, api_token=API_KEY, datarobot_key=DATAROBOT_KEY, deployment_id=DEPLOYMENT_ID)
os.system(batch_script_string)

 

The script leverages some default options: it auto-samples the input to create payloads of roughly 2.5 MB worth of records (passengers, in this case), and it runs four concurrent threads unless otherwise specified. Shredding an input file of any size is handled by the script; it keeps sending payloads of the determined (or parameter-defined) size and concurrency until the input file is exhausted. Output showing the current state can be written to a log.
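
Both of those defaults can be overridden from the command line. The sketch below extends the wrapper string used above; the --n_samples and --n_concurrent flag names reflect the script's documented options at the time of writing and should be verified against the output of --help for the installed version.

# illustrative only: explicitly set the payload sample size and number of concurrent threads
# confirm flag names with `batch_scoring_deployment_aware --help` for your installed version
tuned_script_string = batch_script_string + ' --n_samples=5000 --n_concurrent=8'
os.system(tuned_script_string)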

[Image: batch scoring script log output]

The top three prediction explanations were requested in addition to the score, and these can be seen in the output file.

 

df_output = pd.read_csv('output.csv')
df_output.head()

 

[Image: scored output including the top three prediction explanations for each record]

Here it can be seen that passenger 894's primary feature impact came from the value of the sex column; being male moved the needle the furthest, away from the positive class label 1 for survival. As another example, passenger 893's third-class ticket is a very strong negative indicator.

The output.csv file can then be ingested just like any CSV file within a customer's database environment and tech stack.

Code for this article is available on GitHub.
