#!/usr/bin/env python3
"""
Git Split Push — Batch Large Files into Smaller Commits

Detects push errors and automatically splits untracked/staged files
into smaller batches to push successfully to GitHub/Gitea/GitLab.

Usage:
    python3 batch-push.py                    # Auto-detect and batch
    python3 batch-push.py --max-size 20      # Max 20MB per batch
    python3 batch-push.py --dry-run          # Show what would happen
    python3 batch-push.py --untracked-only   # Only untracked files
    python3 batch-push.py --check-size       # Just check total size
"""

import os
import sys
import subprocess
import re
from pathlib import Path
from dataclasses import dataclass
from typing import Optional


@dataclass
class FileInfo:
    """Information about a file to be pushed."""

    path: str  # path as reported by git (commands are run from the repo root)
    size: int  # bytes
    staged: bool
    untracked: bool

    @property
    def size_mb(self) -> float:
        """Return the file size in MB (1 MB = 1024 * 1024 bytes)."""
        return self.size / (1024 * 1024)


class GitSplitPush:
    """Split large git pushes into smaller batches."""

    DEFAULT_MAX_SIZE_MB = 20  # Conservative limit (50MB server limit)
    HARD_FILE_LIMIT_MB = 100  # Files above this abort the whole operation

    def __init__(self, max_size_mb: Optional[float] = None,
                 dry_run: bool = False, verbose: bool = True):
        """Configure the splitter.

        Args:
            max_size_mb: Maximum cumulative file size per batch, in MB.
                A falsy value (``None`` or ``0``) selects
                ``DEFAULT_MAX_SIZE_MB``.
            dry_run: When true, ``run_split_push`` only prints the plan.
            verbose: Stored on the instance; not consulted by this class.
        """
        self.max_size_mb = max_size_mb or self.DEFAULT_MAX_SIZE_MB
        self.dry_run = dry_run
        self.verbose = verbose
        self.errors = []
        self.pushed_batches = []
        self._git_root = None  # cache filled lazily by get_git_root()

    def run(self, command: list, capture: bool = True) -> tuple[int, str, str]:
        """Run a command from the repository root.

        Args:
            command: Argument vector passed to ``subprocess.run``.
            capture: When true, stdout/stderr are captured and returned;
                otherwise they go to the terminal and empty strings are
                returned.

        Returns:
            ``(exit_code, stdout, stderr)``. Any exception raised while
            launching the command is reported as ``(1, "", str(error))``.
        """
        try:
            result = subprocess.run(
                command,
                capture_output=capture,
                text=True,
                cwd=self.get_git_root(),
            )
        except Exception as e:
            # Deliberate best-effort boundary: callers only look at the
            # exit code, so launch failures are folded into it.
            return 1, "", str(e)
        if capture:
            return result.returncode, result.stdout, result.stderr
        return result.returncode, "", ""

    def get_git_root(self) -> Optional[str]:
        """Find the git repository root.

        Returns:
            The repository top-level directory, or the current working
            directory when git cannot be run or this is not a repository.
        """
        cached = getattr(self, "_git_root", None)
        if cached is not None:
            return cached
        # Call git directly instead of going through self.run(): run()
        # needs the git root for its cwd, so routing through it would
        # recurse endlessly.
        try:
            result = subprocess.run(
                ["git", "rev-parse", "--show-toplevel"],
                capture_output=True,
                text=True,
            )
        except OSError:
            return os.getcwd()
        root = result.stdout.strip()
        if result.returncode == 0 and root:
            self._git_root = root
            return root
        return os.getcwd()

    def is_git_repo(self) -> bool:
        """Check if we're in a git repository."""
        code, _, _ = self.run(["git", "status"])
        return code == 0

    def get_current_branch(self) -> Optional[str]:
        """Get the current branch name, or ``None`` if it cannot be found."""
        code, out, _ = self.run(["git", "branch", "--show-current"])
        if code == 0 and out.strip():
            return out.strip()
        # Try with rev-parse for detached HEAD
        code, out, _ = self.run(["git", "rev-parse", "--abbrev-ref", "HEAD"])
        if code == 0:
            return out.strip()
        return None

    def _list_paths(self, git_args: list) -> list[str]:
        """Run a git listing command and return the paths it prints.

        ``-z`` makes git emit NUL-separated, unquoted paths, so names with
        non-ASCII characters, quotes or newlines are returned verbatim.
        """
        _, out, _ = self.run(["git"] + list(git_args) + ["-z"])
        return [p for p in out.split("\0") if p]

    def get_status_files(self) -> list[FileInfo]:
        """Get all files that need to be pushed.

        Returns:
            One ``FileInfo`` per path, in the order staged, untracked,
            then modified-but-unstaged. A path is reported only once,
            under the first category it appears in.
        """
        staged = self._list_paths(["diff", "--cached", "--name-only"])
        untracked = self._list_paths(
            ["ls-files", "--others", "--exclude-standard"]
        )
        modified = self._list_paths(["diff", "--name-only"])

        git_root = self.get_git_root()
        categories = (
            (staged, True, False),
            (untracked, False, True),
            (modified, False, False),
        )

        files = []
        seen = set()  # set membership keeps de-duplication linear
        for paths, is_staged, is_untracked in categories:
            for rel_path in paths:
                if rel_path in seen:
                    continue
                seen.add(rel_path)
                full_path = (
                    os.path.join(git_root, rel_path) if git_root else rel_path
                )
                files.append(FileInfo(
                    path=rel_path,
                    size=self.get_file_size(full_path),
                    staged=is_staged,
                    untracked=is_untracked,
                ))
        return files

    def get_file_size(self, path: str) -> int:
        """Get file size in bytes; 0 for missing or unreadable paths."""
        try:
            if os.path.isfile(path):
                return os.path.getsize(path)
        except OSError:
            # e.g. the file vanished between the two calls
            pass
        return 0

    def get_total_size(self, files: list[FileInfo]) -> float:
        """Calculate total size of files in MB."""
        return sum(f.size_mb for f in files)

    def check_push_size(self, files: list[FileInfo]) -> dict:
        """Check how many batches would be needed.

        Returns:
            A dict with keys ``total_files``, ``total_size_mb``,
            ``batches_needed``, ``max_size_mb`` and ``files_too_large``
            (the files individually larger than ``max_size_mb``).
        """
        return {
            "total_files": len(files),
            "total_size_mb": self.get_total_size(files),
            "batches_needed": self.calculate_batches(files),
            "max_size_mb": self.max_size_mb,
            "files_too_large": [
                f for f in files if f.size_mb > self.max_size_mb
            ],
        }

    def calculate_batches(self, files: list[FileInfo]) -> int:
        """Calculate how many batches would be needed."""
        # Delegate so the count can never drift from the actual split.
        return len(self.split_into_batches(files))

    def split_into_batches(self, files: list[FileInfo]) -> list[list[FileInfo]]:
        """Split files into batches under max size.

        Files are taken largest first and packed greedily in that order.
        A file that alone exceeds ``max_size_mb`` gets a single-file batch
        of its own (``run_split_push`` skips such batches).
        """
        batches = []
        current_batch = []
        current_size = 0

        # Sort by size (largest first)
        for f in sorted(files, key=lambda f: f.size, reverse=True):
            if f.size_mb > self.max_size_mb:
                # Flush whatever is pending, then isolate the big file.
                if current_batch:
                    batches.append(current_batch)
                    current_batch = []
                    current_size = 0
                batches.append([f])
            elif current_size + f.size_mb > self.max_size_mb:
                batches.append(current_batch)
                current_batch = [f]
                current_size = f.size_mb
            else:
                current_batch.append(f)
                current_size += f.size_mb

        if current_batch:
            batches.append(current_batch)
        return batches

    def stage_files(self, files: list[FileInfo]) -> tuple[bool, str, str]:
        """Stage files for commit.

        Returns:
            ``(ok, stdout, stderr)`` of the ``git add`` invocation.
        """
        paths = [f.path for f in files]
        code, out, err = self.run(["git", "add", "--"] + paths)
        return code == 0, out, err

    def commit_batch(self, batch_num: int, total_batches: int,
                     files: Optional[list[FileInfo]] = None) -> bool:
        """Create a commit for the current batch.

        Args:
            batch_num: 1-based index of this batch (used in the message).
            total_batches: Total number of batches (used in the message).
            files: When given, the commit is limited to these paths, so
                content staged for other batches is left in the index
                instead of being swept into this commit. When omitted,
                everything currently staged is committed.

        Returns:
            True if ``git commit`` exited with status 0.
        """
        message = f"[split-push] Batch {batch_num}/{total_batches}"
        command = ["git", "commit", "-m", message]
        if files:
            command += ["--"] + [f.path for f in files]
        code, _, _ = self.run(command)
        return code == 0

    def push_batch(self, branch: str = None) -> tuple[bool, str, str]:
        """Push the current batch to remote ``origin``.

        Args:
            branch: Branch to push; defaults to the current branch.

        Returns:
            ``(ok, stdout, stderr)`` of the ``git push`` invocation.
        """
        if not branch:
            branch = self.get_current_branch()
        if not branch:
            # Never hand a None argument to git.
            return False, "", "Could not determine current branch"
        code, out, err = self.run(["git", "push", "origin", branch])
        return code == 0, out, err

    def check_remaining(self) -> list[FileInfo]:
        """Check how many files are still uncommitted."""
        return self.get_status_files()

    def display_summary(self, summary: dict):
        """Display a summary of the push operation.

        Args:
            summary: Dict with keys ``total_files``, ``total_size_mb``,
                ``batches_pushed``, ``batches_failed`` and
                ``files_too_large``, as built by ``run_split_push``.
        """
        print("\n" + "=" * 60)
        print("Git Split Push — Summary")
        print("=" * 60)
        print(f"\nTotal files to push: {summary['total_files']}")
        print(f"Total size: {summary['total_size_mb']:.2f} MB")
        print(f"Batch size limit: {self.max_size_mb} MB")
        print(f"Batches created: {summary['batches_pushed']}")
        print(f"Batches failed: {summary['batches_failed']}")

        if summary['files_too_large']:
            print(f"\n⚠️ Files exceeding {self.max_size_mb}MB (skipped):")
            for f in summary['files_too_large']:
                print(f" - {f.path} ({f.size_mb:.2f} MB)")
            print("\n💡 To push these files, either:")
            print(" 1. Split the file manually (e.g., split --bytes=40M large.zip)")
            print(" 2. Remove it from git history")
            print(" 3. Use Git LFS (requires server support)")

        print("\n" + "=" * 60)

    def _print_dry_run(self, batches: list[list[FileInfo]]) -> None:
        """Print the batch plan without touching the repository."""
        print("\n🔍 Dry run - showing what would happen:\n")
        for i, batch in enumerate(batches, 1):
            batch_size = sum(f.size_mb for f in batch)
            too_large = any(f.size_mb > self.max_size_mb for f in batch)
            status = "⚠️ TOO LARGE" if too_large else "✓"
            print(f"Batch {i}: {status} ({batch_size:.2f} MB)")
            for f in batch:
                print(f" - {f.path} ({f.size_mb:.2f} MB)")
            print()

    def _process_batch(self, index: int, total: int,
                       batch: list[FileInfo], branch: str) -> bool:
        """Stage, commit and push one batch; return True if it was pushed."""
        batch_size = sum(f.size_mb for f in batch)
        print(f"📦 Batch {index}/{total}: {batch_size:.2f} MB ({len(batch)} files)")

        staged_ok, _, _ = self.stage_files(batch)
        if not staged_ok:
            print(" ❌ Failed to stage files")
            return False

        # Limit the commit to this batch's paths so files staged earlier
        # (pre-staged by the user, or left over from a failed batch) do
        # not inflate it.
        if not self.commit_batch(index, total, batch):
            print(" ❌ Failed to commit batch")
            return False

        push_ok, _, push_err = self.push_batch(branch)
        if push_ok:
            print(f" ✅ Pushed batch {index}")
            self.pushed_batches.append(index)
            return True

        print(f" ❌ Push failed: {push_err[:200] if push_err else 'Unknown error'}")
        # Try to reset the failed commit
        self.run(["git", "reset", "--soft", "HEAD~1"])
        return False

    def run_split_push(self, untracked_only: bool = False) -> dict:
        """Main entry point - run the split push operation.

        Args:
            untracked_only: Only consider untracked files.

        Returns:
            A result dict. It always has ``success``; on early failure it
            has ``error``; for a dry run it has ``dry_run`` and
            ``batches``; after a real run it carries the summary keys
            passed to ``display_summary`` plus ``batches_total``.
        """
        if not self.is_git_repo():
            return {"success": False, "error": "Not in a git repository"}

        branch = self.get_current_branch()
        if not branch:
            return {"success": False, "error": "Could not determine current branch"}

        # Get all files to push
        files = self.get_status_files()
        if untracked_only:
            files = [f for f in files if f.untracked]

        if not files:
            return {"success": True, "message": "Nothing to push", "batches_pushed": 0}

        # Check sizes. The message says "Some files", so a single file
        # above the hard limit is enough to abort.
        check = self.check_push_size(files)
        if any(f.size_mb > self.HARD_FILE_LIMIT_MB
               for f in check['files_too_large']):
            return {
                "success": False,
                "error": "Some files exceed 100MB. Use Git LFS or split manually.",
                "files_too_large": check['files_too_large'],
            }

        # Split into batches
        batches = self.split_into_batches(files)

        if self.dry_run:
            self._print_dry_run(batches)
            return {"success": True, "dry_run": True, "batches": len(batches)}

        # Push each batch
        success_count = 0
        fail_count = 0
        files_too_large = []
        total = len(batches)

        print(f"\n🚀 Starting split push ({total} batches, max {self.max_size_mb} MB each):\n")

        for i, batch in enumerate(batches, 1):
            # Check for oversized files
            oversized = [f for f in batch if f.size_mb > self.max_size_mb]
            if oversized:
                files_too_large.extend(oversized)
                print(f"⚠️ Batch {i}/{total} SKIPPED (file too large)")
                for f in oversized:
                    print(f" - {f.path} ({f.size_mb:.2f} MB)")
                fail_count += 1
                continue

            if self._process_batch(i, total, batch, branch):
                success_count += 1
            else:
                fail_count += 1

        summary = {
            "success": fail_count == 0,
            "total_files": len(files),
            "total_size_mb": check['total_size_mb'],
            "batches_pushed": success_count,
            "batches_failed": fail_count,
            "batches_total": total,
            "files_too_large": files_too_large,
        }
        self.display_summary(summary)
        return summary


def detect_push_error(error_output: str = None) -> bool:
    """Check if output contains a push size error.

    Args:
        error_output: Text to scan. When ``None``, ``git push`` is run in
            the current directory and its combined stdout/stderr is
            scanned instead (note: this performs a real push attempt).

    Returns:
        True if any known size/transport error pattern occurs in the
        text (case-insensitive), otherwise False.
    """
    if error_output is None:
        # Check the last git push output
        try:
            result = subprocess.run(
                ["git", "push"], capture_output=True, text=True
            )
        except OSError:
            # git is not available, so there is no output to inspect.
            return False
        error_output = result.stdout + result.stderr

    error_patterns = [
        "pack exceeds maximum allowed size",
        "remote end hung up unexpectedly",
        "fatal: protocol error",
        "RPC failed; HTTP 413",
        "413 Request Entity Too Large",
        "error: packfile is too large",
    ]

    haystack = error_output.lower()
    return any(pattern.lower() in haystack for pattern in error_patterns)


def main():
    """Parse command-line arguments and run the requested operation."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Split large git pushes into smaller batches",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s                    # Auto-detect and batch all changes
  %(prog)s --max-size 30      # Use 30MB max per batch
  %(prog)s --dry-run          # Show what would happen
  %(prog)s --untracked-only   # Only push untracked files
  %(prog)s --check-size       # Just check total size and exit

This tool helps when pushing to GitHub/Gitea/GitLab fails with:
  "fatal: the remote end hung up unexpectedly"
  "pack exceeds maximum allowed size"
"""
    )
    parser.add_argument(
        "--max-size", type=float, default=None,
        help="Maximum size per batch in MB (default: 20)"
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Show what would happen without making changes"
    )
    parser.add_argument(
        "--untracked-only", action="store_true",
        help="Only process untracked files (ignore staged changes)"
    )
    parser.add_argument(
        "--check-size", action="store_true",
        help="Just check total size and exit"
    )
    # NOTE(review): store_true with default=True means this flag cannot
    # turn verbosity off; kept as-is for CLI compatibility.
    parser.add_argument(
        "--verbose", action="store_true", default=True,
        help="Show detailed output (default: on)"
    )

    args = parser.parse_args()

    split_push = GitSplitPush(
        max_size_mb=args.max_size,
        dry_run=args.dry_run,
        verbose=args.verbose
    )

    if args.check_size:
        files = split_push.get_status_files()
        check = split_push.check_push_size(files)
        print("\n📊 Push Size Analysis:")
        print(f" Total files: {check['total_files']}")
        print(f" Total size: {check['total_size_mb']:.2f} MB")
        print(f" Batches needed (at {check['max_size_mb']}MB): {check['batches_needed']}")
        if check['files_too_large']:
            print("\n ⚠️ Files too large for a single batch:")
            for f in check['files_too_large']:
                print(f" - {f.path}: {f.size_mb:.2f} MB")
        return

    result = split_push.run_split_push(untracked_only=args.untracked_only)

    if result.get("dry_run"):
        return

    if not result.get("success") and result.get("error"):
        print(f"\n❌ {result['error']}")
        sys.exit(1)

    pushed = result.get("batches_pushed", 0)
    failed = result.get("batches_failed", 0)
    if pushed > 0 and failed == 0:
        print("\n✅ All batches pushed successfully!")
    elif pushed > 0 and failed > 0:
        print("\n⚠️ Some batches failed. Check the summary above.")

    # Report failure to the calling shell/CI even when no "error" key is
    # set (e.g. every batch failed or was skipped).
    if not result.get("success"):
        sys.exit(1)


if __name__ == "__main__":
    main()