Updated the tests to the current converter format, aligned the static/non-static method decorators on the base class with those of its children, added GoogleConverter, and moved a couple of methods that work identically in all converters into the base class.

This commit is contained in:
2019-03-07 02:30:48 -05:00
parent 6333f55c87
commit d30ecad583
16 changed files with 346 additions and 46 deletions

View File

@@ -0,0 +1 @@
name = 'tpro'

View File

@@ -0,0 +1,5 @@
import os
AMAZON_TRANSCRIPT_TEST_FILE = os.getenv('AMAZON_TRANSCRIPT_TEST_FILE')
GOOGLE_TRANSCRIPT_TEST_FILE = os.getenv('GOOGLE_TRANSCRIPT_TEST_FILE')

View File

@@ -0,0 +1,97 @@
import abc
from collections import namedtuple
import os
from . import helpers
from . import converters
Word = namedtuple('Word', 'start end confidence word always_capitalized next_word')
class TranscriptConverter:
__metaclass__ = abc.ABCMeta
def __init__(self, json_data: dict):
self.json_data = json_data
def convert(self):
tagged_words = None
word_objects = self.get_word_objects(self.json_data)
words = self.get_words(word_objects)
tagged_words = helpers.tag_words(words)
self.converted_words = self.convert_words(
word_objects,
words,
tagged_words
)
@abc.abstractmethod
def get_word_objects(self, json_data):
pass
def get_words(self, word_objects):
return [self.get_word_word(w)
for w in word_objects]
@abc.abstractmethod
def convert_words(self, word_objects, words, tagged_words=None):
pass
@staticmethod
@abc.abstractmethod
def get_word_start(word_object):
pass
@staticmethod
@abc.abstractmethod
def get_word_end(word_object):
pass
@staticmethod
@abc.abstractmethod
def get_word_confidence(word_object):
pass
@staticmethod
@abc.abstractmethod
def get_word_word(word_object):
pass
@staticmethod
def check_if_always_capitalized(word, index, tagged_words):
if word.upper() == 'I':
return True
word_category = tagged_words[index][1]
return word_category in helpers.PROPER_NOUN_TAGS
def get_word_object(self, word_object, index, tagged_words, word_objects):
word = self.get_word_word(word_object)
return Word(
self.get_word_start(word_object),
self.get_word_end(word_object),
self.get_word_confidence(word_object),
word,
self.check_if_always_capitalized(word, index, tagged_words),
self.get_next_word(word_objects, index)
)
def get_next_word(self, word_objects, index):
if index < len(word_objects) - 1:
return word_objects[index + 1]
def save(self, path, output_target):
with open(path, 'w') as fout:
fout.write(getattr(self, output_target)())
return path
from . import outputs
for name, val in outputs.__dict__.items():
if callable(val):
setattr(TranscriptConverter, name, val)

View File

@@ -0,0 +1,11 @@
from .amazon import AmazonConverter
from .speechmatics import SpeechmaticsConverter
from .gentle import GentleConverter
from .google import GoogleConverter
services = {
'amazon': AmazonConverter,
'gentle': GentleConverter,
'speechmatics': SpeechmaticsConverter,
'google': GoogleConverter,
}

View File

@@ -0,0 +1,81 @@
import json
from ..converter import TranscriptConverter
from .. import helpers
class AmazonConverter(TranscriptConverter):
name = 'amazon'
def __init__(self, json_data):
super().__init__(json_data)
def get_word_objects(self, json_data):
return json_data['results']['items']
@staticmethod
def get_word_start(word_object):
return float(word_object['start_time'])
@staticmethod
def get_word_end(word_object):
return float(word_object['end_time'])
@staticmethod
def get_word_confidence(word_object):
return float(word_object['alternatives'][0]['confidence'])
@staticmethod
def get_word_word(word_object) -> str:
word_word = word_object['alternatives'][0]['content']
if word_word == 'i':
# weird Amazon quirk
word_word = 'I'
return word_word
def convert_words(self, word_objects, words, tagged_words=None):
converted_words = []
punc_before = False
punc_after = False
for i, w in enumerate(word_objects):
if w['type'] == 'punctuation':
continue
next_word_punc_after = None
word_obj = self.get_word_object(w, i, tagged_words, word_objects)
if word_obj.next_word:
next_word = self.get_word_word(word_obj.next_word)
next_word_type = word_obj.next_word['type']
if next_word in ['.', ',']:
punc_after = next_word
elif next_word_punc_after:
punc_after = next_word_punc_after
next_word_punc_after = None
if word_obj.word.lower() == 'you' and next_word == 'know':
prev_word = word_objects[i - 1]
if prev_word['type'] != 'punctuation':
converted_words[-1]['punc_after'] = ','
if next_word_type != 'punctuation':
next_word_punc_after = ','
converted_words.append({
'start': word_obj.start,
'end': word_obj.end,
'confidence': word_obj.confidence,
'word': word_obj.word,
'always_capitalized': self.check_if_always_capitalized(
word_obj.word,
i,
tagged_words),
'punc_after': punc_after,
'punc_before': punc_before,
})
punc_after = False
return converted_words

View File

@@ -0,0 +1,55 @@
from ..converter import TranscriptConverter
class GentleConverter(TranscriptConverter):
name = 'gentle'
def __init__(self, json_data):
super().__init__(json_data)
def get_word_objects(self, json_data):
return json_data['words']
@staticmethod
def get_word_start(word_object):
return word_object['start']
@staticmethod
def get_word_end(word_object):
return word_object['end']
@staticmethod
def get_word_confidence(word_object):
return 1
@staticmethod
def get_word_word(word_object):
return word_object['alignedWord']
def convert_words(self, word_objects, words, tagged_words=None):
converted_words = []
num_words = len(words)
for i, w in enumerate(word_objects):
word_obj = self.get_word_object(w, i, tagged_words, word_objects)
converted_words.append({
'start': word_obj.start,
'end': word_obj.end,
'confidence': word_obj.confidence,
'word': word_obj.word,
'always_capitalized': self.check_if_always_capitalized(
word_obj.word,
i,
tagged_words),
'punc_after': False,
'punc_before': False,
})
punc_after = False
return converted_words

View File

@@ -0,0 +1,145 @@
import json
import re
from ..converter import TranscriptConverter
from .. import helpers
class GoogleConverter(TranscriptConverter):
def __init__(self, transcript_data: str):
super().__init__(transcript_data)
self.json_data = self.pre_process(transcript_data)
def pre_process(self, transcript_data):
friendly = make_json_friendly(transcript_data)
return json.loads(friendly)
def get_word_objects(self, json_data):
return json_data
def convert_words(self, word_objects, words, tagged_words=None):
converted_words = []
punc_before = False
punc_after = False
for i, w in enumerate(word_objects):
word_obj = self.get_word_object(w, i, tagged_words, word_objects)
punc_before = helpers.get_punc_before(word_obj.word) or False
punc_after = helpers.get_punc_after(word_obj.word) or False
the_word = word_obj.word
if punc_before:
the_word = the_word[len(punc_before):]
if punc_after:
the_word = the_word[:-len(punc_after)]
converted_words.append({
'start': word_obj.start,
'end': word_obj.end,
'confidence': word_obj.confidence,
'word': the_word,
'always_capitalized': self.check_if_always_capitalized(
word_obj.word,
i,
tagged_words),
'punc_after': punc_after,
'punc_before': punc_before,
})
return converted_words
@classmethod
def get_word_start(cls, word_object):
return cls.get_seconds(word_object['start_time'])
@classmethod
def get_word_end(cls, word_object):
return cls.get_seconds(word_object['end_time'])
@staticmethod
def get_seconds(time: dict) -> float:
seconds = 0
if 'seconds' in time:
seconds = time['seconds']
if 'nanos' in time:
seconds += time['nanos'] / 1_000_000_000
return seconds
@staticmethod
def get_word_confidence(word_object):
return word_object['confidence']
@staticmethod
def get_word_word(word_object):
print(word_object)
return word_object['word']
def make_json_friendly(json_string):
lines = [line.strip() for line in json_string.split('\\n')]
fields = [
'words {',
'start_time {',
'}',
'end_time {',
'}',
'word: ',
'confidence: '
]
current_field_index = 0
new_string = ''
for line in lines:
current_field = fields[current_field_index]
if current_field in line:
if current_field_index == len(fields) - 1:
current_field_index = 0
else:
current_field_index += 1
if current_field_index == 1:
new_string += '}, {'
# "words" was found, don't want to append that
continue
else:
if current_field_index == 0:
# haven't found the beginning of the next word object
continue
# add quotes around keys
line = re.sub('^(?!")([0-9a-zA-Z_]+)',
'"\\1"',
line)
# add colons after keys
if line.endswith('{'):
line = line.replace('" ', '": ')
# use first two decimals of confidence
if 'confidence' in current_field:
line = ', ' + line
line = line[:20]
if current_field == '}':
line = line + ', '
new_string += line
# cleanup
if new_string.startswith('}, '):
new_string = new_string[3:]
if not new_string.startswith('['):
new_string = '[' + new_string
if not new_string.endswith('}]'):
new_string = new_string + '}]'
new_string = new_string.replace(', }', '}').replace('\\', '')
return new_string

View File

@@ -0,0 +1,133 @@
from collections import namedtuple
import json
from ..converter import TranscriptConverter
from .. import helpers
class SpeechmaticsConverter(TranscriptConverter):
name = 'speechmatics'
def __init__(self, path):
super().__init__(path)
def get_word_objects(self, json_data):
return json_data['words']
@staticmethod
def get_word_start(word_object):
return float(word_object['time'])
@staticmethod
def get_word_end(word_object):
return (SpeechmaticsConverter.get_word_start(word_object)
+ float(word_object['duration']))
@staticmethod
def get_word_confidence(word_object):
return float(word_object['confidence'])
@staticmethod
def get_word_word(word_object):
return word_object['name']
def convert_words(self, word_objects, words, tagged_words=None):
converted_words = []
punc_before = False
punc_after = False
num_words = len(words)
for i, w in enumerate(word_objects):
word_obj = self.get_word_object(w, i, tagged_words, word_objects)
if word_obj.word == '.':
continue
if word_obj.next_word:
next_word = self.get_word_word(word_obj.next_word)
if next_word == '.':
punc_after = '.'
converted_words.append({
'start': word_obj.start,
'end': word_obj.end,
'confidence': word_obj.confidence,
'word': word_obj.word,
'always_capitalized': self.check_if_always_capitalized(
word_obj.word,
i,
tagged_words),
'punc_after': punc_after,
'punc_before': punc_before,
})
punc_after = False
return converted_words
def speechmatics_aligned_text_converter(data):
data = data.readlines()[0]
class Exhausted(Exception):
pass
Word = namedtuple('Word', 'start end word')
def get_time(transcript, index):
time_index = transcript.find('time=', index)
if time_index == -1:
raise Exhausted
close_index = transcript.find('>', time_index)
return float(transcript[time_index + 5: close_index]), close_index
def find_next_word(transcript, start_index):
start, end_of_start_index = get_time(transcript, start_index)
word_start_index = end_of_start_index + 1
word_end_index = transcript.find('<', word_start_index)
word = transcript[word_start_index: word_end_index]
end, close_index = get_time(transcript, word_end_index)
return Word(start, end, word), close_index
words = []
next_index = 0
word = None
while True:
try:
word, next_index = find_next_word(data, next_index)
except Exhausted:
break
else:
words.append(word)
tagged_words = helpers.tag_words([w.word for w in words])
converted_words = []
for i, word in enumerate(words):
is_proper_noun = tagged_words[i][1] in helpers.PROPER_NOUN_TAGS
punc_before = helpers.get_punc_before(word.word)
punc_after = helpers.get_punc_after(word.word)
the_word = word.word
if punc_before or punc_after:
for p in helpers.PUNCTUATION:
the_word = the_word.replace(p, '')
converted_words.append({
'start': word.start,
'end': word.end,
'confidence': 1,
'word': the_word,
'always_capitalized': self.check_if_always_capitalized(
word.word,
i,
tagged_words),
'index': i,
'punc_before': punc_before,
'punc_after': punc_after,
})
return converted_words

View File

@@ -0,0 +1,46 @@
from pathlib import Path
from nltk.tag.stanford import StanfordNERTagger
st = StanfordNERTagger('/usr/local/bin/english.all.3class.distsim.crf.ser.gz',
'/usr/local/bin/stanford-ner.jar')
PROPER_NOUN_TAGS = ['ORGANIZATION', 'PERSON', 'LOCATION']
PUNCTUATION = ['.', '?', ',', ':', '"', '!']
def tag_words(words):
return st.tag(words)
def is_a_proper_noun(phrase):
tagged_words = tag_words(phrase.split())
return any(tagged_word[1] in PROPER_NOUN_TAGS
for tagged_word in tagged_words)
def get_punc_before(word):
punc = []
for char in word:
if char.isalpha():
return punc
if char in PUNCTUATION:
punc.append(char)
def get_punc_after(word):
punc = []
for char in reversed(word):
if char.isalpha():
return punc
if char in PUNCTUATION:
punc.insert(0, char)
def is_path(string):
try:
return Path(string).exists()
except OSError:
return False

View File

@@ -0,0 +1,14 @@
import json
def universal_transcript(self, pretty=False):
return json.dumps(self.converted_words, indent=4 if pretty else None)
def viral_overlay(self, pretty=False):
return json.dumps([{
'start': word['start'],
'stop': word['end'],
'text': word['word'].title() if word['always_capitalized'] else word['word']}
for word in self.converted_words], indent=4 if pretty else None
)

View File

@@ -0,0 +1,40 @@
import json
import click
from .converters import services
from . import outputs
from . import helpers
output_choices = [k for k, v in
outputs.__dict__.items()
if callable(v)]
@click.command()
@click.option('-s', '--save', type=str, help='save to JSON file')
@click.option('-p', '--pretty', is_flag=True,
help='pretty print the transcript, breaks pipeability')
@click.argument('json_path_or_data', type=str)
@click.argument('input_format', type=click.Choice(services.keys()))
@click.argument('output_format', type=click.Choice(output_choices))
def cli(save,
pretty,
json_path_or_data,
input_format,
output_format):
if not helpers.is_path(json_path_or_data):
json_data = json.loads(json_path_or_data)
else:
with open(json_path_or_data) as fin:
json_data = json.load(fin)
service = services[input_format]
converter = service(json_data)
converter.convert()
if save:
path = save
converter.save(path, output_format)
click.echo(f'{path} saved.')
else:
output_formatter = getattr(converter, output_format)
click.echo(output_formatter(pretty))