現代のデータパイプラインでは、データはしばしば入ってくる。 これらのフォーマットは柔軟で、階層的な関係や配列を可能にしますが、伝統的な関係アプローチと対処するのに挑戦する可能性があります。 . nested JSON or XML formats analytics, reporting, or loading into data warehouses like Snowflake このブログでは、A 再利用可能な機能として、 . dynamic, recursive approach to parsing nested JSON and XML in Spark flatten_df_recursive Real-World Scenario Imagine working at an : e-commerce company 顧客データは、アドレス、注文、支払い情報のための組み込まれた構造を持つJSONファイルに格納されます。 注文詳細は、それぞれに組み込まれた製品情報が含まれるアイテムの配列を含む場合があります。 Legacy Partners では、XML ファイルを送信して、在庫および配送の更新を行うことができます。 Your goal: このデータを Databricks/Spark にアップロードします。 すべての組み込まれた構造をダイナミックにフラット(ハードコードの列名なし)します。 アナリティクス、ML、またはレポートのためのフラットな出力を保存します。 Challenges: 未知のニットレベル 巣立った構造を構成する。 頻繁にスケジュール変更 これはまさに、どこに ハンドイッチで来る。 recursive flattening The Recursive Flatten Function 以下がコア機能です。 from pyspark.sql.types import StructType, ArrayType from pyspark.sql.functions import col, explode_outer def flatten_df_recursive(df): """ Recursively flattens all nested StructType and ArrayType columns in a Spark DataFrame. Supports multiple nested levels for JSON/XML data. """ # Track complex fields (StructType or ArrayType) complex_fields = [(field.name, field.dataType) for field in df.schema.fields if isinstance(field.dataType, (StructType, ArrayType))] while complex_fields: col_name, col_type = complex_fields.pop(0) # If StructType, expand its fields with aliases if isinstance(col_type, StructType): expanded_cols = [ col(f"{col_name}.{nested_field.name}").alias(f"{col_name}_{nested_field.name}") for nested_field in col_type.fields ] df = df.select("*", *expanded_cols).drop(col_name) # If ArrayType, explode the array elif isinstance(col_type, ArrayType): df = df.withColumn(col_name, explode_outer(col(col_name))) # Refresh the complex fields list after modifications complex_fields = [(field.name, field.dataType) for field in df.schema.fields if isinstance(field.dataType, (StructType, ArrayType))] return df Key Features: 完全にダイナミック: 組み込まれた構造またはマレーレベルのいかなる数でも処理できます。 ハードコードなし:新しいフィールドが後で追加される場合でも動作します。 マレー サポート: 自動的にマレーを行に爆発します。 XML & JSON フレンドリー: Spark にロードされた任意の階層データで動作します。 Reading Nested JSON in Spark マルチラインでJSONファイルを読み取る df = spark.read.option("multiline", "true").json("dbfs:/FileStore/temp/orders.json") display(df) JSON Data Frame を移行する ウィル データを分析またはレポートのために準備する。 flatten_df_recursive flatten all nested structs and arrays flat_df = flatten_df_recursive(df) display(flat_df) Reading Nested XML in Spark xml_path = "/dbfs/data/nested_orders.xml" df = spark.read.format("xml") \ .option("rowTag", "order") \ .load(xml_path) flat_df = flatten_df_recursive(df) flat_df.show(truncate=False) “rowTag” は、1 つのレコードを表す XML 要素の繰り返しを示します。 Nested elements and array are automatically handled by “flatten_df_recursive” (フラッテン_df_recursive) です。 Why This Approach Matters Dynamic Schema Handling: JSON/XML 構造が変更された場合、フラッティング ロジックを書き換える必要はありません。 Multi-Level Nesting: 深く埋め込まれた構造やアーレイのために機能します。 スケーラブル: メモリにすべてをロードすることなく、Sparkで大きなファイルを処理できます。 再利用可能: JSON、XML、Snowflake、Delta、または組み込まれた構造を持つパルケートのいずれかのソースで動作します。 Summary Nested JSON および XML は、現代のデータパイプラインでは一般的ですが、伝統的なフラットニングアプローチは、深いニストリングやスケジュールの変更で失敗します。 あなたは: recursive, dynamic Spark flattening function すべての深さを自動的に表します。 構造や構造をきれいに扱う。 アナリティクス、MLパイプライン、Snowflakeなどの倉庫との統合 このアプローチは特に役に立ち、 で、 そして、 データ構造が頻繁に進化する。 ETL pipelines data lake ingestion reporting systems ここでは、あなたの参考に全文を添付しました。 ノートブックレポート