python - Upload PySpark RDD into BigQuery
I download a table from BigQuery into a PySpark RDD as shown below. How do I upload it back into BigQuery again?
    dgsconfig = {'project_id': "project_id", 'bucket': "bucket_id"}
    dbqconfig = {
        'gs_config': dgsconfig,
        'project_id': "project_id",
        'dataset_id': "dataset_id",
        'table_id': "table_id"
    }

    osc = instantiate_pyspark()
    rdddata, lsheadings = get_table_cloud(osc, dbqconfig)  # rdddata has list-of-lists type format

    def instantiate_pyspark():
        """instantiate pyspark rdd stuff"""
        import pyspark
        osc = pyspark.SparkContext()
        ohadoopconf = osc._jsc.hadoopConfiguration()
        ohadoopconf.get("fs.gs.system.bucket")
        return osc

    def get_table_cloud(osc, dbqconfig):
        """get table from bigquery via google cloud storage
        config format:
            dgsconfig = {'project_id': '', 'bucket': ''}
            dbqconfig = {'project_id': '', 'dataset_id': '', 'table_id': ''}
        """
        dgsconfig = dbqconfig['gs_config']
        dconf = {
            "mapred.bq.project.id": dgsconfig['project_id'],
            "mapred.bq.gcs.bucket": dgsconfig['bucket'],
            "mapred.bq.input.project.id": dbqconfig['project_id'],
            "mapred.bq.input.dataset.id": dbqconfig['dataset_id'],
            "mapred.bq.input.table.id": dbqconfig['table_id']
        }
        rdddatasetraw = osc.newAPIHadoopRDD(
            "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
            "org.apache.hadoop.io.LongWritable",
            "com.google.gson.JsonObject",
            conf=dconf
        )
        import json
        lsheadings = json.loads(rdddatasetraw.take(1)[0][1]).keys()
        rdddataset = (
            rdddatasetraw
            .map(lambda t, json=json: json.loads(t[1]).values())
        )
        return rdddataset, lsheadings
You can export the RDD to intermediate files on Google Cloud Storage, then load those files into BigQuery; a sketch follows below.
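A minimal sketch of that approach, assuming the rows in rdddata line up with lsheadings and that a scratch path on the export bucket is available (the gs://bucket_id/tmp_export path and the upload_table_cloud helper are made up for illustration): it writes the RDD as newline-delimited JSON with saveAsTextFile, then runs a BigQuery load job with the google-cloud-bigquery client.

    import json
    from google.cloud import bigquery

    def upload_table_cloud(rdddata, lsheadings, dbqconfig, sgspath):
        """write rdd to gcs as newline-delimited json, then load it into bigquery"""
        lsheadings = list(lsheadings)
        # one json object per line, keyed by the original column headings
        (rdddata
            .map(lambda row: json.dumps(dict(zip(lsheadings, list(row)))))
            .saveAsTextFile(sgspath))

        oclient = bigquery.Client(project=dbqconfig['project_id'])
        stableid = "{}.{}.{}".format(dbqconfig['project_id'],
                                     dbqconfig['dataset_id'],
                                     dbqconfig['table_id'])
        ojobconfig = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            autodetect=True,  # or pass an explicit schema instead
        )
        # saveAsTextFile produces part-* files under the output directory
        ojob = oclient.load_table_from_uri(sgspath + "/part-*",
                                           stableid,
                                           job_config=ojobconfig)
        ojob.result()  # block until the load job finishes

    upload_table_cloud(rdddata, lsheadings, dbqconfig, "gs://bucket_id/tmp_export")

Newline-delimited JSON keeps the column names with each row; a CSV export also works if you give the load job an explicit schema or a header row.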
This might help: How to export a table dataframe in PySpark to CSV?
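If you go the CSV route from that question, a rough sketch, assuming Spark 2.x (so SparkSession is available) and that the values() call in get_table_cloud yields columns in the same order as lsheadings (the output path is again made up), is to turn the RDD into a DataFrame and write it out:

    from pyspark.sql import SparkSession

    ospark = SparkSession(osc)  # reuse the existing SparkContext
    odf = ospark.createDataFrame(rdddata.map(list), schema=list(lsheadings))
    # writes a directory of part-*.csv files to the bucket
    odf.write.csv("gs://bucket_id/tmp_export_csv", header=True)

Those part files can then be loaded with the same load_table_from_uri call as above, switching to source_format=bigquery.SourceFormat.CSV and skip_leading_rows=1 for the header.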