YT to blog, bug fixes - WIP

This commit is contained in:
ajaysi
2024-10-12 07:59:13 +05:30
parent 16bcd86bb7
commit e6f60feba5
6 changed files with 170 additions and 133 deletions

View File

@@ -1,61 +1,101 @@
import os
import sys
import google.generativeai as genai
from dotenv import load_dotenv
from loguru import logger
# Reset loguru's handlers and install a single colorized stdout sink, so each
# record is rendered as "LEVEL|file:line:function| message".
logger.remove()
logger.add(sys.stdout,
colorize=True,
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}"
)
def load_environment():
    """Load environment variables from a .env file into the process environment.

    Delegates to python-dotenv's ``load_dotenv()`` and logs the outcome.
    A missing or empty .env file is not treated as an error, because the
    required variables may already be set in the process environment; in
    that case only a warning is logged.
    """
    # load_dotenv() reports whether it actually loaded anything; do not claim
    # success when no .env file (or an empty one) was found.
    if load_dotenv():
        logger.info("Environment variables loaded successfully.")
    else:
        logger.warning(
            "No variables were loaded from a .env file; "
            "relying on the existing process environment."
        )
def configure_google_api():
    """Configure the Google Gemini client from the GEMINI_API_KEY variable.

    Raises:
        ValueError: If the GEMINI_API_KEY environment variable is not set,
            empty, or contains only whitespace.
    """
    # Treat a whitespace-only value the same as a missing key.
    api_key = (os.getenv("GEMINI_API_KEY") or "").strip()
    if not api_key:
        error_message = "Google API key not found. Please set the GEMINI_API_KEY environment variable."
        # Log before raising so the failure is visible even when a caller
        # swallows the exception.
        logger.error(error_message)
        raise ValueError(error_message)

    genai.configure(api_key=api_key)
    logger.info("Google Gemini API configured successfully.")
def transcribe_audio(audio_file_path):
    """Transcribe an audio file with Google's Gemini 1.5 Flash model.

    Loads the environment, configures the Gemini client, uploads the audio
    file and asks the model for a transcription.

    This function does not raise: every failure (missing file, missing API
    key, upload error, generation error, empty response) is logged and
    reported to the caller as ``None``.

    Args:
        audio_file_path (str): The path to the audio file to be transcribed.

    Returns:
        str | None: The transcribed text, or None if transcription fails
        or the model returns no text.
    """
    try:
        # Load environment variables and configure the Google API.
        load_environment()
        configure_google_api()

        logger.info(f"Attempting to transcribe audio file: {audio_file_path}")

        # isfile() (rather than exists()) also rejects directories, which
        # cannot be uploaded as audio.
        if not audio_file_path or not os.path.isfile(audio_file_path):
            logger.error(f"FileNotFoundError: The audio file at {audio_file_path} does not exist.")
            return None

        model = genai.GenerativeModel(model_name="gemini-1.5-flash")

        # Upload the audio file.
        try:
            audio_file = genai.upload_file(audio_file_path)
        except FileNotFoundError:
            # The file can disappear between the check above and the upload.
            logger.error(f"FileNotFoundError: The audio file at {audio_file_path} does not exist.")
            return None
        except Exception as e:
            logger.error(f"Error uploading audio file: {e}")
            return None
        logger.info(f"Audio file uploaded successfully: {audio_file=}")

        # Generate the transcription. Reading `.text` stays inside the try
        # block because the accessor itself may raise when the response
        # carries no usable candidate.
        try:
            response = model.generate_content([
                "Transcribe the following audio:",
                audio_file
            ])
            transcript = getattr(response, "text", None)
        except Exception as e:
            logger.error(f"Error during transcription: {e}")
            return None

        if not transcript:
            logger.warning("Transcription failed: Invalid or empty response from API.")
            return None

        logger.info(f"Transcription successful:\n{transcript}")
        return transcript
    except Exception as e:
        # Last-resort boundary (e.g. a missing API key raised by
        # configure_google_api): log it and honour the "None on failure"
        # contract instead of propagating.
        logger.error(f"An unexpected error occurred: {e}")
        return None

View File

@@ -2,7 +2,7 @@ import os
import re
import sys
from pytube import YouTube
from pytubefix import YouTube
from loguru import logger
from openai import OpenAI
from tqdm import tqdm
@@ -22,6 +22,7 @@ def progress_function(stream, chunk, bytes_remaining):
current = ((stream.filesize - bytes_remaining) / stream.filesize)
progress_bar.update(current - progress_bar.n) # Update the progress bar
def rename_file_with_underscores(file_path):
"""Rename a file by replacing spaces and special characters with underscores.
@@ -62,22 +63,32 @@ def speech_to_text(video_url):
SystemExit: If a critical error occurs that prevents successful execution.
"""
output_path = os.getenv("CONTENT_SAVE_DIR")
yt = None
audio_file = None
with st.status("Started Writing..", expanded=False) as status:
try:
audio_file = None
if video_url.startswith("https://www.youtube.com/") or video_url.startswith("http://www.youtube.com/"):
logger.info(f"Accessing YouTube URL: {video_url}")
status.update(label=f"Accessing YouTube URL: {video_url}")
yt = YouTube(video_url, on_progress_callback=progress_function)
logger.info("Fetching the highest quality audio stream")
status.update(label="Fetching the highest quality audio stream")
audio_stream = yt.streams.filter(only_audio=True).first()
try:
vid_id = video_url.split("=")[1]
yt = YouTube(video_url, on_progress_callback=progress_function)
except Exception as err:
logger.error(f"Failed to get pytube stream object: {err}")
st.stop()
logger.info(f"Fetching the highest quality audio stream:{yt.title}")
status.update(label=f"Fetching the highest quality audio stream: {yt.title}")
try:
audio_stream = yt.streams.filter(only_audio=True).first()
except Exception as err:
logger.error(f"Failed to Download Youtube Audio: {err}")
st.stop()
if audio_stream is None:
logger.warning("No audio stream found for this video.")
st.warning("No audio stream found for this video.")
return None
st.stop()
logger.info(f"Downloading audio for: {yt.title}")
status.update(label=f"Downloading audio for: {yt.title}")
@@ -113,9 +124,13 @@ def speech_to_text(video_url):
# FIXME: We can chunk hour long videos, the code is not tested.
#long_video(audio_file)
sys.exit("File size limit exceeded.")
st.error("Audio File size limit exceeded.")
st.error("Audio File size limit exceeded. File a fixme/issues at ALwrity github.")
try:
print(f"Audio File: {audio_file}")
transcript = transcribe_audio(audio_file)
print(f"\n\n\n--- Tracribe: {transcript} ----\n\n\n")
exit(1)
status.update(label=f"Initializing OpenAI client for transcription: {audio_file}")
logger.info(f"Initializing OpenAI client for transcription: {audio_file}")
client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
@@ -170,7 +185,7 @@ def long_video(temp_file_name):
video_url (str): URL of the YouTube video to be transcribed.
"""
# Extract audio and split into chunks
app.logger.info(f"Processing the YT video: {temp_file_name}")
logger.info(f"Processing the YT video: {temp_file_name}")
full_audio = mp.AudioFileClip(temp_file_name)
duration = full_audio.duration
chunk_length = 600 # 10 minutes in seconds