Source code for kgdata.wikidata.datasets.property_domains

from typing import Dict, List, Set, Tuple, Union

import orjson
import sm.misc as M
from kgdata.dataset import Dataset
from kgdata.spark import does_result_dir_exist, get_spark_context, saveAsSingleTextFile
from kgdata.splitter import split_a_list
from kgdata.wikidata.config import WDDataDirCfg
from kgdata.wikidata.datasets.classes import build_ancestors
from kgdata.wikidata.datasets.entities import entities, ser_entity
from kgdata.wikidata.datasets.entity_ids import entity_ids
from kgdata.wikidata.datasets.entity_redirections import entity_redirections
from kgdata.wikidata.models import WDProperty
from kgdata.wikidata.models.wdentity import WDEntity


[docs]def property_domains(lang="en") -> Dataset[Tuple[str, Dict[str, int]]]: """Extract the domains of a property. NOTE: it does not returns children of a domain class but only the class that appears in the statement with the property. For example, consider the statement Peter - age - 50. the direct domain is the class Human, and we don't include class Men, which is a child of the class Human. """ cfg = WDDataDirCfg.get_instance() if not does_result_dir_exist(cfg.property_domains): ( entities(lang) .get_rdd() .flatMap(get_property_domains) .reduceByKey(merge_counters) .coalesce(256) .map(orjson.dumps) .saveAsTextFile( str(cfg.property_domains), compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec", ) ) return Dataset(cfg.property_domains / "*.gz", deserialize=orjson.loads)
[docs]def merge_counters(a: Dict[str, int], b: Dict[str, int]): out = a.copy() for k, v in b.items(): if k not in out: out[k] = 0 out[k] += v return out
[docs]def get_property_domains(ent: WDEntity) -> List[Tuple[str, Dict[str, int]]]: instanceof = "P31" subclass_of = "P279" subproperty_of = "P1647" ignored_props = {instanceof, subclass_of, subproperty_of} domains = { stmt.value.as_entity_id_safe(): 1 for stmt in ent.props.get(instanceof, []) } lst = {} for prop, stmts in ent.props.items(): if prop in ignored_props: continue lst[prop] = domains for stmt in stmts: for qid in stmt.qualifiers.keys(): lst[qid] = domains return list(lst.items())