ansys.dpf.core.Workflow
=======================

.. py:class:: ansys.dpf.core.Workflow(workflow=None, server=None)

   Represents a workflow.

   A workflow is a black box containing operators and exposing only the necessary
   operator inputs and outputs to compute a given algorithm.

   :param server: Server with channel connected to the remote or local instance. The default is
                  ``None``, in which case an attempt is made to use the global server.
   :type server: DPFServer, optional
   :param workflow: Existing workflow representation to wrap. The default is ``None``.
   :type workflow: ctypes.c_void_p, workflow_message_pb2.Workflow, optional

   .. rubric:: Examples

   Create a generic workflow computing the minimum of displacement by chaining the ``'U'``
   and ``'min_max_fc'`` operators.

   >>> from ansys.dpf import core as dpf
   >>> disp_op = dpf.operators.result.displacement()
   >>> max_fc_op = dpf.operators.min_max.min_max_fc(disp_op)
   >>> workflow = dpf.Workflow()
   >>> workflow.add_operators([disp_op, max_fc_op])
   >>> workflow.set_input_name("data_sources", disp_op.inputs.data_sources)
   >>> workflow.set_output_name("min", max_fc_op.outputs.field_min)
   >>> workflow.set_output_name("max", max_fc_op.outputs.field_max)
   >>> from ansys.dpf.core import examples
   >>> data_src = dpf.DataSources(examples.find_multishells_rst())
   >>> workflow.connect("data_sources", data_src)
   >>> min = workflow.get_output("min", dpf.types.field) # doctest: +SKIP
   >>> max = workflow.get_output("max", dpf.types.field) # doctest: +SKIP

   .. py:attribute:: _server

   .. py:attribute:: _api_instance
      :value: None

   .. py:property:: progress_bar
      :type: bool

      Enable or disable progress bar display when requesting workflow output (default ``True``).

   .. py:property:: _api
      :type: ansys.dpf.gate.workflow_abstract_api.WorkflowAbstractAPI

   .. py:method:: _getoutput_string(self, pin)
      :staticmethod:

   .. py:method:: _connect_string(self, pin, str)
      :staticmethod:

   .. py:method:: _getoutput_string_as_bytes(self, pin)
      :staticmethod:

   .. py:method:: _getoutput_bytes(self, pin)
      :staticmethod:

   .. py:method:: _connect_string_as_bytes(self, pin, str)
      :staticmethod:

   .. py:method:: connect(pin_name, inpt, pin_out=0)

      Connect an input on the workflow using a pin name.

      :param pin_name: Name of the pin to connect. This name should first be exposed with
                       ``Workflow.set_input_name``.
      :type pin_name: str
      :param inpt: Object to connect to.
      :type inpt: str, int, double, bool, list of int, list of doubles, Field, FieldsContainer,
                  Scoping, ScopingsContainer, MeshedRegion, MeshesContainer, DataSources, Operator
      :param pin_out: If the input is an operator, the output pin of the input operator. The
                      default is ``0``.
      :type pin_out: int, optional

      .. rubric:: Examples

      Create a generic workflow computing the minimum of displacement by chaining the ``'U'``
      and ``'min_max_fc'`` operators.

      >>> from ansys.dpf import core as dpf
      >>> disp_op = dpf.operators.result.displacement()
      >>> max_fc_op = dpf.operators.min_max.min_max_fc(disp_op)
      >>> workflow = dpf.Workflow()
      >>> workflow.add_operators([disp_op, max_fc_op])
      >>> workflow.set_input_name("data_sources", disp_op.inputs.data_sources)
      >>> workflow.set_output_name("min", max_fc_op.outputs.field_min)
      >>> workflow.set_output_name("max", max_fc_op.outputs.field_max)
      >>> from ansys.dpf.core import examples
      >>> data_src = dpf.DataSources(examples.find_multishells_rst())
      >>> workflow.connect("data_sources", data_src)
      >>> min = workflow.get_output("min", dpf.types.field) # doctest: +SKIP
      >>> max = workflow.get_output("max", dpf.types.field) # doctest: +SKIP

   .. py:property:: _type_to_input_method

   .. py:property:: _type_to_output_method

   .. py:method:: get_output(pin_name, output_type)

      Retrieve the output of the workflow on the given exposed pin name.

      A progress bar following the workflow state is printed.

      :param pin_name: Name of the pin to retrieve. This name should first be exposed with
                       ``Workflow.set_output_name``.
      :type pin_name: str
      :param output_type: Type of the requested output.
      :type output_type: core.type enum
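
      .. rubric:: Examples

      A minimal sketch reusing the workflow built in the class-level example; it assumes
      the ``"min"`` pin was exposed with ``set_output_name`` and ``"data_sources"`` is
      already connected.

      >>> from ansys.dpf import core as dpf
      >>> min_field = workflow.get_output("min", dpf.types.field) # doctest: +SKIP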

   .. py:method:: set_input_name(name, *args)

      Set the name of an input pin of the workflow to expose it for future connection.

      :param name: Name to expose the input pin under.
      :type name: str
      :param \*args: Operator with its input pin number, or input to name.
      :type \*args: core.Operator, core.Input, int

      .. rubric:: Examples

      >>> from ansys.dpf import core as dpf
      >>> workflow = dpf.Workflow()
      >>> disp_op = dpf.operators.result.displacement()
      >>> max_fc_op = dpf.operators.min_max.min_max_fc(disp_op)
      >>> workflow.set_input_name("data_sources", disp_op.inputs.data_sources)
      >>> from ansys.dpf.core import examples
      >>> data_src = dpf.DataSources(examples.find_multishells_rst())
      >>> workflow.connect("data_sources", data_src)

   .. py:method:: set_output_name(name, *args)

      Set the name of an output pin of the workflow to expose it for future connection.

      :param name: Name to expose the output pin under.
      :type name: str
      :param \*args: Operator with its output pin number, or output to name.
      :type \*args: core.Operator, core.Output, int

      .. rubric:: Examples

      >>> from ansys.dpf import core as dpf
      >>> from ansys.dpf.core import examples
      >>> workflow = dpf.Workflow()
      >>> model = dpf.Model(examples.find_simple_bar())
      >>> disp_op = model.results.displacement()
      >>> max_fc_op = dpf.operators.min_max.min_max_fc(disp_op)
      >>> workflow.set_output_name("contour", disp_op.outputs.fields_container)
      >>> fc = workflow.get_output("contour", dpf.types.fields_container) # doctest: +SKIP

   .. py:method:: add_operators(operators)

      Add operators to the list of operators of the workflow.

      :param operators: Operators to add to the list.
      :type operators: dpf.core.Operator, list of dpf.core.Operator

      .. rubric:: Examples

      >>> from ansys.dpf import core as dpf
      >>> workflow = dpf.Workflow()
      >>> disp_op = dpf.Operator("U")
      >>> max_op = dpf.Operator("min_max")
      >>> workflow.add_operators([disp_op, max_op])

   .. py:method:: add_operator(operator)

      Add an operator to the list of operators of the workflow.

      :param operator: Operator to add to the list.
      :type operator: dpf.core.Operator

      .. rubric:: Examples

      >>> from ansys.dpf import core as dpf
      >>> workflow = dpf.Workflow()
      >>> disp_op = dpf.Operator("U")
      >>> workflow.add_operator(disp_op)

   .. py:method:: record(identifier='', transfer_ownership=True)

      Add the workflow to DPF's internal registry with an ID returned by this method.

      The workflow can be recovered by ``dpf.core.Workflow.get_recorded_workflow(id)``.

      :param identifier: Name given to the workflow.
      :type identifier: str, optional
      :param transfer_ownership: Whether to transfer the ownership. The default is ``True``. If the ownership is
                                 not transferred, the workflow is removed from the internal registry as soon as
                                 it has been recovered by its ID.
      :type transfer_ownership: bool
      :rtype: int

      .. rubric:: Examples

      >>> from ansys.dpf import core as dpf
      >>> workflow = dpf.Workflow()
      >>> disp_op = dpf.Operator("U")
      >>> workflow.add_operator(disp_op)
      >>> # ...
      >>> id = workflow.record()
      >>> workflow_copy = dpf.Workflow.get_recorded_workflow(id)

   .. py:method:: get_recorded_workflow(id, server=None)
      :staticmethod:

      Retrieve a workflow registered with ``workflow.record()``.

      :param id: ID returned by the ``record`` method.
      :type id: int
      :returns: **workflow** -- Workflow registered in DPF's registry (server side).
      :rtype: core.Workflow

      .. rubric:: Examples

      >>> from ansys.dpf import core as dpf
      >>> workflow = dpf.Workflow()
      >>> disp_op = dpf.Operator("U")
      >>> workflow.add_operator(disp_op)
      >>> # ...
      >>> id = workflow.record()
      >>> workflow_copy = dpf.Workflow.get_recorded_workflow(id)

   .. py:property:: info

      Dictionary with the operator names and the exposed input and output names.

      :returns: **info** -- Dictionary with ``"operator_names"``, ``"input_names"``, and
                ``"output_names"`` keys.
      :rtype: dictionary str -> list of str

   .. py:property:: operator_names

      List of the names of operators added in the workflow.

      :returns: **names**
      :rtype: list of str

   .. py:property:: input_names

      List of the input names exposed in the workflow with ``set_input_name``.

      :returns: **names**
      :rtype: list of str

   .. py:property:: output_names

      List of the output names exposed in the workflow with ``set_output_name``.

      :returns: **names**
      :rtype: list of str
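
      .. rubric:: Examples

      A minimal sketch of inspecting exposed pins; the pin names ``"in"`` and ``"out"``
      are illustrative.

      >>> from ansys.dpf import core as dpf
      >>> workflow = dpf.Workflow()
      >>> op = dpf.operators.utility.forward()
      >>> workflow.add_operator(op)
      >>> workflow.set_input_name("in", op.inputs.any)
      >>> workflow.set_output_name("out", op.outputs.any)
      >>> print(workflow.input_names)
      ['in']
      >>> print(workflow.output_names)
      ['out']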

   .. py:method:: connect_with(left_workflow: Workflow, output_input_names: Union[tuple[str, str], dict[str, str]] = None, permissive: bool = True)

      Prepend a given workflow to the current workflow.

      Updates the current workflow to include all the operators of the workflow given as
      an argument. Outputs of the given workflow are connected to inputs of the current
      workflow according to the map. All outputs of the given workflow become outputs of
      the current workflow.

      :param left_workflow: The given workflow, whose outputs are chained with the current workflow's inputs.
      :type left_workflow: Workflow
      :param output_input_names: Map used to connect the outputs of the given workflow to the inputs of the
                                 current workflow. Check the names of available inputs and outputs for each
                                 workflow using ``Workflow.input_names`` and ``Workflow.output_names``. The
                                 default is ``None``, in which case each output of ``left_workflow`` is
                                 connected to the input of the current workflow with the same name.
      :param permissive: Whether to filter ``output_input_names`` to keep only available connections.
                         Otherwise, an error is raised if ``output_input_names`` contains unavailable
                         inputs or outputs. The default is ``True``.

      .. rubric:: Examples

      ::

        +--------------------------------------------------------------------------------------+
        | INPUT:                                                                               |
        |                                                                                      |
        | input_output_names = ("output", "field")                                             |
        |                     _____________                            __________              |
        | "data_sources" -> |left_workflow| -> "stuff"    "field" -> |   this   | -> "contour" |
        | "time_scoping" -> |             |        "mesh_scoping" -> |          |              |
        |                   |_____________| -> "output"              |__________|              |
        |                                                                                      |
        | OUTPUT:                                                                              |
        |                    ____                                                              |
        | "data_sources" -> |this| -> "stuff"                                                  |
        | "time_scoping" -> |    | -> "contour"                                                |
        | "mesh_scoping" -> |____| -> "output"                                                 |
        +--------------------------------------------------------------------------------------+

      >>> import ansys.dpf.core as dpf
      >>> left_wf = dpf.Workflow()
      >>> op1 = dpf.operators.utility.forward()
      >>> left_wf.set_input_name("op1_input", op1.inputs.any)
      >>> left_wf.set_output_name("op1_output", op1.outputs.any)
      >>> op2 = dpf.operators.utility.forward()
      >>> left_wf.set_input_name("op2_input", op2.inputs.any)
      >>> left_wf.set_output_name("op2_output", op2.outputs.any)
      >>> left_wf.add_operators([op1, op2])
      >>> print(f"{left_wf.input_names=}")
      left_wf.input_names=['op1_input', 'op2_input']
      >>> print(f"{left_wf.output_names=}")
      left_wf.output_names=['op1_output', 'op2_output']
      >>> current_wf = dpf.Workflow()
      >>> op3 = dpf.operators.utility.forward()
      >>> current_wf.set_input_name("op3_input", op3.inputs.any)
      >>> current_wf.set_output_name("op3_output", op3.outputs.any)
      >>> op4 = dpf.operators.utility.forward()
      >>> current_wf.set_input_name("op4_input", op4.inputs.any)
      >>> current_wf.set_output_name("op4_output", op4.outputs.any)
      >>> current_wf.add_operators([op3, op4])
      >>> print(f"{current_wf.input_names=}")
      current_wf.input_names=['op3_input', 'op4_input']
      >>> print(f"{current_wf.output_names=}")
      current_wf.output_names=['op3_output', 'op4_output']
      >>> output_input_names = {"op2_output": "op3_input"}
      >>> current_wf.connect_with(left_wf, output_input_names)
      >>> print(f"New {current_wf.input_names=}")
      New current_wf.input_names=['op1_input', 'op2_input', 'op4_input']
      >>> print(f"New {current_wf.output_names=}")
      New current_wf.output_names=['op1_output', 'op2_output', 'op3_output', 'op4_output']

      .. rubric:: Notes

      Function available with server version 3.0 and later.

   .. py:method:: create_on_other_server(*args, **kwargs)

      Create a new instance of a workflow on another server.

      The new workflow has the same operators and exposed input and output pins as this
      workflow. Connections between operators and between data and operators are kept
      (except for exposed pins).

      :param server: Server with channel connected to the remote or local instance. When ``None``,
                     attempts to use the global server.
      :type server: server.LegacyGrpcServer, optional
      :param ip: IP address on which the new instance should be created (always pass a port as
                 well).
      :type ip: str, optional
      :param port: Port on which the new instance should be created.
      :type port: str, int, optional
      :param address: Address on which the new instance should be created (``"ip:port"``).
      :type address: str, optional
      :rtype: Workflow

      .. rubric:: Examples

      Create a generic workflow computing the minimum of displacement by chaining the ``'U'``
      and ``'min_max_fc'`` operators.

      >>> from ansys.dpf import core as dpf
      >>> disp_op = dpf.operators.result.displacement()
      >>> max_fc_op = dpf.operators.min_max.min_max_fc(disp_op)
      >>> workflow = dpf.Workflow()
      >>> workflow.add_operators([disp_op, max_fc_op])
      >>> workflow.set_input_name("data_sources", disp_op.inputs.data_sources)
      >>> workflow.set_output_name("min", max_fc_op.outputs.field_min)
      >>> workflow.set_output_name("max", max_fc_op.outputs.field_max)
      >>> # other_server = dpf.start_local_server(as_global=False)
      >>> # new_workflow = workflow.create_on_other_server(server=other_server)
      >>> # assert 'data_sources' in new_workflow.input_names

   .. py:method:: view(title: Union[None, str] = None, save_as: Union[None, str, os.PathLike] = None, off_screen: bool = False, keep_dot_file: bool = False) -> Union[str, None]

      Run a viewer to show a rendering of the workflow.

      .. warning::
         The workflow is rendered using GraphViz and requires:

         - installation of GraphViz on your computer (see `<https://graphviz.org/download/>`_)
         - installation of the ``graphviz`` library in your Python environment.

      :param title: Name to use in intermediate files and in the viewer.
      :param save_as: Path to a file to save the workflow view as.
      :param off_screen: Whether to render the image off screen.
      :param keep_dot_file: Whether to keep the intermediate DOT file generated.
      :rtype: Path to the rendered image file if ``save_as`` is provided, else ``None``.
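
      .. rubric:: Examples

      A minimal sketch, assuming GraphViz and the ``graphviz`` Python library are
      installed; the output file name is illustrative.

      >>> from ansys.dpf import core as dpf
      >>> workflow = dpf.Workflow()
      >>> workflow.add_operator(dpf.operators.utility.forward())
      >>> image_path = workflow.view(save_as="workflow.png", off_screen=True) # doctest: +SKIP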

   .. py:method:: to_graphviz(path: Union[os.PathLike, str])

      Save the workflow to a GraphViz (DOT) file.
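
      .. rubric:: Examples

      A minimal sketch; the output file name is illustrative.

      >>> from ansys.dpf import core as dpf
      >>> workflow = dpf.Workflow()
      >>> workflow.add_operator(dpf.operators.utility.forward())
      >>> workflow.to_graphviz("workflow.dot") # doctest: +SKIP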

   .. py:method:: get_topology()

      Get the topology of the workflow.

      :returns: **workflow_topology**
      :rtype: workflow_topology.WorkflowTopology

      .. rubric:: Notes

      Available with server version 10.0 and later.

   .. py:method:: __del__()

      Clean up resources associated with the instance.

      This method calls the deleter function to release resources. If an exception occurs
      during deletion, a warning is issued.

      :raises Warning: If an exception occurs while attempting to delete resources.

   .. py:method:: __str__()

      Describe the entity.

      :returns: **description**
      :rtype: str
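
      .. rubric:: Examples

      A minimal sketch; the printed description depends on the workflow's operators and
      exposed pins.

      >>> from ansys.dpf import core as dpf
      >>> workflow = dpf.Workflow()
      >>> workflow.add_operator(dpf.operators.utility.forward())
      >>> print(workflow) # doctest: +SKIP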