Source code for kgdata.wikidata.datasets.page_ids

import csv
import os
from operator import itemgetter
from typing import Tuple
from kgdata.spark import does_result_dir_exist, get_spark_context, ensure_unique_records
from kgdata.wikidata.config import WDDataDirCfg
from kgdata.wikidata.datasets.entity_ids import is_entity_id
from kgdata.dataset import Dataset
from kgdata.wikidata.datasets.page_dump import page_dump


[docs]def page_ids() -> Dataset[Tuple[str, str]]: """Get mapping from Wikidata internal page id and Wikidata entity id (possible old id). To use the entity id, we should use it with redirections (`entity_redirections`) Pages may contain user pages, etc. So we are only keep pages that have entity ids. Returns: Dataset[tuple[str, str]] """ cfg = WDDataDirCfg.get_instance() if not does_result_dir_exist(cfg.page_ids): ( page_dump() .get_rdd() .flatMap(parse_sql_values) .map(extract_id) .filter(lambda x: x is not None) .map(lambda x: "\t".join(x)) .saveAsTextFile( str(cfg.page_ids), compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec", ) ) if not (cfg.page_ids / "_METADATA").exists(): rdd = ( get_spark_context() .textFile(os.path.join(cfg.page_ids, "*.gz")) .map(lambda x: x.split("\t")) ) ensure_unique_records( rdd, itemgetter(0), ) (cfg.page_ids / "_METADATA").write_text( f""" key.unique: true n_records: {(rdd.count())} """.strip() ) return Dataset( file_pattern=cfg.page_ids / "*.gz", deserialize=lambda x: tuple(x.split("\t")) )
[docs]def extract_id(row: list): # the dumps contain other pages such as user pages, etc. page_id, entity_id = row[0], row[2] if not is_entity_id(entity_id): return None assert page_id.isdigit(), page_id return page_id, entity_id
[docs]def parse_sql_values(line): values = line[line.find("` VALUES ") + 9 :] latest_row = [] reader = csv.reader( [values], delimiter=",", doublequote=False, escapechar="\\", quotechar="'", strict=True, ) output = [] for reader_row in reader: for column in reader_row: if len(column) == 0 or column == "NULL": latest_row.append(chr(0)) continue if column[0] == "(": new_row = False if len(latest_row) > 0: if latest_row[-1][-1] == ")": latest_row[-1] = latest_row[-1][:-1] new_row = True if new_row: output.append(latest_row) latest_row = [] if len(latest_row) == 0: column = column[1:] latest_row.append(column) if latest_row[-1][-2:] == ");": latest_row[-1] = latest_row[-1][:-2] output.append(latest_row) return output