Data science workflows usually contain repetitive, time-consuming tasks that can drain productivity and delay insights. From cleaning messy datasets to generating consistent reports, these manual processes create bottlenecks that prevent data scientists from focusing on what matters most: extracting meaningful insights and building robust models.
Python has emerged as the undisputed champion for automating these tedious workflows. With its rich ecosystem of libraries and intuitive syntax, Python enables data scientists to transform hours of manual work into automated scripts that run reliably and consistently. Whether you're dealing with data preprocessing, model training, or report generation, automation not only saves time but also reduces human error and ensures reproducibility.
The following ten Python scripts represent essential automation tools that every data scientist should have in their toolkit. Each script addresses a common pain point in the data science workflow, providing practical solutions that can be implemented immediately and customized for specific needs.
1. Automated Data Cleaning with Pandas
Data cleaning typically consumes 60–80% of a data scientist's time, making it the perfect candidate for automation. This script handles the most common data quality issues in a standardized, repeatable way.
import pandas as pd
import numpy as np

def automated_data_cleaning(df):
    """
    Comprehensive data cleaning pipeline
    """
    # Remove duplicate rows
    df = df.drop_duplicates()

    # Handle missing values
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    categorical_cols = df.select_dtypes(include=['object']).columns

    # Fill numeric columns with the median
    df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].median())

    # Fill categorical columns with the mode
    for col in categorical_cols:
        df[col] = df[col].fillna(df[col].mode()[0] if not df[col].mode().empty else 'Unknown')

    # Remove outliers using the IQR method
    for col in numeric_cols:
        Q1 = df[col].quantile(0.25)
        Q3 = df[col].quantile(0.75)
        IQR = Q3 - Q1
        df = df[~((df[col] < (Q1 - 1.5 * IQR)) | (df[col] > (Q3 + 1.5 * IQR)))]

    # Standardize column names
    df.columns = df.columns.str.lower().str.replace(' ', '_')

    return df

# Usage
df_clean = automated_data_cleaning(raw_df)
Real-world use case: A retail company processes daily sales data from multiple stores with inconsistent formatting, missing entries, and occasional data entry errors. This script ensures all datasets follow the same quality standards before analysis.
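A minimal sketch of that batch workflow, mapping the function over each day's store exports (the directory and file pattern are illustrative, not part of the original script):

import glob
import pandas as pd

frames = []
for path in glob.glob('data/raw/store_*.csv'):  # hypothetical file pattern
    raw = pd.read_csv(path)
    frames.append(automated_data_cleaning(raw))

# One standardized dataset covering all stores, ready for analysis
all_stores = pd.concat(frames, ignore_index=True)
all_stores.to_csv('daily_sales_clean.csv', index=False)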
Benefits of automation:
- Consistent data quality across all datasets
- Reduces cleaning time from hours to minutes
- Prevents downstream errors in analysis and modeling
- Enables fast data processing in production pipelines
2. Exploratory Data Analysis with ydata-profiling
Manual exploratory data analysis (EDA) can take hours of writing repetitive code. This script generates comprehensive data profiles automatically, providing instant insights into your dataset's characteristics.
from ydata_profiling import ProfileReport
import pandas as pd
import numpy as np

def generate_eda_report(df, title="Data Analysis Report"):
    # Generate a comprehensive EDA report automatically
    # (config overrides are passed directly as keyword arguments)
    profile = ProfileReport(
        df,
        title=title,
        explorative=True,
        correlations={'auto': {'calculate': True}},
        missing_diagrams={'heatmap': True},
        interactions={'continuous': True}
    )

    # Save report as HTML
    profile.to_file(f"{title.replace(' ', '_').lower()}.html")

    # Generate summary statistics
    summary = {
        'total_rows': len(df),
        'total_columns': len(df.columns),
        'missing_percentage': (df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100,
        'duplicate_rows': df.duplicated().sum(),
        'numeric_columns': len(df.select_dtypes(include=[np.number]).columns),
        'categorical_columns': len(df.select_dtypes(include=['object']).columns)
    }

    return profile, summary

# Usage
profile, summary = generate_eda_report(df, "Customer Dataset Analysis")
print(f"Dataset has {summary['missing_percentage']:.2f}% missing values")
Real-world use case: A marketing team receives a new customer dataset every month and needs to quickly understand customer demographics, behavior patterns, and data quality issues before launching targeted campaigns.
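A minimal sketch of that monthly routine, assuming the dataset arrives as a CSV export (the filename and quality threshold are illustrative):

import pandas as pd

# Hypothetical monthly export from the CRM
monthly_df = pd.read_csv('customers_2024_06.csv')
profile, summary = generate_eda_report(monthly_df, "June Customer Dataset")

# Gate campaign work on basic quality checks before anyone digs deeper
if summary['missing_percentage'] > 5 or summary['duplicate_rows'] > 0:
    print("Review data quality before launching targeted campaigns")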
Benefits of automation:
- Generates publication-ready reports in seconds
- Identifies data quality issues and patterns automatically
- Provides interactive visualizations for stakeholder presentations
- Standardizes the EDA process across different teams and projects
3. Interactive Data Visualization Dashboard with Plotly and Dash
Creating dynamic dashboards for stakeholders traditionally requires extensive development time. This script creates interactive dashboards that update automatically with new data.
import dash
from dash import dcc, html, Input, Output
import plotly.express as px
import pandas as pd

def create_automated_dashboard(df):
    # Create an interactive dashboard with automated chart generation
    app = dash.Dash(__name__)

    # Get numeric and categorical columns
    numeric_cols = df.select_dtypes(include=['number']).columns.tolist()
    categorical_cols = df.select_dtypes(include=['object']).columns.tolist()

    app.layout = html.Div([
        html.H1("Automated Data Dashboard", style={'textAlign': 'center'}),
        html.Div([
            html.Label("Select X-axis:"),
            dcc.Dropdown(
                id='x-axis-dropdown',
                options=[{'label': col, 'value': col} for col in numeric_cols + categorical_cols],
                value=numeric_cols[0] if numeric_cols else categorical_cols[0]
            )
        ], style={'width': '48%', 'display': 'inline-block'}),
        html.Div([
            html.Label("Select Y-axis:"),
            dcc.Dropdown(
                id='y-axis-dropdown',
                options=[{'label': col, 'value': col} for col in numeric_cols],
                value=numeric_cols[1] if len(numeric_cols) > 1 else numeric_cols[0]
            )
        ], style={'width': '48%', 'float': 'right', 'display': 'inline-block'}),
        dcc.Graph(id='main-graph'),
        dcc.Graph(id='distribution-graph')
    ])

    @app.callback(
        [Output('main-graph', 'figure'),
         Output('distribution-graph', 'figure')],
        [Input('x-axis-dropdown', 'value'),
         Input('y-axis-dropdown', 'value')]
    )
    def update_graphs(x_axis, y_axis):
        # Scatter plot
        scatter_fig = px.scatter(df, x=x_axis, y=y_axis, title=f'{y_axis} vs {x_axis}')

        # Distribution plot
        if x_axis in numeric_cols:
            dist_fig = px.histogram(df, x=x_axis, title=f'Distribution of {x_axis}')
        else:
            # Name the columns explicitly so this works across pandas versions
            counts = df[x_axis].value_counts().reset_index()
            counts.columns = [x_axis, 'count']
            dist_fig = px.bar(counts, x=x_axis, y='count', title=f'Count of {x_axis}')

        return scatter_fig, dist_fig

    return app

# Usage
dashboard = create_automated_dashboard(df)
dashboard.run(debug=True)  # use dashboard.run_server(...) on Dash versions before 2.7
Real-world use case: A sales manager needs real-time insights into team performance metrics. The dashboard automatically updates with new sales data and allows filtering by region, product, or time period without requiring technical knowledge.
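A standalone sketch of that kind of filtering, assuming the sales data carries 'region', 'date', and 'sales' columns (all hypothetical):

import dash
from dash import dcc, html, Input, Output
import plotly.express as px

def create_region_dashboard(df):
    # Minimal app: one region filter driving one sales chart
    app = dash.Dash(__name__)
    app.layout = html.Div([
        dcc.Dropdown(
            id='region-dropdown',
            options=[{'label': r, 'value': r} for r in sorted(df['region'].unique())],
            placeholder="All regions"
        ),
        dcc.Graph(id='sales-graph')
    ])

    @app.callback(Output('sales-graph', 'figure'), Input('region-dropdown', 'value'))
    def update_chart(region):
        view = df if region is None else df[df['region'] == region]
        return px.line(view, x='date', y='sales',
                       title=f"Sales: {region or 'all regions'}")

    return app

# Usage (sales_df is a hypothetical DataFrame with those columns)
# create_region_dashboard(sales_df).run(debug=True)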
Benefits of automation:
- Creates professional dashboards without frontend development skills
- Enables self-service analytics for non-technical stakeholders
- Updates automatically with new data
- Reduces dependency on data visualization specialists
4. Web Scraping for Data Collection with BeautifulSoup
Manually collecting data from websites is tedious and error-prone. This script automates web scraping with built-in error handling and rate limiting.
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import random

def automated_web_scraper(urls, delay_range=(1, 3)):
    # Automated web scraper with error handling and rate limiting
    scraped_data = []

    for i, url in enumerate(urls):
        try:
            # Random delay to avoid being blocked
            time.sleep(random.uniform(*delay_range))

            # Make the request with headers to appear more human-like
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()

            # Parse HTML
            soup = BeautifulSoup(response.content, 'html.parser')

            # Extract data (customize based on the target website structure)
            data = {
                'url': url,
                'title': soup.find('title').text.strip() if soup.find('title') else 'N/A',
                'meta_description': '',
                'headings': [h.text.strip() for h in soup.find_all(['h1', 'h2', 'h3'])[:5]],
                'scraped_at': pd.Timestamp.now()
            }

            # Extract the meta description
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc:
                data['meta_description'] = meta_desc.get('content', '')

            scraped_data.append(data)
            print(f"Scraped {i+1}/{len(urls)}: {url}")

        except Exception as e:
            print(f"Error scraping {url}: {str(e)}")
            scraped_data.append({
                'url': url,
                'error': str(e),
                'scraped_at': pd.Timestamp.now()
            })

    return pd.DataFrame(scraped_data)

# Usage
urls_to_scrape = [
    'https://example1.com',
    'https://example2.com',
    'https://example3.com'
]

scraped_df = automated_web_scraper(urls_to_scrape)
scraped_df.to_csv('scraped_data.csv', index=False)
Real-world use case: A market research team needs to monitor competitor pricing across 500+ product pages daily. This script collects pricing data automatically and identifies price changes without manual checking.
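A sketch of the price-change detection half of that workflow; it assumes the scraper has been customized to extract a 'price' field per URL, and the file names are illustrative:

import pandas as pd

# Yesterday's and today's scraper outputs (hypothetical files)
today = pd.read_csv('prices_today.csv')
yesterday = pd.read_csv('prices_yesterday.csv')

# Join on URL and flag any product whose price moved
merged = today.merge(yesterday, on='url', suffixes=('_today', '_yesterday'))
changes = merged[merged['price_today'] != merged['price_yesterday']]
print(f"{len(changes)} of {len(merged)} products changed price")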
Benefits of automation:
- Collects data 24/7 without human intervention
- Handles errors gracefully and continues processing
- Scales easily to thousands of URLs
- Maintains consistent data collection schedules
5. Automating Model Training with Scikit-learn Pipelines
Model training often involves repetitive preprocessing steps and parameter tuning. This script creates reusable pipelines that standardize the entire machine learning workflow.
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.metrics import classification_report
import pandas as pd
import joblib

def create_automated_ml_pipeline(df, target_column, model_type='classification'):
    # Create and train an automated ML pipeline
    X = df.drop(columns=[target_column])
    y = df[target_column]

    # Identify column types
    numeric_features = X.select_dtypes(include=['int64', 'float64']).columns
    categorical_features = X.select_dtypes(include=['object']).columns

    # Create preprocessing pipelines
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])

    # Combine preprocessing steps
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ]
    )

    # Create the full pipeline with the model
    if model_type == 'classification':
        model = RandomForestClassifier(n_estimators=100, random_state=42)
    else:
        from sklearn.ensemble import RandomForestRegressor
        model = RandomForestRegressor(n_estimators=100, random_state=42)

    pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', model)
    ])

    # Split data and train
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Train the pipeline
    pipeline.fit(X_train, y_train)

    # Evaluate the model
    cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5)
    test_score = pipeline.score(X_test, y_test)

    # Generate predictions and report
    y_pred = pipeline.predict(X_test)

    results = {
        'pipeline': pipeline,
        'cv_scores': cv_scores,
        'test_score': test_score,
        'classification_report': classification_report(y_test, y_pred) if model_type == 'classification' else None
    }

    # Save the pipeline
    joblib.dump(pipeline, f'automated_ml_pipeline_{target_column}.pkl')

    return results

# Usage
results = create_automated_ml_pipeline(df, 'target_column', 'classification')
print(f"Cross-validation score: {results['cv_scores'].mean():.3f} (+/- {results['cv_scores'].std() * 2:.3f})")
Real-world use case: A financial institution needs to retrain fraud detection models weekly with new transaction data. This pipeline automatically handles data preprocessing, model training, and validation without manual intervention.
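A sketch of that weekly cycle, assuming an 'is_fraud' label column and illustrative file names; because the saved pipeline bundles preprocessing with the model, scoring needs no manual feature preparation:

import joblib
import pandas as pd

# Weekly retrain on the latest labeled transactions (hypothetical extract)
weekly_df = pd.read_csv('transactions_this_week.csv')
results = create_automated_ml_pipeline(weekly_df, 'is_fraud', 'classification')

# Score new, unlabeled transactions with the saved pipeline
pipeline = joblib.load('automated_ml_pipeline_is_fraud.pkl')
new_transactions = pd.read_csv('transactions_today.csv')
fraud_flags = pipeline.predict(new_transactions)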
Benefits of automation:
- Ensures consistent preprocessing across different datasets
- Reduces model development time from days to hours
- Prevents data leakage through proper pipeline structure
- Enables easy model deployment and version control
6. Feature Engineering with Feature-engine
Feature engineering often requires domain expertise and repetitive coding. This script automates common feature engineering tasks with intelligent defaults and customizable options.
from feature_engine.creation import MathFeatures
from feature_engine.discretisation import EqualFrequencyDiscretiser
from feature_engine.encoding import RareLabelEncoder, OneHotEncoder
from feature_engine.transformation import LogTransformer
from feature_engine.selection import DropConstantFeatures, DropDuplicateFeatures
import pandas as pd

def automated_feature_engineering(df, target_column=None):
    # Separate features and target
    if target_column:
        X = df.drop(columns=[target_column])
        y = df[target_column]
    else:
        X = df.copy()
        y = None

    print(f"Starting feature engineering with {len(X.columns)} features...")

    # 1. Remove constant and duplicate features
    constant_dropper = DropConstantFeatures()
    duplicate_dropper = DropDuplicateFeatures()
    X = constant_dropper.fit_transform(X)
    X = duplicate_dropper.fit_transform(X)

    # Identify column types after dropping, so later steps only see surviving columns
    numeric_vars = X.select_dtypes(include=['int64', 'float64']).columns.tolist()
    categorical_vars = X.select_dtypes(include=['object']).columns.tolist()

    # 2. Handle rare categories in categorical variables
    if categorical_vars:
        rare_encoder = RareLabelEncoder(tol=0.01, n_categories=10)
        X = rare_encoder.fit_transform(X)

    # 3. Create mathematical combinations of numeric features
    if len(numeric_vars) >= 2:
        math_combiner = MathFeatures(
            variables=numeric_vars[:5],  # Limit to first 5 to avoid explosion
            func=['sum', 'prod', 'mean']
        )
        X = math_combiner.fit_transform(X)

    # 4. Apply transformations to numeric variables
    # Log transformation for skewed variables
    skewed_vars = []
    for var in numeric_vars:
        if X[var].min() > 0:  # Log only positive values
            skewness = X[var].skew()
            if abs(skewness) > 1:
                skewed_vars.append(var)

    if skewed_vars:
        log_transformer = LogTransformer(variables=skewed_vars)
        X = log_transformer.fit_transform(X)

    # 5. Discretize continuous variables
    if len(numeric_vars) > 0:
        discretizer = EqualFrequencyDiscretiser(
            variables=numeric_vars[:3],  # Discretize first 3 numeric vars
            q=5,
            return_object=True
        )
        X_discrete = discretizer.fit_transform(X)
        # Add discretized versions with a suffix
        for var in discretizer.variables:
            X[f'{var}_binned'] = X_discrete[var]

    # 6. One-hot encode categorical variables
    updated_categorical_vars = X.select_dtypes(include=['object']).columns.tolist()
    if updated_categorical_vars:
        ohe = OneHotEncoder(
            variables=updated_categorical_vars,
            drop_last=True
        )
        X = ohe.fit_transform(X)

    print(f"Feature engineering complete. New feature count: {len(X.columns)}")

    # Create a feature summary
    feature_summary = {
        'original_features': len(df.columns) - (1 if target_column else 0),
        'final_features': len(X.columns),
        'features_created': len(X.columns) - len(df.columns) + (1 if target_column else 0),
        'numeric_features': len(X.select_dtypes(include=['int64', 'float64']).columns),
        'categorical_features': len(X.select_dtypes(include=['object']).columns),
        'binary_features': len([col for col in X.columns if X[col].nunique() == 2])
    }

    return X, feature_summary

# Usage
X_engineered, summary = automated_feature_engineering(df, 'target_column')
print(f"Created {summary['features_created']} new features")
Real-world use case: An e-commerce company wants to improve their recommendation system by creating meaningful features from user behavior data, product attributes, and transaction history without manually coding hundreds of feature combinations.
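A sketch of closing the loop from engineered features to model lift, assuming a binary 'purchased' target column (hypothetical):

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

# Engineer features, then measure their value with a quick baseline model
X_engineered, summary = automated_feature_engineering(df, 'purchased')
y = df['purchased']

model = RandomForestClassifier(n_estimators=100, random_state=42)
scores = cross_val_score(model, X_engineered, y, cv=5)
print(f"CV accuracy with {summary['final_features']} features: {scores.mean():.3f}")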
Benefits of automation:
- Systematically explores feature combinations that humans might miss
- Applies domain-agnostic transformations consistently
- Scales feature engineering to large datasets
- Documents the feature creation process for reproducibility
7. Automated Hyperparameter Tuning with Optuna
Manual hyperparameter tuning is time-consuming and often suboptimal. This script uses advanced optimization algorithms to find the best parameters automatically.
import optuna
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
import pandas as pd

def automated_hyperparameter_tuning(X, y, model_type='random_forest', n_trials=100):
    def objective(trial):
        if model_type == 'random_forest':
            params = {
                'n_estimators': trial.suggest_int('n_estimators', 50, 300),
                'max_depth': trial.suggest_int('max_depth', 3, 20),
                'min_samples_split': trial.suggest_int('min_samples_split', 2, 20),
                'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 10),
                # 'auto' was removed in scikit-learn 1.3; None means use all features
                'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2', None]),
                'bootstrap': trial.suggest_categorical('bootstrap', [True, False])
            }
            model = RandomForestClassifier(**params, random_state=42, n_jobs=-1)
        elif model_type == 'xgboost':
            import xgboost as xgb
            params = {
                'n_estimators': trial.suggest_int('n_estimators', 50, 300),
                'max_depth': trial.suggest_int('max_depth', 3, 10),
                'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
                'subsample': trial.suggest_float('subsample', 0.6, 1.0),
                'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
                'reg_alpha': trial.suggest_float('reg_alpha', 0, 10),
                'reg_lambda': trial.suggest_float('reg_lambda', 0, 10)
            }
            model = xgb.XGBClassifier(**params, random_state=42, n_jobs=-1)

        # Perform cross-validation
        cv_scores = cross_val_score(model, X, y, cv=5, scoring='accuracy', n_jobs=-1)
        return cv_scores.mean()

    # Create the study and optimize
    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=n_trials, show_progress_bar=True)

    # Get the best parameters and score
    best_params = study.best_params
    best_score = study.best_value

    # Train the final model with the best parameters
    if model_type == 'random_forest':
        best_model = RandomForestClassifier(**best_params, random_state=42, n_jobs=-1)
    elif model_type == 'xgboost':
        import xgboost as xgb
        best_model = xgb.XGBClassifier(**best_params, random_state=42, n_jobs=-1)

    best_model.fit(X, y)

    # Generate the optimization history
    optimization_history = pd.DataFrame({
        'trial': range(len(study.trials)),
        'value': [trial.value for trial in study.trials],
        'params': [trial.params for trial in study.trials]
    })

    results = {
        'best_model': best_model,
        'best_params': best_params,
        'best_score': best_score,
        'study': study,
        'optimization_history': optimization_history
    }

    return results

# Usage
tuning_results = automated_hyperparameter_tuning(X_train, y_train, 'random_forest', n_trials=50)
print(f"Best cross-validation score: {tuning_results['best_score']:.4f}")
print(f"Best parameters: {tuning_results['best_params']}")

# Plot optimization history
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 6))
plt.plot(tuning_results['optimization_history']['trial'],
         tuning_results['optimization_history']['value'])
plt.xlabel('Trial')
plt.ylabel('Accuracy')
plt.title('Hyperparameter Optimization Progress')
plt.show()
Real-world use case: A machine learning team needs to optimize models for different client projects with varying datasets and requirements. This script automatically finds optimal parameters for each use case without manual experimentation.
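A sketch of running the tuner across several client datasets in one pass; the file paths and target names are illustrative:

import pandas as pd

client_jobs = {
    'client_a.csv': 'churned',     # hypothetical dataset/target pairs
    'client_b.csv': 'defaulted',
}

for path, target in client_jobs.items():
    data = pd.read_csv(path)
    X, y = data.drop(columns=[target]), data[target]
    results = automated_hyperparameter_tuning(X, y, 'random_forest', n_trials=50)
    print(f"{path}: best CV accuracy {results['best_score']:.4f}")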
Benefits of automation:
- Finds better parameters than manual tuning
- Saves weeks of manual experimentation
- Uses intelligent search algorithms instead of exhaustive grid search
- Provides optimization insights and visualizations
8. Model Evaluation Reports with Yellowbrick
Creating comprehensive model evaluation reports manually requires writing extensive plotting and analysis code. This script generates professional evaluation reports automatically.
from yellowbrick.classifier import ClassificationReport, ROCAUC, ConfusionMatrix
from yellowbrick.model_selection import ValidationCurve, LearningCurve
from yellowbrick.features import FeatureImportances
import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import train_test_split
import pandas as pd

def automated_model_evaluation(model, X, y, model_name="Model"):
    # Split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Create figure with subplots
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))
    fig.suptitle(f'{model_name} Evaluation Report', fontsize=16, fontweight='bold')

    # 1. Classification Report
    visualizer1 = ClassificationReport(model, ax=axes[0, 0], support=True)
    visualizer1.fit(X_train, y_train)
    visualizer1.score(X_test, y_test)
    visualizer1.finalize()

    # 2. ROC-AUC Curve
    visualizer2 = ROCAUC(model, ax=axes[0, 1])
    visualizer2.fit(X_train, y_train)
    visualizer2.score(X_test, y_test)
    visualizer2.finalize()

    # 3. Confusion Matrix
    visualizer3 = ConfusionMatrix(model, ax=axes[0, 2])
    visualizer3.fit(X_train, y_train)
    visualizer3.score(X_test, y_test)
    visualizer3.finalize()

    # 4. Feature Importance (if the model supports it)
    if hasattr(model, 'feature_importances_'):
        visualizer4 = FeatureImportances(model, ax=axes[1, 0])
        visualizer4.fit(X_train, y_train)
        visualizer4.finalize()
    else:
        axes[1, 0].text(0.5, 0.5, 'Feature Importance\nNot Available',
                        ha='center', va='center', transform=axes[1, 0].transAxes)

    # 5. Learning Curve
    visualizer5 = LearningCurve(model, ax=axes[1, 1], scoring='accuracy')
    visualizer5.fit(X, y)
    visualizer5.finalize()

    # 6. Validation Curve (example with max_depth for tree-based models)
    if hasattr(model, 'max_depth'):
        visualizer6 = ValidationCurve(
            model, ax=axes[1, 2], param_name='max_depth',
            param_range=np.arange(1, 11), scoring='accuracy'
        )
        visualizer6.fit(X, y)
        visualizer6.finalize()
    else:
        axes[1, 2].text(0.5, 0.5, 'Validation Curve\nNot Available',
                        ha='center', va='center', transform=axes[1, 2].transAxes)

    plt.tight_layout()
    plt.savefig(f'{model_name.lower().replace(" ", "_")}_evaluation_report.png',
                dpi=300, bbox_inches='tight')
    plt.show()

    # Generate a numerical summary
    model.fit(X_train, y_train)
    train_score = model.score(X_train, y_train)
    test_score = model.score(X_test, y_test)

    summary = {
        'model_name': model_name,
        'train_accuracy': train_score,
        'test_accuracy': test_score,
        'overfit_gap': train_score - test_score,
        'total_features': X.shape[1],
        'training_samples': X_train.shape[0],
        'test_samples': X_test.shape[0]
    }

    return summary

def compare_multiple_models(models_dict, X, y):
    """
    Compare multiple models and generate a comparison report
    """
    results = []

    for model_name, model in models_dict.items():
        print(f"Evaluating {model_name}...")
        summary = automated_model_evaluation(model, X, y, model_name)
        results.append(summary)

    # Create a comparison DataFrame
    comparison_df = pd.DataFrame(results)
    comparison_df = comparison_df.sort_values('test_accuracy', ascending=False)

    # Save the comparison report
    comparison_df.to_csv('model_comparison_report.csv', index=False)

    return comparison_df

# Usage
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression

models = {
    'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
    'SVM': SVC(random_state=42, probability=True),
    'Logistic Regression': LogisticRegression(random_state=42)
}

comparison_results = compare_multiple_models(models, X, y)
print("\nModel Comparison Results:")
print(comparison_results[['model_name', 'test_accuracy', 'overfit_gap']])
Real-world use case: A consulting firm needs to present model performance results to clients with clear visualizations and metrics. This script generates professional reports that can go directly into client presentations.
Benefits of automation:
- Creates publication-ready evaluation reports
- Ensures consistent evaluation metrics across projects
- Identifies overfitting and performance issues automatically
- Enables easy model comparison and selection
9. Automating Dataset Versioning with DVC
Data versioning is crucial for reproducible machine learning but is often neglected because of its complexity. This script automates dataset versioning and experiment tracking.
import pandas as pd
import os
from datetime import datetime
import hashlib
import json

class AutomatedDataVersioning:
    # Shells out to the dvc CLI rather than using the Python API
    def __init__(self, project_path="."):
        self.project_path = project_path
        self.data_dir = os.path.join(project_path, "data")
        self.dvc_dir = os.path.join(project_path, ".dvc")

        # Initialize directories
        os.makedirs(self.data_dir, exist_ok=True)

    def setup_dvc_project(self):
        """Initialize the DVC project if not already initialized"""
        try:
            if not os.path.exists(self.dvc_dir):
                os.system(f"cd {self.project_path} && dvc init")
                print("DVC project initialized")
            else:
                print("DVC project already exists")
        except Exception as e:
            print(f"Error initializing DVC: {e}")

    def add_dataset_version(self, dataframe, dataset_name, description=""):
        """Add a new version of a dataset with automated tracking"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Generate a data hash for uniqueness
        data_string = dataframe.to_string()
        data_hash = hashlib.md5(data_string.encode()).hexdigest()[:8]

        # Create a versioned filename
        filename = f"{dataset_name}_{timestamp}_{data_hash}.csv"
        filepath = os.path.join(self.data_dir, filename)

        # Save the dataset
        dataframe.to_csv(filepath, index=False)

        # Create metadata
        metadata = {
            'dataset_name': dataset_name,
            'timestamp': timestamp,
            'description': description,
            'shape': dataframe.shape,
            'columns': list(dataframe.columns),
            'data_hash': data_hash,
            'file_size': os.path.getsize(filepath),
            'missing_values': dataframe.isnull().sum().sum(),
            'dtypes': dataframe.dtypes.to_dict()
        }

        # Save metadata
        metadata_file = filepath.replace('.csv', '_metadata.json')
        with open(metadata_file, 'w') as f:
            json.dump(metadata, f, indent=2, default=str)

        # Add to DVC tracking
        try:
            os.system(f"cd {self.project_path} && dvc add {filepath}")
            print(f"Dataset version saved: {filename}")
            print(f"Shape: {dataframe.shape}, Hash: {data_hash}")
        except Exception as e:
            print(f"Error adding to DVC: {e}")

        return filepath, metadata

    def list_dataset_versions(self, dataset_name=None):
        """List all versions of datasets"""
        versions = []
        for file in os.listdir(self.data_dir):
            if file.endswith('_metadata.json'):
                with open(os.path.join(self.data_dir, file), 'r') as f:
                    metadata = json.load(f)
                if dataset_name is None or metadata['dataset_name'] == dataset_name:
                    versions.append(metadata)
        if not versions:
            return pd.DataFrame()
        return pd.DataFrame(versions).sort_values('timestamp', ascending=False)

    def load_dataset_version(self, dataset_name, version_hash=None):
        """Load a specific version of a dataset"""
        versions = self.list_dataset_versions(dataset_name)
        if version_hash:
            version = versions[versions['data_hash'] == version_hash]
        else:
            version = versions.head(1)  # Latest version
        if len(version) == 0:
            raise ValueError(f"Version not found for {dataset_name}")
        filename = f"{dataset_name}_{version.iloc[0]['timestamp']}_{version.iloc[0]['data_hash']}.csv"
        filepath = os.path.join(self.data_dir, filename)
        return pd.read_csv(filepath)

# Usage
versioning = AutomatedDataVersioning()
versioning.setup_dvc_project()

# Add a new dataset version
filepath, metadata = versioning.add_dataset_version(
    df,
    "customer_data",
    "Initial customer dataset with demographics"
)

# List all versions
versions = versioning.list_dataset_versions("customer_data")
print(versions[['dataset_name', 'timestamp', 'shape', 'data_hash']])

# Load a specific version
df_v1 = versioning.load_dataset_version("customer_data", version_hash="abc12345")
Real-world use case: A machine learning team working on a customer churn model needs to track different versions of their training data as new customer segments are added and features are engineered, ensuring they can reproduce any earlier model results.
Benefits of automation:
- Ensures reproducibility of machine learning experiments
- Tracks data lineage and changes automatically
- Prevents data loss and enables rollback capabilities
- Integrates with Git for full project versioning (see the sketch below)
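A minimal sketch of that Git integration, assuming the project is already a Git repository: dvc add writes a small .dvc pointer file next to each tracked dataset (and updates data/.gitignore), and committing the pointer rather than the data records the version in Git history.

import subprocess

def commit_data_version(filepath, message):
    # Git only needs the lightweight pointer file; the data itself
    # lives in the DVC cache
    subprocess.run(['git', 'add', f'{filepath}.dvc', 'data/.gitignore'], check=True)
    subprocess.run(['git', 'commit', '-m', message], check=True)

# filepath comes from versioning.add_dataset_version(...) above
commit_data_version(filepath, "Track new customer_data version")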
10. Scheduling & Monitoring Scripts with APScheduler
Data science workflows often need to run on schedules or respond to events. This script creates a robust scheduling system with monitoring and error handling.
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import pandas as pd
import logging
import smtplib
from email.mime.text import MIMEText
from datetime import datetime
import os

class AutomatedDataPipeline:
    def __init__(self, config):
        self.config = config
        self.scheduler = BackgroundScheduler()
        self.setup_logging()

    def setup_logging(self):
        """Set up logging for pipeline monitoring"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('pipeline.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def send_notification(self, subject, message, is_error=False):
        """Send an email notification on success or failure"""
        try:
            if 'email' in self.config:
                msg = MIMEText(message)
                msg['Subject'] = f"{'ERROR: ' if is_error else ''}{subject}"
                msg['From'] = self.config['email']['from']
                msg['To'] = self.config['email']['to']

                server = smtplib.SMTP(self.config['email']['smtp_server'], 587)
                server.starttls()
                server.login(self.config['email']['username'], self.config['email']['password'])
                server.send_message(msg)
                server.quit()

                self.logger.info(f"Notification sent: {subject}")
        except Exception as e:
            self.logger.error(f"Failed to send notification: {e}")

    def data_collection_job(self):
        """Automated data collection job"""
        try:
            self.logger.info("Starting data collection job")

            # Simulate data collection (replace with actual logic)
            data = pd.DataFrame({
                'timestamp': [datetime.now()],
                'records_collected': [1000],
                'status': ['success']
            })

            # Save collected data
            filename = f"collected_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            data.to_csv(filename, index=False)

            self.logger.info(f"Data collection completed: {filename}")
            self.send_notification("Data Collection Success", f"Collected {len(data)} records")

        except Exception as e:
            error_msg = f"Data collection failed: {str(e)}"
            self.logger.error(error_msg)
            self.send_notification("Data Collection Failed", error_msg, is_error=True)

    def model_training_job(self):
        """Automated model training job"""
        try:
            self.logger.info("Starting model training job")

            # Load the latest data
            data_files = [f for f in os.listdir('.') if f.startswith('collected_data_')]
            if not data_files:
                raise ValueError("No data files found for training")

            # Timestamped names sort lexicographically, so max() is the newest
            latest_file = max(data_files)
            df = pd.read_csv(latest_file)

            # Simulate model training (replace with actual logic)
            from sklearn.ensemble import RandomForestClassifier
            from sklearn.model_selection import cross_val_score

            # Dummy training process
            model = RandomForestClassifier(n_estimators=100)
            # X, y = prepare_features(df)  # Your feature preparation logic
            # scores = cross_val_score(model, X, y, cv=5)

            model_filename = f"model_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pkl"
            # joblib.dump(model, model_filename)

            self.logger.info(f"Model training completed: {model_filename}")
            self.send_notification("Model Training Success", f"Model saved as {model_filename}")

        except Exception as e:
            error_msg = f"Model training failed: {str(e)}"
            self.logger.error(error_msg)
            self.send_notification("Model Training Failed", error_msg, is_error=True)

    def data_quality_check(self):
        """Automated data quality monitoring"""
        try:
            self.logger.info("Starting data quality check")

            # Find the latest data file
            data_files = [f for f in os.listdir('.') if f.startswith('collected_data_')]
            if not data_files:
                raise ValueError("No data files found for quality check")

            latest_file = max(data_files)
            df = pd.read_csv(latest_file)

            # Perform quality checks
            quality_report = {
                'total_records': len(df),
                'missing_values': df.isnull().sum().sum(),
                'duplicate_records': df.duplicated().sum(),
                'missing_percentage': (df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100
            }

            # Enforce quality thresholds
            if quality_report['missing_percentage'] > 10:
                raise ValueError(f"High missing data: {quality_report['missing_percentage']:.2f}%")

            if quality_report['duplicate_records'] > len(df) * 0.05:
                raise ValueError(f"High duplicate rate: {quality_report['duplicate_records']} records")

            self.logger.info("Data quality check passed")
            self.send_notification("Data Quality Check", f"Quality metrics: {quality_report}")

        except Exception as e:
            error_msg = f"Data quality check failed: {str(e)}"
            self.logger.error(error_msg)
            self.send_notification("Data Quality Alert", error_msg, is_error=True)

    def setup_schedules(self):
        """Set up automated schedules for all jobs"""
        # Daily data collection at 2 AM
        self.scheduler.add_job(
            self.data_collection_job,
            CronTrigger(hour=2, minute=0),
            id='data_collection',
            replace_existing=True
        )

        # Weekly model training on Sundays at 3 AM
        self.scheduler.add_job(
            self.model_training_job,
            CronTrigger(day_of_week=6, hour=3, minute=0),
            id='model_training',
            replace_existing=True
        )

        # Hourly data quality checks
        self.scheduler.add_job(
            self.data_quality_check,
            CronTrigger(minute=0),
            id='quality_check',
            replace_existing=True
        )

        self.logger.info("All schedules configured")

    def start_pipeline(self):
        """Start the automated pipeline"""
        self.setup_schedules()
        self.scheduler.start()
        self.logger.info("Automated pipeline started")

        try:
            # Keep the script running
            import time
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            self.logger.info("Pipeline stopped by user")
            self.scheduler.shutdown()

# Configuration
config = {
    'email': {
        'smtp_server': 'smtp.gmail.com',
        'username': 'your_email@gmail.com',
        'password': 'your_app_password',
        'from': 'your_email@gmail.com',
        'to': 'alerts@company.com'
    }
}

# Usage
pipeline = AutomatedDataPipeline(config)

# Run jobs manually for testing
pipeline.data_collection_job()
pipeline.data_quality_check()

# Start the automated pipeline (runs continuously)
# pipeline.start_pipeline()
Real-world use case: An e-commerce company needs to update their recommendation models daily with new user behavior data, check data quality every hour, and retrain models weekly, all while monitoring for failures and sending alerts to the data team.
Benefits of automation:
- Ensures consistent execution of data pipelines
- Provides immediate alerts when issues occur
- Reduces manual monitoring and intervention
- Scales to complex multi-step workflows