Files
pi-skill/skills/git-split-push/scripts/batch-push.py
2026-05-25 16:41:08 +07:00

482 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Git Split Push — Batch Large Files into Smaller Commits
Detects push errors and automatically splits untracked/staged files into
smaller batches to push successfully to GitHub/Gitea/GitLab.
Usage:
    python3 batch-push.py                    # Auto-detect and batch
    python3 batch-push.py --max-size 20      # Max 20MB per batch
    python3 batch-push.py --dry-run          # Show what would happen
    python3 batch-push.py --untracked-only   # Only untracked files
    python3 batch-push.py --check-size       # Just check total size
"""
import os
import sys
import subprocess
import re
from pathlib import Path
from dataclasses import dataclass
from typing import Optional
@dataclass
class FileInfo:
    """A single pending file together with its on-disk size and git state."""

    # Path of the file as reported by git.
    path: str
    # Size of the file in bytes.
    size: int
    # True when the file was reported as staged.
    staged: bool
    # True when the file was reported as untracked.
    untracked: bool

    @property
    def size_mb(self) -> float:
        """Return the file size converted from bytes to mebibytes."""
        bytes_per_mb = 1 << 20
        return self.size / bytes_per_mb
class GitSplitPush:
    """Split large git pushes into smaller batches.

    Pending files (staged, untracked and modified) are grouped into batches
    whose combined working-tree size stays at or below ``max_size_mb``; each
    batch is then staged, committed and pushed on its own.
    """

    DEFAULT_MAX_SIZE_MB = 20  # Conservative limit (50MB server limit)

    # Repository root, resolved lazily by get_git_root() and cached afterwards.
    _git_root: Optional[str] = None

    def __init__(self, max_size_mb: Optional[float] = None, dry_run: bool = False, verbose: bool = True):
        """Store the batching options.

        Args:
            max_size_mb: Maximum batch size in MB; a falsy value selects
                ``DEFAULT_MAX_SIZE_MB``.
            dry_run: When true, ``run_split_push`` only prints the plan.
            verbose: Stored on the instance; not consulted by this class.
        """
        self.max_size_mb = max_size_mb or self.DEFAULT_MAX_SIZE_MB
        self.dry_run = dry_run
        self.verbose = verbose
        self.errors = []
        self.pushed_batches = []

    def run(self, command: list, capture: bool = True) -> tuple[int, str, str]:
        """Run a command from the repository root.

        Returns:
            ``(exit_code, stdout, stderr)``. The two strings are empty when
            ``capture`` is false. If the command cannot be started at all,
            ``(1, "", <error text>)`` is returned instead of raising.
        """
        try:
            result = subprocess.run(
                command,
                capture_output=capture,
                text=True,
                cwd=self.get_git_root(),
            )
        except (OSError, ValueError, TypeError, subprocess.SubprocessError) as e:
            return 1, "", str(e)
        if capture:
            return result.returncode, result.stdout, result.stderr
        return result.returncode, "", ""

    def get_git_root(self) -> Optional[str]:
        """Return the repository root, or the current directory as a fallback.

        This calls ``subprocess`` directly rather than ``self.run``: ``run``
        needs the root for its ``cwd``, so going through it would recurse
        without end. A successfully resolved root is cached on the instance.
        """
        if self._git_root is not None:
            return self._git_root
        try:
            result = subprocess.run(
                ["git", "rev-parse", "--show-toplevel"],
                capture_output=True,
                text=True,
            )
        except (OSError, ValueError):
            # git is missing or its output is undecodable; use the fallback.
            return os.getcwd()
        root = result.stdout.strip()
        if result.returncode == 0 and root:
            self._git_root = root
            return root
        return os.getcwd()

    def is_git_repo(self) -> bool:
        """Return True when ``git status`` succeeds."""
        code, _, _ = self.run(["git", "status"])
        return code == 0

    def get_current_branch(self) -> Optional[str]:
        """Return the current branch name, or None if it cannot be determined."""
        code, out, _ = self.run(["git", "branch", "--show-current"])
        if code == 0 and out.strip():
            return out.strip()
        # Fall back to rev-parse, e.g. for a detached HEAD.
        code, out, _ = self.run(["git", "rev-parse", "--abbrev-ref", "HEAD"])
        if code == 0:
            return out.strip()
        return None

    def _list_paths(self, command: list) -> list[str]:
        """Run a git listing command that emits NUL-separated paths."""
        code, out, _ = self.run(command)
        if code != 0:
            return []
        return [p for p in out.split("\0") if p]

    def get_status_files(self) -> list["FileInfo"]:
        """Collect staged, untracked and modified files, each listed once.

        Paths are read with ``-z`` so that names git would otherwise quote
        (non-ASCII or special characters) come back verbatim. Staged renames
        are listed as a separate deletion and addition (``--no-renames``)
        because batches are later committed per path.
        """
        staged = self._list_paths(
            ["git", "diff", "--cached", "--name-only", "--no-renames", "-z"]
        )
        untracked = self._list_paths(
            ["git", "ls-files", "--others", "--exclude-standard", "-z"]
        )
        modified = self._list_paths(["git", "diff", "--name-only", "-z"])

        git_root = self.get_git_root()
        files = []
        seen = set()
        # Precedence when a path appears in several lists:
        # staged, then untracked, then modified.
        groups = (
            (staged, True, False),
            (untracked, False, True),
            (modified, False, False),
        )
        for paths, is_staged, is_untracked in groups:
            for rel in paths:
                if rel in seen:
                    continue
                seen.add(rel)
                full_path = os.path.join(git_root, rel) if git_root else rel
                files.append(
                    FileInfo(
                        path=rel,
                        size=self.get_file_size(full_path),
                        staged=is_staged,
                        untracked=is_untracked,
                    )
                )
        return files

    def get_file_size(self, path: str) -> int:
        """Return the size of a regular file in bytes, or 0 if unavailable."""
        try:
            if os.path.isfile(path):
                return os.path.getsize(path)
        except (OSError, ValueError):
            pass
        return 0

    def get_total_size(self, files: list["FileInfo"]) -> float:
        """Return the combined size of ``files`` in MB."""
        return sum(f.size_mb for f in files)

    def check_push_size(self, files: list["FileInfo"]) -> dict:
        """Summarise ``files``: count, total size, batches needed, oversized files."""
        return {
            "total_files": len(files),
            "total_size_mb": self.get_total_size(files),
            "batches_needed": self.calculate_batches(files),
            "max_size_mb": self.max_size_mb,
            "files_too_large": [f for f in files if f.size_mb > self.max_size_mb],
        }

    def calculate_batches(self, files: list["FileInfo"]) -> int:
        """Return how many batches ``split_into_batches`` produces for ``files``."""
        return len(self.split_into_batches(files))

    def split_into_batches(self, files: list["FileInfo"]) -> list[list["FileInfo"]]:
        """Group ``files`` into batches no larger than ``max_size_mb``.

        Files are taken largest first. A file that alone exceeds the limit
        gets a single-file batch (which ``run_split_push`` later skips).
        """
        batches = []
        current_batch = []
        current_size = 0.0
        for f in sorted(files, key=lambda item: item.size, reverse=True):
            if f.size_mb > self.max_size_mb:
                if current_batch:
                    batches.append(current_batch)
                    current_batch = []
                    current_size = 0.0
                batches.append([f])
            elif current_batch and current_size + f.size_mb > self.max_size_mb:
                batches.append(current_batch)
                current_batch = [f]
                current_size = f.size_mb
            else:
                current_batch.append(f)
                current_size += f.size_mb
        if current_batch:
            batches.append(current_batch)
        return batches

    def stage_files(self, files: list["FileInfo"]) -> tuple[bool, str, str]:
        """Stage ``files`` with ``git add``; return ``(ok, stdout, stderr)``.

        A path that is already staged and no longer exists on disk is left
        out: its removal is already in the index, and ``git add`` would
        reject it because the pathspec matches nothing.
        """
        git_root = self.get_git_root()
        paths = []
        for f in files:
            full_path = os.path.join(git_root, f.path) if git_root else f.path
            if f.staged and not os.path.lexists(full_path):
                continue
            paths.append(f.path)
        if not paths:
            return True, "", ""
        code, out, err = self.run(["git", "add", "--"] + paths)
        return code == 0, out, err

    def commit_batch(self, batch_num: int, total_batches: int, files: Optional[list] = None) -> bool:
        """Create the commit for one batch.

        Args:
            batch_num: 1-based batch number used in the commit message.
            total_batches: Total number of batches, used in the message.
            files: When given, only these paths are committed, so content
                staged for other batches stays out of this commit. When
                omitted, the whole index is committed.

        Returns:
            True when ``git commit`` exits with status 0.
        """
        message = f"[split-push] Batch {batch_num}/{total_batches}"
        command = ["git", "commit", "-m", message]
        if files:
            command += ["--"] + [f.path for f in files]
        code, _, _ = self.run(command)
        return code == 0

    def push_batch(self, branch: str = None) -> tuple[bool, str, str]:
        """Push ``branch`` (default: the current branch) to ``origin``."""
        branch = branch or self.get_current_branch()
        if not branch:
            return False, "", "Could not determine current branch"
        code, out, err = self.run(["git", "push", "origin", branch])
        return code == 0, out, err

    def check_remaining(self) -> list["FileInfo"]:
        """Return the files that are still uncommitted."""
        return self.get_status_files()

    def display_summary(self, summary: dict):
        """Print the result dictionary built by ``run_split_push``."""
        print("\n" + "=" * 60)
        print("Git Split Push — Summary")
        print("=" * 60)
        print(f"\nTotal files to push: {summary['total_files']}")
        print(f"Total size: {summary['total_size_mb']:.2f} MB")
        print(f"Batch size limit: {self.max_size_mb} MB")
        print(f"Batches created: {summary['batches_pushed']}")
        print(f"Batches failed: {summary['batches_failed']}")
        if summary['files_too_large']:
            print(f"\n⚠️ Files exceeding {self.max_size_mb}MB (skipped):")
            for f in summary['files_too_large']:
                print(f" - {f.path} ({f.size_mb:.2f} MB)")
            print(f"\n💡 To push these files, either:")
            print(" 1. Split the file manually (e.g., split --bytes=40M large.zip)")
            print(" 2. Remove it from git history")
            print(" 3. Use Git LFS (requires server support)")
        print("\n" + "=" * 60)

    def _print_dry_run(self, batches: list) -> None:
        """Print the planned batches without touching the repository."""
        print("\n🔍 Dry run - showing what would happen:\n")
        for i, batch in enumerate(batches):
            batch_size = sum(f.size_mb for f in batch)
            too_large = any(f.size_mb > self.max_size_mb for f in batch)
            status = "⚠️ TOO LARGE" if too_large else ""
            print(f"Batch {i+1}: {status} ({batch_size:.2f} MB)")
            for f in batch:
                print(f" - {f.path} ({f.size_mb:.2f} MB)")
            print()

    def run_split_push(self, untracked_only: bool = False) -> dict:
        """Main entry point: batch the pending files and push each batch.

        Args:
            untracked_only: Restrict the operation to untracked files.

        Returns:
            A result dictionary. It always has ``"success"``; early exits add
            ``"error"`` or ``"message"``, a dry run adds ``"dry_run"`` and
            ``"batches"``, and a real run adds the per-batch counters that
            ``display_summary`` prints.
        """
        if not self.is_git_repo():
            return {"success": False, "error": "Not in a git repository"}
        branch = self.get_current_branch()
        if not branch:
            return {"success": False, "error": "Could not determine current branch"}

        files = self.get_status_files()
        if untracked_only:
            files = [f for f in files if f.untracked]
        if not files:
            return {"success": True, "message": "Nothing to push", "batches_pushed": 0}

        check = self.check_push_size(files)
        # Abort as soon as any file is above 100MB, as the message states.
        if any(f.size_mb > 100 for f in check['files_too_large']):
            return {
                "success": False,
                "error": "Some files exceed 100MB. Use Git LFS or split manually.",
                "files_too_large": check['files_too_large']
            }

        batches = self.split_into_batches(files)
        if self.dry_run:
            self._print_dry_run(batches)
            return {"success": True, "dry_run": True, "batches": len(batches)}

        success_count = 0
        fail_count = 0
        files_too_large = []
        print(f"\n🚀 Starting split push ({len(batches)} batches, max {self.max_size_mb} MB each):\n")
        for i, batch in enumerate(batches, 1):
            batch_size = sum(f.size_mb for f in batch)
            oversized = [f for f in batch if f.size_mb > self.max_size_mb]
            if oversized:
                files_too_large.extend(oversized)
                print(f"⚠️ Batch {i}/{len(batches)} SKIPPED (file too large)")
                for f in oversized:
                    print(f" - {f.path} ({f.size_mb:.2f} MB)")
                fail_count += 1
                continue
            print(f"📦 Batch {i}/{len(batches)}: {batch_size:.2f} MB ({len(batch)} files)")

            staged_ok, _, _ = self.stage_files(batch)
            if not staged_ok:
                print(f" ❌ Failed to stage files")
                fail_count += 1
                continue

            # Commit only this batch's paths so other staged content
            # cannot inflate the commit beyond the batch limit.
            if not self.commit_batch(i, len(batches), batch):
                print(f" ❌ Failed to commit batch")
                fail_count += 1
                continue

            push_ok, _, push_err = self.push_batch(branch)
            if push_ok:
                print(f" ✅ Pushed batch {i}")
                success_count += 1
                self.pushed_batches.append(i)
            else:
                print(f" ❌ Push failed: {push_err[:200] if push_err else 'Unknown error'}")
                # Undo the commit that could not be pushed; its files stay staged.
                self.run(["git", "reset", "--soft", "HEAD~1"])
                fail_count += 1

        summary = {
            "success": fail_count == 0,
            "total_files": len(files),
            "total_size_mb": check['total_size_mb'],
            "batches_pushed": success_count,
            "batches_failed": fail_count,
            "batches_total": len(batches),
            "files_too_large": files_too_large
        }
        self.display_summary(summary)
        return summary
def detect_push_error(error_output: str = None) -> bool:
    """Return True if ``error_output`` looks like a push-size failure.

    Args:
        error_output: Text to inspect. When None, ``git push`` is run in the
            current directory and its combined stdout and stderr are
            inspected instead.

    Returns:
        True when any known error pattern occurs in the text
        (case-insensitive); False otherwise, including when ``git`` could
        not be started.
    """
    if error_output is None:
        try:
            result = subprocess.run(
                ["git", "push"],
                capture_output=True,
                text=True
            )
        except OSError:
            return False
        # subprocess.run returns a CompletedProcess, not a tuple.
        error_output = (result.stdout or "") + (result.stderr or "")
    error_patterns = [
        "pack exceeds maximum allowed size",
        "remote end hung up unexpectedly",
        "fatal: protocol error",
        "RPC failed; HTTP 413",
        "413 Request Entity Too Large",
        "error: packfile is too large",
    ]
    haystack = error_output.lower()
    return any(pattern.lower() in haystack for pattern in error_patterns)
def main():
    """Command-line entry point.

    Parses the options, then either prints a size analysis (``--check-size``)
    or runs the split push. The process exits with status 1 when the run
    reports an error or when any batch failed.
    """
    import argparse
    parser = argparse.ArgumentParser(
        description="Split large git pushes into smaller batches",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s # Auto-detect and batch all changes
%(prog)s --max-size 30 # Use 30MB max per batch
%(prog)s --dry-run # Show what would happen
%(prog)s --untracked-only # Only push untracked files
%(prog)s --check-size # Just check total size and exit
This tool helps when pushing to GitHub/Gitea/GitLab fails with:
"fatal: the remote end hung up unexpectedly"
"pack exceeds maximum allowed size"
"""
    )
    parser.add_argument(
        "--max-size", type=float, default=None,
        help="Maximum size per batch in MB (default: 20)"
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Show what would happen without making changes"
    )
    parser.add_argument(
        "--untracked-only", action="store_true",
        help="Only process untracked files (ignore staged changes)"
    )
    parser.add_argument(
        "--check-size", action="store_true",
        help="Just check total size and exit"
    )
    # NOTE: store_true with default=True means this flag is always on.
    parser.add_argument(
        "--verbose", action="store_true", default=True,
        help="Show detailed output (default: on)"
    )
    args = parser.parse_args()

    split_push = GitSplitPush(
        max_size_mb=args.max_size,
        dry_run=args.dry_run,
        verbose=args.verbose
    )

    if args.check_size:
        files = split_push.get_status_files()
        check = split_push.check_push_size(files)
        print(f"\n📊 Push Size Analysis:")
        print(f" Total files: {check['total_files']}")
        print(f" Total size: {check['total_size_mb']:.2f} MB")
        print(f" Batches needed (at {check['max_size_mb']}MB): {check['batches_needed']}")
        if check['files_too_large']:
            print(f"\n ⚠️ Files too large for a single batch:")
            for f in check['files_too_large']:
                print(f" - {f.path}: {f.size_mb:.2f} MB")
        return

    result = split_push.run_split_push(untracked_only=args.untracked_only)
    if result.get("dry_run"):
        return
    if not result.get("success") and result.get("error"):
        print(f"\n{result['error']}")
        sys.exit(1)

    pushed = result.get("batches_pushed", 0)
    failed = result.get("batches_failed", 0)
    if failed > 0:
        # Report any failure, even when no batch was pushed at all,
        # and signal it through the exit status.
        print("\n⚠️ Some batches failed. Check the summary above.")
        sys.exit(1)
    if pushed > 0:
        print("\n✅ All batches pushed successfully!")


if __name__ == "__main__":
    main()