Skip to content

Join Operation

The Join operation joins two tables by keeping tuple pairs that satisfy a natural language condition.

Core Implementation

nirvana.ops.join.JoinOperation(user_instruction: str = '', left_on: list[str] = [], right_on: list[str] = [], how: str = 'inner', context: list[dict] | str | None = None, model: str | None = None, tool: Callable | BaseTool | None = None, strategy: Literal['nest', 'block'] = 'nest', limit: int | None = None, rate_limit: int = 16, assertions: list[Callable] | None = [], batch_size: int = 5)

Bases: BaseOperation

Join operator: joins two tables by checking the values of the selected left and right columns against the user's instruction.

Source code in nirvana/ops/join.py
def __init__(
    self,
    user_instruction: str = "",
    left_on: list[str] = [],
    right_on: list[str] = [],
    how: str = "inner",
    context: list[dict] | str | None = None,
    model: str | None = None,
    tool: Callable | BaseTool | None = None,
    strategy: Literal["nest", "block"] = "nest",
    limit: int | None = None,
    rate_limit: int = 16,
    assertions: list[Callable] | None = [],
    batch_size: int = 5,
):
    """Configure a join operation.

    Args:
        user_instruction: Natural-language join condition.
        left_on: Columns of the left dataframe that take part in the join.
        right_on: Columns of the right dataframe that take part in the join.
        how: Join type; stored on the instance as ``self.how``.
        context: Forwarded unchanged to ``BaseOperation``.
        model: Forwarded unchanged to ``BaseOperation``.
        tool: A ``BaseTool`` or a plain callable. A plain callable is
            wrapped with ``FunctionCallTool.from_function``.
        strategy: Join strategy, ``"nest"`` or ``"block"``.
        limit: Forwarded unchanged to ``BaseOperation``.
        rate_limit: Forwarded unchanged to ``BaseOperation``.
        assertions: Forwarded to ``BaseOperation`` (as a copy).
        batch_size: Batch size handed to the block join.
    """
    # Compare against None rather than relying on truthiness, so a callable
    # object that happens to be falsy is still wrapped.
    if tool is not None and not isinstance(tool, BaseTool):
        tool = FunctionCallTool.from_function(func=tool)

    super().__init__(
        op_name="join",
        user_instruction=user_instruction,
        context=context,
        model=model,
        tool=tool,
        strategy=strategy,
        limit=limit,
        rate_limit=rate_limit,
        # The `[]` default is a single list shared by every call; pass a copy
        # so no instance can mutate the default seen by later instances.
        assertions=None if assertions is None else list(assertions),
    )
    self.prompter = JoinPrompter()
    # Same reasoning as above: never keep a reference to the shared default.
    self.left_on = list(left_on) if left_on is not None else []
    self.right_on = list(right_on) if right_on is not None else []
    self.how = how
    self.batch_size = batch_size

Attributes

strategy_options = ['nest', 'block'] class-attribute instance-attribute
prompter = JoinPrompter() instance-attribute
left_on = left_on instance-attribute
right_on = right_on instance-attribute
how = how instance-attribute
batch_size = batch_size instance-attribute
dependencies: list[str] property
generated_fields: list[str] property
op_kwargs: dict property

Functions

execute(left_data: pd.DataFrame, right_data: pd.DataFrame, **kwargs) async
Source code in nirvana/ops/join.py
async def execute(
    self, 
    left_data: pd.DataFrame,
    right_data: pd.DataFrame,
    **kwargs
):
    """Join ``left_data`` with ``right_data`` using the configured strategy.

    Args:
        left_data: Left dataframe; must contain every column in ``self.left_on``.
        right_data: Right dataframe; must contain every column in ``self.right_on``.
        **kwargs: Passed through to the strategy implementation.

    Returns:
        An empty ``JoinOpOutputs`` when either dataframe is empty; otherwise
        whatever ``_nested_join`` / ``_block_join`` returns.

    Raises:
        ValueError: If neither a user instruction nor a UDF is available, or
            if ``self.strategy`` is not a supported strategy.
    """
    # The constructor default for `user_instruction` is "", so an `is None`
    # check would never fire; treat any empty instruction as missing.
    if not self.user_instruction and not self.has_udf():
        raise ValueError("`user_instruction` or `tool` (e.g., a UDF) is required.")
    if left_data.empty or right_data.empty:
        # Nothing to compare. Use the dataclass's own field names
        # (`join_pairs`, not `output`) for the empty result.
        return JoinOpOutputs(
            join_pairs=[],
            left_join_keys=[],
            right_join_keys=[],
            cost=0.0,
        )

    # Tag every join column as "image" or "text" based on its pandas dtype.
    left_dtypes: list = [
        "image" if isinstance(left_data[col].dtype, ImageDtype) else "text"
        for col in self.left_on
    ]
    right_dtypes: list = [
        "image" if isinstance(right_data[col].dtype, ImageDtype) else "text"
        for col in self.right_on
    ]

    if self.strategy == "nest":
        return await self._nested_join(left_data, right_data, self.user_instruction, left_dtypes, right_dtypes, **kwargs)
    elif self.strategy == "block":
        if self.has_udf():
            # The block join still runs; the warning only reports the limitation.
            warnings.warn("The block semantic join does not support user-defined functions for now.")
        return await self._block_join(left_data, right_data, self.user_instruction, self.batch_size, left_dtypes, right_dtypes, **kwargs)
    else:
        raise ValueError(f"The optional strategies available for join are {self.strategy_options}. Strategy {self.strategy} is not supported.")

Output Class

nirvana.ops.join.JoinOpOutputs(cost: float = 0.0, join_pairs: list[tuple] = list(), left_join_keys: list[int] = list(), right_join_keys: list[int] = list()) dataclass

Bases: BaseOpOutputs

Attributes

join_pairs: list[tuple] = field(default_factory=list) class-attribute instance-attribute
left_join_keys: list[int] = field(default_factory=list) class-attribute instance-attribute
right_join_keys: list[int] = field(default_factory=list) class-attribute instance-attribute

Functions

Function Wrapper

nirvana.ops.join

join_wrapper(left_data: DataFrame, right_data: DataFrame, user_instruction: str, left_on: str, right_on: str, how: str = 'inner', context: list[dict] | str | None = None, model: str | None = None, func: Callable = None, strategy: Literal['nest', 'block'] = 'nest', limit: int | None = None, rate_limit: int = 16, assertions: list[Callable] | None = [], batch_size: int = 5, **kwargs)

A function wrapper for join operation

Parameters:

Name Type Description Default
left_data DataFrame

Left dataframe

required
right_data DataFrame

Right dataframe

required
user_instruction str

User instruction

required
left_on str

Name of the column in the left dataframe to join on.

required
right_on str

Name of the column in the right dataframe to join on.

required
how str

How. Defaults to "inner".

'inner'
context list[dict] | str

Context. Defaults to None.

None
model str

Model. Defaults to None.

None
func Callable

User function. Defaults to None.

None
strategy Literal['nest', 'block']

Strategy. Defaults to "nest".

'nest'
limit int

Maximum number of outputs to produce before stopping.

None
rate_limit int

Rate limit. Defaults to 16.

16
assertions list[Callable]

Assertions. Defaults to [].

[]
batch_size int

Batch size for block join. Defaults to 5.

5
**kwargs

Additional keyword arguments for the OpenAI client.

{}
Source code in nirvana/ops/join.py
def join_wrapper(
    left_data: pd.DataFrame,
    right_data: pd.DataFrame, 
    user_instruction: str,
    left_on: str,
    right_on: str,
    how: str = "inner",
    context: list[dict] | str | None = None,
    model: str | None = None,
    func: Callable = None,
    strategy: Literal["nest", "block"] = "nest",
    limit: int | None = None,
    rate_limit: int = 16,
    assertions: list[Callable] | None = [],
    batch_size: int = 5,
    **kwargs
):
    """
    A function wrapper for join operation

    Builds a ``JoinOperation`` from the arguments and runs its ``execute``
    coroutine to completion with ``asyncio.run``. Because ``asyncio.run``
    cannot be called while an event loop is already running in the same
    thread, call this wrapper from synchronous code only.

    Args:
        left_data (pd.DataFrame): Left dataframe
        right_data (pd.DataFrame): Right dataframe
        user_instruction (str): User instruction
        left_on (str): Name of the column in the left dataframe to join on
        right_on (str): Name of the column in the right dataframe to join on
        how (str, optional): How. Defaults to "inner".
        context (list[dict] | str, optional): Context. Defaults to None.
        model (str, optional): Model. Defaults to None.
        func (Callable, optional): User function. Defaults to None.
        strategy (Literal["nest", "block"], optional): Strategy. Defaults to "nest".
        limit (int, optional): Maximum number of outputs to produce before stopping. Defaults to None.
        rate_limit (int, optional): Rate limit. Defaults to 16.
        assertions (list[Callable], optional): Assertions. Defaults to [].
        batch_size (int, optional): Batch size for block join. Defaults to 5.
        **kwargs: Additional keyword arguments for the OpenAI client.

    Returns:
        The outputs object returned by ``JoinOperation.execute``.
    """

    join_op = JoinOperation(
        user_instruction=user_instruction,
        # The operation works on lists of columns; the wrapper takes one each.
        left_on=[left_on],
        right_on=[right_on],
        how=how,
        context=context,
        model=model,
        # Pass the callable straight through: JoinOperation wraps plain
        # callables itself, which keeps a single tool-wrapping code path.
        tool=func,
        strategy=strategy,
        limit=limit,
        rate_limit=rate_limit,
        # Copy so the shared `[]` default is never handed to the operation.
        assertions=None if assertions is None else list(assertions),
        batch_size=batch_size,
    )
    outputs = asyncio.run(join_op.execute(
        left_data=left_data,
        right_data=right_data,
        **kwargs
    ))
    return outputs