Trong các đường ống dẫn dữ liệu hiện đại, dữ liệu thường đi vào Các định dạng này linh hoạt, cho phép các mối quan hệ phân cấp và mảng, nhưng chúng có thể khó xử lý với các cách tiếp cận quan hệ truyền thống. . nested JSON or XML formats analytics, reporting, or loading into data warehouses like Snowflake Trong blog này, chúng tôi khám phá một sử dụng một chức năng tái sử dụng, . dynamic, recursive approach to parsing nested JSON and XML in Spark flatten_df_recursive Real-World Scenario Hãy tưởng tượng làm việc tại một : e-commerce company Dữ liệu khách hàng được lưu trữ trong các tệp JSON với cấu trúc niêm phong cho địa chỉ, đơn đặt hàng và thông tin thanh toán. Chi tiết đơn đặt hàng có thể chứa hàng loạt các mặt hàng, mỗi mặt hàng có thông tin sản phẩm. Legacy Partners gửi các tệp XML để cập nhật hàng tồn kho và vận chuyển. Your goal: Tải dữ liệu này vào Databricks/Spark. Phẳng tất cả các cấu trúc niêm phong một cách năng động (không có tên cột mã cứng). Lưu kết quả phẳng cho phân tích, ML hoặc báo cáo. Challenges: Mức độ không xác định Tích hợp với các cấu trúc nested. Thay đổi lịch trình thường xuyên Đây chính xác là nơi đến trong tay. recursive flattening The Recursive Flatten Function Đây là chức năng cốt lõi: from pyspark.sql.types import StructType, ArrayType from pyspark.sql.functions import col, explode_outer def flatten_df_recursive(df): """ Recursively flattens all nested StructType and ArrayType columns in a Spark DataFrame. Supports multiple nested levels for JSON/XML data. """ # Track complex fields (StructType or ArrayType) complex_fields = [(field.name, field.dataType) for field in df.schema.fields if isinstance(field.dataType, (StructType, ArrayType))] while complex_fields: col_name, col_type = complex_fields.pop(0) # If StructType, expand its fields with aliases if isinstance(col_type, StructType): expanded_cols = [ col(f"{col_name}.{nested_field.name}").alias(f"{col_name}_{nested_field.name}") for nested_field in col_type.fields ] df = df.select("*", *expanded_cols).drop(col_name) # If ArrayType, explode the array elif isinstance(col_type, ArrayType): df = df.withColumn(col_name, explode_outer(col(col_name))) # Refresh the complex fields list after modifications complex_fields = [(field.name, field.dataType) for field in df.schema.fields if isinstance(field.dataType, (StructType, ArrayType))] return df Key Features: Hoàn toàn năng động: Xử lý bất kỳ số lượng cấu trúc hoặc cấp độ mảng được niêm phong. Không có mã hóa cứng: Hoạt động ngay cả khi các trường mới được thêm sau. Hỗ trợ Array: Tự động bùng nổ hàng loạt thành hàng. XML & JSON thân thiện: Làm việc với bất kỳ dữ liệu phân cấp nào được tải vào Spark. Reading Nested JSON in Spark Đọc tập tin JSON với multiline df = spark.read.option("multiline", "true").json("dbfs:/FileStore/temp/orders.json") display(df) Chuyển JSON Data Frame vào sẽ , chuẩn bị dữ liệu để phân tích hoặc báo cáo. flatten_df_recursive flatten all nested structs and arrays flat_df = flatten_df_recursive(df) display(flat_df) Reading Nested XML in Spark xml_path = "/dbfs/data/nested_orders.xml" df = spark.read.format("xml") \ .option("rowTag", "order") \ .load(xml_path) flat_df = flatten_df_recursive(df) flat_df.show(truncate=False) "rowTag" chỉ định các yếu tố XML lặp đi lặp lại đại diện cho một bản ghi. Các yếu tố và mảng niêm yết được tự động xử lý bởi “flatten_df_recursive”. Why This Approach Matters Dynamic Schema Handling: Không cần phải viết lại logic phẳng nếu cấu trúc JSON/XML thay đổi. Hỗ trợ Multi-Level Nesting: Hoạt động cho các cấu trúc và hàng loạt được niêm phong sâu. Scalable: Có thể xử lý các tập tin lớn trên Spark mà không tải mọi thứ vào bộ nhớ. Có thể tái sử dụng: Hoạt động cho bất kỳ nguồn nào — JSON, XML, Snowflake, Delta hoặc Parquet với cấu trúc niêm phong. Summary Nested JSON và XML là phổ biến trong các đường ống dữ liệu hiện đại, nhưng các phương pháp phẳng truyền thống thất bại với tổ hợp sâu hoặc thay đổi sơ đồ. Bạn có thể: recursive, dynamic Spark flattening function Tự động làm phẳng bất kỳ độ sâu của tổ. Xử lý hàng rào và cấu trúc liền mạch. Tích hợp với phân tích, đường ống ML và kho như Snowflake. Cách tiếp cận này đặc biệt hữu ích cho , và Cấu trúc dữ liệu phát triển thường xuyên. ETL pipelines data lake ingestion reporting systems Ở đây tôi đã đính kèm toàn bộ báo cáo để tham khảo của bạn. Notebook báo cáo