You are viewing an unreleased or outdated version of the documentation

Source code for dagster._core.pipes.client

from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import TYPE_CHECKING, Iterator, Optional, Sequence

from dagster_pipes import (
    PipesContextData,
    PipesExtras,
    PipesParams,
)

from dagster._annotations import experimental, public
from dagster._core.execution.context.compute import OpExecutionContext

if TYPE_CHECKING:
    from .context import PipesExecutionResult, PipesMessageHandler


@experimental
class PipesClient(ABC):
    """Abstract base class for pipes clients.

    Subclass this to implement a pipes client for a specific external
    environment.
    """

    @public
    @abstractmethod
    def run(
        self,
        *,
        context: OpExecutionContext,
        extras: Optional[PipesExtras] = None,
    ) -> "PipesClientCompletedInvocation":
        """Run an external process synchronously using the pipes protocol.

        Both parameters are keyword-only.

        Args:
            context (OpExecutionContext): The context from the executing
                op/asset.
            extras (Optional[PipesExtras]): Arbitrary data to pass to the
                external environment.

        Returns:
            PipesClientCompletedInvocation: Wrapper containing results
            reported by the external process.
        """
@experimental
class PipesClientCompletedInvocation:
    """Wrapper around the results of a completed pipes client invocation."""

    def __init__(self, results: Sequence["PipesExecutionResult"]):
        """Store the given results for later retrieval.

        Args:
            results (Sequence[PipesExecutionResult]): The results to wrap.
        """
        self._results = results

    def get_results(self) -> Sequence["PipesExecutionResult"]:
        """Return the stored results as a tuple.

        Returns:
            Sequence[PipesExecutionResult]: A tuple built from the results
            given at construction.
        """
        results_as_tuple = tuple(self._results)
        return results_as_tuple
@experimental
class PipesContextInjector(ABC):
    """Abstract base class for injecting context data into an external process."""

    @abstractmethod
    @contextmanager
    def inject_context(self, context_data: "PipesContextData") -> Iterator[PipesParams]:
        """A `@contextmanager` that injects context data into the external process.

        Implementations should write the context data to a location that the
        external process can access, then yield the parameters that the
        external process needs in order to locate and load that data.

        Args:
            context_data (PipesContextData): The context data to inject.

        Yields:
            PipesParams: A JSON-serializable dict of parameters to be used by
            the external process to locate and load the injected context data.
        """
@experimental
class PipesMessageReader(ABC):
    """Abstract base class for reading messages reported by an external process."""

    @abstractmethod
    @contextmanager
    def read_messages(self, handler: "PipesMessageHandler") -> Iterator[PipesParams]:
        """A `@contextmanager` that reads messages reported by an external process.

        Implementations should start a thread that continuously reads
        messages from some location accessible to the external process, then
        yield the parameters that the external process can use to direct its
        message output.

        Args:
            handler (PipesMessageHandler): The message handler to use to
                process messages read from the external process.

        Yields:
            PipesParams: A dict of parameters that can be used by the external
            process to determine where to write messages.
        """