457 lines
16 KiB
Python
457 lines
16 KiB
Python
"""
|
|
Programmatic B-Roll Composer
|
|
Layered composition pipeline: Background + Chart + Avatar Circle + Text Overlays
|
|
"""
|
|
|
|
import json
|
|
import numpy as np
|
|
from pathlib import Path
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional
|
|
import matplotlib
|
|
matplotlib.use("Agg")
|
|
import matplotlib.pyplot as plt
|
|
import matplotlib.patches as mpatches
|
|
from PIL import Image, ImageDraw, ImageFont
|
|
from moviepy.editor import (
|
|
VideoFileClip, ImageClip, CompositeVideoClip,
|
|
TextClip, ColorClip, concatenate_videoclips,
|
|
)
|
|
import moviepy.video.fx.all as vfx
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Crossfade concat (Option 1: crossfadein + negative padding)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def crossfade_concat(scenes: list, fade_dur: float = 0.5) -> CompositeVideoClip:
|
|
"""
|
|
Concatenate scenes with a dissolve transition between each pair.
|
|
|
|
Each clip (except the first) gets a crossfadein effect.
|
|
padding=-fade_dur overlaps consecutive clips so the fade actually fires
|
|
instead of creating a black gap. set_duration on every scene is
|
|
mandatory — CompositeVideoClip.duration can be ambiguous without it,
|
|
which makes the overlap math wrong.
|
|
"""
|
|
faded = []
|
|
for i, clip in enumerate(scenes):
|
|
c = clip
|
|
if i > 0:
|
|
c = c.fx(vfx.crossfadein, fade_dur)
|
|
faded.append(c)
|
|
return concatenate_videoclips(faded, padding=-fade_dur, method="compose")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data structures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
|
|
class Insight:
|
|
key_insight: str
|
|
supporting_stat: str
|
|
visual_cue: str # bar_chart_comparison | line_trend | bullet_points | full_avatar
|
|
audio_tone: str
|
|
chart_data: dict = field(default_factory=dict)
|
|
duration: float = 10.0
|
|
|
|
|
|
@dataclass
|
|
class SceneAssets:
|
|
background_img: str
|
|
chart_img: Optional[str] = None
|
|
avatar_video: Optional[str] = None
|
|
bullet_img: Optional[str] = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Chart generator (Matplotlib → PNG with transparency)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
CHART_STYLE = {
|
|
"bg": "#0D0D0D",
|
|
"bar_before": "#2E4057",
|
|
"bar_after": "#E63946",
|
|
"text": "#F1F1EF",
|
|
"grid": "#2A2A2A",
|
|
"accent": "#E63946",
|
|
}
|
|
|
|
|
|
def make_bar_chart(data: dict, out_path: str, title: str = "") -> str:
|
|
"""Render a side-by-side comparison bar chart. Returns output path."""
|
|
labels = data.get("labels", [])
|
|
before = data.get("before", [])
|
|
after = data.get("after", [])
|
|
|
|
fig, ax = plt.subplots(figsize=(8, 4.5), facecolor="none")
|
|
ax.set_facecolor("none")
|
|
|
|
x = np.arange(len(labels))
|
|
w = 0.35
|
|
bars_b = ax.bar(x - w / 2, before, w, color=CHART_STYLE["bar_before"],
|
|
label="Before", zorder=3, edgecolor="none")
|
|
bars_a = ax.bar(x + w / 2, after, w, color=CHART_STYLE["bar_after"],
|
|
label="After", zorder=3, edgecolor="none")
|
|
|
|
ax.set_xticks(x)
|
|
ax.set_xticklabels(labels, color=CHART_STYLE["text"], fontsize=11)
|
|
ax.tick_params(axis="y", colors=CHART_STYLE["text"])
|
|
ax.spines[:].set_visible(False)
|
|
ax.yaxis.grid(True, color=CHART_STYLE["grid"], linewidth=0.6, zorder=0)
|
|
ax.set_axisbelow(True)
|
|
|
|
# Value labels on bars
|
|
for bar in [*bars_b, *bars_a]:
|
|
h = bar.get_height()
|
|
ax.text(bar.get_x() + bar.get_width() / 2, h + 0.5, f"{h:.0f}%",
|
|
ha="center", va="bottom", color=CHART_STYLE["text"], fontsize=9,
|
|
fontweight="bold")
|
|
|
|
legend = ax.legend(frameon=False, labelcolor=CHART_STYLE["text"],
|
|
fontsize=10, loc="upper left")
|
|
if title:
|
|
ax.set_title(title, color=CHART_STYLE["text"], fontsize=13,
|
|
fontweight="bold", pad=12)
|
|
|
|
fig.tight_layout(pad=0.5)
|
|
fig.savefig(out_path, dpi=150, transparent=True, bbox_inches="tight")
|
|
plt.close(fig)
|
|
return out_path
|
|
|
|
|
|
def make_line_trend(data: dict, out_path: str, title: str = "") -> str:
|
|
"""Render a trend line chart. Returns output path."""
|
|
x_vals = data.get("x", [])
|
|
y_vals = data.get("y", [])
|
|
|
|
fig, ax = plt.subplots(figsize=(8, 4.5), facecolor="none")
|
|
ax.set_facecolor("none")
|
|
ax.plot(x_vals, y_vals, color=CHART_STYLE["accent"],
|
|
linewidth=2.5, marker="o", markersize=7, zorder=3)
|
|
ax.fill_between(x_vals, y_vals, alpha=0.12, color=CHART_STYLE["accent"])
|
|
ax.spines[:].set_visible(False)
|
|
ax.tick_params(colors=CHART_STYLE["text"])
|
|
ax.yaxis.grid(True, color=CHART_STYLE["grid"], linewidth=0.6, zorder=0)
|
|
if title:
|
|
ax.set_title(title, color=CHART_STYLE["text"], fontsize=13,
|
|
fontweight="bold", pad=12)
|
|
fig.tight_layout(pad=0.5)
|
|
fig.savefig(out_path, dpi=150, transparent=True, bbox_inches="tight")
|
|
plt.close(fig)
|
|
return out_path
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Text / Bullet overlay (Pillow → PNG)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def make_bullet_overlay(lines: list[str], out_path: str,
|
|
width: int = 900, font_size: int = 32) -> str:
|
|
"""Render bullet points on a semi-transparent dark pill. Returns path."""
|
|
padding = 32
|
|
line_h = font_size + 16
|
|
img_h = padding * 2 + len(lines) * line_h + 12
|
|
img = Image.new("RGBA", (width, img_h), (0, 0, 0, 0))
|
|
draw = ImageDraw.Draw(img)
|
|
|
|
# Semi-transparent background pill
|
|
draw.rounded_rectangle([0, 0, width - 1, img_h - 1],
|
|
radius=18, fill=(10, 10, 10, 185))
|
|
|
|
try:
|
|
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
|
|
font_size)
|
|
except OSError:
|
|
font = ImageFont.load_default()
|
|
|
|
y = padding
|
|
for line in lines:
|
|
draw.text((padding + 18, y), f"• {line}", font=font, fill=(241, 241, 239, 255))
|
|
y += line_h
|
|
|
|
img.save(out_path, format="PNG")
|
|
return out_path
|
|
|
|
|
|
def make_insight_card(insight: str, stat: str, out_path: str,
|
|
width: int = 960, height: int = 200) -> str:
|
|
"""Render a bold insight card (headline + supporting stat). Returns path."""
|
|
img = Image.new("RGBA", (width, height), (0, 0, 0, 0))
|
|
draw = ImageDraw.Draw(img)
|
|
draw.rounded_rectangle([0, 0, width - 1, height - 1],
|
|
radius=14, fill=(10, 10, 10, 200))
|
|
|
|
# Red accent bar
|
|
draw.rectangle([28, 24, 36, height - 24], fill=(230, 57, 70, 255))
|
|
|
|
try:
|
|
font_lg = ImageFont.truetype(
|
|
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 34)
|
|
font_sm = ImageFont.truetype(
|
|
"/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 20)
|
|
except OSError:
|
|
font_lg = font_sm = ImageFont.load_default()
|
|
|
|
draw.text((58, 36), insight, font=font_lg, fill=(241, 241, 239, 255))
|
|
draw.text((58, 90), stat, font=font_sm, fill=(180, 180, 178, 230))
|
|
|
|
img.save(out_path, format="PNG")
|
|
return out_path
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Circular avatar mask
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def apply_circle_mask(clip: VideoFileClip, diameter: int) -> VideoFileClip:
|
|
"""Resize clip and apply a circular alpha mask."""
|
|
clip = clip.resize(height=diameter)
|
|
w, h = clip.size
|
|
|
|
# Build a circular mask array (1 = opaque, 0 = transparent)
|
|
Y, X = np.ogrid[:h, :w]
|
|
cx, cy = w / 2, h / 2
|
|
mask_arr = ((X - cx) ** 2 + (Y - cy) ** 2 <= (min(w, h) / 2) ** 2).astype(float)
|
|
|
|
mask_clip = ImageClip(mask_arr, ismask=True).set_duration(clip.duration)
|
|
return clip.set_mask(mask_clip)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Ken Burns zoom effect
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def ken_burns(clip: ImageClip, zoom_ratio: float = 0.08) -> ImageClip:
|
|
"""Apply a slow zoom-in over the clip duration."""
|
|
def zoom_frame(get_frame, t):
|
|
frame = get_frame(t)
|
|
frac = 1 + zoom_ratio * (t / clip.duration)
|
|
h, w = frame.shape[:2]
|
|
new_h, new_w = int(h / frac), int(w / frac)
|
|
y1 = (h - new_h) // 2
|
|
x1 = (w - new_w) // 2
|
|
cropped = frame[y1:y1 + new_h, x1:x1 + new_w]
|
|
return np.array(Image.fromarray(cropped).resize((w, h), Image.LANCZOS))
|
|
|
|
return clip.fl(zoom_frame, apply_to=["mask"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Scene builders (one per visual_cue type)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def build_data_scene(assets: SceneAssets, insight: Insight) -> CompositeVideoClip:
|
|
"""
|
|
Layout: Background (Ken Burns) + Chart (fade-in) + Avatar circle (corner) + Insight card
|
|
"""
|
|
d = insight.duration
|
|
layers = []
|
|
|
|
# 1. Background
|
|
bg = (ImageClip(assets.background_img)
|
|
.set_duration(d)
|
|
.resize(height=1080))
|
|
bg = ken_burns(bg)
|
|
bg = bg.fx(vfx.lum_contrast, 0, -40) # darken 40 units
|
|
layers.append(bg)
|
|
|
|
# 2. Programmatic chart
|
|
if assets.chart_img:
|
|
chart = (ImageClip(assets.chart_img)
|
|
.set_duration(d - 1.5)
|
|
.set_start(0.5)
|
|
.resize(width=700)
|
|
.set_position(("center", 180))
|
|
.fx(vfx.fadein, 0.6)
|
|
.fx(vfx.fadeout, 0.4))
|
|
layers.append(chart)
|
|
|
|
# 3. Insight card at bottom
|
|
card_path = "/tmp/insight_card.png"
|
|
make_insight_card(insight.key_insight, insight.supporting_stat, card_path)
|
|
card = (ImageClip(card_path)
|
|
.set_duration(d - 1)
|
|
.set_start(0.5)
|
|
.set_position(("center", 820))
|
|
.fx(vfx.fadein, 0.5))
|
|
layers.append(card)
|
|
|
|
# 4. Avatar circle (bottom-right corner)
|
|
if assets.avatar_video:
|
|
avatar_raw = VideoFileClip(assets.avatar_video).subclip(0, d)
|
|
avatar = apply_circle_mask(avatar_raw, diameter=240)
|
|
avatar = avatar.set_position((bg.w - 280, bg.h - 280))
|
|
layers.append(avatar)
|
|
|
|
# set_duration is required: CompositeVideoClip infers duration from its
|
|
# constituent clips, which can be ambiguous when sub-clips have set_start
|
|
# offsets. Without this, crossfade_concat's overlap math goes wrong.
|
|
return CompositeVideoClip(layers, size=bg.size).set_duration(d)
|
|
|
|
|
|
def build_bullet_scene(assets: SceneAssets, insight: Insight,
|
|
bullets: list[str]) -> CompositeVideoClip:
|
|
"""
|
|
Layout: AI image (Ken Burns) + Bullet overlay + Avatar circle
|
|
"""
|
|
d = insight.duration
|
|
layers = []
|
|
|
|
bg = (ImageClip(assets.background_img)
|
|
.set_duration(d)
|
|
.resize(height=1080))
|
|
bg = ken_burns(bg, zoom_ratio=0.05)
|
|
bg = bg.fx(vfx.lum_contrast, 0, -50)
|
|
layers.append(bg)
|
|
|
|
bullet_path = "/tmp/bullets.png"
|
|
make_bullet_overlay(bullets, bullet_path, width=860)
|
|
bullets_clip = (ImageClip(bullet_path)
|
|
.set_duration(d - 1)
|
|
.set_start(0.5)
|
|
.set_position(("center", "center"))
|
|
.fx(vfx.fadein, 0.7))
|
|
layers.append(bullets_clip)
|
|
|
|
if assets.avatar_video:
|
|
avatar_raw = VideoFileClip(assets.avatar_video).subclip(0, d)
|
|
avatar = apply_circle_mask(avatar_raw, diameter=200)
|
|
avatar = avatar.set_position((bg.w - 240, bg.h - 240))
|
|
layers.append(avatar)
|
|
|
|
return CompositeVideoClip(layers, size=bg.size).set_duration(d)
|
|
|
|
|
|
def build_full_avatar_scene(assets: SceneAssets, insight: Insight) -> VideoFileClip:
|
|
"""Full-screen avatar — the expensive 'Hook' scene. No overlay."""
|
|
d = insight.duration
|
|
avatar = VideoFileClip(assets.avatar_video).subclip(0, d)
|
|
return avatar.resize(height=1080).set_duration(d)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Scene dispatcher — maps visual_cue → builder
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def dispatch_scene(insight: Insight, assets: SceneAssets,
|
|
bullet_lines: Optional[list[str]] = None) -> CompositeVideoClip:
|
|
cue = insight.visual_cue
|
|
|
|
if cue == "full_avatar":
|
|
return build_full_avatar_scene(assets, insight)
|
|
|
|
elif cue in ("bar_chart_comparison", "line_trend"):
|
|
chart_path = "/tmp/chart.png"
|
|
if cue == "bar_chart_comparison":
|
|
make_bar_chart(insight.chart_data, chart_path,
|
|
title=insight.key_insight)
|
|
else:
|
|
make_line_trend(insight.chart_data, chart_path,
|
|
title=insight.key_insight)
|
|
assets.chart_img = chart_path
|
|
return build_data_scene(assets, insight)
|
|
|
|
elif cue == "bullet_points":
|
|
lines = bullet_lines or [insight.key_insight, insight.supporting_stat]
|
|
return build_bullet_scene(assets, insight, lines)
|
|
|
|
else:
|
|
# Fallback: data scene without chart
|
|
return build_data_scene(assets, insight)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Master compositor — assembles all scenes into one video
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def compose_video(scenes: list, output_path: str = "output.mp4",
|
|
fps: int = 24, fade_dur: float = 0.5) -> str:
|
|
"""Concatenate scenes with crossfade transitions and write final video file."""
|
|
final = crossfade_concat(scenes, fade_dur=fade_dur)
|
|
final.write_videofile(
|
|
output_path,
|
|
fps=fps,
|
|
codec="libx264",
|
|
audio_codec="aac",
|
|
threads=4,
|
|
preset="fast",
|
|
logger=None,
|
|
)
|
|
return output_path
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# JSON bridge — LLM insight → assets + scene
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def pipeline_from_json(insight_json: str,
|
|
background_img: str,
|
|
avatar_video: Optional[str] = None) -> str:
|
|
"""
|
|
Full pipeline:
|
|
1. Parse LLM insight JSON
|
|
2. Generate chart / overlay assets
|
|
3. Build scene
|
|
4. Write video
|
|
Returns path to output video.
|
|
"""
|
|
data = json.loads(insight_json)
|
|
insight = Insight(**{k: data[k] for k in Insight.__dataclass_fields__ if k in data})
|
|
assets = SceneAssets(background_img=background_img, avatar_video=avatar_video)
|
|
scene = dispatch_scene(insight, assets,
|
|
bullet_lines=data.get("bullet_lines"))
|
|
out = f"/tmp/scene_{insight.visual_cue}.mp4"
|
|
compose_video([scene], output_path=out)
|
|
return out
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Demo / smoke-test (no real media files needed for chart generation)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
if __name__ == "__main__":
|
|
# --- Test 1: Chart PNG generation only ---
|
|
sample_bar_data = {
|
|
"labels": ["Content Velocity", "CTR", "Engagement", "Cost/Lead"],
|
|
"before": [30, 22, 18, 60],
|
|
"after": [72, 34, 41, 38],
|
|
}
|
|
chart_out = make_bar_chart(
|
|
sample_bar_data,
|
|
"/tmp/demo_chart.png",
|
|
title="AI Tools Impact: Before vs After (2025)",
|
|
)
|
|
print(f"Chart saved → {chart_out}")
|
|
|
|
# --- Test 2: Bullet overlay PNG ---
|
|
bullets = [
|
|
"AI reduced content cycles by 40% in 2025",
|
|
"HubSpot: 12% lift in CTR with AI-assisted copy",
|
|
"Video production cost down 3x with hybrid pipeline",
|
|
]
|
|
bullet_out = make_bullet_overlay(bullets, "/tmp/demo_bullets.png")
|
|
print(f"Bullets saved → {bullet_out}")
|
|
|
|
# --- Test 3: Insight card PNG ---
|
|
card_out = make_insight_card(
|
|
"AI tools reduced content cycles by 40%",
|
|
"HubSpot 2026 report — 12% lift in CTR",
|
|
"/tmp/demo_card.png",
|
|
)
|
|
print(f"Insight card saved → {card_out}")
|
|
|
|
# --- Test 4: JSON bridge (chart only, no video files required) ---
|
|
sample_json = json.dumps({
|
|
"key_insight": "AI reduced production time by 40%",
|
|
"supporting_stat": "HubSpot 2026: 12% CTR lift",
|
|
"visual_cue": "bar_chart_comparison",
|
|
"audio_tone": "authoritative_and_surprising",
|
|
"duration": 8.0,
|
|
"chart_data": sample_bar_data,
|
|
})
|
|
print("\nSample Insight JSON:\n", sample_json)
|
|
print("\nAll asset generation tests passed.")
|
|
print("To run full video composition, supply real background_img and avatar_video paths.")
|