How to *actually* read CSV data in TensorFlow?

I'm relatively new to the world of TensorFlow, and pretty confused by how you'd actually read CSV data into usable example/label tensors in TensorFlow. The example from the TensorFlow tutorial on reading CSV data is fairly fragmented and only gets you part of the way to being able to train on CSV data.

Here's my code that I've pieced together, based off that CSV tutorial:

from __future__ import print_function
import tensorflow as tf


def file_len(fname):
    with open(fname) as f:
        for i, l in enumerate(f):
            pass
    return i + 1


filename = "csv_test_data.csv"

# setup text reader
file_length = file_len(filename)
filename_queue = tf.train.string_input_producer([filename])
reader = tf.TextLineReader(skip_header_lines=1)
_, csv_row = reader.read(filename_queue)

# setup CSV decoding
record_defaults = [[0], [0], [0], [0], [0]]
col1, col2, col3, col4, col5 = tf.decode_csv(csv_row, record_defaults=record_defaults)

# turn features back into a tensor
features = tf.stack([col1, col2, col3, col4])

print("loading, " + str(file_length) + " line(s)\n")
with tf.Session() as sess:
    tf.initialize_all_variables().run()

    # start populating filename queue
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    for i in range(file_length):
        # retrieve a single instance
        example, label = sess.run([features, col5])
        print(example, label)

    coord.request_stop()
    coord.join(threads)
    print("\ndone loading")

And here is a brief example from the CSV file I'm loading - pretty basic data - 4 feature columns and 1 label column:

0,0,0,0,0
0,15,0,0,0
0,30,0,0,0
0,45,0,0,0

All the code above does is print each example from the CSV file, one by one, which, while nice, is pretty useless for training.

What I'm struggling with here is how you'd actually turn those individual examples, loaded one by one, into a training dataset. For example, here's a notebook I was working on in the Udacity Deep Learning course. I basically want to take the CSV data I'm loading and plop it into something like train_dataset and train_labels:

def reformat(dataset, labels):
    dataset = dataset.reshape((-1, image_size * image_size)).astype(np.float32)
    # Map 2 to [0.0, 1.0, 0.0 ...], 3 to [0.0, 0.0, 1.0 ...]
    labels = (np.arange(num_labels) == labels[:, None]).astype(np.float32)
    return dataset, labels
train_dataset, train_labels = reformat(train_dataset, train_labels)
valid_dataset, valid_labels = reformat(valid_dataset, valid_labels)
test_dataset, test_labels = reformat(test_dataset, test_labels)
print('Training set', train_dataset.shape, train_labels.shape)
print('Validation set', valid_dataset.shape, valid_labels.shape)
print('Test set', test_dataset.shape, test_labels.shape)

I've tried using tf.train.shuffle_batch, like this, but it just inexplicably hangs:

for i in range(file_length):
    # retrieve a single instance
    example, label = sess.run([features, colRelevant])
    example_batch, label_batch = tf.train.shuffle_batch([example, label], batch_size=file_length, capacity=file_length, min_after_dequeue=10000)
    print(example, label)

So to sum up, here are my questions:

  • What am I missing about this process?
    • It feels like there is some key intuition I'm missing about how to properly build an input pipeline.
  • Is there a way to avoid having to know the length of the CSV file?
    • It feels pretty inelegant to have to know the number of lines you want to process (the for i in range(file_length) line of code above).

EDIT: As soon as Yaroslav pointed out that I was likely mixing up the imperative and graph-construction parts here, it started to become clearer. I was able to pull together the following code, which I think is closer to what would typically be done when training a model from CSV (excluding any model training code):

from __future__ import print_function
import numpy as np
import tensorflow as tf
import math as math
import argparse


parser = argparse.ArgumentParser()
parser.add_argument('dataset')
args = parser.parse_args()


def file_len(fname):
    with open(fname) as f:
        for i, l in enumerate(f):
            pass
    return i + 1


def read_from_csv(filename_queue):
    reader = tf.TextLineReader(skip_header_lines=1)
    _, csv_row = reader.read(filename_queue)
    record_defaults = [[0], [0], [0], [0], [0]]
    colHour, colQuarter, colAction, colUser, colLabel = tf.decode_csv(csv_row, record_defaults=record_defaults)
    features = tf.stack([colHour, colQuarter, colAction, colUser])
    label = tf.stack([colLabel])
    return features, label


def input_pipeline(batch_size, num_epochs=None):
    filename_queue = tf.train.string_input_producer([args.dataset], num_epochs=num_epochs, shuffle=True)
    example, label = read_from_csv(filename_queue)
    min_after_dequeue = 10000
    capacity = min_after_dequeue + 3 * batch_size
    example_batch, label_batch = tf.train.shuffle_batch(
        [example, label], batch_size=batch_size, capacity=capacity,
        min_after_dequeue=min_after_dequeue)
    return example_batch, label_batch


file_length = file_len(args.dataset) - 1
examples, labels = input_pipeline(file_length, 1)


with tf.Session() as sess:
    tf.initialize_all_variables().run()

    # start populating filename queue
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    try:
        while not coord.should_stop():
            example_batch, label_batch = sess.run([examples, labels])
            print(example_batch)
    except tf.errors.OutOfRangeError:
        print('Done training, epoch reached')
    finally:
        coord.request_stop()

    coord.join(threads)

I think you are mixing up imperative and graph-construction parts here. The operation tf.train.shuffle_batch creates a new queue node, and a single node can be used to process the entire dataset. So I think you are hanging because you created a bunch of shuffle_batch queues in your for loop and didn't start queue runners for them.

Normal input pipeline usage looks like this (a minimal sketch follows the list):

  1. Add nodes like shuffle_batch to input pipeline
  2. (optional, to prevent unintentional graph modification) finalize graph

--- end of graph construction, beginning of imperative programming --

  3. tf.start_queue_runners
  4. while(True): session.run()
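
Applied to your CSV pipeline, a minimal sketch of that ordering might look like the following (it reuses the features and col5 ops from your first snippet; the batch size, queue capacities, and loop count are illustrative, not prescriptive):

# --- graph construction: one shuffle_batch queue serves the whole dataset ---
example_batch, label_batch = tf.train.shuffle_batch(
    [features, col5], batch_size=32, capacity=2000, min_after_dequeue=1000)
init_op = tf.initialize_all_variables()
tf.get_default_graph().finalize()   # optional: no nodes can be added past this point

# --- imperative part: start queue runners once, then just call session.run ---
with tf.Session() as sess:
    sess.run(init_op)
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    for _ in range(100):            # or loop until tf.errors.OutOfRangeError when num_epochs is set
        batch_xs, batch_ys = sess.run([example_batch, label_batch])
    coord.request_stop()
    coord.join(threads)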

To be more scalable (to avoid the Python GIL), you could generate all of your data using a TensorFlow pipeline. However, if performance is not critical, you can hook up a numpy array to an input pipeline by using slice_input_producer. Here's an example with some Print nodes to see what's going on (messages in Print go to stdout when the node is run):

import numpy as np
import tensorflow as tf

tf.reset_default_graph()

num_examples = 5
num_features = 2
data = np.reshape(np.arange(num_examples * num_features), (num_examples, num_features))
print(data)

(data_node,) = tf.train.slice_input_producer([tf.constant(data)], num_epochs=1, shuffle=False)
data_node_debug = tf.Print(data_node, [data_node], "Dequeueing from data_node ")
data_batch = tf.train.batch([data_node_debug], batch_size=2)
data_batch_debug = tf.Print(data_batch, [data_batch], "Dequeueing from data_batch ")

sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())
tf.get_default_graph().finalize()
tf.train.start_queue_runners()

try:
    while True:
        print(sess.run(data_batch_debug))
except tf.errors.OutOfRangeError as e:
    print("No more inputs.")

You should see something like this

[[0 1]
[2 3]
[4 5]
[6 7]
[8 9]]
[[0 1]
[2 3]]
[[4 5]
[6 7]]
No more inputs.

The "8, 9" numbers didn't fill up the full batch, so they didn't get produced. Also tf.Print are printed to sys.stdout, so they show up in separately in Terminal for me.

PS: a minimal example of connecting batch to a manually initialized queue is in GitHub issue 2193

Also, for debugging purposes you might want to set a timeout on your session so that your IPython notebook doesn't hang on empty queue dequeues. I use this helper function for my sessions:

def create_session():
    config = tf.ConfigProto(log_device_placement=True)
    config.gpu_options.per_process_gpu_memory_fraction = 0.3  # don't hog all vRAM
    config.operation_timeout_in_ms = 60000   # terminate on long hangs
    # create interactive session to register a default session
    sess = tf.InteractiveSession("", config=config)
    return sess

Scalability Notes:

  1. tf.constant inlines a copy of your data into the Graph. There's a fundamental limit of 2GB on the size of a Graph definition, so that's an upper limit on the size of your data.
  2. You could get around that limit by using v = tf.Variable and saving the data into it by running v.assign_op with a tf.placeholder on the right-hand side and feeding the numpy array to the placeholder (feed_dict); see the sketch after this list.
  3. That still creates two copies of the data, so to save memory you could make your own version of slice_input_producer which operates on numpy arrays and uploads rows one at a time using feed_dict.
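
A rough sketch of note 2, as a close variant that initializes the Variable directly from a placeholder instead of a separate assign op, so the array never ends up in the GraphDef (the array shape and batch size below are made up for illustration):

import numpy as np
import tensorflow as tf

data = np.random.rand(10000, 4).astype(np.float32)   # stand-in for your parsed CSV data

# Graph construction: the placeholder keeps the data out of the graph definition.
data_initializer = tf.placeholder(dtype=tf.float32, shape=data.shape)
data_var = tf.Variable(data_initializer, trainable=False, collections=[])
(row,) = tf.train.slice_input_producer([data_var], num_epochs=1, shuffle=False)
row_batch = tf.train.batch([row], batch_size=32)

with tf.Session() as sess:
    # Feed the numpy array exactly once, when initializing the variable.
    sess.run(data_var.initializer, feed_dict={data_initializer: data})
    sess.run(tf.local_variables_initializer())   # epoch counter created by num_epochs
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    try:
        while True:
            sess.run(row_batch)
    except tf.errors.OutOfRangeError:
        pass
    finally:
        coord.request_stop()
        coord.join(threads)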

Or you could try this. The code loads the Iris dataset into TensorFlow using pandas and numpy, and a simple one-neuron output is printed in the session. Hope it helps for a basic understanding... [I haven't added the way of one-hot decoding labels].

import tensorflow as tf
import numpy
import pandas as pd

# load features (columns 0-4) and labels (column 5) from the CSV
df = pd.read_csv('/home/nagarjun/Desktop/Iris.csv', usecols=[0, 1, 2, 3, 4], skiprows=[0], header=None)
d = df.values
l = pd.read_csv('/home/nagarjun/Desktop/Iris.csv', usecols=[5], header=None)
labels = numpy.array(l, 'str')
data = numpy.float32(d)
#print(data, labels)


#tensorflow
x = tf.placeholder(tf.float32, shape=(150, 5))
w = tf.random_normal([100, 150], mean=0.0, stddev=1.0, dtype=tf.float32)
y = tf.nn.softmax(tf.matmul(w, x))


with tf.Session() as sess:
    # feed the 150x5 feature matrix into the placeholder
    print(sess.run(y, feed_dict={x: data}))

You can use the latest tf.data API:

dataset = tf.contrib.data.make_csv_dataset(filepath)
iterator = dataset.make_initializable_iterator()
columns = iterator.get_next()
with tf.Session() as sess:
    sess.run([iterator.initializer])
    print(sess.run(columns))  # fetch one batch of columns

If anyone came here searching for a simple way to read absolutely large and sharded CSV files in the tf.estimator API, please see my code below:

CSV_COLUMNS = ['ID', 'text', 'class']
LABEL_COLUMN = 'class'
DEFAULTS = [['x'], ['no'], [0]]  #Default values


def read_dataset(filename, mode, batch_size = 512):
    def _input_fn(v_test=False):
#         def decode_csv(value_column):
#             columns = tf.decode_csv(value_column, record_defaults = DEFAULTS)
#             features = dict(zip(CSV_COLUMNS, columns))
#             label = features.pop(LABEL_COLUMN)
#             return add_engineered(features), label

        # Create list of files that match pattern
        file_list = tf.gfile.Glob(filename)

        # Create dataset from file list
        #dataset = tf.data.TextLineDataset(file_list).map(decode_csv)
        dataset = tf.contrib.data.make_csv_dataset(file_list,
                                                   batch_size=batch_size,
                                                   column_names=CSV_COLUMNS,
                                                   column_defaults=DEFAULTS,
                                                   label_name=LABEL_COLUMN)

        if mode == tf.estimator.ModeKeys.TRAIN:
            num_epochs = None # indefinitely
            dataset = dataset.shuffle(buffer_size = 10 * batch_size)
        else:
            num_epochs = 1 # end-of-input after this

        batch_features, batch_labels = dataset.make_one_shot_iterator().get_next()

        #Begins - Uncomment for testing only -----------------------------------------------------<
        if v_test == True:
            with tf.Session() as sess:
                print(sess.run(batch_features))
        #End - Uncomment for testing only -----------------------------------------------------<
        return add_engineered(batch_features), batch_labels
    return _input_fn

Example usage in TF.estimator:

train_spec = tf.estimator.TrainSpec(input_fn = read_dataset(
                                        filename = train_file,
                                        mode = tf.estimator.ModeKeys.TRAIN,
                                        batch_size = 128),
                                    max_steps = num_train_steps)
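
To actually launch training you would pair this input function with an estimator and an eval spec, roughly as sketched below (the estimator object, eval_file, and the EvalSpec wiring here are illustrative placeholders, not part of the code above):

# Hypothetical wiring; any tf.estimator.Estimator would do.
eval_spec = tf.estimator.EvalSpec(input_fn = read_dataset(
                                      filename = eval_file,
                                      mode = tf.estimator.ModeKeys.EVAL,
                                      batch_size = 128))
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)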

2.0 Compatible Solution: This answer may already be covered by others in the thread above, but I will provide additional links which will help the community.

dataset = tf.data.experimental.make_csv_dataset(
    file_path,
    batch_size=5, # Artificially small to make examples easier to show.
    label_name=LABEL_COLUMN,
    na_value="?",
    num_epochs=1,
    ignore_errors=True,
    **kwargs)
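
Since make_csv_dataset returns a tf.data.Dataset of (features, label) batches, in TF 2.x you can simply iterate it eagerly to inspect it, for example:

# Take one batch and print each feature column alongside its labels.
for features, labels in dataset.take(1):
    for name, values in features.items():
        print("{:20s}: {}".format(name, values.numpy()))
    print("labels: {}".format(labels.numpy()))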

For more information, please refer to this TensorFlow tutorial.