diff --git a/.gitignore b/.gitignore
index 6e29259e2..64443ce7c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,6 +14,9 @@ _templates
 *.pyc
 .cache/
 
+# venv
+*_venv
+
 # pip package metadata
 *egg-info/
 
diff --git a/dataset/table.py b/dataset/table.py
index 4d9d9c786..705d1175a 100644
--- a/dataset/table.py
+++ b/dataset/table.py
@@ -1,5 +1,6 @@
 from enum import Enum
 import logging
+import io
 
 import pandas as pd
 
@@ -74,7 +75,26 @@ def __init__(self, name, src, na_values=None, exclude_attr_cols=['_tid_'],
 
     def store_to_db(self, db_conn, if_exists='replace', index=False, index_label=None):
         # TODO: This version supports single session, single worker.
-        self.df.to_sql(self.name, db_conn, if_exists=if_exists, index=index, index_label=index_label)
+        # self.df.to_sql(self.name, db_conn, if_exists=if_exists, index=index, index_label=index_label)
+        logging.debug("store_to_db: bulk-loading table '%s' via COPY", self.name)
+        sep = ","
+        quotechar = "\""
+
+        # Create Table
+        self.df[:0].to_sql(self.name, db_conn, if_exists=if_exists, index=index, index_label=index_label)
+
+        # Prepare data
+        output = io.StringIO()
+        self.df.to_csv(output, sep=sep, quotechar=quotechar, header=False, index=index)
+        output.seek(0)
+
+        # Insert data
+        connection = db_conn.raw_connection()
+        cursor = connection.cursor()
+        cursor.copy_expert("copy {} from stdin (format csv)".format(self.name), output)  # NOTE(review): table name is interpolated into SQL; must come from trusted config only
+        cursor.connection.commit()
+        cursor.close()
+        connection.close()
 
     def get_attributes(self):
         """
diff --git a/domain/correlations.py b/domain/correlations.py
index b8929d83e..e1e9bd245 100644
--- a/domain/correlations.py
+++ b/domain/correlations.py
@@ -1,48 +1,75 @@
 from pyitlib import discrete_random_variable as drv
-
+import time, logging, traceback
 from utils import NULL_REPR
-
-def compute_norm_cond_entropy_corr(data_df, attrs_from, attrs_to):
-    """
-    Computes the correlations between attributes by calculating
-    the normalized conditional entropy between them. The conditional
-    entropy is asymmetric, therefore we need pairwise computation.
+from multiprocessing import Pool
+from functools import partial
+from tqdm import tqdm
 
-    The computed correlations are stored in a dictionary in the format:
-    {
-      attr_a: { cond_attr_i: corr_strength_a_i,
-                cond_attr_j: corr_strength_a_j, ... },
-      attr_b: { cond_attr_i: corr_strength_b_i, ...}
-    }
-
-    :return a dictionary of correlations
-    """
-    corr = {}
-    # Compute pair-wise conditional entropy.
-    for x in attrs_from:
-        corr[x] = {}
-        for y in attrs_to:
+def _compute_norm_cond_entropy_corr(x, attrs, df):
+    try:
+        corr = {}
+        for y in attrs:
             # Set correlation to 1 for same attributes.
             if x == y:
-                corr[x][y] = 1.0
+                corr[y] = 1.0
                 continue
-            xy_df = data_df[[x, y]]
+            xy_df = df[[x, y]]
             xy_df = xy_df.loc[~(xy_df[x] == NULL_REPR) & ~(xy_df[y] == NULL_REPR)]
             x_vals = xy_df[x]
             x_domain_size = x_vals.nunique()
             # Set correlation to 0.0 if entropy of x is 1 (only one possible value).
             if x_domain_size == 1 or len(xy_df) == 0:
-                corr[x][y] = 0.0
+                corr[y] = 0.0
                 continue
-
+
             # Compute the conditional entropy H(x|y) = H(x,y) - H(y).
             # H(x,y) denotes H(x U y).
             # If H(x|y) = 0, then y determines x, i.e., y -> x.
             # Use the domain size of x as a log base for normalization.
             y_vals = xy_df[y]
-
-            x_y_entropy = drv.entropy_conditional(x_vals, y_vals, base=x_domain_size).item()
+            x_y_entropy = drv.entropy_conditional(x_vals, y_vals, base=x_domain_size)
             # The conditional entropy is 0 for strongly correlated attributes and 1 for
             # completely independent attributes. We reverse this to reflect the correlation.
-            corr[x][y] = 1.0 - x_y_entropy
+            # corrs.append((x, y, 1.0 - x_y_entropy))
+            corr[y] = 1.0 - x_y_entropy
+        return (x, corr)
+    except Exception:
+        logging.debug('Failed _compute_norm_cond_entropy_corr process: %s: %s' % (x, traceback.format_exc()))
+        return None
+
+def compute_norm_cond_entropy_corr(data_df, attrs_from, attrs_to, threads=1):
+    """
+    Computes the correlations between attributes by calculating
+    the normalized conditional entropy between them. The conditional
+    entropy is asymmetric, therefore we need pairwise computation.
+
+    The computed correlations are stored in a dictionary in the format:
+    {
+      attr_a: { cond_attr_i: corr_strength_a_i,
+                cond_attr_j: corr_strength_a_j, ... },
+      attr_b: { cond_attr_i: corr_strength_b_i, ...}
+    }
+
+    :return a dictionary of correlations
+    """
+
+    tic = time.time()
+    # only create a pool if processes > 1
+    pool = Pool(threads) if threads > 1 else None
+    corr = {}
+
+    # Compute pair-wise conditional entropy; filter(None, ...) drops attributes
+    # whose worker failed and returned None instead of an (attr, corr) pair.
+    f = partial(_compute_norm_cond_entropy_corr, attrs=attrs_to, df=data_df)
+    if pool is None:
+        for row in filter(None, map(f, attrs_from)):
+            corr[row[0]] = row[1]
+    else:
+        for row in filter(None, tqdm(pool.imap_unordered(f, attrs_from))):
+            corr[row[0]] = row[1]
+
+    if pool is not None:
+        pool.terminate()
+
+    toc = time.time()
+    logging.debug('Time taken in _compute_norm_cond_entropy_corr process: %.2f secs', toc-tic)
     return corr
diff --git a/domain/domain.py b/domain/domain.py
index f37bbff06..e66f8fda7 100644
--- a/domain/domain.py
+++ b/domain/domain.py
@@ -68,7 +68,7 @@ def compute_correlations(self):
             else self.ds.get_raw_data()
         self.correlations = compute_norm_cond_entropy_corr(data_df,
                                                            self.ds.get_attributes(),
-                                                           self.ds.get_attributes())
+                                                           self.ds.get_attributes(), self.env['threads'])
         corrs_df = pd.DataFrame.from_dict(self.correlations, orient='columns')
         corrs_df.index.name = 'cond_attr'
         corrs_df.columns.name = 'attr'