Model Monitoring with Serverless MLOps Agents


Guest Architect: Xzibit

DataRobot can monitor model performance and drift statistics for models deployed externally to the system. These externally deployed models could be Docker containers of DataRobot-generated models; DataRobot models exported as Java or Python scoring code; or even completely custom-coded models or models created in another tool.


This article presents an option for scaling DataRobot MLOps agents on AWS to handle small and large data queues using serverless resources to track deployed models.

High Level MLOps Agent Architecture

[Diagram: high-level MLOps agent architecture]

At a high level, an externally deployed machine learning model leverages the DataRobot MLOps Library to produce reporting records that will be sent back to DataRobot to process and report on. It is the job of the DataRobot MLOps agent to consume these records and pipe them into a DataRobot instance. As of this writing, the agent can be configured to read from various spool sources, including flat files, AWS SQS, and RabbitMQ. This article focuses on scaling serverless MLOps agents to consume an AWS SQS queue, and solely on consuming the queue data and reporting it back to DataRobot.

Standard Solution Architecture

[Diagram: standard solution architecture]

The External Model in these diagrams represents a model of any of the types mentioned above, deployed outside of the DataRobot cluster. One approach is to run the MLOps agent on a standing compute environment: stand up a server resource, install the agent on it, and have it continually poll for new data to consume and report back to DataRobot. This is a perfectly fine and simple solution, but it presents two drawbacks: cost and scaling. The server is always up and running, which can be costly for even the smallest solutions. Scaling becomes an issue when the queue to consume holds many elements, for example when many models, or a few very busy ones, are writing to the SQS queue. What if the SQS queue has a million elements backlogged? How long until a single agent works through it? Multiple agents can be run to consume and send back data concurrently, but EC2 auto-scaling does not solve this problem: its triggering mechanisms relate to how busy the EC2 server itself is, rather than the actual number of items sitting in the external queue it needs to process.

Serverless Solution Architecture

[Diagram: serverless solution architecture]

In a serverless architecture, AWS Lambda can be leveraged to create an MLOps agent on demand and to scale out additional agents, via multiple Lambda functions, based on the backlog of a populated queue. The MLOps agent code is stored in and retrieved from S3 and brought into a Lambda function, where it is instantiated and then polls and consumes the records written by the External Model to the Data SQS queue. The function includes logic for when to run additional MLOps agents (and how many). Inserting messages into the Agent SQS queue "recruits" these agents by triggering their creation.

Concurrency of the Lambda functions is managed by tracking those in a running state in the DynamoDB NoSQL database. Although the Lambda service itself has a concurrency reservation setting, it does not behave like a pool of open slots in which a pending invocation runs immediately as soon as a slot frees up.

Instead, invocations beyond the reservation limit are put to sleep and retried, with sleep times growing longer until concurrency slots open up. This is rarely the desired behavior when processing data because it can lead to periods of idle wait time in which nothing is being processed. A CloudWatch schedule will be set to insert a message into the Agent SQS queue every 15 minutes; this triggers an initial Lambda function that checks whether the Data Queue is in a non-empty state, and the scheduled message is cleared from the Agent queue once that function completes.

A Lambda function has a maximum duration of 15 minutes. If a backlog still remains after that time, the MLOps agent will gracefully terminate and pass another message to the Agent SQS queue, triggering a new Lambda function instance to take its place.

Add Dependent Items to S3

The Lambda function will retrieve the agent installer and configuration files from S3. Create a new bucket for these items or leverage an existing one. The MLOps agent tarball and its configuration will both be hosted in S3, and both locations will be provided to the function via its environment configuration. (This article refers to these objects as the agent archive and config, respectively.)

s3://my_bucket/community/agent/datarobot-mlops-agent-6.1.3-272.tar.gz
s3://my_bucket/community/agent/mlops.agent.conf.yaml
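
If these items are staged from a script rather than the console, a minimal boto3 sketch such as the one below can upload them. The bucket name, key prefix, and local file names follow the example paths above and should be adjusted to your environment.

import boto3

s3 = boto3.resource('s3')
bucket = 'my_bucket'  # assumed bucket name from the example paths above

# upload the agent tarball and its configuration file to the same prefix
s3.Bucket(bucket).upload_file('datarobot-mlops-agent-6.1.3-272.tar.gz',
                              'community/agent/datarobot-mlops-agent-6.1.3-272.tar.gz')
s3.Bucket(bucket).upload_file('mlops.agent.conf.yaml',
                              'community/agent/mlops.agent.conf.yaml')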

Create Data Simple Queue Service (SQS) Queue

This queue will be written to by one to many externally deployed models. Scored data records and metadata will be sent to this queue; an MLOps agent will read from it and direct the records to DataRobot for model monitoring.

Navigate to Services -> SQS. Select the Create New Queue button, provide the queue name (sqs_mlops_data_queue), and choose the Standard Queue option. Configure Queue Attributes if desired, or simply choose the Quick-Create Queue with defaults.

Notice some of the configurable options, such as Default visibility timeout. Once an element is read from the queue, it is invisible to other queue consumers for this amount of time, after which it becomes visible again if the consumer did not also report back that the item was processed successfully and can be deleted from the queue.

Another configurable option, Message retention (period), defines how long an unprocessed item will be allowed to stay in the queue. If it is not processed by this time, the item will be removed. If agents are down for some reason, it may be a good idea to set this value to a week or longer so that queue items will not be lost before being consumed by an MLOps agent. Make a note of the Amazon Resource Name (ARN) upon creation.
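
For reference, the data queue can also be created programmatically. The sketch below is an illustration only; it uses the queue name from this article and the one-week retention period suggested above.

import boto3

sqs = boto3.client('sqs')

# standard queue that external models will write tracking records to
response = sqs.create_queue(
    QueueName='sqs_mlops_data_queue',
    Attributes={
        'MessageRetentionPeriod': '604800'  # 7 days, in seconds
    }
)
print(response['QueueUrl'])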

Create Agent SQS Queue

This queue will be used to kick off the Lambda services which run MLOps agents; those agents dequeue elements from the Data Queue and report them to the DataRobot platform for monitoring. Follow the same steps explained above, using the name sqs_mlops_agent_queue, but set the Default visibility timeout queue attribute to 16 minutes. A read element will not become readable again for this duration. In theory, a Lambda service (AWS limit of 15 minutes) will have been triggered by the element, and a successful return from the function will result in the SQS element being removed permanently from the queue; if the function fails for some reason, the element will become visible in the queue again. This timeout should allow each element to be processed once and prevents a trigger message from being read and acted on in duplicate.
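
The equivalent boto3 call for this queue simply adds the 16-minute visibility timeout, expressed in seconds (again, a sketch rather than a required step):

import boto3

sqs = boto3.client('sqs')

# queue whose messages trigger ("recruit") Lambda-based MLOps agents
sqs.create_queue(
    QueueName='sqs_mlops_agent_queue',
    Attributes={
        'VisibilityTimeout': '960'  # 16 minutes, slightly longer than the 15-minute Lambda limit
    }
)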

Create DynamoDB Tables to Track Lambda Agents and Errors

A table will be used to track the concurrently running Lambda functions that are running MLOps agents. Navigate to the DynamoDB service (AWS's managed NoSQL key-value/document database) and select the Create table option. Name the new table lambda_mlops_agents, set a primary key of aws_request_id, and create the table with default settings.

[Screenshot: DynamoDB table creation]

Upon table creation, the Overview tab will provide an ARN for the table; make a note of this value. Then, using the same structure, make another table named lambda_mlops_agents_error and record its ARN as well.
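
If you prefer to create the tables from code, a boto3 sketch along these lines works; note that on-demand (PAY_PER_REQUEST) billing is an assumption here, while the console defaults to provisioned capacity.

import boto3

dynamodb = boto3.client('dynamodb')

# both tables share the same single string hash key, aws_request_id
for table_name in ['lambda_mlops_agents', 'lambda_mlops_agents_error']:
    dynamodb.create_table(
        TableName=table_name,
        KeySchema=[{'AttributeName': 'aws_request_id', 'KeyType': 'HASH'}],
        AttributeDefinitions=[{'AttributeName': 'aws_request_id', 'AttributeType': 'S'}],
        BillingMode='PAY_PER_REQUEST'
    )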

Configure IAM Role for Lambda Function

Navigate to Identity and Access Management (IAM). Under Roles, select Create role. Select AWS service and Lambda as a use case and then navigate to Next: Permissions. Search for and add the AWSLambdaBasicExecutionRole policy. Proceed with the next steps, provide the Role name lambda_mlops_agent_role, and then complete the task by hitting Create role. In the roles page, filter on the newly created role; choose it and then select Add inline policy under the Permissions tab. Select choose a service, then filter on “DynamoDB” and select it from the returned options.

Under Actions, choose the privileges the Lambda function needs; at a minimum, this means the Scan, PutItem, and DeleteItem actions used by the function code later in this article:

[Screenshot: selected DynamoDB action privileges]

Under Resources, choose Specific and click the Add ARN link under the table option. Specify the ARN of the DynamoDB lambda_mlops_agents and lambda_mlops_agents_error tables created previously. Choose Review policy, provide the name lambda_mlops_agent_role_dynamodb_policy, and complete the task by hitting the Create policy button.

Follow the same steps to add another inline policy, this time for the queues. Filter on the “SQS” service, select Read and Write checkboxes and Add ARN, and then add the ARN of each SQS queue. Review the policy and name it lambda_mlops_agent_role_sqs_policy.

Lastly, an inline policy must be set for S3 access. Follow the same steps, and choose the S3 service in the search filter. Choose the Read -> GetObject privilege, and under Resources choose the specific bucket and all objects in it. Review the policy and save out as lambda_mlops_agent_role_s3_policy.

[Screenshot: S3 inline policy configuration]
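
As an illustration, the S3 inline policy can also be attached from code. The sketch below assumes the bucket name used earlier in this article and the role name created above.

import boto3
import json

iam = boto3.client('iam')

# allow the Lambda role to read any object in the bucket hosting the agent files
s3_policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Effect': 'Allow',
        'Action': 's3:GetObject',
        'Resource': 'arn:aws:s3:::my_bucket/*'
    }]
}

iam.put_role_policy(
    RoleName='lambda_mlops_agent_role',
    PolicyName='lambda_mlops_agent_role_s3_policy',
    PolicyDocument=json.dumps(s3_policy)
)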

Create Python Lambda

Navigate to the Lambda service, choose Create function, and author from scratch a new Python 3.7 function named mlops_agent_processor. Under Permissions, choose Use an existing role and select lambda_mlops_agent_role.

Create Lambda Environment Variables

Some Lambda service configuration and environment settings need to be set. Because the function keeps an agent running for up to the 14-minute window defined in the code, set the function timeout to the 15-minute maximum rather than leaving the default. The Lambda service isn't doing a large amount of local processing itself, so 512 MB of memory should be sufficient to run the agent efficiently and at a low Lambda service cost (the service is billed by GB-seconds). Environment variables need to be set for where the agent and its configuration are stored, the queues the agent interacts with, and the target concurrency.

[Screenshot: Lambda configuration and environment variables]
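
The variable names below are the ones the function code in the next section expects; the values are examples based on the S3 paths and queue names used in this article (the target concurrency of 5 is just an illustrative choice).

dr_mlops_agent_bucket              my_bucket
dr_mlops_agent_zip                 community/agent/datarobot-mlops-agent-6.1.3-272.tar.gz
dr_mlops_agent_config              community/agent/mlops.agent.conf.yaml
dr_mlops_agent_target_concurrency  5
data_queue                         sqs_mlops_data_queue
agent_queue                        sqs_mlops_agent_queue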

Populate Lambda Function Code

Use the following code in the lambda_function.py window.

from urllib.parse import unquote_plus
import boto3
import os
import time
from boto3.dynamodb.conditions import Key, Attr
import subprocess
import datetime

def get_approx_data_queue_size(sqs_resource, queue_name):
    data_queue = sqs_resource.get_queue_by_name(QueueName=queue_name)
    return int(data_queue.attributes.get('ApproximateNumberOfMessages'))

def lambda_handler(event, context):
    
    request_id = context.aws_request_id

    # the lambda environment is coming with openjdk version "1.8.0_201"
    #os.system("java -version")
    
    try:
        # get and parse environment values
        ENV_AGENT_BUCKET = os.environ['dr_mlops_agent_bucket']
        ENV_AGENT_ZIP = os.environ['dr_mlops_agent_zip']
        ENV_AGENT_CONFIG = os.environ['dr_mlops_agent_config']
        ENV_TARGET_CONCURRENCY = int(os.environ['dr_mlops_agent_target_concurrency'])
        ENV_DATA_QUEUE = os.environ['data_queue']
        ENV_AGENT_QUEUE = os.environ['agent_queue']
        
        agent_config = os.path.basename(ENV_AGENT_CONFIG)
        
        # datarobot_mlops_package-6.3.3-488.tar.gz
        agent_zip = os.path.basename(ENV_AGENT_ZIP)
        
        # datarobot_mlops_package-6.3.3-488
        temp_agent_dir = agent_zip.split(".tar")[0]

        # datarobot_mlops_package-6.3.3
        temp_agent_dir = temp_agent_dir.split("-")
        agent_dir = temp_agent_dir[0] + '-' + temp_agent_dir[1]

    except: 
        raise Exception("Problem retrieving and parsing environment variables!")

    # lambda max runtime allowed (15 minute AWS maximum duration, recommended value to use here is 14 for MAXIMUM_MINUTES)
    MAXIMUM_MINUTES = 14
    start_time_epoch = int(time.time())
    time_epoch_15m_ago = start_time_epoch - int(60 * 14.7)
    time_epoch_60m_ago = start_time_epoch - 60 * 60
    max_time_epoch = start_time_epoch + 60 * int(MAXIMUM_MINUTES)

    # check number of items in data queue to process
    sqs = boto3.resource('sqs')
    approx_data_queue_size = get_approx_data_queue_size(sqs, ENV_DATA_QUEUE)
    
    # exit immediately if data queue has nothing to process
    if approx_data_queue_size == 0:
        print('nothing to process, queue is empty.')
        return None
    
    # connect to database
    dynamodb = boto3.resource('dynamodb')
    
    # count running agents in dynamo in the last 15 minutes
    table = dynamodb.Table('lambda_mlops_agents')
    response = table.scan(
        FilterExpression=Attr('start_time_epoch').gte(time_epoch_15m_ago)
    )
    agent_count = int(response['Count'])
    print ('agent count started and running in the last 15 minutes is: ' + str(agent_count))
    
    # count error agent records in dynamo in the last hour
    error_table = dynamodb.Table('lambda_mlops_agents_error')
    response = error_table.scan(
        FilterExpression=Attr('start_time_epoch').gte(time_epoch_60m_ago)
    )
    error_count = int(response['Count'])
    print ('agent errors count in the past 60 minutes: ' + str(error_count))
    
    # exit immediately if there has been an error in the last 60 minutes
    if error_count > 0:
        print('exiting - lambda agents have errored within the last hour.')
        return None
    
    # create agent queue in case recruitment is needed
    agent_queue = sqs.get_queue_by_name(QueueName=ENV_AGENT_QUEUE)

    # exit immediately if target concurrent lambda count has already been reached
    if agent_count >= ENV_TARGET_CONCURRENCY:
        print('exiting without creating a new agent, already hit target concurrency of: ' + str(ENV_TARGET_CONCURRENCY))
        return None
    else:
        # how many items does it take to be in the queue backlog for each additional agent to recruit?
        SQS_QUEUE_ITEMS_PER_LAMBDA = 500
        
        # add agent record to table for this lambda instance
        table.put_item(Item= {'aws_request_id': request_id, 'start_time_epoch':  start_time_epoch})

        # total lambdas, minimum of queue size / items per lambda or target concurrency, -1 for this lambda, - current running agent count
        lambdas_to_recruit = min(-(-approx_data_queue_size // SQS_QUEUE_ITEMS_PER_LAMBDA), ENV_TARGET_CONCURRENCY) - 1 - agent_count
        if lambdas_to_recruit < 0:
            lambdas_to_recruit = 0
        
        for x in range(lambdas_to_recruit):
            print('adding new agent: ' + str(x))
            agent_queue.send_message(MessageBody='{ request_id: "' + request_id + '", source: "lambda_new_' + str(x) + '" }')

    # install agent
    try:
        # switch to a local workspace
        os.chdir("/tmp")

        # get agent zip if it is not already here, and install it
        if os.path.isfile("/tmp/" + agent_zip) == False:
            print('agent does not exist.  installing...')
            print("bucket: " + ENV_AGENT_BUCKET + " agent zip: " + ENV_AGENT_ZIP)
            # get agent zip
            s3 = boto3.resource('s3')
            s3.Bucket(ENV_AGENT_BUCKET).download_file(ENV_AGENT_ZIP, agent_zip)

            # unzip contents
            os.system('tar -xvf ' + agent_zip + ' 2> /dev/null')

            # replace config
            os.chdir('/tmp/' + agent_dir + '/conf')
            s3.Bucket(ENV_AGENT_BUCKET).download_file(ENV_AGENT_CONFIG, agent_config)

        else:
            print('agent already exists, hot agent!')

    except: 
        raise Exception("Problem installing the agent!")

    # start the agent
    os.chdir('/tmp/' + agent_dir)
    os.system('bin/start-agent.sh')
    
    time.sleep(5)
    output = subprocess.check_output("head logs/mlops.agent.log", shell=True)
    print('head mlops.agent.log --- \n' + str(output))
    
    output = subprocess.check_output("head logs/mlops.agent.out", shell=True)
    print('head mlops.agent.out --- \n' + str(output))
    
    output = subprocess.check_output("tail -3 logs/mlops.agent.log | grep 'Fail\|Error\|ERROR' | wc -l", shell=True)
    print('tail -3 logs/mlops.agent.log log count --- \n' + str(int(output)) + ' errors.')

    # write to error_table if agent is failing
    if int(output) > 0:
        error_table.put_item(Item= {'aws_request_id': request_id, 'start_time_epoch':  start_time_epoch})
        print('exiting - lambda agent errored.')
        
        # stop agent and clean up
        os.system('bin/stop-agent.sh ')
        os.system('rm -rf bin/PID.agent')
        
        # remove dynamo record for this lambda
        table.delete_item(Key= {'aws_request_id': request_id})
    
        return None

    # time to let the agent do its thing...
    current_epoch = int(time.time())

    # while there is still time left to run the Lambda and there are items on the queue
    while current_epoch < max_time_epoch and approx_data_queue_size > 0:
        time.sleep(10)
        current_epoch = int(time.time())
        approx_data_queue_size = get_approx_data_queue_size(sqs, ENV_DATA_QUEUE)
        print(datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + \
            ' - approx_data_queue_size: ' + str(approx_data_queue_size))
        #output = subprocess.check_output("tail logs/mlops.agent.log", shell=True)
        #print('tail mlops.agent.log --- \n' + str(output))
    
    # exited processing loop, time to stop agent and clean up
    os.system('bin/stop-agent.sh ')
    os.system('rm -rf bin/PID.agent')
    
    # remove dynamo record for this lambda
    table.delete_item(Key= {'aws_request_id': request_id})
    
    # if we ran out of time and there are more items in the backlog, recruit a replacement for this lambda
    if current_epoch > max_time_epoch:
        print('ran out of time...')
        # if there are still elements to process
        if approx_data_queue_size > 0:
            print('adding replacement agent...')
            agent_queue.send_message(MessageBody='{ request_id: "' + request_id + '", source: "lambda_replacement" }')

Create Lambda Function SQS Trigger

Click +Add trigger to create a trigger to initiate the Lambda function. Search for "SQS" under Trigger configuration. Choose sqs_mlops_agent_queue, and specify a batch size of 1. Enable and add the trigger.
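
The same trigger can be created programmatically if preferred; a minimal boto3 sketch, using the agent queue ARN noted earlier (the ARN shown here is an example):

import boto3

lambda_client = boto3.client('lambda')

# wire the agent queue to the processor function, one message per invocation
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:sqs:us-east-1:1234567:sqs_mlops_agent_queue',
    FunctionName='mlops_agent_processor',
    BatchSize=1,
    Enabled=True
)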

Agent Account Configuration

An API token for a DataRobot account needs to be set in mlops.agent.conf.yaml; it is advised to use the token for a service account rather than a particular user, to avoid any risk of the account being deactivated in the future. It is also advised for the admin to exempt this account from any API rate limiting, which can be done in the user's profile settings.

[Screenshot: locating the API token in DataRobot]
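
The relevant portion of mlops.agent.conf.yaml looks roughly like the following; key names can differ slightly between agent versions, and the URL and token shown are placeholders.

# URL of the DataRobot MLOps service the agent reports to
mlopsUrl: "https://app.datarobot.com"

# API token of the (service) account used for reporting
apiToken: "<YOUR_API_TOKEN>"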

Testing the Queue Processor

Create an IAM MLOps Agent User

This user will be used for generating records to fill the data queue. Navigate to the IAM service and choose Add user. Name the user community_agent_user_iam and select the checkbox for Programmatic access. On the Permissions page, simply choose Attach existing policies directly and filter on “sqs.” Choose the AmazonSQSFullAccess checkbox and click through next steps to create the user. Upon successful creation, the Access key ID and Secret access key will be presented. Save both of these values.

Create an External Deployment

This step and the following will be done in a client environment within the unzipped MLOps agent directory, such as datarobot-mlops-agent-6.1.3.

Install the Python wheel file from within the unzipped MLOps agent directory:

pip install lib/datarobot_mlops-*-py2.py3-none-any.whl --user

The conf/mlops.agent.conf.yaml must be updated as well: specify the mlopsURL value to be that of the app server, for instance https://app.datarobot.com, and update the apiToken as well. Navigate to channelConfigs and comment out the FS_SPOOL type and its associated spoolDirectoryPath line. Specify SQS channel values to take its place, using the URL of the SQS data queue:

- type: "SQS_SPOOL"
  details: {name: "sqsSpool", queueUrl: "https://sqs.us-east-1.amazonaws.com/1234567/sqs_mlops_data_queue"}

The model that will be used is the examples/python/BinaryClassificationExample. Change into this directory and create the deployment (for example, with the provided create_deployment.sh script). This will produce an MLOPS_DEPLOYMENT_ID and an MLOPS_MODEL_ID; make note of those values.

Send Records to SQS Queue via Script

From the same example directory, edit the run_example.sh script. Add the following lines to the script to specify the target SQS queue for the tracking record data and the AWS credentials for authentication:

# values created from create_deployment.sh
export MLOPS_DEPLOYMENT_ID=5e123456789012
export MLOPS_MODEL_ID=5e123456789032

# where the tracking records will be sent
export MLOPS_OUTPUT_TYPE=SQS
export MLOPS_SQS_QUEUE_URL='https://sqs.us-east-1.amazonaws.com/1234567/sqs_mlops_data_queue'

# credentials
export AWS_DEFAULT_REGION=us-east-1
export AWS_ACCESS_KEY_ID='AKIAABCDEABCDEABCDEPY'
export AWS_SECRET_ACCESS_KEY='GpzABCDEABCDEABCDEl6dcm'

Comment out the existing export MLOPS_OUTPUT_TYPE=OUTPUT_DIR line. Run the example script. It should successfully populate records into the queue, which the agent process will then consume and report back to DataRobot.
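
As an optional sanity check, a short Python snippet can confirm that records landed in the data queue before the agents pick them up (queue name as created earlier):

import boto3

sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName='sqs_mlops_data_queue')
print(queue.attributes.get('ApproximateNumberOfMessages'))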

Create a Schedule

The last step required is to run the Lambda function on a scheduled basis to check if any records are in the queue, and subsequently begin processing them and scaling up as needed. AWS CloudWatch Events will be used to support this functionality.

In AWS, navigate to the CloudWatch service, then to Events -> Rules, and select the Create rule button. Choose the Schedule button, and specify to run the rule every 15 minutes. Next, click Add target and, from the dropdown, choose the SQS queue option. Select sqs_mlops_agent_queue, and then click Configure details.

[Screenshot: CloudWatch Events rule configuration]

Provide the name sqs_mlops_agent_queue_trigger_15m, enable the rule, and select Create rule. The rule is now running. Note that the 15-minute interval is anchored to the time the rule was created; use a cron expression instead if the rule should fire on round clock times, such as at the top of each quarter-hour.
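
For completeness, a boto3 sketch of the same rule is shown below. Note that the console grants CloudWatch Events permission to send to the queue automatically; when scripting this, that SQS queue policy must be added separately. The queue ARN shown is an example.

import boto3

events = boto3.client('events')

# rule that fires every 15 minutes
events.put_rule(
    Name='sqs_mlops_agent_queue_trigger_15m',
    ScheduleExpression='rate(15 minutes)',
    State='ENABLED'
)

# send a message to the agent queue on each firing
events.put_targets(
    Rule='sqs_mlops_agent_queue_trigger_15m',
    Targets=[{
        'Id': 'agent-queue-target',
        'Arn': 'arn:aws:sqs:us-east-1:1234567:sqs_mlops_agent_queue'
    }]
)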

Serverless Architecture is Complete!

That's it! The serverless architecture to spin up a single MLOps agent on a schedule is in place. If the defined logic determines the queue has a large backlog, it will recruit additional agents to help process it. With this architecture, the actual cost of processing the queue is minimal, since compute is only paid for when necessary, and the scaling options can be configured to handle a large amount of demand should the need arise.
