diff --git a/.gitignore b/.gitignore
index 3010f49..aaeaa84 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,3 +2,4 @@
 dist
 *.pyc
 *.egg-info
+*.iml
diff --git a/.travis.yml b/.travis.yml
index ef07c4d..6c6e42d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -12,5 +12,6 @@ python:
 
 install:
   - pip install .
+  - pip install python-dateutil
 
 script: python setup.py test
diff --git a/README.rst b/README.rst
index 5a747bc..92e5c43 100644
--- a/README.rst
+++ b/README.rst
@@ -1,6 +1,10 @@
 csv2es
 =========================
 
+--------
+IMPORTANT: This is a FORK of the original project. See this link for issues this fork addresses: https://github.com/rholder/csv2es/pulls/bitsofinfo
+--------
+
 .. image:: https://img.shields.io/pypi/v/csv2es.svg
     :target: https://pypi.python.org/pypi/csv2es
diff --git a/csv2es.py b/csv2es.py
index 642e3ba..3d11eaf 100644
--- a/csv2es.py
+++ b/csv2es.py
@@ -15,6 +15,8 @@
 import csv
 import json
 import sys
+import calendar
+
 from threading import local
 
 import click
@@ -24,7 +26,9 @@
 from pyelasticsearch import ElasticHttpNotFoundError
 from pyelasticsearch import IndexAlreadyExistsError
 from retrying import retry
-
+from dateutil import parser
+from datetime import timedelta
+from pprint import pprint
 
 __version__ = '1.0.1'
 thread_local = local()
@@ -41,7 +45,7 @@ def echo(message, quiet):
         click.echo(message)
 
 
-def documents_from_file(es, filename, delimiter, quiet):
+def documents_from_file(es, filename, delimiter, quiet, csv_clean_fieldnames, csv_date_field, csv_date_field_gmt_offset, tags):
     """
     Return a generator for pulling rows from a given delimited file.
 
@@ -52,9 +56,9 @@
     :return: generator returning document-indexing operations
     """
    def all_docs():
-        with open(filename, 'rb') if filename != '-' else sys.stdin as doc_file:
+        with open(filename, 'r') if filename != '-' else sys.stdin as doc_file:
             # delimited file should include the field names as the first row
-            fieldnames = doc_file.next().strip().split(delimiter)
+            fieldnames = doc_file.readline().strip().split(delimiter)
             echo('Using the following ' + str(len(fieldnames)) + ' fields:', quiet)
             for fieldname in fieldnames:
                 echo(fieldname, quiet)
@@ -62,6 +66,34 @@ def all_docs():
             reader = csv.DictReader(doc_file, delimiter=delimiter, fieldnames=fieldnames)
             count = 0
             for row in reader:
+
+                # the row from DictReader needs to be cleaned
+                if csv_clean_fieldnames:
+                    cleaned_row = {}
+
+                    # strip all double quotes from keys and lower-case them
+                    for k, v in row.items():
+                        cleaned_row[k.replace('"', "").lower()] = v
+
+                    row = cleaned_row
+
+                # apply any static key=value tags
+                if tags:
+                    kv_pairs = tags.split(",")
+                    for kv_pair in kv_pairs:
+                        kv = kv_pair.split("=")
+                        row[kv[0]] = kv[1]
+
+                # parse csv_date_field into Elasticsearch-compatible epoch_millis
+                if csv_date_field:
+                    date_val_str = row[csv_date_field]
+
+                    if date_val_str:
+                        date_obj = parser.parse(date_val_str.strip())
+                        corrected_offset = (csv_date_field_gmt_offset * -1)
+                        date_obj = date_obj + timedelta(hours=corrected_offset)
+                        row[csv_date_field] = int(calendar.timegm(date_obj.timetuple())) * 1000
+
                 count += 1
                 if count % 10000 == 0:
                     echo('Sent documents: ' + str(count), quiet)
@@ -162,11 +194,23 @@ def sanitize_delimiter(delimiter, is_tab):
               help='Parallel uploads to send at once, defaults to 1')
 @click.option('--delete-index', is_flag=True, required=False,
               help='Delete existing index if it exists')
+@click.option('--existing-index', is_flag=True, required=False,
+              help='Don\'t create the index; assume it already exists')
 @click.option('--quiet', is_flag=True, required=False,
               help='Minimize console output')
+@click.option('--csv-clean-fieldnames', is_flag=True, required=False,
+              help='Strip double quotes and lower-case all CSV header names for proper Elasticsearch fieldnames')
+@click.option('--csv-date-field', required=False,
+              help='The CSV header name whose date string is to be parsed (via python-dateutil) into an Elasticsearch epoch_millis')
+@click.option('--csv-date-field-gmt-offset', required=False, type=int, default=0,
+              help='The GMT offset for the csv-date-field (i.e. +/- N hours), defaults to 0')
+@click.option('--tags', required=False,
+              help='Custom static key1=val1,key2=val2 pairs to tag all entries with')
 @click.version_option(version=__version__, )
 def cli(index_name, delete_index, mapping_file, doc_type, import_file,
-        delimiter, tab, host, docs_per_chunk, bytes_per_chunk, parallel, quiet):
+        delimiter, tab, host, docs_per_chunk, bytes_per_chunk, parallel, existing_index, quiet,
+        csv_clean_fieldnames, csv_date_field, csv_date_field_gmt_offset, tags):
+
     """
     Bulk import a delimited file into a target Elasticsearch instance. Common
     delimited files include things like CSV and TSV.
@@ -193,11 +237,12 @@ def cli(index_name, delete_index, mapping_file, doc_type, import_file,
         except ElasticHttpNotFoundError:
             echo('Index ' + index_name + ' not found, nothing to delete', quiet)
 
-    try:
-        es.create_index(index_name)
-        echo('Created new index: ' + index_name, quiet)
-    except IndexAlreadyExistsError:
-        echo('Index ' + index_name + ' already exists', quiet)
+    if not existing_index:
+        try:
+            es.create_index(index_name)
+            echo('Created new index: ' + index_name, quiet)
+        except IndexAlreadyExistsError:
+            echo('Index ' + index_name + ' already exists', quiet)
 
     echo('Using document type: ' + doc_type, quiet)
     if mapping_file:
@@ -207,7 +252,7 @@
         es.put_mapping(index_name, doc_type, mapping)
 
     target_delimiter = sanitize_delimiter(delimiter, tab)
-    documents = documents_from_file(es, import_file, target_delimiter, quiet)
+    documents = documents_from_file(es, import_file, target_delimiter, quiet, csv_clean_fieldnames, csv_date_field, csv_date_field_gmt_offset, tags)
 
     perform_bulk_index(host, index_name, doc_type, documents, docs_per_chunk,
                        bytes_per_chunk, parallel)
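
For reference, here is a minimal standalone sketch of the three row transformations this patch adds inside all_docs(): field-name cleaning, static tags, and date-to-epoch_millis conversion. The helper names and sample values are illustrative assumptions, not part of the patch:

    import calendar
    from datetime import timedelta

    from dateutil import parser


    def clean_fieldnames(row):
        # mirror --csv-clean-fieldnames: strip double quotes from keys and lower-case them
        return {k.replace('"', '').lower(): v for k, v in row.items()}


    def apply_tags(row, tags):
        # mirror --tags: fold static key1=val1,key2=val2 pairs into every row
        for kv_pair in tags.split(','):
            k, v = kv_pair.split('=')
            row[k] = v
        return row


    def to_epoch_millis(date_val_str, gmt_offset_hours):
        # mirror --csv-date-field handling: parse with dateutil, shift the naive
        # timestamp back to UTC by negating the GMT offset, then emit the
        # epoch-milliseconds integer that Elasticsearch expects
        date_obj = parser.parse(date_val_str.strip())
        date_obj = date_obj + timedelta(hours=(gmt_offset_hours * -1))
        return int(calendar.timegm(date_obj.timetuple())) * 1000


    row = clean_fieldnames({'"Name"': 'widget', '"Sold"': '2015-06-01 12:30:00'})
    row = apply_tags(row, 'env=test,source=csv')
    row['sold'] = to_epoch_millis(row['sold'], gmt_offset_hours=-5)
    print(row)
    # {'name': 'widget', 'sold': 1433179800000, 'env': 'test', 'source': 'csv'}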
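
Assuming the upstream spellings of the required flags (--index-name, --doc-type, --import-file, which this patch does not touch), a run exercising the new options might look like:

    csv2es --index-name sales --doc-type sale --import-file sales.csv \
           --csv-clean-fieldnames --csv-date-field sold --csv-date-field-gmt-offset=-5 \
           --tags env=test,source=csv

The = form for the negative offset keeps click from parsing -5 as an option name.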