Source code for cpdflow.lifecycle.subscribe

"""
Subscribe models to OpenScale and evaluate them, in development or production.
"""
import logging
import pandas as pd
from cpdflow import graph
from cpdflow.wos import wos
from cpdflow.wml import wml
import functools

_logger = logging.getLogger(__name__)


def subscribe_remove(config: dict, model_configs: list, space_type: str, backward_steps: list) -> None:
    """
    Remove the existing subscriptions of the models listed in ``model_configs``.

    Every subscription name returned by ``wos.get_subscriptions`` is mapped back to a
    model name; subscriptions whose model name appears in ``model_configs`` are removed
    through ``graph.remove``. When nothing matches, only a log message is written.

    Args:
        config (dict): configuration dictionary
        model_configs (list[dict]): configurations of the models whose subscriptions are removed;
            each entry must provide a ``model_name`` key
        space_type (str): development or production environment
        backward_steps (list[str]): list of steps to remove downstream assets
    """
    all_subscriptions = wos.get_subscriptions(config)
    model_names = [x["model_name"] for x in model_configs]

    # Resolve every subscription name to its model name exactly once, then filter.
    subscribed_model_names = (
        wos.get_model_name_from_subscription_name(subscription_name=x, space_type=space_type)
        for x in all_subscriptions.keys()
    )
    delete_subscriptions_model_names = [name for name in subscribed_model_names if name in model_names]

    if not delete_subscriptions_model_names:
        _logger.info(f"SUBSCRIBE - {'REMOVE':<15} - Nothing to remove")
        return

    log_format = f"SUBSCRIBE - {'REMOVE':<15} - backward_steps"
    _logger.info(f"{log_format} - {' -> '.join(backward_steps)} for {delete_subscriptions_model_names}")
    graph.remove(
        config=config,
        model_names=delete_subscriptions_model_names,
        space_types=[space_type],
        backward_steps=backward_steps,
        log_format=log_format,
    )
def subscribe_overwrite(config: dict, model_configs: list, space_type: str, backward_steps: list) -> None:
    """
    Remove the subscriptions of the models in ``model_configs`` that are flagged for overwrite.

    Only model configurations with a truthy ``overwrite`` entry are selected; a missing
    ``overwrite`` key is treated as "do not overwrite". The selected models are passed to
    ``graph.remove``. When nothing is selected, only a log message is written.

    Args:
        config (dict): configuration dictionary
        model_configs (list[dict]): candidate model configurations; each selected entry must
            provide a ``model_name`` key
        space_type (str): development or production environment
        backward_steps (list[str]): list of steps to remove assets
    """
    if not model_configs:
        _logger.info(f"SUBSCRIBE - {'OVERWRITE':<15} - Nothing to overwrite")
        return

    # .get() keeps a config without an "overwrite" key from raising KeyError.
    model_names = [x["model_name"] for x in model_configs if x.get("overwrite")]
    if not model_names:
        _logger.info(f"SUBSCRIBE - {'OVERWRITE':<15} - Nothing to overwrite.")
        return

    log_format = f"SUBSCRIBE - {'OVERWRITE':<15} - backward_steps"
    _logger.info(f"{log_format} - {' -> '.join(backward_steps)} for {model_names}")
    graph.remove(
        config=config,
        model_names=model_names,
        space_types=[space_type],
        backward_steps=backward_steps,
        log_format=log_format,
    )
def subscribe_create(config: dict, model_configs: list, space_type: str, scoring_payload: dict, feedback_payload: pd.DataFrame) -> None:
    """
    Create the subscriptions in ``model_configs`` that do not exist yet.

    Model configurations whose subscription name is already among the names returned by
    ``wos.get_subscriptions`` are skipped. For each remaining model the requirement steps
    are checked first; those reporting a falsy result are run ahead of the forward steps
    from ``subscribe_model`` to ``evaluate``.

    Args:
        config (dict): configuration dictionary
        model_configs (list[dict]): subscriptions to be created; each entry must provide a ``model_name`` key
        space_type (str): development or production environment
        scoring_payload (dict): payload for scoring
        feedback_payload (pd.DataFrame): payload to store feedback
    """
    existing_subscription_names = wos.get_subscriptions(config).keys()
    model_configs = [
        x
        for x in model_configs
        if wos.get_subscription_name(model_name=x["model_name"], space_type=space_type) not in existing_subscription_names
    ]

    if not model_configs:
        _logger.info(f"SUBSCRIBE - {'CREATE':<15} - Nothing to create")
        return

    for model_config in model_configs:
        # A config with a "scoring_url" stores the payload and starts its requirement checks
        # at get_metadata; any other config scores the model and starts at run_model.
        has_scoring_url = "scoring_url" in model_config
        include = ["store_payload"] if has_scoring_url else ["score_model"]
        check_require_steps_source = "get_metadata" if has_scoring_url else "run_model"

        forward_steps = graph.get_forward_steps(source="subscribe_model", target="evaluate", include=include)
        check_require_steps = graph.get_check_require_steps(source=check_require_steps_source, target="subscribe_model")

        model_name = model_config["model_name"]
        args = {
            "config": config,
            "model_config": model_config,
            "model_name": model_name,
            "scoring_payload": scoring_payload,
            "feedback_payload": feedback_payload,
            "custom_monitor_config": None,
            "space_types": [space_type],
        }

        log_format = f"SUBSCRIBE - {'REQUIREMENTS':<15} - require_steps"
        args["log_format"] = log_format
        _logger.info(f"{log_format} - {' -> '.join(check_require_steps)} for {model_name}.")
        results = graph.run(direction="require", steps=check_require_steps, args=args)

        # Steps whose requirement check came back falsy are run before the forward steps.
        require_steps = [k for k, v in results.items() if not v]
        forward_steps = require_steps + forward_steps

        log_format = f"SUBSCRIBE - {'CREATE':<15} - forward_steps"
        args["log_format"] = log_format
        _logger.info(f"{log_format} - {' -> '.join(forward_steps)} for {model_name}.")
        graph.run(direction="forward", steps=forward_steps, args=args)
def custom_metric_overwrite(config: dict, space_type: str) -> None:
    """
    Delete the assets that belong to the configured custom metric.

    The custom monitor name is read from ``config["custom_metric"]["custom_monitor_name"]``.
    The matching custom metric monitor, integrated system, function deployment and function
    are then deleted, in that order.

    Args:
        config (dict): configuration dictionary; must contain ``custom_metric.custom_monitor_name``
        space_type (str): development or production environment
    """
    monitor_name = config["custom_metric"]["custom_monitor_name"]

    log_format = f"SUBSCRIBE - {'OVERWRITE':<15} - remove custom metric"
    _logger.info(f"{log_format} - {monitor_name}.")

    # Derive the names of the dependent WML assets from the monitor name.
    function_name = wos.get_custom_monitor_function_name(custom_monitor_name=monitor_name)
    deployment_name = wml.get_function_deployment_name(function_name=function_name)

    wos.delete_custom_metric_monitor_by_custom_monitor_name(config=config, log_format=log_format)
    wos.delete_integrated_system_by_custom_monitor_name(config=config, log_format=log_format)
    wml.delete_function_deployment_by_function_deployment_names(
        config=config,
        function_deployment_names=[deployment_name],
        space_type=space_type,
    )
    wml.delete_function_by_function_names(
        config=config,
        function_names=[function_name],
        space_type=space_type,
        log_format=log_format,
    )
def subscribe(config: dict, model_names: list, space_type: str) -> None:
    """
    Remove, overwrite or create the subscriptions of the requested models, then evaluate them.

    Shared implementation behind ``validate.apply`` (``space_type="dev"``) and
    ``operate.apply`` (``space_type="prod"``). Only models that appear both in
    ``model_names`` and in ``config["model_configs"]`` are processed.

    Steps, in order:

    1. ``subscribe_remove`` for the selected models
    2. ``subscribe_overwrite`` for the selected models
    3. if ``config`` has a ``custom_metric`` section: optionally remove the existing custom
       metric assets (truthy ``overwrite`` flag), then create provider, integrated system
       and monitor
    4. ``subscribe_create`` for the selected models
    5. ``evaluate`` for the selected models

    Args:
        config (dict): configuration dictionary; must provide ``model_configs`` and the
            ``file_name`` of ``scoring_payload``, ``meta_payload`` and ``feedback_payload``
        model_names (list[str]): subscriptions to be removed, updated, overwritten or created
        space_type (str): development or production environment
    """
    # Keep the caller's request for the final log line; ``model_names`` is narrowed below.
    model_names_copy = model_names[:]
    _logger.info(f"SUBSCRIBE - {'START':<15} - {model_names}")

    backward_steps = graph.get_backward_steps(source="subscribe_model", target="evaluate")

    # Build the payloads from the CSV files named in the configuration.
    df_scoring_payload = pd.read_csv(config["scoring_payload"]["file_name"])
    df_meta_payload = pd.read_csv(config["meta_payload"]["file_name"])
    meta_payload = {"fields": df_meta_payload.columns.tolist(), "values": df_meta_payload.values.tolist()}
    scoring_payload = {
        "input_data": [
            {
                "fields": df_scoring_payload.columns.tolist(),
                "values": df_scoring_payload.values.tolist(),
                "meta": meta_payload,
            }
        ]
    }
    feedback_payload = pd.read_csv(config["feedback_payload"]["file_name"])

    # Restrict the run to requested models that actually have a configuration.
    model_configs = [x for x in config["model_configs"] if x["model_name"] in model_names]
    model_names = [x["model_name"] for x in model_configs]

    # remove
    subscribe_remove(config=config, model_configs=model_configs, space_type=space_type, backward_steps=backward_steps)

    # overwrite
    subscribe_overwrite(config=config, model_configs=model_configs, space_type=space_type, backward_steps=backward_steps)

    # custom metric
    if "custom_metric" in config:
        # Test the flag's value, not just its presence, so ``overwrite: false`` does not
        # delete the existing custom metric assets.
        if config["custom_metric"].get("overwrite"):
            custom_metric_overwrite(config=config, space_type=space_type)
        wos.create_custom_metric_provider(config=config, space_type=space_type)
        wos.create_integrated_system(config=config, space_type=space_type)
        wos.create_custom_metric_monitor(config=config)

    # create
    subscribe_create(
        config=config,
        model_configs=model_configs,
        space_type=space_type,
        scoring_payload=scoring_payload,
        feedback_payload=feedback_payload,
    )

    # evaluate
    evaluate(config=config, model_names=model_names, space_type=space_type)

    _logger.info(f"SUBSCRIBE - {'COMPLETED':<15} - {model_names_copy}")
def evaluate(config: dict, model_names: list, space_type: str) -> None:
    """
    Evaluate each of the given models.

    Calls ``wos.evaluate`` once per entry of ``model_names``, in order.

    Args:
        config (dict): configuration dictionary
        model_names (list[str]): names of the models to evaluate
        space_type (str): development or production environment
    """
    log_format = "EVALUATE"
    for name in model_names:
        wos.evaluate(config=config, model_name=name, space_type=space_type, log_format=log_format)
class validate:
    """
    Delete and apply commands that run against the ``dev`` space type.
    """

    @staticmethod
    def delete(config: dict, model_names: list) -> None:
        """
        Remove the ``dev`` subscriptions of the requested models.

        Only models that appear both in ``model_names`` and in ``config["model_configs"]``
        are handed to ``subscribe_remove``.

        Args:
            config (dict): configuration dictionary; must provide ``model_configs``
            model_names (list[str]): names of the models whose subscriptions are removed
        """
        requested_model_names = model_names[:]
        _logger.info(f"SUBSCRIBE - {'START':<15} - {model_names}")

        steps = graph.get_backward_steps(source="subscribe_model", target="evaluate")
        selected_configs = [
            model_config
            for model_config in config["model_configs"]
            if model_config["model_name"] in model_names
        ]

        # remove
        subscribe_remove(config=config, model_configs=selected_configs, space_type="dev", backward_steps=steps)

        _logger.info(f"SUBSCRIBE - {'COMPLETED':<15} - {requested_model_names}")

    @staticmethod
    def apply(config: dict, model_names: list) -> None:
        """
        Run ``subscribe`` for the requested models with ``space_type="dev"``.

        Args:
            config (dict): configuration dictionary
            model_names (list[str]): subscriptions to be removed, updated, overwritten or created
        """
        subscribe(config=config, model_names=model_names, space_type="dev")
class operate:
    """
    Delete and apply commands that run against the ``prod`` space type.
    """

    @staticmethod
    def delete(config: dict, model_names: list) -> None:
        """
        Remove the ``prod`` subscriptions of the requested models.

        Only models that appear both in ``model_names`` and in ``config["model_configs"]``
        are handed to ``subscribe_remove``.

        Args:
            config (dict): configuration dictionary; must provide ``model_configs``
            model_names (list[str]): names of the models whose subscriptions are removed
        """
        requested_model_names = model_names[:]
        _logger.info(f"SUBSCRIBE - {'START':<15} - {model_names}")

        steps = graph.get_backward_steps(source="subscribe_model", target="evaluate")
        selected_configs = [
            model_config
            for model_config in config["model_configs"]
            if model_config["model_name"] in model_names
        ]

        # remove
        subscribe_remove(config=config, model_configs=selected_configs, space_type="prod", backward_steps=steps)

        _logger.info(f"SUBSCRIBE - {'COMPLETED':<15} - {requested_model_names}")

    @staticmethod
    def apply(config: dict, model_names: list) -> None:
        """
        Run ``subscribe`` for the requested models with ``space_type="prod"``.

        Args:
            config (dict): configuration dictionary
            model_names (list[str]): subscriptions to be removed, updated, overwritten or created
        """
        subscribe(config=config, model_names=model_names, space_type="prod")
# validate = functools.partial(subscribe, space_type="dev") # operate = functools.partial(subscribe, space_type="prod") # def subscribe_remove_declarative(config: dict, model_configs: list[dict], space_type: str, backward_steps: list[str]) -> None: # """ # Remove subscriptions that are not specified in ``model_configs``. # Args: # config (dict): configuration dictionary # model_configs (list[dict]): models to be removed # space_type (str): development or production environment # backward_steps (list[str]): list of steps to remove downstream assets # """ # all_subscriptions = wos.get_subscriptions(config) # model_names = [x["model_name"] for x in model_configs] # delete_subscriptions_model_names = [ # wos.get_model_name_from_subscription_name(subscription_name=x, space_type=space_type) # for x in all_subscriptions.keys() # if wos.get_model_name_from_subscription_name(subscription_name=x, space_type=space_type) not in model_names # ] # if delete_subscriptions_model_names: # log_format = f"SUBSCRIBE - {'REMOVE':<15} - backward_steps" # _logger.info(f"{log_format} - {' -> '.join(backward_steps)} for {delete_subscriptions_model_names}") # graph.remove(config=config, model_names=delete_subscriptions_model_names, space_types=[space_type], backward_steps=backward_steps, log_format=log_format) # else: # _logger.info(f"SUBSCRIBE - {'REMOVE':<15} - Nothing to remove") # def subscribe_declarative(config: dict, model_names: list[str], space_type: str) -> None: # """ # Remove, overwrite or create subscriptions that are specified in ``model_configs``. # Handles the ``validate`` and ``operate`` lifecycle stages. 
# Args: # config (dict): configuration dictionary # model_names (list[str]): subscriptions to be removed, updated, overwritten or created # """ # model_names_copy = model_names[:] # _logger.info(f"SUBSCRIBE - {'START':<15} - {model_names}") # backward_steps = graph.get_backward_steps(source="subscribe_model", target="evaluate") # df_scoring_payload = pd.read_csv(config["scoring_payload"]["file_name"]) # df_meta_payload = pd.read_csv(config["meta_payload"]["file_name"]) # meta_payload = {"fields": df_meta_payload.columns.tolist(), "values": df_meta_payload.values.tolist()} # scoring_payload = {"input_data": [{"fields": df_scoring_payload.columns.tolist(), "values": df_scoring_payload.values.tolist(), "meta": meta_payload}]} # feedback_payload = pd.read_csv(config["feedback_payload"]["file_name"]) # model_configs = [x for x in config["model_configs"] if x["model_name"] in model_names] # model_names = [x["model_name"] for x in model_configs] # # remove # subscribe_remove_declarative(config=config, model_configs=model_configs, space_type=space_type, backward_steps=backward_steps) # # overwrite # subscribe_overwrite(config=config, model_configs=model_configs, space_type=space_type, backward_steps=backward_steps) # # custom metric # if "custom_metric" in config: # if "overwrite" in config["custom_metric"]: # custom_metric_overwrite(config=config, space_type=space_type) # wos.create_custom_metric_provider(config=config, space_type=space_type) # wos.create_integrated_system(config=config, space_type=space_type) # wos.create_custom_metric_monitor(config=config) # # create # subscribe_create(config=config, model_configs=model_configs, space_type=space_type, scoring_payload=scoring_payload, feedback_payload=feedback_payload) # # evaluate # evaluate(config=config, model_names=model_names, space_type=space_type) # _logger.info(f"SUBSCRIBE - {'COMPLETED':<15} - {model_names_copy}") # root_nodes = [n for n, d in graph.G.in_degree() if d == 0] # requirements = {x: 
nx.shortest_path(G=graph.G, source=x, target="subscribe_model")[:-1] for x in root_nodes} # requirement_steps = [] # check_require_steps = requirements["run_model"] # log_format = f"SUBSCRIBE - {'REQUIREMENTS':<15} - require_steps" # args["log_format"] = log_format # _logger.info(f"{log_format} - {' -> '.join(check_require_steps)} for {model_name}.") # results = graph.run(direction="require", steps=check_require_steps, args=args) # require_steps = [k for k, v in results.items() if not v] # requirement_steps.append(require_steps) # if "custom_metric" in config: # requirement_steps.append(requirements["create_custom_metric_provider"]) # Parallel(n_jobs=2, prefer="threads")(delayed(graph.run)(direction="forward", steps=x, args=args) for x in requirement_steps)