Skip to content

Controller

Controllers run at every simulation step. They can observe the state of all microgrids and modify their components. See the Controllers concept page for usage examples.

Controller

Bases: ABC

Abstract base class for all controllers in the simulation.

Controllers are used to monitor the simulation state and to control the behavior of the microgrids. They are executed at every simulation step.

Source code in vessim/controller.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class Controller(ABC):
    """Base interface that every simulation controller implements.

    A controller observes the simulation state and may steer the behavior of
    the microgrids. Controllers are executed at every simulation step.
    """

    def start(self, environment: Environment) -> None:
        """Hook that runs once before the simulation starts.

        The default implementation does nothing. Override it to inspect the
        simulation topology or to do setup work that needs the `Environment`.
        """

    @abstractmethod
    def step(self, now: datetime, microgrid_states: dict[str, MicrogridState]) -> None:
        """Carries out one simulation step; every subclass must implement this.

        Args:
            now: Current datetime in the simulation.
            microgrid_states: Maps microgrid names to their current state.
        """

    def finalize(self) -> None:
        """Hook that runs after the simulation has ended.

        The default implementation does nothing. Override it for clean-up.
        """

start

Executed before the simulation starts.

Can be overridden to inspect the simulation topology or perform initialization that requires access to the Environment.

Source code in vessim/controller.py
58
59
60
61
62
63
def start(self, environment: Environment) -> None:
    """Hook that runs once before the simulation starts.

    The default implementation does nothing. Override it to inspect the
    simulation topology or to do setup work that needs the `Environment`.
    """

step abstractmethod

Performs a simulation step.

Parameters:

Name Type Description Default
now datetime

Current datetime in the simulation.

required
microgrid_states dict[str, MicrogridState]

Maps microgrid names to their current state.

required
Source code in vessim/controller.py
65
66
67
68
69
70
71
72
@abstractmethod
def step(self, now: datetime, microgrid_states: dict[str, MicrogridState]) -> None:
    """Carries out one simulation step; every subclass must implement this.

    Args:
        now: Current datetime in the simulation.
        microgrid_states: Maps microgrid names to their current state.
    """

finalize

Executed after simulation has ended. Can be overridden for clean-up.

Source code in vessim/controller.py
74
75
def finalize(self) -> None:
    """Hook that runs after the simulation has ended.

    The default implementation does nothing. Override it for clean-up.
    """

CsvLogger

Bases: Controller

Controller that logs simulation results to a directory.

Writes two files:

- metadata.yaml: static experiment configuration (run metadata, microgrid topology, actor and dispatchable parameters), written once before the simulation starts.
- timeseries.csv: dynamic state logged at every simulation step.

Parameters:

Name Type Description Default
outdir str | Path

Path to the output directory (created if it doesn't exist).

required
Source code in vessim/controller.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
class CsvLogger(Controller):
    """Controller that logs simulation results to a directory.

    Writes two files:

    - ``metadata.yaml``: static experiment configuration (run metadata, microgrid
      topology, actor and dispatchable parameters) written once before the simulation
      starts and updated with the execution outcome when the simulation ends.
    - ``timeseries.csv``: dynamic state logged at every simulation step.

    Args:
        outdir: Path to the output directory (created if it doesn't exist).
    """

    def __init__(self, outdir: str | Path):
        self.outdir = Path(outdir)
        self.outdir.mkdir(exist_ok=True, parents=True)
        self.csv_path = self.outdir / "timeseries.csv"
        # Column order per microgrid, fixed the first time that microgrid is logged.
        self.fieldnames: dict[str, list] = {}
        self._last_sim_time: Optional[datetime] = None
        # Both are set at the end of a successful start(); until then finalize()
        # must not touch metadata.yaml.
        self._yaml = None
        self._exec_start: Optional[datetime] = None

    def start(self, environment: Environment) -> None:
        """Writes ``metadata.yaml`` with the experiment configuration.

        On top of the configuration built from ``environment``, an ``execution``
        section records the run status, the current git commit hash (``None`` if
        it cannot be determined) and the wall-clock start time.

        Raises:
            ImportError: If 'pyyaml' is not installed.
        """
        try:
            import yaml
        except ImportError as err:
            raise ImportError(
                "CsvLogger requires 'pyyaml'. Install with: pip install pyyaml"
            ) from err

        git_hash: Optional[str] = None
        try:
            import subprocess

            result = subprocess.run(
                ["git", "rev-parse", "HEAD"],
                capture_output=True, text=True, check=True,
            )
            git_hash = result.stdout.strip()
        except Exception:
            # Deliberately best effort: git may be missing or the working directory
            # may not be a repository. The hash then simply stays None.
            pass

        config = _build_experiment_config(environment)
        # Take the timestamp once so the recorded start and the duration computed
        # in finalize() refer to the same instant.
        exec_start = datetime.now()
        config["execution"] = {
            "status": "running",
            "git_hash": git_hash,
            "start": exec_start.isoformat(),
            "end": None,
            "duration": None,
        }

        with (self.outdir / "metadata.yaml").open("w") as f:
            yaml.dump(config, f, default_flow_style=False, sort_keys=False)

        # Only mark the run as started once the metadata file was actually written.
        self._yaml = yaml
        self._exec_start = exec_start

    def step(self, now: datetime, microgrid_states: dict[str, MicrogridState]) -> None:
        """Appends one row per microgrid to ``timeseries.csv``.

        Args:
            now: Current datetime in the simulation.
            microgrid_states: Maps microgrid names to their current state.
        """
        self._last_sim_time = now
        for mg_name, mg_state in microgrid_states.items():
            log_entry = {
                "microgrid": mg_name,
                "time": now,
                **flatten_dict(dict(mg_state)),
            }

            columns = self.fieldnames.get(mg_name)
            if columns is None:
                columns = list(log_entry)
                # Truncate the file only on the very first write of this logger.
                # Opening with "w" for every newly seen microgrid would wipe the
                # rows already written for the other microgrids.
                mode = "a" if self.fieldnames else "w"
                # Skip the header if an identical one is already in the file.
                write_header = columns not in self.fieldnames.values()
                self.fieldnames[mg_name] = columns
            else:
                mode, write_header = "a", False

            with self.csv_path.open(mode, newline="") as csvfile:
                writer = DictWriter(csvfile, fieldnames=columns)
                if write_header:
                    writer.writeheader()
                writer.writerow(log_entry)

    def finalize(self) -> None:
        """Marks the run as completed in ``metadata.yaml``.

        Records end time, duration in seconds and, if at least one step was
        logged, the last simulated time. Does nothing if ``start()`` did not
        complete, so metadata left over from an earlier run is never modified.
        """
        yaml = getattr(self, "_yaml", None)
        exec_start = getattr(self, "_exec_start", None)
        experiment_path = self.outdir / "metadata.yaml"
        if yaml is None or exec_start is None or not experiment_path.exists():
            return

        end = datetime.now()
        with experiment_path.open() as f:
            config = yaml.safe_load(f)
        config["execution"]["status"] = "completed"
        config["execution"]["end"] = end.isoformat()
        config["execution"]["duration"] = round((end - exec_start).total_seconds(), 3)
        if self._last_sim_time is not None:
            config["environment"]["sim_end"] = str(self._last_sim_time)
        with experiment_path.open("w") as f:
            yaml.dump(config, f, default_flow_style=False, sort_keys=False)

MemoryLogger

Bases: Controller

Controller that logs the state of the simulation in memory.

The logged state can be retrieved as a dictionary or a pandas DataFrame. After the simulation, config contains the static experiment metadata (same information that CsvLogger writes to metadata.yaml).

Source code in vessim/controller.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class MemoryLogger(Controller):
    """Controller that keeps the state of every simulation step in memory.

    The collected state is available as a dictionary (``to_dict``) or as a
    pandas DataFrame (``to_df``). Once the simulation has started, ``config``
    holds the static experiment metadata (the same information that
    ``CsvLogger`` writes to ``metadata.yaml``).
    """

    def __init__(self):
        self.config: dict = {}
        self.log: dict[datetime, dict[str, MicrogridState]] = defaultdict(dict)

    def start(self, environment: Environment) -> None:
        """Captures the static experiment configuration in ``config``."""
        self.config = _build_experiment_config(environment)

    def step(self, now: datetime, microgrid_states: dict[str, MicrogridState]) -> None:
        """Stores the states of all microgrids under the timestamp ``now``."""
        self.log[now] = microgrid_states

    def to_dict(self) -> dict[datetime, dict[str, MicrogridState]]:
        """Returns the logged data as a plain dictionary keyed by time."""
        snapshot = dict(self.log)
        return snapshot

    def to_df(self):
        """Returns the logged data as a pandas DataFrame.

        Each (time, microgrid) pair becomes one row with one column per
        flattened state variable; these two fields form the MultiIndex.
        An empty log yields an empty DataFrame without that index.

        Raises:
            ImportError: If 'pandas' is not installed.
        """
        try:
            import pandas as pd
        except ImportError:
            raise ImportError(
                "MemoryLogger.to_df() requires 'pandas'. "
                "Install with: pip install pandas"
            )

        records = []
        for timestamp, states in self.log.items():
            for name, mg_state in states.items():
                record = flatten_dict(mg_state)
                record.update({"time": timestamp, "microgrid": name})
                records.append(record)

        frame = pd.DataFrame(records)
        if frame.empty:
            return frame
        return frame.set_index(["time", "microgrid"])

to_dict

Returns the logged data as a dictionary.

Source code in vessim/controller.py
96
97
98
def to_dict(self) -> dict[datetime, dict[str, MicrogridState]]:
    """Returns the logged data as a plain dictionary keyed by time."""
    snapshot = dict(self.log)
    return snapshot

to_df

Returns the logged data as a pandas DataFrame.

The DataFrame has a MultiIndex (time, microgrid) and columns for each state variable. Requires 'pandas' to be installed.

Source code in vessim/controller.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def to_df(self):
    """Returns the logged data as a pandas DataFrame.

    Each (time, microgrid) pair becomes one row with one column per
    flattened state variable; these two fields form the MultiIndex.
    An empty log yields an empty DataFrame without that index.

    Raises:
        ImportError: If 'pandas' is not installed.
    """
    try:
        import pandas as pd
    except ImportError:
        raise ImportError(
            "MemoryLogger.to_df() requires 'pandas'. "
            "Install with: pip install pandas"
        )

    records = []
    for timestamp, states in self.log.items():
        for name, mg_state in states.items():
            record = flatten_dict(mg_state)
            record.update({"time": timestamp, "microgrid": name})
            records.append(record)

    frame = pd.DataFrame(records)
    if frame.empty:
        return frame
    return frame.set_index(["time", "microgrid"])

Api

Bases: Controller

REST API interface for microgrid data and control.

The API controller starts a background process with a FastAPI-based broker that exposes endpoints to query the current state of the microgrids and to send control commands to them.

Parameters:

Name Type Description Default
export_prometheus bool

Whether to export metrics to Prometheus. Defaults to False.

False
broker_port int

The port on which the API broker should run. Defaults to 8700.

8700
Source code in vessim/controller.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
class Api(Controller):
    """REST API interface for microgrid data and control.

    The API controller starts a background process with a FastAPI-based broker
    that exposes endpoints to query the current state of the microgrids and to
    send control commands to them.

    Args:
        export_prometheus: Whether to export metrics to Prometheus. Defaults to False.
        broker_port: The port on which the API broker should run. Defaults to 8700.

    Raises:
        ImportError: If the 'requests' package is not installed.
    """

    def __init__(
        self,
        export_prometheus: bool = False,
        broker_port: int = 8700,
    ):
        try:
            import requests
        except ImportError as err:
            raise ImportError(
                "Api requires 'requests' package. Install with: pip install vessim[sil]"
            ) from err

        self.requests = requests
        self.broker_port = broker_port
        self.broker_url = f"http://localhost:{broker_port}"
        self.broker_process: Optional[multiprocessing.Process] = None
        self.export_prometheus = export_prometheus
        self.microgrids: dict[str, Microgrid] = {}

    def start(self, environment: Environment) -> None:
        """Starts the broker and registers every microgrid of the environment with it."""
        self.microgrids = dict(environment.microgrids)
        self._start_broker()
        for mg_name, mg in self.microgrids.items():
            config = {
                "name": mg_name,
                "actors": [actor.config() for actor in mg.actors],
                "dispatch": [
                    {
                        "name": d.name,
                        "type": d.__class__.__name__,
                        **d.config(),
                    }
                    for d in mg.dispatchables
                ],
                "policy": {
                    "type": mg.policy.__class__.__name__,
                    **mg.policy.state(),
                },
                "coords": mg.coords,
            }
            self.requests.post(f"{self.broker_url}/internal/microgrids/{mg_name}", json=config)
        print(f"Registered {len(self.microgrids)} microgrids with API broker.")

    def _start_broker(self):
        """Launches the broker in a daemon process and announces its URL."""
        from vessim._broker import run_broker

        self.broker_process = multiprocessing.Process(
            target=run_broker,
            args=(self.broker_port, self.export_prometheus),
            daemon=True
        )
        self.broker_process.start()
        # NOTE(review): fixed pause, presumably to let the broker come up before
        # the first request is sent -- confirm; there is no readiness check here.
        time.sleep(2)
        prometheus_str = " (incl. Prometheus exporter)" if self.export_prometheus else ""
        print(f"🌐 API{prometheus_str} available at: {self.broker_url}")

    def step(self, now: datetime, microgrid_states: dict[str, MicrogridState]) -> None:
        """Process commands and push microgrid states to broker.

        Args:
            now: Current datetime in the simulation.
            microgrid_states: Maps microgrid names to their current state.
        """
        response = self.requests.get(f"{self.broker_url}/internal/commands")
        for cmd in response.json().get("commands", []):
            self._apply_command(cmd)

        for mg_name, mg_state in microgrid_states.items():
            self.requests.post(
                f"{self.broker_url}/internal/data/{mg_name}",
                json={"microgrid": mg_name, "time": now.isoformat(), **mg_state},
            )

    def _apply_command(self, cmd: dict) -> None:
        """Applies a single ``set_parameter`` command; anything else is ignored."""
        if cmd.get("type") != "set_parameter":
            return
        microgrid_name = cmd.get("microgrid")
        if microgrid_name not in self.microgrids:
            return

        mg = self.microgrids[microgrid_name]
        target = cmd.get("target")
        val = cmd.get("value")

        if target == "actor":
            actor_name = cmd.get("target_name")
            actor = next((a for a in mg.actors if a.name == actor_name), None)
            if actor is not None and hasattr(actor.signal, "set_value"):
                actor.signal.set_value(val)
            return

        prop = cmd.get("property")
        # setattr() raises TypeError for a missing/non-string name; a malformed
        # command must not abort the simulation step, so it is skipped instead.
        if not isinstance(prop, str) or not prop:
            return
        # NOTE(review): `prop` comes from the REST API and is not restricted to a
        # set of known parameters -- consider an allow-list per target type.
        if target == "dispatchable":
            target_name = cmd.get("target_name")
            d = next((d for d in mg.dispatchables if d.name == target_name), None)
            if d is not None:
                setattr(d, prop, val)
        elif target == "policy":
            setattr(mg.policy, prop, val)

    def finalize(self) -> None:
        """Clean up resources when simulation ends."""
        process = self.broker_process
        if process is not None and process.is_alive():
            process.terminate()
            process.join(timeout=2)
            if process.is_alive():
                # The broker ignored the termination request; force it down so
                # it does not keep running (and holding its port).
                process.kill()
                process.join(timeout=2)

        print("🛑 API broker terminated")

step

Process commands and push microgrid states to broker.

Source code in vessim/controller.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def step(self, now: datetime, microgrid_states: dict[str, MicrogridState]) -> None:
    """Process commands and push microgrid states to broker.

    Only ``set_parameter`` commands for known microgrids are applied. Commands
    for dispatchables and policies without a usable ``property`` name are
    skipped rather than raising.

    Args:
        now: Current datetime in the simulation.
        microgrid_states: Maps microgrid names to their current state.
    """
    response = self.requests.get(f"{self.broker_url}/internal/commands")
    commands = response.json().get("commands", [])
    for cmd in commands:
        if cmd.get("type") != "set_parameter":
            continue
        microgrid_name = cmd.get("microgrid")
        if microgrid_name not in self.microgrids:
            continue

        mg = self.microgrids[microgrid_name]
        target = cmd.get("target")
        prop = cmd.get("property")
        val = cmd.get("value")
        # setattr() raises TypeError for a missing/non-string name; a malformed
        # command must not abort the simulation step.
        has_valid_prop = isinstance(prop, str) and bool(prop)

        if target == "dispatchable":
            target_name = cmd.get("target_name")
            d = next((d for d in mg.dispatchables if d.name == target_name), None)
            if d is not None and has_valid_prop:
                setattr(d, prop, val)
        elif target == "policy":
            if has_valid_prop:
                setattr(mg.policy, prop, val)
        elif target == "actor":
            actor_name = cmd.get("target_name")
            actor = next((a for a in mg.actors if a.name == actor_name), None)
            if actor is not None and hasattr(actor.signal, "set_value"):
                actor.signal.set_value(val)

    for mg_name, mg_state in microgrid_states.items():
        self.requests.post(
            f"{self.broker_url}/internal/data/{mg_name}",
            json={"microgrid": mg_name, "time": now.isoformat(), **mg_state},
        )

finalize

Clean up resources when simulation ends.

Source code in vessim/controller.py
321
322
323
324
325
326
327
def finalize(self) -> None:
    """Clean up resources when simulation ends.

    Asks a running broker process to terminate and waits up to two seconds.
    If it is still alive afterwards it is killed, so the broker does not
    keep running after the simulation.
    """
    process = self.broker_process
    if process is not None and process.is_alive():
        process.terminate()
        process.join(timeout=2)
        if process.is_alive():
            # The broker ignored the termination request; force it down.
            process.kill()
            process.join(timeout=2)

    print("🛑 API broker terminated")