Kiểm thử chất lượng dữ liệu hiệu quả với Great Expectations

Nếu bạn đang băn khoăn tìm kiếm một công cụ hữu ích nhằm giải quyết những khó khăn trong việc kiểm tra chất lượng dữ liệu thì Great Expectations chính là một ứng cử viên sáng giá.

Upload image

Trong thời đại mới, dữ liệu là nhân tố quan trọng thúc đẩy quá trình ra quyết định, hoạch định chiến lược và lợi thế cạnh tranh. Do đó, việc đảm bảo dữ liệu chất lượng cao là điều cần thiết để có thể khai thác hiệu quả tiềm năng, tối ưu hóa hoạt động của một tổ chức.

Tuy nhiên, đảm bảo chất lượng dữ liệu là một quy trình phức tạp, đòi hỏi kĩ năng, kiến thức, và cả sự hợp tác của kỹ sư dữ liệu, người phân tích dữ liệu,... Trong thực tế, bước kiểm tra chất lượng dữ liệu thường bị "làm ngơ" trong quá trình phát triển hệ thống phân tích và xử lý dữ liệu.

Đối đầu với thách thức trên, công cụ Great Expectations nổi lên như một giải pháp kiểm thử chất lượng dữ liệu đơn giản và hiệu quả, đáp ứng được các tiêu chí như: dễ cài đặt, dễ tiếp cận, dễ sử dụng và có khả năng tích hợp với nhiều công cụ, dịch vụ khác.

Công cụ Great Expectations

Great Expectations (GX) là công cụ mã nguồn mở dựa trên ngôn ngữ Python để kiểm tra chất lượng dữ liệu, giúp đảm bảo tính chính xác và toàn vẹn của dữ liệu trong data pipeline. GX cung cấp phương pháp đơn giản và linh hoạt để cài đặt các "kì vọng" - expectation mà dữ liệu cần tuân thủ, qua đó giúp tự động hóa quá trình kiểm tra, phát hiện lỗi và bất thường trong dữ liệu từ sớm, tăng độ tin cậy vào chất lượng dữ liệu. GX có khả năng tích hợp vào nhiều platform và framework, phù hợp với nhu cầu của người kỹ sư và nhà phân tích dữ liệu.

Upload image

Những tính năng nổi bật của Great Expectations có thể kể đến như:

  • Expectations: Cài đặt "kỳ vọng" về dữ liệu bằng các hàm Python đơn giản, dễ đọc. GX cũng cung cấp sẵn một thư viện gồm rất nhiều các expectation thường sử dụng.
  • Automated Data Profiling: Quan sát dữ liệu, thống kê và tự động sinh ra tập expectation phù hợp với dữ liệu, giúp người dùng không cần viết kiểm thử cho pipeline từ đầu.
  • Data Validation: Thực thi kiểm thử trên batch / nhiều batch dữ liệu và thông báo kết quả chi tiết về những giá trị không phù hợp.
  • Data docs: Hỗ trợ tạo tài liệu về kết quả mỗi lần kiểm thử dưới dạng trang web thân thiện.
  • Hỗ trợ nhiều nguồn dữ liệu: như Pandas dataframes, Spark dataframes, SQL database, CSV files,... và cả các cloud storage như AWS S3, Google Cloud Storage,...

Trong bài viết này, hãy cùng mình tìm hiểu về Great Expectations thông qua một bài demo kiểm tra dữ liệu đơn giản.

Mô tả dữ liệu kiểm thử

Đơn vị của mình thường hay tổ chức các chương trình giao lưu về công nghệ. Dữ liệu sau mỗi sự kiện được thu thập thông qua form đăng kí, thông tin check-in / feedback, dữ liệu thống kê của công cụ họp online (như MS Teams),... Dữ liệu này sau đó được phân tích và xây dựng báo cáo để đánh giá chất lượng của chương trình. Tuy trong thực tế dữ liệu thô được tập hợp từ nhiều nguồn khác nhau và cần qua quá trình biến đổi để sử dụng, trong bài viết này, mình sẽ đơn giản hóa dữ liệu thô thu thập được qua một tập tin csv có cấu trúc như sau:

EmailRegistrationDatePositionToolDuration
octavia.volkman@samplemail02/09/2020 08:21:08DeveloperTeams/1.2.00.137651.15

Để đảm bảo chất lượng của báo cáo, các giá trị cần đảm bảo tính đúng đắn và hợp lí. Một số tiêu chí để kiểm tra tính đúng sai của các trường có thể kể đến như:

FieldCriteria
EmailEmail must be unique
RegistrationDateRegistrationDate must be after registration form is posted and before the form is closed
PositionPosition must be in a predefined set
ToolTool must be a version of allowed app
DurationDuration must be shorter than 2 hours (maximum time of event)

Chúng ta sẽ thực hiện cài đặt kiểm thử các tiêu chí này cho dữ liệu với công cụ Great Expectations.

Cài đặt Great Expectations

Chúng ta có thể cài đặt GX trên môi trường local theo hướng dẫn trên trang chủ: How to install Great Expectations locally.

Các bước tóm tắt như sau:

  • Cài đặt môi trường:

    • python version 3.7 -> 3.10 (tính đến 04.2023)
    • pip package installer
    • tạo virtual environment (bằng câu lệnh python -m venv my_venv hoặc sử dụng anaconda/miniconda)
  • Cài đặt GX:

pip install great_expectations
  • Kiểm tra version của GX:
great_expectations --version

Trong demo này, ta sẽ sử dụng GX phiên bản 0.16.7

Upload image

Sử dụng GX với Jupyter Notebook

Quá trình triển khai GX gồm 4 bước cơ bản: Khởi tạo -> Kết nối Datasource -> Tạo Expectation -> Thực thi đánh giá.

Upload image

  1. Khởi tạo project

Để khởi tạo project, ta sử dụng câu lệnh:

great_expectations init

Câu lệnh sẽ tạo một thư mục với tên great_expectations và xây dựng cấu trúc thư mục cũng như những tập tin cấu hình cần thiết cho một project GX.

Upload image

  1. Kết nối Datasource

Để kết nối và tương tác với Datasource, ta cần chọn Data Connector là thành phần giúp việc đọc dữ liệu từ nguồn và Execution Engine là thành phần "gom" dữ liệu theo batch và thực hiện validate. Trong demo này, chúng ta sẽ sử dụng local file làm Data Connector và pandas làm Execution Engine.

Great Expectations hỗ trợ câu lệnh để tạo cấu hình cho một số loại Datasource và điều chỉnh cũng như thử nghiệm nó thông qua Jupyter Notebook. Để tạo Datasource, ta sử dụng câu lệnh:

great_expectations datasource new

Ở lựa chọn nguồn dữ liệu để kết nối, ta chọn 1 (local filesystem)

Ở lựa chọn công cụ xử lý dữ liệu, ta chọn 1 (Pandas)

Tiếp theo, ta đặt tên thư mục sẽ chứa các tập tin csv cần xử lý (ở đây mình sẽ đặt là data).

Upload image

Sau khi khởi tạo, một Jupyter Notebook sẽ được triển khai ở localhost để ta hiệu chỉnh và kiểm thử. Ta có thể:

  • Đổi tên Datasource
  • Kiểm tra cấu hình Datasource bằng cách tạo tập tin csv trong thư mục lưu trữ dữ liệu mà ta đã cấu hình. Nếu cấu hình đúng, ta sẽ thấy các tập tin csv được liệt kê. Trong demo này, mình đã chuẩn bị một file csv là attendance.csv:

Upload image

Sau khi chạy tất cả các bước trong notebook, phần cấu hình Datasource sẽ được thêm vào trong tập tin great_expectation.yml.

  1. Tạo Expectation

Trong GX, tập hợp các "bài test" được gọi là Expectation Suite. Cũng như Datasource, ta có thể tạo Expectation Suite bằng câu lệnh CLI:

great_expectations suite new

Ở bước How would you like to create your Expectation Suite?, ta chọn option 2. Sau đó, ta có thể chọn Data Set (tương ứng với các tập tin được liệt kê ở bước tạo Datasource) để sử dụng.

Upload image

Trong Jupyter notebook, ta có thể:

  • Kiểm tra việc kết nối với dữ liệu.

  • Tạo các Expectations để kiểm thử:

    • Tạo thêm một cell trong mục Column Expectation(s)
    • GX cung cấp rất nhiều Expectations có sẵn tại Expectations Gallery, ta có thể tìm những Expectations phù hợp với mục tiêu kiểm thử và sử dụng trong cell đã tạo. Với yêu cầu như đầu bài đã nêu, ta có thể tạo Expectations như sau:

Upload image

Sau khi chạy xong bước Review & Save Your Expectations, tập tin json cấu hình Expectations sẽ được tạo trong thư mục expectations, và document về kết quả kiểm tra sẽ được tạo trong thư mục uncommitted\data_docs\local_site

Upload image

  1. Thực thi kiểm thử

Để thực thi kiểm thử bằng câu lệnh CLI mà không cần mở lại notebook, ta cần lưu lại checkpoint từ bước Review & Save Your Expectations. Trong cell này, ta thêm đoạn code lưu lại checkpoint.

Upload image

Tập tin yaml sẽ được tạo trong thư mục checkpoints, ta có thể thực thi kiểm thử với checkpoint này bằng câu lệnh:

great_expectations checkpoint run <checkpoint_name>

Upload image

Với mỗi lần thực thi, ta có thể xem lại kết quả chi tiết được cập nhật ở Data Docs.

Sử dụng GX với Python

Sử dụng CLI cùng Jupyter Notebook giúp ta dễ dàng tạo Expectations Suite và thực thi kiểm thử. Tuy nhiên trong thực tế, đôi lúc ta cần có lựa chọn linh hoạt hơn. Vì thế, trong phần này, ta sẽ xem qua cách sử dụng GX với Python code.

Về workflow, ta vẫn sẽ thực hiện 4 bước như phần trên. Nhưng để phần demo mới mẻ hơn, thay vì sử dụng local file làm Datasource, ta sẽ sử dụng Azure Blob Storage.

  1. Khởi tạo project

Ta vẫn thực hiện các bước như phần 1 ở trên, ngoài ra:

  • Tạo tập tin attendance.py để code các nội dung trong phần 2 và 3
  • Tạo tập tin attendance_validate.py để code nội dung trong phần 4
  • Chuẩn bị Azure account -> Tạo một Storage account -> Tạo một container -> Upload file csv vào container
  • Cài đặt theo hướng dẫn của GX cho Azure Storage
python -m pip install 'great_expectations[azure]' 
  1. Kết nối Datasource

Để kết nối Azure Blob Storage, ta sử dụng đoạn code:

import great_expectations as gx
from great_expectations.checkpoint import SimpleCheckpoint
from great_expectations.exceptions import DataContextError
from ruamel.yaml import YAML

context = gx.get_context()

datasource_name = "azure_datasource"
datasource = context.sources.add_pandas_abs(
    name=datasource_name,
    azure_options={
        "conn_str": "<AZURE_BLOB_STORAGE_CONNECTION_STRING>",
    },
)

Sau khi thực thi, cấu hình Azure Datasource sẽ được thêm vào tập tin great_expectations.yml. Sau lần đầu khởi tạo, ta dùng hàm sau để access datasource:

datasource = context.get_datasource(datasource_name)

Tiếp theo, ta sẽ tạo Data Asset kết nối với tập tin mà ta muốn thực thi kiểm thử. Trong demo này, mình muốn kiểm thử tất cả các file trong container azgxdata nên đoạn code tạo Data Asset sẽ như sau:

asset_name = "attendance_data_asset"
batching_regex = r"(.*)"
abs_container = "azgxdata"
abs_name_starts_with = ""

data_asset = datasource.add_csv_asset(
    name=asset_name,
    batching_regex=batching_regex,
    abs_container=abs_container,
    abs_name_starts_with=abs_name_starts_with,
)
  1. Tạo Expectation

Tạo Expectations Suite cùng validator bằng đoạn code tham khảo từ Jupyter Notebook:

my_batch_request = data_asset.build_batch_request()

expectation_suite_name = "attendance_suite"

try:
    suite = context.get_expectation_suite(expectation_suite_name=expectation_suite_name)
    print(f'Loaded ExpectationSuite "{suite.expectation_suite_name}" containing {len(suite.expectations)} expectations.')
except DataContextError:
    suite = context.add_expectation_suite(expectation_suite_name=expectation_suite_name)
    print(f'Created ExpectationSuite "{suite.expectation_suite_name}".')

validator = context.get_validator(
    batch_request=my_batch_request,
    expectation_suite_name=expectation_suite_name
)

column_names = [f'"{column_name}"' for column_name in validator.columns()]
print(f"Columns: {', '.join(column_names)}.")
validator.head(n_rows=5, fetch_all=False)

Tiếp đó, ta tạo các Expectations:

# Email must be unique
validator.expect_column_values_to_be_unique(column='Email')
# RegistrationDate must be after registration form is posted and before the form is closed
validator.expect_column_values_to_be_between(column='RegistrationDate', max_value='03/09/2020 23:59:29', min_value='01/09/2020 08:00:00', output_strftime_format='%d/%m/%Y %H:%M:%S', parse_strings_as_datetimes=True)
# Position must be in a predefined set
validator.expect_column_values_to_be_in_set(column='Position', value_set=['Developer', 'DevOps', 'Quality Assurance', 'IT', 'Manager', 'Data Scientist'])
# Tool must be a version of allowed app
validator.expect_column_values_to_match_regex_list(column='Tool', match_on='any', regex_list=['Teams', 'TeamSpaceApp'])
# Duration must be shorter than 2 hours (maximum time of event)
validator.expect_column_values_to_be_between(column='Duration', max_value=2, min_value=0)

Cuối cùng, ta thực hiện lưu lại Expectations Suite và Checkpoint, tương tự như trong Jupyter Notebook:

validator.get_expectation_suite(discard_failed_expectations=False)
validator.save_expectation_suite(discard_failed_expectations=False)

checkpoint_config = {
    "class_name": "SimpleCheckpoint",
    "validations": [
        {
            "batch_request": {
                "datasource_name": datasource_name,
                "data_asset_name": asset_name,
                "options": {}
            },
            "expectation_suite_name": expectation_suite_name
        }
    ]
}

checkpointName = "attendance_checkpoint"
checkpoint = SimpleCheckpoint(
    # f"{validator.active_batch_definition.data_asset_name}_{expectation_suite_name}",
    checkpointName,
    context,
    **checkpoint_config
)
yaml = YAML()
context.add_checkpoint(**yaml.load(checkpoint.get_config(mode="yaml")))

Thực thi attendance.py:

Upload image

  1. Thực thi kiểm thử

Để thực thi kiểm thử bằng checkpoint đã tạo, ta sử dụng đoạn code sau:

import great_expectations as gx

context = gx.get_context()

datasource_name = "azure_datasource"
datasource = context.get_datasource(datasource_name)

asset_name = "attendance_data_asset"
batching_regex = r"(.*)"
abs_container = "azgxdata"
abs_name_starts_with = ""

data_asset = datasource.add_csv_asset(
    name=asset_name,
    batching_regex=batching_regex,
    abs_container=abs_container,
    abs_name_starts_with=abs_name_starts_with,
)

checkpointName = "attendance_checkpoint"

context.run_checkpoint(checkpoint_name=checkpointName)

Thực thi attendance_evaluate.py:

Upload image

Cũng tương tự như khi dùng Jupyter Notebook và CLI, sau mỗi lần thực thi, ta có thể xem lại kết quả chi tiết được cập nhật ở Data Docs.

  1. Một số lỗi có thể gặp phải:

Trong quá trình chuẩn bị sample code cho phần Python với Azure Storage, mình đã gặp khá nhiều lỗi khác nhau. Trong phần này, mình sẽ chia sẻ một số lỗi và cách sửa của mình (tuy có thể không được hoàn chỉnh và thuyết phục). Trong những bản nâng cấp kế tiếp của GX, hy vọng chúng ta sẽ không gặp phải những lỗi này nữa.

Lỗi cấu hình checkpoint yml:

 __init__() missing 1 required positional argument

Hay là:

'BatchRequest' object has no attribute 'options'

Bạn hãy mở tập tin yml được tạo (theo source code trên thì sẽ là checkpoints/attendance_checkpoint.yml) và bổ sung thêm options: {} như sau:

...
validations:
  - batch_request:
      datasource_name: azure_datasource
      data_asset_name: attendance_data_asset
      options: {}
    expectation_suite_name: attendance_suite
...

(Mình cũng đã cố gắng lưu lại options: {} từ source code nhưng không được)

Lỗi đọc file trong thư mục của Azure Storage:

Keys in tuplefilesystemstorebackend must not contain substrings in ['/', '\\']

Với những file được lưu trong thư mục con trong container của Azure Storage, ta sẽ gặp phải lỗi trên. Theo mình tìm hiểu thì lỗi này xuất phát từ cấu trúc lưu trữ và validate của GX, hiện đã có issue ticket trên GX GitHub về vấn đề tương tự cho Google Cloud Storage (GCS). Hiện tại, với Azure Storage, mình chỉ có thể đọc và xử lý các file lưu trong thư mục gốc của container mà không nằm trong thư mục con nào.

Great Expectations không phải là...

Dù rất tiện lợi và linh hoạt, tuy nhiên Great Expectations cũng không phải là công cụ "all-in-one". Ví dụ như:

  • Great Expectations không phải là framework thiết kế và thực thi pipeline, thay vào đó, GX tích hợp tốt với những công cụ DAG như Airflow, dbt, Prefect,... và đóng vai trò là công cụ sử dụng trong bước kiểm thử dữ liệu trong pipeline
  • Great Expectations không lưu trữ dữ liệu và thực hiện versioning cho dữ liệu, thay vào đó, các công cụ phù hợp cho việc này là DVC, Quilt, lakeFS...
  • Great Expectations hoạt động tốt nhất trong môi trường Python, trong những ecosystem khác, bạn có thể lựa chọn công cụ phù hợp hơn, như assetR trong môi trường R tuần túy, hoặc TFDV trong hệ sinh thái TensorFlow.

Qua bài viết này, mình hy vọng đã giúp các bạn bước đầu làm quen với công cụ Great Expectations. Việc triển khai GX vào trong hệ thống xử lý dữ liệu sẽ còn nhiều khó khăn và thử thách, tuy nhiên, việc kiểm tra chất lượng dữ liệu vẫn là một bước quan trọng không thể bỏ qua.

Atekco - Home for Authentic Technical Consultants