Source code for galpy.potential.SnapshotRZPotential

from os import system
import hashlib
import numpy
from scipy import interpolate 
from .Potential import Potential
from .interpRZPotential import scalarVectorDecorator, \
    zsymDecorator, calc_2dsplinecoeffs_c, interpRZPotential
try: 
    import pynbody
    from pynbody import gravity
    from pynbody.units import NoUnit
except ImportError: #pragma: no cover
    _PYNBODY_LOADED= False
else:
    _PYNBODY_LOADED= True    
[docs]class SnapshotRZPotential(Potential): """Class that implements an axisymmetrized version of the potential of an N-body snapshot (requires `pynbody <http://pynbody.github.io>`__) `_evaluate`, `_Rforce`, and `_zforce` calculate a hash for the array of points that is passed in by the user. The hash and corresponding potential/force arrays are stored -- if a subsequent request matches a previously computed hash, the previous results are returned and not recalculated. """
[docs] def __init__(self, s, num_threads=None,nazimuths=4, ro=None,vo=None): """ NAME: __init__ PURPOSE: Initialize a SnapshotRZ potential object INPUT: s - a simulation snapshot loaded with pynbody num_threads= (4) number of threads to use for calculation nazimuths= (4) number of azimuths to average over ro=, vo= distance and velocity scales for translation into internal units (default from configuration file) OUTPUT: instance HISTORY: 2013 - Written - Rok Roskar (ETH) 2014-11-24 - Edited for merging into main galpy - Bovy (IAS) """ if not _PYNBODY_LOADED: #pragma: no cover raise ImportError("The SnapShotRZPotential class is designed to work with pynbody snapshots, which cannot be loaded (probably because it is not installed) -- obtain from pynbody.github.io") Potential.__init__(self,amp=1.0,ro=ro,vo=vo) self._s = s self._point_hash = {} if num_threads is None: self._num_threads= pynbody.config['number_of_threads'] else: self._num_threads = num_threads # Set up azimuthal averaging self._naz= nazimuths self._cosaz= numpy.cos(numpy.arange(self._naz,dtype='float')\ /self._naz*2.*numpy.pi) self._sinaz= numpy.sin(numpy.arange(self._naz,dtype='float')\ /self._naz*2.*numpy.pi) self._zones= numpy.ones(self._naz) self._zzeros= numpy.zeros(self._naz) return None
@scalarVectorDecorator def _evaluate(self, R,z,phi=None,t=None,dR=None,dphi=None) : pot, acc = self._setup_potential(R,z) return pot @scalarVectorDecorator def _Rforce(self, R,z,phi=None,t=None,dR=None,dphi=None) : pot, acc = self._setup_potential(R,z) return acc[:,0] @scalarVectorDecorator def _zforce(self, R,z,phi=None,t=None,dR=None,dphi=None) : pot, acc = self._setup_potential(R,z) return acc[:,1] def _setup_potential(self, R, z, use_pkdgrav = False) : # compute the hash for the requested grid new_hash = hashlib.md5(numpy.array([R,z])).hexdigest() # if we computed for these points before, return; otherwise compute if new_hash in self._point_hash : pot, rz_acc = self._point_hash[new_hash] # if use_pkdgrav : else : # set up the four points per R,z pair to mimic axisymmetry points = numpy.zeros((len(R),self._naz,3)) for i in range(len(R)) : points[i] = numpy.array([R[i]*self._cosaz,R[i]*self._sinaz, z[i]*self._zones]).T points_new = points.reshape(points.size//3,3) pot, acc = gravity.calc.direct(self._s,points_new,num_threads=self._num_threads) pot = pot.reshape(len(R),self._naz) acc = acc.reshape(len(R),self._naz,3) # need to average the potentials pot = pot.mean(axis=1) # get the radial accelerations rz_acc = numpy.zeros((len(R),2)) rvecs= numpy.array([self._cosaz,self._sinaz,self._zzeros]).T for i in range(len(R)) : for j,rvec in enumerate(rvecs) : rz_acc[i,0] += acc[i,j].dot(rvec) rz_acc[i,1] += acc[i,j,2] rz_acc /= self._naz # store the computed values for reuse self._point_hash[new_hash] = [pot,rz_acc] return pot, rz_acc
[docs]class InterpSnapshotRZPotential(interpRZPotential) : """ Interpolated axisymmetrized potential extracted from a simulation output (see ``interpRZPotential`` and ``SnapshotRZPotential``) """
[docs] def __init__(self, s, ro=None,vo=None, rgrid=(numpy.log(0.01),numpy.log(20.),101), zgrid=(0.,1.,101), interpepifreq = False, interpverticalfreq = False, interpPot = True, enable_c = True, logR = True, zsym = True, numcores=None,nazimuths=4,use_pkdgrav = False) : """ NAME: __init__ PURPOSE: Initialize an InterpSnapshotRZPotential instance INPUT: s - a simulation snapshot loaded with pynbody rgrid - R grid to be given to linspace as in rs= linspace(*rgrid) zgrid - z grid to be given to linspace as in zs= linspace(*zgrid) logR - if True, rgrid is in the log of R so logrs= linspace(*rgrid) interpPot, interpepifreq, interpverticalfreq= if True, interpolate these functions (interpPot=True also interpolates the R and zforce) enable_c= enable use of C for interpolations zsym= if True (default), the potential is assumed to be symmetric around z=0 (so you can use, e.g., zgrid=(0.,1.,101)). numcores= if set to an integer, use this many cores nazimuths= (4) number of azimuths to average over use_pkdgrav= (False) use PKDGRAV to calculate the snapshot's potential and forces (CURRENTLY NOT IMPLEMENTED) ro=, vo= distance and velocity scales for translation into internal units (default from configuration file) OUTPUT: instance HISTORY: 2013 - Written - Rok Roskar (ETH) 2014-11-24 - Edited for merging into main galpy - Bovy (IAS) """ if not _PYNBODY_LOADED: #pragma: no cover raise ImportError("The InterpSnapRZShotPotential class is designed to work with pynbody snapshots, which cannot be loaded (probably because it is not installed) -- obtain from pynbody.github.io") # inititalize using the base class Potential.__init__(self,amp=1.0,ro=ro,vo=vo) # other properties if numcores is None: self._numcores= pynbody.config['number_of_threads'] else: self._numcores = numcores self._s = s # Set up azimuthal averaging self._naz= nazimuths self._cosaz= numpy.cos(numpy.arange(self._naz,dtype='float')\ /self._naz*2.*numpy.pi) self._sinaz= numpy.sin(numpy.arange(self._naz,dtype='float')\ /self._naz*2.*numpy.pi) self._zones= numpy.ones(self._naz) self._zzeros= numpy.zeros(self._naz) # the interpRZPotential class sets these flags self._enable_c = enable_c self.hasC = True # set up the flags for interpolated quantities # since the potential and force are always calculated together, # set the force interpolations to true if potential is true and # vice versa self._interpPot = interpPot self._interpRforce = self._interpPot self._interpzforce = self._interpPot self._interpvcirc = self._interpPot # these require additional calculations so set them seperately self._interpepifreq = interpepifreq self._interpverticalfreq = interpverticalfreq # make the potential accessible at points beyond the grid self._origPot = SnapshotRZPotential(s, numcores) # setup the grid self._zsym = zsym self._logR = logR self._rgrid = numpy.linspace(*rgrid) if logR : self._rgrid = numpy.exp(self._rgrid) self._logrgrid = numpy.log(self._rgrid) rs = self._logrgrid else : rs = self._rgrid self._zgrid = numpy.linspace(*zgrid) # calculate the grids self._setup_potential(self._rgrid,self._zgrid,use_pkdgrav=use_pkdgrav) if enable_c and interpPot: self._potGrid_splinecoeffs = calc_2dsplinecoeffs_c(self._potGrid) self._rforceGrid_splinecoeffs = calc_2dsplinecoeffs_c(self._rforceGrid) self._zforceGrid_splinecoeffs = calc_2dsplinecoeffs_c(self._zforceGrid) else : self._potInterp= interpolate.RectBivariateSpline(rs, self._zgrid, self._potGrid, kx=3,ky=3,s=0.) self._rforceInterp= interpolate.RectBivariateSpline(rs, self._zgrid, self._rforceGrid, kx=3,ky=3,s=0.) self._zforceInterp= interpolate.RectBivariateSpline(rs, self._zgrid, self._zforceGrid, kx=3,ky=3,s=0.) if interpepifreq: self._R2interp = interpolate.RectBivariateSpline(rs, self._zgrid, self._R2derivGrid, kx=3,ky=3,s=0.) if interpverticalfreq: self._z2interp = interpolate.RectBivariateSpline(rs, self._zgrid, self._z2derivGrid, kx=3,ky=3,s=0.) if interpepifreq and interpverticalfreq: self._Rzinterp = interpolate.RectBivariateSpline(rs, self._zgrid, self._RzderivGrid, kx=3,ky=3,s=0.) # setup the derived quantities if interpPot: self._vcircGrid = numpy.sqrt(self._rgrid*(-self._rforceGrid[:,0])) self._vcircInterp = interpolate.InterpolatedUnivariateSpline(rs, self._vcircGrid, k=3) if interpepifreq: self._epifreqGrid = numpy.sqrt(self._R2derivGrid[:,0] - 3./self._rgrid*self._rforceGrid[:,0]) goodindx= True^numpy.isnan(self._epifreqGrid) self._epifreqInterp=\ interpolate.InterpolatedUnivariateSpline(rs[goodindx], self._epifreqGrid[goodindx], k=3) self._epigoodindx= goodindx if interpverticalfreq: self._verticalfreqGrid = numpy.sqrt(numpy.abs(self._z2derivGrid[:,0])) goodindx= True^numpy.isnan(self._verticalfreqGrid) self._verticalfreqInterp=\ interpolate.InterpolatedUnivariateSpline(rs[goodindx], self._verticalfreqGrid[goodindx],k=3) self._verticalgoodindx= goodindx
def _setup_potential(self, R, z, use_pkdgrav = False, dr = 0.0001) : """ Calculates the potential and force grids for the snapshot for use with other galpy functions. **Input**: *R*: R grid coordinates (numpy array) *z*: z grid coordinates (numpy array) **Optional Keywords**: *use_pkdgrav*: (False) whether to use pkdgrav for the gravity calculation *dr*: (0.01) offset to use for the gradient calculation - the points are positioned at +/- dr from the central point """ # set up the four points per R,z pair to mimic axisymmetry points = numpy.zeros((len(R),len(z),self._naz,3)) for i in range(len(R)) : for j in range(len(z)) : points[i,j] = numpy.array([R[i]*self._cosaz,R[i]*self._sinaz, z[j]*self._zones]).T points_new = points.reshape(points.size//3,3) self.points = points_new # set up the points to calculate the second derivatives zgrad_points = numpy.zeros((len(points_new)*2,3)) rgrad_points = numpy.zeros((len(points_new)*2,3)) for i,p in enumerate(points_new) : zgrad_points[i*2] = p zgrad_points[i*2][2] -= dr zgrad_points[i*2+1] = p zgrad_points[i*2+1][2] += dr rgrad_points[i*2] = p rgrad_points[i*2][:2] -= p[:2]/numpy.sqrt(numpy.dot(p[:2],p[:2]))*dr rgrad_points[i*2+1] = p rgrad_points[i*2+1][:2] += p[:2]/numpy.sqrt(numpy.dot(p[:2],p[:2]))*dr if use_pkdgrav : #pragma: no cover raise RuntimeError("using pkdgrav not currently implemented") sn = pynbody.snapshot._new(len(self._s.d)+len(self._s.g)+len(self._s.s)+len(points_new)) print("setting up %d grid points"%(len(points_new))) #sn['pos'][0:len(self.s)] = self.s['pos'] #sn['mass'][0:len(self.s)] = self.s['mass'] #sn['phi'] = 0.0 #sn['eps'] = 1e3 #sn['eps'][0:len(self.s)] = self.s['eps'] #sn['vel'][0:len(self.s)] = self.s['vel'] #sn['mass'][len(self.s):] = 1e-10 sn['pos'][len(self._s):] = points_new sn['mass'][len(self._s):] = 0.0 sn.write(fmt=pynbody.tipsy.TipsySnap, filename='potgridsnap') command = '~/bin/pkdgrav2_pthread -sz %d -n 0 +std -o potgridsnap -I potgridsnap +potout +overwrite %s'%(self._numcores, self._s._paramfile['filename']) print(command) system(command) sn = pynbody.load('potgridsnap') acc = sn['accg'][len(self._s):].reshape(len(R)*len(z),self._naz,3) pot = sn['pot'][len(self._s):].reshape(len(R)*len(z),self._naz) else : if self._interpPot: pot, acc = gravity.calc.direct(self._s,points_new,num_threads=self._numcores) pot = pot.reshape(len(R)*len(z),self._naz) acc = acc.reshape(len(R)*len(z),self._naz,3) # need to average the potentials pot = pot.mean(axis=1) # get the radial accelerations rz_acc = numpy.zeros((len(R)*len(z),2)) rvecs= numpy.array([self._cosaz,self._sinaz,self._zzeros]).T # reshape the acc to make sure we have a leading index even # if we are only evaluating a single point, i.e. we have # shape = (1,4,3) not (4,3) acc = acc.reshape((len(rz_acc),self._naz,3)) for i in range(len(R)*len(z)) : for j,rvec in enumerate(rvecs) : rz_acc[i,0] += acc[i,j].dot(rvec) rz_acc[i,1] += acc[i,j,2] rz_acc /= self._naz self._potGrid = pot.reshape((len(R),len(z))) self._rforceGrid = rz_acc[:,0].reshape((len(R),len(z))) self._zforceGrid = rz_acc[:,1].reshape((len(R),len(z))) # compute the force gradients # first get the accelerations if self._interpverticalfreq : zgrad_pot, zgrad_acc = gravity.calc.direct(self._s,zgrad_points,num_threads=self._numcores) # each point from the points used above for pot and acc is straddled by # two points to get the gradient across it. Compute the gradient by # using a finite difference zgrad = numpy.zeros(len(points_new)) # do a loop through the pairs of points -- reshape the array # so that each item is the pair of acceleration vectors # then calculate the gradient from the two points for i,zacc in enumerate(zgrad_acc.reshape((len(zgrad_acc)//2,2,3))) : zgrad[i] = ((zacc[1]-zacc[0])/(dr*2.0))[2] # reshape the arrays self._z2derivGrid = -zgrad.reshape((len(zgrad)//self._naz,self._naz)).mean(axis=1).reshape((len(R),len(z))) # do the same for the radial component if self._interpepifreq: rgrad_pot, rgrad_acc = gravity.calc.direct(self._s,rgrad_points,num_threads=self._numcores) rgrad = numpy.zeros(len(points_new)) for i,racc in enumerate(rgrad_acc.reshape((len(rgrad_acc)//2,2,3))) : point = points_new[i] point[2] = 0.0 rvec = point/numpy.sqrt(numpy.dot(point,point)) rgrad_vec = (numpy.dot(racc[1],rvec)- numpy.dot(racc[0],rvec)) / (dr*2.0) rgrad[i] = rgrad_vec self._R2derivGrid = -rgrad.reshape((len(rgrad)//self._naz,self._naz)).mean(axis=1).reshape((len(R),len(z))) # do the same for the mixed radial-vertical component if self._interpepifreq and self._interpverticalfreq: # re-use this Rzgrad = numpy.zeros(len(points_new)) for i,racc in enumerate(rgrad_acc.reshape((len(rgrad_acc)//2,2,3))) : Rzgrad[i] = ((racc[1]-racc[0])/(dr*2.0))[2] # reshape the arrays self._RzderivGrid = -Rzgrad.reshape((len(Rzgrad)//self._naz,self._naz)).mean(axis=1).reshape((len(R),len(z))) @scalarVectorDecorator @zsymDecorator(False) def _R2deriv(self,R,Z,phi=0.,t=0.): return self._R2interp(R,Z) @scalarVectorDecorator @zsymDecorator(False) def _z2deriv(self,R,Z,phi=None,t=None): return self._z2interp(R,Z) @scalarVectorDecorator @zsymDecorator(True) def _Rzderiv(self,R,Z,phi=None,t=None): return self._Rzinterp(R,Z) def normalize(self, R0=8.0) : """ Normalize all positions by R0 and velocities by Vc(R0). If :class:`~scipy.interpolate.RectBivariateSpline` or :class:`~scipy.interpolate.InterpolatedUnivariateSpline` are used, redefine them for use with the rescaled coordinates. To undo the normalization, call :func:`~galpy.potential.SnapshotPotential.InterpSnapshotPotential.denormalize`. """ Vc0 = self.vcirc(R0) Phi0 = numpy.abs(self.Rforce(R0,0.0)) self._normR0 = R0 self._normVc0 = Vc0 self._normPhi0 = Phi0 # rescale the simulation if not isinstance(self._s['pos'].units,NoUnit): self._posunit = self._s['pos'].units self._s['pos'].convert_units('%s kpc'%R0) else: self._posunit = None if not isinstance(self._s['vel'].units,NoUnit): self._velunit = self._s['vel'].units self._s['vel'].convert_units('%s km s**-1'%Vc0) else: self._velunit = None # rescale the grid self._rgrid /= R0 if self._logR: self._logrgrid -= numpy.log(R0) rs = self._logrgrid else : rs = self._rgrid self._zgrid /= R0 # rescale the potential self._amp /= Phi0 self._savedsplines = {} # rescale anything using splines if not self._enable_c and self._interpPot : for spline,name in zip([self._potInterp, self._rforceInterp, self._zforceInterp], ["pot", "rforce", "zforce"]): self._savedsplines[name] = spline self._potInterp= interpolate.RectBivariateSpline(rs, self._zgrid, self._potGrid/R0, kx=3,ky=3,s=0.) self._rforceInterp= interpolate.RectBivariateSpline(rs, self._zgrid, self._rforceGrid, kx=3,ky=3,s=0.) self._zforceInterp= interpolate.RectBivariateSpline(rs, self._zgrid, self._zforceGrid, kx=3,ky=3,s=0.) elif self._enable_c and self._interpPot: self._potGrid_splinecoeffs= calc_2dsplinecoeffs_c(self._potGrid/R0) if self._interpPot : self._savedsplines['vcirc'] = self._vcircInterp self._vcircInterp = interpolate.InterpolatedUnivariateSpline(rs, self._vcircGrid/Vc0, k=3) if self._interpepifreq: self._savedsplines['R2deriv'] = self._R2interp self._savedsplines['epifreq'] = self._epifreqInterp self._R2interp = interpolate.RectBivariateSpline(rs, self._zgrid, self._R2derivGrid, kx=3,ky=3,s=0.) self._epifreqInterp = interpolate.InterpolatedUnivariateSpline(rs[self._epigoodindx], self._epifreqGrid[self._epigoodindx]/numpy.sqrt(Phi0/R0), k=3) if self._interpverticalfreq: self._savedsplines['z2deriv'] = self._z2interp self._savedsplines['verticalfreq'] = self._verticalfreqInterp self._z2interp = interpolate.RectBivariateSpline(rs, self._zgrid, self._z2derivGrid, kx=3,ky=3,s=0.) self._verticalfreqInterp = interpolate.InterpolatedUnivariateSpline(rs[self._verticalgoodindx], self._verticalfreqGrid[self._verticalgoodindx]/numpy.sqrt(Phi0/R0), k=3) def denormalize(self) : """ Undo the normalization. """ R0 = self._normR0 Vc0 = self._normVc0 Phi0 = self._normPhi0 # rescale the simulation if not self._posunit is None: self._s['pos'].convert_units(self._posunit) if not self._velunit is None: self._s['vel'].convert_units(self._velunit) # rescale the grid self._rgrid *= R0 if self._logR: self._logrgrid += numpy.log(R0) rs = self._logrgrid else : rs = self._rgrid self._zgrid *= R0 # rescale the potential self._amp *= Phi0 # restore the splines if not self._enable_c and self._interpPot : self._potInterp= self._savedsplines['pot'] self._rforceInterp= self._savedsplines['rforce'] self._zforceInterp= self._savedsplines['zforce'] elif self._enable_c and self._interpPot: self._potGrid_splinecoeffs= calc_2dsplinecoeffs_c(self._potGrid) if self._interpPot : self._vcircInterp = self._savedsplines['vcirc'] if self._interpepifreq : self._R2interp = self._savedsplines['R2deriv'] self._epifreqInterp = self._savedsplines['epifreq'] if self._interpverticalfreq: self._z2interp = self._savedsplines['z2deriv'] self._verticalfreqInterp = self._savedsplines['verticalfreq']