# -*- coding: utf-8 -*-
"""Defines several tools for monitoring net activity."""
# pylint: disable=F0401, E1101, too-many-lines, wrong-import-order
import logging as _logging
import os as _os
import time as _time
import subprocess as _subprocess
import collections as _collections
import numpy as _np
# pylint: disable=no-name-in-module
from scipy.stats import bernoulli as _bernoulli
from scipy.ndimage.interpolation import rotate as _rotate
from sklearn.decomposition import PCA as _PCA
from .tools import pad as _pad
# CAREFUL! This must be imported before any caffe-related import!
from .initialization import init as _init
import caffe as _caffe
try: # pragma: no cover
import cv2 as _cv2
_cv2INTER_CUBIC = _cv2.INTER_CUBIC # pylint: disable=invalid-name
_cv2INTER_LINEAR = _cv2.INTER_LINEAR # pylint: disable=invalid-name
_cv2INTER_NEAREST = _cv2.INTER_NEAREST # pylint: disable=invalid-name
_cv2resize = _cv2.resize # pylint: disable=invalid-name
except ImportError: # pragma: no cover
_cv2 = None
_cv2INTER_CUBIC = None # pylint: disable=invalid-name
_cv2INTER_LINEAR = None # pylint: disable=invalid-name
_cv2INTER_NEAREST = None # pylint: disable=invalid-name
_cv2resize = None # pylint: disable=invalid-name
try: # pragma: no cover
import matplotlib.pyplot as _plt
import matplotlib.ticker as _tkr
import matplotlib.colorbar as _colorbar
from mpl_toolkits.axes_grid1 import make_axes_locatable as _make_axes_locatable
_PLT_AVAILABLE = True
except ImportError: # pragma: no cover
_PLT_AVAILABLE = False
_init()
_LOGGER = _logging.getLogger(__name__)
class Monitor(object): # pylint: disable=R0903
"""
The monitor interface.
Should be implemented by any monitor class. The method
:py:func:`barrista.monitoring.Monitor.__call__` must be specified,
the function :py:func:`barrista.monitoring.Monitor.finalize` may
optionally be specified.
"""
def __call__(self, kwargs):
"""
The call implementation.
For available keyword arguments, see the documentation of
:py:class:`barrista.solver.SolverInterface.Fit`.
The callback signals are used as follows:
* initialize_train: called once before training starts,
* initialize_test: called once before training starts (if training with
a validation set is used) or once before testing,
* pre_fit: called before fitting mode is used (e.g., before going
back to fitting during training after a validation run),
* pre_test: called before testing mode is used (e.g., during training
before validation starts),
* post_test: called when testing finished,
* pre_train_batch: before a training batch is fed to the network,
* post_train_batch: after forwarding a training batch,
* pre_test_batch: before a test batch is fed to the network,
* post_test_batch: after a test batch was forwarded through the
network.
"""
if kwargs['callback_signal'] == 'initialize_train':
self._initialize_train(kwargs)
elif kwargs['callback_signal'] == 'initialize_test':
self._initialize_test(kwargs)
elif kwargs['callback_signal'] == 'pre_fit':
self._pre_fit(kwargs)
elif kwargs['callback_signal'] == 'pre_test':
self._pre_test(kwargs)
elif kwargs['callback_signal'] == 'post_test':
self._post_test(kwargs)
elif kwargs['callback_signal'] == 'pre_test_batch':
self._pre_test_batch(kwargs)
elif kwargs['callback_signal'] == 'post_test_batch':
self._post_test_batch(kwargs)
elif kwargs['callback_signal'] == 'pre_train_batch':
self._pre_train_batch(kwargs)
elif kwargs['callback_signal'] == 'post_train_batch':
self._post_train_batch(kwargs)
def _initialize_train(self, kwargs): # pylint: disable=C0111
pass
def _initialize_test(self, kwargs): # pylint: disable=C0111
pass
def _pre_fit(self, kwargs): # pylint: disable=C0111
pass
def _pre_test(self, kwargs): # pylint: disable=C0111
pass
def _post_test(self, kwargs): # pylint: disable=C0111
pass
def _pre_test_batch(self, kwargs): # pylint: disable=C0111
pass
def _post_test_batch(self, kwargs): # pylint: disable=C0111
pass
def _pre_train_batch(self, kwargs): # pylint: disable=C0111
pass
def _post_train_batch(self, kwargs): # pylint: disable=C0111
pass
    def finalize(self, kwargs):
"""Will be called at the end of a training/fitting process."""
pass
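# A minimal sketch of a custom monitor: only the needed callback hooks are
# overridden, everything else falls back to the no-op defaults above. The
# kwargs keys used here ('iter', 'max_iter') are the ones documented for the
# ProgressIndicator below; their availability during training is assumed.
class _ExampleIterationLogger(Monitor):  # pragma: no cover

    """Logs the iteration progress after every training batch (sketch)."""

    def _post_train_batch(self, kwargs):
        _LOGGER.debug("Finished training batch at iteration %d of %d.",
                      kwargs['iter'], kwargs['max_iter'])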
class DataMonitor(Monitor): # pylint: disable=R0903
r"""
Monitor interface for filling the blobs of a network.
This is a specific monitor which will fill the blobs of the network
for the forward pass or solver step.
Ideally, there should only be one such monitor per callback,
but multiple ones are possible.
"""
pass
class ParallelMonitor(Monitor):
r"""
Monitor interface for monitors executed parallel to processing a batch.
The order of all monitors implementing this interface is respected. They
will work on a dummy network object with dummy blobs and prepare their
data. The dummy blob content is then copied to the real network prior
to the next batch execution.
"""
    def get_parallel_blob_names(self): # pragma: no cover
"""Get the names of all blobs that must be provided for the dummy."""
raise NotImplementedError()
# pylint: disable=too-few-public-methods
class StaticDataMonitor(DataMonitor, ParallelMonitor):
r"""
Always provides the same data for a specific net input blob.
Parameters
==========
:param X: dict(string, np.ndarray)
The static input blobs to use.
"""
def __init__(self, X):
self._X = X # pylint: disable=C0103
def _initialize_train(self, kwargs):
self._initialize(kwargs)
def _initialize_test(self, kwargs):
self._initialize(kwargs)
def _initialize(self, kwargs):
if 'test' in kwargs['callback_signal']:
net = kwargs['testnet']
else:
net = kwargs['net']
for key, value in list(self._X.items()):
assert key in list(net.blobs.keys()), (
'data key has no corresponding network blob {} {}'.format(
key, str(list(net.blobs.keys()))))
assert isinstance(value, _np.ndarray), (
'data must be a numpy nd array ({})'.format(type(value))
)
def _pre_train_batch(self, kwargs):
self._pre_batch(kwargs['net'], kwargs)
def _pre_test_batch(self, kwargs):
self._pre_batch(kwargs['testnet'], kwargs)
def _pre_batch(self, net, kwargs): # pylint: disable=unused-argument
for key in list(self._X.keys()):
net.blobs[key].data[...] = self._X[key]
    def get_parallel_blob_names(self):
"""Get the names of all blobs that must be provided for the dummy."""
return list(self._X.keys())
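# Usage sketch for the StaticDataMonitor, assuming a network with an input
# blob named 'scene_info' of shape (1, 3) (name and shape are hypothetical):
def _example_static_data_monitor():  # pragma: no cover
    """Provide a constant value for the blob 'scene_info' (sketch)."""
    monitor = StaticDataMonitor(X={'scene_info': _np.zeros((1, 3),
                                                           dtype='float32')})
    # As a ParallelMonitor, it announces the blobs the dummy net must offer:
    assert monitor.get_parallel_blob_names() == ['scene_info']
    return monitor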
# pylint: disable=too-few-public-methods
class OversamplingDataMonitor(DataMonitor, ParallelMonitor):
r"""
Provides oversampled data.
Parameters
==========
    :param blobinfos: dict(string, string|None).
      Associates each blob name to oversample with, optionally, the
      interpolation method to use for resizing. This may be 'n' (nearest
      neighbour), 'c' (cubic), 'l' (linear) or None (no interpolation). If
      an interpolation method is selected, `before_oversample_resize_to`
      must not be None and provide a size.
:param before_oversample_resize_to: dict(string, 2-tuple).
Specifies a size to which the image inputs will be resized before the
oversampling is invoked.
"""
def __init__(self,
blobinfos,
before_oversample_resize_to=None):
for val in blobinfos.values():
assert val in ['n', 'c', 'l', None]
self._blobinfos = blobinfos
for key, val in blobinfos.items():
if val is not None:
assert key in list(before_oversample_resize_to.keys())
self._before_oversample_resize_to = before_oversample_resize_to
self._batch_size = None
    def get_parallel_blob_names(self):
"""Get the names of all blobs that must be provided for the dummy."""
return list(self._blobinfos.keys())
def _initialize_train(self, kwargs):
raise Exception("The OversamplingDataMonitor can only be used during "
"testing!")
def _initialize_test(self, kwargs):
if 'test' in kwargs['callback_signal']:
net = kwargs['testnet']
else:
net = kwargs['net']
for key in list(self._blobinfos.keys()):
assert key in list(net.blobs.keys()), (
'data key has no corresponding network blob {} {}'.format(
key, str(list(net.blobs.keys()))))
def _pre_test(self, kwargs): # pragma: no cover
net = kwargs['testnet']
self._batch_size = net.blobs[
list(self._blobinfos.keys())[0]].data.shape[0]
def _pre_test_batch(self, kwargs): # pragma: no cover
for blob_name in list(self._blobinfos):
assert blob_name in kwargs['data_orig'], (
"The unchanged data must be provided by another DataProvider, "
"e.g., CyclingDataMonitor with `only_preload`!")
assert (len(kwargs['data_orig'][blob_name]) * 10 ==
self._batch_size), (
"The number of provided images * 10 must be the batch "
"size!")
# pylint: disable=invalid-name
for im_idx, im in enumerate(kwargs['data_orig'][blob_name]):
if self._blobinfos[blob_name] is not None:
if self._blobinfos[blob_name] == 'n':
interpolation = _cv2INTER_NEAREST
elif self._blobinfos[blob_name] == 'c':
interpolation = _cv2INTER_CUBIC
elif self._blobinfos[blob_name] == 'l':
interpolation = _cv2INTER_LINEAR
oversampling_prep = _cv2resize(
_np.transpose(im, (1, 2, 0)),
(self._before_oversample_resize_to[blob_name][1],
self._before_oversample_resize_to[blob_name][0]),
interpolation=interpolation)
else:
oversampling_prep = _np.transpose(im, (1, 2, 0))
imshape = kwargs['testnet'].blobs[blob_name].data.shape[2:4]
kwargs['testnet'].blobs[blob_name].data[
im_idx * 10:(im_idx+1) * 10] =\
_np.transpose(
_caffe.io.oversample(
[oversampling_prep],
imshape),
(0, 3, 1, 2))
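# Usage sketch for the OversamplingDataMonitor: a CyclingDataMonitor with
# `only_preload` provides the unchanged images, which are then resized to
# 256x256 (linear interpolation) before caffe's ten-fold oversampling fills
# the test batch. Blob name and sizes are hypothetical; note that the test
# batch size must be ten times the number of preloaded images.
def _example_oversampling_data_monitor():  # pragma: no cover
    """Build the monitor chain for oversampled prediction (sketch)."""
    preload_monitor = CyclingDataMonitor(
        X={'data': [_np.zeros((3, 128, 128), dtype='float32')]},
        only_preload=['data'])
    oversampling_monitor = OversamplingDataMonitor(
        blobinfos={'data': 'l'},
        before_oversample_resize_to={'data': (256, 256)})
    # The order matters: the preloading monitor must run first.
    return [preload_monitor, oversampling_monitor]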
# pylint: disable=too-many-instance-attributes, R0903
class CyclingDataMonitor(DataMonitor, ParallelMonitor):
r"""
Uses the data sequentially.
    This monitor maps data to the network and cycles through the data
sequentially. It is the default monitor used if a user provides X
or X_val to the barrista.solver.fit method.
If further processing of the original data is intended, by using the flag
``only_preload``, the following monitors find a dictionary of lists of
the original datapoints with the name 'data_orig' in their ``kwargs``.
The data is in this case NOT written to the network input layers! This
can make sense, e.g., for the ``ResizingMonitor``.
:param X: dict of numpy.ndarray or list, or None.
      If specified, it is used as input data. It is used sequentially, so
      shuffle it beforehand if required. The keys of the dict must have
a corresponding layer name in the net. The values must be provided
already in network dimension order, i.e., usually channels, height,
width.
:param only_preload: list(string).
List of blobs for which the data will be loaded and stored in a dict
of (name: list) for further processing with other monitors.
:param input_processing_flags: dict(string, string).
Dictionary associating input blob names with intended preprocessing
methods. Valid values are:
* n: none,
* rn: resize, nearest neighbour,
* rc: resize, cubic,
* rl: resize, linear,
* pX: padding, with value X.
:param virtual_batch_size: int or None.
      Override the network batch size. May only be used together with
      ``only_preload``. Only makes sense with another DataMonitor in
      succession.
:param color_data_augmentation_sigmas: dict(string, float) or None.
Enhance the color of the samples as described in (Krizhevsky et al.,
2012). The parameter gives the sigma for the normal distribution that is
sampled to obtain the weights for scaled pixel principal components per
blob.
:param shuffle: Bool.
If set to True, shuffle the data every epoch. Default: False.
"""
# pylint: disable=too-many-arguments
def __init__(self,
X,
only_preload=None,
input_processing_flags=None,
virtual_batch_size=None,
color_data_augmentation_sigmas=None,
shuffle=False):
"""See class documentation."""
if only_preload is None:
only_preload = []
self.only_preload = only_preload
self._X = X # pylint: disable=C0103
assert X is not None
if input_processing_flags is None:
input_processing_flags = dict()
self._input_processing_flags = input_processing_flags
for key in input_processing_flags.keys():
assert key in self._X.keys()
self._padvals = dict()
for key, val in input_processing_flags.items():
assert (val in ['n', 'rn', 'rc', 'rl'] or
val.startswith('p')), (
"The input processing flags for the CyclingDataMonitor "
"must be in ['n', 'rn', 'rc', 'rl', 'p']: {}!".format(
val))
if val.startswith('p'):
self._padvals[key] = int(val[1:])
for key in self.only_preload:
assert key in self._X.keys()
self._sample_pointer = 0
self._len_data = None
self._initialized = False
self._batch_size = None
assert virtual_batch_size is None or self.only_preload, (
"If the virtual_batch_size is set, `only_preload` must be used!")
if virtual_batch_size is not None:
assert virtual_batch_size > 0
self._virtual_batch_size = virtual_batch_size
if color_data_augmentation_sigmas is None:
color_data_augmentation_sigmas = dict()
self._color_data_augmentation_sigmas = color_data_augmentation_sigmas
for key in list(self._color_data_augmentation_sigmas.keys()):
assert key in list(self._X.keys())
for key in list(self._X.keys()):
if key not in list(self._color_data_augmentation_sigmas.keys()):
self._color_data_augmentation_sigmas[key] = 0.
# pylint: disable=invalid-name
self._color_data_augmentation_weights = dict()
# pylint: disable=invalid-name
self._color_data_augmentation_components = dict()
self._shuffle = shuffle
self._sample_order = None
    def get_parallel_blob_names(self):
        """Get the names of all blobs that must be provided for the dummy."""
        return list(self._X.keys())
def _initialize_train(self, kwargs):
self._initialize(kwargs)
# Calculate the color channel PCA per blob if required.
for bname, sigma in self._color_data_augmentation_sigmas.items():
if sigma > 0.:
_LOGGER.info("Performing PCA for color data augmentation for "
"blob '%s'...", bname)
for im in self._X[bname]: # pylint: disable=invalid-name
assert im.ndim == 3 and im.shape[0] == 3, (
"To perform the color data augmentation, images must "
"be provided in shape (3, height, width).")
flldta = _np.vstack(
[im.reshape((3, im.shape[1] * im.shape[2])).T
for im in self._X[bname]])
# No need to copy the data another time, since `vstack` already
# copied it.
pca = _PCA(copy=False, whiten=False)
pca.fit(flldta)
self._color_data_augmentation_weights[bname] = _np.sqrt(
pca.explained_variance_.astype('float32'))
self._color_data_augmentation_components[bname] = \
pca.components_.T.astype('float32')
def _initialize_test(self, kwargs):
self._initialize(kwargs)
def _initialize(self, kwargs):
# we make sure, now that the network is available, that
        # all names in the provided data dict have a corresponding match
# in the network
if self._initialized:
raise Exception("This DataProvider has already been intialized! "
"Did you maybe try to use it for train and test? "
"This is not possible!")
if 'test' in kwargs['callback_signal']:
net = kwargs['testnet']
else:
net = kwargs['net']
self._len_data = len(list(self._X.values())[0])
for key, value in list(self._X.items()):
if key not in self._input_processing_flags:
self._input_processing_flags[key] = 'n'
assert key in list(net.blobs.keys()), (
'data key has no corresponding network blob {} {}'.format(
key, str(list(net.blobs.keys()))))
assert len(value) == self._len_data, (
'all items need to have the same length {} vs {}'.format(
len(value), self._len_data))
assert isinstance(value, _np.ndarray) or isinstance(value, list), (
'data must be a numpy nd array or list ({})'.format(type(value))
)
self._sample_order = list(range(self._len_data))
if self._shuffle:
_np.random.seed(1)
self._sample_order = _np.random.permutation(self._sample_order)
self._initialized = True
def _pre_fit(self, kwargs):
if 'test' in kwargs['callback_signal']:
net = kwargs['testnet']
else:
net = kwargs['net']
if self._virtual_batch_size is not None:
self._batch_size = self._virtual_batch_size
else:
self._batch_size = net.blobs[list(self._X.keys())[0]].data.shape[0]
assert self._batch_size > 0
def _pre_test(self, kwargs):
self._pre_fit(kwargs)
self._sample_pointer = 0
def _pre_train_batch(self, kwargs):
self._pre_batch(kwargs['net'], kwargs)
def _pre_test_batch(self, kwargs):
self._pre_batch(kwargs['testnet'], kwargs)
def _color_augment(self, bname, sample):
sigma = self._color_data_augmentation_sigmas[bname]
if sigma == 0.:
if isinstance(sample, (int, float)):
return float(sample)
else:
return sample.astype('float32')
else:
comp_weights = _np.random.normal(0., sigma, 3).astype('float32') *\
self._color_data_augmentation_weights[bname]
noise = _np.dot(self._color_data_augmentation_components[bname],
comp_weights.T)
return (sample.astype('float32').transpose((1, 2, 0)) + noise)\
.transpose((2, 0, 1))
def _pre_batch(self, net, kwargs): # pylint: disable=C0111, W0613, R0912
# this will simply cycle through the data.
samples_ids = [self._sample_order[idx % self._len_data]
for idx in
range(self._sample_pointer,
self._sample_pointer + self._batch_size)]
# updating the sample pointer for the next time
old_sample_pointer = self._sample_pointer
self._sample_pointer = (
(self._sample_pointer + len(samples_ids)) % self._len_data)
if self._shuffle and old_sample_pointer > self._sample_pointer:
# Epoch ended. Reshuffle.
self._sample_order = _np.random.permutation(self._sample_order)
if len(self.only_preload) > 0:
sample_dict = dict()
for key in list(self._X.keys()): # pylint: disable=too-many-nested-blocks
if key in self.only_preload:
sample_dict[key] = []
# this will actually fill the data for the network
for sample_idx in range(self._batch_size):
augmented_sample = self._color_augment(
key,
self._X[key][samples_ids[sample_idx]])
if key in self.only_preload:
sample_dict[key].append(augmented_sample)
else:
if (net.blobs[key].data[sample_idx].size == 1 and (
isinstance(self._X[key][samples_ids[sample_idx]],
(int, float)) or
self._X[key][samples_ids[sample_idx]].size == 1) or
self._X[key][samples_ids[sample_idx]].size ==
net.blobs[key].data[sample_idx].size):
if net.blobs[key].data[sample_idx].size == 1:
net.blobs[key].data[sample_idx] =\
augmented_sample
else:
net.blobs[key].data[sample_idx] = (
augmented_sample.reshape(
net.blobs[key].data.shape[1:]))
else:
if self._input_processing_flags[key] == 'n': # pragma: no cover
raise Exception(("Sample size {} does not match " +
"network input size {} and no " +
"preprocessing is allowed!")
.format(
augmented_sample.size,
net.blobs[key].data[sample_idx].size))
elif self._input_processing_flags[key] in ['rn',
'rc',
'rl']:
assert (
augmented_sample.shape[0]
== net.blobs[key].data.shape[1])
                            if self._input_processing_flags[key] == 'rn':
                                interp_method = _cv2INTER_NEAREST
                            elif self._input_processing_flags[key] == 'rc':
interp_method = _cv2INTER_CUBIC
else:
interp_method = _cv2INTER_LINEAR
for channel_idx in range(
net.blobs[key].data.shape[1]):
net.blobs[key].data[sample_idx, channel_idx] =\
_cv2resize(
augmented_sample[channel_idx],
(net.blobs[key].data.shape[3],
net.blobs[key].data.shape[2]),
interpolation=interp_method)
else:
# Padding.
net.blobs[key].data[sample_idx] = _pad(
augmented_sample,
net.blobs[key].data.shape[2:4],
val=self._padvals[key])
if len(self.only_preload) > 0:
kwargs['data_orig'] = sample_dict
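# Usage sketch for the CyclingDataMonitor: three images and their labels
# (all names, shapes and sigmas are hypothetical) are cycled through; the
# images are resized with linear interpolation if their size does not match
# the network input, and their colors are augmented with a PCA sigma of 0.1
# as described above.
def _example_cycling_data_monitor():  # pragma: no cover
    """Construct a cycling data provider with augmentation (sketch)."""
    images = [_np.zeros((3, 32, 32), dtype='float32') for _ in range(3)]
    labels = [0., 1., 0.]
    return CyclingDataMonitor(
        X={'data': images, 'labels': labels},
        input_processing_flags={'data': 'rl', 'labels': 'n'},
        color_data_augmentation_sigmas={'data': 0.1},
        shuffle=True)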
class ResizingMonitor(ParallelMonitor, Monitor): # pylint: disable=R0903
r"""
Optionally resizes input data and adjusts the network input shape.
This monitor optionally resizes the input data randomly and adjusts
the network input size accordingly (this works only for batch size 1
and fully convolutional networks).
For this to work, it must be used with the ``CyclingDataMonitor`` with
``only_preload`` set.
:param blobinfos: dict(string, int).
Describes which blobs to apply the resizing operation to, and which
padding value to use for the remaining space.
:param base_scale: float.
      If set to a value different from 1., apply the given base scale to
      the images first. In this case, the parameter ``interp_methods``
      must be set.
:param random_change_up_to: float.
      If set to a value different from 0., the scale is altered randomly
      by a uniformly drawn value between -``random_change_up_to`` and
      ``random_change_up_to``, which is added to the base scale.
:param net_input_size_adjustment_multiple_of: int.
      If set to a value greater than 0, the blob shape is adjusted from
      its initial value (which is used as the minimum) in multiples of
      the given value.
:param interp_methods: dict(string, string).
Dictionary which stores for every blob the interpolation method. The
string must be for each blob in ['n', 'c', 'l'] (nearest neighbour,
cubic, linear).
"""
def __init__(self, # pylint: disable=R0913
blobinfos,
base_scale=1.,
random_change_up_to=0.,
net_input_size_adjustment_multiple_of=0,
interp_methods=None):
"""See class documentation."""
self._blobinfos = blobinfos
self._base_scale = base_scale
self._random_change_up_to = random_change_up_to
if self._base_scale != 1. or self._random_change_up_to != 0.:
assert interp_methods is not None
for key in self._blobinfos.keys():
assert key in interp_methods.keys()
assert interp_methods[key] in ['n', 'c', 'l']
self._interp_methods = interp_methods
self._adjustment_multiple_of = net_input_size_adjustment_multiple_of
self._min_input_size = None
self._batch_size = None
def _initialize_train(self, kwargs):
self._initialize(kwargs)
def _initialize_test(self, kwargs):
self._initialize(kwargs)
def _initialize(self, kwargs):
# we make sure, now that the network is available, that
# all names in the provided data dict have a corresponding match
# in the network
if 'test' in kwargs['callback_signal']:
net = kwargs['testnet']
else:
net = kwargs['net']
for key in list(self._blobinfos.keys()):
assert key in list(net.blobs.keys()), (
'data key has no corresponding network blob {} {}'.format(
key, str(list(net.blobs.keys()))))
assert net.blobs[key].data.ndim == 4
if self._adjustment_multiple_of > 0:
if self._min_input_size is None:
self._min_input_size = net.blobs[key].data.shape[2:4]
else:
assert (net.blobs[key].data.shape[2:4] ==
self._min_input_size), (
'if automatic input size adjustment is '
'activated, all inputs must be of same size '
'(first: {}, {}: {})'.format(
self._min_input_size, key,
net.blobs[key].data.shape[2:4]))
def _pre_fit(self, kwargs):
if 'test' in kwargs['callback_signal']:
net = kwargs['testnet']
else:
net = kwargs['net']
self._batch_size = net.blobs[
list(self._blobinfos.keys())[0]].data.shape[0]
if self._adjustment_multiple_of > 0:
assert self._batch_size == 1, (
"If size adjustment is activated, the batch size must be one!")
def _pre_test(self, kwargs):
self._pre_fit(kwargs)
def _pre_train_batch(self, kwargs):
self._pre_batch(kwargs['net'], kwargs)
def _pre_test_batch(self, kwargs):
self._pre_batch(kwargs['testnet'], kwargs)
# pylint: disable=C0111, W0613, R0912, too-many-locals
def _pre_batch(self, net, kwargs):
scales = None
sizes = None
        if 'data_orig' not in kwargs:
raise Exception(
"This data monitor needs a data providing monitor "
"to run in advance (e.g., a CyclingDataMonitor with "
"`only_preload`)!")
for key, value in kwargs['data_orig'].items():
assert len(value) == self._batch_size
if sizes is None:
sizes = []
for img in value:
sizes.append(img.shape[1:3])
else:
for img_idx, img in enumerate(value):
# pylint: disable=unsubscriptable-object
assert img.shape[1:3] == sizes[img_idx]
for key, padval in self._blobinfos.items():
if scales is None:
scales = []
for sample_idx in range(self._batch_size):
if self._random_change_up_to > 0:
scales.append(
self._base_scale +
_np.random.uniform(low=-self._random_change_up_to,
high=self._random_change_up_to))
else:
scales.append(self._base_scale)
for sample_idx in range(self._batch_size):
# Get the scaled data.
scaled_sample = kwargs['data_orig'][key][sample_idx]
if scales[sample_idx] != 1.:
scaled_sample = _np.empty((scaled_sample.shape[0],
int(scaled_sample.shape[1] *
scales[sample_idx]),
int(scaled_sample.shape[2] *
scales[sample_idx])),
dtype='float32')
if self._interp_methods[key] == 'n':
interpolation_method = _cv2INTER_NEAREST
elif self._interp_methods[key] == 'l':
interpolation_method = _cv2INTER_LINEAR
else:
interpolation_method = _cv2INTER_CUBIC
for layer_idx in range(scaled_sample.shape[0]):
scaled_sample[layer_idx] = _cv2resize(
kwargs['data_orig'][key][sample_idx][layer_idx],
(scaled_sample.shape[2],
scaled_sample.shape[1]),
interpolation=interpolation_method)
# If necessary, adjust the network input size.
if self._adjustment_multiple_of > 0:
image_height, image_width = scaled_sample.shape[1:3]
netinput_height = int(max(
self._min_input_size[0] +
_np.ceil(
float(image_height - self._min_input_size[0]) /
self._adjustment_multiple_of) *
self._adjustment_multiple_of,
self._min_input_size[0]))
netinput_width = int(max(
self._min_input_size[1] +
_np.ceil(
float(image_width - self._min_input_size[1]) /
self._adjustment_multiple_of) *
self._adjustment_multiple_of,
self._min_input_size[1]))
net.blobs[key].reshape(1,
scaled_sample.shape[0],
netinput_height,
netinput_width)
# Put the data in place.
net.blobs[key].data[sample_idx] = _pad(
scaled_sample,
net.blobs[key].data.shape[2:4],
val=padval)
    def get_parallel_blob_names(self):
"""Get the names of all blobs that must be provided for the dummy."""
return list(self._blobinfos.keys())
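# Usage sketch for the ResizingMonitor: the original data stored by a
# CyclingDataMonitor with `only_preload` in `data_orig` is scaled by 0.5
# plus a uniform random change of up to +/-0.1 (linear interpolation), and
# the network input is grown in multiples of 32 from its initial shape if
# required. All names and values are hypothetical; remember that the size
# adjustment requires a batch size of one.
def _example_resizing_monitor():  # pragma: no cover
    """Build a preload + resize monitor chain (sketch)."""
    preload_monitor = CyclingDataMonitor(
        X={'data': [_np.zeros((3, 64, 64), dtype='float32')]},
        only_preload=['data'])
    resizing_monitor = ResizingMonitor(
        blobinfos={'data': 0},
        base_scale=0.5,
        random_change_up_to=0.1,
        net_input_size_adjustment_multiple_of=32,
        interp_methods={'data': 'l'})
    return [preload_monitor, resizing_monitor]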
# pylint: disable=too-few-public-methods
class RotatingMirroringMonitor(ParallelMonitor, Monitor):
r"""
Rotate and/or horizontally mirror samples within blobs.
For every sample, the rotation and mirroring will be consistent
across the blobs.
:param blobinfos: dict(string, int).
A dictionary containing the blob names and the padding values that
will be applied.
:param max_rotation_degrees: float.
      The rotation will be sampled uniformly from the interval
      [-max_rotation_degrees, max_rotation_degrees) for each sample.
:param mirror_prob: float.
      The probability that horizontal mirroring occurs. It is sampled
      individually for every sample.
:param mirror_value_swaps: dict(string, dict(int, list(2-tuples))).
      Specifies, for every blob and layer, which values must be swapped
      if mirroring is applied. This is important when, e.g., mirroring
      annotation maps with left-right information. Every 2-tuple contains
      (original value, new value). The locations of the swaps are
      determined before any change is applied, so the order of the tuples
      does not matter.
:param mirror_layer_swaps: dict(string, list(2-tuples)).
      Specifies, for every blob, which layers must be swapped if mirroring
      is applied. Can be used together with mirror_value_swaps: in this
      case, the `mirror_value_swaps` are applied first, then the layers
      are swapped.
"""
# pylint: disable=too-many-arguments
def __init__(self,
blobinfos,
max_rotation_degrees,
mirror_prob=0.,
mirror_value_swaps=None,
mirror_layer_swaps=None):
"""See class documentation."""
self._blobinfos = blobinfos
self._rotation_degrees = max_rotation_degrees
self._mirror_prob = mirror_prob
self._batch_size = None
if mirror_value_swaps is None:
mirror_value_swaps = dict()
for key in list(mirror_value_swaps.keys()):
assert key in self._blobinfos, ("Blob not in handled: {}!"\
.format(key))
for layer_idx in list(mirror_value_swaps[key].keys()):
m_tochange = []
for swappair in mirror_value_swaps[key][layer_idx]:
assert len(swappair) == 2, (
"Swaps must be specified as (from_value, to_value): {}"\
.format(mirror_value_swaps[key][layer_idx]))
assert swappair[0] not in m_tochange, (
"Every value may change only to one new: {}."\
.format(mirror_value_swaps[key][layer_idx]))
m_tochange.append(swappair[0])
                    assert blobinfos[key] not in swappair, (
                        "A specified swap value is the fill value for this "
                        "blob: {}, {}, {}.".format(key,
                                                   blobinfos[key],
                                                   swappair))
if mirror_layer_swaps is None:
mirror_layer_swaps = dict()
for key in list(mirror_layer_swaps.keys()):
assert key in self._blobinfos, ("Blob not handled: {}!"\
.format(key))
idx_tochange = []
for swappair in mirror_layer_swaps[key]:
assert len(swappair) == 2, (
"Swaps must be specified as (from_value, to_value): {}"\
.format(swappair))
assert (swappair[0] not in idx_tochange and
swappair[1] not in idx_tochange), (
"Every value may only be swapped to or from one "
"position!")
idx_tochange.extend(swappair)
for key in list(self._blobinfos):
if key not in list(mirror_value_swaps.keys()):
mirror_value_swaps[key] = dict()
if key not in list(mirror_layer_swaps.keys()):
mirror_layer_swaps[key] = []
self._mirror_value_swaps = mirror_value_swaps
self._mirror_layer_swaps = mirror_layer_swaps
    def get_parallel_blob_names(self):
"""Get the names of all blobs that must be provided for the dummy."""
return list(self._blobinfos.keys())
def _initialize_train(self, kwargs):
self._initialize(kwargs)
def _initialize_test(self, kwargs):
self._initialize(kwargs)
def _initialize(self, kwargs):
# we make sure, now that the network is available, that
# all names in the provided data dict have a corresponding match
# in the network
if 'test' in kwargs['callback_signal']:
net = kwargs['testnet']
else:
net = kwargs['net']
for key in list(self._blobinfos.keys()):
assert key in list(net.blobs.keys()), (
'data key has no corresponding network blob {} {}'.format(
key, str(list(net.blobs.keys()))))
assert net.blobs[key].data.ndim == 4
for layer_idx in self._mirror_value_swaps[key].keys():
assert layer_idx < net.blobs[key].data.shape[1], ((
"The data for blob {} has not enough layers for swapping "
"{}!").format(key, layer_idx))
for swappair in self._mirror_layer_swaps[key]:
assert (swappair[0] < net.blobs[key].data.shape[1] and
swappair[1] < net.blobs[key].data.shape[1]), (
"Not enough layers in blob {} to swap {}!".format(
key, swappair))
def _pre_fit(self, kwargs):
if 'test' in kwargs['callback_signal']:
net = kwargs['testnet']
else:
net = kwargs['net']
self._batch_size = net.blobs[
list(self._blobinfos.keys())[0]].data.shape[0]
def _pre_test(self, kwargs):
self._pre_fit(kwargs)
def _pre_train_batch(self, kwargs):
self._pre_batch(kwargs['net'], kwargs)
def _pre_test_batch(self, kwargs):
self._pre_batch(kwargs['testnet'], kwargs)
# pylint: disable=C0111, W0613, R0912, too-many-locals
def _pre_batch(self, net, kwargs):
rotations = None
mirrorings = None
spline_interpolation_order = 0
prefilter = False
for key, padval in self._blobinfos.items():
if rotations is None:
rotations = []
if self._rotation_degrees > 0.:
rotations = _np.random.uniform(low=-self._rotation_degrees,
high=self._rotation_degrees,
size=self._batch_size)
else:
rotations = [0.] * self._batch_size
if mirrorings is None:
mirrorings = []
if self._mirror_prob > 0.:
mirrorings = _bernoulli.rvs(self._mirror_prob,
size=self._batch_size)
else:
mirrorings = [0] * self._batch_size
for sample_idx in range(self._batch_size):
if rotations[sample_idx] != 0.:
net.blobs[key].data[sample_idx] = _rotate(
net.blobs[key].data[sample_idx],
rotations[sample_idx],
(1, 2),
reshape=False,
order=spline_interpolation_order,
mode='constant',
cval=padval,
prefilter=prefilter)
if mirrorings[sample_idx] == 1.:
net.blobs[key].data[sample_idx] = \
net.blobs[key].data[sample_idx, :, :, ::-1]
for layer_idx in range(net.blobs[key].data.shape[1]):
if (layer_idx not in
self._mirror_value_swaps[key].keys()):
continue
swap_indices = dict()
swap_tuples = self._mirror_value_swaps[key][layer_idx]
# Swaps.
for swappair in swap_tuples:
swap_indices[swappair[0]] = (
net.blobs[key].data[sample_idx, layer_idx] ==\
swappair[0])
for swappair in swap_tuples:
net.blobs[key].data[sample_idx, layer_idx][
swap_indices[swappair[0]]] = swappair[1]
if len(self._mirror_layer_swaps[key]) > 0:
new_layer_order = list(
range(net.blobs[key].data.shape[1]))
for swappair in self._mirror_layer_swaps[key]:
new_layer_order[swappair[0]],\
new_layer_order[swappair[1]] = \
new_layer_order[swappair[1]],\
new_layer_order[swappair[0]]
net.blobs[key].data[...] = net.blobs[key].data[
:, tuple(new_layer_order)]
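# Usage sketch for the RotatingMirroringMonitor: samples in 'data' and
# 'annotations' (hypothetical blob names and fill values) are rotated by up
# to +/-10 degrees and mirrored with probability 0.5; when a sample is
# mirrored, the left/right class values 1 and 2 in layer 0 of the
# annotation maps are swapped.
def _example_rotating_mirroring_monitor():  # pragma: no cover
    """Construct a rotating/mirroring augmenter (sketch)."""
    return RotatingMirroringMonitor(
        blobinfos={'data': 0, 'annotations': 255},
        max_rotation_degrees=10.,
        mirror_prob=0.5,
        mirror_value_swaps={'annotations': {0: [(1, 2), (2, 1)]}})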
# Is covered in example.py, which is run in a subprocess and not detected by
# coverage.py.
# pylint: disable=R0903
class _LossIndicator(object): # pragma: no cover
r"""
A plugin indicator for the ``progressbar`` package.
This must be used in conjunction with the
:py:class:`barrista.monitoring.ProgressIndicator`. If available, it
outputs current loss, accuracy, test loss and test accuracy.
:param progress_indicator:
:py:class:`barrista.monitoring.ProgressIndicator`. The information
source to use.
"""
def __init__(self, progress_indicator):
self.progress_indicator = progress_indicator
def __call__(self, pbar, stats):
r"""Compatibility with new versions of ``progressbar2``."""
return self.update(pbar)
def update(self, pbar): # pylint: disable=W0613
"""The update method to implement by the ``progressbar`` interface."""
if self.progress_indicator.loss is not None:
ret_val = 'Loss: {0:.4f}'.format(self.progress_indicator.loss)
else:
ret_val = 'Loss: -----'
if self.progress_indicator.accuracy is not None:
ret_val += '|Accy: {0:.4f}'.format(
self.progress_indicator.accuracy)
if self.progress_indicator.test_loss is not None:
ret_val += '|TLoss: {0:.4f}'.format(
self.progress_indicator.test_loss)
if self.progress_indicator.test_accuracy is not None:
ret_val += '|TAccy: {0:.4f}'.format(
self.progress_indicator.test_accuracy)
return ret_val
# Is covered in example.py, which is run in a subprocess and not detected by
# coverage.py.
# pylint: disable=R0903
class _SpeedIndicator(object): # pragma: no cover
r"""
A plugin indicator for the ``progressbar`` package.
:param progress_indicator:
:py:class:`barrista.monitoring.ProgressIndicator`. The information
source to use.
"""
def __init__(self, progress_indicator):
self._progress_indicator = progress_indicator
self._active = True
self._last_ret = None
def __call__(self, pbar, stats):
r"""Compatibility with new versions of ``progressbar2``."""
return self.update(pbar)
def update(self, pbar): # pylint: disable=W0613
"""The update method to implement by the ``progressbar`` interface."""
if not self._active:
if self._last_ret is not None:
return self._last_ret
else:
return ''
# pylint: disable=protected-access
ret_val = '|%.2f smpl./s' % (self._progress_indicator._smplps)
self._last_ret = ret_val
return ret_val
class ProgressIndicator(Monitor): # pragma: no cover
r"""
Generates a progress bar with current information about the process.
The progress bar always displays completion percentage and ETA. If
available, it also displays loss, accuracy, test loss and test accuracy.
It makes use of the following keyword arguments (\* indicates required):
* ``iter``\*,
* ``max_iter``\*,
* ``train_loss``,
* ``test_loss``,
* ``train_accuracy``,
* ``test_accuracy``.
"""
def __init__(self):
"""See class documentation."""
self.loss = None
self.test_loss = None
self.accuracy = None
self.test_accuracy = None
from progressbar import ETA, Percentage, Bar, ProgressBar
self.widgets = [Bar(), Percentage(), ' ', ETA()]
self.pbarclass = ProgressBar
self.pbar = None
self._speed_indicator = None
self._last_update = _time.time()
self._smplps = 0
def _post_train_batch(self, kwargs):
if self.pbar is None:
self._speed_indicator = _SpeedIndicator(self)
if 'train_loss' in list(kwargs.keys()):
widgets = [_LossIndicator(self),
self._speed_indicator] + self.widgets
else:
widgets = [self._speed_indicator] + self.widgets
self.pbar = self.pbarclass(maxval=kwargs['max_iter'],
widgets=widgets)
self.pbar.start()
if 'train_loss' in list(kwargs.keys()):
self.loss = kwargs['train_loss']
if 'train_accuracy' in list(kwargs.keys()):
self.accuracy = kwargs['train_accuracy']
self._smplps = float(kwargs['batch_size']) / (
_time.time() - self._last_update)
self._last_update = _time.time()
self.pbar.update(value=kwargs['iter'])
def _post_test_batch(self, kwargs):
if self.pbar is None:
self._speed_indicator = _SpeedIndicator(self)
if 'test_loss' in list(kwargs.keys()):
widgets = [_LossIndicator(self),
self._speed_indicator] + self.widgets
else:
widgets = [self._speed_indicator] + self.widgets
self.pbar = self.pbarclass(maxval=kwargs['max_iter'],
widgets=widgets)
self.pbar.start()
if 'test_loss' in list(kwargs.keys()):
self.test_loss = kwargs['test_loss']
if 'test_accuracy' in list(kwargs.keys()):
self.test_accuracy = kwargs['test_accuracy']
self._smplps = float(kwargs['batch_size']) / (
_time.time() - self._last_update)
self._last_update = _time.time()
self.pbar.update(value=kwargs['iter'])
def _post_test(self, kwargs):
# Write the mean if possible.
if self.pbar is not None:
if 'test_loss' in list(kwargs.keys()):
self.test_loss = kwargs['test_loss']
if 'test_accuracy' in list(kwargs.keys()):
self.test_accuracy = kwargs['test_accuracy']
self.pbar.update(value=kwargs['iter'])
    def finalize(self, kwargs): # pylint: disable=W0613
"""Call ``progressbar.finish()``."""
if self._speed_indicator is not None:
# pylint: disable=protected-access
self._speed_indicator._active = False
if self.pbar is not None:
self.pbar.finish()
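# Usage sketch for the ProgressIndicator: it is parameter-free and reads
# everything it displays from the callback kwargs, so it can simply be added
# to the callbacks of a training or prediction run (the ``progressbar``
# package must be installed).
def _example_progress_indicator():  # pragma: no cover
    """Construct a progress bar monitor (sketch)."""
    progress = ProgressIndicator()
    # `finalize` is called at the end of the process and closes the bar.
    return progress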
def _sorted_ar_from_dict(inf, key): # pragma: no cover
iters = []
vals = []
for values in inf:
        if key in values:
iters.append(int(values['NumIters']))
vals.append(float(values[key]))
sortperm = _np.argsort(iters)
arr = _np.array([iters, vals]).T
return arr[sortperm, :]
def _draw_perfplot(phases, categories, ars, outfile): # pragma: no cover
"""Draw the performance plots."""
fig, axes = _plt.subplots(nrows=len(categories), sharex=True)
for category_idx, category in enumerate(categories):
ax = axes[category_idx] # pylint: disable=invalid-name
ax.set_title(category.title())
for phase in phases:
if phase + '_' + category not in ars.keys():
continue
ar = ars[phase + '_' + category] # pylint: disable=invalid-name
alpha = 0.7
color = 'b'
if phase == 'test':
alpha = 1.0
color = 'g'
ax.plot(ar[:, 0], ar[:, 1],
label=phase.title(), c=color, alpha=alpha)
if phase == 'test':
ax.scatter(ar[:, 0], ar[:, 1],
c=color, s=50)
ax.set_ylabel(category.title())
ax.grid()
ax.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
_plt.savefig(outfile, bbox_inches='tight')
_plt.close(fig)
class JSONLogger(Monitor): # pylint: disable=R0903
r"""
Logs available information to a JSON file.
The information is stored in a dictionary of lists. The lists contain
score information and the iteration at which it was obtained. The
currently logged scores are loss, accuracy, test loss and test accuracy.
The logger makes use of the following keyword arguments
(\* indicates required):
* ``iter``\*,
:param path: string.
The path to store the file in.
:param name: string.
The filename. Will be prefixed with 'barrista_' and '.json' will be
appended.
    :param logging: dict of lists.
      The two keys that are used in the dict are 'train' and 'test'. For
      each of them, a list of keys can be provided; those keys have to be
      available in the kwargs/cbparams structure. Usually the required
      data is provided by the ResultExtractor.
:param base_iter: int or None.
If provided, add this value to the number of iterations. This overrides
the number of iterations retrieved from a loaded JSON log to append to.
:param write_every: int or None.
Write the JSON log every `write_every` iterations. The log is always
written upon completion of the training. If it is None, the log is only
written on completion.
:param create_plot: bool.
If set to True, create a plot at `path` when the JSON log is written with
the name of the JSON file + `_plot.png`. Default: False.
"""
# pylint: disable=too-many-arguments
def __init__(self,
path,
name,
logging,
base_iter=None,
write_every=None,
create_plot=False):
"""See class documentation."""
import json
self.json_package = json
self.json_filename = str(_os.path.join(
path,
'barrista_' + name + '.json'))
if base_iter is None:
self.base_iter = 0
else:
self.base_iter = base_iter
if _os.path.exists(self.json_filename):
with open(self.json_filename, 'r') as infile:
self.dict = self.json_package.load(infile)
if base_iter is None:
for key in ['train', 'test']:
for infdict in self.dict[key]:
                        if 'NumIters' in infdict:
self.base_iter = max(self.base_iter,
infdict['NumIters'])
_LOGGER.info("Appending to JSON log at %s from iteration %d.",
self.json_filename,
self.base_iter)
else:
self.dict = {'train': [], 'test': [], 'barrista_produced': True}
assert write_every is None or write_every > 0
self._write_every = write_every
self._logging = logging
self._create_plot = create_plot
if self._create_plot:
assert _PLT_AVAILABLE, (
"Matplotlib must be available to use plotting!")
def _initialize_train(self, kwargs):
self._initialize(kwargs)
def _initialize_test(self, kwargs):
self._initialize(kwargs)
def _initialize(self, kwargs): # pylint: disable=unused-argument
for key in list(self._logging.keys()):
assert key in ['train', 'test'], (
'only train and test is supported by this logger')
def _post_test(self, kwargs):
self._post('test', kwargs)
def _post_train_batch(self, kwargs):
self._post('train', kwargs)
def _post(self, phase_name, kwargs): # pylint: disable=C0111
if phase_name not in self._logging: # pragma: no cover
return
if phase_name == 'train':
kwargs['iter'] += kwargs['batch_size']
if (self._write_every is not None and
kwargs['iter'] % self._write_every == 0):
with open(self.json_filename, 'w') as outf:
self.json_package.dump(self.dict, outf)
if self._create_plot: # pragma: no cover
categories = set()
arrs = dict()
for plot_phase_name in ['train', 'test']:
for key in self._logging[plot_phase_name]:
categories.add(key[len(plot_phase_name) + 1:])
arrs[key] = _sorted_ar_from_dict(self.dict[plot_phase_name],
key)
_draw_perfplot(['train', 'test'],
categories,
arrs,
self.json_filename + '_plot.png')
for key in self._logging[phase_name]:
if key in kwargs:
self.dict[phase_name].append({'NumIters':
kwargs['iter'] + self.base_iter,
key: kwargs[key]})
if phase_name == 'train':
kwargs['iter'] -= kwargs['batch_size']
    def finalize(self, kwargs): # pylint: disable=W0613
"""Write the json file."""
with open(self.json_filename, 'w') as outf:
self.json_package.dump(self.dict, outf)
if self._create_plot: # pragma: no cover
categories = set()
arrs = dict()
for phase_name in ['train', 'test']:
for key in self._logging[phase_name]:
categories.add(key[len(phase_name) + 1:])
arrs[key] = _sorted_ar_from_dict(self.dict[phase_name], key)
_draw_perfplot(['train', 'test'],
categories,
arrs,
self.json_filename + '_plot.png')
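# Usage sketch for the JSONLogger: the scores 'train_loss' and 'test_loss'
# are written to 'results/barrista_run0.json' every 100 iterations (path,
# name and interval are hypothetical). The logged keys must be available in
# the kwargs/cbparams structure, usually via a ResultExtractor.
def _example_json_logger():  # pragma: no cover
    """Construct a JSON score logger (sketch)."""
    return JSONLogger(path='results',
                      name='run0',
                      logging={'train': ['train_loss'],
                               'test': ['test_loss']},
                      write_every=100)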
class Checkpointer(Monitor): # pylint: disable=R0903
r"""
Writes the network blobs to disk at certain iteration intervals.
The logger makes use of the following keyword arguments
(\* indicates required):
* ``iter``\*,
* ``net``\*,
* ``batch_size``\*.
    :param name_prefix: string or None.
      The first part of the output filenames to generate. The suffix
      '_iter_', the current iteration, as well as '.caffemodel' are added.
If you are using a caffe version from later than Dec. 2015, caffe's
internal snapshot method is exposed to Python and also snapshots the
solver. If it's available, then this method will be used. However,
in that case, it's not possible to influence the storage location
from Python. Please use the solver parameter ``snapshot_prefix``
when constructing the solver instead (this parameter may be None
and is unused then).
:param iterations: int > 0.
Always if the current number of iterations is divisible by iterations,
the network blobs are written to disk. Hence, this value must be a
multiple of the batch size!
"""
def __init__(self,
name_prefix,
iterations,
base_iterations=0):
"""See class documentation."""
assert iterations > 0
_LOGGER.info('Setting up checkpointing with name prefix %s every ' +
'%d iterations.', name_prefix, iterations)
self.name_prefix = name_prefix
self.iterations = iterations
self.created_checkpoints = []
self._base_iterations = base_iterations
# pylint: disable=arguments-differ
def _post_train_batch(self, kwargs, finalize=False):
assert self.iterations % kwargs['batch_size'] == 0, (
'iterations not multiple of batch_size, {} vs {}'.format(
self.iterations, kwargs['batch_size']))
# Prevent double-saving.
if kwargs['iter'] in self.created_checkpoints:
return
if ((kwargs['iter'] + self._base_iterations +
kwargs['batch_size']) % self.iterations == 0 or
finalize):
self.created_checkpoints.append(kwargs['iter'])
# pylint: disable=protected-access
if not hasattr(kwargs['solver']._solver, 'snapshot'): # pragma: no cover
checkpoint_filename = (
self.name_prefix + '_iter_' +
str(int((kwargs['iter'] + self._base_iterations) /
kwargs['batch_size']) + 1) +
'.caffemodel')
_LOGGER.debug("Writing checkpoint to file '%s'.",
checkpoint_filename)
kwargs['net'].save(checkpoint_filename)
else:
# pylint: disable=protected-access
kwargs['solver']._solver.snapshot()
                iter_num = ((kwargs['iter'] + self._base_iterations) //
                            kwargs['batch_size'] + 1)
                caffe_checkpoint_filename = (self.name_prefix +
                                             '_iter_' + str(iter_num) +
                                             '.caffemodel')
                caffe_sstate_filename = (self.name_prefix +
                                         '_iter_' + str(iter_num) +
                                         '.solverstate')
_LOGGER.debug('Writing checkpoint to file "[solverprefix]%s" ' +
'and "[solverprefix]%s".',
caffe_checkpoint_filename,
caffe_sstate_filename)
                assert _os.path.exists(caffe_checkpoint_filename), (
                    "An error occurred checkpointing to {}. File not found. "
                    "Make sure the `base_iterations` and the `name_prefix` "
                    "are correct.").format(caffe_checkpoint_filename)
                assert _os.path.exists(caffe_sstate_filename), (
                    "An error occurred checkpointing to {}. File not found. "
                    "Make sure the `base_iterations` and the `name_prefix` "
                    "are correct.").format(caffe_sstate_filename)
    def finalize(self, kwargs):
"""Write a final checkpoint."""
# Account for the counting on iteration increase for the last batch.
kwargs['iter'] -= kwargs['batch_size']
self._post_train_batch(kwargs, finalize=True)
kwargs['iter'] += kwargs['batch_size']
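# Usage sketch for the Checkpointer: with a hypothetical batch size of 32,
# checkpointing every 320 iterations satisfies the constraint that the
# interval is a multiple of the batch size; the files are written with an
# '_iter_<n>.caffemodel' suffix.
def _example_checkpointer():  # pragma: no cover
    """Construct a checkpointer writing every 320 iterations (sketch)."""
    return Checkpointer(name_prefix='model', iterations=320)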
class GradientMonitor(Monitor):
"""
Tools to keep an eye on the gradient.
Create plots of the gradient. Creates histograms of the gradient for all
``selected_parameters`` and creates an overview plot with the maximum
absolute gradient per layer. If ``create_videos`` is set and ffmpeg is
available, automatically creates videos.
:param write_every: int.
Write every x iterations. Since matplotlib takes some time to run, choose
with care.
:param output_folder: string.
Where to store the outputs.
:param selected_parameters: dict(string, list(int)) or None.
Which parameters to include in the plots. The string is the name of the
layer, the list of integers contains the parts to include, e.g., for a
convolution layer, specify the name of the layer as key and 0 for
the parameters of the convolution weights, 1 for the biases per channel.
The order and meaning of parameter blobs is determined by caffe. If
None, then all parameters are plotted. Default: None.
:param relative: Bool.
If set to True, will give the weights relative to the max absolute weight
in the target parameter blob. Default: False.
:param iteroffset: int.
An iteration offset if training is resumed to not overwrite existing
output. Default: 0.
:param create_videos: Bool.
If set to True, try to create a video using ffmpeg. Default: True.
:param video_frame_rate: int.
The video frame rate.
"""
def __init__(self, # pylint: disable=too-many-arguments
write_every,
output_folder,
selected_parameters=None,
relative=False,
iteroffset=0,
create_videos=True,
video_frame_rate=1):
assert write_every > 0
self._write_every = write_every
self._output_folder = output_folder
self._selected_parameters = selected_parameters
self._relative = relative
self._n_parameters = None
self._iteroffset = iteroffset
self._create_videos = create_videos
self._video_frame_rate = video_frame_rate
def _initialize_train(self, kwargs): # pragma: no cover
assert _PLT_AVAILABLE, (
"Matplotlib must be available to use the GradientMonitor!")
assert self._write_every % kwargs['batch_size'] == 0, (
"`write_every` must be a multiple of the batch size!")
self._n_parameters = 0
if self._selected_parameters is not None:
for name in self._selected_parameters.keys():
assert name in kwargs['net'].params.keys()
for p_idx in self._selected_parameters[name]:
assert p_idx >= 0
assert len(kwargs['net'].params[name]) > p_idx
self._n_parameters += 1
else:
self._selected_parameters = _collections.OrderedDict()
for name in kwargs['net'].params.keys():
self._selected_parameters[name] = range(len(
kwargs['net'].params[name]))
self._n_parameters += len(kwargs['net'].params[name])
# pylint: disable=too-many-locals
def _post_train_batch(self, kwargs): # pragma: no cover
if kwargs['iter'] % self._write_every == 0:
net = kwargs['net']
maxabsupdates = {}
maxabsupdates_flat = []
# Create histograms.
fig, axes = _plt.subplots(nrows=1,
ncols=self._n_parameters,
figsize=(self._n_parameters * 3, 3))
ax_idx = 0
xfmt = _tkr.FormatStrFormatter('%.1e')
for lname in self._selected_parameters.keys():
maxabsupdates[lname] = []
for p_idx in self._selected_parameters[lname]:
if self._relative:
lgradient = (net.params[lname][p_idx].diff /
net.params[lname][p_idx].data.max())
else:
lgradient = net.params[lname][p_idx].diff
maxabsupdates[lname].append(_np.max(_np.abs(lgradient)))
maxabsupdates_flat.append(_np.max(_np.abs(lgradient)))
axes[ax_idx].set_title(lname + ', p%d' % (p_idx))
axes[ax_idx].hist(list(lgradient.flat),
25,
normed=1,
alpha=0.5)
axes[ax_idx].set_xticks(_np.linspace(-maxabsupdates_flat[-1],
maxabsupdates_flat[-1],
num=3))
axes[ax_idx].yaxis.set_visible(False)
axes[ax_idx].xaxis.set_major_formatter(xfmt)
ax_idx += 1
_plt.tight_layout(rect=[0, 0.03, 1, 0.95])
_plt.suptitle("Gradient histograms for iteration %d" % (
kwargs['iter'] + self._iteroffset))
if self._relative:
ghname = self._output_folder + 'gradient_hists_rel_%d.png' % (
(self._iteroffset + kwargs['iter']) /
self._write_every)
else:
ghname = self._output_folder + 'gradient_hists_%d.png' % (
(self._iteroffset + kwargs['iter']) /
self._write_every)
_plt.savefig(ghname)
_plt.close(fig)
# Create the magnitude overview plot.
fig = _plt.figure(figsize=(self._n_parameters * 1, 1.5))
_plt.title("Maximum absolute gradient per layer (iteration %d)" % (
kwargs['iter'] + self._iteroffset))
ax = _plt.gca() # pylint: disable=invalid-name
# pylint: disable=invalid-name
im = ax.imshow(_np.atleast_2d(_np.array(maxabsupdates_flat)),
interpolation='none')
ax.yaxis.set_visible(False)
divider = _make_axes_locatable(ax)
cax = divider.append_axes("right", size="10%", pad=0.05)
_plt.colorbar(im, cax=cax, ticks=_np.linspace(_np.min(maxabsupdates_flat),
_np.max(maxabsupdates_flat),
5))
_plt.tight_layout(rect=[0, 0.03, 1, 0.95])
if self._relative:
gmname = self._output_folder + 'gradient_magnitude_rel_%d.png' % (
(self._iteroffset + kwargs['iter']) /
self._write_every)
else:
gmname = self._output_folder + 'gradient_magnitude_%d.png' % (
(self._iteroffset + kwargs['iter']) /
self._write_every)
_plt.savefig(gmname)
_plt.close(fig)
    def finalize(self, kwargs): # pragma: no cover
        """Create the videos if ``create_videos`` is set."""
if self._create_videos:
try:
if not _os.path.exists(_os.path.join(self._output_folder,
'videos')):
_os.mkdir(_os.path.join(self._output_folder, 'videos'))
if self._relative:
rel_add = '_rel'
else:
rel_add = ''
with open(_os.devnull, 'w') as quiet:
_subprocess.check_call([
'ffmpeg',
'-start_number', str(0),
'-r', str(self._video_frame_rate),
'-i', _os.path.join(self._output_folder,
'gradient_hists' + rel_add + '_%d.png'),
_os.path.join(self._output_folder,
'videos',
'gradient_hists' + rel_add + '.mp4')
], stdout=quiet, stderr=quiet)
_subprocess.check_call([
'ffmpeg',
'-start_number', str(0),
'-r', str(self._video_frame_rate),
'-i', _os.path.join(self._output_folder,
'gradient_magnitude' + rel_add + '_%d.png'),
_os.path.join(self._output_folder,
'videos',
'gradient_magnitude' + rel_add + '.mp4')
], stdout=quiet, stderr=quiet)
except Exception as ex: # pylint: disable=broad-except
_LOGGER.error(
"Could not create videos! Error: %s. Is " +
"ffmpeg available on the command line?",
str(ex))
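# Usage sketch for the GradientMonitor: gradient histograms for the weights
# (part 0) and biases (part 1) of a hypothetical layer 'conv1' are written
# every 100 iterations, relative to the maximum weight. `write_every` must
# be a multiple of the batch size, and `output_folder` is used as a plain
# string prefix, so the trailing slash matters.
def _example_gradient_monitor():  # pragma: no cover
    """Construct a gradient monitor (sketch)."""
    return GradientMonitor(write_every=100,
                           output_folder='gradient_plots/',
                           selected_parameters={'conv1': [0, 1]},
                           relative=True)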
class ActivationMonitor(Monitor):
"""
Tools to keep an eye on the net activations.
Create plots of the net activations. If ``create_videos`` is set and
ffmpeg is available, automatically creates videos.
:param write_every: int.
Write every x iterations. Since matplotlib takes some time to run, choose
with care.
:param output_folder: string.
Where to store the outputs.
:param selected_blobs: list(string) or None.
Which blobs to include in the plots. If
None, then all parameters are plotted. Default: None.
:param iteroffset: int.
An iteration offset if training is resumed to not overwrite existing
output. Default: 0.
:param sample: dict(string, NDarray(3D)).
A sample to use that will be forward propagated to obtain the activations.
Must contain one for every input layer of the network. Each sample is not
preprocessed and must fit the input. If None, use the existing values
from the blobs.
:param create_videos: Bool.
If set to True, try to create a video using ffmpeg. Default: True.
:param video_frame_rate: int.
The video frame rate.
"""
# pylint: disable=too-many-arguments
def __init__(self, # pragma: no cover
write_every,
output_folder,
selected_blobs=None,
iteroffset=0,
sample=None,
create_videos=True,
video_frame_rate=1):
assert write_every > 0
self._write_every = write_every
self._output_folder = output_folder
self._selected_blobs = selected_blobs
self._n_parameters = None
self._iteroffset = iteroffset
self._create_videos = create_videos
self._video_frame_rate = video_frame_rate
self._sample = sample
def _initialize_train(self, kwargs): # pragma: no cover
assert _PLT_AVAILABLE, (
"Matplotlib must be available to use the ActivationMonitor!")
assert self._write_every % kwargs['batch_size'] == 0, (
"`write_every` must be a multiple of the batch size!")
self._n_parameters = 0
if self._selected_blobs is not None:
for name in self._selected_blobs:
assert name in kwargs['net'].blobs.keys(), (
"The activation monitor should monitor {}, which is not "
"part of the net!").format(name)
self._n_parameters += 1
else:
self._selected_blobs = []
for name in kwargs['net'].blobs.keys():
bshape = kwargs['net'].blobs[name].data.shape
if len(bshape) == 4:
self._selected_blobs.append(name)
self._n_parameters += 1
if self._sample is not None:
for inp_name in self._sample.keys():
assert (kwargs['net'].blobs[inp_name].data.shape[1:] ==
self._sample[inp_name].shape), (
"All provided inputs as `sample` must have the shape "
"of an input blob, starting from its sample "
"dimension. Does not match for %s: %s vs. %s." % (
inp_name,
str(kwargs['net'].blobs[inp_name].data.shape[1:]),
str(self._sample[inp_name].shape)))
# pylint: disable=too-many-locals
def _post_train_batch(self, kwargs): # pragma: no cover
if kwargs['iter'] % self._write_every == 0:
net = kwargs['net']
if self._sample is not None:
for bname in self._sample.keys():
net.blobs[bname].data[-1, ...] = self._sample[bname]
net.forward()
for bname in self._selected_blobs:
blob = net.blobs[bname].data
nchannels = blob.shape[1]
gridlen = int(_np.ceil(_np.sqrt(nchannels)))
fig, axes = _plt.subplots(nrows=gridlen,
ncols=gridlen,
squeeze=False)
bmin = blob[-1].min()
bmax = blob[-1].max()
for c_idx in range(nchannels):
ax = axes.flat[c_idx] # pylint: disable=invalid-name
im = ax.imshow(blob[-1, c_idx], # pylint: disable=invalid-name
vmin=bmin,
vmax=bmax,
cmap='Greys_r',
interpolation='none')
ax.set_title('C%d' % (c_idx))
ax.yaxis.set_visible(False)
ax.xaxis.set_visible(False)
# pylint: disable=undefined-loop-variable
for blank_idx in range(c_idx + 1, gridlen * gridlen):
ax = axes.flat[blank_idx] # pylint: disable=invalid-name
ax.axis('off')
_plt.tight_layout(rect=[0, 0.03, 1, 0.95])
_plt.suptitle("Activations in blob %s (iteration %d)" % (
bname, self._iteroffset + kwargs['iter']))
cbax, cbkw = _colorbar.make_axes([ax for ax in axes.flat])
fig.colorbar(im, cax=cbax, **cbkw)
_plt.savefig(self._output_folder +
'activations_%s_%d.png' % (
bname,
(self._iteroffset + kwargs['iter']) /
self._write_every))
_plt.close(fig)
    def finalize(self, kwargs): # pragma: no cover
        """Create the videos if ``create_videos`` is set."""
if self._create_videos:
try:
if not _os.path.exists(_os.path.join(self._output_folder,
'videos')):
_os.mkdir(_os.path.join(self._output_folder, 'videos'))
for bname in self._selected_blobs:
with open(_os.devnull, 'w') as quiet:
_subprocess.check_call([
'ffmpeg',
'-start_number', str(0),
'-r', str(self._video_frame_rate),
'-i', _os.path.join(self._output_folder,
'activations_' + bname + '_%d.png'),
_os.path.join(self._output_folder,
'videos',
'activations_' + bname + '.mp4')
], stdout=quiet, stderr=quiet)
except Exception as ex: # pylint: disable=broad-except
_LOGGER.error(
"Could not create videos! Error: %s. Is " +
"ffmpeg available on the command line?",
str(ex))
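# Usage sketch for the ActivationMonitor: the activations of a hypothetical
# blob 'conv1' are plotted every 100 iterations for a fixed input sample
# that must match the shape of the input blob 'data' without its sample
# dimension (all names and shapes are hypothetical).
def _example_activation_monitor():  # pragma: no cover
    """Construct an activation monitor (sketch)."""
    return ActivationMonitor(write_every=100,
                             output_folder='activation_plots/',
                             selected_blobs=['conv1'],
                             sample={'data': _np.zeros((3, 32, 32),
                                                       dtype='float32')})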
class FilterMonitor(Monitor):
"""
Tools to keep an eye on the filters.
Create plots of the network filters. Creates filter plots for all
``selected_parameters``. If ``create_videos`` is set and ffmpeg is
available, automatically creates videos.
:param write_every: int.
Write every x iterations. Since matplotlib takes some time to run, choose
with care.
:param output_folder: string.
Where to store the outputs.
:param selected_parameters: dict(string, list(int)) or None.
Which parameters to include in the plots. The string is the name of the
layer, the list of integers contains the parts to include, e.g., for a
convolution layer, specify the name of the layer as key and 0 for
the parameters of the convolution weights, 1 for the biases per channel.
The order and meaning of parameter blobs is determined by caffe. If
None, then all parameters are plotted. **Only 4D blobs can be plotted!**
Default: None.
:param iteroffset: int.
An iteration offset if training is resumed to not overwrite existing
output. Default: 0.
:param create_videos: Bool.
If set to True, try to create a video using ffmpeg. Default: True.
:param video_frame_rate: int.
The video frame rate.
"""
# pylint: disable=too-many-arguments
def __init__(self, # pragma: no cover
write_every,
output_folder,
selected_parameters=None,
iteroffset=0,
create_videos=True,
video_frame_rate=1):
assert write_every > 0
self._write_every = write_every
self._output_folder = output_folder
self._selected_parameters = selected_parameters
self._n_parameters = None
self._iteroffset = iteroffset
self._create_videos = create_videos
self._video_frame_rate = video_frame_rate
def _initialize_train(self, kwargs): # pragma: no cover
assert _PLT_AVAILABLE, (
"Matplotlib must be available to use the FilterMonitor!")
assert self._write_every % kwargs['batch_size'] == 0, (
"`write_every` must be a multiple of the batch size!")
self._n_parameters = 0
if self._selected_parameters is not None:
for name in self._selected_parameters.keys():
assert name in kwargs['net'].params.keys()
for p_idx in self._selected_parameters[name]:
assert p_idx >= 0
assert len(kwargs['net'].params[name][p_idx].data.shape) == 4
self._n_parameters += 1
else:
self._selected_parameters = _collections.OrderedDict()
for name in kwargs['net'].params.keys():
self._selected_parameters[name] = []
for pindex in range(len(kwargs['net'].params[name])):
if len(kwargs['net'].params[name][pindex].data.shape) == 4:
self._selected_parameters[name].append(pindex)
self._n_parameters += 1
def _post_train_batch(self, kwargs): # pragma: no cover
if kwargs['iter'] % self._write_every == 0:
net = kwargs['net']
for pname in self._selected_parameters.keys():
for pindex in self._selected_parameters[pname]:
fig = _plt.figure()
param = net.params[pname][pindex].data
border = 2
collected_weights = _np.zeros((param.shape[0] *
(param.shape[2] + border) +
border,
param.shape[1] *
(param.shape[3] + border) +
border), dtype='float32')
pmin = param.min()
pmax = param.max()
# Build up the plot manually because matplotlib is too slow.
for filter_idx in range(param.shape[0]):
for layer_idx in range(param.shape[1]):
collected_weights[border + filter_idx * (param.shape[2] + border):
border + filter_idx * (param.shape[2] + border) +
param.shape[2],
border + layer_idx * (param.shape[3] + border):
border + layer_idx * (param.shape[3] + border) +
param.shape[3]] = (
(param[filter_idx, layer_idx] - pmin)
/ (pmax - pmin))
_plt.imshow(collected_weights,
cmap='Greys_r',
interpolation='none')
ax = _plt.gca() # pylint: disable=invalid-name
ax.yaxis.set_visible(False)
ax.xaxis.set_visible(False)
ax.set_title((
"Values of layer %s, param %d\n" +
"(iteration %d, min %.1e, max %.1e)") % (
pname, pindex, self._iteroffset + kwargs['iter'], pmin, pmax))
_plt.savefig(self._output_folder +
'parameters_%s_%d_%d.png' % (
pname,
pindex,
(self._iteroffset + kwargs['iter']) /
self._write_every))
_plt.close(fig)
    def finalize(self, kwargs): # pragma: no cover
        """Create the videos if ``create_videos`` is set."""
if self._create_videos:
try:
if not _os.path.exists(_os.path.join(self._output_folder,
'videos')):
_os.mkdir(_os.path.join(self._output_folder, 'videos'))
for pname in self._selected_parameters.keys():
for pindex in self._selected_parameters[pname]:
with open(_os.devnull, 'w') as quiet:
_subprocess.check_call([
'ffmpeg',
'-start_number', str(0),
'-r', str(self._video_frame_rate),
'-i', _os.path.join(self._output_folder,
'parameters_' +
pname + '_' +
str(pindex) + '_' +
'%d.png'),
_os.path.join(self._output_folder,
'videos',
'parameters_' +
pname + '_' +
str(pindex) + '.mp4')
], stdout=quiet, stderr=quiet)
except Exception as ex: # pylint: disable=broad-except
_LOGGER.error(
"Could not create videos! Error: %s. Is " +
"ffmpeg available on the command line?",
str(ex))
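# Usage sketch for the FilterMonitor: the 4D convolution weights (parameter
# part 0) of a hypothetical layer 'conv1' are plotted every 100 iterations.
# Bias blobs are not 4D and therefore can not be plotted by this monitor.
def _example_filter_monitor():  # pragma: no cover
    """Construct a filter visualization monitor (sketch)."""
    return FilterMonitor(write_every=100,
                         output_folder='filter_plots/',
                         selected_parameters={'conv1': [0]})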