Skip to content

Dict

Bases: ProcessDict[K, V], IterDict[K, V], NestedDict[K, V], JoinsDict[K, V], FilterDict[K, V], GroupsDict[K, V]

Wrapper for Python dictionaries with chainable methods.

Source code in src/pyochain/_dict/_main.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
class Dict[K, V](
    ProcessDict[K, V],
    IterDict[K, V],
    NestedDict[K, V],
    JoinsDict[K, V],
    FilterDict[K, V],
    GroupsDict[K, V],
):
    """
    Wrapper for Python dictionaries with chainable methods.
    """

    __slots__ = ()

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({dict_repr(self.unwrap())})"

    @staticmethod
    def from_[G, I](data: Mapping[G, I] | SupportsKeysAndGetItem[G, I]) -> Dict[G, I]:
        """
        Create a Dict from a mapping or SupportsKeysAndGetItem.

        Args:
            data: A mapping or object supporting keys and item access to convert into a Dict.

        ```python
        >>> import pyochain as pc
        >>> class MyMapping:
        ...     def __init__(self):
        ...         self._data = {1: "a", 2: "b", 3: "c"}
        ...
        ...     def keys(self):
        ...         return self._data.keys()
        ...
        ...     def __getitem__(self, key):
        ...         return self._data[key]
        >>>
        >>> pc.Dict.from_(MyMapping()).unwrap()
        {1: 'a', 2: 'b', 3: 'c'}

        ```
        """
        return Dict(dict(data))

    @staticmethod
    def from_object(obj: object) -> Dict[str, Any]:
        """
        Create a Dict from an object's __dict__ attribute.

        Args:
            obj: The object whose `__dict__` attribute will be used to create the Dict.

        ```python
        >>> import pyochain as pc
        >>> class Person:
        ...     def __init__(self, name: str, age: int):
        ...         self.name = name
        ...         self.age = age
        >>> person = Person("Alice", 30)
        >>> pc.Dict.from_object(person).unwrap()
        {'name': 'Alice', 'age': 30}

        ```
        """
        return Dict(obj.__dict__)

    def select(self: Dict[str, Any], *exprs: IntoExpr) -> Dict[str, Any]:
        """
        Select and alias fields from the dict based on expressions and/or strings.

        Navigate nested fields using the `pyochain.key` function.

        - Chain `key.key()` calls to access nested fields.
        - Use `key.apply()` to transform values.
        - Use `key.alias()` to rename fields in the resulting dict.

        Args:
            *exprs: Expressions or strings to select and alias fields from the dictionary.

        ```python
        >>> import pyochain as pc
        >>> data = {
        ...     "name": "Alice",
        ...     "age": 30,
        ...     "scores": {"eng": [85, 90, 95], "math": [80, 88, 92]},
        ... }
        >>> scores_expr = pc.key("scores")  # save an expression for reuse
        >>> pc.Dict(data).select(
        ...     pc.key("name").alias("student_name"),
        ...     "age",  # shorthand for pc.key("age")
        ...     scores_expr.key("math").alias("math_scores"),
        ...     scores_expr.key("eng")
        ...     .apply(lambda v: pc.Seq(v).mean())
        ...     .alias("average_eng_score"),
        ... ).unwrap()
        {'student_name': 'Alice', 'age': 30, 'math_scores': [80, 88, 92], 'average_eng_score': 90}

        ```
        """

        def _select(data: dict[str, Any]) -> dict[str, Any]:
            return compute_exprs(exprs, data, {})

        return self.apply(_select)

    def with_fields(self: Dict[str, Any], *exprs: IntoExpr) -> Dict[str, Any]:
        """
        Merge aliased expressions into the root dict (overwrite on collision).

        Args:
            *exprs: Expressions to merge into the root dictionary.

        ```python
        >>> import pyochain as pc
        >>> data = {
        ...     "name": "Alice",
        ...     "age": 30,
        ...     "scores": {"eng": [85, 90, 95], "math": [80, 88, 92]},
        ... }
        >>> scores_expr = pc.key("scores")  # save an expression for reuse
        >>> pc.Dict(data).with_fields(
        ...     scores_expr.key("eng")
        ...     .apply(lambda v: pc.Seq(v).mean())
        ...     .alias("average_eng_score"),
        ... ).unwrap()
        {'name': 'Alice', 'age': 30, 'scores': {'eng': [85, 90, 95], 'math': [80, 88, 92]}, 'average_eng_score': 90}

        ```
        """

        def _with_fields(data: dict[str, Any]) -> dict[str, Any]:
            return compute_exprs(exprs, data, data.copy())

        return self.apply(_with_fields)

    def map_keys[T](self, func: Callable[[K], T]) -> Dict[T, V]:
        """
        Return a Dict with keys transformed by func.

        Args:
            func: Function to apply to each key in the dictionary.

        ```python
        >>> import pyochain as pc
        >>> pc.Dict({"Alice": [20, 15, 30], "Bob": [10, 35]}).map_keys(
        ...     str.lower
        ... ).unwrap()
        {'alice': [20, 15, 30], 'bob': [10, 35]}
        >>>
        >>> pc.Dict({1: "a"}).map_keys(str).unwrap()
        {'1': 'a'}

        ```
        """
        return self.apply(partial(cz.dicttoolz.keymap, func))

    def map_values[T](self, func: Callable[[V], T]) -> Dict[K, T]:
        """
        Return a Dict with values transformed by func.

        Args:
            func: Function to apply to each value in the dictionary.

        ```python
        >>> import pyochain as pc
        >>> pc.Dict({"Alice": [20, 15, 30], "Bob": [10, 35]}).map_values(sum).unwrap()
        {'Alice': 65, 'Bob': 45}
        >>>
        >>> pc.Dict({1: 1}).map_values(lambda v: v + 1).unwrap()
        {1: 2}

        ```
        """
        return self.apply(partial(cz.dicttoolz.valmap, func))

    def map_items[KR, VR](
        self,
        func: Callable[[tuple[K, V]], tuple[KR, VR]],
    ) -> Dict[KR, VR]:
        """
        Transform (key, value) pairs using a function that takes a (key, value) tuple.

        Args:
            func: Function to transform each (key, value) pair into a new (key, value) tuple.

        ```python
        >>> import pyochain as pc
        >>> pc.Dict({"Alice": 10, "Bob": 20}).map_items(
        ...     lambda kv: (kv[0].upper(), kv[1] * 2)
        ... ).unwrap()
        {'ALICE': 20, 'BOB': 40}

        ```
        """
        return self.apply(partial(cz.dicttoolz.itemmap, func))

    def map_kv[KR, VR](
        self,
        func: Callable[[K, V], tuple[KR, VR]],
    ) -> Dict[KR, VR]:
        """
        Transform (key, value) pairs using a function that takes key and value as separate arguments.

        Args:
            func: Function to transform each key and value into a new (key, value) tuple.

        ```python
        >>> import pyochain as pc
        >>> pc.Dict({1: 2}).map_kv(lambda k, v: (k + 1, v * 10)).unwrap()
        {2: 20}

        ```
        """

        def _map_kv(data: dict[K, V]) -> dict[KR, VR]:
            def _(kv: tuple[K, V]) -> tuple[KR, VR]:
                return func(kv[0], kv[1])

            return cz.dicttoolz.itemmap(_, data)

        return self.apply(_map_kv)

    def invert(self) -> Dict[V, list[K]]:
        """
        Invert the dictionary, grouping keys by common (and hashable) values.
        ```python
        >>> import pyochain as pc
        >>> d = {"a": 1, "b": 2, "c": 1}
        >>> pc.Dict(d).invert().unwrap()
        {1: ['a', 'c'], 2: ['b']}

        ```
        """

        def _invert(data: dict[K, V]) -> dict[V, list[K]]:
            inverted: dict[V, list[K]] = defaultdict(list)
            for k, v in data.items():
                inverted[v].append(k)
            return dict(inverted)

        return self.apply(_invert)

    def implode(self) -> Dict[K, list[V]]:
        """
        Nest all the values in lists.
        syntactic sugar for map_values(lambda v: [v])
        ```python
        >>> import pyochain as pc
        >>> pc.Dict({1: 2, 3: 4}).implode().unwrap()
        {1: [2], 3: [4]}

        ```
        """

        def _implode(data: dict[K, V]) -> dict[K, list[V]]:
            def _(v: V) -> list[V]:
                return [v]

            return cz.dicttoolz.valmap(_, data)

        return self.apply(_implode)

    def equals_to(self, other: Self | Mapping[Any, Any]) -> bool:
        """
        Check if two records are equal based on their data.

        Args:
            other: Another Dict or mapping to compare against.

        Example:
        ```python
        >>> import pyochain as pc
        >>> d1 = pc.Dict({"a": 1, "b": 2})
        >>> d2 = pc.Dict({"a": 1, "b": 2})
        >>> d3 = pc.Dict({"a": 1, "b": 3})
        >>> d1.equals_to(d2)
        True
        >>> d1.equals_to(d3)
        False

        ```
        """
        return (
            self.unwrap() == other.unwrap()
            if isinstance(other, Dict)
            else self.unwrap() == other
        )

__slots__ class-attribute instance-attribute

__slots__ = ()

_data instance-attribute

_data: dict[K, V]

__init__

__init__(data: T) -> None
Source code in src/pyochain/_core/_main.py
34
35
def __init__(self, data: T) -> None:
    self._data = data

__repr__

__repr__() -> str
Source code in src/pyochain/_dict/_main.py
35
36
def __repr__(self) -> str:
    return f"{self.__class__.__name__}({dict_repr(self.unwrap())})"

apply

apply(
    func: Callable[Concatenate[dict[K, V], P], dict[KU, VU]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> Dict[KU, VU]

Apply a function to the underlying dict and return a Dict of the result. Allow to pass user defined functions that transform the dict while retaining the Dict wrapper.

Parameters:

Name Type Description Default
func Callable[Concatenate[dict[K, V], P], dict[KU, VU]]

Function to apply to the underlying dict.

required
*args P.args

Positional arguments to pass to the function.

()
**kwargs P.kwargs

Keyword arguments to pass to the function.

{}

Example:

>>> import pyochain as pc
>>> def invert_dict(d: dict[K, V]) -> dict[V, K]:
...     return {v: k for k, v in d.items()}
>>> pc.Dict({'a': 1, 'b': 2}).apply(invert_dict).unwrap()
{1: 'a', 2: 'b'}

Source code in src/pyochain/_core/_main.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def apply[**P, KU, VU](
    self,
    func: Callable[Concatenate[dict[K, V], P], dict[KU, VU]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> Dict[KU, VU]:
    """
    Apply a function to the underlying dict and return a Dict of the result.
    Allow to pass user defined functions that transform the dict while retaining the Dict wrapper.

    Args:
        func: Function to apply to the underlying dict.
        *args: Positional arguments to pass to the function.
        **kwargs: Keyword arguments to pass to the function.
    Example:
    ```python
    >>> import pyochain as pc
    >>> def invert_dict(d: dict[K, V]) -> dict[V, K]:
    ...     return {v: k for k, v in d.items()}
    >>> pc.Dict({'a': 1, 'b': 2}).apply(invert_dict).unwrap()
    {1: 'a', 2: 'b'}

    ```
    """
    from .._dict import Dict

    return Dict(self.into(func, *args, **kwargs))

diff

diff(other: Mapping[K, V]) -> Dict[K, tuple[V | None, V | None]]

Returns a dict of the differences between this dict and another.

Parameters:

Name Type Description Default
other Mapping[K, V]

The mapping to compare against.

required

The keys of the returned dict are the keys that are not shared or have different values. The values are tuples containing the value from self and the value from other.

>>> import pyochain as pc
>>> d1 = {"a": 1, "b": 2, "c": 3}
>>> d2 = {"b": 2, "c": 4, "d": 5}
>>> pc.Dict(d1).diff(d2).sort().unwrap()
{'a': (1, None), 'c': (3, 4), 'd': (None, 5)}

Source code in src/pyochain/_dict/_joins.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def diff(self, other: Mapping[K, V]) -> Dict[K, tuple[V | None, V | None]]:
    """
    Returns a dict of the differences between this dict and another.

    Args:
        other: The mapping to compare against.

    The keys of the returned dict are the keys that are not shared or have different values.
    The values are tuples containing the value from self and the value from other.
    ```python
    >>> import pyochain as pc
    >>> d1 = {"a": 1, "b": 2, "c": 3}
    >>> d2 = {"b": 2, "c": 4, "d": 5}
    >>> pc.Dict(d1).diff(d2).sort().unwrap()
    {'a': (1, None), 'c': (3, 4), 'd': (None, 5)}

    ```
    """

    def _diff(
        data: Mapping[K, V], other: Mapping[K, V]
    ) -> dict[K, tuple[V | None, V | None]]:
        all_keys: set[K] = data.keys() | other.keys()
        diffs: dict[K, tuple[V | None, V | None]] = {}
        for key in all_keys:
            self_val = data.get(key)
            other_val = other.get(key)
            if self_val != other_val:
                diffs[key] = (self_val, other_val)
        return diffs

    return self.apply(_diff, other)

diff_keys

diff_keys(*others: Mapping[K, V]) -> Dict[K, V]

Return a new Dict keeping only keys present in self but not in others.

Parameters:

Name Type Description Default
*others Mapping[K, V]

Other mappings to exclude keys from.

()
>>> import pyochain as pc
>>> d1 = {"a": 1, "b": 2, "c": 3}
>>> d2 = {"b": 10, "d": 40}
>>> d3 = {"c": 30}
>>> pc.Dict(d1).diff_keys(d2, d3).unwrap()
{'a': 1}
Source code in src/pyochain/_dict/_filters.py
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def diff_keys(self, *others: Mapping[K, V]) -> Dict[K, V]:
    """
    Return a new Dict keeping only keys present in self but not in others.

    Args:
        *others: Other mappings to exclude keys from.

    ```python
    >>> import pyochain as pc
    >>> d1 = {"a": 1, "b": 2, "c": 3}
    >>> d2 = {"b": 10, "d": 40}
    >>> d3 = {"c": 30}
    >>> pc.Dict(d1).diff_keys(d2, d3).unwrap()
    {'a': 1}

    ```
    """

    def _diff_keys(data: dict[K, V]) -> dict[K, V]:
        self_keys = set(data.keys())
        for other in others:
            self_keys.difference_update(other.keys())
        return {k: data[k] for k in self_keys}

    return self.apply(_diff_keys)

drop

drop(*keys: K) -> Dict[K, V]

Return a new Dict with given keys removed.

Parameters:

Name Type Description Default
*keys K

Sequence of keys to remove from the dictionary.

()

New dict has d[key] deleted for each supplied key.

>>> import pyochain as pc
>>> pc.Dict({"x": 1, "y": 2}).drop("y").unwrap()
{'x': 1}
>>> pc.Dict({"x": 1, "y": 2}).drop("y", "x").unwrap()
{}
>>> pc.Dict({"x": 1}).drop("y").unwrap()  # Ignores missing keys
{'x': 1}
>>> pc.Dict({1: 2, 3: 4}).drop(1).unwrap()
{3: 4}

Source code in src/pyochain/_dict/_process.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def drop(self, *keys: K) -> Dict[K, V]:
    """
    Return a new Dict with given keys removed.

    Args:
        *keys: Sequence of keys to remove from the dictionary.

    New dict has d[key] deleted for each supplied key.
    ```python
    >>> import pyochain as pc
    >>> pc.Dict({"x": 1, "y": 2}).drop("y").unwrap()
    {'x': 1}
    >>> pc.Dict({"x": 1, "y": 2}).drop("y", "x").unwrap()
    {}
    >>> pc.Dict({"x": 1}).drop("y").unwrap()  # Ignores missing keys
    {'x': 1}
    >>> pc.Dict({1: 2, 3: 4}).drop(1).unwrap()
    {3: 4}

    ```
    """
    return self.apply(cz.dicttoolz.dissoc, *keys)

equals_to

equals_to(other: Self | Mapping[Any, Any]) -> bool

Check if two records are equal based on their data.

Parameters:

Name Type Description Default
other Self | Mapping[Any, Any]

Another Dict or mapping to compare against.

required

Example:

>>> import pyochain as pc
>>> d1 = pc.Dict({"a": 1, "b": 2})
>>> d2 = pc.Dict({"a": 1, "b": 2})
>>> d3 = pc.Dict({"a": 1, "b": 3})
>>> d1.equals_to(d2)
True
>>> d1.equals_to(d3)
False

Source code in src/pyochain/_dict/_main.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
def equals_to(self, other: Self | Mapping[Any, Any]) -> bool:
    """
    Check if two records are equal based on their data.

    Args:
        other: Another Dict or mapping to compare against.

    Example:
    ```python
    >>> import pyochain as pc
    >>> d1 = pc.Dict({"a": 1, "b": 2})
    >>> d2 = pc.Dict({"a": 1, "b": 2})
    >>> d3 = pc.Dict({"a": 1, "b": 3})
    >>> d1.equals_to(d2)
    True
    >>> d1.equals_to(d3)
    False

    ```
    """
    return (
        self.unwrap() == other.unwrap()
        if isinstance(other, Dict)
        else self.unwrap() == other
    )

filter_attr

filter_attr(attr: str, dtype: type[U] = object) -> Dict[K, U]

Filter values that have a given attribute.

This does not enforce type checking at runtime for performance considerations.

Parameters:

Name Type Description Default
attr str

Attribute name to check for.

required
dtype type[U]

Optional expected type of the attribute for type hinting.

object

Example:

>>> import pyochain as pc
>>> pc.Dict({"a": "hello", "b": "world", "c": 2, "d": 5}).filter_attr(
...     "capitalize", str
... ).unwrap()
{'a': 'hello', 'b': 'world'}

Source code in src/pyochain/_dict/_filters.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def filter_attr[U](self, attr: str, dtype: type[U] = object) -> Dict[K, U]:
    """
    Filter values that have a given attribute.

    This does not enforce type checking at runtime for performance considerations.

    Args:
        attr: Attribute name to check for.
        dtype: Optional expected type of the attribute for type hinting.
    Example:
    ```python
    >>> import pyochain as pc
    >>> pc.Dict({"a": "hello", "b": "world", "c": 2, "d": 5}).filter_attr(
    ...     "capitalize", str
    ... ).unwrap()
    {'a': 'hello', 'b': 'world'}

    ```
    """

    def _filter_attr(data: dict[K, V]) -> dict[K, U]:
        def has_attr(x: V) -> TypeGuard[U]:
            return hasattr(x, attr)

        return cz.dicttoolz.valfilter(has_attr, data)

    return self.apply(_filter_attr)

filter_callable

filter_callable() -> Dict[K, Callable[..., Any]]

Filter values that are callable.

>>> import pyochain as pc
>>> def foo():
...     pass
>>> data = {1: "one", 2: "two", 3: foo, 4: print}
>>> pc.Dict(data).filter_callable().map_values(lambda x: x.__name__).unwrap()
{3: 'foo', 4: 'print'}

Source code in src/pyochain/_dict/_filters.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def filter_callable(self) -> Dict[K, Callable[..., Any]]:
    """
    Filter values that are callable.
    ```python
    >>> import pyochain as pc
    >>> def foo():
    ...     pass
    >>> data = {1: "one", 2: "two", 3: foo, 4: print}
    >>> pc.Dict(data).filter_callable().map_values(lambda x: x.__name__).unwrap()
    {3: 'foo', 4: 'print'}

    ```
    """

    def _filter_callable(data: dict[K, V]) -> dict[K, Callable[..., Any]]:
        def _(x: V) -> TypeGuard[Callable[..., Any]]:
            return callable(x)

        return cz.dicttoolz.valfilter(_, data)

    return self.apply(_filter_callable)

filter_items

filter_items(predicate: Callable[[tuple[K, V]], bool]) -> Dict[K, V]

Filter items by predicate applied to (key, value) tuples.

Parameters:

Name Type Description Default
predicate Callable[[tuple[K, V]], bool]

Function to determine if a (key, value) pair should be included.

required

Example:

>>> import pyochain as pc
>>> def isvalid(item):
...     k, v = item
...     return k % 2 == 0 and v < 4
>>> d = pc.Dict({1: 2, 2: 3, 3: 4, 4: 5})
>>>
>>> d.filter_items(isvalid).unwrap()
{2: 3}
>>> d.filter_items(lambda kv: not isvalid(kv)).unwrap()
{1: 2, 3: 4, 4: 5}

Source code in src/pyochain/_dict/_filters.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def filter_items(
    self,
    predicate: Callable[[tuple[K, V]], bool],
) -> Dict[K, V]:
    """
    Filter items by predicate applied to (key, value) tuples.

    Args:
        predicate: Function to determine if a (key, value) pair should be included.
    Example:
    ```python
    >>> import pyochain as pc
    >>> def isvalid(item):
    ...     k, v = item
    ...     return k % 2 == 0 and v < 4
    >>> d = pc.Dict({1: 2, 2: 3, 3: 4, 4: 5})
    >>>
    >>> d.filter_items(isvalid).unwrap()
    {2: 3}
    >>> d.filter_items(lambda kv: not isvalid(kv)).unwrap()
    {1: 2, 3: 4, 4: 5}

    ```
    """
    return self.apply(partial(cz.dicttoolz.itemfilter, predicate))

filter_keys

filter_keys(predicate: Callable[[K], bool]) -> Dict[K, V]

Return a new Dict containing keys that satisfy predicate.

Parameters:

Name Type Description Default
predicate Callable[[K], bool]

Function to determine if a key should be included.

required

Example:

>>> import pyochain as pc
>>> d = {1: 2, 2: 3, 3: 4, 4: 5}
>>> pc.Dict(d).filter_keys(lambda x: x % 2 == 0).unwrap()
{2: 3, 4: 5}

Source code in src/pyochain/_dict/_filters.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def filter_keys(self, predicate: Callable[[K], bool]) -> Dict[K, V]:
    """
    Return a new Dict containing keys that satisfy predicate.

    Args:
        predicate: Function to determine if a key should be included.
    Example:
    ```python
    >>> import pyochain as pc
    >>> d = {1: 2, 2: 3, 3: 4, 4: 5}
    >>> pc.Dict(d).filter_keys(lambda x: x % 2 == 0).unwrap()
    {2: 3, 4: 5}

    ```
    """
    return self.apply(partial(cz.dicttoolz.keyfilter, predicate))

filter_kv

filter_kv(predicate: Callable[[K, V], bool]) -> Dict[K, V]

Filter items by predicate applied to unpacked (key, value) tuples.

Parameters:

Name Type Description Default
predicate Callable[[K, V], bool]

Function to determine if a key-value pair should be included.

required

Example:

>>> import pyochain as pc
>>> def isvalid(key, value):
...     return key % 2 == 0 and value < 4
>>> d = pc.Dict({1: 2, 2: 3, 3: 4, 4: 5})
>>>
>>> d.filter_kv(isvalid).unwrap()
{2: 3}
>>> d.filter_kv(lambda k, v: not isvalid(k, v)).unwrap()
{1: 2, 3: 4, 4: 5}

Source code in src/pyochain/_dict/_filters.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def filter_kv(
    self,
    predicate: Callable[[K, V], bool],
) -> Dict[K, V]:
    """
    Filter items by predicate applied to unpacked (key, value) tuples.

    Args:
        predicate: Function to determine if a key-value pair should be included.
    Example:
    ```python
    >>> import pyochain as pc
    >>> def isvalid(key, value):
    ...     return key % 2 == 0 and value < 4
    >>> d = pc.Dict({1: 2, 2: 3, 3: 4, 4: 5})
    >>>
    >>> d.filter_kv(isvalid).unwrap()
    {2: 3}
    >>> d.filter_kv(lambda k, v: not isvalid(k, v)).unwrap()
    {1: 2, 3: 4, 4: 5}

    ```
    """

    def _filter_kv(data: dict[K, V]) -> dict[K, V]:
        def _(kv: tuple[K, V]) -> bool:
            return predicate(kv[0], kv[1])

        return cz.dicttoolz.itemfilter(_, data)

    return self.apply(_filter_kv)

filter_subclass

filter_subclass(parent: type[R], keep_parent: bool = True) -> Dict[K, type[R]]

Filter values that are subclasses of a given parent class.

Parameters:

Name Type Description Default
parent type[R]

Parent class to check against.

required
keep_parent bool

Whether to include the parent class itself. Defaults to True.

True
>>> import pyochain as pc
>>> class A:
...     pass
>>> class B(A):
...     pass
>>> class C:
...     pass
>>> def name(cls: type[Any]) -> str:
...     return cls.__name__
>>> data = pc.Dict({"first": A, "second": B, "third": C})
>>> data.filter_subclass(A).map_values(name).unwrap()
{'first': 'A', 'second': 'B'}
>>> data.filter_subclass(A, keep_parent=False).map_values(name).unwrap()
{'second': 'B'}
Source code in src/pyochain/_dict/_filters.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def filter_subclass[U: type[Any], R](
    self: FilterDict[K, U], parent: type[R], keep_parent: bool = True
) -> Dict[K, type[R]]:
    """
    Filter values that are subclasses of a given parent class.

    Args:
        parent: Parent class to check against.
        keep_parent: Whether to include the parent class itself. Defaults to True.

    ```python
    >>> import pyochain as pc
    >>> class A:
    ...     pass
    >>> class B(A):
    ...     pass
    >>> class C:
    ...     pass
    >>> def name(cls: type[Any]) -> str:
    ...     return cls.__name__
    >>> data = pc.Dict({"first": A, "second": B, "third": C})
    >>> data.filter_subclass(A).map_values(name).unwrap()
    {'first': 'A', 'second': 'B'}
    >>> data.filter_subclass(A, keep_parent=False).map_values(name).unwrap()
    {'second': 'B'}

    ```
    """

    def _filter_subclass(data: dict[K, U]) -> dict[K, type[R]]:
        def _(x: type[Any]) -> TypeGuard[type[R]]:
            if keep_parent:
                return issubclass(x, parent)
            else:
                return issubclass(x, parent) and x is not parent

        return cz.dicttoolz.valfilter(_, data)

    return self.apply(_filter_subclass)

filter_type

filter_type(typ: type[R]) -> Dict[K, R]

Filter values by type.

Parameters:

Name Type Description Default
typ type[R]

Type to filter values by.

required

Example:

>>> import pyochain as pc
>>> data = {"a": "one", "b": "two", "c": 3, "d": 4}
>>> pc.Dict(data).filter_type(str).unwrap()
{'a': 'one', 'b': 'two'}

Source code in src/pyochain/_dict/_filters.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def filter_type[R](self, typ: type[R]) -> Dict[K, R]:
    """
    Filter values by type.

    Args:
        typ: Type to filter values by.
    Example:
    ```python
    >>> import pyochain as pc
    >>> data = {"a": "one", "b": "two", "c": 3, "d": 4}
    >>> pc.Dict(data).filter_type(str).unwrap()
    {'a': 'one', 'b': 'two'}

    ```
    """

    def _filter_type(data: dict[K, V]) -> dict[K, R]:
        def _(x: V) -> TypeGuard[R]:
            return isinstance(x, typ)

        return cz.dicttoolz.valfilter(_, data)

    return self.apply(_filter_type)

filter_values

filter_values(predicate: Callable[[V], bool]) -> Dict[K, V]

Return a new Dict containing items whose values satisfy predicate.

Parameters:

Name Type Description Default
predicate Callable[[V], bool]

Function to determine if a value should be included.

required

Example:

>>> import pyochain as pc
>>> d = {1: 2, 2: 3, 3: 4, 4: 5}
>>> pc.Dict(d).filter_values(lambda x: x % 2 == 0).unwrap()
{1: 2, 3: 4}
>>> pc.Dict(d).filter_values(lambda x: not x > 3).unwrap()
{1: 2, 2: 3}

Source code in src/pyochain/_dict/_filters.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def filter_values(self, predicate: Callable[[V], bool]) -> Dict[K, V]:
    """
    Return a new Dict containing items whose values satisfy predicate.

    Args:
        predicate: Function to determine if a value should be included.
    Example:
    ```python
    >>> import pyochain as pc
    >>> d = {1: 2, 2: 3, 3: 4, 4: 5}
    >>> pc.Dict(d).filter_values(lambda x: x % 2 == 0).unwrap()
    {1: 2, 3: 4}
    >>> pc.Dict(d).filter_values(lambda x: not x > 3).unwrap()
    {1: 2, 2: 3}

    ```
    """
    return self.apply(partial(cz.dicttoolz.valfilter, predicate))

flatten

flatten(sep: str = '.', max_depth: int | None = None) -> Dict[str, Any]

Flatten a nested dictionary, concatenating keys with the specified separator.

Parameters:

Name Type Description Default
sep str

Separator to use when concatenating keys

'.'
max_depth int | None

Maximum depth to flatten. If None, flattens completely.

None
>>> import pyochain as pc
>>> data = {
...     "config": {"params": {"retries": 3, "timeout": 30}, "mode": "fast"},
...     "version": 1.0,
... }
>>> pc.Dict(data).flatten().unwrap()
{'config.params.retries': 3, 'config.params.timeout': 30, 'config.mode': 'fast', 'version': 1.0}
>>> pc.Dict(data).flatten(sep="_").unwrap()
{'config_params_retries': 3, 'config_params_timeout': 30, 'config_mode': 'fast', 'version': 1.0}
>>> pc.Dict(data).flatten(max_depth=1).unwrap()
{'config.params': {'retries': 3, 'timeout': 30}, 'config.mode': 'fast', 'version': 1.0}
Source code in src/pyochain/_dict/_nested.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def flatten(
    self: NestedDict[str, Any], sep: str = ".", max_depth: int | None = None
) -> Dict[str, Any]:
    """
    Flatten a nested dictionary, concatenating keys with the specified separator.

    Args:
        sep: Separator to use when concatenating keys
        max_depth: Maximum depth to flatten. If None, flattens completely.
    ```python
    >>> import pyochain as pc
    >>> data = {
    ...     "config": {"params": {"retries": 3, "timeout": 30}, "mode": "fast"},
    ...     "version": 1.0,
    ... }
    >>> pc.Dict(data).flatten().unwrap()
    {'config.params.retries': 3, 'config.params.timeout': 30, 'config.mode': 'fast', 'version': 1.0}
    >>> pc.Dict(data).flatten(sep="_").unwrap()
    {'config_params_retries': 3, 'config_params_timeout': 30, 'config_mode': 'fast', 'version': 1.0}
    >>> pc.Dict(data).flatten(max_depth=1).unwrap()
    {'config.params': {'retries': 3, 'timeout': 30}, 'config.mode': 'fast', 'version': 1.0}

    ```
    """

    def _flatten(
        d: dict[Any, Any], parent_key: str = "", current_depth: int = 1
    ) -> dict[str, Any]:
        items: list[tuple[str, Any]] = []
        for k, v in d.items():
            new_key = parent_key + sep + k if parent_key else k
            if isinstance(v, dict) and (
                max_depth is None or current_depth < max_depth + 1
            ):
                items.extend(
                    _flatten(v, new_key, current_depth + 1).items()  # type: ignore
                )
            else:
                items.append((new_key, v))  # type: ignore
        return dict(items)

    return self.apply(_flatten)

for_each

for_each(
    func: Callable[Concatenate[K, V, P], Any], *args: P.args, **kwargs: P.kwargs
) -> Dict[K, V]

Apply a function to each key-value pair in the dict for side effects.

Parameters:

Name Type Description Default
func Callable[Concatenate[K, V, P], Any]

Function to apply to each key-value pair.

required
*args P.args

Positional arguments to pass to the function.

()
**kwargs P.kwargs

Keyword arguments to pass to the function.

{}

Returns the original Dict unchanged.

>>> import pyochain as pc
>>> pc.Dict({"a": 1, "b": 2}).for_each(
...     lambda k, v: print(f"Key: {k}, Value: {v}")
... ).unwrap()
Key: a, Value: 1
Key: b, Value: 2
{'a': 1, 'b': 2}

Source code in src/pyochain/_dict/_process.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def for_each[**P](
    self,
    func: Callable[Concatenate[K, V, P], Any],
    *args: P.args,
    **kwargs: P.kwargs,
) -> Dict[K, V]:
    """
    Apply a function to each key-value pair in the dict for side effects.

    Args:
        func: Function to apply to each key-value pair.
        *args: Positional arguments to pass to the function.
        **kwargs: Keyword arguments to pass to the function.

    Returns the original Dict unchanged.
    ```python
    >>> import pyochain as pc
    >>> pc.Dict({"a": 1, "b": 2}).for_each(
    ...     lambda k, v: print(f"Key: {k}, Value: {v}")
    ... ).unwrap()
    Key: a, Value: 1
    Key: b, Value: 2
    {'a': 1, 'b': 2}

    ```
    """

    def _for_each(data: dict[K, V]) -> dict[K, V]:
        for k, v in data.items():
            func(k, v, *args, **kwargs)
        return data

    return self.apply(_for_each)

from_ staticmethod

from_(data: Mapping[G, I] | SupportsKeysAndGetItem[G, I]) -> Dict[G, I]

Create a Dict from a mapping or SupportsKeysAndGetItem.

Parameters:

Name Type Description Default
data Mapping[G, I] | SupportsKeysAndGetItem[G, I]

A mapping or object supporting keys and item access to convert into a Dict.

required
>>> import pyochain as pc
>>> class MyMapping:
...     def __init__(self):
...         self._data = {1: "a", 2: "b", 3: "c"}
...
...     def keys(self):
...         return self._data.keys()
...
...     def __getitem__(self, key):
...         return self._data[key]
>>>
>>> pc.Dict.from_(MyMapping()).unwrap()
{1: 'a', 2: 'b', 3: 'c'}
Source code in src/pyochain/_dict/_main.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@staticmethod
def from_[G, I](data: Mapping[G, I] | SupportsKeysAndGetItem[G, I]) -> Dict[G, I]:
    """
    Create a Dict from a mapping or SupportsKeysAndGetItem.

    Args:
        data: A mapping or object supporting keys and item access to convert into a Dict.

    ```python
    >>> import pyochain as pc
    >>> class MyMapping:
    ...     def __init__(self):
    ...         self._data = {1: "a", 2: "b", 3: "c"}
    ...
    ...     def keys(self):
    ...         return self._data.keys()
    ...
    ...     def __getitem__(self, key):
    ...         return self._data[key]
    >>>
    >>> pc.Dict.from_(MyMapping()).unwrap()
    {1: 'a', 2: 'b', 3: 'c'}

    ```
    """
    return Dict(dict(data))

from_object staticmethod

from_object(obj: object) -> Dict[str, Any]

Create a Dict from an object's dict attribute.

Parameters:

Name Type Description Default
obj object

The object whose __dict__ attribute will be used to create the Dict.

required
>>> import pyochain as pc
>>> class Person:
...     def __init__(self, name: str, age: int):
...         self.name = name
...         self.age = age
>>> person = Person("Alice", 30)
>>> pc.Dict.from_object(person).unwrap()
{'name': 'Alice', 'age': 30}
Source code in src/pyochain/_dict/_main.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@staticmethod
def from_object(obj: object) -> Dict[str, Any]:
    """
    Create a Dict from an object's __dict__ attribute.

    Args:
        obj: The object whose `__dict__` attribute will be used to create the Dict.

    ```python
    >>> import pyochain as pc
    >>> class Person:
    ...     def __init__(self, name: str, age: int):
    ...         self.name = name
    ...         self.age = age
    >>> person = Person("Alice", 30)
    >>> pc.Dict.from_object(person).unwrap()
    {'name': 'Alice', 'age': 30}

    ```
    """
    return Dict(obj.__dict__)

get_in

get_in(*keys: K, default: Any = None) -> Any

Retrieve a value from a nested dictionary structure.

Parameters:

Name Type Description Default
*keys K

Sequence of keys representing the nested path to retrieve the value.

()
default Any

Default value to return if the keys do not exist.

None
>>> import pyochain as pc
>>> data = {"a": {"b": {"c": 1}}}
>>> pc.Dict(data).get_in("a", "b", "c")
1
>>> pc.Dict(data).get_in("a", "x", default="Not Found")
'Not Found'
Source code in src/pyochain/_dict/_nested.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
def get_in(self, *keys: K, default: Any = None) -> Any:
    """
    Retrieve a value from a nested dictionary structure.

    Args:
        *keys: Sequence of keys representing the nested path to retrieve the value.
        default: Default value to return if the keys do not exist.

    ```python
    >>> import pyochain as pc
    >>> data = {"a": {"b": {"c": 1}}}
    >>> pc.Dict(data).get_in("a", "b", "c")
    1
    >>> pc.Dict(data).get_in("a", "x", default="Not Found")
    'Not Found'

    ```
    """

    def _get_in(data: Mapping[K, V]) -> Any:
        return cz.dicttoolz.get_in(keys, data, default)

    return self.into(_get_in)

group_by_key

group_by_key(func: Callable[[K], G]) -> Dict[G, dict[K, V]]

Group dict items into sub-dictionaries based on a function of the key.

Parameters:

Name Type Description Default
func Callable[[K], G]

Function to determine the group for each key.

required
>>> import pyochain as pc
>>> d = {"user_1": 10, "user_2": 20, "admin_1": 100}
>>> pc.Dict(d).group_by_key(lambda k: k.split("_")[0]).unwrap()
{'user': {'user_1': 10, 'user_2': 20}, 'admin': {'admin_1': 100}}
Source code in src/pyochain/_dict/_groups.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def group_by_key[G](self, func: Callable[[K], G]) -> Dict[G, dict[K, V]]:
    """
    Group dict items into sub-dictionaries based on a function of the key.

    Args:
        func: Function to determine the group for each key.

    ```python
    >>> import pyochain as pc
    >>> d = {"user_1": 10, "user_2": 20, "admin_1": 100}
    >>> pc.Dict(d).group_by_key(lambda k: k.split("_")[0]).unwrap()
    {'user': {'user_1': 10, 'user_2': 20}, 'admin': {'admin_1': 100}}

    ```
    """

    def _group_by_key(data: dict[K, V]) -> dict[G, dict[K, V]]:
        def _(kv: tuple[K, V]) -> G:
            return func(kv[0])

        return cz.dicttoolz.valmap(dict, cz.itertoolz.groupby(_, data.items()))

    return self.apply(_group_by_key)

group_by_key_agg

group_by_key_agg(
    key_func: Callable[[K], G], agg_func: Callable[[Dict[K, V]], R]
) -> Dict[G, R]

Group by key function, then apply aggregation function to each sub-dict.

Parameters:

Name Type Description Default
key_func Callable[[K], G]

Function to determine the group for each key.

required
agg_func Callable[[Dict[K, V]], R]

Function to aggregate each sub-dictionary.

required

This avoids materializing intermediate Dict objects if you only need an aggregated result for each group.

>>> import pyochain as pc
>>>
>>> data = {"user_1": 10, "user_2": 20, "admin_1": 100}
>>> pc.Dict(data).group_by_key_agg(
...     key_func=lambda k: k.split("_")[0],
...     agg_func=lambda d: d.iter_values().sum(),
... ).unwrap()
{'user': 30, 'admin': 100}
>>>
>>> data_files = {
...     "file_a.txt": 100,
...     "file_b.log": 20,
...     "file_c.txt": 50,
...     "file_d.log": 5,
... }
>>>
>>> def get_stats(sub_dict: pc.Dict[str, int]) -> dict[str, Any]:
...     return {
...         "count": sub_dict.iter_keys().count(),
...         "total_size": sub_dict.iter_values().sum(),
...         "max_size": sub_dict.iter_values().max(),
...         "files": sub_dict.iter_keys().sort().into(list),
...     }
>>>
>>> pc.Dict(data_files).group_by_key_agg(
...     key_func=lambda k: k.split(".")[-1], agg_func=get_stats
... ).sort().unwrap()
{'log': {'count': 2, 'total_size': 25, 'max_size': 20, 'files': ['file_b.log', 'file_d.log']}, 'txt': {'count': 2, 'total_size': 150, 'max_size': 100, 'files': ['file_a.txt', 'file_c.txt']}}

Source code in src/pyochain/_dict/_groups.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def group_by_key_agg[G, R](
    self,
    key_func: Callable[[K], G],
    agg_func: Callable[[Dict[K, V]], R],
) -> Dict[G, R]:
    """
    Group by key function, then apply aggregation function to each sub-dict.

    Args:
        key_func: Function to determine the group for each key.
        agg_func: Function to aggregate each sub-dictionary.

    This avoids materializing intermediate `Dict` objects if you only need
    an aggregated result for each group.
    ```python
    >>> import pyochain as pc
    >>>
    >>> data = {"user_1": 10, "user_2": 20, "admin_1": 100}
    >>> pc.Dict(data).group_by_key_agg(
    ...     key_func=lambda k: k.split("_")[0],
    ...     agg_func=lambda d: d.iter_values().sum(),
    ... ).unwrap()
    {'user': 30, 'admin': 100}
    >>>
    >>> data_files = {
    ...     "file_a.txt": 100,
    ...     "file_b.log": 20,
    ...     "file_c.txt": 50,
    ...     "file_d.log": 5,
    ... }
    >>>
    >>> def get_stats(sub_dict: pc.Dict[str, int]) -> dict[str, Any]:
    ...     return {
    ...         "count": sub_dict.iter_keys().count(),
    ...         "total_size": sub_dict.iter_values().sum(),
    ...         "max_size": sub_dict.iter_values().max(),
    ...         "files": sub_dict.iter_keys().sort().into(list),
    ...     }
    >>>
    >>> pc.Dict(data_files).group_by_key_agg(
    ...     key_func=lambda k: k.split(".")[-1], agg_func=get_stats
    ... ).sort().unwrap()
    {'log': {'count': 2, 'total_size': 25, 'max_size': 20, 'files': ['file_b.log', 'file_d.log']}, 'txt': {'count': 2, 'total_size': 150, 'max_size': 100, 'files': ['file_a.txt', 'file_c.txt']}}

    ```
    """
    from ._main import Dict

    def _group_by_key_agg(data: dict[K, V]) -> dict[G, R]:
        def _key_func(kv: tuple[K, V]) -> G:
            return key_func(kv[0])

        def _agg_func(items: list[tuple[K, V]]) -> R:
            return agg_func(Dict(dict(items)))

        groups = cz.itertoolz.groupby(_key_func, data.items())
        return cz.dicttoolz.valmap(_agg_func, groups)

    return self.apply(_group_by_key_agg)

group_by_value

group_by_value(func: Callable[[V], G]) -> Dict[G, dict[K, V]]

Group dict items into sub-dictionaries based on a function of the value.

Parameters:

Name Type Description Default
func Callable[[V], G]

Function to determine the group for each value.

required
>>> import pyochain as pc
>>> d = {"a": 1, "b": 2, "c": 3, "d": 2}
>>> pc.Dict(d).group_by_value(lambda v: v % 2).unwrap()
{1: {'a': 1, 'c': 3}, 0: {'b': 2, 'd': 2}}
Source code in src/pyochain/_dict/_groups.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def group_by_value[G](self, func: Callable[[V], G]) -> Dict[G, dict[K, V]]:
    """
    Group dict items into sub-dictionaries based on a function of the value.

    Args:
        func: Function to determine the group for each value.

    ```python
    >>> import pyochain as pc
    >>> d = {"a": 1, "b": 2, "c": 3, "d": 2}
    >>> pc.Dict(d).group_by_value(lambda v: v % 2).unwrap()
    {1: {'a': 1, 'c': 3}, 0: {'b': 2, 'd': 2}}

    ```
    """

    def _group_by_value(data: dict[K, V]) -> dict[G, dict[K, V]]:
        def _(kv: tuple[K, V]) -> G:
            return func(kv[1])

        return cz.dicttoolz.valmap(dict, cz.itertoolz.groupby(_, data.items()))

    return self.apply(_group_by_value)

group_by_value_agg

group_by_value_agg(
    value_func: Callable[[V], G], agg_func: Callable[[Dict[K, V]], R]
) -> Dict[G, R]

Group by value function, then apply aggregation function to each sub-dict.

Parameters:

Name Type Description Default
value_func Callable[[V], G]

Function to determine the group for each value.

required
agg_func Callable[[Dict[K, V]], R]

Function to aggregate each sub-dictionary.

required

This avoids materializing intermediate Dict objects if you only need an aggregated result for each group.

>>> import pyochain as pc
>>>
>>> data = {"math": "A", "physics": "B", "english": "A"}
>>> pc.Dict(data).group_by_value_agg(
...     value_func=lambda grade: grade,
...     agg_func=lambda d: d.iter_keys().count(),
... ).unwrap()
{'A': 2, 'B': 1}
>>>
>>> # --- Exemple 2: Agrégation plus complexe ---
>>> sales_data = {
...     "store_1": "Electronics",
...     "store_2": "Groceries",
...     "store_3": "Electronics",
...     "store_4": "Clothing",
... }
>>>
>>> # Obtain the first store for each category (after sorting store names)
>>> pc.Dict(sales_data).group_by_value_agg(
...     value_func=lambda category: category,
...     agg_func=lambda d: d.iter_keys().sort().first(),
... ).sort().unwrap()
{'Clothing': 'store_4', 'Electronics': 'store_1', 'Groceries': 'store_2'}

Source code in src/pyochain/_dict/_groups.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def group_by_value_agg[G, R](
    self,
    value_func: Callable[[V], G],
    agg_func: Callable[[Dict[K, V]], R],
) -> Dict[G, R]:
    """
    Group by value function, then apply aggregation function to each sub-dict.

    Args:
        value_func: Function to determine the group for each value.
        agg_func: Function to aggregate each sub-dictionary.

    This avoids materializing intermediate `Dict` objects if you only need
    an aggregated result for each group.
    ```python
    >>> import pyochain as pc
    >>>
    >>> data = {"math": "A", "physics": "B", "english": "A"}
    >>> pc.Dict(data).group_by_value_agg(
    ...     value_func=lambda grade: grade,
    ...     agg_func=lambda d: d.iter_keys().count(),
    ... ).unwrap()
    {'A': 2, 'B': 1}
    >>>
    >>> # --- Exemple 2: Agrégation plus complexe ---
    >>> sales_data = {
    ...     "store_1": "Electronics",
    ...     "store_2": "Groceries",
    ...     "store_3": "Electronics",
    ...     "store_4": "Clothing",
    ... }
    >>>
    >>> # Obtain the first store for each category (after sorting store names)
    >>> pc.Dict(sales_data).group_by_value_agg(
    ...     value_func=lambda category: category,
    ...     agg_func=lambda d: d.iter_keys().sort().first(),
    ... ).sort().unwrap()
    {'Clothing': 'store_4', 'Electronics': 'store_1', 'Groceries': 'store_2'}

    ```
    """
    from ._main import Dict

    def _group_by_value_agg(data: dict[K, V]) -> dict[G, R]:
        def _key_func(kv: tuple[K, V]) -> G:
            return value_func(kv[1])

        def _agg_func(items: list[tuple[K, V]]) -> R:
            return agg_func(Dict(dict(items)))

        groups = cz.itertoolz.groupby(_key_func, data.items())
        return cz.dicttoolz.valmap(_agg_func, groups)

    return self.apply(_group_by_value_agg)

implode

implode() -> Dict[K, list[V]]

Nest all the values in lists. syntactic sugar for map_values(lambda v: [v])

>>> import pyochain as pc
>>> pc.Dict({1: 2, 3: 4}).implode().unwrap()
{1: [2], 3: [4]}

Source code in src/pyochain/_dict/_main.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def implode(self) -> Dict[K, list[V]]:
    """
    Nest all the values in lists.
    syntactic sugar for map_values(lambda v: [v])
    ```python
    >>> import pyochain as pc
    >>> pc.Dict({1: 2, 3: 4}).implode().unwrap()
    {1: [2], 3: [4]}

    ```
    """

    def _implode(data: dict[K, V]) -> dict[K, list[V]]:
        def _(v: V) -> list[V]:
            return [v]

        return cz.dicttoolz.valmap(_, data)

    return self.apply(_implode)

inner_join

inner_join(other: Mapping[K, W]) -> Dict[K, tuple[V, W]]

Performs an inner join with another mapping based on keys.

Parameters:

Name Type Description Default
other Mapping[K, W]

The mapping to join with.

required

Only keys present in both mappings are kept.

>>> import pyochain as pc
>>> d1 = {"a": 1, "b": 2}
>>> d2 = {"b": 10, "c": 20}
>>> pc.Dict(d1).inner_join(d2).unwrap()
{'b': (2, 10)}

Source code in src/pyochain/_dict/_joins.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def inner_join[W](self, other: Mapping[K, W]) -> Dict[K, tuple[V, W]]:
    """
    Performs an inner join with another mapping based on keys.

    Args:
        other: The mapping to join with.

    Only keys present in both mappings are kept.
    ```python
    >>> import pyochain as pc
    >>> d1 = {"a": 1, "b": 2}
    >>> d2 = {"b": 10, "c": 20}
    >>> pc.Dict(d1).inner_join(d2).unwrap()
    {'b': (2, 10)}

    ```
    """

    def _inner_join(data: Mapping[K, V]) -> dict[K, tuple[V, W]]:
        return {k: (v, other[k]) for k, v in data.items() if k in other}

    return self.apply(_inner_join)

intersect_keys

intersect_keys(*others: Mapping[K, V]) -> Dict[K, V]

Return a new Dict keeping only keys present in self and all others.

Parameters:

Name Type Description Default
*others Mapping[K, V]

Other mappings to intersect keys with.

()
>>> import pyochain as pc
>>> d1 = {"a": 1, "b": 2, "c": 3}
>>> d2 = {"b": 10, "c": 20}
>>> d3 = {"c": 30}
>>> pc.Dict(d1).intersect_keys(d2, d3).unwrap()
{'c': 3}
Source code in src/pyochain/_dict/_filters.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def intersect_keys(self, *others: Mapping[K, V]) -> Dict[K, V]:
    """
    Return a new Dict keeping only keys present in self and all others.

    Args:
        *others: Other mappings to intersect keys with.

    ```python
    >>> import pyochain as pc
    >>> d1 = {"a": 1, "b": 2, "c": 3}
    >>> d2 = {"b": 10, "c": 20}
    >>> d3 = {"c": 30}
    >>> pc.Dict(d1).intersect_keys(d2, d3).unwrap()
    {'c': 3}

    ```
    """

    def _intersect_keys(data: dict[K, V]) -> dict[K, V]:
        self_keys = set(data.keys())
        for other in others:
            self_keys.intersection_update(other.keys())
        return {k: data[k] for k in self_keys}

    return self.apply(_intersect_keys)

into

into(func: Callable[Concatenate[T, P], R], *args: P.args, **kwargs: P.kwargs) -> R

Pass the unwrapped underlying data into a function.

The result is not wrapped.

>>> import pyochain as pc
>>> pc.Iter.from_(range(5)).into(list)
[0, 1, 2, 3, 4]
This is a core functionality that allows ending the chain whilst keeping the code style consistent.

Source code in src/pyochain/_core/_main.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def into[**P, R](
    self,
    func: Callable[Concatenate[T, P], R],
    *args: P.args,
    **kwargs: P.kwargs,
) -> R:
    """
    Pass the *unwrapped* underlying data into a function.

    The result is not wrapped.
    ```python
    >>> import pyochain as pc
    >>> pc.Iter.from_(range(5)).into(list)
    [0, 1, 2, 3, 4]

    ```
    This is a core functionality that allows ending the chain whilst keeping the code style consistent.
    """
    return func(self.unwrap(), *args, **kwargs)

invert

invert() -> Dict[V, list[K]]

Invert the dictionary, grouping keys by common (and hashable) values.

>>> import pyochain as pc
>>> d = {"a": 1, "b": 2, "c": 1}
>>> pc.Dict(d).invert().unwrap()
{1: ['a', 'c'], 2: ['b']}

Source code in src/pyochain/_dict/_main.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def invert(self) -> Dict[V, list[K]]:
    """
    Invert the dictionary, grouping keys by common (and hashable) values.
    ```python
    >>> import pyochain as pc
    >>> d = {"a": 1, "b": 2, "c": 1}
    >>> pc.Dict(d).invert().unwrap()
    {1: ['a', 'c'], 2: ['b']}

    ```
    """

    def _invert(data: dict[K, V]) -> dict[V, list[K]]:
        inverted: dict[V, list[K]] = defaultdict(list)
        for k, v in data.items():
            inverted[v].append(k)
        return dict(inverted)

    return self.apply(_invert)

iter_items

iter_items() -> Iter[tuple[K, V]]

Return a Iter of the dict's items.

>>> import pyochain as pc
>>> pc.Dict({1: 2}).iter_items().into(list)
[(1, 2)]

Source code in src/pyochain/_dict/_iter.py
80
81
82
83
84
85
86
87
88
89
90
91
92
def iter_items(self) -> Iter[tuple[K, V]]:
    """
    Return a Iter of the dict's items.
    ```python
    >>> import pyochain as pc
    >>> pc.Dict({1: 2}).iter_items().into(list)
    [(1, 2)]

    ```
    """
    from .._iter import Iter

    return Iter.from_(self.unwrap().items())

iter_keys

iter_keys() -> Iter[K]

Return a Iter of the dict's keys.

>>> import pyochain as pc
>>> pc.Dict({1: 2}).iter_keys().into(list)
[1]

Source code in src/pyochain/_dict/_iter.py
52
53
54
55
56
57
58
59
60
61
62
63
64
def iter_keys(self) -> Iter[K]:
    """
    Return a Iter of the dict's keys.
    ```python
    >>> import pyochain as pc
    >>> pc.Dict({1: 2}).iter_keys().into(list)
    [1]

    ```
    """
    from .._iter import Iter

    return Iter.from_(self.unwrap().keys())

iter_values

iter_values() -> Iter[V]

Return an Iter of the dict's values.

>>> import pyochain as pc
>>> pc.Dict({1: 2}).iter_values().into(list)
[2]

Source code in src/pyochain/_dict/_iter.py
66
67
68
69
70
71
72
73
74
75
76
77
78
def iter_values(self) -> Iter[V]:
    """
    Return an Iter of the dict's values.
    ```python
    >>> import pyochain as pc
    >>> pc.Dict({1: 2}).iter_values().into(list)
    [2]

    ```
    """
    from .._iter import Iter

    return Iter.from_(self.unwrap().values())

itr

itr(
    func: Callable[Concatenate[Iter[U], P], R], *args: P.args, **kwargs: P.kwargs
) -> Dict[K, R]

Apply a function to each value after wrapping it in an Iter.

Parameters:

Name Type Description Default
func Callable[Concatenate[Iter[U], P], R]

Function to apply to each value after wrapping it in an Iter.

required
*args P.args

Positional arguments to pass to the function.

()
**kwargs P.kwargs

Keyword arguments to pass to the function.

{}

Syntactic sugar for map_values(lambda data: func(Iter(data), *args, **kwargs))

>>> import pyochain as pc
>>> data = {
...     "numbers1": [1, 2, 3],
...     "numbers2": [4, 5, 6],
... }
>>> pc.Dict(data).itr(lambda v: v.repeat(5).flatten().sum()).unwrap()
{'numbers1': 30, 'numbers2': 75}

Source code in src/pyochain/_dict/_iter.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def itr[**P, R, U](
    self: MappingWrapper[K, Iterable[U]],
    func: Callable[Concatenate[Iter[U], P], R],
    *args: P.args,
    **kwargs: P.kwargs,
) -> Dict[K, R]:
    """
    Apply a function to each value after wrapping it in an Iter.

    Args:
        func: Function to apply to each value after wrapping it in an Iter.
        *args: Positional arguments to pass to the function.
        **kwargs: Keyword arguments to pass to the function.

    Syntactic sugar for `map_values(lambda data: func(Iter(data), *args, **kwargs))`
    ```python
    >>> import pyochain as pc
    >>> data = {
    ...     "numbers1": [1, 2, 3],
    ...     "numbers2": [4, 5, 6],
    ... }
    >>> pc.Dict(data).itr(lambda v: v.repeat(5).flatten().sum()).unwrap()
    {'numbers1': 30, 'numbers2': 75}

    ```
    """
    from .._iter import Iter

    def _itr(data: Mapping[K, Iterable[U]]) -> dict[K, R]:
        def _(v: Iterable[U]) -> R:
            return func(Iter.from_(v), *args, **kwargs)

        return cz.dicttoolz.valmap(_, data)

    return self.apply(_itr)

left_join

left_join(other: Mapping[K, W]) -> Dict[K, tuple[V, W | None]]

Performs a left join with another mapping based on keys.

Parameters:

Name Type Description Default
other Mapping[K, W]

The mapping to join with.

required

All keys from the left dictionary (self) are kept.

>>> import pyochain as pc
>>> d1 = {"a": 1, "b": 2}
>>> d2 = {"b": 10, "c": 20}
>>> pc.Dict(d1).left_join(d2).unwrap()
{'a': (1, None), 'b': (2, 10)}

Source code in src/pyochain/_dict/_joins.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def left_join[W](self, other: Mapping[K, W]) -> Dict[K, tuple[V, W | None]]:
    """
    Performs a left join with another mapping based on keys.

    Args:
        other: The mapping to join with.

    All keys from the left dictionary (self) are kept.
    ```python
    >>> import pyochain as pc
    >>> d1 = {"a": 1, "b": 2}
    >>> d2 = {"b": 10, "c": 20}
    >>> pc.Dict(d1).left_join(d2).unwrap()
    {'a': (1, None), 'b': (2, 10)}

    ```
    """

    def _left_join(data: Mapping[K, V]) -> dict[K, tuple[V, W | None]]:
        return {k: (v, other.get(k)) for k, v in data.items()}

    return self.apply(_left_join)

map_items

map_items(func: Callable[[tuple[K, V]], tuple[KR, VR]]) -> Dict[KR, VR]

Transform (key, value) pairs using a function that takes a (key, value) tuple.

Parameters:

Name Type Description Default
func Callable[[tuple[K, V]], tuple[KR, VR]]

Function to transform each (key, value) pair into a new (key, value) tuple.

required
>>> import pyochain as pc
>>> pc.Dict({"Alice": 10, "Bob": 20}).map_items(
...     lambda kv: (kv[0].upper(), kv[1] * 2)
... ).unwrap()
{'ALICE': 20, 'BOB': 40}
Source code in src/pyochain/_dict/_main.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def map_items[KR, VR](
    self,
    func: Callable[[tuple[K, V]], tuple[KR, VR]],
) -> Dict[KR, VR]:
    """
    Transform (key, value) pairs using a function that takes a (key, value) tuple.

    Args:
        func: Function to transform each (key, value) pair into a new (key, value) tuple.

    ```python
    >>> import pyochain as pc
    >>> pc.Dict({"Alice": 10, "Bob": 20}).map_items(
    ...     lambda kv: (kv[0].upper(), kv[1] * 2)
    ... ).unwrap()
    {'ALICE': 20, 'BOB': 40}

    ```
    """
    return self.apply(partial(cz.dicttoolz.itemmap, func))

map_keys

map_keys(func: Callable[[K], T]) -> Dict[T, V]

Return a Dict with keys transformed by func.

Parameters:

Name Type Description Default
func Callable[[K], T]

Function to apply to each key in the dictionary.

required
>>> import pyochain as pc
>>> pc.Dict({"Alice": [20, 15, 30], "Bob": [10, 35]}).map_keys(
...     str.lower
... ).unwrap()
{'alice': [20, 15, 30], 'bob': [10, 35]}
>>>
>>> pc.Dict({1: "a"}).map_keys(str).unwrap()
{'1': 'a'}
Source code in src/pyochain/_dict/_main.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def map_keys[T](self, func: Callable[[K], T]) -> Dict[T, V]:
    """
    Return a Dict with keys transformed by func.

    Args:
        func: Function to apply to each key in the dictionary.

    ```python
    >>> import pyochain as pc
    >>> pc.Dict({"Alice": [20, 15, 30], "Bob": [10, 35]}).map_keys(
    ...     str.lower
    ... ).unwrap()
    {'alice': [20, 15, 30], 'bob': [10, 35]}
    >>>
    >>> pc.Dict({1: "a"}).map_keys(str).unwrap()
    {'1': 'a'}

    ```
    """
    return self.apply(partial(cz.dicttoolz.keymap, func))

map_kv

map_kv(func: Callable[[K, V], tuple[KR, VR]]) -> Dict[KR, VR]

Transform (key, value) pairs using a function that takes key and value as separate arguments.

Parameters:

Name Type Description Default
func Callable[[K, V], tuple[KR, VR]]

Function to transform each key and value into a new (key, value) tuple.

required
>>> import pyochain as pc
>>> pc.Dict({1: 2}).map_kv(lambda k, v: (k + 1, v * 10)).unwrap()
{2: 20}
Source code in src/pyochain/_dict/_main.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def map_kv[KR, VR](
    self,
    func: Callable[[K, V], tuple[KR, VR]],
) -> Dict[KR, VR]:
    """
    Transform (key, value) pairs using a function that takes key and value as separate arguments.

    Args:
        func: Function to transform each key and value into a new (key, value) tuple.

    ```python
    >>> import pyochain as pc
    >>> pc.Dict({1: 2}).map_kv(lambda k, v: (k + 1, v * 10)).unwrap()
    {2: 20}

    ```
    """

    def _map_kv(data: dict[K, V]) -> dict[KR, VR]:
        def _(kv: tuple[K, V]) -> tuple[KR, VR]:
            return func(kv[0], kv[1])

        return cz.dicttoolz.itemmap(_, data)

    return self.apply(_map_kv)

map_values

map_values(func: Callable[[V], T]) -> Dict[K, T]

Return a Dict with values transformed by func.

Parameters:

Name Type Description Default
func Callable[[V], T]

Function to apply to each value in the dictionary.

required
>>> import pyochain as pc
>>> pc.Dict({"Alice": [20, 15, 30], "Bob": [10, 35]}).map_values(sum).unwrap()
{'Alice': 65, 'Bob': 45}
>>>
>>> pc.Dict({1: 1}).map_values(lambda v: v + 1).unwrap()
{1: 2}
Source code in src/pyochain/_dict/_main.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def map_values[T](self, func: Callable[[V], T]) -> Dict[K, T]:
    """
    Return a Dict with values transformed by func.

    Args:
        func: Function to apply to each value in the dictionary.

    ```python
    >>> import pyochain as pc
    >>> pc.Dict({"Alice": [20, 15, 30], "Bob": [10, 35]}).map_values(sum).unwrap()
    {'Alice': 65, 'Bob': 45}
    >>>
    >>> pc.Dict({1: 1}).map_values(lambda v: v + 1).unwrap()
    {1: 2}

    ```
    """
    return self.apply(partial(cz.dicttoolz.valmap, func))

merge

merge(*others: Mapping[K, V]) -> Dict[K, V]

Merge other dicts into this one and return a new Dict.

Parameters:

Name Type Description Default
*others Mapping[K, V]

One or more mappings to merge into the current dictionary.

()
>>> import pyochain as pc
>>> pc.Dict({1: "one"}).merge({2: "two"}).unwrap()
{1: 'one', 2: 'two'}
>>> # Later dictionaries have precedence
>>> pc.Dict({1: 2, 3: 4}).merge({3: 3, 4: 4}).unwrap()
{1: 2, 3: 3, 4: 4}
Source code in src/pyochain/_dict/_joins.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def merge(self, *others: Mapping[K, V]) -> Dict[K, V]:
    """
    Merge other dicts into this one and return a new Dict.

    Args:
        *others: One or more mappings to merge into the current dictionary.

    ```python
    >>> import pyochain as pc
    >>> pc.Dict({1: "one"}).merge({2: "two"}).unwrap()
    {1: 'one', 2: 'two'}
    >>> # Later dictionaries have precedence
    >>> pc.Dict({1: 2, 3: 4}).merge({3: 3, 4: 4}).unwrap()
    {1: 2, 3: 3, 4: 4}

    ```
    """
    return self.apply(cz.dicttoolz.merge, *others)

merge_with

merge_with(*others: Mapping[K, V], func: Callable[[Iterable[V]], V]) -> Dict[K, V]

Merge dicts using a function to combine values for duplicate keys.

Parameters:

Name Type Description Default
*others Mapping[K, V]

One or more mappings to merge into the current dictionary.

()
func Callable[[Iterable[V]], V]

Function to combine values for duplicate keys.

required

A key may occur in more than one dict, and all values mapped from the key will be passed to the function as a list, such as func([val1, val2, ...]).

>>> import pyochain as pc
>>> pc.Dict({1: 1, 2: 2}).merge_with({1: 10, 2: 20}, func=sum).unwrap()
{1: 11, 2: 22}
>>> pc.Dict({1: 1, 2: 2}).merge_with({2: 20, 3: 30}, func=max).unwrap()
{1: 1, 2: 20, 3: 30}

Source code in src/pyochain/_dict/_joins.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def merge_with(
    self, *others: Mapping[K, V], func: Callable[[Iterable[V]], V]
) -> Dict[K, V]:
    """
    Merge dicts using a function to combine values for duplicate keys.

    Args:
        *others: One or more mappings to merge into the current dictionary.
        func: Function to combine values for duplicate keys.

    A key may occur in more than one dict, and all values mapped from the key will be passed to the function as a list, such as func([val1, val2, ...]).
    ```python
    >>> import pyochain as pc
    >>> pc.Dict({1: 1, 2: 2}).merge_with({1: 10, 2: 20}, func=sum).unwrap()
    {1: 11, 2: 22}
    >>> pc.Dict({1: 1, 2: 2}).merge_with({2: 20, 3: 30}, func=max).unwrap()
    {1: 1, 2: 20, 3: 30}

    ```
    """

    def _merge_with(data: Mapping[K, V]) -> dict[K, V]:
        return cz.dicttoolz.merge_with(func, data, *others)

    return self.apply(_merge_with)

pipe

pipe(func: Callable[Concatenate[Self, P], R], *args: P.args, **kwargs: P.kwargs) -> R

Pipe the instance in the function and return the result.

Source code in src/pyochain/_core/_main.py
13
14
15
16
17
18
19
20
def pipe[**P, R](
    self,
    func: Callable[Concatenate[Self, P], R],
    *args: P.args,
    **kwargs: P.kwargs,
) -> R:
    """Pipe the instance in the function and return the result."""
    return func(self, *args, **kwargs)

pluck

pluck(*keys: str) -> Dict[U, Any]

Extract values from nested dictionaries using a sequence of keys.

Parameters:

Name Type Description Default
*keys str

Sequence of keys to extract values from the nested dictionaries.

()
>>> import pyochain as pc
>>> data = {
...     "person1": {"name": "Alice", "age": 30},
...     "person2": {"name": "Bob", "age": 25},
... }
>>> pc.Dict(data).pluck("name").unwrap()
{'person1': 'Alice', 'person2': 'Bob'}
Source code in src/pyochain/_dict/_nested.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def pluck[U: str | int](self: NestedDict[U, Any], *keys: str) -> Dict[U, Any]:
    """
    Extract values from nested dictionaries using a sequence of keys.

    Args:
        *keys: Sequence of keys to extract values from the nested dictionaries.
    ```python
    >>> import pyochain as pc
    >>> data = {
    ...     "person1": {"name": "Alice", "age": 30},
    ...     "person2": {"name": "Bob", "age": 25},
    ... }
    >>> pc.Dict(data).pluck("name").unwrap()
    {'person1': 'Alice', 'person2': 'Bob'}

    ```
    """

    getter = partial(cz.dicttoolz.get_in, keys)

    def _pluck(data: Mapping[U, Any]) -> dict[U, Any]:
        return cz.dicttoolz.valmap(getter, data)

    return self.apply(_pluck)

println

println(pretty: bool = True) -> Self

Print the underlying data and return self for chaining.

Useful for debugging, simply insert .println() in the chain, and then removing it will not affect the rest of the chain.

Source code in src/pyochain/_core/_main.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def println(self, pretty: bool = True) -> Self:
    """
    Print the underlying data and return self for chaining.

    Useful for debugging, simply insert `.println()` in the chain,
    and then removing it will not affect the rest of the chain.
    """
    from pprint import pprint

    if pretty:
        pprint(self.unwrap(), sort_dicts=False)
    else:
        print(self.unwrap())
    return self

rename

rename(mapping: Mapping[K, K]) -> Dict[K, V]

Return a new Dict with keys renamed according to the mapping.

Parameters:

Name Type Description Default
mapping Mapping[K, K]

A dictionary mapping old keys to new keys.

required

Keys not in the mapping are kept as is.

>>> import pyochain as pc
>>> d = {"a": 1, "b": 2, "c": 3}
>>> mapping = {"b": "beta", "c": "gamma"}
>>> pc.Dict(d).rename(mapping).unwrap()
{'a': 1, 'beta': 2, 'gamma': 3}

Source code in src/pyochain/_dict/_process.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def rename(self, mapping: Mapping[K, K]) -> Dict[K, V]:
    """
    Return a new Dict with keys renamed according to the mapping.

    Args:
        mapping: A dictionary mapping old keys to new keys.

    Keys not in the mapping are kept as is.
    ```python
    >>> import pyochain as pc
    >>> d = {"a": 1, "b": 2, "c": 3}
    >>> mapping = {"b": "beta", "c": "gamma"}
    >>> pc.Dict(d).rename(mapping).unwrap()
    {'a': 1, 'beta': 2, 'gamma': 3}

    ```
    """

    def _rename(data: dict[K, V]) -> dict[K, V]:
        return {mapping.get(k, k): v for k, v in data.items()}

    return self.apply(_rename)

schema

schema(max_depth: int = 1) -> Dict[str, Any]

Return the schema of the dictionary up to a maximum depth.

Parameters:

Name Type Description Default
max_depth int

Maximum depth to inspect. Nested dicts beyond this depth are marked as 'dict'.

1

When the max depth is reached, nested dicts are marked as 'dict'. For lists, only the first element is inspected.

>>> import pyochain as pc
>>> # Depth 2: we see up to level2
>>> data = {
...     "level1": {"level2": {"level3": {"key": "value"}}},
...     "other_key": 123,
...     "list_key": [{"sub_key": "sub_value"}],
... }
>>> pc.Dict(data).schema(max_depth=1).unwrap()
{'level1': 'dict', 'other_key': 'int', 'list_key': 'list'}
>>> pc.Dict(data).schema(max_depth=2).unwrap()
{'level1': {'level2': 'dict'}, 'other_key': 'int', 'list_key': 'dict'}
>>>
>>> # Depth 3: we see up to level3
>>> pc.Dict(data).schema(max_depth=3).unwrap()
{'level1': {'level2': {'level3': 'dict'}}, 'other_key': 'int', 'list_key': {'sub_key': 'str'}}

Source code in src/pyochain/_dict/_nested.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def schema(self, max_depth: int = 1) -> Dict[str, Any]:
    """
    Return the schema of the dictionary up to a maximum depth.

    Args:
        max_depth: Maximum depth to inspect. Nested dicts beyond this depth are marked as 'dict'.

    When the max depth is reached, nested dicts are marked as 'dict'.
    For lists, only the first element is inspected.
    ```python
    >>> import pyochain as pc
    >>> # Depth 2: we see up to level2
    >>> data = {
    ...     "level1": {"level2": {"level3": {"key": "value"}}},
    ...     "other_key": 123,
    ...     "list_key": [{"sub_key": "sub_value"}],
    ... }
    >>> pc.Dict(data).schema(max_depth=1).unwrap()
    {'level1': 'dict', 'other_key': 'int', 'list_key': 'list'}
    >>> pc.Dict(data).schema(max_depth=2).unwrap()
    {'level1': {'level2': 'dict'}, 'other_key': 'int', 'list_key': 'dict'}
    >>>
    >>> # Depth 3: we see up to level3
    >>> pc.Dict(data).schema(max_depth=3).unwrap()
    {'level1': {'level2': {'level3': 'dict'}}, 'other_key': 'int', 'list_key': {'sub_key': 'str'}}

    ```
    """

    def _schema(data: dict[Any, Any]) -> Any:
        def _recurse_schema(node: Any, current_depth: int) -> Any:
            if isinstance(node, dict):
                if current_depth >= max_depth:
                    return "dict"
                return {
                    k: _recurse_schema(v, current_depth + 1)
                    for k, v in node.items()  # type: ignore
                }
            elif cz.itertoolz.isiterable(node):
                if current_depth >= max_depth:
                    return type(node).__name__
                return _recurse_schema(cz.itertoolz.first(node), current_depth + 1)
            else:
                return type(node).__name__

        return _recurse_schema(data, 0)

    return self.apply(_schema)

select

select(*exprs: IntoExpr) -> Dict[str, Any]

Select and alias fields from the dict based on expressions and/or strings.

Navigate nested fields using the pyochain.key function.

  • Chain key.key() calls to access nested fields.
  • Use key.apply() to transform values.
  • Use key.alias() to rename fields in the resulting dict.

Parameters:

Name Type Description Default
*exprs IntoExpr

Expressions or strings to select and alias fields from the dictionary.

()
>>> import pyochain as pc
>>> data = {
...     "name": "Alice",
...     "age": 30,
...     "scores": {"eng": [85, 90, 95], "math": [80, 88, 92]},
... }
>>> scores_expr = pc.key("scores")  # save an expression for reuse
>>> pc.Dict(data).select(
...     pc.key("name").alias("student_name"),
...     "age",  # shorthand for pc.key("age")
...     scores_expr.key("math").alias("math_scores"),
...     scores_expr.key("eng")
...     .apply(lambda v: pc.Seq(v).mean())
...     .alias("average_eng_score"),
... ).unwrap()
{'student_name': 'Alice', 'age': 30, 'math_scores': [80, 88, 92], 'average_eng_score': 90}
Source code in src/pyochain/_dict/_main.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def select(self: Dict[str, Any], *exprs: IntoExpr) -> Dict[str, Any]:
    """
    Select and alias fields from the dict based on expressions and/or strings.

    Navigate nested fields using the `pyochain.key` function.

    - Chain `key.key()` calls to access nested fields.
    - Use `key.apply()` to transform values.
    - Use `key.alias()` to rename fields in the resulting dict.

    Args:
        *exprs: Expressions or strings to select and alias fields from the dictionary.

    ```python
    >>> import pyochain as pc
    >>> data = {
    ...     "name": "Alice",
    ...     "age": 30,
    ...     "scores": {"eng": [85, 90, 95], "math": [80, 88, 92]},
    ... }
    >>> scores_expr = pc.key("scores")  # save an expression for reuse
    >>> pc.Dict(data).select(
    ...     pc.key("name").alias("student_name"),
    ...     "age",  # shorthand for pc.key("age")
    ...     scores_expr.key("math").alias("math_scores"),
    ...     scores_expr.key("eng")
    ...     .apply(lambda v: pc.Seq(v).mean())
    ...     .alias("average_eng_score"),
    ... ).unwrap()
    {'student_name': 'Alice', 'age': 30, 'math_scores': [80, 88, 92], 'average_eng_score': 90}

    ```
    """

    def _select(data: dict[str, Any]) -> dict[str, Any]:
        return compute_exprs(exprs, data, {})

    return self.apply(_select)

sort

sort(reverse: bool = False) -> Dict[K, V]

Sort the dictionary by its keys and return a new Dict.

Parameters:

Name Type Description Default
reverse bool

Whether to sort in descending order. Defaults to False.

False
>>> import pyochain as pc
>>> pc.Dict({"b": 2, "a": 1}).sort().unwrap()
{'a': 1, 'b': 2}
Source code in src/pyochain/_dict/_process.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def sort(self, reverse: bool = False) -> Dict[K, V]:
    """
    Sort the dictionary by its keys and return a new Dict.

    Args:
        reverse: Whether to sort in descending order. Defaults to False.

    ```python
    >>> import pyochain as pc
    >>> pc.Dict({"b": 2, "a": 1}).sort().unwrap()
    {'a': 1, 'b': 2}

    ```
    """

    def _sort(data: dict[K, V]) -> dict[K, V]:
        return dict(sorted(data.items(), reverse=reverse))

    return self.apply(_sort)

struct

struct(
    func: Callable[Concatenate[Dict[K, U], P], R], *args: P.args, **kwargs: P.kwargs
) -> Dict[K, R]

Apply a function to each value after wrapping it in a Dict.

Parameters:

Name Type Description Default
func Callable[Concatenate[Dict[K, U], P], R]

Function to apply to each value after wrapping it in a Dict.

required
*args P.args

Positional arguments to pass to the function.

()
**kwargs P.kwargs

Keyword arguments to pass to the function.

{}

Syntactic sugar for map_values(lambda data: func(pc.Dict(data), *args, **kwargs))

>>> import pyochain as pc
>>> data = {
...     "person1": {"name": "Alice", "age": 30, "city": "New York"},
...     "person2": {"name": "Bob", "age": 25, "city": "Los Angeles"},
... }
>>> pc.Dict(data).struct(lambda d: d.map_keys(str.upper).drop("AGE").unwrap())
... # doctest: +NORMALIZE_WHITESPACE
Dict({
    'person1': {'NAME': 'Alice', 'CITY': 'New York'},
    'person2': {'NAME': 'Bob', 'CITY': 'Los Angeles'}
})

Source code in src/pyochain/_dict/_nested.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def struct[**P, R, U: dict[Any, Any]](
    self: NestedDict[K, U],
    func: Callable[Concatenate[Dict[K, U], P], R],
    *args: P.args,
    **kwargs: P.kwargs,
) -> Dict[K, R]:
    """
    Apply a function to each value after wrapping it in a Dict.

    Args:
        func: Function to apply to each value after wrapping it in a Dict.
        *args: Positional arguments to pass to the function.
        **kwargs: Keyword arguments to pass to the function.

    Syntactic sugar for `map_values(lambda data: func(pc.Dict(data), *args, **kwargs))`
    ```python
    >>> import pyochain as pc
    >>> data = {
    ...     "person1": {"name": "Alice", "age": 30, "city": "New York"},
    ...     "person2": {"name": "Bob", "age": 25, "city": "Los Angeles"},
    ... }
    >>> pc.Dict(data).struct(lambda d: d.map_keys(str.upper).drop("AGE").unwrap())
    ... # doctest: +NORMALIZE_WHITESPACE
    Dict({
        'person1': {'NAME': 'Alice', 'CITY': 'New York'},
        'person2': {'NAME': 'Bob', 'CITY': 'Los Angeles'}
    })

    ```
    """
    from ._main import Dict

    def _struct(data: Mapping[K, U]) -> dict[K, R]:
        def _(v: dict[Any, Any]) -> R:
            return func(Dict(v), *args, **kwargs)

        return cz.dicttoolz.valmap(_, data)

    return self.apply(_struct)

unwrap

unwrap() -> T

Return the underlying data.

This is a terminal operation.

Source code in src/pyochain/_core/_main.py
61
62
63
64
65
66
67
def unwrap(self) -> T:
    """
    Return the underlying data.

    This is a terminal operation.
    """
    return self._data

update_in

update_in(*keys: K, func: Callable[[V], V], default: V | None = None) -> Dict[K, V]

Update value in a (potentially) nested dictionary.

Parameters:

Name Type Description Default
*keys K

Sequence of keys representing the nested path to update.

()
func Callable[[V], V]

Function to apply to the value at the specified path.

required
default V | None

Default value to use if the path does not exist, by default None

None

Applies the func to the value at the path specified by keys, returning a new Dict with the updated value.

If the path does not exist, it will be created with the default value (if provided) before applying func.

>>> import pyochain as pc
>>> inc = lambda x: x + 1
>>> pc.Dict({"a": 0}).update_in("a", func=inc).unwrap()
{'a': 1}
>>> transaction = {
...     "name": "Alice",
...     "purchase": {"items": ["Apple", "Orange"], "costs": [0.50, 1.25]},
...     "credit card": "5555-1234-1234-1234",
... }
>>> pc.Dict(transaction).update_in("purchase", "costs", func=sum).unwrap()
{'name': 'Alice', 'purchase': {'items': ['Apple', 'Orange'], 'costs': 1.75}, 'credit card': '5555-1234-1234-1234'}
>>> # updating a value when k0 is not in d
>>> pc.Dict({}).update_in(1, 2, 3, func=str, default="bar").unwrap()
{1: {2: {3: 'bar'}}}
>>> pc.Dict({1: "foo"}).update_in(2, 3, 4, func=inc, default=0).unwrap()
{1: 'foo', 2: {3: {4: 1}}}

Source code in src/pyochain/_dict/_process.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def update_in(
    self, *keys: K, func: Callable[[V], V], default: V | None = None
) -> Dict[K, V]:
    """
    Update value in a (potentially) nested dictionary.

    Args:
        *keys: Sequence of keys representing the nested path to update.
        func: Function to apply to the value at the specified path.
        default: Default value to use if the path does not exist, by default None

    Applies the func to the value at the path specified by keys, returning a new Dict with the updated value.

    If the path does not exist, it will be created with the default value (if provided) before applying func.
    ```python
    >>> import pyochain as pc
    >>> inc = lambda x: x + 1
    >>> pc.Dict({"a": 0}).update_in("a", func=inc).unwrap()
    {'a': 1}
    >>> transaction = {
    ...     "name": "Alice",
    ...     "purchase": {"items": ["Apple", "Orange"], "costs": [0.50, 1.25]},
    ...     "credit card": "5555-1234-1234-1234",
    ... }
    >>> pc.Dict(transaction).update_in("purchase", "costs", func=sum).unwrap()
    {'name': 'Alice', 'purchase': {'items': ['Apple', 'Orange'], 'costs': 1.75}, 'credit card': '5555-1234-1234-1234'}
    >>> # updating a value when k0 is not in d
    >>> pc.Dict({}).update_in(1, 2, 3, func=str, default="bar").unwrap()
    {1: {2: {3: 'bar'}}}
    >>> pc.Dict({1: "foo"}).update_in(2, 3, 4, func=inc, default=0).unwrap()
    {1: 'foo', 2: {3: {4: 1}}}

    ```
    """
    return self.apply(cz.dicttoolz.update_in, keys, func, default=default)

with_fields

with_fields(*exprs: IntoExpr) -> Dict[str, Any]

Merge aliased expressions into the root dict (overwrite on collision).

Parameters:

Name Type Description Default
*exprs IntoExpr

Expressions to merge into the root dictionary.

()
>>> import pyochain as pc
>>> data = {
...     "name": "Alice",
...     "age": 30,
...     "scores": {"eng": [85, 90, 95], "math": [80, 88, 92]},
... }
>>> scores_expr = pc.key("scores")  # save an expression for reuse
>>> pc.Dict(data).with_fields(
...     scores_expr.key("eng")
...     .apply(lambda v: pc.Seq(v).mean())
...     .alias("average_eng_score"),
... ).unwrap()
{'name': 'Alice', 'age': 30, 'scores': {'eng': [85, 90, 95], 'math': [80, 88, 92]}, 'average_eng_score': 90}
Source code in src/pyochain/_dict/_main.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def with_fields(self: Dict[str, Any], *exprs: IntoExpr) -> Dict[str, Any]:
    """
    Merge aliased expressions into the root dict (overwrite on collision).

    Args:
        *exprs: Expressions to merge into the root dictionary.

    ```python
    >>> import pyochain as pc
    >>> data = {
    ...     "name": "Alice",
    ...     "age": 30,
    ...     "scores": {"eng": [85, 90, 95], "math": [80, 88, 92]},
    ... }
    >>> scores_expr = pc.key("scores")  # save an expression for reuse
    >>> pc.Dict(data).with_fields(
    ...     scores_expr.key("eng")
    ...     .apply(lambda v: pc.Seq(v).mean())
    ...     .alias("average_eng_score"),
    ... ).unwrap()
    {'name': 'Alice', 'age': 30, 'scores': {'eng': [85, 90, 95], 'math': [80, 88, 92]}, 'average_eng_score': 90}

    ```
    """

    def _with_fields(data: dict[str, Any]) -> dict[str, Any]:
        return compute_exprs(exprs, data, data.copy())

    return self.apply(_with_fields)

with_key

with_key(key: K, value: V) -> Dict[K, V]

Return a new Dict with key set to value.

Parameters:

Name Type Description Default
key K

Key to set in the dictionary.

required
value V

Value to associate with the specified key.

required

Does not modify the initial dictionary.

>>> import pyochain as pc
>>> pc.Dict({"x": 1}).with_key("x", 2).unwrap()
{'x': 2}
>>> pc.Dict({"x": 1}).with_key("y", 3).unwrap()
{'x': 1, 'y': 3}
>>> pc.Dict({}).with_key("x", 1).unwrap()
{'x': 1}

Source code in src/pyochain/_dict/_process.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def with_key(self, key: K, value: V) -> Dict[K, V]:
    """
    Return a new Dict with key set to value.

    Args:
        key: Key to set in the dictionary.
        value: Value to associate with the specified key.

    Does not modify the initial dictionary.
    ```python
    >>> import pyochain as pc
    >>> pc.Dict({"x": 1}).with_key("x", 2).unwrap()
    {'x': 2}
    >>> pc.Dict({"x": 1}).with_key("y", 3).unwrap()
    {'x': 1, 'y': 3}
    >>> pc.Dict({}).with_key("x", 1).unwrap()
    {'x': 1}

    ```
    """
    return self.apply(cz.dicttoolz.assoc, key, value)

with_nested_key

with_nested_key(*keys: K, value: V) -> Dict[K, V]

Set a nested key path and return a new Dict with new, potentially nested, key value pair.

Parameters:

Name Type Description Default
*keys K

Sequence of keys representing the nested path.

()
value V

Value to set at the specified nested path.

required
>>> import pyochain as pc
>>> purchase = {
...     "name": "Alice",
...     "order": {"items": ["Apple", "Orange"], "costs": [0.50, 1.25]},
...     "credit card": "5555-1234-1234-1234",
... }
>>> pc.Dict(purchase).with_nested_key(
...     "order", "costs", value=[0.25, 1.00]
... ).unwrap()
{'name': 'Alice', 'order': {'items': ['Apple', 'Orange'], 'costs': [0.25, 1.0]}, 'credit card': '5555-1234-1234-1234'}
Source code in src/pyochain/_dict/_nested.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def with_nested_key(self, *keys: K, value: V) -> Dict[K, V]:
    """
    Set a nested key path and return a new Dict with new, potentially nested, key value pair.

    Args:
        *keys: Sequence of keys representing the nested path.
        value: Value to set at the specified nested path.
    ```python
    >>> import pyochain as pc
    >>> purchase = {
    ...     "name": "Alice",
    ...     "order": {"items": ["Apple", "Orange"], "costs": [0.50, 1.25]},
    ...     "credit card": "5555-1234-1234-1234",
    ... }
    >>> pc.Dict(purchase).with_nested_key(
    ...     "order", "costs", value=[0.25, 1.00]
    ... ).unwrap()
    {'name': 'Alice', 'order': {'items': ['Apple', 'Orange'], 'costs': [0.25, 1.0]}, 'credit card': '5555-1234-1234-1234'}

    ```
    """
    return self.apply(cz.dicttoolz.assoc_in, keys, value=value)