"""
Calculate the autocorrelation function for an evenly sampled time-series with
missing data.
"""
import numpy as np
from scipy.ndimage import gaussian_filter1d
from scipy import signal
import warnings
__all__ = ["interpolated_acf", "autocorrelation", "interpolate_missing_data",
"dominant_period"]
class NoPeriodFoundWarning(Warning):
"""Warning for when no periods can be found"""
pass
class NonzeroMedianWarning(Warning):
"""Warning for when the fluxes haven't been normalized"""
pass
[docs]def interpolate_missing_data(times, fluxes, cadences=None):
"""
Assuming ``times`` are uniformly spaced with missing cadences,
fill in the missing cadences with linear interpolation.
Cadences can be passed if they are known.
Parameters
----------
times : numpy.ndarray
Incomplete but otherwise uniformly sampled times
fluxes : numpy.ndarray
Flux for each time in ``times``
cadences : numpy.ndarray, optional
Integer cadence number of each observation.
Returns
-------
interpolated_times : numpy.ndarray
``times`` with filled-in missing cadences
interpolated_fluxes : numpy.ndarray
``fluxes`` with filled-in missing cadences
"""
first_time = times[0]
if cadences is not None:
# Median time between cadences
dt = np.median(np.diff(times) / np.diff(cadences))
cadence_indices = cadences - cadences[0]
else:
# Find typical time between cadences:
dt = np.median(np.diff(times))
# Approximate the patchy grid of integer cadence indices,
# i.e.: (0, 1, 3, 4, 5, 8, ...)
cadence_indices = np.rint((times - first_time)/dt)
# Find missing cadence indices if that grid were complete
expected_cadence_indices = set(np.arange(cadence_indices.min(),
cadence_indices.max()))
missing_cadence_indices = expected_cadence_indices.difference(set(cadence_indices))
# Convert the missing cadences to times
missing_times = first_time + np.array(list(missing_cadence_indices))*dt
# Interpolate to find fluxes at missing times
interp_fluxes = np.interp(missing_times, times, fluxes)
# Combine the interpolated and input times, fluxes
interpolated_fluxes = np.concatenate([fluxes, interp_fluxes])
interpolated_times = np.concatenate([times, missing_times])
# Sort the times, fluxes, so that you can compute the ACF on them:
sort_by_time = np.argsort(interpolated_times)
interpolated_fluxes = interpolated_fluxes[sort_by_time]
interpolated_times = interpolated_times[sort_by_time]
return interpolated_times, interpolated_fluxes
[docs]def autocorrelation(x, scipy=True):
"""
Calculate the autocorrelation function of array ``x``.
"""
if scipy:
result = signal.fftconvolve(x, x[::-1])
return result[result.size//2:]
else:
result = np.correlate(x, x, mode='full')
return result[result.size//2:]
[docs]def interpolated_acf(times, fluxes, cadences=None):
"""
Calculate the autocorrelation function after interpolating over
missing times and fluxes.
Parameters
----------
times : numpy.ndarray
Incomplete but otherwise uniformly sampled times
fluxes : numpy.ndarray
Flux for each time in ``times``
cadences : numpy.ndarray, optional
Integer cadence number of each observation.
Return
------
lag : numpy.ndarray
Time lag
acf : numpy.ndarray
Autocorrelation function
"""
if not np.all(np.sort(times) == times):
raise ValueError("Arrays must be in chronological order to compute ACF")
if not np.abs(np.median(fluxes)/np.max(fluxes) < 0.01):
warnmessage = ("Have you normalized your fluxes so that their median is"
" near zero?")
warnings.warn(warnmessage, NonzeroMedianWarning)
# Interpolate over missing times, fluxes
interpolated_times, interpolated_fluxes = interpolate_missing_data(times,
fluxes,
cadences)
# Calculate the grid of "lags" in units of ``times``
dt = np.median(np.diff(interpolated_times))
lag = dt*np.arange(len(interpolated_fluxes))
# Compute the autocorrelation function on interpolated fluxes
acf = autocorrelation(interpolated_fluxes)
return lag, acf
[docs]def dominant_period(lag, acf, min=None, max=None, fwhm=18, window=56,
plot=False, quiet=False, n_peaks=1, return_amplitude=False):
"""
Find the dominant period in the smoothed autocorrelation function.
If no dominant period is found, raise `NoPeakWarning` and return `numpy.nan`
Parameters
----------
lag : numpy.ndarray
Time lags
acf : numpy.ndarray
Autocorrelation function
min : float (optional)
Return dominant period greater than ``min``. Default is no limit.
max : float (optional)
Return dominant period less than ``max``. Default is no limit.
fwhm : float (optional)
Full-width at half max [lags] of the gaussian smoothing kernel. Default
is 18 lags, as in McQuillan, Aigrain & Mazeh (2013) [1]_
window : float (optional)
Truncate the gaussian smoothing kernel after ``window`` lags. Default
is 56 lags, as in McQuillan, Aigrain & Mazeh (2013) [1]_
plot : bool (optional)
Plot the autocorrelation function, peak detected. Default is `False`.
quiet : bool (optional)
Don't raise warning if no period is found. Default is `False`.
n_peaks : int (optional)
Number of peaks in the ACF to return, default is one.
return_amplitude : bool (optional)
Return the amplitude of the ACF at the peaks if `True`.
Return
------
acf_period : float
Dominant period detected via the autocorrelation function
References
----------
.. [1] http://adsabs.harvard.edu/abs/2013MNRAS.432.1203M
"""
lag_limited = np.copy(lag)
acf_limited = np.copy(acf)
# Apply limits if any are input:
if min is not None and max is None:
acf_limited = acf_limited[lag_limited > min]
lag_limited = lag_limited[lag_limited > min]
elif max is not None and min is None:
acf_limited = acf_limited[lag_limited < max]
lag_limited = lag_limited[lag_limited < max]
elif max is not None and min is not None:
acf_limited = acf_limited[(lag_limited < max) & (lag_limited > min)]
lag_limited = lag_limited[(lag_limited < max) & (lag_limited > min)]
# Convert fwhm -> sigma, convolve with gaussian kernel
sigma = fwhm/2.355
truncate = window/sigma
smooth_acf = gaussian_filter1d(acf_limited, sigma, truncate=truncate)
# Detect peaks
relative_maxes = signal.argrelmax(smooth_acf)[0]
if len(relative_maxes) == 0:
if not quiet:
warnmsg = ("No period found. Be sure to check limits and to "
"median-subtract your fluxes.")
warnings.warn(warnmsg, NoPeriodFoundWarning)
return np.nan
if n_peaks == 1:
# Detect highest peak
absolute_max_index = relative_maxes[np.argmax(smooth_acf[relative_maxes])]
acf_period = lag_limited[absolute_max_index]
acf_amplitude = smooth_acf[absolute_max_index]
else:
first_n_peaks = relative_maxes[np.argsort(smooth_acf[relative_maxes])[::-1]][:n_peaks]
acf_period = lag_limited[first_n_peaks]
acf_amplitude = smooth_acf[first_n_peaks]
if plot:
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
ax.plot(lag, acf/np.max(acf), label='ACF', color='k', lw=2)
ax.plot(lag_limited, smooth_acf/np.max(smooth_acf),
label='Smoothed ACF', ls='--', color='r')
if hasattr(acf_period, '__len__'):
ax.axvline(acf_period[0], ls='--', color='DodgerBlue',
label='Primary period')
else:
ax.axvline(acf_period, ls='--', color='DodgerBlue',
label='Primary period')
ax.legend()
ax.set_title('ACF')
ax.set_xlabel('Lag')
ax.spines['right'].set_visible(False)
ax.spines['top'].set_visible(False)
ax.grid(ls=':')
if return_amplitude:
return acf_period, acf_amplitude
return acf_period