API Reference¶
Use crump programmatically as a Python library.
Overview¶
The crump package can be imported and used in your Python applications. This is useful for:
- Building custom ETL pipelines
- Integrating with existing applications
- Automating data synchronization workflows
- Creating custom tools on top of crump
Installation¶
Quick Example¶
from pathlib import Path
from crump import sync_file_to_db, CrumpConfig
# Load configuration
config = CrumpConfig.from_yaml(Path("crump_config.yml"))
job = config.get_job("my_job")
# Sync a tabular file (CSV or Parquet)
rows_synced = sync_file_to_db(
file_path=Path("users.parquet"),
job=job,
db_connection_string="sqlite:///test.db"
)
print(f"Synced {rows_synced} rows")
Core Functions¶
sync_file_to_db¶
Sync a tabular file (CSV or Parquet) to a database (PostgreSQL or SQLite).
File format is automatically detected from the file extension.
def sync_file_to_db(
file_path: Path,
job: CrumpJob,
db_connection_string: str,
filename_values: dict[str, str] | None = None,
enable_history: bool = False
) -> int
Parameters:
| Parameter | Type | Description |
|---|---|---|
file_path |
Path |
Path to the tabular file (CSV or Parquet) to sync |
job |
CrumpJob |
Configuration for the sync job |
db_connection_string |
str |
Database connection string (PostgreSQL or SQLite) |
filename_values |
dict[str, str] \| None |
Extracted values from filename (optional) |
enable_history |
bool |
Whether to record sync history (default: False) |
Returns: Number of rows synced (int)
Raises:
- FileNotFoundError: File doesn't exist
- ValueError: Invalid configuration or data
- DatabaseError: Database connection or query errors
Example:
from pathlib import Path
from crump import sync_file_to_db, CrumpConfig
config = CrumpConfig.from_yaml(Path("crump_config.yml"))
job = config.get_job("users_sync")
# Sync CSV file
rows = sync_file_to_db(
file_path=Path("users.csv"),
job=job,
db_connection_string="sqlite:///test.db"
# OR db_connection_string="postgresql://localhost:5432/mydb"
)
# Sync Parquet file (format auto-detected from extension)
rows = sync_file_to_db(
file_path=Path("users.csv"),
crump_job=job,
db_connection_string="sqlite:///test.db"
)
# With filename extraction
filename_values = {"date": "2024-01-15"}
rows = sync_csv_to_db(
file_path=Path("users.csv"),
crump_job=job,
db_connection_string="sqlite:///test.db",
filename_values=filename_values
)
sync_csv_to_db_dry_run¶
Preview sync without making database changes.
def sync_csv_to_db_dry_run(
file_path: Path,
crump_job: CrumpJob,
db_connection_string: str,
filename_values: dict[str, str] | None = None
) -> DryRunSummary
Parameters: Same as sync_csv_to_db
Returns: DryRunSummary object with:
| Attribute | Type | Description |
|---|---|---|
table_name |
str |
Name of the target table |
table_exists |
bool |
Whether table already exists |
new_columns |
list[tuple[str, str]] |
Columns to be added (name, type) |
new_indexes |
list[str] |
Indexes to be created |
rows_to_sync |
int |
Number of rows to insert/update |
rows_to_delete |
int |
Number of stale rows to delete |
Example:
from pathlib import Path
from crump import sync_csv_to_db_dry_run, CrumpConfig
config = CrumpConfig.from_yaml(Path("crump_config.yml"))
job = config.get_job("my_job")
summary = sync_csv_to_db_dry_run(
file_path=Path("users.csv"),
crump_job=job,
db_connection_string="sqlite:///test.db"
)
print(f"Table exists: {summary.table_exists}")
print(f"New columns: {summary.new_columns}")
print(f"Rows to sync: {summary.rows_to_sync}")
print(f"Rows to delete: {summary.rows_to_delete}")
# Check before proceeding
if summary.new_columns:
print("Warning: New columns will be added!")
for col_name, col_type in summary.new_columns:
print(f" - {col_name}: {col_type}")
analyze_csv_types_and_nullable¶
Analyze CSV file to detect column types.
Parameters:
| Parameter | Type | Description |
|---|---|---|
file_path |
Path |
Path to the CSV file |
Returns: Dictionary mapping column names to (type, nullable) tuples
Example:
from pathlib import Path
from crump import analyze_csv_types_and_nullable
column_info = analyze_csv_types_and_nullable(Path("users.csv"))
for col_name, (data_type, nullable) in column_info.items():
null_str = "NULL" if nullable else "NOT NULL"
print(f"{col_name}: {data_type} {null_str}")
Output:
user_id: INTEGER NOT NULL
name: TEXT NOT NULL
email: TEXT NOT NULL
age: INTEGER NULL
created_at: DATE NULL
suggest_id_column¶
Suggest an ID column from a list of columns.
Parameters:
| Parameter | Type | Description |
|---|---|---|
columns |
list[str] |
List of column names |
Returns: Suggested ID column name (str)
Logic:
1. Looks for columns ending with _id or _key
2. Prefers shorter names
3. Falls back to first column if none found
Example:
from crump import suggest_id_column
columns = ["user_id", "name", "email", "created_at"]
id_col = suggest_id_column(columns)
print(f"Suggested ID: {id_col}") # Output: user_id
columns = ["name", "email"]
id_col = suggest_id_column(columns)
print(f"Suggested ID: {id_col}") # Output: name (first column)
Configuration Classes¶
CrumpConfig¶
Main configuration container.
class CrumpConfig:
def __init__(self, jobs: dict[str, CrumpJob]) -> None
@classmethod
def from_yaml(cls, config_path: Path) -> CrumpConfig
def save_to_yaml(self, config_path: Path) -> None
def get_job(self, job_name: str) -> CrumpJob | None
def add_or_update_job(self, job: CrumpJob, force: bool = False) -> None
Example:
from pathlib import Path
from crump import CrumpConfig
# Load from file
config = CrumpConfig.from_yaml(Path("crump_config.yml"))
# Get a job
job = config.get_job("my_job")
if job:
print(f"Target table: {job.target_table}")
# Create new config
from crump import CrumpJob, ColumnMapping
new_config = CrumpConfig(jobs={})
job = CrumpJob(
name="users",
target_table="users",
id_mapping=[ColumnMapping("user_id", "id", "integer")]
)
new_config.add_or_update_job(job)
new_config.save_to_yaml(Path("new_crump_config.yml"))
CrumpJob¶
Configuration for a single sync job.
class CrumpJob:
def __init__(
self,
name: str,
target_table: str,
id_mapping: list[ColumnMapping],
columns: list[ColumnMapping] | None = None,
filename_to_column: FilenameToColumn | None = None,
indexes: list[Index] | None = None,
) -> None
Attributes:
| Attribute | Type | Description |
|---|---|---|
name |
str |
Job name |
target_table |
str |
Target database table |
id_mapping |
list[ColumnMapping] |
Primary key mapping |
columns |
list[ColumnMapping] |
Column mappings (None = sync all) |
filename_to_column |
FilenameToColumn \| None |
Filename extraction config |
indexes |
list[Index] |
Database indexes to create |
Example:
from crump import CrumpJob, ColumnMapping, Index, IndexColumn
job = CrumpJob(
name="users_sync",
target_table="users",
id_mapping=[
ColumnMapping("user_id", "id", "integer")
],
columns=[
ColumnMapping("name", "full_name", "text"),
ColumnMapping("email", "email", "text"),
],
indexes=[
Index(
name="idx_email",
columns=[IndexColumn("email", "ASC")]
)
]
)
ColumnMapping¶
Mapping for a single column.
class ColumnMapping:
def __init__(
self,
csv_column: str,
db_column: str,
data_type: str | None = None
) -> None
Example:
from crump import ColumnMapping
mapping = ColumnMapping(
csv_column="user_id",
db_column="id",
data_type="integer"
)
FilenameToColumn¶
Configuration for extracting values from filenames.
class FilenameToColumn:
def __init__(
self,
columns: dict[str, FilenameColumnMapping],
template: str | None = None,
regex: str | None = None,
) -> None
def extract_values_from_filename(
self,
filename: str | Path
) -> dict[str, str] | None
def get_delete_key_columns(self) -> list[str]
Example:
from crump import FilenameToColumn, FilenameColumnMapping
ftc = FilenameToColumn(
template="sales_[date].csv",
columns={
"date": FilenameColumnMapping(
name="date",
db_column="sync_date",
data_type="date",
use_to_delete_old_rows=True
)
}
)
# Extract values
values = ftc.extract_values_from_filename("sales_2024-01-15.csv")
print(values) # {'date': '2024-01-15'}
# Get delete key columns
delete_cols = ftc.get_delete_key_columns()
print(delete_cols) # ['sync_date']
Index¶
Database index configuration.
Example:
from crump import Index, IndexColumn
# Single column index
idx1 = Index(
name="idx_created_at",
columns=[IndexColumn("created_at", "DESC")]
)
# Multi-column index
idx2 = Index(
name="idx_user_date",
columns=[
IndexColumn("user_id", "ASC"),
IndexColumn("created_at", "DESC")
]
)
Complete Example¶
Here's a complete example demonstrating various API features:
from pathlib import Path
from crump import (
CrumpConfig,
CrumpJob,
ColumnMapping,
FilenameToColumn,
FilenameColumnMapping,
Index,
IndexColumn,
sync_csv_to_db,
sync_csv_to_db_dry_run,
analyze_csv_types_and_nullable,
suggest_id_column,
)
# Analyze a CSV file
csv_path = Path("users.csv")
column_info = analyze_csv_types_and_nullable(csv_path)
columns = list(column_info.keys())
id_column = suggest_id_column(columns)
print(f"Detected columns: {columns}")
print(f"Suggested ID: {id_column}")
# Create a job programmatically
job = CrumpJob(
name="daily_sales",
target_table="sales",
id_mapping=[
ColumnMapping(id_column, "id", "integer")
],
columns=[
ColumnMapping("product_id", "product_id", "integer"),
ColumnMapping("amount", "amount", "float"),
],
filename_to_column=FilenameToColumn(
template="sales_[date].csv",
columns={
"date": FilenameColumnMapping(
name="date",
db_column="sync_date",
data_type="date",
use_to_delete_old_rows=True
)
}
),
indexes=[
Index(
name="idx_sync_date",
columns=[IndexColumn("sync_date", "DESC")]
)
]
)
# Create config and save
config = CrumpConfig(jobs={})
config.add_or_update_job(job)
config.save_to_yaml(Path("crump_config.yml"))
# Extract filename values
filename_values = job.filename_to_column.extract_values_from_filename(csv_path)
# Dry-run first
summary = sync_csv_to_db_dry_run(
file_path=csv_path,
crump_job=job,
db_connection_string="sqlite:///test.db",
filename_values=filename_values
)
print(f"\nDry-run results:")
print(f" Table exists: {summary.table_exists}")
print(f" Rows to sync: {summary.rows_to_sync}")
print(f" Rows to delete: {summary.rows_to_delete}")
# Confirm and sync
if input("Proceed with sync? (y/n): ").lower() == "y":
rows = sync_csv_to_db(
file_path=csv_path,
crump_job=job,
db_connection_string="sqlite:///test.db",
filename_values=filename_values
)
print(f"\nSynced {rows} rows successfully!")
Error Handling¶
from pathlib import Path
from crump import sync_csv_to_db, CrumpConfig
try:
config = CrumpConfig.from_yaml(Path("crump_config.yml"))
job = config.get_job("my_job")
if not job:
print("Job not found in configuration")
exit(1)
rows = sync_csv_to_db(
file_path=Path("users.csv"),
crump_job=job,
db_connection_string="sqlite:///test.db"
)
print(f"Success: {rows} rows synced")
except FileNotFoundError as e:
print(f"File not found: {e}")
except ValueError as e:
print(f"Configuration error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
Type Hints¶
All functions include full type hints for IDE autocomplete and type checking:
from pathlib import Path
from crump import sync_csv_to_db, CrumpJob
# Type checker knows the types
def sync_data(csv_file: Path, job: CrumpJob) -> None:
rows: int = sync_csv_to_db(
file_path=csv_file,
crump_job=job,
db_connection_string="sqlite:///test.db"
)
# rows is guaranteed to be int
print(f"Synced {rows} rows")
Next Steps¶
- Configuration Guide - Learn about YAML configuration
- Features - Detailed feature documentation
- Development - Contributing to crump