DataRobot & Snowflake: Server-Side Model Scoring


While prior Snowflake articles used Snowflake much like any other database, this article focuses on more advanced scoring options that take advantage of Snowflake as a cloud-native database, and their benefits. Three scoring approaches are covered, all of which leverage DataRobot's Batch Prediction API:

  1. DataRobot GUI—Table Scoring (JDBC, API behind the scenes)
  2. DataRobot API—Query As Source (JDBC, API)
  3. DataRobot API—S3 Scoring with pre/post SQL (S3, API)

Each option has its own trade-off between simplicity and performance to meet business requirements; each is discussed in further detail below. A brief overview of the Batch Prediction API and the prerequisites common to all scoring approaches are covered first.

Batch Prediction API

The Batch Prediction API allows a dataset of any size to be sent to DataRobot for scoring. The data is sliced into individual HTTP requests and sent over parallel threads to saturate the available Dedicated Prediction Engines (DPEs) and maximize scoring throughput. (DPEs can be scaled horizontally for DataRobot customers.) Source and target data can be local files, S3/object storage, or JDBC data connections, and these can be mixed and matched. See the in-app documentation on data connections for additional information.
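For illustration, a minimal sketch of a Batch Prediction job payload that mixes intake and output types is shown below; it reads from Snowflake over JDBC and writes scored results to S3. The host, deployment ID, data store ID, and credential IDs are placeholders, and later sections of this article show how to create or look them up.

import requests

DR_APP_HOST = 'https://app.datarobot.com'
API_KEY = 'YOUR_DATAROBOT_API_KEY'  # placeholder

session = requests.Session()
session.headers = {'Authorization': 'Bearer {}'.format(API_KEY)}

# hypothetical job: JDBC (Snowflake) intake, S3 output; all IDs below are placeholders.
# A predictionInstance block (hostName, datarobotKey) may also be required for
# dedicated prediction instances, as shown in the full examples later in this article.
job_details = {
    'deploymentId': 'YOUR_DEPLOYMENT_ID',
    'intakeSettings': {
        'type': 'jdbc',
        'dataStoreId': 'YOUR_DATA_CONNECTION_ID',
        'credentialId': 'YOUR_SNOWFLAKE_CREDENTIAL_ID',
        'query': 'select * from PASSENGERS'
    },
    'outputSettings': {
        'type': 's3',
        'url': 's3://your-bucket/titanic/passengers_scored.csv',
        'credentialId': 'YOUR_S3_CREDENTIAL_ID'
    }
}

response = session.post(DR_APP_HOST + '/api/v2/batchPredictions', json=job_details)
print(response.status_code)  # 202 indicates the job was accepted and queued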

Prerequisites/Considerations

  • DataRobot customers with their own installation, on premise or in a VPC, will likely already have connectivity between their Snowflake account and their DataRobot environment. If additional network access is required, their infrastructure teams have full control over network connectivity.
  • DataRobot customers on the public cloud offering who want DataRobot to access their Snowflake instance may require additional infrastructure configuration and will need to work with DataRobot Support to establish it. Snowflake is publicly accessible by default (e.g., https://dp12345.us-east-1.snowflakecomputing.com). Customers may have set up vanity local DNS entries (customer.snowflakecomputing.com) that DataRobot cannot resolve, or may be leveraging AWS PrivateLink with the option to block public IPs.
  • SSO/SAML support is planned but, as of this writing, not yet available; locally authenticated Snowflake accounts must be used.
  • As a best practice, use a service account in a production scoring pipeline.
  • The Snowflake write-back account will require CREATE TABLE, INSERT, and UPDATE privileges, depending on use case and workflow. Additionally, the JDBC driver requires the CREATE STAGE privilege to perform faster stage bulk inserts instead of regular array-binding inserts; this allows a temporary stage object to be created and used for the duration of the JDBC session. A sketch of these grants appears below.
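To make the privilege list above concrete, a minimal sketch of the grants for a hypothetical scoring role follows; the role, database, schema, and connection values are placeholders, and the exact set of grants will depend on your workflow.

import snowflake.connector

# connect as a role allowed to manage grants (values are placeholders)
ctx = snowflake.connector.connect(user='ADMIN_USER', password='********',
                                  account='dp12345.us-east-1')
cur = ctx.cursor()

# illustrative grants for a hypothetical SCORING_ROLE used by the service account
for sql in [
    "grant usage on database TITANIC to role SCORING_ROLE",
    "grant usage on schema TITANIC.PUBLIC to role SCORING_ROLE",
    # write-back: create result tables and insert/update rows; select to read source data
    "grant create table on schema TITANIC.PUBLIC to role SCORING_ROLE",
    "grant select, insert, update on all tables in schema TITANIC.PUBLIC to role SCORING_ROLE",
    # required by the JDBC driver for fast bulk inserts via a temporary stage
    "grant create stage on schema TITANIC.PUBLIC to role SCORING_ROLE",
]:
    cur.execute(sql)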

DataRobot GUI—Table Scoring

Quick and simple batch scoring jobs can be configured directly within the DataRobot application and run ad hoc or on a schedule. Generally speaking, this approach is a great option for use cases that only require scoring a reasonably small table. It enables a data scientist or business analyst to perform scoring and write results back to the database, often to a sandbox/analysis area.

This example uses the Titanic Kaggle competition dataset with a binary classifier predicting 1/0, with positive class label 1 for survival. Navigating to a deployed model, the Snowflake tile is available under the Integrations tab for the deployment.

[Image: Snowflake tile under the deployment's Integrations tab]

Choose the Snowflake tile. The GUI allows browsing of AI Catalog items. In this case, the test dataset has been uploaded to the titanic.public.passengers table, which has a dynamic AI Catalog item associated with it. Dynamic means the data has not been snapshotted; it remains in Snowflake and is pulled from the database at run time. An example of creating an AI Catalog item can be found in the Snowflake Project Creation article.

[Image: selecting the dynamic AI Catalog item for titanic.public.passengers as the source]

User credentials associated with running the job will be required. The next step allows prediction options to be chosen; in this example, three Prediction Explanations have been requested. As a best practice, the surrogate key, rather than the entire record, has been added as a passthrough column to the result table, since the scoring results can simply be joined back to the original data in the database on PASSENGERID. Not including the full record is more network efficient; however, if no key is present, additional values of interest or the entire record would be necessary to relate the input data to the values produced by the deployed model.

[Image: prediction options with three Prediction Explanations and PASSENGERID as a passthrough column]

Specifying the destination is the next step. There is no AI Catalog browsing here, as the catalog is typically for datasets used to create projects, whereas this scoring job writes to a table. With the options specified below, DataRobot creates the table (community_passengers) in the database to write to. The interface currently requires that the JDBC URL be specified. An example URL follows, with the working/default database specified as part of the connection string.

jdbc:snowflake://dp12345.us-east-1.snowflakecomputing.com/?db=titanic
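If needed, additional Snowflake JDBC connection properties such as the schema, warehouse, and role can be appended as query parameters; the values below are placeholders.

jdbc:snowflake://dp12345.us-east-1.snowflakecomputing.com/?db=titanic&schema=public&warehouse=compute_wh&role=scoring_role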

DataRobot quotes the schema and table name in the CREATE TABLE SQL, making them case sensitive; a best practice is to specify the values in ALL CAPS so the created object resolves case-insensitively, per the ANSI standard. The table is created upon clicking Next.

[Image: destination settings with the Snowflake JDBC URL, schema, and table name]

The last tab allows the job to be named and a regular schedule to be set, although scheduling is not used in this example; the job will be run on demand.

[Image: job name and schedule settings]

The job itself can be edited; here it is run on demand with the Run now button.

[Image: saved prediction job with the Run now button]

The job is then added to the Batch Prediction API queue. One batch job is allowed to run at a time. The results will show up in Snowflake in a streaming fashion until all records are scored through the pipeline.  Completed results are shown below.

[Image: completed scoring results in the Snowflake table]

DataRobot API—Query As Source

[Image: Snowflake-to-Snowflake scoring over JDBC via the Batch Prediction API]

DataRobot's Batch Prediction API can also be used programmatically; see the in-app documentation for more information. In general, batch prediction jobs must be initialized and are added to a job queue. Jobs from local files will not begin until data is uploaded. For a Snowflake-to-Snowflake job, both ends of the pipeline must be set with a Snowflake source and target. Additional details about the job (deployment, prediction host, columns to be passed through) can be specified as well; see Batch Prediction API in the in-app documentation for a full list of available options.

The benefits of using the API over the GUI in this approach include:

  • The request code can be inserted into any production pipeline, and sit between pre-scoring and post-scoring steps.
  • The code can be triggered by an organization's existing scheduler, or upon events as they take place.
  • It is not necessary to create an AI Catalog item, as the API accepts a table, view, or query.

Below is an example of how DataRobot's Batch Prediction API can be used to score Snowflake data via a basic JDBC Connection.

 

import pandas as pd
import requests
import time

import my_creds

# datarobot parameters
API_KEY = my_creds.API_KEY
USERNAME = my_creds.USERNAME
DEPLOYMENT_ID = my_creds.DEPLOYMENT_ID
DATAROBOT_KEY = my_creds.DATAROBOT_KEY
# replace with the load balancer for your prediction instance(s)
DR_PREDICTION_HOST = my_creds.DR_PREDICTION_HOST
DR_APP_HOST = 'https://app.datarobot.com'

DR_MODELING_HEADERS = {'Content-Type': 'application/json', 'Authorization': 'token %s' % API_KEY}

headers = {'Content-Type': 'text/plain; charset=UTF-8', 'datarobot-key': DATAROBOT_KEY}

url = '{dr_prediction_host}/predApi/v1.0/deployments/{deployment_id}/'\
          'predictions'.format(dr_prediction_host=DR_PREDICTION_HOST, deployment_id=DEPLOYMENT_ID)

# snowflake parameters
SNOW_USER = my_creds.SNOW_USER
SNOW_PASS = my_creds.SNOW_PASS

 

An existing data connection can be leveraged here for connecting to a database. An example of creating one via the GUI can be found in the Snowflake Project Creation article. In the case below, one will be looked up by name.

 

"""
    get a data connection by name, return None if not found
"""
def dr_get_data_connection(name):
    
    data_connection_id = None

    response = requests.get(
            DR_APP_HOST + '/api/v2/externalDataStores/',
            headers=DR_MODELING_HEADERS,
        )

    if response.status_code == 200:

        df = pd.json_normalize(response.json()['data'])[['id', 'canonicalName']]

        if df[df['canonicalName'] == name]['id'].size > 0:
            data_connection_id = df[df['canonicalName'] == name]['id'].iloc[0]
        
    else:

        print('Request failed; http error {code}: {content}'.format(code=response.status_code, content=response.content))

    return data_connection_id

data_connection_id = dr_get_data_connection('snow_3_12_0_titanic')

 

The Batch Prediction API also needs credentials to use for the job; the Snowflake user credentials can be saved securely to the server for this purpose. Note that the DataRobot privileges being applied are established via the DataRobot API token sent in the request or session headers; the associated user will own the prediction job created and must be able to access the deployed model. Credentials for the database can be created or looked up with the following code snippets.

 

# get a saved credential set, return None if not found
def dr_get_catalog_credentials(name, cred_type):
    if cred_type not in ['basic', 's3']:
        print('credentials type must be: basic, s3 - value passed was {ct}'.format(ct=cred_type))
        return None
    
    credentials_id = None

    response = requests.get(
            DR_APP_HOST + '/api/v2/credentials/',
            headers=DR_MODELING_HEADERS,
        )

    if response.status_code == 200:

        df = pd.json_normalize(response.json()['data'])[['credentialId', 'name', 'credentialType']]

        if df[(df['name'] == name) & (df['credentialType'] == cred_type)]['credentialId'].size > 0:
            credentials_id = df[(df['name'] == name) & (df['credentialType'] == cred_type)]['credentialId'].iloc[0]
     
    else:

        print('Request failed; http error {code}: {content}'.format(code=response.status_code, content=response.content))

    return credentials_id

# create credentials set
def dr_create_catalog_credentials(name, cred_type, user, password, token=None):
    if cred_type not in ['basic', 's3']:
        print('credentials type must be: basic, s3 - value passed was {ct}'.format(ct=cred_type))
        return None
    
    if cred_type == 'basic':  
        json = {
            "credentialType": cred_type,
            "user": user,
            "password": password,
            "name": name
        }
    elif cred_type == 's3' and token is not None:
        json = {
            "credentialType": cred_type,
            "awsAccessKeyId": user,
            "awsSecretAccessKey": password,
            "awsSessionToken": token,
            "name": name
        }
    elif cred_type == 's3' and token is None:
        json = {
            "credentialType": cred_type,
            "awsAccessKeyId": user,
            "awsSecretAccessKey": password,
            "name": name
        }
        
    response = requests.post(
        url = DR_APP_HOST + '/api/v2/credentials/',
        headers=DR_MODELING_HEADERS,
        json=json
    )
    
    if response.status_code == 201:

        return response.json()['credentialId']
        
    else:

        print('Request failed; http error {code}: {content}'.format(code=response.status_code, content=response.content))

# get or create a credential set
def dr_get_or_create_catalog_credentials(name, cred_type, user, password, token=None):
    cred_id = dr_get_catalog_credentials(name, cred_type)
    
    if cred_id is None:
        # pass the token through so s3 credentials with a session token are not dropped
        return dr_create_catalog_credentials(name, cred_type, user, password, token=token)
    else:
        return cred_id

credentials_id = dr_get_or_create_catalog_credentials('snow_community_credentials', 
                                                      'basic', my_creds.SNOW_USER, my_creds.SNOW_PASS)

 

Next, a session is created and the job defined. The job is submitted and slotted to run asynchronously; an HTTP 202 status code is returned upon successful submission. Job state can be retrieved by querying the API for the current state of the job.

 

session = requests.Session()
session.headers = {
    'Authorization': 'Bearer {}'.format(API_KEY)
}

 

The table to hold the results has been created in Snowflake with the following SQL statement, reflecting the structure described in the in-app documentation:

 

create or replace TABLE PASSENGERS_SCORED_BATCH_API (
	SURVIVED_1_PREDICTION NUMBER(10,9),
	SURVIVED_0_PREDICTION NUMBER(10,9),
	SURVIVED_PREDICTION NUMBER(38,0),
	THRESHOLD NUMBER(6,5),
	POSITIVE_CLASS NUMBER(38,0),
	PASSENGERID NUMBER(38,0)
);

 

The job will be specified as follows:

  • Source: Snowflake JDBC
  • Source data: query results (a simple select * from passengers)
  • Source fetch size: 100,000 (the maximum fetch size)
  • Job concurrency: 4 prediction core threads requested
  • Passthrough columns: keep the surrogate key PASSENGERID
  • Target table: PUBLIC.PASSENGERS_SCORED_BATCH_API
  • Statement type: insert; the data will simply be inserted into the table

 

job_details = {
    "deploymentId": DEPLOYMENT_ID,
    "numConcurrent": 4,
    "passthroughColumns": ["PASSENGERID"],
    "includeProbabilities": True,
    "predictionInstance" : {
        "hostName": DR_PREDICTION_HOST,
        "datarobotKey": DATAROBOT_KEY
    },
    "intakeSettings": {
        "type": "jdbc",
        "fetchSize": 100000,
        "dataStoreId": data_connection_id,
        "credentialId": credentials_id,
        #"table": "PASSENGERS_500K",
        #"schema": "PUBLIC",
        "query": "select * from PASSENGERS"
    },
    'outputSettings': {
        "type": "jdbc",
        "table": "PASSENGERS_SCORED_BATCH_API",
        "schema": "PUBLIC",
        "statementType": "insert",
        "dataStoreId": data_connection_id, 
        "credentialId": credentials_id
    }
}

 

The job can be submitted for processing. Upon successful submission, a link will be provided to check job state and details.

 

response = session.post(
        DR_APP_HOST + '/api/v2/batchPredictions',
        json=job_details
    )

 

The job may or may not sit in the queue, depending on whether other jobs are ahead of it. Once launched, it proceeds through initialization and running stages until it is aborted or completed. A loop can be used to repeatedly check the state of the asynchronous job and hold control of the process until the job finishes with an ABORTED or COMPLETED status.

 

if response.status_code == 202:
    
    job = response.json()
    print('queued batch job: {}'.format(job['links']['self']))

    while job['status'] == 'INITIALIZING':
        time.sleep(3)
        response = session.get(job['links']['self'])
        response.raise_for_status()
        job = response.json()
        
    print('completed INITIALIZING')
        
    if job['status'] == 'RUNNING':

        while job['status'] == 'RUNNING':
            time.sleep(3)
            response = session.get(job['links']['self'])
            response.raise_for_status()
            job = response.json()
            
    print('completed RUNNING')
    print('status is now {status}'.format(status=job['status']))
    
    if job['status'] != 'COMPLETED':
        for i in job['logs']:
            print(i)
    
else:
    
    print('Job submission failed; http error {code}: {content}'.format(code=response.status_code, content=response.content))

 

This code can be found on GitHub.

DataRobot API—S3 Scoring with pre/post SQL for New Records Only

This example highlights how an S3 pipeline can work between Snowflake sources and targets. Pre/post processing in SQL is not required, although this example scores changed or new records only, based on a pre-scoring retrieval of the last successful scoring run and a post-scoring process that populates a target table and updates the successful ETL run history.

Data is loaded into a STG schema in Snowflake that exists to support ETL/ELT pipelines, and is then applied to the target presentation table in the PUBLIC schema via a bulk update rather than individual update statements (which would be very slow on Snowflake and other analytic databases compared to traditional row-store operational databases). The target presentation table receives only a single field from the scored results table for reporting purposes: the SURVIVAL field. Using S3 allows stage objects to be used for data extract and load; because these are discrete operations separate from scoring, the amount of time an ETL compute warehouse is up and running during the pipeline can be minimized.

Considerations that may result in S3 being part of a scoring pipeline include:

  • Leveraging Snowflake's native design to write to S3 (and possibly shred the data into multiple files).
  • Similarly using the native bulk insert capability.
  • Currently, Snowflake compute warehouses are billed for a minimum of 60 seconds when a cluster spins up, and per second thereafter. The prior methods (above) stream data out and in via JDBC and keep a cluster active throughout the scoring process. Discretely separating the extract, scoring, and ingest steps may reduce the time the compute warehouse actually runs, and reduce cost as a result (see the sketch after this list).
  • S3 inputs and scored sets could easily be used to create a point-in-time archive of data.
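As one way to act on the warehouse-cost point above, the ETL warehouse can be explicitly resumed around the Snowflake extract/load steps and suspended while DataRobot scores the S3 file. The warehouse name and connection values below are placeholders, and auto-suspend settings may make the explicit suspend unnecessary.

import snowflake.connector

# placeholder connection values; in practice, reuse the pipeline's existing connection
ctx = snowflake.connector.connect(user='SCORING_USER', password='********',
                                  account='dp12345.us-east-1')
cur = ctx.cursor()
WAREHOUSE = 'ETL_WH'  # placeholder warehouse name

# spin the warehouse up only for the extract (COPY INTO @stage) step
cur.execute("alter warehouse {} resume if suspended".format(WAREHOUSE))
# ... run the extract to S3 here ...

# suspend it while DataRobot scores the S3 file; no warehouse compute is needed for that step
# (this errors if the warehouse is already suspended, e.g., by auto-suspend)
cur.execute("alter warehouse {} suspend".format(WAREHOUSE))
# ... submit and monitor the Batch Prediction job here ...

# resume again for the bulk load and presentation-table update, then let auto-suspend take over
cur.execute("alter warehouse {} resume if suspended".format(WAREHOUSE))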

[Image: S3-based scoring pipeline between Snowflake and DataRobot]

In this example, there is a simple ETL_HISTORY table showing the history of the scoring job to score records. The name of the job is pass_scoring, and the last three times it ran were March 3rd, 7th, and 11th.

[Image: ETL_HISTORY table showing the last three pass_scoring runs]

The next job to run should score any records changed at or after the last run timestamp, but before the current timestamp at which the job runs. Upon successful completion of the job, a new record is placed into this table. Which of the 500K records in this table will be scored?

[Image: sample passenger records with create/update timestamps]

  • Row 1 in this example will not be scored; it has not changed since the prior successful ETL run on the 11th.
  • Row 2 will be re-scored, as it was updated on the 20th.
  • Row 3 will be scored for the first time, as it was newly created on the 19th.

Initial imports and various environment variables for DataRobot, Snowflake, and AWS S3.

 

import pandas as pd
import requests
import time
import snowflake.connector

import my_creds

# datarobot parameters
API_KEY = my_creds.API_KEY
USERNAME = my_creds.USERNAME
DEPLOYMENT_ID = my_creds.DEPLOYMENT_ID
DATAROBOT_KEY = my_creds.DATAROBOT_KEY
# replace with the load balancer for your prediction instance(s)
DR_PREDICTION_HOST = my_creds.DR_PREDICTION_HOST
DR_APP_HOST = 'https://app.datarobot.com'

DR_MODELING_HEADERS = {'Content-Type': 'application/json', 'Authorization': 'token %s' % API_KEY}

# snowflake parameters
SNOW_ACCOUNT = my_creds.SNOW_ACCOUNT
SNOW_USER = my_creds.SNOW_USER
SNOW_PASS = my_creds.SNOW_PASS
SNOW_DB = 'TITANIC'
SNOW_SCHEMA = 'PUBLIC'

# ETL parameters
JOB_NAME = 'pass_scoring'

 

Similar to the prior example, credentials are required, this time for S3 access. Credentials can be created and saved, or looked up, with the following snippets. The account should have privileges to read and write the same S3 location that the Snowflake stage object points to. Creating the stage is out of scope for this article.

 

# get a saved credential set, return None if not found
def dr_get_catalog_credentials(name, cred_type):
    if cred_type not in ['basic', 's3']:
        print('credentials type must be: basic, s3 - value passed was {ct}'.format(ct=cred_type))
        return None
    
    credentials_id = None

    response = requests.get(
            DR_APP_HOST + '/api/v2/credentials/',
            headers=DR_MODELING_HEADERS,
        )

    if response.status_code == 200:

        df = pd.json_normalize(response.json()['data'])[['credentialId', 'name', 'credentialType']]

        if df[(df['name'] == name) & (df['credentialType'] == cred_type)]['credentialId'].size > 0:
            credentials_id = df[(df['name'] == name) & (df['credentialType'] == cred_type)]['credentialId'].iloc[0]
     
    else:

        print('Request failed; http error {code}: {content}'.format(code=response.status_code, content=response.content))

    return credentials_id

# create credentials set
def dr_create_catalog_credentials(name, cred_type, user, password, token=None):
    if cred_type not in ['basic', 's3']:
        print('credentials type must be: basic, s3 - value passed was {ct}'.format(ct=cred_type))
        return None
    
    if cred_type == 'basic':  
        json = {
            "credentialType": cred_type,
            "user": user,
            "password": password,
            "name": name
        }
    elif cred_type == 's3' and token is not None:
        json = {
            "credentialType": cred_type,
            "awsAccessKeyId": user,
            "awsSecretAccessKey": password,
            "awsSessionToken": token,
            "name": name
        }
    elif cred_type == 's3' and token is None:
        json = {
            "credentialType": cred_type,
            "awsAccessKeyId": user,
            "awsSecretAccessKey": password,
            "name": name
        }
        
    response = requests.post(
        url = DR_APP_HOST + '/api/v2/credentials/',
        headers=DR_MODELING_HEADERS,
        json=json
    )
    
    if response.status_code == 201:

        return response.json()['credentialId']
        
    else:

        print('Request failed; http error {code}: {content}'.format(code=response.status_code, content=response.content))

# get or create a credential set
def dr_get_or_create_catalog_credentials(name, cred_type, user, password, token=None):
    cred_id = dr_get_catalog_credentials(name, cred_type)
    
    if cred_id is None:
        # pass the token through so s3 credentials with a session token are not dropped
        return dr_create_catalog_credentials(name, cred_type, user, password, token=token)
    else:
        return cred_id

# for s3 credentials, pass an AWS access key ID and secret access key
# (the my_creds attribute names below are illustrative placeholders)
credentials_id = dr_get_or_create_catalog_credentials('s3_community', 
                                                      's3', my_creds.AWS_ACCESS_KEY_ID, my_creds.AWS_SECRET_ACCESS_KEY)

 

Create a connection to Snowflake, and use the last successful run time and current time to create the bounds for determining which newly created or recently updated rows must be scored.

 

# create a connection
ctx = snowflake.connector.connect(
    user=SNOW_USER,
    password=SNOW_PASS,
    account=SNOW_ACCOUNT,
    database=SNOW_DB,
    schema=SNOW_SCHEMA,
    protocol='https'
)

# create a cursor
cur = ctx.cursor()

# execute sql to get start/end timestamps to use
sql = "select last_ts_scored_through, current_timestamp::TIMESTAMP_NTZ cur_ts " \
    "from etl_history " \
    "where job_nm = '{job}' " \
    "order by last_ts_scored_through desc " \
    "limit 1 ".format(job=JOB_NAME)
cur.execute(sql)

# fetch results into dataframe
df = cur.fetch_pandas_all()
start_ts = df['LAST_TS_SCORED_THROUGH'][0]
end_ts = df['CUR_TS'][0]

 

Dump the data out to S3.

 

# execute sql to dump data into a single file in S3 stage bucket
# AWS single file snowflake limit 5 GB
sql = "COPY INTO @S3_SUPPORT/titanic/community/" + JOB_NAME + ".csv " \
    "from  " \
    "( " \
    "  select passengerid, pclass, name, sex, age, sibsp, parch, ticket, fare, cabin, embarked " \
    "  from passengers_500k_ts " \
    "  where nvl(updt_ts, crt_ts) >= '{start}' " \
    "  and nvl(updt_ts, crt_ts) < '{end}' " \
    ") " \
    "file_format = (format_name='default_csv' compression='none') header=true overwrite=true single=true;".format(start=start_ts, end=end_ts)
cur.execute(sql)

 

Next, create a session to perform the DataRobot Batch Prediction API scoring job submission and monitor its progress.

 

session = requests.Session()
session.headers = {
    'Authorization': 'Bearer {}'.format(API_KEY)
}

 

The job will be defined to take the file dump from Snowflake as input, and create a file with _scored appended in the same S3 path. A concurrency of 4 prediction cores has been specified in this example, with pass through of the surrogate key PASSENGERID to be joined on later.

 

INPUT_FILE = 's3://'+ my_creds.S3_BUCKET + '/titanic/community/' + JOB_NAME + '.csv'
OUTPUT_FILE = 's3://'+ my_creds.S3_BUCKET + '/titanic/community/' + JOB_NAME + '_scored.csv'

job_details = {
    'deploymentId': DEPLOYMENT_ID,
    'passthroughColumns': ['PASSENGERID'],
    'numConcurrent': 4,
    "predictionInstance" : {
        "hostName": DR_PREDICTION_HOST,
        "datarobotKey": DATAROBOT_KEY
    },
    'intakeSettings': {
        'type': 's3',
        'url': INPUT_FILE,
        'credentialId': credentials_id
    },
    'outputSettings': {
        'type': 's3',
        'url': OUTPUT_FILE,
        'credentialId': credentials_id
    }
}

 

The job can now be submitted for processing and a URL retrieved for monitoring.

 

response = session.post(
        DR_APP_HOST + '/api/v2/batchPredictions',
        json=job_details
    )

 

Control can be held until the job completes.

 

if response.status_code == 202:
    
    job = response.json()
    print('queued batch job: {}'.format(job['links']['self']))

    while job['status'] == 'INITIALIZING':
        time.sleep(3)
        response = session.get(job['links']['self'])
        response.raise_for_status()
        job = response.json()
        
    print('completed INITIALIZING')
        
    if job['status'] == 'RUNNING':

        while job['status'] == 'RUNNING':
            time.sleep(3)
            response = session.get(job['links']['self'])
            response.raise_for_status()
            job = response.json()
            
    print('completed RUNNING')
    print('status is now {status}'.format(status=job['status']))
    
    if job['status'] != 'COMPLETED':
        for i in job['logs']:
            print(i)
    
else:
    
    print('Job submission failed; http error {code}: {content}'.format(code=response.status_code, content=response.content))

 

Upon completion, the scored results can be loaded into the STG schema table PASSENGERS_SCORED_BATCH_API via a truncate and bulk load operation.

 

# multi-statement executions
# https://docs.snowflake.com/en/user-guide/python-connector-api.html#execute_string

# truncate and load STG schema table with scored results
sql = "truncate titanic.stg.PASSENGERS_SCORED_BATCH_API; " \
    " copy into titanic.stg.PASSENGERS_SCORED_BATCH_API from @S3_SUPPORT/titanic/community/" + JOB_NAME + "_scored.csv" \
    " FILE_FORMAT = 'DEFAULT_CSV' ON_ERROR = 'ABORT_STATEMENT' PURGE = FALSE;"
ctx.execute_string(sql)

 

Lastly, a transaction can be used to update the presentation table with the latest survivability scores for the positive class label 1 (survival). The ETL history is updated as well upon successful completion of all tasks.

 

# update target presentation table and ETL history table in transaction

sql = \
    "begin; " \
    "update titanic.public.passengers_500k_ts trg " \
    "set trg.survival = src.survived_1_prediction " \
    "from titanic.stg.PASSENGERS_SCORED_BATCH_API src " \
    "where src.passengerid = trg.passengerid; " \
    "insert into etl_history values ('{job}', '{run_through_ts}'); " \
    "commit; ".format(job=JOB_NAME, run_through_ts=end_ts)
ctx.execute_string(sql)

 

Rows 2 and 3 were updated with new survival scores as expected.

[Image: rows 2 and 3 updated with new survival scores]

The ETL history has been updated, and subsequent runs will be based on this latest successful timestamp.

[Image: ETL_HISTORY table with the new pass_scoring entry]

This code can be found on GitHub.

Enhancements to consider:

  • Additional error handling, scoring or otherwise, that suits an organization's workflow and toolset.
  • Serverless technology like AWS Lambda could be incorporated into scoring workflows to kick off a job based on an event, such as S3 object creation (a minimal handler sketch follows this list).
  • Snowflake single-statement dumps/ingests seem to perform best at around 8 threads per cluster node; e.g., a 2-node Small warehouse will not ingest a single file any faster than a 1-node XSmall. An XSmall would likely perform best with 8 or more file shreds. This is unlikely to matter much until data volumes grow significantly.
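To illustrate the Lambda idea above, a minimal handler sketch is shown below. It assumes the DataRobot API key, deployment ID, and a saved S3 credential ID are supplied as environment variables, that the requests package is bundled with the function, and that the function is triggered by an S3 PUT notification. A predictionInstance block (hostName, datarobotKey) can be added as in the earlier examples.

import json
import os

import requests  # assumed to be packaged with the Lambda deployment

DR_APP_HOST = 'https://app.datarobot.com'
API_KEY = os.environ['DATAROBOT_API_KEY']        # placeholder environment variables
DEPLOYMENT_ID = os.environ['DEPLOYMENT_ID']
S3_CREDENTIALS_ID = os.environ['S3_CREDENTIALS_ID']


def lambda_handler(event, context):
    # bucket/key of the object that triggered the S3 PUT notification
    record = event['Records'][0]['s3']
    bucket = record['bucket']['name']
    key = record['object']['key']

    job_details = {
        'deploymentId': DEPLOYMENT_ID,
        'intakeSettings': {
            'type': 's3',
            'url': 's3://{}/{}'.format(bucket, key),
            'credentialId': S3_CREDENTIALS_ID
        },
        'outputSettings': {
            'type': 's3',
            'url': 's3://{}/{}'.format(bucket, key.replace('.csv', '_scored.csv')),
            'credentialId': S3_CREDENTIALS_ID
        }
    }

    response = requests.post(
        DR_APP_HOST + '/api/v2/batchPredictions',
        headers={'Authorization': 'Bearer {}'.format(API_KEY)},
        json=job_details
    )
    # 202 means the scoring job was queued; the link in the response can be polled downstream
    return {'statusCode': response.status_code, 'body': json.dumps(response.json())}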

Summary 

These examples are just some of the methods that can be used to score Snowflake data with the DataRobot Batch Prediction API. The GUI approach is good for ad-hoc use cases and smaller sandbox scoring jobs. As complexity grows, the API offers the flexibility to run more complex, multi-step pipeline jobs. As data volumes grow, using S3 as an intermediate layer is one option for keeping strict control over resource usage and optimizing for cost efficiency.
