Source code for kgdata.wikidata.datasets.class_count
from typing import Tuple
from operator import add
from kgdata.dataset import Dataset
from kgdata.spark import does_result_dir_exist, saveAsSingleTextFile
from kgdata.wikidata.config import WDDataDirCfg
from kgdata.wikidata.datasets.classes import classes
from kgdata.wikidata.datasets.entities import entities
from kgdata.wikidata.datasets.entity_types import entity_types, get_instanceof
import orjson
[docs]def class_count(lang="en") -> Dataset[Tuple[str, int]]:
cfg = WDDataDirCfg.get_instance()
if not does_result_dir_exist(cfg.class_count):
ds = entity_types(lang)
if ds.does_exist():
rdd = ds.get_rdd()
else:
rdd = entities(lang).get_rdd().map(get_instanceof)
class_count = rdd.flatMap(lambda x: [(c, 1) for c in x[1]]).reduceByKey(add)
(
classes(lang)
.get_rdd()
.map(lambda x: (x.id, 0))
.leftOuterJoin(class_count)
.map(lambda x: (x[0], x[1][1] if x[1][1] is not None else 0))
.map(orjson.dumps)
.saveAsTextFile(
str(cfg.class_count),
compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
)
)
# return Dataset(cfg.class_count / "*.gz", deserialize=orjson.loads)
ds = Dataset(cfg.class_count / "*.gz", deserialize=orjson.loads)
rdd = (
classes(lang)
.get_rdd()
.map(lambda x: (x.id, x.label))
.join(ds.get_rdd())
.map(lambda x: [x[0], x[1][0], x[1][1]])
.sortBy(lambda x: x[2], ascending=True)
.map(lambda x: "\t".join([str(y) for y in x]))
)
saveAsSingleTextFile(
rdd, str(cfg.class_count / "../class_count_sorted.tsv"), shuffle=False
)
# print(ds.get_rdd().sortBy(lambda x: x[1], ascending=False).take(100))
# print(ds.get_rdd().sortBy(lambda x: x[1], ascending=True).take(1000)[900:])
return ds