Development guide

This document gives a high level view of what it is to develop a Cogment based application using Cogment SDKs. It assumes the reader is familiar with the Cogment core concepts.

note

Some features aren't available in JavaScript. If a section only has Python examples, the feature it describes is Python-only.

The high-level Cogment API expects users to declare a project's data structures with protocol buffers. The intricacies of protobufs are beyond the scope of this document; basic knowledge of the technology and its usage is assumed.

The spec file

The spec file, usually named cogment.yaml, defines the specifics of a type of trial for a Cogment app: the actor classes as well as the environment & trial configuration types. It is the starting point for a project.

An actor class is primarily defined by its observation space and action space.

The data structures describing these spaces are declared by using a protocol buffer message type. Observations and actions will simply be instances of the matching type.

For example, in the following, driver and pedestrian share a common view of the environment, hence use the same observation space, but have different actions available to them.

import:
  proto:
    - city.proto

actors:
  driver:
    observation:
      space: city.Observation

    action:
      space: city.DriverAction

  pedestrian:
    observation:
      space: city.Observation

    action:
      space: city.PedestrianAction
tip

This shows only the relevant part of the spec file. The full list of configurable options is in the reference page.

Compiling the spec file

In order to use the spec file within the various supported languages, it needs to be compiled. This is done by the code generation modules of the specific language's SDK you are using.

For the Python SDK, the generation step requires an extra from the cogment package. Using pip, you can install the SDK together with this extra:

pip install cogment[generate]

The generation can then be executed using:

python -m cogment.generate cogment.yaml

This will create a cog_settings.py file as well as multiple *_pb2.py files, one for each imported proto file.

Environment

Environments are implemented by functions that take an environment session instance.

This function is called once for each trial and usually consists of three sections.

  • The environment's initialization, where its internal state can be initialized and processes started. It ends with the sending of the initial observations to the actors participating in the trial.
  • Its event loop, where the environment iterates through the events occurring during the trial and produces observations as well as receives messages. In this loop the environment can end the trial on its own or the end can be requested by a controller.
  • Its termination, where cleanup occurs.

In the common case where all actors within a trial share the same observation, a bare-minimum environment service would look like this:

async def environment(environment_session):
    # -- Initialization --

    # Retrieve the actors participating in the trial
    actors = environment_session.get_active_actors()

    # Start the trial and send a starting observation to all actors
    environment_session.start([("*", Observation())])

    # -- Event loop --
    async for event in environment_session.all_events():
        if event.actions:
            # `event.actions` is a list of the actions done by the actors
            actions = event.actions
            if event.type == cogment.EventType.ACTIVE:
                # The trial is active, produce an observation in response to the actions
                environment_session.produce_observations([("*", Observation())])
                # Alternatively the environment can decide to **end** the trial with the following
                # environment_session.end([("*", Observation())])
            else:
                # The trial termination has been requested by an external controller
                # Produce a final observation
                environment_session.end([("*", Observation())])

        for message in event.messages:
            # `event.messages` is a list of all the messages received by the environment (it can be empty)

            # Handle each message here.
            pass

    # -- Termination --
    print(f"Trial [{environment_session.get_trial_id()}] terminated")

For further details, take a look at the cogment.EnvironmentSession class reference.

This environment implementation needs to be registered and served so that the orchestrator can reach it. This can be done through a Context instance.

context = cogment.Context(user_id="my_user_id", cog_settings=cog_settings)

context.register_environment(impl=environment, impl_name="my_environment")

await context.serve_all_registered(cogment.ServedEndpoint(port=9000))

For further details, take a look at the cogment.Context class reference.

Sending observations

The environment session has three methods able to send observations: start, produce_observations and end. Each of these methods takes a list of (destination, observation) 2-tuples.

As demonstrated above, sending the same observation to all actors is done using "*" as the destination.

environment_session.produce_observations([("*", Observation(...))])

It is also possible to send different observations to different actors. This can be useful to send observations of the world from the point of view of the actor or to send partial observations.

environment_session.produce_observations([
    ("my_first_actor_name", Observation(...)),
    ("my_second_actor_name", Observation(...))
])

Please note that the environment should always send observations such that each actor in the trial receives one.
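The rule above can be checked before sending. The following is a minimal sketch in plain Python, not part of the Cogment SDK: the helper name uncovered_actors is hypothetical, and it assumes destinations are either "*" or a single actor name, as in the examples of this section.

```python
def uncovered_actors(actor_names, observations):
    """Return the actors that would receive no observation.

    `observations` is a list of (destination, observation) 2-tuples, where the
    destination is assumed to be either "*" (every actor) or one actor name.
    """
    destinations = {destination for destination, _ in observations}
    if "*" in destinations:
        return []
    return [name for name in actor_names if name not in destinations]


actors = ["my_first_actor_name", "my_second_actor_name"]

# A wildcard destination covers every actor.
print(uncovered_actors(actors, [("*", "shared view")]))

# Only one actor is addressed, so the other one is reported.
print(uncovered_actors(actors, [("my_first_actor_name", "partial view")]))
```

An environment could run such a check on the list it is about to pass to produce_observations and fail early when it returns a non-empty result.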

Actor

Actor implementations look a lot like the environment's. They take an actor session instance and have the same three sections: initialization, event loop and termination.

The event loops in Actors' implementations handle three basic types of events:

  • observations produced by the environment, which should lead to an action being done.
  • rewards sent by other actors or the environment, covered in more detail below.
  • messages sent by other actors or the environment, covered in more detail below.

A typical actor implementation would look like this:

async def driver_actor(actor_session):
    # -- Initialization --

    # Notify that the actor is ready for the trial to start.
    actor_session.start()

    # -- Event loop --
    async for event in actor_session.all_events():
        if event.observation:
            # `event.observation` is an instance of the Observation produced by the environment
            observation = event.observation
            if event.type == cogment.EventType.ACTIVE:
                # The trial is active, it is expecting the agent to do an action
                actor_session.do_action(DriverAction(...))

        for reward in event.rewards:
            # `event.rewards` is a list of all the rewards received by the actor (it can be empty)

            # Handle each reward here.
            pass

        for message in event.messages:
            # `event.messages` is a list of all the messages received by the actor (it can be empty)

            # Handle each message here.
            pass

    # -- Termination --
    print(f"Trial [{actor_session.get_trial_id()}] terminated")

For further details, take a look at the cogment.ActorSession class reference.

Service actor / Client actor

A Cogment app can use two types of actors. They are identical in terms of implementation but differ in how they interact with the app's Orchestrator.

Service actors are accessible in the same way the environment is, through a Context instance.

context = cogment.Context(cog_settings=cog_settings, user_id="my_user_id")
context.register_actor(impl=actor, impl_name="driver_actor", actor_classes=["driver"])
context.register_actor(impl=actor_slow, impl_name="driver_actor_slow", actor_classes=["driver"])

await context.serve_all_registered(cogment.ServedEndpoint(port=9000))

Note that this registration is also where an implementation is associated with the one or more actor classes it implements.

Client actors, contrary to Service actors, are not served to the orchestrator. They connect as clients of the orchestrator and join a trial that has started.

context = cogment.Context(cog_settings=cog_settings, user_id="my_user_id")
context.register_actor(
    impl=actor,
    impl_name="human_pedestrian",
    actor_classes=["pedestrian"]
)

await context.join_trial(
    trial_id=trial_id,
    # Passed as a keyword: a positional argument cannot follow a keyword argument
    endpoint=cogment.Endpoint(url="grpc://orchestrator:9000"),
    actor_name="Alice"
)

Note that a trial including one or more client actors will wait for all of them to join before any actor can start processing events.

Due to the different network requirements, client actors are a good fit when implementing a frontend for human actors.

Controller

Trials are started by clients of the Orchestrator using a Controller. Instances of a controller are built from the context instance and connect to an Orchestrator endpoint.

controller = context.get_controller(
    endpoint=cogment.Endpoint(url="grpc://orchestrator:9000")
)

For further details, take a look at the Controller class reference.

Start and terminate trials

The controller can then be used to create trials and request their termination.

trial_id = await controller.start_trial(trial_config=TrialConfig())

# ...

await controller.terminate_trial([trial_id])

Start trial from trial parameters

The above example uses an instance of the user-defined trial configuration to start the trial. This instance is then provided to a pre-trial hook to fully define the trial parameters. It is also possible to fully provide the trial parameters when starting the trial.

actor_1_params = cogment.ActorParameters(
    cog_settings,
    name="Alice",
    class_name="pedestrian",
    endpoint="cogment://client"
)
actor_2_params = cogment.ActorParameters(
    cog_settings,
    name="ai_driver",
    class_name="driver",
    endpoint="grpc://driver_actors:9000",
    implementation="driver_actor_slow"
)

environment_config = EnvironmentConfig(
    # ...
)

trial_params = cogment.TrialParameters(
    cog_settings,
    environment_name="environment",
    environment_implementation="my_environment",
    environment_endpoint="grpc://environment:9000",
    environment_config=environment_config,
    actors=[
        actor_1_params,
        actor_2_params,
    ]
)

For further details, take a look at the cogment.TrialParameters class reference.

Watch trials

The controller can also be used to subscribe to events occurring in the trials run by the Orchestrator it connects to. For example, this can be used to wait for a trial's end:

async for trial_info in controller.watch_trials(trial_state_filters=[
    cogment.TrialState.ENDED
]):
    print(f"The trial having id [{trial_info.trial_id}] ended")
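The loop above reports every trial that ends and never stops on its own. To wait for one specific trial, the loop can return as soon as the matching id shows up. The sketch below illustrates the pattern without an Orchestrator: fake_watch_trials and wait_for_end are hypothetical stand-ins, and the states are plain strings rather than cogment.TrialState values.

```python
import asyncio
from types import SimpleNamespace


async def fake_watch_trials(updates, trial_state_filters):
    """Stand-in for a trial watcher: yields only updates in a wanted state."""
    for trial_id, state in updates:
        await asyncio.sleep(0)  # give control back to the event loop
        if state in trial_state_filters:
            yield SimpleNamespace(trial_id=trial_id, state=state)


async def wait_for_end(watcher, trial_id):
    """Return the first update about `trial_id`, or None if the stream ends."""
    async for trial_info in watcher:
        if trial_info.trial_id == trial_id:
            return trial_info
    return None


updates = [("trial-1", "RUNNING"), ("trial-2", "ENDED"), ("trial-1", "ENDED")]
result = asyncio.run(wait_for_end(fake_watch_trials(updates, ["ENDED"]), "trial-1"))
print(result.trial_id, result.state)
```

With a real controller, the same wait_for_end coroutine would presumably receive the async iterator returned by controller.watch_trials instead of the stand-in.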

Rewards

Creating

Rewards are sent to actors from another actor or the environment. The session instance passed to their implementation can be used for this purpose.

session.add_reward(value=-1, confidence=1, tick_id=-1, to=['an_actor_name'])

Rewards consist of an arbitrary numerical value describing how the reward "sender" believes the actor performed. It is weighted by a value between 0 and 1 qualifying the confidence of the "sender" in its reward, from a very low confidence just above 0 to a very high confidence approaching 1. The confidence value is used to collate all the rewards sent to an actor at the same time. Optionally, a reward can be provided with arbitrary user data.
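To make the role of the confidence concrete, here is a minimal sketch of one way rewards sent at the same tick could be collated. It assumes a confidence-weighted mean, which this guide does not spell out, so treat the formula as an assumption rather than the framework's documented behavior. SourceReward and collate are hypothetical names, not SDK classes.

```python
from dataclasses import dataclass


@dataclass
class SourceReward:
    """Hypothetical stand-in for one reward sent by one sender."""

    value: float
    confidence: float  # expected to be between 0 and 1
    sender: str


def collate(sources):
    """Confidence-weighted mean of the source values (an assumed formula)."""
    total_confidence = sum(source.confidence for source in sources)
    if total_confidence == 0:
        raise ValueError("at least one source needs a non-zero confidence")
    weighted_sum = sum(source.value * source.confidence for source in sources)
    return weighted_sum / total_confidence


sources = [
    SourceReward(value=1.0, confidence=1.0, sender="environment"),
    SourceReward(value=-1.0, confidence=0.25, sender="Alice"),
]
aggregated = collate(sources)
print(aggregated)
```

Under this assumption, a confident sender pulls the aggregated value towards its own reward, while a low-confidence reward only nudges it.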

Each reward applies to a list of recipients (either all the actors, all the actors of a given class or a specific actor) at a specific point in time, during the trial, defined as a tick.

The full documentation for session.add_reward can be found here.

Consuming

All the rewards that are sent and destined to each specific actor for a given point in time are collated together by the framework.

An actor can take rewards into account while the trial is running by consuming the reward events in its event loop.

async for event in actor_session.all_events():
    # [...]
    for reward in event.rewards:
        # `reward.tick_id` is the id of the tick this reward concerns.
        tick_id = reward.tick_id
        # `reward.value` is the aggregated value of the reward.
        value = reward.value
        for source_reward in reward.all_sources():
            # Iterate over individual source rewards.
            reward_value = source_reward.value
            reward_confidence = source_reward.confidence
            reward_sender = source_reward.sender
            reward_user_data = source_reward.user_data

Messages

Creating

Messages can be created and sent between actors or the environment within a trial using their session instance.

session.send_message(
    user_data=MyProtobufDataStructure(...),  # any protobuf data structure can be used here.
    to=['pedestrian:*'],  # send the message to all the actors of the "pedestrian" class
)

The full documentation for session.send_message can be found here.

A message carries an arbitrary payload (its user_data), defined as an instance of any protobuf data structure.

A message can be sent to one, many or all actors in a trial and / or to the environment.
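The addressing described above can be pictured as expanding a list of destination patterns into actor names. The sketch below is plain Python and not the SDK's resolution logic. It assumes three pattern forms: "*" for all actors, "<class>:*" for all actors of a class (as in the 'pedestrian:*' example), and a plain actor name. Addressing the environment is left out, and resolve_recipients is a hypothetical helper.

```python
def resolve_recipients(destinations, actors):
    """Expand destination patterns into a sorted list of actor names.

    `actors` maps each actor name to its actor class.
    """
    recipients = set()
    for destination in destinations:
        if destination == "*":
            # Every actor in the trial.
            recipients.update(actors)
        elif destination.endswith(":*"):
            # Every actor of the given class.
            class_name = destination[: -len(":*")]
            recipients.update(
                name for name, actor_class in actors.items() if actor_class == class_name
            )
        elif destination in actors:
            # A single actor, addressed by name.
            recipients.add(destination)
        else:
            raise KeyError(f"unknown destination: {destination!r}")
    return sorted(recipients)


actors = {"Alice": "pedestrian", "Bob": "pedestrian", "ai_driver": "driver"}
print(resolve_recipients(["pedestrian:*"], actors))
print(resolve_recipients(["ai_driver", "Alice"], actors))
```

Collecting recipients in a set means an actor matched by several patterns is listed only once.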

Consuming

All the messages that are sent and intended for each specific actor or environment will be received by the target actor or environment.

Actors or the environment can use messages live, as the trial is running, by consuming the message events in their event loop.

async for event in actor_session.all_events():
    # [...]
    for message in event.messages:
        # `message.sender_name` is the name of the actor who sent a message
        sender_name = message.sender_name
        # `message.payload` is the content of the message, it needs to be unpacked
        payload = message.payload

Pre-trial hook

When starting a trial from a controller, an instance of the message type defined in trial:config_type can be provided. This instance is then passed to the pre-trial hooks that were registered when the Orchestrator was started. The role of these hooks is to fully parametrize the trial based on the provided config. To achieve that, they can modify the default trial params defined in the parameters to specify the environment (i.e. its endpoint, implementation name & configuration), the number and parameters of the participant actors (i.e. their name, class, endpoint, implementation name & configuration) as well as additional parameters for the trial. The pre-trial hook can therefore be used to dynamically configure trials, to act as a service endpoint registry, or as a load balancer.

Pre-trial hook implementations are registered in the same way as environment or actor implementations and follow the same session pattern.

async def my_pre_trial_hook(pre_hook_session):
    # The trial config provided by the controller can be retrieved like this
    trial_config = pre_hook_session.trial_config
    # The trial params can be edited directly
    pre_hook_session.environment_config = # [...]
    pre_hook_session.environment_endpoint = "grpc://my_environment:9000"
    pre_hook_session.actors = [
        {
            "name": "my_first_actor_name",
            "actor_class": "driver",
            "endpoint": "grpc://driver:9000",
            "implementation": "driver_actor",
            "config": # [...],
        },
        {
            "name": "my_second_actor_name",
            "actor_class": "pedestrian",
            "endpoint": "cogment://client",
            "config": # [...],
        },
    ]
    # And finally should be validated
    pre_hook_session.validate()

context.register_pre_trial_hook(impl=my_pre_trial_hook)
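The load balancer role mentioned at the start of this section could be sketched as follows. This is an illustration under stated assumptions, not a reference implementation: the endpoint pool, the actor names and the stand-in session object are all hypothetical, and the hook only relies on the actors list and the validate() call shown in the example above.

```python
import asyncio
import itertools
from types import SimpleNamespace

# Hypothetical pool of endpoints serving the driver implementation.
DRIVER_ENDPOINTS = itertools.cycle(["grpc://driver-0:9000", "grpc://driver-1:9000"])


async def balancing_pre_trial_hook(pre_hook_session):
    """Assign each driver actor to the next endpoint of the pool (round-robin)."""
    for actor in pre_hook_session.actors:
        if actor["actor_class"] == "driver":
            actor["endpoint"] = next(DRIVER_ENDPOINTS)
    pre_hook_session.validate()


# Stand-in session, only there to run the sketch without an Orchestrator.
fake_session = SimpleNamespace(
    actors=[
        {"name": "driver_a", "actor_class": "driver", "endpoint": None},
        {"name": "driver_b", "actor_class": "driver", "endpoint": None},
        {"name": "Alice", "actor_class": "pedestrian", "endpoint": "cogment://client"},
    ],
    validate=lambda: None,
)
asyncio.run(balancing_pre_trial_hook(fake_session))
print([actor["endpoint"] for actor in fake_session.actors])
```

Because the cycle lives at module level, successive trials keep rotating through the pool instead of always starting from the first endpoint.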