并行下载ERA-5的Python脚本

本文同步发布于公众号『气象学家』传送门。方便下载ERA5的单层(single level)数据以及多层(pressure)数据,使用ECMWF的Python cds API去下载还是很方便和快捷的。目前,ERA5的数据能够提供1979年-Present了,而且是逐小时的数据,对于多层数据能够提供:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
'pressure_level':[
'1','2','3',
'5','7','10',
'20','30','50',
'70','100','125',
'150','175','200',
'225','250','300',
'350','400','450',
'500','550','600',
'650','700','750',
'775','800','825',
'850','875','900',
'925','950','975',
'1000'
]

根据个人的实际需求,可以最多采用4线程来进行数据下载,实测日常下载速度还是比较可观的,在教育网基本都能稳定在4~5MB/S。考虑到数据量的大小,有两点建议:1.方便数据处理,可以定制需要的数据范围以及有限的时间段,导出为netcdf格式;2.并行多线线程下载;

下载速度:教育网网络环境2019.5.17实时

单层grib并行下载

参考链接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from queue import Queue
from threading import Thread
import cdsapi
from time import time
import datetime
import os

def downloadonefile(riqi):
ts = time()
filename="/raid03/users/fuzp/SURF/mslp/era5.mslp."+riqi+".grib"
if(os.path.isfile(filename)): #如果存在文件名则返回
print("ok",filename)
else:
print(filename)
c = cdsapi.Client()
c.retrieve(
'reanalysis-era5-single-levels',
{
'product_type' : 'reanalysis',
'format' : 'grib', # Supported format: grib and netcdf. Default: grib
'variable' : 'mean_sea_level_pressure', #其它变量名参见 https://cds.climate.copernicus.eu/cdsapp#!/dataset/reanalysis-era5-single-levels
'year' : riqi[0:4],
'month' : riqi[-4:-2],
'day' : riqi[-2:],
'time':[
'00:00','01:00','02:00',
'03:00','04:00','05:00',
'06:00','07:00','08:00',
'09:00','10:00','11:00',
'12:00','13:00','14:00',
'15:00','16:00','17:00',
'18:00','19:00','20:00',
'21:00','22:00','23:00'
], #<---注意此逗号,不选择区域和分辨率需要去掉!
## 'area' : [60, -10, 50, 2], # North, West, South, East. Default: global
## 'grid' : [1.0, 1.0], # Latitude/longitude grid: east-west (longitude) and north-south resolution (latitude). Default: 0.25 x 0.25
},
filename)



#下载脚本
class DownloadWorker(Thread):
def __init__(self, queue):
Thread.__init__(self)
self.queue = queue

def run(self):
while True:
# 从队列中获取任务并扩展tuple
riqi = self.queue.get()
downloadonefile(riqi)
self.queue.task_done()

#主程序
def main():
#起始时间
ts = time()

#起始日期
begin = datetime.date(1979,1,1)
end = datetime.date(2018,12,31)
d=begin
delta = datetime.timedelta(days=1)

#建立下载日期序列
links = []
while d <= end:
riqi=d.strftime("%Y%m%d")
links.append(str(riqi))
d += delta

#创建一个主进程与工作进程通信
queue = Queue()

# 注意,每个用户同时最多接受4个request https://cds.climate.copernicus.eu/vision
#创建四个工作线程
for x in range(4):
worker = DownloadWorker(queue)
#将daemon设置为True将会使主线程退出,即使所有worker都阻塞了
worker.daemon = True
worker.start()

#将任务以tuple的形式放入队列中
for link in links:
queue.put((link))

#让主线程等待队列完成所有的任务
queue.join()
print('Took {}'.format(time() - ts))

if __name__ == '__main__':
main()

多层grib并行下载

参考链接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from queue import Queue
from threading import Thread
import cdsapi
from time import time
import datetime
import os



VARS = ['geopotential','u_component_of_wind','v_component_of_wind','temperature','relative_humidity']


for i in range(len(VARS)):
def downloadonefile(riqi):
ts = time()
filename="/raid03/users/fuzp/PRESSURE/"+str(VARS[i])+"/era5."+str(VARS[i])+"."+riqi+".grib"
if(os.path.isfile(filename)): #如果存在文件名则返回
print("ok",filename)
else:
print(filename)
c = cdsapi.Client()
c.retrieve(
'reanalysis-era5-pressure-levels',
{
'product_type' : 'reanalysis',
'format' : 'grib', # Supported format: grib and netcdf. Default: grib
'variable' : str(VARS[i]), #其它变量名参见 https://cds.climate.copernicus.eu/cdsapp#!/dataset/reanalysis-era5-single-levels
'year' : riqi[0:4],
'month' : riqi[-4:-2],
'day' : riqi[-2:],
'pressure_level':[
'1','2','3',
'5','7','10',
'20','30','50',
'70','100','125',
'150','175','200',
'225','250','300',
'350','400','450',
'500','550','600',
'650','700','750',
'775','800','825',
'850','875','900',
'925','950','975',
'1000'
],
'time':[
'00:00','01:00','02:00',
'03:00','04:00','05:00',
'06:00','07:00','08:00',
'09:00','10:00','11:00',
'12:00','13:00','14:00',
'15:00','16:00','17:00',
'18:00','19:00','20:00',
'21:00','22:00','23:00'
]
## 'area' : [60, -10, 50, 2], # North, West, South, East. Default: global
## 'grid' : [1.0, 1.0], # Latitude/longitude grid: east-west (longitude) and north-south resolution (latitude). Default: 0.25 x 0.25
},
filename)



#下载脚本
class DownloadWorker(Thread):
def __init__(self, queue):
Thread.__init__(self)
self.queue = queue

def run(self):
while True:
# 从队列中获取任务并扩展tuple
riqi = self.queue.get()
downloadonefile(riqi)
self.queue.task_done()

#主程序
def main():
#创建目录
# for i in range(len(VARS)):
# os.chdir('/raid03/users/fuzp/PRESSURE/')
# print(os.getcwd())
# os.mkdir(str(VARS[i]))

# dirs = os.listdir('/raid03/users/fuzp/PRESSURE/')

#起始时间
ts = time()

#起始日期
begin = datetime.date(1979,1,1)
end = datetime.date(2018,12,31)
d=begin
delta = datetime.timedelta(days=1)

#建立下载日期序列
links = []
while d <= end:
riqi=d.strftime("%Y%m%d")
links.append(str(riqi))
d += delta

#创建一个主进程与工作进程通信
queue = Queue()

# 注意,每个用户同时最多接受4个request https://cds.climate.copernicus.eu/vision
#创建四个工作线程
for x in range(4):
worker = DownloadWorker(queue)
#将daemon设置为True将会使主线程退出,即使所有worker都阻塞了
worker.daemon = True
worker.start()

#将任务以tuple的形式放入队列中
for link in links:
queue.put((link))

#让主线程等待队列完成所有的任务
queue.join()
print('Took {}'.format(time() - ts))

if __name__ == '__main__':
main()

常用的下载变量名称

tips:根据需要修改即可,个人建议选定少数level层次,若是不需要逐小时数据可以采用逐3小时或者逐6小时数据。不然,每天数据量单文件可以达到17GB,存储空间太浪费了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
ERA5 hourly data on single levels from 1979 to present

10m_u_component_of_wind
10m_v_component_of_wind
2m_temperature
convective_precipitation
large_scale_precipitation
mean_top_net_long_wave_radiation_flux_clear_sky
mean_top_net_long_wave_radiation_flux
mean_sea_level_pressure
mean_surface_sensible_heat_flux
mean_surface_latent_heat_flux
orography
surface_latent_heat_flux
total_precipitation


VARS = ['convective_precipitation','large_scale_precipitation','total_precipitation','10m_u_component_of_wind','10m_v_component_of_wind','orography','mean_top_net_long_wave_radiation_flux_clear_sky','mean_top_net_long_wave_radiation_flux','mean_surface_sensible_heat_flux','mean_surface_latent_heat_flux','surface_latent_heat_flux']


ERA5 hourly data on pressure levels from 1979 to present

geopotential
u_component_of_wind
v_component_of_wind
temperature
relative_humidity

运行

python parallel_download_era5_cds.py

另外非并行(多线程)脚本

tips:输出格式为netcdf为每年一个独立文件,适合较小、单层数据;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import cdsapi
import os

c = cdsapi.Client()

VARS = ['100m_u_component_of_wind','100m_v_component_of_wind','10m_u_component_of_wind', '10m_v_component_of_wind', '2m_temperature','surface_solar_radiation_downward_clear_sky','surface_solar_radiation_downwards', '2m_dewpoint_temperature','mean_sea_level_pressure']
# VARS = ['2m_temperature']
YEARS = [x for x in map(str, range(1979, 2019))]

for V in VARS:
for Y in YEARS:
target_filename = 'era5-hourly-'+V+'-'+Y+'.nc'
if not os.path.isfile(target_filename):
c.retrieve(
'reanalysis-era5-single-levels',
{
'variable':V,
'product_type':'reanalysis',
'year': Y,
'month':[
'01','02','03',
'04','05','06',
'07','08','09',
'10','11','12'
],
'day':[
'01','02','03',
'04','05','06',
'07','08','09',
'10','11','12',
'13','14','15',
'16','17','18',
'19','20','21',
'22','23','24',
'25','26','27',
'28','29','30',
'31'
],
'time':[
'00:00','01:00','02:00',
'03:00','04:00','05:00',
'06:00','07:00','08:00',
'09:00','10:00','11:00',
'12:00','13:00','14:00',
'15:00','16:00','17:00',
'18:00','19:00','20:00',
'21:00','22:00','23:00'
],
'format':'netcdf'
},
target_filename)

采用命令行参数指令下载

具体参考:GitHub网址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
#!/usr/bin/env python
# -------------------------------------------------------------------
# - NAME: ERA5.py
# - AUTHOR: Reto Stauffer
# - DATE: 2017-01-05
# -------------------------------------------------------------------
# - DESCRIPTION: ERA5 data downloader (currently for the Alps, check
# lonmin, lonmax, latmin, latmax!
# -------------------------------------------------------------------
# - EDITORIAL: 2017-01-05, RS: Created file on pc24-c707.
# -------------------------------------------------------------------
# - L@ST MODIFIED: 2018-12-15 15:48 on marvin
# -------------------------------------------------------------------

import logging as log
log.basicConfig(format='%(levelname)s: %(message)s', level = log.DEBUG)

# -------------------------------------------------------------------
# Main script part
# -------------------------------------------------------------------
if __name__ == "__main__":

# Using sys.args to control the date which should be processed.
from optparse import OptionParser
import sys
usage = """Usage:

{0:s} --years <years> --parameter <parameter> --level <level>
{0:s} -y <years> -p <parameter> -l <level>

Downloading ERA5 reanalysis data from Copernicus Climate Data
Services. NOTE: Not yet made for ensemble ERA5!

Requires `cdsapi` to be installed (python package to access
Copernicus Climate Data Services, CDS).

How to install cdsapi and the required API key:
* https://cds.climate.copernicus.eu/api-how-to

Available parameters for "single level" (-t/--type sf):
* https://cds.climate.copernicus.eu/cdsapp#!/dataset/reanalysis-era5-pressure-levels?tab=form

Available parameters for "pressure level" (-t/--type pl):
* https://cds.climate.copernicus.eu/cdsapp#!/dataset/reanalysis-era5-pressure-levels?tab=form

Usage examples:

10m wind speed for the year 2018
>>> python {0:s} --years 2018 --parameter 10m_u_component_of_wind

10m wind speed for 2010 and 2018
>>> python {0:s} --years 2010,2018 --parameter 10m_u_component_of_wind

10m wind speed for 2008, 2009, 2010, ..., 2018
>>> python {0:s} --years 2008-2018 --parameter 10m_u_component_of_wind

700hPa geopotential height, 2018
>>> python {0:s} --years 2018 --parameter geopotential --level 700

""".format(__file__)
parser = OptionParser(usage = usage)
parser.add_option("-y", "--years", dest = "years", type = "str",
help="years for which the data should be downloaded. " + \
"Can either be a single year (e.g., 2010), a comma separated list " + \
"(e.g., 2010,2011), or a sequence of the form <bgn>-<end> " + \
"(e.g., 2010-2018; 2010 to 2018).")
parser.add_option("-p", "--parameter", dest = "param", type = "str",
help="the parameter/variable to be downloaded. Please " + \
"check the cds website to see what's availabe.")
parser.add_option("-l", "--level", dest = "level", type = "int", default = None,
help="level, only for pressure level data. For \"single level data\" " + \
"(e.g., surface level variables) do not set this parameter!")
parser.add_option("-t", "--test", dest = "test", default = False, action = "store_true",
help="development flag. If set, only one day (January 1th of the first " + \
"year) will be downloaded before the script stops. " + \
"The output file will be labeled with \"_test_\".")
(options,args) = parser.parse_args()


# Subsetting, currently hardcoded/fixed
lonmin = 2.0
lonmax = 20.0
latmin = 40.0
latmax = 52.0
the_subset = "/".join(["{:.2f}".format(x) for x in [latmax, lonmin, latmin, lonmax]])

# Output directory
datadir = "era5_data"

# Missing date?
import re
from numpy import min, max, arange, unique, sort
if options.years == None or options.param == None:
parser.print_help(); sys.exit(9)
else:
if re.match("^[0-9]{4}$", options.years):
options.years = [int(options.years)]
elif re.match("^[0-9,]+$", options.years):
options.years = [int(x) for x in options.years.split(",")]
elif re.match("^[0-9]{4}-[0-9]{4}$", options.years):
options.years = [int(x) for x in options.years.split("-")]
options.years = list(arange(min(options.years), max(options.years)+1))
else:
parser.print_help(); log.info("\n\n")
raise ValueError("Wrong format for -y/--years")

# Sort and unique
options.years = list(sort(unique(options.years)))
options.years.reverse()

# Minimum year
if min(options.years) < 1950:
raise ValueError("input years is definitively wrong as before 1950!")

# Quick output
log.info("\n{:s}".format("".join(["-"]*60)))
log.info("Processing data for years: {:s}".format(", ".join([str(x) for x in options.years])))
log.info("Parameter: {:s}".format(options.param))
log.info("Subset: {:s}\n".format(the_subset))
if options.level is None:
log.info("Single level parameter")
else:
log.info("Vertical level: {:d} hPa".format(options.level))
if options.test:
log.info("TEST MODE ACTIVE: Only one day will be downloaded!")
log.info("{:s}\n".format("".join(["-"]*60)))

# -------------------------------------------------------------
# -------------------------------------------------------------
# -------------------------------------------------------------
# -------------------------------------------------------------

import sys, os
import datetime as dt
import subprocess as sub
from cdsapi import Client
from datetime import date
from numpy import arange

# Output directory
tmp = "" if options.level is None else "_{:d}hPa".format(options.level)
paramdir = os.path.join(datadir, "{:s}{:s}".format(options.param,tmp))
if not os.path.isdir(paramdir):
try:
os.makedirs(paramdir)
except:
raise Exception("not able to create directory \"{;s}\"".format(datadir))

# range(2017,2018) downloads data for 2017
for year in options.years:

if year > int(date.today().strftime("%Y")): continue
log.info("[!] Processing year {:04d}".format(year))

# ----------------------------------------------------------------
# SURFACE DATA
# ----------------------------------------------------------------
tmp = "" if options.level is None else "_{:d}hPa".format(options.level)
# File name without suffix
tmp = "ERA5_{:04d}{:s}_{:s}{:s}".format(year, \
"_test" if options.test else "", options.param, tmp)
# Create the different file names we need
ncfile = os.path.join(paramdir, "{:s}.nc".format(tmp))

# If netcdf file exists: skip
if os.path.isfile(ncfile):
log.info("Output file \"{:s}\" exists, proceed with next ...".format(ncfile))
else:

# Request
log.info("Downloading: {:s}".format(ncfile))
args = {"product_type" : "reanalysis",
"format" : "netcdf",
"area" : the_subset,
"variable" : options.param,
"year" : "{:04d}".format(year),
"month" : ["{:02d}".format(x) for x in range(1,13)],
"day" : ["{:02d}".format(x) for x in range(1,32)],
"time" : ["{:02d}:00".format(x) for x in range(0,24)]}
# Level
if not options.level == None:
args["pressure_level"] = "{:d}".format(options.level)

# Test mode?
if options.test:
args["day"] = "01"
args["month"] = "01"

for key,val in args.items():
if isinstance(val, str):
log.info("{:20s} {:s}".format(key, val))
else:
log.info("{:20s} {:s}".format(key, ", ".join(list(val))[0:40]))

try:
server = Client()
if options.level:
cdstype = "reanalysis-era5-pressure-levels"
else:
cdstype = "reanalysis-era5-single-levels"
# Downloading data
server.retrieve(cdstype, args, ncfile)
log.info("[+] Everything seems to be fine for {0}".format(ncfile))

except Exception as e:
log.info(e)
log.info("[!] PROBLEMS DETECTED")
log.info(" cdsapi returned error code != 0")
log.info(" for file: {0}".format(ncfile))
if options.test: log.info("\nTEST MODE: Stop here.\n"); sys.exit(0)
continue

if options.test: log.info("\nTEST MODE: Stop here.\n"); sys.exit(0)

总结

基本上述代码1、2能够满足日常需求,后面的是根据不同需求,在官网templates code scripts 基础上造出的轮子,用起来,就好。

参考:

1.网址1
2.网址2
3.网址3
4.网址4

有任何问题都欢迎交流探讨,共同学习进步!


-------------本文结束 感谢阅读-------------
作者Gavin
有问题请在相应页面留言(评论系统DISQUS需要"翻墙"才可见)
或者私信给我 GMAIL: zhpfu.atm@gmail.com
满分是10分的话,这篇文章你给几分
--> -->