Skip to content

Signal

A Signal provides a time-varying value to actors and grid signals. See the Signals and Datasets concept page for usage examples.

Signal

Bases: ABC

Abstract base class for signals.

Source code in vessim/signal.py
13
14
15
16
17
18
19
20
21
22
23
24
25
class Signal(ABC):
    """Abstract base class for signals."""

    @abstractmethod
    def at(self, elapsed: Optional[timedelta | float] = None) -> float:
        """Return the signal's value at the given elapsed time since `sim_start`.

        `elapsed` accepts a `timedelta` or `float` interpreted as
        seconds since `sim_start`.
        """

    def finalize(self) -> None:
        """Perform necessary finalization tasks of a signal."""

at abstractmethod

Return the signal's value at the given elapsed time since sim_start.

elapsed accepts a timedelta or float interpreted as seconds since sim_start.

Source code in vessim/signal.py
16
17
18
19
20
21
22
@abstractmethod
def at(self, elapsed: Optional[timedelta | float] = None) -> float:
    """Return the signal's value at the given elapsed time since `sim_start`.

    `elapsed` accepts a `timedelta` or `float` interpreted as
    seconds since `sim_start`.
    """

finalize

Perform necessary finalization tasks of a signal.

Source code in vessim/signal.py
24
25
def finalize(self) -> None:
    """Perform necessary finalization tasks of a signal."""

StaticSignal

Bases: Signal

Source code in vessim/signal.py
28
29
30
31
32
33
34
35
36
37
38
39
40
class StaticSignal(Signal):
    def __init__(self, value: float) -> None:
        self._v = value

    def __repr__(self):
        """Returns a string representation for the Vessim viewer."""
        return f"StaticSignal({self._v})"

    def set_value(self, value: float) -> None:
        self._v = value

    def at(self, elapsed: Optional[timedelta | float] = None):
        return self._v

Trace

Bases: Signal

Replays a time series indexed by an offset since simulation start.

Internally a Trace is always offset-indexed: row 0 sits at offset=0 seconds, every other row is a positive offset from there. For a Trace, the queried elapsed time and a row's offset are the same coordinate: at(elapsed) returns the value whose offset best matches the elapsed time the simulation has accumulated since sim_start.

The accepted index types are:

  • TimedeltaIndex or numeric (interpreted as offset seconds): used as-is. Must start at offset=0. anchor must not be provided. This is the canonical form.
  • DatetimeIndex: a convenience for calendar-stamped data. anchor is required and must match an existing row exactly; that row is rebased to offset=0 and earlier rows are dropped.

For loading offset-indexed or datetime-indexed CSVs, use Trace.from_csv.

Parameters:

Name Type Description Default
data Series | DataFrame

A pandas Series or DataFrame. See above for accepted index types. Each column represents one zone of data; the column name is the zone name. Between samples, values are interpolated using fill_method (ffill or bfill).

required
anchor Optional[Timestamp | str]

Required when data has a DatetimeIndex. Must be a value present in the index. The matching row is rebased to offset=0. Forbidden for TimedeltaIndex and numeric indices (which are already offset-indexed).

None
on_overflow Literal['raise']

What to do if queried beyond the trace's range. Currently only "raise" is supported. Defaults to "raise".

'raise'
column Optional[str]

Default column to use when at() is called without one. Defaults to None.

None
fill_method Literal['ffill', 'bfill']

How values between timestamps are computed. Either ffill (use the most recent past value) or bfill (use the next future value). Defaults to ffill.

'ffill'
Source code in vessim/signal.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
class Trace(Signal):
    """Replays a time series indexed by an *offset* since simulation start.

    Internally a `Trace` is always offset-indexed: row 0 sits at offset=0
    seconds, every other row is a positive offset from there. For a `Trace`,
    the queried `elapsed` time and a row's `offset` are the same coordinate:
    `at(elapsed)` returns the value whose offset best matches the elapsed
    time the simulation has accumulated since `sim_start`.

    The accepted index types are:

    - `TimedeltaIndex` or numeric (interpreted as offset *seconds*): used
      as-is. Must start at offset=0. `anchor` must not be provided. This is
      the canonical form.
    - `DatetimeIndex`: a convenience for calendar-stamped data. `anchor` is
      required and must match an existing row exactly; that row is rebased
      to offset=0 and earlier rows are dropped.

    For loading offset-indexed or datetime-indexed CSVs, use `Trace.from_csv`.

    Args:
        data: A pandas `Series` or `DataFrame`. See above for accepted index
            types. Each column represents one zone of data; the column name
            is the zone name. Between samples, values are interpolated using
            `fill_method` (`ffill` or `bfill`).
        anchor: Required when `data` has a `DatetimeIndex`. Must be a value
            present in the index. The matching row is rebased to offset=0.
            Forbidden for `TimedeltaIndex` and numeric indices (which are
            already offset-indexed).
        on_overflow: What to do if queried beyond the trace's range.
            Currently only `"raise"` is supported. Defaults to `"raise"`.
        column: Default column to use when `at()` is called without one.
            Defaults to None.
        fill_method: How values between timestamps are computed. Either
            `ffill` (use the most recent past value) or `bfill` (use the next
            future value). Defaults to `ffill`.
    """

    def __init__(
        self,
        data: pd.Series | pd.DataFrame,
        anchor: Optional[pd.Timestamp | str] = None,
        on_overflow: Literal["raise"] = "raise",
        column: Optional[str] = None,
        fill_method: Literal["ffill", "bfill"] = "ffill",
        repr_: Optional[str] = None,
    ):
        if on_overflow != "raise":
            raise ValueError(
                f"on_overflow={on_overflow!r} is not yet supported. Only 'raise' is available."
            )

        self._fill_method = fill_method
        self._on_overflow = on_overflow
        self.default_column = column
        self.repr_ = repr_

        if not isinstance(data, (pd.Series, pd.DataFrame)):
            raise ValueError(f"Incompatible type {type(data)} for 'data'.")

        raw_index = data.index
        if isinstance(raw_index, pd.DatetimeIndex):
            if anchor is None:
                raise ValueError(
                    "anchor is required for datetime-indexed data. "
                    "Pass anchor=<timestamp> matching a row in your data."
                )
            anchor_ts = pd.to_datetime(anchor)
            if anchor_ts not in raw_index:
                raise ValueError(
                    f"anchor={anchor_ts!s} is not present in the data's index. "
                    f"anchor must match an existing row exactly."
                )
            data = data.loc[raw_index >= anchor_ts]
            assert isinstance(data.index, pd.DatetimeIndex)
            offsets_array = (data.index - anchor_ts).to_numpy().astype(
                "timedelta64[ns]"
            )
        elif isinstance(raw_index, pd.TimedeltaIndex):
            if anchor is not None:
                raise ValueError(
                    "anchor is only valid for datetime-indexed data."
                )
            offsets_array = raw_index.to_numpy().astype("timedelta64[ns]")
        elif pd.api.types.is_numeric_dtype(raw_index):
            if anchor is not None:
                raise ValueError(
                    "anchor is only valid for datetime-indexed data."
                )
            offsets_array = pd.to_timedelta(raw_index, unit="s").to_numpy().astype(
                "timedelta64[ns]"
            )
        else:
            raise TypeError(
                f"Trace requires a DatetimeIndex (with anchor=), TimedeltaIndex, "
                f"or numeric (seconds) index, got {type(raw_index).__name__}."
            )

        if offsets_array.size == 0:
            raise ValueError("Trace data cannot be empty.")

        sorter = np.argsort(offsets_array)
        offsets_ns = offsets_array[sorter]

        if offsets_ns[0] != np.timedelta64(0):
            raise ValueError(
                f"Trace must start at offset=0, but starts at {_td(offsets_ns[0])}. "
                f"To delay a trace, pad the beginning of your data with zeros or NaNs."
            )

        self._offsets: dict[str, tuple[np.ndarray, np.ndarray]] = {}
        if isinstance(data, pd.Series):
            values = data.to_numpy(dtype=float, copy=True)[sorter]
            mask = ~np.isnan(values)
            self._offsets[str(data.name)] = (offsets_ns[mask], values[mask])
        else:
            for col in data.columns:
                values = data[col].to_numpy(dtype=float, copy=True)[sorter]
                mask = ~np.isnan(values)
                self._offsets[str(col)] = (offsets_ns[mask], values[mask])

    def __repr__(self):
        """Returns a string representation for the Vessim viewer."""
        return f"Trace({self.repr_ or ''})"

    @classmethod
    def from_csv(
        cls,
        path: str | Path,
        anchor: Optional[pd.Timestamp | str] = None,
        column: Optional[str] = None,
        scale: float = 1.0,
        on_overflow: Literal["raise"] = "raise",
        fill_method: Literal["ffill", "bfill"] = "ffill",
    ) -> Trace:
        """Load a `Trace` from a CSV file.

        Vessim expects **offset-indexed** CSVs: the first column is the offset
        in seconds since the trace start (row 0 at `0`). Remaining columns are
        value columns (zones, regions, etc.).

        As a utility, datetime-indexed CSVs are also accepted and converted on
        load: pass `anchor=<timestamp>` to mark which row corresponds to
        offset=0. See [Signals and Datasets](../concepts/signals.md) for the
        full schema and recipes for fetching data from public APIs.

        Args:
            path: Path to the CSV file.
            anchor: Required if the first column is a datetime; must match a
                row in the data exactly. That row becomes `offset=0` and
                earlier rows are dropped. Forbidden if the first column is
                already numeric.
            column: Default column to use when `at()` is called without one.
            scale: Multiplier applied to all values. Useful for normalized data.
            on_overflow: See `Trace`.
            fill_method: See `Trace`.
        """
        df = pd.read_csv(path, index_col=0)
        if scale != 1.0:
            df = df.astype(float) * scale

        if not pd.api.types.is_numeric_dtype(df.index):
            df.index = pd.to_datetime(df.index)

        return cls(
            df,
            anchor=anchor,
            on_overflow=on_overflow,
            column=column,
            fill_method=fill_method,
            repr_=str(path),
        )

    def columns(self) -> list:
        """Returns a list of all available columns."""
        return list(self._offsets.keys())

    def at(
        self,
        elapsed: Optional[timedelta | float] = None,
        column: Optional[str] = None,
    ) -> float:
        """Return the trace's value at the given elapsed time since `sim_start`.

        If `elapsed` falls between sample points, `fill_method` decides how to
        interpolate. If `elapsed` falls outside the trace, a `ValueError` is
        raised (subject to `on_overflow`).

        Args:
            elapsed: Elapsed time since `sim_start`. Either a `timedelta` or a
                number (interpreted as seconds). Required.
            column: Column to query. Required if the trace has more than one.

        Raises:
            ValueError: If `elapsed` is None, before the trace start, after
                the trace end, or refers to an unknown column.
        """
        if elapsed is None:
            raise ValueError("Argument elapsed cannot be None.")
        if not isinstance(elapsed, timedelta):
            elapsed = timedelta(seconds=elapsed)
        if column is None:
            column = self.default_column

        resolved = _get_column_name(self._offsets, column)
        offsets, values = self._offsets[resolved]
        np_at = np.timedelta64(elapsed)

        if self._fill_method == "ffill":
            index = offsets.searchsorted(np_at, side="right") - 1
            if index >= 0:
                return values[index]
            raise ValueError(
                f"Elapsed time {elapsed} is before the start of column '{resolved}' "
                f"(trace covers {_td(offsets[0])} to {_td(offsets[-1])} since sim_start)."
            )
        else:
            index = offsets.searchsorted(np_at, side="left")
            if index < offsets.size:
                return values[index]
            raise ValueError(
                f"Elapsed time {elapsed} is after the end of column '{resolved}' "
                f"(trace covers {_td(offsets[0])} to {_td(offsets[-1])} since sim_start)."
            )

from_csv classmethod

Load a Trace from a CSV file.

Vessim expects offset-indexed CSVs: the first column is the offset in seconds since the trace start (row 0 at 0). Remaining columns are value columns (zones, regions, etc.).

As a utility, datetime-indexed CSVs are also accepted and converted on load: pass anchor=<timestamp> to mark which row corresponds to offset=0. See Signals and Datasets for the full schema and recipes for fetching data from public APIs.

Parameters:

Name Type Description Default
path str | Path

Path to the CSV file.

required
anchor Optional[Timestamp | str]

Required if the first column is a datetime; must match a row in the data exactly. That row becomes offset=0 and earlier rows are dropped. Forbidden if the first column is already numeric.

None
column Optional[str]

Default column to use when at() is called without one.

None
scale float

Multiplier applied to all values. Useful for normalized data.

1.0
on_overflow Literal['raise']

See Trace.

'raise'
fill_method Literal['ffill', 'bfill']

See Trace.

'ffill'
Source code in vessim/signal.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
@classmethod
def from_csv(
    cls,
    path: str | Path,
    anchor: Optional[pd.Timestamp | str] = None,
    column: Optional[str] = None,
    scale: float = 1.0,
    on_overflow: Literal["raise"] = "raise",
    fill_method: Literal["ffill", "bfill"] = "ffill",
) -> Trace:
    """Load a `Trace` from a CSV file.

    Vessim expects **offset-indexed** CSVs: the first column is the offset
    in seconds since the trace start (row 0 at `0`). Remaining columns are
    value columns (zones, regions, etc.).

    As a utility, datetime-indexed CSVs are also accepted and converted on
    load: pass `anchor=<timestamp>` to mark which row corresponds to
    offset=0. See [Signals and Datasets](../concepts/signals.md) for the
    full schema and recipes for fetching data from public APIs.

    Args:
        path: Path to the CSV file.
        anchor: Required if the first column is a datetime; must match a
            row in the data exactly. That row becomes `offset=0` and
            earlier rows are dropped. Forbidden if the first column is
            already numeric.
        column: Default column to use when `at()` is called without one.
        scale: Multiplier applied to all values. Useful for normalized data.
        on_overflow: See `Trace`.
        fill_method: See `Trace`.
    """
    df = pd.read_csv(path, index_col=0)
    if scale != 1.0:
        df = df.astype(float) * scale

    if not pd.api.types.is_numeric_dtype(df.index):
        df.index = pd.to_datetime(df.index)

    return cls(
        df,
        anchor=anchor,
        on_overflow=on_overflow,
        column=column,
        fill_method=fill_method,
        repr_=str(path),
    )

columns

Returns a list of all available columns.

Source code in vessim/signal.py
216
217
218
def columns(self) -> list:
    """Returns a list of all available columns."""
    return list(self._offsets.keys())

at

Return the trace's value at the given elapsed time since sim_start.

If elapsed falls between sample points, fill_method decides how to interpolate. If elapsed falls outside the trace, a ValueError is raised (subject to on_overflow).

Parameters:

Name Type Description Default
elapsed Optional[timedelta | float]

Elapsed time since sim_start. Either a timedelta or a number (interpreted as seconds). Required.

None
column Optional[str]

Column to query. Required if the trace has more than one.

None

Raises:

Type Description
ValueError

If elapsed is None, before the trace start, after the trace end, or refers to an unknown column.

Source code in vessim/signal.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def at(
    self,
    elapsed: Optional[timedelta | float] = None,
    column: Optional[str] = None,
) -> float:
    """Return the trace's value at the given elapsed time since `sim_start`.

    If `elapsed` falls between sample points, `fill_method` decides how to
    interpolate. If `elapsed` falls outside the trace, a `ValueError` is
    raised (subject to `on_overflow`).

    Args:
        elapsed: Elapsed time since `sim_start`. Either a `timedelta` or a
            number (interpreted as seconds). Required.
        column: Column to query. Required if the trace has more than one.

    Raises:
        ValueError: If `elapsed` is None, before the trace start, after
            the trace end, or refers to an unknown column.
    """
    if elapsed is None:
        raise ValueError("Argument elapsed cannot be None.")
    if not isinstance(elapsed, timedelta):
        elapsed = timedelta(seconds=elapsed)
    if column is None:
        column = self.default_column

    resolved = _get_column_name(self._offsets, column)
    offsets, values = self._offsets[resolved]
    np_at = np.timedelta64(elapsed)

    if self._fill_method == "ffill":
        index = offsets.searchsorted(np_at, side="right") - 1
        if index >= 0:
            return values[index]
        raise ValueError(
            f"Elapsed time {elapsed} is before the start of column '{resolved}' "
            f"(trace covers {_td(offsets[0])} to {_td(offsets[-1])} since sim_start)."
        )
    else:
        index = offsets.searchsorted(np_at, side="left")
        if index < offsets.size:
            return values[index]
        raise ValueError(
            f"Elapsed time {elapsed} is after the end of column '{resolved}' "
            f"(trace covers {_td(offsets[0])} to {_td(offsets[-1])} since sim_start)."
        )

Software-in-the-Loop signals

These signals require the sil extra (pip install vessim[sil]). See the Software-in-the-Loop concept page for the full picture.

SilSignal

Bases: Signal

Base class for Software-in-the-Loop signals with background polling.

This class provides common functionality for signals that need to periodically fetch data from external sources (APIs, databases, etc.) and cache the results.

Parameters:

Name Type Description Default
update_interval float

Interval in seconds between data updates

5.0
timeout float

Request timeout in seconds for external calls

10.0
Source code in vessim/signal.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
class SilSignal(Signal):
    """Base class for Software-in-the-Loop signals with background polling.

    This class provides common functionality for signals that need to periodically
    fetch data from external sources (APIs, databases, etc.) and cache the results.

    Args:
        update_interval: Interval in seconds between data updates
        timeout: Request timeout in seconds for external calls
    """

    def __init__(self, update_interval: float = 5.0, timeout: float = 10.0):
        try:
            from threading import Timer
        except ImportError:
            raise ImportError("SilSignal requires threading support")

        self.Timer = Timer
        self.update_interval = update_interval
        self.timeout = timeout

        self._last_update: Optional[float] = None
        self._cached_value: float = 0.0
        self._stop_polling = False

        # Start background polling
        self._start_background_polling()

    def __repr__(self):
        """Returns a string representation for the Vessim viewer."""
        return f"{self.__class__.__name__}(interval={self.update_interval}s)"

    @abstractmethod
    def _fetch_current_value(self) -> float:
        """Fetch the current value from the external source.

        This method should be implemented by subclasses to define how to
        retrieve data from their specific external source.

        Returns:
            Current value from the external source

        Raises:
            Exception: Any exception that occurs during data fetching
        """

    def _start_background_polling(self) -> None:
        """Start background polling in a separate thread."""

        def poll():
            if not self._stop_polling:
                try:
                    self._cached_value = self._fetch_current_value()
                    self._last_update = time.time()
                except Exception:
                    pass  # Keep using cached value
                # Schedule next poll
                # TODO: daemonize the Timer threads so a forgotten finalize()
                # cannot keep the interpreter alive after main returns.
                self.Timer(self.update_interval, poll).start()

        self.Timer(0, poll).start()  # Start immediately

    def at(self, elapsed: Optional[timedelta | float] = None) -> float:
        """Return the current cached value.

        Args:
            elapsed: Elapsed time since `sim_start` (ignored for real-time data).

        Returns:
            Current cached value
        """
        return self._cached_value

    def finalize(self) -> None:
        """Stop background polling and clean up resources."""
        self._stop_polling = True

at

Return the current cached value.

Parameters:

Name Type Description Default
elapsed Optional[timedelta | float]

Elapsed time since sim_start (ignored for real-time data).

None

Returns:

Type Description
float

Current cached value

Source code in vessim/signal.py
347
348
349
350
351
352
353
354
355
356
def at(self, elapsed: Optional[timedelta | float] = None) -> float:
    """Return the current cached value.

    Args:
        elapsed: Elapsed time since `sim_start` (ignored for real-time data).

    Returns:
        Current cached value
    """
    return self._cached_value

finalize

Stop background polling and clean up resources.

Source code in vessim/signal.py
358
359
360
def finalize(self) -> None:
    """Stop background polling and clean up resources."""
    self._stop_polling = True

PrometheusSignal

Bases: SilSignal

Signal that pulls energy usage data from a Prometheus instance.

Parameters:

Name Type Description Default
prometheus_url str

Base URL of the Prometheus server (e.g., 'http://localhost:9090')

required
query str

PromQL query to fetch energy usage data

required
update_interval float

Interval in seconds between metric updates

10
timeout float

Request timeout in seconds

10
username Optional[str]

Username for HTTP Basic Authentication (optional)

None
password Optional[str]

Password for HTTP Basic Authentication (optional)

None
Source code in vessim/signal.py
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
class PrometheusSignal(SilSignal):
    """Signal that pulls energy usage data from a Prometheus instance.

    Args:
        prometheus_url: Base URL of the Prometheus server (e.g., 'http://localhost:9090')
        query: PromQL query to fetch energy usage data
        update_interval: Interval in seconds between metric updates
        timeout: Request timeout in seconds
        username: Username for HTTP Basic Authentication (optional)
        password: Password for HTTP Basic Authentication (optional)
    """

    def __init__(
        self,
        prometheus_url: str,
        query: str,
        username: Optional[str] = None,
        password: Optional[str] = None,
        update_interval: float = 10,
        timeout: float = 10,
    ):
        try:
            import requests
            import requests.auth
        except ImportError:
            raise ImportError(
                "PrometheusSignal requires 'requests' package. Install with: pip install requests"
            )

        self.requests = requests
        self.prometheus_url = prometheus_url.rstrip("/")
        self.query = query
        self.username = username
        self.password = password

        # Set up authentication if provided
        self._auth = None
        if username and password:
            self._auth = requests.auth.HTTPBasicAuth(username, password)

        # Initialize parent class (starts background polling)
        super().__init__(update_interval=update_interval, timeout=timeout)

        self._validate_connection()

    def _validate_connection(self) -> None:
        """Validate that we can connect to the Prometheus server."""
        try:
            response = self.requests.get(
                f"{self.prometheus_url}/api/v1/query",
                params={"query": "up"},
                timeout=self.timeout,
                auth=self._auth,
            )
            response.raise_for_status()
        except self.requests.exceptions.ConnectionError:
            raise ConnectionError(
                f"Could not connect to Prometheus at '{self.prometheus_url}'. "
                f"Make sure the Prometheus server is running and accessible."
            ) from None

    def _fetch_current_value(self) -> float:
        """Fetch the current value from Prometheus."""
        response = self.requests.get(
            f"{self.prometheus_url}/api/v1/query",
            params={"query": self.query},
            timeout=self.timeout,
            auth=self._auth,
        )
        response.raise_for_status()

        data = response.json()
        if data["status"] != "success":
            raise ValueError(f"Prometheus query failed: {data}")

        results = data["data"]["result"]
        if not results:
            raise ValueError(f"No data returned for query: {self.query}")

        # Get the value from the first result
        return float(results[0]["value"][1])

    def __repr__(self):
        """Returns a string representation for the Vessim viewer."""
        return f"PrometheusSignal({self.query})"

WatttimeSignal

Bases: SilSignal

Real-time carbon intensity signal from WattTime API.

This signal fetches real-time marginal carbon intensity data from the WattTime API. It requires username and password. If the login fails (e.g., user doesn't exist), it will prompt the user to confirm auto-registration and request an email address.

Parameters:

Name Type Description Default
username str

WattTime API username

required
password str

WattTime API password

required
region Optional[str]

Grid region (balancing authority) code, e.g., 'CAISO_NORTH'. Must be provided if location is not specified.

None
location Optional[tuple[float, float]]

Tuple of (latitude, longitude) coordinates to automatically determine region. Alternative to specifying region directly.

None
base_url str

Base URL for WattTime API, defaults to 'https://api.watttime.org'

'https://api.watttime.org'
update_interval float

Interval in seconds between API calls (default: 300 seconds as WattTime updates every 5 minutes)

300
timeout float

Request timeout in seconds

10
Source code in vessim/signal.py
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
class WatttimeSignal(SilSignal):
    """Real-time carbon intensity signal from WattTime API.

    This signal fetches real-time marginal carbon intensity data from the WattTime API.
    It requires username and password. If the login fails (e.g., user doesn't exist),
    it will prompt the user to confirm auto-registration and request an email address.

    Args:
        username: WattTime API username
        password: WattTime API password
        region: Grid region (balancing authority) code, e.g., 'CAISO_NORTH'.
            Must be provided if location is not specified.
        location: Tuple of (latitude, longitude) coordinates to automatically determine region.
            Alternative to specifying region directly.
        base_url: Base URL for WattTime API, defaults to 'https://api.watttime.org'
        update_interval: Interval in seconds between API calls (default: 300 seconds as
            WattTime updates every 5 minutes)
        timeout: Request timeout in seconds
    """

    def __init__(
        self,
        username: str,
        password: str,
        region: Optional[str] = None,
        location: Optional[tuple[float, float]] = None,
        base_url: str = "https://api.watttime.org",
        update_interval: float = 300,
        timeout: float = 10,
    ) -> None:
        try:
            import requests
            from requests.auth import HTTPBasicAuth
        except ImportError:
            raise ImportError(
                "WatttimeSignal requires 'requests' package. "
                "Install with: pip install 'vessim[sil]'"
            )

        # Validate that not both region and location are provided
        if region is not None and location is not None:
            raise ValueError("Cannot provide both 'region' and 'location'.")
        elif region is None and location is None:
            raise ValueError("Either 'region' or 'location' must be provided.")

        self._requests = requests
        self._auth = HTTPBasicAuth(username, password)
        self._username = username
        self._password = password
        self._base_url = base_url
        self._token: Optional[str] = None
        self._token_expires: Optional[float] = None

        # Try to get initial token (will auto-register if needed)
        self._get_token()

        # Determine region from coordinates if location is provided
        if location is not None:
            self._region = self._get_region_from_location(location)
        else:
            # region is guaranteed to be not None due to validation above
            assert region is not None
            self._region = region

        # Initialize parent class (starts background polling)
        super().__init__(update_interval=update_interval, timeout=timeout)

    def __repr__(self):
        """Returns a string representation for the Vessim viewer."""
        return f"WatttimeSignal(region={self._region})"

    def _get_token(self) -> str:
        """Obtain or refresh authentication token, auto-registering if needed."""
        current_time = time.time()

        # Check if token is still valid (expires after 30 minutes)
        if self._token and self._token_expires and current_time < self._token_expires:
            return self._token

        # Try to get new token
        login_url = f"{self._base_url}/login"
        try:
            response = self._requests.get(login_url, auth=self._auth)
            response.raise_for_status()

            self._token = response.json()["token"]
            # Token expires in 30 minutes, refresh 5 minutes early
            self._token_expires = current_time + (25 * 60)

            return self._token

        except self._requests.HTTPError as e:
            if e.response.status_code == 403:
                # Login failed, try to register
                self._register_user()
                # Retry login after registration
                response = self._requests.get(login_url, auth=self._auth)
                response.raise_for_status()

                self._token = response.json()["token"]
                self._token_expires = current_time + (25 * 60)

                return self._token
            else:
                # Re-raise other HTTP errors
                raise

    def _register_user(self) -> None:
        """Register a new user account with interactive email prompt."""
        print(f"\nUser '{self._username}' not found in WattTime API.")

        # Ask user for confirmation
        confirm = (
            input("Would you like to register a new WattTime account? (y/n): ").strip().lower()
        )
        if confirm not in ["y", "yes"]:
            raise RuntimeError("Registration cancelled by user")

        # Ask for email address
        email = input("Please enter your email address for registration: ").strip()
        if not email or "@" not in email:
            raise ValueError("Valid email address is required for registration")

        print(f"Registering new WattTime account for '{self._username}'...")

        register_url = f"{self._base_url}/register"

        registration_data = {
            "username": self._username,
            "password": self._password,
            "email": email,
        }

        response = self._requests.post(register_url, json=registration_data)
        response.raise_for_status()

        print("✓ Registration successful!")

    def _get_region_from_location(self, location: tuple[float, float]) -> str:
        """Get region code from latitude/longitude coordinates."""
        region_url = f"{self._base_url}/v3/region-from-loc"
        headers = {"Authorization": f"Bearer {self._get_token()}"}
        params = {
            "latitude": str(location[0]),
            "longitude": str(location[1]),
            "signal_type": "co2_moer",
        }

        response = self._requests.get(region_url, headers=headers, params=params)
        response.raise_for_status()

        data = response.json()
        region = data.get("region")
        if not region:
            raise ValueError(f"No region found for coordinates ({location})")

        print(f"Detected region '{region}' for coordinates ({location})")
        return region

    def _fetch_current_value(self) -> float:
        """Fetch current carbon intensity from WattTime API.

        Returns:
            Current marginal carbon intensity in lbs CO2/MWh

        Raises:
            requests.HTTPError: If API request fails
            KeyError: If expected data is not in API response
        """
        token = self._get_token()

        # Get current carbon intensity
        index_url = f"{self._base_url}/v3/signal-index"
        headers = {"Authorization": f"Bearer {token}"}
        params = {"region": self._region, "signal_type": "co2_moer"}

        response = self._requests.get(
            index_url, headers=headers, params=params, timeout=self.timeout
        )
        response.raise_for_status()

        data = response.json()
        return data["data"][0]["value"]