Module spin_sdk.postgres

Module for interacting with a Postgres database

Classes

class Connection (connection: Connection)
Expand source code
class Connection:
    """Wrapper around a `pg.Connection` that exposes its async Postgres operations."""

    def __init__(self, connection: pg.Connection) -> None:
        # Every other method delegates to this underlying connection object.
        self.connection = connection

    def __enter__(self) -> Self:
        """Enter the `with` block, handing back this wrapper itself."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> bool | None:
        """Forward the context-manager exit to the wrapped connection and return its result."""
        wrapped = self.connection
        return wrapped.__exit__(exc_type, exc_value, traceback)

    @classmethod
    async def open(cls, connection_string: str) -> Self:
        """
        Open a connection with a Postgres database.

        The connection_string is the Postgres URL or connection string.

        Raises: `componentize_py_types.Err(Error_ConnectionFailed(str))` when a connection fails.

        Raises: `componentize_py_types.Err(Error_Other(str))` when some other error occurs.
        """
        raw_connection = await pg.Connection.open_async(connection_string)
        return cls(raw_connection)

    async def query(
        self,
        statement: str,
        params: List[ParameterValue],
    ) -> Tuple[List[Column], StreamReader[List[DbValue]], FutureReader[Result[None, Error]]]:
        """
        Query the Postgres database.

        Returns a Tuple containing a List of Columns, an asynchronous iterator
        (`componentize_py_async_support.streams.StreamReader`) whose items are Lists of DbValues,
        and a future (`componentize_py_async_support.futures.FutureReader`) containing the result of the operation.

        Raises: `componentize_py_types.Err(spin_sdk.wit.imports.spin_postgres_postgres_4_2_0.Error)`
        """
        # The wrapped connection's result is passed through unchanged.
        outcome = await self.connection.query_async(statement, params)
        return outcome

    async def execute(self, statement: str, params: List[ParameterValue]) -> int:
        """
        Execute a command against the database.

        Returns the number of affected rows as an int.

        Raises: `componentize_py_types.Err(spin_sdk.wit.imports.spin_postgres_postgres_4_2_0.Error)`
        """
        # The wrapped connection's result is passed through unchanged.
        affected_rows = await self.connection.execute_async(statement, params)
        return affected_rows

Static methods

async def open(connection_string: str) ‑> Self

Open a connection with a Postgres database.

The connection_string is the Postgres URL or connection string.

Raises componentize_py_types.Err(Error_ConnectionFailed(str)) when a connection fails.

Raises componentize_py_types.Err(Error_Other(str)) when some other error occurs.

Methods

async def execute(self,
statement: str,
params: List[ParameterValue_Boolean | ParameterValue_Int8 | ParameterValue_Int16 | ParameterValue_Int32 | ParameterValue_Int64 | ParameterValue_Floating32 | ParameterValue_Floating64 | ParameterValue_Str | ParameterValue_Binary | ParameterValue_Date | ParameterValue_Time | ParameterValue_Datetime | ParameterValue_Timestamp | ParameterValue_Uuid | ParameterValue_Jsonb | ParameterValue_Decimal | ParameterValue_RangeInt32 | ParameterValue_RangeInt64 | ParameterValue_RangeDecimal | ParameterValue_ArrayInt32 | ParameterValue_ArrayInt64 | ParameterValue_ArrayDecimal | ParameterValue_ArrayStr | ParameterValue_Interval | ParameterValue_DbNull]) ‑> int
Expand source code
async def execute(self, statement: str, params: List[ParameterValue]) -> int:
    """
    Execute a command against the database.

    Returns the number of affected rows as an int.

    Raises: `componentize_py_types.Err(spin_sdk.wit.imports.spin_postgres_postgres_4_2_0.Error)`
    """
    # The wrapped connection's result is passed through unchanged.
    affected_rows = await self.connection.execute_async(statement, params)
    return affected_rows

Execute a command against the database.

Returns the number of affected rows as an int.

Raises: componentize_py_types.Err(Error)

async def query(self,
statement: str,
params: List[ParameterValue_Boolean | ParameterValue_Int8 | ParameterValue_Int16 | ParameterValue_Int32 | ParameterValue_Int64 | ParameterValue_Floating32 | ParameterValue_Floating64 | ParameterValue_Str | ParameterValue_Binary | ParameterValue_Date | ParameterValue_Time | ParameterValue_Datetime | ParameterValue_Timestamp | ParameterValue_Uuid | ParameterValue_Jsonb | ParameterValue_Decimal | ParameterValue_RangeInt32 | ParameterValue_RangeInt64 | ParameterValue_RangeDecimal | ParameterValue_ArrayInt32 | ParameterValue_ArrayInt64 | ParameterValue_ArrayDecimal | ParameterValue_ArrayStr | ParameterValue_Interval | ParameterValue_DbNull]) ‑> Tuple[List[Column], componentize_py_async_support.streams.StreamReader[List[DbValue_Boolean | DbValue_Int8 | DbValue_Int16 | DbValue_Int32 | DbValue_Int64 | DbValue_Floating32 | DbValue_Floating64 | DbValue_Str | DbValue_Binary | DbValue_Date | DbValue_Time | DbValue_Datetime | DbValue_Timestamp | DbValue_Uuid | DbValue_Jsonb | DbValue_Decimal | DbValue_RangeInt32 | DbValue_RangeInt64 | DbValue_RangeDecimal | DbValue_ArrayInt32 | DbValue_ArrayInt64 | DbValue_ArrayDecimal | DbValue_ArrayStr | DbValue_Interval | DbValue_DbNull | DbValue_Unsupported]], componentize_py_async_support.futures.FutureReader[componentize_py_types.Ok[None] | componentize_py_types.Err[Error_ConnectionFailed | Error_BadParameter | Error_QueryFailed | Error_ValueConversionFailed | Error_Other]]]
Expand source code
async def query(
    self,
    statement: str,
    params: List[ParameterValue],
) -> Tuple[List[Column], StreamReader[List[DbValue]], FutureReader[Result[None, Error]]]:
    """
    Query the Postgres database.

    Returns a Tuple containing a List of Columns, an asynchronous iterator
    (`componentize_py_async_support.streams.StreamReader`) whose items are Lists of DbValues,
    and a future (`componentize_py_async_support.futures.FutureReader`) containing the result of the operation.

    Raises: `componentize_py_types.Err(spin_sdk.wit.imports.spin_postgres_postgres_4_2_0.Error)`
    """
    # The wrapped connection's result is passed through unchanged.
    outcome = await self.connection.query_async(statement, params)
    return outcome

Query the Postgres database.

Returns a Tuple containing a List of Columns, an asynchronous iterator (componentize_py_async_support.streams.StreamReader) whose items are Lists of DbValues, and a future (componentize_py_async_support.futures.FutureReader) containing the result of the operation.

Raises: componentize_py_types.Err(Error)