Source code for galpy.df.sphericaldf

# Superclass for spherical distribution functions, contains
#   - sphericaldf: superclass of all spherical DFs
#   - isotropicsphericaldf: superclass of all isotropic spherical DFs
#   - anisotropicsphericaldf: superclass of all anisotropic spherical DFs
#
# To implement a new DF do something like:
#   - Inherit from isotropicsphericaldf for an isotropic DF and implement
#     fE(self,E) which returns the DF as a function of E (see kingdf), then
#     you should be set! You may also have to implement _vmax_at_r(self,pot,r)
#     when the maximum velocity at a given position is less than the escape
#     velocity
#   - Inherit from anisotropicsphericaldf for an anisotropic DF, then you need
#     to implement a bunch of functions:
#       * _call_internal(self,*args,**kwargs): which returns the DF as a
#                                              function of (E,L,Lz)
#       * _sample_eta(self,r,n=1): to sample the velocity angle at r
#       * _p_v_at_r(self,v,r): whcih returns p(v|r)
#     constantbetadf is an example of this
#
import warnings
import numpy
import scipy.interpolate
from scipy import integrate, special
from .df import df
from ..potential import mass
from ..potential.Potential import _evaluatePotentials
from ..potential.SCFPotential import _xiToR, _RToxi
from ..orbit import Orbit
from ..util import conversion, galpyWarning
from ..util.conversion import physical_conversion
if conversion._APY_LOADED:
    from astropy import units

class sphericaldf(df):
    """Superclass for spherical distribution functions"""
    def __init__(self,pot=None,denspot=None,rmax=None,
                 scale=None,ro=None,vo=None):
        """
        NAME:

            __init__

        PURPOSE:

            Initializes a spherical DF

        INPUT:

           pot= (None) Potential instance or list thereof

           denspot= (None) Potential instance or list thereof that represent the density of the tracers (assumed to be spherical; if None, set equal to pot)

           rmax= (None) maximum radius to consider (can be Quantity); DF is cut off at E = Phi(rmax)

           scale= (None) length-scale parameter to be used internally

           ro= ,vo= galpy unit parameters

        OUTPUT:

            None

        HISTORY:

            2020-07-22 - Written - Lane (UofT)

        """
        df.__init__(self,ro=ro,vo=vo)
        if not conversion.physical_compatible(self,pot):
            raise RuntimeError("Unit-conversion parameters of input potential incompatible with those of the DF instance")
        phys= conversion.get_physical(pot,include_set=True)
        # if pot has physical units, transfer them (if already on, we know
        # they are compaible)
        if phys['roSet'] and phys['voSet']:
            self.turn_physical_on(ro=phys['ro'],vo=phys['vo'])
        if pot is None: # pragma: no cover
            raise IOError("pot= must be set")
        self._pot= pot
        self._denspot= self._pot if denspot is None else denspot
        if not conversion.physical_compatible(self._pot,self._denspot):
            raise RuntimeError("Unit-conversion parameters of input potential incompatible with those of the density potential")
        self._rmax= numpy.inf if rmax is None \
            else conversion.parse_length(rmax,ro=self._ro)            
        try:
            self._scale= pot._scale
        except AttributeError:
            try: self._scale= pot[0]._scale
            except (TypeError,AttributeError):
                self._scale= conversion.parse_length(scale,ro=self._ro) \
                    if scale is not None else 1.

############################## EVALUATING THE DF###############################
[docs] @physical_conversion('phasespacedensity',pop=True) def __call__(self,*args,**kwargs): """ NAME: __call__ PURPOSE: return the DF INPUT: Either: a) (E,L,Lz): tuple of E and (optionally) L and (optionally) Lz. Each may be Quantity b) R,vR,vT,z,vz,phi: c) Orbit instance: orbit.Orbit instance and if specific time then orbit.Orbit(t) OUTPUT: Value of DF HISTORY: 2020-07-22 - Written - Lane (UofT) """ # Get E,L,Lz if len(args) == 1: if not isinstance(args[0],Orbit): # Assume tuple (E,L,Lz) E,L,Lz= (args[0]+(None,None))[:3] else: # Orbit E= args[0].E(pot=self._pot,use_physical=False) L= numpy.sqrt(numpy.sum(args[0].L(use_physical=False)**2.)) Lz= args[0].Lz(use_physical=False) E= numpy.atleast_1d(conversion.parse_energy(E,vo=self._vo)) L= numpy.atleast_1d(conversion.parse_angmom(L,ro=self._ro, vo=self._vo)) Lz= numpy.atleast_1d(conversion.parse_angmom(Lz,ro=self._vo, vo=self._vo)) else: # Assume R,vR,vT,z,vz,(phi) R,vR,vT,z,vz, phi= (args+(None,))[:6] R= conversion.parse_length(R,ro=self._ro) vR= conversion.parse_velocity(vR,vo=self._vo) vT= conversion.parse_velocity(vT,vo=self._vo) z= conversion.parse_length(z,ro=self._ro) vz= conversion.parse_velocity(vz,vo=self._vo) vtotSq= vR**2.+vT**2.+vz**2. E= numpy.atleast_1d(0.5*vtotSq+_evaluatePotentials(self._pot,R,z)) Lz= numpy.atleast_1d(R*vT) r= numpy.sqrt(R**2.+z**2.) vrad= (R*vR+z*vz)/r L= numpy.atleast_1d(numpy.sqrt(vtotSq-vrad**2.)*r) return self._call_internal(E,L,Lz) # Some function for each sub-class
[docs] def vmomentdensity(self,r,n,m,**kwargs): """ NAME: vmomentdensity PURPOSE: calculate an arbitrary moment of the velocity distribution at r times the density INPUT: r - spherical radius at which to calculate the moment n - vr^n, where vr = v x cos eta m - vt^m, where vt = v x sin eta OUTPUT: <vr^n vt^m x density> at r HISTORY: 2020-09-04 - Written - Bovy (UofT) """ r= conversion.parse_length(r,ro=self._ro) use_physical= kwargs.pop('use_physical',True) ro= kwargs.pop('ro',None) if ro is None and hasattr(self,'_roSet') and self._roSet: ro= self._ro ro= conversion.parse_length_kpc(ro) vo= kwargs.pop('vo',None) if vo is None and hasattr(self,'_voSet') and self._voSet: vo= self._vo vo= conversion.parse_velocity_kms(vo) if use_physical and not vo is None and not ro is None: fac= vo**(n+m)/ro**3 if conversion._APY_UNITS: u= 1/units.kpc**3*(units.km/units.s)**(n+m) out= self._vmomentdensity(r,n,m) if conversion._APY_UNITS: return units.Quantity(out*fac,unit=u) else: return out*fac else: return self._vmomentdensity(r,n,m)
def _vmomentdensity(self,r,n,m): return 2.*numpy.pi\ *integrate.dblquad(lambda eta,v: v**(2.+m+n) *numpy.sin(eta)**(1+m)*numpy.cos(eta)**n *self(r,v*numpy.cos(eta),v*numpy.sin(eta),0.,0., use_physical=False), 0.,self._vmax_at_r(self._pot,r), lambda x: 0.,lambda x: numpy.pi)[0]
[docs] @physical_conversion('velocity',pop=True) def sigmar(self,r): """ NAME: sigmar PURPOSE: calculate the radial velocity dispersion at radius r INPUT: r - spherical radius at which to calculate the radial velocity dispersion OUTPUT: sigma_r(r) HISTORY: 2020-09-04 - Written - Bovy (UofT) """ r= conversion.parse_length(r,ro=self._ro) return numpy.sqrt(self._vmomentdensity(r,2,0) /self._vmomentdensity(r,0,0))
[docs] @physical_conversion('velocity',pop=True) def sigmat(self,r): """ NAME: sigmar PURPOSE: calculate the tangential velocity dispersion at radius r INPUT: r - spherical radius at which to calculate the tangential velocity dispersion OUTPUT: sigma_t(r) HISTORY: 2020-09-04 - Written - Bovy (UofT) """ r= conversion.parse_length(r,ro=self._ro) return numpy.sqrt(self._vmomentdensity(r,0,2) /self._vmomentdensity(r,0,0))
[docs] def beta(self,r): """ NAME: sigmar PURPOSE: calculate the anisotropy at radius r INPUT: r - spherical radius at which to calculate the anisotropy OUTPUT: beta(r) HISTORY: 2020-09-04 - Written - Bovy (UofT) """ r= conversion.parse_length(r,ro=self._ro) return 1.-self._vmomentdensity(r,0,2)/2./self._vmomentdensity(r,2,0)
############################### SAMPLING THE DF################################
[docs] def sample(self,R=None,z=None,phi=None,n=1,return_orbit=True,rmin=0.): """ NAME: sample PURPOSE: Return full 6D samples of the DF INPUT: R= cylindrical radius at which to generate samples (can be Quantity) z= height at which to generate samples (can be Quantity) phi= azimuth at which to generate samples (can be Quantity) n= number of samples to generate rmin= (0.) only sample r > rmin (can be Quantity) OPTIONAL INPUT: return_orbit= (True) If True output is orbit.Orbit object, if False output is (R,vR,vT,z,vz,phi) OUTPUT: List of samples. Either vector (R,vR,vT,z,vz,phi) or orbit.Orbit NOTES: If R,z,phi are None then sample positions with CMF. If R,z,phi are floats then sample n velocities at location. If array then sample velocities at radii, ignoring n. phi can be None if R,z are set by any above mechanism, will then sample phi for output. HISTORY: 2020-07-22 - Written - Lane (UofT) """ if hasattr(self,'_rmin_sampling') and rmin != self._rmin_sampling: # Build new grids, easiest if hasattr(self,'_xi_cmf_interpolator'): delattr(self,'_xi_cmf_interpolator') if hasattr(self,'_v_vesc_pvr_interpolator'): delattr(self,'_v_vesc_pvr_interpolator') self._rmin_sampling= conversion.parse_length(rmin,ro=self._ro) if R is None or z is None: # Full 6D samples r= self._sample_r(n=n) phi,theta= self._sample_position_angles(n=n) R= r*numpy.sin(theta) z= r*numpy.cos(theta) else: # 3D velocity samples R= conversion.parse_length(R,ro=self._ro) z= conversion.parse_length(z,ro=self._ro) if isinstance(R,numpy.ndarray): assert len(R) == len(z), \ """When R= is set to an array, z= needs to be set to """\ """an equal-length array""" n= len(R) else: R= R*numpy.ones(n) z= z*numpy.ones(n) r= numpy.sqrt(R**2.+z**2.) theta= numpy.arctan2(R,z) if phi is None: # Otherwise assume phi input type matches R,z phi,_= self._sample_position_angles(n=n) else: phi= conversion.parse_angle(phi) phi= phi*numpy.ones(n) \ if not hasattr(phi,'__len__') or len(phi) < n \ else phi eta,psi= self._sample_velocity_angles(r,n=n) v= self._sample_v(r,eta,n=n) vr= v*numpy.cos(eta) vtheta= v*numpy.sin(eta)*numpy.cos(psi) vT= v*numpy.sin(eta)*numpy.sin(psi) vR= vr*numpy.sin(theta) + vtheta*numpy.cos(theta) vz= vr*numpy.cos(theta) - vtheta*numpy.sin(theta) if return_orbit: o= Orbit(vxvv=numpy.array([R,vR,vT,z,vz,phi]).T) if self._roSet and self._voSet: o.turn_physical_on(ro=self._ro,vo=self._vo) return o else: return (R,vR,vT,z,vz,phi)
def _sample_r(self,n=1): """Generate radial position samples from potential Note - the function interpolates the normalized CMF onto the variable xi defined as: .. math:: \\xi = \\frac{r/a-1}{r/a+1} so that xi is in the range [-1,1], which corresponds to an r range of [0,infinity)""" rand_mass_frac= numpy.random.uniform(size=n) if hasattr(self,'_icmf'): r_samples= self._icmf(rand_mass_frac) else: if not hasattr(self,'_xi_cmf_interpolator'): self._xi_cmf_interpolator=\ self._make_cmf_interpolator() xi_samples= self._xi_cmf_interpolator(rand_mass_frac) r_samples= _xiToR(xi_samples,a=self._scale) return r_samples def _make_cmf_interpolator(self): """Create the interpolator object for calculating radii from the CMF Note - must use self.xi_to_r() on any output of interpolator Note - the function interpolates the normalized CMF onto the variable xi defined as: .. math:: \\xi = \\frac{r-1}{r+1} so that xi is in the range [-1,1], which corresponds to an r range of [0,infinity)""" ximin= _RToxi(self._rmin_sampling,a=self._scale) ximax= _RToxi(self._rmax,a=self._scale) xis= numpy.arange(ximin,ximax,1e-4) rs= _xiToR(xis,a=self._scale) # try/except necessary when mass doesn't take arrays, also need to # switch to a more general mass method at some point... #try: ms= mass(self._denspot,rs,use_physical=False) #except ValueError: # ms= numpy.array([mass(self._denspot,r,use_physical=False) for r in rs]) mnorm= mass(self._denspot,self._rmax,use_physical=False) if self._rmin_sampling > 0: ms-= mass(self._denspot,self._rmin_sampling) mnorm-= mass(self._denspot,self._rmin_sampling) ms/= mnorm # Add total mass point if numpy.isinf(self._rmax): xis= numpy.append(xis,1) ms= numpy.append(ms,1) return scipy.interpolate.InterpolatedUnivariateSpline(ms,xis,k=3) def _sample_position_angles(self,n=1): """Generate spherical angle samples""" phi_samples= numpy.random.uniform(size=n)*2*numpy.pi theta_samples= numpy.arccos(1.-2*numpy.random.uniform(size=n)) return phi_samples,theta_samples def _sample_v(self,r,eta,n=1): """Generate velocity samples: typically the total velocity, but not for OM""" if not hasattr(self,'_v_vesc_pvr_interpolator'): self._v_vesc_pvr_interpolator= self._make_pvr_interpolator() return self._v_vesc_pvr_interpolator(\ numpy.log10(r/self._scale),numpy.random.uniform(size=n), grid=False)*self._vmax_at_r(self._pot,r) def _sample_velocity_angles(self,r,n=1): """Generate samples of angles that set radial vs tangential velocities""" eta_samples= self._sample_eta(r,n) psi_samples= numpy.random.uniform(size=n)*2*numpy.pi return eta_samples,psi_samples def _vmax_at_r(self,pot,r,**kwargs): """Function that gives the max velocity in the DF at r; typically equal to vesc, but not necessarily for finite systems such as King""" return numpy.sqrt(2.*(\ _evaluatePotentials(self._pot,self._rmax+1e-10,0) -_evaluatePotentials(self._pot,r,0.))) def _make_pvr_interpolator(self,r_a_start=-3,r_a_end=3, n_r_a=120,n_v_vesc=100): """ NAME: _make_pvr_interpolator PURPOSE: Calculate a grid of the velocity sampling function v^2*f(E) over many radii. The radii are fractional with respect to some scale radius which characteristically describes the size of the potential, and the velocities are fractional with respect to the escape velocity at each radius r. This information is saved in a 2D interpolator which represents the inverse cumulative distribution at many radii. This allows for sampling of v/vesc given an input r/a INPUT: r_a_start= radius grid start location in units of log10(r/a) r_a_end= radius grid end location in units of log10(r/a) n_r_a= number of radius grid points to use n_v_vesc= number of velocity grid points to use OUTPUT: None (But sets self._v_vesc_pvr_interpolator) HISTORY: Written 2020-07-24 - James Lane (UofT) """ # Make an array of r/a by v/vesc and then calculate p(v|r) r_a_start= numpy.amax([\ numpy.log10((self._rmin_sampling+1e-8)/self._scale), r_a_start]) r_a_end= numpy.amin([numpy.log10((self._rmax-1e-8)/self._scale), r_a_end]) r_a_values= 10.**numpy.linspace(r_a_start,r_a_end,n_r_a) v_vesc_values= numpy.linspace(0,1,n_v_vesc) r_a_grid, v_vesc_grid= numpy.meshgrid(r_a_values,v_vesc_values) vesc_grid= self._vmax_at_r(self._pot,r_a_grid*self._scale) r_grid= r_a_grid*self._scale vr_grid= v_vesc_grid*vesc_grid # Calculate p(v|r) and normalize pvr_grid= self._p_v_at_r(vr_grid,r_grid) pvr_grid_cml= numpy.cumsum(pvr_grid,axis=0) pvr_grid_cml_norm= pvr_grid_cml\ /numpy.repeat(pvr_grid_cml[-1,:][:,numpy.newaxis],pvr_grid_cml.shape[0],axis=1).T # Construct the inverse cumulative distribution on a regular grid n_new_pvr= 100 # Must be multiple of r_a_grid.shape[0] icdf_pvr_grid_reg= numpy.zeros((n_new_pvr,len(r_a_values))) icdf_v_vesc_grid_reg= numpy.zeros((n_new_pvr,len(r_a_values))) for i in range(pvr_grid_cml_norm.shape[1]): cml_pvr= pvr_grid_cml_norm[:,i] if numpy.any(cml_pvr < 0): warnings.warn("The DF appears to have negative regions; we'll try to ignore these for sampling the DF, but this may adversely affect the generated samples. Proceed with care!",galpyWarning) cml_pvr[cml_pvr < 0]= 0. start_indx= numpy.amax(numpy.arange(len(cml_pvr))[cml_pvr == numpy.amin(cml_pvr)]) end_indx= numpy.amin(numpy.arange(len(cml_pvr))[cml_pvr == numpy.amax(cml_pvr)])+1 cml_pvr_inv_interp= scipy.interpolate.InterpolatedUnivariateSpline( cml_pvr[start_indx:end_indx], v_vesc_values[start_indx:end_indx],k=1) pvr_samples_reg= numpy.linspace(0,1,n_new_pvr) v_vesc_samples_reg= cml_pvr_inv_interp(pvr_samples_reg) icdf_pvr_grid_reg[:,i]= pvr_samples_reg icdf_v_vesc_grid_reg[:,i]= v_vesc_samples_reg # Create the interpolator return scipy.interpolate.RectBivariateSpline( numpy.log10(r_a_grid[0,:]), icdf_pvr_grid_reg[:,0], icdf_v_vesc_grid_reg.T,kx=1,ky=1) class isotropicsphericaldf(sphericaldf): """Superclass for isotropic spherical distribution functions""" def __init__(self,pot=None,denspot=None,rmax=None, scale=None,ro=None,vo=None): """ NAME: __init__ PURPOSE: Initialize an isotropic distribution function INPUT: pot= (None) Potential instance or list thereof denspot= (None) Potential instance or list thereof that represent the density of the tracers (assumed to be spherical; if None, set equal to pot) rmax= (None) maximum radius to consider (can be Quantity); DF is cut off at E = Phi(rmax) scale= scale parameter to be used internally ro=, vo= galpy unit parameters OUTPUT: None HISTORY: 2020-09-02 - Written - Bovy (UofT) """ sphericaldf.__init__(self,pot=pot,denspot=denspot,rmax=rmax, scale=scale,ro=ro,vo=vo) def _call_internal(self,*args): """ NAME: _call_internal PURPOSE Calculate the distribution function for an isotropic DF INPUT: E,L,Lz - The energy, angular momemtum magnitude, and its z component (only E is used) OUTPUT: f(x,v) = f(E[x,v]) HISTORY: 2020-07 - Written - Lane (UofT) """ return self.fE(args[0]) def _vmomentdensity(self,r,n,m): if m%2 == 1 or n%2 == 1: return 0. return 2.*numpy.pi\ *integrate.quad(lambda v: v**(2.+m+n)* self.fE(_evaluatePotentials(self._pot,r,0) +0.5*v**2.), 0.,self._vmax_at_r(self._pot,r))[0]\ *special.gamma(m//2+1)*special.gamma(n//2+0.5)\ /special.gamma(m//2+n//2+1.5) def _sample_eta(self,r,n=1): """Sample the angle eta which defines radial vs tangential velocities""" return numpy.arccos(1.-2.*numpy.random.uniform(size=n)) def _p_v_at_r(self,v,r): if hasattr(self,'_fE_interp'): return self._fE_interp(_evaluatePotentials(self._pot,r,0)\ +0.5*v**2.)*v**2. else: return self.fE(_evaluatePotentials(self._pot,r,0)\ +0.5*v**2.)*v**2. class anisotropicsphericaldf(sphericaldf): """Superclass for anisotropic spherical distribution functions""" def __init__(self,pot=None,denspot=None,rmax=None, scale=None,ro=None,vo=None): """ NAME: __init__ PURPOSE: Initialize an anisotropic distribution function INPUT: pot= (None) Potential instance or list thereof denspot= (None) Potential instance or list thereof that represent the density of the tracers (assumed to be spherical; if None, set equal to pot) rmax= (None) maximum radius to consider (can be Quantity); DF is cut off at E = Phi(rmax) scale= (None) length-scale parameter to be used internally ro= ,vo= galpy unit parameters OUTPUT: None HISTORY: 2020-07-22 - Written - Lane (UofT) """ sphericaldf.__init__(self,pot=pot,denspot=denspot,rmax=rmax, scale=scale,ro=ro,vo=vo)