Apache Griffin for Data Validation: Yay & Nay

10 minute read

Published:

In the previous post, I mentioned that there are several observed points regarding Griffin during my exploration.

Before we start, here are several notes.

  • Leveraged docker with image from apachegriffin/griffin_spark2
  • Only batch validation
  • Spark local mode
  • Built Griffin from master branch (0.6.0-SNAPSHOT) because the latest release (0.5.0) had a problem in loading the data sources

Continue.

Yay Points

Yay 1

As you can see in the Data Quality Configuration, Griffin enables the users to specify multiple data connectors. An example of this point is the following.

"data.sources": [
    {
      "name": "src",
      "connector": {
        "type": "file",
        "config": {
          "format": "parquet",
          "paths": []
        }
      }
    },
    {
      "name": "tgt",
      "connector": {
        "type": "file",
        "config": {
          "format": "parquet",
          "paths": []
        }
      }
    }
]

The above example illustrates that we specified two data sources labeled as src and tgt. Both sources are in the parquet format. Well, actually, we can also leverage other data source types with the corresponding connectors, such as hive, kafka, file based (parquet, avro, csv, tsv, text), jdbc based (mysql, postgresql), and custom (cassandra, elasticsearch).

For more information regarding the data connectors, please visit this page.

Yay 2

Griffin supports storing records that don’t meet the constraints (bad records). However, this bad records are only stored in HDFS currently.

To store the bad records, just add the following in the Data Quality Configuration.

"out": [
  {
    "type": "record",
    "name": "file_name"
  }
]

For the sake of clarity, here’s the complete example.

"evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "ACCURACY",
        "out.dataframe.name": "accu",
        "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code",
        "details": {
          "source": "source",
          "target": "target",
          "miss": "miss_count",
          "total": "total_count",
          "matched": "matched_count"
        },
        "out": [
          {
            "type": "metric",
            "name": "accu"
          },
          {
            "type": "record",
            "name": "file_name"
          }
        ]
      }
    ]
  },

The bad records should be stored in hdfs:///griffin/persist. Run the following command to get the full path.

hdfs dfs -ls -R /griffin/persist

Run the following to show the file content.

hdfs dfs -cat <PATH_TO_BAD_RECORDS_FILE>

Yay 3

The next one is that Griffin supports multiple sinks for metrics output. Several examples are console, HDFS, HTTP, MongoDB, and custom sink.

For more information on this sinks please visit this page.

To specify the sinks, just add the following to the Environment Configs.

"sinks": [
    {
      "type": "console",
      "config": {
        "max.log.lines": 100
      }
    }, {
      "type": "hdfs",
      "config": {
        "path": "hdfs:///griffin/streaming/persist",
        "max.lines.per.file": 10000
      }
    }
]

The above example shows that we provide two options for outputting the metrics, namely console and HDFS.

To select the relevant sinks from the specified options, add the following to the Data Quality Configs.

"sinks": [
    "CONSOLE",
    "HDFS"
]

Yay 4

Griffin supports renaming for the metrics’ name. Here’s a simple example.

"details": {
          "source": "src",
          "target": "tgt",
          "miss": "miss_count",
          "total": "total_count",
          "matched": "matched_count"
}

From the above example, we rename miss metric to miss_count. The same also applies for total and matched.

Please visit the rule section for more details.

Yay 5

Griffin supports API service.

Please visit this page for more information.


Nay Points

There are also several limitations for Griffin.

Nay 1

I tried to specify several configs for Uniqueness constraint, such as the following.

"evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "UNIQUENESS",
        "out.dataframe.name": "unique",
        "rule": "first_name",
        "details": {
          "source": "src",
          "target": "tgt",
          "unique": "unique"
        },
        "out": [
          {
            "type": "metric",
            "name": "unique_first_name"
          },
          {
            "type": "record",
            "name": "invalid_unique_first_name"
          }
        ]
      },
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "UNIQUENESS",
        "out.dataframe.name": "unique",
        "rule": "last_name",
        "details": {
          "source": "src",
          "target": "tgt",
          "unique": "unique"
        },
        "out": [
          {
            "type": "metric",
            "name": "unique_last_name"
          },
          {
            "type": "record",
            "name": "invalid_unique_last_name"
          }
        ]
      },
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "UNIQUENESS",
        "out.dataframe.name": "unique",
        "rule": "first_name, last_name",
        "details": {
          "source": "src",
          "target": "tgt",
          "unique": "unique"
        },
        "out": [
          {
            "type": "metric",
            "name": "unique_first_last_name"
          },
          {
            "type": "record",
            "name": "invalid_unique_first_last_name"
          }
        ]
      }
    ]
  }

From the above example, there are three configs for Uniqueness constraint. The first config validates the constraint for column first_name. The second one validates column last_name. The last one validates multiple column of first_name and last_name.

The problem is that Griffin only returns the metrics of the last config, namely for unique_first_last_name.

I checked the metrics stored in HDFS and the result was still the same. However, the bad records stored in HDFS are completely fine for all the configs (invalid_unique_first_name, invalid_unique_last_name, and invalid_unique_first_last_name).

Nay 2

Next, I observed that how Griffin computes the duplication metrics for the Uniqueness constraint.

If you see the rule for uniqueness, there are the following parameters.

dup: the duplicate count name in metric, optional.

num: the duplicate number name in metric, optional.

duplication.array: optional, if set as a non-empty string, 
the duplication metric will be computed, and the group metric name is this string.

From the above description, the duplication metrics are specified via the dup and num. This is how these metrics are computed.

1. dup -> count the number of occurrence for each category.
2. num -> count the number of occurrence for each "dup".

I’m pretty sure that the above doesn’t make you understand. Let’s take a look at an example.

df = {A: [X, Y, X, X, Y, Z, Z, P, P, P]}

dup -> df.groupby(A).count()

result => {X: 3, Y: 2, Z: 2, P: 3}

dup_result => result - 1 => {X: 2, Y: 1, Z: 1, P:2}

========

From "dup_result", we know that there are two categories with one duplicate count (Y & Z) and two categories with two duplicate counts (X & P).

The number of categories becomes the value for "num".

{"duplicate count": "number of categories"} -> {1: 2, 2: 2}

The duplication metrics are returned as the following.

Suppose that duplication.array = "dup_metric"

Then, 

"dup_metric": [
	{
		"dup": 1,
		"num": 2
	},
	{
		"dup": 2,
		"num": 2
	}
]

Well, what’s the concern then?

If you observe the computation approach, it’s obvious that Griffin requires huge memory when the duplication metrics is activated. Should keep this point in mind when choosing to use the duplication metrics.

Nay 3

Bad records for Completeness constraint are not stored intuitively.

Let’s take a look at an example.

Suppose we have the following dataset.

col_a    |    col_b
==========================
"apache" |    "griffin"
""       |    "spark"
"hello"  |    "world"
"batch"  |    ""
"batch"  |    "streaming"
""       |    ""
==========================

We then execute the Completeness constraint to check whether our data has missing values.

To keep it simple, let’s just use a short notation here.

Completeness(col_a) = 4/6
Completeness(col_b) = 4/6
Completeness(col_a, col_b) = 3/6

How about the stored bad records?

Completeness(col_a)

{}
{}

Completeness(col_b)

{}
{}

Completeness(col_a, col_b)

{"col_b": "spark"}
{"col_a": "batch"}
{}

The problem is, how to interpret the above bad records? This is worse for Completeness(col_a) and Completeness(col_b) only return empty dictionaries.

This is how we interpret the bad records for Completeness(col_a, col_b).

`{"col_b": "spark"}` means that a certain row with "col_b" equals to "spark" has missing value for "col_a".
However, we don't know which row it is.

The same rule applies to `{"col_a": "batch"}`.

Nay 4

The point is that we need another alternative approach for other validations, such as value ranges (min, max). It seems that we can’t directly leverage the available constraints for this validation.

Looking at the available constraints, it’s obvious we can’t use Uniqueness, Completeness, or Timeliness. However, we can use Accuracy constraint with a bit engineering.

If you read the documentation, it’s obvious that the Accuracy constraint leverages “LEFT JOIN” as the core computation such as the following (simplified).

We have a source data S and a target data T.
We would like to compare how many records in T matches records in S.

We can use "Accuracy" for this purpose.

1. Get the miss records of T that don't match with S.

joined_table = S LEFT JOIN T ON <ALL_COLUMNS>

Select all records where T.* are NULL (missed records).

2. Count the number of missed records.

Now, let’s get back to our original problem of validating value ranges.

Since “LEFT JOIN” is the core of the Accuracy computation, we need another table to join. For the case of value ranges, we could create a dummy table with two columns (min & max) and a single row like the following.

DUMMY TABLE

MIN	|	MAX
===================
10	|	50

Suppose below is our source table.

VALUE
=====
5
10
20
50
90
100

Now, here’s the part of config for Accuracy validation.

"data.sources": [
    {
      "name": "src",
      "connector": {
        "type": "file",
        "config": {
          "format": "parquet",
          "paths": [path/to/source/table]
        }
      }
    },
    {
      "name": "dummy",
      "connector": {
        "type": "file",
        "config": {
          "format": "parquet",
          "paths": [path/to/dummy/table]
        }
      }
    }
],
"evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "ACCURACY",
        "out.dataframe.name": "accu",
        "rule": "src.VALUE >= dummy.MIN AND src.VALUE <= dummy.MAX",
        "details": {
          "source": "src",
          "target": "dummy"
        },
        "out": [
          {
            "type": "metric",
            "name": "accu"
          },
          {
            "type": "record",
            "name": "invalid_records"
          }
        ]
      }
    ]
}

Let’s take a look at how Griffin retrieves the bad records for the above config and examples.

Get the missed records

SELECT src.* FROM src LEFT JOIN dummy 
ON src.VALUE >= dummy.MIN AND src.VALUE <= dummy.MAX 
WHERE (NOT (src.VALUE IS NULL)) AND (dummy.MIN IS NULL AND dummy.MAX IS NULL)

Left Join Result

VALUE   |   MIN   |   MAX
==========================
5       |   NULL  |   NULL
10      |   10    |   50
20      |   10    |   50
50      |   10    |   50
90      |   NULL  |   NULL
100     |   NULL  |   NULL
==========================

Select Result (missed records) where src.VALUE is NOT NULL and dummy.MIN & dummy.MAX is NULL

VALUE
=====
5
90
100
=====

End.