import gzip
import io
import tarfile
import zipfile
from pathlib import Path
from typing import List
import pytest
from gobbli.util import (
TokenizeMethod,
blob_to_dir,
detokenize,
dir_to_blob,
extract_archive,
is_archive,
shuffle_together,
tokenize,
)


def make_zip(tmpdir: Path, relative_paths: List[Path]) -> Path:
    """
    Make a zip archive from a list of relative paths.
    Create empty files at each path and add them to the archive.
    """
    zip_path = tmpdir / "test.zip"
    with zipfile.ZipFile(zip_path, "w") as z:
        for relative_path in relative_paths:
            full_path = tmpdir / relative_path
            full_path.parent.mkdir(exist_ok=True, parents=True)
            full_path.touch()
            z.write(full_path, arcname=relative_path)
    return zip_path


def make_tar_gz(tmpdir: Path, relative_paths: List[Path]) -> Path:
    """
    Make a .tar.gz archive from a list of relative paths.
    Create empty files at each path and add them to the archive.
    """
    tar_path = tmpdir / "test.tar.gz"
    with tarfile.open(tar_path, "w:gz") as z:
        for relative_path in relative_paths:
            full_path = tmpdir / relative_path
            full_path.parent.mkdir(exist_ok=True, parents=True)
            full_path.touch()
            z.add(str(full_path), arcname=str(relative_path), recursive=False)
    return tar_path


def make_gz(tmpdir: Path, name: str) -> Path:
    """
    Create a gzip-compressed file with the given name under the given temp directory.
    Return the path to the compressed file.
    """
    gzip_path = tmpdir / f"{name}.gz"
    with gzip.open(gzip_path, "wb") as z:
        z.write(b"Test")
    return gzip_path


TEST_ARCHIVE_DATA = ["./a", "./b/c"]
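

# Illustrative sketch (not part of the original suite): how the helpers above
# are meant to combine with extract_archive.  It assumes
# extract_archive(archive_path, extract_dir) unpacks the archive under
# extract_dir -- an assumed signature, so this is left as a private example
# rather than a collected test.
def _example_extract_archive_roundtrip(tmpdir: Path):
    archive_path = make_zip(tmpdir, [Path(p) for p in TEST_ARCHIVE_DATA])
    extract_dir = tmpdir / "extracted"
    extract_archive(archive_path, extract_dir)  # assumed signature
    for relative_path in TEST_ARCHIVE_DATA:
        # pathlib drops leading "./" components, so these resolve to
        # extract_dir / "a" and extract_dir / "b/c"
        assert (extract_dir / relative_path).exists()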


@pytest.mark.parametrize(
    "name,expected_is_archive",
    [
        ("test.tar.gz", True),
        ("test.gz", True),
        ("test.txt.gz", True),
        ("test.zip", True),
        ("test.xz", False),
        ("test.txt", False),
        ("test.vec", False),
        ("test.bin", False),
    ],
)
def test_is_archive(name, expected_is_archive):
    assert is_archive(Path(name)) == expected_is_archive


def test_dir_to_blob(tmpdir):
    test_dir = Path(tmpdir) / "test"
    test_dir.mkdir()
    test_file_name = "test.txt"
    test_file = test_dir / test_file_name
    file_contents = "test"
    test_file.write_text(file_contents)
    blob = dir_to_blob(test_dir)
    fileobj = io.BytesIO(blob)
    fileobj.seek(0)
    extract_path = test_dir / "test2"
    with tarfile.open(fileobj=fileobj, mode="r:gz") as archive:
        archive.extractall(extract_path)
    extracted_file = extract_path / test_file_name
    assert extracted_file.exists()
    assert extracted_file.read_text() == file_contents


def test_blob_to_dir(tmpdir):
    test_dir = Path(tmpdir) / "test"
    test_dir.mkdir()
    test_file_name = "test.txt"
    test_file = test_dir / test_file_name
    file_contents = "test"
    test_file.write_text(file_contents)
    blob = dir_to_blob(test_dir)
    extract_path = test_dir / "test2"
    blob_to_dir(blob, extract_path)
    extracted_file = extract_path / test_file_name
    assert extracted_file.exists()
    assert extracted_file.read_text() == file_contents


@pytest.mark.parametrize(
    "l1,l2,err",
    [
        ([], [], None),
        (["a"], [1], None),
        (["a", "b"], [1], ValueError),
        (["a", "b"], [1, 2], None),
        (["a", "b", "c"], [1, 2, 3], None),
        (["a", "b", "c", "d"], [1, 2, 3, 4], None),
    ],
)
def test_shuffle_together(l1, l2, err):
    seed = 1
    if err is not None:
        with pytest.raises(err):
            shuffle_together(l1, l2, seed=seed)
    else:
        original_rows = set(zip(l1, l2))
        shuffle_together(l1, l2, seed=seed)
        for row in zip(l1, l2):
            assert tuple(row) in original_rows
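

# Illustrative sketch (an assumption about the seed semantics, not something
# the test above asserts): with a fixed seed, shuffle_together should be
# deterministic, shuffling identical inputs into identical orders in place.
def _example_shuffle_together_determinism():
    a1, b1 = ["a", "b", "c", "d"], [1, 2, 3, 4]
    a2, b2 = ["a", "b", "c", "d"], [1, 2, 3, 4]
    shuffle_together(a1, b1, seed=1)
    shuffle_together(a2, b2, seed=1)
    assert (a1, b1) == (a2, b2)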


@pytest.mark.parametrize(
    "text,tokens",
    [
        ("This is a test.", ["this", "is", "a", "test."]),
        ("Two  spaces", ["two", "spaces"]),
        ("Hyphenated-word", ["hyphenated-word"]),
        ("Numbers 1 and 2", ["numbers", "1", "and", "2"]),
    ],
)
def test_tokenize_split(text, tokens):
    # Whitespace tokenization lowercases and splits on whitespace
    assert tokenize(TokenizeMethod.SPLIT, [text]) == [tokens]


@pytest.mark.parametrize(
    "text,tokens",
    [
        ("This is a test.", ["this", "is", "a", "test"]),
        ("Two  spaces", ["two", "spaces"]),
        ("Hyphenated-word", ["hyphenated", "word"]),
        ("Numbers 1 and 2", ["numbers", "and"]),
    ],
)
def test_tokenize_spacy(text, tokens):
    # spaCy tokenization lowercases and removes non-alphabetic tokens
    assert tokenize(TokenizeMethod.SPACY, [text]) == [tokens]


@pytest.mark.parametrize(
    "tokenize_method", [TokenizeMethod.SPLIT, TokenizeMethod.SPACY]
)
@pytest.mark.parametrize(
    "tokens,text",
    [
        (["this", "is", "a", "test"], "this is a test"),
        (["hyphenated-word"], "hyphenated-word"),
        (["try", ",", "punctuation", "."], "try , punctuation ."),
    ],
)
def test_detokenize_split_spacy(text, tokens, tokenize_method):
    assert detokenize(tokenize_method, [tokens]) == [text]
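

# Round-trip sketch (inferred from the cases above, not a documented
# guarantee): for SPLIT, tokenize followed by detokenize should be the
# identity on lowercase, single-space-separated text.
def _example_tokenize_detokenize_split_roundtrip():
    texts = ["this is a test", "hyphenated-word"]
    tokens = tokenize(TokenizeMethod.SPLIT, texts)
    assert detokenize(TokenizeMethod.SPLIT, tokens) == texts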


@pytest.mark.parametrize("model_path", [Path("spm"), None])
def test_tokenize_detokenize_sentencepiece(tmpdir, model_path):
    texts = ["a b c", "a ab c", "a b ac"]
    # Model should be trained
    if model_path is not None:
        model_path = Path(tmpdir) / model_path
    tokens = tokenize(
        TokenizeMethod.SENTENCEPIECE, texts, model_path=model_path, vocab_size=7
    )
    # Control sequence indicating whitespace
    _ = "▁"
    expected_tokens = [
        [_, "a", _, "b", _, "c"],
        [_, "a", _, "a", "b", _, "c"],
        [_, "a", _, "b", _, "a", "c"],
    ]
    assert tokens == expected_tokens
    # Can't detokenize if we didn't give a persistent model path to the
    # tokenize function
    if model_path is not None:
        assert detokenize(TokenizeMethod.SENTENCEPIECE, tokens, model_path) == texts
        # The previously trained model should be reused with the old vocab
        # size; a new model shouldn't be trained
        tokens = tokenize(TokenizeMethod.SENTENCEPIECE, texts, model_path=model_path)
        assert tokens == expected_tokens