# Source code for strawberryfields.engine
# Copyright 2019-2022 Xanadu Quantum Technologies Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module implements :class:`BaseEngine` and its subclasses that are responsible for
communicating quantum programs represented by :class:`.Program` objects
to a backend that could be e.g., a simulator or a hardware quantum processor.
One can think of each BaseEngine instance as a separate quantum computation.
"""
import abc
import collections.abc
import time
import warnings
from typing import Any, Dict, Optional
import numpy as np
import xcc
from strawberryfields.device import Device
from strawberryfields.result import Result
from strawberryfields.io import to_blackbird
from strawberryfields.logger import create_logger
from strawberryfields.program import Program
from strawberryfields.tdm import TDMProgram, reshape_samples
from .backends import load_backend, BosonicBackend
from .backends.base import BaseBackend, NotApplicableError
from ._version import __version__
# Names documented by automodapi for this module. Classes that should appear under the
# top-level ``strawberryfields`` namespace are deliberately left out of this list.
__all__ = ["BaseEngine", "LocalEngine", "BosonicEngine"]
class FailedJobError(Exception):
    """Raised when a remote job did not complete successfully on the server side.

    :meth:`RemoteEngine.run` raises this error whenever a finished job ends in any
    status other than ``"complete"``.
    """
class BaseEngine(abc.ABC):
    r"""Abstract base class for quantum program executor engines.

    Args:
        backend (str, BaseBackend): short name of the backend, or a pre-constructed
            backend instance
        backend_options (None, Dict[str, Any]): keyword arguments for the backend

    Raises:
        TypeError: if ``backend`` is neither a string nor a ``BaseBackend`` instance
    """

    def __init__(self, backend, *, backend_options=None):
        if backend_options is None:
            backend_options = {}

        #: Dict[str, Any]: keyword arguments for the backend
        self.backend_options = backend_options.copy()  # dict is mutable
        #: List[Program]: list of Programs that have been run
        self.run_progs = []
        #: List[List[Number]]: latest measurement results, shape == (modes, shots)
        self.samples = None
        #: Dict[Any, List]: the measurement results as a dictionary with measured modes as keys
        self.samples_dict = None

        if isinstance(backend, str):
            #: str: short name of the backend
            self.backend_name = backend
            self.backend = load_backend(backend)
        elif isinstance(backend, BaseBackend):
            self.backend_name = backend.short_name
            self.backend = backend
        else:
            raise TypeError("backend must be a string or a BaseBackend instance.")

    @abc.abstractmethod
    def __str__(self):
        """String representation."""

    @abc.abstractmethod
    def reset(self, backend_options):
        r"""Re-initialize the quantum computation.

        Resets the state of the engine and the quantum circuit represented by the backend.

        * The original number of modes is restored.
        * All modes are reset to the vacuum state.
        * All registers of previously run Programs are cleared of measured values.
        * List of previously run Programs is cleared.
        * The stored measurement results (``samples`` and ``samples_dict``) are discarded.

        Note that the reset does nothing to any Program objects in existence, beyond
        erasing the measured values.

        **Example:**

        .. code-block:: python

            # create a program
            prog = sf.Program(3)

            with prog.context as q:
                ops.Sgate(0.543) | q[1]
                ops.BSgate(0.6, 0.1) | (q[2], q[0])

            # create an engine
            eng = sf.Engine("gaussian")

        Running the engine with the above program will
        modify the state of the statevector simulator:

        >>> eng.run(prog)
        >>> eng.backend.is_vacuum()
        False

        Resetting the engine will return the backend simulator
        to the vacuum state:

        >>> eng.reset()
        >>> eng.backend.is_vacuum()
        True

        .. note:: The ``reset()`` method only applies to statevector backends.

        Args:
            backend_options (Dict[str, Any]): keyword arguments for the backend,
                updating (overriding) old values
        """
        self.backend_options.update(backend_options)

        for p in self.run_progs:
            p._clear_regrefs()
        self.run_progs.clear()

        # drop both views of the previous measurement results so that neither goes stale
        self.samples = None
        self.samples_dict = None

    def print_applied(self, print_fn=print):
        """Print all the Programs run since the backend was initialized.

        This will be blank until the first call to :meth:`~.LocalEngine.run`. The output may
        differ compared to :meth:`.Program.print`, due to backend-specific
        device compilation performed by :meth:`~.LocalEngine.run`.

        **Example:**

        .. code-block:: python

            # create a program
            prog = sf.Program(2)

            with prog.context as q:
                ops.S2gate(0.543) | (q[0], q[1])

            # create an engine
            eng = sf.Engine("gaussian")

        Initially, the engine will have applied no operations:

        >>> eng.print_applied()
        None

        After running the engine, we can now see the quantum
        operations that were applied:

        >>> eng.run(prog)
        >>> eng.print_applied()
        Run 0:
        BSgate(0.7854, 0) | (q[0], q[1])
        Sgate(0.543, 0) | (q[0])
        Sgate(0.543, 0).H | (q[1])
        BSgate(0.7854, 0).H | (q[0], q[1])

        Note that the :class:`~.S2gate` has been decomposed into
        single-mode squeezers and beamsplitters, which are supported
        by the ``'gaussian'`` backend.

        Subsequent program runs can also be viewed:

        .. code-block:: python

            # a second program
            prog2 = sf.Program(2)
            with prog2.context as q:
                ops.S2gate(0.543) | (q[0], q[1])

        >>> eng.run(prog2)
        >>> eng.print_applied()
        Run 0:
        BSgate(0.7854, 0) | (q[0], q[1])
        Sgate(0.543, 0) | (q[0])
        Sgate(0.543, 0).H | (q[1])
        BSgate(0.7854, 0).H | (q[0], q[1])
        Run 1:
        Dgate(0.06, 0) | (q[0])

        Args:
            print_fn (function): optional custom function to use for string printing.
        """
        for k, r in enumerate(self.run_progs):
            print_fn(f"Run {k}:")
            r.print(print_fn)

    @abc.abstractmethod
    def _init_backend(self, init_num_subsystems):
        """Initialize the backend.

        Args:
            init_num_subsystems (int): number of subsystems the backend is initialized to
        """

    @abc.abstractmethod
    def _run_program(self, prog, **kwargs):
        """Execute a single program on the backend.

        This method should not be called directly.

        Args:
            prog (Program): program to run

        Returns:
            tuple(list[Command], list[array, tensor], dict[int, list]): tuple containing the
            commands that were applied to the backend, the samples returned from the backend, and
            the samples as a dictionary with the measured modes as keys
        """
        # `_run_program` is called in `BaseEngine._run` but should never be called
        # from a `BaseEngine` object, in which case an error should be raised.
        raise NotImplementedError("'run_program' method must be implemented on child class")

    def _run(self, program, *, args, compile_options, **kwargs):
        """Execute the given programs by sending them to the backend.

        If multiple Programs are given they will be executed sequentially as
        parts of a single computation.
        For each :class:`.Program` instance given as input, the following happens:

        * The Program instance is compiled for the target backend.
        * The compiled program is executed on the backend.
        * The measurement results of each subsystem (if any) are stored in the :class:`.RegRef`
          instances of the corresponding Program, as well as in :attr:`~BaseEngine.samples`.
        * The compiled program is appended to :attr:`~BaseEngine.run_progs`.

        Finally, the result of the computation is returned.

        Args:
            program (Program, Sequence[Program]): quantum programs to run
            args (Dict[str, Any]): values for the free parameters in the program(s) (if any)
            compile_options (Dict[str, Any]): keyword arguments for :meth:`.Program.compile`

        Keyword Args:
            modes (None, Sequence[int]): modes to include in the returned state; ``None``
                (the default when omitted) selects all modes, an empty sequence selects none

        The remaining ``kwargs`` keyword arguments are passed to the backend API calls via
        :meth:`Operation.apply`.

        Returns:
            Result: results of the computation

        Raises:
            RuntimeError: if a program cannot follow the previously run program
        """
        # pop modes so that it's not passed on to the backend API calls via 'op.apply';
        # a missing key is treated like the documented default ``None`` (all modes)
        modes = kwargs.pop("modes", None)

        if not isinstance(program, collections.abc.Sequence):
            program = [program]

        kwargs.setdefault("shots", 1)
        # NOTE: by putting ``shots`` into keyword arguments, it allows for the
        # signatures of methods in Operations to remain cleaner, since only
        # Measurements need to know about shots

        prev = self.run_progs[-1] if self.run_progs else None  # previous program segment
        for p in program:
            if self.backend.compiler:
                default_compiler = getattr(
                    compile_options.get("device"), "default_compiler", self.backend.compiler
                )
                compile_options.setdefault("compiler", default_compiler)

            # compile the program for the correct backend if a compiler or a device exists
            if "compiler" in compile_options or "device" in compile_options:
                p = p.compile(**compile_options)

            received_rolled = False  # whether a TDMProgram had a rolled circuit
            if isinstance(p, TDMProgram):
                tdm_options = self.get_tdm_options(p, **kwargs)
                # pop modes so that it's not passed on to the backend API calls via 'op.apply'
                modes = tdm_options.pop("modes")
                received_rolled = tdm_options.pop("received_rolled")
                kwargs.update(tdm_options)

            if prev is None:
                # initialize the backend
                self._init_backend(p.init_num_subsystems)
            else:
                # there was a previous program segment
                if not p.can_follow(prev):
                    raise RuntimeError(
                        f"Register mismatch: program {len(self.run_progs)}, '{p.name}'."
                    )

                # Copy the latest measured values in the RegRefs of p.
                # We cannot copy from prev directly because it could be used in more than one
                # engine.
                for k, v in enumerate(self.samples):
                    p.reg_refs[k].val = v

            # bind free parameters to their values
            p.bind_params(args)
            p.lock()

            _, self.samples, self.samples_dict = self._run_program(p, **kwargs)
            self.run_progs.append(p)

            # hand the program back in the (rolled) state in which it was received
            if isinstance(p, TDMProgram) and received_rolled:
                p.roll()

            prev = p

        ancillae_samples = None
        if isinstance(self.backend, BosonicBackend):
            ancillae_samples = self.backend.ancillae_samples_dict.copy()

        samples = {"output": [self.samples]}
        result = Result(samples, samples_dict=self.samples_dict, ancillae_samples=ancillae_samples)

        # if `modes`` is empty (i.e. `modes==[]`) return no state, else return state with
        # selected modes (all if `modes==None`)
        if modes is None or modes:
            # state object requested
            # session and feed_dict are needed by TF backend both during simulation (if program
            # contains measurements) and state object construction.
            result.state = self.backend.state(modes=modes, **kwargs)

        return result

    @staticmethod
    def get_tdm_options(program, **kwargs):
        """Retrieves the shots and modes for the TDM program and (space)unrolls it if necessary.

        Args:
            program (TDMProgram): the TDM program about to be executed; it is unrolled
                in place if it is not already in the requested unrolled form

        Keyword Args:
            shots (int): number of shots; falls back to ``program.run_options["shots"]``
                and then to ``1``
            space_unroll (bool): whether the program should be space-unrolled
            crop (bool): whether the returned modes should skip the first
                ``program.get_crop_value()`` entries (space-unrolled circuits only)

        Returns:
            dict: the keys ``"modes"`` (``None`` or a ``range`` of modes), ``"shots"``
            and ``"received_rolled"`` (``True`` if the program was not unrolled on entry)
        """
        # priority order for the shots value should be kwargs > run_options > 1
        shots = kwargs.get("shots", program.run_options.get("shots", 1))

        # setting shots >1 for a TDM program corresponds to further unrolling the program,
        # meaning that we still only need to execute it once
        tdm_options = {
            "modes": None,
            "shots": 1 if shots else None,
            # must be recorded *before* any unrolling below: the caller uses this flag to
            # roll the program back up only when it arrived in a rolled state
            "received_rolled": not program.is_unrolled,
        }

        # if a tdm program is input in a rolled state, then unroll it
        if kwargs.get("space_unroll", False):
            if program.space_unrolled_circuit is None:
                program.space_unroll(shots=shots or 1)
        else:
            # if `space_unroll != True`, only unroll it iff it isn't already unrolled
            if not program.is_unrolled:
                program.unroll(shots=shots or 1)

        if program.space_unrolled_circuit is not None:
            if kwargs.get("crop", False):
                tdm_options["modes"] = range(program.get_crop_value(), program.timebins)
            else:
                tdm_options["modes"] = range(0, program.timebins)
        elif kwargs.get("crop", False):
            warnings.warn(
                "Cropping the state is only possible for space unrolled circuits, which "
                "can be done by executing `Program.space_unroll()` prior to running the "
                "circuit. Note, this might cause a significant performance hit if sampling!"
            )

        return tdm_options
class LocalEngine(BaseEngine):
    """Local quantum program executor engine.

    The Strawberry Fields engine is used to execute :class:`.Program` instances
    on the chosen local backend, and makes the results available via :class:`.Result`.

    **Example:**

    The following example creates a Strawberry Fields
    quantum :class:`~.Program` and runs it using an engine.

    .. code-block:: python

        # create a program
        prog = sf.Program(2)

        with prog.context as q:
            ops.S2gate(0.543) | (q[0], q[1])

    We initialize the engine with the name of the local backend,
    and can pass optional backend options.

    >>> eng = sf.Engine("fock", backend_options={"cutoff_dim": 5})

    The :meth:`~.LocalEngine.run` method is used to execute quantum
    programs on the attached backend, and returns a :class:`.Result`
    object containing the results of the execution.

    >>> results = eng.run(prog)

    Args:
        backend (str, BaseBackend): short name of the backend, or a pre-constructed backend instance
        backend_options (None, Dict[str, Any]): keyword arguments to be passed to the backend
    """

    def __new__(cls, backend, *, backend_options=None):
        """Create the engine, dispatching to :class:`BosonicEngine` for the ``"bosonic"`` backend."""
        if backend == "bosonic":
            bos_eng = super().__new__(BosonicEngine)
            # Python only calls ``__init__`` by itself when ``__new__`` returns an instance
            # of ``cls``. Initialize explicitly in the remaining cases only, so that the
            # backend is never loaded twice.
            if not isinstance(bos_eng, cls):
                bos_eng.__init__(backend, backend_options=backend_options)
            return bos_eng

        return super().__new__(cls)

    def __str__(self):
        return self.__class__.__name__ + f"({self.backend_name})"

    def reset(self, backend_options=None):
        """Reset the engine and its backend.

        Args:
            backend_options (None, Dict[str, Any]): keyword arguments for the backend,
                updating (overriding) old values
        """
        backend_options = backend_options or {}
        super().reset(backend_options)
        self.backend.reset(**self.backend_options)
        # TODO should backend.reset and backend.begin_circuit be combined?

    def _init_backend(self, init_num_subsystems):
        self.backend.begin_circuit(init_num_subsystems, **self.backend_options)

    def _run_program(self, prog, **kwargs):
        """Apply every command of ``prog`` to the backend and collect the measurement samples.

        Args:
            prog (Program): program to run

        Returns:
            tuple: the applied commands, the combined samples and the samples dictionary
            keyed by measured mode

        Raises:
            NotApplicableError: if an operation cannot be used with the backend
            NotImplementedError: if an operation is not implemented by the backend
        """
        applied = []
        samples_dict = {}
        batches = self.backend_options.get("batch_size", 0)

        for cmd in prog.circuit:
            try:
                # try to apply it to the backend and, if op is a measurement, store it in values
                val = cmd.op.apply(cmd.reg, self.backend, **kwargs)
                if val is not None:
                    for i, r in enumerate(cmd.reg):
                        # batched results are indexed with one additional leading axis
                        outcome = val[:, :, i] if batches else val[:, i]
                        # Internally also store all the measurement outcomes
                        samples_dict.setdefault(r.ind, []).append(outcome)

                applied.append(cmd)

            except NotApplicableError:
                # command is not applicable to the current backend type
                raise NotApplicableError(
                    f"The operation {cmd.op} cannot be used with {self.backend}."
                ) from None

            except NotImplementedError:
                # command not directly supported by backend API
                raise NotImplementedError(
                    f"The operation {cmd.op} has not been implemented in {self.backend} for the "
                    f"arguments {kwargs}."
                ) from None

        # combine the samples sorted by mode, and cast correctly to array or tensor
        samples, samples_dict = self._combine_and_sort_samples(samples_dict)

        if isinstance(prog, TDMProgram) and samples_dict:
            samples_dict = reshape_samples(samples_dict, prog.measured_modes, prog.N, prog.timebins)
            # crop vacuum modes arriving at the detector before the first computational mode
            if kwargs.get("crop", False):
                samples_dict[0] = samples_dict[0][:, prog.get_crop_value() :]
            # transpose the samples so that they have shape `(shots, spatial modes, timebins)`
            samples = np.array(list(samples_dict.values())).transpose(1, 0, 2)

        return applied, samples, samples_dict

    def _combine_and_sort_samples(self, samples_dict):
        """Helper function to combine the values in the samples dictionary sorted by its keys."""
        batches = self.backend_options.get("batch_size", 0)

        if not samples_dict:
            return np.empty((0, 0)), samples_dict

        # only the latest measurement of every mode enters the combined samples
        single_sample_dict = {key: val[-1] for key, val in samples_dict.items()}
        samples = np.transpose([i for _, i in sorted(single_sample_dict.items())])

        # pylint: disable=import-outside-toplevel
        if self.backend_name == "tf":
            from tensorflow import convert_to_tensor

            if batches:
                samples = [
                    np.transpose([i[b] for _, i in sorted(single_sample_dict.items())])
                    for b in range(batches)
                ]

            samples_dict = {k: [convert_to_tensor(i) for i in v] for k, v in samples_dict.items()}
            return convert_to_tensor(samples), samples_dict

        samples_dict = {k: [np.array(i) for i in v] for k, v in samples_dict.items()}
        return samples, samples_dict

    # pylint:disable=too-many-branches
    def run(self, program, *, args=None, compile_options=None, **kwargs):
        """Execute quantum programs by sending them to the backend.

        Args:
            program (Program, Sequence[Program]): quantum programs to run
            args (dict[str, Any]): values for the free parameters in the program(s) (if any)
            compile_options (None, Dict[str, Any]): keyword arguments for :meth:`.Program.compile`

        Keyword Args:
            shots (int): number of times the program measurement evaluation is repeated
            crop (bool): Crop vacuum modes present before the first computational modes.
                This option crops samples from ``Result.samples`` and ``Result.samples_dict``.
                If the program is space unrolled the state in `Result.state` is also cropped.
            modes (None, Sequence[int]): Modes to be returned in the ``Result.state`` :class:`.BaseState` object.
                ``None`` returns all the modes (default). An empty sequence means no state object is returned.

        Returns:
            Result: results of the computation

        Raises:
            NotImplementedError: if a list containing TDM programs is given, or if multiple
                shots are combined with batching, post-selection or feed-forwarding
        """
        args = args or {}
        compile_options = compile_options or {}
        temp_run_options = {}

        if isinstance(program, collections.abc.Sequence):
            # successively update all run option defaults.
            # the run options of successive programs
            # overwrite the run options of previous programs
            # in the list
            program_lst = program
            for p in program:
                if isinstance(p, TDMProgram):
                    raise NotImplementedError("Lists of TDM programs are not currently supported")
                temp_run_options.update(p.run_options)
        else:
            # single program to execute
            program_lst = [program]
            temp_run_options.update(program.run_options)

        temp_run_options.update(kwargs or {})
        temp_run_options.setdefault("shots", 1)
        temp_run_options.setdefault("modes", None)

        # avoid unexpected keys being sent to Operations (note, "modes" shouldn't be passed, but is
        # needed in `BaseEngine.run()`; it's popped before being passed on to Operations)
        eng_run_keys = ["eval", "session", "feed_dict", "shots", "crop", "space_unroll", "modes"]
        eng_run_options = {
            key: temp_run_options[key] for key in temp_run_options.keys() & eng_run_keys
        }

        # check that batching is not used together with shots > 1
        if self.backend_options.get("batch_size", 0) and eng_run_options["shots"] > 1:
            raise NotImplementedError("Batching cannot be used together with multiple shots.")

        # check that post-selection and feed-forwarding is not used together with shots > 1
        for p in program_lst:
            for c in p.circuit or []:
                try:
                    if c.op.select and eng_run_options["shots"] > 1:
                        raise NotImplementedError(
                            "Post-selection cannot be used together with multiple shots."
                        )
                except AttributeError:
                    # operations without a ``select`` attribute cannot post-select
                    pass

                if c.op.measurement_deps and eng_run_options["shots"] > 1:
                    raise NotImplementedError(
                        "Feed-forwarding of measurements cannot be used together with multiple shots."
                    )

        return super()._run(
            program_lst, args=args, compile_options=compile_options, **eng_run_options
        )
class RemoteEngine:
    """A quantum program executor engine that provides a simple interface for
    running remote jobs in a blocking or non-blocking manner.

    **Example:**

    The following examples instantiate an engine with the default settings, and
    run both blocking and non-blocking jobs.

    Run a blocking job:

    >>> engine = RemoteEngine("X8_01")
    >>> result = engine.run(program, shots=1) # blocking call
    >>> result
    [[0 1 0 2 1 0 0 0]]

    Run a non-blocking job:

    >>> job = engine.run_async(program, shots=1)
    >>> job.status
    "queued"
    >>> job.wait()
    >>> job.status
    "complete"
    >>> result = sf.Result(job.result)
    >>> result.samples
    array([[0 1 0 2 1 0 0 0]])

    Args:
        target (str): the target device
        connection (xcc.Connection, optional): a connection to the Xanadu Cloud
        backend_options (Dict[str, Any], optional): keyword arguments for the backend
    """

    # seconds to sleep between two status checks of a blocking job
    POLLING_INTERVAL_SECONDS = 1
    # short target names and the concrete targets they are replaced with
    DEFAULT_TARGETS = {"X8": "X8_01", "X12": "X12_02"}

    def __init__(
        self,
        target: str,
        connection: Optional[xcc.Connection] = None,
        backend_options: Optional[Dict[str, Any]] = None,
    ):
        self._target = self.DEFAULT_TARGETS.get(target, target)
        self._device = None  # fetched lazily by the ``device`` property
        self._connection = connection
        self._backend_options = backend_options or {}

        self.log = create_logger(__name__)

    @property
    def target(self) -> str:
        """The target device used by the engine.

        Returns:
            str: the name of the target
        """
        return self._target

    @property
    def connection(self) -> xcc.Connection:
        """The connection used by the engine. A new :class:`xcc.Connection` will
        be created and returned each time this property is accessed if the
        connection supplied to :meth:`~.RemoteEngine.__init__` was ``None``.

        Returns:
            xcc.Connection
        """
        if self._connection is not None:
            return self._connection

        settings = xcc.Settings()

        return xcc.Connection(
            refresh_token=settings.REFRESH_TOKEN,
            access_token=settings.ACCESS_TOKEN,
            host=settings.HOST,
            port=settings.PORT,
            tls=settings.TLS,
            headers={"User-Agent": f"StrawberryFields/{__version__}"},
        )

    @property
    def device(self) -> Device:
        """The representation of the target device.

        The device is fetched on first access and cached afterwards.

        Returns:
            .strawberryfields.Device: the target device representation

        Raises:
            requests.exceptions.RequestException: if there was an issue fetching
                the device specifications or the device certificate from the Xanadu Cloud
        """
        if self._device is None:
            device = xcc.Device(target=self.target, connection=self.connection)
            self._device = Device(spec=device.specification, cert=device.certificate)
        return self._device

    def run(
        self, program: Program, *, compile_options=None, recompile=False, **kwargs
    ) -> Optional[Result]:
        """Runs a blocking job.

        In the blocking mode, the engine blocks until the job is finished.
        A job in progress can be cancelled with a keyboard interrupt (`ctrl+c`); the
        engine then cancels the remote job and re-raises ``KeyboardInterrupt``.

        If the job completes successfully, the result is returned; if it finishes with
        any other status, a :class:`FailedJobError` is raised.

        Args:
            program (strawberryfields.Program): the quantum circuit
            compile_options (None, Dict[str, Any]): keyword arguments for :meth:`.Program.compile`
            recompile (bool): Specifies if ``program`` should be recompiled
                using ``compile_options``, or if not provided, the default compilation options.

        Keyword Args:
            shots (Optional[int]): The number of shots for which to run the job. If this
                argument is not provided, the shots are derived from the given ``program``.
            crop (bool): For a :class:`.TDMProgram`, crop the first
                ``program.get_crop_value()`` entries along the last axis of the first
                output array.
            integer_overflow_protection (Optional[bool]): Whether to enable the
                conversion of integral job results into ``np.int64`` objects.
                By default, integer overflow protection is enabled. For more
                information, see `xcc.Job.get_result
                <https://xanadu-cloud-client.readthedocs.io/en/stable/api/xcc.Job.html#xcc.Job.get_result>`_.

        Returns:
            strawberryfields.Result: the job result

        Raises:
            requests.exceptions.RequestException: if there was an issue fetching
                the device specifications from the Xanadu Cloud
            FailedJobError: if the remote job fails on the server side ("cancelled" or "failed")
            KeyboardInterrupt: if the user interrupted the wait; the job is cancelled first
        """
        job = self.run_async(
            program, compile_options=compile_options, recompile=recompile, **kwargs
        )

        try:
            while True:
                # TODO: needed to refresh connection; remove once xcc.Connection
                # is able to refresh config info dynamically
                job._connection = self.connection
                job.clear()

                if job.finished:
                    break

                time.sleep(self.POLLING_INTERVAL_SECONDS)

        except KeyboardInterrupt as e:
            xcc.Job(id_=job.id, connection=self.connection).cancel()
            raise KeyboardInterrupt("The job has been cancelled.") from e

        if job.status == "failed":
            message = (
                f"The remote job {job.id} failed due to an internal "
                f"server error: {job.metadata}. Please try again."
            )
            self.log.error(message)
            raise FailedJobError(message)

        if job.status == "complete":
            self.log.info(f"The remote job {job.id} has been completed.")

            integer_overflow_protection = kwargs.get("integer_overflow_protection", True)
            result = job.get_result(integer_overflow_protection=integer_overflow_protection)

            output = result.get("output")
            # crop vacuum modes arriving at the detector before the first computational mode
            if output and isinstance(program, TDMProgram) and kwargs.get("crop", False):
                output[0] = output[0][:, :, program.get_crop_value() :]

            return Result(result)

        # any other final status is reported as a failure
        message = f"The remote job {job.id} has failed with status {job.status}: {job.metadata}."
        self.log.info(message)
        raise FailedJobError(message)

    def run_async(
        self, program: Program, *, compile_options=None, recompile=False, **kwargs
    ) -> xcc.Job:
        """Runs a non-blocking remote job.

        In the non-blocking mode, a ``xcc.Job`` object is returned immediately, and the user can
        manually refresh the status and check for updated results of the job.

        Args:
            program (strawberryfields.Program): the quantum circuit
            compile_options (None, Dict[str, Any]): keyword arguments for :meth:`.Program.compile`
            recompile (bool): Specifies if ``program`` should be recompiled
                using ``compile_options``, or if not provided, the default compilation options.

        Keyword Args:
            shots (Optional[int]): The number of shots for which to run the job. If this
                argument is not provided, the shots are derived from the given ``program``.

        Returns:
            xcc.Job: the created remote job

        Raises:
            ValueError: if the program was compiled for a different device and
                ``recompile`` is ``False``, or if no number of shots is available
        """
        # get the specific chip to submit the program to
        compile_options = compile_options or {}
        # the engine's backend options take precedence over the per-call keyword arguments
        kwargs.update(self._backend_options)

        device = self.device

        compiler_name = compile_options.get("compiler", device.default_compiler)

        program_is_compiled = program.compile_info is not None

        if program_is_compiled and not recompile:
            # error handling for program compilation:
            # program was compiled but recompilation was not allowed by the
            # user
            if program.compile_info and (
                program.compile_info[0].target != device.target
                or program.compile_info[0]._spec != device._spec
            ):
                compile_target = program.compile_info[0].target
                # program was compiled for a different device
                raise ValueError(
                    "Cannot use program compiled with "
                    f"{compile_target} for target {self.target}. "
                    'Pass the "recompile=True" keyword argument '
                    f"to compile with {compiler_name}."
                )

        if not program_is_compiled:
            # program is not compiled
            msg = f"Compiling program for device {device.target} using compiler {compiler_name}."
            self.log.info(msg)
            program = program.compile(device=device, **compile_options)

        elif recompile:
            # recompiling program
            if compile_options:
                msg = f"Recompiling program for device {device.target} using the specified compiler options: {compile_options}."
            else:
                msg = f"Recompiling program for device {device.target} using compiler {compiler_name}."

            self.log.info(msg)
            program = program.compile(device=device, **compile_options)

        elif program.compile_info[1][0] == "X":
            # validating program
            msg = (
                f"Program previously compiled for {device.target} using {program.compile_info[1]}. "
                f"Validating program against the Xstrict compiler."
            )
            self.log.info(msg)
            program = program.compile(device=device, compiler="Xstrict")

        # Update and filter the run options if provided.
        # Forwarding the boolean `crop` run option to the hardware will result in undesired
        # behaviour as og-tdm also uses that keyword arg, hence it is removed.
        temp_run_options = {**program.run_options, **kwargs}
        skip_run_keys = ["crop"]
        run_options = {
            key: temp_run_options[key] for key in temp_run_options.keys() - skip_run_keys
        }

        if "shots" not in run_options:
            raise ValueError("Number of shots must be specified.")

        # Serialize a Blackbird circuit for network transmission
        bb = to_blackbird(program)
        bb._target["options"] = run_options
        circuit = bb.serialize()

        connection = self.connection

        job = xcc.Job.submit(
            connection=connection,
            name=program.name,
            target=self.target,
            circuit=circuit,
            language=f"blackbird:{bb.version}",
        )

        return job

    def __repr__(self):
        return f"<{self.__class__.__name__}: target={self.target}, connection={self._connection}>"

    def __str__(self):
        return self.__repr__()
class Engine(LocalEngine):
    """dummy"""

    # Alias of LocalEngine kept for backwards compatibility. The placeholder docstring
    # above is replaced so that the alias shows the documentation of LocalEngine.
    __doc__ = LocalEngine.__doc__
class BosonicEngine(LocalEngine):
    """Local engine that executes :class:`.Program` instances on the bosonic backend.

    The results of the execution are made available via :class:`.Result`.
    """

    def _run_program(self, prog, **kwargs):
        """Run ``prog`` through the backend's own ``run_prog`` routine.

        Returns:
            tuple: the applied commands, the combined samples and the samples dictionary
        """
        # the bosonic backend executes the whole program itself
        applied_cmds, raw_samples = self.backend.run_prog(prog, **kwargs)
        combined, per_mode = self._combine_and_sort_samples(raw_samples)
        return applied_cmds, combined, per_mode
# Page residue from the HTML documentation export (not part of the module):
# _modules/strawberryfields/engine | Download Python script | Download Notebook | View on GitHub