Source code for kgdata.wikidata.datasets.entity_ids

from operator import itemgetter

from kgdata.spark import (
    does_result_dir_exist,
    get_spark_context,
    head,
    saveAsSingleTextFile,
)
from kgdata.splitter import default_currentbyte_constructor
from kgdata.wikidata.config import WDDataDirCfg
from kgdata.wikidata.datasets.entity_dump import entity_dump
from kgdata.dataset import Dataset
from kgdata.wikidata.models.wdentity import WDEntity
from sm.misc.funcs import identity_func
from tqdm import tqdm


def is_entity_id(id: str) -> bool:
    """Check if id is a Wikidata entity id.

    An entity id is one of the prefix letters ``Q``, ``P`` or ``L``
    followed by one or more ASCII decimal digits (e.g. ``Q42``, ``P31``).

    The implementation of the function is verified using the `entity_ids`
    dataset.

    Args:
        id: the candidate string. The name shadows the builtin, but it is
            kept because it is part of the public signature.

    Returns:
        True when ``id`` has the shape described above, False otherwise
        (including for the empty string and for a bare prefix letter).
    """
    # Slicing (rather than indexing) makes the empty string fall through
    # to False without a separate length check.
    prefix, number = id[:1], id[1:]
    # `str.isdigit` alone also accepts non-ASCII digit characters such as
    # superscripts or Arabic-Indic digits, hence the `isascii` guard.
    return prefix in ("Q", "P", "L") and number.isascii() and number.isdigit()
def entity_ids() -> Dataset[str]:
    """Get Wikidata entity ids.

    Up to three artifacts are created under ``cfg.entity_ids``, each one
    only when it does not exist yet:

    * ``ids/`` -- gzip-compressed text files holding the ``"id"`` field of
      every record of the entity dump;
    * ``identifiers.txt`` -- one file with the sorted ids, each followed by
      a trailing ``_``;
    * ``metadata.txt`` -- the id prefixes seen and the number of ids,
      written only after ``identifiers.txt`` passed verification.

    Returns:
        The string dataset reading ``ids/*.gz``.

    Raises:
        ValueError: if the ids in ``identifiers.txt`` are not strictly
            increasing (i.e. not sorted or not unique).
        AssertionError: if an id does not start with a known prefix
            followed by digits, or is rejected by `is_entity_id`.
    """
    root = WDDataDirCfg.get_instance().entity_ids
    ids_dir = root / "ids"
    identifiers_file = root / "identifiers.txt"
    metadata_file = root / "metadata.txt"

    # Step 1: extract the ids from the dump.
    if not does_result_dir_exist(ids_dir):
        raw_ids = entity_dump().get_rdd().map(itemgetter("id"))
        raw_ids.saveAsTextFile(
            str(ids_dir),
            compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
        )

    dataset = Dataset.string(root / "ids/*.gz")

    # Step 2: write the sorted ids, each suffixed with "_", into one file.
    if not identifiers_file.exists():
        sorted_ids = dataset.get_rdd().sortBy(identity_func)
        saveAsSingleTextFile(
            sorted_ids.map(lambda entity_id: entity_id + "_"),
            identifiers_file,
            shuffle=False,
        )

    # Step 3: verify the sorted file; skipped once the metadata exists.
    if metadata_file.exists():
        return dataset

    known_prefixes = {"P", "Q", "L"}
    observed_prefixes = set()
    id_count = 0
    reported_bytes = 0
    previous = ""

    # The file is read in bytes mode so that .tell() gives the current
    # byte position, which drives the progress bar.
    with open(str(identifiers_file), "rb") as stream, tqdm(
        total=identifiers_file.stat().st_size,
        unit="B",
        unit_scale=True,
        desc="verifying entity ids",
    ) as progress:
        for raw_line in stream:
            # Drop the line terminator, then the trailing "_" marker.
            current = raw_line.strip().decode()[:-1]

            if not previous < current:
                raise ValueError(
                    f"Id must be unique and sorted, but found: {previous} and {current}"
                )
            previous = current

            assert current[0] in known_prefixes and current[1:].isdigit(), current
            observed_prefixes.add(current[0])

            position = stream.tell()
            progress.update(position - reported_bytes)
            reported_bytes = position

            # verify the `is_entity_id` function
            assert is_entity_id(current), current
            id_count += 1

    summary = f""" seen id prefixes: {sorted(observed_prefixes)} total number of ids: {id_count} """.strip()
    metadata_file.write_text(summary)

    return dataset