-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcsv2sqlite3.py
More file actions
executable file
·98 lines (76 loc) · 3.13 KB
/
csv2sqlite3.py
File metadata and controls
executable file
·98 lines (76 loc) · 3.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/python
import os, sys
import csv, sqlite3, argparse
def convert(csvpath, dbpath=None, tablename=None, sqlpath=None, guessdatatypes=True):
    """Load the rows of a CSV file into a table of a SQLite3 database.

    The CSV dialect and the presence of a header row are sniffed from the
    first 4096 characters of the file.  A ``CREATE TABLE IF NOT EXISTS``
    statement is written to ``sqlpath`` unless that file already exists; the
    statement stored in ``sqlpath`` is then executed and every data row is
    inserted into ``tablename``.

    Args:
        csvpath: path of the CSV file to read.
        dbpath: path of the SQLite3 database file; defaults to ``csvpath``
            with its extension replaced by ``.db``.
        tablename: name of the target table; defaults to the base name of
            ``csvpath`` without its extension.
        sqlpath: path of the file holding the CREATE TABLE statement;
            defaults to ``<tablename>.sql`` next to the CSV file.
        guessdatatypes: when true, column types come from
            ``guess_datatypes`` (first 1000 rows) and empty cells are stored
            as NULL; when false, every column is TEXT and cells are stored
            verbatim.

    Raises:
        csv.Error: if ``csv.Sniffer`` cannot determine the dialect.
        sqlite3.Error: if the CREATE TABLE or an INSERT fails; the pending
            transaction is rolled back in that case.
    """
    py3 = sys.version_info[0] >= 3
    csv_stem = os.path.splitext(csvpath)[0]

    # /path/file.csv -> /path/file.db
    if not dbpath:
        dbpath = '%s.db' % csv_stem
    # /path/file.csv -> file
    if not tablename:
        tablename = os.path.basename(csv_stem)
    # /path/file.csv -> /path/tablename.sql
    if not sqlpath:
        sqlpath = os.path.join(os.path.dirname(csvpath), '%s.sql' % tablename)

    # The csv module wants a binary file on Python 2, but a text file opened
    # with newline='' on Python 3.
    if py3:
        csvfile = open(csvpath, 'r', newline='')
    else:
        csvfile = open(csvpath, 'rb')

    with csvfile as f:
        # sample the data, then sniff it to guess the delimiters, etc.
        sample = f.read(4096)
        sniffer = csv.Sniffer()
        dialect = sniffer.sniff(sample)
        has_header = sniffer.has_header(sample)

        def data_rows():
            """Rewind the file and return a fresh reader past the header."""
            f.seek(0)
            reader = csv.reader(f, dialect=dialect)
            if has_header:
                next(reader)
            return reader

        # column names are either in the first (header) row, or c0, c1, c2...
        # (one per cell of the first row) if there is no header row
        f.seek(0)
        first_row = next(csv.reader(f, dialect=dialect))
        if has_header:
            fieldnames = list(first_row)
        else:
            fieldnames = ['c%s' % i for i in range(len(first_row))]

        if guessdatatypes:
            fieldtypes = list(guess_datatypes(data_rows(), 1000))
            # The guess is sized from the data rows; pad with TEXT so that no
            # column is silently dropped from the CREATE TABLE statement
            # (e.g. when the file only contains a header row).
            fieldtypes.extend(['TEXT'] * (len(fieldnames) - len(fieldtypes)))
        else:
            fieldtypes = ['TEXT'] * len(fieldnames)

        # write the SQL file if it doesn't exist
        if not os.path.exists(sqlpath):
            columns = ','.join(
                '\n\t`%s`\t%s' % (n, t) for (n, t) in zip(fieldnames, fieldtypes)
            )
            with open(sqlpath, 'w' if py3 else 'wb') as w:
                w.write('CREATE TABLE IF NOT EXISTS `%s` (%s);' % (tablename, columns + '\n'))

        # The (possibly pre-existing, user-edited) SQL file is the source of
        # truth for the table definition.
        with open(sqlpath, 'r') as sqlfile:
            sql_create = sqlfile.read()

        sql_insert = 'INSERT INTO `%s` VALUES (%s);' % (tablename, ','.join(['?'] * len(fieldnames)))

        rows = data_rows()
        if guessdatatypes:
            # empty cells become NULL when datatypes are guessed
            rows = ([x if len(x) > 0 else None for x in row] for row in rows)

        conn = sqlite3.connect(dbpath)
        try:
            # `with conn` only commits / rolls back the transaction; the
            # connection itself is closed in the `finally` clause.
            with conn:
                conn.text_factory = str
                c = conn.cursor()
                # conditional create
                c.execute(sql_create)
                # insert csv values
                c.executemany(sql_insert, rows)
        finally:
            conn.close()
def guess_datatypes(csvreader, max=100):
    """Guess a SQLite column type for each column produced by a CSV reader.

    Every column starts with the candidate Python types int, (long on
    Python 2,) float and str.  Each examined cell removes the candidates it
    cannot be parsed as (see ``try_parse``; empty cells remove nothing), and
    the first surviving candidate decides the column type.

    Args:
        csvreader: iterable of rows, each row being a sequence of strings.
            It is consumed by this function.
        max: number of leading rows to examine.  (The name shadows the
            builtin but is kept for backward compatibility.)

    Returns:
        A list of "INTEGER", "REAL" or "TEXT", one entry per column seen.
        Empty when the reader yields no cells at all.
    """
    # `long` only exists on Python 2; on Python 3 `int` covers both.
    try:
        integer_types = [int, long]
    except NameError:
        integer_types = [int]
    candidates = integer_types + [float, str]

    conversion = dict((t, "INTEGER") for t in integer_types)
    conversion[float] = "REAL"
    conversion[str] = "TEXT"

    types = []
    for r, row in enumerate(csvreader):
        # The column list is sized from the first non-empty row, even when
        # that row itself is past the `max` limit and will not be examined.
        if len(types) == 0:
            types.extend([list(candidates) for _ in range(len(row))])
        if r >= max:
            break
        # A ragged row wider than anything seen so far adds new columns
        # instead of raising IndexError.
        if len(row) > len(types):
            types.extend([list(candidates) for _ in range(len(row) - len(types))])
        for c, cell in enumerate(row):
            types[c] = [x for x in types[c] if try_parse(cell, x)]

    # Fall back to TEXT should no candidate survive for a column.
    return [conversion[x[0]] if x else "TEXT" for x in types]
def try_parse(str, typ):
    """Report whether a CSV cell can be converted by the callable ``typ``.

    Args:
        str: the cell text.  (The name shadows the builtin but is kept for
            backward compatibility; the builtin is not needed here.)
        typ: a one-argument callable such as ``int`` or ``float``.

    Returns:
        True if the cell is empty (an empty cell is compatible with every
        type) or if ``typ(str)`` succeeds; False if the conversion raises
        ValueError or TypeError.
    """
    if len(str) == 0:
        return True
    try:
        typ(str)
    except (ValueError, TypeError):
        # Only conversion failures mean "not this type"; anything else
        # (e.g. KeyboardInterrupt) must propagate to the caller.
        return False
    return True
if __name__ == '__main__':
    # Command-line entry point: parse the arguments and run the conversion.
    parser = argparse.ArgumentParser(description='Converts a CSV file to a SQLite3 database')
    parser.add_argument("csv_file", help="path to CSV file")
    parser.add_argument('-d', '--db_file', help='path to SQLite3 database file')
    parser.add_argument('-t', '--table_name', help='name of the table')
    parser.add_argument('-s', '--sql_create', help='path to CREATE TABLE .sql file')
    # The help text previously read 'don''t ...', which Python concatenates
    # into "dont ..."; a double-quoted literal keeps the apostrophe.
    parser.add_argument('-n', '--naive_datatypes', action="store_true", default=False,
                        help="don't guess datatypes (everything is TEXT, no NULLs)")
    args = parser.parse_args()
    # --naive_datatypes switches type guessing off, hence the negation.
    convert(args.csv_file, args.db_file, args.table_name, args.sql_create, not args.naive_datatypes)