Bases: SchemaAgnosticBlocker
Concatenates and tokenizes entity attribute values and blocks on tokens.
Examples:
>>> # doctest: +SKIP
>>> from sylloge import MovieGraphBenchmark
>>> from klinker.data import KlinkerDataset
>>> ds = KlinkerDataset.from_sylloge(MovieGraphBenchmark(),clean=True)
>>> from klinker.blockers import TokenBlocker
>>> blocker = TokenBlocker()
>>> blocks = blocker.assign(left=ds.left, right=ds.right)
Source code in klinker/blockers/token_blocking.py (lines 31–115):
class TokenBlocker(SchemaAgnosticBlocker):
    """Concatenates and tokenizes entity attribute values and blocks on tokens.

    Two entities (one from each dataset) land in the same block when they
    share at least one token after tokenization.

    Examples:
        >>> # doctest: +SKIP
        >>> from sylloge import MovieGraphBenchmark
        >>> from klinker.data import KlinkerDataset
        >>> ds = KlinkerDataset.from_sylloge(MovieGraphBenchmark(),clean=True)
        >>> from klinker.blockers import TokenBlocker
        >>> blocker = TokenBlocker()
        >>> blocks = blocker.assign(left=ds.left, right=ds.right)
    """

    def __init__(
        self,
        tokenize_fn: Callable[[str], List[str]] = word_tokenize,
        min_token_length: int = 3,
    ):
        """Initialize the blocker.

        Args:
            tokenize_fn: Callable[[str], List[str]]: function splitting an attribute string into tokens (default: nltk's word_tokenize).
            min_token_length: int: minimum length a token must have to be kept (enforced downstream by ``tokenize_series``).
        """
        self.tokenize_fn = tokenize_fn
        self.min_token_length = min_token_length

    def _tok_block(self, tab: SeriesType) -> Frame:
        """Perform token blocking on this series.

        Tokenizes every value, explodes to one row per (entity, token), then
        groups by token, collecting the set of entity ids per token.

        Args:
            tab: SeriesType: series on which token blocking should be done.
                May be a pandas or a dask series; index holds the entity ids.

        Returns:
            token blocked series: one row per token, value is the list of
            unique entity ids whose attributes contain that token.
        """
        name = tab.name
        id_col_name = tab.index.name
        # TODO figure out why this hack is needed
        # i.e. why does dask assume later for the join, that this is named 0
        # no matter what it is actually named
        tok_name = "tok"
        tok_kwargs = dict(
            tokenize_fn=self.tokenize_fn, min_token_length=self.min_token_length
        )
        collect_ids_kwargs = dict(id_col=id_col_name)
        if isinstance(tab, dd.Series):
            # dask requires explicit output metadata ("meta") for apply and
            # groupby-apply; describe an object-dtype series indexed by token
            tok_kwargs["meta"] = (tab.name, "O")
            collect_ids_kwargs["meta"] = pd.Series(
                [],
                name=tab.name,
                dtype="O",
                index=pd.Series([], dtype="O", name=tok_name),
            )
        # tokenize -> explode to (entity, token) rows -> group entity ids by token
        return (
            tab.apply(tokenize_series, **tok_kwargs)
            .explode()
            .to_frame()
            .reset_index()
            .rename(columns={name: tok_name})  # avoid same name for col and index
            .groupby(tok_name)
            # collect unique entity ids sharing this token
            .apply(lambda x, id_col: list(set(x[id_col])), **collect_ids_kwargs)
            .to_frame(name=name)
        )

    def _assign(
        self,
        left: SeriesType,
        right: SeriesType,
        left_rel: Optional[KlinkerFrame] = None,
        right_rel: Optional[KlinkerFrame] = None,
    ) -> KlinkerBlockManager:
        """Assign entity ids to blocks.

        Args:
            left: SeriesType: Contains entity attribute information of left dataset.
            right: SeriesType: Contains entity attribute information of right dataset.
            left_rel: Optional[KlinkerFrame]: (Default value = None) Contains relational information of left dataset (unused here).
            right_rel: Optional[KlinkerFrame]: (Default value = None) Contains relational information of right dataset (unused here).

        Returns:
            KlinkerBlockManager: instance holding the resulting blocks.
        """
        left_tok = self._tok_block(left)
        right_tok = self._tok_block(right)
        # inner join on the token index: keep only tokens occurring on both sides
        pd_blocks = left_tok.join(right_tok, how="inner")
        if isinstance(pd_blocks, dd.DataFrame):
            # keep the dask frame lazy; materialization happens later
            return KlinkerBlockManager(pd_blocks)
        return KlinkerBlockManager.from_pandas(pd_blocks)
|