dataframe_to_pg package¶
- dataframe_to_pg.write_dataframe_to_postgres(df: DataFrame | DataFrame, engine: Engine, table_name: str, dtypes: dict[str, Any] | None = None, sql_dtypes: dict[str, Any] | None = None, write_method: str = 'upsert', chunksize: int | str | None = None, index: str | list[str] | None = None, clean_column_names: bool = False, case_type: str = 'snake', truncate_limit: int = 55, yield_chunks: bool = False, progress_bar: bool = True) None | WriteResult | Generator[list[dict[str, Any]], None, WriteResult | int][source]¶
Write a DataFrame to a PostgreSQL table with conflict resolution, automatic addition of missing columns, optional processing in chunks, and support for a custom primary key.
If sql_dtypes is provided then these SQLAlchemy types (one of Boolean, DateTime, Float, Integer, Text, or postgresql JSON) will be used for the columns and no type inference will be performed. You cannot pass both dtypes and sql_dtypes.
- Parameters:
- df:
The DataFrame to be written to the PostgreSQL table. Can be either a pandas or Polars DataFrame.
- engine:
SQLAlchemy engine object to connect to the PostgreSQL database.
- table_name:
The name of the PostgreSQL table to write the DataFrame to.
- dtypes:
Optional dictionary mapping column names (including primary key columns) to SQLAlchemy types. If not provided for a given column, the type is inferred.
- sql_dtypes:
Optional dictionary mapping column names to SQLAlchemy types. If provided, these types (which should be one of Boolean, DateTime, Float, Integer, Text, or postgresql JSON) will be used for the columns without inferring. Cannot be passed alongside dtypes.
- write_method:
- One of the following options (default is ‘upsert’):
‘insert’: Insert rows; if the primary key(s) already exist, skip that row.
- ‘replace’: Insert rows; if the primary key(s) already exist, update every
non-key column with the new value.
- ‘upsert’: Insert rows; if the primary key(s) already exist, update only those
non-key columns whose new value is not null.
- chunksize:
Either None, a positive integer, or the string “auto”. If provided, the data will be processed in chunks.
- index:
Optional parameter specifying the primary key column(s). For pandas DataFrames, if not provided, the DataFrame’s index (or MultiIndex) is used. For Polars DataFrames, this parameter is required.
- clean_column_names:
If True, the DataFrame’s column names will be cleaned using pyjanitors’ clean_names method and the resulting column names will be returned in the WriteResult object.
- case_type:
The case type to pass to pyjanitors’ clean_names method (default is “snake”).
- truncate_limit:
The truncate limit to pass to pyjanitors clean_names method (default is 55).
- yield_chunks:
If True, yields each chunk as it is written to the database and returns, via the generator’s return value, a WriteResult (if clean_column_names=True) or the number of non-primary key columns updated. Otherwise, the function executes all chunks and, if clean_column_names is True, returns a WriteResult object; if False, returns None.
- progress_bar:
If True, displays a progress bar for the operation. This is only applicable if
- Returns:
If yield_chunks is True, yields each chunk (as a list of dicts) and finally returns a WriteResult (if clean_column_names=True) or an int representing the updated columns count.
If yield_chunks is False and clean_column_names is True, returns a WriteResult object with the updated_columns_count and the cleaned column names (accessible via .columns).
Otherwise, returns None.
- Raises:
- ValueError: If write_method is invalid, if chunksize is invalid, if a Polars
DataFrame is passed without specifying the index parameter, if column cleaning is requested but not supported, or if both dtypes and sql_dtypes are provided.
Submodules¶
dataframe_to_pg.writer module¶
- class dataframe_to_pg.writer.WriteResult(updated_columns_count: int, columns: list[str])[source]¶
Bases:
objectDataclass to encapsulate the result of writing a DataFrame to PostgreSQL.
- Attributes:
- updated_columns_count: Number of non-primary key columns updated
(only applicable for ‘replace’ and ‘upsert’).
columns: The list of column names after cleaning (only provided if clean_column_names=True).
- columns: list[str]¶
- updated_columns_count: int¶
- dataframe_to_pg.writer.clean_value(x: Any) Any[source]¶
Convert missing values to None.
- For scalar values:
If x is a string that is empty (after stripping) or equals “NaT” or “nan” (case-insensitive), return None.
Otherwise, use pd.isna to check for missing values.
- For non-scalars:
Convert x to a NumPy array and return None if the array is empty or if all elements are missing.
Otherwise, return the original value.
- dataframe_to_pg.writer.is_text_type(sql_type: Any) bool[source]¶
Helper to determine if an SQL type is (or should be) considered Text.
- dataframe_to_pg.writer.write_dataframe_to_postgres(df: DataFrame | DataFrame, engine: Engine, table_name: str, dtypes: dict[str, Any] | None = None, sql_dtypes: dict[str, Any] | None = None, write_method: str = 'upsert', chunksize: int | str | None = None, index: str | list[str] | None = None, clean_column_names: bool = False, case_type: str = 'snake', truncate_limit: int = 55, yield_chunks: bool = False, progress_bar: bool = True) None | WriteResult | Generator[list[dict[str, Any]], None, WriteResult | int][source]¶
Write a DataFrame to a PostgreSQL table with conflict resolution, automatic addition of missing columns, optional processing in chunks, and support for a custom primary key.
If sql_dtypes is provided then these SQLAlchemy types (one of Boolean, DateTime, Float, Integer, Text, or postgresql JSON) will be used for the columns and no type inference will be performed. You cannot pass both dtypes and sql_dtypes.
- Parameters:
- df:
The DataFrame to be written to the PostgreSQL table. Can be either a pandas or Polars DataFrame.
- engine:
SQLAlchemy engine object to connect to the PostgreSQL database.
- table_name:
The name of the PostgreSQL table to write the DataFrame to.
- dtypes:
Optional dictionary mapping column names (including primary key columns) to SQLAlchemy types. If not provided for a given column, the type is inferred.
- sql_dtypes:
Optional dictionary mapping column names to SQLAlchemy types. If provided, these types (which should be one of Boolean, DateTime, Float, Integer, Text, or postgresql JSON) will be used for the columns without inferring. Cannot be passed alongside dtypes.
- write_method:
- One of the following options (default is ‘upsert’):
‘insert’: Insert rows; if the primary key(s) already exist, skip that row.
- ‘replace’: Insert rows; if the primary key(s) already exist, update every
non-key column with the new value.
- ‘upsert’: Insert rows; if the primary key(s) already exist, update only those
non-key columns whose new value is not null.
- chunksize:
Either None, a positive integer, or the string “auto”. If provided, the data will be processed in chunks.
- index:
Optional parameter specifying the primary key column(s). For pandas DataFrames, if not provided, the DataFrame’s index (or MultiIndex) is used. For Polars DataFrames, this parameter is required.
- clean_column_names:
If True, the DataFrame’s column names will be cleaned using pyjanitors’ clean_names method and the resulting column names will be returned in the WriteResult object.
- case_type:
The case type to pass to pyjanitors’ clean_names method (default is “snake”).
- truncate_limit:
The truncate limit to pass to pyjanitors clean_names method (default is 55).
- yield_chunks:
If True, yields each chunk as it is written to the database and returns, via the generator’s return value, a WriteResult (if clean_column_names=True) or the number of non-primary key columns updated. Otherwise, the function executes all chunks and, if clean_column_names is True, returns a WriteResult object; if False, returns None.
- progress_bar:
If True, displays a progress bar for the operation. This is only applicable if
- Returns:
If yield_chunks is True, yields each chunk (as a list of dicts) and finally returns a WriteResult (if clean_column_names=True) or an int representing the updated columns count.
If yield_chunks is False and clean_column_names is True, returns a WriteResult object with the updated_columns_count and the cleaned column names (accessible via .columns).
Otherwise, returns None.
- Raises:
- ValueError: If write_method is invalid, if chunksize is invalid, if a Polars
DataFrame is passed without specifying the index parameter, if column cleaning is requested but not supported, or if both dtypes and sql_dtypes are provided.