Source code for macromax.backend.tensorflow

"""The module providing the TensorFlow back-end implementation."""
import numpy as np
from typing import Union, Callable
from numbers import Complex
import tensorflow as tf
# from tensorflow.python.ops.numpy_ops import np_config
# np_config.enable_numpy_behavior()
import os
import logging

from macromax.utils.ft import Grid
from .__init__ import BackEnd, array_like

log = logging.getLogger(__name__)

tensor_type = tf.Tensor
array_like = Union[array_like, tensor_type]

_tpu_cache = dict()


[docs] class BackEndTensorFlow(BackEnd): """ A class that provides methods to work with arrays of matrices or block-diagonal matrices, represented as ndarrays, where the first two dimensions are those of the matrix, and the final dimensions are the coordinates over which the operations are parallelized and the Fourier transforms are applied. """
[docs] def __init__(self, nb_dims: int, grid: Grid, hardware_dtype=tf.complex128, device: str = None, address: str = None): """ Construct object to handle parallel operations on square matrices of nb_rows x nb_rows elements. The matrices refer to points in space on a uniform plaid grid. :param nb_dims: The number of rows and columns in each matrix. 1 for scalar operations, 3 for polarization :param grid: The grid that defines the position of the matrices. :param hardware_dtype: (optional) The data type to use for operations. :param device: (optional) 'cpu', 'gpu', or 'tpu' to indicate where the calculation will happen. :param address: (optional) """ if hardware_dtype == np.complex64: hardware_dtype = tf.complex64 elif hardware_dtype == np.complex128: hardware_dtype = tf.complex128 super().__init__(nb_dims, grid, hardware_dtype) if device is None or device.lower() == 'tpu': if address is None: if 'COLAB_TPU_ADDR' in os.environ: env_tpu_addr = os.environ['COLAB_TPU_ADDR'] log.info('COLAB_TPU_ADDR:') log.info(env_tpu_addr) if not isinstance(env_tpu_addr, str): env_tpu_addr = env_tpu_addr[0] address = 'grpc://' + env_tpu_addr else: log.debug('No TPU address specified. Should be of the form grpc://IP:port') if ((address is None) and ('None' in _tpu_cache)) or (address in _tpu_cache): tpus = _tpu_cache[address if address is not None else 'None'] else: os.environ['TF_XLA_FLAGS'] = '--tf_xla_enable_xla_devices' if address is not None: resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu=address) tf.config.experimental_connect_to_cluster(resolver) tf.tpu.experimental.initialize_tpu_system(resolver) # Initialize once in program tpus = tf.config.list_logical_devices('TPU') _tpu_cache[address if address is not None else 'None'] = tpus # cache if len(tpus) > 0: log.info(f'Found TPU devices: {tpus}') else: log.info('No TPU found.') if device is not None: raise ValueError('No TPU found!') if device is None or device.lower() == 'gpu': gpus = tf.config.list_physical_devices('GPU') if len(gpus) > 0: log.info(f'Found GPU devices: {gpus}') else: log.info('No GPUs found.') if device is not None: raise ValueError('No GPU found!') if device is None: device = 'cpu' self.__device = '/' + device.upper() # Do not specify a specific number self.__longitudinal_projection = None # scalar array for the calculation of longitudinal_projection_ft
@property def numpy_dtype(self): """The equivalent hardware data type in numpy""" dtype = self.hardware_dtype if dtype == tf.complex64: numpy_dtype = np.complex64 elif dtype == tf.complex128: numpy_dtype = np.complex128 else: numpy_dtype = np.complex128 return numpy_dtype @property def eps(self) -> float: return np.finfo(self.numpy_dtype).eps # TODO: this should extend to non-numpy-equivalent types
[docs] def astype(self, arr: array_like, dtype=None) -> tensor_type: """ As necessary, convert the ndarray arr to the type dtype. """ # tf.autograph.set_verbosity(3, True) if dtype is None: dtype = self.hardware_dtype elif dtype == np.complex64: dtype = tf.complex64 elif dtype in [np.complex128, np.complex, complex]: dtype = tf.complex128 elif dtype == np.float32: dtype = tf.float32 elif dtype in [np.float64, np.float, float]: dtype = tf.float64 if not isinstance(arr, tf.Tensor): arr = tf.convert_to_tensor(arr) if arr.dtype != dtype: arr = tf.cast(arr, dtype=dtype) return arr
[docs] def asnumpy(self, arr: array_like) -> np.ndarray: if isinstance(arr, tensor_type): arr = arr.numpy() #tf.make_ndarray(tf.make_tensor_proto(arr)) # todo: not working yet! return arr
[docs] def assign(self, arr, out) -> tensor_type: arr = self.to_matrix_field(arr) if np.any(arr.shape[-self.grid.ndim:] != self.grid.shape): arr = tf.tile(arr, np.array(out.shape) // np.array(arr.shape)) out = self.assign_exact(arr, out) return out
[docs] def assign_exact(self, arr, out: tensor_type) -> tensor_type: out = tf.identity(arr) # out = tf.raw_ops.Copy(input=self.astype(arr)) # TODO: Is this even needed for TensorFlow? Or better use Assign? return out
[docs] def allocate_array(self, shape: array_like = None, dtype=None, fill_value: Complex = None) -> tensor_type: """Allocates a new vector array of shape grid.shape and word-aligned for efficient calculations.""" if shape is None: shape = [self.vector_length, 1, *self.grid.shape] with tf.device(self.__device): if fill_value is None: fill_value = 0.0 arr = tf.fill(shape, value=tf.constant(fill_value, dtype=self.hardware_dtype)) return arr
[docs] def copy(self, arr: array_like) -> tensor_type: """Makes an independent copy of an ndarray.""" return tf.raw_ops.Copy(input=arr)
[docs] def ravel(self, arr: array_like) -> tensor_type: """Returns a flattened view of the array.""" return tf.reshape(arr, [-1])
[docs] def sign(self, arr: array_like) -> tensor_type: return tf.math.sign(arr)
[docs] def swapaxes(self, arr: array_like, ax_from: int, ax_to: int) -> tensor_type: """Transpose (permute) two axes of an ndarray.""" p = np.arange(2 + self.grid.ndim, dtype=int) p[ax_to], p[ax_from] = ax_from, ax_to return tf.transpose(arr, perm=p)
[docs] @staticmethod def expand_dims(arr: array_like, axis: int) -> tensor_type: """Inserts a new singleton axis at the indicated position, thus increasing ndim by 1.""" return tf.expand_dims(arr, axis)
[docs] def abs(self, arr) -> tensor_type: return tf.math.abs(self.astype(arr))
[docs] def real(self, arr: array_like) -> tensor_type: return tf.math.real(arr)
[docs] def conj(self, arr) -> tensor_type: return tf.math.conj(self.astype(arr))
[docs] def any(self, arr: array_like): """Returns True if all elements of the array are True.""" return tf.reduce_any(self.astype(arr, dtype=bool))
[docs] def allclose(self, arr: array_like, other: array_like = 0.0) -> bool: """Returns True if all elements in arr are close to other.""" return self.asnumpy(tf.reduce_all(tf.abs(arr - self.astype(other)) < self.eps**0.5)).ravel()[0]
[docs] def amax(self, arr): """Returns the maximum of the flattened array.""" return self.asnumpy(tf.reduce_max(self.astype(arr, dtype=float)))
[docs] def sort(self, arr: array_like) -> tensor_type: """Sorts array elements along the first (left-most) axis.""" return tf.sort(self.real(arr), axis=0)
[docs] def ft(self, arr: array_like) -> tensor_type: """ Calculates the discrete Fourier transform over the spatial dimensions of E. The computational complexity is that of a Fast Fourier Transform: ``O(N\\log(N))``. :param arr: An ndarray representing a vector field. :return: An ndarray holding the Fourier transform of the vector field E. """ arr = self.astype(arr) p = [0, 1, self.ft_axes[-1], *self.ft_axes[:-1]] # self.ft_axes must be all axes on the right for axis in self.ft_axes: arr = tf.signal.fft(arr) arr = tf.transpose(arr, perm=p) return arr
[docs] def ift(self, arr: array_like) -> tensor_type: """ Calculates the inverse Fourier transform over the spatial dimensions of E. The computational complexity is that of a Fast Fourier Transform: ``O(N\\log(N))``. The scaling is so that ``E == self.ift(self.ft(E))`` :param arr: An ndarray representing a Fourier-transformed vector field. :return: An ndarray holding the inverse Fourier transform of the vector field E. """ arr = self.astype(arr) p = [0, 1, *self.ft_axes[1:], self.ft_axes[0]] # self.ft_axes must be all axes on the right for _ in self.ft_axes: arr = tf.transpose(arr, perm=p) arr = tf.signal.ifft(arr) return arr
@tf.function def convolve(self, operation_ft: Callable[[array_like], array_like], arr: array_like) -> tensor_type: return super().convolve(operation_ft=operation_ft, arr=arr)
[docs] def adjoint(self, mat: array_like) -> tensor_type: """ Transposes the elements of individual matrices with complex conjugation. :param mat: The ndarray with the matrices in the first two dimensions. :return: An ndarray with the complex conjugate transposed matrices. """ p = [1, 0, *(2 + np.arange(self.grid.ndim))] return tf.math.conj(tf.transpose(self.astype(mat), perm=p))
[docs] def subtract(self, left_term: array_like, right_term: array_like) -> np.ndarray: """ Point-wise difference of A and B. :param left_term: The left matrix array, must start with dimensions n x m :param right_term: The right matrix array, must have matching or singleton dimensions to those of A. In case of missing dimensions, singletons are assumed. :return: The point-wise difference of both sets of matrices. Singleton dimensions are expanded. """ left_term = self.astype(left_term) if not self.is_scalar(left_term) or not self.is_scalar(right_term): if self.is_scalar(left_term): left_term = self.eye * self.to_matrix_field(left_term) if self.is_scalar(right_term): right_term = self.eye * self.to_matrix_field(right_term) return left_term - right_term
[docs] def is_scalar(self, arr: array_like) -> bool: """ Tests if A represents a scalar field (as opposed to a vector field). :param arr: The ndarray to be tested. :return: A boolean indicating whether A represents a scalar field (True) or not (False). """ return np.isscalar(arr) or len(arr.shape) == 0 or (arr.shape[0] == 1 and (len(arr.shape) == 1 or arr.shape[1] == 1))
@tf.function def mul(self, left_factor: array_like, right_factor: array_like, out: tf.Tensor = None) -> tensor_type: """ Point-wise matrix multiplication of A and B. Overwrites right_factor! :param left_factor: The left matrix array, must start with dimensions n x m :param right_factor: The right matrix array, must have matching or singleton dimensions to those of A, bar the first two dimensions. In case of missing dimensions, singletons are assumed. The first dimensions must be m x p. Where the m matches that of the left hand matrix unless both m and p are 1 or both n and m are 1, in which case the scaled identity is assumed. :param out: (optional) The destination array for the results. :return: An array of matrix products with all but the first two dimensions broadcast as needed. """ if self.is_scalar(left_factor) or self.is_scalar(right_factor): result = self.astype(left_factor) * self.astype(right_factor) else: p = [*(2 + np.arange(self.grid.ndim)), 0, 1] ip = [self.grid.ndim, self.grid.ndim + 1, *np.arange(self.grid.ndim)] left_factor = tf.transpose(self.astype(left_factor), perm=p) right_factor = tf.transpose(self.astype(right_factor), perm=p) result = tf.linalg.matmul(left_factor, right_factor) result = tf.transpose(result, perm=ip) return result
[docs] def ldivide(self, denominator: array_like, numerator: array_like = 1.0) -> tensor_type: """ Parallel matrix left division, A^{-1}B, on the final two dimensions of A and B result_lm = A_kl \\ B_km A and B must have have all but the final dimension identical or singletons. B defaults to the identity matrix. :param denominator: The set of denominator matrices. :param numerator: The set of numerator matrices. :return: The set of divided matrices. """ denominator = self.to_matrix_field(denominator) # convert scalar to array if needed numerator = self.to_matrix_field(numerator) # convert scalar to array if needed shape_A = denominator.shape[:2] if self.is_scalar(denominator): return self.astype(numerator) / denominator else: denominator = self.asnumpy(denominator) # TODO: Keep this in Tensorflow numerator = self.asnumpy(numerator) # TODO: Keep this in Tensorflow total_dims = 2 + self.grid.ndim new_order = np.roll(np.arange(total_dims), -2) denominator = denominator.transpose(new_order) if self.is_scalar(numerator): if shape_A[0] == shape_A[1]: Y = np.linalg.inv(denominator) * numerator else: Y = np.linalg.pinv(denominator) * numerator else: numerator = numerator.transpose(new_order) if shape_A[0] == shape_A[1]: Y = np.linalg.solve(denominator, numerator) else: Y = np.linalg.lstsq(denominator, numerator)[0] old_order = np.roll(np.arange(total_dims), 2) result = Y.transpose(old_order) return self.astype(result)
[docs] def norm(self, arr: array_like) -> float: return float(tf.math.real(tf.linalg.norm(arr)))
[docs] def longitudinal_projection_ft(self, field_array_ft: array_like) -> np.ndarray: """ Projects the Fourier transform of a vector array onto its longitudinal component. Overwrites self.array_ft_input! :param field_array_ft: The Fourier transform of the input vector E array. :return: The Fourier transform of the longitudinal projection. """ data_shape = field_array_ft.shape nb_input_vector_dims = data_shape[0] data_shape = data_shape[2:] nb_data_dims = np.minimum(len(data_shape), len(self.grid.step)) nb_output_dims = np.maximum(nb_data_dims, nb_input_vector_dims) field_array_ft = self.astype(field_array_ft) zero_k2 = tuple(0 for _ in range(nb_data_dims)) # Store the DC components for later use field_dc = [field_array_ft[out_dim_idx, 0][zero_k2] for out_dim_idx in range(nb_data_dims)] # Pre-alocate a working array if self.__longitudinal_projection is None: self.__longitudinal_projection = self.allocate_array(shape=data_shape) # (K x K) . xFt == K x (K . xFt) self.__longitudinal_projection = self.assign(self.k[0] * field_array_ft[0, 0], self.__longitudinal_projection) # overwrite with new data # Project on k vectors for in_dim_idx in range(1, nb_data_dims): self.__longitudinal_projection += self.k[in_dim_idx] * field_array_ft[in_dim_idx, 0] # Divide by K**2 but handle division by zero separately self.__longitudinal_projection /= (self.k2 + self.astype(self.k2 == 0)) # avoid / 0 at the origin # self.ravel(self.__longitudinal_projection)[1:] /= self.ravel(self.k2)[1:] # skip the / 0 at the origin polarizations = [] for out_dim_idx in range(nb_data_dims): # Save time by not storing the complete tensor # stretch each k vector to be as long as the projection polarization = self.k[out_dim_idx] * self.__longitudinal_projection polarization = tf.tensor_scatter_nd_update(polarization, [(0, 0, *zero_k2)], [field_dc[out_dim_idx]]) # undefined origin => project the DC as longitudinal polarizations.append(polarization) for out_dim_idx in range(nb_data_dims, nb_output_dims): # Make sure to add zeros to fit the number of dimensions polarizations.append(tf.zeros(shape=[1, 1, *self.grid.shape], dtype=self.hardware_dtype)) result = tf.concat(polarizations, axis=0) # result = tf.reshape(result, shape=[result.shape[0], 1, *result.shape[1:]]) return result # const.piLF <- (K x K)/K**2
[docs] def transversal_projection_ft(self, field_array_ft: array_like) -> tensor_type: """ Projects the Fourier transform of a vector E array onto its transversal component. :param field_array_ft: The Fourier transform of the input vector E array. :return: The Fourier transform of the transversal projection. """ transversal_ft = self.astype(field_array_ft) # Get it in place for an Inverse Fourier Transform transversal_ft -= self.longitudinal_projection_ft(field_array_ft) return transversal_ft
[docs] def div(self, field_array: array_like) -> tensor_type: """ Calculates the divergence of input field_array. :param field_array: The input array representing the field in all spatial dimensions. The input is of the shape ``[m, n, x, y, z, ...]`` :return: The divergence of the field in the shape ``[n, 1, x, y, z]``. """ field_array_ft = self.ft(field_array) div_field_array_ft = self.allocate_array(shape=field_array_ft.shape[1:], dtype=self.hardware_dtype, fill_value=0) for dim_idx in range(np.minimum(self.grid.ndim, len(field_array_ft.shape))): div_field_array_ft += 1j * self.k[dim_idx] * field_array_ft[dim_idx, :, ...] arr_ft = self.expand_dims(div_field_array_ft, 1) arr = self.ift(arr_ft) return arr
[docs] def curl_ft(self, field_array_ft: array_like) -> tensor_type: """ Calculates the Fourier transform of the curl of a Fourier transformed E with the final dimension the vector dimension. The final dimension of the output will always be of length 3; however, the input length may be shorter, in which case the missing values are assumed to be zero. The first dimension of the input array corresponds to the first element in the final dimension, if it exists, the second dimension corresponds to the second element etc. :param field_array_ft: The input vector array of dimensions ``[vector_length, 1, *data_shape]``. :return: The Fourier transform of the curl of F. """ field_array_ft = self.astype(field_array_ft) # Calculate the curl # curl_field_array_ft = self.allocate_array(shape=[self.vector_length, *field_array_ft.shape[1:]], dtype=self.hardware_dtype) curl_field_array_ft_polarizations = [] # as the cross product without representing the first factor in full for dim_idx in range(self.vector_length): other_dims = (dim_idx + np.array([-1, 1])) % self.vector_length if other_dims[0] < field_array_ft.shape[0] and other_dims[1] < len(self.k): res = self.k[other_dims[1]] * field_array_ft[other_dims[0]] else: res = tf.zeros(shape=[1, *self.grid.shape], dtype=self.hardware_dtype) if other_dims[1] < field_array_ft.shape[0] and other_dims[0] < len(self.k): res -= self.k[other_dims[0]] * field_array_ft[other_dims[1]] curl_field_array_ft_polarizations.append(res) curl_field_array_ft = tf.stack(curl_field_array_ft_polarizations) return curl_field_array_ft * tf.constant(1.0j, dtype=self.hardware_dtype)
[docs] def mat3_eigh(self, arr: array_like) -> tensor_type: """ Calculates the eigenvalues of the 3x3 Hermitian matrices represented by A and returns a new array of 3-vectors, one for each matrix in A and of the same dimensions, baring the second dimension. When the first two dimensions are 3x1 or 1x3, a diagonal matrix is assumed. When the first two dimensions are singletons (1x1), a constant diagonal matrix is assumed and only one eigenvalue is returned. Returns an array of one dimension less: 3 x data_shape. With the exception of the first dimension, the shape is maintained. :param arr: The set of 3x3 input matrices for which the eigenvalues are requested. This must be an ndarray with the first two dimensions of size 3. :return: The set of eigenvalue-triples contained in an ndarray with its first dimension of size 3, and the remaining dimensions equal to all but the first two input dimensions. """ arr = self.astype(arr) arr = tf.transpose(arr, perm=[*(2 + np.arange(self.grid.ndim)), 0, 1]) result = tf.linalg.eigvalsh(arr) result = tf.transpose(result, perm=[self.grid.ndim, *np.arange(self.grid.ndim)]) return result