CSV-Wrangler
Patterns for handling real-world messy CSV files.
Encoding Detection
import chardet
def detect_encoding(file_path: str, sample_size: int = 10000) -> str:
"""Detect file encoding from sample."""
with open(file_path, 'rb') as f:
raw = f.read(sample_size)
result = chardet.detect(raw)
return result['encoding']
def read_with_encoding(path: str) -> pd.DataFrame:
"""Read CSV with auto-detected encoding."""
encoding = detect_encoding(path)
# Common fallback chain
encodings = [encoding, 'utf-8', 'latin-1', 'cp1252']
for enc in encodings:
try:
return pd.read_csv(path, encoding=enc)
except UnicodeDecodeError:
continue
# Last resort: ignore errors
return pd.read_csv(path, encoding='utf-8', errors='ignore')
Delimiter Detection
import csv
def detect_delimiter(file_path: str) -> str:
"""Detect CSV delimiter from file sample."""
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
sample = f.read(4096)
sniffer = csv.Sniffer()
try:
dialect = sniffer.sniff(sample, delimiters=',;\t|')
return dialect.delimiter
except csv.Error:
# Count occurrences and pick most common
counts = {d: sample.count(d) for d in [',', ';', '\t', '|']}
return max(counts, key=counts.get)
def read_with_delimiter_detection(path: str) -> pd.DataFrame:
"""Read CSV with auto-detected delimiter."""
delimiter = detect_delimiter(path)
return pd.read_csv(path, sep=delimiter)
Handling Malformed Rows
def read_with_error_handling(path: str) -> tuple[pd.DataFrame, list[dict]]:
"""Read CSV, capturing malformed rows separately."""
good_rows = []
bad_rows = []
with open(path, 'r', encoding='utf-8', errors='replace') as f:
reader = csv.reader(f)
header = next(reader)
expected_cols = len(header)
for line_num, row in enumerate(reader, start=2):
if len(row) == expected_cols:
good_rows.append(row)
else:
bad_rows.append({
'line': line_num,
'expected': expected_cols,
'actual': len(row),
'data': row
})
df = pd.DataFrame(good_rows, columns=header)
return df, bad_rows
# Using pandas error_bad_lines (deprecated) / on_bad_lines
def read_skip_bad_lines(path: str) -> pd.DataFrame:
"""Read CSV, skipping malformed rows."""
return pd.read_csv(
path,
on_bad_lines='skip', # or 'warn' to see them
engine='python'
)
Quoted Fields with Embedded Delimiters
def read_quoted_csv(path: str) -> pd.DataFrame:
"""Handle CSVs with quoted fields containing delimiters."""
return pd.read_csv(
path,
quotechar='"',
doublequote=True, # Handle "" as escaped quote
escapechar='\\', # Handle \, as escaped comma
engine='python'
)
# For really messy files
def read_regex_separated(path: str, pattern: str = r',(?=(?:[^"]*"[^"]*")*[^"]*$)') -> pd.DataFrame:
"""Split on delimiter only outside quotes using regex."""
import re
rows = []
with open(path, 'r') as f:
for line in f:
rows.append(re.split(pattern, line.strip()))
return pd.DataFrame(rows[1:], columns=rows[0])
Header Detection
def detect_header_row(path: str, max_rows: int = 10) -> int:
"""Find the header row in files with metadata at top."""
with open(path, 'r') as f:
lines = [f.readline() for _ in range(max_rows)]
delimiter = detect_delimiter(path)
for i, line in enumerate(lines):
parts = line.split(delimiter)
# Header likely has more columns than metadata
# and contains text-like values
if len(parts) > 2 and all(p.strip().replace('_', '').isalnum() for p in parts[:3]):
return i
return 0 # Default to first row
def read_with_header_detection(path: str) -> pd.DataFrame:
"""Read CSV, auto-detecting header row."""
header_row = detect_header_row(path)
return pd.read_csv(path, skiprows=header_row)
Date Parsing
def infer_date_columns(df: pd.DataFrame) -> list[str]:
"""Identify columns that look like dates."""
date_cols = []
date_patterns = [
r'\d{4}-\d{2}-\d{2}', # 2024-01-15
r'\d{2}/\d{2}/\d{4}', # 01/15/2024
r'\d{2}-\d{2}-\d{4}', # 15-01-2024
r'\d{4}/\d{2}/\d{2}', # 2024/01/15
]
for col in df.columns:
if df[col].dtype == 'object':
sample = df[col].dropna().head(100)
for pattern in date_patterns:
if sample.str.match(pattern).mean() > 0.8:
date_cols.append(col)
break
return date_cols
def parse_dates_flexibly(df: pd.DataFrame, columns: list[str]) -> pd.DataFrame:
"""Parse dates with multiple format attempts."""
for col in columns:
try:
df[col] = pd.to_datetime(df[col], infer_datetime_format=True)
except:
# Try common formats explicitly
for fmt in ['%Y-%m-%d', '%m/%d/%Y', '%d/%m/%Y', '%Y/%m/%d']:
try:
df[col] = pd.to_datetime(df[col], format=fmt)
break
except:
continue
return df
Numeric Cleaning
def clean_numeric_column(series: pd.Series) -> pd.Series:
"""Clean numeric column with currency symbols, commas, etc."""
if series.dtype == 'object':
cleaned = (series
.astype(str)
.str.replace(r'[$€£¥,]', '', regex=True) # Remove currency/commas
.str.replace(r'\s+', '', regex=True) # Remove whitespace
.str.replace(r'\((.+)\)', r'-\1', regex=True) # (123) -> -123
)
return pd.to_numeric(cleaned, errors='coerce')
return series
def clean_all_numeric(df: pd.DataFrame) -> pd.DataFrame:
"""Attempt to convert object columns to numeric."""
for col in df.select_dtypes(include=['object']).columns:
converted = pd.to_numeric(df[col], errors='coerce')
# Keep conversion if >80% success
if converted.notna().mean() > 0.8:
df[col] = converted
return df
Complete Wrangling Pipeline
def wrangle_csv(path: str) -> tuple[pd.DataFrame, dict]:
"""Full pipeline for messy CSV handling."""
report = {'issues': [], 'fixes': []}
# Step 1: Detect encoding
encoding = detect_encoding(path)
report['encoding'] = encoding
# Step 2: Detect delimiter
delimiter = detect_delimiter(path)
report['delimiter'] = delimiter
# Step 3: Detect header row
header_row = detect_header_row(path)
if header_row > 0:
report['fixes'].append(f"Skipped {header_row} metadata rows")
# Step 4: Read with error handling
try:
df = pd.read_csv(
path,
encoding=encoding,
sep=delimiter,
skiprows=header_row,
on_bad_lines='warn',
engine='python'
)
except Exception as e:
report['issues'].append(f"Read error: {e}")
df = pd.read_csv(path, encoding='latin-1', on_bad_lines='skip')
# Step 5: Clean column names
df.columns = (df.columns
.str.strip()
.str.lower()
.str.replace(r'\s+', '_', regex=True)
.str.replace(r'[^\w]', '', regex=True)
)
report['fixes'].append("Normalized column names")
# Step 6: Parse dates
date_cols = infer_date_columns(df)
if date_cols:
df = parse_dates_flexibly(df, date_cols)
report['fixes'].append(f"Parsed dates: {date_cols}")
# Step 7: Clean numeric columns
df = clean_all_numeric(df)
report['shape'] = df.shape
report['dtypes'] = df.dtypes.to_dict()
return df, report
Excel File Handling
def read_excel_smart(path: str, sheet: str | int = 0) -> pd.DataFrame:
"""Read Excel with common cleanup."""
df = pd.read_excel(
path,
sheet_name=sheet,
engine='openpyxl', # For .xlsx
# engine='xlrd', # For .xls
)
# Drop fully empty rows/columns
df = df.dropna(how='all').dropna(axis=1, how='all')
# Reset index after dropping
df = df.reset_index(drop=True)
return df
def list_excel_sheets(path: str) -> list[str]:
"""List all sheets in an Excel file."""
xl = pd.ExcelFile(path)
return xl.sheet_names
CSV-to-SQL Conversion
Generate CREATE TABLE and load statements from CSV files with type inference.
Type Mapping
| Detected Pattern | PostgreSQL | MySQL | SQLite | |-----------------|------------|-------|--------| | Integer | INTEGER / BIGINT | INT / BIGINT | INTEGER | | Decimal | NUMERIC(p,s) | DECIMAL(p,s) | REAL | | Boolean | BOOLEAN | TINYINT(1) | INTEGER | | Date | DATE | DATE | TEXT | | Timestamp | TIMESTAMP | DATETIME | TEXT | | UUID | UUID | CHAR(36) | TEXT | | Short text | VARCHAR(n) | VARCHAR(n) | TEXT | | Long text | TEXT | TEXT | TEXT | | JSON | JSONB | JSON | TEXT |
PostgreSQL Output
-- Generated from: orders.csv
-- Rows: 50,000 | Columns: 8
DROP TABLE IF EXISTS orders CASCADE;
CREATE TABLE orders (
order_id BIGINT PRIMARY KEY,
customer_id BIGINT NOT NULL,
order_date DATE NOT NULL,
status VARCHAR(20) NOT NULL CHECK (status IN ('pending', 'shipped', 'delivered')),
total_amount NUMERIC(10, 2) NOT NULL CHECK (total_amount >= 0),
shipping_address TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_orders_customer ON orders(customer_id);
CREATE INDEX idx_orders_date ON orders(order_date);
\COPY orders FROM 'orders.csv' WITH (FORMAT csv, HEADER true, NULL '');
SELECT COUNT(*) as loaded_rows FROM orders;
MySQL Output
DROP TABLE IF EXISTS orders;
CREATE TABLE orders (
order_id BIGINT PRIMARY KEY,
customer_id BIGINT NOT NULL,
order_date DATE NOT NULL,
status ENUM('pending', 'shipped', 'delivered') NOT NULL,
total_amount DECIMAL(10, 2) NOT NULL,
shipping_address TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_customer (customer_id),
INDEX idx_date (order_date)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
LOAD DATA INFILE '/path/to/orders.csv'
INTO TABLE orders
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 ROWS;
SQLite Output
DROP TABLE IF EXISTS orders;
CREATE TABLE orders (
order_id INTEGER PRIMARY KEY,
customer_id INTEGER NOT NULL,
order_date TEXT NOT NULL,
status TEXT NOT NULL CHECK (status IN ('pending', 'shipped', 'delivered')),
total_amount REAL NOT NULL CHECK (total_amount >= 0),
shipping_address TEXT,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX idx_orders_customer ON orders(customer_id);
CREATE INDEX idx_orders_date ON orders(order_date);
.mode csv
.import --skip 1 orders.csv orders
Python Load Script
import pandas as pd
from sqlalchemy import create_engine
def load_csv_to_db(
csv_path: str,
table_name: str,
connection_string: str,
if_exists: str = 'replace',
chunksize: int = 10000
) -> int:
"""Load CSV to database with progress."""
engine = create_engine(connection_string)
total_rows = 0
for chunk in pd.read_csv(csv_path, chunksize=chunksize):
chunk.to_sql(
table_name, engine,
if_exists=if_exists if total_rows == 0 else 'append',
index=False
)
total_rows += len(chunk)
return total_rows
Constraints Detection
Automatically detect and suggest:
- Primary Key - Column named 'id', '*_id', 'pk'; 100% unique, no nulls
- Foreign Keys - Columns ending in '_id'; reference table inferred from prefix
- NOT NULL - Columns with 0% null rate
- UNIQUE - Columns with 100% unique values; email, username patterns
- CHECK Constraints - Categorical columns -> IN list; numeric ranges -> >= / <=