Skip to main content

Development guide

This document gives a high level view of what it is to develop a Cogment based application using Cogment SDKs. It assumes the reader is familiar with the Cogment core concepts.

note

Some features aren't available in Javascript, if there's only Python examples under a certain section, that feature is python only.

The High-level Cogment API expects users to use protocol buffers to declare a project's data structures. The intricacies of protobufs are beyond the scope of this document. Basic knowledge of the technology and its usage is assumed.

The spec file

The spec file, usually named cogment.yaml, defines the specifics of a type of trials for a Cogment app: actor classes as well as environment & trial configuration types. It is the starting point for a project.

An actor class is primarily defined by its observation space and action space.

The data structures describing these spaces are declared by using a protocol buffer message type. Observations and actions will simply be instances of the matching type.

For example, in the following, driver and pedestrian share a common view of the environment, hence use the same observation space, but have different actions available to them.

import:
proto:
- city.proto

actors:
driver:
observation:
space: city.Observation

action:
space: city.DriverAction

pedestrian:
observation:
space: city.Observation

action:
space: city.PedestrianAction
tip

This shows only the relevant part of the full spec file, you can find the full list of configurable options in the reference page.

Compiling the spec file

In order to use the spec file within the various supported languages, it needs to be compiled. This is done by the code generation modules of the specific language's SDK you are using.

For the python SDK, the generation step requires an extra from the cogment package. Using pip you can install the SDK with its extra using:

pip install cogment[generate]

The generation can then be executed using:

python -m cogment.generate cogment.yaml

This will create a cog_settings.py file as well as multiple *_pb.py files.

Environment

Environments are implemented by functions that take a environment session instance.

This function will be called once for each trial. This function usually consists of three sections.

  • The environment's initialization, where its internal state can be initialized and processes started. It ends with the sending of the initial observations to the actors participating in the trial.
  • Its event loop, where the environment iterates through the events occurring during the trial and produces observations as well as receives messages. In this loop the environment can end the trial on its own or the end can be requested, see the Trial lifetime section for further information.
  • Its termination, where cleanup occurs.

In the common case where all actors within a trial share the same observation, a bare-minimum environment service would look like this:

async def environment(environment_session):
# -- Initialization --

# Retrieve the actors participating in the trial
actors = environment_session.get_active_actors()

# Start the trial and send a starting observation to all actors
environment_session.start([("*", Observation())])

# -- Event loop --
async for event in environment_session.all_events():
if event.actions:
# `event.actions` is a list of the actions done by the actors
actions = event.actions
if event.type == cogment.EventType.ACTIVE:
# The trial is active, produce an observation in response to the actions
environment_session.produce_observations([("*", Observation())])
# Alternatively the environment can decide to **end** the trial with the following
# environment_session.end([("*", Observation())])
else:
# The trial termination has been requested
# Produce a final observation
environment_session.end([("*", Observation())])

for message in event.messages:
# `event.messages` is a list of all the messages received by the environment (it can be empty)

# Handle each message here.

# -- Termination --
print(f"Trial [{environment_session.get_trial_id()}] terminated")

For further details, take a look at the cogment.EnvironmentSession class reference.

This environment implementation needs to be registered and served so that the Orchestrator can reach it. This can be done through a Context instance.

context = cogment.Context(user_id="my_user_id", cog_settings=cog_settings)

context.register_environment(impl=environment, impl_name="my_environment")

await context.serve_all_registered(cogment.ServedEndpoint(port=9000))

For further details, take a look at the cogment.Context class reference.

Sending observations

The environment session has 3 different methods able to send observations: start, produce_observations and end. Each of those methods takes a list of 2-tuples destination / observation.

As demonstrated above, sending the same observation to all actors is done using "*" as the destination.

environment_session.produce_observations([("*", Observation(...))])

It is also possible to send different observations to different actors. This can be useful to send observations of the world from the point of view of the actor or to send partial observations.

environment_session.produce_observations([
("my_first_actor_name", Observation(...)),
("my_second_actor_name", Observation(...))
])

Please note that the environment should always send observations such as each actor in the trial receives one.

Actor

Actors implementations look a lot like the environment's. They take actor session instance and have the same three sections: initialization, event loop and termination.

The event loops in Actors' implementations handle three basic types of events:

  • observation produced by the environment and should lead to an action being done.
  • rewards sent by other actors or the environment, we'll talk about them in more details below.
  • messages sent by other actors or the environment, we'll talk about them in more details below.

A typical actor implementation would look like this:

async def driver_actor(actor_session):
# -- Initialization --

# Notify that the actor is ready for the trial to start.
actor_session.start()

async for event in actor_session.all_events():
if event.observation:
# `event.observation` is an instance of the Observation produced by the environment
observation = event.observation
if event.type == cogment.EventType.ACTIVE:
# The trial is active, it is expecting the agent to do an action
actor_session.do_action(DriverAction(...))

for reward in event.rewards:
# `event.rewards` is a list of all the rewards received by the actor (it can be empty)

# Handle each reward here.

for message in event.messages:
# `event.messages` is a list of all the messages received by the actor (it can be empty)

# Handle each message here.

# -- Termination --
print(f"Trial [{actor_session.get_trial_id()}] terminated")

For further details, take a look at the cogment.ActorSession class reference.

Service actor / Client actor

A Cogment app can use two types of actors, they are identical in terms of implementation but differ in how they interact with the app's Orchestrator.

Service actors are accessible in the same way the environment is, through a Context instance.

context = cogment.Context(cog_settings=cog_settings, user_id="my_user_id")
context.register_actor(impl=actor, impl_name="driver_actor", actor_classes=["driver"])
context.register_actor(impl=actor_slow, impl_name="driver_actor_slow", actor_classes=["driver"])

await context.serve_all_registered(cogment.ServedEndpoint(port=9000))

Note that it is also through this registrating that the implementation is associated with one or more actor classes it implements.

Client actors, contrary to Service actors, are not served to the Orchestrator. They connect as clients of the Orchestrator and join a trial that has started.

context = cogment.Context(cog_settings=cog_settings, user_id="my_user_id")
context.register_actor(
impl=actor,
impl_name="human_pedestrian",
actor_classes=["pedestrian"]
)

await context.join_trial(
trial_id=trial_id,
cogment.Endpoint(url="grpc://orchestrator:9000"),
actor_name="Alice"
)

Note that a trial including one or more client actors will wait for all of them to join before any actor can start processing events.

Due to the different network requirements, client actors are a good fit when implementing a frontend for human actors.

Actor Availability

There are four actor parameters (see Trial Parameters) to manage actor responses for a trial: initial_connection_timeout, response_timeout, optional and default_action. The timeouts control when an actor becomes unavailable, and the other two control what happens when it becomes unavailable.

The trial will be hard terminated by the Orchestrator if a required (non-optional) actor becomes unavailable, whereas it can continue if an optional actor becomes unavailable.

The default_action is for optional actors; when the actor is unavailable, the default action will be sent to the environment. If there is no default action defined, the environment is informed that the actor is unavailable, but no action is provided.

E.g. If a required actor (with optional set to False) has a response_timeout set to 5.0 seconds, but takes more than 5 seconds to respond to a new observation (e.g. due to excessive computation, or a crash), then the trial will be terminated.

Controller

Trials are started by clients of the Orchestrator using a Controller. Instances of a controller are built from the context instance and connect to an Orchestrator endpoint.

controller = context.get_controller(
endpoint=cogment.Endpoint(url="grpc://orchestrator:9000")
)

For further details, take a look at the Controller class reference.

Start and terminate trials

The controller can then be used to create trials and request their termination.

trial_id = await controller.start_trial(trial_config=TrialConfig())

# ...

await controller.terminate_trial([trial_id])
tip

The default behavior of the Controller's terminate trial function is to trigger a soft termination, an optional hard parameters can be set to True to trigger a hard termination.

Start trial from trial parameters

The above example uses an instance of the user-defined trial configuration to start the trial. This instance is then provided to a pre-trial hook to fully define the trial parameters. It is also possible to fully provide the trial parameters when starting the trial.

actor_1_params = cogment.ActorParameters(
cog_settings,
name="Alice",
class_name="pedestrian",
endpoint="cogment://client"
)
actor_2_params = cogment.ActorParameters(
cog_settings,
name="ai_driver",
class_name="driver",
endpoint="grpc://driver_actors:9000",
implementation="driver_actor_slow"
)

environment_config=EnvironmentConfig(
# ...
)

trial_params=cogment.TrialParameters(
cog_settings,
environment_name="environment",
environment_implementation="my_environment"
environment_endpoint="grpc://environment:9000",
environment_config=environment_config,
actors=[
actor_1_params,
actor_2_params,
]
)

For further details, take a look at the cogment.TrialParameters class reference.

Watch trials

The controller can also be used to subscribe to events occuring in the trials run by the Orchestrator it connects to. For example, this can be used to wait for a trial's end:

async for trial_info in controller.watch_trials(trial_state_filters=[
cogment.TrialState.ENDED
]):
print(f"The trial having id [{trial_info.trial_id}] ended")

Rewards

Creating

Rewards are sent to Actors from another actor or the Environment. The session instance passed to their implementation can be used for this purpose.

session.add_reward(value=-1, confidence=1, tick_id=-1, to=['an_actor_name'])

Rewards consist of an arbitrary numerical value describing how the reward "sender" believes the actor performed. It is weighted by a value between 0 and 1 qualifying the confidence of the "sender" in its reward, from a very low confidence just above 0 to a very high confidence approaching 1. The confidence value is used to collate all the rewards sent to an actor at the same time. Optionally, a reward can be provided with arbitrary user data.

Each reward applies to a list of recipients (either all the actors, all the actors of a given class or a specific actor) at a specific point in time, during the trial, defined as a tick. The tick ID can represent a past action if the value is smaller than the current tick ID ("future" actions are not allowed). Past actions, like sending a reward related to a past tick ID, are handled in accordance with the nb_buffered_ticks trial parameter.

The full documentation for session.add_reward can be found here.

Consuming

All the Rewards that are sent and destined to each specific actor for a given point in time are collated together by the framework.

The actor can take into account the reward directly as the trial is running by consuming the "reward" event in their event loop.

async for event in actor_session.all_events():
# [...]
for reward in event.rewards:
# `reward.tick_id` is the id of the tick this reward concerns.
tick_id = reward.tick_id
# `reward.value` is the aggregated value of the reward.
value = reward.value
for source_reward in reward.all_sources():
# Iterate over individual source rewards.
reward_value = source_reward.value
reward_confidence = source_reward.confidence
reward_sender = source_reward.sender
reward_user_data = source_reward.user_data

Messages

Creating

Messages can be created and sent between actors or the environment within a trial using their session instance.

session.send_message(
user_data=MyProtobufDataStructure(...), # any protobuf data structure can be used here.
to=['pedestrian:*'], # send the message to all the actors of the "pedestrian" class
)

The full documentation for session.send_message can be found here.

Messages consist of an arbitrary payload, their user_data/payload, defined as an instance of any protobuf data structure.

A message can be sent to one, many or all actors in a trial and / or to the environment.

Consuming

All the messages that are sent and intended for each specific actor or environment will be received by the target actor or environment.

Actors or the environment can use the message directly, live, as the trial is running, by consuming message event in their event loop.

async for event in actor_session.all_events():
# [...]
for message in event.messages:
# `message.sender_name` is the name of the actor who sent a message
sender_name = message.sender_name
# `message.payload` is the content of the message, it needs to be unpacked
payload = message.payload

Pre-trial hook

When starting a trial from a controller, an instance of the message type defined in trial:config_type can be provided. This instance is then passed to the registered Pre trial hooks when the Orcehstrator was started. The role of these hooks is to fully parametrize the trial based on the provided config. To achieve that, they can modify the default trial params defined in the parameters to specify the environment (i.e. its endpoint, implementation name & configuration), the number and parameters of the participant actors (i.e. their name, class, endpoint, implementation name & configuration) as well as additional parameters for the trial. The pre-trial hook can therefore be used to dynamically configure trials, to act as a service endpoint registry, or a load balancer.

Pre-trial hook implementations are registered in the same way the environment or actor implementation are and follow the same session pattern.

async def my_pre_trial_hook(pre_hook_session):
# The trial config provided by the controller can be retrieved like that
trial_config = pre_hook_session.trial_config
# The trial params can be edited directly
pre_hook_session.environment_config = # [...]
pre_hook_session.environment_endpoint = "grpc://my_environment:9000"
pre_hook_session.actors = [
{
"name": "my_first_actor_name",
"actor_class": "driver",
"endpoint": "grpc://driver:9000",
"implementation": "driver_actor",
"config": # [...],
},
{
"name": "my_second_actor_name",
"actor_class": "predestrian",
"endpoint": "cogment://client",
"config": # [...],
},
]
# And finally should be validated
pre_hook_session.validate()

context.register_pre_trial_hook(impl=my_pre_trial_hook)

Trial lifetime

Over their lifetime trials can be in different states managed by the Orchestrator. The current state of the trial can be accessed by a Controler.

Possible Trial States are defined as:

Trial start

The trial starts with a request from a Controller to the Orchestrator. At creation the trial is in the INITIALIZING state.

If no trial params are provided at this stage, registered Pre-trial hooks are called in sequence. The trial is now fully initialized and becomes PENDING as it wait for all the components, actors and environment to be available.

Once all the non-optional actors are available and the environment sends the initial observation set, the trial becomes RUNNING. If at least one non-optional actor is unavailable, it terminates right away and becomes ENDED.

Trial run

As long as the trial is RUNNING, the Orchestrator executes steps:

  • To each actor, the Orchestrator sends an event, having the ACTIVE type, containing its observation,
  • The Orchestrator then waits to receive one action per actor,
  • To the environment, the Orchestrator sends an event, having the ACTIVE type, containing the set of actions,
  • The Orchestrator then waits to receive the set of observations from the environment.

Trial end

Trial ended by the environment

This is the normal way for a trial to end.

  1. The environment sends the final observation set, using the session's end method.
  2. The trial state is set to TERMINATING.
  3. Observations are sent to the actors with the event type ENDING. Rewards and messages sent before that point will sent to their destination; later ones will not.
  4. Once all the last events are received, the Orchestrator sends a event of type FINAL to all the components.
  5. The trial becomes ENDED.

Soft termination

Soft termination can be triggered either by the user, using the Controller or after a maximum number of steps, if the trial Parameter max_steps is set.

  1. The Orchestrator waits until it receives the next action set from the actors.
  2. The trial state is set to TERMINATING.
  3. Actions are sent to the environment with the event type ENDING.
  4. From this point on the flow is the same as when the trial is ended by the environment.
note

After a soft termination is initiated (i.e. after receiving an event of type ENDING), the next set of observations sent by the environment will always be the last one (i.e. using end or produce_observation will have the same behavior).

Hard termination

Hard termination can be triggered in multiple ways:

  • By the user, using the Controller.
  • Because of unavailable actors. If a required (non-optional) actor becomes unavailable (for any reason), the trial will be terminated (see Actor Availability).
  • Because of inactivity. If a component (actor, environment, pre-trial hook, etc) does not respond, the trial will be waiting and be inactive. If the trial is inactive for too long (see Parameter max_inactivity), it will be terminated. With actor components this may happen when the timeout is too long (or indefinite) then the actor never becomes unavailable (see Actor Availability) and the trial will keep waiting for it.
  • Because of a critical error.

In all this cases:

  1. The trial becomes TERMINATING.
  2. The Orchestrator sends a event of type FINAL to all the components.
  3. The trial becomes ENDED.