Load a CSV file with Spark
I'm new to Spark and I'm trying to read CSV data from a file with Spark. Here is what I'm doing:
sc.textFile('file.csv')
.map(lambda line: (line.split(',')[0], line.split(',')[1]))
.collect()
I would expect this call to give me a list of the first two columns of my file, but instead I'm getting this error:
File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range
even though my CSV file has more than one column.
Are you sure that all the lines have at least 2 columns? Can you try something like the following, just to check?
sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)>1) \
.map(lambda line: (line[0],line[1])) \
.collect()
Alternatively, you could print the culprit (if any):
sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)<=1) \
.collect()
Spark 2.0.0+
You can use the built-in csv data source directly:
spark.read.csv(
    "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)
or
(spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv"))
without including any external dependencies.
Spark < 2.0.0:
Instead of manual parsing, which is far from trivial in the general case, I would recommend spark-csv.
Make sure that spark-csv is included in the path (--packages, --jars, --driver-class-path).
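For example, you can pull it onto the classpath when launching the shell; the exact coordinate below is only an illustration, so match the Scala version and release to your own build:

bin/pyspark --packages com.databricks:spark-csv_2.10:1.5.0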
Then load your data as follows:
df = (sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))
It can handle loading, schema inference and dropping malformed lines, and it doesn't require passing data from Python to the JVM.
Note:
If you know the schema, it is better to avoid schema inference and pass it to the DataFrameReader. Assuming you have three columns - integer, double and string:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])
(sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
df = spark.read.csv("/home/stp/test1.csv", header=True, sep="|")
print(df.collect())
Yet another option is to read the CSV file using Pandas and then import the Pandas DataFrame into Spark. For example:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext('local','example') # if using locally
sql_sc = SQLContext(sc)
pandas_df = pd.read_csv('file.csv') # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)
Simply splitting by comma will also split commas that are within fields (e.g. a,b,"1,2,3",c), so it's not recommended. zero323's answer is good if you want to use the DataFrames API, but if you want to stick to base Spark, you can parse csvs in base Python with the csv module:
# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))
EDIT: As @muon mentioned in the comments, this will treat the header like any other row so you'll need to extract it manually. For example, header = rdd.first(); rdd = rdd.filter(lambda x: x != header)
(make sure not to modify header
before the filter evaluates). But at this point, you're probably better off using a built-in csv parser.
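For reference, a minimal sketch of that header-stripping step applied to the mapPartitions snippet above (the file name and the example column names are just placeholders):

import csv

rdd = sc.textFile("file.csv").mapPartitions(lambda x: csv.reader(x))
header = rdd.first()                         # e.g. ['col1', 'col2']
rdd = rdd.filter(lambda row: row != header)  # drops the header row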
Now, there's also another option for any general csv file: https://github.com/seahboonsiew/pyspark-csv as follows:
Assume we have the following context
sc = SparkContext
sqlCtx = SQLContext or HiveContext
First, distribute pyspark-csv.py to executors using SparkContext
import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')
Read csv data via SparkContext and convert it to DataFrame
plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)
If you want to load csv as a dataframe then you can do the following:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('sampleFile.csv')  # this is your csv file
It worked fine for me.
This is in-line with what JP Mercier initially suggested about using Pandas, but with a major modification: If you read data into Pandas in chunks, it should be more malleable. Meaning, that you can parse a much larger file than Pandas can actually handle as a single piece and pass it to Spark in smaller sizes. (This also answers the comment about why one would want to use Spark if they can load everything into Pandas anyways.)
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext('local','example') # if using locally
sql_sc = SQLContext(sc)
Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)
for chunky in chunk_100k:
    Spark_Full += sc.parallelize(chunky.values.tolist())
YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()
If your csv data happens to not contain newlines in any of the fields, you can load your data with textFile() and parse it:
import csv
from io import StringIO  # Python 3; on Python 2 use the StringIO module instead

def loadRecord(line):
    # parse a single CSV line into a dict keyed by the field names
    input = StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name1", "name2"])
    return next(reader)

input = sc.textFile(inputFile).map(loadRecord)
This error can arise if one or more rows in the dataset have fewer than 2 columns.
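If simply discarding such rows is acceptable, one way to guard against this (a sketch assuming Spark 2.x and an explicit schema; the column names are placeholders) is to let the DataFrame reader drop lines that do not match the expected layout:

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("first", StringType()),
    StructField("second", StringType())
])
# rows whose field count does not match the schema should be dropped instead of raising an error
df = spark.read.csv("file.csv", schema=schema, mode="DROPMALFORMED")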
I am also new to PySpark and was trying to read a CSV file. The following code worked for me. In this code I am using a dataset from Kaggle: https://www.kaggle.com/carrie1/ecommerce-data
1. Without mentioning the schema:
from pyspark.sql import SparkSession
scSpark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example: Reading CSV file without mentioning schema") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
sdfData = scSpark.read.csv("data.csv", header=True, sep=",")
sdfData.show()
Now check the columns: sdfData.columns
Output will be:
['InvoiceNo', 'StockCode','Description','Quantity', 'InvoiceDate', 'CustomerID', 'Country']
Check the datatype for each column:
sdfData.schema
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,StringType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,StringType,true),StructField(CustomerID,StringType,true),StructField(Country,StringType,true)))
This gives a DataFrame in which every column is read as StringType.
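If you only need a couple of columns in a different type, you can also cast them after loading instead of supplying a full schema (a small sketch using a column from the dataset above):

from pyspark.sql.functions import col

sdfData = sdfData.withColumn("Quantity", col("Quantity").cast("integer"))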
2. With schema: If you know the schema, or want to change the datatype of any column in the table above, then use this (let's say I have the following columns and want each of them in a particular data type):
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([
    StructField("InvoiceNo", IntegerType()),
    StructField("StockCode", StringType()),
    StructField("Description", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("InvoiceDate", StringType()),
    StructField("CustomerID", DoubleType()),
    StructField("Country", StringType())
])
scSpark = SparkSession \
    .builder \
    .appName("Python Spark SQL example: Reading CSV file with schema") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
sdfData = scSpark.read.csv("data.csv", header=True, sep=",", schema=schema)
Now check the schema for datatype of each column:
sdfData.schema
StructType(List(StructField(InvoiceNo,IntegerType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))
Edited: We can use the following line of code as well without mentioning schema explicitly:
sdfData = scSpark.read.csv("data.csv", header=True, inferSchema = True)
sdfData.schema
The output is:
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,IntegerType,true),StructField(Country,StringType,true)))
The output will look like this:
sdfData.show()
+---------+---------+--------------------+--------+--------------+----------+-------+
|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|CustomerID|Country|
+---------+---------+--------------------+--------+--------------+----------+-------+
| 536365| 85123A|WHITE HANGING HEA...| 6|12/1/2010 8:26| 2.55| 17850|
| 536365| 71053| WHITE METAL LANTERN| 6|12/1/2010 8:26| 3.39| 17850|
| 536365| 84406B|CREAM CUPID HEART...| 8|12/1/2010 8:26| 2.75| 17850|
| 536365| 84029G|KNITTED UNION FLA...| 6|12/1/2010 8:26| 3.39| 17850|
| 536365| 84029E|RED WOOLLY HOTTIE...| 6|12/1/2010 8:26| 3.39| 17850|
| 536365| 22752|SET 7 BABUSHKA NE...| 2|12/1/2010 8:26| 7.65| 17850|
| 536365| 21730|GLASS STAR FROSTE...| 6|12/1/2010 8:26| 4.25| 17850|
| 536366| 22633|HAND WARMER UNION...| 6|12/1/2010 8:28| 1.85| 17850|
| 536366| 22632|HAND WARMER RED P...| 6|12/1/2010 8:28| 1.85| 17850|
| 536367| 84879|ASSORTED COLOUR B...| 32|12/1/2010 8:34| 1.69| 13047|
| 536367| 22745|POPPY'S PLAYHOUSE...| 6|12/1/2010 8:34| 2.1| 13047|
| 536367| 22748|POPPY'S PLAYHOUSE...| 6|12/1/2010 8:34| 2.1| 13047|
| 536367| 22749|FELTCRAFT PRINCES...| 8|12/1/2010 8:34| 3.75| 13047|
| 536367| 22310|IVORY KNITTED MUG...| 6|12/1/2010 8:34| 1.65| 13047|
| 536367| 84969|BOX OF 6 ASSORTED...| 6|12/1/2010 8:34| 4.25| 13047|
| 536367| 22623|BOX OF VINTAGE JI...| 3|12/1/2010 8:34| 4.95| 13047|
| 536367| 22622|BOX OF VINTAGE AL...| 2|12/1/2010 8:34| 9.95| 13047|
| 536367| 21754|HOME BUILDING BLO...| 3|12/1/2010 8:34| 5.95| 13047|
| 536367| 21755|LOVE BUILDING BLO...| 3|12/1/2010 8:34| 5.95| 13047|
| 536367| 21777|RECIPE BOX WITH M...| 4|12/1/2010 8:34| 7.95| 13047|
+---------+---------+--------------------+--------+--------------+----------+-------+
only showing top 20 rows
When using spark.read.csv, I find that using the options escape='"' and multiLine=True provides the most consistent handling of the CSV standard, and in my experience it works best with CSV files exported from Google Sheets.
That is,
#set inferSchema=False to read everything as string
df = spark.read.csv("myData.csv", escape='"', multiLine=True,
inferSchema=False, header=True)
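As a small illustration of what those options buy you (the file name and contents below are hypothetical): a quoted field containing a comma, an escaped double quote and an embedded newline should come back as a single row rather than two:

# quoted.csv contains:
# id,comment
# 1,"He said ""hi"",
# then left"
df = spark.read.csv("quoted.csv", escape='"', multiLine=True, header=True)
df.count()  # expected: 1 row, not 2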
Source: https://stackoverflow.com/questions/28782940/load-csv-file-with-spark