Initial: pi-skill — 68 skills, 43 extensions, 11 themes for Pi
This commit is contained in:
482
skills/git-split-push/scripts/batch-push.py
Normal file
482
skills/git-split-push/scripts/batch-push.py
Normal file
@@ -0,0 +1,482 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Git Split Push — Batch Large Files into Smaller Commits
|
||||
|
||||
Splits untracked, staged, and modified files into smaller commits and pushes
each commit separately, so that a push which would be rejected for its size
can succeed on GitHub/Gitea/GitLab.
|
||||
|
||||
Usage:
|
||||
python3 batch-push.py # Auto-detect and batch
|
||||
python3 batch-push.py --max-size 20 # Max 20MB per batch
|
||||
python3 batch-push.py --dry-run # Show what would happen
|
||||
python3 batch-push.py --untracked-only # Only untracked files
|
||||
python3 batch-push.py --check-size # Just check total size
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
import re
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@dataclass
class FileInfo:
    """One pending path together with its on-disk size and git state."""

    # Path as reported by git.
    path: str
    # Size in bytes.
    size: int
    # True when the path came from the staged (index) listing.
    staged: bool
    # True when the path came from the untracked listing.
    untracked: bool

    @property
    def size_mb(self) -> float:
        """Return ``size`` converted from bytes to mebibytes."""
        return self.size / (1 << 20)
|
||||
|
||||
|
||||
class GitSplitPush:
    """Split pending changes into size-limited commits and push each one.

    Files are grouped into batches whose combined on-disk size stays at or
    below ``max_size_mb``. Each batch is staged, committed and pushed on its
    own. A file that is larger than ``max_size_mb`` by itself is placed in a
    single-file batch, which ``run_split_push`` reports and skips.
    """

    DEFAULT_MAX_SIZE_MB = 20  # Conservative limit (50MB server limit)
    HARD_LIMIT_MB = 100  # A file above this size aborts the whole run

    def __init__(self, max_size_mb: Optional[float] = None, dry_run: bool = False, verbose: bool = True):
        """Configure a split-push run.

        Args:
            max_size_mb: Maximum combined size of one batch in MB. ``None``,
                zero or a negative value selects ``DEFAULT_MAX_SIZE_MB``.
            dry_run: When true, ``run_split_push`` only prints the plan.
            verbose: Stored on the instance; not consulted by this class.
        """
        if max_size_mb is None or max_size_mb <= 0:
            max_size_mb = self.DEFAULT_MAX_SIZE_MB
        self.max_size_mb = max_size_mb
        self.dry_run = dry_run
        self.verbose = verbose
        self.errors = []
        self.pushed_batches = []
        # Resolved lazily by get_git_root() and reused for every command.
        self._git_root = None

    def run(self, command: list, capture: bool = True) -> tuple[int, str, str]:
        """Run ``command`` from the repository root.

        Args:
            command: Argument vector passed to ``subprocess.run`` (no shell).
            capture: When true, stdout/stderr are captured and returned as
                text; otherwise they go to the terminal and empty strings
                are returned.

        Returns:
            ``(exit_code, stdout, stderr)``. If the process cannot be started
            or its output cannot be decoded, ``(1, "", <error message>)``.
        """
        try:
            result = subprocess.run(
                command,
                capture_output=capture,
                text=True,
                cwd=self.get_git_root(),
            )
        except (OSError, ValueError, TypeError, subprocess.SubprocessError) as exc:
            return 1, "", str(exc)
        if capture:
            return result.returncode, result.stdout, result.stderr
        return result.returncode, "", ""

    def get_git_root(self) -> Optional[str]:
        """Return the repository top-level directory, or the cwd as a fallback.

        This deliberately calls ``subprocess`` directly instead of
        ``self.run``: ``run`` needs the root for its ``cwd``, so going through
        it here would recurse without bound. A successfully resolved root is
        cached; the cwd fallback is not.
        """
        cached = getattr(self, "_git_root", None)
        if cached:
            return cached
        try:
            result = subprocess.run(
                ["git", "rev-parse", "--show-toplevel"],
                capture_output=True,
                text=True,
            )
        except (OSError, ValueError):
            return os.getcwd()
        root = result.stdout.strip()
        if result.returncode == 0 and root:
            self._git_root = root
            return root
        return os.getcwd()

    def is_git_repo(self) -> bool:
        """Return True when the current directory is inside a git work tree."""
        code, out, _ = self.run(["git", "rev-parse", "--is-inside-work-tree"])
        return code == 0 and out.strip() == "true"

    def get_current_branch(self) -> Optional[str]:
        """Return the current branch name, or None if git cannot report one.

        Falls back to ``git rev-parse --abbrev-ref HEAD`` when
        ``git branch --show-current`` prints nothing (detached HEAD).
        """
        code, out, _ = self.run(["git", "branch", "--show-current"])
        if code == 0 and out.strip():
            return out.strip()
        # Try with rev-parse for detached HEAD
        code, out, _ = self.run(["git", "rev-parse", "--abbrev-ref", "HEAD"])
        if code == 0:
            return out.strip()
        return None

    def _git_path_list(self, args: list) -> list:
        """Run a path-listing git command with ``-z`` and return its paths.

        NUL-separated output is used so that git does not quote or escape
        unusual file names; a failing command yields an empty list.
        """
        code, out, _ = self.run(["git"] + list(args) + ["-z"])
        if code != 0:
            return []
        return [path for path in out.split("\0") if path]

    def get_status_files(self) -> list[FileInfo]:
        """Collect staged, untracked and modified-but-unstaged paths.

        Each path appears once. When a path shows up in more than one
        listing, the first of staged, untracked, modified wins.
        """
        staged = self._git_path_list(["diff", "--cached", "--name-only"])
        untracked = self._git_path_list(["ls-files", "--others", "--exclude-standard"])
        modified = self._git_path_list(["diff", "--name-only"])

        git_root = self.get_git_root()
        groups = (
            (staged, True, False),
            (untracked, False, True),
            (modified, False, False),
        )

        files = []
        seen = set()  # O(1) de-duplication across the three listings
        for paths, is_staged, is_untracked in groups:
            for rel_path in paths:
                if rel_path in seen:
                    continue
                seen.add(rel_path)
                full_path = os.path.join(git_root, rel_path) if git_root else rel_path
                files.append(
                    FileInfo(
                        path=rel_path,
                        size=self.get_file_size(full_path),
                        staged=is_staged,
                        untracked=is_untracked,
                    )
                )
        return files

    def get_file_size(self, path: str) -> int:
        """Return the size of ``path`` in bytes, or 0 if it is not a readable file."""
        try:
            if os.path.isfile(path):
                return os.path.getsize(path)
        except OSError:
            pass
        return 0

    def get_total_size(self, files: list[FileInfo]) -> float:
        """Return the combined size of ``files`` in MB."""
        return sum(f.size_mb for f in files)

    def check_push_size(self, files: list[FileInfo]) -> dict:
        """Summarise ``files``: counts, total size, batch count, oversized files.

        Returns:
            A dict with keys ``total_files``, ``total_size_mb``,
            ``batches_needed``, ``max_size_mb`` and ``files_too_large``.
        """
        return {
            "total_files": len(files),
            "total_size_mb": self.get_total_size(files),
            "batches_needed": self.calculate_batches(files),
            "max_size_mb": self.max_size_mb,
            "files_too_large": [f for f in files if f.size_mb > self.max_size_mb],
        }

    def calculate_batches(self, files: list[FileInfo]) -> int:
        """Return how many batches ``split_into_batches`` produces for ``files``."""
        return len(self.split_into_batches(files))

    def split_into_batches(self, files: list[FileInfo]) -> list[list[FileInfo]]:
        """Group ``files`` into batches no larger than ``max_size_mb``.

        Files are visited largest first. A file is added to the batch being
        filled while it fits; otherwise that batch is closed and a new one is
        started. A file that exceeds the limit on its own closes the batch
        being filled and becomes a single-file batch.
        """
        batches = []
        current_batch = []
        current_size = 0

        # Sort by size (largest first)
        for f in sorted(files, key=lambda item: item.size, reverse=True):
            if f.size_mb > self.max_size_mb:
                if current_batch:
                    batches.append(current_batch)
                    current_batch = []
                    current_size = 0
                # Single-file batch; run_split_push reports and skips it.
                batches.append([f])
            elif current_size + f.size_mb > self.max_size_mb:
                batches.append(current_batch)
                current_batch = [f]
                current_size = f.size_mb
            else:
                current_batch.append(f)
                current_size += f.size_mb

        if current_batch:
            batches.append(current_batch)

        return batches

    def stage_files(self, files: list[FileInfo]) -> tuple[bool, str, str]:
        """Stage ``files`` with ``git add``.

        Returns:
            ``(ok, stdout, stderr)``. An empty ``files`` list is a no-op that
            reports success.
        """
        paths = [f.path for f in files]
        if not paths:
            return True, "", ""
        code, out, err = self.run(["git", "add", "--"] + paths)
        return code == 0, out, err

    def commit_batch(self, batch_num: int, total_batches: int, files: Optional[list] = None) -> bool:
        """Create the commit for one batch.

        Args:
            batch_num: 1-based number of this batch, used in the message.
            total_batches: Total number of batches, used in the message.
            files: Files belonging to this batch. When given, the commit is
                limited to these paths so that anything else sitting in the
                index (changes staged earlier, or left staged by a failed
                batch) is not swept into this commit. When omitted, the
                whole index is committed.

        Returns:
            True if ``git commit`` exited with status 0.
        """
        message = f"[split-push] Batch {batch_num}/{total_batches}"
        command = ["git", "commit", "-m", message]
        if files:
            command += ["--"] + [f.path for f in files]
        code, _, _ = self.run(command)
        return code == 0

    def push_batch(self, branch: str = None) -> tuple[bool, str, str]:
        """Push ``branch`` (default: the current branch) to ``origin``.

        Returns:
            ``(ok, stdout, stderr)``. If no branch can be determined, nothing
            is pushed and ``ok`` is False.
        """
        if not branch:
            branch = self.get_current_branch()
        if not branch:
            return False, "", "Could not determine current branch"
        code, out, err = self.run(["git", "push", "origin", branch])
        return code == 0, out, err

    def check_remaining(self) -> list[FileInfo]:
        """Return the files that are still uncommitted."""
        return self.get_status_files()

    def display_summary(self, summary: dict):
        """Print the end-of-run report.

        ``summary`` must contain ``total_files``, ``total_size_mb``,
        ``batches_pushed``, ``batches_failed`` and ``files_too_large``.
        """
        print("\n" + "=" * 60)
        print("Git Split Push — Summary")
        print("=" * 60)

        print(f"\nTotal files to push: {summary['total_files']}")
        print(f"Total size: {summary['total_size_mb']:.2f} MB")
        print(f"Batch size limit: {self.max_size_mb} MB")
        print(f"Batches created: {summary['batches_pushed']}")
        print(f"Batches failed: {summary['batches_failed']}")

        if summary['files_too_large']:
            print(f"\n⚠️ Files exceeding {self.max_size_mb}MB (skipped):")
            for f in summary['files_too_large']:
                print(f" - {f.path} ({f.size_mb:.2f} MB)")

            print("\n💡 To push these files, either:")
            print(" 1. Split the file manually (e.g., split --bytes=40M large.zip)")
            print(" 2. Remove it from git history")
            print(" 3. Use Git LFS (requires server support)")

        print("\n" + "=" * 60)

    def run_split_push(self, untracked_only: bool = False) -> dict:
        """Batch the pending files and commit + push each batch.

        Args:
            untracked_only: Restrict the run to untracked files.

        Returns:
            A dict that always has ``success``. Early exits add ``error`` or
            ``message``; a dry run adds ``dry_run`` and ``batches``; a real
            run adds the counters consumed by ``display_summary``.
        """
        if not self.is_git_repo():
            return {"success": False, "error": "Not in a git repository"}

        branch = self.get_current_branch()
        if not branch:
            return {"success": False, "error": "Could not determine current branch"}

        files = self.get_status_files()
        if untracked_only:
            files = [f for f in files if f.untracked]

        if not files:
            return {"success": True, "message": "Nothing to push", "batches_pushed": 0}

        check = self.check_push_size(files)

        # Refuse to start if ANY oversized file is beyond the hard limit; the
        # error text says "Some files", so one such file is enough.
        if any(f.size_mb > self.HARD_LIMIT_MB for f in check['files_too_large']):
            return {
                "success": False,
                "error": f"Some files exceed {self.HARD_LIMIT_MB}MB. Use Git LFS or split manually.",
                "files_too_large": check['files_too_large'],
            }

        batches = self.split_into_batches(files)
        total = len(batches)

        if self.dry_run:
            print("\n🔍 Dry run - showing what would happen:\n")
            for i, batch in enumerate(batches):
                batch_size = sum(f.size_mb for f in batch)
                too_large = any(f.size_mb > self.max_size_mb for f in batch)
                status = "⚠️ TOO LARGE" if too_large else "✓"
                print(f"Batch {i+1}: {status} ({batch_size:.2f} MB)")
                for f in batch:
                    print(f" - {f.path} ({f.size_mb:.2f} MB)")
                print()
            return {"success": True, "dry_run": True, "batches": total}

        success_count = 0
        fail_count = 0
        files_too_large = []

        print(f"\n🚀 Starting split push ({total} batches, max {self.max_size_mb} MB each):\n")

        for i, batch in enumerate(batches, 1):
            batch_size = sum(f.size_mb for f in batch)

            # Oversized single-file batches are reported, never pushed.
            oversized = [f for f in batch if f.size_mb > self.max_size_mb]
            if oversized:
                files_too_large.extend(oversized)
                print(f"⚠️ Batch {i}/{total} SKIPPED (file too large)")
                for f in oversized:
                    print(f" - {f.path} ({f.size_mb:.2f} MB)")
                fail_count += 1
                continue

            print(f"📦 Batch {i}/{total}: {batch_size:.2f} MB ({len(batch)} files)")

            staged_ok, _, _ = self.stage_files(batch)
            if not staged_ok:
                print(" ❌ Failed to stage files")
                fail_count += 1
                continue

            # Commit only this batch's paths so the commit cannot grow past
            # the size that was planned for it.
            if not self.commit_batch(i, total, batch):
                print(" ❌ Failed to commit batch")
                fail_count += 1
                continue

            push_ok, _, push_err = self.push_batch(branch)
            if push_ok:
                print(f" ✅ Pushed batch {i}")
                success_count += 1
                self.pushed_batches.append(i)
            else:
                print(f" ❌ Push failed: {push_err[:200] if push_err else 'Unknown error'}")
                # Drop the commit that could not be pushed; its files stay staged.
                self.run(["git", "reset", "--soft", "HEAD~1"])
                fail_count += 1

        summary = {
            "success": fail_count == 0,
            "total_files": len(files),
            "total_size_mb": check['total_size_mb'],
            "batches_pushed": success_count,
            "batches_failed": fail_count,
            "batches_total": total,
            "files_too_large": files_too_large,
        }

        self.display_summary(summary)

        return summary
|
||||
|
||||
|
||||
def detect_push_error(error_output: str = None) -> bool:
    """Return True if ``error_output`` looks like a push rejected for its size.

    Args:
        error_output: Combined stdout/stderr text of a ``git push``. When
            omitted, this function runs ``git push`` itself (a real push,
            with its normal side effects) and inspects that output.

    Returns:
        True when any known error phrase occurs in the text, compared
        case-insensitively. False otherwise, including when ``git`` could
        not be started.

    Note:
        Some phrases (e.g. "remote end hung up unexpectedly") can also be
        produced by failures unrelated to size, so a True result is a hint
        rather than proof.
    """
    if error_output is None:
        try:
            # subprocess.run returns a CompletedProcess, not a tuple.
            result = subprocess.run(
                ["git", "push"],
                capture_output=True,
                text=True,
            )
        except OSError:
            return False
        error_output = (result.stdout or "") + (result.stderr or "")

    error_patterns = (
        "pack exceeds maximum allowed size",
        "remote end hung up unexpectedly",
        "fatal: protocol error",
        "RPC failed; HTTP 413",
        "413 Request Entity Too Large",
        "error: packfile is too large",
    )

    haystack = error_output.lower()
    return any(pattern.lower() in haystack for pattern in error_patterns)
|
||||
|
||||
|
||||
def main():
    """Command-line entry point.

    Parses the options, then either prints a size analysis (``--check-size``)
    or runs the split push. Exits with status 1 when the run reports an error
    or when any batch failed; returns normally otherwise.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Split large git pushes into smaller batches",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s                    # Auto-detect and batch all changes
  %(prog)s --max-size 30      # Use 30MB max per batch
  %(prog)s --dry-run          # Show what would happen
  %(prog)s --untracked-only   # Only push untracked files
  %(prog)s --check-size       # Just check total size and exit

This tool helps when pushing to GitHub/Gitea/GitLab fails with:
  "fatal: the remote end hung up unexpectedly"
  "pack exceeds maximum allowed size"
"""
    )

    parser.add_argument(
        "--max-size", type=float, default=None,
        help="Maximum size per batch in MB (default: 20)"
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Show what would happen without making changes"
    )
    parser.add_argument(
        "--untracked-only", action="store_true",
        help="Only process untracked files (ignore staged changes)"
    )
    parser.add_argument(
        "--check-size", action="store_true",
        help="Just check total size and exit"
    )
    parser.add_argument(
        "--verbose", action="store_true", default=True,
        help="Show detailed output (default: on)"
    )

    args = parser.parse_args()

    split_push = GitSplitPush(
        max_size_mb=args.max_size,
        dry_run=args.dry_run,
        verbose=args.verbose
    )

    if args.check_size:
        files = split_push.get_status_files()
        check = split_push.check_push_size(files)

        print("\n📊 Push Size Analysis:")
        print(f" Total files: {check['total_files']}")
        print(f" Total size: {check['total_size_mb']:.2f} MB")
        print(f" Batches needed (at {check['max_size_mb']}MB): {check['batches_needed']}")

        if check['files_too_large']:
            print("\n ⚠️ Files too large for a single batch:")
            for f in check['files_too_large']:
                print(f" - {f.path}: {f.size_mb:.2f} MB")

        return

    result = split_push.run_split_push(untracked_only=args.untracked_only)

    if result.get("dry_run"):
        return

    if not result.get("success") and result.get("error"):
        print(f"\n❌ {result['error']}")
        # Name the offending files when the run was refused because of them.
        for f in result.get("files_too_large", []):
            print(f" - {f.path}: {f.size_mb:.2f} MB")
        sys.exit(1)

    # Early, successful exits (e.g. nothing to push) carry only a message.
    if result.get("message"):
        print(f"\n{result['message']}")
        return

    pushed = result.get("batches_pushed", 0)
    failed = result.get("batches_failed", 0)

    if failed == 0:
        if pushed > 0:
            print("\n✅ All batches pushed successfully!")
        return

    if pushed > 0:
        print("\n⚠️ Some batches failed. Check the summary above.")
    else:
        print("\n❌ All batches failed. Check the summary above.")
    # A run with failed batches must not look successful to calling scripts.
    sys.exit(1)
|
||||
|
||||
|
||||
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    raise SystemExit(main())
|
||||
Reference in New Issue
Block a user