"""
Module for manipulating dates
"""
import datetime
from calendar import monthrange
from datetime import date, datetime, timedelta
from typing import Union
[docs]def round_time(dt:datetime = None, roundTo:timedelta = timedelta(minutes=1), roundType:str = 'nearest') -> datetime:
"""Round a time to a nearest common format
Parameters
----------
dt : datetime.datetime object
Datetime one wish to round
roundTo : datetime.timedelta object (optional)
What to round to. Defaults to datetime.timedelta(minutes=1).
roundType : str
How to round. 'nearest', 'roundup', 'rounddown'
Returns
-------
dt_rounded : datetime.datetime object
Rounded datetime
"""
roundTo = roundTo.total_seconds()
if dt == None : dt = datetime.datetime.now()
seconds = (dt - dt.min).seconds
if roundType=='nearest':
rounding = (seconds+roundTo/2) // roundTo * roundTo
elif roundType=='roundup':
rounding = (seconds+roundTo) // roundTo * roundTo
elif roundType=='rounddown':
rounding = seconds // roundTo * roundTo
return dt + timedelta(0, rounding - seconds, -dt.microsecond)
[docs]def per_delta(start:datetime, end:datetime, delta:timedelta):
"""Generate a list of datetimes within an interval
Parameters
----------
start : datetime.datetime object
Starting date
end : datetime.datetime object
End date
delta : datetime.timedelta object
Increment in time, e.g. 1 hour would be datetime.timedelta(hours=1)
"""
curr = start
while curr < end:
yield curr
curr += delta
[docs]def current_run_datetime(frequency:int=180, delay:int=120, roundto:str='hour', now:Union[datetime, None]=None) -> datetime:
"""Get the time of the current valid analysis
Parameters
----------
frequency : integer (optional)
Frequency of analysis in minutes. Defaults to 180 minutes (3 hours).
delay : integer (optional)
Delay in minutes between now and when analysis becomes available. Defaults to 120 minutes (2 hours)
roundto : string (optional)
What to round to: Available is "hour", "minute". Defaults to "hour"
now : datetime object (optional)
Current time to get run for. Defaults to now.
Returns
-------
nowrounded : datetime.datetime object
Time for the latest current valid analysis
"""
if now is None: now=datetime.utcnow()
if frequency < 60 and roundto=='hour': roundto='minute'
if frequency >= 60 and roundto=='minute': roundto='hour'
if roundto=='hour':
delay = delay/60
frequency = frequency//60
delayed_now_hour = int(now.hour - delay)
if delayed_now_hour<0: now += timedelta(days=delayed_now_hour//24)
nowrounded = now.replace(hour=((delayed_now_hour) - (delayed_now_hour) % frequency) % 24)
elif roundto=='minute' or roundto=='10minute':
if int(now.minute) < delay:
delay+=60
nowrounded = now.replace(minute=((now.minute - delay) - (now.minute - delay) % frequency) % 60)
nowrounded = nowrounded.replace(hour=(nowrounded.hour - (nowrounded.minute)//50) % 24)
# if (now - timedelta(minutes=frequency)).hour != nowrounded.hour:
# nowrounded-=timedelta(hours=0)
if (now - timedelta(minutes=frequency)).strftime('%d') != nowrounded.strftime('%d'):
nowrounded-=timedelta(hours=24)
if roundto=='hour':
nowrounded = round_time(nowrounded, roundTo=timedelta(hours=1), roundType='rounddown')
elif roundto=='minute':
nowrounded = round_time(nowrounded, roundTo=timedelta(minutes=1), roundType='rounddown')
elif roundto=='10minute':
nowrounded = round_time(nowrounded, roundTo=timedelta(minutes=10), roundType='rounddown')
return nowrounded
[docs]def month_delta(d1:datetime, d2:datetime) -> int:
"""Find the difference between dates in months
Parameters
----------
d1 : datetime.datetime object
Date 1
d2 : datetime.datime object
Date 2
Returns
-------
delta : integer
Difference between dates in months
"""
delta = 0
while True:
mdays = monthrange(d1.year, d1.month)[1]
d1 += timedelta(days=mdays)
if d1 <= d2:
delta += 1
else:
break
return delta
[docs]def add_month(date=None):
"""Add one month to date
Parameters
----------
date : datetime.datetime object
Date to add one month to.
Returns
-------
candidate: datetime.datetime object
Candidate date with one month added
"""
# number of days this month
month_days = monthrange(date.year, date.month)[1]
candidate = date + timedelta(days=month_days)
# but maybe we are a month too far
if candidate.day != date.day:
# go to last day of next month,
# by getting one day before begin of candidate month
return candidate.replace(day=1) - timedelta(days=1)
else:
return candidate
[docs]def subtract_one_month(date=None):
"""Subtract one month from date
Parameters
----------
date : datetime.datetime object
Date to subtract one month from.
Returns
-------
dt3: datetime.datetime object
Date after subtracting one month
"""
dt1 = date.replace(day=1)
dt2 = dt1 - timedelta(days=1)
dt3 = dt2.replace(day=1)
return dt3
[docs]def count_days(date1=None, date2=None, includeEnd=False):
"""Count the days between two dates
Parameters
----------
date1 : datetime.datetime object
Date 1
date2 : datetime.datetime object
Date 2
includeEnd : boolean
Whether to include the last day itself. Defaults to False.
Returns
-------
days : integer
Days between two dates.
"""
if includeEnd:
return int((date2-date1).total_seconds()/(60*60*24) + 1)
else:
return int((date2-date1).total_seconds()/(60*60*24))
if __name__=="__main__":
ct = current_run_datetime(frequency=180, delay=120, now=datetime(2020,1,1,12))
ca = datetime(2020,1,1,9)
print(("Success: {} == {}").format(ct, ca)) if ct==ca else print(("Error: {} != {}").format(ct, ca))
ct = current_run_datetime(frequency=180, delay=120, now=datetime(2020,1,1,13))
ca = datetime(2020,1,1,9)
print(("Success: {} == {}").format(ct, ca)) if ct==ca else print(("Error: {} != {}").format(ct, ca))
ct = current_run_datetime(frequency=180, delay=120, now=datetime(2020,1,1,13,10))
ca = datetime(2020,1,1,9)
print(("Success: {} == {}").format(ct, ca)) if ct==ca else print(("Error: {} != {}").format(ct, ca))
ct = current_run_datetime(frequency=180, delay=120, now=datetime(2020,1,1,13,59))
ca = datetime(2020,1,1,9)
print(("Success: {} == {}").format(ct, ca)) if ct==ca else print(("Error: {} != {}").format(ct, ca))
ct = current_run_datetime(frequency=180, delay=120, now=datetime(2020,1,1,14,0))
ca = datetime(2020,1,1,12)
print(("Success: {} == {}").format(ct, ca)) if ct==ca else print(("Error: {} != {}").format(ct, ca))
ct = current_run_datetime(frequency=180, delay=10, now=datetime(2020,1,1,12,5))
ca = datetime(2020,1,1,9)
print(("Success: {} == {}").format(ct, ca)) if ct==ca else print(("Error: {} != {}").format(ct, ca))
ct = current_run_datetime(frequency=60, delay=10, now=datetime(2020,1,1,12,5))
ca = datetime(2020,1,1,11,0)
print(("Success: {} == {}").format(ct, ca)) if ct==ca else print(("Error: {} != {}").format(ct, ca))
ct = current_run_datetime(frequency=10, delay=10, now=datetime(2020,1,1,12,5))
ca = datetime(2020,1,1,11,50)
print(("Success: {} == {}").format(ct, ca)) if ct==ca else print(("Error: {} != {}").format(ct, ca))
ct = current_run_datetime(frequency=10, delay=10, now=datetime(2020,1,1,12,55), roundto='minute')
ca = datetime(2020,1,1,12,40)
print(("Success: {} == {}").format(ct, ca)) if ct==ca else print(("Error: {} != {}").format(ct, ca))
ct = current_run_datetime(frequency=10, delay=10, now=datetime(2020,1,2,0,5), roundto='minute')
ca = datetime(2020,1,1,23,50)
print(("Success: {} == {}").format(ct, ca)) if ct==ca else print(("Error: {} != {}").format(ct, ca))
ct = current_run_datetime(frequency=10, delay=10, now=datetime(2021,1,1,0,5), roundto='minute')
ca = datetime(2020,12,31,23,50)
print(("Success: {} == {}").format(ct, ca)) if ct==ca else print(("Error: {} != {}").format(ct, ca))
ct = current_run_datetime(frequency=10, delay=20, now=datetime(2020,1,1,12,40), roundto='minute')
ca = datetime(2020,1,1,12,20)
print(("Success: {} == {}").format(ct, ca)) if ct==ca else print(("Error: {} != {}").format(ct, ca))
ct = current_run_datetime(frequency=30, delay=5, now=datetime(2020,1,1,12,40), roundto='minute')
ca = datetime(2020,1,1,12,30)
print(("Success: {} == {}").format(ct, ca)) if ct==ca else print(("Error: {} != {}").format(ct, ca))
ct = current_run_datetime(frequency=60, delay=60, now=datetime(2020,1,1,12,40), roundto='minute')
ca = datetime(2020,1,1,11,0)
print(("Success: {} == {}").format(ct, ca)) if ct==ca else print(("Error: {} != {}").format(ct, ca))