48 lines
1.5 KiB
Python
48 lines
1.5 KiB
Python
# fileparse.py
|
|
import csv
|
|
import logging
|
|
log = logging.getLogger(__name__)
|
|
|
|
def parse_csv(lines, select=None, types=None, has_headers=True, delimiter=',', silence_errors=False):
|
|
'''
|
|
Parse a CSV file into a list of records with type conversion.
|
|
'''
|
|
assert not (select and not has_headers), 'select requires column headers'
|
|
rows = csv.reader(lines, delimiter=delimiter)
|
|
|
|
# Read the file headers (if any)
|
|
headers = next(rows) if has_headers else []
|
|
|
|
# If specific columns have been selected, make indices for filtering and set output columns
|
|
if select:
|
|
indices = [ headers.index(colname) for colname in select ]
|
|
headers = select
|
|
|
|
records = []
|
|
for rowno, row in enumerate(rows, 1):
|
|
if not row: # Skip rows with no data
|
|
continue
|
|
|
|
# If specific column indices are selected, pick them out
|
|
if select:
|
|
row = [ row[index] for index in indices]
|
|
|
|
# Apply type conversion to the row
|
|
if types:
|
|
try:
|
|
row = [func(val) for func, val in zip(types, row)]
|
|
except ValueError as e:
|
|
if not silence_errors:
|
|
log.warning("Row %d: Couldn't convert %s", rowno, row)
|
|
log.debug("Row %d: Reason %s", rowno, e)
|
|
continue
|
|
|
|
# Make a dictionary or a tuple
|
|
if headers:
|
|
record = dict(zip(headers, row))
|
|
else:
|
|
record = tuple(row)
|
|
records.append(record)
|
|
|
|
return records
|