Skip to content

klinker

KlinkerBlockManager

Class for handling of blocks.


Parameters:

blocks: dataframe with blocks (one row per block, one column per dataset).

Examples:


>>> from klinker import KlinkerBlockManager
>>> kbm = KlinkerBlockManager.from_dict({ "block1": [[1,3,4],[3,4,5]], "block2": [[3,4,5],[5,6]]}, dataset_names=("A","B"))
>>> kbm.blocks.compute()
                A          B
block1  [1, 3, 4]  [3, 4, 5]
block2  [3, 4, 5]     [5, 6]
>>> kbm["block1"].compute()
                A          B
block1  [1, 3, 4]  [3, 4, 5]
>>> len(kbm)
2
>>> set(kbm.all_pairs())
{(4, 4), (5, 5), (3, 4), (1, 5), (4, 3), (4, 6), (1, 4), (4, 5), (3, 3), (5, 6), (3, 6), (1, 3), (3, 5)}
>>> kbm.block_sizes
block1    6
block2    5
Name: block_sizes, dtype: int64
>>> kbm.mean_block_size
5.5
>>> kbm.to_dict()
{'block1': ([1, 3, 4], [3, 4, 5]), 'block2': ([3, 4, 5], [5, 6])}

```
Source code in klinker/data/blocks.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
class KlinkerBlockManager:
    """Class for handling of blocks.

    Every row of ``blocks`` is one block (the index holds the block names) and
    every column holds the entity ids that one dataset contributes to it.

    Args:
    ----
        blocks: dataframe with blocks.

    Examples:
    --------
        >>> from klinker import KlinkerBlockManager
        >>> kbm = KlinkerBlockManager.from_dict({ "block1": [[1,3,4],[3,4,5]], "block2": [[3,4,5],[5,6]]}, dataset_names=("A","B"))
        >>> kbm.blocks.compute()
                        A          B
        block1  [1, 3, 4]  [3, 4, 5]
        block2  [3, 4, 5]     [5, 6]
        >>> kbm["block1"].compute()
                        A          B
        block1  [1, 3, 4]  [3, 4, 5]
        >>> len(kbm)
        2
        >>> set(kbm.all_pairs())
        {(4, 4), (5, 5), (3, 4), (1, 5), (4, 3), (4, 6), (1, 4), (4, 5), (3, 3), (5, 6), (3, 6), (1, 3), (3, 5)}
        >>> kbm.block_sizes
        block1    6
        block2    5
        Name: block_sizes, dtype: int64
        >>> kbm.mean_block_size
        5.5
        >>> kbm.to_dict()
        {'block1': ([1, 3, 4], [3, 4, 5]), 'block2': ([3, 4, 5], [5, 6])}
    """

    def __init__(self, blocks: dd.DataFrame):
        self.blocks = blocks
        # Per-column groupbys used by ``find_blocks``; built lazily on first use.
        self._grouped: Optional[Tuple] = None

    def __getitem__(self, key):
        """Return the block(s) stored under ``key`` (label-based ``.loc`` lookup)."""
        return self.blocks.loc[key]

    def __len__(self) -> int:
        """Return the number of blocks."""
        return len(self.blocks)

    def __repr__(self) -> str:
        return f"KlinkerBlockManager(blocks=\n{self.blocks.__repr__()})"

    @property
    def block_sizes(self) -> pd.Series:
        """Number of entity ids per block, summed over all datasets.

        Returns
        -------
            Pandas Series indexed by block name and named ``block_sizes``.
        """
        return self.block_assignments().compute().rename("block_sizes")

    @property
    def mean_block_size(self) -> float:
        """Mean of ``block_sizes`` over all blocks."""
        return float(self.block_sizes.mean())

    def to_dict(
        self,
    ) -> Dict[
        Union[str, int], Tuple[Sequence[Union[str, int]], Sequence[Union[str, int]]]
    ]:
        """Return blocks as dict.

        Returns
        -------
          The dict has block names as keys and, as values, a tuple holding the
          stored entity ids of each dataset for that block (in column order).
        """
        # Turn each row into a tuple of its cells; ``meta`` declares the
        # result as an object-dtype Series for dask.
        return (
            self.blocks.apply(tuple, axis=1, meta=pd.Series([], dtype=object))
            .compute()
            .to_dict()
        )

    def find_blocks(self, entity_id: Union[str, int], column_id: int) -> np.ndarray:
        """Find blocks where entity id belongs to.

        On first use every column is exploded to one row per entity id and
        grouped by that id; the groupbys are cached in ``self._grouped``.

        Args:
        ----
          entity_id: Union[str, int]: Entity id.
          column_id: int: Whether entity belongs to left (0) or right (1) dataset.

        Returns:
        -------
            Names of the blocks the entity id belongs to.
        """
        if self._grouped is None:
            grouped = []
            for column_name in self.blocks.columns:
                cur_ex = self.blocks[column_name].explode()
                grouped.append(cur_ex.to_frame().groupby(by=column_name))
            self._grouped = tuple(grouped)
        assert self._grouped  # for mypy
        return self._grouped[column_id].get_group(entity_id).index.values.compute()

    def entity_pairs(
        self, entity_id: Union[str, int], column_id: int
    ) -> Generator[Tuple[Union[int, str], ...], None, None]:
        """Get all pairs where this entity shows up.

        Pairs are ``(entity_id, other_id)``: the given entity always comes
        first, regardless of ``column_id``.

        Args:
        ----
          entity_id: Union[str, int]: Entity id.
          column_id: int: Whether entity belongs to left (0) or right (1) dataset.

        Returns:
        -------
            Generator for these pairs.
        """
        cur_blocks = self.find_blocks(entity_id, column_id)
        other_column = 0 if column_id == 1 else 1
        other_column_name = self.blocks.columns[other_column]
        return (
            pair
            for blk_name in cur_blocks
            for _, blk in self.blocks.loc[blk_name][other_column_name].compute().items()
            for pair in itertools.product({entity_id}, blk)
        )

    def all_pairs(self) -> Generator[Tuple[Union[int, str], ...], None, None]:
        """Get all pairs.

        Returns
        -------
            Generator over the cartesian product of every block's columns.
            Pairs that occur in several blocks are yielded repeatedly.
        """
        for block_tuple in self.blocks.itertuples(index=False, name=None):
            yield from itertools.product(*block_tuple)

    def inner_block_assignments(self) -> dd.DataFrame:
        """Return the number of entity ids per block and dataset."""
        return self.blocks.applymap(len)

    def block_assignments(self) -> dd.Series:
        """Return the number of entity ids per block, summed over the datasets."""
        return self.inner_block_assignments().sum(axis=1)

    def block_comparisons(self) -> dd.Series:
        """Return the number of comparisons per block (left size * right size)."""
        ibs = self.inner_block_assignments()
        return ibs[self.blocks.columns[0]] * ibs[self.blocks.columns[1]]

    def individual_blocking_cardinality_per_ds(
        self, dataset_lens: List[int]
    ) -> Tuple[float, float]:
        """Per dataset: total block assignments divided by the dataset size.

        Args:
        ----
          dataset_lens: List[int]: Number of entities of each dataset, in column order.
        """
        return tuple(
            self.blocks[col].apply(len, meta=(col, "int64")).sum().compute() / ds_len
            for col, ds_len in zip(self.blocks.columns, dataset_lens)
        )

    def overall_blocking_cardinality(self, dataset_lens: List[int]) -> float:
        """Total block assignments divided by the summed dataset sizes."""
        return self.block_assignments().sum().compute() / sum(dataset_lens)

    def comparisons_cardinality(self) -> float:
        """Sum of all block sizes divided by the total number of comparisons."""
        sum_of_block_sizes = self.inner_block_assignments().sum().sum().compute()
        aggregate_cardinality = self.block_comparisons().sum().compute()
        return sum_of_block_sizes / aggregate_cardinality

    def _iterative_get_purge_threshold(self) -> int:
        """Find the block-cardinality threshold for purging by iterating over blocks.

        Blocks are visited in ascending order of their comparison cardinality
        (dask's ``set_index`` sorts the index). For every distinct cardinality
        the comparisons cardinality ``cc`` (block assignments / comparisons)
        of all blocks seen so far is recorded. The threshold is the first
        cardinality whose ``cc`` equals that of its predecessor; if no such
        cardinality exists, the largest cardinality is returned.
        """
        block_stats = self.inner_block_assignments()
        # add block comparisons to block_stats
        block_stats["ind_block_card"] = (
            block_stats[block_stats.columns[0]] * block_stats[block_stats.columns[1]]
        )
        block_stats = block_stats.reset_index().set_index("ind_block_card")
        block_assignments = 0
        total_comparisons = 0
        last_i_cardinality = 1
        stats = []
        for (
            block_card,
            _,
            left_num_assign,
            right_num_assign,
        ) in block_stats.compute().itertuples(name=None):
            if last_i_cardinality < block_card:
                # Nothing with comparisons has been seen yet (e.g. the smallest
                # cardinality is > 1): there is no cc to record and dividing
                # would fail.
                if total_comparisons > 0:
                    stats.append(
                        {
                            "i_cardinality": last_i_cardinality,
                            "cc": block_assignments / total_comparisons,
                        }
                    )
                last_i_cardinality = block_card
            block_assignments += left_num_assign + right_num_assign
            total_comparisons += block_card

        if total_comparisons > 0:
            stats.append(
                {
                    "i_cardinality": last_i_cardinality,
                    "cc": block_assignments / total_comparisons,
                }
            )
        # Compare each entry only with its actual predecessor, so the first
        # entry is never compared against the last one.
        for previous, current in zip(stats, stats[1:]):
            if current["cc"] == previous["cc"]:
                return current["i_cardinality"]  # type: ignore[return-value]
        return last_i_cardinality

    def _get_purge_threshold(self, round_cc: int) -> int:
        """Vectorized search for the purge threshold.

        Args:
        ----
          round_cc: int: Decimals ``cc`` is rounded to before looking for the
            first repeated value.

        Returns:
        -------
            The ``i_card`` value at the first repeated (rounded) ``cc``, or the
            last ``i_card`` if no value repeats.
        """
        left_col, right_col = self.blocks.columns
        block_stats = self.inner_block_assignments()
        # add block comparisons to block_stats
        block_stats["ind_block_card"] = block_stats[left_col] * block_stats[right_col]
        block_stats["block_assignments"] = (
            block_stats[left_col] + block_stats[right_col]
        )
        block_stats["block_card"] = block_stats["ind_block_card"]
        bs = block_stats.reset_index().set_index("ind_block_card").compute()

        # NOTE(review): only the first block per cardinality is kept here
        # (others are dropped, not aggregated), and ``i_card`` is a cumulative
        # comparison count while ``purge`` compares it against individual block
        # cardinalities — confirm both are intended.
        bs = bs[~bs.index.duplicated(keep="first")].copy()
        bs["i_card"] = bs["block_card"].cumsum()
        bs["cc"] = bs["block_assignments"].cumsum() / bs["i_card"]
        find_mask = bs["cc"].round(round_cc).duplicated(keep="first")
        if find_mask.any():
            return bs.loc[find_mask, "i_card"].iloc[0]
        return bs["i_card"].iloc[-1]

    def purge(self, round_cc: int) -> "KlinkerBlockManager":
        """Drop blocks whose comparison cardinality exceeds the purge threshold.

        Args:
        ----
          round_cc: int: Decimals used when searching for the threshold
            (see ``_get_purge_threshold``).

        Returns:
        -------
            New KlinkerBlockManager containing only the remaining blocks.
        """
        purge_threshold = self._get_purge_threshold(round_cc)
        block_stats = self.inner_block_assignments()
        # add block comparisons to block_stats
        block_stats["ind_block_card"] = (
            block_stats[block_stats.columns[0]] * block_stats[block_stats.columns[1]]
        )
        bs = block_stats.compute()
        kept_blocks = bs[bs["ind_block_card"] <= purge_threshold].index
        return KlinkerBlockManager(self.blocks.loc[kept_blocks])

    @classmethod
    def combine(
        cls, this: "KlinkerBlockManager", other: "KlinkerBlockManager"
    ) -> "KlinkerBlockManager":
        """Combine blocks.

        Blocks present in only one manager are taken over unchanged. For blocks
        present in both, the entity ids of each dataset are unioned; the order
        of the merged id lists is not preserved.

        Args:
        ----
          this: one block manager to combine
          other: other block manager to combine

        Returns:
        -------
          Combined KlinkerBlockManager

        Raises:
        ------
          ValueError: If the two managers do not have the same columns.

        Examples:
        --------
            >>> from klinker import KlinkerBlockManager
            >>> kbm = KlinkerBlockManager.from_dict({"block1": [[1,3,4],[3,4,5]], "block2": [[3,4,5],[5,6]]}, dataset_names=("A","B"))
            >>> kbm2 = KlinkerBlockManager.from_dict({"block3": [[7,4],[12,8]]}, dataset_names=("A","B"))
            >>> kbm_merged = KlinkerBlockManager.combine(kbm, kbm2)
            >>> kbm_merged.blocks.compute()
                            A          B
            block1  [1, 3, 4]  [3, 4, 5]
            block2  [3, 4, 5]     [5, 6]
            block3     [7, 4]    [12, 8]

        """

        def _merge_blocks(
            row: pd.Series, output_names: Sequence[str], left_right_names: Sequence[str]
        ):
            # After the outer join, the cells of a manager lacking this block are null.
            nonnull = row[~row.isnull()]
            if len(nonnull) == 2:  # block exists in only one manager
                nonnull.index = output_names
                return nonnull
            # Block exists in both managers: union the ids of each dataset.
            a_left = set(nonnull[left_right_names[0]])
            a_right = set(nonnull[left_right_names[2]])
            b_left = set(nonnull[left_right_names[1]])
            b_right = set(nonnull[left_right_names[3]])
            merged_a = list(a_left.union(a_right))
            merged_b = list(b_left.union(b_right))
            return pd.Series([merged_a, merged_b], index=output_names, name=nonnull.name)

        if list(this.blocks.columns) != list(other.blocks.columns):
            raise ValueError("Cannot combine blocks from different datasets!")

        output_names = this.blocks.columns
        left_suffix = "left"
        right_suffix = "right"
        # Column names produced by the join below: <col><suffix>.
        left_right_names = [
            col + suffix
            for col_names, suffix in zip(
                [this.blocks.columns, other.blocks.columns], [left_suffix, right_suffix]
            )
            for col in col_names
        ]
        joined = this.blocks.join(
            other.blocks, how="outer", lsuffix=left_suffix, rsuffix=right_suffix
        )

        meta = pd.DataFrame([], columns=output_names)
        return cls(
            joined.apply(
                _merge_blocks,
                output_names=output_names,
                left_right_names=left_right_names,
                axis=1,
                meta=meta,
            )
        )

    def to_parquet(self, path: Union[str, pathlib.Path], **kwargs):
        """Write blocks as parquet file(s).

        Args:
        ----
          path: Union[str, pathlib.Path]: Where to write.
          **kwargs: passed to the parquet function. A given ``schema`` is used
            as-is; otherwise the first two columns are written as lists of strings.
        """
        if "schema" in kwargs:
            schema = kwargs.pop("schema")
        else:
            left, right = self.blocks.columns[:2]
            block_type = pa.list_(pa.string())
            schema = {
                left: block_type,
                right: block_type,
            }
        try:
            self.blocks.to_parquet(path, schema=schema, **kwargs)
        except ValueError:
            # If index is incorrectly assumed by dask to be string
            # and it turns out to be int64 an error would be thrown.
            # HACK: retry with an explicit int64 index type. Only possible for
            # dict schemas; copy so a caller-supplied dict is not mutated.
            if not isinstance(schema, dict):
                raise
            schema = {**schema, "__null_dask_index__": pa.int64()}
            self.blocks.to_parquet(path, schema=schema, **kwargs)

    @staticmethod
    def read_parquet(
        path: Union[str, pathlib.Path],
        calculate_divisions: bool = True,
        **kwargs,
    ) -> "KlinkerBlockManager":
        """Read blocks from parquet.

        A directory containing ``nn_blocks`` is read as a
        CompositeWithNNBasedKlinkerBlockManager. Frames with more than two
        columns, or whose first cell is a scalar id rather than a list, are
        read as NNBasedKlinkerBlockManager; everything else as
        KlinkerBlockManager.

        Args:
        ----
          path: Union[str, pathlib.Path]: Path where blocks are stored.
          calculate_divisions: bool: Calculate index divisions.
          **kwargs: Passed to `dd.read_parquet` function.

        Returns:
        -------
            Blocks as KlinkerBlockManager
        """
        if not isinstance(path, pathlib.Path):
            path = pathlib.Path(path)
        if path.joinpath("nn_blocks").exists():
            return CompositeWithNNBasedKlinkerBlockManager.read_parquet(
                path, calculate_divisions=calculate_divisions, **kwargs
            )
        blocks = dd.read_parquet(
            path=path,
            calculate_divisions=calculate_divisions,
            **kwargs,
        )
        if len(blocks.columns) > 2:
            return NNBasedKlinkerBlockManager(blocks)
        # For the rare case that NN was <= 2: a scalar id in the first cell is
        # taken to mean NN-based blocks. Integer ids come back as numpy
        # scalars, which are not instances of ``int``.
        first_cells = blocks[blocks.columns[0]].head(1, npartitions=-1).values
        if len(first_cells) > 0 and isinstance(
            first_cells[0], (str, int, np.integer)
        ):
            return NNBasedKlinkerBlockManager(blocks)
        return KlinkerBlockManager(blocks)

    @classmethod
    def from_pandas(
        cls, df: pd.DataFrame, npartitions: int = 1, **kwargs
    ) -> "KlinkerBlockManager":
        """Create from pandas.

        Args:
        ----
          df: pd.DataFrame: DataFrame
          npartitions: int:  Partitions for dask
          **kwargs: Passed to `dd.from_pandas`

        Returns:
        -------
            Blocks as KlinkerBlockManager

        Examples:
        --------
            >>> import pandas as pd
            >>> from klinker import KlinkerBlockManager
            >>> pd_blocks = pd.DataFrame({'A': {'block1': [1, 3, 4], 'block2': [3, 4, 5]}, 'B': {'block1': [3, 4, 5], 'block2': [5, 6]}})
            >>> kbm = KlinkerBlockManager.from_pandas(pd_blocks)

        """
        return cls(dd.from_pandas(df, npartitions=npartitions, **kwargs))

    @classmethod
    def from_dict(
        cls,
        block_dict: Dict[
            BlockIdTypeVar, Tuple[List[EntityIdTypeVar], List[EntityIdTypeVar]]
        ],
        dataset_names: Tuple[str, str] = ("left", "right"),
        npartitions: int = 1,
        **kwargs,
    ) -> "KlinkerBlockManager":
        """Create from a dict mapping block names to per-dataset entity-id lists.

        Args:
        ----
          block_dict: Dictionary with block information.
          dataset_names: Tuple[str, str]: Tuple of dataset names.
          npartitions: int: Partitions used for dask.
          **kwargs: Passed to `dd.from_dict`.

        Returns
        -------
            Blocks as KlinkerBlockManager

        Examples
        --------
            >>> from klinker import KlinkerBlockManager
            >>> kbm = KlinkerBlockManager.from_dict({"block1": [[1,3,4],[3,4,5]], "block2": [[3,4,5],[5,6]]}, dataset_names=("A","B"))

        """
        return cls(
            dd.from_dict(
                block_dict,
                orient="index",
                columns=dataset_names,
                npartitions=npartitions,
                **kwargs,
            )
        )

    @classmethod
    @deprecated(reason="Please use parquet files")
    def read_pickle(cls, path) -> "KlinkerBlockManager":
        """Read blocks from a legacy pickle file.

        Accepts a pickled dict, a pandas DataFrame, or an object whose
        ``blocks`` attribute is a dict.

        Raises:
        ------
          ValueError: If the pickled object is of none of these kinds.
        """
        # SECURITY: pickle.load can execute arbitrary code; only load trusted files.
        with open(path, "rb") as in_file:
            res = pickle.load(in_file)
        if isinstance(res, dict):
            return cls.from_dict(res)
        if isinstance(res, pd.DataFrame):
            return cls.from_pandas(res)
        if hasattr(res, "blocks") and isinstance(res.blocks, dict):
            return cls.from_dict(
                {
                    bk: (list(left_v), list(right_v))
                    for bk, (left_v, right_v) in res.blocks.items()
                }
            )  # type: ignore
        raise ValueError(f"Unknown pickled object of type {type(res)}")

all_pairs()

Get all pairs.

Returns
Generator that creates all pairs from the blocks (including duplicates).
Source code in klinker/data/blocks.py
131
132
133
134
135
136
137
138
139
def all_pairs(self) -> Generator[Tuple[Union[int, str], ...], None, None]:
    """Yield every entity-id combination of every block.

    For each block (row), the cartesian product of its columns' entity ids is
    produced. A pair occurring in several blocks is yielded once per block.

    Returns
    -------
        Generator that creates all pairs, from blocks (including duplicates).
    """
    rows = self.blocks.itertuples(index=False, name=None)
    for row in rows:
        for pair in itertools.product(*row):
            yield pair

combine(this, other) classmethod

Combine blocks.


Parameters:

this: one block manager to combine.
other: the other block manager to combine.


Returns: the combined KlinkerBlockManager.

Examples:


>>> from klinker import KlinkerBlockManager
>>> kbm = KlinkerBlockManager.from_dict({"block1": [[1,3,4],[3,4,5]], "block2": [[3,4,5],[5,6]]}, dataset_names=("A","B"))
>>> kbm2 = KlinkerBlockManager.from_dict({"block3": [[7,4],[12,8]]}, dataset_names=("A","B"))
>>> kbm_merged = KlinkerBlockManager.combine(kbm, kbm2)
>>> kbm_merged.blocks.compute()
                A          B
block1  [1, 3, 4]  [3, 4, 5]
block2  [3, 4, 5]     [5, 6]
block3     [7, 4]    [12, 8]
Source code in klinker/data/blocks.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
@classmethod
def combine(
    cls, this: "KlinkerBlockManager", other: "KlinkerBlockManager"
) -> "KlinkerBlockManager":
    """Combine blocks.

    Blocks present in only one manager are taken over unchanged. For blocks
    present in both, the entity ids of each dataset are unioned; the order of
    the merged id lists is not preserved.

    Args:
    ----
      this: one block manager to combine
      other: other block manager to combine

    Returns:
    -------
      Combined KlinkerBlockManager

    Raises:
    ------
      ValueError: If the two managers do not have the same columns.

    Examples:
    --------
        >>> from klinker import KlinkerBlockManager
        >>> kbm = KlinkerBlockManager.from_dict({"block1": [[1,3,4],[3,4,5]], "block2": [[3,4,5],[5,6]]}, dataset_names=("A","B"))
        >>> kbm2 = KlinkerBlockManager.from_dict({"block3": [[7,4],[12,8]]}, dataset_names=("A","B"))
        >>> kbm_merged = KlinkerBlockManager.combine(kbm, kbm2)
        >>> kbm_merged.blocks.compute()
                        A          B
        block1  [1, 3, 4]  [3, 4, 5]
        block2  [3, 4, 5]     [5, 6]
        block3     [7, 4]    [12, 8]

    """

    def _merge_blocks(
        row: pd.Series, output_names: Sequence[str], left_right_names: Sequence[str]
    ):
        # After the outer join, the cells of a manager lacking this block are null.
        nonnull = row[~row.isnull()]
        if len(nonnull) == 2:  # block exists in only one manager
            nonnull.index = output_names
            return nonnull
        # Block exists in both managers: union the ids of each dataset.
        a_left = set(nonnull[left_right_names[0]])
        a_right = set(nonnull[left_right_names[2]])
        b_left = set(nonnull[left_right_names[1]])
        b_right = set(nonnull[left_right_names[3]])
        merged_a = list(a_left.union(a_right))
        merged_b = list(b_left.union(b_right))
        return pd.Series([merged_a, merged_b], index=output_names, name=nonnull.name)

    if list(this.blocks.columns) != list(other.blocks.columns):
        raise ValueError("Cannot combine blocks from different datasets!")

    output_names = this.blocks.columns
    left_suffix = "left"
    right_suffix = "right"
    # Column names produced by the join below: <col><suffix>.
    left_right_names = [
        col + suffix
        for col_names, suffix in zip(
            [this.blocks.columns, other.blocks.columns], [left_suffix, right_suffix]
        )
        for col in col_names
    ]
    joined = this.blocks.join(
        other.blocks, how="outer", lsuffix=left_suffix, rsuffix=right_suffix
    )

    meta = pd.DataFrame([], columns=output_names)
    return cls(
        joined.apply(
            _merge_blocks,
            output_names=output_names,
            left_right_names=left_right_names,
            axis=1,
            meta=meta,
        )
    )

entity_pairs(entity_id, column_id)

Get all pairs where this entity shows up.


Parameters:

entity_id (Union[str, int]): Entity id.
column_id (int): Whether the entity belongs to the left (0) or right (1) dataset.


Returns: a generator over these pairs.
Source code in klinker/data/blocks.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def entity_pairs(
    self, entity_id: Union[str, int], column_id: int
) -> Generator[Tuple[Union[int, str], ...], None, None]:
    """Lazily pair an entity with every entity of the other dataset it shares a block with.

    Every produced pair is ``(entity_id, other_id)``: the given entity always
    comes first, whichever column it belongs to.

    Args:
    ----
      entity_id: Union[str, int]: Entity id.
      column_id: int: Whether entity belongs to left (0) or right (1) dataset.

    Returns:
    -------
        Generator for these pairs.
    """
    block_names = self.find_blocks(entity_id, column_id)
    partner_column = self.blocks.columns[0 if column_id == 1 else 1]
    return (
        pair
        for name in block_names
        for _label, partner_ids in self.blocks.loc[name][partner_column]
        .compute()
        .items()
        for pair in itertools.product({entity_id}, partner_ids)
    )

find_blocks(entity_id, column_id)

Find blocks where entity id belongs to.


Parameters:

entity_id (Union[str, int]): Entity id.
column_id (int): Whether the entity belongs to the left (0) or right (1) dataset.


Returns: the names of the blocks the entity id belongs to.
Source code in klinker/data/blocks.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def find_blocks(self, entity_id: Union[str, int], column_id: int) -> np.ndarray:
    """Look up the names of all blocks that contain ``entity_id``.

    On the first call every column of ``self.blocks`` is exploded to one row
    per entity id and grouped by that id. The groupbys are cached in
    ``self._grouped`` and reused by later calls.

    Args:
    ----
      entity_id: Union[str, int]: Entity id.
      column_id: int: Whether entity belongs to left (0) or right (1) dataset.

    Returns:
    -------
        Blocks where entity id belongs to.
    """
    if self._grouped is None:
        self._grouped = tuple(
            self.blocks[name].explode().to_frame().groupby(by=name)
            for name in self.blocks.columns
        )
    assert self._grouped  # for mypy
    entity_rows = self._grouped[column_id].get_group(entity_id)
    return entity_rows.index.values.compute()

from_dict(block_dict, dataset_names=('left', 'right'), npartitions=1, **kwargs) classmethod


Parameters:

block_dict: Dictionary with block information.
dataset_names (Tuple[str, str]): Tuple of dataset names.
npartitions (int): Partitions used for dask.
**kwargs: Passed to dd.from_dict.

Returns
Blocks as KlinkerBlockManager
Examples
>>> from klinker import KlinkerBlockManager
>>> kbm = KlinkerBlockManager.from_dict({"block1": [[1,3,4],[3,4,5]], "block2": [[3,4,5],[5,6]]}, dataset_names=("A","B"))
Source code in klinker/data/blocks.py
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
@classmethod
def from_dict(
    cls,
    block_dict: Dict[
        BlockIdTypeVar, Tuple[List[EntityIdTypeVar], List[EntityIdTypeVar]]
    ],
    dataset_names: Tuple[str, str] = ("left", "right"),
    npartitions: int = 1,
    **kwargs,
) -> "KlinkerBlockManager":
    """Build a block manager from a dictionary of blocks.

    Each key becomes one row (block) of the resulting frame; the two entries
    of its value become the columns named by ``dataset_names``.

    Args:
    ----
      block_dict: Dictionary with block information.
      dataset_names: Tuple[str, str]: Tuple of dataset names.
      npartitions: int: Partitions used for dask.
      **kwargs: Passed to `dd.from_dict`.

    Returns
    -------
        Blocks as KlinkerBlockManager

    Examples
    --------
        >>> from klinker import KlinkerBlockManager
        >>> kbm = KlinkerBlockManager.from_dict({"block1": [[1,3,4],[3,4,5]], "block2": [[3,4,5],[5,6]]}, dataset_names=("A","B"))

    """
    frame = dd.from_dict(
        block_dict,
        orient="index",
        columns=dataset_names,
        npartitions=npartitions,
        **kwargs,
    )
    return cls(frame)

from_pandas(df, npartitions=1, **kwargs) classmethod

Create from pandas.


Parameters:

df (pd.DataFrame): DataFrame with one row per block.
npartitions (int): Partitions for dask.
**kwargs: Passed to dd.from_pandas.


Returns: blocks as KlinkerBlockManager.

Examples:


>>> import pandas as pd
>>> from klinker import KlinkerBlockManager
>>> pd_blocks = pd.DataFrame({'A': {'block1': [1, 3, 4], 'block2': [3, 4, 5]}, 'B': {'block1': [3, 4, 5], 'block2': [5, 6]}})
>>> kbm = KlinkerBlockManager.from_pandas(pd_blocks)
Source code in klinker/data/blocks.py
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
@classmethod
def from_pandas(
    cls, df: pd.DataFrame, npartitions: int = 1, **kwargs
) -> "KlinkerBlockManager":
    """Wrap a pandas frame of blocks in a dask-backed block manager.

    Args:
    ----
      df: pd.DataFrame: DataFrame
      npartitions: int:  Partitions for dask
      **kwargs: Passed to `dd.from_pandas`

    Returns:
    -------
        Blocks as KlinkerBlockManager

    Examples:
    --------
        >>> import pandas as pd
        >>> from klinker import KlinkerBlockManager
        >>> pd_blocks = pd.DataFrame({'A': {'block1': [1, 3, 4], 'block2': [3, 4, 5]}, 'B': {'block1': [3, 4, 5], 'block2': [5, 6]}})
        >>> kbm = KlinkerBlockManager.from_pandas(pd_blocks)

    """
    dask_frame = dd.from_pandas(df, npartitions=npartitions, **kwargs)
    return cls(dask_frame)

read_parquet(path, calculate_divisions=True, **kwargs) staticmethod

Read blocks from parquet.


Parameters:

path (Union[str, pathlib.Path]): Path where blocks are stored.
calculate_divisions (bool): Calculate index divisions.
**kwargs: Passed to the dd.read_parquet function.


Returns: blocks as KlinkerBlockManager.
Source code in klinker/data/blocks.py
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
@staticmethod
def read_parquet(
    path: Union[str, pathlib.Path],
    calculate_divisions: bool = True,
    **kwargs,
) -> "KlinkerBlockManager":
    """Read blocks from parquet.

    A directory containing ``nn_blocks`` is read as a
    CompositeWithNNBasedKlinkerBlockManager. Frames with more than two
    columns, or whose first cell is a scalar id rather than a list, are read
    as NNBasedKlinkerBlockManager; everything else as KlinkerBlockManager.

    Args:
    ----
      path: Union[str, pathlib.Path]: Path where blocks are stored.
      calculate_divisions: bool: Calculate index divisions.
      **kwargs: Passed to `dd.read_parquet` function.

    Returns:
    -------
        Blocks as KlinkerBlockManager
    """
    if not isinstance(path, pathlib.Path):
        path = pathlib.Path(path)
    if path.joinpath("nn_blocks").exists():
        return CompositeWithNNBasedKlinkerBlockManager.read_parquet(
            path, calculate_divisions=calculate_divisions, **kwargs
        )
    blocks = dd.read_parquet(
        path=path,
        calculate_divisions=calculate_divisions,
        **kwargs,
    )
    if len(blocks.columns) > 2:
        return NNBasedKlinkerBlockManager(blocks)
    # For the rare case that NN was <= 2: a scalar id in the first cell is
    # taken to mean NN-based blocks. Integer ids come back as numpy scalars,
    # which are not instances of ``int``; an empty frame has no first cell.
    first_cells = blocks[blocks.columns[0]].head(1, npartitions=-1).values
    if len(first_cells) > 0 and isinstance(first_cells[0], (str, int, np.integer)):
        return NNBasedKlinkerBlockManager(blocks)
    return KlinkerBlockManager(blocks)

to_dict()

Return blocks as dict.

Returns

The dict has block names as keys and, as values, a tuple holding the stored entity ids of each dataset for that block.

Source code in klinker/data/blocks.py
73
74
75
76
77
78
79
80
81
82
83
84
def to_dict(
    self,
) -> Dict[
    Union[str, int], Tuple[Sequence[Union[str, int]], Sequence[Union[str, int]]]
]:
    """Return blocks as dict.

    Returns
    -------
      The dict has block names as keys and, as values, a tuple holding the
      stored entity ids of each dataset for that block (in column order).
    """
    # Turn each row into a tuple of its cells; ``meta`` declares the result
    # as an object-dtype Series for dask.
    return (
        self.blocks.apply(tuple, axis=1, meta=pd.Series([], dtype=object))
        .compute()
        .to_dict()
    )

to_parquet(path, **kwargs)

Write blocks as parquet file(s).


Parameters:

path (Union[str, pathlib.Path]): Where to write.
**kwargs: Passed to the parquet function.

Source code in klinker/data/blocks.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
def to_parquet(self, path: Union[str, pathlib.Path], **kwargs):
    """Write blocks as parquet file(s).

    Without an explicit ``schema``, the first two columns (one per dataset)
    are written as lists of strings.

    Args:
    ----
      path: Union[str, pathlib.Path]: Where to write.
      **kwargs: passed to the parquet function. A ``schema`` entry replaces
        the default schema described above.
    """
    if "schema" in kwargs:
        # ``dict.pop`` must be called; subscripting it raised a TypeError
        # whenever a caller supplied a schema.
        schema = kwargs.pop("schema")
    else:
        left, right = self.blocks.columns[:2]
        block_type = pa.list_(pa.string())
        schema = {
            left: block_type,
            right: block_type,
        }
    try:
        self.blocks.to_parquet(path, schema=schema, **kwargs)
    except ValueError:
        if not isinstance(schema, dict):
            # Only a dict schema can be extended with the index type below;
            # re-raise rather than masking the error with a TypeError.
            raise
        # If index is incorrectly assumed by dask to be string
        # and it turns out to be int64 an error would be thrown
        # This is kind of a dirty hack
        # Build a new dict so a caller-supplied schema is not mutated.
        schema = {**schema, "__null_dask_index__": pa.int64()}
        self.blocks.to_parquet(path, schema=schema, **kwargs)

KlinkerDaskFrame

Bases: DataFrame, AbstractKlinkerFrame

Parallel KlinkerFrame.

Please don't use the __init__ method but rather from_dask_dataframe for initialisation!


dsk: The dask graph to compute this KlinkerFrame.
name: The key prefix that specifies which keys in the dask graph comprise this particular KlinkerFrame.
meta: An empty KlinkerFrame object with names, dtypes, and indices matching the expected output.
divisions: Values along which we partition our blocks on the index.


KlinkerDaskFrame

Examples:


>>> import pandas as pd
>>> from klinker.data import KlinkerDaskFrame
>>> import dask.dataframe as dd
>>> df = dd.from_pandas(pd.DataFrame([("1","John", "Doe"),("2","Jane","Doe")],columns=["id","first name", "surname"]),npartitions=1)
>>> kdf = KlinkerDaskFrame.from_dask_dataframe(df, table_name="A", id_col="id")
>>> kdf
Dask KlinkerDaskFrame Structure:
                   id first name surname
npartitions=1
0              object     object  object
1                 ...        ...     ...
Dask Name: KlinkerPandasFrame, 2 graph layers
Table Name: A, id_col: id
Source code in klinker/data/enhanced_df.py
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
class KlinkerDaskFrame(dd.core.DataFrame, AbstractKlinkerFrame):
    """Parallel (dask-backed) KlinkerFrame.

    Create instances with :meth:`from_dask_dataframe`; calling ``__init__``
    directly is not intended for initialisation.

    Args:
    ----
      dsk: The dask graph to compute this KlinkerFrame
      name: The key prefix that specifies which keys in the dask comprise this particular KlinkerFrame
      meta: An empty klinkerframe object with names, dtypes, and indices matching the expected output.
      divisions: Values along which we partition our blocks on the index
      table_name: Name of the dataset. When omitted, both ``table_name`` and
        ``id_col`` are taken from ``meta`` (the ``id_col`` argument is then ignored).
      id_col: Column holding the entity ids.

    Returns:
    -------
        KlinkerDaskFrame

    Examples:
    --------
        >>> import pandas as pd
        >>> from klinker.data import KlinkerDaskFrame
        >>> import dask.dataframe as dd
        >>> df = dd.from_pandas(pd.DataFrame([("1","John", "Doe"),("2","Jane","Doe")],columns=["id","first name", "surname"]),npartitions=1)
        >>> kdf = KlinkerDaskFrame.from_dask_dataframe(df, table_name="A", id_col="id")
        >>> kdf
        Dask KlinkerDaskFrame Structure:
                           id first name surname
        npartitions=1
        0              object     object  object
        1                 ...        ...     ...
        Dask Name: KlinkerPandasFrame, 2 graph layers
        Table Name: A, id_col: id

    """

    _partition_type = KlinkerPandasFrame

    def __init__(
        self,
        dsk,
        name,
        meta,
        divisions,
        table_name: Optional[str] = None,
        id_col: str = "id",
    ):
        super().__init__(dsk, name, meta, divisions)
        if table_name is None:
            # No explicit name: inherit both attributes from the meta frame.
            table_name, id_col = meta.table_name, meta.id_col
        self._table_name = table_name
        self._id_col = id_col

    @staticmethod
    def _static_propagate_klinker_attributes(
        new_object: "KlinkerDaskFrame", table_name: str, id_col: str
    ) -> "KlinkerDaskFrame":
        """Set ``table_name`` and ``id_col`` on ``new_object`` and return it."""
        new_object.table_name = table_name
        new_object.id_col = id_col
        return new_object

    @property
    def non_id_columns(self) -> List[str]:
        """All columns which are not `id_col`."""
        return self._meta.non_id_columns

    @classmethod
    def _upgrade_from_series(
        cls,
        series,
        columns: List[str],
        table_name: Optional[str],
        id_col: str,
        reset_index: bool = True,
        meta=no_default,
    ) -> "KlinkerFrame":
        """Convert every partition of ``series`` into a KlinkerPandasFrame."""
        assert table_name
        partition_kwargs = dict(
            columns=columns,
            table_name=table_name,
            id_col=id_col,
            reset_index=reset_index,
            meta=meta,
        )
        upgraded = series.map_partitions(
            KlinkerPandasFrame._upgrade_from_series, **partition_kwargs
        )
        return KlinkerDaskFrame._static_propagate_klinker_attributes(
            upgraded, table_name, id_col
        )

    def concat_values(
        self,
    ) -> dd.Series:
        """Concatenate attribute values.

        Missing values are replaced by empty strings before concatenation.

        Returns
        -------
            dd.Series with concatenated values and id_col as index.

        Examples
        --------
            >>> import pandas as pd
            >>> from klinker.data import KlinkerDaskFrame
            >>> import dask.dataframe as dd
            >>> df = dd.from_pandas(pd.DataFrame([("1","John", "Doe"),("2","Jane","Doe")],columns=["id","first name", "surname"]),npartitions=1)
            >>> kdf = KlinkerDaskFrame.from_dask_dataframe(df, table_name="A", id_col="id")
            >>> kdf.concat_values().compute()
            id
            1    John Doe
            2    Jane Doe
            Name: A, dtype: object

        """
        filled = self.fillna("")
        assert filled.table_name
        series_meta = pd.Series([], name=filled.table_name, dtype="str")
        series_meta.index.name = filled.id_col
        return filled.map_partitions(M.concat_values, meta=series_meta)

    @classmethod
    def from_dask_dataframe(
        cls,
        df: dd.DataFrame,
        table_name: str,
        id_col: str,
        meta=no_default,
        construction_class: Type[KlinkerPandasFrame] = KlinkerPandasFrame,
    ) -> "KlinkerDaskFrame":
        """Create a KlinkerDaskFrame from a dask dataframe.

        Args:
        ----
          df: dd.DataFrame: Dask dataframe.
          table_name: str: Name of dataset.
          id_col: str: Column where entity_ids are stored
          meta: meta for dask; when omitted, the meta of the converted frame is used.
          construction_class: Either :class:`KlinkerPandasFrame` or :class:`KlinkerTriplePandasFrame`

        Returns:
        -------
            KlinkerDaskFrame

        Examples:
        --------
            >>> import pandas as pd
            >>> from klinker.data import KlinkerDaskFrame
            >>> import dask.dataframe as dd
            >>> df = dd.from_pandas(pd.DataFrame([("1","John", "Doe"),("2","Jane","Doe")],columns=["id","first name", "surname"]),npartitions=1)
            >>> kdf = KlinkerDaskFrame.from_dask_dataframe(df, table_name="A", id_col="id")
            >>> kdf
            Dask KlinkerDaskFrame Structure:
                               id first name surname
            npartitions=1
            0              object     object  object
            1                 ...        ...     ...
            Dask Name: KlinkerPandasFrame, 2 graph layers
            Table Name: A, id_col: id

        """
        converted = df.map_partitions(
            construction_class, table_name=table_name, id_col=id_col, meta=meta
        )
        if meta is no_default:
            meta = converted._meta
        return cls(
            dsk=converted.dask,
            name=converted._name,
            meta=meta,
            divisions=converted.divisions,
            table_name=table_name,
            id_col=id_col,
        )

    def __repr__(self) -> str:
        base = super().__repr__()
        return base + f"\nTable Name: {self.table_name}, id_col: {self.id_col}"

non_id_columns: List[str] property

All columns which are not id_col.

concat_values()

Concatenate attribute values.

Returns
dd.Series with concatenated values and id_col as index.
Examples
>>> import pandas as pd
>>> from klinker.data import KlinkerDaskFrame
>>> import dask.dataframe as dd
>>> df = dd.from_pandas(pd.DataFrame([("1","John", "Doe"),("2","Jane","Doe")],columns=["id","first name", "surname"]),npartitions=1)
>>> kdf = KlinkerDaskFrame.from_dask_dataframe(df, table_name="A", id_col="id")
>>> kdf.concat_values().compute()
id
1    John Doe
2    Jane Doe
Name: A, dtype: object
Source code in klinker/data/enhanced_df.py
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
def concat_values(
    self,
) -> dd.Series:
    """Concatenate attribute values.

    Missing values are replaced by empty strings before the per-partition
    concatenation runs.

    Returns
    -------
        dd.Series with concatenated values and id_col as index.

    Examples
    --------
        >>> import pandas as pd
        >>> from klinker.data import KlinkerDaskFrame
        >>> import dask.dataframe as dd
        >>> df = dd.from_pandas(pd.DataFrame([("1","John", "Doe"),("2","Jane","Doe")],columns=["id","first name", "surname"]),npartitions=1)
        >>> kdf = KlinkerDaskFrame.from_dask_dataframe(df, table_name="A", id_col="id")
        >>> kdf.concat_values().compute()
        id
        1    John Doe
        2    Jane Doe
        Name: A, dtype: object

    """
    filled = self.fillna("")
    assert filled.table_name
    series_meta = pd.Series([], name=filled.table_name, dtype="str")
    series_meta.index.name = filled.id_col
    return filled.map_partitions(M.concat_values, meta=series_meta)

from_dask_dataframe(df, table_name, id_col, meta=no_default, construction_class=KlinkerPandasFrame) classmethod

Create a KlinkerDaskFrame from a dask dataframe.


df: dd.DataFrame: Dask dataframe.
table_name: str: Name of dataset.
id_col: str: Column where entity ids are stored.
meta: meta for dask.
construction_class: Either :class:KlinkerPandasFrame or :class:KlinkerTriplePandasFrame.


KlinkerDaskFrame

Examples:


>>> import pandas as pd
>>> from klinker.data import KlinkerDaskFrame
>>> import dask.dataframe as dd
>>> df = dd.from_pandas(pd.DataFrame([("1","John", "Doe"),("2","Jane","Doe")],columns=["id","first name", "surname"]),npartitions=1)
>>> kdf = KlinkerDaskFrame.from_dask_dataframe(df, table_name="A", id_col="id")
>>> kdf
Dask KlinkerDaskFrame Structure:
                   id first name surname
npartitions=1
0              object     object  object
1                 ...        ...     ...
Dask Name: KlinkerPandasFrame, 2 graph layers
Table Name: A, id_col: id
Source code in klinker/data/enhanced_df.py
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
@classmethod
def from_dask_dataframe(
    cls,
    df: dd.DataFrame,
    table_name: str,
    id_col: str,
    meta=no_default,
    construction_class: Type[KlinkerPandasFrame] = KlinkerPandasFrame,
) -> "KlinkerDaskFrame":
    """Create a KlinkerDaskFrame from a dask dataframe.

    Args:
    ----
      df: dd.DataFrame: Dask dataframe.
      table_name: str: Name of dataset.
      id_col: str: Column where entity_ids are stored
      meta: meta for dask; when omitted, the meta of the converted frame is used.
      construction_class: Either :class:`KlinkerPandasFrame` or :class:`KlinkerTriplePandasFrame`

    Returns:
    -------
        KlinkerDaskFrame

    Examples:
    --------
        >>> import pandas as pd
        >>> from klinker.data import KlinkerDaskFrame
        >>> import dask.dataframe as dd
        >>> df = dd.from_pandas(pd.DataFrame([("1","John", "Doe"),("2","Jane","Doe")],columns=["id","first name", "surname"]),npartitions=1)
        >>> kdf = KlinkerDaskFrame.from_dask_dataframe(df, table_name="A", id_col="id")
        >>> kdf
        Dask KlinkerDaskFrame Structure:
                           id first name surname
        npartitions=1
        0              object     object  object
        1                 ...        ...     ...
        Dask Name: KlinkerPandasFrame, 2 graph layers
        Table Name: A, id_col: id

    """
    converted = df.map_partitions(
        construction_class, table_name=table_name, id_col=id_col, meta=meta
    )
    if meta is no_default:
        meta = converted._meta
    return cls(
        dsk=converted.dask,
        name=converted._name,
        meta=meta,
        divisions=converted.divisions,
        table_name=table_name,
        id_col=id_col,
    )

KlinkerDataset dataclass

Helper class to hold info of benchmark datasets.

Source code in klinker/data/ea_dataset.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
@dataclass
class KlinkerDataset:
    """Helper class to hold info of benchmark datasets.

    Attributes
    ----------
      left: Attribute information of the left dataset.
      right: Attribute information of the right dataset.
      gold: Gold standard entity links; the first column holds left entity
        ids, the second column right entity ids.
      left_rel: Relation triples (``head``/``tail`` columns) of the left
        dataset, if available.
      right_rel: Relation triples (``head``/``tail`` columns) of the right
        dataset, if available.
    """

    left: KlinkerFrame
    right: KlinkerFrame
    gold: pd.DataFrame
    left_rel: Optional[pd.DataFrame] = None
    right_rel: Optional[pd.DataFrame] = None

    @classmethod
    def from_sylloge(
        cls,
        dataset: MultiSourceEADataset,
        clean: bool = False,
        partition_size: Optional[str] = None,
    ) -> "KlinkerDataset":
        """Create a klinker dataset from sylloge dataset.

        Args:
        ----
          dataset: MultiSourceEADataset: Sylloge dataset.
          clean: bool: Clean attribute information by stripping datatype
            suffixes (everything from the first ``^^`` on) from the ``tail``
            column.
          partition_size: Optional[str]: If set and the dataset uses the dask
            backend, attribute and relation frames are repartitioned to this
            partition size. Ignored for the pandas backend.

        Returns:
        -------
            klinker dataset

        Raises:
        ------
          ValueError: If the dataset backend is neither "pandas" nor "dask".

        Examples:
        --------
            >>> # doctest: +SKIP
            >>> from sylloge import MovieGraphBenchmark
            >>> ds = KlinkerDataset.from_sylloge(MovieGraphBenchmark())

        """
        left: Union[KlinkerDaskFrame, KlinkerPandasFrame]
        right: Union[KlinkerDaskFrame, KlinkerPandasFrame]
        ds_names = dataset.dataset_names

        attr_left = dataset.attr_triples[0]
        attr_right = dataset.attr_triples[1]
        left_rel = dataset.rel_triples[0]
        right_rel = dataset.rel_triples[1]
        if dataset.backend == "pandas":
            left = KlinkerTriplePandasFrame.from_df(
                attr_left, table_name=ds_names[0], id_col="head"
            )
            right = KlinkerTriplePandasFrame.from_df(
                attr_right, table_name=ds_names[1], id_col="head"
            )
        elif dataset.backend == "dask":
            if partition_size:
                attr_left, attr_right, left_rel, right_rel = [
                    frame.repartition(partition_size=partition_size)
                    for frame in [attr_left, attr_right, left_rel, right_rel]
                ]
            left = KlinkerTripleDaskFrame.from_dask_dataframe(
                attr_left, table_name=ds_names[0], id_col="head"
            )
            right = KlinkerTripleDaskFrame.from_dask_dataframe(
                attr_right, table_name=ds_names[1], id_col="head"
            )
        else:
            raise ValueError(f"Unknown dataset backend {dataset.backend}")

        if clean:
            # remove datatype
            left["tail"] = left["tail"].map(lambda x: str(x).split("^^")[0])
            right["tail"] = right["tail"].map(lambda x: str(x).split("^^")[0])

        if isinstance(dataset.ent_links, PrefixedClusterHelper):
            ent_links = pd.DataFrame(
                dataset.ent_links.all_pairs_no_intra(), columns=dataset.dataset_names
            )
        else:
            ent_links = dataset.ent_links.rename(
                columns={
                    "left": dataset.dataset_names[0],
                    "right": dataset.dataset_names[1],
                }
            )
        return cls(
            left=left,
            right=right,
            left_rel=left_rel,
            right_rel=right_rel,
            gold=ent_links,
        )

    def _sample_side(
        self, sample: pd.DataFrame, side: Side
    ) -> Tuple[KlinkerFrame, Optional[pd.DataFrame]]:
        """Restrict one side's attribute/relation data to the sampled entities.

        Attribute rows are kept when their ``id_col`` is a sampled entity;
        relation rows are kept when their head or tail is a sampled entity.
        """
        if side == "left":
            rel_df = self.left_rel
            attr_df = self.left
            sample_col = sample.columns[0]
        else:
            rel_df = self.right_rel
            attr_df = self.right
            sample_col = sample.columns[1]
        sampled_attr_df = attr_df[attr_df[attr_df.id_col].isin(sample[sample_col])]
        if rel_df is None:
            return sampled_attr_df, None
        return (
            sampled_attr_df,
            rel_df[
                rel_df["head"].isin(sample[sample_col])
                | rel_df["tail"].isin(sample[sample_col])
            ],
        )

    def sample(
        self, frac: float, random_state: Optional[int] = None
    ) -> "KlinkerDataset":
        """Get a random sample of the dataset.

        Note:
        ----
            A random fraction of the gold standard links is drawn; attribute
            data is then restricted to the sampled entities and relation
            triples to those whose head or tail is a sampled entity.

        Args:
        ----
          frac: Fraction of the gold standard links to keep.
          random_state: Seed (or random state) forwarded to
            ``pandas.DataFrame.sample`` to make the sample reproducible.
            ``None`` keeps the previous, unseeded behaviour.

        Returns:
        -------
            sampled klinker dataset

        Examples:
        --------
            >>> # doctest: +SKIP
            >>> from sylloge import MovieGraphBenchmark
            >>> ds = KlinkerDataset.from_sylloge(MovieGraphBenchmark())
            >>> sampled = ds.sample(0.2, random_state=42)

        """
        sample_ent_links = self.gold.sample(frac=frac, random_state=random_state)
        sample_left, sample_left_rel = self._sample_side(sample_ent_links, "left")
        sample_right, sample_right_rel = self._sample_side(sample_ent_links, "right")
        return KlinkerDataset(
            left=sample_left,
            right=sample_right,
            left_rel=sample_left_rel,
            right_rel=sample_right_rel,
            gold=sample_ent_links,
        )

from_sylloge(dataset, clean=False, partition_size=None) classmethod

Create a klinker dataset from sylloge dataset.


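dataset: MultiSourceEADataset: Sylloge dataset.
clean: bool: Clean attribute information (strip datatype suffixes from the tail column).
partition_size: Optional[str]: Partition size used to repartition frames when the dataset uses the dask backend.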


klinker dataset

Examples:


>>> # doctest: +SKIP
>>> from sylloge import MovieGraphBenchmark
>>> ds = KlinkerDataset.from_sylloge(MovieGraphBenchmark())
Source code in klinker/data/ea_dataset.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
@classmethod
def from_sylloge(
    cls,
    dataset: MultiSourceEADataset,
    clean: bool = False,
    partition_size: Optional[str] = None,
) -> "KlinkerDataset":
    """Create a klinker dataset from sylloge dataset.

    Args:
    ----
      dataset: MultiSourceEADataset: Sylloge dataset.
      clean: bool: Clean attribute information by stripping datatype
        suffixes (everything from the first ``^^`` on) from the ``tail``
        column.
      partition_size: Optional[str]: If set and the dataset uses the dask
        backend, attribute and relation frames are repartitioned to this
        partition size. Ignored for the pandas backend.

    Returns:
    -------
        klinker dataset

    Raises:
    ------
      ValueError: If the dataset backend is neither "pandas" nor "dask".

    Examples:
    --------
        >>> # doctest: +SKIP
        >>> from sylloge import MovieGraphBenchmark
        >>> ds = KlinkerDataset.from_sylloge(MovieGraphBenchmark())

    """
    left: Union[KlinkerDaskFrame, KlinkerPandasFrame]
    right: Union[KlinkerDaskFrame, KlinkerPandasFrame]
    ds_names = dataset.dataset_names

    attr_left = dataset.attr_triples[0]
    attr_right = dataset.attr_triples[1]
    left_rel = dataset.rel_triples[0]
    right_rel = dataset.rel_triples[1]
    if dataset.backend == "pandas":
        left = KlinkerTriplePandasFrame.from_df(
            attr_left, table_name=ds_names[0], id_col="head"
        )
        right = KlinkerTriplePandasFrame.from_df(
            attr_right, table_name=ds_names[1], id_col="head"
        )
    elif dataset.backend == "dask":
        if partition_size:
            attr_left, attr_right, left_rel, right_rel = [
                frame.repartition(partition_size=partition_size)
                for frame in [attr_left, attr_right, left_rel, right_rel]
            ]
        left = KlinkerTripleDaskFrame.from_dask_dataframe(
            attr_left, table_name=ds_names[0], id_col="head"
        )
        right = KlinkerTripleDaskFrame.from_dask_dataframe(
            attr_right, table_name=ds_names[1], id_col="head"
        )
    else:
        raise ValueError(f"Unknown dataset backend {dataset.backend}")

    if clean:
        # remove datatype
        left["tail"] = left["tail"].map(lambda x: str(x).split("^^")[0])
        right["tail"] = right["tail"].map(lambda x: str(x).split("^^")[0])

    if isinstance(dataset.ent_links, PrefixedClusterHelper):
        ent_links = pd.DataFrame(
            dataset.ent_links.all_pairs_no_intra(), columns=dataset.dataset_names
        )
    else:
        ent_links = dataset.ent_links.rename(
            columns={
                "left": dataset.dataset_names[0],
                "right": dataset.dataset_names[1],
            }
        )
    return cls(
        left=left,
        right=right,
        left_rel=left_rel,
        right_rel=right_rel,
        gold=ent_links,
    )

sample(frac)

Get a sample of the dataset.

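Note:
A random fraction of the gold standard links is drawn; attribute data is then restricted to the sampled entities and relation triples to those whose head or tail is a sampled entity.

frac: Fraction of the gold standard links to keep.

Returns: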


sampled klinker dataset

Examples:


>>> # doctest: +SKIP
>>> from sylloge import MovieGraphBenchmark
>>> ds = KlinkerDataset.from_sylloge(MovieGraphBenchmark())
>>> sampled = ds.sample(0.2)
Source code in klinker/data/ea_dataset.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def sample(
    self, frac: float, random_state: Optional[int] = None
) -> "KlinkerDataset":
    """Get a random sample of the dataset.

    Note:
    ----
        A random fraction of the gold standard links is drawn; attribute
        data is then restricted to the sampled entities and relation
        triples to those whose head or tail is a sampled entity.

    Args:
    ----
      frac: Fraction of the gold standard links to keep.
      random_state: Seed (or random state) forwarded to
        ``pandas.DataFrame.sample`` to make the sample reproducible.
        ``None`` keeps the previous, unseeded behaviour.

    Returns:
    -------
        sampled klinker dataset

    Examples:
    --------
        >>> # doctest: +SKIP
        >>> from sylloge import MovieGraphBenchmark
        >>> ds = KlinkerDataset.from_sylloge(MovieGraphBenchmark())
        >>> sampled = ds.sample(0.2, random_state=42)

    """
    sample_ent_links = self.gold.sample(frac=frac, random_state=random_state)
    sample_left, sample_left_rel = self._sample_side(sample_ent_links, "left")
    sample_right, sample_right_rel = self._sample_side(sample_ent_links, "right")
    return KlinkerDataset(
        left=sample_left,
        right=sample_right,
        left_rel=sample_left_rel,
        right_rel=sample_right_rel,
        gold=sample_ent_links,
    )

NNBasedKlinkerBlockManager

Bases: KlinkerBlockManager

Source code in klinker/data/blocks.py
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
class NNBasedKlinkerBlockManager(KlinkerBlockManager):
    """Block manager for nearest-neighbour based blocks.

    Each row is one block: the index holds one entity and every column holds
    a single entity id paired with it, or a missing value (``None``/``NaN``)
    when there is no entry.
    """

    def to_dict(self) -> Dict[Union[str, int], Tuple[Union[str, int], Union[str, int]]]:
        """Not supported for nearest-neighbour based blocks."""
        raise NotImplementedError

    def find_blocks(self, entity_id: Union[str, int], column_id: int) -> np.ndarray:
        """Not supported for nearest-neighbour based blocks."""
        raise NotImplementedError

    @classmethod
    def from_dict(
        cls,
        block_dict: Dict[
            BlockIdTypeVar, Tuple[List[EntityIdTypeVar], List[EntityIdTypeVar]]
        ],
        dataset_names: Tuple[str, str] = ("left", "right"),
        npartitions: int = 1,
        **kwargs,
    ) -> "KlinkerBlockManager":
        """Not supported for nearest-neighbour based blocks."""
        raise NotImplementedError

    def to_parquet(self, path: Union[str, pathlib.Path], **kwargs):
        """Write blocks as parquet file(s); ``kwargs`` go to the parquet writer."""
        self.blocks.to_parquet(path, **kwargs)

    def entity_pairs(
        self, entity_id: Union[str, int], column_id: int
    ) -> Generator[Tuple[Union[int, str], ...], None, None]:
        """Not supported for nearest-neighbour based blocks."""
        raise NotImplementedError

    def all_pairs(self) -> Generator[Tuple[Union[int, str], ...], None, None]:
        """Get all pairs.

        Each row yields ``(index entity, cell value)`` for every column.
        Missing cells are skipped; they can be ``None`` or ``NaN`` (the outer
        join in :meth:`combine` can introduce ``NaN``).

        Returns
        -------
            Generator that creates all pairs, from blocks (including duplicates).
        """
        for row in self.blocks.itertuples(name=None):
            # first entry in itertuples here is index
            for pair in itertools.product([row[0]], row[1:]):
                neighbour = pair[1]
                # Only test scalars with pd.isna so a list-like cell can never
                # produce an ambiguous array truth value.
                if pd.api.types.is_scalar(neighbour) and pd.isna(neighbour):
                    continue
                yield pair

    @property
    def block_sizes(self) -> pd.Series:
        """Sizes of blocks: the index entity plus every non-missing cell.

        Cells are counted with ``notna`` rather than ``np.count_nonzero`` so
        that an entity id of ``0`` is counted and ``NaN`` cells are not.
        """
        filled_cells = self.blocks.apply(
            lambda row: row.notna().sum(), axis=1, meta=pd.Series([], dtype="int64")
        ).compute()
        return filled_cells + 1

    @classmethod
    def combine(
        cls, this: "KlinkerBlockManager", other: "KlinkerBlockManager"
    ) -> "NNBasedKlinkerBlockManager":
        """Outer-join the columns of two block managers into one."""
        len_this = len(this.blocks.columns)
        len_other = len(other.blocks.columns)
        # we need string columns for saving to parquet
        new_cols = list(map(str, range(len_this + len_other)))
        cc = this.blocks.join(other.blocks, lsuffix="l", rsuffix="r", how="outer")
        cc.columns = new_cols
        return cls(cc)

    def _create_stat_df(self, name: str, offset: int) -> dd.DataFrame:
        """Return a frame with the blocks' index and a constant column ``name``.

        The constant is ``offset`` plus the number of columns.
        """
        nn_num = len(self.blocks.columns)
        # to keep index
        empty_df = self.blocks[[]]
        empty_df[name] = offset + nn_num
        return empty_df

    def block_assignments(self) -> dd.DataFrame:
        """Per block: number of columns plus one (the index entity)."""
        return self._create_stat_df("block_assignments", 1)

    def block_comparisons(self) -> dd.DataFrame:
        """Per block: number of columns."""
        return self._create_stat_df("block_comparisons", 0)

    def individual_blocking_cardinality_per_ds(
        self, dataset_lens: List[int]
    ) -> Tuple[float, float]:
        """Return ``(1.0, n_columns / dataset_lens[1])``."""
        left = 1.0
        right = len(self.blocks.columns) / dataset_lens[1]
        return left, right

    def overall_blocking_cardinality(self, dataset_lens: List[int]) -> float:
        """Return ``(dataset_lens[0] + n_columns) / sum(dataset_lens)``."""
        numerator = dataset_lens[0] + len(self.blocks.columns)
        return numerator / sum(dataset_lens)

    def comparisons_cardinality(self) -> float:
        """Return ``(1 + n_columns) / n_columns``."""
        nn_num = len(self.blocks.columns)
        sum_of_block_sizes = 1 + nn_num
        aggregate_cardinality = 1 * nn_num
        return sum_of_block_sizes / aggregate_cardinality

block_sizes: pd.DataFrame property

Sizes of blocks.

all_pairs()

Get all pairs.

Returns
Generator that creates all pairs, from blocks (including duplicates).
Source code in klinker/data/blocks.py
493
494
495
496
497
498
499
500
501
502
503
504
505
def all_pairs(self) -> Generator[Tuple[Union[int, str], ...], None, None]:
    """Get all pairs.

    Each row yields ``(index entity, cell value)`` for every column. Missing
    cells are skipped; they can be ``None`` or ``NaN`` (for instance after an
    outer join).

    Returns
    -------
        Generator that creates all pairs, from blocks (including duplicates).
    """
    for row in self.blocks.itertuples(name=None):
        # first entry in itertuples here is index
        for pair in itertools.product([row[0]], row[1:]):
            neighbour = pair[1]
            # Only test scalars with pd.isna so a list-like cell can never
            # produce an ambiguous array truth value.
            if pd.api.types.is_scalar(neighbour) and pd.isna(neighbour):
                continue
            yield pair