2 Mar 2023
On the occasion of starting work in my new position (distributed processing of nested data), I thought it worth writing a short post about the challenges, my ideas regarding large-scale nested data processing, and Wikidata dump processing. We also have a warm-up project using Spark!
Wikidata is a collaborative and open knowledge graph (KG), well known for its massive size and complicated structure. As of March 2023 it has more than 102 million items and nearly 1.4 billion facts about those items, and these numbers keep growing. Although Wikidata is called a knowledge graph, it is not stored in a native graph DBMS: its backend is a MySQL database and its original dumps are in JSON. As a Linked Open Data dataset, the dumps are also published in RDF.
The Wikidata JSON dump, nearly 105 GB compressed, is the input to several tasks, including subsetting. Almost all practical subsetting tools (WDSub, WDumper, Wikibase Dump Filter, and KGTK) take the JSON.gz dump as input. As a complex, unbalanced, and massive nested structure, the JSON dump is a tempting target for large-scale nested data processing.
Nested data are widely used nowadays. Data transfer in web applications, in particular between the front end and the back end, is almost entirely based on JSON, and NoSQL and hierarchical databases hold nested data on the storage side. So there is a high demand for processing nested data.
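To make the "nested structure" concrete, here is a minimal sketch of streaming such a dump item by item. It assumes the usual layout of the JSON dump (one large array with one entity per line); the helper name `iter_entities` and the tiny in-memory sample are made up for illustration and are not real Wikidata content.

```python
import gzip
import io
import json

def iter_entities(lines):
    """Yield one entity dict per line of a JSON-array dump (one entity per line)."""
    for raw in lines:
        line = raw.strip().rstrip(",")   # each entity line ends with a comma
        if line in ("[", "]", ""):
            continue                     # skip the array brackets and blank lines
        yield json.loads(line)

# Hand-made stand-in for the real dump, compressed in memory.
sample = b'[\n{"id": "Q1", "claims": {"P31": [{}, {}]}},\n{"id": "Q2", "claims": {}}\n]\n'
compressed = gzip.compress(sample)

with gzip.open(io.BytesIO(compressed), "rt", encoding="utf-8") as fh:
    # Count the statements nested under each top-level item.
    statement_counts = {
        e["id"]: sum(len(v) for v in e["claims"].values())
        for e in iter_entities(fh)
    }

print(statement_counts)
```

For the real file one would pass the path of the JSON.gz dump to `gzip.open` instead of the in-memory buffer.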
On the other hand, these data are massive. So, one of the first approaches that come to mind in such a situation is to distribute the processing using distributed platforms like Hadoop and Spark.
But distributed processing on nested data on a large scale has challenges [1]:
The first challenge is the common problem of moving centralized computations onto distributed platforms: the distributed algorithm needs to access parts of the data that are not available at the current worker node.
The traditional solution is to replicate the parts of the data that are most likely to be needed on all worker nodes, but this increases redundancy and requires more storage space, so it is costly.
Another solution is to flatten the nested data and to join the flattened parts back together whenever a needed part is not available locally. But flattening can also reduce accuracy: some parts may be missed in such joins, so there is a chance of errors.
A more recent solution from [1] is a compilation framework, very similar to a query optimization approach, which performs the joins as close to the top level as possible.
When querying or subsetting the Wikidata dump, we are quite likely to face this challenge. The main reason is the linked nature of the dump: the top-level blobs (the items) are connected to each other through statements, so a query may well need items that are not available at the current worker node.
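A small sketch of what flattening and re-joining look like, and of how a join can silently miss data. The records and field names below are hypothetical; the point is only that an item with no statements disappears from a plain inner join.

```python
# Hypothetical nested records: each item holds a list of statements.
items = [
    {"id": "Q1", "label": "a", "statements": [{"prop": "P31", "value": "Q5"},
                                              {"prop": "P21", "value": "Q6"}]},
    {"id": "Q2", "label": "b", "statements": []},
]

# Flatten into two flat relations linked by the item id.
item_rows = [(it["id"], it["label"]) for it in items]
statement_rows = [
    (it["id"], st["prop"], st["value"])
    for it in items
    for st in it["statements"]
]

# A plain inner join drops Q2, because it has no matching statement row.
inner_join = [
    (item_id, label, prop, value)
    for item_id, label in item_rows
    for st_item, prop, value in statement_rows
    if st_item == item_id
]
joined_ids = {row[0] for row in inner_join}

# Rebuilding from the item side (an outer-join style) keeps every item.
by_item = {}
for st_item, prop, value in statement_rows:
    by_item.setdefault(st_item, []).append({"prop": prop, "value": value})
rebuilt = [
    {"id": item_id, "label": label, "statements": by_item.get(item_id, [])}
    for item_id, label in item_rows
]

print(joined_ids, rebuilt == items)
```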
The second challenge arises when the number of top-level tuples is low but the amount of data in their nested parts (lower-level tuples) is high. Because the data are distributed by top-level tuple, having few of them limits the degree of parallelism, which reduces efficiency.
The traditional solution for this problem is again to flatten the nested data, with the same risk of errors and a lot of redundancy. Another limitation is that we lose the nested structure of intermediate results: when the output of one query is the input of another, we may need this intermediate output to be nested as well.
The solution of Smith et al. [1] for this problem is called the Shredding Framework, an optimized flattening algorithm that creates concise outputs. Shredding is applied to both the data and the query, and it is based on mapping the inner nested parts to labels and using pointers to these labels.
The third challenge is skew: imbalances in the amount of data in the nested blobs raise performance issues, because some parts take longer to process than others.
This is a real problem in the context of Wikidata JSON dumps. For example, while some items have 10 statements, others have more than 200. At the second level, some statements have one reference while others have more than 20.
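The label idea can be illustrated with a toy sketch. This is only my simplified reading of "labels and pointers", not the framework from [1]; `shred`, `unshred`, and the sample items are hypothetical names and data.

```python
def shred(items):
    """Split nested items into a flat top level plus a label -> inner bag dictionary."""
    top, dictionary = [], {}
    for it in items:
        label = ("statements", it["id"])   # the label acts as a pointer to the inner bag
        top.append({"id": it["id"], "statements": label})
        dictionary[label] = list(it["statements"])
    return top, dictionary

def unshred(top, dictionary):
    """Follow the labels to restore the nested form."""
    return [{"id": row["id"], "statements": dictionary[row["statements"]]} for row in top]

items = [
    {"id": "Q1", "statements": [{"prop": "P31", "refs": 2}, {"prop": "P21", "refs": 0}]},
    {"id": "Q2", "statements": [{"prop": "P31", "refs": 1}]},
]

top, dictionary = shred(items)

# A query over the inner level only needs the dictionary, which is flat
# and can be partitioned independently of the (few) top-level tuples.
statement_counts = {label[1]: len(bag) for label, bag in dictionary.items()}

print(top)
print(statement_counts)
```

Unlike plain flattening, the nested form can still be recovered for intermediate results by following the labels with `unshred`.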
The approach proposed in [1] for dealing with this problem in nested data is the Skew-resilient Processing framework, which has three steps:
1. Identify the heavy nodes, i.e., find the blobs that hold more data than a given threshold.
2. Create the Skew Triple, which separates the heavy blobs from the light ones.
3. Apply separate processing plans to the light and heavy blobs. For heavy blobs, there is a second level of distribution.
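The three steps above can be sketched in plain Python as follows. This is an illustration under my own assumptions, not the algorithm from [1]: the function names, the threshold, the round-robin "second level of distribution", and the three values returned by `split_by_skew` are all hypothetical choices.

```python
from collections import Counter

def split_by_skew(records, key, threshold):
    """Steps 1 and 2: find heavy keys, then separate light and heavy records."""
    counts = Counter(key(r) for r in records)
    heavy_keys = {k for k, n in counts.items() if n > threshold}
    light = [r for r in records if key(r) not in heavy_keys]
    heavy = [r for r in records if key(r) in heavy_keys]
    return light, heavy, heavy_keys

def plan_tasks(light, heavy, key, n_sub):
    """Step 3: one task per light key; heavy keys are spread over n_sub sub-tasks."""
    tasks = {}
    for r in light:
        tasks.setdefault((key(r), 0), []).append(r)
    for i, r in enumerate(heavy):
        tasks.setdefault((key(r), i % n_sub), []).append(r)
    return tasks

# Made-up (statement, reference) pairs: S1 is heavy, S2 and S3 are light.
references = (
    [("S1", f"ref{i}") for i in range(5)]
    + [("S2", "ref0")]
    + [("S3", "ref0"), ("S3", "ref1")]
)

light, heavy, heavy_keys = split_by_skew(references, key=lambda r: r[0], threshold=3)
tasks = plan_tasks(light, heavy, key=lambda r: r[0], n_sub=2)
largest_task = max(len(v) for v in tasks.values())

print(heavy_keys, largest_task)
```

Without the split, the largest task would hold all five records of S1; with it, no task holds more than three.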
I have two ideas:
The first one is to study the Wikidata dump as a use case for the available heuristics. Wikidata dumps share many features with the nested data sets these heuristics target: they are JSON-based and massive in size, query time is important and challenging, subsetting is a current open problem, and the data are very skewed and also interconnected.
So in theory, I think the nested data algorithms and heuristics can be applied to the distributed processing of Wikidata, but we also need some modifications. For example, we need to adapt the algorithms to the context of graphs, and queries should be expressed in SPARQL.
The second suggestion is the other way around: using graph representations to help nested data processing, i.e., using graph triples to flatten the data. We would then need to create a mapping from the initial queries to SPARQL. There are several approaches for mapping nested structures (like JSON) to RDF. In that case, we will have some redundancy (maybe a lot), but we might get good performance because the RDF structure is very simple.
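As a rough sketch of the second idea, a nested JSON document can be flattened into (subject, predicate, object) triples. The function below is a naive illustration I made up, not one of the existing JSON-to-RDF mapping approaches and not Wikidata's own RDF vocabulary; nested objects get blank-node-like ids, and list order is lost.

```python
import itertools

def json_to_triples(doc, root):
    """Flatten a nested dict into (subject, predicate, object) triples."""
    triples = []
    counter = itertools.count()

    def visit(subject, predicate, value):
        if isinstance(value, dict):
            node = f"_:b{next(counter)}"          # blank-node-like id for the nested object
            triples.append((subject, predicate, node))
            for k, v in value.items():
                visit(node, k, v)
        elif isinstance(value, list):
            for v in value:                        # each element becomes its own triple
                visit(subject, predicate, v)
        else:
            triples.append((subject, predicate, value))

    for k, v in doc.items():
        visit(root, k, v)
    return triples

# Hypothetical item with one statement that carries two references.
doc = {"id": "Q1", "claims": [{"prop": "P31", "value": "Q5", "refs": ["r1", "r2"]}]}
triples = json_to_triples(doc, root="Q1")

for t in triples:
    print(t)
```

Every triple has the same flat shape, which is what makes this representation easy to partition, at the cost of the extra linking triples.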
One of the challenges in both directions is that the Spark GraphX platform is not well maintained as far as I know. So we might need to think of a different distribution approach.
I implemented a very simple experiment: computing an average, distributed across three worker machines on a LAN using Spark. To do this, I used one of the large RQSS_Extractor output files from the Gene Wiki subset, which can be downloaded from zenodo.org/record/7336208 (download GeneWiki_rqss_extractor_output.zip and use statement_node_ref_num.data).
This file is ~5 GB in size and contains 63,521,697 records. I added a header record (statementNode,score) as the first line and changed the file extension to .csv. The part we are dealing with is the second column (score), and we want to get the average of this column.
A simple non-distributed program like the one below, which calculates the average, takes ~150 seconds on my machine:
import pandas as pd
from datetime import datetime
start=datetime.now()
data = pd.read_csv("file:///scratch/spark-test/rqss_extractor_output/statement_node_ref_num.csv")
print("The average score is {:5f}".format(data['score'].mean()))
end=datetime.now()
print('time: ',(end-start).total_seconds())
Then, I downloaded Spark standalone from this link: https://www.apache.org/dyn/closer.lua/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
I decompressed the binaries and put them on four Linux machines connected over a LAN, calling them Linux01, Linux02, Linux03, and Linux04.
Linux01 is the master and the others are the workers. So on Linux01, I ran the following command:
spark-3.3.2-bin-hadoop3/sbin/start-master.sh
Linux01 is now my master node, with the URL spark://linux01:7077. Then I ran the following on the other machines:
spark-3.3.2-bin-hadoop3/sbin/start-worker.sh spark://linux01:7077
I also installed PySpark on Linux01.
Then I wrote the following code using PySpark (thanks to https://www.projectpro.io/recipes/perform-descriptive-statistics-on-columns-of-dataframe-pyspark):
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
spark = SparkSession.builder.appName('score_average').getOrCreate()
df = spark.read.load(
    "file:///scratch/spark-test/rqss_extractor_output/statement_node_ref_num.csv",
    format="csv", sep=",", inferSchema="true", header="true")
df.select("score").describe().show()
Then I used the following command to submit the program to my small Spark cluster:
./spark-3.3.2-bin-hadoop3/bin/spark-submit --master spark://linux01:7077 --deploy-mode client score_average.py
This time, I get the average (mean), along with the count, standard deviation, min, and max, in less than 35 seconds!
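An average distributes well because each worker only has to report a partial sum and a count for its own partition. The sketch below shows that idea in plain Python with made-up scores and three pretend workers; it is a conceptual illustration, not Spark's actual implementation.

```python
def partial_stats(chunk):
    """What each worker would compute on its own partition: (sum, count)."""
    return sum(chunk), len(chunk)

def combine(partials):
    """What the driver would do: merge the partial sums and counts."""
    total = sum(s for s, _ in partials)
    count = sum(n for _, n in partials)
    return total / count

scores = [0, 1, 1, 3, 2, 0, 5, 4]                      # made-up scores
partitions = [scores[0:3], scores[3:6], scores[6:8]]   # three pretend workers

partials = [partial_stats(p) for p in partitions]
distributed_mean = combine(partials)

# Averaging the per-partition means would be wrong when partitions differ in size.
mean_of_means = sum(s / n for s, n in partials) / len(partials)

print(distributed_mean, mean_of_means)
```

Carrying the counts along with the sums is what keeps the combined result equal to the centralized mean.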
--------------------------------
[1] Smith, J., Benedikt, M., Nikolic, M., & Shaikhha, A. (2020). Scalable querying of nested data. Proceedings of the VLDB Endowment, 14(3), 445-457.