# gcp.py

from datetime import timedelta
from google.cloud import speech
from google.cloud import storage

def transcribe_gcs_with_language_detection(gcs_uri):
    """Transcribe an audio object in Cloud Storage with language detection.

    Submits a long-running recognition request for the audio at *gcs_uri*
    (configured as 16 kHz MP3, primary language ``en-US`` with ``ja-JP`` and
    ``en-US`` as alternative languages) and blocks for up to 600 seconds
    while waiting for the result.

    Args:
        gcs_uri: URI of the audio object, passed to ``RecognitionAudio``
            (presumably of the form ``gs://bucket/object`` -- confirm with
            callers).

    Returns:
        The string ``"No speech detected in the audio."`` when nothing was
        billed or the response contains no results; otherwise a dict with:

        * ``'language_code'``: language code of the first result,
        * ``'billed_duration'``: ``response.total_billed_time.seconds``,
        * ``'transcript'``: the response's results sequence.

    Raises:
        Whatever ``operation.result(timeout=600)`` raises, e.g. when the
        operation fails or does not complete within the timeout.
    """
    client = speech.SpeechClient()

    audio = speech.RecognitionAudio(uri=gcs_uri)
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.MP3,
        sample_rate_hertz=16000,
        language_code="en-US",
        alternative_language_codes=["ja-JP", "en-US"],
    )

    operation = client.long_running_recognize(config=config, audio=audio)

    print("Waiting for operation to complete...")
    response = operation.result(timeout=600)

    billed_seconds = response.total_billed_time.seconds
    results = response.results

    # A response can carry billed time yet no results; indexing results[0]
    # in that case would raise IndexError, so treat it as "no speech" too.
    if billed_seconds == 0 or not results:
        return "No speech detected in the audio."

    return {
        'language_code': results[0].language_code,
        'billed_duration': billed_seconds,
        'transcript': results,
    }

def list_files_in_gcs_bucket_with_uri(bucket_name, extensions, prefix=None):
    """Return ``gs://`` URIs of bucket objects whose names end with an extension.

    Args:
        bucket_name: Name of the bucket to list.
        extensions: Iterable of suffixes; an object is kept when its name
            ends with any of them.
        prefix: Optional prefix forwarded to ``list_blobs``. When it is
            ``None``, only objects whose name contains no ``/`` (or ends
            with ``/``) are considered; otherwise every listed object is a
            candidate.

    Returns:
        A list of ``gs://<bucket_name>/<object name>`` strings, in listing
        order.
    """
    gcs = storage.Client()
    target_bucket = gcs.get_bucket(bucket_name)

    matching_uris = []
    for entry in target_bucket.list_blobs(prefix=prefix):
        # Without a prefix, ignore anything nested below the bucket root.
        if prefix is None and '/' in entry.name and not entry.name.endswith('/'):
            continue
        # Keep only objects carrying one of the requested suffixes.
        if not any(entry.name.endswith(suffix) for suffix in extensions):
            continue
        matching_uris.append(f"gs://{bucket_name}/{entry.name}")
    return matching_uris

def convert_response_to_srt(respnse_results):
    """Render speech recognition results as SRT subtitle text.

    Each result becomes one cue. Results carry only an end offset, so a cue
    is assumed to start where the previous result ended (the first cue
    starts at 00:00:00,000). The cue text is the stripped transcript of the
    result's first alternative.

    Args:
        respnse_results: Iterable of result objects exposing
            ``result_end_time`` (a ``datetime.timedelta``, or a
            Duration-like object with ``seconds``/``nanos``) and
            ``alternatives`` (a sequence whose items have ``transcript``).

    Returns:
        The SRT document as a string; an empty string when there are no
        cues. Results without alternatives produce no cue, and cue numbers
        stay sequential.
    """
    one_millisecond = timedelta(milliseconds=1)

    def to_timedelta(duration):
        # A timedelta is used as-is: its `microseconds` attribute is already
        # in microseconds and must not be rescaled, and using the whole
        # object keeps the `days` component that `.seconds` alone drops.
        if isinstance(duration, timedelta):
            return duration
        # Raw protobuf Duration: whole seconds plus nanoseconds.
        return timedelta(
            seconds=duration.seconds,
            microseconds=duration.nanos // 1000,
        )

    def format_timedelta(td):
        # SRT timestamps are HH:MM:SS,mmm; integer math avoids float error.
        total_ms = td // one_millisecond
        total_seconds, milliseconds = divmod(total_ms, 1000)
        hours, remainder = divmod(total_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{hours:02}:{minutes:02}:{seconds:02},{milliseconds:03}"

    srt_content = []
    previous_end_time = timedelta(0)
    cue_number = 0

    for result in respnse_results:
        end_time = to_timedelta(result.result_end_time)

        # Advance the running clock before deciding whether to emit a cue,
        # so a skipped segment still pushes the next cue's start forward.
        start_time = previous_end_time
        previous_end_time = end_time

        # A result with no alternatives has no text to show; indexing
        # alternatives[0] would raise IndexError.
        if not result.alternatives:
            continue

        cue_number += 1
        transcript = result.alternatives[0].transcript.strip()
        srt_content.append(
            f"{cue_number}\n"
            f"{format_timedelta(start_time)} --> {format_timedelta(end_time)}\n"
            f"{transcript}\n"
        )

    return "\n".join(srt_content)