Core application
How to develop a basic Workflow
Workflows are the fundamental unit of a Temporal Application, and it all starts with the development of a Workflow Definition.
In the Temporal PHP SDK programming model, Workflows are a class method. Classes must implement interfaces that are annotated with #[WorkflowInterface]
. The method that is the Workflow must be annotated with #[WorkflowMethod]
.
use Temporal\Workflow\YourWorkflowInterface;
use Temporal\Workflow\WorkflowMethod;
#[WorkflowInterface]
interface FileProcessingWorkflow
{
#[WorkflowMethod]
public function processFile(Argument $args);
}
How to define Workflow parameters
Temporal Workflows may have any number of custom parameters. However, we strongly recommend that objects are used as parameters, so that the object's individual fields may be altered without breaking the signature of the Workflow. All Workflow Definition parameters must be serializable.
A method annotated with #[WorkflowMethod]
can have any number of parameters.
We recommend passing a single parameter that contains all the input fields to allow for adding fields in a backward-compatible manner.
Note that all inputs should be serializable to a byte array using the provided DataConverter interface. The default implementation uses a JSON serializer, but an alternative implementation can be easily configured. You can create a custom object and pass it to the Workflow method, as shown in the following example:
#[WorkflowInterface]
interface FileProcessingWorkflow {
#[WorkflowMethod]
public function processFile(Argument $args);
}
How to define Workflow return parameters
Workflow return values must also be serializable. Returning results, returning errors, or throwing exceptions is fairly idiomatic in each language that is supported. However, Temporal APIs that must be used to get the result of a Workflow Execution will only ever receive one of either the result or the error.
A Workflow method returns a Generator.
To properly typecast the Workflow's return value in the client code, use the #[ReturnType()]
attribute.
#[WorkflowInterface]
interface FileProcessingWorkflow {
#[WorkflowMethod]
#[ReturnType("string")]
public function processFile(Argument $args);
}
How to customize your Workflow Type
Workflows have a Type that are referred to as the Workflow name.
The following examples demonstrate how to set a custom name for your Workflow Type.
To customize a Workflow Type, use the WorkflowMethod
attribute to specify the name of Workflow.
#[WorkflowMethod(name)]
If a Workflow Type is not specified, then Workflow Type defaults to the interface name, which is YourWorkflowDefinitionInterface
in this case.
#[WorkflowInterface]
interface YourWorkflowDefinitionInterface
{
#[WorkflowMethod]
public function processFile(Argument $args);
}
How develop Workflow logic
Workflow logic is constrained by deterministic execution requirements. Therefore, each language is limited to the use of certain idiomatic techniques. However, each Temporal SDK provides a set of APIs that can be used inside your Workflow to interact with external (to the Workflow) application code.
**Temporal uses the Microsoft Azure Event Sourcing pattern to recover the state of a Workflow object including its local variable values.
In essence, every time a Workflow state has to be restored, its code is re-executed from the beginning. When replaying, side effects (such as Activity invocations) are ignored because they are already recorded in the Workflow event history. When writing Workflow logic, the replay is not visible, so the code should be written since it executes only once. This design puts the following constraints on the Workflow implementation:
- Do not use any mutable global variables because multiple instances of Workflows are executed in parallel.
- Do not call any non-deterministic functions like non seeded random or
UUID
directly from the Workflow code.
Always do the following in the Workflow implementation code:
- Don't perform any IO or service calls as they are not usually deterministic. Use Activities for this.
- Only use
Workflow::now()
to get the current time inside a Workflow. - Call
yield Workflow::timer()
instead ofsleep()
. - Do not use any blocking SPL provided by PHP (i.e.
fopen
,PDO
, etc) in Workflow code. - Use
yield Workflow::getVersion()
when making any changes to the Workflow code. Without this, any deployment of updated Workflow code might break already open Workflows. - Don't access configuration APIs directly from a Workflow because changes in the configuration might affect a Workflow Execution path. Pass it as an argument to a Workflow function or use an Activity to load it.
Workflow method arguments and return values are serializable to a byte array using the provided DataConverter interface. The default implementation uses JSON serializer, but you can use any alternative serialization mechanism.
Make sure to annotate your WorkflowMethod
using ReturnType
to specify concrete return type.
You can not use the default return type declaration as Workflow methods are generators.
The values passed to Workflows through invocation parameters or returned through a result value are recorded in the execution history. The entire execution history is transferred from the Temporal service to Workflow workers with every event that the Workflow logic needs to process. A large execution history can thus adversely impact the performance of your Workflow. Therefore, be mindful of the amount of data that you transfer via Activity invocation parameters or return values. Otherwise, no additional limitations exist on Activity implementations.**
How to develop a basic Activity
One of the primary things that Workflows do is orchestrate the execution of Activities. An Activity is a normal function or method execution that's intended to execute a single, well-defined action (either short or long-running), such as querying a database, calling a third-party API, or transcoding a media file. An Activity can interact with world outside the Temporal Platform or use a Temporal Client to interact with a Temporal Service. For the Workflow to be able to execute the Activity, we must define the Activity Definition.
Activities are defined as methods of a plain PHP interface annotated with #[ActivityInterface]
.
Following is an example of an interface that defines four Activities:
#[ActivityInterface]
// Defining an interface for the activities.
interface FileProcessingActivities
{
public function upload(string $bucketName, string $localName, string $targetName): void;
#[ActivityMethod("transcode_file")]
public function download(string $bucketName, string $remoteName): void;
public function processFile(): string;
public function deleteLocalFile(string $fileName): void;
}
How to develop Activity Parameters
There is no explicit limit to the total number of parameters that an Activity Definition may support. However, there is a limit to the total size of the data that ends up encoded into a gRPC message Payload.
A single argument is limited to a maximum size of 2 MB. And the total size of a gRPC message, which includes all the arguments, is limited to a maximum of 4 MB.
Also, keep in mind that all Payload data is recorded in the Workflow Execution Event History and large Event Histories can affect Worker performance. This is because the entire Event History could be transferred to a Worker Process with a Workflow Task.
Some SDKs require that you pass context objects, others do not. When it comes to your application data—that is, data that is serialized and encoded into a Payload—we recommend that you use a single object as an argument that wraps the application data passed to Activities. This is so that you can change what data is passed to the Activity without breaking a function or method signature.
Each method defines a single Activity type. A single Workflow can use more than one Activity interface and call more than one Activity method from the same interface.
The only requirement is that Activity method arguments and return values are serializable to a byte array using the provided DataConverter interface. The default implementation uses a JSON serializer, but an alternative implementation can be easily configured.
How to define Activity return values
All data returned from an Activity must be serializable.
There is no explicit limit to the amount of data that can be returned by an Activity, but keep in mind that all return values are recorded in a Workflow Execution Event History.
Return values must be serializable to a byte array using the provided DataConverter interface. The default implementation uses a JSON serializer, but an alternative implementation can be easily configured. Thus, you can return both primitive types:
class GreetingActivity implements GreetingActivityInterface
{
public function composeGreeting(string $greeting, string $name): string
{
return $greeting . ' ' . $name;
}
}
And objects:
class GreetingActivity implements GreetingActivityInterface
{
public function composeGreeting(string $greeting, string $name): Greeting
{
return new Greeting($greeting, $name);
}
}
How to customize your Activity Type
Activities have a Type that are referred to as the Activity name. The following examples demonstrate how to set a custom name for your Activity Type.
An optional #[ActivityMethod]
attribute can be used to override a default Activity name.
You can define your own prefix for all Activity names by adding the prefix
option to the ActivityInterface
attribute.
(The default prefix is empty.)
#[ActivityInterface("file_activities.")]
interface FileProcessingActivities
{
public function upload(string $bucketName, string $localName, string $targetName);
#[ActivityMethod("transcode_file")]
public function download(string $bucketName, string $remoteName);
public function processFile(): string;
public function deleteLocalFile(string $fileName);
}
The #[ActivityInterface("file_activities.")]
is an attribute that tells the PHP SDK to generate a class to implement the FileProcessingActivities
interface. The functions define Activities that are used in the Workflow.
How to start an Activity Execution
Calls to spawn Activity Executions are written within a Workflow Definition. The call to spawn an Activity Execution generates the ScheduleActivityTask Command. This results in the set of three Activity Task related Events (ActivityTaskScheduled, ActivityTaskStarted, and ActivityTask[Closed])in your Workflow Execution Event History.
A single instance of the Activities implementation is shared across multiple simultaneous Activity invocations. Activity implementation code should be idempotent.
The values passed to Activities through invocation parameters or returned through a result value are recorded in the Execution history. The entire Execution history is transferred from the Temporal service to Workflow Workers when a Workflow state needs to recover. A large Execution history can thus adversely impact the performance of your Workflow.
Therefore, be mindful of the amount of data you transfer through Activity invocation parameters or Return Values. Otherwise, no additional limitations exist on Activity implementations.
Activity implementation is an implementation of an Activity interface. The following code example, uses a constructor that takes an Amazon S3 client and a local directory, and uploads a file to the S3 bucket. Then, the code uses a function to download a file from the S3 bucket passing a bucket name, remote name, and local name as arguments. Finally, it uses a function that takes a local file name as an argument and returns a string.
// An implementation of an Activity interface.
class FileProcessingActivitiesImpl implements FileProcessingActivities {
private S3Client $s3Client;
private string $localDirectory;
public function __construct(S3Client $s3Client, string $localDirectory) {
$this->s3Client = $s3Client;
$this->localDirectory = $localDirectory;
}
// Uploading a file to S3.
public function upload(string $bucketName, string $localName, string $targetName): void
{
$this->s3Client->putObject(
$bucketName,
$targetName,
fopen($this->localDirectory . $localName, 'rb+')
);
}
// Downloading a file from S3.
public function download(
string $bucketName,
string $remoteName,
string $localName
): void
{
$this->s3Client->downloadObject(
$bucketName,
$remoteName,
fopen($this->localDirectory .$localName, 'wb+')
);
}
// A function that takes a local file name as an argument and returns a string.
public function processFile(string $localName): string
{
// Implementation omitted for brevity.
return compressFile($this->localDirectory . $localName);
}
public function deleteLocalFile(string $fileName): void
{
unlink($this->localDirectory . $fileName);
}
}
How to set the required Activity Timeouts
Activity Execution semantics rely on several parameters. The only required value that needs to be set is either a Schedule-To-Close Timeout or a Start-To-Close Timeout. These values are set in the Activity Options.
How to get the results of an Activity Execution
The call to spawn an Activity Execution generates the ScheduleActivityTask Command and provides the Workflow with an Awaitable. Workflow Executions can either block progress until the result is available through the Awaitable or continue progressing, making use of the result when it becomes available.
Workflow::newActivityStub
returns a client-side stub an implements an Activity interface. The client-side stub can be used within the Workflow code. It takes the Activity's type andActivityOptions
as arguments.
Calling (via yield
) a method on this interface invokes an Activity that implements this method.
An Activity invocation synchronously blocks until the Activity completes, fails, or times out.
Even if Activity Execution takes a few months, the Workflow code still sees it as a single synchronous invocation.
It doesn't matter what happens to the processes that host the Workflow.
The business logic code just sees a single method call.
class GreetingWorkflow implements GreetingWorkflowInterface
{
private $greetingActivity;
public function __construct()
{
$this->greetingActivity = Workflow::newActivityStub(
GreetingActivityInterface::class,
ActivityOptions::new()->withStartToCloseTimeout(\DateInterval::createFromDateString('30 seconds'))
);
}
public function greet(string $name): \Generator
{
// This is a blocking call that returns only after the activity has completed.
return yield $this->greetingActivity->composeGreeting('Hello', $name);
}
}
If different Activities need different options, like timeouts or a task queue, multiple client-side stubs can be created with different options.
$greetingActivity = Workflow::newActivityStub(
GreetingActivityInterface::class,
ActivityOptions::new()->withStartToCloseTimeout(\DateInterval::createFromDateString('30 seconds'))
);
$greetingActivity = Workflow::newActivityStub(
GreetingActivityInterface::class,
ActivityOptions::new()->withStartToCloseTimeout(\DateInterval::createFromDateString('30 minutes'))
);
How to run Worker Processes
The Worker Process is where Workflow Functions and Activity Functions are executed.
- Each Worker Entity in the Worker Process must register the exact Workflow Types and Activity Types it may execute.
- Each Worker Entity must also associate itself with exactly one Task Queue.
- Each Worker Entity polling the same Task Queue must be registered with the same Workflow Types and Activity Types.
A Worker Entity is the component within a Worker Process that listens to a specific Task Queue.
Although multiple Worker Entities can be in a single Worker Process, a single Worker Entity Worker Process may be perfectly sufficient. For more information, see the Worker tuning guide.
A Worker Entity contains a Workflow Worker and/or an Activity Worker, which makes progress on Workflow Executions and Activity Executions, respectively.
The RoadRunner application server will launch multiple Temporal PHP Worker processes based on provided .rr.yaml
configuration.
Each Worker might connect to one or multiple Task Queues. Worker poll Temporal service for tasks, performs those tasks, and communicates task execution results back to the Temporal service.
Worker code are developed, deployed, and operated by Temporal customers.
To create a worker use Temporal\WorkerFactory
:
<?php
declare(strict_types=1);
use Temporal\WorkerFactory;
ini_set('display_errors', 'stderr');
include "vendor/autoload.php";
// factory initiates and runs task queue specific activity and workflow workers
$factory = WorkerFactory::create();
// Worker that listens on a Task Queue and hosts both workflow and activity implementations.
$worker = $factory->newWorker();
// Workflows are stateful. So you need a type to create instances.
$worker->registerWorkflowTypes(App\DemoWorkflow::class);
// Activities are stateless and thread safe. So a shared instance is used.
$worker->registerActivity(App\DemoActivity::class);
// In case an activity class requires some external dependencies provide a callback - factory
// that creates or builds a new activity instance. The factory should be a callable which accepts
// an instance of ReflectionClass with an activity class which should be created.
$worker->registerActivity(App\DemoActivity::class, fn(ReflectionClass $class) => $container->create($class->getName()));
// start primary loop
$factory->run();
You can configure task queue name using first argument of WorkerFactory
->newWorker
:
$worker = $factory->newWorker('your-task-queue');
As mentioned above you can create as many Task Queue connections inside a single Worker Process as you need.
To configure additional WorkerOptions use Temporal\Worker\WorkerOptions
:
use Temporal\Worker\WorkerOptions;
$worker = $factory->newWorker(
'your-task-queue',
WorkerOptions::new()
->withMaxConcurrentWorkflowTaskPollers(10)
);
Make sure to point the Worker file in application server configuration:
rpc:
listen: tcp://127.0.0.1:6001
server:
command: 'php worker.php'
temporal:
address: 'temporal:7233'
activities:
num_workers: 10
You can serve HTTP endpoints using the same server setup.
How to register types
All Workers listening to the same Task Queue name must be registered to handle the exact same Workflows Types and Activity Types.
If a Worker polls a Task for a Workflow Type or Activity Type it does not know about, it fails that Task. However, the failure of the Task does not cause the associated Workflow Execution to fail.
Worker listens on a Task Queue and hosts both Workflow and Activity implementations:
// Workflows are stateful. So you need a type to create instances:
$worker->registerWorkflowTypes(App\DemoWorkflow::class);
// Activities are stateless and thread safe:
$worker->registerActivity(App\DemoActivity::class);
In case an activity class requires some external dependencies provide a callback - factory that creates or builds a new activity instance. The factory should be a callable which accepts an instance of ReflectionClass with an activity class which should be created.
$worker->registerActivity(
App\DemoActivity::class,
fn(ReflectionClass $class) => $container->create($class->getName())
);
If you want to clean up some resources after activity is done, you may register a finalizer. This callback is called after each activity invocation:
$worker->registerActivityFinalizer(fn() => $kernel->showtdown());