Module spin_sdk.postgres
Module for interacting with a Postgres database
Classes
class Connection (connection: Connection)-
Expand source code
class Connection: def __init__(self, connection: pg.Connection) -> None: self.connection = connection def __enter__(self) -> Self: return self def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) -> bool | None: return self.connection.__exit__(exc_type, exc_value, traceback) @classmethod async def open(cls, connection_string: str) -> Self: """ Open a connection with a Postgres database. The connection_string is the Postgres URL or connection string. A `componentize_py_types.Err(Error_ConnectionFailed(str))` when a connection fails. A `componentize_py_types.Err(Error_Other(str))` when some other error occurs. """ return cls(await pg.Connection.open_async(connection_string)) async def query(self, statement: str, params: List[ParameterValue]) -> Tuple[List[Column], StreamReader[List[DbValue]], FutureReader[Result[None, Error]]]: """ Query the Postgres database. Returns a Tuple containing a List of Columns, a List of DbValues encapsulated in an asynchronous iterator (`componentize_py_async_support.streams.StreamReader`), and a future (`componentize_py_async_support.futures.FutureReader`) containing the result of the operation. Raises: `componentize_py_types.Err(spin_sdk.wit.imports.spin_postgres_postgres_4_2_0.Error)` """ return await self.connection.query_async(statement, params) async def execute(self, statement: str, params: List[ParameterValue]) -> int: """ Execute command to the database. Returns the number of affected rows as an int. Raises: `componentize_py_types.Err(spin_sdk.wit.imports.spin_postgres_postgres_4_2_0.Error)` """ return await self.connection.execute_async(statement, params)Static methods
async def open(connection_string: str) ‑> Self-
Open a connection with a Postgres database.
The connection_string is the Postgres URL or connection string.
A
componentize_py_types.Err(Error_ConnectionFailed(str))when a connection fails.A
componentize_py_types.Err(Error_Other(str))when some other error occurs.
Methods
async def execute(self,
statement: str,
params: List[ParameterValue_Boolean | ParameterValue_Int8 | ParameterValue_Int16 | ParameterValue_Int32 | ParameterValue_Int64 | ParameterValue_Floating32 | ParameterValue_Floating64 | ParameterValue_Str | ParameterValue_Binary | ParameterValue_Date | ParameterValue_Time | ParameterValue_Datetime | ParameterValue_Timestamp | ParameterValue_Uuid | ParameterValue_Jsonb | ParameterValue_Decimal | ParameterValue_RangeInt32 | ParameterValue_RangeInt64 | ParameterValue_RangeDecimal | ParameterValue_ArrayInt32 | ParameterValue_ArrayInt64 | ParameterValue_ArrayDecimal | ParameterValue_ArrayStr | ParameterValue_Interval | ParameterValue_DbNull]) ‑> int-
Expand source code
async def execute(self, statement: str, params: List[ParameterValue]) -> int: """ Execute command to the database. Returns the number of affected rows as an int. Raises: `componentize_py_types.Err(spin_sdk.wit.imports.spin_postgres_postgres_4_2_0.Error)` """ return await self.connection.execute_async(statement, params)Execute command to the database.
Returns the number of affected rows as an int.
Raises:
componentize_py_types.Err(Error) async def query(self,
statement: str,
params: List[ParameterValue_Boolean | ParameterValue_Int8 | ParameterValue_Int16 | ParameterValue_Int32 | ParameterValue_Int64 | ParameterValue_Floating32 | ParameterValue_Floating64 | ParameterValue_Str | ParameterValue_Binary | ParameterValue_Date | ParameterValue_Time | ParameterValue_Datetime | ParameterValue_Timestamp | ParameterValue_Uuid | ParameterValue_Jsonb | ParameterValue_Decimal | ParameterValue_RangeInt32 | ParameterValue_RangeInt64 | ParameterValue_RangeDecimal | ParameterValue_ArrayInt32 | ParameterValue_ArrayInt64 | ParameterValue_ArrayDecimal | ParameterValue_ArrayStr | ParameterValue_Interval | ParameterValue_DbNull]) ‑> Tuple[List[Column], componentize_py_async_support.streams.StreamReader[List[DbValue_Boolean | DbValue_Int8 | DbValue_Int16 | DbValue_Int32 | DbValue_Int64 | DbValue_Floating32 | DbValue_Floating64 | DbValue_Str | DbValue_Binary | DbValue_Date | DbValue_Time | DbValue_Datetime | DbValue_Timestamp | DbValue_Uuid | DbValue_Jsonb | DbValue_Decimal | DbValue_RangeInt32 | DbValue_RangeInt64 | DbValue_RangeDecimal | DbValue_ArrayInt32 | DbValue_ArrayInt64 | DbValue_ArrayDecimal | DbValue_ArrayStr | DbValue_Interval | DbValue_DbNull | DbValue_Unsupported]], componentize_py_async_support.futures.FutureReader[componentize_py_types.Ok[None] | componentize_py_types.Err[Error_ConnectionFailed | Error_BadParameter | Error_QueryFailed | Error_ValueConversionFailed | Error_Other]]]-
Expand source code
async def query(self, statement: str, params: List[ParameterValue]) -> Tuple[List[Column], StreamReader[List[DbValue]], FutureReader[Result[None, Error]]]: """ Query the Postgres database. Returns a Tuple containing a List of Columns, a List of DbValues encapsulated in an asynchronous iterator (`componentize_py_async_support.streams.StreamReader`), and a future (`componentize_py_async_support.futures.FutureReader`) containing the result of the operation. Raises: `componentize_py_types.Err(spin_sdk.wit.imports.spin_postgres_postgres_4_2_0.Error)` """ return await self.connection.query_async(statement, params)Query the Postgres database.
Returns a Tuple containing a List of Columns, a List of DbValues encapsulated in an asynchronous iterator (
componentize_py_async_support.streams.StreamReader), and a future (componentize_py_async_support.futures.FutureReader) containing the result of the operation.Raises:
componentize_py_types.Err(Error)