kgdata.splitter

Functions to split a big file into smaller files.

Functions

default_currentbyte_constructor(file_object)

Get a function that returns the byte position the file reader is currently at.

split_a_file(infile, outfile[, record_iter, ...])

Split a file containing a list of records into smaller files stored in a directory.

split_a_list(lst, outfile[, n_records_per_file])

strip_newline(line)

Strip newline from a line.

write_to_file(outfile_template, ...)

Write records from a queue to a file.

default_currentbyte_constructor(file_object: Union[BZ2File, GzipFile, BinaryIO, TextIOWrapper]) → Callable[[], int]

Get a function that returns the byte position the file reader is currently at.

Parameters:

file_object (Union[BZ2File, GzipFile, BinaryIO, TextIOWrapper]) – the opened file whose read position should be tracked.

Return type:

Callable[[], int]
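A constructor with this shape could be as simple as returning the stream's bound `tell` method. The sketch below is a hypothetical `tell_based_currentbyte_constructor`, not the library's default implementation. Note that `tell()` on a `BZ2File` or `GzipFile` counts decompressed bytes, so it is not directly comparable with the size of the compressed file on disk.

```python
import io
from typing import BinaryIO, Callable


def tell_based_currentbyte_constructor(file_object: BinaryIO) -> Callable[[], int]:
    """Hypothetical constructor: report the reader's position via tell()."""
    # For a plain binary stream, tell() is the byte offset reached so far.
    # For gzip/bz2 wrappers, tell() counts decompressed bytes instead.
    return file_object.tell


buf = io.BytesIO(b"first\nsecond\n")
current_byte = tell_based_currentbyte_constructor(buf)
buf.readline()          # consumes b"first\n" (6 bytes)
print(current_byte())   # 6
```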

split_a_file(infile: Union[str, Path, Callable[[], Tuple[int, ContextManager[BinaryIO]]]], outfile: Union[str, Path], record_iter: Callable[[Union[BZ2File, GzipFile, BinaryIO]], Iterable[bytes]] = identity_func, record_postprocess: str = 'kgdata.splitter.strip_newline', currentbyte_constructor: Callable[[Union[BZ2File, GzipFile, BinaryIO]], Callable[[], int]] = default_currentbyte_constructor, override: bool = False, n_writers: int = 8, n_records_per_file: int = 64000)

Split a file containing a list of records into smaller files stored in a directory. The input file is read sequentially by a single process, while the records are distributed in a round-robin fashion to multiple writer processes that write them in parallel.
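The reader/writer split can be illustrated with a single-process sketch. It assumes records are dealt to writers one at a time in turn, and that each writer applies the post-processing step and starts a new output "file" every `n_records_per_file` records. The name `round_robin_split` is hypothetical, and unlike this loop, `split_a_file` runs its writers as separate processes.

```python
from itertools import cycle
from typing import Callable, Iterable, List, Optional


def round_robin_split(
    records: Iterable[bytes],
    postprocess: Callable[[bytes], Optional[bytes]],
    n_writers: int,
    n_records_per_file: int,
) -> List[List[bytes]]:
    """Hypothetical sketch: deal records to writers in turn, then chunk.

    Returns the output "files" as lists of records, writer by writer.
    """
    # Sequential read: each record goes to the next writer in turn.
    buffers: List[List[bytes]] = [[] for _ in range(n_writers)]
    for writer, record in zip(cycle(range(n_writers)), records):
        buffers[writer].append(record)

    files: List[List[bytes]] = []
    for buf in buffers:  # each writer handles its own records
        current: List[bytes] = []
        for record in buf:
            out = postprocess(record)
            if out is None:  # post-processing may drop a record
                continue
            current.append(out)
            if len(current) == n_records_per_file:
                files.append(current)
                current = []
        if current:  # flush a final, partially filled file
            files.append(current)
    return files


lines = [b"r%d\n" % i for i in range(7)]
files = round_robin_split(lines, lambda r: r.rstrip(b"\n"), n_writers=2, n_records_per_file=2)
print(files)  # [[b'r0', b'r2'], [b'r4', b'r6'], [b'r1', b'r3'], [b'r5']]
```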

Parameters:
  • infile (Union[str, Path, Callable[[], Tuple[int, ContextManager[BinaryIO]]]]) – path of the input file (e.g., ‘/data/input/bigfile.json.gz’), or a zero-argument function that returns a tuple of the file size in bytes and a context manager yielding the file object opened in binary mode.

  • outfile (Union[str, Path]) – template of the output file path (e.g., ‘/data/outputs/smallfile.json.gz’). From this template, the function writes files named ‘smallfile-<number>.json.gz’ to the parent folder (e.g., ‘/data/outputs’), plus an extra file named ‘_SUCCESS’ to indicate that the job is done.

  • record_iter (Callable[[Union[BZ2File, GzipFile, BinaryIO]], Iterable[bytes]]) – a function that, given a file object, returns an iterable of records. By default, it returns the file object itself, so each line is a record.

  • record_postprocess (str) – import path of the function that post-processes a record. By default, the newline is stripped from the end of each record. If the function returns None, the record is skipped.

  • currentbyte_constructor (Callable[[Union[BZ2File, GzipFile, BinaryIO]], Callable[[], int]]) – a function that, given a file object, returns a function reporting the current byte position of that file object.

  • override (bool) – whether to override existing files.

  • n_writers (int) – number of parallel writers.

  • n_records_per_file (int) – number of records per file.
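As an example of the callable form of `infile`, the hypothetical helper `open_with_size` below returns the on-disk size together with the opened file, which acts as its own context manager. Binding the path with `functools.partial` produces the zero-argument callable that the type signature asks for.

```python
import gzip
import os
import tempfile
from functools import partial
from typing import BinaryIO, ContextManager, Tuple, cast


def open_with_size(path: str) -> Tuple[int, ContextManager[BinaryIO]]:
    """Hypothetical helper with the shape documented for `infile`.

    Returns (size of the file on disk in bytes, context manager that
    yields the file opened in binary mode).
    """
    size = os.path.getsize(path)  # compressed size for .gz inputs
    if path.endswith(".gz"):
        return size, cast(ContextManager[BinaryIO], gzip.open(path, "rb"))
    return size, open(path, "rb")


# Demo: write a small gzip file, then read it back through the callable.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "bigfile.json.gz")
    with gzip.open(path, "wb") as f:
        f.write(b'{"id": 1}\n{"id": 2}\n')

    infile = partial(open_with_size, path)  # zero-argument callable
    size, ctx = infile()
    with ctx as fobj:
        records = list(fobj)  # iterating a binary file yields lines
    on_disk = os.path.getsize(path)

print(size == on_disk, records)  # True [b'{"id": 1}\n', b'{"id": 2}\n']
```

Such a callable would then be passed in place of a path, e.g. `split_a_file(infile, '/data/outputs/smallfile.json.gz')`.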

write_to_file(outfile_template: str, n_records_per_file: int, record_postprocessing: str, queue: Queue)

Write records from a queue to a file.

Parameters:
  • outfile_template (str) – template of the output file path

  • n_records_per_file (int) – number of records per file

  • record_postprocessing (str) – import path of the function that post-processes a record. The function can return None to skip the record.

  • queue (Queue) – a queue that yields records to be written to a file. When it yields None, the writer stops.
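The writer side can be sketched as a loop that drains the queue until it receives the `None` sentinel. In this hypothetical `write_to_files_sketch`, `outfile_template` is assumed to be a `str.format` template with one slot for the file index, the post-processing function is passed directly as a callable (the documented `write_to_file` takes an import path instead), records are written uncompressed with a trailing newline, and a thread-level `queue.Queue` stands in for the multiprocessing queue.

```python
import os
import queue
import tempfile
from typing import Callable, List, Optional


def write_to_files_sketch(
    outfile_template: str,
    n_records_per_file: int,
    postprocess: Callable[[bytes], Optional[bytes]],
    q: "queue.Queue[Optional[bytes]]",
) -> List[str]:
    """Hypothetical writer loop: drain `q` until a None sentinel arrives.

    Returns the paths of the files written, in order.
    """
    written: List[str] = []
    out = None
    count = 0
    try:
        while True:
            record = q.get()
            if record is None:  # sentinel: the reader is done
                break
            record = postprocess(record)
            if record is None:  # post-processing asked to skip it
                continue
            if out is None or count == n_records_per_file:
                if out is not None:  # current file is full: roll over
                    out.close()
                path = outfile_template.format(len(written))
                out = open(path, "wb")
                written.append(path)
                count = 0
            out.write(record + b"\n")
            count += 1
    finally:
        if out is not None:
            out.close()
    return written


q: "queue.Queue[Optional[bytes]]" = queue.Queue()
for rec in [b"a\n", b"\n", b"b\n", b"c\n", None]:
    q.put(rec)

with tempfile.TemporaryDirectory() as tmp:
    paths = write_to_files_sketch(
        os.path.join(tmp, "part-{:05d}.txt"), 2, lambda r: r.rstrip(b"\n") or None, q
    )
    contents = []
    for p in paths:
        with open(p, "rb") as f:
            contents.append(f.read())
    names = [os.path.basename(p) for p in paths]

print(names, contents)  # ['part-00000.txt', 'part-00001.txt'] [b'a\nb\n', b'c\n']
```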

strip_newline(line: bytes) → bytes

Strip newline from a line.

Parameters:

line (bytes) – the line to strip.

Return type:

bytes
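A minimal stand-in for `strip_newline`, assuming it removes exactly one trailing newline byte (the library's version may treat `\r\n` differently), is shown below together with a hypothetical custom post-processing function that uses the documented `None` convention to skip blank records. To use such a function with `split_a_file`, it would have to live in an importable module and be passed by its import path, for example `record_postprocess='mypkg.filters.skip_blank'` (a hypothetical path).

```python
from typing import Optional


def strip_newline_sketch(line: bytes) -> bytes:
    """Hypothetical stand-in for strip_newline: drop one trailing newline byte."""
    return line[:-1] if line.endswith(b"\n") else line


def skip_blank(line: bytes) -> Optional[bytes]:
    """Hypothetical record_postprocess: strip the newline, skip blank lines."""
    stripped = strip_newline_sketch(line)
    # Returning None tells the writer to skip this record.
    return stripped if stripped.strip() else None


print(strip_newline_sketch(b'{"id": 1}\n'))  # b'{"id": 1}'
print(skip_blank(b"   \n"))                   # None
```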

split_a_list(lst: List[bytes], outfile: Union[str, Path], n_records_per_file: int = 64000)
Parameters: