
Reduce Operation

The Reduce operation aggregates multiple values in a given column into a single result according to the user's instruction.

Core Implementation

nirvana.ops.reduce.ReduceOperation(user_instruction: str = '', input_columns: list[str] = [], context: list[dict] | str | None = None, model: str | None = None, tool: Callable | BaseTool | None = None, strategy: Literal['plain'] = 'plain', rate_limit: int = 16, assertions: list[Callable] | None = [])

Bases: BaseOperation

Reduce operator: aggregates the values in a column according to a reduction function specified in natural language (NL).

This is a simple implementation: it does not handle inputs whose length exceeds the model's token limit.

Planned optimizations include summarize-and-aggregate and incremental aggregation.
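The incremental aggregation mentioned above is not part of the source shown on this page. As a rough sketch of the idea only, the following folds a sequence in fixed-size chunks and carries a running partial result forward, so that no single call has to see the whole input. The names `chunked`, `incremental_reduce`, `combine`, and `chunk_size` are hypothetical, and a plain Python function stands in for the model call.

```python
from typing import Callable, Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def chunked(values: Iterable[T], chunk_size: int) -> Iterator[list]:
    """Yield consecutive chunks of at most `chunk_size` items."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    chunk = []
    for value in values:
        chunk.append(value)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk


def incremental_reduce(
    values: Iterable[T],
    combine: Callable[[Optional[R], list], R],
    chunk_size: int = 4,
) -> Optional[R]:
    """Fold `values` chunk by chunk, carrying the partial result forward.

    `combine` stands in for one model call: it receives the running
    result (None for the first chunk) together with the next chunk.
    """
    partial: Optional[R] = None
    for chunk in chunked(values, chunk_size):
        partial = combine(partial, chunk)
    return partial


def longest_so_far(partial: Optional[str], chunk: list) -> str:
    """Toy reducer: keep the longest string seen so far."""
    candidates = chunk if partial is None else [partial, *chunk]
    return max(candidates, key=len)


reviews = ["ok", "great value", "bad", "would buy again", "meh"]
result = incremental_reduce(reviews, longest_so_far, chunk_size=2)
```

With a real model in place of `longest_so_far`, each call would receive the previous partial answer plus one chunk, which bounds the prompt size at the cost of one call per chunk.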

Source code in nirvana/ops/reduce.py
def __init__(
    self,
    user_instruction: str = "",
    input_columns: list[str] = [],
    context: list[dict] | str | None = None,
    model: str | None = None,
    tool: Callable | BaseTool | None = None,
    strategy: Literal["plain"] = "plain",
    rate_limit: int = 16,
    assertions: list[Callable] | None = [],
):
    if tool and not isinstance(tool, BaseTool):
        tool = FunctionCallTool.from_function(func=tool)

    super().__init__(
        op_name="reduce",
        user_instruction=user_instruction,
        context=context,
        model=model,
        tool=tool,
        strategy=strategy,
        rate_limit=rate_limit,
        assertions=assertions,
    )
    self.prompter = ReducePrompter()
    self.input_columns = input_columns

Attributes

prompter = ReducePrompter() instance-attribute
input_columns = input_columns instance-attribute
dependencies: list[str] property
generated_fields: list[str] property
op_kwargs property

Functions

execute(input_data: pd.DataFrame, **kwargs) async
Source code in nirvana/ops/reduce.py
async def execute(
    self,
    input_data: pd.DataFrame,
    **kwargs
):
    if self.user_instruction is None and not self.has_udf():
        raise ValueError("Neither `user_instruction` nor `func` is given.")

    if input_data.empty:
        return ReduceOpOutputs(output="No data to process.")

    processed_data = input_data[self.input_columns[0]]
    if isinstance(processed_data.dtype, ImageDtype):
        dtype = "image"
    elif is_numeric_dtype(processed_data):
        dtype = "numeric"
    else:
        dtype = "text"

    reduce_results, token_cost = None, 0
    if self.has_udf() and dtype == "numeric":
        reduce_results, token_cost = await self._execute_by_func(
            processed_data,
            self.user_instruction,
            self.tool,
            self._execute_by_plain_llm,
            dtype,
            model=self.model,
            **kwargs
        )
    else:
        reduce_results, token_cost = await self._execute_by_plain_llm(
            processed_data,
            self.user_instruction,
            dtype,
            model=self.model,
            **kwargs
        )

    return ReduceOpOutputs(
        output=reduce_results,
        cost=token_cost
    )
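The branch at the end of `execute` can be read as a small decision table: the user-supplied function is used only when one is present and the column is numeric, and every other combination goes to the LLM. The sketch below mirrors that routing with plain Python stand-ins. `classify_dtype` and `choose_path` are hypothetical helpers, not part of the library, and the check on Python values only approximates the `ImageDtype` and `is_numeric_dtype` tests in the source (image columns are not modelled).

```python
from numbers import Number
from typing import Any, Sequence


def classify_dtype(values: Sequence[Any]) -> str:
    """Approximate the numeric/text split for a non-empty column."""
    if values and all(isinstance(value, Number) for value in values):
        return "numeric"
    return "text"


def choose_path(has_udf: bool, dtype: str) -> str:
    """Return which path the branch in `execute` would take."""
    if has_udf and dtype == "numeric":
        return "func"
    return "llm"


prices = [9.5, 12, 7]
labels = ["red", "green"]

numeric_with_func = choose_path(True, classify_dtype(prices))
text_with_func = choose_path(True, classify_dtype(labels))
numeric_without_func = choose_path(False, classify_dtype(prices))
```

As the branch is written, a function supplied for a text or image column is not used by `execute`; those columns always take the LLM path.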

Output Class

nirvana.ops.reduce.ReduceOpOutputs(cost: float = 0.0, output: Any = None) dataclass

Bases: BaseOpOutputs

Attributes

output: Any = field(default=None) class-attribute instance-attribute

Functions

__add__(other: ReduceOpOutputs)
Source code in nirvana/ops/reduce.py
def __add__(self, other: "ReduceOpOutputs"):
    return ReduceOpOutputs(
        output=self.output + other.output,
        cost=self.cost + other.cost
    )
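To illustrate what `__add__` does with two results, here is a minimal sketch using a stand-in dataclass that copies the two fields and the addition logic shown above. `OutputsSketch` is a hypothetical name used so the example runs without the library installed; it is not the real `ReduceOpOutputs`.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class OutputsSketch:
    """Stand-in mirroring the `cost` and `output` fields shown above."""

    cost: float = 0.0
    output: Any = field(default=None)

    def __add__(self, other: "OutputsSketch") -> "OutputsSketch":
        # Outputs are combined with `+`, so their types must support it.
        return OutputsSketch(
            output=self.output + other.output,
            cost=self.cost + other.cost,
        )


first = OutputsSketch(output="Mostly positive. ", cost=0.25)
second = OutputsSketch(output="Shipping complaints.", cost=0.5)
merged = first + second  # strings are concatenated, costs are summed

counts = OutputsSketch(output=3, cost=0.0) + OutputsSketch(output=4, cost=0.0)
```

Because `output` defaults to `None`, adding two default-constructed instances of this stand-in raises `TypeError`; the same expression appears in the source above, so callers presumably need to make sure both outputs are set and of compatible types before adding.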

Function Wrapper

nirvana.ops.reduce

reduce_wrapper(input_data: Iterable[Any], user_instruction: str = None, input_column: str = None, context: list[dict] | str | None = None, model: str | None = None, func: Callable = None, strategy: Literal['plain'] = 'plain', rate_limit: int = 16, assertions: list[Callable] | None = [], **kwargs)

A function wrapper for the reduce operation.

Parameters:

input_data (Iterable[Any]): Input data. Required. The value is passed unchanged to ReduceOperation.execute, whose signature takes a pd.DataFrame.
user_instruction (str): User instruction. Defaults to None.
input_column (str): Input column. Defaults to None.
context (list[dict] | str): Context. Defaults to None.
model (str): Model. Defaults to None.
func (Callable): User function. Defaults to None.
strategy (Literal['plain']): Strategy. Defaults to 'plain'.
rate_limit (int): Rate limit. Defaults to 16.
assertions (list[Callable]): Assertions. Defaults to [].
**kwargs: Additional keyword arguments for the OpenAI client. Defaults to {}.
Source code in nirvana/ops/reduce.py
def reduce_wrapper(
    input_data: Iterable[Any],
    user_instruction: str = None,
    input_column: str = None,
    context: list[dict] | str | None = None,
    model: str | None = None,
    func: Callable = None,
    strategy: Literal["plain"] = "plain",
    rate_limit: int = 16,
    assertions: list[Callable] | None = [],
    **kwargs
):
    """
    A function wrapper for reduce operation

    Args:
        input_data (Iterable[Any]): Input data
        user_instruction (str, optional): User instruction. Defaults to None.
        input_column (str, optional): Input column. Defaults to None.
        context (list[dict] | str, optional): Context. Defaults to None.
        model (str, optional): Model. Defaults to None.
        func (Callable, optional): User function. Defaults to None.
        strategy (Literal["plain"], optional): Strategy. Defaults to "plain".
        rate_limit (int, optional): Rate limit. Defaults to 16.
        assertions (list[Callable], optional): Assertions. Defaults to [].
        **kwargs: Additional keyword arguments for the OpenAI client.
    """

    reduce_op = ReduceOperation(
        user_instruction=user_instruction,
        input_columns=[input_column],
        context=context,
        model=model,
        tool=FunctionCallTool.from_function(func=func) if func else None,
        strategy=strategy,
        rate_limit=rate_limit,
        assertions=assertions,
    )
    outputs = asyncio.run(reduce_op.execute(
        input_data=input_data,
        **kwargs
    ))
    return outputs
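The wrapper above turns the asynchronous `execute` into a blocking call with `asyncio.run`. The sketch below shows that pattern in isolation, along with one consequence: `asyncio.run` raises `RuntimeError` when called from code that is already running inside an event loop. `EchoOperation`, `run_sync`, and `call_from_async_code` are hypothetical stand-ins so the example runs without the library or a model.

```python
import asyncio


class EchoOperation:
    """Hypothetical stand-in for an operation with an async `execute`."""

    async def execute(self, input_data: list) -> int:
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        return sum(input_data)


def run_sync(op: EchoOperation, input_data: list) -> int:
    """Drive the coroutine to completion from synchronous code."""
    return asyncio.run(op.execute(input_data))


async def call_from_async_code() -> str:
    """Try the same blocking pattern while an event loop is running."""
    coro = EchoOperation().execute([1, 2, 3])
    try:
        asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "never awaited" warning for the unused coroutine
        return "RuntimeError"
    return "no error"


total = run_sync(EchoOperation(), [1, 2, 3])
nested = asyncio.run(call_from_async_code())
```

In a context that already runs an event loop, such as async application code, awaiting `ReduceOperation.execute` directly is likely the more suitable entry point, since `execute` is declared `async` in the source above.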