from kgdata.dataset import Dataset
from kgdata.splitter import split_a_list
from kgdata.wikidata.datasets.entity_ids import entity_ids
from kgdata.wikidata.datasets.entity_redirections import entity_redirections
import orjson
from typing import List
from kgdata.wikidata.models import WDProperty
from kgdata.spark import does_result_dir_exist, get_spark_context, saveAsSingleTextFile
from kgdata.wikidata.config import WDDataDirCfg
from kgdata.wikidata.datasets.entities import entities
from kgdata.wikidata.datasets.classes import build_ancestors
from kgdata.wikidata.models.wdentity import WDEntity
import serde.jl
def properties(lang="en") -> Dataset[WDProperty]:
    """Build (if needed) and return the dataset of Wikidata properties.

    The pipeline runs in stages. Each stage writes its output under
    ``cfg.properties`` and is skipped when that output already exists, so an
    interrupted run resumes from the first missing stage:

    1. ``ids/`` - distinct ids of every property referenced by any entity.
    2. ``properties/`` - the entities whose id is in ``ids/``, converted to
       :class:`WDProperty` dicts.
    3. ``unknown_properties.txt`` - ids from ``ids/`` that are neither in
       ``entity_ids()`` nor the first field of an ``entity_redirections()``
       record.
    4. ``ancestors/`` - each property's parents and the ancestors computed
       from them by ``build_ancestors``.
    5. ``full_properties/`` - the properties of stage 2 with ``ancestors``
       filled in from stage 4.

    Args:
        lang: language forwarded to ``entities(lang)``.

    Returns:
        A Dataset over ``full_properties/*.gz`` whose lines deserialize to
        WDProperty objects.
    """
    cfg = WDDataDirCfg.get_instance()

    if not does_result_dir_exist(cfg.properties / "ids"):
        _save_property_ids(cfg, lang)
    if not does_result_dir_exist(cfg.properties / "properties"):
        _save_property_records(cfg, lang)
    if not (cfg.properties / "unknown_properties.txt").exists():
        _save_unknown_property_ids(cfg)
    if not does_result_dir_exist(cfg.properties / "ancestors"):
        _save_property_ancestors(cfg)
    if not does_result_dir_exist(cfg.properties / "full_properties"):
        _save_full_properties(cfg)

    return Dataset(
        cfg.properties / "full_properties/*.gz",
        deserialize=lambda x: WDProperty.from_dict(orjson.loads(x)),
    )


def _save_property_ids(cfg: WDDataDirCfg, lang: str) -> None:
    """Stage 1: write the distinct property ids referenced by any entity to ``ids/``."""
    (
        entities(lang)
        .get_rdd()
        .flatMap(get_property_ids)
        .distinct()
        # Shuffle into 128 partitions, i.e. 128 gzip part files.
        .coalesce(128, shuffle=True)
        .saveAsTextFile(
            str(cfg.properties / "ids"),
            compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
        )
    )


def _save_property_records(cfg: WDDataDirCfg, lang: str) -> None:
    """Stage 2: write every entity whose id is in ``ids/`` as a WDProperty dict to ``properties/``."""
    sc = get_spark_context()
    # The id set is collected on the driver and broadcast, so each executor
    # receives one read-only copy instead of one per task.
    prop_ids = sc.broadcast(
        set(sc.textFile(str(cfg.properties / "ids/*.gz")).collect())
    )
    (
        entities(lang)
        .get_rdd()
        .filter(lambda ent: ent.id in prop_ids.value)
        .map(lambda x: WDProperty.from_entity(x).to_dict())
        .map(orjson.dumps)
        .coalesce(128, shuffle=True)
        .saveAsTextFile(
            str(cfg.properties / "properties"),
            compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
        )
    )


def _save_unknown_property_ids(cfg: WDDataDirCfg) -> None:
    """Stage 3: write ids from ``ids/`` that match no entity id and no redirection key."""
    saveAsSingleTextFile(
        get_spark_context()
        .textFile(str(cfg.properties / "ids/*.gz"))
        .subtract(entity_ids().get_rdd())
        # NOTE(review): x[0] is presumably the redirect's source id - confirm.
        .subtract(entity_redirections().get_rdd().map(lambda x: x[0])),
        cfg.properties / "unknown_properties.txt",
    )


def _save_property_ancestors(cfg: WDDataDirCfg) -> None:
    """Stage 4: write each property's parents, then its ancestors, under ``ancestors/``."""
    sc = get_spark_context()
    id2parents_file = cfg.properties / "ancestors/id2parents.ndjson.gz"
    saveAsSingleTextFile(
        sc.textFile(str(cfg.properties / "properties/*.gz"))
        .map(orjson.loads)
        .map(WDProperty.from_dict)
        .map(lambda x: (x.id, x.parents))
        .map(orjson.dumps),
        str(id2parents_file),
        compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
    )

    # The ancestor computation runs on the driver over one (id, parents)
    # entry per property.
    id2parents = {k: v for k, v in serde.jl.deser(id2parents_file)}
    id2ancestors = build_ancestors(id2parents)
    split_a_list(
        [orjson.dumps(x) for x in sorted(id2ancestors.items())],
        cfg.properties / "ancestors/id2ancestors/part.ndjson.gz",
    )
    # Touched only after both outputs are written, so a crash mid-stage
    # leaves no success marker. NOTE(review): presumably this marker is what
    # does_result_dir_exist checks - confirm.
    (cfg.properties / "ancestors" / "_SUCCESS").touch()


def _save_full_properties(cfg: WDDataDirCfg) -> None:
    """Stage 5: join properties with their ancestors and write them to ``full_properties/``."""
    sc = get_spark_context()
    # Records are [id, ancestors] pairs, usable as (key, value) by join.
    ancestors_rdd = sc.textFile(
        str(cfg.properties / "ancestors/id2ancestors/*.gz")
    ).map(orjson.loads)

    def merge_ancestors(record):
        # record is (prop_id, (prop, ancestors)) as produced by RDD.join.
        _prop_id, (prop, ancestors) = record
        prop.ancestors = set(ancestors)
        return prop

    (
        sc.textFile(str(cfg.properties / "properties/*.gz"))
        .map(orjson.loads)
        .map(WDProperty.from_dict)
        .map(lambda x: (x.id, x))
        # Inner join: properties with no id2ancestors entry are dropped.
        .join(ancestors_rdd)
        .map(merge_ancestors)
        .map(WDProperty.to_dict)
        .map(orjson.dumps)
        .coalesce(128, shuffle=True)
        .saveAsTextFile(
            str(cfg.properties / "full_properties"),
            compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
        )
    )
def get_property_ids(ent: WDEntity) -> List[str]:
    """Collect the ids of all properties referenced by ``ent``.

    The result always contains every key of ``ent.props`` and every qualifier
    key found on its statements. When ``ent`` is itself a property, it also
    contains the property's own id and the entity-id values of its P1647
    ("subproperty of") statements.

    Returns:
        The ids without duplicates, in no particular order.
    """
    found = set()

    if ent.type == "property":
        found.add(ent.id)
        # P1647 = "subproperty of": entity-id values point at parent properties.
        for statement in ent.props.get("P1647", []):
            target = statement.value
            if target.is_entity_id(target):
                found.add(target.as_entity_id())

    # Properties used directly as statement keys, plus those used as qualifiers.
    found.update(ent.props.keys())
    found.update(
        qualifier_id
        for statements in ent.props.values()
        for statement in statements
        for qualifier_id in statement.qualifiers.keys()
    )
    return list(found)