Apache NiFi 2.0 lột xác với nhiều đổi mới quan trọng

Sau một thời gian dài thử nghiệm, Apache NiFi 2.0 đã được phát hành chính thức với nhiều cải tiến, đáng chú ý nhất là khả năng hỗ trợ xây dựng các bộ xử lý dữ liệu bằng Python.

Apache NiFi là một nền tảng mã nguồn mở nổi tiếng của Apache chuyên dùng cho việc tự động hóa việc vận chuyển dữ liệu giữa các hệ thống phần mềm, có khả năng tùy biến cao cũng như đạt được độ ổn định cần thiết nhờ phát triển qua thời gian dài.

Tuy nhiên do sử dụng kiến trúc cũ (được phát hành vào năm 2006), NiFi 1.x cũng gặp một số vấn đề nội tại về thiết kế cũng như hiệu năng. NiFi 1.x hiện vẫn sử dụng phiên bản Java 8 trong khi phiên bản này đã kết thúc vòng đời hỗ trợ từ lâu. Ngoài ra, NiFi 1.x cũng phụ thuộc vào công nghệ lỗi thời (cụ thể là ZooKeeper) khiến việc triển khai trên nền tảng này tốn nhiều thời gian, kém hiệu quả.

Vì vậy Apache đã quyết định thay đổi toàn diện NiFi để đáp ứng tốt hơn với xu hướng hiện tại. Sau một thời gian dài thử nghiệm, NiFi đã ra mắt phiên bản GA 2.0 với nhiều điểm nổi trội vào tháng 11 vừa qua.

Upload image

Apache NiFi 2.0 có gì mới?

Dưới đây là một số điểm đổi mới quan trọng của NiFi 2.0:

Hỗ trợ Python: NiFi 2.0 đã cho phép các lập trình viên sử dụng Python để xây dựng các bộ xử lý tùy biến để biến đổi dữ liệu. Đây có thể được xem là tính năng đáng mong chờ nhất trong phiên bản NiFi mới.

Hỗ trợ Kubernetes: Phiên bản mới cho phép chạy NiFi 2.0 như một ứng dụng độc lập trong môi trường K8s cũng như xóa bỏ phụ thuộc vào ZooKeeper trong lúc triển khai.

Làm mới giao diện: Giao diện được làm lại theo hướng hiện đại hơn cũng như hỗ trợ chế độ màn hình tối.

Nâng cấp phiên bản Java: NiFi 2 ngừng sử dụng Java 8 và yêu cầu sử dụng tối thiểu Java 17. Hiện tại Java 8 đã hết vòng đời sản phẩm từ lâu nên đội dự án quyết định nâng phiên bản tối thiểu là Java 17 cũng như sẵn sàng nâng cấp lên các phiên bản mới hơn như JDK 21.

Thay thế một số thành phần quan trọng: Các bộ xử lý Kafka 2 sẽ bị loại bỏ hoàn toàn để thay thế thành Publish/Consume Kafka 3. Một số bộ xử lý khác cũng bị loại bỏ như HBase và Hive.

Kiến trúc tổng quan

Upload image

Kiến trúc chung của NiFi bao gồm các thành phần sau:

FlowFile (File lưu lượng): Đây là thành phần cơ bản của NiFi, bao gồm các file dữ liệu gốc cùng với các siêu dữ liệu đi kèm để mô tả dữ liệu. Các file này có thể lưu trữ dưới dạng text hoặc các dạng khác như hình ảnh, âm thanh, video, ...

Web Server (Máy chủ web): Giao diện web dựa trên phương thức HTTP cho phép người dùng dễ dàng tương tác với NiFi.

Flow Controller (Bộ điều khiển lưu lượng): Được ví như bộ não của NiFi, đây thành phần cốt lõi cung cấp tài nguyên cũng như quản lý môi trường thực thi cho các bộ xử lý tác vụ cũng như các thành phần mở rộng khác.

Extension (Tiện ích mở rộng): Cho phép tùy chỉnh thêm các chức năng của NiFi, ví dụ như khả năng xử lý tùy biến bằng Python.

FlowFile Repository (Kho lưu trữ thông tin): Nơi lưu trữ toàn bộ thông tin trạng thái của các FlowFile.

Content Repository (Kho lưu trữ nội dung): Nơi lưu trữ thông tin nội dung thực tế của các FlowFile dưới dạng file.

Provenance Repository (Kho lưu trữ xuất xứ): Nơi lưu trữ tất cả thông tin sự kiện của NiFi cũng như cho phép truy xuất nguồn gốc của các sự kiện đó.

Xây dựng bộ xử lý tùy chỉnh với NiFi 2.0

Tiếp theo chúng ta sẽ sử dụng Python để xây dựng một bộ xử lý tùy chỉnh đơn giản và cài đặt lên NiFi 2.0.

Cài đặt NiFi

  • Cài đặt Open JDK 21. Đối với Windows, bạn có thể sử dụng WinGet để cài đặt đơn giản bằng câu lệnh sau: winget install Microsoft.OpenJDK.21
  • Download và giải nén Apache NiFi 2.0 bản tiêu chuẩn tại đây
  • Cài đặt Python 3: winget install Python.Python.3.10

Khởi chạy NiFi

  • Vào thư mục cài đặt NiFi và chạy lệnh bin/nifi.cmd start
  • Sau khi khởi tạo, bạn vào địa chỉ https://localhost:8443/nifi để kiểm tra

Upload image

  • Username/Password sẽ được tạo tự động trong quá trình khởi chạy. Bạn có thể vào file logs/nifi-app.log để lấy.

Upload image

  • Sau khi đăng nhập, bạn sẽ thấy giao diện Cavas mặc định của NiFi. Với NiFi 2.0, chúng ta có thể tùy chọn chế độ hiển thị sáng/tối tùy theo sở thích.

Viết phần xử lý bằng Python

Sau khi cài đặt thành công NiFi 2.0, chúng ta cùng tạo một file Python WriteHelloWorld.py đơn giản như sau:

from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import ProcessContext

class WriteHelloWorld(FlowFileTransform):
  class Java:
      implements = ['org.apache.nifi.python.processor.FlowFileTransform']

  class ProcessorDetails:
      version = '0.0.1-SNAPSHOT'
      tags = ["hello"]
 
  def __init__(self, **kwargs):
      super().__init__()

  def transform(self, context: ProcessContext, flowfile) -> FlowFileTransformResult:
      return FlowFileTransformResult(relationship = "success", contents = "Hello World", attributes = {"greeting": "hello"})  

Trong file WriteHelloWorld.py ở trên, chúng ta đã định nghĩa một lớp WriteHelloWorld để thực thi lớp FlowFileTransform có sẵn của NiFi. Sau đó, chúng ta chúng ta tạo một hàm transform đơn giản và trả về kết quả là một đối tượng FlowFileTransformResult.

Sau khi tạo xong file WriteHelloWorld.py, chúng ta sẽ vào file conf/nifi.properties, tìm đến dòng #nifi.python.command=python3 và thêm nifi.python.command=python để NiFi có thể chạy được các bộ mở rộng Python.

Sau đó chúng ta chép file WriteHelloWorld.py vào thư mục python/extensions (nếu chưa có thì tạo mới) và khởi động lại NiFi.

Sau khi khởi động xong, vào lại giao diện web chúng ta sẽ thấy phần mở rộng Python vừa tạo đã được nạp vào NiFi:

Upload image

Tiếp theo chúng ta có thể sử dụng bộ xử lý mới này một cách bình thường và nhận kết quả trả về là một chuỗi Hello World:

Upload image

Upload image

Như vậy, chúng ta đã tạo một bộ xử lý tùy chỉnh đơn giản trên NiFi bằng Python, tuy nhiên nó chưa được hữu ích lắm khi chỉ in ra một chuỗi Hello World cố định. Tiếp theo, chúng ta sẽ tạo một file Python GetWikiData.py khác phức tạp hơn để lấy dữ liệu từ một trang Wiki như sau:

import json
import re
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import PropertyDescriptor, StandardValidators, ExpressionLanguageScope

class GetWikiData(FlowFileTransform):
  class Java:
      implements = ['org.apache.nifi.python.processor.FlowFileTransform']

  class ProcessorDetails:
      version = '0.0.1-SNAPSHOT'
      description = 'Get a Wiki Article'
      dependencies = ['wikipedia-api']
      tags = ['wikipedia', 'article', 'wiki', 'text']

  FORMAT = PropertyDescriptor(
      name="Wiki format in plant text or html",
      description="Wiki format",
      default_value="text",
      allowable_values=["text", "html"],
      required=True
  )

  WIKIPAGE = PropertyDescriptor(
      name="Wiki Page",
      description='Name of a wiki page',
      required=True,
      validators=[StandardValidators.NON_EMPTY_VALIDATOR],
      expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
  )

  property_descriptors = [
      WIKIPAGE,
      FORMAT
  ]

  def __init__(self, **kwargs):
      super().__init__()
      self.property_descriptors.append(self.WIKIPAGE)
      self.property_descriptors.append(self.FORMAT)

  def getPropertyDescriptors(self):
      return self.property_descriptors

  def transform(self, context, flowfile):
      import wikipediaapi
      
      page = context.getProperty(self.WIKIPAGE).evaluateAttributeExpressions(flowfile).getValue()
      format = context.getProperty(self.FORMAT).evaluateAttributeExpressions(flowfile).getValue()

      attributes = {"format": format, "wikipage": page}

      if (format != None):
          if (format == "text"):
              wiki_wiki = wikipediaapi.Wikipedia(user_agent='NiFi Python Demo',language='en',extract_format=wikipediaapi.ExtractFormat.WIKI)
          else: 
              wiki_wiki = wikipediaapi.Wikipedia(user_agent='NiFi Python Demo',language='en',extract_format=wikipediaapi.ExtractFormat.HTML)

      if (page != None):
          results = wiki_wiki.page(page)
          attributes["results"] = results.text

      return FlowFileTransformResult(relationship = "success", contents=results.text, attributes=attributes)

Trong file GetWikiData.py ở trên, chúng ta khai báo sử dụng thư viện wikipedia-api bằng cách thêm vào phần dependencies trong hàm ProcessorDetails. Ngoài ra chúng ta cũng định nghĩa 2 thuộc tính WIKIPAGEFORMAT để lấy dữ liệu từ trang wiki chọn trước. Sau khi thêm file này vào thư mục python/extensions, chúng ta có thể sử dụng trên NiFi như sau:

Upload image

Upload image

Upload image

Với việc phát hành phiên bản GA 2.0, Apache NiFi đã lột xác hoàn toàn và trở nên mạnh mẽ hơn bao giờ hết. Cùng với việc hỗ trợ xây dựng các bộ xử lý bằng Python, NiFi trở nên thân thiện hơn với các lập trình viên Python cũng như các chuyên gia xử lý dữ liệu lớn. Tuy nhiên, do NiFi 2.0 đã loại bỏ một số thành phần quan trọng nên việc nâng cấp từ NiFi 1.x lên NiFi 2.0 có thể sẽ gặp một số khó khăn về mặt tương thích.

Tham khảo

Apache NiFi

NiFi Python Developer's Guide

NiFi 2.0 Release Goals

Python script in NiFi

Atekco - Home for Authentic Technical Consultants