Cơ chế xử lý linh động trên Azure Databricks

Trong bài viết này, tôi sẽ chia sẻ về cơ chế mà một dự án Big Data đã thực hiện trên Azure Databricks để có được phương pháp xử lí linh hoạt và có thể cập nhật qua cấu hình.

Trong một dự án về Big Data gần đây, tôi phụ trách phần xử lí dữ liệu từ nhiều nguồn dữ liệu khác nhau, mỗi nguồn dữ liệu sẽ có từng quy trình xử lí khác nhau. Điểm mấu chốt đó là các quy trình xử lí này cần có khả năng cập nhật dễ dàng, việc tạo mới quy trình xử lí cần hạn chế tạo tác động tới những luồng đang có, ngoài ra còn có thể có thể cập nhật qua cấu hình.

Với những yêu cầu như trên, cả team quyết định sử dụng Azure Databricks cho Big Data Analytics service. Bên cạnh đó, tập tin cấu hình sẽ dùng định dạng JSON. Việc quan trọng đó là tìm cách nào để kết nối JSON và Databricks với nhau để hệ thống hoạt động linh hoạt và có khả năng cập nhật dễ dàng. Trong bài viết này, tôi sẽ chia sẻ về cơ chế mà dự án đã thực hiện trên Databricks để có được phương pháp xử lí linh hoạt và có thể cập nhật qua cấu hình.

I. Tổng quan Databricks

Azure Databricks là dịch vụ triển khai Databricks trên nền tảng Azure, cung cấp khả năng autoscale, tương tác với các thành viên khác dễ dàng thông qua workspace. Azure Databricks hỗ trợ nhiều ngôn ngữ như Java, Python, Scala… Trong dự án thì tôi sử dụng Scala và Python là ngôn ngữ chính.

Để có thêm nhiều thông tin, các bạn có thể tìm hiểu thêm trong các đường dẫn sau:

II. Quy trình xử lý linh động trong Databricks

Có một cách để hiết kế nền tảng Databricks đó là tạo một notebook cho mỗi data flow. Tuy nhiên cách này lại có một số nhược điểm như:

  • Nếu data flow được bổ sung thường xuyên, đội ngũ phát triển sẽ phải tạo một notebook mới từ đầu.
  • Cập nhật data flow rất tốn thời gian, và không phải flow nào cũng phù hợp với yêu cầu.
  • Dù phát triển các tính năng giống nhau nhưng mỗi developer sẽ có cách làm khác nhau, gây khó khăn trong việc cập nhật

Trong khi đó, yêu cầu đặt ra là thực hiện cơ chế để xử lý cho nhiều luồng dữ liệu, hệ thống cần có tính linh hoạt và có thể cập nhật qua cấu hình để có thể thay đổi luồng xử lí theo nhu cầu. Vì thế, thay vì tạo riêng notebook cho mỗi luồng xử lí, tôi tạo một cơ chế như sau:

Giải thích cơ chế:

  • Input: Tôi tạo một tập tin cấu hình cho mỗi luồng xử lí riêng biệt. Khi dữ liệu của một luồng được xử lí, các bước cụ thể của luồng xử lí sẽ được lấy từ trong tập tin cấu hình này.
  • Coordinate notebook: notebook này có nhiệm vụ download input file, kiểm tra tập tin cấu hình và gọi các feature notebook theo như cấu hình để thực thi luồng xử lí dữ liệu.
  • Feature notebooks: Các notebook này cung cấp nhiều khả năng xử lí dữ liệu khác nhau, và có thể được mở rộng tùy theo nhu cầu trong tương lai.

Tiếp theo, tôi sẽ thực hiện cơ chế mẫu cho 2 feature notebook: format phone number (định dạng số điện thoại) và remove sensitive column (xóa các cột thông tin nhạy cảm). Việc hiện thực hóa được thực hiện bằng Azure Databricks và Azure Storage Blob.

Đầu tiên, tôi tạo các common notebook để tải và ghi tập tin, tương tác với Azure Storage Blob. Download file notebook (scala):

1.  val storageAccountName = "<your_storage_account_name>"  
2.  val storageAccountKey = "<your_storage_account_key>"  
3.  // Get information from parameters
4.  val containerName = dbutils.widgets.get("containerName")  
5.  val fileName = dbutils.widgets.get("fileName")  
6.  val outputFile = dbutils.widgets.get("outputFile")  
7.  val fileType = dbutils.widgets.get("fileType")  
8.    
9.    
10. // Configure connection string
11. // In reality, you should use Key Vault instead of using key directly
12. spark.conf.set("fs.azure.account.key." + storageAccountName + ".blob.core.windows.net", storageAccountKey)  
13.   
14. // Remove the file if it exists  
15. dbutils.fs.rm(outputFile, true)  
16. 
17. // Download csv file
18. if (fileType == "csv") {  
19.     val dataDf = spark.read  
20.     .option("header","true")  
21.     .option("inferSchema", "true")  
22.     .csv("wasbs://" + containerName + "@"+ storageAccountName+ ".blob.core.windows.net/" + fileName)        
23.     dataDf.write.parquet(outputFile)  
24. } else if (fileType == "json") {  
25. // Download json file
26.     val dataDf = spark.read.json("wasbs://" + containerName + "@"+ storageAccountName+ ".blob.core.windows.net/" + fileName)  
27.     dataDf.write.json(outputFile)  
28. } else {  
29. // Download text file
30.     val dataDf = spark.read.textFile("wasbs://" + containerName + "@"+ storageAccountName+ ".blob.core.windows.net/" + fileName)  
31.     
32.     val result = dataDf.collect().mkString("\n")  
33.     dbutils.notebook.exit(result)  
34. }

Write file notebook (python):

1.  val storageAccountName = "<your_storage_account_name>"  
2.  val storageAccountKey = "<your_storage_account_key>"  
3.  // Get information from parameters
4.  dataPath = dbutils.widgets.get("dataPath")  
5.  output_container_name = dbutils.widgets.get("output_container_name")  
6.  outputPath = dbutils.widgets.get("outputPath")  
7.    
8.  # Configure blob storage account access key globally  
9.  # In reality, you should use Key Vault instead of using key directly
10. spark.conf.set("fs.azure.account.key.%s.blob.core.windows.net" % storage_name, storage_key)  
11.   
12. df = spark.read.parquet(dataPath)  
13.   
14. output_container_path = "wasbs://%s@%s.blob.core.windows.net/" % (output_container_name, storage_name)  
15. output_blob_folder = "%stmpFolder" % output_container_path  
16.   
17. # write the dataframe as a single file to blob storage  
18. (df  
19.  .coalesce(1)  
20.  .write  
21.  .mode("overwrite")  
22.  .option("header", "true")  
23.  .format("com.databricks.spark.csv")  
24.  .save(output_blob_folder))  
25.   
26. files = dbutils.fs.ls(output_blob_folder)  
27. output_file = [x for x in files if x.name.startswith("part-")]  
28.   
29. dbutils.fs.mv(output_file[0].path, "%s" % (output_container_path + outputPath))  

Tiếp theo, tôi tạo notebook để chuyển đổi định dạng số điện thoại thành định dạng chuẩn, và xóa những số điện thoại không thể định dạng được.

1.  import scala.util.matching.Regex  
2.  import org.apache.spark.sql.functions._  
3.    
4.  // define function check  
5.  def isPhoneNumberFormat() = udf((data: String) => {  
6.    var pattern: Regex = "^[+]*[(]84[)]\\d{8,10}$".r  
7.    if (pattern.findFirstMatchIn(data).mkString.length() > 0) {  
8.      1 // correct pattern  
9.    } else {  
10.     val data2 = data.replaceAll("[+()-]|\\s", "") // remove  special character  
11.     if (data2.length() > 11 | data2.length() < 10) {  
12.       0 // not a format of phone number  
13.     } else {  
14.       100 // can be corrected  
15.     }  
16.   }  
17. })  
18. // define corrected function  
19. def correctPhoneNumber() = udf((data: String, check: Int) => {  
20.   if (check == 1 | check == 0) {  
21.     data.toString // no need to correct  
22.   } else {  
23.     if (check == 100) {  
24.       // replace first digit to +(84)  
25.       data.replaceAll("[+()-]|\\s", "").replaceFirst("[0-9]", "+(84)").toString  
26.     } else {  
27.       data  
28.     }  
29.   }  
30. })  
31. // read paramter  
32. val dataPath = dbutils.widgets.get("dataPath")  
33. val colName = dbutils.widgets.get("phoneCol")  
34. val keyCol = dbutils.widgets.get("keyCol")  
35. val tmpColumn = dbutils.widgets.get("tmpColumn")  
36.   
37. var df = spark.read.parquet(dataPath)  
38. 
39. // execute check, return data frame with 3 column: key Column, corrected column, check column  
40. df = df.withColumn(tmpColumn, isPhoneNumberFormat()(df(colName)))  
41. df = df.withColumn(colName, correctPhoneNumber()(df(colName), df(tmpColumn)))  
42. .select(keyCol,colName, tmpColumn)  
43. df.show()  
44. 
45. // write to temp file and result  
46. val outputPath = dataPath + System.currentTimeMillis().toString  
47. df.write.parquet(outputPath)  
48. dbutils.notebook.exit(outputPath)  

Tiếp theo, tạo notebook để xóa các cột dữ liệu nhạy cảm.

1.  import org.apache.spark.sql.functions._  
2.  
3.  // read paramter  
4.  val dataPath = dbutils.widgets.get("dataPath")  
5.  val keyCol = dbutils.widgets.get("keyCol")  
6.  val colNameStr = dbutils.widgets.get("column")  
7.    
8.  var df = spark.read.parquet(dataPath)  
9.  val colNameLst = colNameStr.split("//")  
10.   
11. df= df.select(keyCol)  
12. for( w <- 0 to colNameLst.length - 1)   
13.   {   
14.     df= df.withColumn(colNameLst(w), lit(1))  
15.   }  
16. df= df.drop(keyCol)  
17. 
18. // write to temp file and result  
19. val outputPath = dataPath + (current_timestamp()).expr.eval().toString  
20. df.write.parquet(outputPath)  
21. dbutils.notebook.exit(outputPath)

Tiếp theo là tạo coordinate notebook.

1.  import org.apache.spark.sql.functions._  
2.  import org.apache.spark.sql.types.{  
3.      StructType, StructField, StringType, IntegerType}  
4.  import org.apache.spark.sql.Row  
5.  import org.apache.spark.sql.Column  
6.  
7.  
8.  // Configure common feature notebook  
9.  val downloadFileNotebook = "<path_to_download_file_notebook>"  
10. val writeFileNotebook = "<path_to_write_file_notebook>"  
11. val contraintFeaturesFolder = "<path_to_feature_notebook_folder>"  
12. 
13. val tmpFolder = "/tmp/"  
14. 
15. // These values are hard coded here, but you can update to get it from parameters  
16. val dataFile = "user.csv"  
17. val dataFileContainer = "input"    
18. val validationFile = "user-configuration.json"  
19. val validationFileContainer = "configuration"  
20. val keyCol = "User_ID"  
21.   
22.   
23. val hashString = (current_timestamp()).expr.eval().toString  
24. // Download data file  
25. val outputFileData = tmpFolder + "dataTemp" + hashString  
26. dbutils.notebook.run(downloadFileNotebook, 60, Map("containerName" -> dataFileContainer, "fileName" -> dataFile, "outputFile" -> outputFileData, "fileType" -> "csv"))  
27.   
28. //download validation file  
29. val outputFileValidation = tmpFolder + "validation" + hashString  
30. dbutils.notebook.run(downloadFileNotebook, 60, Map("containerName" -> validationFileContainer, "fileName" -> validationFile, "outputFile" -> outputFileValidation, "fileType" -> "json"))  
31.   
32. // Read validation file  
33. val constraintDF = spark.read.json(outputFileValidation)  
34.   
35. // Read data file  
36. var dataDf = spark.read.parquet(outputFileData)  
37. val dataOrginDf = spark.read.parquet(outputFileData)  
38. 
39. //Read constraint check  
40. val conDf = constraintDF.select($"action", $"param.*")  
41.   
42. val tmpColumn = "tmpCheck"  
43.   
44. //create empty failzone dataframe  
45. val schema = StructType(  
46.     StructField(keyCol, StringType, false) :: Nil)  
47.   
48. // var failDF = spark.createDataFrame(sc.emptyRDD[Row], schema)  
49. var constOutputLst = Array[String]()  
50. // execute constraint check  
51. conDf.collect().foreach(row => {  
52.   var paramMap = scala.collection.mutable.Map[String, String]()  
53.   paramMap("dataPath") = outputFileData  
54.   paramMap("keyCol") = keyCol  
55.   paramMap("tmpColumn") = tmpColumn  
56.   // create param map  
57.   for( w <- 1 to conDf.columns.size - 1)   
58.     {   
59.       if (row(w) != null) {  
60.         paramMap(conDf.columns(w)) = row(w).toString  
61.       }  
62.     }  
63.   // execute Check  
64.   val outputPath = dbutils.notebook.run(contraintFeaturesFolder + row(0).toString, 60, paramMap)   
65.   // add output file path to list for process later   
66.   constOutputLst :+= outputPath  
67. })  
68. 
69. var failDF = spark.createDataFrame(sc.emptyRDD[Row], schema)  
70.   
71. //Process constraint output file  
72. constOutputLst.foreach (file => {  
73.   val outDF = spark.read.parquet(file)  
74.   // check to remove column, if returned dataframe not have key Column => remove column  
75.   if (outDF.columns.filter(_ == keyCol).length > 0) {  
76.     // get all column but keycol and tempCheck column  
77.     outDF.columns.filter(_ != keyCol).filter(_ != tmpColumn).foreach(colName => {  
78.       // update or add new column from output file  
79.       val tmp = outDF.select(keyCol, colName)  
80.       dataDf = dataDf.drop(colName).join(tmp, dataDf(keyCol) === tmp(keyCol), "left").drop(tmp(keyCol))  
81.     })  
82.     // add fail data to fail df  
83.     failDF = failDF.union(outDF.filter(col(tmpColumn) === 0).select(keyCol))  
84.   } else {  
85.     // remove column  
86.     outDF.columns.filter(_ != tmpColumn).foreach(colName => {  
87.       dataDf = dataDf.drop(colName)  
88.     })  
89.   }  
90.     
91. })  
92. 
93. //save to correctZone  
94. val correctZoneDF = dataDf.join(failDF, dataDf(keyCol) === failDF(keyCol), "left_anti")  
95. if (correctZoneDF.collect().size > 0) {  
96.   val dataPath = outputFileData + "output" + System.currentTimeMillis().toString  
97.   val output_container_name = "output"  
98.   val outputPath = dataFile  
99.   correctZoneDF.write.parquet(dataPath)  
100.      dbutils.notebook.run(writeFileNotebook, 60, Map("dataPath" -> dataPath, "output_container_name" -> output_container_name, "outputPath" -> outputPath))  
101.    }  
102.    
103.    // Remove temp file if it exists  
104.    dbutils.fs.rm(outputFileValidation, true)  
105.    dbutils.fs.rm(outputFileData, true)  
106.    
107.    // return result check  
108.    dbutils.notebook.exit("1") 

Cuối cùng sẽ là bước kiểm thử luồng xử lí bằng cách upload data file và configuration file vào Azure Blob Storage.

Data file sample:

User_ID,Phone_No,User_Name,Password

24306,303-555-0011,achigeol,8[gxXvQDt9sTQX

65824,225-556-1923,hermathe,Q7#CDYrr?hdxnth6

14506,219-557-3874,stashero,Vq8#upVE7qj9_M+n

71463,215-558-9821,inghthlo,WXzshf8rU^ts8CUN

36808,262-559212-212,adeldona,8@6RvrbJzNg%Dws5

69170,319-660-9832,wdyalbow,r3^T8++f9MhVJe5h

17255,229-661-2134,introsgo,eXH8ENa8J!cd^P4

56940,216-662-8732,burienti,BSZC_vxPgTm^q4J%

52720,210-663-8724,itereart,A$F3Rtnc4b%Rtk

Configuration file sample:

[{"action":"remove-column","param":{"column":"Password"}},{"action":"format-phone-number","param":{"phoneCol":"Phone_No"}}]

Quá trình kiểm thử như sau:

  • Upload data file với tên “user.csv” vào “input” container
  • Upload configuration file với tên “user-configuration.json” vào “configuration” file
  • Tạo “output” container

Những tên này được hard code trong coordinate notebook để phục vụ cho việc demo. Trong thực tế, ta sẽ lấy những giá trị này từ parameter hoặc configuration file. Sau khi đã chuẩn bị các tập tin, ta sẽ chạy coordinate notebook. Output file với tên “user.csv” sẽ được xuất ra trong “output” container. Nội dung kì vọng:

User_ID,User_Name,Phone_No

65824,hermathe,+(84)255561923

24306,achigeol,+(84)035550011

56940,burienti,+(84)166628732

71463,inghthlo,+(84)155589821

17255,introsgo,+(84)296612134

14506,stashero,+(84)195573874

69170,wdyalbow,+(84)196609832

52720,itereart,+(84)106638724

Cột Phone number được chuyển thành định dạng chuẩn, và cột Password được xóa khỏi dữ liệu.

III. Kinh nghiệm cá nhân

Khả năng cập nhật và tái sử dụng (configurable and reusable)

Với cơ chế như trên, các feature có thể được tạo mới hoặc cập nhật riêng biệt. Việc này giúp các feature có thể phát triển riêng biệt. Hơn nữa, một feature notebook có thể được tái sử dụng cho nhiều luồng xử lí khác nhau.

Lợi thế của cơ chế trên:

  • Chỉ cần một tệp cấu hình cho mỗi processing flow mới. Với UI/UX, doanh nghiệp sẽ có thể tự cập nhật/tạo lập các flow.
  • Hệ thống có thể được bổ sung tính năng xử lý mới mà không gây ảnh hưởng tới bất cứ thành phần nào khác. Chi phí phát triển một tính năng cũng rẻ hơn so với việc thực hiện một processing flow hoàn toàn mới.
  • Các tính năng có thể được tái sử dụng cho các processing flow khác.

Trigger Databricks bằng Azure Logic App

Trong trường hợp cần tích hợp với các hệ thống khác, bạn có thể dùng Azure Logic App để trigger Databricks khi một tập tin dữ liệu được upload vào blob container. Các bước như bên dưới:

  • Tạo Databricks Job để publish coordinate notebook, bạn có thể trigger job này để run notebook bằng Azure Databricks REST API
  • Tạo Logic App để gọi Azure Databricks REST API nhằm trigger notebook. Logic App này có thể được trigger khi tập tin dữ liệu upload vào blob container
  • Cấu hình blob container để gọi Logic App khi tập tin dữ liệu được upload

Cải tiến hiệu quả

Theo kinh nghiệm sử dụng Azure Databricks của tôi, việc chạy notebook từ một notebook khác có thể chậm hơn so với xử lí cùng notebook. Đó là vì các notebook cần thời gian khởi động trước khi chạy. Vì thế, nếu có quá nhiều feature trong luồng xử lí, bạn có thể cân nhắc các điểm sau để cải thiện performance:

  • Thay vì dùng notebook riêng rẽ theo feature, bạn có thể tách notebook theo nghiệp vụ (gồm nhiều feature), hoặc mỗi luồng xử lí có một notebook riêng. Tất cả các feature cần thiết đều được hiện thực trong cùng notebook. Cơ chế này có thể làm giảm bớt tính tái sử dụng, nhưng có thể cải thiện performance.
  • Build feature thành các thư viện và import vào coordinate notebook để gọi mà không cần phải thông qua các notebook khác. Mặc dù mỗi lần cập nhật tính năng sẽ cần phải cập nhật lại các thư viện, nhưng nếu nghiệp vụ không yêu cầu update thường xuyên thì cơ chế này phù hợp để cải thiện performance.

Kết lại, dù việc ứng dụng vào thực tiễn có có rất nhiều hạn chế và khó khăn, nhưng tôi hy vọng bạn sẽ tìm thấy vài điểm hữu ích giúp bạn hiện thực được luồng xử lí dữ liệu bằng Databricks.

Atekco - Home for Authentic Technical Consultants