kgdata.spark
Utility functions for Apache Spark.
Functions
- does_result_dir_exist: Check if the result directory exists.
- ensure_unique_records: Make sure that RDDs contain unique records.
- Get spark context.
- left_outer_join: Join two RDDs (left outer join) by non primary key in RDD1.
- left_outer_join_broadcast: Join two RDDs (left outer join) by non primary key in RDD1.
- left_outer_join_repartition: Join for the scenario where rdd1 contains duplicated keys and rdd2 contains unique keys.
- does_result_dir_exist(dpath: Union[str, Path], allow_override: bool = True, create_if_not_exist: bool = False) -> bool
Check if the result directory exists.
- ensure_unique_records(rdd, keyfn, print_error: bool = True)
Make sure that the RDD contains unique records.
- left_outer_join_repartition(rdd1: RDD[Tuple[K, V]], rdd2: RDD[Tuple[K, V2]], threshold: int = 10000, batch_size: int = 1000, num_partitions: Optional[int] = None)
This join is useful in the following scenario:
rdd1 contains duplicated keys, and potentially high cardinality keys (keys shared by many records).
rdd2 contains unique keys.
To avoid high cardinality keys, the function artificially generates new keys of the form (key, category), where category is an integer in [1, n], and then performs the join on these keys.
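The (key, category) idea can be illustrated without Spark. The following is a minimal sketch on plain Python lists, not the kgdata implementation: the function name `salted_left_outer_join`, the random choice of category, and the `seed` argument are assumptions made for illustration, and the documented `threshold` and `batch_size` parameters are not modelled.

```python
import random
from typing import Dict, Hashable, List, Optional, Tuple


def salted_left_outer_join(
    left: List[Tuple[Hashable, object]],
    right: List[Tuple[Hashable, object]],
    n: int,
    seed: int = 0,
) -> List[Tuple[Hashable, Tuple[object, Optional[object]]]]:
    """Left outer join on composite (key, category) keys, category in [1, n]."""
    rng = random.Random(seed)

    # Left side: records that share a key are spread over up to n composite keys.
    salted_left = [((k, rng.randint(1, n)), v) for k, v in left]

    # Right side: keys are unique, so each record is copied once per category.
    salted_right: Dict[Tuple[Hashable, int], object] = {
        (k, c): v for k, v in right for c in range(1, n + 1)
    }

    # Join on the composite key, then drop the category again.
    return [(k, (v, salted_right.get((k, c)))) for (k, c), v in salted_left]


left = [("a", 1), ("a", 2), ("b", 3), ("c", 4)]
right = [("a", "A"), ("b", "B")]
out = salted_left_outer_join(left, right, n=3)
```

Because every category of a key is present on the right side, the result is the same as an ordinary left outer join; only the distribution of work per join key changes.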
- left_outer_join(rdd1: RDD[R1], rdd2: RDD[R2], rdd1_keyfn: Callable[[R1], K1], rdd1_fk_fn: Callable[[R1], List[K2]], rdd2_keyfn: Callable[[R2], K2], join_fn: Callable[[R1, List[Tuple[K2, Optional[R2]]]], Optional[R1]], ser_fn: Optional[Callable[[R1], Union[str, bytes]]] = None, outfile: Optional[str] = None, compression: bool = True) -> RDD[R1]
Join two RDDs (left outer join) by non primary key in RDD1.
RDD1: contains records of (x, Y, x_data), where x is the id of the record and Y is a list of ids of records in RDD2.
RDD2: contains records of (y, y_data), where y is the id of the record.
- Parameters:
rdd1 (RDD[R1]) – records of (x, Y, x_data), where x is the id of the record and Y is a list of ids of records in RDD2.
rdd2 (RDD[R2]) – records of (y, y_data), where y is the id of the record.
rdd1_keyfn (Callable[[R1], K1]) – function that extracts the id (x) of a record of RDD1
rdd1_fk_fn (Callable[[R1], List[K2]]) – function that extracts Y from a record of RDD1
rdd2_keyfn (Callable[[R2], K2]) – function that extracts the id (y) of a record of RDD2
join_fn (Callable[[R1, List[Tuple[K2, Optional[R2]]]], Optional[R1]]) – function that merges the list of Y into record R1; if it returns a value other than None, that value is used
ser_fn (Optional[Callable[[R1], Union[str, bytes]]]) – function that serializes records of RDD1 so that they can be saved to a file
outfile (Optional[str]) – output file; if given, the result is saved to it
compression (bool, optional) – whether to compress the result, by default True
- Return type:
RDD[R1], the merged records
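To make the roles of the key and join functions concrete, here is a minimal sketch of a join by foreign key on plain Python lists. It is one plausible reading of the parameters above, not the kgdata implementation and not Spark code: `join_by_foreign_key` and the sample data are hypothetical.

```python
from collections import defaultdict


def join_by_foreign_key(records1, records2, keyfn1, fk_fn, keyfn2, join_fn):
    # Index the second collection by its id (y -> record).
    by_y = {keyfn2(r2): r2 for r2 in records2}

    # Expand each record of the first collection into (x, y) pairs, look up
    # each y, and regroup the matches under x. A y with no match is paired
    # with None, which is what makes this a left outer join.
    matches = defaultdict(list)
    for r1 in records1:
        x = keyfn1(r1)
        for y in fk_fn(r1):
            matches[x].append((y, by_y.get(y)))

    # Merge the matches into each record. Records with an empty Y are kept.
    merged = []
    for r1 in records1:
        result = join_fn(r1, matches.get(keyfn1(r1), []))
        # If join_fn returns a value other than None, that value is used.
        merged.append(r1 if result is None else result)
    return merged


# Records of (x, Y, x_data) and (y, y_data), as in the description above.
people = [("p1", ["c1", "c9"], "Ann"), ("p2", [], "Bob")]
cities = [("c1", "Paris")]


def attach(person, pairs):
    names = [None if city is None else city[1] for _, city in pairs]
    return person + (names,)


merged = join_by_foreign_key(
    people, cities, lambda p: p[0], lambda p: p[1], lambda c: c[0], attach
)
```

In this sketch the id function of the first collection is only needed to regroup the expanded (x, y) pairs, which may be why the broadcast variant has no such parameter.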
- left_outer_join_broadcast(rdd1, rdd2, rdd1_fk_fn: Callable[[R1], List[K2]], rdd2_keyfn: Callable[[R2], K2], rdd1_join_fn: Callable[[R1, List[Tuple[K2, Optional[R2]]]], None], rdd1_serfn: Callable[[R1], Union[str, bytes]], outfile: str, compression: bool = True)
Join two RDDs (left outer join) by non primary key in RDD1. This join assumes that RDD2 can fit in memory, and takes the broadcast approach.
RDD1: contains records of (x, Y, x_data), where x is the id of the record and Y is a list of ids of records in RDD2.
RDD2: contains records of (y, y_data), where y is the id of the record.
- Parameters:
rdd1 (RDD[R1]) – records of (x, Y, x_data), where x is the id of the record and Y is a list of ids of records in RDD2.
rdd2 (RDD[R2]) – records of (y, y_data), where y is the id of the record.
rdd1_fk_fn (Callable[[R1], List[K2]]) – function that extracts Y from a record of RDD1
rdd2_keyfn (Callable[[R2], K2]) – function that extracts the id (y) of a record of RDD2
rdd1_join_fn (Callable[[R1, List[Tuple[K2, Optional[R2]]]], None]) – function that merges the list of Y into record R1
rdd1_serfn (Callable[[R1], Union[str, bytes]]) – function that serializes records of RDD1 so that they can be saved to a file
outfile (str) – output file
compression (bool, optional) – whether to compress the result, by default True
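The broadcast approach can be pictured as a lookup table that every record of RDD1 consults. The sketch below runs on plain Python lists, with a dict standing in for the Spark broadcast variable. It is an illustration under those assumptions, not the kgdata implementation: `broadcast_style_join` and the sample data are hypothetical, and writing to `outfile` is replaced by returning the serialized lines.

```python
import json


def broadcast_style_join(records1, records2, fk_fn, keyfn2, join_fn, serfn):
    # Stand-in for a broadcast variable: the whole second collection in memory.
    lookup = {keyfn2(r2): r2 for r2 in records2}

    lines = []
    for r1 in records1:
        # Foreign keys missing from the lookup are paired with None.
        pairs = [(y, lookup.get(y)) for y in fk_fn(r1)]
        # The join function returns None, so it must modify r1 in place.
        join_fn(r1, pairs)
        lines.append(serfn(r1))
    return lines


papers = [{"id": "p1", "authors": ["a1", "a2"]}, {"id": "p2", "authors": []}]
authors = [{"id": "a1", "name": "Ada"}]


def attach_names(paper, pairs):
    paper["author_names"] = [None if a is None else a["name"] for _, a in pairs]


lines = broadcast_style_join(
    papers,
    authors,
    fk_fn=lambda p: p["authors"],
    keyfn2=lambda a: a["id"],
    join_fn=attach_names,
    serfn=lambda p: json.dumps(p, sort_keys=True),
)
```

No regrouping by the id of RDD1 is needed here, because each record is completed in a single pass against the in-memory lookup.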
- saveAsSingleTextFile(rdd, outfile: Union[str, Path], compressionCodecClass=None, shuffle=True)