.gitignore (1 addition, 0 deletions)

@@ -2,3 +2,4 @@
 dist
 *.pyc
 *.egg-info
+*.iml
.travis.yml (1 addition, 0 deletions)

@@ -12,5 +12,6 @@ python:

 install:
 - pip install .
+- pip install python-dateutil

 script: python setup.py test
README.rst (4 additions, 0 deletions)

@@ -1,6 +1,10 @@
 csv2es
 =========================

+--------
+IMPORTANT: This is a FORK of the original project. See this link for issues this fork addresses: https://github.com/rholder/csv2es/pulls/bitsofinfo
+--------
+
 .. image:: https://img.shields.io/pypi/v/csv2es.svg
     :target: https://pypi.python.org/pypi/csv2es
csv2es.py (56 additions, 11 deletions)

@@ -15,6 +15,8 @@
 import csv
 import json
 import sys
+import calendar
+
 from threading import local

 import click
@@ -24,7 +26,9 @@
 from pyelasticsearch import ElasticHttpNotFoundError
 from pyelasticsearch import IndexAlreadyExistsError
 from retrying import retry
+from dateutil import parser
+from datetime import timedelta
+from pprint import pprint

 __version__ = '1.0.1'
 thread_local = local()
@@ -41,7 +45,7 @@ def echo(message, quiet):
         click.echo(message)


-def documents_from_file(es, filename, delimiter, quiet):
+def documents_from_file(es, filename, delimiter, quiet, csv_clean_fieldnames, csv_date_field, csv_date_field_gmt_offset, tags):
     """
     Return a generator for pulling rows from a given delimited file.

@@ -52,16 +56,44 @@ def documents_from_file(es, filename, delimiter, quiet):
     :return: generator returning document-indexing operations
     """
     def all_docs():
-        with open(filename, 'rb') if filename != '-' else sys.stdin as doc_file:
+        with open(filename, 'r') if filename != '-' else sys.stdin as doc_file:
             # delimited file should include the field names as the first row
-            fieldnames = doc_file.next().strip().split(delimiter)
+            fieldnames = doc_file.readline().strip().split(delimiter)
             echo('Using the following ' + str(len(fieldnames)) + ' fields:', quiet)
             for fieldname in fieldnames:
                 echo(fieldname, quiet)

             reader = csv.DictReader(doc_file, delimiter=delimiter, fieldnames=fieldnames)
             count = 0
             for row in reader:
+
+                # the row from DictReader needs to be cleaned
+                if csv_clean_fieldnames:
+                    cleaned_row = {}
+
+                    # strip all double quotes from keys and lower-case them
+                    for k, v in row.iteritems():
+                        cleaned_row[k.replace('"', "").lower()] = v
+
+                    row = cleaned_row
+
+                # tags?
+                if tags:
+                    kv_pairs = tags.split(",")
+                    for kv_pair in kv_pairs:
+                        kv = kv_pair.split("=")
+                        row[kv[0]] = kv[1]
+
+                # parse csv_date_field into an Elasticsearch-compatible epoch_millis
+                if csv_date_field:
+                    date_val_str = row[csv_date_field]
+
+                    if date_val_str:
+                        date_obj = parser.parse(date_val_str.strip())
+                        corrected_offset = (csv_date_field_gmt_offset * -1)
+                        date_obj = date_obj + timedelta(hours=corrected_offset)
+                        row[csv_date_field] = int(calendar.timegm(date_obj.timetuple())) * 1000
+
                 count += 1
                 if count % 10000 == 0:
                     echo('Sent documents: ' + str(count), quiet)
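Taken together, the new per-row logic does three independent things: normalize the header-derived keys, stamp static tags onto every document, and convert one date column to UTC epoch milliseconds. A minimal standalone sketch of the same steps, for illustration only (Python 3 here, so dict.items() instead of the diff's Python 2 iteritems(); the sample field and tag names are made up):

    import calendar
    from datetime import timedelta

    from dateutil import parser


    def transform_row(row, clean_fieldnames=True, date_field=None, gmt_offset=0, tags=None):
        # Strip double quotes from keys and lower-case them, mirroring --csv-clean-fieldnames.
        if clean_fieldnames:
            row = {k.replace('"', '').lower(): v for k, v in row.items()}

        # Apply static key=value pairs to every document, mirroring --tags.
        if tags:
            for kv_pair in tags.split(','):
                key, value = kv_pair.split('=')
                row[key] = value

        # Convert the date column to UTC epoch milliseconds, mirroring --csv-date-field.
        if date_field and row.get(date_field):
            date_obj = parser.parse(row[date_field].strip())
            # Subtract the source data's GMT offset so the stored timestamp is UTC.
            date_obj = date_obj + timedelta(hours=-gmt_offset)
            row[date_field] = int(calendar.timegm(date_obj.timetuple())) * 1000
        return row


    row = {'"Sold_At"': '2016-01-15 09:30:00', '"Price"': '4.25'}
    print(transform_row(row, date_field='sold_at', gmt_offset=-5, tags='env=prod'))
    # {'sold_at': 1452868200000, 'price': '4.25', 'env': 'prod'}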
@@ -162,11 +194,23 @@ def sanitize_delimiter(delimiter, is_tab):
               help='Parallel uploads to send at once, defaults to 1')
 @click.option('--delete-index', is_flag=True, required=False,
               help='Delete existing index if it exists')
+@click.option('--existing-index', is_flag=True, required=False,
+              help='Don\'t create the index; assume it already exists')
 @click.option('--quiet', is_flag=True, required=False,
               help='Minimize console output')
+@click.option('--csv-clean-fieldnames', is_flag=True, required=False,
+              help='Strip double quotes and lower-case all CSV header names to produce proper Elasticsearch field names')
+@click.option('--csv-date-field', required=False,
+              help='The CSV header name of a date string to be parsed (via python-dateutil) into an Elasticsearch epoch_millis value')
+@click.option('--csv-date-field-gmt-offset', required=False, type=int,
+              help='The GMT offset for the csv-date-field (i.e. +/- N hours)')
+@click.option('--tags', required=False,
+              help='Custom static key1=val1,key2=val2 pairs to tag all entries with')
 @click.version_option(version=__version__, )
 def cli(index_name, delete_index, mapping_file, doc_type, import_file,
-        delimiter, tab, host, docs_per_chunk, bytes_per_chunk, parallel, quiet):
+        delimiter, tab, host, docs_per_chunk, bytes_per_chunk, parallel, existing_index, quiet,
+        csv_clean_fieldnames, csv_date_field, csv_date_field_gmt_offset, tags):
+
     """
     Bulk import a delimited file into a target Elasticsearch instance. Common
     delimited files include things like CSV and TSV.
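With the new options in place, a hypothetical invocation of this fork could look like the following (the index, type, file, field, and tag names are made up; the flags are the ones defined above):

    csv2es --index-name sales --doc-type sale --import-file sales.csv \
           --existing-index --csv-clean-fieldnames \
           --csv-date-field sold_at --csv-date-field-gmt-offset=-5 \
           --tags env=prod,source=nightly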
@@ -193,11 +237,12 @@ def cli(index_name, delete_index, mapping_file, doc_type, import_file,
     except ElasticHttpNotFoundError:
         echo('Index ' + index_name + ' not found, nothing to delete', quiet)

-    try:
-        es.create_index(index_name)
-        echo('Created new index: ' + index_name, quiet)
-    except IndexAlreadyExistsError:
-        echo('Index ' + index_name + ' already exists', quiet)
+    if not existing_index:
+        try:
+            es.create_index(index_name)
+            echo('Created new index: ' + index_name, quiet)
+        except IndexAlreadyExistsError:
+            echo('Index ' + index_name + ' already exists', quiet)

     echo('Using document type: ' + doc_type, quiet)
     if mapping_file:
@@ -207,7 +252,7 @@ def cli(index_name, delete_index, mapping_file, doc_type, import_file,
         es.put_mapping(index_name, doc_type, mapping)

     target_delimiter = sanitize_delimiter(delimiter, tab)
-    documents = documents_from_file(es, import_file, target_delimiter, quiet)
+    documents = documents_from_file(es, import_file, target_delimiter, quiet, csv_clean_fieldnames, csv_date_field, csv_date_field_gmt_offset, tags)
     perform_bulk_index(host, index_name, doc_type, documents, docs_per_chunk, bytes_per_chunk, parallel)
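One follow-on detail: the values produced by --csv-date-field arrive in Elasticsearch as plain longs, so the index mapping must declare the field as a date for range queries and date histograms to work. A hypothetical --mapping-file for the invocation sketched earlier might look along these lines (the doc type and field name are made up; epoch_millis is a standard Elasticsearch date format):

    {
      "sale": {
        "properties": {
          "sold_at": {"type": "date", "format": "epoch_millis"}
        }
      }
    }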