# Source code for mgcpy.independence_tests.dcorrx

import math
import warnings
from statistics import mean, stdev
from scipy.stats import norm, t

import numpy as np
from mgcpy.independence_tests.abstract_class import IndependenceTest
from mgcpy.independence_tests.dcorr import DCorr
from mgcpy.independence_tests.utils.compute_distance_matrix import compute_distance
from mgcpy.independence_tests.utils.distance_transform import transform_distance_matrix

class DCorrX(IndependenceTest):
    """
    Cross distance correlation test between two time series.

    The statistic is a weighted sum, over lags ``0..max_lag``, of the
    (non-negative part of the) distance correlation statistic computed by
    :class:`DCorr` on lag-shifted sub-blocks of the two distance matrices.
    """

    def __init__(self, compute_distance_matrix=None, which_test='unbiased', max_lag=0):
        """
        :param compute_distance_matrix: a function to compute the pairwise distance matrix, given a data matrix.
                                        When ``None`` the value configured by ``IndependenceTest.__init__`` is kept.
        :type compute_distance_matrix: FunctionType or callable()

        :param which_test: the type of distance covariance estimate to use, can be 'unbiased' or 'biased'
        :type which_test: string

        :param max_lag: maximum lag ``j`` to include in the sum (M parameter). If ``None``,
                        ``ceil(sqrt(n))`` is used, where ``n`` is the number of samples.
        :type max_lag: int

        :raises ValueError: if ``which_test`` is not 'unbiased' or 'biased', or if ``max_lag`` is negative.
        """
        IndependenceTest.__init__(self)
        if which_test not in ('unbiased', 'biased'):
            raise ValueError('which_test must be unbiased or biased.')
        if max_lag is not None and max_lag < 0:
            raise ValueError('max_lag must be a non-negative integer or None.')

        # Honour a caller-supplied distance function; it was previously
        # accepted and documented but silently ignored.
        if compute_distance_matrix is not None:
            self.compute_distance_matrix = compute_distance_matrix

        self.which_test = which_test
        self.dcorr = DCorr(which_test=self.which_test)
        self.max_lag = max_lag

    def test_statistic(self, matrix_X, matrix_Y, p=None):
        """
        Computes the (summed across lags) cross distance correlation estimate between two time series.

        :param matrix_X: is interpreted as either:

            - a [n*n] distance matrix, a square matrix with zeros on diagonal for n samples OR
            - a [n*p] data matrix, a matrix with n samples in p dimensions
        :type matrix_X: 2D numpy.array

        :param matrix_Y: is interpreted as either:

            - a [n*n] distance matrix, a square matrix with zeros on diagonal for n samples OR
            - a [n*q] data matrix, a matrix with n samples in q dimensions
        :type matrix_Y: 2D numpy.array

        :param p: bandwidth parameter for Bartlett Kernel. Currently unused by this method.
        :type p: float

        :return: returns a pair of two items, that contains:

            - :test_statistic: the sample statistic summed over lags (not necessarily within [-1,1])
            - :test_statistic_metadata: a dict of metadata with the following keys:
                    - :optimal_lag: the lag with the largest weighted statistic
                    - :dependence_by_lag: array of length ``M+1`` holding the weighted statistic at each lag
        :rtype: tuple

        :raises ValueError: if the unbiased estimator is requested with n <= 3 samples.

        **Example:**

        >>> import numpy as np
        >>> from mgcpy.independence_tests.dcorrx import DCorrX
        >>>
        >>> X = np.array([0.07487683, -0.18073412, 0.37266440, 0.06074847, 0.76899045,
        ...           0.51862516, -0.13480764, -0.54368083, -0.73812644, 0.54910974]).reshape(-1, 1)
        >>> Y = np.array([-1.31741173, -0.41634224, 2.24021815, 0.88317196, 2.00149312,
        ...           1.35857623, -0.06729464, 0.16168344, -0.61048226, 0.41711113]).reshape(-1, 1)
        >>> dcorrx = DCorrX(which_test = 'unbiased')
        >>> dcorrx_statistic, metadata = dcorrx.test_statistic(X, Y)
        """
        # Only the number of samples (first axis) has to agree; the feature
        # dimensions p and q may differ.
        assert matrix_X.shape[0] == matrix_Y.shape[0], "Matrices X and Y need to be of dimensions [n, p] and [n, q], respectively, where p can be different from q"
        n = matrix_X.shape[0]
        if self.which_test == "unbiased" and n <= 3:
            raise ValueError('Cannot use unbiased estimator of distance covariance with n <= 3.')

        # Represent univariate data as matrices.
        if len(matrix_X.shape) == 1:
            matrix_X = matrix_X.reshape((n, 1))
        if len(matrix_Y.shape) == 1:
            matrix_Y = matrix_Y.reshape((n, 1))
        # Use the matrix shape and diagonal elements to determine if the given data is a distance matrix or not.
        matrix_X, matrix_Y = compute_distance(matrix_X, matrix_Y, self.compute_distance_matrix)

        M = self.max_lag if self.max_lag is not None else math.ceil(math.sqrt(n))
        dcorr = self.dcorr

        # Collect the test statistic by lag, and sum them for the full test statistic.
        dependence_by_lag = np.zeros(M + 1)

        # Lag 0: the full matrices, with negative estimates clipped to zero.
        dcorr_statistic, _ = dcorr.test_statistic(matrix_X, matrix_Y)
        dependence_by_lag[0] = np.maximum(0.0, dcorr_statistic)

        # TO DO: parallelize?
        for j in range(1, M + 1):
            # Pair observation t+j of X with observation t of Y by taking the
            # trailing (n-j) block of X and the leading (n-j) block of Y.
            dist_mtx_X = matrix_X[j:n, j:n]
            dist_mtx_Y = matrix_Y[0:(n - j), 0:(n - j)]
            dcorr_statistic, _ = dcorr.test_statistic(dist_mtx_X, dist_mtx_Y)
            # Weight by the fraction of samples that contribute at this lag.
            dependence_by_lag[j] = (n - j) * np.maximum(0.0, dcorr_statistic) / n

        # Reporting optimal lag
        optimal_lag = np.argmax(dependence_by_lag)
        test_statistic_metadata = {'optimal_lag': optimal_lag, 'dependence_by_lag': dependence_by_lag}
        self.test_statistic_ = np.sum(dependence_by_lag)
        return self.test_statistic_, test_statistic_metadata

    def p_value(self, matrix_X, matrix_Y, replication_factor=1000):
        """
        Compute the p-value of the observed test statistic.

        This method delegates to ``p_value_block`` of the parent class and
        returns its result unchanged.
        NOTE(review): presumably a block-permutation test; confirm against
        ``IndependenceTest.p_value_block``.

        :param matrix_X: is interpreted as either:

            - a [n*n] distance matrix, a square matrix with zeros on diagonal for n samples OR
            - a [n*p] data matrix, a matrix with n samples in p dimensions
        :type matrix_X: 2D numpy.array

        :param matrix_Y: is interpreted as either:

            - a [n*n] distance matrix, a square matrix with zeros on diagonal for n samples OR
            - a [n*q] data matrix, a matrix with n samples in q dimensions
        :type matrix_Y: 2D numpy.array

        :param replication_factor: specifies the number of replications to use for
                                   the permutation test. Defaults to 1000.
        :type replication_factor: integer

        :return: whatever ``p_value_block`` returns; expected to be a pair of two items, that contains:

            - :p_value: a numpy.float containing the p-value of the observed test statistic.
            - :p_value_metadata: a dict of metadata with the following keys:
                    - :null_distribution: the estimated (discrete) distribution of the test statistic
        :rtype: tuple

        **Example:**

        >>> import numpy as np
        >>> from mgcpy.independence_tests.dcorrx import DCorrX
        >>>
        >>> X = np.array([0.07487683, -0.18073412, 0.37266440, 0.06074847, 0.76899045,
        ...           0.51862516, -0.13480764, -0.54368083, -0.73812644, 0.54910974]).reshape(-1, 1)
        >>> Y = np.array([-1.31741173, -0.41634224, 2.24021815, 0.88317196, 2.00149312,
        ...           1.35857623, -0.06729464, 0.16168344, -0.61048226, 0.41711113]).reshape(-1, 1)
        >>> dcorrx = DCorrX()
        >>> p_value, metadata = dcorrx.p_value(X, Y, replication_factor = 100)
        """
        return super(DCorrX, self).p_value_block(matrix_X, matrix_Y, replication_factor)