kgdata.splitter
Functions to split a big file into smaller files.
Functions
- default_currentbyte_constructor – Get a function that returns the byte position the file reader is currently at.
- split_a_file – Split a file containing a list of records into smaller files stored in a directory.
- strip_newline – Strip the newline from a line.
- write_to_file – Write records from a queue to a file.
- default_currentbyte_constructor(file_object: Union[BZ2File, GzipFile, BinaryIO, TextIOWrapper]) → Callable[[], int]
Get a function that returns the byte position the file reader is currently at.
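A minimal sketch of what such a constructor can look like for a plain binary file object, where `tell()` already reports the byte offset. The names `tell_currentbyte` and `iter_with_progress` are hypothetical, and using the position for progress reporting is an assumption about its purpose, not something this page states. For a compressed reader such as `gzip.GzipFile`, `tell()` reports the offset in the decompressed stream, so it would not line up with the size of the compressed file on disk; the sketch does not handle that case.

```python
from typing import BinaryIO, Callable, Iterator, Tuple


def tell_currentbyte(file_object: BinaryIO) -> Callable[[], int]:
    # Hypothetical constructor: return the bound method, so each call
    # reports the reader's current offset in the (uncompressed) stream.
    return file_object.tell


def iter_with_progress(file_object: BinaryIO,
                       total_size: int) -> Iterator[Tuple[bytes, float]]:
    """Yield each line with the fraction of the input consumed so far."""
    current = tell_currentbyte(file_object)
    for line in file_object:
        yield line, current() / total_size
```

Returning a zero-argument callable keeps the caller independent of the file type: a different constructor could be swapped in for readers whose `tell()` does not give the wanted offset.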
- split_a_file(infile: Union[str, Path, Callable[[], Tuple[int, ContextManager[BinaryIO]]]], outfile: Union[str, Path], record_iter: Callable[[Union[BZ2File, GzipFile, BinaryIO]], Iterable[bytes]] = <function identity_func>, record_postprocess: str = 'kgdata.splitter.strip_newline', currentbyte_constructor: Callable[[Union[BZ2File, GzipFile, BinaryIO]], Callable[[], int]] = <function default_currentbyte_constructor>, override: bool = False, n_writers: int = 8, n_records_per_file: int = 64000)
Split a file containing a list of records into smaller files stored in a directory. The input is read sequentially by a single process, and the records are distributed round-robin to multiple writer processes that write in parallel.
- Parameters:
infile (Union[str, Path, Callable[[], Tuple[int, ContextManager[BinaryIO]]]]) – path of the input file (e.g., ‘/data/input/bigfile.json.gz’), or a function that returns a tuple of the file size in bytes and a context manager yielding the file object opened in binary mode.
outfile (Union[str, Path]) – template path of the output files (e.g., ‘/data/outputs/smallfile.json.gz’). From this template, the function writes files named ‘smallfile-<number>.json.gz’ to the parent folder (e.g., ‘/data/outputs’), plus an extra file named ‘_SUCCESS’ to indicate that the job is done.
record_iter (Callable[[Union[BZ2File, GzipFile, BinaryIO]], Iterable[bytes]]) – a function that, given a file object, returns an iterator of records. By default, it returns the file object itself, so each line is a record.
record_postprocess (str) – import path of the function that post-processes a record. By default, it strips the newline from the end of the record. If the function returns None, the record is skipped.
currentbyte_constructor (Callable[[Union[BZ2File, GzipFile, BinaryIO]], Callable[[], int]]) – a function that takes a file object and returns a zero-argument function reporting the current byte position of that file object.
override (bool) – whether to overwrite existing files.
n_writers (int) – number of parallel writers.
n_records_per_file (int) – number of records per file.
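A minimal sketch of the round-robin scheme described above, with simplifications that are assumptions rather than kgdata's behavior: writers are threads instead of processes, records come from an in-memory iterable, output is uncompressed, trailing newlines are stripped inline instead of through an imported post-processor, and files are named `<stem>-<5-digit number>.<ext>` with numbers interleaved across writers so they never collide. The names `split_records` and `_writer` are hypothetical.

```python
import itertools
import queue
import threading
from pathlib import Path
from typing import Iterable, List, Optional


def _writer(q: "queue.Queue[Optional[bytes]]", name_fmt: str, outdir: Path,
            writer_id: int, n_writers: int, n_records_per_file: int) -> None:
    batch: List[bytes] = []
    file_no = 0

    def flush() -> None:
        nonlocal file_no
        if batch:
            # Interleaved numbering keeps file names unique across writers.
            number = file_no * n_writers + writer_id
            (outdir / name_fmt.format(number)).write_bytes(b"\n".join(batch) + b"\n")
            file_no += 1
            batch.clear()

    while (record := q.get()) is not None:  # None is the stop sentinel
        batch.append(record)
        if len(batch) == n_records_per_file:
            flush()
    flush()  # last, possibly shorter, file


def split_records(records: Iterable[bytes], outfile: Path,
                  n_writers: int = 8, n_records_per_file: int = 64000) -> None:
    outdir = outfile.parent
    outdir.mkdir(parents=True, exist_ok=True)
    stem, _, ext = outfile.name.partition(".")
    name_fmt = stem + "-{:05d}." + ext  # e.g. "smallfile-{:05d}.txt"
    queues = [queue.Queue(maxsize=1024) for _ in range(n_writers)]
    threads = [
        threading.Thread(target=_writer,
                         args=(q, name_fmt, outdir, i, n_writers, n_records_per_file))
        for i, q in enumerate(queues)
    ]
    for t in threads:
        t.start()
    # A single reader walks the input sequentially and deals records round-robin.
    for q, record in zip(itertools.cycle(queues), records):
        q.put(record.rstrip(b"\n"))
    for q in queues:
        q.put(None)  # one stop sentinel per writer
    for t in threads:
        t.join()
    (outdir / "_SUCCESS").touch()  # written only after every writer has finished
```

Each writer has its own bounded queue, so when a writer falls behind the reader blocks on `put` instead of buffering the whole input in memory.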
- write_to_file(outfile_template: str, n_records_per_file: int, record_postprocessing: str, queue: Queue)
Write records from a queue to a file.
- Parameters:
outfile_template (str) – template path of the output file
n_records_per_file (int) – number of records per file
record_postprocessing (str) – import path of the function that post-processes a record. The function can return None to skip the record.
queue (Queue) – a queue that yields records to be written to a file. When it yields None, the writer stops.
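A minimal sketch of a writer loop matching the parameter descriptions above: it resolves a post-processor from a dotted import path, consumes records until it receives None, skips records the post-processor maps to None, and starts a new file every `n_records_per_file` kept records. The helper names `import_func`, `strip_newline_or_skip`, and `drain_queue_to_files`, the `str.format` placeholder in the template (e.g. `part-{:03d}.txt`), and the newline-joined output are all assumptions for illustration; `strip_newline_or_skip` is not kgdata's `strip_newline`.

```python
import importlib
import queue
from pathlib import Path
from typing import Callable, List, Optional


def import_func(path: str) -> Callable:
    """Resolve a dotted import path such as 'package.module.func'."""
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


def strip_newline_or_skip(record: bytes) -> Optional[bytes]:
    # Illustrative post-processor: drop the trailing newline, skip empty records.
    record = record.rstrip(b"\r\n")
    return record if record else None


def drain_queue_to_files(outfile_template: str, n_records_per_file: int,
                         postprocess: Callable[[bytes], Optional[bytes]],
                         q: "queue.Queue[Optional[bytes]]") -> List[Path]:
    written: List[Path] = []
    batch: List[bytes] = []

    def flush() -> None:
        # Write the buffered records to the next numbered file, if any.
        if batch:
            path = Path(outfile_template.format(len(written)))
            path.write_bytes(b"\n".join(batch) + b"\n")
            written.append(path)
            batch.clear()

    while True:
        record = q.get()
        if record is None:      # sentinel: the reader has no more records
            break
        processed = postprocess(record)
        if processed is None:   # post-processor asked to skip this record
            continue
        batch.append(processed)
        if len(batch) == n_records_per_file:
            flush()
    flush()                     # final, possibly shorter, file
    return written
```

In a real run the post-processor would be obtained with something like `import_func("some.module.func")`; passing a string rather than a function object is one way to hand the choice to a worker without sending the function itself.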