"""
Watson OpenScale APIs.
"""
import json
import requests
import logging
import pandas as pd
import ibm_watson_openscale
from cpdflow.wml import wml
import uuid
# IBM Cloud IAM token endpoint; create_integrated_system passes it as the token URL
# in the custom metrics provider credentials.
IAM_URL = "https://iam.cloud.ibm.com/identity/token"
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
def get_subscription_name(model_name: str, space_type: str) -> str:
    """
    Build the subscription name for a model in a given space.

    The result is the model name, one space, and the upper-cased space type.

    Args:
        model_name (str): model name
        space_type (str): development or production environment
    Returns:
        str: subscription name, e.g. ``"churn DEV"`` for ``("churn", "dev")``
    """
    name_parts = (model_name, space_type.upper())
    return " ".join(name_parts)
def get_custom_monitor_function_name(custom_monitor_name: str) -> str:
    """
    Derive the name of the function that backs a custom monitor.

    Args:
        custom_monitor_name (str): custom monitor name
    Returns:
        str: the monitor name followed by ``" Function"``
    """
    return " ".join((custom_monitor_name, "Function"))
def get_custom_metric_provider_name(custom_monitor_name: str) -> str:
    """
    Derive the name of the custom metrics provider for a custom monitor.

    Args:
        custom_monitor_name (str): custom monitor name
    Returns:
        str: the monitor name followed by ``" Provider"``
    """
    return " ".join((custom_monitor_name, "Provider"))
def get_custom_metric_name(custom_monitor_name: str) -> str:
    """
    Derive the metric identifier for a custom monitor.

    Every space in the monitor name becomes an underscore and the result is
    lower-cased.

    Args:
        custom_monitor_name (str): custom monitor name
    Returns:
        str: custom metric name, e.g. ``"fairness_check"`` for ``"Fairness Check"``
    """
    words = custom_monitor_name.split(" ")
    return "_".join(words).lower()
def get_model_name_from_subscription_name(subscription_name: str, space_type: str) -> str:
    """
    Recover the model name from a subscription name.

    This is the inverse of ``get_subscription_name``: only the trailing
    ``" <SPACE_TYPE>"`` suffix is removed. Occurrences of the same text inside
    the model name are left untouched, so a model called ``"my DEV model"``
    round-trips correctly.

    Args:
        subscription_name (str): subscription name
        space_type (str): development or production environment
    Returns:
        str: model name; the input is returned unchanged when it does not end
        with the expected suffix
    """
    suffix = " " + space_type.upper()
    if subscription_name.endswith(suffix):
        # suffix is never empty (it always contains the leading space), so the
        # negative slice bound is always valid.
        return subscription_name[: -len(suffix)]
    return subscription_name
def get_subscriptions(config: dict) -> dict:
    """
    List every subscription known to the OpenScale client.

    Args:
        config (dict): configuration dictionary; ``config["wos_client"]`` is the
            OpenScale client used for the listing
    Returns:
        dict: subscription (deployment) names as keys and subscription ids as values
    """
    listing = config["wos_client"].subscriptions.list().result.to_dict()
    name_to_id = {}
    for item in listing["subscriptions"]:
        deployment_name = item["entity"]["deployment"]["name"]
        name_to_id[deployment_name] = item["metadata"]["id"]
    return name_to_id
def get_service_providers(config: dict) -> dict:
    """
    List every service provider known to the OpenScale client.

    Args:
        config (dict): configuration dictionary; ``config["wos_client"]`` is the
            OpenScale client used for the listing
    Returns:
        dict: service provider names as keys and ids as values
    """
    listing = config["wos_client"].service_providers.list().result.to_dict()
    name_to_id = {}
    for item in listing["service_providers"]:
        provider_name = item["entity"]["name"]
        name_to_id[provider_name] = item["metadata"]["id"]
    return name_to_id
def get_integrated_systems(config: dict) -> dict:
    """
    List the integrated systems that are custom metrics providers.

    Integrated systems of any other type are ignored.

    Args:
        config (dict): configuration dictionary; ``config["wos_client"]`` is the
            OpenScale client used for the listing
    Returns:
        dict: integrated system names as keys and ids as values
    """
    systems_api = ibm_watson_openscale.base_classes.watson_open_scale_v2.IntegratedSystems(config["wos_client"])
    listing = systems_api.list().result.to_dict()
    name_to_id = {}
    for item in listing["integrated_systems"]:
        entity = item["entity"]
        if entity["type"] != "custom_metrics_provider":
            continue
        name_to_id[entity["name"]] = item["metadata"]["id"]
    return name_to_id
def get_monitor_definitions(config: dict) -> dict:
    """
    List every monitor definition known to the OpenScale client.

    Args:
        config (dict): configuration dictionary; ``config["wos_client"]`` is the
            OpenScale client used for the listing
    Returns:
        dict: monitor definition names as keys and ids as values
    """
    listing = config["wos_client"].monitor_definitions.list().result.to_dict()
    name_to_id = {}
    for item in listing["monitor_definitions"]:
        definition_name = item["entity"]["name"]
        name_to_id[definition_name] = item["metadata"]["id"]
    return name_to_id
def get_monitor_instances_by_subscription_name(config: dict, model_name: str, space_type: str) -> dict:
    """
    Collect the monitor instances attached to a model's subscription.

    The subscription is resolved from the model name and space type; only
    monitor instances whose target id equals that subscription id are kept.

    Args:
        config (dict): configuration dictionary
        model_name (str): model name
        space_type (str): development or production environment
    Returns:
        dict: monitor definition ids as keys and monitor instance ids as values
    Raises:
        KeyError: if no subscription exists under the derived subscription name
    """
    wos_client = config["wos_client"]
    subscription_name = get_subscription_name(model_name=model_name, space_type=space_type)
    subscription_id = get_subscriptions(config=config)[subscription_name]

    instances_by_definition = {}
    for instance in wos_client.monitor_instances.list().result.to_dict()["monitor_instances"]:
        entity = instance["entity"]
        if entity["target"]["target_id"] != subscription_id:
            continue
        instances_by_definition[entity["monitor_definition_id"]] = instance["metadata"]["id"]
    return instances_by_definition
def delete_subscription_by_subscription_names(config: dict, subscription_names: list, log_format: str) -> None:
    """
    Delete the subscriptions whose names are given.

    Names that do not match an existing subscription are skipped silently.
    Each deletion is requested with ``background_mode=False``.

    Args:
        config (dict): configuration dictionary
        subscription_names (list[str]): subscription names
        log_format (str): log prefix for this method
    """
    wos_client = config["wos_client"]
    existing = get_subscriptions(config=config)
    for name in subscription_names:
        if name not in existing:
            continue
        _logger.info(f"{log_format} - deleting ... {name}.")
        wos_client.subscriptions.delete(existing[name], background_mode=False)
        _logger.info(f"{log_format} - deleted {name}.")
    _logger.info(f"{log_format} - delete_subscription_by_subscription_names completed.")
def create_custom_metric_provider(config: dict, space_type: str) -> None:
    """
    Create custom metric provider.

    Reads the metric script named by
    ``config["custom_metric"]["custom_metric_script"]``, wraps it in a
    self-contained function (``custom_metrics_provider``) and deploys that
    function through ``wml.deploy_function`` under the name returned by
    ``get_custom_monitor_function_name``.

    Args:
        config (dict): configuration dictionary
        space_type (str): development or production environment
    """
    with open(config["custom_metric"]["custom_metric_script"]) as f:
        calculate_metrics = f.read()
    # NOTE(review): the API key travels with the deployed function as a default
    # argument value - confirm this is acceptable for the target space.
    params = {"apikey": config["apikey"], "calculate_metrics": calculate_metrics}

    def custom_metrics_provider(params=params):
        # Imports and constants live inside the function rather than at module
        # level - presumably because the function is deployed standalone and
        # cannot see this module's globals (confirm against wml.deploy_function).
        import datetime
        import subprocess

        import pandas as pd
        import requests

        WOS_URL = "https://api.aiopenscale.cloud.ibm.com"
        IAM_TOKEN_URL = "https://iam.cloud.ibm.com/identity/token"
        # Shared by the helpers below; each one sets "Authorization" before use.
        headers = {"Content-Type": "application/json", "Accept": "application/json"}

        # Install extra packages (presumably needed by the metric script) when
        # the deployment starts. Output is captured and the exit status is not
        # checked.
        packages = ["aif360[all]", "category_encoders"]
        subprocess.run(["pip", "install"] + packages + ["--user", "--no-cache-dir"], stderr=subprocess.PIPE, stdout=subprocess.PIPE)

        # Materialise the user's metric script so get_metrics can import it.
        with open("custom_metric.py", "w") as f:
            f.write(params["calculate_metrics"])

        def get_access_token():
            """Exchange the API key for an IAM access token."""
            token_headers = {"Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json"}
            data = {"grant_type": "urn:ibm:params:oauth:grant-type:apikey", "apikey": params["apikey"]}
            return requests.post(IAM_TOKEN_URL, data=data, headers=token_headers).json()["access_token"]

        def get_feedback_dataset_id(access_token, data_mart_id, subscription_id):
            """Return the id of the subscription's first feedback data set, or None."""
            headers["Authorization"] = "Bearer " + access_token
            DATASETS_URL = f"{WOS_URL}/openscale/{data_mart_id}/v2/data_sets?target.target_id={subscription_id}&target.target_type=subscription&type=feedback"
            json_data = requests.get(DATASETS_URL, headers=headers).json()
            feedback_dataset_id = None
            if "data_sets" in json_data and len(json_data["data_sets"]) > 0:
                feedback_dataset_id = json_data["data_sets"][0]["metadata"]["id"]
            return feedback_dataset_id

        def get_feedback_data(access_token, data_mart_id, feedback_dataset_id):
            """Fetch up to 100 feedback records in list format, or None without a data set."""
            json_data = None
            num_records = 100
            if feedback_dataset_id is not None:
                headers["Authorization"] = "Bearer " + access_token
                DATASETS_STORE_RECORDS_URL = f"{WOS_URL}/openscale/{data_mart_id}/v2/data_sets/{feedback_dataset_id}/records?limit={num_records}&format=list"
                # TLS verification stays enabled: this request carries a bearer token.
                json_data = requests.get(DATASETS_STORE_RECORDS_URL, headers=headers).json()
            return json_data

        def update_monitor_instance(base_url, access_token, custom_monitor_instance_id, payload):
            """Replace the monitor instance parameters; return (status code, response JSON)."""
            monitor_instance_url = f"{base_url}/v2/monitor_instances/{custom_monitor_instance_id}?update_metadata_only=true"
            patch_payload = [{"op": "replace", "path": "/parameters", "value": payload}]
            headers["Authorization"] = "Bearer " + access_token
            # TLS verification stays enabled: this request carries a bearer token.
            response = requests.patch(monitor_instance_url, headers=headers, json=patch_payload)
            return response.status_code, response.json()

        def get_metrics(access_token, data_mart_id, subscription_id):
            """Run the user's calculate_metrics over the feedback records."""
            import custom_metric

            feedback_dataset_id = get_feedback_dataset_id(access_token, data_mart_id, subscription_id)
            if feedback_dataset_id is None:
                # Fail with a readable message; publish() reports it to the caller.
                raise RuntimeError(f"No feedback data set found for subscription {subscription_id}.")
            json_data = get_feedback_data(access_token, data_mart_id, feedback_dataset_id)
            records = json_data.get("records")
            if not records:
                raise RuntimeError(f"No feedback records available in data set {feedback_dataset_id}.")
            # format=list returns column names and row values side by side.
            df = pd.DataFrame(records[0]["values"], columns=records[0]["fields"])
            return custom_metric.calculate_metrics(df=df)

        def publish_metrics(base_url, access_token, data_mart_id, subscription_id, custom_monitor_id, custom_monitor_instance_id, custom_monitoring_run_id, timestamp):
            """Compute the metrics and post them as one measurement; return (status code, response JSON)."""
            custom_metrics = get_metrics(access_token, data_mart_id, subscription_id)
            measurements_payload = [{"timestamp": timestamp, "run_id": custom_monitoring_run_id, "metrics": [custom_metrics]}]
            headers["Authorization"] = "Bearer " + access_token
            measurements_url = f"{base_url}/v2/monitor_instances/{custom_monitor_instance_id}/measurements"
            response = requests.post(measurements_url, headers=headers, json=measurements_payload)
            return response.status_code, response.json()

        def publish(input_data):
            """
            Entry point of the deployed function.

            Reads the run details from ``input_data["input_data"][0]["values"]``,
            publishes the metrics, records the run status on the monitor
            instance and returns ``{"predictions": [{"values": ...}]}`` holding
            either the published measurements or the error message.
            """
            success_codes = (200, 201, 202)
            timestamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
            payload = input_data["input_data"][0]["values"]
            data_mart_id = payload["data_mart_id"]
            subscription_id = payload["subscription_id"]
            custom_monitor_id = payload["custom_monitor_id"]
            custom_monitor_instance_id = payload["custom_monitor_instance_id"]
            custom_monitor_instance_params = payload["custom_monitor_instance_params"]
            base_url = f"{WOS_URL}/openscale/{data_mart_id}"
            access_token = get_access_token()
            published_measurements = []
            error_msg = None
            custom_monitoring_run_id = custom_monitor_instance_params["run_details"]["run_id"]
            try:
                status_code, published_measurement = publish_metrics(
                    base_url, access_token, data_mart_id, subscription_id, custom_monitor_id, custom_monitor_instance_id, custom_monitoring_run_id, timestamp
                )
                if int(status_code) in success_codes:
                    custom_monitor_instance_params["run_details"]["run_status"] = "finished"
                    published_measurements.append(published_measurement)
                else:
                    custom_monitor_instance_params["run_details"]["run_status"] = "error"
                    custom_monitor_instance_params["run_details"]["run_error_msg"] = published_measurement
                    error_msg = published_measurement
                custom_monitor_instance_params["last_run_time"] = timestamp
                status_code, response = update_monitor_instance(base_url, access_token, custom_monitor_instance_id, custom_monitor_instance_params)
                if int(status_code) not in success_codes:
                    error_msg = response
            except Exception as ex:
                # Top-level boundary of the deployed function: report the
                # failure in the response instead of raising.
                error_msg = str(ex)
            if error_msg is None:
                response_payload = {"predictions": [{"values": published_measurements}]}
            else:
                # Errors are returned in the same envelope as successful results.
                response_payload = {"predictions": [{"values": error_msg}]}
            return response_payload

        return publish

    custom_metric_function_name = get_custom_monitor_function_name(custom_monitor_name=config["custom_metric"]["custom_monitor_name"])
    wml.deploy_function(config=config, function=custom_metrics_provider, function_name=custom_metric_function_name, space_type=space_type)
def delete_integrated_system_by_custom_monitor_name(config: dict, log_format: str) -> None:
    """
    Delete the integrated system that serves the configured custom monitor.

    The integrated system is looked up by the provider name derived from
    ``config["custom_metric"]["custom_monitor_name"]``. When none exists, only
    a log line is written.

    Args:
        config (dict): configuration dictionary
        log_format (str): log prefix for this method
    """
    wos_client = config["wos_client"]
    provider_name = get_custom_metric_provider_name(custom_monitor_name=config["custom_metric"]["custom_monitor_name"])
    existing = get_integrated_systems(config=config)
    if provider_name not in existing:
        _logger.info(f"{log_format} - delete_integrated_system_by_custom_monitor_name - Nothing to delete.")
        return
    _logger.info(f"{log_format} - deleting integrated systems ... {provider_name}")
    systems_api = ibm_watson_openscale.base_classes.watson_open_scale_v2.IntegratedSystems(wos_client)
    systems_api.delete(integrated_system_id=existing[provider_name], background_mode=False)
    _logger.info(f"{log_format} - delete_integrated_system_by_custom_monitor_name completed.")
def delete_custom_metric_monitor_by_custom_monitor_name(config: dict, log_format: str) -> None:
    """
    Delete the monitor definition of the configured custom monitor.

    The definition is looked up by
    ``config["custom_metric"]["custom_monitor_name"]``. When none exists, only
    a log line is written.

    Args:
        config (dict): configuration dictionary
        log_format (str): log prefix for this method
    """
    wos_client = config["wos_client"]
    monitor_name = config["custom_metric"]["custom_monitor_name"]
    definitions = get_monitor_definitions(config=config)
    if monitor_name not in definitions:
        _logger.info(f"{log_format} - delete_custom_metric_monitor_by_custom_monitor_name - Nothing to delete.")
        return
    _logger.info(f"{log_format} - deleting custom metric ... {monitor_name}")
    wos_client.monitor_definitions.delete(definitions[monitor_name], background_mode=False)
    _logger.info(f"{log_format} - delete_custom_metric_monitor_by_custom_monitor_name completed.")
def create_integrated_system(config: dict, space_type: str) -> None:
    """
    Create integrated system.

    Registers the deployed custom metric function as a
    ``custom_metrics_provider`` integrated system. An existing integrated
    system with the same provider name is deleted first. The endpoint is the
    function deployment's scoring URL with a ``?version=<date>`` query built
    from the date part of the deployment's ``created_at`` timestamp.

    Args:
        config (dict): configuration dictionary
        space_type (str): development or production environment
    Raises:
        KeyError: if the function deployment is not found among the deployments
    """
    wos_client = config["wos_client"]
    wml_client = config["wml_client"]
    custom_monitor_name = config["custom_metric"]["custom_monitor_name"]
    custom_metric_provider_name = get_custom_metric_provider_name(custom_monitor_name=custom_monitor_name)
    # NOTE(review): the API key is embedded in the token request payload that
    # is stored with the integrated system.
    CUSTOM_METRICS_PROVIDER_CREDENTIALS = {
        "auth_type": "bearer",
        "token_info": {
            "url": IAM_URL,
            "headers": {"Content-Type": "application/x-www-form-urlencoded"},
            "payload": "grant_type=urn:ibm:params:oauth:grant-type:apikey&response_type=cloud_iam&apikey=" + config["apikey"],
            "method": "POST",
        },
    }
    integrated_systems_api = ibm_watson_openscale.base_classes.watson_open_scale_v2.IntegratedSystems(wos_client)

    # Replace rather than duplicate a provider registered under the same name.
    integrated_systems = get_integrated_systems(config=config)
    if custom_metric_provider_name in integrated_systems:
        integrated_systems_api.delete(integrated_system_id=integrated_systems[custom_metric_provider_name], background_mode=False)

    deployments = wml.get_deployment_details(config=config, space_type=space_type)
    custom_monitor_function_name = get_custom_monitor_function_name(custom_monitor_name=custom_monitor_name)
    function_deployment_name = wml.get_function_deployment_name(function_name=custom_monitor_function_name)
    function_deployment_details = deployments[function_deployment_name]

    # Keep only the date part of the timestamp. partition() returns the whole
    # string when there is no "T", where find() would yield -1 and silently
    # drop the last character.
    created_at = function_deployment_details["metadata"]["created_at"]
    current_date = created_at.partition("T")[0]
    scoring_url = wml_client.deployments.get_scoring_href(function_deployment_details) + "?version=" + current_date

    integrated_systems_api.add(
        name=custom_metric_provider_name,
        description=custom_metric_provider_name,
        type="custom_metrics_provider",
        credentials=CUSTOM_METRICS_PROVIDER_CREDENTIALS,
        connection={"display_name": custom_metric_provider_name, "endpoint": scoring_url},
    )
def create_custom_metric_monitor(config: dict) -> None:
    """
    Create the monitor definition for the configured custom monitor.

    The definition carries two metrics, ``statistical_parity_difference`` and
    ``disparate_impact_ratio``, each with an upper-limit threshold defaulting
    to 0.01, and one ``region`` tag. An existing definition with the same name
    is deleted before the new one is added.

    Args:
        config (dict): configuration dictionary
    """
    wos_client = config["wos_client"]
    monitor_name = config["custom_metric"]["custom_monitor_name"]

    v2 = ibm_watson_openscale.base_classes.watson_open_scale_v2
    upper_limit = ibm_watson_openscale.supporting_classes.enums.MetricThresholdTypes.UPPER_LIMIT

    # NOTE(review): both metrics share the same upper limit of 0.01 - confirm
    # this is the intended threshold for each of them.
    metrics = []
    for metric_name in ("statistical_parity_difference", "disparate_impact_ratio"):
        threshold = v2.MetricThreshold(type=upper_limit, default=0.01)
        metrics.append(v2.MonitorMetricRequest(name=metric_name, thresholds=[threshold]))

    tags = [v2.MonitorTagRequest(name="region", description="us-south")]

    existing = get_monitor_definitions(config=config)
    if monitor_name in existing:
        wos_client.monitor_definitions.delete(existing[monitor_name], background_mode=False)
    wos_client.monitor_definitions.add(name=monitor_name, metrics=metrics, tags=tags, background_mode=False)
def subscribe_custom_model(config: dict, model_config: dict, space_type: str, log_format: str) -> None:
    """
    Subscribe a custom (externally scored) model to OpenScale.

    Asset and deployment ids are freshly generated UUIDs. The training data
    schema is derived from ``model_config["input_data_schema"][0]["fields"]``:
    fields of type ``"object"`` become categorical ``string`` columns, all
    others become ``long``.

    Args:
        config (dict): configuration dictionary
        model_config (dict): model configuration; reads ``model_name``,
            ``target``, ``scoring_url`` and ``input_data_schema``
        space_type (str): development or production environment
        log_format (str): log prefix for this method
    """
    model_name = model_config["model_name"]
    _logger.info(f"{log_format} - subscribing custom model ... {model_name}.")

    supporting = ibm_watson_openscale.supporting_classes
    enums = supporting.enums

    wos_client = config["wos_client"]
    data_mart_id = config["data_mart_id"]
    service_provider_id = config["custom_service_provider_id"]
    training_file_name = config["training_file_name"]
    subscription_name = get_subscription_name(model_name=model_name, space_type=space_type)
    label_column = model_config["target"]
    scoring_url = model_config["scoring_url"]
    schema_fields_in = model_config["input_data_schema"][0]["fields"]

    asset = supporting.Asset(
        asset_id=str(uuid.uuid4()),
        name=subscription_name,
        url="",
        asset_type=enums.AssetTypes.MODEL,
        input_data_type=enums.InputDataType.STRUCTURED,
        problem_type=enums.ProblemType.BINARY_CLASSIFICATION,
    )

    scoring_endpoint = ibm_watson_openscale.base_classes.watson_open_scale_v2.ScoringEndpointRequest(
        url=scoring_url, request_headers={"Content-Type": "application/json"}
    )
    asset_deployment = supporting.AssetDeploymentRequest(
        deployment_id=str(uuid.uuid4()),
        name=subscription_name,
        deployment_type=enums.DeploymentTypes.ONLINE,
        scoring_endpoint=scoring_endpoint,
    )

    cos_connection = {
        "resource_instance_id": config["cos_resource_crn"],
        "url": config["cos_endpoint"],
        "api_key": config["cos_api_key"],
        "iam_url": "https://iam.cloud.ibm.com/oidc/token",
    }
    training_data_reference = supporting.TrainingDataReference(
        type="cos",
        location=supporting.COSTrainingDataReferenceLocation(bucket=config["bucket_name"], file_name=training_file_name),
        connection=supporting.COSTrainingDataReferenceConnection.from_dict(cos_connection),
    )

    # One pass over the schema yields the feature list, the categorical subset
    # and the Spark-style field descriptions.
    feature_columns = []
    cat_features = []
    spark_fields = []
    for field in schema_fields_in:
        is_categorical = field["type"] == "object"
        feature_columns.append(field["name"])
        if is_categorical:
            cat_features.append(field["name"])
        spark_fields.append({"name": field["name"], "type": "string" if is_categorical else "long", "nullable": True})
    training_data_schema = {"type": "struct", "fields": spark_fields, "id": "1"}

    asset_properties_request = supporting.AssetPropertiesRequest(
        label_column=label_column,
        probability_fields=["probability"],
        prediction_field="prediction",
        feature_fields=feature_columns,
        categorical_fields=cat_features,
        training_data_reference=training_data_reference,
        training_data_schema=supporting.SparkStruct.from_dict(training_data_schema),
    )

    wos_client.subscriptions.add(
        data_mart_id=data_mart_id,
        service_provider_id=service_provider_id,
        asset=asset,
        deployment=asset_deployment,
        asset_properties=asset_properties_request,
        background_mode=False,
    )
    _logger.info(f"{log_format} - subscribe_custom_model completed for {model_name}.")
def subscribe_wml_model(config: dict, model_config: dict, space_type: str, log_format: str) -> None:
    """
    Subscribe a WML-deployed model to OpenScale.

    The space id and service provider id are the ``dev_*`` config values when
    ``space_type`` is ``"dev"`` and the ``prod_*`` values otherwise. Asset
    details and the input schema are read from the service provider's
    deployment asset.

    Args:
        config (dict): configuration dictionary
        model_config (dict): model configuration; reads ``model_name``
        space_type (str): development or production environment
        log_format (str): log prefix for this method
    """
    model_name = model_config["model_name"]
    _logger.info(f"{log_format} - subscribing wml model ... {model_name}.")

    supporting = ibm_watson_openscale.supporting_classes
    enums = supporting.enums

    wml_client = config["wml_client"]
    wos_client = config["wos_client"]
    is_dev = space_type == "dev"
    space_id = config["dev_space_id"] if is_dev else config["prod_space_id"]
    data_mart_id = config["data_mart_id"]
    service_provider_id = config["dev_service_provider_id"] if is_dev else config["prod_service_provider_id"]
    training_file_name = config["training_file_name"]

    deployment_name = wml.get_model_deployment_name(model_name=model_name)
    subscription_name = get_subscription_name(model_name=model_name, space_type=space_type)
    deployment_uid = wml.get_deployments(config=config, space_type=space_type)[deployment_name]
    label_column = wml.get_model_details(config=config, space_type=space_type)[model_name]["entity"]["label_column"]

    # Both service-provider lookups take the same identifying arguments.
    lookup = {
        "data_mart_id": data_mart_id,
        "service_provider_id": service_provider_id,
        "deployment_id": deployment_uid,
        "deployment_space_id": space_id,
    }
    asset_deployment_details = wos_client.service_providers.list_assets(**lookup).result["resources"][0]
    deployment_asset = wos_client.service_providers.get_deployment_asset(**lookup)

    asset_info = deployment_asset["entity"]["asset"]
    asset = supporting.Asset(
        asset_id=asset_info["asset_id"],
        name=asset_info["name"],
        url=asset_info["url"],
        asset_type=enums.AssetTypes.MODEL,
        input_data_type=enums.InputDataType.STRUCTURED,
        problem_type=enums.ProblemType.BINARY_CLASSIFICATION,
    )

    asset_deployment = supporting.AssetDeploymentRequest(
        deployment_id=asset_deployment_details["metadata"]["guid"],
        name=subscription_name,
        deployment_type=enums.DeploymentTypes.ONLINE,
        url=asset_deployment_details["entity"]["scoring_endpoint"]["url"],
        scoring_endpoint=ibm_watson_openscale.base_classes.watson_open_scale_v2.ScoringEndpointRequest(
            url=wml_client.deployments.get_scoring_href(wml_client.deployments.get_details(deployment_uid))
        ),
    )

    cos_connection = {
        "resource_instance_id": config["cos_resource_crn"],
        "url": config["cos_endpoint"],
        "api_key": config["cos_api_key"],
        "iam_url": "https://iam.cloud.ibm.com/oidc/token",
    }
    training_data_reference = supporting.TrainingDataReference(
        type="cos",
        location=supporting.COSTrainingDataReferenceLocation(bucket=config["bucket_name"], file_name=training_file_name),
        connection=supporting.COSTrainingDataReferenceConnection.from_dict(cos_connection),
    )

    input_schema = deployment_asset["entity"]["asset_properties"]["input_data_schema"]
    feature_fields = [field["name"] for field in input_schema["fields"]]
    # String-typed input fields are treated as the categorical features.
    categorical_fields = [field["name"] for field in input_schema["fields"] if field["type"] == "string"]
    asset_properties_request = supporting.AssetPropertiesRequest(
        label_column=label_column,
        probability_fields=["probability"],
        prediction_field="prediction",
        feature_fields=feature_fields,
        categorical_fields=categorical_fields,
        training_data_reference=training_data_reference,
        training_data_schema=supporting.SparkStruct.from_dict(input_schema),
    )

    wos_client.subscriptions.add(
        data_mart_id=data_mart_id,
        service_provider_id=service_provider_id,
        asset=asset,
        deployment=asset_deployment,
        asset_properties=asset_properties_request,
        background_mode=False,
    )
    _logger.info(f"{log_format} - subscribe_wml_model completed for {model_name}.")
def subscribe_model(config: dict, model_config: dict, space_type: str, log_format: str) -> None:
    """
    Subscribe a model, choosing the custom or the WML path.

    A model configuration that contains a ``scoring_url`` key is subscribed
    with ``subscribe_custom_model``; any other goes through
    ``subscribe_wml_model``.

    Args:
        config (dict): configuration dictionary
        model_config (dict): model configuration; reads ``model_name`` and
            checks for ``scoring_url``
        space_type (str): development or production environment
        log_format (str): log prefix for this method
    """
    model_name = model_config["model_name"]
    _logger.info(f"{log_format} - subscribing model ... {model_name}.")
    subscribe = subscribe_custom_model if "scoring_url" in model_config else subscribe_wml_model
    subscribe(config=config, model_config=model_config, space_type=space_type, log_format=log_format)
    _logger.info(f"{log_format} - subscribe_model completed for {model_name}.")
def create_monitor(config: dict, model_name: str, space_type: str, log_format: str):
    """
    Create the monitor instances for a model's subscription.

    A quality, drift, fairness and explainability monitor is created for each
    of those keys present in ``config["monitor_config"]``. A custom monitor is
    created when ``config`` has a ``custom_metric`` entry. The model risk
    management monitor is always created. Every instance targets the model's
    subscription and is created with ``background_mode=False``.

    Args:
        config (dict): configuration dictionary
        model_name (str): model name
        space_type (str): development or production environment
        log_format (str): log prefix for this method
    """
    _logger.info(f"{log_format} - creating monitors ... {model_name}.")
    wos_client = config["wos_client"]
    data_mart_id = config["data_mart_id"]
    monitor_config = config["monitor_config"]
    subscriptions = get_subscriptions(config=config)
    subscription_id = subscriptions[get_subscription_name(model_name=model_name, space_type=space_type)]
    builtin = wos_client.monitor_definitions.MONITORS

    def _create_instance(definition_id, parameters, **extra):
        # Each instance gets its own Target pointing at the subscription.
        wos_client.monitor_instances.create(
            monitor_definition_id=definition_id,
            target=ibm_watson_openscale.supporting_classes.Target(
                target_type=ibm_watson_openscale.supporting_classes.enums.TargetTypes.SUBSCRIPTION, target_id=subscription_id
            ),
            data_mart_id=data_mart_id,
            parameters=parameters,
            background_mode=False,
            **extra,
        )

    if "quality" in monitor_config:
        _logger.info(f"{log_format} - creating quality monitor ... {model_name}.")
        quality = monitor_config["quality"]
        _create_instance(builtin.QUALITY.ID, quality["parameters"], thresholds=quality["thresholds"])
        _logger.info(f"{log_format} - quality monitor created for {model_name}.")

    if "drift" in monitor_config:
        _logger.info(f"{log_format} - creating drift monitor ... {model_name}.")
        _create_instance(builtin.DRIFT.ID, monitor_config["drift"]["parameters"])
        _logger.info(f"{log_format} - drift monitor created for {model_name}.")

    if "fairness" in monitor_config:
        _logger.info(f"{log_format} - creating fairness monitor ... {model_name}.")
        fairness = monitor_config["fairness"]
        _create_instance(builtin.FAIRNESS.ID, fairness["parameters"], thresholds=fairness["thresholds"])
        _logger.info(f"{log_format} - fairness monitor created for {model_name}.")

    if "explainability" in monitor_config:
        _logger.info(f"{log_format} - creating explainability monitor ... {model_name}.")
        _create_instance(builtin.EXPLAINABILITY.ID, monitor_config["explainability"]["parameters"])
        _logger.info(f"{log_format} - explainability monitor created for {model_name}.")

    if "custom_metric" in config:
        _logger.info(f"{log_format} - creating custom monitors ... {model_name}.")
        custom_monitor_name = config["custom_metric"]["custom_monitor_name"]
        provider_name = get_custom_metric_provider_name(custom_monitor_name=custom_monitor_name)
        integrated_system_id = get_integrated_systems(config=config)[provider_name]
        custom_monitor_id = get_monitor_definitions(config=config)[custom_monitor_name]
        _create_instance(custom_monitor_id, {"custom_metrics_provider_id": integrated_system_id, "custom_metrics_wait_time": 60})
        _logger.info(f"{log_format} - custom monitors created for {model_name}.")

    # The model risk management monitor does not depend on monitor_config.
    _create_instance(builtin.MODEL_RISK_MANAGEMENT_MONITORING.ID, {}, managed_by="user")
    _logger.info(f"{log_format} - create_monitor completed for {model_name}.")
def store_feedback(config: dict, model_name: str, feedback_payload: pd.DataFrame, space_type: str, log_format: str):
    """
    Store feedback records in the subscription's feedback data set.

    The data frame is serialised record-wise and written to the first feedback
    data set listed for the model's subscription.

    Args:
        config (dict): configuration dictionary
        model_name (str): model name
        feedback_payload (pd.DataFrame): feedback payload
        space_type (str): development or production environment
        log_format (str): log prefix for this method
    """
    _logger.info(f"{log_format} - storing feedback data ... {model_name}.")
    subscription_name = get_subscription_name(model_name=model_name, space_type=space_type)
    subscription_id = get_subscriptions(config=config)[subscription_name]
    wos_client = config["wos_client"]

    enums = ibm_watson_openscale.supporting_classes.enums
    feedback_sets = wos_client.data_sets.list(
        type=enums.DataSetTypes.FEEDBACK,
        target_target_id=subscription_id,
        target_target_type=enums.TargetTypes.SUBSCRIPTION,
    )
    feedback_dataset_id = feedback_sets.result.data_sets[0].metadata.id

    # Round-trip through JSON to obtain a plain list of record dicts.
    records = json.loads(feedback_payload.to_json(orient="records"))
    wos_client.data_sets.store_records(feedback_dataset_id, request_body=records, background_mode=False)
    _logger.info(f"{log_format} - stored {len(feedback_payload)} records for {model_name}.")
    _logger.info(f"{log_format} - store_feedback completed for {model_name}.")
def store_payload(config: dict, model_config: dict, scoring_payload: pd.DataFrame, space_type: str, log_format: str):
    """
    Store payload data.

    Scores ``scoring_payload["input_data"][0]`` against
    ``model_config["scoring_url"]`` and stores the request/response pair as a
    single payload record in the first payload-logging data set of the
    model's subscription.

    Args:
        config (dict): configuration dictionary
        model_config (dict): model configuration; reads ``model_name`` and
            ``scoring_url``
        scoring_payload (pd.DataFrame): scoring payload; indexed as
            ``["input_data"][0]``, and that element must provide ``"values"``
        space_type (str): development or production environment
        log_format (str): log prefix for this method
    Raises:
        requests.HTTPError: if the scoring endpoint answers with an error status
    """
    model_name = model_config["model_name"]
    _logger.info(f"{log_format} - storing payload data ... {model_name}.")
    subscription_name = get_subscription_name(model_name=model_name, space_type=space_type)
    subscriptions = get_subscriptions(config=config)
    subscription_id = subscriptions[subscription_name]
    wos_client = config["wos_client"]
    scoring_url = model_config["scoring_url"]
    scoring_payload = scoring_payload["input_data"][0]
    payload_data_set_id = (
        wos_client.data_sets.list(
            type=ibm_watson_openscale.supporting_classes.enums.DataSetTypes.PAYLOAD_LOGGING,
            target_target_id=subscription_id,
            target_target_type=ibm_watson_openscale.supporting_classes.enums.TargetTypes.SUBSCRIPTION,
        )
        .result.data_sets[0]
        .metadata.id
    )
    headers = {"Content-Type": "application/json"}
    # NOTE(review): TLS verification is disabled for the user-supplied scoring
    # endpoint - presumably to tolerate self-signed certificates; confirm it is
    # still required.
    r = requests.post(scoring_url, data=json.dumps(scoring_payload), headers=headers, verify=False)
    # Do not log an error body as if it were a scoring response.
    r.raise_for_status()
    scoring_response = r.json()
    # Record the measured latency instead of a fixed placeholder; converted to
    # milliseconds (assumed unit of response_time - confirm against the SDK).
    response_time = int(r.elapsed.total_seconds() * 1000)
    payload_records = [
        ibm_watson_openscale.supporting_classes.payload_record.PayloadRecord(
            scoring_id=str(uuid.uuid4()), request=scoring_payload, response=scoring_response, response_time=response_time
        )
    ]
    wos_client.data_sets.store_records(data_set_id=payload_data_set_id, request_body=payload_records, background_mode=False)
    _logger.info(f"{log_format} - stored {len(scoring_payload['values'])} records for {model_name}.")
    _logger.info(f"{log_format} - store_payload completed for {model_name}.")
def evaluate(config: dict, model_name: str, space_type: str, log_format: str):
    """
    Run an on-demand evaluation through the model's ``mrm`` monitor instance.

    The run requests the fairness, quality and drift tests, plus the custom
    metric when ``config`` has a ``custom_metric`` entry.

    Args:
        config (dict): configuration dictionary
        model_name (str): model name
        space_type (str): development or production environment
        log_format (str): log prefix for this method
    Raises:
        KeyError: if the subscription has no monitor instance under ``"mrm"``
    """
    _logger.info(f"{log_format} - evaluating ... {model_name}.")
    wos_client = config["wos_client"]
    instances = get_monitor_instances_by_subscription_name(config=config, model_name=model_name, space_type=space_type)
    mrm_instance_id = instances["mrm"]

    custom_tests = []
    if "custom_metric" in config:
        custom_tests.append(get_custom_metric_name(custom_monitor_name=config["custom_metric"]["custom_monitor_name"]))
    run_parameters = {
        "on_demand_trigger": True,
        "evaluation_tests": ["fairness", "quality", "drift"] + custom_tests,
        "publish_fact": "true",
    }
    wos_client.monitor_instances.run(monitor_instance_id=mrm_instance_id, triggered_by="user", background_mode=False, parameters=run_parameters)
    _logger.info(f"{log_format} - evaluate completed for {model_name}.")