Compare commits

...

10 Commits

Author SHA1 Message Date
07c4728bbf bumped version 2019-07-18 12:22:38 +02:00
327ff18726 linted a few modules, added a check that the language_code exists when calling Transcriber.transcribe, made that method non-abstract to do so. Added a test for this. Extracted a few things to fixtures. 2019-07-18 12:19:42 +02:00
e7258f50c9 fixed docstring for 'list' option 2019-07-17 20:27:02 +02:00
b01ca332e8 updated README and bumped version 2019-03-25 23:16:43 +01:00
d8f90cd98a Add support for other languages.
Add CLI options for specifying language as well as for getting a list of
supported languages.
Add abstract method on vendor.py for `language_list`.
Manually add list of language codes (`_language_list` and `language_list`) to
Amazon.
Manually add list of language codes to Google.
Add 'name' attribute to each Transcriber
2019-03-25 07:50:39 +01:00
c97ec79ece bumped version 2019-03-08 22:09:14 -05:00
62ecfc9a72 added helper to get transcript type 2019-03-08 21:26:56 -05:00
82efb77ff2 fixed #49, fixed #52 2019-03-08 11:35:05 -05:00
9952f5fe4b added speaker diarization for amazon, not in CLI yet 2019-03-07 23:38:29 -05:00
d029e6ccea removed txt files 2019-03-07 17:58:35 -05:00
13 changed files with 464 additions and 18850 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -28,6 +28,7 @@ tatt is a CLI for creating and managing speech-to-text transcripts.
Commands: Commands:
get Downloads and/or saves completed transcript. get Downloads and/or saves completed transcript.
languages
list Lists all transcription jobs. list Lists all transcription jobs.
services Lists available speech-to-text services. services Lists available speech-to-text services.
status Check the status of a transcription job. status Check the status of a transcription job.
@@ -47,6 +48,23 @@ tatt is a CLI for creating and managing speech-to-text transcripts.
Okay, transcribing <path_to_media_file> using <service_name>... Okay, transcribing <path_to_media_file> using <service_name>...
Okay, job <job_name> is being transcribed. Use "get" command to download it. Okay, job <job_name> is being transcribed. Use "get" command to download it.
$ transcribe this --help
Usage: transcribe this [OPTIONS] MEDIA_FILEPATH SERVICE_NAME
Sends a media file to be transcribed.
Options:
--punctuation only for Google Speech, defaults to True
--speaker-id / --no-speaker-id only for google and amazon, defaults to True
--num_speakers INTEGER only for google and amazon, defaults to 2
--model TEXT only for Google Speech, defaults to
"phone_call"
--use-enhanced only for Google Speech, defaults to True
--language-code TEXT only for google and amazon, defaults to en-
US
--help Show this message and exit.
### List Transcripts ### List Transcripts
$ transcribe list $ transcribe list

View File

@@ -1,31 +1,32 @@
from setuptools import setup, find_packages from setuptools import setup, find_packages
with open('README.md') as file: with open("README.md") as file:
long_description = file.read() long_description = file.read()
setup( setup(
name="tatt", name="tatt",
version="0.974", version="0.981",
py_modules=['tatt'], py_modules=["tatt"],
url='https://github.com/zevaverbach/tatt', url="https://github.com/zevaverbach/tatt",
install_requires=[ install_requires=[
'Click', "Click",
'awscli', "awscli",
'boto3', "boto3",
'requests', "requests",
'google-cloud-speech', "google-cloud-speech",
'audioread', "audioread",
'google-cloud-storage', "google-cloud-storage",
], ],
include_package_data=True, include_package_data=True,
packages=find_packages(), packages=find_packages(),
description=('Tatt creates a uniform API for multiple speech-to-text ' description=(
'(STT) services.'), "Tatt creates a uniform API for multiple speech-to-text " "(STT) services."
long_description_content_type='text/markdown', ),
long_description_content_type="text/markdown",
long_description=long_description, long_description=long_description,
entry_points=''' entry_points="""
[console_scripts] [console_scripts]
transcribe=tatt.transcribe:cli transcribe=tatt.transcribe:cli
''', """,
) )

View File

@@ -48,6 +48,12 @@ def get_transcript(job_name) -> tuple:
return transcript, service return transcript, service
def get_transcript_format(job_name) -> str:
job = get_job(job_name)
service = get_service(job['service_name'])
return service.transcript_type
def get_service(service_name) -> TranscriberBaseClass: def get_service(service_name) -> TranscriberBaseClass:
module = vendors.SERVICES[service_name] module = vendors.SERVICES[service_name]
return getattr(module, config.SERVICE_CLASS_NAME) return getattr(module, config.SERVICE_CLASS_NAME)

View File

@@ -1,23 +1,30 @@
from unittest import mock from unittest import mock
from pytest import raises, fixture
from tatt.vendors.amazon import Transcriber from tatt.vendors.amazon import Transcriber
@fixture
def media_filepath():
return "/Users/zev/tester.mp3"
def test_transcriber_instantiate():
filepath = '/Users/zev/tester.mp3' @fixture
t = Transcriber(filepath) def transcriber_instance(media_filepath):
assert str(t.filepath) == filepath return Transcriber(media_filepath)
assert t.basename == 'tester.mp3'
assert t.media_file_uri == (
f'https://s3-us-east-1.amazonaws.com/tatt-media-amazon/tester.mp3' def test_transcriber_instance(media_filepath, transcriber_instance):
assert str(transcriber_instance.filepath) == media_filepath
assert transcriber_instance.basename == "tester.mp3"
assert transcriber_instance.media_file_uri == (
f"https://s3-us-east-1.amazonaws.com/tatt-media-amazon/tester.mp3"
) )
@mock.patch('tatt.vendors.amazon.tr.get_transcription_job') @mock.patch("tatt.vendors.amazon.tr.get_transcription_job")
def test_transcriber_retrieve(get_transcription_job): def test_transcriber_retrieve(get_transcription_job):
filepath = '/Users/zev/tester.mp3' job_name = "4db6808e-a7e8-4d8d-a1b7-753ab97094dc"
job_name = '4db6808e-a7e8-4d8d-a1b7-753ab97094dc'
t = Transcriber.retrieve_transcript(job_name) t = Transcriber.retrieve_transcript(job_name)
get_transcription_job.assert_called_with(TranscriptionJobName=job_name) get_transcription_job.assert_called_with(TranscriptionJobName=job_name)
@@ -29,9 +36,40 @@ def test_transcriber_get_transcription_jobs():
def test_transcriber_retrieve_transcript(): def test_transcriber_retrieve_transcript():
jobs = Transcriber.get_transcription_jobs() jobs = Transcriber.get_transcription_jobs()
assert jobs
for j in jobs: for j in jobs:
if j['status'].lower() == 'completed': if j["status"].lower() == "completed":
to_get = j['name'] to_get = j["name"]
break break
transcript = Transcriber.retrieve_transcript(to_get) transcript = Transcriber.retrieve_transcript(to_get)
assert transcript == {'jobName': 'abcd.mp3', 'accountId': '416321668733', 'results': {'transcripts': [{'transcript': 'Hello there.'}], 'items': [{'start_time': '0.0', 'end_time': '0.35', 'alternatives': [{'confidence': '0.8303', 'content': 'Hello'}], 'type': 'pronunciation'}, {'start_time': '0.35', 'end_time': '0.76', 'alternatives': [{'confidence': '1.0000', 'content': 'there'}], 'type': 'pronunciation'}, {'alternatives': [{'confidence': None, 'content': '.'}], 'type': 'punctuation'}]}, 'status': 'COMPLETED'} assert transcript == {
"jobName": "abcd.mp3",
"accountId": "416321668733",
"results": {
"transcripts": [{"transcript": "Hello there."}],
"items": [
{
"start_time": "0.0",
"end_time": "0.35",
"alternatives": [{"confidence": "0.8303", "content": "Hello"}],
"type": "pronunciation",
},
{
"start_time": "0.35",
"end_time": "0.76",
"alternatives": [{"confidence": "1.0000", "content": "there"}],
"type": "pronunciation",
},
{
"alternatives": [{"confidence": None, "content": "."}],
"type": "punctuation",
},
],
},
"status": "COMPLETED",
}
def test_transcribe_with_nonexistent_language_code(transcriber_instance):
with raises(KeyError):
transcriber_instance.transcribe(language_code="pretend-lang")

View File

@@ -3,6 +3,6 @@ from tatt.vendors import SERVICES
def test_services(): def test_services():
for service in SERVICES.values(): for service in SERVICES.values():
assert hasattr(service, 'Transcriber') assert hasattr(service, "Transcriber")
assert hasattr(service, 'NAME') assert hasattr(service, "NAME")
assert hasattr(service.Transcriber, 'cost_per_15_seconds') assert hasattr(service.Transcriber, "cost_per_15_seconds")

View File

@@ -15,44 +15,44 @@ def cli():
@cli.command() @cli.command()
@click.option('-s', '--save', is_flag=True, help='save to file') @click.option("-s", "--save", is_flag=True, help="save to file")
@click.option('-p', '--pretty', is_flag=True, @click.option(
help='pretty print, will make output non-pipeable') "-p", "--pretty", is_flag=True, help="pretty print, will make output non-pipeable"
@click.argument('name') )
@click.argument("name")
def get(name, save, pretty): def get(name, save, pretty):
"""Downloads and/or saves completed transcript.""" """Downloads and/or saves completed transcript."""
try: try:
transcript, service = helpers.get_transcript(name) transcript, service = helpers.get_transcript(name)
except exceptions.DoesntExistError: except exceptions.DoesntExistError:
raise click.ClickException(f'no such transcript {name}') raise click.ClickException(f"no such transcript {name}")
except exceptions.NotAvailable as e: except exceptions.NotAvailable as e:
raise click.ClickException(str(e)) raise click.ClickException(str(e))
file = None file = None
if service.transcript_type == dict: if service.transcript_type == dict:
transcript = json.dumps(transcript, indent=4 if pretty else None) transcript = json.dumps(transcript, indent=4 if pretty else None)
filepath = f'{name}.json' filepath = f"{name}.json"
else: else:
filepath = f'{name}.txt' filepath = f"{name}.txt"
if save: if save:
file = open(filepath, 'w') file = open(filepath, "w")
click.echo(transcript, file=file) click.echo(transcript, file=file)
if file: if file:
click.echo(f'Saved transcript to {filepath}.') click.echo(f"Saved transcript to {filepath}.")
file.close() file.close()
@cli.command() @cli.command()
@click.option('--service', type=str, help="STT service name") @click.option("--service", type=str, help="STT service name")
@click.option('--status', type=str, help="completed | failed | in_progress") @click.option("--status", type=str, help="completed | failed | in_progress")
def list(service, status): def list(service, status):
"""Lists available STT services.""" """Lists all transcription jobs."""
if service is not None and service not in vendors.SERVICES: if service is not None and service not in vendors.SERVICES:
raise click.ClickException(f'no such service: {service}') raise click.ClickException(f"no such service: {service}")
try: try:
all_jobs = get_transcription_jobs(service_name=service, status=status) all_jobs = get_transcription_jobs(service_name=service, status=status)
@@ -60,26 +60,26 @@ def list(service, status):
raise click.ClickException(str(e)) raise click.ClickException(str(e))
else: else:
if not all_jobs: if not all_jobs:
raise click.ClickException('no transcripts currently!') raise click.ClickException("no transcripts currently!")
helpers.print_transcription_jobs(all_jobs) helpers.print_transcription_jobs(all_jobs)
@cli.command() @cli.command()
@click.option('-f', '--free-only', is_flag=True, help='only free services') @click.option("-f", "--free-only", is_flag=True, help="only free services")
def services(free_only): def services(free_only):
"""Lists available speech-to-text services.""" """Lists available speech-to-text services."""
click.echo(helpers.make_string_all_services(free_only)) click.echo(helpers.make_string_all_services(free_only))
@cli.command() @cli.command()
@click.argument('job_name', type=str) @click.argument("job_name", type=str)
def status(job_name): def status(job_name):
"""Check the status of a transcription job.""" """Check the status of a transcription job."""
try: try:
jobs = get_transcription_jobs(name=job_name) jobs = get_transcription_jobs(name=job_name)
except exceptions.DoesntExistError: except exceptions.DoesntExistError:
raise click.ClickException('no job by that name') raise click.ClickException("no job by that name")
for job_list in jobs.values(): for job_list in jobs.values():
for job in job_list: for job in job_list:
click.echo(f'{job["name"]}\t{job["status"]}') click.echo(f'{job["name"]}\t{job["status"]}')
@@ -88,45 +88,98 @@ def status(job_name):
@cli.command() @cli.command()
@click.option('--punctuation', is_flag=True, default=True, @click.argument("service_name", type=click.Choice(vendors.SERVICES))
help='only for Google Speech, defaults to True') def languages(service_name):
@click.option('--speaker-id', is_flag=True, default=True, service = get_service(service_name)
help='only for Google Speech, defaults to True') languages_string = "\n" + "\n".join(service.language_list())
@click.option('--model', default='phone_call', click.echo(f"{service.name} supports {languages_string}")
help='only for Google Speech, defaults to "phone_call"')
@click.option('--use-enhanced', is_flag=True, default=True,
help='only for Google Speech, defaults to True') @cli.command()
@click.argument('media_filepath', type=str) @click.option(
@click.argument('service_name', type=str) "--punctuation",
def this(media_filepath, service_name, punctuation, speaker_id, model, is_flag=True,
use_enhanced): default=True,
help="only for Google Speech, defaults to True",
)
@click.option(
"--speaker-id/--no-speaker-id",
is_flag=True,
default=True,
help="only for google and amazon, defaults to True",
)
@click.option(
"--num_speakers",
default=2,
type=int,
help="only for google and amazon, defaults to 2",
)
@click.option(
"--model",
default="phone_call",
help='only for Google Speech, defaults to "phone_call"',
)
@click.option(
"--use-enhanced",
is_flag=True,
default=True,
help="only for Google Speech, defaults to True",
)
@click.option(
"--language-code",
default="en-US",
help="only for google and amazon, defaults to en-US",
)
@click.argument("media_filepath", type=str)
@click.argument("service_name", type=str)
def this(
media_filepath,
service_name,
punctuation,
speaker_id,
num_speakers,
model,
use_enhanced,
language_code,
):
"""Sends a media file to be transcribed.""" """Sends a media file to be transcribed."""
if service_name == 'google': if service_name == "google":
transcribe_kwargs = dict( transcribe_kwargs = dict(
enable_automatic_punctuation=punctuation, enable_automatic_punctuation=punctuation,
enable_speaker_diarization=speaker_id, enable_speaker_diarization=speaker_id,
model=model, model=model,
use_enhanced=use_enhanced, use_enhanced=use_enhanced,
num_speakers=num_speakers,
language_code=language_code,
)
elif service_name == "amazon":
transcribe_kwargs = dict(
enable_speaker_diarization=speaker_id,
num_speakers=num_speakers,
language_code=language_code,
) )
else: else:
transcribe_kwargs = {} transcribe_kwargs = {}
try: try:
service = get_service(service_name) service = get_service(service_name)
except KeyError as e: except KeyError as e:
raise click.ClickException( raise click.ClickException(
f'No such service! {helpers.make_string_all_services()}') f"No such service! {helpers.make_string_all_services()}"
)
try: try:
s = service(media_filepath) s = service(media_filepath)
except exceptions.ConfigError as e: except exceptions.ConfigError as e:
raise click.ClickException(str(e)) raise click.ClickException(str(e))
click.echo( click.echo(f"Okay, transcribing {media_filepath} using {service_name}...")
f'Okay, transcribing {media_filepath} using {service_name}...')
try: try:
job_num = s.transcribe(**transcribe_kwargs) job_num = s.transcribe(**transcribe_kwargs)
except exceptions.AlreadyExistsError as e: except exceptions.AlreadyExistsError as e:
raise click.ClickException(str(e)) raise click.ClickException(str(e))
click.echo(f'Okay, job {job_num} is being transcribed. Use "get" ' click.echo(
'command to download it.') f'Okay, job {job_num} is being transcribed. Use "get" '
"command to download it."
)

View File

@@ -1,7 +1,3 @@
from tatt.vendors import amazon, google from tatt.vendors import amazon, google
SERVICES = { SERVICES = {"amazon": amazon, "google": google}
'amazon': amazon,
'google': google,
}

117
tatt/vendors/amazon.py vendored
View File

@@ -11,7 +11,7 @@ from tatt import config
from tatt import exceptions from tatt import exceptions
from .vendor import TranscriberBaseClass from .vendor import TranscriberBaseClass
NAME = 'amazon' NAME = "amazon"
BUCKET_NAME_MEDIA = config.BUCKET_NAME_FMTR_MEDIA.format(NAME) BUCKET_NAME_MEDIA = config.BUCKET_NAME_FMTR_MEDIA.format(NAME)
BUCKET_NAME_TRANSCRIPT = config.BUCKET_NAME_FMTR_TRANSCRIPT.format(NAME) BUCKET_NAME_TRANSCRIPT = config.BUCKET_NAME_FMTR_TRANSCRIPT.format(NAME)
TRANSCRIPT_TYPE = dict TRANSCRIPT_TYPE = dict
@@ -19,24 +19,35 @@ TRANSCRIPT_TYPE = dict
def _check_for_config() -> bool: def _check_for_config() -> bool:
return ( return (
config.AWS_CONFIG_FILEPATH.exists() config.AWS_CONFIG_FILEPATH.exists() and config.AWS_CREDENTIALS_FILEPATH.exists()
and config.AWS_CREDENTIALS_FILEPATH.exists()
) )
class Transcriber(TranscriberBaseClass): class Transcriber(TranscriberBaseClass):
cost_per_15_seconds = .024 / 4 name = NAME
bucket_names = {'media': BUCKET_NAME_MEDIA, cost_per_15_seconds = 0.024 / 4
'transcript': BUCKET_NAME_TRANSCRIPT} bucket_names = {"media": BUCKET_NAME_MEDIA, "transcript": BUCKET_NAME_TRANSCRIPT}
no_config_error_message = 'please run "aws configure" first' no_config_error_message = 'please run "aws configure" first'
transcript_type = TRANSCRIPT_TYPE transcript_type = TRANSCRIPT_TYPE
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/transcribe.html
_language_list = [
"en-US",
"es-US",
"en-AU",
"fr-CA",
"en-GB",
"de-DE",
"pt-BR",
"fr-FR",
"it-IT",
"ko-KR",
]
if _check_for_config(): if _check_for_config():
tr = boto3.client('transcribe') tr = boto3.client("transcribe")
s3 = boto3.resource('s3') s3 = boto3.resource("s3")
def __init__(self, filepath): def __init__(self, filepath):
super().__init__(filepath) super().__init__(filepath)
@@ -70,83 +81,97 @@ class Transcriber(TranscriberBaseClass):
def make_bucket(cls, bucket_name): def make_bucket(cls, bucket_name):
cls.s3.create_bucket(Bucket=bucket_name) cls.s3.create_bucket(Bucket=bucket_name)
def transcribe(self) -> str: def transcribe(self, **kwargs) -> str:
super().transcribe(**kwargs)
self._upload_file() self._upload_file()
try: try:
return self._request_transcription() return self._request_transcription(**kwargs)
except self.tr.exceptions.ConflictException: except self.tr.exceptions.ConflictException:
raise exceptions.AlreadyExistsError( raise exceptions.AlreadyExistsError(
f'{self.basename} already exists on {NAME}') f"{self.basename} already exists on {NAME}"
)
def _upload_file(self): def _upload_file(self):
self.s3.Bucket(self.bucket_names['media']).upload_file( self.s3.Bucket(self.bucket_names["media"]).upload_file(
str(self.filepath), str(self.filepath), self.basename
self.basename) )
def _request_transcription(self, language_code='en-US') -> str: def _request_transcription(
self, language_code="en-US", num_speakers=2, enable_speaker_diarization=True
) -> str:
job_name = self.basename job_name = self.basename
self.tr.start_transcription_job(
kwargs = dict(
TranscriptionJobName=job_name, TranscriptionJobName=job_name,
LanguageCode=language_code, LanguageCode=language_code,
MediaFormat=self.basename.split('.')[-1].lower(), MediaFormat=self.basename.split(".")[-1].lower(),
Media={ Media={"MediaFileUri": self.media_file_uri},
'MediaFileUri': self.media_file_uri OutputBucketName=self.bucket_names["transcript"],
},
OutputBucketName=self.bucket_names['transcript']
) )
if enable_speaker_diarization:
kwargs.update(
dict(
Settings={
"ShowSpeakerLabels": True,
"MaxSpeakerLabels": num_speakers,
}
)
)
self.tr.start_transcription_job(**kwargs)
return job_name return job_name
@classmethod @classmethod
def get_transcription_jobs( def get_transcription_jobs(
cls, cls, status: str = None, job_name_query: str = None
status:str = None,
job_name_query:str = None,
) -> List[dict]: ) -> List[dict]:
kwargs = {'MaxResults': 100} kwargs = {"MaxResults": 100}
if status is not None: if status is not None:
kwargs['Status'] = status.upper() kwargs["Status"] = status.upper()
if job_name_query is not None: if job_name_query is not None:
kwargs['JobNameContains'] = job_name_query kwargs["JobNameContains"] = job_name_query
jobs_data = cls.tr.list_transcription_jobs(**kwargs) jobs_data = cls.tr.list_transcription_jobs(**kwargs)
key = 'TranscriptionJobSummaries' key = "TranscriptionJobSummaries"
jobs = cls.homogenize_transcription_job_data(jobs_data[key]) jobs = cls.homogenize_transcription_job_data(jobs_data[key])
while jobs_data.get('NextToken'): while jobs_data.get("NextToken"):
token = jobs_data['NextToken'] token = jobs_data["NextToken"]
jobs_data = cls.tr.list_transcription_jobs(NextToken=token) jobs_data = cls.tr.list_transcription_jobs(NextToken=token)
jobs += cls.homogenize_transcription_job_data(jobs_data[key]) jobs += cls.homogenize_transcription_job_data(jobs_data[key])
return jobs return jobs
@classmethod @classmethod
def retrieve_transcript(cls, transcription_job_name: str def retrieve_transcript(cls, transcription_job_name: str) -> TRANSCRIPT_TYPE:
) -> TRANSCRIPT_TYPE: job = cls.tr.get_transcription_job(TranscriptionJobName=transcription_job_name)[
job = cls.tr.get_transcription_job( "TranscriptionJob"
TranscriptionJobName=transcription_job_name ]
)['TranscriptionJob']
if not job['TranscriptionJobStatus'] == 'COMPLETED': if not job["TranscriptionJobStatus"] == "COMPLETED":
return return
transcript_file_uri = job['Transcript']['TranscriptFileUri'] transcript_file_uri = job["Transcript"]["TranscriptFileUri"]
transcript_path = transcript_file_uri.split("amazonaws.com/", 1)[1] transcript_path = transcript_file_uri.split("amazonaws.com/", 1)[1]
transcript_bucket = transcript_path.split('/', 1)[0] transcript_bucket = transcript_path.split("/", 1)[0]
transcript_key = transcript_path.split('/', 1)[1] transcript_key = transcript_path.split("/", 1)[1]
s3_object = cls.s3.Object(transcript_bucket, transcript_key).get() s3_object = cls.s3.Object(transcript_bucket, transcript_key).get()
transcript_json = s3_object['Body'].read().decode('utf-8') transcript_json = s3_object["Body"].read().decode("utf-8")
return json.loads(transcript_json) return json.loads(transcript_json)
@staticmethod @staticmethod
def homogenize_transcription_job_data(transcription_job_data): def homogenize_transcription_job_data(transcription_job_data):
return [{ return [
'created': jd['CreationTime'], {
'name': jd['TranscriptionJobName'], "created": jd["CreationTime"],
'status': jd['TranscriptionJobStatus'] "name": jd["TranscriptionJobName"],
"status": jd["TranscriptionJobStatus"],
} }
for jd in transcription_job_data] for jd in transcription_job_data
]

204
tatt/vendors/google.py vendored
View File

@@ -12,31 +12,155 @@ from google.cloud import (
speech_v1p1beta1 as speech, speech_v1p1beta1 as speech,
storage, storage,
exceptions as gc_exceptions, exceptions as gc_exceptions,
) )
from tatt import exceptions, helpers, config as config_mod from tatt import exceptions, helpers, config as config_mod
from .vendor import TranscriberBaseClass from .vendor import TranscriberBaseClass
NAME = 'google' NAME = "google"
BUCKET_NAME_TRANSCRIPT = config_mod.BUCKET_NAME_FMTR_TRANSCRIPT_GOOGLE.format( BUCKET_NAME_TRANSCRIPT = config_mod.BUCKET_NAME_FMTR_TRANSCRIPT_GOOGLE.format("goog")
'goog')
TRANSCRIPT_TYPE = str TRANSCRIPT_TYPE = str
def _check_for_config(): def _check_for_config():
return os.getenv('GOOGLE_APPLICATION_CREDENTIALS') is not None return os.getenv("GOOGLE_APPLICATION_CREDENTIALS") is not None
class Transcriber(TranscriberBaseClass): class Transcriber(TranscriberBaseClass):
SUPPORTED_FORMATS = ['flac'] name = NAME
cost_per_15_seconds = [.004, .006, .009] SUPPORTED_FORMATS = ["flac"]
cost_per_15_seconds = [0.004, 0.006, 0.009]
no_config_error_message = ( no_config_error_message = (
'Please sign up for the Google Speech-to-Text API ' "Please sign up for the Google Speech-to-Text API "
'and put the path to your credentials in an ' "and put the path to your credentials in an "
'environment variable "GOOGLE_APPLICATION_CREDENTIALS"' 'environment variable "GOOGLE_APPLICATION_CREDENTIALS"'
) )
transcript_type = TRANSCRIPT_TYPE transcript_type = TRANSCRIPT_TYPE
# https://cloud.google.com/speech-to-text/docs/languages
# Array.from(document.querySelector('.devsite-table-wrapper').querySelectorAll('table tr')).slice(1).map(row => row.children[1].innerText)
_language_list = [
"af-ZA",
"am-ET",
"hy-AM",
"az-AZ",
"id-ID",
"ms-MY",
"bn-BD",
"bn-IN",
"ca-ES",
"cs-CZ",
"da-DK",
"de-DE",
"en-AU",
"en-CA",
"en-GH",
"en-GB",
"en-IN",
"en-IE",
"en-KE",
"en-NZ",
"en-NG",
"en-PH",
"en-SG",
"en-ZA",
"en-TZ",
"en-US",
"es-AR",
"es-BO",
"es-CL",
"es-CO",
"es-CR",
"es-EC",
"es-SV",
"es-ES",
"es-US",
"es-GT",
"es-HN",
"es-MX",
"es-NI",
"es-PA",
"es-PY",
"es-PE",
"es-PR",
"es-DO",
"es-UY",
"es-VE",
"eu-ES",
"fil-PH",
"fr-CA",
"fr-FR",
"gl-ES",
"ka-GE",
"gu-IN",
"hr-HR",
"zu-ZA",
"is-IS",
"it-IT",
"jv-ID",
"kn-IN",
"km-KH",
"lo-LA",
"lv-LV",
"lt-LT",
"hu-HU",
"ml-IN",
"mr-IN",
"nl-NL",
"ne-NP",
"nb-NO",
"pl-PL",
"pt-BR",
"pt-PT",
"ro-RO",
"si-LK",
"sk-SK",
"sl-SI",
"su-ID",
"sw-TZ",
"sw-KE",
"fi-FI",
"sv-SE",
"ta-IN",
"ta-SG",
"ta-LK",
"ta-MY",
"te-IN",
"vi-VN",
"tr-TR",
"ur-PK",
"ur-IN",
"el-GR",
"bg-BG",
"ru-RU",
"sr-RS",
"uk-UA",
"he-IL",
"ar-IL",
"ar-JO",
"ar-AE",
"ar-BH",
"ar-DZ",
"ar-SA",
"ar-IQ",
"ar-KW",
"ar-MA",
"ar-TN",
"ar-OM",
"ar-PS",
"ar-QA",
"ar-LB",
"ar-EG",
"fa-IR",
"hi-IN",
"th-TH",
"ko-KR",
"zh-TW",
"yue-Hant-HK",
"ja-JP",
"zh-HK",
"zh",
]
if _check_for_config(): if _check_for_config():
speech_client = speech.SpeechClient() speech_client = speech.SpeechClient()
@@ -49,10 +173,11 @@ class Transcriber(TranscriberBaseClass):
@classmethod @classmethod
def _setup(cls): def _setup(cls):
super()._setup() super()._setup()
if not shutil.which('gsutil'): if not shutil.which("gsutil"):
raise exceptions.DependencyRequired( raise exceptions.DependencyRequired(
'Please install gcloud using the steps here:' "Please install gcloud using the steps here:"
'https://cloud.google.com/storage/docs/gsutil_install') "https://cloud.google.com/storage/docs/gsutil_install"
)
cls._make_bucket_if_doesnt_exist(BUCKET_NAME_TRANSCRIPT) cls._make_bucket_if_doesnt_exist(BUCKET_NAME_TRANSCRIPT)
@@ -64,13 +189,13 @@ class Transcriber(TranscriberBaseClass):
# this might fail if a bucket by the name exists *anywhere* on GCS? # this might fail if a bucket by the name exists *anywhere* on GCS?
return return
else: else:
print('made Google Cloud Storage Bucket for transcripts') print("made Google Cloud Storage Bucket for transcripts")
def convert_file_format_if_needed(self): def convert_file_format_if_needed(self):
if self.file_format not in self.SUPPORTED_FORMATS: if self.file_format not in self.SUPPORTED_FORMATS:
if not shutil.which('ffmpeg'): if not shutil.which("ffmpeg"):
raise exceptions.DependencyRequired('please install ffmpeg') raise exceptions.DependencyRequired("please install ffmpeg")
self.filepath = helpers.convert_file(self.filepath, 'flac') self.filepath = helpers.convert_file(self.filepath, "flac")
@property @property
def file_format(self): def file_format(self):
@@ -91,29 +216,33 @@ class Transcriber(TranscriberBaseClass):
def _check_if_transcript_exists(self, transcript_name=None): def _check_if_transcript_exists(self, transcript_name=None):
return storage.Blob( return storage.Blob(
bucket=self.transcript_bucket, bucket=self.transcript_bucket, name=transcript_name or self.basename
name=transcript_name or self.basename
).exists(self.storage_client) ).exists(self.storage_client)
def _request_transcription( def _request_transcription(
self, self,
language_code='en-US', language_code="en-US",
enable_automatic_punctuation=True, enable_automatic_punctuation=True,
enable_speaker_diarization=True, enable_speaker_diarization=True,
model='phone_call', num_speakers=2,
model="phone_call",
use_enhanced=True, use_enhanced=True,
) -> str: ) -> str:
"""Returns the job_name""" """Returns the job_name"""
if self._check_if_transcript_exists(): if self._check_if_transcript_exists():
raise exceptions.AlreadyExistsError( raise exceptions.AlreadyExistsError(
f'{self.basename} already exists on {NAME}') f"{self.basename} already exists on {NAME}"
)
num_audio_channels = helpers.get_num_audio_channels(self.filepath) num_audio_channels = helpers.get_num_audio_channels(self.filepath)
sample_rate = helpers.get_sample_rate(self.filepath) sample_rate = helpers.get_sample_rate(self.filepath)
with io.open(self.filepath, 'rb') as audio_file: with io.open(self.filepath, "rb") as audio_file:
content = audio_file.read() content = audio_file.read()
audio = speech.types.RecognitionAudio(content=content) audio = speech.types.RecognitionAudio(content=content)
if language_code != "en-US":
model = None
config = speech.types.RecognitionConfig( config = speech.types.RecognitionConfig(
encoding=speech.enums.RecognitionConfig.AudioEncoding.FLAC, encoding=speech.enums.RecognitionConfig.AudioEncoding.FLAC,
sample_rate_hertz=sample_rate, sample_rate_hertz=sample_rate,
@@ -124,41 +253,40 @@ class Transcriber(TranscriberBaseClass):
language_code=language_code, language_code=language_code,
enable_automatic_punctuation=enable_automatic_punctuation, enable_automatic_punctuation=enable_automatic_punctuation,
enable_speaker_diarization=enable_speaker_diarization, enable_speaker_diarization=enable_speaker_diarization,
diarization_speaker_count=num_speakers,
model=model, model=model,
use_enhanced=use_enhanced, use_enhanced=use_enhanced,
) )
self.operation = self.speech_client.long_running_recognize(config, self.operation = self.speech_client.long_running_recognize(config, audio)
audio)
print('transcribing...') print("transcribing...")
while not self.operation.done(): while not self.operation.done():
sleep(1) sleep(1)
print('.') print(".")
result_list = [] result_list = []
for result in self.operation.result().results: for result in self.operation.result().results:
result_list.append(str(result)) result_list.append(str(result))
print('saving transcript') print("saving transcript")
transcript_path = '/tmp/transcript.txt' transcript_path = "/tmp/transcript.txt"
with open(transcript_path, 'w') as fout: with open(transcript_path, "w") as fout:
fout.write('\n'.join(result_list)) fout.write("\n".join(result_list))
print('uploading transcript') print("uploading transcript")
self.upload_file(BUCKET_NAME_TRANSCRIPT, transcript_path) self.upload_file(BUCKET_NAME_TRANSCRIPT, transcript_path)
os.remove(transcript_path) os.remove(transcript_path)
return self.basename return self.basename
@classmethod @classmethod
def retrieve_transcript(cls, transcription_job_name: str def retrieve_transcript(cls, transcription_job_name: str) -> TRANSCRIPT_TYPE:
) -> TRANSCRIPT_TYPE:
"""Get transcript from BUCKET_NAME_TRANSCRIPT""" """Get transcript from BUCKET_NAME_TRANSCRIPT"""
if not cls._check_if_transcript_exists( if not cls._check_if_transcript_exists(
cls, cls, transcript_name=transcription_job_name
transcript_name=transcription_job_name): ):
raise exceptions.DoesntExistError('no such transcript!') raise exceptions.DoesntExistError("no such transcript!")
blob = cls.transcript_bucket.blob(transcription_job_name) blob = cls.transcript_bucket.blob(transcription_job_name)
f = tempfile.NamedTemporaryFile(delete=False) f = tempfile.NamedTemporaryFile(delete=False)
f.close() f.close()
@@ -177,7 +305,7 @@ class Transcriber(TranscriberBaseClass):
@classmethod @classmethod
def get_transcription_jobs(cls, job_name_query=None, status=None) -> List[dict]: def get_transcription_jobs(cls, job_name_query=None, status=None) -> List[dict]:
if status and status.lower() != 'completed': if status and status.lower() != "completed":
return [] return []
jobs = [] jobs = []
@@ -185,6 +313,6 @@ class Transcriber(TranscriberBaseClass):
for t in cls.transcript_bucket.list_blobs(): for t in cls.transcript_bucket.list_blobs():
if job_name_query is not None and t.name != job_name_query: if job_name_query is not None and t.name != job_name_query:
continue continue
jobs.append({'name': t.name, 'status': 'COMPLETED'}) jobs.append({"name": t.name, "status": "COMPLETED"})
return jobs return jobs

View File

@@ -12,8 +12,8 @@ class TranscriberBaseClass:
def __init__(self, filepath): def __init__(self, filepath):
self._setup() self._setup()
if ' ' in filepath: if " " in filepath:
raise exceptions.FormatError('Please don\'t put any spaces in the filename.') raise exceptions.FormatError("Please don't put any spaces in the filename.")
self.filepath = PurePath(filepath) self.filepath = PurePath(filepath)
self.basename = str(os.path.basename(self.filepath)) self.basename = str(os.path.basename(self.filepath))
@@ -31,6 +31,11 @@ class TranscriberBaseClass:
def transcript_type(self): def transcript_type(self):
pass pass
@property
@abc.abstractmethod
def _language_list(self):
pass
@property @property
@abc.abstractmethod @abc.abstractmethod
def cost_per_15_seconds(self): def cost_per_15_seconds(self):
@@ -42,32 +47,35 @@ class TranscriberBaseClass:
if not cls.check_for_config(): if not cls.check_for_config():
raise exceptions.ConfigError(cls.no_config_error_message) raise exceptions.ConfigError(cls.no_config_error_message)
@staticmethod @classmethod
@abc.abstractmethod @abc.abstractmethod
def check_for_config() -> bool: def check_for_config(cls) -> bool:
pass pass
@abc.abstractmethod def transcribe(self, **kwargs) -> str:
def transcribe(self) -> str:
""" """
This should do any required logic, This should do any required logic,
then call self._request_transcription. then call self._request_transcription.
It should return the job_name. It should return the job_name.
""" """
pass if kwargs["language_code"] not in self.language_list():
raise KeyError(f"No such language code {kwargs['language_code']}")
@abc.abstractmethod @abc.abstractmethod
def _request_transcription(self) -> str: def _request_transcription(self) -> str:
"""Returns the job_name""" """Returns the job_name"""
pass pass
@classmethod
def language_list(cls) -> List[str]:
return sorted(cls._language_list)
@classmethod @classmethod
@abc.abstractmethod @abc.abstractmethod
def retrieve_transcript(transcription_job_name: str) -> Union[str, dict]: def retrieve_transcript(cls, transcription_job_name: str) -> Union[str, dict]:
pass pass
@classmethod @classmethod
@abc.abstractmethod @abc.abstractmethod
def get_transcription_jobs() -> List[dict]: def get_transcription_jobs() -> List[dict]:
pass pass