kgdata.spark

Utility functions for Apache Spark.

Functions

cache_rdd(rdd, outfile, serfn, deserfn)

close_spark_context()

does_result_dir_exist(dpath[, ...])

Check if the result directory exists

ensure_unique_records(rdd, keyfn[, print_error])

Make sure that the RDD contains unique records

fix_rdd()

get_spark_context()

Get the Spark context

head(rdd, n)

left_outer_join(rdd1, rdd2, rdd1_keyfn, ...)

Join two RDDs (left outer join) by non primary key in RDD1.

left_outer_join_broadcast(rdd1, rdd2, ...[, ...])

Join two RDDs (left outer join) by non primary key in RDD1.

left_outer_join_repartition(rdd1, rdd2[, ...])

This join is useful in the following scenario:

saveAsSingleTextFile(rdd, outfile[, ...])

get_spark_context()

Get the Spark context.

Return type:

SparkContext

close_spark_context()

does_result_dir_exist(dpath: Union[str, Path], allow_override: bool = True, create_if_not_exist: bool = False) -> bool

Check if the result directory exists

Parameters:
  • dpath (Union[str, Path]) – path to the result directory

  • allow_override (bool, optional) – whether overriding the result directory is allowed. Defaults to True.

  • create_if_not_exist (bool) –

Return type:

bool

ensure_unique_records(rdd, keyfn, print_error: bool = True)

Make sure that the RDD contains unique records, where uniqueness is determined by the key that keyfn returns.

Parameters:
  • rdd (RDD) – input dataset

  • keyfn (Callable[[Any], Union[int, str]]) – function that returns the key of a record

  • print_error (bool) –
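As a rough illustration of the check described above, the following pure-Python sketch counts keys and lists the ones that occur more than once. It works on a plain list rather than an RDD, and find_duplicate_keys is a hypothetical helper that is not part of kgdata; how ensure_unique_records itself reports duplicates is not documented here.

```python
from collections import Counter
from typing import Any, Callable, Iterable, List, Union

Key = Union[int, str]


def find_duplicate_keys(records: Iterable[Any], keyfn: Callable[[Any], Key]) -> List[Key]:
    """Hypothetical helper: return the keys that occur more than once."""
    # Count how many records map to each key.
    counts = Counter(keyfn(record) for record in records)
    # Keep only the keys seen at least twice (first-seen order is preserved).
    return [key for key, count in counts.items() if count > 1]


# Illustrative records; "Q1" appears twice.
records = [{"id": "Q1"}, {"id": "Q2"}, {"id": "Q1"}]
duplicates = find_duplicate_keys(records, keyfn=lambda r: r["id"])
```

On an RDD, the same count could presumably be obtained by mapping each record to (keyfn(record), 1) and reducing by key.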

left_outer_join_repartition(rdd1: RDD[Tuple[K, V]], rdd2: RDD[Tuple[K, V2]], threshold: int = 10000, batch_size: int = 1000, num_partitions: Optional[int] = None)

This join is useful in the following scenario:

  1. rdd1 contains duplicated keys, and potentially high cardinality keys

  2. rdd2 contains unique keys

To keep all records of a high-cardinality key from being grouped under a single key, we artificially generate new keys of the form (key, category), where category is an integer in [1, n], and then perform the join on the new keys.
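The key-generation idea can be sketched in pure Python as follows. This is only an illustration under stated assumptions: salted_left_outer_join, n, and seed are hypothetical names, the inputs are plain lists instead of RDDs, and the roles of threshold and batch_size are not described in this page, so they are not modeled.

```python
import random
from typing import Hashable, List, Optional, Tuple, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")
V2 = TypeVar("V2")


def salted_left_outer_join(
    left: List[Tuple[K, V]], right: List[Tuple[K, V2]], n: int, seed: int = 0
) -> List[Tuple[K, Tuple[V, Optional[V2]]]]:
    """Hypothetical sketch of a left outer join on (key, category) keys."""
    rng = random.Random(seed)
    # Left side: spread the records of each key over n artificial keys.
    salted_left = [((key, rng.randint(1, n)), value) for key, value in left]
    # Right side: keys are unique, so replicate each record once per category
    # so that every (key, category) on the left can still find its match.
    salted_right = {
        (key, category): value
        for key, value in right
        for category in range(1, n + 1)
    }
    # Join on the artificial key, then drop the category again.
    return [
        (key, (value, salted_right.get((key, category))))
        for (key, category), value in salted_left
    ]


left = [("a", 1), ("a", 2), ("b", 3)]   # duplicated key "a"
right = [("a", "x")]                    # unique keys, no entry for "b"
joined = salted_left_outer_join(left, right, n=3)
# joined == [("a", (1, "x")), ("a", (2, "x")), ("b", (3, None))]
```

The result does not depend on which category a left record receives, because the right side carries every category. The cost is that the right side grows by a factor of n.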

left_outer_join(rdd1: RDD[R1], rdd2: RDD[R2], rdd1_keyfn: Callable[[R1], K1], rdd1_fk_fn: Callable[[R1], List[K2]], rdd2_keyfn: Callable[[R2], K2], join_fn: Callable[[R1, List[Tuple[K2, Optional[R2]]]], Optional[R1]], ser_fn: Optional[Callable[[R1], Union[str, bytes]]] = None, outfile: Optional[str] = None, compression: bool = True) -> RDD[R1]

Join two RDDs (left outer join) by non primary key in RDD1.

RDD1 contains records of (x, Y, x_data), where x is the id of the record and Y is the list of ids of records in RDD2.

RDD2 contains records of (y, y_data), where y is the id of the record.

Parameters:
  • rdd1 (RDD[R1]) – records of (x, Y, x_data), where x is the id of the record and Y is the list of ids of records in RDD2

  • rdd2 (RDD[R2]) – records of (y, y_data), where y is the id of the record

  • rdd1_keyfn (Callable[[R1], K1]) – function that extracts the id (x) of a record of RDD1

  • rdd1_fk_fn (Callable[[R1], List[K2]]) – function that extracts Y from a record of RDD1

  • rdd2_keyfn (Callable[[R2], K2]) – function that extracts the id (y) of a record of RDD2

  • join_fn (Callable[[R1, List[Tuple[K2, Optional[R2]]]], Optional[R1]]) – function that merges the list of Y into a record of RDD1; if it returns a value other than None, that value is used as the merged record

  • ser_fn (Optional[Callable[[R1], Union[str, bytes]]]) – function that serializes records of RDD1 before they are saved to file

  • outfile (Optional[str]) – output file; if given, the result is saved to this file

  • compression (bool, optional) – whether to compress the result. Defaults to True.

Return type:

RDD[R1] – the merged records
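The contract between rdd1_fk_fn, rdd2_keyfn, and join_fn can be illustrated with a local, pure-Python sketch. It is not the library's implementation: left_outer_join_local is a hypothetical function, the inputs are lists instead of RDDs, and rdd1_keyfn is omitted because a local loop never separates a record from its foreign keys. The fallback to the original record when join_fn returns None is an assumption based on the parameter description.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Record = Dict[str, Any]


def left_outer_join_local(
    records1: List[Record],
    records2: List[Record],
    rdd1_fk_fn: Callable[[Record], List[str]],
    rdd2_keyfn: Callable[[Record], str],
    join_fn: Callable[[Record, List[Tuple[str, Optional[Record]]]], Optional[Record]],
) -> List[Record]:
    """Hypothetical local equivalent of a left outer join by foreign key."""
    # Index the right side by its primary key.
    index = {rdd2_keyfn(r2): r2 for r2 in records2}
    merged = []
    for r1 in records1:
        # Pair every foreign key with its record, or None if it is missing.
        matches = [(y, index.get(y)) for y in rdd1_fk_fn(r1)]
        result = join_fn(r1, matches)
        # Assumption: a None result means join_fn updated r1 in place.
        merged.append(result if result is not None else r1)
    return merged


# Illustrative data: people reference cities by id; "c9" has no record.
people = [{"id": "p1", "cities": ["c1", "c9"]}, {"id": "p2", "cities": []}]
cities = [{"id": "c1", "name": "Hanoi"}]

merged = left_outer_join_local(
    people,
    cities,
    rdd1_fk_fn=lambda r: r["cities"],
    rdd2_keyfn=lambda r: r["id"],
    join_fn=lambda r1, ys: {**r1, "city_names": [r2["name"] if r2 else None for _, r2 in ys]},
)
```

Records of the left side are always kept, including those whose foreign keys have no match, which is what makes this a left outer join.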

left_outer_join_broadcast(rdd1, rdd2, rdd1_fk_fn: Callable[[R1], List[K2]], rdd2_keyfn: Callable[[R2], K2], rdd1_join_fn: Callable[[R1, List[Tuple[K2, Optional[R2]]]], None], rdd1_serfn: Callable[[R1], Union[str, bytes]], outfile: str, compression: bool = True)

Join two RDDs (left outer join) by non primary key in RDD1. This join assumes that RDD2 can fit in memory, and takes the broadcast approach.

RDD1 contains records of (x, Y, x_data), where x is the id of the record and Y is the list of ids of records in RDD2.

RDD2 contains records of (y, y_data), where y is the id of the record.

Parameters:
  • rdd1 (RDD[R1]) – records of (x, Y, x_data), where x is the id of the record and Y is the list of ids of records in RDD2

  • rdd2 (RDD[R2]) – records of (y, y_data), where y is the id of the record

  • rdd1_fk_fn (Callable[[R1], List[K2]]) – function that extracts Y from a record of RDD1

  • rdd2_keyfn (Callable[[R2], K2]) – function that extracts the id (y) of a record of RDD2

  • rdd1_join_fn (Callable[[R1, List[Tuple[K2, Optional[R2]]]], None]) – function that merges the list of Y into a record of RDD1 in place

  • rdd1_serfn (Callable[[R1], Union[str, bytes]]) – function that serializes records of RDD1 before they are saved to file

  • outfile (str) – output file

  • compression (bool, optional) – whether we should compress the result, by default True
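The broadcast approach can be pictured with the pure-Python sketch below. It is a simplified stand-in rather than the library's code: broadcast_join_partitions is a hypothetical function, a plain dict plays the role of the broadcast variable, nested lists play the role of partitions, and the serialized lines are returned instead of being written to outfile.

```python
import json
from typing import Any, Callable, Dict, List, Optional, Tuple

Record = Dict[str, Any]


def broadcast_join_partitions(
    partitions: List[List[Record]],
    records2: List[Record],
    rdd1_fk_fn: Callable[[Record], List[str]],
    rdd2_keyfn: Callable[[Record], str],
    rdd1_join_fn: Callable[[Record, List[Tuple[str, Optional[Record]]]], None],
    rdd1_serfn: Callable[[Record], str],
) -> List[str]:
    """Hypothetical sketch: join every partition against one shared lookup table."""
    # Stand-in for the broadcast variable: all of RDD2 held in memory.
    lookup = {rdd2_keyfn(r2): r2 for r2 in records2}
    lines = []
    for partition in partitions:
        # Each partition is processed independently; no shuffle is needed
        # because the lookup table is available everywhere.
        for r1 in partition:
            rdd1_join_fn(r1, [(y, lookup.get(y)) for y in rdd1_fk_fn(r1)])
            lines.append(rdd1_serfn(r1))
    return lines


def add_city_names(r1: Record, matches: List[Tuple[str, Optional[Record]]]) -> None:
    # Merge in place, matching the None return type of rdd1_join_fn.
    r1["city_names"] = [r2["name"] if r2 else None for _, r2 in matches]


partitions = [[{"id": "p1", "cities": ["c1"]}], [{"id": "p2", "cities": ["c2"]}]]
cities = [{"id": "c1", "name": "Hanoi"}]
lines = broadcast_join_partitions(
    partitions,
    cities,
    rdd1_fk_fn=lambda r: r["cities"],
    rdd2_keyfn=lambda r: r["id"],
    rdd1_join_fn=add_city_names,
    rdd1_serfn=lambda r: json.dumps(r, sort_keys=True),
)
```

This trades memory for network traffic: the whole of RDD2 must fit in memory on each worker, but RDD1 is never shuffled.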

head(rdd, n: int)

Parameters:
  • n (int) –

saveAsSingleTextFile(rdd, outfile: Union[str, Path], compressionCodecClass=None, shuffle=True)

Parameters:
  • outfile (Union[str, Path]) –

cache_rdd(rdd, outfile, serfn: Callable[[Any], str], deserfn: Callable[[str], Any])

fix_rdd()