Skip to content

DataFrameBaseView#

Tip

To learn how to use Pandas DataFrame with db-ally see How To: Use Pandas DataFrames with db-ally.

dbally.DataFrameBaseView #

DataFrameBaseView(df: DataFrame)

Bases: MethodsBaseView

Base class for views that use Pandas DataFrames to store and filter data.

The views take a Pandas DataFrame as input and apply filters to it. The filters are defined as methods that return a Pandas Series representing a boolean mask to be applied to the DataFrame.

Creates a new instance of the DataFrame view.

PARAMETER DESCRIPTION
df

Pandas DataFrame with the data to be filtered.

TYPE: DataFrame

Source code in src/dbally/views/pandas_base.py
def __init__(self, df: pd.DataFrame) -> None:
    """
    Initializes the view around the DataFrame it will query.

    Args:
        df: Pandas DataFrame holding the data to be filtered.
    """
    super().__init__()
    # Start with no filter mask; `apply_filters` populates it later.
    self._filter_mask: Optional[pd.Series] = None
    # Default aggregation group; `apply_aggregation` replaces it later.
    self._aggregation_group: AggregationGroup = AggregationGroup()
    self.df = df

HIDDEN_ARGUMENTS class-attribute instance-attribute #

HIDDEN_ARGUMENTS = ['cls', 'self', 'return']

df instance-attribute #

df = df

ask async #

ask(query: str, llm: LLM, event_tracker: Optional[EventTracker] = None, n_retries: int = 3, dry_run: bool = False, llm_options: Optional[LLMOptions] = None) -> ViewExecutionResult

Executes the query and returns the result. It generates the IQL query from the natural language query and applies the filters to the view. It retries the process in case of errors.

PARAMETER DESCRIPTION
query

The natural language query to execute.

TYPE: str

llm

The LLM used to execute the query.

TYPE: LLM

event_tracker

The event tracker used to audit the query execution.

TYPE: Optional[EventTracker] DEFAULT: None

n_retries

The number of retries to execute the query in case of errors.

TYPE: int DEFAULT: 3

dry_run

If True, the query will not be used to fetch data from the datasource.

TYPE: bool DEFAULT: False

llm_options

Options to use for the LLM.

TYPE: Optional[LLMOptions] DEFAULT: None

RETURNS DESCRIPTION
ViewExecutionResult

The result of the query.

RAISES DESCRIPTION
ViewExecutionError

When an error occurs while executing the view.

Source code in src/dbally/views/structured.py
async def ask(
    self,
    query: str,
    llm: LLM,
    event_tracker: Optional[EventTracker] = None,
    n_retries: int = 3,
    dry_run: bool = False,
    llm_options: Optional[LLMOptions] = None,
) -> ViewExecutionResult:
    """
    Answers a natural language query using this view.

    The IQL generator turns the query into IQL, the resulting filters and aggregation
    (if any) are applied to the view, and the view is then executed.

    Args:
        query: The natural language query to execute.
        llm: The LLM used to execute the query.
        event_tracker: The event tracker used to audit the query execution.
        n_retries: The number of retries, forwarded to the IQL generator.
        dry_run: If True, the query will not be used to fetch data from the datasource.
        llm_options: Options to use for the LLM.

    Returns:
        The execution result, with the generated IQL stored under the "iql" key of its context.

    Raises:
        ViewExecutionError: When the generated IQL is marked as failed.
    """
    available_filters = self.list_filters()
    few_shots = self.list_few_shots()
    available_aggregations = self.list_aggregations()

    generate_iql = self.get_iql_generator()
    generated = await generate_iql(
        question=query,
        filters=available_filters,
        aggregations=available_aggregations,
        examples=few_shots,
        llm=llm,
        event_tracker=event_tracker,
        llm_options=llm_options,
        n_retries=n_retries,
    )

    if generated.failed:
        raise ViewExecutionError(view_name=self.__class__.__name__, iql=generated)

    filters_query = generated.filters
    aggregation_query = generated.aggregation

    # Each part of the IQL is optional; only apply what was actually generated.
    if filters_query:
        await self.apply_filters(filters_query)
    if aggregation_query:
        await self.apply_aggregation(aggregation_query)

    outcome = self.execute(dry_run=dry_run)

    # Expose the generated IQL to the caller alongside the results.
    iql_context = {"filters": None, "aggregation": None}
    if filters_query:
        iql_context["filters"] = str(filters_query)
    if aggregation_query:
        iql_context["aggregation"] = str(aggregation_query)
    outcome.context["iql"] = iql_context
    return outcome

list_similarity_indexes #

list_similarity_indexes() -> Dict[AbstractSimilarityIndex, List[IndexLocation]]

Lists all the similarity indexes used by the view.

RETURNS DESCRIPTION
Dict[AbstractSimilarityIndex, List[IndexLocation]]

Mapping of similarity indexes to their locations in the (view_name, filter_name, argument_name) format.

Source code in src/dbally/views/structured.py
def list_similarity_indexes(self) -> Dict[AbstractSimilarityIndex, List[IndexLocation]]:
    """
    Gathers every similarity index referenced by the parameters of this view's filters.

    Returns:
        Mapping from each similarity index to the list of places it is used, each given as a
        (view_name, filter_name, argument_name) tuple.
    """
    view_name = self.__class__.__name__
    locations = defaultdict(list)
    for exposed_filter in self.list_filters():
        for argument in exposed_filter.parameters:
            index = argument.similarity_index
            if not index:
                # Parameter is not backed by a similarity index.
                continue
            locations[index].append((view_name, exposed_filter.name, argument.name))
    return locations

list_few_shots #

list_few_shots() -> List[FewShotExample]

Lists all examples to be injected into the few-shot prompt.

RETURNS DESCRIPTION
List[FewShotExample]

List of few-shot examples

Source code in src/dbally/views/base.py
def list_few_shots(self) -> List[FewShotExample]:
    """
    Provides the examples to be injected into the few-shot prompt.

    This implementation supplies no examples.

    Returns:
        An empty list of few-shot examples.
    """
    no_examples: List[FewShotExample] = []
    return no_examples

get_iql_generator #

get_iql_generator() -> IQLGenerator

Returns the IQL generator for the view.

RETURNS DESCRIPTION
IQLGenerator

IQL generator for the view.

Source code in src/dbally/views/structured.py
def get_iql_generator(self) -> IQLGenerator:
    """
    Builds the IQL generator used by the view.

    Returns:
        A newly constructed `IQLGenerator`.
    """
    generator = IQLGenerator()
    return generator

list_filters #

list_filters() -> List[ExposedFunction]

List filters in the given view

RETURNS DESCRIPTION
List[ExposedFunction]

Filters defined inside the View and decorated with decorators.view_filter.

Source code in src/dbally/views/methods_base.py
def list_filters(self) -> List[ExposedFunction]:
    """
    Lists the filters available in this view.

    Returns:
        Methods of the view that are decorated with `decorators.view_filter`.
    """
    exposed_filters = self.list_methods_by_decorator(decorators.view_filter)
    return exposed_filters

list_aggregations #

list_aggregations() -> List[ExposedFunction]

List aggregations in the given view

RETURNS DESCRIPTION
List[ExposedFunction]

Aggregations defined inside the View and decorated with decorators.view_aggregation.

Source code in src/dbally/views/methods_base.py
def list_aggregations(self) -> List[ExposedFunction]:
    """
    Lists the aggregations available in this view.

    Returns:
        Methods of the view that are decorated with `decorators.view_aggregation`.
    """
    exposed_aggregations = self.list_methods_by_decorator(decorators.view_aggregation)
    return exposed_aggregations

list_methods_by_decorator classmethod #

list_methods_by_decorator(decorator: Callable) -> List[ExposedFunction]

Lists all methods decorated with the given decorator.

PARAMETER DESCRIPTION
decorator

The decorator to filter the methods

TYPE: Callable

RETURNS DESCRIPTION
List[ExposedFunction]

List of exposed methods

Source code in src/dbally/views/methods_base.py
@classmethod
def list_methods_by_decorator(cls, decorator: Callable) -> List[ExposedFunction]:
    """
    Collects the class attributes that were marked with the given decorator.

    An attribute matches when it carries a `_methodDecorator` attribute equal to `decorator`.

    Args:
        decorator: The decorator whose marked methods should be returned.

    Returns:
        One `ExposedFunction` per matching method, in the order produced by `dir(cls)`.
    """
    exposed = []
    for attr_name in dir(cls):
        candidate = getattr(cls, attr_name)
        is_marked = hasattr(candidate, "_methodDecorator") and (
            candidate._methodDecorator == decorator  # pylint: disable=protected-access
        )
        if not is_marked:
            continue

        typed_arguments = candidate.__annotations__.items()
        docstring = candidate.__doc__
        description = textwrap.dedent(docstring).strip() if docstring else ""
        # Arguments named in HIDDEN_ARGUMENTS are left out of the exposed signature.
        visible_parameters = [
            MethodParamWithTyping(arg_name, arg_type)
            for arg_name, arg_type in typed_arguments
            if arg_name not in cls.HIDDEN_ARGUMENTS
        ]
        exposed.append(
            ExposedFunction(
                name=attr_name,
                description=description,
                parameters=visible_parameters,
            )
        )
    return exposed

call_filter_method async #

call_filter_method(func: FunctionCall) -> Any

Converts an IQL FunctionCall filter to a method call. If the method is a coroutine, it will be awaited.

PARAMETER DESCRIPTION
func

IQL FunctionCall node

TYPE: FunctionCall

RETURNS DESCRIPTION
Any

The result of the method call

Source code in src/dbally/views/methods_base.py
async def call_filter_method(self, func: syntax.FunctionCall) -> Any:
    """
    Resolves an IQL FunctionCall filter to a method of the view and calls it.

    Args:
        func: IQL FunctionCall node.

    Returns:
        The result of the method call.
    """
    # Look the call up among methods decorated with `decorators.view_filter`.
    target, call_args = self._method_with_args_from_call(func, decorators.view_filter)
    outcome = await self._call_method(target, call_args)
    return outcome

call_aggregation_method async #

call_aggregation_method(func: FunctionCall) -> Any

Converts an IQL FunctionCall aggregation to a method call. If the method is a coroutine, it will be awaited.

PARAMETER DESCRIPTION
func

IQL FunctionCall node

TYPE: FunctionCall

RETURNS DESCRIPTION
Any

The result of the method call

Source code in src/dbally/views/methods_base.py
async def call_aggregation_method(self, func: syntax.FunctionCall) -> Any:
    """
    Resolves an IQL FunctionCall aggregation to a method of the view and calls it.

    Args:
        func: IQL FunctionCall node.

    Returns:
        The result of the method call.
    """
    # Look the call up among methods decorated with `decorators.view_aggregation`.
    target, call_args = self._method_with_args_from_call(func, decorators.view_aggregation)
    outcome = await self._call_method(target, call_args)
    return outcome

apply_filters async #

apply_filters(filters: IQLFiltersQuery) -> None

Applies the chosen filters to the view.

PARAMETER DESCRIPTION
filters

IQLQuery object representing the filters to apply.

TYPE: IQLFiltersQuery

Source code in src/dbally/views/pandas_base.py
async def apply_filters(self, filters: IQLFiltersQuery) -> None:
    """
    Builds the filter mask from the given IQL filters and stores it on the view.

    Args:
        filters: IQLQuery object representing the filters to apply.
    """
    root_node = filters.root
    mask = await self._build_filter_node(root_node)
    self._filter_mask = mask

apply_aggregation async #

apply_aggregation(aggregation: IQLAggregationQuery) -> None

Applies the aggregation of choice to the view.

PARAMETER DESCRIPTION
aggregation

IQLQuery object representing the aggregation to apply.

TYPE: IQLAggregationQuery

Source code in src/dbally/views/pandas_base.py
async def apply_aggregation(self, aggregation: IQLAggregationQuery) -> None:
    """
    Runs the chosen aggregation method and stores its result as the view's aggregation group.

    Args:
        aggregation: IQLQuery object representing the aggregation to apply.
    """
    root_node = aggregation.root
    group = await self.call_aggregation_method(root_node)
    self._aggregation_group = group

execute #

execute(dry_run: bool = False) -> ViewExecutionResult

Executes the view and returns the results. The results are filtered based on the applied filters.

PARAMETER DESCRIPTION
dry_run

If True, the method will only add context field to the ExecutionResult with the mask that would be applied to the dataframe.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
ViewExecutionResult

ExecutionResult object with the results and the context information with the binary mask.

Source code in src/dbally/views/pandas_base.py
def execute(self, dry_run: bool = False) -> ViewExecutionResult:
    """
    Executes the view and returns the results. The results are filtered based on the applied filters
    and then grouped/aggregated based on the applied aggregation.

    Behaviour by aggregation group content:
        - aggregations (with or without groupbys): named aggregations are computed, each output column
          being called "<column>_<function>", and the index is reset into regular columns.
        - groupbys only: one row per group is returned with the number of rows in that group in a
          "count" column.
        - neither: the (filtered) rows are returned unchanged.

    Args:
        dry_run: If True, the method will only add `context` field to the `ExecutionResult` with the
            mask that would be applied to the dataframe.

    Returns:
        ExecutionResult object with the results and the context information with the binary mask.
    """
    groupbys = self._aggregation_group.groupbys
    aggregations = self._aggregation_group.aggregations
    results = pd.DataFrame()

    if not dry_run:
        results = self.df
        if self._filter_mask is not None:
            results = results.loc[self._filter_mask]

        if aggregations is not None:
            named_aggregations = {
                f"{agg.column}_{agg.function}": (agg.column, agg.function) for agg in aggregations
            }
            source = results.groupby(groupbys) if groupbys is not None else results
            results = source.agg(**named_aggregations).reset_index()
        elif groupbys is not None:
            # A GroupBy object has no `to_dict`, so it must be reduced back to a DataFrame
            # before building the result; without aggregations we report the group sizes.
            results = results.groupby(groupbys).size().reset_index(name="count")

    return ViewExecutionResult(
        results=results.to_dict(orient="records"),
        context={
            "filter_mask": self._filter_mask,
            "groupbys": groupbys,
            "aggregations": aggregations,
        },
    )