python - Upload PySpark RDD into BigQuery


I download a table from BigQuery into a PySpark RDD as shown below. How do I upload it again?

    import json

    import pyspark


    def instantiate_pyspark():
        """instantiate pyspark rdd stuff"""
        osc = pyspark.SparkContext()
        ohadoopconf = osc._jsc.hadoopConfiguration()
        # reads the bucket configured for the GCS connector (value is not used further)
        ohadoopconf.get("fs.gs.system.bucket")

        return osc


    def get_table_cloud(osc, dbqconfig):
        """get table from bigquery via google cloud storage
        config format:
            dgsconfig = {'project_id': '', 'bucket': ''}
            dbqconfig = {'project_id': '', 'dataset_id': '', 'table_id': ''}
        """
        dgsconfig = dbqconfig['gs_config']

        dconf = {
            "mapred.bq.project.id": dgsconfig['project_id'],
            "mapred.bq.gcs.bucket": dgsconfig['bucket'],
            "mapred.bq.input.project.id": dbqconfig['project_id'],
            "mapred.bq.input.dataset.id": dbqconfig['dataset_id'],
            "mapred.bq.input.table.id": dbqconfig['table_id']
        }

        # each record is a (key, json string) pair
        rdddatasetraw = osc.newAPIHadoopRDD(
            "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
            "org.apache.hadoop.io.LongWritable",
            "com.google.gson.JsonObject",
            conf=dconf
        )

        # column names come from the first record
        lsheadings = list(json.loads(rdddatasetraw.take(1)[0][1]).keys())

        # list() keeps the rows picklable on Python 3
        rdddataset = (
            rdddatasetraw
            .map(lambda t: list(json.loads(t[1]).values()))
        )

        return rdddataset, lsheadings


    dgsconfig = {
        'project_id': "project_id",
        'bucket': "bucket_id"
    }
    dbqconfig = {
        'gs_config': dgsconfig,
        'project_id': "project_id",
        'dataset_id': "dataset_id",
        'table_id': "table_id"
    }

    osc = instantiate_pyspark()
    rdddata, lsheadings = get_table_cloud(osc, dbqconfig)  # rdddata has list-of-lists type format

You can export the RDD to intermediate files and then load those files into BigQuery.

This might help: How to export a table dataframe in PySpark to CSV?
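
For what it's worth, here is a minimal sketch of that export-then-load idea, not a tested production recipe. It assumes rdddata and lsheadings come from get_table_cloud above, writes newline-delimited JSON rather than CSV, and uses the google-cloud-bigquery client library for the load step, which is not part of the question. The helper names (row_to_json_line, export_rdd_to_gcs, load_gcs_into_bigquery) and the gcs_dir / table_ref arguments are made up for illustration.

```python
import json


def row_to_json_line(headings, row):
    """Serialize one row (a list of values) as a single JSON record."""
    return json.dumps(dict(zip(headings, row)))


def export_rdd_to_gcs(rdddata, lsheadings, gcs_dir):
    """Write the RDD as newline-delimited JSON part files under gcs_dir.

    gcs_dir is a hypothetical output location such as "gs://bucket_id/export"
    and must not exist yet, because saveAsTextFile refuses to overwrite.
    """
    headings = list(lsheadings)
    (
        rdddata
        .map(lambda row: row_to_json_line(headings, list(row)))
        .saveAsTextFile(gcs_dir)
    )


def load_gcs_into_bigquery(gcs_dir, table_ref):
    """Load the exported part files into BigQuery.

    table_ref is a hypothetical "project_id.dataset_id.table_id" string.
    Assumes the google-cloud-bigquery package is installed.
    """
    from google.cloud import bigquery  # imported lazily, only needed here

    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        autodetect=True,  # let BigQuery infer the schema from the records
    )
    # "part-*" matches the Spark part files and skips the _SUCCESS marker
    job = client.load_table_from_uri(
        gcs_dir + "/part-*", table_ref, job_config=job_config
    )
    job.result()  # block until the load job finishes
```

Usage would be something like export_rdd_to_gcs(rdddata, lsheadings, "gs://bucket_id/export") followed by load_gcs_into_bigquery("gs://bucket_id/export", "project_id.dataset_id.table_id"). JSON is used here because it keeps the column names with each record, so the headings do not have to be passed to the load job separately.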

