では、Databricks のリファレンス アーキテクチャに沿って、Databricks と Spark を使用してエンドツーエンドの MLOps パイプラインを構築するための最初の手順を実行しました。ここで、説明した主要な手順を要約します。 このチュートリアル シリーズの最初の部分 : Unity カタログ内でデータをブロンズ、シルバー、ゴールドのレイヤーに整理し、構造化された効率的なデータ管理システムを確立しました。 Medallion Architecture 用の Unity カタログの設定 : 生データをシステムにインポートし、後続の処理段階で一貫性と品質を確保する方法を説明しました。 Unity カタログへのデータの取り込み : Databricks を利用して、スケーラブルで効果的なモデル開発のベスト プラクティスに従い、データセットに合わせてカスタマイズされた機械学習モデルをトレーニングしました。 モデルのトレーニング : モデルのパフォーマンスを向上させるために、HyperOpt を使用して最適なハイパーパラメータの検索を自動化し、精度と効率を向上させました。 HyperOpt によるハイパーパラメータの調整 : MLflow を使用して実験を記録および監視し、モデルのバージョン、メトリック、およびパラメーターの包括的な記録を維持して、簡単に比較および再現できるようにしました。 Databricks MLflow を使用した実験の追跡 これらの基本的な手順が完了すると、モデルを展開する準備が整います。この第 2 部では、次の 2 つの重要なコンポーネントをシステムに統合することに焦点を当てます。 : 一括スコアリングや定期レポートなどのアプリケーションに適した、大規模なデータセットで予測を生成するバッチ処理を実装します。 バッチ推論 : インタラクティブなアプリケーションやサービスに不可欠な即時予測を提供するためのリアルタイム モデル サービングを設定します。 オンライン推論 (モデル サービング) デプロイされたモデルが長期にわたって最適なパフォーマンスと信頼性を維持できるようにします。 モデル監視: さあ始めましょう! モデルの展開 前回のブログの出発点はモデル評価でした。比較を行った結果、この実稼働モデルと比較して、私たちのモデルの方がパフォーマンスが高いことがわかったとします。実稼働環境でモデルを使用する (と想定している) ため、所有するすべてのデータを活用したいと考えています。次のステップは、完全なデータセットを使用してモデルをトレーニングおよびテストすることです。次に、チャンピオン モデルとしてデプロイして、後で使用するためにモデルを永続化します。これは推論に使用する最終的なモデルであるため、特徴エンジニアリング クライアントを使用してモデルをトレーニングします。この方法では、モデル系統の追跡が容易になるだけでなく、スキーマ検証と特徴変換 (ある場合) をクライアントにオフロードできます。 with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse) 使用してモデルをトレーニングし、ログに記録することもできます。 Feature StoreやFeature Engineering APIを model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" ) 特徴量エンジニアリングAPIを使用すると、カタログエクスプローラーでモデルの系統を表示できます。 次に、モデルの説明を更新し、Champion ラベルを割り当てます。 import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version) 次に、モデルを登録したスキーマを確認します。次のようにすべての更新が表示されます。 : モデルレジストリにワークスペースを使用する場合は、モデルを管理するためにステージを使用する必要があります。エイリアスは使用できません。 参照してください。 どのように機能するかを見る モデルステージ こちらを モデル推論 バッチスコアリング ここで、推論のために本番環境でモデルを使用したいとします。このステップでは、チャンピオン モデルをロードし、それを使用して各ユーザーに対して 20 本の映画の推奨を生成します。 from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input) バッチ スコアリングに同じトレーニング データを使用したことがわかります。レコメンデーション システムの場合は理にかなっています。ただし、ほとんどのアプリケーションでは、モデルを使用して、目に見えないデータをスコアリングする必要があります。たとえば、Netflix を視聴していて、1 日の終わりに新しい視聴リストに基づいてユーザー レコメンデーションを更新したいとします。1 日の終わりの特定の時間にバッチ スコアリングを実行するジョブをスケジュールできます。 次に、各ユーザーに対するおすすめを生成します。ユーザーごとに上位20個のアイテムを見つけます。 from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids")) 結果は次のようになります 最後に、予測をデルタラベルとしてUCに保存するか、下流のシステムMongo DBまたはAzure Cosmos DBに公開することができます。最初のオプションを選択します。 df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations") ストリーミング/オンライン推論 ここで、リアルタイムのユーザー インタラクションに基づいて推奨事項を更新するケースを想像してください。このケースでは、モデル サービングを使用できます。誰かがモデルを使用したい場合、データをサーバーに送信できます。サーバーは、そのデータをデプロイされたモデルに送り、モデルが動作してデータを分析し、予測を生成します。これらは、Web アプリケーション、モバイル アプリ、さらには組み込みシステムでも使用できます。このアプローチの用途の 1 つは、A/B テストのトラフィック ルーティングを有効にすることです。 ALS アルゴリズムは、推奨事項を更新するためにデータ全体 (古いデータと新しいデータ) を使用してモデルを再トレーニングする必要があるため、オンライン推論に直接使用することはできません。勾配降下学習アルゴリズムは、オンライン更新に使用できるモデルの例です。今後の投稿で、これらのアルゴリズムのいくつかについて説明する予定です。 ただし、このようなモデルがどのように機能するかを説明するために、ユーザーが映画を評価するたびにそれに基づいて映画の評価を予測する (役に立たない) モデル提供エンドポイントを作成しています。 import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers ) これにより、モデル サービング クラスターが作成され、起動されるため、しばらく時間がかかります。ここで、 ウィンドウを開くと、エンドポイントが表示されます。 Serving 1 つのエンドポイントを使用して複数のモデルを提供できます。その後、A/B テストなどのシナリオにトラフィック ルーティングを使用したり、本番環境で異なるモデルのパフォーマンスを比較したりできます。 推論表 Databricks Model Serving の推論テーブルは、デプロイされたモデルの自動ログとして機能します。有効にすると、受信リクエスト (予測用に送信されるデータ)、対応するモデル出力 (予測)、およびその他のメタデータが Unity Catalog 内の Delta テーブルとしてキャプチャされます。推論テーブルは、 、 、およびモデルの や ためのデータ収集手順に使用できます。 監視とデバッグ 系統追跡 再トレーニング 微調整の モデルを監視するために、サービス エンドポイントで 有効にすることができます。これを行うには、最初にエンドポイントを作成するときに、ペイロードで プロパティを指定します。または、次のように コマンドと エンドポイント URL を使用して、後でエンドポイントを更新します (詳細は inference table auto_capture_config put config )。 こちら data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4)) では、エンドポイントにダミーのユーザーインタラクションデータを入力してみましょう。 import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8)) エンドポイント ログは テーブルで確認できます。テーブルにデータが表示されるまでに約 10 分かかります。 <catalog>.<schema>.<payload_table> table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table ) ペイロードテーブルには次のようなものが表示されます この推論テーブルのスキーマを理解するには、 「Unity カタログ推論テーブル スキーマ」を確認してください。 ここにある モデル監視 モデルとデータの監視は、習得に長い時間を要する複雑なトピックです。Databricks Lakehouse Monitoring (DLM) は、一般的なユースケース向けの標準のカスタマイズ可能なテンプレートを提供することで、適切な監視システムを構築するオーバーヘッドを削減します。ただし、DLM とモデル監視を習得するには、一般的に多くの実験が必要です。ここでは、モデル監視の広範な概要を説明するのではなく、出発点を示します。今後、このトピックに関するブログを書くかもしれません。 DLMの機能と特徴の簡単な概要 モデルが稼働している状態で、サービス エンドポイントによって生成された推論テーブルを使用して、モデルのパフォーマンスやドリフトなどの主要なメトリックを監視し、時間の経過とともにデータやモデルの偏差や異常を検出できます。このプロアクティブなアプローチにより、モデルの再トレーニングや機能の更新などの適切な修正アクションをタイムリーに実行して、最適なパフォーマンスを維持し、ビジネス目標との整合性を保つことができます。 DLM は、 、 、 の 3 種類の分析または を提供します。推論テーブルの分析に関心があるため、後者に焦点を当てます。監視用のテーブル (「 」) を使用するには、テーブルの構造が正しいことを確認する必要があります。 各行は次の列を持つリクエストに対応している必要があります。 時系列 スナップショット 推論 profile type プライマリ テーブル の場合、 推論テーブル モデルの特徴 モデル予測 モデルID : 推論リクエストのタイムスタンプ タイムスタンプ (オプション) グラウンドトゥルース 複数のモデルを提供し、1 つの監視ダッシュボードで各モデルのパフォーマンスを追跡する場合に重要です。複数のモデル ID が使用可能な場合、DLM はそれを使用してデータをスライスし、スライスごとにメトリックと統計を個別に計算します。 モデル ID は、 DLM は、指定された時間間隔の各統計とメトリックを計算します。推論分析では、 列とユーザー定義のウィンドウ サイズを使用して時間ウィンドウを識別します。詳細は以下を参照してください。 タイムスタンプ DLM は推論テーブルに対して「 」または「 」という 2 つの をサポートしています。この仕様に基づいて、関連するメトリックと統計の一部を計算します。 分類 回帰 problem type DLM を使用するには、モニターを作成し、それをテーブルにアタッチする必要があります。これを行うと、DLM は 2 つの を作成します。 metric tables : このテーブルには、最小値、最大値、null とゼロの割合などの要約統計が含まれています。また、ユーザーが定義した問題の種類に基づく追加のメトリックも含まれます。たとえば、分類モデルの 、 、 、回帰モデルの と などです。 プロファイル メトリック テーブル 精度 再現率 f1_score mean_squared_error mean_average_error : データの分布が 、または と比較してどのように変化したかを測定する統計が含まれます。カイ二乗検定、KS 検定などの指標を計算します。 ドリフト メトリック テーブル 時間の経過と共に ベースライン値 (提供されている場合) 各テーブルの完全なメトリックのリストを表示するには、 ドキュメント ページを確認してください。 モニター メトリック テーブルの を作成することもできます。 カスタム メトリック 監視システムを構築する上で重要な点は、最新の推論データが到着するとすぐに監視ダッシュボードがそれらにアクセスできるようにすることです。そのためには、 使用して、推論テーブルで処理された行を追跡できます。モデル サービングの推論テーブルをソース テーブル ( ) として使用し、監視テーブルをシンク テーブル ( ) として使用します。また、両方のテーブルで (CDC) が有効になっていることを確認します (推論テーブルではデフォルトで有効になっています)。このようにして、更新のたびにテーブル全体を再処理するのではなく、ソース テーブルの変更 (挿入/更新/削除) のみを処理します。 デルタ テーブル ストリーミングを readStream writeStream 変更データ キャプチャ 実践 推論テーブルの監視を有効にするには、次の手順を実行します。 推論テーブルをストリーミングテーブルとして読み取る モデル提供エンドポイントによって生成された推論テーブルを解凍して、適切なスキーマを持つ新しいデルタ テーブルを作成します。 ベースラインテーブルを準備する(ある場合) 結果のテーブルにモニターを作成し、メトリックを更新します。 推論テーブルを適切な構造に解凍し、メトリックを更新するワークフローをスケジュールします。 まず、Lakehouse Monitoring API をインストールする必要があります。Databricks rum time 15.3 LTS 以上を使用している場合は、すでにインストールされているはずです。 %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython() 推論テーブルをストリーミングテーブルとして読み込んでみましょう requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True 次に、上記のようにテーブルを正しい形式にする必要があります。このテーブルには、関連する機能と予測値を含む予測ごとに 1 つの行が必要です。モデル提供エンドポイントから取得する推論テーブルには、エンドポイントの要求と応答がネストされた JSON 形式で保存されます。以下は、要求列と応答列の JSON ペイロードの例です。 #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 | このテーブルを適切なスキーマに解凍するには、Databricks ドキュメント ( ) から改変した次のコードを使用できます。 推論テーブル Lakehouse 監視スターター ノートブック # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned 結果の表は次のようになります。 次にシンクテーブルを初期化します dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute() 結果を書きます checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \ 最後に、ベースライン テーブルを作成します。DLM はこのテーブルを使用して、ベースライン モデルとプライマリ モデルの類似した列の分布を比較することでドリフトを計算します。ベースライン テーブルには、プライマリ列と同じ特徴列と、同じモデル識別列が必要です。ベースライン テーブルには、最適なハイパーパラメータを使用してモデルをトレーニングした後に、以前に保存した の予測テーブルを使用します。ドリフト メトリックを計算するために、Databricks はプライマリ テーブルとベースライン テーブルの両方のプロファイル メトリックを計算します。 検証データセット については、こちらを参照してください。 プライマリ テーブルとベースライン テーブル #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)") これで監視ダッシュボードを作成する準備ができました。UI 使用して作成することもできます または Lakehouse Monitoring API を使用します。ここでは 2 番目のオプションを使用します。 を # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info) コードを実行した後、Databricks がすべてのメトリックを計算するまでにはしばらく時間がかかります。ダッシュボードを表示するには、シンク テーブル (つまり、 ) の タブに移動します。次のようなページが表示されます。 unpacked_requests_table_name Quality 表示をクリックすると、実行中、保留中、過去の更新が表示されます。ダッシュボードを開くには、 をクリックします。 refresh history View Dashboard そこで、推論テーブル ( ) から始めて、それを処理して結果を に保存し、このテーブルをベースライン テーブル ( ) とともに監視 API に渡します。DLM は各テーブルのプロファイル メトリック ( ) を計算し、それらを使用してドリフト メトリック ( ) を計算します。 my_endpoint_payload my_endpoint_payload_unpacked base_table_als my_endpoint_payload_unpacked_profile_metric my_endpoint_payload_unpacked_drift_metrics これで完了です。モデルを提供および監視するために必要なものがすべて揃いました。 次のパートでは、 と 使用してこのプロセスを自動化する方法を紹介します。 Databricks Assets Bundle Gitlab を では、Databricks のリファレンス アーキテクチャに沿って、Databricks と Spark を使用してエンドツーエンドの MLOps パイプラインを構築するための最初の手順を実行しました。ここで、説明した主要な手順を要約します。 このチュートリアル シリーズの最初の部分 : Unity カタログ内でデータをブロンズ、シルバー、ゴールドのレイヤーに整理し、構造化された効率的なデータ管理システムを確立しました。 Medallion Architecture 用の Unity カタログの設定 : 生データをシステムにインポートし、後続の処理段階で一貫性と品質を確保する方法を説明しました。 Unity カタログへのデータの取り込み : Databricks を利用して、スケーラブルで効果的なモデル開発のベスト プラクティスに従い、データセットに合わせてカスタマイズされた機械学習モデルをトレーニングしました。 モデルのトレーニング : モデルのパフォーマンスを向上させるために、HyperOpt を使用して最適なハイパーパラメータの検索を自動化し、精度と効率を向上させました。 HyperOpt によるハイパーパラメータの調整 : MLflow を使用して実験を記録および監視し、モデルのバージョン、メトリック、およびパラメーターの包括的な記録を維持して、簡単に比較および再現できるようにしました。 Databricks MLflow を使用した実験の追跡 これらの基本的な手順が完了すると、モデルを展開する準備が整います。この第 2 部では、次の 2 つの重要なコンポーネントをシステムに統合することに焦点を当てます。 : 一括スコアリングや定期レポートなどのアプリケーションに適した、大規模なデータセットで予測を生成するバッチ処理を実装します。 バッチ推論 : インタラクティブなアプリケーションやサービスに不可欠な即時予測を提供するためのリアルタイム モデル サービングを設定します。 オンライン推論 (モデル サービング) デプロイされたモデルが長期にわたって最適なパフォーマンスと信頼性を維持できるようにします。 モデル監視: さあ始めましょう! モデルの展開 前回のブログの出発点はモデル評価でした。比較を行った結果、この実稼働モデルと比較して、私たちのモデルの方がパフォーマンスが高いことがわかったとします。実稼働環境でモデルを使用する (と想定している) ため、所有するすべてのデータを活用したいと考えています。次のステップは、完全なデータセットを使用してモデルをトレーニングおよびテストすることです。次に、チャンピオン モデルとしてデプロイして、後で使用するためにモデルを永続化します。これは推論に使用する最終的なモデルであるため、特徴エンジニアリング クライアントを使用してモデルをトレーニングします。この方法では、モデル系統の追跡が容易になるだけでなく、スキーマ検証と特徴変換 (ある場合) をクライアントにオフロードできます。 with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse) 使用してモデルをトレーニングし、ログに記録することもできます。 Feature StoreやFeature Engineering APIを model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" ) 特徴量エンジニアリングAPIを使用すると、カタログエクスプローラーでモデルの系統を表示できます。 次に、モデルの説明を更新し、Champion ラベルを割り当てます。 import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version) 次に、モデルを登録したスキーマを確認します。次のようにすべての更新が表示されます。 : モデルレジストリにワークスペースを使用する場合は、モデルを管理するためにステージを使用する必要があります。エイリアスは使用できません。 参照してください。 どのように機能するかを見る モデルステージ こちらを モデル推論 バッチスコアリング ここで、推論のために本番環境でモデルを使用したいとします。このステップでは、チャンピオン モデルをロードし、それを使用して各ユーザーに対して 20 本の映画の推奨を生成します。 from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input) バッチ スコアリングに同じトレーニング データを使用したことがわかります。レコメンデーション システムの場合は理にかなっています。ただし、ほとんどのアプリケーションでは、モデルを使用して、目に見えないデータをスコアリングする必要があります。たとえば、Netflix を視聴していて、1 日の終わりに新しい視聴リストに基づいてユーザー レコメンデーションを更新したいとします。1 日の終わりの特定の時間にバッチ スコアリングを実行するジョブをスケジュールできます。 では、各ユーザーに対するおすすめを生成してみましょう。ユーザーごとに上位20個のアイテムを見つけます。 from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids")) 結果は次のようになります 最後に、予測をデルタラベルとしてUCに保存するか、下流のシステムMongo DBまたはAzure Cosmos DBに公開することができます。最初のオプションを選択します。 df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations") ストリーミング/オンライン推論 ここで、リアルタイムのユーザー インタラクションに基づいて推奨事項を更新するケースを想像してください。このケースでは、モデル サービングを使用できます。誰かがモデルを使用したい場合、データをサーバーに送信できます。サーバーは、そのデータをデプロイされたモデルに送り、モデルが動作してデータを分析し、予測を生成します。これらは、Web アプリケーション、モバイル アプリ、さらには組み込みシステムでも使用できます。このアプローチの用途の 1 つは、A/B テストのトラフィック ルーティングを有効にすることです。 ALS アルゴリズムは、推奨事項を更新するためにデータ全体 (古いデータと新しいデータ) を使用してモデルを再トレーニングする必要があるため、オンライン推論に直接使用することはできません。勾配降下学習アルゴリズムは、オンライン更新に使用できるモデルの例です。今後の投稿で、これらのアルゴリズムのいくつかについて説明する予定です。 ただし、このようなモデルがどのように機能するかを説明するために、ユーザーが映画を評価するたびにそれに基づいて映画の評価を予測する (役に立たない) モデル提供エンドポイントを作成しています。 import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers ) これにより、モデル サービング クラスターが作成され、起動されるため、しばらく時間がかかります。ここで、 ウィンドウを開くと、エンドポイントが表示されます。 Serving 1 つのエンドポイントを使用して複数のモデルを提供できます。その後、A/B テストなどのシナリオにトラフィック ルーティングを使用したり、本番環境で異なるモデルのパフォーマンスを比較したりできます。 推論表 Databricks Model Serving の推論テーブルは、デプロイされたモデルの自動ログとして機能します。有効にすると、受信リクエスト (予測用に送信されるデータ)、対応するモデル出力 (予測)、およびその他のメタデータが Unity Catalog 内の Delta テーブルとしてキャプチャされます。推論テーブルは、 、 、およびモデルの や ためのデータ収集手順に使用できます。 監視とデバッグ 系統追跡 再トレーニング 微調整の モデルを監視するために、サービス エンドポイントで 有効にすることができます。これを行うには、最初にエンドポイントを作成するときに、ペイロードで プロパティを指定します。または、次のように コマンドと エンドポイント URL を使用して、後でエンドポイントを更新します (詳細は inference table auto_capture_config put config )。 こちら data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4)) では、エンドポイントにダミーのユーザーインタラクションデータを入力してみましょう。 import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8)) エンドポイント ログは テーブルで確認できます。テーブルにデータが表示されるまでに約 10 分かかります。 <catalog>.<schema>.<payload_table> table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table ) ペイロードテーブルには次のようなものが表示されます この推論テーブルのスキーマを理解するには、 「Unity カタログ推論テーブル スキーマ」を確認してください。 ここにある モデル監視 モデルとデータの監視は、習得に長い時間を要する複雑なトピックです。Databricks Lakehouse Monitoring (DLM) は、一般的なユースケース向けの標準のカスタマイズ可能なテンプレートを提供することで、適切な監視システムを構築するオーバーヘッドを削減します。ただし、DLM とモデル監視を習得するには、一般的に多くの実験が必要です。ここでは、モデル監視の広範な概要を説明するのではなく、出発点を示します。今後、このトピックに関するブログを書くかもしれません。 DLMの機能と特徴の簡単な概要 モデルが稼働している状態で、サービス エンドポイントによって生成された推論テーブルを使用して、モデルのパフォーマンスやドリフトなどの主要なメトリックを監視し、時間の経過とともにデータやモデルの偏差や異常を検出できます。このプロアクティブなアプローチにより、モデルの再トレーニングや機能の更新などの適切な修正アクションをタイムリーに実行して、最適なパフォーマンスを維持し、ビジネス目標との整合性を保つことができます。 DLM は、 、 、 の 3 種類の分析または を提供します。推論テーブルの分析に関心があるため、後者に焦点を当てます。監視用のテーブル (「 」) を使用するには、テーブルの構造が正しいことを確認する必要があります。 各行は次の列を持つリクエストに対応している必要があります。 時系列 スナップショット 推論 profile type プライマリ テーブル の場合、 推論テーブル モデルの特徴 モデル予測 モデルID : 推論リクエストのタイムスタンプ タイムスタンプ (オプション) グラウンドトゥルース 複数のモデルを提供し、1 つの監視ダッシュボードで各モデルのパフォーマンスを追跡する場合に重要です。複数のモデル ID が使用可能な場合、DLM はそれを使用してデータをスライスし、スライスごとにメトリックと統計を個別に計算します。 モデル ID は、 DLM は、指定された時間間隔の各統計とメトリックを計算します。推論分析では、 列とユーザー定義のウィンドウ サイズを使用して時間ウィンドウを識別します。詳細は以下を参照してください。 タイムスタンプ DLM は推論テーブルに対して「 」または「 」という 2 つの をサポートしています。この仕様に基づいて、関連するメトリックと統計の一部を計算します。 分類 回帰 problem type DLM を使用するには、モニターを作成し、それをテーブルにアタッチする必要があります。これを行うと、DLM は 2 つの を作成します。 metric tables : このテーブルには、最小値、最大値、null とゼロの割合などの要約統計が含まれています。また、ユーザーが定義した問題の種類に基づく追加のメトリックも含まれます。たとえば、分類モデルの 、 、 、回帰モデルの と などです。 プロファイル メトリック テーブル 精度 再現率 f1_score mean_squared_error mean_average_error : データの分布が 、または と比較してどのように変化したかを測定する統計が含まれます。カイ二乗検定、KS 検定などの指標を計算します。 ドリフト メトリック テーブル 時間の経過と共に ベースライン値 (提供されている場合) 各テーブルの完全なメトリックのリストを表示するには、 ドキュメント ページを確認してください。 モニター メトリック テーブルの を作成することもできます。 カスタム メトリック 監視システムを構築する上で重要な点は、最新の推論データが到着するとすぐに監視ダッシュボードがそれらにアクセスできるようにすることです。そのためには、 使用して、推論テーブルで処理された行を追跡できます。モデル サービングの推論テーブルをソース テーブル ( ) として使用し、監視テーブルをシンク テーブル ( ) として使用します。また、両方のテーブルで (CDC) が有効になっていることを確認します (推論テーブルではデフォルトで有効になっています)。このようにして、更新のたびにテーブル全体を再処理するのではなく、ソース テーブルの変更 (挿入/更新/削除) のみを処理します。 デルタ テーブル ストリーミングを readStream writeStream 変更データ キャプチャ 実践 推論テーブルの監視を有効にするには、次の手順を実行します。 推論テーブルをストリーミングテーブルとして読み取る モデル提供エンドポイントによって生成された推論テーブルを解凍して、適切なスキーマを持つ新しいデルタ テーブルを作成します。 ベースラインテーブルを準備する(ある場合) 結果のテーブルにモニターを作成し、メトリックを更新します。 推論テーブルを適切な構造に解凍し、メトリックを更新するワークフローをスケジュールします。 まず、Lakehouse Monitoring API をインストールする必要があります。Databricks rum time 15.3 LTS 以上を使用している場合は、すでにインストールされているはずです。 %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython() 推論テーブルをストリーミングテーブルとして読み込んでみましょう requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True 次に、上記のようにテーブルを正しい形式にする必要があります。このテーブルには、関連する機能と予測値を含む予測ごとに 1 つの行が必要です。モデル提供エンドポイントから取得する推論テーブルには、エンドポイントの要求と応答がネストされた JSON 形式で保存されます。以下は、要求列と応答列の JSON ペイロードの例です。 #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 | このテーブルを適切なスキーマに解凍するには、Databricks ドキュメント ( ) から改変した次のコードを使用できます。 推論テーブル Lakehouse 監視スターター ノートブック # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned 結果の表は次のようになります。 次にシンクテーブルを初期化します dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute() 結果を書きます checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \ 最後に、ベースライン テーブルを作成します。DLM はこのテーブルを使用して、ベースライン モデルとプライマリ モデルの類似した列の分布を比較することでドリフトを計算します。ベースライン テーブルには、プライマリ列と同じ特徴列と、同じモデル識別列が必要です。ベースライン テーブルには、最適なハイパーパラメータを使用してモデルをトレーニングした後に、以前に保存した の予測テーブルを使用します。ドリフト メトリックを計算するために、Databricks はプライマリ テーブルとベースライン テーブルの両方のプロファイル メトリックを計算します。 検証データセット については、こちらを参照してください。 プライマリ テーブルとベースライン テーブル #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)") これで監視ダッシュボードを作成する準備ができました。UI 使用して作成することもできます または Lakehouse Monitoring API を使用します。ここでは 2 番目のオプションを使用します。 を # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info) コードを実行した後、Databricks がすべてのメトリックを計算するまでにはしばらく時間がかかります。ダッシュボードを表示するには、シンク テーブル (つまり、 ) の タブに移動します。次のようなページが表示されます。 unpacked_requests_table_name Quality 表示をクリックすると、実行中、保留中、過去の更新が表示されます。ダッシュボードを開くには、 をクリックします。 refresh history View Dashboard そこで、推論テーブル ( ) から始めて、それを処理して結果を に保存し、このテーブルをベースライン テーブル ( ) とともに監視 API に渡します。DLM は各テーブルのプロファイル メトリック ( ) を計算し、それらを使用してドリフト メトリック ( ) を計算します。 my_endpoint_payload my_endpoint_payload_unpacked base_table_als my_endpoint_payload_unpacked_profile_metric my_endpoint_payload_unpacked_drift_metrics これで完了です。モデルを提供および監視するために必要なものがすべて揃いました。 次のパートでは、 と 使用してこのプロセスを自動化する方法を紹介します。 Databricks Assets Bundle Gitlab を