Source code for kgdata.wikidata.datasets.entity_pagerank
from collections import defaultdict
from dataclasses import dataclass
from glob import glob
from io import BytesIO
from pathlib import Path
from typing import (
Callable,
Generic,
Iterable,
List,
Optional,
Sequence,
Tuple,
TypeVar,
)
from sm.misc.ray_helper import ray_map
from kgdata.dataset import Dataset
from kgdata.spark import (
does_result_dir_exist,
left_outer_join_repartition,
get_spark_context,
)
from kgdata.wikidata.config import WDDataDirCfg
from kgdata.wikidata.datasets.entities import entities
from kgdata.wikidata.models.wdentity import WDEntity
import orjson, ray, numpy as np
import serde.textline
KeyType = TypeVar("KeyType")


@dataclass
class Edge(Generic[KeyType]):
    """A weighted, directed edge between two nodes identified by ``KeyType`` keys.

    The text form of an edge is a single line holding ``source``, ``target`` and
    ``weight`` separated by tab characters.
    """

    # key of the node the edge starts from
    source: KeyType
    # key of the node the edge points to
    target: KeyType
    # integer weight attached to the edge
    weight: int

    def serialize(self):
        """Return the edge as a ``source<TAB>target<TAB>weight`` string."""
        return "\t".join(map(str, (self.source, self.target, self.weight)))

    @staticmethod
    def deserialize_str(o: str):
        """Parse a tab-separated line into an edge whose endpoints stay strings."""
        fields = o.split("\t")
        return Edge(source=fields[0], target=fields[1], weight=int(fields[2]))

    @staticmethod
    def deserialize_int(o: str):
        """Parse a tab-separated line into an edge whose endpoints are integers."""
        fields = o.split("\t")
        return Edge(
            source=int(fields[0]),
            target=int(fields[1]),
            weight=int(fields[2]),
        )
# A pagerank record: (entity id, pagerank score), the shape produced by the join
# step inside `entity_pagerank` before it is serialized.
EntityPageRank = Tuple[str, float]
def entity_pagerank(lang: str = "en") -> Dataset[EntityPageRank]:
    """Build the weighted entity graph of Wikidata and expose entity pagerank scores.

    The work is split into stages whose outputs live under ``cfg.entity_pagerank``.
    A stage is skipped when its output is already there:

    1. ``idmap_{lang}``: entity ids are sorted (first character, then numeric
       part) and paired with their position via ``zipWithIndex``. The number of
       entities is written to ``idmap_{lang}.txt``.
    2. ``graph_{lang}``: edges produced by ``get_edges`` with both endpoints
       replaced by the indices of stage 1, one serialized ``Edge`` per line.
    3. ``graphtool_{lang}``: the edges repacked with Ray into compressed
       ``.npz`` archives holding the arrays ``edges`` and ``eprops``.
    4. ``pagerank_{lang}``: the externally computed pagerank arrays (``ids`` and
       ``data`` in ``graphtool_pagerank_en/*.npz``) joined back to entity ids.
    5. ``pagerank_{lang}.json``: sum, count, mean, max, min and standard
       deviation of the pagerank scores.

    Args:
        lang: language passed to ``entities`` and used as suffix of the stage
            directories.

    Returns:
        A dataset over the gzip files of stage 4, decoded with ``orjson.loads``.

    Raises:
        AssertionError: if stage 4 has to run but the graph-tool pagerank output
            does not exist yet (it is produced outside this function by
            ``kgdata/scripts/pagerank_v2.py``).
    """
    cfg = WDDataDirCfg.get_instance()

    # ---- stage 1: assign an index to every entity id ----
    idmap_outdir = cfg.entity_pagerank / f"idmap_{lang}"
    if not does_result_dir_exist(idmap_outdir):
        (
            entities(lang=lang)
            .get_rdd()
            .map(lambda ent: ent.id)
            # order by the leading character, then by the numeric part of the id
            .sortBy(lambda x: (x[0], int(x[1:])))  # type: ignore
            .zipWithIndex()
            .map(tab_ser)
            .saveAsTextFile(
                str(idmap_outdir),
                compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
            )
        )

        # write the total number of entities next to the id map
        (cfg.entity_pagerank / (idmap_outdir.name + ".txt")).write_text(
            str(
                Dataset(
                    idmap_outdir / "*.gz",
                    deserialize=kv_tab_deser,
                )
                .get_rdd()
                .count()
            )
        )

    entity_idmap = Dataset(
        idmap_outdir / "*.gz",
        deserialize=kv_tab_deser,
    )

    # ---- stage 2: edges between entities, expressed with the indices ----
    graph_outdir = cfg.entity_pagerank / f"graph_{lang}"
    if not does_result_dir_exist(graph_outdir):
        idmap_rdd = entity_idmap.get_rdd()
        (
            left_outer_join_repartition(
                entities(lang=lang)
                .get_rdd()
                .flatMap(get_edges)
                # first join on the source id to rewrite `Edge.source` ...
                .map(lambda x: (x.source, x))
                .groupByKey()
                .leftOuterJoin(idmap_rdd)
                .flatMap(update_edge_ids_source)
                # ... then key by the target id so the second join rewrites it
                .map(lambda x: (x.target, x)),
                idmap_rdd,
                num_partitions=3000,
            )
            .flatMap(update_edge_ids_target)
            .map(Edge.serialize)
            .saveAsTextFile(
                str(graph_outdir),
                compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
            )
        )

    edges_dataset: Dataset[Edge[int]] = Dataset(
        graph_outdir / "*.gz",
        deserialize=Edge.deserialize_int,
    )

    # ---- stage 3: repack the edges into npz archives ----
    graphtool_indir = cfg.entity_pagerank / f"graphtool_{lang}"
    if not does_result_dir_exist(graphtool_indir, create_if_not_exist=True):
        # `ray.init()` raises when Ray has already been started in this process
        # (e.g. by the caller), so only start it when it is not running yet.
        if not ray.is_initialized():
            ray.init()

        @ray.remote
        def create_edges_npy(infiles: List[str], outfile: str):
            """Merge the edges of `infiles` into one compressed npz archive."""
            edges = []
            eprops = []
            for infile in infiles:
                for x in serde.textline.deser(infile, trim=True):
                    edge = Edge.deserialize_int(x)
                    edges.append((edge.source, edge.target))
                    eprops.append(edge.weight)

            edges = np.asarray(edges)
            eprops = np.asarray(eprops)
            np.savez_compressed(outfile, edges=edges, eprops=eprops)

        # leverage the fact that input file has the format: `part-00000.gz`:
        # dropping the last character of the stem makes the files that only differ
        # in their last digit share the same output archive.
        infiles = edges_dataset.get_files()
        outfiles = {}
        for infile in infiles:
            outfile = str(graphtool_indir / (Path(infile).stem[:-1] + ".npz"))
            outfiles.setdefault(outfile, []).append(infile)
        assert sum(len(x) for x in outfiles.values()) == len(infiles)

        ray_map(
            create_edges_npy.remote,
            [(sub_infiles, outfile) for outfile, sub_infiles in outfiles.items()],
            verbose=True,
            poll_interval=0.5,
        )
        (graphtool_indir / "_SUCCESS").touch()

    # Output of the external graph-tool pagerank run, read by stages 4 and 5.
    # NOTE(review): this directory is hard-coded to "en" while every other stage
    # directory follows `lang` -- confirm whether that is intended.
    graphtool_pagerank_dir = cfg.entity_pagerank / "graphtool_pagerank_en"
    graphtool_pagerank_files = str(graphtool_pagerank_dir / "*.npz")

    # ---- stage 4: attach the pagerank scores to the entity ids ----
    pagerank_outdir = cfg.entity_pagerank / f"pagerank_{lang}"
    if not does_result_dir_exist(pagerank_outdir):
        assert does_result_dir_exist(
            graphtool_pagerank_dir, allow_override=False
        ), "Must run graph-tool pagerank at `kgdata/scripts/pagerank_v2.py` first"

        n_files = len(glob(graphtool_pagerank_files))

        def deserialize_np(dat: bytes) -> List[Tuple[int, float]]:
            """Decode one npz archive into (index, score) pairs."""
            f = BytesIO(dat)
            array = np.load(f)
            return list(zip(array["ids"], array["data"]))

        def process_join(
            x: Tuple[int, Tuple[Optional[float], Optional[str]]]
        ) -> Tuple[str, float]:
            """Turn a joined (index, (score, entity id)) record into (entity id, score)."""
            # both sides of the full outer join are expected to be present
            assert x[1][0] is not None
            assert x[1][1] is not None
            return x[1][1], float(x[1][0])

        (
            get_spark_context()
            .binaryFiles(graphtool_pagerank_files)
            .repartition(n_files)
            .flatMap(lambda x: deserialize_np(x[1]))
            # the id map stores (entity id, index); flip it to join on the index
            .fullOuterJoin(entity_idmap.get_rdd().map(lambda x: (int(x[1]), x[0])))
            .map(process_join)
            .map(orjson.dumps)
            .saveAsTextFile(
                str(pagerank_outdir),
                compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
            )
        )

    # ---- stage 5: summary statistics of the pagerank scores ----
    pagerank_stat_outfile = cfg.entity_pagerank / f"pagerank_{lang}.json"
    if not pagerank_stat_outfile.exists():
        n_files = len(glob(graphtool_pagerank_files))

        def deserialize_np2(dat: bytes) -> np.ndarray:
            """Decode one npz archive and keep only its score array."""
            f = BytesIO(dat)
            array = np.load(f)
            return array["data"]

        rdd = (
            get_spark_context()
            .binaryFiles(graphtool_pagerank_files)
            .repartition(n_files)
            .map(lambda x: deserialize_np2(x[1]))
        )

        total = rdd.map(lambda x: np.sum(x)).sum()
        size = rdd.map(lambda x: len(x)).sum()
        mean_pagerank = total / size
        std_pagerank = np.sqrt(
            rdd.map(lambda x: np.sum(np.square(x - mean_pagerank))).sum() / size
        )
        max_pagerank = rdd.map(lambda x: np.max(x)).max()
        min_pagerank = rdd.map(lambda x: np.min(x)).min()

        pagerank_stat_outfile.write_bytes(
            orjson.dumps(
                {
                    "sum": total,
                    "len": size,
                    "mean": mean_pagerank,
                    "max": max_pagerank,
                    "min": min_pagerank,
                    "std": std_pagerank,
                },
                option=orjson.OPT_INDENT_2 | orjson.OPT_SERIALIZE_NUMPY,
            )
        )

    return Dataset(pagerank_outdir / "*.gz", deserialize=orjson.loads)
def get_edges(ent: WDEntity) -> List[Edge]:
    """Collect the outgoing edges of an entity.

    An edge is counted for every statement value and every qualifier value that
    is an entity id; the weight of an edge is the number of times its target is
    referenced by the entity.

    Args:
        ent: the entity whose statements are scanned.

    Returns:
        One ``Edge`` per distinct target, with ``ent.id`` as the source.
    """
    source_id = ent.id
    weights = defaultdict(int)

    for statements in ent.props.values():
        for statement in statements:
            main_value = statement.value
            if main_value.is_entity_id(main_value):
                weights[source_id, main_value.as_entity_id()] += 1

            for qualifier_values in statement.qualifiers.values():
                for qualifier in qualifier_values:
                    if qualifier.is_entity_id(qualifier):
                        weights[source_id, qualifier.as_entity_id()] += 1

    result = []
    for (src, dst), count in weights.items():
        result.append(Edge(source=src, target=dst, weight=count))
    return result
def update_edge_ids_source(
    joined_result: Tuple[str, Tuple[Iterable[Edge], Optional[str]]]
):
    """Overwrite the source of every edge in a joined record.

    Args:
        joined_result: a ``(key, (edges, new_source))`` record; in
            ``entity_pagerank`` it comes from joining edges grouped by source
            with the id map.

    Returns:
        The same ``edges`` iterable, whose items now have ``source`` set to
        ``new_source``.

    Raises:
        AssertionError: if the join found no value for the key.
    """
    _, (edges, new_source) = joined_result
    assert new_source is not None
    for item in edges:
        item.source = new_source
    return edges
def update_edge_ids_target(
    joined_result: Tuple[str, Tuple[Iterable[Edge], Optional[str]]]
):
    """Overwrite the target of every edge in a joined record.

    Args:
        joined_result: a ``(key, (edges, new_target))`` record; in
            ``entity_pagerank`` it comes from joining edges keyed by target
            with the id map.

    Returns:
        The same ``edges`` iterable, whose items now have ``target`` set to
        ``new_target``.

    Raises:
        AssertionError: if the join found no value for the key.
    """
    matched = joined_result[1]
    edges, new_target = matched[0], matched[1]
    assert new_target is not None
    for item in edges:
        item.target = new_target
    return edges
# Deserializer used for the id-map files; annotated as turning one line into a
# pair of strings (`entity_pagerank` reads them as entity id and index).
# NOTE(review): `tab_deser` (and `tab_ser`, used in `entity_pagerank`) is neither
# defined nor imported in the visible source -- confirm where it should come from.
kv_tab_deser: Callable[[str], Tuple[str, str]] = tab_deser # type: ignore