WIP - Streamlit UI, firecrawl - V0.5

This commit is contained in:
ajaysi
2024-06-11 17:27:50 +05:30
parent f2fa8cfb47
commit ccbaa0e4fa
13 changed files with 442 additions and 211 deletions

View File

@@ -0,0 +1,61 @@
import os
import google.generativeai as genai
from dotenv import load_dotenv
def load_environment():
    """Pull settings from a .env file into the process environment.

    Thin wrapper around ``dotenv.load_dotenv``; the result of that call is
    discarded, so this function always returns ``None``.
    """
    load_dotenv()
def configure_google_api():
    """Configure the ``google.generativeai`` client from ``GEMINI_API_KEY``.

    Reads the key from the environment and hands it to ``genai.configure``.

    Raises:
        ValueError: If ``GEMINI_API_KEY`` is unset, empty, or contains only
            whitespace.
    """
    # Strip before validating: a blank or whitespace-only value would
    # otherwise pass the emptiness check and be sent to the API as a key.
    api_key = (os.getenv("GEMINI_API_KEY") or "").strip()
    if not api_key:
        raise ValueError("Google API key not found. Please set the GEMINI_API_KEY environment variable.")
    genai.configure(api_key=api_key)
def transcribe_audio(audio_file_path):
    """Transcribe an audio file with Google's Gemini generative model.

    The file is uploaded with ``genai.upload_file`` and sent, together with a
    transcription prompt, to the ``models/gemini-1.5-pro-latest`` model.
    Failures are reported on stdout and signalled by returning ``None``;
    no exception is propagated to the caller.

    Args:
        audio_file_path (str): Path to the audio file to transcribe.

    Returns:
        str | None: The transcription text, or ``None`` when the path does
        not point to a file, or when configuration, upload, or generation
        fails.
    """
    # Validate the path up front so a bad input never triggers API
    # configuration or a network round trip.
    if not audio_file_path or not os.path.isfile(audio_file_path):
        print(f"FileNotFoundError: The audio file at {audio_file_path} does not exist.")
        return None

    try:
        # Load environment variables and configure the API client.
        load_environment()
        configure_google_api()

        model = genai.GenerativeModel("models/gemini-1.5-pro-latest")

        # An upload failure must abort the request: falling through would
        # send ``None`` to the model in place of the audio.
        audio_file = genai.upload_file(path=audio_file_path)

        response = model.generate_content(
            [
                "Listen carefully to the given following audio file. Transcribe the following given audio.",
                audio_file,
            ]
        )

        if not hasattr(response, 'text'):
            raise ValueError("The API response does not contain text.")
        return response.text
    except ValueError as ve:
        print(f"ValueError: {ve}")
    except FileNotFoundError:
        # The file can still disappear between the check above and the upload.
        print(f"FileNotFoundError: The audio file at {audio_file_path} does not exist.")
    except Exception as e:
        print(f"An error occurred: {e}")

    # Every failure path ends here; make the ``None`` result explicit.
    return None

View File

@@ -1,9 +1,12 @@
from pytube import YouTube
import os
import re
import sys
from pytube import YouTube
from loguru import logger
from openai import OpenAI
from tqdm import tqdm
import streamlit as st
from tenacity import (
retry,
@@ -11,15 +14,40 @@ from tenacity import (
wait_random_exponential,
) # for exponential backoff
from .gemini_audio_text import transcribe_audio
def progress_function(stream, chunk, bytes_remaining):
    """Advance the module-level ``progress_bar`` during a download.

    Computes the completed fraction of the download and moves the bar
    forward by the difference between that fraction and the bar's current
    position ``progress_bar.n``.

    Args:
        stream: Object exposing a ``filesize`` attribute (total bytes).
        chunk: The data chunk just received; unused here.
        bytes_remaining (int): Bytes still to be downloaded.
    """
    total_bytes = stream.filesize
    if not total_bytes:
        # A zero or unknown size gives no meaningful fraction; skip the
        # update rather than raise from inside the download callback.
        return

    fraction_done = (total_bytes - bytes_remaining) / total_bytes
    # ``update`` takes an increment, so pass the delta from the current position.
    progress_bar.update(fraction_done - progress_bar.n)
def rename_file_with_underscores(file_path):
    """Rename a file on disk so its name contains only safe characters.

    Every character in the filename that is not a word character, hyphen,
    underscore or dot is replaced with an underscore. The directory part of
    the path is left untouched.

    Args:
        file_path (str): Path of the existing file.

    Returns:
        str: Path of the file after renaming.
    """
    parent_dir = os.path.dirname(file_path)
    base_name = os.path.basename(file_path)

    # Only the filename is sanitised, never the directory components.
    safe_name = re.sub(r'[^\w\-_\.]', '_', base_name)
    target_path = os.path.join(parent_dir, safe_name)

    os.rename(file_path, target_path)
    return target_path
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def speech_to_text(video_url, output_path='.'):
def speech_to_text(video_url):
"""
Transcribes speech to text from a YouTube video URL using OpenAI's Whisper model.
@@ -33,75 +61,94 @@ def speech_to_text(video_url, output_path='.'):
Raises:
SystemExit: If a critical error occurs that prevents successful execution.
"""
try:
audio_file = None
if video_url.startswith("https://www.youtube.com/") or video_url.startswith("http://www.youtube.com/"):
logger.info(f"Accessing YouTube URL: {video_url}")
yt = YouTube(video_url, on_progress_callback=progress_function)
logger.info("Fetching the highest quality audio stream")
audio_stream = yt.streams.filter(only_audio=True).first()
if audio_stream is None:
logger.warning("No audio stream found for this video.")
return None
logger.info(f"Downloading audio for: {yt.title}")
global progress_bar
progress_bar = tqdm(total=1.0, unit='iB', unit_scale=True, desc=yt.title)
output_path = os.getenv("CONTENT_SAVE_DIR")
with st.status("Started Writing..", expanded=False) as status:
try:
audio_file = None
if video_url.startswith("https://www.youtube.com/") or video_url.startswith("http://www.youtube.com/"):
logger.info(f"Accessing YouTube URL: {video_url}")
status.update(label=f"Accessing YouTube URL: {video_url}")
yt = YouTube(video_url, on_progress_callback=progress_function)
logger.info("Fetching the highest quality audio stream")
status.update(label="Fetching the highest quality audio stream")
audio_stream = yt.streams.filter(only_audio=True).first()
if audio_stream is None:
logger.warning("No audio stream found for this video.")
st.warning("No audio stream found for this video.")
return None
logger.info(f"Downloading audio for: {yt.title}")
status.update(label=f"Downloading audio for: {yt.title}")
global progress_bar
progress_bar = tqdm(total=1.0, unit='iB', unit_scale=True, desc=yt.title)
try:
audio_filename = re.sub(r'[^\w\-_\.]', '_', yt.title) + '.mp4'
audio_file = audio_stream.download(
output_path=os.getenv("CONTENT_SAVE_DIR"),
filename=audio_filename)
#audio_file = rename_file_with_underscores(audio_file)
except Exception as err:
logger.error(f"Failed to download audio file: {audio_file}")
progress_bar.close()
logger.info(f"Audio downloaded: {yt.title} to {audio_file}")
status.update(label=f"Audio downloaded: {yt.title} to {output_path}")
# Audio filepath from local directory.
elif os.path.exists(audio_input):
audio_file = video_url
# Checking file size
max_file_size = 24 * 1024 * 1024 # 24MB
file_size = os.path.getsize(audio_file)
# Convert file size to MB for logging
file_size_MB = file_size / (1024 * 1024) # Convert bytes to MB
logger.info(f"Downloaded Audio Size is: {file_size_MB:.2f} MB")
status.update(label=f"Downloaded Audio Size is: {file_size_MB:.2f} MB")
if file_size > max_file_size:
logger.error("File size exceeds 24MB limit.")
# FIXME: We can chunk hour long videos, the code is not tested.
#long_video(audio_file)
sys.exit("File size limit exceeded.")
st.error("Audio File size limit exceeded.")
try:
audio_file = audio_stream.download(output_path)
except Exception as err:
logger.error(f"Failed to download audio file: {audio_file}")
progress_bar.close()
logger.info(f"Audio downloaded: {yt.title} to {output_path}")
# Audio filepath from local directory.
elif os.path.exists(audio_input):
audio_file = video_url
# Checking file size
max_file_size = 24 * 1024 * 1024 # 24MB
file_size = os.path.getsize(audio_file)
# Convert file size to MB for logging
file_size_MB = file_size / (1024 * 1024) # Convert bytes to MB
logger.info(f"Downloaded Audio Size is: {file_size_MB:.2f} MB")
if file_size > max_file_size:
logger.error("File size exceeds 24MB limit.")
# FIXME: We can chunk hour long videos, the code is not tested.
#long_video(audio_file)
sys.exit("File size limit exceeded.")
try:
logger.info("Initializing OpenAI client for transcription.")
client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
logger.info("Transcribing using OpenAI's Whisper model.")
transcript = client.audio.transcriptions.create(
model="whisper-1",
file=open(audio_file, "rb", encoding="utf-8"),
response_format="text"
)
logger.info(f"\nYouTube video transcription:\n{yt.title}\n{transcript}\n")
return transcript, yt.title
status.update(label=f"Initializing OpenAI client for transcription: {audio_file}")
logger.info(f"Initializing OpenAI client for transcription: {audio_file}")
client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
logger.info("Transcribing using OpenAI's Whisper model.")
transcript = client.audio.transcriptions.create(
model="whisper-1",
file=open(audio_file, "rb"),
response_format="text"
)
logger.info(f"\nYouTube video transcription:\n{yt.title}\n{transcript}\n")
status.update(label=f"\nYouTube video transcription:\n{yt.title}\n{transcript}\n")
return transcript, yt.title
except Exception as e:
logger.error(f"Failed in Whisper transcription: {e}")
st.warning(f"Failed in Openai Whisper transcription: {e}")
transcript = transcribe_audio(audio_file)
print(f"\n\n\n--- Tracribe: {transcript} ----\n\n\n")
return transcript, yt.title
except Exception as e:
logger.error(f"Failed in Whisper transcription: {e}")
sys.exit("Transcription failure.")
except Exception as e:
logger.error(f"An error occurred during YouTube video processing: {e}")
sys.exit("Video processing failure.")
finally:
try:
if os.path.exists(audio_file):
os.remove(audio_file)
logger.info("Temporary audio file removed.")
except PermissionError:
logger.error(f"Permission error: Cannot remove '{audio_file}'. Please make sure of necessary permissions.")
except Exception as e:
logger.error(f"An error occurred removing audio file: {e}")
st.error(f"An error occurred during YouTube video processing: {e}")
finally:
try:
if os.path.exists(audio_file):
os.remove(audio_file)
logger.info("Temporary audio file removed.")
except PermissionError:
st.error(f"Permission error: Cannot remove '{audio_file}'. Please make sure of necessary permissions.")
except Exception as e:
st.error(f"An error occurred removing audio file: {e}")
def long_video(temp_file_name):

View File

@@ -8,6 +8,7 @@ import os
import requests
from PIL import Image
from io import BytesIO
import streamlit as st
from .save_image import save_generated_image
@@ -18,7 +19,7 @@ def generate_stable_diffusion_image(prompt):
api_key = os.getenv("STABILITY_API_KEY")
if api_key is None:
raise Exception("Missing Stability API key.")
st.warning("Missing Stability API key.")
response = requests.post(
f"{api_host}/v1/generation/{engine_id}/text-to-image",

View File

@@ -11,6 +11,7 @@
import os
import sys
import datetime
import streamlit as st
import openai # OpenAI Python library to make API calls
from loguru import logger
@@ -56,6 +57,7 @@ def generate_image(user_prompt):
image_stored_at = generate_stable_diffusion_image(img_prompt)
except Exception as err:
logger.error(f"Failed to generate Image: {err}")
st.warning(f"Failed to generate Image: {err}")
return image_stored_at