Skip to content

chronulus.estimator.normalized_forecaster

NormalizedForecaster

Bases: Estimator

A forecasting estimator that normalizes input data for time series predictions.

This estimator handles the creation, queuing, and retrieval of normalized time series forecasts through the API. It supports various time horizons and can generate both numerical predictions and explanatory notes.

Parameters:

Name Type Description Default
session Session

Active session instance for API communication.

required
input_type Type[BaseModelSubclass]

Pydantic model class that defines the expected input data structure.

required

Attributes:

Name Type Description
estimator_name str

Name identifier for the estimator. Set to "NormalizedForecaster".

estimator_version str

Version string for the estimator. Set to "1".

prediction_version str

Version string for the prediction. Set to "1".

estimator_id str or None

Unique identifier assigned by the API after creation.

Source code in src/chronulus/estimator/normalized_forecaster.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
class NormalizedForecaster(Estimator):
    """
    A forecasting estimator that normalizes input data for time series predictions.

    This estimator handles the creation, queuing, and retrieval of normalized time series
    forecasts through the API. It supports various time horizons and can generate both
    numerical predictions and explanatory notes.

    Parameters
    ----------
    session : Session
        Active session instance for API communication.
    input_type : Type[BaseModelSubclass]
        Pydantic model class that defines the expected input data structure.

    Attributes
    ----------
    estimator_name : str
        Name identifier for the estimator. Set to "NormalizedForecaster".
    estimator_version : str
        Version string for the estimator. Set to "1".
    prediction_version : str
        Version string for the prediction. Set to "1".
    estimator_id : str or None
        Unique identifier assigned by the API after creation.
    """

    estimator_name = "NormalizedForecaster"
    estimator_version = "1"
    prediction_version = "1"

    def __init__(self, session: Session, input_type: Type[BaseModelSubclass]):
        # Registration with the API happens eagerly, so constructing an instance
        # performs a network request (see create()).
        super().__init__(session, input_type)
        self.create()

    def create(self):
        """
        Register this forecaster with the API.

        The input schema (``self.input_type.model_fields``) is pickled and base64
        encoded before transmission. On success the returned ``estimator_id`` is
        stored on the instance.

        Raises
        ------
        ValueError
            If the API fails to create the estimator or returns an invalid response
            (including a body that is not JSON).
        """
        # NOTE(review): the schema is sent as a pickle, which the server presumably
        # unpickles; pickle is only safe between mutually trusted parties.
        fields = pickle.dumps(self.input_type.model_fields)
        fields_b64 = base64.b64encode(fields).decode()

        request_data = EstimatorCreationRequest(
            estimator_name=self.estimator_name,
            session_id=self.session.session_id,
            input_item_schema_b64=fields_b64,
        )

        resp = requests.post(
            url=f"{self.session.env.API_URI}/estimators/{self.get_route_prefix()}/create",
            headers=self.session.headers,
            json=request_data.model_dump(),
        )

        # Error pages (e.g. from a proxy) may not be JSON. Route them through the
        # normal failure path instead of surfacing an opaque decode error.
        try:
            response_json = resp.json()
        except ValueError:
            response_json = None

        if isinstance(response_json, dict) and 'estimator_id' in response_json:
            self.estimator_id = response_json['estimator_id']
            print(f"Estimator created with estimator_id: {response_json['estimator_id']}")
        else:
            print(resp.status_code)
            print(resp.text)
            raise ValueError("There was an error creating the estimator. Please try again.")

    def queue(
            self,
            item: BaseModelSubclass,
            start_dt: datetime,
            weeks: int = None,
            days: int = None,
            hours: int = None,
            note_length: Tuple[int, int] = (3, 5),
    ):
        """
        Queue a prediction request for processing.

        Parameters
        ----------
        item : BaseModelSubclass
            The input data conforming to the specified input_type schema.
        start_dt : datetime
            The starting datetime for the forecast. Sent to the API as a POSIX timestamp.
        weeks : int, optional
            Number of weeks to forecast.
        days : int, optional
            Number of days to forecast.
        hours : int, optional
            Number of hours to forecast.
        note_length : tuple[int, int], optional
            Desired length range (number of sentences) for explanatory notes (min, max), by default (3, 5).

        Returns
        -------
        QueuePredictionResponse
            Response object containing the request status and ID. On a non-200 HTTP
            response it has ``success=False``, an empty ``request_id`` and the error in
            ``message``.

        Raises
        ------
        TypeError
            If the provided item doesn't match the expected input_type, or if
            ``start_dt`` is None.
        """
        if not isinstance(item, self.input_type):
            raise TypeError(f"Expect item to be an instance of {self.input_type}, but item has type {type(item)}")

        # predict() defaults start_dt to None. Fail with a clear message instead of
        # an AttributeError from None.timestamp().
        if start_dt is None:
            raise TypeError("start_dt is required: pass the datetime at which the forecast should start")

        data = dict(
            estimator_id=self.estimator_id,
            item_data=item.model_dump(),
            start_dt=start_dt.timestamp(),
            weeks=weeks,
            days=days,
            hours=hours,
            note_length=note_length,
        )
        resp = requests.post(
            url=f"{self.session.env.API_URI}/estimators/{self.get_route_prefix()}/queue-predict",
            headers=self.session.headers,
            json=data,
        )

        if resp.status_code == 200:
            return QueuePredictionResponse(**resp.json())
        return QueuePredictionResponse(
            success=False,
            request_id='',
            message=f'Queuing failed with status code {resp.status_code}: {resp.text}',
        )

    def get_predictions(self, request_id: str, try_every: int = 3, max_tries: int = 20):
        """
        Poll the API for a queued request and fetch its predictions once ready.

        Parameters
        ----------
        request_id : str
            The ID of the queued prediction request.
        try_every : int, optional
            Seconds to wait between polling attempts, by default 3.
        max_tries : int, optional
            Maximum number of polling attempts, by default 20.

        Returns
        -------
        list[Forecast or None] or dict
            One entry per prediction ID reported by the API. An entry is None when
            that prediction could not be fetched. If the API reports status 'ERROR',
            the raw status dictionary is returned instead.

        Raises
        ------
        Exception
            If the status endpoint responds with a non-200 status code, or if the
            request has not finished after ``max_tries`` attempts.
        """
        for attempt in range(1, max_tries + 1):
            resp = requests.post(
                url=f"{self.session.env.API_URI}/predictions/{self.prediction_version}/check-by-request-id",
                headers=self.session.headers,
                json=dict(request_id=request_id),
            )

            if resp.status_code != 200:
                print(resp)
                raise Exception(
                    f"An error occurred while checking request {request_id!r}: "
                    f"status code {resp.status_code}: {resp.text}"
                )

            response_json = resp.json()
            status = response_json.get('status')
            message = response_json.get('message')

            if status == 'ERROR':
                return response_json

            if status == 'SUCCESS':
                print(f'{status}. {message}. Fetching predictions.')
                prediction_ids = response_json.get('prediction_ids', [])
                return [self.get_prediction(prediction_id) for prediction_id in prediction_ids]

            # Any other status (PENDING, NOT_FOUND or something unexpected) means
            # "not ready yet". Always wait before polling again so an unexpected
            # status cannot become a tight request loop. Skip the sleep after the
            # final attempt because no retry follows it.
            if attempt < max_tries:
                print(f'{status}. {message}. Trying again in {try_every} seconds...')
                time.sleep(try_every)

        raise Exception(f"Retry limit exceeded max_tries of {max_tries}")

    def get_prediction(self, prediction_id: str) -> Optional[Forecast]:
        """
        Retrieve a single prediction by its ID.

        Parameters
        ----------
        prediction_id : str
            Unique identifier for the prediction.

        Returns
        -------
        Forecast or None
            Forecast object containing the forecast results and notes if successful.
            None if the request failed, the body was not JSON, or the API did not
            report success.
        """
        resp = requests.post(
            url=f"{self.session.env.API_URI}/predictions/{self.prediction_version}/get-by-prediction-id",
            headers=self.session.headers,
            json=dict(prediction_id=prediction_id),
        )

        # Check the status first: error responses may not carry a JSON body.
        if resp.status_code != 200:
            return None

        try:
            response_json = resp.json()
        except ValueError:
            return None

        if not response_json.get('success'):
            return None

        estimator_response = response_json['response']
        return Forecast(
            _id=prediction_id,
            text=estimator_response['notes'],
            data=estimator_response['json_split_format_dict'],
        )

    def predict(
            self,
            item: BaseModelSubclass,
            start_dt: Optional[datetime] = None,
            weeks: int = None,
            days: int = None,
            hours: int = None,
            note_length: Tuple[int, int] = (3, 5),
    ) -> Optional[Forecast]:
        """
        Convenience method to queue and retrieve predictions in a single call.

        This method combines the queue and get_predictions steps into a single operation,
        waiting for the prediction to complete before returning.

        Parameters
        ----------
        item : BaseModelSubclass
            The input data conforming to the specified input_type schema.
        start_dt : datetime
            The starting datetime for the forecast. It defaults to None only for
            signature compatibility; queue() requires a value.
        weeks : int, optional
            Number of weeks to forecast.
        days : int, optional
            Number of days to forecast.
        hours : int, optional
            Number of hours to forecast.
        note_length : tuple[int, int], optional
            Desired length range for explanatory notes (min, max), by default (3, 5).

        Returns
        -------
        Forecast or None
            The first completed forecast if successful. None if queuing failed, the
            API reported an error, or no prediction was returned.
        """
        req = self.queue(item, start_dt, weeks, days, hours, note_length)

        def _field(name, default=None):
            # queue() returns a QueuePredictionResponse. Read its fields by attribute,
            # and also accept a plain mapping.
            if isinstance(req, dict):
                return req.get(name, default)
            return getattr(req, name, default)

        # A failed queue has an empty request_id, so polling it would only burn
        # retries. Report the failure and stop.
        if not _field('success', True):
            print(_field('message', ''))
            return None

        predictions = self.get_predictions(_field('request_id'))

        # get_predictions() returns the raw status dict when the API reports 'ERROR'.
        if isinstance(predictions, dict):
            print(predictions.get('message', predictions))
            return None

        return predictions[0] if predictions else None

create()

Initialize the forecaster instance with the API.

Creates a new forecaster instance on the API side with the specified input schema. The schema is serialized and base64 encoded before transmission.

Raises:

Type Description
ValueError

If the API fails to create the estimator or returns an invalid response.

Source code in src/chronulus/estimator/normalized_forecaster.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def create(self):
    """
    Register this forecaster with the API.

    The input schema (``self.input_type.model_fields``) is pickled and base64
    encoded before transmission. On success the returned ``estimator_id`` is
    stored on the instance.

    Raises
    ------
    ValueError
        If the API fails to create the estimator or returns an invalid response
        (including a body that is not JSON).
    """
    # NOTE(review): the schema is sent as a pickle, which the server presumably
    # unpickles; pickle is only safe between mutually trusted parties.
    fields = pickle.dumps(self.input_type.model_fields)
    fields_b64 = base64.b64encode(fields).decode()

    request_data = EstimatorCreationRequest(
        estimator_name=self.estimator_name,
        session_id=self.session.session_id,
        input_item_schema_b64=fields_b64,
    )

    resp = requests.post(
        url=f"{self.session.env.API_URI}/estimators/{self.get_route_prefix()}/create",
        headers=self.session.headers,
        json=request_data.model_dump(),
    )

    # Error pages (e.g. from a proxy) may not be JSON. Route them through the
    # normal failure path instead of surfacing an opaque decode error.
    try:
        response_json = resp.json()
    except ValueError:
        response_json = None

    if isinstance(response_json, dict) and 'estimator_id' in response_json:
        self.estimator_id = response_json['estimator_id']
        print(f"Estimator created with estimator_id: {response_json['estimator_id']}")
    else:
        print(resp.status_code)
        print(resp.text)
        raise ValueError("There was an error creating the estimator. Please try again.")

get_prediction(prediction_id)

Retrieve a single prediction by its ID.

Parameters:

Name Type Description Default
prediction_id str

Unique identifier for the prediction.

required

Returns:

Type Description
Forecast or None

Forecast object containing the forecast results and notes if successful, None if the prediction couldn't be retrieved.

Source code in src/chronulus/estimator/normalized_forecaster.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def get_prediction(self, prediction_id: str) -> Optional[Forecast]:
    """
    Retrieve a single prediction by its ID.

    Parameters
    ----------
    prediction_id : str
        Unique identifier for the prediction.

    Returns
    -------
    Forecast or None
        Forecast object containing the forecast results and notes if successful.
        None if the request failed, the body was not JSON, or the API did not
        report success.
    """
    resp = requests.post(
        url=f"{self.session.env.API_URI}/predictions/{self.prediction_version}/get-by-prediction-id",
        headers=self.session.headers,
        json=dict(prediction_id=prediction_id),
    )

    # Check the status first: error responses may not carry a JSON body.
    if resp.status_code != 200:
        return None

    try:
        response_json = resp.json()
    except ValueError:
        return None

    if not response_json.get('success'):
        return None

    estimator_response = response_json['response']
    return Forecast(
        _id=prediction_id,
        text=estimator_response['notes'],
        data=estimator_response['json_split_format_dict'],
    )

get_predictions(request_id, try_every=3, max_tries=20)

Retrieve predictions for a queued request.

Parameters:

Name Type Description Default
request_id str

The ID of the queued prediction request.

required
try_every int

Seconds to wait between retry attempts, by default 3.

3
max_tries int

Maximum number of retry attempts, by default 20.

20

Returns:

Type Description
list[Forecast] or dict

List of Forecast objects if successful, or error dictionary if failed.

Raises:

Type Description
Exception

If the maximum retry limit is exceeded or if an API error occurs.

Source code in src/chronulus/estimator/normalized_forecaster.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def get_predictions(self, request_id: str, try_every: int = 3, max_tries: int = 20):
    """
    Poll the API for a queued request and fetch its predictions once ready.

    Parameters
    ----------
    request_id : str
        The ID of the queued prediction request.
    try_every : int, optional
        Seconds to wait between polling attempts, by default 3.
    max_tries : int, optional
        Maximum number of polling attempts, by default 20.

    Returns
    -------
    list[Forecast or None] or dict
        One entry per prediction ID reported by the API. An entry is None when
        that prediction could not be fetched. If the API reports status 'ERROR',
        the raw status dictionary is returned instead.

    Raises
    ------
    Exception
        If the status endpoint responds with a non-200 status code, or if the
        request has not finished after ``max_tries`` attempts.
    """
    for attempt in range(1, max_tries + 1):
        resp = requests.post(
            url=f"{self.session.env.API_URI}/predictions/{self.prediction_version}/check-by-request-id",
            headers=self.session.headers,
            json=dict(request_id=request_id),
        )

        if resp.status_code != 200:
            print(resp)
            raise Exception(
                f"An error occurred while checking request {request_id!r}: "
                f"status code {resp.status_code}: {resp.text}"
            )

        response_json = resp.json()
        status = response_json.get('status')
        message = response_json.get('message')

        if status == 'ERROR':
            return response_json

        if status == 'SUCCESS':
            print(f'{status}. {message}. Fetching predictions.')
            prediction_ids = response_json.get('prediction_ids', [])
            return [self.get_prediction(prediction_id) for prediction_id in prediction_ids]

        # Any other status (PENDING, NOT_FOUND or something unexpected) means
        # "not ready yet". Always wait before polling again so an unexpected
        # status cannot become a tight request loop. Skip the sleep after the
        # final attempt because no retry follows it.
        if attempt < max_tries:
            print(f'{status}. {message}. Trying again in {try_every} seconds...')
            time.sleep(try_every)

    raise Exception(f"Retry limit exceeded max_tries of {max_tries}")

predict(item, start_dt=None, weeks=None, days=None, hours=None, note_length=(3, 5))

Convenience method to queue and retrieve predictions in a single call.

This method combines the queue and get_predictions steps into a single operation, waiting for the prediction to complete before returning.

Parameters:

Name Type Description Default
item BaseModelSubclass

The input data conforming to the specified input_type schema.

required
start_dt datetime

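The starting datetime for the forecast. Although the signature defaults to None, a datetime must be supplied because queuing converts it to a timestamp.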

None
weeks int

Number of weeks to forecast.

None
days int

Number of days to forecast.

None
hours int

Number of hours to forecast.

None
note_length tuple[int, int]

Desired length range for explanatory notes (min, max), by default (3, 5).

(3, 5)

Returns:

Type Description
Forecast or None

The completed prediction if successful, None otherwise.

Source code in src/chronulus/estimator/normalized_forecaster.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def predict(
        self,
        item: BaseModelSubclass,
        start_dt: Optional[datetime] = None,
        weeks: int = None,
        days: int = None,
        hours: int = None,
        note_length: Tuple[int, int] = (3, 5),
) -> Optional[Forecast]:
    """
    Convenience method to queue and retrieve predictions in a single call.

    This method combines the queue and get_predictions steps into a single operation,
    waiting for the prediction to complete before returning.

    Parameters
    ----------
    item : BaseModelSubclass
        The input data conforming to the specified input_type schema.
    start_dt : datetime
        The starting datetime for the forecast. It defaults to None only for
        signature compatibility; queuing requires a value.
    weeks : int, optional
        Number of weeks to forecast.
    days : int, optional
        Number of days to forecast.
    hours : int, optional
        Number of hours to forecast.
    note_length : tuple[int, int], optional
        Desired length range for explanatory notes (min, max), by default (3, 5).

    Returns
    -------
    Forecast or None
        The first completed forecast if successful. None if queuing failed, the
        API reported an error, or no prediction was returned.
    """
    req = self.queue(item, start_dt, weeks, days, hours, note_length)

    def _field(name, default=None):
        # queue() returns a QueuePredictionResponse. Read its fields by attribute,
        # and also accept a plain mapping.
        if isinstance(req, dict):
            return req.get(name, default)
        return getattr(req, name, default)

    # A failed queue has an empty request_id, so polling it would only burn
    # retries. Report the failure and stop.
    if not _field('success', True):
        print(_field('message', ''))
        return None

    predictions = self.get_predictions(_field('request_id'))

    # get_predictions() returns the raw status dict when the API reports 'ERROR'.
    if isinstance(predictions, dict):
        print(predictions.get('message', predictions))
        return None

    return predictions[0] if predictions else None

queue(item, start_dt, weeks=None, days=None, hours=None, note_length=(3, 5))

Queue a prediction request for processing.

Parameters:

Name Type Description Default
item BaseModelSubclass

The input data conforming to the specified input_type schema.

required
start_dt datetime

The starting datetime for the forecast.

required
weeks int

Number of weeks to forecast.

None
days int

Number of days to forecast.

None
hours int

Number of hours to forecast.

None
note_length tuple[int, int]

Desired length range (number of sentences) for explanatory notes (min, max), by default (3, 5).

(3, 5)

Returns:

Type Description
QueuePredictionResponse

Response object containing the request status and ID.

Raises:

Type Description
TypeError

If the provided item doesn't match the expected input_type.

Source code in src/chronulus/estimator/normalized_forecaster.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def queue(
        self,
        item: BaseModelSubclass,
        start_dt: datetime,
        weeks: int = None,
        days: int = None,
        hours: int = None,
        note_length: Tuple[int, int] = (3, 5),
):
    """
    Queue a prediction request for processing.

    Parameters
    ----------
    item : BaseModelSubclass
        The input data conforming to the specified input_type schema.
    start_dt : datetime
        The starting datetime for the forecast. Sent to the API as a POSIX timestamp.
    weeks : int, optional
        Number of weeks to forecast.
    days : int, optional
        Number of days to forecast.
    hours : int, optional
        Number of hours to forecast.
    note_length : tuple[int, int], optional
        Desired length range (number of sentences) for explanatory notes (min, max), by default (3, 5).

    Returns
    -------
    QueuePredictionResponse
        Response object containing the request status and ID. On a non-200 HTTP
        response it has ``success=False``, an empty ``request_id`` and the error in
        ``message``.

    Raises
    ------
    TypeError
        If the provided item doesn't match the expected input_type, or if
        ``start_dt`` is None.
    """
    if not isinstance(item, self.input_type):
        raise TypeError(f"Expect item to be an instance of {self.input_type}, but item has type {type(item)}")

    # predict() defaults start_dt to None. Fail with a clear message instead of
    # an AttributeError from None.timestamp().
    if start_dt is None:
        raise TypeError("start_dt is required: pass the datetime at which the forecast should start")

    data = dict(
        estimator_id=self.estimator_id,
        item_data=item.model_dump(),
        start_dt=start_dt.timestamp(),
        weeks=weeks,
        days=days,
        hours=hours,
        note_length=note_length,
    )
    resp = requests.post(
        url=f"{self.session.env.API_URI}/estimators/{self.get_route_prefix()}/queue-predict",
        headers=self.session.headers,
        json=data,
    )

    if resp.status_code == 200:
        return QueuePredictionResponse(**resp.json())
    return QueuePredictionResponse(
        success=False,
        request_id='',
        message=f'Queuing failed with status code {resp.status_code}: {resp.text}',
    )