Development guide
This document gives a high-level view of how to develop a Cogment-based application using the Cogment SDKs. It assumes the reader is familiar with the Cogment core concepts.
Some features aren't available in Javascript. If a section only has Python examples, the feature it describes is Python-only.
The High-level Cogment API expects users to use protocol buffers to declare a project's data structures. The intricacies of protobufs are beyond the scope of this document. Basic knowledge of the technology and its usage is assumed.
The spec file
The spec file, usually named cogment.yaml, defines the specifics of a type of trials for a Cogment app: actor classes as well as environment & trial configuration types. It is the starting point for a project.
An actor class is primarily defined by its observation space and action space.
The data structures describing these spaces are declared by using a protocol buffer message type. Observations and actions will simply be instances of the matching type.
For example, in the following, driver and pedestrian share a common view of the environment, hence use the same observation space, but have different actions available to them.
import:
  proto:
    - city.proto
actors:
  driver:
    observation:
      space: city.Observation
    action:
      space: city.DriverAction
  pedestrian:
    observation:
      space: city.Observation
    action:
      space: city.PedestrianAction
This shows only the relevant part of the spec file. The full list of configurable options can be found in the reference page.
Compiling the spec file
In order to use the spec file within the various supported languages, it needs to be compiled. This is done by the code generation modules of the specific language's SDK you are using.
- Python
- Javascript
For the Python SDK, the generation step requires an extra from the cogment package. Using pip, you can install the SDK with its extra using:
pip install cogment[generate]
The generation can then be executed using:
python -m cogment.generate cogment.yaml
This will create a cog_settings.py file as well as multiple *_pb.py files.
For the Javascript SDK, the generation requires the SDK to be installed:
npm install @cogment/cogment-js-sdk
The generation can then be executed using:
npx cogment-js-sdk-generate cogment.yaml
This will create several files in the src folder: CogSettings.js, CogSettings.d.ts, CogSettings.d.ts.map and CogTypes.d.ts, as well as multiple *_pb.js and *_pb.d.ts files.
Environment
Environments are implemented by functions that take an environment session instance.
This function is called once for each trial and usually consists of three sections.
- The environment's initialization, where its internal state can be initialized and processes started. It ends with the sending of the initial observations to the actors participating in the trial.
- Its event loop, where the environment iterates through the events occurring during the trial and produces observations as well as receives messages. In this loop the environment can end the trial on its own or the end can be requested, see the Trial lifetime section for further information.
- Its termination, where cleanup occurs.
In the common case where all actors within a trial share the same observation, a bare-minimum environment service would look like this:
async def environment(environment_session):
    # -- Initialization --
    # Retrieve the actors participating in the trial
    actors = environment_session.get_active_actors()
    # Start the trial and send a starting observation to all actors
    environment_session.start([("*", Observation())])
    # -- Event loop --
    async for event in environment_session.all_events():
        if event.actions:
            # `event.actions` is a list of the actions done by the actors
            actions = event.actions
            if event.type == cogment.EventType.ACTIVE:
                # The trial is active, produce an observation in response to the actions
                environment_session.produce_observations([("*", Observation())])
                # Alternatively the environment can decide to **end** the trial with the following
                # environment_session.end([("*", Observation())])
            else:
                # The trial termination has been requested
                # Produce a final observation
                environment_session.end([("*", Observation())])
        for message in event.messages:
            # `event.messages` is a list of all the messages received by the environment (it can be empty)
            # Handle each message here.
            pass
    # -- Termination --
    print(f"Trial [{environment_session.get_trial_id()}] terminated")
For further details, take a look at the cogment.EnvironmentSession class reference.
This environment implementation needs to be registered and served so that the Orchestrator can reach it. This can be done through a Context instance.
context = cogment.Context(user_id="my_user_id", cog_settings=cog_settings)
context.register_environment(impl=environment, impl_name="my_environment")
await context.serve_all_registered(cogment.ServedEndpoint(port=9000))
For further details, take a look at the cogment.Context class reference.
Sending observations
The environment session has 3 different methods able to send observations: start, produce_observations and end. Each of those methods takes a list of (destination, observation) 2-tuples.
As demonstrated above, sending the same observation to all actors is done using "*" as the destination.
environment_session.produce_observations([("*", Observation(...))])
It is also possible to send different observations to different actors. This can be useful to send observations of the world from the point of view of the actor or to send partial observations.
environment_session.produce_observations([
    ("my_first_actor_name", Observation(...)),
    ("my_second_actor_name", Observation(...))
])
Please note that the environment should always send observations such that each actor in the trial receives one.
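The rule above can be checked before calling produce_observations. The following is only a sketch in plain Python: resolve_observations is a hypothetical helper, not part of the Cogment SDK, and it assumes that "*" targets every actor while any other destination is a single actor name. Treating a duplicate observation as an error is also an assumption of this sketch.

```python
def resolve_observations(actor_names, observations):
    """Expand (destination, observation) 2-tuples into one observation per actor.

    Hypothetical helper, not part of the Cogment SDK. "*" targets every actor;
    any other destination is treated as a single actor name.
    """
    resolved = {}
    for destination, observation in observations:
        targets = actor_names if destination == "*" else [destination]
        for name in targets:
            if name not in actor_names:
                raise ValueError(f"unknown actor [{name}]")
            if name in resolved:
                raise ValueError(f"actor [{name}] would receive two observations")
            resolved[name] = observation
    # Every actor in the trial must end up with an observation.
    missing = [name for name in actor_names if name not in resolved]
    if missing:
        raise ValueError(f"actors without an observation: {missing}")
    return resolved


actors = ["my_first_actor_name", "my_second_actor_name"]
# Same observation for everybody.
shared = resolve_observations(actors, [("*", "same view")])
# One observation per actor, e.g. partial views of the world.
partial = resolve_observations(
    actors,
    [("my_first_actor_name", "left view"), ("my_second_actor_name", "right view")],
)
```

Strings stand in for Observation instances here so that the sketch runs without the generated protobuf types.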
Actor
Actor implementations look a lot like the environment's. They take an actor session instance and have the same three sections: initialization, event loop and termination.
The event loops in actors' implementations handle three basic types of events:
- observation, produced by the environment and should lead to an action being done.
- rewards, sent by other actors or the environment; they are covered in more detail below.
- messages, sent by other actors or the environment; they are covered in more detail below.
A typical actor implementation would look like this:
- Python
- Javascript
async def driver_actor(actor_session):
    # -- Initialization --
    # Notify that the actor is ready for the trial to start.
    actor_session.start()
    async for event in actor_session.all_events():
        if event.observation:
            # `event.observation` is an instance of the Observation produced by the environment
            observation = event.observation
            if event.type == cogment.EventType.ACTIVE:
                # The trial is active, it is expecting the agent to do an action
                actor_session.do_action(DriverAction(...))
        for reward in event.rewards:
            # `event.rewards` is a list of all the rewards received by the actor (it can be empty)
            # Handle each reward here.
            pass
        for message in event.messages:
            # `event.messages` is a list of all the messages received by the actor (it can be empty)
            # Handle each message here.
            pass
    # -- Termination --
    print(f"Trial [{actor_session.get_trial_id()}] terminated")
For further details, take a look at the cogment.ActorSession class reference.
const driverActor = async (actorSession) => {
  // -- Initialization --
  // Notify that the actor is ready for the trial to start.
  actorSession.start();
  for await (const event of actorSession.eventLoop()) {
    if (event.observation) {
      // `event.observation` is an instance of the Observation produced by the environment
      const observation = event.observation;
      if (event.type === cogment.EventType.ACTIVE) {
        // The trial is active, it is expecting the agent to do an action
        actorSession.sendAction(new DriverAction());
      }
    }
    for (const reward of event.rewards) {
      // `event.rewards` is a list of all the rewards received by the actor (it can be empty)
      // Handle each reward here.
    }
    for (const message of event.messages) {
      // `event.messages` is a list of all the messages received by the actor (it can be empty)
      // Handle each message here.
    }
  }
  // -- Termination --
  console.log(`Trial [${actorSession.getTrialId()}] terminated`);
};
For further details, take a look at the ActorSession class reference.
Service actor / Client actor
A Cogment app can use two types of actors. They are identical in terms of implementation but differ in how they interact with the app's Orchestrator.
Service actors are accessible in the same way the environment is, through a Context instance.
context = cogment.Context(cog_settings=cog_settings, user_id="my_user_id")
context.register_actor(impl=actor, impl_name="driver_actor", actor_classes=["driver"])
context.register_actor(impl=actor_slow, impl_name="driver_actor_slow", actor_classes=["driver"])
await context.serve_all_registered(cogment.ServedEndpoint(port=9000))
Note that it is also through this registration that the implementation is associated with the one or more actor classes it implements.
Client actors, contrary to Service actors, are not served to the Orchestrator. They connect as clients of the Orchestrator and join a trial that has started.
- Python
- Javascript
context = cogment.Context(cog_settings=cog_settings, user_id="my_user_id")
context.register_actor(
    impl=actor,
    impl_name="human_pedestrian",
    actor_classes=["pedestrian"]
)
await context.join_trial(
    trial_id=trial_id,
    # Passed by keyword: a positional argument cannot follow a keyword argument
    endpoint=cogment.Endpoint(url="grpc://orchestrator:9000"),
    actor_name="Alice"
)
const context = new Context(cogSettings, "my_user_id");
context.registerActor(actorImpl, "human_pedestrian", "pedestrian");
await context.joinTrial(trialId, "grpc://orchestrator:9000", "Alice");
Note that a trial including one or more client actors will wait for all of them to join before any actor can start processing events.
Due to the different network requirements, client actors are a good fit when implementing a frontend for human actors.
Actor Availability
There are four actor parameters (see Trial Parameters) to manage actor responses for a trial: initial_connection_timeout, response_timeout, optional and default_action.
The timeouts control when an actor becomes unavailable, and the other two control what happens when it becomes unavailable.
The trial will be hard terminated by the Orchestrator if a required (non-optional) actor becomes unavailable, whereas it can continue if an optional actor becomes unavailable.
The default_action is for optional actors; when the actor is unavailable, the default action will be sent to the environment. If there is no default action defined, the environment is informed that the actor is unavailable, but no action is provided.
For example, if a required actor (with optional set to False) has a response_timeout set to 5.0 seconds, but takes more than 5 seconds to respond to a new observation (e.g. due to excessive computation, or a crash), then the trial will be terminated.
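The interplay between these parameters can be summarized as a small decision function. This is an illustrative sketch only: ActorParams and on_silence are hypothetical names, not SDK classes, and the sketch assumes that a response_timeout of 0 means "wait indefinitely".

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ActorParams:
    # Field names mirror the parameters above; the class itself is hypothetical.
    name: str
    optional: bool = False
    default_action: Optional[str] = None
    response_timeout: float = 0.0  # assumption of this sketch: 0 means no timeout


def on_silence(actor: ActorParams, silent_for: float) -> str:
    """Describe what happens once `actor` has not responded for `silent_for` seconds."""
    timed_out = actor.response_timeout > 0 and silent_for > actor.response_timeout
    if not timed_out:
        # The actor is still considered available.
        return "keep waiting"
    if not actor.optional:
        # A required actor became unavailable.
        return "hard termination"
    if actor.default_action is not None:
        return f"send default action [{actor.default_action}]"
    return "report actor unavailable"


required_driver = ActorParams("ai_driver", response_timeout=5.0)
optional_pedestrian = ActorParams(
    "Alice", optional=True, default_action="stay", response_timeout=5.0
)
outcomes = [on_silence(required_driver, 6.0), on_silence(optional_pedestrian, 6.0)]
```

Note the last case of the "keep waiting" branch: with no timeout, an unresponsive actor never becomes unavailable, which is why the trial-level inactivity limit matters.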
Controller
Trials are started by clients of the Orchestrator using a Controller. Instances of a controller are built from the context instance and connect to an Orchestrator endpoint.
- Python
- Javascript
controller = context.get_controller(
    endpoint=cogment.Endpoint(url="grpc://orchestrator:9000")
)
For further details, take a look at the Controller class reference.
const controller = context.getController("grpc://orchestrator:9000");
For further details, take a look at the Controller class reference.
Start and terminate trials
The controller can then be used to create trials and request their termination.
- Python
- Javascript
trial_id = await controller.start_trial(trial_config=TrialConfig())
# ...
await controller.terminate_trial([trial_id])
const trialId = await controller.startTrial(new TrialConfig());
// ...
await controller.terminateTrial([trialId]);
The default behavior of the Controller's terminate trial function is to trigger a soft termination. An optional hard parameter can be set to True to trigger a hard termination.
Start trial from trial parameters
The above example uses an instance of the user-defined trial configuration to start the trial. This instance is then provided to a pre-trial hook to fully define the trial parameters. It is also possible to fully provide the trial parameters when starting the trial.
actor_1_params = cogment.ActorParameters(
    cog_settings,
    name="Alice",
    class_name="pedestrian",
    endpoint="cogment://client"
)
actor_2_params = cogment.ActorParameters(
    cog_settings,
    name="ai_driver",
    class_name="driver",
    endpoint="grpc://driver_actors:9000",
    implementation="driver_actor_slow"
)
environment_config = EnvironmentConfig(
    # ...
)
trial_params = cogment.TrialParameters(
    cog_settings,
    environment_name="environment",
    environment_implementation="my_environment",
    environment_endpoint="grpc://environment:9000",
    environment_config=environment_config,
    actors=[
        actor_1_params,
        actor_2_params,
    ]
)
For further details, take a look at the cogment.TrialParameters class reference.
Watch trials
The controller can also be used to subscribe to events occurring in the trials run by the Orchestrator it connects to. For example, this can be used to wait for a trial's end:
- Python
- Javascript
async for trial_info in controller.watch_trials(trial_state_filters=[
    cogment.TrialState.ENDED
]):
    print(f"The trial having id [{trial_info.trial_id}] ended")
for await (const trialListEntry of controller.watchTrials([
  cogment.TrialState.ENDED,
])) {
  console.log(`The trial having id ${trialListEntry.getTrialId()} ended.`);
}
Rewards
Creating
Rewards are sent to Actors from another actor or the Environment. The session instance passed to their implementation can be used for this purpose.
session.add_reward(value=-1, confidence=1, tick_id=-1, to=['an_actor_name'])
Rewards consist of an arbitrary numerical value describing how the reward "sender" believes the actor performed. It is weighted by a value between 0 and 1 qualifying the confidence of the "sender" in its reward, from a very low confidence just above 0 to a very high confidence approaching 1. The confidence value is used to collate all the rewards sent to an actor at the same time. Optionally, a reward can be provided with arbitrary user data.
Each reward applies to a list of recipients (either all the actors, all the actors of a given class or a specific actor) at a specific point in time, during the trial, defined as a tick. The tick ID can represent a past action if the value is smaller than the current tick ID ("future" actions are not allowed). Past actions, like sending a reward related to a past tick ID, are handled in accordance with the nb_buffered_ticks trial parameter.
The full documentation for session.add_reward can be found here.
Consuming
All the Rewards that are sent and destined to each specific actor for a given point in time are collated together by the framework.
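The exact collation formula is not spelled out in this guide. As a rough illustration, the sketch below assumes the aggregated value is a confidence-weighted mean of the individual source rewards; SourceReward and collate are hypothetical stand-ins and not the SDK's own types.

```python
from dataclasses import dataclass


@dataclass
class SourceReward:
    # Hypothetical stand-in for one reward sent by one "sender".
    value: float
    confidence: float  # expected to be in (0, 1]
    sender: str


def collate(sources):
    """Confidence-weighted mean of the source rewards sent for one tick.

    Assumption of this sketch: this is how rewards are aggregated.
    """
    total_confidence = sum(source.confidence for source in sources)
    if total_confidence <= 0:
        raise ValueError("at least one reward with a positive confidence is required")
    weighted_sum = sum(source.value * source.confidence for source in sources)
    return weighted_sum / total_confidence


sources = [
    SourceReward(value=-1.0, confidence=1.0, sender="environment"),
    SourceReward(value=1.0, confidence=0.5, sender="pedestrian_0"),
]
# The confident negative reward outweighs the hesitant positive one.
aggregated = collate(sources)
```

Under this assumption, a sender that is unsure of its judgement can lower its confidence instead of shrinking the reward value itself.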
The actor can take into account the reward directly as the trial is running by consuming the "reward" event in their event loop.
- Python
- Javascript
async for event in actor_session.all_events():
    # [...]
    for reward in event.rewards:
        # `reward.tick_id` is the id of the tick this reward concerns.
        tick_id = reward.tick_id
        # `reward.value` is the aggregated value of the reward.
        value = reward.value
        for source_reward in reward.all_sources():
            # Iterate over individual source rewards.
            reward_value = source_reward.value
            reward_confidence = source_reward.confidence
            reward_sender = source_reward.sender
            reward_user_data = source_reward.user_data
for await (const event of actorSession.eventLoop()) {
  // [...]
  for (const reward of event.rewards) {
    // `reward.tickId` is the id of the tick this reward concerns.
    const tickId = reward.tickId;
    // `reward.value` is the aggregated value of the reward.
    const value = reward.value;
    for (const sourceReward of reward.sourcesList) {
      // Iterate over individual source rewards.
      const rewardValue = sourceReward.value;
      const rewardConfidence = sourceReward.confidence;
      const rewardSender = sourceReward.senderName;
      const rewardUserData = sourceReward.userData;
    }
  }
}
Messages
Creating
Messages can be created and sent between actors or the environment within a trial using their session instance.
- Python
- Javascript
session.send_message(
    user_data=MyProtobufDataStructure(...),  # any protobuf data structure can be used here.
    to=['pedestrian:*'],  # send the message to all the actors of the "pedestrian" class
)
The full documentation for session.send_message can be found here.
const message = new Message();
message.setRequest("hello");
// Now we serialize into an Any protobuf message.
const anyPb = new MyProtobufDataStructure();
anyPb.pack(message.serializeBinary(), "cogment_app.Message");
actorSession.sendMessage({
  from: actorName,
  payload: anyPb,
  to: "otherActorName",
  trialId,
});
The full documentation for actorSession.sendMessage can be found here.
Messages consist of an arbitrary payload, their user_data/payload, defined as an instance of any protobuf data structure.
A message can be sent to one, many or all actors in a trial and / or to the environment.
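To make the addressing concrete, here is a minimal sketch of how a list of destinations could be expanded into actor names. resolve_recipients is a hypothetical helper, not an SDK function. It assumes three kinds of patterns: "*" for all actors, "class_name:*" for all actors of a class (as in the 'pedestrian:*' example above) and a plain actor name. Addressing the environment is left out of the sketch.

```python
def resolve_recipients(to, actor_classes):
    """Return the sorted actor names matched by the `to` patterns.

    `actor_classes` maps each actor name to its actor class.
    Hypothetical helper; the pattern semantics are assumptions of this sketch.
    """
    matched = set()
    for pattern in to:
        if pattern == "*":
            # Every actor in the trial.
            matched.update(actor_classes)
        elif pattern.endswith(":*"):
            # Every actor of the given class.
            class_name = pattern[:-2]
            matched.update(
                name for name, cls in actor_classes.items() if cls == class_name
            )
        elif pattern in actor_classes:
            # A single actor, addressed by name.
            matched.add(pattern)
        else:
            raise ValueError(f"unknown recipient [{pattern}]")
    return sorted(matched)


trial_actors = {"Alice": "pedestrian", "Bob": "pedestrian", "ai_driver": "driver"}
pedestrians = resolve_recipients(["pedestrian:*"], trial_actors)
everyone = resolve_recipients(["*"], trial_actors)
```

Using a set means an actor matched by several patterns still receives the message once in this sketch.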
Consuming
All the messages that are sent and intended for each specific actor or environment will be received by the target actor or environment.
Actors or the environment can use the messages directly, live, as the trial is running, by consuming message events in their event loop.
- Python
- Javascript
async for event in actor_session.all_events():
    # [...]
    for message in event.messages:
        # `message.sender_name` is the name of the actor who sent a message
        sender_name = message.sender_name
        # `message.payload` is the content of the message, it needs to be unpacked
        payload = message.payload
for await (const event of actorSession.eventLoop()) {
  // [...]
  for (const message of event.messages) {
    // `message.sender` is the name of the actor who sent a message
    const sender = message.sender;
    // `message.data` is the content of the message, it needs to be unpacked
    const data = message.data;
    const newMessage = data.unpack(
      (x: Uint8Array) => Message.deserializeBinary(x),
      "cogment_app.Message"
    );
  }
}
Pre-trial hook
When starting a trial from a controller, an instance of the message type defined in trial:config_type can be provided. This instance is then passed to the pre-trial hooks that were registered when the Orchestrator was started. The role of these hooks is to fully parametrize the trial based on the provided config. To achieve that, they can modify the default trial params defined in the parameters to specify the environment (i.e. its endpoint, implementation name & configuration), the number and parameters of the participant actors (i.e. their name, class, endpoint, implementation name & configuration) as well as additional parameters for the trial. The pre-trial hook can therefore be used to dynamically configure trials, to act as a service endpoint registry, or as a load balancer.
Pre-trial hook implementations are registered in the same way the environment or actor implementations are and follow the same session pattern.
async def my_pre_trial_hook(pre_hook_session):
    # The trial config provided by the controller can be retrieved like this
    trial_config = pre_hook_session.trial_config
    # The trial params can be edited directly
    pre_hook_session.environment_config = # [...]
    pre_hook_session.environment_endpoint = "grpc://my_environment:9000"
    pre_hook_session.actors = [
        {
            "name": "my_first_actor_name",
            "actor_class": "driver",
            "endpoint": "grpc://driver:9000",
            "implementation": "driver_actor",
            "config": # [...],
        },
        {
            "name": "my_second_actor_name",
            "actor_class": "pedestrian",
            "endpoint": "cogment://client",
            "config": # [...],
        },
    ]
    # And finally should be validated
    pre_hook_session.validate()
context.register_pre_trial_hook(impl=my_pre_trial_hook)
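As an illustration of the load balancer use case, the sketch below hands out actor endpoints in round-robin order and builds the same kind of actor dictionary as in the hook above. EndpointBalancer, build_driver_params and the driver_0 / driver_1 endpoints are all hypothetical; nothing here calls the Cogment SDK.

```python
import itertools


class EndpointBalancer:
    """Hand out service endpoints in round-robin order (hypothetical helper)."""

    def __init__(self, endpoints):
        if not endpoints:
            raise ValueError("at least one endpoint is required")
        self._cycle = itertools.cycle(endpoints)

    def next_endpoint(self):
        return next(self._cycle)


# Hypothetical replicas of the same driver actor service.
driver_endpoints = EndpointBalancer(["grpc://driver_0:9000", "grpc://driver_1:9000"])


def build_driver_params(name):
    # Same dictionary shape as the actors listed in the hook above.
    return {
        "name": name,
        "actor_class": "driver",
        "endpoint": driver_endpoints.next_endpoint(),
        "implementation": "driver_actor",
    }


# Three successive trials get their driver from alternating replicas.
assigned = [build_driver_params(f"driver_{i}")["endpoint"] for i in range(3)]
```

Inside a real hook, the result of build_driver_params would be placed in pre_hook_session.actors before calling validate.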
Trial lifetime
Over their lifetime trials can be in different states managed by the Orchestrator. The current state of the trial can be accessed by a Controller.
Possible Trial States are defined as:
- cogment.TrialState in the Python SDK.
- cogment.TrialState in the Javascript SDK.
- cogmentAPI.TrialState in the gRPC API.
Trial start
The trial starts with a request from a Controller to the Orchestrator. At creation the trial is in the INITIALIZING state.
If no trial params are provided at this stage, registered Pre-trial hooks are called in sequence. The trial is now fully initialized and becomes PENDING as it waits for all the components, actors and environment, to be available.
Once all the non-optional actors are available and the environment sends the initial observation set, the trial becomes RUNNING. If at least one non-optional actor is unavailable, it terminates right away and becomes ENDED.
Trial run
As long as the trial is RUNNING, the Orchestrator executes steps:
- To each actor, the Orchestrator sends an event, having the ACTIVE type, containing its observation,
- The Orchestrator then waits to receive one action per actor,
- To the environment, the Orchestrator sends an event, having the ACTIVE type, containing the set of actions,
- The Orchestrator then waits to receive the set of observations from the environment.
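The steps above can be mimicked with a toy loop built on asyncio. This is not the Orchestrator's implementation, only a sketch of the ordering: run_steps, the two actors and the traffic_light environment are hypothetical, and for simplicity every actor receives the same observation.

```python
import asyncio


async def run_steps(actors, environment, initial_observation, nb_steps):
    """Toy stand-in for the Orchestrator's step loop (not the real implementation)."""
    observation = initial_observation
    history = []
    for tick_id in range(nb_steps):
        # Send the observation to every actor, then wait for one action per actor.
        actions = await asyncio.gather(
            *(actor(observation) for actor in actors.values())
        )
        action_set = dict(zip(actors, actions))
        # Send the set of actions to the environment, then wait for the observations.
        observation = await environment(action_set)
        history.append((tick_id, action_set, observation))
    return history


async def driver(observation):
    return "brake" if observation == "red" else "accelerate"


async def pedestrian(observation):
    return "cross" if observation == "red" else "wait"


async def traffic_light(action_set):
    # Hypothetical environment: the light turns red once the driver accelerates.
    return "red" if action_set["ai_driver"] == "accelerate" else "green"


history = asyncio.run(
    run_steps({"ai_driver": driver, "Alice": pedestrian}, traffic_light, "green", nb_steps=2)
)
```

The loop only advances once every actor has produced an action, which is why a single unresponsive actor can stall a step.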
Trial end
Trial ended by the environment
This is the normal way for a trial to end.
- The environment sends the final observation set, using the session's end method.
- The trial state is set to TERMINATING.
- Observations are sent to the actors with the event type ENDING. Rewards and messages sent before that point will be sent to their destination; later ones will not.
- Once all the last events are received, the Orchestrator sends an event of type FINAL to all the components.
- The trial becomes ENDED.
Soft termination
Soft termination can be triggered either by the user, using the Controller, or after a maximum number of steps, if the trial parameter max_steps is set.
- The Orchestrator waits until it receives the next action set from the actors.
- The trial state is set to TERMINATING.
- Actions are sent to the environment with the event type ENDING.
- From this point on the flow is the same as when the trial is ended by the environment.
After a soft termination is initiated (i.e. after receiving an event of type ENDING), the next set of observations sent by the environment will always be the last one (i.e. using end or produce_observations will have the same behavior).
Hard termination
Hard termination can be triggered in multiple ways:
- By the user, using the Controller.
- Because of unavailable actors. If a required (non-optional) actor becomes unavailable (for any reason), the trial will be terminated (see Actor Availability).
- Because of inactivity. If a component (actor, environment, pre-trial hook, etc.) does not respond, the trial will be waiting and be inactive. If the trial is inactive for too long (see the max_inactivity parameter), it will be terminated. With actor components, this may happen when the timeout is too long (or indefinite): the actor then never becomes unavailable (see Actor Availability) and the trial keeps waiting for it.
- Because of a critical error.
In all these cases:
- The trial becomes TERMINATING.
- The Orchestrator sends an event of type FINAL to all the components.
- The trial becomes ENDED.
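Taken together, the start, run and end flows describe a small state machine. The sketch below is one reading of this section, not the SDK's definition: State only lists the states named here (the actual TrialState enum may define more), and it assumes a hard termination can move a trial to TERMINATING from any state before ENDED.

```python
from enum import Enum


class State(Enum):
    # Only the states named in this section; the SDK enum may define more.
    INITIALIZING = "INITIALIZING"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    TERMINATING = "TERMINATING"
    ENDED = "ENDED"


# Transitions as read from the text above (an interpretation, not a specification).
ALLOWED = {
    State.INITIALIZING: {State.PENDING, State.TERMINATING},
    # PENDING -> ENDED covers a non-optional actor being unavailable at start.
    State.PENDING: {State.RUNNING, State.ENDED, State.TERMINATING},
    State.RUNNING: {State.TERMINATING},
    State.TERMINATING: {State.ENDED},
    State.ENDED: set(),
}


def is_valid_path(states):
    """Check that each consecutive pair of states is an allowed transition."""
    return all(
        following in ALLOWED[current]
        for current, following in zip(states, states[1:])
    )


normal_trial = [
    State.INITIALIZING,
    State.PENDING,
    State.RUNNING,
    State.TERMINATING,
    State.ENDED,
]
normal_is_valid = is_valid_path(normal_trial)
```

A check like this can be handy when logging the states reported by watch_trials, for example to flag a trial that appears to skip TERMINATING.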